diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 61adb4a66..830a05af8 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -5,7 +5,7 @@ on: env: stack-orchestrator-ref: ${{ github.event.inputs.stack-orchestrator-ref || 'e62830c982d4dfc5f3c1c2b12c1754a7e9b538f1'}} - ipld-eth-db-ref: ${{ github.event.inputs.ipld-eth-db-ref || '66cd1d9e696cfa72f9d272927f9e945905c9f093' }} + ipld-eth-db-ref: ${{ github.event.inputs.ipld-eth-db-ref || '1b922dbff350bfe2a9aec5fe82079e9d855ea7ed' }} GOPATH: /tmp/go jobs: @@ -53,6 +53,17 @@ jobs: - name: Checkout code uses: actions/checkout@v2 + - uses: actions/checkout@v3 + with: + ref: ${{ env.ipld-eth-db-ref }} + repository: cerc-io/ipld-eth-db + path: "./ipld-eth-db/" + fetch-depth: 0 + + - name: Build ipld-eth-db + run: | + docker build -f ./ipld-eth-db/Dockerfile ./ipld-eth-db/ -t cerc/ipld-eth-db:local + - name: Run docker compose run: | docker-compose up -d diff --git a/cmd/geth/config.go b/cmd/geth/config.go index 384a2bd2b..04753f78e 100644 --- a/cmd/geth/config.go +++ b/cmd/geth/config.go @@ -281,7 +281,7 @@ func makeFullNode(ctx *cli.Context) (*node.Node, ethapi.Backend) { NumWorkers: ctx.Uint(utils.StateDiffWorkersFlag.Name), WaitForSync: ctx.Bool(utils.StateDiffWaitForSync.Name), BackfillCheckPastBlocks: ctx.Uint64(utils.StateDiffBackfillCheckPastBlocks.Name), - BackfillMaxHeadGap: ctx.Uint64(utils.StateDiffBackfillMaxHeadGap.Name), + BackfillMaxDepth: ctx.Uint64(utils.StateDiffBackfillMaxDepth.Name), } utils.RegisterStateDiffService(stack, eth, &cfg.Eth, p, backend) } diff --git a/cmd/geth/main.go b/cmd/geth/main.go index c193c7c39..da1f6507e 100644 --- a/cmd/geth/main.go +++ b/cmd/geth/main.go @@ -177,7 +177,7 @@ var ( utils.StateDiffLogStatements, utils.StateDiffCopyFrom, utils.StateDiffBackfillCheckPastBlocks, - utils.StateDiffBackfillMaxHeadGap, + utils.StateDiffBackfillMaxDepth, configFileFlag, }, utils.NetworkFlags, utils.DatabasePathFlags) diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index e4fd04cd3..d7b608704 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -1125,9 +1125,9 @@ Please note that --` + MetricsHTTPFlag.Name + ` must be set to start the server. Usage: "The number of blocks behind the startup statediff position to check (and fill) for gaps when head tracking.", Value: 7200, } - StateDiffBackfillMaxHeadGap = &cli.Uint64Flag{ - Name: "statediff.backfillmaxheadgap", - Usage: "The maximum gap between the current statediff and head positions that can be backfilled.", + StateDiffBackfillMaxDepth = &cli.Uint64Flag{ + Name: "statediff.backfillmaxdepth", + Usage: "When statediffing head, the maximum number of missing parents that can be backfilled.", Value: 7200, } ) diff --git a/docker-compose.yml b/docker-compose.yml index 86648b16c..e24202e1c 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -5,7 +5,7 @@ services: restart: on-failure depends_on: - ipld-eth-db - image: git.vdb.to/cerc-io/ipld-eth-db/ipld-eth-db:v5.0.2-alpha + image: cerc/ipld-eth-db:local environment: DATABASE_USER: "vdbm" DATABASE_NAME: "cerc_testing" diff --git a/statediff/config.go b/statediff/config.go index 6b29631bc..dc2604e2b 100644 --- a/statediff/config.go +++ b/statediff/config.go @@ -37,7 +37,7 @@ type Config struct { // Whether to enable writing state diffs directly to track blockchain head EnableWriteLoop bool // The maximum number of blocks to backfill when tracking head. - BackfillMaxHeadGap uint64 + BackfillMaxDepth uint64 // The maximum number of blocks behind the startup position to check for gaps. BackfillCheckPastBlocks uint64 // Size of the worker pool @@ -66,12 +66,33 @@ func (p *Params) ComputeWatchedAddressesLeafPaths() { } } +func (p *Params) Copy() Params { + ret := Params{ + IncludeBlock: p.IncludeBlock, + IncludeReceipts: p.IncludeReceipts, + IncludeTD: p.IncludeTD, + IncludeCode: p.IncludeCode, + } + ret.WatchedAddresses = make([]common.Address, len(p.WatchedAddresses)) + copy(ret.WatchedAddresses, p.WatchedAddresses) + + return ret +} + // ParamsWithMutex allows to lock the parameters while they are being updated | read from type ParamsWithMutex struct { Params sync.RWMutex } +// CopyParams returns a defensive copy of the Params +func (p *ParamsWithMutex) CopyParams() Params { + p.RLock() + defer p.RUnlock() + + return p.Params.Copy() +} + // Args bundles the arguments for the state diff builder type Args struct { OldStateRoot, NewStateRoot, BlockHash common.Hash diff --git a/statediff/indexer/database/dump/indexer.go b/statediff/indexer/database/dump/indexer.go index 608a3d382..ded2e9f16 100644 --- a/statediff/indexer/database/dump/indexer.go +++ b/statediff/indexer/database/dump/indexer.go @@ -186,6 +186,7 @@ func (sdi *StateDiffIndexer) processHeader(tx *BatchTx, header *types.Header, he UnclesHash: header.UncleHash.String(), Timestamp: header.Time, Coinbase: header.Coinbase.String(), + Canonical: true, } _, err := fmt.Fprintf(sdi.dump, "%+v\r\n", mod) return headerID, err diff --git a/statediff/indexer/database/file/csv_writer.go b/statediff/indexer/database/file/csv_writer.go index 23e92296a..9f69e7a5b 100644 --- a/statediff/indexer/database/file/csv_writer.go +++ b/statediff/indexer/database/file/csv_writer.go @@ -234,7 +234,8 @@ func (csw *CSVWriter) upsertHeaderCID(header models.HeaderModel) { var values []interface{} values = append(values, header.BlockNumber, header.BlockHash, header.ParentHash, header.CID, header.TotalDifficulty, header.NodeIDs, header.Reward, header.StateRoot, header.TxRoot, - header.RctRoot, header.UnclesHash, header.Bloom, strconv.FormatUint(header.Timestamp, 10), header.Coinbase) + header.RctRoot, header.UnclesHash, header.Bloom, strconv.FormatUint(header.Timestamp, 10), header.Coinbase, + header.Canonical) csw.rows <- tableRow{schema.TableHeader, values} metrics.IndexerMetrics.BlocksCounter.Inc(1) } diff --git a/statediff/indexer/database/file/indexer.go b/statediff/indexer/database/file/indexer.go index 3fb9b8d4d..18d24c894 100644 --- a/statediff/indexer/database/file/indexer.go +++ b/statediff/indexer/database/file/indexer.go @@ -246,6 +246,7 @@ func (sdi *StateDiffIndexer) processHeader(header *types.Header, headerNode ipld UnclesHash: header.UncleHash.String(), Timestamp: header.Time, Coinbase: header.Coinbase.String(), + Canonical: true, }) return headerID } diff --git a/statediff/indexer/database/file/sql_writer.go b/statediff/indexer/database/file/sql_writer.go index 1e0acb21f..eba593335 100644 --- a/statediff/indexer/database/file/sql_writer.go +++ b/statediff/indexer/database/file/sql_writer.go @@ -140,8 +140,8 @@ const ( ipldInsert = "INSERT INTO ipld.blocks (block_number, key, data) VALUES ('%s', '%s', '\\x%x');\n" headerInsert = "INSERT INTO eth.header_cids (block_number, block_hash, parent_hash, cid, td, node_ids, reward, " + - "state_root, tx_root, receipt_root, uncles_hash, bloom, timestamp, coinbase) VALUES " + - "('%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '\\x%x', %d, '%s');\n" + "state_root, tx_root, receipt_root, uncles_hash, bloom, timestamp, coinbase, canonical) VALUES " + + "('%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '\\x%x', %d, '%s', %t);\n" uncleInsert = "INSERT INTO eth.uncle_cids (block_number, block_hash, header_id, parent_hash, cid, reward, index) VALUES " + "('%s', '%s', '%s', '%s', '%s', '%s', %d);\n" @@ -189,7 +189,7 @@ func (sqw *SQLWriter) upsertIPLDNode(blockNumber string, i ipld.IPLD) { func (sqw *SQLWriter) upsertHeaderCID(header models.HeaderModel) { stmt := fmt.Sprintf(headerInsert, header.BlockNumber, header.BlockHash, header.ParentHash, header.CID, header.TotalDifficulty, formatPostgresStringArray(header.NodeIDs), header.Reward, header.StateRoot, header.TxRoot, - header.RctRoot, header.UnclesHash, header.Bloom, header.Timestamp, header.Coinbase) + header.RctRoot, header.UnclesHash, header.Bloom, header.Timestamp, header.Coinbase, header.Canonical) sqw.stmts <- []byte(stmt) metrics.IndexerMetrics.BlocksCounter.Inc(1) } diff --git a/statediff/indexer/database/sql/indexer.go b/statediff/indexer/database/sql/indexer.go index 8089e2167..6ba3834d6 100644 --- a/statediff/indexer/database/sql/indexer.go +++ b/statediff/indexer/database/sql/indexer.go @@ -260,6 +260,7 @@ func (sdi *StateDiffIndexer) processHeader(tx *BatchTx, header *types.Header, he UnclesHash: header.UncleHash.String(), Timestamp: header.Time, Coinbase: header.Coinbase.String(), + Canonical: true, }) } diff --git a/statediff/indexer/database/sql/interfaces.go b/statediff/indexer/database/sql/interfaces.go index bb912cbc7..c257c12c1 100644 --- a/statediff/indexer/database/sql/interfaces.go +++ b/statediff/indexer/database/sql/interfaces.go @@ -49,6 +49,7 @@ type Statements interface { MaxHeaderStm() string ExistsHeaderStm() string InsertHeaderStm() string + SetCanonicalHeaderStm() string InsertUncleStm() string InsertTxStm() string InsertRctStm() string diff --git a/statediff/indexer/database/sql/postgres/database.go b/statediff/indexer/database/sql/postgres/database.go index 0358e566f..40e23a65d 100644 --- a/statediff/indexer/database/sql/postgres/database.go +++ b/statediff/indexer/database/sql/postgres/database.go @@ -62,6 +62,12 @@ func (db *DB) InsertHeaderStm() string { return schema.TableHeader.ToInsertStatement(db.upsert) } +// SetCanonicalHeaderStm satisfies the sql.Statements interface +// Stm == Statement +func (db *DB) SetCanonicalHeaderStm() string { + return fmt.Sprintf("UPDATE %s SET canonical = false WHERE block_number = $1::BIGINT AND block_hash <> $2::TEXT AND canonical = true", schema.TableHeader.Name) +} + // InsertUncleStm satisfies the sql.Statements interface func (db *DB) InsertUncleStm() string { return schema.TableUncle.ToInsertStatement(db.upsert) diff --git a/statediff/indexer/database/sql/writer.go b/statediff/indexer/database/sql/writer.go index ba9407e0a..240aaf7dc 100644 --- a/statediff/indexer/database/sql/writer.go +++ b/statediff/indexer/database/sql/writer.go @@ -92,6 +92,7 @@ func (w *Writer) maxHeader() (*models.HeaderModel, error) { &model.Bloom, &model.Timestamp, &model.Coinbase, + &model.Canonical, ) model.BlockNumber = strconv.FormatUint(number, 10) model.TotalDifficulty = strconv.FormatUint(td, 10) @@ -120,11 +121,22 @@ func (w *Writer) upsertHeaderCID(tx Tx, header models.HeaderModel) error { header.UnclesHash, header.Bloom, header.Timestamp, - header.Coinbase) + header.Coinbase, + header.Canonical, + ) if err != nil { return insertError{"eth.header_cids", err, w.db.InsertHeaderStm(), header} } metrics.IndexerMetrics.BlocksCounter.Inc(1) + + _, err = tx.Exec(w.db.Context(), w.db.SetCanonicalHeaderStm(), + header.BlockNumber, + header.BlockHash, + ) + if err != nil { + return insertError{"eth.header_cids", err, w.db.SetCanonicalHeaderStm(), header} + } + return nil } diff --git a/statediff/indexer/models/models.go b/statediff/indexer/models/models.go index 0019209e7..0fcc96418 100644 --- a/statediff/indexer/models/models.go +++ b/statediff/indexer/models/models.go @@ -41,6 +41,7 @@ type HeaderModel struct { Bloom []byte `db:"bloom"` Timestamp uint64 `db:"timestamp"` Coinbase string `db:"coinbase"` + Canonical bool `db:"canonical"` } // UncleModel is the db model for eth.uncle_cids diff --git a/statediff/indexer/shared/schema/schema.go b/statediff/indexer/shared/schema/schema.go index b5bda6eec..1fbc54e3d 100644 --- a/statediff/indexer/shared/schema/schema.go +++ b/statediff/indexer/shared/schema/schema.go @@ -54,6 +54,7 @@ var TableHeader = Table{ {Name: "bloom", Type: Dbytea}, {Name: "timestamp", Type: Dnumeric}, {Name: "coinbase", Type: Dvarchar}, + {Name: "canonical", Type: Dboolean}, }, UpsertClause: OnConflict("block_number", "block_hash").Set( "parent_hash", @@ -68,6 +69,7 @@ var TableHeader = Table{ "bloom", "timestamp", "coinbase", + "canonical", )} var TableStateNode = Table{ diff --git a/statediff/service.go b/statediff/service.go index 9839cd048..4f926d60b 100644 --- a/statediff/service.go +++ b/statediff/service.go @@ -139,7 +139,7 @@ type Service struct { // Whether to enable writing state diffs directly to track blockchain head. enableWriteLoop bool // Settings to use for backfilling state diffs (plugging gaps when tracking head) - backfillMaxHeadGap uint64 + backfillMaxDepth uint64 backfillCheckPastBlocks uint64 // Size of the worker pool numWorkers uint @@ -232,7 +232,7 @@ func New(stack *node.Node, ethServ *eth.Ethereum, cfg *ethconfig.Config, params WaitForSync: params.WaitForSync, indexer: indexer, enableWriteLoop: params.EnableWriteLoop, - backfillMaxHeadGap: params.BackfillMaxHeadGap, + backfillMaxDepth: params.BackfillMaxDepth, backfillCheckPastBlocks: params.BackfillCheckPastBlocks, numWorkers: workers, maxRetry: defaultRetryLimit, @@ -274,7 +274,7 @@ func NewService(blockChain blockChain, cfg Config, backend ethapi.Backend, index WaitForSync: cfg.WaitForSync, indexer: indexer, enableWriteLoop: cfg.EnableWriteLoop, - backfillMaxHeadGap: cfg.BackfillMaxHeadGap, + backfillMaxDepth: cfg.BackfillMaxDepth, backfillCheckPastBlocks: cfg.BackfillCheckPastBlocks, numWorkers: workers, maxRetry: defaultRetryLimit, @@ -365,14 +365,6 @@ func (sds *Service) WriteLoop(chainEventCh chan core.ChainEvent) { defaultStatediffMetrics.lastEventHeight.Update(int64(nextHeight)) } else { log.Warn("WriteLoop: received unexpected block from the future", "block height", nextHeight, "last height", lastHeight) - if distance <= sds.backfillMaxHeadGap { - for i := lastHeight + 1; i < nextHeight; i++ { - log.Info("WriteLoop: backfilling gap to head", "block", i, "block height", nextHeight, "last height", lastHeight) - blockFwd <- sds.BlockChain.GetBlockByNumber(i) - } - } else { - log.Warn("WriteLoop: gap to head too large to backfill", "block height", nextHeight, "last height", lastHeight, "gap", distance) - } blockFwd <- chainEvent.Block defaultStatediffMetrics.lastEventHeight.Update(int64(nextHeight)) } @@ -409,9 +401,7 @@ func (sds *Service) WriteLoop(chainEventCh chan core.ChainEvent) { func (sds *Service) writeGenesisStateDiff(currBlock *types.Block, workerId uint) { // For genesis block we need to return the entire state trie hence we diff it with an empty trie. log.Info("Writing state diff", "block height", genesisBlockNumber, "worker", workerId) - writeLoopParams.RLock() - err := sds.writeStateDiffWithRetry(currBlock, common.Hash{}, writeLoopParams.Params) - writeLoopParams.RUnlock() + err := sds.writeStateDiffWithRetry(currBlock, common.Hash{}, writeLoopParams.CopyParams()) if err != nil { log.Error("statediff.Service.WriteLoop: processing error", "block height", genesisBlockNumber, "error", err.Error(), "worker", workerId) @@ -422,27 +412,78 @@ func (sds *Service) writeGenesisStateDiff(currBlock *types.Block, workerId uint) func (sds *Service) writeLoopWorker(params workerParams) { defer params.wg.Done() + + // statediffs the indicated block, and while maxBackfill > 0, backfills missing parent blocks. + var writeBlockWithParents func(*types.Block, uint64, Params) error + writeBlockWithParents = func(block *types.Block, maxBackfill uint64, writeParams Params) error { + parentBlock := sds.BlockCache.getParentBlock(block, sds.BlockChain) + if parentBlock == nil { + log.Error("Parent block is nil, skipping this block", "block height", block.Number()) + return nil + } + + parentIsGenesis := parentBlock.Number().Uint64() == genesisBlockNumber + + // chainEvent streams block from block 1, but we also need to include data from the genesis block. + if parentIsGenesis { + sds.writeGenesisStateDiff(parentBlock, params.id) + } + + log.Info("Writing state diff", "block height", block.Number().Uint64(), "worker", params.id) + err := sds.writeStateDiffWithRetry(block, parentBlock.Root(), writeParams) + if err != nil { + log.Error("statediff.Service.WriteLoop: processing error", + "number", block.Number().Uint64(), + "hash", block.Hash().Hex(), + "error", err.Error(), + "worker", params.id) + return err + } + + if !parentIsGenesis { + // We do this _after_ indexing the requested block. This makes sure that if a child of ours arrives for + // statediffing while we are still working on missing ancestors, its regress stops at us, and only we + // continue working backward. + parentIndexed, err := sds.indexedOrInProgress(parentBlock) + if err != nil { + log.Error("Error checking for indexing status of parent block.", + "number", block.Number(), "hash", block.Hash().String(), + "parent number", parentBlock.NumberU64(), "parent hash", parentBlock.Hash().Hex(), + "error", err.Error(), + "worker", params.id) + } else if !parentIndexed { + if maxBackfill > 0 { + log.Info("Parent block not indexed. Indexing now.", + "number", block.Number(), "hash", block.Hash().Hex(), + "parent number", parentBlock.NumberU64(), "parent hash", parentBlock.Hash().Hex(), + "worker", params.id) + err = writeBlockWithParents(parentBlock, maxBackfill-1, writeParams) + if err != nil { + log.Error("Error indexing parent block.", + "number", block.Number(), "hash", block.Hash().Hex(), + "parent number", parentBlock.NumberU64(), "parent hash", parentBlock.Hash().Hex(), + "error", err.Error(), + "worker", params.id) + } + } else { + log.Error("ERROR: Parent block not indexed but max backfill depth exceeded. Index MUST be corrected manually.", + "number", block.Number(), "hash", block.Hash().String(), + "parent number", parentBlock.NumberU64(), "parent hash", parentBlock.Hash().String(), + "worker", params.id) + } + } + } + + return nil + } + for { select { //Notify chain event channel of events case chainEvent := <-params.blockCh: log.Debug("WriteLoop(): chain event received", "event", chainEvent) currentBlock := chainEvent - parentBlock := sds.BlockCache.getParentBlock(currentBlock, sds.BlockChain) - if parentBlock == nil { - log.Error("Parent block is nil, skipping this block", "block height", currentBlock.Number()) - continue - } - - // chainEvent streams block from block 1, but we also need to include data from the genesis block. - if parentBlock.Number().Uint64() == genesisBlockNumber { - sds.writeGenesisStateDiff(parentBlock, params.id) - } - - log.Info("Writing state diff", "block height", currentBlock.Number().Uint64(), "worker", params.id) - writeLoopParams.RLock() - err := sds.writeStateDiffWithRetry(currentBlock, parentBlock.Root(), writeLoopParams.Params) - writeLoopParams.RUnlock() + err := writeBlockWithParents(currentBlock, sds.backfillMaxDepth, writeLoopParams.CopyParams()) if err != nil { log.Error("statediff.Service.WriteLoop: processing error", "block height", currentBlock.Number().Uint64(), @@ -861,6 +902,14 @@ func (sds *Service) WriteStateDiffFor(blockHash common.Hash, params Params) erro return sds.writeStateDiffWithRetry(currentBlock, parentRoot, params) } +// indexedOrInProgress returns true if the block has already been statediffed or is in progress, else false. +func (sds *Service) indexedOrInProgress(block *types.Block) (bool, error) { + if sds.statediffInProgress(block) { + return true, nil + } + return sds.indexer.HasBlock(block.Hash(), block.NumberU64()) +} + // Claim exclusive access for state diffing the specified block. // Returns true and a function to release access if successful, else false, nil. func (sds *Service) claimExclusiveAccess(block *types.Block) (bool, func()) { @@ -879,6 +928,15 @@ func (sds *Service) claimExclusiveAccess(block *types.Block) (bool, func()) { } } +// statediffInProgress returns true if statediffing is currently in progress for the block, else false. +func (sds *Service) statediffInProgress(block *types.Block) bool { + sds.currentBlocksMutex.Lock() + defer sds.currentBlocksMutex.Unlock() + + key := fmt.Sprintf("%s,%d", block.Hash().Hex(), block.NumberU64()) + return sds.currentBlocks[key] +} + // Writes a state diff from the current block, parent state root, and provided params func (sds *Service) writeStateDiff(block *types.Block, parentRoot common.Hash, params Params) error { if granted, relinquish := sds.claimExclusiveAccess(block); granted { @@ -1202,7 +1260,7 @@ func (sds *Service) backfillDetectedGaps(blockGaps []*interfaces.BlockGap) { return } log.Info("Backfill: backfilling detected gap", "block", num, "worker", w) - err := sds.writeStateDiffAt(num, writeLoopParams.Params) + err := sds.writeStateDiffAt(num, writeLoopParams.CopyParams()) if err != nil { log.Error("Backfill error: " + err.Error()) }