diff --git a/statediff/service.go b/statediff/service.go index 6a6e0bd34..e36ef4428 100644 --- a/statediff/service.go +++ b/statediff/service.go @@ -230,9 +230,25 @@ func (sds *Service) WriteLoop(chainEventCh chan core.ChainEvent) { defer chainEventSub.Unsubscribe() errCh := chainEventSub.Err() var wg sync.WaitGroup + // Process metrics for chain events, then forward to workers + chainEventFwd := make(chan core.ChainEvent, chainEventChanSize) + wg.Add(1) + go func() { + defer wg.Done() + for { + select { + case chainEvent := <-chainEventCh: + statediffMetrics.lastEventHeight.Update(int64(chainEvent.Block.Number().Uint64())) + statediffMetrics.writeLoopChannelLen.Update(int64(len(chainEventCh))) + chainEventFwd <- chainEvent + case <-sds.QuitChan: + return + } + } + }() wg.Add(int(sds.numWorkers)) for worker := uint(0); worker < sds.numWorkers; worker++ { - params := workerParams{chainEventCh: chainEventCh, errCh: errCh, wg: &wg, id: worker} + params := workerParams{chainEventCh: chainEventFwd, errCh: errCh, wg: &wg, id: worker} go sds.writeLoopWorker(params) } wg.Wait() @@ -244,10 +260,8 @@ func (sds *Service) writeLoopWorker(params workerParams) { select { //Notify chain event channel of events case chainEvent := <-params.chainEventCh: - statediffMetrics.writeLoopChannelLen.Update(int64(len(params.chainEventCh))) log.Debug("WriteLoop(): chain event received", "event", chainEvent) currentBlock := chainEvent.Block - statediffMetrics.lastEventHeight.Update(int64(currentBlock.Number().Uint64())) parentBlock := sds.BlockCache.getParentBlock(currentBlock, sds.BlockChain) if parentBlock == nil { log.Error("Parent block is nil, skipping this block", "block height", currentBlock.Number())