From 45190527791ed988361359902d8646a857e320d8 Mon Sep 17 00:00:00 2001 From: Roy Crihfield Date: Mon, 16 Nov 2020 17:49:22 +0800 Subject: [PATCH 01/16] statediff: refactor metrics Remove redundant statediff/indexer/prom tooling and use existing prometheus integration. --- statediff/indexer/indexer.go | 17 ++- statediff/indexer/metrics.go | 124 +++++++++++++++ statediff/indexer/prom/db_stats_collector.go | 146 ------------------ statediff/indexer/prom/prom.go | 151 ------------------- statediff/indexer/writer.go | 9 +- statediff/service.go | 2 - 6 files changed, 138 insertions(+), 311 deletions(-) create mode 100644 statediff/indexer/metrics.go delete mode 100644 statediff/indexer/prom/db_stats_collector.go delete mode 100644 statediff/indexer/prom/prom.go diff --git a/statediff/indexer/indexer.go b/statediff/indexer/indexer.go index 9a516725b..9bf539427 100644 --- a/statediff/indexer/indexer.go +++ b/statediff/indexer/indexer.go @@ -26,6 +26,7 @@ import ( "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/rlp" @@ -36,11 +37,12 @@ import ( "github.com/ethereum/go-ethereum/statediff/indexer/ipfs/ipld" "github.com/ethereum/go-ethereum/statediff/indexer/models" "github.com/ethereum/go-ethereum/statediff/indexer/postgres" - "github.com/ethereum/go-ethereum/statediff/indexer/prom" "github.com/ethereum/go-ethereum/statediff/indexer/shared" sdtypes "github.com/ethereum/go-ethereum/statediff/types" ) +var indexerMetrics = RegisterIndexerMetrics(metrics.DefaultRegistry) + // Indexer interface to allow substitution of mocks for testing type Indexer interface { PushBlock(block *types.Block, receipts types.Receipts, totalDifficulty *big.Int) (*BlockTx, error) @@ -109,12 +111,12 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip panic(p) } else { tDiff := time.Now().Sub(t) - prom.SetTimeMetric("t_state_store_code_processing", tDiff) + indexerMetrics.tStateStoreCodeProcessing.Update(tDiff) traceMsg += fmt.Sprintf("state, storage, and code storage processing time: %s\r\n", tDiff.String()) t = time.Now() err = tx.Commit() tDiff = time.Now().Sub(t) - prom.SetTimeMetric("t_postgres_commit", tDiff) + indexerMetrics.tPostgresCommit.Update(tDiff) traceMsg += fmt.Sprintf("postgres transaction commit duration: %s\r\n", tDiff.String()) } traceMsg += fmt.Sprintf(" TOTAL PROCESSING DURATION: %s\r\n", time.Now().Sub(start).String()) @@ -123,7 +125,8 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip }, } tDiff := time.Now().Sub(t) - prom.SetTimeMetric("t_free_postgres", tDiff) + indexerMetrics.tFreePostgres.Update(tDiff) + traceMsg += fmt.Sprintf("time spent waiting for free postgres tx: %s:\r\n", tDiff.String()) t = time.Now() @@ -133,7 +136,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip return nil, err } tDiff = time.Now().Sub(t) - prom.SetTimeMetric("t_header_processing", tDiff) + indexerMetrics.tHeaderProcessing.Update(tDiff) traceMsg += fmt.Sprintf("header processing time: %s\r\n", tDiff.String()) t = time.Now() // Publish and index uncles @@ -141,7 +144,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip return nil, err } tDiff = time.Now().Sub(t) - prom.SetTimeMetric("t_uncle_processing", tDiff) + indexerMetrics.tUncleProcessing.Update(tDiff) traceMsg += fmt.Sprintf("uncle processing time: %s\r\n", tDiff.String()) t = time.Now() // Publish and index receipts and txs @@ -158,7 +161,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip return nil, err } tDiff = time.Now().Sub(t) - prom.SetTimeMetric("t_tx_receipt_processing", tDiff) + indexerMetrics.tTxAndRecProcessing.Update(tDiff) traceMsg += fmt.Sprintf("tx and receipt processing time: %s\r\n", tDiff.String()) t = time.Now() diff --git a/statediff/indexer/metrics.go b/statediff/indexer/metrics.go new file mode 100644 index 000000000..9da1489e8 --- /dev/null +++ b/statediff/indexer/metrics.go @@ -0,0 +1,124 @@ +package indexer + +import ( + "database/sql" + + "github.com/ethereum/go-ethereum/metrics" + "strings" +) + +const ( + indexerNamespace = "ipld_eth_indexer" // FIXME: "statediff"? +) + +// Build a fully qualified metric name +func metricName(subsystem, name string) string { + if name == "" { + return "" + } + parts := []string{indexerNamespace, name} + if subsystem != "" { + parts = []string{indexerNamespace, subsystem, name} + } + // Prometheus uses _ but geth metrics uses / and replaces + return strings.Join(parts, "/") +} + +type indexerMetricsContext struct { + // The total number of processed blocks + blocks metrics.Counter + // The total number of processed transactions + transactions metrics.Counter + // The total number of processed receipts + receipts metrics.Counter + // Time spent waiting for free postgres tx + tFreePostgres metrics.Timer + // Postgres transaction commit duration + tPostgresCommit metrics.Timer + // Header processing time + tHeaderProcessing metrics.Timer + // Uncle processing time + tUncleProcessing metrics.Timer + // Tx and receipt processing time + tTxAndRecProcessing metrics.Timer + // State, storage, and code combined processing time + tStateStoreCodeProcessing metrics.Timer +} + +func RegisterIndexerMetrics(reg metrics.Registry) indexerMetricsContext { + ctx := indexerMetricsContext{ + blocks: metrics.NewCounter(), + transactions: metrics.NewCounter(), + receipts: metrics.NewCounter(), + tFreePostgres: metrics.NewTimer(), + tPostgresCommit: metrics.NewTimer(), + tHeaderProcessing: metrics.NewTimer(), + tUncleProcessing: metrics.NewTimer(), + tTxAndRecProcessing: metrics.NewTimer(), + tStateStoreCodeProcessing: metrics.NewTimer(), + } + subsys := "" // todo + reg.Register(metricName(subsys, "blocks"), ctx.blocks) + reg.Register(metricName(subsys, "transactions"), ctx.transactions) + reg.Register(metricName(subsys, "receipts"), ctx.receipts) + reg.Register(metricName(subsys, "t_free_postgres"), ctx.tFreePostgres) + reg.Register(metricName(subsys, "t_postgres_commit"), ctx.tPostgresCommit) + reg.Register(metricName(subsys, "t_header_processing"), ctx.tHeaderProcessing) + reg.Register(metricName(subsys, "t_uncle_processing"), ctx.tUncleProcessing) + reg.Register(metricName(subsys, "t_tx_receipt_processing"), ctx.tTxAndRecProcessing) + reg.Register(metricName(subsys, "t_state_store_code_processing"), ctx.tStateStoreCodeProcessing) + return ctx +} + +type dbMetrics struct { + // Maximum number of open connections to the database + maxOpen metrics.Gauge + // The number of established connections both in use and idle + open metrics.Gauge + // The number of connections currently in use + inUse metrics.Gauge + // The number of idle connections + idle metrics.Gauge + // The total number of connections waited for + waitedFor metrics.Counter + // The total time blocked waiting for a new connection + blockedSeconds metrics.Counter + // The total number of connections closed due to SetMaxIdleConns + closedMaxIdle metrics.Counter + // The total number of connections closed due to SetConnMaxLifetime + closedMaxLifetime metrics.Counter +} + +func RegisterDBMetrics(reg metrics.Registry) dbMetrics { + ctx := dbMetrics{ + maxOpen: metrics.NewGauge(), + open: metrics.NewGauge(), + inUse: metrics.NewGauge(), + idle: metrics.NewGauge(), + waitedFor: metrics.NewCounter(), + blockedSeconds: metrics.NewCounter(), + closedMaxIdle: metrics.NewCounter(), + closedMaxLifetime: metrics.NewCounter(), + } + subsys := "connections" + reg.Register(metricName(subsys, "max_open_desc"), ctx.maxOpen) + reg.Register(metricName(subsys, "open_desc"), ctx.open) + reg.Register(metricName(subsys, "in_use_desc"), ctx.inUse) + reg.Register(metricName(subsys, "idle_desc"), ctx.idle) + reg.Register(metricName(subsys, "waited_for_desc"), ctx.waitedFor) + reg.Register(metricName(subsys, "blocked_seconds_desc"), ctx.blockedSeconds) + reg.Register(metricName(subsys, "closed_max_idle_desc"), ctx.closedMaxIdle) + reg.Register(metricName(subsys, "closed_max_lifetime_desc"), ctx.closedMaxLifetime) + return ctx +} + +func (met *dbMetrics) Update(stats sql.DBStats) { + met.maxOpen.Update(int64(stats.MaxOpenConnections)) + met.open.Update(int64(stats.OpenConnections)) + met.inUse.Update(int64(stats.InUse)) + met.idle.Update(int64(stats.Idle)) + met.waitedFor.Inc(int64(stats.WaitCount)) + met.blockedSeconds.Inc(int64(stats.WaitDuration.Seconds())) + met.closedMaxIdle.Inc(int64(stats.MaxIdleClosed)) + met.closedMaxLifetime.Inc(int64(stats.MaxLifetimeClosed)) +} diff --git a/statediff/indexer/prom/db_stats_collector.go b/statediff/indexer/prom/db_stats_collector.go deleted file mode 100644 index 3bab65730..000000000 --- a/statediff/indexer/prom/db_stats_collector.go +++ /dev/null @@ -1,146 +0,0 @@ -package prom - -import ( - "database/sql" - - "github.com/prometheus/client_golang/prometheus" -) - -const ( - namespace = "ipld_eth_indexer" - subsystem = "connections" -) - -// DBStatsGetter is an interface that gets sql.DBStats. -type DBStatsGetter interface { - Stats() sql.DBStats -} - -// DBStatsCollector implements the prometheus.Collector interface. -type DBStatsCollector struct { - sg DBStatsGetter - - // descriptions of exported metrics - maxOpenDesc *prometheus.Desc - openDesc *prometheus.Desc - inUseDesc *prometheus.Desc - idleDesc *prometheus.Desc - waitedForDesc *prometheus.Desc - blockedSecondsDesc *prometheus.Desc - closedMaxIdleDesc *prometheus.Desc - closedMaxLifetimeDesc *prometheus.Desc -} - -// NewDBStatsCollector creates a new DBStatsCollector. -func NewDBStatsCollector(dbName string, sg DBStatsGetter) *DBStatsCollector { - labels := prometheus.Labels{"db_name": dbName} - return &DBStatsCollector{ - sg: sg, - maxOpenDesc: prometheus.NewDesc( - prometheus.BuildFQName(namespace, subsystem, "max_open"), - "Maximum number of open connections to the database.", - nil, - labels, - ), - openDesc: prometheus.NewDesc( - prometheus.BuildFQName(namespace, subsystem, "open"), - "The number of established connections both in use and idle.", - nil, - labels, - ), - inUseDesc: prometheus.NewDesc( - prometheus.BuildFQName(namespace, subsystem, "in_use"), - "The number of connections currently in use.", - nil, - labels, - ), - idleDesc: prometheus.NewDesc( - prometheus.BuildFQName(namespace, subsystem, "idle"), - "The number of idle connections.", - nil, - labels, - ), - waitedForDesc: prometheus.NewDesc( - prometheus.BuildFQName(namespace, subsystem, "waited_for"), - "The total number of connections waited for.", - nil, - labels, - ), - blockedSecondsDesc: prometheus.NewDesc( - prometheus.BuildFQName(namespace, subsystem, "blocked_seconds"), - "The total time blocked waiting for a new connection.", - nil, - labels, - ), - closedMaxIdleDesc: prometheus.NewDesc( - prometheus.BuildFQName(namespace, subsystem, "closed_max_idle"), - "The total number of connections closed due to SetMaxIdleConns.", - nil, - labels, - ), - closedMaxLifetimeDesc: prometheus.NewDesc( - prometheus.BuildFQName(namespace, subsystem, "closed_max_lifetime"), - "The total number of connections closed due to SetConnMaxLifetime.", - nil, - labels, - ), - } -} - -// Describe implements the prometheus.Collector interface. -func (c DBStatsCollector) Describe(ch chan<- *prometheus.Desc) { - ch <- c.maxOpenDesc - ch <- c.openDesc - ch <- c.inUseDesc - ch <- c.idleDesc - ch <- c.waitedForDesc - ch <- c.blockedSecondsDesc - ch <- c.closedMaxIdleDesc - ch <- c.closedMaxLifetimeDesc -} - -// Collect implements the prometheus.Collector interface. -func (c DBStatsCollector) Collect(ch chan<- prometheus.Metric) { - stats := c.sg.Stats() - - ch <- prometheus.MustNewConstMetric( - c.maxOpenDesc, - prometheus.GaugeValue, - float64(stats.MaxOpenConnections), - ) - ch <- prometheus.MustNewConstMetric( - c.openDesc, - prometheus.GaugeValue, - float64(stats.OpenConnections), - ) - ch <- prometheus.MustNewConstMetric( - c.inUseDesc, - prometheus.GaugeValue, - float64(stats.InUse), - ) - ch <- prometheus.MustNewConstMetric( - c.idleDesc, - prometheus.GaugeValue, - float64(stats.Idle), - ) - ch <- prometheus.MustNewConstMetric( - c.waitedForDesc, - prometheus.CounterValue, - float64(stats.WaitCount), - ) - ch <- prometheus.MustNewConstMetric( - c.blockedSecondsDesc, - prometheus.CounterValue, - stats.WaitDuration.Seconds(), - ) - ch <- prometheus.MustNewConstMetric( - c.closedMaxIdleDesc, - prometheus.CounterValue, - float64(stats.MaxIdleClosed), - ) - ch <- prometheus.MustNewConstMetric( - c.closedMaxLifetimeDesc, - prometheus.CounterValue, - float64(stats.MaxLifetimeClosed), - ) -} diff --git a/statediff/indexer/prom/prom.go b/statediff/indexer/prom/prom.go deleted file mode 100644 index 2b9436462..000000000 --- a/statediff/indexer/prom/prom.go +++ /dev/null @@ -1,151 +0,0 @@ -package prom - -import ( - "time" - - "github.com/jmoiron/sqlx" - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promauto" -) - -const statsSubsystem = "stats" - -var ( - metrics bool - - receipts prometheus.Counter - transactions prometheus.Counter - blocks prometheus.Counter - - lenPayloadChan prometheus.Gauge - - tPayloadDecode prometheus.Histogram - tFreePostgres prometheus.Histogram - tPostgresCommit prometheus.Histogram - tHeaderProcessing prometheus.Histogram - tUncleProcessing prometheus.Histogram - tTxAndRecProcessing prometheus.Histogram - tStateAndStoreProcessing prometheus.Histogram - tCodeAndCodeHashProcessing prometheus.Histogram -) - -// Init module initialization -func Init() { - metrics = true - - blocks = promauto.NewCounter(prometheus.CounterOpts{ - Namespace: namespace, - Name: "blocks", - Help: "The total number of processed blocks", - }) - transactions = promauto.NewCounter(prometheus.CounterOpts{ - Namespace: namespace, - Name: "transactions", - Help: "The total number of processed transactions", - }) - receipts = promauto.NewCounter(prometheus.CounterOpts{ - Namespace: namespace, - Name: "receipts", - Help: "The total number of processed receipts", - }) - - lenPayloadChan = promauto.NewGauge(prometheus.GaugeOpts{ - Namespace: namespace, - Name: "len_payload_chan", - Help: "Current length of publishPayload", - }) - - tFreePostgres = promauto.NewHistogram(prometheus.HistogramOpts{ - Namespace: namespace, - Subsystem: statsSubsystem, - Name: "t_free_postgres", - Help: "Time spent waiting for free postgres tx", - }) - tPostgresCommit = promauto.NewHistogram(prometheus.HistogramOpts{ - Namespace: namespace, - Subsystem: statsSubsystem, - Name: "t_postgres_commit", - Help: "Postgres transaction commit duration", - }) - tHeaderProcessing = promauto.NewHistogram(prometheus.HistogramOpts{ - Namespace: namespace, - Subsystem: statsSubsystem, - Name: "t_header_processing", - Help: "Header processing time", - }) - tUncleProcessing = promauto.NewHistogram(prometheus.HistogramOpts{ - Namespace: namespace, - Subsystem: statsSubsystem, - Name: "t_uncle_processing", - Help: "Uncle processing time", - }) - tTxAndRecProcessing = promauto.NewHistogram(prometheus.HistogramOpts{ - Namespace: namespace, - Subsystem: statsSubsystem, - Name: "t_tx_receipt_processing", - Help: "Tx and receipt processing time", - }) - tStateAndStoreProcessing = promauto.NewHistogram(prometheus.HistogramOpts{ - Namespace: namespace, - Subsystem: statsSubsystem, - Name: "t_state_store_code_processing", - Help: "State, storage, and code combinedprocessing time", - }) -} - -// RegisterDBCollector create metric colletor for given connection -func RegisterDBCollector(name string, db *sqlx.DB) { - if metrics { - prometheus.Register(NewDBStatsCollector(name, db)) - } -} - -// BlockInc block counter increment -func BlockInc() { - if metrics { - blocks.Inc() - } -} - -// TransactionInc transaction counter increment -func TransactionInc() { - if metrics { - transactions.Inc() - } -} - -// ReceiptInc receipt counter increment -func ReceiptInc() { - if metrics { - receipts.Inc() - } -} - -// SetLenPayloadChan set chan length -func SetLenPayloadChan(ln int) { - if metrics { - lenPayloadChan.Set(float64(ln)) - } -} - -// SetTimeMetric time metric observation -func SetTimeMetric(name string, t time.Duration) { - if !metrics { - return - } - tAsF64 := t.Seconds() - switch name { - case "t_free_postgres": - tFreePostgres.Observe(tAsF64) - case "t_postgres_commit": - tPostgresCommit.Observe(tAsF64) - case "t_header_processing": - tHeaderProcessing.Observe(tAsF64) - case "t_uncle_processing": - tUncleProcessing.Observe(tAsF64) - case "t_tx_receipt_processing": - tTxAndRecProcessing.Observe(tAsF64) - case "t_state_store_code_processing": - tStateAndStoreProcessing.Observe(tAsF64) - } -} diff --git a/statediff/indexer/writer.go b/statediff/indexer/writer.go index 9fc4af2b1..3aa955c3e 100644 --- a/statediff/indexer/writer.go +++ b/statediff/indexer/writer.go @@ -22,7 +22,6 @@ import ( "github.com/ethereum/go-ethereum/statediff/indexer/models" "github.com/ethereum/go-ethereum/statediff/indexer/postgres" - "github.com/ethereum/go-ethereum/statediff/indexer/prom" "github.com/ethereum/go-ethereum/statediff/indexer/shared" ) @@ -51,7 +50,7 @@ func (in *PostgresCIDWriter) upsertHeaderCID(tx *sqlx.Tx, header models.HeaderMo header.BlockNumber, header.BlockHash, header.ParentHash, header.CID, header.TotalDifficulty, in.db.NodeID, header.Reward, header.StateRoot, header.TxRoot, header.RctRoot, header.UncleRoot, header.Bloom, header.Timestamp, header.MhKey, 1).Scan(&headerID) if err == nil { - prom.BlockInc() + indexerMetrics.blocks.Inc(1) } return headerID, err } @@ -73,7 +72,7 @@ func (in *PostgresCIDWriter) upsertTransactionAndReceiptCIDs(tx *sqlx.Tx, payloa if err != nil { return err } - prom.TransactionInc() + indexerMetrics.transactions.Inc(1) receiptCidMeta, ok := payload.ReceiptCIDs[common.HexToHash(trxCidMeta.TxHash)] if ok { if err := in.upsertReceiptCID(tx, receiptCidMeta, txID); err != nil { @@ -91,7 +90,7 @@ func (in *PostgresCIDWriter) upsertTransactionCID(tx *sqlx.Tx, transaction model RETURNING id`, headerID, transaction.TxHash, transaction.CID, transaction.Dst, transaction.Src, transaction.Index, transaction.MhKey, transaction.Data).Scan(&txID) if err == nil { - prom.TransactionInc() + indexerMetrics.transactions.Inc(1) } return txID, err } @@ -101,7 +100,7 @@ func (in *PostgresCIDWriter) upsertReceiptCID(tx *sqlx.Tx, rct models.ReceiptMod ON CONFLICT (tx_id) DO UPDATE SET (cid, contract, contract_hash, topic0s, topic1s, topic2s, topic3s, log_contracts, mh_key) = ($2, $3, $4, $5, $6, $7, $8, $9, $10)`, txID, rct.CID, rct.Contract, rct.ContractHash, rct.Topic0s, rct.Topic1s, rct.Topic2s, rct.Topic3s, rct.LogContracts, rct.MhKey) if err == nil { - prom.ReceiptInc() + indexerMetrics.receipts.Inc(1) } return err } diff --git a/statediff/service.go b/statediff/service.go index 0c2150afd..4bda6a785 100644 --- a/statediff/service.go +++ b/statediff/service.go @@ -40,7 +40,6 @@ import ( ind "github.com/ethereum/go-ethereum/statediff/indexer" nodeinfo "github.com/ethereum/go-ethereum/statediff/indexer/node" "github.com/ethereum/go-ethereum/statediff/indexer/postgres" - "github.com/ethereum/go-ethereum/statediff/indexer/prom" . "github.com/ethereum/go-ethereum/statediff/types" ) @@ -139,7 +138,6 @@ func New(stack *node.Node, ethServ *eth.Ethereum, dbParams *DBParams, enableWrit } indexer = ind.NewStateDiffIndexer(blockChain.Config(), db) } - prom.Init() sds := &Service{ Mutex: sync.Mutex{}, BlockChain: blockChain, -- 2.45.2 From 4630c0c4a64833bd9bdaf5dc31c747f7bcfa79a7 Mon Sep 17 00:00:00 2001 From: ramil Date: Wed, 18 Nov 2020 20:01:37 +0300 Subject: [PATCH 02/16] get preimages in statediff service --- statediff/builder.go | 68 +++++++++++++++++--------- statediff/indexer/indexer.go | 14 ++++++ statediff/service.go | 5 +- statediff/testhelpers/mocks/builder.go | 2 +- statediff/types.go | 9 ++-- statediff/types/types.go | 7 +++ 6 files changed, 77 insertions(+), 28 deletions(-) diff --git a/statediff/builder.go b/statediff/builder.go index c9757b794..ab8811a81 100644 --- a/statediff/builder.go +++ b/statediff/builder.go @@ -22,6 +22,7 @@ package statediff import ( "bytes" "fmt" + "github.com/ethereum/go-ethereum/core/rawdb" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/state" @@ -44,7 +45,7 @@ var ( type Builder interface { BuildStateDiffObject(args Args, params Params) (StateObject, error) BuildStateTrieObject(current *types.Block) (StateObject, error) - WriteStateDiffObject(args StateRoots, params Params, output StateNodeSink, codeOutput CodeSink) error + WriteStateDiffObject(args StateRoots, params Params, output StateNodeSink, codeOutput CodeSink, addressOutput AddressSink) error } type builder struct { @@ -92,6 +93,12 @@ func codeMappingAppender(codeAndCodeHashes *[]CodeAndCodeHash) CodeSink { return nil } } +func addressMappingAppender(addressAndCodeHashes *[]AddressAndAddressHash) AddressSink { + return func(a AddressAndAddressHash) error { + *addressAndCodeHashes = append(*addressAndCodeHashes, a) + return nil + } +} // NewBuilder is used to create a statediff builder func NewBuilder(stateCache state.Database) Builder { @@ -107,21 +114,23 @@ func (sdb *builder) BuildStateTrieObject(current *types.Block) (StateObject, err return StateObject{}, fmt.Errorf("error creating trie for block %d: %v", current.Number(), err) } it := currentTrie.NodeIterator([]byte{}) - stateNodes, codeAndCodeHashes, err := sdb.buildStateTrie(it) + stateNodes, codeAndCodeHashes, addressAndAddressHashes, err := sdb.buildStateTrie(it) if err != nil { return StateObject{}, fmt.Errorf("error collecting state nodes for block %d: %v", current.Number(), err) } return StateObject{ - BlockNumber: current.Number(), - BlockHash: current.Hash(), - Nodes: stateNodes, - CodeAndCodeHashes: codeAndCodeHashes, + BlockNumber: current.Number(), + BlockHash: current.Hash(), + Nodes: stateNodes, + CodeAndCodeHashes: codeAndCodeHashes, + AddressAndAddressHashes: addressAndAddressHashes, }, nil } -func (sdb *builder) buildStateTrie(it trie.NodeIterator) ([]StateNode, []CodeAndCodeHash, error) { +func (sdb *builder) buildStateTrie(it trie.NodeIterator) ([]StateNode, []CodeAndCodeHash, []AddressAndAddressHash, error) { stateNodes := make([]StateNode, 0) codeAndCodeHashes := make([]CodeAndCodeHash, 0) + addressAndAddressHashes := make([]AddressAndAddressHash, 0) for it.Next(true) { // skip value nodes if it.Leaf() || bytes.Equal(nullHashBytes, it.Hash().Bytes()) { @@ -129,31 +138,32 @@ func (sdb *builder) buildStateTrie(it trie.NodeIterator) ([]StateNode, []CodeAnd } node, nodeElements, err := resolveNode(it, sdb.stateCache.TrieDB()) if err != nil { - return nil, nil, err + return nil, nil, nil, err } switch node.NodeType { case Leaf: var account state.Account if err := rlp.DecodeBytes(nodeElements[1].([]byte), &account); err != nil { - return nil, nil, fmt.Errorf("error decoding account for leaf node at path %x nerror: %v", node.Path, err) + return nil, nil, nil, fmt.Errorf("error decoding account for leaf node at path %x nerror: %v", node.Path, err) } partialPath := trie.CompactToHex(nodeElements[0].([]byte)) valueNodePath := append(node.Path, partialPath...) encodedPath := trie.HexToCompact(valueNodePath) leafKey := encodedPath[1:] node.LeafKey = leafKey + // derive code by code hash if !bytes.Equal(account.CodeHash, nullCodeHash) { var storageNodes []StorageNode err := sdb.buildStorageNodesEventual(account.Root, nil, true, storageNodeAppender(&storageNodes)) if err != nil { - return nil, nil, fmt.Errorf("failed building eventual storage diffs for account %+v\r\nerror: %v", account, err) + return nil, nil, nil, fmt.Errorf("failed building eventual storage diffs for account %+v\r\nerror: %v", account, err) } node.StorageNodes = storageNodes // emit codehash => code mappings for cod codeHash := common.BytesToHash(account.CodeHash) code, err := sdb.stateCache.ContractCode(common.Hash{}, codeHash) if err != nil { - return nil, nil, fmt.Errorf("failed to retrieve code for codehash %s\r\n error: %v", codeHash.String(), err) + return nil, nil, nil, fmt.Errorf("failed to retrieve code for codehash %s\r\n error: %v", codeHash.String(), err) } codeAndCodeHashes = append(codeAndCodeHashes, CodeAndCodeHash{ Hash: codeHash, @@ -164,19 +174,20 @@ func (sdb *builder) buildStateTrie(it trie.NodeIterator) ([]StateNode, []CodeAnd case Extension, Branch: stateNodes = append(stateNodes, node) default: - return nil, nil, fmt.Errorf("unexpected node type %s", node.NodeType) + return nil, nil, nil, fmt.Errorf("unexpected node type %s", node.NodeType) } } - return stateNodes, codeAndCodeHashes, it.Error() + return stateNodes, codeAndCodeHashes, addressAndAddressHashes, it.Error() } // BuildStateDiffObject builds a statediff object from two blocks and the provided parameters func (sdb *builder) BuildStateDiffObject(args Args, params Params) (StateObject, error) { var stateNodes []StateNode var codeAndCodeHashes []CodeAndCodeHash + var addressAndCodeHashes []AddressAndAddressHash err := sdb.WriteStateDiffObject( StateRoots{OldStateRoot: args.OldStateRoot, NewStateRoot: args.NewStateRoot}, - params, stateNodeAppender(&stateNodes), codeMappingAppender(&codeAndCodeHashes)) + params, stateNodeAppender(&stateNodes), codeMappingAppender(&codeAndCodeHashes), addressMappingAppender(&addressAndCodeHashes)) if err != nil { return StateObject{}, err } @@ -189,16 +200,16 @@ func (sdb *builder) BuildStateDiffObject(args Args, params Params) (StateObject, } // Writes a statediff object to output callback -func (sdb *builder) WriteStateDiffObject(args StateRoots, params Params, output StateNodeSink, codeOutput CodeSink) error { +func (sdb *builder) WriteStateDiffObject(args StateRoots, params Params, output StateNodeSink, codeOutput CodeSink, addressOutput AddressSink) error { if !params.IntermediateStateNodes || len(params.WatchedAddresses) > 0 { // if we are watching only specific accounts then we are only diffing leaf nodes - return sdb.buildStateDiffWithoutIntermediateStateNodes(args, params, output, codeOutput) + return sdb.buildStateDiffWithoutIntermediateStateNodes(args, params, output, codeOutput, addressOutput) } else { - return sdb.buildStateDiffWithIntermediateStateNodes(args, params, output, codeOutput) + return sdb.buildStateDiffWithIntermediateStateNodes(args, params, output, codeOutput, addressOutput) } } -func (sdb *builder) buildStateDiffWithIntermediateStateNodes(args StateRoots, params Params, output StateNodeSink, codeOutput CodeSink) error { +func (sdb *builder) buildStateDiffWithIntermediateStateNodes(args StateRoots, params Params, output StateNodeSink, codeOutput CodeSink, addressOutput AddressSink) error { // Load tries for old and new states oldTrie, err := sdb.stateCache.OpenTrie(args.OldStateRoot) if err != nil { @@ -246,14 +257,14 @@ func (sdb *builder) buildStateDiffWithIntermediateStateNodes(args StateRoots, pa return fmt.Errorf("error building diff for updated accounts: %v", err) } // build the diff nodes for created accounts - err = sdb.buildAccountCreations(diffAccountsAtB, params.WatchedStorageSlots, params.IntermediateStorageNodes, output, codeOutput) + err = sdb.buildAccountCreations(diffAccountsAtB, params.WatchedStorageSlots, params.IntermediateStorageNodes, output, codeOutput, addressOutput) if err != nil { return fmt.Errorf("error building diff for created accounts: %v", err) } return nil } -func (sdb *builder) buildStateDiffWithoutIntermediateStateNodes(args StateRoots, params Params, output StateNodeSink, codeOutput CodeSink) error { +func (sdb *builder) buildStateDiffWithoutIntermediateStateNodes(args StateRoots, params Params, output StateNodeSink, codeOutput CodeSink, addressOutput AddressSink) error { // Load tries for old (A) and new (B) states oldTrie, err := sdb.stateCache.OpenTrie(args.OldStateRoot) if err != nil { @@ -300,7 +311,7 @@ func (sdb *builder) buildStateDiffWithoutIntermediateStateNodes(args StateRoots, return fmt.Errorf("error building diff for updated accounts: %v", err) } // build the diff nodes for created accounts - err = sdb.buildAccountCreations(diffAccountsAtB, params.WatchedStorageSlots, params.IntermediateStorageNodes, output, codeOutput) + err = sdb.buildAccountCreations(diffAccountsAtB, params.WatchedStorageSlots, params.IntermediateStorageNodes, output, codeOutput, addressOutput) if err != nil { return fmt.Errorf("error building diff for created accounts: %v", err) } @@ -497,7 +508,7 @@ func (sdb *builder) buildAccountUpdates(creations, deletions AccountMap, updated // buildAccountCreations returns the statediff node objects for all the accounts that exist at B but not at A // it also returns the code and codehash for created contract accounts -func (sdb *builder) buildAccountCreations(accounts AccountMap, watchedStorageKeys []common.Hash, intermediateStorageNodes bool, output StateNodeSink, codeOutput CodeSink) error { +func (sdb *builder) buildAccountCreations(accounts AccountMap, watchedStorageKeys []common.Hash, intermediateStorageNodes bool, output StateNodeSink, codeOutput CodeSink, addressOutput AddressSink) error { for _, val := range accounts { diff := StateNode{ NodeType: val.NodeType, @@ -526,6 +537,19 @@ func (sdb *builder) buildAccountCreations(accounts AccountMap, watchedStorageKey return err } } + // get address by hash + addressHash := common.BytesToHash(val.LeafKey) + address := rawdb.ReadPreimage(sdb.stateCache.TrieDB().DiskDB(), addressHash) + + log.Info("address hash %s, address %s", addressHash.String(), address) + + if err := addressOutput(AddressAndAddressHash{ + Hash: addressHash, + Address: common.BytesToAddress(address), + }); err != nil { + return err + } + if err := output(diff); err != nil { return err } diff --git a/statediff/indexer/indexer.go b/statediff/indexer/indexer.go index 9a516725b..3515bc1fb 100644 --- a/statediff/indexer/indexer.go +++ b/statediff/indexer/indexer.go @@ -46,6 +46,7 @@ type Indexer interface { PushBlock(block *types.Block, receipts types.Receipts, totalDifficulty *big.Int) (*BlockTx, error) PushStateNode(tx *BlockTx, stateNode sdtypes.StateNode) error PushCodeAndCodeHash(tx *BlockTx, codeAndCodeHash sdtypes.CodeAndCodeHash) error + PushAddressAndAddressHash(tx *BlockTx, addressAndCodeHash sdtypes.AddressAndAddressHash) error } // StateDiffIndexer satisfies the Indexer interface for ethereum statediff objects @@ -393,3 +394,16 @@ func (sdi *StateDiffIndexer) PushCodeAndCodeHash(tx *BlockTx, codeAndCodeHash sd } return nil } + +// Publishes address and address hash pairs to the ipld database +func (sdi *StateDiffIndexer) PushAddressAndAddressHash(tx *BlockTx, addressAndCodeHash sdtypes.AddressAndAddressHash) error { + // codec doesn't matter since db key is multihash-based + mhKey, err := shared.MultihashKeyFromKeccak256(addressAndCodeHash.Hash) + if err != nil { + return err + } + if err := shared.PublishDirect(tx.dbtx, mhKey, addressAndCodeHash.Address.Bytes()); err != nil { + return err + } + return nil +} diff --git a/statediff/service.go b/statediff/service.go index 0c2150afd..f250d7c54 100644 --- a/statediff/service.go +++ b/statediff/service.go @@ -542,10 +542,13 @@ func (sds *Service) writeStateDiff(block *types.Block, parentRoot common.Hash, p codeOutput := func(c CodeAndCodeHash) error { return sds.indexer.PushCodeAndCodeHash(tx, c) } + addressOutput := func(a AddressAndAddressHash) error { + return sds.indexer.PushAddressAndAddressHash(tx, a) + } err = sds.Builder.WriteStateDiffObject(StateRoots{ NewStateRoot: block.Root(), OldStateRoot: parentRoot, - }, params, output, codeOutput) + }, params, output, codeOutput, addressOutput) // allow dereferencing of parent, keep current locked as it should be the next parent sds.BlockChain.UnlockTrie(parentRoot) diff --git a/statediff/testhelpers/mocks/builder.go b/statediff/testhelpers/mocks/builder.go index ff9faf3ec..8aa2764aa 100644 --- a/statediff/testhelpers/mocks/builder.go +++ b/statediff/testhelpers/mocks/builder.go @@ -42,7 +42,7 @@ func (builder *Builder) BuildStateDiffObject(args statediff.Args, params statedi } // BuildStateDiffObject mock method -func (builder *Builder) WriteStateDiffObject(args statediff.StateRoots, params statediff.Params, output sdtypes.StateNodeSink, codeOutput sdtypes.CodeSink) error { +func (builder *Builder) WriteStateDiffObject(args statediff.StateRoots, params statediff.Params, output sdtypes.StateNodeSink, codeOutput sdtypes.CodeSink, addressOutput sdtypes.AddressSink) error { builder.StateRoots = args builder.Params = params diff --git a/statediff/types.go b/statediff/types.go index 148567dd7..be50ef5a7 100644 --- a/statediff/types.go +++ b/statediff/types.go @@ -94,10 +94,11 @@ func (sd *Payload) Encode() ([]byte, error) { // StateObject is the final output structure from the builder type StateObject struct { - BlockNumber *big.Int `json:"blockNumber" gencodec:"required"` - BlockHash common.Hash `json:"blockHash" gencodec:"required"` - Nodes []types.StateNode `json:"nodes" gencodec:"required"` - CodeAndCodeHashes []types.CodeAndCodeHash `json:"codeMapping"` + BlockNumber *big.Int `json:"blockNumber" gencodec:"required"` + BlockHash common.Hash `json:"blockHash" gencodec:"required"` + Nodes []types.StateNode `json:"nodes" gencodec:"required"` + CodeAndCodeHashes []types.CodeAndCodeHash `json:"codeMapping"` + AddressAndAddressHashes []types.AddressAndAddressHash `json:"addressMapping"` } // AccountMap is a mapping of hex encoded path => account wrapper diff --git a/statediff/types/types.go b/statediff/types/types.go index 08e2124fa..fa8d5f862 100644 --- a/statediff/types/types.go +++ b/statediff/types/types.go @@ -56,6 +56,13 @@ type CodeAndCodeHash struct { Code []byte `json:"code"` } +// AddressAndAddressHash struct for holding keccak256(address) => address mapping +type AddressAndAddressHash struct { + Hash common.Hash `json:"addressHash"` + Address common.Address `json:"address"` +} + type StateNodeSink func(StateNode) error type StorageNodeSink func(StorageNode) error type CodeSink func(CodeAndCodeHash) error +type AddressSink func(AddressAndAddressHash) error -- 2.45.2 From 0b70abc395aec9b37927f03976daab39ead3a9d0 Mon Sep 17 00:00:00 2001 From: Roy Crihfield Date: Thu, 19 Nov 2020 23:46:21 +0800 Subject: [PATCH 03/16] cleanup --- statediff/indexer/metrics.go | 8 ++++---- statediff/service.go | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/statediff/indexer/metrics.go b/statediff/indexer/metrics.go index 9da1489e8..f86f96a5c 100644 --- a/statediff/indexer/metrics.go +++ b/statediff/indexer/metrics.go @@ -2,9 +2,9 @@ package indexer import ( "database/sql" + "strings" "github.com/ethereum/go-ethereum/metrics" - "strings" ) const ( @@ -24,7 +24,7 @@ func metricName(subsystem, name string) string { return strings.Join(parts, "/") } -type indexerMetricsContext struct { +type indexerMetricsHandles struct { // The total number of processed blocks blocks metrics.Counter // The total number of processed transactions @@ -45,8 +45,8 @@ type indexerMetricsContext struct { tStateStoreCodeProcessing metrics.Timer } -func RegisterIndexerMetrics(reg metrics.Registry) indexerMetricsContext { - ctx := indexerMetricsContext{ +func RegisterIndexerMetrics(reg metrics.Registry) indexerMetricsHandles { + ctx := indexerMetricsHandles{ blocks: metrics.NewCounter(), transactions: metrics.NewCounter(), receipts: metrics.NewCounter(), diff --git a/statediff/service.go b/statediff/service.go index 4bda6a785..fb17edcd0 100644 --- a/statediff/service.go +++ b/statediff/service.go @@ -519,7 +519,7 @@ func (sds *Service) WriteStateDiffAt(blockNumber uint64, params Params) error { // Writes a state diff from the current block, parent state root, and provided params func (sds *Service) writeStateDiff(block *types.Block, parentRoot common.Hash, params Params) error { - log.Info("writing state diff", "block height", block.Number().Uint64()) + log.Info("Writing state diff", "block height", block.Number().Uint64()) var totalDifficulty *big.Int var receipts types.Receipts if params.IncludeTD { -- 2.45.2 From 300dae68e5ace7068d387f55394e3cd874a721c6 Mon Sep 17 00:00:00 2001 From: Roy Crihfield Date: Fri, 20 Nov 2020 17:14:11 +0800 Subject: [PATCH 04/16] "indexer" namespace for metrics --- statediff/indexer/metrics.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/statediff/indexer/metrics.go b/statediff/indexer/metrics.go index f86f96a5c..71ea18845 100644 --- a/statediff/indexer/metrics.go +++ b/statediff/indexer/metrics.go @@ -8,7 +8,7 @@ import ( ) const ( - indexerNamespace = "ipld_eth_indexer" // FIXME: "statediff"? + indexerNamespace = "indexer" ) // Build a fully qualified metric name -- 2.45.2 From da93bdc0a30f4f595a0d2df079925a1ef91e53b0 Mon Sep 17 00:00:00 2001 From: Roy Crihfield Date: Fri, 20 Nov 2020 19:42:46 +0800 Subject: [PATCH 05/16] add reporting loop for db metrics --- statediff/indexer/indexer.go | 25 +++++++++++++++++++- statediff/indexer/metrics.go | 44 ++++++++++++++++++------------------ statediff/service.go | 4 +++- 3 files changed, 49 insertions(+), 24 deletions(-) diff --git a/statediff/indexer/indexer.go b/statediff/indexer/indexer.go index 9bf539427..c6d6f54a5 100644 --- a/statediff/indexer/indexer.go +++ b/statediff/indexer/indexer.go @@ -41,13 +41,17 @@ import ( sdtypes "github.com/ethereum/go-ethereum/statediff/types" ) -var indexerMetrics = RegisterIndexerMetrics(metrics.DefaultRegistry) +var ( + indexerMetrics = RegisterIndexerMetrics(metrics.DefaultRegistry) + dbMetrics = RegisterDBMetrics(metrics.DefaultRegistry) +) // Indexer interface to allow substitution of mocks for testing type Indexer interface { PushBlock(block *types.Block, receipts types.Receipts, totalDifficulty *big.Int) (*BlockTx, error) PushStateNode(tx *BlockTx, stateNode sdtypes.StateNode) error PushCodeAndCodeHash(tx *BlockTx, codeAndCodeHash sdtypes.CodeAndCodeHash) error + ReportDBMetrics(delay time.Duration, quit <-chan bool) } // StateDiffIndexer satisfies the Indexer interface for ethereum statediff objects @@ -72,6 +76,25 @@ type BlockTx struct { Close func() error } +// Reporting function to run as goroutine +func (sdi *StateDiffIndexer) ReportDBMetrics(delay time.Duration, quit <-chan bool) { + if !metrics.Enabled { + return + } + ticker := time.NewTicker(delay) + go func() { + for { + select { + case <-ticker.C: + dbMetrics.Update(sdi.dbWriter.db.Stats()) + case <-quit: + ticker.Stop() + return + } + } + }() +} + // Pushes and indexes block data in database, except state & storage nodes (includes header, uncles, transactions & receipts) // Returns an initiated DB transaction which must be Closed via defer to commit or rollback func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receipts, totalDifficulty *big.Int) (*BlockTx, error) { diff --git a/statediff/indexer/metrics.go b/statediff/indexer/metrics.go index 71ea18845..5ee3426a3 100644 --- a/statediff/indexer/metrics.go +++ b/statediff/indexer/metrics.go @@ -70,7 +70,7 @@ func RegisterIndexerMetrics(reg metrics.Registry) indexerMetricsHandles { return ctx } -type dbMetrics struct { +type dbMetricsHandles struct { // Maximum number of open connections to the database maxOpen metrics.Gauge // The number of established connections both in use and idle @@ -82,43 +82,43 @@ type dbMetrics struct { // The total number of connections waited for waitedFor metrics.Counter // The total time blocked waiting for a new connection - blockedSeconds metrics.Counter + blockedMilliseconds metrics.Counter // The total number of connections closed due to SetMaxIdleConns closedMaxIdle metrics.Counter // The total number of connections closed due to SetConnMaxLifetime closedMaxLifetime metrics.Counter } -func RegisterDBMetrics(reg metrics.Registry) dbMetrics { - ctx := dbMetrics{ - maxOpen: metrics.NewGauge(), - open: metrics.NewGauge(), - inUse: metrics.NewGauge(), - idle: metrics.NewGauge(), - waitedFor: metrics.NewCounter(), - blockedSeconds: metrics.NewCounter(), - closedMaxIdle: metrics.NewCounter(), - closedMaxLifetime: metrics.NewCounter(), +func RegisterDBMetrics(reg metrics.Registry) dbMetricsHandles { + ctx := dbMetricsHandles{ + maxOpen: metrics.NewGauge(), + open: metrics.NewGauge(), + inUse: metrics.NewGauge(), + idle: metrics.NewGauge(), + waitedFor: metrics.NewCounter(), + blockedMilliseconds: metrics.NewCounter(), + closedMaxIdle: metrics.NewCounter(), + closedMaxLifetime: metrics.NewCounter(), } subsys := "connections" - reg.Register(metricName(subsys, "max_open_desc"), ctx.maxOpen) - reg.Register(metricName(subsys, "open_desc"), ctx.open) - reg.Register(metricName(subsys, "in_use_desc"), ctx.inUse) - reg.Register(metricName(subsys, "idle_desc"), ctx.idle) - reg.Register(metricName(subsys, "waited_for_desc"), ctx.waitedFor) - reg.Register(metricName(subsys, "blocked_seconds_desc"), ctx.blockedSeconds) - reg.Register(metricName(subsys, "closed_max_idle_desc"), ctx.closedMaxIdle) - reg.Register(metricName(subsys, "closed_max_lifetime_desc"), ctx.closedMaxLifetime) + reg.Register(metricName(subsys, "max_open"), ctx.maxOpen) + reg.Register(metricName(subsys, "open"), ctx.open) + reg.Register(metricName(subsys, "in_use"), ctx.inUse) + reg.Register(metricName(subsys, "idle"), ctx.idle) + reg.Register(metricName(subsys, "waited_for"), ctx.waitedFor) + reg.Register(metricName(subsys, "blocked_milliseconds"), ctx.blockedMilliseconds) + reg.Register(metricName(subsys, "closed_max_idle"), ctx.closedMaxIdle) + reg.Register(metricName(subsys, "closed_max_lifetime"), ctx.closedMaxLifetime) return ctx } -func (met *dbMetrics) Update(stats sql.DBStats) { +func (met *dbMetricsHandles) Update(stats sql.DBStats) { met.maxOpen.Update(int64(stats.MaxOpenConnections)) met.open.Update(int64(stats.OpenConnections)) met.inUse.Update(int64(stats.InUse)) met.idle.Update(int64(stats.Idle)) met.waitedFor.Inc(int64(stats.WaitCount)) - met.blockedSeconds.Inc(int64(stats.WaitDuration.Seconds())) + met.blockedMilliseconds.Inc(int64(stats.WaitDuration.Milliseconds())) met.closedMaxIdle.Inc(int64(stats.MaxIdleClosed)) met.closedMaxLifetime.Inc(int64(stats.MaxLifetimeClosed)) } diff --git a/statediff/service.go b/statediff/service.go index fb17edcd0..cee137bf5 100644 --- a/statediff/service.go +++ b/statediff/service.go @@ -22,6 +22,7 @@ import ( "strconv" "sync" "sync/atomic" + "time" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core" @@ -73,7 +74,7 @@ type IService interface { // Main event loop for processing state diffs Loop(chainEventCh chan core.ChainEvent) // Method to subscribe to receive state diff processing output - Subscribe(id rpc.ID, sub chan<- Payload, quitChanogr chan<- bool, params Params) + Subscribe(id rpc.ID, sub chan<- Payload, quitChan chan<- bool, params Params) // Method to unsubscribe from state diff processing Unsubscribe(id rpc.ID) error // Method to get state diff object at specific block @@ -414,6 +415,7 @@ func (sds *Service) Start() error { if sds.enableWriteLoop { log.Info("Starting statediff DB write loop", "params", writeLoopParams) go sds.WriteLoop(make(chan core.ChainEvent, chainEventChanSize)) + go sds.indexer.ReportDBMetrics(5*time.Second, sds.QuitChan) } return nil -- 2.45.2 From 75069685be5417579c7e2119dac6a68c28276843 Mon Sep 17 00:00:00 2001 From: Roy Crihfield Date: Fri, 20 Nov 2020 19:42:55 +0800 Subject: [PATCH 06/16] doc --- statediff/indexer/indexer.go | 2 ++ statediff/indexer/writer.go | 4 ++-- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/statediff/indexer/indexer.go b/statediff/indexer/indexer.go index c6d6f54a5..2a6bac3f2 100644 --- a/statediff/indexer/indexer.go +++ b/statediff/indexer/indexer.go @@ -14,6 +14,8 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . +// This package provides an interface for pushing and indexing IPLD objects into a Postgres database +// Metrics for reporting processing and connection stats are defined in ./metrics.go package indexer import ( diff --git a/statediff/indexer/writer.go b/statediff/indexer/writer.go index 3aa955c3e..24bef7ed3 100644 --- a/statediff/indexer/writer.go +++ b/statediff/indexer/writer.go @@ -17,9 +17,9 @@ package indexer import ( - "github.com/ethereum/go-ethereum/common" "github.com/jmoiron/sqlx" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/statediff/indexer/models" "github.com/ethereum/go-ethereum/statediff/indexer/postgres" "github.com/ethereum/go-ethereum/statediff/indexer/shared" @@ -29,7 +29,7 @@ var ( nullHash = common.HexToHash("0x0000000000000000000000000000000000000000000000000000000000000000") ) -// Indexer satisfies the Indexer interface for ethereum +// Handles processing and writing of indexed IPLD objects to Postgres type PostgresCIDWriter struct { db *postgres.DB } -- 2.45.2 From c1c41ef5301656ac3426e2b64b2699b04390aaa2 Mon Sep 17 00:00:00 2001 From: Roy Crihfield Date: Thu, 19 Nov 2020 23:47:48 +0800 Subject: [PATCH 07/16] metrics for statediff stats --- statediff/metrics.go | 54 ++++++++++++++++++++++++++++++++++++++++++++ statediff/service.go | 8 +++++++ 2 files changed, 62 insertions(+) create mode 100644 statediff/metrics.go diff --git a/statediff/metrics.go b/statediff/metrics.go new file mode 100644 index 000000000..d75c583a0 --- /dev/null +++ b/statediff/metrics.go @@ -0,0 +1,54 @@ +package statediff + +import ( + "strings" + + "github.com/ethereum/go-ethereum/metrics" +) + +const ( + namespace = "statediff" +) + +// Build a fully qualified metric name +func metricName(subsystem, name string) string { + if name == "" { + return "" + } + parts := []string{namespace, name} + if subsystem != "" { + parts = []string{namespace, subsystem, name} + } + // Prometheus uses _ but geth metrics uses / and replaces + return strings.Join(parts, "/") +} + +type statediffMetricsHandles struct { + // Height of latest synced by core.BlockChain + // FIXME + lastSyncHeight metrics.Gauge + // Height of the latest block received from chainEvent channel + lastEventHeight metrics.Gauge + // Height of latest state diff + lastStatediffHeight metrics.Gauge + // Current length of chainEvent channels + serviceLoopChannelLen metrics.Gauge + writeLoopChannelLen metrics.Gauge +} + +func RegisterStatediffMetrics(reg metrics.Registry) statediffMetricsHandles { + ctx := statediffMetricsHandles{ + lastSyncHeight: metrics.NewGauge(), + lastEventHeight: metrics.NewGauge(), + lastStatediffHeight: metrics.NewGauge(), + serviceLoopChannelLen: metrics.NewGauge(), + writeLoopChannelLen: metrics.NewGauge(), + } + subsys := "" // todo + reg.Register(metricName(subsys, "last_sync_height"), ctx.lastSyncHeight) + reg.Register(metricName(subsys, "last_event_height"), ctx.lastEventHeight) + reg.Register(metricName(subsys, "last_statediff_height"), ctx.lastStatediffHeight) + reg.Register(metricName(subsys, "service_loop_channel_len"), ctx.serviceLoopChannelLen) + reg.Register(metricName(subsys, "write_loop_channel_len"), ctx.writeLoopChannelLen) + return ctx +} diff --git a/statediff/service.go b/statediff/service.go index cee137bf5..c3f270bb4 100644 --- a/statediff/service.go +++ b/statediff/service.go @@ -32,6 +32,7 @@ import ( "github.com/ethereum/go-ethereum/eth" "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/node" "github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/rlp" @@ -55,6 +56,8 @@ var writeLoopParams = Params{ IncludeCode: true, } +var statediffMetrics = RegisterStatediffMetrics(metrics.DefaultRegistry) + type blockChain interface { SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription GetBlockByHash(hash common.Hash) *types.Block @@ -193,8 +196,10 @@ func (sds *Service) WriteLoop(chainEventCh chan core.ChainEvent) { select { //Notify chain event channel of events case chainEvent := <-chainEventCh: + statediffMetrics.writeLoopChannelLen.Update(int64(len(chainEventCh))) log.Debug("(WriteLoop) Event received from chainEventCh", "event", chainEvent) currentBlock := chainEvent.Block + statediffMetrics.lastEventHeight.Update(int64(currentBlock.Number().Uint64())) parentBlock := sds.lastBlock.replace(currentBlock, sds.BlockChain) if parentBlock == nil { log.Error("Parent block is nil, skipping this block", "block height", currentBlock.Number()) @@ -205,6 +210,8 @@ func (sds *Service) WriteLoop(chainEventCh chan core.ChainEvent) { log.Error("statediff (DB write) processing error", "block height", currentBlock.Number().Uint64(), "error", err.Error()) continue } + // TODO: how to handle with concurrent workers + statediffMetrics.lastStatediffHeight.Update(int64(currentBlock.Number().Uint64())) case err := <-errCh: log.Warn("Error from chain event subscription", "error", err) sds.close() @@ -226,6 +233,7 @@ func (sds *Service) Loop(chainEventCh chan core.ChainEvent) { select { //Notify chain event channel of events case chainEvent := <-chainEventCh: + statediffMetrics.serviceLoopChannelLen.Update(int64(len(chainEventCh))) log.Debug("Event received from chainEventCh", "event", chainEvent) // if we don't have any subscribers, do not process a statediff if atomic.LoadInt32(&sds.subscribers) == 0 { -- 2.45.2 From 72a47729bbd4b73b55a678d52668a7c19a8a0a50 Mon Sep 17 00:00:00 2001 From: Roy Crihfield Date: Tue, 24 Nov 2020 18:19:39 +0800 Subject: [PATCH 08/16] metrics namespace/subsystem = statediff/{indexer,service} --- statediff/indexer/metrics.go | 8 ++++---- statediff/metrics.go | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/statediff/indexer/metrics.go b/statediff/indexer/metrics.go index 5ee3426a3..fc0727eda 100644 --- a/statediff/indexer/metrics.go +++ b/statediff/indexer/metrics.go @@ -8,7 +8,7 @@ import ( ) const ( - indexerNamespace = "indexer" + namespace = "statediff" ) // Build a fully qualified metric name @@ -16,9 +16,9 @@ func metricName(subsystem, name string) string { if name == "" { return "" } - parts := []string{indexerNamespace, name} + parts := []string{namespace, name} if subsystem != "" { - parts = []string{indexerNamespace, subsystem, name} + parts = []string{namespace, subsystem, name} } // Prometheus uses _ but geth metrics uses / and replaces return strings.Join(parts, "/") @@ -57,7 +57,7 @@ func RegisterIndexerMetrics(reg metrics.Registry) indexerMetricsHandles { tTxAndRecProcessing: metrics.NewTimer(), tStateStoreCodeProcessing: metrics.NewTimer(), } - subsys := "" // todo + subsys := "indexer" reg.Register(metricName(subsys, "blocks"), ctx.blocks) reg.Register(metricName(subsys, "transactions"), ctx.transactions) reg.Register(metricName(subsys, "receipts"), ctx.receipts) diff --git a/statediff/metrics.go b/statediff/metrics.go index d75c583a0..7e7d6e328 100644 --- a/statediff/metrics.go +++ b/statediff/metrics.go @@ -44,7 +44,7 @@ func RegisterStatediffMetrics(reg metrics.Registry) statediffMetricsHandles { serviceLoopChannelLen: metrics.NewGauge(), writeLoopChannelLen: metrics.NewGauge(), } - subsys := "" // todo + subsys := "service" reg.Register(metricName(subsys, "last_sync_height"), ctx.lastSyncHeight) reg.Register(metricName(subsys, "last_event_height"), ctx.lastEventHeight) reg.Register(metricName(subsys, "last_statediff_height"), ctx.lastStatediffHeight) -- 2.45.2 From 02c7e785c5e82de7a8a9ec4695a8fca56df8e617 Mon Sep 17 00:00:00 2001 From: Roy Crihfield Date: Thu, 19 Nov 2020 17:37:38 +0800 Subject: [PATCH 09/16] statediff: use a worker pool (for direct writes) --- cmd/geth/config.go | 11 ++++++- cmd/geth/main.go | 1 + cmd/utils/flags.go | 9 ++++-- statediff/service.go | 69 +++++++++++++++++++++++++++++++++----------- 4 files changed, 69 insertions(+), 21 deletions(-) diff --git a/cmd/geth/config.go b/cmd/geth/config.go index eab7f699c..bb4d2f2e2 100644 --- a/cmd/geth/config.go +++ b/cmd/geth/config.go @@ -191,8 +191,17 @@ func makeFullNode(ctx *cli.Context) (*node.Node, ethapi.Backend) { } else { utils.Fatalf("Must specify client name for statediff DB output") } + } else { + if ctx.GlobalBool(utils.StateDiffWritingFlag.Name) { + utils.Fatalf("Must pass DB parameters if enabling statediff write loop") + } } - utils.RegisterStateDiffService(stack, backend, dbParams, ctx.GlobalBool(utils.StateDiffWritingFlag.Name)) + params := statediff.ServiceParams{ + DBParams: dbParams, + EnableWriteLoop: ctx.GlobalBool(utils.StateDiffWritingFlag.Name), + NumWorkers: ctx.GlobalUint(utils.StateDiffWorkersFlag.Name), + } + utils.RegisterStateDiffService(stack, backend, params) } // Configure GraphQL if requested diff --git a/cmd/geth/main.go b/cmd/geth/main.go index 0e657f636..a30720897 100644 --- a/cmd/geth/main.go +++ b/cmd/geth/main.go @@ -162,6 +162,7 @@ var ( utils.StateDiffDBNodeIDFlag, utils.StateDiffDBClientNameFlag, utils.StateDiffWritingFlag, + utils.StateDiffWorkersFlag, configFileFlag, } diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index 6863364db..57481200a 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -746,6 +746,10 @@ var ( Name: "statediff.writing", Usage: "Activates progressive writing of state diffs to database as new block are synced", } + StateDiffWorkersFlag = cli.UintFlag{ + Name: "statediff.workers", + Usage: "Number of concurrent workers to use during statediff processing (0 = 1)", + } ) // MakeDataDir retrieves the currently requested data directory, terminating @@ -1744,9 +1748,8 @@ func RegisterGraphQLService(stack *node.Node, backend ethapi.Backend, cfg node.C } // RegisterStateDiffService configures and registers a service to stream state diff data over RPC -// dbParams are: Postgres connection URI, Node ID, client name -func RegisterStateDiffService(stack *node.Node, ethServ *eth.Ethereum, dbParams *statediff.DBParams, startWriteLoop bool) { - if err := statediff.New(stack, ethServ, dbParams, startWriteLoop); err != nil { +func RegisterStateDiffService(stack *node.Node, ethServ *eth.Ethereum, params statediff.ServiceParams) { + if err := statediff.New(stack, ethServ, params); err != nil { Fatalf("Failed to register the Statediff service: %v", err) } } diff --git a/statediff/service.go b/statediff/service.go index c3f270bb4..121279f2e 100644 --- a/statediff/service.go +++ b/statediff/service.go @@ -92,6 +92,15 @@ type IService interface { WriteLoop(chainEventCh chan core.ChainEvent) } +// Wraps consructor parameters +type ServiceParams struct { + DBParams *DBParams + // Whether to enable writing state diffs directly to track blochain head + EnableWriteLoop bool + // Size of the worker pool + NumWorkers uint +} + // Service is the underlying struct for the state diffing service type Service struct { // Used to sync access to the Subscriptions @@ -107,41 +116,56 @@ type Service struct { // A mapping of subscription params rlp hash to the corresponding subscription params SubscriptionTypes map[common.Hash]Params // Cache the last block so that we can avoid having to lookup the next block's parent - lastBlock lastBlockCache + lastBlock blockCache // Whether or not we have any subscribers; only if we do, do we processes state diffs subscribers int32 // Interface for publishing statediffs as PG-IPLD objects indexer ind.Indexer // Whether to enable writing state diffs directly to track blochain head enableWriteLoop bool + // Size of the worker pool + numWorkers uint } // Wrap the cached last block for safe access from different service loops -type lastBlockCache struct { +type blockCache struct { sync.Mutex - block *types.Block + blocks map[common.Hash]*types.Block + maxSize uint +} + +func newBlockCache(max uint) blockCache { + return blockCache{ + blocks: make(map[common.Hash]*types.Block), + maxSize: max, + } } // New creates a new statediff.Service -func New(stack *node.Node, ethServ *eth.Ethereum, dbParams *DBParams, enableWriteLoop bool) error { +// func New(stack *node.Node, ethServ *eth.Ethereum, dbParams *DBParams, enableWriteLoop bool) error { +func New(stack *node.Node, ethServ *eth.Ethereum, params ServiceParams) error { blockChain := ethServ.BlockChain() var indexer ind.Indexer - if dbParams != nil { + if params.DBParams != nil { info := nodeinfo.Info{ GenesisBlock: blockChain.Genesis().Hash().Hex(), NetworkID: strconv.FormatUint(ethServ.NetVersion(), 10), ChainID: blockChain.Config().ChainID.Uint64(), - ID: dbParams.ID, - ClientName: dbParams.ClientName, + ID: params.DBParams.ID, + ClientName: params.DBParams.ClientName, } // TODO: pass max idle, open, lifetime? - db, err := postgres.NewDB(dbParams.ConnectionURL, postgres.ConnectionConfig{}, info) + db, err := postgres.NewDB(params.DBParams.ConnectionURL, postgres.ConnectionConfig{}, info) if err != nil { return err } indexer = ind.NewStateDiffIndexer(blockChain.Config(), db) } + workers := params.NumWorkers + if workers == 0 { + workers = 1 + } sds := &Service{ Mutex: sync.Mutex{}, BlockChain: blockChain, @@ -149,8 +173,10 @@ func New(stack *node.Node, ethServ *eth.Ethereum, dbParams *DBParams, enableWrit QuitChan: make(chan bool), Subscriptions: make(map[common.Hash]map[rpc.ID]Subscription), SubscriptionTypes: make(map[common.Hash]Params), + lastBlock: newBlockCache(workers), indexer: indexer, - enableWriteLoop: enableWriteLoop, + enableWriteLoop: params.EnableWriteLoop, + numWorkers: workers, } stack.RegisterLifecycle(sds) stack.RegisterAPIs(sds.APIs()) @@ -174,16 +200,20 @@ func (sds *Service) APIs() []rpc.API { } } -func (lbc *lastBlockCache) replace(currentBlock *types.Block, bc blockChain) *types.Block { +// Return the parent block of currentBlock, using the cached block if available +func (lbc *blockCache) replace(currentBlock *types.Block, bc blockChain) *types.Block { lbc.Lock() parentHash := currentBlock.ParentHash() var parentBlock *types.Block - if lbc.block != nil && bytes.Equal(lbc.block.Hash().Bytes(), parentHash.Bytes()) { - parentBlock = lbc.block + if block, ok := lbc.blocks[parentHash]; ok { + parentBlock = block + if len(lbc.blocks) > int(lbc.maxSize) { + delete(lbc.blocks, parentHash) + } } else { parentBlock = bc.GetBlockByHash(parentHash) } - lbc.block = currentBlock + lbc.blocks[currentBlock.Hash()] = currentBlock lbc.Unlock() return parentBlock } @@ -417,13 +447,18 @@ func (sds *Service) Unsubscribe(id rpc.ID) error { func (sds *Service) Start() error { log.Info("Starting statediff service") - chainEventCh := make(chan core.ChainEvent, chainEventChanSize) - go sds.Loop(chainEventCh) + { + // TODO: also use worker pool here? + chainEventCh := make(chan core.ChainEvent, chainEventChanSize) + go sds.Loop(chainEventCh) + } if sds.enableWriteLoop { log.Info("Starting statediff DB write loop", "params", writeLoopParams) - go sds.WriteLoop(make(chan core.ChainEvent, chainEventChanSize)) - go sds.indexer.ReportDBMetrics(5*time.Second, sds.QuitChan) + chainEventCh := make(chan core.ChainEvent, chainEventChanSize) + for worker := uint(0); worker < sds.numWorkers; worker++ { + go sds.WriteLoop(chainEventCh) + } } return nil -- 2.45.2 From bf02717f6076fd3c8ae07da925525982c0e8ffe3 Mon Sep 17 00:00:00 2001 From: Roy Crihfield Date: Thu, 19 Nov 2020 20:25:45 +0800 Subject: [PATCH 10/16] fix test --- statediff/service.go | 10 +++++----- statediff/service_test.go | 3 +++ 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/statediff/service.go b/statediff/service.go index 121279f2e..7127cf5da 100644 --- a/statediff/service.go +++ b/statediff/service.go @@ -116,7 +116,7 @@ type Service struct { // A mapping of subscription params rlp hash to the corresponding subscription params SubscriptionTypes map[common.Hash]Params // Cache the last block so that we can avoid having to lookup the next block's parent - lastBlock blockCache + BlockCache blockCache // Whether or not we have any subscribers; only if we do, do we processes state diffs subscribers int32 // Interface for publishing statediffs as PG-IPLD objects @@ -134,7 +134,7 @@ type blockCache struct { maxSize uint } -func newBlockCache(max uint) blockCache { +func NewBlockCache(max uint) blockCache { return blockCache{ blocks: make(map[common.Hash]*types.Block), maxSize: max, @@ -173,7 +173,7 @@ func New(stack *node.Node, ethServ *eth.Ethereum, params ServiceParams) error { QuitChan: make(chan bool), Subscriptions: make(map[common.Hash]map[rpc.ID]Subscription), SubscriptionTypes: make(map[common.Hash]Params), - lastBlock: newBlockCache(workers), + BlockCache: NewBlockCache(workers), indexer: indexer, enableWriteLoop: params.EnableWriteLoop, numWorkers: workers, @@ -230,7 +230,7 @@ func (sds *Service) WriteLoop(chainEventCh chan core.ChainEvent) { log.Debug("(WriteLoop) Event received from chainEventCh", "event", chainEvent) currentBlock := chainEvent.Block statediffMetrics.lastEventHeight.Update(int64(currentBlock.Number().Uint64())) - parentBlock := sds.lastBlock.replace(currentBlock, sds.BlockChain) + parentBlock := sds.BlockCache.replace(currentBlock, sds.BlockChain) if parentBlock == nil { log.Error("Parent block is nil, skipping this block", "block height", currentBlock.Number()) continue @@ -271,7 +271,7 @@ func (sds *Service) Loop(chainEventCh chan core.ChainEvent) { continue } currentBlock := chainEvent.Block - parentBlock := sds.lastBlock.replace(currentBlock, sds.BlockChain) + parentBlock := sds.BlockCache.replace(currentBlock, sds.BlockChain) if parentBlock == nil { log.Error("Parent block is nil, skipping this block", "number", currentBlock.Number()) continue diff --git a/statediff/service_test.go b/statediff/service_test.go index ef3c1bb2c..ca9a483a5 100644 --- a/statediff/service_test.go +++ b/statediff/service_test.go @@ -94,6 +94,7 @@ func testErrorInChainEventLoop(t *testing.T) { QuitChan: serviceQuit, Subscriptions: make(map[common.Hash]map[rpc.ID]statediff.Subscription), SubscriptionTypes: make(map[common.Hash]statediff.Params), + BlockCache: statediff.NewBlockCache(1), } payloadChan := make(chan statediff.Payload, 2) quitChan := make(chan bool) @@ -177,6 +178,7 @@ func testErrorInBlockLoop(t *testing.T) { QuitChan: make(chan bool), Subscriptions: make(map[common.Hash]map[rpc.ID]statediff.Subscription), SubscriptionTypes: make(map[common.Hash]statediff.Params), + BlockCache: statediff.NewBlockCache(1), } payloadChan := make(chan statediff.Payload) quitChan := make(chan bool) @@ -256,6 +258,7 @@ func testErrorInStateDiffAt(t *testing.T) { QuitChan: make(chan bool), Subscriptions: make(map[common.Hash]map[rpc.ID]statediff.Subscription), SubscriptionTypes: make(map[common.Hash]statediff.Params), + BlockCache: statediff.NewBlockCache(1), } stateDiffPayload, err := service.StateDiffAt(testBlock1.NumberU64(), defaultParams) if err != nil { -- 2.45.2 From 5c35c86a8b9509e42ae88f56820a4e4031ca5b21 Mon Sep 17 00:00:00 2001 From: Roy Crihfield Date: Fri, 20 Nov 2020 00:52:49 +0800 Subject: [PATCH 11/16] fix chain event subscription --- statediff/service.go | 32 ++++++++++++++++++++++++-------- 1 file changed, 24 insertions(+), 8 deletions(-) diff --git a/statediff/service.go b/statediff/service.go index 7127cf5da..4b4a476c7 100644 --- a/statediff/service.go +++ b/statediff/service.go @@ -218,16 +218,34 @@ func (lbc *blockCache) replace(currentBlock *types.Block, bc blockChain) *types. return parentBlock } +type workerParams struct { + chainEventCh <-chan core.ChainEvent + errCh <-chan error + wg *sync.WaitGroup + id uint +} + func (sds *Service) WriteLoop(chainEventCh chan core.ChainEvent) { chainEventSub := sds.BlockChain.SubscribeChainEvent(chainEventCh) defer chainEventSub.Unsubscribe() errCh := chainEventSub.Err() + var wg sync.WaitGroup + wg.Add(int(sds.numWorkers)) + for worker := uint(0); worker < sds.numWorkers; worker++ { + params := workerParams{chainEventCh: chainEventCh, errCh: errCh, wg: &wg, id: worker} + go sds.writeLoopWorker(params) + } + wg.Wait() +} + +func (sds *Service) writeLoopWorker(params workerParams) { + defer params.wg.Done() for { select { //Notify chain event channel of events - case chainEvent := <-chainEventCh: - statediffMetrics.writeLoopChannelLen.Update(int64(len(chainEventCh))) - log.Debug("(WriteLoop) Event received from chainEventCh", "event", chainEvent) + case chainEvent := <-params.chainEventCh: + statediffMetrics.writeLoopChannelLen.Update(int64(len(params.chainEventCh))) + log.Debug("WriteLoop(): chain event received", "event", chainEvent) currentBlock := chainEvent.Block statediffMetrics.lastEventHeight.Update(int64(currentBlock.Number().Uint64())) parentBlock := sds.BlockCache.replace(currentBlock, sds.BlockChain) @@ -237,12 +255,12 @@ func (sds *Service) WriteLoop(chainEventCh chan core.ChainEvent) { } err := sds.writeStateDiff(currentBlock, parentBlock.Root(), writeLoopParams) if err != nil { - log.Error("statediff (DB write) processing error", "block height", currentBlock.Number().Uint64(), "error", err.Error()) + log.Error("statediff (DB write) processing error", "block height", currentBlock.Number().Uint64(), "error", err.Error(), "worker", params.id) continue } // TODO: how to handle with concurrent workers statediffMetrics.lastStatediffHeight.Update(int64(currentBlock.Number().Uint64())) - case err := <-errCh: + case err := <-params.errCh: log.Warn("Error from chain event subscription", "error", err) sds.close() return @@ -456,9 +474,7 @@ func (sds *Service) Start() error { if sds.enableWriteLoop { log.Info("Starting statediff DB write loop", "params", writeLoopParams) chainEventCh := make(chan core.ChainEvent, chainEventChanSize) - for worker := uint(0); worker < sds.numWorkers; worker++ { - go sds.WriteLoop(chainEventCh) - } + go sds.WriteLoop(chainEventCh) } return nil -- 2.45.2 From dd6f9ccabe43301c304098b7e081fbb1496a584d Mon Sep 17 00:00:00 2001 From: Roy Crihfield Date: Fri, 20 Nov 2020 16:48:41 +0800 Subject: [PATCH 12/16] log tweaks --- statediff/service.go | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/statediff/service.go b/statediff/service.go index 4b4a476c7..b29afc993 100644 --- a/statediff/service.go +++ b/statediff/service.go @@ -253,9 +253,10 @@ func (sds *Service) writeLoopWorker(params workerParams) { log.Error("Parent block is nil, skipping this block", "block height", currentBlock.Number()) continue } + log.Info("Writing state diff", "block height", currentBlock.Number().Uint64(), "worker", params.id) err := sds.writeStateDiff(currentBlock, parentBlock.Root(), writeLoopParams) if err != nil { - log.Error("statediff (DB write) processing error", "block height", currentBlock.Number().Uint64(), "error", err.Error(), "worker", params.id) + log.Error("statediff.Service.WriteLoop: processing error", "block height", currentBlock.Number().Uint64(), "error", err.Error(), "worker", params.id) continue } // TODO: how to handle with concurrent workers @@ -282,7 +283,7 @@ func (sds *Service) Loop(chainEventCh chan core.ChainEvent) { //Notify chain event channel of events case chainEvent := <-chainEventCh: statediffMetrics.serviceLoopChannelLen.Update(int64(len(chainEventCh))) - log.Debug("Event received from chainEventCh", "event", chainEvent) + log.Debug("Loop(): chain event received", "event", chainEvent) // if we don't have any subscribers, do not process a statediff if atomic.LoadInt32(&sds.subscribers) == 0 { log.Debug("Currently no subscribers to the statediffing service; processing is halted") @@ -291,7 +292,7 @@ func (sds *Service) Loop(chainEventCh chan core.ChainEvent) { currentBlock := chainEvent.Block parentBlock := sds.BlockCache.replace(currentBlock, sds.BlockChain) if parentBlock == nil { - log.Error("Parent block is nil, skipping this block", "number", currentBlock.Number()) + log.Error("Parent block is nil, skipping this block", "block height", currentBlock.Number()) continue } sds.streamStateDiff(currentBlock, parentBlock.Root()) @@ -532,7 +533,7 @@ func (sds *Service) StreamCodeAndCodeHash(blockNumber uint64, outChan chan<- Cod log.Info("sending code and codehash", "block height", blockNumber) currentTrie, err := sds.BlockChain.StateCache().OpenTrie(current.Root()) if err != nil { - log.Error("error creating trie for block", "number", current.Number(), "err", err) + log.Error("error creating trie for block", "block height", current.Number(), "err", err) close(quitChan) return } @@ -580,7 +581,7 @@ func (sds *Service) WriteStateDiffAt(blockNumber uint64, params Params) error { // Writes a state diff from the current block, parent state root, and provided params func (sds *Service) writeStateDiff(block *types.Block, parentRoot common.Hash, params Params) error { - log.Info("Writing state diff", "block height", block.Number().Uint64()) + // log.Info("Writing state diff", "block height", block.Number().Uint64()) var totalDifficulty *big.Int var receipts types.Receipts if params.IncludeTD { -- 2.45.2 From ae2f32f9d8b4b6e740d33e19c104fbdcd00c363a Mon Sep 17 00:00:00 2001 From: Roy Crihfield Date: Tue, 24 Nov 2020 21:51:30 +0800 Subject: [PATCH 13/16] func name --- statediff/service.go | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/statediff/service.go b/statediff/service.go index b29afc993..0bd31fdde 100644 --- a/statediff/service.go +++ b/statediff/service.go @@ -200,8 +200,9 @@ func (sds *Service) APIs() []rpc.API { } } -// Return the parent block of currentBlock, using the cached block if available -func (lbc *blockCache) replace(currentBlock *types.Block, bc blockChain) *types.Block { +// Return the parent block of currentBlock, using the cached block if available; +// and cache the passed block +func (lbc *blockCache) getParentBlock(currentBlock *types.Block, bc blockChain) *types.Block { lbc.Lock() parentHash := currentBlock.ParentHash() var parentBlock *types.Block @@ -248,7 +249,7 @@ func (sds *Service) writeLoopWorker(params workerParams) { log.Debug("WriteLoop(): chain event received", "event", chainEvent) currentBlock := chainEvent.Block statediffMetrics.lastEventHeight.Update(int64(currentBlock.Number().Uint64())) - parentBlock := sds.BlockCache.replace(currentBlock, sds.BlockChain) + parentBlock := sds.BlockCache.getParentBlock(currentBlock, sds.BlockChain) if parentBlock == nil { log.Error("Parent block is nil, skipping this block", "block height", currentBlock.Number()) continue @@ -290,7 +291,7 @@ func (sds *Service) Loop(chainEventCh chan core.ChainEvent) { continue } currentBlock := chainEvent.Block - parentBlock := sds.BlockCache.replace(currentBlock, sds.BlockChain) + parentBlock := sds.BlockCache.getParentBlock(currentBlock, sds.BlockChain) if parentBlock == nil { log.Error("Parent block is nil, skipping this block", "block height", currentBlock.Number()) continue -- 2.45.2 From 8c9d8cbc3f971de47b019f1516e4d9f4dc790f06 Mon Sep 17 00:00:00 2001 From: Roy Crihfield Date: Tue, 24 Nov 2020 22:09:41 +0800 Subject: [PATCH 14/16] unused import --- statediff/service.go | 1 - 1 file changed, 1 deletion(-) diff --git a/statediff/service.go b/statediff/service.go index 0bd31fdde..6a6e0bd34 100644 --- a/statediff/service.go +++ b/statediff/service.go @@ -22,7 +22,6 @@ import ( "strconv" "sync" "sync/atomic" - "time" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core" -- 2.45.2 From ab841a9abe512f0faf76c54ead3ae74dfcb60693 Mon Sep 17 00:00:00 2001 From: Roy Crihfield Date: Tue, 24 Nov 2020 22:58:27 +0800 Subject: [PATCH 15/16] intermediate chain event channel for metrics --- statediff/service.go | 20 +++++++++++++++++--- 1 file changed, 17 insertions(+), 3 deletions(-) diff --git a/statediff/service.go b/statediff/service.go index 6a6e0bd34..e36ef4428 100644 --- a/statediff/service.go +++ b/statediff/service.go @@ -230,9 +230,25 @@ func (sds *Service) WriteLoop(chainEventCh chan core.ChainEvent) { defer chainEventSub.Unsubscribe() errCh := chainEventSub.Err() var wg sync.WaitGroup + // Process metrics for chain events, then forward to workers + chainEventFwd := make(chan core.ChainEvent, chainEventChanSize) + wg.Add(1) + go func() { + defer wg.Done() + for { + select { + case chainEvent := <-chainEventCh: + statediffMetrics.lastEventHeight.Update(int64(chainEvent.Block.Number().Uint64())) + statediffMetrics.writeLoopChannelLen.Update(int64(len(chainEventCh))) + chainEventFwd <- chainEvent + case <-sds.QuitChan: + return + } + } + }() wg.Add(int(sds.numWorkers)) for worker := uint(0); worker < sds.numWorkers; worker++ { - params := workerParams{chainEventCh: chainEventCh, errCh: errCh, wg: &wg, id: worker} + params := workerParams{chainEventCh: chainEventFwd, errCh: errCh, wg: &wg, id: worker} go sds.writeLoopWorker(params) } wg.Wait() @@ -244,10 +260,8 @@ func (sds *Service) writeLoopWorker(params workerParams) { select { //Notify chain event channel of events case chainEvent := <-params.chainEventCh: - statediffMetrics.writeLoopChannelLen.Update(int64(len(params.chainEventCh))) log.Debug("WriteLoop(): chain event received", "event", chainEvent) currentBlock := chainEvent.Block - statediffMetrics.lastEventHeight.Update(int64(currentBlock.Number().Uint64())) parentBlock := sds.BlockCache.getParentBlock(currentBlock, sds.BlockChain) if parentBlock == nil { log.Error("Parent block is nil, skipping this block", "block height", currentBlock.Number()) -- 2.45.2 From 83c35833553e8d807b54b06c10321c69427d2aa0 Mon Sep 17 00:00:00 2001 From: Roy Crihfield Date: Wed, 25 Nov 2020 18:25:25 +0800 Subject: [PATCH 16/16] cleanup --- cmd/geth/usage.go | 1 + statediff/service.go | 11 ++++------- 2 files changed, 5 insertions(+), 7 deletions(-) diff --git a/cmd/geth/usage.go b/cmd/geth/usage.go index e3b52daa1..3acd40145 100644 --- a/cmd/geth/usage.go +++ b/cmd/geth/usage.go @@ -242,6 +242,7 @@ var AppHelpFlagGroups = []flags.FlagGroup{ utils.StateDiffDBNodeIDFlag, utils.StateDiffDBClientNameFlag, utils.StateDiffWritingFlag, + utils.StateDiffWorkersFlag, }, }, { diff --git a/statediff/service.go b/statediff/service.go index e36ef4428..2f16fc4ab 100644 --- a/statediff/service.go +++ b/statediff/service.go @@ -276,11 +276,11 @@ func (sds *Service) writeLoopWorker(params workerParams) { // TODO: how to handle with concurrent workers statediffMetrics.lastStatediffHeight.Update(int64(currentBlock.Number().Uint64())) case err := <-params.errCh: - log.Warn("Error from chain event subscription", "error", err) + log.Warn("Error from chain event subscription", "error", err, "worker", params.id) sds.close() return case <-sds.QuitChan: - log.Info("Quitting the statediff writing process") + log.Info("Quitting the statediff writing process", "worker", params.id) sds.close() return } @@ -480,11 +480,8 @@ func (sds *Service) Unsubscribe(id rpc.ID) error { func (sds *Service) Start() error { log.Info("Starting statediff service") - { - // TODO: also use worker pool here? - chainEventCh := make(chan core.ChainEvent, chainEventChanSize) - go sds.Loop(chainEventCh) - } + chainEventCh := make(chan core.ChainEvent, chainEventChanSize) + go sds.Loop(chainEventCh) if sds.enableWriteLoop { log.Info("Starting statediff DB write loop", "params", writeLoopParams) -- 2.45.2