Handle All Gaps within Geth
Including an updated doc which keeps track of events in this PR.
This commit is contained in:
parent
22b1b245bb
commit
a2bbeaa1d1
@ -534,3 +534,6 @@ func (sdi *StateDiffIndexer) SetWatchedAddresses(args []sdtypes.WatchAddressArg,
|
|||||||
func (sdi *StateDiffIndexer) ClearWatchedAddresses() error {
|
func (sdi *StateDiffIndexer) ClearWatchedAddresses() error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
func (sdi *StateDiffIndexer) PushKnownGaps(startingBlockNumber *big.Int, endingBlockNumber *big.Int, checkedOut bool, processingKey int64) error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
@ -691,7 +691,7 @@ func (sdi *StateDiffIndexer) QueryDb(queryString string) (string, error) {
|
|||||||
var ret string
|
var ret string
|
||||||
err := sdi.dbWriter.db.QueryRow(context.Background(), queryString).Scan(&ret)
|
err := sdi.dbWriter.db.QueryRow(context.Background(), queryString).Scan(&ret)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error("Can't properly query the DB for query: ", queryString)
|
log.Error(fmt.Sprint("Can't properly query the DB for query: ", queryString))
|
||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
return ret, nil
|
return ret, nil
|
||||||
@ -707,7 +707,7 @@ func (sdi *StateDiffIndexer) QueryDbToBigInt(queryString string) (*big.Int, erro
|
|||||||
}
|
}
|
||||||
ret, ok := ret.SetString(res, 10)
|
ret, ok := ret.SetString(res, 10)
|
||||||
if !ok {
|
if !ok {
|
||||||
log.Error("Can't turn the res ", res, "into a bigInt")
|
log.Error(fmt.Sprint("Can't turn the res ", res, "into a bigInt"))
|
||||||
return ret, fmt.Errorf("Can't turn %s into a bigInt", res)
|
return ret, fmt.Errorf("Can't turn %s into a bigInt", res)
|
||||||
}
|
}
|
||||||
return ret, nil
|
return ret, nil
|
||||||
@ -729,6 +729,9 @@ func isGap(latestBlockInDb *big.Int, latestBlockOnChain *big.Int, expectedDiffer
|
|||||||
// This function will check for Gaps and update the DB if gaps are found.
|
// This function will check for Gaps and update the DB if gaps are found.
|
||||||
// The processingKey will currently be set to 0, but as we start to leverage horizontal scaling
|
// The processingKey will currently be set to 0, but as we start to leverage horizontal scaling
|
||||||
// It might be a useful parameter to update depending on the geth node.
|
// It might be a useful parameter to update depending on the geth node.
|
||||||
|
// TODO:
|
||||||
|
// REmove the return value
|
||||||
|
// Write to file if err in writing to DB
|
||||||
func (sdi *StateDiffIndexer) FindAndUpdateGaps(latestBlockOnChain *big.Int, expectedDifference *big.Int, processingKey int64) error {
|
func (sdi *StateDiffIndexer) FindAndUpdateGaps(latestBlockOnChain *big.Int, expectedDifference *big.Int, processingKey int64) error {
|
||||||
dbQueryString := "SELECT MAX(block_number) FROM eth.header_cids"
|
dbQueryString := "SELECT MAX(block_number) FROM eth.header_cids"
|
||||||
latestBlockInDb, err := sdi.QueryDbToBigInt(dbQueryString)
|
latestBlockInDb, err := sdi.QueryDbToBigInt(dbQueryString)
|
||||||
@ -743,8 +746,13 @@ func (sdi *StateDiffIndexer) FindAndUpdateGaps(latestBlockOnChain *big.Int, expe
|
|||||||
startBlock.Add(latestBlockInDb, expectedDifference)
|
startBlock.Add(latestBlockInDb, expectedDifference)
|
||||||
endBlock.Sub(latestBlockOnChain, expectedDifference)
|
endBlock.Sub(latestBlockOnChain, expectedDifference)
|
||||||
|
|
||||||
log.Warn("Found Gaps starting at, ", startBlock, " and ending at, ", endBlock)
|
log.Warn(fmt.Sprint("Found Gaps starting at, ", startBlock, " and ending at, ", endBlock))
|
||||||
sdi.pushKnownGaps(startBlock, endBlock, false, processingKey)
|
err := sdi.PushKnownGaps(startBlock, endBlock, false, processingKey)
|
||||||
|
if err != nil {
|
||||||
|
// Write to file SQL file instead!!!
|
||||||
|
// If write to SQL file fails, write to disk. Handle this within the write to SQL file function!
|
||||||
|
return err
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
|
@ -115,31 +115,3 @@ func tearDown(t *testing.T) {
|
|||||||
err = ind.Close()
|
err = ind.Close()
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
//func TestKnownGapsUpsert(t *testing.T) {
|
|
||||||
// var startBlockNumber int64 = 111
|
|
||||||
// var endBlockNumber int64 = 121
|
|
||||||
// ind, err := setupDb(t)
|
|
||||||
// if err != nil {
|
|
||||||
// t.Fatal(err)
|
|
||||||
// }
|
|
||||||
// require.NoError(t, err)
|
|
||||||
//
|
|
||||||
// testKnownGapsUpsert(t, startBlockNumber, endBlockNumber, ind)
|
|
||||||
// //str, err := ind.QueryDb("SELECT MAX(block_number) FROM eth.header_cids") // Figure out the string.
|
|
||||||
// queryString := fmt.Sprintf("SELECT starting_block_number from eth.known_gaps WHERE starting_block_number = %d AND ending_block_number = %d", startBlockNumber, endBlockNumber)
|
|
||||||
// _, queryErr := ind.QueryDb(queryString) // Figure out the string.
|
|
||||||
// require.NoError(t, queryErr)
|
|
||||||
//
|
|
||||||
//}
|
|
||||||
//func testKnownGapsUpsert(t *testing.T, startBlockNumber int64, endBlockNumber int64, ind interfaces.StateDiffIndexer) {
|
|
||||||
// startBlock := big.NewInt(startBlockNumber)
|
|
||||||
// endBlock := big.NewInt(endBlockNumber)
|
|
||||||
//
|
|
||||||
// processGapError := ind.PushKnownGaps(startBlock, endBlock, false, 1)
|
|
||||||
// if processGapError != nil {
|
|
||||||
// t.Fatal(processGapError)
|
|
||||||
// }
|
|
||||||
// require.NoError(t, processGapError)
|
|
||||||
//}
|
|
||||||
//
|
|
||||||
|
@ -148,6 +148,91 @@ type Service struct {
|
|||||||
maxRetry uint
|
maxRetry uint
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type KnownGaps struct {
|
||||||
|
// Should we check for gaps by looking at the DB and comparing the latest block with head
|
||||||
|
checkForGaps bool
|
||||||
|
// Arbitrary processingKey that can be used down the line to differentiate different geth nodes.
|
||||||
|
processingKey int64
|
||||||
|
// This number indicates the expected difference between blocks.
|
||||||
|
// Currently, this is 1 since the geth node processes each block. But down the road this can be used in
|
||||||
|
// Tandom with the processingKey to differentiate block processing logic.
|
||||||
|
expectedDifference *big.Int
|
||||||
|
// Indicates if Geth is in an error state
|
||||||
|
// This is used to indicate the right time to upserts
|
||||||
|
errorState bool
|
||||||
|
// This array keeps track of errorBlocks as they occur.
|
||||||
|
// When the errorState is false again, we can process these blocks.
|
||||||
|
// Do we need a list, can we have /KnownStartErrorBlock and knownEndErrorBlock ints instead?
|
||||||
|
knownErrorBlocks []*big.Int
|
||||||
|
// The last processed block keeps track of the last processed block.
|
||||||
|
// Its used to make sure we didn't skip over any block!
|
||||||
|
lastProcessedBlock *big.Int
|
||||||
|
}
|
||||||
|
|
||||||
|
// This function will capture any missed blocks that were not captured in sds.KnownGaps.knownErrorBlocks.
|
||||||
|
// It is invoked when the sds.KnownGaps.lastProcessed block is not one unit
|
||||||
|
// away from sds.KnownGaps.expectedDifference
|
||||||
|
// Essentially, if geth ever misses blocks but doesn't output an error, we are covered.
|
||||||
|
func (sds *Service) capturedMissedBlocks(currentBlock *big.Int, knownErrorBlocks []*big.Int, lastProcessedBlock *big.Int) {
|
||||||
|
// last processed: 110
|
||||||
|
// current block: 125
|
||||||
|
if len(knownErrorBlocks) > 0 {
|
||||||
|
// 115
|
||||||
|
startErrorBlock := new(big.Int).Set(knownErrorBlocks[0])
|
||||||
|
// 120
|
||||||
|
endErrorBlock := new(big.Int).Set(knownErrorBlocks[len(knownErrorBlocks)-1])
|
||||||
|
|
||||||
|
// 111
|
||||||
|
expectedStartErrorBlock := big.NewInt(0).Add(lastProcessedBlock, sds.KnownGaps.expectedDifference)
|
||||||
|
// 124
|
||||||
|
expectedEndErrorBlock := big.NewInt(0).Sub(currentBlock, sds.KnownGaps.expectedDifference)
|
||||||
|
|
||||||
|
if (expectedStartErrorBlock == startErrorBlock) &&
|
||||||
|
(expectedEndErrorBlock == endErrorBlock) {
|
||||||
|
log.Info("All Gaps already captured in knownErrorBlocks")
|
||||||
|
}
|
||||||
|
|
||||||
|
if expectedEndErrorBlock.Cmp(endErrorBlock) == 1 {
|
||||||
|
log.Warn(fmt.Sprint("There are gaps in the knownErrorBlocks list: ", knownErrorBlocks))
|
||||||
|
log.Warn("But there are gaps that were also not added there.")
|
||||||
|
log.Warn(fmt.Sprint("Last Block in knownErrorBlocks: ", endErrorBlock))
|
||||||
|
log.Warn(fmt.Sprint("Last processed Block: ", lastProcessedBlock))
|
||||||
|
log.Warn(fmt.Sprint("Current Block: ", currentBlock))
|
||||||
|
//120 + 1 == 121
|
||||||
|
startBlock := big.NewInt(0).Add(endErrorBlock, sds.KnownGaps.expectedDifference)
|
||||||
|
// 121 to 124
|
||||||
|
log.Warn(fmt.Sprintf("Adding the following block range to known_gaps table: %d - %d", startBlock, expectedEndErrorBlock))
|
||||||
|
sds.indexer.PushKnownGaps(startBlock, expectedEndErrorBlock, false, sds.KnownGaps.processingKey)
|
||||||
|
}
|
||||||
|
|
||||||
|
if expectedStartErrorBlock.Cmp(startErrorBlock) == -1 {
|
||||||
|
log.Warn(fmt.Sprint("There are gaps in the knownErrorBlocks list: ", knownErrorBlocks))
|
||||||
|
log.Warn("But there are gaps that were also not added there.")
|
||||||
|
log.Warn(fmt.Sprint("First Block in knownErrorBlocks: ", startErrorBlock))
|
||||||
|
log.Warn(fmt.Sprint("Last processed Block: ", lastProcessedBlock))
|
||||||
|
// 115 - 1 == 114
|
||||||
|
endBlock := big.NewInt(0).Sub(startErrorBlock, sds.KnownGaps.expectedDifference)
|
||||||
|
// 111 to 114
|
||||||
|
log.Warn(fmt.Sprintf("Adding the following block range to known_gaps table: %d - %d", expectedStartErrorBlock, endBlock))
|
||||||
|
sds.indexer.PushKnownGaps(expectedStartErrorBlock, endBlock, false, sds.KnownGaps.processingKey)
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Warn(fmt.Sprint("The following Gaps were found: ", knownErrorBlocks))
|
||||||
|
log.Warn(fmt.Sprint("Updating known Gaps table from ", startErrorBlock, " to ", endErrorBlock, " with processing key, ", sds.KnownGaps.processingKey))
|
||||||
|
sds.indexer.PushKnownGaps(startErrorBlock, endErrorBlock, false, sds.KnownGaps.processingKey)
|
||||||
|
|
||||||
|
} else {
|
||||||
|
log.Warn("We missed blocks without any errors.")
|
||||||
|
// 110 + 1 == 111
|
||||||
|
startBlock := big.NewInt(0).Add(lastProcessedBlock, sds.KnownGaps.expectedDifference)
|
||||||
|
// 125 - 1 == 124
|
||||||
|
endBlock := big.NewInt(0).Sub(currentBlock, sds.KnownGaps.expectedDifference)
|
||||||
|
log.Warn(fmt.Sprint("Missed blocks starting from: ", startBlock))
|
||||||
|
log.Warn(fmt.Sprint("Missed blocks ending at: ", endBlock))
|
||||||
|
sds.indexer.PushKnownGaps(startBlock, endBlock, false, sds.KnownGaps.processingKey)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// BlockCache caches the last block for safe access from different service loops
|
// BlockCache caches the last block for safe access from different service loops
|
||||||
type BlockCache struct {
|
type BlockCache struct {
|
||||||
sync.Mutex
|
sync.Mutex
|
||||||
|
143
statediff/service_public_test.go
Normal file
143
statediff/service_public_test.go
Normal file
@ -0,0 +1,143 @@
|
|||||||
|
package statediff
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"math/big"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/ethereum/go-ethereum/params"
|
||||||
|
"github.com/ethereum/go-ethereum/statediff/indexer/database/sql"
|
||||||
|
"github.com/ethereum/go-ethereum/statediff/indexer/database/sql/postgres"
|
||||||
|
"github.com/ethereum/go-ethereum/statediff/indexer/interfaces"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
db sql.Database
|
||||||
|
err error
|
||||||
|
indexer interfaces.StateDiffIndexer
|
||||||
|
chainConf = params.MainnetChainConfig
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestKnownGaps(t *testing.T) {
|
||||||
|
type gapValues struct {
|
||||||
|
lastProcessedBlock int64
|
||||||
|
currentBlock int64
|
||||||
|
knownErrorBlocksStart int64
|
||||||
|
knownErrorBlocksEnd int64
|
||||||
|
expectedDif int64
|
||||||
|
}
|
||||||
|
|
||||||
|
tests := []gapValues{
|
||||||
|
// Unprocessed gaps before and after knownErrorBlock
|
||||||
|
{lastProcessedBlock: 110, knownErrorBlocksStart: 115, knownErrorBlocksEnd: 120, currentBlock: 125, expectedDif: 1},
|
||||||
|
// No knownErrorBlocks
|
||||||
|
{lastProcessedBlock: 130, knownErrorBlocksStart: 0, knownErrorBlocksEnd: 0, currentBlock: 140, expectedDif: 1},
|
||||||
|
// No gaps before or after knownErrorBlocks
|
||||||
|
{lastProcessedBlock: 150, knownErrorBlocksStart: 151, knownErrorBlocksEnd: 159, currentBlock: 160, expectedDif: 1},
|
||||||
|
// gaps before knownErrorBlocks but not after
|
||||||
|
{lastProcessedBlock: 170, knownErrorBlocksStart: 180, knownErrorBlocksEnd: 189, currentBlock: 190, expectedDif: 1},
|
||||||
|
// gaps after knownErrorBlocks but not before
|
||||||
|
{lastProcessedBlock: 200, knownErrorBlocksStart: 201, knownErrorBlocksEnd: 205, currentBlock: 210, expectedDif: 1},
|
||||||
|
/// Same tests as above with a new expected DIF
|
||||||
|
// Unprocessed gaps before and after knownErrorBlock
|
||||||
|
{lastProcessedBlock: 1100, knownErrorBlocksStart: 1150, knownErrorBlocksEnd: 1200, currentBlock: 1250, expectedDif: 2},
|
||||||
|
// No knownErrorBlocks
|
||||||
|
{lastProcessedBlock: 1300, knownErrorBlocksStart: 0, knownErrorBlocksEnd: 0, currentBlock: 1400, expectedDif: 2},
|
||||||
|
// No gaps before or after knownErrorBlocks
|
||||||
|
{lastProcessedBlock: 1500, knownErrorBlocksStart: 1502, knownErrorBlocksEnd: 1598, currentBlock: 1600, expectedDif: 2},
|
||||||
|
// gaps before knownErrorBlocks but not after
|
||||||
|
{lastProcessedBlock: 1700, knownErrorBlocksStart: 1800, knownErrorBlocksEnd: 1898, currentBlock: 1900, expectedDif: 2},
|
||||||
|
// gaps after knownErrorBlocks but not before
|
||||||
|
{lastProcessedBlock: 2000, knownErrorBlocksStart: 2002, knownErrorBlocksEnd: 2050, currentBlock: 2100, expectedDif: 2},
|
||||||
|
}
|
||||||
|
for _, tc := range tests {
|
||||||
|
// Reuse processing key from expecteDiff
|
||||||
|
testCaptureMissedBlocks(t, tc.lastProcessedBlock, tc.currentBlock, tc.knownErrorBlocksStart, tc.knownErrorBlocksEnd, tc.expectedDif, tc.expectedDif)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// It also makes sure we properly calculate any missed gaps not in the known gaps lists
|
||||||
|
// either before or after the list.
|
||||||
|
func testCaptureMissedBlocks(t *testing.T, lastBlockProcessed int64, currentBlockNum int64, knownErrorBlocksStart int64, knownErrorBlocksEnd int64, expectedDif int64, processingKey int64) {
|
||||||
|
|
||||||
|
lastProcessedBlock := big.NewInt(lastBlockProcessed)
|
||||||
|
currentBlock := big.NewInt(currentBlockNum)
|
||||||
|
knownErrorBlocks := (make([]*big.Int, 0))
|
||||||
|
|
||||||
|
checkGaps := knownErrorBlocksStart != 0 && knownErrorBlocksEnd != 0
|
||||||
|
if checkGaps {
|
||||||
|
for i := knownErrorBlocksStart; i <= knownErrorBlocksEnd; i++ {
|
||||||
|
knownErrorBlocks = append(knownErrorBlocks, big.NewInt(i))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Comment out values which should not be used
|
||||||
|
knownGaps := KnownGaps{
|
||||||
|
processingKey: processingKey,
|
||||||
|
expectedDifference: big.NewInt(expectedDif),
|
||||||
|
}
|
||||||
|
stateDiff, err := setupDb(t)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
service := &Service{
|
||||||
|
KnownGaps: knownGaps,
|
||||||
|
indexer: stateDiff,
|
||||||
|
}
|
||||||
|
service.capturedMissedBlocks(currentBlock, knownErrorBlocks, lastProcessedBlock)
|
||||||
|
|
||||||
|
if checkGaps {
|
||||||
|
|
||||||
|
if lastBlockProcessed+expectedDif == knownErrorBlocksStart && knownErrorBlocksEnd+expectedDif == currentBlockNum {
|
||||||
|
validateUpsert(t, stateDiff, knownErrorBlocksStart, knownErrorBlocksEnd)
|
||||||
|
|
||||||
|
} else if lastBlockProcessed+expectedDif == knownErrorBlocksStart {
|
||||||
|
validateUpsert(t, stateDiff, knownErrorBlocksStart, knownErrorBlocksEnd)
|
||||||
|
validateUpsert(t, stateDiff, knownErrorBlocksEnd+expectedDif, currentBlockNum-expectedDif)
|
||||||
|
|
||||||
|
} else if knownErrorBlocksEnd+expectedDif == currentBlockNum {
|
||||||
|
validateUpsert(t, stateDiff, lastBlockProcessed+expectedDif, knownErrorBlocksStart-expectedDif)
|
||||||
|
validateUpsert(t, stateDiff, knownErrorBlocksStart, knownErrorBlocksEnd)
|
||||||
|
|
||||||
|
} else {
|
||||||
|
validateUpsert(t, stateDiff, lastBlockProcessed+expectedDif, knownErrorBlocksStart-expectedDif)
|
||||||
|
validateUpsert(t, stateDiff, knownErrorBlocksStart, knownErrorBlocksEnd)
|
||||||
|
validateUpsert(t, stateDiff, knownErrorBlocksEnd+expectedDif, currentBlockNum-expectedDif)
|
||||||
|
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
validateUpsert(t, stateDiff, lastBlockProcessed+expectedDif, currentBlockNum-expectedDif)
|
||||||
|
}
|
||||||
|
|
||||||
|
tearDown(t, stateDiff)
|
||||||
|
}
|
||||||
|
|
||||||
|
func validateUpsert(t *testing.T, stateDiff *sql.StateDiffIndexer, startingBlock int64, endingBlock int64) {
|
||||||
|
t.Logf("Starting to query blocks: %d - %d", startingBlock, endingBlock)
|
||||||
|
queryString := fmt.Sprintf("SELECT starting_block_number from eth.known_gaps WHERE starting_block_number = %d AND ending_block_number = %d", startingBlock, endingBlock)
|
||||||
|
|
||||||
|
_, queryErr := stateDiff.QueryDb(queryString) // Figure out the string.
|
||||||
|
t.Logf("Updated Known Gaps table starting from, %d, and ending at, %d", startingBlock, endingBlock)
|
||||||
|
require.NoError(t, queryErr)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create a DB object to use
|
||||||
|
func setupDb(t *testing.T) (*sql.StateDiffIndexer, error) {
|
||||||
|
db, err = postgres.SetupSQLXDB()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
stateDiff, err := sql.NewStateDiffIndexer(context.Background(), chainConf, db)
|
||||||
|
return stateDiff, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Teardown the DB
|
||||||
|
func tearDown(t *testing.T, stateDiff *sql.StateDiffIndexer) {
|
||||||
|
t.Log("Starting tearDown")
|
||||||
|
sql.TearDownDB(t, db)
|
||||||
|
err := stateDiff.Close()
|
||||||
|
require.NoError(t, err)
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user