2021-08-12 09:58:13 +00:00
|
|
|
//
|
|
|
|
// Copyright 2021 Vulcanize, Inc.
|
|
|
|
//
|
|
|
|
|
2021-06-25 11:05:47 +00:00
|
|
|
import assert from 'assert';
|
|
|
|
import debug from 'debug';
|
|
|
|
import PgBoss from 'pg-boss';
|
|
|
|
|
|
|
|
interface Config {
|
|
|
|
dbConnectionString: string
|
|
|
|
maxCompletionLag: number
|
|
|
|
}
|
|
|
|
|
|
|
|
type JobCallback = (job: any) => Promise<void>;
|
|
|
|
|
|
|
|
const log = debug('vulcanize:job-queue');
|
|
|
|
|
|
|
|
export class JobQueue {
|
|
|
|
_config: Config;
|
|
|
|
_boss: PgBoss;
|
|
|
|
|
|
|
|
constructor (config: Config) {
|
|
|
|
this._config = config;
|
2021-07-20 10:16:35 +00:00
|
|
|
this._boss = new PgBoss({
|
|
|
|
// https://github.com/timgit/pg-boss/blob/master/docs/configuration.md
|
|
|
|
|
|
|
|
connectionString: this._config.dbConnectionString,
|
|
|
|
onComplete: true,
|
|
|
|
|
|
|
|
// Num of retries with backoff
|
|
|
|
retryLimit: 15,
|
|
|
|
retryDelay: 1,
|
|
|
|
retryBackoff: true,
|
|
|
|
|
|
|
|
expireInHours: 24 * 7, // 7 days
|
|
|
|
|
|
|
|
retentionDays: 30, // 30 days
|
|
|
|
|
|
|
|
newJobCheckIntervalSeconds: 1
|
|
|
|
});
|
|
|
|
|
2021-06-25 11:05:47 +00:00
|
|
|
this._boss.on('error', error => log(error));
|
|
|
|
}
|
|
|
|
|
|
|
|
get maxCompletionLag (): number {
|
|
|
|
return this._config.maxCompletionLag;
|
|
|
|
}
|
|
|
|
|
|
|
|
async start (): Promise<void> {
|
|
|
|
await this._boss.start();
|
|
|
|
}
|
|
|
|
|
|
|
|
async subscribe (queue: string, callback: JobCallback): Promise<string> {
|
2021-07-20 10:16:35 +00:00
|
|
|
return await this._boss.subscribe(queue, { teamSize: 1, teamConcurrency: 1 }, async (job: any) => {
|
2021-07-28 12:06:43 +00:00
|
|
|
try {
|
|
|
|
log(`Processing queue ${queue} job ${job.id}...`);
|
|
|
|
await callback(job);
|
|
|
|
} catch (error) {
|
|
|
|
log(`Error in queue ${queue}`);
|
|
|
|
log(error);
|
|
|
|
throw error;
|
|
|
|
}
|
2021-06-25 11:05:47 +00:00
|
|
|
});
|
|
|
|
}
|
|
|
|
|
|
|
|
async onComplete (queue: string, callback: JobCallback): Promise<string> {
|
|
|
|
return await this._boss.onComplete(queue, async (job: any) => {
|
2021-06-25 12:26:58 +00:00
|
|
|
const { id, data: { failed, createdOn } } = job;
|
|
|
|
log(`Job onComplete for queue ${queue} job ${id} created ${createdOn} success ${!failed}`);
|
2021-06-25 11:05:47 +00:00
|
|
|
await callback(job);
|
|
|
|
});
|
|
|
|
}
|
|
|
|
|
|
|
|
async markComplete (job: any): Promise<void> {
|
|
|
|
this._boss.complete(job.id);
|
|
|
|
}
|
|
|
|
|
2021-07-22 11:02:39 +00:00
|
|
|
async pushJob (queue: string, job: any, options: PgBoss.PublishOptions = {}): Promise<void> {
|
2021-06-25 11:05:47 +00:00
|
|
|
assert(this._boss);
|
|
|
|
|
2021-07-22 11:02:39 +00:00
|
|
|
const jobId = await this._boss.publish(queue, job, options);
|
2021-06-25 11:05:47 +00:00
|
|
|
log(`Created job in queue ${queue}: ${jobId}`);
|
|
|
|
}
|
|
|
|
}
|