diff --git a/statediff/service.go b/statediff/service.go index 64a7b0b95..f2170a913 100644 --- a/statediff/service.go +++ b/statediff/service.go @@ -322,9 +322,9 @@ func (lbc *BlockCache) getParentBlock(currentBlock *types.Block, bc blockChain) } type workerParams struct { - chainEventCh <-chan core.ChainEvent - wg *sync.WaitGroup - id uint + blockCh <-chan *types.Block + wg *sync.WaitGroup + id uint } func (sds *Service) WriteLoop(chainEventCh chan core.ChainEvent) { @@ -333,7 +333,7 @@ func (sds *Service) WriteLoop(chainEventCh chan core.ChainEvent) { errCh := chainEventSub.Err() var wg sync.WaitGroup // Process metrics for chain events, then forward to workers - chainEventFwd := make(chan core.ChainEvent, chainEventChanSize) + blockFwd := make(chan *types.Block, chainEventChanSize) wg.Add(1) go func() { defer wg.Done() @@ -341,13 +341,29 @@ func (sds *Service) WriteLoop(chainEventCh chan core.ChainEvent) { select { case chainEvent := <-chainEventCh: lastHeight := defaultStatediffMetrics.lastEventHeight.Value() - nextHeight := int64(chainEvent.Block.Number().Uint64()) - if nextHeight-lastHeight != 1 { - log.Warn("Statediffing service received block out-of-order", "next height", nextHeight, "last height", lastHeight) + if lastHeight == 0 { + if lastBlock, _ := sds.indexer.CurrentBlock(); lastBlock != nil { + lastHeight, _ = strconv.ParseInt(lastBlock.BlockNumber, 10, 64) + } + } + nextHeight := int64(chainEvent.Block.Number().Uint64()) + distance := nextHeight - lastHeight + if distance < 1 { + log.Warn("Statediffing service received block from the past", "next height", nextHeight, "last height", lastHeight) + blockFwd <- chainEvent.Block + } else if distance > 1 { + log.Warn("Statediffing service received block from the future", "next height", nextHeight, "last height", lastHeight) + for i := lastHeight + 1; i < nextHeight; i++ { + log.Info("Statediffing filling block between last and next", "block", i, "next height", nextHeight, "last height", lastHeight) + blockFwd <- sds.BlockChain.GetBlockByNumber(uint64(i)) + } + blockFwd <- chainEvent.Block + defaultStatediffMetrics.lastEventHeight.Update(nextHeight) + } else { + blockFwd <- chainEvent.Block + defaultStatediffMetrics.lastEventHeight.Update(nextHeight) } - defaultStatediffMetrics.lastEventHeight.Update(nextHeight) defaultStatediffMetrics.writeLoopChannelLen.Update(int64(len(chainEventCh))) - chainEventFwd <- chainEvent case err := <-errCh: log.Error("Error from chain event subscription", "error", err) close(sds.QuitChan) @@ -367,7 +383,7 @@ func (sds *Service) WriteLoop(chainEventCh chan core.ChainEvent) { }() wg.Add(int(sds.numWorkers)) for worker := uint(0); worker < sds.numWorkers; worker++ { - params := workerParams{chainEventCh: chainEventFwd, wg: &wg, id: worker} + params := workerParams{blockCh: blockFwd, wg: &wg, id: worker} go sds.writeLoopWorker(params) } wg.Wait() @@ -392,9 +408,9 @@ func (sds *Service) writeLoopWorker(params workerParams) { for { select { //Notify chain event channel of events - case chainEvent := <-params.chainEventCh: + case chainEvent := <-params.blockCh: log.Debug("WriteLoop(): chain event received", "event", chainEvent) - currentBlock := chainEvent.Block + currentBlock := chainEvent parentBlock := sds.BlockCache.getParentBlock(currentBlock, sds.BlockChain) if parentBlock == nil { log.Error("Parent block is nil, skipping this block", "block height", currentBlock.Number())