Mirror of https://github.com/cerc-io/watcher-ts, synced 2025-07-28 02:52:08 +00:00
Track last processed event index (#168)

* Implement last processed event in uni-info-watcher.
* Check if block or event is already processed.

Co-authored-by: nabarun <nabarun@deepstacksoft.com>
parent: 3e9aa4d066
commit: f93c0e3cb6
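The gist of the change: each BlockProgress row gains a lastProcessedEventIndex watermark (initialised to -1) that may only move forward, so duplicate or out-of-order event jobs are detected instead of being silently double-counted. Below is a minimal, self-contained sketch of that invariant for illustration only; the BlockProgressLike interface and the shouldProcess/recordProcessed helpers are hypothetical names, not code from this repository.

// Illustrative sketch only -- simplified from the diff below; names are hypothetical.

interface BlockProgressLike {
  numEvents: number;
  numProcessedEvents: number;
  lastProcessedEventIndex: number; // -1 until the first event of the block is processed
  isComplete: boolean;
}

// An event may be processed only if the block is still incomplete and it is not
// the event already recorded as last processed.
function shouldProcess (progress: BlockProgressLike, eventIndex: number): boolean {
  return !progress.isComplete && eventIndex !== progress.lastProcessedEventIndex;
}

// Advance the watermark after successful processing; reject regressions,
// mirroring the check added to Database.updateBlockProgress in the diff below.
function recordProcessed (progress: BlockProgressLike, eventIndex: number): void {
  if (eventIndex <= progress.lastProcessedEventIndex) {
    throw new Error(`Events processed out of order: was ${progress.lastProcessedEventIndex}, got ${eventIndex}`);
  }

  progress.lastProcessedEventIndex = eventIndex;
  progress.numProcessedEvents++;

  if (progress.numProcessedEvents >= progress.numEvents) {
    progress.isComplete = true;
  }
}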
@@ -584,6 +584,7 @@ export class Database {
         blockTimestamp,
         numEvents,
         numProcessedEvents: 0,
+        lastProcessedEventIndex: -1,
         isComplete: (numEvents === 0)
       });
@@ -631,15 +632,21 @@ export class Database {
     return repo.findOne({ where: { blockHash } });
   }

-  async updateBlockProgress (blockHash: string): Promise<void> {
+  async updateBlockProgress (blockHash: string, lastProcessedEventIndex: number): Promise<void> {
     await this._conn.transaction(async (tx) => {
       const repo = tx.getRepository(BlockProgress);
       const entity = await repo.findOne({ where: { blockHash } });
       if (entity && !entity.isComplete) {
+        if (lastProcessedEventIndex <= entity.lastProcessedEventIndex) {
+          throw new Error(`Events processed out of order ${blockHash}, was ${entity.lastProcessedEventIndex}, got ${lastProcessedEventIndex}`);
+        }
+
+        entity.lastProcessedEventIndex = lastProcessedEventIndex;
         entity.numProcessedEvents++;
         if (entity.numProcessedEvents >= entity.numEvents) {
           entity.isComplete = true;
         }

         await repo.save(entity);
       }
     });
@@ -26,6 +26,9 @@ export class BlockProgress {
   @Column('integer')
   numProcessedEvents!: number;

+  @Column('integer')
+  lastProcessedEventIndex!: number;
+
   @Column('boolean')
   isComplete!: boolean
 }
@@ -174,7 +174,7 @@ export class EventWatcher {
       const dbEvent = await this._indexer.getEvent(request.data.id);
       assert(dbEvent);

-      await this._indexer.updateBlockProgress(dbEvent.block.blockHash);
+      await this._indexer.updateBlockProgress(dbEvent.block.blockHash, dbEvent.index);

       log(`Job onComplete event ${request.data.id}`);
     });
@@ -98,6 +98,10 @@ export class Indexer {
     return events;
   }

+  async getBlockEvents (blockHash: string): Promise<Array<Event>> {
+    return this._db.getBlockEvents(blockHash);
+  }
+
   async processEvent (dbEvent: Event): Promise<void> {
     const resultEvent = this.getResultEvent(dbEvent);

@@ -180,8 +184,8 @@ export class Indexer {
     return this._db.getBlockProgress(blockHash);
   }

-  async updateBlockProgress (blockHash: string): Promise<void> {
-    return this._db.updateBlockProgress(blockHash);
+  async updateBlockProgress (blockHash: string, lastProcessedEventIndex: number): Promise<void> {
+    return this._db.updateBlockProgress(blockHash, lastProcessedEventIndex);
   }

   async _fetchAndSaveEvents (block: Block): Promise<void> {
@@ -13,6 +13,7 @@ import { EthClient } from '@vulcanize/ipld-eth-client';
 import { Indexer } from './indexer';
 import { Database } from './database';
 import { QUEUE_BLOCK_PROCESSING, QUEUE_EVENT_PROCESSING } from './events';
+import { Event } from './entity/Event';

 const log = debug('vulcanize:job-runner');

@@ -110,12 +111,16 @@ export const main = async (): Promise<any> => {
       }
     }

+    // Check if block is being already processed.
+    const blockProgress = await indexer.getBlockProgress(block.hash);
+    if (!blockProgress) {
       const events = await indexer.getOrFetchBlockEvents(block);

       for (let ei = 0; ei < events.length; ei++) {
         const { id } = events[ei];
         await jobQueue.pushJob(QUEUE_EVENT_PROCESSING, { id });
       }
+    }

     await jobQueue.markComplete(job);
   });
@@ -127,7 +132,35 @@ export const main = async (): Promise<any> => {
     const dbEvent = await db.getEvent(id);
     assert(dbEvent);

-    if (!dbEvent.block.isComplete) {
+    const event: Event = dbEvent;
+
+    // Confirm that the parent block has been completely processed.
+    // We don't have to worry about aborting as this job will get retried later.
+    const parent = await indexer.getBlockProgress(event.block.parentHash);
+    if (!parent || !parent.isComplete) {
+      const message = `Abort processing of event ${id} as parent block not processed yet`;
+      throw new Error(message);
+    }
+
+    const blockProgress = await indexer.getBlockProgress(event.block.blockHash);
+    assert(blockProgress);
+
+    const events = await indexer.getBlockEvents(event.block.blockHash);
+    const eventIndex = events.findIndex(e => e.id === event.id);
+    assert(eventIndex !== -1);
+
+    // Check if previous event in block has been processed exactly before this and abort if not.
+    if (eventIndex > 0) { // Skip the first event in the block.
+      const prevIndex = eventIndex - 1;
+      const prevEvent = events[prevIndex];
+      if (prevEvent.index !== blockProgress.lastProcessedEventIndex) {
+        throw new Error(`Events received out of order for block number ${event.block.blockNumber} hash ${event.block.blockHash},` +
+          ` prev event index ${prevEvent.index}, got event index ${event.index} and lastProcessedEventIndex ${blockProgress.lastProcessedEventIndex}, aborting`);
+      }
+    }
+
+    // Check if event is processed.
+    if (!dbEvent.block.isComplete && event.index !== blockProgress.lastProcessedEventIndex) {
       await indexer.processEvent(dbEvent);
     }

@@ -48,8 +48,14 @@ export class JobQueue {

   async subscribe (queue: string, callback: JobCallback): Promise<string> {
     return await this._boss.subscribe(queue, { teamSize: 1, teamConcurrency: 1 }, async (job: any) => {
+      try {
         log(`Processing queue ${queue} job ${job.id}...`);
         await callback(job);
+      } catch (error) {
+        log(`Error in queue ${queue}`);
+        log(error);
+        throw error;
+      }
     });
   }
