396: Don't skip any blocks if the head of the chain advances by more than one block. #397

Merged
telackey merged 7 commits from telackey/396 into v1.11.6-statediff-v5 2023-07-06 17:00:59 +00:00
Showing only changes of commit cf7bbd3f8e - Show all commits

View File

@ -322,9 +322,9 @@ func (lbc *BlockCache) getParentBlock(currentBlock *types.Block, bc blockChain)
}
type workerParams struct {
chainEventCh <-chan core.ChainEvent
wg *sync.WaitGroup
id uint
blockCh <-chan *types.Block
wg *sync.WaitGroup
id uint
}
func (sds *Service) WriteLoop(chainEventCh chan core.ChainEvent) {
@ -333,7 +333,7 @@ func (sds *Service) WriteLoop(chainEventCh chan core.ChainEvent) {
errCh := chainEventSub.Err()
var wg sync.WaitGroup
// Process metrics for chain events, then forward to workers
chainEventFwd := make(chan core.ChainEvent, chainEventChanSize)
blockFwd := make(chan *types.Block, chainEventChanSize)
wg.Add(1)
go func() {
defer wg.Done()
@ -341,13 +341,29 @@ func (sds *Service) WriteLoop(chainEventCh chan core.ChainEvent) {
select {
case chainEvent := <-chainEventCh:
lastHeight := defaultStatediffMetrics.lastEventHeight.Value()
nextHeight := int64(chainEvent.Block.Number().Uint64())
if nextHeight-lastHeight != 1 {
log.Warn("Statediffing service received block out-of-order", "next height", nextHeight, "last height", lastHeight)
if lastHeight == 0 {
if lastBlock, _ := sds.indexer.CurrentBlock(); lastBlock != nil {
lastHeight, _ = strconv.ParseInt(lastBlock.BlockNumber, 10, 64)
}
}
nextHeight := int64(chainEvent.Block.Number().Uint64())
distance := nextHeight - lastHeight
if distance < 1 {
log.Warn("Statediffing service received block from the past", "next height", nextHeight, "last height", lastHeight)
blockFwd <- chainEvent.Block
} else if distance > 1 {
log.Warn("Statediffing service received block from the future", "next height", nextHeight, "last height", lastHeight)
for i := lastHeight + 1; i < nextHeight; i++ {
log.Info("Statediffing filling block between last and next", "block", i, "next height", nextHeight, "last height", lastHeight)
blockFwd <- sds.BlockChain.GetBlockByNumber(uint64(i))
}
blockFwd <- chainEvent.Block
defaultStatediffMetrics.lastEventHeight.Update(nextHeight)
} else {
blockFwd <- chainEvent.Block
defaultStatediffMetrics.lastEventHeight.Update(nextHeight)
}
defaultStatediffMetrics.lastEventHeight.Update(nextHeight)
defaultStatediffMetrics.writeLoopChannelLen.Update(int64(len(chainEventCh)))
chainEventFwd <- chainEvent
case err := <-errCh:
log.Error("Error from chain event subscription", "error", err)
close(sds.QuitChan)
@ -367,7 +383,7 @@ func (sds *Service) WriteLoop(chainEventCh chan core.ChainEvent) {
}()
wg.Add(int(sds.numWorkers))
for worker := uint(0); worker < sds.numWorkers; worker++ {
params := workerParams{chainEventCh: chainEventFwd, wg: &wg, id: worker}
params := workerParams{blockCh: blockFwd, wg: &wg, id: worker}
go sds.writeLoopWorker(params)
}
wg.Wait()
@ -392,9 +408,9 @@ func (sds *Service) writeLoopWorker(params workerParams) {
for {
select {
//Notify chain event channel of events
case chainEvent := <-params.chainEventCh:
case chainEvent := <-params.blockCh:
log.Debug("WriteLoop(): chain event received", "event", chainEvent)
currentBlock := chainEvent.Block
currentBlock := chainEvent
parentBlock := sds.BlockCache.getParentBlock(currentBlock, sds.BlockChain)
if parentBlock == nil {
log.Error("Parent block is nil, skipping this block", "block height", currentBlock.Number())