Statediff Prometheus metrics refactor #39

Merged
telackey merged 5 commits from statediff-prometheus into v1.9.24-statediff 2020-11-24 07:29:52 +00:00
6 changed files with 169 additions and 315 deletions
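This PR drops the indexer's standalone prom package (the prometheus/client_golang counters, histograms, and DBStats collector deleted below) and replaces it with counter, gauge, and timer handles registered on go-ethereum's own metrics registry, defined in a new metrics.go in the indexer package. Indexer timings and counts, along with database connection-pool stats, are now gated by geth's metrics.Enabled flag like any other geth metric. As a hedged sketch of how the refactored metrics could still reach Prometheus, the snippet below uses geth's metrics/prometheus exporter package; the import path and Handler signature are assumptions based on go-ethereum v1.9.x and are not part of this diff. In practice a node started with --metrics and --metrics.addr already serves the same endpoint.

package main

import (
	"log"
	"net/http"

	"github.com/ethereum/go-ethereum/metrics"
	gethprom "github.com/ethereum/go-ethereum/metrics/prometheus"
)

func main() {
	// Everything registered on the default registry, including the new
	// indexer/* and indexer/connections/* metrics from this PR, is served
	// in Prometheus text format by geth's exporter handler.
	http.Handle("/debug/metrics/prometheus", gethprom.Handler(metrics.DefaultRegistry))
	log.Fatal(http.ListenAndServe("127.0.0.1:6060", nil))
}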

View File

@@ -14,6 +14,8 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
// This package provides an interface for pushing and indexing IPLD objects into a Postgres database
// Metrics for reporting processing and connection stats are defined in ./metrics.go
package indexer
import (
@@ -26,6 +28,7 @@ import (
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/rlp"
@@ -36,16 +39,21 @@ import (
"github.com/ethereum/go-ethereum/statediff/indexer/ipfs/ipld"
"github.com/ethereum/go-ethereum/statediff/indexer/models"
"github.com/ethereum/go-ethereum/statediff/indexer/postgres"
"github.com/ethereum/go-ethereum/statediff/indexer/prom"
"github.com/ethereum/go-ethereum/statediff/indexer/shared"
sdtypes "github.com/ethereum/go-ethereum/statediff/types"
)
var (
indexerMetrics = RegisterIndexerMetrics(metrics.DefaultRegistry)
dbMetrics = RegisterDBMetrics(metrics.DefaultRegistry)
)
// Indexer interface to allow substitution of mocks for testing
type Indexer interface {
PushBlock(block *types.Block, receipts types.Receipts, totalDifficulty *big.Int) (*BlockTx, error)
PushStateNode(tx *BlockTx, stateNode sdtypes.StateNode) error
PushCodeAndCodeHash(tx *BlockTx, codeAndCodeHash sdtypes.CodeAndCodeHash) error
ReportDBMetrics(delay time.Duration, quit <-chan bool)
}
// StateDiffIndexer satisfies the Indexer interface for ethereum statediff objects
@@ -70,6 +78,25 @@ type BlockTx struct {
Close func() error
}
// Reporting function to run as goroutine
func (sdi *StateDiffIndexer) ReportDBMetrics(delay time.Duration, quit <-chan bool) {
if !metrics.Enabled {
return
}
ticker := time.NewTicker(delay)
go func() {
for {
select {
case <-ticker.C:
dbMetrics.Update(sdi.dbWriter.db.Stats())
case <-quit:
ticker.Stop()
return
}
}
}()
}
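ReportDBMetrics is a no-op when geth metrics are disabled; otherwise it starts a ticker-driven goroutine that copies the connection pool's sql.DBStats into the registered gauges and counters until the quit channel fires. A minimal caller sketch follows; the variable names and the 10-second interval are illustrative only, and the real wiring is in the statediff service's Start() (shown further down), which uses a 5-second interval and the service quit channel.

// Illustrative wiring only; db is an assumed *postgres.DB handle.
sdi := indexer.NewStateDiffIndexer(params.MainnetChainConfig, db)
quit := make(chan bool)
sdi.ReportDBMetrics(10*time.Second, quit)
// ...
close(quit) // stops the reporting goroutine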
// Pushes and indexes block data in database, except state & storage nodes (includes header, uncles, transactions & receipts)
// Returns an initiated DB transaction which must be Closed via defer to commit or rollback
func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receipts, totalDifficulty *big.Int) (*BlockTx, error) {
@@ -109,12 +136,12 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
panic(p)
} else {
tDiff := time.Now().Sub(t)
prom.SetTimeMetric("t_state_store_code_processing", tDiff)
indexerMetrics.tStateStoreCodeProcessing.Update(tDiff)
traceMsg += fmt.Sprintf("state, storage, and code storage processing time: %s\r\n", tDiff.String())
t = time.Now()
err = tx.Commit()
tDiff = time.Now().Sub(t)
prom.SetTimeMetric("t_postgres_commit", tDiff)
indexerMetrics.tPostgresCommit.Update(tDiff)
traceMsg += fmt.Sprintf("postgres transaction commit duration: %s\r\n", tDiff.String())
}
traceMsg += fmt.Sprintf(" TOTAL PROCESSING DURATION: %s\r\n", time.Now().Sub(start).String())
@@ -123,7 +150,8 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
},
}
tDiff := time.Now().Sub(t)
prom.SetTimeMetric("t_free_postgres", tDiff)
indexerMetrics.tFreePostgres.Update(tDiff)
traceMsg += fmt.Sprintf("time spent waiting for free postgres tx: %s:\r\n", tDiff.String())
t = time.Now()
@@ -133,7 +161,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
return nil, err
}
tDiff = time.Now().Sub(t)
prom.SetTimeMetric("t_header_processing", tDiff)
indexerMetrics.tHeaderProcessing.Update(tDiff)
traceMsg += fmt.Sprintf("header processing time: %s\r\n", tDiff.String())
t = time.Now()
// Publish and index uncles
@@ -141,7 +169,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
return nil, err
}
tDiff = time.Now().Sub(t)
prom.SetTimeMetric("t_uncle_processing", tDiff)
indexerMetrics.tUncleProcessing.Update(tDiff)
traceMsg += fmt.Sprintf("uncle processing time: %s\r\n", tDiff.String())
t = time.Now()
// Publish and index receipts and txs
@@ -158,7 +186,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
return nil, err
}
tDiff = time.Now().Sub(t)
prom.SetTimeMetric("t_tx_receipt_processing", tDiff)
indexerMetrics.tTxAndRecProcessing.Update(tDiff)
traceMsg += fmt.Sprintf("tx and receipt processing time: %s\r\n", tDiff.String())
t = time.Now()

View File

@@ -0,0 +1,124 @@
package indexer
import (
"database/sql"
"strings"
"github.com/ethereum/go-ethereum/metrics"
)
const (
indexerNamespace = "indexer"
)
// Build a fully qualified metric name
func metricName(subsystem, name string) string {
if name == "" {
return ""
}
parts := []string{indexerNamespace, name}
if subsystem != "" {
parts = []string{indexerNamespace, subsystem, name}
}
// Prometheus uses _ but geth metrics uses /, which gets replaced on export
return strings.Join(parts, "/")
}
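metricName joins the indexer namespace, an optional subsystem, and the metric name with "/", the separator geth's metrics registry uses; when exported to Prometheus the slashes are typically rewritten to underscores, so these names would surface as indexer_blocks, indexer_connections_open, and so on. The calls and expected outputs below are illustrative, not part of the PR.

// Illustrative expected outputs of metricName as defined above.
fmt.Println(metricName("", "blocks"))          // "indexer/blocks"
fmt.Println(metricName("connections", "open")) // "indexer/connections/open"
fmt.Println(metricName("connections", ""))     // "" (an empty name short-circuits)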
type indexerMetricsHandles struct {
// The total number of processed blocks
blocks metrics.Counter
// The total number of processed transactions
transactions metrics.Counter
// The total number of processed receipts
receipts metrics.Counter
// Time spent waiting for free postgres tx
tFreePostgres metrics.Timer
// Postgres transaction commit duration
tPostgresCommit metrics.Timer
// Header processing time
tHeaderProcessing metrics.Timer
// Uncle processing time
tUncleProcessing metrics.Timer
// Tx and receipt processing time
tTxAndRecProcessing metrics.Timer
// State, storage, and code combined processing time
tStateStoreCodeProcessing metrics.Timer
}
func RegisterIndexerMetrics(reg metrics.Registry) indexerMetricsHandles {
ctx := indexerMetricsHandles{
blocks: metrics.NewCounter(),
transactions: metrics.NewCounter(),
receipts: metrics.NewCounter(),
tFreePostgres: metrics.NewTimer(),
tPostgresCommit: metrics.NewTimer(),
tHeaderProcessing: metrics.NewTimer(),
tUncleProcessing: metrics.NewTimer(),
tTxAndRecProcessing: metrics.NewTimer(),
tStateStoreCodeProcessing: metrics.NewTimer(),
}
subsys := "" // todo
reg.Register(metricName(subsys, "blocks"), ctx.blocks)
reg.Register(metricName(subsys, "transactions"), ctx.transactions)
reg.Register(metricName(subsys, "receipts"), ctx.receipts)
reg.Register(metricName(subsys, "t_free_postgres"), ctx.tFreePostgres)
reg.Register(metricName(subsys, "t_postgres_commit"), ctx.tPostgresCommit)
reg.Register(metricName(subsys, "t_header_processing"), ctx.tHeaderProcessing)
reg.Register(metricName(subsys, "t_uncle_processing"), ctx.tUncleProcessing)
reg.Register(metricName(subsys, "t_tx_receipt_processing"), ctx.tTxAndRecProcessing)
reg.Register(metricName(subsys, "t_state_store_code_processing"), ctx.tStateStoreCodeProcessing)
return ctx
}
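The handles returned by RegisterIndexerMetrics are plain geth metrics Counters and Timers; the indexer.go and writer.go hunks in this PR increment and update them directly. Anything registered can also be looked up by its fully qualified name, which is handy in tests. The sketch below is standard geth metrics usage rather than code from this PR; note that metrics.NewCounter returns a no-op counter unless geth metrics are enabled.

// Hedged test-style sketch: register on a fresh registry and read a metric back.
reg := metrics.NewRegistry()
m := RegisterIndexerMetrics(reg)
m.blocks.Inc(1)

if c, ok := reg.Get("indexer/blocks").(metrics.Counter); ok {
	fmt.Println(c.Count()) // 1 when metrics.Enabled is true, 0 otherwise
}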
type dbMetricsHandles struct {
// Maximum number of open connections to the database
maxOpen metrics.Gauge
// The number of established connections both in use and idle
open metrics.Gauge
// The number of connections currently in use
inUse metrics.Gauge
// The number of idle connections
idle metrics.Gauge
// The total number of connections waited for
waitedFor metrics.Counter
// The total time blocked waiting for a new connection
blockedMilliseconds metrics.Counter
// The total number of connections closed due to SetMaxIdleConns
closedMaxIdle metrics.Counter
// The total number of connections closed due to SetConnMaxLifetime
closedMaxLifetime metrics.Counter
}
func RegisterDBMetrics(reg metrics.Registry) dbMetricsHandles {
ctx := dbMetricsHandles{
maxOpen: metrics.NewGauge(),
open: metrics.NewGauge(),
inUse: metrics.NewGauge(),
idle: metrics.NewGauge(),
waitedFor: metrics.NewCounter(),
blockedMilliseconds: metrics.NewCounter(),
closedMaxIdle: metrics.NewCounter(),
closedMaxLifetime: metrics.NewCounter(),
}
subsys := "connections"
reg.Register(metricName(subsys, "max_open"), ctx.maxOpen)
reg.Register(metricName(subsys, "open"), ctx.open)
reg.Register(metricName(subsys, "in_use"), ctx.inUse)
reg.Register(metricName(subsys, "idle"), ctx.idle)
reg.Register(metricName(subsys, "waited_for"), ctx.waitedFor)
reg.Register(metricName(subsys, "blocked_milliseconds"), ctx.blockedMilliseconds)
reg.Register(metricName(subsys, "closed_max_idle"), ctx.closedMaxIdle)
reg.Register(metricName(subsys, "closed_max_lifetime"), ctx.closedMaxLifetime)
return ctx
}
func (met *dbMetricsHandles) Update(stats sql.DBStats) {
met.maxOpen.Update(int64(stats.MaxOpenConnections))
met.open.Update(int64(stats.OpenConnections))
met.inUse.Update(int64(stats.InUse))
met.idle.Update(int64(stats.Idle))
met.waitedFor.Inc(int64(stats.WaitCount))
met.blockedMilliseconds.Inc(int64(stats.WaitDuration.Milliseconds()))
met.closedMaxIdle.Inc(int64(stats.MaxIdleClosed))
met.closedMaxLifetime.Inc(int64(stats.MaxLifetimeClosed))
}
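Update copies a sql.DBStats snapshot into the handles on every reporting tick: the pool-size fields map onto gauges, while WaitCount, WaitDuration, MaxIdleClosed, and MaxLifetimeClosed feed counters. Because database/sql documents those four fields as cumulative totals, incrementing by the raw value on each tick over-counts; a delta-tracking variant is sketched below purely as a hedged suggestion and is not part of this PR.

// Hedged alternative (not in this PR): remember the previous snapshot and
// increment the counters by the delta, since WaitCount, WaitDuration,
// MaxIdleClosed, and MaxLifetimeClosed are cumulative totals in database/sql.
type dbStatsReporter struct {
	met  dbMetricsHandles
	prev sql.DBStats
}

func (r *dbStatsReporter) update(stats sql.DBStats) {
	r.met.maxOpen.Update(int64(stats.MaxOpenConnections))
	r.met.open.Update(int64(stats.OpenConnections))
	r.met.inUse.Update(int64(stats.InUse))
	r.met.idle.Update(int64(stats.Idle))
	r.met.waitedFor.Inc(stats.WaitCount - r.prev.WaitCount)
	r.met.blockedMilliseconds.Inc((stats.WaitDuration - r.prev.WaitDuration).Milliseconds())
	r.met.closedMaxIdle.Inc(stats.MaxIdleClosed - r.prev.MaxIdleClosed)
	r.met.closedMaxLifetime.Inc(stats.MaxLifetimeClosed - r.prev.MaxLifetimeClosed)
	r.prev = stats
}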

View File

@@ -1,146 +0,0 @@
package prom
import (
"database/sql"
"github.com/prometheus/client_golang/prometheus"
)
const (
namespace = "ipld_eth_indexer"
subsystem = "connections"
)
// DBStatsGetter is an interface that gets sql.DBStats.
type DBStatsGetter interface {
Stats() sql.DBStats
}
// DBStatsCollector implements the prometheus.Collector interface.
type DBStatsCollector struct {
sg DBStatsGetter
// descriptions of exported metrics
maxOpenDesc *prometheus.Desc
openDesc *prometheus.Desc
inUseDesc *prometheus.Desc
idleDesc *prometheus.Desc
waitedForDesc *prometheus.Desc
blockedSecondsDesc *prometheus.Desc
closedMaxIdleDesc *prometheus.Desc
closedMaxLifetimeDesc *prometheus.Desc
}
// NewDBStatsCollector creates a new DBStatsCollector.
func NewDBStatsCollector(dbName string, sg DBStatsGetter) *DBStatsCollector {
labels := prometheus.Labels{"db_name": dbName}
return &DBStatsCollector{
sg: sg,
maxOpenDesc: prometheus.NewDesc(
prometheus.BuildFQName(namespace, subsystem, "max_open"),
"Maximum number of open connections to the database.",
nil,
labels,
),
openDesc: prometheus.NewDesc(
prometheus.BuildFQName(namespace, subsystem, "open"),
"The number of established connections both in use and idle.",
nil,
labels,
),
inUseDesc: prometheus.NewDesc(
prometheus.BuildFQName(namespace, subsystem, "in_use"),
"The number of connections currently in use.",
nil,
labels,
),
idleDesc: prometheus.NewDesc(
prometheus.BuildFQName(namespace, subsystem, "idle"),
"The number of idle connections.",
nil,
labels,
),
waitedForDesc: prometheus.NewDesc(
prometheus.BuildFQName(namespace, subsystem, "waited_for"),
"The total number of connections waited for.",
nil,
labels,
),
blockedSecondsDesc: prometheus.NewDesc(
prometheus.BuildFQName(namespace, subsystem, "blocked_seconds"),
"The total time blocked waiting for a new connection.",
nil,
labels,
),
closedMaxIdleDesc: prometheus.NewDesc(
prometheus.BuildFQName(namespace, subsystem, "closed_max_idle"),
"The total number of connections closed due to SetMaxIdleConns.",
nil,
labels,
),
closedMaxLifetimeDesc: prometheus.NewDesc(
prometheus.BuildFQName(namespace, subsystem, "closed_max_lifetime"),
"The total number of connections closed due to SetConnMaxLifetime.",
nil,
labels,
),
}
}
// Describe implements the prometheus.Collector interface.
func (c DBStatsCollector) Describe(ch chan<- *prometheus.Desc) {
ch <- c.maxOpenDesc
ch <- c.openDesc
ch <- c.inUseDesc
ch <- c.idleDesc
ch <- c.waitedForDesc
ch <- c.blockedSecondsDesc
ch <- c.closedMaxIdleDesc
ch <- c.closedMaxLifetimeDesc
}
// Collect implements the prometheus.Collector interface.
func (c DBStatsCollector) Collect(ch chan<- prometheus.Metric) {
stats := c.sg.Stats()
ch <- prometheus.MustNewConstMetric(
c.maxOpenDesc,
prometheus.GaugeValue,
float64(stats.MaxOpenConnections),
)
ch <- prometheus.MustNewConstMetric(
c.openDesc,
prometheus.GaugeValue,
float64(stats.OpenConnections),
)
ch <- prometheus.MustNewConstMetric(
c.inUseDesc,
prometheus.GaugeValue,
float64(stats.InUse),
)
ch <- prometheus.MustNewConstMetric(
c.idleDesc,
prometheus.GaugeValue,
float64(stats.Idle),
)
ch <- prometheus.MustNewConstMetric(
c.waitedForDesc,
prometheus.CounterValue,
float64(stats.WaitCount),
)
ch <- prometheus.MustNewConstMetric(
c.blockedSecondsDesc,
prometheus.CounterValue,
stats.WaitDuration.Seconds(),
)
ch <- prometheus.MustNewConstMetric(
c.closedMaxIdleDesc,
prometheus.CounterValue,
float64(stats.MaxIdleClosed),
)
ch <- prometheus.MustNewConstMetric(
c.closedMaxLifetimeDesc,
prometheus.CounterValue,
float64(stats.MaxLifetimeClosed),
)
}

View File

@@ -1,151 +0,0 @@
package prom
import (
"time"
"github.com/jmoiron/sqlx"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)
const statsSubsystem = "stats"
var (
metrics bool
receipts prometheus.Counter
transactions prometheus.Counter
blocks prometheus.Counter
lenPayloadChan prometheus.Gauge
tPayloadDecode prometheus.Histogram
tFreePostgres prometheus.Histogram
tPostgresCommit prometheus.Histogram
tHeaderProcessing prometheus.Histogram
tUncleProcessing prometheus.Histogram
tTxAndRecProcessing prometheus.Histogram
tStateAndStoreProcessing prometheus.Histogram
tCodeAndCodeHashProcessing prometheus.Histogram
)
// Init module initialization
func Init() {
metrics = true
blocks = promauto.NewCounter(prometheus.CounterOpts{
Namespace: namespace,
Name: "blocks",
Help: "The total number of processed blocks",
})
transactions = promauto.NewCounter(prometheus.CounterOpts{
Namespace: namespace,
Name: "transactions",
Help: "The total number of processed transactions",
})
receipts = promauto.NewCounter(prometheus.CounterOpts{
Namespace: namespace,
Name: "receipts",
Help: "The total number of processed receipts",
})
lenPayloadChan = promauto.NewGauge(prometheus.GaugeOpts{
Namespace: namespace,
Name: "len_payload_chan",
Help: "Current length of publishPayload",
})
tFreePostgres = promauto.NewHistogram(prometheus.HistogramOpts{
Namespace: namespace,
Subsystem: statsSubsystem,
Name: "t_free_postgres",
Help: "Time spent waiting for free postgres tx",
})
tPostgresCommit = promauto.NewHistogram(prometheus.HistogramOpts{
Namespace: namespace,
Subsystem: statsSubsystem,
Name: "t_postgres_commit",
Help: "Postgres transaction commit duration",
})
tHeaderProcessing = promauto.NewHistogram(prometheus.HistogramOpts{
Namespace: namespace,
Subsystem: statsSubsystem,
Name: "t_header_processing",
Help: "Header processing time",
})
tUncleProcessing = promauto.NewHistogram(prometheus.HistogramOpts{
Namespace: namespace,
Subsystem: statsSubsystem,
Name: "t_uncle_processing",
Help: "Uncle processing time",
})
tTxAndRecProcessing = promauto.NewHistogram(prometheus.HistogramOpts{
Namespace: namespace,
Subsystem: statsSubsystem,
Name: "t_tx_receipt_processing",
Help: "Tx and receipt processing time",
})
tStateAndStoreProcessing = promauto.NewHistogram(prometheus.HistogramOpts{
Namespace: namespace,
Subsystem: statsSubsystem,
Name: "t_state_store_code_processing",
Help: "State, storage, and code combinedprocessing time",
})
}
// RegisterDBCollector create metric colletor for given connection
func RegisterDBCollector(name string, db *sqlx.DB) {
if metrics {
prometheus.Register(NewDBStatsCollector(name, db))
}
}
// BlockInc block counter increment
func BlockInc() {
if metrics {
blocks.Inc()
}
}
// TransactionInc transaction counter increment
func TransactionInc() {
if metrics {
transactions.Inc()
}
}
// ReceiptInc receipt counter increment
func ReceiptInc() {
if metrics {
receipts.Inc()
}
}
// SetLenPayloadChan set chan length
func SetLenPayloadChan(ln int) {
if metrics {
lenPayloadChan.Set(float64(ln))
}
}
// SetTimeMetric time metric observation
func SetTimeMetric(name string, t time.Duration) {
if !metrics {
return
}
tAsF64 := t.Seconds()
switch name {
case "t_free_postgres":
tFreePostgres.Observe(tAsF64)
case "t_postgres_commit":
tPostgresCommit.Observe(tAsF64)
case "t_header_processing":
tHeaderProcessing.Observe(tAsF64)
case "t_uncle_processing":
tUncleProcessing.Observe(tAsF64)
case "t_tx_receipt_processing":
tTxAndRecProcessing.Observe(tAsF64)
case "t_state_store_code_processing":
tStateAndStoreProcessing.Observe(tAsF64)
}
}

View File

@@ -17,12 +17,11 @@
package indexer
import (
"github.com/ethereum/go-ethereum/common"
"github.com/jmoiron/sqlx"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/statediff/indexer/models"
"github.com/ethereum/go-ethereum/statediff/indexer/postgres"
"github.com/ethereum/go-ethereum/statediff/indexer/prom"
"github.com/ethereum/go-ethereum/statediff/indexer/shared"
)
@@ -30,7 +29,7 @@ var (
nullHash = common.HexToHash("0x0000000000000000000000000000000000000000000000000000000000000000")
)
// Indexer satisfies the Indexer interface for ethereum
// Handles processing and writing of indexed IPLD objects to Postgres
type PostgresCIDWriter struct {
db *postgres.DB
}
@@ -51,7 +50,7 @@ func (in *PostgresCIDWriter) upsertHeaderCID(tx *sqlx.Tx, header models.HeaderMo
header.BlockNumber, header.BlockHash, header.ParentHash, header.CID, header.TotalDifficulty, in.db.NodeID, header.Reward, header.StateRoot, header.TxRoot,
header.RctRoot, header.UncleRoot, header.Bloom, header.Timestamp, header.MhKey, 1).Scan(&headerID)
if err == nil {
prom.BlockInc()
indexerMetrics.blocks.Inc(1)
}
return headerID, err
}
@@ -73,7 +72,7 @@ func (in *PostgresCIDWriter) upsertTransactionAndReceiptCIDs(tx *sqlx.Tx, payloa
if err != nil {
return err
}
prom.TransactionInc()
indexerMetrics.transactions.Inc(1)
receiptCidMeta, ok := payload.ReceiptCIDs[common.HexToHash(trxCidMeta.TxHash)]
if ok {
if err := in.upsertReceiptCID(tx, receiptCidMeta, txID); err != nil {
@@ -91,7 +90,7 @@ func (in *PostgresCIDWriter) upsertTransactionCID(tx *sqlx.Tx, transaction model
RETURNING id`,
headerID, transaction.TxHash, transaction.CID, transaction.Dst, transaction.Src, transaction.Index, transaction.MhKey, transaction.Data).Scan(&txID)
if err == nil {
prom.TransactionInc()
indexerMetrics.transactions.Inc(1)
}
return txID, err
}
@@ -101,7 +100,7 @@ func (in *PostgresCIDWriter) upsertReceiptCID(tx *sqlx.Tx, rct models.ReceiptMod
ON CONFLICT (tx_id) DO UPDATE SET (cid, contract, contract_hash, topic0s, topic1s, topic2s, topic3s, log_contracts, mh_key) = ($2, $3, $4, $5, $6, $7, $8, $9, $10)`,
txID, rct.CID, rct.Contract, rct.ContractHash, rct.Topic0s, rct.Topic1s, rct.Topic2s, rct.Topic3s, rct.LogContracts, rct.MhKey)
if err == nil {
prom.ReceiptInc()
indexerMetrics.receipts.Inc(1)
}
return err
}

View File

@@ -22,6 +22,7 @@ import (
"strconv"
"sync"
"sync/atomic"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
@@ -40,7 +41,6 @@ import (
ind "github.com/ethereum/go-ethereum/statediff/indexer"
nodeinfo "github.com/ethereum/go-ethereum/statediff/indexer/node"
"github.com/ethereum/go-ethereum/statediff/indexer/postgres"
"github.com/ethereum/go-ethereum/statediff/indexer/prom"
. "github.com/ethereum/go-ethereum/statediff/types"
)
@@ -74,7 +74,7 @@ type IService interface {
// Main event loop for processing state diffs
Loop(chainEventCh chan core.ChainEvent)
// Method to subscribe to receive state diff processing output
Subscribe(id rpc.ID, sub chan<- Payload, quitChanogr chan<- bool, params Params)
Subscribe(id rpc.ID, sub chan<- Payload, quitChan chan<- bool, params Params)
// Method to unsubscribe from state diff processing
Unsubscribe(id rpc.ID) error
// Method to get state diff object at specific block
@@ -139,7 +139,6 @@ func New(stack *node.Node, ethServ *eth.Ethereum, dbParams *DBParams, enableWrit
}
indexer = ind.NewStateDiffIndexer(blockChain.Config(), db)
}
prom.Init()
sds := &Service{
Mutex: sync.Mutex{},
BlockChain: blockChain,
@@ -416,6 +415,7 @@ func (sds *Service) Start() error {
if sds.enableWriteLoop {
log.Info("Starting statediff DB write loop", "params", writeLoopParams)
go sds.WriteLoop(make(chan core.ChainEvent, chainEventChanSize))
go sds.indexer.ReportDBMetrics(5*time.Second, sds.QuitChan)
}
return nil
@@ -521,7 +521,7 @@ func (sds *Service) WriteStateDiffAt(blockNumber uint64, params Params) error {
// Writes a state diff from the current block, parent state root, and provided params
func (sds *Service) writeStateDiff(block *types.Block, parentRoot common.Hash, params Params) error {
log.Info("writing state diff", "block height", block.Number().Uint64())
log.Info("Writing state diff", "block height", block.Number().Uint64())
var totalDifficulty *big.Int
var receipts types.Receipts
if params.IncludeTD {