queue size metric; touchup

This commit is contained in:
i-norden 2021-10-25 14:26:41 -05:00
parent aace7793ba
commit f679082c3b
3 changed files with 43 additions and 9 deletions

View File

@ -58,7 +58,9 @@ func serve() {
// start service and servers
logWithCommand.Info("Starting statediff service")
wg := new(sync.WaitGroup)
go statediffService.Loop(wg)
if err := statediffService.Loop(wg); err != nil {
logWithCommand.Fatalf("unable to start statediff service: %v", err)
}
logWithCommand.Info("Starting RPC servers")
if err := startServers(statediffService); err != nil {
logWithCommand.Fatal(err)

View File

@ -29,6 +29,8 @@ const statsSubsystem = "stats"
var (
metrics bool
queuedRanges prometheus.Counter
lastLoadedHeight prometheus.Gauge
lastProcessedHeight prometheus.Gauge
@ -39,6 +41,7 @@ var (
)
const (
RANGES_QUEUED = "ranges_queued"
LOADED_HEIGHT = "loaded_height"
PROCESSED_HEIGHT = "processed_height"
T_BLOCK_LOAD = "t_block_load"
@ -51,6 +54,12 @@ const (
func Init() {
metrics = true
queuedRanges = promauto.NewCounter(prometheus.CounterOpts{
Namespace: namespace,
Name: RANGES_QUEUED,
Help: "Number of range requests currently queued",
})
lastLoadedHeight = promauto.NewGauge(prometheus.GaugeOpts{
Namespace: namespace,
Name: LOADED_HEIGHT,
@ -88,13 +97,27 @@ func Init() {
})
}
// RegisterDBCollector create metric colletor for given connection
// RegisterDBCollector create metric collector for given connection
func RegisterDBCollector(name string, db *sqlx.DB) {
if metrics {
prometheus.Register(NewDBStatsCollector(name, db))
}
}
// IncQueuedRanges increments the number of queued range requests
func IncQueuedRanges() {
if metrics {
queuedRanges.Inc()
}
}
// DecQueuedRanges decrements the number of queued range requests
func DecQueuedRanges() {
if metrics {
queuedRanges.Add(-1)
}
}
// SetLastLoadedHeight sets last loaded height
func SetLastLoadedHeight(height int64) {
if metrics {

View File

@ -46,7 +46,7 @@ type StateDiffService interface {
APIs() []rpc.API
Protocols() []p2p.Protocol
// Loop is the main event loop for processing state diffs
Loop(wg *sync.WaitGroup)
Loop(wg *sync.WaitGroup) error
// StateDiffAt method to get state diff object at specific block
StateDiffAt(blockNumber uint64, params sd.Params) (*sd.Payload, error)
// StateDiffFor method to get state diff object at specific block
@ -68,7 +68,7 @@ type Service struct {
// Used to read data from leveldb
lvlDBReader Reader
// Used to signal shutdown of the service
quitChan chan bool
quitChan chan struct{}
// Interface for publishing statediffs as PG-IPLD objects
indexer ind.Indexer
// range queue
@ -91,7 +91,6 @@ func NewStateDiffService(lvlDBReader Reader, indexer ind.Indexer, conf Config) (
return &Service{
lvlDBReader: lvlDBReader,
Builder: builder,
quitChan: make(chan bool),
indexer: indexer,
workers: conf.ServiceWorkers,
queue: make(chan RangeRequest, conf.WorkerQueueSize),
@ -117,7 +116,11 @@ func (sds *Service) APIs() []rpc.API {
}
// Loop is an empty service loop for awaiting rpc requests
func (sds *Service) Loop(wg *sync.WaitGroup) {
func (sds *Service) Loop(wg *sync.WaitGroup) error {
if sds.quitChan != nil {
return fmt.Errorf("service loop is already running")
}
sds.quitChan = make(chan struct{})
for i := 0; i < int(sds.workers); i++ {
wg.Add(1)
go func(id int) {
@ -125,6 +128,7 @@ func (sds *Service) Loop(wg *sync.WaitGroup) {
for {
select {
case blockRange := <-sds.queue:
prom.DecQueuedRanges()
for j := blockRange.Start; j <= blockRange.Start; j++ {
if err := sds.WriteStateDiffAt(j, blockRange.Params); err != nil {
logrus.Errorf("service worker %d error writing statediff at height %d in range (%d, %d) : %v", id, j, blockRange.Start, blockRange.Stop, err)
@ -147,8 +151,12 @@ func (sds *Service) Loop(wg *sync.WaitGroup) {
}(i)
}
for _, preRun := range sds.preruns {
sds.queue <- preRun
if err := sds.WriteStateDiffsInRange(preRun.Start, preRun.Stop, preRun.Params); err != nil {
close(sds.quitChan)
return err
}
}
return nil
}
// StateDiffAt returns a state diff object payload at the specific blockheight
@ -265,8 +273,7 @@ func (sds *Service) processStateTrie(block *types.Block, params sd.Params) (*sd.
// Start is used to begin the service
func (sds *Service) Start() error {
logrus.Info("starting statediff service")
go sds.Loop(new(sync.WaitGroup))
return nil
return sds.Loop(new(sync.WaitGroup))
}
// Stop is used to close down the service
@ -372,6 +379,8 @@ func (sds *Service) WriteStateDiffsInRange(start, stop uint64, params sd.Params)
blocked := time.NewTimer(30 * time.Second)
select {
case sds.queue <- RangeRequest{Start: start, Stop: stop, Params: params}:
prom.IncQueuedRanges()
logrus.Infof("added range (%d, %d) to the worker queue", start, stop)
return nil
case <-blocked.C:
return fmt.Errorf("unable to add range (%d, %d) to the worker queue", start, stop)