Fix events queue job retry by resetting watcher in job-runner (#470)

* Reset watcher after events job retry in job-runner

* Push next historical job from job-runner instead of event-watcher
This commit is contained in:
Nabarun Gogoi 2023-11-14 18:43:17 +05:30 committed by GitHub
parent 6ce8d4746d
commit 7c4f9fb797
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 39 additions and 29 deletions

View File

@ -88,16 +88,17 @@ export class EventWatcher {
}
async startBlockProcessing (): Promise<void> {
// Get latest block in chain and sync status from DB.
const [{ block: latestBlock }, syncStatus] = await Promise.all([
// Wait for events job queue to be empty before starting historical or realtime processing
await this._jobQueue.waitForEmptyQueue(QUEUE_EVENT_PROCESSING);
// Get latest block in chain and sync status from DB
// Also get historical-processing queu size
const [{ block: latestBlock }, syncStatus, historicalProcessingQueueSize] = await Promise.all([
this._ethClient.getBlockByHash(),
this._indexer.getSyncStatus(),
// Wait for events job queue to be empty before starting historical or realtime processing
this._jobQueue.waitForEmptyQueue(QUEUE_EVENT_PROCESSING)
this._jobQueue.getQueueSize(QUEUE_HISTORICAL_PROCESSING, 'completed')
]);
const historicalProcessingQueueSize = await this._jobQueue.getQueueSize(QUEUE_HISTORICAL_PROCESSING, 'completed');
// Stop if there are active or pending historical processing jobs
// Might be created on encountering template create in events processing
if (historicalProcessingQueueSize > 0) {
@ -144,7 +145,8 @@ export class EventWatcher {
{
blockNumber: startBlockNumber,
processingEndBlockNumber: this._historicalProcessingEndBlockNumber
}
},
{ priority: 1 }
);
}
@ -241,22 +243,11 @@ export class EventWatcher {
if (nextBatchStartBlockNumber > this._historicalProcessingEndBlockNumber) {
// Start next batch of historical processing or realtime processing
this.startBlockProcessing();
return;
}
// Push job for next batch of blocks
await this._jobQueue.pushJob(
QUEUE_HISTORICAL_PROCESSING,
{
blockNumber: nextBatchStartBlockNumber,
processingEndBlockNumber: this._historicalProcessingEndBlockNumber
}
);
}
async eventProcessingCompleteHandler (job: PgBoss.JobWithMetadata<any>): Promise<void> {
const { id, retrycount, data: { request: { data }, failed, state, createdOn } } = job;
const { id, data: { request: { data }, failed, state, createdOn, retryCount } } = job;
if (failed) {
log(`Job ${id} for queue ${QUEUE_EVENT_PROCESSING} failed`);
@ -274,9 +265,7 @@ export class EventWatcher {
assert(blockProgress);
// Check if job was retried
if (retrycount > 0) {
// Reset watcher to remove any data after this block
await this._indexer.resetWatcherToBlock(blockProgress.blockNumber);
if (retryCount > 0) {
// Start block processing (Try restarting historical processing or continue realtime processing)
this.startBlockProcessing();
}

View File

@ -20,7 +20,7 @@ type JobCompleteCallback = (job: PgBoss.Job | PgBoss.JobWithMetadata) => Promise
const DEFAULT_JOBS_PER_INTERVAL = 5;
// Interval time to check for events queue to be empty
const EMPTY_QUEUE_CHECK_INTERVAL = 5000;
const EMPTY_QUEUE_CHECK_INTERVAL = 1000;
const log = debug('vulcanize:job-queue');

View File

@ -170,8 +170,12 @@ export class JobRunner {
if (this._historicalProcessingCompletedUpto) {
// Check if historical processing start is for a previous block which happens incase of template create
if (startBlock < this._historicalProcessingCompletedUpto) {
// Delete any pending historical processing jobs
await this.jobQueue.deleteJobs(QUEUE_HISTORICAL_PROCESSING);
await Promise.all([
// Delete any pending historical processing jobs
this.jobQueue.deleteJobs(QUEUE_HISTORICAL_PROCESSING),
// Remove pending events queue jobs
this.jobQueue.deleteJobs(QUEUE_EVENT_PROCESSING)
]);
// Wait for events queue to be empty
log(`Waiting for events queue to be empty before resetting watcher to block ${startBlock - 1}`);
@ -242,6 +246,17 @@ export class JobRunner {
this._historicalProcessingCompletedUpto = endBlock;
if (endBlock < processingEndBlockNumber) {
// If endBlock is lesser than processingEndBlockNumber push new historical job
await this.jobQueue.pushJob(
QUEUE_HISTORICAL_PROCESSING,
{
blockNumber: endBlock + 1,
processingEndBlockNumber: processingEndBlockNumber
}
);
}
await this.jobQueue.markComplete(
job,
{ isComplete: true, endBlock }
@ -679,9 +694,15 @@ export class JobRunner {
this._endBlockProcessTimer = lastBlockProcessDuration.startTimer();
await this._indexer.updateSyncStatusProcessedBlock(block.blockHash, block.blockNumber);
// If this was a retry attempt, unset the indexing error flag in sync status
if (retryCount > 0) {
await this._indexer.updateSyncStatusIndexingError(false);
await Promise.all([
// If this was a retry attempt, unset the indexing error flag in sync status
this._indexer.updateSyncStatusIndexingError(false),
// Reset watcher after succesfull retry so that block processing starts after this block
this._indexer.resetWatcherToBlock(block.blockNumber)
]);
log(`Watcher reset to block ${block.blockNumber} after succesffully retrying events processing`);
}
} catch (error) {
log(`Error in processing events for block ${block.blockNumber} hash ${block.blockHash}`);
@ -691,8 +712,8 @@ export class JobRunner {
this._indexer.clearProcessedBlockData(block),
// Delete all pending event processing jobs
this.jobQueue.deleteJobs(QUEUE_EVENT_PROCESSING),
// Delete all pending historical processing jobs
this.jobQueue.deleteJobs(QUEUE_HISTORICAL_PROCESSING, 'active'),
// Delete all active and pending historical processing jobs
this.jobQueue.deleteJobs(QUEUE_HISTORICAL_PROCESSING, 'completed'),
// Set the indexing error flag in sync status
this._indexer.updateSyncStatusIndexingError(true)
]);