From da93bdc0a30f4f595a0d2df079925a1ef91e53b0 Mon Sep 17 00:00:00 2001 From: Roy Crihfield Date: Fri, 20 Nov 2020 19:42:46 +0800 Subject: [PATCH] add reporting loop for db metrics --- statediff/indexer/indexer.go | 25 +++++++++++++++++++- statediff/indexer/metrics.go | 44 ++++++++++++++++++------------------ statediff/service.go | 4 +++- 3 files changed, 49 insertions(+), 24 deletions(-) diff --git a/statediff/indexer/indexer.go b/statediff/indexer/indexer.go index 9bf539427..c6d6f54a5 100644 --- a/statediff/indexer/indexer.go +++ b/statediff/indexer/indexer.go @@ -41,13 +41,17 @@ import ( sdtypes "github.com/ethereum/go-ethereum/statediff/types" ) -var indexerMetrics = RegisterIndexerMetrics(metrics.DefaultRegistry) +var ( + indexerMetrics = RegisterIndexerMetrics(metrics.DefaultRegistry) + dbMetrics = RegisterDBMetrics(metrics.DefaultRegistry) +) // Indexer interface to allow substitution of mocks for testing type Indexer interface { PushBlock(block *types.Block, receipts types.Receipts, totalDifficulty *big.Int) (*BlockTx, error) PushStateNode(tx *BlockTx, stateNode sdtypes.StateNode) error PushCodeAndCodeHash(tx *BlockTx, codeAndCodeHash sdtypes.CodeAndCodeHash) error + ReportDBMetrics(delay time.Duration, quit <-chan bool) } // StateDiffIndexer satisfies the Indexer interface for ethereum statediff objects @@ -72,6 +76,25 @@ type BlockTx struct { Close func() error } +// Reporting function to run as goroutine +func (sdi *StateDiffIndexer) ReportDBMetrics(delay time.Duration, quit <-chan bool) { + if !metrics.Enabled { + return + } + ticker := time.NewTicker(delay) + go func() { + for { + select { + case <-ticker.C: + dbMetrics.Update(sdi.dbWriter.db.Stats()) + case <-quit: + ticker.Stop() + return + } + } + }() +} + // Pushes and indexes block data in database, except state & storage nodes (includes header, uncles, transactions & receipts) // Returns an initiated DB transaction which must be Closed via defer to commit or rollback func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receipts, totalDifficulty *big.Int) (*BlockTx, error) { diff --git a/statediff/indexer/metrics.go b/statediff/indexer/metrics.go index 71ea18845..5ee3426a3 100644 --- a/statediff/indexer/metrics.go +++ b/statediff/indexer/metrics.go @@ -70,7 +70,7 @@ func RegisterIndexerMetrics(reg metrics.Registry) indexerMetricsHandles { return ctx } -type dbMetrics struct { +type dbMetricsHandles struct { // Maximum number of open connections to the database maxOpen metrics.Gauge // The number of established connections both in use and idle @@ -82,43 +82,43 @@ type dbMetrics struct { // The total number of connections waited for waitedFor metrics.Counter // The total time blocked waiting for a new connection - blockedSeconds metrics.Counter + blockedMilliseconds metrics.Counter // The total number of connections closed due to SetMaxIdleConns closedMaxIdle metrics.Counter // The total number of connections closed due to SetConnMaxLifetime closedMaxLifetime metrics.Counter } -func RegisterDBMetrics(reg metrics.Registry) dbMetrics { - ctx := dbMetrics{ - maxOpen: metrics.NewGauge(), - open: metrics.NewGauge(), - inUse: metrics.NewGauge(), - idle: metrics.NewGauge(), - waitedFor: metrics.NewCounter(), - blockedSeconds: metrics.NewCounter(), - closedMaxIdle: metrics.NewCounter(), - closedMaxLifetime: metrics.NewCounter(), +func RegisterDBMetrics(reg metrics.Registry) dbMetricsHandles { + ctx := dbMetricsHandles{ + maxOpen: metrics.NewGauge(), + open: metrics.NewGauge(), + inUse: metrics.NewGauge(), + idle: metrics.NewGauge(), + waitedFor: metrics.NewCounter(), + blockedMilliseconds: metrics.NewCounter(), + closedMaxIdle: metrics.NewCounter(), + closedMaxLifetime: metrics.NewCounter(), } subsys := "connections" - reg.Register(metricName(subsys, "max_open_desc"), ctx.maxOpen) - reg.Register(metricName(subsys, "open_desc"), ctx.open) - reg.Register(metricName(subsys, "in_use_desc"), ctx.inUse) - reg.Register(metricName(subsys, "idle_desc"), ctx.idle) - reg.Register(metricName(subsys, "waited_for_desc"), ctx.waitedFor) - reg.Register(metricName(subsys, "blocked_seconds_desc"), ctx.blockedSeconds) - reg.Register(metricName(subsys, "closed_max_idle_desc"), ctx.closedMaxIdle) - reg.Register(metricName(subsys, "closed_max_lifetime_desc"), ctx.closedMaxLifetime) + reg.Register(metricName(subsys, "max_open"), ctx.maxOpen) + reg.Register(metricName(subsys, "open"), ctx.open) + reg.Register(metricName(subsys, "in_use"), ctx.inUse) + reg.Register(metricName(subsys, "idle"), ctx.idle) + reg.Register(metricName(subsys, "waited_for"), ctx.waitedFor) + reg.Register(metricName(subsys, "blocked_milliseconds"), ctx.blockedMilliseconds) + reg.Register(metricName(subsys, "closed_max_idle"), ctx.closedMaxIdle) + reg.Register(metricName(subsys, "closed_max_lifetime"), ctx.closedMaxLifetime) return ctx } -func (met *dbMetrics) Update(stats sql.DBStats) { +func (met *dbMetricsHandles) Update(stats sql.DBStats) { met.maxOpen.Update(int64(stats.MaxOpenConnections)) met.open.Update(int64(stats.OpenConnections)) met.inUse.Update(int64(stats.InUse)) met.idle.Update(int64(stats.Idle)) met.waitedFor.Inc(int64(stats.WaitCount)) - met.blockedSeconds.Inc(int64(stats.WaitDuration.Seconds())) + met.blockedMilliseconds.Inc(int64(stats.WaitDuration.Milliseconds())) met.closedMaxIdle.Inc(int64(stats.MaxIdleClosed)) met.closedMaxLifetime.Inc(int64(stats.MaxLifetimeClosed)) } diff --git a/statediff/service.go b/statediff/service.go index fb17edcd0..cee137bf5 100644 --- a/statediff/service.go +++ b/statediff/service.go @@ -22,6 +22,7 @@ import ( "strconv" "sync" "sync/atomic" + "time" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core" @@ -73,7 +74,7 @@ type IService interface { // Main event loop for processing state diffs Loop(chainEventCh chan core.ChainEvent) // Method to subscribe to receive state diff processing output - Subscribe(id rpc.ID, sub chan<- Payload, quitChanogr chan<- bool, params Params) + Subscribe(id rpc.ID, sub chan<- Payload, quitChan chan<- bool, params Params) // Method to unsubscribe from state diff processing Unsubscribe(id rpc.ID) error // Method to get state diff object at specific block @@ -414,6 +415,7 @@ func (sds *Service) Start() error { if sds.enableWriteLoop { log.Info("Starting statediff DB write loop", "params", writeLoopParams) go sds.WriteLoop(make(chan core.ChainEvent, chainEventChanSize)) + go sds.indexer.ReportDBMetrics(5*time.Second, sds.QuitChan) } return nil