Write to file if you can't write to DB

This commit contains many new features:

1. Write the insert statement to file if there is an error when trying to write to DB.
2. Test the write to DB and write to file features in unit tests. They work as expected.
3. Insert logic for calling the knownGaps function (untested).

This PR makes a major change to the way statediff is started. Unless it is started in `file` mode, it will create a `file` indexer. This indexer will be used for writing errors to disk.
This commit is contained in:
Abdul Rabbani 2022-03-28 15:00:29 -04:00
parent 4774b01573
commit 2afccedd22
6 changed files with 216 additions and 106 deletions

View File

@ -195,6 +195,7 @@ func makeFullNode(ctx *cli.Context) (*node.Node, ethapi.Backend) {
if ctx.GlobalBool(utils.StateDiffFlag.Name) {
var indexerConfig interfaces.Config
var fileConfig interfaces.Config
var clientName, nodeID string
if ctx.GlobalIsSet(utils.StateDiffWritingFlag.Name) {
clientName = ctx.GlobalString(utils.StateDiffDBClientNameFlag.Name)
@ -209,6 +210,15 @@ func makeFullNode(ctx *cli.Context) (*node.Node, ethapi.Backend) {
if err != nil {
utils.Fatalf("%v", err)
}
if dbType != shared.FILE {
fileConfig = file.Config{
FilePath: ctx.GlobalString(utils.StateDiffFilePath.Name),
}
} else {
fileConfig = nil
}
switch dbType {
case shared.FILE:
indexerConfig = file.Config{

View File

@ -534,6 +534,21 @@ func (sdi *StateDiffIndexer) SetWatchedAddresses(args []sdtypes.WatchAddressArg,
// ClearWatchedAddresses satisfies the indexer interface; this indexer keeps
// no watched-address state, so there is nothing to clear and it always
// returns nil.
func (sdi *StateDiffIndexer) ClearWatchedAddresses() error {
	return nil
}
// PushKnownGaps records a known gap covering the inclusive block range
// [startingBlockNumber, endingBlockNumber] by writing a KnownGapsModel to the
// dump destination (sdi.dump). checkedOut and processingKey are stored on the
// model as-is. The index parameter is currently unused by this implementation.
// Written but not tested. Unsure if there is a real use case for this anywhere.
func (sdi *StateDiffIndexer) PushKnownGaps(startingBlockNumber *big.Int, endingBlockNumber *big.Int, checkedOut bool, processingKey int64, index interfaces.StateDiffIndexer) error {
	log.Info("Dumping known gaps")
	// A valid gap requires the starting block to be strictly below the ending
	// block; equal bounds are rejected as well.
	if startingBlockNumber.Cmp(endingBlockNumber) >= 0 {
		// Lowercase error string per Go convention; the message now also
		// covers the equal case that the check above rejects.
		return fmt.Errorf("starting block %d must be less than ending block %d", startingBlockNumber, endingBlockNumber)
	}
	knownGap := models.KnownGapsModel{
		StartingBlockNumber: startingBlockNumber.String(),
		EndingBlockNumber:   endingBlockNumber.String(),
		CheckedOut:          checkedOut,
		ProcessingKey:       processingKey,
	}
	if _, err := fmt.Fprintf(sdi.dump, "%+v\r\n", knownGap); err != nil {
		return err
	}
	return nil
}

View File

@ -259,7 +259,7 @@ func (sqw *SQLWriter) upsertStorageCID(storageCID models.StorageNodeModel) {
}
func (sqw *SQLWriter) upsertKnownGaps(knownGaps models.KnownGapsModel) {
sqw.stmts <- []byte(fmt.Sprintf(knownGaps.StartingBlockNumber, knownGaps.EndingBlockNumber, knownGaps.CheckedOut, knownGaps.ProcessingKey,
sqw.stmts <- []byte(fmt.Sprintf(knownGapsInsert, knownGaps.StartingBlockNumber, knownGaps.EndingBlockNumber, knownGaps.CheckedOut, knownGaps.ProcessingKey,
knownGaps.EndingBlockNumber, knownGaps.ProcessingKey, knownGaps.EndingBlockNumber))
//knownGapsInsert = "INSERT INTO eth.known_gaps (starting_block_number, ending_block_number, checked_out, processing_key) " +
// "VALUES ('%s', '%s', %t, %d) " +

View File

@ -719,7 +719,8 @@ func (sdi *StateDiffIndexer) QueryDbToBigInt(queryString string) (*big.Int, erro
// More geth nodes, the expected difference might fluctuate.
func isGap(latestBlockInDb *big.Int, latestBlockOnChain *big.Int, expectedDifference *big.Int) bool {
latestBlock := big.NewInt(0)
if latestBlock.Add(latestBlockOnChain, expectedDifference) != latestBlockInDb {
if latestBlock.Sub(latestBlockOnChain, expectedDifference).Cmp(latestBlockInDb) != 0 {
log.Warn("We found a gap", "latestBlockInDb", latestBlockInDb, "latestBlockOnChain", latestBlockOnChain, "expectedDifference", expectedDifference)
return true
}
return false
@ -746,9 +747,10 @@ func (sdi *StateDiffIndexer) FindAndUpdateGaps(latestBlockOnChain *big.Int, expe
startBlock.Add(latestBlockInDb, expectedDifference)
endBlock.Sub(latestBlockOnChain, expectedDifference)
log.Warn(fmt.Sprint("Found Gaps starting at, ", startBlock, " and ending at, ", endBlock))
log.Warn("Found Gaps starting at", "startBlock", startBlock, "endingBlock", endBlock)
err := sdi.PushKnownGaps(startBlock, endBlock, false, processingKey, fileIndexer)
if err != nil {
log.Error("We were unable to write the following gap to the DB", "start Block", startBlock, "endBlock", endBlock, "error", err)
// Write to file SQL file instead!!!
// If write to SQL file fails, write to disk. Handle this within the write to SQL file function!
return err

View File

@ -173,70 +173,6 @@ type KnownGapsState struct {
fileIndexer interfaces.StateDiffIndexer
}
// capturedMissedBlocks captures any missed blocks that were not captured in
// sds.KnownGaps.knownErrorBlocks.
// It is invoked when the sds.KnownGaps.lastProcessed block is not one unit
// away from sds.KnownGaps.expectedDifference
// Essentially, if geth ever misses blocks but doesn't output an error, we are covered.
// Worked example used in the comments below:
// last processed: 110, current block: 125, expectedDifference: 1.
func (sds *Service) capturedMissedBlocks(currentBlock *big.Int, knownErrorBlocks []*big.Int, lastProcessedBlock *big.Int) {
	// last processed: 110
	// current block: 125
	if len(knownErrorBlocks) > 0 {
		// 115
		startErrorBlock := new(big.Int).Set(knownErrorBlocks[0])
		// 120
		endErrorBlock := new(big.Int).Set(knownErrorBlocks[len(knownErrorBlocks)-1])
		// 111
		expectedStartErrorBlock := big.NewInt(0).Add(lastProcessedBlock, sds.KnownGaps.expectedDifference)
		// 124
		expectedEndErrorBlock := big.NewInt(0).Sub(currentBlock, sds.KnownGaps.expectedDifference)
		// BUG FIX: the original compared *big.Int values with ==, which tests
		// pointer identity and is practically never true; Cmp compares the
		// numeric values, which is what is intended here.
		if expectedStartErrorBlock.Cmp(startErrorBlock) == 0 &&
			expectedEndErrorBlock.Cmp(endErrorBlock) == 0 {
			log.Info("All Gaps already captured in knownErrorBlocks")
		}
		// There are missed blocks after the knownErrorBlocks range.
		if expectedEndErrorBlock.Cmp(endErrorBlock) == 1 {
			log.Warn(fmt.Sprint("There are gaps in the knownErrorBlocks list: ", knownErrorBlocks))
			log.Warn("But there are gaps that were also not added there.")
			log.Warn(fmt.Sprint("Last Block in knownErrorBlocks: ", endErrorBlock))
			log.Warn(fmt.Sprint("Last processed Block: ", lastProcessedBlock))
			log.Warn(fmt.Sprint("Current Block: ", currentBlock))
			//120 + 1 == 121
			startBlock := big.NewInt(0).Add(endErrorBlock, sds.KnownGaps.expectedDifference)
			// 121 to 124
			log.Warn(fmt.Sprintf("Adding the following block range to known_gaps table: %d - %d", startBlock, expectedEndErrorBlock))
			sds.indexer.PushKnownGaps(startBlock, expectedEndErrorBlock, false, sds.KnownGaps.processingKey, sds.KnownGaps.fileIndexer)
		}
		// There are missed blocks before the knownErrorBlocks range.
		if expectedStartErrorBlock.Cmp(startErrorBlock) == -1 {
			log.Warn(fmt.Sprint("There are gaps in the knownErrorBlocks list: ", knownErrorBlocks))
			log.Warn("But there are gaps that were also not added there.")
			log.Warn(fmt.Sprint("First Block in knownErrorBlocks: ", startErrorBlock))
			log.Warn(fmt.Sprint("Last processed Block: ", lastProcessedBlock))
			// 115 - 1 == 114
			endBlock := big.NewInt(0).Sub(startErrorBlock, sds.KnownGaps.expectedDifference)
			// 111 to 114
			log.Warn(fmt.Sprintf("Adding the following block range to known_gaps table: %d - %d", expectedStartErrorBlock, endBlock))
			sds.indexer.PushKnownGaps(expectedStartErrorBlock, endBlock, false, sds.KnownGaps.processingKey, sds.KnownGaps.fileIndexer)
		}
		// Always record the knownErrorBlocks range itself.
		// NOTE(review): errors returned by PushKnownGaps are ignored here —
		// consider at least logging them.
		log.Warn(fmt.Sprint("The following Gaps were found: ", knownErrorBlocks))
		log.Warn(fmt.Sprint("Updating known Gaps table from ", startErrorBlock, " to ", endErrorBlock, " with processing key, ", sds.KnownGaps.processingKey))
		sds.indexer.PushKnownGaps(startErrorBlock, endErrorBlock, false, sds.KnownGaps.processingKey, sds.KnownGaps.fileIndexer)
	} else {
		// No known error blocks: the whole span between lastProcessedBlock
		// and currentBlock (exclusive) was missed silently.
		log.Warn("We missed blocks without any errors.")
		// 110 + 1 == 111
		startBlock := big.NewInt(0).Add(lastProcessedBlock, sds.KnownGaps.expectedDifference)
		// 125 - 1 == 124
		endBlock := big.NewInt(0).Sub(currentBlock, sds.KnownGaps.expectedDifference)
		log.Warn(fmt.Sprint("Missed blocks starting from: ", startBlock))
		log.Warn(fmt.Sprint("Missed blocks ending at: ", endBlock))
		sds.indexer.PushKnownGaps(startBlock, endBlock, false, sds.KnownGaps.processingKey, sds.KnownGaps.fileIndexer)
	}
}
// BlockCache caches the last block for safe access from different service loops
type BlockCache struct {
sync.Mutex
@ -270,23 +206,29 @@ func New(stack *node.Node, ethServ *eth.Ethereum, cfg *ethconfig.Config, params
var err error
db, indexer, err = ind.NewStateDiffIndexer(params.Context, blockChain.Config(), info, params.IndexerConfig)
if err != nil {
log.Error("Error creating indexer", "indexer: ", params.IndexerConfig.Type(), "error: ", err)
return err
}
if params.IndexerConfig.Type() != shared.FILE {
fileIndexer, err = ind.NewStateDiffIndexer(params.Context, blockChain.Config(), info, params.IndexerConfig, "")
log.Info("Starting the statediff service in ", "mode", params.IndexerConfig.Type())
if params.FileConfig != nil {
fileIndexer, err = ind.NewStateDiffIndexer(params.Context, blockChain.Config(), info, params.FileConfig, shared.FILE)
if err != nil {
log.Error("Error creating file indexer", "error: ", err)
return err
}
} else {
log.Info("Starting the statediff service in ", "mode", "File")
fileIndexer = indexer
}
//fileIndexer, fileErr = file.NewStateDiffIndexer(params.Context, blockChain.Config(), info)
indexer.ReportDBMetrics(10*time.Second, quitCh)
}
var checkForGaps bool
if params.IndexerConfig.Type() == shared.POSTGRES {
checkForGaps = true
} else {
log.Info("We are not going to check for gaps on start up since we are not connected to Postgres!")
checkForGaps = false
}
workers := params.NumWorkers
if workers == 0 {
workers = 1
@ -432,6 +374,75 @@ func (sds *Service) writeGenesisStateDiff(currBlock *types.Block, workerId uint)
statediffMetrics.lastStatediffHeight.Update(genesisBlockNumber)
}
// captureMissedBlocks captures any missed blocks that were not captured in
// sds.KnownGaps.knownErrorBlocks.
// It is invoked when the sds.KnownGaps.lastProcessed block is not one unit
// away from sds.KnownGaps.expectedDifference
// Essentially, if geth ever misses blocks but doesn't output an error, we are covered.
// Worked example used in the comments below:
// last processed: 110, current block: 125, expectedDifference: 1.
func (sds *Service) captureMissedBlocks(currentBlock *big.Int, knownErrorBlocks []*big.Int, lastProcessedBlock *big.Int) {
	// last processed: 110
	// current block: 125
	log.Debug("current block", "block number: ", currentBlock)
	log.Debug("knownErrorBlocks", "knownErrorBlocks: ", knownErrorBlocks)
	log.Debug("last processed block", "block number: ", lastProcessedBlock)
	log.Debug("expected difference", "sds.KnownGaps.expectedDifference: ", sds.KnownGaps.expectedDifference)
	if len(knownErrorBlocks) > 0 {
		// 115
		startErrorBlock := new(big.Int).Set(knownErrorBlocks[0])
		// 120
		endErrorBlock := new(big.Int).Set(knownErrorBlocks[len(knownErrorBlocks)-1])
		// 111
		expectedStartErrorBlock := big.NewInt(0).Add(lastProcessedBlock, sds.KnownGaps.expectedDifference)
		// 124
		expectedEndErrorBlock := big.NewInt(0).Sub(currentBlock, sds.KnownGaps.expectedDifference)
		// BUG FIX: "all gaps already captured" holds when the known error
		// range exactly matches the expected range, i.e. BOTH Cmp results are
		// zero. The previous condition (!= 0 && != 0) was inverted: it logged
		// this message only when neither endpoint matched.
		if (expectedStartErrorBlock.Cmp(startErrorBlock) == 0) &&
			(expectedEndErrorBlock.Cmp(endErrorBlock) == 0) {
			log.Info("All Gaps already captured in knownErrorBlocks")
		}
		// There are missed blocks after the knownErrorBlocks range.
		if expectedEndErrorBlock.Cmp(endErrorBlock) == 1 {
			log.Warn("There are gaps in the knownErrorBlocks list", "knownErrorBlocks", knownErrorBlocks)
			log.Warn("But there are gaps that were also not added there.")
			log.Warn("Last Block in knownErrorBlocks", "endErrorBlock", endErrorBlock)
			log.Warn("Last processed Block", "lastProcessedBlock", lastProcessedBlock)
			log.Warn("Current Block", "currentBlock", currentBlock)
			//120 + 1 == 121
			startBlock := big.NewInt(0).Add(endErrorBlock, sds.KnownGaps.expectedDifference)
			// 121 to 124
			log.Warn(fmt.Sprintf("Adding the following block range to known_gaps table: %d - %d", startBlock, expectedEndErrorBlock))
			sds.indexer.PushKnownGaps(startBlock, expectedEndErrorBlock, false, sds.KnownGaps.processingKey, sds.KnownGaps.fileIndexer)
		}
		// There are missed blocks before the knownErrorBlocks range.
		if expectedStartErrorBlock.Cmp(startErrorBlock) == -1 {
			log.Warn("There are gaps in the knownErrorBlocks list", "knownErrorBlocks", knownErrorBlocks)
			log.Warn("But there are gaps that were also not added there.")
			log.Warn("First Block in knownErrorBlocks", "startErrorBlock", startErrorBlock)
			log.Warn("Last processed Block", "lastProcessedBlock", lastProcessedBlock)
			// 115 - 1 == 114
			endBlock := big.NewInt(0).Sub(startErrorBlock, sds.KnownGaps.expectedDifference)
			// 111 to 114
			log.Warn(fmt.Sprintf("Adding the following block range to known_gaps table: %d - %d", expectedStartErrorBlock, endBlock))
			sds.indexer.PushKnownGaps(expectedStartErrorBlock, endBlock, false, sds.KnownGaps.processingKey, sds.KnownGaps.fileIndexer)
		}
		// Always record the knownErrorBlocks range itself.
		// NOTE(review): errors returned by PushKnownGaps are ignored here —
		// consider at least logging them.
		log.Warn("The following Gaps were found", "knownErrorBlocks", knownErrorBlocks)
		log.Warn(fmt.Sprint("Updating known Gaps table from ", startErrorBlock, " to ", endErrorBlock, " with processing key, ", sds.KnownGaps.processingKey))
		sds.indexer.PushKnownGaps(startErrorBlock, endErrorBlock, false, sds.KnownGaps.processingKey, sds.KnownGaps.fileIndexer)
	} else {
		// No known error blocks: the whole span between lastProcessedBlock
		// and currentBlock (exclusive) was missed silently.
		log.Warn("We missed blocks without any errors.")
		// 110 + 1 == 111
		startBlock := big.NewInt(0).Add(lastProcessedBlock, sds.KnownGaps.expectedDifference)
		// 125 - 1 == 124
		endBlock := big.NewInt(0).Sub(currentBlock, sds.KnownGaps.expectedDifference)
		log.Warn("Missed blocks starting from", "startBlock", startBlock)
		log.Warn("Missed blocks ending at", "endBlock", endBlock)
		sds.indexer.PushKnownGaps(startBlock, endBlock, false, sds.KnownGaps.processingKey, sds.KnownGaps.fileIndexer)
	}
}
func (sds *Service) writeLoopWorker(params workerParams) {
defer params.wg.Done()
for {

View File

@ -4,8 +4,10 @@ import (
"context"
"errors"
"fmt"
"io/ioutil"
"math/big"
"os"
"strings"
"testing"
"github.com/ethereum/go-ethereum/params"
@ -24,18 +26,19 @@ var (
chainConf = params.MainnetChainConfig
)
// gapValues describes one known-gaps test case. Block numbers are int64 and
// converted to *big.Int by the test helpers.
type gapValues struct {
	// lastProcessedBlock is the most recent block processed before the gap.
	lastProcessedBlock int64
	// currentBlock is the block being processed when the gap is detected.
	currentBlock int64
	// knownErrorBlocksStart/End bound the inclusive range of blocks placed
	// into the knownErrorBlocks list; both zero means no known error blocks.
	knownErrorBlocksStart int64
	knownErrorBlocksEnd int64
	// expectedDif is the expected difference between consecutive blocks.
	expectedDif int64
	// processingKey is passed through to the known_gaps upsert.
	processingKey int64
}
// Add clean db
// Test for failures when they are expected, when we go from smaller block to larger block
// We should no longer see the smaller block in DB
func TestKnownGaps(t *testing.T) {
type gapValues struct {
lastProcessedBlock int64
currentBlock int64
knownErrorBlocksStart int64
knownErrorBlocksEnd int64
expectedDif int64
processingKey int64
}
tests := []gapValues{
// Unprocessed gaps before and after knownErrorBlock
@ -61,64 +64,132 @@ func TestKnownGaps(t *testing.T) {
{lastProcessedBlock: 2000, knownErrorBlocksStart: 2002, knownErrorBlocksEnd: 2050, currentBlock: 2100, expectedDif: 2, processingKey: 2},
// Test update when block number is larger!!
{lastProcessedBlock: 2000, knownErrorBlocksStart: 2002, knownErrorBlocksEnd: 2052, currentBlock: 2100, expectedDif: 2, processingKey: 2},
// Update when processing key is different!
{lastProcessedBlock: 2000, knownErrorBlocksStart: 2002, knownErrorBlocksEnd: 2052, currentBlock: 2100, expectedDif: 2, processingKey: 10},
}
for _, tc := range tests {
// Reuse processing key from expectedDif
testCaptureMissedBlocks(t, tc.lastProcessedBlock, tc.currentBlock, tc.knownErrorBlocksStart, tc.knownErrorBlocksEnd, tc.expectedDif, tc.expectedDif, false)
testWriteToDb(t, tests, true)
testWriteToFile(t, tests, true)
}
// testWriteToDb upserts known gaps for every test case directly to the DB and
// validates the resulting rows.
func testWriteToDb(t *testing.T, tests []gapValues, wipeDbBeforeStart bool) {
	stateDiff, db, err := setupDb(t)
	require.NoError(t, err)
	// Clear Table first, this is needed because we updated an entry to have a
	// larger endblock number so we can't find the original start and endblock pair.
	if wipeDbBeforeStart {
		// BUG FIX: the delete's error was silently dropped; a failed wipe
		// would make later validation failures misleading.
		_, err = db.Exec(context.Background(), "DELETE FROM eth.known_gaps")
		require.NoError(t, err)
	}
	for _, tc := range tests {
		// Create an array with knownGaps based on user inputs.
		checkGaps := tc.knownErrorBlocksStart != 0 && tc.knownErrorBlocksEnd != 0
		knownErrorBlocks := make([]*big.Int, 0)
		if checkGaps {
			knownErrorBlocks = createKnownErrorBlocks(knownErrorBlocks, tc.knownErrorBlocksStart, tc.knownErrorBlocksEnd)
		}
		// Upsert
		testCaptureMissedBlocks(t, tc.lastProcessedBlock, tc.currentBlock, tc.knownErrorBlocksStart, tc.knownErrorBlocksEnd,
			tc.expectedDif, tc.processingKey, stateDiff, knownErrorBlocks, nil)
		// Validate that the upsert was done correctly.
		callValidateUpsert(t, checkGaps, stateDiff, tc.lastProcessedBlock, tc.currentBlock, tc.expectedDif, tc.knownErrorBlocksStart, tc.knownErrorBlocksEnd)
	}
	tearDown(t, stateDiff)
}
// testWriteToFile writes known gaps to an SQL file via the file indexer and
// then replays that file into the DB, validating the rows round-trip.
func testWriteToFile(t *testing.T, tests []gapValues, wipeDbBeforeStart bool) {
	stateDiff, db, err := setupDb(t)
	require.NoError(t, err)
	// Clear Table first, this is needed because we updated an entry to have a
	// larger endblock number so we can't find the original start and endblock pair.
	if wipeDbBeforeStart {
		// BUG FIX: the delete's error was silently dropped.
		_, err = db.Exec(context.Background(), "DELETE FROM eth.known_gaps")
		require.NoError(t, err)
	}
	// NOTE(review): stateDiff is torn down here but still passed to
	// testCaptureMissedBlocks inside the loop below — confirm this ordering
	// is intentional.
	tearDown(t, stateDiff)
	for _, tc := range tests {
		checkGaps := tc.knownErrorBlocksStart != 0 && tc.knownErrorBlocksEnd != 0
		knownErrorBlocks := make([]*big.Int, 0)
		if checkGaps {
			knownErrorBlocks = createKnownErrorBlocks(knownErrorBlocks, tc.knownErrorBlocksStart, tc.knownErrorBlocksEnd)
		}
		fileInd := setupFile(t)
		testCaptureMissedBlocks(t, tc.lastProcessedBlock, tc.currentBlock, tc.knownErrorBlocksStart, tc.knownErrorBlocksEnd,
			tc.expectedDif, tc.processingKey, stateDiff, knownErrorBlocks, fileInd)
		fileInd.Close()
		// BUG FIX: the setupDb error was discarded with _.
		newStateDiff, db, err := setupDb(t)
		require.NoError(t, err)
		// Renamed the local from "file" so it no longer shadows the file
		// package referenced on the same line.
		sqlContents, ioErr := ioutil.ReadFile(file.TestConfig.FilePath)
		require.NoError(t, ioErr)
		requests := strings.Split(string(sqlContents), ";")
		// Skip the first two entries
		for _, request := range requests[2:] {
			_, err := db.Exec(context.Background(), request)
			require.NoError(t, err)
		}
		callValidateUpsert(t, checkGaps, newStateDiff, tc.lastProcessedBlock, tc.currentBlock, tc.expectedDif, tc.knownErrorBlocksStart, tc.knownErrorBlocksEnd)
	}
}
// It also makes sure we properly calculate any missed gaps not in the known gaps lists
// either before or after the list.
func testCaptureMissedBlocks(t *testing.T, lastBlockProcessed int64, currentBlockNum int64, knownErrorBlocksStart int64, knownErrorBlocksEnd int64, expectedDif int64, processingKey int64, skipDb bool) {
// test capturing missed blocks
func testCaptureMissedBlocks(t *testing.T, lastBlockProcessed int64, currentBlockNum int64, knownErrorBlocksStart int64, knownErrorBlocksEnd int64, expectedDif int64, processingKey int64,
stateDiff *sql.StateDiffIndexer, knownErrorBlocks []*big.Int, fileInd interfaces.StateDiffIndexer) {
lastProcessedBlock := big.NewInt(lastBlockProcessed)
currentBlock := big.NewInt(currentBlockNum)
knownErrorBlocks := (make([]*big.Int, 0))
checkGaps := knownErrorBlocksStart != 0 && knownErrorBlocksEnd != 0
if checkGaps {
for i := knownErrorBlocksStart; i <= knownErrorBlocksEnd; i++ {
knownErrorBlocks = append(knownErrorBlocks, big.NewInt(i))
}
}
// Comment out values which should not be used
fileInd := setupFile(t)
knownGaps := KnownGapsState{
processingKey: processingKey,
expectedDifference: big.NewInt(expectedDif),
fileIndexer: fileInd,
}
stateDiff, err := setupDb(t)
if err != nil {
t.Fatal(err)
}
service := &Service{
KnownGaps: knownGaps,
indexer: stateDiff,
}
service.capturedMissedBlocks(currentBlock, knownErrorBlocks, lastProcessedBlock)
service.captureMissedBlocks(currentBlock, knownErrorBlocks, lastProcessedBlock)
}
// Helper function to create an array of gaps given a start and end block
func createKnownErrorBlocks(knownErrorBlocks []*big.Int, knownErrorBlocksStart int64, knownErrorBlocksEnd int64) []*big.Int {
for i := knownErrorBlocksStart; i <= knownErrorBlocksEnd; i++ {
knownErrorBlocks = append(knownErrorBlocks, big.NewInt(i))
}
return knownErrorBlocks
}
// This function will call the validateUpsert function based on various conditions.
func callValidateUpsert(t *testing.T, checkGaps bool, stateDiff *sql.StateDiffIndexer,
lastBlockProcessed int64, currentBlockNum int64, expectedDif int64, knownErrorBlocksStart int64, knownErrorBlocksEnd int64) {
// If there are gaps in knownErrorBlocks array
if checkGaps {
// If there are no unexpected gaps before or after the entries in the knownErrorBlocks array
// Only handle the knownErrorBlocks Array
if lastBlockProcessed+expectedDif == knownErrorBlocksStart && knownErrorBlocksEnd+expectedDif == currentBlockNum {
validateUpsert(t, stateDiff, knownErrorBlocksStart, knownErrorBlocksEnd)
// If there are gaps after knownErrorBlocks array, process them
} else if lastBlockProcessed+expectedDif == knownErrorBlocksStart {
validateUpsert(t, stateDiff, knownErrorBlocksStart, knownErrorBlocksEnd)
validateUpsert(t, stateDiff, knownErrorBlocksEnd+expectedDif, currentBlockNum-expectedDif)
// If there are gaps before knownErrorBlocks array, process them
} else if knownErrorBlocksEnd+expectedDif == currentBlockNum {
validateUpsert(t, stateDiff, lastBlockProcessed+expectedDif, knownErrorBlocksStart-expectedDif)
validateUpsert(t, stateDiff, knownErrorBlocksStart, knownErrorBlocksEnd)
// if there are gaps before, after, and within the knownErrorBlocks array,handle all the errors.
} else {
validateUpsert(t, stateDiff, lastBlockProcessed+expectedDif, knownErrorBlocksStart-expectedDif)
validateUpsert(t, stateDiff, knownErrorBlocksStart, knownErrorBlocksEnd)
@ -129,9 +200,9 @@ func testCaptureMissedBlocks(t *testing.T, lastBlockProcessed int64, currentBloc
validateUpsert(t, stateDiff, lastBlockProcessed+expectedDif, currentBlockNum-expectedDif)
}
tearDown(t, stateDiff)
}
// Make sure the upsert was performed correctly
func validateUpsert(t *testing.T, stateDiff *sql.StateDiffIndexer, startingBlock int64, endingBlock int64) {
t.Logf("Starting to query blocks: %d - %d", startingBlock, endingBlock)
queryString := fmt.Sprintf("SELECT starting_block_number from eth.known_gaps WHERE starting_block_number = %d AND ending_block_number = %d", startingBlock, endingBlock)
@ -142,15 +213,16 @@ func validateUpsert(t *testing.T, stateDiff *sql.StateDiffIndexer, startingBlock
}
// Create a DB object to use
func setupDb(t *testing.T) (*sql.StateDiffIndexer, error) {
func setupDb(t *testing.T) (*sql.StateDiffIndexer, sql.Database, error) {
db, err = postgres.SetupSQLXDB()
if err != nil {
t.Fatal(err)
}
stateDiff, err := sql.NewStateDiffIndexer(context.Background(), chainConf, db)
return stateDiff, err
return stateDiff, db, err
}
// Create a file statediff indexer.
func setupFile(t *testing.T) interfaces.StateDiffIndexer {
if _, err := os.Stat(file.TestConfig.FilePath); !errors.Is(err, os.ErrNotExist) {
err := os.Remove(file.TestConfig.FilePath)