diff --git a/packages/util/src/events.ts b/packages/util/src/events.ts index fbba9245..5916d7f0 100644 --- a/packages/util/src/events.ts +++ b/packages/util/src/events.ts @@ -88,16 +88,17 @@ export class EventWatcher { } async startBlockProcessing (): Promise { - // Get latest block in chain and sync status from DB. - const [{ block: latestBlock }, syncStatus] = await Promise.all([ + // Wait for events job queue to be empty before starting historical or realtime processing + await this._jobQueue.waitForEmptyQueue(QUEUE_EVENT_PROCESSING); + + // Get latest block in chain and sync status from DB + // Also get historical-processing queu size + const [{ block: latestBlock }, syncStatus, historicalProcessingQueueSize] = await Promise.all([ this._ethClient.getBlockByHash(), this._indexer.getSyncStatus(), - // Wait for events job queue to be empty before starting historical or realtime processing - this._jobQueue.waitForEmptyQueue(QUEUE_EVENT_PROCESSING) + this._jobQueue.getQueueSize(QUEUE_HISTORICAL_PROCESSING, 'completed') ]); - const historicalProcessingQueueSize = await this._jobQueue.getQueueSize(QUEUE_HISTORICAL_PROCESSING, 'completed'); - // Stop if there are active or pending historical processing jobs // Might be created on encountering template create in events processing if (historicalProcessingQueueSize > 0) { @@ -144,7 +145,8 @@ export class EventWatcher { { blockNumber: startBlockNumber, processingEndBlockNumber: this._historicalProcessingEndBlockNumber - } + }, + { priority: 1 } ); } @@ -241,22 +243,11 @@ export class EventWatcher { if (nextBatchStartBlockNumber > this._historicalProcessingEndBlockNumber) { // Start next batch of historical processing or realtime processing this.startBlockProcessing(); - - return; } - - // Push job for next batch of blocks - await this._jobQueue.pushJob( - QUEUE_HISTORICAL_PROCESSING, - { - blockNumber: nextBatchStartBlockNumber, - processingEndBlockNumber: this._historicalProcessingEndBlockNumber - } - ); } async eventProcessingCompleteHandler (job: PgBoss.JobWithMetadata): Promise { - const { id, retrycount, data: { request: { data }, failed, state, createdOn } } = job; + const { id, data: { request: { data }, failed, state, createdOn, retryCount } } = job; if (failed) { log(`Job ${id} for queue ${QUEUE_EVENT_PROCESSING} failed`); @@ -274,9 +265,7 @@ export class EventWatcher { assert(blockProgress); // Check if job was retried - if (retrycount > 0) { - // Reset watcher to remove any data after this block - await this._indexer.resetWatcherToBlock(blockProgress.blockNumber); + if (retryCount > 0) { // Start block processing (Try restarting historical processing or continue realtime processing) this.startBlockProcessing(); } diff --git a/packages/util/src/job-queue.ts b/packages/util/src/job-queue.ts index 8374a996..49fdffa8 100644 --- a/packages/util/src/job-queue.ts +++ b/packages/util/src/job-queue.ts @@ -20,7 +20,7 @@ type JobCompleteCallback = (job: PgBoss.Job | PgBoss.JobWithMetadata) => Promise const DEFAULT_JOBS_PER_INTERVAL = 5; // Interval time to check for events queue to be empty -const EMPTY_QUEUE_CHECK_INTERVAL = 5000; +const EMPTY_QUEUE_CHECK_INTERVAL = 1000; const log = debug('vulcanize:job-queue'); diff --git a/packages/util/src/job-runner.ts b/packages/util/src/job-runner.ts index bf192503..25036251 100644 --- a/packages/util/src/job-runner.ts +++ b/packages/util/src/job-runner.ts @@ -170,8 +170,12 @@ export class JobRunner { if (this._historicalProcessingCompletedUpto) { // Check if historical processing start is for a previous block which happens incase of template create if (startBlock < this._historicalProcessingCompletedUpto) { - // Delete any pending historical processing jobs - await this.jobQueue.deleteJobs(QUEUE_HISTORICAL_PROCESSING); + await Promise.all([ + // Delete any pending historical processing jobs + this.jobQueue.deleteJobs(QUEUE_HISTORICAL_PROCESSING), + // Remove pending events queue jobs + this.jobQueue.deleteJobs(QUEUE_EVENT_PROCESSING) + ]); // Wait for events queue to be empty log(`Waiting for events queue to be empty before resetting watcher to block ${startBlock - 1}`); @@ -242,6 +246,17 @@ export class JobRunner { this._historicalProcessingCompletedUpto = endBlock; + if (endBlock < processingEndBlockNumber) { + // If endBlock is lesser than processingEndBlockNumber push new historical job + await this.jobQueue.pushJob( + QUEUE_HISTORICAL_PROCESSING, + { + blockNumber: endBlock + 1, + processingEndBlockNumber: processingEndBlockNumber + } + ); + } + await this.jobQueue.markComplete( job, { isComplete: true, endBlock } @@ -679,9 +694,15 @@ export class JobRunner { this._endBlockProcessTimer = lastBlockProcessDuration.startTimer(); await this._indexer.updateSyncStatusProcessedBlock(block.blockHash, block.blockNumber); - // If this was a retry attempt, unset the indexing error flag in sync status if (retryCount > 0) { - await this._indexer.updateSyncStatusIndexingError(false); + await Promise.all([ + // If this was a retry attempt, unset the indexing error flag in sync status + this._indexer.updateSyncStatusIndexingError(false), + // Reset watcher after succesfull retry so that block processing starts after this block + this._indexer.resetWatcherToBlock(block.blockNumber) + ]); + + log(`Watcher reset to block ${block.blockNumber} after succesffully retrying events processing`); } } catch (error) { log(`Error in processing events for block ${block.blockNumber} hash ${block.blockHash}`); @@ -691,8 +712,8 @@ export class JobRunner { this._indexer.clearProcessedBlockData(block), // Delete all pending event processing jobs this.jobQueue.deleteJobs(QUEUE_EVENT_PROCESSING), - // Delete all pending historical processing jobs - this.jobQueue.deleteJobs(QUEUE_HISTORICAL_PROCESSING, 'active'), + // Delete all active and pending historical processing jobs + this.jobQueue.deleteJobs(QUEUE_HISTORICAL_PROCESSING, 'completed'), // Set the indexing error flag in sync status this._indexer.updateSyncStatusIndexingError(true) ]);