From 95942c335cc3d90ad326ebcd0a7d4a2771997617 Mon Sep 17 00:00:00 2001 From: Thomas E Lackey Date: Thu, 22 Jun 2023 01:34:48 -0500 Subject: [PATCH 01/11] Backfill gaps in the recent past when statediffing head. --- statediff/config.go | 4 + statediff/indexer/database/dump/indexer.go | 12 + statediff/indexer/database/file/indexer.go | 12 + ...atediffing_watched_addresses_test_file.csv | 0 statediff/indexer/database/sql/indexer.go | 10 + statediff/indexer/database/sql/interfaces.go | 1 + .../indexer/database/sql/postgres/database.go | 6 +- statediff/indexer/database/sql/writer.go | 50 +++- statediff/indexer/interfaces/interfaces.go | 10 + statediff/service.go | 221 +++++++++++++++--- 10 files changed, 288 insertions(+), 38 deletions(-) create mode 100644 statediff/indexer/database/file/mainnet_tests/statediffing_watched_addresses_test_file.csv diff --git a/statediff/config.go b/statediff/config.go index b036f769f..6b29631bc 100644 --- a/statediff/config.go +++ b/statediff/config.go @@ -36,6 +36,10 @@ type Config struct { ClientName string // Whether to enable writing state diffs directly to track blockchain head EnableWriteLoop bool + // The maximum number of blocks to backfill when tracking head. + BackfillMaxHeadGap uint64 + // The maximum number of blocks behind the startup position to check for gaps. + BackfillCheckPastBlocks uint64 // Size of the worker pool NumWorkers uint // Should the statediff service wait until geth has synced to the head of the blockchain? diff --git a/statediff/indexer/database/dump/indexer.go b/statediff/indexer/database/dump/indexer.go index 9b54cd699..608a3d382 100644 --- a/statediff/indexer/database/dump/indexer.go +++ b/statediff/indexer/database/dump/indexer.go @@ -418,6 +418,18 @@ func (sdi *StateDiffIndexer) HasBlock(hash common.Hash, number uint64) (bool, er return false, nil } +// CurrentBlock returns the HeaderModel of the highest existing block in the output. +// In the "dump" case, this is always nil. +func (sdi *StateDiffIndexer) CurrentBlock() (*models.HeaderModel, error) { + return nil, nil +} + +// DetectGaps returns a list of gaps in the output found within the specified block range. +// In the "dump" case this is always nil. +func (sdi *StateDiffIndexer) DetectGaps(beginBlockNumber uint64, endBlockNumber uint64) ([]*interfaces.BlockGap, error) { + return nil, nil +} + // Close satisfies io.Closer func (sdi *StateDiffIndexer) Close() error { return sdi.dump.Close() diff --git a/statediff/indexer/database/file/indexer.go b/statediff/indexer/database/file/indexer.go index 291aba16e..738b7d383 100644 --- a/statediff/indexer/database/file/indexer.go +++ b/statediff/indexer/database/file/indexer.go @@ -461,6 +461,18 @@ func (sdi *StateDiffIndexer) PushIPLD(batch interfaces.Batch, ipld sdtypes.IPLD) return nil } +// CurrentBlock returns the HeaderModel of the highest existing block in the output. +// In the "dump" case, this is always nil. +func (sdi *StateDiffIndexer) CurrentBlock() (*models.HeaderModel, error) { + return nil, nil +} + +// DetectGaps returns a list of gaps in the output found within the specified block range. +// In the "dump" case this is always nil. +func (sdi *StateDiffIndexer) DetectGaps(beginBlockNumber uint64, endBlockNumber uint64) ([]*interfaces.BlockGap, error) { + return nil, nil +} + // HasBlock checks whether the indicated block already exists in the output. // In the "file" case this is presumed to be false. func (sdi *StateDiffIndexer) HasBlock(hash common.Hash, number uint64) (bool, error) { diff --git a/statediff/indexer/database/file/mainnet_tests/statediffing_watched_addresses_test_file.csv b/statediff/indexer/database/file/mainnet_tests/statediffing_watched_addresses_test_file.csv new file mode 100644 index 000000000..e69de29bb diff --git a/statediff/indexer/database/sql/indexer.go b/statediff/indexer/database/sql/indexer.go index 48da1fee9..8089e2167 100644 --- a/statediff/indexer/database/sql/indexer.go +++ b/statediff/indexer/database/sql/indexer.go @@ -219,11 +219,21 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip return blockTx, err } +// CurrentBlock returns the HeaderModel of the highest existing block in the database. +func (sdi *StateDiffIndexer) CurrentBlock() (*models.HeaderModel, error) { + return sdi.dbWriter.maxHeader() +} + // HasBlock checks whether the indicated block already exists in the database. func (sdi *StateDiffIndexer) HasBlock(hash common.Hash, number uint64) (bool, error) { return sdi.dbWriter.hasHeader(hash, number) } +// DetectGaps returns a list of gaps in the database found within the specified block range. +func (sdi *StateDiffIndexer) DetectGaps(beginBlockNumber uint64, endBlockNumber uint64) ([]*interfaces.BlockGap, error) { + return sdi.dbWriter.detectGaps(beginBlockNumber, endBlockNumber) +} + // processHeader publishes and indexes a header IPLD in Postgres // it returns the headerID func (sdi *StateDiffIndexer) processHeader(tx *BatchTx, header *types.Header, headerNode ipld.IPLD, reward, td *big.Int) (string, error) { diff --git a/statediff/indexer/database/sql/interfaces.go b/statediff/indexer/database/sql/interfaces.go index 685a9e6e9..d59b603eb 100644 --- a/statediff/indexer/database/sql/interfaces.go +++ b/statediff/indexer/database/sql/interfaces.go @@ -45,6 +45,7 @@ type Driver interface { // Statements interface to accommodate different SQL query syntax type Statements interface { + MaxHeaderStm() string ExistsHeaderStm() string InsertHeaderStm() string InsertUncleStm() string diff --git a/statediff/indexer/database/sql/postgres/database.go b/statediff/indexer/database/sql/postgres/database.go index 173608b60..77190fa80 100644 --- a/statediff/indexer/database/sql/postgres/database.go +++ b/statediff/indexer/database/sql/postgres/database.go @@ -41,8 +41,12 @@ type DB struct { sql.Driver } +func (db *DB) MaxHeaderStm() string { + return fmt.Sprintf("SELECT block_number, block_hash, parent_hash, cid, td, node_ids, reward, state_root, tx_root, receipt_root, uncles_hash, bloom, timestamp, coinbase FROM %s ORDER BY block_number DESC LIMIT 1", schema.TableHeader.Name) +} + func (db *DB) ExistsHeaderStm() string { - return fmt.Sprintf("SELECT EXISTS(SELECT 1 from %s WHERE block_number = $1 AND block_hash = $2 LIMIT 1)", schema.TableHeader.Name) + return fmt.Sprintf("SELECT EXISTS(SELECT 1 from %s WHERE block_number = $1::BIGINT AND block_hash = $2::TEXT LIMIT 1)", schema.TableHeader.Name) } // InsertHeaderStm satisfies the sql.Statements interface diff --git a/statediff/indexer/database/sql/writer.go b/statediff/indexer/database/sql/writer.go index 113d2f48d..244c42395 100644 --- a/statediff/indexer/database/sql/writer.go +++ b/statediff/indexer/database/sql/writer.go @@ -20,6 +20,8 @@ import ( "fmt" "strconv" + "github.com/ethereum/go-ethereum/statediff/indexer/interfaces" + "github.com/ethereum/go-ethereum/common" "github.com/jackc/pgtype" @@ -48,11 +50,57 @@ func (w *Writer) Close() error { return w.db.Close() } +// hashHeader returns true if a matching hash+number record exists in the database, else false. func (w *Writer) hasHeader(blockHash common.Hash, blockNumber uint64) (exists bool, err error) { - err = w.db.QueryRow(w.db.Context(), w.db.ExistsHeaderStm(), blockNumber, blockHash.String()).Scan(&exists) + // pgx misdetects the parameter OIDs and selects int8, which can overflow. + // unfortunately there is no good place to override it, so it is safer to pass the uint64s as text + // and let PG handle the cast + err = w.db.QueryRow(w.db.Context(), w.db.ExistsHeaderStm(), strconv.FormatUint(blockNumber, 10), blockHash.String()).Scan(&exists) return exists, err } +// detectGaps returns a list of BlockGaps detected within the specified block range +// For example, if the database contains blocks the overall range 1000:2000, but is missing blocks 1110:1230 and 1380 +// it would return [{FirstMissing: 1110, LastMissing: 1230}, {FirstMissing: 1380, LastMissing: 1380}] +func (w *Writer) detectGaps(beginBlockNumber uint64, endBlockNumber uint64) ([]*interfaces.BlockGap, error) { + pgStm := "SELECT block_number + 1 AS first_missing, (next_bn - 1) AS last_missing FROM (SELECT block_number, LEAD(block_number) OVER (ORDER BY block_number) AS next_bn FROM eth.header_cids WHERE block_number >= $1::BIGINT AND block_number <= $2::BIGINT) h WHERE next_bn > block_number + 1" + var gaps []*interfaces.BlockGap + // pgx misdetects the parameter OIDs and selects int8, which can overflow. + // unfortunately there is no good place to override it, so it is safer to pass the uint64s as text + // and let PG handle the cast + err := w.db.Select(w.db.Context(), &gaps, pgStm, strconv.FormatUint(beginBlockNumber, 10), strconv.FormatUint(endBlockNumber, 10)) + return gaps, err +} + +/* +SELECT block_number, block_hash, parent_hash, cid, td, node_ids, reward, state_root, tx_root, receipt_root, uncles_hash, bloom, timestamp, coinbase FROM %s ORDER BY block_number DESC LIMIT 1 +*/ +func (w *Writer) maxHeader() (*models.HeaderModel, error) { + var model models.HeaderModel + var err error + var number, td, reward uint64 + err = w.db.QueryRow(w.db.Context(), w.db.MaxHeaderStm()).Scan( + &number, + &model.BlockHash, + &model.ParentHash, + &model.CID, + &td, + &model.NodeIDs, + &reward, + &model.StateRoot, + &model.TxRoot, + &model.RctRoot, + &model.UnclesHash, + &model.Bloom, + &model.Timestamp, + &model.Coinbase, + ) + model.BlockNumber = strconv.FormatUint(number, 10) + model.TotalDifficulty = strconv.FormatUint(td, 10) + model.Reward = strconv.FormatUint(reward, 10) + return &model, err +} + /* INSERT INTO eth.header_cids (block_number, block_hash, parent_hash, cid, td, node_ids, reward, state_root, tx_root, receipt_root, uncles_hash, bloom, timestamp, coinbase) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14) diff --git a/statediff/indexer/interfaces/interfaces.go b/statediff/indexer/interfaces/interfaces.go index 63d5bc353..c9907c088 100644 --- a/statediff/indexer/interfaces/interfaces.go +++ b/statediff/indexer/interfaces/interfaces.go @@ -21,6 +21,8 @@ import ( "math/big" "time" + "github.com/ethereum/go-ethereum/statediff/indexer/models" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/statediff/indexer/shared" @@ -29,6 +31,8 @@ import ( // StateDiffIndexer interface required to index statediff data type StateDiffIndexer interface { + DetectGaps(beginBlock uint64, endBlock uint64) ([]*BlockGap, error) + CurrentBlock() (*models.HeaderModel, error) HasBlock(hash common.Hash, number uint64) (bool, error) PushBlock(block *types.Block, receipts types.Receipts, totalDifficulty *big.Int) (Batch, error) PushStateNode(tx Batch, stateNode sdtypes.StateLeafNode, headerID string) error @@ -54,3 +58,9 @@ type Batch interface { type Config interface { Type() shared.DBType } + +// Used to represent a gap in statediffed blocks +type BlockGap struct { + FirstMissing uint64 `json:"firstMissing"` + LastMissing uint64 `json:"lastMissing"` +} diff --git a/statediff/service.go b/statediff/service.go index b6e0db961..62fcdb255 100644 --- a/statediff/service.go +++ b/statediff/service.go @@ -18,6 +18,7 @@ package statediff import ( "bytes" + "encoding/json" "fmt" "math/big" "strconv" @@ -137,6 +138,9 @@ type Service struct { indexer interfaces.StateDiffIndexer // Whether to enable writing state diffs directly to track blockchain head. enableWriteLoop bool + // Settings to use for backfilling state diffs (plugging gaps when tracking head) + backfillMaxHeadGap uint64 + backfillCheckPastBlocks uint64 // Size of the worker pool numWorkers uint // Number of retry for aborted transactions due to deadlock. @@ -211,24 +215,26 @@ func New(stack *node.Node, ethServ *eth.Ethereum, cfg *ethconfig.Config, params } sds := &Service{ - Mutex: sync.Mutex{}, - BlockChain: blockChain, - Builder: NewBuilder(blockChain.StateCache()), - QuitChan: quitCh, - Subscriptions: make(map[common.Hash]map[rpc.ID]Subscription), - SubscriptionTypes: make(map[common.Hash]Params), - BlockCache: NewBlockCache(workers), - BackendAPI: backend, - WaitForSync: params.WaitForSync, - indexer: indexer, - enableWriteLoop: params.EnableWriteLoop, - numWorkers: workers, - maxRetry: defaultRetryLimit, - jobStatusSubs: map[rpc.ID]statusSubscription{}, - currentJobs: map[uint64]JobID{}, - currentJobsMutex: sync.Mutex{}, - currentBlocks: map[string]bool{}, - currentBlocksMutex: sync.Mutex{}, + Mutex: sync.Mutex{}, + BlockChain: blockChain, + Builder: NewBuilder(blockChain.StateCache()), + QuitChan: quitCh, + Subscriptions: make(map[common.Hash]map[rpc.ID]Subscription), + SubscriptionTypes: make(map[common.Hash]Params), + BlockCache: NewBlockCache(workers), + BackendAPI: backend, + WaitForSync: params.WaitForSync, + indexer: indexer, + enableWriteLoop: params.EnableWriteLoop, + backfillMaxHeadGap: params.BackfillMaxHeadGap, + backfillCheckPastBlocks: params.BackfillCheckPastBlocks, + numWorkers: workers, + maxRetry: defaultRetryLimit, + jobStatusSubs: map[rpc.ID]statusSubscription{}, + currentJobs: map[uint64]JobID{}, + currentJobsMutex: sync.Mutex{}, + currentBlocks: map[string]bool{}, + currentBlocksMutex: sync.Mutex{}, } stack.RegisterLifecycle(sds) stack.RegisterAPIs(sds.APIs()) @@ -251,24 +257,26 @@ func NewService(blockChain blockChain, cfg Config, backend ethapi.Backend, index quitCh := make(chan bool) sds := &Service{ - Mutex: sync.Mutex{}, - BlockChain: blockChain, - Builder: NewBuilder(blockChain.StateCache()), - QuitChan: quitCh, - Subscriptions: make(map[common.Hash]map[rpc.ID]Subscription), - SubscriptionTypes: make(map[common.Hash]Params), - BlockCache: NewBlockCache(workers), - BackendAPI: backend, - WaitForSync: cfg.WaitForSync, - indexer: indexer, - enableWriteLoop: cfg.EnableWriteLoop, - numWorkers: workers, - maxRetry: defaultRetryLimit, - jobStatusSubs: map[rpc.ID]statusSubscription{}, - currentJobs: map[uint64]JobID{}, - currentJobsMutex: sync.Mutex{}, - currentBlocks: map[string]bool{}, - currentBlocksMutex: sync.Mutex{}, + Mutex: sync.Mutex{}, + BlockChain: blockChain, + Builder: NewBuilder(blockChain.StateCache()), + QuitChan: quitCh, + Subscriptions: make(map[common.Hash]map[rpc.ID]Subscription), + SubscriptionTypes: make(map[common.Hash]Params), + BlockCache: NewBlockCache(workers), + BackendAPI: backend, + WaitForSync: cfg.WaitForSync, + indexer: indexer, + enableWriteLoop: cfg.EnableWriteLoop, + backfillMaxHeadGap: cfg.BackfillMaxHeadGap, + backfillCheckPastBlocks: cfg.BackfillCheckPastBlocks, + numWorkers: workers, + maxRetry: defaultRetryLimit, + jobStatusSubs: map[rpc.ID]statusSubscription{}, + currentJobs: map[uint64]JobID{}, + currentJobsMutex: sync.Mutex{}, + currentBlocks: map[string]bool{}, + currentBlocksMutex: sync.Mutex{}, } if indexer != nil { @@ -319,6 +327,145 @@ type workerParams struct { id uint } +func (sds *Service) backfillDetectedGaps(blockGaps []*interfaces.BlockGap) { + var ch = make(chan uint64) + var wg sync.WaitGroup + for i := uint(0); i < sds.numWorkers; i++ { + wg.Add(1) + go func(w uint) { + defer wg.Done() + for { + select { + case num, ok := <-ch: + if !ok { + log.Info("Backfill: detected gap fill done", "worker", w) + return + } + log.Info("Backfill: backfilling detected gap", "block", num, "worker", w) + err := sds.writeStateDiffAt(num, writeLoopParams.Params) + if err != nil { + log.Error("Backfill error: " + err.Error()) + } + case <-sds.QuitChan: + log.Info("Backfill: quitting before finish", "worker", w) + return + } + } + }(i) + } + + for _, gap := range blockGaps { + for num := gap.FirstMissing; num <= gap.LastMissing; num++ { + ch <- num + } + } + close(ch) + wg.Wait() +} + +func (sds *Service) backfillHeadGap(indexerBlockNumber uint64, chainBlockNumber uint64) { + headGap := chainBlockNumber - indexerBlockNumber + var ch = make(chan uint64, headGap) + for bn := indexerBlockNumber; bn < chainBlockNumber; bn++ { + ch <- bn + } + + var wg sync.WaitGroup + for i := uint(0); i < sds.numWorkers; i++ { + wg.Add(1) + go func(w uint) { + defer wg.Done() + for { + select { + case num, ok := <-ch: + if !ok { + log.Info("Backfill: headGap done", "worker", w) + return + } + log.Info("Backfill: backfilling head gap", "block", num, "worker", w) + err := sds.writeStateDiffAt(num, writeLoopParams.Params) + if err != nil { + log.Error("Backfill error: " + err.Error()) + } + case <-sds.QuitChan: + log.Info("Backfill: quitting before finish", "worker", w) + return + } + } + }(i) + } + close(ch) + wg.Wait() +} + +func (sds *Service) Backfill() { + chainBlock := sds.BlockChain.CurrentBlock() + if nil == chainBlock { + log.Info("Backfill: No previous chain block, nothing to backfill.") + return + } + + chainBlockNumber := chainBlock.Number.Uint64() + if chainBlockNumber == 0 { + log.Info("Backfill: At start of chain, nothing to backfill.") + return + } + + indexerBlock, err := sds.indexer.CurrentBlock() + if nil == indexerBlock { + log.Info("Backfill: No previous indexer block, nothing to backfill.") + return + } + if nil != err { + log.Error("Backfill error: " + err.Error()) + return + } + + indexerBlockNumber, err := strconv.ParseUint(indexerBlock.BlockNumber, 10, 64) + if nil != err { + log.Error("Backfill error: " + err.Error()) + return + } + + headGap := chainBlockNumber - indexerBlockNumber + log.Info( + "Backfill: initial positions", + "chain", chainBlockNumber, + "indexer", indexerBlockNumber, + "headGap", headGap, + ) + + if sds.backfillMaxHeadGap > 0 && headGap > 0 { + if headGap < sds.backfillMaxHeadGap { + sds.backfillHeadGap(indexerBlockNumber, chainBlockNumber) + log.Info("Backfill: all workers done filling headGap.") + } else { + log.Error("Backfill: headGap too large to fill.") + } + } + + if sds.backfillCheckPastBlocks > 0 { + var gapCheckBeginNumber uint64 = 0 + if indexerBlockNumber > sds.backfillCheckPastBlocks { + gapCheckBeginNumber = indexerBlockNumber - sds.backfillCheckPastBlocks + } + blockGaps, err := sds.indexer.DetectGaps(gapCheckBeginNumber, chainBlockNumber) + if nil != err { + log.Error("Backfill error: " + err.Error()) + return + } + + if nil != blockGaps && len(blockGaps) > 0 { + gapsMsg, _ := json.Marshal(blockGaps) + log.Info("Backfill: detected gaps in range", "begin", gapCheckBeginNumber, "end", chainBlockNumber, "gaps", string(gapsMsg)) + sds.backfillDetectedGaps(blockGaps) + log.Info("Backfill: done processing detected gaps in range", "begin", gapCheckBeginNumber, "end", chainBlockNumber, "gaps", string(gapsMsg)) + } else { + log.Info("Backfill: no gaps detected in range", "begin", gapCheckBeginNumber, "end", chainBlockNumber) + } + } +} + func (sds *Service) WriteLoop(chainEventCh chan core.ChainEvent) { chainEventSub := sds.BlockChain.SubscribeChainEvent(chainEventCh) defer chainEventSub.Unsubscribe() @@ -692,6 +839,8 @@ func (sds *Service) Start() error { go sds.Loop(chainEventCh) if sds.enableWriteLoop { + log.Info("Starting statediff DB backfill", "params", writeLoopParams.Params) + go sds.Backfill() log.Info("Starting statediff DB write loop", "params", writeLoopParams.Params) chainEventCh := make(chan core.ChainEvent, chainEventChanSize) go sds.WriteLoop(chainEventCh) -- 2.45.2 From ab7e807afe8d617a16209a4f37a807a37c8575f6 Mon Sep 17 00:00:00 2001 From: Thomas E Lackey Date: Thu, 22 Jun 2023 01:35:15 -0500 Subject: [PATCH 02/11] Remove file --- .../mainnet_tests/statediffing_watched_addresses_test_file.csv | 0 1 file changed, 0 insertions(+), 0 deletions(-) delete mode 100644 statediff/indexer/database/file/mainnet_tests/statediffing_watched_addresses_test_file.csv diff --git a/statediff/indexer/database/file/mainnet_tests/statediffing_watched_addresses_test_file.csv b/statediff/indexer/database/file/mainnet_tests/statediffing_watched_addresses_test_file.csv deleted file mode 100644 index e69de29bb..000000000 -- 2.45.2 From 352b6d02986153ccbb0a1fd3ea3a634eaaca1d7d Mon Sep 17 00:00:00 2001 From: Thomas E Lackey Date: Thu, 22 Jun 2023 11:57:57 -0500 Subject: [PATCH 03/11] Add flags. --- cmd/geth/config.go | 16 +++++++++------- cmd/geth/main.go | 2 ++ cmd/utils/flags.go | 10 ++++++++++ 3 files changed, 21 insertions(+), 7 deletions(-) diff --git a/cmd/geth/config.go b/cmd/geth/config.go index 36724cb17..384a2bd2b 100644 --- a/cmd/geth/config.go +++ b/cmd/geth/config.go @@ -273,13 +273,15 @@ func makeFullNode(ctx *cli.Context) (*node.Node, ethapi.Backend) { } } p := statediff.Config{ - IndexerConfig: indexerConfig, - ID: nodeID, - ClientName: clientName, - Context: context.Background(), - EnableWriteLoop: ctx.Bool(utils.StateDiffWritingFlag.Name), - NumWorkers: ctx.Uint(utils.StateDiffWorkersFlag.Name), - WaitForSync: ctx.Bool(utils.StateDiffWaitForSync.Name), + IndexerConfig: indexerConfig, + ID: nodeID, + ClientName: clientName, + Context: context.Background(), + EnableWriteLoop: ctx.Bool(utils.StateDiffWritingFlag.Name), + NumWorkers: ctx.Uint(utils.StateDiffWorkersFlag.Name), + WaitForSync: ctx.Bool(utils.StateDiffWaitForSync.Name), + BackfillCheckPastBlocks: ctx.Uint64(utils.StateDiffBackfillCheckPastBlocks.Name), + BackfillMaxHeadGap: ctx.Uint64(utils.StateDiffBackfillMaxHeadGap.Name), } utils.RegisterStateDiffService(stack, eth, &cfg.Eth, p, backend) } diff --git a/cmd/geth/main.go b/cmd/geth/main.go index 750136289..c193c7c39 100644 --- a/cmd/geth/main.go +++ b/cmd/geth/main.go @@ -176,6 +176,8 @@ var ( utils.StateDiffUpsert, utils.StateDiffLogStatements, utils.StateDiffCopyFrom, + utils.StateDiffBackfillCheckPastBlocks, + utils.StateDiffBackfillMaxHeadGap, configFileFlag, }, utils.NetworkFlags, utils.DatabasePathFlags) diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index 7c4f957a3..7cd724994 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -1120,6 +1120,16 @@ Please note that --` + MetricsHTTPFlag.Name + ` must be set to start the server. Name: "statediff.waitforsync", Usage: "Should the statediff service wait for geth to catch up to the head of the chain?", } + StateDiffBackfillCheckPastBlocks = &cli.Uint64Flag{ + Name: "statediff.backfillcheckpastblocks", + Usage: "The number of blocks behind the startup statediff position to check (and fill) for gaps when head tracking.", + Value: 7200, + } + StateDiffBackfillMaxHeadGap = &cli.Uint64Flag{ + Name: "statediff.backfillmaxheadgap", + Usage: "The maximum gap between the startup statediff and startup head positions that can be backfilled.", + Value: 7200, + } ) var ( -- 2.45.2 From 3ad2bcd9a0bc65b162d2a39d27a1ea456a795234 Mon Sep 17 00:00:00 2001 From: Thomas E Lackey Date: Thu, 22 Jun 2023 11:59:01 -0500 Subject: [PATCH 04/11] Move the SQL statement. --- statediff/indexer/database/sql/interfaces.go | 1 + statediff/indexer/database/sql/postgres/database.go | 7 +++++++ statediff/indexer/database/sql/writer.go | 10 ++++------ 3 files changed, 12 insertions(+), 6 deletions(-) diff --git a/statediff/indexer/database/sql/interfaces.go b/statediff/indexer/database/sql/interfaces.go index d59b603eb..bb912cbc7 100644 --- a/statediff/indexer/database/sql/interfaces.go +++ b/statediff/indexer/database/sql/interfaces.go @@ -45,6 +45,7 @@ type Driver interface { // Statements interface to accommodate different SQL query syntax type Statements interface { + DetectGapsStm() string MaxHeaderStm() string ExistsHeaderStm() string InsertHeaderStm() string diff --git a/statediff/indexer/database/sql/postgres/database.go b/statediff/indexer/database/sql/postgres/database.go index 77190fa80..0358e566f 100644 --- a/statediff/indexer/database/sql/postgres/database.go +++ b/statediff/indexer/database/sql/postgres/database.go @@ -41,14 +41,21 @@ type DB struct { sql.Driver } +// MaxHeaderStm satisfies the sql.Statements interface func (db *DB) MaxHeaderStm() string { return fmt.Sprintf("SELECT block_number, block_hash, parent_hash, cid, td, node_ids, reward, state_root, tx_root, receipt_root, uncles_hash, bloom, timestamp, coinbase FROM %s ORDER BY block_number DESC LIMIT 1", schema.TableHeader.Name) } +// ExistsHeaderStm satisfies the sql.Statements interface func (db *DB) ExistsHeaderStm() string { return fmt.Sprintf("SELECT EXISTS(SELECT 1 from %s WHERE block_number = $1::BIGINT AND block_hash = $2::TEXT LIMIT 1)", schema.TableHeader.Name) } +// DetectGapsStm satisfies the sql.Statements interface +func (db *DB) DetectGapsStm() string { + return fmt.Sprintf("SELECT block_number + 1 AS first_missing, (next_bn - 1) AS last_missing FROM (SELECT block_number, LEAD(block_number) OVER (ORDER BY block_number) AS next_bn FROM %s WHERE block_number >= $1::BIGINT AND block_number <= $2::BIGINT) h WHERE next_bn > block_number + 1", schema.TableHeader.Name) +} + // InsertHeaderStm satisfies the sql.Statements interface // Stm == Statement func (db *DB) InsertHeaderStm() string { diff --git a/statediff/indexer/database/sql/writer.go b/statediff/indexer/database/sql/writer.go index 244c42395..ba9407e0a 100644 --- a/statediff/indexer/database/sql/writer.go +++ b/statediff/indexer/database/sql/writer.go @@ -50,7 +50,7 @@ func (w *Writer) Close() error { return w.db.Close() } -// hashHeader returns true if a matching hash+number record exists in the database, else false. +// hasHeader returns true if a matching hash+number record exists in the database, else false. func (w *Writer) hasHeader(blockHash common.Hash, blockNumber uint64) (exists bool, err error) { // pgx misdetects the parameter OIDs and selects int8, which can overflow. // unfortunately there is no good place to override it, so it is safer to pass the uint64s as text @@ -63,18 +63,16 @@ func (w *Writer) hasHeader(blockHash common.Hash, blockNumber uint64) (exists bo // For example, if the database contains blocks the overall range 1000:2000, but is missing blocks 1110:1230 and 1380 // it would return [{FirstMissing: 1110, LastMissing: 1230}, {FirstMissing: 1380, LastMissing: 1380}] func (w *Writer) detectGaps(beginBlockNumber uint64, endBlockNumber uint64) ([]*interfaces.BlockGap, error) { - pgStm := "SELECT block_number + 1 AS first_missing, (next_bn - 1) AS last_missing FROM (SELECT block_number, LEAD(block_number) OVER (ORDER BY block_number) AS next_bn FROM eth.header_cids WHERE block_number >= $1::BIGINT AND block_number <= $2::BIGINT) h WHERE next_bn > block_number + 1" var gaps []*interfaces.BlockGap // pgx misdetects the parameter OIDs and selects int8, which can overflow. // unfortunately there is no good place to override it, so it is safer to pass the uint64s as text // and let PG handle the cast - err := w.db.Select(w.db.Context(), &gaps, pgStm, strconv.FormatUint(beginBlockNumber, 10), strconv.FormatUint(endBlockNumber, 10)) + err := w.db.Select(w.db.Context(), &gaps, w.db.DetectGapsStm(), strconv.FormatUint(beginBlockNumber, 10), strconv.FormatUint(endBlockNumber, 10)) return gaps, err } -/* -SELECT block_number, block_hash, parent_hash, cid, td, node_ids, reward, state_root, tx_root, receipt_root, uncles_hash, bloom, timestamp, coinbase FROM %s ORDER BY block_number DESC LIMIT 1 -*/ +// maxHeader returns the header for the highest block number in the database. +// SELECT block_number, block_hash, parent_hash, cid, td, node_ids, reward, state_root, tx_root, receipt_root, uncles_hash, bloom, timestamp, coinbase FROM %s ORDER BY block_number DESC LIMIT 1 func (w *Writer) maxHeader() (*models.HeaderModel, error) { var model models.HeaderModel var err error -- 2.45.2 From 2bf4c912694854226dfb360d02df4e6b7f6206ac Mon Sep 17 00:00:00 2001 From: Thomas E Lackey Date: Thu, 22 Jun 2023 12:54:50 -0500 Subject: [PATCH 05/11] Fix mock --- statediff/test_helpers/mocks/indexer.go | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/statediff/test_helpers/mocks/indexer.go b/statediff/test_helpers/mocks/indexer.go index 767f436f1..6ab19f0da 100644 --- a/statediff/test_helpers/mocks/indexer.go +++ b/statediff/test_helpers/mocks/indexer.go @@ -17,6 +17,7 @@ package mocks import ( + "github.com/ethereum/go-ethereum/statediff/indexer/models" "math/big" "time" @@ -32,6 +33,14 @@ var _ interfaces.Batch = &batch{} // StateDiffIndexer is a mock state diff indexer type StateDiffIndexer struct{} +func (sdi *StateDiffIndexer) DetectGaps(beginBlock uint64, endBlock uint64) ([]*interfaces.BlockGap, error) { + return nil, nil +} + +func (sdi *StateDiffIndexer) CurrentBlock() (*models.HeaderModel, error) { + return nil, nil +} + type batch struct{} func (sdi *StateDiffIndexer) HasBlock(hash common.Hash, number uint64) (bool, error) { -- 2.45.2 From b091f3382e0e53885aaab2ea0781f1f4a91c5776 Mon Sep 17 00:00:00 2001 From: Thomas E Lackey Date: Thu, 22 Jun 2023 13:32:57 -0500 Subject: [PATCH 06/11] Refactor and add comments. --- statediff/service.go | 286 ++++++++++++++++++++++--------------------- 1 file changed, 147 insertions(+), 139 deletions(-) diff --git a/statediff/service.go b/statediff/service.go index 62fcdb255..78d57814a 100644 --- a/statediff/service.go +++ b/statediff/service.go @@ -327,145 +327,6 @@ type workerParams struct { id uint } -func (sds *Service) backfillDetectedGaps(blockGaps []*interfaces.BlockGap) { - var ch = make(chan uint64) - var wg sync.WaitGroup - for i := uint(0); i < sds.numWorkers; i++ { - wg.Add(1) - go func(w uint) { - defer wg.Done() - for { - select { - case num, ok := <-ch: - if !ok { - log.Info("Backfill: detected gap fill done", "worker", w) - return - } - log.Info("Backfill: backfilling detected gap", "block", num, "worker", w) - err := sds.writeStateDiffAt(num, writeLoopParams.Params) - if err != nil { - log.Error("Backfill error: " + err.Error()) - } - case <-sds.QuitChan: - log.Info("Backfill: quitting before finish", "worker", w) - return - } - } - }(i) - } - - for _, gap := range blockGaps { - for num := gap.FirstMissing; num <= gap.LastMissing; num++ { - ch <- num - } - } - close(ch) - wg.Wait() -} - -func (sds *Service) backfillHeadGap(indexerBlockNumber uint64, chainBlockNumber uint64) { - headGap := chainBlockNumber - indexerBlockNumber - var ch = make(chan uint64, headGap) - for bn := indexerBlockNumber; bn < chainBlockNumber; bn++ { - ch <- bn - } - - var wg sync.WaitGroup - for i := uint(0); i < sds.numWorkers; i++ { - wg.Add(1) - go func(w uint) { - defer wg.Done() - for { - select { - case num, ok := <-ch: - if !ok { - log.Info("Backfill: headGap done", "worker", w) - return - } - log.Info("Backfill: backfilling head gap", "block", num, "worker", w) - err := sds.writeStateDiffAt(num, writeLoopParams.Params) - if err != nil { - log.Error("Backfill error: " + err.Error()) - } - case <-sds.QuitChan: - log.Info("Backfill: quitting before finish", "worker", w) - return - } - } - }(i) - } - close(ch) - wg.Wait() -} - -func (sds *Service) Backfill() { - chainBlock := sds.BlockChain.CurrentBlock() - if nil == chainBlock { - log.Info("Backfill: No previous chain block, nothing to backfill.") - return - } - - chainBlockNumber := chainBlock.Number.Uint64() - if chainBlockNumber == 0 { - log.Info("Backfill: At start of chain, nothing to backfill.") - return - } - - indexerBlock, err := sds.indexer.CurrentBlock() - if nil == indexerBlock { - log.Info("Backfill: No previous indexer block, nothing to backfill.") - return - } - if nil != err { - log.Error("Backfill error: " + err.Error()) - return - } - - indexerBlockNumber, err := strconv.ParseUint(indexerBlock.BlockNumber, 10, 64) - if nil != err { - log.Error("Backfill error: " + err.Error()) - return - } - - headGap := chainBlockNumber - indexerBlockNumber - log.Info( - "Backfill: initial positions", - "chain", chainBlockNumber, - "indexer", indexerBlockNumber, - "headGap", headGap, - ) - - if sds.backfillMaxHeadGap > 0 && headGap > 0 { - if headGap < sds.backfillMaxHeadGap { - sds.backfillHeadGap(indexerBlockNumber, chainBlockNumber) - log.Info("Backfill: all workers done filling headGap.") - } else { - log.Error("Backfill: headGap too large to fill.") - } - } - - if sds.backfillCheckPastBlocks > 0 { - var gapCheckBeginNumber uint64 = 0 - if indexerBlockNumber > sds.backfillCheckPastBlocks { - gapCheckBeginNumber = indexerBlockNumber - sds.backfillCheckPastBlocks - } - blockGaps, err := sds.indexer.DetectGaps(gapCheckBeginNumber, chainBlockNumber) - if nil != err { - log.Error("Backfill error: " + err.Error()) - return - } - - if nil != blockGaps && len(blockGaps) > 0 { - gapsMsg, _ := json.Marshal(blockGaps) - log.Info("Backfill: detected gaps in range", "begin", gapCheckBeginNumber, "end", chainBlockNumber, "gaps", string(gapsMsg)) - sds.backfillDetectedGaps(blockGaps) - log.Info("Backfill: done processing detected gaps in range", "begin", gapCheckBeginNumber, "end", chainBlockNumber, "gaps", string(gapsMsg)) - } else { - log.Info("Backfill: no gaps detected in range", "begin", gapCheckBeginNumber, "end", chainBlockNumber) - } - } -} - func (sds *Service) WriteLoop(chainEventCh chan core.ChainEvent) { chainEventSub := sds.BlockChain.SubscribeChainEvent(chainEventCh) defer chainEventSub.Unsubscribe() @@ -1250,3 +1111,150 @@ func MapWatchAddressArgsToAddresses(args []types2.WatchAddressArg) ([]common.Add return addresses, nil } + +// Backfill is executed on startup to make sure there are no gaps in the recent past when tracking head. +func (sds *Service) Backfill() { + chainBlock := sds.BlockChain.CurrentBlock() + if nil == chainBlock { + log.Info("Backfill: No previous chain block, nothing to backfill.") + return + } + + chainBlockNumber := chainBlock.Number.Uint64() + if chainBlockNumber == 0 { + log.Info("Backfill: At start of chain, nothing to backfill.") + return + } + + indexerBlock, err := sds.indexer.CurrentBlock() + if nil == indexerBlock { + log.Info("Backfill: No previous indexer block, nothing to backfill.") + return + } + if nil != err { + log.Error("Backfill error: " + err.Error()) + return + } + + indexerBlockNumber, err := strconv.ParseUint(indexerBlock.BlockNumber, 10, 64) + if nil != err { + log.Error("Backfill error: " + err.Error()) + return + } + + headGap := chainBlockNumber - indexerBlockNumber + log.Info( + "Backfill: initial positions", + "chain", chainBlockNumber, + "indexer", indexerBlockNumber, + "headGap", headGap, + ) + + if sds.backfillMaxHeadGap > 0 && headGap > 0 { + if headGap < sds.backfillMaxHeadGap { + sds.backfillHeadGap(indexerBlockNumber, chainBlockNumber) + log.Info("Backfill: all workers done filling headGap.") + } else { + log.Error("Backfill: headGap too large to fill.") + } + } + + if sds.backfillCheckPastBlocks > 0 { + var gapCheckBeginNumber uint64 = 0 + if indexerBlockNumber > sds.backfillCheckPastBlocks { + gapCheckBeginNumber = indexerBlockNumber - sds.backfillCheckPastBlocks + } + blockGaps, err := sds.indexer.DetectGaps(gapCheckBeginNumber, chainBlockNumber) + if nil != err { + log.Error("Backfill error: " + err.Error()) + return + } + + if nil != blockGaps && len(blockGaps) > 0 { + gapsMsg, _ := json.Marshal(blockGaps) + log.Info("Backfill: detected gaps in range", "begin", gapCheckBeginNumber, "end", chainBlockNumber, "gaps", string(gapsMsg)) + sds.backfillDetectedGaps(blockGaps) + log.Info("Backfill: done processing detected gaps in range", "begin", gapCheckBeginNumber, "end", chainBlockNumber, "gaps", string(gapsMsg)) + } else { + log.Info("Backfill: no gaps detected in range", "begin", gapCheckBeginNumber, "end", chainBlockNumber) + } + } +} + +// backfillHeadGap fills in any gap between the statediff position and the chain position at startup. +// A gap can be created in this way if there is some problem in statediffing (eg, DB connectivity is lost, +// while the chain keeps syncing), if the process is terminated with a statediff in-flight, etc. +func (sds *Service) backfillHeadGap(indexerBlockNumber uint64, chainBlockNumber uint64) { + headGap := chainBlockNumber - indexerBlockNumber + var ch = make(chan uint64, headGap) + for bn := indexerBlockNumber; bn < chainBlockNumber; bn++ { + ch <- bn + } + + var wg sync.WaitGroup + for i := uint(0); i < sds.numWorkers; i++ { + wg.Add(1) + go func(w uint) { + defer wg.Done() + for { + select { + case num, ok := <-ch: + if !ok { + log.Info("Backfill: headGap done", "worker", w) + return + } + log.Info("Backfill: backfilling head gap", "block", num, "worker", w) + err := sds.writeStateDiffAt(num, writeLoopParams.Params) + if err != nil { + log.Error("Backfill error: " + err.Error()) + } + case <-sds.QuitChan: + log.Info("Backfill: quitting before finish", "worker", w) + return + } + } + }(i) + } + close(ch) + wg.Wait() +} + +// backfillDetectedGaps fills gaps which have occurred in the recent past. These gaps can happen because of +// transient errors, such as DB errors that are later corrected (so head statediffing continues, but with a hole) +// a missed ChainEvent (happens sometimes when debugging), or if the process is terminated when an earlier block +// is still in-flight, but a later block was already written. +func (sds *Service) backfillDetectedGaps(blockGaps []*interfaces.BlockGap) { + var ch = make(chan uint64) + var wg sync.WaitGroup + for i := uint(0); i < sds.numWorkers; i++ { + wg.Add(1) + go func(w uint) { + defer wg.Done() + for { + select { + case num, ok := <-ch: + if !ok { + log.Info("Backfill: detected gap fill done", "worker", w) + return + } + log.Info("Backfill: backfilling detected gap", "block", num, "worker", w) + err := sds.writeStateDiffAt(num, writeLoopParams.Params) + if err != nil { + log.Error("Backfill error: " + err.Error()) + } + case <-sds.QuitChan: + log.Info("Backfill: quitting before finish", "worker", w) + return + } + } + }(i) + } + + for _, gap := range blockGaps { + for num := gap.FirstMissing; num <= gap.LastMissing; num++ { + ch <- num + } + } + close(ch) + wg.Wait() +} -- 2.45.2 From 99a91b28cd57853a8e3d01a38d58788efde0fd2d Mon Sep 17 00:00:00 2001 From: Thomas E Lackey Date: Thu, 22 Jun 2023 13:35:42 -0500 Subject: [PATCH 07/11] goimports --- statediff/test_helpers/mocks/indexer.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/statediff/test_helpers/mocks/indexer.go b/statediff/test_helpers/mocks/indexer.go index 6ab19f0da..3658db650 100644 --- a/statediff/test_helpers/mocks/indexer.go +++ b/statediff/test_helpers/mocks/indexer.go @@ -17,10 +17,11 @@ package mocks import ( - "github.com/ethereum/go-ethereum/statediff/indexer/models" "math/big" "time" + "github.com/ethereum/go-ethereum/statediff/indexer/models" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/statediff/indexer/interfaces" -- 2.45.2 From d80480df0f455d00259a9c8e2831c9361cb342cb Mon Sep 17 00:00:00 2001 From: Thomas E Lackey Date: Thu, 22 Jun 2023 14:21:09 -0500 Subject: [PATCH 08/11] Up to and including the current block... --- statediff/service.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/statediff/service.go b/statediff/service.go index 78d57814a..61ed75907 100644 --- a/statediff/service.go +++ b/statediff/service.go @@ -1187,7 +1187,7 @@ func (sds *Service) Backfill() { func (sds *Service) backfillHeadGap(indexerBlockNumber uint64, chainBlockNumber uint64) { headGap := chainBlockNumber - indexerBlockNumber var ch = make(chan uint64, headGap) - for bn := indexerBlockNumber; bn < chainBlockNumber; bn++ { + for bn := indexerBlockNumber; bn <= chainBlockNumber; bn++ { ch <- bn } -- 2.45.2 From 48a758840dde21aec7f84f584925ee707478f291 Mon Sep 17 00:00:00 2001 From: Thomas E Lackey Date: Fri, 23 Jun 2023 01:18:48 -0500 Subject: [PATCH 09/11] Fix channel size and minor refactor for consistency. --- statediff/service.go | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/statediff/service.go b/statediff/service.go index 61ed75907..753db3632 100644 --- a/statediff/service.go +++ b/statediff/service.go @@ -1185,12 +1185,7 @@ func (sds *Service) Backfill() { // A gap can be created in this way if there is some problem in statediffing (eg, DB connectivity is lost, // while the chain keeps syncing), if the process is terminated with a statediff in-flight, etc. func (sds *Service) backfillHeadGap(indexerBlockNumber uint64, chainBlockNumber uint64) { - headGap := chainBlockNumber - indexerBlockNumber - var ch = make(chan uint64, headGap) - for bn := indexerBlockNumber; bn <= chainBlockNumber; bn++ { - ch <- bn - } - + var ch = make(chan uint64) var wg sync.WaitGroup for i := uint(0); i < sds.numWorkers; i++ { wg.Add(1) @@ -1215,6 +1210,10 @@ func (sds *Service) backfillHeadGap(indexerBlockNumber uint64, chainBlockNumber } }(i) } + + for bn := indexerBlockNumber; bn <= chainBlockNumber; bn++ { + ch <- bn + } close(ch) wg.Wait() } -- 2.45.2 From 05c228e33a60e4c972e92d54d0ef95359a5b4b60 Mon Sep 17 00:00:00 2001 From: Thomas E Lackey Date: Fri, 23 Jun 2023 17:40:34 -0500 Subject: [PATCH 10/11] Review comments --- statediff/indexer/database/file/indexer.go | 4 ++-- statediff/indexer/database/file/statediffing_test_file.sql | 0 .../file/statediffing_watched_addresses_test_file.csv | 0 statediff/service.go | 2 +- 4 files changed, 3 insertions(+), 3 deletions(-) delete mode 100644 statediff/indexer/database/file/statediffing_test_file.sql delete mode 100644 statediff/indexer/database/file/statediffing_watched_addresses_test_file.csv diff --git a/statediff/indexer/database/file/indexer.go b/statediff/indexer/database/file/indexer.go index 738b7d383..3fb9b8d4d 100644 --- a/statediff/indexer/database/file/indexer.go +++ b/statediff/indexer/database/file/indexer.go @@ -462,13 +462,13 @@ func (sdi *StateDiffIndexer) PushIPLD(batch interfaces.Batch, ipld sdtypes.IPLD) } // CurrentBlock returns the HeaderModel of the highest existing block in the output. -// In the "dump" case, this is always nil. +// In the "file" case, this is always nil. func (sdi *StateDiffIndexer) CurrentBlock() (*models.HeaderModel, error) { return nil, nil } // DetectGaps returns a list of gaps in the output found within the specified block range. -// In the "dump" case this is always nil. +// In the "file" case this is always nil. func (sdi *StateDiffIndexer) DetectGaps(beginBlockNumber uint64, endBlockNumber uint64) ([]*interfaces.BlockGap, error) { return nil, nil } diff --git a/statediff/indexer/database/file/statediffing_test_file.sql b/statediff/indexer/database/file/statediffing_test_file.sql deleted file mode 100644 index e69de29bb..000000000 diff --git a/statediff/indexer/database/file/statediffing_watched_addresses_test_file.csv b/statediff/indexer/database/file/statediffing_watched_addresses_test_file.csv deleted file mode 100644 index e69de29bb..000000000 diff --git a/statediff/service.go b/statediff/service.go index 753db3632..64a7b0b95 100644 --- a/statediff/service.go +++ b/statediff/service.go @@ -1211,7 +1211,7 @@ func (sds *Service) backfillHeadGap(indexerBlockNumber uint64, chainBlockNumber }(i) } - for bn := indexerBlockNumber; bn <= chainBlockNumber; bn++ { + for bn := indexerBlockNumber + 1; bn <= chainBlockNumber; bn++ { ch <- bn } close(ch) -- 2.45.2 From 0efb8610e1904d61033a7c1ca4a595d5a3fb638c Mon Sep 17 00:00:00 2001 From: Thomas E Lackey Date: Fri, 23 Jun 2023 17:41:35 -0500 Subject: [PATCH 11/11] Don't change --- statediff/indexer/database/file/statediffing_test_file.sql | 0 .../database/file/statediffing_watched_addresses_test_file.csv | 0 2 files changed, 0 insertions(+), 0 deletions(-) create mode 100644 statediff/indexer/database/file/statediffing_test_file.sql create mode 100644 statediff/indexer/database/file/statediffing_watched_addresses_test_file.csv diff --git a/statediff/indexer/database/file/statediffing_test_file.sql b/statediff/indexer/database/file/statediffing_test_file.sql new file mode 100644 index 000000000..e69de29bb diff --git a/statediff/indexer/database/file/statediffing_watched_addresses_test_file.csv b/statediff/indexer/database/file/statediffing_watched_addresses_test_file.csv new file mode 100644 index 000000000..e69de29bb -- 2.45.2