From 1a3a63d00ecadb3db0b88a19ab1faa49c81725e1 Mon Sep 17 00:00:00 2001 From: Abdul Rabbani Date: Wed, 23 Mar 2022 14:03:35 -0400 Subject: [PATCH] Find Gaps and at them to `known_gaps` This contains all the logic needed to add a gap to the `known_gaps` table. We can now add this code to various parts of the application to calculate if a gap has occured. --- statediff/indexer/database/dump/indexer.go | 6 +- statediff/indexer/database/file/indexer.go | 6 +- statediff/indexer/database/sql/indexer.go | 65 +++++++++++++++++-- .../database/sql/indexer_shared_test.go | 57 ++++++++++++++++ .../sql/mainnet_tests/indexer_test.go | 53 +++++++-------- statediff/indexer/interfaces/interfaces.go | 3 +- 6 files changed, 145 insertions(+), 45 deletions(-) diff --git a/statediff/indexer/database/dump/indexer.go b/statediff/indexer/database/dump/indexer.go index 15a541b9c..29c51efd9 100644 --- a/statediff/indexer/database/dump/indexer.go +++ b/statediff/indexer/database/dump/indexer.go @@ -497,10 +497,6 @@ func (sdi *StateDiffIndexer) Close() error { return sdi.dump.Close() } -func (sdi *StateDiffIndexer) PushKnownGaps(startingBlockNumber *big.Int, endingBlockNumber *big.Int, checkedOut bool, processingKey int64) error { +func (sdi *StateDiffIndexer) FindAndUpdateGaps(latestBlockOnChain *big.Int, expectedDifference *big.Int, processingKey int64) error { return nil } - -func (sdi *StateDiffIndexer) QueryDb(queryString string) (string, error) { - return "", nil -} diff --git a/statediff/indexer/database/file/indexer.go b/statediff/indexer/database/file/indexer.go index 949aac6d1..8df695f81 100644 --- a/statediff/indexer/database/file/indexer.go +++ b/statediff/indexer/database/file/indexer.go @@ -479,10 +479,6 @@ func (sdi *StateDiffIndexer) Close() error { return sdi.fileWriter.Close() } -func (sdi *StateDiffIndexer) PushKnownGaps(startingBlockNumber *big.Int, endingBlockNumber *big.Int, checkedOut bool, processingKey int64) error { +func (sdi *StateDiffIndexer) FindAndUpdateGaps(latestBlockOnChain *big.Int, expectedDifference *big.Int, processingKey int64) error { return nil } - -func (sdi *StateDiffIndexer) QueryDb(queryString string) (string, error) { - return "", nil -} diff --git a/statediff/indexer/database/sql/indexer.go b/statediff/indexer/database/sql/indexer.go index 275f8f303..dc07bf843 100644 --- a/statediff/indexer/database/sql/indexer.go +++ b/statediff/indexer/database/sql/indexer.go @@ -555,7 +555,7 @@ func (sdi *StateDiffIndexer) Close() error { } // Update the known gaps table with the gap information. -func (sdi *StateDiffIndexer) PushKnownGaps(startingBlockNumber *big.Int, endingBlockNumber *big.Int, checkedOut bool, processingKey int64) error { +func (sdi *StateDiffIndexer) pushKnownGaps(startingBlockNumber *big.Int, endingBlockNumber *big.Int, checkedOut bool, processingKey int64) error { knownGap := models.KnownGapsModel{ StartingBlockNumber: startingBlockNumber.String(), EndingBlockNumber: endingBlockNumber.String(), @@ -570,13 +570,64 @@ func (sdi *StateDiffIndexer) PushKnownGaps(startingBlockNumber *big.Int, endingB // This is a simple wrapper function which will run QueryRow on the DB func (sdi *StateDiffIndexer) QueryDb(queryString string) (string, error) { - var name string - err := sdi.dbWriter.db.QueryRow(context.Background(), queryString).Scan(&name) - // err := sdi.dbWriter.db.QueryRow(context.Background(), "SELECT ename FROM emp ORDER BY sal DESC LIMIT 1;").Scan(&name) + var ret string + err := sdi.dbWriter.db.QueryRow(context.Background(), queryString).Scan(&ret) if err != nil { + log.Error("Can't properly query the DB for query: ", queryString) return "", err } - fmt.Println(name) - - return name, nil + return ret, nil +} + +// This function is a simple wrapper which will call QueryDb but the return value will be +// a big int instead of a string +func (sdi *StateDiffIndexer) QueryDbToBigInt(queryString string) (*big.Int, error) { + ret := new(big.Int) + res, err := sdi.QueryDb(queryString) + if err != nil { + return ret, err + } + ret, ok := ret.SetString(res, 10) + if !ok { + log.Error("Can't turn the res ", res, "into a bigInt") + return ret, fmt.Errorf("Can't turn %s into a bigInt", res) + } + return ret, nil +} + +// Users provide the latestBlockInDb and the latestBlockOnChain +// as well as the expected difference. This function does some simple math. +// The expected difference for the time being is going to be 1, but as we run +// More geth nodes, the expected difference might fluctuate. +func isGap(latestBlockInDb *big.Int, latestBlockOnChain *big.Int, expectedDifference *big.Int) bool { + latestBlock := big.NewInt(0) + if latestBlock.Add(latestBlockOnChain, expectedDifference) != latestBlockInDb { + return true + } + return false + +} + +// This function will check for Gaps and update the DB if gaps are found. +// The processingKey will currently be set to 0, but as we start to leverage horizontal scaling +// It might be a useful parameter to update depending on the geth node. +func (sdi *StateDiffIndexer) FindAndUpdateGaps(latestBlockOnChain *big.Int, expectedDifference *big.Int, processingKey int64) error { + dbQueryString := "SELECT MAX(block_number) FROM eth.header_cids" + latestBlockInDb, err := sdi.QueryDbToBigInt(dbQueryString) + if err != nil { + return err + } + + gapExists := isGap(latestBlockInDb, latestBlockOnChain, expectedDifference) + if gapExists { + startBlock := big.NewInt(0) + endBlock := big.NewInt(0) + startBlock.Add(latestBlockInDb, expectedDifference) + endBlock.Sub(latestBlockOnChain, expectedDifference) + + log.Warn("Found Gaps starting at, ", startBlock, " and ending at, ", endBlock) + sdi.pushKnownGaps(startBlock, endBlock, false, processingKey) + } + + return nil } diff --git a/statediff/indexer/database/sql/indexer_shared_test.go b/statediff/indexer/database/sql/indexer_shared_test.go index 0351fb134..3af3adff9 100644 --- a/statediff/indexer/database/sql/indexer_shared_test.go +++ b/statediff/indexer/database/sql/indexer_shared_test.go @@ -2,7 +2,9 @@ package sql_test import ( "bytes" + "context" "fmt" + "math/big" "os" "testing" @@ -11,8 +13,10 @@ import ( "github.com/stretchr/testify/require" "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/statediff/indexer/database/sql" + "github.com/ethereum/go-ethereum/statediff/indexer/database/sql/postgres" "github.com/ethereum/go-ethereum/statediff/indexer/interfaces" "github.com/ethereum/go-ethereum/statediff/indexer/ipld" "github.com/ethereum/go-ethereum/statediff/indexer/mocks" @@ -22,6 +26,7 @@ var ( db sql.Database err error ind interfaces.StateDiffIndexer + chainConf = params.MainnetChainConfig ipfsPgGet = `SELECT data FROM public.blocks WHERE key = $1` tx1, tx2, tx3, tx4, tx5, rct1, rct2, rct3, rct4, rct5 []byte @@ -148,8 +153,60 @@ func checkTxClosure(t *testing.T, idle, inUse, open int64) { require.Equal(t, open, db.Stats().Open()) } +func setupDb(t *testing.T) (*sql.StateDiffIndexer, error) { + db, err = postgres.SetupSQLXDB() + if err != nil { + t.Fatal(err) + } + stateDiff, err := sql.NewStateDiffIndexer(context.Background(), chainConf, db) + return stateDiff, err +} + func tearDown(t *testing.T) { sql.TearDownDB(t, db) err := ind.Close() require.NoError(t, err) } +func TestKnownGapsUpsert(t *testing.T) { + testKnownGapsUpsert(t) + +} +func testKnownGapsUpsert(t *testing.T) { + gapDifference := big.NewInt(10) // Set a difference between latestBlock in DB and on Chain + expectedDifference := big.NewInt(1) // Set what the expected difference between latestBlock in DB and on Chain should be + + stateDiff, err := setupDb(t) + if err != nil { + t.Fatal(err) + } + + // Get the latest block from the DB + latestBlockInDb, err := stateDiff.QueryDbToBigInt("SELECT MAX(block_number) FROM eth.header_cids") + if err != nil { + t.Fatal("Can't find a block in the eth.header_cids table.. Please put one there") + } + + // Add the gapDifference for testing purposes + latestBlockOnChain := big.NewInt(0) + latestBlockOnChain.Add(latestBlockInDb, gapDifference) + + t.Log("The latest block on the chain is: ", latestBlockOnChain) + t.Log("The latest block on the DB is: ", latestBlockInDb) + + gapUpsertErr := stateDiff.FindAndUpdateGaps(latestBlockOnChain, expectedDifference, 0) + require.NoError(t, gapUpsertErr) + + // Calculate what the start and end block should be in known_gaps + // And check to make sure it is properly inserted + startBlock := big.NewInt(0) + endBlock := big.NewInt(0) + startBlock.Add(latestBlockInDb, expectedDifference) + endBlock.Sub(latestBlockOnChain, expectedDifference) + + queryString := fmt.Sprintf("SELECT starting_block_number from eth.known_gaps WHERE starting_block_number = %d AND ending_block_number = %d", startBlock, endBlock) + + _, queryErr := stateDiff.QueryDb(queryString) // Figure out the string. + t.Logf("Updated Known Gaps table starting from, %d, and ending at, %d", startBlock, endBlock) + require.NoError(t, queryErr) + +} diff --git a/statediff/indexer/database/sql/mainnet_tests/indexer_test.go b/statediff/indexer/database/sql/mainnet_tests/indexer_test.go index b75ea1b7b..bb7001d25 100644 --- a/statediff/indexer/database/sql/mainnet_tests/indexer_test.go +++ b/statediff/indexer/database/sql/mainnet_tests/indexer_test.go @@ -117,29 +117,30 @@ func tearDown(t *testing.T) { require.NoError(t, err) } -func TestKnownGapsUpsert(t *testing.T) { - var startBlockNumber int64 = 111 - var endBlockNumber int64 = 121 - ind, err := setupDb(t) - if err != nil { - t.Fatal(err) - } - require.NoError(t, err) - - testKnownGapsUpsert(t, startBlockNumber, endBlockNumber, ind) - //str, err := ind.QueryDb("SELECT MAX(block_number) FROM eth.header_cids") // Figure out the string. - queryString := fmt.Sprintf("SELECT starting_block_number from eth.known_gaps WHERE starting_block_number = %d AND ending_block_number = %d", startBlockNumber, endBlockNumber) - _, queryErr := ind.QueryDb(queryString) // Figure out the string. - require.NoError(t, queryErr) - -} -func testKnownGapsUpsert(t *testing.T, startBlockNumber int64, endBlockNumber int64, ind interfaces.StateDiffIndexer) { - startBlock := big.NewInt(startBlockNumber) - endBlock := big.NewInt(endBlockNumber) - - processGapError := ind.PushKnownGaps(startBlock, endBlock, false, 1) - if processGapError != nil { - t.Fatal(processGapError) - } - require.NoError(t, processGapError) -} +//func TestKnownGapsUpsert(t *testing.T) { +// var startBlockNumber int64 = 111 +// var endBlockNumber int64 = 121 +// ind, err := setupDb(t) +// if err != nil { +// t.Fatal(err) +// } +// require.NoError(t, err) +// +// testKnownGapsUpsert(t, startBlockNumber, endBlockNumber, ind) +// //str, err := ind.QueryDb("SELECT MAX(block_number) FROM eth.header_cids") // Figure out the string. +// queryString := fmt.Sprintf("SELECT starting_block_number from eth.known_gaps WHERE starting_block_number = %d AND ending_block_number = %d", startBlockNumber, endBlockNumber) +// _, queryErr := ind.QueryDb(queryString) // Figure out the string. +// require.NoError(t, queryErr) +// +//} +//func testKnownGapsUpsert(t *testing.T, startBlockNumber int64, endBlockNumber int64, ind interfaces.StateDiffIndexer) { +// startBlock := big.NewInt(startBlockNumber) +// endBlock := big.NewInt(endBlockNumber) +// +// processGapError := ind.PushKnownGaps(startBlock, endBlock, false, 1) +// if processGapError != nil { +// t.Fatal(processGapError) +// } +// require.NoError(t, processGapError) +//} +// diff --git a/statediff/indexer/interfaces/interfaces.go b/statediff/indexer/interfaces/interfaces.go index 62123418c..7d58c510c 100644 --- a/statediff/indexer/interfaces/interfaces.go +++ b/statediff/indexer/interfaces/interfaces.go @@ -32,8 +32,7 @@ type StateDiffIndexer interface { PushStateNode(tx Batch, stateNode sdtypes.StateNode, headerID string) error PushCodeAndCodeHash(tx Batch, codeAndCodeHash sdtypes.CodeAndCodeHash) error ReportDBMetrics(delay time.Duration, quit <-chan bool) - PushKnownGaps(startingBlockNumber *big.Int, endingBlockNumber *big.Int, checkedOut bool, processingKey int64) error - QueryDb(queryString string) (string, error) + FindAndUpdateGaps(latestBlockOnChain *big.Int, expectedDifference *big.Int, processingKey int64) error io.Closer }