diff --git a/cmd/geth/config.go b/cmd/geth/config.go index bb4d2f2e2..eab7f699c 100644 --- a/cmd/geth/config.go +++ b/cmd/geth/config.go @@ -191,17 +191,8 @@ func makeFullNode(ctx *cli.Context) (*node.Node, ethapi.Backend) { } else { utils.Fatalf("Must specify client name for statediff DB output") } - } else { - if ctx.GlobalBool(utils.StateDiffWritingFlag.Name) { - utils.Fatalf("Must pass DB parameters if enabling statediff write loop") - } } - params := statediff.ServiceParams{ - DBParams: dbParams, - EnableWriteLoop: ctx.GlobalBool(utils.StateDiffWritingFlag.Name), - NumWorkers: ctx.GlobalUint(utils.StateDiffWorkersFlag.Name), - } - utils.RegisterStateDiffService(stack, backend, params) + utils.RegisterStateDiffService(stack, backend, dbParams, ctx.GlobalBool(utils.StateDiffWritingFlag.Name)) } // Configure GraphQL if requested diff --git a/cmd/geth/main.go b/cmd/geth/main.go index a30720897..0e657f636 100644 --- a/cmd/geth/main.go +++ b/cmd/geth/main.go @@ -162,7 +162,6 @@ var ( utils.StateDiffDBNodeIDFlag, utils.StateDiffDBClientNameFlag, utils.StateDiffWritingFlag, - utils.StateDiffWorkersFlag, configFileFlag, } diff --git a/cmd/geth/usage.go b/cmd/geth/usage.go index 3acd40145..e3b52daa1 100644 --- a/cmd/geth/usage.go +++ b/cmd/geth/usage.go @@ -242,7 +242,6 @@ var AppHelpFlagGroups = []flags.FlagGroup{ utils.StateDiffDBNodeIDFlag, utils.StateDiffDBClientNameFlag, utils.StateDiffWritingFlag, - utils.StateDiffWorkersFlag, }, }, { diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index 57481200a..6863364db 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -746,10 +746,6 @@ var ( Name: "statediff.writing", Usage: "Activates progressive writing of state diffs to database as new block are synced", } - StateDiffWorkersFlag = cli.UintFlag{ - Name: "statediff.workers", - Usage: "Number of concurrent workers to use during statediff processing (0 = 1)", - } ) // MakeDataDir retrieves the currently requested data directory, terminating @@ -1748,8 +1744,9 @@ func RegisterGraphQLService(stack *node.Node, backend ethapi.Backend, cfg node.C } // RegisterStateDiffService configures and registers a service to stream state diff data over RPC -func RegisterStateDiffService(stack *node.Node, ethServ *eth.Ethereum, params statediff.ServiceParams) { - if err := statediff.New(stack, ethServ, params); err != nil { +// dbParams are: Postgres connection URI, Node ID, client name +func RegisterStateDiffService(stack *node.Node, ethServ *eth.Ethereum, dbParams *statediff.DBParams, startWriteLoop bool) { + if err := statediff.New(stack, ethServ, dbParams, startWriteLoop); err != nil { Fatalf("Failed to register the Statediff service: %v", err) } } diff --git a/statediff/builder.go b/statediff/builder.go index c9757b794..ab8811a81 100644 --- a/statediff/builder.go +++ b/statediff/builder.go @@ -22,6 +22,7 @@ package statediff import ( "bytes" "fmt" + "github.com/ethereum/go-ethereum/core/rawdb" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/state" @@ -44,7 +45,7 @@ var ( type Builder interface { BuildStateDiffObject(args Args, params Params) (StateObject, error) BuildStateTrieObject(current *types.Block) (StateObject, error) - WriteStateDiffObject(args StateRoots, params Params, output StateNodeSink, codeOutput CodeSink) error + WriteStateDiffObject(args StateRoots, params Params, output StateNodeSink, codeOutput CodeSink, addressOutput AddressSink) error } type builder struct { @@ -92,6 +93,12 @@ func codeMappingAppender(codeAndCodeHashes *[]CodeAndCodeHash) CodeSink { return nil } } +func addressMappingAppender(addressAndCodeHashes *[]AddressAndAddressHash) AddressSink { + return func(a AddressAndAddressHash) error { + *addressAndCodeHashes = append(*addressAndCodeHashes, a) + return nil + } +} // NewBuilder is used to create a statediff builder func NewBuilder(stateCache state.Database) Builder { @@ -107,21 +114,23 @@ func (sdb *builder) BuildStateTrieObject(current *types.Block) (StateObject, err return StateObject{}, fmt.Errorf("error creating trie for block %d: %v", current.Number(), err) } it := currentTrie.NodeIterator([]byte{}) - stateNodes, codeAndCodeHashes, err := sdb.buildStateTrie(it) + stateNodes, codeAndCodeHashes, addressAndAddressHashes, err := sdb.buildStateTrie(it) if err != nil { return StateObject{}, fmt.Errorf("error collecting state nodes for block %d: %v", current.Number(), err) } return StateObject{ - BlockNumber: current.Number(), - BlockHash: current.Hash(), - Nodes: stateNodes, - CodeAndCodeHashes: codeAndCodeHashes, + BlockNumber: current.Number(), + BlockHash: current.Hash(), + Nodes: stateNodes, + CodeAndCodeHashes: codeAndCodeHashes, + AddressAndAddressHashes: addressAndAddressHashes, }, nil } -func (sdb *builder) buildStateTrie(it trie.NodeIterator) ([]StateNode, []CodeAndCodeHash, error) { +func (sdb *builder) buildStateTrie(it trie.NodeIterator) ([]StateNode, []CodeAndCodeHash, []AddressAndAddressHash, error) { stateNodes := make([]StateNode, 0) codeAndCodeHashes := make([]CodeAndCodeHash, 0) + addressAndAddressHashes := make([]AddressAndAddressHash, 0) for it.Next(true) { // skip value nodes if it.Leaf() || bytes.Equal(nullHashBytes, it.Hash().Bytes()) { @@ -129,31 +138,32 @@ func (sdb *builder) buildStateTrie(it trie.NodeIterator) ([]StateNode, []CodeAnd } node, nodeElements, err := resolveNode(it, sdb.stateCache.TrieDB()) if err != nil { - return nil, nil, err + return nil, nil, nil, err } switch node.NodeType { case Leaf: var account state.Account if err := rlp.DecodeBytes(nodeElements[1].([]byte), &account); err != nil { - return nil, nil, fmt.Errorf("error decoding account for leaf node at path %x nerror: %v", node.Path, err) + return nil, nil, nil, fmt.Errorf("error decoding account for leaf node at path %x nerror: %v", node.Path, err) } partialPath := trie.CompactToHex(nodeElements[0].([]byte)) valueNodePath := append(node.Path, partialPath...) encodedPath := trie.HexToCompact(valueNodePath) leafKey := encodedPath[1:] node.LeafKey = leafKey + // derive code by code hash if !bytes.Equal(account.CodeHash, nullCodeHash) { var storageNodes []StorageNode err := sdb.buildStorageNodesEventual(account.Root, nil, true, storageNodeAppender(&storageNodes)) if err != nil { - return nil, nil, fmt.Errorf("failed building eventual storage diffs for account %+v\r\nerror: %v", account, err) + return nil, nil, nil, fmt.Errorf("failed building eventual storage diffs for account %+v\r\nerror: %v", account, err) } node.StorageNodes = storageNodes // emit codehash => code mappings for cod codeHash := common.BytesToHash(account.CodeHash) code, err := sdb.stateCache.ContractCode(common.Hash{}, codeHash) if err != nil { - return nil, nil, fmt.Errorf("failed to retrieve code for codehash %s\r\n error: %v", codeHash.String(), err) + return nil, nil, nil, fmt.Errorf("failed to retrieve code for codehash %s\r\n error: %v", codeHash.String(), err) } codeAndCodeHashes = append(codeAndCodeHashes, CodeAndCodeHash{ Hash: codeHash, @@ -164,19 +174,20 @@ func (sdb *builder) buildStateTrie(it trie.NodeIterator) ([]StateNode, []CodeAnd case Extension, Branch: stateNodes = append(stateNodes, node) default: - return nil, nil, fmt.Errorf("unexpected node type %s", node.NodeType) + return nil, nil, nil, fmt.Errorf("unexpected node type %s", node.NodeType) } } - return stateNodes, codeAndCodeHashes, it.Error() + return stateNodes, codeAndCodeHashes, addressAndAddressHashes, it.Error() } // BuildStateDiffObject builds a statediff object from two blocks and the provided parameters func (sdb *builder) BuildStateDiffObject(args Args, params Params) (StateObject, error) { var stateNodes []StateNode var codeAndCodeHashes []CodeAndCodeHash + var addressAndCodeHashes []AddressAndAddressHash err := sdb.WriteStateDiffObject( StateRoots{OldStateRoot: args.OldStateRoot, NewStateRoot: args.NewStateRoot}, - params, stateNodeAppender(&stateNodes), codeMappingAppender(&codeAndCodeHashes)) + params, stateNodeAppender(&stateNodes), codeMappingAppender(&codeAndCodeHashes), addressMappingAppender(&addressAndCodeHashes)) if err != nil { return StateObject{}, err } @@ -189,16 +200,16 @@ func (sdb *builder) BuildStateDiffObject(args Args, params Params) (StateObject, } // Writes a statediff object to output callback -func (sdb *builder) WriteStateDiffObject(args StateRoots, params Params, output StateNodeSink, codeOutput CodeSink) error { +func (sdb *builder) WriteStateDiffObject(args StateRoots, params Params, output StateNodeSink, codeOutput CodeSink, addressOutput AddressSink) error { if !params.IntermediateStateNodes || len(params.WatchedAddresses) > 0 { // if we are watching only specific accounts then we are only diffing leaf nodes - return sdb.buildStateDiffWithoutIntermediateStateNodes(args, params, output, codeOutput) + return sdb.buildStateDiffWithoutIntermediateStateNodes(args, params, output, codeOutput, addressOutput) } else { - return sdb.buildStateDiffWithIntermediateStateNodes(args, params, output, codeOutput) + return sdb.buildStateDiffWithIntermediateStateNodes(args, params, output, codeOutput, addressOutput) } } -func (sdb *builder) buildStateDiffWithIntermediateStateNodes(args StateRoots, params Params, output StateNodeSink, codeOutput CodeSink) error { +func (sdb *builder) buildStateDiffWithIntermediateStateNodes(args StateRoots, params Params, output StateNodeSink, codeOutput CodeSink, addressOutput AddressSink) error { // Load tries for old and new states oldTrie, err := sdb.stateCache.OpenTrie(args.OldStateRoot) if err != nil { @@ -246,14 +257,14 @@ func (sdb *builder) buildStateDiffWithIntermediateStateNodes(args StateRoots, pa return fmt.Errorf("error building diff for updated accounts: %v", err) } // build the diff nodes for created accounts - err = sdb.buildAccountCreations(diffAccountsAtB, params.WatchedStorageSlots, params.IntermediateStorageNodes, output, codeOutput) + err = sdb.buildAccountCreations(diffAccountsAtB, params.WatchedStorageSlots, params.IntermediateStorageNodes, output, codeOutput, addressOutput) if err != nil { return fmt.Errorf("error building diff for created accounts: %v", err) } return nil } -func (sdb *builder) buildStateDiffWithoutIntermediateStateNodes(args StateRoots, params Params, output StateNodeSink, codeOutput CodeSink) error { +func (sdb *builder) buildStateDiffWithoutIntermediateStateNodes(args StateRoots, params Params, output StateNodeSink, codeOutput CodeSink, addressOutput AddressSink) error { // Load tries for old (A) and new (B) states oldTrie, err := sdb.stateCache.OpenTrie(args.OldStateRoot) if err != nil { @@ -300,7 +311,7 @@ func (sdb *builder) buildStateDiffWithoutIntermediateStateNodes(args StateRoots, return fmt.Errorf("error building diff for updated accounts: %v", err) } // build the diff nodes for created accounts - err = sdb.buildAccountCreations(diffAccountsAtB, params.WatchedStorageSlots, params.IntermediateStorageNodes, output, codeOutput) + err = sdb.buildAccountCreations(diffAccountsAtB, params.WatchedStorageSlots, params.IntermediateStorageNodes, output, codeOutput, addressOutput) if err != nil { return fmt.Errorf("error building diff for created accounts: %v", err) } @@ -497,7 +508,7 @@ func (sdb *builder) buildAccountUpdates(creations, deletions AccountMap, updated // buildAccountCreations returns the statediff node objects for all the accounts that exist at B but not at A // it also returns the code and codehash for created contract accounts -func (sdb *builder) buildAccountCreations(accounts AccountMap, watchedStorageKeys []common.Hash, intermediateStorageNodes bool, output StateNodeSink, codeOutput CodeSink) error { +func (sdb *builder) buildAccountCreations(accounts AccountMap, watchedStorageKeys []common.Hash, intermediateStorageNodes bool, output StateNodeSink, codeOutput CodeSink, addressOutput AddressSink) error { for _, val := range accounts { diff := StateNode{ NodeType: val.NodeType, @@ -526,6 +537,19 @@ func (sdb *builder) buildAccountCreations(accounts AccountMap, watchedStorageKey return err } } + // get address by hash + addressHash := common.BytesToHash(val.LeafKey) + address := rawdb.ReadPreimage(sdb.stateCache.TrieDB().DiskDB(), addressHash) + + log.Info("address hash %s, address %s", addressHash.String(), address) + + if err := addressOutput(AddressAndAddressHash{ + Hash: addressHash, + Address: common.BytesToAddress(address), + }); err != nil { + return err + } + if err := output(diff); err != nil { return err } diff --git a/statediff/indexer/indexer.go b/statediff/indexer/indexer.go index 2a6bac3f2..3515bc1fb 100644 --- a/statediff/indexer/indexer.go +++ b/statediff/indexer/indexer.go @@ -14,8 +14,6 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . -// This package provides an interface for pushing and indexing IPLD objects into a Postgres database -// Metrics for reporting processing and connection stats are defined in ./metrics.go package indexer import ( @@ -28,7 +26,6 @@ import ( "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/log" - "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/rlp" @@ -39,21 +36,17 @@ import ( "github.com/ethereum/go-ethereum/statediff/indexer/ipfs/ipld" "github.com/ethereum/go-ethereum/statediff/indexer/models" "github.com/ethereum/go-ethereum/statediff/indexer/postgres" + "github.com/ethereum/go-ethereum/statediff/indexer/prom" "github.com/ethereum/go-ethereum/statediff/indexer/shared" sdtypes "github.com/ethereum/go-ethereum/statediff/types" ) -var ( - indexerMetrics = RegisterIndexerMetrics(metrics.DefaultRegistry) - dbMetrics = RegisterDBMetrics(metrics.DefaultRegistry) -) - // Indexer interface to allow substitution of mocks for testing type Indexer interface { PushBlock(block *types.Block, receipts types.Receipts, totalDifficulty *big.Int) (*BlockTx, error) PushStateNode(tx *BlockTx, stateNode sdtypes.StateNode) error PushCodeAndCodeHash(tx *BlockTx, codeAndCodeHash sdtypes.CodeAndCodeHash) error - ReportDBMetrics(delay time.Duration, quit <-chan bool) + PushAddressAndAddressHash(tx *BlockTx, addressAndCodeHash sdtypes.AddressAndAddressHash) error } // StateDiffIndexer satisfies the Indexer interface for ethereum statediff objects @@ -78,25 +71,6 @@ type BlockTx struct { Close func() error } -// Reporting function to run as goroutine -func (sdi *StateDiffIndexer) ReportDBMetrics(delay time.Duration, quit <-chan bool) { - if !metrics.Enabled { - return - } - ticker := time.NewTicker(delay) - go func() { - for { - select { - case <-ticker.C: - dbMetrics.Update(sdi.dbWriter.db.Stats()) - case <-quit: - ticker.Stop() - return - } - } - }() -} - // Pushes and indexes block data in database, except state & storage nodes (includes header, uncles, transactions & receipts) // Returns an initiated DB transaction which must be Closed via defer to commit or rollback func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receipts, totalDifficulty *big.Int) (*BlockTx, error) { @@ -136,12 +110,12 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip panic(p) } else { tDiff := time.Now().Sub(t) - indexerMetrics.tStateStoreCodeProcessing.Update(tDiff) + prom.SetTimeMetric("t_state_store_code_processing", tDiff) traceMsg += fmt.Sprintf("state, storage, and code storage processing time: %s\r\n", tDiff.String()) t = time.Now() err = tx.Commit() tDiff = time.Now().Sub(t) - indexerMetrics.tPostgresCommit.Update(tDiff) + prom.SetTimeMetric("t_postgres_commit", tDiff) traceMsg += fmt.Sprintf("postgres transaction commit duration: %s\r\n", tDiff.String()) } traceMsg += fmt.Sprintf(" TOTAL PROCESSING DURATION: %s\r\n", time.Now().Sub(start).String()) @@ -150,8 +124,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip }, } tDiff := time.Now().Sub(t) - indexerMetrics.tFreePostgres.Update(tDiff) - + prom.SetTimeMetric("t_free_postgres", tDiff) traceMsg += fmt.Sprintf("time spent waiting for free postgres tx: %s:\r\n", tDiff.String()) t = time.Now() @@ -161,7 +134,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip return nil, err } tDiff = time.Now().Sub(t) - indexerMetrics.tHeaderProcessing.Update(tDiff) + prom.SetTimeMetric("t_header_processing", tDiff) traceMsg += fmt.Sprintf("header processing time: %s\r\n", tDiff.String()) t = time.Now() // Publish and index uncles @@ -169,7 +142,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip return nil, err } tDiff = time.Now().Sub(t) - indexerMetrics.tUncleProcessing.Update(tDiff) + prom.SetTimeMetric("t_uncle_processing", tDiff) traceMsg += fmt.Sprintf("uncle processing time: %s\r\n", tDiff.String()) t = time.Now() // Publish and index receipts and txs @@ -186,7 +159,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip return nil, err } tDiff = time.Now().Sub(t) - indexerMetrics.tTxAndRecProcessing.Update(tDiff) + prom.SetTimeMetric("t_tx_receipt_processing", tDiff) traceMsg += fmt.Sprintf("tx and receipt processing time: %s\r\n", tDiff.String()) t = time.Now() @@ -421,3 +394,16 @@ func (sdi *StateDiffIndexer) PushCodeAndCodeHash(tx *BlockTx, codeAndCodeHash sd } return nil } + +// Publishes address and address hash pairs to the ipld database +func (sdi *StateDiffIndexer) PushAddressAndAddressHash(tx *BlockTx, addressAndCodeHash sdtypes.AddressAndAddressHash) error { + // codec doesn't matter since db key is multihash-based + mhKey, err := shared.MultihashKeyFromKeccak256(addressAndCodeHash.Hash) + if err != nil { + return err + } + if err := shared.PublishDirect(tx.dbtx, mhKey, addressAndCodeHash.Address.Bytes()); err != nil { + return err + } + return nil +} diff --git a/statediff/indexer/metrics.go b/statediff/indexer/metrics.go deleted file mode 100644 index fc0727eda..000000000 --- a/statediff/indexer/metrics.go +++ /dev/null @@ -1,124 +0,0 @@ -package indexer - -import ( - "database/sql" - "strings" - - "github.com/ethereum/go-ethereum/metrics" -) - -const ( - namespace = "statediff" -) - -// Build a fully qualified metric name -func metricName(subsystem, name string) string { - if name == "" { - return "" - } - parts := []string{namespace, name} - if subsystem != "" { - parts = []string{namespace, subsystem, name} - } - // Prometheus uses _ but geth metrics uses / and replaces - return strings.Join(parts, "/") -} - -type indexerMetricsHandles struct { - // The total number of processed blocks - blocks metrics.Counter - // The total number of processed transactions - transactions metrics.Counter - // The total number of processed receipts - receipts metrics.Counter - // Time spent waiting for free postgres tx - tFreePostgres metrics.Timer - // Postgres transaction commit duration - tPostgresCommit metrics.Timer - // Header processing time - tHeaderProcessing metrics.Timer - // Uncle processing time - tUncleProcessing metrics.Timer - // Tx and receipt processing time - tTxAndRecProcessing metrics.Timer - // State, storage, and code combined processing time - tStateStoreCodeProcessing metrics.Timer -} - -func RegisterIndexerMetrics(reg metrics.Registry) indexerMetricsHandles { - ctx := indexerMetricsHandles{ - blocks: metrics.NewCounter(), - transactions: metrics.NewCounter(), - receipts: metrics.NewCounter(), - tFreePostgres: metrics.NewTimer(), - tPostgresCommit: metrics.NewTimer(), - tHeaderProcessing: metrics.NewTimer(), - tUncleProcessing: metrics.NewTimer(), - tTxAndRecProcessing: metrics.NewTimer(), - tStateStoreCodeProcessing: metrics.NewTimer(), - } - subsys := "indexer" - reg.Register(metricName(subsys, "blocks"), ctx.blocks) - reg.Register(metricName(subsys, "transactions"), ctx.transactions) - reg.Register(metricName(subsys, "receipts"), ctx.receipts) - reg.Register(metricName(subsys, "t_free_postgres"), ctx.tFreePostgres) - reg.Register(metricName(subsys, "t_postgres_commit"), ctx.tPostgresCommit) - reg.Register(metricName(subsys, "t_header_processing"), ctx.tHeaderProcessing) - reg.Register(metricName(subsys, "t_uncle_processing"), ctx.tUncleProcessing) - reg.Register(metricName(subsys, "t_tx_receipt_processing"), ctx.tTxAndRecProcessing) - reg.Register(metricName(subsys, "t_state_store_code_processing"), ctx.tStateStoreCodeProcessing) - return ctx -} - -type dbMetricsHandles struct { - // Maximum number of open connections to the database - maxOpen metrics.Gauge - // The number of established connections both in use and idle - open metrics.Gauge - // The number of connections currently in use - inUse metrics.Gauge - // The number of idle connections - idle metrics.Gauge - // The total number of connections waited for - waitedFor metrics.Counter - // The total time blocked waiting for a new connection - blockedMilliseconds metrics.Counter - // The total number of connections closed due to SetMaxIdleConns - closedMaxIdle metrics.Counter - // The total number of connections closed due to SetConnMaxLifetime - closedMaxLifetime metrics.Counter -} - -func RegisterDBMetrics(reg metrics.Registry) dbMetricsHandles { - ctx := dbMetricsHandles{ - maxOpen: metrics.NewGauge(), - open: metrics.NewGauge(), - inUse: metrics.NewGauge(), - idle: metrics.NewGauge(), - waitedFor: metrics.NewCounter(), - blockedMilliseconds: metrics.NewCounter(), - closedMaxIdle: metrics.NewCounter(), - closedMaxLifetime: metrics.NewCounter(), - } - subsys := "connections" - reg.Register(metricName(subsys, "max_open"), ctx.maxOpen) - reg.Register(metricName(subsys, "open"), ctx.open) - reg.Register(metricName(subsys, "in_use"), ctx.inUse) - reg.Register(metricName(subsys, "idle"), ctx.idle) - reg.Register(metricName(subsys, "waited_for"), ctx.waitedFor) - reg.Register(metricName(subsys, "blocked_milliseconds"), ctx.blockedMilliseconds) - reg.Register(metricName(subsys, "closed_max_idle"), ctx.closedMaxIdle) - reg.Register(metricName(subsys, "closed_max_lifetime"), ctx.closedMaxLifetime) - return ctx -} - -func (met *dbMetricsHandles) Update(stats sql.DBStats) { - met.maxOpen.Update(int64(stats.MaxOpenConnections)) - met.open.Update(int64(stats.OpenConnections)) - met.inUse.Update(int64(stats.InUse)) - met.idle.Update(int64(stats.Idle)) - met.waitedFor.Inc(int64(stats.WaitCount)) - met.blockedMilliseconds.Inc(int64(stats.WaitDuration.Milliseconds())) - met.closedMaxIdle.Inc(int64(stats.MaxIdleClosed)) - met.closedMaxLifetime.Inc(int64(stats.MaxLifetimeClosed)) -} diff --git a/statediff/indexer/prom/db_stats_collector.go b/statediff/indexer/prom/db_stats_collector.go new file mode 100644 index 000000000..3bab65730 --- /dev/null +++ b/statediff/indexer/prom/db_stats_collector.go @@ -0,0 +1,146 @@ +package prom + +import ( + "database/sql" + + "github.com/prometheus/client_golang/prometheus" +) + +const ( + namespace = "ipld_eth_indexer" + subsystem = "connections" +) + +// DBStatsGetter is an interface that gets sql.DBStats. +type DBStatsGetter interface { + Stats() sql.DBStats +} + +// DBStatsCollector implements the prometheus.Collector interface. +type DBStatsCollector struct { + sg DBStatsGetter + + // descriptions of exported metrics + maxOpenDesc *prometheus.Desc + openDesc *prometheus.Desc + inUseDesc *prometheus.Desc + idleDesc *prometheus.Desc + waitedForDesc *prometheus.Desc + blockedSecondsDesc *prometheus.Desc + closedMaxIdleDesc *prometheus.Desc + closedMaxLifetimeDesc *prometheus.Desc +} + +// NewDBStatsCollector creates a new DBStatsCollector. +func NewDBStatsCollector(dbName string, sg DBStatsGetter) *DBStatsCollector { + labels := prometheus.Labels{"db_name": dbName} + return &DBStatsCollector{ + sg: sg, + maxOpenDesc: prometheus.NewDesc( + prometheus.BuildFQName(namespace, subsystem, "max_open"), + "Maximum number of open connections to the database.", + nil, + labels, + ), + openDesc: prometheus.NewDesc( + prometheus.BuildFQName(namespace, subsystem, "open"), + "The number of established connections both in use and idle.", + nil, + labels, + ), + inUseDesc: prometheus.NewDesc( + prometheus.BuildFQName(namespace, subsystem, "in_use"), + "The number of connections currently in use.", + nil, + labels, + ), + idleDesc: prometheus.NewDesc( + prometheus.BuildFQName(namespace, subsystem, "idle"), + "The number of idle connections.", + nil, + labels, + ), + waitedForDesc: prometheus.NewDesc( + prometheus.BuildFQName(namespace, subsystem, "waited_for"), + "The total number of connections waited for.", + nil, + labels, + ), + blockedSecondsDesc: prometheus.NewDesc( + prometheus.BuildFQName(namespace, subsystem, "blocked_seconds"), + "The total time blocked waiting for a new connection.", + nil, + labels, + ), + closedMaxIdleDesc: prometheus.NewDesc( + prometheus.BuildFQName(namespace, subsystem, "closed_max_idle"), + "The total number of connections closed due to SetMaxIdleConns.", + nil, + labels, + ), + closedMaxLifetimeDesc: prometheus.NewDesc( + prometheus.BuildFQName(namespace, subsystem, "closed_max_lifetime"), + "The total number of connections closed due to SetConnMaxLifetime.", + nil, + labels, + ), + } +} + +// Describe implements the prometheus.Collector interface. +func (c DBStatsCollector) Describe(ch chan<- *prometheus.Desc) { + ch <- c.maxOpenDesc + ch <- c.openDesc + ch <- c.inUseDesc + ch <- c.idleDesc + ch <- c.waitedForDesc + ch <- c.blockedSecondsDesc + ch <- c.closedMaxIdleDesc + ch <- c.closedMaxLifetimeDesc +} + +// Collect implements the prometheus.Collector interface. +func (c DBStatsCollector) Collect(ch chan<- prometheus.Metric) { + stats := c.sg.Stats() + + ch <- prometheus.MustNewConstMetric( + c.maxOpenDesc, + prometheus.GaugeValue, + float64(stats.MaxOpenConnections), + ) + ch <- prometheus.MustNewConstMetric( + c.openDesc, + prometheus.GaugeValue, + float64(stats.OpenConnections), + ) + ch <- prometheus.MustNewConstMetric( + c.inUseDesc, + prometheus.GaugeValue, + float64(stats.InUse), + ) + ch <- prometheus.MustNewConstMetric( + c.idleDesc, + prometheus.GaugeValue, + float64(stats.Idle), + ) + ch <- prometheus.MustNewConstMetric( + c.waitedForDesc, + prometheus.CounterValue, + float64(stats.WaitCount), + ) + ch <- prometheus.MustNewConstMetric( + c.blockedSecondsDesc, + prometheus.CounterValue, + stats.WaitDuration.Seconds(), + ) + ch <- prometheus.MustNewConstMetric( + c.closedMaxIdleDesc, + prometheus.CounterValue, + float64(stats.MaxIdleClosed), + ) + ch <- prometheus.MustNewConstMetric( + c.closedMaxLifetimeDesc, + prometheus.CounterValue, + float64(stats.MaxLifetimeClosed), + ) +} diff --git a/statediff/indexer/prom/prom.go b/statediff/indexer/prom/prom.go new file mode 100644 index 000000000..2b9436462 --- /dev/null +++ b/statediff/indexer/prom/prom.go @@ -0,0 +1,151 @@ +package prom + +import ( + "time" + + "github.com/jmoiron/sqlx" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" +) + +const statsSubsystem = "stats" + +var ( + metrics bool + + receipts prometheus.Counter + transactions prometheus.Counter + blocks prometheus.Counter + + lenPayloadChan prometheus.Gauge + + tPayloadDecode prometheus.Histogram + tFreePostgres prometheus.Histogram + tPostgresCommit prometheus.Histogram + tHeaderProcessing prometheus.Histogram + tUncleProcessing prometheus.Histogram + tTxAndRecProcessing prometheus.Histogram + tStateAndStoreProcessing prometheus.Histogram + tCodeAndCodeHashProcessing prometheus.Histogram +) + +// Init module initialization +func Init() { + metrics = true + + blocks = promauto.NewCounter(prometheus.CounterOpts{ + Namespace: namespace, + Name: "blocks", + Help: "The total number of processed blocks", + }) + transactions = promauto.NewCounter(prometheus.CounterOpts{ + Namespace: namespace, + Name: "transactions", + Help: "The total number of processed transactions", + }) + receipts = promauto.NewCounter(prometheus.CounterOpts{ + Namespace: namespace, + Name: "receipts", + Help: "The total number of processed receipts", + }) + + lenPayloadChan = promauto.NewGauge(prometheus.GaugeOpts{ + Namespace: namespace, + Name: "len_payload_chan", + Help: "Current length of publishPayload", + }) + + tFreePostgres = promauto.NewHistogram(prometheus.HistogramOpts{ + Namespace: namespace, + Subsystem: statsSubsystem, + Name: "t_free_postgres", + Help: "Time spent waiting for free postgres tx", + }) + tPostgresCommit = promauto.NewHistogram(prometheus.HistogramOpts{ + Namespace: namespace, + Subsystem: statsSubsystem, + Name: "t_postgres_commit", + Help: "Postgres transaction commit duration", + }) + tHeaderProcessing = promauto.NewHistogram(prometheus.HistogramOpts{ + Namespace: namespace, + Subsystem: statsSubsystem, + Name: "t_header_processing", + Help: "Header processing time", + }) + tUncleProcessing = promauto.NewHistogram(prometheus.HistogramOpts{ + Namespace: namespace, + Subsystem: statsSubsystem, + Name: "t_uncle_processing", + Help: "Uncle processing time", + }) + tTxAndRecProcessing = promauto.NewHistogram(prometheus.HistogramOpts{ + Namespace: namespace, + Subsystem: statsSubsystem, + Name: "t_tx_receipt_processing", + Help: "Tx and receipt processing time", + }) + tStateAndStoreProcessing = promauto.NewHistogram(prometheus.HistogramOpts{ + Namespace: namespace, + Subsystem: statsSubsystem, + Name: "t_state_store_code_processing", + Help: "State, storage, and code combinedprocessing time", + }) +} + +// RegisterDBCollector create metric colletor for given connection +func RegisterDBCollector(name string, db *sqlx.DB) { + if metrics { + prometheus.Register(NewDBStatsCollector(name, db)) + } +} + +// BlockInc block counter increment +func BlockInc() { + if metrics { + blocks.Inc() + } +} + +// TransactionInc transaction counter increment +func TransactionInc() { + if metrics { + transactions.Inc() + } +} + +// ReceiptInc receipt counter increment +func ReceiptInc() { + if metrics { + receipts.Inc() + } +} + +// SetLenPayloadChan set chan length +func SetLenPayloadChan(ln int) { + if metrics { + lenPayloadChan.Set(float64(ln)) + } +} + +// SetTimeMetric time metric observation +func SetTimeMetric(name string, t time.Duration) { + if !metrics { + return + } + tAsF64 := t.Seconds() + switch name { + case "t_free_postgres": + tFreePostgres.Observe(tAsF64) + case "t_postgres_commit": + tPostgresCommit.Observe(tAsF64) + case "t_header_processing": + tHeaderProcessing.Observe(tAsF64) + case "t_uncle_processing": + tUncleProcessing.Observe(tAsF64) + case "t_tx_receipt_processing": + tTxAndRecProcessing.Observe(tAsF64) + case "t_state_store_code_processing": + tStateAndStoreProcessing.Observe(tAsF64) + } +} diff --git a/statediff/indexer/writer.go b/statediff/indexer/writer.go index 24bef7ed3..9fc4af2b1 100644 --- a/statediff/indexer/writer.go +++ b/statediff/indexer/writer.go @@ -17,11 +17,12 @@ package indexer import ( + "github.com/ethereum/go-ethereum/common" "github.com/jmoiron/sqlx" - "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/statediff/indexer/models" "github.com/ethereum/go-ethereum/statediff/indexer/postgres" + "github.com/ethereum/go-ethereum/statediff/indexer/prom" "github.com/ethereum/go-ethereum/statediff/indexer/shared" ) @@ -29,7 +30,7 @@ var ( nullHash = common.HexToHash("0x0000000000000000000000000000000000000000000000000000000000000000") ) -// Handles processing and writing of indexed IPLD objects to Postgres +// Indexer satisfies the Indexer interface for ethereum type PostgresCIDWriter struct { db *postgres.DB } @@ -50,7 +51,7 @@ func (in *PostgresCIDWriter) upsertHeaderCID(tx *sqlx.Tx, header models.HeaderMo header.BlockNumber, header.BlockHash, header.ParentHash, header.CID, header.TotalDifficulty, in.db.NodeID, header.Reward, header.StateRoot, header.TxRoot, header.RctRoot, header.UncleRoot, header.Bloom, header.Timestamp, header.MhKey, 1).Scan(&headerID) if err == nil { - indexerMetrics.blocks.Inc(1) + prom.BlockInc() } return headerID, err } @@ -72,7 +73,7 @@ func (in *PostgresCIDWriter) upsertTransactionAndReceiptCIDs(tx *sqlx.Tx, payloa if err != nil { return err } - indexerMetrics.transactions.Inc(1) + prom.TransactionInc() receiptCidMeta, ok := payload.ReceiptCIDs[common.HexToHash(trxCidMeta.TxHash)] if ok { if err := in.upsertReceiptCID(tx, receiptCidMeta, txID); err != nil { @@ -90,7 +91,7 @@ func (in *PostgresCIDWriter) upsertTransactionCID(tx *sqlx.Tx, transaction model RETURNING id`, headerID, transaction.TxHash, transaction.CID, transaction.Dst, transaction.Src, transaction.Index, transaction.MhKey, transaction.Data).Scan(&txID) if err == nil { - indexerMetrics.transactions.Inc(1) + prom.TransactionInc() } return txID, err } @@ -100,7 +101,7 @@ func (in *PostgresCIDWriter) upsertReceiptCID(tx *sqlx.Tx, rct models.ReceiptMod ON CONFLICT (tx_id) DO UPDATE SET (cid, contract, contract_hash, topic0s, topic1s, topic2s, topic3s, log_contracts, mh_key) = ($2, $3, $4, $5, $6, $7, $8, $9, $10)`, txID, rct.CID, rct.Contract, rct.ContractHash, rct.Topic0s, rct.Topic1s, rct.Topic2s, rct.Topic3s, rct.LogContracts, rct.MhKey) if err == nil { - indexerMetrics.receipts.Inc(1) + prom.ReceiptInc() } return err } diff --git a/statediff/metrics.go b/statediff/metrics.go deleted file mode 100644 index 7e7d6e328..000000000 --- a/statediff/metrics.go +++ /dev/null @@ -1,54 +0,0 @@ -package statediff - -import ( - "strings" - - "github.com/ethereum/go-ethereum/metrics" -) - -const ( - namespace = "statediff" -) - -// Build a fully qualified metric name -func metricName(subsystem, name string) string { - if name == "" { - return "" - } - parts := []string{namespace, name} - if subsystem != "" { - parts = []string{namespace, subsystem, name} - } - // Prometheus uses _ but geth metrics uses / and replaces - return strings.Join(parts, "/") -} - -type statediffMetricsHandles struct { - // Height of latest synced by core.BlockChain - // FIXME - lastSyncHeight metrics.Gauge - // Height of the latest block received from chainEvent channel - lastEventHeight metrics.Gauge - // Height of latest state diff - lastStatediffHeight metrics.Gauge - // Current length of chainEvent channels - serviceLoopChannelLen metrics.Gauge - writeLoopChannelLen metrics.Gauge -} - -func RegisterStatediffMetrics(reg metrics.Registry) statediffMetricsHandles { - ctx := statediffMetricsHandles{ - lastSyncHeight: metrics.NewGauge(), - lastEventHeight: metrics.NewGauge(), - lastStatediffHeight: metrics.NewGauge(), - serviceLoopChannelLen: metrics.NewGauge(), - writeLoopChannelLen: metrics.NewGauge(), - } - subsys := "service" - reg.Register(metricName(subsys, "last_sync_height"), ctx.lastSyncHeight) - reg.Register(metricName(subsys, "last_event_height"), ctx.lastEventHeight) - reg.Register(metricName(subsys, "last_statediff_height"), ctx.lastStatediffHeight) - reg.Register(metricName(subsys, "service_loop_channel_len"), ctx.serviceLoopChannelLen) - reg.Register(metricName(subsys, "write_loop_channel_len"), ctx.writeLoopChannelLen) - return ctx -} diff --git a/statediff/service.go b/statediff/service.go index 2f16fc4ab..f250d7c54 100644 --- a/statediff/service.go +++ b/statediff/service.go @@ -31,7 +31,6 @@ import ( "github.com/ethereum/go-ethereum/eth" "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/log" - "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/node" "github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/rlp" @@ -41,6 +40,7 @@ import ( ind "github.com/ethereum/go-ethereum/statediff/indexer" nodeinfo "github.com/ethereum/go-ethereum/statediff/indexer/node" "github.com/ethereum/go-ethereum/statediff/indexer/postgres" + "github.com/ethereum/go-ethereum/statediff/indexer/prom" . "github.com/ethereum/go-ethereum/statediff/types" ) @@ -55,8 +55,6 @@ var writeLoopParams = Params{ IncludeCode: true, } -var statediffMetrics = RegisterStatediffMetrics(metrics.DefaultRegistry) - type blockChain interface { SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription GetBlockByHash(hash common.Hash) *types.Block @@ -76,7 +74,7 @@ type IService interface { // Main event loop for processing state diffs Loop(chainEventCh chan core.ChainEvent) // Method to subscribe to receive state diff processing output - Subscribe(id rpc.ID, sub chan<- Payload, quitChan chan<- bool, params Params) + Subscribe(id rpc.ID, sub chan<- Payload, quitChanogr chan<- bool, params Params) // Method to unsubscribe from state diff processing Unsubscribe(id rpc.ID) error // Method to get state diff object at specific block @@ -91,15 +89,6 @@ type IService interface { WriteLoop(chainEventCh chan core.ChainEvent) } -// Wraps consructor parameters -type ServiceParams struct { - DBParams *DBParams - // Whether to enable writing state diffs directly to track blochain head - EnableWriteLoop bool - // Size of the worker pool - NumWorkers uint -} - // Service is the underlying struct for the state diffing service type Service struct { // Used to sync access to the Subscriptions @@ -115,56 +104,42 @@ type Service struct { // A mapping of subscription params rlp hash to the corresponding subscription params SubscriptionTypes map[common.Hash]Params // Cache the last block so that we can avoid having to lookup the next block's parent - BlockCache blockCache + lastBlock lastBlockCache // Whether or not we have any subscribers; only if we do, do we processes state diffs subscribers int32 // Interface for publishing statediffs as PG-IPLD objects indexer ind.Indexer // Whether to enable writing state diffs directly to track blochain head enableWriteLoop bool - // Size of the worker pool - numWorkers uint } // Wrap the cached last block for safe access from different service loops -type blockCache struct { +type lastBlockCache struct { sync.Mutex - blocks map[common.Hash]*types.Block - maxSize uint -} - -func NewBlockCache(max uint) blockCache { - return blockCache{ - blocks: make(map[common.Hash]*types.Block), - maxSize: max, - } + block *types.Block } // New creates a new statediff.Service -// func New(stack *node.Node, ethServ *eth.Ethereum, dbParams *DBParams, enableWriteLoop bool) error { -func New(stack *node.Node, ethServ *eth.Ethereum, params ServiceParams) error { +func New(stack *node.Node, ethServ *eth.Ethereum, dbParams *DBParams, enableWriteLoop bool) error { blockChain := ethServ.BlockChain() var indexer ind.Indexer - if params.DBParams != nil { + if dbParams != nil { info := nodeinfo.Info{ GenesisBlock: blockChain.Genesis().Hash().Hex(), NetworkID: strconv.FormatUint(ethServ.NetVersion(), 10), ChainID: blockChain.Config().ChainID.Uint64(), - ID: params.DBParams.ID, - ClientName: params.DBParams.ClientName, + ID: dbParams.ID, + ClientName: dbParams.ClientName, } // TODO: pass max idle, open, lifetime? - db, err := postgres.NewDB(params.DBParams.ConnectionURL, postgres.ConnectionConfig{}, info) + db, err := postgres.NewDB(dbParams.ConnectionURL, postgres.ConnectionConfig{}, info) if err != nil { return err } indexer = ind.NewStateDiffIndexer(blockChain.Config(), db) } - workers := params.NumWorkers - if workers == 0 { - workers = 1 - } + prom.Init() sds := &Service{ Mutex: sync.Mutex{}, BlockChain: blockChain, @@ -172,10 +147,8 @@ func New(stack *node.Node, ethServ *eth.Ethereum, params ServiceParams) error { QuitChan: make(chan bool), Subscriptions: make(map[common.Hash]map[rpc.ID]Subscription), SubscriptionTypes: make(map[common.Hash]Params), - BlockCache: NewBlockCache(workers), indexer: indexer, - enableWriteLoop: params.EnableWriteLoop, - numWorkers: workers, + enableWriteLoop: enableWriteLoop, } stack.RegisterLifecycle(sds) stack.RegisterAPIs(sds.APIs()) @@ -199,88 +172,46 @@ func (sds *Service) APIs() []rpc.API { } } -// Return the parent block of currentBlock, using the cached block if available; -// and cache the passed block -func (lbc *blockCache) getParentBlock(currentBlock *types.Block, bc blockChain) *types.Block { +func (lbc *lastBlockCache) replace(currentBlock *types.Block, bc blockChain) *types.Block { lbc.Lock() parentHash := currentBlock.ParentHash() var parentBlock *types.Block - if block, ok := lbc.blocks[parentHash]; ok { - parentBlock = block - if len(lbc.blocks) > int(lbc.maxSize) { - delete(lbc.blocks, parentHash) - } + if lbc.block != nil && bytes.Equal(lbc.block.Hash().Bytes(), parentHash.Bytes()) { + parentBlock = lbc.block } else { parentBlock = bc.GetBlockByHash(parentHash) } - lbc.blocks[currentBlock.Hash()] = currentBlock + lbc.block = currentBlock lbc.Unlock() return parentBlock } -type workerParams struct { - chainEventCh <-chan core.ChainEvent - errCh <-chan error - wg *sync.WaitGroup - id uint -} - func (sds *Service) WriteLoop(chainEventCh chan core.ChainEvent) { chainEventSub := sds.BlockChain.SubscribeChainEvent(chainEventCh) defer chainEventSub.Unsubscribe() errCh := chainEventSub.Err() - var wg sync.WaitGroup - // Process metrics for chain events, then forward to workers - chainEventFwd := make(chan core.ChainEvent, chainEventChanSize) - wg.Add(1) - go func() { - defer wg.Done() - for { - select { - case chainEvent := <-chainEventCh: - statediffMetrics.lastEventHeight.Update(int64(chainEvent.Block.Number().Uint64())) - statediffMetrics.writeLoopChannelLen.Update(int64(len(chainEventCh))) - chainEventFwd <- chainEvent - case <-sds.QuitChan: - return - } - } - }() - wg.Add(int(sds.numWorkers)) - for worker := uint(0); worker < sds.numWorkers; worker++ { - params := workerParams{chainEventCh: chainEventFwd, errCh: errCh, wg: &wg, id: worker} - go sds.writeLoopWorker(params) - } - wg.Wait() -} - -func (sds *Service) writeLoopWorker(params workerParams) { - defer params.wg.Done() for { select { //Notify chain event channel of events - case chainEvent := <-params.chainEventCh: - log.Debug("WriteLoop(): chain event received", "event", chainEvent) + case chainEvent := <-chainEventCh: + log.Debug("(WriteLoop) Event received from chainEventCh", "event", chainEvent) currentBlock := chainEvent.Block - parentBlock := sds.BlockCache.getParentBlock(currentBlock, sds.BlockChain) + parentBlock := sds.lastBlock.replace(currentBlock, sds.BlockChain) if parentBlock == nil { log.Error("Parent block is nil, skipping this block", "block height", currentBlock.Number()) continue } - log.Info("Writing state diff", "block height", currentBlock.Number().Uint64(), "worker", params.id) err := sds.writeStateDiff(currentBlock, parentBlock.Root(), writeLoopParams) if err != nil { - log.Error("statediff.Service.WriteLoop: processing error", "block height", currentBlock.Number().Uint64(), "error", err.Error(), "worker", params.id) + log.Error("statediff (DB write) processing error", "block height", currentBlock.Number().Uint64(), "error", err.Error()) continue } - // TODO: how to handle with concurrent workers - statediffMetrics.lastStatediffHeight.Update(int64(currentBlock.Number().Uint64())) - case err := <-params.errCh: - log.Warn("Error from chain event subscription", "error", err, "worker", params.id) + case err := <-errCh: + log.Warn("Error from chain event subscription", "error", err) sds.close() return case <-sds.QuitChan: - log.Info("Quitting the statediff writing process", "worker", params.id) + log.Info("Quitting the statediff writing process") sds.close() return } @@ -296,17 +227,16 @@ func (sds *Service) Loop(chainEventCh chan core.ChainEvent) { select { //Notify chain event channel of events case chainEvent := <-chainEventCh: - statediffMetrics.serviceLoopChannelLen.Update(int64(len(chainEventCh))) - log.Debug("Loop(): chain event received", "event", chainEvent) + log.Debug("Event received from chainEventCh", "event", chainEvent) // if we don't have any subscribers, do not process a statediff if atomic.LoadInt32(&sds.subscribers) == 0 { log.Debug("Currently no subscribers to the statediffing service; processing is halted") continue } currentBlock := chainEvent.Block - parentBlock := sds.BlockCache.getParentBlock(currentBlock, sds.BlockChain) + parentBlock := sds.lastBlock.replace(currentBlock, sds.BlockChain) if parentBlock == nil { - log.Error("Parent block is nil, skipping this block", "block height", currentBlock.Number()) + log.Error("Parent block is nil, skipping this block", "number", currentBlock.Number()) continue } sds.streamStateDiff(currentBlock, parentBlock.Root()) @@ -485,8 +415,7 @@ func (sds *Service) Start() error { if sds.enableWriteLoop { log.Info("Starting statediff DB write loop", "params", writeLoopParams) - chainEventCh := make(chan core.ChainEvent, chainEventChanSize) - go sds.WriteLoop(chainEventCh) + go sds.WriteLoop(make(chan core.ChainEvent, chainEventChanSize)) } return nil @@ -544,7 +473,7 @@ func (sds *Service) StreamCodeAndCodeHash(blockNumber uint64, outChan chan<- Cod log.Info("sending code and codehash", "block height", blockNumber) currentTrie, err := sds.BlockChain.StateCache().OpenTrie(current.Root()) if err != nil { - log.Error("error creating trie for block", "block height", current.Number(), "err", err) + log.Error("error creating trie for block", "number", current.Number(), "err", err) close(quitChan) return } @@ -592,7 +521,7 @@ func (sds *Service) WriteStateDiffAt(blockNumber uint64, params Params) error { // Writes a state diff from the current block, parent state root, and provided params func (sds *Service) writeStateDiff(block *types.Block, parentRoot common.Hash, params Params) error { - // log.Info("Writing state diff", "block height", block.Number().Uint64()) + log.Info("writing state diff", "block height", block.Number().Uint64()) var totalDifficulty *big.Int var receipts types.Receipts if params.IncludeTD { @@ -613,10 +542,13 @@ func (sds *Service) writeStateDiff(block *types.Block, parentRoot common.Hash, p codeOutput := func(c CodeAndCodeHash) error { return sds.indexer.PushCodeAndCodeHash(tx, c) } + addressOutput := func(a AddressAndAddressHash) error { + return sds.indexer.PushAddressAndAddressHash(tx, a) + } err = sds.Builder.WriteStateDiffObject(StateRoots{ NewStateRoot: block.Root(), OldStateRoot: parentRoot, - }, params, output, codeOutput) + }, params, output, codeOutput, addressOutput) // allow dereferencing of parent, keep current locked as it should be the next parent sds.BlockChain.UnlockTrie(parentRoot) diff --git a/statediff/service_test.go b/statediff/service_test.go index ca9a483a5..ef3c1bb2c 100644 --- a/statediff/service_test.go +++ b/statediff/service_test.go @@ -94,7 +94,6 @@ func testErrorInChainEventLoop(t *testing.T) { QuitChan: serviceQuit, Subscriptions: make(map[common.Hash]map[rpc.ID]statediff.Subscription), SubscriptionTypes: make(map[common.Hash]statediff.Params), - BlockCache: statediff.NewBlockCache(1), } payloadChan := make(chan statediff.Payload, 2) quitChan := make(chan bool) @@ -178,7 +177,6 @@ func testErrorInBlockLoop(t *testing.T) { QuitChan: make(chan bool), Subscriptions: make(map[common.Hash]map[rpc.ID]statediff.Subscription), SubscriptionTypes: make(map[common.Hash]statediff.Params), - BlockCache: statediff.NewBlockCache(1), } payloadChan := make(chan statediff.Payload) quitChan := make(chan bool) @@ -258,7 +256,6 @@ func testErrorInStateDiffAt(t *testing.T) { QuitChan: make(chan bool), Subscriptions: make(map[common.Hash]map[rpc.ID]statediff.Subscription), SubscriptionTypes: make(map[common.Hash]statediff.Params), - BlockCache: statediff.NewBlockCache(1), } stateDiffPayload, err := service.StateDiffAt(testBlock1.NumberU64(), defaultParams) if err != nil { diff --git a/statediff/testhelpers/mocks/builder.go b/statediff/testhelpers/mocks/builder.go index ff9faf3ec..8aa2764aa 100644 --- a/statediff/testhelpers/mocks/builder.go +++ b/statediff/testhelpers/mocks/builder.go @@ -42,7 +42,7 @@ func (builder *Builder) BuildStateDiffObject(args statediff.Args, params statedi } // BuildStateDiffObject mock method -func (builder *Builder) WriteStateDiffObject(args statediff.StateRoots, params statediff.Params, output sdtypes.StateNodeSink, codeOutput sdtypes.CodeSink) error { +func (builder *Builder) WriteStateDiffObject(args statediff.StateRoots, params statediff.Params, output sdtypes.StateNodeSink, codeOutput sdtypes.CodeSink, addressOutput sdtypes.AddressSink) error { builder.StateRoots = args builder.Params = params diff --git a/statediff/types.go b/statediff/types.go index 148567dd7..be50ef5a7 100644 --- a/statediff/types.go +++ b/statediff/types.go @@ -94,10 +94,11 @@ func (sd *Payload) Encode() ([]byte, error) { // StateObject is the final output structure from the builder type StateObject struct { - BlockNumber *big.Int `json:"blockNumber" gencodec:"required"` - BlockHash common.Hash `json:"blockHash" gencodec:"required"` - Nodes []types.StateNode `json:"nodes" gencodec:"required"` - CodeAndCodeHashes []types.CodeAndCodeHash `json:"codeMapping"` + BlockNumber *big.Int `json:"blockNumber" gencodec:"required"` + BlockHash common.Hash `json:"blockHash" gencodec:"required"` + Nodes []types.StateNode `json:"nodes" gencodec:"required"` + CodeAndCodeHashes []types.CodeAndCodeHash `json:"codeMapping"` + AddressAndAddressHashes []types.AddressAndAddressHash `json:"addressMapping"` } // AccountMap is a mapping of hex encoded path => account wrapper diff --git a/statediff/types/types.go b/statediff/types/types.go index 08e2124fa..fa8d5f862 100644 --- a/statediff/types/types.go +++ b/statediff/types/types.go @@ -56,6 +56,13 @@ type CodeAndCodeHash struct { Code []byte `json:"code"` } +// AddressAndAddressHash struct for holding keccak256(address) => address mapping +type AddressAndAddressHash struct { + Hash common.Hash `json:"addressHash"` + Address common.Address `json:"address"` +} + type StateNodeSink func(StateNode) error type StorageNodeSink func(StorageNode) error type CodeSink func(CodeAndCodeHash) error +type AddressSink func(AddressAndAddressHash) error