diff --git a/statediff/indexer/database/dump/indexer.go b/statediff/indexer/database/dump/indexer.go index 0e0db9156..9b54cd699 100644 --- a/statediff/indexer/database/dump/indexer.go +++ b/statediff/indexer/database/dump/indexer.go @@ -412,6 +412,12 @@ func (sdi *StateDiffIndexer) PushIPLD(batch interfaces.Batch, ipld sdtypes.IPLD) return nil } +// HasBlock checks whether the indicated block already exists in the output. +// In the "dump" case, this is presumed to be false. +func (sdi *StateDiffIndexer) HasBlock(hash common.Hash, number uint64) (bool, error) { + return false, nil +} + // Close satisfies io.Closer func (sdi *StateDiffIndexer) Close() error { return sdi.dump.Close() diff --git a/statediff/indexer/database/file/indexer.go b/statediff/indexer/database/file/indexer.go index d4f6a8f4b..291aba16e 100644 --- a/statediff/indexer/database/file/indexer.go +++ b/statediff/indexer/database/file/indexer.go @@ -461,6 +461,12 @@ func (sdi *StateDiffIndexer) PushIPLD(batch interfaces.Batch, ipld sdtypes.IPLD) return nil } +// HasBlock checks whether the indicated block already exists in the output. +// In the "file" case this is presumed to be false. +func (sdi *StateDiffIndexer) HasBlock(hash common.Hash, number uint64) (bool, error) { + return false, nil +} + // Close satisfies io.Closer func (sdi *StateDiffIndexer) Close() error { return sdi.fileWriter.Close() diff --git a/statediff/indexer/database/sql/indexer.go b/statediff/indexer/database/sql/indexer.go index 8a6228fa5..48da1fee9 100644 --- a/statediff/indexer/database/sql/indexer.go +++ b/statediff/indexer/database/sql/indexer.go @@ -219,6 +219,11 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip return blockTx, err } +// HasBlock checks whether the indicated block already exists in the database. +func (sdi *StateDiffIndexer) HasBlock(hash common.Hash, number uint64) (bool, error) { + return sdi.dbWriter.hasHeader(hash, number) +} + // processHeader publishes and indexes a header IPLD in Postgres // it returns the headerID func (sdi *StateDiffIndexer) processHeader(tx *BatchTx, header *types.Header, headerNode ipld.IPLD, reward, td *big.Int) (string, error) { diff --git a/statediff/indexer/database/sql/interfaces.go b/statediff/indexer/database/sql/interfaces.go index f964a2a90..685a9e6e9 100644 --- a/statediff/indexer/database/sql/interfaces.go +++ b/statediff/indexer/database/sql/interfaces.go @@ -45,6 +45,7 @@ type Driver interface { // Statements interface to accommodate different SQL query syntax type Statements interface { + ExistsHeaderStm() string InsertHeaderStm() string InsertUncleStm() string InsertTxStm() string diff --git a/statediff/indexer/database/sql/postgres/database.go b/statediff/indexer/database/sql/postgres/database.go index b371a83b2..173608b60 100644 --- a/statediff/indexer/database/sql/postgres/database.go +++ b/statediff/indexer/database/sql/postgres/database.go @@ -17,6 +17,8 @@ package postgres import ( + "fmt" + "github.com/ethereum/go-ethereum/statediff/indexer/database/sql" "github.com/ethereum/go-ethereum/statediff/indexer/shared/schema" ) @@ -39,6 +41,10 @@ type DB struct { sql.Driver } +func (db *DB) ExistsHeaderStm() string { + return fmt.Sprintf("SELECT EXISTS(SELECT 1 from %s WHERE block_number = $1 AND block_hash = $2 LIMIT 1)", schema.TableHeader.Name) +} + // InsertHeaderStm satisfies the sql.Statements interface // Stm == Statement func (db *DB) InsertHeaderStm() string { diff --git a/statediff/indexer/database/sql/writer.go b/statediff/indexer/database/sql/writer.go index 9ff85d195..113d2f48d 100644 --- a/statediff/indexer/database/sql/writer.go +++ b/statediff/indexer/database/sql/writer.go @@ -20,6 +20,8 @@ import ( "fmt" "strconv" + "github.com/ethereum/go-ethereum/common" + "github.com/jackc/pgtype" shopspring "github.com/jackc/pgtype/ext/shopspring-numeric" "github.com/lib/pq" @@ -46,6 +48,11 @@ func (w *Writer) Close() error { return w.db.Close() } +func (w *Writer) hasHeader(blockHash common.Hash, blockNumber uint64) (exists bool, err error) { + err = w.db.QueryRow(w.db.Context(), w.db.ExistsHeaderStm(), blockNumber, blockHash.String()).Scan(&exists) + return exists, err +} + /* INSERT INTO eth.header_cids (block_number, block_hash, parent_hash, cid, td, node_ids, reward, state_root, tx_root, receipt_root, uncles_hash, bloom, timestamp, coinbase) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14) diff --git a/statediff/indexer/interfaces/interfaces.go b/statediff/indexer/interfaces/interfaces.go index 9836d6a86..63d5bc353 100644 --- a/statediff/indexer/interfaces/interfaces.go +++ b/statediff/indexer/interfaces/interfaces.go @@ -29,6 +29,7 @@ import ( // StateDiffIndexer interface required to index statediff data type StateDiffIndexer interface { + HasBlock(hash common.Hash, number uint64) (bool, error) PushBlock(block *types.Block, receipts types.Receipts, totalDifficulty *big.Int) (Batch, error) PushStateNode(tx Batch, stateNode sdtypes.StateLeafNode, headerID string) error PushIPLD(tx Batch, ipld sdtypes.IPLD) error diff --git a/statediff/service.go b/statediff/service.go index ec25d9c6a..b6e0db961 100644 --- a/statediff/service.go +++ b/statediff/service.go @@ -146,8 +146,10 @@ type Service struct { // Job ID ticker lastJobID uint64 // In flight jobs (for WriteStateDiffAt) - currentJobs map[uint64]JobID - currentJobsMutex sync.Mutex + currentJobs map[uint64]JobID + currentJobsMutex sync.Mutex + currentBlocks map[string]bool + currentBlocksMutex sync.Mutex } // IDs used for tracking in-progress jobs (0 for invalid) @@ -209,21 +211,24 @@ func New(stack *node.Node, ethServ *eth.Ethereum, cfg *ethconfig.Config, params } sds := &Service{ - Mutex: sync.Mutex{}, - BlockChain: blockChain, - Builder: NewBuilder(blockChain.StateCache()), - QuitChan: quitCh, - Subscriptions: make(map[common.Hash]map[rpc.ID]Subscription), - SubscriptionTypes: make(map[common.Hash]Params), - BlockCache: NewBlockCache(workers), - BackendAPI: backend, - WaitForSync: params.WaitForSync, - indexer: indexer, - enableWriteLoop: params.EnableWriteLoop, - numWorkers: workers, - maxRetry: defaultRetryLimit, - jobStatusSubs: map[rpc.ID]statusSubscription{}, - currentJobs: map[uint64]JobID{}, + Mutex: sync.Mutex{}, + BlockChain: blockChain, + Builder: NewBuilder(blockChain.StateCache()), + QuitChan: quitCh, + Subscriptions: make(map[common.Hash]map[rpc.ID]Subscription), + SubscriptionTypes: make(map[common.Hash]Params), + BlockCache: NewBlockCache(workers), + BackendAPI: backend, + WaitForSync: params.WaitForSync, + indexer: indexer, + enableWriteLoop: params.EnableWriteLoop, + numWorkers: workers, + maxRetry: defaultRetryLimit, + jobStatusSubs: map[rpc.ID]statusSubscription{}, + currentJobs: map[uint64]JobID{}, + currentJobsMutex: sync.Mutex{}, + currentBlocks: map[string]bool{}, + currentBlocksMutex: sync.Mutex{}, } stack.RegisterLifecycle(sds) stack.RegisterAPIs(sds.APIs()) @@ -246,21 +251,24 @@ func NewService(blockChain blockChain, cfg Config, backend ethapi.Backend, index quitCh := make(chan bool) sds := &Service{ - Mutex: sync.Mutex{}, - BlockChain: blockChain, - Builder: NewBuilder(blockChain.StateCache()), - QuitChan: quitCh, - Subscriptions: make(map[common.Hash]map[rpc.ID]Subscription), - SubscriptionTypes: make(map[common.Hash]Params), - BlockCache: NewBlockCache(workers), - BackendAPI: backend, - WaitForSync: cfg.WaitForSync, - indexer: indexer, - enableWriteLoop: cfg.EnableWriteLoop, - numWorkers: workers, - maxRetry: defaultRetryLimit, - jobStatusSubs: map[rpc.ID]statusSubscription{}, - currentJobs: map[uint64]JobID{}, + Mutex: sync.Mutex{}, + BlockChain: blockChain, + Builder: NewBuilder(blockChain.StateCache()), + QuitChan: quitCh, + Subscriptions: make(map[common.Hash]map[rpc.ID]Subscription), + SubscriptionTypes: make(map[common.Hash]Params), + BlockCache: NewBlockCache(workers), + BackendAPI: backend, + WaitForSync: cfg.WaitForSync, + indexer: indexer, + enableWriteLoop: cfg.EnableWriteLoop, + numWorkers: workers, + maxRetry: defaultRetryLimit, + jobStatusSubs: map[rpc.ID]statusSubscription{}, + currentJobs: map[uint64]JobID{}, + currentJobsMutex: sync.Mutex{}, + currentBlocks: map[string]bool{}, + currentBlocksMutex: sync.Mutex{}, } if indexer != nil { @@ -808,8 +816,38 @@ func (sds *Service) WriteStateDiffFor(blockHash common.Hash, params Params) erro return sds.writeStateDiffWithRetry(currentBlock, parentRoot, params) } +// Claim exclusive access for state diffing the specified block. +// Returns true and a function to release access if successful, else false, nil. +func (sds *Service) claimExclusiveAccess(block *types.Block) (bool, func()) { + sds.currentBlocksMutex.Lock() + defer sds.currentBlocksMutex.Unlock() + + key := fmt.Sprintf("%s,%d", block.Hash().Hex(), block.NumberU64()) + if sds.currentBlocks[key] { + return false, nil + } + sds.currentBlocks[key] = true + return true, func() { + sds.currentBlocksMutex.Lock() + defer sds.currentBlocksMutex.Unlock() + delete(sds.currentBlocks, key) + } +} + // Writes a state diff from the current block, parent state root, and provided params func (sds *Service) writeStateDiff(block *types.Block, parentRoot common.Hash, params Params) error { + if granted, relinquish := sds.claimExclusiveAccess(block); granted { + defer relinquish() + } else { + log.Info("Not writing, statediff in progress.", "number", block.NumberU64(), "hash", block.Hash().Hex()) + return nil + } + + if done, _ := sds.indexer.HasBlock(block.Hash(), block.NumberU64()); done { + log.Info("Not writing, statediff already done.", "number", block.NumberU64(), "hash", block.Hash().Hex()) + return nil + } + var totalDifficulty *big.Int var receipts types.Receipts var err error diff --git a/statediff/test_helpers/mocks/indexer.go b/statediff/test_helpers/mocks/indexer.go index 0524fbc14..767f436f1 100644 --- a/statediff/test_helpers/mocks/indexer.go +++ b/statediff/test_helpers/mocks/indexer.go @@ -34,6 +34,10 @@ type StateDiffIndexer struct{} type batch struct{} +func (sdi *StateDiffIndexer) HasBlock(hash common.Hash, number uint64) (bool, error) { + return false, nil +} + func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receipts, totalDifficulty *big.Int) (interfaces.Batch, error) { return &batch{}, nil }