Additional statediff metrics and logging. #304

Merged
telackey merged 7 commits from telackey/logging_001 into v1.10.26-statediff-v4 2023-01-12 17:19:38 +00:00
4 changed files with 131 additions and 12 deletions

View File

@ -142,12 +142,22 @@ func (api *PublicStateDiffAPI) StreamCodeAndCodeHash(ctx context.Context, blockN
// WriteStateDiffAt writes a state diff object directly to DB at the specific blockheight
func (api *PublicStateDiffAPI) WriteStateDiffAt(ctx context.Context, blockNumber uint64, params Params) error {
return api.sds.WriteStateDiffAt(blockNumber, params)
var err error
start, logger := countApiRequestBegin("writeStateDiffAt", blockNumber)
defer countApiRequestEnd(start, logger, err)
err = api.sds.WriteStateDiffAt(blockNumber, params)
return err
}
// WriteStateDiffFor writes a state diff object directly to DB for the specific block hash
func (api *PublicStateDiffAPI) WriteStateDiffFor(ctx context.Context, blockHash common.Hash, params Params) error {
return api.sds.WriteStateDiffFor(blockHash, params)
var err error
start, logger := countApiRequestBegin("writeStateDiffFor", blockHash.Hex())
defer countApiRequestEnd(start, logger, err)
err = api.sds.WriteStateDiffFor(blockHash, params)
return err
}
// WatchAddress changes the list of watched addresses to which the direct indexing is restricted according to given operation

View File

@ -26,6 +26,8 @@ const (
namespace = "statediff"
)
var defaultStatediffMetrics = RegisterStatediffMetrics(metrics.DefaultRegistry)
// Build a fully qualified metric name
func metricName(subsystem, name string) string {
if name == "" {
@ -58,6 +60,14 @@ type statediffMetricsHandles struct {
knownGapErrorStart metrics.Gauge
// A known gaps end block which had an error being written to the DB
knownGapErrorEnd metrics.Gauge
apiRequests metrics.Counter
apiRequestsUnderway metrics.Counter
failed metrics.Counter
succeeded metrics.Counter
underway metrics.Counter
totalProcessingTime metrics.Gauge
}
func RegisterStatediffMetrics(reg metrics.Registry) statediffMetricsHandles {
@ -71,6 +81,12 @@ func RegisterStatediffMetrics(reg metrics.Registry) statediffMetricsHandles {
knownGapEnd: metrics.NewGauge(),
knownGapErrorStart: metrics.NewGauge(),
knownGapErrorEnd: metrics.NewGauge(),
apiRequests: metrics.NewCounter(),
apiRequestsUnderway: metrics.NewCounter(),
failed: metrics.NewCounter(),
succeeded: metrics.NewCounter(),
underway: metrics.NewCounter(),
totalProcessingTime: metrics.NewGauge(),
}
subsys := "service"
reg.Register(metricName(subsys, "last_sync_height"), ctx.lastSyncHeight)
@ -82,5 +98,11 @@ func RegisterStatediffMetrics(reg metrics.Registry) statediffMetricsHandles {
reg.Register(metricName(subsys, "known_gaps_end"), ctx.knownGapEnd)
reg.Register(metricName(subsys, "known_gaps_error_start"), ctx.knownGapErrorStart)
reg.Register(metricName(subsys, "known_gaps_error_end"), ctx.knownGapErrorEnd)
reg.Register(metricName(subsys, "api_requests"), ctx.apiRequests)
reg.Register(metricName(subsys, "api_requests_underway"), ctx.apiRequestsUnderway)
reg.Register(metricName(subsys, "failed"), ctx.failed)
reg.Register(metricName(subsys, "succeeded"), ctx.succeeded)
reg.Register(metricName(subsys, "underway"), ctx.underway)
reg.Register(metricName(subsys, "total_processing_time"), ctx.totalProcessingTime)
return ctx
}

View File

@ -0,0 +1,89 @@
// Copyright 2019 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package statediff
import (
"fmt"
"time"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
)
func countStateDiffBegin(block *types.Block) (time.Time, log.Logger) {
start := time.Now()
logger := log.New("hash", block.Hash().Hex(), "number", block.NumberU64())
defaultStatediffMetrics.underway.Inc(1)
logger.Debug(fmt.Sprintf("writeStateDiff BEGIN [underway=%d, succeeded=%d, failed=%d, total_time=%dms]",
defaultStatediffMetrics.underway.Count(),
defaultStatediffMetrics.succeeded.Count(),
defaultStatediffMetrics.failed.Count(),
defaultStatediffMetrics.totalProcessingTime.Value(),
))
return start, logger
}
func countStateDiffEnd(start time.Time, logger log.Logger, err error) time.Duration {
duration := time.Since(start)
defaultStatediffMetrics.underway.Dec(1)
if nil == err {
defaultStatediffMetrics.succeeded.Inc(1)
} else {
defaultStatediffMetrics.failed.Inc(1)
}
defaultStatediffMetrics.totalProcessingTime.Inc(duration.Milliseconds())
logger.Debug(fmt.Sprintf("writeStateDiff END (duration=%dms, err=%t) [underway=%d, succeeded=%d, failed=%d, total_time=%dms]",
duration.Milliseconds(), nil != err,
defaultStatediffMetrics.underway.Count(),
defaultStatediffMetrics.succeeded.Count(),
defaultStatediffMetrics.failed.Count(),
defaultStatediffMetrics.totalProcessingTime.Value(),
))
return duration
}
func countApiRequestBegin(methodName string, blockHashOrNumber interface{}) (time.Time, log.Logger) {
start := time.Now()
logger := log.New(methodName, blockHashOrNumber)
defaultStatediffMetrics.apiRequests.Inc(1)
defaultStatediffMetrics.apiRequestsUnderway.Inc(1)
logger.Debug(fmt.Sprintf("statediff API BEGIN [underway=%d, requests=%d])",
defaultStatediffMetrics.apiRequestsUnderway.Count(),
defaultStatediffMetrics.apiRequests.Count(),
))
return start, logger
}
func countApiRequestEnd(start time.Time, logger log.Logger, err error) time.Duration {
duration := time.Since(start)
defaultStatediffMetrics.apiRequestsUnderway.Dec(1)
logger.Debug(fmt.Sprintf("statediff API END (duration=%dms, err=%t) [underway=%d, requests=%d]",
duration.Milliseconds(), nil != err,
defaultStatediffMetrics.apiRequestsUnderway.Count(),
defaultStatediffMetrics.apiRequests.Count(),
))
return duration
}

View File

@ -36,7 +36,6 @@ import (
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/internal/ethapi"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/rlp"
@ -69,8 +68,6 @@ var writeLoopParams = ParamsWithMutex{
},
}
var statediffMetrics = RegisterStatediffMetrics(metrics.DefaultRegistry)
type blockChain interface {
SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription
CurrentBlock() *types.Block
@ -270,13 +267,13 @@ func (sds *Service) WriteLoop(chainEventCh chan core.ChainEvent) {
for {
select {
case chainEvent := <-chainEventCh:
lastHeight := statediffMetrics.lastEventHeight.Value()
lastHeight := defaultStatediffMetrics.lastEventHeight.Value()
nextHeight := int64(chainEvent.Block.Number().Uint64())
if nextHeight-lastHeight != 1 {
log.Warn("Statediffing service received block out-of-order", "next height", nextHeight, "last height", lastHeight)
}
statediffMetrics.lastEventHeight.Update(nextHeight)
statediffMetrics.writeLoopChannelLen.Update(int64(len(chainEventCh)))
defaultStatediffMetrics.lastEventHeight.Update(nextHeight)
defaultStatediffMetrics.writeLoopChannelLen.Update(int64(len(chainEventCh)))
chainEventFwd <- chainEvent
case err := <-errCh:
log.Error("Error from chain event subscription", "error", err)
@ -314,7 +311,7 @@ func (sds *Service) writeGenesisStateDiff(currBlock *types.Block, workerId uint)
genesisBlockNumber, "error", err.Error(), "worker", workerId)
return
}
statediffMetrics.lastStatediffHeight.Update(genesisBlockNumber)
defaultStatediffMetrics.lastStatediffHeight.Update(genesisBlockNumber)
}
func (sds *Service) writeLoopWorker(params workerParams) {
@ -350,7 +347,7 @@ func (sds *Service) writeLoopWorker(params workerParams) {
}
// TODO: how to handle with concurrent workers
statediffMetrics.lastStatediffHeight.Update(int64(currentBlock.Number().Uint64()))
defaultStatediffMetrics.lastStatediffHeight.Update(int64(currentBlock.Number().Uint64()))
case <-sds.QuitChan:
log.Info("Quitting the statediff writing process", "worker", params.id)
return
@ -368,7 +365,7 @@ func (sds *Service) Loop(chainEventCh chan core.ChainEvent) {
select {
//Notify chain event channel of events
case chainEvent := <-chainEventCh:
statediffMetrics.serviceLoopChannelLen.Update(int64(len(chainEventCh)))
defaultStatediffMetrics.serviceLoopChannelLen.Update(int64(len(chainEventCh)))
log.Debug("Loop(): chain event received", "event", chainEvent)
// if we don't have any subscribers, do not process a statediff
if atomic.LoadInt32(&sds.subscribers) == 0 {
@ -800,11 +797,12 @@ func (sds *Service) WriteStateDiffFor(blockHash common.Hash, params Params) erro
// Writes a state diff from the current block, parent state root, and provided params
func (sds *Service) writeStateDiff(block *types.Block, parentRoot common.Hash, params Params) error {
// log.Info("Writing state diff", "block height", block.Number().Uint64())
var totalDifficulty *big.Int
var receipts types.Receipts
var err error
var tx interfaces.Batch
start, logger := countStateDiffBegin(block)
defer countStateDiffEnd(start, logger, err)
if params.IncludeTD {
totalDifficulty = sds.BlockChain.GetTd(block.Hash(), block.NumberU64())
}