diff --git a/cmd/serve.go b/cmd/serve.go index e8f3f10..7c69de5 100644 --- a/cmd/serve.go +++ b/cmd/serve.go @@ -58,7 +58,9 @@ func serve() { // start service and servers logWithCommand.Info("Starting statediff service") wg := new(sync.WaitGroup) - go statediffService.Loop(wg) + if err := statediffService.Loop(wg); err != nil { + logWithCommand.Fatalf("unable to start statediff service: %v", err) + } logWithCommand.Info("Starting RPC servers") if err := startServers(statediffService); err != nil { logWithCommand.Fatal(err) diff --git a/pkg/prom/prom.go b/pkg/prom/prom.go index 61f5a8a..2df6f6a 100644 --- a/pkg/prom/prom.go +++ b/pkg/prom/prom.go @@ -29,6 +29,8 @@ const statsSubsystem = "stats" var ( metrics bool + queuedRanges prometheus.Counter + lastLoadedHeight prometheus.Gauge lastProcessedHeight prometheus.Gauge @@ -39,6 +41,7 @@ var ( ) const ( + RANGES_QUEUED = "ranges_queued" LOADED_HEIGHT = "loaded_height" PROCESSED_HEIGHT = "processed_height" T_BLOCK_LOAD = "t_block_load" @@ -51,6 +54,12 @@ const ( func Init() { metrics = true + queuedRanges = promauto.NewCounter(prometheus.CounterOpts{ + Namespace: namespace, + Name: RANGES_QUEUED, + Help: "Number of range requests currently queued", + }) + lastLoadedHeight = promauto.NewGauge(prometheus.GaugeOpts{ Namespace: namespace, Name: LOADED_HEIGHT, @@ -88,13 +97,27 @@ func Init() { }) } -// RegisterDBCollector create metric colletor for given connection +// RegisterDBCollector create metric collector for given connection func RegisterDBCollector(name string, db *sqlx.DB) { if metrics { prometheus.Register(NewDBStatsCollector(name, db)) } } +// IncQueuedRanges increments the number of queued range requests +func IncQueuedRanges() { + if metrics { + queuedRanges.Inc() + } +} + +// DecQueuedRanges decrements the number of queued range requests +func DecQueuedRanges() { + if metrics { + queuedRanges.Add(-1) + } +} + // SetLastLoadedHeight sets last loaded height func SetLastLoadedHeight(height int64) { if metrics { diff --git a/pkg/service.go b/pkg/service.go index 959e06b..bb3226a 100644 --- a/pkg/service.go +++ b/pkg/service.go @@ -46,7 +46,7 @@ type StateDiffService interface { APIs() []rpc.API Protocols() []p2p.Protocol // Loop is the main event loop for processing state diffs - Loop(wg *sync.WaitGroup) + Loop(wg *sync.WaitGroup) error // StateDiffAt method to get state diff object at specific block StateDiffAt(blockNumber uint64, params sd.Params) (*sd.Payload, error) // StateDiffFor method to get state diff object at specific block @@ -68,7 +68,7 @@ type Service struct { // Used to read data from leveldb lvlDBReader Reader // Used to signal shutdown of the service - quitChan chan bool + quitChan chan struct{} // Interface for publishing statediffs as PG-IPLD objects indexer ind.Indexer // range queue @@ -91,7 +91,6 @@ func NewStateDiffService(lvlDBReader Reader, indexer ind.Indexer, conf Config) ( return &Service{ lvlDBReader: lvlDBReader, Builder: builder, - quitChan: make(chan bool), indexer: indexer, workers: conf.ServiceWorkers, queue: make(chan RangeRequest, conf.WorkerQueueSize), @@ -117,7 +116,11 @@ func (sds *Service) APIs() []rpc.API { } // Loop is an empty service loop for awaiting rpc requests -func (sds *Service) Loop(wg *sync.WaitGroup) { +func (sds *Service) Loop(wg *sync.WaitGroup) error { + if sds.quitChan != nil { + return fmt.Errorf("service loop is already running") + } + sds.quitChan = make(chan struct{}) for i := 0; i < int(sds.workers); i++ { wg.Add(1) go func(id int) { @@ -125,6 +128,7 @@ func (sds *Service) Loop(wg *sync.WaitGroup) { for { select { case blockRange := <-sds.queue: + prom.DecQueuedRanges() for j := blockRange.Start; j <= blockRange.Start; j++ { if err := sds.WriteStateDiffAt(j, blockRange.Params); err != nil { logrus.Errorf("service worker %d error writing statediff at height %d in range (%d, %d) : %v", id, j, blockRange.Start, blockRange.Stop, err) @@ -147,8 +151,12 @@ func (sds *Service) Loop(wg *sync.WaitGroup) { }(i) } for _, preRun := range sds.preruns { - sds.queue <- preRun + if err := sds.WriteStateDiffsInRange(preRun.Start, preRun.Stop, preRun.Params); err != nil { + close(sds.quitChan) + return err + } } + return nil } // StateDiffAt returns a state diff object payload at the specific blockheight @@ -265,8 +273,7 @@ func (sds *Service) processStateTrie(block *types.Block, params sd.Params) (*sd. // Start is used to begin the service func (sds *Service) Start() error { logrus.Info("starting statediff service") - go sds.Loop(new(sync.WaitGroup)) - return nil + return sds.Loop(new(sync.WaitGroup)) } // Stop is used to close down the service @@ -372,6 +379,8 @@ func (sds *Service) WriteStateDiffsInRange(start, stop uint64, params sd.Params) blocked := time.NewTimer(30 * time.Second) select { case sds.queue <- RangeRequest{Start: start, Stop: stop, Params: params}: + prom.IncQueuedRanges() + logrus.Infof("added range (%d, %d) to the worker queue", start, stop) return nil case <-blocked.C: return fmt.Errorf("unable to add range (%d, %d) to the worker queue", start, stop)