% Checkpoint - Added the feature to write to File if writing to DB errors out. NOT TESTED
This commit is contained in:
parent
f6ff20eb0e
commit
f566aa780c
@ -31,11 +31,18 @@ import (
|
|||||||
"github.com/ethereum/go-ethereum/statediff/indexer/shared"
|
"github.com/ethereum/go-ethereum/statediff/indexer/shared"
|
||||||
)
|
)
|
||||||
|
|
||||||
// NewStateDiffIndexer creates and returns an implementation of the StateDiffIndexer interface
|
// NewStateDiffIndexer creates and returns an implementation of the StateDiffIndexer interface.
|
||||||
func NewStateDiffIndexer(ctx context.Context, chainConfig *params.ChainConfig, nodeInfo node.Info, config interfaces.Config) (interfaces.StateDiffIndexer, error) {
|
// You can specify the specific
|
||||||
switch config.Type() {
|
func NewStateDiffIndexer(ctx context.Context, chainConfig *params.ChainConfig, nodeInfo node.Info, config interfaces.Config, specificIndexer shared.DBType) (interfaces.StateDiffIndexer, error) {
|
||||||
|
var indexerToCreate shared.DBType
|
||||||
|
if specificIndexer != "" {
|
||||||
|
indexerToCreate = config.Type()
|
||||||
|
} else {
|
||||||
|
indexerToCreate = specificIndexer
|
||||||
|
}
|
||||||
|
switch indexerToCreate {
|
||||||
case shared.FILE:
|
case shared.FILE:
|
||||||
log.Info("Starting statediff service in SQL file writing mode")
|
log.Info("Creating a statediff indexer in SQL file writing mode")
|
||||||
fc, ok := config.(file.Config)
|
fc, ok := config.(file.Config)
|
||||||
if !ok {
|
if !ok {
|
||||||
return nil, fmt.Errorf("file config is not the correct type: got %T, expected %T", config, file.Config{})
|
return nil, fmt.Errorf("file config is not the correct type: got %T, expected %T", config, file.Config{})
|
||||||
@ -43,7 +50,7 @@ func NewStateDiffIndexer(ctx context.Context, chainConfig *params.ChainConfig, n
|
|||||||
fc.NodeInfo = nodeInfo
|
fc.NodeInfo = nodeInfo
|
||||||
return file.NewStateDiffIndexer(ctx, chainConfig, fc)
|
return file.NewStateDiffIndexer(ctx, chainConfig, fc)
|
||||||
case shared.POSTGRES:
|
case shared.POSTGRES:
|
||||||
log.Info("Starting statediff service in Postgres writing mode")
|
log.Info("Creating a statediff service in Postgres writing mode")
|
||||||
pgc, ok := config.(postgres.Config)
|
pgc, ok := config.(postgres.Config)
|
||||||
if !ok {
|
if !ok {
|
||||||
return nil, fmt.Errorf("postgres config is not the correct type: got %T, expected %T", config, postgres.Config{})
|
return nil, fmt.Errorf("postgres config is not the correct type: got %T, expected %T", config, postgres.Config{})
|
||||||
@ -62,17 +69,17 @@ func NewStateDiffIndexer(ctx context.Context, chainConfig *params.ChainConfig, n
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
default:
|
default:
|
||||||
return nil, fmt.Errorf("unrecongized Postgres driver type: %s", pgc.Driver)
|
return nil, fmt.Errorf("unrecognized Postgres driver type: %s", pgc.Driver)
|
||||||
}
|
}
|
||||||
return sql.NewStateDiffIndexer(ctx, chainConfig, postgres.NewPostgresDB(driver))
|
return sql.NewStateDiffIndexer(ctx, chainConfig, postgres.NewPostgresDB(driver))
|
||||||
case shared.DUMP:
|
case shared.DUMP:
|
||||||
log.Info("Starting statediff service in data dump mode")
|
log.Info("Creating statediff indexer in data dump mode")
|
||||||
dumpc, ok := config.(dump.Config)
|
dumpc, ok := config.(dump.Config)
|
||||||
if !ok {
|
if !ok {
|
||||||
return nil, fmt.Errorf("dump config is not the correct type: got %T, expected %T", config, dump.Config{})
|
return nil, fmt.Errorf("dump config is not the correct type: got %T, expected %T", config, dump.Config{})
|
||||||
}
|
}
|
||||||
return dump.NewStateDiffIndexer(chainConfig, dumpc), nil
|
return dump.NewStateDiffIndexer(chainConfig, dumpc), nil
|
||||||
default:
|
default:
|
||||||
return nil, fmt.Errorf("unrecognized database type: %s", config.Type())
|
return nil, fmt.Errorf("unrecognized database type: %s", indexerToCreate)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -497,9 +497,9 @@ func (sdi *StateDiffIndexer) Close() error {
|
|||||||
return sdi.dump.Close()
|
return sdi.dump.Close()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (sdi *StateDiffIndexer) FindAndUpdateGaps(latestBlockOnChain *big.Int, expectedDifference *big.Int, processingKey int64) error {
|
func (sdi *StateDiffIndexer) FindAndUpdateGaps(latestBlockOnChain *big.Int, expectedDifference *big.Int, processingKey int64, index interfaces.StateDiffIndexer) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
func (sdi *StateDiffIndexer) PushKnownGaps(startingBlockNumber *big.Int, endingBlockNumber *big.Int, checkedOut bool, processingKey int64) error {
|
func (sdi *StateDiffIndexer) PushKnownGaps(startingBlockNumber *big.Int, endingBlockNumber *big.Int, checkedOut bool, processingKey int64, index interfaces.StateDiffIndexer) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -479,10 +479,22 @@ func (sdi *StateDiffIndexer) Close() error {
|
|||||||
return sdi.fileWriter.Close()
|
return sdi.fileWriter.Close()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (sdi *StateDiffIndexer) FindAndUpdateGaps(latestBlockOnChain *big.Int, expectedDifference *big.Int, processingKey int64) error {
|
func (sdi *StateDiffIndexer) FindAndUpdateGaps(latestBlockOnChain *big.Int, expectedDifference *big.Int, processingKey int64, indexer interfaces.StateDiffIndexer) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (sdi *StateDiffIndexer) PushKnownGaps(startingBlockNumber *big.Int, endingBlockNumber *big.Int, checkedOut bool, processingKey int64) error {
|
func (sdi *StateDiffIndexer) PushKnownGaps(startingBlockNumber *big.Int, endingBlockNumber *big.Int, checkedOut bool, processingKey int64, indexer interfaces.StateDiffIndexer) error {
|
||||||
|
log.Info("Writing Gaps to file")
|
||||||
|
if startingBlockNumber.Cmp(endingBlockNumber) != -1 {
|
||||||
|
return fmt.Errorf("Starting Block %d, is greater than ending block %d", startingBlockNumber, endingBlockNumber)
|
||||||
|
}
|
||||||
|
knownGap := models.KnownGapsModel{
|
||||||
|
StartingBlockNumber: startingBlockNumber.String(),
|
||||||
|
EndingBlockNumber: endingBlockNumber.String(),
|
||||||
|
CheckedOut: checkedOut,
|
||||||
|
ProcessingKey: processingKey,
|
||||||
|
}
|
||||||
|
|
||||||
|
sdi.fileWriter.upsertKnownGaps(knownGap)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -155,6 +155,14 @@ const (
|
|||||||
|
|
||||||
storageInsert = "INSERT INTO eth.storage_cids (header_id, state_path, storage_leaf_key, cid, storage_path, " +
|
storageInsert = "INSERT INTO eth.storage_cids (header_id, state_path, storage_leaf_key, cid, storage_path, " +
|
||||||
"node_type, diff, mh_key) VALUES ('%s', '\\x%x', '%s', '%s', '\\x%x', %d, %t, '%s');\n"
|
"node_type, diff, mh_key) VALUES ('%s', '\\x%x', '%s', '%s', '\\x%x', %d, %t, '%s');\n"
|
||||||
|
// INSERT INTO eth.known_gaps (starting_block_number, ending_block_number, checked_out, processing_key)
|
||||||
|
//VALUES ($1, $2, $3, $4)
|
||||||
|
// ON CONFLICT (starting_block_number) DO UPDATE SET (ending_block_number, processing_key) = ($2, $4)
|
||||||
|
// WHERE eth.known_gaps.ending_block_number <= $2
|
||||||
|
knownGapsInsert = "INSERT INTO eth.known_gaps (starting_block_number, ending_block_number, checked_out, processing_key) " +
|
||||||
|
"VALUES ('%s', '%s', %t, %d) " +
|
||||||
|
"ON CONFLICT (starting_block_number) DO UPDATE SET (ending_block_number, processing_key) = ('%s', %d) " +
|
||||||
|
"WHERE eth.known_gaps.ending_block_number <= '%s';\n"
|
||||||
)
|
)
|
||||||
|
|
||||||
func (sqw *SQLWriter) upsertNode(node nodeinfo.Info) {
|
func (sqw *SQLWriter) upsertNode(node nodeinfo.Info) {
|
||||||
@ -253,3 +261,12 @@ func (sqw *SQLWriter) upsertStorageCID(storageCID models.StorageNodeModel) {
|
|||||||
sqw.stmts <- []byte(fmt.Sprintf(storageInsert, storageCID.HeaderID, storageCID.StatePath, storageKey, storageCID.CID,
|
sqw.stmts <- []byte(fmt.Sprintf(storageInsert, storageCID.HeaderID, storageCID.StatePath, storageKey, storageCID.CID,
|
||||||
storageCID.Path, storageCID.NodeType, true, storageCID.MhKey))
|
storageCID.Path, storageCID.NodeType, true, storageCID.MhKey))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (sqw *SQLWriter) upsertKnownGaps(knownGaps models.KnownGapsModel) {
|
||||||
|
sqw.stmts <- []byte(fmt.Sprintf(knownGaps.StartingBlockNumber, knownGaps.EndingBlockNumber, knownGaps.CheckedOut, knownGaps.ProcessingKey,
|
||||||
|
knownGaps.EndingBlockNumber, knownGaps.ProcessingKey, knownGaps.EndingBlockNumber))
|
||||||
|
//knownGapsInsert = "INSERT INTO eth.known_gaps (starting_block_number, ending_block_number, checked_out, processing_key) " +
|
||||||
|
// "VALUES ('%s', '%s', %t, %d) " +
|
||||||
|
// "ON CONFLICT (starting_block_number) DO UPDATE SET (ending_block_number, processing_key) = ('%s', %d) " +
|
||||||
|
// "WHERE eth.known_gaps.ending_block_number <= '%s';\n"
|
||||||
|
}
|
||||||
|
@ -555,7 +555,7 @@ func (sdi *StateDiffIndexer) Close() error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Update the known gaps table with the gap information.
|
// Update the known gaps table with the gap information.
|
||||||
func (sdi *StateDiffIndexer) PushKnownGaps(startingBlockNumber *big.Int, endingBlockNumber *big.Int, checkedOut bool, processingKey int64) error {
|
func (sdi *StateDiffIndexer) PushKnownGaps(startingBlockNumber *big.Int, endingBlockNumber *big.Int, checkedOut bool, processingKey int64, fileIndexer interfaces.StateDiffIndexer) error {
|
||||||
if startingBlockNumber.Cmp(endingBlockNumber) != -1 {
|
if startingBlockNumber.Cmp(endingBlockNumber) != -1 {
|
||||||
return fmt.Errorf("Starting Block %d, is greater than ending block %d", startingBlockNumber, endingBlockNumber)
|
return fmt.Errorf("Starting Block %d, is greater than ending block %d", startingBlockNumber, endingBlockNumber)
|
||||||
}
|
}
|
||||||
@ -565,7 +565,10 @@ func (sdi *StateDiffIndexer) PushKnownGaps(startingBlockNumber *big.Int, endingB
|
|||||||
CheckedOut: checkedOut,
|
CheckedOut: checkedOut,
|
||||||
ProcessingKey: processingKey,
|
ProcessingKey: processingKey,
|
||||||
}
|
}
|
||||||
|
log.Info("Writing known gaps to the DB")
|
||||||
if err := sdi.dbWriter.upsertKnownGaps(knownGap); err != nil {
|
if err := sdi.dbWriter.upsertKnownGaps(knownGap); err != nil {
|
||||||
|
log.Warn("Error writing knownGaps to DB, writing them to file instead")
|
||||||
|
fileIndexer.PushKnownGaps(startingBlockNumber, endingBlockNumber, checkedOut, processingKey, nil)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
@ -617,7 +620,7 @@ func isGap(latestBlockInDb *big.Int, latestBlockOnChain *big.Int, expectedDiffer
|
|||||||
// TODO:
|
// TODO:
|
||||||
// REmove the return value
|
// REmove the return value
|
||||||
// Write to file if err in writing to DB
|
// Write to file if err in writing to DB
|
||||||
func (sdi *StateDiffIndexer) FindAndUpdateGaps(latestBlockOnChain *big.Int, expectedDifference *big.Int, processingKey int64) error {
|
func (sdi *StateDiffIndexer) FindAndUpdateGaps(latestBlockOnChain *big.Int, expectedDifference *big.Int, processingKey int64, fileIndexer interfaces.StateDiffIndexer) error {
|
||||||
dbQueryString := "SELECT MAX(block_number) FROM eth.header_cids"
|
dbQueryString := "SELECT MAX(block_number) FROM eth.header_cids"
|
||||||
latestBlockInDb, err := sdi.QueryDbToBigInt(dbQueryString)
|
latestBlockInDb, err := sdi.QueryDbToBigInt(dbQueryString)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -632,7 +635,7 @@ func (sdi *StateDiffIndexer) FindAndUpdateGaps(latestBlockOnChain *big.Int, expe
|
|||||||
endBlock.Sub(latestBlockOnChain, expectedDifference)
|
endBlock.Sub(latestBlockOnChain, expectedDifference)
|
||||||
|
|
||||||
log.Warn(fmt.Sprint("Found Gaps starting at, ", startBlock, " and ending at, ", endBlock))
|
log.Warn(fmt.Sprint("Found Gaps starting at, ", startBlock, " and ending at, ", endBlock))
|
||||||
err := sdi.PushKnownGaps(startBlock, endBlock, false, processingKey)
|
err := sdi.PushKnownGaps(startBlock, endBlock, false, processingKey, fileIndexer)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// Write to file SQL file instead!!!
|
// Write to file SQL file instead!!!
|
||||||
// If write to SQL file fails, write to disk. Handle this within the write to SQL file function!
|
// If write to SQL file fails, write to disk. Handle this within the write to SQL file function!
|
||||||
|
@ -3,18 +3,21 @@ package sql_test
|
|||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"context"
|
"context"
|
||||||
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"math/big"
|
"math/big"
|
||||||
"os"
|
"os"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/ipfs/go-cid"
|
"github.com/ipfs/go-cid"
|
||||||
|
"github.com/jmoiron/sqlx"
|
||||||
"github.com/multiformats/go-multihash"
|
"github.com/multiformats/go-multihash"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
|
|
||||||
"github.com/ethereum/go-ethereum/core/types"
|
"github.com/ethereum/go-ethereum/core/types"
|
||||||
"github.com/ethereum/go-ethereum/params"
|
"github.com/ethereum/go-ethereum/params"
|
||||||
"github.com/ethereum/go-ethereum/rlp"
|
"github.com/ethereum/go-ethereum/rlp"
|
||||||
|
"github.com/ethereum/go-ethereum/statediff/indexer/database/file"
|
||||||
"github.com/ethereum/go-ethereum/statediff/indexer/database/sql"
|
"github.com/ethereum/go-ethereum/statediff/indexer/database/sql"
|
||||||
"github.com/ethereum/go-ethereum/statediff/indexer/database/sql/postgres"
|
"github.com/ethereum/go-ethereum/statediff/indexer/database/sql/postgres"
|
||||||
"github.com/ethereum/go-ethereum/statediff/indexer/interfaces"
|
"github.com/ethereum/go-ethereum/statediff/indexer/interfaces"
|
||||||
@ -24,6 +27,7 @@ import (
|
|||||||
|
|
||||||
var (
|
var (
|
||||||
db sql.Database
|
db sql.Database
|
||||||
|
sqlxdb *sqlx.DB
|
||||||
err error
|
err error
|
||||||
ind interfaces.StateDiffIndexer
|
ind interfaces.StateDiffIndexer
|
||||||
chainConf = params.MainnetChainConfig
|
chainConf = params.MainnetChainConfig
|
||||||
@ -162,6 +166,16 @@ func setupDb(t *testing.T) (*sql.StateDiffIndexer, error) {
|
|||||||
return stateDiff, err
|
return stateDiff, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func setupFile(t *testing.T) interfaces.StateDiffIndexer {
|
||||||
|
if _, err := os.Stat(file.TestConfig.FilePath); !errors.Is(err, os.ErrNotExist) {
|
||||||
|
err := os.Remove(file.TestConfig.FilePath)
|
||||||
|
require.NoError(t, err)
|
||||||
|
}
|
||||||
|
ind, err = file.NewStateDiffIndexer(context.Background(), mocks.TestConfig, file.TestConfig)
|
||||||
|
require.NoError(t, err)
|
||||||
|
return ind
|
||||||
|
}
|
||||||
|
|
||||||
func tearDown(t *testing.T) {
|
func tearDown(t *testing.T) {
|
||||||
sql.TearDownDB(t, db)
|
sql.TearDownDB(t, db)
|
||||||
err := ind.Close()
|
err := ind.Close()
|
||||||
@ -180,6 +194,8 @@ func testKnownGapsUpsert(t *testing.T) {
|
|||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fileInd := setupFile(t)
|
||||||
|
|
||||||
// Get the latest block from the DB
|
// Get the latest block from the DB
|
||||||
latestBlockInDb, err := stateDiff.QueryDbToBigInt("SELECT MAX(block_number) FROM eth.header_cids")
|
latestBlockInDb, err := stateDiff.QueryDbToBigInt("SELECT MAX(block_number) FROM eth.header_cids")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -193,7 +209,7 @@ func testKnownGapsUpsert(t *testing.T) {
|
|||||||
t.Log("The latest block on the chain is: ", latestBlockOnChain)
|
t.Log("The latest block on the chain is: ", latestBlockOnChain)
|
||||||
t.Log("The latest block on the DB is: ", latestBlockInDb)
|
t.Log("The latest block on the DB is: ", latestBlockInDb)
|
||||||
|
|
||||||
gapUpsertErr := stateDiff.FindAndUpdateGaps(latestBlockOnChain, expectedDifference, 0)
|
gapUpsertErr := stateDiff.FindAndUpdateGaps(latestBlockOnChain, expectedDifference, 0, fileInd)
|
||||||
require.NoError(t, gapUpsertErr)
|
require.NoError(t, gapUpsertErr)
|
||||||
|
|
||||||
// Calculate what the start and end block should be in known_gaps
|
// Calculate what the start and end block should be in known_gaps
|
||||||
@ -206,7 +222,7 @@ func testKnownGapsUpsert(t *testing.T) {
|
|||||||
queryString := fmt.Sprintf("SELECT starting_block_number from eth.known_gaps WHERE starting_block_number = %d AND ending_block_number = %d", startBlock, endBlock)
|
queryString := fmt.Sprintf("SELECT starting_block_number from eth.known_gaps WHERE starting_block_number = %d AND ending_block_number = %d", startBlock, endBlock)
|
||||||
|
|
||||||
_, queryErr := stateDiff.QueryDb(queryString) // Figure out the string.
|
_, queryErr := stateDiff.QueryDb(queryString) // Figure out the string.
|
||||||
t.Logf("Updated Known Gaps table starting from, %d, and ending at, %d", startBlock, endBlock)
|
|
||||||
require.NoError(t, queryErr)
|
require.NoError(t, queryErr)
|
||||||
|
t.Logf("Updated Known Gaps table starting from, %d, and ending at, %d", startBlock, endBlock)
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -103,6 +103,8 @@ func (db *DB) InsertIPLDsStm() string {
|
|||||||
|
|
||||||
// InsertKnownGapsStm satisfies the sql.Statements interface
|
// InsertKnownGapsStm satisfies the sql.Statements interface
|
||||||
func (db *DB) InsertKnownGapsStm() string {
|
func (db *DB) InsertKnownGapsStm() string {
|
||||||
return `INSERT INTO eth.known_gaps (starting_block_number, ending_block_number, checked_out, processing_key) VALUES ($1, $2, $3, $4) ON CONFLICT (starting_block_number) DO NOTHING`
|
return `INSERT INTO eth.known_gaps (starting_block_number, ending_block_number, checked_out, processing_key) VALUES ($1, $2, $3, $4)
|
||||||
|
ON CONFLICT (starting_block_number) DO UPDATE SET (ending_block_number, processing_key) = ($2, $4)
|
||||||
|
WHERE eth.known_gaps.ending_block_number <= $2`
|
||||||
//return `INSERT INTO eth.known_gaps (starting_block_number, ending_block_number, checked_out, processing_key) VALUES (1, 2, true, 1)`
|
//return `INSERT INTO eth.known_gaps (starting_block_number, ending_block_number, checked_out, processing_key) VALUES (1, 2, true, 1)`
|
||||||
}
|
}
|
||||||
|
@ -32,8 +32,10 @@ type StateDiffIndexer interface {
|
|||||||
PushStateNode(tx Batch, stateNode sdtypes.StateNode, headerID string) error
|
PushStateNode(tx Batch, stateNode sdtypes.StateNode, headerID string) error
|
||||||
PushCodeAndCodeHash(tx Batch, codeAndCodeHash sdtypes.CodeAndCodeHash) error
|
PushCodeAndCodeHash(tx Batch, codeAndCodeHash sdtypes.CodeAndCodeHash) error
|
||||||
ReportDBMetrics(delay time.Duration, quit <-chan bool)
|
ReportDBMetrics(delay time.Duration, quit <-chan bool)
|
||||||
FindAndUpdateGaps(latestBlockOnChain *big.Int, expectedDifference *big.Int, processingKey int64) error
|
FindAndUpdateGaps(latestBlockOnChain *big.Int, expectedDifference *big.Int, processingKey int64, indexer StateDiffIndexer) error
|
||||||
PushKnownGaps(startingBlockNumber *big.Int, endingBlockNumber *big.Int, checkedOut bool, processingKey int64) error
|
// The indexer at the end allows us to pass one indexer to another.
|
||||||
|
// We use then for the SQL indexer, we pass it the file indexer so it can write to file if writing to the DB fails.
|
||||||
|
PushKnownGaps(startingBlockNumber *big.Int, endingBlockNumber *big.Int, checkedOut bool, processingKey int64, indexer StateDiffIndexer) error
|
||||||
io.Closer
|
io.Closer
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -44,6 +44,7 @@ import (
|
|||||||
ind "github.com/ethereum/go-ethereum/statediff/indexer"
|
ind "github.com/ethereum/go-ethereum/statediff/indexer"
|
||||||
"github.com/ethereum/go-ethereum/statediff/indexer/interfaces"
|
"github.com/ethereum/go-ethereum/statediff/indexer/interfaces"
|
||||||
nodeinfo "github.com/ethereum/go-ethereum/statediff/indexer/node"
|
nodeinfo "github.com/ethereum/go-ethereum/statediff/indexer/node"
|
||||||
|
"github.com/ethereum/go-ethereum/statediff/indexer/shared"
|
||||||
types2 "github.com/ethereum/go-ethereum/statediff/types"
|
types2 "github.com/ethereum/go-ethereum/statediff/types"
|
||||||
"github.com/ethereum/go-ethereum/trie"
|
"github.com/ethereum/go-ethereum/trie"
|
||||||
)
|
)
|
||||||
@ -125,7 +126,7 @@ type Service struct {
|
|||||||
// Should the statediff service wait for geth to sync to head?
|
// Should the statediff service wait for geth to sync to head?
|
||||||
WaitForSync bool
|
WaitForSync bool
|
||||||
// Used to signal if we should check for KnownGaps
|
// Used to signal if we should check for KnownGaps
|
||||||
KnownGaps KnownGaps
|
KnownGaps KnownGapsState
|
||||||
// Whether or not we have any subscribers; only if we do, do we processes state diffs
|
// Whether or not we have any subscribers; only if we do, do we processes state diffs
|
||||||
subscribers int32
|
subscribers int32
|
||||||
// Interface for publishing statediffs as PG-IPLD objects
|
// Interface for publishing statediffs as PG-IPLD objects
|
||||||
@ -138,7 +139,8 @@ type Service struct {
|
|||||||
maxRetry uint
|
maxRetry uint
|
||||||
}
|
}
|
||||||
|
|
||||||
type KnownGaps struct {
|
// This structure keeps track of the knownGaps at any given moment in time
|
||||||
|
type KnownGapsState struct {
|
||||||
// Should we check for gaps by looking at the DB and comparing the latest block with head
|
// Should we check for gaps by looking at the DB and comparing the latest block with head
|
||||||
checkForGaps bool
|
checkForGaps bool
|
||||||
// Arbitrary processingKey that can be used down the line to differentiate different geth nodes.
|
// Arbitrary processingKey that can be used down the line to differentiate different geth nodes.
|
||||||
@ -157,6 +159,9 @@ type KnownGaps struct {
|
|||||||
// The last processed block keeps track of the last processed block.
|
// The last processed block keeps track of the last processed block.
|
||||||
// Its used to make sure we didn't skip over any block!
|
// Its used to make sure we didn't skip over any block!
|
||||||
lastProcessedBlock *big.Int
|
lastProcessedBlock *big.Int
|
||||||
|
// This fileIndexer is used to write the knownGaps to file
|
||||||
|
// If we can't properly write to DB
|
||||||
|
fileIndexer interfaces.StateDiffIndexer
|
||||||
}
|
}
|
||||||
|
|
||||||
// This function will capture any missed blocks that were not captured in sds.KnownGaps.knownErrorBlocks.
|
// This function will capture any missed blocks that were not captured in sds.KnownGaps.knownErrorBlocks.
|
||||||
@ -192,7 +197,7 @@ func (sds *Service) capturedMissedBlocks(currentBlock *big.Int, knownErrorBlocks
|
|||||||
startBlock := big.NewInt(0).Add(endErrorBlock, sds.KnownGaps.expectedDifference)
|
startBlock := big.NewInt(0).Add(endErrorBlock, sds.KnownGaps.expectedDifference)
|
||||||
// 121 to 124
|
// 121 to 124
|
||||||
log.Warn(fmt.Sprintf("Adding the following block range to known_gaps table: %d - %d", startBlock, expectedEndErrorBlock))
|
log.Warn(fmt.Sprintf("Adding the following block range to known_gaps table: %d - %d", startBlock, expectedEndErrorBlock))
|
||||||
sds.indexer.PushKnownGaps(startBlock, expectedEndErrorBlock, false, sds.KnownGaps.processingKey)
|
sds.indexer.PushKnownGaps(startBlock, expectedEndErrorBlock, false, sds.KnownGaps.processingKey, sds.KnownGaps.fileIndexer)
|
||||||
}
|
}
|
||||||
|
|
||||||
if expectedStartErrorBlock.Cmp(startErrorBlock) == -1 {
|
if expectedStartErrorBlock.Cmp(startErrorBlock) == -1 {
|
||||||
@ -204,12 +209,12 @@ func (sds *Service) capturedMissedBlocks(currentBlock *big.Int, knownErrorBlocks
|
|||||||
endBlock := big.NewInt(0).Sub(startErrorBlock, sds.KnownGaps.expectedDifference)
|
endBlock := big.NewInt(0).Sub(startErrorBlock, sds.KnownGaps.expectedDifference)
|
||||||
// 111 to 114
|
// 111 to 114
|
||||||
log.Warn(fmt.Sprintf("Adding the following block range to known_gaps table: %d - %d", expectedStartErrorBlock, endBlock))
|
log.Warn(fmt.Sprintf("Adding the following block range to known_gaps table: %d - %d", expectedStartErrorBlock, endBlock))
|
||||||
sds.indexer.PushKnownGaps(expectedStartErrorBlock, endBlock, false, sds.KnownGaps.processingKey)
|
sds.indexer.PushKnownGaps(expectedStartErrorBlock, endBlock, false, sds.KnownGaps.processingKey, sds.KnownGaps.fileIndexer)
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Warn(fmt.Sprint("The following Gaps were found: ", knownErrorBlocks))
|
log.Warn(fmt.Sprint("The following Gaps were found: ", knownErrorBlocks))
|
||||||
log.Warn(fmt.Sprint("Updating known Gaps table from ", startErrorBlock, " to ", endErrorBlock, " with processing key, ", sds.KnownGaps.processingKey))
|
log.Warn(fmt.Sprint("Updating known Gaps table from ", startErrorBlock, " to ", endErrorBlock, " with processing key, ", sds.KnownGaps.processingKey))
|
||||||
sds.indexer.PushKnownGaps(startErrorBlock, endErrorBlock, false, sds.KnownGaps.processingKey)
|
sds.indexer.PushKnownGaps(startErrorBlock, endErrorBlock, false, sds.KnownGaps.processingKey, sds.KnownGaps.fileIndexer)
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
log.Warn("We missed blocks without any errors.")
|
log.Warn("We missed blocks without any errors.")
|
||||||
@ -219,7 +224,7 @@ func (sds *Service) capturedMissedBlocks(currentBlock *big.Int, knownErrorBlocks
|
|||||||
endBlock := big.NewInt(0).Sub(currentBlock, sds.KnownGaps.expectedDifference)
|
endBlock := big.NewInt(0).Sub(currentBlock, sds.KnownGaps.expectedDifference)
|
||||||
log.Warn(fmt.Sprint("Missed blocks starting from: ", startBlock))
|
log.Warn(fmt.Sprint("Missed blocks starting from: ", startBlock))
|
||||||
log.Warn(fmt.Sprint("Missed blocks ending at: ", endBlock))
|
log.Warn(fmt.Sprint("Missed blocks ending at: ", endBlock))
|
||||||
sds.indexer.PushKnownGaps(startBlock, endBlock, false, sds.KnownGaps.processingKey)
|
sds.indexer.PushKnownGaps(startBlock, endBlock, false, sds.KnownGaps.processingKey, sds.KnownGaps.fileIndexer)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -242,6 +247,7 @@ func NewBlockCache(max uint) BlockCache {
|
|||||||
func New(stack *node.Node, ethServ *eth.Ethereum, cfg *ethconfig.Config, params Config, backend ethapi.Backend) error {
|
func New(stack *node.Node, ethServ *eth.Ethereum, cfg *ethconfig.Config, params Config, backend ethapi.Backend) error {
|
||||||
blockChain := ethServ.BlockChain()
|
blockChain := ethServ.BlockChain()
|
||||||
var indexer interfaces.StateDiffIndexer
|
var indexer interfaces.StateDiffIndexer
|
||||||
|
var fileIndexer interfaces.StateDiffIndexer
|
||||||
quitCh := make(chan bool)
|
quitCh := make(chan bool)
|
||||||
if params.IndexerConfig != nil {
|
if params.IndexerConfig != nil {
|
||||||
info := nodeinfo.Info{
|
info := nodeinfo.Info{
|
||||||
@ -252,23 +258,37 @@ func New(stack *node.Node, ethServ *eth.Ethereum, cfg *ethconfig.Config, params
|
|||||||
ClientName: params.ClientName,
|
ClientName: params.ClientName,
|
||||||
}
|
}
|
||||||
var err error
|
var err error
|
||||||
indexer, err = ind.NewStateDiffIndexer(params.Context, blockChain.Config(), info, params.IndexerConfig)
|
indexer, err = ind.NewStateDiffIndexer(params.Context, blockChain.Config(), info, params.IndexerConfig, "")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
if params.IndexerConfig.Type() != shared.FILE {
|
||||||
|
fileIndexer, err = ind.NewStateDiffIndexer(params.Context, blockChain.Config(), info, params.IndexerConfig, "")
|
||||||
|
log.Info("Starting the statediff service in ", "mode", params.IndexerConfig.Type())
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
} else {
|
||||||
|
log.Info("Starting the statediff service in ", "mode", "File")
|
||||||
|
fileIndexer = indexer
|
||||||
|
}
|
||||||
|
//fileIndexer, fileErr = file.NewStateDiffIndexer(params.Context, blockChain.Config(), info)
|
||||||
indexer.ReportDBMetrics(10*time.Second, quitCh)
|
indexer.ReportDBMetrics(10*time.Second, quitCh)
|
||||||
}
|
}
|
||||||
|
|
||||||
workers := params.NumWorkers
|
workers := params.NumWorkers
|
||||||
if workers == 0 {
|
if workers == 0 {
|
||||||
workers = 1
|
workers = 1
|
||||||
}
|
}
|
||||||
// If we ever have multiple processingKeys we can update them here
|
// If we ever have multiple processingKeys we can update them here
|
||||||
// along with the expectedDifference
|
// along with the expectedDifference
|
||||||
knownGaps := &KnownGaps{
|
knownGaps := &KnownGapsState{
|
||||||
checkForGaps: true,
|
checkForGaps: true,
|
||||||
processingKey: 0,
|
processingKey: 0,
|
||||||
expectedDifference: big.NewInt(1),
|
expectedDifference: big.NewInt(1),
|
||||||
errorState: false,
|
errorState: false,
|
||||||
|
fileIndexer: fileIndexer,
|
||||||
}
|
}
|
||||||
sds := &Service{
|
sds := &Service{
|
||||||
Mutex: sync.Mutex{},
|
Mutex: sync.Mutex{},
|
||||||
@ -409,7 +429,7 @@ func (sds *Service) writeLoopWorker(params workerParams) {
|
|||||||
// Check and update the gaps table.
|
// Check and update the gaps table.
|
||||||
if sds.KnownGaps.checkForGaps && !sds.KnownGaps.errorState {
|
if sds.KnownGaps.checkForGaps && !sds.KnownGaps.errorState {
|
||||||
log.Info("Checking for Gaps at current block: ", currentBlock.Number())
|
log.Info("Checking for Gaps at current block: ", currentBlock.Number())
|
||||||
go sds.indexer.FindAndUpdateGaps(currentBlock.Number(), sds.KnownGaps.expectedDifference, sds.KnownGaps.processingKey)
|
go sds.indexer.FindAndUpdateGaps(currentBlock.Number(), sds.KnownGaps.expectedDifference, sds.KnownGaps.processingKey, sds.KnownGaps.fileIndexer)
|
||||||
sds.KnownGaps.checkForGaps = false
|
sds.KnownGaps.checkForGaps = false
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2,14 +2,18 @@ package statediff
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"math/big"
|
"math/big"
|
||||||
|
"os"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/ethereum/go-ethereum/params"
|
"github.com/ethereum/go-ethereum/params"
|
||||||
|
"github.com/ethereum/go-ethereum/statediff/indexer/database/file"
|
||||||
"github.com/ethereum/go-ethereum/statediff/indexer/database/sql"
|
"github.com/ethereum/go-ethereum/statediff/indexer/database/sql"
|
||||||
"github.com/ethereum/go-ethereum/statediff/indexer/database/sql/postgres"
|
"github.com/ethereum/go-ethereum/statediff/indexer/database/sql/postgres"
|
||||||
"github.com/ethereum/go-ethereum/statediff/indexer/interfaces"
|
"github.com/ethereum/go-ethereum/statediff/indexer/interfaces"
|
||||||
|
"github.com/ethereum/go-ethereum/statediff/indexer/mocks"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -20,6 +24,9 @@ var (
|
|||||||
chainConf = params.MainnetChainConfig
|
chainConf = params.MainnetChainConfig
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// Add clean db
|
||||||
|
// Test for failures when they are expected, when we go from smaller block to larger block
|
||||||
|
// We should no longer see the smaller block in DB
|
||||||
func TestKnownGaps(t *testing.T) {
|
func TestKnownGaps(t *testing.T) {
|
||||||
type gapValues struct {
|
type gapValues struct {
|
||||||
lastProcessedBlock int64
|
lastProcessedBlock int64
|
||||||
@ -27,40 +34,48 @@ func TestKnownGaps(t *testing.T) {
|
|||||||
knownErrorBlocksStart int64
|
knownErrorBlocksStart int64
|
||||||
knownErrorBlocksEnd int64
|
knownErrorBlocksEnd int64
|
||||||
expectedDif int64
|
expectedDif int64
|
||||||
|
processingKey int64
|
||||||
}
|
}
|
||||||
|
|
||||||
tests := []gapValues{
|
tests := []gapValues{
|
||||||
// Unprocessed gaps before and after knownErrorBlock
|
// Unprocessed gaps before and after knownErrorBlock
|
||||||
{lastProcessedBlock: 110, knownErrorBlocksStart: 115, knownErrorBlocksEnd: 120, currentBlock: 125, expectedDif: 1},
|
{lastProcessedBlock: 110, knownErrorBlocksStart: 115, knownErrorBlocksEnd: 120, currentBlock: 125, expectedDif: 1, processingKey: 1},
|
||||||
// No knownErrorBlocks
|
// No knownErrorBlocks
|
||||||
{lastProcessedBlock: 130, knownErrorBlocksStart: 0, knownErrorBlocksEnd: 0, currentBlock: 140, expectedDif: 1},
|
{lastProcessedBlock: 130, knownErrorBlocksStart: 0, knownErrorBlocksEnd: 0, currentBlock: 140, expectedDif: 1, processingKey: 1},
|
||||||
// No gaps before or after knownErrorBlocks
|
// No gaps before or after knownErrorBlocks
|
||||||
{lastProcessedBlock: 150, knownErrorBlocksStart: 151, knownErrorBlocksEnd: 159, currentBlock: 160, expectedDif: 1},
|
{lastProcessedBlock: 150, knownErrorBlocksStart: 151, knownErrorBlocksEnd: 159, currentBlock: 160, expectedDif: 1, processingKey: 1},
|
||||||
// gaps before knownErrorBlocks but not after
|
// gaps before knownErrorBlocks but not after
|
||||||
{lastProcessedBlock: 170, knownErrorBlocksStart: 180, knownErrorBlocksEnd: 189, currentBlock: 190, expectedDif: 1},
|
{lastProcessedBlock: 170, knownErrorBlocksStart: 180, knownErrorBlocksEnd: 189, currentBlock: 190, expectedDif: 1, processingKey: 1},
|
||||||
// gaps after knownErrorBlocks but not before
|
// gaps after knownErrorBlocks but not before
|
||||||
{lastProcessedBlock: 200, knownErrorBlocksStart: 201, knownErrorBlocksEnd: 205, currentBlock: 210, expectedDif: 1},
|
{lastProcessedBlock: 200, knownErrorBlocksStart: 201, knownErrorBlocksEnd: 205, currentBlock: 210, expectedDif: 1, processingKey: 1},
|
||||||
/// Same tests as above with a new expected DIF
|
/// Same tests as above with a new expected DIF
|
||||||
// Unprocessed gaps before and after knownErrorBlock
|
// Unprocessed gaps before and after knownErrorBlock
|
||||||
{lastProcessedBlock: 1100, knownErrorBlocksStart: 1150, knownErrorBlocksEnd: 1200, currentBlock: 1250, expectedDif: 2},
|
{lastProcessedBlock: 1100, knownErrorBlocksStart: 1150, knownErrorBlocksEnd: 1200, currentBlock: 1250, expectedDif: 2, processingKey: 2},
|
||||||
// No knownErrorBlocks
|
// No knownErrorBlocks
|
||||||
{lastProcessedBlock: 1300, knownErrorBlocksStart: 0, knownErrorBlocksEnd: 0, currentBlock: 1400, expectedDif: 2},
|
{lastProcessedBlock: 1300, knownErrorBlocksStart: 0, knownErrorBlocksEnd: 0, currentBlock: 1400, expectedDif: 2, processingKey: 2},
|
||||||
// No gaps before or after knownErrorBlocks
|
// No gaps before or after knownErrorBlocks
|
||||||
{lastProcessedBlock: 1500, knownErrorBlocksStart: 1502, knownErrorBlocksEnd: 1598, currentBlock: 1600, expectedDif: 2},
|
{lastProcessedBlock: 1500, knownErrorBlocksStart: 1502, knownErrorBlocksEnd: 1598, currentBlock: 1600, expectedDif: 2, processingKey: 2},
|
||||||
// gaps before knownErrorBlocks but not after
|
// gaps before knownErrorBlocks but not after
|
||||||
{lastProcessedBlock: 1700, knownErrorBlocksStart: 1800, knownErrorBlocksEnd: 1898, currentBlock: 1900, expectedDif: 2},
|
{lastProcessedBlock: 1700, knownErrorBlocksStart: 1800, knownErrorBlocksEnd: 1898, currentBlock: 1900, expectedDif: 2, processingKey: 2},
|
||||||
// gaps after knownErrorBlocks but not before
|
// gaps after knownErrorBlocks but not before
|
||||||
{lastProcessedBlock: 2000, knownErrorBlocksStart: 2002, knownErrorBlocksEnd: 2050, currentBlock: 2100, expectedDif: 2},
|
{lastProcessedBlock: 2000, knownErrorBlocksStart: 2002, knownErrorBlocksEnd: 2050, currentBlock: 2100, expectedDif: 2, processingKey: 2},
|
||||||
|
// Test update when block number is larger!!
|
||||||
|
{lastProcessedBlock: 2000, knownErrorBlocksStart: 2002, knownErrorBlocksEnd: 2052, currentBlock: 2100, expectedDif: 2, processingKey: 2},
|
||||||
|
{lastProcessedBlock: 2000, knownErrorBlocksStart: 2002, knownErrorBlocksEnd: 2052, currentBlock: 2100, expectedDif: 2, processingKey: 10},
|
||||||
}
|
}
|
||||||
for _, tc := range tests {
|
for _, tc := range tests {
|
||||||
// Reuse processing key from expecteDiff
|
// Reuse processing key from expecteDiff
|
||||||
testCaptureMissedBlocks(t, tc.lastProcessedBlock, tc.currentBlock, tc.knownErrorBlocksStart, tc.knownErrorBlocksEnd, tc.expectedDif, tc.expectedDif)
|
testCaptureMissedBlocks(t, tc.lastProcessedBlock, tc.currentBlock, tc.knownErrorBlocksStart, tc.knownErrorBlocksEnd, tc.expectedDif, tc.expectedDif, false)
|
||||||
|
}
|
||||||
|
for _, tc := range tests {
|
||||||
|
// Reuse processing key from expecteDiff
|
||||||
|
testCaptureMissedBlocks(t, tc.lastProcessedBlock, tc.currentBlock, tc.knownErrorBlocksStart, tc.knownErrorBlocksEnd, tc.expectedDif, tc.expectedDif, true)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// It also makes sure we properly calculate any missed gaps not in the known gaps lists
|
// It also makes sure we properly calculate any missed gaps not in the known gaps lists
|
||||||
// either before or after the list.
|
// either before or after the list.
|
||||||
func testCaptureMissedBlocks(t *testing.T, lastBlockProcessed int64, currentBlockNum int64, knownErrorBlocksStart int64, knownErrorBlocksEnd int64, expectedDif int64, processingKey int64) {
|
func testCaptureMissedBlocks(t *testing.T, lastBlockProcessed int64, currentBlockNum int64, knownErrorBlocksStart int64, knownErrorBlocksEnd int64, expectedDif int64, processingKey int64, skipDb bool) {
|
||||||
|
|
||||||
lastProcessedBlock := big.NewInt(lastBlockProcessed)
|
lastProcessedBlock := big.NewInt(lastBlockProcessed)
|
||||||
currentBlock := big.NewInt(currentBlockNum)
|
currentBlock := big.NewInt(currentBlockNum)
|
||||||
@ -74,9 +89,11 @@ func testCaptureMissedBlocks(t *testing.T, lastBlockProcessed int64, currentBloc
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Comment out values which should not be used
|
// Comment out values which should not be used
|
||||||
knownGaps := KnownGaps{
|
fileInd := setupFile(t)
|
||||||
|
knownGaps := KnownGapsState{
|
||||||
processingKey: processingKey,
|
processingKey: processingKey,
|
||||||
expectedDifference: big.NewInt(expectedDif),
|
expectedDifference: big.NewInt(expectedDif),
|
||||||
|
fileIndexer: fileInd,
|
||||||
}
|
}
|
||||||
stateDiff, err := setupDb(t)
|
stateDiff, err := setupDb(t)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -134,6 +151,16 @@ func setupDb(t *testing.T) (*sql.StateDiffIndexer, error) {
|
|||||||
return stateDiff, err
|
return stateDiff, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func setupFile(t *testing.T) interfaces.StateDiffIndexer {
|
||||||
|
if _, err := os.Stat(file.TestConfig.FilePath); !errors.Is(err, os.ErrNotExist) {
|
||||||
|
err := os.Remove(file.TestConfig.FilePath)
|
||||||
|
require.NoError(t, err)
|
||||||
|
}
|
||||||
|
indexer, err = file.NewStateDiffIndexer(context.Background(), mocks.TestConfig, file.TestConfig)
|
||||||
|
require.NoError(t, err)
|
||||||
|
return indexer
|
||||||
|
}
|
||||||
|
|
||||||
// Teardown the DB
|
// Teardown the DB
|
||||||
func tearDown(t *testing.T, stateDiff *sql.StateDiffIndexer) {
|
func tearDown(t *testing.T, stateDiff *sql.StateDiffIndexer) {
|
||||||
t.Log("Starting tearDown")
|
t.Log("Starting tearDown")
|
||||||
|
0
statediff/statediffing_test_file.sql
Normal file
0
statediff/statediffing_test_file.sql
Normal file
Loading…
Reference in New Issue
Block a user