Concurrent statediff iteration #12

Merged
roysc merged 14 commits from use-concurrent-iterator into main 2023-09-22 08:44:38 +00:00
Showing only changes of commit 7902ad025a - Show all commits

View File

@ -84,9 +84,8 @@ func (sdi *StateDiffIndexer) ReportDBMetrics(delay time.Duration, quit <-chan bo
func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receipts, totalDifficulty *big.Int) (interfaces.Batch, error) { func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receipts, totalDifficulty *big.Int) (interfaces.Batch, error) {
start, t := time.Now(), time.Now() start, t := time.Now(), time.Now()
blockHash := block.Hash() blockHash := block.Hash()
blockHashStr := blockHash.String()
height := block.NumberU64() height := block.NumberU64()
traceMsg := fmt.Sprintf("indexer stats for statediff at %d with hash %s:\r\n", height, blockHashStr) traceMsg := fmt.Sprintf("indexer stats for statediff at %d with hash %s:\r\n", height, blockHash)
transactions := block.Transactions() transactions := block.Transactions()
// Derive any missing fields // Derive any missing fields
if err := receipts.DeriveFields(sdi.chainConfig, blockHash, height, block.BaseFee(), transactions); err != nil { if err := receipts.DeriveFields(sdi.chainConfig, blockHash, height, block.BaseFee(), transactions); err != nil {
@ -155,20 +154,20 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
} else { } else {
tDiff := time.Since(t) tDiff := time.Since(t)
metrics2.IndexerMetrics.StateStoreCodeProcessingTimer.Update(tDiff) metrics2.IndexerMetrics.StateStoreCodeProcessingTimer.Update(tDiff)
traceMsg += fmt.Sprintf("state, storage, and code storage processing time: %s\r\n", tDiff.String()) traceMsg += fmt.Sprintf("state, storage, and code storage processing time: %s\r\n", tDiff)
t = time.Now() t = time.Now()
if err := self.flush(); err != nil { if err := self.flush(); err != nil {
rollback(sdi.ctx, tx) rollback(sdi.ctx, tx)
traceMsg += fmt.Sprintf(" TOTAL PROCESSING DURATION: %s\r\n", time.Since(start).String()) traceMsg += fmt.Sprintf(" TOTAL PROCESSING DURATION: %s\r\n", time.Since(start))
log.Debug(traceMsg) log.Debug(traceMsg)
return err return err
} }
err = tx.Commit(sdi.ctx) err = tx.Commit(sdi.ctx)
tDiff = time.Since(t) tDiff = time.Since(t)
metrics2.IndexerMetrics.PostgresCommitTimer.Update(tDiff) metrics2.IndexerMetrics.PostgresCommitTimer.Update(tDiff)
traceMsg += fmt.Sprintf("postgres transaction commit duration: %s\r\n", tDiff.String()) traceMsg += fmt.Sprintf("postgres transaction commit duration: %s\r\n", tDiff)
} }
traceMsg += fmt.Sprintf(" TOTAL PROCESSING DURATION: %s\r\n", time.Since(start).String()) traceMsg += fmt.Sprintf(" TOTAL PROCESSING DURATION: %s\r\n", time.Since(start))
log.Debug(traceMsg) log.Debug(traceMsg)
return err return err
}, },
@ -178,7 +177,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
tDiff := time.Since(t) tDiff := time.Since(t)
metrics2.IndexerMetrics.FreePostgresTimer.Update(tDiff) metrics2.IndexerMetrics.FreePostgresTimer.Update(tDiff)
traceMsg += fmt.Sprintf("time spent waiting for free postgres tx: %s:\r\n", tDiff.String()) traceMsg += fmt.Sprintf("time spent waiting for free postgres tx: %s:\r\n", tDiff)
t = time.Now() t = time.Now()
// Publish and index header, collect headerID // Publish and index header, collect headerID
@ -189,7 +188,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
} }
tDiff = time.Since(t) tDiff = time.Since(t)
metrics2.IndexerMetrics.HeaderProcessingTimer.Update(tDiff) metrics2.IndexerMetrics.HeaderProcessingTimer.Update(tDiff)
traceMsg += fmt.Sprintf("header processing time: %s\r\n", tDiff.String()) traceMsg += fmt.Sprintf("header processing time: %s\r\n", tDiff)
t = time.Now() t = time.Now()
// Publish and index uncles // Publish and index uncles
err = sdi.processUncles(blockTx, headerID, block.Number(), block.UncleHash(), block.Uncles()) err = sdi.processUncles(blockTx, headerID, block.Number(), block.UncleHash(), block.Uncles())
@ -198,7 +197,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
} }
tDiff = time.Since(t) tDiff = time.Since(t)
metrics2.IndexerMetrics.UncleProcessingTimer.Update(tDiff) metrics2.IndexerMetrics.UncleProcessingTimer.Update(tDiff)
traceMsg += fmt.Sprintf("uncle processing time: %s\r\n", tDiff.String()) traceMsg += fmt.Sprintf("uncle processing time: %s\r\n", tDiff)
t = time.Now() t = time.Now()
// Publish and index receipts and txs // Publish and index receipts and txs
err = sdi.processReceiptsAndTxs(blockTx, processArgs{ err = sdi.processReceiptsAndTxs(blockTx, processArgs{
@ -215,7 +214,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
} }
tDiff = time.Since(t) tDiff = time.Since(t)
metrics2.IndexerMetrics.TxAndRecProcessingTimer.Update(tDiff) metrics2.IndexerMetrics.TxAndRecProcessingTimer.Update(tDiff)
traceMsg += fmt.Sprintf("tx and receipt processing time: %s\r\n", tDiff.String()) traceMsg += fmt.Sprintf("tx and receipt processing time: %s\r\n", tDiff)
t = time.Now() t = time.Now()
return blockTx, err return blockTx, err
@ -236,11 +235,6 @@ func (sdi *StateDiffIndexer) DetectGaps(beginBlockNumber uint64, endBlockNumber
func (sdi *StateDiffIndexer) processHeader(tx *BatchTx, header *types.Header, headerNode ipld.IPLD, reward, td *big.Int) (string, error) { func (sdi *StateDiffIndexer) processHeader(tx *BatchTx, header *types.Header, headerNode ipld.IPLD, reward, td *big.Int) (string, error) {
tx.cacheIPLD(headerNode) tx.cacheIPLD(headerNode)
var baseFee *string
if header.BaseFee != nil {
baseFee = new(string)
*baseFee = header.BaseFee.String()
}
headerID := header.Hash().String() headerID := header.Hash().String()
// index header // index header
return headerID, sdi.dbWriter.upsertHeaderCID(tx.dbtx, models.HeaderModel{ return headerID, sdi.dbWriter.upsertHeaderCID(tx.dbtx, models.HeaderModel{