diff --git a/.gitignore b/.gitignore index c6f7a73c8..6c7f48b51 100644 --- a/.gitignore +++ b/.gitignore @@ -53,4 +53,6 @@ foundry/projects/local-private-network/geth-linux-amd64 # Helpful repos related-repositories/foundry-test/** related-repositories/hive/** -related-repositories/ipld-eth-db/** \ No newline at end of file +related-repositories/ipld-eth-db/** +statediff/indexer/database/sql/statediffing_test_file.sql +statediff/statediffing_test_file.sql diff --git a/cmd/geth/config.go b/cmd/geth/config.go index 5dc7e7ea1..b2340e938 100644 --- a/cmd/geth/config.go +++ b/cmd/geth/config.go @@ -177,6 +177,7 @@ func makeFullNode(ctx *cli.Context) (*node.Node, ethapi.Backend) { if ctx.GlobalBool(utils.StateDiffFlag.Name) { var indexerConfig interfaces.Config + var fileConfig interfaces.Config var clientName, nodeID string if ctx.GlobalIsSet(utils.StateDiffWritingFlag.Name) { clientName = ctx.GlobalString(utils.StateDiffDBClientNameFlag.Name) @@ -191,6 +192,15 @@ func makeFullNode(ctx *cli.Context) (*node.Node, ethapi.Backend) { if err != nil { utils.Fatalf("%v", err) } + + if dbType != shared.FILE { + fileConfig = file.Config{ + FilePath: ctx.GlobalString(utils.StateDiffFilePath.Name), + } + } else { + fileConfig = nil + } + switch dbType { case shared.FILE: indexerConfig = file.Config{ @@ -253,6 +263,7 @@ func makeFullNode(ctx *cli.Context) (*node.Node, ethapi.Backend) { } p := statediff.Config{ IndexerConfig: indexerConfig, + FileConfig: fileConfig, ID: nodeID, ClientName: clientName, Context: context.Background(), diff --git a/statediff/config.go b/statediff/config.go index a83df3ca0..535c46485 100644 --- a/statediff/config.go +++ b/statediff/config.go @@ -26,7 +26,10 @@ import ( // Config contains instantiation parameters for the state diffing service type Config struct { + // The configuration used for the primary stateDiff Indexer IndexerConfig interfaces.Config + // The configuration used for the file stateDiff Indexer. This indexer is used if the primary indexer fails and is not a FILE indexer. + FileConfig interfaces.Config // A unique ID used for this service ID string // Name for the client this service is running diff --git a/statediff/indexer/constructor.go b/statediff/indexer/constructor.go index 3bcf0859b..f9d8920ce 100644 --- a/statediff/indexer/constructor.go +++ b/statediff/indexer/constructor.go @@ -35,11 +35,12 @@ import ( // You can specify the specific func NewStateDiffIndexer(ctx context.Context, chainConfig *params.ChainConfig, nodeInfo node.Info, config interfaces.Config, specificIndexer shared.DBType) (interfaces.StateDiffIndexer, error) { var indexerToCreate shared.DBType - if specificIndexer != "" { + if specificIndexer == "" { indexerToCreate = config.Type() } else { indexerToCreate = specificIndexer } + log.Info("Indexer to create is", "indexer", indexerToCreate) switch indexerToCreate { case shared.FILE: log.Info("Creating a statediff indexer in SQL file writing mode") diff --git a/statediff/indexer/database/dump/indexer.go b/statediff/indexer/database/dump/indexer.go index f391b34e2..921fafed9 100644 --- a/statediff/indexer/database/dump/indexer.go +++ b/statediff/indexer/database/dump/indexer.go @@ -497,9 +497,26 @@ func (sdi *StateDiffIndexer) Close() error { return sdi.dump.Close() } +// Only needed to satisfy use cases func (sdi *StateDiffIndexer) FindAndUpdateGaps(latestBlockOnChain *big.Int, expectedDifference *big.Int, processingKey int64, index interfaces.StateDiffIndexer) error { - return nil + log.Error("We can't find gaps in write mode!") + return fmt.Errorf("We can't find gaps in write mode!") } + +// Written but not tested. Unsure if there is a real use case for this anywhere. func (sdi *StateDiffIndexer) PushKnownGaps(startingBlockNumber *big.Int, endingBlockNumber *big.Int, checkedOut bool, processingKey int64, index interfaces.StateDiffIndexer) error { + log.Info("Dumping known gaps") + if startingBlockNumber.Cmp(endingBlockNumber) != -1 { + return fmt.Errorf("Starting Block %d, is greater than ending block %d", startingBlockNumber, endingBlockNumber) + } + knownGap := models.KnownGapsModel{ + StartingBlockNumber: startingBlockNumber.String(), + EndingBlockNumber: endingBlockNumber.String(), + CheckedOut: checkedOut, + ProcessingKey: processingKey, + } + if _, err := fmt.Fprintf(sdi.dump, "%+v\r\n", knownGap); err != nil { + return err + } return nil } diff --git a/statediff/indexer/database/file/indexer.go b/statediff/indexer/database/file/indexer.go index d58291825..f11bf8f28 100644 --- a/statediff/indexer/database/file/indexer.go +++ b/statediff/indexer/database/file/indexer.go @@ -480,7 +480,8 @@ func (sdi *StateDiffIndexer) Close() error { } func (sdi *StateDiffIndexer) FindAndUpdateGaps(latestBlockOnChain *big.Int, expectedDifference *big.Int, processingKey int64, indexer interfaces.StateDiffIndexer) error { - return nil + log.Error("We can't find gaps in write mode!") + return fmt.Errorf("We can't find gaps in write mode!") } func (sdi *StateDiffIndexer) PushKnownGaps(startingBlockNumber *big.Int, endingBlockNumber *big.Int, checkedOut bool, processingKey int64, indexer interfaces.StateDiffIndexer) error { diff --git a/statediff/indexer/database/file/writer.go b/statediff/indexer/database/file/writer.go index a8795946f..8010ec13d 100644 --- a/statediff/indexer/database/file/writer.go +++ b/statediff/indexer/database/file/writer.go @@ -263,7 +263,7 @@ func (sqw *SQLWriter) upsertStorageCID(storageCID models.StorageNodeModel) { } func (sqw *SQLWriter) upsertKnownGaps(knownGaps models.KnownGapsModel) { - sqw.stmts <- []byte(fmt.Sprintf(knownGaps.StartingBlockNumber, knownGaps.EndingBlockNumber, knownGaps.CheckedOut, knownGaps.ProcessingKey, + sqw.stmts <- []byte(fmt.Sprintf(knownGapsInsert, knownGaps.StartingBlockNumber, knownGaps.EndingBlockNumber, knownGaps.CheckedOut, knownGaps.ProcessingKey, knownGaps.EndingBlockNumber, knownGaps.ProcessingKey, knownGaps.EndingBlockNumber)) //knownGapsInsert = "INSERT INTO eth.known_gaps (starting_block_number, ending_block_number, checked_out, processing_key) " + // "VALUES ('%s', '%s', %t, %d) " + diff --git a/statediff/indexer/database/sql/indexer.go b/statediff/indexer/database/sql/indexer.go index 0ec9d75aa..8fbde648e 100644 --- a/statediff/indexer/database/sql/indexer.go +++ b/statediff/indexer/database/sql/indexer.go @@ -607,7 +607,8 @@ func (sdi *StateDiffIndexer) QueryDbToBigInt(queryString string) (*big.Int, erro // More geth nodes, the expected difference might fluctuate. func isGap(latestBlockInDb *big.Int, latestBlockOnChain *big.Int, expectedDifference *big.Int) bool { latestBlock := big.NewInt(0) - if latestBlock.Add(latestBlockOnChain, expectedDifference) != latestBlockInDb { + if latestBlock.Sub(latestBlockOnChain, expectedDifference).Cmp(latestBlockInDb) != 0 { + log.Warn("We found a gap", "latestBlockInDb", latestBlockInDb, "latestBlockOnChain", latestBlockOnChain, "expectedDifference", expectedDifference) return true } return false @@ -634,9 +635,10 @@ func (sdi *StateDiffIndexer) FindAndUpdateGaps(latestBlockOnChain *big.Int, expe startBlock.Add(latestBlockInDb, expectedDifference) endBlock.Sub(latestBlockOnChain, expectedDifference) - log.Warn(fmt.Sprint("Found Gaps starting at, ", startBlock, " and ending at, ", endBlock)) + log.Warn("Found Gaps starting at", "startBlock", startBlock, "endingBlock", endBlock) err := sdi.PushKnownGaps(startBlock, endBlock, false, processingKey, fileIndexer) if err != nil { + log.Error("We were unable to write the following gap to the DB", "start Block", startBlock, "endBlock", endBlock, "error", err) // Write to file SQL file instead!!! // If write to SQL file fails, write to disk. Handle this within the write to SQL file function! return err diff --git a/statediff/service.go b/statediff/service.go index df6f2066b..f68490fe5 100644 --- a/statediff/service.go +++ b/statediff/service.go @@ -164,70 +164,6 @@ type KnownGapsState struct { fileIndexer interfaces.StateDiffIndexer } -// This function will capture any missed blocks that were not captured in sds.KnownGaps.knownErrorBlocks. -// It is invoked when the sds.KnownGaps.lastProcessed block is not one unit -// away from sds.KnownGaps.expectedDifference -// Essentially, if geth ever misses blocks but doesn't output an error, we are covered. -func (sds *Service) capturedMissedBlocks(currentBlock *big.Int, knownErrorBlocks []*big.Int, lastProcessedBlock *big.Int) { - // last processed: 110 - // current block: 125 - if len(knownErrorBlocks) > 0 { - // 115 - startErrorBlock := new(big.Int).Set(knownErrorBlocks[0]) - // 120 - endErrorBlock := new(big.Int).Set(knownErrorBlocks[len(knownErrorBlocks)-1]) - - // 111 - expectedStartErrorBlock := big.NewInt(0).Add(lastProcessedBlock, sds.KnownGaps.expectedDifference) - // 124 - expectedEndErrorBlock := big.NewInt(0).Sub(currentBlock, sds.KnownGaps.expectedDifference) - - if (expectedStartErrorBlock == startErrorBlock) && - (expectedEndErrorBlock == endErrorBlock) { - log.Info("All Gaps already captured in knownErrorBlocks") - } - - if expectedEndErrorBlock.Cmp(endErrorBlock) == 1 { - log.Warn(fmt.Sprint("There are gaps in the knownErrorBlocks list: ", knownErrorBlocks)) - log.Warn("But there are gaps that were also not added there.") - log.Warn(fmt.Sprint("Last Block in knownErrorBlocks: ", endErrorBlock)) - log.Warn(fmt.Sprint("Last processed Block: ", lastProcessedBlock)) - log.Warn(fmt.Sprint("Current Block: ", currentBlock)) - //120 + 1 == 121 - startBlock := big.NewInt(0).Add(endErrorBlock, sds.KnownGaps.expectedDifference) - // 121 to 124 - log.Warn(fmt.Sprintf("Adding the following block range to known_gaps table: %d - %d", startBlock, expectedEndErrorBlock)) - sds.indexer.PushKnownGaps(startBlock, expectedEndErrorBlock, false, sds.KnownGaps.processingKey, sds.KnownGaps.fileIndexer) - } - - if expectedStartErrorBlock.Cmp(startErrorBlock) == -1 { - log.Warn(fmt.Sprint("There are gaps in the knownErrorBlocks list: ", knownErrorBlocks)) - log.Warn("But there are gaps that were also not added there.") - log.Warn(fmt.Sprint("First Block in knownErrorBlocks: ", startErrorBlock)) - log.Warn(fmt.Sprint("Last processed Block: ", lastProcessedBlock)) - // 115 - 1 == 114 - endBlock := big.NewInt(0).Sub(startErrorBlock, sds.KnownGaps.expectedDifference) - // 111 to 114 - log.Warn(fmt.Sprintf("Adding the following block range to known_gaps table: %d - %d", expectedStartErrorBlock, endBlock)) - sds.indexer.PushKnownGaps(expectedStartErrorBlock, endBlock, false, sds.KnownGaps.processingKey, sds.KnownGaps.fileIndexer) - } - - log.Warn(fmt.Sprint("The following Gaps were found: ", knownErrorBlocks)) - log.Warn(fmt.Sprint("Updating known Gaps table from ", startErrorBlock, " to ", endErrorBlock, " with processing key, ", sds.KnownGaps.processingKey)) - sds.indexer.PushKnownGaps(startErrorBlock, endErrorBlock, false, sds.KnownGaps.processingKey, sds.KnownGaps.fileIndexer) - - } else { - log.Warn("We missed blocks without any errors.") - // 110 + 1 == 111 - startBlock := big.NewInt(0).Add(lastProcessedBlock, sds.KnownGaps.expectedDifference) - // 125 - 1 == 124 - endBlock := big.NewInt(0).Sub(currentBlock, sds.KnownGaps.expectedDifference) - log.Warn(fmt.Sprint("Missed blocks starting from: ", startBlock)) - log.Warn(fmt.Sprint("Missed blocks ending at: ", endBlock)) - sds.indexer.PushKnownGaps(startBlock, endBlock, false, sds.KnownGaps.processingKey, sds.KnownGaps.fileIndexer) - } -} - // BlockCache caches the last block for safe access from different service loops type BlockCache struct { sync.Mutex @@ -260,23 +196,29 @@ func New(stack *node.Node, ethServ *eth.Ethereum, cfg *ethconfig.Config, params var err error indexer, err = ind.NewStateDiffIndexer(params.Context, blockChain.Config(), info, params.IndexerConfig, "") if err != nil { + log.Error("Error creating indexer", "indexer: ", params.IndexerConfig.Type(), "error: ", err) return err } - if params.IndexerConfig.Type() != shared.FILE { - fileIndexer, err = ind.NewStateDiffIndexer(params.Context, blockChain.Config(), info, params.IndexerConfig, "") - log.Info("Starting the statediff service in ", "mode", params.IndexerConfig.Type()) + if params.FileConfig != nil { + fileIndexer, err = ind.NewStateDiffIndexer(params.Context, blockChain.Config(), info, params.FileConfig, shared.FILE) if err != nil { + log.Error("Error creating file indexer", "error: ", err) return err } - } else { - log.Info("Starting the statediff service in ", "mode", "File") fileIndexer = indexer } //fileIndexer, fileErr = file.NewStateDiffIndexer(params.Context, blockChain.Config(), info) indexer.ReportDBMetrics(10*time.Second, quitCh) } + var checkForGaps bool + if params.IndexerConfig.Type() == shared.POSTGRES { + checkForGaps = true + } else { + log.Info("We are not going to check for gaps on start up since we are not connected to Postgres!") + checkForGaps = false + } workers := params.NumWorkers if workers == 0 { workers = 1 @@ -284,7 +226,7 @@ func New(stack *node.Node, ethServ *eth.Ethereum, cfg *ethconfig.Config, params // If we ever have multiple processingKeys we can update them here // along with the expectedDifference knownGaps := &KnownGapsState{ - checkForGaps: true, + checkForGaps: checkForGaps, processingKey: 0, expectedDifference: big.NewInt(1), errorState: false, @@ -406,6 +348,75 @@ func (sds *Service) writeGenesisStateDiff(currBlock *types.Block, workerId uint) statediffMetrics.lastStatediffHeight.Update(genesisBlockNumber) } +// This function will capture any missed blocks that were not captured in sds.KnownGaps.knownErrorBlocks. +// It is invoked when the sds.KnownGaps.lastProcessed block is not one unit +// away from sds.KnownGaps.expectedDifference +// Essentially, if geth ever misses blocks but doesn't output an error, we are covered. +func (sds *Service) captureMissedBlocks(currentBlock *big.Int, knownErrorBlocks []*big.Int, lastProcessedBlock *big.Int) { + // last processed: 110 + // current block: 125 + log.Debug("current block", "block number: ", currentBlock) + log.Debug("knownErrorBlocks", "knownErrorBlocks: ", knownErrorBlocks) + log.Debug("last processed block", "block number: ", lastProcessedBlock) + log.Debug("expected difference", "sds.KnownGaps.expectedDifference: ", sds.KnownGaps.expectedDifference) + + if len(knownErrorBlocks) > 0 { + // 115 + startErrorBlock := new(big.Int).Set(knownErrorBlocks[0]) + // 120 + endErrorBlock := new(big.Int).Set(knownErrorBlocks[len(knownErrorBlocks)-1]) + + // 111 + expectedStartErrorBlock := big.NewInt(0).Add(lastProcessedBlock, sds.KnownGaps.expectedDifference) + // 124 + expectedEndErrorBlock := big.NewInt(0).Sub(currentBlock, sds.KnownGaps.expectedDifference) + + if (expectedStartErrorBlock.Cmp(startErrorBlock) != 0) && + (expectedEndErrorBlock.Cmp(endErrorBlock) != 0) { + log.Info("All Gaps already captured in knownErrorBlocks") + } + + if expectedEndErrorBlock.Cmp(endErrorBlock) == 1 { + log.Warn("There are gaps in the knownErrorBlocks list", "knownErrorBlocks", knownErrorBlocks) + log.Warn("But there are gaps that were also not added there.") + log.Warn("Last Block in knownErrorBlocks", "endErrorBlock", endErrorBlock) + log.Warn("Last processed Block", "lastProcessedBlock", lastProcessedBlock) + log.Warn("Current Block", "currentBlock", currentBlock) + //120 + 1 == 121 + startBlock := big.NewInt(0).Add(endErrorBlock, sds.KnownGaps.expectedDifference) + // 121 to 124 + log.Warn(fmt.Sprintf("Adding the following block range to known_gaps table: %d - %d", startBlock, expectedEndErrorBlock)) + sds.indexer.PushKnownGaps(startBlock, expectedEndErrorBlock, false, sds.KnownGaps.processingKey, sds.KnownGaps.fileIndexer) + } + + if expectedStartErrorBlock.Cmp(startErrorBlock) == -1 { + log.Warn("There are gaps in the knownErrorBlocks list", "knownErrorBlocks", knownErrorBlocks) + log.Warn("But there are gaps that were also not added there.") + log.Warn("First Block in knownErrorBlocks", "startErrorBlock", startErrorBlock) + log.Warn("Last processed Block", "lastProcessedBlock", lastProcessedBlock) + // 115 - 1 == 114 + endBlock := big.NewInt(0).Sub(startErrorBlock, sds.KnownGaps.expectedDifference) + // 111 to 114 + log.Warn(fmt.Sprintf("Adding the following block range to known_gaps table: %d - %d", expectedStartErrorBlock, endBlock)) + sds.indexer.PushKnownGaps(expectedStartErrorBlock, endBlock, false, sds.KnownGaps.processingKey, sds.KnownGaps.fileIndexer) + } + + log.Warn("The following Gaps were found", "knownErrorBlocks", knownErrorBlocks) + log.Warn(fmt.Sprint("Updating known Gaps table from ", startErrorBlock, " to ", endErrorBlock, " with processing key, ", sds.KnownGaps.processingKey)) + sds.indexer.PushKnownGaps(startErrorBlock, endErrorBlock, false, sds.KnownGaps.processingKey, sds.KnownGaps.fileIndexer) + + } else { + log.Warn("We missed blocks without any errors.") + // 110 + 1 == 111 + startBlock := big.NewInt(0).Add(lastProcessedBlock, sds.KnownGaps.expectedDifference) + // 125 - 1 == 124 + endBlock := big.NewInt(0).Sub(currentBlock, sds.KnownGaps.expectedDifference) + log.Warn("Missed blocks starting from", "startBlock", startBlock) + log.Warn("Missed blocks ending at", "endBlock", endBlock) + sds.indexer.PushKnownGaps(startBlock, endBlock, false, sds.KnownGaps.processingKey, sds.KnownGaps.fileIndexer) + } +} + func (sds *Service) writeLoopWorker(params workerParams) { defer params.wg.Done() for { @@ -428,7 +439,7 @@ func (sds *Service) writeLoopWorker(params workerParams) { // If for any reason we need to check for gaps, // Check and update the gaps table. if sds.KnownGaps.checkForGaps && !sds.KnownGaps.errorState { - log.Info("Checking for Gaps at current block: ", currentBlock.Number()) + log.Info("Checking for Gaps at", "current block", currentBlock.Number()) go sds.indexer.FindAndUpdateGaps(currentBlock.Number(), sds.KnownGaps.expectedDifference, sds.KnownGaps.processingKey, sds.KnownGaps.fileIndexer) sds.KnownGaps.checkForGaps = false } @@ -439,7 +450,7 @@ func (sds *Service) writeLoopWorker(params workerParams) { log.Error("statediff.Service.WriteLoop: processing error", "block height", currentBlock.Number().Uint64(), "error", err.Error(), "worker", params.id) sds.KnownGaps.errorState = true sds.KnownGaps.knownErrorBlocks = append(sds.KnownGaps.knownErrorBlocks, currentBlock.Number()) - log.Warn("Updating the following block to knownErrorBlocks to be inserted into knownGaps table: ", currentBlock.Number()) + log.Warn("Updating the following block to knownErrorBlocks to be inserted into knownGaps table", "block number", currentBlock.Number()) // Write object to startdiff continue } @@ -448,14 +459,22 @@ func (sds *Service) writeLoopWorker(params workerParams) { previousExpectedBlock := big.NewInt(0).Sub(currentBlock.Number(), sds.KnownGaps.expectedDifference) // If we last block which should have been processed is not // the actual lastProcessedBlock, add it to known gaps table. - if previousExpectedBlock != sds.KnownGaps.lastProcessedBlock && sds.KnownGaps.lastProcessedBlock != nil { - // We must pass in parameters by VALUE not reference. - // If we pass them in my reference, the references can change before the computation is complete! - staticKnownErrorBlocks := make([]*big.Int, len(sds.KnownGaps.knownErrorBlocks)) - copy(staticKnownErrorBlocks, sds.KnownGaps.knownErrorBlocks) - staticLastProcessedBlock := new(big.Int).Set(sds.KnownGaps.lastProcessedBlock) - go sds.capturedMissedBlocks(currentBlock.Number(), staticKnownErrorBlocks, staticLastProcessedBlock) - sds.KnownGaps.knownErrorBlocks = nil + if sds.KnownGaps.lastProcessedBlock != nil { + // Can't combine the two if statements because you can't compare a nil object... + if previousExpectedBlock.Cmp(sds.KnownGaps.lastProcessedBlock) != 0 { + // We must pass in parameters by VALUE not reference. + // If we pass them in my reference, the references can change before the computation is complete! + staticKnownErrorBlocks := make([]*big.Int, len(sds.KnownGaps.knownErrorBlocks)) + copy(staticKnownErrorBlocks, sds.KnownGaps.knownErrorBlocks) + staticLastProcessedBlock := new(big.Int).Set(sds.KnownGaps.lastProcessedBlock) + + log.Debug("Starting capturedMissedBlocks") + log.Debug("previousExpectedBlock is", "previous expected block: ", previousExpectedBlock) + log.Debug("sds.KnownGaps.lastProcessedBlock is", "sds.KnownGaps.lastProcessedBlock: ", sds.KnownGaps.lastProcessedBlock) + + go sds.captureMissedBlocks(currentBlock.Number(), staticKnownErrorBlocks, staticLastProcessedBlock) + sds.KnownGaps.knownErrorBlocks = nil + } } sds.KnownGaps.lastProcessedBlock = currentBlock.Number() diff --git a/statediff/service_public_test.go b/statediff/service_public_test.go index c4d8ba5b7..ed2c5d3ec 100644 --- a/statediff/service_public_test.go +++ b/statediff/service_public_test.go @@ -4,8 +4,10 @@ import ( "context" "errors" "fmt" + "io/ioutil" "math/big" "os" + "strings" "testing" "github.com/ethereum/go-ethereum/params" @@ -24,18 +26,19 @@ var ( chainConf = params.MainnetChainConfig ) +type gapValues struct { + lastProcessedBlock int64 + currentBlock int64 + knownErrorBlocksStart int64 + knownErrorBlocksEnd int64 + expectedDif int64 + processingKey int64 +} + // Add clean db // Test for failures when they are expected, when we go from smaller block to larger block // We should no longer see the smaller block in DB func TestKnownGaps(t *testing.T) { - type gapValues struct { - lastProcessedBlock int64 - currentBlock int64 - knownErrorBlocksStart int64 - knownErrorBlocksEnd int64 - expectedDif int64 - processingKey int64 - } tests := []gapValues{ // Unprocessed gaps before and after knownErrorBlock @@ -61,64 +64,132 @@ func TestKnownGaps(t *testing.T) { {lastProcessedBlock: 2000, knownErrorBlocksStart: 2002, knownErrorBlocksEnd: 2050, currentBlock: 2100, expectedDif: 2, processingKey: 2}, // Test update when block number is larger!! {lastProcessedBlock: 2000, knownErrorBlocksStart: 2002, knownErrorBlocksEnd: 2052, currentBlock: 2100, expectedDif: 2, processingKey: 2}, + // Update when processing key is different! {lastProcessedBlock: 2000, knownErrorBlocksStart: 2002, knownErrorBlocksEnd: 2052, currentBlock: 2100, expectedDif: 2, processingKey: 10}, } - for _, tc := range tests { - // Reuse processing key from expecteDiff - testCaptureMissedBlocks(t, tc.lastProcessedBlock, tc.currentBlock, tc.knownErrorBlocksStart, tc.knownErrorBlocksEnd, tc.expectedDif, tc.expectedDif, false) + + testWriteToDb(t, tests, true) + testWriteToFile(t, tests, true) +} + +// test writing blocks to the DB +func testWriteToDb(t *testing.T, tests []gapValues, wipeDbBeforeStart bool) { + stateDiff, db, err := setupDb(t) + require.NoError(t, err) + + // Clear Table first, this is needed because we updated an entry to have a larger endblock number + // so we can't find the original start and endblock pair. + if wipeDbBeforeStart { + db.Exec(context.Background(), "DELETE FROM eth.known_gaps") } + + for _, tc := range tests { + // Create an array with knownGaps based on user inputs + checkGaps := tc.knownErrorBlocksStart != 0 && tc.knownErrorBlocksEnd != 0 + knownErrorBlocks := (make([]*big.Int, 0)) + if checkGaps { + knownErrorBlocks = createKnownErrorBlocks(knownErrorBlocks, tc.knownErrorBlocksStart, tc.knownErrorBlocksEnd) + } + // Upsert + testCaptureMissedBlocks(t, tc.lastProcessedBlock, tc.currentBlock, tc.knownErrorBlocksStart, tc.knownErrorBlocksEnd, + tc.expectedDif, tc.processingKey, stateDiff, knownErrorBlocks, nil) + // Validate that the upsert was done correctly. + callValidateUpsert(t, checkGaps, stateDiff, tc.lastProcessedBlock, tc.currentBlock, tc.expectedDif, tc.knownErrorBlocksStart, tc.knownErrorBlocksEnd) + } + tearDown(t, stateDiff) + +} + +// test writing blocks to file and then inserting them to DB +func testWriteToFile(t *testing.T, tests []gapValues, wipeDbBeforeStart bool) { + stateDiff, db, err := setupDb(t) + require.NoError(t, err) + + // Clear Table first, this is needed because we updated an entry to have a larger endblock number + // so we can't find the original start and endblock pair. + if wipeDbBeforeStart { + db.Exec(context.Background(), "DELETE FROM eth.known_gaps") + } + + tearDown(t, stateDiff) for _, tc := range tests { // Reuse processing key from expecteDiff - testCaptureMissedBlocks(t, tc.lastProcessedBlock, tc.currentBlock, tc.knownErrorBlocksStart, tc.knownErrorBlocksEnd, tc.expectedDif, tc.expectedDif, true) + checkGaps := tc.knownErrorBlocksStart != 0 && tc.knownErrorBlocksEnd != 0 + knownErrorBlocks := (make([]*big.Int, 0)) + if checkGaps { + knownErrorBlocks = createKnownErrorBlocks(knownErrorBlocks, tc.knownErrorBlocksStart, tc.knownErrorBlocksEnd) + } + + fileInd := setupFile(t) + testCaptureMissedBlocks(t, tc.lastProcessedBlock, tc.currentBlock, tc.knownErrorBlocksStart, tc.knownErrorBlocksEnd, + tc.expectedDif, tc.processingKey, stateDiff, knownErrorBlocks, fileInd) + fileInd.Close() + + newStateDiff, db, _ := setupDb(t) + + file, ioErr := ioutil.ReadFile(file.TestConfig.FilePath) + require.NoError(t, ioErr) + + requests := strings.Split(string(file), ";") + + // Skip the first two enteries + for _, request := range requests[2:] { + _, err := db.Exec(context.Background(), request) + require.NoError(t, err) + } + callValidateUpsert(t, checkGaps, newStateDiff, tc.lastProcessedBlock, tc.currentBlock, tc.expectedDif, tc.knownErrorBlocksStart, tc.knownErrorBlocksEnd) } } -// It also makes sure we properly calculate any missed gaps not in the known gaps lists -// either before or after the list. -func testCaptureMissedBlocks(t *testing.T, lastBlockProcessed int64, currentBlockNum int64, knownErrorBlocksStart int64, knownErrorBlocksEnd int64, expectedDif int64, processingKey int64, skipDb bool) { +// test capturing missed blocks +func testCaptureMissedBlocks(t *testing.T, lastBlockProcessed int64, currentBlockNum int64, knownErrorBlocksStart int64, knownErrorBlocksEnd int64, expectedDif int64, processingKey int64, + stateDiff *sql.StateDiffIndexer, knownErrorBlocks []*big.Int, fileInd interfaces.StateDiffIndexer) { lastProcessedBlock := big.NewInt(lastBlockProcessed) currentBlock := big.NewInt(currentBlockNum) - knownErrorBlocks := (make([]*big.Int, 0)) - checkGaps := knownErrorBlocksStart != 0 && knownErrorBlocksEnd != 0 - if checkGaps { - for i := knownErrorBlocksStart; i <= knownErrorBlocksEnd; i++ { - knownErrorBlocks = append(knownErrorBlocks, big.NewInt(i)) - } - } - - // Comment out values which should not be used - fileInd := setupFile(t) knownGaps := KnownGapsState{ processingKey: processingKey, expectedDifference: big.NewInt(expectedDif), fileIndexer: fileInd, } - stateDiff, err := setupDb(t) - if err != nil { - t.Fatal(err) - } - service := &Service{ KnownGaps: knownGaps, indexer: stateDiff, } - service.capturedMissedBlocks(currentBlock, knownErrorBlocks, lastProcessedBlock) + service.captureMissedBlocks(currentBlock, knownErrorBlocks, lastProcessedBlock) +} +// Helper function to create an array of gaps given a start and end block +func createKnownErrorBlocks(knownErrorBlocks []*big.Int, knownErrorBlocksStart int64, knownErrorBlocksEnd int64) []*big.Int { + for i := knownErrorBlocksStart; i <= knownErrorBlocksEnd; i++ { + knownErrorBlocks = append(knownErrorBlocks, big.NewInt(i)) + } + return knownErrorBlocks +} + +// This function will call the validateUpsert function based on various conditions. +func callValidateUpsert(t *testing.T, checkGaps bool, stateDiff *sql.StateDiffIndexer, + lastBlockProcessed int64, currentBlockNum int64, expectedDif int64, knownErrorBlocksStart int64, knownErrorBlocksEnd int64) { + // If there are gaps in knownErrorBlocks array if checkGaps { + // If there are no unexpected gaps before or after the entries in the knownErrorBlocks array + // Only handle the knownErrorBlocks Array if lastBlockProcessed+expectedDif == knownErrorBlocksStart && knownErrorBlocksEnd+expectedDif == currentBlockNum { validateUpsert(t, stateDiff, knownErrorBlocksStart, knownErrorBlocksEnd) + // If there are gaps after knownErrorBlocks array, process them } else if lastBlockProcessed+expectedDif == knownErrorBlocksStart { validateUpsert(t, stateDiff, knownErrorBlocksStart, knownErrorBlocksEnd) validateUpsert(t, stateDiff, knownErrorBlocksEnd+expectedDif, currentBlockNum-expectedDif) + // If there are gaps before knownErrorBlocks array, process them } else if knownErrorBlocksEnd+expectedDif == currentBlockNum { validateUpsert(t, stateDiff, lastBlockProcessed+expectedDif, knownErrorBlocksStart-expectedDif) validateUpsert(t, stateDiff, knownErrorBlocksStart, knownErrorBlocksEnd) + // if there are gaps before, after, and within the knownErrorBlocks array,handle all the errors. } else { validateUpsert(t, stateDiff, lastBlockProcessed+expectedDif, knownErrorBlocksStart-expectedDif) validateUpsert(t, stateDiff, knownErrorBlocksStart, knownErrorBlocksEnd) @@ -129,9 +200,9 @@ func testCaptureMissedBlocks(t *testing.T, lastBlockProcessed int64, currentBloc validateUpsert(t, stateDiff, lastBlockProcessed+expectedDif, currentBlockNum-expectedDif) } - tearDown(t, stateDiff) } +// Make sure the upsert was performed correctly func validateUpsert(t *testing.T, stateDiff *sql.StateDiffIndexer, startingBlock int64, endingBlock int64) { t.Logf("Starting to query blocks: %d - %d", startingBlock, endingBlock) queryString := fmt.Sprintf("SELECT starting_block_number from eth.known_gaps WHERE starting_block_number = %d AND ending_block_number = %d", startingBlock, endingBlock) @@ -142,15 +213,16 @@ func validateUpsert(t *testing.T, stateDiff *sql.StateDiffIndexer, startingBlock } // Create a DB object to use -func setupDb(t *testing.T) (*sql.StateDiffIndexer, error) { +func setupDb(t *testing.T) (*sql.StateDiffIndexer, sql.Database, error) { db, err = postgres.SetupSQLXDB() if err != nil { t.Fatal(err) } stateDiff, err := sql.NewStateDiffIndexer(context.Background(), chainConf, db) - return stateDiff, err + return stateDiff, db, err } +// Create a file statediff indexer. func setupFile(t *testing.T) interfaces.StateDiffIndexer { if _, err := os.Stat(file.TestConfig.FilePath); !errors.Is(err, os.ErrNotExist) { err := os.Remove(file.TestConfig.FilePath)