274 lines
10 KiB
Go
274 lines
10 KiB
Go
// Copyright 2019 The go-ethereum Authors
|
|
// This file is part of the go-ethereum library.
|
|
//
|
|
// The go-ethereum library is free software: you can redistribute it and/or modify
|
|
// it under the terms of the GNU Lesser General Public License as published by
|
|
// the Free Software Foundation, either version 3 of the License, or
|
|
// (at your option) any later version.
|
|
//
|
|
// The go-ethereum library is distributed in the hope that it will be useful,
|
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
// GNU Lesser General Public License for more details.
|
|
//
|
|
// You should have received a copy of the GNU Lesser General Public License
|
|
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
package statediff
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"io/ioutil"
|
|
"math/big"
|
|
"os"
|
|
"strings"
|
|
|
|
"github.com/ethereum/go-ethereum/log"
|
|
"github.com/ethereum/go-ethereum/statediff/indexer/database/sql"
|
|
"github.com/ethereum/go-ethereum/statediff/indexer/models"
|
|
)
|
|
|
|
var (
|
|
knownGapsInsert = "INSERT INTO eth_meta.known_gaps (starting_block_number, ending_block_number, checked_out, processing_key) " +
|
|
"VALUES ('%s', '%s', %t, %d) " +
|
|
"ON CONFLICT (starting_block_number) DO UPDATE SET (ending_block_number, processing_key) = ('%s', %d) " +
|
|
"WHERE eth_meta.known_gaps.ending_block_number <= '%s';\n"
|
|
dbQueryString = "SELECT MAX(block_number) FROM eth.header_cids"
|
|
defaultWriteFilePath = "./known_gaps.sql"
|
|
)
|
|
|
|
type KnownGapsState struct {
|
|
// Should we check for gaps by looking at the DB and comparing the latest block with head
|
|
checkForGaps bool
|
|
// Arbitrary processingKey that can be used down the line to differentiate different geth nodes.
|
|
processingKey int64
|
|
// This number indicates the expected difference between blocks.
|
|
// Currently, this is 1 since the geth node processes each block. But down the road this can be used in
|
|
// Tandom with the processingKey to differentiate block processing logic.
|
|
expectedDifference *big.Int
|
|
// Indicates if Geth is in an error state
|
|
// This is used to indicate the right time to upserts
|
|
errorState bool
|
|
// This array keeps track of errorBlocks as they occur.
|
|
// When the errorState is false again, we can process these blocks.
|
|
// Do we need a list, can we have /KnownStartErrorBlock and knownEndErrorBlock ints instead?
|
|
knownErrorBlocks []*big.Int
|
|
// The filepath to write SQL statements if we can't connect to the DB.
|
|
writeFilePath string
|
|
// DB object to use for reading and writing to the DB
|
|
db sql.Database
|
|
//Do we have entries in the local sql file that need to be written to the DB
|
|
sqlFileWaitingForWrite bool
|
|
// Metrics object used to track metrics.
|
|
statediffMetrics statediffMetricsHandles
|
|
}
|
|
|
|
// Create a new KnownGapsState struct, currently unused.
|
|
func NewKnownGapsState(checkForGaps bool, processingKey int64, expectedDifference *big.Int, errorState bool, writeFilePath string, db sql.Database, statediffMetrics statediffMetricsHandles) *KnownGapsState {
|
|
return &KnownGapsState{
|
|
checkForGaps: checkForGaps,
|
|
processingKey: processingKey,
|
|
expectedDifference: expectedDifference,
|
|
errorState: errorState,
|
|
writeFilePath: writeFilePath,
|
|
db: db,
|
|
statediffMetrics: statediffMetrics,
|
|
}
|
|
}
|
|
|
|
func minMax(array []*big.Int) (*big.Int, *big.Int) {
|
|
var max *big.Int = array[0]
|
|
var min *big.Int = array[0]
|
|
for _, value := range array {
|
|
if max.Cmp(value) == -1 {
|
|
max = value
|
|
}
|
|
if min.Cmp(value) == 1 {
|
|
min = value
|
|
}
|
|
}
|
|
return min, max
|
|
}
|
|
|
|
// This function actually performs the write of the known gaps. It will try to do the following, it only goes to the next step if a failure occurs.
|
|
// 1. Write to the DB directly.
|
|
// 2. Write to sql file locally.
|
|
// 3. Write to prometheus directly.
|
|
// 4. Logs and error.
|
|
func (kg *KnownGapsState) pushKnownGaps(startingBlockNumber *big.Int, endingBlockNumber *big.Int, checkedOut bool, processingKey int64) error {
|
|
if startingBlockNumber.Cmp(endingBlockNumber) == 1 {
|
|
return fmt.Errorf("Starting Block %d, is greater than ending block %d", startingBlockNumber, endingBlockNumber)
|
|
}
|
|
knownGap := models.KnownGapsModel{
|
|
StartingBlockNumber: startingBlockNumber.String(),
|
|
EndingBlockNumber: endingBlockNumber.String(),
|
|
CheckedOut: checkedOut,
|
|
ProcessingKey: processingKey,
|
|
}
|
|
|
|
log.Info("Updating Metrics for the start and end block")
|
|
kg.statediffMetrics.knownGapStart.Update(startingBlockNumber.Int64())
|
|
kg.statediffMetrics.knownGapEnd.Update(endingBlockNumber.Int64())
|
|
|
|
var writeErr error
|
|
log.Info("Writing known gaps to the DB")
|
|
if kg.db != nil {
|
|
dbErr := kg.upsertKnownGaps(knownGap)
|
|
if dbErr != nil {
|
|
log.Warn("Error writing knownGaps to DB, writing them to file instead")
|
|
writeErr = kg.upsertKnownGapsFile(knownGap)
|
|
}
|
|
} else {
|
|
writeErr = kg.upsertKnownGapsFile(knownGap)
|
|
}
|
|
if writeErr != nil {
|
|
log.Error("Unsuccessful when writing to a file", "Error", writeErr)
|
|
log.Error("Updating Metrics for the start and end error block")
|
|
log.Error("Unable to write the following Gaps to DB or File", "startBlock", startingBlockNumber, "endBlock", endingBlockNumber)
|
|
kg.statediffMetrics.knownGapErrorStart.Update(startingBlockNumber.Int64())
|
|
kg.statediffMetrics.knownGapErrorEnd.Update(endingBlockNumber.Int64())
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// This is a simple wrapper function to write gaps from a knownErrorBlocks array.
|
|
func (kg *KnownGapsState) captureErrorBlocks(knownErrorBlocks []*big.Int) {
|
|
startErrorBlock, endErrorBlock := minMax(knownErrorBlocks)
|
|
|
|
log.Warn("The following Gaps were found", "knownErrorBlocks", knownErrorBlocks)
|
|
log.Warn("Updating known Gaps table", "startErrorBlock", startErrorBlock, "endErrorBlock", endErrorBlock, "processingKey", kg.processingKey)
|
|
kg.pushKnownGaps(startErrorBlock, endErrorBlock, false, kg.processingKey)
|
|
}
|
|
|
|
// Users provide the latestBlockInDb and the latestBlockOnChain
|
|
// as well as the expected difference. This function does some simple math.
|
|
// The expected difference for the time being is going to be 1, but as we run
|
|
// More geth nodes, the expected difference might fluctuate.
|
|
func isGap(latestBlockInDb *big.Int, latestBlockOnChain *big.Int, expectedDifference *big.Int) bool {
|
|
latestBlock := big.NewInt(0)
|
|
if latestBlock.Sub(latestBlockOnChain, expectedDifference).Cmp(latestBlockInDb) != 0 {
|
|
log.Warn("We found a gap", "latestBlockInDb", latestBlockInDb, "latestBlockOnChain", latestBlockOnChain, "expectedDifference", expectedDifference)
|
|
return true
|
|
}
|
|
return false
|
|
}
|
|
|
|
// This function will check for Gaps and update the DB if gaps are found.
|
|
// The processingKey will currently be set to 0, but as we start to leverage horizontal scaling
|
|
// It might be a useful parameter to update depending on the geth node.
|
|
// TODO:
|
|
// REmove the return value
|
|
// Write to file if err in writing to DB
|
|
func (kg *KnownGapsState) findAndUpdateGaps(latestBlockOnChain *big.Int, expectedDifference *big.Int, processingKey int64) error {
|
|
// Make this global
|
|
latestBlockInDb, err := kg.queryDbToBigInt(dbQueryString)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
gapExists := isGap(latestBlockInDb, latestBlockOnChain, expectedDifference)
|
|
if gapExists {
|
|
startBlock := big.NewInt(0)
|
|
endBlock := big.NewInt(0)
|
|
startBlock.Add(latestBlockInDb, expectedDifference)
|
|
endBlock.Sub(latestBlockOnChain, expectedDifference)
|
|
|
|
log.Warn("Found Gaps starting at", "startBlock", startBlock, "endingBlock", endBlock)
|
|
err := kg.pushKnownGaps(startBlock, endBlock, false, processingKey)
|
|
if err != nil {
|
|
log.Error("We were unable to write the following gap to the DB", "start Block", startBlock, "endBlock", endBlock, "error", err)
|
|
return err
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// Upserts known gaps to the DB.
|
|
func (kg *KnownGapsState) upsertKnownGaps(knownGaps models.KnownGapsModel) error {
|
|
_, err := kg.db.Exec(context.Background(), kg.db.InsertKnownGapsStm(),
|
|
knownGaps.StartingBlockNumber, knownGaps.EndingBlockNumber, knownGaps.CheckedOut, knownGaps.ProcessingKey)
|
|
if err != nil {
|
|
return fmt.Errorf("error upserting known_gaps entry: %v", err)
|
|
}
|
|
log.Info("Successfully Wrote gaps to the DB", "startBlock", knownGaps.StartingBlockNumber, "endBlock", knownGaps.EndingBlockNumber)
|
|
return nil
|
|
}
|
|
|
|
// Write upsert statement into a local file.
|
|
func (kg *KnownGapsState) upsertKnownGapsFile(knownGaps models.KnownGapsModel) error {
|
|
insertStmt := []byte(fmt.Sprintf(knownGapsInsert, knownGaps.StartingBlockNumber, knownGaps.EndingBlockNumber, knownGaps.CheckedOut, knownGaps.ProcessingKey,
|
|
knownGaps.EndingBlockNumber, knownGaps.ProcessingKey, knownGaps.EndingBlockNumber))
|
|
log.Info("Trying to write file")
|
|
if kg.writeFilePath == "" {
|
|
kg.writeFilePath = defaultWriteFilePath
|
|
}
|
|
f, err := os.OpenFile(kg.writeFilePath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
|
|
if err != nil {
|
|
log.Info("Unable to open a file for writing")
|
|
return err
|
|
}
|
|
defer f.Close()
|
|
|
|
if _, err = f.Write(insertStmt); err != nil {
|
|
log.Info("Unable to open write insert statement to file")
|
|
return err
|
|
}
|
|
log.Info("Wrote the gaps to a local SQL file")
|
|
kg.sqlFileWaitingForWrite = true
|
|
return nil
|
|
}
|
|
|
|
func (kg *KnownGapsState) writeSqlFileStmtToDb() error {
|
|
log.Info("Writing the local SQL file for KnownGaps to the DB")
|
|
file, err := ioutil.ReadFile(kg.writeFilePath)
|
|
|
|
if err != nil {
|
|
log.Error("Unable to open local SQL File for writing")
|
|
return err
|
|
}
|
|
|
|
requests := strings.Split(string(file), ";")
|
|
|
|
for _, request := range requests {
|
|
_, err := kg.db.Exec(context.Background(), request)
|
|
if err != nil {
|
|
log.Error("Unable to run insert statement from file to the DB")
|
|
return err
|
|
}
|
|
}
|
|
if err := os.Truncate(kg.writeFilePath, 0); err != nil {
|
|
log.Info("Failed to empty knownGaps file after inserting statements to the DB", "error", err)
|
|
}
|
|
kg.sqlFileWaitingForWrite = false
|
|
return nil
|
|
}
|
|
|
|
// This is a simple wrapper function which will run QueryRow on the DB
|
|
func (kg *KnownGapsState) queryDb(queryString string) (string, error) {
|
|
var ret string
|
|
err := kg.db.QueryRow(context.Background(), queryString).Scan(&ret)
|
|
if err != nil {
|
|
log.Error(fmt.Sprint("Can't properly query the DB for query: ", queryString))
|
|
return "", err
|
|
}
|
|
return ret, nil
|
|
}
|
|
|
|
// This function is a simple wrapper which will call QueryDb but the return value will be
|
|
// a big int instead of a string
|
|
func (kg *KnownGapsState) queryDbToBigInt(queryString string) (*big.Int, error) {
|
|
ret := new(big.Int)
|
|
res, err := kg.queryDb(queryString)
|
|
if err != nil {
|
|
return ret, err
|
|
}
|
|
ret, ok := ret.SetString(res, 10)
|
|
if !ok {
|
|
log.Error(fmt.Sprint("Can't turn the res ", res, "into a bigInt"))
|
|
return ret, fmt.Errorf("Can't turn %s into a bigInt", res)
|
|
}
|
|
return ret, nil
|
|
}
|