factor out PushHeader method

This commit is contained in:
Roy Crihfield 2023-08-06 13:28:49 +08:00
parent 82131564ca
commit a7b83fc63c
8 changed files with 111 additions and 28 deletions

View File

@ -74,7 +74,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
}
// Generate the block iplds
headerNode, txNodes, rctNodes, logNodes, err := ipld.FromBlockAndReceipts(block, receipts)
txNodes, rctNodes, logNodes, err := ipld.FromBlockAndReceipts(block, receipts)
if err != nil {
return nil, fmt.Errorf("error creating IPLD nodes from block and receipts: %v", err)
}
@ -129,7 +129,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
// Publish and index header, collect headerID
var headerID string
headerID, err = sdi.processHeader(blockTx, block.Header(), headerNode, reward, totalDifficulty)
headerID, err = sdi.PushHeader(blockTx, block.Header(), reward, totalDifficulty)
if err != nil {
return nil, err
}
@ -167,9 +167,17 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
return blockTx, err
}
// processHeader publishes and indexes a header IPLD in Postgres
// PushHeader publishes and indexes a header IPLD in Postgres
// it returns the headerID
func (sdi *StateDiffIndexer) processHeader(tx *BatchTx, header *types.Header, headerNode ipld.IPLD, reward, td *big.Int) (string, error) {
func (sdi *StateDiffIndexer) PushHeader(batch interfaces.Batch, header *types.Header, reward, td *big.Int) (string, error) {
tx, ok := batch.(*BatchTx)
if !ok {
return "", fmt.Errorf("sql: batch is expected to be of type %T, got %T", &BatchTx{}, batch)
}
headerNode, err := ipld.NewEthHeader(header)
if err != nil {
return "", err
}
tx.cacheIPLD(headerNode)
headerID := header.Hash().String()
@ -189,7 +197,7 @@ func (sdi *StateDiffIndexer) processHeader(tx *BatchTx, header *types.Header, he
Coinbase: header.Coinbase.String(),
Canonical: true,
}
_, err := fmt.Fprintf(sdi.dump, "%+v\r\n", mod)
_, err = fmt.Fprintf(sdi.dump, "%+v\r\n", mod)
return headerID, err
}

View File

@ -142,7 +142,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
}
// Generate the block iplds
headerNode, txNodes, rctNodes, logNodes, err := ipld.FromBlockAndReceipts(block, receipts)
txNodes, rctNodes, logNodes, err := ipld.FromBlockAndReceipts(block, receipts)
if err != nil {
return nil, fmt.Errorf("error creating IPLD nodes from block and receipts: %v", err)
}
@ -183,7 +183,10 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
t = time.Now()
// write header, collect headerID
headerID := sdi.processHeader(block.Header(), headerNode, reward, totalDifficulty)
headerID, err := sdi.PushHeader(blockTx, block.Header(), reward, totalDifficulty)
if err != nil {
return nil, err
}
tDiff = time.Since(t)
metrics.IndexerMetrics.HeaderProcessingTimer.Update(tDiff)
traceMsg += fmt.Sprintf("header processing time: %s\r\n", tDiff.String())
@ -219,7 +222,12 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
// processHeader write a header IPLD insert SQL stmt to a file
// it returns the headerID
func (sdi *StateDiffIndexer) processHeader(header *types.Header, headerNode ipld.IPLD, reward, td *big.Int) string {
func (sdi *StateDiffIndexer) PushHeader(_ interfaces.Batch, header *types.Header, reward, td *big.Int) (string, error) {
// Process the header
headerNode, err := ipld.NewEthHeader(header)
if err != nil {
return "", err
}
sdi.fileWriter.upsertIPLDNode(header.Number.String(), headerNode)
headerID := header.Hash().String()
@ -240,7 +248,7 @@ func (sdi *StateDiffIndexer) processHeader(header *types.Header, headerNode ipld
Coinbase: header.Coinbase.String(),
Canonical: true,
})
return headerID
return headerID, nil
}
// processUncles publishes and indexes uncle IPLDs in Postgres

View File

@ -18,11 +18,14 @@ package sql
import (
"context"
"math/big"
"sync"
"sync/atomic"
"time"
"github.com/lib/pq"
"github.com/cerc-io/plugeth-statediff/indexer/database/metrics"
"github.com/cerc-io/plugeth-statediff/indexer/ipld"
"github.com/cerc-io/plugeth-statediff/indexer/models"
"github.com/cerc-io/plugeth-statediff/utils/log"
@ -41,14 +44,67 @@ type BatchTx struct {
ipldCache models.IPLDBatch
removedCacheFlag *uint32
// Tracks expected cache size and ensures cache is caught up before flush
cacheWg sync.WaitGroup
submit func(blockTx *BatchTx, err error) error
cacheWg sync.WaitGroup
startTime time.Time
}
// Submit satisfies indexer.AtomicTx
func NewBatchAt(insertStm string, ctx context.Context, number *big.Int, tx Tx, start time.Time) *BatchTx {
blockTx := &BatchTx{
removedCacheFlag: new(uint32),
ctx: ctx,
BlockNumber: number.String(),
stm: insertStm,
iplds: make(chan models.IPLDModel),
quit: make(chan (chan<- struct{})),
ipldCache: models.IPLDBatch{
BlockNumbers: make([]string, 0, startingCacheCapacity),
Keys: make([]string, 0, startingCacheCapacity),
Values: make([][]byte, 0, startingCacheCapacity),
},
dbtx: tx,
startTime: time.Now(),
}
return blockTx
}
// Submit satisfies indexer.Batch
func (tx *BatchTx) Submit(err error) error {
return tx.submit(tx, err)
defer tx.close()
metrics.IndexerMetrics.StateStoreCodeProcessingTimer.Update(time.Since(tx.startTime))
t := time.Now()
if err := tx.flush(); err != nil {
rollback(tx.ctx, tx.dbtx)
return err
}
err = tx.dbtx.Commit(tx.ctx)
metrics.IndexerMetrics.PostgresCommitTimer.Update(time.Since(t))
return err
}
func (tx *BatchTx) RollbackOnFailure(err error) {
defer tx.close()
if p := recover(); p != nil {
log.Info("panic detected before tx submission, rolling back the tx", "panic", p)
rollback(tx.ctx, tx.dbtx)
panic(p)
} else if err != nil {
log.Info("error detected before tx submission, rolling back the tx", "error", err)
rollback(tx.ctx, tx.dbtx)
}
}
func (tx *BatchTx) close() {
if tx.quit == nil {
return
}
confirm := make(chan struct{})
tx.quit <- confirm
close(tx.quit)
<-confirm
close(tx.iplds)
tx.quit = nil
}
func (tx *BatchTx) flush() error {

View File

@ -93,7 +93,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
}
// Generate the block iplds
headerNode, txNodes, rctNodes, logNodes, err := ipld.FromBlockAndReceipts(block, receipts)
txNodes, rctNodes, logNodes, err := ipld.FromBlockAndReceipts(block, receipts)
if err != nil {
return nil, fmt.Errorf("error creating IPLD nodes from block and receipts: %v", err)
}
@ -181,8 +181,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
t = time.Now()
// Publish and index header, collect headerID
var headerID string
headerID, err = sdi.processHeader(blockTx, block.Header(), headerNode, reward, totalDifficulty)
headerID, err := sdi.PushHeader(blockTx, block.Header(), reward, totalDifficulty)
if err != nil {
return nil, err
}
@ -230,9 +229,18 @@ func (sdi *StateDiffIndexer) DetectGaps(beginBlockNumber uint64, endBlockNumber
return sdi.dbWriter.detectGaps(beginBlockNumber, endBlockNumber)
}
// processHeader publishes and indexes a header IPLD in Postgres
// PushHeader publishes and indexes a header IPLD in Postgres
// it returns the headerID
func (sdi *StateDiffIndexer) processHeader(tx *BatchTx, header *types.Header, headerNode ipld.IPLD, reward, td *big.Int) (string, error) {
func (sdi *StateDiffIndexer) PushHeader(batch interfaces.Batch, header *types.Header, reward, td *big.Int) (string, error) {
tx, ok := batch.(*BatchTx)
if !ok {
return "", fmt.Errorf("sql: batch is expected to be of type %T, got %T", &BatchTx{}, batch)
}
// Process the header
headerNode, err := ipld.NewEthHeader(header)
if err != nil {
return "", err
}
tx.cacheIPLD(headerNode)
headerID := header.Hash().String()

View File

@ -3,7 +3,9 @@ package sql
import (
"context"
"reflect"
"time"
"github.com/cerc-io/plugeth-statediff/indexer/database/metrics"
"github.com/cerc-io/plugeth-statediff/utils/log"
)
@ -69,10 +71,12 @@ func (tx *DelayedTx) Exec(ctx context.Context, sql string, args ...interface{})
}
func (tx *DelayedTx) Commit(ctx context.Context) error {
t := time.Now()
base, err := tx.db.Begin(ctx)
if err != nil {
return err
}
metrics.IndexerMetrics.FreePostgresTimer.Update(time.Since(t))
defer func() {
if p := recover(); p != nil {
rollback(ctx, base)

View File

@ -34,6 +34,7 @@ type StateDiffIndexer interface {
CurrentBlock() (*models.HeaderModel, error)
HasBlock(hash common.Hash, number uint64) (bool, error)
PushBlock(block *types.Block, receipts types.Receipts, totalDifficulty *big.Int) (Batch, error)
PushHeader(batch Batch, header *types.Header, reward, td *big.Int) (string, error)
PushStateNode(tx Batch, stateNode sdtypes.StateLeafNode, headerID string) error
PushIPLD(tx Batch, ipld sdtypes.IPLD) error
ReportDBMetrics(delay time.Duration, quit <-chan bool)

View File

@ -22,23 +22,17 @@ import (
// FromBlockAndReceipts takes a block and processes it
// to return it a set of IPLD nodes for further processing.
func FromBlockAndReceipts(block *types.Block, receipts []*types.Receipt) (*EthHeader, []*EthTx, []*EthReceipt, [][]*EthLog, error) {
// Process the header
headerNode, err := NewEthHeader(block.Header())
if err != nil {
return nil, nil, nil, nil, err
}
func FromBlockAndReceipts(block *types.Block, receipts []*types.Receipt) ([]*EthTx, []*EthReceipt, [][]*EthLog, error) {
// Process the txs
txNodes, err := processTransactions(block.Transactions())
if err != nil {
return nil, nil, nil, nil, err
return nil, nil, nil, err
}
// Process the receipts and logs
rctNodes, logNodes, err := processReceiptsAndLogs(receipts)
return headerNode, txNodes, rctNodes, logNodes, err
return txNodes, rctNodes, logNodes, err
}
// processTransactions will take the found transactions in a parsed block body

View File

@ -52,6 +52,10 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
return &batch{}, nil
}
func (sdi *StateDiffIndexer) PushHeader(batch interfaces.Batch, header *types.Header, reward, td *big.Int) (string, error) {
return "", nil
}
func (sdi *StateDiffIndexer) PushStateNode(txi interfaces.Batch, stateNode sdtypes.StateLeafNode, headerID string) error {
return nil
}