refactor batch
This commit is contained in:
parent
12272aa001
commit
633ac6c455
@ -19,26 +19,55 @@ package dump
|
||||
import (
|
||||
"fmt"
|
||||
"io"
|
||||
"math/big"
|
||||
|
||||
"github.com/cerc-io/plugeth-statediff/indexer/ipld"
|
||||
|
||||
"github.com/cerc-io/plugeth-statediff/indexer/models"
|
||||
"github.com/cerc-io/plugeth-statediff/utils/log"
|
||||
)
|
||||
|
||||
// BatchTx wraps a void with the state necessary for building the tx concurrently during trie difference iteration
|
||||
type BatchTx struct {
|
||||
BlockNumber string
|
||||
dump io.Writer
|
||||
quit chan struct{}
|
||||
iplds chan models.IPLDModel
|
||||
ipldCache models.IPLDBatch
|
||||
|
||||
submit func(blockTx *BatchTx, err error) error
|
||||
blockNum string
|
||||
dump io.Writer
|
||||
quit chan struct{}
|
||||
iplds chan models.IPLDModel
|
||||
ipldCache models.IPLDBatch
|
||||
}
|
||||
|
||||
// Submit satisfies indexer.AtomicTx
|
||||
func (tx *BatchTx) Submit(err error) error {
|
||||
return tx.submit(tx, err)
|
||||
func NewBatch(number *big.Int, dest io.Writer) *BatchTx {
|
||||
batch := &BatchTx{
|
||||
blockNum: number.String(),
|
||||
dump: dest,
|
||||
iplds: make(chan models.IPLDModel),
|
||||
quit: make(chan struct{}),
|
||||
ipldCache: models.IPLDBatch{},
|
||||
}
|
||||
go batch.cache()
|
||||
return batch
|
||||
}
|
||||
|
||||
func (self *BatchTx) Submit() error {
|
||||
close(self.quit)
|
||||
close(self.iplds)
|
||||
|
||||
if err := self.flush(); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (tx *BatchTx) BlockNumber() string {
|
||||
return tx.blockNum
|
||||
}
|
||||
|
||||
func (tx *BatchTx) RollbackOnFailure(err error) {
|
||||
if p := recover(); p != nil {
|
||||
log.Info("panic detected before tx submission, but rollback not supported", "panic", p)
|
||||
panic(p)
|
||||
} else if err != nil {
|
||||
log.Info("error detected before tx submission, but rollback not supported", "error", err)
|
||||
}
|
||||
}
|
||||
|
||||
func (tx *BatchTx) flush() error {
|
||||
@ -65,7 +94,7 @@ func (tx *BatchTx) cache() {
|
||||
|
||||
func (tx *BatchTx) cacheDirect(key string, value []byte) {
|
||||
tx.iplds <- models.IPLDModel{
|
||||
BlockNumber: tx.BlockNumber,
|
||||
BlockNumber: tx.BlockNumber(),
|
||||
Key: key,
|
||||
Data: value,
|
||||
}
|
||||
@ -73,7 +102,7 @@ func (tx *BatchTx) cacheDirect(key string, value []byte) {
|
||||
|
||||
func (tx *BatchTx) cacheIPLD(i ipld.IPLD) {
|
||||
tx.iplds <- models.IPLDModel{
|
||||
BlockNumber: tx.BlockNumber,
|
||||
BlockNumber: tx.BlockNumber(),
|
||||
Key: i.Cid().String(),
|
||||
Data: i.RawData(),
|
||||
}
|
||||
|
@ -17,6 +17,7 @@
|
||||
package dump
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/hex"
|
||||
"fmt"
|
||||
"io"
|
||||
@ -37,7 +38,6 @@ import (
|
||||
"github.com/cerc-io/plugeth-statediff/indexer/models"
|
||||
"github.com/cerc-io/plugeth-statediff/indexer/shared"
|
||||
sdtypes "github.com/cerc-io/plugeth-statediff/types"
|
||||
"github.com/cerc-io/plugeth-statediff/utils/log"
|
||||
)
|
||||
|
||||
var _ interfaces.StateDiffIndexer = &StateDiffIndexer{}
|
||||
@ -62,7 +62,7 @@ func (sdi *StateDiffIndexer) ReportDBMetrics(time.Duration, <-chan bool) {}
|
||||
// PushBlock pushes and indexes block data in sql, except state & storage nodes (includes header, uncles, transactions & receipts)
|
||||
// Returns an initiated DB transaction which must be Closed via defer to commit or rollback
|
||||
func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receipts, totalDifficulty *big.Int) (interfaces.Batch, error) {
|
||||
start, t := time.Now(), time.Now()
|
||||
t := time.Now()
|
||||
blockHash := block.Hash()
|
||||
blockHashStr := blockHash.String()
|
||||
height := block.NumberU64()
|
||||
@ -91,40 +91,8 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
|
||||
} else {
|
||||
reward = shared.CalcEthBlockReward(block.Header(), block.Uncles(), block.Transactions(), receipts)
|
||||
}
|
||||
t = time.Now()
|
||||
|
||||
blockTx := &BatchTx{
|
||||
BlockNumber: block.Number().String(),
|
||||
dump: sdi.dump,
|
||||
iplds: make(chan models.IPLDModel),
|
||||
quit: make(chan struct{}),
|
||||
ipldCache: models.IPLDBatch{},
|
||||
submit: func(self *BatchTx, err error) error {
|
||||
close(self.quit)
|
||||
close(self.iplds)
|
||||
tDiff := time.Since(t)
|
||||
metrics.IndexerMetrics.StateStoreCodeProcessingTimer.Update(tDiff)
|
||||
traceMsg += fmt.Sprintf("state, storage, and code storage processing time: %s\r\n", tDiff.String())
|
||||
t = time.Now()
|
||||
if err := self.flush(); err != nil {
|
||||
traceMsg += fmt.Sprintf(" TOTAL PROCESSING DURATION: %s\r\n", time.Since(start).String())
|
||||
log.Debug(traceMsg)
|
||||
return err
|
||||
}
|
||||
tDiff = time.Since(t)
|
||||
metrics.IndexerMetrics.PostgresCommitTimer.Update(tDiff)
|
||||
traceMsg += fmt.Sprintf("postgres transaction commit duration: %s\r\n", tDiff.String())
|
||||
traceMsg += fmt.Sprintf(" TOTAL PROCESSING DURATION: %s\r\n", time.Since(start).String())
|
||||
log.Debug(traceMsg)
|
||||
return err
|
||||
},
|
||||
}
|
||||
go blockTx.cache()
|
||||
|
||||
tDiff := time.Since(t)
|
||||
metrics.IndexerMetrics.FreePostgresTimer.Update(tDiff)
|
||||
|
||||
traceMsg += fmt.Sprintf("time spent waiting for free postgres tx: %s:\r\n", tDiff.String())
|
||||
blockTx := NewBatch(block.Number(), sdi.dump)
|
||||
t = time.Now()
|
||||
|
||||
// Publish and index header, collect headerID
|
||||
@ -133,7 +101,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
tDiff = time.Since(t)
|
||||
tDiff := time.Since(t)
|
||||
metrics.IndexerMetrics.HeaderProcessingTimer.Update(tDiff)
|
||||
traceMsg += fmt.Sprintf("header processing time: %s\r\n", tDiff.String())
|
||||
t = time.Now()
|
||||
@ -352,7 +320,7 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt
|
||||
// short circuit if it is a Removed node
|
||||
// this assumes the db has been initialized and a ipld.blocks entry for the Removed node is present
|
||||
stateModel = models.StateNodeModel{
|
||||
BlockNumber: tx.BlockNumber,
|
||||
BlockNumber: tx.BlockNumber(),
|
||||
HeaderID: headerID,
|
||||
StateKey: common.BytesToHash(stateNode.AccountWrapper.LeafKey).String(),
|
||||
CID: shared.RemovedNodeStateCID,
|
||||
@ -360,7 +328,7 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt
|
||||
}
|
||||
} else {
|
||||
stateModel = models.StateNodeModel{
|
||||
BlockNumber: tx.BlockNumber,
|
||||
BlockNumber: tx.BlockNumber(),
|
||||
HeaderID: headerID,
|
||||
StateKey: common.BytesToHash(stateNode.AccountWrapper.LeafKey).String(),
|
||||
CID: stateNode.AccountWrapper.CID,
|
||||
@ -383,7 +351,7 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt
|
||||
// short circuit if it is a Removed node
|
||||
// this assumes the db has been initialized and a ipld.blocks entry for the Removed node is present
|
||||
storageModel := models.StorageNodeModel{
|
||||
BlockNumber: tx.BlockNumber,
|
||||
BlockNumber: tx.BlockNumber(),
|
||||
HeaderID: headerID,
|
||||
StateKey: common.BytesToHash(stateNode.AccountWrapper.LeafKey).String(),
|
||||
StorageKey: common.BytesToHash(storageNode.LeafKey).String(),
|
||||
@ -396,7 +364,7 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt
|
||||
continue
|
||||
}
|
||||
storageModel := models.StorageNodeModel{
|
||||
BlockNumber: tx.BlockNumber,
|
||||
BlockNumber: tx.BlockNumber(),
|
||||
HeaderID: headerID,
|
||||
StateKey: common.BytesToHash(stateNode.AccountWrapper.LeafKey).String(),
|
||||
StorageKey: common.BytesToHash(storageNode.LeafKey).String(),
|
||||
@ -440,6 +408,10 @@ func (sdi *StateDiffIndexer) DetectGaps(beginBlockNumber uint64, endBlockNumber
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func (sdi *StateDiffIndexer) BeginTx(number *big.Int, _ context.Context) interfaces.Batch {
|
||||
return NewBatch(number, sdi.dump)
|
||||
}
|
||||
|
||||
// Close satisfies io.Closer
|
||||
func (sdi *StateDiffIndexer) Close() error {
|
||||
return sdi.dump.Close()
|
||||
|
@ -16,14 +16,29 @@
|
||||
|
||||
package file
|
||||
|
||||
import "github.com/cerc-io/plugeth-statediff/utils/log"
|
||||
|
||||
// BatchTx wraps a void with the state necessary for building the tx concurrently during trie difference iteration
|
||||
type BatchTx struct {
|
||||
BlockNumber string
|
||||
|
||||
submit func(blockTx *BatchTx, err error) error
|
||||
blockNum string
|
||||
fileWriter FileWriter
|
||||
}
|
||||
|
||||
// Submit satisfies indexer.AtomicTx
|
||||
func (tx *BatchTx) Submit(err error) error {
|
||||
return tx.submit(tx, err)
|
||||
func (tx *BatchTx) Submit() error {
|
||||
tx.fileWriter.Flush()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (tx *BatchTx) BlockNumber() string {
|
||||
return tx.blockNum
|
||||
}
|
||||
|
||||
func (tx *BatchTx) RollbackOnFailure(err error) {
|
||||
if p := recover(); p != nil {
|
||||
log.Info("panic detected before tx submission, but rollback not supported", "panic", p)
|
||||
panic(p)
|
||||
} else if err != nil {
|
||||
log.Info("error detected before tx submission, but rollback not supported", "error", err)
|
||||
}
|
||||
}
|
||||
|
@ -17,6 +17,7 @@
|
||||
package file
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"math/big"
|
||||
@ -130,7 +131,7 @@ func (sdi *StateDiffIndexer) ReportDBMetrics(time.Duration, <-chan bool) {}
|
||||
// PushBlock pushes and indexes block data in sql, except state & storage nodes (includes header, uncles, transactions & receipts)
|
||||
// Returns an initiated DB transaction which must be Closed via defer to commit or rollback
|
||||
func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receipts, totalDifficulty *big.Int) (interfaces.Batch, error) {
|
||||
start, t := time.Now(), time.Now()
|
||||
t := time.Now()
|
||||
blockHash := block.Hash()
|
||||
blockHashStr := blockHash.String()
|
||||
height := block.NumberU64()
|
||||
@ -159,27 +160,11 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
|
||||
} else {
|
||||
reward = shared.CalcEthBlockReward(block.Header(), block.Uncles(), block.Transactions(), receipts)
|
||||
}
|
||||
t = time.Now()
|
||||
|
||||
blockTx := &BatchTx{
|
||||
BlockNumber: block.Number().String(),
|
||||
submit: func(self *BatchTx, err error) error {
|
||||
tDiff := time.Since(t)
|
||||
metrics.IndexerMetrics.StateStoreCodeProcessingTimer.Update(tDiff)
|
||||
traceMsg += fmt.Sprintf("state, storage, and code storage processing time: %s\r\n", tDiff.String())
|
||||
t = time.Now()
|
||||
sdi.fileWriter.Flush()
|
||||
tDiff = time.Since(t)
|
||||
metrics.IndexerMetrics.PostgresCommitTimer.Update(tDiff)
|
||||
traceMsg += fmt.Sprintf("postgres transaction commit duration: %s\r\n", tDiff.String())
|
||||
traceMsg += fmt.Sprintf(" TOTAL PROCESSING DURATION: %s\r\n", time.Since(start).String())
|
||||
log.Trace(traceMsg)
|
||||
return err
|
||||
},
|
||||
blockNum: block.Number().String(),
|
||||
fileWriter: sdi.fileWriter,
|
||||
}
|
||||
tDiff := time.Since(t)
|
||||
metrics.IndexerMetrics.FreePostgresTimer.Update(tDiff)
|
||||
traceMsg += fmt.Sprintf("time spent waiting for free postgres tx: %s:\r\n", tDiff.String())
|
||||
t = time.Now()
|
||||
|
||||
// write header, collect headerID
|
||||
@ -187,7 +172,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
tDiff = time.Since(t)
|
||||
tDiff := time.Since(t)
|
||||
metrics.IndexerMetrics.HeaderProcessingTimer.Update(tDiff)
|
||||
traceMsg += fmt.Sprintf("header processing time: %s\r\n", tDiff.String())
|
||||
t = time.Now()
|
||||
@ -382,20 +367,16 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(args processArgs) error {
|
||||
}
|
||||
|
||||
// PushStateNode writes a state diff node object (including any child storage nodes) IPLD insert SQL stmt to a file
|
||||
func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdtypes.StateLeafNode, headerID string) error {
|
||||
tx, ok := batch.(*BatchTx)
|
||||
if !ok {
|
||||
return fmt.Errorf("file: batch is expected to be of type %T, got %T", &BatchTx{}, batch)
|
||||
}
|
||||
func (sdi *StateDiffIndexer) PushStateNode(tx interfaces.Batch, stateNode sdtypes.StateLeafNode, headerID string) error {
|
||||
// publish the state node
|
||||
var stateModel models.StateNodeModel
|
||||
if stateNode.Removed {
|
||||
if atomic.LoadUint32(&sdi.removedCacheFlag) == 0 {
|
||||
atomic.StoreUint32(&sdi.removedCacheFlag, 1)
|
||||
sdi.fileWriter.upsertIPLDDirect(tx.BlockNumber, shared.RemovedNodeStateCID, []byte{})
|
||||
sdi.fileWriter.upsertIPLDDirect(tx.BlockNumber(), shared.RemovedNodeStateCID, []byte{})
|
||||
}
|
||||
stateModel = models.StateNodeModel{
|
||||
BlockNumber: tx.BlockNumber,
|
||||
BlockNumber: tx.BlockNumber(),
|
||||
HeaderID: headerID,
|
||||
StateKey: common.BytesToHash(stateNode.AccountWrapper.LeafKey).String(),
|
||||
CID: shared.RemovedNodeStateCID,
|
||||
@ -403,7 +384,7 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt
|
||||
}
|
||||
} else {
|
||||
stateModel = models.StateNodeModel{
|
||||
BlockNumber: tx.BlockNumber,
|
||||
BlockNumber: tx.BlockNumber(),
|
||||
HeaderID: headerID,
|
||||
StateKey: common.BytesToHash(stateNode.AccountWrapper.LeafKey).String(),
|
||||
CID: stateNode.AccountWrapper.CID,
|
||||
@ -423,10 +404,10 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt
|
||||
if storageNode.Removed {
|
||||
if atomic.LoadUint32(&sdi.removedCacheFlag) == 0 {
|
||||
atomic.StoreUint32(&sdi.removedCacheFlag, 1)
|
||||
sdi.fileWriter.upsertIPLDDirect(tx.BlockNumber, shared.RemovedNodeStorageCID, []byte{})
|
||||
sdi.fileWriter.upsertIPLDDirect(tx.BlockNumber(), shared.RemovedNodeStorageCID, []byte{})
|
||||
}
|
||||
storageModel := models.StorageNodeModel{
|
||||
BlockNumber: tx.BlockNumber,
|
||||
BlockNumber: tx.BlockNumber(),
|
||||
HeaderID: headerID,
|
||||
StateKey: common.BytesToHash(stateNode.AccountWrapper.LeafKey).String(),
|
||||
StorageKey: common.BytesToHash(storageNode.LeafKey).String(),
|
||||
@ -438,7 +419,7 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt
|
||||
continue
|
||||
}
|
||||
storageModel := models.StorageNodeModel{
|
||||
BlockNumber: tx.BlockNumber,
|
||||
BlockNumber: tx.BlockNumber(),
|
||||
HeaderID: headerID,
|
||||
StateKey: common.BytesToHash(stateNode.AccountWrapper.LeafKey).String(),
|
||||
StorageKey: common.BytesToHash(storageNode.LeafKey).String(),
|
||||
@ -453,12 +434,8 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt
|
||||
}
|
||||
|
||||
// PushIPLD writes iplds to ipld.blocks
|
||||
func (sdi *StateDiffIndexer) PushIPLD(batch interfaces.Batch, ipld sdtypes.IPLD) error {
|
||||
tx, ok := batch.(*BatchTx)
|
||||
if !ok {
|
||||
return fmt.Errorf("file: batch is expected to be of type %T, got %T", &BatchTx{}, batch)
|
||||
}
|
||||
sdi.fileWriter.upsertIPLDDirect(tx.BlockNumber, ipld.CID, ipld.Content)
|
||||
func (sdi *StateDiffIndexer) PushIPLD(tx interfaces.Batch, ipld sdtypes.IPLD) error {
|
||||
sdi.fileWriter.upsertIPLDDirect(tx.BlockNumber(), ipld.CID, ipld.Content)
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -480,6 +457,13 @@ func (sdi *StateDiffIndexer) HasBlock(hash common.Hash, number uint64) (bool, er
|
||||
return false, nil
|
||||
}
|
||||
|
||||
func (sdi *StateDiffIndexer) BeginTx(number *big.Int, _ context.Context) interfaces.Batch {
|
||||
return &BatchTx{
|
||||
blockNum: number.String(),
|
||||
fileWriter: sdi.fileWriter,
|
||||
}
|
||||
}
|
||||
|
||||
// Close satisfies io.Closer
|
||||
func (sdi *StateDiffIndexer) Close() error {
|
||||
return sdi.fileWriter.Close()
|
||||
|
@ -35,7 +35,7 @@ const startingCacheCapacity = 1024 * 24
|
||||
|
||||
// BatchTx wraps a sql tx with the state necessary for building the tx concurrently during trie difference iteration
|
||||
type BatchTx struct {
|
||||
BlockNumber string
|
||||
blockNum string
|
||||
ctx context.Context
|
||||
dbtx Tx
|
||||
stm string
|
||||
@ -44,15 +44,14 @@ type BatchTx struct {
|
||||
ipldCache models.IPLDBatch
|
||||
removedCacheFlag *uint32
|
||||
// Tracks expected cache size and ensures cache is caught up before flush
|
||||
cacheWg sync.WaitGroup
|
||||
startTime time.Time
|
||||
cacheWg sync.WaitGroup
|
||||
}
|
||||
|
||||
func NewBatchAt(insertStm string, ctx context.Context, number *big.Int, tx Tx, start time.Time) *BatchTx {
|
||||
func NewBatch(insertStm string, ctx context.Context, number *big.Int, tx Tx) *BatchTx {
|
||||
blockTx := &BatchTx{
|
||||
removedCacheFlag: new(uint32),
|
||||
ctx: ctx,
|
||||
BlockNumber: number.String(),
|
||||
blockNum: number.String(),
|
||||
stm: insertStm,
|
||||
iplds: make(chan models.IPLDModel),
|
||||
quit: make(chan (chan<- struct{})),
|
||||
@ -61,35 +60,39 @@ func NewBatchAt(insertStm string, ctx context.Context, number *big.Int, tx Tx, s
|
||||
Keys: make([]string, 0, startingCacheCapacity),
|
||||
Values: make([][]byte, 0, startingCacheCapacity),
|
||||
},
|
||||
dbtx: tx,
|
||||
startTime: time.Now(),
|
||||
dbtx: tx,
|
||||
}
|
||||
go blockTx.cache()
|
||||
return blockTx
|
||||
}
|
||||
|
||||
// Submit satisfies indexer.Batch
|
||||
func (tx *BatchTx) Submit(err error) error {
|
||||
func (tx *BatchTx) Submit() error {
|
||||
defer tx.close()
|
||||
|
||||
metrics.IndexerMetrics.StateStoreCodeProcessingTimer.Update(time.Since(tx.startTime))
|
||||
t := time.Now()
|
||||
if err := tx.flush(); err != nil {
|
||||
rollback(tx.ctx, tx.dbtx)
|
||||
return err
|
||||
}
|
||||
err = tx.dbtx.Commit(tx.ctx)
|
||||
err := tx.dbtx.Commit(tx.ctx)
|
||||
metrics.IndexerMetrics.PostgresCommitTimer.Update(time.Since(t))
|
||||
return err
|
||||
}
|
||||
|
||||
func (tx *BatchTx) BlockNumber() string {
|
||||
return tx.blockNum
|
||||
}
|
||||
|
||||
func (tx *BatchTx) RollbackOnFailure(err error) {
|
||||
defer tx.close()
|
||||
|
||||
if p := recover(); p != nil {
|
||||
defer tx.close()
|
||||
log.Info("panic detected before tx submission, rolling back the tx", "panic", p)
|
||||
rollback(tx.ctx, tx.dbtx)
|
||||
panic(p)
|
||||
} else if err != nil {
|
||||
defer tx.close()
|
||||
log.Info("error detected before tx submission, rolling back the tx", "error", err)
|
||||
rollback(tx.ctx, tx.dbtx)
|
||||
}
|
||||
@ -148,7 +151,7 @@ func (tx *BatchTx) cache() {
|
||||
func (tx *BatchTx) cacheDirect(key string, value []byte) {
|
||||
tx.cacheWg.Add(1)
|
||||
tx.iplds <- models.IPLDModel{
|
||||
BlockNumber: tx.BlockNumber,
|
||||
BlockNumber: tx.BlockNumber(),
|
||||
Key: key,
|
||||
Data: value,
|
||||
}
|
||||
@ -157,7 +160,7 @@ func (tx *BatchTx) cacheDirect(key string, value []byte) {
|
||||
func (tx *BatchTx) cacheIPLD(i ipld.IPLD) {
|
||||
tx.cacheWg.Add(1)
|
||||
tx.iplds <- models.IPLDModel{
|
||||
BlockNumber: tx.BlockNumber,
|
||||
BlockNumber: tx.BlockNumber(),
|
||||
Key: i.Cid().String(),
|
||||
Data: i.RawData(),
|
||||
}
|
||||
@ -168,7 +171,7 @@ func (tx *BatchTx) cacheRemoved(key string, value []byte) {
|
||||
atomic.StoreUint32(tx.removedCacheFlag, 1)
|
||||
tx.cacheWg.Add(1)
|
||||
tx.iplds <- models.IPLDModel{
|
||||
BlockNumber: tx.BlockNumber,
|
||||
BlockNumber: tx.BlockNumber(),
|
||||
Key: key,
|
||||
Data: value,
|
||||
}
|
||||
|
@ -39,7 +39,6 @@ import (
|
||||
"github.com/cerc-io/plugeth-statediff/indexer/models"
|
||||
"github.com/cerc-io/plugeth-statediff/indexer/shared"
|
||||
sdtypes "github.com/cerc-io/plugeth-statediff/types"
|
||||
"github.com/cerc-io/plugeth-statediff/utils/log"
|
||||
)
|
||||
|
||||
var _ interfaces.StateDiffIndexer = &StateDiffIndexer{}
|
||||
@ -82,11 +81,12 @@ func (sdi *StateDiffIndexer) ReportDBMetrics(delay time.Duration, quit <-chan bo
|
||||
// PushBlock pushes and indexes block data in sql, except state & storage nodes (includes header, uncles, transactions & receipts)
|
||||
// Returns an initiated DB transaction which must be Closed via defer to commit or rollback
|
||||
func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receipts, totalDifficulty *big.Int) (interfaces.Batch, error) {
|
||||
start, t := time.Now(), time.Now()
|
||||
t := time.Now()
|
||||
blockHash := block.Hash()
|
||||
height := block.NumberU64()
|
||||
traceMsg := fmt.Sprintf("indexer stats for statediff at %d with hash %s:\r\n", height, blockHash)
|
||||
transactions := block.Transactions()
|
||||
var err error
|
||||
|
||||
// Derive any missing fields
|
||||
if err := receipts.DeriveFields(sdi.chainConfig, blockHash, height, block.BaseFee(), transactions); err != nil {
|
||||
return nil, err
|
||||
@ -95,11 +95,12 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
|
||||
// Generate the block iplds
|
||||
txNodes, rctNodes, logNodes, err := ipld.FromBlockAndReceipts(block, receipts)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error creating IPLD nodes from block and receipts: %v", err)
|
||||
return nil, fmt.Errorf("error creating IPLD nodes from block and receipts: %w", err)
|
||||
}
|
||||
|
||||
if len(txNodes) != len(rctNodes) {
|
||||
return nil, fmt.Errorf("expected number of transactions (%d), receipts (%d)", len(txNodes), len(rctNodes))
|
||||
return nil, fmt.Errorf("expected number of transactions (%d) does not match number of receipts (%d)",
|
||||
len(txNodes), len(rctNodes))
|
||||
}
|
||||
|
||||
// Calculate reward
|
||||
@ -108,98 +109,35 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
|
||||
if sdi.chainConfig.Clique != nil {
|
||||
reward = big.NewInt(0)
|
||||
} else {
|
||||
reward = shared.CalcEthBlockReward(block.Header(), block.Uncles(), block.Transactions(), receipts)
|
||||
reward = shared.CalcEthBlockReward(block.Header(), block.Uncles(), transactions, receipts)
|
||||
}
|
||||
t = time.Now()
|
||||
|
||||
// Begin new DB tx for everything
|
||||
tx := NewDelayedTx(sdi.dbWriter.db)
|
||||
defer func() {
|
||||
if p := recover(); p != nil {
|
||||
rollback(sdi.ctx, tx)
|
||||
panic(p)
|
||||
} else if err != nil {
|
||||
rollback(sdi.ctx, tx)
|
||||
}
|
||||
}()
|
||||
blockTx := &BatchTx{
|
||||
removedCacheFlag: new(uint32),
|
||||
ctx: sdi.ctx,
|
||||
BlockNumber: block.Number().String(),
|
||||
stm: sdi.dbWriter.db.InsertIPLDsStm(),
|
||||
iplds: make(chan models.IPLDModel),
|
||||
quit: make(chan (chan<- struct{})),
|
||||
ipldCache: models.IPLDBatch{
|
||||
BlockNumbers: make([]string, 0, startingCacheCapacity),
|
||||
Keys: make([]string, 0, startingCacheCapacity),
|
||||
Values: make([][]byte, 0, startingCacheCapacity),
|
||||
},
|
||||
dbtx: tx,
|
||||
// handle transaction commit or rollback for any return case
|
||||
submit: func(self *BatchTx, err error) error {
|
||||
defer func() {
|
||||
confirm := make(chan struct{})
|
||||
self.quit <- confirm
|
||||
close(self.quit)
|
||||
<-confirm
|
||||
close(self.iplds)
|
||||
}()
|
||||
if p := recover(); p != nil {
|
||||
log.Info("panic detected before tx submission, rolling back the tx", "panic", p)
|
||||
rollback(sdi.ctx, tx)
|
||||
panic(p)
|
||||
} else if err != nil {
|
||||
log.Info("error detected before tx submission, rolling back the tx", "error", err)
|
||||
rollback(sdi.ctx, tx)
|
||||
} else {
|
||||
tDiff := time.Since(t)
|
||||
metrics2.IndexerMetrics.StateStoreCodeProcessingTimer.Update(tDiff)
|
||||
traceMsg += fmt.Sprintf("state, storage, and code storage processing time: %s\r\n", tDiff)
|
||||
t = time.Now()
|
||||
if err := self.flush(); err != nil {
|
||||
rollback(sdi.ctx, tx)
|
||||
traceMsg += fmt.Sprintf(" TOTAL PROCESSING DURATION: %s\r\n", time.Since(start))
|
||||
log.Debug(traceMsg)
|
||||
return err
|
||||
}
|
||||
err = tx.Commit(sdi.ctx)
|
||||
tDiff = time.Since(t)
|
||||
metrics2.IndexerMetrics.PostgresCommitTimer.Update(tDiff)
|
||||
traceMsg += fmt.Sprintf("postgres transaction commit duration: %s\r\n", tDiff)
|
||||
}
|
||||
traceMsg += fmt.Sprintf(" TOTAL PROCESSING DURATION: %s\r\n", time.Since(start))
|
||||
log.Debug(traceMsg)
|
||||
return err
|
||||
},
|
||||
}
|
||||
go blockTx.cache()
|
||||
|
||||
tDiff := time.Since(t)
|
||||
metrics2.IndexerMetrics.FreePostgresTimer.Update(tDiff)
|
||||
|
||||
traceMsg += fmt.Sprintf("time spent waiting for free postgres tx: %s:\r\n", tDiff)
|
||||
t = time.Now()
|
||||
batch := NewBatch(
|
||||
sdi.dbWriter.db.InsertIPLDsStm(), sdi.ctx,
|
||||
block.Number(),
|
||||
NewDelayedTx(sdi.dbWriter.db),
|
||||
)
|
||||
// handle transaction rollback for failures in this scope
|
||||
defer batch.RollbackOnFailure(err)
|
||||
|
||||
// Publish and index header, collect headerID
|
||||
headerID, err := sdi.PushHeader(blockTx, block.Header(), reward, totalDifficulty)
|
||||
headerID, err := sdi.PushHeader(batch, block.Header(), reward, totalDifficulty)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
tDiff = time.Since(t)
|
||||
metrics2.IndexerMetrics.HeaderProcessingTimer.Update(tDiff)
|
||||
traceMsg += fmt.Sprintf("header processing time: %s\r\n", tDiff)
|
||||
metrics2.IndexerMetrics.HeaderProcessingTimer.Update(time.Since(t))
|
||||
t = time.Now()
|
||||
// Publish and index uncles
|
||||
err = sdi.processUncles(blockTx, headerID, block.Number(), block.UncleHash(), block.Uncles())
|
||||
err = sdi.processUncles(batch, headerID, block.Number(), block.UncleHash(), block.Uncles())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
tDiff = time.Since(t)
|
||||
metrics2.IndexerMetrics.UncleProcessingTimer.Update(tDiff)
|
||||
traceMsg += fmt.Sprintf("uncle processing time: %s\r\n", tDiff)
|
||||
metrics2.IndexerMetrics.UncleProcessingTimer.Update(time.Since(t))
|
||||
t = time.Now()
|
||||
// Publish and index receipts and txs
|
||||
err = sdi.processReceiptsAndTxs(blockTx, processArgs{
|
||||
err = sdi.processReceiptsAndTxs(batch, processArgs{
|
||||
headerID: headerID,
|
||||
blockNumber: block.Number(),
|
||||
receipts: receipts,
|
||||
@ -211,12 +149,9 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
tDiff = time.Since(t)
|
||||
metrics2.IndexerMetrics.TxAndRecProcessingTimer.Update(tDiff)
|
||||
traceMsg += fmt.Sprintf("tx and receipt processing time: %s\r\n", tDiff)
|
||||
t = time.Now()
|
||||
metrics2.IndexerMetrics.TxAndRecProcessingTimer.Update(time.Since(t))
|
||||
|
||||
return blockTx, err
|
||||
return batch, err
|
||||
}
|
||||
|
||||
// CurrentBlock returns the HeaderModel of the highest existing block in the database.
|
||||
@ -414,7 +349,7 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt
|
||||
if stateNode.Removed {
|
||||
tx.cacheRemoved(shared.RemovedNodeStateCID, []byte{})
|
||||
stateModel = models.StateNodeModel{
|
||||
BlockNumber: tx.BlockNumber,
|
||||
BlockNumber: tx.BlockNumber(),
|
||||
HeaderID: headerID,
|
||||
StateKey: common.BytesToHash(stateNode.AccountWrapper.LeafKey).String(),
|
||||
CID: shared.RemovedNodeStateCID,
|
||||
@ -422,7 +357,7 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt
|
||||
}
|
||||
} else {
|
||||
stateModel = models.StateNodeModel{
|
||||
BlockNumber: tx.BlockNumber,
|
||||
BlockNumber: tx.BlockNumber(),
|
||||
HeaderID: headerID,
|
||||
StateKey: common.BytesToHash(stateNode.AccountWrapper.LeafKey).String(),
|
||||
CID: stateNode.AccountWrapper.CID,
|
||||
@ -444,7 +379,7 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt
|
||||
if storageNode.Removed {
|
||||
tx.cacheRemoved(shared.RemovedNodeStorageCID, []byte{})
|
||||
storageModel := models.StorageNodeModel{
|
||||
BlockNumber: tx.BlockNumber,
|
||||
BlockNumber: tx.BlockNumber(),
|
||||
HeaderID: headerID,
|
||||
StateKey: common.BytesToHash(stateNode.AccountWrapper.LeafKey).String(),
|
||||
StorageKey: common.BytesToHash(storageNode.LeafKey).String(),
|
||||
@ -458,7 +393,7 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt
|
||||
continue
|
||||
}
|
||||
storageModel := models.StorageNodeModel{
|
||||
BlockNumber: tx.BlockNumber,
|
||||
BlockNumber: tx.BlockNumber(),
|
||||
HeaderID: headerID,
|
||||
StateKey: common.BytesToHash(stateNode.AccountWrapper.LeafKey).String(),
|
||||
StorageKey: common.BytesToHash(storageNode.LeafKey).String(),
|
||||
@ -489,6 +424,15 @@ func (sdi *StateDiffIndexer) HasBlock(hash common.Hash, number uint64) (bool, er
|
||||
return sdi.dbWriter.hasHeader(hash, number)
|
||||
}
|
||||
|
||||
func (sdi *StateDiffIndexer) BeginTx(number *big.Int, ctx context.Context) interfaces.Batch {
|
||||
return NewBatch(
|
||||
sdi.dbWriter.db.InsertIPLDsStm(),
|
||||
ctx,
|
||||
number,
|
||||
NewDelayedTx(sdi.dbWriter.db),
|
||||
)
|
||||
}
|
||||
|
||||
// Close satisfies io.Closer
|
||||
func (sdi *StateDiffIndexer) Close() error {
|
||||
return sdi.dbWriter.Close()
|
||||
|
@ -17,6 +17,7 @@
|
||||
package interfaces
|
||||
|
||||
import (
|
||||
"context"
|
||||
"math/big"
|
||||
"time"
|
||||
|
||||
@ -39,6 +40,8 @@ type StateDiffIndexer interface {
|
||||
PushIPLD(tx Batch, ipld sdtypes.IPLD) error
|
||||
ReportDBMetrics(delay time.Duration, quit <-chan bool)
|
||||
|
||||
BeginTx(number *big.Int, ctx context.Context) Batch
|
||||
|
||||
// Methods used by WatchAddress API/functionality
|
||||
LoadWatchedAddresses() ([]common.Address, error)
|
||||
InsertWatchedAddresses(addresses []sdtypes.WatchAddressArg, currentBlock *big.Int) error
|
||||
@ -51,7 +54,9 @@ type StateDiffIndexer interface {
|
||||
|
||||
// Batch required for indexing data atomically
|
||||
type Batch interface {
|
||||
Submit(err error) error
|
||||
Submit() error
|
||||
BlockNumber() string
|
||||
RollbackOnFailure(error)
|
||||
}
|
||||
|
||||
// Config used to configure different underlying implementations
|
||||
|
@ -28,7 +28,6 @@ import (
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/cerc-io/plugeth-statediff/indexer/database/file"
|
||||
"github.com/cerc-io/plugeth-statediff/indexer/database/sql"
|
||||
"github.com/cerc-io/plugeth-statediff/indexer/interfaces"
|
||||
"github.com/cerc-io/plugeth-statediff/indexer/mocks"
|
||||
@ -48,7 +47,7 @@ func SetupTestData(t *testing.T, ind interfaces.StateDiffIndexer) {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer func() {
|
||||
if err := tx.Submit(err); err != nil {
|
||||
if err := tx.Submit(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}()
|
||||
@ -61,11 +60,7 @@ func SetupTestData(t *testing.T, ind interfaces.StateDiffIndexer) {
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
if batchTx, ok := tx.(*sql.BatchTx); ok {
|
||||
require.Equal(t, mocks.BlockNumber.String(), batchTx.BlockNumber)
|
||||
} else if batchTx, ok := tx.(*file.BatchTx); ok {
|
||||
require.Equal(t, mocks.BlockNumber.String(), batchTx.BlockNumber)
|
||||
}
|
||||
require.Equal(t, mocks.BlockNumber.String(), tx.BlockNumber())
|
||||
}
|
||||
|
||||
func DoTestPublishAndIndexHeaderIPLDs(t *testing.T, db sql.Database) {
|
||||
@ -547,13 +542,9 @@ func SetupTestDataNonCanonical(t *testing.T, ind interfaces.StateDiffIndexer) {
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
if batchTx, ok := tx1.(*sql.BatchTx); ok {
|
||||
require.Equal(t, mocks.BlockNumber.String(), batchTx.BlockNumber)
|
||||
} else if batchTx, ok := tx1.(*file.BatchTx); ok {
|
||||
require.Equal(t, mocks.BlockNumber.String(), batchTx.BlockNumber)
|
||||
}
|
||||
require.Equal(t, mocks.BlockNumber.String(), tx1.BlockNumber())
|
||||
|
||||
if err := tx1.Submit(err); err != nil {
|
||||
if err := tx1.Submit(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
@ -572,13 +563,9 @@ func SetupTestDataNonCanonical(t *testing.T, ind interfaces.StateDiffIndexer) {
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
if tx, ok := tx2.(*sql.BatchTx); ok {
|
||||
require.Equal(t, mocks.BlockNumber.String(), tx.BlockNumber)
|
||||
} else if tx, ok := tx2.(*sql.BatchTx); ok {
|
||||
require.Equal(t, mocks.BlockNumber.String(), tx.BlockNumber)
|
||||
}
|
||||
require.Equal(t, mocks.BlockNumber.String(), tx2.BlockNumber())
|
||||
|
||||
if err := tx2.Submit(err); err != nil {
|
||||
if err := tx2.Submit(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
@ -597,13 +584,9 @@ func SetupTestDataNonCanonical(t *testing.T, ind interfaces.StateDiffIndexer) {
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
if batchTx, ok := tx3.(*sql.BatchTx); ok {
|
||||
require.Equal(t, mocks.Block2Number.String(), batchTx.BlockNumber)
|
||||
} else if batchTx, ok := tx3.(*file.BatchTx); ok {
|
||||
require.Equal(t, mocks.Block2Number.String(), batchTx.BlockNumber)
|
||||
}
|
||||
require.Equal(t, mocks.Block2Number.String(), tx3.BlockNumber())
|
||||
|
||||
if err := tx3.Submit(err); err != nil {
|
||||
if err := tx3.Submit(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
@ -20,7 +20,6 @@ import (
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
"github.com/cerc-io/plugeth-statediff/indexer/database/file"
|
||||
"github.com/cerc-io/plugeth-statediff/indexer/database/sql"
|
||||
"github.com/cerc-io/plugeth-statediff/indexer/interfaces"
|
||||
"github.com/cerc-io/plugeth-statediff/indexer/ipld"
|
||||
@ -51,7 +50,7 @@ func SetupLegacyTestData(t *testing.T, ind interfaces.StateDiffIndexer) {
|
||||
require.NoError(t, err)
|
||||
|
||||
defer func() {
|
||||
if err := tx.Submit(err); err != nil {
|
||||
if err := tx.Submit(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}()
|
||||
@ -60,11 +59,7 @@ func SetupLegacyTestData(t *testing.T, ind interfaces.StateDiffIndexer) {
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
if batchTx, ok := tx.(*sql.BatchTx); ok {
|
||||
require.Equal(t, legacyData.BlockNumber.String(), batchTx.BlockNumber)
|
||||
} else if batchTx, ok := tx.(*file.BatchTx); ok {
|
||||
require.Equal(t, legacyData.BlockNumber.String(), batchTx.BlockNumber)
|
||||
}
|
||||
require.Equal(t, legacyData.BlockNumber.String(), tx.BlockNumber())
|
||||
}
|
||||
|
||||
func TestLegacyIndexer(t *testing.T, db sql.Database) {
|
||||
|
@ -19,8 +19,6 @@ package test
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/cerc-io/plugeth-statediff/indexer/database/file"
|
||||
"github.com/cerc-io/plugeth-statediff/indexer/database/sql"
|
||||
"github.com/cerc-io/plugeth-statediff/indexer/interfaces"
|
||||
"github.com/cerc-io/plugeth-statediff/indexer/mocks"
|
||||
"github.com/ethereum/go-ethereum/core/types"
|
||||
@ -36,7 +34,7 @@ func TestBlock(t *testing.T, ind interfaces.StateDiffIndexer, testBlock *types.B
|
||||
require.NoError(t, err)
|
||||
|
||||
defer func() {
|
||||
if err := tx.Submit(err); err != nil {
|
||||
if err := tx.Submit(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}()
|
||||
@ -45,9 +43,5 @@ func TestBlock(t *testing.T, ind interfaces.StateDiffIndexer, testBlock *types.B
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
if batchTx, ok := tx.(*sql.BatchTx); ok {
|
||||
require.Equal(t, testBlock.Number().String(), batchTx.BlockNumber)
|
||||
} else if batchTx, ok := tx.(*file.BatchTx); ok {
|
||||
require.Equal(t, testBlock.Number().String(), batchTx.BlockNumber)
|
||||
}
|
||||
require.Equal(t, testBlock.Number().String(), tx.BlockNumber())
|
||||
}
|
||||
|
@ -815,6 +815,8 @@ func (sds *Service) writeStateDiff(block *types.Block, parentRoot common.Hash, p
|
||||
if params.IncludeReceipts {
|
||||
receipts = sds.BlockChain.GetReceiptsByHash(block.Hash())
|
||||
}
|
||||
|
||||
t := time.Now()
|
||||
tx, err = sds.indexer.PushBlock(block, receipts, totalDifficulty)
|
||||
if err != nil {
|
||||
return err
|
||||
@ -840,9 +842,12 @@ func (sds *Service) writeStateDiff(block *types.Block, parentRoot common.Hash, p
|
||||
BlockHash: block.Hash(),
|
||||
BlockNumber: block.Number(),
|
||||
}, params, nodeSink, ipldSink)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// TODO this anti-pattern needs to be sorted out eventually
|
||||
if err = tx.Submit(err); err != nil {
|
||||
metrics.IndexerMetrics.StateStoreCodeProcessingTimer.Update(time.Since(t))
|
||||
if err = tx.Submit(); err != nil {
|
||||
return fmt.Errorf("batch transaction submission failed: %w", err)
|
||||
}
|
||||
return nil
|
||||
|
@ -17,6 +17,7 @@
|
||||
package mocks
|
||||
|
||||
import (
|
||||
context "context"
|
||||
"math/big"
|
||||
"time"
|
||||
|
||||
@ -84,10 +85,21 @@ func (sdi *StateDiffIndexer) ClearWatchedAddresses() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (sdi *StateDiffIndexer) BeginTx(number *big.Int, ctx context.Context) interfaces.Batch {
|
||||
return &batch{}
|
||||
}
|
||||
|
||||
func (sdi *StateDiffIndexer) Close() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (tx *batch) Submit(err error) error {
|
||||
func (tx *batch) RollbackOnFailure(error) {}
|
||||
|
||||
func (tx *batch) Submit() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// batch.BlockNumber
|
||||
func (tx *batch) BlockNumber() string {
|
||||
return "0"
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user