mirror of
https://github.com/cerc-io/watcher-ts
synced 2025-07-31 04:02:06 +00:00
Refactor event-watcher and move code to util (#242)
This commit is contained in:
parent
f3c65cbd64
commit
662c79a5e7
@ -3,7 +3,6 @@
|
|||||||
//
|
//
|
||||||
|
|
||||||
import assert from 'assert';
|
import assert from 'assert';
|
||||||
import debug from 'debug';
|
|
||||||
import { PubSub } from 'graphql-subscriptions';
|
import { PubSub } from 'graphql-subscriptions';
|
||||||
|
|
||||||
import { EthClient } from '@cerc-io/ipld-eth-client';
|
import { EthClient } from '@cerc-io/ipld-eth-client';
|
||||||
@ -13,16 +12,10 @@ import {
|
|||||||
EventWatcherInterface,
|
EventWatcherInterface,
|
||||||
QUEUE_BLOCK_PROCESSING,
|
QUEUE_BLOCK_PROCESSING,
|
||||||
QUEUE_EVENT_PROCESSING,
|
QUEUE_EVENT_PROCESSING,
|
||||||
UNKNOWN_EVENT_NAME,
|
|
||||||
UpstreamConfig
|
UpstreamConfig
|
||||||
} from '@cerc-io/util';
|
} from '@cerc-io/util';
|
||||||
|
|
||||||
import { Indexer } from './indexer';
|
import { Indexer } from './indexer';
|
||||||
import { Event } from './entity/Event';
|
|
||||||
|
|
||||||
const EVENT = 'event';
|
|
||||||
|
|
||||||
const log = debug('vulcanize:events');
|
|
||||||
|
|
||||||
export class EventWatcher implements EventWatcherInterface {
|
export class EventWatcher implements EventWatcherInterface {
|
||||||
_ethClient: EthClient
|
_ethClient: EthClient
|
||||||
@ -44,7 +37,7 @@ export class EventWatcher implements EventWatcherInterface {
|
|||||||
}
|
}
|
||||||
|
|
||||||
getEventIterator (): AsyncIterator<any> {
|
getEventIterator (): AsyncIterator<any> {
|
||||||
return this._pubsub.asyncIterator([EVENT]);
|
return this._baseEventWatcher.getEventIterator();
|
||||||
}
|
}
|
||||||
|
|
||||||
getBlockProgressEventIterator (): AsyncIterator<any> {
|
getBlockProgressEventIterator (): AsyncIterator<any> {
|
||||||
@ -65,57 +58,13 @@ export class EventWatcher implements EventWatcherInterface {
|
|||||||
|
|
||||||
async initBlockProcessingOnCompleteHandler (): Promise<void> {
|
async initBlockProcessingOnCompleteHandler (): Promise<void> {
|
||||||
this._jobQueue.onComplete(QUEUE_BLOCK_PROCESSING, async (job) => {
|
this._jobQueue.onComplete(QUEUE_BLOCK_PROCESSING, async (job) => {
|
||||||
const { id, data: { failed } } = job;
|
|
||||||
|
|
||||||
if (failed) {
|
|
||||||
log(`Job ${id} for queue ${QUEUE_BLOCK_PROCESSING} failed`);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
await this._baseEventWatcher.blockProcessingCompleteHandler(job);
|
await this._baseEventWatcher.blockProcessingCompleteHandler(job);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
async initEventProcessingOnCompleteHandler (): Promise<void> {
|
async initEventProcessingOnCompleteHandler (): Promise<void> {
|
||||||
await this._jobQueue.onComplete(QUEUE_EVENT_PROCESSING, async (job) => {
|
await this._jobQueue.onComplete(QUEUE_EVENT_PROCESSING, async (job) => {
|
||||||
const { id, data: { request, failed, state, createdOn } } = job;
|
await this._baseEventWatcher.eventProcessingCompleteHandler(job);
|
||||||
|
|
||||||
if (failed) {
|
|
||||||
log(`Job ${id} for queue ${QUEUE_EVENT_PROCESSING} failed`);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
const dbEvents = await this._baseEventWatcher.eventProcessingCompleteHandler(job);
|
|
||||||
const timeElapsedInSeconds = (Date.now() - Date.parse(createdOn)) / 1000;
|
|
||||||
|
|
||||||
// Cannot publish individual event as they are processed together in a single job.
|
|
||||||
// TODO: Use a different pubsub to publish event from job-runner.
|
|
||||||
// https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries
|
|
||||||
for (const dbEvent of dbEvents) {
|
|
||||||
log(`Job onComplete event ${dbEvent.id} publish ${!!request.data.publish}`);
|
|
||||||
|
|
||||||
if (!failed && state === 'completed' && request.data.publish) {
|
|
||||||
// Check for max acceptable lag time between request and sending results to live subscribers.
|
|
||||||
if (timeElapsedInSeconds <= this._jobQueue.maxCompletionLag) {
|
|
||||||
await this.publishEventToSubscribers(dbEvent, timeElapsedInSeconds);
|
|
||||||
} else {
|
|
||||||
log(`event ${dbEvent.id} is too old (${timeElapsedInSeconds}s), not broadcasting to live subscribers`);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
async publishEventToSubscribers (dbEvent: Event, timeElapsedInSeconds: number): Promise<void> {
|
|
||||||
if (dbEvent && dbEvent.eventName !== UNKNOWN_EVENT_NAME) {
|
|
||||||
const resultEvent = this._indexer.getResultEvent(dbEvent);
|
|
||||||
|
|
||||||
log(`pushing event to GQL subscribers (${timeElapsedInSeconds}s elapsed): ${resultEvent.event.__typename}`);
|
|
||||||
|
|
||||||
// Publishing the event here will result in pushing the payload to GQL subscribers for `onEvent`.
|
|
||||||
await this._pubsub.publish(EVENT, {
|
|
||||||
onEvent: resultEvent
|
|
||||||
});
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
@ -3,7 +3,6 @@
|
|||||||
//
|
//
|
||||||
|
|
||||||
import assert from 'assert';
|
import assert from 'assert';
|
||||||
import debug from 'debug';
|
|
||||||
import { PubSub } from 'graphql-subscriptions';
|
import { PubSub } from 'graphql-subscriptions';
|
||||||
|
|
||||||
import { EthClient } from '@cerc-io/ipld-eth-client';
|
import { EthClient } from '@cerc-io/ipld-eth-client';
|
||||||
@ -13,16 +12,10 @@ import {
|
|||||||
EventWatcherInterface,
|
EventWatcherInterface,
|
||||||
QUEUE_BLOCK_PROCESSING,
|
QUEUE_BLOCK_PROCESSING,
|
||||||
QUEUE_EVENT_PROCESSING,
|
QUEUE_EVENT_PROCESSING,
|
||||||
UNKNOWN_EVENT_NAME,
|
|
||||||
UpstreamConfig
|
UpstreamConfig
|
||||||
} from '@cerc-io/util';
|
} from '@cerc-io/util';
|
||||||
|
|
||||||
import { Indexer } from './indexer';
|
import { Indexer } from './indexer';
|
||||||
import { Event } from './entity/Event';
|
|
||||||
|
|
||||||
const EVENT = 'event';
|
|
||||||
|
|
||||||
const log = debug('vulcanize:events');
|
|
||||||
|
|
||||||
export class EventWatcher implements EventWatcherInterface {
|
export class EventWatcher implements EventWatcherInterface {
|
||||||
_ethClient: EthClient
|
_ethClient: EthClient
|
||||||
@ -44,7 +37,7 @@ export class EventWatcher implements EventWatcherInterface {
|
|||||||
}
|
}
|
||||||
|
|
||||||
getEventIterator (): AsyncIterator<any> {
|
getEventIterator (): AsyncIterator<any> {
|
||||||
return this._pubsub.asyncIterator([EVENT]);
|
return this._baseEventWatcher.getEventIterator();
|
||||||
}
|
}
|
||||||
|
|
||||||
getBlockProgressEventIterator (): AsyncIterator<any> {
|
getBlockProgressEventIterator (): AsyncIterator<any> {
|
||||||
@ -65,57 +58,13 @@ export class EventWatcher implements EventWatcherInterface {
|
|||||||
|
|
||||||
async initBlockProcessingOnCompleteHandler (): Promise<void> {
|
async initBlockProcessingOnCompleteHandler (): Promise<void> {
|
||||||
this._jobQueue.onComplete(QUEUE_BLOCK_PROCESSING, async (job) => {
|
this._jobQueue.onComplete(QUEUE_BLOCK_PROCESSING, async (job) => {
|
||||||
const { id, data: { failed } } = job;
|
|
||||||
|
|
||||||
if (failed) {
|
|
||||||
log(`Job ${id} for queue ${QUEUE_BLOCK_PROCESSING} failed`);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
await this._baseEventWatcher.blockProcessingCompleteHandler(job);
|
await this._baseEventWatcher.blockProcessingCompleteHandler(job);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
async initEventProcessingOnCompleteHandler (): Promise<void> {
|
async initEventProcessingOnCompleteHandler (): Promise<void> {
|
||||||
await this._jobQueue.onComplete(QUEUE_EVENT_PROCESSING, async (job) => {
|
await this._jobQueue.onComplete(QUEUE_EVENT_PROCESSING, async (job) => {
|
||||||
const { id, data: { request, failed, state, createdOn } } = job;
|
await this._baseEventWatcher.eventProcessingCompleteHandler(job);
|
||||||
|
|
||||||
if (failed) {
|
|
||||||
log(`Job ${id} for queue ${QUEUE_EVENT_PROCESSING} failed`);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
const dbEvents = await this._baseEventWatcher.eventProcessingCompleteHandler(job);
|
|
||||||
const timeElapsedInSeconds = (Date.now() - Date.parse(createdOn)) / 1000;
|
|
||||||
|
|
||||||
// Cannot publish individual event as they are processed together in a single job.
|
|
||||||
// TODO: Use a different pubsub to publish event from job-runner.
|
|
||||||
// https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries
|
|
||||||
for (const dbEvent of dbEvents) {
|
|
||||||
log(`Job onComplete event ${dbEvent.id} publish ${!!request.data.publish}`);
|
|
||||||
|
|
||||||
if (!failed && state === 'completed' && request.data.publish) {
|
|
||||||
// Check for max acceptable lag time between request and sending results to live subscribers.
|
|
||||||
if (timeElapsedInSeconds <= this._jobQueue.maxCompletionLag) {
|
|
||||||
await this.publishEventToSubscribers(dbEvent, timeElapsedInSeconds);
|
|
||||||
} else {
|
|
||||||
log(`event ${dbEvent.id} is too old (${timeElapsedInSeconds}s), not broadcasting to live subscribers`);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
async publishEventToSubscribers (dbEvent: Event, timeElapsedInSeconds: number): Promise<void> {
|
|
||||||
if (dbEvent && dbEvent.eventName !== UNKNOWN_EVENT_NAME) {
|
|
||||||
const resultEvent = this._indexer.getResultEvent(dbEvent);
|
|
||||||
|
|
||||||
log(`pushing event to GQL subscribers (${timeElapsedInSeconds}s elapsed): ${resultEvent.event.__typename}`);
|
|
||||||
|
|
||||||
// Publishing the event here will result in pushing the payload to GQL subscribers for `onEvent`.
|
|
||||||
await this._pubsub.publish(EVENT, {
|
|
||||||
onEvent: resultEvent
|
|
||||||
});
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
@ -3,27 +3,21 @@
|
|||||||
//
|
//
|
||||||
|
|
||||||
import assert from 'assert';
|
import assert from 'assert';
|
||||||
import debug from 'debug';
|
|
||||||
import { PubSub } from 'graphql-subscriptions';
|
import { PubSub } from 'graphql-subscriptions';
|
||||||
|
|
||||||
import { EthClient } from '@cerc-io/ipld-eth-client';
|
import { EthClient } from '@cerc-io/ipld-eth-client';
|
||||||
import {
|
import {
|
||||||
JobQueue,
|
JobQueue,
|
||||||
EventWatcher as BaseEventWatcher,
|
EventWatcher as BaseEventWatcher,
|
||||||
|
EventWatcherInterface,
|
||||||
QUEUE_BLOCK_PROCESSING,
|
QUEUE_BLOCK_PROCESSING,
|
||||||
QUEUE_EVENT_PROCESSING,
|
QUEUE_EVENT_PROCESSING,
|
||||||
UNKNOWN_EVENT_NAME,
|
|
||||||
UpstreamConfig
|
UpstreamConfig
|
||||||
} from '@cerc-io/util';
|
} from '@cerc-io/util';
|
||||||
|
|
||||||
import { Indexer } from './indexer';
|
import { Indexer } from './indexer';
|
||||||
import { Event } from './entity/Event';
|
|
||||||
|
|
||||||
const EVENT = 'event';
|
export class EventWatcher implements EventWatcherInterface {
|
||||||
|
|
||||||
const log = debug('vulcanize:events');
|
|
||||||
|
|
||||||
export class EventWatcher {
|
|
||||||
_ethClient: EthClient
|
_ethClient: EthClient
|
||||||
_indexer: Indexer
|
_indexer: Indexer
|
||||||
_subscription: ZenObservable.Subscription | undefined
|
_subscription: ZenObservable.Subscription | undefined
|
||||||
@ -43,7 +37,7 @@ export class EventWatcher {
|
|||||||
}
|
}
|
||||||
|
|
||||||
getEventIterator (): AsyncIterator<any> {
|
getEventIterator (): AsyncIterator<any> {
|
||||||
return this._pubsub.asyncIterator([EVENT]);
|
return this._baseEventWatcher.getEventIterator();
|
||||||
}
|
}
|
||||||
|
|
||||||
getBlockProgressEventIterator (): AsyncIterator<any> {
|
getBlockProgressEventIterator (): AsyncIterator<any> {
|
||||||
@ -64,62 +58,13 @@ export class EventWatcher {
|
|||||||
|
|
||||||
async initBlockProcessingOnCompleteHandler (): Promise<void> {
|
async initBlockProcessingOnCompleteHandler (): Promise<void> {
|
||||||
this._jobQueue.onComplete(QUEUE_BLOCK_PROCESSING, async (job) => {
|
this._jobQueue.onComplete(QUEUE_BLOCK_PROCESSING, async (job) => {
|
||||||
const { id, data: { failed } } = job;
|
|
||||||
|
|
||||||
if (failed) {
|
|
||||||
log(`Job ${id} for queue ${QUEUE_BLOCK_PROCESSING} failed`);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
await this._baseEventWatcher.blockProcessingCompleteHandler(job);
|
await this._baseEventWatcher.blockProcessingCompleteHandler(job);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
async initEventProcessingOnCompleteHandler (): Promise<void> {
|
async initEventProcessingOnCompleteHandler (): Promise<void> {
|
||||||
await this._jobQueue.onComplete(QUEUE_EVENT_PROCESSING, async (job) => {
|
await this._jobQueue.onComplete(QUEUE_EVENT_PROCESSING, async (job) => {
|
||||||
const { id, data: { request, failed, state, createdOn } } = job;
|
await this._baseEventWatcher.eventProcessingCompleteHandler(job);
|
||||||
|
|
||||||
if (failed) {
|
|
||||||
log(`Job ${id} for queue ${QUEUE_EVENT_PROCESSING} failed`);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
const dbEvents = await this._baseEventWatcher.eventProcessingCompleteHandler(job);
|
|
||||||
const timeElapsedInSeconds = (Date.now() - Date.parse(createdOn)) / 1000;
|
|
||||||
|
|
||||||
// Cannot publish individual event as they are processed together in a single job.
|
|
||||||
// TODO: Use a different pubsub to publish event from job-runner.
|
|
||||||
// https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries
|
|
||||||
for (const dbEvent of dbEvents) {
|
|
||||||
log(`Job onComplete event ${dbEvent.id} publish ${!!request.data.publish}`);
|
|
||||||
|
|
||||||
if (!failed && state === 'completed' && request.data.publish) {
|
|
||||||
// Check for max acceptable lag time between request and sending results to live subscribers.
|
|
||||||
if (timeElapsedInSeconds <= this._jobQueue.maxCompletionLag) {
|
|
||||||
await this.publishEventToSubscribers(dbEvent, timeElapsedInSeconds);
|
|
||||||
} else {
|
|
||||||
log(`event ${dbEvent.id} is too old (${timeElapsedInSeconds}s), not broadcasting to live subscribers`);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
async publishEventToSubscribers (dbEvent: Event, timeElapsedInSeconds: number): Promise<void> {
|
|
||||||
if (dbEvent && dbEvent.eventName !== UNKNOWN_EVENT_NAME) {
|
|
||||||
const { block: { blockHash }, contract: token } = dbEvent;
|
|
||||||
const resultEvent = this._indexer.getResultEvent(dbEvent);
|
|
||||||
|
|
||||||
log(`pushing event to GQL subscribers (${timeElapsedInSeconds}s elapsed): ${resultEvent.event.__typename}`);
|
|
||||||
|
|
||||||
// Publishing the event here will result in pushing the payload to GQL subscribers for `onEvent`.
|
|
||||||
await this._pubsub.publish(EVENT, {
|
|
||||||
onTokenEvent: {
|
|
||||||
blockHash,
|
|
||||||
token,
|
|
||||||
event: resultEvent
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
@ -3,7 +3,6 @@
|
|||||||
//
|
//
|
||||||
|
|
||||||
import assert from 'assert';
|
import assert from 'assert';
|
||||||
import debug from 'debug';
|
|
||||||
import { PubSub } from 'graphql-subscriptions';
|
import { PubSub } from 'graphql-subscriptions';
|
||||||
|
|
||||||
import { EthClient } from '@cerc-io/ipld-eth-client';
|
import { EthClient } from '@cerc-io/ipld-eth-client';
|
||||||
@ -13,16 +12,10 @@ import {
|
|||||||
EventWatcherInterface,
|
EventWatcherInterface,
|
||||||
QUEUE_BLOCK_PROCESSING,
|
QUEUE_BLOCK_PROCESSING,
|
||||||
QUEUE_EVENT_PROCESSING,
|
QUEUE_EVENT_PROCESSING,
|
||||||
UNKNOWN_EVENT_NAME,
|
|
||||||
UpstreamConfig
|
UpstreamConfig
|
||||||
} from '@cerc-io/util';
|
} from '@cerc-io/util';
|
||||||
|
|
||||||
import { Indexer } from './indexer';
|
import { Indexer } from './indexer';
|
||||||
import { Event } from './entity/Event';
|
|
||||||
|
|
||||||
const EVENT = 'event';
|
|
||||||
|
|
||||||
const log = debug('vulcanize:events');
|
|
||||||
|
|
||||||
export class EventWatcher implements EventWatcherInterface {
|
export class EventWatcher implements EventWatcherInterface {
|
||||||
_ethClient: EthClient
|
_ethClient: EthClient
|
||||||
@ -44,7 +37,7 @@ export class EventWatcher implements EventWatcherInterface {
|
|||||||
}
|
}
|
||||||
|
|
||||||
getEventIterator (): AsyncIterator<any> {
|
getEventIterator (): AsyncIterator<any> {
|
||||||
return this._pubsub.asyncIterator([EVENT]);
|
return this._baseEventWatcher.getEventIterator();
|
||||||
}
|
}
|
||||||
|
|
||||||
getBlockProgressEventIterator (): AsyncIterator<any> {
|
getBlockProgressEventIterator (): AsyncIterator<any> {
|
||||||
@ -65,57 +58,13 @@ export class EventWatcher implements EventWatcherInterface {
|
|||||||
|
|
||||||
async initBlockProcessingOnCompleteHandler (): Promise<void> {
|
async initBlockProcessingOnCompleteHandler (): Promise<void> {
|
||||||
this._jobQueue.onComplete(QUEUE_BLOCK_PROCESSING, async (job) => {
|
this._jobQueue.onComplete(QUEUE_BLOCK_PROCESSING, async (job) => {
|
||||||
const { id, data: { failed } } = job;
|
|
||||||
|
|
||||||
if (failed) {
|
|
||||||
log(`Job ${id} for queue ${QUEUE_BLOCK_PROCESSING} failed`);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
await this._baseEventWatcher.blockProcessingCompleteHandler(job);
|
await this._baseEventWatcher.blockProcessingCompleteHandler(job);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
async initEventProcessingOnCompleteHandler (): Promise<void> {
|
async initEventProcessingOnCompleteHandler (): Promise<void> {
|
||||||
await this._jobQueue.onComplete(QUEUE_EVENT_PROCESSING, async (job) => {
|
await this._jobQueue.onComplete(QUEUE_EVENT_PROCESSING, async (job) => {
|
||||||
const { id, data: { request, failed, state, createdOn } } = job;
|
await this._baseEventWatcher.eventProcessingCompleteHandler(job);
|
||||||
|
|
||||||
if (failed) {
|
|
||||||
log(`Job ${id} for queue ${QUEUE_EVENT_PROCESSING} failed`);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
const dbEvents = await this._baseEventWatcher.eventProcessingCompleteHandler(job);
|
|
||||||
const timeElapsedInSeconds = (Date.now() - Date.parse(createdOn)) / 1000;
|
|
||||||
|
|
||||||
// Cannot publish individual event as they are processed together in a single job.
|
|
||||||
// TODO: Use a different pubsub to publish event from job-runner.
|
|
||||||
// https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries
|
|
||||||
for (const dbEvent of dbEvents) {
|
|
||||||
log(`Job onComplete event ${dbEvent.id} publish ${!!request.data.publish}`);
|
|
||||||
|
|
||||||
if (!failed && state === 'completed' && request.data.publish) {
|
|
||||||
// Check for max acceptable lag time between request and sending results to live subscribers.
|
|
||||||
if (timeElapsedInSeconds <= this._jobQueue.maxCompletionLag) {
|
|
||||||
await this.publishEventToSubscribers(dbEvent, timeElapsedInSeconds);
|
|
||||||
} else {
|
|
||||||
log(`event ${dbEvent.id} is too old (${timeElapsedInSeconds}s), not broadcasting to live subscribers`);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
async publishEventToSubscribers (dbEvent: Event, timeElapsedInSeconds: number): Promise<void> {
|
|
||||||
if (dbEvent && dbEvent.eventName !== UNKNOWN_EVENT_NAME) {
|
|
||||||
const resultEvent = this._indexer.getResultEvent(dbEvent);
|
|
||||||
|
|
||||||
log(`pushing event to GQL subscribers (${timeElapsedInSeconds}s elapsed): ${resultEvent.event.__typename}`);
|
|
||||||
|
|
||||||
// Publishing the event here will result in pushing the payload to GQL subscribers for `onEvent`.
|
|
||||||
await this._pubsub.publish(EVENT, {
|
|
||||||
onEvent: resultEvent
|
|
||||||
});
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
@ -3,7 +3,6 @@
|
|||||||
//
|
//
|
||||||
|
|
||||||
import assert from 'assert';
|
import assert from 'assert';
|
||||||
import debug from 'debug';
|
|
||||||
import { PubSub } from 'graphql-subscriptions';
|
import { PubSub } from 'graphql-subscriptions';
|
||||||
|
|
||||||
import { EthClient } from '@cerc-io/ipld-eth-client';
|
import { EthClient } from '@cerc-io/ipld-eth-client';
|
||||||
@ -13,16 +12,10 @@ import {
|
|||||||
EventWatcherInterface,
|
EventWatcherInterface,
|
||||||
QUEUE_BLOCK_PROCESSING,
|
QUEUE_BLOCK_PROCESSING,
|
||||||
QUEUE_EVENT_PROCESSING,
|
QUEUE_EVENT_PROCESSING,
|
||||||
UNKNOWN_EVENT_NAME,
|
|
||||||
UpstreamConfig
|
UpstreamConfig
|
||||||
} from '@cerc-io/util';
|
} from '@cerc-io/util';
|
||||||
|
|
||||||
import { Indexer } from './indexer';
|
import { Indexer } from './indexer';
|
||||||
import { Event } from './entity/Event';
|
|
||||||
|
|
||||||
const EVENT = 'event';
|
|
||||||
|
|
||||||
const log = debug('vulcanize:events');
|
|
||||||
|
|
||||||
export class EventWatcher implements EventWatcherInterface {
|
export class EventWatcher implements EventWatcherInterface {
|
||||||
_ethClient: EthClient
|
_ethClient: EthClient
|
||||||
@ -44,7 +37,7 @@ export class EventWatcher implements EventWatcherInterface {
|
|||||||
}
|
}
|
||||||
|
|
||||||
getEventIterator (): AsyncIterator<any> {
|
getEventIterator (): AsyncIterator<any> {
|
||||||
return this._pubsub.asyncIterator([EVENT]);
|
return this._baseEventWatcher.getEventIterator();
|
||||||
}
|
}
|
||||||
|
|
||||||
getBlockProgressEventIterator (): AsyncIterator<any> {
|
getBlockProgressEventIterator (): AsyncIterator<any> {
|
||||||
@ -65,57 +58,13 @@ export class EventWatcher implements EventWatcherInterface {
|
|||||||
|
|
||||||
async initBlockProcessingOnCompleteHandler (): Promise<void> {
|
async initBlockProcessingOnCompleteHandler (): Promise<void> {
|
||||||
this._jobQueue.onComplete(QUEUE_BLOCK_PROCESSING, async (job) => {
|
this._jobQueue.onComplete(QUEUE_BLOCK_PROCESSING, async (job) => {
|
||||||
const { id, data: { failed } } = job;
|
|
||||||
|
|
||||||
if (failed) {
|
|
||||||
log(`Job ${id} for queue ${QUEUE_BLOCK_PROCESSING} failed`);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
await this._baseEventWatcher.blockProcessingCompleteHandler(job);
|
await this._baseEventWatcher.blockProcessingCompleteHandler(job);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
async initEventProcessingOnCompleteHandler (): Promise<void> {
|
async initEventProcessingOnCompleteHandler (): Promise<void> {
|
||||||
await this._jobQueue.onComplete(QUEUE_EVENT_PROCESSING, async (job) => {
|
await this._jobQueue.onComplete(QUEUE_EVENT_PROCESSING, async (job) => {
|
||||||
const { id, data: { request, failed, state, createdOn } } = job;
|
await this._baseEventWatcher.eventProcessingCompleteHandler(job);
|
||||||
|
|
||||||
if (failed) {
|
|
||||||
log(`Job ${id} for queue ${QUEUE_EVENT_PROCESSING} failed`);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
const dbEvents = await this._baseEventWatcher.eventProcessingCompleteHandler(job);
|
|
||||||
const timeElapsedInSeconds = (Date.now() - Date.parse(createdOn)) / 1000;
|
|
||||||
|
|
||||||
// Cannot publish individual event as they are processed together in a single job.
|
|
||||||
// TODO: Use a different pubsub to publish event from job-runner.
|
|
||||||
// https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries
|
|
||||||
for (const dbEvent of dbEvents) {
|
|
||||||
log(`Job onComplete event ${dbEvent.id} publish ${!!request.data.publish}`);
|
|
||||||
|
|
||||||
if (!failed && state === 'completed' && request.data.publish) {
|
|
||||||
// Check for max acceptable lag time between request and sending results to live subscribers.
|
|
||||||
if (timeElapsedInSeconds <= this._jobQueue.maxCompletionLag) {
|
|
||||||
await this.publishEventToSubscribers(dbEvent, timeElapsedInSeconds);
|
|
||||||
} else {
|
|
||||||
log(`event ${dbEvent.id} is too old (${timeElapsedInSeconds}s), not broadcasting to live subscribers`);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
async publishEventToSubscribers (dbEvent: Event, timeElapsedInSeconds: number): Promise<void> {
|
|
||||||
if (dbEvent && dbEvent.eventName !== UNKNOWN_EVENT_NAME) {
|
|
||||||
const resultEvent = this._indexer.getResultEvent(dbEvent);
|
|
||||||
|
|
||||||
log(`pushing event to GQL subscribers (${timeElapsedInSeconds}s elapsed): ${resultEvent.event.__typename}`);
|
|
||||||
|
|
||||||
// Publishing the event here will result in pushing the payload to GQL subscribers for `onEvent`.
|
|
||||||
await this._pubsub.publish(EVENT, {
|
|
||||||
onEvent: resultEvent
|
|
||||||
});
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
@ -3,7 +3,6 @@
|
|||||||
//
|
//
|
||||||
|
|
||||||
import assert from 'assert';
|
import assert from 'assert';
|
||||||
import debug from 'debug';
|
|
||||||
import { PubSub } from 'graphql-subscriptions';
|
import { PubSub } from 'graphql-subscriptions';
|
||||||
|
|
||||||
import { EthClient } from '@cerc-io/ipld-eth-client';
|
import { EthClient } from '@cerc-io/ipld-eth-client';
|
||||||
@ -13,16 +12,10 @@ import {
|
|||||||
EventWatcherInterface,
|
EventWatcherInterface,
|
||||||
QUEUE_BLOCK_PROCESSING,
|
QUEUE_BLOCK_PROCESSING,
|
||||||
QUEUE_EVENT_PROCESSING,
|
QUEUE_EVENT_PROCESSING,
|
||||||
UNKNOWN_EVENT_NAME,
|
|
||||||
UpstreamConfig
|
UpstreamConfig
|
||||||
} from '@cerc-io/util';
|
} from '@cerc-io/util';
|
||||||
|
|
||||||
import { Indexer } from './indexer';
|
import { Indexer } from './indexer';
|
||||||
import { Event } from './entity/Event';
|
|
||||||
|
|
||||||
const EVENT = 'event';
|
|
||||||
|
|
||||||
const log = debug('vulcanize:events');
|
|
||||||
|
|
||||||
export class EventWatcher implements EventWatcherInterface {
|
export class EventWatcher implements EventWatcherInterface {
|
||||||
_ethClient: EthClient
|
_ethClient: EthClient
|
||||||
@ -44,7 +37,7 @@ export class EventWatcher implements EventWatcherInterface {
|
|||||||
}
|
}
|
||||||
|
|
||||||
getEventIterator (): AsyncIterator<any> {
|
getEventIterator (): AsyncIterator<any> {
|
||||||
return this._pubsub.asyncIterator([EVENT]);
|
return this._baseEventWatcher.getEventIterator();
|
||||||
}
|
}
|
||||||
|
|
||||||
getBlockProgressEventIterator (): AsyncIterator<any> {
|
getBlockProgressEventIterator (): AsyncIterator<any> {
|
||||||
@ -65,57 +58,13 @@ export class EventWatcher implements EventWatcherInterface {
|
|||||||
|
|
||||||
async initBlockProcessingOnCompleteHandler (): Promise<void> {
|
async initBlockProcessingOnCompleteHandler (): Promise<void> {
|
||||||
this._jobQueue.onComplete(QUEUE_BLOCK_PROCESSING, async (job) => {
|
this._jobQueue.onComplete(QUEUE_BLOCK_PROCESSING, async (job) => {
|
||||||
const { id, data: { failed } } = job;
|
|
||||||
|
|
||||||
if (failed) {
|
|
||||||
log(`Job ${id} for queue ${QUEUE_BLOCK_PROCESSING} failed`);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
await this._baseEventWatcher.blockProcessingCompleteHandler(job);
|
await this._baseEventWatcher.blockProcessingCompleteHandler(job);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
async initEventProcessingOnCompleteHandler (): Promise<void> {
|
async initEventProcessingOnCompleteHandler (): Promise<void> {
|
||||||
await this._jobQueue.onComplete(QUEUE_EVENT_PROCESSING, async (job) => {
|
await this._jobQueue.onComplete(QUEUE_EVENT_PROCESSING, async (job) => {
|
||||||
const { id, data: { request, failed, state, createdOn } } = job;
|
await this._baseEventWatcher.eventProcessingCompleteHandler(job);
|
||||||
|
|
||||||
if (failed) {
|
|
||||||
log(`Job ${id} for queue ${QUEUE_EVENT_PROCESSING} failed`);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
const dbEvents = await this._baseEventWatcher.eventProcessingCompleteHandler(job);
|
|
||||||
const timeElapsedInSeconds = (Date.now() - Date.parse(createdOn)) / 1000;
|
|
||||||
|
|
||||||
// Cannot publish individual event as they are processed together in a single job.
|
|
||||||
// TODO: Use a different pubsub to publish event from job-runner.
|
|
||||||
// https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries
|
|
||||||
for (const dbEvent of dbEvents) {
|
|
||||||
log(`Job onComplete event ${dbEvent.id} publish ${!!request.data.publish}`);
|
|
||||||
|
|
||||||
if (!failed && state === 'completed' && request.data.publish) {
|
|
||||||
// Check for max acceptable lag time between request and sending results to live subscribers.
|
|
||||||
if (timeElapsedInSeconds <= this._jobQueue.maxCompletionLag) {
|
|
||||||
await this.publishEventToSubscribers(dbEvent, timeElapsedInSeconds);
|
|
||||||
} else {
|
|
||||||
log(`event ${dbEvent.id} is too old (${timeElapsedInSeconds}s), not broadcasting to live subscribers`);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
async publishEventToSubscribers (dbEvent: Event, timeElapsedInSeconds: number): Promise<void> {
|
|
||||||
if (dbEvent && dbEvent.eventName !== UNKNOWN_EVENT_NAME) {
|
|
||||||
const resultEvent = this._indexer.getResultEvent(dbEvent);
|
|
||||||
|
|
||||||
log(`pushing event to GQL subscribers (${timeElapsedInSeconds}s elapsed): ${resultEvent.event.__typename}`);
|
|
||||||
|
|
||||||
// Publishing the event here will result in pushing the payload to GQL subscribers for `onEvent`.
|
|
||||||
await this._pubsub.publish(EVENT, {
|
|
||||||
onEvent: resultEvent
|
|
||||||
});
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
@ -10,10 +10,13 @@ import { EthClient } from '@cerc-io/ipld-eth-client';
|
|||||||
|
|
||||||
import { JobQueue } from './job-queue';
|
import { JobQueue } from './job-queue';
|
||||||
import { BlockProgressInterface, EventInterface, IndexerInterface } from './types';
|
import { BlockProgressInterface, EventInterface, IndexerInterface } from './types';
|
||||||
import { MAX_REORG_DEPTH, JOB_KIND_PRUNE, JOB_KIND_INDEX, UNKNOWN_EVENT_NAME, JOB_KIND_EVENTS } from './constants';
|
import { MAX_REORG_DEPTH, JOB_KIND_PRUNE, JOB_KIND_INDEX, UNKNOWN_EVENT_NAME, JOB_KIND_EVENTS, QUEUE_BLOCK_PROCESSING, QUEUE_EVENT_PROCESSING } from './constants';
|
||||||
import { createPruningJob, processBlockByNumberWithCache } from './common';
|
import { createPruningJob, processBlockByNumberWithCache } from './common';
|
||||||
import { UpstreamConfig } from './config';
|
import { UpstreamConfig } from './config';
|
||||||
import { OrderDirection } from './database';
|
import { OrderDirection } from './database';
|
||||||
|
import { getResultEvent } from './misc';
|
||||||
|
|
||||||
|
const EVENT = 'event';
|
||||||
|
|
||||||
const log = debug('vulcanize:events');
|
const log = debug('vulcanize:events');
|
||||||
|
|
||||||
@ -35,6 +38,10 @@ export class EventWatcher {
|
|||||||
this._jobQueue = jobQueue;
|
this._jobQueue = jobQueue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
getEventIterator (): AsyncIterator<any> {
|
||||||
|
return this._pubsub.asyncIterator([EVENT]);
|
||||||
|
}
|
||||||
|
|
||||||
getBlockProgressEventIterator (): AsyncIterator<any> {
|
getBlockProgressEventIterator (): AsyncIterator<any> {
|
||||||
return this._pubsub.asyncIterator([BlockProgressEvent]);
|
return this._pubsub.asyncIterator([BlockProgressEvent]);
|
||||||
}
|
}
|
||||||
@ -80,9 +87,14 @@ export class EventWatcher {
|
|||||||
}
|
}
|
||||||
|
|
||||||
async blockProcessingCompleteHandler (job: any): Promise<void> {
|
async blockProcessingCompleteHandler (job: any): Promise<void> {
|
||||||
const { data: { request: { data } } } = job;
|
const { id, data: { failed, request: { data } } } = job;
|
||||||
const { kind } = data;
|
const { kind } = data;
|
||||||
|
|
||||||
|
if (failed) {
|
||||||
|
log(`Job ${id} for queue ${QUEUE_BLOCK_PROCESSING} failed`);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
switch (kind) {
|
switch (kind) {
|
||||||
case JOB_KIND_INDEX:
|
case JOB_KIND_INDEX:
|
||||||
await this._handleIndexingComplete(data);
|
await this._handleIndexingComplete(data);
|
||||||
@ -97,13 +109,21 @@ export class EventWatcher {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async eventProcessingCompleteHandler (job: any): Promise<EventInterface[]> {
|
async eventProcessingCompleteHandler (job: any): Promise<void> {
|
||||||
const { data: { request: { data: { kind, blockHash } } } } = job;
|
const { id, data: { request, failed, state, createdOn } } = job;
|
||||||
|
|
||||||
|
if (failed) {
|
||||||
|
log(`Job ${id} for queue ${QUEUE_EVENT_PROCESSING} failed`);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
const { data: { kind, blockHash } } = request;
|
||||||
|
|
||||||
// Ignore jobs other than JOB_KIND_EVENTS
|
// Ignore jobs other than JOB_KIND_EVENTS
|
||||||
if (kind !== JOB_KIND_EVENTS) {
|
if (kind !== JOB_KIND_EVENTS) {
|
||||||
return [];
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
assert(blockHash);
|
assert(blockHash);
|
||||||
|
|
||||||
const blockProgress = await this._indexer.getBlockProgress(blockHash);
|
const blockProgress = await this._indexer.getBlockProgress(blockHash);
|
||||||
@ -111,7 +131,7 @@ export class EventWatcher {
|
|||||||
|
|
||||||
await this.publishBlockProgressToSubscribers(blockProgress);
|
await this.publishBlockProgressToSubscribers(blockProgress);
|
||||||
|
|
||||||
return this._indexer.getBlockEvents(
|
const dbEvents = await this._indexer.getBlockEvents(
|
||||||
blockProgress.blockHash,
|
blockProgress.blockHash,
|
||||||
{
|
{
|
||||||
eventName: [
|
eventName: [
|
||||||
@ -123,6 +143,24 @@ export class EventWatcher {
|
|||||||
orderDirection: OrderDirection.asc
|
orderDirection: OrderDirection.asc
|
||||||
}
|
}
|
||||||
);
|
);
|
||||||
|
|
||||||
|
const timeElapsedInSeconds = (Date.now() - Date.parse(createdOn)) / 1000;
|
||||||
|
|
||||||
|
// Cannot publish individual event as they are processed together in a single job.
|
||||||
|
// TODO: Use a different pubsub to publish event from job-runner.
|
||||||
|
// https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries
|
||||||
|
for (const dbEvent of dbEvents) {
|
||||||
|
log(`Job onComplete event ${dbEvent.id} publish ${!!request.data.publish}`);
|
||||||
|
|
||||||
|
if (!failed && state === 'completed' && request.data.publish) {
|
||||||
|
// Check for max acceptable lag time between request and sending results to live subscribers.
|
||||||
|
if (timeElapsedInSeconds <= this._jobQueue.maxCompletionLag) {
|
||||||
|
await this.publishEventToSubscribers(dbEvent, timeElapsedInSeconds);
|
||||||
|
} else {
|
||||||
|
log(`event ${dbEvent.id} is too old (${timeElapsedInSeconds}s), not broadcasting to live subscribers`);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async publishBlockProgressToSubscribers (blockProgress: BlockProgressInterface): Promise<void> {
|
async publishBlockProgressToSubscribers (blockProgress: BlockProgressInterface): Promise<void> {
|
||||||
@ -141,6 +179,19 @@ export class EventWatcher {
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async publishEventToSubscribers (dbEvent: EventInterface, timeElapsedInSeconds: number): Promise<void> {
|
||||||
|
if (dbEvent && dbEvent.eventName !== UNKNOWN_EVENT_NAME) {
|
||||||
|
const resultEvent = getResultEvent(dbEvent);
|
||||||
|
|
||||||
|
log(`pushing event to GQL subscribers (${timeElapsedInSeconds}s elapsed): ${resultEvent.event.__typename}`);
|
||||||
|
|
||||||
|
// Publishing the event here will result in pushing the payload to GQL subscribers for `onEvent`.
|
||||||
|
await this._pubsub.publish(EVENT, {
|
||||||
|
onEvent: resultEvent
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
async _handleIndexingComplete (jobData: any): Promise<void> {
|
async _handleIndexingComplete (jobData: any): Promise<void> {
|
||||||
const { blockNumber, priority } = jobData;
|
const { blockNumber, priority } = jobData;
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user