292: Backfill gaps in the recent past on startup when tracking head. #395
@ -280,6 +280,8 @@ func makeFullNode(ctx *cli.Context) (*node.Node, ethapi.Backend) {
|
|||||||
EnableWriteLoop: ctx.Bool(utils.StateDiffWritingFlag.Name),
|
EnableWriteLoop: ctx.Bool(utils.StateDiffWritingFlag.Name),
|
||||||
NumWorkers: ctx.Uint(utils.StateDiffWorkersFlag.Name),
|
NumWorkers: ctx.Uint(utils.StateDiffWorkersFlag.Name),
|
||||||
WaitForSync: ctx.Bool(utils.StateDiffWaitForSync.Name),
|
WaitForSync: ctx.Bool(utils.StateDiffWaitForSync.Name),
|
||||||
|
BackfillCheckPastBlocks: ctx.Uint64(utils.StateDiffBackfillCheckPastBlocks.Name),
|
||||||
|
BackfillMaxHeadGap: ctx.Uint64(utils.StateDiffBackfillMaxHeadGap.Name),
|
||||||
}
|
}
|
||||||
utils.RegisterStateDiffService(stack, eth, &cfg.Eth, p, backend)
|
utils.RegisterStateDiffService(stack, eth, &cfg.Eth, p, backend)
|
||||||
}
|
}
|
||||||
|
@ -176,6 +176,8 @@ var (
|
|||||||
utils.StateDiffUpsert,
|
utils.StateDiffUpsert,
|
||||||
utils.StateDiffLogStatements,
|
utils.StateDiffLogStatements,
|
||||||
utils.StateDiffCopyFrom,
|
utils.StateDiffCopyFrom,
|
||||||
|
utils.StateDiffBackfillCheckPastBlocks,
|
||||||
|
utils.StateDiffBackfillMaxHeadGap,
|
||||||
configFileFlag,
|
configFileFlag,
|
||||||
}, utils.NetworkFlags, utils.DatabasePathFlags)
|
}, utils.NetworkFlags, utils.DatabasePathFlags)
|
||||||
|
|
||||||
|
@ -1120,6 +1120,16 @@ Please note that --` + MetricsHTTPFlag.Name + ` must be set to start the server.
|
|||||||
Name: "statediff.waitforsync",
|
Name: "statediff.waitforsync",
|
||||||
Usage: "Should the statediff service wait for geth to catch up to the head of the chain?",
|
Usage: "Should the statediff service wait for geth to catch up to the head of the chain?",
|
||||||
}
|
}
|
||||||
|
StateDiffBackfillCheckPastBlocks = &cli.Uint64Flag{
|
||||||
|
Name: "statediff.backfillcheckpastblocks",
|
||||||
|
Usage: "The number of blocks behind the startup statediff position to check (and fill) for gaps when head tracking.",
|
||||||
|
Value: 7200,
|
||||||
|
}
|
||||||
|
StateDiffBackfillMaxHeadGap = &cli.Uint64Flag{
|
||||||
|
Name: "statediff.backfillmaxheadgap",
|
||||||
|
Usage: "The maximum gap between the startup statediff and startup head positions that can be backfilled.",
|
||||||
|
Value: 7200,
|
||||||
|
}
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
|
@ -36,6 +36,10 @@ type Config struct {
|
|||||||
ClientName string
|
ClientName string
|
||||||
// Whether to enable writing state diffs directly to track blockchain head
|
// Whether to enable writing state diffs directly to track blockchain head
|
||||||
EnableWriteLoop bool
|
EnableWriteLoop bool
|
||||||
|
// The maximum number of blocks to backfill when tracking head.
|
||||||
|
BackfillMaxHeadGap uint64
|
||||||
|
// The maximum number of blocks behind the startup position to check for gaps.
|
||||||
|
BackfillCheckPastBlocks uint64
|
||||||
// Size of the worker pool
|
// Size of the worker pool
|
||||||
NumWorkers uint
|
NumWorkers uint
|
||||||
// Should the statediff service wait until geth has synced to the head of the blockchain?
|
// Should the statediff service wait until geth has synced to the head of the blockchain?
|
||||||
|
@ -418,6 +418,18 @@ func (sdi *StateDiffIndexer) HasBlock(hash common.Hash, number uint64) (bool, er
|
|||||||
return false, nil
|
return false, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// CurrentBlock returns the HeaderModel of the highest existing block in the output.
|
||||||
|
// In the "dump" case, this is always nil.
|
||||||
|
func (sdi *StateDiffIndexer) CurrentBlock() (*models.HeaderModel, error) {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// DetectGaps returns a list of gaps in the output found within the specified block range.
|
||||||
|
// In the "dump" case this is always nil.
|
||||||
|
func (sdi *StateDiffIndexer) DetectGaps(beginBlockNumber uint64, endBlockNumber uint64) ([]*interfaces.BlockGap, error) {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
// Close satisfies io.Closer
|
// Close satisfies io.Closer
|
||||||
func (sdi *StateDiffIndexer) Close() error {
|
func (sdi *StateDiffIndexer) Close() error {
|
||||||
return sdi.dump.Close()
|
return sdi.dump.Close()
|
||||||
|
@ -461,6 +461,18 @@ func (sdi *StateDiffIndexer) PushIPLD(batch interfaces.Batch, ipld sdtypes.IPLD)
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// CurrentBlock returns the HeaderModel of the highest existing block in the output.
|
||||||
|
// In the "file" case, this is always nil.
|
||||||
|
func (sdi *StateDiffIndexer) CurrentBlock() (*models.HeaderModel, error) {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// DetectGaps returns a list of gaps in the output found within the specified block range.
|
||||||
|
// In the "file" case this is always nil.
|
||||||
|
func (sdi *StateDiffIndexer) DetectGaps(beginBlockNumber uint64, endBlockNumber uint64) ([]*interfaces.BlockGap, error) {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
// HasBlock checks whether the indicated block already exists in the output.
|
// HasBlock checks whether the indicated block already exists in the output.
|
||||||
// In the "file" case this is presumed to be false.
|
// In the "file" case this is presumed to be false.
|
||||||
func (sdi *StateDiffIndexer) HasBlock(hash common.Hash, number uint64) (bool, error) {
|
func (sdi *StateDiffIndexer) HasBlock(hash common.Hash, number uint64) (bool, error) {
|
||||||
|
@ -219,11 +219,21 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
|
|||||||
return blockTx, err
|
return blockTx, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// CurrentBlock returns the HeaderModel of the highest existing block in the database.
|
||||||
|
func (sdi *StateDiffIndexer) CurrentBlock() (*models.HeaderModel, error) {
|
||||||
|
return sdi.dbWriter.maxHeader()
|
||||||
|
}
|
||||||
|
|
||||||
// HasBlock checks whether the indicated block already exists in the database.
|
// HasBlock checks whether the indicated block already exists in the database.
|
||||||
func (sdi *StateDiffIndexer) HasBlock(hash common.Hash, number uint64) (bool, error) {
|
func (sdi *StateDiffIndexer) HasBlock(hash common.Hash, number uint64) (bool, error) {
|
||||||
return sdi.dbWriter.hasHeader(hash, number)
|
return sdi.dbWriter.hasHeader(hash, number)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// DetectGaps returns a list of gaps in the database found within the specified block range.
|
||||||
|
func (sdi *StateDiffIndexer) DetectGaps(beginBlockNumber uint64, endBlockNumber uint64) ([]*interfaces.BlockGap, error) {
|
||||||
|
return sdi.dbWriter.detectGaps(beginBlockNumber, endBlockNumber)
|
||||||
|
}
|
||||||
|
|
||||||
// processHeader publishes and indexes a header IPLD in Postgres
|
// processHeader publishes and indexes a header IPLD in Postgres
|
||||||
// it returns the headerID
|
// it returns the headerID
|
||||||
func (sdi *StateDiffIndexer) processHeader(tx *BatchTx, header *types.Header, headerNode ipld.IPLD, reward, td *big.Int) (string, error) {
|
func (sdi *StateDiffIndexer) processHeader(tx *BatchTx, header *types.Header, headerNode ipld.IPLD, reward, td *big.Int) (string, error) {
|
||||||
|
@ -45,6 +45,8 @@ type Driver interface {
|
|||||||
|
|
||||||
// Statements interface to accommodate different SQL query syntax
|
// Statements interface to accommodate different SQL query syntax
|
||||||
type Statements interface {
|
type Statements interface {
|
||||||
|
DetectGapsStm() string
|
||||||
|
MaxHeaderStm() string
|
||||||
ExistsHeaderStm() string
|
ExistsHeaderStm() string
|
||||||
InsertHeaderStm() string
|
InsertHeaderStm() string
|
||||||
InsertUncleStm() string
|
InsertUncleStm() string
|
||||||
|
@ -41,8 +41,19 @@ type DB struct {
|
|||||||
sql.Driver
|
sql.Driver
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// MaxHeaderStm satisfies the sql.Statements interface
|
||||||
|
func (db *DB) MaxHeaderStm() string {
|
||||||
|
return fmt.Sprintf("SELECT block_number, block_hash, parent_hash, cid, td, node_ids, reward, state_root, tx_root, receipt_root, uncles_hash, bloom, timestamp, coinbase FROM %s ORDER BY block_number DESC LIMIT 1", schema.TableHeader.Name)
|
||||||
|
}
|
||||||
|
|
||||||
|
// ExistsHeaderStm satisfies the sql.Statements interface
|
||||||
func (db *DB) ExistsHeaderStm() string {
|
func (db *DB) ExistsHeaderStm() string {
|
||||||
return fmt.Sprintf("SELECT EXISTS(SELECT 1 from %s WHERE block_number = $1 AND block_hash = $2 LIMIT 1)", schema.TableHeader.Name)
|
return fmt.Sprintf("SELECT EXISTS(SELECT 1 from %s WHERE block_number = $1::BIGINT AND block_hash = $2::TEXT LIMIT 1)", schema.TableHeader.Name)
|
||||||
|
}
|
||||||
|
|
||||||
|
// DetectGapsStm satisfies the sql.Statements interface
|
||||||
|
func (db *DB) DetectGapsStm() string {
|
||||||
|
return fmt.Sprintf("SELECT block_number + 1 AS first_missing, (next_bn - 1) AS last_missing FROM (SELECT block_number, LEAD(block_number) OVER (ORDER BY block_number) AS next_bn FROM %s WHERE block_number >= $1::BIGINT AND block_number <= $2::BIGINT) h WHERE next_bn > block_number + 1", schema.TableHeader.Name)
|
||||||
}
|
}
|
||||||
|
|
||||||
// InsertHeaderStm satisfies the sql.Statements interface
|
// InsertHeaderStm satisfies the sql.Statements interface
|
||||||
|
@ -20,6 +20,8 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"strconv"
|
"strconv"
|
||||||
|
|
||||||
|
"github.com/ethereum/go-ethereum/statediff/indexer/interfaces"
|
||||||
|
|
||||||
"github.com/ethereum/go-ethereum/common"
|
"github.com/ethereum/go-ethereum/common"
|
||||||
|
|
||||||
"github.com/jackc/pgtype"
|
"github.com/jackc/pgtype"
|
||||||
@ -48,11 +50,55 @@ func (w *Writer) Close() error {
|
|||||||
return w.db.Close()
|
return w.db.Close()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// hasHeader returns true if a matching hash+number record exists in the database, else false.
|
||||||
func (w *Writer) hasHeader(blockHash common.Hash, blockNumber uint64) (exists bool, err error) {
|
func (w *Writer) hasHeader(blockHash common.Hash, blockNumber uint64) (exists bool, err error) {
|
||||||
err = w.db.QueryRow(w.db.Context(), w.db.ExistsHeaderStm(), blockNumber, blockHash.String()).Scan(&exists)
|
// pgx misdetects the parameter OIDs and selects int8, which can overflow.
|
||||||
|
// unfortunately there is no good place to override it, so it is safer to pass the uint64s as text
|
||||||
|
// and let PG handle the cast
|
||||||
|
err = w.db.QueryRow(w.db.Context(), w.db.ExistsHeaderStm(), strconv.FormatUint(blockNumber, 10), blockHash.String()).Scan(&exists)
|
||||||
return exists, err
|
return exists, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// detectGaps returns a list of BlockGaps detected within the specified block range
|
||||||
|
// For example, if the database contains blocks the overall range 1000:2000, but is missing blocks 1110:1230 and 1380
|
||||||
|
// it would return [{FirstMissing: 1110, LastMissing: 1230}, {FirstMissing: 1380, LastMissing: 1380}]
|
||||||
|
func (w *Writer) detectGaps(beginBlockNumber uint64, endBlockNumber uint64) ([]*interfaces.BlockGap, error) {
|
||||||
|
var gaps []*interfaces.BlockGap
|
||||||
|
// pgx misdetects the parameter OIDs and selects int8, which can overflow.
|
||||||
|
// unfortunately there is no good place to override it, so it is safer to pass the uint64s as text
|
||||||
|
// and let PG handle the cast
|
||||||
|
err := w.db.Select(w.db.Context(), &gaps, w.db.DetectGapsStm(), strconv.FormatUint(beginBlockNumber, 10), strconv.FormatUint(endBlockNumber, 10))
|
||||||
|
return gaps, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// maxHeader returns the header for the highest block number in the database.
|
||||||
|
// SELECT block_number, block_hash, parent_hash, cid, td, node_ids, reward, state_root, tx_root, receipt_root, uncles_hash, bloom, timestamp, coinbase FROM %s ORDER BY block_number DESC LIMIT 1
|
||||||
|
func (w *Writer) maxHeader() (*models.HeaderModel, error) {
|
||||||
|
var model models.HeaderModel
|
||||||
|
var err error
|
||||||
|
var number, td, reward uint64
|
||||||
|
err = w.db.QueryRow(w.db.Context(), w.db.MaxHeaderStm()).Scan(
|
||||||
|
&number,
|
||||||
|
&model.BlockHash,
|
||||||
|
&model.ParentHash,
|
||||||
|
&model.CID,
|
||||||
|
&td,
|
||||||
|
&model.NodeIDs,
|
||||||
|
&reward,
|
||||||
|
&model.StateRoot,
|
||||||
|
&model.TxRoot,
|
||||||
|
&model.RctRoot,
|
||||||
|
&model.UnclesHash,
|
||||||
|
&model.Bloom,
|
||||||
|
&model.Timestamp,
|
||||||
|
&model.Coinbase,
|
||||||
|
)
|
||||||
|
model.BlockNumber = strconv.FormatUint(number, 10)
|
||||||
|
model.TotalDifficulty = strconv.FormatUint(td, 10)
|
||||||
|
model.Reward = strconv.FormatUint(reward, 10)
|
||||||
|
return &model, err
|
||||||
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
INSERT INTO eth.header_cids (block_number, block_hash, parent_hash, cid, td, node_ids, reward, state_root, tx_root, receipt_root, uncles_hash, bloom, timestamp, coinbase)
|
INSERT INTO eth.header_cids (block_number, block_hash, parent_hash, cid, td, node_ids, reward, state_root, tx_root, receipt_root, uncles_hash, bloom, timestamp, coinbase)
|
||||||
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14)
|
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14)
|
||||||
|
@ -21,6 +21,8 @@ import (
|
|||||||
"math/big"
|
"math/big"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/ethereum/go-ethereum/statediff/indexer/models"
|
||||||
|
|
||||||
"github.com/ethereum/go-ethereum/common"
|
"github.com/ethereum/go-ethereum/common"
|
||||||
"github.com/ethereum/go-ethereum/core/types"
|
"github.com/ethereum/go-ethereum/core/types"
|
||||||
"github.com/ethereum/go-ethereum/statediff/indexer/shared"
|
"github.com/ethereum/go-ethereum/statediff/indexer/shared"
|
||||||
@ -29,6 +31,8 @@ import (
|
|||||||
|
|
||||||
// StateDiffIndexer interface required to index statediff data
|
// StateDiffIndexer interface required to index statediff data
|
||||||
type StateDiffIndexer interface {
|
type StateDiffIndexer interface {
|
||||||
|
DetectGaps(beginBlock uint64, endBlock uint64) ([]*BlockGap, error)
|
||||||
|
CurrentBlock() (*models.HeaderModel, error)
|
||||||
HasBlock(hash common.Hash, number uint64) (bool, error)
|
HasBlock(hash common.Hash, number uint64) (bool, error)
|
||||||
PushBlock(block *types.Block, receipts types.Receipts, totalDifficulty *big.Int) (Batch, error)
|
PushBlock(block *types.Block, receipts types.Receipts, totalDifficulty *big.Int) (Batch, error)
|
||||||
PushStateNode(tx Batch, stateNode sdtypes.StateLeafNode, headerID string) error
|
PushStateNode(tx Batch, stateNode sdtypes.StateLeafNode, headerID string) error
|
||||||
@ -54,3 +58,9 @@ type Batch interface {
|
|||||||
type Config interface {
|
type Config interface {
|
||||||
Type() shared.DBType
|
Type() shared.DBType
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Used to represent a gap in statediffed blocks
|
||||||
|
type BlockGap struct {
|
||||||
|
FirstMissing uint64 `json:"firstMissing"`
|
||||||
|
LastMissing uint64 `json:"lastMissing"`
|
||||||
|
}
|
||||||
|
@ -18,6 +18,7 @@ package statediff
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"math/big"
|
"math/big"
|
||||||
"strconv"
|
"strconv"
|
||||||
@ -137,6 +138,9 @@ type Service struct {
|
|||||||
indexer interfaces.StateDiffIndexer
|
indexer interfaces.StateDiffIndexer
|
||||||
// Whether to enable writing state diffs directly to track blockchain head.
|
// Whether to enable writing state diffs directly to track blockchain head.
|
||||||
enableWriteLoop bool
|
enableWriteLoop bool
|
||||||
|
// Settings to use for backfilling state diffs (plugging gaps when tracking head)
|
||||||
|
backfillMaxHeadGap uint64
|
||||||
|
backfillCheckPastBlocks uint64
|
||||||
// Size of the worker pool
|
// Size of the worker pool
|
||||||
numWorkers uint
|
numWorkers uint
|
||||||
// Number of retry for aborted transactions due to deadlock.
|
// Number of retry for aborted transactions due to deadlock.
|
||||||
@ -222,6 +226,8 @@ func New(stack *node.Node, ethServ *eth.Ethereum, cfg *ethconfig.Config, params
|
|||||||
WaitForSync: params.WaitForSync,
|
WaitForSync: params.WaitForSync,
|
||||||
indexer: indexer,
|
indexer: indexer,
|
||||||
enableWriteLoop: params.EnableWriteLoop,
|
enableWriteLoop: params.EnableWriteLoop,
|
||||||
|
backfillMaxHeadGap: params.BackfillMaxHeadGap,
|
||||||
|
backfillCheckPastBlocks: params.BackfillCheckPastBlocks,
|
||||||
numWorkers: workers,
|
numWorkers: workers,
|
||||||
maxRetry: defaultRetryLimit,
|
maxRetry: defaultRetryLimit,
|
||||||
jobStatusSubs: map[rpc.ID]statusSubscription{},
|
jobStatusSubs: map[rpc.ID]statusSubscription{},
|
||||||
@ -262,6 +268,8 @@ func NewService(blockChain blockChain, cfg Config, backend ethapi.Backend, index
|
|||||||
WaitForSync: cfg.WaitForSync,
|
WaitForSync: cfg.WaitForSync,
|
||||||
indexer: indexer,
|
indexer: indexer,
|
||||||
enableWriteLoop: cfg.EnableWriteLoop,
|
enableWriteLoop: cfg.EnableWriteLoop,
|
||||||
|
backfillMaxHeadGap: cfg.BackfillMaxHeadGap,
|
||||||
|
backfillCheckPastBlocks: cfg.BackfillCheckPastBlocks,
|
||||||
numWorkers: workers,
|
numWorkers: workers,
|
||||||
maxRetry: defaultRetryLimit,
|
maxRetry: defaultRetryLimit,
|
||||||
jobStatusSubs: map[rpc.ID]statusSubscription{},
|
jobStatusSubs: map[rpc.ID]statusSubscription{},
|
||||||
@ -692,6 +700,8 @@ func (sds *Service) Start() error {
|
|||||||
go sds.Loop(chainEventCh)
|
go sds.Loop(chainEventCh)
|
||||||
|
|
||||||
if sds.enableWriteLoop {
|
if sds.enableWriteLoop {
|
||||||
|
log.Info("Starting statediff DB backfill", "params", writeLoopParams.Params)
|
||||||
|
go sds.Backfill()
|
||||||
log.Info("Starting statediff DB write loop", "params", writeLoopParams.Params)
|
log.Info("Starting statediff DB write loop", "params", writeLoopParams.Params)
|
||||||
chainEventCh := make(chan core.ChainEvent, chainEventChanSize)
|
chainEventCh := make(chan core.ChainEvent, chainEventChanSize)
|
||||||
go sds.WriteLoop(chainEventCh)
|
go sds.WriteLoop(chainEventCh)
|
||||||
@ -1101,3 +1111,149 @@ func MapWatchAddressArgsToAddresses(args []types2.WatchAddressArg) ([]common.Add
|
|||||||
|
|
||||||
return addresses, nil
|
return addresses, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Backfill is executed on startup to make sure there are no gaps in the recent past when tracking head.
|
||||||
|
func (sds *Service) Backfill() {
|
||||||
|
chainBlock := sds.BlockChain.CurrentBlock()
|
||||||
|
if nil == chainBlock {
|
||||||
|
log.Info("Backfill: No previous chain block, nothing to backfill.")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
chainBlockNumber := chainBlock.Number.Uint64()
|
||||||
|
if chainBlockNumber == 0 {
|
||||||
|
log.Info("Backfill: At start of chain, nothing to backfill.")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
indexerBlock, err := sds.indexer.CurrentBlock()
|
||||||
|
if nil == indexerBlock {
|
||||||
|
log.Info("Backfill: No previous indexer block, nothing to backfill.")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if nil != err {
|
||||||
|
log.Error("Backfill error: " + err.Error())
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
indexerBlockNumber, err := strconv.ParseUint(indexerBlock.BlockNumber, 10, 64)
|
||||||
|
if nil != err {
|
||||||
|
log.Error("Backfill error: " + err.Error())
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
headGap := chainBlockNumber - indexerBlockNumber
|
||||||
|
log.Info(
|
||||||
|
"Backfill: initial positions",
|
||||||
|
"chain", chainBlockNumber,
|
||||||
|
"indexer", indexerBlockNumber,
|
||||||
|
"headGap", headGap,
|
||||||
|
)
|
||||||
|
|
||||||
|
if sds.backfillMaxHeadGap > 0 && headGap > 0 {
|
||||||
|
if headGap < sds.backfillMaxHeadGap {
|
||||||
|
sds.backfillHeadGap(indexerBlockNumber, chainBlockNumber)
|
||||||
|
log.Info("Backfill: all workers done filling headGap.")
|
||||||
|
} else {
|
||||||
|
log.Error("Backfill: headGap too large to fill.")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if sds.backfillCheckPastBlocks > 0 {
|
||||||
|
var gapCheckBeginNumber uint64 = 0
|
||||||
|
if indexerBlockNumber > sds.backfillCheckPastBlocks {
|
||||||
|
gapCheckBeginNumber = indexerBlockNumber - sds.backfillCheckPastBlocks
|
||||||
|
}
|
||||||
|
blockGaps, err := sds.indexer.DetectGaps(gapCheckBeginNumber, chainBlockNumber)
|
||||||
|
if nil != err {
|
||||||
|
log.Error("Backfill error: " + err.Error())
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if nil != blockGaps && len(blockGaps) > 0 {
|
||||||
|
gapsMsg, _ := json.Marshal(blockGaps)
|
||||||
|
log.Info("Backfill: detected gaps in range", "begin", gapCheckBeginNumber, "end", chainBlockNumber, "gaps", string(gapsMsg))
|
||||||
|
sds.backfillDetectedGaps(blockGaps)
|
||||||
|
log.Info("Backfill: done processing detected gaps in range", "begin", gapCheckBeginNumber, "end", chainBlockNumber, "gaps", string(gapsMsg))
|
||||||
|
} else {
|
||||||
|
log.Info("Backfill: no gaps detected in range", "begin", gapCheckBeginNumber, "end", chainBlockNumber)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// backfillHeadGap fills in any gap between the statediff position and the chain position at startup.
|
||||||
|
// A gap can be created in this way if there is some problem in statediffing (eg, DB connectivity is lost,
|
||||||
|
// while the chain keeps syncing), if the process is terminated with a statediff in-flight, etc.
|
||||||
|
func (sds *Service) backfillHeadGap(indexerBlockNumber uint64, chainBlockNumber uint64) {
|
||||||
|
var ch = make(chan uint64)
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
for i := uint(0); i < sds.numWorkers; i++ {
|
||||||
|
wg.Add(1)
|
||||||
|
go func(w uint) {
|
||||||
|
defer wg.Done()
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case num, ok := <-ch:
|
||||||
|
if !ok {
|
||||||
|
log.Info("Backfill: headGap done", "worker", w)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
log.Info("Backfill: backfilling head gap", "block", num, "worker", w)
|
||||||
|
err := sds.writeStateDiffAt(num, writeLoopParams.Params)
|
||||||
|
if err != nil {
|
||||||
|
log.Error("Backfill error: " + err.Error())
|
||||||
|
}
|
||||||
|
case <-sds.QuitChan:
|
||||||
|
log.Info("Backfill: quitting before finish", "worker", w)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}(i)
|
||||||
|
}
|
||||||
|
|
||||||
|
for bn := indexerBlockNumber + 1; bn <= chainBlockNumber; bn++ {
|
||||||
|
ch <- bn
|
||||||
|
}
|
||||||
|
close(ch)
|
||||||
|
wg.Wait()
|
||||||
|
}
|
||||||
|
|
||||||
|
// backfillDetectedGaps fills gaps which have occurred in the recent past. These gaps can happen because of
|
||||||
|
// transient errors, such as DB errors that are later corrected (so head statediffing continues, but with a hole)
|
||||||
|
// a missed ChainEvent (happens sometimes when debugging), or if the process is terminated when an earlier block
|
||||||
|
// is still in-flight, but a later block was already written.
|
||||||
|
func (sds *Service) backfillDetectedGaps(blockGaps []*interfaces.BlockGap) {
|
||||||
|
var ch = make(chan uint64)
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
for i := uint(0); i < sds.numWorkers; i++ {
|
||||||
|
wg.Add(1)
|
||||||
|
go func(w uint) {
|
||||||
|
defer wg.Done()
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case num, ok := <-ch:
|
||||||
|
if !ok {
|
||||||
|
log.Info("Backfill: detected gap fill done", "worker", w)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
log.Info("Backfill: backfilling detected gap", "block", num, "worker", w)
|
||||||
|
err := sds.writeStateDiffAt(num, writeLoopParams.Params)
|
||||||
|
if err != nil {
|
||||||
|
log.Error("Backfill error: " + err.Error())
|
||||||
|
}
|
||||||
|
case <-sds.QuitChan:
|
||||||
|
log.Info("Backfill: quitting before finish", "worker", w)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}(i)
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, gap := range blockGaps {
|
||||||
|
for num := gap.FirstMissing; num <= gap.LastMissing; num++ {
|
||||||
|
ch <- num
|
||||||
|
}
|
||||||
|
}
|
||||||
|
close(ch)
|
||||||
|
wg.Wait()
|
||||||
|
}
|
||||||
|
@ -20,6 +20,8 @@ import (
|
|||||||
"math/big"
|
"math/big"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/ethereum/go-ethereum/statediff/indexer/models"
|
||||||
|
|
||||||
"github.com/ethereum/go-ethereum/common"
|
"github.com/ethereum/go-ethereum/common"
|
||||||
"github.com/ethereum/go-ethereum/core/types"
|
"github.com/ethereum/go-ethereum/core/types"
|
||||||
"github.com/ethereum/go-ethereum/statediff/indexer/interfaces"
|
"github.com/ethereum/go-ethereum/statediff/indexer/interfaces"
|
||||||
@ -32,6 +34,14 @@ var _ interfaces.Batch = &batch{}
|
|||||||
// StateDiffIndexer is a mock state diff indexer
|
// StateDiffIndexer is a mock state diff indexer
|
||||||
type StateDiffIndexer struct{}
|
type StateDiffIndexer struct{}
|
||||||
|
|
||||||
|
func (sdi *StateDiffIndexer) DetectGaps(beginBlock uint64, endBlock uint64) ([]*interfaces.BlockGap, error) {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (sdi *StateDiffIndexer) CurrentBlock() (*models.HeaderModel, error) {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
type batch struct{}
|
type batch struct{}
|
||||||
|
|
||||||
func (sdi *StateDiffIndexer) HasBlock(hash common.Hash, number uint64) (bool, error) {
|
func (sdi *StateDiffIndexer) HasBlock(hash common.Hash, number uint64) (bool, error) {
|
||||||
|
Loading…
Reference in New Issue
Block a user