From dab2d6d3e75f6fa6db47ddbd1c452f6e9ba8af14 Mon Sep 17 00:00:00 2001 From: Nabarun Gogoi Date: Mon, 27 Nov 2023 14:13:02 +0530 Subject: [PATCH] Publish events for GQL subscription in historical processing (#492) * Add realtime block complete event * Use realtime block complete event for realtime processing * Refactor realtimeBlockComplete event interface --- packages/util/src/events.ts | 104 ++++++++++++++++++++------------ packages/util/src/fill.ts | 10 +-- packages/util/src/job-runner.ts | 16 ++--- packages/util/src/types.ts | 1 + 4 files changed, 75 insertions(+), 56 deletions(-) diff --git a/packages/util/src/events.ts b/packages/util/src/events.ts index 5916d7f0..8e445593 100644 --- a/packages/util/src/events.ts +++ b/packages/util/src/events.ts @@ -16,17 +16,23 @@ import { HistoricalJobData, HistoricalJobResponseData } from './job-runner'; import { JobQueueConfig, ServerConfig } from './config'; const EVENT = 'event'; +const BLOCK_PROGRESS_EVENT = 'block-progress-event'; +const REALTIME_BLOCK_COMPLETE_EVENT = 'realtime-block-complete-event'; const DEFAULT_HISTORICAL_MAX_FETCH_AHEAD = 20_000; const log = debug('vulcanize:events'); -export const BlockProgressEvent = 'block-progress-event'; - interface Config { server: ServerConfig; jobQueue: JobQueueConfig; } + +interface RealtimeBlockCompleteEvent { + blockNumber: number; + isComplete: boolean; +} + export class EventWatcher { _config: Config; _ethClient: EthClient; @@ -52,7 +58,11 @@ export class EventWatcher { } getBlockProgressEventIterator (): AsyncIterator { - return this._pubsub.asyncIterator([BlockProgressEvent]); + return this._pubsub.asyncIterator([BLOCK_PROGRESS_EVENT]); + } + + getRealtimeBlockCompleteEvent (): AsyncIterator<{ onRealtimeBlockCompleteEvent: RealtimeBlockCompleteEvent }> { + return this._pubsub.asyncIterator(REALTIME_BLOCK_COMPLETE_EVENT); } async start (): Promise { @@ -163,15 +173,15 @@ export class EventWatcher { // Creating an AsyncIterable from AsyncIterator to iterate over the values. // https://www.codementor.io/@tiagolopesferreira/asynchronous-iterators-in-javascript-jl1yg8la1#for-wait-of - const blockProgressEventIterable = { - // getBlockProgressEventIterator returns an AsyncIterator which can be used to listen to BlockProgress events. - [Symbol.asyncIterator]: this.getBlockProgressEventIterator.bind(this) + const realtimeBlockCompleteEventIterable = { + // getRealtimeBlockCompleteEvent returns an AsyncIterator which can be used to listen to realtime processing block complete events. + [Symbol.asyncIterator]: this.getRealtimeBlockCompleteEvent.bind(this) }; // Iterate over async iterable. // https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Statements/for-await...of - for await (const data of blockProgressEventIterable) { - const { onBlockProgressEvent: { blockNumber, isComplete } } = data; + for await (const data of realtimeBlockCompleteEventIterable) { + const { onRealtimeBlockCompleteEvent: { blockNumber, isComplete } } = data; if (this._shutDown) { log(`Graceful shutdown after processing block ${blockNumber}`); @@ -260,7 +270,7 @@ export class EventWatcher { return; } - const { blockHash, publish }: EventsJobData = data; + const { blockHash, publish, isRealtimeProcessing }: EventsJobData = data; const blockProgress = await this._indexer.getBlockProgress(blockHash); assert(blockProgress); @@ -270,50 +280,64 @@ export class EventWatcher { this.startBlockProcessing(); } - // Check if publish is set to true - // Events and blocks are not published in historical processing - // GQL subscription events will not be triggered if publish is set to false - if (publish) { - await this.publishBlockProgressToSubscribers(blockProgress); + if (isRealtimeProcessing) { + await this.publishRealtimeBlockCompleteToSubscribers(blockProgress); + } - const dbEvents = await this._indexer.getBlockEvents( - blockProgress.blockHash, - { - eventName: [ - { value: UNKNOWN_EVENT_NAME, not: true, operator: 'equals' } - ] - }, - { - orderBy: 'index', - orderDirection: OrderDirection.asc - } - ); + const dbEventsPromise = this._indexer.getBlockEvents( + blockProgress.blockHash, + { + eventName: [ + { value: UNKNOWN_EVENT_NAME, not: true, operator: 'equals' } + ] + }, + { + orderBy: 'index', + orderDirection: OrderDirection.asc + } + ); - const timeElapsedInSeconds = (Date.now() - Date.parse(createdOn)) / 1000; + const [dbEvents] = await Promise.all([ + dbEventsPromise, + this.publishBlockProgressToSubscribers(blockProgress) + ]); - // Cannot publish individual event as they are processed together in a single job. - // TODO: Use a different pubsub to publish event from job-runner. - // https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries - for (const dbEvent of dbEvents) { - log(`Job onComplete event ${dbEvent.id} publish ${publish}`); + const timeElapsedInSeconds = (Date.now() - Date.parse(createdOn)) / 1000; - if (!failed && state === 'completed') { - // Check for max acceptable lag time between request and sending results to live subscribers. - if (timeElapsedInSeconds <= this._jobQueue.maxCompletionLag) { - await this.publishEventToSubscribers(dbEvent, timeElapsedInSeconds); - } else { - log(`event ${dbEvent.id} is too old (${timeElapsedInSeconds}s), not broadcasting to live subscribers`); - } + // Cannot publish individual event as they are processed together in a single job. + // TODO: Use a different pubsub to publish event from job-runner. + // https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries + for (const dbEvent of dbEvents) { + log(`Job onComplete event ${dbEvent.id} publish ${publish}`); + + if (!failed && state === 'completed' && publish) { + // Check for max acceptable lag time between request and sending results to live subscribers. + if (timeElapsedInSeconds <= this._jobQueue.maxCompletionLag) { + await this.publishEventToSubscribers(dbEvent, timeElapsedInSeconds); + } else { + log(`event ${dbEvent.id} is too old (${timeElapsedInSeconds}s), not broadcasting to live subscribers`); } } } } + async publishRealtimeBlockCompleteToSubscribers (blockProgress: BlockProgressInterface): Promise { + const { blockNumber, isComplete } = blockProgress; + + // Publishing the event here will result in pushing the payload to realtime processing subscriber + await this._pubsub.publish(REALTIME_BLOCK_COMPLETE_EVENT, { + onRealtimeBlockCompleteEvent: { + blockNumber, + isComplete + } + }); + } + async publishBlockProgressToSubscribers (blockProgress: BlockProgressInterface): Promise { const { cid, blockHash, blockNumber, numEvents, numProcessedEvents, isComplete } = blockProgress; // Publishing the event here will result in pushing the payload to GQL subscribers for `onAddressEvent(address)`. - await this._pubsub.publish(BlockProgressEvent, { + await this._pubsub.publish(BLOCK_PROGRESS_EVENT, { onBlockProgressEvent: { cid, blockHash, diff --git a/packages/util/src/fill.ts b/packages/util/src/fill.ts index 11a1c7d9..197db25b 100644 --- a/packages/util/src/fill.ts +++ b/packages/util/src/fill.ts @@ -63,17 +63,17 @@ export const fillBlocks = async ( // Creating an AsyncIterable from AsyncIterator to iterate over the values. // https://www.codementor.io/@tiagolopesferreira/asynchronous-iterators-in-javascript-jl1yg8la1#for-wait-of - const blockProgressEventIterable = { - // getBlockProgressEventIterator returns an AsyncIterator which can be used to listen to BlockProgress events. - [Symbol.asyncIterator]: eventWatcher.getBlockProgressEventIterator.bind(eventWatcher) + const realtimeBlockCompleteEventIterable = { + // getRealtimeBlockCompleteEvent returns an AsyncIterator which can be used to listen to realtime processing block complete events. + [Symbol.asyncIterator]: eventWatcher.getRealtimeBlockCompleteEvent.bind(eventWatcher) }; console.time('time:fill#fillBlocks-process_blocks'); // Iterate over async iterable. // https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Statements/for-await...of - for await (const data of blockProgressEventIterable) { - const { onBlockProgressEvent: { blockNumber, isComplete } } = data; + for await (const data of realtimeBlockCompleteEventIterable) { + const { onRealtimeBlockCompleteEvent: { blockNumber, isComplete } } = data; if (isComplete) { const blocksProcessed = blockNumber - startBlock + 1; diff --git a/packages/util/src/job-runner.ts b/packages/util/src/job-runner.ts index 93c2fc09..607a6d77 100644 --- a/packages/util/src/job-runner.ts +++ b/packages/util/src/job-runner.ts @@ -225,7 +225,7 @@ export class JobRunner { } // Push event processing job for each block - await this._pushEventProcessingJobsForBlocks(blocks); + await this._pushEventProcessingJobsForBlocks(blocks, false); } // Update sync status canonical, indexed and chain head block to end block @@ -259,16 +259,15 @@ export class JobRunner { ); } - async _pushEventProcessingJobsForBlocks (blocks: BlockProgressInterface[]): Promise { + async _pushEventProcessingJobsForBlocks (blocks: BlockProgressInterface[], isRealtimeProcessing: boolean): Promise { // Push event processing job for each block // const pushJobForBlockPromises = blocks.map(async block => { for (const block of blocks) { const eventsProcessingJob: EventsJobData = { kind: EventsQueueJobKind.EVENTS, blockHash: block.blockHash, - // Avoid publishing GQL subscription event in historical processing - // Publishing when realtime processing is listening to events will cause problems - publish: false + publish: true, + isRealtimeProcessing }; await this.jobQueue.pushJob(QUEUE_EVENT_PROCESSING, eventsProcessingJob); @@ -611,12 +610,7 @@ export class JobRunner { // Push job to event processing queue. // Block with all events processed or no events will not be processed again due to check in _processEvents. - const eventsProcessingJob: EventsJobData = { - kind: EventsQueueJobKind.EVENTS, - blockHash: blockProgress.blockHash, - publish: true - }; - await this.jobQueue.pushJob(QUEUE_EVENT_PROCESSING, eventsProcessingJob); + await this._pushEventProcessingJobsForBlocks([blockProgress], true); const indexBlockDuration = new Date().getTime() - indexBlockStartTime.getTime(); log(`time:job-runner#_indexBlock: ${indexBlockDuration}ms`); diff --git a/packages/util/src/types.ts b/packages/util/src/types.ts index f1ed7d7a..277406b8 100644 --- a/packages/util/src/types.ts +++ b/packages/util/src/types.ts @@ -302,6 +302,7 @@ export interface EventsJobData { kind: EventsQueueJobKind.EVENTS; blockHash: string; publish: boolean; + isRealtimeProcessing: boolean; } export interface ContractJobData {