From a2bbeaa1d1ed113234190ccfcace89116e78e09f Mon Sep 17 00:00:00 2001 From: Abdul Rabbani Date: Fri, 25 Mar 2022 09:54:36 -0400 Subject: [PATCH] Handle All Gaps within Geth Including an updated doc which keeps track of events in this PR. --- statediff/indexer/database/dump/indexer.go | 3 + statediff/indexer/database/sql/indexer.go | 16 +- .../sql/mainnet_tests/indexer_test.go | 28 ---- statediff/service.go | 85 +++++++++++ statediff/service_public_test.go | 143 ++++++++++++++++++ 5 files changed, 243 insertions(+), 32 deletions(-) create mode 100644 statediff/service_public_test.go diff --git a/statediff/indexer/database/dump/indexer.go b/statediff/indexer/database/dump/indexer.go index 0b7a5ffb0..c11ecac86 100644 --- a/statediff/indexer/database/dump/indexer.go +++ b/statediff/indexer/database/dump/indexer.go @@ -534,3 +534,6 @@ func (sdi *StateDiffIndexer) SetWatchedAddresses(args []sdtypes.WatchAddressArg, func (sdi *StateDiffIndexer) ClearWatchedAddresses() error { return nil } +func (sdi *StateDiffIndexer) PushKnownGaps(startingBlockNumber *big.Int, endingBlockNumber *big.Int, checkedOut bool, processingKey int64) error { + return nil +} diff --git a/statediff/indexer/database/sql/indexer.go b/statediff/indexer/database/sql/indexer.go index e33f9d388..65b4badd1 100644 --- a/statediff/indexer/database/sql/indexer.go +++ b/statediff/indexer/database/sql/indexer.go @@ -691,7 +691,7 @@ func (sdi *StateDiffIndexer) QueryDb(queryString string) (string, error) { var ret string err := sdi.dbWriter.db.QueryRow(context.Background(), queryString).Scan(&ret) if err != nil { - log.Error("Can't properly query the DB for query: ", queryString) + log.Error(fmt.Sprint("Can't properly query the DB for query: ", queryString)) return "", err } return ret, nil @@ -707,7 +707,7 @@ func (sdi *StateDiffIndexer) QueryDbToBigInt(queryString string) (*big.Int, erro } ret, ok := ret.SetString(res, 10) if !ok { - log.Error("Can't turn the res ", res, "into a bigInt") + log.Error(fmt.Sprint("Can't turn the res ", res, "into a bigInt")) return ret, fmt.Errorf("Can't turn %s into a bigInt", res) } return ret, nil @@ -729,6 +729,9 @@ func isGap(latestBlockInDb *big.Int, latestBlockOnChain *big.Int, expectedDiffer // This function will check for Gaps and update the DB if gaps are found. // The processingKey will currently be set to 0, but as we start to leverage horizontal scaling // It might be a useful parameter to update depending on the geth node. +// TODO: +// REmove the return value +// Write to file if err in writing to DB func (sdi *StateDiffIndexer) FindAndUpdateGaps(latestBlockOnChain *big.Int, expectedDifference *big.Int, processingKey int64) error { dbQueryString := "SELECT MAX(block_number) FROM eth.header_cids" latestBlockInDb, err := sdi.QueryDbToBigInt(dbQueryString) @@ -743,8 +746,13 @@ func (sdi *StateDiffIndexer) FindAndUpdateGaps(latestBlockOnChain *big.Int, expe startBlock.Add(latestBlockInDb, expectedDifference) endBlock.Sub(latestBlockOnChain, expectedDifference) - log.Warn("Found Gaps starting at, ", startBlock, " and ending at, ", endBlock) - sdi.pushKnownGaps(startBlock, endBlock, false, processingKey) + log.Warn(fmt.Sprint("Found Gaps starting at, ", startBlock, " and ending at, ", endBlock)) + err := sdi.PushKnownGaps(startBlock, endBlock, false, processingKey) + if err != nil { + // Write to file SQL file instead!!! + // If write to SQL file fails, write to disk. Handle this within the write to SQL file function! + return err + } } return nil diff --git a/statediff/indexer/database/sql/mainnet_tests/indexer_test.go b/statediff/indexer/database/sql/mainnet_tests/indexer_test.go index 2da315beb..213daa54c 100644 --- a/statediff/indexer/database/sql/mainnet_tests/indexer_test.go +++ b/statediff/indexer/database/sql/mainnet_tests/indexer_test.go @@ -115,31 +115,3 @@ func tearDown(t *testing.T) { err = ind.Close() require.NoError(t, err) } - -//func TestKnownGapsUpsert(t *testing.T) { -// var startBlockNumber int64 = 111 -// var endBlockNumber int64 = 121 -// ind, err := setupDb(t) -// if err != nil { -// t.Fatal(err) -// } -// require.NoError(t, err) -// -// testKnownGapsUpsert(t, startBlockNumber, endBlockNumber, ind) -// //str, err := ind.QueryDb("SELECT MAX(block_number) FROM eth.header_cids") // Figure out the string. -// queryString := fmt.Sprintf("SELECT starting_block_number from eth.known_gaps WHERE starting_block_number = %d AND ending_block_number = %d", startBlockNumber, endBlockNumber) -// _, queryErr := ind.QueryDb(queryString) // Figure out the string. -// require.NoError(t, queryErr) -// -//} -//func testKnownGapsUpsert(t *testing.T, startBlockNumber int64, endBlockNumber int64, ind interfaces.StateDiffIndexer) { -// startBlock := big.NewInt(startBlockNumber) -// endBlock := big.NewInt(endBlockNumber) -// -// processGapError := ind.PushKnownGaps(startBlock, endBlock, false, 1) -// if processGapError != nil { -// t.Fatal(processGapError) -// } -// require.NoError(t, processGapError) -//} -// diff --git a/statediff/service.go b/statediff/service.go index 4bdb9bf50..052ca62c3 100644 --- a/statediff/service.go +++ b/statediff/service.go @@ -148,6 +148,91 @@ type Service struct { maxRetry uint } +type KnownGaps struct { + // Should we check for gaps by looking at the DB and comparing the latest block with head + checkForGaps bool + // Arbitrary processingKey that can be used down the line to differentiate different geth nodes. + processingKey int64 + // This number indicates the expected difference between blocks. + // Currently, this is 1 since the geth node processes each block. But down the road this can be used in + // Tandom with the processingKey to differentiate block processing logic. + expectedDifference *big.Int + // Indicates if Geth is in an error state + // This is used to indicate the right time to upserts + errorState bool + // This array keeps track of errorBlocks as they occur. + // When the errorState is false again, we can process these blocks. + // Do we need a list, can we have /KnownStartErrorBlock and knownEndErrorBlock ints instead? + knownErrorBlocks []*big.Int + // The last processed block keeps track of the last processed block. + // Its used to make sure we didn't skip over any block! + lastProcessedBlock *big.Int +} + +// This function will capture any missed blocks that were not captured in sds.KnownGaps.knownErrorBlocks. +// It is invoked when the sds.KnownGaps.lastProcessed block is not one unit +// away from sds.KnownGaps.expectedDifference +// Essentially, if geth ever misses blocks but doesn't output an error, we are covered. +func (sds *Service) capturedMissedBlocks(currentBlock *big.Int, knownErrorBlocks []*big.Int, lastProcessedBlock *big.Int) { + // last processed: 110 + // current block: 125 + if len(knownErrorBlocks) > 0 { + // 115 + startErrorBlock := new(big.Int).Set(knownErrorBlocks[0]) + // 120 + endErrorBlock := new(big.Int).Set(knownErrorBlocks[len(knownErrorBlocks)-1]) + + // 111 + expectedStartErrorBlock := big.NewInt(0).Add(lastProcessedBlock, sds.KnownGaps.expectedDifference) + // 124 + expectedEndErrorBlock := big.NewInt(0).Sub(currentBlock, sds.KnownGaps.expectedDifference) + + if (expectedStartErrorBlock == startErrorBlock) && + (expectedEndErrorBlock == endErrorBlock) { + log.Info("All Gaps already captured in knownErrorBlocks") + } + + if expectedEndErrorBlock.Cmp(endErrorBlock) == 1 { + log.Warn(fmt.Sprint("There are gaps in the knownErrorBlocks list: ", knownErrorBlocks)) + log.Warn("But there are gaps that were also not added there.") + log.Warn(fmt.Sprint("Last Block in knownErrorBlocks: ", endErrorBlock)) + log.Warn(fmt.Sprint("Last processed Block: ", lastProcessedBlock)) + log.Warn(fmt.Sprint("Current Block: ", currentBlock)) + //120 + 1 == 121 + startBlock := big.NewInt(0).Add(endErrorBlock, sds.KnownGaps.expectedDifference) + // 121 to 124 + log.Warn(fmt.Sprintf("Adding the following block range to known_gaps table: %d - %d", startBlock, expectedEndErrorBlock)) + sds.indexer.PushKnownGaps(startBlock, expectedEndErrorBlock, false, sds.KnownGaps.processingKey) + } + + if expectedStartErrorBlock.Cmp(startErrorBlock) == -1 { + log.Warn(fmt.Sprint("There are gaps in the knownErrorBlocks list: ", knownErrorBlocks)) + log.Warn("But there are gaps that were also not added there.") + log.Warn(fmt.Sprint("First Block in knownErrorBlocks: ", startErrorBlock)) + log.Warn(fmt.Sprint("Last processed Block: ", lastProcessedBlock)) + // 115 - 1 == 114 + endBlock := big.NewInt(0).Sub(startErrorBlock, sds.KnownGaps.expectedDifference) + // 111 to 114 + log.Warn(fmt.Sprintf("Adding the following block range to known_gaps table: %d - %d", expectedStartErrorBlock, endBlock)) + sds.indexer.PushKnownGaps(expectedStartErrorBlock, endBlock, false, sds.KnownGaps.processingKey) + } + + log.Warn(fmt.Sprint("The following Gaps were found: ", knownErrorBlocks)) + log.Warn(fmt.Sprint("Updating known Gaps table from ", startErrorBlock, " to ", endErrorBlock, " with processing key, ", sds.KnownGaps.processingKey)) + sds.indexer.PushKnownGaps(startErrorBlock, endErrorBlock, false, sds.KnownGaps.processingKey) + + } else { + log.Warn("We missed blocks without any errors.") + // 110 + 1 == 111 + startBlock := big.NewInt(0).Add(lastProcessedBlock, sds.KnownGaps.expectedDifference) + // 125 - 1 == 124 + endBlock := big.NewInt(0).Sub(currentBlock, sds.KnownGaps.expectedDifference) + log.Warn(fmt.Sprint("Missed blocks starting from: ", startBlock)) + log.Warn(fmt.Sprint("Missed blocks ending at: ", endBlock)) + sds.indexer.PushKnownGaps(startBlock, endBlock, false, sds.KnownGaps.processingKey) + } +} + // BlockCache caches the last block for safe access from different service loops type BlockCache struct { sync.Mutex diff --git a/statediff/service_public_test.go b/statediff/service_public_test.go new file mode 100644 index 000000000..05e7cb52e --- /dev/null +++ b/statediff/service_public_test.go @@ -0,0 +1,143 @@ +package statediff + +import ( + "context" + "fmt" + "math/big" + "testing" + + "github.com/ethereum/go-ethereum/params" + "github.com/ethereum/go-ethereum/statediff/indexer/database/sql" + "github.com/ethereum/go-ethereum/statediff/indexer/database/sql/postgres" + "github.com/ethereum/go-ethereum/statediff/indexer/interfaces" + "github.com/stretchr/testify/require" +) + +var ( + db sql.Database + err error + indexer interfaces.StateDiffIndexer + chainConf = params.MainnetChainConfig +) + +func TestKnownGaps(t *testing.T) { + type gapValues struct { + lastProcessedBlock int64 + currentBlock int64 + knownErrorBlocksStart int64 + knownErrorBlocksEnd int64 + expectedDif int64 + } + + tests := []gapValues{ + // Unprocessed gaps before and after knownErrorBlock + {lastProcessedBlock: 110, knownErrorBlocksStart: 115, knownErrorBlocksEnd: 120, currentBlock: 125, expectedDif: 1}, + // No knownErrorBlocks + {lastProcessedBlock: 130, knownErrorBlocksStart: 0, knownErrorBlocksEnd: 0, currentBlock: 140, expectedDif: 1}, + // No gaps before or after knownErrorBlocks + {lastProcessedBlock: 150, knownErrorBlocksStart: 151, knownErrorBlocksEnd: 159, currentBlock: 160, expectedDif: 1}, + // gaps before knownErrorBlocks but not after + {lastProcessedBlock: 170, knownErrorBlocksStart: 180, knownErrorBlocksEnd: 189, currentBlock: 190, expectedDif: 1}, + // gaps after knownErrorBlocks but not before + {lastProcessedBlock: 200, knownErrorBlocksStart: 201, knownErrorBlocksEnd: 205, currentBlock: 210, expectedDif: 1}, + /// Same tests as above with a new expected DIF + // Unprocessed gaps before and after knownErrorBlock + {lastProcessedBlock: 1100, knownErrorBlocksStart: 1150, knownErrorBlocksEnd: 1200, currentBlock: 1250, expectedDif: 2}, + // No knownErrorBlocks + {lastProcessedBlock: 1300, knownErrorBlocksStart: 0, knownErrorBlocksEnd: 0, currentBlock: 1400, expectedDif: 2}, + // No gaps before or after knownErrorBlocks + {lastProcessedBlock: 1500, knownErrorBlocksStart: 1502, knownErrorBlocksEnd: 1598, currentBlock: 1600, expectedDif: 2}, + // gaps before knownErrorBlocks but not after + {lastProcessedBlock: 1700, knownErrorBlocksStart: 1800, knownErrorBlocksEnd: 1898, currentBlock: 1900, expectedDif: 2}, + // gaps after knownErrorBlocks but not before + {lastProcessedBlock: 2000, knownErrorBlocksStart: 2002, knownErrorBlocksEnd: 2050, currentBlock: 2100, expectedDif: 2}, + } + for _, tc := range tests { + // Reuse processing key from expecteDiff + testCaptureMissedBlocks(t, tc.lastProcessedBlock, tc.currentBlock, tc.knownErrorBlocksStart, tc.knownErrorBlocksEnd, tc.expectedDif, tc.expectedDif) + } +} + +// It also makes sure we properly calculate any missed gaps not in the known gaps lists +// either before or after the list. +func testCaptureMissedBlocks(t *testing.T, lastBlockProcessed int64, currentBlockNum int64, knownErrorBlocksStart int64, knownErrorBlocksEnd int64, expectedDif int64, processingKey int64) { + + lastProcessedBlock := big.NewInt(lastBlockProcessed) + currentBlock := big.NewInt(currentBlockNum) + knownErrorBlocks := (make([]*big.Int, 0)) + + checkGaps := knownErrorBlocksStart != 0 && knownErrorBlocksEnd != 0 + if checkGaps { + for i := knownErrorBlocksStart; i <= knownErrorBlocksEnd; i++ { + knownErrorBlocks = append(knownErrorBlocks, big.NewInt(i)) + } + } + + // Comment out values which should not be used + knownGaps := KnownGaps{ + processingKey: processingKey, + expectedDifference: big.NewInt(expectedDif), + } + stateDiff, err := setupDb(t) + if err != nil { + t.Fatal(err) + } + + service := &Service{ + KnownGaps: knownGaps, + indexer: stateDiff, + } + service.capturedMissedBlocks(currentBlock, knownErrorBlocks, lastProcessedBlock) + + if checkGaps { + + if lastBlockProcessed+expectedDif == knownErrorBlocksStart && knownErrorBlocksEnd+expectedDif == currentBlockNum { + validateUpsert(t, stateDiff, knownErrorBlocksStart, knownErrorBlocksEnd) + + } else if lastBlockProcessed+expectedDif == knownErrorBlocksStart { + validateUpsert(t, stateDiff, knownErrorBlocksStart, knownErrorBlocksEnd) + validateUpsert(t, stateDiff, knownErrorBlocksEnd+expectedDif, currentBlockNum-expectedDif) + + } else if knownErrorBlocksEnd+expectedDif == currentBlockNum { + validateUpsert(t, stateDiff, lastBlockProcessed+expectedDif, knownErrorBlocksStart-expectedDif) + validateUpsert(t, stateDiff, knownErrorBlocksStart, knownErrorBlocksEnd) + + } else { + validateUpsert(t, stateDiff, lastBlockProcessed+expectedDif, knownErrorBlocksStart-expectedDif) + validateUpsert(t, stateDiff, knownErrorBlocksStart, knownErrorBlocksEnd) + validateUpsert(t, stateDiff, knownErrorBlocksEnd+expectedDif, currentBlockNum-expectedDif) + + } + } else { + validateUpsert(t, stateDiff, lastBlockProcessed+expectedDif, currentBlockNum-expectedDif) + } + + tearDown(t, stateDiff) +} + +func validateUpsert(t *testing.T, stateDiff *sql.StateDiffIndexer, startingBlock int64, endingBlock int64) { + t.Logf("Starting to query blocks: %d - %d", startingBlock, endingBlock) + queryString := fmt.Sprintf("SELECT starting_block_number from eth.known_gaps WHERE starting_block_number = %d AND ending_block_number = %d", startingBlock, endingBlock) + + _, queryErr := stateDiff.QueryDb(queryString) // Figure out the string. + t.Logf("Updated Known Gaps table starting from, %d, and ending at, %d", startingBlock, endingBlock) + require.NoError(t, queryErr) +} + +// Create a DB object to use +func setupDb(t *testing.T) (*sql.StateDiffIndexer, error) { + db, err = postgres.SetupSQLXDB() + if err != nil { + t.Fatal(err) + } + stateDiff, err := sql.NewStateDiffIndexer(context.Background(), chainConf, db) + return stateDiff, err +} + +// Teardown the DB +func tearDown(t *testing.T, stateDiff *sql.StateDiffIndexer) { + t.Log("Starting tearDown") + sql.TearDownDB(t, db) + err := stateDiff.Close() + require.NoError(t, err) +}