Statediff missing parent blocks automatically #8
23
config.go
23
config.go
@ -39,7 +39,7 @@ type Config struct {
|
|||||||
// Whether to enable writing state diffs directly to track blockchain head
|
// Whether to enable writing state diffs directly to track blockchain head
|
||||||
EnableWriteLoop bool
|
EnableWriteLoop bool
|
||||||
// The maximum number of blocks to backfill when tracking head.
|
// The maximum number of blocks to backfill when tracking head.
|
||||||
BackfillMaxHeadGap uint64
|
BackfillMaxDepth uint64
|
||||||
// The maximum number of blocks behind the startup position to check for gaps.
|
// The maximum number of blocks behind the startup position to check for gaps.
|
||||||
BackfillCheckPastBlocks uint64
|
BackfillCheckPastBlocks uint64
|
||||||
// Size of the worker pool
|
// Size of the worker pool
|
||||||
@ -60,6 +60,19 @@ type Params struct {
|
|||||||
watchedAddressesLeafPaths [][]byte
|
watchedAddressesLeafPaths [][]byte
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (p *Params) Copy() Params {
|
||||||
|
ret := Params{
|
||||||
|
IncludeBlock: p.IncludeBlock,
|
||||||
|
IncludeReceipts: p.IncludeReceipts,
|
||||||
|
IncludeTD: p.IncludeTD,
|
||||||
|
IncludeCode: p.IncludeCode,
|
||||||
|
}
|
||||||
|
ret.WatchedAddresses = make([]common.Address, len(p.WatchedAddresses))
|
||||||
|
copy(ret.WatchedAddresses, p.WatchedAddresses)
|
||||||
|
|
||||||
|
return ret
|
||||||
|
}
|
||||||
|
|
||||||
// ComputeWatchedAddressesLeafPaths populates a slice with paths (hex_encoding(Keccak256)) of each of the WatchedAddresses
|
// ComputeWatchedAddressesLeafPaths populates a slice with paths (hex_encoding(Keccak256)) of each of the WatchedAddresses
|
||||||
func (p *Params) ComputeWatchedAddressesLeafPaths() {
|
func (p *Params) ComputeWatchedAddressesLeafPaths() {
|
||||||
p.watchedAddressesLeafPaths = make([][]byte, len(p.WatchedAddresses))
|
p.watchedAddressesLeafPaths = make([][]byte, len(p.WatchedAddresses))
|
||||||
@ -74,6 +87,14 @@ type ParamsWithMutex struct {
|
|||||||
sync.RWMutex
|
sync.RWMutex
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// CopyParams returns a defensive copy of the Params
|
||||||
|
func (p *ParamsWithMutex) CopyParams() Params {
|
||||||
|
p.RLock()
|
||||||
|
defer p.RUnlock()
|
||||||
|
|
||||||
|
return p.Params.Copy()
|
||||||
|
}
|
||||||
|
|
||||||
// Args bundles the arguments for the state diff builder
|
// Args bundles the arguments for the state diff builder
|
||||||
type Args struct {
|
type Args struct {
|
||||||
OldStateRoot, NewStateRoot, BlockHash common.Hash
|
OldStateRoot, NewStateRoot, BlockHash common.Hash
|
||||||
|
@ -187,6 +187,7 @@ func (sdi *StateDiffIndexer) processHeader(tx *BatchTx, header *types.Header, he
|
|||||||
UnclesHash: header.UncleHash.String(),
|
UnclesHash: header.UncleHash.String(),
|
||||||
Timestamp: header.Time,
|
Timestamp: header.Time,
|
||||||
Coinbase: header.Coinbase.String(),
|
Coinbase: header.Coinbase.String(),
|
||||||
|
Canonical: true,
|
||||||
}
|
}
|
||||||
_, err := fmt.Fprintf(sdi.dump, "%+v\r\n", mod)
|
_, err := fmt.Fprintf(sdi.dump, "%+v\r\n", mod)
|
||||||
return headerID, err
|
return headerID, err
|
||||||
|
@ -231,7 +231,8 @@ func (csw *CSVWriter) upsertHeaderCID(header models.HeaderModel) {
|
|||||||
var values []interface{}
|
var values []interface{}
|
||||||
values = append(values, header.BlockNumber, header.BlockHash, header.ParentHash, header.CID,
|
values = append(values, header.BlockNumber, header.BlockHash, header.ParentHash, header.CID,
|
||||||
header.TotalDifficulty, header.NodeIDs, header.Reward, header.StateRoot, header.TxRoot,
|
header.TotalDifficulty, header.NodeIDs, header.Reward, header.StateRoot, header.TxRoot,
|
||||||
header.RctRoot, header.UnclesHash, header.Bloom, strconv.FormatUint(header.Timestamp, 10), header.Coinbase)
|
header.RctRoot, header.UnclesHash, header.Bloom, strconv.FormatUint(header.Timestamp, 10), header.Coinbase,
|
||||||
|
header.Canonical)
|
||||||
csw.rows <- tableRow{schema.TableHeader, values}
|
csw.rows <- tableRow{schema.TableHeader, values}
|
||||||
metrics.IndexerMetrics.BlocksCounter.Inc(1)
|
metrics.IndexerMetrics.BlocksCounter.Inc(1)
|
||||||
}
|
}
|
||||||
|
@ -244,6 +244,7 @@ func (sdi *StateDiffIndexer) processHeader(header *types.Header, headerNode ipld
|
|||||||
UnclesHash: header.UncleHash.String(),
|
UnclesHash: header.UncleHash.String(),
|
||||||
Timestamp: header.Time,
|
Timestamp: header.Time,
|
||||||
Coinbase: header.Coinbase.String(),
|
Coinbase: header.Coinbase.String(),
|
||||||
|
Canonical: true,
|
||||||
})
|
})
|
||||||
return headerID
|
return headerID
|
||||||
}
|
}
|
||||||
|
@ -140,8 +140,8 @@ const (
|
|||||||
ipldInsert = "INSERT INTO ipld.blocks (block_number, key, data) VALUES ('%s', '%s', '\\x%x');\n"
|
ipldInsert = "INSERT INTO ipld.blocks (block_number, key, data) VALUES ('%s', '%s', '\\x%x');\n"
|
||||||
|
|
||||||
headerInsert = "INSERT INTO eth.header_cids (block_number, block_hash, parent_hash, cid, td, node_ids, reward, " +
|
headerInsert = "INSERT INTO eth.header_cids (block_number, block_hash, parent_hash, cid, td, node_ids, reward, " +
|
||||||
"state_root, tx_root, receipt_root, uncles_hash, bloom, timestamp, coinbase) VALUES " +
|
"state_root, tx_root, receipt_root, uncles_hash, bloom, timestamp, coinbase, canonical) VALUES " +
|
||||||
"('%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '\\x%x', %d, '%s');\n"
|
"('%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '\\x%x', %d, '%s', %t);\n"
|
||||||
|
|
||||||
uncleInsert = "INSERT INTO eth.uncle_cids (block_number, block_hash, header_id, parent_hash, cid, reward, index) VALUES " +
|
uncleInsert = "INSERT INTO eth.uncle_cids (block_number, block_hash, header_id, parent_hash, cid, reward, index) VALUES " +
|
||||||
"('%s', '%s', '%s', '%s', '%s', '%s', %d);\n"
|
"('%s', '%s', '%s', '%s', '%s', '%s', %d);\n"
|
||||||
@ -189,7 +189,7 @@ func (sqw *SQLWriter) upsertIPLDNode(blockNumber string, i ipld.IPLD) {
|
|||||||
func (sqw *SQLWriter) upsertHeaderCID(header models.HeaderModel) {
|
func (sqw *SQLWriter) upsertHeaderCID(header models.HeaderModel) {
|
||||||
stmt := fmt.Sprintf(headerInsert, header.BlockNumber, header.BlockHash, header.ParentHash, header.CID,
|
stmt := fmt.Sprintf(headerInsert, header.BlockNumber, header.BlockHash, header.ParentHash, header.CID,
|
||||||
header.TotalDifficulty, formatPostgresStringArray(header.NodeIDs), header.Reward, header.StateRoot, header.TxRoot,
|
header.TotalDifficulty, formatPostgresStringArray(header.NodeIDs), header.Reward, header.StateRoot, header.TxRoot,
|
||||||
header.RctRoot, header.UnclesHash, header.Bloom, header.Timestamp, header.Coinbase)
|
header.RctRoot, header.UnclesHash, header.Bloom, header.Timestamp, header.Coinbase, header.Canonical)
|
||||||
sqw.stmts <- []byte(stmt)
|
sqw.stmts <- []byte(stmt)
|
||||||
metrics.IndexerMetrics.BlocksCounter.Inc(1)
|
metrics.IndexerMetrics.BlocksCounter.Inc(1)
|
||||||
}
|
}
|
||||||
|
@ -257,6 +257,7 @@ func (sdi *StateDiffIndexer) processHeader(tx *BatchTx, header *types.Header, he
|
|||||||
UnclesHash: header.UncleHash.String(),
|
UnclesHash: header.UncleHash.String(),
|
||||||
Timestamp: header.Time,
|
Timestamp: header.Time,
|
||||||
Coinbase: header.Coinbase.String(),
|
Coinbase: header.Coinbase.String(),
|
||||||
|
Canonical: true,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -49,6 +49,7 @@ type Statements interface {
|
|||||||
MaxHeaderStm() string
|
MaxHeaderStm() string
|
||||||
ExistsHeaderStm() string
|
ExistsHeaderStm() string
|
||||||
InsertHeaderStm() string
|
InsertHeaderStm() string
|
||||||
|
SetCanonicalHeaderStm() string
|
||||||
InsertUncleStm() string
|
InsertUncleStm() string
|
||||||
InsertTxStm() string
|
InsertTxStm() string
|
||||||
InsertRctStm() string
|
InsertRctStm() string
|
||||||
|
@ -62,6 +62,12 @@ func (db *DB) InsertHeaderStm() string {
|
|||||||
return schema.TableHeader.ToInsertStatement(db.upsert)
|
return schema.TableHeader.ToInsertStatement(db.upsert)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SetCanonicalHeaderStm satisfies the sql.Statements interface
|
||||||
|
// Stm == Statement
|
||||||
|
func (db *DB) SetCanonicalHeaderStm() string {
|
||||||
|
return fmt.Sprintf("UPDATE %s SET canonical = false WHERE block_number = $1::BIGINT AND block_hash <> $2::TEXT AND canonical = true", schema.TableHeader.Name)
|
||||||
|
}
|
||||||
|
|
||||||
// InsertUncleStm satisfies the sql.Statements interface
|
// InsertUncleStm satisfies the sql.Statements interface
|
||||||
func (db *DB) InsertUncleStm() string {
|
func (db *DB) InsertUncleStm() string {
|
||||||
return schema.TableUncle.ToInsertStatement(db.upsert)
|
return schema.TableUncle.ToInsertStatement(db.upsert)
|
||||||
|
@ -90,6 +90,7 @@ func (w *Writer) maxHeader() (*models.HeaderModel, error) {
|
|||||||
&model.Bloom,
|
&model.Bloom,
|
||||||
&model.Timestamp,
|
&model.Timestamp,
|
||||||
&model.Coinbase,
|
&model.Coinbase,
|
||||||
|
&model.Canonical,
|
||||||
)
|
)
|
||||||
model.BlockNumber = strconv.FormatUint(number, 10)
|
model.BlockNumber = strconv.FormatUint(number, 10)
|
||||||
model.TotalDifficulty = strconv.FormatUint(td, 10)
|
model.TotalDifficulty = strconv.FormatUint(td, 10)
|
||||||
@ -118,11 +119,22 @@ func (w *Writer) upsertHeaderCID(tx Tx, header models.HeaderModel) error {
|
|||||||
header.UnclesHash,
|
header.UnclesHash,
|
||||||
header.Bloom,
|
header.Bloom,
|
||||||
header.Timestamp,
|
header.Timestamp,
|
||||||
header.Coinbase)
|
header.Coinbase,
|
||||||
|
header.Canonical,
|
||||||
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return insertError{"eth.header_cids", err, w.db.InsertHeaderStm(), header}
|
return insertError{"eth.header_cids", err, w.db.InsertHeaderStm(), header}
|
||||||
}
|
}
|
||||||
metrics.IndexerMetrics.BlocksCounter.Inc(1)
|
metrics.IndexerMetrics.BlocksCounter.Inc(1)
|
||||||
|
|
||||||
|
_, err = tx.Exec(w.db.Context(), w.db.SetCanonicalHeaderStm(),
|
||||||
|
header.BlockNumber,
|
||||||
|
header.BlockHash,
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return insertError{"eth.header_cids", err, w.db.SetCanonicalHeaderStm(), header}
|
||||||
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -41,6 +41,7 @@ type HeaderModel struct {
|
|||||||
Bloom []byte `db:"bloom"`
|
Bloom []byte `db:"bloom"`
|
||||||
Timestamp uint64 `db:"timestamp"`
|
Timestamp uint64 `db:"timestamp"`
|
||||||
Coinbase string `db:"coinbase"`
|
Coinbase string `db:"coinbase"`
|
||||||
|
Canonical bool `db:"canonical"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// UncleModel is the db model for eth.uncle_cids
|
// UncleModel is the db model for eth.uncle_cids
|
||||||
|
@ -54,6 +54,7 @@ var TableHeader = Table{
|
|||||||
{Name: "bloom", Type: Dbytea},
|
{Name: "bloom", Type: Dbytea},
|
||||||
{Name: "timestamp", Type: Dnumeric},
|
{Name: "timestamp", Type: Dnumeric},
|
||||||
{Name: "coinbase", Type: Dvarchar},
|
{Name: "coinbase", Type: Dvarchar},
|
||||||
|
{Name: "canonical", Type: Dboolean},
|
||||||
},
|
},
|
||||||
UpsertClause: OnConflict("block_number", "block_hash").Set(
|
UpsertClause: OnConflict("block_number", "block_hash").Set(
|
||||||
"parent_hash",
|
"parent_hash",
|
||||||
@ -68,6 +69,7 @@ var TableHeader = Table{
|
|||||||
"bloom",
|
"bloom",
|
||||||
"timestamp",
|
"timestamp",
|
||||||
"coinbase",
|
"coinbase",
|
||||||
|
"canonical",
|
||||||
)}
|
)}
|
||||||
|
|
||||||
var TableStateNode = Table{
|
var TableStateNode = Table{
|
||||||
|
@ -56,9 +56,9 @@ func init() {
|
|||||||
"statediff.backfillcheckpastblocks", 7200,
|
"statediff.backfillcheckpastblocks", 7200,
|
||||||
"Number of blocks behind the startup statediff position to check (and fill) for gaps when head tracking",
|
"Number of blocks behind the startup statediff position to check (and fill) for gaps when head tracking",
|
||||||
)
|
)
|
||||||
Flags.Uint64Var(&config.BackfillMaxHeadGap,
|
Flags.Uint64Var(&config.BackfillMaxDepth,
|
||||||
"statediff.backfillmaxheadgap", 7200,
|
"statediff.backfillmaxdepth", 7200,
|
||||||
"Maximum gap between the startup statediff and startup head positions that can be backfilled",
|
"When statediffing head, the maximum number of missing parents that can be backfilled",
|
||||||
)
|
)
|
||||||
|
|
||||||
Flags.Var(&dbType,
|
Flags.Var(&dbType,
|
||||||
|
262
service.go
262
service.go
@ -83,7 +83,7 @@ type Service struct {
|
|||||||
// Parameters to use in the service write loop, if enabled
|
// Parameters to use in the service write loop, if enabled
|
||||||
writeLoopParams ParamsWithMutex
|
writeLoopParams ParamsWithMutex
|
||||||
// Settings to use for backfilling state diffs (plugging gaps when tracking head)
|
// Settings to use for backfilling state diffs (plugging gaps when tracking head)
|
||||||
backfillMaxHeadGap uint64
|
backfillMaxDepth uint64
|
||||||
backfillCheckPastBlocks uint64
|
backfillCheckPastBlocks uint64
|
||||||
// Size of the worker pool
|
// Size of the worker pool
|
||||||
numWorkers uint
|
numWorkers uint
|
||||||
@ -133,6 +133,12 @@ type jobStatusSubscription struct {
|
|||||||
quitChan chan<- bool
|
quitChan chan<- bool
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Utility type for showing the relative positions of the blockchain and the statediff indexer.
|
||||||
|
type servicePosition struct {
|
||||||
|
chainBlockNumber uint64
|
||||||
|
indexerBlockNumber uint64
|
||||||
|
}
|
||||||
|
|
||||||
// BlockCache caches the last block for safe access from different service loops
|
// BlockCache caches the last block for safe access from different service loops
|
||||||
type BlockCache struct {
|
type BlockCache struct {
|
||||||
sync.Mutex
|
sync.Mutex
|
||||||
@ -141,7 +147,7 @@ type BlockCache struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type workerParams struct {
|
type workerParams struct {
|
||||||
chainEventCh <-chan core.ChainEvent
|
blockCh <-chan *types.Block
|
||||||
wg *sync.WaitGroup
|
wg *sync.WaitGroup
|
||||||
id uint
|
id uint
|
||||||
}
|
}
|
||||||
@ -172,7 +178,7 @@ func NewService(cfg Config, blockChain BlockChain, backend plugeth.Backend, inde
|
|||||||
ShouldWaitForSync: cfg.WaitForSync,
|
ShouldWaitForSync: cfg.WaitForSync,
|
||||||
indexer: indexer,
|
indexer: indexer,
|
||||||
enableWriteLoop: cfg.EnableWriteLoop,
|
enableWriteLoop: cfg.EnableWriteLoop,
|
||||||
backfillMaxHeadGap: cfg.BackfillMaxHeadGap,
|
backfillMaxDepth: cfg.BackfillMaxDepth,
|
||||||
backfillCheckPastBlocks: cfg.BackfillCheckPastBlocks,
|
backfillCheckPastBlocks: cfg.BackfillCheckPastBlocks,
|
||||||
numWorkers: workers,
|
numWorkers: workers,
|
||||||
maxRetry: defaultRetryLimit,
|
maxRetry: defaultRetryLimit,
|
||||||
@ -213,16 +219,21 @@ func (lbc *BlockCache) getParentBlock(currentBlock *types.Block, bc BlockChain)
|
|||||||
|
|
||||||
// WriteLoop event loop for progressively processing and writing diffs directly to DB
|
// WriteLoop event loop for progressively processing and writing diffs directly to DB
|
||||||
func (sds *Service) WriteLoop(chainEventCh chan core.ChainEvent) {
|
func (sds *Service) WriteLoop(chainEventCh chan core.ChainEvent) {
|
||||||
log.Info("Starting statediff write loop")
|
initialPos := sds.currentPosition()
|
||||||
|
log.Info(
|
||||||
|
"WriteLoop: initial positions",
|
||||||
|
"chain", initialPos.chainBlockNumber,
|
||||||
|
"indexer", initialPos.indexerBlockNumber,
|
||||||
|
)
|
||||||
log := log.New("context", "statediff writing")
|
log := log.New("context", "statediff writing")
|
||||||
sub := sds.BlockChain.SubscribeChainEvent(chainEventCh)
|
sub := sds.BlockChain.SubscribeChainEvent(chainEventCh)
|
||||||
defer sub.Unsubscribe()
|
defer sub.Unsubscribe()
|
||||||
|
|
||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
chainEventFwd := make(chan core.ChainEvent, chainEventChanSize)
|
blockFwd := make(chan *types.Block, chainEventChanSize)
|
||||||
defer func() {
|
defer func() {
|
||||||
log.Info("Quitting")
|
log.Info("Quitting")
|
||||||
close(chainEventFwd)
|
close(blockFwd)
|
||||||
}()
|
}()
|
||||||
|
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
@ -232,16 +243,30 @@ func (sds *Service) WriteLoop(chainEventCh chan core.ChainEvent) {
|
|||||||
select {
|
select {
|
||||||
case event := <-chainEventCh:
|
case event := <-chainEventCh:
|
||||||
// First process metrics for chain events, then forward to workers
|
// First process metrics for chain events, then forward to workers
|
||||||
lastHeight := defaultStatediffMetrics.lastEventHeight.Value()
|
lastHeight := uint64(defaultStatediffMetrics.lastEventHeight.Value())
|
||||||
|
if lastHeight == 0 {
|
||||||
|
lastHeight = initialPos.indexerBlockNumber
|
||||||
|
}
|
||||||
block := event.Block
|
block := event.Block
|
||||||
log.Debug("Chain event received", "number", block.Number(), "hash", event.Hash)
|
log.Debug("Chain event received", "number", block.Number(), "hash", event.Hash)
|
||||||
nextHeight := int64(block.Number().Uint64())
|
nextHeight := block.Number().Uint64()
|
||||||
if nextHeight-lastHeight != 1 {
|
if nextHeight > lastHeight {
|
||||||
log.Warn("Received block out-of-order", "next", nextHeight, "last", lastHeight)
|
distance := nextHeight - lastHeight
|
||||||
|
if distance == 1 {
|
||||||
|
log.Info("WriteLoop: received expected block",
|
||||||
|
"block number", nextHeight, "last number", lastHeight)
|
||||||
|
} else {
|
||||||
|
log.Warn("WriteLoop: received unexpected block from the future",
|
||||||
|
"block number", nextHeight, "last number", lastHeight)
|
||||||
|
}
|
||||||
|
blockFwd <- block
|
||||||
|
defaultStatediffMetrics.lastEventHeight.Update(int64(nextHeight))
|
||||||
|
} else {
|
||||||
|
log.Warn("WriteLoop: received unexpected block from the past",
|
||||||
|
"block number", nextHeight, "last number", lastHeight)
|
||||||
|
blockFwd <- block
|
||||||
}
|
}
|
||||||
defaultStatediffMetrics.lastEventHeight.Update(nextHeight)
|
|
||||||
defaultStatediffMetrics.writeLoopChannelLen.Update(int64(len(chainEventCh)))
|
defaultStatediffMetrics.writeLoopChannelLen.Update(int64(len(chainEventCh)))
|
||||||
chainEventFwd <- event
|
|
||||||
case err := <-sub.Err():
|
case err := <-sub.Err():
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error("Error from subscription", "error", err)
|
log.Error("Error from subscription", "error", err)
|
||||||
@ -255,7 +280,7 @@ func (sds *Service) WriteLoop(chainEventCh chan core.ChainEvent) {
|
|||||||
}()
|
}()
|
||||||
wg.Add(int(sds.numWorkers))
|
wg.Add(int(sds.numWorkers))
|
||||||
for worker := uint(0); worker < sds.numWorkers; worker++ {
|
for worker := uint(0); worker < sds.numWorkers; worker++ {
|
||||||
params := workerParams{chainEventCh: chainEventFwd, wg: &wg, id: worker}
|
params := workerParams{blockCh: blockFwd, wg: &wg, id: worker}
|
||||||
go sds.writeLoopWorker(params)
|
go sds.writeLoopWorker(params)
|
||||||
}
|
}
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
@ -264,10 +289,8 @@ func (sds *Service) WriteLoop(chainEventCh chan core.ChainEvent) {
|
|||||||
func (sds *Service) writeGenesisStateDiff(currBlock *types.Block, logger log.Logger) {
|
func (sds *Service) writeGenesisStateDiff(currBlock *types.Block, logger log.Logger) {
|
||||||
// For genesis block we need to return the entire state trie hence we diff it with an empty trie.
|
// For genesis block we need to return the entire state trie hence we diff it with an empty trie.
|
||||||
log.Info("Writing genesis state diff", "number", genesisBlockNumber)
|
log.Info("Writing genesis state diff", "number", genesisBlockNumber)
|
||||||
sds.writeLoopParams.RLock()
|
|
||||||
defer sds.writeLoopParams.RUnlock()
|
|
||||||
|
|
||||||
err := sds.writeStateDiffWithRetry(currBlock, common.Hash{}, sds.writeLoopParams.Params)
|
err := sds.writeStateDiffWithRetry(currBlock, common.Hash{}, sds.writeLoopParams.CopyParams())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error("failed to write state diff", "number",
|
log.Error("failed to write state diff", "number",
|
||||||
genesisBlockNumber, "error", err)
|
genesisBlockNumber, "error", err)
|
||||||
@ -279,27 +302,71 @@ func (sds *Service) writeGenesisStateDiff(currBlock *types.Block, logger log.Log
|
|||||||
func (sds *Service) writeLoopWorker(params workerParams) {
|
func (sds *Service) writeLoopWorker(params workerParams) {
|
||||||
log := log.New("context", "statediff writing", "worker", params.id)
|
log := log.New("context", "statediff writing", "worker", params.id)
|
||||||
defer params.wg.Done()
|
defer params.wg.Done()
|
||||||
for {
|
|
||||||
select {
|
// statediffs the indicated block, and while maxBackfill > 0, backfills missing parent blocks.
|
||||||
case event := <-params.chainEventCh:
|
var writeBlockWithParents func(*types.Block, uint64, Params) error
|
||||||
block := event.Block
|
writeBlockWithParents = func(block *types.Block, maxBackfill uint64, writeParams Params) error {
|
||||||
parent := sds.BlockCache.getParentBlock(block, sds.BlockChain)
|
parentBlock := sds.BlockCache.getParentBlock(block, sds.BlockChain)
|
||||||
if parent == nil {
|
if parentBlock == nil {
|
||||||
log.Error("Parent block is nil, skipping this block", "number", block.Number())
|
return errors.New("Parent block is nil, skipping this block")
|
||||||
continue
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
parentIsGenesis := parentBlock.Number().Uint64() == genesisBlockNumber
|
||||||
|
|
||||||
// chainEvent streams block from block 1, but we also need to include data from the genesis block.
|
// chainEvent streams block from block 1, but we also need to include data from the genesis block.
|
||||||
if parent.Number().Uint64() == genesisBlockNumber {
|
if parentIsGenesis {
|
||||||
sds.writeGenesisStateDiff(parent, log)
|
sds.writeGenesisStateDiff(parentBlock, log)
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Info("Writing state diff", "number", block.Number())
|
log.Info("Writing state diff", "number", block.Number())
|
||||||
sds.writeLoopParams.RLock()
|
err := sds.writeStateDiffWithRetry(block, parentBlock.Root(), writeParams)
|
||||||
err := sds.writeStateDiffWithRetry(block, parent.Root(), sds.writeLoopParams.Params)
|
|
||||||
sds.writeLoopParams.RUnlock()
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error("failed to write state diff",
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if parentIsGenesis {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
// We do this _after_ indexing the requested block. This makes sure that if a child of ours arrives for
|
||||||
|
// statediffing while we are still working on missing ancestors, its regress stops at us, and only we
|
||||||
|
// continue working backward.
|
||||||
|
parentIndexed, err := sds.indexedOrInProgress(parentBlock)
|
||||||
|
if err != nil {
|
||||||
|
log.Error("Error checking for indexing status of parent block.",
|
||||||
|
"number", block.Number(), "hash", block.Hash(),
|
||||||
|
"parent number", parentBlock.Number(), "parent hash", parentBlock.Hash(),
|
||||||
|
"error", err)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if parentIndexed {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
if maxBackfill > 0 {
|
||||||
|
log.Info("Parent block not indexed. Indexing now.",
|
||||||
|
"number", block.Number(), "hash", block.Hash(),
|
||||||
|
"parent number", parentBlock.Number(), "parent hash", parentBlock.Hash())
|
||||||
|
err = writeBlockWithParents(parentBlock, maxBackfill-1, writeParams)
|
||||||
|
if err != nil {
|
||||||
|
log.Error("Error indexing parent block.",
|
||||||
|
"number", block.Number(), "hash", block.Hash(),
|
||||||
|
"parent number", parentBlock.Number(), "parent hash", parentBlock.Hash(),
|
||||||
|
"error", err)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
log.Error("Parent block not indexed but max backfill depth exceeded. Index MUST be corrected manually.",
|
||||||
|
"number", block.Number(), "hash", block.Hash(),
|
||||||
|
"parent number", parentBlock.Number(), "parent hash", parentBlock.Hash())
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case block := <-params.blockCh:
|
||||||
|
log.Debug("Block received", "number", block.Number())
|
||||||
|
err := writeBlockWithParents(block, sds.backfillMaxDepth, sds.writeLoopParams.CopyParams())
|
||||||
|
if err != nil {
|
||||||
|
log.Error("Error processing statediff",
|
||||||
"number", block.Number(),
|
"number", block.Number(),
|
||||||
"hash", block.Hash(),
|
"hash", block.Hash(),
|
||||||
"error", err)
|
"error", err)
|
||||||
@ -681,6 +748,23 @@ func (sds *Service) WriteStateDiffFor(blockHash common.Hash, params Params) erro
|
|||||||
return sds.writeStateDiffWithRetry(currentBlock, parentRoot, sds.maybeReplaceWatchedAddresses(params))
|
return sds.writeStateDiffWithRetry(currentBlock, parentRoot, sds.maybeReplaceWatchedAddresses(params))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// indexedOrInProgress returns true if the block has already been statediffed or is in progress, else false.
|
||||||
|
func (sds *Service) indexedOrInProgress(block *types.Block) (bool, error) {
|
||||||
|
if sds.statediffInProgress(block) {
|
||||||
|
return true, nil
|
||||||
|
}
|
||||||
|
return sds.indexer.HasBlock(block.Hash(), block.NumberU64())
|
||||||
|
}
|
||||||
|
|
||||||
|
// statediffInProgress returns true if statediffing is currently in progress for the block, else false.
|
||||||
|
func (sds *Service) statediffInProgress(block *types.Block) bool {
|
||||||
|
sds.currentBlocksMutex.Lock()
|
||||||
|
defer sds.currentBlocksMutex.Unlock()
|
||||||
|
|
||||||
|
key := fmt.Sprintf("%s,%d", block.Hash().Hex(), block.NumberU64())
|
||||||
|
return sds.currentBlocks[key]
|
||||||
|
}
|
||||||
|
|
||||||
// Claim exclusive access for state diffing the specified block.
|
// Claim exclusive access for state diffing the specified block.
|
||||||
// Returns true and a function to release access if successful, else false, nil.
|
// Returns true and a function to release access if successful, else false, nil.
|
||||||
func (sds *Service) claimExclusiveAccess(block *types.Block) (bool, func()) {
|
func (sds *Service) claimExclusiveAccess(block *types.Block) (bool, func()) {
|
||||||
@ -928,110 +1012,43 @@ func MapWatchAddressArgsToAddresses(args []types2.WatchAddressArg) ([]common.Add
|
|||||||
|
|
||||||
// Backfill is executed on startup to make sure there are no gaps in the recent past when tracking head.
|
// Backfill is executed on startup to make sure there are no gaps in the recent past when tracking head.
|
||||||
func (sds *Service) Backfill() {
|
func (sds *Service) Backfill() {
|
||||||
chainBlock := sds.BlockChain.CurrentBlock()
|
pos := sds.currentPosition()
|
||||||
if nil == chainBlock {
|
if pos.chainBlockNumber == 0 {
|
||||||
log.Info("Backfill: No previous chain block, nothing to backfill.")
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
chainBlockNumber := chainBlock.Number.Uint64()
|
|
||||||
if chainBlockNumber == 0 {
|
|
||||||
log.Info("Backfill: At start of chain, nothing to backfill.")
|
log.Info("Backfill: At start of chain, nothing to backfill.")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
indexerBlock, err := sds.indexer.CurrentBlock()
|
|
||||||
if nil == indexerBlock {
|
|
||||||
log.Info("Backfill: No previous indexer block, nothing to backfill.")
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if nil != err {
|
|
||||||
log.Error("Backfill error", err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
indexerBlockNumber, err := strconv.ParseUint(indexerBlock.BlockNumber, 10, 64)
|
|
||||||
if nil != err {
|
|
||||||
log.Error("Backfill error", err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
headGap := chainBlockNumber - indexerBlockNumber
|
|
||||||
log.Info(
|
log.Info(
|
||||||
"Backfill: initial positions",
|
"Backfill: initial positions",
|
||||||
"chain", chainBlockNumber,
|
"chain", pos.chainBlockNumber,
|
||||||
"indexer", indexerBlockNumber,
|
"indexer", pos.indexerBlockNumber,
|
||||||
"headGap", headGap,
|
|
||||||
)
|
)
|
||||||
|
|
||||||
if sds.backfillMaxHeadGap > 0 && headGap > 0 {
|
|
||||||
if headGap < sds.backfillMaxHeadGap {
|
|
||||||
sds.backfillHeadGap(indexerBlockNumber, chainBlockNumber)
|
|
||||||
log.Info("Backfill: all workers done filling headGap.")
|
|
||||||
} else {
|
|
||||||
log.Error("Backfill: headGap too large to fill.")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if sds.backfillCheckPastBlocks > 0 {
|
if sds.backfillCheckPastBlocks > 0 {
|
||||||
var gapCheckBeginNumber uint64 = 0
|
var gapCheckBeginNumber uint64 = 0
|
||||||
if indexerBlockNumber > sds.backfillCheckPastBlocks {
|
if pos.indexerBlockNumber > sds.backfillCheckPastBlocks {
|
||||||
gapCheckBeginNumber = indexerBlockNumber - sds.backfillCheckPastBlocks
|
gapCheckBeginNumber = pos.indexerBlockNumber - sds.backfillCheckPastBlocks
|
||||||
}
|
}
|
||||||
blockGaps, err := sds.indexer.DetectGaps(gapCheckBeginNumber, chainBlockNumber)
|
blockGaps, err := sds.indexer.DetectGaps(gapCheckBeginNumber, pos.chainBlockNumber)
|
||||||
if nil != err {
|
if nil != err {
|
||||||
log.Error("Backfill error", err)
|
log.Error("Backfill error", "error", err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
if nil != blockGaps && len(blockGaps) > 0 {
|
if nil != blockGaps && len(blockGaps) > 0 {
|
||||||
gapsMsg, _ := json.Marshal(blockGaps)
|
gapsMsg, _ := json.Marshal(blockGaps)
|
||||||
log.Info("Backfill: detected gaps in range", "begin", gapCheckBeginNumber, "end", chainBlockNumber, "gaps", string(gapsMsg))
|
log.Info("Backfill: detected gaps in range",
|
||||||
|
"begin", gapCheckBeginNumber, "end", pos.chainBlockNumber, "gaps", string(gapsMsg))
|
||||||
sds.backfillDetectedGaps(blockGaps)
|
sds.backfillDetectedGaps(blockGaps)
|
||||||
log.Info("Backfill: done processing detected gaps in range", "begin", gapCheckBeginNumber, "end", chainBlockNumber, "gaps", string(gapsMsg))
|
log.Info("Backfill: done processing detected gaps in range",
|
||||||
|
"begin", gapCheckBeginNumber, "end", pos.chainBlockNumber, "gaps", string(gapsMsg))
|
||||||
} else {
|
} else {
|
||||||
log.Info("Backfill: no gaps detected in range", "begin", gapCheckBeginNumber, "end", chainBlockNumber)
|
log.Info("Backfill: no gaps detected in range",
|
||||||
|
"begin", gapCheckBeginNumber, "end", pos.chainBlockNumber)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// backfillHeadGap fills in any gap between the statediff position and the chain position at startup.
|
|
||||||
// A gap can be created in this way if there is some problem in statediffing (eg, DB connectivity is lost,
|
|
||||||
// while the chain keeps syncing), if the process is terminated with a statediff in-flight, etc.
|
|
||||||
func (sds *Service) backfillHeadGap(indexerBlockNumber uint64, chainBlockNumber uint64) {
|
|
||||||
var ch = make(chan uint64)
|
|
||||||
var wg sync.WaitGroup
|
|
||||||
for i := uint(0); i < sds.numWorkers; i++ {
|
|
||||||
wg.Add(1)
|
|
||||||
go func(w uint) {
|
|
||||||
defer wg.Done()
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case num, ok := <-ch:
|
|
||||||
if !ok {
|
|
||||||
log.Info("Backfill: headGap done", "worker", w)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
log.Info("Backfill: backfilling head gap", "block", num, "worker", w)
|
|
||||||
err := sds.writeStateDiffAt(num, sds.writeLoopParams.Params)
|
|
||||||
if err != nil {
|
|
||||||
log.Error("Backfill error", err)
|
|
||||||
}
|
|
||||||
case <-sds.QuitChan:
|
|
||||||
log.Info("Backfill: quitting before finish", "worker", w)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}(i)
|
|
||||||
}
|
|
||||||
|
|
||||||
for bn := indexerBlockNumber + 1; bn <= chainBlockNumber; bn++ {
|
|
||||||
ch <- bn
|
|
||||||
}
|
|
||||||
close(ch)
|
|
||||||
wg.Wait()
|
|
||||||
}
|
|
||||||
|
|
||||||
// backfillDetectedGaps fills gaps which have occurred in the recent past. These gaps can happen because of
|
// backfillDetectedGaps fills gaps which have occurred in the recent past. These gaps can happen because of
|
||||||
// transient errors, such as DB errors that are later corrected (so head statediffing continues, but with a hole)
|
// transient errors, such as DB errors that are later corrected (so head statediffing continues, but with a hole)
|
||||||
// a missed ChainEvent (happens sometimes when debugging), or if the process is terminated when an earlier block
|
// a missed ChainEvent (happens sometimes when debugging), or if the process is terminated when an earlier block
|
||||||
@ -1051,7 +1068,7 @@ func (sds *Service) backfillDetectedGaps(blockGaps []*interfaces.BlockGap) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
log.Info("Backfill: backfilling detected gap", "block", num, "worker", w)
|
log.Info("Backfill: backfilling detected gap", "block", num, "worker", w)
|
||||||
err := sds.writeStateDiffAt(num, sds.writeLoopParams.Params)
|
err := sds.writeStateDiffAt(num, sds.writeLoopParams.CopyParams())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error("Backfill error: ", err)
|
log.Error("Backfill error: ", err)
|
||||||
}
|
}
|
||||||
@ -1071,3 +1088,24 @@ func (sds *Service) backfillDetectedGaps(blockGaps []*interfaces.BlockGap) {
|
|||||||
close(ch)
|
close(ch)
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// currentPosition returns the current block height for both the BlockChain and the statediff indexer.
|
||||||
|
func (sds *Service) currentPosition() servicePosition {
|
||||||
|
ret := servicePosition{}
|
||||||
|
chainBlock := sds.BlockChain.CurrentBlock()
|
||||||
|
if nil != chainBlock {
|
||||||
|
ret.chainBlockNumber = chainBlock.Number.Uint64()
|
||||||
|
}
|
||||||
|
|
||||||
|
indexerBlock, _ := sds.indexer.CurrentBlock()
|
||||||
|
if nil != indexerBlock {
|
||||||
|
indexerBlockNumber, err := strconv.ParseUint(indexerBlock.BlockNumber, 10, 64)
|
||||||
|
if nil == err {
|
||||||
|
ret.indexerBlockNumber = indexerBlockNumber
|
||||||
|
} else {
|
||||||
|
log.Error("Error parsing indexer block number", "block", indexerBlock.BlockNumber)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return ret
|
||||||
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user