fix: Unify and fix indexer metrics. (#314)

This commit is contained in:
Thomas E Lackey 2023-02-07 16:26:36 -06:00 committed by GitHub
parent 9951e39155
commit 9ed622218d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 130 additions and 308 deletions

View File

@ -22,6 +22,8 @@ import (
"math/big"
"time"
"github.com/ethereum/go-ethereum/statediff/indexer/database/metrics"
ipld2 "github.com/ethereum/go-ethereum/statediff/indexer/ipld"
"github.com/ipfs/go-cid"
@ -32,7 +34,6 @@ import (
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/statediff/indexer/interfaces"
@ -43,10 +44,6 @@ import (
var _ interfaces.StateDiffIndexer = &StateDiffIndexer{}
var (
indexerMetrics = RegisterIndexerMetrics(metrics.DefaultRegistry)
)
// StateDiffIndexer satisfies the indexer.StateDiffIndexer interface for ethereum statediff objects on top of a void
type StateDiffIndexer struct {
dump io.WriteCloser
@ -111,7 +108,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
close(self.quit)
close(self.iplds)
tDiff := time.Since(t)
indexerMetrics.tStateStoreCodeProcessing.Update(tDiff)
metrics.IndexerMetrics.StateStoreCodeProcessingTimer.Update(tDiff)
traceMsg += fmt.Sprintf("state, storage, and code storage processing time: %s\r\n", tDiff.String())
t = time.Now()
if err := self.flush(); err != nil {
@ -120,7 +117,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
return err
}
tDiff = time.Since(t)
indexerMetrics.tPostgresCommit.Update(tDiff)
metrics.IndexerMetrics.PostgresCommitTimer.Update(tDiff)
traceMsg += fmt.Sprintf("postgres transaction commit duration: %s\r\n", tDiff.String())
traceMsg += fmt.Sprintf(" TOTAL PROCESSING DURATION: %s\r\n", time.Since(start).String())
log.Debug(traceMsg)
@ -130,7 +127,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
go blockTx.cache()
tDiff := time.Since(t)
indexerMetrics.tFreePostgres.Update(tDiff)
metrics.IndexerMetrics.FreePostgresTimer.Update(tDiff)
traceMsg += fmt.Sprintf("time spent waiting for free postgres tx: %s:\r\n", tDiff.String())
t = time.Now()
@ -142,7 +139,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
return nil, err
}
tDiff = time.Since(t)
indexerMetrics.tHeaderProcessing.Update(tDiff)
metrics.IndexerMetrics.HeaderProcessingTimer.Update(tDiff)
traceMsg += fmt.Sprintf("header processing time: %s\r\n", tDiff.String())
t = time.Now()
// Publish and index uncles
@ -151,7 +148,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
return nil, err
}
tDiff = time.Since(t)
indexerMetrics.tUncleProcessing.Update(tDiff)
metrics.IndexerMetrics.UncleProcessingTimer.Update(tDiff)
traceMsg += fmt.Sprintf("uncle processing time: %s\r\n", tDiff.String())
t = time.Now()
// Publish and index receipts and txs
@ -172,7 +169,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
return nil, err
}
tDiff = time.Since(t)
indexerMetrics.tTxAndRecProcessing.Update(tDiff)
metrics.IndexerMetrics.TxAndRecProcessingTimer.Update(tDiff)
traceMsg += fmt.Sprintf("tx and receipt processing time: %s\r\n", tDiff.String())
t = time.Now()

View File

@ -1,94 +0,0 @@
// VulcanizeDB
// Copyright © 2021 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package dump
import (
"strings"
"github.com/ethereum/go-ethereum/metrics"
)
const (
namespace = "statediff"
)
// Build a fully qualified metric name
func metricName(subsystem, name string) string {
if name == "" {
return ""
}
parts := []string{namespace, name}
if subsystem != "" {
parts = []string{namespace, subsystem, name}
}
// Prometheus uses _ but geth metrics uses / and replaces
return strings.Join(parts, "/")
}
type indexerMetricsHandles struct {
// The total number of processed blocks
blocks metrics.Counter
// The total number of processed transactions
transactions metrics.Counter
// The total number of processed receipts
receipts metrics.Counter
// The total number of processed logs
logs metrics.Counter
// The total number of access list entries processed
accessListEntries metrics.Counter
// Time spent waiting for free postgres tx
tFreePostgres metrics.Timer
// Postgres transaction commit duration
tPostgresCommit metrics.Timer
// Header processing time
tHeaderProcessing metrics.Timer
// Uncle processing time
tUncleProcessing metrics.Timer
// Tx and receipt processing time
tTxAndRecProcessing metrics.Timer
// State, storage, and code combined processing time
tStateStoreCodeProcessing metrics.Timer
}
func RegisterIndexerMetrics(reg metrics.Registry) indexerMetricsHandles {
ctx := indexerMetricsHandles{
blocks: metrics.NewCounter(),
transactions: metrics.NewCounter(),
receipts: metrics.NewCounter(),
logs: metrics.NewCounter(),
accessListEntries: metrics.NewCounter(),
tFreePostgres: metrics.NewTimer(),
tPostgresCommit: metrics.NewTimer(),
tHeaderProcessing: metrics.NewTimer(),
tUncleProcessing: metrics.NewTimer(),
tTxAndRecProcessing: metrics.NewTimer(),
tStateStoreCodeProcessing: metrics.NewTimer(),
}
subsys := "indexer"
reg.Register(metricName(subsys, "blocks"), ctx.blocks)
reg.Register(metricName(subsys, "transactions"), ctx.transactions)
reg.Register(metricName(subsys, "receipts"), ctx.receipts)
reg.Register(metricName(subsys, "logs"), ctx.logs)
reg.Register(metricName(subsys, "access_list_entries"), ctx.accessListEntries)
reg.Register(metricName(subsys, "t_free_postgres"), ctx.tFreePostgres)
reg.Register(metricName(subsys, "t_postgres_commit"), ctx.tPostgresCommit)
reg.Register(metricName(subsys, "t_header_processing"), ctx.tHeaderProcessing)
reg.Register(metricName(subsys, "t_uncle_processing"), ctx.tUncleProcessing)
reg.Register(metricName(subsys, "t_tx_receipt_processing"), ctx.tTxAndRecProcessing)
reg.Register(metricName(subsys, "t_state_store_code_processing"), ctx.tStateStoreCodeProcessing)
return ctx
}

View File

@ -25,6 +25,8 @@ import (
"path/filepath"
"strconv"
"github.com/ethereum/go-ethereum/statediff/indexer/database/metrics"
blockstore "github.com/ipfs/go-ipfs-blockstore"
dshelp "github.com/ipfs/go-ipfs-ds-help"
node "github.com/ipfs/go-ipld-format"
@ -254,7 +256,7 @@ func (csw *CSVWriter) upsertHeaderCID(header models.HeaderModel) {
header.TotalDifficulty, header.NodeID, header.Reward, header.StateRoot, header.TxRoot,
header.RctRoot, header.UncleRoot, header.Bloom, strconv.FormatUint(header.Timestamp, 10), header.MhKey, 1, header.Coinbase)
csw.rows <- tableRow{types.TableHeader, values}
indexerMetrics.blocks.Inc(1)
metrics.IndexerMetrics.BlocksCounter.Inc(1)
}
func (csw *CSVWriter) upsertUncleCID(uncle models.UncleModel) {
@ -269,14 +271,14 @@ func (csw *CSVWriter) upsertTransactionCID(transaction models.TxModel) {
values = append(values, transaction.BlockNumber, transaction.HeaderID, transaction.TxHash, transaction.CID, transaction.Dst,
transaction.Src, transaction.Index, transaction.MhKey, transaction.Data, transaction.Type, transaction.Value)
csw.rows <- tableRow{types.TableTransaction, values}
indexerMetrics.transactions.Inc(1)
metrics.IndexerMetrics.TransactionsCounter.Inc(1)
}
func (csw *CSVWriter) upsertAccessListElement(accessListElement models.AccessListElementModel) {
var values []interface{}
values = append(values, accessListElement.BlockNumber, accessListElement.TxID, accessListElement.Index, accessListElement.Address, accessListElement.StorageKeys)
csw.rows <- tableRow{types.TableAccessListElement, values}
indexerMetrics.accessListEntries.Inc(1)
metrics.IndexerMetrics.AccessListEntriesCounter.Inc(1)
}
func (csw *CSVWriter) upsertReceiptCID(rct *models.ReceiptModel) {
@ -284,7 +286,7 @@ func (csw *CSVWriter) upsertReceiptCID(rct *models.ReceiptModel) {
values = append(values, rct.BlockNumber, rct.HeaderID, rct.TxID, rct.LeafCID, rct.Contract, rct.ContractHash, rct.LeafMhKey,
rct.PostState, rct.PostStatus, rct.LogRoot)
csw.rows <- tableRow{types.TableReceipt, values}
indexerMetrics.receipts.Inc(1)
metrics.IndexerMetrics.ReceiptsCounter.Inc(1)
}
func (csw *CSVWriter) upsertLogCID(logs []*models.LogsModel) {
@ -293,7 +295,7 @@ func (csw *CSVWriter) upsertLogCID(logs []*models.LogsModel) {
values = append(values, l.BlockNumber, l.HeaderID, l.LeafCID, l.LeafMhKey, l.ReceiptID, l.Address, l.Index, l.Topic0,
l.Topic1, l.Topic2, l.Topic3, l.Data)
csw.rows <- tableRow{types.TableLog, values}
indexerMetrics.logs.Inc(1)
metrics.IndexerMetrics.LogsCounter.Inc(1)
}
}

View File

@ -34,9 +34,9 @@ import (
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/statediff/indexer/database/metrics"
"github.com/ethereum/go-ethereum/statediff/indexer/interfaces"
ipld2 "github.com/ethereum/go-ethereum/statediff/indexer/ipld"
"github.com/ethereum/go-ethereum/statediff/indexer/models"
@ -53,10 +53,6 @@ const watchedAddressesInsert = "INSERT INTO eth_meta.watched_addresses (address,
var _ interfaces.StateDiffIndexer = &StateDiffIndexer{}
var (
indexerMetrics = RegisterIndexerMetrics(metrics.DefaultRegistry)
)
// StateDiffIndexer satisfies the indexer.StateDiffIndexer interface for ethereum statediff objects on top of a void
type StateDiffIndexer struct {
fileWriter FileWriter
@ -175,12 +171,12 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
BlockNumber: block.Number().String(),
submit: func(self *BatchTx, err error) error {
tDiff := time.Since(t)
indexerMetrics.tStateStoreCodeProcessing.Update(tDiff)
metrics.IndexerMetrics.StateStoreCodeProcessingTimer.Update(tDiff)
traceMsg += fmt.Sprintf("state, storage, and code storage processing time: %s\r\n", tDiff.String())
t = time.Now()
sdi.fileWriter.Flush()
tDiff = time.Since(t)
indexerMetrics.tPostgresCommit.Update(tDiff)
metrics.IndexerMetrics.PostgresCommitTimer.Update(tDiff)
traceMsg += fmt.Sprintf("postgres transaction commit duration: %s\r\n", tDiff.String())
traceMsg += fmt.Sprintf(" TOTAL PROCESSING DURATION: %s\r\n", time.Since(start).String())
log.Debug(traceMsg)
@ -188,21 +184,21 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
},
}
tDiff := time.Since(t)
indexerMetrics.tFreePostgres.Update(tDiff)
metrics.IndexerMetrics.FreePostgresTimer.Update(tDiff)
traceMsg += fmt.Sprintf("time spent waiting for free postgres tx: %s:\r\n", tDiff.String())
t = time.Now()
// write header, collect headerID
headerID := sdi.processHeader(block.Header(), headerNode, reward, totalDifficulty)
tDiff = time.Since(t)
indexerMetrics.tHeaderProcessing.Update(tDiff)
metrics.IndexerMetrics.HeaderProcessingTimer.Update(tDiff)
traceMsg += fmt.Sprintf("header processing time: %s\r\n", tDiff.String())
t = time.Now()
// write uncles
sdi.processUncles(headerID, block.Number(), uncleNodes)
tDiff = time.Since(t)
indexerMetrics.tUncleProcessing.Update(tDiff)
metrics.IndexerMetrics.UncleProcessingTimer.Update(tDiff)
traceMsg += fmt.Sprintf("uncle processing time: %s\r\n", tDiff.String())
t = time.Now()
@ -224,7 +220,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
return nil, err
}
tDiff = time.Since(t)
indexerMetrics.tTxAndRecProcessing.Update(tDiff)
metrics.IndexerMetrics.TxAndRecProcessingTimer.Update(tDiff)
traceMsg += fmt.Sprintf("tx and receipt processing time: %s\r\n", tDiff.String())
t = time.Now()

View File

@ -1,94 +0,0 @@
// VulcanizeDB
// Copyright © 2021 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package file
import (
"strings"
"github.com/ethereum/go-ethereum/metrics"
)
const (
namespace = "statediff"
)
// Build a fully qualified metric name
func metricName(subsystem, name string) string {
if name == "" {
return ""
}
parts := []string{namespace, name}
if subsystem != "" {
parts = []string{namespace, subsystem, name}
}
// Prometheus uses _ but geth metrics uses / and replaces
return strings.Join(parts, "/")
}
type indexerMetricsHandles struct {
// The total number of processed blocks
blocks metrics.Counter
// The total number of processed transactions
transactions metrics.Counter
// The total number of processed receipts
receipts metrics.Counter
// The total number of processed logs
logs metrics.Counter
// The total number of access list entries processed
accessListEntries metrics.Counter
// Time spent waiting for free postgres tx
tFreePostgres metrics.Timer
// Postgres transaction commit duration
tPostgresCommit metrics.Timer
// Header processing time
tHeaderProcessing metrics.Timer
// Uncle processing time
tUncleProcessing metrics.Timer
// Tx and receipt processing time
tTxAndRecProcessing metrics.Timer
// State, storage, and code combined processing time
tStateStoreCodeProcessing metrics.Timer
}
func RegisterIndexerMetrics(reg metrics.Registry) indexerMetricsHandles {
ctx := indexerMetricsHandles{
blocks: metrics.NewCounter(),
transactions: metrics.NewCounter(),
receipts: metrics.NewCounter(),
logs: metrics.NewCounter(),
accessListEntries: metrics.NewCounter(),
tFreePostgres: metrics.NewTimer(),
tPostgresCommit: metrics.NewTimer(),
tHeaderProcessing: metrics.NewTimer(),
tUncleProcessing: metrics.NewTimer(),
tTxAndRecProcessing: metrics.NewTimer(),
tStateStoreCodeProcessing: metrics.NewTimer(),
}
subsys := "indexer"
reg.Register(metricName(subsys, "blocks"), ctx.blocks)
reg.Register(metricName(subsys, "transactions"), ctx.transactions)
reg.Register(metricName(subsys, "receipts"), ctx.receipts)
reg.Register(metricName(subsys, "logs"), ctx.logs)
reg.Register(metricName(subsys, "access_list_entries"), ctx.accessListEntries)
reg.Register(metricName(subsys, "t_free_postgres"), ctx.tFreePostgres)
reg.Register(metricName(subsys, "t_postgres_commit"), ctx.tPostgresCommit)
reg.Register(metricName(subsys, "t_header_processing"), ctx.tHeaderProcessing)
reg.Register(metricName(subsys, "t_uncle_processing"), ctx.tUncleProcessing)
reg.Register(metricName(subsys, "t_tx_receipt_processing"), ctx.tTxAndRecProcessing)
reg.Register(metricName(subsys, "t_state_store_code_processing"), ctx.tStateStoreCodeProcessing)
return ctx
}

View File

@ -24,6 +24,8 @@ import (
"math/big"
"os"
"github.com/ethereum/go-ethereum/statediff/indexer/database/metrics"
blockstore "github.com/ipfs/go-ipfs-blockstore"
dshelp "github.com/ipfs/go-ipfs-ds-help"
node "github.com/ipfs/go-ipld-format"
@ -214,7 +216,7 @@ func (sqw *SQLWriter) upsertHeaderCID(header models.HeaderModel) {
header.TotalDifficulty, header.NodeID, header.Reward, header.StateRoot, header.TxRoot,
header.RctRoot, header.UncleRoot, header.Bloom, header.Timestamp, header.MhKey, 1, header.Coinbase)
sqw.stmts <- []byte(stmt)
indexerMetrics.blocks.Inc(1)
metrics.IndexerMetrics.BlocksCounter.Inc(1)
}
func (sqw *SQLWriter) upsertUncleCID(uncle models.UncleModel) {
@ -225,26 +227,26 @@ func (sqw *SQLWriter) upsertUncleCID(uncle models.UncleModel) {
func (sqw *SQLWriter) upsertTransactionCID(transaction models.TxModel) {
sqw.stmts <- []byte(fmt.Sprintf(txInsert, transaction.BlockNumber, transaction.HeaderID, transaction.TxHash, transaction.CID, transaction.Dst,
transaction.Src, transaction.Index, transaction.MhKey, transaction.Data, transaction.Type, transaction.Value))
indexerMetrics.transactions.Inc(1)
metrics.IndexerMetrics.TransactionsCounter.Inc(1)
}
func (sqw *SQLWriter) upsertAccessListElement(accessListElement models.AccessListElementModel) {
sqw.stmts <- []byte(fmt.Sprintf(alInsert, accessListElement.BlockNumber, accessListElement.TxID, accessListElement.Index, accessListElement.Address,
formatPostgresStringArray(accessListElement.StorageKeys)))
indexerMetrics.accessListEntries.Inc(1)
metrics.IndexerMetrics.AccessListEntriesCounter.Inc(1)
}
func (sqw *SQLWriter) upsertReceiptCID(rct *models.ReceiptModel) {
sqw.stmts <- []byte(fmt.Sprintf(rctInsert, rct.BlockNumber, rct.HeaderID, rct.TxID, rct.LeafCID, rct.Contract, rct.ContractHash, rct.LeafMhKey,
rct.PostState, rct.PostStatus, rct.LogRoot))
indexerMetrics.receipts.Inc(1)
metrics.IndexerMetrics.ReceiptsCounter.Inc(1)
}
func (sqw *SQLWriter) upsertLogCID(logs []*models.LogsModel) {
for _, l := range logs {
sqw.stmts <- []byte(fmt.Sprintf(logInsert, l.BlockNumber, l.HeaderID, l.LeafCID, l.LeafMhKey, l.ReceiptID, l.Address, l.Index, l.Topic0,
l.Topic1, l.Topic2, l.Topic3, l.Data))
indexerMetrics.logs.Inc(1)
metrics.IndexerMetrics.LogsCounter.Inc(1)
}
}

View File

@ -14,10 +14,13 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package sql
package metrics
import (
"strings"
"time"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics"
)
@ -26,6 +29,11 @@ const (
namespace = "statediff"
)
var (
IndexerMetrics = RegisterIndexerMetrics(metrics.DefaultRegistry)
DBMetrics = RegisterDBMetrics(metrics.DefaultRegistry)
)
// Build a fully qualified metric name
func metricName(subsystem, name string) string {
if name == "" {
@ -39,57 +47,59 @@ func metricName(subsystem, name string) string {
return strings.Join(parts, "/")
}
type indexerMetricsHandles struct {
// The total number of processed blocks
blocks metrics.Counter
type IndexerMetricsHandles struct {
// The total number of processed BlocksCounter
BlocksCounter metrics.Counter
// The total number of processed transactions
transactions metrics.Counter
TransactionsCounter metrics.Counter
// The total number of processed receipts
receipts metrics.Counter
ReceiptsCounter metrics.Counter
// The total number of processed logs
logs metrics.Counter
LogsCounter metrics.Counter
// The total number of access list entries processed
accessListEntries metrics.Counter
AccessListEntriesCounter metrics.Counter
// Time spent waiting for free postgres tx
tFreePostgres metrics.Timer
FreePostgresTimer metrics.Timer
// Postgres transaction commit duration
tPostgresCommit metrics.Timer
PostgresCommitTimer metrics.Timer
// Header processing time
tHeaderProcessing metrics.Timer
HeaderProcessingTimer metrics.Timer
// Uncle processing time
tUncleProcessing metrics.Timer
UncleProcessingTimer metrics.Timer
// Tx and receipt processing time
tTxAndRecProcessing metrics.Timer
TxAndRecProcessingTimer metrics.Timer
// State, storage, and code combined processing time
tStateStoreCodeProcessing metrics.Timer
StateStoreCodeProcessingTimer metrics.Timer
}
func RegisterIndexerMetrics(reg metrics.Registry) indexerMetricsHandles {
ctx := indexerMetricsHandles{
blocks: metrics.NewCounter(),
transactions: metrics.NewCounter(),
receipts: metrics.NewCounter(),
logs: metrics.NewCounter(),
accessListEntries: metrics.NewCounter(),
tFreePostgres: metrics.NewTimer(),
tPostgresCommit: metrics.NewTimer(),
tHeaderProcessing: metrics.NewTimer(),
tUncleProcessing: metrics.NewTimer(),
tTxAndRecProcessing: metrics.NewTimer(),
tStateStoreCodeProcessing: metrics.NewTimer(),
func RegisterIndexerMetrics(reg metrics.Registry) IndexerMetricsHandles {
ctx := IndexerMetricsHandles{
BlocksCounter: metrics.NewCounter(),
TransactionsCounter: metrics.NewCounter(),
ReceiptsCounter: metrics.NewCounter(),
LogsCounter: metrics.NewCounter(),
AccessListEntriesCounter: metrics.NewCounter(),
FreePostgresTimer: metrics.NewTimer(),
PostgresCommitTimer: metrics.NewTimer(),
HeaderProcessingTimer: metrics.NewTimer(),
UncleProcessingTimer: metrics.NewTimer(),
TxAndRecProcessingTimer: metrics.NewTimer(),
StateStoreCodeProcessingTimer: metrics.NewTimer(),
}
subsys := "indexer"
reg.Register(metricName(subsys, "blocks"), ctx.blocks)
reg.Register(metricName(subsys, "transactions"), ctx.transactions)
reg.Register(metricName(subsys, "receipts"), ctx.receipts)
reg.Register(metricName(subsys, "logs"), ctx.logs)
reg.Register(metricName(subsys, "access_list_entries"), ctx.accessListEntries)
reg.Register(metricName(subsys, "t_free_postgres"), ctx.tFreePostgres)
reg.Register(metricName(subsys, "t_postgres_commit"), ctx.tPostgresCommit)
reg.Register(metricName(subsys, "t_header_processing"), ctx.tHeaderProcessing)
reg.Register(metricName(subsys, "t_uncle_processing"), ctx.tUncleProcessing)
reg.Register(metricName(subsys, "t_tx_receipt_processing"), ctx.tTxAndRecProcessing)
reg.Register(metricName(subsys, "t_state_store_code_processing"), ctx.tStateStoreCodeProcessing)
reg.Register(metricName(subsys, "blocks"), ctx.BlocksCounter)
reg.Register(metricName(subsys, "transactions"), ctx.TransactionsCounter)
reg.Register(metricName(subsys, "receipts"), ctx.ReceiptsCounter)
reg.Register(metricName(subsys, "logs"), ctx.LogsCounter)
reg.Register(metricName(subsys, "access_list_entries"), ctx.AccessListEntriesCounter)
reg.Register(metricName(subsys, "t_free_postgres"), ctx.FreePostgresTimer)
reg.Register(metricName(subsys, "t_postgres_commit"), ctx.PostgresCommitTimer)
reg.Register(metricName(subsys, "t_header_processing"), ctx.HeaderProcessingTimer)
reg.Register(metricName(subsys, "t_uncle_processing"), ctx.UncleProcessingTimer)
reg.Register(metricName(subsys, "t_tx_receipt_processing"), ctx.TxAndRecProcessingTimer)
reg.Register(metricName(subsys, "t_state_store_code_processing"), ctx.StateStoreCodeProcessingTimer)
log.Debug("Registering statediff indexer metrics.")
return ctx
}
@ -132,10 +142,24 @@ func RegisterDBMetrics(reg metrics.Registry) dbMetricsHandles {
reg.Register(metricName(subsys, "blocked_milliseconds"), ctx.blockedMilliseconds)
reg.Register(metricName(subsys, "closed_max_idle"), ctx.closedMaxIdle)
reg.Register(metricName(subsys, "closed_max_lifetime"), ctx.closedMaxLifetime)
log.Debug("Registering statediff DB metrics.")
return ctx
}
func (met *dbMetricsHandles) Update(stats Stats) {
// DbStats interface to accommodate different concrete sql stats types
type DbStats interface {
MaxOpen() int64
Open() int64
InUse() int64
Idle() int64
WaitCount() int64
WaitDuration() time.Duration
MaxIdleClosed() int64
MaxLifetimeClosed() int64
}
func (met *dbMetricsHandles) Update(stats DbStats) {
met.maxOpen.Update(stats.MaxOpen())
met.open.Update(stats.Open())
met.inUse.Update(stats.InUse())

View File

@ -36,6 +36,7 @@ import (
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/rlp"
metrics2 "github.com/ethereum/go-ethereum/statediff/indexer/database/metrics"
"github.com/ethereum/go-ethereum/statediff/indexer/interfaces"
ipld2 "github.com/ethereum/go-ethereum/statediff/indexer/ipld"
"github.com/ethereum/go-ethereum/statediff/indexer/models"
@ -45,11 +46,6 @@ import (
var _ interfaces.StateDiffIndexer = &StateDiffIndexer{}
var (
indexerMetrics = RegisterIndexerMetrics(metrics.DefaultRegistry)
dbMetrics = RegisterDBMetrics(metrics.DefaultRegistry)
)
// StateDiffIndexer satisfies the indexer.StateDiffIndexer interface for ethereum statediff objects on top of an SQL sql
type StateDiffIndexer struct {
ctx context.Context
@ -76,7 +72,7 @@ func (sdi *StateDiffIndexer) ReportDBMetrics(delay time.Duration, quit <-chan bo
for {
select {
case <-ticker.C:
dbMetrics.Update(sdi.dbWriter.db.Stats())
metrics2.DBMetrics.Update(sdi.dbWriter.db.Stats())
case <-quit:
ticker.Stop()
return
@ -160,7 +156,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
rollback(sdi.ctx, tx)
} else {
tDiff := time.Since(t)
indexerMetrics.tStateStoreCodeProcessing.Update(tDiff)
metrics2.IndexerMetrics.StateStoreCodeProcessingTimer.Update(tDiff)
traceMsg += fmt.Sprintf("state, storage, and code storage processing time: %s\r\n", tDiff.String())
t = time.Now()
if err := self.flush(); err != nil {
@ -171,7 +167,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
}
err = tx.Commit(sdi.ctx)
tDiff = time.Since(t)
indexerMetrics.tPostgresCommit.Update(tDiff)
metrics2.IndexerMetrics.PostgresCommitTimer.Update(tDiff)
traceMsg += fmt.Sprintf("postgres transaction commit duration: %s\r\n", tDiff.String())
}
traceMsg += fmt.Sprintf(" TOTAL PROCESSING DURATION: %s\r\n", time.Since(start).String())
@ -182,7 +178,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
go blockTx.cache()
tDiff := time.Since(t)
indexerMetrics.tFreePostgres.Update(tDiff)
metrics2.IndexerMetrics.FreePostgresTimer.Update(tDiff)
traceMsg += fmt.Sprintf("time spent waiting for free postgres tx: %s:\r\n", tDiff.String())
t = time.Now()
@ -194,7 +190,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
return nil, err
}
tDiff = time.Since(t)
indexerMetrics.tHeaderProcessing.Update(tDiff)
metrics2.IndexerMetrics.HeaderProcessingTimer.Update(tDiff)
traceMsg += fmt.Sprintf("header processing time: %s\r\n", tDiff.String())
t = time.Now()
// Publish and index uncles
@ -203,7 +199,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
return nil, err
}
tDiff = time.Since(t)
indexerMetrics.tUncleProcessing.Update(tDiff)
metrics2.IndexerMetrics.UncleProcessingTimer.Update(tDiff)
traceMsg += fmt.Sprintf("uncle processing time: %s\r\n", tDiff.String())
t = time.Now()
// Publish and index receipts and txs
@ -224,7 +220,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
return nil, err
}
tDiff = time.Since(t)
indexerMetrics.tTxAndRecProcessing.Update(tDiff)
metrics2.IndexerMetrics.TxAndRecProcessingTimer.Update(tDiff)
traceMsg += fmt.Sprintf("tx and receipt processing time: %s\r\n", tDiff.String())
t = time.Now()

View File

@ -19,7 +19,8 @@ package sql
import (
"context"
"io"
"time"
"github.com/ethereum/go-ethereum/statediff/indexer/database/metrics"
)
// Database interfaces required by the sql indexer
@ -35,7 +36,7 @@ type Driver interface {
Select(ctx context.Context, dest interface{}, query string, args ...interface{}) error
Get(ctx context.Context, dest interface{}, query string, args ...interface{}) error
Begin(ctx context.Context) (Tx, error)
Stats() Stats
Stats() metrics.DbStats
NodeID() string
Context() context.Context
io.Closer
@ -74,15 +75,3 @@ type ScannableRow interface {
type Result interface {
RowsAffected() (int64, error)
}
// Stats interface to accommodate different concrete sql stats types
type Stats interface {
MaxOpen() int64
Open() int64
InUse() int64
Idle() int64
WaitCount() int64
WaitDuration() time.Duration
MaxIdleClosed() int64
MaxLifetimeClosed() int64
}

View File

@ -27,6 +27,7 @@ import (
"github.com/jackc/pgx/v4"
"github.com/jackc/pgx/v4/pgxpool"
"github.com/ethereum/go-ethereum/statediff/indexer/database/metrics"
"github.com/ethereum/go-ethereum/statediff/indexer/database/sql"
"github.com/ethereum/go-ethereum/statediff/indexer/node"
)
@ -139,7 +140,7 @@ func (pgx *PGXDriver) Begin(ctx context.Context) (sql.Tx, error) {
return pgxTxWrapper{tx: tx}, nil
}
func (pgx *PGXDriver) Stats() sql.Stats {
func (pgx *PGXDriver) Stats() metrics.DbStats {
stats := pgx.pool.Stat()
return pgxStatsWrapper{stats: stats}
}
@ -173,43 +174,43 @@ type pgxStatsWrapper struct {
stats *pgxpool.Stat
}
// MaxOpen satisfies sql.Stats
// MaxOpen satisfies metrics.DbStats
func (s pgxStatsWrapper) MaxOpen() int64 {
return int64(s.stats.MaxConns())
}
// Open satisfies sql.Stats
// Open satisfies metrics.DbStats
func (s pgxStatsWrapper) Open() int64 {
return int64(s.stats.TotalConns())
}
// InUse satisfies sql.Stats
// InUse satisfies metrics.DbStats
func (s pgxStatsWrapper) InUse() int64 {
return int64(s.stats.AcquiredConns())
}
// Idle satisfies sql.Stats
// Idle satisfies metrics.DbStats
func (s pgxStatsWrapper) Idle() int64 {
return int64(s.stats.IdleConns())
}
// WaitCount satisfies sql.Stats
// WaitCount satisfies metrics.DbStats
func (s pgxStatsWrapper) WaitCount() int64 {
return s.stats.EmptyAcquireCount()
}
// WaitDuration satisfies sql.Stats
// WaitDuration satisfies metrics.DbStats
func (s pgxStatsWrapper) WaitDuration() time.Duration {
return s.stats.AcquireDuration()
}
// MaxIdleClosed satisfies sql.Stats
// MaxIdleClosed satisfies metrics.DbStats
func (s pgxStatsWrapper) MaxIdleClosed() int64 {
// this stat isn't supported by pgxpool, but we don't want to panic
return 0
}
// MaxLifetimeClosed satisfies sql.Stats
// MaxLifetimeClosed satisfies metrics.DbStats
func (s pgxStatsWrapper) MaxLifetimeClosed() int64 {
return s.stats.CanceledAcquireCount()
}

View File

@ -23,6 +23,7 @@ import (
"github.com/jmoiron/sqlx"
"github.com/ethereum/go-ethereum/statediff/indexer/database/metrics"
"github.com/ethereum/go-ethereum/statediff/indexer/database/sql"
"github.com/ethereum/go-ethereum/statediff/indexer/node"
)
@ -98,7 +99,7 @@ func (driver *SQLXDriver) Begin(_ context.Context) (sql.Tx, error) {
return sqlxTxWrapper{tx: tx}, nil
}
func (driver *SQLXDriver) Stats() sql.Stats {
func (driver *SQLXDriver) Stats() metrics.DbStats {
stats := driver.db.Stats()
return sqlxStatsWrapper{stats: stats}
}
@ -122,42 +123,42 @@ type sqlxStatsWrapper struct {
stats coresql.DBStats
}
// MaxOpen satisfies sql.Stats
// MaxOpen satisfies metrics.DbStats
func (s sqlxStatsWrapper) MaxOpen() int64 {
return int64(s.stats.MaxOpenConnections)
}
// Open satisfies sql.Stats
// Open satisfies metrics.DbStats
func (s sqlxStatsWrapper) Open() int64 {
return int64(s.stats.OpenConnections)
}
// InUse satisfies sql.Stats
// InUse satisfies metrics.DbStats
func (s sqlxStatsWrapper) InUse() int64 {
return int64(s.stats.InUse)
}
// Idle satisfies sql.Stats
// Idle satisfies metrics.DbStats
func (s sqlxStatsWrapper) Idle() int64 {
return int64(s.stats.Idle)
}
// WaitCount satisfies sql.Stats
// WaitCount satisfies metrics.DbStats
func (s sqlxStatsWrapper) WaitCount() int64 {
return s.stats.WaitCount
}
// WaitDuration satisfies sql.Stats
// WaitDuration satisfies metrics.DbStats
func (s sqlxStatsWrapper) WaitDuration() time.Duration {
return s.stats.WaitDuration
}
// MaxIdleClosed satisfies sql.Stats
// MaxIdleClosed satisfies metrics.DbStats
func (s sqlxStatsWrapper) MaxIdleClosed() int64 {
return s.stats.MaxIdleClosed
}
// MaxLifetimeClosed satisfies sql.Stats
// MaxLifetimeClosed satisfies metrics.DbStats
func (s sqlxStatsWrapper) MaxLifetimeClosed() int64 {
return s.stats.MaxLifetimeClosed
}

View File

@ -19,6 +19,8 @@ package sql
import (
"fmt"
"github.com/ethereum/go-ethereum/statediff/indexer/database/metrics"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/statediff/indexer/models"
)
@ -57,7 +59,7 @@ func (w *Writer) upsertHeaderCID(tx Tx, header models.HeaderModel) error {
if err != nil {
return insertError{"eth.header_cids", err, w.db.InsertHeaderStm(), header}
}
indexerMetrics.blocks.Inc(1)
metrics.IndexerMetrics.BlocksCounter.Inc(1)
return nil
}
@ -85,7 +87,7 @@ func (w *Writer) upsertTransactionCID(tx Tx, transaction models.TxModel) error {
if err != nil {
return insertError{"eth.transaction_cids", err, w.db.InsertTxStm(), transaction}
}
indexerMetrics.transactions.Inc(1)
metrics.IndexerMetrics.TransactionsCounter.Inc(1)
return nil
}
@ -100,7 +102,7 @@ func (w *Writer) upsertAccessListElement(tx Tx, accessListElement models.AccessL
if err != nil {
return insertError{"eth.access_list_elements", err, w.db.InsertAccessListElementStm(), accessListElement}
}
indexerMetrics.accessListEntries.Inc(1)
metrics.IndexerMetrics.AccessListEntriesCounter.Inc(1)
return nil
}
@ -115,7 +117,7 @@ func (w *Writer) upsertReceiptCID(tx Tx, rct *models.ReceiptModel) error {
if err != nil {
return insertError{"eth.receipt_cids", err, w.db.InsertRctStm(), *rct}
}
indexerMetrics.receipts.Inc(1)
metrics.IndexerMetrics.ReceiptsCounter.Inc(1)
return nil
}
@ -131,7 +133,7 @@ func (w *Writer) upsertLogCID(tx Tx, logs []*models.LogsModel) error {
if err != nil {
return insertError{"eth.log_cids", err, w.db.InsertLogStm(), *log}
}
indexerMetrics.logs.Inc(1)
metrics.IndexerMetrics.LogsCounter.Inc(1)
}
return nil
}