Add WriteStateSnapshot #15

Merged
roysc merged 23 commits from with-iterator-tracker into main 2023-09-28 03:35:47 +00:00
12 changed files with 184 additions and 243 deletions
Showing only changes of commit 68e2aca32e - Show all commits

View File

@ -19,26 +19,55 @@ package dump
import (
"fmt"
"io"
"math/big"
"github.com/cerc-io/plugeth-statediff/indexer/ipld"
"github.com/cerc-io/plugeth-statediff/indexer/models"
"github.com/cerc-io/plugeth-statediff/utils/log"
)
// BatchTx wraps a void with the state necessary for building the tx concurrently during trie difference iteration
type BatchTx struct {
BlockNumber string
dump io.Writer
quit chan struct{}
iplds chan models.IPLDModel
ipldCache models.IPLDBatch
submit func(blockTx *BatchTx, err error) error
blockNum string
dump io.Writer
quit chan struct{}
iplds chan models.IPLDModel
ipldCache models.IPLDBatch
}
// Submit satisfies indexer.AtomicTx
func (tx *BatchTx) Submit(err error) error {
return tx.submit(tx, err)
func NewBatch(number *big.Int, dest io.Writer) *BatchTx {
batch := &BatchTx{
blockNum: number.String(),
dump: dest,
iplds: make(chan models.IPLDModel),
quit: make(chan struct{}),
ipldCache: models.IPLDBatch{},
}
go batch.cache()
return batch
}
func (self *BatchTx) Submit() error {
close(self.quit)
close(self.iplds)
if err := self.flush(); err != nil {
return err
}
return nil
}
func (tx *BatchTx) BlockNumber() string {
return tx.blockNum
}
func (tx *BatchTx) RollbackOnFailure(err error) {
if p := recover(); p != nil {
log.Info("panic detected before tx submission, but rollback not supported", "panic", p)
panic(p)
} else if err != nil {
log.Info("error detected before tx submission, but rollback not supported", "error", err)
}
}
func (tx *BatchTx) flush() error {
@ -65,7 +94,7 @@ func (tx *BatchTx) cache() {
func (tx *BatchTx) cacheDirect(key string, value []byte) {
tx.iplds <- models.IPLDModel{
BlockNumber: tx.BlockNumber,
BlockNumber: tx.BlockNumber(),
Key: key,
Data: value,
}
@ -73,7 +102,7 @@ func (tx *BatchTx) cacheDirect(key string, value []byte) {
func (tx *BatchTx) cacheIPLD(i ipld.IPLD) {
tx.iplds <- models.IPLDModel{
BlockNumber: tx.BlockNumber,
BlockNumber: tx.BlockNumber(),
Key: i.Cid().String(),
Data: i.RawData(),
}

View File

@ -17,6 +17,7 @@
package dump
import (
"context"
"encoding/hex"
"fmt"
"io"
@ -37,7 +38,6 @@ import (
"github.com/cerc-io/plugeth-statediff/indexer/models"
"github.com/cerc-io/plugeth-statediff/indexer/shared"
sdtypes "github.com/cerc-io/plugeth-statediff/types"
"github.com/cerc-io/plugeth-statediff/utils/log"
)
var _ interfaces.StateDiffIndexer = &StateDiffIndexer{}
@ -62,7 +62,7 @@ func (sdi *StateDiffIndexer) ReportDBMetrics(time.Duration, <-chan bool) {}
// PushBlock pushes and indexes block data in sql, except state & storage nodes (includes header, uncles, transactions & receipts)
// Returns an initiated DB transaction which must be Closed via defer to commit or rollback
func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receipts, totalDifficulty *big.Int) (interfaces.Batch, error) {
start, t := time.Now(), time.Now()
t := time.Now()
blockHash := block.Hash()
blockHashStr := blockHash.String()
height := block.NumberU64()
@ -91,40 +91,8 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
} else {
reward = shared.CalcEthBlockReward(block.Header(), block.Uncles(), block.Transactions(), receipts)
}
t = time.Now()
blockTx := &BatchTx{
roysc marked this conversation as resolved Outdated

Cleaning this part of the code up is a big win.

Cleaning this part of the code up is a big win.
BlockNumber: block.Number().String(),
dump: sdi.dump,
iplds: make(chan models.IPLDModel),
quit: make(chan struct{}),
ipldCache: models.IPLDBatch{},
submit: func(self *BatchTx, err error) error {
close(self.quit)
close(self.iplds)
tDiff := time.Since(t)
metrics.IndexerMetrics.StateStoreCodeProcessingTimer.Update(tDiff)
traceMsg += fmt.Sprintf("state, storage, and code storage processing time: %s\r\n", tDiff.String())
t = time.Now()
if err := self.flush(); err != nil {
traceMsg += fmt.Sprintf(" TOTAL PROCESSING DURATION: %s\r\n", time.Since(start).String())
log.Debug(traceMsg)
return err
}
tDiff = time.Since(t)
metrics.IndexerMetrics.PostgresCommitTimer.Update(tDiff)
traceMsg += fmt.Sprintf("postgres transaction commit duration: %s\r\n", tDiff.String())
traceMsg += fmt.Sprintf(" TOTAL PROCESSING DURATION: %s\r\n", time.Since(start).String())
log.Debug(traceMsg)
return err
},
}
go blockTx.cache()
tDiff := time.Since(t)
metrics.IndexerMetrics.FreePostgresTimer.Update(tDiff)
traceMsg += fmt.Sprintf("time spent waiting for free postgres tx: %s:\r\n", tDiff.String())
blockTx := NewBatch(block.Number(), sdi.dump)
t = time.Now()
// Publish and index header, collect headerID
@ -133,7 +101,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
if err != nil {
return nil, err
}
tDiff = time.Since(t)
tDiff := time.Since(t)
metrics.IndexerMetrics.HeaderProcessingTimer.Update(tDiff)
traceMsg += fmt.Sprintf("header processing time: %s\r\n", tDiff.String())
t = time.Now()
@ -352,7 +320,7 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt
// short circuit if it is a Removed node
// this assumes the db has been initialized and a ipld.blocks entry for the Removed node is present
stateModel = models.StateNodeModel{
BlockNumber: tx.BlockNumber,
BlockNumber: tx.BlockNumber(),
HeaderID: headerID,
StateKey: common.BytesToHash(stateNode.AccountWrapper.LeafKey).String(),
CID: shared.RemovedNodeStateCID,
@ -360,7 +328,7 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt
}
} else {
stateModel = models.StateNodeModel{
BlockNumber: tx.BlockNumber,
BlockNumber: tx.BlockNumber(),
HeaderID: headerID,
StateKey: common.BytesToHash(stateNode.AccountWrapper.LeafKey).String(),
CID: stateNode.AccountWrapper.CID,
@ -383,7 +351,7 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt
// short circuit if it is a Removed node
// this assumes the db has been initialized and a ipld.blocks entry for the Removed node is present
storageModel := models.StorageNodeModel{
BlockNumber: tx.BlockNumber,
BlockNumber: tx.BlockNumber(),
HeaderID: headerID,
StateKey: common.BytesToHash(stateNode.AccountWrapper.LeafKey).String(),
StorageKey: common.BytesToHash(storageNode.LeafKey).String(),
@ -396,7 +364,7 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt
continue
}
storageModel := models.StorageNodeModel{
BlockNumber: tx.BlockNumber,
BlockNumber: tx.BlockNumber(),
HeaderID: headerID,
StateKey: common.BytesToHash(stateNode.AccountWrapper.LeafKey).String(),
StorageKey: common.BytesToHash(storageNode.LeafKey).String(),
@ -440,6 +408,10 @@ func (sdi *StateDiffIndexer) DetectGaps(beginBlockNumber uint64, endBlockNumber
return nil, nil
}
func (sdi *StateDiffIndexer) BeginTx(number *big.Int, _ context.Context) interfaces.Batch {
return NewBatch(number, sdi.dump)
}
// Close satisfies io.Closer
func (sdi *StateDiffIndexer) Close() error {
return sdi.dump.Close()

View File

@ -16,14 +16,29 @@
package file
import "github.com/cerc-io/plugeth-statediff/utils/log"
// BatchTx wraps a void with the state necessary for building the tx concurrently during trie difference iteration
type BatchTx struct {
BlockNumber string
submit func(blockTx *BatchTx, err error) error
blockNum string
fileWriter FileWriter
}
// Submit satisfies indexer.AtomicTx
func (tx *BatchTx) Submit(err error) error {
return tx.submit(tx, err)
func (tx *BatchTx) Submit() error {
tx.fileWriter.Flush()
return nil
}
func (tx *BatchTx) BlockNumber() string {
return tx.blockNum
}
func (tx *BatchTx) RollbackOnFailure(err error) {
if p := recover(); p != nil {
log.Info("panic detected before tx submission, but rollback not supported", "panic", p)
panic(p)
} else if err != nil {
log.Info("error detected before tx submission, but rollback not supported", "error", err)
}
}

View File

@ -17,6 +17,7 @@
package file
import (
"context"
"errors"
"fmt"
"math/big"
@ -130,7 +131,7 @@ func (sdi *StateDiffIndexer) ReportDBMetrics(time.Duration, <-chan bool) {}
// PushBlock pushes and indexes block data in sql, except state & storage nodes (includes header, uncles, transactions & receipts)
// Returns an initiated DB transaction which must be Closed via defer to commit or rollback
func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receipts, totalDifficulty *big.Int) (interfaces.Batch, error) {
start, t := time.Now(), time.Now()
t := time.Now()
blockHash := block.Hash()
blockHashStr := blockHash.String()
height := block.NumberU64()
@ -159,27 +160,11 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
} else {
reward = shared.CalcEthBlockReward(block.Header(), block.Uncles(), block.Transactions(), receipts)
}
t = time.Now()
blockTx := &BatchTx{
BlockNumber: block.Number().String(),
submit: func(self *BatchTx, err error) error {
tDiff := time.Since(t)
metrics.IndexerMetrics.StateStoreCodeProcessingTimer.Update(tDiff)
traceMsg += fmt.Sprintf("state, storage, and code storage processing time: %s\r\n", tDiff.String())
t = time.Now()
sdi.fileWriter.Flush()
tDiff = time.Since(t)
metrics.IndexerMetrics.PostgresCommitTimer.Update(tDiff)
traceMsg += fmt.Sprintf("postgres transaction commit duration: %s\r\n", tDiff.String())
traceMsg += fmt.Sprintf(" TOTAL PROCESSING DURATION: %s\r\n", time.Since(start).String())
log.Trace(traceMsg)
return err
},
blockNum: block.Number().String(),
fileWriter: sdi.fileWriter,
}
tDiff := time.Since(t)
metrics.IndexerMetrics.FreePostgresTimer.Update(tDiff)
traceMsg += fmt.Sprintf("time spent waiting for free postgres tx: %s:\r\n", tDiff.String())
t = time.Now()
// write header, collect headerID
@ -187,7 +172,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
if err != nil {
return nil, err
}
tDiff = time.Since(t)
tDiff := time.Since(t)
metrics.IndexerMetrics.HeaderProcessingTimer.Update(tDiff)
traceMsg += fmt.Sprintf("header processing time: %s\r\n", tDiff.String())
t = time.Now()
@ -382,20 +367,16 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(args processArgs) error {
}
// PushStateNode writes a state diff node object (including any child storage nodes) IPLD insert SQL stmt to a file
func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdtypes.StateLeafNode, headerID string) error {
tx, ok := batch.(*BatchTx)
if !ok {
return fmt.Errorf("file: batch is expected to be of type %T, got %T", &BatchTx{}, batch)
}
func (sdi *StateDiffIndexer) PushStateNode(tx interfaces.Batch, stateNode sdtypes.StateLeafNode, headerID string) error {
// publish the state node
var stateModel models.StateNodeModel
if stateNode.Removed {
if atomic.LoadUint32(&sdi.removedCacheFlag) == 0 {
atomic.StoreUint32(&sdi.removedCacheFlag, 1)
sdi.fileWriter.upsertIPLDDirect(tx.BlockNumber, shared.RemovedNodeStateCID, []byte{})
sdi.fileWriter.upsertIPLDDirect(tx.BlockNumber(), shared.RemovedNodeStateCID, []byte{})
}
stateModel = models.StateNodeModel{
BlockNumber: tx.BlockNumber,
BlockNumber: tx.BlockNumber(),
HeaderID: headerID,
StateKey: common.BytesToHash(stateNode.AccountWrapper.LeafKey).String(),
CID: shared.RemovedNodeStateCID,
@ -403,7 +384,7 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt
}
} else {
stateModel = models.StateNodeModel{
BlockNumber: tx.BlockNumber,
BlockNumber: tx.BlockNumber(),
HeaderID: headerID,
StateKey: common.BytesToHash(stateNode.AccountWrapper.LeafKey).String(),
CID: stateNode.AccountWrapper.CID,
@ -423,10 +404,10 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt
if storageNode.Removed {
if atomic.LoadUint32(&sdi.removedCacheFlag) == 0 {
atomic.StoreUint32(&sdi.removedCacheFlag, 1)
sdi.fileWriter.upsertIPLDDirect(tx.BlockNumber, shared.RemovedNodeStorageCID, []byte{})
sdi.fileWriter.upsertIPLDDirect(tx.BlockNumber(), shared.RemovedNodeStorageCID, []byte{})
}
storageModel := models.StorageNodeModel{
BlockNumber: tx.BlockNumber,
BlockNumber: tx.BlockNumber(),
HeaderID: headerID,
StateKey: common.BytesToHash(stateNode.AccountWrapper.LeafKey).String(),
StorageKey: common.BytesToHash(storageNode.LeafKey).String(),
@ -438,7 +419,7 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt
continue
}
storageModel := models.StorageNodeModel{
BlockNumber: tx.BlockNumber,
BlockNumber: tx.BlockNumber(),
HeaderID: headerID,
StateKey: common.BytesToHash(stateNode.AccountWrapper.LeafKey).String(),
StorageKey: common.BytesToHash(storageNode.LeafKey).String(),
@ -453,12 +434,8 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt
}
// PushIPLD writes iplds to ipld.blocks
func (sdi *StateDiffIndexer) PushIPLD(batch interfaces.Batch, ipld sdtypes.IPLD) error {
tx, ok := batch.(*BatchTx)
if !ok {
return fmt.Errorf("file: batch is expected to be of type %T, got %T", &BatchTx{}, batch)
}
sdi.fileWriter.upsertIPLDDirect(tx.BlockNumber, ipld.CID, ipld.Content)
func (sdi *StateDiffIndexer) PushIPLD(tx interfaces.Batch, ipld sdtypes.IPLD) error {
sdi.fileWriter.upsertIPLDDirect(tx.BlockNumber(), ipld.CID, ipld.Content)
return nil
}
@ -480,6 +457,13 @@ func (sdi *StateDiffIndexer) HasBlock(hash common.Hash, number uint64) (bool, er
return false, nil
}
func (sdi *StateDiffIndexer) BeginTx(number *big.Int, _ context.Context) interfaces.Batch {
return &BatchTx{
blockNum: number.String(),
fileWriter: sdi.fileWriter,
}
}
// Close satisfies io.Closer
func (sdi *StateDiffIndexer) Close() error {
return sdi.fileWriter.Close()

View File

@ -35,7 +35,7 @@ const startingCacheCapacity = 1024 * 24
// BatchTx wraps a sql tx with the state necessary for building the tx concurrently during trie difference iteration
type BatchTx struct {
BlockNumber string
blockNum string
ctx context.Context
dbtx Tx
stm string
@ -44,15 +44,14 @@ type BatchTx struct {
ipldCache models.IPLDBatch
removedCacheFlag *uint32
// Tracks expected cache size and ensures cache is caught up before flush
cacheWg sync.WaitGroup
startTime time.Time
cacheWg sync.WaitGroup
}
func NewBatchAt(insertStm string, ctx context.Context, number *big.Int, tx Tx, start time.Time) *BatchTx {
func NewBatch(insertStm string, ctx context.Context, number *big.Int, tx Tx) *BatchTx {
blockTx := &BatchTx{
removedCacheFlag: new(uint32),
ctx: ctx,
BlockNumber: number.String(),
blockNum: number.String(),
stm: insertStm,
iplds: make(chan models.IPLDModel),
quit: make(chan (chan<- struct{})),
@ -61,35 +60,39 @@ func NewBatchAt(insertStm string, ctx context.Context, number *big.Int, tx Tx, s
Keys: make([]string, 0, startingCacheCapacity),
Values: make([][]byte, 0, startingCacheCapacity),
},
dbtx: tx,
startTime: time.Now(),
dbtx: tx,
}
go blockTx.cache()
return blockTx
}
// Submit satisfies indexer.Batch
func (tx *BatchTx) Submit(err error) error {
func (tx *BatchTx) Submit() error {
defer tx.close()
metrics.IndexerMetrics.StateStoreCodeProcessingTimer.Update(time.Since(tx.startTime))
t := time.Now()
if err := tx.flush(); err != nil {
rollback(tx.ctx, tx.dbtx)
return err
}
err = tx.dbtx.Commit(tx.ctx)
err := tx.dbtx.Commit(tx.ctx)
metrics.IndexerMetrics.PostgresCommitTimer.Update(time.Since(t))
return err
}
func (tx *BatchTx) BlockNumber() string {
return tx.blockNum
}
func (tx *BatchTx) RollbackOnFailure(err error) {
defer tx.close()
if p := recover(); p != nil {
defer tx.close()
log.Info("panic detected before tx submission, rolling back the tx", "panic", p)
rollback(tx.ctx, tx.dbtx)
panic(p)
} else if err != nil {
defer tx.close()
log.Info("error detected before tx submission, rolling back the tx", "error", err)
rollback(tx.ctx, tx.dbtx)
}
@ -148,7 +151,7 @@ func (tx *BatchTx) cache() {
func (tx *BatchTx) cacheDirect(key string, value []byte) {
tx.cacheWg.Add(1)
tx.iplds <- models.IPLDModel{
BlockNumber: tx.BlockNumber,
BlockNumber: tx.BlockNumber(),
Key: key,
Data: value,
}
@ -157,7 +160,7 @@ func (tx *BatchTx) cacheDirect(key string, value []byte) {
func (tx *BatchTx) cacheIPLD(i ipld.IPLD) {
tx.cacheWg.Add(1)
tx.iplds <- models.IPLDModel{
BlockNumber: tx.BlockNumber,
BlockNumber: tx.BlockNumber(),
Key: i.Cid().String(),
Data: i.RawData(),
}
@ -168,7 +171,7 @@ func (tx *BatchTx) cacheRemoved(key string, value []byte) {
atomic.StoreUint32(tx.removedCacheFlag, 1)
tx.cacheWg.Add(1)
tx.iplds <- models.IPLDModel{
BlockNumber: tx.BlockNumber,
BlockNumber: tx.BlockNumber(),
Key: key,
Data: value,
}

View File

@ -39,7 +39,6 @@ import (
"github.com/cerc-io/plugeth-statediff/indexer/models"
"github.com/cerc-io/plugeth-statediff/indexer/shared"
sdtypes "github.com/cerc-io/plugeth-statediff/types"
"github.com/cerc-io/plugeth-statediff/utils/log"
)
var _ interfaces.StateDiffIndexer = &StateDiffIndexer{}
@ -82,11 +81,12 @@ func (sdi *StateDiffIndexer) ReportDBMetrics(delay time.Duration, quit <-chan bo
// PushBlock pushes and indexes block data in sql, except state & storage nodes (includes header, uncles, transactions & receipts)
// Returns an initiated DB transaction which must be Closed via defer to commit or rollback
func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receipts, totalDifficulty *big.Int) (interfaces.Batch, error) {
start, t := time.Now(), time.Now()
t := time.Now()
blockHash := block.Hash()
height := block.NumberU64()
traceMsg := fmt.Sprintf("indexer stats for statediff at %d with hash %s:\r\n", height, blockHash)
transactions := block.Transactions()
var err error
// Derive any missing fields
if err := receipts.DeriveFields(sdi.chainConfig, blockHash, height, block.BaseFee(), transactions); err != nil {
return nil, err
@ -95,11 +95,12 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
// Generate the block iplds
txNodes, rctNodes, logNodes, err := ipld.FromBlockAndReceipts(block, receipts)
if err != nil {
return nil, fmt.Errorf("error creating IPLD nodes from block and receipts: %v", err)
return nil, fmt.Errorf("error creating IPLD nodes from block and receipts: %w", err)
}
if len(txNodes) != len(rctNodes) {
return nil, fmt.Errorf("expected number of transactions (%d), receipts (%d)", len(txNodes), len(rctNodes))
return nil, fmt.Errorf("expected number of transactions (%d) does not match number of receipts (%d)",
len(txNodes), len(rctNodes))
}
// Calculate reward
@ -108,98 +109,35 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
if sdi.chainConfig.Clique != nil {
reward = big.NewInt(0)
} else {
reward = shared.CalcEthBlockReward(block.Header(), block.Uncles(), block.Transactions(), receipts)
reward = shared.CalcEthBlockReward(block.Header(), block.Uncles(), transactions, receipts)
}
t = time.Now()
// Begin new DB tx for everything
tx := NewDelayedTx(sdi.dbWriter.db)
defer func() {
if p := recover(); p != nil {
rollback(sdi.ctx, tx)
panic(p)
} else if err != nil {
rollback(sdi.ctx, tx)
}
}()
blockTx := &BatchTx{
removedCacheFlag: new(uint32),
ctx: sdi.ctx,
BlockNumber: block.Number().String(),
stm: sdi.dbWriter.db.InsertIPLDsStm(),
iplds: make(chan models.IPLDModel),
quit: make(chan (chan<- struct{})),
ipldCache: models.IPLDBatch{
BlockNumbers: make([]string, 0, startingCacheCapacity),
Keys: make([]string, 0, startingCacheCapacity),
Values: make([][]byte, 0, startingCacheCapacity),
},
dbtx: tx,
// handle transaction commit or rollback for any return case
submit: func(self *BatchTx, err error) error {
defer func() {
confirm := make(chan struct{})
self.quit <- confirm
close(self.quit)
<-confirm
close(self.iplds)
}()
if p := recover(); p != nil {
log.Info("panic detected before tx submission, rolling back the tx", "panic", p)
rollback(sdi.ctx, tx)
panic(p)
} else if err != nil {
log.Info("error detected before tx submission, rolling back the tx", "error", err)
rollback(sdi.ctx, tx)
} else {
tDiff := time.Since(t)
metrics2.IndexerMetrics.StateStoreCodeProcessingTimer.Update(tDiff)
traceMsg += fmt.Sprintf("state, storage, and code storage processing time: %s\r\n", tDiff)
t = time.Now()
if err := self.flush(); err != nil {
rollback(sdi.ctx, tx)
traceMsg += fmt.Sprintf(" TOTAL PROCESSING DURATION: %s\r\n", time.Since(start))
log.Debug(traceMsg)
return err
}
err = tx.Commit(sdi.ctx)
tDiff = time.Since(t)
metrics2.IndexerMetrics.PostgresCommitTimer.Update(tDiff)
traceMsg += fmt.Sprintf("postgres transaction commit duration: %s\r\n", tDiff)
}
traceMsg += fmt.Sprintf(" TOTAL PROCESSING DURATION: %s\r\n", time.Since(start))
log.Debug(traceMsg)
return err
},
}
go blockTx.cache()
tDiff := time.Since(t)
metrics2.IndexerMetrics.FreePostgresTimer.Update(tDiff)
traceMsg += fmt.Sprintf("time spent waiting for free postgres tx: %s:\r\n", tDiff)
t = time.Now()
batch := NewBatch(
sdi.dbWriter.db.InsertIPLDsStm(), sdi.ctx,
block.Number(),
NewDelayedTx(sdi.dbWriter.db),
)
// handle transaction rollback for failures in this scope
defer batch.RollbackOnFailure(err)
// Publish and index header, collect headerID
headerID, err := sdi.PushHeader(blockTx, block.Header(), reward, totalDifficulty)
headerID, err := sdi.PushHeader(batch, block.Header(), reward, totalDifficulty)
if err != nil {
return nil, err
}
tDiff = time.Since(t)
metrics2.IndexerMetrics.HeaderProcessingTimer.Update(tDiff)
traceMsg += fmt.Sprintf("header processing time: %s\r\n", tDiff)
metrics2.IndexerMetrics.HeaderProcessingTimer.Update(time.Since(t))
t = time.Now()
// Publish and index uncles
err = sdi.processUncles(blockTx, headerID, block.Number(), block.UncleHash(), block.Uncles())
err = sdi.processUncles(batch, headerID, block.Number(), block.UncleHash(), block.Uncles())
if err != nil {
return nil, err
}
tDiff = time.Since(t)
metrics2.IndexerMetrics.UncleProcessingTimer.Update(tDiff)
traceMsg += fmt.Sprintf("uncle processing time: %s\r\n", tDiff)
metrics2.IndexerMetrics.UncleProcessingTimer.Update(time.Since(t))
t = time.Now()
// Publish and index receipts and txs
err = sdi.processReceiptsAndTxs(blockTx, processArgs{
err = sdi.processReceiptsAndTxs(batch, processArgs{
headerID: headerID,
blockNumber: block.Number(),
receipts: receipts,
@ -211,12 +149,9 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
if err != nil {
return nil, err
}
tDiff = time.Since(t)
metrics2.IndexerMetrics.TxAndRecProcessingTimer.Update(tDiff)
traceMsg += fmt.Sprintf("tx and receipt processing time: %s\r\n", tDiff)
t = time.Now()
metrics2.IndexerMetrics.TxAndRecProcessingTimer.Update(time.Since(t))
return blockTx, err
return batch, err
}
// CurrentBlock returns the HeaderModel of the highest existing block in the database.
@ -414,7 +349,7 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt
if stateNode.Removed {
tx.cacheRemoved(shared.RemovedNodeStateCID, []byte{})
stateModel = models.StateNodeModel{
BlockNumber: tx.BlockNumber,
BlockNumber: tx.BlockNumber(),
HeaderID: headerID,
StateKey: common.BytesToHash(stateNode.AccountWrapper.LeafKey).String(),
CID: shared.RemovedNodeStateCID,
@ -422,7 +357,7 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt
}
} else {
stateModel = models.StateNodeModel{
BlockNumber: tx.BlockNumber,
BlockNumber: tx.BlockNumber(),
HeaderID: headerID,
StateKey: common.BytesToHash(stateNode.AccountWrapper.LeafKey).String(),
CID: stateNode.AccountWrapper.CID,
@ -444,7 +379,7 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt
if storageNode.Removed {
tx.cacheRemoved(shared.RemovedNodeStorageCID, []byte{})
storageModel := models.StorageNodeModel{
BlockNumber: tx.BlockNumber,
BlockNumber: tx.BlockNumber(),
HeaderID: headerID,
StateKey: common.BytesToHash(stateNode.AccountWrapper.LeafKey).String(),
StorageKey: common.BytesToHash(storageNode.LeafKey).String(),
@ -458,7 +393,7 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt
continue
}
storageModel := models.StorageNodeModel{
BlockNumber: tx.BlockNumber,
BlockNumber: tx.BlockNumber(),
HeaderID: headerID,
StateKey: common.BytesToHash(stateNode.AccountWrapper.LeafKey).String(),
StorageKey: common.BytesToHash(storageNode.LeafKey).String(),
@ -489,6 +424,15 @@ func (sdi *StateDiffIndexer) HasBlock(hash common.Hash, number uint64) (bool, er
return sdi.dbWriter.hasHeader(hash, number)
}
func (sdi *StateDiffIndexer) BeginTx(number *big.Int, ctx context.Context) interfaces.Batch {
return NewBatch(
sdi.dbWriter.db.InsertIPLDsStm(),
ctx,
number,
NewDelayedTx(sdi.dbWriter.db),
)
}
// Close satisfies io.Closer
func (sdi *StateDiffIndexer) Close() error {
return sdi.dbWriter.Close()

View File

@ -17,6 +17,7 @@
package interfaces
import (
"context"
"math/big"
"time"
@ -39,6 +40,8 @@ type StateDiffIndexer interface {
PushIPLD(tx Batch, ipld sdtypes.IPLD) error
ReportDBMetrics(delay time.Duration, quit <-chan bool)
BeginTx(number *big.Int, ctx context.Context) Batch
// Methods used by WatchAddress API/functionality
LoadWatchedAddresses() ([]common.Address, error)
InsertWatchedAddresses(addresses []sdtypes.WatchAddressArg, currentBlock *big.Int) error
@ -51,7 +54,9 @@ type StateDiffIndexer interface {
// Batch required for indexing data atomically
type Batch interface {
Submit(err error) error
Submit() error
BlockNumber() string
RollbackOnFailure(error)
}
// Config used to configure different underlying implementations

View File

@ -28,7 +28,6 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/cerc-io/plugeth-statediff/indexer/database/file"
"github.com/cerc-io/plugeth-statediff/indexer/database/sql"
"github.com/cerc-io/plugeth-statediff/indexer/interfaces"
"github.com/cerc-io/plugeth-statediff/indexer/mocks"
@ -48,7 +47,7 @@ func SetupTestData(t *testing.T, ind interfaces.StateDiffIndexer) {
t.Fatal(err)
}
defer func() {
roysc marked this conversation as resolved
Review

Do you recall why we passed err in to Submit() before? I have a vague memory buzzing in the back of my mind and I want to double check that there is no longer any reason.

Do you recall why we passed err in to Submit() before? I have a vague memory buzzing in the back of my mind and I want to double check that there is no longer any reason.
Review

Basically it was due to handling rollbacks and commit in the submit function, so it needed the error to check whether to rollback. By factoring the rollback out, we avoid that, but now the caller is responsible for it.

Which reminds me that I need to defer a rollback call from the scope of service.writeStateDiff - since the tx is returned from PushBlock.

Basically it was due to handling rollbacks and commit in the submit function, so it needed the error to check whether to rollback. By factoring the rollback out, we avoid that, but now the caller is responsible for it. Which reminds me that I need to defer a rollback call from the scope of `service.writeStateDiff` - since the tx is returned from `PushBlock`.
Review

If you've added the rollback, feel free to resolve this conversation.

If you've added the rollback, feel free to resolve this conversation.
if err := tx.Submit(err); err != nil {
if err := tx.Submit(); err != nil {
t.Fatal(err)
}
}()
@ -61,11 +60,7 @@ func SetupTestData(t *testing.T, ind interfaces.StateDiffIndexer) {
require.NoError(t, err)
}
if batchTx, ok := tx.(*sql.BatchTx); ok {
require.Equal(t, mocks.BlockNumber.String(), batchTx.BlockNumber)
} else if batchTx, ok := tx.(*file.BatchTx); ok {
require.Equal(t, mocks.BlockNumber.String(), batchTx.BlockNumber)
}
require.Equal(t, mocks.BlockNumber.String(), tx.BlockNumber())
}
func DoTestPublishAndIndexHeaderIPLDs(t *testing.T, db sql.Database) {
@ -547,13 +542,9 @@ func SetupTestDataNonCanonical(t *testing.T, ind interfaces.StateDiffIndexer) {
require.NoError(t, err)
}
if batchTx, ok := tx1.(*sql.BatchTx); ok {
require.Equal(t, mocks.BlockNumber.String(), batchTx.BlockNumber)
} else if batchTx, ok := tx1.(*file.BatchTx); ok {
require.Equal(t, mocks.BlockNumber.String(), batchTx.BlockNumber)
}
require.Equal(t, mocks.BlockNumber.String(), tx1.BlockNumber())
if err := tx1.Submit(err); err != nil {
if err := tx1.Submit(); err != nil {
t.Fatal(err)
}
@ -572,13 +563,9 @@ func SetupTestDataNonCanonical(t *testing.T, ind interfaces.StateDiffIndexer) {
require.NoError(t, err)
}
if tx, ok := tx2.(*sql.BatchTx); ok {
require.Equal(t, mocks.BlockNumber.String(), tx.BlockNumber)
} else if tx, ok := tx2.(*sql.BatchTx); ok {
require.Equal(t, mocks.BlockNumber.String(), tx.BlockNumber)
}
require.Equal(t, mocks.BlockNumber.String(), tx2.BlockNumber())
if err := tx2.Submit(err); err != nil {
if err := tx2.Submit(); err != nil {
t.Fatal(err)
}
@ -597,13 +584,9 @@ func SetupTestDataNonCanonical(t *testing.T, ind interfaces.StateDiffIndexer) {
require.NoError(t, err)
}
if batchTx, ok := tx3.(*sql.BatchTx); ok {
require.Equal(t, mocks.Block2Number.String(), batchTx.BlockNumber)
} else if batchTx, ok := tx3.(*file.BatchTx); ok {
require.Equal(t, mocks.Block2Number.String(), batchTx.BlockNumber)
}
require.Equal(t, mocks.Block2Number.String(), tx3.BlockNumber())
if err := tx3.Submit(err); err != nil {
if err := tx3.Submit(); err != nil {
t.Fatal(err)
}
}

View File

@ -20,7 +20,6 @@ import (
"context"
"testing"
"github.com/cerc-io/plugeth-statediff/indexer/database/file"
"github.com/cerc-io/plugeth-statediff/indexer/database/sql"
"github.com/cerc-io/plugeth-statediff/indexer/interfaces"
"github.com/cerc-io/plugeth-statediff/indexer/ipld"
@ -51,7 +50,7 @@ func SetupLegacyTestData(t *testing.T, ind interfaces.StateDiffIndexer) {
require.NoError(t, err)
defer func() {
if err := tx.Submit(err); err != nil {
if err := tx.Submit(); err != nil {
t.Fatal(err)
}
}()
@ -60,11 +59,7 @@ func SetupLegacyTestData(t *testing.T, ind interfaces.StateDiffIndexer) {
require.NoError(t, err)
}
if batchTx, ok := tx.(*sql.BatchTx); ok {
require.Equal(t, legacyData.BlockNumber.String(), batchTx.BlockNumber)
} else if batchTx, ok := tx.(*file.BatchTx); ok {
require.Equal(t, legacyData.BlockNumber.String(), batchTx.BlockNumber)
}
require.Equal(t, legacyData.BlockNumber.String(), tx.BlockNumber())
}
func TestLegacyIndexer(t *testing.T, db sql.Database) {

View File

@ -19,8 +19,6 @@ package test
import (
"testing"
"github.com/cerc-io/plugeth-statediff/indexer/database/file"
"github.com/cerc-io/plugeth-statediff/indexer/database/sql"
"github.com/cerc-io/plugeth-statediff/indexer/interfaces"
"github.com/cerc-io/plugeth-statediff/indexer/mocks"
"github.com/ethereum/go-ethereum/core/types"
@ -36,7 +34,7 @@ func TestBlock(t *testing.T, ind interfaces.StateDiffIndexer, testBlock *types.B
require.NoError(t, err)
defer func() {
if err := tx.Submit(err); err != nil {
if err := tx.Submit(); err != nil {
t.Fatal(err)
}
}()
@ -45,9 +43,5 @@ func TestBlock(t *testing.T, ind interfaces.StateDiffIndexer, testBlock *types.B
require.NoError(t, err)
}
if batchTx, ok := tx.(*sql.BatchTx); ok {
require.Equal(t, testBlock.Number().String(), batchTx.BlockNumber)
} else if batchTx, ok := tx.(*file.BatchTx); ok {
require.Equal(t, testBlock.Number().String(), batchTx.BlockNumber)
}
require.Equal(t, testBlock.Number().String(), tx.BlockNumber())
}

View File

@ -817,6 +817,8 @@ func (sds *Service) writeStateDiff(block *types.Block, parentRoot common.Hash, p
if params.IncludeReceipts {
receipts = sds.BlockChain.GetReceiptsByHash(block.Hash())
}
t := time.Now()
tx, err = sds.indexer.PushBlock(block, receipts, totalDifficulty)
if err != nil {
return err
@ -842,9 +844,12 @@ func (sds *Service) writeStateDiff(block *types.Block, parentRoot common.Hash, p
BlockHash: block.Hash(),
BlockNumber: block.Number(),
}, params, nodeSink, ipldSink)
if err != nil {
return err
}
// TODO this anti-pattern needs to be sorted out eventually
if err = tx.Submit(err); err != nil {
metrics.IndexerMetrics.StateStoreCodeProcessingTimer.Update(time.Since(t))
if err = tx.Submit(); err != nil {
return fmt.Errorf("batch transaction submission failed: %w", err)
}
return nil

View File

@ -17,6 +17,7 @@
package mocks
import (
context "context"
"math/big"
"time"
@ -84,10 +85,21 @@ func (sdi *StateDiffIndexer) ClearWatchedAddresses() error {
return nil
}
func (sdi *StateDiffIndexer) BeginTx(number *big.Int, ctx context.Context) interfaces.Batch {
return &batch{}
}
func (sdi *StateDiffIndexer) Close() error {
return nil
}
func (tx *batch) Submit(err error) error {
func (tx *batch) RollbackOnFailure(error) {}
func (tx *batch) Submit() error {
return nil
}
// batch.BlockNumber
func (tx *batch) BlockNumber() string {
return "0"
}