396: Don't skip any blocks if the head of the chain advances by more than one block. #397
@ -1127,7 +1127,7 @@ Please note that --` + MetricsHTTPFlag.Name + ` must be set to start the server.
|
|||||||
}
|
}
|
||||||
StateDiffBackfillMaxHeadGap = &cli.Uint64Flag{
|
StateDiffBackfillMaxHeadGap = &cli.Uint64Flag{
|
||||||
Name: "statediff.backfillmaxheadgap",
|
Name: "statediff.backfillmaxheadgap",
|
||||||
Usage: "The maximum gap between the startup statediff and startup head positions that can be backfilled.",
|
Usage: "The maximum gap between the current statediff and head positions that can be backfilled.",
|
||||||
Value: 7200,
|
Value: 7200,
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
|
@ -170,6 +170,12 @@ type statusSubscription struct {
|
|||||||
quitChan chan<- bool
|
quitChan chan<- bool
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// servicePosition is a utility type for showing the relative positions of the
// blockchain and the statediff indexer. Both fields are block heights; a zero
// value means the corresponding height was not set.
type servicePosition struct {
	chainBlockNumber, indexerBlockNumber uint64
}
|
||||||
|
|
||||||
// BlockCache caches the last block for safe access from different service loops
|
// BlockCache caches the last block for safe access from different service loops
|
||||||
type BlockCache struct {
|
type BlockCache struct {
|
||||||
sync.Mutex
|
sync.Mutex
|
||||||
@ -322,32 +328,59 @@ func (lbc *BlockCache) getParentBlock(currentBlock *types.Block, bc blockChain)
|
|||||||
}
|
}
|
||||||
|
|
||||||
type workerParams struct {
|
type workerParams struct {
|
||||||
chainEventCh <-chan core.ChainEvent
|
blockCh <-chan *types.Block
|
||||||
wg *sync.WaitGroup
|
wg *sync.WaitGroup
|
||||||
id uint
|
id uint
|
||||||
}
|
}
|
||||||
|
|
||||||
func (sds *Service) WriteLoop(chainEventCh chan core.ChainEvent) {
|
func (sds *Service) WriteLoop(chainEventCh chan core.ChainEvent) {
|
||||||
|
initialPos := sds.currentPosition()
|
||||||
|
log.Info(
|
||||||
|
"WriteLoop: initial positions",
|
||||||
|
"chain", initialPos.chainBlockNumber,
|
||||||
|
"indexer", initialPos.indexerBlockNumber,
|
||||||
|
)
|
||||||
chainEventSub := sds.BlockChain.SubscribeChainEvent(chainEventCh)
|
chainEventSub := sds.BlockChain.SubscribeChainEvent(chainEventCh)
|
||||||
defer chainEventSub.Unsubscribe()
|
defer chainEventSub.Unsubscribe()
|
||||||
errCh := chainEventSub.Err()
|
errCh := chainEventSub.Err()
|
||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
// Process metrics for chain events, then forward to workers
|
// Process metrics for chain events, then forward to workers
|
||||||
chainEventFwd := make(chan core.ChainEvent, chainEventChanSize)
|
blockFwd := make(chan *types.Block, chainEventChanSize)
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
go func() {
|
go func() {
|
||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case chainEvent := <-chainEventCh:
|
case chainEvent := <-chainEventCh:
|
||||||
lastHeight := defaultStatediffMetrics.lastEventHeight.Value()
|
lastHeight := uint64(defaultStatediffMetrics.lastEventHeight.Value())
|
||||||
nextHeight := int64(chainEvent.Block.Number().Uint64())
|
if lastHeight == 0 {
|
||||||
if nextHeight-lastHeight != 1 {
|
lastHeight = initialPos.indexerBlockNumber
|
||||||
log.Warn("Statediffing service received block out-of-order", "next height", nextHeight, "last height", lastHeight)
|
}
|
||||||
|
nextHeight := chainEvent.Block.Number().Uint64()
|
||||||
|
if nextHeight > lastHeight {
|
||||||
|
distance := nextHeight - lastHeight
|
||||||
|
if distance == 1 {
|
||||||
|
log.Info("WriteLoop: received expected block", "block height", nextHeight, "last height", lastHeight)
|
||||||
|
blockFwd <- chainEvent.Block
|
||||||
|
defaultStatediffMetrics.lastEventHeight.Update(int64(nextHeight))
|
||||||
|
} else {
|
||||||
|
log.Warn("WriteLoop: received unexpected block from the future", "block height", nextHeight, "last height", lastHeight)
|
||||||
|
if distance <= sds.backfillMaxHeadGap {
|
||||||
|
for i := lastHeight + 1; i < nextHeight; i++ {
|
||||||
|
log.Info("WriteLoop: backfilling gap to head", "block", i, "block height", nextHeight, "last height", lastHeight)
|
||||||
|
blockFwd <- sds.BlockChain.GetBlockByNumber(i)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
log.Warn("WriteLoop: gap to head too large to backfill", "block height", nextHeight, "last height", lastHeight, "gap", distance)
|
||||||
|
}
|
||||||
|
blockFwd <- chainEvent.Block
|
||||||
|
defaultStatediffMetrics.lastEventHeight.Update(int64(nextHeight))
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
log.Warn("WriteLoop: received unexpected block from the past", "block height", nextHeight, "last height", lastHeight)
|
||||||
|
blockFwd <- chainEvent.Block
|
||||||
}
|
}
|
||||||
defaultStatediffMetrics.lastEventHeight.Update(nextHeight)
|
|
||||||
defaultStatediffMetrics.writeLoopChannelLen.Update(int64(len(chainEventCh)))
|
defaultStatediffMetrics.writeLoopChannelLen.Update(int64(len(chainEventCh)))
|
||||||
chainEventFwd <- chainEvent
|
|
||||||
case err := <-errCh:
|
case err := <-errCh:
|
||||||
log.Error("Error from chain event subscription", "error", err)
|
log.Error("Error from chain event subscription", "error", err)
|
||||||
close(sds.QuitChan)
|
close(sds.QuitChan)
|
||||||
@ -367,7 +400,7 @@ func (sds *Service) WriteLoop(chainEventCh chan core.ChainEvent) {
|
|||||||
}()
|
}()
|
||||||
wg.Add(int(sds.numWorkers))
|
wg.Add(int(sds.numWorkers))
|
||||||
for worker := uint(0); worker < sds.numWorkers; worker++ {
|
for worker := uint(0); worker < sds.numWorkers; worker++ {
|
||||||
params := workerParams{chainEventCh: chainEventFwd, wg: &wg, id: worker}
|
params := workerParams{blockCh: blockFwd, wg: &wg, id: worker}
|
||||||
go sds.writeLoopWorker(params)
|
go sds.writeLoopWorker(params)
|
||||||
}
|
}
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
@ -392,9 +425,9 @@ func (sds *Service) writeLoopWorker(params workerParams) {
|
|||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
//Notify chain event channel of events
|
//Notify chain event channel of events
|
||||||
case chainEvent := <-params.chainEventCh:
|
case chainEvent := <-params.blockCh:
|
||||||
log.Debug("WriteLoop(): chain event received", "event", chainEvent)
|
log.Debug("WriteLoop(): chain event received", "event", chainEvent)
|
||||||
currentBlock := chainEvent.Block
|
currentBlock := chainEvent
|
||||||
parentBlock := sds.BlockCache.getParentBlock(currentBlock, sds.BlockChain)
|
parentBlock := sds.BlockCache.getParentBlock(currentBlock, sds.BlockChain)
|
||||||
if parentBlock == nil {
|
if parentBlock == nil {
|
||||||
log.Error("Parent block is nil, skipping this block", "block height", currentBlock.Number())
|
log.Error("Parent block is nil, skipping this block", "block height", currentBlock.Number())
|
||||||
@ -438,6 +471,8 @@ func (sds *Service) Loop(chainEventCh chan core.ChainEvent) {
|
|||||||
select {
|
select {
|
||||||
//Notify chain event channel of events
|
//Notify chain event channel of events
|
||||||
case chainEvent := <-chainEventCh:
|
case chainEvent := <-chainEventCh:
|
||||||
|
// TODO: Do we need to track the last streamed block as we do for the WriteLoop so that we can detect
|
||||||
|
// and plug any gaps in the events? If not, we risk presenting an incomplete record.
|
||||||
defaultStatediffMetrics.serviceLoopChannelLen.Update(int64(len(chainEventCh)))
|
defaultStatediffMetrics.serviceLoopChannelLen.Update(int64(len(chainEventCh)))
|
||||||
log.Debug("Loop(): chain event received", "event", chainEvent)
|
log.Debug("Loop(): chain event received", "event", chainEvent)
|
||||||
// if we don't have any subscribers, do not process a statediff
|
// if we don't have any subscribers, do not process a statediff
|
||||||
@ -1114,57 +1149,24 @@ func MapWatchAddressArgsToAddresses(args []types2.WatchAddressArg) ([]common.Add
|
|||||||
|
|
||||||
// Backfill is executed on startup to make sure there are no gaps in the recent past when tracking head.
|
// Backfill is executed on startup to make sure there are no gaps in the recent past when tracking head.
|
||||||
func (sds *Service) Backfill() {
|
func (sds *Service) Backfill() {
|
||||||
chainBlock := sds.BlockChain.CurrentBlock()
|
pos := sds.currentPosition()
|
||||||
if nil == chainBlock {
|
if pos.chainBlockNumber == 0 {
|
||||||
log.Info("Backfill: No previous chain block, nothing to backfill.")
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
chainBlockNumber := chainBlock.Number.Uint64()
|
|
||||||
if chainBlockNumber == 0 {
|
|
||||||
log.Info("Backfill: At start of chain, nothing to backfill.")
|
log.Info("Backfill: At start of chain, nothing to backfill.")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
indexerBlock, err := sds.indexer.CurrentBlock()
|
|
||||||
if nil == indexerBlock {
|
|
||||||
log.Info("Backfill: No previous indexer block, nothing to backfill.")
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if nil != err {
|
|
||||||
log.Error("Backfill error: " + err.Error())
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
indexerBlockNumber, err := strconv.ParseUint(indexerBlock.BlockNumber, 10, 64)
|
|
||||||
if nil != err {
|
|
||||||
log.Error("Backfill error: " + err.Error())
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
headGap := chainBlockNumber - indexerBlockNumber
|
|
||||||
log.Info(
|
log.Info(
|
||||||
"Backfill: initial positions",
|
"Backfill: initial positions",
|
||||||
"chain", chainBlockNumber,
|
"chain", pos.chainBlockNumber,
|
||||||
"indexer", indexerBlockNumber,
|
"indexer", pos.indexerBlockNumber,
|
||||||
"headGap", headGap,
|
|
||||||
)
|
)
|
||||||
|
|
||||||
if sds.backfillMaxHeadGap > 0 && headGap > 0 {
|
|
||||||
if headGap < sds.backfillMaxHeadGap {
|
|
||||||
sds.backfillHeadGap(indexerBlockNumber, chainBlockNumber)
|
|
||||||
log.Info("Backfill: all workers done filling headGap.")
|
|
||||||
} else {
|
|
||||||
log.Error("Backfill: headGap too large to fill.")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if sds.backfillCheckPastBlocks > 0 {
|
if sds.backfillCheckPastBlocks > 0 {
|
||||||
var gapCheckBeginNumber uint64 = 0
|
var gapCheckBeginNumber uint64 = 0
|
||||||
if indexerBlockNumber > sds.backfillCheckPastBlocks {
|
if pos.indexerBlockNumber > sds.backfillCheckPastBlocks {
|
||||||
gapCheckBeginNumber = indexerBlockNumber - sds.backfillCheckPastBlocks
|
gapCheckBeginNumber = pos.indexerBlockNumber - sds.backfillCheckPastBlocks
|
||||||
}
|
}
|
||||||
blockGaps, err := sds.indexer.DetectGaps(gapCheckBeginNumber, chainBlockNumber)
|
blockGaps, err := sds.indexer.DetectGaps(gapCheckBeginNumber, pos.chainBlockNumber)
|
||||||
if nil != err {
|
if nil != err {
|
||||||
log.Error("Backfill error: " + err.Error())
|
log.Error("Backfill error: " + err.Error())
|
||||||
return
|
return
|
||||||
@ -1172,52 +1174,15 @@ func (sds *Service) Backfill() {
|
|||||||
|
|
||||||
if nil != blockGaps && len(blockGaps) > 0 {
|
if nil != blockGaps && len(blockGaps) > 0 {
|
||||||
gapsMsg, _ := json.Marshal(blockGaps)
|
gapsMsg, _ := json.Marshal(blockGaps)
|
||||||
log.Info("Backfill: detected gaps in range", "begin", gapCheckBeginNumber, "end", chainBlockNumber, "gaps", string(gapsMsg))
|
log.Info("Backfill: detected gaps in range", "begin", gapCheckBeginNumber, "end", pos.chainBlockNumber, "gaps", string(gapsMsg))
|
||||||
sds.backfillDetectedGaps(blockGaps)
|
sds.backfillDetectedGaps(blockGaps)
|
||||||
log.Info("Backfill: done processing detected gaps in range", "begin", gapCheckBeginNumber, "end", chainBlockNumber, "gaps", string(gapsMsg))
|
log.Info("Backfill: done processing detected gaps in range", "begin", gapCheckBeginNumber, "end", pos.chainBlockNumber, "gaps", string(gapsMsg))
|
||||||
} else {
|
} else {
|
||||||
log.Info("Backfill: no gaps detected in range", "begin", gapCheckBeginNumber, "end", chainBlockNumber)
|
log.Info("Backfill: no gaps detected in range", "begin", gapCheckBeginNumber, "end", pos.chainBlockNumber)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// backfillHeadGap fills in any gap between the statediff position and the chain position at startup.
|
|
||||||
// A gap can be created in this way if there is some problem in statediffing (eg, DB connectivity is lost,
|
|
||||||
// while the chain keeps syncing), if the process is terminated with a statediff in-flight, etc.
|
|
||||||
func (sds *Service) backfillHeadGap(indexerBlockNumber uint64, chainBlockNumber uint64) {
|
|
||||||
var ch = make(chan uint64)
|
|
||||||
var wg sync.WaitGroup
|
|
||||||
for i := uint(0); i < sds.numWorkers; i++ {
|
|
||||||
wg.Add(1)
|
|
||||||
go func(w uint) {
|
|
||||||
defer wg.Done()
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case num, ok := <-ch:
|
|
||||||
if !ok {
|
|
||||||
log.Info("Backfill: headGap done", "worker", w)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
log.Info("Backfill: backfilling head gap", "block", num, "worker", w)
|
|
||||||
err := sds.writeStateDiffAt(num, writeLoopParams.Params)
|
|
||||||
if err != nil {
|
|
||||||
log.Error("Backfill error: " + err.Error())
|
|
||||||
}
|
|
||||||
case <-sds.QuitChan:
|
|
||||||
log.Info("Backfill: quitting before finish", "worker", w)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}(i)
|
|
||||||
}
|
|
||||||
|
|
||||||
for bn := indexerBlockNumber + 1; bn <= chainBlockNumber; bn++ {
|
|
||||||
ch <- bn
|
|
||||||
}
|
|
||||||
close(ch)
|
|
||||||
wg.Wait()
|
|
||||||
}
|
|
||||||
|
|
||||||
// backfillDetectedGaps fills gaps which have occurred in the recent past. These gaps can happen because of
|
// backfillDetectedGaps fills gaps which have occurred in the recent past. These gaps can happen because of
|
||||||
// transient errors, such as DB errors that are later corrected (so head statediffing continues, but with a hole)
|
// transient errors, such as DB errors that are later corrected (so head statediffing continues, but with a hole)
|
||||||
// a missed ChainEvent (happens sometimes when debugging), or if the process is terminated when an earlier block
|
// a missed ChainEvent (happens sometimes when debugging), or if the process is terminated when an earlier block
|
||||||
@ -1257,3 +1222,24 @@ func (sds *Service) backfillDetectedGaps(blockGaps []*interfaces.BlockGap) {
|
|||||||
close(ch)
|
close(ch)
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// currentPosition returns the current block height for both the BlockChain and the statediff indexer.
|
||||||
|
func (sds *Service) currentPosition() servicePosition {
|
||||||
|
ret := servicePosition{}
|
||||||
|
chainBlock := sds.BlockChain.CurrentBlock()
|
||||||
|
if nil != chainBlock {
|
||||||
|
ret.chainBlockNumber = chainBlock.Number.Uint64()
|
||||||
|
}
|
||||||
|
|
||||||
|
indexerBlock, _ := sds.indexer.CurrentBlock()
|
||||||
|
if nil != indexerBlock {
|
||||||
|
indexerBlockNumber, err := strconv.ParseUint(indexerBlock.BlockNumber, 10, 64)
|
||||||
|
if nil == err {
|
||||||
|
ret.indexerBlockNumber = indexerBlockNumber
|
||||||
|
} else {
|
||||||
|
log.Error("Error parsing indexer block number", "block", indexerBlock.BlockNumber)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return ret
|
||||||
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user