Statediff missing parent blocks automatically #8

Merged
roysc merged 1 commits from backfill-parent-blocks into main 2023-07-20 02:15:51 +00:00
13 changed files with 212 additions and 127 deletions

View File

@ -39,7 +39,7 @@ type Config struct {
// Whether to enable writing state diffs directly to track blockchain head // Whether to enable writing state diffs directly to track blockchain head
EnableWriteLoop bool EnableWriteLoop bool
// The maximum number of blocks to backfill when tracking head. // The maximum number of blocks to backfill when tracking head.
BackfillMaxHeadGap uint64 BackfillMaxDepth uint64
// The maximum number of blocks behind the startup position to check for gaps. // The maximum number of blocks behind the startup position to check for gaps.
BackfillCheckPastBlocks uint64 BackfillCheckPastBlocks uint64
// Size of the worker pool // Size of the worker pool
@ -60,6 +60,19 @@ type Params struct {
watchedAddressesLeafPaths [][]byte watchedAddressesLeafPaths [][]byte
} }
func (p *Params) Copy() Params {
ret := Params{
IncludeBlock: p.IncludeBlock,
IncludeReceipts: p.IncludeReceipts,
IncludeTD: p.IncludeTD,
IncludeCode: p.IncludeCode,
}
ret.WatchedAddresses = make([]common.Address, len(p.WatchedAddresses))
copy(ret.WatchedAddresses, p.WatchedAddresses)
return ret
}
// ComputeWatchedAddressesLeafPaths populates a slice with paths (hex_encoding(Keccak256)) of each of the WatchedAddresses // ComputeWatchedAddressesLeafPaths populates a slice with paths (hex_encoding(Keccak256)) of each of the WatchedAddresses
func (p *Params) ComputeWatchedAddressesLeafPaths() { func (p *Params) ComputeWatchedAddressesLeafPaths() {
p.watchedAddressesLeafPaths = make([][]byte, len(p.WatchedAddresses)) p.watchedAddressesLeafPaths = make([][]byte, len(p.WatchedAddresses))
@ -74,6 +87,14 @@ type ParamsWithMutex struct {
sync.RWMutex sync.RWMutex
} }
// CopyParams returns a defensive copy of the Params
func (p *ParamsWithMutex) CopyParams() Params {
p.RLock()
defer p.RUnlock()
return p.Params.Copy()
}
// Args bundles the arguments for the state diff builder // Args bundles the arguments for the state diff builder
type Args struct { type Args struct {
OldStateRoot, NewStateRoot, BlockHash common.Hash OldStateRoot, NewStateRoot, BlockHash common.Hash

View File

@ -187,6 +187,7 @@ func (sdi *StateDiffIndexer) processHeader(tx *BatchTx, header *types.Header, he
UnclesHash: header.UncleHash.String(), UnclesHash: header.UncleHash.String(),
Timestamp: header.Time, Timestamp: header.Time,
Coinbase: header.Coinbase.String(), Coinbase: header.Coinbase.String(),
Canonical: true,
} }
_, err := fmt.Fprintf(sdi.dump, "%+v\r\n", mod) _, err := fmt.Fprintf(sdi.dump, "%+v\r\n", mod)
return headerID, err return headerID, err

View File

@ -231,7 +231,8 @@ func (csw *CSVWriter) upsertHeaderCID(header models.HeaderModel) {
var values []interface{} var values []interface{}
values = append(values, header.BlockNumber, header.BlockHash, header.ParentHash, header.CID, values = append(values, header.BlockNumber, header.BlockHash, header.ParentHash, header.CID,
header.TotalDifficulty, header.NodeIDs, header.Reward, header.StateRoot, header.TxRoot, header.TotalDifficulty, header.NodeIDs, header.Reward, header.StateRoot, header.TxRoot,
header.RctRoot, header.UnclesHash, header.Bloom, strconv.FormatUint(header.Timestamp, 10), header.Coinbase) header.RctRoot, header.UnclesHash, header.Bloom, strconv.FormatUint(header.Timestamp, 10), header.Coinbase,
header.Canonical)
csw.rows <- tableRow{schema.TableHeader, values} csw.rows <- tableRow{schema.TableHeader, values}
metrics.IndexerMetrics.BlocksCounter.Inc(1) metrics.IndexerMetrics.BlocksCounter.Inc(1)
} }

View File

@ -244,6 +244,7 @@ func (sdi *StateDiffIndexer) processHeader(header *types.Header, headerNode ipld
UnclesHash: header.UncleHash.String(), UnclesHash: header.UncleHash.String(),
Timestamp: header.Time, Timestamp: header.Time,
Coinbase: header.Coinbase.String(), Coinbase: header.Coinbase.String(),
Canonical: true,
}) })
return headerID return headerID
} }

View File

@ -140,8 +140,8 @@ const (
ipldInsert = "INSERT INTO ipld.blocks (block_number, key, data) VALUES ('%s', '%s', '\\x%x');\n" ipldInsert = "INSERT INTO ipld.blocks (block_number, key, data) VALUES ('%s', '%s', '\\x%x');\n"
headerInsert = "INSERT INTO eth.header_cids (block_number, block_hash, parent_hash, cid, td, node_ids, reward, " + headerInsert = "INSERT INTO eth.header_cids (block_number, block_hash, parent_hash, cid, td, node_ids, reward, " +
"state_root, tx_root, receipt_root, uncles_hash, bloom, timestamp, coinbase) VALUES " + "state_root, tx_root, receipt_root, uncles_hash, bloom, timestamp, coinbase, canonical) VALUES " +
"('%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '\\x%x', %d, '%s');\n" "('%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '\\x%x', %d, '%s', %t);\n"
uncleInsert = "INSERT INTO eth.uncle_cids (block_number, block_hash, header_id, parent_hash, cid, reward, index) VALUES " + uncleInsert = "INSERT INTO eth.uncle_cids (block_number, block_hash, header_id, parent_hash, cid, reward, index) VALUES " +
"('%s', '%s', '%s', '%s', '%s', '%s', %d);\n" "('%s', '%s', '%s', '%s', '%s', '%s', %d);\n"
@ -189,7 +189,7 @@ func (sqw *SQLWriter) upsertIPLDNode(blockNumber string, i ipld.IPLD) {
func (sqw *SQLWriter) upsertHeaderCID(header models.HeaderModel) { func (sqw *SQLWriter) upsertHeaderCID(header models.HeaderModel) {
stmt := fmt.Sprintf(headerInsert, header.BlockNumber, header.BlockHash, header.ParentHash, header.CID, stmt := fmt.Sprintf(headerInsert, header.BlockNumber, header.BlockHash, header.ParentHash, header.CID,
header.TotalDifficulty, formatPostgresStringArray(header.NodeIDs), header.Reward, header.StateRoot, header.TxRoot, header.TotalDifficulty, formatPostgresStringArray(header.NodeIDs), header.Reward, header.StateRoot, header.TxRoot,
header.RctRoot, header.UnclesHash, header.Bloom, header.Timestamp, header.Coinbase) header.RctRoot, header.UnclesHash, header.Bloom, header.Timestamp, header.Coinbase, header.Canonical)
sqw.stmts <- []byte(stmt) sqw.stmts <- []byte(stmt)
metrics.IndexerMetrics.BlocksCounter.Inc(1) metrics.IndexerMetrics.BlocksCounter.Inc(1)
} }

View File

@ -257,6 +257,7 @@ func (sdi *StateDiffIndexer) processHeader(tx *BatchTx, header *types.Header, he
UnclesHash: header.UncleHash.String(), UnclesHash: header.UncleHash.String(),
Timestamp: header.Time, Timestamp: header.Time,
Coinbase: header.Coinbase.String(), Coinbase: header.Coinbase.String(),
Canonical: true,
}) })
} }

View File

@ -49,6 +49,7 @@ type Statements interface {
MaxHeaderStm() string MaxHeaderStm() string
ExistsHeaderStm() string ExistsHeaderStm() string
InsertHeaderStm() string InsertHeaderStm() string
SetCanonicalHeaderStm() string
InsertUncleStm() string InsertUncleStm() string
InsertTxStm() string InsertTxStm() string
InsertRctStm() string InsertRctStm() string

View File

@ -62,6 +62,12 @@ func (db *DB) InsertHeaderStm() string {
return schema.TableHeader.ToInsertStatement(db.upsert) return schema.TableHeader.ToInsertStatement(db.upsert)
} }
// SetCanonicalHeaderStm satisfies the sql.Statements interface
// Stm == Statement
func (db *DB) SetCanonicalHeaderStm() string {
return fmt.Sprintf("UPDATE %s SET canonical = false WHERE block_number = $1::BIGINT AND block_hash <> $2::TEXT AND canonical = true", schema.TableHeader.Name)
}
// InsertUncleStm satisfies the sql.Statements interface // InsertUncleStm satisfies the sql.Statements interface
func (db *DB) InsertUncleStm() string { func (db *DB) InsertUncleStm() string {
return schema.TableUncle.ToInsertStatement(db.upsert) return schema.TableUncle.ToInsertStatement(db.upsert)

View File

@ -90,6 +90,7 @@ func (w *Writer) maxHeader() (*models.HeaderModel, error) {
&model.Bloom, &model.Bloom,
&model.Timestamp, &model.Timestamp,
&model.Coinbase, &model.Coinbase,
&model.Canonical,
) )
model.BlockNumber = strconv.FormatUint(number, 10) model.BlockNumber = strconv.FormatUint(number, 10)
model.TotalDifficulty = strconv.FormatUint(td, 10) model.TotalDifficulty = strconv.FormatUint(td, 10)
@ -118,11 +119,22 @@ func (w *Writer) upsertHeaderCID(tx Tx, header models.HeaderModel) error {
header.UnclesHash, header.UnclesHash,
header.Bloom, header.Bloom,
header.Timestamp, header.Timestamp,
header.Coinbase) header.Coinbase,
header.Canonical,
)
if err != nil { if err != nil {
return insertError{"eth.header_cids", err, w.db.InsertHeaderStm(), header} return insertError{"eth.header_cids", err, w.db.InsertHeaderStm(), header}
} }
metrics.IndexerMetrics.BlocksCounter.Inc(1) metrics.IndexerMetrics.BlocksCounter.Inc(1)
_, err = tx.Exec(w.db.Context(), w.db.SetCanonicalHeaderStm(),
header.BlockNumber,
header.BlockHash,
)
if err != nil {
return insertError{"eth.header_cids", err, w.db.SetCanonicalHeaderStm(), header}
}
return nil return nil
} }

View File

@ -41,6 +41,7 @@ type HeaderModel struct {
Bloom []byte `db:"bloom"` Bloom []byte `db:"bloom"`
Timestamp uint64 `db:"timestamp"` Timestamp uint64 `db:"timestamp"`
Coinbase string `db:"coinbase"` Coinbase string `db:"coinbase"`
Canonical bool `db:"canonical"`
} }
// UncleModel is the db model for eth.uncle_cids // UncleModel is the db model for eth.uncle_cids

View File

@ -54,6 +54,7 @@ var TableHeader = Table{
{Name: "bloom", Type: Dbytea}, {Name: "bloom", Type: Dbytea},
{Name: "timestamp", Type: Dnumeric}, {Name: "timestamp", Type: Dnumeric},
{Name: "coinbase", Type: Dvarchar}, {Name: "coinbase", Type: Dvarchar},
{Name: "canonical", Type: Dboolean},
}, },
UpsertClause: OnConflict("block_number", "block_hash").Set( UpsertClause: OnConflict("block_number", "block_hash").Set(
"parent_hash", "parent_hash",
@ -68,6 +69,7 @@ var TableHeader = Table{
"bloom", "bloom",
"timestamp", "timestamp",
"coinbase", "coinbase",
"canonical",
)} )}
var TableStateNode = Table{ var TableStateNode = Table{

View File

@ -56,9 +56,9 @@ func init() {
"statediff.backfillcheckpastblocks", 7200, "statediff.backfillcheckpastblocks", 7200,
"Number of blocks behind the startup statediff position to check (and fill) for gaps when head tracking", "Number of blocks behind the startup statediff position to check (and fill) for gaps when head tracking",
) )
Flags.Uint64Var(&config.BackfillMaxHeadGap, Flags.Uint64Var(&config.BackfillMaxDepth,
"statediff.backfillmaxheadgap", 7200, "statediff.backfillmaxdepth", 7200,
"Maximum gap between the startup statediff and startup head positions that can be backfilled", "When statediffing head, the maximum number of missing parents that can be backfilled",
) )
Flags.Var(&dbType, Flags.Var(&dbType,

View File

@ -83,7 +83,7 @@ type Service struct {
// Parameters to use in the service write loop, if enabled // Parameters to use in the service write loop, if enabled
writeLoopParams ParamsWithMutex writeLoopParams ParamsWithMutex
// Settings to use for backfilling state diffs (plugging gaps when tracking head) // Settings to use for backfilling state diffs (plugging gaps when tracking head)
backfillMaxHeadGap uint64 backfillMaxDepth uint64
backfillCheckPastBlocks uint64 backfillCheckPastBlocks uint64
// Size of the worker pool // Size of the worker pool
numWorkers uint numWorkers uint
@ -133,6 +133,12 @@ type jobStatusSubscription struct {
quitChan chan<- bool quitChan chan<- bool
} }
// Utility type for showing the relative positions of the blockchain and the statediff indexer.
type servicePosition struct {
chainBlockNumber uint64
indexerBlockNumber uint64
}
// BlockCache caches the last block for safe access from different service loops // BlockCache caches the last block for safe access from different service loops
type BlockCache struct { type BlockCache struct {
sync.Mutex sync.Mutex
@ -141,9 +147,9 @@ type BlockCache struct {
} }
type workerParams struct { type workerParams struct {
chainEventCh <-chan core.ChainEvent blockCh <-chan *types.Block
wg *sync.WaitGroup wg *sync.WaitGroup
id uint id uint
} }
func NewBlockCache(max uint) BlockCache { func NewBlockCache(max uint) BlockCache {
@ -172,7 +178,7 @@ func NewService(cfg Config, blockChain BlockChain, backend plugeth.Backend, inde
ShouldWaitForSync: cfg.WaitForSync, ShouldWaitForSync: cfg.WaitForSync,
indexer: indexer, indexer: indexer,
enableWriteLoop: cfg.EnableWriteLoop, enableWriteLoop: cfg.EnableWriteLoop,
backfillMaxHeadGap: cfg.BackfillMaxHeadGap, backfillMaxDepth: cfg.BackfillMaxDepth,
backfillCheckPastBlocks: cfg.BackfillCheckPastBlocks, backfillCheckPastBlocks: cfg.BackfillCheckPastBlocks,
numWorkers: workers, numWorkers: workers,
maxRetry: defaultRetryLimit, maxRetry: defaultRetryLimit,
@ -213,16 +219,21 @@ func (lbc *BlockCache) getParentBlock(currentBlock *types.Block, bc BlockChain)
// WriteLoop event loop for progressively processing and writing diffs directly to DB // WriteLoop event loop for progressively processing and writing diffs directly to DB
func (sds *Service) WriteLoop(chainEventCh chan core.ChainEvent) { func (sds *Service) WriteLoop(chainEventCh chan core.ChainEvent) {
log.Info("Starting statediff write loop") initialPos := sds.currentPosition()
log.Info(
"WriteLoop: initial positions",
"chain", initialPos.chainBlockNumber,
"indexer", initialPos.indexerBlockNumber,
)
log := log.New("context", "statediff writing") log := log.New("context", "statediff writing")
sub := sds.BlockChain.SubscribeChainEvent(chainEventCh) sub := sds.BlockChain.SubscribeChainEvent(chainEventCh)
defer sub.Unsubscribe() defer sub.Unsubscribe()
var wg sync.WaitGroup var wg sync.WaitGroup
chainEventFwd := make(chan core.ChainEvent, chainEventChanSize) blockFwd := make(chan *types.Block, chainEventChanSize)
defer func() { defer func() {
log.Info("Quitting") log.Info("Quitting")
close(chainEventFwd) close(blockFwd)
}() }()
wg.Add(1) wg.Add(1)
@ -232,16 +243,30 @@ func (sds *Service) WriteLoop(chainEventCh chan core.ChainEvent) {
select { select {
case event := <-chainEventCh: case event := <-chainEventCh:
// First process metrics for chain events, then forward to workers // First process metrics for chain events, then forward to workers
lastHeight := defaultStatediffMetrics.lastEventHeight.Value() lastHeight := uint64(defaultStatediffMetrics.lastEventHeight.Value())
if lastHeight == 0 {
lastHeight = initialPos.indexerBlockNumber
}
block := event.Block block := event.Block
log.Debug("Chain event received", "number", block.Number(), "hash", event.Hash) log.Debug("Chain event received", "number", block.Number(), "hash", event.Hash)
nextHeight := int64(block.Number().Uint64()) nextHeight := block.Number().Uint64()
if nextHeight-lastHeight != 1 { if nextHeight > lastHeight {
log.Warn("Received block out-of-order", "next", nextHeight, "last", lastHeight) distance := nextHeight - lastHeight
if distance == 1 {
log.Info("WriteLoop: received expected block",
"block number", nextHeight, "last number", lastHeight)
} else {
log.Warn("WriteLoop: received unexpected block from the future",
"block number", nextHeight, "last number", lastHeight)
}
blockFwd <- block
defaultStatediffMetrics.lastEventHeight.Update(int64(nextHeight))
} else {
log.Warn("WriteLoop: received unexpected block from the past",
"block number", nextHeight, "last number", lastHeight)
blockFwd <- block
} }
defaultStatediffMetrics.lastEventHeight.Update(nextHeight)
defaultStatediffMetrics.writeLoopChannelLen.Update(int64(len(chainEventCh))) defaultStatediffMetrics.writeLoopChannelLen.Update(int64(len(chainEventCh)))
chainEventFwd <- event
case err := <-sub.Err(): case err := <-sub.Err():
if err != nil { if err != nil {
log.Error("Error from subscription", "error", err) log.Error("Error from subscription", "error", err)
@ -255,7 +280,7 @@ func (sds *Service) WriteLoop(chainEventCh chan core.ChainEvent) {
}() }()
wg.Add(int(sds.numWorkers)) wg.Add(int(sds.numWorkers))
for worker := uint(0); worker < sds.numWorkers; worker++ { for worker := uint(0); worker < sds.numWorkers; worker++ {
params := workerParams{chainEventCh: chainEventFwd, wg: &wg, id: worker} params := workerParams{blockCh: blockFwd, wg: &wg, id: worker}
go sds.writeLoopWorker(params) go sds.writeLoopWorker(params)
} }
wg.Wait() wg.Wait()
@ -264,10 +289,8 @@ func (sds *Service) WriteLoop(chainEventCh chan core.ChainEvent) {
func (sds *Service) writeGenesisStateDiff(currBlock *types.Block, logger log.Logger) { func (sds *Service) writeGenesisStateDiff(currBlock *types.Block, logger log.Logger) {
// For genesis block we need to return the entire state trie hence we diff it with an empty trie. // For genesis block we need to return the entire state trie hence we diff it with an empty trie.
log.Info("Writing genesis state diff", "number", genesisBlockNumber) log.Info("Writing genesis state diff", "number", genesisBlockNumber)
sds.writeLoopParams.RLock()
defer sds.writeLoopParams.RUnlock()
err := sds.writeStateDiffWithRetry(currBlock, common.Hash{}, sds.writeLoopParams.Params) err := sds.writeStateDiffWithRetry(currBlock, common.Hash{}, sds.writeLoopParams.CopyParams())
if err != nil { if err != nil {
log.Error("failed to write state diff", "number", log.Error("failed to write state diff", "number",
genesisBlockNumber, "error", err) genesisBlockNumber, "error", err)
@ -279,27 +302,71 @@ func (sds *Service) writeGenesisStateDiff(currBlock *types.Block, logger log.Log
func (sds *Service) writeLoopWorker(params workerParams) { func (sds *Service) writeLoopWorker(params workerParams) {
log := log.New("context", "statediff writing", "worker", params.id) log := log.New("context", "statediff writing", "worker", params.id)
defer params.wg.Done() defer params.wg.Done()
// statediffs the indicated block, and while maxBackfill > 0, backfills missing parent blocks.
var writeBlockWithParents func(*types.Block, uint64, Params) error
writeBlockWithParents = func(block *types.Block, maxBackfill uint64, writeParams Params) error {
parentBlock := sds.BlockCache.getParentBlock(block, sds.BlockChain)
if parentBlock == nil {
return errors.New("Parent block is nil, skipping this block")
}
parentIsGenesis := parentBlock.Number().Uint64() == genesisBlockNumber
// chainEvent streams block from block 1, but we also need to include data from the genesis block.
if parentIsGenesis {
sds.writeGenesisStateDiff(parentBlock, log)
}
log.Info("Writing state diff", "number", block.Number())
err := sds.writeStateDiffWithRetry(block, parentBlock.Root(), writeParams)
if err != nil {
return err
}
if parentIsGenesis {
return nil
}
// We do this _after_ indexing the requested block. This makes sure that if a child of ours arrives for
// statediffing while we are still working on missing ancestors, its regress stops at us, and only we
// continue working backward.
parentIndexed, err := sds.indexedOrInProgress(parentBlock)
if err != nil {
log.Error("Error checking for indexing status of parent block.",
"number", block.Number(), "hash", block.Hash(),
"parent number", parentBlock.Number(), "parent hash", parentBlock.Hash(),
"error", err)
return err
}
if parentIndexed {
return nil
}
if maxBackfill > 0 {
log.Info("Parent block not indexed. Indexing now.",
"number", block.Number(), "hash", block.Hash(),
"parent number", parentBlock.Number(), "parent hash", parentBlock.Hash())
err = writeBlockWithParents(parentBlock, maxBackfill-1, writeParams)
if err != nil {
log.Error("Error indexing parent block.",
"number", block.Number(), "hash", block.Hash(),
"parent number", parentBlock.Number(), "parent hash", parentBlock.Hash(),
"error", err)
}
} else {
log.Error("Parent block not indexed but max backfill depth exceeded. Index MUST be corrected manually.",
"number", block.Number(), "hash", block.Hash(),
"parent number", parentBlock.Number(), "parent hash", parentBlock.Hash())
}
return nil
}
for { for {
select { select {
case event := <-params.chainEventCh: case block := <-params.blockCh:
block := event.Block log.Debug("Block received", "number", block.Number())
parent := sds.BlockCache.getParentBlock(block, sds.BlockChain) err := writeBlockWithParents(block, sds.backfillMaxDepth, sds.writeLoopParams.CopyParams())
if parent == nil {
log.Error("Parent block is nil, skipping this block", "number", block.Number())
continue
}
// chainEvent streams block from block 1, but we also need to include data from the genesis block.
if parent.Number().Uint64() == genesisBlockNumber {
sds.writeGenesisStateDiff(parent, log)
}
log.Info("Writing state diff", "number", block.Number())
sds.writeLoopParams.RLock()
err := sds.writeStateDiffWithRetry(block, parent.Root(), sds.writeLoopParams.Params)
sds.writeLoopParams.RUnlock()
if err != nil { if err != nil {
log.Error("failed to write state diff", log.Error("Error processing statediff",
"number", block.Number(), "number", block.Number(),
"hash", block.Hash(), "hash", block.Hash(),
"error", err) "error", err)
@ -681,6 +748,23 @@ func (sds *Service) WriteStateDiffFor(blockHash common.Hash, params Params) erro
return sds.writeStateDiffWithRetry(currentBlock, parentRoot, sds.maybeReplaceWatchedAddresses(params)) return sds.writeStateDiffWithRetry(currentBlock, parentRoot, sds.maybeReplaceWatchedAddresses(params))
} }
// indexedOrInProgress returns true if the block has already been statediffed or is in progress, else false.
func (sds *Service) indexedOrInProgress(block *types.Block) (bool, error) {
if sds.statediffInProgress(block) {
return true, nil
}
return sds.indexer.HasBlock(block.Hash(), block.NumberU64())
}
// statediffInProgress returns true if statediffing is currently in progress for the block, else false.
func (sds *Service) statediffInProgress(block *types.Block) bool {
sds.currentBlocksMutex.Lock()
defer sds.currentBlocksMutex.Unlock()
key := fmt.Sprintf("%s,%d", block.Hash().Hex(), block.NumberU64())
return sds.currentBlocks[key]
}
// Claim exclusive access for state diffing the specified block. // Claim exclusive access for state diffing the specified block.
// Returns true and a function to release access if successful, else false, nil. // Returns true and a function to release access if successful, else false, nil.
func (sds *Service) claimExclusiveAccess(block *types.Block) (bool, func()) { func (sds *Service) claimExclusiveAccess(block *types.Block) (bool, func()) {
@ -928,110 +1012,43 @@ func MapWatchAddressArgsToAddresses(args []types2.WatchAddressArg) ([]common.Add
// Backfill is executed on startup to make sure there are no gaps in the recent past when tracking head. // Backfill is executed on startup to make sure there are no gaps in the recent past when tracking head.
func (sds *Service) Backfill() { func (sds *Service) Backfill() {
chainBlock := sds.BlockChain.CurrentBlock() pos := sds.currentPosition()
if nil == chainBlock { if pos.chainBlockNumber == 0 {
log.Info("Backfill: No previous chain block, nothing to backfill.")
return
}
chainBlockNumber := chainBlock.Number.Uint64()
if chainBlockNumber == 0 {
log.Info("Backfill: At start of chain, nothing to backfill.") log.Info("Backfill: At start of chain, nothing to backfill.")
return return
} }
indexerBlock, err := sds.indexer.CurrentBlock()
if nil == indexerBlock {
log.Info("Backfill: No previous indexer block, nothing to backfill.")
return
}
if nil != err {
log.Error("Backfill error", err)
return
}
indexerBlockNumber, err := strconv.ParseUint(indexerBlock.BlockNumber, 10, 64)
if nil != err {
log.Error("Backfill error", err)
return
}
headGap := chainBlockNumber - indexerBlockNumber
log.Info( log.Info(
"Backfill: initial positions", "Backfill: initial positions",
"chain", chainBlockNumber, "chain", pos.chainBlockNumber,
"indexer", indexerBlockNumber, "indexer", pos.indexerBlockNumber,
"headGap", headGap,
) )
if sds.backfillMaxHeadGap > 0 && headGap > 0 {
if headGap < sds.backfillMaxHeadGap {
sds.backfillHeadGap(indexerBlockNumber, chainBlockNumber)
log.Info("Backfill: all workers done filling headGap.")
} else {
log.Error("Backfill: headGap too large to fill.")
}
}
if sds.backfillCheckPastBlocks > 0 { if sds.backfillCheckPastBlocks > 0 {
var gapCheckBeginNumber uint64 = 0 var gapCheckBeginNumber uint64 = 0
if indexerBlockNumber > sds.backfillCheckPastBlocks { if pos.indexerBlockNumber > sds.backfillCheckPastBlocks {
gapCheckBeginNumber = indexerBlockNumber - sds.backfillCheckPastBlocks gapCheckBeginNumber = pos.indexerBlockNumber - sds.backfillCheckPastBlocks
} }
blockGaps, err := sds.indexer.DetectGaps(gapCheckBeginNumber, chainBlockNumber) blockGaps, err := sds.indexer.DetectGaps(gapCheckBeginNumber, pos.chainBlockNumber)
if nil != err { if nil != err {
log.Error("Backfill error", err) log.Error("Backfill error", "error", err)
return return
} }
if nil != blockGaps && len(blockGaps) > 0 { if nil != blockGaps && len(blockGaps) > 0 {
gapsMsg, _ := json.Marshal(blockGaps) gapsMsg, _ := json.Marshal(blockGaps)
log.Info("Backfill: detected gaps in range", "begin", gapCheckBeginNumber, "end", chainBlockNumber, "gaps", string(gapsMsg)) log.Info("Backfill: detected gaps in range",
"begin", gapCheckBeginNumber, "end", pos.chainBlockNumber, "gaps", string(gapsMsg))
sds.backfillDetectedGaps(blockGaps) sds.backfillDetectedGaps(blockGaps)
log.Info("Backfill: done processing detected gaps in range", "begin", gapCheckBeginNumber, "end", chainBlockNumber, "gaps", string(gapsMsg)) log.Info("Backfill: done processing detected gaps in range",
"begin", gapCheckBeginNumber, "end", pos.chainBlockNumber, "gaps", string(gapsMsg))
} else { } else {
log.Info("Backfill: no gaps detected in range", "begin", gapCheckBeginNumber, "end", chainBlockNumber) log.Info("Backfill: no gaps detected in range",
"begin", gapCheckBeginNumber, "end", pos.chainBlockNumber)
} }
} }
} }
// backfillHeadGap fills in any gap between the statediff position and the chain position at startup.
// A gap can be created in this way if there is some problem in statediffing (eg, DB connectivity is lost,
// while the chain keeps syncing), if the process is terminated with a statediff in-flight, etc.
func (sds *Service) backfillHeadGap(indexerBlockNumber uint64, chainBlockNumber uint64) {
var ch = make(chan uint64)
var wg sync.WaitGroup
for i := uint(0); i < sds.numWorkers; i++ {
wg.Add(1)
go func(w uint) {
defer wg.Done()
for {
select {
case num, ok := <-ch:
if !ok {
log.Info("Backfill: headGap done", "worker", w)
return
}
log.Info("Backfill: backfilling head gap", "block", num, "worker", w)
err := sds.writeStateDiffAt(num, sds.writeLoopParams.Params)
if err != nil {
log.Error("Backfill error", err)
}
case <-sds.QuitChan:
log.Info("Backfill: quitting before finish", "worker", w)
return
}
}
}(i)
}
for bn := indexerBlockNumber + 1; bn <= chainBlockNumber; bn++ {
ch <- bn
}
close(ch)
wg.Wait()
}
// backfillDetectedGaps fills gaps which have occurred in the recent past. These gaps can happen because of // backfillDetectedGaps fills gaps which have occurred in the recent past. These gaps can happen because of
// transient errors, such as DB errors that are later corrected (so head statediffing continues, but with a hole) // transient errors, such as DB errors that are later corrected (so head statediffing continues, but with a hole)
// a missed ChainEvent (happens sometimes when debugging), or if the process is terminated when an earlier block // a missed ChainEvent (happens sometimes when debugging), or if the process is terminated when an earlier block
@ -1051,7 +1068,7 @@ func (sds *Service) backfillDetectedGaps(blockGaps []*interfaces.BlockGap) {
return return
} }
log.Info("Backfill: backfilling detected gap", "block", num, "worker", w) log.Info("Backfill: backfilling detected gap", "block", num, "worker", w)
err := sds.writeStateDiffAt(num, sds.writeLoopParams.Params) err := sds.writeStateDiffAt(num, sds.writeLoopParams.CopyParams())
if err != nil { if err != nil {
log.Error("Backfill error: ", err) log.Error("Backfill error: ", err)
} }
@ -1071,3 +1088,24 @@ func (sds *Service) backfillDetectedGaps(blockGaps []*interfaces.BlockGap) {
close(ch) close(ch)
wg.Wait() wg.Wait()
} }
// currentPosition returns the current block height for both the BlockChain and the statediff indexer.
func (sds *Service) currentPosition() servicePosition {
ret := servicePosition{}
chainBlock := sds.BlockChain.CurrentBlock()
if nil != chainBlock {
ret.chainBlockNumber = chainBlock.Number.Uint64()
}
indexerBlock, _ := sds.indexer.CurrentBlock()
if nil != indexerBlock {
indexerBlockNumber, err := strconv.ParseUint(indexerBlock.BlockNumber, 10, 64)
if nil == err {
ret.indexerBlockNumber = indexerBlockNumber
} else {
log.Error("Error parsing indexer block number", "block", indexerBlock.BlockNumber)
}
}
return ret
}