292: Backfill gaps in the recent past on startup when tracking head. #395

Merged
telackey merged 11 commits from telackey/292 into v1.11.6-statediff-v5 2023-06-24 04:04:57 +00:00
10 changed files with 288 additions and 38 deletions
Showing only changes of commit 95942c335c - Show all commits

View File

@ -36,6 +36,10 @@ type Config struct {
ClientName string ClientName string
// Whether to enable writing state diffs directly to track blockchain head // Whether to enable writing state diffs directly to track blockchain head
EnableWriteLoop bool EnableWriteLoop bool
// The maximum number of blocks to backfill when tracking head.
BackfillMaxHeadGap uint64
// The maximum number of blocks behind the startup position to check for gaps.
BackfillCheckPastBlocks uint64
// Size of the worker pool // Size of the worker pool
NumWorkers uint NumWorkers uint
// Should the statediff service wait until geth has synced to the head of the blockchain? // Should the statediff service wait until geth has synced to the head of the blockchain?

View File

@ -418,6 +418,18 @@ func (sdi *StateDiffIndexer) HasBlock(hash common.Hash, number uint64) (bool, er
return false, nil return false, nil
} }
// CurrentBlock returns the HeaderModel of the highest existing block in the output.
// In the "dump" case, this is always nil.
func (sdi *StateDiffIndexer) CurrentBlock() (*models.HeaderModel, error) {
return nil, nil
}
// DetectGaps returns a list of gaps in the output found within the specified block range.
// In the "dump" case this is always nil.
func (sdi *StateDiffIndexer) DetectGaps(beginBlockNumber uint64, endBlockNumber uint64) ([]*interfaces.BlockGap, error) {
return nil, nil
}
// Close satisfies io.Closer // Close satisfies io.Closer
func (sdi *StateDiffIndexer) Close() error { func (sdi *StateDiffIndexer) Close() error {
return sdi.dump.Close() return sdi.dump.Close()

View File

@ -461,6 +461,18 @@ func (sdi *StateDiffIndexer) PushIPLD(batch interfaces.Batch, ipld sdtypes.IPLD)
return nil return nil
} }
// CurrentBlock returns the HeaderModel of the highest existing block in the output.
// In the "dump" case, this is always nil.
func (sdi *StateDiffIndexer) CurrentBlock() (*models.HeaderModel, error) {
return nil, nil
}
// DetectGaps returns a list of gaps in the output found within the specified block range.
// In the "dump" case this is always nil.
func (sdi *StateDiffIndexer) DetectGaps(beginBlockNumber uint64, endBlockNumber uint64) ([]*interfaces.BlockGap, error) {
return nil, nil
}
// HasBlock checks whether the indicated block already exists in the output. // HasBlock checks whether the indicated block already exists in the output.
// In the "file" case this is presumed to be false. // In the "file" case this is presumed to be false.
func (sdi *StateDiffIndexer) HasBlock(hash common.Hash, number uint64) (bool, error) { func (sdi *StateDiffIndexer) HasBlock(hash common.Hash, number uint64) (bool, error) {

View File

@ -219,11 +219,21 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
return blockTx, err return blockTx, err
} }
// CurrentBlock returns the HeaderModel of the highest existing block in the database.
func (sdi *StateDiffIndexer) CurrentBlock() (*models.HeaderModel, error) {
return sdi.dbWriter.maxHeader()
}
// HasBlock checks whether the indicated block already exists in the database. // HasBlock checks whether the indicated block already exists in the database.
func (sdi *StateDiffIndexer) HasBlock(hash common.Hash, number uint64) (bool, error) { func (sdi *StateDiffIndexer) HasBlock(hash common.Hash, number uint64) (bool, error) {
return sdi.dbWriter.hasHeader(hash, number) return sdi.dbWriter.hasHeader(hash, number)
} }
// DetectGaps returns a list of gaps in the database found within the specified block range.
func (sdi *StateDiffIndexer) DetectGaps(beginBlockNumber uint64, endBlockNumber uint64) ([]*interfaces.BlockGap, error) {
return sdi.dbWriter.detectGaps(beginBlockNumber, endBlockNumber)
}
// processHeader publishes and indexes a header IPLD in Postgres // processHeader publishes and indexes a header IPLD in Postgres
// it returns the headerID // it returns the headerID
func (sdi *StateDiffIndexer) processHeader(tx *BatchTx, header *types.Header, headerNode ipld.IPLD, reward, td *big.Int) (string, error) { func (sdi *StateDiffIndexer) processHeader(tx *BatchTx, header *types.Header, headerNode ipld.IPLD, reward, td *big.Int) (string, error) {

View File

@ -45,6 +45,7 @@ type Driver interface {
// Statements interface to accommodate different SQL query syntax // Statements interface to accommodate different SQL query syntax
type Statements interface { type Statements interface {
MaxHeaderStm() string
ExistsHeaderStm() string ExistsHeaderStm() string
InsertHeaderStm() string InsertHeaderStm() string
InsertUncleStm() string InsertUncleStm() string

View File

@ -41,8 +41,12 @@ type DB struct {
sql.Driver sql.Driver
} }
func (db *DB) MaxHeaderStm() string {
return fmt.Sprintf("SELECT block_number, block_hash, parent_hash, cid, td, node_ids, reward, state_root, tx_root, receipt_root, uncles_hash, bloom, timestamp, coinbase FROM %s ORDER BY block_number DESC LIMIT 1", schema.TableHeader.Name)
}
func (db *DB) ExistsHeaderStm() string { func (db *DB) ExistsHeaderStm() string {
return fmt.Sprintf("SELECT EXISTS(SELECT 1 from %s WHERE block_number = $1 AND block_hash = $2 LIMIT 1)", schema.TableHeader.Name) return fmt.Sprintf("SELECT EXISTS(SELECT 1 from %s WHERE block_number = $1::BIGINT AND block_hash = $2::TEXT LIMIT 1)", schema.TableHeader.Name)
} }
// InsertHeaderStm satisfies the sql.Statements interface // InsertHeaderStm satisfies the sql.Statements interface

View File

@ -20,6 +20,8 @@ import (
"fmt" "fmt"
"strconv" "strconv"
"github.com/ethereum/go-ethereum/statediff/indexer/interfaces"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/jackc/pgtype" "github.com/jackc/pgtype"
@ -48,11 +50,57 @@ func (w *Writer) Close() error {
return w.db.Close() return w.db.Close()
} }
// hashHeader returns true if a matching hash+number record exists in the database, else false.
func (w *Writer) hasHeader(blockHash common.Hash, blockNumber uint64) (exists bool, err error) { func (w *Writer) hasHeader(blockHash common.Hash, blockNumber uint64) (exists bool, err error) {
err = w.db.QueryRow(w.db.Context(), w.db.ExistsHeaderStm(), blockNumber, blockHash.String()).Scan(&exists) // pgx misdetects the parameter OIDs and selects int8, which can overflow.
// unfortunately there is no good place to override it, so it is safer to pass the uint64s as text
// and let PG handle the cast
err = w.db.QueryRow(w.db.Context(), w.db.ExistsHeaderStm(), strconv.FormatUint(blockNumber, 10), blockHash.String()).Scan(&exists)
return exists, err return exists, err
} }
// detectGaps returns a list of BlockGaps detected within the specified block range
// For example, if the database contains blocks the overall range 1000:2000, but is missing blocks 1110:1230 and 1380
// it would return [{FirstMissing: 1110, LastMissing: 1230}, {FirstMissing: 1380, LastMissing: 1380}]
func (w *Writer) detectGaps(beginBlockNumber uint64, endBlockNumber uint64) ([]*interfaces.BlockGap, error) {
pgStm := "SELECT block_number + 1 AS first_missing, (next_bn - 1) AS last_missing FROM (SELECT block_number, LEAD(block_number) OVER (ORDER BY block_number) AS next_bn FROM eth.header_cids WHERE block_number >= $1::BIGINT AND block_number <= $2::BIGINT) h WHERE next_bn > block_number + 1"
var gaps []*interfaces.BlockGap
// pgx misdetects the parameter OIDs and selects int8, which can overflow.
// unfortunately there is no good place to override it, so it is safer to pass the uint64s as text
// and let PG handle the cast
err := w.db.Select(w.db.Context(), &gaps, pgStm, strconv.FormatUint(beginBlockNumber, 10), strconv.FormatUint(endBlockNumber, 10))
return gaps, err
}
/*
SELECT block_number, block_hash, parent_hash, cid, td, node_ids, reward, state_root, tx_root, receipt_root, uncles_hash, bloom, timestamp, coinbase FROM %s ORDER BY block_number DESC LIMIT 1
*/
func (w *Writer) maxHeader() (*models.HeaderModel, error) {
var model models.HeaderModel
var err error
var number, td, reward uint64
err = w.db.QueryRow(w.db.Context(), w.db.MaxHeaderStm()).Scan(
&number,
&model.BlockHash,
&model.ParentHash,
&model.CID,
&td,
&model.NodeIDs,
&reward,
&model.StateRoot,
&model.TxRoot,
&model.RctRoot,
&model.UnclesHash,
&model.Bloom,
&model.Timestamp,
&model.Coinbase,
)
model.BlockNumber = strconv.FormatUint(number, 10)
model.TotalDifficulty = strconv.FormatUint(td, 10)
model.Reward = strconv.FormatUint(reward, 10)
return &model, err
}
/* /*
INSERT INTO eth.header_cids (block_number, block_hash, parent_hash, cid, td, node_ids, reward, state_root, tx_root, receipt_root, uncles_hash, bloom, timestamp, coinbase) INSERT INTO eth.header_cids (block_number, block_hash, parent_hash, cid, td, node_ids, reward, state_root, tx_root, receipt_root, uncles_hash, bloom, timestamp, coinbase)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14)

View File

@ -21,6 +21,8 @@ import (
"math/big" "math/big"
"time" "time"
"github.com/ethereum/go-ethereum/statediff/indexer/models"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/statediff/indexer/shared" "github.com/ethereum/go-ethereum/statediff/indexer/shared"
@ -29,6 +31,8 @@ import (
// StateDiffIndexer interface required to index statediff data // StateDiffIndexer interface required to index statediff data
type StateDiffIndexer interface { type StateDiffIndexer interface {
DetectGaps(beginBlock uint64, endBlock uint64) ([]*BlockGap, error)
CurrentBlock() (*models.HeaderModel, error)
HasBlock(hash common.Hash, number uint64) (bool, error) HasBlock(hash common.Hash, number uint64) (bool, error)
PushBlock(block *types.Block, receipts types.Receipts, totalDifficulty *big.Int) (Batch, error) PushBlock(block *types.Block, receipts types.Receipts, totalDifficulty *big.Int) (Batch, error)
PushStateNode(tx Batch, stateNode sdtypes.StateLeafNode, headerID string) error PushStateNode(tx Batch, stateNode sdtypes.StateLeafNode, headerID string) error
@ -54,3 +58,9 @@ type Batch interface {
type Config interface { type Config interface {
Type() shared.DBType Type() shared.DBType
} }
// Used to represent a gap in statediffed blocks
type BlockGap struct {
FirstMissing uint64 `json:"firstMissing"`
LastMissing uint64 `json:"lastMissing"`
}

View File

@ -18,6 +18,7 @@ package statediff
import ( import (
"bytes" "bytes"
"encoding/json"
"fmt" "fmt"
"math/big" "math/big"
"strconv" "strconv"
@ -137,6 +138,9 @@ type Service struct {
indexer interfaces.StateDiffIndexer indexer interfaces.StateDiffIndexer
// Whether to enable writing state diffs directly to track blockchain head. // Whether to enable writing state diffs directly to track blockchain head.
enableWriteLoop bool enableWriteLoop bool
// Settings to use for backfilling state diffs (plugging gaps when tracking head)
backfillMaxHeadGap uint64
backfillCheckPastBlocks uint64
// Size of the worker pool // Size of the worker pool
numWorkers uint numWorkers uint
// Number of retry for aborted transactions due to deadlock. // Number of retry for aborted transactions due to deadlock.
@ -211,24 +215,26 @@ func New(stack *node.Node, ethServ *eth.Ethereum, cfg *ethconfig.Config, params
} }
sds := &Service{ sds := &Service{
Mutex: sync.Mutex{}, Mutex: sync.Mutex{},
BlockChain: blockChain, BlockChain: blockChain,
Builder: NewBuilder(blockChain.StateCache()), Builder: NewBuilder(blockChain.StateCache()),
QuitChan: quitCh, QuitChan: quitCh,
Subscriptions: make(map[common.Hash]map[rpc.ID]Subscription), Subscriptions: make(map[common.Hash]map[rpc.ID]Subscription),
SubscriptionTypes: make(map[common.Hash]Params), SubscriptionTypes: make(map[common.Hash]Params),
BlockCache: NewBlockCache(workers), BlockCache: NewBlockCache(workers),
BackendAPI: backend, BackendAPI: backend,
WaitForSync: params.WaitForSync, WaitForSync: params.WaitForSync,
indexer: indexer, indexer: indexer,
enableWriteLoop: params.EnableWriteLoop, enableWriteLoop: params.EnableWriteLoop,
numWorkers: workers, backfillMaxHeadGap: params.BackfillMaxHeadGap,
maxRetry: defaultRetryLimit, backfillCheckPastBlocks: params.BackfillCheckPastBlocks,
jobStatusSubs: map[rpc.ID]statusSubscription{}, numWorkers: workers,
currentJobs: map[uint64]JobID{}, maxRetry: defaultRetryLimit,
currentJobsMutex: sync.Mutex{}, jobStatusSubs: map[rpc.ID]statusSubscription{},
currentBlocks: map[string]bool{}, currentJobs: map[uint64]JobID{},
currentBlocksMutex: sync.Mutex{}, currentJobsMutex: sync.Mutex{},
currentBlocks: map[string]bool{},
currentBlocksMutex: sync.Mutex{},
} }
stack.RegisterLifecycle(sds) stack.RegisterLifecycle(sds)
stack.RegisterAPIs(sds.APIs()) stack.RegisterAPIs(sds.APIs())
@ -251,24 +257,26 @@ func NewService(blockChain blockChain, cfg Config, backend ethapi.Backend, index
quitCh := make(chan bool) quitCh := make(chan bool)
sds := &Service{ sds := &Service{
Mutex: sync.Mutex{}, Mutex: sync.Mutex{},
BlockChain: blockChain, BlockChain: blockChain,
Builder: NewBuilder(blockChain.StateCache()), Builder: NewBuilder(blockChain.StateCache()),
QuitChan: quitCh, QuitChan: quitCh,
Subscriptions: make(map[common.Hash]map[rpc.ID]Subscription), Subscriptions: make(map[common.Hash]map[rpc.ID]Subscription),
SubscriptionTypes: make(map[common.Hash]Params), SubscriptionTypes: make(map[common.Hash]Params),
BlockCache: NewBlockCache(workers), BlockCache: NewBlockCache(workers),
BackendAPI: backend, BackendAPI: backend,
WaitForSync: cfg.WaitForSync, WaitForSync: cfg.WaitForSync,
indexer: indexer, indexer: indexer,
enableWriteLoop: cfg.EnableWriteLoop, enableWriteLoop: cfg.EnableWriteLoop,
numWorkers: workers, backfillMaxHeadGap: cfg.BackfillMaxHeadGap,
maxRetry: defaultRetryLimit, backfillCheckPastBlocks: cfg.BackfillCheckPastBlocks,
jobStatusSubs: map[rpc.ID]statusSubscription{}, numWorkers: workers,
currentJobs: map[uint64]JobID{}, maxRetry: defaultRetryLimit,
currentJobsMutex: sync.Mutex{}, jobStatusSubs: map[rpc.ID]statusSubscription{},
currentBlocks: map[string]bool{}, currentJobs: map[uint64]JobID{},
currentBlocksMutex: sync.Mutex{}, currentJobsMutex: sync.Mutex{},
currentBlocks: map[string]bool{},
currentBlocksMutex: sync.Mutex{},
} }
if indexer != nil { if indexer != nil {
@ -319,6 +327,145 @@ type workerParams struct {
id uint id uint
} }
func (sds *Service) backfillDetectedGaps(blockGaps []*interfaces.BlockGap) {
var ch = make(chan uint64)
var wg sync.WaitGroup
for i := uint(0); i < sds.numWorkers; i++ {
wg.Add(1)
go func(w uint) {
defer wg.Done()
for {
select {
case num, ok := <-ch:
if !ok {
log.Info("Backfill: detected gap fill done", "worker", w)
return
}
log.Info("Backfill: backfilling detected gap", "block", num, "worker", w)
err := sds.writeStateDiffAt(num, writeLoopParams.Params)
if err != nil {
log.Error("Backfill error: " + err.Error())
}
case <-sds.QuitChan:
log.Info("Backfill: quitting before finish", "worker", w)
return
}
}
}(i)
}
for _, gap := range blockGaps {
for num := gap.FirstMissing; num <= gap.LastMissing; num++ {
ch <- num
}
}
close(ch)
wg.Wait()
}
func (sds *Service) backfillHeadGap(indexerBlockNumber uint64, chainBlockNumber uint64) {
headGap := chainBlockNumber - indexerBlockNumber
var ch = make(chan uint64, headGap)
for bn := indexerBlockNumber; bn < chainBlockNumber; bn++ {
ch <- bn
}
var wg sync.WaitGroup
for i := uint(0); i < sds.numWorkers; i++ {
wg.Add(1)
go func(w uint) {
defer wg.Done()
for {
select {
case num, ok := <-ch:
if !ok {
log.Info("Backfill: headGap done", "worker", w)
return
}
log.Info("Backfill: backfilling head gap", "block", num, "worker", w)
err := sds.writeStateDiffAt(num, writeLoopParams.Params)
if err != nil {
log.Error("Backfill error: " + err.Error())
}
case <-sds.QuitChan:
log.Info("Backfill: quitting before finish", "worker", w)
return
}
}
}(i)
}
close(ch)
wg.Wait()
}
func (sds *Service) Backfill() {
chainBlock := sds.BlockChain.CurrentBlock()
if nil == chainBlock {
log.Info("Backfill: No previous chain block, nothing to backfill.")
return
}
chainBlockNumber := chainBlock.Number.Uint64()
if chainBlockNumber == 0 {
log.Info("Backfill: At start of chain, nothing to backfill.")
return
}
indexerBlock, err := sds.indexer.CurrentBlock()
if nil == indexerBlock {
log.Info("Backfill: No previous indexer block, nothing to backfill.")
return
}
if nil != err {
log.Error("Backfill error: " + err.Error())
return
}
indexerBlockNumber, err := strconv.ParseUint(indexerBlock.BlockNumber, 10, 64)
if nil != err {
log.Error("Backfill error: " + err.Error())
return
}
headGap := chainBlockNumber - indexerBlockNumber
log.Info(
"Backfill: initial positions",
"chain", chainBlockNumber,
"indexer", indexerBlockNumber,
"headGap", headGap,
)
if sds.backfillMaxHeadGap > 0 && headGap > 0 {
if headGap < sds.backfillMaxHeadGap {
sds.backfillHeadGap(indexerBlockNumber, chainBlockNumber)
log.Info("Backfill: all workers done filling headGap.")
} else {
log.Error("Backfill: headGap too large to fill.")
}
}
if sds.backfillCheckPastBlocks > 0 {
var gapCheckBeginNumber uint64 = 0
if indexerBlockNumber > sds.backfillCheckPastBlocks {
gapCheckBeginNumber = indexerBlockNumber - sds.backfillCheckPastBlocks
}
blockGaps, err := sds.indexer.DetectGaps(gapCheckBeginNumber, chainBlockNumber)
if nil != err {
log.Error("Backfill error: " + err.Error())
return
}
if nil != blockGaps && len(blockGaps) > 0 {
gapsMsg, _ := json.Marshal(blockGaps)
log.Info("Backfill: detected gaps in range", "begin", gapCheckBeginNumber, "end", chainBlockNumber, "gaps", string(gapsMsg))
sds.backfillDetectedGaps(blockGaps)
log.Info("Backfill: done processing detected gaps in range", "begin", gapCheckBeginNumber, "end", chainBlockNumber, "gaps", string(gapsMsg))
} else {
log.Info("Backfill: no gaps detected in range", "begin", gapCheckBeginNumber, "end", chainBlockNumber)
}
}
}
func (sds *Service) WriteLoop(chainEventCh chan core.ChainEvent) { func (sds *Service) WriteLoop(chainEventCh chan core.ChainEvent) {
chainEventSub := sds.BlockChain.SubscribeChainEvent(chainEventCh) chainEventSub := sds.BlockChain.SubscribeChainEvent(chainEventCh)
defer chainEventSub.Unsubscribe() defer chainEventSub.Unsubscribe()
@ -692,6 +839,8 @@ func (sds *Service) Start() error {
go sds.Loop(chainEventCh) go sds.Loop(chainEventCh)
if sds.enableWriteLoop { if sds.enableWriteLoop {
log.Info("Starting statediff DB backfill", "params", writeLoopParams.Params)
go sds.Backfill()
log.Info("Starting statediff DB write loop", "params", writeLoopParams.Params) log.Info("Starting statediff DB write loop", "params", writeLoopParams.Params)
chainEventCh := make(chan core.ChainEvent, chainEventChanSize) chainEventCh := make(chan core.ChainEvent, chainEventChanSize)
go sds.WriteLoop(chainEventCh) go sds.WriteLoop(chainEventCh)