Handle All Gaps within Geth
Including an updated doc which keeps track of events in this PR.
This commit is contained in:
parent
1a3a63d00e
commit
f6ff20eb0e
18
statediff/docs/KnownGaps.md
Normal file
18
statediff/docs/KnownGaps.md
Normal file
@ -0,0 +1,18 @@
|
|||||||
|
# Overview
|
||||||
|
|
||||||
|
This document will provide some insight into the `known_gaps` table, their use cases, and implementation. Please refer to the [following PR](https://github.com/vulcanize/go-ethereum/pull/217) and the [following epic](https://github.com/vulcanize/ops/issues/143) to grasp their inception.
|
||||||
|
|
||||||
|

|
||||||
|
|
||||||
|
# Use Cases
|
||||||
|
|
||||||
|
The known gaps table is updated when the following events occur:
|
||||||
|
|
||||||
|
1. At start up we check the latest block from the `eth.headers_cid` table. We compare the first block that we are processing with the latest block from the DB. If they are not one unit of expectedDifference away from each other, add the gap between the two blocks.
|
||||||
|
2. If there is any error in processing a block (db connection, deadlock, etc), add that block to the knownErrorBlocks slice, when the next block is successfully written, write this slice into the DB.
|
||||||
|
3. If the last processed block is not one unit of expectedDifference away from the current block being processed. This can be due to any unknown or unhandled errors in geth.
|
||||||
|
|
||||||
|
# Glossary
|
||||||
|
|
||||||
|
1. `expectedDifference (number)` - This number indicates what the difference between two blocks should be. If we are capturing all events on a geth node then this number would be `1`. But once we scale nodes, the `expectedDifference` might be `2` or greater.
|
||||||
|
2. `processingKey (number)` - This number can be used to keep track of different geth nodes and their specific `expectedDifference`.
|
3
statediff/docs/README.md
Normal file
3
statediff/docs/README.md
Normal file
@ -0,0 +1,3 @@
|
|||||||
|
# Overview
|
||||||
|
|
||||||
|
This folder keeps tracks of random documents as they relate to the `statediff` service.
|
BIN
statediff/docs/diagrams/KnownGapsProcess.png
Normal file
BIN
statediff/docs/diagrams/KnownGapsProcess.png
Normal file
Binary file not shown.
After Width: | Height: | Size: 52 KiB |
@ -500,3 +500,6 @@ func (sdi *StateDiffIndexer) Close() error {
|
|||||||
func (sdi *StateDiffIndexer) FindAndUpdateGaps(latestBlockOnChain *big.Int, expectedDifference *big.Int, processingKey int64) error {
|
func (sdi *StateDiffIndexer) FindAndUpdateGaps(latestBlockOnChain *big.Int, expectedDifference *big.Int, processingKey int64) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
func (sdi *StateDiffIndexer) PushKnownGaps(startingBlockNumber *big.Int, endingBlockNumber *big.Int, checkedOut bool, processingKey int64) error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
@ -482,3 +482,7 @@ func (sdi *StateDiffIndexer) Close() error {
|
|||||||
func (sdi *StateDiffIndexer) FindAndUpdateGaps(latestBlockOnChain *big.Int, expectedDifference *big.Int, processingKey int64) error {
|
func (sdi *StateDiffIndexer) FindAndUpdateGaps(latestBlockOnChain *big.Int, expectedDifference *big.Int, processingKey int64) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (sdi *StateDiffIndexer) PushKnownGaps(startingBlockNumber *big.Int, endingBlockNumber *big.Int, checkedOut bool, processingKey int64) error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
@ -555,7 +555,10 @@ func (sdi *StateDiffIndexer) Close() error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Update the known gaps table with the gap information.
|
// Update the known gaps table with the gap information.
|
||||||
func (sdi *StateDiffIndexer) pushKnownGaps(startingBlockNumber *big.Int, endingBlockNumber *big.Int, checkedOut bool, processingKey int64) error {
|
func (sdi *StateDiffIndexer) PushKnownGaps(startingBlockNumber *big.Int, endingBlockNumber *big.Int, checkedOut bool, processingKey int64) error {
|
||||||
|
if startingBlockNumber.Cmp(endingBlockNumber) != -1 {
|
||||||
|
return fmt.Errorf("Starting Block %d, is greater than ending block %d", startingBlockNumber, endingBlockNumber)
|
||||||
|
}
|
||||||
knownGap := models.KnownGapsModel{
|
knownGap := models.KnownGapsModel{
|
||||||
StartingBlockNumber: startingBlockNumber.String(),
|
StartingBlockNumber: startingBlockNumber.String(),
|
||||||
EndingBlockNumber: endingBlockNumber.String(),
|
EndingBlockNumber: endingBlockNumber.String(),
|
||||||
@ -573,7 +576,7 @@ func (sdi *StateDiffIndexer) QueryDb(queryString string) (string, error) {
|
|||||||
var ret string
|
var ret string
|
||||||
err := sdi.dbWriter.db.QueryRow(context.Background(), queryString).Scan(&ret)
|
err := sdi.dbWriter.db.QueryRow(context.Background(), queryString).Scan(&ret)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error("Can't properly query the DB for query: ", queryString)
|
log.Error(fmt.Sprint("Can't properly query the DB for query: ", queryString))
|
||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
return ret, nil
|
return ret, nil
|
||||||
@ -589,7 +592,7 @@ func (sdi *StateDiffIndexer) QueryDbToBigInt(queryString string) (*big.Int, erro
|
|||||||
}
|
}
|
||||||
ret, ok := ret.SetString(res, 10)
|
ret, ok := ret.SetString(res, 10)
|
||||||
if !ok {
|
if !ok {
|
||||||
log.Error("Can't turn the res ", res, "into a bigInt")
|
log.Error(fmt.Sprint("Can't turn the res ", res, "into a bigInt"))
|
||||||
return ret, fmt.Errorf("Can't turn %s into a bigInt", res)
|
return ret, fmt.Errorf("Can't turn %s into a bigInt", res)
|
||||||
}
|
}
|
||||||
return ret, nil
|
return ret, nil
|
||||||
@ -611,6 +614,9 @@ func isGap(latestBlockInDb *big.Int, latestBlockOnChain *big.Int, expectedDiffer
|
|||||||
// This function will check for Gaps and update the DB if gaps are found.
|
// This function will check for Gaps and update the DB if gaps are found.
|
||||||
// The processingKey will currently be set to 0, but as we start to leverage horizontal scaling
|
// The processingKey will currently be set to 0, but as we start to leverage horizontal scaling
|
||||||
// It might be a useful parameter to update depending on the geth node.
|
// It might be a useful parameter to update depending on the geth node.
|
||||||
|
// TODO:
|
||||||
|
// REmove the return value
|
||||||
|
// Write to file if err in writing to DB
|
||||||
func (sdi *StateDiffIndexer) FindAndUpdateGaps(latestBlockOnChain *big.Int, expectedDifference *big.Int, processingKey int64) error {
|
func (sdi *StateDiffIndexer) FindAndUpdateGaps(latestBlockOnChain *big.Int, expectedDifference *big.Int, processingKey int64) error {
|
||||||
dbQueryString := "SELECT MAX(block_number) FROM eth.header_cids"
|
dbQueryString := "SELECT MAX(block_number) FROM eth.header_cids"
|
||||||
latestBlockInDb, err := sdi.QueryDbToBigInt(dbQueryString)
|
latestBlockInDb, err := sdi.QueryDbToBigInt(dbQueryString)
|
||||||
@ -625,8 +631,13 @@ func (sdi *StateDiffIndexer) FindAndUpdateGaps(latestBlockOnChain *big.Int, expe
|
|||||||
startBlock.Add(latestBlockInDb, expectedDifference)
|
startBlock.Add(latestBlockInDb, expectedDifference)
|
||||||
endBlock.Sub(latestBlockOnChain, expectedDifference)
|
endBlock.Sub(latestBlockOnChain, expectedDifference)
|
||||||
|
|
||||||
log.Warn("Found Gaps starting at, ", startBlock, " and ending at, ", endBlock)
|
log.Warn(fmt.Sprint("Found Gaps starting at, ", startBlock, " and ending at, ", endBlock))
|
||||||
sdi.pushKnownGaps(startBlock, endBlock, false, processingKey)
|
err := sdi.PushKnownGaps(startBlock, endBlock, false, processingKey)
|
||||||
|
if err != nil {
|
||||||
|
// Write to file SQL file instead!!!
|
||||||
|
// If write to SQL file fails, write to disk. Handle this within the write to SQL file function!
|
||||||
|
return err
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
|
@ -116,31 +116,3 @@ func tearDown(t *testing.T) {
|
|||||||
err = ind.Close()
|
err = ind.Close()
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
//func TestKnownGapsUpsert(t *testing.T) {
|
|
||||||
// var startBlockNumber int64 = 111
|
|
||||||
// var endBlockNumber int64 = 121
|
|
||||||
// ind, err := setupDb(t)
|
|
||||||
// if err != nil {
|
|
||||||
// t.Fatal(err)
|
|
||||||
// }
|
|
||||||
// require.NoError(t, err)
|
|
||||||
//
|
|
||||||
// testKnownGapsUpsert(t, startBlockNumber, endBlockNumber, ind)
|
|
||||||
// //str, err := ind.QueryDb("SELECT MAX(block_number) FROM eth.header_cids") // Figure out the string.
|
|
||||||
// queryString := fmt.Sprintf("SELECT starting_block_number from eth.known_gaps WHERE starting_block_number = %d AND ending_block_number = %d", startBlockNumber, endBlockNumber)
|
|
||||||
// _, queryErr := ind.QueryDb(queryString) // Figure out the string.
|
|
||||||
// require.NoError(t, queryErr)
|
|
||||||
//
|
|
||||||
//}
|
|
||||||
//func testKnownGapsUpsert(t *testing.T, startBlockNumber int64, endBlockNumber int64, ind interfaces.StateDiffIndexer) {
|
|
||||||
// startBlock := big.NewInt(startBlockNumber)
|
|
||||||
// endBlock := big.NewInt(endBlockNumber)
|
|
||||||
//
|
|
||||||
// processGapError := ind.PushKnownGaps(startBlock, endBlock, false, 1)
|
|
||||||
// if processGapError != nil {
|
|
||||||
// t.Fatal(processGapError)
|
|
||||||
// }
|
|
||||||
// require.NoError(t, processGapError)
|
|
||||||
//}
|
|
||||||
//
|
|
||||||
|
@ -33,6 +33,7 @@ type StateDiffIndexer interface {
|
|||||||
PushCodeAndCodeHash(tx Batch, codeAndCodeHash sdtypes.CodeAndCodeHash) error
|
PushCodeAndCodeHash(tx Batch, codeAndCodeHash sdtypes.CodeAndCodeHash) error
|
||||||
ReportDBMetrics(delay time.Duration, quit <-chan bool)
|
ReportDBMetrics(delay time.Duration, quit <-chan bool)
|
||||||
FindAndUpdateGaps(latestBlockOnChain *big.Int, expectedDifference *big.Int, processingKey int64) error
|
FindAndUpdateGaps(latestBlockOnChain *big.Int, expectedDifference *big.Int, processingKey int64) error
|
||||||
|
PushKnownGaps(startingBlockNumber *big.Int, endingBlockNumber *big.Int, checkedOut bool, processingKey int64) error
|
||||||
io.Closer
|
io.Closer
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -18,6 +18,7 @@ package statediff
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
|
"fmt"
|
||||||
"math/big"
|
"math/big"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
@ -123,6 +124,8 @@ type Service struct {
|
|||||||
BackendAPI ethapi.Backend
|
BackendAPI ethapi.Backend
|
||||||
// Should the statediff service wait for geth to sync to head?
|
// Should the statediff service wait for geth to sync to head?
|
||||||
WaitForSync bool
|
WaitForSync bool
|
||||||
|
// Used to signal if we should check for KnownGaps
|
||||||
|
KnownGaps KnownGaps
|
||||||
// Whether or not we have any subscribers; only if we do, do we processes state diffs
|
// Whether or not we have any subscribers; only if we do, do we processes state diffs
|
||||||
subscribers int32
|
subscribers int32
|
||||||
// Interface for publishing statediffs as PG-IPLD objects
|
// Interface for publishing statediffs as PG-IPLD objects
|
||||||
@ -135,6 +138,91 @@ type Service struct {
|
|||||||
maxRetry uint
|
maxRetry uint
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type KnownGaps struct {
|
||||||
|
// Should we check for gaps by looking at the DB and comparing the latest block with head
|
||||||
|
checkForGaps bool
|
||||||
|
// Arbitrary processingKey that can be used down the line to differentiate different geth nodes.
|
||||||
|
processingKey int64
|
||||||
|
// This number indicates the expected difference between blocks.
|
||||||
|
// Currently, this is 1 since the geth node processes each block. But down the road this can be used in
|
||||||
|
// Tandom with the processingKey to differentiate block processing logic.
|
||||||
|
expectedDifference *big.Int
|
||||||
|
// Indicates if Geth is in an error state
|
||||||
|
// This is used to indicate the right time to upserts
|
||||||
|
errorState bool
|
||||||
|
// This array keeps track of errorBlocks as they occur.
|
||||||
|
// When the errorState is false again, we can process these blocks.
|
||||||
|
// Do we need a list, can we have /KnownStartErrorBlock and knownEndErrorBlock ints instead?
|
||||||
|
knownErrorBlocks []*big.Int
|
||||||
|
// The last processed block keeps track of the last processed block.
|
||||||
|
// Its used to make sure we didn't skip over any block!
|
||||||
|
lastProcessedBlock *big.Int
|
||||||
|
}
|
||||||
|
|
||||||
|
// This function will capture any missed blocks that were not captured in sds.KnownGaps.knownErrorBlocks.
|
||||||
|
// It is invoked when the sds.KnownGaps.lastProcessed block is not one unit
|
||||||
|
// away from sds.KnownGaps.expectedDifference
|
||||||
|
// Essentially, if geth ever misses blocks but doesn't output an error, we are covered.
|
||||||
|
func (sds *Service) capturedMissedBlocks(currentBlock *big.Int, knownErrorBlocks []*big.Int, lastProcessedBlock *big.Int) {
|
||||||
|
// last processed: 110
|
||||||
|
// current block: 125
|
||||||
|
if len(knownErrorBlocks) > 0 {
|
||||||
|
// 115
|
||||||
|
startErrorBlock := new(big.Int).Set(knownErrorBlocks[0])
|
||||||
|
// 120
|
||||||
|
endErrorBlock := new(big.Int).Set(knownErrorBlocks[len(knownErrorBlocks)-1])
|
||||||
|
|
||||||
|
// 111
|
||||||
|
expectedStartErrorBlock := big.NewInt(0).Add(lastProcessedBlock, sds.KnownGaps.expectedDifference)
|
||||||
|
// 124
|
||||||
|
expectedEndErrorBlock := big.NewInt(0).Sub(currentBlock, sds.KnownGaps.expectedDifference)
|
||||||
|
|
||||||
|
if (expectedStartErrorBlock == startErrorBlock) &&
|
||||||
|
(expectedEndErrorBlock == endErrorBlock) {
|
||||||
|
log.Info("All Gaps already captured in knownErrorBlocks")
|
||||||
|
}
|
||||||
|
|
||||||
|
if expectedEndErrorBlock.Cmp(endErrorBlock) == 1 {
|
||||||
|
log.Warn(fmt.Sprint("There are gaps in the knownErrorBlocks list: ", knownErrorBlocks))
|
||||||
|
log.Warn("But there are gaps that were also not added there.")
|
||||||
|
log.Warn(fmt.Sprint("Last Block in knownErrorBlocks: ", endErrorBlock))
|
||||||
|
log.Warn(fmt.Sprint("Last processed Block: ", lastProcessedBlock))
|
||||||
|
log.Warn(fmt.Sprint("Current Block: ", currentBlock))
|
||||||
|
//120 + 1 == 121
|
||||||
|
startBlock := big.NewInt(0).Add(endErrorBlock, sds.KnownGaps.expectedDifference)
|
||||||
|
// 121 to 124
|
||||||
|
log.Warn(fmt.Sprintf("Adding the following block range to known_gaps table: %d - %d", startBlock, expectedEndErrorBlock))
|
||||||
|
sds.indexer.PushKnownGaps(startBlock, expectedEndErrorBlock, false, sds.KnownGaps.processingKey)
|
||||||
|
}
|
||||||
|
|
||||||
|
if expectedStartErrorBlock.Cmp(startErrorBlock) == -1 {
|
||||||
|
log.Warn(fmt.Sprint("There are gaps in the knownErrorBlocks list: ", knownErrorBlocks))
|
||||||
|
log.Warn("But there are gaps that were also not added there.")
|
||||||
|
log.Warn(fmt.Sprint("First Block in knownErrorBlocks: ", startErrorBlock))
|
||||||
|
log.Warn(fmt.Sprint("Last processed Block: ", lastProcessedBlock))
|
||||||
|
// 115 - 1 == 114
|
||||||
|
endBlock := big.NewInt(0).Sub(startErrorBlock, sds.KnownGaps.expectedDifference)
|
||||||
|
// 111 to 114
|
||||||
|
log.Warn(fmt.Sprintf("Adding the following block range to known_gaps table: %d - %d", expectedStartErrorBlock, endBlock))
|
||||||
|
sds.indexer.PushKnownGaps(expectedStartErrorBlock, endBlock, false, sds.KnownGaps.processingKey)
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Warn(fmt.Sprint("The following Gaps were found: ", knownErrorBlocks))
|
||||||
|
log.Warn(fmt.Sprint("Updating known Gaps table from ", startErrorBlock, " to ", endErrorBlock, " with processing key, ", sds.KnownGaps.processingKey))
|
||||||
|
sds.indexer.PushKnownGaps(startErrorBlock, endErrorBlock, false, sds.KnownGaps.processingKey)
|
||||||
|
|
||||||
|
} else {
|
||||||
|
log.Warn("We missed blocks without any errors.")
|
||||||
|
// 110 + 1 == 111
|
||||||
|
startBlock := big.NewInt(0).Add(lastProcessedBlock, sds.KnownGaps.expectedDifference)
|
||||||
|
// 125 - 1 == 124
|
||||||
|
endBlock := big.NewInt(0).Sub(currentBlock, sds.KnownGaps.expectedDifference)
|
||||||
|
log.Warn(fmt.Sprint("Missed blocks starting from: ", startBlock))
|
||||||
|
log.Warn(fmt.Sprint("Missed blocks ending at: ", endBlock))
|
||||||
|
sds.indexer.PushKnownGaps(startBlock, endBlock, false, sds.KnownGaps.processingKey)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// BlockCache caches the last block for safe access from different service loops
|
// BlockCache caches the last block for safe access from different service loops
|
||||||
type BlockCache struct {
|
type BlockCache struct {
|
||||||
sync.Mutex
|
sync.Mutex
|
||||||
@ -174,6 +262,14 @@ func New(stack *node.Node, ethServ *eth.Ethereum, cfg *ethconfig.Config, params
|
|||||||
if workers == 0 {
|
if workers == 0 {
|
||||||
workers = 1
|
workers = 1
|
||||||
}
|
}
|
||||||
|
// If we ever have multiple processingKeys we can update them here
|
||||||
|
// along with the expectedDifference
|
||||||
|
knownGaps := &KnownGaps{
|
||||||
|
checkForGaps: true,
|
||||||
|
processingKey: 0,
|
||||||
|
expectedDifference: big.NewInt(1),
|
||||||
|
errorState: false,
|
||||||
|
}
|
||||||
sds := &Service{
|
sds := &Service{
|
||||||
Mutex: sync.Mutex{},
|
Mutex: sync.Mutex{},
|
||||||
BlockChain: blockChain,
|
BlockChain: blockChain,
|
||||||
@ -184,6 +280,7 @@ func New(stack *node.Node, ethServ *eth.Ethereum, cfg *ethconfig.Config, params
|
|||||||
BlockCache: NewBlockCache(workers),
|
BlockCache: NewBlockCache(workers),
|
||||||
BackendAPI: backend,
|
BackendAPI: backend,
|
||||||
WaitForSync: params.WaitForSync,
|
WaitForSync: params.WaitForSync,
|
||||||
|
KnownGaps: *knownGaps,
|
||||||
indexer: indexer,
|
indexer: indexer,
|
||||||
enableWriteLoop: params.EnableWriteLoop,
|
enableWriteLoop: params.EnableWriteLoop,
|
||||||
numWorkers: workers,
|
numWorkers: workers,
|
||||||
@ -308,12 +405,40 @@ func (sds *Service) writeLoopWorker(params workerParams) {
|
|||||||
sds.writeGenesisStateDiff(parentBlock, params.id)
|
sds.writeGenesisStateDiff(parentBlock, params.id)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// If for any reason we need to check for gaps,
|
||||||
|
// Check and update the gaps table.
|
||||||
|
if sds.KnownGaps.checkForGaps && !sds.KnownGaps.errorState {
|
||||||
|
log.Info("Checking for Gaps at current block: ", currentBlock.Number())
|
||||||
|
go sds.indexer.FindAndUpdateGaps(currentBlock.Number(), sds.KnownGaps.expectedDifference, sds.KnownGaps.processingKey)
|
||||||
|
sds.KnownGaps.checkForGaps = false
|
||||||
|
}
|
||||||
|
|
||||||
log.Info("Writing state diff", "block height", currentBlock.Number().Uint64(), "worker", params.id)
|
log.Info("Writing state diff", "block height", currentBlock.Number().Uint64(), "worker", params.id)
|
||||||
err := sds.writeStateDiffWithRetry(currentBlock, parentBlock.Root(), writeLoopParams)
|
err := sds.writeStateDiffWithRetry(currentBlock, parentBlock.Root(), writeLoopParams)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error("statediff.Service.WriteLoop: processing error", "block height", currentBlock.Number().Uint64(), "error", err.Error(), "worker", params.id)
|
log.Error("statediff.Service.WriteLoop: processing error", "block height", currentBlock.Number().Uint64(), "error", err.Error(), "worker", params.id)
|
||||||
|
sds.KnownGaps.errorState = true
|
||||||
|
sds.KnownGaps.knownErrorBlocks = append(sds.KnownGaps.knownErrorBlocks, currentBlock.Number())
|
||||||
|
log.Warn("Updating the following block to knownErrorBlocks to be inserted into knownGaps table: ", currentBlock.Number())
|
||||||
|
// Write object to startdiff
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
sds.KnownGaps.errorState = false
|
||||||
|
// Understand what the last block that should have been processed is
|
||||||
|
previousExpectedBlock := big.NewInt(0).Sub(currentBlock.Number(), sds.KnownGaps.expectedDifference)
|
||||||
|
// If we last block which should have been processed is not
|
||||||
|
// the actual lastProcessedBlock, add it to known gaps table.
|
||||||
|
if previousExpectedBlock != sds.KnownGaps.lastProcessedBlock && sds.KnownGaps.lastProcessedBlock != nil {
|
||||||
|
// We must pass in parameters by VALUE not reference.
|
||||||
|
// If we pass them in my reference, the references can change before the computation is complete!
|
||||||
|
staticKnownErrorBlocks := make([]*big.Int, len(sds.KnownGaps.knownErrorBlocks))
|
||||||
|
copy(staticKnownErrorBlocks, sds.KnownGaps.knownErrorBlocks)
|
||||||
|
staticLastProcessedBlock := new(big.Int).Set(sds.KnownGaps.lastProcessedBlock)
|
||||||
|
go sds.capturedMissedBlocks(currentBlock.Number(), staticKnownErrorBlocks, staticLastProcessedBlock)
|
||||||
|
sds.KnownGaps.knownErrorBlocks = nil
|
||||||
|
}
|
||||||
|
sds.KnownGaps.lastProcessedBlock = currentBlock.Number()
|
||||||
|
|
||||||
// TODO: how to handle with concurrent workers
|
// TODO: how to handle with concurrent workers
|
||||||
statediffMetrics.lastStatediffHeight.Update(int64(currentBlock.Number().Uint64()))
|
statediffMetrics.lastStatediffHeight.Update(int64(currentBlock.Number().Uint64()))
|
||||||
case <-sds.QuitChan:
|
case <-sds.QuitChan:
|
||||||
|
143
statediff/service_public_test.go
Normal file
143
statediff/service_public_test.go
Normal file
@ -0,0 +1,143 @@
|
|||||||
|
package statediff
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"math/big"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/ethereum/go-ethereum/params"
|
||||||
|
"github.com/ethereum/go-ethereum/statediff/indexer/database/sql"
|
||||||
|
"github.com/ethereum/go-ethereum/statediff/indexer/database/sql/postgres"
|
||||||
|
"github.com/ethereum/go-ethereum/statediff/indexer/interfaces"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
db sql.Database
|
||||||
|
err error
|
||||||
|
indexer interfaces.StateDiffIndexer
|
||||||
|
chainConf = params.MainnetChainConfig
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestKnownGaps(t *testing.T) {
|
||||||
|
type gapValues struct {
|
||||||
|
lastProcessedBlock int64
|
||||||
|
currentBlock int64
|
||||||
|
knownErrorBlocksStart int64
|
||||||
|
knownErrorBlocksEnd int64
|
||||||
|
expectedDif int64
|
||||||
|
}
|
||||||
|
|
||||||
|
tests := []gapValues{
|
||||||
|
// Unprocessed gaps before and after knownErrorBlock
|
||||||
|
{lastProcessedBlock: 110, knownErrorBlocksStart: 115, knownErrorBlocksEnd: 120, currentBlock: 125, expectedDif: 1},
|
||||||
|
// No knownErrorBlocks
|
||||||
|
{lastProcessedBlock: 130, knownErrorBlocksStart: 0, knownErrorBlocksEnd: 0, currentBlock: 140, expectedDif: 1},
|
||||||
|
// No gaps before or after knownErrorBlocks
|
||||||
|
{lastProcessedBlock: 150, knownErrorBlocksStart: 151, knownErrorBlocksEnd: 159, currentBlock: 160, expectedDif: 1},
|
||||||
|
// gaps before knownErrorBlocks but not after
|
||||||
|
{lastProcessedBlock: 170, knownErrorBlocksStart: 180, knownErrorBlocksEnd: 189, currentBlock: 190, expectedDif: 1},
|
||||||
|
// gaps after knownErrorBlocks but not before
|
||||||
|
{lastProcessedBlock: 200, knownErrorBlocksStart: 201, knownErrorBlocksEnd: 205, currentBlock: 210, expectedDif: 1},
|
||||||
|
/// Same tests as above with a new expected DIF
|
||||||
|
// Unprocessed gaps before and after knownErrorBlock
|
||||||
|
{lastProcessedBlock: 1100, knownErrorBlocksStart: 1150, knownErrorBlocksEnd: 1200, currentBlock: 1250, expectedDif: 2},
|
||||||
|
// No knownErrorBlocks
|
||||||
|
{lastProcessedBlock: 1300, knownErrorBlocksStart: 0, knownErrorBlocksEnd: 0, currentBlock: 1400, expectedDif: 2},
|
||||||
|
// No gaps before or after knownErrorBlocks
|
||||||
|
{lastProcessedBlock: 1500, knownErrorBlocksStart: 1502, knownErrorBlocksEnd: 1598, currentBlock: 1600, expectedDif: 2},
|
||||||
|
// gaps before knownErrorBlocks but not after
|
||||||
|
{lastProcessedBlock: 1700, knownErrorBlocksStart: 1800, knownErrorBlocksEnd: 1898, currentBlock: 1900, expectedDif: 2},
|
||||||
|
// gaps after knownErrorBlocks but not before
|
||||||
|
{lastProcessedBlock: 2000, knownErrorBlocksStart: 2002, knownErrorBlocksEnd: 2050, currentBlock: 2100, expectedDif: 2},
|
||||||
|
}
|
||||||
|
for _, tc := range tests {
|
||||||
|
// Reuse processing key from expecteDiff
|
||||||
|
testCaptureMissedBlocks(t, tc.lastProcessedBlock, tc.currentBlock, tc.knownErrorBlocksStart, tc.knownErrorBlocksEnd, tc.expectedDif, tc.expectedDif)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// It also makes sure we properly calculate any missed gaps not in the known gaps lists
|
||||||
|
// either before or after the list.
|
||||||
|
func testCaptureMissedBlocks(t *testing.T, lastBlockProcessed int64, currentBlockNum int64, knownErrorBlocksStart int64, knownErrorBlocksEnd int64, expectedDif int64, processingKey int64) {
|
||||||
|
|
||||||
|
lastProcessedBlock := big.NewInt(lastBlockProcessed)
|
||||||
|
currentBlock := big.NewInt(currentBlockNum)
|
||||||
|
knownErrorBlocks := (make([]*big.Int, 0))
|
||||||
|
|
||||||
|
checkGaps := knownErrorBlocksStart != 0 && knownErrorBlocksEnd != 0
|
||||||
|
if checkGaps {
|
||||||
|
for i := knownErrorBlocksStart; i <= knownErrorBlocksEnd; i++ {
|
||||||
|
knownErrorBlocks = append(knownErrorBlocks, big.NewInt(i))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Comment out values which should not be used
|
||||||
|
knownGaps := KnownGaps{
|
||||||
|
processingKey: processingKey,
|
||||||
|
expectedDifference: big.NewInt(expectedDif),
|
||||||
|
}
|
||||||
|
stateDiff, err := setupDb(t)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
service := &Service{
|
||||||
|
KnownGaps: knownGaps,
|
||||||
|
indexer: stateDiff,
|
||||||
|
}
|
||||||
|
service.capturedMissedBlocks(currentBlock, knownErrorBlocks, lastProcessedBlock)
|
||||||
|
|
||||||
|
if checkGaps {
|
||||||
|
|
||||||
|
if lastBlockProcessed+expectedDif == knownErrorBlocksStart && knownErrorBlocksEnd+expectedDif == currentBlockNum {
|
||||||
|
validateUpsert(t, stateDiff, knownErrorBlocksStart, knownErrorBlocksEnd)
|
||||||
|
|
||||||
|
} else if lastBlockProcessed+expectedDif == knownErrorBlocksStart {
|
||||||
|
validateUpsert(t, stateDiff, knownErrorBlocksStart, knownErrorBlocksEnd)
|
||||||
|
validateUpsert(t, stateDiff, knownErrorBlocksEnd+expectedDif, currentBlockNum-expectedDif)
|
||||||
|
|
||||||
|
} else if knownErrorBlocksEnd+expectedDif == currentBlockNum {
|
||||||
|
validateUpsert(t, stateDiff, lastBlockProcessed+expectedDif, knownErrorBlocksStart-expectedDif)
|
||||||
|
validateUpsert(t, stateDiff, knownErrorBlocksStart, knownErrorBlocksEnd)
|
||||||
|
|
||||||
|
} else {
|
||||||
|
validateUpsert(t, stateDiff, lastBlockProcessed+expectedDif, knownErrorBlocksStart-expectedDif)
|
||||||
|
validateUpsert(t, stateDiff, knownErrorBlocksStart, knownErrorBlocksEnd)
|
||||||
|
validateUpsert(t, stateDiff, knownErrorBlocksEnd+expectedDif, currentBlockNum-expectedDif)
|
||||||
|
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
validateUpsert(t, stateDiff, lastBlockProcessed+expectedDif, currentBlockNum-expectedDif)
|
||||||
|
}
|
||||||
|
|
||||||
|
tearDown(t, stateDiff)
|
||||||
|
}
|
||||||
|
|
||||||
|
func validateUpsert(t *testing.T, stateDiff *sql.StateDiffIndexer, startingBlock int64, endingBlock int64) {
|
||||||
|
t.Logf("Starting to query blocks: %d - %d", startingBlock, endingBlock)
|
||||||
|
queryString := fmt.Sprintf("SELECT starting_block_number from eth.known_gaps WHERE starting_block_number = %d AND ending_block_number = %d", startingBlock, endingBlock)
|
||||||
|
|
||||||
|
_, queryErr := stateDiff.QueryDb(queryString) // Figure out the string.
|
||||||
|
t.Logf("Updated Known Gaps table starting from, %d, and ending at, %d", startingBlock, endingBlock)
|
||||||
|
require.NoError(t, queryErr)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create a DB object to use
|
||||||
|
func setupDb(t *testing.T) (*sql.StateDiffIndexer, error) {
|
||||||
|
db, err = postgres.SetupSQLXDB()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
stateDiff, err := sql.NewStateDiffIndexer(context.Background(), chainConf, db)
|
||||||
|
return stateDiff, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Teardown the DB
|
||||||
|
func tearDown(t *testing.T, stateDiff *sql.StateDiffIndexer) {
|
||||||
|
t.Log("Starting tearDown")
|
||||||
|
sql.TearDownDB(t, db)
|
||||||
|
err := stateDiff.Close()
|
||||||
|
require.NoError(t, err)
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user