diff --git a/statediff/indexer/database/dump/indexer.go b/statediff/indexer/database/dump/indexer.go
index 21193d76b..241824976 100644
--- a/statediff/indexer/database/dump/indexer.go
+++ b/statediff/indexer/database/dump/indexer.go
@@ -504,6 +504,12 @@ func (sdi *StateDiffIndexer) PushCodeAndCodeHash(batch interfaces.Batch, codeAnd
 	return nil
 }
 
+// HasBlock checks whether the indicated block already exists in the output.
+// In the "dump" case, this is presumed to be false.
+func (sdi *StateDiffIndexer) HasBlock(hash common.Hash, number uint64) (bool, error) {
+	return false, nil
+}
+
 // Close satisfies io.Closer
 func (sdi *StateDiffIndexer) Close() error {
 	return sdi.dump.Close()
diff --git a/statediff/indexer/database/file/indexer.go b/statediff/indexer/database/file/indexer.go
index 0618d7832..388535620 100644
--- a/statediff/indexer/database/file/indexer.go
+++ b/statediff/indexer/database/file/indexer.go
@@ -542,6 +542,12 @@ func (sdi *StateDiffIndexer) PushCodeAndCodeHash(batch interfaces.Batch, codeAnd
 	return nil
 }
 
+// HasBlock checks whether the indicated block already exists in the output.
+// In the "file" case, this is presumed to be false.
+func (sdi *StateDiffIndexer) HasBlock(hash common.Hash, number uint64) (bool, error) {
+	return false, nil
+}
+
 // Close satisfies io.Closer
 func (sdi *StateDiffIndexer) Close() error {
 	return sdi.fileWriter.Close()
diff --git a/statediff/indexer/database/sql/indexer.go b/statediff/indexer/database/sql/indexer.go
index 2f8336c6f..123c3800c 100644
--- a/statediff/indexer/database/sql/indexer.go
+++ b/statediff/indexer/database/sql/indexer.go
@@ -227,6 +227,11 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
 	return blockTx, err
 }
 
+// HasBlock checks whether the indicated block already exists in the database.
+func (sdi *StateDiffIndexer) HasBlock(hash common.Hash, number uint64) (bool, error) {
+	return sdi.dbWriter.hasHeader(hash, number)
+}
+
 // processHeader publishes and indexes a header IPLD in Postgres
 // it returns the headerID
 func (sdi *StateDiffIndexer) processHeader(tx *BatchTx, header *types.Header, headerNode node.Node, reward, td *big.Int) (string, error) {
diff --git a/statediff/indexer/database/sql/interfaces.go b/statediff/indexer/database/sql/interfaces.go
index f85f33ae4..033c2562b 100644
--- a/statediff/indexer/database/sql/interfaces.go
+++ b/statediff/indexer/database/sql/interfaces.go
@@ -45,6 +45,7 @@ type Driver interface {
 
 // Statements interface to accommodate different SQL query syntax
 type Statements interface {
+	ExistsHeaderStm() string
 	InsertHeaderStm() string
 	InsertUncleStm() string
 	InsertTxStm() string
diff --git a/statediff/indexer/database/sql/postgres/database.go b/statediff/indexer/database/sql/postgres/database.go
index 8ee86251a..62dbab2ae 100644
--- a/statediff/indexer/database/sql/postgres/database.go
+++ b/statediff/indexer/database/sql/postgres/database.go
@@ -16,7 +16,9 @@
 
 package postgres
 
-import "github.com/ethereum/go-ethereum/statediff/indexer/database/sql"
+import (
+	"github.com/ethereum/go-ethereum/statediff/indexer/database/sql"
+)
 
 var _ sql.Database = &DB{}
 
@@ -36,6 +38,11 @@ type DB struct {
 	sql.Driver
 }
 
+// ExistsHeaderStm satisfies the sql.Statements interface
+func (db *DB) ExistsHeaderStm() string {
+	return "SELECT EXISTS(SELECT 1 from eth.header_cids WHERE block_number = $1 AND block_hash = $2 LIMIT 1)"
+}
+
 // InsertHeaderStm satisfies the sql.Statements interface
 // Stm == Statement
 func (db *DB) InsertHeaderStm() string {
diff --git a/statediff/indexer/database/sql/writer.go b/statediff/indexer/database/sql/writer.go
index 3e281943b..89d081118 100644
--- a/statediff/indexer/database/sql/writer.go
+++ b/statediff/indexer/database/sql/writer.go
@@ -50,6 +50,13 @@ func (w *Writer) Close() error {
 	return w.db.Close()
 }
 
+// hasHeader reports whether a header with the given hash and number is already
+// present, by running the database's ExistsHeaderStm query and scanning its single result.
+func (w *Writer) hasHeader(blockHash common.Hash, blockNumber uint64) (exists bool, err error) {
+	err = w.db.QueryRow(w.db.Context(), w.db.ExistsHeaderStm(), blockNumber, blockHash.String()).Scan(&exists)
+	return exists, err
+}
+
 /*
 INSERT INTO eth.header_cids (block_number, block_hash, parent_hash, cid, td, node_id, reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, coinbase)
 VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16)
diff --git a/statediff/indexer/interfaces/interfaces.go b/statediff/indexer/interfaces/interfaces.go
index 6910e3f49..a39ede8c1 100644
--- a/statediff/indexer/interfaces/interfaces.go
+++ b/statediff/indexer/interfaces/interfaces.go
@@ -29,6 +29,7 @@ import (
 
 // StateDiffIndexer interface required to index statediff data
 type StateDiffIndexer interface {
+	HasBlock(hash common.Hash, number uint64) (bool, error)
 	PushBlock(block *types.Block, receipts types.Receipts, totalDifficulty *big.Int) (Batch, error)
 	PushStateNode(tx Batch, stateNode sdtypes.StateNode, headerID string) error
 	PushCodeAndCodeHash(tx Batch, codeAndCodeHash sdtypes.CodeAndCodeHash) error
diff --git a/statediff/service.go b/statediff/service.go
index 7770c5a79..4d344169e 100644
--- a/statediff/service.go
+++ b/statediff/service.go
@@ -151,8 +151,10 @@ type Service struct {
 	// Job ID ticker
 	lastJobID uint64
 	// In flight jobs (for WriteStateDiffAt)
-	currentJobs      map[uint64]JobID
-	currentJobsMutex sync.Mutex
+	currentJobs        map[uint64]JobID
+	currentJobsMutex   sync.Mutex
+	currentBlocks      map[string]bool
+	currentBlocksMutex sync.Mutex
 }
 
 // IDs used for tracking in-progress jobs (0 for invalid)
@@ -214,21 +216,24 @@ func New(stack *node.Node, ethServ *eth.Ethereum, cfg *ethconfig.Config, params
 	}
 
 	sds := &Service{
-		Mutex:             sync.Mutex{},
-		BlockChain:        blockChain,
-		Builder:           NewBuilder(blockChain.StateCache()),
-		QuitChan:          quitCh,
-		Subscriptions:     make(map[common.Hash]map[rpc.ID]Subscription),
-		SubscriptionTypes: make(map[common.Hash]Params),
-		BlockCache:        NewBlockCache(workers),
-		BackendAPI:        backend,
-		WaitForSync:       params.WaitForSync,
-		indexer:           indexer,
-		enableWriteLoop:   params.EnableWriteLoop,
-		numWorkers:        workers,
-		maxRetry:          defaultRetryLimit,
-		jobStatusSubs:     map[rpc.ID]statusSubscription{},
-		currentJobs:       map[uint64]JobID{},
+		Mutex:              sync.Mutex{},
+		BlockChain:         blockChain,
+		Builder:            NewBuilder(blockChain.StateCache()),
+		QuitChan:           quitCh,
+		Subscriptions:      make(map[common.Hash]map[rpc.ID]Subscription),
+		SubscriptionTypes:  make(map[common.Hash]Params),
+		BlockCache:         NewBlockCache(workers),
+		BackendAPI:         backend,
+		WaitForSync:        params.WaitForSync,
+		indexer:            indexer,
+		enableWriteLoop:    params.EnableWriteLoop,
+		numWorkers:         workers,
+		maxRetry:           defaultRetryLimit,
+		jobStatusSubs:      map[rpc.ID]statusSubscription{},
+		currentJobs:        map[uint64]JobID{},
+		currentJobsMutex:   sync.Mutex{},
+		currentBlocks:      map[string]bool{},
+		currentBlocksMutex: sync.Mutex{},
 	}
 	stack.RegisterLifecycle(sds)
 	stack.RegisterAPIs(sds.APIs())
@@ -251,21 +256,24 @@ func NewService(blockChain blockChain, cfg Config, backend ethapi.Backend, index
 	quitCh := make(chan bool)
 
 	sds := &Service{
-		Mutex:             sync.Mutex{},
-		BlockChain:        blockChain,
-		Builder:           NewBuilder(blockChain.StateCache()),
-		QuitChan:          quitCh,
-		Subscriptions:     make(map[common.Hash]map[rpc.ID]Subscription),
-		SubscriptionTypes: make(map[common.Hash]Params),
-		BlockCache:        NewBlockCache(workers),
-		BackendAPI:        backend,
-		WaitForSync:       cfg.WaitForSync,
-		indexer:           indexer,
-		enableWriteLoop:   cfg.EnableWriteLoop,
-		numWorkers:        workers,
-		maxRetry:          defaultRetryLimit,
-		jobStatusSubs:     map[rpc.ID]statusSubscription{},
-		currentJobs:       map[uint64]JobID{},
+		Mutex:              sync.Mutex{},
+		BlockChain:         blockChain,
+		Builder:            NewBuilder(blockChain.StateCache()),
+		QuitChan:           quitCh,
+		Subscriptions:      make(map[common.Hash]map[rpc.ID]Subscription),
+		SubscriptionTypes:  make(map[common.Hash]Params),
+		BlockCache:         NewBlockCache(workers),
+		BackendAPI:         backend,
+		WaitForSync:        cfg.WaitForSync,
+		indexer:            indexer,
+		enableWriteLoop:    cfg.EnableWriteLoop,
+		numWorkers:         workers,
+		maxRetry:           defaultRetryLimit,
+		jobStatusSubs:      map[rpc.ID]statusSubscription{},
+		currentJobs:        map[uint64]JobID{},
+		currentJobsMutex:   sync.Mutex{},
+		currentBlocks:      map[string]bool{},
+		currentBlocksMutex: sync.Mutex{},
 	}
 
 	if indexer != nil {
@@ -877,8 +885,41 @@ func (sds *Service) WriteStateDiffFor(blockHash common.Hash, params Params) erro
 	return sds.writeStateDiffWithRetry(currentBlock, parentRoot, params)
 }
 
+// claimExclusiveAccess claims exclusive access for state diffing the specified block.
+// Returns true and a function to release access if successful, else false, nil.
+func (sds *Service) claimExclusiveAccess(block *types.Block) (bool, func()) {
+	sds.currentBlocksMutex.Lock()
+	defer sds.currentBlocksMutex.Unlock()
+
+	key := fmt.Sprintf("%s,%d", block.Hash().Hex(), block.NumberU64())
+	if sds.currentBlocks[key] {
+		return false, nil
+	}
+	sds.currentBlocks[key] = true
+	return true, func() {
+		sds.currentBlocksMutex.Lock()
+		defer sds.currentBlocksMutex.Unlock()
+		delete(sds.currentBlocks, key)
+	}
+}
+
 // Writes a state diff from the current block, parent state root, and provided params
 func (sds *Service) writeStateDiff(block *types.Block, parentRoot common.Hash, params Params) error {
+	granted, relinquish := sds.claimExclusiveAccess(block)
+	if !granted {
+		log.Info("Not writing, statediff in progress.", "number", block.NumberU64(), "hash", block.Hash().Hex())
+		return nil
+	}
+	defer relinquish()
+
+	if done, err := sds.indexer.HasBlock(block.Hash(), block.NumberU64()); err != nil {
+		// Best effort: a failed existence check does not stop the write, but it is no longer dropped silently.
+		log.Warn("Unable to check for existing statediff, writing anyway.", "number", block.NumberU64(), "hash", block.Hash().Hex(), "err", err)
+	} else if done {
+		log.Info("Not writing, statediff already done.", "number", block.NumberU64(), "hash", block.Hash().Hex())
+		return nil
+	}
+
 	var totalDifficulty *big.Int
 	var receipts types.Receipts
 	var err error
diff --git a/statediff/test_helpers/mocks/indexer.go b/statediff/test_helpers/mocks/indexer.go
index 15794b237..dd00c69d6 100644
--- a/statediff/test_helpers/mocks/indexer.go
+++ b/statediff/test_helpers/mocks/indexer.go
@@ -34,6 +34,11 @@ type StateDiffIndexer struct{}
 
 type batch struct{}
 
+// HasBlock always reports that the block does not exist yet.
+func (sdi *StateDiffIndexer) HasBlock(hash common.Hash, number uint64) (bool, error) {
+	return false, nil
+}
+
 func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receipts, totalDifficulty *big.Int) (interfaces.Batch, error) {
 	return &batch{}, nil
 }