Refactor: Decouple knownGaps and Indexer
This commit decouples knownGaps from the Indexer. All knownGaps logic now lives in its own file, which makes it easier to test and maintain. We have also removed the checks against `lastProcessedBlock`, since (hypothetically) geth should never let that situation occur.
This commit is contained in:
parent
fc341a90fc
commit
b960661807
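For readers skimming the diff, here is a minimal sketch of how the decoupled component is wired up after this change. It relies only on the `NewKnownGapsState` constructor and `FindAndUpdateGaps` method added in `statediff/known_gaps.go` below, plus the `postgres.SetupSQLXDB` helper used by the new tests; the chain-head value and error handling are illustrative, not the service's actual startup code:

```go
// Sketch only: using the decoupled knownGaps component after this refactor.
package main

import (
	"log"
	"math/big"

	"github.com/ethereum/go-ethereum/statediff"
	"github.com/ethereum/go-ethereum/statediff/indexer/database/sql/postgres"
)

func main() {
	// Placeholder DB setup; SetupSQLXDB is the helper the new tests use.
	db, err := postgres.SetupSQLXDB()
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// checkForGaps, processingKey, expectedDifference, errorState, writeFilePath, db
	kg := statediff.NewKnownGapsState(true, 0, big.NewInt(1), false, "./known_gaps.sql", db)

	// Compare a hypothetical chain head against the latest block in the DB and
	// record the gap (DB first, falling back to the SQL file) if one exists.
	if err := kg.FindAndUpdateGaps(big.NewInt(1000), big.NewInt(1), 0); err != nil {
		log.Println("unable to record known gap:", err)
	}
}
```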
1 .gitignore vendored
@ -56,3 +56,4 @@ related-repositories/hive/**
related-repositories/ipld-eth-db/**
statediff/indexer/database/sql/statediffing_test_file.sql
statediff/statediffing_test_file.sql
statediff/known_gaps.sql
@ -177,7 +177,6 @@ func makeFullNode(ctx *cli.Context) (*node.Node, ethapi.Backend) {
|
||||
|
||||
if ctx.GlobalBool(utils.StateDiffFlag.Name) {
|
||||
var indexerConfig interfaces.Config
|
||||
var fileConfig interfaces.Config
|
||||
var clientName, nodeID string
|
||||
if ctx.GlobalIsSet(utils.StateDiffWritingFlag.Name) {
|
||||
clientName = ctx.GlobalString(utils.StateDiffDBClientNameFlag.Name)
|
||||
@ -192,15 +191,6 @@ func makeFullNode(ctx *cli.Context) (*node.Node, ethapi.Backend) {
|
||||
if err != nil {
|
||||
utils.Fatalf("%v", err)
|
||||
}
|
||||
|
||||
if dbType != shared.FILE {
|
||||
fileConfig = file.Config{
|
||||
FilePath: ctx.GlobalString(utils.StateDiffFilePath.Name),
|
||||
}
|
||||
} else {
|
||||
fileConfig = nil
|
||||
}
|
||||
|
||||
switch dbType {
|
||||
case shared.FILE:
|
||||
indexerConfig = file.Config{
|
||||
@ -262,14 +252,14 @@ func makeFullNode(ctx *cli.Context) (*node.Node, ethapi.Backend) {
|
||||
}
|
||||
}
|
||||
p := statediff.Config{
|
||||
IndexerConfig: indexerConfig,
|
||||
FileConfig: fileConfig,
|
||||
ID: nodeID,
|
||||
ClientName: clientName,
|
||||
Context: context.Background(),
|
||||
EnableWriteLoop: ctx.GlobalBool(utils.StateDiffWritingFlag.Name),
|
||||
NumWorkers: ctx.GlobalUint(utils.StateDiffWorkersFlag.Name),
|
||||
WaitForSync: ctx.GlobalBool(utils.StateDiffWaitForSync.Name),
|
||||
IndexerConfig: indexerConfig,
|
||||
KnownGapsFilePath: ctx.GlobalString(utils.StateDiffKnownGapsFilePath.Name),
|
||||
ID: nodeID,
|
||||
ClientName: clientName,
|
||||
Context: context.Background(),
|
||||
EnableWriteLoop: ctx.GlobalBool(utils.StateDiffWritingFlag.Name),
|
||||
NumWorkers: ctx.GlobalUint(utils.StateDiffWorkersFlag.Name),
|
||||
WaitForSync: ctx.GlobalBool(utils.StateDiffWaitForSync.Name),
|
||||
}
|
||||
utils.RegisterStateDiffService(stack, eth, &cfg.Eth, p, backend)
|
||||
}
|
||||
|
@ -176,6 +176,7 @@ var (
utils.StateDiffWritingFlag,
utils.StateDiffWorkersFlag,
utils.StateDiffFilePath,
utils.StateDiffKnownGapsFilePath,
utils.StateDiffWaitForSync,
configFileFlag,
}
@ -246,6 +246,7 @@ var AppHelpFlagGroups = []flags.FlagGroup{
utils.StateDiffWritingFlag,
utils.StateDiffWorkersFlag,
utils.StateDiffFilePath,
utils.StateDiffKnownGapsFilePath,
utils.StateDiffWaitForSync,
},
},
@ -867,6 +867,11 @@ var (
Name: "statediff.file.path",
Usage: "Full path (including filename) to write statediff data out to when operating in file mode",
}
StateDiffKnownGapsFilePath = cli.StringFlag{
Name: "statediff.knowngapsfile.path",
Usage: "Full path (including filename) to write knownGaps statements when the DB is unavailable.",
Value: "./known_gaps.sql",
}
StateDiffDBClientNameFlag = cli.StringFlag{
Name: "statediff.db.clientname",
Usage: "Client name to use when writing state diffs to database",
@ -26,10 +26,10 @@ import (

// Config contains instantiation parameters for the state diffing service
type Config struct {
// The configuration used for the primary stateDiff Indexer
// The configuration used for the stateDiff Indexer
IndexerConfig interfaces.Config
// The configuration used for the file stateDiff Indexer. This indexer is used if the primary indexer fails and is not a FILE indexer.
FileConfig interfaces.Config
// The filepath to write knownGaps insert statements if we can't connect to the DB.
KnownGapsFilePath string
// A unique ID used for this service
ID string
// Name for the client this service is running
@ -10,7 +10,6 @@ The known gaps table is updated when the following events occur:

1. At start up we check the latest block from the `eth.header_cids` table and compare it with the first block we are processing. If they are not one unit of expectedDifference away from each other, the gap between the two blocks is added (see the sketch below).
2. If there is any error in processing a block (DB connection, deadlock, etc.), that block is added to the knownErrorBlocks slice; when the next block is successfully written, this slice is written into the DB.
3. The last processed block is not one unit of expectedDifference away from the current block being processed; this can be due to any unknown or unhandled errors in geth.
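To make the arithmetic behind item 1 concrete, here is a small, standalone sketch of the comparison; the real check is the `isGap` helper added in `statediff/known_gaps.go` below, and the block numbers here are placeholders:

```go
package main

import (
	"fmt"
	"math/big"
)

// Sketch of the comparison performed by isGap in statediff/known_gaps.go:
// the DB is expected to sit exactly expectedDifference blocks behind the chain.
func hasGap(latestBlockInDb, latestBlockOnChain, expectedDifference *big.Int) bool {
	expected := new(big.Int).Sub(latestBlockOnChain, expectedDifference)
	return expected.Cmp(latestBlockInDb) != 0
}

func main() {
	fmt.Println(hasGap(big.NewInt(110), big.NewInt(111), big.NewInt(1))) // false: DB is exactly one block behind
	fmt.Println(hasGap(big.NewInt(110), big.NewInt(125), big.NewInt(1))) // true: blocks 111-124 are missing
}
```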
# Glossary
21 statediff/docs/database.md Normal file
@ -0,0 +1,21 @@

# Overview

This document goes through some notes on the database component of the statediff service.

# Components

- Indexer: The indexer creates IPLD and DB models to insert into the Postgres DB. It performs the insert utilizing an atomic function.
- Builder: The builder constructs the statediff object that needs to be inserted.
- Known Gaps: Captures any gaps that might have occurred and either writes them to the DB, to a local SQL file, to Prometheus, or logs an error locally.

# Making Code Changes

## Adding a New Function to the Indexer

If you want to implement a new feature for adding data to the database, keep the following in mind:

1. You need to handle `sql`, `file`, and `dump` (see the toy sketch below).
   1. `sql` - Contains the code needed to write directly to the `sql` DB.
   2. `file` - Contains all the code required to write the SQL statements to a file.
   3. `dump` - Contains all the code for outputting events to the console.
2. You will have to add it to the `interfaces.StateDiffIndexer` interface.
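The steps above are easiest to see in a toy, self-contained sketch. Everything in it is a hypothetical stand-in (the interface, method name, and backend types are not the real `interfaces.StateDiffIndexer` definitions); it only illustrates the pattern of one interface with `sql`, `file`, and `dump` implementations:

```go
package main

import (
	"fmt"
	"math/big"
)

// Hypothetical interface standing in for interfaces.StateDiffIndexer: every new
// method is declared here and then implemented by all three backends.
type exampleIndexer interface {
	PushExampleRecord(blockNumber *big.Int) error
}

type sqlBackend struct{}  // stands in for the Postgres indexer
type fileBackend struct{} // stands in for the SQL-file indexer
type dumpBackend struct{} // stands in for the console-dump indexer

// sql: write directly to the database.
func (sqlBackend) PushExampleRecord(n *big.Int) error { fmt.Println("INSERT ...", n); return nil }

// file: append the equivalent SQL statement to the output file.
func (fileBackend) PushExampleRecord(n *big.Int) error { fmt.Println("append statement for", n); return nil }

// dump: print the record to the configured writer.
func (dumpBackend) PushExampleRecord(n *big.Int) error { fmt.Println("dump record", n); return nil }

func main() {
	// All three backends satisfy the same interface, so callers never need to
	// care which one they are holding.
	for _, ind := range []exampleIndexer{sqlBackend{}, fileBackend{}, dumpBackend{}} {
		_ = ind.PushExampleRecord(big.NewInt(1))
	}
}
```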
Binary file not shown.
Before size: 52 KiB, after size: 33 KiB
@ -32,29 +32,22 @@ import (
|
||||
)
|
||||
|
||||
// NewStateDiffIndexer creates and returns an implementation of the StateDiffIndexer interface.
|
||||
// You can specify the specific
|
||||
func NewStateDiffIndexer(ctx context.Context, chainConfig *params.ChainConfig, nodeInfo node.Info, config interfaces.Config, specificIndexer shared.DBType) (interfaces.StateDiffIndexer, error) {
|
||||
var indexerToCreate shared.DBType
|
||||
if specificIndexer == "" {
|
||||
indexerToCreate = config.Type()
|
||||
} else {
|
||||
indexerToCreate = specificIndexer
|
||||
}
|
||||
log.Info("Indexer to create is", "indexer", indexerToCreate)
|
||||
switch indexerToCreate {
|
||||
func NewStateDiffIndexer(ctx context.Context, chainConfig *params.ChainConfig, nodeInfo node.Info, config interfaces.Config) (sql.Database, interfaces.StateDiffIndexer, error) {
|
||||
switch config.Type() {
|
||||
case shared.FILE:
|
||||
log.Info("Creating a statediff indexer in SQL file writing mode")
|
||||
log.Info("Starting statediff service in SQL file writing mode")
|
||||
fc, ok := config.(file.Config)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("file config is not the correct type: got %T, expected %T", config, file.Config{})
|
||||
return nil, nil, fmt.Errorf("file config is not the correct type: got %T, expected %T", config, file.Config{})
|
||||
}
|
||||
fc.NodeInfo = nodeInfo
|
||||
return file.NewStateDiffIndexer(ctx, chainConfig, fc)
|
||||
ind, err := file.NewStateDiffIndexer(ctx, chainConfig, fc)
|
||||
return nil, ind, err
|
||||
case shared.POSTGRES:
|
||||
log.Info("Creating a statediff service in Postgres writing mode")
|
||||
log.Info("Starting statediff service in Postgres writing mode")
|
||||
pgc, ok := config.(postgres.Config)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("postgres config is not the correct type: got %T, expected %T", config, postgres.Config{})
|
||||
return nil, nil, fmt.Errorf("postgres config is not the correct type: got %T, expected %T", config, postgres.Config{})
|
||||
}
|
||||
var err error
|
||||
var driver sql.Driver
|
||||
@ -62,25 +55,27 @@ func NewStateDiffIndexer(ctx context.Context, chainConfig *params.ChainConfig, n
|
||||
case postgres.PGX:
|
||||
driver, err = postgres.NewPGXDriver(ctx, pgc, nodeInfo)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, nil, err
|
||||
}
|
||||
case postgres.SQLX:
|
||||
driver, err = postgres.NewSQLXDriver(ctx, pgc, nodeInfo)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, nil, err
|
||||
}
|
||||
default:
|
||||
return nil, fmt.Errorf("unrecognized Postgres driver type: %s", pgc.Driver)
|
||||
return nil, nil, fmt.Errorf("unrecognized Postgres driver type: %s", pgc.Driver)
|
||||
}
|
||||
return sql.NewStateDiffIndexer(ctx, chainConfig, postgres.NewPostgresDB(driver))
|
||||
db := postgres.NewPostgresDB(driver)
|
||||
ind, err := sql.NewStateDiffIndexer(ctx, chainConfig, db)
|
||||
return db, ind, err
|
||||
case shared.DUMP:
|
||||
log.Info("Creating statediff indexer in data dump mode")
|
||||
log.Info("Starting statediff service in data dump mode")
|
||||
dumpc, ok := config.(dump.Config)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("dump config is not the correct type: got %T, expected %T", config, dump.Config{})
|
||||
return nil, nil, fmt.Errorf("dump config is not the correct type: got %T, expected %T", config, dump.Config{})
|
||||
}
|
||||
return dump.NewStateDiffIndexer(chainConfig, dumpc), nil
|
||||
return nil, dump.NewStateDiffIndexer(chainConfig, dumpc), nil
|
||||
default:
|
||||
return nil, fmt.Errorf("unrecognized database type: %s", indexerToCreate)
|
||||
return nil, nil, fmt.Errorf("unrecognized database type: %s", config.Type())
|
||||
}
|
||||
}
|
||||
|
@ -496,27 +496,3 @@ func (sdi *StateDiffIndexer) PushCodeAndCodeHash(batch interfaces.Batch, codeAnd
|
||||
func (sdi *StateDiffIndexer) Close() error {
|
||||
return sdi.dump.Close()
|
||||
}
|
||||
|
||||
// Only needed to satisfy use cases
|
||||
func (sdi *StateDiffIndexer) FindAndUpdateGaps(latestBlockOnChain *big.Int, expectedDifference *big.Int, processingKey int64, index interfaces.StateDiffIndexer) error {
|
||||
log.Error("We can't find gaps in write mode!")
|
||||
return fmt.Errorf("We can't find gaps in write mode!")
|
||||
}
|
||||
|
||||
// Written but not tested. Unsure if there is a real use case for this anywhere.
|
||||
func (sdi *StateDiffIndexer) PushKnownGaps(startingBlockNumber *big.Int, endingBlockNumber *big.Int, checkedOut bool, processingKey int64, index interfaces.StateDiffIndexer) error {
|
||||
log.Info("Dumping known gaps")
|
||||
if startingBlockNumber.Cmp(endingBlockNumber) != -1 {
|
||||
return fmt.Errorf("Starting Block %d, is greater than ending block %d", startingBlockNumber, endingBlockNumber)
|
||||
}
|
||||
knownGap := models.KnownGapsModel{
|
||||
StartingBlockNumber: startingBlockNumber.String(),
|
||||
EndingBlockNumber: endingBlockNumber.String(),
|
||||
CheckedOut: checkedOut,
|
||||
ProcessingKey: processingKey,
|
||||
}
|
||||
if _, err := fmt.Fprintf(sdi.dump, "%+v\r\n", knownGap); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
@ -478,24 +478,3 @@ func (sdi *StateDiffIndexer) PushCodeAndCodeHash(batch interfaces.Batch, codeAnd
|
||||
func (sdi *StateDiffIndexer) Close() error {
|
||||
return sdi.fileWriter.Close()
|
||||
}
|
||||
|
||||
func (sdi *StateDiffIndexer) FindAndUpdateGaps(latestBlockOnChain *big.Int, expectedDifference *big.Int, processingKey int64, indexer interfaces.StateDiffIndexer) error {
|
||||
log.Error("We can't find gaps in write mode!")
|
||||
return fmt.Errorf("We can't find gaps in write mode!")
|
||||
}
|
||||
|
||||
func (sdi *StateDiffIndexer) PushKnownGaps(startingBlockNumber *big.Int, endingBlockNumber *big.Int, checkedOut bool, processingKey int64, indexer interfaces.StateDiffIndexer) error {
|
||||
log.Info("Writing Gaps to file")
|
||||
if startingBlockNumber.Cmp(endingBlockNumber) != -1 {
|
||||
return fmt.Errorf("Starting Block %d, is greater than ending block %d", startingBlockNumber, endingBlockNumber)
|
||||
}
|
||||
knownGap := models.KnownGapsModel{
|
||||
StartingBlockNumber: startingBlockNumber.String(),
|
||||
EndingBlockNumber: endingBlockNumber.String(),
|
||||
CheckedOut: checkedOut,
|
||||
ProcessingKey: processingKey,
|
||||
}
|
||||
|
||||
sdi.fileWriter.upsertKnownGaps(knownGap)
|
||||
return nil
|
||||
}
|
||||
|
@ -155,14 +155,6 @@ const (
|
||||
|
||||
storageInsert = "INSERT INTO eth.storage_cids (header_id, state_path, storage_leaf_key, cid, storage_path, " +
|
||||
"node_type, diff, mh_key) VALUES ('%s', '\\x%x', '%s', '%s', '\\x%x', %d, %t, '%s');\n"
|
||||
// INSERT INTO eth.known_gaps (starting_block_number, ending_block_number, checked_out, processing_key)
|
||||
//VALUES ($1, $2, $3, $4)
|
||||
// ON CONFLICT (starting_block_number) DO UPDATE SET (ending_block_number, processing_key) = ($2, $4)
|
||||
// WHERE eth.known_gaps.ending_block_number <= $2
|
||||
knownGapsInsert = "INSERT INTO eth.known_gaps (starting_block_number, ending_block_number, checked_out, processing_key) " +
|
||||
"VALUES ('%s', '%s', %t, %d) " +
|
||||
"ON CONFLICT (starting_block_number) DO UPDATE SET (ending_block_number, processing_key) = ('%s', %d) " +
|
||||
"WHERE eth.known_gaps.ending_block_number <= '%s';\n"
|
||||
)
|
||||
|
||||
func (sqw *SQLWriter) upsertNode(node nodeinfo.Info) {
|
||||
@ -261,12 +253,3 @@ func (sqw *SQLWriter) upsertStorageCID(storageCID models.StorageNodeModel) {
|
||||
sqw.stmts <- []byte(fmt.Sprintf(storageInsert, storageCID.HeaderID, storageCID.StatePath, storageKey, storageCID.CID,
|
||||
storageCID.Path, storageCID.NodeType, true, storageCID.MhKey))
|
||||
}
|
||||
|
||||
func (sqw *SQLWriter) upsertKnownGaps(knownGaps models.KnownGapsModel) {
|
||||
sqw.stmts <- []byte(fmt.Sprintf(knownGapsInsert, knownGaps.StartingBlockNumber, knownGaps.EndingBlockNumber, knownGaps.CheckedOut, knownGaps.ProcessingKey,
|
||||
knownGaps.EndingBlockNumber, knownGaps.ProcessingKey, knownGaps.EndingBlockNumber))
|
||||
//knownGapsInsert = "INSERT INTO eth.known_gaps (starting_block_number, ending_block_number, checked_out, processing_key) " +
|
||||
// "VALUES ('%s', '%s', %t, %d) " +
|
||||
// "ON CONFLICT (starting_block_number) DO UPDATE SET (ending_block_number, processing_key) = ('%s', %d) " +
|
||||
// "WHERE eth.known_gaps.ending_block_number <= '%s';\n"
|
||||
}
|
||||
|
@ -555,95 +555,3 @@ func (sdi *StateDiffIndexer) Close() error {
|
||||
}
|
||||
|
||||
// Update the known gaps table with the gap information.
|
||||
func (sdi *StateDiffIndexer) PushKnownGaps(startingBlockNumber *big.Int, endingBlockNumber *big.Int, checkedOut bool, processingKey int64, fileIndexer interfaces.StateDiffIndexer) error {
|
||||
if startingBlockNumber.Cmp(endingBlockNumber) != -1 {
|
||||
return fmt.Errorf("Starting Block %d, is greater than ending block %d", startingBlockNumber, endingBlockNumber)
|
||||
}
|
||||
knownGap := models.KnownGapsModel{
|
||||
StartingBlockNumber: startingBlockNumber.String(),
|
||||
EndingBlockNumber: endingBlockNumber.String(),
|
||||
CheckedOut: checkedOut,
|
||||
ProcessingKey: processingKey,
|
||||
}
|
||||
log.Info("Writing known gaps to the DB")
|
||||
if err := sdi.dbWriter.upsertKnownGaps(knownGap); err != nil {
|
||||
log.Warn("Error writing knownGaps to DB, writing them to file instead")
|
||||
fileIndexer.PushKnownGaps(startingBlockNumber, endingBlockNumber, checkedOut, processingKey, nil)
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// This is a simple wrapper function which will run QueryRow on the DB
|
||||
func (sdi *StateDiffIndexer) QueryDb(queryString string) (string, error) {
|
||||
var ret string
|
||||
err := sdi.dbWriter.db.QueryRow(context.Background(), queryString).Scan(&ret)
|
||||
if err != nil {
|
||||
log.Error(fmt.Sprint("Can't properly query the DB for query: ", queryString))
|
||||
return "", err
|
||||
}
|
||||
return ret, nil
|
||||
}
|
||||
|
||||
// This function is a simple wrapper which will call QueryDb but the return value will be
|
||||
// a big int instead of a string
|
||||
func (sdi *StateDiffIndexer) QueryDbToBigInt(queryString string) (*big.Int, error) {
|
||||
ret := new(big.Int)
|
||||
res, err := sdi.QueryDb(queryString)
|
||||
if err != nil {
|
||||
return ret, err
|
||||
}
|
||||
ret, ok := ret.SetString(res, 10)
|
||||
if !ok {
|
||||
log.Error(fmt.Sprint("Can't turn the res ", res, "into a bigInt"))
|
||||
return ret, fmt.Errorf("Can't turn %s into a bigInt", res)
|
||||
}
|
||||
return ret, nil
|
||||
}
|
||||
|
||||
// Users provide the latestBlockInDb and the latestBlockOnChain
|
||||
// as well as the expected difference. This function does some simple math.
|
||||
// The expected difference for the time being is going to be 1, but as we run
|
||||
// More geth nodes, the expected difference might fluctuate.
|
||||
func isGap(latestBlockInDb *big.Int, latestBlockOnChain *big.Int, expectedDifference *big.Int) bool {
|
||||
latestBlock := big.NewInt(0)
|
||||
if latestBlock.Sub(latestBlockOnChain, expectedDifference).Cmp(latestBlockInDb) != 0 {
|
||||
log.Warn("We found a gap", "latestBlockInDb", latestBlockInDb, "latestBlockOnChain", latestBlockOnChain, "expectedDifference", expectedDifference)
|
||||
return true
|
||||
}
|
||||
return false
|
||||
|
||||
}
|
||||
|
||||
// This function will check for Gaps and update the DB if gaps are found.
|
||||
// The processingKey will currently be set to 0, but as we start to leverage horizontal scaling
|
||||
// It might be a useful parameter to update depending on the geth node.
|
||||
// TODO:
|
||||
// REmove the return value
|
||||
// Write to file if err in writing to DB
|
||||
func (sdi *StateDiffIndexer) FindAndUpdateGaps(latestBlockOnChain *big.Int, expectedDifference *big.Int, processingKey int64, fileIndexer interfaces.StateDiffIndexer) error {
|
||||
dbQueryString := "SELECT MAX(block_number) FROM eth.header_cids"
|
||||
latestBlockInDb, err := sdi.QueryDbToBigInt(dbQueryString)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
gapExists := isGap(latestBlockInDb, latestBlockOnChain, expectedDifference)
|
||||
if gapExists {
|
||||
startBlock := big.NewInt(0)
|
||||
endBlock := big.NewInt(0)
|
||||
startBlock.Add(latestBlockInDb, expectedDifference)
|
||||
endBlock.Sub(latestBlockOnChain, expectedDifference)
|
||||
|
||||
log.Warn("Found Gaps starting at", "startBlock", startBlock, "endingBlock", endBlock)
|
||||
err := sdi.PushKnownGaps(startBlock, endBlock, false, processingKey, fileIndexer)
|
||||
if err != nil {
|
||||
log.Error("We were unable to write the following gap to the DB", "start Block", startBlock, "endBlock", endBlock, "error", err)
|
||||
// Write to file SQL file instead!!!
|
||||
// If write to SQL file fails, write to disk. Handle this within the write to SQL file function!
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
@ -5,7 +5,6 @@ import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"math/big"
|
||||
"os"
|
||||
"testing"
|
||||
|
||||
@ -181,48 +180,3 @@ func tearDown(t *testing.T) {
|
||||
err := ind.Close()
|
||||
require.NoError(t, err)
|
||||
}
|
||||
func TestKnownGapsUpsert(t *testing.T) {
|
||||
testKnownGapsUpsert(t)
|
||||
|
||||
}
|
||||
func testKnownGapsUpsert(t *testing.T) {
|
||||
gapDifference := big.NewInt(10) // Set a difference between latestBlock in DB and on Chain
|
||||
expectedDifference := big.NewInt(1) // Set what the expected difference between latestBlock in DB and on Chain should be
|
||||
|
||||
stateDiff, err := setupDb(t)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
fileInd := setupFile(t)
|
||||
|
||||
// Get the latest block from the DB
|
||||
latestBlockInDb, err := stateDiff.QueryDbToBigInt("SELECT MAX(block_number) FROM eth.header_cids")
|
||||
if err != nil {
|
||||
t.Fatal("Can't find a block in the eth.header_cids table.. Please put one there")
|
||||
}
|
||||
|
||||
// Add the gapDifference for testing purposes
|
||||
latestBlockOnChain := big.NewInt(0)
|
||||
latestBlockOnChain.Add(latestBlockInDb, gapDifference)
|
||||
|
||||
t.Log("The latest block on the chain is: ", latestBlockOnChain)
|
||||
t.Log("The latest block on the DB is: ", latestBlockInDb)
|
||||
|
||||
gapUpsertErr := stateDiff.FindAndUpdateGaps(latestBlockOnChain, expectedDifference, 0, fileInd)
|
||||
require.NoError(t, gapUpsertErr)
|
||||
|
||||
// Calculate what the start and end block should be in known_gaps
|
||||
// And check to make sure it is properly inserted
|
||||
startBlock := big.NewInt(0)
|
||||
endBlock := big.NewInt(0)
|
||||
startBlock.Add(latestBlockInDb, expectedDifference)
|
||||
endBlock.Sub(latestBlockOnChain, expectedDifference)
|
||||
|
||||
queryString := fmt.Sprintf("SELECT starting_block_number from eth.known_gaps WHERE starting_block_number = %d AND ending_block_number = %d", startBlock, endBlock)
|
||||
|
||||
_, queryErr := stateDiff.QueryDb(queryString) // Figure out the string.
|
||||
require.NoError(t, queryErr)
|
||||
t.Logf("Updated Known Gaps table starting from, %d, and ending at, %d", startBlock, endBlock)
|
||||
|
||||
}
|
||||
|
@ -106,5 +106,4 @@ func (db *DB) InsertKnownGapsStm() string {
|
||||
return `INSERT INTO eth.known_gaps (starting_block_number, ending_block_number, checked_out, processing_key) VALUES ($1, $2, $3, $4)
|
||||
ON CONFLICT (starting_block_number) DO UPDATE SET (ending_block_number, processing_key) = ($2, $4)
|
||||
WHERE eth.known_gaps.ending_block_number <= $2`
|
||||
//return `INSERT INTO eth.known_gaps (starting_block_number, ending_block_number, checked_out, processing_key) VALUES (1, 2, true, 1)`
|
||||
}
|
||||
|
@ -17,7 +17,6 @@
|
||||
package sql
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
"github.com/ethereum/go-ethereum/common"
|
||||
@ -183,15 +182,3 @@ func (w *Writer) upsertStorageCID(tx Tx, storageCID models.StorageNodeModel) err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Upserts known gaps to the DB.
|
||||
// INSERT INTO eth.known_gaps (starting_block_number, ending_block_number, checked_out, processing_key) VALUES ($1, $2, $3, $4)
|
||||
func (w *Writer) upsertKnownGaps(knownGaps models.KnownGapsModel) error {
|
||||
_, err := w.db.Exec(context.Background(), w.db.InsertKnownGapsStm(),
|
||||
knownGaps.StartingBlockNumber, knownGaps.EndingBlockNumber, knownGaps.CheckedOut, knownGaps.ProcessingKey)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error upserting known_gaps entry: %v", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
@ -32,10 +32,6 @@ type StateDiffIndexer interface {
|
||||
PushStateNode(tx Batch, stateNode sdtypes.StateNode, headerID string) error
|
||||
PushCodeAndCodeHash(tx Batch, codeAndCodeHash sdtypes.CodeAndCodeHash) error
|
||||
ReportDBMetrics(delay time.Duration, quit <-chan bool)
|
||||
FindAndUpdateGaps(latestBlockOnChain *big.Int, expectedDifference *big.Int, processingKey int64, indexer StateDiffIndexer) error
|
||||
// The indexer at the end allows us to pass one indexer to another.
|
||||
// We use then for the SQL indexer, we pass it the file indexer so it can write to file if writing to the DB fails.
|
||||
PushKnownGaps(startingBlockNumber *big.Int, endingBlockNumber *big.Int, checkedOut bool, processingKey int64, indexer StateDiffIndexer) error
|
||||
io.Closer
|
||||
}
|
||||
|
||||
|
243 statediff/known_gaps.go Normal file
@ -0,0 +1,243 @@
|
||||
// Copyright 2019 The go-ethereum Authors
|
||||
// This file is part of the go-ethereum library.
|
||||
//
|
||||
// The go-ethereum library is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU Lesser General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
//
|
||||
// The go-ethereum library is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU Lesser General Public License for more details.
|
||||
//
|
||||
// You should have received a copy of the GNU Lesser General Public License
|
||||
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
package statediff
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"math/big"
|
||||
"os"
|
||||
|
||||
"github.com/ethereum/go-ethereum/log"
|
||||
"github.com/ethereum/go-ethereum/statediff/indexer/database/sql"
|
||||
"github.com/ethereum/go-ethereum/statediff/indexer/models"
|
||||
)
|
||||
|
||||
var (
|
||||
knownGapsInsert = "INSERT INTO eth.known_gaps (starting_block_number, ending_block_number, checked_out, processing_key) " +
|
||||
"VALUES ('%s', '%s', %t, %d) " +
|
||||
"ON CONFLICT (starting_block_number) DO UPDATE SET (ending_block_number, processing_key) = ('%s', %d) " +
|
||||
"WHERE eth.known_gaps.ending_block_number <= '%s';\n"
|
||||
dbQueryString = "SELECT MAX(block_number) FROM eth.header_cids"
|
||||
defaultWriteFilePath = "./known_gaps.sql"
|
||||
)
|
||||
|
||||
type KnownGaps interface {
|
||||
PushKnownGaps(startingBlockNumber *big.Int, endingBlockNumber *big.Int, checkedOut bool, processingKey int64) error
|
||||
FindAndUpdateGaps(latestBlockOnChain *big.Int, expectedDifference *big.Int, processingKey int64) error
|
||||
}
|
||||
|
||||
type KnownGapsState struct {
|
||||
// Should we check for gaps by looking at the DB and comparing the latest block with head
|
||||
checkForGaps bool
|
||||
// Arbitrary processingKey that can be used down the line to differentiate different geth nodes.
|
||||
processingKey int64
|
||||
// This number indicates the expected difference between blocks.
|
||||
// Currently, this is 1 since the geth node processes each block. But down the road this can be used in
// tandem with the processingKey to differentiate block processing logic.
|
||||
expectedDifference *big.Int
|
||||
// Indicates if Geth is in an error state
|
||||
// This is used to indicate the right time to upsert
|
||||
errorState bool
|
||||
// This array keeps track of errorBlocks as they occur.
|
||||
// When the errorState is false again, we can process these blocks.
|
||||
// Do we need a list, or can we have knownStartErrorBlock and knownEndErrorBlock ints instead?
|
||||
knownErrorBlocks []*big.Int
|
||||
// The filepath to write SQL statements if we can't connect to the DB.
|
||||
writeFilePath string
|
||||
// DB object to use for reading and writing to the DB
|
||||
db sql.Database
|
||||
}
|
||||
|
||||
func NewKnownGapsState(checkForGaps bool, processingKey int64, expectedDifference *big.Int,
|
||||
errorState bool, writeFilePath string, db sql.Database) *KnownGapsState {
|
||||
|
||||
return &KnownGapsState{
|
||||
checkForGaps: checkForGaps,
|
||||
processingKey: processingKey,
|
||||
expectedDifference: expectedDifference,
|
||||
errorState: errorState,
|
||||
writeFilePath: writeFilePath,
|
||||
db: db,
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func MinMax(array []*big.Int) (*big.Int, *big.Int) {
|
||||
var max *big.Int = array[0]
|
||||
var min *big.Int = array[0]
|
||||
for _, value := range array {
|
||||
if max.Cmp(value) == -1 {
|
||||
max = value
|
||||
}
|
||||
if min.Cmp(value) == 1 {
|
||||
min = value
|
||||
}
|
||||
}
|
||||
return min, max
|
||||
}
|
||||
|
||||
// This function actually performs the write of the known gaps. It tries the following steps, only moving on to the next step if the previous one fails:
// 1. Write to the DB directly.
// 2. Write to a local SQL file.
// 3. Write to Prometheus directly.
// 4. Log an error.
|
||||
func (kg *KnownGapsState) PushKnownGaps(startingBlockNumber *big.Int, endingBlockNumber *big.Int, checkedOut bool, processingKey int64) error {
|
||||
if startingBlockNumber.Cmp(endingBlockNumber) != -1 {
|
||||
return fmt.Errorf("Starting Block %d, is greater than ending block %d", startingBlockNumber, endingBlockNumber)
|
||||
}
|
||||
knownGap := models.KnownGapsModel{
|
||||
StartingBlockNumber: startingBlockNumber.String(),
|
||||
EndingBlockNumber: endingBlockNumber.String(),
|
||||
CheckedOut: checkedOut,
|
||||
ProcessingKey: processingKey,
|
||||
}
|
||||
log.Info("Writing known gaps to the DB")
|
||||
|
||||
var writeErr error
|
||||
if kg.db != nil {
|
||||
dbErr := kg.upsertKnownGaps(knownGap)
|
||||
if dbErr != nil {
|
||||
log.Warn("Error writing knownGaps to DB, writing them to file instead")
|
||||
writeErr = kg.upsertKnownGapsFile(knownGap)
|
||||
}
|
||||
} else {
|
||||
writeErr = kg.upsertKnownGapsFile(knownGap)
|
||||
|
||||
}
|
||||
if writeErr != nil {
|
||||
log.Info("Unsuccessful when writing to a file", "Error", writeErr)
|
||||
return writeErr
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// This is a simple wrapper function to write gaps from a knownErrorBlocks array.
|
||||
func (kg *KnownGapsState) captureErrorBlocks(knownErrorBlocks []*big.Int) {
|
||||
startErrorBlock, endErrorBlock := MinMax(knownErrorBlocks)
|
||||
|
||||
log.Warn("The following Gaps were found", "knownErrorBlocks", knownErrorBlocks)
|
||||
log.Warn("Updating known Gaps table", "startErrorBlock", startErrorBlock, "endErrorBlock", endErrorBlock, "processingKey", kg.processingKey)
|
||||
kg.PushKnownGaps(startErrorBlock, endErrorBlock, false, kg.processingKey)
|
||||
|
||||
}
|
||||
|
||||
// Users provide the latestBlockInDb and the latestBlockOnChain
|
||||
// as well as the expected difference. This function does some simple math.
|
||||
// The expected difference for the time being is going to be 1, but as we run
// more geth nodes, the expected difference might fluctuate.
|
||||
func isGap(latestBlockInDb *big.Int, latestBlockOnChain *big.Int, expectedDifference *big.Int) bool {
|
||||
latestBlock := big.NewInt(0)
|
||||
if latestBlock.Sub(latestBlockOnChain, expectedDifference).Cmp(latestBlockInDb) != 0 {
|
||||
log.Warn("We found a gap", "latestBlockInDb", latestBlockInDb, "latestBlockOnChain", latestBlockOnChain, "expectedDifference", expectedDifference)
|
||||
return true
|
||||
}
|
||||
return false
|
||||
|
||||
}
|
||||
|
||||
// This function will check for Gaps and update the DB if gaps are found.
|
||||
// The processingKey will currently be set to 0, but as we start to leverage horizontal scaling
|
||||
// It might be a useful parameter to update depending on the geth node.
|
||||
// TODO:
|
||||
// Remove the return value
|
||||
// Write to file if err in writing to DB
|
||||
func (kg *KnownGapsState) FindAndUpdateGaps(latestBlockOnChain *big.Int, expectedDifference *big.Int, processingKey int64) error {
|
||||
// Make this global
|
||||
latestBlockInDb, err := kg.QueryDbToBigInt(dbQueryString)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
gapExists := isGap(latestBlockInDb, latestBlockOnChain, expectedDifference)
|
||||
if gapExists {
|
||||
startBlock := big.NewInt(0)
|
||||
endBlock := big.NewInt(0)
|
||||
startBlock.Add(latestBlockInDb, expectedDifference)
|
||||
endBlock.Sub(latestBlockOnChain, expectedDifference)
|
||||
|
||||
log.Warn("Found Gaps starting at", "startBlock", startBlock, "endingBlock", endBlock)
|
||||
err := kg.PushKnownGaps(startBlock, endBlock, false, processingKey)
|
||||
if err != nil {
|
||||
log.Error("We were unable to write the following gap to the DB", "start Block", startBlock, "endBlock", endBlock, "error", err)
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Upserts known gaps to the DB.
|
||||
// INSERT INTO eth.known_gaps (starting_block_number, ending_block_number, checked_out, processing_key) VALUES ($1, $2, $3, $4)
|
||||
func (kg *KnownGapsState) upsertKnownGaps(knownGaps models.KnownGapsModel) error {
|
||||
_, err := kg.db.Exec(context.Background(), kg.db.InsertKnownGapsStm(),
|
||||
knownGaps.StartingBlockNumber, knownGaps.EndingBlockNumber, knownGaps.CheckedOut, knownGaps.ProcessingKey)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error upserting known_gaps entry: %v", err)
|
||||
}
|
||||
log.Info("Successfully Wrote gaps to the DB", "startBlock", knownGaps.StartingBlockNumber, "endBlock", knownGaps.EndingBlockNumber)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (kg *KnownGapsState) upsertKnownGapsFile(knownGaps models.KnownGapsModel) error {
|
||||
insertStmt := []byte(fmt.Sprintf(knownGapsInsert, knownGaps.StartingBlockNumber, knownGaps.EndingBlockNumber, knownGaps.CheckedOut, knownGaps.ProcessingKey,
|
||||
knownGaps.EndingBlockNumber, knownGaps.ProcessingKey, knownGaps.EndingBlockNumber))
|
||||
log.Info("Trying to write file")
|
||||
if kg.writeFilePath == "" {
|
||||
kg.writeFilePath = defaultWriteFilePath
|
||||
}
|
||||
f, err := os.OpenFile(kg.writeFilePath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
|
||||
if err != nil {
|
||||
log.Info("Unable to open a file for writing")
|
||||
return err
|
||||
}
|
||||
defer f.Close()
|
||||
|
||||
if _, err = f.Write(insertStmt); err != nil {
|
||||
log.Info("Unable to open write insert statement to file")
|
||||
return err
|
||||
}
|
||||
log.Info("Wrote the gaps to a local SQL file")
|
||||
return nil
|
||||
}
|
||||
|
||||
// This is a simple wrapper function which will run QueryRow on the DB
|
||||
func (kg *KnownGapsState) QueryDb(queryString string) (string, error) {
|
||||
var ret string
|
||||
err := kg.db.QueryRow(context.Background(), queryString).Scan(&ret)
|
||||
if err != nil {
|
||||
log.Error(fmt.Sprint("Can't properly query the DB for query: ", queryString))
|
||||
return "", err
|
||||
}
|
||||
return ret, nil
|
||||
}
|
||||
|
||||
// This function is a simple wrapper which will call QueryDb but the return value will be
|
||||
// a big int instead of a string
|
||||
func (kg *KnownGapsState) QueryDbToBigInt(queryString string) (*big.Int, error) {
|
||||
ret := new(big.Int)
|
||||
res, err := kg.QueryDb(queryString)
|
||||
if err != nil {
|
||||
return ret, err
|
||||
}
|
||||
ret, ok := ret.SetString(res, 10)
|
||||
if !ok {
|
||||
log.Error(fmt.Sprint("Can't turn the res ", res, "into a bigInt"))
|
||||
return ret, fmt.Errorf("Can't turn %s into a bigInt", res)
|
||||
}
|
||||
return ret, nil
|
||||
}
|
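The `knownGapsInsert` format string defined in the new `known_gaps.go` above takes seven arguments because the ending block and processing key are repeated for the `ON CONFLICT` clause. A small, self-contained rendering with placeholder values, mirroring the `fmt.Sprintf` call in `upsertKnownGapsFile`, may make the argument order easier to follow:

```go
package main

import "fmt"

// Same format string as knownGapsInsert in statediff/known_gaps.go above.
const knownGapsInsert = "INSERT INTO eth.known_gaps (starting_block_number, ending_block_number, checked_out, processing_key) " +
	"VALUES ('%s', '%s', %t, %d) " +
	"ON CONFLICT (starting_block_number) DO UPDATE SET (ending_block_number, processing_key) = ('%s', %d) " +
	"WHERE eth.known_gaps.ending_block_number <= '%s';\n"

func main() {
	// Placeholder gap for illustration: blocks 115-120, not checked out, processing key 1.
	start, end, checkedOut, key := "115", "120", false, int64(1)

	// Argument order matches upsertKnownGapsFile: the ending block and processing
	// key are repeated for the ON CONFLICT clause, and the ending block once more
	// for the WHERE guard.
	fmt.Printf(knownGapsInsert, start, end, checkedOut, key, end, key, end)
	// Output:
	// INSERT INTO eth.known_gaps (starting_block_number, ending_block_number, checked_out, processing_key) VALUES ('115', '120', false, 1) ON CONFLICT (starting_block_number) DO UPDATE SET (ending_block_number, processing_key) = ('120', 1) WHERE eth.known_gaps.ending_block_number <= '120';
}
```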
218 statediff/known_gaps_test.go Normal file
@ -0,0 +1,218 @@
|
||||
package statediff
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"math/big"
|
||||
"os"
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
"github.com/ethereum/go-ethereum/params"
|
||||
"github.com/ethereum/go-ethereum/statediff/indexer/database/sql"
|
||||
"github.com/ethereum/go-ethereum/statediff/indexer/database/sql/postgres"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
var (
|
||||
chainConf = params.MainnetChainConfig
|
||||
knownGapsFilePath = "./known_gaps.sql"
|
||||
)
|
||||
|
||||
type gapValues struct {
|
||||
lastProcessedBlock int64
|
||||
currentBlock int64
|
||||
knownErrorBlocksStart int64
|
||||
knownErrorBlocksEnd int64
|
||||
expectedDif int64
|
||||
processingKey int64
|
||||
}
|
||||
|
||||
// Add clean db
|
||||
// Test for failures when they are expected, when we go from smaller block to larger block
|
||||
// We should no longer see the smaller block in DB
|
||||
func TestKnownGaps(t *testing.T) {
|
||||
|
||||
tests := []gapValues{
|
||||
// Known Gaps
|
||||
{knownErrorBlocksStart: 115, knownErrorBlocksEnd: 120, expectedDif: 1, processingKey: 1},
|
||||
/// Same tests as above with a new expected DIF
|
||||
{knownErrorBlocksStart: 1150, knownErrorBlocksEnd: 1200, expectedDif: 2, processingKey: 2},
|
||||
// Test update when block number is larger!!
|
||||
{knownErrorBlocksStart: 1150, knownErrorBlocksEnd: 1204, expectedDif: 2, processingKey: 2},
|
||||
// Update when processing key is different!
|
||||
{knownErrorBlocksStart: 1150, knownErrorBlocksEnd: 1204, expectedDif: 2, processingKey: 10},
|
||||
}
|
||||
|
||||
testWriteToDb(t, tests, true)
|
||||
testWriteToFile(t, tests, true)
|
||||
//testFindAndUpdateGaps(t, true)
|
||||
}
|
||||
|
||||
// test writing blocks to the DB
|
||||
func testWriteToDb(t *testing.T, tests []gapValues, wipeDbBeforeStart bool) {
|
||||
t.Log("Starting Write to DB test")
|
||||
db := setupDb(t)
|
||||
|
||||
// Clear Table first, this is needed because we updated an entry to have a larger endblock number
|
||||
// so we can't find the original start and endblock pair.
|
||||
if wipeDbBeforeStart {
|
||||
t.Log("Cleaning up eth.known_gaps table")
|
||||
db.Exec(context.Background(), "DELETE FROM eth.known_gaps")
|
||||
}
|
||||
|
||||
for _, tc := range tests {
|
||||
// Create an array with knownGaps based on user inputs
|
||||
knownGaps := KnownGapsState{
|
||||
processingKey: tc.processingKey,
|
||||
expectedDifference: big.NewInt(tc.expectedDif),
|
||||
db: db,
|
||||
}
|
||||
service := &Service{
|
||||
KnownGaps: knownGaps,
|
||||
}
|
||||
knownErrorBlocks := (make([]*big.Int, 0))
|
||||
knownErrorBlocks = createKnownErrorBlocks(knownErrorBlocks, tc.knownErrorBlocksStart, tc.knownErrorBlocksEnd)
|
||||
service.KnownGaps.knownErrorBlocks = knownErrorBlocks
|
||||
// Upsert
|
||||
testCaptureErrorBlocks(t, service)
|
||||
// Validate that the upsert was done correctly.
|
||||
validateUpsert(t, service, tc.knownErrorBlocksStart, tc.knownErrorBlocksEnd)
|
||||
}
|
||||
tearDown(t, db)
|
||||
|
||||
}
|
||||
|
||||
// test writing blocks to file and then inserting them to DB
|
||||
func testWriteToFile(t *testing.T, tests []gapValues, wipeDbBeforeStart bool) {
|
||||
t.Log("Starting write to file test")
|
||||
db := setupDb(t)
|
||||
// Clear Table first, this is needed because we updated an entry to have a larger endblock number
|
||||
// so we can't find the original start and endblock pair.
|
||||
if wipeDbBeforeStart {
|
||||
t.Log("Cleaning up eth.known_gaps table")
|
||||
db.Exec(context.Background(), "DELETE FROM eth.known_gaps")
|
||||
}
|
||||
if _, err := os.Stat(knownGapsFilePath); err == nil {
|
||||
err := os.Remove(knownGapsFilePath)
|
||||
if err != nil {
|
||||
t.Fatal("Can't delete local file")
|
||||
}
|
||||
}
|
||||
tearDown(t, db)
|
||||
for _, tc := range tests {
|
||||
knownGaps := KnownGapsState{
|
||||
processingKey: tc.processingKey,
|
||||
expectedDifference: big.NewInt(tc.expectedDif),
|
||||
writeFilePath: knownGapsFilePath,
|
||||
db: nil, // Set to nil explicitly to make clear that the DB can't be used here
|
||||
}
|
||||
service := &Service{
|
||||
KnownGaps: knownGaps,
|
||||
}
|
||||
knownErrorBlocks := (make([]*big.Int, 0))
|
||||
knownErrorBlocks = createKnownErrorBlocks(knownErrorBlocks, tc.knownErrorBlocksStart, tc.knownErrorBlocksEnd)
|
||||
service.KnownGaps.knownErrorBlocks = knownErrorBlocks
|
||||
|
||||
testCaptureErrorBlocks(t, service)
|
||||
|
||||
file, ioErr := ioutil.ReadFile(knownGapsFilePath)
|
||||
require.NoError(t, ioErr)
|
||||
|
||||
requests := strings.Split(string(file), ";")
|
||||
|
||||
newDb := setupDb(t)
|
||||
service.KnownGaps.db = newDb
|
||||
for _, request := range requests {
|
||||
_, err := newDb.Exec(context.Background(), request)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
// Validate that the upsert was done correctly.
|
||||
validateUpsert(t, service, tc.knownErrorBlocksStart, tc.knownErrorBlocksEnd)
|
||||
tearDown(t, newDb)
|
||||
}
|
||||
}
|
||||
|
||||
// Find a gap, if no gaps exist, it will create an arbitrary one
|
||||
func testFindAndUpdateGaps(t *testing.T, wipeDbBeforeStart bool) {
|
||||
db := setupDb(t)
|
||||
|
||||
if wipeDbBeforeStart {
|
||||
db.Exec(context.Background(), "DELETE FROM eth.known_gaps")
|
||||
}
|
||||
knownGaps := KnownGapsState{
|
||||
processingKey: 1,
|
||||
expectedDifference: big.NewInt(1),
|
||||
db: db,
|
||||
}
|
||||
service := &Service{
|
||||
KnownGaps: knownGaps,
|
||||
}
|
||||
|
||||
latestBlockInDb, err := service.KnownGaps.QueryDbToBigInt("SELECT MAX(block_number) FROM eth.header_cids")
|
||||
if err != nil {
|
||||
t.Skip("Can't find a block in the eth.header_cids table.. Please put one there")
|
||||
}
|
||||
|
||||
// Add the gapDifference for testing purposes
|
||||
gapDifference := big.NewInt(10) // Set a difference between latestBlock in DB and on Chain
|
||||
expectedDifference := big.NewInt(1) // Set what the expected difference between latestBlock in DB and on Chain should be
|
||||
|
||||
latestBlockOnChain := big.NewInt(0)
|
||||
latestBlockOnChain.Add(latestBlockInDb, gapDifference)
|
||||
|
||||
t.Log("The latest block on the chain is: ", latestBlockOnChain)
|
||||
t.Log("The latest block on the DB is: ", latestBlockInDb)
|
||||
|
||||
gapUpsertErr := service.KnownGaps.FindAndUpdateGaps(latestBlockOnChain, expectedDifference, 0)
|
||||
require.NoError(t, gapUpsertErr)
|
||||
|
||||
startBlock := big.NewInt(0)
|
||||
endBlock := big.NewInt(0)
|
||||
|
||||
startBlock.Add(latestBlockInDb, gapDifference)
|
||||
endBlock.Sub(latestBlockOnChain, gapDifference)
|
||||
validateUpsert(t, service, startBlock.Int64(), endBlock.Int64())
|
||||
|
||||
}
|
||||
|
||||
// test capturing missed blocks
|
||||
func testCaptureErrorBlocks(t *testing.T, service *Service) {
|
||||
service.KnownGaps.captureErrorBlocks(service.KnownGaps.knownErrorBlocks)
|
||||
}
|
||||
|
||||
// Helper function to create an array of gaps given a start and end block
|
||||
func createKnownErrorBlocks(knownErrorBlocks []*big.Int, knownErrorBlocksStart int64, knownErrorBlocksEnd int64) []*big.Int {
|
||||
for i := knownErrorBlocksStart; i <= knownErrorBlocksEnd; i++ {
|
||||
knownErrorBlocks = append(knownErrorBlocks, big.NewInt(i))
|
||||
}
|
||||
return knownErrorBlocks
|
||||
}
|
||||
|
||||
// Make sure the upsert was performed correctly
|
||||
func validateUpsert(t *testing.T, service *Service, startingBlock int64, endingBlock int64) {
|
||||
t.Logf("Starting to query blocks: %d - %d", startingBlock, endingBlock)
|
||||
queryString := fmt.Sprintf("SELECT starting_block_number from eth.known_gaps WHERE starting_block_number = %d AND ending_block_number = %d", startingBlock, endingBlock)
|
||||
|
||||
_, queryErr := service.KnownGaps.QueryDb(queryString) // Figure out the string.
|
||||
t.Logf("Updated Known Gaps table starting from, %d, and ending at, %d", startingBlock, endingBlock)
|
||||
require.NoError(t, queryErr)
|
||||
}
|
||||
|
||||
// Create a DB object to use
|
||||
func setupDb(t *testing.T) sql.Database {
|
||||
db, err := postgres.SetupSQLXDB()
|
||||
if err != nil {
|
||||
t.Error("Can't create a DB connection....")
|
||||
t.Fatal(err)
|
||||
}
|
||||
// stateDiff, err := sql.NewStateDiffIndexer(context.Background(), chainConf, db)
|
||||
return db
|
||||
}
|
||||
|
||||
// Teardown the DB
|
||||
func tearDown(t *testing.T, db sql.Database) {
|
||||
t.Log("Starting tearDown")
|
||||
db.Close()
|
||||
}
|
@ -18,7 +18,6 @@ package statediff
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"math/big"
|
||||
"strconv"
|
||||
"strings"
|
||||
@ -42,6 +41,7 @@ import (
|
||||
"github.com/ethereum/go-ethereum/rlp"
|
||||
"github.com/ethereum/go-ethereum/rpc"
|
||||
ind "github.com/ethereum/go-ethereum/statediff/indexer"
|
||||
"github.com/ethereum/go-ethereum/statediff/indexer/database/sql"
|
||||
"github.com/ethereum/go-ethereum/statediff/indexer/interfaces"
|
||||
nodeinfo "github.com/ethereum/go-ethereum/statediff/indexer/node"
|
||||
"github.com/ethereum/go-ethereum/statediff/indexer/shared"
|
||||
@ -140,29 +140,6 @@ type Service struct {
|
||||
}
|
||||
|
||||
// This structure keeps track of the knownGaps at any given moment in time
|
||||
type KnownGapsState struct {
|
||||
// Should we check for gaps by looking at the DB and comparing the latest block with head
|
||||
checkForGaps bool
|
||||
// Arbitrary processingKey that can be used down the line to differentiate different geth nodes.
|
||||
processingKey int64
|
||||
// This number indicates the expected difference between blocks.
|
||||
// Currently, this is 1 since the geth node processes each block. But down the road this can be used in
|
||||
// Tandom with the processingKey to differentiate block processing logic.
|
||||
expectedDifference *big.Int
|
||||
// Indicates if Geth is in an error state
|
||||
// This is used to indicate the right time to upserts
|
||||
errorState bool
|
||||
// This array keeps track of errorBlocks as they occur.
|
||||
// When the errorState is false again, we can process these blocks.
|
||||
// Do we need a list, can we have /KnownStartErrorBlock and knownEndErrorBlock ints instead?
|
||||
knownErrorBlocks []*big.Int
|
||||
// The last processed block keeps track of the last processed block.
|
||||
// Its used to make sure we didn't skip over any block!
|
||||
lastProcessedBlock *big.Int
|
||||
// This fileIndexer is used to write the knownGaps to file
|
||||
// If we can't properly write to DB
|
||||
fileIndexer interfaces.StateDiffIndexer
|
||||
}
|
||||
|
||||
// BlockCache caches the last block for safe access from different service loops
|
||||
type BlockCache struct {
|
||||
@ -183,7 +160,7 @@ func NewBlockCache(max uint) BlockCache {
|
||||
func New(stack *node.Node, ethServ *eth.Ethereum, cfg *ethconfig.Config, params Config, backend ethapi.Backend) error {
|
||||
blockChain := ethServ.BlockChain()
|
||||
var indexer interfaces.StateDiffIndexer
|
||||
var fileIndexer interfaces.StateDiffIndexer
|
||||
var db sql.Database
|
||||
quitCh := make(chan bool)
|
||||
if params.IndexerConfig != nil {
|
||||
info := nodeinfo.Info{
|
||||
@ -194,31 +171,13 @@ func New(stack *node.Node, ethServ *eth.Ethereum, cfg *ethconfig.Config, params
|
||||
ClientName: params.ClientName,
|
||||
}
|
||||
var err error
|
||||
indexer, err = ind.NewStateDiffIndexer(params.Context, blockChain.Config(), info, params.IndexerConfig, "")
|
||||
db, indexer, err = ind.NewStateDiffIndexer(params.Context, blockChain.Config(), info, params.IndexerConfig)
|
||||
if err != nil {
|
||||
log.Error("Error creating indexer", "indexer: ", params.IndexerConfig.Type(), "error: ", err)
|
||||
return err
|
||||
}
|
||||
if params.FileConfig != nil {
|
||||
fileIndexer, err = ind.NewStateDiffIndexer(params.Context, blockChain.Config(), info, params.FileConfig, shared.FILE)
|
||||
if err != nil {
|
||||
log.Error("Error creating file indexer", "error: ", err)
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
fileIndexer = indexer
|
||||
}
|
||||
//fileIndexer, fileErr = file.NewStateDiffIndexer(params.Context, blockChain.Config(), info)
|
||||
indexer.ReportDBMetrics(10*time.Second, quitCh)
|
||||
}
|
||||
|
||||
var checkForGaps bool
|
||||
if params.IndexerConfig.Type() == shared.POSTGRES {
|
||||
checkForGaps = true
|
||||
} else {
|
||||
log.Info("We are not going to check for gaps on start up since we are not connected to Postgres!")
|
||||
checkForGaps = false
|
||||
}
|
||||
workers := params.NumWorkers
|
||||
if workers == 0 {
|
||||
workers = 1
|
||||
@ -226,11 +185,17 @@ func New(stack *node.Node, ethServ *eth.Ethereum, cfg *ethconfig.Config, params
|
||||
// If we ever have multiple processingKeys we can update them here
|
||||
// along with the expectedDifference
|
||||
knownGaps := &KnownGapsState{
|
||||
checkForGaps: checkForGaps,
|
||||
processingKey: 0,
|
||||
expectedDifference: big.NewInt(1),
|
||||
errorState: false,
|
||||
fileIndexer: fileIndexer,
|
||||
writeFilePath: params.KnownGapsFilePath,
|
||||
db: db,
|
||||
}
|
||||
if params.IndexerConfig.Type() == shared.POSTGRES {
|
||||
knownGaps.checkForGaps = true
|
||||
} else {
|
||||
log.Info("We are not going to check for gaps on start up since we are not connected to Postgres!")
|
||||
knownGaps.checkForGaps = false
|
||||
}
|
||||
sds := &Service{
|
||||
Mutex: sync.Mutex{},
|
||||
@ -348,75 +313,6 @@ func (sds *Service) writeGenesisStateDiff(currBlock *types.Block, workerId uint)
|
||||
statediffMetrics.lastStatediffHeight.Update(genesisBlockNumber)
|
||||
}
|
||||
|
||||
// This function will capture any missed blocks that were not captured in sds.KnownGaps.knownErrorBlocks.
|
||||
// It is invoked when the sds.KnownGaps.lastProcessed block is not one unit
|
||||
// away from sds.KnownGaps.expectedDifference
|
||||
// Essentially, if geth ever misses blocks but doesn't output an error, we are covered.
|
||||
func (sds *Service) captureMissedBlocks(currentBlock *big.Int, knownErrorBlocks []*big.Int, lastProcessedBlock *big.Int) {
|
||||
// last processed: 110
|
||||
// current block: 125
|
||||
log.Debug("current block", "block number: ", currentBlock)
|
||||
log.Debug("knownErrorBlocks", "knownErrorBlocks: ", knownErrorBlocks)
|
||||
log.Debug("last processed block", "block number: ", lastProcessedBlock)
|
||||
log.Debug("expected difference", "sds.KnownGaps.expectedDifference: ", sds.KnownGaps.expectedDifference)
|
||||
|
||||
if len(knownErrorBlocks) > 0 {
|
||||
// 115
|
||||
startErrorBlock := new(big.Int).Set(knownErrorBlocks[0])
|
||||
// 120
|
||||
endErrorBlock := new(big.Int).Set(knownErrorBlocks[len(knownErrorBlocks)-1])
|
||||
|
||||
// 111
|
||||
expectedStartErrorBlock := big.NewInt(0).Add(lastProcessedBlock, sds.KnownGaps.expectedDifference)
|
||||
// 124
|
||||
expectedEndErrorBlock := big.NewInt(0).Sub(currentBlock, sds.KnownGaps.expectedDifference)
|
||||
|
||||
if (expectedStartErrorBlock.Cmp(startErrorBlock) != 0) &&
|
||||
(expectedEndErrorBlock.Cmp(endErrorBlock) != 0) {
|
||||
log.Info("All Gaps already captured in knownErrorBlocks")
|
||||
}
|
||||
|
||||
if expectedEndErrorBlock.Cmp(endErrorBlock) == 1 {
|
||||
log.Warn("There are gaps in the knownErrorBlocks list", "knownErrorBlocks", knownErrorBlocks)
|
||||
log.Warn("But there are gaps that were also not added there.")
|
||||
log.Warn("Last Block in knownErrorBlocks", "endErrorBlock", endErrorBlock)
|
||||
log.Warn("Last processed Block", "lastProcessedBlock", lastProcessedBlock)
|
||||
log.Warn("Current Block", "currentBlock", currentBlock)
|
||||
//120 + 1 == 121
|
||||
startBlock := big.NewInt(0).Add(endErrorBlock, sds.KnownGaps.expectedDifference)
|
||||
// 121 to 124
|
||||
log.Warn(fmt.Sprintf("Adding the following block range to known_gaps table: %d - %d", startBlock, expectedEndErrorBlock))
|
||||
sds.indexer.PushKnownGaps(startBlock, expectedEndErrorBlock, false, sds.KnownGaps.processingKey, sds.KnownGaps.fileIndexer)
|
||||
}
|
||||
|
||||
if expectedStartErrorBlock.Cmp(startErrorBlock) == -1 {
|
||||
log.Warn("There are gaps in the knownErrorBlocks list", "knownErrorBlocks", knownErrorBlocks)
|
||||
log.Warn("But there are gaps that were also not added there.")
|
||||
log.Warn("First Block in knownErrorBlocks", "startErrorBlock", startErrorBlock)
|
||||
log.Warn("Last processed Block", "lastProcessedBlock", lastProcessedBlock)
|
||||
// 115 - 1 == 114
|
||||
endBlock := big.NewInt(0).Sub(startErrorBlock, sds.KnownGaps.expectedDifference)
|
||||
// 111 to 114
|
||||
log.Warn(fmt.Sprintf("Adding the following block range to known_gaps table: %d - %d", expectedStartErrorBlock, endBlock))
|
||||
sds.indexer.PushKnownGaps(expectedStartErrorBlock, endBlock, false, sds.KnownGaps.processingKey, sds.KnownGaps.fileIndexer)
|
||||
}
|
||||
|
||||
log.Warn("The following Gaps were found", "knownErrorBlocks", knownErrorBlocks)
|
||||
log.Warn(fmt.Sprint("Updating known Gaps table from ", startErrorBlock, " to ", endErrorBlock, " with processing key, ", sds.KnownGaps.processingKey))
|
||||
sds.indexer.PushKnownGaps(startErrorBlock, endErrorBlock, false, sds.KnownGaps.processingKey, sds.KnownGaps.fileIndexer)
|
||||
|
||||
} else {
|
||||
log.Warn("We missed blocks without any errors.")
|
||||
// 110 + 1 == 111
|
||||
startBlock := big.NewInt(0).Add(lastProcessedBlock, sds.KnownGaps.expectedDifference)
|
||||
// 125 - 1 == 124
|
||||
endBlock := big.NewInt(0).Sub(currentBlock, sds.KnownGaps.expectedDifference)
|
||||
log.Warn("Missed blocks starting from", "startBlock", startBlock)
|
||||
log.Warn("Missed blocks ending at", "endBlock", endBlock)
|
||||
sds.indexer.PushKnownGaps(startBlock, endBlock, false, sds.KnownGaps.processingKey, sds.KnownGaps.fileIndexer)
|
||||
}
|
||||
}
|
||||
|
||||
func (sds *Service) writeLoopWorker(params workerParams) {
|
||||
defer params.wg.Done()
|
||||
for {
|
||||
@ -440,7 +336,7 @@ func (sds *Service) writeLoopWorker(params workerParams) {
|
||||
// Check and update the gaps table.
|
||||
if sds.KnownGaps.checkForGaps && !sds.KnownGaps.errorState {
|
||||
log.Info("Checking for Gaps at", "current block", currentBlock.Number())
|
||||
go sds.indexer.FindAndUpdateGaps(currentBlock.Number(), sds.KnownGaps.expectedDifference, sds.KnownGaps.processingKey, sds.KnownGaps.fileIndexer)
|
||||
go sds.KnownGaps.FindAndUpdateGaps(currentBlock.Number(), sds.KnownGaps.expectedDifference, sds.KnownGaps.processingKey)
|
||||
sds.KnownGaps.checkForGaps = false
|
||||
}
|
||||
|
||||
@ -449,34 +345,22 @@ func (sds *Service) writeLoopWorker(params workerParams) {
|
||||
if err != nil {
|
||||
log.Error("statediff.Service.WriteLoop: processing error", "block height", currentBlock.Number().Uint64(), "error", err.Error(), "worker", params.id)
|
||||
sds.KnownGaps.errorState = true
|
||||
log.Warn("Updating the following block to knownErrorBlocks to be inserted into knownGaps table", "blockNumber", currentBlock.Number())
|
||||
sds.KnownGaps.knownErrorBlocks = append(sds.KnownGaps.knownErrorBlocks, currentBlock.Number())
|
||||
log.Warn("Updating the following block to knownErrorBlocks to be inserted into knownGaps table", "block number", currentBlock.Number())
|
||||
// Write object to startdiff
|
||||
continue
|
||||
}
|
||||
sds.KnownGaps.errorState = false
|
||||
// Understand what the last block that should have been processed is
|
||||
previousExpectedBlock := big.NewInt(0).Sub(currentBlock.Number(), sds.KnownGaps.expectedDifference)
|
||||
// If we last block which should have been processed is not
|
||||
// the actual lastProcessedBlock, add it to known gaps table.
|
||||
if sds.KnownGaps.lastProcessedBlock != nil {
|
||||
// Can't combine the two if statements because you can't compare a nil object...
|
||||
if previousExpectedBlock.Cmp(sds.KnownGaps.lastProcessedBlock) != 0 {
|
||||
// We must pass in parameters by VALUE not reference.
|
||||
// If we pass them in my reference, the references can change before the computation is complete!
|
||||
staticKnownErrorBlocks := make([]*big.Int, len(sds.KnownGaps.knownErrorBlocks))
|
||||
copy(staticKnownErrorBlocks, sds.KnownGaps.knownErrorBlocks)
|
||||
staticLastProcessedBlock := new(big.Int).Set(sds.KnownGaps.lastProcessedBlock)
|
||||
if sds.KnownGaps.knownErrorBlocks != nil {
|
||||
// We must pass in parameters by VALUE, not reference.
// If we pass them in by reference, the references can change before the computation is complete!
|
||||
staticKnownErrorBlocks := make([]*big.Int, len(sds.KnownGaps.knownErrorBlocks))
|
||||
copy(staticKnownErrorBlocks, sds.KnownGaps.knownErrorBlocks)
|
||||
sds.KnownGaps.knownErrorBlocks = nil
|
||||
|
||||
log.Debug("Starting capturedMissedBlocks")
|
||||
log.Debug("previousExpectedBlock is", "previous expected block: ", previousExpectedBlock)
|
||||
log.Debug("sds.KnownGaps.lastProcessedBlock is", "sds.KnownGaps.lastProcessedBlock: ", sds.KnownGaps.lastProcessedBlock)
|
||||
|
||||
go sds.captureMissedBlocks(currentBlock.Number(), staticKnownErrorBlocks, staticLastProcessedBlock)
|
||||
sds.KnownGaps.knownErrorBlocks = nil
|
||||
}
|
||||
log.Debug("Starting capturedMissedBlocks")
|
||||
go sds.KnownGaps.captureErrorBlocks(staticKnownErrorBlocks)
|
||||
}
|
||||
sds.KnownGaps.lastProcessedBlock = currentBlock.Number()
|
||||
|
||||
// TODO: how to handle with concurrent workers
|
||||
statediffMetrics.lastStatediffHeight.Update(int64(currentBlock.Number().Uint64()))
|
||||
|
@ -1,242 +0,0 @@
package statediff

import (
    "context"
    "errors"
    "fmt"
    "io/ioutil"
    "math/big"
    "os"
    "strings"
    "testing"

    "github.com/ethereum/go-ethereum/params"
    "github.com/ethereum/go-ethereum/statediff/indexer/database/file"
    "github.com/ethereum/go-ethereum/statediff/indexer/database/sql"
    "github.com/ethereum/go-ethereum/statediff/indexer/database/sql/postgres"
    "github.com/ethereum/go-ethereum/statediff/indexer/interfaces"
    "github.com/ethereum/go-ethereum/statediff/indexer/mocks"
    "github.com/stretchr/testify/require"
)

var (
    db        sql.Database
    err       error
    indexer   interfaces.StateDiffIndexer
    chainConf = params.MainnetChainConfig
)

type gapValues struct {
    lastProcessedBlock    int64
    currentBlock          int64
    knownErrorBlocksStart int64
    knownErrorBlocksEnd   int64
    expectedDif           int64
    processingKey         int64
}

// Add clean db
// Test for failures when they are expected, when we go from smaller block to larger block
// We should no longer see the smaller block in DB
func TestKnownGaps(t *testing.T) {

    tests := []gapValues{
        // Unprocessed gaps before and after knownErrorBlock
        {lastProcessedBlock: 110, knownErrorBlocksStart: 115, knownErrorBlocksEnd: 120, currentBlock: 125, expectedDif: 1, processingKey: 1},
        // No knownErrorBlocks
        {lastProcessedBlock: 130, knownErrorBlocksStart: 0, knownErrorBlocksEnd: 0, currentBlock: 140, expectedDif: 1, processingKey: 1},
        // No gaps before or after knownErrorBlocks
        {lastProcessedBlock: 150, knownErrorBlocksStart: 151, knownErrorBlocksEnd: 159, currentBlock: 160, expectedDif: 1, processingKey: 1},
        // gaps before knownErrorBlocks but not after
        {lastProcessedBlock: 170, knownErrorBlocksStart: 180, knownErrorBlocksEnd: 189, currentBlock: 190, expectedDif: 1, processingKey: 1},
        // gaps after knownErrorBlocks but not before
        {lastProcessedBlock: 200, knownErrorBlocksStart: 201, knownErrorBlocksEnd: 205, currentBlock: 210, expectedDif: 1, processingKey: 1},
        /// Same tests as above with a new expected DIF
        // Unprocessed gaps before and after knownErrorBlock
        {lastProcessedBlock: 1100, knownErrorBlocksStart: 1150, knownErrorBlocksEnd: 1200, currentBlock: 1250, expectedDif: 2, processingKey: 2},
        // No knownErrorBlocks
        {lastProcessedBlock: 1300, knownErrorBlocksStart: 0, knownErrorBlocksEnd: 0, currentBlock: 1400, expectedDif: 2, processingKey: 2},
        // No gaps before or after knownErrorBlocks
        {lastProcessedBlock: 1500, knownErrorBlocksStart: 1502, knownErrorBlocksEnd: 1598, currentBlock: 1600, expectedDif: 2, processingKey: 2},
        // gaps before knownErrorBlocks but not after
        {lastProcessedBlock: 1700, knownErrorBlocksStart: 1800, knownErrorBlocksEnd: 1898, currentBlock: 1900, expectedDif: 2, processingKey: 2},
        // gaps after knownErrorBlocks but not before
        {lastProcessedBlock: 2000, knownErrorBlocksStart: 2002, knownErrorBlocksEnd: 2050, currentBlock: 2100, expectedDif: 2, processingKey: 2},
        // Test update when block number is larger!!
        {lastProcessedBlock: 2000, knownErrorBlocksStart: 2002, knownErrorBlocksEnd: 2052, currentBlock: 2100, expectedDif: 2, processingKey: 2},
        // Update when processing key is different!
        {lastProcessedBlock: 2000, knownErrorBlocksStart: 2002, knownErrorBlocksEnd: 2052, currentBlock: 2100, expectedDif: 2, processingKey: 10},
    }

    testWriteToDb(t, tests, true)
    testWriteToFile(t, tests, true)
}
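For readers tracing the table above, here is a small, self-contained sketch of the arithmetic that callValidateUpsert later checks. It is not code from the deleted test file; gapRange and expectedGaps are hypothetical helpers. Given lastProcessedBlock, currentBlock, the knownErrorBlocks range, and the expected difference, the known_gaps table should end up with up to three ranges: the stretch before the known error blocks, the known error blocks themselves, and the stretch after them.

package main

import "fmt"

type gapRange struct{ start, end int64 }

// expectedGaps returns the gap ranges a test case should produce, mirroring the
// branches exercised by callValidateUpsert below.
func expectedGaps(lastProcessed, current, errStart, errEnd, dif int64) []gapRange {
    var gaps []gapRange
    if errStart == 0 && errEnd == 0 {
        // No known error blocks: the whole span between the neighbours is one gap.
        return append(gaps, gapRange{lastProcessed + dif, current - dif})
    }
    if lastProcessed+dif != errStart {
        gaps = append(gaps, gapRange{lastProcessed + dif, errStart - dif})
    }
    gaps = append(gaps, gapRange{errStart, errEnd})
    if errEnd+dif != current {
        gaps = append(gaps, gapRange{errEnd + dif, current - dif})
    }
    return gaps
}

func main() {
    // First table entry: gaps both before and after the known error blocks.
    fmt.Println(expectedGaps(110, 125, 115, 120, 1)) // [{111 114} {115 120} {121 124}]
}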

// test writing blocks to the DB
func testWriteToDb(t *testing.T, tests []gapValues, wipeDbBeforeStart bool) {
    stateDiff, db, err := setupDb(t)
    require.NoError(t, err)

    // Clear the table first; this is needed because we updated an entry to have a larger end block number,
    // so we can't find the original start and end block pair.
    if wipeDbBeforeStart {
        db.Exec(context.Background(), "DELETE FROM eth.known_gaps")
    }

    for _, tc := range tests {
        // Create an array with knownGaps based on user inputs
        checkGaps := tc.knownErrorBlocksStart != 0 && tc.knownErrorBlocksEnd != 0
        knownErrorBlocks := (make([]*big.Int, 0))
        if checkGaps {
            knownErrorBlocks = createKnownErrorBlocks(knownErrorBlocks, tc.knownErrorBlocksStart, tc.knownErrorBlocksEnd)
        }
        // Upsert
        testCaptureMissedBlocks(t, tc.lastProcessedBlock, tc.currentBlock, tc.knownErrorBlocksStart, tc.knownErrorBlocksEnd,
            tc.expectedDif, tc.processingKey, stateDiff, knownErrorBlocks, nil)
        // Validate that the upsert was done correctly.
        callValidateUpsert(t, checkGaps, stateDiff, tc.lastProcessedBlock, tc.currentBlock, tc.expectedDif, tc.knownErrorBlocksStart, tc.knownErrorBlocksEnd)
    }
    tearDown(t, stateDiff)

}

// test writing blocks to file and then inserting them to DB
func testWriteToFile(t *testing.T, tests []gapValues, wipeDbBeforeStart bool) {
    stateDiff, db, err := setupDb(t)
    require.NoError(t, err)

    // Clear the table first; this is needed because we updated an entry to have a larger end block number,
    // so we can't find the original start and end block pair.
    if wipeDbBeforeStart {
        db.Exec(context.Background(), "DELETE FROM eth.known_gaps")
    }

    tearDown(t, stateDiff)
    for _, tc := range tests {
        // Reuse processing key from expectedDif
        checkGaps := tc.knownErrorBlocksStart != 0 && tc.knownErrorBlocksEnd != 0
        knownErrorBlocks := (make([]*big.Int, 0))
        if checkGaps {
            knownErrorBlocks = createKnownErrorBlocks(knownErrorBlocks, tc.knownErrorBlocksStart, tc.knownErrorBlocksEnd)
        }

        fileInd := setupFile(t)
        testCaptureMissedBlocks(t, tc.lastProcessedBlock, tc.currentBlock, tc.knownErrorBlocksStart, tc.knownErrorBlocksEnd,
            tc.expectedDif, tc.processingKey, stateDiff, knownErrorBlocks, fileInd)
        fileInd.Close()

        newStateDiff, db, _ := setupDb(t)

        file, ioErr := ioutil.ReadFile(file.TestConfig.FilePath)
        require.NoError(t, ioErr)

        requests := strings.Split(string(file), ";")

        // Skip the first two entries
        for _, request := range requests[2:] {
            _, err := db.Exec(context.Background(), request)
            require.NoError(t, err)
        }
        callValidateUpsert(t, checkGaps, newStateDiff, tc.lastProcessedBlock, tc.currentBlock, tc.expectedDif, tc.knownErrorBlocksStart, tc.knownErrorBlocksEnd)
    }
}
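The file-mode path exercised above writes the upsert statements to a .sql file and replays them against Postgres once a connection is available. As a rough sketch of doing that replay outside the test suite, the snippet below follows the same split-on-";" approach; the connection string, file path, and lib/pq driver choice are placeholders, and unlike the test it simply skips empty fragments rather than the first two statements.

package main

import (
    "context"
    "database/sql"
    "log"
    "os"
    "strings"

    _ "github.com/lib/pq" // Postgres driver; any database/sql driver would do.
)

func main() {
    raw, err := os.ReadFile("./known_gaps.sql")
    if err != nil {
        log.Fatal(err)
    }

    db, err := sql.Open("postgres", "postgres://user:pass@localhost:5432/vulcanize?sslmode=disable")
    if err != nil {
        log.Fatal(err)
    }
    defer db.Close()

    // Replay each statement in order; the file is a sequence of ";"-terminated upserts.
    for _, stmt := range strings.Split(string(raw), ";") {
        stmt = strings.TrimSpace(stmt)
        if stmt == "" {
            continue // trailing fragment after the final ";"
        }
        if _, err := db.ExecContext(context.Background(), stmt); err != nil {
            log.Fatalf("replaying statement %q: %v", stmt, err)
        }
    }
}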

// test capturing missed blocks
func testCaptureMissedBlocks(t *testing.T, lastBlockProcessed int64, currentBlockNum int64, knownErrorBlocksStart int64, knownErrorBlocksEnd int64, expectedDif int64, processingKey int64,
    stateDiff *sql.StateDiffIndexer, knownErrorBlocks []*big.Int, fileInd interfaces.StateDiffIndexer) {

    lastProcessedBlock := big.NewInt(lastBlockProcessed)
    currentBlock := big.NewInt(currentBlockNum)

    knownGaps := KnownGapsState{
        processingKey:      processingKey,
        expectedDifference: big.NewInt(expectedDif),
        fileIndexer:        fileInd,
    }
    service := &Service{
        KnownGaps: knownGaps,
        indexer:   stateDiff,
    }
    service.captureMissedBlocks(currentBlock, knownErrorBlocks, lastProcessedBlock)
}

// Helper function to create an array of gaps given a start and end block
func createKnownErrorBlocks(knownErrorBlocks []*big.Int, knownErrorBlocksStart int64, knownErrorBlocksEnd int64) []*big.Int {
    for i := knownErrorBlocksStart; i <= knownErrorBlocksEnd; i++ {
        knownErrorBlocks = append(knownErrorBlocks, big.NewInt(i))
    }
    return knownErrorBlocks
}

// This function will call the validateUpsert function based on various conditions.
func callValidateUpsert(t *testing.T, checkGaps bool, stateDiff *sql.StateDiffIndexer,
    lastBlockProcessed int64, currentBlockNum int64, expectedDif int64, knownErrorBlocksStart int64, knownErrorBlocksEnd int64) {
    // If there are gaps in the knownErrorBlocks array
    if checkGaps {

        // If there are no unexpected gaps before or after the entries in the knownErrorBlocks array,
        // only handle the knownErrorBlocks array
        if lastBlockProcessed+expectedDif == knownErrorBlocksStart && knownErrorBlocksEnd+expectedDif == currentBlockNum {
            validateUpsert(t, stateDiff, knownErrorBlocksStart, knownErrorBlocksEnd)

            // If there are gaps after the knownErrorBlocks array, process them
        } else if lastBlockProcessed+expectedDif == knownErrorBlocksStart {
            validateUpsert(t, stateDiff, knownErrorBlocksStart, knownErrorBlocksEnd)
            validateUpsert(t, stateDiff, knownErrorBlocksEnd+expectedDif, currentBlockNum-expectedDif)

            // If there are gaps before the knownErrorBlocks array, process them
        } else if knownErrorBlocksEnd+expectedDif == currentBlockNum {
            validateUpsert(t, stateDiff, lastBlockProcessed+expectedDif, knownErrorBlocksStart-expectedDif)
            validateUpsert(t, stateDiff, knownErrorBlocksStart, knownErrorBlocksEnd)

            // If there are gaps before, after, and within the knownErrorBlocks array, handle all the errors.
        } else {
            validateUpsert(t, stateDiff, lastBlockProcessed+expectedDif, knownErrorBlocksStart-expectedDif)
            validateUpsert(t, stateDiff, knownErrorBlocksStart, knownErrorBlocksEnd)
            validateUpsert(t, stateDiff, knownErrorBlocksEnd+expectedDif, currentBlockNum-expectedDif)

        }
    } else {
        validateUpsert(t, stateDiff, lastBlockProcessed+expectedDif, currentBlockNum-expectedDif)
    }

}

// Make sure the upsert was performed correctly
func validateUpsert(t *testing.T, stateDiff *sql.StateDiffIndexer, startingBlock int64, endingBlock int64) {
    t.Logf("Starting to query blocks: %d - %d", startingBlock, endingBlock)
    queryString := fmt.Sprintf("SELECT starting_block_number from eth.known_gaps WHERE starting_block_number = %d AND ending_block_number = %d", startingBlock, endingBlock)

    _, queryErr := stateDiff.QueryDb(queryString) // Figure out the string.
    t.Logf("Updated Known Gaps table starting from, %d, and ending at, %d", startingBlock, endingBlock)
    require.NoError(t, queryErr)
}
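One caveat with validateUpsert above: depending on how QueryDb behaves, a SELECT that matches zero rows may not return an error, so require.NoError alone may not prove the gap range was actually written. A stricter check, sketched here with the standard database/sql package (the *sql.DB handle and the helper name are assumptions, not code from this commit), would scan the result and treat sql.ErrNoRows as "gap range missing".

package gapcheck

import (
    "database/sql"
    "errors"
)

// gapRowExists reports whether a known_gaps row with exactly this start/end pair exists.
// Hypothetical helper; not part of the statediff test suite.
func gapRowExists(db *sql.DB, startingBlock, endingBlock int64) (bool, error) {
    var start int64
    err := db.QueryRow(
        "SELECT starting_block_number FROM eth.known_gaps WHERE starting_block_number = $1 AND ending_block_number = $2",
        startingBlock, endingBlock,
    ).Scan(&start)
    if errors.Is(err, sql.ErrNoRows) {
        return false, nil // query ran fine, but no matching gap range was written
    }
    if err != nil {
        return false, err
    }
    return true, nil
}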

// Create a DB object to use
func setupDb(t *testing.T) (*sql.StateDiffIndexer, sql.Database, error) {
    db, err = postgres.SetupSQLXDB()
    if err != nil {
        t.Fatal(err)
    }
    stateDiff, err := sql.NewStateDiffIndexer(context.Background(), chainConf, db)
    return stateDiff, db, err
}

// Create a file statediff indexer.
func setupFile(t *testing.T) interfaces.StateDiffIndexer {
    if _, err := os.Stat(file.TestConfig.FilePath); !errors.Is(err, os.ErrNotExist) {
        err := os.Remove(file.TestConfig.FilePath)
        require.NoError(t, err)
    }
    indexer, err = file.NewStateDiffIndexer(context.Background(), mocks.TestConfig, file.TestConfig)
    require.NoError(t, err)
    return indexer
}

// Teardown the DB
func tearDown(t *testing.T, stateDiff *sql.StateDiffIndexer) {
    t.Log("Starting tearDown")
    sql.TearDownDB(t, db)
    err := stateDiff.Close()
    require.NoError(t, err)
}