Find Gaps and at them to known_gaps

This contains all the logic needed to add a gap to the `known_gaps` table.

We can now add this code to various parts of the application to calculate if a gap has occured.
This commit is contained in:
Abdul Rabbani 2022-03-23 14:03:35 -04:00
parent ce66532fcf
commit 1a3a63d00e
6 changed files with 145 additions and 45 deletions

View File

@ -497,10 +497,6 @@ func (sdi *StateDiffIndexer) Close() error {
return sdi.dump.Close() return sdi.dump.Close()
} }
func (sdi *StateDiffIndexer) PushKnownGaps(startingBlockNumber *big.Int, endingBlockNumber *big.Int, checkedOut bool, processingKey int64) error { func (sdi *StateDiffIndexer) FindAndUpdateGaps(latestBlockOnChain *big.Int, expectedDifference *big.Int, processingKey int64) error {
return nil return nil
} }
func (sdi *StateDiffIndexer) QueryDb(queryString string) (string, error) {
return "", nil
}

View File

@ -479,10 +479,6 @@ func (sdi *StateDiffIndexer) Close() error {
return sdi.fileWriter.Close() return sdi.fileWriter.Close()
} }
func (sdi *StateDiffIndexer) PushKnownGaps(startingBlockNumber *big.Int, endingBlockNumber *big.Int, checkedOut bool, processingKey int64) error { func (sdi *StateDiffIndexer) FindAndUpdateGaps(latestBlockOnChain *big.Int, expectedDifference *big.Int, processingKey int64) error {
return nil return nil
} }
func (sdi *StateDiffIndexer) QueryDb(queryString string) (string, error) {
return "", nil
}

View File

@ -555,7 +555,7 @@ func (sdi *StateDiffIndexer) Close() error {
} }
// Update the known gaps table with the gap information. // Update the known gaps table with the gap information.
func (sdi *StateDiffIndexer) PushKnownGaps(startingBlockNumber *big.Int, endingBlockNumber *big.Int, checkedOut bool, processingKey int64) error { func (sdi *StateDiffIndexer) pushKnownGaps(startingBlockNumber *big.Int, endingBlockNumber *big.Int, checkedOut bool, processingKey int64) error {
knownGap := models.KnownGapsModel{ knownGap := models.KnownGapsModel{
StartingBlockNumber: startingBlockNumber.String(), StartingBlockNumber: startingBlockNumber.String(),
EndingBlockNumber: endingBlockNumber.String(), EndingBlockNumber: endingBlockNumber.String(),
@ -570,13 +570,64 @@ func (sdi *StateDiffIndexer) PushKnownGaps(startingBlockNumber *big.Int, endingB
// This is a simple wrapper function which will run QueryRow on the DB // This is a simple wrapper function which will run QueryRow on the DB
func (sdi *StateDiffIndexer) QueryDb(queryString string) (string, error) { func (sdi *StateDiffIndexer) QueryDb(queryString string) (string, error) {
var name string var ret string
err := sdi.dbWriter.db.QueryRow(context.Background(), queryString).Scan(&name) err := sdi.dbWriter.db.QueryRow(context.Background(), queryString).Scan(&ret)
// err := sdi.dbWriter.db.QueryRow(context.Background(), "SELECT ename FROM emp ORDER BY sal DESC LIMIT 1;").Scan(&name)
if err != nil { if err != nil {
log.Error("Can't properly query the DB for query: ", queryString)
return "", err return "", err
} }
fmt.Println(name) return ret, nil
}
return name, nil
// This function is a simple wrapper which will call QueryDb but the return value will be
// a big int instead of a string
func (sdi *StateDiffIndexer) QueryDbToBigInt(queryString string) (*big.Int, error) {
ret := new(big.Int)
res, err := sdi.QueryDb(queryString)
if err != nil {
return ret, err
}
ret, ok := ret.SetString(res, 10)
if !ok {
log.Error("Can't turn the res ", res, "into a bigInt")
return ret, fmt.Errorf("Can't turn %s into a bigInt", res)
}
return ret, nil
}
// Users provide the latestBlockInDb and the latestBlockOnChain
// as well as the expected difference. This function does some simple math.
// The expected difference for the time being is going to be 1, but as we run
// More geth nodes, the expected difference might fluctuate.
func isGap(latestBlockInDb *big.Int, latestBlockOnChain *big.Int, expectedDifference *big.Int) bool {
latestBlock := big.NewInt(0)
if latestBlock.Add(latestBlockOnChain, expectedDifference) != latestBlockInDb {
return true
}
return false
}
// This function will check for Gaps and update the DB if gaps are found.
// The processingKey will currently be set to 0, but as we start to leverage horizontal scaling
// It might be a useful parameter to update depending on the geth node.
func (sdi *StateDiffIndexer) FindAndUpdateGaps(latestBlockOnChain *big.Int, expectedDifference *big.Int, processingKey int64) error {
dbQueryString := "SELECT MAX(block_number) FROM eth.header_cids"
latestBlockInDb, err := sdi.QueryDbToBigInt(dbQueryString)
if err != nil {
return err
}
gapExists := isGap(latestBlockInDb, latestBlockOnChain, expectedDifference)
if gapExists {
startBlock := big.NewInt(0)
endBlock := big.NewInt(0)
startBlock.Add(latestBlockInDb, expectedDifference)
endBlock.Sub(latestBlockOnChain, expectedDifference)
log.Warn("Found Gaps starting at, ", startBlock, " and ending at, ", endBlock)
sdi.pushKnownGaps(startBlock, endBlock, false, processingKey)
}
return nil
} }

View File

@ -2,7 +2,9 @@ package sql_test
import ( import (
"bytes" "bytes"
"context"
"fmt" "fmt"
"math/big"
"os" "os"
"testing" "testing"
@ -11,8 +13,10 @@ import (
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/statediff/indexer/database/sql" "github.com/ethereum/go-ethereum/statediff/indexer/database/sql"
"github.com/ethereum/go-ethereum/statediff/indexer/database/sql/postgres"
"github.com/ethereum/go-ethereum/statediff/indexer/interfaces" "github.com/ethereum/go-ethereum/statediff/indexer/interfaces"
"github.com/ethereum/go-ethereum/statediff/indexer/ipld" "github.com/ethereum/go-ethereum/statediff/indexer/ipld"
"github.com/ethereum/go-ethereum/statediff/indexer/mocks" "github.com/ethereum/go-ethereum/statediff/indexer/mocks"
@ -22,6 +26,7 @@ var (
db sql.Database db sql.Database
err error err error
ind interfaces.StateDiffIndexer ind interfaces.StateDiffIndexer
chainConf = params.MainnetChainConfig
ipfsPgGet = `SELECT data FROM public.blocks ipfsPgGet = `SELECT data FROM public.blocks
WHERE key = $1` WHERE key = $1`
tx1, tx2, tx3, tx4, tx5, rct1, rct2, rct3, rct4, rct5 []byte tx1, tx2, tx3, tx4, tx5, rct1, rct2, rct3, rct4, rct5 []byte
@ -148,8 +153,60 @@ func checkTxClosure(t *testing.T, idle, inUse, open int64) {
require.Equal(t, open, db.Stats().Open()) require.Equal(t, open, db.Stats().Open())
} }
func setupDb(t *testing.T) (*sql.StateDiffIndexer, error) {
db, err = postgres.SetupSQLXDB()
if err != nil {
t.Fatal(err)
}
stateDiff, err := sql.NewStateDiffIndexer(context.Background(), chainConf, db)
return stateDiff, err
}
func tearDown(t *testing.T) { func tearDown(t *testing.T) {
sql.TearDownDB(t, db) sql.TearDownDB(t, db)
err := ind.Close() err := ind.Close()
require.NoError(t, err) require.NoError(t, err)
} }
func TestKnownGapsUpsert(t *testing.T) {
testKnownGapsUpsert(t)
}
func testKnownGapsUpsert(t *testing.T) {
gapDifference := big.NewInt(10) // Set a difference between latestBlock in DB and on Chain
expectedDifference := big.NewInt(1) // Set what the expected difference between latestBlock in DB and on Chain should be
stateDiff, err := setupDb(t)
if err != nil {
t.Fatal(err)
}
// Get the latest block from the DB
latestBlockInDb, err := stateDiff.QueryDbToBigInt("SELECT MAX(block_number) FROM eth.header_cids")
if err != nil {
t.Fatal("Can't find a block in the eth.header_cids table.. Please put one there")
}
// Add the gapDifference for testing purposes
latestBlockOnChain := big.NewInt(0)
latestBlockOnChain.Add(latestBlockInDb, gapDifference)
t.Log("The latest block on the chain is: ", latestBlockOnChain)
t.Log("The latest block on the DB is: ", latestBlockInDb)
gapUpsertErr := stateDiff.FindAndUpdateGaps(latestBlockOnChain, expectedDifference, 0)
require.NoError(t, gapUpsertErr)
// Calculate what the start and end block should be in known_gaps
// And check to make sure it is properly inserted
startBlock := big.NewInt(0)
endBlock := big.NewInt(0)
startBlock.Add(latestBlockInDb, expectedDifference)
endBlock.Sub(latestBlockOnChain, expectedDifference)
queryString := fmt.Sprintf("SELECT starting_block_number from eth.known_gaps WHERE starting_block_number = %d AND ending_block_number = %d", startBlock, endBlock)
_, queryErr := stateDiff.QueryDb(queryString) // Figure out the string.
t.Logf("Updated Known Gaps table starting from, %d, and ending at, %d", startBlock, endBlock)
require.NoError(t, queryErr)
}

View File

@ -117,29 +117,30 @@ func tearDown(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
} }
func TestKnownGapsUpsert(t *testing.T) { //func TestKnownGapsUpsert(t *testing.T) {
var startBlockNumber int64 = 111 // var startBlockNumber int64 = 111
var endBlockNumber int64 = 121 // var endBlockNumber int64 = 121
ind, err := setupDb(t) // ind, err := setupDb(t)
if err != nil { // if err != nil {
t.Fatal(err) // t.Fatal(err)
} // }
require.NoError(t, err) // require.NoError(t, err)
//
testKnownGapsUpsert(t, startBlockNumber, endBlockNumber, ind) // testKnownGapsUpsert(t, startBlockNumber, endBlockNumber, ind)
//str, err := ind.QueryDb("SELECT MAX(block_number) FROM eth.header_cids") // Figure out the string. // //str, err := ind.QueryDb("SELECT MAX(block_number) FROM eth.header_cids") // Figure out the string.
queryString := fmt.Sprintf("SELECT starting_block_number from eth.known_gaps WHERE starting_block_number = %d AND ending_block_number = %d", startBlockNumber, endBlockNumber) // queryString := fmt.Sprintf("SELECT starting_block_number from eth.known_gaps WHERE starting_block_number = %d AND ending_block_number = %d", startBlockNumber, endBlockNumber)
_, queryErr := ind.QueryDb(queryString) // Figure out the string. // _, queryErr := ind.QueryDb(queryString) // Figure out the string.
require.NoError(t, queryErr) // require.NoError(t, queryErr)
//
} //}
func testKnownGapsUpsert(t *testing.T, startBlockNumber int64, endBlockNumber int64, ind interfaces.StateDiffIndexer) { //func testKnownGapsUpsert(t *testing.T, startBlockNumber int64, endBlockNumber int64, ind interfaces.StateDiffIndexer) {
startBlock := big.NewInt(startBlockNumber) // startBlock := big.NewInt(startBlockNumber)
endBlock := big.NewInt(endBlockNumber) // endBlock := big.NewInt(endBlockNumber)
//
processGapError := ind.PushKnownGaps(startBlock, endBlock, false, 1) // processGapError := ind.PushKnownGaps(startBlock, endBlock, false, 1)
if processGapError != nil { // if processGapError != nil {
t.Fatal(processGapError) // t.Fatal(processGapError)
} // }
require.NoError(t, processGapError) // require.NoError(t, processGapError)
} //}
//

View File

@ -32,8 +32,7 @@ type StateDiffIndexer interface {
PushStateNode(tx Batch, stateNode sdtypes.StateNode, headerID string) error PushStateNode(tx Batch, stateNode sdtypes.StateNode, headerID string) error
PushCodeAndCodeHash(tx Batch, codeAndCodeHash sdtypes.CodeAndCodeHash) error PushCodeAndCodeHash(tx Batch, codeAndCodeHash sdtypes.CodeAndCodeHash) error
ReportDBMetrics(delay time.Duration, quit <-chan bool) ReportDBMetrics(delay time.Duration, quit <-chan bool)
PushKnownGaps(startingBlockNumber *big.Int, endingBlockNumber *big.Int, checkedOut bool, processingKey int64) error FindAndUpdateGaps(latestBlockOnChain *big.Int, expectedDifference *big.Int, processingKey int64) error
QueryDb(queryString string) (string, error)
io.Closer io.Closer
} }