refactor batch

This commit is contained in:
Roy Crihfield 2023-08-06 13:29:19 +08:00
parent 12272aa001
commit 2f540104d1
12 changed files with 182 additions and 242 deletions

View File

@ -19,26 +19,55 @@ package dump
import (
"fmt"
"io"
"math/big"
"github.com/cerc-io/plugeth-statediff/indexer/ipld"
"github.com/cerc-io/plugeth-statediff/indexer/models"
"github.com/cerc-io/plugeth-statediff/utils/log"
)
// BatchTx wraps a void with the state necessary for building the tx concurrently during trie difference iteration
type BatchTx struct {
BlockNumber string
dump io.Writer
quit chan struct{}
iplds chan models.IPLDModel
ipldCache models.IPLDBatch
submit func(blockTx *BatchTx, err error) error
blockNum string
dump io.Writer
quit chan struct{}
iplds chan models.IPLDModel
ipldCache models.IPLDBatch
}
// Submit satisfies indexer.AtomicTx
func (tx *BatchTx) Submit(err error) error {
return tx.submit(tx, err)
func NewBatch(number *big.Int, dest io.Writer) *BatchTx {
batch := &BatchTx{
blockNum: number.String(),
dump: dest,
iplds: make(chan models.IPLDModel),
quit: make(chan struct{}),
ipldCache: models.IPLDBatch{},
}
go batch.cache()
return batch
}
func (self *BatchTx) Submit() error {
close(self.quit)
close(self.iplds)
if err := self.flush(); err != nil {
return err
}
return nil
}
func (tx *BatchTx) BlockNumber() string {
return tx.blockNum
}
func (tx *BatchTx) RollbackOnFailure(err error) {
if p := recover(); p != nil {
log.Info("panic detected before tx submission, but rollback not supported", "panic", p)
panic(p)
} else if err != nil {
log.Info("error detected before tx submission, but rollback not supported", "error", err)
}
}
func (tx *BatchTx) flush() error {
@ -65,7 +94,7 @@ func (tx *BatchTx) cache() {
func (tx *BatchTx) cacheDirect(key string, value []byte) {
tx.iplds <- models.IPLDModel{
BlockNumber: tx.BlockNumber,
BlockNumber: tx.BlockNumber(),
Key: key,
Data: value,
}
@ -73,7 +102,7 @@ func (tx *BatchTx) cacheDirect(key string, value []byte) {
func (tx *BatchTx) cacheIPLD(i ipld.IPLD) {
tx.iplds <- models.IPLDModel{
BlockNumber: tx.BlockNumber,
BlockNumber: tx.BlockNumber(),
Key: i.Cid().String(),
Data: i.RawData(),
}

View File

@ -17,6 +17,7 @@
package dump
import (
"context"
"encoding/hex"
"fmt"
"io"
@ -37,7 +38,6 @@ import (
"github.com/cerc-io/plugeth-statediff/indexer/models"
"github.com/cerc-io/plugeth-statediff/indexer/shared"
sdtypes "github.com/cerc-io/plugeth-statediff/types"
"github.com/cerc-io/plugeth-statediff/utils/log"
)
var _ interfaces.StateDiffIndexer = &StateDiffIndexer{}
@ -62,7 +62,7 @@ func (sdi *StateDiffIndexer) ReportDBMetrics(time.Duration, <-chan bool) {}
// PushBlock pushes and indexes block data in sql, except state & storage nodes (includes header, uncles, transactions & receipts)
// Returns an initiated DB transaction which must be Closed via defer to commit or rollback
func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receipts, totalDifficulty *big.Int) (interfaces.Batch, error) {
start, t := time.Now(), time.Now()
t := time.Now()
blockHash := block.Hash()
blockHashStr := blockHash.String()
height := block.NumberU64()
@ -91,40 +91,8 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
} else {
reward = shared.CalcEthBlockReward(block.Header(), block.Uncles(), block.Transactions(), receipts)
}
t = time.Now()
blockTx := &BatchTx{
BlockNumber: block.Number().String(),
dump: sdi.dump,
iplds: make(chan models.IPLDModel),
quit: make(chan struct{}),
ipldCache: models.IPLDBatch{},
submit: func(self *BatchTx, err error) error {
close(self.quit)
close(self.iplds)
tDiff := time.Since(t)
metrics.IndexerMetrics.StateStoreCodeProcessingTimer.Update(tDiff)
traceMsg += fmt.Sprintf("state, storage, and code storage processing time: %s\r\n", tDiff.String())
t = time.Now()
if err := self.flush(); err != nil {
traceMsg += fmt.Sprintf(" TOTAL PROCESSING DURATION: %s\r\n", time.Since(start).String())
log.Debug(traceMsg)
return err
}
tDiff = time.Since(t)
metrics.IndexerMetrics.PostgresCommitTimer.Update(tDiff)
traceMsg += fmt.Sprintf("postgres transaction commit duration: %s\r\n", tDiff.String())
traceMsg += fmt.Sprintf(" TOTAL PROCESSING DURATION: %s\r\n", time.Since(start).String())
log.Debug(traceMsg)
return err
},
}
go blockTx.cache()
tDiff := time.Since(t)
metrics.IndexerMetrics.FreePostgresTimer.Update(tDiff)
traceMsg += fmt.Sprintf("time spent waiting for free postgres tx: %s:\r\n", tDiff.String())
blockTx := NewBatch(block.Number(), sdi.dump)
t = time.Now()
// Publish and index header, collect headerID
@ -133,7 +101,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
if err != nil {
return nil, err
}
tDiff = time.Since(t)
tDiff := time.Since(t)
metrics.IndexerMetrics.HeaderProcessingTimer.Update(tDiff)
traceMsg += fmt.Sprintf("header processing time: %s\r\n", tDiff.String())
t = time.Now()
@ -352,7 +320,7 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt
// short circuit if it is a Removed node
// this assumes the db has been initialized and a ipld.blocks entry for the Removed node is present
stateModel = models.StateNodeModel{
BlockNumber: tx.BlockNumber,
BlockNumber: tx.BlockNumber(),
HeaderID: headerID,
StateKey: common.BytesToHash(stateNode.AccountWrapper.LeafKey).String(),
CID: shared.RemovedNodeStateCID,
@ -360,7 +328,7 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt
}
} else {
stateModel = models.StateNodeModel{
BlockNumber: tx.BlockNumber,
BlockNumber: tx.BlockNumber(),
HeaderID: headerID,
StateKey: common.BytesToHash(stateNode.AccountWrapper.LeafKey).String(),
CID: stateNode.AccountWrapper.CID,
@ -383,7 +351,7 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt
// short circuit if it is a Removed node
// this assumes the db has been initialized and a ipld.blocks entry for the Removed node is present
storageModel := models.StorageNodeModel{
BlockNumber: tx.BlockNumber,
BlockNumber: tx.BlockNumber(),
HeaderID: headerID,
StateKey: common.BytesToHash(stateNode.AccountWrapper.LeafKey).String(),
StorageKey: common.BytesToHash(storageNode.LeafKey).String(),
@ -396,7 +364,7 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt
continue
}
storageModel := models.StorageNodeModel{
BlockNumber: tx.BlockNumber,
BlockNumber: tx.BlockNumber(),
HeaderID: headerID,
StateKey: common.BytesToHash(stateNode.AccountWrapper.LeafKey).String(),
StorageKey: common.BytesToHash(storageNode.LeafKey).String(),
@ -440,6 +408,10 @@ func (sdi *StateDiffIndexer) DetectGaps(beginBlockNumber uint64, endBlockNumber
return nil, nil
}
func (sdi *StateDiffIndexer) BeginTx(number *big.Int, _ context.Context) interfaces.Batch {
return NewBatch(number, sdi.dump)
}
// Close satisfies io.Closer
func (sdi *StateDiffIndexer) Close() error {
return sdi.dump.Close()

View File

@ -16,14 +16,29 @@
package file
import "github.com/cerc-io/plugeth-statediff/utils/log"
// BatchTx wraps a void with the state necessary for building the tx concurrently during trie difference iteration
type BatchTx struct {
BlockNumber string
submit func(blockTx *BatchTx, err error) error
blockNum string
fileWriter FileWriter
}
// Submit satisfies indexer.AtomicTx
func (tx *BatchTx) Submit(err error) error {
return tx.submit(tx, err)
func (tx *BatchTx) Submit() error {
tx.fileWriter.Flush()
return nil
}
func (tx *BatchTx) BlockNumber() string {
return tx.blockNum
}
func (tx *BatchTx) RollbackOnFailure(err error) {
if p := recover(); p != nil {
log.Info("panic detected before tx submission, but rollback not supported", "panic", p)
panic(p)
} else if err != nil {
log.Info("error detected before tx submission, but rollback not supported", "error", err)
}
}

View File

@ -17,6 +17,7 @@
package file
import (
"context"
"errors"
"fmt"
"math/big"
@ -130,7 +131,7 @@ func (sdi *StateDiffIndexer) ReportDBMetrics(time.Duration, <-chan bool) {}
// PushBlock pushes and indexes block data in sql, except state & storage nodes (includes header, uncles, transactions & receipts)
// Returns an initiated DB transaction which must be Closed via defer to commit or rollback
func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receipts, totalDifficulty *big.Int) (interfaces.Batch, error) {
start, t := time.Now(), time.Now()
t := time.Now()
blockHash := block.Hash()
blockHashStr := blockHash.String()
height := block.NumberU64()
@ -159,27 +160,11 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
} else {
reward = shared.CalcEthBlockReward(block.Header(), block.Uncles(), block.Transactions(), receipts)
}
t = time.Now()
blockTx := &BatchTx{
BlockNumber: block.Number().String(),
submit: func(self *BatchTx, err error) error {
tDiff := time.Since(t)
metrics.IndexerMetrics.StateStoreCodeProcessingTimer.Update(tDiff)
traceMsg += fmt.Sprintf("state, storage, and code storage processing time: %s\r\n", tDiff.String())
t = time.Now()
sdi.fileWriter.Flush()
tDiff = time.Since(t)
metrics.IndexerMetrics.PostgresCommitTimer.Update(tDiff)
traceMsg += fmt.Sprintf("postgres transaction commit duration: %s\r\n", tDiff.String())
traceMsg += fmt.Sprintf(" TOTAL PROCESSING DURATION: %s\r\n", time.Since(start).String())
log.Trace(traceMsg)
return err
},
blockNum: block.Number().String(),
fileWriter: sdi.fileWriter,
}
tDiff := time.Since(t)
metrics.IndexerMetrics.FreePostgresTimer.Update(tDiff)
traceMsg += fmt.Sprintf("time spent waiting for free postgres tx: %s:\r\n", tDiff.String())
t = time.Now()
// write header, collect headerID
@ -187,7 +172,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
if err != nil {
return nil, err
}
tDiff = time.Since(t)
tDiff := time.Since(t)
metrics.IndexerMetrics.HeaderProcessingTimer.Update(tDiff)
traceMsg += fmt.Sprintf("header processing time: %s\r\n", tDiff.String())
t = time.Now()
@ -382,20 +367,16 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(args processArgs) error {
}
// PushStateNode writes a state diff node object (including any child storage nodes) IPLD insert SQL stmt to a file
func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdtypes.StateLeafNode, headerID string) error {
tx, ok := batch.(*BatchTx)
if !ok {
return fmt.Errorf("file: batch is expected to be of type %T, got %T", &BatchTx{}, batch)
}
func (sdi *StateDiffIndexer) PushStateNode(tx interfaces.Batch, stateNode sdtypes.StateLeafNode, headerID string) error {
// publish the state node
var stateModel models.StateNodeModel
if stateNode.Removed {
if atomic.LoadUint32(&sdi.removedCacheFlag) == 0 {
atomic.StoreUint32(&sdi.removedCacheFlag, 1)
sdi.fileWriter.upsertIPLDDirect(tx.BlockNumber, shared.RemovedNodeStateCID, []byte{})
sdi.fileWriter.upsertIPLDDirect(tx.BlockNumber(), shared.RemovedNodeStateCID, []byte{})
}
stateModel = models.StateNodeModel{
BlockNumber: tx.BlockNumber,
BlockNumber: tx.BlockNumber(),
HeaderID: headerID,
StateKey: common.BytesToHash(stateNode.AccountWrapper.LeafKey).String(),
CID: shared.RemovedNodeStateCID,
@ -403,7 +384,7 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt
}
} else {
stateModel = models.StateNodeModel{
BlockNumber: tx.BlockNumber,
BlockNumber: tx.BlockNumber(),
HeaderID: headerID,
StateKey: common.BytesToHash(stateNode.AccountWrapper.LeafKey).String(),
CID: stateNode.AccountWrapper.CID,
@ -423,10 +404,10 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt
if storageNode.Removed {
if atomic.LoadUint32(&sdi.removedCacheFlag) == 0 {
atomic.StoreUint32(&sdi.removedCacheFlag, 1)
sdi.fileWriter.upsertIPLDDirect(tx.BlockNumber, shared.RemovedNodeStorageCID, []byte{})
sdi.fileWriter.upsertIPLDDirect(tx.BlockNumber(), shared.RemovedNodeStorageCID, []byte{})
}
storageModel := models.StorageNodeModel{
BlockNumber: tx.BlockNumber,
BlockNumber: tx.BlockNumber(),
HeaderID: headerID,
StateKey: common.BytesToHash(stateNode.AccountWrapper.LeafKey).String(),
StorageKey: common.BytesToHash(storageNode.LeafKey).String(),
@ -438,7 +419,7 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt
continue
}
storageModel := models.StorageNodeModel{
BlockNumber: tx.BlockNumber,
BlockNumber: tx.BlockNumber(),
HeaderID: headerID,
StateKey: common.BytesToHash(stateNode.AccountWrapper.LeafKey).String(),
StorageKey: common.BytesToHash(storageNode.LeafKey).String(),
@ -453,12 +434,8 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt
}
// PushIPLD writes iplds to ipld.blocks
func (sdi *StateDiffIndexer) PushIPLD(batch interfaces.Batch, ipld sdtypes.IPLD) error {
tx, ok := batch.(*BatchTx)
if !ok {
return fmt.Errorf("file: batch is expected to be of type %T, got %T", &BatchTx{}, batch)
}
sdi.fileWriter.upsertIPLDDirect(tx.BlockNumber, ipld.CID, ipld.Content)
func (sdi *StateDiffIndexer) PushIPLD(tx interfaces.Batch, ipld sdtypes.IPLD) error {
sdi.fileWriter.upsertIPLDDirect(tx.BlockNumber(), ipld.CID, ipld.Content)
return nil
}
@ -480,6 +457,13 @@ func (sdi *StateDiffIndexer) HasBlock(hash common.Hash, number uint64) (bool, er
return false, nil
}
func (sdi *StateDiffIndexer) BeginTx(number *big.Int, _ context.Context) interfaces.Batch {
return &BatchTx{
blockNum: number.String(),
fileWriter: sdi.fileWriter,
}
}
// Close satisfies io.Closer
func (sdi *StateDiffIndexer) Close() error {
return sdi.fileWriter.Close()

View File

@ -35,7 +35,7 @@ const startingCacheCapacity = 1024 * 24
// BatchTx wraps a sql tx with the state necessary for building the tx concurrently during trie difference iteration
type BatchTx struct {
BlockNumber string
blockNum string
ctx context.Context
dbtx Tx
stm string
@ -44,15 +44,14 @@ type BatchTx struct {
ipldCache models.IPLDBatch
removedCacheFlag *uint32
// Tracks expected cache size and ensures cache is caught up before flush
cacheWg sync.WaitGroup
startTime time.Time
cacheWg sync.WaitGroup
}
func NewBatchAt(insertStm string, ctx context.Context, number *big.Int, tx Tx, start time.Time) *BatchTx {
func NewBatch(insertStm string, ctx context.Context, number *big.Int, tx Tx) *BatchTx {
blockTx := &BatchTx{
removedCacheFlag: new(uint32),
ctx: ctx,
BlockNumber: number.String(),
blockNum: number.String(),
stm: insertStm,
iplds: make(chan models.IPLDModel),
quit: make(chan (chan<- struct{})),
@ -61,27 +60,30 @@ func NewBatchAt(insertStm string, ctx context.Context, number *big.Int, tx Tx, s
Keys: make([]string, 0, startingCacheCapacity),
Values: make([][]byte, 0, startingCacheCapacity),
},
dbtx: tx,
startTime: time.Now(),
dbtx: tx,
}
go blockTx.cache()
return blockTx
}
// Submit satisfies indexer.Batch
func (tx *BatchTx) Submit(err error) error {
func (tx *BatchTx) Submit() error {
defer tx.close()
metrics.IndexerMetrics.StateStoreCodeProcessingTimer.Update(time.Since(tx.startTime))
t := time.Now()
if err := tx.flush(); err != nil {
rollback(tx.ctx, tx.dbtx)
return err
}
err = tx.dbtx.Commit(tx.ctx)
err := tx.dbtx.Commit(tx.ctx)
metrics.IndexerMetrics.PostgresCommitTimer.Update(time.Since(t))
return err
}
func (tx *BatchTx) BlockNumber() string {
return tx.blockNum
}
func (tx *BatchTx) RollbackOnFailure(err error) {
defer tx.close()
@ -148,7 +150,7 @@ func (tx *BatchTx) cache() {
func (tx *BatchTx) cacheDirect(key string, value []byte) {
tx.cacheWg.Add(1)
tx.iplds <- models.IPLDModel{
BlockNumber: tx.BlockNumber,
BlockNumber: tx.BlockNumber(),
Key: key,
Data: value,
}
@ -157,7 +159,7 @@ func (tx *BatchTx) cacheDirect(key string, value []byte) {
func (tx *BatchTx) cacheIPLD(i ipld.IPLD) {
tx.cacheWg.Add(1)
tx.iplds <- models.IPLDModel{
BlockNumber: tx.BlockNumber,
BlockNumber: tx.BlockNumber(),
Key: i.Cid().String(),
Data: i.RawData(),
}
@ -168,7 +170,7 @@ func (tx *BatchTx) cacheRemoved(key string, value []byte) {
atomic.StoreUint32(tx.removedCacheFlag, 1)
tx.cacheWg.Add(1)
tx.iplds <- models.IPLDModel{
BlockNumber: tx.BlockNumber,
BlockNumber: tx.BlockNumber(),
Key: key,
Data: value,
}

View File

@ -39,7 +39,6 @@ import (
"github.com/cerc-io/plugeth-statediff/indexer/models"
"github.com/cerc-io/plugeth-statediff/indexer/shared"
sdtypes "github.com/cerc-io/plugeth-statediff/types"
"github.com/cerc-io/plugeth-statediff/utils/log"
)
var _ interfaces.StateDiffIndexer = &StateDiffIndexer{}
@ -82,11 +81,12 @@ func (sdi *StateDiffIndexer) ReportDBMetrics(delay time.Duration, quit <-chan bo
// PushBlock pushes and indexes block data in sql, except state & storage nodes (includes header, uncles, transactions & receipts)
// Returns an initiated DB transaction which must be Closed via defer to commit or rollback
func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receipts, totalDifficulty *big.Int) (interfaces.Batch, error) {
start, t := time.Now(), time.Now()
t := time.Now()
blockHash := block.Hash()
height := block.NumberU64()
traceMsg := fmt.Sprintf("indexer stats for statediff at %d with hash %s:\r\n", height, blockHash)
transactions := block.Transactions()
var err error
// Derive any missing fields
if err := receipts.DeriveFields(sdi.chainConfig, blockHash, height, block.BaseFee(), transactions); err != nil {
return nil, err
@ -95,11 +95,12 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
// Generate the block iplds
txNodes, rctNodes, logNodes, err := ipld.FromBlockAndReceipts(block, receipts)
if err != nil {
return nil, fmt.Errorf("error creating IPLD nodes from block and receipts: %v", err)
return nil, fmt.Errorf("error creating IPLD nodes from block and receipts: %w", err)
}
if len(txNodes) != len(rctNodes) {
return nil, fmt.Errorf("expected number of transactions (%d), receipts (%d)", len(txNodes), len(rctNodes))
return nil, fmt.Errorf("expected number of transactions (%d) does not match number of receipts (%d)",
len(txNodes), len(rctNodes))
}
// Calculate reward
@ -108,98 +109,35 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
if sdi.chainConfig.Clique != nil {
reward = big.NewInt(0)
} else {
reward = shared.CalcEthBlockReward(block.Header(), block.Uncles(), block.Transactions(), receipts)
reward = shared.CalcEthBlockReward(block.Header(), block.Uncles(), transactions, receipts)
}
t = time.Now()
// Begin new DB tx for everything
tx := NewDelayedTx(sdi.dbWriter.db)
defer func() {
if p := recover(); p != nil {
rollback(sdi.ctx, tx)
panic(p)
} else if err != nil {
rollback(sdi.ctx, tx)
}
}()
blockTx := &BatchTx{
removedCacheFlag: new(uint32),
ctx: sdi.ctx,
BlockNumber: block.Number().String(),
stm: sdi.dbWriter.db.InsertIPLDsStm(),
iplds: make(chan models.IPLDModel),
quit: make(chan (chan<- struct{})),
ipldCache: models.IPLDBatch{
BlockNumbers: make([]string, 0, startingCacheCapacity),
Keys: make([]string, 0, startingCacheCapacity),
Values: make([][]byte, 0, startingCacheCapacity),
},
dbtx: tx,
// handle transaction commit or rollback for any return case
submit: func(self *BatchTx, err error) error {
defer func() {
confirm := make(chan struct{})
self.quit <- confirm
close(self.quit)
<-confirm
close(self.iplds)
}()
if p := recover(); p != nil {
log.Info("panic detected before tx submission, rolling back the tx", "panic", p)
rollback(sdi.ctx, tx)
panic(p)
} else if err != nil {
log.Info("error detected before tx submission, rolling back the tx", "error", err)
rollback(sdi.ctx, tx)
} else {
tDiff := time.Since(t)
metrics2.IndexerMetrics.StateStoreCodeProcessingTimer.Update(tDiff)
traceMsg += fmt.Sprintf("state, storage, and code storage processing time: %s\r\n", tDiff)
t = time.Now()
if err := self.flush(); err != nil {
rollback(sdi.ctx, tx)
traceMsg += fmt.Sprintf(" TOTAL PROCESSING DURATION: %s\r\n", time.Since(start))
log.Debug(traceMsg)
return err
}
err = tx.Commit(sdi.ctx)
tDiff = time.Since(t)
metrics2.IndexerMetrics.PostgresCommitTimer.Update(tDiff)
traceMsg += fmt.Sprintf("postgres transaction commit duration: %s\r\n", tDiff)
}
traceMsg += fmt.Sprintf(" TOTAL PROCESSING DURATION: %s\r\n", time.Since(start))
log.Debug(traceMsg)
return err
},
}
go blockTx.cache()
tDiff := time.Since(t)
metrics2.IndexerMetrics.FreePostgresTimer.Update(tDiff)
traceMsg += fmt.Sprintf("time spent waiting for free postgres tx: %s:\r\n", tDiff)
t = time.Now()
batch := NewBatch(
sdi.dbWriter.db.InsertIPLDsStm(), sdi.ctx,
block.Number(),
NewDelayedTx(sdi.dbWriter.db),
)
// handle transaction rollback for failures in this scope
defer batch.RollbackOnFailure(err)
// Publish and index header, collect headerID
headerID, err := sdi.PushHeader(blockTx, block.Header(), reward, totalDifficulty)
headerID, err := sdi.PushHeader(batch, block.Header(), reward, totalDifficulty)
if err != nil {
return nil, err
}
tDiff = time.Since(t)
metrics2.IndexerMetrics.HeaderProcessingTimer.Update(tDiff)
traceMsg += fmt.Sprintf("header processing time: %s\r\n", tDiff)
metrics2.IndexerMetrics.HeaderProcessingTimer.Update(time.Since(t))
t = time.Now()
// Publish and index uncles
err = sdi.processUncles(blockTx, headerID, block.Number(), block.UncleHash(), block.Uncles())
err = sdi.processUncles(batch, headerID, block.Number(), block.UncleHash(), block.Uncles())
if err != nil {
return nil, err
}
tDiff = time.Since(t)
metrics2.IndexerMetrics.UncleProcessingTimer.Update(tDiff)
traceMsg += fmt.Sprintf("uncle processing time: %s\r\n", tDiff)
metrics2.IndexerMetrics.UncleProcessingTimer.Update(time.Since(t))
t = time.Now()
// Publish and index receipts and txs
err = sdi.processReceiptsAndTxs(blockTx, processArgs{
err = sdi.processReceiptsAndTxs(batch, processArgs{
headerID: headerID,
blockNumber: block.Number(),
receipts: receipts,
@ -211,12 +149,9 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
if err != nil {
return nil, err
}
tDiff = time.Since(t)
metrics2.IndexerMetrics.TxAndRecProcessingTimer.Update(tDiff)
traceMsg += fmt.Sprintf("tx and receipt processing time: %s\r\n", tDiff)
t = time.Now()
metrics2.IndexerMetrics.TxAndRecProcessingTimer.Update(time.Since(t))
return blockTx, err
return batch, err
}
// CurrentBlock returns the HeaderModel of the highest existing block in the database.
@ -414,7 +349,7 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt
if stateNode.Removed {
tx.cacheRemoved(shared.RemovedNodeStateCID, []byte{})
stateModel = models.StateNodeModel{
BlockNumber: tx.BlockNumber,
BlockNumber: tx.BlockNumber(),
HeaderID: headerID,
StateKey: common.BytesToHash(stateNode.AccountWrapper.LeafKey).String(),
CID: shared.RemovedNodeStateCID,
@ -422,7 +357,7 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt
}
} else {
stateModel = models.StateNodeModel{
BlockNumber: tx.BlockNumber,
BlockNumber: tx.BlockNumber(),
HeaderID: headerID,
StateKey: common.BytesToHash(stateNode.AccountWrapper.LeafKey).String(),
CID: stateNode.AccountWrapper.CID,
@ -444,7 +379,7 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt
if storageNode.Removed {
tx.cacheRemoved(shared.RemovedNodeStorageCID, []byte{})
storageModel := models.StorageNodeModel{
BlockNumber: tx.BlockNumber,
BlockNumber: tx.BlockNumber(),
HeaderID: headerID,
StateKey: common.BytesToHash(stateNode.AccountWrapper.LeafKey).String(),
StorageKey: common.BytesToHash(storageNode.LeafKey).String(),
@ -458,7 +393,7 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt
continue
}
storageModel := models.StorageNodeModel{
BlockNumber: tx.BlockNumber,
BlockNumber: tx.BlockNumber(),
HeaderID: headerID,
StateKey: common.BytesToHash(stateNode.AccountWrapper.LeafKey).String(),
StorageKey: common.BytesToHash(storageNode.LeafKey).String(),
@ -489,6 +424,15 @@ func (sdi *StateDiffIndexer) HasBlock(hash common.Hash, number uint64) (bool, er
return sdi.dbWriter.hasHeader(hash, number)
}
func (sdi *StateDiffIndexer) BeginTx(number *big.Int, ctx context.Context) interfaces.Batch {
return NewBatch(
sdi.dbWriter.db.InsertIPLDsStm(),
ctx,
number,
NewDelayedTx(sdi.dbWriter.db),
)
}
// Close satisfies io.Closer
func (sdi *StateDiffIndexer) Close() error {
return sdi.dbWriter.Close()

View File

@ -17,6 +17,7 @@
package interfaces
import (
"context"
"math/big"
"time"
@ -39,6 +40,8 @@ type StateDiffIndexer interface {
PushIPLD(tx Batch, ipld sdtypes.IPLD) error
ReportDBMetrics(delay time.Duration, quit <-chan bool)
BeginTx(number *big.Int, ctx context.Context) Batch
// Methods used by WatchAddress API/functionality
LoadWatchedAddresses() ([]common.Address, error)
InsertWatchedAddresses(addresses []sdtypes.WatchAddressArg, currentBlock *big.Int) error
@ -51,7 +54,9 @@ type StateDiffIndexer interface {
// Batch required for indexing data atomically
type Batch interface {
Submit(err error) error
Submit() error
BlockNumber() string
RollbackOnFailure(error)
}
// Config used to configure different underlying implementations

View File

@ -28,7 +28,6 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/cerc-io/plugeth-statediff/indexer/database/file"
"github.com/cerc-io/plugeth-statediff/indexer/database/sql"
"github.com/cerc-io/plugeth-statediff/indexer/interfaces"
"github.com/cerc-io/plugeth-statediff/indexer/mocks"
@ -48,7 +47,7 @@ func SetupTestData(t *testing.T, ind interfaces.StateDiffIndexer) {
t.Fatal(err)
}
defer func() {
if err := tx.Submit(err); err != nil {
if err := tx.Submit(); err != nil {
t.Fatal(err)
}
}()
@ -61,11 +60,7 @@ func SetupTestData(t *testing.T, ind interfaces.StateDiffIndexer) {
require.NoError(t, err)
}
if batchTx, ok := tx.(*sql.BatchTx); ok {
require.Equal(t, mocks.BlockNumber.String(), batchTx.BlockNumber)
} else if batchTx, ok := tx.(*file.BatchTx); ok {
require.Equal(t, mocks.BlockNumber.String(), batchTx.BlockNumber)
}
require.Equal(t, mocks.BlockNumber.String(), tx.BlockNumber())
}
func DoTestPublishAndIndexHeaderIPLDs(t *testing.T, db sql.Database) {
@ -547,13 +542,9 @@ func SetupTestDataNonCanonical(t *testing.T, ind interfaces.StateDiffIndexer) {
require.NoError(t, err)
}
if batchTx, ok := tx1.(*sql.BatchTx); ok {
require.Equal(t, mocks.BlockNumber.String(), batchTx.BlockNumber)
} else if batchTx, ok := tx1.(*file.BatchTx); ok {
require.Equal(t, mocks.BlockNumber.String(), batchTx.BlockNumber)
}
require.Equal(t, mocks.BlockNumber.String(), tx1.BlockNumber())
if err := tx1.Submit(err); err != nil {
if err := tx1.Submit(); err != nil {
t.Fatal(err)
}
@ -572,13 +563,9 @@ func SetupTestDataNonCanonical(t *testing.T, ind interfaces.StateDiffIndexer) {
require.NoError(t, err)
}
if tx, ok := tx2.(*sql.BatchTx); ok {
require.Equal(t, mocks.BlockNumber.String(), tx.BlockNumber)
} else if tx, ok := tx2.(*sql.BatchTx); ok {
require.Equal(t, mocks.BlockNumber.String(), tx.BlockNumber)
}
require.Equal(t, mocks.BlockNumber.String(), tx2.BlockNumber())
if err := tx2.Submit(err); err != nil {
if err := tx2.Submit(); err != nil {
t.Fatal(err)
}
@ -597,13 +584,9 @@ func SetupTestDataNonCanonical(t *testing.T, ind interfaces.StateDiffIndexer) {
require.NoError(t, err)
}
if batchTx, ok := tx3.(*sql.BatchTx); ok {
require.Equal(t, mocks.Block2Number.String(), batchTx.BlockNumber)
} else if batchTx, ok := tx3.(*file.BatchTx); ok {
require.Equal(t, mocks.Block2Number.String(), batchTx.BlockNumber)
}
require.Equal(t, mocks.Block2Number.String(), tx3.BlockNumber())
if err := tx3.Submit(err); err != nil {
if err := tx3.Submit(); err != nil {
t.Fatal(err)
}
}

View File

@ -20,7 +20,6 @@ import (
"context"
"testing"
"github.com/cerc-io/plugeth-statediff/indexer/database/file"
"github.com/cerc-io/plugeth-statediff/indexer/database/sql"
"github.com/cerc-io/plugeth-statediff/indexer/interfaces"
"github.com/cerc-io/plugeth-statediff/indexer/ipld"
@ -51,7 +50,7 @@ func SetupLegacyTestData(t *testing.T, ind interfaces.StateDiffIndexer) {
require.NoError(t, err)
defer func() {
if err := tx.Submit(err); err != nil {
if err := tx.Submit(); err != nil {
t.Fatal(err)
}
}()
@ -60,11 +59,7 @@ func SetupLegacyTestData(t *testing.T, ind interfaces.StateDiffIndexer) {
require.NoError(t, err)
}
if batchTx, ok := tx.(*sql.BatchTx); ok {
require.Equal(t, legacyData.BlockNumber.String(), batchTx.BlockNumber)
} else if batchTx, ok := tx.(*file.BatchTx); ok {
require.Equal(t, legacyData.BlockNumber.String(), batchTx.BlockNumber)
}
require.Equal(t, legacyData.BlockNumber.String(), tx.BlockNumber())
}
func TestLegacyIndexer(t *testing.T, db sql.Database) {

View File

@ -19,8 +19,6 @@ package test
import (
"testing"
"github.com/cerc-io/plugeth-statediff/indexer/database/file"
"github.com/cerc-io/plugeth-statediff/indexer/database/sql"
"github.com/cerc-io/plugeth-statediff/indexer/interfaces"
"github.com/cerc-io/plugeth-statediff/indexer/mocks"
"github.com/ethereum/go-ethereum/core/types"
@ -36,7 +34,7 @@ func TestBlock(t *testing.T, ind interfaces.StateDiffIndexer, testBlock *types.B
require.NoError(t, err)
defer func() {
if err := tx.Submit(err); err != nil {
if err := tx.Submit(); err != nil {
t.Fatal(err)
}
}()
@ -45,9 +43,5 @@ func TestBlock(t *testing.T, ind interfaces.StateDiffIndexer, testBlock *types.B
require.NoError(t, err)
}
if batchTx, ok := tx.(*sql.BatchTx); ok {
require.Equal(t, testBlock.Number().String(), batchTx.BlockNumber)
} else if batchTx, ok := tx.(*file.BatchTx); ok {
require.Equal(t, testBlock.Number().String(), batchTx.BlockNumber)
}
require.Equal(t, testBlock.Number().String(), tx.BlockNumber())
}

View File

@ -815,6 +815,8 @@ func (sds *Service) writeStateDiff(block *types.Block, parentRoot common.Hash, p
if params.IncludeReceipts {
receipts = sds.BlockChain.GetReceiptsByHash(block.Hash())
}
t := time.Now()
tx, err = sds.indexer.PushBlock(block, receipts, totalDifficulty)
if err != nil {
return err
@ -840,9 +842,12 @@ func (sds *Service) writeStateDiff(block *types.Block, parentRoot common.Hash, p
BlockHash: block.Hash(),
BlockNumber: block.Number(),
}, params, nodeSink, ipldSink)
if err != nil {
return err
}
// TODO this anti-pattern needs to be sorted out eventually
if err = tx.Submit(err); err != nil {
metrics.IndexerMetrics.StateStoreCodeProcessingTimer.Update(time.Since(t))
if err = tx.Submit(); err != nil {
return fmt.Errorf("batch transaction submission failed: %w", err)
}
return nil

View File

@ -17,6 +17,7 @@
package mocks
import (
context "context"
"math/big"
"time"
@ -84,10 +85,21 @@ func (sdi *StateDiffIndexer) ClearWatchedAddresses() error {
return nil
}
func (sdi *StateDiffIndexer) BeginTx(number *big.Int, ctx context.Context) interfaces.Batch {
return &batch{}
}
func (sdi *StateDiffIndexer) Close() error {
return nil
}
func (tx *batch) Submit(err error) error {
func (tx *batch) RollbackOnFailure(error) {}
func (tx *batch) Submit() error {
return nil
}
// batch.BlockNumber
func (tx *batch) BlockNumber() string {
return "0"
}