From c1c41ef5301656ac3426e2b64b2699b04390aaa2 Mon Sep 17 00:00:00 2001 From: Roy Crihfield Date: Thu, 19 Nov 2020 23:47:48 +0800 Subject: [PATCH] metrics for statediff stats --- statediff/metrics.go | 54 ++++++++++++++++++++++++++++++++++++++++++++ statediff/service.go | 8 +++++++ 2 files changed, 62 insertions(+) create mode 100644 statediff/metrics.go diff --git a/statediff/metrics.go b/statediff/metrics.go new file mode 100644 index 000000000..d75c583a0 --- /dev/null +++ b/statediff/metrics.go @@ -0,0 +1,54 @@ +package statediff + +import ( + "strings" + + "github.com/ethereum/go-ethereum/metrics" +) + +const ( + namespace = "statediff" +) + +// Build a fully qualified metric name +func metricName(subsystem, name string) string { + if name == "" { + return "" + } + parts := []string{namespace, name} + if subsystem != "" { + parts = []string{namespace, subsystem, name} + } + // Prometheus uses _ but geth metrics uses / and replaces + return strings.Join(parts, "/") +} + +type statediffMetricsHandles struct { + // Height of latest synced by core.BlockChain + // FIXME + lastSyncHeight metrics.Gauge + // Height of the latest block received from chainEvent channel + lastEventHeight metrics.Gauge + // Height of latest state diff + lastStatediffHeight metrics.Gauge + // Current length of chainEvent channels + serviceLoopChannelLen metrics.Gauge + writeLoopChannelLen metrics.Gauge +} + +func RegisterStatediffMetrics(reg metrics.Registry) statediffMetricsHandles { + ctx := statediffMetricsHandles{ + lastSyncHeight: metrics.NewGauge(), + lastEventHeight: metrics.NewGauge(), + lastStatediffHeight: metrics.NewGauge(), + serviceLoopChannelLen: metrics.NewGauge(), + writeLoopChannelLen: metrics.NewGauge(), + } + subsys := "" // todo + reg.Register(metricName(subsys, "last_sync_height"), ctx.lastSyncHeight) + reg.Register(metricName(subsys, "last_event_height"), ctx.lastEventHeight) + reg.Register(metricName(subsys, "last_statediff_height"), ctx.lastStatediffHeight) + reg.Register(metricName(subsys, "service_loop_channel_len"), ctx.serviceLoopChannelLen) + reg.Register(metricName(subsys, "write_loop_channel_len"), ctx.writeLoopChannelLen) + return ctx +} diff --git a/statediff/service.go b/statediff/service.go index cee137bf5..c3f270bb4 100644 --- a/statediff/service.go +++ b/statediff/service.go @@ -32,6 +32,7 @@ import ( "github.com/ethereum/go-ethereum/eth" "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/node" "github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/rlp" @@ -55,6 +56,8 @@ var writeLoopParams = Params{ IncludeCode: true, } +var statediffMetrics = RegisterStatediffMetrics(metrics.DefaultRegistry) + type blockChain interface { SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription GetBlockByHash(hash common.Hash) *types.Block @@ -193,8 +196,10 @@ func (sds *Service) WriteLoop(chainEventCh chan core.ChainEvent) { select { //Notify chain event channel of events case chainEvent := <-chainEventCh: + statediffMetrics.writeLoopChannelLen.Update(int64(len(chainEventCh))) log.Debug("(WriteLoop) Event received from chainEventCh", "event", chainEvent) currentBlock := chainEvent.Block + statediffMetrics.lastEventHeight.Update(int64(currentBlock.Number().Uint64())) parentBlock := sds.lastBlock.replace(currentBlock, sds.BlockChain) if parentBlock == nil { log.Error("Parent block is nil, skipping this block", "block height", currentBlock.Number()) @@ -205,6 +210,8 @@ func (sds *Service) WriteLoop(chainEventCh chan core.ChainEvent) { log.Error("statediff (DB write) processing error", "block height", currentBlock.Number().Uint64(), "error", err.Error()) continue } + // TODO: how to handle with concurrent workers + statediffMetrics.lastStatediffHeight.Update(int64(currentBlock.Number().Uint64())) case err := <-errCh: log.Warn("Error from chain event subscription", "error", err) sds.close() @@ -226,6 +233,7 @@ func (sds *Service) Loop(chainEventCh chan core.ChainEvent) { select { //Notify chain event channel of events case chainEvent := <-chainEventCh: + statediffMetrics.serviceLoopChannelLen.Update(int64(len(chainEventCh))) log.Debug("Event received from chainEventCh", "event", chainEvent) // if we don't have any subscribers, do not process a statediff if atomic.LoadInt32(&sds.subscribers) == 0 {