diff --git a/statediff/indexer/database/dump/indexer.go b/statediff/indexer/database/dump/indexer.go index 2cc7e2e0a..213d9b55d 100644 --- a/statediff/indexer/database/dump/indexer.go +++ b/statediff/indexer/database/dump/indexer.go @@ -22,6 +22,8 @@ import ( "math/big" "time" + "github.com/ethereum/go-ethereum/statediff/indexer/database/metrics" + ipld2 "github.com/ethereum/go-ethereum/statediff/indexer/ipld" "github.com/ipfs/go-cid" @@ -32,7 +34,6 @@ import ( "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/log" - "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/statediff/indexer/interfaces" @@ -43,10 +44,6 @@ import ( var _ interfaces.StateDiffIndexer = &StateDiffIndexer{} -var ( - indexerMetrics = RegisterIndexerMetrics(metrics.DefaultRegistry) -) - // StateDiffIndexer satisfies the indexer.StateDiffIndexer interface for ethereum statediff objects on top of a void type StateDiffIndexer struct { dump io.WriteCloser @@ -111,7 +108,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip close(self.quit) close(self.iplds) tDiff := time.Since(t) - indexerMetrics.tStateStoreCodeProcessing.Update(tDiff) + metrics.IndexerMetrics.StateStoreCodeProcessingTimer.Update(tDiff) traceMsg += fmt.Sprintf("state, storage, and code storage processing time: %s\r\n", tDiff.String()) t = time.Now() if err := self.flush(); err != nil { @@ -120,7 +117,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip return err } tDiff = time.Since(t) - indexerMetrics.tPostgresCommit.Update(tDiff) + metrics.IndexerMetrics.PostgresCommitTimer.Update(tDiff) traceMsg += fmt.Sprintf("postgres transaction commit duration: %s\r\n", tDiff.String()) traceMsg += fmt.Sprintf(" TOTAL PROCESSING DURATION: %s\r\n", time.Since(start).String()) log.Debug(traceMsg) @@ -130,7 +127,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip go blockTx.cache() tDiff := time.Since(t) - indexerMetrics.tFreePostgres.Update(tDiff) + metrics.IndexerMetrics.FreePostgresTimer.Update(tDiff) traceMsg += fmt.Sprintf("time spent waiting for free postgres tx: %s:\r\n", tDiff.String()) t = time.Now() @@ -142,7 +139,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip return nil, err } tDiff = time.Since(t) - indexerMetrics.tHeaderProcessing.Update(tDiff) + metrics.IndexerMetrics.HeaderProcessingTimer.Update(tDiff) traceMsg += fmt.Sprintf("header processing time: %s\r\n", tDiff.String()) t = time.Now() // Publish and index uncles @@ -151,7 +148,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip return nil, err } tDiff = time.Since(t) - indexerMetrics.tUncleProcessing.Update(tDiff) + metrics.IndexerMetrics.UncleProcessingTimer.Update(tDiff) traceMsg += fmt.Sprintf("uncle processing time: %s\r\n", tDiff.String()) t = time.Now() // Publish and index receipts and txs @@ -172,7 +169,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip return nil, err } tDiff = time.Since(t) - indexerMetrics.tTxAndRecProcessing.Update(tDiff) + metrics.IndexerMetrics.TxAndRecProcessingTimer.Update(tDiff) traceMsg += fmt.Sprintf("tx and receipt processing time: %s\r\n", tDiff.String()) t = time.Now() diff --git a/statediff/indexer/database/dump/metrics.go b/statediff/indexer/database/dump/metrics.go deleted file mode 100644 index 700e42dc0..000000000 --- a/statediff/indexer/database/dump/metrics.go +++ /dev/null @@ -1,94 +0,0 @@ -// VulcanizeDB -// Copyright © 2021 Vulcanize - -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. - -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -package dump - -import ( - "strings" - - "github.com/ethereum/go-ethereum/metrics" -) - -const ( - namespace = "statediff" -) - -// Build a fully qualified metric name -func metricName(subsystem, name string) string { - if name == "" { - return "" - } - parts := []string{namespace, name} - if subsystem != "" { - parts = []string{namespace, subsystem, name} - } - // Prometheus uses _ but geth metrics uses / and replaces - return strings.Join(parts, "/") -} - -type indexerMetricsHandles struct { - // The total number of processed blocks - blocks metrics.Counter - // The total number of processed transactions - transactions metrics.Counter - // The total number of processed receipts - receipts metrics.Counter - // The total number of processed logs - logs metrics.Counter - // The total number of access list entries processed - accessListEntries metrics.Counter - // Time spent waiting for free postgres tx - tFreePostgres metrics.Timer - // Postgres transaction commit duration - tPostgresCommit metrics.Timer - // Header processing time - tHeaderProcessing metrics.Timer - // Uncle processing time - tUncleProcessing metrics.Timer - // Tx and receipt processing time - tTxAndRecProcessing metrics.Timer - // State, storage, and code combined processing time - tStateStoreCodeProcessing metrics.Timer -} - -func RegisterIndexerMetrics(reg metrics.Registry) indexerMetricsHandles { - ctx := indexerMetricsHandles{ - blocks: metrics.NewCounter(), - transactions: metrics.NewCounter(), - receipts: metrics.NewCounter(), - logs: metrics.NewCounter(), - accessListEntries: metrics.NewCounter(), - tFreePostgres: metrics.NewTimer(), - tPostgresCommit: metrics.NewTimer(), - tHeaderProcessing: metrics.NewTimer(), - tUncleProcessing: metrics.NewTimer(), - tTxAndRecProcessing: metrics.NewTimer(), - tStateStoreCodeProcessing: metrics.NewTimer(), - } - subsys := "indexer" - reg.Register(metricName(subsys, "blocks"), ctx.blocks) - reg.Register(metricName(subsys, "transactions"), ctx.transactions) - reg.Register(metricName(subsys, "receipts"), ctx.receipts) - reg.Register(metricName(subsys, "logs"), ctx.logs) - reg.Register(metricName(subsys, "access_list_entries"), ctx.accessListEntries) - reg.Register(metricName(subsys, "t_free_postgres"), ctx.tFreePostgres) - reg.Register(metricName(subsys, "t_postgres_commit"), ctx.tPostgresCommit) - reg.Register(metricName(subsys, "t_header_processing"), ctx.tHeaderProcessing) - reg.Register(metricName(subsys, "t_uncle_processing"), ctx.tUncleProcessing) - reg.Register(metricName(subsys, "t_tx_receipt_processing"), ctx.tTxAndRecProcessing) - reg.Register(metricName(subsys, "t_state_store_code_processing"), ctx.tStateStoreCodeProcessing) - return ctx -} diff --git a/statediff/indexer/database/file/csv_writer.go b/statediff/indexer/database/file/csv_writer.go index 2d4d997e3..f099ca9b0 100644 --- a/statediff/indexer/database/file/csv_writer.go +++ b/statediff/indexer/database/file/csv_writer.go @@ -25,6 +25,8 @@ import ( "path/filepath" "strconv" + "github.com/ethereum/go-ethereum/statediff/indexer/database/metrics" + blockstore "github.com/ipfs/go-ipfs-blockstore" dshelp "github.com/ipfs/go-ipfs-ds-help" node "github.com/ipfs/go-ipld-format" @@ -254,7 +256,7 @@ func (csw *CSVWriter) upsertHeaderCID(header models.HeaderModel) { header.TotalDifficulty, header.NodeID, header.Reward, header.StateRoot, header.TxRoot, header.RctRoot, header.UncleRoot, header.Bloom, strconv.FormatUint(header.Timestamp, 10), header.MhKey, 1, header.Coinbase) csw.rows <- tableRow{types.TableHeader, values} - indexerMetrics.blocks.Inc(1) + metrics.IndexerMetrics.BlocksCounter.Inc(1) } func (csw *CSVWriter) upsertUncleCID(uncle models.UncleModel) { @@ -269,14 +271,14 @@ func (csw *CSVWriter) upsertTransactionCID(transaction models.TxModel) { values = append(values, transaction.BlockNumber, transaction.HeaderID, transaction.TxHash, transaction.CID, transaction.Dst, transaction.Src, transaction.Index, transaction.MhKey, transaction.Data, transaction.Type, transaction.Value) csw.rows <- tableRow{types.TableTransaction, values} - indexerMetrics.transactions.Inc(1) + metrics.IndexerMetrics.TransactionsCounter.Inc(1) } func (csw *CSVWriter) upsertAccessListElement(accessListElement models.AccessListElementModel) { var values []interface{} values = append(values, accessListElement.BlockNumber, accessListElement.TxID, accessListElement.Index, accessListElement.Address, accessListElement.StorageKeys) csw.rows <- tableRow{types.TableAccessListElement, values} - indexerMetrics.accessListEntries.Inc(1) + metrics.IndexerMetrics.AccessListEntriesCounter.Inc(1) } func (csw *CSVWriter) upsertReceiptCID(rct *models.ReceiptModel) { @@ -284,7 +286,7 @@ func (csw *CSVWriter) upsertReceiptCID(rct *models.ReceiptModel) { values = append(values, rct.BlockNumber, rct.HeaderID, rct.TxID, rct.LeafCID, rct.Contract, rct.ContractHash, rct.LeafMhKey, rct.PostState, rct.PostStatus, rct.LogRoot) csw.rows <- tableRow{types.TableReceipt, values} - indexerMetrics.receipts.Inc(1) + metrics.IndexerMetrics.ReceiptsCounter.Inc(1) } func (csw *CSVWriter) upsertLogCID(logs []*models.LogsModel) { @@ -293,7 +295,7 @@ func (csw *CSVWriter) upsertLogCID(logs []*models.LogsModel) { values = append(values, l.BlockNumber, l.HeaderID, l.LeafCID, l.LeafMhKey, l.ReceiptID, l.Address, l.Index, l.Topic0, l.Topic1, l.Topic2, l.Topic3, l.Data) csw.rows <- tableRow{types.TableLog, values} - indexerMetrics.logs.Inc(1) + metrics.IndexerMetrics.LogsCounter.Inc(1) } } diff --git a/statediff/indexer/database/file/indexer.go b/statediff/indexer/database/file/indexer.go index 8103a68f4..86c64862c 100644 --- a/statediff/indexer/database/file/indexer.go +++ b/statediff/indexer/database/file/indexer.go @@ -34,9 +34,9 @@ import ( "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/log" - "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/rlp" + "github.com/ethereum/go-ethereum/statediff/indexer/database/metrics" "github.com/ethereum/go-ethereum/statediff/indexer/interfaces" ipld2 "github.com/ethereum/go-ethereum/statediff/indexer/ipld" "github.com/ethereum/go-ethereum/statediff/indexer/models" @@ -53,10 +53,6 @@ const watchedAddressesInsert = "INSERT INTO eth_meta.watched_addresses (address, var _ interfaces.StateDiffIndexer = &StateDiffIndexer{} -var ( - indexerMetrics = RegisterIndexerMetrics(metrics.DefaultRegistry) -) - // StateDiffIndexer satisfies the indexer.StateDiffIndexer interface for ethereum statediff objects on top of a void type StateDiffIndexer struct { fileWriter FileWriter @@ -175,12 +171,12 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip BlockNumber: block.Number().String(), submit: func(self *BatchTx, err error) error { tDiff := time.Since(t) - indexerMetrics.tStateStoreCodeProcessing.Update(tDiff) + metrics.IndexerMetrics.StateStoreCodeProcessingTimer.Update(tDiff) traceMsg += fmt.Sprintf("state, storage, and code storage processing time: %s\r\n", tDiff.String()) t = time.Now() sdi.fileWriter.Flush() tDiff = time.Since(t) - indexerMetrics.tPostgresCommit.Update(tDiff) + metrics.IndexerMetrics.PostgresCommitTimer.Update(tDiff) traceMsg += fmt.Sprintf("postgres transaction commit duration: %s\r\n", tDiff.String()) traceMsg += fmt.Sprintf(" TOTAL PROCESSING DURATION: %s\r\n", time.Since(start).String()) log.Debug(traceMsg) @@ -188,21 +184,21 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip }, } tDiff := time.Since(t) - indexerMetrics.tFreePostgres.Update(tDiff) + metrics.IndexerMetrics.FreePostgresTimer.Update(tDiff) traceMsg += fmt.Sprintf("time spent waiting for free postgres tx: %s:\r\n", tDiff.String()) t = time.Now() // write header, collect headerID headerID := sdi.processHeader(block.Header(), headerNode, reward, totalDifficulty) tDiff = time.Since(t) - indexerMetrics.tHeaderProcessing.Update(tDiff) + metrics.IndexerMetrics.HeaderProcessingTimer.Update(tDiff) traceMsg += fmt.Sprintf("header processing time: %s\r\n", tDiff.String()) t = time.Now() // write uncles sdi.processUncles(headerID, block.Number(), uncleNodes) tDiff = time.Since(t) - indexerMetrics.tUncleProcessing.Update(tDiff) + metrics.IndexerMetrics.UncleProcessingTimer.Update(tDiff) traceMsg += fmt.Sprintf("uncle processing time: %s\r\n", tDiff.String()) t = time.Now() @@ -224,7 +220,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip return nil, err } tDiff = time.Since(t) - indexerMetrics.tTxAndRecProcessing.Update(tDiff) + metrics.IndexerMetrics.TxAndRecProcessingTimer.Update(tDiff) traceMsg += fmt.Sprintf("tx and receipt processing time: %s\r\n", tDiff.String()) t = time.Now() diff --git a/statediff/indexer/database/file/metrics.go b/statediff/indexer/database/file/metrics.go deleted file mode 100644 index ca6e88f2b..000000000 --- a/statediff/indexer/database/file/metrics.go +++ /dev/null @@ -1,94 +0,0 @@ -// VulcanizeDB -// Copyright © 2021 Vulcanize - -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. - -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -package file - -import ( - "strings" - - "github.com/ethereum/go-ethereum/metrics" -) - -const ( - namespace = "statediff" -) - -// Build a fully qualified metric name -func metricName(subsystem, name string) string { - if name == "" { - return "" - } - parts := []string{namespace, name} - if subsystem != "" { - parts = []string{namespace, subsystem, name} - } - // Prometheus uses _ but geth metrics uses / and replaces - return strings.Join(parts, "/") -} - -type indexerMetricsHandles struct { - // The total number of processed blocks - blocks metrics.Counter - // The total number of processed transactions - transactions metrics.Counter - // The total number of processed receipts - receipts metrics.Counter - // The total number of processed logs - logs metrics.Counter - // The total number of access list entries processed - accessListEntries metrics.Counter - // Time spent waiting for free postgres tx - tFreePostgres metrics.Timer - // Postgres transaction commit duration - tPostgresCommit metrics.Timer - // Header processing time - tHeaderProcessing metrics.Timer - // Uncle processing time - tUncleProcessing metrics.Timer - // Tx and receipt processing time - tTxAndRecProcessing metrics.Timer - // State, storage, and code combined processing time - tStateStoreCodeProcessing metrics.Timer -} - -func RegisterIndexerMetrics(reg metrics.Registry) indexerMetricsHandles { - ctx := indexerMetricsHandles{ - blocks: metrics.NewCounter(), - transactions: metrics.NewCounter(), - receipts: metrics.NewCounter(), - logs: metrics.NewCounter(), - accessListEntries: metrics.NewCounter(), - tFreePostgres: metrics.NewTimer(), - tPostgresCommit: metrics.NewTimer(), - tHeaderProcessing: metrics.NewTimer(), - tUncleProcessing: metrics.NewTimer(), - tTxAndRecProcessing: metrics.NewTimer(), - tStateStoreCodeProcessing: metrics.NewTimer(), - } - subsys := "indexer" - reg.Register(metricName(subsys, "blocks"), ctx.blocks) - reg.Register(metricName(subsys, "transactions"), ctx.transactions) - reg.Register(metricName(subsys, "receipts"), ctx.receipts) - reg.Register(metricName(subsys, "logs"), ctx.logs) - reg.Register(metricName(subsys, "access_list_entries"), ctx.accessListEntries) - reg.Register(metricName(subsys, "t_free_postgres"), ctx.tFreePostgres) - reg.Register(metricName(subsys, "t_postgres_commit"), ctx.tPostgresCommit) - reg.Register(metricName(subsys, "t_header_processing"), ctx.tHeaderProcessing) - reg.Register(metricName(subsys, "t_uncle_processing"), ctx.tUncleProcessing) - reg.Register(metricName(subsys, "t_tx_receipt_processing"), ctx.tTxAndRecProcessing) - reg.Register(metricName(subsys, "t_state_store_code_processing"), ctx.tStateStoreCodeProcessing) - return ctx -} diff --git a/statediff/indexer/database/file/sql_writer.go b/statediff/indexer/database/file/sql_writer.go index b947fada9..6098a8cb5 100644 --- a/statediff/indexer/database/file/sql_writer.go +++ b/statediff/indexer/database/file/sql_writer.go @@ -24,6 +24,8 @@ import ( "math/big" "os" + "github.com/ethereum/go-ethereum/statediff/indexer/database/metrics" + blockstore "github.com/ipfs/go-ipfs-blockstore" dshelp "github.com/ipfs/go-ipfs-ds-help" node "github.com/ipfs/go-ipld-format" @@ -214,7 +216,7 @@ func (sqw *SQLWriter) upsertHeaderCID(header models.HeaderModel) { header.TotalDifficulty, header.NodeID, header.Reward, header.StateRoot, header.TxRoot, header.RctRoot, header.UncleRoot, header.Bloom, header.Timestamp, header.MhKey, 1, header.Coinbase) sqw.stmts <- []byte(stmt) - indexerMetrics.blocks.Inc(1) + metrics.IndexerMetrics.BlocksCounter.Inc(1) } func (sqw *SQLWriter) upsertUncleCID(uncle models.UncleModel) { @@ -225,26 +227,26 @@ func (sqw *SQLWriter) upsertUncleCID(uncle models.UncleModel) { func (sqw *SQLWriter) upsertTransactionCID(transaction models.TxModel) { sqw.stmts <- []byte(fmt.Sprintf(txInsert, transaction.BlockNumber, transaction.HeaderID, transaction.TxHash, transaction.CID, transaction.Dst, transaction.Src, transaction.Index, transaction.MhKey, transaction.Data, transaction.Type, transaction.Value)) - indexerMetrics.transactions.Inc(1) + metrics.IndexerMetrics.TransactionsCounter.Inc(1) } func (sqw *SQLWriter) upsertAccessListElement(accessListElement models.AccessListElementModel) { sqw.stmts <- []byte(fmt.Sprintf(alInsert, accessListElement.BlockNumber, accessListElement.TxID, accessListElement.Index, accessListElement.Address, formatPostgresStringArray(accessListElement.StorageKeys))) - indexerMetrics.accessListEntries.Inc(1) + metrics.IndexerMetrics.AccessListEntriesCounter.Inc(1) } func (sqw *SQLWriter) upsertReceiptCID(rct *models.ReceiptModel) { sqw.stmts <- []byte(fmt.Sprintf(rctInsert, rct.BlockNumber, rct.HeaderID, rct.TxID, rct.LeafCID, rct.Contract, rct.ContractHash, rct.LeafMhKey, rct.PostState, rct.PostStatus, rct.LogRoot)) - indexerMetrics.receipts.Inc(1) + metrics.IndexerMetrics.ReceiptsCounter.Inc(1) } func (sqw *SQLWriter) upsertLogCID(logs []*models.LogsModel) { for _, l := range logs { sqw.stmts <- []byte(fmt.Sprintf(logInsert, l.BlockNumber, l.HeaderID, l.LeafCID, l.LeafMhKey, l.ReceiptID, l.Address, l.Index, l.Topic0, l.Topic1, l.Topic2, l.Topic3, l.Data)) - indexerMetrics.logs.Inc(1) + metrics.IndexerMetrics.LogsCounter.Inc(1) } } diff --git a/statediff/indexer/database/sql/metrics.go b/statediff/indexer/database/metrics/metrics.go similarity index 59% rename from statediff/indexer/database/sql/metrics.go rename to statediff/indexer/database/metrics/metrics.go index b0946a722..95c1a959b 100644 --- a/statediff/indexer/database/sql/metrics.go +++ b/statediff/indexer/database/metrics/metrics.go @@ -14,10 +14,13 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . -package sql +package metrics import ( "strings" + "time" + + "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/metrics" ) @@ -26,6 +29,11 @@ const ( namespace = "statediff" ) +var ( + IndexerMetrics = RegisterIndexerMetrics(metrics.DefaultRegistry) + DBMetrics = RegisterDBMetrics(metrics.DefaultRegistry) +) + // Build a fully qualified metric name func metricName(subsystem, name string) string { if name == "" { @@ -39,57 +47,59 @@ func metricName(subsystem, name string) string { return strings.Join(parts, "/") } -type indexerMetricsHandles struct { - // The total number of processed blocks - blocks metrics.Counter +type IndexerMetricsHandles struct { + // The total number of processed BlocksCounter + BlocksCounter metrics.Counter // The total number of processed transactions - transactions metrics.Counter + TransactionsCounter metrics.Counter // The total number of processed receipts - receipts metrics.Counter + ReceiptsCounter metrics.Counter // The total number of processed logs - logs metrics.Counter + LogsCounter metrics.Counter // The total number of access list entries processed - accessListEntries metrics.Counter + AccessListEntriesCounter metrics.Counter // Time spent waiting for free postgres tx - tFreePostgres metrics.Timer + FreePostgresTimer metrics.Timer // Postgres transaction commit duration - tPostgresCommit metrics.Timer + PostgresCommitTimer metrics.Timer // Header processing time - tHeaderProcessing metrics.Timer + HeaderProcessingTimer metrics.Timer // Uncle processing time - tUncleProcessing metrics.Timer + UncleProcessingTimer metrics.Timer // Tx and receipt processing time - tTxAndRecProcessing metrics.Timer + TxAndRecProcessingTimer metrics.Timer // State, storage, and code combined processing time - tStateStoreCodeProcessing metrics.Timer + StateStoreCodeProcessingTimer metrics.Timer } -func RegisterIndexerMetrics(reg metrics.Registry) indexerMetricsHandles { - ctx := indexerMetricsHandles{ - blocks: metrics.NewCounter(), - transactions: metrics.NewCounter(), - receipts: metrics.NewCounter(), - logs: metrics.NewCounter(), - accessListEntries: metrics.NewCounter(), - tFreePostgres: metrics.NewTimer(), - tPostgresCommit: metrics.NewTimer(), - tHeaderProcessing: metrics.NewTimer(), - tUncleProcessing: metrics.NewTimer(), - tTxAndRecProcessing: metrics.NewTimer(), - tStateStoreCodeProcessing: metrics.NewTimer(), +func RegisterIndexerMetrics(reg metrics.Registry) IndexerMetricsHandles { + ctx := IndexerMetricsHandles{ + BlocksCounter: metrics.NewCounter(), + TransactionsCounter: metrics.NewCounter(), + ReceiptsCounter: metrics.NewCounter(), + LogsCounter: metrics.NewCounter(), + AccessListEntriesCounter: metrics.NewCounter(), + FreePostgresTimer: metrics.NewTimer(), + PostgresCommitTimer: metrics.NewTimer(), + HeaderProcessingTimer: metrics.NewTimer(), + UncleProcessingTimer: metrics.NewTimer(), + TxAndRecProcessingTimer: metrics.NewTimer(), + StateStoreCodeProcessingTimer: metrics.NewTimer(), } subsys := "indexer" - reg.Register(metricName(subsys, "blocks"), ctx.blocks) - reg.Register(metricName(subsys, "transactions"), ctx.transactions) - reg.Register(metricName(subsys, "receipts"), ctx.receipts) - reg.Register(metricName(subsys, "logs"), ctx.logs) - reg.Register(metricName(subsys, "access_list_entries"), ctx.accessListEntries) - reg.Register(metricName(subsys, "t_free_postgres"), ctx.tFreePostgres) - reg.Register(metricName(subsys, "t_postgres_commit"), ctx.tPostgresCommit) - reg.Register(metricName(subsys, "t_header_processing"), ctx.tHeaderProcessing) - reg.Register(metricName(subsys, "t_uncle_processing"), ctx.tUncleProcessing) - reg.Register(metricName(subsys, "t_tx_receipt_processing"), ctx.tTxAndRecProcessing) - reg.Register(metricName(subsys, "t_state_store_code_processing"), ctx.tStateStoreCodeProcessing) + reg.Register(metricName(subsys, "blocks"), ctx.BlocksCounter) + reg.Register(metricName(subsys, "transactions"), ctx.TransactionsCounter) + reg.Register(metricName(subsys, "receipts"), ctx.ReceiptsCounter) + reg.Register(metricName(subsys, "logs"), ctx.LogsCounter) + reg.Register(metricName(subsys, "access_list_entries"), ctx.AccessListEntriesCounter) + reg.Register(metricName(subsys, "t_free_postgres"), ctx.FreePostgresTimer) + reg.Register(metricName(subsys, "t_postgres_commit"), ctx.PostgresCommitTimer) + reg.Register(metricName(subsys, "t_header_processing"), ctx.HeaderProcessingTimer) + reg.Register(metricName(subsys, "t_uncle_processing"), ctx.UncleProcessingTimer) + reg.Register(metricName(subsys, "t_tx_receipt_processing"), ctx.TxAndRecProcessingTimer) + reg.Register(metricName(subsys, "t_state_store_code_processing"), ctx.StateStoreCodeProcessingTimer) + + log.Debug("Registering statediff indexer metrics.") return ctx } @@ -132,10 +142,24 @@ func RegisterDBMetrics(reg metrics.Registry) dbMetricsHandles { reg.Register(metricName(subsys, "blocked_milliseconds"), ctx.blockedMilliseconds) reg.Register(metricName(subsys, "closed_max_idle"), ctx.closedMaxIdle) reg.Register(metricName(subsys, "closed_max_lifetime"), ctx.closedMaxLifetime) + + log.Debug("Registering statediff DB metrics.") return ctx } -func (met *dbMetricsHandles) Update(stats Stats) { +// DbStats interface to accommodate different concrete sql stats types +type DbStats interface { + MaxOpen() int64 + Open() int64 + InUse() int64 + Idle() int64 + WaitCount() int64 + WaitDuration() time.Duration + MaxIdleClosed() int64 + MaxLifetimeClosed() int64 +} + +func (met *dbMetricsHandles) Update(stats DbStats) { met.maxOpen.Update(stats.MaxOpen()) met.open.Update(stats.Open()) met.inUse.Update(stats.InUse()) diff --git a/statediff/indexer/database/sql/indexer.go b/statediff/indexer/database/sql/indexer.go index 9e23405a0..66893ce2a 100644 --- a/statediff/indexer/database/sql/indexer.go +++ b/statediff/indexer/database/sql/indexer.go @@ -36,6 +36,7 @@ import ( "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/rlp" + metrics2 "github.com/ethereum/go-ethereum/statediff/indexer/database/metrics" "github.com/ethereum/go-ethereum/statediff/indexer/interfaces" ipld2 "github.com/ethereum/go-ethereum/statediff/indexer/ipld" "github.com/ethereum/go-ethereum/statediff/indexer/models" @@ -45,11 +46,6 @@ import ( var _ interfaces.StateDiffIndexer = &StateDiffIndexer{} -var ( - indexerMetrics = RegisterIndexerMetrics(metrics.DefaultRegistry) - dbMetrics = RegisterDBMetrics(metrics.DefaultRegistry) -) - // StateDiffIndexer satisfies the indexer.StateDiffIndexer interface for ethereum statediff objects on top of an SQL sql type StateDiffIndexer struct { ctx context.Context @@ -76,7 +72,7 @@ func (sdi *StateDiffIndexer) ReportDBMetrics(delay time.Duration, quit <-chan bo for { select { case <-ticker.C: - dbMetrics.Update(sdi.dbWriter.db.Stats()) + metrics2.DBMetrics.Update(sdi.dbWriter.db.Stats()) case <-quit: ticker.Stop() return @@ -160,7 +156,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip rollback(sdi.ctx, tx) } else { tDiff := time.Since(t) - indexerMetrics.tStateStoreCodeProcessing.Update(tDiff) + metrics2.IndexerMetrics.StateStoreCodeProcessingTimer.Update(tDiff) traceMsg += fmt.Sprintf("state, storage, and code storage processing time: %s\r\n", tDiff.String()) t = time.Now() if err := self.flush(); err != nil { @@ -171,7 +167,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip } err = tx.Commit(sdi.ctx) tDiff = time.Since(t) - indexerMetrics.tPostgresCommit.Update(tDiff) + metrics2.IndexerMetrics.PostgresCommitTimer.Update(tDiff) traceMsg += fmt.Sprintf("postgres transaction commit duration: %s\r\n", tDiff.String()) } traceMsg += fmt.Sprintf(" TOTAL PROCESSING DURATION: %s\r\n", time.Since(start).String()) @@ -182,7 +178,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip go blockTx.cache() tDiff := time.Since(t) - indexerMetrics.tFreePostgres.Update(tDiff) + metrics2.IndexerMetrics.FreePostgresTimer.Update(tDiff) traceMsg += fmt.Sprintf("time spent waiting for free postgres tx: %s:\r\n", tDiff.String()) t = time.Now() @@ -194,7 +190,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip return nil, err } tDiff = time.Since(t) - indexerMetrics.tHeaderProcessing.Update(tDiff) + metrics2.IndexerMetrics.HeaderProcessingTimer.Update(tDiff) traceMsg += fmt.Sprintf("header processing time: %s\r\n", tDiff.String()) t = time.Now() // Publish and index uncles @@ -203,7 +199,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip return nil, err } tDiff = time.Since(t) - indexerMetrics.tUncleProcessing.Update(tDiff) + metrics2.IndexerMetrics.UncleProcessingTimer.Update(tDiff) traceMsg += fmt.Sprintf("uncle processing time: %s\r\n", tDiff.String()) t = time.Now() // Publish and index receipts and txs @@ -224,7 +220,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip return nil, err } tDiff = time.Since(t) - indexerMetrics.tTxAndRecProcessing.Update(tDiff) + metrics2.IndexerMetrics.TxAndRecProcessingTimer.Update(tDiff) traceMsg += fmt.Sprintf("tx and receipt processing time: %s\r\n", tDiff.String()) t = time.Now() diff --git a/statediff/indexer/database/sql/interfaces.go b/statediff/indexer/database/sql/interfaces.go index 613a09251..1e7278db6 100644 --- a/statediff/indexer/database/sql/interfaces.go +++ b/statediff/indexer/database/sql/interfaces.go @@ -19,7 +19,8 @@ package sql import ( "context" "io" - "time" + + "github.com/ethereum/go-ethereum/statediff/indexer/database/metrics" ) // Database interfaces required by the sql indexer @@ -35,7 +36,7 @@ type Driver interface { Select(ctx context.Context, dest interface{}, query string, args ...interface{}) error Get(ctx context.Context, dest interface{}, query string, args ...interface{}) error Begin(ctx context.Context) (Tx, error) - Stats() Stats + Stats() metrics.DbStats NodeID() string Context() context.Context io.Closer @@ -74,15 +75,3 @@ type ScannableRow interface { type Result interface { RowsAffected() (int64, error) } - -// Stats interface to accommodate different concrete sql stats types -type Stats interface { - MaxOpen() int64 - Open() int64 - InUse() int64 - Idle() int64 - WaitCount() int64 - WaitDuration() time.Duration - MaxIdleClosed() int64 - MaxLifetimeClosed() int64 -} diff --git a/statediff/indexer/database/sql/postgres/pgx.go b/statediff/indexer/database/sql/postgres/pgx.go index 9f1c4d571..6b75559df 100644 --- a/statediff/indexer/database/sql/postgres/pgx.go +++ b/statediff/indexer/database/sql/postgres/pgx.go @@ -27,6 +27,7 @@ import ( "github.com/jackc/pgx/v4" "github.com/jackc/pgx/v4/pgxpool" + "github.com/ethereum/go-ethereum/statediff/indexer/database/metrics" "github.com/ethereum/go-ethereum/statediff/indexer/database/sql" "github.com/ethereum/go-ethereum/statediff/indexer/node" ) @@ -139,7 +140,7 @@ func (pgx *PGXDriver) Begin(ctx context.Context) (sql.Tx, error) { return pgxTxWrapper{tx: tx}, nil } -func (pgx *PGXDriver) Stats() sql.Stats { +func (pgx *PGXDriver) Stats() metrics.DbStats { stats := pgx.pool.Stat() return pgxStatsWrapper{stats: stats} } @@ -173,43 +174,43 @@ type pgxStatsWrapper struct { stats *pgxpool.Stat } -// MaxOpen satisfies sql.Stats +// MaxOpen satisfies metrics.DbStats func (s pgxStatsWrapper) MaxOpen() int64 { return int64(s.stats.MaxConns()) } -// Open satisfies sql.Stats +// Open satisfies metrics.DbStats func (s pgxStatsWrapper) Open() int64 { return int64(s.stats.TotalConns()) } -// InUse satisfies sql.Stats +// InUse satisfies metrics.DbStats func (s pgxStatsWrapper) InUse() int64 { return int64(s.stats.AcquiredConns()) } -// Idle satisfies sql.Stats +// Idle satisfies metrics.DbStats func (s pgxStatsWrapper) Idle() int64 { return int64(s.stats.IdleConns()) } -// WaitCount satisfies sql.Stats +// WaitCount satisfies metrics.DbStats func (s pgxStatsWrapper) WaitCount() int64 { return s.stats.EmptyAcquireCount() } -// WaitDuration satisfies sql.Stats +// WaitDuration satisfies metrics.DbStats func (s pgxStatsWrapper) WaitDuration() time.Duration { return s.stats.AcquireDuration() } -// MaxIdleClosed satisfies sql.Stats +// MaxIdleClosed satisfies metrics.DbStats func (s pgxStatsWrapper) MaxIdleClosed() int64 { // this stat isn't supported by pgxpool, but we don't want to panic return 0 } -// MaxLifetimeClosed satisfies sql.Stats +// MaxLifetimeClosed satisfies metrics.DbStats func (s pgxStatsWrapper) MaxLifetimeClosed() int64 { return s.stats.CanceledAcquireCount() } diff --git a/statediff/indexer/database/sql/postgres/sqlx.go b/statediff/indexer/database/sql/postgres/sqlx.go index 9f1753e67..529e7f7c8 100644 --- a/statediff/indexer/database/sql/postgres/sqlx.go +++ b/statediff/indexer/database/sql/postgres/sqlx.go @@ -23,6 +23,7 @@ import ( "github.com/jmoiron/sqlx" + "github.com/ethereum/go-ethereum/statediff/indexer/database/metrics" "github.com/ethereum/go-ethereum/statediff/indexer/database/sql" "github.com/ethereum/go-ethereum/statediff/indexer/node" ) @@ -98,7 +99,7 @@ func (driver *SQLXDriver) Begin(_ context.Context) (sql.Tx, error) { return sqlxTxWrapper{tx: tx}, nil } -func (driver *SQLXDriver) Stats() sql.Stats { +func (driver *SQLXDriver) Stats() metrics.DbStats { stats := driver.db.Stats() return sqlxStatsWrapper{stats: stats} } @@ -122,42 +123,42 @@ type sqlxStatsWrapper struct { stats coresql.DBStats } -// MaxOpen satisfies sql.Stats +// MaxOpen satisfies metrics.DbStats func (s sqlxStatsWrapper) MaxOpen() int64 { return int64(s.stats.MaxOpenConnections) } -// Open satisfies sql.Stats +// Open satisfies metrics.DbStats func (s sqlxStatsWrapper) Open() int64 { return int64(s.stats.OpenConnections) } -// InUse satisfies sql.Stats +// InUse satisfies metrics.DbStats func (s sqlxStatsWrapper) InUse() int64 { return int64(s.stats.InUse) } -// Idle satisfies sql.Stats +// Idle satisfies metrics.DbStats func (s sqlxStatsWrapper) Idle() int64 { return int64(s.stats.Idle) } -// WaitCount satisfies sql.Stats +// WaitCount satisfies metrics.DbStats func (s sqlxStatsWrapper) WaitCount() int64 { return s.stats.WaitCount } -// WaitDuration satisfies sql.Stats +// WaitDuration satisfies metrics.DbStats func (s sqlxStatsWrapper) WaitDuration() time.Duration { return s.stats.WaitDuration } -// MaxIdleClosed satisfies sql.Stats +// MaxIdleClosed satisfies metrics.DbStats func (s sqlxStatsWrapper) MaxIdleClosed() int64 { return s.stats.MaxIdleClosed } -// MaxLifetimeClosed satisfies sql.Stats +// MaxLifetimeClosed satisfies metrics.DbStats func (s sqlxStatsWrapper) MaxLifetimeClosed() int64 { return s.stats.MaxLifetimeClosed } diff --git a/statediff/indexer/database/sql/writer.go b/statediff/indexer/database/sql/writer.go index c1a67f2f8..36b0703dc 100644 --- a/statediff/indexer/database/sql/writer.go +++ b/statediff/indexer/database/sql/writer.go @@ -19,6 +19,8 @@ package sql import ( "fmt" + "github.com/ethereum/go-ethereum/statediff/indexer/database/metrics" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/statediff/indexer/models" ) @@ -57,7 +59,7 @@ func (w *Writer) upsertHeaderCID(tx Tx, header models.HeaderModel) error { if err != nil { return insertError{"eth.header_cids", err, w.db.InsertHeaderStm(), header} } - indexerMetrics.blocks.Inc(1) + metrics.IndexerMetrics.BlocksCounter.Inc(1) return nil } @@ -85,7 +87,7 @@ func (w *Writer) upsertTransactionCID(tx Tx, transaction models.TxModel) error { if err != nil { return insertError{"eth.transaction_cids", err, w.db.InsertTxStm(), transaction} } - indexerMetrics.transactions.Inc(1) + metrics.IndexerMetrics.TransactionsCounter.Inc(1) return nil } @@ -100,7 +102,7 @@ func (w *Writer) upsertAccessListElement(tx Tx, accessListElement models.AccessL if err != nil { return insertError{"eth.access_list_elements", err, w.db.InsertAccessListElementStm(), accessListElement} } - indexerMetrics.accessListEntries.Inc(1) + metrics.IndexerMetrics.AccessListEntriesCounter.Inc(1) return nil } @@ -115,7 +117,7 @@ func (w *Writer) upsertReceiptCID(tx Tx, rct *models.ReceiptModel) error { if err != nil { return insertError{"eth.receipt_cids", err, w.db.InsertRctStm(), *rct} } - indexerMetrics.receipts.Inc(1) + metrics.IndexerMetrics.ReceiptsCounter.Inc(1) return nil } @@ -131,7 +133,7 @@ func (w *Writer) upsertLogCID(tx Tx, logs []*models.LogsModel) error { if err != nil { return insertError{"eth.log_cids", err, w.db.InsertLogStm(), *log} } - indexerMetrics.logs.Inc(1) + metrics.IndexerMetrics.LogsCounter.Inc(1) } return nil }