From c01c1d07dbaae0f273bb0ec07efbbd3e4cb958b4 Mon Sep 17 00:00:00 2001 From: Nabarun Gogoi Date: Fri, 10 Nov 2023 19:13:17 +0530 Subject: [PATCH] Fix switch from historical to realtime processing when template create block exists near head (#465) * Fix duplicate historical processing jobs created on template create * Fix switch from historical to realtime processing when template create block is near head --- packages/util/src/events.ts | 22 +++++++++++++++------- packages/util/src/job-runner.ts | 4 +++- 2 files changed, 18 insertions(+), 8 deletions(-) diff --git a/packages/util/src/events.ts b/packages/util/src/events.ts index 5970c87c..6ad77193 100644 --- a/packages/util/src/events.ts +++ b/packages/util/src/events.ts @@ -90,6 +90,16 @@ export class EventWatcher { this._indexer.getSyncStatus() ]); + // Wait for events job queue to be empty before starting historical or realtime processing + await this._jobQueue.waitForEmptyQueue(QUEUE_EVENT_PROCESSING); + const historicalProcessingQueueSize = await this._jobQueue.getQueueSize(QUEUE_HISTORICAL_PROCESSING, 'completed'); + + // Stop if there are active or pending historical processing jobs + // Might be created on encountering template create in events processing + if (historicalProcessingQueueSize > 0) { + return; + } + const latestCanonicalBlockNumber = latestBlock.number - MAX_REORG_DEPTH; let startBlockNumber = latestBlock.number; @@ -116,9 +126,6 @@ export class EventWatcher { } async startHistoricalBlockProcessing (startBlockNumber: number, endBlockNumber: number): Promise { - // Wait for events job queue to be empty so that historical processing does not move far ahead - await this._jobQueue.waitForEmptyQueue(QUEUE_EVENT_PROCESSING); - this._historicalProcessingEndBlockNumber = endBlockNumber; log(`Starting historical block processing in batches from ${startBlockNumber} up to block ${this._historicalProcessingEndBlockNumber}`); @@ -133,14 +140,14 @@ export class EventWatcher { } async startRealtimeBlockProcessing (startBlockNumber: number): Promise { - log(`Starting realtime block processing from block ${startBlockNumber}`); - await processBlockByNumber(this._jobQueue, startBlockNumber); - // Check if realtime processing already started and avoid resubscribing to block progress event if (this._realtimeProcessingStarted) { return; } + log(`Starting realtime block processing from block ${startBlockNumber}`); + await processBlockByNumber(this._jobQueue, startBlockNumber); + this._realtimeProcessingStarted = true; // Creating an AsyncIterable from AsyncIterator to iterate over the values. @@ -223,8 +230,9 @@ export class EventWatcher { // Check if historical processing end block is reached if (nextBatchStartBlockNumber > this._historicalProcessingEndBlockNumber) { - // Start realtime processing + // Start next batch of historical processing or realtime processing this.startBlockProcessing(); + return; } diff --git a/packages/util/src/job-runner.ts b/packages/util/src/job-runner.ts index c0867aad..6f2e72c9 100644 --- a/packages/util/src/job-runner.ts +++ b/packages/util/src/job-runner.ts @@ -171,11 +171,13 @@ export class JobRunner { const { data: { blockNumber: startBlock, processingEndBlockNumber } } = job; if (this._historicalProcessingCompletedUpto) { + // Check if historical processing start is for a previous block which happens incase of template create if (startBlock < this._historicalProcessingCompletedUpto) { + // Delete any pending historical processing jobs await this.jobQueue.deleteJobs(QUEUE_HISTORICAL_PROCESSING); // Wait for events queue to be empty - log(`Waiting for events queue to be empty before restarting watcher to block ${startBlock - 1}`); + log(`Waiting for events queue to be empty before resetting watcher to block ${startBlock - 1}`); await this.jobQueue.waitForEmptyQueue(QUEUE_EVENT_PROCESSING); // Remove all watcher blocks and events data if startBlock is less than this._historicalProcessingCompletedUpto