diff --git a/config.go b/config.go index 85e63a2..4f9d704 100644 --- a/config.go +++ b/config.go @@ -39,7 +39,7 @@ type Config struct { // Whether to enable writing state diffs directly to track blockchain head EnableWriteLoop bool // The maximum number of blocks to backfill when tracking head. - BackfillMaxHeadGap uint64 + BackfillMaxDepth uint64 // The maximum number of blocks behind the startup position to check for gaps. BackfillCheckPastBlocks uint64 // Size of the worker pool @@ -60,6 +60,19 @@ type Params struct { watchedAddressesLeafPaths [][]byte } +func (p *Params) Copy() Params { + ret := Params{ + IncludeBlock: p.IncludeBlock, + IncludeReceipts: p.IncludeReceipts, + IncludeTD: p.IncludeTD, + IncludeCode: p.IncludeCode, + } + ret.WatchedAddresses = make([]common.Address, len(p.WatchedAddresses)) + copy(ret.WatchedAddresses, p.WatchedAddresses) + + return ret +} + // ComputeWatchedAddressesLeafPaths populates a slice with paths (hex_encoding(Keccak256)) of each of the WatchedAddresses func (p *Params) ComputeWatchedAddressesLeafPaths() { p.watchedAddressesLeafPaths = make([][]byte, len(p.WatchedAddresses)) @@ -74,6 +87,14 @@ type ParamsWithMutex struct { sync.RWMutex } +// CopyParams returns a defensive copy of the Params +func (p *ParamsWithMutex) CopyParams() Params { + p.RLock() + defer p.RUnlock() + + return p.Params.Copy() +} + // Args bundles the arguments for the state diff builder type Args struct { OldStateRoot, NewStateRoot, BlockHash common.Hash diff --git a/indexer/database/dump/indexer.go b/indexer/database/dump/indexer.go index fd9b44b..70e86b7 100644 --- a/indexer/database/dump/indexer.go +++ b/indexer/database/dump/indexer.go @@ -187,6 +187,7 @@ func (sdi *StateDiffIndexer) processHeader(tx *BatchTx, header *types.Header, he UnclesHash: header.UncleHash.String(), Timestamp: header.Time, Coinbase: header.Coinbase.String(), + Canonical: true, } _, err := fmt.Fprintf(sdi.dump, "%+v\r\n", mod) return headerID, err diff --git a/indexer/database/file/csv_writer.go b/indexer/database/file/csv_writer.go index 6c73ef7..5463e64 100644 --- a/indexer/database/file/csv_writer.go +++ b/indexer/database/file/csv_writer.go @@ -231,7 +231,8 @@ func (csw *CSVWriter) upsertHeaderCID(header models.HeaderModel) { var values []interface{} values = append(values, header.BlockNumber, header.BlockHash, header.ParentHash, header.CID, header.TotalDifficulty, header.NodeIDs, header.Reward, header.StateRoot, header.TxRoot, - header.RctRoot, header.UnclesHash, header.Bloom, strconv.FormatUint(header.Timestamp, 10), header.Coinbase) + header.RctRoot, header.UnclesHash, header.Bloom, strconv.FormatUint(header.Timestamp, 10), header.Coinbase, + header.Canonical) csw.rows <- tableRow{schema.TableHeader, values} metrics.IndexerMetrics.BlocksCounter.Inc(1) } diff --git a/indexer/database/file/indexer.go b/indexer/database/file/indexer.go index 38c9a10..85d6958 100644 --- a/indexer/database/file/indexer.go +++ b/indexer/database/file/indexer.go @@ -244,6 +244,7 @@ func (sdi *StateDiffIndexer) processHeader(header *types.Header, headerNode ipld UnclesHash: header.UncleHash.String(), Timestamp: header.Time, Coinbase: header.Coinbase.String(), + Canonical: true, }) return headerID } diff --git a/indexer/database/file/sql_writer.go b/indexer/database/file/sql_writer.go index 849319a..b6b5c4f 100644 --- a/indexer/database/file/sql_writer.go +++ b/indexer/database/file/sql_writer.go @@ -140,8 +140,8 @@ const ( ipldInsert = "INSERT INTO ipld.blocks (block_number, key, data) VALUES ('%s', '%s', '\\x%x');\n" headerInsert = "INSERT INTO eth.header_cids (block_number, block_hash, parent_hash, cid, td, node_ids, reward, " + - "state_root, tx_root, receipt_root, uncles_hash, bloom, timestamp, coinbase) VALUES " + - "('%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '\\x%x', %d, '%s');\n" + "state_root, tx_root, receipt_root, uncles_hash, bloom, timestamp, coinbase, canonical) VALUES " + + "('%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '\\x%x', %d, '%s', %t);\n" uncleInsert = "INSERT INTO eth.uncle_cids (block_number, block_hash, header_id, parent_hash, cid, reward, index) VALUES " + "('%s', '%s', '%s', '%s', '%s', '%s', %d);\n" @@ -189,7 +189,7 @@ func (sqw *SQLWriter) upsertIPLDNode(blockNumber string, i ipld.IPLD) { func (sqw *SQLWriter) upsertHeaderCID(header models.HeaderModel) { stmt := fmt.Sprintf(headerInsert, header.BlockNumber, header.BlockHash, header.ParentHash, header.CID, header.TotalDifficulty, formatPostgresStringArray(header.NodeIDs), header.Reward, header.StateRoot, header.TxRoot, - header.RctRoot, header.UnclesHash, header.Bloom, header.Timestamp, header.Coinbase) + header.RctRoot, header.UnclesHash, header.Bloom, header.Timestamp, header.Coinbase, header.Canonical) sqw.stmts <- []byte(stmt) metrics.IndexerMetrics.BlocksCounter.Inc(1) } diff --git a/indexer/database/sql/indexer.go b/indexer/database/sql/indexer.go index 5811993..2fc2ae5 100644 --- a/indexer/database/sql/indexer.go +++ b/indexer/database/sql/indexer.go @@ -257,6 +257,7 @@ func (sdi *StateDiffIndexer) processHeader(tx *BatchTx, header *types.Header, he UnclesHash: header.UncleHash.String(), Timestamp: header.Time, Coinbase: header.Coinbase.String(), + Canonical: true, }) } diff --git a/indexer/database/sql/interfaces.go b/indexer/database/sql/interfaces.go index 2296be3..db13ac7 100644 --- a/indexer/database/sql/interfaces.go +++ b/indexer/database/sql/interfaces.go @@ -49,6 +49,7 @@ type Statements interface { MaxHeaderStm() string ExistsHeaderStm() string InsertHeaderStm() string + SetCanonicalHeaderStm() string InsertUncleStm() string InsertTxStm() string InsertRctStm() string diff --git a/indexer/database/sql/postgres/database.go b/indexer/database/sql/postgres/database.go index c646d76..ecebd7d 100644 --- a/indexer/database/sql/postgres/database.go +++ b/indexer/database/sql/postgres/database.go @@ -62,6 +62,12 @@ func (db *DB) InsertHeaderStm() string { return schema.TableHeader.ToInsertStatement(db.upsert) } +// SetCanonicalHeaderStm satisfies the sql.Statements interface +// Stm == Statement +func (db *DB) SetCanonicalHeaderStm() string { + return fmt.Sprintf("UPDATE %s SET canonical = false WHERE block_number = $1::BIGINT AND block_hash <> $2::TEXT AND canonical = true", schema.TableHeader.Name) +} + // InsertUncleStm satisfies the sql.Statements interface func (db *DB) InsertUncleStm() string { return schema.TableUncle.ToInsertStatement(db.upsert) diff --git a/indexer/database/sql/writer.go b/indexer/database/sql/writer.go index 5270365..6d6dd31 100644 --- a/indexer/database/sql/writer.go +++ b/indexer/database/sql/writer.go @@ -90,6 +90,7 @@ func (w *Writer) maxHeader() (*models.HeaderModel, error) { &model.Bloom, &model.Timestamp, &model.Coinbase, + &model.Canonical, ) model.BlockNumber = strconv.FormatUint(number, 10) model.TotalDifficulty = strconv.FormatUint(td, 10) @@ -118,11 +119,22 @@ func (w *Writer) upsertHeaderCID(tx Tx, header models.HeaderModel) error { header.UnclesHash, header.Bloom, header.Timestamp, - header.Coinbase) + header.Coinbase, + header.Canonical, + ) if err != nil { return insertError{"eth.header_cids", err, w.db.InsertHeaderStm(), header} } metrics.IndexerMetrics.BlocksCounter.Inc(1) + + _, err = tx.Exec(w.db.Context(), w.db.SetCanonicalHeaderStm(), + header.BlockNumber, + header.BlockHash, + ) + if err != nil { + return insertError{"eth.header_cids", err, w.db.SetCanonicalHeaderStm(), header} + } + return nil } diff --git a/indexer/models/models.go b/indexer/models/models.go index 0019209..0fcc964 100644 --- a/indexer/models/models.go +++ b/indexer/models/models.go @@ -41,6 +41,7 @@ type HeaderModel struct { Bloom []byte `db:"bloom"` Timestamp uint64 `db:"timestamp"` Coinbase string `db:"coinbase"` + Canonical bool `db:"canonical"` } // UncleModel is the db model for eth.uncle_cids diff --git a/indexer/shared/schema/schema.go b/indexer/shared/schema/schema.go index b5bda6e..1fbc54e 100644 --- a/indexer/shared/schema/schema.go +++ b/indexer/shared/schema/schema.go @@ -54,6 +54,7 @@ var TableHeader = Table{ {Name: "bloom", Type: Dbytea}, {Name: "timestamp", Type: Dnumeric}, {Name: "coinbase", Type: Dvarchar}, + {Name: "canonical", Type: Dboolean}, }, UpsertClause: OnConflict("block_number", "block_hash").Set( "parent_hash", @@ -68,6 +69,7 @@ var TableHeader = Table{ "bloom", "timestamp", "coinbase", + "canonical", )} var TableStateNode = Table{ diff --git a/main/flags.go b/main/flags.go index 1e4a627..fa2a723 100644 --- a/main/flags.go +++ b/main/flags.go @@ -56,9 +56,9 @@ func init() { "statediff.backfillcheckpastblocks", 7200, "Number of blocks behind the startup statediff position to check (and fill) for gaps when head tracking", ) - Flags.Uint64Var(&config.BackfillMaxHeadGap, - "statediff.backfillmaxheadgap", 7200, - "Maximum gap between the startup statediff and startup head positions that can be backfilled", + Flags.Uint64Var(&config.BackfillMaxDepth, + "statediff.backfillmaxdepth", 7200, + "When statediffing head, the maximum number of missing parents that can be backfilled", ) Flags.Var(&dbType, diff --git a/service.go b/service.go index 61c7e5b..4b89fd9 100644 --- a/service.go +++ b/service.go @@ -83,7 +83,7 @@ type Service struct { // Parameters to use in the service write loop, if enabled writeLoopParams ParamsWithMutex // Settings to use for backfilling state diffs (plugging gaps when tracking head) - backfillMaxHeadGap uint64 + backfillMaxDepth uint64 backfillCheckPastBlocks uint64 // Size of the worker pool numWorkers uint @@ -133,6 +133,12 @@ type jobStatusSubscription struct { quitChan chan<- bool } +// Utility type for showing the relative positions of the blockchain and the statediff indexer. +type servicePosition struct { + chainBlockNumber uint64 + indexerBlockNumber uint64 +} + // BlockCache caches the last block for safe access from different service loops type BlockCache struct { sync.Mutex @@ -141,9 +147,9 @@ type BlockCache struct { } type workerParams struct { - chainEventCh <-chan core.ChainEvent - wg *sync.WaitGroup - id uint + blockCh <-chan *types.Block + wg *sync.WaitGroup + id uint } func NewBlockCache(max uint) BlockCache { @@ -172,7 +178,7 @@ func NewService(cfg Config, blockChain BlockChain, backend plugeth.Backend, inde ShouldWaitForSync: cfg.WaitForSync, indexer: indexer, enableWriteLoop: cfg.EnableWriteLoop, - backfillMaxHeadGap: cfg.BackfillMaxHeadGap, + backfillMaxDepth: cfg.BackfillMaxDepth, backfillCheckPastBlocks: cfg.BackfillCheckPastBlocks, numWorkers: workers, maxRetry: defaultRetryLimit, @@ -213,16 +219,21 @@ func (lbc *BlockCache) getParentBlock(currentBlock *types.Block, bc BlockChain) // WriteLoop event loop for progressively processing and writing diffs directly to DB func (sds *Service) WriteLoop(chainEventCh chan core.ChainEvent) { - log.Info("Starting statediff write loop") + initialPos := sds.currentPosition() + log.Info( + "WriteLoop: initial positions", + "chain", initialPos.chainBlockNumber, + "indexer", initialPos.indexerBlockNumber, + ) log := log.New("context", "statediff writing") sub := sds.BlockChain.SubscribeChainEvent(chainEventCh) defer sub.Unsubscribe() var wg sync.WaitGroup - chainEventFwd := make(chan core.ChainEvent, chainEventChanSize) + blockFwd := make(chan *types.Block, chainEventChanSize) defer func() { log.Info("Quitting") - close(chainEventFwd) + close(blockFwd) }() wg.Add(1) @@ -232,16 +243,30 @@ func (sds *Service) WriteLoop(chainEventCh chan core.ChainEvent) { select { case event := <-chainEventCh: // First process metrics for chain events, then forward to workers - lastHeight := defaultStatediffMetrics.lastEventHeight.Value() + lastHeight := uint64(defaultStatediffMetrics.lastEventHeight.Value()) + if lastHeight == 0 { + lastHeight = initialPos.indexerBlockNumber + } block := event.Block log.Debug("Chain event received", "number", block.Number(), "hash", event.Hash) - nextHeight := int64(block.Number().Uint64()) - if nextHeight-lastHeight != 1 { - log.Warn("Received block out-of-order", "next", nextHeight, "last", lastHeight) + nextHeight := block.Number().Uint64() + if nextHeight > lastHeight { + distance := nextHeight - lastHeight + if distance == 1 { + log.Info("WriteLoop: received expected block", + "block number", nextHeight, "last number", lastHeight) + } else { + log.Warn("WriteLoop: received unexpected block from the future", + "block number", nextHeight, "last number", lastHeight) + } + blockFwd <- block + defaultStatediffMetrics.lastEventHeight.Update(int64(nextHeight)) + } else { + log.Warn("WriteLoop: received unexpected block from the past", + "block number", nextHeight, "last number", lastHeight) + blockFwd <- block } - defaultStatediffMetrics.lastEventHeight.Update(nextHeight) defaultStatediffMetrics.writeLoopChannelLen.Update(int64(len(chainEventCh))) - chainEventFwd <- event case err := <-sub.Err(): if err != nil { log.Error("Error from subscription", "error", err) @@ -255,7 +280,7 @@ func (sds *Service) WriteLoop(chainEventCh chan core.ChainEvent) { }() wg.Add(int(sds.numWorkers)) for worker := uint(0); worker < sds.numWorkers; worker++ { - params := workerParams{chainEventCh: chainEventFwd, wg: &wg, id: worker} + params := workerParams{blockCh: blockFwd, wg: &wg, id: worker} go sds.writeLoopWorker(params) } wg.Wait() @@ -264,10 +289,8 @@ func (sds *Service) WriteLoop(chainEventCh chan core.ChainEvent) { func (sds *Service) writeGenesisStateDiff(currBlock *types.Block, logger log.Logger) { // For genesis block we need to return the entire state trie hence we diff it with an empty trie. log.Info("Writing genesis state diff", "number", genesisBlockNumber) - sds.writeLoopParams.RLock() - defer sds.writeLoopParams.RUnlock() - err := sds.writeStateDiffWithRetry(currBlock, common.Hash{}, sds.writeLoopParams.Params) + err := sds.writeStateDiffWithRetry(currBlock, common.Hash{}, sds.writeLoopParams.CopyParams()) if err != nil { log.Error("failed to write state diff", "number", genesisBlockNumber, "error", err) @@ -279,27 +302,71 @@ func (sds *Service) writeGenesisStateDiff(currBlock *types.Block, logger log.Log func (sds *Service) writeLoopWorker(params workerParams) { log := log.New("context", "statediff writing", "worker", params.id) defer params.wg.Done() + + // statediffs the indicated block, and while maxBackfill > 0, backfills missing parent blocks. + var writeBlockWithParents func(*types.Block, uint64, Params) error + writeBlockWithParents = func(block *types.Block, maxBackfill uint64, writeParams Params) error { + parentBlock := sds.BlockCache.getParentBlock(block, sds.BlockChain) + if parentBlock == nil { + return errors.New("Parent block is nil, skipping this block") + } + + parentIsGenesis := parentBlock.Number().Uint64() == genesisBlockNumber + + // chainEvent streams block from block 1, but we also need to include data from the genesis block. + if parentIsGenesis { + sds.writeGenesisStateDiff(parentBlock, log) + } + + log.Info("Writing state diff", "number", block.Number()) + err := sds.writeStateDiffWithRetry(block, parentBlock.Root(), writeParams) + if err != nil { + return err + } + + if parentIsGenesis { + return nil + } + // We do this _after_ indexing the requested block. This makes sure that if a child of ours arrives for + // statediffing while we are still working on missing ancestors, its regress stops at us, and only we + // continue working backward. + parentIndexed, err := sds.indexedOrInProgress(parentBlock) + if err != nil { + log.Error("Error checking for indexing status of parent block.", + "number", block.Number(), "hash", block.Hash(), + "parent number", parentBlock.Number(), "parent hash", parentBlock.Hash(), + "error", err) + return err + } + if parentIndexed { + return nil + } + if maxBackfill > 0 { + log.Info("Parent block not indexed. Indexing now.", + "number", block.Number(), "hash", block.Hash(), + "parent number", parentBlock.Number(), "parent hash", parentBlock.Hash()) + err = writeBlockWithParents(parentBlock, maxBackfill-1, writeParams) + if err != nil { + log.Error("Error indexing parent block.", + "number", block.Number(), "hash", block.Hash(), + "parent number", parentBlock.Number(), "parent hash", parentBlock.Hash(), + "error", err) + } + } else { + log.Error("Parent block not indexed but max backfill depth exceeded. Index MUST be corrected manually.", + "number", block.Number(), "hash", block.Hash(), + "parent number", parentBlock.Number(), "parent hash", parentBlock.Hash()) + } + return nil + } + for { select { - case event := <-params.chainEventCh: - block := event.Block - parent := sds.BlockCache.getParentBlock(block, sds.BlockChain) - if parent == nil { - log.Error("Parent block is nil, skipping this block", "number", block.Number()) - continue - } - - // chainEvent streams block from block 1, but we also need to include data from the genesis block. - if parent.Number().Uint64() == genesisBlockNumber { - sds.writeGenesisStateDiff(parent, log) - } - - log.Info("Writing state diff", "number", block.Number()) - sds.writeLoopParams.RLock() - err := sds.writeStateDiffWithRetry(block, parent.Root(), sds.writeLoopParams.Params) - sds.writeLoopParams.RUnlock() + case block := <-params.blockCh: + log.Debug("Block received", "number", block.Number()) + err := writeBlockWithParents(block, sds.backfillMaxDepth, sds.writeLoopParams.CopyParams()) if err != nil { - log.Error("failed to write state diff", + log.Error("Error processing statediff", "number", block.Number(), "hash", block.Hash(), "error", err) @@ -681,6 +748,23 @@ func (sds *Service) WriteStateDiffFor(blockHash common.Hash, params Params) erro return sds.writeStateDiffWithRetry(currentBlock, parentRoot, sds.maybeReplaceWatchedAddresses(params)) } +// indexedOrInProgress returns true if the block has already been statediffed or is in progress, else false. +func (sds *Service) indexedOrInProgress(block *types.Block) (bool, error) { + if sds.statediffInProgress(block) { + return true, nil + } + return sds.indexer.HasBlock(block.Hash(), block.NumberU64()) +} + +// statediffInProgress returns true if statediffing is currently in progress for the block, else false. +func (sds *Service) statediffInProgress(block *types.Block) bool { + sds.currentBlocksMutex.Lock() + defer sds.currentBlocksMutex.Unlock() + + key := fmt.Sprintf("%s,%d", block.Hash().Hex(), block.NumberU64()) + return sds.currentBlocks[key] +} + // Claim exclusive access for state diffing the specified block. // Returns true and a function to release access if successful, else false, nil. func (sds *Service) claimExclusiveAccess(block *types.Block) (bool, func()) { @@ -928,110 +1012,43 @@ func MapWatchAddressArgsToAddresses(args []types2.WatchAddressArg) ([]common.Add // Backfill is executed on startup to make sure there are no gaps in the recent past when tracking head. func (sds *Service) Backfill() { - chainBlock := sds.BlockChain.CurrentBlock() - if nil == chainBlock { - log.Info("Backfill: No previous chain block, nothing to backfill.") - return - } - - chainBlockNumber := chainBlock.Number.Uint64() - if chainBlockNumber == 0 { + pos := sds.currentPosition() + if pos.chainBlockNumber == 0 { log.Info("Backfill: At start of chain, nothing to backfill.") return } - indexerBlock, err := sds.indexer.CurrentBlock() - if nil == indexerBlock { - log.Info("Backfill: No previous indexer block, nothing to backfill.") - return - } - if nil != err { - log.Error("Backfill error", err) - return - } - - indexerBlockNumber, err := strconv.ParseUint(indexerBlock.BlockNumber, 10, 64) - if nil != err { - log.Error("Backfill error", err) - return - } - - headGap := chainBlockNumber - indexerBlockNumber log.Info( "Backfill: initial positions", - "chain", chainBlockNumber, - "indexer", indexerBlockNumber, - "headGap", headGap, + "chain", pos.chainBlockNumber, + "indexer", pos.indexerBlockNumber, ) - if sds.backfillMaxHeadGap > 0 && headGap > 0 { - if headGap < sds.backfillMaxHeadGap { - sds.backfillHeadGap(indexerBlockNumber, chainBlockNumber) - log.Info("Backfill: all workers done filling headGap.") - } else { - log.Error("Backfill: headGap too large to fill.") - } - } - if sds.backfillCheckPastBlocks > 0 { var gapCheckBeginNumber uint64 = 0 - if indexerBlockNumber > sds.backfillCheckPastBlocks { - gapCheckBeginNumber = indexerBlockNumber - sds.backfillCheckPastBlocks + if pos.indexerBlockNumber > sds.backfillCheckPastBlocks { + gapCheckBeginNumber = pos.indexerBlockNumber - sds.backfillCheckPastBlocks } - blockGaps, err := sds.indexer.DetectGaps(gapCheckBeginNumber, chainBlockNumber) + blockGaps, err := sds.indexer.DetectGaps(gapCheckBeginNumber, pos.chainBlockNumber) if nil != err { - log.Error("Backfill error", err) + log.Error("Backfill error", "error", err) return } if nil != blockGaps && len(blockGaps) > 0 { gapsMsg, _ := json.Marshal(blockGaps) - log.Info("Backfill: detected gaps in range", "begin", gapCheckBeginNumber, "end", chainBlockNumber, "gaps", string(gapsMsg)) + log.Info("Backfill: detected gaps in range", + "begin", gapCheckBeginNumber, "end", pos.chainBlockNumber, "gaps", string(gapsMsg)) sds.backfillDetectedGaps(blockGaps) - log.Info("Backfill: done processing detected gaps in range", "begin", gapCheckBeginNumber, "end", chainBlockNumber, "gaps", string(gapsMsg)) + log.Info("Backfill: done processing detected gaps in range", + "begin", gapCheckBeginNumber, "end", pos.chainBlockNumber, "gaps", string(gapsMsg)) } else { - log.Info("Backfill: no gaps detected in range", "begin", gapCheckBeginNumber, "end", chainBlockNumber) + log.Info("Backfill: no gaps detected in range", + "begin", gapCheckBeginNumber, "end", pos.chainBlockNumber) } } } -// backfillHeadGap fills in any gap between the statediff position and the chain position at startup. -// A gap can be created in this way if there is some problem in statediffing (eg, DB connectivity is lost, -// while the chain keeps syncing), if the process is terminated with a statediff in-flight, etc. -func (sds *Service) backfillHeadGap(indexerBlockNumber uint64, chainBlockNumber uint64) { - var ch = make(chan uint64) - var wg sync.WaitGroup - for i := uint(0); i < sds.numWorkers; i++ { - wg.Add(1) - go func(w uint) { - defer wg.Done() - for { - select { - case num, ok := <-ch: - if !ok { - log.Info("Backfill: headGap done", "worker", w) - return - } - log.Info("Backfill: backfilling head gap", "block", num, "worker", w) - err := sds.writeStateDiffAt(num, sds.writeLoopParams.Params) - if err != nil { - log.Error("Backfill error", err) - } - case <-sds.QuitChan: - log.Info("Backfill: quitting before finish", "worker", w) - return - } - } - }(i) - } - - for bn := indexerBlockNumber + 1; bn <= chainBlockNumber; bn++ { - ch <- bn - } - close(ch) - wg.Wait() -} - // backfillDetectedGaps fills gaps which have occurred in the recent past. These gaps can happen because of // transient errors, such as DB errors that are later corrected (so head statediffing continues, but with a hole) // a missed ChainEvent (happens sometimes when debugging), or if the process is terminated when an earlier block @@ -1051,7 +1068,7 @@ func (sds *Service) backfillDetectedGaps(blockGaps []*interfaces.BlockGap) { return } log.Info("Backfill: backfilling detected gap", "block", num, "worker", w) - err := sds.writeStateDiffAt(num, sds.writeLoopParams.Params) + err := sds.writeStateDiffAt(num, sds.writeLoopParams.CopyParams()) if err != nil { log.Error("Backfill error: ", err) } @@ -1071,3 +1088,24 @@ func (sds *Service) backfillDetectedGaps(blockGaps []*interfaces.BlockGap) { close(ch) wg.Wait() } + +// currentPosition returns the current block height for both the BlockChain and the statediff indexer. +func (sds *Service) currentPosition() servicePosition { + ret := servicePosition{} + chainBlock := sds.BlockChain.CurrentBlock() + if nil != chainBlock { + ret.chainBlockNumber = chainBlock.Number.Uint64() + } + + indexerBlock, _ := sds.indexer.CurrentBlock() + if nil != indexerBlock { + indexerBlockNumber, err := strconv.ParseUint(indexerBlock.BlockNumber, 10, 64) + if nil == err { + ret.indexerBlockNumber = indexerBlockNumber + } else { + log.Error("Error parsing indexer block number", "block", indexerBlock.BlockNumber) + } + } + + return ret +}