Backport fixes for issue #367 to v4. #394
@ -504,6 +504,12 @@ func (sdi *StateDiffIndexer) PushCodeAndCodeHash(batch interfaces.Batch, codeAnd
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// HasBlock checks whether the indicated block already exists in the output.
|
||||||
|
// In the "dump" case, this is presumed to be false.
|
||||||
|
func (sdi *StateDiffIndexer) HasBlock(hash common.Hash, number uint64) (bool, error) {
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
|
||||||
// Close satisfies io.Closer
|
// Close satisfies io.Closer
|
||||||
func (sdi *StateDiffIndexer) Close() error {
|
func (sdi *StateDiffIndexer) Close() error {
|
||||||
return sdi.dump.Close()
|
return sdi.dump.Close()
|
||||||
|
@ -542,6 +542,12 @@ func (sdi *StateDiffIndexer) PushCodeAndCodeHash(batch interfaces.Batch, codeAnd
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// HasBlock checks whether the indicated block already exists in the output.
|
||||||
|
// In the "file" case this is presumed to be false.
|
||||||
|
func (sdi *StateDiffIndexer) HasBlock(hash common.Hash, number uint64) (bool, error) {
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
|
||||||
// Close satisfies io.Closer
|
// Close satisfies io.Closer
|
||||||
func (sdi *StateDiffIndexer) Close() error {
|
func (sdi *StateDiffIndexer) Close() error {
|
||||||
return sdi.fileWriter.Close()
|
return sdi.fileWriter.Close()
|
||||||
|
@ -227,6 +227,11 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
|
|||||||
return blockTx, err
|
return blockTx, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// HasBlock checks whether the indicated block already exists in the database.
|
||||||
|
func (sdi *StateDiffIndexer) HasBlock(hash common.Hash, number uint64) (bool, error) {
|
||||||
|
return sdi.dbWriter.hasHeader(hash, number)
|
||||||
|
}
|
||||||
|
|
||||||
// processHeader publishes and indexes a header IPLD in Postgres
|
// processHeader publishes and indexes a header IPLD in Postgres
|
||||||
// it returns the headerID
|
// it returns the headerID
|
||||||
func (sdi *StateDiffIndexer) processHeader(tx *BatchTx, header *types.Header, headerNode node.Node, reward, td *big.Int) (string, error) {
|
func (sdi *StateDiffIndexer) processHeader(tx *BatchTx, header *types.Header, headerNode node.Node, reward, td *big.Int) (string, error) {
|
||||||
|
@ -45,6 +45,7 @@ type Driver interface {
|
|||||||
|
|
||||||
// Statements interface to accommodate different SQL query syntax
|
// Statements interface to accommodate different SQL query syntax
|
||||||
type Statements interface {
|
type Statements interface {
|
||||||
|
ExistsHeaderStm() string
|
||||||
InsertHeaderStm() string
|
InsertHeaderStm() string
|
||||||
InsertUncleStm() string
|
InsertUncleStm() string
|
||||||
InsertTxStm() string
|
InsertTxStm() string
|
||||||
|
@ -84,13 +84,13 @@ func (tx *DelayedTx) Commit(ctx context.Context) error {
|
|||||||
for _, item := range tx.cache {
|
for _, item := range tx.cache {
|
||||||
switch item := item.(type) {
|
switch item := item.(type) {
|
||||||
case *copyFrom:
|
case *copyFrom:
|
||||||
_, err := base.CopyFrom(ctx, item.tableName, item.columnNames, item.rows)
|
_, err = base.CopyFrom(ctx, item.tableName, item.columnNames, item.rows)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error("COPY error", "table", item.tableName, "err", err)
|
log.Error("COPY error", "table", item.tableName, "err", err)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
case cachedStmt:
|
case cachedStmt:
|
||||||
_, err := base.Exec(ctx, item.sql, item.args...)
|
_, err = base.Exec(ctx, item.sql, item.args...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -16,7 +16,9 @@
|
|||||||
|
|
||||||
package postgres
|
package postgres
|
||||||
|
|
||||||
import "github.com/ethereum/go-ethereum/statediff/indexer/database/sql"
|
import (
|
||||||
|
"github.com/ethereum/go-ethereum/statediff/indexer/database/sql"
|
||||||
|
)
|
||||||
|
|
||||||
var _ sql.Database = &DB{}
|
var _ sql.Database = &DB{}
|
||||||
|
|
||||||
@ -36,6 +38,11 @@ type DB struct {
|
|||||||
sql.Driver
|
sql.Driver
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ExistsHeaderStm satisfies the sql.Statements interface
|
||||||
|
func (db *DB) ExistsHeaderStm() string {
|
||||||
|
return "SELECT EXISTS(SELECT 1 from eth.header_cids WHERE block_number = $1 AND block_hash = $2 LIMIT 1)"
|
||||||
|
}
|
||||||
|
|
||||||
// InsertHeaderStm satisfies the sql.Statements interface
|
// InsertHeaderStm satisfies the sql.Statements interface
|
||||||
// Stm == Statement
|
// Stm == Statement
|
||||||
func (db *DB) InsertHeaderStm() string {
|
func (db *DB) InsertHeaderStm() string {
|
||||||
|
@ -50,6 +50,11 @@ func (w *Writer) Close() error {
|
|||||||
return w.db.Close()
|
return w.db.Close()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (w *Writer) hasHeader(blockHash common.Hash, blockNumber uint64) (exists bool, err error) {
|
||||||
|
err = w.db.QueryRow(w.db.Context(), w.db.ExistsHeaderStm(), blockNumber, blockHash.String()).Scan(&exists)
|
||||||
|
return exists, err
|
||||||
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
INSERT INTO eth.header_cids (block_number, block_hash, parent_hash, cid, td, node_id, reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, coinbase)
|
INSERT INTO eth.header_cids (block_number, block_hash, parent_hash, cid, td, node_id, reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, coinbase)
|
||||||
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16)
|
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16)
|
||||||
|
@ -29,6 +29,7 @@ import (
|
|||||||
|
|
||||||
// StateDiffIndexer interface required to index statediff data
|
// StateDiffIndexer interface required to index statediff data
|
||||||
type StateDiffIndexer interface {
|
type StateDiffIndexer interface {
|
||||||
|
HasBlock(hash common.Hash, number uint64) (bool, error)
|
||||||
PushBlock(block *types.Block, receipts types.Receipts, totalDifficulty *big.Int) (Batch, error)
|
PushBlock(block *types.Block, receipts types.Receipts, totalDifficulty *big.Int) (Batch, error)
|
||||||
PushStateNode(tx Batch, stateNode sdtypes.StateNode, headerID string) error
|
PushStateNode(tx Batch, stateNode sdtypes.StateNode, headerID string) error
|
||||||
PushCodeAndCodeHash(tx Batch, codeAndCodeHash sdtypes.CodeAndCodeHash) error
|
PushCodeAndCodeHash(tx Batch, codeAndCodeHash sdtypes.CodeAndCodeHash) error
|
||||||
|
@ -39,18 +39,19 @@ func countStateDiffBegin(block *types.Block) (time.Time, log.Logger) {
|
|||||||
return start, logger
|
return start, logger
|
||||||
}
|
}
|
||||||
|
|
||||||
func countStateDiffEnd(start time.Time, logger log.Logger, err error) time.Duration {
|
func countStateDiffEnd(start time.Time, logger log.Logger, err *error) time.Duration {
|
||||||
duration := time.Since(start)
|
duration := time.Since(start)
|
||||||
defaultStatediffMetrics.underway.Dec(1)
|
defaultStatediffMetrics.underway.Dec(1)
|
||||||
if nil == err {
|
failed := nil != err && nil != *err
|
||||||
defaultStatediffMetrics.succeeded.Inc(1)
|
if failed {
|
||||||
} else {
|
|
||||||
defaultStatediffMetrics.failed.Inc(1)
|
defaultStatediffMetrics.failed.Inc(1)
|
||||||
|
} else {
|
||||||
|
defaultStatediffMetrics.succeeded.Inc(1)
|
||||||
}
|
}
|
||||||
defaultStatediffMetrics.totalProcessingTime.Inc(duration.Milliseconds())
|
defaultStatediffMetrics.totalProcessingTime.Inc(duration.Milliseconds())
|
||||||
|
|
||||||
logger.Debug(fmt.Sprintf("writeStateDiff END (duration=%dms, err=%t) [underway=%d, succeeded=%d, failed=%d, total_time=%dms]",
|
logger.Debug(fmt.Sprintf("writeStateDiff END (duration=%dms, err=%t) [underway=%d, succeeded=%d, failed=%d, total_time=%dms]",
|
||||||
duration.Milliseconds(), nil != err,
|
duration.Milliseconds(), failed,
|
||||||
defaultStatediffMetrics.underway.Count(),
|
defaultStatediffMetrics.underway.Count(),
|
||||||
defaultStatediffMetrics.succeeded.Count(),
|
defaultStatediffMetrics.succeeded.Count(),
|
||||||
defaultStatediffMetrics.failed.Count(),
|
defaultStatediffMetrics.failed.Count(),
|
||||||
|
@ -151,8 +151,10 @@ type Service struct {
|
|||||||
// Job ID ticker
|
// Job ID ticker
|
||||||
lastJobID uint64
|
lastJobID uint64
|
||||||
// In flight jobs (for WriteStateDiffAt)
|
// In flight jobs (for WriteStateDiffAt)
|
||||||
currentJobs map[uint64]JobID
|
currentJobs map[uint64]JobID
|
||||||
currentJobsMutex sync.Mutex
|
currentJobsMutex sync.Mutex
|
||||||
|
currentBlocks map[string]bool
|
||||||
|
currentBlocksMutex sync.Mutex
|
||||||
}
|
}
|
||||||
|
|
||||||
// IDs used for tracking in-progress jobs (0 for invalid)
|
// IDs used for tracking in-progress jobs (0 for invalid)
|
||||||
@ -214,21 +216,24 @@ func New(stack *node.Node, ethServ *eth.Ethereum, cfg *ethconfig.Config, params
|
|||||||
}
|
}
|
||||||
|
|
||||||
sds := &Service{
|
sds := &Service{
|
||||||
Mutex: sync.Mutex{},
|
Mutex: sync.Mutex{},
|
||||||
BlockChain: blockChain,
|
BlockChain: blockChain,
|
||||||
Builder: NewBuilder(blockChain.StateCache()),
|
Builder: NewBuilder(blockChain.StateCache()),
|
||||||
QuitChan: quitCh,
|
QuitChan: quitCh,
|
||||||
Subscriptions: make(map[common.Hash]map[rpc.ID]Subscription),
|
Subscriptions: make(map[common.Hash]map[rpc.ID]Subscription),
|
||||||
SubscriptionTypes: make(map[common.Hash]Params),
|
SubscriptionTypes: make(map[common.Hash]Params),
|
||||||
BlockCache: NewBlockCache(workers),
|
BlockCache: NewBlockCache(workers),
|
||||||
BackendAPI: backend,
|
BackendAPI: backend,
|
||||||
WaitForSync: params.WaitForSync,
|
WaitForSync: params.WaitForSync,
|
||||||
indexer: indexer,
|
indexer: indexer,
|
||||||
enableWriteLoop: params.EnableWriteLoop,
|
enableWriteLoop: params.EnableWriteLoop,
|
||||||
numWorkers: workers,
|
numWorkers: workers,
|
||||||
maxRetry: defaultRetryLimit,
|
maxRetry: defaultRetryLimit,
|
||||||
jobStatusSubs: map[rpc.ID]statusSubscription{},
|
jobStatusSubs: map[rpc.ID]statusSubscription{},
|
||||||
currentJobs: map[uint64]JobID{},
|
currentJobs: map[uint64]JobID{},
|
||||||
|
currentJobsMutex: sync.Mutex{},
|
||||||
|
currentBlocks: map[string]bool{},
|
||||||
|
currentBlocksMutex: sync.Mutex{},
|
||||||
}
|
}
|
||||||
stack.RegisterLifecycle(sds)
|
stack.RegisterLifecycle(sds)
|
||||||
stack.RegisterAPIs(sds.APIs())
|
stack.RegisterAPIs(sds.APIs())
|
||||||
@ -251,21 +256,24 @@ func NewService(blockChain blockChain, cfg Config, backend ethapi.Backend, index
|
|||||||
|
|
||||||
quitCh := make(chan bool)
|
quitCh := make(chan bool)
|
||||||
sds := &Service{
|
sds := &Service{
|
||||||
Mutex: sync.Mutex{},
|
Mutex: sync.Mutex{},
|
||||||
BlockChain: blockChain,
|
BlockChain: blockChain,
|
||||||
Builder: NewBuilder(blockChain.StateCache()),
|
Builder: NewBuilder(blockChain.StateCache()),
|
||||||
QuitChan: quitCh,
|
QuitChan: quitCh,
|
||||||
Subscriptions: make(map[common.Hash]map[rpc.ID]Subscription),
|
Subscriptions: make(map[common.Hash]map[rpc.ID]Subscription),
|
||||||
SubscriptionTypes: make(map[common.Hash]Params),
|
SubscriptionTypes: make(map[common.Hash]Params),
|
||||||
BlockCache: NewBlockCache(workers),
|
BlockCache: NewBlockCache(workers),
|
||||||
BackendAPI: backend,
|
BackendAPI: backend,
|
||||||
WaitForSync: cfg.WaitForSync,
|
WaitForSync: cfg.WaitForSync,
|
||||||
indexer: indexer,
|
indexer: indexer,
|
||||||
enableWriteLoop: cfg.EnableWriteLoop,
|
enableWriteLoop: cfg.EnableWriteLoop,
|
||||||
numWorkers: workers,
|
numWorkers: workers,
|
||||||
maxRetry: defaultRetryLimit,
|
maxRetry: defaultRetryLimit,
|
||||||
jobStatusSubs: map[rpc.ID]statusSubscription{},
|
jobStatusSubs: map[rpc.ID]statusSubscription{},
|
||||||
currentJobs: map[uint64]JobID{},
|
currentJobs: map[uint64]JobID{},
|
||||||
|
currentJobsMutex: sync.Mutex{},
|
||||||
|
currentBlocks: map[string]bool{},
|
||||||
|
currentBlocksMutex: sync.Mutex{},
|
||||||
}
|
}
|
||||||
|
|
||||||
if indexer != nil {
|
if indexer != nil {
|
||||||
@ -877,14 +885,44 @@ func (sds *Service) WriteStateDiffFor(blockHash common.Hash, params Params) erro
|
|||||||
return sds.writeStateDiffWithRetry(currentBlock, parentRoot, params)
|
return sds.writeStateDiffWithRetry(currentBlock, parentRoot, params)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Claim exclusive access for state diffing the specified block.
|
||||||
|
// Returns true and a function to release access if successful, else false, nil.
|
||||||
|
func (sds *Service) claimExclusiveAccess(block *types.Block) (bool, func()) {
|
||||||
|
sds.currentBlocksMutex.Lock()
|
||||||
|
defer sds.currentBlocksMutex.Unlock()
|
||||||
|
|
||||||
|
key := fmt.Sprintf("%s,%d", block.Hash().Hex(), block.NumberU64())
|
||||||
|
if sds.currentBlocks[key] {
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
sds.currentBlocks[key] = true
|
||||||
|
return true, func() {
|
||||||
|
sds.currentBlocksMutex.Lock()
|
||||||
|
defer sds.currentBlocksMutex.Unlock()
|
||||||
|
delete(sds.currentBlocks, key)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Writes a state diff from the current block, parent state root, and provided params
|
// Writes a state diff from the current block, parent state root, and provided params
|
||||||
func (sds *Service) writeStateDiff(block *types.Block, parentRoot common.Hash, params Params) error {
|
func (sds *Service) writeStateDiff(block *types.Block, parentRoot common.Hash, params Params) error {
|
||||||
|
if granted, relinquish := sds.claimExclusiveAccess(block); granted {
|
||||||
|
defer relinquish()
|
||||||
|
} else {
|
||||||
|
log.Info("Not writing, statediff in progress.", "number", block.NumberU64(), "hash", block.Hash().Hex())
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
if done, _ := sds.indexer.HasBlock(block.Hash(), block.NumberU64()); done {
|
||||||
|
log.Info("Not writing, statediff already done.", "number", block.NumberU64(), "hash", block.Hash().Hex())
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
var totalDifficulty *big.Int
|
var totalDifficulty *big.Int
|
||||||
var receipts types.Receipts
|
var receipts types.Receipts
|
||||||
var err error
|
var err error
|
||||||
var tx interfaces.Batch
|
var tx interfaces.Batch
|
||||||
start, logger := countStateDiffBegin(block)
|
start, logger := countStateDiffBegin(block)
|
||||||
defer countStateDiffEnd(start, logger, err)
|
defer countStateDiffEnd(start, logger, &err)
|
||||||
if params.IncludeTD {
|
if params.IncludeTD {
|
||||||
totalDifficulty = sds.BlockChain.GetTd(block.Hash(), block.NumberU64())
|
totalDifficulty = sds.BlockChain.GetTd(block.Hash(), block.NumberU64())
|
||||||
}
|
}
|
||||||
@ -916,7 +954,7 @@ func (sds *Service) writeStateDiff(block *types.Block, parentRoot common.Hash, p
|
|||||||
BlockNumber: block.Number(),
|
BlockNumber: block.Number(),
|
||||||
}, params, output, codeOutput)
|
}, params, output, codeOutput)
|
||||||
// TODO this anti-pattern needs to be sorted out eventually
|
// TODO this anti-pattern needs to be sorted out eventually
|
||||||
if err := tx.Submit(err); err != nil {
|
if err = tx.Submit(err); err != nil {
|
||||||
return fmt.Errorf("batch transaction submission failed: %w", err)
|
return fmt.Errorf("batch transaction submission failed: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -34,6 +34,10 @@ type StateDiffIndexer struct{}
|
|||||||
|
|
||||||
type batch struct{}
|
type batch struct{}
|
||||||
|
|
||||||
|
func (sdi *StateDiffIndexer) HasBlock(hash common.Hash, number uint64) (bool, error) {
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
|
||||||
func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receipts, totalDifficulty *big.Int) (interfaces.Batch, error) {
|
func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receipts, totalDifficulty *big.Int) (interfaces.Batch, error) {
|
||||||
return &batch{}, nil
|
return &batch{}, nil
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user