mirror of https://github.com/cerc-io/watcher-ts
synced 2025-04-20 21:51:14 +00:00

Fix uni-watcher to handle mainnet data (#293)

* Fix watchers to handle mainnet data
* Tweak jobs fetched per interval to reduce event processing time

This commit is contained in:
parent 08c712d766
commit 32fea1f2cb
@@ -61,6 +61,7 @@ export class JobRunner {
         await this._indexer.processEvent(event);
       }
 
+      await this._indexer.updateBlockProgress(event.block.blockHash, event.index);
       await this._jobQueue.markComplete(job);
     });
   }
@@ -61,6 +61,7 @@ export class JobRunner {
         await this._indexer.processEvent(event);
       }
 
+      await this._indexer.updateBlockProgress(event.block.blockHash, event.index);
       await this._jobQueue.markComplete(job);
     });
   }
@@ -63,6 +63,7 @@ export class JobRunner {
         await this._indexer.processEvent(event);
       }
 
+      await this._indexer.updateBlockProgress(event.block.blockHash, event.index);
       await this._jobQueue.markComplete(job);
     });
   }
@@ -78,6 +78,7 @@ export class JobRunner {
         await this._indexer.processEvent(dbEvent);
       }
 
+      await this._indexer.updateBlockProgress(event.block.blockHash, event.index);
       await this._jobQueue.markComplete(job);
     });
   }
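The four JobRunner hunks above make the same change across the different watchers: once every event in a job has been processed, block progress is updated by the job runner itself, just before the job is marked complete. Below is a minimal sketch of that handler shape; the interfaces and the event payload are illustrative assumptions, not the actual watcher-ts types.

// Sketch (assumed interfaces) of the event-processing job handler after this
// change: process every event in the job, record block progress once, then
// mark the job complete.
interface EventRow {
  index: number;
  block: { blockHash: string };
}

interface Indexer {
  processEvent (event: EventRow): Promise<void>;
  updateBlockProgress (blockHash: string, lastProcessedEventIndex: number): Promise<void>;
}

interface JobQueue {
  markComplete (job: { id: string }): Promise<void>;
}

async function processEventsJob (
  indexer: Indexer,
  jobQueue: JobQueue,
  job: { id: string },
  events: EventRow[]
): Promise<void> {
  let lastEvent: EventRow | undefined;

  for (const event of events) {
    await indexer.processEvent(event);
    lastEvent = event;
  }

  if (lastEvent) {
    // Progress is now recorded by the job runner itself, not by the completion
    // listener, so it cannot lag behind the events that were actually processed.
    await indexer.updateBlockProgress(lastEvent.block.blockHash, lastEvent.index);
  }

  await jobQueue.markComplete(job);
}

Recording progress in the runner keeps it in lockstep with processing even if the completion listener is delayed or fails.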
@@ -105,7 +105,6 @@ export class EventWatcher {
       const dbEvent = await this._indexer.getEvent(request.data.id);
       assert(dbEvent);
 
-      await this._indexer.updateBlockProgress(dbEvent.block.blockHash, dbEvent.index);
       const blockProgress = await this._indexer.getBlockProgress(dbEvent.block.blockHash);
 
       if (blockProgress) {
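With the runner now responsible for writing progress, the EventWatcher hunk above drops the duplicate update: the completion handler only reads block progress back before publishing. A rough sketch of that read-only shape, with assumed types and a placeholder publish callback:

import assert from 'assert';

// Sketch (assumed types) of a completion listener that only reads block
// progress; it no longer writes it, because the job runner already has.
interface BlockProgress {
  blockHash: string;
  isComplete: boolean;
}

interface IndexerReader {
  getEvent (id: string): Promise<{ block: { blockHash: string } } | undefined>;
  getBlockProgress (blockHash: string): Promise<BlockProgress | undefined>;
}

async function onEventJobComplete (
  indexer: IndexerReader,
  publish: (progress: BlockProgress) => Promise<void>,
  request: { data: { id: string } }
): Promise<void> {
  const dbEvent = await indexer.getEvent(request.data.id);
  assert(dbEvent);

  const blockProgress = await indexer.getBlockProgress(dbEvent.block.blockHash);

  if (blockProgress) {
    await publish(blockProgress);
  }
}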
@@ -253,7 +253,7 @@ export class Indexer {
     let res;
 
     try {
-      res = this._db.saveEventEntity(dbTx, dbEvent);
+      res = await this._db.saveEventEntity(dbTx, dbEvent);
       await dbTx.commitTransaction();
     } catch (error) {
       await dbTx.rollbackTransaction();
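The Indexer hunk above adds a missing await: saveEventEntity returns a promise, and without awaiting it the surrounding try/catch cannot observe a save failure, so the transaction would still be committed and the rollback branch never reached. A small self-contained sketch of that failure mode (the names here are illustrative, not the watcher's API):

// Demonstrates why the missing `await` matters: a rejected promise that is not
// awaited escapes the try/catch, so the "commit" path runs anyway.
async function save (shouldFail: boolean): Promise<string> {
  if (shouldFail) throw new Error('save failed');
  return 'saved';
}

async function withoutAwait (): Promise<void> {
  try {
    const res = save(true); // floating promise: the rejection never reaches this try/catch
    res.catch(() => console.log('rejection surfaces later, outside the transaction logic'));
    console.log('commit'); // still runs
  } catch {
    console.log('rollback'); // never reached
  }
}

async function withAwait (): Promise<void> {
  try {
    const res = await save(true); // rejection is thrown inside the try block
    console.log('commit', res);
  } catch {
    console.log('rollback'); // runs as intended
  }
}

withoutAwait().then(withAwait);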
@@ -13,6 +13,8 @@ interface Config {
 
 type JobCallback = (job: any) => Promise<void>;
 
+const JOBS_PER_INTERVAL = 5;
+
 const log = debug('vulcanize:job-queue');
 
 export class JobQueue {
@@ -36,7 +38,7 @@ export class JobQueue {
 
       retentionDays: 30, // 30 days
 
-      newJobCheckIntervalSeconds: 1
+      newJobCheckInterval: 100
     });
 
     this._boss.on('error', error => log(error));
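If the pg-boss options are what I remember, newJobCheckIntervalSeconds and newJobCheckInterval are the same knob in different units (seconds versus milliseconds), so this hunk shortens the fetch poll from once per second to every 100 ms. A hedged sketch of the constructor call, with a placeholder connection string and the retentionDays value kept from the surrounding context:

import PgBoss from 'pg-boss';

// Sketch of the queue construction after this change. The connection string is
// a placeholder; the option semantics described here are an assumption about
// the pg-boss version in use.
const boss = new PgBoss({
  connectionString: 'postgres://user:password@localhost:5432/jobs',
  retentionDays: 30, // 30 days
  newJobCheckInterval: 100 // poll for new jobs every 100 ms instead of every second
});

boss.on('error', (error: Error) => console.error(error));

boss.start().catch((error: Error) => console.error(error));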
@@ -51,12 +53,12 @@ export class JobQueue {
   }
 
   async subscribe (queue: string, callback: JobCallback): Promise<string> {
-    return await this._boss.subscribe(queue, { teamSize: 1, teamConcurrency: 1 }, async (job: any) => {
+    return await this._boss.subscribe(queue, { teamSize: JOBS_PER_INTERVAL, teamConcurrency: 1 }, async (job: any) => {
       try {
         log(`Processing queue ${queue} job ${job.id}...`);
         await callback(job);
       } catch (error) {
-        log(`Error in queue ${queue}`);
+        log(`Error in queue ${queue} job ${job.id}`);
         log(error);
         throw error;
       }
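teamSize controls how many jobs pg-boss fetches per polling interval for a subscription, so raising it from 1 to JOBS_PER_INTERVAL (5) lets each poll pull a small batch of event jobs, while teamConcurrency: 1 keeps them running one at a time. A minimal hedged sketch of a subscription with those options; the queue name and handler body are placeholders, and the subscribe signature follows the pg-boss version this repo appears to use:

import PgBoss from 'pg-boss';

const JOBS_PER_INTERVAL = 5;

async function main (): Promise<void> {
  const boss = new PgBoss({ connectionString: 'postgres://user:password@localhost:5432/jobs' });
  await boss.start();

  // Fetch up to five jobs per check interval, but still process them serially.
  await boss.subscribe(
    'event-processing',
    { teamSize: JOBS_PER_INTERVAL, teamConcurrency: 1 },
    async (job: any) => {
      console.log(`Processing job ${job.id}`);
    }
  );
}

main().catch(error => console.error(error));

The same batching options are applied to the onComplete subscription in the next hunk.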
@@ -64,7 +66,7 @@ export class JobQueue {
   }
 
   async onComplete (queue: string, callback: JobCallback): Promise<string> {
-    return await this._boss.onComplete(queue, async (job: any) => {
+    return await this._boss.onComplete(queue, { teamSize: JOBS_PER_INTERVAL, teamConcurrency: 1 }, async (job: any) => {
       const { id, data: { failed, createdOn } } = job;
       log(`Job onComplete for queue ${queue} job ${id} created ${createdOn} success ${!failed}`);
       await callback(job);
@@ -56,14 +56,6 @@ export class JobRunner {
 
       const event = dbEvent;
 
-      // Confirm that the parent block has been completely processed.
-      // We don't have to worry about aborting as this job will get retried later.
-      const parent = await this._indexer.getBlockProgress(event.block.parentHash);
-      if (!parent || !parent.isComplete) {
-        const message = `Abort processing of event ${id} as parent block not processed yet`;
-        throw new Error(message);
-      }
-
       const blockProgress = await this._indexer.getBlockProgress(event.block.blockHash);
       assert(blockProgress);
 
@@ -159,7 +151,7 @@ export class JobRunner {
         throw new Error(message);
       }
 
-      if (parentHash !== syncStatus.latestCanonicalBlockHash && !parent.isComplete) {
+      if (!parent.isComplete) {
        // Parent block indexing needs to finish before this block can be indexed.
         const message = `Indexing incomplete for parent block number ${parent.blockNumber} hash ${parentHash} of block number ${blockNumber} hash ${blockHash}, aborting`;
         log(message);
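The last two hunks move the parent-completeness guard wholly into block processing: the per-event abort is removed, and the block-level check no longer skips the guard when the parent is the latest canonical block, it simply requires the parent to be fully indexed before the block is processed. A rough sketch of the resulting guard follows, with assumed types; as the deleted comment noted, throwing just causes the job to be retried later.

// Sketch (assumed types) of the block-level guard after this change: a block is
// only indexed once its parent has been completely processed, regardless of
// whether the parent is at the canonical head.
interface BlockProgress {
  blockNumber: number;
  blockHash: string;
  isComplete: boolean;
}

interface BlockIndexer {
  getBlockProgress (blockHash: string): Promise<BlockProgress | undefined>;
}

async function assertParentIndexed (
  indexer: BlockIndexer,
  blockHash: string,
  blockNumber: number,
  parentHash: string
): Promise<void> {
  const parent = await indexer.getBlockProgress(parentHash);

  if (!parent || !parent.isComplete) {
    // Aborting here is safe: the block job is retried after the parent finishes.
    throw new Error(
      `Indexing incomplete for parent block number ${parent ? parent.blockNumber : 'unknown'} ` +
      `hash ${parentHash} of block number ${blockNumber} hash ${blockHash}, aborting`
    );
  }
}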