add reporting loop for db metrics
This commit is contained in:
parent
300dae68e5
commit
da93bdc0a3
@ -41,13 +41,17 @@ import (
|
|||||||
sdtypes "github.com/ethereum/go-ethereum/statediff/types"
|
sdtypes "github.com/ethereum/go-ethereum/statediff/types"
|
||||||
)
|
)
|
||||||
|
|
||||||
var indexerMetrics = RegisterIndexerMetrics(metrics.DefaultRegistry)
|
var (
|
||||||
|
indexerMetrics = RegisterIndexerMetrics(metrics.DefaultRegistry)
|
||||||
|
dbMetrics = RegisterDBMetrics(metrics.DefaultRegistry)
|
||||||
|
)
|
||||||
|
|
||||||
// Indexer interface to allow substitution of mocks for testing
|
// Indexer interface to allow substitution of mocks for testing
|
||||||
type Indexer interface {
|
type Indexer interface {
|
||||||
PushBlock(block *types.Block, receipts types.Receipts, totalDifficulty *big.Int) (*BlockTx, error)
|
PushBlock(block *types.Block, receipts types.Receipts, totalDifficulty *big.Int) (*BlockTx, error)
|
||||||
PushStateNode(tx *BlockTx, stateNode sdtypes.StateNode) error
|
PushStateNode(tx *BlockTx, stateNode sdtypes.StateNode) error
|
||||||
PushCodeAndCodeHash(tx *BlockTx, codeAndCodeHash sdtypes.CodeAndCodeHash) error
|
PushCodeAndCodeHash(tx *BlockTx, codeAndCodeHash sdtypes.CodeAndCodeHash) error
|
||||||
|
ReportDBMetrics(delay time.Duration, quit <-chan bool)
|
||||||
}
|
}
|
||||||
|
|
||||||
// StateDiffIndexer satisfies the Indexer interface for ethereum statediff objects
|
// StateDiffIndexer satisfies the Indexer interface for ethereum statediff objects
|
||||||
@ -72,6 +76,25 @@ type BlockTx struct {
|
|||||||
Close func() error
|
Close func() error
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Reporting function to run as goroutine
|
||||||
|
func (sdi *StateDiffIndexer) ReportDBMetrics(delay time.Duration, quit <-chan bool) {
|
||||||
|
if !metrics.Enabled {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
ticker := time.NewTicker(delay)
|
||||||
|
go func() {
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-ticker.C:
|
||||||
|
dbMetrics.Update(sdi.dbWriter.db.Stats())
|
||||||
|
case <-quit:
|
||||||
|
ticker.Stop()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
// Pushes and indexes block data in database, except state & storage nodes (includes header, uncles, transactions & receipts)
|
// Pushes and indexes block data in database, except state & storage nodes (includes header, uncles, transactions & receipts)
|
||||||
// Returns an initiated DB transaction which must be Closed via defer to commit or rollback
|
// Returns an initiated DB transaction which must be Closed via defer to commit or rollback
|
||||||
func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receipts, totalDifficulty *big.Int) (*BlockTx, error) {
|
func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receipts, totalDifficulty *big.Int) (*BlockTx, error) {
|
||||||
|
@ -70,7 +70,7 @@ func RegisterIndexerMetrics(reg metrics.Registry) indexerMetricsHandles {
|
|||||||
return ctx
|
return ctx
|
||||||
}
|
}
|
||||||
|
|
||||||
type dbMetrics struct {
|
type dbMetricsHandles struct {
|
||||||
// Maximum number of open connections to the database
|
// Maximum number of open connections to the database
|
||||||
maxOpen metrics.Gauge
|
maxOpen metrics.Gauge
|
||||||
// The number of established connections both in use and idle
|
// The number of established connections both in use and idle
|
||||||
@ -82,43 +82,43 @@ type dbMetrics struct {
|
|||||||
// The total number of connections waited for
|
// The total number of connections waited for
|
||||||
waitedFor metrics.Counter
|
waitedFor metrics.Counter
|
||||||
// The total time blocked waiting for a new connection
|
// The total time blocked waiting for a new connection
|
||||||
blockedSeconds metrics.Counter
|
blockedMilliseconds metrics.Counter
|
||||||
// The total number of connections closed due to SetMaxIdleConns
|
// The total number of connections closed due to SetMaxIdleConns
|
||||||
closedMaxIdle metrics.Counter
|
closedMaxIdle metrics.Counter
|
||||||
// The total number of connections closed due to SetConnMaxLifetime
|
// The total number of connections closed due to SetConnMaxLifetime
|
||||||
closedMaxLifetime metrics.Counter
|
closedMaxLifetime metrics.Counter
|
||||||
}
|
}
|
||||||
|
|
||||||
func RegisterDBMetrics(reg metrics.Registry) dbMetrics {
|
func RegisterDBMetrics(reg metrics.Registry) dbMetricsHandles {
|
||||||
ctx := dbMetrics{
|
ctx := dbMetricsHandles{
|
||||||
maxOpen: metrics.NewGauge(),
|
maxOpen: metrics.NewGauge(),
|
||||||
open: metrics.NewGauge(),
|
open: metrics.NewGauge(),
|
||||||
inUse: metrics.NewGauge(),
|
inUse: metrics.NewGauge(),
|
||||||
idle: metrics.NewGauge(),
|
idle: metrics.NewGauge(),
|
||||||
waitedFor: metrics.NewCounter(),
|
waitedFor: metrics.NewCounter(),
|
||||||
blockedSeconds: metrics.NewCounter(),
|
blockedMilliseconds: metrics.NewCounter(),
|
||||||
closedMaxIdle: metrics.NewCounter(),
|
closedMaxIdle: metrics.NewCounter(),
|
||||||
closedMaxLifetime: metrics.NewCounter(),
|
closedMaxLifetime: metrics.NewCounter(),
|
||||||
}
|
}
|
||||||
subsys := "connections"
|
subsys := "connections"
|
||||||
reg.Register(metricName(subsys, "max_open_desc"), ctx.maxOpen)
|
reg.Register(metricName(subsys, "max_open"), ctx.maxOpen)
|
||||||
reg.Register(metricName(subsys, "open_desc"), ctx.open)
|
reg.Register(metricName(subsys, "open"), ctx.open)
|
||||||
reg.Register(metricName(subsys, "in_use_desc"), ctx.inUse)
|
reg.Register(metricName(subsys, "in_use"), ctx.inUse)
|
||||||
reg.Register(metricName(subsys, "idle_desc"), ctx.idle)
|
reg.Register(metricName(subsys, "idle"), ctx.idle)
|
||||||
reg.Register(metricName(subsys, "waited_for_desc"), ctx.waitedFor)
|
reg.Register(metricName(subsys, "waited_for"), ctx.waitedFor)
|
||||||
reg.Register(metricName(subsys, "blocked_seconds_desc"), ctx.blockedSeconds)
|
reg.Register(metricName(subsys, "blocked_milliseconds"), ctx.blockedMilliseconds)
|
||||||
reg.Register(metricName(subsys, "closed_max_idle_desc"), ctx.closedMaxIdle)
|
reg.Register(metricName(subsys, "closed_max_idle"), ctx.closedMaxIdle)
|
||||||
reg.Register(metricName(subsys, "closed_max_lifetime_desc"), ctx.closedMaxLifetime)
|
reg.Register(metricName(subsys, "closed_max_lifetime"), ctx.closedMaxLifetime)
|
||||||
return ctx
|
return ctx
|
||||||
}
|
}
|
||||||
|
|
||||||
func (met *dbMetrics) Update(stats sql.DBStats) {
|
func (met *dbMetricsHandles) Update(stats sql.DBStats) {
|
||||||
met.maxOpen.Update(int64(stats.MaxOpenConnections))
|
met.maxOpen.Update(int64(stats.MaxOpenConnections))
|
||||||
met.open.Update(int64(stats.OpenConnections))
|
met.open.Update(int64(stats.OpenConnections))
|
||||||
met.inUse.Update(int64(stats.InUse))
|
met.inUse.Update(int64(stats.InUse))
|
||||||
met.idle.Update(int64(stats.Idle))
|
met.idle.Update(int64(stats.Idle))
|
||||||
met.waitedFor.Inc(int64(stats.WaitCount))
|
met.waitedFor.Inc(int64(stats.WaitCount))
|
||||||
met.blockedSeconds.Inc(int64(stats.WaitDuration.Seconds()))
|
met.blockedMilliseconds.Inc(int64(stats.WaitDuration.Milliseconds()))
|
||||||
met.closedMaxIdle.Inc(int64(stats.MaxIdleClosed))
|
met.closedMaxIdle.Inc(int64(stats.MaxIdleClosed))
|
||||||
met.closedMaxLifetime.Inc(int64(stats.MaxLifetimeClosed))
|
met.closedMaxLifetime.Inc(int64(stats.MaxLifetimeClosed))
|
||||||
}
|
}
|
||||||
|
@ -22,6 +22,7 @@ import (
|
|||||||
"strconv"
|
"strconv"
|
||||||
"sync"
|
"sync"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/ethereum/go-ethereum/common"
|
"github.com/ethereum/go-ethereum/common"
|
||||||
"github.com/ethereum/go-ethereum/core"
|
"github.com/ethereum/go-ethereum/core"
|
||||||
@ -73,7 +74,7 @@ type IService interface {
|
|||||||
// Main event loop for processing state diffs
|
// Main event loop for processing state diffs
|
||||||
Loop(chainEventCh chan core.ChainEvent)
|
Loop(chainEventCh chan core.ChainEvent)
|
||||||
// Method to subscribe to receive state diff processing output
|
// Method to subscribe to receive state diff processing output
|
||||||
Subscribe(id rpc.ID, sub chan<- Payload, quitChanogr chan<- bool, params Params)
|
Subscribe(id rpc.ID, sub chan<- Payload, quitChan chan<- bool, params Params)
|
||||||
// Method to unsubscribe from state diff processing
|
// Method to unsubscribe from state diff processing
|
||||||
Unsubscribe(id rpc.ID) error
|
Unsubscribe(id rpc.ID) error
|
||||||
// Method to get state diff object at specific block
|
// Method to get state diff object at specific block
|
||||||
@ -414,6 +415,7 @@ func (sds *Service) Start() error {
|
|||||||
if sds.enableWriteLoop {
|
if sds.enableWriteLoop {
|
||||||
log.Info("Starting statediff DB write loop", "params", writeLoopParams)
|
log.Info("Starting statediff DB write loop", "params", writeLoopParams)
|
||||||
go sds.WriteLoop(make(chan core.ChainEvent, chainEventChanSize))
|
go sds.WriteLoop(make(chan core.ChainEvent, chainEventChanSize))
|
||||||
|
go sds.indexer.ReportDBMetrics(5*time.Second, sds.QuitChan)
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
|
Loading…
Reference in New Issue
Block a user