remove known_gaps integration #303
@ -287,14 +287,13 @@ func makeFullNode(ctx *cli.Context) (*node.Node, ethapi.Backend) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
p := statediff.Config{
|
p := statediff.Config{
|
||||||
IndexerConfig: indexerConfig,
|
IndexerConfig: indexerConfig,
|
||||||
KnownGapsFilePath: ctx.String(utils.StateDiffKnownGapsFilePath.Name),
|
ID: nodeID,
|
||||||
ID: nodeID,
|
ClientName: clientName,
|
||||||
ClientName: clientName,
|
Context: context.Background(),
|
||||||
Context: context.Background(),
|
EnableWriteLoop: ctx.Bool(utils.StateDiffWritingFlag.Name),
|
||||||
EnableWriteLoop: ctx.Bool(utils.StateDiffWritingFlag.Name),
|
NumWorkers: ctx.Uint(utils.StateDiffWorkersFlag.Name),
|
||||||
NumWorkers: ctx.Uint(utils.StateDiffWorkersFlag.Name),
|
WaitForSync: ctx.Bool(utils.StateDiffWaitForSync.Name),
|
||||||
WaitForSync: ctx.Bool(utils.StateDiffWaitForSync.Name),
|
|
||||||
}
|
}
|
||||||
utils.RegisterStateDiffService(stack, eth, &cfg.Eth, p, backend)
|
utils.RegisterStateDiffService(stack, eth, &cfg.Eth, p, backend)
|
||||||
}
|
}
|
||||||
|
@ -176,7 +176,6 @@ var (
|
|||||||
utils.StateDiffFileMode,
|
utils.StateDiffFileMode,
|
||||||
utils.StateDiffFileCsvDir,
|
utils.StateDiffFileCsvDir,
|
||||||
utils.StateDiffFilePath,
|
utils.StateDiffFilePath,
|
||||||
utils.StateDiffKnownGapsFilePath,
|
|
||||||
utils.StateDiffWaitForSync,
|
utils.StateDiffWaitForSync,
|
||||||
utils.StateDiffWatchedAddressesFilePath,
|
utils.StateDiffWatchedAddressesFilePath,
|
||||||
utils.StateDiffUpsert,
|
utils.StateDiffUpsert,
|
||||||
|
@ -1064,11 +1064,6 @@ var (
|
|||||||
Name: "statediff.file.path",
|
Name: "statediff.file.path",
|
||||||
Usage: "Full path (including filename) to write statediff data out to when operating in sql file mode",
|
Usage: "Full path (including filename) to write statediff data out to when operating in sql file mode",
|
||||||
}
|
}
|
||||||
StateDiffKnownGapsFilePath = &cli.StringFlag{
|
|
||||||
Name: "statediff.knowngapsfile.path",
|
|
||||||
Usage: "Full path (including filename) to write knownGaps statements when the DB is unavailable.",
|
|
||||||
Value: "./known_gaps.sql",
|
|
||||||
}
|
|
||||||
StateDiffWatchedAddressesFilePath = &cli.StringFlag{
|
StateDiffWatchedAddressesFilePath = &cli.StringFlag{
|
||||||
Name: "statediff.file.wapath",
|
Name: "statediff.file.wapath",
|
||||||
Usage: "Full path (including filename) to write statediff watched addresses out to when operating in file mode",
|
Usage: "Full path (including filename) to write statediff watched addresses out to when operating in file mode",
|
||||||
|
@ -30,8 +30,6 @@ import (
|
|||||||
type Config struct {
|
type Config struct {
|
||||||
// The configuration used for the stateDiff Indexer
|
// The configuration used for the stateDiff Indexer
|
||||||
IndexerConfig interfaces.Config
|
IndexerConfig interfaces.Config
|
||||||
// The filepath to write knownGaps insert statements if we can't connect to the DB.
|
|
||||||
KnownGapsFilePath string
|
|
||||||
// A unique ID used for this service
|
// A unique ID used for this service
|
||||||
ID string
|
ID string
|
||||||
// Name for the client this service is running
|
// Name for the client this service is running
|
||||||
|
@ -1,273 +0,0 @@
|
|||||||
// Copyright 2019 The go-ethereum Authors
|
|
||||||
// This file is part of the go-ethereum library.
|
|
||||||
//
|
|
||||||
// The go-ethereum library is free software: you can redistribute it and/or modify
|
|
||||||
// it under the terms of the GNU Lesser General Public License as published by
|
|
||||||
// the Free Software Foundation, either version 3 of the License, or
|
|
||||||
// (at your option) any later version.
|
|
||||||
//
|
|
||||||
// The go-ethereum library is distributed in the hope that it will be useful,
|
|
||||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
||||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
||||||
// GNU Lesser General Public License for more details.
|
|
||||||
//
|
|
||||||
// You should have received a copy of the GNU Lesser General Public License
|
|
||||||
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
|
|
||||||
|
|
||||||
package statediff
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"fmt"
|
|
||||||
"io/ioutil"
|
|
||||||
"math/big"
|
|
||||||
"os"
|
|
||||||
"strings"
|
|
||||||
|
|
||||||
"github.com/ethereum/go-ethereum/log"
|
|
||||||
"github.com/ethereum/go-ethereum/statediff/indexer/database/sql"
|
|
||||||
"github.com/ethereum/go-ethereum/statediff/indexer/models"
|
|
||||||
)
|
|
||||||
|
|
||||||
var (
|
|
||||||
knownGapsInsert = "INSERT INTO eth_meta.known_gaps (starting_block_number, ending_block_number, checked_out, processing_key) " +
|
|
||||||
"VALUES ('%s', '%s', %t, %d) " +
|
|
||||||
"ON CONFLICT (starting_block_number) DO UPDATE SET (ending_block_number, processing_key) = ('%s', %d) " +
|
|
||||||
"WHERE eth_meta.known_gaps.ending_block_number <= '%s';\n"
|
|
||||||
dbQueryString = "SELECT MAX(block_number) FROM eth.header_cids"
|
|
||||||
defaultWriteFilePath = "./known_gaps.sql"
|
|
||||||
)
|
|
||||||
|
|
||||||
type KnownGapsState struct {
|
|
||||||
// Should we check for gaps by looking at the DB and comparing the latest block with head
|
|
||||||
checkForGaps bool
|
|
||||||
// Arbitrary processingKey that can be used down the line to differentiate different geth nodes.
|
|
||||||
processingKey int64
|
|
||||||
// This number indicates the expected difference between blocks.
|
|
||||||
// Currently, this is 1 since the geth node processes each block. But down the road this can be used in
|
|
||||||
// Tandom with the processingKey to differentiate block processing logic.
|
|
||||||
expectedDifference *big.Int
|
|
||||||
// Indicates if Geth is in an error state
|
|
||||||
// This is used to indicate the right time to upserts
|
|
||||||
errorState bool
|
|
||||||
// This array keeps track of errorBlocks as they occur.
|
|
||||||
// When the errorState is false again, we can process these blocks.
|
|
||||||
// Do we need a list, can we have /KnownStartErrorBlock and knownEndErrorBlock ints instead?
|
|
||||||
knownErrorBlocks []*big.Int
|
|
||||||
// The filepath to write SQL statements if we can't connect to the DB.
|
|
||||||
writeFilePath string
|
|
||||||
// DB object to use for reading and writing to the DB
|
|
||||||
db sql.Database
|
|
||||||
//Do we have entries in the local sql file that need to be written to the DB
|
|
||||||
sqlFileWaitingForWrite bool
|
|
||||||
// Metrics object used to track metrics.
|
|
||||||
statediffMetrics statediffMetricsHandles
|
|
||||||
}
|
|
||||||
|
|
||||||
// Create a new KnownGapsState struct, currently unused.
|
|
||||||
func NewKnownGapsState(checkForGaps bool, processingKey int64, expectedDifference *big.Int, errorState bool, writeFilePath string, db sql.Database, statediffMetrics statediffMetricsHandles) *KnownGapsState {
|
|
||||||
return &KnownGapsState{
|
|
||||||
checkForGaps: checkForGaps,
|
|
||||||
processingKey: processingKey,
|
|
||||||
expectedDifference: expectedDifference,
|
|
||||||
errorState: errorState,
|
|
||||||
writeFilePath: writeFilePath,
|
|
||||||
db: db,
|
|
||||||
statediffMetrics: statediffMetrics,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func minMax(array []*big.Int) (*big.Int, *big.Int) {
|
|
||||||
var max *big.Int = array[0]
|
|
||||||
var min *big.Int = array[0]
|
|
||||||
for _, value := range array {
|
|
||||||
if max.Cmp(value) == -1 {
|
|
||||||
max = value
|
|
||||||
}
|
|
||||||
if min.Cmp(value) == 1 {
|
|
||||||
min = value
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return min, max
|
|
||||||
}
|
|
||||||
|
|
||||||
// This function actually performs the write of the known gaps. It will try to do the following, it only goes to the next step if a failure occurs.
|
|
||||||
// 1. Write to the DB directly.
|
|
||||||
// 2. Write to sql file locally.
|
|
||||||
// 3. Write to prometheus directly.
|
|
||||||
// 4. Logs and error.
|
|
||||||
func (kg *KnownGapsState) pushKnownGaps(startingBlockNumber *big.Int, endingBlockNumber *big.Int, checkedOut bool, processingKey int64) error {
|
|
||||||
if startingBlockNumber.Cmp(endingBlockNumber) == 1 {
|
|
||||||
return fmt.Errorf("Starting Block %d, is greater than ending block %d", startingBlockNumber, endingBlockNumber)
|
|
||||||
}
|
|
||||||
knownGap := models.KnownGapsModel{
|
|
||||||
StartingBlockNumber: startingBlockNumber.String(),
|
|
||||||
EndingBlockNumber: endingBlockNumber.String(),
|
|
||||||
CheckedOut: checkedOut,
|
|
||||||
ProcessingKey: processingKey,
|
|
||||||
}
|
|
||||||
|
|
||||||
log.Info("Updating Metrics for the start and end block")
|
|
||||||
kg.statediffMetrics.knownGapStart.Update(startingBlockNumber.Int64())
|
|
||||||
kg.statediffMetrics.knownGapEnd.Update(endingBlockNumber.Int64())
|
|
||||||
|
|
||||||
var writeErr error
|
|
||||||
log.Info("Writing known gaps to the DB")
|
|
||||||
if kg.db != nil {
|
|
||||||
dbErr := kg.upsertKnownGaps(knownGap)
|
|
||||||
if dbErr != nil {
|
|
||||||
log.Warn("Error writing knownGaps to DB, writing them to file instead")
|
|
||||||
writeErr = kg.upsertKnownGapsFile(knownGap)
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
writeErr = kg.upsertKnownGapsFile(knownGap)
|
|
||||||
}
|
|
||||||
if writeErr != nil {
|
|
||||||
log.Error("Unsuccessful when writing to a file", "Error", writeErr)
|
|
||||||
log.Error("Updating Metrics for the start and end error block")
|
|
||||||
log.Error("Unable to write the following Gaps to DB or File", "startBlock", startingBlockNumber, "endBlock", endingBlockNumber)
|
|
||||||
kg.statediffMetrics.knownGapErrorStart.Update(startingBlockNumber.Int64())
|
|
||||||
kg.statediffMetrics.knownGapErrorEnd.Update(endingBlockNumber.Int64())
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// This is a simple wrapper function to write gaps from a knownErrorBlocks array.
|
|
||||||
func (kg *KnownGapsState) captureErrorBlocks(knownErrorBlocks []*big.Int) {
|
|
||||||
startErrorBlock, endErrorBlock := minMax(knownErrorBlocks)
|
|
||||||
|
|
||||||
log.Warn("The following Gaps were found", "knownErrorBlocks", knownErrorBlocks)
|
|
||||||
log.Warn("Updating known Gaps table", "startErrorBlock", startErrorBlock, "endErrorBlock", endErrorBlock, "processingKey", kg.processingKey)
|
|
||||||
kg.pushKnownGaps(startErrorBlock, endErrorBlock, false, kg.processingKey)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Users provide the latestBlockInDb and the latestBlockOnChain
|
|
||||||
// as well as the expected difference. This function does some simple math.
|
|
||||||
// The expected difference for the time being is going to be 1, but as we run
|
|
||||||
// More geth nodes, the expected difference might fluctuate.
|
|
||||||
func isGap(latestBlockInDb *big.Int, latestBlockOnChain *big.Int, expectedDifference *big.Int) bool {
|
|
||||||
latestBlock := big.NewInt(0)
|
|
||||||
if latestBlock.Sub(latestBlockOnChain, expectedDifference).Cmp(latestBlockInDb) != 0 {
|
|
||||||
log.Warn("We found a gap", "latestBlockInDb", latestBlockInDb, "latestBlockOnChain", latestBlockOnChain, "expectedDifference", expectedDifference)
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
// This function will check for Gaps and update the DB if gaps are found.
|
|
||||||
// The processingKey will currently be set to 0, but as we start to leverage horizontal scaling
|
|
||||||
// It might be a useful parameter to update depending on the geth node.
|
|
||||||
// TODO:
|
|
||||||
// REmove the return value
|
|
||||||
// Write to file if err in writing to DB
|
|
||||||
func (kg *KnownGapsState) findAndUpdateGaps(latestBlockOnChain *big.Int, expectedDifference *big.Int, processingKey int64) error {
|
|
||||||
// Make this global
|
|
||||||
latestBlockInDb, err := kg.queryDbToBigInt(dbQueryString)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
gapExists := isGap(latestBlockInDb, latestBlockOnChain, expectedDifference)
|
|
||||||
if gapExists {
|
|
||||||
startBlock := big.NewInt(0)
|
|
||||||
endBlock := big.NewInt(0)
|
|
||||||
startBlock.Add(latestBlockInDb, expectedDifference)
|
|
||||||
endBlock.Sub(latestBlockOnChain, expectedDifference)
|
|
||||||
|
|
||||||
log.Warn("Found Gaps starting at", "startBlock", startBlock, "endingBlock", endBlock)
|
|
||||||
err := kg.pushKnownGaps(startBlock, endBlock, false, processingKey)
|
|
||||||
if err != nil {
|
|
||||||
log.Error("We were unable to write the following gap to the DB", "start Block", startBlock, "endBlock", endBlock, "error", err)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Upserts known gaps to the DB.
|
|
||||||
func (kg *KnownGapsState) upsertKnownGaps(knownGaps models.KnownGapsModel) error {
|
|
||||||
_, err := kg.db.Exec(context.Background(), kg.db.InsertKnownGapsStm(),
|
|
||||||
knownGaps.StartingBlockNumber, knownGaps.EndingBlockNumber, knownGaps.CheckedOut, knownGaps.ProcessingKey)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("error upserting known_gaps entry: %v", err)
|
|
||||||
}
|
|
||||||
log.Info("Successfully Wrote gaps to the DB", "startBlock", knownGaps.StartingBlockNumber, "endBlock", knownGaps.EndingBlockNumber)
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Write upsert statement into a local file.
|
|
||||||
func (kg *KnownGapsState) upsertKnownGapsFile(knownGaps models.KnownGapsModel) error {
|
|
||||||
insertStmt := []byte(fmt.Sprintf(knownGapsInsert, knownGaps.StartingBlockNumber, knownGaps.EndingBlockNumber, knownGaps.CheckedOut, knownGaps.ProcessingKey,
|
|
||||||
knownGaps.EndingBlockNumber, knownGaps.ProcessingKey, knownGaps.EndingBlockNumber))
|
|
||||||
log.Info("Trying to write file")
|
|
||||||
if kg.writeFilePath == "" {
|
|
||||||
kg.writeFilePath = defaultWriteFilePath
|
|
||||||
}
|
|
||||||
f, err := os.OpenFile(kg.writeFilePath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
|
|
||||||
if err != nil {
|
|
||||||
log.Info("Unable to open a file for writing")
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
defer f.Close()
|
|
||||||
|
|
||||||
if _, err = f.Write(insertStmt); err != nil {
|
|
||||||
log.Info("Unable to open write insert statement to file")
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
log.Info("Wrote the gaps to a local SQL file")
|
|
||||||
kg.sqlFileWaitingForWrite = true
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (kg *KnownGapsState) writeSqlFileStmtToDb() error {
|
|
||||||
log.Info("Writing the local SQL file for KnownGaps to the DB")
|
|
||||||
file, err := ioutil.ReadFile(kg.writeFilePath)
|
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
log.Error("Unable to open local SQL File for writing")
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
requests := strings.Split(string(file), ";")
|
|
||||||
|
|
||||||
for _, request := range requests {
|
|
||||||
_, err := kg.db.Exec(context.Background(), request)
|
|
||||||
if err != nil {
|
|
||||||
log.Error("Unable to run insert statement from file to the DB")
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if err := os.Truncate(kg.writeFilePath, 0); err != nil {
|
|
||||||
log.Info("Failed to empty knownGaps file after inserting statements to the DB", "error", err)
|
|
||||||
}
|
|
||||||
kg.sqlFileWaitingForWrite = false
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// This is a simple wrapper function which will run QueryRow on the DB
|
|
||||||
func (kg *KnownGapsState) queryDb(queryString string) (string, error) {
|
|
||||||
var ret string
|
|
||||||
err := kg.db.QueryRow(context.Background(), queryString).Scan(&ret)
|
|
||||||
if err != nil {
|
|
||||||
log.Error(fmt.Sprint("Can't properly query the DB for query: ", queryString))
|
|
||||||
return "", err
|
|
||||||
}
|
|
||||||
return ret, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// This function is a simple wrapper which will call QueryDb but the return value will be
|
|
||||||
// a big int instead of a string
|
|
||||||
func (kg *KnownGapsState) queryDbToBigInt(queryString string) (*big.Int, error) {
|
|
||||||
ret := new(big.Int)
|
|
||||||
res, err := kg.queryDb(queryString)
|
|
||||||
if err != nil {
|
|
||||||
return ret, err
|
|
||||||
}
|
|
||||||
ret, ok := ret.SetString(res, 10)
|
|
||||||
if !ok {
|
|
||||||
log.Error(fmt.Sprint("Can't turn the res ", res, "into a bigInt"))
|
|
||||||
return ret, fmt.Errorf("Can't turn %s into a bigInt", res)
|
|
||||||
}
|
|
||||||
return ret, nil
|
|
||||||
}
|
|
@ -1,207 +0,0 @@
|
|||||||
package statediff
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"fmt"
|
|
||||||
"math/big"
|
|
||||||
"os"
|
|
||||||
"testing"
|
|
||||||
|
|
||||||
"github.com/ethereum/go-ethereum/metrics"
|
|
||||||
"github.com/ethereum/go-ethereum/statediff/indexer/database/sql"
|
|
||||||
"github.com/ethereum/go-ethereum/statediff/indexer/database/sql/postgres"
|
|
||||||
"github.com/stretchr/testify/require"
|
|
||||||
)
|
|
||||||
|
|
||||||
var (
|
|
||||||
knownGapsFilePath = "./known_gaps.sql"
|
|
||||||
)
|
|
||||||
|
|
||||||
type gapValues struct {
|
|
||||||
knownErrorBlocksStart int64
|
|
||||||
knownErrorBlocksEnd int64
|
|
||||||
expectedDif int64
|
|
||||||
processingKey int64
|
|
||||||
}
|
|
||||||
|
|
||||||
// Add clean db
|
|
||||||
// Test for failures when they are expected, when we go from smaller block to larger block
|
|
||||||
// We should no longer see the smaller block in DB
|
|
||||||
func TestKnownGaps(t *testing.T) {
|
|
||||||
tests := []gapValues{
|
|
||||||
// Known Gaps
|
|
||||||
{knownErrorBlocksStart: 115, knownErrorBlocksEnd: 120, expectedDif: 1, processingKey: 1},
|
|
||||||
/// Same tests as above with a new expected DIF
|
|
||||||
{knownErrorBlocksStart: 1150, knownErrorBlocksEnd: 1200, expectedDif: 2, processingKey: 2},
|
|
||||||
// Test update when block number is larger!!
|
|
||||||
{knownErrorBlocksStart: 1150, knownErrorBlocksEnd: 1204, expectedDif: 2, processingKey: 2},
|
|
||||||
// Update when processing key is different!
|
|
||||||
{knownErrorBlocksStart: 1150, knownErrorBlocksEnd: 1204, expectedDif: 2, processingKey: 10},
|
|
||||||
}
|
|
||||||
|
|
||||||
testWriteToDb(t, tests, true)
|
|
||||||
testWriteToFile(t, tests, true)
|
|
||||||
testFindAndUpdateGaps(t, true)
|
|
||||||
}
|
|
||||||
|
|
||||||
// test writing blocks to the DB
|
|
||||||
func testWriteToDb(t *testing.T, tests []gapValues, wipeDbBeforeStart bool) {
|
|
||||||
t.Log("Starting Write to DB test")
|
|
||||||
db := setupDb(t)
|
|
||||||
|
|
||||||
// Clear Table first, this is needed because we updated an entry to have a larger endblock number
|
|
||||||
// so we can't find the original start and endblock pair.
|
|
||||||
if wipeDbBeforeStart {
|
|
||||||
t.Log("Cleaning up eth_meta.known_gaps table")
|
|
||||||
db.Exec(context.Background(), "DELETE FROM eth_meta.known_gaps")
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, tc := range tests {
|
|
||||||
// Create an array with knownGaps based on user inputs
|
|
||||||
knownGaps := KnownGapsState{
|
|
||||||
processingKey: tc.processingKey,
|
|
||||||
expectedDifference: big.NewInt(tc.expectedDif),
|
|
||||||
db: db,
|
|
||||||
statediffMetrics: RegisterStatediffMetrics(metrics.DefaultRegistry),
|
|
||||||
}
|
|
||||||
service := &Service{
|
|
||||||
KnownGaps: knownGaps,
|
|
||||||
}
|
|
||||||
knownErrorBlocks := (make([]*big.Int, 0))
|
|
||||||
knownErrorBlocks = createKnownErrorBlocks(knownErrorBlocks, tc.knownErrorBlocksStart, tc.knownErrorBlocksEnd)
|
|
||||||
service.KnownGaps.knownErrorBlocks = knownErrorBlocks
|
|
||||||
// Upsert
|
|
||||||
testCaptureErrorBlocks(t, service)
|
|
||||||
// Validate that the upsert was done correctly.
|
|
||||||
validateUpsert(t, service, tc.knownErrorBlocksStart, tc.knownErrorBlocksEnd)
|
|
||||||
}
|
|
||||||
tearDown(t, db)
|
|
||||||
}
|
|
||||||
|
|
||||||
// test writing blocks to file and then inserting them to DB
|
|
||||||
func testWriteToFile(t *testing.T, tests []gapValues, wipeDbBeforeStart bool) {
|
|
||||||
t.Log("Starting write to file test")
|
|
||||||
db := setupDb(t)
|
|
||||||
// Clear Table first, this is needed because we updated an entry to have a larger endblock number
|
|
||||||
// so we can't find the original start and endblock pair.
|
|
||||||
if wipeDbBeforeStart {
|
|
||||||
t.Log("Cleaning up eth_meta.known_gaps table")
|
|
||||||
db.Exec(context.Background(), "DELETE FROM eth_meta.known_gaps")
|
|
||||||
}
|
|
||||||
if _, err := os.Stat(knownGapsFilePath); err == nil {
|
|
||||||
err := os.Remove(knownGapsFilePath)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatal("Can't delete local file")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
tearDown(t, db)
|
|
||||||
for _, tc := range tests {
|
|
||||||
knownGaps := KnownGapsState{
|
|
||||||
processingKey: tc.processingKey,
|
|
||||||
expectedDifference: big.NewInt(tc.expectedDif),
|
|
||||||
writeFilePath: knownGapsFilePath,
|
|
||||||
statediffMetrics: RegisterStatediffMetrics(metrics.DefaultRegistry),
|
|
||||||
db: nil, // Only set to nil to be verbose that we can't use it
|
|
||||||
}
|
|
||||||
service := &Service{
|
|
||||||
KnownGaps: knownGaps,
|
|
||||||
}
|
|
||||||
knownErrorBlocks := (make([]*big.Int, 0))
|
|
||||||
knownErrorBlocks = createKnownErrorBlocks(knownErrorBlocks, tc.knownErrorBlocksStart, tc.knownErrorBlocksEnd)
|
|
||||||
service.KnownGaps.knownErrorBlocks = knownErrorBlocks
|
|
||||||
|
|
||||||
testCaptureErrorBlocks(t, service)
|
|
||||||
newDb := setupDb(t)
|
|
||||||
service.KnownGaps.db = newDb
|
|
||||||
if service.KnownGaps.sqlFileWaitingForWrite {
|
|
||||||
writeErr := service.KnownGaps.writeSqlFileStmtToDb()
|
|
||||||
require.NoError(t, writeErr)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Validate that the upsert was done correctly.
|
|
||||||
validateUpsert(t, service, tc.knownErrorBlocksStart, tc.knownErrorBlocksEnd)
|
|
||||||
tearDown(t, newDb)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Find a gap, if no gaps exist, it will create an arbitrary one
|
|
||||||
func testFindAndUpdateGaps(t *testing.T, wipeDbBeforeStart bool) {
|
|
||||||
db := setupDb(t)
|
|
||||||
|
|
||||||
if wipeDbBeforeStart {
|
|
||||||
db.Exec(context.Background(), "DELETE FROM eth_meta.known_gaps")
|
|
||||||
}
|
|
||||||
knownGaps := KnownGapsState{
|
|
||||||
processingKey: 1,
|
|
||||||
expectedDifference: big.NewInt(1),
|
|
||||||
db: db,
|
|
||||||
statediffMetrics: RegisterStatediffMetrics(metrics.DefaultRegistry),
|
|
||||||
}
|
|
||||||
service := &Service{
|
|
||||||
KnownGaps: knownGaps,
|
|
||||||
}
|
|
||||||
|
|
||||||
latestBlockInDb, err := service.KnownGaps.queryDbToBigInt("SELECT MAX(block_number) FROM eth.header_cids")
|
|
||||||
if err != nil {
|
|
||||||
t.Skip("Can't find a block in the eth.header_cids table.. Please put one there")
|
|
||||||
}
|
|
||||||
|
|
||||||
// Add the gapDifference for testing purposes
|
|
||||||
gapDifference := big.NewInt(10) // Set a difference between latestBlock in DB and on Chain
|
|
||||||
expectedDifference := big.NewInt(1) // Set what the expected difference between latestBlock in DB and on Chain should be
|
|
||||||
|
|
||||||
latestBlockOnChain := big.NewInt(0)
|
|
||||||
latestBlockOnChain.Add(latestBlockInDb, gapDifference)
|
|
||||||
|
|
||||||
t.Log("The latest block on the chain is: ", latestBlockOnChain)
|
|
||||||
t.Log("The latest block on the DB is: ", latestBlockInDb)
|
|
||||||
|
|
||||||
gapUpsertErr := service.KnownGaps.findAndUpdateGaps(latestBlockOnChain, expectedDifference, 0)
|
|
||||||
require.NoError(t, gapUpsertErr)
|
|
||||||
|
|
||||||
startBlock := big.NewInt(0)
|
|
||||||
endBlock := big.NewInt(0)
|
|
||||||
|
|
||||||
startBlock.Add(latestBlockInDb, expectedDifference)
|
|
||||||
endBlock.Sub(latestBlockOnChain, expectedDifference)
|
|
||||||
validateUpsert(t, service, startBlock.Int64(), endBlock.Int64())
|
|
||||||
}
|
|
||||||
|
|
||||||
// test capturing missed blocks
|
|
||||||
func testCaptureErrorBlocks(t *testing.T, service *Service) {
|
|
||||||
service.KnownGaps.captureErrorBlocks(service.KnownGaps.knownErrorBlocks)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Helper function to create an array of gaps given a start and end block
|
|
||||||
func createKnownErrorBlocks(knownErrorBlocks []*big.Int, knownErrorBlocksStart int64, knownErrorBlocksEnd int64) []*big.Int {
|
|
||||||
for i := knownErrorBlocksStart; i <= knownErrorBlocksEnd; i++ {
|
|
||||||
knownErrorBlocks = append(knownErrorBlocks, big.NewInt(i))
|
|
||||||
}
|
|
||||||
return knownErrorBlocks
|
|
||||||
}
|
|
||||||
|
|
||||||
// Make sure the upsert was performed correctly
|
|
||||||
func validateUpsert(t *testing.T, service *Service, startingBlock int64, endingBlock int64) {
|
|
||||||
t.Logf("Starting to query blocks: %d - %d", startingBlock, endingBlock)
|
|
||||||
queryString := fmt.Sprintf("SELECT starting_block_number from eth_meta.known_gaps WHERE starting_block_number = %d AND ending_block_number = %d", startingBlock, endingBlock)
|
|
||||||
|
|
||||||
_, queryErr := service.KnownGaps.queryDb(queryString) // Figure out the string.
|
|
||||||
t.Logf("Updated Known Gaps table starting from, %d, and ending at, %d", startingBlock, endingBlock)
|
|
||||||
require.NoError(t, queryErr)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Create a DB object to use
|
|
||||||
func setupDb(t *testing.T) sql.Database {
|
|
||||||
db, err := postgres.SetupSQLXDB()
|
|
||||||
if err != nil {
|
|
||||||
t.Error("Can't create a DB connection....")
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
return db
|
|
||||||
}
|
|
||||||
|
|
||||||
// Teardown the DB
|
|
||||||
func tearDown(t *testing.T, db sql.Database) {
|
|
||||||
t.Log("Starting tearDown")
|
|
||||||
db.Close()
|
|
||||||
}
|
|
@ -42,10 +42,8 @@ import (
|
|||||||
"github.com/ethereum/go-ethereum/rlp"
|
"github.com/ethereum/go-ethereum/rlp"
|
||||||
"github.com/ethereum/go-ethereum/rpc"
|
"github.com/ethereum/go-ethereum/rpc"
|
||||||
ind "github.com/ethereum/go-ethereum/statediff/indexer"
|
ind "github.com/ethereum/go-ethereum/statediff/indexer"
|
||||||
"github.com/ethereum/go-ethereum/statediff/indexer/database/sql"
|
|
||||||
"github.com/ethereum/go-ethereum/statediff/indexer/interfaces"
|
"github.com/ethereum/go-ethereum/statediff/indexer/interfaces"
|
||||||
nodeinfo "github.com/ethereum/go-ethereum/statediff/indexer/node"
|
nodeinfo "github.com/ethereum/go-ethereum/statediff/indexer/node"
|
||||||
"github.com/ethereum/go-ethereum/statediff/indexer/shared"
|
|
||||||
types2 "github.com/ethereum/go-ethereum/statediff/types"
|
types2 "github.com/ethereum/go-ethereum/statediff/types"
|
||||||
"github.com/ethereum/go-ethereum/trie"
|
"github.com/ethereum/go-ethereum/trie"
|
||||||
"github.com/thoas/go-funk"
|
"github.com/thoas/go-funk"
|
||||||
@ -134,9 +132,7 @@ type Service struct {
|
|||||||
BackendAPI ethapi.Backend
|
BackendAPI ethapi.Backend
|
||||||
// Should the statediff service wait for geth to sync to head?
|
// Should the statediff service wait for geth to sync to head?
|
||||||
WaitForSync bool
|
WaitForSync bool
|
||||||
// Used to signal if we should check for KnownGaps
|
// Whether we have any subscribers
|
||||||
KnownGaps KnownGapsState
|
|
||||||
// Whether or not we have any subscribers; only if we do, do we processes state diffs
|
|
||||||
subscribers int32
|
subscribers int32
|
||||||
// Interface for publishing statediffs as PG-IPLD objects
|
// Interface for publishing statediffs as PG-IPLD objects
|
||||||
indexer interfaces.StateDiffIndexer
|
indexer interfaces.StateDiffIndexer
|
||||||
@ -167,7 +163,6 @@ func NewBlockCache(max uint) BlockCache {
|
|||||||
func New(stack *node.Node, ethServ *eth.Ethereum, cfg *ethconfig.Config, params Config, backend ethapi.Backend) error {
|
func New(stack *node.Node, ethServ *eth.Ethereum, cfg *ethconfig.Config, params Config, backend ethapi.Backend) error {
|
||||||
blockChain := ethServ.BlockChain()
|
blockChain := ethServ.BlockChain()
|
||||||
var indexer interfaces.StateDiffIndexer
|
var indexer interfaces.StateDiffIndexer
|
||||||
var db sql.Database
|
|
||||||
var err error
|
var err error
|
||||||
quitCh := make(chan bool)
|
quitCh := make(chan bool)
|
||||||
indexerConfigAvailable := params.IndexerConfig != nil
|
indexerConfigAvailable := params.IndexerConfig != nil
|
||||||
@ -180,7 +175,7 @@ func New(stack *node.Node, ethServ *eth.Ethereum, cfg *ethconfig.Config, params
|
|||||||
ClientName: params.ClientName,
|
ClientName: params.ClientName,
|
||||||
}
|
}
|
||||||
var err error
|
var err error
|
||||||
db, indexer, err = ind.NewStateDiffIndexer(params.Context, blockChain.Config(), info, params.IndexerConfig)
|
_, indexer, err = ind.NewStateDiffIndexer(params.Context, blockChain.Config(), info, params.IndexerConfig)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -191,25 +186,7 @@ func New(stack *node.Node, ethServ *eth.Ethereum, cfg *ethconfig.Config, params
|
|||||||
if workers == 0 {
|
if workers == 0 {
|
||||||
workers = 1
|
workers = 1
|
||||||
}
|
}
|
||||||
// If we ever have multiple processingKeys we can update them here
|
|
||||||
// along with the expectedDifference
|
|
||||||
knownGaps := &KnownGapsState{
|
|
||||||
processingKey: 0,
|
|
||||||
expectedDifference: big.NewInt(1),
|
|
||||||
errorState: false,
|
|
||||||
writeFilePath: params.KnownGapsFilePath,
|
|
||||||
db: db,
|
|
||||||
statediffMetrics: statediffMetrics,
|
|
||||||
sqlFileWaitingForWrite: false,
|
|
||||||
}
|
|
||||||
if indexerConfigAvailable {
|
|
||||||
if params.IndexerConfig.Type() == shared.POSTGRES {
|
|
||||||
knownGaps.checkForGaps = true
|
|
||||||
} else {
|
|
||||||
log.Info("We are not going to check for gaps on start up since we are not connected to Postgres!")
|
|
||||||
knownGaps.checkForGaps = false
|
|
||||||
}
|
|
||||||
}
|
|
||||||
sds := &Service{
|
sds := &Service{
|
||||||
Mutex: sync.Mutex{},
|
Mutex: sync.Mutex{},
|
||||||
BlockChain: blockChain,
|
BlockChain: blockChain,
|
||||||
@ -220,7 +197,6 @@ func New(stack *node.Node, ethServ *eth.Ethereum, cfg *ethconfig.Config, params
|
|||||||
BlockCache: NewBlockCache(workers),
|
BlockCache: NewBlockCache(workers),
|
||||||
BackendAPI: backend,
|
BackendAPI: backend,
|
||||||
WaitForSync: params.WaitForSync,
|
WaitForSync: params.WaitForSync,
|
||||||
KnownGaps: *knownGaps,
|
|
||||||
indexer: indexer,
|
indexer: indexer,
|
||||||
enableWriteLoop: params.EnableWriteLoop,
|
enableWriteLoop: params.EnableWriteLoop,
|
||||||
numWorkers: workers,
|
numWorkers: workers,
|
||||||
@ -355,43 +331,15 @@ func (sds *Service) writeLoopWorker(params workerParams) {
|
|||||||
sds.writeGenesisStateDiff(parentBlock, params.id)
|
sds.writeGenesisStateDiff(parentBlock, params.id)
|
||||||
}
|
}
|
||||||
|
|
||||||
// If for any reason we need to check for gaps,
|
|
||||||
// Check and update the gaps table.
|
|
||||||
if sds.KnownGaps.checkForGaps && !sds.KnownGaps.errorState {
|
|
||||||
log.Info("Checking for Gaps at", "current block", currentBlock.Number())
|
|
||||||
go sds.KnownGaps.findAndUpdateGaps(currentBlock.Number(), sds.KnownGaps.expectedDifference, sds.KnownGaps.processingKey)
|
|
||||||
sds.KnownGaps.checkForGaps = false
|
|
||||||
}
|
|
||||||
|
|
||||||
log.Info("Writing state diff", "block height", currentBlock.Number().Uint64(), "worker", params.id)
|
log.Info("Writing state diff", "block height", currentBlock.Number().Uint64(), "worker", params.id)
|
||||||
writeLoopParams.RLock()
|
writeLoopParams.RLock()
|
||||||
err := sds.writeStateDiffWithRetry(currentBlock, parentBlock.Root(), writeLoopParams.Params)
|
err := sds.writeStateDiffWithRetry(currentBlock, parentBlock.Root(), writeLoopParams.Params)
|
||||||
writeLoopParams.RUnlock()
|
writeLoopParams.RUnlock()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
// This is where the Postgres errors bubbles up to, so this is where we want to emit a comprehensie error trace/report
|
||||||
log.Error("statediff.Service.WriteLoop: processing error", "block height", currentBlock.Number().Uint64(), "error", err.Error(), "worker", params.id)
|
log.Error("statediff.Service.WriteLoop: processing error", "block height", currentBlock.Number().Uint64(), "error", err.Error(), "worker", params.id)
|
||||||
sds.KnownGaps.errorState = true
|
|
||||||
log.Warn("Updating the following block to knownErrorBlocks to be inserted into knownGaps table", "blockNumber", currentBlock.Number())
|
|
||||||
sds.KnownGaps.knownErrorBlocks = append(sds.KnownGaps.knownErrorBlocks, currentBlock.Number())
|
|
||||||
// Write object to startdiff
|
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
sds.KnownGaps.errorState = false
|
|
||||||
if sds.KnownGaps.knownErrorBlocks != nil {
|
|
||||||
// We must pass in parameters by VALUE not reference.
|
|
||||||
// If we pass them in my reference, the references can change before the computation is complete!
|
|
||||||
staticKnownErrorBlocks := make([]*big.Int, len(sds.KnownGaps.knownErrorBlocks))
|
|
||||||
copy(staticKnownErrorBlocks, sds.KnownGaps.knownErrorBlocks)
|
|
||||||
sds.KnownGaps.knownErrorBlocks = nil
|
|
||||||
go sds.KnownGaps.captureErrorBlocks(staticKnownErrorBlocks)
|
|
||||||
}
|
|
||||||
|
|
||||||
if sds.KnownGaps.sqlFileWaitingForWrite {
|
|
||||||
log.Info("There are entries in the SQL file for knownGaps that should be written")
|
|
||||||
err := sds.KnownGaps.writeSqlFileStmtToDb()
|
|
||||||
if err != nil {
|
|
||||||
log.Error("Unable to write KnownGap sql file to DB")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// TODO: how to handle with concurrent workers
|
// TODO: how to handle with concurrent workers
|
||||||
statediffMetrics.lastStatediffHeight.Update(int64(currentBlock.Number().Uint64()))
|
statediffMetrics.lastStatediffHeight.Update(int64(currentBlock.Number().Uint64()))
|
||||||
@ -644,7 +592,7 @@ func (sds *Service) Unsubscribe(id rpc.ID) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// This function will check the status of geth syncing.
|
// GetSyncStatus will check the status of geth syncing.
|
||||||
// It will return false if geth has finished syncing.
|
// It will return false if geth has finished syncing.
|
||||||
// It will return a true Geth is still syncing.
|
// It will return a true Geth is still syncing.
|
||||||
func (sds *Service) GetSyncStatus(pubEthAPI *ethapi.EthereumAPI) (bool, error) {
|
func (sds *Service) GetSyncStatus(pubEthAPI *ethapi.EthereumAPI) (bool, error) {
|
||||||
@ -659,7 +607,7 @@ func (sds *Service) GetSyncStatus(pubEthAPI *ethapi.EthereumAPI) (bool, error) {
|
|||||||
return false, err
|
return false, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// This function calls GetSyncStatus to check if we have caught up to head.
|
// WaitingForSync calls GetSyncStatus to check if we have caught up to head.
|
||||||
// It will keep looking and checking if we have caught up to head.
|
// It will keep looking and checking if we have caught up to head.
|
||||||
// It will only complete if we catch up to head, otherwise it will keep looping forever.
|
// It will only complete if we catch up to head, otherwise it will keep looping forever.
|
||||||
func (sds *Service) WaitingForSync() error {
|
func (sds *Service) WaitingForSync() error {
|
||||||
@ -901,7 +849,7 @@ func (sds *Service) writeStateDiffWithRetry(block *types.Block, parentRoot commo
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// Performs one of following operations on the watched addresses in writeLoopParams and the db:
|
// WatchAddress performs one of following operations on the watched addresses in writeLoopParams and the db:
|
||||||
// add | remove | set | clear
|
// add | remove | set | clear
|
||||||
func (sds *Service) WatchAddress(operation types2.OperationType, args []types2.WatchAddressArg) error {
|
func (sds *Service) WatchAddress(operation types2.OperationType, args []types2.WatchAddressArg) error {
|
||||||
// lock writeLoopParams for a write
|
// lock writeLoopParams for a write
|
||||||
|
Loading…
Reference in New Issue
Block a user