diff --git a/indexer/database/dump/indexer.go b/indexer/database/dump/indexer.go index a65966a..2bbb49b 100644 --- a/indexer/database/dump/indexer.go +++ b/indexer/database/dump/indexer.go @@ -413,6 +413,12 @@ func (sdi *StateDiffIndexer) PushIPLD(batch interfaces.Batch, ipld sdtypes.IPLD) return nil } +// HasBlock checks whether the indicated block already exists in the output. +// In the "dump" case, this is presumed to be false. +func (sdi *StateDiffIndexer) HasBlock(hash common.Hash, number uint64) (bool, error) { + return false, nil +} + // Close satisfies io.Closer func (sdi *StateDiffIndexer) Close() error { return sdi.dump.Close() diff --git a/indexer/database/file/indexer.go b/indexer/database/file/indexer.go index 15cbcb6..dd28469 100644 --- a/indexer/database/file/indexer.go +++ b/indexer/database/file/indexer.go @@ -459,6 +459,12 @@ func (sdi *StateDiffIndexer) PushIPLD(batch interfaces.Batch, ipld sdtypes.IPLD) return nil } +// HasBlock checks whether the indicated block already exists in the output. +// In the "file" case this is presumed to be false. +func (sdi *StateDiffIndexer) HasBlock(hash common.Hash, number uint64) (bool, error) { + return false, nil +} + // Close satisfies io.Closer func (sdi *StateDiffIndexer) Close() error { return sdi.fileWriter.Close() diff --git a/indexer/database/sql/indexer.go b/indexer/database/sql/indexer.go index 95b5ed1..a528cbd 100644 --- a/indexer/database/sql/indexer.go +++ b/indexer/database/sql/indexer.go @@ -25,7 +25,7 @@ import ( "math/big" "time" - core "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/metrics" @@ -248,7 +248,7 @@ func (sdi *StateDiffIndexer) processHeader(tx *BatchTx, header *types.Header, he } // processUncles publishes and indexes uncle IPLDs in Postgres -func (sdi *StateDiffIndexer) processUncles(tx *BatchTx, headerID string, blockNumber *big.Int, unclesHash core.Hash, uncles []*types.Header) error { +func (sdi *StateDiffIndexer) processUncles(tx *BatchTx, headerID string, blockNumber *big.Int, unclesHash common.Hash, uncles []*types.Header) error { // publish and index uncles uncleEncoding, err := rlp.EncodeToBytes(uncles) if err != nil { @@ -349,7 +349,7 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BatchTx, args processArgs if len(receipt.PostState) == 0 { rctModel.PostStatus = receipt.Status } else { - rctModel.PostState = core.BytesToHash(receipt.PostState).String() + rctModel.PostState = common.BytesToHash(receipt.PostState).String() } if err := sdi.dbWriter.upsertReceiptCID(tx.dbtx, rctModel); err != nil { @@ -400,7 +400,7 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt stateModel = models.StateNodeModel{ BlockNumber: tx.BlockNumber, HeaderID: headerID, - StateKey: core.BytesToHash(stateNode.AccountWrapper.LeafKey).String(), + StateKey: common.BytesToHash(stateNode.AccountWrapper.LeafKey).String(), CID: shared.RemovedNodeStateCID, Removed: true, } @@ -408,12 +408,12 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt stateModel = models.StateNodeModel{ BlockNumber: tx.BlockNumber, HeaderID: headerID, - StateKey: core.BytesToHash(stateNode.AccountWrapper.LeafKey).String(), + StateKey: common.BytesToHash(stateNode.AccountWrapper.LeafKey).String(), CID: stateNode.AccountWrapper.CID, Removed: false, Balance: stateNode.AccountWrapper.Account.Balance.String(), Nonce: stateNode.AccountWrapper.Account.Nonce, - CodeHash: core.BytesToHash(stateNode.AccountWrapper.Account.CodeHash).String(), + CodeHash: common.BytesToHash(stateNode.AccountWrapper.Account.CodeHash).String(), StorageRoot: stateNode.AccountWrapper.Account.Root.String(), } } @@ -430,8 +430,8 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt storageModel := models.StorageNodeModel{ BlockNumber: tx.BlockNumber, HeaderID: headerID, - StateKey: core.BytesToHash(stateNode.AccountWrapper.LeafKey).String(), - StorageKey: core.BytesToHash(storageNode.LeafKey).String(), + StateKey: common.BytesToHash(stateNode.AccountWrapper.LeafKey).String(), + StorageKey: common.BytesToHash(storageNode.LeafKey).String(), CID: shared.RemovedNodeStorageCID, Removed: true, Value: []byte{}, @@ -444,8 +444,8 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt storageModel := models.StorageNodeModel{ BlockNumber: tx.BlockNumber, HeaderID: headerID, - StateKey: core.BytesToHash(stateNode.AccountWrapper.LeafKey).String(), - StorageKey: core.BytesToHash(storageNode.LeafKey).String(), + StateKey: common.BytesToHash(stateNode.AccountWrapper.LeafKey).String(), + StorageKey: common.BytesToHash(storageNode.LeafKey).String(), CID: storageNode.CID, Removed: false, Value: storageNode.Value, @@ -468,6 +468,11 @@ func (sdi *StateDiffIndexer) PushIPLD(batch interfaces.Batch, ipld sdtypes.IPLD) return nil } +// HasBlock checks whether the indicated block already exists in the database. +func (sdi *StateDiffIndexer) HasBlock(hash common.Hash, number uint64) (bool, error) { + return sdi.dbWriter.hasHeader(hash, number) +} + // Close satisfies io.Closer func (sdi *StateDiffIndexer) Close() error { return sdi.dbWriter.Close() @@ -476,7 +481,7 @@ func (sdi *StateDiffIndexer) Close() error { // Update the known gaps table with the gap information. // LoadWatchedAddresses reads watched addresses from the database -func (sdi *StateDiffIndexer) LoadWatchedAddresses() ([]core.Address, error) { +func (sdi *StateDiffIndexer) LoadWatchedAddresses() ([]common.Address, error) { addressStrings := make([]string, 0) pgStr := "SELECT address FROM eth_meta.watched_addresses" err := sdi.dbWriter.db.Select(sdi.ctx, &addressStrings, pgStr) @@ -484,9 +489,9 @@ func (sdi *StateDiffIndexer) LoadWatchedAddresses() ([]core.Address, error) { return nil, fmt.Errorf("error loading watched addresses: %v", err) } - watchedAddresses := []core.Address{} + watchedAddresses := []common.Address{} for _, addressString := range addressStrings { - watchedAddresses = append(watchedAddresses, core.HexToAddress(addressString)) + watchedAddresses = append(watchedAddresses, common.HexToAddress(addressString)) } return watchedAddresses, nil diff --git a/indexer/database/sql/interfaces.go b/indexer/database/sql/interfaces.go index 47e63c0..1eaf40a 100644 --- a/indexer/database/sql/interfaces.go +++ b/indexer/database/sql/interfaces.go @@ -45,6 +45,7 @@ type Driver interface { // Statements interface to accommodate different SQL query syntax type Statements interface { + ExistsHeaderStm() string InsertHeaderStm() string InsertUncleStm() string InsertTxStm() string diff --git a/indexer/database/sql/postgres/database.go b/indexer/database/sql/postgres/database.go index 3d122dc..5299d06 100644 --- a/indexer/database/sql/postgres/database.go +++ b/indexer/database/sql/postgres/database.go @@ -17,6 +17,8 @@ package postgres import ( + "fmt" + "github.com/cerc-io/plugeth-statediff/indexer/database/sql" "github.com/cerc-io/plugeth-statediff/indexer/shared/schema" ) @@ -39,6 +41,10 @@ type DB struct { sql.Driver } +func (db *DB) ExistsHeaderStm() string { + return fmt.Sprintf("SELECT EXISTS(SELECT 1 from %s WHERE block_number = $1 AND block_hash = $2 LIMIT 1)", schema.TableHeader.Name) +} + // InsertHeaderStm satisfies the sql.Statements interface // Stm == Statement func (db *DB) InsertHeaderStm() string { diff --git a/indexer/database/sql/writer.go b/indexer/database/sql/writer.go index 5a3b4e9..4f0b2cc 100644 --- a/indexer/database/sql/writer.go +++ b/indexer/database/sql/writer.go @@ -20,6 +20,7 @@ import ( "fmt" "strconv" + "github.com/ethereum/go-ethereum/common" "github.com/jackc/pgtype" shopspring "github.com/jackc/pgtype/ext/shopspring-numeric" "github.com/lib/pq" @@ -46,6 +47,11 @@ func (w *Writer) Close() error { return w.db.Close() } +func (w *Writer) hasHeader(blockHash common.Hash, blockNumber uint64) (exists bool, err error) { + err = w.db.QueryRow(w.db.Context(), w.db.ExistsHeaderStm(), blockNumber, blockHash.String()).Scan(&exists) + return exists, err +} + /* INSERT INTO eth.header_cids (block_number, block_hash, parent_hash, cid, td, node_ids, reward, state_root, tx_root, receipt_root, uncles_hash, bloom, timestamp, coinbase) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14) diff --git a/indexer/interfaces/interfaces.go b/indexer/interfaces/interfaces.go index 596b330..94ed2af 100644 --- a/indexer/interfaces/interfaces.go +++ b/indexer/interfaces/interfaces.go @@ -28,6 +28,7 @@ import ( // StateDiffIndexer interface required to index statediff data type StateDiffIndexer interface { + HasBlock(hash common.Hash, number uint64) (bool, error) PushBlock(block *types.Block, receipts types.Receipts, totalDifficulty *big.Int) (Batch, error) PushStateNode(tx Batch, stateNode sdtypes.StateLeafNode, headerID string) error PushIPLD(tx Batch, ipld sdtypes.IPLD) error diff --git a/metrics_helpers.go b/metrics_helpers.go index d59abc7..f6cdf3c 100644 --- a/metrics_helpers.go +++ b/metrics_helpers.go @@ -23,9 +23,8 @@ import ( "github.com/ethereum/go-ethereum/core/types" ) -func countStateDiffBegin(block *types.Block) (time.Time, log.Logger) { +func countStateDiffBegin(block *types.Block, logger log.Logger) time.Time { start := time.Now() - logger := log.New("hash", block.Hash().String(), "number", block.NumberU64()) defaultStatediffMetrics.underway.Inc(1) logger.Debug("writeStateDiff BEGIN", @@ -35,7 +34,7 @@ func countStateDiffBegin(block *types.Block) (time.Time, log.Logger) { "total_time", defaultStatediffMetrics.totalProcessingTime.Value(), ) - return start, logger + return start } func countStateDiffEnd(start time.Time, logger log.Logger, err *error) time.Duration { diff --git a/service.go b/service.go index 02d598b..e79e333 100644 --- a/service.go +++ b/service.go @@ -107,6 +107,9 @@ type Service struct { // Map of block number to in-flight jobs (for WriteStateDiffAt) currentJobs map[uint64]JobID currentJobsMutex sync.Mutex + // All in-progress statediff jobs + currentBlocks map[string]bool + currentBlocksMutex sync.Mutex } // ID for identifying client subscriptions @@ -169,6 +172,7 @@ func NewService(cfg Config, blockChain BlockChain, backend plugeth.Backend, inde maxRetry: defaultRetryLimit, jobStatusSubs: map[SubID]jobStatusSubscription{}, currentJobs: map[uint64]JobID{}, + currentBlocks: map[string]bool{}, writeLoopParams: ParamsWithMutex{Params: defaultWriteLoopParams}, } @@ -674,14 +678,45 @@ func (sds *Service) WriteStateDiffFor(blockHash common.Hash, params Params) erro return sds.writeStateDiffWithRetry(currentBlock, parentRoot, sds.maybeReplaceWatchedAddresses(params)) } +// Claim exclusive access for state diffing the specified block. +// Returns true and a function to release access if successful, else false, nil. +func (sds *Service) claimExclusiveAccess(block *types.Block) (bool, func()) { + sds.currentBlocksMutex.Lock() + defer sds.currentBlocksMutex.Unlock() + + key := fmt.Sprintf("%s,%d", block.Hash().Hex(), block.NumberU64()) + if sds.currentBlocks[key] { + return false, nil + } + sds.currentBlocks[key] = true + return true, func() { + sds.currentBlocksMutex.Lock() + defer sds.currentBlocksMutex.Unlock() + delete(sds.currentBlocks, key) + } +} + // Writes a state diff from the current block, parent state root, and provided params func (sds *Service) writeStateDiff(block *types.Block, parentRoot common.Hash, params Params) error { + log := log.New("hash", block.Hash(), "number", block.Number()) + if granted, relinquish := sds.claimExclusiveAccess(block); granted { + defer relinquish() + } else { + log.Info("Not writing, statediff in progress.") + return nil + } + if done, _ := sds.indexer.HasBlock(block.Hash(), block.NumberU64()); done { + log.Info("Not writing, statediff already done.") + return nil + } + var totalDifficulty *big.Int var receipts types.Receipts var err error var tx interfaces.Batch - start, logger := countStateDiffBegin(block) - defer countStateDiffEnd(start, logger, &err) + + start := countStateDiffBegin(block, log) + defer countStateDiffEnd(start, log, &err) if sds.indexer == nil { return fmt.Errorf("indexer is not set; cannot write indexed diffs") } @@ -698,12 +733,12 @@ func (sds *Service) writeStateDiff(block *types.Block, parentRoot common.Hash, p } output := func(node types2.StateLeafNode) error { - defer metrics.ReportAndUpdateDuration("statediff output", time.Now(), logger, + defer metrics.ReportAndUpdateDuration("statediff output", time.Now(), log, metrics.IndexerMetrics.OutputTimer) return sds.indexer.PushStateNode(tx, node, block.Hash().String()) } ipldOutput := func(c types2.IPLD) error { - defer metrics.ReportAndUpdateDuration("statediff ipldOutput", time.Now(), logger, + defer metrics.ReportAndUpdateDuration("statediff ipldOutput", time.Now(), log, metrics.IndexerMetrics.IPLDOutputTimer) return sds.indexer.PushIPLD(tx, c) } diff --git a/test_helpers/mocks/indexer.go b/test_helpers/mocks/indexer.go index 1ad5baa..938b0ab 100644 --- a/test_helpers/mocks/indexer.go +++ b/test_helpers/mocks/indexer.go @@ -35,6 +35,10 @@ type StateDiffIndexer struct{} type batch struct{} +func (sdi *StateDiffIndexer) HasBlock(hash common.Hash, number uint64) (bool, error) { + return false, nil +} + func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receipts, totalDifficulty *big.Int) (interfaces.Batch, error) { return &batch{}, nil }