diff --git a/packages/codegen/src/templates/events-template.handlebars b/packages/codegen/src/templates/events-template.handlebars
index 03b00d53..ffdc62e2 100644
--- a/packages/codegen/src/templates/events-template.handlebars
+++ b/packages/codegen/src/templates/events-template.handlebars
@@ -84,16 +84,22 @@ export class EventWatcher {
         return;
       }
 
-      const dbEvent = await this._baseEventWatcher.eventProcessingCompleteHandler(job);
-
+      const dbEvents = await this._baseEventWatcher.eventProcessingCompleteHandler(job);
       const timeElapsedInSeconds = (Date.now() - Date.parse(createdOn)) / 1000;
-      log(`Job onComplete event ${request.data.id} publish ${!!request.data.publish}`);
-      if (!failed && state === 'completed' && request.data.publish) {
-        // Check for max acceptable lag time between request and sending results to live subscribers.
-        if (timeElapsedInSeconds <= this._jobQueue.maxCompletionLag) {
-          await this.publishEventToSubscribers(dbEvent, timeElapsedInSeconds);
-        } else {
-          log(`event ${request.data.id} is too old (${timeElapsedInSeconds}s), not broadcasting to live subscribers`);
+
+      // Cannot publish individual events as they are processed together in a single job.
+      // TODO: Use a different pubsub to publish events from the job-runner.
+      // https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries
+      for (const dbEvent of dbEvents) {
+        log(`Job onComplete event ${dbEvent.id} publish ${!!request.data.publish}`);
+
+        if (!failed && state === 'completed' && request.data.publish) {
+          // Check for max acceptable lag time between request and sending results to live subscribers.
+          if (timeElapsedInSeconds <= this._jobQueue.maxCompletionLag) {
+            await this.publishEventToSubscribers(dbEvent, timeElapsedInSeconds);
+          } else {
+            log(`event ${dbEvent.id} is too old (${timeElapsedInSeconds}s), not broadcasting to live subscribers`);
+          }
         }
       }
     });
diff --git a/packages/erc20-watcher/environments/local.toml b/packages/erc20-watcher/environments/local.toml
index 4c4edf18..7eef4cab 100644
--- a/packages/erc20-watcher/environments/local.toml
+++ b/packages/erc20-watcher/environments/local.toml
@@ -30,3 +30,4 @@
   dbConnectionString = "postgres://postgres:postgres@localhost/erc20-watcher-job-queue"
   maxCompletionLagInSecs = 300
   jobDelayInMilliSecs = 100
+  eventsInBatch = 50
diff --git a/packages/erc20-watcher/src/database.ts b/packages/erc20-watcher/src/database.ts
index 0d7a5319..392c0656 100644
--- a/packages/erc20-watcher/src/database.ts
+++ b/packages/erc20-watcher/src/database.ts
@@ -101,10 +101,10 @@ export class Database {
     return this._baseDatabase.saveEventEntity(repo, entity);
   }
 
-  async getBlockEvents (blockHash: string, where: FindConditions<Event>): Promise<Array<Event>> {
+  async getBlockEvents (blockHash: string, options: FindManyOptions<Event>): Promise<Array<Event>> {
     const repo = this._conn.getRepository(Event);
 
-    return this._baseDatabase.getBlockEvents(repo, blockHash, where);
+    return this._baseDatabase.getBlockEvents(repo, blockHash, options);
   }
 
   async saveEvents (queryRunner: QueryRunner, block: DeepPartial<BlockProgress>, events: DeepPartial<Event>[]): Promise<void> {
@@ -167,7 +167,7 @@ export class Database {
     return this._baseDatabase.getBlockProgress(repo, blockHash);
   }
 
-  async updateBlockProgress (queryRunner: QueryRunner, block: BlockProgress, lastProcessedEventIndex: number): Promise<void> {
+  async updateBlockProgress (queryRunner: QueryRunner, block: BlockProgress, lastProcessedEventIndex: number): Promise<BlockProgress> {
     const repo = queryRunner.manager.getRepository(BlockProgress);
 
     return this._baseDatabase.updateBlockProgress(repo, block, lastProcessedEventIndex);
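A note on the signature change above: getBlockEvents now takes a TypeORM FindManyOptions object instead of bare FindConditions, so callers can combine a filter with ordering and paging in a single call. A minimal sketch of how a caller might use the new signature; the helper name, entity import path and literal batch size are illustrative assumptions, not part of this diff:

```ts
import { FindManyOptions, MoreThanOrEqual } from 'typeorm';

import { Database } from './database';
import { Event } from './entity/Event';

// Illustrative helper: fetch the next batch of a block's events,
// resuming after the last processed event index.
async function fetchNextEventBatch (db: Database, blockHash: string, lastProcessedEventIndex: number): Promise<Event[]> {
  const options: FindManyOptions<Event> = {
    where: { index: MoreThanOrEqual(lastProcessedEventIndex + 1) },
    order: { index: 'ASC' },
    take: 50 // Batch size; the configurable equivalent in this diff is jobQueue.eventsInBatch.
  };

  return db.getBlockEvents(blockHash, options);
}
```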
diff --git a/packages/erc20-watcher/src/events.ts b/packages/erc20-watcher/src/events.ts
index 1ef4584e..ecd0aa6a 100644
--- a/packages/erc20-watcher/src/events.ts
+++ b/packages/erc20-watcher/src/events.ts
@@ -84,16 +84,22 @@ export class EventWatcher {
         return;
       }
 
-      const dbEvent = await this._baseEventWatcher.eventProcessingCompleteHandler(job);
-
+      const dbEvents = await this._baseEventWatcher.eventProcessingCompleteHandler(job);
       const timeElapsedInSeconds = (Date.now() - Date.parse(createdOn)) / 1000;
-      log(`Job onComplete event ${request.data.id} publish ${!!request.data.publish}`);
-      if (!failed && state === 'completed' && request.data.publish) {
-        // Check for max acceptable lag time between request and sending results to live subscribers.
-        if (timeElapsedInSeconds <= this._jobQueue.maxCompletionLag) {
-          await this.publishEventToSubscribers(dbEvent, timeElapsedInSeconds);
-        } else {
-          log(`event ${request.data.id} is too old (${timeElapsedInSeconds}s), not broadcasting to live subscribers`);
+
+      // Cannot publish individual events as they are processed together in a single job.
+      // TODO: Use a different pubsub to publish events from the job-runner.
+      // https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries
+      for (const dbEvent of dbEvents) {
+        log(`Job onComplete event ${dbEvent.id} publish ${!!request.data.publish}`);
+
+        if (!failed && state === 'completed' && request.data.publish) {
+          // Check for max acceptable lag time between request and sending results to live subscribers.
+          if (timeElapsedInSeconds <= this._jobQueue.maxCompletionLag) {
+            await this.publishEventToSubscribers(dbEvent, timeElapsedInSeconds);
+          } else {
+            log(`event ${dbEvent.id} is too old (${timeElapsedInSeconds}s), not broadcasting to live subscribers`);
+          }
         }
       }
     });
diff --git a/packages/erc20-watcher/src/indexer.ts b/packages/erc20-watcher/src/indexer.ts
index 67ef4a0c..8c18c726 100644
--- a/packages/erc20-watcher/src/indexer.ts
+++ b/packages/erc20-watcher/src/indexer.ts
@@ -5,7 +5,7 @@
 import assert from 'assert';
 import debug from 'debug';
 import { JsonFragment } from '@ethersproject/abi';
-import { DeepPartial } from 'typeorm';
+import { DeepPartial, FindManyOptions } from 'typeorm';
 import JSONbig from 'json-bigint';
 import { ethers } from 'ethers';
 import { BaseProvider } from '@ethersproject/providers';
@@ -352,8 +352,8 @@ export class Indexer {
     return this._baseIndexer.getOrFetchBlockEvents(block, this._fetchAndSaveEvents.bind(this));
   }
 
-  async getBlockEvents (blockHash: string): Promise<Array<Event>> {
-    return this._baseIndexer.getBlockEvents(blockHash);
+  async getBlockEvents (blockHash: string, options: FindManyOptions<Event>): Promise<Array<Event>> {
+    return this._baseIndexer.getBlockEvents(blockHash, options);
   }
 
   async removeUnknownEvents (block: BlockProgress): Promise<void> {
@@ -364,7 +364,7 @@ export class Indexer {
     return this._baseIndexer.markBlocksAsPruned(blocks);
   }
 
-  async updateBlockProgress (block: BlockProgress, lastProcessedEventIndex: number): Promise<void> {
+  async updateBlockProgress (block: BlockProgress, lastProcessedEventIndex: number): Promise<BlockProgress> {
     return this._baseIndexer.updateBlockProgress(block, lastProcessedEventIndex);
   }
 
diff --git a/packages/erc20-watcher/src/job-runner.ts b/packages/erc20-watcher/src/job-runner.ts
index 5d894c80..f8ff54ed 100644
--- a/packages/erc20-watcher/src/job-runner.ts
+++ b/packages/erc20-watcher/src/job-runner.ts
@@ -47,26 +47,12 @@ export class JobRunner {
   async subscribeBlockProcessingQueue (): Promise<void> {
     await this._jobQueue.subscribe(QUEUE_BLOCK_PROCESSING, async (job) => {
       await this._baseJobRunner.processBlock(job);
-
-      await this._jobQueue.markComplete(job);
     });
   }
 
   async subscribeEventProcessingQueue (): Promise<void> {
     await this._jobQueue.subscribe(QUEUE_EVENT_PROCESSING, async (job) => {
-      const event = await this._baseJobRunner.processEvent(job);
-
-      if (!event) {
-        return;
-      }
-
-      const watchedContract = await this._indexer.isWatchedContract(event.contract);
-      if (watchedContract) {
-        await this._indexer.processEvent(event);
-      }
-
-      await this._indexer.updateBlockProgress(event.block, event.index);
-      await this._jobQueue.markComplete(job);
+      await this._baseJobRunner.processEvent(job);
     });
   }
 }
diff --git a/packages/uni-info-watcher/environments/local.toml b/packages/uni-info-watcher/environments/local.toml
index 117bc866..aafb06bb 100644
--- a/packages/uni-info-watcher/environments/local.toml
+++ b/packages/uni-info-watcher/environments/local.toml
@@ -39,3 +39,4 @@
   dbConnectionString = "postgres://postgres:postgres@localhost/uni-info-watcher-job-queue"
   maxCompletionLagInSecs = 300
   jobDelayInMilliSecs = 1000
+  eventsInBatch = 50
diff --git a/packages/uni-info-watcher/environments/test.toml b/packages/uni-info-watcher/environments/test.toml
index dcbcb3f9..c1fed6e8 100644
--- a/packages/uni-info-watcher/environments/test.toml
+++ b/packages/uni-info-watcher/environments/test.toml
@@ -39,3 +39,4 @@
   dbConnectionString = "postgres://postgres:postgres@localhost/uni-info-watcher-job-queue"
   maxCompletionLagInSecs = 300
   jobDelayInMilliSecs = 1000
+  eventsInBatch = 50
diff --git a/packages/uni-info-watcher/src/database.ts b/packages/uni-info-watcher/src/database.ts
index 6bfa3598..35c93f51 100644
--- a/packages/uni-info-watcher/src/database.ts
+++ b/packages/uni-info-watcher/src/database.ts
@@ -603,10 +603,10 @@ export class Database implements DatabaseInterface {
     return this._baseDatabase.saveEventEntity(repo, entity);
   }
 
-  async getBlockEvents (blockHash: string, where: FindConditions<Event>): Promise<Array<Event>> {
+  async getBlockEvents (blockHash: string, options?: FindManyOptions<Event>): Promise<Array<Event>> {
     const repo = this._conn.getRepository(Event);
 
-    return this._baseDatabase.getBlockEvents(repo, blockHash, where);
+    return this._baseDatabase.getBlockEvents(repo, blockHash, options);
   }
 
   async saveEvents (queryRunner: QueryRunner, block: DeepPartial<BlockProgress>, events: DeepPartial<Event>[]): Promise<void> {
@@ -663,7 +663,7 @@ export class Database implements DatabaseInterface {
     return this._baseDatabase.getBlockProgress(repo, blockHash);
   }
 
-  async updateBlockProgress (queryRunner: QueryRunner, block: BlockProgress, lastProcessedEventIndex: number): Promise<void> {
+  async updateBlockProgress (queryRunner: QueryRunner, block: BlockProgress, lastProcessedEventIndex: number): Promise<BlockProgress> {
     const repo = queryRunner.manager.getRepository(BlockProgress);
 
     return this._baseDatabase.updateBlockProgress(repo, block, lastProcessedEventIndex);
diff --git a/packages/uni-info-watcher/src/indexer.ts b/packages/uni-info-watcher/src/indexer.ts
index c64771e3..70fe8e3c 100644
--- a/packages/uni-info-watcher/src/indexer.ts
+++ b/packages/uni-info-watcher/src/indexer.ts
@@ -4,7 +4,7 @@
 
 import assert from 'assert';
 import debug from 'debug';
-import { DeepPartial, QueryRunner } from 'typeorm';
+import { DeepPartial, FindManyOptions, QueryRunner } from 'typeorm';
 import JSONbig from 'json-bigint';
 import { providers, utils, BigNumber } from 'ethers';
 
@@ -197,6 +197,10 @@ export class Indexer implements IndexerInterface {
     };
   }
 
+  async saveEventEntity (dbEvent: Event): Promise<Event> {
+    return this._baseIndexer.saveEventEntity(dbEvent);
+  }
+
   async markBlocksAsPruned (blocks: BlockProgress[]): Promise<void> {
     return this._baseIndexer.markBlocksAsPruned(blocks);
   }
@@ -306,8 +310,8 @@ export class Indexer implements IndexerInterface {
     return this._baseIndexer.getOrFetchBlockEvents(block, this._fetchAndSaveEvents.bind(this));
   }
 
-  async getBlockEvents (blockHash: string): Promise<Array<Event>> {
-    return this._baseIndexer.getBlockEvents(blockHash);
+  async getBlockEvents (blockHash: string, options: FindManyOptions<Event>): Promise<Array<Event>> {
+    return this._baseIndexer.getBlockEvents(blockHash, options);
   }
 
   async removeUnknownEvents (block: BlockProgress): Promise<void> {
@@ -346,7 +350,7 @@ export class Indexer implements IndexerInterface {
     return this._baseIndexer.getBlocksAtHeight(height, isPruned);
   }
 
-  async updateBlockProgress (block: BlockProgress, lastProcessedEventIndex: number): Promise<void> {
+  async updateBlockProgress (block: BlockProgress, lastProcessedEventIndex: number): Promise<BlockProgress> {
     return this._baseIndexer.updateBlockProgress(block, lastProcessedEventIndex);
   }
 
diff --git a/packages/uni-info-watcher/src/job-runner.ts b/packages/uni-info-watcher/src/job-runner.ts
index ca99bac0..02754f84 100644
--- a/packages/uni-info-watcher/src/job-runner.ts
+++ b/packages/uni-info-watcher/src/job-runner.ts
@@ -49,26 +49,12 @@ export class JobRunner {
   async subscribeBlockProcessingQueue (): Promise<void> {
     await this._jobQueue.subscribe(QUEUE_BLOCK_PROCESSING, async (job) => {
       await this._baseJobRunner.processBlock(job);
-
-      await this._jobQueue.markComplete(job);
     });
   }
 
   async subscribeEventProcessingQueue (): Promise<void> {
     await this._jobQueue.subscribe(QUEUE_EVENT_PROCESSING, async (job) => {
-      const event = await this._baseJobRunner.processEvent(job);
-
-      if (!event) {
-        return;
-      }
-
-      // Check if event is processed.
-      if (!event.block.isComplete && event.index !== event.block.lastProcessedEventIndex) {
-        await this._indexer.processEvent(event);
-      }
-
-      await this._indexer.updateBlockProgress(event.block, event.index);
-      await this._jobQueue.markComplete(job);
+      await this._baseJobRunner.processEvent(job);
     });
   }
 }
diff --git a/packages/uni-info-watcher/src/smoke.test.ts b/packages/uni-info-watcher/src/smoke.test.ts
index dc02e81e..51979c47 100644
--- a/packages/uni-info-watcher/src/smoke.test.ts
+++ b/packages/uni-info-watcher/src/smoke.test.ts
@@ -768,20 +768,16 @@ describe('uni-info-watcher', () => {
         fee
       });
 
-      eventType = 'MintEvent';
-      await Promise.all([
-        transaction,
-        watchEvent(uniClient, eventType)
+      [eventValue] = await Promise.all([
+        // Wait for TransferEvent and get eventValue.
+        watchEvent(uniClient, 'TransferEvent'),
+        // Wait for MintEvent.
+        watchEvent(uniClient, 'MintEvent'),
+        // Wait for IncreaseLiquidityEvent.
+        watchEvent(uniClient, 'IncreaseLiquidityEvent'),
+        transaction
       ]);
 
-      // Wait for TransferEvent.
-      eventType = 'TransferEvent';
-      eventValue = await watchEvent(uniClient, eventType);
-
-      // Wait for IncreaseLiquidityEvent.
-      eventType = 'IncreaseLiquidityEvent';
-      await watchEvent(uniClient, eventType);
-
       // Sleeping for 15 sec for the events to be processed.
       await wait(15000);
     });
@@ -831,7 +827,6 @@ describe('uni-info-watcher', () => {
 
     let oldPosition: any;
     let eventValue: any;
-    let eventType: string;
 
     const tokenId = 1;
     const amount0Desired = 15;
@@ -856,16 +851,14 @@ describe('uni-info-watcher', () => {
         deadline
       });
 
-      eventType = 'MintEvent';
-      await Promise.all([
-        transaction,
-        watchEvent(uniClient, eventType)
+      [eventValue] = await Promise.all([
+        // Wait for IncreaseLiquidityEvent and get eventValue.
+        watchEvent(uniClient, 'IncreaseLiquidityEvent'),
+        // Wait for MintEvent.
+        watchEvent(uniClient, 'MintEvent'),
+        transaction
       ]);
 
-      // Wait for IncreaseLiquidityEvent.
-      eventType = 'IncreaseLiquidityEvent';
-      eventValue = await watchEvent(uniClient, eventType);
-
       // Sleeping for 15 sec for the events to be processed.
       await wait(15000);
     });
@@ -906,7 +899,6 @@ describe('uni-info-watcher', () => {
 
     let oldPosition: any;
     let eventValue: any;
-    let eventType: string;
 
     const tokenId = 1;
     const liquidity = 5;
@@ -929,16 +921,14 @@ describe('uni-info-watcher', () => {
         deadline
       });
 
-      eventType = 'BurnEvent';
-      await Promise.all([
-        transaction,
-        watchEvent(uniClient, eventType)
+      [eventValue] = await Promise.all([
+        // Wait for DecreaseLiquidityEvent and get eventValue.
+        watchEvent(uniClient, 'DecreaseLiquidityEvent'),
+        // Wait for BurnEvent.
+        watchEvent(uniClient, 'BurnEvent'),
+        transaction
       ]);
 
-      // Wait for DecreaseLiquidityEvent.
-      eventType = 'DecreaseLiquidityEvent';
-      eventValue = await watchEvent(uniClient, eventType);
-
       // Sleeping for 15 sec for the events to be processed.
       await wait(15000);
     });
@@ -978,8 +968,6 @@ describe('uni-info-watcher', () => {
     // Checked entities: Transaction.
     // Unchecked entities: Position.
 
-    let eventType: string;
-
     const tokenId = 1;
     const amount0Max = 15;
     const amount1Max = 15;
@@ -993,16 +981,14 @@ describe('uni-info-watcher', () => {
         amount1Max
       });
 
-      eventType = 'BurnEvent';
       await Promise.all([
         transaction,
-        watchEvent(uniClient, eventType)
+        // Wait for BurnEvent.
+        watchEvent(uniClient, 'BurnEvent'),
+        // Wait for CollectEvent.
+        watchEvent(uniClient, 'CollectEvent')
       ]);
 
-      // Wait for CollectEvent.
-      eventType = 'CollectEvent';
-      await watchEvent(uniClient, eventType);
-
       // Sleeping for 10 sec for the events to be processed.
       await wait(10000);
     });
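The smoke-test rewrites above start every expected subscription watcher before awaiting the transaction inside one Promise.all, so an event emitted while a previous await was still pending can no longer be missed. A stand-alone sketch of the same pattern; the helper and its parameters are illustrative, with the watch callback standing in for watchEvent(uniClient, eventType):

```ts
// Sketch: subscribe to all expected event types up front, then await them
// together with the transaction that triggers them.
async function awaitTransactionEvents<T> (
  watch: (eventType: string) => Promise<T>, // e.g. (type) => watchEvent(uniClient, type)
  eventTypes: string[],
  transaction: Promise<unknown>
): Promise<T[]> {
  const results = await Promise.all([...eventTypes.map(watch), transaction]);

  // Drop the transaction result and return the events in the order requested.
  return results.slice(0, eventTypes.length) as T[];
}
```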
diff --git a/packages/uni-watcher/environments/local.toml b/packages/uni-watcher/environments/local.toml
index 3cee3971..ce252d61 100644
--- a/packages/uni-watcher/environments/local.toml
+++ b/packages/uni-watcher/environments/local.toml
@@ -28,3 +28,4 @@
   dbConnectionString = "postgres://postgres:postgres@localhost/uni-watcher-job-queue"
   maxCompletionLagInSecs = 300
   jobDelayInMilliSecs = 100
+  eventsInBatch = 50
diff --git a/packages/uni-watcher/environments/test.toml b/packages/uni-watcher/environments/test.toml
index 43b96e52..6bd041f3 100644
--- a/packages/uni-watcher/environments/test.toml
+++ b/packages/uni-watcher/environments/test.toml
@@ -28,3 +28,4 @@
   dbConnectionString = "postgres://postgres:postgres@localhost/uni-watcher-job-queue"
   maxCompletionLagInSecs = 300
   jobDelayInMilliSecs = 100
+  eventsInBatch = 50
diff --git a/packages/uni-watcher/src/database.ts b/packages/uni-watcher/src/database.ts
index 27d8b57b..fa20d03c 100644
--- a/packages/uni-watcher/src/database.ts
+++ b/packages/uni-watcher/src/database.ts
@@ -78,10 +78,10 @@ export class Database implements DatabaseInterface {
     return this._baseDatabase.saveEventEntity(repo, entity);
   }
 
-  async getBlockEvents (blockHash: string, where: FindConditions<Event>): Promise<Array<Event>> {
+  async getBlockEvents (blockHash: string, options: FindManyOptions<Event>): Promise<Array<Event>> {
     const repo = this._conn.getRepository(Event);
 
-    return this._baseDatabase.getBlockEvents(repo, blockHash, where);
+    return this._baseDatabase.getBlockEvents(repo, blockHash, options);
   }
 
   async saveEvents (queryRunner: QueryRunner, block: DeepPartial<BlockProgress>, events: DeepPartial<Event>[]): Promise<void> {
@@ -138,7 +138,7 @@ export class Database implements DatabaseInterface {
     return this._baseDatabase.getBlockProgress(repo, blockHash);
   }
 
-  async updateBlockProgress (queryRunner: QueryRunner, block: BlockProgress, lastProcessedEventIndex: number): Promise<void> {
+  async updateBlockProgress (queryRunner: QueryRunner, block: BlockProgress, lastProcessedEventIndex: number): Promise<BlockProgress> {
     const repo = queryRunner.manager.getRepository(BlockProgress);
 
     return this._baseDatabase.updateBlockProgress(repo, block, lastProcessedEventIndex);
diff --git a/packages/uni-watcher/src/events.ts b/packages/uni-watcher/src/events.ts
index e1dd517c..5d4ddfa1 100644
--- a/packages/uni-watcher/src/events.ts
+++ b/packages/uni-watcher/src/events.ts
@@ -81,16 +81,22 @@ export class EventWatcher implements EventWatcherInterface {
         return;
       }
 
-      const dbEvent = await this._baseEventWatcher.eventProcessingCompleteHandler(job);
-
+      const dbEvents = await this._baseEventWatcher.eventProcessingCompleteHandler(job);
       const timeElapsedInSeconds = (Date.now() - Date.parse(createdOn)) / 1000;
-      log(`Job onComplete event ${request.data.id} publish ${!!request.data.publish}`);
-      if (!failed && state === 'completed' && request.data.publish) {
-        // Check for max acceptable lag time between request and sending results to live subscribers.
-        if (timeElapsedInSeconds <= this._jobQueue.maxCompletionLag) {
-          await this.publishUniswapEventToSubscribers(dbEvent, timeElapsedInSeconds);
-        } else {
-          log(`event ${request.data.id} is too old (${timeElapsedInSeconds}s), not broadcasting to live subscribers`);
+
+      // Cannot publish individual events as they are processed together in a single job.
+      // TODO: Use a different pubsub to publish events from the job-runner.
+      // https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries
+      for (const dbEvent of dbEvents) {
+        log(`Job onComplete event ${dbEvent.id} publish ${!!request.data.publish}`);
+
+        if (!failed && state === 'completed' && request.data.publish) {
+          // Check for max acceptable lag time between request and sending results to live subscribers.
+          if (timeElapsedInSeconds <= this._jobQueue.maxCompletionLag) {
+            await this.publishUniswapEventToSubscribers(dbEvent, timeElapsedInSeconds);
+          } else {
+            log(`event ${dbEvent.id} is too old (${timeElapsedInSeconds}s), not broadcasting to live subscribers`);
+          }
         }
       }
     });
diff --git a/packages/uni-watcher/src/indexer.ts b/packages/uni-watcher/src/indexer.ts
index 81d08e61..b9778a1c 100644
--- a/packages/uni-watcher/src/indexer.ts
+++ b/packages/uni-watcher/src/indexer.ts
@@ -3,7 +3,7 @@
 //
 
 import debug from 'debug';
-import { DeepPartial, QueryRunner } from 'typeorm';
+import { DeepPartial, FindManyOptions, QueryRunner } from 'typeorm';
 import JSONbig from 'json-bigint';
 import { ethers } from 'ethers';
 import assert from 'assert';
@@ -372,8 +372,8 @@ export class Indexer implements IndexerInterface {
     return this._baseIndexer.getOrFetchBlockEvents(block, this._fetchAndSaveEvents.bind(this));
   }
 
-  async getBlockEvents (blockHash: string): Promise<Array<Event>> {
-    return this._baseIndexer.getBlockEvents(blockHash);
+  async getBlockEvents (blockHash: string, options: FindManyOptions<Event>): Promise<Array<Event>> {
+    return this._baseIndexer.getBlockEvents(blockHash, options);
   }
 
   async removeUnknownEvents (block: BlockProgress): Promise<void> {
@@ -416,7 +416,7 @@ export class Indexer implements IndexerInterface {
     return this._baseIndexer.markBlocksAsPruned(blocks);
   }
 
-  async updateBlockProgress (block: BlockProgress, lastProcessedEventIndex: number): Promise<void> {
+  async updateBlockProgress (block: BlockProgress, lastProcessedEventIndex: number): Promise<BlockProgress> {
     return this._baseIndexer.updateBlockProgress(block, lastProcessedEventIndex);
   }
 
@@ -426,19 +426,24 @@ export class Indexer implements IndexerInterface {
   async _fetchAndSaveEvents ({ blockHash }: DeepPartial<BlockProgress>): Promise<void> {
     assert(blockHash);
 
-    let { block, logs } = await this._ethClient.getLogs({ blockHash });
-    const {
-      allEthHeaderCids: {
-        nodes: [
-          {
-            ethTransactionCidsByHeaderId: {
-              nodes: transactions
+    const logsPromise = this._ethClient.getLogs({ blockHash });
+    const transactionsPromise = this._postgraphileClient.getBlockWithTransactions({ blockHash });
+
+    let [
+      { block, logs },
+      {
+        allEthHeaderCids: {
+          nodes: [
+            {
+              ethTransactionCidsByHeaderId: {
+                nodes: transactions
+              }
             }
-          }
-        ]
+          ]
+        }
       }
-    } = await this._postgraphileClient.getBlockWithTransactions({ blockHash });
+    ] = await Promise.all([logsPromise, transactionsPromise]);
 
     const transactionMap = transactions.reduce((acc: {[key: string]: any}, transaction: {[key: string]: any}) => {
       acc[transaction.txHash] = transaction;
diff --git a/packages/uni-watcher/src/job-runner.ts b/packages/uni-watcher/src/job-runner.ts
index 8485f36e..8729f785 100644
--- a/packages/uni-watcher/src/job-runner.ts
+++ b/packages/uni-watcher/src/job-runner.ts
@@ -23,7 +23,6 @@
 
 import { Indexer } from './indexer';
 import { Database } from './database';
-import { UNKNOWN_EVENT_NAME } from './entity/Event';
 
 const log = debug('vulcanize:job-runner');
 
@@ -48,40 +47,12 @@ export class JobRunner {
   async subscribeBlockProcessingQueue (): Promise<void> {
     await this._jobQueue.subscribe(QUEUE_BLOCK_PROCESSING, async (job) => {
       await this._baseJobRunner.processBlock(job);
-
-      await this._jobQueue.markComplete(job);
     });
   }
 
   async subscribeEventProcessingQueue (): Promise<void> {
     await this._jobQueue.subscribe(QUEUE_EVENT_PROCESSING, async (job) => {
-      // TODO: Support two kind of jobs on the event processing queue.
-      // 1) processEvent => Current single event
-      // 2) processEvents => Event range (multiple events)
-      let event = await this._baseJobRunner.processEvent(job);
-
-      if (!event) {
-        return;
-      }
-
-      const watchedContract = await this._indexer.isWatchedContract(event.contract);
-
-      if (watchedContract) {
-        // We might not have parsed this event yet. This can happen if the contract was added
-        // as a result of a previous event in the same block.
-        if (event.eventName === UNKNOWN_EVENT_NAME) {
-          const logObj = JSON.parse(event.extraInfo);
-          const { eventName, eventInfo } = this._indexer.parseEventNameAndArgs(watchedContract.kind, logObj);
-          event.eventName = eventName;
-          event.eventInfo = JSON.stringify(eventInfo);
-          event = await this._indexer.saveEventEntity(event);
-        }
-
-        await this._indexer.processEvent(event);
-      }
-
-      await this._indexer.updateBlockProgress(event.block, event.index);
-      await this._jobQueue.markComplete(job);
+      await this._baseJobRunner.processEvent(job);
     });
   }
 }
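In the uni-watcher indexer change above, _fetchAndSaveEvents now starts the logs query and the block-transactions query at the same time and destructures both results from a single Promise.all. A reduced sketch of that shape; the client parameter types and result fields are simplified placeholders, not the real @vulcanize/ipld-eth-client API:

```ts
// Simplified shape of the concurrent fetch: both requests are issued immediately
// and awaited together, instead of one after the other.
async function fetchLogsAndTransactions (
  ethClient: { getLogs (args: { blockHash: string }): Promise<{ block: any, logs: any[] }> },
  postgraphileClient: { getBlockWithTransactions (args: { blockHash: string }): Promise<any> },
  blockHash: string
): Promise<{ block: any, logs: any[], transactions: any[] }> {
  const [{ block, logs }, blockWithTransactions] = await Promise.all([
    ethClient.getLogs({ blockHash }),
    postgraphileClient.getBlockWithTransactions({ blockHash })
  ]);

  // Same nested shape the indexer destructures above.
  const transactions = blockWithTransactions.allEthHeaderCids.nodes[0].ethTransactionCidsByHeaderId.nodes;

  return { block, logs, transactions };
}
```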
diff --git a/packages/util/src/config.ts b/packages/util/src/config.ts
index 25d075f2..add61138 100644
--- a/packages/util/src/config.ts
+++ b/packages/util/src/config.ts
@@ -21,6 +21,7 @@ export interface JobQueueConfig {
   dbConnectionString: string;
   maxCompletionLagInSecs: number;
   jobDelayInMilliSecs?: number;
+  eventsInBatch: number;
 }
 
 interface ServerConfig {
diff --git a/packages/util/src/constants.ts b/packages/util/src/constants.ts
index 7595da90..6b3afa52 100644
--- a/packages/util/src/constants.ts
+++ b/packages/util/src/constants.ts
@@ -11,8 +11,8 @@ export const QUEUE_CHAIN_PRUNING = 'chain-pruning';
 
 export const JOB_KIND_INDEX = 'index';
 export const JOB_KIND_PRUNE = 'prune';
+export const JOB_KIND_EVENTS = 'events';
 export const JOB_KIND_CONTRACT = 'contract';
-export const JOB_KIND_EVENT = 'event';
 
 export const DEFAULT_CONFIG_PATH = 'environments/local.toml';
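The eventsInBatch field added to JobQueueConfig above is the TypeScript counterpart of the new eventsInBatch = 50 entries in the watchers' TOML files, and the job runner further down falls back to DEFAULT_EVENTS_IN_BATCH when it is not set. A small self-contained sketch of that fallback; the helper function is illustrative, not an API added by this diff:

```ts
// Mirrors the JobQueueConfig shape from packages/util/src/config.ts.
interface JobQueueConfig {
  dbConnectionString: string;
  maxCompletionLagInSecs: number;
  jobDelayInMilliSecs?: number;
  eventsInBatch: number;
}

const DEFAULT_EVENTS_IN_BATCH = 50;

// Resolve the batch size the way the job runner below does,
// falling back to the default when the config value is missing or zero.
function getEventsInBatch (config: JobQueueConfig): number {
  return config.eventsInBatch || DEFAULT_EVENTS_IN_BATCH;
}
```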
diff --git a/packages/util/src/database.ts b/packages/util/src/database.ts
index fece54f9..4e7b770f 100644
--- a/packages/util/src/database.ts
+++ b/packages/util/src/database.ts
@@ -153,7 +153,7 @@ export class Database {
       .getMany();
   }
 
-  async updateBlockProgress (repo: Repository<BlockProgressInterface>, block: BlockProgressInterface, lastProcessedEventIndex: number): Promise<void> {
+  async updateBlockProgress (repo: Repository<BlockProgressInterface>, block: BlockProgressInterface, lastProcessedEventIndex: number): Promise<BlockProgressInterface> {
     if (!block.isComplete) {
       if (lastProcessedEventIndex <= block.lastProcessedEventIndex) {
         throw new Error(`Events processed out of order ${block.blockHash}, was ${block.lastProcessedEventIndex}, got ${lastProcessedEventIndex}`);
@@ -165,9 +165,18 @@ export class Database {
         block.isComplete = true;
       }
 
-      const { id, ...blockData } = block;
-      await repo.update(id, blockData);
+      const { generatedMaps } = await repo.createQueryBuilder()
+        .update(block)
+        .set(block)
+        .where('id = :id', { id: block.id })
+        .whereEntity(block)
+        .returning('*')
+        .execute();
+
+      block = generatedMaps[0] as BlockProgressInterface;
     }
+
+    return block;
   }
 
   async markBlocksAsPruned (repo: Repository<BlockProgressInterface>, blocks: BlockProgressInterface[]): Promise<void> {
@@ -180,19 +189,26 @@ export class Database {
     return repo.findOne(id, { relations: ['block'] });
   }
 
-  async getBlockEvents (repo: Repository<EventInterface>, blockHash: string, where: FindConditions<EventInterface> = {}): Promise<EventInterface[]> {
-    where.block = {
-      ...where.block,
-      blockHash
+  async getBlockEvents (repo: Repository<EventInterface>, blockHash: string, options: FindManyOptions<EventInterface> = {}): Promise<EventInterface[]> {
+    if (!Array.isArray(options.where)) {
+      options.where = [options.where || {}];
+    }
+
+    options.where.forEach((where: FindConditions<EventInterface> = {}) => {
+      where.block = {
+        ...where.block,
+        blockHash
+      };
+    });
+
+    options.relations = ['block'];
+
+    options.order = {
+      ...options.order,
+      id: 'ASC'
     };
 
-    return repo.find({
-      where,
-      relations: ['block'],
-      order: {
-        id: 'ASC'
-      }
-    });
+    return repo.find(options);
   }
 
   async saveEvents (blockRepo: Repository<BlockProgressInterface>, eventRepo: Repository<EventInterface>, block: DeepPartial<BlockProgressInterface>, events: DeepPartial<EventInterface>[]): Promise<void> {
diff --git a/packages/util/src/events.ts b/packages/util/src/events.ts
index 05cf04f5..f7500ece 100644
--- a/packages/util/src/events.ts
+++ b/packages/util/src/events.ts
@@ -5,12 +5,13 @@
 import assert from 'assert';
 import debug from 'debug';
 import { PubSub } from 'apollo-server-express';
+import { Not } from 'typeorm';
 
 import { EthClient } from '@vulcanize/ipld-eth-client';
 
 import { JobQueue } from './job-queue';
 import { BlockProgressInterface, EventInterface, IndexerInterface } from './types';
-import { MAX_REORG_DEPTH, JOB_KIND_PRUNE, JOB_KIND_INDEX } from './constants';
+import { MAX_REORG_DEPTH, JOB_KIND_PRUNE, JOB_KIND_INDEX, UNKNOWN_EVENT_NAME } from './constants';
 import { createPruningJob, processBlockByNumber } from './common';
 
 import { UpstreamConfig } from './config';
@@ -99,23 +100,30 @@ export class EventWatcher {
     }
   }
 
-  async eventProcessingCompleteHandler (job: any): Promise<EventInterface> {
-    const { data: { request } } = job;
+  async eventProcessingCompleteHandler (job: any): Promise<Array<EventInterface>> {
+    const { data: { request: { data: { blockHash } } } } = job;
+    assert(blockHash);
 
-    const dbEvent = await this._indexer.getEvent(request.data.id);
-    assert(dbEvent);
+    const blockProgress = await this._indexer.getBlockProgress(blockHash);
+    assert(blockProgress);
 
-    const blockProgress = await this._indexer.getBlockProgress(dbEvent.block.blockHash);
+    await this.publishBlockProgressToSubscribers(blockProgress);
 
-    if (blockProgress) {
-      await this.publishBlockProgressToSubscribers(blockProgress);
-
-      if (blockProgress.isComplete) {
-        await this._indexer.removeUnknownEvents(blockProgress);
-      }
+    if (blockProgress.isComplete) {
+      await this._indexer.removeUnknownEvents(blockProgress);
     }
 
-    return dbEvent;
+    return this._indexer.getBlockEvents(
+      blockProgress.blockHash,
+      {
+        where: {
+          eventName: Not(UNKNOWN_EVENT_NAME)
+        },
+        order: {
+          index: 'ASC'
+        }
+      }
+    );
   }
 
   async publishBlockProgressToSubscribers (blockProgress: BlockProgressInterface): Promise<void> {
diff --git a/packages/util/src/indexer.ts b/packages/util/src/indexer.ts
index c0e875dd..cf9fb714 100644
--- a/packages/util/src/indexer.ts
+++ b/packages/util/src/indexer.ts
@@ -3,7 +3,7 @@
 //
 
 import assert from 'assert';
-import { DeepPartial, FindConditions, Not } from 'typeorm';
+import { DeepPartial, FindConditions, FindManyOptions, Not } from 'typeorm';
 import debug from 'debug';
 import { ethers } from 'ethers';
 
@@ -169,21 +169,20 @@ export class Indexer {
     }
   }
 
-  async updateBlockProgress (block: BlockProgressInterface, lastProcessedEventIndex: number): Promise<void> {
+  async updateBlockProgress (block: BlockProgressInterface, lastProcessedEventIndex: number): Promise<BlockProgressInterface> {
     const dbTx = await this._db.createTransactionRunner();
-    let res;
 
     try {
-      res = await this._db.updateBlockProgress(dbTx, block, lastProcessedEventIndex);
+      const updatedBlock = await this._db.updateBlockProgress(dbTx, block, lastProcessedEventIndex);
       await dbTx.commitTransaction();
+
+      return updatedBlock;
     } catch (error) {
       await dbTx.rollbackTransaction();
       throw error;
     } finally {
       await dbTx.release();
     }
-
-    return res;
   }
 
   async getEvent (id: string): Promise<EventInterface | undefined> {
@@ -205,8 +204,8 @@ export class Indexer {
     return events;
   }
 
-  async getBlockEvents (blockHash: string): Promise<Array<EventInterface>> {
-    return this._db.getBlockEvents(blockHash);
+  async getBlockEvents (blockHash: string, options: FindManyOptions<EventInterface> = {}): Promise<Array<EventInterface>> {
+    return this._db.getBlockEvents(blockHash, options);
   }
 
   async getEventsByFilter (blockHash: string, contract: string, name: string | null): Promise<Array<EventInterface>> {
@@ -229,7 +228,7 @@ export class Indexer {
       where.eventName = name;
     }
 
-    const events = await this._db.getBlockEvents(blockHash, where);
+    const events = await this._db.getBlockEvents(blockHash, { where });
     log(`getEvents: db hit, num events: ${events.length}`);
 
     return events;
diff --git a/packages/util/src/job-queue.ts b/packages/util/src/job-queue.ts
index a6cda25d..b39aeddd 100644
--- a/packages/util/src/job-queue.ts
+++ b/packages/util/src/job-queue.ts
@@ -60,22 +60,17 @@ export class JobQueue {
     return await this._boss.subscribe(
       queue,
       {
-        includeMetadata: true,
-        batchSize: JOBS_PER_INTERVAL
+        teamSize: JOBS_PER_INTERVAL,
+        teamConcurrency: 1
       },
-      async (jobs: any) => {
-        // TODO: Debug jobs not fetched in order from database and use teamSize instead of batchSize.
-        jobs = jobs.sort((a: any, b: any) => a.createdon - b.createdon);
-
-        for (const job of jobs) {
-          try {
-            log(`Processing queue ${queue} job ${job.id}...`);
-            await callback(job);
-          } catch (error) {
-            log(`Error in queue ${queue} job ${job.id}`);
-            log(error);
-            throw error;
-          }
+      async (job: any) => {
+        try {
+          log(`Processing queue ${queue} job ${job.id}...`);
+          await callback(job);
+        } catch (error) {
+          log(`Error in queue ${queue} job ${job.id}`);
+          log(error);
+          throw error;
         }
       }
     );
@@ -97,7 +92,7 @@ export class JobQueue {
     assert(this._boss);
 
     const jobId = await this._boss.publish(queue, job, options);
-    log(`Created job in queue ${queue}: ${jobId} data: ${job.id}`);
+    log(`Created job in queue ${queue}: ${jobId}`);
   }
 
   async deleteAllJobs (): Promise<void> {
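The subscribe change above swaps batchSize/includeMetadata for teamSize with teamConcurrency: 1, so pg-boss hands the callback one job at a time in order instead of delivering an array that had to be re-sorted by creation time. A minimal stand-alone sketch of that subscription style; the connection string, queue name and handler are placeholders:

```ts
import PgBoss from 'pg-boss';

// Sketch of a single-concurrency team subscription, matching the options used above.
async function startWorker (): Promise<void> {
  const boss = new PgBoss('postgres://postgres:postgres@localhost/example-job-queue');
  await boss.start();

  await boss.subscribe(
    'example-queue',
    // Fetch up to 10 jobs per polling interval, but run the handler for one job at a time.
    { teamSize: 10, teamConcurrency: 1 },
    async (job: any) => {
      console.log(`processing job ${job.id}`);
    }
  );
}
```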
diff --git a/packages/util/src/job-runner.ts b/packages/util/src/job-runner.ts
index 3bb97efb..03708728 100644
--- a/packages/util/src/job-runner.ts
+++ b/packages/util/src/job-runner.ts
@@ -4,13 +4,16 @@
 
 import assert from 'assert';
 import debug from 'debug';
+import { MoreThanOrEqual } from 'typeorm';
+
+import { JobQueueConfig } from './config';
+import { JOB_KIND_INDEX, JOB_KIND_PRUNE, JOB_KIND_EVENTS, JOB_KIND_CONTRACT, MAX_REORG_DEPTH, QUEUE_BLOCK_PROCESSING, QUEUE_EVENT_PROCESSING, UNKNOWN_EVENT_NAME } from './constants';
+import { JobQueue } from './job-queue';
+import { EventInterface, IndexerInterface, SyncStatusInterface, BlockProgressInterface } from './types';
 import { wait } from './misc';
 import { createPruningJob } from './common';
 
-import { JobQueueConfig } from './config';
-import { JOB_KIND_INDEX, JOB_KIND_PRUNE, JOB_KIND_EVENT, JOB_KIND_CONTRACT, MAX_REORG_DEPTH, QUEUE_BLOCK_PROCESSING, QUEUE_EVENT_PROCESSING } from './constants';
-import { JobQueue } from './job-queue';
-import { EventInterface, IndexerInterface, SyncStatusInterface } from './types';
+const DEFAULT_EVENTS_IN_BATCH = 50;
 
 const log = debug('vulcanize:job-runner');
 
@@ -18,6 +21,7 @@ export class JobRunner {
   _indexer: IndexerInterface
   _jobQueue: JobQueue
   _jobQueueConfig: JobQueueConfig
+  _blockInProcess?: BlockProgressInterface
 
   constructor (jobQueueConfig: JobQueueConfig, indexer: IndexerInterface, jobQueue: JobQueue) {
     this._jobQueueConfig = jobQueueConfig;
@@ -44,22 +48,28 @@ export class JobRunner {
         log(`Invalid Job kind ${kind} in QUEUE_BLOCK_PROCESSING.`);
         break;
     }
+
+    await this._jobQueue.markComplete(job);
   }
 
   async processEvent (job: any): Promise<void> {
     const { data: { kind } } = job;
 
     switch (kind) {
-      case JOB_KIND_EVENT:
-        return this._processEvent(job);
+      case JOB_KIND_EVENTS:
+        await this._processEvents(job);
+        break;
 
       case JOB_KIND_CONTRACT:
-        return this._updateWatchedContracts(job);
+        await this._updateWatchedContracts(job);
+        break;
 
       default:
         log(`Invalid Job kind ${kind} in QUEUE_EVENT_PROCESSING.`);
        break;
     }
+
+    await this._jobQueue.markComplete(job);
   }
 
   async _pruneChain (job: any, syncStatus: SyncStatusInterface): Promise<void> {
@@ -165,32 +175,81 @@ export class JobRunner {
       await wait(jobDelayInMilliSecs);
       const events = await this._indexer.getOrFetchBlockEvents({ blockHash, blockNumber, parentHash, blockTimestamp: timestamp });
 
-      for (let ei = 0; ei < events.length; ei++) {
-        await this._jobQueue.pushJob(QUEUE_EVENT_PROCESSING, { kind: JOB_KIND_EVENT, id: events[ei].id, publish: true });
+      if (events.length) {
+        const block = events[0].block;
+
+        await this._jobQueue.pushJob(QUEUE_EVENT_PROCESSING, { kind: JOB_KIND_EVENTS, blockHash: block.blockHash, publish: true });
       }
     }
   }
 
-  async _processEvent (job: any): Promise<EventInterface> {
-    const { data: { id } } = job;
+  async _processEvents (job: any): Promise<void> {
+    const { blockHash } = job.data;
 
-    log(`Processing event ${id}`);
+    let block = await this._indexer.getBlockProgress(blockHash);
+    assert(block);
 
-    const event = await this._indexer.getEvent(id);
-    assert(event);
-    const eventIndex = event.index;
+    while (!block.isComplete) {
+      // Fetch events in batches
+      const events: EventInterface[] = await this._indexer.getBlockEvents(
+        blockHash,
+        {
+          take: this._jobQueueConfig.eventsInBatch || DEFAULT_EVENTS_IN_BATCH,
+          where: {
+            index: MoreThanOrEqual(block.lastProcessedEventIndex + 1)
+          },
+          order: {
+            index: 'ASC'
+          }
+        }
+      );
 
-    // Check if previous event in block has been processed exactly before this and abort if not.
-    if (eventIndex > 0) { // Skip the first event in the block.
-      const prevIndex = eventIndex - 1;
+      for (let event of events) {
+        // Process events in loop
 
-      if (prevIndex !== event.block.lastProcessedEventIndex) {
-        throw new Error(`Events received out of order for block number ${event.block.blockNumber} hash ${event.block.blockHash},` +
-          ` prev event index ${prevIndex}, got event index ${event.index} and lastProcessedEventIndex ${event.block.lastProcessedEventIndex}, aborting`);
+        const eventIndex = event.index;
+        log(`Processing event ${event.id} index ${eventIndex}`);
+
+        // Check if previous event in block has been processed exactly before this and abort if not.
+        if (eventIndex > 0) { // Skip the first event in the block.
+          const prevIndex = eventIndex - 1;
+
+          if (prevIndex !== block.lastProcessedEventIndex) {
+            throw new Error(`Events received out of order for block number ${block.blockNumber} hash ${block.blockHash},` +
+              ` prev event index ${prevIndex}, got event index ${event.index} and lastProcessedEventIndex ${block.lastProcessedEventIndex}, aborting`);
+          }
+        }
+
+        let watchedContract;
+
+        if (!this._indexer.isWatchedContract) {
+          // uni-info-watcher indexer doesn't have watched contracts implementation.
+          watchedContract = true;
+        } else {
+          watchedContract = await this._indexer.isWatchedContract(event.contract);
+        }
+
+        if (watchedContract) {
+          // We might not have parsed this event yet. This can happen if the contract was added
+          // as a result of a previous event in the same block.
+          if (event.eventName === UNKNOWN_EVENT_NAME) {
+            const logObj = JSON.parse(event.extraInfo);
+
+            assert(this._indexer.parseEventNameAndArgs);
+            assert(typeof watchedContract !== 'boolean');
+            const { eventName, eventInfo } = this._indexer.parseEventNameAndArgs(watchedContract.kind, logObj);
+
+            event.eventName = eventName;
+            event.eventInfo = JSON.stringify(eventInfo);
+            event = await this._indexer.saveEventEntity(event);
+          }
+
+          await this._indexer.processEvent(event);
+        }
+
+        block = await this._indexer.updateBlockProgress(block, event.index);
       }
     }
-
-    return event;
   }
 
   async _updateWatchedContracts (job: any): Promise<void> {
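With _processEvents above, the block-processing queue now pushes one JOB_KIND_EVENTS job per block (keyed by blockHash) and the event side drains that block's events in batches of eventsInBatch until the block is complete, instead of queuing one job per event. A toy sketch of that control flow against an in-memory stand-in for the indexer; all names here are illustrative and the completion check is simplified (the real code relies on updateBlockProgress to flag isComplete):

```ts
interface ToyEvent { id: string; index: number }

interface ToyBlock { blockHash: string; isComplete: boolean; lastProcessedEventIndex: number }

// Toy version of the batching loop: fetch events from the last processed index
// onwards, process them in order, and advance the block's progress marker.
async function processBlockEvents (
  block: ToyBlock,
  getBatch: (fromIndex: number, take: number) => Promise<ToyEvent[]>,
  processEvent: (event: ToyEvent) => Promise<void>,
  eventsInBatch = 50
): Promise<void> {
  while (!block.isComplete) {
    const events = await getBatch(block.lastProcessedEventIndex + 1, eventsInBatch);

    for (const event of events) {
      await processEvent(event);
      block.lastProcessedEventIndex = event.index;
    }

    // Simplification: stop when a batch comes back short; the real loop exits
    // when updateBlockProgress marks the block as complete.
    if (events.length < eventsInBatch) {
      block.isComplete = true;
    }
  }
}
```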
diff --git a/packages/util/src/types.ts b/packages/util/src/types.ts
index 285489f3..51d22ec7 100644
--- a/packages/util/src/types.ts
+++ b/packages/util/src/types.ts
@@ -2,7 +2,7 @@
 // Copyright 2021 Vulcanize, Inc.
 //
 
-import { DeepPartial, FindConditions, FindManyOptions, QueryRunner } from 'typeorm';
+import { Connection, DeepPartial, FindConditions, FindManyOptions, QueryRunner } from 'typeorm';
 
 export interface BlockProgressInterface {
   id: number;
@@ -52,15 +52,19 @@ export interface IndexerInterface {
   getSyncStatus (): Promise<SyncStatusInterface | undefined>;
   getBlocks (blockFilter: { blockHash?: string, blockNumber?: number }): Promise<any>
   getBlocksAtHeight (height: number, isPruned: boolean): Promise<BlockProgressInterface[]>;
-  getBlockEvents (blockHash: string): Promise<Array<EventInterface>>
+  getBlockEvents (blockHash: string, options: FindManyOptions<EventInterface>): Promise<Array<EventInterface>>
   getAncestorAtDepth (blockHash: string, depth: number): Promise<string>
   getOrFetchBlockEvents (block: DeepPartial<BlockProgressInterface>): Promise<Array<EventInterface>>
   removeUnknownEvents (block: BlockProgressInterface): Promise<void>
-  updateBlockProgress (block: BlockProgressInterface, lastProcessedEventIndex: number): Promise<void>
+  updateBlockProgress (block: BlockProgressInterface, lastProcessedEventIndex: number): Promise<BlockProgressInterface>
   updateSyncStatusChainHead (blockHash: string, blockNumber: number): Promise<SyncStatusInterface>
   updateSyncStatusIndexedBlock (blockHash: string, blockNumber: number, force?: boolean): Promise<SyncStatusInterface>
   updateSyncStatusCanonicalBlock (blockHash: string, blockNumber: number, force?: boolean): Promise<SyncStatusInterface>
   markBlocksAsPruned (blocks: BlockProgressInterface[]): Promise<void>;
+  saveEventEntity (dbEvent: EventInterface): Promise<EventInterface>;
+  processEvent (event: EventInterface): Promise<void>;
+  parseEventNameAndArgs?: (kind: string, logObj: any) => any;
+  isWatchedContract?: (address: string) => Promise<ContractInterface | undefined>;
   cacheContract?: (contract: ContractInterface) => void;
 }
 
@@ -71,17 +75,18 @@ export interface EventWatcherInterface {
 }
 
 export interface DatabaseInterface {
+  _conn: Connection;
   createTransactionRunner(): Promise<QueryRunner>;
   getBlocksAtHeight (height: number, isPruned: boolean): Promise<BlockProgressInterface[]>;
   getBlockProgress (blockHash: string): Promise<BlockProgressInterface | undefined>;
-  getBlockEvents (blockHash: string, where?: FindConditions<EventInterface>): Promise<Array<EventInterface>>;
+  getBlockEvents (blockHash: string, where?: FindManyOptions<EventInterface>): Promise<Array<EventInterface>>;
   getEvent (id: string): Promise<EventInterface | undefined>
   getSyncStatus (queryRunner: QueryRunner): Promise<SyncStatusInterface | undefined>
   getAncestorAtDepth (blockHash: string, depth: number): Promise<string>
   getProcessedBlockCountForRange (fromBlockNumber: number, toBlockNumber: number): Promise<{ expected: number, actual: number }>;
   getEventsInRange (fromBlockNumber: number, toBlockNumber: number): Promise<Array<EventInterface>>;
   markBlocksAsPruned (queryRunner: QueryRunner, blocks: BlockProgressInterface[]): Promise<void>;
-  updateBlockProgress (queryRunner: QueryRunner, block: BlockProgressInterface, lastProcessedEventIndex: number): Promise<void>
+  updateBlockProgress (queryRunner: QueryRunner, block: BlockProgressInterface, lastProcessedEventIndex: number): Promise<BlockProgressInterface>
   updateSyncStatusIndexedBlock (queryRunner: QueryRunner, blockHash: string, blockNumber: number, force?: boolean): Promise<SyncStatusInterface>;
   updateSyncStatusChainHead (queryRunner: QueryRunner, blockHash: string, blockNumber: number): Promise<SyncStatusInterface>;
   updateSyncStatusCanonicalBlock (queryRunner: QueryRunner, blockHash: string, blockNumber: number, force?: boolean): Promise<SyncStatusInterface>;
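Finally, on the updateBlockProgress rework in packages/util/src/database.ts: switching from repo.update to an UpdateQueryBuilder with returning('*') lets the method hand the persisted row back to callers (hence the Promise<BlockProgressInterface> signatures above) without a second read. A generic sketch of that TypeORM pattern on a throwaway entity; returning('*') is Postgres-specific and the entity here is purely illustrative:

```ts
import { Column, Entity, PrimaryGeneratedColumn, Repository } from 'typeorm';

@Entity()
class Counter {
  @PrimaryGeneratedColumn()
  id!: number;

  @Column('int')
  value!: number;
}

// Update a row and get the updated entity back from the same round trip:
// generatedMaps carries the columns returned by RETURNING *.
async function incrementAndReturn (repo: Repository<Counter>, counter: Counter): Promise<Counter> {
  const { generatedMaps } = await repo.createQueryBuilder()
    .update(Counter)
    .set({ value: counter.value + 1 })
    .where('id = :id', { id: counter.id })
    .returning('*')
    .execute();

  return generatedMaps[0] as Counter;
}
```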