Refactor: Decouple knownGaps and Indexer
This commit decouples knownGaps from the Indexer: all knownGaps logic now lives in its own file, which makes the code easier to test and maintain. We have also removed all checks against `lastProcessedBlock`, since (at least hypothetically) geth will never skip a block, so that situation cannot occur.
This commit is contained in:
Parent: 2afccedd22
Commit: 9f92fe6e7c
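To make the intent of the refactor concrete, here is a minimal, hypothetical Go sketch of the pattern the commit moves toward: the gap-tracking logic depends only on a narrow writer interface instead of the full indexer, so it can sit in its own file and be unit tested with a stub. The names GapWriter, KnownGaps, Capture, and the simplified PushKnownGaps signature below are illustrative assumptions, not identifiers from this commit; the actual code uses KnownGapsState and the indexer's PushKnownGaps as shown in the diff that follows.

// known_gaps_sketch.go - illustrative only, not part of this commit.
package main

import (
    "fmt"
    "math/big"
)

// GapWriter is the only piece of the indexer the gap logic needs (hypothetical interface).
type GapWriter interface {
    PushKnownGaps(start, end *big.Int) error
}

// KnownGaps tracks the expected spacing between processed blocks.
type KnownGaps struct {
    ExpectedDifference *big.Int
    Writer             GapWriter
}

// Capture records a gap when currentBlock is more than ExpectedDifference ahead of
// lastProcessedBlock, mirroring the "missed blocks without errors" branch in the diff.
func (kg *KnownGaps) Capture(lastProcessedBlock, currentBlock *big.Int) error {
    expectedNext := new(big.Int).Add(lastProcessedBlock, kg.ExpectedDifference)
    if currentBlock.Cmp(expectedNext) <= 0 {
        return nil // no gap
    }
    start := expectedNext                                        // e.g. 110 + 1 == 111
    end := new(big.Int).Sub(currentBlock, kg.ExpectedDifference) // e.g. 125 - 1 == 124
    return kg.Writer.PushKnownGaps(start, end)
}

// stubWriter lets the gap logic be exercised without a database.
type stubWriter struct{}

func (stubWriter) PushKnownGaps(start, end *big.Int) error {
    fmt.Printf("known gap: %d - %d\n", start, end)
    return nil
}

func main() {
    kg := KnownGaps{ExpectedDifference: big.NewInt(1), Writer: stubWriter{}}
    _ = kg.Capture(big.NewInt(110), big.NewInt(125)) // prints: known gap: 111 - 124
}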
@@ -195,7 +195,6 @@ func makeFullNode(ctx *cli.Context) (*node.Node, ethapi.Backend) {
    if ctx.GlobalBool(utils.StateDiffFlag.Name) {
        var indexerConfig interfaces.Config
        var fileConfig interfaces.Config
        var clientName, nodeID string
        if ctx.GlobalIsSet(utils.StateDiffWritingFlag.Name) {
            clientName = ctx.GlobalString(utils.StateDiffDBClientNameFlag.Name)
@@ -210,15 +209,6 @@ func makeFullNode(ctx *cli.Context) (*node.Node, ethapi.Backend) {
            if err != nil {
                utils.Fatalf("%v", err)
            }

            if dbType != shared.FILE {
                fileConfig = file.Config{
                    FilePath: ctx.GlobalString(utils.StateDiffFilePath.Name),
                }
            } else {
                fileConfig = nil
            }

            switch dbType {
            case shared.FILE:
                indexerConfig = file.Config{
@@ -273,6 +273,7 @@ var AppHelpFlagGroups = []flags.FlagGroup{
            utils.StateDiffWritingFlag,
            utils.StateDiffWorkersFlag,
            utils.StateDiffFilePath,
            utils.StateDiffKnownGapsFilePath,
            utils.StateDiffWaitForSync,
        },
    },
@@ -35,7 +35,7 @@ import (
func NewStateDiffIndexer(ctx context.Context, chainConfig *params.ChainConfig, nodeInfo node.Info, config interfaces.Config) (sql.Database, interfaces.StateDiffIndexer, error) {
    switch config.Type() {
    case shared.FILE:
        log.Info("Creating a statediff indexer in SQL file writing mode")
        log.Info("Starting statediff service in SQL file writing mode")
        fc, ok := config.(file.Config)
        if !ok {
            return nil, nil, fmt.Errorf("file config is not the correct type: got %T, expected %T", config, file.Config{})
@@ -44,7 +44,7 @@ func NewStateDiffIndexer(ctx context.Context, chainConfig *params.ChainConfig, n
        ind, err := file.NewStateDiffIndexer(ctx, chainConfig, fc)
        return nil, ind, err
    case shared.POSTGRES:
        log.Info("Creating a statediff service in Postgres writing mode")
        log.Info("Starting statediff service in Postgres writing mode")
        pgc, ok := config.(postgres.Config)
        if !ok {
            return nil, nil, fmt.Errorf("postgres config is not the correct type: got %T, expected %T", config, postgres.Config{})
@@ -69,7 +69,7 @@ func NewStateDiffIndexer(ctx context.Context, chainConfig *params.ChainConfig, n
        ind, err := sql.NewStateDiffIndexer(ctx, chainConfig, db)
        return db, ind, err
    case shared.DUMP:
        log.Info("Creating statediff indexer in data dump mode")
        log.Info("Starting statediff service in data dump mode")
        dumpc, ok := config.(dump.Config)
        if !ok {
            return nil, nil, fmt.Errorf("dump config is not the correct type: got %T, expected %T", config, dump.Config{})
@@ -257,12 +257,3 @@ func (sqw *SQLWriter) upsertStorageCID(storageCID models.StorageNodeModel) {
    sqw.stmts <- []byte(fmt.Sprintf(storageInsert, storageCID.BlockNumber, storageCID.HeaderID, storageCID.StatePath, storageKey, storageCID.CID,
        storageCID.Path, storageCID.NodeType, true, storageCID.MhKey))
}

func (sqw *SQLWriter) upsertKnownGaps(knownGaps models.KnownGapsModel) {
    sqw.stmts <- []byte(fmt.Sprintf(knownGapsInsert, knownGaps.StartingBlockNumber, knownGaps.EndingBlockNumber, knownGaps.CheckedOut, knownGaps.ProcessingKey,
        knownGaps.EndingBlockNumber, knownGaps.ProcessingKey, knownGaps.EndingBlockNumber))
    //knownGapsInsert = "INSERT INTO eth.known_gaps (starting_block_number, ending_block_number, checked_out, processing_key) " +
    //    "VALUES ('%s', '%s', %t, %d) " +
    //    "ON CONFLICT (starting_block_number) DO UPDATE SET (ending_block_number, processing_key) = ('%s', %d) " +
    //    "WHERE eth.known_gaps.ending_block_number <= '%s';\n"
}
@@ -5,7 +5,6 @@ import (
    "context"
    "errors"
    "fmt"
    "math/big"
    "os"
    "testing"

@@ -198,48 +197,3 @@ func tearDown(t *testing.T) {
    err := ind.Close()
    require.NoError(t, err)
}
func TestKnownGapsUpsert(t *testing.T) {
    testKnownGapsUpsert(t)

}
func testKnownGapsUpsert(t *testing.T) {
    gapDifference := big.NewInt(10)     // Set a difference between latestBlock in DB and on Chain
    expectedDifference := big.NewInt(1) // Set what the expected difference between latestBlock in DB and on Chain should be

    stateDiff, err := setupDb(t)
    if err != nil {
        t.Fatal(err)
    }

    fileInd := setupFile(t)

    // Get the latest block from the DB
    latestBlockInDb, err := stateDiff.QueryDbToBigInt("SELECT MAX(block_number) FROM eth.header_cids")
    if err != nil {
        t.Fatal("Can't find a block in the eth.header_cids table.. Please put one there")
    }

    // Add the gapDifference for testing purposes
    latestBlockOnChain := big.NewInt(0)
    latestBlockOnChain.Add(latestBlockInDb, gapDifference)

    t.Log("The latest block on the chain is: ", latestBlockOnChain)
    t.Log("The latest block on the DB is: ", latestBlockInDb)

    gapUpsertErr := stateDiff.FindAndUpdateGaps(latestBlockOnChain, expectedDifference, 0, fileInd)
    require.NoError(t, gapUpsertErr)

    // Calculate what the start and end block should be in known_gaps
    // And check to make sure it is properly inserted
    startBlock := big.NewInt(0)
    endBlock := big.NewInt(0)
    startBlock.Add(latestBlockInDb, expectedDifference)
    endBlock.Sub(latestBlockOnChain, expectedDifference)

    queryString := fmt.Sprintf("SELECT starting_block_number from eth.known_gaps WHERE starting_block_number = %d AND ending_block_number = %d", startBlock, endBlock)

    _, queryErr := stateDiff.QueryDb(queryString) // Figure out the string.
    require.NoError(t, queryErr)
    t.Logf("Updated Known Gaps table starting from, %d, and ending at, %d", startBlock, endBlock)

}
@@ -113,5 +113,4 @@ func (db *DB) InsertKnownGapsStm() string {
    return `INSERT INTO eth.known_gaps (starting_block_number, ending_block_number, checked_out, processing_key) VALUES ($1, $2, $3, $4)
        ON CONFLICT (starting_block_number) DO UPDATE SET (ending_block_number, processing_key) = ($2, $4)
        WHERE eth.known_gaps.ending_block_number <= $2`
    //return `INSERT INTO eth.known_gaps (starting_block_number, ending_block_number, checked_out, processing_key) VALUES (1, 2, true, 1)`
}
@@ -17,7 +17,6 @@
package sql

import (
    "context"
    "fmt"

    "github.com/ethereum/go-ethereum/common"
@@ -186,15 +185,3 @@ func (w *Writer) upsertStorageCID(tx Tx, storageCID models.StorageNodeModel) err
    }
    return nil
}

// Upserts known gaps to the DB.
// INSERT INTO eth.known_gaps (starting_block_number, ending_block_number, checked_out, processing_key) VALUES ($1, $2, $3, $4)
func (w *Writer) upsertKnownGaps(knownGaps models.KnownGapsModel) error {
    _, err := w.db.Exec(context.Background(), w.db.InsertKnownGapsStm(),
        knownGaps.StartingBlockNumber, knownGaps.EndingBlockNumber, knownGaps.CheckedOut, knownGaps.ProcessingKey)
    if err != nil {
        return fmt.Errorf("error upserting known_gaps entry: %v", err)
    }

    return nil
}
@@ -18,7 +18,6 @@ package statediff

import (
    "bytes"
    "fmt"
    "math/big"
    "strconv"
    "strings"
@@ -149,29 +148,6 @@ type Service struct {
}

// This structure keeps track of the knownGaps at any given moment in time
type KnownGapsState struct {
    // Should we check for gaps by looking at the DB and comparing the latest block with head
    checkForGaps bool
    // Arbitrary processingKey that can be used down the line to differentiate different geth nodes.
    processingKey int64
    // This number indicates the expected difference between blocks.
    // Currently, this is 1 since the geth node processes each block. But down the road this can be used in
    // tandem with the processingKey to differentiate block processing logic.
    expectedDifference *big.Int
    // Indicates if Geth is in an error state
    // This is used to indicate the right time to upsert
    errorState bool
    // This array keeps track of errorBlocks as they occur.
    // When the errorState is false again, we can process these blocks.
    // Do we need a list, or can we have knownStartErrorBlock and knownEndErrorBlock ints instead?
    knownErrorBlocks []*big.Int
    // lastProcessedBlock keeps track of the last block that was processed.
    // It's used to make sure we didn't skip over any block!
    lastProcessedBlock *big.Int
    // This fileIndexer is used to write the knownGaps to file
    // if we can't properly write to the DB
    fileIndexer interfaces.StateDiffIndexer
}

// BlockCache caches the last block for safe access from different service loops
type BlockCache struct {
@@ -206,29 +182,11 @@ func New(stack *node.Node, ethServ *eth.Ethereum, cfg *ethconfig.Config, params
        var err error
        db, indexer, err = ind.NewStateDiffIndexer(params.Context, blockChain.Config(), info, params.IndexerConfig)
        if err != nil {
            log.Error("Error creating indexer", "indexer: ", params.IndexerConfig.Type(), "error: ", err)
            return err
        }
        if params.FileConfig != nil {
            fileIndexer, err = ind.NewStateDiffIndexer(params.Context, blockChain.Config(), info, params.FileConfig, shared.FILE)
            if err != nil {
                log.Error("Error creating file indexer", "error: ", err)
                return err
            }
        } else {
            fileIndexer = indexer
        }
        //fileIndexer, fileErr = file.NewStateDiffIndexer(params.Context, blockChain.Config(), info)
        indexer.ReportDBMetrics(10*time.Second, quitCh)
    }

    var checkForGaps bool
    if params.IndexerConfig.Type() == shared.POSTGRES {
        checkForGaps = true
    } else {
        log.Info("We are not going to check for gaps on start up since we are not connected to Postgres!")
        checkForGaps = false
    }
    workers := params.NumWorkers
    if workers == 0 {
        workers = 1
@@ -374,75 +332,6 @@ func (sds *Service) writeGenesisStateDiff(currBlock *types.Block, workerId uint)
    statediffMetrics.lastStatediffHeight.Update(genesisBlockNumber)
}

// This function will capture any missed blocks that were not captured in sds.KnownGaps.knownErrorBlocks.
// It is invoked when the sds.KnownGaps.lastProcessed block is not one unit
// away from sds.KnownGaps.expectedDifference
// Essentially, if geth ever misses blocks but doesn't output an error, we are covered.
func (sds *Service) captureMissedBlocks(currentBlock *big.Int, knownErrorBlocks []*big.Int, lastProcessedBlock *big.Int) {
    // last processed: 110
    // current block: 125
    log.Debug("current block", "block number: ", currentBlock)
    log.Debug("knownErrorBlocks", "knownErrorBlocks: ", knownErrorBlocks)
    log.Debug("last processed block", "block number: ", lastProcessedBlock)
    log.Debug("expected difference", "sds.KnownGaps.expectedDifference: ", sds.KnownGaps.expectedDifference)

    if len(knownErrorBlocks) > 0 {
        // 115
        startErrorBlock := new(big.Int).Set(knownErrorBlocks[0])
        // 120
        endErrorBlock := new(big.Int).Set(knownErrorBlocks[len(knownErrorBlocks)-1])

        // 111
        expectedStartErrorBlock := big.NewInt(0).Add(lastProcessedBlock, sds.KnownGaps.expectedDifference)
        // 124
        expectedEndErrorBlock := big.NewInt(0).Sub(currentBlock, sds.KnownGaps.expectedDifference)

        if (expectedStartErrorBlock.Cmp(startErrorBlock) != 0) &&
            (expectedEndErrorBlock.Cmp(endErrorBlock) != 0) {
            log.Info("All Gaps already captured in knownErrorBlocks")
        }

        if expectedEndErrorBlock.Cmp(endErrorBlock) == 1 {
            log.Warn("There are gaps in the knownErrorBlocks list", "knownErrorBlocks", knownErrorBlocks)
            log.Warn("But there are gaps that were also not added there.")
            log.Warn("Last Block in knownErrorBlocks", "endErrorBlock", endErrorBlock)
            log.Warn("Last processed Block", "lastProcessedBlock", lastProcessedBlock)
            log.Warn("Current Block", "currentBlock", currentBlock)
            //120 + 1 == 121
            startBlock := big.NewInt(0).Add(endErrorBlock, sds.KnownGaps.expectedDifference)
            // 121 to 124
            log.Warn(fmt.Sprintf("Adding the following block range to known_gaps table: %d - %d", startBlock, expectedEndErrorBlock))
            sds.indexer.PushKnownGaps(startBlock, expectedEndErrorBlock, false, sds.KnownGaps.processingKey, sds.KnownGaps.fileIndexer)
        }

        if expectedStartErrorBlock.Cmp(startErrorBlock) == -1 {
            log.Warn("There are gaps in the knownErrorBlocks list", "knownErrorBlocks", knownErrorBlocks)
            log.Warn("But there are gaps that were also not added there.")
            log.Warn("First Block in knownErrorBlocks", "startErrorBlock", startErrorBlock)
            log.Warn("Last processed Block", "lastProcessedBlock", lastProcessedBlock)
            // 115 - 1 == 114
            endBlock := big.NewInt(0).Sub(startErrorBlock, sds.KnownGaps.expectedDifference)
            // 111 to 114
            log.Warn(fmt.Sprintf("Adding the following block range to known_gaps table: %d - %d", expectedStartErrorBlock, endBlock))
            sds.indexer.PushKnownGaps(expectedStartErrorBlock, endBlock, false, sds.KnownGaps.processingKey, sds.KnownGaps.fileIndexer)
        }

        log.Warn("The following Gaps were found", "knownErrorBlocks", knownErrorBlocks)
        log.Warn(fmt.Sprint("Updating known Gaps table from ", startErrorBlock, " to ", endErrorBlock, " with processing key, ", sds.KnownGaps.processingKey))
        sds.indexer.PushKnownGaps(startErrorBlock, endErrorBlock, false, sds.KnownGaps.processingKey, sds.KnownGaps.fileIndexer)

    } else {
        log.Warn("We missed blocks without any errors.")
        // 110 + 1 == 111
        startBlock := big.NewInt(0).Add(lastProcessedBlock, sds.KnownGaps.expectedDifference)
        // 125 - 1 == 124
        endBlock := big.NewInt(0).Sub(currentBlock, sds.KnownGaps.expectedDifference)
        log.Warn("Missed blocks starting from", "startBlock", startBlock)
        log.Warn("Missed blocks ending at", "endBlock", endBlock)
        sds.indexer.PushKnownGaps(startBlock, endBlock, false, sds.KnownGaps.processingKey, sds.KnownGaps.fileIndexer)
    }
}

func (sds *Service) writeLoopWorker(params workerParams) {
    defer params.wg.Done()
    for {
@@ -1,242 +0,0 @@
package statediff

import (
    "context"
    "errors"
    "fmt"
    "io/ioutil"
    "math/big"
    "os"
    "strings"
    "testing"

    "github.com/ethereum/go-ethereum/params"
    "github.com/ethereum/go-ethereum/statediff/indexer/database/file"
    "github.com/ethereum/go-ethereum/statediff/indexer/database/sql"
    "github.com/ethereum/go-ethereum/statediff/indexer/database/sql/postgres"
    "github.com/ethereum/go-ethereum/statediff/indexer/interfaces"
    "github.com/ethereum/go-ethereum/statediff/indexer/mocks"
    "github.com/stretchr/testify/require"
)

var (
    db        sql.Database
    err       error
    indexer   interfaces.StateDiffIndexer
    chainConf = params.MainnetChainConfig
)

type gapValues struct {
    lastProcessedBlock    int64
    currentBlock          int64
    knownErrorBlocksStart int64
    knownErrorBlocksEnd   int64
    expectedDif           int64
    processingKey         int64
}

// Add clean db
// Test for failures when they are expected, when we go from smaller block to larger block
// We should no longer see the smaller block in DB
func TestKnownGaps(t *testing.T) {

    tests := []gapValues{
        // Unprocessed gaps before and after knownErrorBlock
        {lastProcessedBlock: 110, knownErrorBlocksStart: 115, knownErrorBlocksEnd: 120, currentBlock: 125, expectedDif: 1, processingKey: 1},
        // No knownErrorBlocks
        {lastProcessedBlock: 130, knownErrorBlocksStart: 0, knownErrorBlocksEnd: 0, currentBlock: 140, expectedDif: 1, processingKey: 1},
        // No gaps before or after knownErrorBlocks
        {lastProcessedBlock: 150, knownErrorBlocksStart: 151, knownErrorBlocksEnd: 159, currentBlock: 160, expectedDif: 1, processingKey: 1},
        // gaps before knownErrorBlocks but not after
        {lastProcessedBlock: 170, knownErrorBlocksStart: 180, knownErrorBlocksEnd: 189, currentBlock: 190, expectedDif: 1, processingKey: 1},
        // gaps after knownErrorBlocks but not before
        {lastProcessedBlock: 200, knownErrorBlocksStart: 201, knownErrorBlocksEnd: 205, currentBlock: 210, expectedDif: 1, processingKey: 1},
        /// Same tests as above with a new expected DIF
        // Unprocessed gaps before and after knownErrorBlock
        {lastProcessedBlock: 1100, knownErrorBlocksStart: 1150, knownErrorBlocksEnd: 1200, currentBlock: 1250, expectedDif: 2, processingKey: 2},
        // No knownErrorBlocks
        {lastProcessedBlock: 1300, knownErrorBlocksStart: 0, knownErrorBlocksEnd: 0, currentBlock: 1400, expectedDif: 2, processingKey: 2},
        // No gaps before or after knownErrorBlocks
        {lastProcessedBlock: 1500, knownErrorBlocksStart: 1502, knownErrorBlocksEnd: 1598, currentBlock: 1600, expectedDif: 2, processingKey: 2},
        // gaps before knownErrorBlocks but not after
        {lastProcessedBlock: 1700, knownErrorBlocksStart: 1800, knownErrorBlocksEnd: 1898, currentBlock: 1900, expectedDif: 2, processingKey: 2},
        // gaps after knownErrorBlocks but not before
        {lastProcessedBlock: 2000, knownErrorBlocksStart: 2002, knownErrorBlocksEnd: 2050, currentBlock: 2100, expectedDif: 2, processingKey: 2},
        // Test update when block number is larger!!
        {lastProcessedBlock: 2000, knownErrorBlocksStart: 2002, knownErrorBlocksEnd: 2052, currentBlock: 2100, expectedDif: 2, processingKey: 2},
        // Update when processing key is different!
        {lastProcessedBlock: 2000, knownErrorBlocksStart: 2002, knownErrorBlocksEnd: 2052, currentBlock: 2100, expectedDif: 2, processingKey: 10},
    }

    testWriteToDb(t, tests, true)
    testWriteToFile(t, tests, true)
}

// test writing blocks to the DB
func testWriteToDb(t *testing.T, tests []gapValues, wipeDbBeforeStart bool) {
    stateDiff, db, err := setupDb(t)
    require.NoError(t, err)

    // Clear Table first, this is needed because we updated an entry to have a larger endblock number
    // so we can't find the original start and endblock pair.
    if wipeDbBeforeStart {
        db.Exec(context.Background(), "DELETE FROM eth.known_gaps")
    }

    for _, tc := range tests {
        // Create an array with knownGaps based on user inputs
        checkGaps := tc.knownErrorBlocksStart != 0 && tc.knownErrorBlocksEnd != 0
        knownErrorBlocks := (make([]*big.Int, 0))
        if checkGaps {
            knownErrorBlocks = createKnownErrorBlocks(knownErrorBlocks, tc.knownErrorBlocksStart, tc.knownErrorBlocksEnd)
        }
        // Upsert
        testCaptureMissedBlocks(t, tc.lastProcessedBlock, tc.currentBlock, tc.knownErrorBlocksStart, tc.knownErrorBlocksEnd,
            tc.expectedDif, tc.processingKey, stateDiff, knownErrorBlocks, nil)
        // Validate that the upsert was done correctly.
        callValidateUpsert(t, checkGaps, stateDiff, tc.lastProcessedBlock, tc.currentBlock, tc.expectedDif, tc.knownErrorBlocksStart, tc.knownErrorBlocksEnd)
    }
    tearDown(t, stateDiff)

}

// test writing blocks to file and then inserting them to DB
func testWriteToFile(t *testing.T, tests []gapValues, wipeDbBeforeStart bool) {
    stateDiff, db, err := setupDb(t)
    require.NoError(t, err)

    // Clear Table first, this is needed because we updated an entry to have a larger endblock number
    // so we can't find the original start and endblock pair.
    if wipeDbBeforeStart {
        db.Exec(context.Background(), "DELETE FROM eth.known_gaps")
    }

    tearDown(t, stateDiff)
    for _, tc := range tests {
        // Reuse processing key from expectedDif
        checkGaps := tc.knownErrorBlocksStart != 0 && tc.knownErrorBlocksEnd != 0
        knownErrorBlocks := (make([]*big.Int, 0))
        if checkGaps {
            knownErrorBlocks = createKnownErrorBlocks(knownErrorBlocks, tc.knownErrorBlocksStart, tc.knownErrorBlocksEnd)
        }

        fileInd := setupFile(t)
        testCaptureMissedBlocks(t, tc.lastProcessedBlock, tc.currentBlock, tc.knownErrorBlocksStart, tc.knownErrorBlocksEnd,
            tc.expectedDif, tc.processingKey, stateDiff, knownErrorBlocks, fileInd)
        fileInd.Close()

        newStateDiff, db, _ := setupDb(t)

        file, ioErr := ioutil.ReadFile(file.TestConfig.FilePath)
        require.NoError(t, ioErr)

        requests := strings.Split(string(file), ";")

        // Skip the first two entries
        for _, request := range requests[2:] {
            _, err := db.Exec(context.Background(), request)
            require.NoError(t, err)
        }
        callValidateUpsert(t, checkGaps, newStateDiff, tc.lastProcessedBlock, tc.currentBlock, tc.expectedDif, tc.knownErrorBlocksStart, tc.knownErrorBlocksEnd)
    }
}

// test capturing missed blocks
func testCaptureMissedBlocks(t *testing.T, lastBlockProcessed int64, currentBlockNum int64, knownErrorBlocksStart int64, knownErrorBlocksEnd int64, expectedDif int64, processingKey int64,
    stateDiff *sql.StateDiffIndexer, knownErrorBlocks []*big.Int, fileInd interfaces.StateDiffIndexer) {

    lastProcessedBlock := big.NewInt(lastBlockProcessed)
    currentBlock := big.NewInt(currentBlockNum)

    knownGaps := KnownGapsState{
        processingKey:      processingKey,
        expectedDifference: big.NewInt(expectedDif),
        fileIndexer:        fileInd,
    }
    service := &Service{
        KnownGaps: knownGaps,
        indexer:   stateDiff,
    }
    service.captureMissedBlocks(currentBlock, knownErrorBlocks, lastProcessedBlock)
}

// Helper function to create an array of gaps given a start and end block
func createKnownErrorBlocks(knownErrorBlocks []*big.Int, knownErrorBlocksStart int64, knownErrorBlocksEnd int64) []*big.Int {
    for i := knownErrorBlocksStart; i <= knownErrorBlocksEnd; i++ {
        knownErrorBlocks = append(knownErrorBlocks, big.NewInt(i))
    }
    return knownErrorBlocks
}

// This function will call the validateUpsert function based on various conditions.
func callValidateUpsert(t *testing.T, checkGaps bool, stateDiff *sql.StateDiffIndexer,
    lastBlockProcessed int64, currentBlockNum int64, expectedDif int64, knownErrorBlocksStart int64, knownErrorBlocksEnd int64) {
    // If there are gaps in knownErrorBlocks array
    if checkGaps {

        // If there are no unexpected gaps before or after the entries in the knownErrorBlocks array
        // Only handle the knownErrorBlocks Array
        if lastBlockProcessed+expectedDif == knownErrorBlocksStart && knownErrorBlocksEnd+expectedDif == currentBlockNum {
            validateUpsert(t, stateDiff, knownErrorBlocksStart, knownErrorBlocksEnd)

            // If there are gaps after knownErrorBlocks array, process them
        } else if lastBlockProcessed+expectedDif == knownErrorBlocksStart {
            validateUpsert(t, stateDiff, knownErrorBlocksStart, knownErrorBlocksEnd)
            validateUpsert(t, stateDiff, knownErrorBlocksEnd+expectedDif, currentBlockNum-expectedDif)

            // If there are gaps before knownErrorBlocks array, process them
        } else if knownErrorBlocksEnd+expectedDif == currentBlockNum {
            validateUpsert(t, stateDiff, lastBlockProcessed+expectedDif, knownErrorBlocksStart-expectedDif)
            validateUpsert(t, stateDiff, knownErrorBlocksStart, knownErrorBlocksEnd)

            // if there are gaps before, after, and within the knownErrorBlocks array, handle all the errors.
        } else {
            validateUpsert(t, stateDiff, lastBlockProcessed+expectedDif, knownErrorBlocksStart-expectedDif)
            validateUpsert(t, stateDiff, knownErrorBlocksStart, knownErrorBlocksEnd)
            validateUpsert(t, stateDiff, knownErrorBlocksEnd+expectedDif, currentBlockNum-expectedDif)

        }
    } else {
        validateUpsert(t, stateDiff, lastBlockProcessed+expectedDif, currentBlockNum-expectedDif)
    }

}

// Make sure the upsert was performed correctly
func validateUpsert(t *testing.T, stateDiff *sql.StateDiffIndexer, startingBlock int64, endingBlock int64) {
    t.Logf("Starting to query blocks: %d - %d", startingBlock, endingBlock)
    queryString := fmt.Sprintf("SELECT starting_block_number from eth.known_gaps WHERE starting_block_number = %d AND ending_block_number = %d", startingBlock, endingBlock)

    _, queryErr := stateDiff.QueryDb(queryString) // Figure out the string.
    t.Logf("Updated Known Gaps table starting from, %d, and ending at, %d", startingBlock, endingBlock)
    require.NoError(t, queryErr)
}

// Create a DB object to use
func setupDb(t *testing.T) (*sql.StateDiffIndexer, sql.Database, error) {
    db, err = postgres.SetupSQLXDB()
    if err != nil {
        t.Fatal(err)
    }
    stateDiff, err := sql.NewStateDiffIndexer(context.Background(), chainConf, db)
    return stateDiff, db, err
}

// Create a file statediff indexer.
func setupFile(t *testing.T) interfaces.StateDiffIndexer {
    if _, err := os.Stat(file.TestConfig.FilePath); !errors.Is(err, os.ErrNotExist) {
        err := os.Remove(file.TestConfig.FilePath)
        require.NoError(t, err)
    }
    indexer, err = file.NewStateDiffIndexer(context.Background(), mocks.TestConfig, file.TestConfig)
    require.NoError(t, err)
    return indexer
}

// Teardown the DB
func tearDown(t *testing.T, stateDiff *sql.StateDiffIndexer) {
    t.Log("Starting tearDown")
    sql.TearDownDB(t, db)
    err := stateDiff.Close()
    require.NoError(t, err)
}