398: Statediff missing parent blocks automatically. #399

Merged
telackey merged 8 commits from telackey/398 into v1.11.6-statediff-v5 2023-07-18 19:41:20 +00:00
17 changed files with 159 additions and 43 deletions

View File

@ -5,7 +5,7 @@ on:
env:
stack-orchestrator-ref: ${{ github.event.inputs.stack-orchestrator-ref || 'e62830c982d4dfc5f3c1c2b12c1754a7e9b538f1'}}
ipld-eth-db-ref: ${{ github.event.inputs.ipld-eth-db-ref || '66cd1d9e696cfa72f9d272927f9e945905c9f093' }}
ipld-eth-db-ref: ${{ github.event.inputs.ipld-eth-db-ref || '1b922dbff350bfe2a9aec5fe82079e9d855ea7ed' }}
GOPATH: /tmp/go
jobs:
@ -53,6 +53,17 @@ jobs:
- name: Checkout code
uses: actions/checkout@v2
- uses: actions/checkout@v3
with:
ref: ${{ env.ipld-eth-db-ref }}
repository: cerc-io/ipld-eth-db
path: "./ipld-eth-db/"
fetch-depth: 0
- name: Build ipld-eth-db
run: |
docker build -f ./ipld-eth-db/Dockerfile ./ipld-eth-db/ -t cerc/ipld-eth-db:local
- name: Run docker compose
run: |
docker-compose up -d

View File

@ -281,7 +281,7 @@ func makeFullNode(ctx *cli.Context) (*node.Node, ethapi.Backend) {
NumWorkers: ctx.Uint(utils.StateDiffWorkersFlag.Name),
WaitForSync: ctx.Bool(utils.StateDiffWaitForSync.Name),
BackfillCheckPastBlocks: ctx.Uint64(utils.StateDiffBackfillCheckPastBlocks.Name),
BackfillMaxHeadGap: ctx.Uint64(utils.StateDiffBackfillMaxHeadGap.Name),
BackfillMaxDepth: ctx.Uint64(utils.StateDiffBackfillMaxDepth.Name),
}
utils.RegisterStateDiffService(stack, eth, &cfg.Eth, p, backend)
}

View File

@ -177,7 +177,7 @@ var (
utils.StateDiffLogStatements,
utils.StateDiffCopyFrom,
utils.StateDiffBackfillCheckPastBlocks,
utils.StateDiffBackfillMaxHeadGap,
utils.StateDiffBackfillMaxDepth,
configFileFlag,
}, utils.NetworkFlags, utils.DatabasePathFlags)

View File

@ -1125,9 +1125,9 @@ Please note that --` + MetricsHTTPFlag.Name + ` must be set to start the server.
Usage: "The number of blocks behind the startup statediff position to check (and fill) for gaps when head tracking.",
Value: 7200,
}
StateDiffBackfillMaxHeadGap = &cli.Uint64Flag{
Name: "statediff.backfillmaxheadgap",
Usage: "The maximum gap between the current statediff and head positions that can be backfilled.",
StateDiffBackfillMaxDepth = &cli.Uint64Flag{
Name: "statediff.backfillmaxdepth",
Usage: "When statediffing head, the maximum number of missing parents that can be backfilled.",
Value: 7200,
}
)

View File

@ -5,7 +5,7 @@ services:
restart: on-failure
depends_on:
- ipld-eth-db
image: git.vdb.to/cerc-io/ipld-eth-db/ipld-eth-db:v5.0.2-alpha
image: cerc/ipld-eth-db:local
environment:
DATABASE_USER: "vdbm"
DATABASE_NAME: "cerc_testing"

View File

@ -37,7 +37,7 @@ type Config struct {
// Whether to enable writing state diffs directly to track blockchain head
EnableWriteLoop bool
// The maximum number of blocks to backfill when tracking head.
BackfillMaxHeadGap uint64
BackfillMaxDepth uint64
// The maximum number of blocks behind the startup position to check for gaps.
BackfillCheckPastBlocks uint64
// Size of the worker pool
@ -66,12 +66,33 @@ func (p *Params) ComputeWatchedAddressesLeafPaths() {
}
}
func (p *Params) Copy() Params {
ret := Params{
IncludeBlock: p.IncludeBlock,
IncludeReceipts: p.IncludeReceipts,
IncludeTD: p.IncludeTD,
IncludeCode: p.IncludeCode,
}
ret.WatchedAddresses = make([]common.Address, len(p.WatchedAddresses))
copy(ret.WatchedAddresses, p.WatchedAddresses)
return ret
}
// ParamsWithMutex allows to lock the parameters while they are being updated | read from
type ParamsWithMutex struct {
Params
sync.RWMutex
}
// CopyParams returns a defensive copy of the Params
func (p *ParamsWithMutex) CopyParams() Params {
p.RLock()
defer p.RUnlock()
return p.Params.Copy()
}
// Args bundles the arguments for the state diff builder
type Args struct {
OldStateRoot, NewStateRoot, BlockHash common.Hash

View File

@ -186,6 +186,7 @@ func (sdi *StateDiffIndexer) processHeader(tx *BatchTx, header *types.Header, he
UnclesHash: header.UncleHash.String(),
Timestamp: header.Time,
Coinbase: header.Coinbase.String(),
Canonical: true,
}
_, err := fmt.Fprintf(sdi.dump, "%+v\r\n", mod)
return headerID, err

View File

@ -234,7 +234,8 @@ func (csw *CSVWriter) upsertHeaderCID(header models.HeaderModel) {
var values []interface{}
values = append(values, header.BlockNumber, header.BlockHash, header.ParentHash, header.CID,
header.TotalDifficulty, header.NodeIDs, header.Reward, header.StateRoot, header.TxRoot,
header.RctRoot, header.UnclesHash, header.Bloom, strconv.FormatUint(header.Timestamp, 10), header.Coinbase)
header.RctRoot, header.UnclesHash, header.Bloom, strconv.FormatUint(header.Timestamp, 10), header.Coinbase,
header.Canonical)
csw.rows <- tableRow{schema.TableHeader, values}
metrics.IndexerMetrics.BlocksCounter.Inc(1)
}

View File

@ -246,6 +246,7 @@ func (sdi *StateDiffIndexer) processHeader(header *types.Header, headerNode ipld
UnclesHash: header.UncleHash.String(),
Timestamp: header.Time,
Coinbase: header.Coinbase.String(),
Canonical: true,
})
return headerID
}

View File

@ -140,8 +140,8 @@ const (
ipldInsert = "INSERT INTO ipld.blocks (block_number, key, data) VALUES ('%s', '%s', '\\x%x');\n"
headerInsert = "INSERT INTO eth.header_cids (block_number, block_hash, parent_hash, cid, td, node_ids, reward, " +
"state_root, tx_root, receipt_root, uncles_hash, bloom, timestamp, coinbase) VALUES " +
"('%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '\\x%x', %d, '%s');\n"
"state_root, tx_root, receipt_root, uncles_hash, bloom, timestamp, coinbase, canonical) VALUES " +
"('%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '\\x%x', %d, '%s', %t);\n"
uncleInsert = "INSERT INTO eth.uncle_cids (block_number, block_hash, header_id, parent_hash, cid, reward, index) VALUES " +
"('%s', '%s', '%s', '%s', '%s', '%s', %d);\n"
@ -189,7 +189,7 @@ func (sqw *SQLWriter) upsertIPLDNode(blockNumber string, i ipld.IPLD) {
func (sqw *SQLWriter) upsertHeaderCID(header models.HeaderModel) {
stmt := fmt.Sprintf(headerInsert, header.BlockNumber, header.BlockHash, header.ParentHash, header.CID,
header.TotalDifficulty, formatPostgresStringArray(header.NodeIDs), header.Reward, header.StateRoot, header.TxRoot,
header.RctRoot, header.UnclesHash, header.Bloom, header.Timestamp, header.Coinbase)
header.RctRoot, header.UnclesHash, header.Bloom, header.Timestamp, header.Coinbase, header.Canonical)
sqw.stmts <- []byte(stmt)
metrics.IndexerMetrics.BlocksCounter.Inc(1)
}

View File

@ -260,6 +260,7 @@ func (sdi *StateDiffIndexer) processHeader(tx *BatchTx, header *types.Header, he
UnclesHash: header.UncleHash.String(),
Timestamp: header.Time,
Coinbase: header.Coinbase.String(),
Canonical: true,
})
}

View File

@ -49,6 +49,7 @@ type Statements interface {
MaxHeaderStm() string
ExistsHeaderStm() string
InsertHeaderStm() string
SetCanonicalHeaderStm() string
InsertUncleStm() string
InsertTxStm() string
InsertRctStm() string

View File

@ -62,6 +62,12 @@ func (db *DB) InsertHeaderStm() string {
return schema.TableHeader.ToInsertStatement(db.upsert)
}
// SetCanonicalHeaderStm satisfies the sql.Statements interface
// Stm == Statement
func (db *DB) SetCanonicalHeaderStm() string {
return fmt.Sprintf("UPDATE %s SET canonical = false WHERE block_number = $1::BIGINT AND block_hash <> $2::TEXT AND canonical = true", schema.TableHeader.Name)
}
// InsertUncleStm satisfies the sql.Statements interface
func (db *DB) InsertUncleStm() string {
return schema.TableUncle.ToInsertStatement(db.upsert)

View File

@ -92,6 +92,7 @@ func (w *Writer) maxHeader() (*models.HeaderModel, error) {
&model.Bloom,
&model.Timestamp,
&model.Coinbase,
&model.Canonical,
)
model.BlockNumber = strconv.FormatUint(number, 10)
model.TotalDifficulty = strconv.FormatUint(td, 10)
@ -120,11 +121,22 @@ func (w *Writer) upsertHeaderCID(tx Tx, header models.HeaderModel) error {
header.UnclesHash,
header.Bloom,
header.Timestamp,
header.Coinbase)
header.Coinbase,
header.Canonical,
)
if err != nil {
return insertError{"eth.header_cids", err, w.db.InsertHeaderStm(), header}
}
metrics.IndexerMetrics.BlocksCounter.Inc(1)
_, err = tx.Exec(w.db.Context(), w.db.SetCanonicalHeaderStm(),
header.BlockNumber,
header.BlockHash,
)
if err != nil {
return insertError{"eth.header_cids", err, w.db.SetCanonicalHeaderStm(), header}
}
return nil
}

View File

@ -41,6 +41,7 @@ type HeaderModel struct {
Bloom []byte `db:"bloom"`
Timestamp uint64 `db:"timestamp"`
Coinbase string `db:"coinbase"`
Canonical bool `db:"canonical"`
}
// UncleModel is the db model for eth.uncle_cids

View File

@ -54,6 +54,7 @@ var TableHeader = Table{
{Name: "bloom", Type: Dbytea},
{Name: "timestamp", Type: Dnumeric},
{Name: "coinbase", Type: Dvarchar},
{Name: "canonical", Type: Dboolean},
},
UpsertClause: OnConflict("block_number", "block_hash").Set(
"parent_hash",
@ -68,6 +69,7 @@ var TableHeader = Table{
"bloom",
"timestamp",
"coinbase",
"canonical",
)}
var TableStateNode = Table{

View File

@ -139,7 +139,7 @@ type Service struct {
// Whether to enable writing state diffs directly to track blockchain head.
enableWriteLoop bool
// Settings to use for backfilling state diffs (plugging gaps when tracking head)
backfillMaxHeadGap uint64
backfillMaxDepth uint64
backfillCheckPastBlocks uint64
// Size of the worker pool
numWorkers uint
@ -232,7 +232,7 @@ func New(stack *node.Node, ethServ *eth.Ethereum, cfg *ethconfig.Config, params
WaitForSync: params.WaitForSync,
indexer: indexer,
enableWriteLoop: params.EnableWriteLoop,
backfillMaxHeadGap: params.BackfillMaxHeadGap,
backfillMaxDepth: params.BackfillMaxDepth,
backfillCheckPastBlocks: params.BackfillCheckPastBlocks,
numWorkers: workers,
maxRetry: defaultRetryLimit,
@ -274,7 +274,7 @@ func NewService(blockChain blockChain, cfg Config, backend ethapi.Backend, index
WaitForSync: cfg.WaitForSync,
indexer: indexer,
enableWriteLoop: cfg.EnableWriteLoop,
backfillMaxHeadGap: cfg.BackfillMaxHeadGap,
backfillMaxDepth: cfg.BackfillMaxDepth,
backfillCheckPastBlocks: cfg.BackfillCheckPastBlocks,
numWorkers: workers,
maxRetry: defaultRetryLimit,
@ -365,14 +365,6 @@ func (sds *Service) WriteLoop(chainEventCh chan core.ChainEvent) {
defaultStatediffMetrics.lastEventHeight.Update(int64(nextHeight))
} else {
log.Warn("WriteLoop: received unexpected block from the future", "block height", nextHeight, "last height", lastHeight)
if distance <= sds.backfillMaxHeadGap {
for i := lastHeight + 1; i < nextHeight; i++ {
log.Info("WriteLoop: backfilling gap to head", "block", i, "block height", nextHeight, "last height", lastHeight)
blockFwd <- sds.BlockChain.GetBlockByNumber(i)
}
} else {
log.Warn("WriteLoop: gap to head too large to backfill", "block height", nextHeight, "last height", lastHeight, "gap", distance)
}
blockFwd <- chainEvent.Block
defaultStatediffMetrics.lastEventHeight.Update(int64(nextHeight))
}
@ -409,9 +401,7 @@ func (sds *Service) WriteLoop(chainEventCh chan core.ChainEvent) {
func (sds *Service) writeGenesisStateDiff(currBlock *types.Block, workerId uint) {
// For genesis block we need to return the entire state trie hence we diff it with an empty trie.
log.Info("Writing state diff", "block height", genesisBlockNumber, "worker", workerId)
writeLoopParams.RLock()
err := sds.writeStateDiffWithRetry(currBlock, common.Hash{}, writeLoopParams.Params)
writeLoopParams.RUnlock()
err := sds.writeStateDiffWithRetry(currBlock, common.Hash{}, writeLoopParams.CopyParams())
if err != nil {
log.Error("statediff.Service.WriteLoop: processing error", "block height",
genesisBlockNumber, "error", err.Error(), "worker", workerId)
@ -422,27 +412,78 @@ func (sds *Service) writeGenesisStateDiff(currBlock *types.Block, workerId uint)
func (sds *Service) writeLoopWorker(params workerParams) {
defer params.wg.Done()
// statediffs the indicated block, and while maxBackfill > 0, backfills missing parent blocks.
var writeBlockWithParents func(*types.Block, uint64, Params) error
writeBlockWithParents = func(block *types.Block, maxBackfill uint64, writeParams Params) error {
parentBlock := sds.BlockCache.getParentBlock(block, sds.BlockChain)
if parentBlock == nil {
log.Error("Parent block is nil, skipping this block", "block height", block.Number())
return nil
}
parentIsGenesis := parentBlock.Number().Uint64() == genesisBlockNumber
// chainEvent streams block from block 1, but we also need to include data from the genesis block.
if parentIsGenesis {
sds.writeGenesisStateDiff(parentBlock, params.id)
}
log.Info("Writing state diff", "block height", block.Number().Uint64(), "worker", params.id)
err := sds.writeStateDiffWithRetry(block, parentBlock.Root(), writeParams)
if err != nil {
log.Error("statediff.Service.WriteLoop: processing error",
"number", block.Number().Uint64(),
"hash", block.Hash().Hex(),
"error", err.Error(),
"worker", params.id)
return err
}
if !parentIsGenesis {
// We do this _after_ indexing the requested block. This makes sure that if a child of ours arrives for
// statediffing while we are still working on missing ancestors, its regress stops at us, and only we
// continue working backward.
parentIndexed, err := sds.indexedOrInProgress(parentBlock)
if err != nil {
log.Error("Error checking for indexing status of parent block.",
"number", block.Number(), "hash", block.Hash().String(),
"parent number", parentBlock.NumberU64(), "parent hash", parentBlock.Hash().Hex(),
"error", err.Error(),
"worker", params.id)
} else if !parentIndexed {
if maxBackfill > 0 {
log.Info("Parent block not indexed. Indexing now.",
"number", block.Number(), "hash", block.Hash().Hex(),
"parent number", parentBlock.NumberU64(), "parent hash", parentBlock.Hash().Hex(),
"worker", params.id)
err = writeBlockWithParents(parentBlock, maxBackfill-1, writeParams)
if err != nil {
log.Error("Error indexing parent block.",
"number", block.Number(), "hash", block.Hash().Hex(),
"parent number", parentBlock.NumberU64(), "parent hash", parentBlock.Hash().Hex(),
"error", err.Error(),
"worker", params.id)
}
} else {
log.Error("ERROR: Parent block not indexed but max backfill depth exceeded. Index MUST be corrected manually.",
"number", block.Number(), "hash", block.Hash().String(),
"parent number", parentBlock.NumberU64(), "parent hash", parentBlock.Hash().String(),
"worker", params.id)
}
}
}
return nil
}
for {
select {
//Notify chain event channel of events
case chainEvent := <-params.blockCh:
log.Debug("WriteLoop(): chain event received", "event", chainEvent)
currentBlock := chainEvent
parentBlock := sds.BlockCache.getParentBlock(currentBlock, sds.BlockChain)
if parentBlock == nil {
log.Error("Parent block is nil, skipping this block", "block height", currentBlock.Number())
continue
}
// chainEvent streams block from block 1, but we also need to include data from the genesis block.
if parentBlock.Number().Uint64() == genesisBlockNumber {
sds.writeGenesisStateDiff(parentBlock, params.id)
}
log.Info("Writing state diff", "block height", currentBlock.Number().Uint64(), "worker", params.id)
writeLoopParams.RLock()
err := sds.writeStateDiffWithRetry(currentBlock, parentBlock.Root(), writeLoopParams.Params)
writeLoopParams.RUnlock()
err := writeBlockWithParents(currentBlock, sds.backfillMaxDepth, writeLoopParams.CopyParams())
if err != nil {
log.Error("statediff.Service.WriteLoop: processing error",
"block height", currentBlock.Number().Uint64(),
@ -861,6 +902,14 @@ func (sds *Service) WriteStateDiffFor(blockHash common.Hash, params Params) erro
return sds.writeStateDiffWithRetry(currentBlock, parentRoot, params)
}
// indexedOrInProgress returns true if the block has already been statediffed or is in progress, else false.
func (sds *Service) indexedOrInProgress(block *types.Block) (bool, error) {
if sds.statediffInProgress(block) {
return true, nil
}
return sds.indexer.HasBlock(block.Hash(), block.NumberU64())
}
// Claim exclusive access for state diffing the specified block.
// Returns true and a function to release access if successful, else false, nil.
func (sds *Service) claimExclusiveAccess(block *types.Block) (bool, func()) {
@ -879,6 +928,15 @@ func (sds *Service) claimExclusiveAccess(block *types.Block) (bool, func()) {
}
}
// statediffInProgress returns true if statediffing is currently in progress for the block, else false.
func (sds *Service) statediffInProgress(block *types.Block) bool {
sds.currentBlocksMutex.Lock()
defer sds.currentBlocksMutex.Unlock()
key := fmt.Sprintf("%s,%d", block.Hash().Hex(), block.NumberU64())
return sds.currentBlocks[key]
}
// Writes a state diff from the current block, parent state root, and provided params
func (sds *Service) writeStateDiff(block *types.Block, parentRoot common.Hash, params Params) error {
if granted, relinquish := sds.claimExclusiveAccess(block); granted {
@ -1202,7 +1260,7 @@ func (sds *Service) backfillDetectedGaps(blockGaps []*interfaces.BlockGap) {
return
}
log.Info("Backfill: backfilling detected gap", "block", num, "worker", w)
err := sds.writeStateDiffAt(num, writeLoopParams.Params)
err := sds.writeStateDiffAt(num, writeLoopParams.CopyParams())
if err != nil {
log.Error("Backfill error: " + err.Error())
}