Publish events for GQL subscription in historical processing (#492)

* Add realtime block complete event

* Use realtime block complete event for realtime processing

* Refactor realtimeBlockComplete event interface
This commit is contained in:
Nabarun Gogoi 2023-11-27 14:13:02 +05:30 committed by GitHub
parent 220f3ddf24
commit dab2d6d3e7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 75 additions and 56 deletions

View File

@ -16,17 +16,23 @@ import { HistoricalJobData, HistoricalJobResponseData } from './job-runner';
import { JobQueueConfig, ServerConfig } from './config'; import { JobQueueConfig, ServerConfig } from './config';
const EVENT = 'event'; const EVENT = 'event';
const BLOCK_PROGRESS_EVENT = 'block-progress-event';
const REALTIME_BLOCK_COMPLETE_EVENT = 'realtime-block-complete-event';
const DEFAULT_HISTORICAL_MAX_FETCH_AHEAD = 20_000; const DEFAULT_HISTORICAL_MAX_FETCH_AHEAD = 20_000;
const log = debug('vulcanize:events'); const log = debug('vulcanize:events');
export const BlockProgressEvent = 'block-progress-event';
interface Config { interface Config {
server: ServerConfig; server: ServerConfig;
jobQueue: JobQueueConfig; jobQueue: JobQueueConfig;
} }
interface RealtimeBlockCompleteEvent {
blockNumber: number;
isComplete: boolean;
}
export class EventWatcher { export class EventWatcher {
_config: Config; _config: Config;
_ethClient: EthClient; _ethClient: EthClient;
@ -52,7 +58,11 @@ export class EventWatcher {
} }
getBlockProgressEventIterator (): AsyncIterator<any> { getBlockProgressEventIterator (): AsyncIterator<any> {
return this._pubsub.asyncIterator([BlockProgressEvent]); return this._pubsub.asyncIterator([BLOCK_PROGRESS_EVENT]);
}
getRealtimeBlockCompleteEvent (): AsyncIterator<{ onRealtimeBlockCompleteEvent: RealtimeBlockCompleteEvent }> {
return this._pubsub.asyncIterator(REALTIME_BLOCK_COMPLETE_EVENT);
} }
async start (): Promise<void> { async start (): Promise<void> {
@ -163,15 +173,15 @@ export class EventWatcher {
// Creating an AsyncIterable from AsyncIterator to iterate over the values. // Creating an AsyncIterable from AsyncIterator to iterate over the values.
// https://www.codementor.io/@tiagolopesferreira/asynchronous-iterators-in-javascript-jl1yg8la1#for-wait-of // https://www.codementor.io/@tiagolopesferreira/asynchronous-iterators-in-javascript-jl1yg8la1#for-wait-of
const blockProgressEventIterable = { const realtimeBlockCompleteEventIterable = {
// getBlockProgressEventIterator returns an AsyncIterator which can be used to listen to BlockProgress events. // getRealtimeBlockCompleteEvent returns an AsyncIterator which can be used to listen to realtime processing block complete events.
[Symbol.asyncIterator]: this.getBlockProgressEventIterator.bind(this) [Symbol.asyncIterator]: this.getRealtimeBlockCompleteEvent.bind(this)
}; };
// Iterate over async iterable. // Iterate over async iterable.
// https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Statements/for-await...of // https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Statements/for-await...of
for await (const data of blockProgressEventIterable) { for await (const data of realtimeBlockCompleteEventIterable) {
const { onBlockProgressEvent: { blockNumber, isComplete } } = data; const { onRealtimeBlockCompleteEvent: { blockNumber, isComplete } } = data;
if (this._shutDown) { if (this._shutDown) {
log(`Graceful shutdown after processing block ${blockNumber}`); log(`Graceful shutdown after processing block ${blockNumber}`);
@ -260,7 +270,7 @@ export class EventWatcher {
return; return;
} }
const { blockHash, publish }: EventsJobData = data; const { blockHash, publish, isRealtimeProcessing }: EventsJobData = data;
const blockProgress = await this._indexer.getBlockProgress(blockHash); const blockProgress = await this._indexer.getBlockProgress(blockHash);
assert(blockProgress); assert(blockProgress);
@ -270,13 +280,11 @@ export class EventWatcher {
this.startBlockProcessing(); this.startBlockProcessing();
} }
// Check if publish is set to true if (isRealtimeProcessing) {
// Events and blocks are not published in historical processing await this.publishRealtimeBlockCompleteToSubscribers(blockProgress);
// GQL subscription events will not be triggered if publish is set to false }
if (publish) {
await this.publishBlockProgressToSubscribers(blockProgress);
const dbEvents = await this._indexer.getBlockEvents( const dbEventsPromise = this._indexer.getBlockEvents(
blockProgress.blockHash, blockProgress.blockHash,
{ {
eventName: [ eventName: [
@ -289,6 +297,11 @@ export class EventWatcher {
} }
); );
const [dbEvents] = await Promise.all([
dbEventsPromise,
this.publishBlockProgressToSubscribers(blockProgress)
]);
const timeElapsedInSeconds = (Date.now() - Date.parse(createdOn)) / 1000; const timeElapsedInSeconds = (Date.now() - Date.parse(createdOn)) / 1000;
// Cannot publish individual event as they are processed together in a single job. // Cannot publish individual event as they are processed together in a single job.
@ -297,7 +310,7 @@ export class EventWatcher {
for (const dbEvent of dbEvents) { for (const dbEvent of dbEvents) {
log(`Job onComplete event ${dbEvent.id} publish ${publish}`); log(`Job onComplete event ${dbEvent.id} publish ${publish}`);
if (!failed && state === 'completed') { if (!failed && state === 'completed' && publish) {
// Check for max acceptable lag time between request and sending results to live subscribers. // Check for max acceptable lag time between request and sending results to live subscribers.
if (timeElapsedInSeconds <= this._jobQueue.maxCompletionLag) { if (timeElapsedInSeconds <= this._jobQueue.maxCompletionLag) {
await this.publishEventToSubscribers(dbEvent, timeElapsedInSeconds); await this.publishEventToSubscribers(dbEvent, timeElapsedInSeconds);
@ -307,13 +320,24 @@ export class EventWatcher {
} }
} }
} }
async publishRealtimeBlockCompleteToSubscribers (blockProgress: BlockProgressInterface): Promise<void> {
const { blockNumber, isComplete } = blockProgress;
// Publishing the event here will result in pushing the payload to realtime processing subscriber
await this._pubsub.publish(REALTIME_BLOCK_COMPLETE_EVENT, {
onRealtimeBlockCompleteEvent: {
blockNumber,
isComplete
}
});
} }
async publishBlockProgressToSubscribers (blockProgress: BlockProgressInterface): Promise<void> { async publishBlockProgressToSubscribers (blockProgress: BlockProgressInterface): Promise<void> {
const { cid, blockHash, blockNumber, numEvents, numProcessedEvents, isComplete } = blockProgress; const { cid, blockHash, blockNumber, numEvents, numProcessedEvents, isComplete } = blockProgress;
// Publishing the event here will result in pushing the payload to GQL subscribers for `onAddressEvent(address)`. // Publishing the event here will result in pushing the payload to GQL subscribers for `onAddressEvent(address)`.
await this._pubsub.publish(BlockProgressEvent, { await this._pubsub.publish(BLOCK_PROGRESS_EVENT, {
onBlockProgressEvent: { onBlockProgressEvent: {
cid, cid,
blockHash, blockHash,

View File

@ -63,17 +63,17 @@ export const fillBlocks = async (
// Creating an AsyncIterable from AsyncIterator to iterate over the values. // Creating an AsyncIterable from AsyncIterator to iterate over the values.
// https://www.codementor.io/@tiagolopesferreira/asynchronous-iterators-in-javascript-jl1yg8la1#for-wait-of // https://www.codementor.io/@tiagolopesferreira/asynchronous-iterators-in-javascript-jl1yg8la1#for-wait-of
const blockProgressEventIterable = { const realtimeBlockCompleteEventIterable = {
// getBlockProgressEventIterator returns an AsyncIterator which can be used to listen to BlockProgress events. // getRealtimeBlockCompleteEvent returns an AsyncIterator which can be used to listen to realtime processing block complete events.
[Symbol.asyncIterator]: eventWatcher.getBlockProgressEventIterator.bind(eventWatcher) [Symbol.asyncIterator]: eventWatcher.getRealtimeBlockCompleteEvent.bind(eventWatcher)
}; };
console.time('time:fill#fillBlocks-process_blocks'); console.time('time:fill#fillBlocks-process_blocks');
// Iterate over async iterable. // Iterate over async iterable.
// https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Statements/for-await...of // https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Statements/for-await...of
for await (const data of blockProgressEventIterable) { for await (const data of realtimeBlockCompleteEventIterable) {
const { onBlockProgressEvent: { blockNumber, isComplete } } = data; const { onRealtimeBlockCompleteEvent: { blockNumber, isComplete } } = data;
if (isComplete) { if (isComplete) {
const blocksProcessed = blockNumber - startBlock + 1; const blocksProcessed = blockNumber - startBlock + 1;

View File

@ -225,7 +225,7 @@ export class JobRunner {
} }
// Push event processing job for each block // Push event processing job for each block
await this._pushEventProcessingJobsForBlocks(blocks); await this._pushEventProcessingJobsForBlocks(blocks, false);
} }
// Update sync status canonical, indexed and chain head block to end block // Update sync status canonical, indexed and chain head block to end block
@ -259,16 +259,15 @@ export class JobRunner {
); );
} }
async _pushEventProcessingJobsForBlocks (blocks: BlockProgressInterface[]): Promise<void> { async _pushEventProcessingJobsForBlocks (blocks: BlockProgressInterface[], isRealtimeProcessing: boolean): Promise<void> {
// Push event processing job for each block // Push event processing job for each block
// const pushJobForBlockPromises = blocks.map(async block => { // const pushJobForBlockPromises = blocks.map(async block => {
for (const block of blocks) { for (const block of blocks) {
const eventsProcessingJob: EventsJobData = { const eventsProcessingJob: EventsJobData = {
kind: EventsQueueJobKind.EVENTS, kind: EventsQueueJobKind.EVENTS,
blockHash: block.blockHash, blockHash: block.blockHash,
// Avoid publishing GQL subscription event in historical processing publish: true,
// Publishing when realtime processing is listening to events will cause problems isRealtimeProcessing
publish: false
}; };
await this.jobQueue.pushJob(QUEUE_EVENT_PROCESSING, eventsProcessingJob); await this.jobQueue.pushJob(QUEUE_EVENT_PROCESSING, eventsProcessingJob);
@ -611,12 +610,7 @@ export class JobRunner {
// Push job to event processing queue. // Push job to event processing queue.
// Block with all events processed or no events will not be processed again due to check in _processEvents. // Block with all events processed or no events will not be processed again due to check in _processEvents.
const eventsProcessingJob: EventsJobData = { await this._pushEventProcessingJobsForBlocks([blockProgress], true);
kind: EventsQueueJobKind.EVENTS,
blockHash: blockProgress.blockHash,
publish: true
};
await this.jobQueue.pushJob(QUEUE_EVENT_PROCESSING, eventsProcessingJob);
const indexBlockDuration = new Date().getTime() - indexBlockStartTime.getTime(); const indexBlockDuration = new Date().getTime() - indexBlockStartTime.getTime();
log(`time:job-runner#_indexBlock: ${indexBlockDuration}ms`); log(`time:job-runner#_indexBlock: ${indexBlockDuration}ms`);

View File

@ -302,6 +302,7 @@ export interface EventsJobData {
kind: EventsQueueJobKind.EVENTS; kind: EventsQueueJobKind.EVENTS;
blockHash: string; blockHash: string;
publish: boolean; publish: boolean;
isRealtimeProcessing: boolean;
} }
export interface ContractJobData { export interface ContractJobData {