Fix switch from historical to realtime processing when template create block exists near head (#465)

* Fix duplicate historical processing jobs created on template create

* Fix switch from historical to realtime processing when template create block is near head
This commit is contained in:
Nabarun Gogoi 2023-11-10 19:13:17 +05:30 committed by GitHub
parent be24166ea7
commit c01c1d07db
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 18 additions and 8 deletions

View File

@ -90,6 +90,16 @@ export class EventWatcher {
this._indexer.getSyncStatus() this._indexer.getSyncStatus()
]); ]);
// Wait for events job queue to be empty before starting historical or realtime processing
await this._jobQueue.waitForEmptyQueue(QUEUE_EVENT_PROCESSING);
const historicalProcessingQueueSize = await this._jobQueue.getQueueSize(QUEUE_HISTORICAL_PROCESSING, 'completed');
// Stop if there are active or pending historical processing jobs
// Might be created on encountering template create in events processing
if (historicalProcessingQueueSize > 0) {
return;
}
const latestCanonicalBlockNumber = latestBlock.number - MAX_REORG_DEPTH; const latestCanonicalBlockNumber = latestBlock.number - MAX_REORG_DEPTH;
let startBlockNumber = latestBlock.number; let startBlockNumber = latestBlock.number;
@ -116,9 +126,6 @@ export class EventWatcher {
} }
async startHistoricalBlockProcessing (startBlockNumber: number, endBlockNumber: number): Promise<void> { async startHistoricalBlockProcessing (startBlockNumber: number, endBlockNumber: number): Promise<void> {
// Wait for events job queue to be empty so that historical processing does not move far ahead
await this._jobQueue.waitForEmptyQueue(QUEUE_EVENT_PROCESSING);
this._historicalProcessingEndBlockNumber = endBlockNumber; this._historicalProcessingEndBlockNumber = endBlockNumber;
log(`Starting historical block processing in batches from ${startBlockNumber} up to block ${this._historicalProcessingEndBlockNumber}`); log(`Starting historical block processing in batches from ${startBlockNumber} up to block ${this._historicalProcessingEndBlockNumber}`);
@ -133,14 +140,14 @@ export class EventWatcher {
} }
async startRealtimeBlockProcessing (startBlockNumber: number): Promise<void> { async startRealtimeBlockProcessing (startBlockNumber: number): Promise<void> {
log(`Starting realtime block processing from block ${startBlockNumber}`);
await processBlockByNumber(this._jobQueue, startBlockNumber);
// Check if realtime processing already started and avoid resubscribing to block progress event // Check if realtime processing already started and avoid resubscribing to block progress event
if (this._realtimeProcessingStarted) { if (this._realtimeProcessingStarted) {
return; return;
} }
log(`Starting realtime block processing from block ${startBlockNumber}`);
await processBlockByNumber(this._jobQueue, startBlockNumber);
this._realtimeProcessingStarted = true; this._realtimeProcessingStarted = true;
// Creating an AsyncIterable from AsyncIterator to iterate over the values. // Creating an AsyncIterable from AsyncIterator to iterate over the values.
@ -223,8 +230,9 @@ export class EventWatcher {
// Check if historical processing end block is reached // Check if historical processing end block is reached
if (nextBatchStartBlockNumber > this._historicalProcessingEndBlockNumber) { if (nextBatchStartBlockNumber > this._historicalProcessingEndBlockNumber) {
// Start realtime processing // Start next batch of historical processing or realtime processing
this.startBlockProcessing(); this.startBlockProcessing();
return; return;
} }

View File

@ -171,11 +171,13 @@ export class JobRunner {
const { data: { blockNumber: startBlock, processingEndBlockNumber } } = job; const { data: { blockNumber: startBlock, processingEndBlockNumber } } = job;
if (this._historicalProcessingCompletedUpto) { if (this._historicalProcessingCompletedUpto) {
// Check if historical processing start is for a previous block which happens incase of template create
if (startBlock < this._historicalProcessingCompletedUpto) { if (startBlock < this._historicalProcessingCompletedUpto) {
// Delete any pending historical processing jobs
await this.jobQueue.deleteJobs(QUEUE_HISTORICAL_PROCESSING); await this.jobQueue.deleteJobs(QUEUE_HISTORICAL_PROCESSING);
// Wait for events queue to be empty // Wait for events queue to be empty
log(`Waiting for events queue to be empty before restarting watcher to block ${startBlock - 1}`); log(`Waiting for events queue to be empty before resetting watcher to block ${startBlock - 1}`);
await this.jobQueue.waitForEmptyQueue(QUEUE_EVENT_PROCESSING); await this.jobQueue.waitForEmptyQueue(QUEUE_EVENT_PROCESSING);
// Remove all watcher blocks and events data if startBlock is less than this._historicalProcessingCompletedUpto // Remove all watcher blocks and events data if startBlock is less than this._historicalProcessingCompletedUpto