367: Check if the statediff is already in progress or completed. (#392)

* Check if the statediff is already in progress or completed.

* Update mock

* Add comments.
This commit is contained in:
Thomas E Lackey 2023-06-01 13:21:33 -05:00 committed by GitHub
parent bac14cc48a
commit caa685321a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 106 additions and 32 deletions

View File

@ -412,6 +412,12 @@ func (sdi *StateDiffIndexer) PushIPLD(batch interfaces.Batch, ipld sdtypes.IPLD)
return nil
}
// HasBlock checks whether the indicated block already exists in the output.
// In the "dump" case, this is presumed to be false.
func (sdi *StateDiffIndexer) HasBlock(hash common.Hash, number uint64) (bool, error) {
return false, nil
}
// Close satisfies io.Closer
func (sdi *StateDiffIndexer) Close() error {
return sdi.dump.Close()

View File

@ -461,6 +461,12 @@ func (sdi *StateDiffIndexer) PushIPLD(batch interfaces.Batch, ipld sdtypes.IPLD)
return nil
}
// HasBlock checks whether the indicated block already exists in the output.
// In the "file" case this is presumed to be false.
func (sdi *StateDiffIndexer) HasBlock(hash common.Hash, number uint64) (bool, error) {
return false, nil
}
// Close satisfies io.Closer
func (sdi *StateDiffIndexer) Close() error {
return sdi.fileWriter.Close()

View File

@ -219,6 +219,11 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
return blockTx, err
}
// HasBlock checks whether the indicated block already exists in the database.
func (sdi *StateDiffIndexer) HasBlock(hash common.Hash, number uint64) (bool, error) {
return sdi.dbWriter.hasHeader(hash, number)
}
// processHeader publishes and indexes a header IPLD in Postgres
// it returns the headerID
func (sdi *StateDiffIndexer) processHeader(tx *BatchTx, header *types.Header, headerNode ipld.IPLD, reward, td *big.Int) (string, error) {

View File

@ -45,6 +45,7 @@ type Driver interface {
// Statements interface to accommodate different SQL query syntax
type Statements interface {
ExistsHeaderStm() string
InsertHeaderStm() string
InsertUncleStm() string
InsertTxStm() string

View File

@ -17,6 +17,8 @@
package postgres
import (
"fmt"
"github.com/ethereum/go-ethereum/statediff/indexer/database/sql"
"github.com/ethereum/go-ethereum/statediff/indexer/shared/schema"
)
@ -39,6 +41,10 @@ type DB struct {
sql.Driver
}
func (db *DB) ExistsHeaderStm() string {
return fmt.Sprintf("SELECT EXISTS(SELECT 1 from %s WHERE block_number = $1 AND block_hash = $2 LIMIT 1)", schema.TableHeader.Name)
}
// InsertHeaderStm satisfies the sql.Statements interface
// Stm == Statement
func (db *DB) InsertHeaderStm() string {

View File

@ -20,6 +20,8 @@ import (
"fmt"
"strconv"
"github.com/ethereum/go-ethereum/common"
"github.com/jackc/pgtype"
shopspring "github.com/jackc/pgtype/ext/shopspring-numeric"
"github.com/lib/pq"
@ -46,6 +48,11 @@ func (w *Writer) Close() error {
return w.db.Close()
}
func (w *Writer) hasHeader(blockHash common.Hash, blockNumber uint64) (exists bool, err error) {
err = w.db.QueryRow(w.db.Context(), w.db.ExistsHeaderStm(), blockNumber, blockHash.String()).Scan(&exists)
return exists, err
}
/*
INSERT INTO eth.header_cids (block_number, block_hash, parent_hash, cid, td, node_ids, reward, state_root, tx_root, receipt_root, uncles_hash, bloom, timestamp, coinbase)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14)

View File

@ -29,6 +29,7 @@ import (
// StateDiffIndexer interface required to index statediff data
type StateDiffIndexer interface {
HasBlock(hash common.Hash, number uint64) (bool, error)
PushBlock(block *types.Block, receipts types.Receipts, totalDifficulty *big.Int) (Batch, error)
PushStateNode(tx Batch, stateNode sdtypes.StateLeafNode, headerID string) error
PushIPLD(tx Batch, ipld sdtypes.IPLD) error

View File

@ -146,8 +146,10 @@ type Service struct {
// Job ID ticker
lastJobID uint64
// In flight jobs (for WriteStateDiffAt)
currentJobs map[uint64]JobID
currentJobsMutex sync.Mutex
currentJobs map[uint64]JobID
currentJobsMutex sync.Mutex
currentBlocks map[string]bool
currentBlocksMutex sync.Mutex
}
// IDs used for tracking in-progress jobs (0 for invalid)
@ -209,21 +211,24 @@ func New(stack *node.Node, ethServ *eth.Ethereum, cfg *ethconfig.Config, params
}
sds := &Service{
Mutex: sync.Mutex{},
BlockChain: blockChain,
Builder: NewBuilder(blockChain.StateCache()),
QuitChan: quitCh,
Subscriptions: make(map[common.Hash]map[rpc.ID]Subscription),
SubscriptionTypes: make(map[common.Hash]Params),
BlockCache: NewBlockCache(workers),
BackendAPI: backend,
WaitForSync: params.WaitForSync,
indexer: indexer,
enableWriteLoop: params.EnableWriteLoop,
numWorkers: workers,
maxRetry: defaultRetryLimit,
jobStatusSubs: map[rpc.ID]statusSubscription{},
currentJobs: map[uint64]JobID{},
Mutex: sync.Mutex{},
BlockChain: blockChain,
Builder: NewBuilder(blockChain.StateCache()),
QuitChan: quitCh,
Subscriptions: make(map[common.Hash]map[rpc.ID]Subscription),
SubscriptionTypes: make(map[common.Hash]Params),
BlockCache: NewBlockCache(workers),
BackendAPI: backend,
WaitForSync: params.WaitForSync,
indexer: indexer,
enableWriteLoop: params.EnableWriteLoop,
numWorkers: workers,
maxRetry: defaultRetryLimit,
jobStatusSubs: map[rpc.ID]statusSubscription{},
currentJobs: map[uint64]JobID{},
currentJobsMutex: sync.Mutex{},
currentBlocks: map[string]bool{},
currentBlocksMutex: sync.Mutex{},
}
stack.RegisterLifecycle(sds)
stack.RegisterAPIs(sds.APIs())
@ -246,21 +251,24 @@ func NewService(blockChain blockChain, cfg Config, backend ethapi.Backend, index
quitCh := make(chan bool)
sds := &Service{
Mutex: sync.Mutex{},
BlockChain: blockChain,
Builder: NewBuilder(blockChain.StateCache()),
QuitChan: quitCh,
Subscriptions: make(map[common.Hash]map[rpc.ID]Subscription),
SubscriptionTypes: make(map[common.Hash]Params),
BlockCache: NewBlockCache(workers),
BackendAPI: backend,
WaitForSync: cfg.WaitForSync,
indexer: indexer,
enableWriteLoop: cfg.EnableWriteLoop,
numWorkers: workers,
maxRetry: defaultRetryLimit,
jobStatusSubs: map[rpc.ID]statusSubscription{},
currentJobs: map[uint64]JobID{},
Mutex: sync.Mutex{},
BlockChain: blockChain,
Builder: NewBuilder(blockChain.StateCache()),
QuitChan: quitCh,
Subscriptions: make(map[common.Hash]map[rpc.ID]Subscription),
SubscriptionTypes: make(map[common.Hash]Params),
BlockCache: NewBlockCache(workers),
BackendAPI: backend,
WaitForSync: cfg.WaitForSync,
indexer: indexer,
enableWriteLoop: cfg.EnableWriteLoop,
numWorkers: workers,
maxRetry: defaultRetryLimit,
jobStatusSubs: map[rpc.ID]statusSubscription{},
currentJobs: map[uint64]JobID{},
currentJobsMutex: sync.Mutex{},
currentBlocks: map[string]bool{},
currentBlocksMutex: sync.Mutex{},
}
if indexer != nil {
@ -808,8 +816,38 @@ func (sds *Service) WriteStateDiffFor(blockHash common.Hash, params Params) erro
return sds.writeStateDiffWithRetry(currentBlock, parentRoot, params)
}
// Claim exclusive access for state diffing the specified block.
// Returns true and a function to release access if successful, else false, nil.
func (sds *Service) claimExclusiveAccess(block *types.Block) (bool, func()) {
sds.currentBlocksMutex.Lock()
defer sds.currentBlocksMutex.Unlock()
key := fmt.Sprintf("%s,%d", block.Hash().Hex(), block.NumberU64())
if sds.currentBlocks[key] {
return false, nil
}
sds.currentBlocks[key] = true
return true, func() {
sds.currentBlocksMutex.Lock()
defer sds.currentBlocksMutex.Unlock()
delete(sds.currentBlocks, key)
}
}
// Writes a state diff from the current block, parent state root, and provided params
func (sds *Service) writeStateDiff(block *types.Block, parentRoot common.Hash, params Params) error {
if granted, relinquish := sds.claimExclusiveAccess(block); granted {
defer relinquish()
} else {
log.Info("Not writing, statediff in progress.", "number", block.NumberU64(), "hash", block.Hash().Hex())
return nil
}
if done, _ := sds.indexer.HasBlock(block.Hash(), block.NumberU64()); done {
log.Info("Not writing, statediff already done.", "number", block.NumberU64(), "hash", block.Hash().Hex())
return nil
}
var totalDifficulty *big.Int
var receipts types.Receipts
var err error

View File

@ -34,6 +34,10 @@ type StateDiffIndexer struct{}
type batch struct{}
func (sdi *StateDiffIndexer) HasBlock(hash common.Hash, number uint64) (bool, error) {
return false, nil
}
func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receipts, totalDifficulty *big.Int) (interfaces.Batch, error) {
return &batch{}, nil
}