From cf7bbd3f8ee747eaf401d107618501c68336a574 Mon Sep 17 00:00:00 2001 From: Thomas E Lackey Date: Fri, 30 Jun 2023 00:16:18 -0500 Subject: [PATCH 1/7] WIP: 396 - don't skip any blocks if the head of the chain advances by more than one block. --- statediff/service.go | 40 ++++++++++++++++++++++++++++------------ 1 file changed, 28 insertions(+), 12 deletions(-) diff --git a/statediff/service.go b/statediff/service.go index 64a7b0b95..f2170a913 100644 --- a/statediff/service.go +++ b/statediff/service.go @@ -322,9 +322,9 @@ func (lbc *BlockCache) getParentBlock(currentBlock *types.Block, bc blockChain) } type workerParams struct { - chainEventCh <-chan core.ChainEvent - wg *sync.WaitGroup - id uint + blockCh <-chan *types.Block + wg *sync.WaitGroup + id uint } func (sds *Service) WriteLoop(chainEventCh chan core.ChainEvent) { @@ -333,7 +333,7 @@ func (sds *Service) WriteLoop(chainEventCh chan core.ChainEvent) { errCh := chainEventSub.Err() var wg sync.WaitGroup // Process metrics for chain events, then forward to workers - chainEventFwd := make(chan core.ChainEvent, chainEventChanSize) + blockFwd := make(chan *types.Block, chainEventChanSize) wg.Add(1) go func() { defer wg.Done() @@ -341,13 +341,29 @@ func (sds *Service) WriteLoop(chainEventCh chan core.ChainEvent) { select { case chainEvent := <-chainEventCh: lastHeight := defaultStatediffMetrics.lastEventHeight.Value() - nextHeight := int64(chainEvent.Block.Number().Uint64()) - if nextHeight-lastHeight != 1 { - log.Warn("Statediffing service received block out-of-order", "next height", nextHeight, "last height", lastHeight) + if lastHeight == 0 { + if lastBlock, _ := sds.indexer.CurrentBlock(); lastBlock != nil { + lastHeight, _ = strconv.ParseInt(lastBlock.BlockNumber, 10, 64) + } + } + nextHeight := int64(chainEvent.Block.Number().Uint64()) + distance := nextHeight - lastHeight + if distance < 1 { + log.Warn("Statediffing service received block from the past", "next height", nextHeight, "last height", lastHeight) + blockFwd <- chainEvent.Block + } else if distance > 1 { + log.Warn("Statediffing service received block from the future", "next height", nextHeight, "last height", lastHeight) + for i := lastHeight + 1; i < nextHeight; i++ { + log.Info("Statediffing filling block between last and next", "block", i, "next height", nextHeight, "last height", lastHeight) + blockFwd <- sds.BlockChain.GetBlockByNumber(uint64(i)) + } + blockFwd <- chainEvent.Block + defaultStatediffMetrics.lastEventHeight.Update(nextHeight) + } else { + blockFwd <- chainEvent.Block + defaultStatediffMetrics.lastEventHeight.Update(nextHeight) } - defaultStatediffMetrics.lastEventHeight.Update(nextHeight) defaultStatediffMetrics.writeLoopChannelLen.Update(int64(len(chainEventCh))) - chainEventFwd <- chainEvent case err := <-errCh: log.Error("Error from chain event subscription", "error", err) close(sds.QuitChan) @@ -367,7 +383,7 @@ func (sds *Service) WriteLoop(chainEventCh chan core.ChainEvent) { }() wg.Add(int(sds.numWorkers)) for worker := uint(0); worker < sds.numWorkers; worker++ { - params := workerParams{chainEventCh: chainEventFwd, wg: &wg, id: worker} + params := workerParams{blockCh: blockFwd, wg: &wg, id: worker} go sds.writeLoopWorker(params) } wg.Wait() @@ -392,9 +408,9 @@ func (sds *Service) writeLoopWorker(params workerParams) { for { select { //Notify chain event channel of events - case chainEvent := <-params.chainEventCh: + case chainEvent := <-params.blockCh: log.Debug("WriteLoop(): chain event received", "event", chainEvent) - currentBlock := chainEvent.Block + currentBlock := chainEvent parentBlock := sds.BlockCache.getParentBlock(currentBlock, sds.BlockChain) if parentBlock == nil { log.Error("Parent block is nil, skipping this block", "block height", currentBlock.Number()) -- 2.45.2 From 67f4bab8ba07022d9cc09859d79ae3ff9d070501 Mon Sep 17 00:00:00 2001 From: Thomas E Lackey Date: Wed, 5 Jul 2023 15:14:20 -0500 Subject: [PATCH 2/7] Combine backfilling of the gap-to-head and the fix for 396. --- statediff/service.go | 148 ++++++++++++++++--------------------------- 1 file changed, 56 insertions(+), 92 deletions(-) diff --git a/statediff/service.go b/statediff/service.go index f2170a913..5d9a09017 100644 --- a/statediff/service.go +++ b/statediff/service.go @@ -170,6 +170,12 @@ type statusSubscription struct { quitChan chan<- bool } +// Utility type for showing the relative positions of the blockchain and the statediff indexer. +type servicePosition struct { + chainBlockNumber uint64 + indexerBlockNumber uint64 +} + // BlockCache caches the last block for safe access from different service loops type BlockCache struct { sync.Mutex @@ -328,6 +334,12 @@ type workerParams struct { } func (sds *Service) WriteLoop(chainEventCh chan core.ChainEvent) { + initialPos := sds.currentPosition() + log.Info( + "WriteLoop: initial positions", + "chain", initialPos.chainBlockNumber, + "indexer", initialPos.indexerBlockNumber, + ) chainEventSub := sds.BlockChain.SubscribeChainEvent(chainEventCh) defer chainEventSub.Unsubscribe() errCh := chainEventSub.Err() @@ -340,28 +352,30 @@ func (sds *Service) WriteLoop(chainEventCh chan core.ChainEvent) { for { select { case chainEvent := <-chainEventCh: - lastHeight := defaultStatediffMetrics.lastEventHeight.Value() + lastHeight := uint64(defaultStatediffMetrics.lastEventHeight.Value()) if lastHeight == 0 { - if lastBlock, _ := sds.indexer.CurrentBlock(); lastBlock != nil { - lastHeight, _ = strconv.ParseInt(lastBlock.BlockNumber, 10, 64) - } + lastHeight = initialPos.indexerBlockNumber } - nextHeight := int64(chainEvent.Block.Number().Uint64()) + nextHeight := chainEvent.Block.Number().Uint64() distance := nextHeight - lastHeight if distance < 1 { - log.Warn("Statediffing service received block from the past", "next height", nextHeight, "last height", lastHeight) + log.Warn("WriteLoop: received unexpected block from the past", "next height", nextHeight, "last height", lastHeight) blockFwd <- chainEvent.Block } else if distance > 1 { - log.Warn("Statediffing service received block from the future", "next height", nextHeight, "last height", lastHeight) - for i := lastHeight + 1; i < nextHeight; i++ { - log.Info("Statediffing filling block between last and next", "block", i, "next height", nextHeight, "last height", lastHeight) - blockFwd <- sds.BlockChain.GetBlockByNumber(uint64(i)) + log.Warn("WriteLoop: received unexpected block from the future", "next height", nextHeight, "last height", lastHeight) + if distance <= sds.backfillMaxHeadGap { + for i := lastHeight + 1; i < nextHeight; i++ { + log.Info("WriteLoop: backfilling gap to head", "block", i, "next height", nextHeight, "last height", lastHeight, "gap", distance) + blockFwd <- sds.BlockChain.GetBlockByNumber(i) + } + } else { + log.Warn("WriteLoop: gap to head too large to backfill", "next height", nextHeight, "last height", lastHeight, "gap", distance) } blockFwd <- chainEvent.Block - defaultStatediffMetrics.lastEventHeight.Update(nextHeight) + defaultStatediffMetrics.lastEventHeight.Update(int64(nextHeight)) } else { blockFwd <- chainEvent.Block - defaultStatediffMetrics.lastEventHeight.Update(nextHeight) + defaultStatediffMetrics.lastEventHeight.Update(int64(nextHeight)) } defaultStatediffMetrics.writeLoopChannelLen.Update(int64(len(chainEventCh))) case err := <-errCh: @@ -1130,57 +1144,24 @@ func MapWatchAddressArgsToAddresses(args []types2.WatchAddressArg) ([]common.Add // Backfill is executed on startup to make sure there are no gaps in the recent past when tracking head. func (sds *Service) Backfill() { - chainBlock := sds.BlockChain.CurrentBlock() - if nil == chainBlock { - log.Info("Backfill: No previous chain block, nothing to backfill.") - return - } - - chainBlockNumber := chainBlock.Number.Uint64() - if chainBlockNumber == 0 { + pos := sds.currentPosition() + if pos.chainBlockNumber == 0 { log.Info("Backfill: At start of chain, nothing to backfill.") return } - indexerBlock, err := sds.indexer.CurrentBlock() - if nil == indexerBlock { - log.Info("Backfill: No previous indexer block, nothing to backfill.") - return - } - if nil != err { - log.Error("Backfill error: " + err.Error()) - return - } - - indexerBlockNumber, err := strconv.ParseUint(indexerBlock.BlockNumber, 10, 64) - if nil != err { - log.Error("Backfill error: " + err.Error()) - return - } - - headGap := chainBlockNumber - indexerBlockNumber log.Info( "Backfill: initial positions", - "chain", chainBlockNumber, - "indexer", indexerBlockNumber, - "headGap", headGap, + "chain", pos.chainBlockNumber, + "indexer", pos.indexerBlockNumber, ) - if sds.backfillMaxHeadGap > 0 && headGap > 0 { - if headGap < sds.backfillMaxHeadGap { - sds.backfillHeadGap(indexerBlockNumber, chainBlockNumber) - log.Info("Backfill: all workers done filling headGap.") - } else { - log.Error("Backfill: headGap too large to fill.") - } - } - if sds.backfillCheckPastBlocks > 0 { var gapCheckBeginNumber uint64 = 0 - if indexerBlockNumber > sds.backfillCheckPastBlocks { - gapCheckBeginNumber = indexerBlockNumber - sds.backfillCheckPastBlocks + if pos.indexerBlockNumber > sds.backfillCheckPastBlocks { + gapCheckBeginNumber = pos.indexerBlockNumber - sds.backfillCheckPastBlocks } - blockGaps, err := sds.indexer.DetectGaps(gapCheckBeginNumber, chainBlockNumber) + blockGaps, err := sds.indexer.DetectGaps(gapCheckBeginNumber, pos.chainBlockNumber) if nil != err { log.Error("Backfill error: " + err.Error()) return @@ -1188,52 +1169,15 @@ func (sds *Service) Backfill() { if nil != blockGaps && len(blockGaps) > 0 { gapsMsg, _ := json.Marshal(blockGaps) - log.Info("Backfill: detected gaps in range", "begin", gapCheckBeginNumber, "end", chainBlockNumber, "gaps", string(gapsMsg)) + log.Info("Backfill: detected gaps in range", "begin", gapCheckBeginNumber, "end", pos.chainBlockNumber, "gaps", string(gapsMsg)) sds.backfillDetectedGaps(blockGaps) - log.Info("Backfill: done processing detected gaps in range", "begin", gapCheckBeginNumber, "end", chainBlockNumber, "gaps", string(gapsMsg)) + log.Info("Backfill: done processing detected gaps in range", "begin", gapCheckBeginNumber, "end", pos.chainBlockNumber, "gaps", string(gapsMsg)) } else { - log.Info("Backfill: no gaps detected in range", "begin", gapCheckBeginNumber, "end", chainBlockNumber) + log.Info("Backfill: no gaps detected in range", "begin", gapCheckBeginNumber, "end", pos.chainBlockNumber) } } } -// backfillHeadGap fills in any gap between the statediff position and the chain position at startup. -// A gap can be created in this way if there is some problem in statediffing (eg, DB connectivity is lost, -// while the chain keeps syncing), if the process is terminated with a statediff in-flight, etc. -func (sds *Service) backfillHeadGap(indexerBlockNumber uint64, chainBlockNumber uint64) { - var ch = make(chan uint64) - var wg sync.WaitGroup - for i := uint(0); i < sds.numWorkers; i++ { - wg.Add(1) - go func(w uint) { - defer wg.Done() - for { - select { - case num, ok := <-ch: - if !ok { - log.Info("Backfill: headGap done", "worker", w) - return - } - log.Info("Backfill: backfilling head gap", "block", num, "worker", w) - err := sds.writeStateDiffAt(num, writeLoopParams.Params) - if err != nil { - log.Error("Backfill error: " + err.Error()) - } - case <-sds.QuitChan: - log.Info("Backfill: quitting before finish", "worker", w) - return - } - } - }(i) - } - - for bn := indexerBlockNumber + 1; bn <= chainBlockNumber; bn++ { - ch <- bn - } - close(ch) - wg.Wait() -} - // backfillDetectedGaps fills gaps which have occurred in the recent past. These gaps can happen because of // transient errors, such as DB errors that are later corrected (so head statediffing continues, but with a hole) // a missed ChainEvent (happens sometimes when debugging), or if the process is terminated when an earlier block @@ -1273,3 +1217,23 @@ func (sds *Service) backfillDetectedGaps(blockGaps []*interfaces.BlockGap) { close(ch) wg.Wait() } + +func (sds *Service) currentPosition() servicePosition { + ret := servicePosition{} + chainBlock := sds.BlockChain.CurrentBlock() + if nil != chainBlock { + ret.chainBlockNumber = chainBlock.Number.Uint64() + } + + indexerBlock, _ := sds.indexer.CurrentBlock() + if nil != indexerBlock { + indexerBlockNumber, err := strconv.ParseUint(indexerBlock.BlockNumber, 10, 64) + if nil == err { + ret.indexerBlockNumber = indexerBlockNumber + } else { + log.Error("Error parsing indexer block number", "block", indexerBlock.BlockNumber) + } + } + + return ret +} -- 2.45.2 From 29cad48db78933a0028478bf8c0eabe26bde838d Mon Sep 17 00:00:00 2001 From: Thomas E Lackey Date: Wed, 5 Jul 2023 15:17:05 -0500 Subject: [PATCH 3/7] Add comment. --- statediff/service.go | 1 + 1 file changed, 1 insertion(+) diff --git a/statediff/service.go b/statediff/service.go index 5d9a09017..705281421 100644 --- a/statediff/service.go +++ b/statediff/service.go @@ -1218,6 +1218,7 @@ func (sds *Service) backfillDetectedGaps(blockGaps []*interfaces.BlockGap) { wg.Wait() } +// currentPosition returns the current block height for both the BlockChain and the statediff indexer. func (sds *Service) currentPosition() servicePosition { ret := servicePosition{} chainBlock := sds.BlockChain.CurrentBlock() -- 2.45.2 From 0fccb7ab13cd674c8a6b7a79976e3b86c9f0b871 Mon Sep 17 00:00:00 2001 From: Thomas E Lackey Date: Wed, 5 Jul 2023 15:21:49 -0500 Subject: [PATCH 4/7] Tweak flag message. --- cmd/utils/flags.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index 7cd724994..e4fd04cd3 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -1127,7 +1127,7 @@ Please note that --` + MetricsHTTPFlag.Name + ` must be set to start the server. } StateDiffBackfillMaxHeadGap = &cli.Uint64Flag{ Name: "statediff.backfillmaxheadgap", - Usage: "The maximum gap between the startup statediff and startup head positions that can be backfilled.", + Usage: "The maximum gap between the current statediff and head positions that can be backfilled.", Value: 7200, } ) -- 2.45.2 From b6a4083ce603ca8d662b1f9975576a3ee1d6927d Mon Sep 17 00:00:00 2001 From: Thomas E Lackey Date: Wed, 5 Jul 2023 15:48:18 -0500 Subject: [PATCH 5/7] Add comment. --- statediff/service.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/statediff/service.go b/statediff/service.go index 705281421..0f8533995 100644 --- a/statediff/service.go +++ b/statediff/service.go @@ -468,6 +468,8 @@ func (sds *Service) Loop(chainEventCh chan core.ChainEvent) { select { //Notify chain event channel of events case chainEvent := <-chainEventCh: + // TODO: Do we need to track the last streamed block as we do for the WriteLoop so that we can detect + // and plug any gaps in the events? If not, we risk presenting an incomplete record. defaultStatediffMetrics.serviceLoopChannelLen.Update(int64(len(chainEventCh))) log.Debug("Loop(): chain event received", "event", chainEvent) // if we don't have any subscribers, do not process a statediff -- 2.45.2 From fd2abe760b471ddcec749772bdd559283e2d0d81 Mon Sep 17 00:00:00 2001 From: Thomas E Lackey Date: Wed, 5 Jul 2023 16:17:34 -0500 Subject: [PATCH 6/7] Tweak to avoid int/uint overflow issues. --- statediff/service.go | 37 ++++++++++++++++++++----------------- 1 file changed, 20 insertions(+), 17 deletions(-) diff --git a/statediff/service.go b/statediff/service.go index 0f8533995..50ea0f00d 100644 --- a/statediff/service.go +++ b/statediff/service.go @@ -357,25 +357,28 @@ func (sds *Service) WriteLoop(chainEventCh chan core.ChainEvent) { lastHeight = initialPos.indexerBlockNumber } nextHeight := chainEvent.Block.Number().Uint64() - distance := nextHeight - lastHeight - if distance < 1 { + if nextHeight > lastHeight { + distance := nextHeight - lastHeight + if distance == 1 { + log.Info("WriteLoop: received expected block", "next height", nextHeight, "last height", lastHeight) + blockFwd <- chainEvent.Block + defaultStatediffMetrics.lastEventHeight.Update(int64(nextHeight)) + } else { + log.Warn("WriteLoop: received unexpected block from the future", "next height", nextHeight, "last height", lastHeight) + if distance <= sds.backfillMaxHeadGap { + for i := lastHeight + 1; i < nextHeight; i++ { + log.Info("WriteLoop: backfilling gap to head", "block", i, "next height", nextHeight, "last height", lastHeight, "gap", distance) + blockFwd <- sds.BlockChain.GetBlockByNumber(i) + } + } else { + log.Warn("WriteLoop: gap to head too large to backfill", "next height", nextHeight, "last height", lastHeight, "gap", distance) + } + blockFwd <- chainEvent.Block + defaultStatediffMetrics.lastEventHeight.Update(int64(nextHeight)) + } + } else { log.Warn("WriteLoop: received unexpected block from the past", "next height", nextHeight, "last height", lastHeight) blockFwd <- chainEvent.Block - } else if distance > 1 { - log.Warn("WriteLoop: received unexpected block from the future", "next height", nextHeight, "last height", lastHeight) - if distance <= sds.backfillMaxHeadGap { - for i := lastHeight + 1; i < nextHeight; i++ { - log.Info("WriteLoop: backfilling gap to head", "block", i, "next height", nextHeight, "last height", lastHeight, "gap", distance) - blockFwd <- sds.BlockChain.GetBlockByNumber(i) - } - } else { - log.Warn("WriteLoop: gap to head too large to backfill", "next height", nextHeight, "last height", lastHeight, "gap", distance) - } - blockFwd <- chainEvent.Block - defaultStatediffMetrics.lastEventHeight.Update(int64(nextHeight)) - } else { - blockFwd <- chainEvent.Block - defaultStatediffMetrics.lastEventHeight.Update(int64(nextHeight)) } defaultStatediffMetrics.writeLoopChannelLen.Update(int64(len(chainEventCh))) case err := <-errCh: -- 2.45.2 From 52c98c2ba2004484370c353546330ccc89719256 Mon Sep 17 00:00:00 2001 From: Thomas E Lackey Date: Wed, 5 Jul 2023 17:57:03 -0500 Subject: [PATCH 7/7] Tweak log messages. --- statediff/service.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/statediff/service.go b/statediff/service.go index 50ea0f00d..9839cd048 100644 --- a/statediff/service.go +++ b/statediff/service.go @@ -360,24 +360,24 @@ func (sds *Service) WriteLoop(chainEventCh chan core.ChainEvent) { if nextHeight > lastHeight { distance := nextHeight - lastHeight if distance == 1 { - log.Info("WriteLoop: received expected block", "next height", nextHeight, "last height", lastHeight) + log.Info("WriteLoop: received expected block", "block height", nextHeight, "last height", lastHeight) blockFwd <- chainEvent.Block defaultStatediffMetrics.lastEventHeight.Update(int64(nextHeight)) } else { - log.Warn("WriteLoop: received unexpected block from the future", "next height", nextHeight, "last height", lastHeight) + log.Warn("WriteLoop: received unexpected block from the future", "block height", nextHeight, "last height", lastHeight) if distance <= sds.backfillMaxHeadGap { for i := lastHeight + 1; i < nextHeight; i++ { - log.Info("WriteLoop: backfilling gap to head", "block", i, "next height", nextHeight, "last height", lastHeight, "gap", distance) + log.Info("WriteLoop: backfilling gap to head", "block", i, "block height", nextHeight, "last height", lastHeight) blockFwd <- sds.BlockChain.GetBlockByNumber(i) } } else { - log.Warn("WriteLoop: gap to head too large to backfill", "next height", nextHeight, "last height", lastHeight, "gap", distance) + log.Warn("WriteLoop: gap to head too large to backfill", "block height", nextHeight, "last height", lastHeight, "gap", distance) } blockFwd <- chainEvent.Block defaultStatediffMetrics.lastEventHeight.Update(int64(nextHeight)) } } else { - log.Warn("WriteLoop: received unexpected block from the past", "next height", nextHeight, "last height", lastHeight) + log.Warn("WriteLoop: received unexpected block from the past", "block height", nextHeight, "last height", lastHeight) blockFwd <- chainEvent.Block } defaultStatediffMetrics.writeLoopChannelLen.Update(int64(len(chainEventCh))) -- 2.45.2