remove logrus, use geth logging pkg
This commit is contained in:
parent
eb482ee783
commit
2b033f2f3c
4
go.mod
4
go.mod
@ -77,7 +77,7 @@ require (
|
||||
golang.org/x/crypto v0.0.0-20220829220503-c86fa9a7ed90
|
||||
golang.org/x/mobile v0.0.0-20190719004257-d2bd2a29d028
|
||||
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
|
||||
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8
|
||||
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a
|
||||
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211
|
||||
golang.org/x/text v0.3.7
|
||||
golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba
|
||||
@ -85,8 +85,6 @@ require (
|
||||
gopkg.in/natefinch/npipe.v2 v2.0.0-20160621034901-c1b8fa8bdcce
|
||||
)
|
||||
|
||||
require github.com/sirupsen/logrus v1.9.0
|
||||
|
||||
require (
|
||||
github.com/Azure/azure-sdk-for-go/sdk/azcore v0.21.1 // indirect
|
||||
github.com/Azure/azure-sdk-for-go/sdk/internal v0.8.3 // indirect
|
||||
|
||||
4
go.sum
4
go.sum
@ -250,7 +250,6 @@ github.com/holiman/bloomfilter/v2 v2.0.3 h1:73e0e/V0tCydx14a0SCYS/EWCxgwLZ18CZcZ
|
||||
github.com/holiman/bloomfilter/v2 v2.0.3/go.mod h1:zpoh+gs7qcpqrHr3dB55AMiJwo0iURXE7ZOP9L9hSkA=
|
||||
github.com/holiman/uint256 v1.2.0 h1:gpSYcPLWGv4sG43I2mVLiDZCNDh/EpGjSk8tmtxitHM=
|
||||
github.com/holiman/uint256 v1.2.0/go.mod h1:y4ga/t+u+Xwd7CpDgZESaRcWy0I7XMlTMA25ApIH5Jw=
|
||||
github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI=
|
||||
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
|
||||
github.com/huin/goupnp v1.0.3 h1:N8No57ls+MnjlB+JPiCVSOyy/ot7MJTqlo7rn+NYSqQ=
|
||||
github.com/huin/goupnp v1.0.3/go.mod h1:ZxNlw5WqJj6wSsRK5+YfflQGXYfccj5VgQsMNixHM7Y=
|
||||
@ -787,8 +786,6 @@ golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBc
|
||||
golang.org/x/sys v0.0.0-20211019181941-9d821ace8654/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a h1:dGzPydgVsqGcTRVwiLJ1jVbufYwmzD3LfVPLKsKg+0k=
|
||||
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8 h1:0A+M6Uqn+Eje4kHMK80dtF3JCXC4ykBgQG4Fe06QRhQ=
|
||||
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw=
|
||||
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
|
||||
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211 h1:JGgROgKl9N8DuW20oFS5gxc+lE67/N3FcwmBPMe7ArY=
|
||||
@ -904,7 +901,6 @@ gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8
|
||||
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
|
||||
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
|
||||
gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI=
|
||||
gopkg.in/fsnotify.v1 v1.4.7 h1:xOHLXZwVvI9hhs+cLKq5+I5onOuwQLhQwiu63xxlHs4=
|
||||
gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys=
|
||||
gopkg.in/inconshreveable/log15.v2 v2.0.0-20180818164646-67afb5ed74ec/go.mod h1:aPpfJ7XW+gOuirDoZ8gHhLh3kZ1B08FtV2bbmy7Jv3s=
|
||||
gopkg.in/natefinch/npipe.v2 v2.0.0-20160621034901-c1b8fa8bdcce h1:+JknDZhAj8YMt7GC73Ei8pv4MzjDUNPHgQWJdtMAaDU=
|
||||
|
||||
@ -1,273 +0,0 @@
|
||||
// Copyright 2019 The go-ethereum Authors
|
||||
// This file is part of the go-ethereum library.
|
||||
//
|
||||
// The go-ethereum library is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU Lesser General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
//
|
||||
// The go-ethereum library is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU Lesser General Public License for more details.
|
||||
//
|
||||
// You should have received a copy of the GNU Lesser General Public License
|
||||
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
package statediff
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"math/big"
|
||||
"os"
|
||||
"strings"
|
||||
|
||||
"github.com/ethereum/go-ethereum/log"
|
||||
"github.com/ethereum/go-ethereum/statediff/indexer/database/sql"
|
||||
"github.com/ethereum/go-ethereum/statediff/indexer/models"
|
||||
)
|
||||
|
||||
var (
|
||||
knownGapsInsert = "INSERT INTO eth_meta.known_gaps (starting_block_number, ending_block_number, checked_out, processing_key) " +
|
||||
"VALUES ('%s', '%s', %t, %d) " +
|
||||
"ON CONFLICT (starting_block_number) DO UPDATE SET (ending_block_number, processing_key) = ('%s', %d) " +
|
||||
"WHERE eth_meta.known_gaps.ending_block_number <= '%s';\n"
|
||||
dbQueryString = "SELECT MAX(block_number) FROM eth.header_cids"
|
||||
defaultWriteFilePath = "./known_gaps.sql"
|
||||
)
|
||||
|
||||
type KnownGapsState struct {
|
||||
// Should we check for gaps by looking at the DB and comparing the latest block with head
|
||||
checkForGaps bool
|
||||
// Arbitrary processingKey that can be used down the line to differentiate different geth nodes.
|
||||
processingKey int64
|
||||
// This number indicates the expected difference between blocks.
|
||||
// Currently, this is 1 since the geth node processes each block. But down the road this can be used in
|
||||
// Tandom with the processingKey to differentiate block processing logic.
|
||||
expectedDifference *big.Int
|
||||
// Indicates if Geth is in an error state
|
||||
// This is used to indicate the right time to upserts
|
||||
errorState bool
|
||||
// This array keeps track of errorBlocks as they occur.
|
||||
// When the errorState is false again, we can process these blocks.
|
||||
// Do we need a list, can we have /KnownStartErrorBlock and knownEndErrorBlock ints instead?
|
||||
knownErrorBlocks []*big.Int
|
||||
// The filepath to write SQL statements if we can't connect to the DB.
|
||||
writeFilePath string
|
||||
// DB object to use for reading and writing to the DB
|
||||
db sql.Database
|
||||
//Do we have entries in the local sql file that need to be written to the DB
|
||||
sqlFileWaitingForWrite bool
|
||||
// Metrics object used to track metrics.
|
||||
statediffMetrics statediffMetricsHandles
|
||||
}
|
||||
|
||||
// Create a new KnownGapsState struct, currently unused.
|
||||
func NewKnownGapsState(checkForGaps bool, processingKey int64, expectedDifference *big.Int, errorState bool, writeFilePath string, db sql.Database, statediffMetrics statediffMetricsHandles) *KnownGapsState {
|
||||
return &KnownGapsState{
|
||||
checkForGaps: checkForGaps,
|
||||
processingKey: processingKey,
|
||||
expectedDifference: expectedDifference,
|
||||
errorState: errorState,
|
||||
writeFilePath: writeFilePath,
|
||||
db: db,
|
||||
statediffMetrics: statediffMetrics,
|
||||
}
|
||||
}
|
||||
|
||||
func minMax(array []*big.Int) (*big.Int, *big.Int) {
|
||||
var max *big.Int = array[0]
|
||||
var min *big.Int = array[0]
|
||||
for _, value := range array {
|
||||
if max.Cmp(value) == -1 {
|
||||
max = value
|
||||
}
|
||||
if min.Cmp(value) == 1 {
|
||||
min = value
|
||||
}
|
||||
}
|
||||
return min, max
|
||||
}
|
||||
|
||||
// This function actually performs the write of the known gaps. It will try to do the following, it only goes to the next step if a failure occurs.
|
||||
// 1. Write to the DB directly.
|
||||
// 2. Write to sql file locally.
|
||||
// 3. Write to prometheus directly.
|
||||
// 4. Logs and error.
|
||||
func (kg *KnownGapsState) pushKnownGaps(startingBlockNumber *big.Int, endingBlockNumber *big.Int, checkedOut bool, processingKey int64) error {
|
||||
if startingBlockNumber.Cmp(endingBlockNumber) == 1 {
|
||||
return fmt.Errorf("Starting Block %d, is greater than ending block %d", startingBlockNumber, endingBlockNumber)
|
||||
}
|
||||
knownGap := models.KnownGapsModel{
|
||||
StartingBlockNumber: startingBlockNumber.String(),
|
||||
EndingBlockNumber: endingBlockNumber.String(),
|
||||
CheckedOut: checkedOut,
|
||||
ProcessingKey: processingKey,
|
||||
}
|
||||
|
||||
log.Info("Updating Metrics for the start and end block")
|
||||
kg.statediffMetrics.knownGapStart.Update(startingBlockNumber.Int64())
|
||||
kg.statediffMetrics.knownGapEnd.Update(endingBlockNumber.Int64())
|
||||
|
||||
var writeErr error
|
||||
log.Info("Writing known gaps to the DB")
|
||||
if kg.db != nil {
|
||||
dbErr := kg.upsertKnownGaps(knownGap)
|
||||
if dbErr != nil {
|
||||
log.Warn("Error writing knownGaps to DB, writing them to file instead")
|
||||
writeErr = kg.upsertKnownGapsFile(knownGap)
|
||||
}
|
||||
} else {
|
||||
writeErr = kg.upsertKnownGapsFile(knownGap)
|
||||
}
|
||||
if writeErr != nil {
|
||||
log.Error("Unsuccessful when writing to a file", "Error", writeErr)
|
||||
log.Error("Updating Metrics for the start and end error block")
|
||||
log.Error("Unable to write the following Gaps to DB or File", "startBlock", startingBlockNumber, "endBlock", endingBlockNumber)
|
||||
kg.statediffMetrics.knownGapErrorStart.Update(startingBlockNumber.Int64())
|
||||
kg.statediffMetrics.knownGapErrorEnd.Update(endingBlockNumber.Int64())
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// This is a simple wrapper function to write gaps from a knownErrorBlocks array.
|
||||
func (kg *KnownGapsState) captureErrorBlocks(knownErrorBlocks []*big.Int) {
|
||||
startErrorBlock, endErrorBlock := minMax(knownErrorBlocks)
|
||||
|
||||
log.Warn("The following Gaps were found", "knownErrorBlocks", knownErrorBlocks)
|
||||
log.Warn("Updating known Gaps table", "startErrorBlock", startErrorBlock, "endErrorBlock", endErrorBlock, "processingKey", kg.processingKey)
|
||||
kg.pushKnownGaps(startErrorBlock, endErrorBlock, false, kg.processingKey)
|
||||
}
|
||||
|
||||
// Users provide the latestBlockInDb and the latestBlockOnChain
|
||||
// as well as the expected difference. This function does some simple math.
|
||||
// The expected difference for the time being is going to be 1, but as we run
|
||||
// More geth nodes, the expected difference might fluctuate.
|
||||
func isGap(latestBlockInDb *big.Int, latestBlockOnChain *big.Int, expectedDifference *big.Int) bool {
|
||||
latestBlock := big.NewInt(0)
|
||||
if latestBlock.Sub(latestBlockOnChain, expectedDifference).Cmp(latestBlockInDb) != 0 {
|
||||
log.Warn("We found a gap", "latestBlockInDb", latestBlockInDb, "latestBlockOnChain", latestBlockOnChain, "expectedDifference", expectedDifference)
|
||||
return true
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// This function will check for Gaps and update the DB if gaps are found.
|
||||
// The processingKey will currently be set to 0, but as we start to leverage horizontal scaling
|
||||
// It might be a useful parameter to update depending on the geth node.
|
||||
// TODO:
|
||||
// REmove the return value
|
||||
// Write to file if err in writing to DB
|
||||
func (kg *KnownGapsState) findAndUpdateGaps(latestBlockOnChain *big.Int, expectedDifference *big.Int, processingKey int64) error {
|
||||
// Make this global
|
||||
latestBlockInDb, err := kg.queryDbToBigInt(dbQueryString)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
gapExists := isGap(latestBlockInDb, latestBlockOnChain, expectedDifference)
|
||||
if gapExists {
|
||||
startBlock := big.NewInt(0)
|
||||
endBlock := big.NewInt(0)
|
||||
startBlock.Add(latestBlockInDb, expectedDifference)
|
||||
endBlock.Sub(latestBlockOnChain, expectedDifference)
|
||||
|
||||
log.Warn("Found Gaps starting at", "startBlock", startBlock, "endingBlock", endBlock)
|
||||
err := kg.pushKnownGaps(startBlock, endBlock, false, processingKey)
|
||||
if err != nil {
|
||||
log.Error("We were unable to write the following gap to the DB", "start Block", startBlock, "endBlock", endBlock, "error", err)
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Upserts known gaps to the DB.
|
||||
func (kg *KnownGapsState) upsertKnownGaps(knownGaps models.KnownGapsModel) error {
|
||||
_, err := kg.db.Exec(context.Background(), kg.db.InsertKnownGapsStm(),
|
||||
knownGaps.StartingBlockNumber, knownGaps.EndingBlockNumber, knownGaps.CheckedOut, knownGaps.ProcessingKey)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error upserting known_gaps entry: %v", err)
|
||||
}
|
||||
log.Info("Successfully Wrote gaps to the DB", "startBlock", knownGaps.StartingBlockNumber, "endBlock", knownGaps.EndingBlockNumber)
|
||||
return nil
|
||||
}
|
||||
|
||||
// Write upsert statement into a local file.
|
||||
func (kg *KnownGapsState) upsertKnownGapsFile(knownGaps models.KnownGapsModel) error {
|
||||
insertStmt := []byte(fmt.Sprintf(knownGapsInsert, knownGaps.StartingBlockNumber, knownGaps.EndingBlockNumber, knownGaps.CheckedOut, knownGaps.ProcessingKey,
|
||||
knownGaps.EndingBlockNumber, knownGaps.ProcessingKey, knownGaps.EndingBlockNumber))
|
||||
log.Info("Trying to write file")
|
||||
if kg.writeFilePath == "" {
|
||||
kg.writeFilePath = defaultWriteFilePath
|
||||
}
|
||||
f, err := os.OpenFile(kg.writeFilePath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
|
||||
if err != nil {
|
||||
log.Info("Unable to open a file for writing")
|
||||
return err
|
||||
}
|
||||
defer f.Close()
|
||||
|
||||
if _, err = f.Write(insertStmt); err != nil {
|
||||
log.Info("Unable to open write insert statement to file")
|
||||
return err
|
||||
}
|
||||
log.Info("Wrote the gaps to a local SQL file")
|
||||
kg.sqlFileWaitingForWrite = true
|
||||
return nil
|
||||
}
|
||||
|
||||
func (kg *KnownGapsState) writeSqlFileStmtToDb() error {
|
||||
log.Info("Writing the local SQL file for KnownGaps to the DB")
|
||||
file, err := ioutil.ReadFile(kg.writeFilePath)
|
||||
|
||||
if err != nil {
|
||||
log.Error("Unable to open local SQL File for writing")
|
||||
return err
|
||||
}
|
||||
|
||||
requests := strings.Split(string(file), ";")
|
||||
|
||||
for _, request := range requests {
|
||||
_, err := kg.db.Exec(context.Background(), request)
|
||||
if err != nil {
|
||||
log.Error("Unable to run insert statement from file to the DB")
|
||||
return err
|
||||
}
|
||||
}
|
||||
if err := os.Truncate(kg.writeFilePath, 0); err != nil {
|
||||
log.Info("Failed to empty knownGaps file after inserting statements to the DB", "error", err)
|
||||
}
|
||||
kg.sqlFileWaitingForWrite = false
|
||||
return nil
|
||||
}
|
||||
|
||||
// This is a simple wrapper function which will run QueryRow on the DB
|
||||
func (kg *KnownGapsState) queryDb(queryString string) (string, error) {
|
||||
var ret string
|
||||
err := kg.db.QueryRow(context.Background(), queryString).Scan(&ret)
|
||||
if err != nil {
|
||||
log.Error(fmt.Sprint("Can't properly query the DB for query: ", queryString))
|
||||
return "", err
|
||||
}
|
||||
return ret, nil
|
||||
}
|
||||
|
||||
// This function is a simple wrapper which will call QueryDb but the return value will be
|
||||
// a big int instead of a string
|
||||
func (kg *KnownGapsState) queryDbToBigInt(queryString string) (*big.Int, error) {
|
||||
ret := new(big.Int)
|
||||
res, err := kg.queryDb(queryString)
|
||||
if err != nil {
|
||||
return ret, err
|
||||
}
|
||||
ret, ok := ret.SetString(res, 10)
|
||||
if !ok {
|
||||
log.Error(fmt.Sprint("Can't turn the res ", res, "into a bigInt"))
|
||||
return ret, fmt.Errorf("Can't turn %s into a bigInt", res)
|
||||
}
|
||||
return ret, nil
|
||||
}
|
||||
@ -1,207 +0,0 @@
|
||||
package statediff
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"math/big"
|
||||
"os"
|
||||
"testing"
|
||||
|
||||
"github.com/ethereum/go-ethereum/metrics"
|
||||
"github.com/ethereum/go-ethereum/statediff/indexer/database/sql"
|
||||
"github.com/ethereum/go-ethereum/statediff/indexer/database/sql/postgres"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
var (
|
||||
knownGapsFilePath = "./known_gaps.sql"
|
||||
)
|
||||
|
||||
type gapValues struct {
|
||||
knownErrorBlocksStart int64
|
||||
knownErrorBlocksEnd int64
|
||||
expectedDif int64
|
||||
processingKey int64
|
||||
}
|
||||
|
||||
// Add clean db
|
||||
// Test for failures when they are expected, when we go from smaller block to larger block
|
||||
// We should no longer see the smaller block in DB
|
||||
func TestKnownGaps(t *testing.T) {
|
||||
tests := []gapValues{
|
||||
// Known Gaps
|
||||
{knownErrorBlocksStart: 115, knownErrorBlocksEnd: 120, expectedDif: 1, processingKey: 1},
|
||||
/// Same tests as above with a new expected DIF
|
||||
{knownErrorBlocksStart: 1150, knownErrorBlocksEnd: 1200, expectedDif: 2, processingKey: 2},
|
||||
// Test update when block number is larger!!
|
||||
{knownErrorBlocksStart: 1150, knownErrorBlocksEnd: 1204, expectedDif: 2, processingKey: 2},
|
||||
// Update when processing key is different!
|
||||
{knownErrorBlocksStart: 1150, knownErrorBlocksEnd: 1204, expectedDif: 2, processingKey: 10},
|
||||
}
|
||||
|
||||
testWriteToDb(t, tests, true)
|
||||
testWriteToFile(t, tests, true)
|
||||
testFindAndUpdateGaps(t, true)
|
||||
}
|
||||
|
||||
// test writing blocks to the DB
|
||||
func testWriteToDb(t *testing.T, tests []gapValues, wipeDbBeforeStart bool) {
|
||||
t.Log("Starting Write to DB test")
|
||||
db := setupDb(t)
|
||||
|
||||
// Clear Table first, this is needed because we updated an entry to have a larger endblock number
|
||||
// so we can't find the original start and endblock pair.
|
||||
if wipeDbBeforeStart {
|
||||
t.Log("Cleaning up eth_meta.known_gaps table")
|
||||
db.Exec(context.Background(), "DELETE FROM eth_meta.known_gaps")
|
||||
}
|
||||
|
||||
for _, tc := range tests {
|
||||
// Create an array with knownGaps based on user inputs
|
||||
knownGaps := KnownGapsState{
|
||||
processingKey: tc.processingKey,
|
||||
expectedDifference: big.NewInt(tc.expectedDif),
|
||||
db: db,
|
||||
statediffMetrics: RegisterStatediffMetrics(metrics.DefaultRegistry),
|
||||
}
|
||||
service := &Service{
|
||||
KnownGaps: knownGaps,
|
||||
}
|
||||
knownErrorBlocks := (make([]*big.Int, 0))
|
||||
knownErrorBlocks = createKnownErrorBlocks(knownErrorBlocks, tc.knownErrorBlocksStart, tc.knownErrorBlocksEnd)
|
||||
service.KnownGaps.knownErrorBlocks = knownErrorBlocks
|
||||
// Upsert
|
||||
testCaptureErrorBlocks(t, service)
|
||||
// Validate that the upsert was done correctly.
|
||||
validateUpsert(t, service, tc.knownErrorBlocksStart, tc.knownErrorBlocksEnd)
|
||||
}
|
||||
tearDown(t, db)
|
||||
}
|
||||
|
||||
// test writing blocks to file and then inserting them to DB
|
||||
func testWriteToFile(t *testing.T, tests []gapValues, wipeDbBeforeStart bool) {
|
||||
t.Log("Starting write to file test")
|
||||
db := setupDb(t)
|
||||
// Clear Table first, this is needed because we updated an entry to have a larger endblock number
|
||||
// so we can't find the original start and endblock pair.
|
||||
if wipeDbBeforeStart {
|
||||
t.Log("Cleaning up eth_meta.known_gaps table")
|
||||
db.Exec(context.Background(), "DELETE FROM eth_meta.known_gaps")
|
||||
}
|
||||
if _, err := os.Stat(knownGapsFilePath); err == nil {
|
||||
err := os.Remove(knownGapsFilePath)
|
||||
if err != nil {
|
||||
t.Fatal("Can't delete local file")
|
||||
}
|
||||
}
|
||||
tearDown(t, db)
|
||||
for _, tc := range tests {
|
||||
knownGaps := KnownGapsState{
|
||||
processingKey: tc.processingKey,
|
||||
expectedDifference: big.NewInt(tc.expectedDif),
|
||||
writeFilePath: knownGapsFilePath,
|
||||
statediffMetrics: RegisterStatediffMetrics(metrics.DefaultRegistry),
|
||||
db: nil, // Only set to nil to be verbose that we can't use it
|
||||
}
|
||||
service := &Service{
|
||||
KnownGaps: knownGaps,
|
||||
}
|
||||
knownErrorBlocks := (make([]*big.Int, 0))
|
||||
knownErrorBlocks = createKnownErrorBlocks(knownErrorBlocks, tc.knownErrorBlocksStart, tc.knownErrorBlocksEnd)
|
||||
service.KnownGaps.knownErrorBlocks = knownErrorBlocks
|
||||
|
||||
testCaptureErrorBlocks(t, service)
|
||||
newDb := setupDb(t)
|
||||
service.KnownGaps.db = newDb
|
||||
if service.KnownGaps.sqlFileWaitingForWrite {
|
||||
writeErr := service.KnownGaps.writeSqlFileStmtToDb()
|
||||
require.NoError(t, writeErr)
|
||||
}
|
||||
|
||||
// Validate that the upsert was done correctly.
|
||||
validateUpsert(t, service, tc.knownErrorBlocksStart, tc.knownErrorBlocksEnd)
|
||||
tearDown(t, newDb)
|
||||
}
|
||||
}
|
||||
|
||||
// Find a gap, if no gaps exist, it will create an arbitrary one
|
||||
func testFindAndUpdateGaps(t *testing.T, wipeDbBeforeStart bool) {
|
||||
db := setupDb(t)
|
||||
|
||||
if wipeDbBeforeStart {
|
||||
db.Exec(context.Background(), "DELETE FROM eth_meta.known_gaps")
|
||||
}
|
||||
knownGaps := KnownGapsState{
|
||||
processingKey: 1,
|
||||
expectedDifference: big.NewInt(1),
|
||||
db: db,
|
||||
statediffMetrics: RegisterStatediffMetrics(metrics.DefaultRegistry),
|
||||
}
|
||||
service := &Service{
|
||||
KnownGaps: knownGaps,
|
||||
}
|
||||
|
||||
latestBlockInDb, err := service.KnownGaps.queryDbToBigInt("SELECT MAX(block_number) FROM eth.header_cids")
|
||||
if err != nil {
|
||||
t.Skip("Can't find a block in the eth.header_cids table.. Please put one there")
|
||||
}
|
||||
|
||||
// Add the gapDifference for testing purposes
|
||||
gapDifference := big.NewInt(10) // Set a difference between latestBlock in DB and on Chain
|
||||
expectedDifference := big.NewInt(1) // Set what the expected difference between latestBlock in DB and on Chain should be
|
||||
|
||||
latestBlockOnChain := big.NewInt(0)
|
||||
latestBlockOnChain.Add(latestBlockInDb, gapDifference)
|
||||
|
||||
t.Log("The latest block on the chain is: ", latestBlockOnChain)
|
||||
t.Log("The latest block on the DB is: ", latestBlockInDb)
|
||||
|
||||
gapUpsertErr := service.KnownGaps.findAndUpdateGaps(latestBlockOnChain, expectedDifference, 0)
|
||||
require.NoError(t, gapUpsertErr)
|
||||
|
||||
startBlock := big.NewInt(0)
|
||||
endBlock := big.NewInt(0)
|
||||
|
||||
startBlock.Add(latestBlockInDb, expectedDifference)
|
||||
endBlock.Sub(latestBlockOnChain, expectedDifference)
|
||||
validateUpsert(t, service, startBlock.Int64(), endBlock.Int64())
|
||||
}
|
||||
|
||||
// test capturing missed blocks
|
||||
func testCaptureErrorBlocks(t *testing.T, service *Service) {
|
||||
service.KnownGaps.captureErrorBlocks(service.KnownGaps.knownErrorBlocks)
|
||||
}
|
||||
|
||||
// Helper function to create an array of gaps given a start and end block
|
||||
func createKnownErrorBlocks(knownErrorBlocks []*big.Int, knownErrorBlocksStart int64, knownErrorBlocksEnd int64) []*big.Int {
|
||||
for i := knownErrorBlocksStart; i <= knownErrorBlocksEnd; i++ {
|
||||
knownErrorBlocks = append(knownErrorBlocks, big.NewInt(i))
|
||||
}
|
||||
return knownErrorBlocks
|
||||
}
|
||||
|
||||
// Make sure the upsert was performed correctly
|
||||
func validateUpsert(t *testing.T, service *Service, startingBlock int64, endingBlock int64) {
|
||||
t.Logf("Starting to query blocks: %d - %d", startingBlock, endingBlock)
|
||||
queryString := fmt.Sprintf("SELECT starting_block_number from eth_meta.known_gaps WHERE starting_block_number = %d AND ending_block_number = %d", startingBlock, endingBlock)
|
||||
|
||||
_, queryErr := service.KnownGaps.queryDb(queryString) // Figure out the string.
|
||||
t.Logf("Updated Known Gaps table starting from, %d, and ending at, %d", startingBlock, endingBlock)
|
||||
require.NoError(t, queryErr)
|
||||
}
|
||||
|
||||
// Create a DB object to use
|
||||
func setupDb(t *testing.T) sql.Database {
|
||||
db, err := postgres.SetupSQLXDB()
|
||||
if err != nil {
|
||||
t.Error("Can't create a DB connection....")
|
||||
t.Fatal(err)
|
||||
}
|
||||
return db
|
||||
}
|
||||
|
||||
// Teardown the DB
|
||||
func tearDown(t *testing.T, db sql.Database) {
|
||||
t.Log("Starting tearDown")
|
||||
db.Close()
|
||||
}
|
||||
@ -43,7 +43,7 @@ func (it *PrefixBoundIterator) Next(descend bool) bool {
|
||||
return cmp <= 0
|
||||
}
|
||||
|
||||
// Iterator with an upper bound value (hex path prefix)
|
||||
// NewPrefixBoundIterator returns an iterator with an upper bound value (hex path prefix)
|
||||
func NewPrefixBoundIterator(it trie.NodeIterator, from []byte, to []byte) *PrefixBoundIterator {
|
||||
return &PrefixBoundIterator{NodeIterator: it, StartPath: from, EndPath: to}
|
||||
}
|
||||
@ -102,7 +102,7 @@ func (gen *prefixGenerator) Next() {
|
||||
}
|
||||
}
|
||||
|
||||
// Generates paths that cut trie domain into `nbins` uniform conterminous bins (w/ opt. prefix)
|
||||
// MakePaths generates paths that cut trie domain into `nbins` uniform conterminous bins (w/ opt. prefix)
|
||||
// eg. MakePaths([], 2) => [[0] [8]]
|
||||
// MakePaths([4], 32) => [[4 0 0] [4 0 8] [4 1 0]... [4 f 8]]
|
||||
func MakePaths(prefix []byte, nbins uint) [][]byte {
|
||||
@ -129,7 +129,7 @@ func eachPrefixRange(prefix []byte, nbins uint, callback func([]byte, []byte)) {
|
||||
}
|
||||
}
|
||||
|
||||
// Cut a trie by path prefix, returning `nbins` iterators covering its subtries
|
||||
// SubtrieIterators cuts a trie by path prefix, returning `nbins` iterators covering its subtries
|
||||
func SubtrieIterators(tree state.Trie, nbins uint) []trie.NodeIterator {
|
||||
var iters []trie.NodeIterator
|
||||
eachPrefixRange(nil, nbins, func(from []byte, to []byte) {
|
||||
@ -139,7 +139,7 @@ func SubtrieIterators(tree state.Trie, nbins uint) []trie.NodeIterator {
|
||||
return iters
|
||||
}
|
||||
|
||||
// Factory for per-bin subtrie iterators
|
||||
// SubtrieIteratorFactory is a factory for per-bin subtrie iterators
|
||||
type SubtrieIteratorFactory struct {
|
||||
tree state.Trie
|
||||
startPaths, endPaths [][]byte
|
||||
@ -152,7 +152,7 @@ func (fac *SubtrieIteratorFactory) IteratorAt(bin uint) *PrefixBoundIterator {
|
||||
return NewPrefixBoundIterator(it, fac.startPaths[bin], fac.endPaths[bin])
|
||||
}
|
||||
|
||||
// Cut a trie by path prefix, returning `nbins` iterators covering its subtries
|
||||
// NewSubtrieIteratorFactory cuts a trie by path prefix, returning `nbins` iterators covering its subtries
|
||||
func NewSubtrieIteratorFactory(tree state.Trie, nbins uint) SubtrieIteratorFactory {
|
||||
starts := make([][]byte, 0, nbins)
|
||||
ends := make([][]byte, 0, nbins)
|
||||
|
||||
@ -10,8 +10,8 @@ import (
|
||||
"syscall"
|
||||
|
||||
"github.com/ethereum/go-ethereum/core/state"
|
||||
"github.com/ethereum/go-ethereum/log"
|
||||
"github.com/ethereum/go-ethereum/trie"
|
||||
log "github.com/sirupsen/logrus"
|
||||
|
||||
iter "github.com/ethereum/go-ethereum/trie/concurrent_iterator"
|
||||
)
|
||||
@ -50,14 +50,14 @@ func (tr *Tracker) CaptureSignal(cancelCtx context.CancelFunc) {
|
||||
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
|
||||
go func() {
|
||||
sig := <-sigChan
|
||||
log.Errorf("Signal received (%v), stopping", sig)
|
||||
log.Error("Signal received (%v), stopping", "signal", sig)
|
||||
// cancel context on receiving a signal
|
||||
// on ctx cancellation, all the iterators complete processing of their current node before stopping
|
||||
cancelCtx()
|
||||
}()
|
||||
}
|
||||
|
||||
// Wraps an iterator in a Iterator. This should not be called once halts are possible.
|
||||
// Tracked wraps an iterator in a Iterator. This should not be called once halts are possible.
|
||||
func (tr *Tracker) Tracked(it trie.NodeIterator, recoveredPath []byte) (ret *Iterator) {
|
||||
// create seeked path of max capacity (65)
|
||||
iterSeekedPath := make([]byte, 0, 65)
|
||||
@ -79,14 +79,14 @@ func (tr *Tracker) Tracked(it trie.NodeIterator, recoveredPath []byte) (ret *Ite
|
||||
return
|
||||
}
|
||||
|
||||
// explicitly stops an iterator
|
||||
// StopIterator explicitly stops an iterator
|
||||
func (tr *Tracker) StopIterator(it *Iterator) {
|
||||
tr.stopChan <- it
|
||||
}
|
||||
|
||||
// dumps iterator path and bounds to a text file so it can be restored later
|
||||
func (tr *Tracker) dump() error {
|
||||
log.Debugf("Dumping recovery state to: %s", tr.recoveryFile)
|
||||
log.Debug("Dumping recovery state to", "recovery file", tr.recoveryFile)
|
||||
var rows [][]string
|
||||
for it := range tr.started {
|
||||
var startPath []byte
|
||||
@ -120,7 +120,7 @@ func (tr *Tracker) dump() error {
|
||||
return out.WriteAll(rows)
|
||||
}
|
||||
|
||||
// attempts to read iterator state from file
|
||||
// Restore attempts to read iterator state from file
|
||||
// if file doesn't exist, returns an empty slice with no error
|
||||
func (tr *Tracker) Restore(tree state.Trie) ([]trie.NodeIterator, error) {
|
||||
file, err := os.Open(tr.recoveryFile)
|
||||
@ -130,7 +130,7 @@ func (tr *Tracker) Restore(tree state.Trie) ([]trie.NodeIterator, error) {
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
log.Debugf("Restoring recovery state from: %s", tr.recoveryFile)
|
||||
log.Debug("Restoring recovery state from", "recovery file", tr.recoveryFile)
|
||||
|
||||
defer file.Close()
|
||||
in := csv.NewReader(file)
|
||||
@ -173,7 +173,7 @@ func (tr *Tracker) Restore(tree state.Trie) ([]trie.NodeIterator, error) {
|
||||
ret = append(ret, tr.Tracked(it, recoveredPath))
|
||||
}
|
||||
|
||||
log.Debugf("Restored %d iterators", len(ret))
|
||||
log.Debug("Restored iterators", "count", len(ret))
|
||||
return ret, nil
|
||||
}
|
||||
|
||||
@ -213,7 +213,7 @@ func (it *Iterator) Next(descend bool) bool {
|
||||
if it.tracker.running {
|
||||
it.tracker.stopChan <- it
|
||||
} else {
|
||||
log.Errorf("iterator stopped after tracker halted: path=%x", it.Path())
|
||||
log.Error("iterator stopped after tracker halted", "path", it.Path())
|
||||
}
|
||||
}
|
||||
return ret
|
||||
|
||||
@ -24,7 +24,7 @@ func CompareNodes(a, b trie.NodeIterator) int {
|
||||
return 0
|
||||
}
|
||||
|
||||
// hexToKeyBytes turns hex nibbles into key bytes.
|
||||
// HexToKeyBytes turns hex nibbles into key bytes.
|
||||
// This can only be used for keys of even length.
|
||||
func HexToKeyBytes(hex []byte) []byte {
|
||||
if hasTerm(hex) {
|
||||
|
||||
Loading…
Reference in New Issue
Block a user