Write to file if you can't write to DB

This commit contains several new features:

1. Write the insert statement to a file if an error occurs while writing to the DB.
2. Unit tests covering the write-to-DB and write-to-file features; they work as expected.
3. Logic for calling the knownGaps function (untested).

This PR makes a major change to the way statediff is started: unless it is started in `file` mode, it will also create a `file` indexer, which is used for writing errors to disk.
This commit is contained in:
parent f566aa780c
commit fc341a90fc
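The core behaviour added here, falling back to an on-disk SQL file when the database write fails, can be sketched roughly as follows. This is a minimal, hypothetical illustration rather than code from this commit: the helper name writeOrFallback, its signature, and the direct use of database/sql are assumptions made for the sketch; the actual change routes failed writes through a secondary `file` indexer, as the diffs below show.

package fallback

import (
	"database/sql"
	"fmt"
	"log"
	"os"
)

// writeOrFallback executes a fully-rendered SQL statement against the DB.
// If the DB write fails, the statement is appended to fallbackPath so it can
// be replayed against the database later.
func writeOrFallback(db *sql.DB, fallbackPath, stmt string) error {
	_, execErr := db.Exec(stmt)
	if execErr == nil {
		return nil
	}
	log.Printf("DB write failed, writing statement to file instead: %v", execErr)

	// Append the statement to the fallback SQL file, creating it if needed.
	f, err := os.OpenFile(fallbackPath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0o644)
	if err != nil {
		return fmt.Errorf("could not open fallback file: %w", err)
	}
	defer f.Close()

	if _, err := fmt.Fprintf(f, "%s;\n", stmt); err != nil {
		return fmt.Errorf("could not write fallback statement: %w", err)
	}
	return nil
}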
.gitignore (vendored): 4
@@ -53,4 +53,6 @@ foundry/projects/local-private-network/geth-linux-amd64
 # Helpful repos
 related-repositories/foundry-test/**
 related-repositories/hive/**
 related-repositories/ipld-eth-db/**
+statediff/indexer/database/sql/statediffing_test_file.sql
+statediff/statediffing_test_file.sql
@@ -177,6 +177,7 @@ func makeFullNode(ctx *cli.Context) (*node.Node, ethapi.Backend) {
 	if ctx.GlobalBool(utils.StateDiffFlag.Name) {
 		var indexerConfig interfaces.Config
+		var fileConfig interfaces.Config
 		var clientName, nodeID string
 		if ctx.GlobalIsSet(utils.StateDiffWritingFlag.Name) {
 			clientName = ctx.GlobalString(utils.StateDiffDBClientNameFlag.Name)
@@ -191,6 +192,15 @@ func makeFullNode(ctx *cli.Context) (*node.Node, ethapi.Backend) {
 			if err != nil {
 				utils.Fatalf("%v", err)
 			}
+
+			if dbType != shared.FILE {
+				fileConfig = file.Config{
+					FilePath: ctx.GlobalString(utils.StateDiffFilePath.Name),
+				}
+			} else {
+				fileConfig = nil
+			}
+
 			switch dbType {
 			case shared.FILE:
 				indexerConfig = file.Config{
@@ -253,6 +263,7 @@ func makeFullNode(ctx *cli.Context) (*node.Node, ethapi.Backend) {
 		}
 		p := statediff.Config{
 			IndexerConfig: indexerConfig,
+			FileConfig:    fileConfig,
 			ID:            nodeID,
 			ClientName:    clientName,
 			Context:       context.Background(),
@@ -26,7 +26,10 @@ import (
 
 // Config contains instantiation parameters for the state diffing service
 type Config struct {
+	// The configuration used for the primary stateDiff Indexer
 	IndexerConfig interfaces.Config
+	// The configuration used for the file stateDiff Indexer. This indexer is used if the primary indexer fails and is not a FILE indexer.
+	FileConfig interfaces.Config
 	// A unique ID used for this service
 	ID string
 	// Name for the client this service is running
@@ -35,11 +35,12 @@ import (
 // You can specify the specific
 func NewStateDiffIndexer(ctx context.Context, chainConfig *params.ChainConfig, nodeInfo node.Info, config interfaces.Config, specificIndexer shared.DBType) (interfaces.StateDiffIndexer, error) {
 	var indexerToCreate shared.DBType
-	if specificIndexer != "" {
+	if specificIndexer == "" {
 		indexerToCreate = config.Type()
 	} else {
 		indexerToCreate = specificIndexer
 	}
+	log.Info("Indexer to create is", "indexer", indexerToCreate)
 	switch indexerToCreate {
 	case shared.FILE:
 		log.Info("Creating a statediff indexer in SQL file writing mode")
@@ -497,9 +497,26 @@ func (sdi *StateDiffIndexer) Close() error {
 	return sdi.dump.Close()
 }
 
+// Only needed to satisfy use cases
 func (sdi *StateDiffIndexer) FindAndUpdateGaps(latestBlockOnChain *big.Int, expectedDifference *big.Int, processingKey int64, index interfaces.StateDiffIndexer) error {
-	return nil
+	log.Error("We can't find gaps in write mode!")
+	return fmt.Errorf("We can't find gaps in write mode!")
 }
 
+// Written but not tested. Unsure if there is a real use case for this anywhere.
 func (sdi *StateDiffIndexer) PushKnownGaps(startingBlockNumber *big.Int, endingBlockNumber *big.Int, checkedOut bool, processingKey int64, index interfaces.StateDiffIndexer) error {
+	log.Info("Dumping known gaps")
+	if startingBlockNumber.Cmp(endingBlockNumber) != -1 {
+		return fmt.Errorf("Starting Block %d, is greater than ending block %d", startingBlockNumber, endingBlockNumber)
+	}
+	knownGap := models.KnownGapsModel{
+		StartingBlockNumber: startingBlockNumber.String(),
+		EndingBlockNumber:   endingBlockNumber.String(),
+		CheckedOut:          checkedOut,
+		ProcessingKey:       processingKey,
+	}
+	if _, err := fmt.Fprintf(sdi.dump, "%+v\r\n", knownGap); err != nil {
+		return err
+	}
 	return nil
 }
@@ -480,7 +480,8 @@ func (sdi *StateDiffIndexer) Close() error {
 }
 
 func (sdi *StateDiffIndexer) FindAndUpdateGaps(latestBlockOnChain *big.Int, expectedDifference *big.Int, processingKey int64, indexer interfaces.StateDiffIndexer) error {
-	return nil
+	log.Error("We can't find gaps in write mode!")
+	return fmt.Errorf("We can't find gaps in write mode!")
 }
 
 func (sdi *StateDiffIndexer) PushKnownGaps(startingBlockNumber *big.Int, endingBlockNumber *big.Int, checkedOut bool, processingKey int64, indexer interfaces.StateDiffIndexer) error {
@@ -263,7 +263,7 @@ func (sqw *SQLWriter) upsertStorageCID(storageCID models.StorageNodeModel) {
 }
 
 func (sqw *SQLWriter) upsertKnownGaps(knownGaps models.KnownGapsModel) {
-	sqw.stmts <- []byte(fmt.Sprintf(knownGaps.StartingBlockNumber, knownGaps.EndingBlockNumber, knownGaps.CheckedOut, knownGaps.ProcessingKey,
+	sqw.stmts <- []byte(fmt.Sprintf(knownGapsInsert, knownGaps.StartingBlockNumber, knownGaps.EndingBlockNumber, knownGaps.CheckedOut, knownGaps.ProcessingKey,
 		knownGaps.EndingBlockNumber, knownGaps.ProcessingKey, knownGaps.EndingBlockNumber))
 	//knownGapsInsert = "INSERT INTO eth.known_gaps (starting_block_number, ending_block_number, checked_out, processing_key) " +
 	//	"VALUES ('%s', '%s', %t, %d) " +
@@ -607,7 +607,8 @@ func (sdi *StateDiffIndexer) QueryDbToBigInt(queryString string) (*big.Int, erro
 // More geth nodes, the expected difference might fluctuate.
 func isGap(latestBlockInDb *big.Int, latestBlockOnChain *big.Int, expectedDifference *big.Int) bool {
 	latestBlock := big.NewInt(0)
-	if latestBlock.Add(latestBlockOnChain, expectedDifference) != latestBlockInDb {
+	if latestBlock.Sub(latestBlockOnChain, expectedDifference).Cmp(latestBlockInDb) != 0 {
+		log.Warn("We found a gap", "latestBlockInDb", latestBlockInDb, "latestBlockOnChain", latestBlockOnChain, "expectedDifference", expectedDifference)
 		return true
 	}
 	return false
@@ -634,9 +635,10 @@ func (sdi *StateDiffIndexer) FindAndUpdateGaps(latestBlockOnChain *big.Int, expe
 	startBlock.Add(latestBlockInDb, expectedDifference)
 	endBlock.Sub(latestBlockOnChain, expectedDifference)
 
-	log.Warn(fmt.Sprint("Found Gaps starting at, ", startBlock, " and ending at, ", endBlock))
+	log.Warn("Found Gaps starting at", "startBlock", startBlock, "endingBlock", endBlock)
 	err := sdi.PushKnownGaps(startBlock, endBlock, false, processingKey, fileIndexer)
 	if err != nil {
+		log.Error("We were unable to write the following gap to the DB", "start Block", startBlock, "endBlock", endBlock, "error", err)
 		// Write to file SQL file instead!!!
 		// If write to SQL file fails, write to disk. Handle this within the write to SQL file function!
 		return err
@@ -164,70 +164,6 @@ type KnownGapsState struct {
 	fileIndexer interfaces.StateDiffIndexer
 }
 
-// This function will capture any missed blocks that were not captured in sds.KnownGaps.knownErrorBlocks.
-// It is invoked when the sds.KnownGaps.lastProcessed block is not one unit
-// away from sds.KnownGaps.expectedDifference
-// Essentially, if geth ever misses blocks but doesn't output an error, we are covered.
-func (sds *Service) capturedMissedBlocks(currentBlock *big.Int, knownErrorBlocks []*big.Int, lastProcessedBlock *big.Int) {
-	// last processed: 110
-	// current block: 125
-	if len(knownErrorBlocks) > 0 {
-		// 115
-		startErrorBlock := new(big.Int).Set(knownErrorBlocks[0])
-		// 120
-		endErrorBlock := new(big.Int).Set(knownErrorBlocks[len(knownErrorBlocks)-1])
-
-		// 111
-		expectedStartErrorBlock := big.NewInt(0).Add(lastProcessedBlock, sds.KnownGaps.expectedDifference)
-		// 124
-		expectedEndErrorBlock := big.NewInt(0).Sub(currentBlock, sds.KnownGaps.expectedDifference)
-
-		if (expectedStartErrorBlock == startErrorBlock) &&
-			(expectedEndErrorBlock == endErrorBlock) {
-			log.Info("All Gaps already captured in knownErrorBlocks")
-		}
-
-		if expectedEndErrorBlock.Cmp(endErrorBlock) == 1 {
-			log.Warn(fmt.Sprint("There are gaps in the knownErrorBlocks list: ", knownErrorBlocks))
-			log.Warn("But there are gaps that were also not added there.")
-			log.Warn(fmt.Sprint("Last Block in knownErrorBlocks: ", endErrorBlock))
-			log.Warn(fmt.Sprint("Last processed Block: ", lastProcessedBlock))
-			log.Warn(fmt.Sprint("Current Block: ", currentBlock))
-			//120 + 1 == 121
-			startBlock := big.NewInt(0).Add(endErrorBlock, sds.KnownGaps.expectedDifference)
-			// 121 to 124
-			log.Warn(fmt.Sprintf("Adding the following block range to known_gaps table: %d - %d", startBlock, expectedEndErrorBlock))
-			sds.indexer.PushKnownGaps(startBlock, expectedEndErrorBlock, false, sds.KnownGaps.processingKey, sds.KnownGaps.fileIndexer)
-		}
-
-		if expectedStartErrorBlock.Cmp(startErrorBlock) == -1 {
-			log.Warn(fmt.Sprint("There are gaps in the knownErrorBlocks list: ", knownErrorBlocks))
-			log.Warn("But there are gaps that were also not added there.")
-			log.Warn(fmt.Sprint("First Block in knownErrorBlocks: ", startErrorBlock))
-			log.Warn(fmt.Sprint("Last processed Block: ", lastProcessedBlock))
-			// 115 - 1 == 114
-			endBlock := big.NewInt(0).Sub(startErrorBlock, sds.KnownGaps.expectedDifference)
-			// 111 to 114
-			log.Warn(fmt.Sprintf("Adding the following block range to known_gaps table: %d - %d", expectedStartErrorBlock, endBlock))
-			sds.indexer.PushKnownGaps(expectedStartErrorBlock, endBlock, false, sds.KnownGaps.processingKey, sds.KnownGaps.fileIndexer)
-		}
-
-		log.Warn(fmt.Sprint("The following Gaps were found: ", knownErrorBlocks))
-		log.Warn(fmt.Sprint("Updating known Gaps table from ", startErrorBlock, " to ", endErrorBlock, " with processing key, ", sds.KnownGaps.processingKey))
-		sds.indexer.PushKnownGaps(startErrorBlock, endErrorBlock, false, sds.KnownGaps.processingKey, sds.KnownGaps.fileIndexer)
-
-	} else {
-		log.Warn("We missed blocks without any errors.")
-		// 110 + 1 == 111
-		startBlock := big.NewInt(0).Add(lastProcessedBlock, sds.KnownGaps.expectedDifference)
-		// 125 - 1 == 124
-		endBlock := big.NewInt(0).Sub(currentBlock, sds.KnownGaps.expectedDifference)
-		log.Warn(fmt.Sprint("Missed blocks starting from: ", startBlock))
-		log.Warn(fmt.Sprint("Missed blocks ending at: ", endBlock))
-		sds.indexer.PushKnownGaps(startBlock, endBlock, false, sds.KnownGaps.processingKey, sds.KnownGaps.fileIndexer)
-	}
-}
-
 // BlockCache caches the last block for safe access from different service loops
 type BlockCache struct {
 	sync.Mutex
@@ -260,23 +196,29 @@ func New(stack *node.Node, ethServ *eth.Ethereum, cfg *ethconfig.Config, params
 		var err error
 		indexer, err = ind.NewStateDiffIndexer(params.Context, blockChain.Config(), info, params.IndexerConfig, "")
 		if err != nil {
+			log.Error("Error creating indexer", "indexer: ", params.IndexerConfig.Type(), "error: ", err)
 			return err
 		}
-		if params.IndexerConfig.Type() != shared.FILE {
-			fileIndexer, err = ind.NewStateDiffIndexer(params.Context, blockChain.Config(), info, params.IndexerConfig, "")
-			log.Info("Starting the statediff service in ", "mode", params.IndexerConfig.Type())
+		if params.FileConfig != nil {
+			fileIndexer, err = ind.NewStateDiffIndexer(params.Context, blockChain.Config(), info, params.FileConfig, shared.FILE)
 			if err != nil {
+				log.Error("Error creating file indexer", "error: ", err)
 				return err
 			}
 
 		} else {
-			log.Info("Starting the statediff service in ", "mode", "File")
 			fileIndexer = indexer
 		}
 		//fileIndexer, fileErr = file.NewStateDiffIndexer(params.Context, blockChain.Config(), info)
 		indexer.ReportDBMetrics(10*time.Second, quitCh)
 	}
 
+	var checkForGaps bool
+	if params.IndexerConfig.Type() == shared.POSTGRES {
+		checkForGaps = true
+	} else {
+		log.Info("We are not going to check for gaps on start up since we are not connected to Postgres!")
+		checkForGaps = false
+	}
 	workers := params.NumWorkers
 	if workers == 0 {
 		workers = 1
@@ -284,7 +226,7 @@ func New(stack *node.Node, ethServ *eth.Ethereum, cfg *ethconfig.Config, params
 	// If we ever have multiple processingKeys we can update them here
 	// along with the expectedDifference
 	knownGaps := &KnownGapsState{
-		checkForGaps:       true,
+		checkForGaps:       checkForGaps,
 		processingKey:      0,
 		expectedDifference: big.NewInt(1),
 		errorState:         false,
@@ -406,6 +348,75 @@ func (sds *Service) writeGenesisStateDiff(currBlock *types.Block, workerId uint)
 		statediffMetrics.lastStatediffHeight.Update(genesisBlockNumber)
 	}
 }
 
+// This function will capture any missed blocks that were not captured in sds.KnownGaps.knownErrorBlocks.
+// It is invoked when the sds.KnownGaps.lastProcessed block is not one unit
+// away from sds.KnownGaps.expectedDifference
+// Essentially, if geth ever misses blocks but doesn't output an error, we are covered.
+func (sds *Service) captureMissedBlocks(currentBlock *big.Int, knownErrorBlocks []*big.Int, lastProcessedBlock *big.Int) {
+	// last processed: 110
+	// current block: 125
+	log.Debug("current block", "block number: ", currentBlock)
+	log.Debug("knownErrorBlocks", "knownErrorBlocks: ", knownErrorBlocks)
+	log.Debug("last processed block", "block number: ", lastProcessedBlock)
+	log.Debug("expected difference", "sds.KnownGaps.expectedDifference: ", sds.KnownGaps.expectedDifference)
+
+	if len(knownErrorBlocks) > 0 {
+		// 115
+		startErrorBlock := new(big.Int).Set(knownErrorBlocks[0])
+		// 120
+		endErrorBlock := new(big.Int).Set(knownErrorBlocks[len(knownErrorBlocks)-1])
+
+		// 111
+		expectedStartErrorBlock := big.NewInt(0).Add(lastProcessedBlock, sds.KnownGaps.expectedDifference)
+		// 124
+		expectedEndErrorBlock := big.NewInt(0).Sub(currentBlock, sds.KnownGaps.expectedDifference)
+
+		if (expectedStartErrorBlock.Cmp(startErrorBlock) != 0) &&
+			(expectedEndErrorBlock.Cmp(endErrorBlock) != 0) {
+			log.Info("All Gaps already captured in knownErrorBlocks")
+		}
+
+		if expectedEndErrorBlock.Cmp(endErrorBlock) == 1 {
+			log.Warn("There are gaps in the knownErrorBlocks list", "knownErrorBlocks", knownErrorBlocks)
+			log.Warn("But there are gaps that were also not added there.")
+			log.Warn("Last Block in knownErrorBlocks", "endErrorBlock", endErrorBlock)
+			log.Warn("Last processed Block", "lastProcessedBlock", lastProcessedBlock)
+			log.Warn("Current Block", "currentBlock", currentBlock)
+			//120 + 1 == 121
+			startBlock := big.NewInt(0).Add(endErrorBlock, sds.KnownGaps.expectedDifference)
+			// 121 to 124
+			log.Warn(fmt.Sprintf("Adding the following block range to known_gaps table: %d - %d", startBlock, expectedEndErrorBlock))
+			sds.indexer.PushKnownGaps(startBlock, expectedEndErrorBlock, false, sds.KnownGaps.processingKey, sds.KnownGaps.fileIndexer)
+		}
+
+		if expectedStartErrorBlock.Cmp(startErrorBlock) == -1 {
+			log.Warn("There are gaps in the knownErrorBlocks list", "knownErrorBlocks", knownErrorBlocks)
+			log.Warn("But there are gaps that were also not added there.")
+			log.Warn("First Block in knownErrorBlocks", "startErrorBlock", startErrorBlock)
+			log.Warn("Last processed Block", "lastProcessedBlock", lastProcessedBlock)
+			// 115 - 1 == 114
+			endBlock := big.NewInt(0).Sub(startErrorBlock, sds.KnownGaps.expectedDifference)
+			// 111 to 114
+			log.Warn(fmt.Sprintf("Adding the following block range to known_gaps table: %d - %d", expectedStartErrorBlock, endBlock))
+			sds.indexer.PushKnownGaps(expectedStartErrorBlock, endBlock, false, sds.KnownGaps.processingKey, sds.KnownGaps.fileIndexer)
+		}
+
+		log.Warn("The following Gaps were found", "knownErrorBlocks", knownErrorBlocks)
+		log.Warn(fmt.Sprint("Updating known Gaps table from ", startErrorBlock, " to ", endErrorBlock, " with processing key, ", sds.KnownGaps.processingKey))
+		sds.indexer.PushKnownGaps(startErrorBlock, endErrorBlock, false, sds.KnownGaps.processingKey, sds.KnownGaps.fileIndexer)
+
+	} else {
+		log.Warn("We missed blocks without any errors.")
+		// 110 + 1 == 111
+		startBlock := big.NewInt(0).Add(lastProcessedBlock, sds.KnownGaps.expectedDifference)
+		// 125 - 1 == 124
+		endBlock := big.NewInt(0).Sub(currentBlock, sds.KnownGaps.expectedDifference)
+		log.Warn("Missed blocks starting from", "startBlock", startBlock)
+		log.Warn("Missed blocks ending at", "endBlock", endBlock)
+		sds.indexer.PushKnownGaps(startBlock, endBlock, false, sds.KnownGaps.processingKey, sds.KnownGaps.fileIndexer)
+	}
+}
+
 func (sds *Service) writeLoopWorker(params workerParams) {
 	defer params.wg.Done()
 	for {
@@ -428,7 +439,7 @@ func (sds *Service) writeLoopWorker(params workerParams) {
 			// If for any reason we need to check for gaps,
 			// Check and update the gaps table.
 			if sds.KnownGaps.checkForGaps && !sds.KnownGaps.errorState {
-				log.Info("Checking for Gaps at current block: ", currentBlock.Number())
+				log.Info("Checking for Gaps at", "current block", currentBlock.Number())
 				go sds.indexer.FindAndUpdateGaps(currentBlock.Number(), sds.KnownGaps.expectedDifference, sds.KnownGaps.processingKey, sds.KnownGaps.fileIndexer)
 				sds.KnownGaps.checkForGaps = false
 			}
@@ -439,7 +450,7 @@ func (sds *Service) writeLoopWorker(params workerParams) {
 				log.Error("statediff.Service.WriteLoop: processing error", "block height", currentBlock.Number().Uint64(), "error", err.Error(), "worker", params.id)
 				sds.KnownGaps.errorState = true
 				sds.KnownGaps.knownErrorBlocks = append(sds.KnownGaps.knownErrorBlocks, currentBlock.Number())
-				log.Warn("Updating the following block to knownErrorBlocks to be inserted into knownGaps table: ", currentBlock.Number())
+				log.Warn("Updating the following block to knownErrorBlocks to be inserted into knownGaps table", "block number", currentBlock.Number())
 				// Write object to startdiff
 				continue
 			}
@@ -448,14 +459,22 @@ func (sds *Service) writeLoopWorker(params workerParams) {
 			previousExpectedBlock := big.NewInt(0).Sub(currentBlock.Number(), sds.KnownGaps.expectedDifference)
 			// If we last block which should have been processed is not
 			// the actual lastProcessedBlock, add it to known gaps table.
-			if previousExpectedBlock != sds.KnownGaps.lastProcessedBlock && sds.KnownGaps.lastProcessedBlock != nil {
-				// We must pass in parameters by VALUE not reference.
-				// If we pass them in my reference, the references can change before the computation is complete!
-				staticKnownErrorBlocks := make([]*big.Int, len(sds.KnownGaps.knownErrorBlocks))
-				copy(staticKnownErrorBlocks, sds.KnownGaps.knownErrorBlocks)
-				staticLastProcessedBlock := new(big.Int).Set(sds.KnownGaps.lastProcessedBlock)
-				go sds.capturedMissedBlocks(currentBlock.Number(), staticKnownErrorBlocks, staticLastProcessedBlock)
-				sds.KnownGaps.knownErrorBlocks = nil
+			if sds.KnownGaps.lastProcessedBlock != nil {
+				// Can't combine the two if statements because you can't compare a nil object...
+				if previousExpectedBlock.Cmp(sds.KnownGaps.lastProcessedBlock) != 0 {
+					// We must pass in parameters by VALUE not reference.
+					// If we pass them in my reference, the references can change before the computation is complete!
+					staticKnownErrorBlocks := make([]*big.Int, len(sds.KnownGaps.knownErrorBlocks))
+					copy(staticKnownErrorBlocks, sds.KnownGaps.knownErrorBlocks)
+					staticLastProcessedBlock := new(big.Int).Set(sds.KnownGaps.lastProcessedBlock)
+
+					log.Debug("Starting capturedMissedBlocks")
+					log.Debug("previousExpectedBlock is", "previous expected block: ", previousExpectedBlock)
+					log.Debug("sds.KnownGaps.lastProcessedBlock is", "sds.KnownGaps.lastProcessedBlock: ", sds.KnownGaps.lastProcessedBlock)
+
+					go sds.captureMissedBlocks(currentBlock.Number(), staticKnownErrorBlocks, staticLastProcessedBlock)
+					sds.KnownGaps.knownErrorBlocks = nil
+				}
 			}
 			sds.KnownGaps.lastProcessedBlock = currentBlock.Number()
@@ -4,8 +4,10 @@ import (
 	"context"
 	"errors"
 	"fmt"
+	"io/ioutil"
 	"math/big"
 	"os"
+	"strings"
 	"testing"
 
 	"github.com/ethereum/go-ethereum/params"
@@ -24,18 +26,19 @@ var (
 	chainConf = params.MainnetChainConfig
 )
 
+type gapValues struct {
+	lastProcessedBlock    int64
+	currentBlock          int64
+	knownErrorBlocksStart int64
+	knownErrorBlocksEnd   int64
+	expectedDif           int64
+	processingKey         int64
+}
+
 // Add clean db
 // Test for failures when they are expected, when we go from smaller block to larger block
 // We should no longer see the smaller block in DB
 func TestKnownGaps(t *testing.T) {
-	type gapValues struct {
-		lastProcessedBlock    int64
-		currentBlock          int64
-		knownErrorBlocksStart int64
-		knownErrorBlocksEnd   int64
-		expectedDif           int64
-		processingKey         int64
-	}
-
 	tests := []gapValues{
 		// Unprocessed gaps before and after knownErrorBlock
@@ -61,64 +64,132 @@ func TestKnownGaps(t *testing.T) {
 		{lastProcessedBlock: 2000, knownErrorBlocksStart: 2002, knownErrorBlocksEnd: 2050, currentBlock: 2100, expectedDif: 2, processingKey: 2},
 		// Test update when block number is larger!!
 		{lastProcessedBlock: 2000, knownErrorBlocksStart: 2002, knownErrorBlocksEnd: 2052, currentBlock: 2100, expectedDif: 2, processingKey: 2},
+		// Update when processing key is different!
 		{lastProcessedBlock: 2000, knownErrorBlocksStart: 2002, knownErrorBlocksEnd: 2052, currentBlock: 2100, expectedDif: 2, processingKey: 10},
 	}
-	for _, tc := range tests {
-		// Reuse processing key from expecteDiff
-		testCaptureMissedBlocks(t, tc.lastProcessedBlock, tc.currentBlock, tc.knownErrorBlocksStart, tc.knownErrorBlocksEnd, tc.expectedDif, tc.expectedDif, false)
+	testWriteToDb(t, tests, true)
+	testWriteToFile(t, tests, true)
+}
+
+// test writing blocks to the DB
+func testWriteToDb(t *testing.T, tests []gapValues, wipeDbBeforeStart bool) {
+	stateDiff, db, err := setupDb(t)
+	require.NoError(t, err)
+
+	// Clear Table first, this is needed because we updated an entry to have a larger endblock number
+	// so we can't find the original start and endblock pair.
+	if wipeDbBeforeStart {
+		db.Exec(context.Background(), "DELETE FROM eth.known_gaps")
 	}
+
+	for _, tc := range tests {
+		// Create an array with knownGaps based on user inputs
+		checkGaps := tc.knownErrorBlocksStart != 0 && tc.knownErrorBlocksEnd != 0
+		knownErrorBlocks := (make([]*big.Int, 0))
+		if checkGaps {
+			knownErrorBlocks = createKnownErrorBlocks(knownErrorBlocks, tc.knownErrorBlocksStart, tc.knownErrorBlocksEnd)
+		}
+		// Upsert
+		testCaptureMissedBlocks(t, tc.lastProcessedBlock, tc.currentBlock, tc.knownErrorBlocksStart, tc.knownErrorBlocksEnd,
+			tc.expectedDif, tc.processingKey, stateDiff, knownErrorBlocks, nil)
+		// Validate that the upsert was done correctly.
+		callValidateUpsert(t, checkGaps, stateDiff, tc.lastProcessedBlock, tc.currentBlock, tc.expectedDif, tc.knownErrorBlocksStart, tc.knownErrorBlocksEnd)
+	}
+	tearDown(t, stateDiff)
+}
+
+// test writing blocks to file and then inserting them to DB
+func testWriteToFile(t *testing.T, tests []gapValues, wipeDbBeforeStart bool) {
+	stateDiff, db, err := setupDb(t)
+	require.NoError(t, err)
+
+	// Clear Table first, this is needed because we updated an entry to have a larger endblock number
+	// so we can't find the original start and endblock pair.
+	if wipeDbBeforeStart {
+		db.Exec(context.Background(), "DELETE FROM eth.known_gaps")
+	}
+	tearDown(t, stateDiff)
 	for _, tc := range tests {
 		// Reuse processing key from expecteDiff
-		testCaptureMissedBlocks(t, tc.lastProcessedBlock, tc.currentBlock, tc.knownErrorBlocksStart, tc.knownErrorBlocksEnd, tc.expectedDif, tc.expectedDif, true)
+		checkGaps := tc.knownErrorBlocksStart != 0 && tc.knownErrorBlocksEnd != 0
+		knownErrorBlocks := (make([]*big.Int, 0))
+		if checkGaps {
+			knownErrorBlocks = createKnownErrorBlocks(knownErrorBlocks, tc.knownErrorBlocksStart, tc.knownErrorBlocksEnd)
+		}
+
+		fileInd := setupFile(t)
+		testCaptureMissedBlocks(t, tc.lastProcessedBlock, tc.currentBlock, tc.knownErrorBlocksStart, tc.knownErrorBlocksEnd,
+			tc.expectedDif, tc.processingKey, stateDiff, knownErrorBlocks, fileInd)
+		fileInd.Close()
+
+		newStateDiff, db, _ := setupDb(t)
+
+		file, ioErr := ioutil.ReadFile(file.TestConfig.FilePath)
+		require.NoError(t, ioErr)
+
+		requests := strings.Split(string(file), ";")
+
+		// Skip the first two enteries
+		for _, request := range requests[2:] {
+			_, err := db.Exec(context.Background(), request)
+			require.NoError(t, err)
+		}
+		callValidateUpsert(t, checkGaps, newStateDiff, tc.lastProcessedBlock, tc.currentBlock, tc.expectedDif, tc.knownErrorBlocksStart, tc.knownErrorBlocksEnd)
 	}
 }
 
-// It also makes sure we properly calculate any missed gaps not in the known gaps lists
-// either before or after the list.
-func testCaptureMissedBlocks(t *testing.T, lastBlockProcessed int64, currentBlockNum int64, knownErrorBlocksStart int64, knownErrorBlocksEnd int64, expectedDif int64, processingKey int64, skipDb bool) {
+// test capturing missed blocks
+func testCaptureMissedBlocks(t *testing.T, lastBlockProcessed int64, currentBlockNum int64, knownErrorBlocksStart int64, knownErrorBlocksEnd int64, expectedDif int64, processingKey int64,
+	stateDiff *sql.StateDiffIndexer, knownErrorBlocks []*big.Int, fileInd interfaces.StateDiffIndexer) {
 
 	lastProcessedBlock := big.NewInt(lastBlockProcessed)
 	currentBlock := big.NewInt(currentBlockNum)
-	knownErrorBlocks := (make([]*big.Int, 0))
-
-	checkGaps := knownErrorBlocksStart != 0 && knownErrorBlocksEnd != 0
-	if checkGaps {
-		for i := knownErrorBlocksStart; i <= knownErrorBlocksEnd; i++ {
-			knownErrorBlocks = append(knownErrorBlocks, big.NewInt(i))
-		}
-	}
-
-	// Comment out values which should not be used
-	fileInd := setupFile(t)
 	knownGaps := KnownGapsState{
 		processingKey:      processingKey,
 		expectedDifference: big.NewInt(expectedDif),
 		fileIndexer:        fileInd,
 	}
-	stateDiff, err := setupDb(t)
-	if err != nil {
-		t.Fatal(err)
-	}
-
 	service := &Service{
 		KnownGaps: knownGaps,
 		indexer:   stateDiff,
 	}
-	service.capturedMissedBlocks(currentBlock, knownErrorBlocks, lastProcessedBlock)
+	service.captureMissedBlocks(currentBlock, knownErrorBlocks, lastProcessedBlock)
+}
+
+// Helper function to create an array of gaps given a start and end block
+func createKnownErrorBlocks(knownErrorBlocks []*big.Int, knownErrorBlocksStart int64, knownErrorBlocksEnd int64) []*big.Int {
+	for i := knownErrorBlocksStart; i <= knownErrorBlocksEnd; i++ {
+		knownErrorBlocks = append(knownErrorBlocks, big.NewInt(i))
+	}
+	return knownErrorBlocks
+}
+
+// This function will call the validateUpsert function based on various conditions.
+func callValidateUpsert(t *testing.T, checkGaps bool, stateDiff *sql.StateDiffIndexer,
+	lastBlockProcessed int64, currentBlockNum int64, expectedDif int64, knownErrorBlocksStart int64, knownErrorBlocksEnd int64) {
+	// If there are gaps in knownErrorBlocks array
 	if checkGaps {
+
+		// If there are no unexpected gaps before or after the entries in the knownErrorBlocks array
+		// Only handle the knownErrorBlocks Array
 		if lastBlockProcessed+expectedDif == knownErrorBlocksStart && knownErrorBlocksEnd+expectedDif == currentBlockNum {
 			validateUpsert(t, stateDiff, knownErrorBlocksStart, knownErrorBlocksEnd)
+
+			// If there are gaps after knownErrorBlocks array, process them
 		} else if lastBlockProcessed+expectedDif == knownErrorBlocksStart {
 			validateUpsert(t, stateDiff, knownErrorBlocksStart, knownErrorBlocksEnd)
 			validateUpsert(t, stateDiff, knownErrorBlocksEnd+expectedDif, currentBlockNum-expectedDif)
+
+			// If there are gaps before knownErrorBlocks array, process them
 		} else if knownErrorBlocksEnd+expectedDif == currentBlockNum {
 			validateUpsert(t, stateDiff, lastBlockProcessed+expectedDif, knownErrorBlocksStart-expectedDif)
 			validateUpsert(t, stateDiff, knownErrorBlocksStart, knownErrorBlocksEnd)
+
+			// if there are gaps before, after, and within the knownErrorBlocks array,handle all the errors.
 		} else {
 			validateUpsert(t, stateDiff, lastBlockProcessed+expectedDif, knownErrorBlocksStart-expectedDif)
 			validateUpsert(t, stateDiff, knownErrorBlocksStart, knownErrorBlocksEnd)
@@ -129,9 +200,9 @@ func testCaptureMissedBlocks(t *testing.T, lastBlockProcessed int64, currentBloc
 			validateUpsert(t, stateDiff, lastBlockProcessed+expectedDif, currentBlockNum-expectedDif)
 		}
 
-	tearDown(t, stateDiff)
 }
 
+// Make sure the upsert was performed correctly
 func validateUpsert(t *testing.T, stateDiff *sql.StateDiffIndexer, startingBlock int64, endingBlock int64) {
 	t.Logf("Starting to query blocks: %d - %d", startingBlock, endingBlock)
 	queryString := fmt.Sprintf("SELECT starting_block_number from eth.known_gaps WHERE starting_block_number = %d AND ending_block_number = %d", startingBlock, endingBlock)
@@ -142,15 +213,16 @@ func validateUpsert(t *testing.T, stateDiff *sql.StateDiffIndexer, startingBlock
 }
 
 // Create a DB object to use
-func setupDb(t *testing.T) (*sql.StateDiffIndexer, error) {
+func setupDb(t *testing.T) (*sql.StateDiffIndexer, sql.Database, error) {
 	db, err = postgres.SetupSQLXDB()
 	if err != nil {
 		t.Fatal(err)
 	}
 	stateDiff, err := sql.NewStateDiffIndexer(context.Background(), chainConf, db)
-	return stateDiff, err
+	return stateDiff, db, err
 }
 
+// Create a file statediff indexer.
 func setupFile(t *testing.T) interfaces.StateDiffIndexer {
 	if _, err := os.Stat(file.TestConfig.FilePath); !errors.Is(err, os.ErrNotExist) {
 		err := os.Remove(file.TestConfig.FilePath)