metrics for statediff stats
This commit is contained in:
parent
75069685be
commit
c1c41ef530
54
statediff/metrics.go
Normal file
54
statediff/metrics.go
Normal file
@ -0,0 +1,54 @@
|
||||
package statediff
|
||||
|
||||
import (
|
||||
"strings"
|
||||
|
||||
"github.com/ethereum/go-ethereum/metrics"
|
||||
)
|
||||
|
||||
const (
|
||||
namespace = "statediff"
|
||||
)
|
||||
|
||||
// Build a fully qualified metric name
|
||||
func metricName(subsystem, name string) string {
|
||||
if name == "" {
|
||||
return ""
|
||||
}
|
||||
parts := []string{namespace, name}
|
||||
if subsystem != "" {
|
||||
parts = []string{namespace, subsystem, name}
|
||||
}
|
||||
// Prometheus uses _ but geth metrics uses / and replaces
|
||||
return strings.Join(parts, "/")
|
||||
}
|
||||
|
||||
type statediffMetricsHandles struct {
|
||||
// Height of latest synced by core.BlockChain
|
||||
// FIXME
|
||||
lastSyncHeight metrics.Gauge
|
||||
// Height of the latest block received from chainEvent channel
|
||||
lastEventHeight metrics.Gauge
|
||||
// Height of latest state diff
|
||||
lastStatediffHeight metrics.Gauge
|
||||
// Current length of chainEvent channels
|
||||
serviceLoopChannelLen metrics.Gauge
|
||||
writeLoopChannelLen metrics.Gauge
|
||||
}
|
||||
|
||||
func RegisterStatediffMetrics(reg metrics.Registry) statediffMetricsHandles {
|
||||
ctx := statediffMetricsHandles{
|
||||
lastSyncHeight: metrics.NewGauge(),
|
||||
lastEventHeight: metrics.NewGauge(),
|
||||
lastStatediffHeight: metrics.NewGauge(),
|
||||
serviceLoopChannelLen: metrics.NewGauge(),
|
||||
writeLoopChannelLen: metrics.NewGauge(),
|
||||
}
|
||||
subsys := "" // todo
|
||||
reg.Register(metricName(subsys, "last_sync_height"), ctx.lastSyncHeight)
|
||||
reg.Register(metricName(subsys, "last_event_height"), ctx.lastEventHeight)
|
||||
reg.Register(metricName(subsys, "last_statediff_height"), ctx.lastStatediffHeight)
|
||||
reg.Register(metricName(subsys, "service_loop_channel_len"), ctx.serviceLoopChannelLen)
|
||||
reg.Register(metricName(subsys, "write_loop_channel_len"), ctx.writeLoopChannelLen)
|
||||
return ctx
|
||||
}
|
@ -32,6 +32,7 @@ import (
|
||||
"github.com/ethereum/go-ethereum/eth"
|
||||
"github.com/ethereum/go-ethereum/event"
|
||||
"github.com/ethereum/go-ethereum/log"
|
||||
"github.com/ethereum/go-ethereum/metrics"
|
||||
"github.com/ethereum/go-ethereum/node"
|
||||
"github.com/ethereum/go-ethereum/p2p"
|
||||
"github.com/ethereum/go-ethereum/rlp"
|
||||
@ -55,6 +56,8 @@ var writeLoopParams = Params{
|
||||
IncludeCode: true,
|
||||
}
|
||||
|
||||
var statediffMetrics = RegisterStatediffMetrics(metrics.DefaultRegistry)
|
||||
|
||||
type blockChain interface {
|
||||
SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription
|
||||
GetBlockByHash(hash common.Hash) *types.Block
|
||||
@ -193,8 +196,10 @@ func (sds *Service) WriteLoop(chainEventCh chan core.ChainEvent) {
|
||||
select {
|
||||
//Notify chain event channel of events
|
||||
case chainEvent := <-chainEventCh:
|
||||
statediffMetrics.writeLoopChannelLen.Update(int64(len(chainEventCh)))
|
||||
log.Debug("(WriteLoop) Event received from chainEventCh", "event", chainEvent)
|
||||
currentBlock := chainEvent.Block
|
||||
statediffMetrics.lastEventHeight.Update(int64(currentBlock.Number().Uint64()))
|
||||
parentBlock := sds.lastBlock.replace(currentBlock, sds.BlockChain)
|
||||
if parentBlock == nil {
|
||||
log.Error("Parent block is nil, skipping this block", "block height", currentBlock.Number())
|
||||
@ -205,6 +210,8 @@ func (sds *Service) WriteLoop(chainEventCh chan core.ChainEvent) {
|
||||
log.Error("statediff (DB write) processing error", "block height", currentBlock.Number().Uint64(), "error", err.Error())
|
||||
continue
|
||||
}
|
||||
// TODO: how to handle with concurrent workers
|
||||
statediffMetrics.lastStatediffHeight.Update(int64(currentBlock.Number().Uint64()))
|
||||
case err := <-errCh:
|
||||
log.Warn("Error from chain event subscription", "error", err)
|
||||
sds.close()
|
||||
@ -226,6 +233,7 @@ func (sds *Service) Loop(chainEventCh chan core.ChainEvent) {
|
||||
select {
|
||||
//Notify chain event channel of events
|
||||
case chainEvent := <-chainEventCh:
|
||||
statediffMetrics.serviceLoopChannelLen.Update(int64(len(chainEventCh)))
|
||||
log.Debug("Event received from chainEventCh", "event", chainEvent)
|
||||
// if we don't have any subscribers, do not process a statediff
|
||||
if atomic.LoadInt32(&sds.subscribers) == 0 {
|
||||
|
Loading…
Reference in New Issue
Block a user