From 633ac6c455b859b351428d721aa0ff4d85049e09 Mon Sep 17 00:00:00 2001 From: Roy Crihfield Date: Sun, 6 Aug 2023 13:29:19 +0800 Subject: [PATCH] refactor batch --- indexer/database/dump/batch_tx.go | 55 +++++++++---- indexer/database/dump/indexer.go | 52 +++---------- indexer/database/file/batch_tx.go | 25 ++++-- indexer/database/file/indexer.go | 58 +++++--------- indexer/database/sql/batch_tx.go | 31 ++++---- indexer/database/sql/indexer.go | 124 ++++++++---------------------- indexer/interfaces/interfaces.go | 7 +- indexer/test/test.go | 33 ++------ indexer/test/test_legacy.go | 9 +-- indexer/test/test_mainnet.go | 10 +-- service.go | 9 ++- test_helpers/mocks/indexer.go | 14 +++- 12 files changed, 184 insertions(+), 243 deletions(-) diff --git a/indexer/database/dump/batch_tx.go b/indexer/database/dump/batch_tx.go index 1923622..a36d7ce 100644 --- a/indexer/database/dump/batch_tx.go +++ b/indexer/database/dump/batch_tx.go @@ -19,26 +19,55 @@ package dump import ( "fmt" "io" + "math/big" "github.com/cerc-io/plugeth-statediff/indexer/ipld" - "github.com/cerc-io/plugeth-statediff/indexer/models" + "github.com/cerc-io/plugeth-statediff/utils/log" ) // BatchTx wraps a void with the state necessary for building the tx concurrently during trie difference iteration type BatchTx struct { - BlockNumber string - dump io.Writer - quit chan struct{} - iplds chan models.IPLDModel - ipldCache models.IPLDBatch - - submit func(blockTx *BatchTx, err error) error + blockNum string + dump io.Writer + quit chan struct{} + iplds chan models.IPLDModel + ipldCache models.IPLDBatch } -// Submit satisfies indexer.AtomicTx -func (tx *BatchTx) Submit(err error) error { - return tx.submit(tx, err) +func NewBatch(number *big.Int, dest io.Writer) *BatchTx { + batch := &BatchTx{ + blockNum: number.String(), + dump: dest, + iplds: make(chan models.IPLDModel), + quit: make(chan struct{}), + ipldCache: models.IPLDBatch{}, + } + go batch.cache() + return batch +} + +func (self *BatchTx) Submit() error { + close(self.quit) + close(self.iplds) + + if err := self.flush(); err != nil { + return err + } + return nil +} + +func (tx *BatchTx) BlockNumber() string { + return tx.blockNum +} + +func (tx *BatchTx) RollbackOnFailure(err error) { + if p := recover(); p != nil { + log.Info("panic detected before tx submission, but rollback not supported", "panic", p) + panic(p) + } else if err != nil { + log.Info("error detected before tx submission, but rollback not supported", "error", err) + } } func (tx *BatchTx) flush() error { @@ -65,7 +94,7 @@ func (tx *BatchTx) cache() { func (tx *BatchTx) cacheDirect(key string, value []byte) { tx.iplds <- models.IPLDModel{ - BlockNumber: tx.BlockNumber, + BlockNumber: tx.BlockNumber(), Key: key, Data: value, } @@ -73,7 +102,7 @@ func (tx *BatchTx) cacheDirect(key string, value []byte) { func (tx *BatchTx) cacheIPLD(i ipld.IPLD) { tx.iplds <- models.IPLDModel{ - BlockNumber: tx.BlockNumber, + BlockNumber: tx.BlockNumber(), Key: i.Cid().String(), Data: i.RawData(), } diff --git a/indexer/database/dump/indexer.go b/indexer/database/dump/indexer.go index 3c9f764..7307989 100644 --- a/indexer/database/dump/indexer.go +++ b/indexer/database/dump/indexer.go @@ -17,6 +17,7 @@ package dump import ( + "context" "encoding/hex" "fmt" "io" @@ -37,7 +38,6 @@ import ( "github.com/cerc-io/plugeth-statediff/indexer/models" "github.com/cerc-io/plugeth-statediff/indexer/shared" sdtypes "github.com/cerc-io/plugeth-statediff/types" - "github.com/cerc-io/plugeth-statediff/utils/log" ) var _ interfaces.StateDiffIndexer = &StateDiffIndexer{} @@ -62,7 +62,7 @@ func (sdi *StateDiffIndexer) ReportDBMetrics(time.Duration, <-chan bool) {} // PushBlock pushes and indexes block data in sql, except state & storage nodes (includes header, uncles, transactions & receipts) // Returns an initiated DB transaction which must be Closed via defer to commit or rollback func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receipts, totalDifficulty *big.Int) (interfaces.Batch, error) { - start, t := time.Now(), time.Now() + t := time.Now() blockHash := block.Hash() blockHashStr := blockHash.String() height := block.NumberU64() @@ -91,40 +91,8 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip } else { reward = shared.CalcEthBlockReward(block.Header(), block.Uncles(), block.Transactions(), receipts) } - t = time.Now() - blockTx := &BatchTx{ - BlockNumber: block.Number().String(), - dump: sdi.dump, - iplds: make(chan models.IPLDModel), - quit: make(chan struct{}), - ipldCache: models.IPLDBatch{}, - submit: func(self *BatchTx, err error) error { - close(self.quit) - close(self.iplds) - tDiff := time.Since(t) - metrics.IndexerMetrics.StateStoreCodeProcessingTimer.Update(tDiff) - traceMsg += fmt.Sprintf("state, storage, and code storage processing time: %s\r\n", tDiff.String()) - t = time.Now() - if err := self.flush(); err != nil { - traceMsg += fmt.Sprintf(" TOTAL PROCESSING DURATION: %s\r\n", time.Since(start).String()) - log.Debug(traceMsg) - return err - } - tDiff = time.Since(t) - metrics.IndexerMetrics.PostgresCommitTimer.Update(tDiff) - traceMsg += fmt.Sprintf("postgres transaction commit duration: %s\r\n", tDiff.String()) - traceMsg += fmt.Sprintf(" TOTAL PROCESSING DURATION: %s\r\n", time.Since(start).String()) - log.Debug(traceMsg) - return err - }, - } - go blockTx.cache() - - tDiff := time.Since(t) - metrics.IndexerMetrics.FreePostgresTimer.Update(tDiff) - - traceMsg += fmt.Sprintf("time spent waiting for free postgres tx: %s:\r\n", tDiff.String()) + blockTx := NewBatch(block.Number(), sdi.dump) t = time.Now() // Publish and index header, collect headerID @@ -133,7 +101,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip if err != nil { return nil, err } - tDiff = time.Since(t) + tDiff := time.Since(t) metrics.IndexerMetrics.HeaderProcessingTimer.Update(tDiff) traceMsg += fmt.Sprintf("header processing time: %s\r\n", tDiff.String()) t = time.Now() @@ -352,7 +320,7 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt // short circuit if it is a Removed node // this assumes the db has been initialized and a ipld.blocks entry for the Removed node is present stateModel = models.StateNodeModel{ - BlockNumber: tx.BlockNumber, + BlockNumber: tx.BlockNumber(), HeaderID: headerID, StateKey: common.BytesToHash(stateNode.AccountWrapper.LeafKey).String(), CID: shared.RemovedNodeStateCID, @@ -360,7 +328,7 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt } } else { stateModel = models.StateNodeModel{ - BlockNumber: tx.BlockNumber, + BlockNumber: tx.BlockNumber(), HeaderID: headerID, StateKey: common.BytesToHash(stateNode.AccountWrapper.LeafKey).String(), CID: stateNode.AccountWrapper.CID, @@ -383,7 +351,7 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt // short circuit if it is a Removed node // this assumes the db has been initialized and a ipld.blocks entry for the Removed node is present storageModel := models.StorageNodeModel{ - BlockNumber: tx.BlockNumber, + BlockNumber: tx.BlockNumber(), HeaderID: headerID, StateKey: common.BytesToHash(stateNode.AccountWrapper.LeafKey).String(), StorageKey: common.BytesToHash(storageNode.LeafKey).String(), @@ -396,7 +364,7 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt continue } storageModel := models.StorageNodeModel{ - BlockNumber: tx.BlockNumber, + BlockNumber: tx.BlockNumber(), HeaderID: headerID, StateKey: common.BytesToHash(stateNode.AccountWrapper.LeafKey).String(), StorageKey: common.BytesToHash(storageNode.LeafKey).String(), @@ -440,6 +408,10 @@ func (sdi *StateDiffIndexer) DetectGaps(beginBlockNumber uint64, endBlockNumber return nil, nil } +func (sdi *StateDiffIndexer) BeginTx(number *big.Int, _ context.Context) interfaces.Batch { + return NewBatch(number, sdi.dump) +} + // Close satisfies io.Closer func (sdi *StateDiffIndexer) Close() error { return sdi.dump.Close() diff --git a/indexer/database/file/batch_tx.go b/indexer/database/file/batch_tx.go index d38bd12..3096204 100644 --- a/indexer/database/file/batch_tx.go +++ b/indexer/database/file/batch_tx.go @@ -16,14 +16,29 @@ package file +import "github.com/cerc-io/plugeth-statediff/utils/log" + // BatchTx wraps a void with the state necessary for building the tx concurrently during trie difference iteration type BatchTx struct { - BlockNumber string - - submit func(blockTx *BatchTx, err error) error + blockNum string + fileWriter FileWriter } // Submit satisfies indexer.AtomicTx -func (tx *BatchTx) Submit(err error) error { - return tx.submit(tx, err) +func (tx *BatchTx) Submit() error { + tx.fileWriter.Flush() + return nil +} + +func (tx *BatchTx) BlockNumber() string { + return tx.blockNum +} + +func (tx *BatchTx) RollbackOnFailure(err error) { + if p := recover(); p != nil { + log.Info("panic detected before tx submission, but rollback not supported", "panic", p) + panic(p) + } else if err != nil { + log.Info("error detected before tx submission, but rollback not supported", "error", err) + } } diff --git a/indexer/database/file/indexer.go b/indexer/database/file/indexer.go index 65b5488..b213f91 100644 --- a/indexer/database/file/indexer.go +++ b/indexer/database/file/indexer.go @@ -17,6 +17,7 @@ package file import ( + "context" "errors" "fmt" "math/big" @@ -130,7 +131,7 @@ func (sdi *StateDiffIndexer) ReportDBMetrics(time.Duration, <-chan bool) {} // PushBlock pushes and indexes block data in sql, except state & storage nodes (includes header, uncles, transactions & receipts) // Returns an initiated DB transaction which must be Closed via defer to commit or rollback func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receipts, totalDifficulty *big.Int) (interfaces.Batch, error) { - start, t := time.Now(), time.Now() + t := time.Now() blockHash := block.Hash() blockHashStr := blockHash.String() height := block.NumberU64() @@ -159,27 +160,11 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip } else { reward = shared.CalcEthBlockReward(block.Header(), block.Uncles(), block.Transactions(), receipts) } - t = time.Now() blockTx := &BatchTx{ - BlockNumber: block.Number().String(), - submit: func(self *BatchTx, err error) error { - tDiff := time.Since(t) - metrics.IndexerMetrics.StateStoreCodeProcessingTimer.Update(tDiff) - traceMsg += fmt.Sprintf("state, storage, and code storage processing time: %s\r\n", tDiff.String()) - t = time.Now() - sdi.fileWriter.Flush() - tDiff = time.Since(t) - metrics.IndexerMetrics.PostgresCommitTimer.Update(tDiff) - traceMsg += fmt.Sprintf("postgres transaction commit duration: %s\r\n", tDiff.String()) - traceMsg += fmt.Sprintf(" TOTAL PROCESSING DURATION: %s\r\n", time.Since(start).String()) - log.Trace(traceMsg) - return err - }, + blockNum: block.Number().String(), + fileWriter: sdi.fileWriter, } - tDiff := time.Since(t) - metrics.IndexerMetrics.FreePostgresTimer.Update(tDiff) - traceMsg += fmt.Sprintf("time spent waiting for free postgres tx: %s:\r\n", tDiff.String()) t = time.Now() // write header, collect headerID @@ -187,7 +172,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip if err != nil { return nil, err } - tDiff = time.Since(t) + tDiff := time.Since(t) metrics.IndexerMetrics.HeaderProcessingTimer.Update(tDiff) traceMsg += fmt.Sprintf("header processing time: %s\r\n", tDiff.String()) t = time.Now() @@ -382,20 +367,16 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(args processArgs) error { } // PushStateNode writes a state diff node object (including any child storage nodes) IPLD insert SQL stmt to a file -func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdtypes.StateLeafNode, headerID string) error { - tx, ok := batch.(*BatchTx) - if !ok { - return fmt.Errorf("file: batch is expected to be of type %T, got %T", &BatchTx{}, batch) - } +func (sdi *StateDiffIndexer) PushStateNode(tx interfaces.Batch, stateNode sdtypes.StateLeafNode, headerID string) error { // publish the state node var stateModel models.StateNodeModel if stateNode.Removed { if atomic.LoadUint32(&sdi.removedCacheFlag) == 0 { atomic.StoreUint32(&sdi.removedCacheFlag, 1) - sdi.fileWriter.upsertIPLDDirect(tx.BlockNumber, shared.RemovedNodeStateCID, []byte{}) + sdi.fileWriter.upsertIPLDDirect(tx.BlockNumber(), shared.RemovedNodeStateCID, []byte{}) } stateModel = models.StateNodeModel{ - BlockNumber: tx.BlockNumber, + BlockNumber: tx.BlockNumber(), HeaderID: headerID, StateKey: common.BytesToHash(stateNode.AccountWrapper.LeafKey).String(), CID: shared.RemovedNodeStateCID, @@ -403,7 +384,7 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt } } else { stateModel = models.StateNodeModel{ - BlockNumber: tx.BlockNumber, + BlockNumber: tx.BlockNumber(), HeaderID: headerID, StateKey: common.BytesToHash(stateNode.AccountWrapper.LeafKey).String(), CID: stateNode.AccountWrapper.CID, @@ -423,10 +404,10 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt if storageNode.Removed { if atomic.LoadUint32(&sdi.removedCacheFlag) == 0 { atomic.StoreUint32(&sdi.removedCacheFlag, 1) - sdi.fileWriter.upsertIPLDDirect(tx.BlockNumber, shared.RemovedNodeStorageCID, []byte{}) + sdi.fileWriter.upsertIPLDDirect(tx.BlockNumber(), shared.RemovedNodeStorageCID, []byte{}) } storageModel := models.StorageNodeModel{ - BlockNumber: tx.BlockNumber, + BlockNumber: tx.BlockNumber(), HeaderID: headerID, StateKey: common.BytesToHash(stateNode.AccountWrapper.LeafKey).String(), StorageKey: common.BytesToHash(storageNode.LeafKey).String(), @@ -438,7 +419,7 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt continue } storageModel := models.StorageNodeModel{ - BlockNumber: tx.BlockNumber, + BlockNumber: tx.BlockNumber(), HeaderID: headerID, StateKey: common.BytesToHash(stateNode.AccountWrapper.LeafKey).String(), StorageKey: common.BytesToHash(storageNode.LeafKey).String(), @@ -453,12 +434,8 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt } // PushIPLD writes iplds to ipld.blocks -func (sdi *StateDiffIndexer) PushIPLD(batch interfaces.Batch, ipld sdtypes.IPLD) error { - tx, ok := batch.(*BatchTx) - if !ok { - return fmt.Errorf("file: batch is expected to be of type %T, got %T", &BatchTx{}, batch) - } - sdi.fileWriter.upsertIPLDDirect(tx.BlockNumber, ipld.CID, ipld.Content) +func (sdi *StateDiffIndexer) PushIPLD(tx interfaces.Batch, ipld sdtypes.IPLD) error { + sdi.fileWriter.upsertIPLDDirect(tx.BlockNumber(), ipld.CID, ipld.Content) return nil } @@ -480,6 +457,13 @@ func (sdi *StateDiffIndexer) HasBlock(hash common.Hash, number uint64) (bool, er return false, nil } +func (sdi *StateDiffIndexer) BeginTx(number *big.Int, _ context.Context) interfaces.Batch { + return &BatchTx{ + blockNum: number.String(), + fileWriter: sdi.fileWriter, + } +} + // Close satisfies io.Closer func (sdi *StateDiffIndexer) Close() error { return sdi.fileWriter.Close() diff --git a/indexer/database/sql/batch_tx.go b/indexer/database/sql/batch_tx.go index dbce5b5..d42b214 100644 --- a/indexer/database/sql/batch_tx.go +++ b/indexer/database/sql/batch_tx.go @@ -35,7 +35,7 @@ const startingCacheCapacity = 1024 * 24 // BatchTx wraps a sql tx with the state necessary for building the tx concurrently during trie difference iteration type BatchTx struct { - BlockNumber string + blockNum string ctx context.Context dbtx Tx stm string @@ -44,15 +44,14 @@ type BatchTx struct { ipldCache models.IPLDBatch removedCacheFlag *uint32 // Tracks expected cache size and ensures cache is caught up before flush - cacheWg sync.WaitGroup - startTime time.Time + cacheWg sync.WaitGroup } -func NewBatchAt(insertStm string, ctx context.Context, number *big.Int, tx Tx, start time.Time) *BatchTx { +func NewBatch(insertStm string, ctx context.Context, number *big.Int, tx Tx) *BatchTx { blockTx := &BatchTx{ removedCacheFlag: new(uint32), ctx: ctx, - BlockNumber: number.String(), + blockNum: number.String(), stm: insertStm, iplds: make(chan models.IPLDModel), quit: make(chan (chan<- struct{})), @@ -61,35 +60,39 @@ func NewBatchAt(insertStm string, ctx context.Context, number *big.Int, tx Tx, s Keys: make([]string, 0, startingCacheCapacity), Values: make([][]byte, 0, startingCacheCapacity), }, - dbtx: tx, - startTime: time.Now(), + dbtx: tx, } + go blockTx.cache() return blockTx } // Submit satisfies indexer.Batch -func (tx *BatchTx) Submit(err error) error { +func (tx *BatchTx) Submit() error { defer tx.close() - metrics.IndexerMetrics.StateStoreCodeProcessingTimer.Update(time.Since(tx.startTime)) t := time.Now() if err := tx.flush(); err != nil { rollback(tx.ctx, tx.dbtx) return err } - err = tx.dbtx.Commit(tx.ctx) + err := tx.dbtx.Commit(tx.ctx) metrics.IndexerMetrics.PostgresCommitTimer.Update(time.Since(t)) return err } +func (tx *BatchTx) BlockNumber() string { + return tx.blockNum +} + func (tx *BatchTx) RollbackOnFailure(err error) { - defer tx.close() if p := recover(); p != nil { + defer tx.close() log.Info("panic detected before tx submission, rolling back the tx", "panic", p) rollback(tx.ctx, tx.dbtx) panic(p) } else if err != nil { + defer tx.close() log.Info("error detected before tx submission, rolling back the tx", "error", err) rollback(tx.ctx, tx.dbtx) } @@ -148,7 +151,7 @@ func (tx *BatchTx) cache() { func (tx *BatchTx) cacheDirect(key string, value []byte) { tx.cacheWg.Add(1) tx.iplds <- models.IPLDModel{ - BlockNumber: tx.BlockNumber, + BlockNumber: tx.BlockNumber(), Key: key, Data: value, } @@ -157,7 +160,7 @@ func (tx *BatchTx) cacheDirect(key string, value []byte) { func (tx *BatchTx) cacheIPLD(i ipld.IPLD) { tx.cacheWg.Add(1) tx.iplds <- models.IPLDModel{ - BlockNumber: tx.BlockNumber, + BlockNumber: tx.BlockNumber(), Key: i.Cid().String(), Data: i.RawData(), } @@ -168,7 +171,7 @@ func (tx *BatchTx) cacheRemoved(key string, value []byte) { atomic.StoreUint32(tx.removedCacheFlag, 1) tx.cacheWg.Add(1) tx.iplds <- models.IPLDModel{ - BlockNumber: tx.BlockNumber, + BlockNumber: tx.BlockNumber(), Key: key, Data: value, } diff --git a/indexer/database/sql/indexer.go b/indexer/database/sql/indexer.go index 150d405..6dab963 100644 --- a/indexer/database/sql/indexer.go +++ b/indexer/database/sql/indexer.go @@ -39,7 +39,6 @@ import ( "github.com/cerc-io/plugeth-statediff/indexer/models" "github.com/cerc-io/plugeth-statediff/indexer/shared" sdtypes "github.com/cerc-io/plugeth-statediff/types" - "github.com/cerc-io/plugeth-statediff/utils/log" ) var _ interfaces.StateDiffIndexer = &StateDiffIndexer{} @@ -82,11 +81,12 @@ func (sdi *StateDiffIndexer) ReportDBMetrics(delay time.Duration, quit <-chan bo // PushBlock pushes and indexes block data in sql, except state & storage nodes (includes header, uncles, transactions & receipts) // Returns an initiated DB transaction which must be Closed via defer to commit or rollback func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receipts, totalDifficulty *big.Int) (interfaces.Batch, error) { - start, t := time.Now(), time.Now() + t := time.Now() blockHash := block.Hash() height := block.NumberU64() - traceMsg := fmt.Sprintf("indexer stats for statediff at %d with hash %s:\r\n", height, blockHash) transactions := block.Transactions() + var err error + // Derive any missing fields if err := receipts.DeriveFields(sdi.chainConfig, blockHash, height, block.BaseFee(), transactions); err != nil { return nil, err @@ -95,11 +95,12 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip // Generate the block iplds txNodes, rctNodes, logNodes, err := ipld.FromBlockAndReceipts(block, receipts) if err != nil { - return nil, fmt.Errorf("error creating IPLD nodes from block and receipts: %v", err) + return nil, fmt.Errorf("error creating IPLD nodes from block and receipts: %w", err) } if len(txNodes) != len(rctNodes) { - return nil, fmt.Errorf("expected number of transactions (%d), receipts (%d)", len(txNodes), len(rctNodes)) + return nil, fmt.Errorf("expected number of transactions (%d) does not match number of receipts (%d)", + len(txNodes), len(rctNodes)) } // Calculate reward @@ -108,98 +109,35 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip if sdi.chainConfig.Clique != nil { reward = big.NewInt(0) } else { - reward = shared.CalcEthBlockReward(block.Header(), block.Uncles(), block.Transactions(), receipts) + reward = shared.CalcEthBlockReward(block.Header(), block.Uncles(), transactions, receipts) } t = time.Now() // Begin new DB tx for everything - tx := NewDelayedTx(sdi.dbWriter.db) - defer func() { - if p := recover(); p != nil { - rollback(sdi.ctx, tx) - panic(p) - } else if err != nil { - rollback(sdi.ctx, tx) - } - }() - blockTx := &BatchTx{ - removedCacheFlag: new(uint32), - ctx: sdi.ctx, - BlockNumber: block.Number().String(), - stm: sdi.dbWriter.db.InsertIPLDsStm(), - iplds: make(chan models.IPLDModel), - quit: make(chan (chan<- struct{})), - ipldCache: models.IPLDBatch{ - BlockNumbers: make([]string, 0, startingCacheCapacity), - Keys: make([]string, 0, startingCacheCapacity), - Values: make([][]byte, 0, startingCacheCapacity), - }, - dbtx: tx, - // handle transaction commit or rollback for any return case - submit: func(self *BatchTx, err error) error { - defer func() { - confirm := make(chan struct{}) - self.quit <- confirm - close(self.quit) - <-confirm - close(self.iplds) - }() - if p := recover(); p != nil { - log.Info("panic detected before tx submission, rolling back the tx", "panic", p) - rollback(sdi.ctx, tx) - panic(p) - } else if err != nil { - log.Info("error detected before tx submission, rolling back the tx", "error", err) - rollback(sdi.ctx, tx) - } else { - tDiff := time.Since(t) - metrics2.IndexerMetrics.StateStoreCodeProcessingTimer.Update(tDiff) - traceMsg += fmt.Sprintf("state, storage, and code storage processing time: %s\r\n", tDiff) - t = time.Now() - if err := self.flush(); err != nil { - rollback(sdi.ctx, tx) - traceMsg += fmt.Sprintf(" TOTAL PROCESSING DURATION: %s\r\n", time.Since(start)) - log.Debug(traceMsg) - return err - } - err = tx.Commit(sdi.ctx) - tDiff = time.Since(t) - metrics2.IndexerMetrics.PostgresCommitTimer.Update(tDiff) - traceMsg += fmt.Sprintf("postgres transaction commit duration: %s\r\n", tDiff) - } - traceMsg += fmt.Sprintf(" TOTAL PROCESSING DURATION: %s\r\n", time.Since(start)) - log.Debug(traceMsg) - return err - }, - } - go blockTx.cache() - - tDiff := time.Since(t) - metrics2.IndexerMetrics.FreePostgresTimer.Update(tDiff) - - traceMsg += fmt.Sprintf("time spent waiting for free postgres tx: %s:\r\n", tDiff) - t = time.Now() + batch := NewBatch( + sdi.dbWriter.db.InsertIPLDsStm(), sdi.ctx, + block.Number(), + NewDelayedTx(sdi.dbWriter.db), + ) + // handle transaction rollback for failures in this scope + defer batch.RollbackOnFailure(err) // Publish and index header, collect headerID - headerID, err := sdi.PushHeader(blockTx, block.Header(), reward, totalDifficulty) + headerID, err := sdi.PushHeader(batch, block.Header(), reward, totalDifficulty) if err != nil { return nil, err } - tDiff = time.Since(t) - metrics2.IndexerMetrics.HeaderProcessingTimer.Update(tDiff) - traceMsg += fmt.Sprintf("header processing time: %s\r\n", tDiff) + metrics2.IndexerMetrics.HeaderProcessingTimer.Update(time.Since(t)) t = time.Now() // Publish and index uncles - err = sdi.processUncles(blockTx, headerID, block.Number(), block.UncleHash(), block.Uncles()) + err = sdi.processUncles(batch, headerID, block.Number(), block.UncleHash(), block.Uncles()) if err != nil { return nil, err } - tDiff = time.Since(t) - metrics2.IndexerMetrics.UncleProcessingTimer.Update(tDiff) - traceMsg += fmt.Sprintf("uncle processing time: %s\r\n", tDiff) + metrics2.IndexerMetrics.UncleProcessingTimer.Update(time.Since(t)) t = time.Now() // Publish and index receipts and txs - err = sdi.processReceiptsAndTxs(blockTx, processArgs{ + err = sdi.processReceiptsAndTxs(batch, processArgs{ headerID: headerID, blockNumber: block.Number(), receipts: receipts, @@ -211,12 +149,9 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip if err != nil { return nil, err } - tDiff = time.Since(t) - metrics2.IndexerMetrics.TxAndRecProcessingTimer.Update(tDiff) - traceMsg += fmt.Sprintf("tx and receipt processing time: %s\r\n", tDiff) - t = time.Now() + metrics2.IndexerMetrics.TxAndRecProcessingTimer.Update(time.Since(t)) - return blockTx, err + return batch, err } // CurrentBlock returns the HeaderModel of the highest existing block in the database. @@ -414,7 +349,7 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt if stateNode.Removed { tx.cacheRemoved(shared.RemovedNodeStateCID, []byte{}) stateModel = models.StateNodeModel{ - BlockNumber: tx.BlockNumber, + BlockNumber: tx.BlockNumber(), HeaderID: headerID, StateKey: common.BytesToHash(stateNode.AccountWrapper.LeafKey).String(), CID: shared.RemovedNodeStateCID, @@ -422,7 +357,7 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt } } else { stateModel = models.StateNodeModel{ - BlockNumber: tx.BlockNumber, + BlockNumber: tx.BlockNumber(), HeaderID: headerID, StateKey: common.BytesToHash(stateNode.AccountWrapper.LeafKey).String(), CID: stateNode.AccountWrapper.CID, @@ -444,7 +379,7 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt if storageNode.Removed { tx.cacheRemoved(shared.RemovedNodeStorageCID, []byte{}) storageModel := models.StorageNodeModel{ - BlockNumber: tx.BlockNumber, + BlockNumber: tx.BlockNumber(), HeaderID: headerID, StateKey: common.BytesToHash(stateNode.AccountWrapper.LeafKey).String(), StorageKey: common.BytesToHash(storageNode.LeafKey).String(), @@ -458,7 +393,7 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt continue } storageModel := models.StorageNodeModel{ - BlockNumber: tx.BlockNumber, + BlockNumber: tx.BlockNumber(), HeaderID: headerID, StateKey: common.BytesToHash(stateNode.AccountWrapper.LeafKey).String(), StorageKey: common.BytesToHash(storageNode.LeafKey).String(), @@ -489,6 +424,15 @@ func (sdi *StateDiffIndexer) HasBlock(hash common.Hash, number uint64) (bool, er return sdi.dbWriter.hasHeader(hash, number) } +func (sdi *StateDiffIndexer) BeginTx(number *big.Int, ctx context.Context) interfaces.Batch { + return NewBatch( + sdi.dbWriter.db.InsertIPLDsStm(), + ctx, + number, + NewDelayedTx(sdi.dbWriter.db), + ) +} + // Close satisfies io.Closer func (sdi *StateDiffIndexer) Close() error { return sdi.dbWriter.Close() diff --git a/indexer/interfaces/interfaces.go b/indexer/interfaces/interfaces.go index be7080c..84f76d5 100644 --- a/indexer/interfaces/interfaces.go +++ b/indexer/interfaces/interfaces.go @@ -17,6 +17,7 @@ package interfaces import ( + "context" "math/big" "time" @@ -39,6 +40,8 @@ type StateDiffIndexer interface { PushIPLD(tx Batch, ipld sdtypes.IPLD) error ReportDBMetrics(delay time.Duration, quit <-chan bool) + BeginTx(number *big.Int, ctx context.Context) Batch + // Methods used by WatchAddress API/functionality LoadWatchedAddresses() ([]common.Address, error) InsertWatchedAddresses(addresses []sdtypes.WatchAddressArg, currentBlock *big.Int) error @@ -51,7 +54,9 @@ type StateDiffIndexer interface { // Batch required for indexing data atomically type Batch interface { - Submit(err error) error + Submit() error + BlockNumber() string + RollbackOnFailure(error) } // Config used to configure different underlying implementations diff --git a/indexer/test/test.go b/indexer/test/test.go index ad43529..5f9cf7c 100644 --- a/indexer/test/test.go +++ b/indexer/test/test.go @@ -28,7 +28,6 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "github.com/cerc-io/plugeth-statediff/indexer/database/file" "github.com/cerc-io/plugeth-statediff/indexer/database/sql" "github.com/cerc-io/plugeth-statediff/indexer/interfaces" "github.com/cerc-io/plugeth-statediff/indexer/mocks" @@ -48,7 +47,7 @@ func SetupTestData(t *testing.T, ind interfaces.StateDiffIndexer) { t.Fatal(err) } defer func() { - if err := tx.Submit(err); err != nil { + if err := tx.Submit(); err != nil { t.Fatal(err) } }() @@ -61,11 +60,7 @@ func SetupTestData(t *testing.T, ind interfaces.StateDiffIndexer) { require.NoError(t, err) } - if batchTx, ok := tx.(*sql.BatchTx); ok { - require.Equal(t, mocks.BlockNumber.String(), batchTx.BlockNumber) - } else if batchTx, ok := tx.(*file.BatchTx); ok { - require.Equal(t, mocks.BlockNumber.String(), batchTx.BlockNumber) - } + require.Equal(t, mocks.BlockNumber.String(), tx.BlockNumber()) } func DoTestPublishAndIndexHeaderIPLDs(t *testing.T, db sql.Database) { @@ -547,13 +542,9 @@ func SetupTestDataNonCanonical(t *testing.T, ind interfaces.StateDiffIndexer) { require.NoError(t, err) } - if batchTx, ok := tx1.(*sql.BatchTx); ok { - require.Equal(t, mocks.BlockNumber.String(), batchTx.BlockNumber) - } else if batchTx, ok := tx1.(*file.BatchTx); ok { - require.Equal(t, mocks.BlockNumber.String(), batchTx.BlockNumber) - } + require.Equal(t, mocks.BlockNumber.String(), tx1.BlockNumber()) - if err := tx1.Submit(err); err != nil { + if err := tx1.Submit(); err != nil { t.Fatal(err) } @@ -572,13 +563,9 @@ func SetupTestDataNonCanonical(t *testing.T, ind interfaces.StateDiffIndexer) { require.NoError(t, err) } - if tx, ok := tx2.(*sql.BatchTx); ok { - require.Equal(t, mocks.BlockNumber.String(), tx.BlockNumber) - } else if tx, ok := tx2.(*sql.BatchTx); ok { - require.Equal(t, mocks.BlockNumber.String(), tx.BlockNumber) - } + require.Equal(t, mocks.BlockNumber.String(), tx2.BlockNumber()) - if err := tx2.Submit(err); err != nil { + if err := tx2.Submit(); err != nil { t.Fatal(err) } @@ -597,13 +584,9 @@ func SetupTestDataNonCanonical(t *testing.T, ind interfaces.StateDiffIndexer) { require.NoError(t, err) } - if batchTx, ok := tx3.(*sql.BatchTx); ok { - require.Equal(t, mocks.Block2Number.String(), batchTx.BlockNumber) - } else if batchTx, ok := tx3.(*file.BatchTx); ok { - require.Equal(t, mocks.Block2Number.String(), batchTx.BlockNumber) - } + require.Equal(t, mocks.Block2Number.String(), tx3.BlockNumber()) - if err := tx3.Submit(err); err != nil { + if err := tx3.Submit(); err != nil { t.Fatal(err) } } diff --git a/indexer/test/test_legacy.go b/indexer/test/test_legacy.go index 6b93f79..8878edc 100644 --- a/indexer/test/test_legacy.go +++ b/indexer/test/test_legacy.go @@ -20,7 +20,6 @@ import ( "context" "testing" - "github.com/cerc-io/plugeth-statediff/indexer/database/file" "github.com/cerc-io/plugeth-statediff/indexer/database/sql" "github.com/cerc-io/plugeth-statediff/indexer/interfaces" "github.com/cerc-io/plugeth-statediff/indexer/ipld" @@ -51,7 +50,7 @@ func SetupLegacyTestData(t *testing.T, ind interfaces.StateDiffIndexer) { require.NoError(t, err) defer func() { - if err := tx.Submit(err); err != nil { + if err := tx.Submit(); err != nil { t.Fatal(err) } }() @@ -60,11 +59,7 @@ func SetupLegacyTestData(t *testing.T, ind interfaces.StateDiffIndexer) { require.NoError(t, err) } - if batchTx, ok := tx.(*sql.BatchTx); ok { - require.Equal(t, legacyData.BlockNumber.String(), batchTx.BlockNumber) - } else if batchTx, ok := tx.(*file.BatchTx); ok { - require.Equal(t, legacyData.BlockNumber.String(), batchTx.BlockNumber) - } + require.Equal(t, legacyData.BlockNumber.String(), tx.BlockNumber()) } func TestLegacyIndexer(t *testing.T, db sql.Database) { diff --git a/indexer/test/test_mainnet.go b/indexer/test/test_mainnet.go index 01289ab..6e06458 100644 --- a/indexer/test/test_mainnet.go +++ b/indexer/test/test_mainnet.go @@ -19,8 +19,6 @@ package test import ( "testing" - "github.com/cerc-io/plugeth-statediff/indexer/database/file" - "github.com/cerc-io/plugeth-statediff/indexer/database/sql" "github.com/cerc-io/plugeth-statediff/indexer/interfaces" "github.com/cerc-io/plugeth-statediff/indexer/mocks" "github.com/ethereum/go-ethereum/core/types" @@ -36,7 +34,7 @@ func TestBlock(t *testing.T, ind interfaces.StateDiffIndexer, testBlock *types.B require.NoError(t, err) defer func() { - if err := tx.Submit(err); err != nil { + if err := tx.Submit(); err != nil { t.Fatal(err) } }() @@ -45,9 +43,5 @@ func TestBlock(t *testing.T, ind interfaces.StateDiffIndexer, testBlock *types.B require.NoError(t, err) } - if batchTx, ok := tx.(*sql.BatchTx); ok { - require.Equal(t, testBlock.Number().String(), batchTx.BlockNumber) - } else if batchTx, ok := tx.(*file.BatchTx); ok { - require.Equal(t, testBlock.Number().String(), batchTx.BlockNumber) - } + require.Equal(t, testBlock.Number().String(), tx.BlockNumber()) } diff --git a/service.go b/service.go index e1ba69f..472b689 100644 --- a/service.go +++ b/service.go @@ -815,6 +815,8 @@ func (sds *Service) writeStateDiff(block *types.Block, parentRoot common.Hash, p if params.IncludeReceipts { receipts = sds.BlockChain.GetReceiptsByHash(block.Hash()) } + + t := time.Now() tx, err = sds.indexer.PushBlock(block, receipts, totalDifficulty) if err != nil { return err @@ -840,9 +842,12 @@ func (sds *Service) writeStateDiff(block *types.Block, parentRoot common.Hash, p BlockHash: block.Hash(), BlockNumber: block.Number(), }, params, nodeSink, ipldSink) + if err != nil { + return err + } - // TODO this anti-pattern needs to be sorted out eventually - if err = tx.Submit(err); err != nil { + metrics.IndexerMetrics.StateStoreCodeProcessingTimer.Update(time.Since(t)) + if err = tx.Submit(); err != nil { return fmt.Errorf("batch transaction submission failed: %w", err) } return nil diff --git a/test_helpers/mocks/indexer.go b/test_helpers/mocks/indexer.go index 8991f9e..403b553 100644 --- a/test_helpers/mocks/indexer.go +++ b/test_helpers/mocks/indexer.go @@ -17,6 +17,7 @@ package mocks import ( + context "context" "math/big" "time" @@ -84,10 +85,21 @@ func (sdi *StateDiffIndexer) ClearWatchedAddresses() error { return nil } +func (sdi *StateDiffIndexer) BeginTx(number *big.Int, ctx context.Context) interfaces.Batch { + return &batch{} +} + func (sdi *StateDiffIndexer) Close() error { return nil } -func (tx *batch) Submit(err error) error { +func (tx *batch) RollbackOnFailure(error) {} + +func (tx *batch) Submit() error { return nil } + +// batch.BlockNumber +func (tx *batch) BlockNumber() string { + return "0" +}