Combine backfilling of the gap-to-head and the fix for 396.

This commit is contained in:
Thomas E Lackey 2023-07-05 15:14:20 -05:00
parent cf7bbd3f8e
commit 67f4bab8ba

View File

@ -170,6 +170,12 @@ type statusSubscription struct {
quitChan chan<- bool
}
// Utility type for showing the relative positions of the blockchain and the statediff indexer.
type servicePosition struct {
chainBlockNumber uint64
indexerBlockNumber uint64
}
// BlockCache caches the last block for safe access from different service loops
type BlockCache struct {
sync.Mutex
@ -328,6 +334,12 @@ type workerParams struct {
}
func (sds *Service) WriteLoop(chainEventCh chan core.ChainEvent) {
initialPos := sds.currentPosition()
log.Info(
"WriteLoop: initial positions",
"chain", initialPos.chainBlockNumber,
"indexer", initialPos.indexerBlockNumber,
)
chainEventSub := sds.BlockChain.SubscribeChainEvent(chainEventCh)
defer chainEventSub.Unsubscribe()
errCh := chainEventSub.Err()
@ -340,28 +352,30 @@ func (sds *Service) WriteLoop(chainEventCh chan core.ChainEvent) {
for {
select {
case chainEvent := <-chainEventCh:
lastHeight := defaultStatediffMetrics.lastEventHeight.Value()
lastHeight := uint64(defaultStatediffMetrics.lastEventHeight.Value())
if lastHeight == 0 {
if lastBlock, _ := sds.indexer.CurrentBlock(); lastBlock != nil {
lastHeight, _ = strconv.ParseInt(lastBlock.BlockNumber, 10, 64)
}
lastHeight = initialPos.indexerBlockNumber
}
nextHeight := int64(chainEvent.Block.Number().Uint64())
nextHeight := chainEvent.Block.Number().Uint64()
distance := nextHeight - lastHeight
if distance < 1 {
log.Warn("Statediffing service received block from the past", "next height", nextHeight, "last height", lastHeight)
log.Warn("WriteLoop: received unexpected block from the past", "next height", nextHeight, "last height", lastHeight)
blockFwd <- chainEvent.Block
} else if distance > 1 {
log.Warn("Statediffing service received block from the future", "next height", nextHeight, "last height", lastHeight)
for i := lastHeight + 1; i < nextHeight; i++ {
log.Info("Statediffing filling block between last and next", "block", i, "next height", nextHeight, "last height", lastHeight)
blockFwd <- sds.BlockChain.GetBlockByNumber(uint64(i))
log.Warn("WriteLoop: received unexpected block from the future", "next height", nextHeight, "last height", lastHeight)
if distance <= sds.backfillMaxHeadGap {
for i := lastHeight + 1; i < nextHeight; i++ {
log.Info("WriteLoop: backfilling gap to head", "block", i, "next height", nextHeight, "last height", lastHeight, "gap", distance)
blockFwd <- sds.BlockChain.GetBlockByNumber(i)
}
} else {
log.Warn("WriteLoop: gap to head too large to backfill", "next height", nextHeight, "last height", lastHeight, "gap", distance)
}
blockFwd <- chainEvent.Block
defaultStatediffMetrics.lastEventHeight.Update(nextHeight)
defaultStatediffMetrics.lastEventHeight.Update(int64(nextHeight))
} else {
blockFwd <- chainEvent.Block
defaultStatediffMetrics.lastEventHeight.Update(nextHeight)
defaultStatediffMetrics.lastEventHeight.Update(int64(nextHeight))
}
defaultStatediffMetrics.writeLoopChannelLen.Update(int64(len(chainEventCh)))
case err := <-errCh:
@ -1130,57 +1144,24 @@ func MapWatchAddressArgsToAddresses(args []types2.WatchAddressArg) ([]common.Add
// Backfill is executed on startup to make sure there are no gaps in the recent past when tracking head.
func (sds *Service) Backfill() {
chainBlock := sds.BlockChain.CurrentBlock()
if nil == chainBlock {
log.Info("Backfill: No previous chain block, nothing to backfill.")
return
}
chainBlockNumber := chainBlock.Number.Uint64()
if chainBlockNumber == 0 {
pos := sds.currentPosition()
if pos.chainBlockNumber == 0 {
log.Info("Backfill: At start of chain, nothing to backfill.")
return
}
indexerBlock, err := sds.indexer.CurrentBlock()
if nil == indexerBlock {
log.Info("Backfill: No previous indexer block, nothing to backfill.")
return
}
if nil != err {
log.Error("Backfill error: " + err.Error())
return
}
indexerBlockNumber, err := strconv.ParseUint(indexerBlock.BlockNumber, 10, 64)
if nil != err {
log.Error("Backfill error: " + err.Error())
return
}
headGap := chainBlockNumber - indexerBlockNumber
log.Info(
"Backfill: initial positions",
"chain", chainBlockNumber,
"indexer", indexerBlockNumber,
"headGap", headGap,
"chain", pos.chainBlockNumber,
"indexer", pos.indexerBlockNumber,
)
if sds.backfillMaxHeadGap > 0 && headGap > 0 {
if headGap < sds.backfillMaxHeadGap {
sds.backfillHeadGap(indexerBlockNumber, chainBlockNumber)
log.Info("Backfill: all workers done filling headGap.")
} else {
log.Error("Backfill: headGap too large to fill.")
}
}
if sds.backfillCheckPastBlocks > 0 {
var gapCheckBeginNumber uint64 = 0
if indexerBlockNumber > sds.backfillCheckPastBlocks {
gapCheckBeginNumber = indexerBlockNumber - sds.backfillCheckPastBlocks
if pos.indexerBlockNumber > sds.backfillCheckPastBlocks {
gapCheckBeginNumber = pos.indexerBlockNumber - sds.backfillCheckPastBlocks
}
blockGaps, err := sds.indexer.DetectGaps(gapCheckBeginNumber, chainBlockNumber)
blockGaps, err := sds.indexer.DetectGaps(gapCheckBeginNumber, pos.chainBlockNumber)
if nil != err {
log.Error("Backfill error: " + err.Error())
return
@ -1188,52 +1169,15 @@ func (sds *Service) Backfill() {
if nil != blockGaps && len(blockGaps) > 0 {
gapsMsg, _ := json.Marshal(blockGaps)
log.Info("Backfill: detected gaps in range", "begin", gapCheckBeginNumber, "end", chainBlockNumber, "gaps", string(gapsMsg))
log.Info("Backfill: detected gaps in range", "begin", gapCheckBeginNumber, "end", pos.chainBlockNumber, "gaps", string(gapsMsg))
sds.backfillDetectedGaps(blockGaps)
log.Info("Backfill: done processing detected gaps in range", "begin", gapCheckBeginNumber, "end", chainBlockNumber, "gaps", string(gapsMsg))
log.Info("Backfill: done processing detected gaps in range", "begin", gapCheckBeginNumber, "end", pos.chainBlockNumber, "gaps", string(gapsMsg))
} else {
log.Info("Backfill: no gaps detected in range", "begin", gapCheckBeginNumber, "end", chainBlockNumber)
log.Info("Backfill: no gaps detected in range", "begin", gapCheckBeginNumber, "end", pos.chainBlockNumber)
}
}
}
// backfillHeadGap fills in any gap between the statediff position and the chain position at startup.
// A gap can be created in this way if there is some problem in statediffing (eg, DB connectivity is lost,
// while the chain keeps syncing), if the process is terminated with a statediff in-flight, etc.
func (sds *Service) backfillHeadGap(indexerBlockNumber uint64, chainBlockNumber uint64) {
var ch = make(chan uint64)
var wg sync.WaitGroup
for i := uint(0); i < sds.numWorkers; i++ {
wg.Add(1)
go func(w uint) {
defer wg.Done()
for {
select {
case num, ok := <-ch:
if !ok {
log.Info("Backfill: headGap done", "worker", w)
return
}
log.Info("Backfill: backfilling head gap", "block", num, "worker", w)
err := sds.writeStateDiffAt(num, writeLoopParams.Params)
if err != nil {
log.Error("Backfill error: " + err.Error())
}
case <-sds.QuitChan:
log.Info("Backfill: quitting before finish", "worker", w)
return
}
}
}(i)
}
for bn := indexerBlockNumber + 1; bn <= chainBlockNumber; bn++ {
ch <- bn
}
close(ch)
wg.Wait()
}
// backfillDetectedGaps fills gaps which have occurred in the recent past. These gaps can happen because of
// transient errors, such as DB errors that are later corrected (so head statediffing continues, but with a hole)
// a missed ChainEvent (happens sometimes when debugging), or if the process is terminated when an earlier block
@ -1273,3 +1217,23 @@ func (sds *Service) backfillDetectedGaps(blockGaps []*interfaces.BlockGap) {
close(ch)
wg.Wait()
}
func (sds *Service) currentPosition() servicePosition {
ret := servicePosition{}
chainBlock := sds.BlockChain.CurrentBlock()
if nil != chainBlock {
ret.chainBlockNumber = chainBlock.Number.Uint64()
}
indexerBlock, _ := sds.indexer.CurrentBlock()
if nil != indexerBlock {
indexerBlockNumber, err := strconv.ParseUint(indexerBlock.BlockNumber, 10, 64)
if nil == err {
ret.indexerBlockNumber = indexerBlockNumber
} else {
log.Error("Error parsing indexer block number", "block", indexerBlock.BlockNumber)
}
}
return ret
}