diff --git a/config.go b/config.go index 625b71b..85e63a2 100644 --- a/config.go +++ b/config.go @@ -38,6 +38,10 @@ type Config struct { ClientName string // Whether to enable writing state diffs directly to track blockchain head EnableWriteLoop bool + // The maximum number of blocks to backfill when tracking head. + BackfillMaxHeadGap uint64 + // The maximum number of blocks behind the startup position to check for gaps. + BackfillCheckPastBlocks uint64 // Size of the worker pool NumWorkers uint // Should the statediff service wait until geth has synced to the head of the blockchain? diff --git a/indexer/database/dump/indexer.go b/indexer/database/dump/indexer.go index 2bbb49b..fd9b44b 100644 --- a/indexer/database/dump/indexer.go +++ b/indexer/database/dump/indexer.go @@ -419,6 +419,18 @@ func (sdi *StateDiffIndexer) HasBlock(hash common.Hash, number uint64) (bool, er return false, nil } +// CurrentBlock returns the HeaderModel of the highest existing block in the output. +// In the "dump" case, this is always nil. +func (sdi *StateDiffIndexer) CurrentBlock() (*models.HeaderModel, error) { + return nil, nil +} + +// DetectGaps returns a list of gaps in the output found within the specified block range. +// In the "dump" case this is always nil. +func (sdi *StateDiffIndexer) DetectGaps(beginBlockNumber uint64, endBlockNumber uint64) ([]*interfaces.BlockGap, error) { + return nil, nil +} + // Close satisfies io.Closer func (sdi *StateDiffIndexer) Close() error { return sdi.dump.Close() diff --git a/indexer/database/file/indexer.go b/indexer/database/file/indexer.go index dd28469..38c9a10 100644 --- a/indexer/database/file/indexer.go +++ b/indexer/database/file/indexer.go @@ -459,6 +459,18 @@ func (sdi *StateDiffIndexer) PushIPLD(batch interfaces.Batch, ipld sdtypes.IPLD) return nil } +// CurrentBlock returns the HeaderModel of the highest existing block in the output. +// In the "file" case, this is always nil. +func (sdi *StateDiffIndexer) CurrentBlock() (*models.HeaderModel, error) { + return nil, nil +} + +// DetectGaps returns a list of gaps in the output found within the specified block range. +// In the "file" case this is always nil. +func (sdi *StateDiffIndexer) DetectGaps(beginBlockNumber uint64, endBlockNumber uint64) ([]*interfaces.BlockGap, error) { + return nil, nil +} + // HasBlock checks whether the indicated block already exists in the output. // In the "file" case this is presumed to be false. func (sdi *StateDiffIndexer) HasBlock(hash common.Hash, number uint64) (bool, error) { diff --git a/indexer/database/sql/indexer.go b/indexer/database/sql/indexer.go index 06c21b1..5811993 100644 --- a/indexer/database/sql/indexer.go +++ b/indexer/database/sql/indexer.go @@ -221,6 +221,16 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip return blockTx, err } +// CurrentBlock returns the HeaderModel of the highest existing block in the database. +func (sdi *StateDiffIndexer) CurrentBlock() (*models.HeaderModel, error) { + return sdi.dbWriter.maxHeader() +} + +// DetectGaps returns a list of gaps in the database found within the specified block range. +func (sdi *StateDiffIndexer) DetectGaps(beginBlockNumber uint64, endBlockNumber uint64) ([]*interfaces.BlockGap, error) { + return sdi.dbWriter.detectGaps(beginBlockNumber, endBlockNumber) +} + // processHeader publishes and indexes a header IPLD in Postgres // it returns the headerID func (sdi *StateDiffIndexer) processHeader(tx *BatchTx, header *types.Header, headerNode ipld.IPLD, reward, td *big.Int) (string, error) { diff --git a/indexer/database/sql/interfaces.go b/indexer/database/sql/interfaces.go index 1eaf40a..2296be3 100644 --- a/indexer/database/sql/interfaces.go +++ b/indexer/database/sql/interfaces.go @@ -45,6 +45,8 @@ type Driver interface { // Statements interface to accommodate different SQL query syntax type Statements interface { + DetectGapsStm() string + MaxHeaderStm() string ExistsHeaderStm() string InsertHeaderStm() string InsertUncleStm() string diff --git a/indexer/database/sql/postgres/database.go b/indexer/database/sql/postgres/database.go index 5299d06..c646d76 100644 --- a/indexer/database/sql/postgres/database.go +++ b/indexer/database/sql/postgres/database.go @@ -41,8 +41,19 @@ type DB struct { sql.Driver } +// MaxHeaderStm satisfies the sql.Statements interface +func (db *DB) MaxHeaderStm() string { + return fmt.Sprintf("SELECT block_number, block_hash, parent_hash, cid, td, node_ids, reward, state_root, tx_root, receipt_root, uncles_hash, bloom, timestamp, coinbase FROM %s ORDER BY block_number DESC LIMIT 1", schema.TableHeader.Name) +} + +// ExistsHeaderStm satisfies the sql.Statements interface func (db *DB) ExistsHeaderStm() string { - return fmt.Sprintf("SELECT EXISTS(SELECT 1 from %s WHERE block_number = $1 AND block_hash = $2 LIMIT 1)", schema.TableHeader.Name) + return fmt.Sprintf("SELECT EXISTS(SELECT 1 from %s WHERE block_number = $1::BIGINT AND block_hash = $2::TEXT LIMIT 1)", schema.TableHeader.Name) +} + +// DetectGapsStm satisfies the sql.Statements interface +func (db *DB) DetectGapsStm() string { + return fmt.Sprintf("SELECT block_number + 1 AS first_missing, (next_bn - 1) AS last_missing FROM (SELECT block_number, LEAD(block_number) OVER (ORDER BY block_number) AS next_bn FROM %s WHERE block_number >= $1::BIGINT AND block_number <= $2::BIGINT) h WHERE next_bn > block_number + 1", schema.TableHeader.Name) } // InsertHeaderStm satisfies the sql.Statements interface diff --git a/indexer/database/sql/writer.go b/indexer/database/sql/writer.go index 4f0b2cc..5270365 100644 --- a/indexer/database/sql/writer.go +++ b/indexer/database/sql/writer.go @@ -27,6 +27,7 @@ import ( "github.com/shopspring/decimal" "github.com/cerc-io/plugeth-statediff/indexer/database/metrics" + "github.com/cerc-io/plugeth-statediff/indexer/interfaces" "github.com/cerc-io/plugeth-statediff/indexer/models" ) @@ -47,11 +48,55 @@ func (w *Writer) Close() error { return w.db.Close() } +// hasHeader returns true if a matching hash+number record exists in the database, else false. func (w *Writer) hasHeader(blockHash common.Hash, blockNumber uint64) (exists bool, err error) { - err = w.db.QueryRow(w.db.Context(), w.db.ExistsHeaderStm(), blockNumber, blockHash.String()).Scan(&exists) + // pgx misdetects the parameter OIDs and selects int8, which can overflow. + // unfortunately there is no good place to override it, so it is safer to pass the uint64s as text + // and let PG handle the cast + err = w.db.QueryRow(w.db.Context(), w.db.ExistsHeaderStm(), strconv.FormatUint(blockNumber, 10), blockHash.String()).Scan(&exists) return exists, err } +// detectGaps returns a list of BlockGaps detected within the specified block range +// For example, if the database contains blocks the overall range 1000:2000, but is missing blocks 1110:1230 and 1380 +// it would return [{FirstMissing: 1110, LastMissing: 1230}, {FirstMissing: 1380, LastMissing: 1380}] +func (w *Writer) detectGaps(beginBlockNumber uint64, endBlockNumber uint64) ([]*interfaces.BlockGap, error) { + var gaps []*interfaces.BlockGap + // pgx misdetects the parameter OIDs and selects int8, which can overflow. + // unfortunately there is no good place to override it, so it is safer to pass the uint64s as text + // and let PG handle the cast + err := w.db.Select(w.db.Context(), &gaps, w.db.DetectGapsStm(), strconv.FormatUint(beginBlockNumber, 10), strconv.FormatUint(endBlockNumber, 10)) + return gaps, err +} + +// maxHeader returns the header for the highest block number in the database. +// SELECT block_number, block_hash, parent_hash, cid, td, node_ids, reward, state_root, tx_root, receipt_root, uncles_hash, bloom, timestamp, coinbase FROM %s ORDER BY block_number DESC LIMIT 1 +func (w *Writer) maxHeader() (*models.HeaderModel, error) { + var model models.HeaderModel + var err error + var number, td, reward uint64 + err = w.db.QueryRow(w.db.Context(), w.db.MaxHeaderStm()).Scan( + &number, + &model.BlockHash, + &model.ParentHash, + &model.CID, + &td, + &model.NodeIDs, + &reward, + &model.StateRoot, + &model.TxRoot, + &model.RctRoot, + &model.UnclesHash, + &model.Bloom, + &model.Timestamp, + &model.Coinbase, + ) + model.BlockNumber = strconv.FormatUint(number, 10) + model.TotalDifficulty = strconv.FormatUint(td, 10) + model.Reward = strconv.FormatUint(reward, 10) + return &model, err +} + /* INSERT INTO eth.header_cids (block_number, block_hash, parent_hash, cid, td, node_ids, reward, state_root, tx_root, receipt_root, uncles_hash, bloom, timestamp, coinbase) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14) diff --git a/indexer/interfaces/interfaces.go b/indexer/interfaces/interfaces.go index 94ed2af..ad3f42c 100644 --- a/indexer/interfaces/interfaces.go +++ b/indexer/interfaces/interfaces.go @@ -20,14 +20,18 @@ import ( "math/big" "time" + "github.com/cerc-io/plugeth-statediff/indexer/models" "github.com/cerc-io/plugeth-statediff/indexer/shared" sdtypes "github.com/cerc-io/plugeth-statediff/types" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" ) // StateDiffIndexer interface required to index statediff data type StateDiffIndexer interface { + DetectGaps(beginBlock uint64, endBlock uint64) ([]*BlockGap, error) + CurrentBlock() (*models.HeaderModel, error) HasBlock(hash common.Hash, number uint64) (bool, error) PushBlock(block *types.Block, receipts types.Receipts, totalDifficulty *big.Int) (Batch, error) PushStateNode(tx Batch, stateNode sdtypes.StateLeafNode, headerID string) error @@ -53,3 +57,9 @@ type Batch interface { type Config interface { Type() shared.DBType } + +// Used to represent a gap in statediffed blocks +type BlockGap struct { + FirstMissing uint64 `json:"firstMissing"` + LastMissing uint64 `json:"lastMissing"` +} diff --git a/main/flags.go b/main/flags.go index 791e062..1e4a627 100644 --- a/main/flags.go +++ b/main/flags.go @@ -52,6 +52,14 @@ func init() { "statediff.waitforsync", false, "Should the statediff service wait for geth to catch up to the head of the chain?", ) + Flags.Uint64Var(&config.BackfillCheckPastBlocks, + "statediff.backfillcheckpastblocks", 7200, + "Number of blocks behind the startup statediff position to check (and fill) for gaps when head tracking", + ) + Flags.Uint64Var(&config.BackfillMaxHeadGap, + "statediff.backfillmaxheadgap", 7200, + "Maximum gap between the startup statediff and startup head positions that can be backfilled", + ) Flags.Var(&dbType, "statediff.db.type", diff --git a/service.go b/service.go index 146e32d..61c7e5b 100644 --- a/service.go +++ b/service.go @@ -18,9 +18,11 @@ package statediff import ( "bytes" + "encoding/json" "errors" "fmt" "math/big" + "strconv" "strings" "sync" "sync/atomic" @@ -80,6 +82,9 @@ type Service struct { enableWriteLoop bool // Parameters to use in the service write loop, if enabled writeLoopParams ParamsWithMutex + // Settings to use for backfilling state diffs (plugging gaps when tracking head) + backfillMaxHeadGap uint64 + backfillCheckPastBlocks uint64 // Size of the worker pool numWorkers uint // Number of retry for aborted transactions due to deadlock. @@ -157,22 +162,24 @@ func NewService(cfg Config, blockChain BlockChain, backend plugeth.Backend, inde quitCh := make(chan bool) sds := &Service{ - BlockChain: blockChain, - Builder: NewBuilder(blockChain.StateCache()), - QuitChan: quitCh, - Subscriptions: make(map[common.Hash]map[SubID]Subscription), - SubscriptionTypes: make(map[common.Hash]Params), - BlockCache: NewBlockCache(workers), - BackendAPI: backend, - ShouldWaitForSync: cfg.WaitForSync, - indexer: indexer, - enableWriteLoop: cfg.EnableWriteLoop, - numWorkers: workers, - maxRetry: defaultRetryLimit, - jobStatusSubs: map[SubID]jobStatusSubscription{}, - currentJobs: map[uint64]JobID{}, - currentBlocks: map[string]bool{}, - writeLoopParams: ParamsWithMutex{Params: defaultWriteLoopParams}, + BlockChain: blockChain, + Builder: NewBuilder(blockChain.StateCache()), + QuitChan: quitCh, + Subscriptions: make(map[common.Hash]map[SubID]Subscription), + SubscriptionTypes: make(map[common.Hash]Params), + BlockCache: NewBlockCache(workers), + BackendAPI: backend, + ShouldWaitForSync: cfg.WaitForSync, + indexer: indexer, + enableWriteLoop: cfg.EnableWriteLoop, + backfillMaxHeadGap: cfg.BackfillMaxHeadGap, + backfillCheckPastBlocks: cfg.BackfillCheckPastBlocks, + numWorkers: workers, + maxRetry: defaultRetryLimit, + jobStatusSubs: map[SubID]jobStatusSubscription{}, + currentJobs: map[uint64]JobID{}, + currentBlocks: map[string]bool{}, + writeLoopParams: ParamsWithMutex{Params: defaultWriteLoopParams}, } if indexer != nil { @@ -555,6 +562,8 @@ func (sds *Service) Start() error { go sds.PublishLoop(chainEventCh) if sds.enableWriteLoop { + log.Info("Starting statediff DB backfill", "params", sds.writeLoopParams.Params) + go sds.Backfill() log.Debug("Starting statediff DB write loop", "params", sds.writeLoopParams.Params) chainEventCh := make(chan core.ChainEvent, chainEventChanSize) go sds.WriteLoop(chainEventCh) @@ -916,3 +925,149 @@ func MapWatchAddressArgsToAddresses(args []types2.WatchAddressArg) ([]common.Add return addresses, nil } + +// Backfill is executed on startup to make sure there are no gaps in the recent past when tracking head. +func (sds *Service) Backfill() { + chainBlock := sds.BlockChain.CurrentBlock() + if nil == chainBlock { + log.Info("Backfill: No previous chain block, nothing to backfill.") + return + } + + chainBlockNumber := chainBlock.Number.Uint64() + if chainBlockNumber == 0 { + log.Info("Backfill: At start of chain, nothing to backfill.") + return + } + + indexerBlock, err := sds.indexer.CurrentBlock() + if nil == indexerBlock { + log.Info("Backfill: No previous indexer block, nothing to backfill.") + return + } + if nil != err { + log.Error("Backfill error", err) + return + } + + indexerBlockNumber, err := strconv.ParseUint(indexerBlock.BlockNumber, 10, 64) + if nil != err { + log.Error("Backfill error", err) + return + } + + headGap := chainBlockNumber - indexerBlockNumber + log.Info( + "Backfill: initial positions", + "chain", chainBlockNumber, + "indexer", indexerBlockNumber, + "headGap", headGap, + ) + + if sds.backfillMaxHeadGap > 0 && headGap > 0 { + if headGap < sds.backfillMaxHeadGap { + sds.backfillHeadGap(indexerBlockNumber, chainBlockNumber) + log.Info("Backfill: all workers done filling headGap.") + } else { + log.Error("Backfill: headGap too large to fill.") + } + } + + if sds.backfillCheckPastBlocks > 0 { + var gapCheckBeginNumber uint64 = 0 + if indexerBlockNumber > sds.backfillCheckPastBlocks { + gapCheckBeginNumber = indexerBlockNumber - sds.backfillCheckPastBlocks + } + blockGaps, err := sds.indexer.DetectGaps(gapCheckBeginNumber, chainBlockNumber) + if nil != err { + log.Error("Backfill error", err) + return + } + + if nil != blockGaps && len(blockGaps) > 0 { + gapsMsg, _ := json.Marshal(blockGaps) + log.Info("Backfill: detected gaps in range", "begin", gapCheckBeginNumber, "end", chainBlockNumber, "gaps", string(gapsMsg)) + sds.backfillDetectedGaps(blockGaps) + log.Info("Backfill: done processing detected gaps in range", "begin", gapCheckBeginNumber, "end", chainBlockNumber, "gaps", string(gapsMsg)) + } else { + log.Info("Backfill: no gaps detected in range", "begin", gapCheckBeginNumber, "end", chainBlockNumber) + } + } +} + +// backfillHeadGap fills in any gap between the statediff position and the chain position at startup. +// A gap can be created in this way if there is some problem in statediffing (eg, DB connectivity is lost, +// while the chain keeps syncing), if the process is terminated with a statediff in-flight, etc. +func (sds *Service) backfillHeadGap(indexerBlockNumber uint64, chainBlockNumber uint64) { + var ch = make(chan uint64) + var wg sync.WaitGroup + for i := uint(0); i < sds.numWorkers; i++ { + wg.Add(1) + go func(w uint) { + defer wg.Done() + for { + select { + case num, ok := <-ch: + if !ok { + log.Info("Backfill: headGap done", "worker", w) + return + } + log.Info("Backfill: backfilling head gap", "block", num, "worker", w) + err := sds.writeStateDiffAt(num, sds.writeLoopParams.Params) + if err != nil { + log.Error("Backfill error", err) + } + case <-sds.QuitChan: + log.Info("Backfill: quitting before finish", "worker", w) + return + } + } + }(i) + } + + for bn := indexerBlockNumber + 1; bn <= chainBlockNumber; bn++ { + ch <- bn + } + close(ch) + wg.Wait() +} + +// backfillDetectedGaps fills gaps which have occurred in the recent past. These gaps can happen because of +// transient errors, such as DB errors that are later corrected (so head statediffing continues, but with a hole) +// a missed ChainEvent (happens sometimes when debugging), or if the process is terminated when an earlier block +// is still in-flight, but a later block was already written. +func (sds *Service) backfillDetectedGaps(blockGaps []*interfaces.BlockGap) { + var ch = make(chan uint64) + var wg sync.WaitGroup + for i := uint(0); i < sds.numWorkers; i++ { + wg.Add(1) + go func(w uint) { + defer wg.Done() + for { + select { + case num, ok := <-ch: + if !ok { + log.Info("Backfill: detected gap fill done", "worker", w) + return + } + log.Info("Backfill: backfilling detected gap", "block", num, "worker", w) + err := sds.writeStateDiffAt(num, sds.writeLoopParams.Params) + if err != nil { + log.Error("Backfill error: ", err) + } + case <-sds.QuitChan: + log.Info("Backfill: quitting before finish", "worker", w) + return + } + } + }(i) + } + + for _, gap := range blockGaps { + for num := gap.FirstMissing; num <= gap.LastMissing; num++ { + ch <- num + } + } + close(ch) + wg.Wait() +} diff --git a/test_helpers/mocks/indexer.go b/test_helpers/mocks/indexer.go index 938b0ab..e4ca5fb 100644 --- a/test_helpers/mocks/indexer.go +++ b/test_helpers/mocks/indexer.go @@ -24,6 +24,7 @@ import ( "github.com/ethereum/go-ethereum/core/types" "github.com/cerc-io/plugeth-statediff/indexer/interfaces" + "github.com/cerc-io/plugeth-statediff/indexer/models" sdtypes "github.com/cerc-io/plugeth-statediff/types" ) @@ -33,6 +34,14 @@ var _ interfaces.Batch = &batch{} // StateDiffIndexer is a mock state diff indexer type StateDiffIndexer struct{} +func (sdi *StateDiffIndexer) DetectGaps(beginBlock uint64, endBlock uint64) ([]*interfaces.BlockGap, error) { + return nil, nil +} + +func (sdi *StateDiffIndexer) CurrentBlock() (*models.HeaderModel, error) { + return nil, nil +} + type batch struct{} func (sdi *StateDiffIndexer) HasBlock(hash common.Hash, number uint64) (bool, error) {