From 15a49346a8449d43b5eb535131e863c4befe3e2d Mon Sep 17 00:00:00 2001 From: Thomas E Lackey Date: Wed, 1 Feb 2023 21:53:42 -0600 Subject: [PATCH 01/13] Add a new option to the config for capping the maximum difficulty. (#312) * consensus/ethash/consensus.go * Add a new param to the config for capping the maximum difficulty. This is useful for speeding up block creation on a test network. --- consensus/ethash/consensus.go | 9 ++++++++- params/config.go | 21 ++++++++++++++++----- 2 files changed, 24 insertions(+), 6 deletions(-) diff --git a/consensus/ethash/consensus.go b/consensus/ethash/consensus.go index 1c38b80ea..b6e7b9511 100644 --- a/consensus/ethash/consensus.go +++ b/consensus/ethash/consensus.go @@ -31,6 +31,7 @@ import ( "github.com/ethereum/go-ethereum/consensus/misc" "github.com/ethereum/go-ethereum/core/state" "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/trie" @@ -330,7 +331,13 @@ func (ethash *Ethash) verifyHeader(chain consensus.ChainHeaderReader, header, pa // the difficulty that a new block should have when created at time // given the parent block's time and difficulty. func (ethash *Ethash) CalcDifficulty(chain consensus.ChainHeaderReader, time uint64, parent *types.Header) *big.Int { - return CalcDifficulty(chain.Config(), time, parent) + var config = chain.Config() + var ret = CalcDifficulty(config, time, parent) + if nil != config.CappedMaximumDifficulty && ret.Cmp(config.CappedMaximumDifficulty) >= 0 { + log.Info(fmt.Sprintf("Using capped difficulty %d", config.CappedMaximumDifficulty)) + return config.CappedMaximumDifficulty + } + return ret } // CalcDifficulty is the difficulty adjustment algorithm. It returns diff --git a/params/config.go b/params/config.go index 80e671f9b..b56a3a606 100644 --- a/params/config.go +++ b/params/config.go @@ -270,16 +270,16 @@ var ( // // This configuration is intentionally not using keyed fields to force anyone // adding flags to the config to also have to set these fields. - AllEthashProtocolChanges = &ChainConfig{big.NewInt(1337), big.NewInt(0), nil, false, big.NewInt(0), common.Hash{}, big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), nil, nil, nil, nil, false, new(EthashConfig), nil} + AllEthashProtocolChanges = &ChainConfig{big.NewInt(1337), big.NewInt(0), nil, false, big.NewInt(0), common.Hash{}, big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), nil, nil, nil, nil, nil, false, new(EthashConfig), nil} // AllCliqueProtocolChanges contains every protocol change (EIPs) introduced // and accepted by the Ethereum core developers into the Clique consensus. // // This configuration is intentionally not using keyed fields to force anyone // adding flags to the config to also have to set these fields. - AllCliqueProtocolChanges = &ChainConfig{big.NewInt(1337), big.NewInt(0), nil, false, big.NewInt(0), common.Hash{}, big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), nil, nil, nil, nil, nil, nil, false, nil, &CliqueConfig{Period: 0, Epoch: 30000}} + AllCliqueProtocolChanges = &ChainConfig{big.NewInt(1337), big.NewInt(0), nil, false, big.NewInt(0), common.Hash{}, big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), nil, nil, nil, nil, nil, nil, nil, false, nil, &CliqueConfig{Period: 0, Epoch: 30000}} - TestChainConfig = &ChainConfig{big.NewInt(1), big.NewInt(0), nil, false, big.NewInt(0), common.Hash{}, big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), nil, nil, nil, nil, false, new(EthashConfig), nil} + TestChainConfig = &ChainConfig{big.NewInt(1), big.NewInt(0), nil, false, big.NewInt(0), common.Hash{}, big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), nil, nil, nil, nil, nil, false, new(EthashConfig), nil} TestRules = TestChainConfig.Rules(new(big.Int), false) ) @@ -377,6 +377,9 @@ type ChainConfig struct { // the network that triggers the consensus upgrade. TerminalTotalDifficulty *big.Int `json:"terminalTotalDifficulty,omitempty"` + // Cap the maximum total difficulty (for testnet use only). + CappedMaximumDifficulty *big.Int `json:"cappedMaximumDifficulty,omitempty"` + // TerminalTotalDifficultyPassed is a flag specifying that the network already // passed the terminal total difficulty. Its purpose is to disable legacy sync // even without having seen the TTD locally (safer long term). @@ -419,7 +422,11 @@ func (c *ChainConfig) String() string { switch { case c.Ethash != nil: if c.TerminalTotalDifficulty == nil { - banner += "Consensus: Ethash (proof-of-work)\n" + if nil == c.CappedMaximumDifficulty { + banner += "Consensus: Ethash (proof-of-work)\n" + } else { + banner += fmt.Sprintf("Consensus: Ethash (proof-of-work, capped difficulty at %d)\n", c.CappedMaximumDifficulty) + } } else if !c.TerminalTotalDifficultyPassed { banner += "Consensus: Beacon (proof-of-stake), merging from Ethash (proof-of-work)\n" } else { @@ -434,7 +441,11 @@ func (c *ChainConfig) String() string { banner += "Consensus: Beacon (proof-of-stake), merged from Clique (proof-of-authority)\n" } default: - banner += "Consensus: unknown\n" + if nil == c.CappedMaximumDifficulty { + banner += "Consensus: unknown\n" + } else { + banner += fmt.Sprintf("Consensus: unknown (capped difficulty at %d)\n", c.CappedMaximumDifficulty) + } } banner += "\n" -- 2.45.2 From 5504e5432b9ab64122b0edcac42218fb546ee341 Mon Sep 17 00:00:00 2001 From: Thomas E Lackey Date: Mon, 6 Feb 2023 16:51:42 -0600 Subject: [PATCH 02/13] Add finer-grained logging and timers for statediffing. (#313) * More logging for statediffs. * Add count * Tweak logging * lint --- statediff/builder.go | 47 +++++++++++++++++-------- statediff/service.go | 4 ++- statediff/test_helpers/mocks/builder.go | 4 +-- 3 files changed, 37 insertions(+), 18 deletions(-) diff --git a/statediff/builder.go b/statediff/builder.go index 8fdbd5b37..7c72ab7e5 100644 --- a/statediff/builder.go +++ b/statediff/builder.go @@ -22,6 +22,7 @@ package statediff import ( "bytes" "fmt" + "time" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/state" @@ -45,7 +46,7 @@ var ( // Builder interface exposes the method for building a state diff between two blocks type Builder interface { BuildStateDiffObject(args Args, params Params) (types2.StateObject, error) - WriteStateDiffObject(args types2.StateRoots, params Params, output types2.StateNodeSink, ipldOutput types2.IPLDSink) error + WriteStateDiffObject(args Args, params Params, output types2.StateNodeSink, ipldOutput types2.IPLDSink) error } type StateDiffBuilder struct { @@ -86,9 +87,7 @@ func NewBuilder(stateCache state.Database) Builder { func (sdb *StateDiffBuilder) BuildStateDiffObject(args Args, params Params) (types2.StateObject, error) { var stateNodes []types2.StateLeafNode var iplds []types2.IPLD - err := sdb.WriteStateDiffObject( - types2.StateRoots{OldStateRoot: args.OldStateRoot, NewStateRoot: args.NewStateRoot}, - params, StateNodeAppender(&stateNodes), IPLDMappingAppender(&iplds)) + err := sdb.WriteStateDiffObject(args, params, StateNodeAppender(&stateNodes), IPLDMappingAppender(&iplds)) if err != nil { return types2.StateObject{}, err } @@ -101,7 +100,7 @@ func (sdb *StateDiffBuilder) BuildStateDiffObject(args Args, params Params) (typ } // WriteStateDiffObject writes a statediff object to output sinks -func (sdb *StateDiffBuilder) WriteStateDiffObject(args types2.StateRoots, params Params, output types2.StateNodeSink, +func (sdb *StateDiffBuilder) WriteStateDiffObject(args Args, params Params, output types2.StateNodeSink, ipldOutput types2.IPLDSink) error { // Load tries for old and new states oldTrie, err := sdb.StateCache.OpenTrie(args.OldStateRoot) @@ -128,16 +127,19 @@ func (sdb *StateDiffBuilder) WriteStateDiffObject(args types2.StateRoots, params }, } - return sdb.BuildStateDiffWithIntermediateStateNodes(iterPairs, params, output, ipldOutput) + logger := log.New("hash", args.BlockHash.Hex(), "number", args.BlockNumber) + return sdb.BuildStateDiffWithIntermediateStateNodes(iterPairs, params, output, ipldOutput, logger) } func (sdb *StateDiffBuilder) BuildStateDiffWithIntermediateStateNodes(iterPairs []IterPair, params Params, - output types2.StateNodeSink, ipldOutput types2.IPLDSink) error { + output types2.StateNodeSink, ipldOutput types2.IPLDSink, logger log.Logger) error { + start := time.Now() + defer logger.Debug(fmt.Sprintf("statediff BuildStateDiffWithIntermediateStateNodes total duration=%dms", time.Since(start).Milliseconds())) // collect a slice of all the nodes that were touched and exist at B (B-A) // a map of their leafkey to all the accounts that were touched and exist at B // and a slice of all the paths for the nodes in both of the above sets diffAccountsAtB, err := sdb.createdAndUpdatedState( - iterPairs[0].Older, iterPairs[0].Newer, params.watchedAddressesLeafPaths, ipldOutput) + iterPairs[0].Older, iterPairs[0].Newer, params.watchedAddressesLeafPaths, ipldOutput, logger) if err != nil { return fmt.Errorf("error collecting createdAndUpdatedNodes: %v", err) } @@ -146,28 +148,34 @@ func (sdb *StateDiffBuilder) BuildStateDiffWithIntermediateStateNodes(iterPairs // a map of their leafkey to all the accounts that were touched and exist at A diffAccountsAtA, err := sdb.deletedOrUpdatedState( iterPairs[1].Older, iterPairs[1].Newer, diffAccountsAtB, - params.watchedAddressesLeafPaths, output) + params.watchedAddressesLeafPaths, output, logger) if err != nil { return fmt.Errorf("error collecting deletedOrUpdatedNodes: %v", err) } // collect and sort the leafkey keys for both account mappings into a slice + t := time.Now() createKeys := trie_helpers.SortKeys(diffAccountsAtB) deleteKeys := trie_helpers.SortKeys(diffAccountsAtA) + logger.Debug(fmt.Sprintf("statediff BuildStateDiffWithIntermediateStateNodes sort duration=%dms", time.Since(t).Milliseconds())) // and then find the intersection of these keys // these are the leafkeys for the accounts which exist at both A and B but are different // this also mutates the passed in createKeys and deleteKeys, removing the intersection keys // and leaving the truly created or deleted keys in place + t = time.Now() updatedKeys := trie_helpers.FindIntersection(createKeys, deleteKeys) + logger.Debug(fmt.Sprintf("statediff BuildStateDiffWithIntermediateStateNodes intersection count=%d duration=%dms", + len(updatedKeys), + time.Since(t).Milliseconds())) // build the diff nodes for the updated accounts using the mappings at both A and B as directed by the keys found as the intersection of the two - err = sdb.buildAccountUpdates(diffAccountsAtB, diffAccountsAtA, updatedKeys, output, ipldOutput) + err = sdb.buildAccountUpdates(diffAccountsAtB, diffAccountsAtA, updatedKeys, output, ipldOutput, logger) if err != nil { return fmt.Errorf("error building diff for updated accounts: %v", err) } // build the diff nodes for created accounts - err = sdb.buildAccountCreations(diffAccountsAtB, output, ipldOutput) + err = sdb.buildAccountCreations(diffAccountsAtB, output, ipldOutput, logger) if err != nil { return fmt.Errorf("error building diff for created accounts: %v", err) } @@ -179,7 +187,9 @@ func (sdb *StateDiffBuilder) BuildStateDiffWithIntermediateStateNodes(iterPairs // a mapping of their leafkeys to all the accounts that exist in a different state at B than A // and a slice of the paths for all of the nodes included in both func (sdb *StateDiffBuilder) createdAndUpdatedState(a, b trie.NodeIterator, - watchedAddressesLeafPaths [][]byte, output types2.IPLDSink) (types2.AccountMap, error) { + watchedAddressesLeafPaths [][]byte, output types2.IPLDSink, logger log.Logger) (types2.AccountMap, error) { + start := time.Now() + defer logger.Debug(fmt.Sprintf("statediff createdAndUpdatedState duration=%dms", time.Since(start).Milliseconds())) diffAccountsAtB := make(types2.AccountMap) watchingAddresses := len(watchedAddressesLeafPaths) > 0 @@ -275,7 +285,9 @@ func (sdb *StateDiffBuilder) processStateValueNode(it trie.NodeIterator, watched // deletedOrUpdatedState returns a slice of all the pathes that are emptied at B // and a mapping of their leafkeys to all the accounts that exist in a different state at A than B func (sdb *StateDiffBuilder) deletedOrUpdatedState(a, b trie.NodeIterator, diffAccountsAtB types2.AccountMap, - watchedAddressesLeafPaths [][]byte, output types2.StateNodeSink) (types2.AccountMap, error) { + watchedAddressesLeafPaths [][]byte, output types2.StateNodeSink, logger log.Logger) (types2.AccountMap, error) { + start := time.Now() + defer logger.Debug(fmt.Sprintf("statediff deletedOrUpdatedState duration=%dms", time.Since(start).Milliseconds())) diffAccountAtA := make(types2.AccountMap) watchingAddresses := len(watchedAddressesLeafPaths) > 0 @@ -330,7 +342,9 @@ func (sdb *StateDiffBuilder) deletedOrUpdatedState(a, b trie.NodeIterator, diffA // needs to be called before building account creations and deletions as this mutates // those account maps to remove the accounts which were updated func (sdb *StateDiffBuilder) buildAccountUpdates(creations, deletions types2.AccountMap, updatedKeys []string, - output types2.StateNodeSink, ipldOutput types2.IPLDSink) error { + output types2.StateNodeSink, ipldOutput types2.IPLDSink, logger log.Logger) error { + start := time.Now() + defer logger.Debug(fmt.Sprintf("statediff buildAccountUpdates duration=%dms", time.Since(start).Milliseconds())) var err error for _, key := range updatedKeys { createdAcc := creations[key] @@ -361,7 +375,10 @@ func (sdb *StateDiffBuilder) buildAccountUpdates(creations, deletions types2.Acc // buildAccountCreations returns the statediff node objects for all the accounts that exist at B but not at A // it also returns the code and codehash for created contract accounts -func (sdb *StateDiffBuilder) buildAccountCreations(accounts types2.AccountMap, output types2.StateNodeSink, ipldOutput types2.IPLDSink) error { +func (sdb *StateDiffBuilder) buildAccountCreations(accounts types2.AccountMap, output types2.StateNodeSink, + ipldOutput types2.IPLDSink, logger log.Logger) error { + start := time.Now() + defer logger.Debug(fmt.Sprintf("statediff buildAccountCreations duration=%dms", time.Since(start).Milliseconds())) for _, val := range accounts { diff := types2.StateLeafNode{ AccountWrapper: val, diff --git a/statediff/service.go b/statediff/service.go index 4bc5cda84..e8b4a278e 100644 --- a/statediff/service.go +++ b/statediff/service.go @@ -800,9 +800,11 @@ func (sds *Service) writeStateDiff(block *types.Block, parentRoot common.Hash, p return sds.indexer.PushIPLD(tx, c) } - err = sds.Builder.WriteStateDiffObject(types2.StateRoots{ + err = sds.Builder.WriteStateDiffObject(Args{ NewStateRoot: block.Root(), OldStateRoot: parentRoot, + BlockHash: block.Hash(), + BlockNumber: block.Number(), }, params, output, ipldOutput) // TODO this anti-pattern needs to be sorted out eventually if err := tx.Submit(err); err != nil { diff --git a/statediff/test_helpers/mocks/builder.go b/statediff/test_helpers/mocks/builder.go index 9e3ba0ec5..490393e53 100644 --- a/statediff/test_helpers/mocks/builder.go +++ b/statediff/test_helpers/mocks/builder.go @@ -44,8 +44,8 @@ func (builder *Builder) BuildStateDiffObject(args statediff.Args, params statedi } // BuildStateDiffObject mock method -func (builder *Builder) WriteStateDiffObject(args sdtypes.StateRoots, params statediff.Params, output sdtypes.StateNodeSink, iplds sdtypes.IPLDSink) error { - builder.StateRoots = args +func (builder *Builder) WriteStateDiffObject(args statediff.Args, params statediff.Params, output sdtypes.StateNodeSink, iplds sdtypes.IPLDSink) error { + builder.StateRoots = sdtypes.StateRoots{OldStateRoot: args.OldStateRoot, NewStateRoot: args.NewStateRoot} builder.Params = params return builder.builderError -- 2.45.2 From f8c1032e7f7c43693f5b245c66e6bf720c401545 Mon Sep 17 00:00:00 2001 From: Thomas E Lackey Date: Tue, 7 Feb 2023 16:26:36 -0600 Subject: [PATCH 03/13] fix: Unify and fix indexer metrics. (#314) --- statediff/indexer/database/dump/indexer.go | 18 ++-- statediff/indexer/database/dump/metrics.go | 94 ---------------- statediff/indexer/database/file/csv_writer.go | 16 ++- statediff/indexer/database/file/indexer.go | 18 ++-- statediff/indexer/database/file/metrics.go | 94 ---------------- statediff/indexer/database/file/sql_writer.go | 10 +- .../database/{sql => metrics}/metrics.go | 102 +++++++++++------- statediff/indexer/database/sql/indexer.go | 20 ++-- statediff/indexer/database/sql/interfaces.go | 17 +-- .../indexer/database/sql/postgres/pgx.go | 19 ++-- .../indexer/database/sql/postgres/sqlx.go | 19 ++-- statediff/indexer/database/sql/writer.go | 9 +- 12 files changed, 124 insertions(+), 312 deletions(-) delete mode 100644 statediff/indexer/database/dump/metrics.go delete mode 100644 statediff/indexer/database/file/metrics.go rename statediff/indexer/database/{sql => metrics}/metrics.go (59%) diff --git a/statediff/indexer/database/dump/indexer.go b/statediff/indexer/database/dump/indexer.go index d0a0fddce..307e95b9c 100644 --- a/statediff/indexer/database/dump/indexer.go +++ b/statediff/indexer/database/dump/indexer.go @@ -29,9 +29,9 @@ import ( "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/log" - "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/rlp" + "github.com/ethereum/go-ethereum/statediff/indexer/database/metrics" "github.com/ethereum/go-ethereum/statediff/indexer/interfaces" "github.com/ethereum/go-ethereum/statediff/indexer/ipld" "github.com/ethereum/go-ethereum/statediff/indexer/models" @@ -41,10 +41,6 @@ import ( var _ interfaces.StateDiffIndexer = &StateDiffIndexer{} -var ( - indexerMetrics = RegisterIndexerMetrics(metrics.DefaultRegistry) -) - // StateDiffIndexer satisfies the indexer.StateDiffIndexer interface for ethereum statediff objects on top of a void type StateDiffIndexer struct { dump io.WriteCloser @@ -106,7 +102,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip close(self.quit) close(self.iplds) tDiff := time.Since(t) - indexerMetrics.tStateStoreCodeProcessing.Update(tDiff) + metrics.IndexerMetrics.StateStoreCodeProcessingTimer.Update(tDiff) traceMsg += fmt.Sprintf("state, storage, and code storage processing time: %s\r\n", tDiff.String()) t = time.Now() if err := self.flush(); err != nil { @@ -115,7 +111,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip return err } tDiff = time.Since(t) - indexerMetrics.tPostgresCommit.Update(tDiff) + metrics.IndexerMetrics.PostgresCommitTimer.Update(tDiff) traceMsg += fmt.Sprintf("postgres transaction commit duration: %s\r\n", tDiff.String()) traceMsg += fmt.Sprintf(" TOTAL PROCESSING DURATION: %s\r\n", time.Since(start).String()) log.Debug(traceMsg) @@ -125,7 +121,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip go blockTx.cache() tDiff := time.Since(t) - indexerMetrics.tFreePostgres.Update(tDiff) + metrics.IndexerMetrics.FreePostgresTimer.Update(tDiff) traceMsg += fmt.Sprintf("time spent waiting for free postgres tx: %s:\r\n", tDiff.String()) t = time.Now() @@ -137,7 +133,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip return nil, err } tDiff = time.Since(t) - indexerMetrics.tHeaderProcessing.Update(tDiff) + metrics.IndexerMetrics.HeaderProcessingTimer.Update(tDiff) traceMsg += fmt.Sprintf("header processing time: %s\r\n", tDiff.String()) t = time.Now() // Publish and index uncles @@ -146,7 +142,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip return nil, err } tDiff = time.Since(t) - indexerMetrics.tUncleProcessing.Update(tDiff) + metrics.IndexerMetrics.UncleProcessingTimer.Update(tDiff) traceMsg += fmt.Sprintf("uncle processing time: %s\r\n", tDiff.String()) t = time.Now() // Publish and index receipts and txs @@ -163,7 +159,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip return nil, err } tDiff = time.Since(t) - indexerMetrics.tTxAndRecProcessing.Update(tDiff) + metrics.IndexerMetrics.TxAndRecProcessingTimer.Update(tDiff) traceMsg += fmt.Sprintf("tx and receipt processing time: %s\r\n", tDiff.String()) t = time.Now() diff --git a/statediff/indexer/database/dump/metrics.go b/statediff/indexer/database/dump/metrics.go deleted file mode 100644 index 700e42dc0..000000000 --- a/statediff/indexer/database/dump/metrics.go +++ /dev/null @@ -1,94 +0,0 @@ -// VulcanizeDB -// Copyright © 2021 Vulcanize - -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. - -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -package dump - -import ( - "strings" - - "github.com/ethereum/go-ethereum/metrics" -) - -const ( - namespace = "statediff" -) - -// Build a fully qualified metric name -func metricName(subsystem, name string) string { - if name == "" { - return "" - } - parts := []string{namespace, name} - if subsystem != "" { - parts = []string{namespace, subsystem, name} - } - // Prometheus uses _ but geth metrics uses / and replaces - return strings.Join(parts, "/") -} - -type indexerMetricsHandles struct { - // The total number of processed blocks - blocks metrics.Counter - // The total number of processed transactions - transactions metrics.Counter - // The total number of processed receipts - receipts metrics.Counter - // The total number of processed logs - logs metrics.Counter - // The total number of access list entries processed - accessListEntries metrics.Counter - // Time spent waiting for free postgres tx - tFreePostgres metrics.Timer - // Postgres transaction commit duration - tPostgresCommit metrics.Timer - // Header processing time - tHeaderProcessing metrics.Timer - // Uncle processing time - tUncleProcessing metrics.Timer - // Tx and receipt processing time - tTxAndRecProcessing metrics.Timer - // State, storage, and code combined processing time - tStateStoreCodeProcessing metrics.Timer -} - -func RegisterIndexerMetrics(reg metrics.Registry) indexerMetricsHandles { - ctx := indexerMetricsHandles{ - blocks: metrics.NewCounter(), - transactions: metrics.NewCounter(), - receipts: metrics.NewCounter(), - logs: metrics.NewCounter(), - accessListEntries: metrics.NewCounter(), - tFreePostgres: metrics.NewTimer(), - tPostgresCommit: metrics.NewTimer(), - tHeaderProcessing: metrics.NewTimer(), - tUncleProcessing: metrics.NewTimer(), - tTxAndRecProcessing: metrics.NewTimer(), - tStateStoreCodeProcessing: metrics.NewTimer(), - } - subsys := "indexer" - reg.Register(metricName(subsys, "blocks"), ctx.blocks) - reg.Register(metricName(subsys, "transactions"), ctx.transactions) - reg.Register(metricName(subsys, "receipts"), ctx.receipts) - reg.Register(metricName(subsys, "logs"), ctx.logs) - reg.Register(metricName(subsys, "access_list_entries"), ctx.accessListEntries) - reg.Register(metricName(subsys, "t_free_postgres"), ctx.tFreePostgres) - reg.Register(metricName(subsys, "t_postgres_commit"), ctx.tPostgresCommit) - reg.Register(metricName(subsys, "t_header_processing"), ctx.tHeaderProcessing) - reg.Register(metricName(subsys, "t_uncle_processing"), ctx.tUncleProcessing) - reg.Register(metricName(subsys, "t_tx_receipt_processing"), ctx.tTxAndRecProcessing) - reg.Register(metricName(subsys, "t_state_store_code_processing"), ctx.tStateStoreCodeProcessing) - return ctx -} diff --git a/statediff/indexer/database/file/csv_writer.go b/statediff/indexer/database/file/csv_writer.go index 0261735a6..23e92296a 100644 --- a/statediff/indexer/database/file/csv_writer.go +++ b/statediff/indexer/database/file/csv_writer.go @@ -28,6 +28,7 @@ import ( "github.com/thoas/go-funk" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/statediff/indexer/database/metrics" "github.com/ethereum/go-ethereum/statediff/indexer/ipld" "github.com/ethereum/go-ethereum/statediff/indexer/models" nodeinfo "github.com/ethereum/go-ethereum/statediff/indexer/node" @@ -235,7 +236,7 @@ func (csw *CSVWriter) upsertHeaderCID(header models.HeaderModel) { header.TotalDifficulty, header.NodeIDs, header.Reward, header.StateRoot, header.TxRoot, header.RctRoot, header.UnclesHash, header.Bloom, strconv.FormatUint(header.Timestamp, 10), header.Coinbase) csw.rows <- tableRow{schema.TableHeader, values} - indexerMetrics.blocks.Inc(1) + metrics.IndexerMetrics.BlocksCounter.Inc(1) } func (csw *CSVWriter) upsertUncleCID(uncle models.UncleModel) { @@ -250,7 +251,7 @@ func (csw *CSVWriter) upsertTransactionCID(transaction models.TxModel) { values = append(values, transaction.BlockNumber, transaction.HeaderID, transaction.TxHash, transaction.CID, transaction.Dst, transaction.Src, transaction.Index, transaction.Type, transaction.Value) csw.rows <- tableRow{schema.TableTransaction, values} - indexerMetrics.transactions.Inc(1) + metrics.IndexerMetrics.TransactionsCounter.Inc(1) } func (csw *CSVWriter) upsertReceiptCID(rct *models.ReceiptModel) { @@ -258,7 +259,7 @@ func (csw *CSVWriter) upsertReceiptCID(rct *models.ReceiptModel) { values = append(values, rct.BlockNumber, rct.HeaderID, rct.TxID, rct.CID, rct.Contract, rct.PostState, rct.PostStatus) csw.rows <- tableRow{schema.TableReceipt, values} - indexerMetrics.receipts.Inc(1) + metrics.IndexerMetrics.ReceiptsCounter.Inc(1) } func (csw *CSVWriter) upsertLogCID(logs []*models.LogsModel) { @@ -267,7 +268,7 @@ func (csw *CSVWriter) upsertLogCID(logs []*models.LogsModel) { values = append(values, l.BlockNumber, l.HeaderID, l.CID, l.ReceiptID, l.Address, l.Index, l.Topic0, l.Topic1, l.Topic2, l.Topic3) csw.rows <- tableRow{schema.TableLog, values} - indexerMetrics.logs.Inc(1) + metrics.IndexerMetrics.LogsCounter.Inc(1) } } @@ -284,13 +285,8 @@ func (csw *CSVWriter) upsertStateCID(stateNode models.StateNodeModel) { } func (csw *CSVWriter) upsertStorageCID(storageCID models.StorageNodeModel) { - var storageKey string - if storageCID.StorageKey != nullHash.String() { - storageKey = storageCID.StorageKey - } - var values []interface{} - values = append(values, storageCID.BlockNumber, storageCID.HeaderID, storageCID.StateKey, storageKey, storageCID.CID, + values = append(values, storageCID.BlockNumber, storageCID.HeaderID, storageCID.StateKey, storageCID.StorageKey, storageCID.CID, true, storageCID.Value, storageCID.Removed) csw.rows <- tableRow{schema.TableStorageNode, values} } diff --git a/statediff/indexer/database/file/indexer.go b/statediff/indexer/database/file/indexer.go index cc1515b8b..ae9d8694a 100644 --- a/statediff/indexer/database/file/indexer.go +++ b/statediff/indexer/database/file/indexer.go @@ -34,9 +34,9 @@ import ( "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/log" - "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/rlp" + "github.com/ethereum/go-ethereum/statediff/indexer/database/metrics" "github.com/ethereum/go-ethereum/statediff/indexer/interfaces" "github.com/ethereum/go-ethereum/statediff/indexer/ipld" "github.com/ethereum/go-ethereum/statediff/indexer/models" @@ -53,10 +53,6 @@ const watchedAddressesInsert = "INSERT INTO eth_meta.watched_addresses (address, var _ interfaces.StateDiffIndexer = &StateDiffIndexer{} -var ( - indexerMetrics = RegisterIndexerMetrics(metrics.DefaultRegistry) -) - // StateDiffIndexer satisfies the indexer.StateDiffIndexer interface for ethereum statediff objects on top of a void type StateDiffIndexer struct { fileWriter FileWriter @@ -172,12 +168,12 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip BlockNumber: block.Number().String(), submit: func(self *BatchTx, err error) error { tDiff := time.Since(t) - indexerMetrics.tStateStoreCodeProcessing.Update(tDiff) + metrics.IndexerMetrics.StateStoreCodeProcessingTimer.Update(tDiff) traceMsg += fmt.Sprintf("state, storage, and code storage processing time: %s\r\n", tDiff.String()) t = time.Now() sdi.fileWriter.Flush() tDiff = time.Since(t) - indexerMetrics.tPostgresCommit.Update(tDiff) + metrics.IndexerMetrics.PostgresCommitTimer.Update(tDiff) traceMsg += fmt.Sprintf("postgres transaction commit duration: %s\r\n", tDiff.String()) traceMsg += fmt.Sprintf(" TOTAL PROCESSING DURATION: %s\r\n", time.Since(start).String()) log.Debug(traceMsg) @@ -185,21 +181,21 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip }, } tDiff := time.Since(t) - indexerMetrics.tFreePostgres.Update(tDiff) + metrics.IndexerMetrics.FreePostgresTimer.Update(tDiff) traceMsg += fmt.Sprintf("time spent waiting for free postgres tx: %s:\r\n", tDiff.String()) t = time.Now() // write header, collect headerID headerID := sdi.processHeader(block.Header(), headerNode, reward, totalDifficulty) tDiff = time.Since(t) - indexerMetrics.tHeaderProcessing.Update(tDiff) + metrics.IndexerMetrics.HeaderProcessingTimer.Update(tDiff) traceMsg += fmt.Sprintf("header processing time: %s\r\n", tDiff.String()) t = time.Now() // write uncles sdi.processUncles(headerID, block.Number(), block.UncleHash(), block.Uncles()) tDiff = time.Since(t) - indexerMetrics.tUncleProcessing.Update(tDiff) + metrics.IndexerMetrics.UncleProcessingTimer.Update(tDiff) traceMsg += fmt.Sprintf("uncle processing time: %s\r\n", tDiff.String()) t = time.Now() @@ -217,7 +213,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip return nil, err } tDiff = time.Since(t) - indexerMetrics.tTxAndRecProcessing.Update(tDiff) + metrics.IndexerMetrics.TxAndRecProcessingTimer.Update(tDiff) traceMsg += fmt.Sprintf("tx and receipt processing time: %s\r\n", tDiff.String()) t = time.Now() diff --git a/statediff/indexer/database/file/metrics.go b/statediff/indexer/database/file/metrics.go deleted file mode 100644 index ca6e88f2b..000000000 --- a/statediff/indexer/database/file/metrics.go +++ /dev/null @@ -1,94 +0,0 @@ -// VulcanizeDB -// Copyright © 2021 Vulcanize - -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. - -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -package file - -import ( - "strings" - - "github.com/ethereum/go-ethereum/metrics" -) - -const ( - namespace = "statediff" -) - -// Build a fully qualified metric name -func metricName(subsystem, name string) string { - if name == "" { - return "" - } - parts := []string{namespace, name} - if subsystem != "" { - parts = []string{namespace, subsystem, name} - } - // Prometheus uses _ but geth metrics uses / and replaces - return strings.Join(parts, "/") -} - -type indexerMetricsHandles struct { - // The total number of processed blocks - blocks metrics.Counter - // The total number of processed transactions - transactions metrics.Counter - // The total number of processed receipts - receipts metrics.Counter - // The total number of processed logs - logs metrics.Counter - // The total number of access list entries processed - accessListEntries metrics.Counter - // Time spent waiting for free postgres tx - tFreePostgres metrics.Timer - // Postgres transaction commit duration - tPostgresCommit metrics.Timer - // Header processing time - tHeaderProcessing metrics.Timer - // Uncle processing time - tUncleProcessing metrics.Timer - // Tx and receipt processing time - tTxAndRecProcessing metrics.Timer - // State, storage, and code combined processing time - tStateStoreCodeProcessing metrics.Timer -} - -func RegisterIndexerMetrics(reg metrics.Registry) indexerMetricsHandles { - ctx := indexerMetricsHandles{ - blocks: metrics.NewCounter(), - transactions: metrics.NewCounter(), - receipts: metrics.NewCounter(), - logs: metrics.NewCounter(), - accessListEntries: metrics.NewCounter(), - tFreePostgres: metrics.NewTimer(), - tPostgresCommit: metrics.NewTimer(), - tHeaderProcessing: metrics.NewTimer(), - tUncleProcessing: metrics.NewTimer(), - tTxAndRecProcessing: metrics.NewTimer(), - tStateStoreCodeProcessing: metrics.NewTimer(), - } - subsys := "indexer" - reg.Register(metricName(subsys, "blocks"), ctx.blocks) - reg.Register(metricName(subsys, "transactions"), ctx.transactions) - reg.Register(metricName(subsys, "receipts"), ctx.receipts) - reg.Register(metricName(subsys, "logs"), ctx.logs) - reg.Register(metricName(subsys, "access_list_entries"), ctx.accessListEntries) - reg.Register(metricName(subsys, "t_free_postgres"), ctx.tFreePostgres) - reg.Register(metricName(subsys, "t_postgres_commit"), ctx.tPostgresCommit) - reg.Register(metricName(subsys, "t_header_processing"), ctx.tHeaderProcessing) - reg.Register(metricName(subsys, "t_uncle_processing"), ctx.tUncleProcessing) - reg.Register(metricName(subsys, "t_tx_receipt_processing"), ctx.tTxAndRecProcessing) - reg.Register(metricName(subsys, "t_state_store_code_processing"), ctx.tStateStoreCodeProcessing) - return ctx -} diff --git a/statediff/indexer/database/file/sql_writer.go b/statediff/indexer/database/file/sql_writer.go index c79ed843e..1e0acb21f 100644 --- a/statediff/indexer/database/file/sql_writer.go +++ b/statediff/indexer/database/file/sql_writer.go @@ -28,6 +28,7 @@ import ( "github.com/thoas/go-funk" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/statediff/indexer/database/metrics" "github.com/ethereum/go-ethereum/statediff/indexer/ipld" "github.com/ethereum/go-ethereum/statediff/indexer/models" nodeinfo "github.com/ethereum/go-ethereum/statediff/indexer/node" @@ -35,7 +36,6 @@ import ( ) var ( - nullHash = common.HexToHash("0x0000000000000000000000000000000000000000000000000000000000000000") pipeSize = 65336 // min(linuxPipeSize, macOSPipeSize) writeBufferSize = pipeSize * 16 * 96 ) @@ -191,7 +191,7 @@ func (sqw *SQLWriter) upsertHeaderCID(header models.HeaderModel) { header.TotalDifficulty, formatPostgresStringArray(header.NodeIDs), header.Reward, header.StateRoot, header.TxRoot, header.RctRoot, header.UnclesHash, header.Bloom, header.Timestamp, header.Coinbase) sqw.stmts <- []byte(stmt) - indexerMetrics.blocks.Inc(1) + metrics.IndexerMetrics.BlocksCounter.Inc(1) } func (sqw *SQLWriter) upsertUncleCID(uncle models.UncleModel) { @@ -202,20 +202,20 @@ func (sqw *SQLWriter) upsertUncleCID(uncle models.UncleModel) { func (sqw *SQLWriter) upsertTransactionCID(transaction models.TxModel) { sqw.stmts <- []byte(fmt.Sprintf(txInsert, transaction.BlockNumber, transaction.HeaderID, transaction.TxHash, transaction.CID, transaction.Dst, transaction.Src, transaction.Index, transaction.Type, transaction.Value)) - indexerMetrics.transactions.Inc(1) + metrics.IndexerMetrics.TransactionsCounter.Inc(1) } func (sqw *SQLWriter) upsertReceiptCID(rct *models.ReceiptModel) { sqw.stmts <- []byte(fmt.Sprintf(rctInsert, rct.BlockNumber, rct.HeaderID, rct.TxID, rct.CID, rct.Contract, rct.PostState, rct.PostStatus)) - indexerMetrics.receipts.Inc(1) + metrics.IndexerMetrics.ReceiptsCounter.Inc(1) } func (sqw *SQLWriter) upsertLogCID(logs []*models.LogsModel) { for _, l := range logs { sqw.stmts <- []byte(fmt.Sprintf(logInsert, l.BlockNumber, l.HeaderID, l.CID, l.ReceiptID, l.Address, l.Index, l.Topic0, l.Topic1, l.Topic2, l.Topic3)) - indexerMetrics.logs.Inc(1) + metrics.IndexerMetrics.LogsCounter.Inc(1) } } diff --git a/statediff/indexer/database/sql/metrics.go b/statediff/indexer/database/metrics/metrics.go similarity index 59% rename from statediff/indexer/database/sql/metrics.go rename to statediff/indexer/database/metrics/metrics.go index b0946a722..95c1a959b 100644 --- a/statediff/indexer/database/sql/metrics.go +++ b/statediff/indexer/database/metrics/metrics.go @@ -14,10 +14,13 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . -package sql +package metrics import ( "strings" + "time" + + "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/metrics" ) @@ -26,6 +29,11 @@ const ( namespace = "statediff" ) +var ( + IndexerMetrics = RegisterIndexerMetrics(metrics.DefaultRegistry) + DBMetrics = RegisterDBMetrics(metrics.DefaultRegistry) +) + // Build a fully qualified metric name func metricName(subsystem, name string) string { if name == "" { @@ -39,57 +47,59 @@ func metricName(subsystem, name string) string { return strings.Join(parts, "/") } -type indexerMetricsHandles struct { - // The total number of processed blocks - blocks metrics.Counter +type IndexerMetricsHandles struct { + // The total number of processed BlocksCounter + BlocksCounter metrics.Counter // The total number of processed transactions - transactions metrics.Counter + TransactionsCounter metrics.Counter // The total number of processed receipts - receipts metrics.Counter + ReceiptsCounter metrics.Counter // The total number of processed logs - logs metrics.Counter + LogsCounter metrics.Counter // The total number of access list entries processed - accessListEntries metrics.Counter + AccessListEntriesCounter metrics.Counter // Time spent waiting for free postgres tx - tFreePostgres metrics.Timer + FreePostgresTimer metrics.Timer // Postgres transaction commit duration - tPostgresCommit metrics.Timer + PostgresCommitTimer metrics.Timer // Header processing time - tHeaderProcessing metrics.Timer + HeaderProcessingTimer metrics.Timer // Uncle processing time - tUncleProcessing metrics.Timer + UncleProcessingTimer metrics.Timer // Tx and receipt processing time - tTxAndRecProcessing metrics.Timer + TxAndRecProcessingTimer metrics.Timer // State, storage, and code combined processing time - tStateStoreCodeProcessing metrics.Timer + StateStoreCodeProcessingTimer metrics.Timer } -func RegisterIndexerMetrics(reg metrics.Registry) indexerMetricsHandles { - ctx := indexerMetricsHandles{ - blocks: metrics.NewCounter(), - transactions: metrics.NewCounter(), - receipts: metrics.NewCounter(), - logs: metrics.NewCounter(), - accessListEntries: metrics.NewCounter(), - tFreePostgres: metrics.NewTimer(), - tPostgresCommit: metrics.NewTimer(), - tHeaderProcessing: metrics.NewTimer(), - tUncleProcessing: metrics.NewTimer(), - tTxAndRecProcessing: metrics.NewTimer(), - tStateStoreCodeProcessing: metrics.NewTimer(), +func RegisterIndexerMetrics(reg metrics.Registry) IndexerMetricsHandles { + ctx := IndexerMetricsHandles{ + BlocksCounter: metrics.NewCounter(), + TransactionsCounter: metrics.NewCounter(), + ReceiptsCounter: metrics.NewCounter(), + LogsCounter: metrics.NewCounter(), + AccessListEntriesCounter: metrics.NewCounter(), + FreePostgresTimer: metrics.NewTimer(), + PostgresCommitTimer: metrics.NewTimer(), + HeaderProcessingTimer: metrics.NewTimer(), + UncleProcessingTimer: metrics.NewTimer(), + TxAndRecProcessingTimer: metrics.NewTimer(), + StateStoreCodeProcessingTimer: metrics.NewTimer(), } subsys := "indexer" - reg.Register(metricName(subsys, "blocks"), ctx.blocks) - reg.Register(metricName(subsys, "transactions"), ctx.transactions) - reg.Register(metricName(subsys, "receipts"), ctx.receipts) - reg.Register(metricName(subsys, "logs"), ctx.logs) - reg.Register(metricName(subsys, "access_list_entries"), ctx.accessListEntries) - reg.Register(metricName(subsys, "t_free_postgres"), ctx.tFreePostgres) - reg.Register(metricName(subsys, "t_postgres_commit"), ctx.tPostgresCommit) - reg.Register(metricName(subsys, "t_header_processing"), ctx.tHeaderProcessing) - reg.Register(metricName(subsys, "t_uncle_processing"), ctx.tUncleProcessing) - reg.Register(metricName(subsys, "t_tx_receipt_processing"), ctx.tTxAndRecProcessing) - reg.Register(metricName(subsys, "t_state_store_code_processing"), ctx.tStateStoreCodeProcessing) + reg.Register(metricName(subsys, "blocks"), ctx.BlocksCounter) + reg.Register(metricName(subsys, "transactions"), ctx.TransactionsCounter) + reg.Register(metricName(subsys, "receipts"), ctx.ReceiptsCounter) + reg.Register(metricName(subsys, "logs"), ctx.LogsCounter) + reg.Register(metricName(subsys, "access_list_entries"), ctx.AccessListEntriesCounter) + reg.Register(metricName(subsys, "t_free_postgres"), ctx.FreePostgresTimer) + reg.Register(metricName(subsys, "t_postgres_commit"), ctx.PostgresCommitTimer) + reg.Register(metricName(subsys, "t_header_processing"), ctx.HeaderProcessingTimer) + reg.Register(metricName(subsys, "t_uncle_processing"), ctx.UncleProcessingTimer) + reg.Register(metricName(subsys, "t_tx_receipt_processing"), ctx.TxAndRecProcessingTimer) + reg.Register(metricName(subsys, "t_state_store_code_processing"), ctx.StateStoreCodeProcessingTimer) + + log.Debug("Registering statediff indexer metrics.") return ctx } @@ -132,10 +142,24 @@ func RegisterDBMetrics(reg metrics.Registry) dbMetricsHandles { reg.Register(metricName(subsys, "blocked_milliseconds"), ctx.blockedMilliseconds) reg.Register(metricName(subsys, "closed_max_idle"), ctx.closedMaxIdle) reg.Register(metricName(subsys, "closed_max_lifetime"), ctx.closedMaxLifetime) + + log.Debug("Registering statediff DB metrics.") return ctx } -func (met *dbMetricsHandles) Update(stats Stats) { +// DbStats interface to accommodate different concrete sql stats types +type DbStats interface { + MaxOpen() int64 + Open() int64 + InUse() int64 + Idle() int64 + WaitCount() int64 + WaitDuration() time.Duration + MaxIdleClosed() int64 + MaxLifetimeClosed() int64 +} + +func (met *dbMetricsHandles) Update(stats DbStats) { met.maxOpen.Update(stats.MaxOpen()) met.open.Update(stats.Open()) met.inUse.Update(stats.InUse()) diff --git a/statediff/indexer/database/sql/indexer.go b/statediff/indexer/database/sql/indexer.go index 2960c90c5..a41e1631a 100644 --- a/statediff/indexer/database/sql/indexer.go +++ b/statediff/indexer/database/sql/indexer.go @@ -35,6 +35,7 @@ import ( "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/rlp" + metrics2 "github.com/ethereum/go-ethereum/statediff/indexer/database/metrics" "github.com/ethereum/go-ethereum/statediff/indexer/interfaces" "github.com/ethereum/go-ethereum/statediff/indexer/ipld" "github.com/ethereum/go-ethereum/statediff/indexer/models" @@ -44,11 +45,6 @@ import ( var _ interfaces.StateDiffIndexer = &StateDiffIndexer{} -var ( - indexerMetrics = RegisterIndexerMetrics(metrics.DefaultRegistry) - dbMetrics = RegisterDBMetrics(metrics.DefaultRegistry) -) - // StateDiffIndexer satisfies the indexer.StateDiffIndexer interface for ethereum statediff objects on top of an SQL sql type StateDiffIndexer struct { ctx context.Context @@ -75,7 +71,7 @@ func (sdi *StateDiffIndexer) ReportDBMetrics(delay time.Duration, quit <-chan bo for { select { case <-ticker.C: - dbMetrics.Update(sdi.dbWriter.db.Stats()) + metrics2.DBMetrics.Update(sdi.dbWriter.db.Stats()) case <-quit: ticker.Stop() return @@ -156,7 +152,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip rollback(sdi.ctx, tx) } else { tDiff := time.Since(t) - indexerMetrics.tStateStoreCodeProcessing.Update(tDiff) + metrics2.IndexerMetrics.StateStoreCodeProcessingTimer.Update(tDiff) traceMsg += fmt.Sprintf("state, storage, and code storage processing time: %s\r\n", tDiff.String()) t = time.Now() if err := self.flush(); err != nil { @@ -167,7 +163,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip } err = tx.Commit(sdi.ctx) tDiff = time.Since(t) - indexerMetrics.tPostgresCommit.Update(tDiff) + metrics2.IndexerMetrics.PostgresCommitTimer.Update(tDiff) traceMsg += fmt.Sprintf("postgres transaction commit duration: %s\r\n", tDiff.String()) } traceMsg += fmt.Sprintf(" TOTAL PROCESSING DURATION: %s\r\n", time.Since(start).String()) @@ -178,7 +174,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip go blockTx.cache() tDiff := time.Since(t) - indexerMetrics.tFreePostgres.Update(tDiff) + metrics2.IndexerMetrics.FreePostgresTimer.Update(tDiff) traceMsg += fmt.Sprintf("time spent waiting for free postgres tx: %s:\r\n", tDiff.String()) t = time.Now() @@ -190,7 +186,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip return nil, err } tDiff = time.Since(t) - indexerMetrics.tHeaderProcessing.Update(tDiff) + metrics2.IndexerMetrics.HeaderProcessingTimer.Update(tDiff) traceMsg += fmt.Sprintf("header processing time: %s\r\n", tDiff.String()) t = time.Now() // Publish and index uncles @@ -199,7 +195,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip return nil, err } tDiff = time.Since(t) - indexerMetrics.tUncleProcessing.Update(tDiff) + metrics2.IndexerMetrics.UncleProcessingTimer.Update(tDiff) traceMsg += fmt.Sprintf("uncle processing time: %s\r\n", tDiff.String()) t = time.Now() // Publish and index receipts and txs @@ -216,7 +212,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip return nil, err } tDiff = time.Since(t) - indexerMetrics.tTxAndRecProcessing.Update(tDiff) + metrics2.IndexerMetrics.TxAndRecProcessingTimer.Update(tDiff) traceMsg += fmt.Sprintf("tx and receipt processing time: %s\r\n", tDiff.String()) t = time.Now() diff --git a/statediff/indexer/database/sql/interfaces.go b/statediff/indexer/database/sql/interfaces.go index 3fee858d6..b2117f10c 100644 --- a/statediff/indexer/database/sql/interfaces.go +++ b/statediff/indexer/database/sql/interfaces.go @@ -19,7 +19,8 @@ package sql import ( "context" "io" - "time" + + "github.com/ethereum/go-ethereum/statediff/indexer/database/metrics" ) // Database interfaces required by the sql indexer @@ -35,7 +36,7 @@ type Driver interface { Select(ctx context.Context, dest interface{}, query string, args ...interface{}) error Get(ctx context.Context, dest interface{}, query string, args ...interface{}) error Begin(ctx context.Context) (Tx, error) - Stats() Stats + Stats() metrics.DbStats NodeID() string Context() context.Context io.Closer @@ -71,15 +72,3 @@ type ScannableRow interface { type Result interface { RowsAffected() (int64, error) } - -// Stats interface to accommodate different concrete sql stats types -type Stats interface { - MaxOpen() int64 - Open() int64 - InUse() int64 - Idle() int64 - WaitCount() int64 - WaitDuration() time.Duration - MaxIdleClosed() int64 - MaxLifetimeClosed() int64 -} diff --git a/statediff/indexer/database/sql/postgres/pgx.go b/statediff/indexer/database/sql/postgres/pgx.go index 073d92744..038da1cfa 100644 --- a/statediff/indexer/database/sql/postgres/pgx.go +++ b/statediff/indexer/database/sql/postgres/pgx.go @@ -27,6 +27,7 @@ import ( "github.com/jackc/pgx/v4" "github.com/jackc/pgx/v4/pgxpool" + "github.com/ethereum/go-ethereum/statediff/indexer/database/metrics" "github.com/ethereum/go-ethereum/statediff/indexer/database/sql" "github.com/ethereum/go-ethereum/statediff/indexer/node" ) @@ -144,7 +145,7 @@ func (pgx *PGXDriver) Begin(ctx context.Context) (sql.Tx, error) { return pgxTxWrapper{tx: tx}, nil } -func (pgx *PGXDriver) Stats() sql.Stats { +func (pgx *PGXDriver) Stats() metrics.DbStats { stats := pgx.pool.Stat() return pgxStatsWrapper{stats: stats} } @@ -178,43 +179,43 @@ type pgxStatsWrapper struct { stats *pgxpool.Stat } -// MaxOpen satisfies sql.Stats +// MaxOpen satisfies metrics.DbStats func (s pgxStatsWrapper) MaxOpen() int64 { return int64(s.stats.MaxConns()) } -// Open satisfies sql.Stats +// Open satisfies metrics.DbStats func (s pgxStatsWrapper) Open() int64 { return int64(s.stats.TotalConns()) } -// InUse satisfies sql.Stats +// InUse satisfies metrics.DbStats func (s pgxStatsWrapper) InUse() int64 { return int64(s.stats.AcquiredConns()) } -// Idle satisfies sql.Stats +// Idle satisfies metrics.DbStats func (s pgxStatsWrapper) Idle() int64 { return int64(s.stats.IdleConns()) } -// WaitCount satisfies sql.Stats +// WaitCount satisfies metrics.DbStats func (s pgxStatsWrapper) WaitCount() int64 { return s.stats.EmptyAcquireCount() } -// WaitDuration satisfies sql.Stats +// WaitDuration satisfies metrics.DbStats func (s pgxStatsWrapper) WaitDuration() time.Duration { return s.stats.AcquireDuration() } -// MaxIdleClosed satisfies sql.Stats +// MaxIdleClosed satisfies metrics.DbStats func (s pgxStatsWrapper) MaxIdleClosed() int64 { // this stat isn't supported by pgxpool, but we don't want to panic return 0 } -// MaxLifetimeClosed satisfies sql.Stats +// MaxLifetimeClosed satisfies metrics.DbStats func (s pgxStatsWrapper) MaxLifetimeClosed() int64 { return s.stats.CanceledAcquireCount() } diff --git a/statediff/indexer/database/sql/postgres/sqlx.go b/statediff/indexer/database/sql/postgres/sqlx.go index c41d39828..1c828223f 100644 --- a/statediff/indexer/database/sql/postgres/sqlx.go +++ b/statediff/indexer/database/sql/postgres/sqlx.go @@ -23,6 +23,7 @@ import ( "github.com/jmoiron/sqlx" + "github.com/ethereum/go-ethereum/statediff/indexer/database/metrics" "github.com/ethereum/go-ethereum/statediff/indexer/database/sql" "github.com/ethereum/go-ethereum/statediff/indexer/node" ) @@ -109,7 +110,7 @@ func (driver *SQLXDriver) Begin(_ context.Context) (sql.Tx, error) { return sqlxTxWrapper{tx: tx}, nil } -func (driver *SQLXDriver) Stats() sql.Stats { +func (driver *SQLXDriver) Stats() metrics.DbStats { stats := driver.db.Stats() return sqlxStatsWrapper{stats: stats} } @@ -133,42 +134,42 @@ type sqlxStatsWrapper struct { stats coresql.DBStats } -// MaxOpen satisfies sql.Stats +// MaxOpen satisfies metrics.DbStats func (s sqlxStatsWrapper) MaxOpen() int64 { return int64(s.stats.MaxOpenConnections) } -// Open satisfies sql.Stats +// Open satisfies metrics.DbStats func (s sqlxStatsWrapper) Open() int64 { return int64(s.stats.OpenConnections) } -// InUse satisfies sql.Stats +// InUse satisfies metrics.DbStats func (s sqlxStatsWrapper) InUse() int64 { return int64(s.stats.InUse) } -// Idle satisfies sql.Stats +// Idle satisfies metrics.DbStats func (s sqlxStatsWrapper) Idle() int64 { return int64(s.stats.Idle) } -// WaitCount satisfies sql.Stats +// WaitCount satisfies metrics.DbStats func (s sqlxStatsWrapper) WaitCount() int64 { return s.stats.WaitCount } -// WaitDuration satisfies sql.Stats +// WaitDuration satisfies metrics.DbStats func (s sqlxStatsWrapper) WaitDuration() time.Duration { return s.stats.WaitDuration } -// MaxIdleClosed satisfies sql.Stats +// MaxIdleClosed satisfies metrics.DbStats func (s sqlxStatsWrapper) MaxIdleClosed() int64 { return s.stats.MaxIdleClosed } -// MaxLifetimeClosed satisfies sql.Stats +// MaxLifetimeClosed satisfies metrics.DbStats func (s sqlxStatsWrapper) MaxLifetimeClosed() int64 { return s.stats.MaxLifetimeClosed } diff --git a/statediff/indexer/database/sql/writer.go b/statediff/indexer/database/sql/writer.go index bd6fb5b67..54794f51f 100644 --- a/statediff/indexer/database/sql/writer.go +++ b/statediff/indexer/database/sql/writer.go @@ -21,6 +21,7 @@ import ( "github.com/lib/pq" + "github.com/ethereum/go-ethereum/statediff/indexer/database/metrics" "github.com/ethereum/go-ethereum/statediff/indexer/models" ) @@ -66,7 +67,7 @@ func (w *Writer) upsertHeaderCID(tx Tx, header models.HeaderModel) error { if err != nil { return insertError{"eth.header_cids", err, w.db.InsertHeaderStm(), header} } - indexerMetrics.blocks.Inc(1) + metrics.IndexerMetrics.BlocksCounter.Inc(1) return nil } @@ -107,7 +108,7 @@ func (w *Writer) upsertTransactionCID(tx Tx, transaction models.TxModel) error { if err != nil { return insertError{"eth.transaction_cids", err, w.db.InsertTxStm(), transaction} } - indexerMetrics.transactions.Inc(1) + metrics.IndexerMetrics.TransactionsCounter.Inc(1) return nil } @@ -127,7 +128,7 @@ func (w *Writer) upsertReceiptCID(tx Tx, rct *models.ReceiptModel) error { if err != nil { return insertError{"eth.receipt_cids", err, w.db.InsertRctStm(), *rct} } - indexerMetrics.receipts.Inc(1) + metrics.IndexerMetrics.ReceiptsCounter.Inc(1) return nil } @@ -151,7 +152,7 @@ func (w *Writer) upsertLogCID(tx Tx, logs []*models.LogsModel) error { if err != nil { return insertError{"eth.log_cids", err, w.db.InsertLogStm(), *log} } - indexerMetrics.logs.Inc(1) + metrics.IndexerMetrics.LogsCounter.Inc(1) } return nil } -- 2.45.2 From 73e6ee70136b2c637f3f611060bb3449e9285cd1 Mon Sep 17 00:00:00 2001 From: Thomas E Lackey Date: Wed, 8 Feb 2023 20:55:39 -0600 Subject: [PATCH 04/13] Fix duration logging and add new metrics for fine-grained processing stats. (#315) --- statediff/builder.go | 28 ++++++++----- statediff/indexer/database/metrics/metrics.go | 42 ++++++++++++++----- 2 files changed, 49 insertions(+), 21 deletions(-) diff --git a/statediff/builder.go b/statediff/builder.go index 7c72ab7e5..5cc432c8d 100644 --- a/statediff/builder.go +++ b/statediff/builder.go @@ -29,7 +29,9 @@ import ( "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/rlp" + metrics2 "github.com/ethereum/go-ethereum/statediff/indexer/database/metrics" ipld2 "github.com/ethereum/go-ethereum/statediff/indexer/ipld" "github.com/ethereum/go-ethereum/statediff/indexer/shared" "github.com/ethereum/go-ethereum/statediff/trie_helpers" @@ -133,8 +135,8 @@ func (sdb *StateDiffBuilder) WriteStateDiffObject(args Args, params Params, outp func (sdb *StateDiffBuilder) BuildStateDiffWithIntermediateStateNodes(iterPairs []IterPair, params Params, output types2.StateNodeSink, ipldOutput types2.IPLDSink, logger log.Logger) error { - start := time.Now() - defer logger.Debug(fmt.Sprintf("statediff BuildStateDiffWithIntermediateStateNodes total duration=%dms", time.Since(start).Milliseconds())) + logger.Debug("statediff BEGIN BuildStateDiffWithIntermediateStateNodes") + defer timeDuration("statediff END BuildStateDiffWithIntermediateStateNodes", time.Now(), logger, metrics2.IndexerMetrics.BuildStateDiffWithIntermediateStateNodesTimer) // collect a slice of all the nodes that were touched and exist at B (B-A) // a map of their leafkey to all the accounts that were touched and exist at B // and a slice of all the paths for the nodes in both of the above sets @@ -188,8 +190,8 @@ func (sdb *StateDiffBuilder) BuildStateDiffWithIntermediateStateNodes(iterPairs // and a slice of the paths for all of the nodes included in both func (sdb *StateDiffBuilder) createdAndUpdatedState(a, b trie.NodeIterator, watchedAddressesLeafPaths [][]byte, output types2.IPLDSink, logger log.Logger) (types2.AccountMap, error) { - start := time.Now() - defer logger.Debug(fmt.Sprintf("statediff createdAndUpdatedState duration=%dms", time.Since(start).Milliseconds())) + logger.Debug("statediff BEGIN createdAndUpdatedStateWithIntermediateNodes") + defer timeDuration("statediff END createdAndUpdatedStateWithIntermediateNodes", time.Now(), logger, metrics2.IndexerMetrics.CreatedAndUpdatedStateWithIntermediateNodesTimer) diffAccountsAtB := make(types2.AccountMap) watchingAddresses := len(watchedAddressesLeafPaths) > 0 @@ -286,8 +288,8 @@ func (sdb *StateDiffBuilder) processStateValueNode(it trie.NodeIterator, watched // and a mapping of their leafkeys to all the accounts that exist in a different state at A than B func (sdb *StateDiffBuilder) deletedOrUpdatedState(a, b trie.NodeIterator, diffAccountsAtB types2.AccountMap, watchedAddressesLeafPaths [][]byte, output types2.StateNodeSink, logger log.Logger) (types2.AccountMap, error) { - start := time.Now() - defer logger.Debug(fmt.Sprintf("statediff deletedOrUpdatedState duration=%dms", time.Since(start).Milliseconds())) + logger.Debug("statediff BEGIN deletedOrUpdatedState") + defer timeDuration("statediff END deletedOrUpdatedState", time.Now(), logger, metrics2.IndexerMetrics.DeletedOrUpdatedStateTimer) diffAccountAtA := make(types2.AccountMap) watchingAddresses := len(watchedAddressesLeafPaths) > 0 @@ -343,8 +345,8 @@ func (sdb *StateDiffBuilder) deletedOrUpdatedState(a, b trie.NodeIterator, diffA // those account maps to remove the accounts which were updated func (sdb *StateDiffBuilder) buildAccountUpdates(creations, deletions types2.AccountMap, updatedKeys []string, output types2.StateNodeSink, ipldOutput types2.IPLDSink, logger log.Logger) error { - start := time.Now() - defer logger.Debug(fmt.Sprintf("statediff buildAccountUpdates duration=%dms", time.Since(start).Milliseconds())) + logger.Debug("statediff BEGIN buildAccountUpdates") + defer timeDuration("statediff END buildAccountUpdates", time.Now(), logger, metrics2.IndexerMetrics.BuildAccountUpdatesTimer) var err error for _, key := range updatedKeys { createdAcc := creations[key] @@ -377,8 +379,8 @@ func (sdb *StateDiffBuilder) buildAccountUpdates(creations, deletions types2.Acc // it also returns the code and codehash for created contract accounts func (sdb *StateDiffBuilder) buildAccountCreations(accounts types2.AccountMap, output types2.StateNodeSink, ipldOutput types2.IPLDSink, logger log.Logger) error { - start := time.Now() - defer logger.Debug(fmt.Sprintf("statediff buildAccountCreations duration=%dms", time.Since(start).Milliseconds())) + logger.Debug("statediff BEGIN buildAccountCreations") + defer timeDuration("statediff END buildAccountCreations", time.Now(), logger, metrics2.IndexerMetrics.BuildAccountCreationsTimer) for _, val := range accounts { diff := types2.StateLeafNode{ AccountWrapper: val, @@ -649,3 +651,9 @@ func isLeaf(elements []interface{}) (bool, error) { return false, fmt.Errorf("unknown hex prefix") } } + +func timeDuration(msg string, start time.Time, logger log.Logger, timer metrics.Timer) { + since := time.Since(start) + timer.Update(since) + logger.Debug(fmt.Sprintf("%s duration=%dms", msg, since.Milliseconds())) +} diff --git a/statediff/indexer/database/metrics/metrics.go b/statediff/indexer/database/metrics/metrics.go index 95c1a959b..14b5a49e1 100644 --- a/statediff/indexer/database/metrics/metrics.go +++ b/statediff/indexer/database/metrics/metrics.go @@ -70,21 +70,35 @@ type IndexerMetricsHandles struct { TxAndRecProcessingTimer metrics.Timer // State, storage, and code combined processing time StateStoreCodeProcessingTimer metrics.Timer + + // Fine-grained code timers + BuildStateDiffWithIntermediateStateNodesTimer metrics.Timer + BuildStateDiffWithoutIntermediateStateNodesTimer metrics.Timer + CreatedAndUpdatedStateWithIntermediateNodesTimer metrics.Timer + DeletedOrUpdatedStateTimer metrics.Timer + BuildAccountUpdatesTimer metrics.Timer + BuildAccountCreationsTimer metrics.Timer } func RegisterIndexerMetrics(reg metrics.Registry) IndexerMetricsHandles { ctx := IndexerMetricsHandles{ - BlocksCounter: metrics.NewCounter(), - TransactionsCounter: metrics.NewCounter(), - ReceiptsCounter: metrics.NewCounter(), - LogsCounter: metrics.NewCounter(), - AccessListEntriesCounter: metrics.NewCounter(), - FreePostgresTimer: metrics.NewTimer(), - PostgresCommitTimer: metrics.NewTimer(), - HeaderProcessingTimer: metrics.NewTimer(), - UncleProcessingTimer: metrics.NewTimer(), - TxAndRecProcessingTimer: metrics.NewTimer(), - StateStoreCodeProcessingTimer: metrics.NewTimer(), + BlocksCounter: metrics.NewCounter(), + TransactionsCounter: metrics.NewCounter(), + ReceiptsCounter: metrics.NewCounter(), + LogsCounter: metrics.NewCounter(), + AccessListEntriesCounter: metrics.NewCounter(), + FreePostgresTimer: metrics.NewTimer(), + PostgresCommitTimer: metrics.NewTimer(), + HeaderProcessingTimer: metrics.NewTimer(), + UncleProcessingTimer: metrics.NewTimer(), + TxAndRecProcessingTimer: metrics.NewTimer(), + StateStoreCodeProcessingTimer: metrics.NewTimer(), + BuildStateDiffWithIntermediateStateNodesTimer: metrics.NewTimer(), + BuildStateDiffWithoutIntermediateStateNodesTimer: metrics.NewTimer(), + CreatedAndUpdatedStateWithIntermediateNodesTimer: metrics.NewTimer(), + DeletedOrUpdatedStateTimer: metrics.NewTimer(), + BuildAccountUpdatesTimer: metrics.NewTimer(), + BuildAccountCreationsTimer: metrics.NewTimer(), } subsys := "indexer" reg.Register(metricName(subsys, "blocks"), ctx.BlocksCounter) @@ -98,6 +112,12 @@ func RegisterIndexerMetrics(reg metrics.Registry) IndexerMetricsHandles { reg.Register(metricName(subsys, "t_uncle_processing"), ctx.UncleProcessingTimer) reg.Register(metricName(subsys, "t_tx_receipt_processing"), ctx.TxAndRecProcessingTimer) reg.Register(metricName(subsys, "t_state_store_code_processing"), ctx.StateStoreCodeProcessingTimer) + reg.Register(metricName(subsys, "t_build_statediff_with_intermediate_state_nodes"), ctx.BuildStateDiffWithIntermediateStateNodesTimer) + reg.Register(metricName(subsys, "t_build_statediff_without_intermediate_state_nodes"), ctx.BuildStateDiffWithoutIntermediateStateNodesTimer) + reg.Register(metricName(subsys, "t_created_and_update_state_with_intermediate_nodes"), ctx.CreatedAndUpdatedStateWithIntermediateNodesTimer) + reg.Register(metricName(subsys, "t_deleted_or_updated_state"), ctx.DeletedOrUpdatedStateTimer) + reg.Register(metricName(subsys, "t_build_account_updates"), ctx.BuildAccountUpdatesTimer) + reg.Register(metricName(subsys, "t_build_account_creations"), ctx.BuildAccountCreationsTimer) log.Debug("Registering statediff indexer metrics.") return ctx -- 2.45.2 From a952a1f39f3a0cc581ed013e1e681f3eaba21d12 Mon Sep 17 00:00:00 2001 From: Thomas E Lackey Date: Wed, 15 Feb 2023 17:28:17 -0600 Subject: [PATCH 05/13] Add timers and update StandardTimer to include total time. (#316) * Add timers and update StandardTimer to include total time. * Even more timers. --- metrics/prometheus/collector.go | 3 + metrics/prometheus/collector_test.go | 3 + metrics/timer.go | 25 +++++-- statediff/builder.go | 35 +++++---- statediff/indexer/database/metrics/metrics.go | 72 +++++++++++++++++++ statediff/service.go | 6 +- statediff/trie_helpers/helpers.go | 5 ++ trie/iterator.go | 4 ++ 8 files changed, 132 insertions(+), 21 deletions(-) diff --git a/metrics/prometheus/collector.go b/metrics/prometheus/collector.go index e8d5e4f5d..76bece915 100644 --- a/metrics/prometheus/collector.go +++ b/metrics/prometheus/collector.go @@ -82,6 +82,9 @@ func (c *collector) addTimer(name string, m metrics.Timer) { c.writeSummaryPercentile(name, strconv.FormatFloat(pv[i], 'f', -1, 64), ps[i]) } c.buff.WriteRune('\n') + + c.buff.WriteString(fmt.Sprintf(typeGaugeTpl, mutateKey(name+"_total"))) + c.buff.WriteString(fmt.Sprintf(keyValueTpl, mutateKey(name+"_total"), m.Total())) } func (c *collector) addResettingTimer(name string, m metrics.ResettingTimer) { diff --git a/metrics/prometheus/collector_test.go b/metrics/prometheus/collector_test.go index 43f2f804d..194ff5541 100644 --- a/metrics/prometheus/collector_test.go +++ b/metrics/prometheus/collector_test.go @@ -92,6 +92,9 @@ test_timer {quantile="0.99"} 1.2e+08 test_timer {quantile="0.999"} 1.2e+08 test_timer {quantile="0.9999"} 1.2e+08 +# TYPE test_timer_total gauge +test_timer_total 230000000 + # TYPE test_resetting_timer_count counter test_resetting_timer_count 6 diff --git a/metrics/timer.go b/metrics/timer.go index a63c9dfb6..3d68c3135 100644 --- a/metrics/timer.go +++ b/metrics/timer.go @@ -25,6 +25,7 @@ type Timer interface { Update(time.Duration) UpdateSince(time.Time) Variance() float64 + Total() int64 } // GetOrRegisterTimer returns an existing Timer or constructs and registers a @@ -47,6 +48,7 @@ func NewCustomTimer(h Histogram, m Meter) Timer { return &StandardTimer{ histogram: h, meter: m, + total: NewCounter(), } } @@ -72,6 +74,7 @@ func NewTimer() Timer { return &StandardTimer{ histogram: NewHistogram(NewExpDecaySample(1028, 0.015)), meter: NewMeter(), + total: NewCounter(), } } @@ -134,11 +137,15 @@ func (NilTimer) UpdateSince(time.Time) {} // Variance is a no-op. func (NilTimer) Variance() float64 { return 0.0 } +// Total is a no-op. +func (NilTimer) Total() int64 { return int64(0) } + // StandardTimer is the standard implementation of a Timer and uses a Histogram // and Meter. type StandardTimer struct { histogram Histogram meter Meter + total Counter mutex sync.Mutex } @@ -200,6 +207,7 @@ func (t *StandardTimer) Snapshot() Timer { return &TimerSnapshot{ histogram: t.histogram.Snapshot().(*HistogramSnapshot), meter: t.meter.Snapshot().(*MeterSnapshot), + total: t.total.Snapshot().(CounterSnapshot), } } @@ -231,14 +239,12 @@ func (t *StandardTimer) Update(d time.Duration) { defer t.mutex.Unlock() t.histogram.Update(int64(d)) t.meter.Mark(1) + t.total.Inc(int64(d)) } // Record the duration of an event that started at a time and ends now. func (t *StandardTimer) UpdateSince(ts time.Time) { - t.mutex.Lock() - defer t.mutex.Unlock() - t.histogram.Update(int64(time.Since(ts))) - t.meter.Mark(1) + t.Update(time.Since(ts)) } // Variance returns the variance of the values in the sample. @@ -246,10 +252,18 @@ func (t *StandardTimer) Variance() float64 { return t.histogram.Variance() } +// Total returns the total time the timer has run in nanoseconds. +// This differs from Sum in that it is a simple counter, not based +// on a histogram Sample. +func (t *StandardTimer) Total() int64 { + return t.total.Count() +} + // TimerSnapshot is a read-only copy of another Timer. type TimerSnapshot struct { histogram *HistogramSnapshot meter *MeterSnapshot + total CounterSnapshot } // Count returns the number of events recorded at the time the snapshot was @@ -324,3 +338,6 @@ func (*TimerSnapshot) UpdateSince(time.Time) { // Variance returns the variance of the values at the time the snapshot was // taken. func (t *TimerSnapshot) Variance() float64 { return t.histogram.Variance() } + +// Total returns the total time the timer has run in nanoseconds. +func (t *TimerSnapshot) Total() int64 { return t.total.Count() } diff --git a/statediff/builder.go b/statediff/builder.go index 5cc432c8d..9b79b1dee 100644 --- a/statediff/builder.go +++ b/statediff/builder.go @@ -29,7 +29,6 @@ import ( "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/log" - "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/rlp" metrics2 "github.com/ethereum/go-ethereum/statediff/indexer/database/metrics" ipld2 "github.com/ethereum/go-ethereum/statediff/indexer/ipld" @@ -87,6 +86,7 @@ func NewBuilder(stateCache state.Database) Builder { // BuildStateDiffObject builds a statediff object from two blocks and the provided parameters func (sdb *StateDiffBuilder) BuildStateDiffObject(args Args, params Params) (types2.StateObject, error) { + defer metrics2.UpdateDuration(time.Now(), metrics2.IndexerMetrics.BuildStateDiffObjectTimer) var stateNodes []types2.StateLeafNode var iplds []types2.IPLD err := sdb.WriteStateDiffObject(args, params, StateNodeAppender(&stateNodes), IPLDMappingAppender(&iplds)) @@ -104,6 +104,7 @@ func (sdb *StateDiffBuilder) BuildStateDiffObject(args Args, params Params) (typ // WriteStateDiffObject writes a statediff object to output sinks func (sdb *StateDiffBuilder) WriteStateDiffObject(args Args, params Params, output types2.StateNodeSink, ipldOutput types2.IPLDSink) error { + defer metrics2.UpdateDuration(time.Now(), metrics2.IndexerMetrics.WriteStateDiffObjectTimer) // Load tries for old and new states oldTrie, err := sdb.StateCache.OpenTrie(args.OldStateRoot) if err != nil { @@ -136,7 +137,7 @@ func (sdb *StateDiffBuilder) WriteStateDiffObject(args Args, params Params, outp func (sdb *StateDiffBuilder) BuildStateDiffWithIntermediateStateNodes(iterPairs []IterPair, params Params, output types2.StateNodeSink, ipldOutput types2.IPLDSink, logger log.Logger) error { logger.Debug("statediff BEGIN BuildStateDiffWithIntermediateStateNodes") - defer timeDuration("statediff END BuildStateDiffWithIntermediateStateNodes", time.Now(), logger, metrics2.IndexerMetrics.BuildStateDiffWithIntermediateStateNodesTimer) + defer metrics2.ReportAndUpdateDuration("statediff END BuildStateDiffWithIntermediateStateNodes", time.Now(), logger, metrics2.IndexerMetrics.BuildStateDiffWithIntermediateStateNodesTimer) // collect a slice of all the nodes that were touched and exist at B (B-A) // a map of their leafkey to all the accounts that were touched and exist at B // and a slice of all the paths for the nodes in both of the above sets @@ -190,12 +191,12 @@ func (sdb *StateDiffBuilder) BuildStateDiffWithIntermediateStateNodes(iterPairs // and a slice of the paths for all of the nodes included in both func (sdb *StateDiffBuilder) createdAndUpdatedState(a, b trie.NodeIterator, watchedAddressesLeafPaths [][]byte, output types2.IPLDSink, logger log.Logger) (types2.AccountMap, error) { - logger.Debug("statediff BEGIN createdAndUpdatedStateWithIntermediateNodes") - defer timeDuration("statediff END createdAndUpdatedStateWithIntermediateNodes", time.Now(), logger, metrics2.IndexerMetrics.CreatedAndUpdatedStateWithIntermediateNodesTimer) + logger.Debug("statediff BEGIN createdAndUpdatedState") + defer metrics2.ReportAndUpdateDuration("statediff END createdAndUpdatedState", time.Now(), logger, metrics2.IndexerMetrics.CreatedAndUpdatedStateTimer) diffAccountsAtB := make(types2.AccountMap) watchingAddresses := len(watchedAddressesLeafPaths) > 0 - it, _ := trie.NewDifferenceIterator(a, b) + it, itCount := trie.NewDifferenceIterator(a, b) for it.Next(true) { // ignore node if it is not along paths of interest if watchingAddresses && !isValidPrefixPath(watchedAddressesLeafPaths, it.Path()) { @@ -248,6 +249,8 @@ func (sdb *StateDiffBuilder) createdAndUpdatedState(a, b trie.NodeIterator, } } } + logger.Debug("statediff COUNTS createdAndUpdatedStateWithIntermediateNodes", "it", itCount, "diffAccountsAtB", len(diffAccountsAtB)) + metrics2.IndexerMetrics.DifferenceIteratorCounter.Inc(int64(*itCount)) return diffAccountsAtB, it.Error() } @@ -289,7 +292,7 @@ func (sdb *StateDiffBuilder) processStateValueNode(it trie.NodeIterator, watched func (sdb *StateDiffBuilder) deletedOrUpdatedState(a, b trie.NodeIterator, diffAccountsAtB types2.AccountMap, watchedAddressesLeafPaths [][]byte, output types2.StateNodeSink, logger log.Logger) (types2.AccountMap, error) { logger.Debug("statediff BEGIN deletedOrUpdatedState") - defer timeDuration("statediff END deletedOrUpdatedState", time.Now(), logger, metrics2.IndexerMetrics.DeletedOrUpdatedStateTimer) + defer metrics2.ReportAndUpdateDuration("statediff END deletedOrUpdatedState", time.Now(), logger, metrics2.IndexerMetrics.DeletedOrUpdatedStateTimer) diffAccountAtA := make(types2.AccountMap) watchingAddresses := len(watchedAddressesLeafPaths) > 0 @@ -345,8 +348,8 @@ func (sdb *StateDiffBuilder) deletedOrUpdatedState(a, b trie.NodeIterator, diffA // those account maps to remove the accounts which were updated func (sdb *StateDiffBuilder) buildAccountUpdates(creations, deletions types2.AccountMap, updatedKeys []string, output types2.StateNodeSink, ipldOutput types2.IPLDSink, logger log.Logger) error { - logger.Debug("statediff BEGIN buildAccountUpdates") - defer timeDuration("statediff END buildAccountUpdates", time.Now(), logger, metrics2.IndexerMetrics.BuildAccountUpdatesTimer) + logger.Debug("statediff BEGIN buildAccountUpdates", "creations", len(creations), "deletions", len(deletions), "updatedKeys", len(updatedKeys)) + defer metrics2.ReportAndUpdateDuration("statediff END buildAccountUpdates ", time.Now(), logger, metrics2.IndexerMetrics.BuildAccountUpdatesTimer) var err error for _, key := range updatedKeys { createdAcc := creations[key] @@ -380,7 +383,7 @@ func (sdb *StateDiffBuilder) buildAccountUpdates(creations, deletions types2.Acc func (sdb *StateDiffBuilder) buildAccountCreations(accounts types2.AccountMap, output types2.StateNodeSink, ipldOutput types2.IPLDSink, logger log.Logger) error { logger.Debug("statediff BEGIN buildAccountCreations") - defer timeDuration("statediff END buildAccountCreations", time.Now(), logger, metrics2.IndexerMetrics.BuildAccountCreationsTimer) + defer metrics2.ReportAndUpdateDuration("statediff END buildAccountCreations", time.Now(), logger, metrics2.IndexerMetrics.BuildAccountCreationsTimer) for _, val := range accounts { diff := types2.StateLeafNode{ AccountWrapper: val, @@ -419,6 +422,7 @@ func (sdb *StateDiffBuilder) buildAccountCreations(accounts types2.AccountMap, o // i.e. it returns all the storage nodes at this state, since there is no previous state func (sdb *StateDiffBuilder) buildStorageNodesEventual(sr common.Hash, output types2.StorageNodeSink, ipldOutput types2.IPLDSink) error { + defer metrics2.UpdateDuration(time.Now(), metrics2.IndexerMetrics.BuildStorageNodesEventualTimer) if bytes.Equal(sr.Bytes(), emptyContractRoot.Bytes()) { return nil } @@ -440,6 +444,7 @@ func (sdb *StateDiffBuilder) buildStorageNodesEventual(sr common.Hash, output ty // including intermediate nodes can be turned on or off func (sdb *StateDiffBuilder) buildStorageNodesFromTrie(it trie.NodeIterator, output types2.StorageNodeSink, ipldOutput types2.IPLDSink) error { + defer metrics2.UpdateDuration(time.Now(), metrics2.IndexerMetrics.BuildStorageNodesFromTrieTimer) for it.Next(true) { if it.Leaf() { storageLeafNode, err := sdb.processStorageValueNode(it) @@ -489,6 +494,7 @@ func (sdb *StateDiffBuilder) processStorageValueNode(it trie.NodeIterator) (type // buildRemovedAccountStorageNodes builds the "removed" diffs for all the storage nodes for a destroyed account func (sdb *StateDiffBuilder) buildRemovedAccountStorageNodes(sr common.Hash, output types2.StorageNodeSink) error { + defer metrics2.UpdateDuration(time.Now(), metrics2.IndexerMetrics.BuildRemovedAccountStorageNodesTimer) if bytes.Equal(sr.Bytes(), emptyContractRoot.Bytes()) { return nil } @@ -508,6 +514,7 @@ func (sdb *StateDiffBuilder) buildRemovedAccountStorageNodes(sr common.Hash, out // buildRemovedStorageNodesFromTrie returns diffs for all the storage nodes in the provided node interator func (sdb *StateDiffBuilder) buildRemovedStorageNodesFromTrie(it trie.NodeIterator, output types2.StorageNodeSink) error { + defer metrics2.UpdateDuration(time.Now(), metrics2.IndexerMetrics.BuildRemovedStorageNodesFromTrieTimer) for it.Next(true) { if it.Leaf() { // only leaf values are indexed, don't need to demarcate removed intermediate nodes leafKey := make([]byte, len(it.LeafKey())) @@ -528,6 +535,7 @@ func (sdb *StateDiffBuilder) buildRemovedStorageNodesFromTrie(it trie.NodeIterat // buildStorageNodesIncremental builds the storage diff node objects for all nodes that exist in a different state at B than A func (sdb *StateDiffBuilder) buildStorageNodesIncremental(oldSR common.Hash, newSR common.Hash, output types2.StorageNodeSink, ipldOutput types2.IPLDSink) error { + defer metrics2.UpdateDuration(time.Now(), metrics2.IndexerMetrics.BuildStorageNodesIncrementalTimer) if bytes.Equal(newSR.Bytes(), oldSR.Bytes()) { return nil } @@ -556,6 +564,7 @@ func (sdb *StateDiffBuilder) buildStorageNodesIncremental(oldSR common.Hash, new func (sdb *StateDiffBuilder) createdAndUpdatedStorage(a, b trie.NodeIterator, output types2.StorageNodeSink, ipldOutput types2.IPLDSink) (map[string]bool, error) { + defer metrics2.UpdateDuration(time.Now(), metrics2.IndexerMetrics.CreatedAndUpdatedStorageTimer) diffSlotsAtB := make(map[string]bool) it, _ := trie.NewDifferenceIterator(a, b) for it.Next(true) { @@ -585,6 +594,7 @@ func (sdb *StateDiffBuilder) createdAndUpdatedStorage(a, b trie.NodeIterator, ou } func (sdb *StateDiffBuilder) deletedOrUpdatedStorage(a, b trie.NodeIterator, diffSlotsAtB map[string]bool, output types2.StorageNodeSink) error { + defer metrics2.UpdateDuration(time.Now(), metrics2.IndexerMetrics.DeletedOrUpdatedStorageTimer) it, _ := trie.NewDifferenceIterator(b, a) for it.Next(true) { if it.Leaf() { @@ -621,6 +631,7 @@ func isValidPrefixPath(watchedAddressesLeafPaths [][]byte, currentPath []byte) b // isWatchedAddress is used to check if a state account corresponds to one of the addresses the builder is configured to watch func isWatchedAddress(watchedAddressesLeafPaths [][]byte, valueNodePath []byte) bool { + defer metrics2.UpdateDuration(time.Now(), metrics2.IndexerMetrics.IsWatchedAddressTimer) for _, watchedAddressPath := range watchedAddressesLeafPaths { if bytes.Equal(watchedAddressPath, valueNodePath) { return true @@ -651,9 +662,3 @@ func isLeaf(elements []interface{}) (bool, error) { return false, fmt.Errorf("unknown hex prefix") } } - -func timeDuration(msg string, start time.Time, logger log.Logger, timer metrics.Timer) { - since := time.Since(start) - timer.Update(since) - logger.Debug(fmt.Sprintf("%s duration=%dms", msg, since.Milliseconds())) -} diff --git a/statediff/indexer/database/metrics/metrics.go b/statediff/indexer/database/metrics/metrics.go index 14b5a49e1..550dd238e 100644 --- a/statediff/indexer/database/metrics/metrics.go +++ b/statediff/indexer/database/metrics/metrics.go @@ -17,6 +17,7 @@ package metrics import ( + "fmt" "strings" "time" @@ -78,6 +79,26 @@ type IndexerMetricsHandles struct { DeletedOrUpdatedStateTimer metrics.Timer BuildAccountUpdatesTimer metrics.Timer BuildAccountCreationsTimer metrics.Timer + ResolveNodeTimer metrics.Timer + SortKeysTimer metrics.Timer + FindIntersectionTimer metrics.Timer + OutputTimer metrics.Timer + CodeOutputTimer metrics.Timer + DifferenceIteratorNextTimer metrics.Timer + DifferenceIteratorCounter metrics.Counter + DeletedOrUpdatedStorageTimer metrics.Timer + CreatedAndUpdatedStorageTimer metrics.Timer + BuildStorageNodesIncrementalTimer metrics.Timer + BuildStateTrieObjectTimer metrics.Timer + BuildStateTrieTimer metrics.Timer + BuildStateDiffObjectTimer metrics.Timer + WriteStateDiffObjectTimer metrics.Timer + CreatedAndUpdatedStateTimer metrics.Timer + BuildStorageNodesEventualTimer metrics.Timer + BuildStorageNodesFromTrieTimer metrics.Timer + BuildRemovedAccountStorageNodesTimer metrics.Timer + BuildRemovedStorageNodesFromTrieTimer metrics.Timer + IsWatchedAddressTimer metrics.Timer } func RegisterIndexerMetrics(reg metrics.Registry) IndexerMetricsHandles { @@ -99,6 +120,26 @@ func RegisterIndexerMetrics(reg metrics.Registry) IndexerMetricsHandles { DeletedOrUpdatedStateTimer: metrics.NewTimer(), BuildAccountUpdatesTimer: metrics.NewTimer(), BuildAccountCreationsTimer: metrics.NewTimer(), + ResolveNodeTimer: metrics.NewTimer(), + SortKeysTimer: metrics.NewTimer(), + FindIntersectionTimer: metrics.NewTimer(), + OutputTimer: metrics.NewTimer(), + CodeOutputTimer: metrics.NewTimer(), + DifferenceIteratorNextTimer: metrics.NewTimer(), + DifferenceIteratorCounter: metrics.NewCounter(), + DeletedOrUpdatedStorageTimer: metrics.NewTimer(), + CreatedAndUpdatedStorageTimer: metrics.NewTimer(), + BuildStorageNodesIncrementalTimer: metrics.NewTimer(), + BuildStateTrieObjectTimer: metrics.NewTimer(), + BuildStateTrieTimer: metrics.NewTimer(), + BuildStateDiffObjectTimer: metrics.NewTimer(), + WriteStateDiffObjectTimer: metrics.NewTimer(), + CreatedAndUpdatedStateTimer: metrics.NewTimer(), + BuildStorageNodesEventualTimer: metrics.NewTimer(), + BuildStorageNodesFromTrieTimer: metrics.NewTimer(), + BuildRemovedAccountStorageNodesTimer: metrics.NewTimer(), + BuildRemovedStorageNodesFromTrieTimer: metrics.NewTimer(), + IsWatchedAddressTimer: metrics.NewTimer(), } subsys := "indexer" reg.Register(metricName(subsys, "blocks"), ctx.BlocksCounter) @@ -118,6 +159,26 @@ func RegisterIndexerMetrics(reg metrics.Registry) IndexerMetricsHandles { reg.Register(metricName(subsys, "t_deleted_or_updated_state"), ctx.DeletedOrUpdatedStateTimer) reg.Register(metricName(subsys, "t_build_account_updates"), ctx.BuildAccountUpdatesTimer) reg.Register(metricName(subsys, "t_build_account_creations"), ctx.BuildAccountCreationsTimer) + reg.Register(metricName(subsys, "t_resolve_node"), ctx.ResolveNodeTimer) + reg.Register(metricName(subsys, "t_sort_keys"), ctx.SortKeysTimer) + reg.Register(metricName(subsys, "t_find_intersection"), ctx.FindIntersectionTimer) + reg.Register(metricName(subsys, "t_output_fn"), ctx.OutputTimer) + reg.Register(metricName(subsys, "t_code_output_fn"), ctx.CodeOutputTimer) + reg.Register(metricName(subsys, "t_difference_iterator_next"), ctx.DifferenceIteratorNextTimer) + reg.Register(metricName(subsys, "difference_iterator_counter"), ctx.DifferenceIteratorCounter) + reg.Register(metricName(subsys, "t_created_and_updated_storage"), ctx.CreatedAndUpdatedStorageTimer) + reg.Register(metricName(subsys, "t_deleted_or_updated_storage"), ctx.DeletedOrUpdatedStorageTimer) + reg.Register(metricName(subsys, "t_build_storage_nodes_incremental"), ctx.BuildStorageNodesIncrementalTimer) + reg.Register(metricName(subsys, "t_build_state_trie_object"), ctx.BuildStateTrieObjectTimer) + reg.Register(metricName(subsys, "t_build_state_trie"), ctx.BuildStateTrieTimer) + reg.Register(metricName(subsys, "t_build_statediff_object"), ctx.BuildStateDiffObjectTimer) + reg.Register(metricName(subsys, "t_write_statediff_object"), ctx.WriteStateDiffObjectTimer) + reg.Register(metricName(subsys, "t_created_and_updated_state"), ctx.CreatedAndUpdatedStateTimer) + reg.Register(metricName(subsys, "t_build_storage_nodes_eventual"), ctx.BuildStorageNodesEventualTimer) + reg.Register(metricName(subsys, "t_build_storage_nodes_from_trie"), ctx.BuildStorageNodesFromTrieTimer) + reg.Register(metricName(subsys, "t_build_removed_accounts_storage_nodes"), ctx.BuildRemovedAccountStorageNodesTimer) + reg.Register(metricName(subsys, "t_build_removed_storage_nodes_from_trie"), ctx.BuildRemovedStorageNodesFromTrieTimer) + reg.Register(metricName(subsys, "t_is_watched_address"), ctx.IsWatchedAddressTimer) log.Debug("Registering statediff indexer metrics.") return ctx @@ -189,3 +250,14 @@ func (met *dbMetricsHandles) Update(stats DbStats) { met.closedMaxIdle.Inc(stats.MaxIdleClosed()) met.closedMaxLifetime.Inc(stats.MaxLifetimeClosed()) } + +func ReportAndUpdateDuration(msg string, start time.Time, logger log.Logger, timer metrics.Timer) { + since := UpdateDuration(start, timer) + logger.Debug(fmt.Sprintf("%s duration=%dms", msg, since.Milliseconds())) +} + +func UpdateDuration(start time.Time, timer metrics.Timer) time.Duration { + since := time.Since(start) + timer.Update(since) + return since +} diff --git a/statediff/service.go b/statediff/service.go index e8b4a278e..880d56aa1 100644 --- a/statediff/service.go +++ b/statediff/service.go @@ -26,8 +26,6 @@ import ( "sync/atomic" "time" - "github.com/ethereum/go-ethereum/trie" - "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/state" @@ -43,9 +41,11 @@ import ( "github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/rpc" ind "github.com/ethereum/go-ethereum/statediff/indexer" + "github.com/ethereum/go-ethereum/statediff/indexer/database/metrics" "github.com/ethereum/go-ethereum/statediff/indexer/interfaces" nodeinfo "github.com/ethereum/go-ethereum/statediff/indexer/node" types2 "github.com/ethereum/go-ethereum/statediff/types" + "github.com/ethereum/go-ethereum/trie" "github.com/thoas/go-funk" ) @@ -794,9 +794,11 @@ func (sds *Service) writeStateDiff(block *types.Block, parentRoot common.Hash, p } output := func(node types2.StateLeafNode) error { + defer metrics.ReportAndUpdateDuration("statediff output", time.Now(), logger, metrics.IndexerMetrics.OutputTimer) return sds.indexer.PushStateNode(tx, node, block.Hash().String()) } ipldOutput := func(c types2.IPLD) error { + defer metrics.ReportAndUpdateDuration("statediff ipldOutput", time.Now(), logger, metrics.IndexerMetrics.CodeOutputTimer) return sds.indexer.PushIPLD(tx, c) } diff --git a/statediff/trie_helpers/helpers.go b/statediff/trie_helpers/helpers.go index 0f5152774..d6c024ee4 100644 --- a/statediff/trie_helpers/helpers.go +++ b/statediff/trie_helpers/helpers.go @@ -22,12 +22,16 @@ package trie_helpers import ( "sort" "strings" + "time" + + metrics2 "github.com/ethereum/go-ethereum/statediff/indexer/database/metrics" "github.com/ethereum/go-ethereum/statediff/types" ) // SortKeys sorts the keys in the account map func SortKeys(data types.AccountMap) []string { + defer metrics2.UpdateDuration(time.Now(), metrics2.IndexerMetrics.SortKeysTimer) keys := make([]string, 0, len(data)) for key := range data { keys = append(keys, key) @@ -41,6 +45,7 @@ func SortKeys(data types.AccountMap) []string { // a and b must first be sorted // this is used to find which keys have been both "deleted" and "created" i.e. they were updated func FindIntersection(a, b []string) []string { + defer metrics2.UpdateDuration(time.Now(), metrics2.IndexerMetrics.FindIntersectionTimer) lenA := len(a) lenB := len(b) iOfA, iOfB := 0, 0 diff --git a/trie/iterator.go b/trie/iterator.go index d2c8b6a78..3c2bc4ada 100644 --- a/trie/iterator.go +++ b/trie/iterator.go @@ -20,6 +20,9 @@ import ( "bytes" "container/heap" "errors" + "time" + + "github.com/ethereum/go-ethereum/statediff/indexer/database/metrics" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/ethdb" @@ -584,6 +587,7 @@ func (it *differenceIterator) AddResolver(resolver ethdb.KeyValueReader) { } func (it *differenceIterator) Next(bool) bool { + defer metrics.UpdateDuration(time.Now(), metrics.IndexerMetrics.DifferenceIteratorNextTimer) // Invariants: // - We always advance at least one element in b. // - At the start of this function, a's path is lexically greater than b's. -- 2.45.2 From ed9d47885e802f7e540519de486e26b9691b475c Mon Sep 17 00:00:00 2001 From: Thomas E Lackey Date: Wed, 15 Feb 2023 19:56:03 -0600 Subject: [PATCH 06/13] Move noisiest timer message to trace. (#317) --- statediff/service.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/statediff/service.go b/statediff/service.go index 880d56aa1..a6db952e4 100644 --- a/statediff/service.go +++ b/statediff/service.go @@ -794,7 +794,11 @@ func (sds *Service) writeStateDiff(block *types.Block, parentRoot common.Hash, p } output := func(node types2.StateLeafNode) error { - defer metrics.ReportAndUpdateDuration("statediff output", time.Now(), logger, metrics.IndexerMetrics.OutputTimer) + defer func() { + // This is very noisy so we log at Trace. + since := metrics.UpdateDuration(time.Now(), metrics.IndexerMetrics.OutputTimer) + logger.Trace(fmt.Sprintf("statediff output duration=%dms", since.Milliseconds())) + }() return sds.indexer.PushStateNode(tx, node, block.Hash().String()) } ipldOutput := func(c types2.IPLD) error { -- 2.45.2 From 01df44e2dd27f3c57c2d46702d72829d6b553c00 Mon Sep 17 00:00:00 2001 From: Thomas E Lackey Date: Thu, 9 Mar 2023 20:02:40 -0600 Subject: [PATCH 07/13] Add COPY support for inserting multiple rows in a single operation. (#328) * Add COPY support for inserting multiple rows in a single command. --- cmd/geth/config.go | 3 + cmd/geth/main.go | 1 + cmd/utils/flags.go | 7 + statediff/indexer/database/sql/interfaces.go | 14 ++ statediff/indexer/database/sql/lazy_tx.go | 61 ++++- .../database/sql/pgx_indexer_legacy_test.go | 2 +- .../indexer/database/sql/pgx_indexer_test.go | 28 ++- .../indexer/database/sql/postgres/config.go | 3 + .../indexer/database/sql/postgres/database.go | 47 ++++ .../indexer/database/sql/postgres/pgx.go | 12 +- .../indexer/database/sql/postgres/sqlx.go | 11 + .../database/sql/postgres/test_helpers.go | 4 +- statediff/indexer/database/sql/writer.go | 229 +++++++++++++----- 13 files changed, 345 insertions(+), 77 deletions(-) diff --git a/cmd/geth/config.go b/cmd/geth/config.go index ae5332e30..99eed3d4c 100644 --- a/cmd/geth/config.go +++ b/cmd/geth/config.go @@ -268,6 +268,9 @@ func makeFullNode(ctx *cli.Context) (*node.Node, ethapi.Backend) { if ctx.IsSet(utils.StateDiffLogStatements.Name) { pgConfig.LogStatements = ctx.Bool(utils.StateDiffLogStatements.Name) } + if ctx.IsSet(utils.StateDiffCopyFrom.Name) { + pgConfig.CopyFrom = ctx.Bool(utils.StateDiffCopyFrom.Name) + } indexerConfig = pgConfig case shared.DUMP: dumpTypeStr := ctx.String(utils.StateDiffDBDumpDst.Name) diff --git a/cmd/geth/main.go b/cmd/geth/main.go index 16c549fed..5b3b93d5c 100644 --- a/cmd/geth/main.go +++ b/cmd/geth/main.go @@ -180,6 +180,7 @@ var ( utils.StateDiffWatchedAddressesFilePath, utils.StateDiffUpsert, utils.StateDiffLogStatements, + utils.StateDiffCopyFrom, configFileFlag, }, utils.NetworkFlags, utils.DatabasePathFlags) diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index 9f2624d3c..ffab935d8 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -1083,6 +1083,13 @@ var ( Usage: "Should the statediff service log all database statements? (Note: pgx only)", Value: false, } + + StateDiffCopyFrom = &cli.BoolFlag{ + Name: "statediff.db.copyfrom", + Usage: "Should the statediff service use COPY FROM for multiple inserts? (Note: pgx only)", + Value: false, + } + StateDiffWritingFlag = &cli.BoolFlag{ Name: "statediff.writing", Usage: "Activates progressive writing of state diffs to database as new block are synced", diff --git a/statediff/indexer/database/sql/interfaces.go b/statediff/indexer/database/sql/interfaces.go index b2117f10c..f964a2a90 100644 --- a/statediff/indexer/database/sql/interfaces.go +++ b/statediff/indexer/database/sql/interfaces.go @@ -31,6 +31,7 @@ type Database interface { // Driver interface has all the methods required by a driver implementation to support the sql indexer type Driver interface { + UseCopyFrom() bool QueryRow(ctx context.Context, sql string, args ...interface{}) ScannableRow Exec(ctx context.Context, sql string, args ...interface{}) (Result, error) Select(ctx context.Context, dest interface{}, query string, args ...interface{}) error @@ -53,12 +54,25 @@ type Statements interface { InsertStorageStm() string InsertIPLDStm() string InsertIPLDsStm() string + + // Table/column descriptions for use with CopyFrom and similar commands. + LogTableName() []string + LogColumnNames() []string + RctTableName() []string + RctColumnNames() []string + StateTableName() []string + StateColumnNames() []string + StorageTableName() []string + StorageColumnNames() []string + TxTableName() []string + TxColumnNames() []string } // Tx interface to accommodate different concrete SQL transaction types type Tx interface { QueryRow(ctx context.Context, sql string, args ...interface{}) ScannableRow Exec(ctx context.Context, sql string, args ...interface{}) (Result, error) + CopyFrom(ctx context.Context, tableName []string, columnNames []string, rows [][]interface{}) (int64, error) Commit(ctx context.Context) error Rollback(ctx context.Context) error } diff --git a/statediff/indexer/database/sql/lazy_tx.go b/statediff/indexer/database/sql/lazy_tx.go index 922bf84a0..b2445e0d8 100644 --- a/statediff/indexer/database/sql/lazy_tx.go +++ b/statediff/indexer/database/sql/lazy_tx.go @@ -2,10 +2,16 @@ package sql import ( "context" + "reflect" + + "github.com/ethereum/go-ethereum/log" ) +// Changing this to 1 would make sure only sequential COPYs were combined. +const copyFromCheckLimit = 100 + type DelayedTx struct { - cache []cachedStmt + cache []interface{} db Database } type cachedStmt struct { @@ -13,6 +19,20 @@ type cachedStmt struct { args []interface{} } +type copyFrom struct { + tableName []string + columnNames []string + rows [][]interface{} +} + +func (cf *copyFrom) appendRows(rows [][]interface{}) { + cf.rows = append(cf.rows, rows...) +} + +func (cf *copyFrom) matches(tableName []string, columnNames []string) bool { + return reflect.DeepEqual(cf.tableName, tableName) && reflect.DeepEqual(cf.columnNames, columnNames) +} + func NewDelayedTx(db Database) *DelayedTx { return &DelayedTx{db: db} } @@ -21,6 +41,28 @@ func (tx *DelayedTx) QueryRow(ctx context.Context, sql string, args ...interface return tx.db.QueryRow(ctx, sql, args...) } +func (tx *DelayedTx) findPrevCopyFrom(tableName []string, columnNames []string, limit int) (*copyFrom, int) { + for pos, count := len(tx.cache)-1, 0; pos >= 0 && count < limit; pos, count = pos-1, count+1 { + prevCopy, ok := tx.cache[pos].(*copyFrom) + if ok && prevCopy.matches(tableName, columnNames) { + return prevCopy, count + } + } + return nil, -1 +} + +func (tx *DelayedTx) CopyFrom(ctx context.Context, tableName []string, columnNames []string, rows [][]interface{}) (int64, error) { + if prevCopy, distance := tx.findPrevCopyFrom(tableName, columnNames, copyFromCheckLimit); nil != prevCopy { + log.Trace("statediff lazy_tx : Appending to COPY", "table", tableName, + "current", len(prevCopy.rows), "new", len(rows), "distance", distance) + prevCopy.appendRows(rows) + } else { + tx.cache = append(tx.cache, ©From{tableName, columnNames, rows}) + } + + return 0, nil +} + func (tx *DelayedTx) Exec(ctx context.Context, sql string, args ...interface{}) (Result, error) { tx.cache = append(tx.cache, cachedStmt{sql, args}) return nil, nil @@ -39,10 +81,19 @@ func (tx *DelayedTx) Commit(ctx context.Context) error { rollback(ctx, base) } }() - for _, stmt := range tx.cache { - _, err := base.Exec(ctx, stmt.sql, stmt.args...) - if err != nil { - return err + for _, item := range tx.cache { + switch item := item.(type) { + case *copyFrom: + _, err := base.CopyFrom(ctx, item.tableName, item.columnNames, item.rows) + if err != nil { + log.Error("COPY error", "table", item.tableName, "err", err) + return err + } + case cachedStmt: + _, err := base.Exec(ctx, item.sql, item.args...) + if err != nil { + return err + } } } tx.cache = nil diff --git a/statediff/indexer/database/sql/pgx_indexer_legacy_test.go b/statediff/indexer/database/sql/pgx_indexer_legacy_test.go index 292548b75..80094a8d0 100644 --- a/statediff/indexer/database/sql/pgx_indexer_legacy_test.go +++ b/statediff/indexer/database/sql/pgx_indexer_legacy_test.go @@ -28,7 +28,7 @@ import ( ) func setupLegacyPGXIndexer(t *testing.T) { - db, err = postgres.SetupPGXDB() + db, err = postgres.SetupPGXDB(postgres.DefaultConfig) if err != nil { t.Fatal(err) } diff --git a/statediff/indexer/database/sql/pgx_indexer_test.go b/statediff/indexer/database/sql/pgx_indexer_test.go index 1dbf2dfa0..c0ce57c1f 100644 --- a/statediff/indexer/database/sql/pgx_indexer_test.go +++ b/statediff/indexer/database/sql/pgx_indexer_test.go @@ -29,8 +29,8 @@ import ( "github.com/ethereum/go-ethereum/statediff/indexer/test" ) -func setupPGXIndexer(t *testing.T) { - db, err = postgres.SetupPGXDB() +func setupPGXIndexer(t *testing.T, config postgres.Config) { + db, err = postgres.SetupPGXDB(config) if err != nil { t.Fatal(err) } @@ -39,12 +39,16 @@ func setupPGXIndexer(t *testing.T) { } func setupPGX(t *testing.T) { - setupPGXIndexer(t) + setupPGXWithConfig(t, postgres.DefaultConfig) +} + +func setupPGXWithConfig(t *testing.T, config postgres.Config) { + setupPGXIndexer(t, config) test.SetupTestData(t, ind) } func setupPGXNonCanonical(t *testing.T) { - setupPGXIndexer(t) + setupPGXIndexer(t, postgres.DefaultConfig) test.SetupTestDataNonCanonical(t, ind) } @@ -97,6 +101,20 @@ func TestPGXIndexer(t *testing.T) { test.TestPublishAndIndexStorageIPLDs(t, db) }) + + t.Run("Publish and index with CopyFrom enabled.", func(t *testing.T) { + config := postgres.DefaultConfig + config.CopyFrom = true + + setupPGXWithConfig(t, config) + defer tearDown(t) + defer checkTxClosure(t, 1, 0, 1) + + test.TestPublishAndIndexStateIPLDs(t, db) + test.TestPublishAndIndexStorageIPLDs(t, db) + test.TestPublishAndIndexReceiptIPLDs(t, db) + test.TestPublishAndIndexLogIPLDs(t, db) + }) } // Test indexer for a canonical + a non-canonical block at London height + a non-canonical block at London height + 1 @@ -151,7 +169,7 @@ func TestPGXIndexerNonCanonical(t *testing.T) { } func TestPGXWatchAddressMethods(t *testing.T) { - setupPGXIndexer(t) + setupPGXIndexer(t, postgres.DefaultConfig) defer tearDown(t) defer checkTxClosure(t, 1, 0, 1) diff --git a/statediff/indexer/database/sql/postgres/config.go b/statediff/indexer/database/sql/postgres/config.go index 2038bf1f5..1cbec55c1 100644 --- a/statediff/indexer/database/sql/postgres/config.go +++ b/statediff/indexer/database/sql/postgres/config.go @@ -92,6 +92,9 @@ type Config struct { // toggle on/off upserts Upsert bool + + // toggle on/off CopyFrom + CopyFrom bool } // Type satisfies interfaces.Config diff --git a/statediff/indexer/database/sql/postgres/database.go b/statediff/indexer/database/sql/postgres/database.go index a508da83f..f682f5e78 100644 --- a/statediff/indexer/database/sql/postgres/database.go +++ b/statediff/indexer/database/sql/postgres/database.go @@ -84,3 +84,50 @@ func (db *DB) InsertIPLDStm() string { func (db *DB) InsertIPLDsStm() string { return `INSERT INTO ipld.blocks (block_number, key, data) VALUES (unnest($1::BIGINT[]), unnest($2::TEXT[]), unnest($3::BYTEA[])) ON CONFLICT DO NOTHING` } + +func (db *DB) AccountTableName() []string { + return []string{"eth", "state_accounts"} +} +func (db *DB) AccountColumnNames() []string { + return []string{"block_number", "header_id", "state_path", "balance", "nonce", "code_hash", "storage_root"} +} + +func (db *DB) LogTableName() []string { + return []string{"eth", "log_cids"} +} + +func (db *DB) LogColumnNames() []string { + return []string{"block_number", "header_id", "leaf_cid", "leaf_mh_key", "rct_id", "address", "index", "topic0", "topic1", "topic2", "topic3", "log_data"} +} + +func (db *DB) RctTableName() []string { + return []string{"eth", "receipt_cids"} +} + +func (db *DB) RctColumnNames() []string { + return []string{"block_number", "header_id", "tx_id", "leaf_cid", "contract", "contract_hash", "leaf_mh_key", "post_state", "post_status", "log_root"} +} + +func (db *DB) StateTableName() []string { + return []string{"eth", "state_cids"} +} + +func (db *DB) StateColumnNames() []string { + return []string{"block_number", "header_id", "state_leaf_key", "cid", "state_path", "node_type", "diff", "mh_key"} +} + +func (db *DB) StorageTableName() []string { + return []string{"eth", "storage_cids"} +} + +func (db *DB) StorageColumnNames() []string { + return []string{"block_number", "header_id", "state_path", "storage_leaf_key", "cid", "storage_path", "node_type", "diff", "mh_key"} +} + +func (db *DB) TxTableName() []string { + return []string{"eth", "transaction_cids"} +} + +func (db *DB) TxColumnNames() []string { + return []string{"block_number", "header_id", "tx_hash", "cid", "dst", "src", "index", "mh_key", "tx_data", "tx_type", "value"} +} diff --git a/statediff/indexer/database/sql/postgres/pgx.go b/statediff/indexer/database/sql/postgres/pgx.go index 038da1cfa..7825e34bb 100644 --- a/statediff/indexer/database/sql/postgres/pgx.go +++ b/statediff/indexer/database/sql/postgres/pgx.go @@ -38,6 +38,7 @@ type PGXDriver struct { pool *pgxpool.Pool nodeInfo node.Info nodeID string + config Config } // ConnectPGX initializes and returns a PGX connection pool @@ -56,7 +57,7 @@ func NewPGXDriver(ctx context.Context, config Config, node node.Info) (*PGXDrive if err != nil { return nil, ErrDBConnectionFailed(err) } - pg := &PGXDriver{ctx: ctx, pool: dbPool, nodeInfo: node} + pg := &PGXDriver{ctx: ctx, pool: dbPool, nodeInfo: node, config: config} nodeErr := pg.createNode() if nodeErr != nil { return &PGXDriver{}, ErrUnableToSetNode(nodeErr) @@ -166,6 +167,11 @@ func (pgx *PGXDriver) Context() context.Context { return pgx.ctx } +// HasCopy satisfies sql.Database +func (pgx *PGXDriver) UseCopyFrom() bool { + return pgx.config.CopyFrom +} + type resultWrapper struct { ct pgconn.CommandTag } @@ -244,3 +250,7 @@ func (t pgxTxWrapper) Commit(ctx context.Context) error { func (t pgxTxWrapper) Rollback(ctx context.Context) error { return t.tx.Rollback(ctx) } + +func (t pgxTxWrapper) CopyFrom(ctx context.Context, tableName []string, columnNames []string, rows [][]interface{}) (int64, error) { + return t.tx.CopyFrom(ctx, tableName, columnNames, pgx.CopyFromRows(rows)) +} diff --git a/statediff/indexer/database/sql/postgres/sqlx.go b/statediff/indexer/database/sql/postgres/sqlx.go index 1c828223f..452b4988a 100644 --- a/statediff/indexer/database/sql/postgres/sqlx.go +++ b/statediff/indexer/database/sql/postgres/sqlx.go @@ -19,6 +19,7 @@ package postgres import ( "context" coresql "database/sql" + "errors" "time" "github.com/jmoiron/sqlx" @@ -130,6 +131,12 @@ func (driver *SQLXDriver) Context() context.Context { return driver.ctx } +// HasCopy satisfies sql.Database +func (driver *SQLXDriver) UseCopyFrom() bool { + // sqlx does not currently support COPY. + return false +} + type sqlxStatsWrapper struct { stats coresql.DBStats } @@ -197,3 +204,7 @@ func (t sqlxTxWrapper) Commit(ctx context.Context) error { func (t sqlxTxWrapper) Rollback(ctx context.Context) error { return t.tx.Rollback() } + +func (t sqlxTxWrapper) CopyFrom(ctx context.Context, tableName []string, columnNames []string, rows [][]interface{}) (int64, error) { + return 0, errors.New("Unsupported Operation") +} diff --git a/statediff/indexer/database/sql/postgres/test_helpers.go b/statediff/indexer/database/sql/postgres/test_helpers.go index f8311b413..cb5255429 100644 --- a/statediff/indexer/database/sql/postgres/test_helpers.go +++ b/statediff/indexer/database/sql/postgres/test_helpers.go @@ -35,8 +35,8 @@ func SetupSQLXDB() (sql.Database, error) { } // SetupPGXDB is used to setup a pgx db for tests -func SetupPGXDB() (sql.Database, error) { - driver, err := NewPGXDriver(context.Background(), DefaultConfig, node.Info{}) +func SetupPGXDB(config Config) (sql.Database, error) { + driver, err := NewPGXDriver(context.Background(), config, node.Info{}) if err != nil { return nil, err } diff --git a/statediff/indexer/database/sql/writer.go b/statediff/indexer/database/sql/writer.go index 54794f51f..79e60f46c 100644 --- a/statediff/indexer/database/sql/writer.go +++ b/statediff/indexer/database/sql/writer.go @@ -18,6 +18,7 @@ package sql import ( "fmt" + "strconv" "github.com/lib/pq" @@ -95,18 +96,37 @@ INSERT INTO eth.transaction_cids (block_number, header_id, tx_hash, cid, dst, sr ON CONFLICT (tx_hash, header_id, block_number) DO NOTHING */ func (w *Writer) upsertTransactionCID(tx Tx, transaction models.TxModel) error { - _, err := tx.Exec(w.db.Context(), w.db.InsertTxStm(), - transaction.BlockNumber, - transaction.HeaderID, - transaction.TxHash, - transaction.CID, - transaction.Dst, - transaction.Src, - transaction.Index, - transaction.Type, - transaction.Value) - if err != nil { - return insertError{"eth.transaction_cids", err, w.db.InsertTxStm(), transaction} + if w.useCopyForTx(tx) { + blockNum, err := strconv.ParseInt(transaction.BlockNumber, 10, 64) + if err != nil { + return insertError{"eth.transaction_cids", err, "COPY", transaction} + } + + value, err := strconv.ParseFloat(transaction.Value, 64) + if err != nil { + return insertError{"eth.transaction_cids", err, "COPY", transaction} + } + + _, err = tx.CopyFrom(w.db.Context(), w.db.TxTableName(), w.db.TxColumnNames(), + toRows(toRow(blockNum, transaction.HeaderID, transaction.TxHash, transaction.CID, transaction.Dst, + transaction.Src, transaction.Index, int(transaction.Type), value))) + if err != nil { + return insertError{"eth.transaction_cids", err, "COPY", transaction} + } + } else { + _, err := tx.Exec(w.db.Context(), w.db.InsertTxStm(), + transaction.BlockNumber, + transaction.HeaderID, + transaction.TxHash, + transaction.CID, + transaction.Dst, + transaction.Src, + transaction.Index, + transaction.Type, + transaction.Value) + if err != nil { + return insertError{"eth.transaction_cids", err, w.db.InsertTxStm(), transaction} + } } metrics.IndexerMetrics.TransactionsCounter.Inc(1) return nil @@ -117,16 +137,30 @@ INSERT INTO eth.receipt_cids (block_number, header_id, tx_id, cid, contract, pos ON CONFLICT (tx_id, header_id, block_number) DO NOTHING */ func (w *Writer) upsertReceiptCID(tx Tx, rct *models.ReceiptModel) error { - _, err := tx.Exec(w.db.Context(), w.db.InsertRctStm(), - rct.BlockNumber, - rct.HeaderID, - rct.TxID, - rct.CID, - rct.Contract, - rct.PostState, - rct.PostStatus) - if err != nil { - return insertError{"eth.receipt_cids", err, w.db.InsertRctStm(), *rct} + if w.useCopyForTx(tx) { + blockNum, err := strconv.ParseInt(rct.BlockNumber, 10, 64) + if err != nil { + return insertError{"eth.receipt_cids", err, "COPY", rct} + } + + _, err = tx.CopyFrom(w.db.Context(), w.db.RctTableName(), w.db.RctColumnNames(), + toRows(toRow(blockNum, rct.HeaderID, rct.TxID, rct.CID, rct.Contract, + rct.PostState, int(rct.PostStatus)))) + if err != nil { + return insertError{"eth.receipt_cids", err, "COPY", rct} + } + } else { + _, err := tx.Exec(w.db.Context(), w.db.InsertRctStm(), + rct.BlockNumber, + rct.HeaderID, + rct.TxID, + rct.CID, + rct.Contract, + rct.PostState, + rct.PostStatus) + if err != nil { + return insertError{"eth.receipt_cids", err, w.db.InsertRctStm(), *rct} + } } metrics.IndexerMetrics.ReceiptsCounter.Inc(1) return nil @@ -137,22 +171,42 @@ INSERT INTO eth.log_cids (block_number, header_id, cid, rct_id, address, index, ON CONFLICT (rct_id, index, header_id, block_number) DO NOTHING */ func (w *Writer) upsertLogCID(tx Tx, logs []*models.LogsModel) error { - for _, log := range logs { - _, err := tx.Exec(w.db.Context(), w.db.InsertLogStm(), - log.BlockNumber, - log.HeaderID, - log.CID, - log.ReceiptID, - log.Address, - log.Index, - log.Topic0, - log.Topic1, - log.Topic2, - log.Topic3) - if err != nil { - return insertError{"eth.log_cids", err, w.db.InsertLogStm(), *log} + if w.useCopyForTx(tx) { + var rows [][]interface{} + for _, log := range logs { + blockNum, err := strconv.ParseInt(log.BlockNumber, 10, 64) + if err != nil { + return insertError{"eth.log_cids", err, "COPY", log} + } + + rows = append(rows, toRow(blockNum, log.HeaderID, log.CID, log.ReceiptID, + log.Address, log.Index, log.Topic0, log.Topic1, log.Topic2, log.Topic3)) + } + if nil != rows && len(rows) >= 0 { + _, err := tx.CopyFrom(w.db.Context(), w.db.LogTableName(), w.db.LogColumnNames(), rows) + if err != nil { + return insertError{"eth.log_cids", err, "COPY", rows} + } + metrics.IndexerMetrics.LogsCounter.Inc(int64(len(rows))) + } + } else { + for _, log := range logs { + _, err := tx.Exec(w.db.Context(), w.db.InsertLogStm(), + log.BlockNumber, + log.HeaderID, + log.CID, + log.ReceiptID, + log.Address, + log.Index, + log.Topic0, + log.Topic1, + log.Topic2, + log.Topic3) + if err != nil { + return insertError{"eth.log_cids", err, w.db.InsertLogStm(), *log} + } + metrics.IndexerMetrics.LogsCounter.Inc(1) } - metrics.IndexerMetrics.LogsCounter.Inc(1) } return nil } @@ -166,20 +220,34 @@ func (w *Writer) upsertStateCID(tx Tx, stateNode models.StateNodeModel) error { if stateNode.Removed { balance = "0" } - _, err := tx.Exec(w.db.Context(), w.db.InsertStateStm(), - stateNode.BlockNumber, - stateNode.HeaderID, - stateNode.StateKey, - stateNode.CID, - true, - balance, - stateNode.Nonce, - stateNode.CodeHash, - stateNode.StorageRoot, - stateNode.Removed, - ) - if err != nil { - return insertError{"eth.state_cids", err, w.db.InsertStateStm(), stateNode} + if w.useCopyForTx(tx) { + blockNum, err := strconv.ParseInt(stateNode.BlockNumber, 10, 64) + if err != nil { + return insertError{"eth.state_cids", err, "COPY", stateNode} + } + + _, err = tx.CopyFrom(w.db.Context(), w.db.StateTableName(), w.db.StateColumnNames(), + toRows(toRow(blockNum, stateNode.HeaderID, stateNode.StateKey, stateNode.CID, + true, balance, stateNode.Nonce, stateNode.CodeHash, stateNode.StorageRoot, stateNode.Removed))) + if err != nil { + return insertError{"eth.state_cids", err, "COPY", stateNode} + } + } else { + _, err := tx.Exec(w.db.Context(), w.db.InsertStateStm(), + stateNode.BlockNumber, + stateNode.HeaderID, + stateNode.StateKey, + stateNode.CID, + true, + balance, + stateNode.Nonce, + stateNode.CodeHash, + stateNode.StorageRoot, + stateNode.Removed, + ) + if err != nil { + return insertError{"eth.state_cids", err, w.db.InsertStateStm(), stateNode} + } } return nil } @@ -189,22 +257,57 @@ INSERT INTO eth.storage_cids (block_number, header_id, state_leaf_key, storage_l ON CONFLICT (header_id, state_leaf_key, storage_leaf_key, block_number) DO NOTHING */ func (w *Writer) upsertStorageCID(tx Tx, storageCID models.StorageNodeModel) error { - _, err := tx.Exec(w.db.Context(), w.db.InsertStorageStm(), - storageCID.BlockNumber, - storageCID.HeaderID, - storageCID.StateKey, - storageCID.StorageKey, - storageCID.CID, - true, - storageCID.Value, - storageCID.Removed, - ) - if err != nil { - return insertError{"eth.storage_cids", err, w.db.InsertStorageStm(), storageCID} + if w.useCopyForTx(tx) { + blockNum, err := strconv.ParseInt(storageCID.BlockNumber, 10, 64) + if err != nil { + return insertError{"eth.storage_cids", err, "COPY", storageCID} + } + + _, err = tx.CopyFrom(w.db.Context(), w.db.StorageTableName(), w.db.StorageColumnNames(), + toRows(toRow(blockNum, storageCID.HeaderID, storageCID.StateKey, storageCID.StorageKey, storageCID.CID, + true, storageCID.Value, storageCID.Removed))) + if err != nil { + return insertError{"eth.storage_cids", err, "COPY", storageCID} + } + } else { + _, err := tx.Exec(w.db.Context(), w.db.InsertStorageStm(), + storageCID.BlockNumber, + storageCID.HeaderID, + storageCID.StateKey, + storageCID.StorageKey, + storageCID.CID, + true, + storageCID.Value, + storageCID.Removed, + ) + if err != nil { + return insertError{"eth.storage_cids", err, w.db.InsertStorageStm(), storageCID} + } } return nil } +func (w *Writer) useCopyForTx(tx Tx) bool { + // Using COPY instead of INSERT only makes much sense if also using a DelayedTx, so that operations + // can be collected over time and then all submitted within in a single TX. + if _, ok := tx.(*DelayedTx); ok { + return w.db.UseCopyFrom() + } + return false +} + +// combine args into a row +func toRow(args ...interface{}) []interface{} { + var row []interface{} + row = append(row, args...) + return row +} + +// combine row (or rows) into a slice of rows for CopyFrom +func toRows(rows ...[]interface{}) [][]interface{} { + return rows +} + type insertError struct { table string err error -- 2.45.2 From 50ff8bf1ca9dee9444fd35017cc9fd117244fb8c Mon Sep 17 00:00:00 2001 From: Thomas E Lackey Date: Mon, 13 Mar 2023 12:49:29 -0500 Subject: [PATCH 08/13] Add timers/counters for LevelDB Get, Put, Has, and Delete. (#332) * Add timers/counters for LevelDB Get, Put, Has, and Delete. * Test for null metrics (the unit tests don't initialize them). --- ethdb/leveldb/leveldb.go | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/ethdb/leveldb/leveldb.go b/ethdb/leveldb/leveldb.go index 15bd4e6eb..50e1ec673 100644 --- a/ethdb/leveldb/leveldb.go +++ b/ethdb/leveldb/leveldb.go @@ -63,6 +63,11 @@ type Database struct { fn string // filename for reporting db *leveldb.DB // LevelDB instance + getTimer metrics.Timer // Timer/counter for measuring time and invocations of Get(). + putTimer metrics.Timer // Timer/counter for measuring time and invocations of Put(). + deleteTimer metrics.Timer // Timer/counter for measuring time and invocations of Delete(). + hasTimer metrics.Timer // Timer/counter for measuring time and invocations of Has(). + compTimeMeter metrics.Meter // Meter for measuring the total time spent in database compaction compReadMeter metrics.Meter // Meter for measuring the data read during compaction compWriteMeter metrics.Meter // Meter for measuring the data written during compaction @@ -144,6 +149,11 @@ func NewCustom(file string, namespace string, customize func(options *opt.Option ldb.nonlevel0CompGauge = metrics.NewRegisteredGauge(namespace+"compact/nonlevel0", nil) ldb.seekCompGauge = metrics.NewRegisteredGauge(namespace+"compact/seek", nil) + ldb.getTimer = metrics.NewRegisteredTimer(namespace+"db/get/time", nil) + ldb.putTimer = metrics.NewRegisteredTimer(namespace+"db/put/time", nil) + ldb.deleteTimer = metrics.NewRegisteredTimer(namespace+"db/delete/time", nil) + ldb.hasTimer = metrics.NewRegisteredTimer(namespace+"db/has/time", nil) + // Start up the metrics gathering and return go ldb.meter(metricsGatheringInterval) return ldb, nil @@ -182,11 +192,17 @@ func (db *Database) Close() error { // Has retrieves if a key is present in the key-value store. func (db *Database) Has(key []byte) (bool, error) { + if nil != db.hasTimer { + defer func(start time.Time) { db.hasTimer.UpdateSince(start) }(time.Now()) + } return db.db.Has(key, nil) } // Get retrieves the given key if it's present in the key-value store. func (db *Database) Get(key []byte) ([]byte, error) { + if nil != db.getTimer { + defer func(start time.Time) { db.getTimer.UpdateSince(start) }(time.Now()) + } dat, err := db.db.Get(key, nil) if err != nil { return nil, err @@ -196,11 +212,17 @@ func (db *Database) Get(key []byte) ([]byte, error) { // Put inserts the given value into the key-value store. func (db *Database) Put(key []byte, value []byte) error { + if nil != db.putTimer { + defer func(start time.Time) { db.putTimer.UpdateSince(start) }(time.Now()) + } return db.db.Put(key, value, nil) } // Delete removes the key from the key-value store. func (db *Database) Delete(key []byte) error { + if nil != db.deleteTimer { + defer func(start time.Time) { db.deleteTimer.UpdateSince(start) }(time.Now()) + } return db.db.Delete(key, nil) } -- 2.45.2 From 7b55d8c16a6d59fa0e126a20bd695581936d2750 Mon Sep 17 00:00:00 2001 From: Thomas E Lackey Date: Tue, 14 Mar 2023 22:32:47 -0500 Subject: [PATCH 09/13] Add timer and counter for batched write operations. (#337) * Add timer and counter for batched write operations. * Tweak comment --- ethdb/leveldb/leveldb.go | 38 +++++++++++++++++++++++++++----------- 1 file changed, 27 insertions(+), 11 deletions(-) diff --git a/ethdb/leveldb/leveldb.go b/ethdb/leveldb/leveldb.go index 50e1ec673..2ee1dd9d2 100644 --- a/ethdb/leveldb/leveldb.go +++ b/ethdb/leveldb/leveldb.go @@ -63,10 +63,12 @@ type Database struct { fn string // filename for reporting db *leveldb.DB // LevelDB instance - getTimer metrics.Timer // Timer/counter for measuring time and invocations of Get(). - putTimer metrics.Timer // Timer/counter for measuring time and invocations of Put(). - deleteTimer metrics.Timer // Timer/counter for measuring time and invocations of Delete(). - hasTimer metrics.Timer // Timer/counter for measuring time and invocations of Has(). + getTimer metrics.Timer // Timer/counter for measuring time and invocations of Get(). + putTimer metrics.Timer // Timer/counter for measuring time and invocations of Put(). + deleteTimer metrics.Timer // Timer/counter for measuring time and invocations of Delete(). + hasTimer metrics.Timer // Timer/counter for measuring time and invocations of Has(). + batchWriteTimer metrics.Timer // Timer/counter for measuring time and invocations of batch writes. + batchItemCounter metrics.Counter // Counter for measuring number of batched items written. compTimeMeter metrics.Meter // Meter for measuring the total time spent in database compaction compReadMeter metrics.Meter // Meter for measuring the data read during compaction @@ -153,6 +155,8 @@ func NewCustom(file string, namespace string, customize func(options *opt.Option ldb.putTimer = metrics.NewRegisteredTimer(namespace+"db/put/time", nil) ldb.deleteTimer = metrics.NewRegisteredTimer(namespace+"db/delete/time", nil) ldb.hasTimer = metrics.NewRegisteredTimer(namespace+"db/has/time", nil) + ldb.batchWriteTimer = metrics.NewRegisteredTimer(namespace+"db/batch_write/time", nil) + ldb.batchItemCounter = metrics.NewRegisteredCounter(namespace+"db/batch_item/count", nil) // Start up the metrics gathering and return go ldb.meter(metricsGatheringInterval) @@ -230,16 +234,20 @@ func (db *Database) Delete(key []byte) error { // database until a final write is called. func (db *Database) NewBatch() ethdb.Batch { return &batch{ - db: db.db, - b: new(leveldb.Batch), + db: db.db, + b: new(leveldb.Batch), + writeTimer: &db.batchWriteTimer, + itemCounter: &db.batchItemCounter, } } // NewBatchWithSize creates a write-only database batch with pre-allocated buffer. func (db *Database) NewBatchWithSize(size int) ethdb.Batch { return &batch{ - db: db.db, - b: leveldb.MakeBatch(size), + db: db.db, + b: leveldb.MakeBatch(size), + writeTimer: &db.batchWriteTimer, + itemCounter: &db.batchItemCounter, } } @@ -490,9 +498,11 @@ func (db *Database) meter(refresh time.Duration) { // batch is a write-only leveldb batch that commits changes to its host database // when Write is called. A batch cannot be used concurrently. type batch struct { - db *leveldb.DB - b *leveldb.Batch - size int + db *leveldb.DB + b *leveldb.Batch + size int + writeTimer *metrics.Timer + itemCounter *metrics.Counter } // Put inserts the given value into the batch for later committing. @@ -516,6 +526,12 @@ func (b *batch) ValueSize() int { // Write flushes any accumulated data to disk. func (b *batch) Write() error { + if nil != *b.writeTimer { + defer func(start time.Time) { (*b.writeTimer).UpdateSince(start) }(time.Now()) + } + if nil != *b.itemCounter { + (*b.itemCounter).Inc(int64(b.size)) + } return b.db.Write(b.b, nil) } -- 2.45.2 From 24861e4883e3b3570d862b217f9fffe803307160 Mon Sep 17 00:00:00 2001 From: i-norden Date: Thu, 23 Mar 2023 13:33:10 -0500 Subject: [PATCH 10/13] rename CodeOutputTimer to IPLDOutputTimer --- statediff/indexer/database/metrics/metrics.go | 6 +++--- statediff/service.go | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/statediff/indexer/database/metrics/metrics.go b/statediff/indexer/database/metrics/metrics.go index 550dd238e..34cb3a69b 100644 --- a/statediff/indexer/database/metrics/metrics.go +++ b/statediff/indexer/database/metrics/metrics.go @@ -83,7 +83,7 @@ type IndexerMetricsHandles struct { SortKeysTimer metrics.Timer FindIntersectionTimer metrics.Timer OutputTimer metrics.Timer - CodeOutputTimer metrics.Timer + IPLDOutputTimer metrics.Timer DifferenceIteratorNextTimer metrics.Timer DifferenceIteratorCounter metrics.Counter DeletedOrUpdatedStorageTimer metrics.Timer @@ -124,7 +124,7 @@ func RegisterIndexerMetrics(reg metrics.Registry) IndexerMetricsHandles { SortKeysTimer: metrics.NewTimer(), FindIntersectionTimer: metrics.NewTimer(), OutputTimer: metrics.NewTimer(), - CodeOutputTimer: metrics.NewTimer(), + IPLDOutputTimer: metrics.NewTimer(), DifferenceIteratorNextTimer: metrics.NewTimer(), DifferenceIteratorCounter: metrics.NewCounter(), DeletedOrUpdatedStorageTimer: metrics.NewTimer(), @@ -163,7 +163,7 @@ func RegisterIndexerMetrics(reg metrics.Registry) IndexerMetricsHandles { reg.Register(metricName(subsys, "t_sort_keys"), ctx.SortKeysTimer) reg.Register(metricName(subsys, "t_find_intersection"), ctx.FindIntersectionTimer) reg.Register(metricName(subsys, "t_output_fn"), ctx.OutputTimer) - reg.Register(metricName(subsys, "t_code_output_fn"), ctx.CodeOutputTimer) + reg.Register(metricName(subsys, "t_ipld_output_fn"), ctx.IPLDOutputTimer) reg.Register(metricName(subsys, "t_difference_iterator_next"), ctx.DifferenceIteratorNextTimer) reg.Register(metricName(subsys, "difference_iterator_counter"), ctx.DifferenceIteratorCounter) reg.Register(metricName(subsys, "t_created_and_updated_storage"), ctx.CreatedAndUpdatedStorageTimer) diff --git a/statediff/service.go b/statediff/service.go index a6db952e4..7325971c1 100644 --- a/statediff/service.go +++ b/statediff/service.go @@ -802,7 +802,7 @@ func (sds *Service) writeStateDiff(block *types.Block, parentRoot common.Hash, p return sds.indexer.PushStateNode(tx, node, block.Hash().String()) } ipldOutput := func(c types2.IPLD) error { - defer metrics.ReportAndUpdateDuration("statediff ipldOutput", time.Now(), logger, metrics.IndexerMetrics.CodeOutputTimer) + defer metrics.ReportAndUpdateDuration("statediff ipldOutput", time.Now(), logger, metrics.IndexerMetrics.IPLDOutputTimer) return sds.indexer.PushIPLD(tx, c) } -- 2.45.2 From 68835c0ef242c4bd81f81ab33f2d4360a22a9ba4 Mon Sep 17 00:00:00 2001 From: i-norden Date: Thu, 23 Mar 2023 13:39:18 -0500 Subject: [PATCH 11/13] update sql.Statements methods to match v5 schema --- .../indexer/database/sql/postgres/database.go | 17 +++++------------ 1 file changed, 5 insertions(+), 12 deletions(-) diff --git a/statediff/indexer/database/sql/postgres/database.go b/statediff/indexer/database/sql/postgres/database.go index f682f5e78..b371a83b2 100644 --- a/statediff/indexer/database/sql/postgres/database.go +++ b/statediff/indexer/database/sql/postgres/database.go @@ -85,19 +85,12 @@ func (db *DB) InsertIPLDsStm() string { return `INSERT INTO ipld.blocks (block_number, key, data) VALUES (unnest($1::BIGINT[]), unnest($2::TEXT[]), unnest($3::BYTEA[])) ON CONFLICT DO NOTHING` } -func (db *DB) AccountTableName() []string { - return []string{"eth", "state_accounts"} -} -func (db *DB) AccountColumnNames() []string { - return []string{"block_number", "header_id", "state_path", "balance", "nonce", "code_hash", "storage_root"} -} - func (db *DB) LogTableName() []string { return []string{"eth", "log_cids"} } func (db *DB) LogColumnNames() []string { - return []string{"block_number", "header_id", "leaf_cid", "leaf_mh_key", "rct_id", "address", "index", "topic0", "topic1", "topic2", "topic3", "log_data"} + return []string{"block_number", "header_id", "cid", "rct_id", "address", "index", "topic0", "topic1", "topic2", "topic3"} } func (db *DB) RctTableName() []string { @@ -105,7 +98,7 @@ func (db *DB) RctTableName() []string { } func (db *DB) RctColumnNames() []string { - return []string{"block_number", "header_id", "tx_id", "leaf_cid", "contract", "contract_hash", "leaf_mh_key", "post_state", "post_status", "log_root"} + return []string{"block_number", "header_id", "tx_id", "cid", "contract", "post_state", "post_status"} } func (db *DB) StateTableName() []string { @@ -113,7 +106,7 @@ func (db *DB) StateTableName() []string { } func (db *DB) StateColumnNames() []string { - return []string{"block_number", "header_id", "state_leaf_key", "cid", "state_path", "node_type", "diff", "mh_key"} + return []string{"block_number", "header_id", "state_leaf_key", "cid", "diff", "balance", "nonce", "code_hash", "storage_root", "removed"} } func (db *DB) StorageTableName() []string { @@ -121,7 +114,7 @@ func (db *DB) StorageTableName() []string { } func (db *DB) StorageColumnNames() []string { - return []string{"block_number", "header_id", "state_path", "storage_leaf_key", "cid", "storage_path", "node_type", "diff", "mh_key"} + return []string{"block_number", "header_id", "state_leaf_key", "storage_leaf_key", "cid", "diff", "val", "removed"} } func (db *DB) TxTableName() []string { @@ -129,5 +122,5 @@ func (db *DB) TxTableName() []string { } func (db *DB) TxColumnNames() []string { - return []string{"block_number", "header_id", "tx_hash", "cid", "dst", "src", "index", "mh_key", "tx_data", "tx_type", "value"} + return []string{"block_number", "header_id", "tx_hash", "cid", "dst", "src", "index", "tx_type", "value"} } -- 2.45.2 From eb44021e9f25c13da37ba01cba1cf751beae4c96 Mon Sep 17 00:00:00 2001 From: i-norden Date: Thu, 23 Mar 2023 14:27:03 -0500 Subject: [PATCH 12/13] update go.mod and un-comment-out service test --- go.mod | 4 ++-- go.sum | 8 +++----- statediff/service_test.go | 2 -- 3 files changed, 5 insertions(+), 9 deletions(-) diff --git a/go.mod b/go.mod index 0be358207..97dea73a8 100644 --- a/go.mod +++ b/go.mod @@ -63,7 +63,7 @@ require ( github.com/rs/cors v1.7.0 github.com/shirou/gopsutil v3.21.11+incompatible github.com/status-im/keycard-go v0.0.0-20190316090335-8537d3370df4 - github.com/stretchr/testify v1.8.0 + github.com/stretchr/testify v1.7.0 github.com/supranational/blst v0.3.8 github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7 github.com/thoas/go-funk v0.9.2 @@ -120,7 +120,7 @@ require ( github.com/pmezard/go-difflib v1.0.0 // indirect github.com/russross/blackfriday/v2 v2.1.0 // indirect github.com/spaolacci/murmur3 v1.1.0 // indirect - github.com/stretchr/objx v0.4.0 // indirect + github.com/stretchr/objx v0.3.0 // indirect github.com/tklauser/numcpus v0.2.2 // indirect github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 // indirect github.com/yusufpapurcu/wmi v1.2.2 // indirect diff --git a/go.sum b/go.sum index a1a93a6fb..85f989d51 100644 --- a/go.sum +++ b/go.sum @@ -537,17 +537,15 @@ github.com/status-im/keycard-go v0.0.0-20190316090335-8537d3370df4/go.mod h1:RZL github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.2.0/go.mod h1:qt09Ya8vawLte6SNmTgCsAVtYtaKzEcn8ATUoHMkEqE= -github.com/stretchr/objx v0.4.0 h1:M2gUjqZET1qApGOWNSnZ49BAIMX4F/1plDv3+l31EJ4= -github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= +github.com/stretchr/objx v0.3.0 h1:NGXK3lHquSN08v5vWalVI/L8XU9hdzE/G6xsrze47As= +github.com/stretchr/objx v0.3.0/go.mod h1:qt09Ya8vawLte6SNmTgCsAVtYtaKzEcn8ATUoHMkEqE= github.com/stretchr/testify v1.2.0/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= +github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk= -github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/supranational/blst v0.3.8 h1:glwLF4oBRSJOTr05lRBgNwGQST0ndP2wg29fSeTRKCY= github.com/supranational/blst v0.3.8/go.mod h1:jZJtfjgudtNl4en1tzwPIV3KjUnQUvG3/j+w+fVonLw= github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7 h1:epCh84lMvA70Z7CTTCmYQn2CKbY8j86K7/FAIr141uY= diff --git a/statediff/service_test.go b/statediff/service_test.go index e921d0893..1df068608 100644 --- a/statediff/service_test.go +++ b/statediff/service_test.go @@ -16,7 +16,6 @@ package statediff_test -/* import ( "bytes" "math/big" @@ -438,4 +437,3 @@ func testGetSyncStatus(t *testing.T) { } } } -*/ -- 2.45.2 From f8f446e0a9d94d73de28536f2538488bca5864ce Mon Sep 17 00:00:00 2001 From: i-norden Date: Thu, 23 Mar 2023 14:49:47 -0500 Subject: [PATCH 13/13] fix balance formatting when COPY FROMing --- statediff/indexer/database/sql/writer.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/statediff/indexer/database/sql/writer.go b/statediff/indexer/database/sql/writer.go index 79e60f46c..faa63cd21 100644 --- a/statediff/indexer/database/sql/writer.go +++ b/statediff/indexer/database/sql/writer.go @@ -220,15 +220,20 @@ func (w *Writer) upsertStateCID(tx Tx, stateNode models.StateNodeModel) error { if stateNode.Removed { balance = "0" } + if w.useCopyForTx(tx) { blockNum, err := strconv.ParseInt(stateNode.BlockNumber, 10, 64) if err != nil { return insertError{"eth.state_cids", err, "COPY", stateNode} } + balInt, err := strconv.ParseInt(balance, 10, 64) + if err != nil { + return insertError{"eth.state_cids", err, "COPY", stateNode} + } _, err = tx.CopyFrom(w.db.Context(), w.db.StateTableName(), w.db.StateColumnNames(), toRows(toRow(blockNum, stateNode.HeaderID, stateNode.StateKey, stateNode.CID, - true, balance, stateNode.Nonce, stateNode.CodeHash, stateNode.StorageRoot, stateNode.Removed))) + true, balInt, stateNode.Nonce, stateNode.CodeHash, stateNode.StorageRoot, stateNode.Removed))) if err != nil { return insertError{"eth.state_cids", err, "COPY", stateNode} } -- 2.45.2