292: Backfill gaps in the recent past on startup when tracking head. (#395)

* Backfill gaps in the recent past when statediffing head.
This commit is contained in:
Thomas E Lackey 2023-06-23 21:04:57 -07:00 committed by Roy Crihfield
parent 656e9ee17d
commit c8939d0804
11 changed files with 296 additions and 18 deletions

View File

@ -38,6 +38,10 @@ type Config struct {
ClientName string ClientName string
// Whether to enable writing state diffs directly to track blockchain head // Whether to enable writing state diffs directly to track blockchain head
EnableWriteLoop bool EnableWriteLoop bool
// The maximum number of blocks to backfill when tracking head.
BackfillMaxHeadGap uint64
// The maximum number of blocks behind the startup position to check for gaps.
BackfillCheckPastBlocks uint64
// Size of the worker pool // Size of the worker pool
NumWorkers uint NumWorkers uint
// Should the statediff service wait until geth has synced to the head of the blockchain? // Should the statediff service wait until geth has synced to the head of the blockchain?

View File

@ -419,6 +419,18 @@ func (sdi *StateDiffIndexer) HasBlock(hash common.Hash, number uint64) (bool, er
return false, nil return false, nil
} }
// CurrentBlock returns the HeaderModel of the highest existing block in the output.
// In the "dump" case, this is always nil.
func (sdi *StateDiffIndexer) CurrentBlock() (*models.HeaderModel, error) {
return nil, nil
}
// DetectGaps returns a list of gaps in the output found within the specified block range.
// In the "dump" case this is always nil.
func (sdi *StateDiffIndexer) DetectGaps(beginBlockNumber uint64, endBlockNumber uint64) ([]*interfaces.BlockGap, error) {
return nil, nil
}
// Close satisfies io.Closer // Close satisfies io.Closer
func (sdi *StateDiffIndexer) Close() error { func (sdi *StateDiffIndexer) Close() error {
return sdi.dump.Close() return sdi.dump.Close()

View File

@ -459,6 +459,18 @@ func (sdi *StateDiffIndexer) PushIPLD(batch interfaces.Batch, ipld sdtypes.IPLD)
return nil return nil
} }
// CurrentBlock returns the HeaderModel of the highest existing block in the output.
// In the "file" case, this is always nil.
func (sdi *StateDiffIndexer) CurrentBlock() (*models.HeaderModel, error) {
return nil, nil
}
// DetectGaps returns a list of gaps in the output found within the specified block range.
// In the "file" case this is always nil.
func (sdi *StateDiffIndexer) DetectGaps(beginBlockNumber uint64, endBlockNumber uint64) ([]*interfaces.BlockGap, error) {
return nil, nil
}
// HasBlock checks whether the indicated block already exists in the output. // HasBlock checks whether the indicated block already exists in the output.
// In the "file" case this is presumed to be false. // In the "file" case this is presumed to be false.
func (sdi *StateDiffIndexer) HasBlock(hash common.Hash, number uint64) (bool, error) { func (sdi *StateDiffIndexer) HasBlock(hash common.Hash, number uint64) (bool, error) {

View File

@ -221,6 +221,16 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
return blockTx, err return blockTx, err
} }
// CurrentBlock returns the HeaderModel of the highest existing block in the database.
func (sdi *StateDiffIndexer) CurrentBlock() (*models.HeaderModel, error) {
return sdi.dbWriter.maxHeader()
}
// DetectGaps returns a list of gaps in the database found within the specified block range.
func (sdi *StateDiffIndexer) DetectGaps(beginBlockNumber uint64, endBlockNumber uint64) ([]*interfaces.BlockGap, error) {
return sdi.dbWriter.detectGaps(beginBlockNumber, endBlockNumber)
}
// processHeader publishes and indexes a header IPLD in Postgres // processHeader publishes and indexes a header IPLD in Postgres
// it returns the headerID // it returns the headerID
func (sdi *StateDiffIndexer) processHeader(tx *BatchTx, header *types.Header, headerNode ipld.IPLD, reward, td *big.Int) (string, error) { func (sdi *StateDiffIndexer) processHeader(tx *BatchTx, header *types.Header, headerNode ipld.IPLD, reward, td *big.Int) (string, error) {

View File

@ -45,6 +45,8 @@ type Driver interface {
// Statements interface to accommodate different SQL query syntax // Statements interface to accommodate different SQL query syntax
type Statements interface { type Statements interface {
DetectGapsStm() string
MaxHeaderStm() string
ExistsHeaderStm() string ExistsHeaderStm() string
InsertHeaderStm() string InsertHeaderStm() string
InsertUncleStm() string InsertUncleStm() string

View File

@ -41,8 +41,19 @@ type DB struct {
sql.Driver sql.Driver
} }
// MaxHeaderStm satisfies the sql.Statements interface
func (db *DB) MaxHeaderStm() string {
return fmt.Sprintf("SELECT block_number, block_hash, parent_hash, cid, td, node_ids, reward, state_root, tx_root, receipt_root, uncles_hash, bloom, timestamp, coinbase FROM %s ORDER BY block_number DESC LIMIT 1", schema.TableHeader.Name)
}
// ExistsHeaderStm satisfies the sql.Statements interface
func (db *DB) ExistsHeaderStm() string { func (db *DB) ExistsHeaderStm() string {
return fmt.Sprintf("SELECT EXISTS(SELECT 1 from %s WHERE block_number = $1 AND block_hash = $2 LIMIT 1)", schema.TableHeader.Name) return fmt.Sprintf("SELECT EXISTS(SELECT 1 from %s WHERE block_number = $1::BIGINT AND block_hash = $2::TEXT LIMIT 1)", schema.TableHeader.Name)
}
// DetectGapsStm satisfies the sql.Statements interface
func (db *DB) DetectGapsStm() string {
return fmt.Sprintf("SELECT block_number + 1 AS first_missing, (next_bn - 1) AS last_missing FROM (SELECT block_number, LEAD(block_number) OVER (ORDER BY block_number) AS next_bn FROM %s WHERE block_number >= $1::BIGINT AND block_number <= $2::BIGINT) h WHERE next_bn > block_number + 1", schema.TableHeader.Name)
} }
// InsertHeaderStm satisfies the sql.Statements interface // InsertHeaderStm satisfies the sql.Statements interface

View File

@ -27,6 +27,7 @@ import (
"github.com/shopspring/decimal" "github.com/shopspring/decimal"
"github.com/cerc-io/plugeth-statediff/indexer/database/metrics" "github.com/cerc-io/plugeth-statediff/indexer/database/metrics"
"github.com/cerc-io/plugeth-statediff/indexer/interfaces"
"github.com/cerc-io/plugeth-statediff/indexer/models" "github.com/cerc-io/plugeth-statediff/indexer/models"
) )
@ -47,11 +48,55 @@ func (w *Writer) Close() error {
return w.db.Close() return w.db.Close()
} }
// hasHeader returns true if a matching hash+number record exists in the database, else false.
func (w *Writer) hasHeader(blockHash common.Hash, blockNumber uint64) (exists bool, err error) { func (w *Writer) hasHeader(blockHash common.Hash, blockNumber uint64) (exists bool, err error) {
err = w.db.QueryRow(w.db.Context(), w.db.ExistsHeaderStm(), blockNumber, blockHash.String()).Scan(&exists) // pgx misdetects the parameter OIDs and selects int8, which can overflow.
// unfortunately there is no good place to override it, so it is safer to pass the uint64s as text
// and let PG handle the cast
err = w.db.QueryRow(w.db.Context(), w.db.ExistsHeaderStm(), strconv.FormatUint(blockNumber, 10), blockHash.String()).Scan(&exists)
return exists, err return exists, err
} }
// detectGaps returns a list of BlockGaps detected within the specified block range
// For example, if the database contains blocks the overall range 1000:2000, but is missing blocks 1110:1230 and 1380
// it would return [{FirstMissing: 1110, LastMissing: 1230}, {FirstMissing: 1380, LastMissing: 1380}]
func (w *Writer) detectGaps(beginBlockNumber uint64, endBlockNumber uint64) ([]*interfaces.BlockGap, error) {
var gaps []*interfaces.BlockGap
// pgx misdetects the parameter OIDs and selects int8, which can overflow.
// unfortunately there is no good place to override it, so it is safer to pass the uint64s as text
// and let PG handle the cast
err := w.db.Select(w.db.Context(), &gaps, w.db.DetectGapsStm(), strconv.FormatUint(beginBlockNumber, 10), strconv.FormatUint(endBlockNumber, 10))
return gaps, err
}
// maxHeader returns the header for the highest block number in the database.
// SELECT block_number, block_hash, parent_hash, cid, td, node_ids, reward, state_root, tx_root, receipt_root, uncles_hash, bloom, timestamp, coinbase FROM %s ORDER BY block_number DESC LIMIT 1
func (w *Writer) maxHeader() (*models.HeaderModel, error) {
var model models.HeaderModel
var err error
var number, td, reward uint64
err = w.db.QueryRow(w.db.Context(), w.db.MaxHeaderStm()).Scan(
&number,
&model.BlockHash,
&model.ParentHash,
&model.CID,
&td,
&model.NodeIDs,
&reward,
&model.StateRoot,
&model.TxRoot,
&model.RctRoot,
&model.UnclesHash,
&model.Bloom,
&model.Timestamp,
&model.Coinbase,
)
model.BlockNumber = strconv.FormatUint(number, 10)
model.TotalDifficulty = strconv.FormatUint(td, 10)
model.Reward = strconv.FormatUint(reward, 10)
return &model, err
}
/* /*
INSERT INTO eth.header_cids (block_number, block_hash, parent_hash, cid, td, node_ids, reward, state_root, tx_root, receipt_root, uncles_hash, bloom, timestamp, coinbase) INSERT INTO eth.header_cids (block_number, block_hash, parent_hash, cid, td, node_ids, reward, state_root, tx_root, receipt_root, uncles_hash, bloom, timestamp, coinbase)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14)

View File

@ -20,14 +20,18 @@ import (
"math/big" "math/big"
"time" "time"
"github.com/cerc-io/plugeth-statediff/indexer/models"
"github.com/cerc-io/plugeth-statediff/indexer/shared" "github.com/cerc-io/plugeth-statediff/indexer/shared"
sdtypes "github.com/cerc-io/plugeth-statediff/types" sdtypes "github.com/cerc-io/plugeth-statediff/types"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
) )
// StateDiffIndexer interface required to index statediff data // StateDiffIndexer interface required to index statediff data
type StateDiffIndexer interface { type StateDiffIndexer interface {
DetectGaps(beginBlock uint64, endBlock uint64) ([]*BlockGap, error)
CurrentBlock() (*models.HeaderModel, error)
HasBlock(hash common.Hash, number uint64) (bool, error) HasBlock(hash common.Hash, number uint64) (bool, error)
PushBlock(block *types.Block, receipts types.Receipts, totalDifficulty *big.Int) (Batch, error) PushBlock(block *types.Block, receipts types.Receipts, totalDifficulty *big.Int) (Batch, error)
PushStateNode(tx Batch, stateNode sdtypes.StateLeafNode, headerID string) error PushStateNode(tx Batch, stateNode sdtypes.StateLeafNode, headerID string) error
@ -53,3 +57,9 @@ type Batch interface {
type Config interface { type Config interface {
Type() shared.DBType Type() shared.DBType
} }
// Used to represent a gap in statediffed blocks
type BlockGap struct {
FirstMissing uint64 `json:"firstMissing"`
LastMissing uint64 `json:"lastMissing"`
}

View File

@ -52,6 +52,14 @@ func init() {
"statediff.waitforsync", false, "statediff.waitforsync", false,
"Should the statediff service wait for geth to catch up to the head of the chain?", "Should the statediff service wait for geth to catch up to the head of the chain?",
) )
Flags.Uint64Var(&config.BackfillCheckPastBlocks,
"statediff.backfillcheckpastblocks", 7200,
"Number of blocks behind the startup statediff position to check (and fill) for gaps when head tracking",
)
Flags.Uint64Var(&config.BackfillMaxHeadGap,
"statediff.backfillmaxheadgap", 7200,
"Maximum gap between the startup statediff and startup head positions that can be backfilled",
)
Flags.Var(&dbType, Flags.Var(&dbType,
"statediff.db.type", "statediff.db.type",

View File

@ -18,9 +18,11 @@ package statediff
import ( import (
"bytes" "bytes"
"encoding/json"
"errors" "errors"
"fmt" "fmt"
"math/big" "math/big"
"strconv"
"strings" "strings"
"sync" "sync"
"sync/atomic" "sync/atomic"
@ -80,6 +82,9 @@ type Service struct {
enableWriteLoop bool enableWriteLoop bool
// Parameters to use in the service write loop, if enabled // Parameters to use in the service write loop, if enabled
writeLoopParams ParamsWithMutex writeLoopParams ParamsWithMutex
// Settings to use for backfilling state diffs (plugging gaps when tracking head)
backfillMaxHeadGap uint64
backfillCheckPastBlocks uint64
// Size of the worker pool // Size of the worker pool
numWorkers uint numWorkers uint
// Number of retry for aborted transactions due to deadlock. // Number of retry for aborted transactions due to deadlock.
@ -157,22 +162,24 @@ func NewService(cfg Config, blockChain BlockChain, backend plugeth.Backend, inde
quitCh := make(chan bool) quitCh := make(chan bool)
sds := &Service{ sds := &Service{
BlockChain: blockChain, BlockChain: blockChain,
Builder: NewBuilder(blockChain.StateCache()), Builder: NewBuilder(blockChain.StateCache()),
QuitChan: quitCh, QuitChan: quitCh,
Subscriptions: make(map[common.Hash]map[SubID]Subscription), Subscriptions: make(map[common.Hash]map[SubID]Subscription),
SubscriptionTypes: make(map[common.Hash]Params), SubscriptionTypes: make(map[common.Hash]Params),
BlockCache: NewBlockCache(workers), BlockCache: NewBlockCache(workers),
BackendAPI: backend, BackendAPI: backend,
ShouldWaitForSync: cfg.WaitForSync, ShouldWaitForSync: cfg.WaitForSync,
indexer: indexer, indexer: indexer,
enableWriteLoop: cfg.EnableWriteLoop, enableWriteLoop: cfg.EnableWriteLoop,
numWorkers: workers, backfillMaxHeadGap: cfg.BackfillMaxHeadGap,
maxRetry: defaultRetryLimit, backfillCheckPastBlocks: cfg.BackfillCheckPastBlocks,
jobStatusSubs: map[SubID]jobStatusSubscription{}, numWorkers: workers,
currentJobs: map[uint64]JobID{}, maxRetry: defaultRetryLimit,
currentBlocks: map[string]bool{}, jobStatusSubs: map[SubID]jobStatusSubscription{},
writeLoopParams: ParamsWithMutex{Params: defaultWriteLoopParams}, currentJobs: map[uint64]JobID{},
currentBlocks: map[string]bool{},
writeLoopParams: ParamsWithMutex{Params: defaultWriteLoopParams},
} }
if indexer != nil { if indexer != nil {
@ -555,6 +562,8 @@ func (sds *Service) Start() error {
go sds.PublishLoop(chainEventCh) go sds.PublishLoop(chainEventCh)
if sds.enableWriteLoop { if sds.enableWriteLoop {
log.Info("Starting statediff DB backfill", "params", sds.writeLoopParams.Params)
go sds.Backfill()
log.Debug("Starting statediff DB write loop", "params", sds.writeLoopParams.Params) log.Debug("Starting statediff DB write loop", "params", sds.writeLoopParams.Params)
chainEventCh := make(chan core.ChainEvent, chainEventChanSize) chainEventCh := make(chan core.ChainEvent, chainEventChanSize)
go sds.WriteLoop(chainEventCh) go sds.WriteLoop(chainEventCh)
@ -916,3 +925,149 @@ func MapWatchAddressArgsToAddresses(args []types2.WatchAddressArg) ([]common.Add
return addresses, nil return addresses, nil
} }
// Backfill is executed on startup to make sure there are no gaps in the recent past when tracking head.
func (sds *Service) Backfill() {
chainBlock := sds.BlockChain.CurrentBlock()
if nil == chainBlock {
log.Info("Backfill: No previous chain block, nothing to backfill.")
return
}
chainBlockNumber := chainBlock.Number.Uint64()
if chainBlockNumber == 0 {
log.Info("Backfill: At start of chain, nothing to backfill.")
return
}
indexerBlock, err := sds.indexer.CurrentBlock()
if nil == indexerBlock {
log.Info("Backfill: No previous indexer block, nothing to backfill.")
return
}
if nil != err {
log.Error("Backfill error", err)
return
}
indexerBlockNumber, err := strconv.ParseUint(indexerBlock.BlockNumber, 10, 64)
if nil != err {
log.Error("Backfill error", err)
return
}
headGap := chainBlockNumber - indexerBlockNumber
log.Info(
"Backfill: initial positions",
"chain", chainBlockNumber,
"indexer", indexerBlockNumber,
"headGap", headGap,
)
if sds.backfillMaxHeadGap > 0 && headGap > 0 {
if headGap < sds.backfillMaxHeadGap {
sds.backfillHeadGap(indexerBlockNumber, chainBlockNumber)
log.Info("Backfill: all workers done filling headGap.")
} else {
log.Error("Backfill: headGap too large to fill.")
}
}
if sds.backfillCheckPastBlocks > 0 {
var gapCheckBeginNumber uint64 = 0
if indexerBlockNumber > sds.backfillCheckPastBlocks {
gapCheckBeginNumber = indexerBlockNumber - sds.backfillCheckPastBlocks
}
blockGaps, err := sds.indexer.DetectGaps(gapCheckBeginNumber, chainBlockNumber)
if nil != err {
log.Error("Backfill error", err)
return
}
if nil != blockGaps && len(blockGaps) > 0 {
gapsMsg, _ := json.Marshal(blockGaps)
log.Info("Backfill: detected gaps in range", "begin", gapCheckBeginNumber, "end", chainBlockNumber, "gaps", string(gapsMsg))
sds.backfillDetectedGaps(blockGaps)
log.Info("Backfill: done processing detected gaps in range", "begin", gapCheckBeginNumber, "end", chainBlockNumber, "gaps", string(gapsMsg))
} else {
log.Info("Backfill: no gaps detected in range", "begin", gapCheckBeginNumber, "end", chainBlockNumber)
}
}
}
// backfillHeadGap fills in any gap between the statediff position and the chain position at startup.
// A gap can be created in this way if there is some problem in statediffing (eg, DB connectivity is lost,
// while the chain keeps syncing), if the process is terminated with a statediff in-flight, etc.
func (sds *Service) backfillHeadGap(indexerBlockNumber uint64, chainBlockNumber uint64) {
var ch = make(chan uint64)
var wg sync.WaitGroup
for i := uint(0); i < sds.numWorkers; i++ {
wg.Add(1)
go func(w uint) {
defer wg.Done()
for {
select {
case num, ok := <-ch:
if !ok {
log.Info("Backfill: headGap done", "worker", w)
return
}
log.Info("Backfill: backfilling head gap", "block", num, "worker", w)
err := sds.writeStateDiffAt(num, sds.writeLoopParams.Params)
if err != nil {
log.Error("Backfill error", err)
}
case <-sds.QuitChan:
log.Info("Backfill: quitting before finish", "worker", w)
return
}
}
}(i)
}
for bn := indexerBlockNumber + 1; bn <= chainBlockNumber; bn++ {
ch <- bn
}
close(ch)
wg.Wait()
}
// backfillDetectedGaps fills gaps which have occurred in the recent past. These gaps can happen because of
// transient errors, such as DB errors that are later corrected (so head statediffing continues, but with a hole)
// a missed ChainEvent (happens sometimes when debugging), or if the process is terminated when an earlier block
// is still in-flight, but a later block was already written.
func (sds *Service) backfillDetectedGaps(blockGaps []*interfaces.BlockGap) {
var ch = make(chan uint64)
var wg sync.WaitGroup
for i := uint(0); i < sds.numWorkers; i++ {
wg.Add(1)
go func(w uint) {
defer wg.Done()
for {
select {
case num, ok := <-ch:
if !ok {
log.Info("Backfill: detected gap fill done", "worker", w)
return
}
log.Info("Backfill: backfilling detected gap", "block", num, "worker", w)
err := sds.writeStateDiffAt(num, sds.writeLoopParams.Params)
if err != nil {
log.Error("Backfill error: ", err)
}
case <-sds.QuitChan:
log.Info("Backfill: quitting before finish", "worker", w)
return
}
}
}(i)
}
for _, gap := range blockGaps {
for num := gap.FirstMissing; num <= gap.LastMissing; num++ {
ch <- num
}
}
close(ch)
wg.Wait()
}

View File

@ -24,6 +24,7 @@ import (
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
"github.com/cerc-io/plugeth-statediff/indexer/interfaces" "github.com/cerc-io/plugeth-statediff/indexer/interfaces"
"github.com/cerc-io/plugeth-statediff/indexer/models"
sdtypes "github.com/cerc-io/plugeth-statediff/types" sdtypes "github.com/cerc-io/plugeth-statediff/types"
) )
@ -33,6 +34,14 @@ var _ interfaces.Batch = &batch{}
// StateDiffIndexer is a mock state diff indexer // StateDiffIndexer is a mock state diff indexer
type StateDiffIndexer struct{} type StateDiffIndexer struct{}
func (sdi *StateDiffIndexer) DetectGaps(beginBlock uint64, endBlock uint64) ([]*interfaces.BlockGap, error) {
return nil, nil
}
func (sdi *StateDiffIndexer) CurrentBlock() (*models.HeaderModel, error) {
return nil, nil
}
type batch struct{} type batch struct{}
func (sdi *StateDiffIndexer) HasBlock(hash common.Hash, number uint64) (bool, error) { func (sdi *StateDiffIndexer) HasBlock(hash common.Hash, number uint64) (bool, error) {