Fix events out of order in job-queue (#300)

This commit is contained in:
nikugogoi 2021-12-06 17:17:55 +05:30 committed by GitHub
parent cda55646d2
commit ec586216db
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -53,16 +53,28 @@ export class JobQueue {
}
async subscribe (queue: string, callback: JobCallback): Promise<string> {
return await this._boss.subscribe(queue, { teamSize: JOBS_PER_INTERVAL, teamConcurrency: 1 }, async (job: any) => {
try {
log(`Processing queue ${queue} job ${job.id}...`);
await callback(job);
} catch (error) {
log(`Error in queue ${queue} job ${job.id}`);
log(error);
throw error;
return await this._boss.subscribe(
queue,
{
includeMetadata: true,
batchSize: JOBS_PER_INTERVAL
},
async (jobs: any) => {
// TODO: Debug jobs not fetched in order from database and use teamSize instead of batchSize.
jobs = jobs.sort((a: any, b: any) => a.createdon - b.createdon);
for (const job of jobs) {
try {
log(`Processing queue ${queue} job ${job.id}...`);
await callback(job);
} catch (error) {
log(`Error in queue ${queue} job ${job.id}`);
log(error);
throw error;
}
}
}
});
);
}
async onComplete (queue: string, callback: JobCallback): Promise<string> {
@ -81,7 +93,7 @@ export class JobQueue {
assert(this._boss);
const jobId = await this._boss.publish(queue, job, options);
log(`Created job in queue ${queue}: ${jobId}`);
log(`Created job in queue ${queue}: ${jobId} data: ${job.id}`);
}
async deleteAllJobs (): Promise<void> {