398: Statediff missing parent blocks automatically. #399

Merged
telackey merged 8 commits from telackey/398 into v1.11.6-statediff-v5 2023-07-18 19:41:20 +00:00
17 changed files with 159 additions and 43 deletions

View File

@@ -5,7 +5,7 @@ on:
 env:
   stack-orchestrator-ref: ${{ github.event.inputs.stack-orchestrator-ref || 'e62830c982d4dfc5f3c1c2b12c1754a7e9b538f1'}}
-  ipld-eth-db-ref: ${{ github.event.inputs.ipld-eth-db-ref || '66cd1d9e696cfa72f9d272927f9e945905c9f093' }}
+  ipld-eth-db-ref: ${{ github.event.inputs.ipld-eth-db-ref || '1b922dbff350bfe2a9aec5fe82079e9d855ea7ed' }}
   GOPATH: /tmp/go

 jobs:
@@ -53,6 +53,17 @@ jobs:
       - name: Checkout code
         uses: actions/checkout@v2
+      - uses: actions/checkout@v3
+        with:
+          ref: ${{ env.ipld-eth-db-ref }}
+          repository: cerc-io/ipld-eth-db
+          path: "./ipld-eth-db/"
+          fetch-depth: 0
+      - name: Build ipld-eth-db
+        run: |
+          docker build -f ./ipld-eth-db/Dockerfile ./ipld-eth-db/ -t cerc/ipld-eth-db:local
       - name: Run docker compose
         run: |
           docker-compose up -d

View File

@@ -281,7 +281,7 @@ func makeFullNode(ctx *cli.Context) (*node.Node, ethapi.Backend) {
             NumWorkers:              ctx.Uint(utils.StateDiffWorkersFlag.Name),
             WaitForSync:             ctx.Bool(utils.StateDiffWaitForSync.Name),
             BackfillCheckPastBlocks: ctx.Uint64(utils.StateDiffBackfillCheckPastBlocks.Name),
-            BackfillMaxHeadGap:      ctx.Uint64(utils.StateDiffBackfillMaxHeadGap.Name),
+            BackfillMaxDepth:        ctx.Uint64(utils.StateDiffBackfillMaxDepth.Name),
         }
         utils.RegisterStateDiffService(stack, eth, &cfg.Eth, p, backend)
     }

View File

@@ -177,7 +177,7 @@ var (
         utils.StateDiffLogStatements,
         utils.StateDiffCopyFrom,
         utils.StateDiffBackfillCheckPastBlocks,
-        utils.StateDiffBackfillMaxHeadGap,
+        utils.StateDiffBackfillMaxDepth,
         configFileFlag,
     }, utils.NetworkFlags, utils.DatabasePathFlags)

View File

@@ -1125,9 +1125,9 @@ Please note that --` + MetricsHTTPFlag.Name + ` must be set to start the server.
         Usage: "The number of blocks behind the startup statediff position to check (and fill) for gaps when head tracking.",
         Value: 7200,
     }
-    StateDiffBackfillMaxHeadGap = &cli.Uint64Flag{
-        Name:  "statediff.backfillmaxheadgap",
-        Usage: "The maximum gap between the current statediff and head positions that can be backfilled.",
+    StateDiffBackfillMaxDepth = &cli.Uint64Flag{
+        Name:  "statediff.backfillmaxdepth",
+        Usage: "When statediffing head, the maximum number of missing parents that can be backfilled.",
         Value: 7200,
     }
 )

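Note: the rename reflects a change in semantics, not just naming. The old backfillmaxheadgap bounded a forward fill from the last statediffed height up to a new head; backfillmaxdepth instead bounds how many missing parents the write loop will walk backward through (see the service changes below). A minimal sketch of how the two knobs divide the work, using the Config fields from this PR (assuming the fork keeps go-ethereum's module path):

package main

import "github.com/ethereum/go-ethereum/statediff"

func main() {
	cfg := statediff.Config{
		EnableWriteLoop: true,
		// Startup: scan this many blocks behind the resume position for gaps.
		BackfillCheckPastBlocks: 7200,
		// Head tracking: walk back through at most this many missing parents.
		BackfillMaxDepth: 7200,
	}
	_ = cfg
}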
View File

@@ -5,7 +5,7 @@ services:
     restart: on-failure
     depends_on:
       - ipld-eth-db
-    image: git.vdb.to/cerc-io/ipld-eth-db/ipld-eth-db:v5.0.2-alpha
+    image: cerc/ipld-eth-db:local
     environment:
       DATABASE_USER: "vdbm"
       DATABASE_NAME: "cerc_testing"

View File

@@ -37,7 +37,7 @@ type Config struct {
     // Whether to enable writing state diffs directly to track blockchain head
     EnableWriteLoop bool
     // The maximum number of blocks to backfill when tracking head.
-    BackfillMaxHeadGap uint64
+    BackfillMaxDepth uint64
     // The maximum number of blocks behind the startup position to check for gaps.
     BackfillCheckPastBlocks uint64
     // Size of the worker pool
@@ -66,12 +66,33 @@ func (p *Params) ComputeWatchedAddressesLeafPaths() {
     }
 }

+func (p *Params) Copy() Params {
+    ret := Params{
+        IncludeBlock:    p.IncludeBlock,
+        IncludeReceipts: p.IncludeReceipts,
+        IncludeTD:       p.IncludeTD,
+        IncludeCode:     p.IncludeCode,
+    }
+    ret.WatchedAddresses = make([]common.Address, len(p.WatchedAddresses))
+    copy(ret.WatchedAddresses, p.WatchedAddresses)
+    return ret
+}
+
 // ParamsWithMutex allows to lock the parameters while they are being updated | read from
 type ParamsWithMutex struct {
     Params
     sync.RWMutex
 }

+// CopyParams returns a defensive copy of the Params
+func (p *ParamsWithMutex) CopyParams() Params {
+    p.RLock()
+    defer p.RUnlock()
+    return p.Params.Copy()
+}
+
 // Args bundles the arguments for the state diff builder
 type Args struct {
     OldStateRoot, NewStateRoot, BlockHash common.Hash

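Note: Copy duplicates the WatchedAddresses slice because a plain struct copy would still share the slice's backing array, so a later update could race with a worker reading its snapshot. CopyParams takes that copy under a short read lock, replacing the old pattern of holding writeLoopParams.RLock() across an entire statediff write. A self-contained sketch with simplified stand-in types:

package main

import (
	"fmt"
	"sync"
)

// Simplified stand-ins for statediff.Params / ParamsWithMutex.
type Params struct {
	WatchedAddresses []string
}

type ParamsWithMutex struct {
	Params
	sync.RWMutex
}

// CopyParams mirrors the PR: snapshot under a short read lock, duplicating
// the slice so the snapshot cannot race with later updates.
func (p *ParamsWithMutex) CopyParams() Params {
	p.RLock()
	defer p.RUnlock()
	out := Params{WatchedAddresses: make([]string, len(p.WatchedAddresses))}
	copy(out.WatchedAddresses, p.WatchedAddresses)
	return out
}

func main() {
	live := ParamsWithMutex{Params: Params{WatchedAddresses: []string{"0xabc"}}}
	snap := live.CopyParams()

	live.Lock()
	live.WatchedAddresses[0] = "0xdef" // concurrent update elsewhere
	live.Unlock()

	fmt.Println(snap.WatchedAddresses[0]) // still "0xabc"
}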
View File

@@ -186,6 +186,7 @@ func (sdi *StateDiffIndexer) processHeader(tx *BatchTx, header *types.Header, he
         UnclesHash:      header.UncleHash.String(),
         Timestamp:       header.Time,
         Coinbase:        header.Coinbase.String(),
+        Canonical:       true,
     }
     _, err := fmt.Fprintf(sdi.dump, "%+v\r\n", mod)
     return headerID, err

View File

@@ -234,7 +234,8 @@ func (csw *CSVWriter) upsertHeaderCID(header models.HeaderModel) {
     var values []interface{}
     values = append(values, header.BlockNumber, header.BlockHash, header.ParentHash, header.CID,
         header.TotalDifficulty, header.NodeIDs, header.Reward, header.StateRoot, header.TxRoot,
-        header.RctRoot, header.UnclesHash, header.Bloom, strconv.FormatUint(header.Timestamp, 10), header.Coinbase)
+        header.RctRoot, header.UnclesHash, header.Bloom, strconv.FormatUint(header.Timestamp, 10), header.Coinbase,
+        header.Canonical)
     csw.rows <- tableRow{schema.TableHeader, values}
     metrics.IndexerMetrics.BlocksCounter.Inc(1)
 }

View File

@@ -246,6 +246,7 @@ func (sdi *StateDiffIndexer) processHeader(header *types.Header, headerNode ipld
         UnclesHash:      header.UncleHash.String(),
         Timestamp:       header.Time,
         Coinbase:        header.Coinbase.String(),
+        Canonical:       true,
     })
     return headerID
 }

View File

@@ -140,8 +140,8 @@ const (
     ipldInsert = "INSERT INTO ipld.blocks (block_number, key, data) VALUES ('%s', '%s', '\\x%x');\n"

     headerInsert = "INSERT INTO eth.header_cids (block_number, block_hash, parent_hash, cid, td, node_ids, reward, " +
-        "state_root, tx_root, receipt_root, uncles_hash, bloom, timestamp, coinbase) VALUES " +
-        "('%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '\\x%x', %d, '%s');\n"
+        "state_root, tx_root, receipt_root, uncles_hash, bloom, timestamp, coinbase, canonical) VALUES " +
+        "('%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '\\x%x', %d, '%s', %t);\n"

     uncleInsert = "INSERT INTO eth.uncle_cids (block_number, block_hash, header_id, parent_hash, cid, reward, index) VALUES " +
         "('%s', '%s', '%s', '%s', '%s', '%s', %d);\n"
@@ -189,7 +189,7 @@ func (sqw *SQLWriter) upsertIPLDNode(blockNumber string, i ipld.IPLD) {
 func (sqw *SQLWriter) upsertHeaderCID(header models.HeaderModel) {
     stmt := fmt.Sprintf(headerInsert, header.BlockNumber, header.BlockHash, header.ParentHash, header.CID,
         header.TotalDifficulty, formatPostgresStringArray(header.NodeIDs), header.Reward, header.StateRoot, header.TxRoot,
-        header.RctRoot, header.UnclesHash, header.Bloom, header.Timestamp, header.Coinbase)
+        header.RctRoot, header.UnclesHash, header.Bloom, header.Timestamp, header.Coinbase, header.Canonical)
     sqw.stmts <- []byte(stmt)
     metrics.IndexerMetrics.BlocksCounter.Inc(1)
 }

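Note: the new %t verb renders the HeaderModel.Canonical bool as an unquoted SQL boolean literal, matching the new canonical column. A quick check of the format string's tail (illustrative values):

package main

import "fmt"

func main() {
	// Tail of headerInsert: timestamp (%d), coinbase ('%s'), canonical (%t).
	fmt.Printf("..., %d, '%s', %t);\n", 1689700000, "0xdead...beef", true)
	// Output: ..., 1689700000, '0xdead...beef', true);
}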
View File

@@ -260,6 +260,7 @@ func (sdi *StateDiffIndexer) processHeader(tx *BatchTx, header *types.Header, he
         UnclesHash:      header.UncleHash.String(),
         Timestamp:       header.Time,
         Coinbase:        header.Coinbase.String(),
+        Canonical:       true,
     })
 }

View File

@@ -49,6 +49,7 @@ type Statements interface {
     MaxHeaderStm() string
     ExistsHeaderStm() string
     InsertHeaderStm() string
+    SetCanonicalHeaderStm() string
     InsertUncleStm() string
     InsertTxStm() string
     InsertRctStm() string

View File

@@ -62,6 +62,12 @@ func (db *DB) InsertHeaderStm() string {
     return schema.TableHeader.ToInsertStatement(db.upsert)
 }

+// SetCanonicalHeaderStm satisfies the sql.Statements interface
+// Stm == Statement
+func (db *DB) SetCanonicalHeaderStm() string {
+    return fmt.Sprintf("UPDATE %s SET canonical = false WHERE block_number = $1::BIGINT AND block_hash <> $2::TEXT AND canonical = true", schema.TableHeader.Name)
+}
+
 // InsertUncleStm satisfies the sql.Statements interface
 func (db *DB) InsertUncleStm() string {
     return schema.TableUncle.ToInsertStatement(db.upsert)

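Note: the UPDATE skips the row just upserted (block_hash <> $2) and demotes every other header recorded at the same height, so the most recently indexed header for a height holds the canonical flag. A toy in-memory model of that rule (hypothetical types, for illustration only):

package main

import "fmt"

type header struct {
	number    uint64
	hash      string
	canonical bool
}

// setCanonical mirrors SetCanonicalHeaderStm: every header at the same height
// with a different hash loses its canonical flag.
func setCanonical(headers []header, number uint64, hash string) {
	for i := range headers {
		if headers[i].number == number && headers[i].hash != hash {
			headers[i].canonical = false
		}
	}
}

func main() {
	headers := []header{{100, "0xaaa", true}}
	// A competing header for height 100 arrives and is upserted as canonical.
	headers = append(headers, header{100, "0xbbb", true})
	setCanonical(headers, 100, "0xbbb")
	fmt.Println(headers) // [{100 0xaaa false} {100 0xbbb true}]
}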
View File

@@ -92,6 +92,7 @@ func (w *Writer) maxHeader() (*models.HeaderModel, error) {
         &model.Bloom,
         &model.Timestamp,
         &model.Coinbase,
+        &model.Canonical,
     )
     model.BlockNumber = strconv.FormatUint(number, 10)
     model.TotalDifficulty = strconv.FormatUint(td, 10)
@@ -120,11 +121,22 @@ func (w *Writer) upsertHeaderCID(tx Tx, header models.HeaderModel) error {
         header.UnclesHash,
         header.Bloom,
         header.Timestamp,
-        header.Coinbase)
+        header.Coinbase,
+        header.Canonical,
+    )
     if err != nil {
         return insertError{"eth.header_cids", err, w.db.InsertHeaderStm(), header}
     }
     metrics.IndexerMetrics.BlocksCounter.Inc(1)
+
+    _, err = tx.Exec(w.db.Context(), w.db.SetCanonicalHeaderStm(),
+        header.BlockNumber,
+        header.BlockHash,
+    )
+    if err != nil {
+        return insertError{"eth.header_cids", err, w.db.SetCanonicalHeaderStm(), header}
+    }
+
     return nil
 }

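Note: the insert writes the new header with canonical = true and the follow-up UPDATE demotes its siblings; because both run on the caller's Tx, a height never commits with two canonical headers. A hedged sketch of the shape of this write using database/sql (the PR's own Tx interface differs):

package writer

import "database/sql"

type headerRow struct {
	BlockNumber string
	BlockHash   string
	// ... remaining header_cids columns elided
}

// upsertHeader sketches the two-step write above: the insert records the new
// header with canonical = true, and the follow-up UPDATE demotes any sibling
// at the same height; running both on one Tx keeps the pair atomic.
func upsertHeader(tx *sql.Tx, insertStm, setCanonicalStm string, h headerRow) error {
	if _, err := tx.Exec(insertStm, h.BlockNumber, h.BlockHash /* plus the other columns */); err != nil {
		return err
	}
	_, err := tx.Exec(setCanonicalStm, h.BlockNumber, h.BlockHash)
	return err
}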
View File

@@ -41,6 +41,7 @@ type HeaderModel struct {
     Bloom     []byte `db:"bloom"`
     Timestamp uint64 `db:"timestamp"`
     Coinbase  string `db:"coinbase"`
+    Canonical bool   `db:"canonical"`
 }

 // UncleModel is the db model for eth.uncle_cids

View File

@@ -54,6 +54,7 @@ var TableHeader = Table{
         {Name: "bloom", Type: Dbytea},
         {Name: "timestamp", Type: Dnumeric},
         {Name: "coinbase", Type: Dvarchar},
+        {Name: "canonical", Type: Dboolean},
     },
     UpsertClause: OnConflict("block_number", "block_hash").Set(
         "parent_hash",
@@ -68,6 +69,7 @@ var TableHeader = Table{
         "bloom",
         "timestamp",
         "coinbase",
+        "canonical",
     )}

 var TableStateNode = Table{

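Note: the canonical column must also exist in the Postgres schema, which is presumably why the workflow above pins a newer ipld-eth-db ref and builds cerc/ipld-eth-db:local. The migration itself is not part of this diff; a hypothetical sketch of what it would contain:

// Hypothetical: the companion ipld-eth-db migration adding the column
// (not shown in this PR; the pinned ipld-eth-db ref presumably carries it).
const addCanonicalColumn = `ALTER TABLE eth.header_cids
    ADD COLUMN canonical boolean NOT NULL DEFAULT true;`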
View File

@@ -139,7 +139,7 @@ type Service struct {
     // Whether to enable writing state diffs directly to track blockchain head.
     enableWriteLoop bool
     // Settings to use for backfilling state diffs (plugging gaps when tracking head)
-    backfillMaxHeadGap      uint64
+    backfillMaxDepth        uint64
     backfillCheckPastBlocks uint64
     // Size of the worker pool
     numWorkers uint
@@ -232,7 +232,7 @@ func New(stack *node.Node, ethServ *eth.Ethereum, cfg *ethconfig.Config, params
         WaitForSync:             params.WaitForSync,
         indexer:                 indexer,
         enableWriteLoop:         params.EnableWriteLoop,
-        backfillMaxHeadGap:      params.BackfillMaxHeadGap,
+        backfillMaxDepth:        params.BackfillMaxDepth,
         backfillCheckPastBlocks: params.BackfillCheckPastBlocks,
         numWorkers:              workers,
         maxRetry:                defaultRetryLimit,
@@ -274,7 +274,7 @@ func NewService(blockChain blockChain, cfg Config, backend ethapi.Backend, index
         WaitForSync:             cfg.WaitForSync,
         indexer:                 indexer,
         enableWriteLoop:         cfg.EnableWriteLoop,
-        backfillMaxHeadGap:      cfg.BackfillMaxHeadGap,
+        backfillMaxDepth:        cfg.BackfillMaxDepth,
         backfillCheckPastBlocks: cfg.BackfillCheckPastBlocks,
         numWorkers:              workers,
         maxRetry:                defaultRetryLimit,
@@ -365,14 +365,6 @@ func (sds *Service) WriteLoop(chainEventCh chan core.ChainEvent) {
                 defaultStatediffMetrics.lastEventHeight.Update(int64(nextHeight))
             } else {
                 log.Warn("WriteLoop: received unexpected block from the future", "block height", nextHeight, "last height", lastHeight)
-                if distance <= sds.backfillMaxHeadGap {
-                    for i := lastHeight + 1; i < nextHeight; i++ {
-                        log.Info("WriteLoop: backfilling gap to head", "block", i, "block height", nextHeight, "last height", lastHeight)
-                        blockFwd <- sds.BlockChain.GetBlockByNumber(i)
-                    }
-                } else {
-                    log.Warn("WriteLoop: gap to head too large to backfill", "block height", nextHeight, "last height", lastHeight, "gap", distance)
-                }
                 blockFwd <- chainEvent.Block
                 defaultStatediffMetrics.lastEventHeight.Update(int64(nextHeight))
             }
@@ -409,9 +401,7 @@ func (sds *Service) WriteLoop(chainEventCh chan core.ChainEvent) {
 func (sds *Service) writeGenesisStateDiff(currBlock *types.Block, workerId uint) {
     // For genesis block we need to return the entire state trie hence we diff it with an empty trie.
     log.Info("Writing state diff", "block height", genesisBlockNumber, "worker", workerId)
-    writeLoopParams.RLock()
-    err := sds.writeStateDiffWithRetry(currBlock, common.Hash{}, writeLoopParams.Params)
-    writeLoopParams.RUnlock()
+    err := sds.writeStateDiffWithRetry(currBlock, common.Hash{}, writeLoopParams.CopyParams())
     if err != nil {
         log.Error("statediff.Service.WriteLoop: processing error", "block height",
             genesisBlockNumber, "error", err.Error(), "worker", workerId)
@@ -422,27 +412,78 @@ func (sds *Service) writeLoopWorker(params workerParams) {
     defer params.wg.Done()
+
+    // statediffs the indicated block, and while maxBackfill > 0, backfills missing parent blocks.
+    var writeBlockWithParents func(*types.Block, uint64, Params) error
+    writeBlockWithParents = func(block *types.Block, maxBackfill uint64, writeParams Params) error {
+        parentBlock := sds.BlockCache.getParentBlock(block, sds.BlockChain)
+        if parentBlock == nil {
+            log.Error("Parent block is nil, skipping this block", "block height", block.Number())
+            return nil
+        }
+
+        parentIsGenesis := parentBlock.Number().Uint64() == genesisBlockNumber
+
+        // chainEvent streams block from block 1, but we also need to include data from the genesis block.
+        if parentIsGenesis {
+            sds.writeGenesisStateDiff(parentBlock, params.id)
+        }
+
+        log.Info("Writing state diff", "block height", block.Number().Uint64(), "worker", params.id)
+        err := sds.writeStateDiffWithRetry(block, parentBlock.Root(), writeParams)
+        if err != nil {
+            log.Error("statediff.Service.WriteLoop: processing error",
+                "number", block.Number().Uint64(),
+                "hash", block.Hash().Hex(),
+                "error", err.Error(),
+                "worker", params.id)
+            return err
+        }
+
+        if !parentIsGenesis {
+            // We do this _after_ indexing the requested block. This makes sure that if a child of ours arrives for
+            // statediffing while we are still working on missing ancestors, its regress stops at us, and only we
+            // continue working backward.
+            parentIndexed, err := sds.indexedOrInProgress(parentBlock)
+            if err != nil {
+                log.Error("Error checking for indexing status of parent block.",
+                    "number", block.Number(), "hash", block.Hash().String(),
+                    "parent number", parentBlock.NumberU64(), "parent hash", parentBlock.Hash().Hex(),
+                    "error", err.Error(),
+                    "worker", params.id)
+            } else if !parentIndexed {
+                if maxBackfill > 0 {
+                    log.Info("Parent block not indexed. Indexing now.",
+                        "number", block.Number(), "hash", block.Hash().Hex(),
+                        "parent number", parentBlock.NumberU64(), "parent hash", parentBlock.Hash().Hex(),
+                        "worker", params.id)
+                    err = writeBlockWithParents(parentBlock, maxBackfill-1, writeParams)
+                    if err != nil {
+                        log.Error("Error indexing parent block.",
+                            "number", block.Number(), "hash", block.Hash().Hex(),
+                            "parent number", parentBlock.NumberU64(), "parent hash", parentBlock.Hash().Hex(),
+                            "error", err.Error(),
+                            "worker", params.id)
+                    }
+                } else {
+                    log.Error("ERROR: Parent block not indexed but max backfill depth exceeded. Index MUST be corrected manually.",
+                        "number", block.Number(), "hash", block.Hash().String(),
+                        "parent number", parentBlock.NumberU64(), "parent hash", parentBlock.Hash().String(),
+                        "worker", params.id)
+                }
+            }
+        }
+        return nil
+    }
+
     for {
         select {
         //Notify chain event channel of events
         case chainEvent := <-params.blockCh:
             log.Debug("WriteLoop(): chain event received", "event", chainEvent)
             currentBlock := chainEvent
-            parentBlock := sds.BlockCache.getParentBlock(currentBlock, sds.BlockChain)
-            if parentBlock == nil {
-                log.Error("Parent block is nil, skipping this block", "block height", currentBlock.Number())
-                continue
-            }
-            // chainEvent streams block from block 1, but we also need to include data from the genesis block.
-            if parentBlock.Number().Uint64() == genesisBlockNumber {
-                sds.writeGenesisStateDiff(parentBlock, params.id)
-            }
-            log.Info("Writing state diff", "block height", currentBlock.Number().Uint64(), "worker", params.id)
-            writeLoopParams.RLock()
-            err := sds.writeStateDiffWithRetry(currentBlock, parentBlock.Root(), writeLoopParams.Params)
-            writeLoopParams.RUnlock()
+            err := writeBlockWithParents(currentBlock, sds.backfillMaxDepth, writeLoopParams.CopyParams())
             if err != nil {
                 log.Error("statediff.Service.WriteLoop: processing error",
                     "block height", currentBlock.Number().Uint64(),
@@ -861,6 +902,14 @@ func (sds *Service) WriteStateDiffFor(blockHash common.Hash, params Params) erro
     return sds.writeStateDiffWithRetry(currentBlock, parentRoot, params)
 }

+// indexedOrInProgress returns true if the block has already been statediffed or is in progress, else false.
+func (sds *Service) indexedOrInProgress(block *types.Block) (bool, error) {
+    if sds.statediffInProgress(block) {
+        return true, nil
+    }
+    return sds.indexer.HasBlock(block.Hash(), block.NumberU64())
+}
+
 // Claim exclusive access for state diffing the specified block.
 // Returns true and a function to release access if successful, else false, nil.
 func (sds *Service) claimExclusiveAccess(block *types.Block) (bool, func()) {
@@ -879,6 +928,15 @@ func (sds *Service) claimExclusiveAccess(block *types.Block) (bool, func()) {
     }
 }

+// statediffInProgress returns true if statediffing is currently in progress for the block, else false.
+func (sds *Service) statediffInProgress(block *types.Block) bool {
+    sds.currentBlocksMutex.Lock()
+    defer sds.currentBlocksMutex.Unlock()
+
+    key := fmt.Sprintf("%s,%d", block.Hash().Hex(), block.NumberU64())
+    return sds.currentBlocks[key]
+}
+
 // Writes a state diff from the current block, parent state root, and provided params
 func (sds *Service) writeStateDiff(block *types.Block, parentRoot common.Hash, params Params) error {
     if granted, relinquish := sds.claimExclusiveAccess(block); granted {
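Note: claimExclusiveAccess and statediffInProgress consult the same mutex-guarded set, keyed by "hash,number", which is what lets indexedOrInProgress treat an in-flight ancestor as already handled and stop the backward walk. A minimal sketch of the pattern with standalone stand-in types:

package main

import (
	"fmt"
	"sync"
)

type tracker struct {
	mu      sync.Mutex
	current map[string]bool
}

// claim marks a block as being statediffed, returning a release func,
// or reports that another worker already holds it.
func (t *tracker) claim(key string) (bool, func()) {
	t.mu.Lock()
	defer t.mu.Unlock()
	if t.current[key] {
		return false, nil // someone else is statediffing this block
	}
	t.current[key] = true
	return true, func() {
		t.mu.Lock()
		defer t.mu.Unlock()
		delete(t.current, key)
	}
}

func (t *tracker) inProgress(key string) bool {
	t.mu.Lock()
	defer t.mu.Unlock()
	return t.current[key]
}

func main() {
	t := &tracker{current: map[string]bool{}}
	key := fmt.Sprintf("%s,%d", "0xabc", 100)
	if ok, release := t.claim(key); ok {
		fmt.Println("claimed:", t.inProgress(key)) // true
		release()
	}
	fmt.Println("after release:", t.inProgress(key)) // false
}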
@@ -1202,7 +1260,7 @@ func (sds *Service) backfillDetectedGaps(blockGaps []*interfaces.BlockGap) {
                 return
             }
             log.Info("Backfill: backfilling detected gap", "block", num, "worker", w)
-            err := sds.writeStateDiffAt(num, writeLoopParams.Params)
+            err := sds.writeStateDiffAt(num, writeLoopParams.CopyParams())
             if err != nil {
                 log.Error("Backfill error: " + err.Error())
             }