Refactor and add comments.

This commit is contained in:
Thomas E Lackey 2023-06-22 13:32:57 -05:00
parent 2bf4c91269
commit b091f3382e

View File

@ -327,145 +327,6 @@ type workerParams struct {
id uint id uint
} }
func (sds *Service) backfillDetectedGaps(blockGaps []*interfaces.BlockGap) {
var ch = make(chan uint64)
var wg sync.WaitGroup
for i := uint(0); i < sds.numWorkers; i++ {
wg.Add(1)
go func(w uint) {
defer wg.Done()
for {
select {
case num, ok := <-ch:
if !ok {
log.Info("Backfill: detected gap fill done", "worker", w)
return
}
log.Info("Backfill: backfilling detected gap", "block", num, "worker", w)
err := sds.writeStateDiffAt(num, writeLoopParams.Params)
if err != nil {
log.Error("Backfill error: " + err.Error())
}
case <-sds.QuitChan:
log.Info("Backfill: quitting before finish", "worker", w)
return
}
}
}(i)
}
for _, gap := range blockGaps {
for num := gap.FirstMissing; num <= gap.LastMissing; num++ {
ch <- num
}
}
close(ch)
wg.Wait()
}
func (sds *Service) backfillHeadGap(indexerBlockNumber uint64, chainBlockNumber uint64) {
headGap := chainBlockNumber - indexerBlockNumber
var ch = make(chan uint64, headGap)
for bn := indexerBlockNumber; bn < chainBlockNumber; bn++ {
ch <- bn
}
var wg sync.WaitGroup
for i := uint(0); i < sds.numWorkers; i++ {
wg.Add(1)
go func(w uint) {
defer wg.Done()
for {
select {
case num, ok := <-ch:
if !ok {
log.Info("Backfill: headGap done", "worker", w)
return
}
log.Info("Backfill: backfilling head gap", "block", num, "worker", w)
err := sds.writeStateDiffAt(num, writeLoopParams.Params)
if err != nil {
log.Error("Backfill error: " + err.Error())
}
case <-sds.QuitChan:
log.Info("Backfill: quitting before finish", "worker", w)
return
}
}
}(i)
}
close(ch)
wg.Wait()
}
func (sds *Service) Backfill() {
chainBlock := sds.BlockChain.CurrentBlock()
if nil == chainBlock {
log.Info("Backfill: No previous chain block, nothing to backfill.")
return
}
chainBlockNumber := chainBlock.Number.Uint64()
if chainBlockNumber == 0 {
log.Info("Backfill: At start of chain, nothing to backfill.")
return
}
indexerBlock, err := sds.indexer.CurrentBlock()
if nil == indexerBlock {
log.Info("Backfill: No previous indexer block, nothing to backfill.")
return
}
if nil != err {
log.Error("Backfill error: " + err.Error())
return
}
indexerBlockNumber, err := strconv.ParseUint(indexerBlock.BlockNumber, 10, 64)
if nil != err {
log.Error("Backfill error: " + err.Error())
return
}
headGap := chainBlockNumber - indexerBlockNumber
log.Info(
"Backfill: initial positions",
"chain", chainBlockNumber,
"indexer", indexerBlockNumber,
"headGap", headGap,
)
if sds.backfillMaxHeadGap > 0 && headGap > 0 {
if headGap < sds.backfillMaxHeadGap {
sds.backfillHeadGap(indexerBlockNumber, chainBlockNumber)
log.Info("Backfill: all workers done filling headGap.")
} else {
log.Error("Backfill: headGap too large to fill.")
}
}
if sds.backfillCheckPastBlocks > 0 {
var gapCheckBeginNumber uint64 = 0
if indexerBlockNumber > sds.backfillCheckPastBlocks {
gapCheckBeginNumber = indexerBlockNumber - sds.backfillCheckPastBlocks
}
blockGaps, err := sds.indexer.DetectGaps(gapCheckBeginNumber, chainBlockNumber)
if nil != err {
log.Error("Backfill error: " + err.Error())
return
}
if nil != blockGaps && len(blockGaps) > 0 {
gapsMsg, _ := json.Marshal(blockGaps)
log.Info("Backfill: detected gaps in range", "begin", gapCheckBeginNumber, "end", chainBlockNumber, "gaps", string(gapsMsg))
sds.backfillDetectedGaps(blockGaps)
log.Info("Backfill: done processing detected gaps in range", "begin", gapCheckBeginNumber, "end", chainBlockNumber, "gaps", string(gapsMsg))
} else {
log.Info("Backfill: no gaps detected in range", "begin", gapCheckBeginNumber, "end", chainBlockNumber)
}
}
}
func (sds *Service) WriteLoop(chainEventCh chan core.ChainEvent) { func (sds *Service) WriteLoop(chainEventCh chan core.ChainEvent) {
chainEventSub := sds.BlockChain.SubscribeChainEvent(chainEventCh) chainEventSub := sds.BlockChain.SubscribeChainEvent(chainEventCh)
defer chainEventSub.Unsubscribe() defer chainEventSub.Unsubscribe()
@ -1250,3 +1111,150 @@ func MapWatchAddressArgsToAddresses(args []types2.WatchAddressArg) ([]common.Add
return addresses, nil return addresses, nil
} }
// Backfill is executed on startup to make sure there are no gaps in the recent past when tracking head.
func (sds *Service) Backfill() {
chainBlock := sds.BlockChain.CurrentBlock()
if nil == chainBlock {
log.Info("Backfill: No previous chain block, nothing to backfill.")
return
}
chainBlockNumber := chainBlock.Number.Uint64()
if chainBlockNumber == 0 {
log.Info("Backfill: At start of chain, nothing to backfill.")
return
}
indexerBlock, err := sds.indexer.CurrentBlock()
if nil == indexerBlock {
log.Info("Backfill: No previous indexer block, nothing to backfill.")
return
}
if nil != err {
log.Error("Backfill error: " + err.Error())
return
}
indexerBlockNumber, err := strconv.ParseUint(indexerBlock.BlockNumber, 10, 64)
if nil != err {
log.Error("Backfill error: " + err.Error())
return
}
headGap := chainBlockNumber - indexerBlockNumber
log.Info(
"Backfill: initial positions",
"chain", chainBlockNumber,
"indexer", indexerBlockNumber,
"headGap", headGap,
)
if sds.backfillMaxHeadGap > 0 && headGap > 0 {
if headGap < sds.backfillMaxHeadGap {
sds.backfillHeadGap(indexerBlockNumber, chainBlockNumber)
log.Info("Backfill: all workers done filling headGap.")
} else {
log.Error("Backfill: headGap too large to fill.")
}
}
if sds.backfillCheckPastBlocks > 0 {
var gapCheckBeginNumber uint64 = 0
if indexerBlockNumber > sds.backfillCheckPastBlocks {
gapCheckBeginNumber = indexerBlockNumber - sds.backfillCheckPastBlocks
}
blockGaps, err := sds.indexer.DetectGaps(gapCheckBeginNumber, chainBlockNumber)
if nil != err {
log.Error("Backfill error: " + err.Error())
return
}
if nil != blockGaps && len(blockGaps) > 0 {
gapsMsg, _ := json.Marshal(blockGaps)
log.Info("Backfill: detected gaps in range", "begin", gapCheckBeginNumber, "end", chainBlockNumber, "gaps", string(gapsMsg))
sds.backfillDetectedGaps(blockGaps)
log.Info("Backfill: done processing detected gaps in range", "begin", gapCheckBeginNumber, "end", chainBlockNumber, "gaps", string(gapsMsg))
} else {
log.Info("Backfill: no gaps detected in range", "begin", gapCheckBeginNumber, "end", chainBlockNumber)
}
}
}
// backfillHeadGap fills in any gap between the statediff position and the chain position at startup.
// A gap can be created in this way if there is some problem in statediffing (eg, DB connectivity is lost,
// while the chain keeps syncing), if the process is terminated with a statediff in-flight, etc.
func (sds *Service) backfillHeadGap(indexerBlockNumber uint64, chainBlockNumber uint64) {
headGap := chainBlockNumber - indexerBlockNumber
var ch = make(chan uint64, headGap)
for bn := indexerBlockNumber; bn < chainBlockNumber; bn++ {
ch <- bn
}
var wg sync.WaitGroup
for i := uint(0); i < sds.numWorkers; i++ {
wg.Add(1)
go func(w uint) {
defer wg.Done()
for {
select {
case num, ok := <-ch:
if !ok {
log.Info("Backfill: headGap done", "worker", w)
return
}
log.Info("Backfill: backfilling head gap", "block", num, "worker", w)
err := sds.writeStateDiffAt(num, writeLoopParams.Params)
if err != nil {
log.Error("Backfill error: " + err.Error())
}
case <-sds.QuitChan:
log.Info("Backfill: quitting before finish", "worker", w)
return
}
}
}(i)
}
close(ch)
wg.Wait()
}
// backfillDetectedGaps fills gaps which have occurred in the recent past. These gaps can happen because of
// transient errors, such as DB errors that are later corrected (so head statediffing continues, but with a hole)
// a missed ChainEvent (happens sometimes when debugging), or if the process is terminated when an earlier block
// is still in-flight, but a later block was already written.
func (sds *Service) backfillDetectedGaps(blockGaps []*interfaces.BlockGap) {
var ch = make(chan uint64)
var wg sync.WaitGroup
for i := uint(0); i < sds.numWorkers; i++ {
wg.Add(1)
go func(w uint) {
defer wg.Done()
for {
select {
case num, ok := <-ch:
if !ok {
log.Info("Backfill: detected gap fill done", "worker", w)
return
}
log.Info("Backfill: backfilling detected gap", "block", num, "worker", w)
err := sds.writeStateDiffAt(num, writeLoopParams.Params)
if err != nil {
log.Error("Backfill error: " + err.Error())
}
case <-sds.QuitChan:
log.Info("Backfill: quitting before finish", "worker", w)
return
}
}
}(i)
}
for _, gap := range blockGaps {
for num := gap.FirstMissing; num <= gap.LastMissing; num++ {
ch <- num
}
}
close(ch)
wg.Wait()
}