Update pgboss queue params. (#154)

This commit is contained in:
Ashwin Phatak 2021-07-20 15:46:35 +05:30 committed by GitHub
parent c1ef96ba2e
commit 10e7d37fa8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 23 additions and 2 deletions

View File

@ -39,6 +39,7 @@ query allEthHeaderCids($blockNumber: BigInt) {
cid cid
blockNumber blockNumber
blockHash blockHash
parentHash
timestamp timestamp
ethTransactionCidsByHeaderId { ethTransactionCidsByHeaderId {
nodes { nodes {
@ -67,6 +68,7 @@ subscription SubscriptionReceipt {
ethHeaderCidByHeaderId { ethHeaderCidByHeaderId {
blockHash blockHash
blockNumber blockNumber
parentHash
} }
} }
} }
@ -82,6 +84,7 @@ subscription {
... on EthHeaderCid { ... on EthHeaderCid {
blockHash blockHash
blockNumber blockNumber
parentHash
} }
} }
} }
@ -97,6 +100,7 @@ subscription SubscriptionHeader {
ethHeaderCidByHeaderId { ethHeaderCidByHeaderId {
blockHash blockHash
blockNumber blockNumber
parentHash
} }
} }
} }

View File

@ -17,7 +17,24 @@ export class JobQueue {
constructor (config: Config) { constructor (config: Config) {
this._config = config; this._config = config;
this._boss = new PgBoss({ connectionString: this._config.dbConnectionString, onComplete: true }); this._boss = new PgBoss({
// https://github.com/timgit/pg-boss/blob/master/docs/configuration.md
connectionString: this._config.dbConnectionString,
onComplete: true,
// Num of retries with backoff
retryLimit: 15,
retryDelay: 1,
retryBackoff: true,
expireInHours: 24 * 7, // 7 days
retentionDays: 30, // 30 days
newJobCheckIntervalSeconds: 1
});
this._boss.on('error', error => log(error)); this._boss.on('error', error => log(error));
} }
@ -30,7 +47,7 @@ export class JobQueue {
} }
async subscribe (queue: string, callback: JobCallback): Promise<string> { async subscribe (queue: string, callback: JobCallback): Promise<string> {
return await this._boss.subscribe(queue, async (job: any) => { return await this._boss.subscribe(queue, { teamSize: 1, teamConcurrency: 1 }, async (job: any) => {
log(`Processing queue ${queue} job ${job.id}...`); log(`Processing queue ${queue} job ${job.id}...`);
await callback(job); await callback(job);
}); });