From b091f3382e0e53885aaab2ea0781f1f4a91c5776 Mon Sep 17 00:00:00 2001 From: Thomas E Lackey Date: Thu, 22 Jun 2023 13:32:57 -0500 Subject: [PATCH] Refactor and add comments. --- statediff/service.go | 286 ++++++++++++++++++++++--------------------- 1 file changed, 147 insertions(+), 139 deletions(-) diff --git a/statediff/service.go b/statediff/service.go index 62fcdb255..78d57814a 100644 --- a/statediff/service.go +++ b/statediff/service.go @@ -327,145 +327,6 @@ type workerParams struct { id uint } -func (sds *Service) backfillDetectedGaps(blockGaps []*interfaces.BlockGap) { - var ch = make(chan uint64) - var wg sync.WaitGroup - for i := uint(0); i < sds.numWorkers; i++ { - wg.Add(1) - go func(w uint) { - defer wg.Done() - for { - select { - case num, ok := <-ch: - if !ok { - log.Info("Backfill: detected gap fill done", "worker", w) - return - } - log.Info("Backfill: backfilling detected gap", "block", num, "worker", w) - err := sds.writeStateDiffAt(num, writeLoopParams.Params) - if err != nil { - log.Error("Backfill error: " + err.Error()) - } - case <-sds.QuitChan: - log.Info("Backfill: quitting before finish", "worker", w) - return - } - } - }(i) - } - - for _, gap := range blockGaps { - for num := gap.FirstMissing; num <= gap.LastMissing; num++ { - ch <- num - } - } - close(ch) - wg.Wait() -} - -func (sds *Service) backfillHeadGap(indexerBlockNumber uint64, chainBlockNumber uint64) { - headGap := chainBlockNumber - indexerBlockNumber - var ch = make(chan uint64, headGap) - for bn := indexerBlockNumber; bn < chainBlockNumber; bn++ { - ch <- bn - } - - var wg sync.WaitGroup - for i := uint(0); i < sds.numWorkers; i++ { - wg.Add(1) - go func(w uint) { - defer wg.Done() - for { - select { - case num, ok := <-ch: - if !ok { - log.Info("Backfill: headGap done", "worker", w) - return - } - log.Info("Backfill: backfilling head gap", "block", num, "worker", w) - err := sds.writeStateDiffAt(num, writeLoopParams.Params) - if err != nil { - log.Error("Backfill error: " + err.Error()) - } - case <-sds.QuitChan: - log.Info("Backfill: quitting before finish", "worker", w) - return - } - } - }(i) - } - close(ch) - wg.Wait() -} - -func (sds *Service) Backfill() { - chainBlock := sds.BlockChain.CurrentBlock() - if nil == chainBlock { - log.Info("Backfill: No previous chain block, nothing to backfill.") - return - } - - chainBlockNumber := chainBlock.Number.Uint64() - if chainBlockNumber == 0 { - log.Info("Backfill: At start of chain, nothing to backfill.") - return - } - - indexerBlock, err := sds.indexer.CurrentBlock() - if nil == indexerBlock { - log.Info("Backfill: No previous indexer block, nothing to backfill.") - return - } - if nil != err { - log.Error("Backfill error: " + err.Error()) - return - } - - indexerBlockNumber, err := strconv.ParseUint(indexerBlock.BlockNumber, 10, 64) - if nil != err { - log.Error("Backfill error: " + err.Error()) - return - } - - headGap := chainBlockNumber - indexerBlockNumber - log.Info( - "Backfill: initial positions", - "chain", chainBlockNumber, - "indexer", indexerBlockNumber, - "headGap", headGap, - ) - - if sds.backfillMaxHeadGap > 0 && headGap > 0 { - if headGap < sds.backfillMaxHeadGap { - sds.backfillHeadGap(indexerBlockNumber, chainBlockNumber) - log.Info("Backfill: all workers done filling headGap.") - } else { - log.Error("Backfill: headGap too large to fill.") - } - } - - if sds.backfillCheckPastBlocks > 0 { - var gapCheckBeginNumber uint64 = 0 - if indexerBlockNumber > sds.backfillCheckPastBlocks { - gapCheckBeginNumber = indexerBlockNumber - sds.backfillCheckPastBlocks - } - blockGaps, err := sds.indexer.DetectGaps(gapCheckBeginNumber, chainBlockNumber) - if nil != err { - log.Error("Backfill error: " + err.Error()) - return - } - - if nil != blockGaps && len(blockGaps) > 0 { - gapsMsg, _ := json.Marshal(blockGaps) - log.Info("Backfill: detected gaps in range", "begin", gapCheckBeginNumber, "end", chainBlockNumber, "gaps", string(gapsMsg)) - sds.backfillDetectedGaps(blockGaps) - log.Info("Backfill: done processing detected gaps in range", "begin", gapCheckBeginNumber, "end", chainBlockNumber, "gaps", string(gapsMsg)) - } else { - log.Info("Backfill: no gaps detected in range", "begin", gapCheckBeginNumber, "end", chainBlockNumber) - } - } -} - func (sds *Service) WriteLoop(chainEventCh chan core.ChainEvent) { chainEventSub := sds.BlockChain.SubscribeChainEvent(chainEventCh) defer chainEventSub.Unsubscribe() @@ -1250,3 +1111,150 @@ func MapWatchAddressArgsToAddresses(args []types2.WatchAddressArg) ([]common.Add return addresses, nil } + +// Backfill is executed on startup to make sure there are no gaps in the recent past when tracking head. +func (sds *Service) Backfill() { + chainBlock := sds.BlockChain.CurrentBlock() + if nil == chainBlock { + log.Info("Backfill: No previous chain block, nothing to backfill.") + return + } + + chainBlockNumber := chainBlock.Number.Uint64() + if chainBlockNumber == 0 { + log.Info("Backfill: At start of chain, nothing to backfill.") + return + } + + indexerBlock, err := sds.indexer.CurrentBlock() + if nil == indexerBlock { + log.Info("Backfill: No previous indexer block, nothing to backfill.") + return + } + if nil != err { + log.Error("Backfill error: " + err.Error()) + return + } + + indexerBlockNumber, err := strconv.ParseUint(indexerBlock.BlockNumber, 10, 64) + if nil != err { + log.Error("Backfill error: " + err.Error()) + return + } + + headGap := chainBlockNumber - indexerBlockNumber + log.Info( + "Backfill: initial positions", + "chain", chainBlockNumber, + "indexer", indexerBlockNumber, + "headGap", headGap, + ) + + if sds.backfillMaxHeadGap > 0 && headGap > 0 { + if headGap < sds.backfillMaxHeadGap { + sds.backfillHeadGap(indexerBlockNumber, chainBlockNumber) + log.Info("Backfill: all workers done filling headGap.") + } else { + log.Error("Backfill: headGap too large to fill.") + } + } + + if sds.backfillCheckPastBlocks > 0 { + var gapCheckBeginNumber uint64 = 0 + if indexerBlockNumber > sds.backfillCheckPastBlocks { + gapCheckBeginNumber = indexerBlockNumber - sds.backfillCheckPastBlocks + } + blockGaps, err := sds.indexer.DetectGaps(gapCheckBeginNumber, chainBlockNumber) + if nil != err { + log.Error("Backfill error: " + err.Error()) + return + } + + if nil != blockGaps && len(blockGaps) > 0 { + gapsMsg, _ := json.Marshal(blockGaps) + log.Info("Backfill: detected gaps in range", "begin", gapCheckBeginNumber, "end", chainBlockNumber, "gaps", string(gapsMsg)) + sds.backfillDetectedGaps(blockGaps) + log.Info("Backfill: done processing detected gaps in range", "begin", gapCheckBeginNumber, "end", chainBlockNumber, "gaps", string(gapsMsg)) + } else { + log.Info("Backfill: no gaps detected in range", "begin", gapCheckBeginNumber, "end", chainBlockNumber) + } + } +} + +// backfillHeadGap fills in any gap between the statediff position and the chain position at startup. +// A gap can be created in this way if there is some problem in statediffing (eg, DB connectivity is lost, +// while the chain keeps syncing), if the process is terminated with a statediff in-flight, etc. +func (sds *Service) backfillHeadGap(indexerBlockNumber uint64, chainBlockNumber uint64) { + headGap := chainBlockNumber - indexerBlockNumber + var ch = make(chan uint64, headGap) + for bn := indexerBlockNumber; bn < chainBlockNumber; bn++ { + ch <- bn + } + + var wg sync.WaitGroup + for i := uint(0); i < sds.numWorkers; i++ { + wg.Add(1) + go func(w uint) { + defer wg.Done() + for { + select { + case num, ok := <-ch: + if !ok { + log.Info("Backfill: headGap done", "worker", w) + return + } + log.Info("Backfill: backfilling head gap", "block", num, "worker", w) + err := sds.writeStateDiffAt(num, writeLoopParams.Params) + if err != nil { + log.Error("Backfill error: " + err.Error()) + } + case <-sds.QuitChan: + log.Info("Backfill: quitting before finish", "worker", w) + return + } + } + }(i) + } + close(ch) + wg.Wait() +} + +// backfillDetectedGaps fills gaps which have occurred in the recent past. These gaps can happen because of +// transient errors, such as DB errors that are later corrected (so head statediffing continues, but with a hole) +// a missed ChainEvent (happens sometimes when debugging), or if the process is terminated when an earlier block +// is still in-flight, but a later block was already written. +func (sds *Service) backfillDetectedGaps(blockGaps []*interfaces.BlockGap) { + var ch = make(chan uint64) + var wg sync.WaitGroup + for i := uint(0); i < sds.numWorkers; i++ { + wg.Add(1) + go func(w uint) { + defer wg.Done() + for { + select { + case num, ok := <-ch: + if !ok { + log.Info("Backfill: detected gap fill done", "worker", w) + return + } + log.Info("Backfill: backfilling detected gap", "block", num, "worker", w) + err := sds.writeStateDiffAt(num, writeLoopParams.Params) + if err != nil { + log.Error("Backfill error: " + err.Error()) + } + case <-sds.QuitChan: + log.Info("Backfill: quitting before finish", "worker", w) + return + } + } + }(i) + } + + for _, gap := range blockGaps { + for num := gap.FirstMissing; num <= gap.LastMissing; num++ { + ch <- num + } + } + close(ch) + wg.Wait() +}