2021-08-18 10:20:44 +00:00
|
|
|
//
|
|
|
|
// Copyright 2021 Vulcanize, Inc.
|
|
|
|
//
|
|
|
|
|
|
|
|
import assert from 'assert';
|
|
|
|
import debug from 'debug';
|
|
|
|
import { PubSub } from 'apollo-server-express';
|
|
|
|
|
2022-09-09 11:43:01 +00:00
|
|
|
import { EthClient } from '@cerc-io/ipld-eth-client';
|
2021-08-18 10:20:44 +00:00
|
|
|
|
|
|
|
import { JobQueue } from './job-queue';
|
|
|
|
import { BlockProgressInterface, EventInterface, IndexerInterface } from './types';
|
2021-12-10 05:14:10 +00:00
|
|
|
import { MAX_REORG_DEPTH, JOB_KIND_PRUNE, JOB_KIND_INDEX, UNKNOWN_EVENT_NAME } from './constants';
|
2022-10-20 13:16:56 +00:00
|
|
|
import { createPruningJob, processBlockByNumberWithCache } from './common';
|
2021-10-26 12:06:21 +00:00
|
|
|
import { UpstreamConfig } from './config';
|
2021-12-13 10:08:34 +00:00
|
|
|
import { OrderDirection } from './database';
|
2021-08-18 10:20:44 +00:00
|
|
|
|
|
|
|
const log = debug('vulcanize:events');
|
|
|
|
|
|
|
|
export const BlockProgressEvent = 'block-progress-event';
|
|
|
|
|
|
|
|
export class EventWatcher {
|
|
|
|
_ethClient: EthClient
|
|
|
|
_indexer: IndexerInterface
|
|
|
|
_subscription?: ZenObservable.Subscription
|
|
|
|
_pubsub: PubSub
|
|
|
|
_jobQueue: JobQueue
|
2021-10-26 12:06:21 +00:00
|
|
|
_upstreamConfig: UpstreamConfig
|
2021-08-18 10:20:44 +00:00
|
|
|
|
2022-06-08 06:43:52 +00:00
|
|
|
constructor (upstreamConfig: UpstreamConfig, ethClient: EthClient, indexer: IndexerInterface, pubsub: PubSub, jobQueue: JobQueue) {
|
2021-10-26 12:06:21 +00:00
|
|
|
this._upstreamConfig = upstreamConfig;
|
2021-08-18 10:20:44 +00:00
|
|
|
this._ethClient = ethClient;
|
|
|
|
this._indexer = indexer;
|
|
|
|
this._pubsub = pubsub;
|
|
|
|
this._jobQueue = jobQueue;
|
|
|
|
}
|
|
|
|
|
|
|
|
getBlockProgressEventIterator (): AsyncIterator<any> {
|
|
|
|
return this._pubsub.asyncIterator([BlockProgressEvent]);
|
|
|
|
}
|
|
|
|
|
2021-10-26 12:06:21 +00:00
|
|
|
async stop (): Promise<void> {
|
|
|
|
if (this._subscription) {
|
|
|
|
log('Stopped watching upstream blocks');
|
|
|
|
this._subscription.unsubscribe();
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
async startBlockProcessing (): Promise<void> {
|
|
|
|
const syncStatus = await this._indexer.getSyncStatus();
|
2021-11-30 10:52:07 +00:00
|
|
|
let startBlockNumber: number;
|
2021-10-26 12:06:21 +00:00
|
|
|
|
|
|
|
if (!syncStatus) {
|
|
|
|
// Get latest block in chain.
|
|
|
|
const { block: currentBlock } = await this._ethClient.getBlockByHash();
|
2021-12-17 06:27:09 +00:00
|
|
|
startBlockNumber = currentBlock.number;
|
2021-10-26 12:06:21 +00:00
|
|
|
} else {
|
2021-12-17 06:27:09 +00:00
|
|
|
startBlockNumber = syncStatus.chainHeadBlockNumber + 1;
|
2021-10-26 12:06:21 +00:00
|
|
|
}
|
|
|
|
|
2021-11-30 10:52:07 +00:00
|
|
|
// Wait for block processing as blockProgress event might process the same block.
|
2022-10-20 13:16:56 +00:00
|
|
|
await processBlockByNumberWithCache(this._jobQueue, startBlockNumber);
|
2021-08-19 07:57:32 +00:00
|
|
|
|
2021-10-26 12:06:21 +00:00
|
|
|
// Creating an AsyncIterable from AsyncIterator to iterate over the values.
|
|
|
|
// https://www.codementor.io/@tiagolopesferreira/asynchronous-iterators-in-javascript-jl1yg8la1#for-wait-of
|
|
|
|
const blockProgressEventIterable = {
|
|
|
|
// getBlockProgressEventIterator returns an AsyncIterator which can be used to listen to BlockProgress events.
|
|
|
|
[Symbol.asyncIterator]: this.getBlockProgressEventIterator.bind(this)
|
|
|
|
};
|
2021-08-19 07:57:32 +00:00
|
|
|
|
2021-10-26 12:06:21 +00:00
|
|
|
// Iterate over async iterable.
|
|
|
|
// https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Statements/for-await...of
|
|
|
|
for await (const data of blockProgressEventIterable) {
|
|
|
|
const { onBlockProgressEvent: { blockNumber, isComplete } } = data;
|
|
|
|
|
|
|
|
if (isComplete) {
|
2022-10-20 13:16:56 +00:00
|
|
|
await processBlockByNumberWithCache(this._jobQueue, blockNumber + 1);
|
2021-10-26 12:06:21 +00:00
|
|
|
}
|
|
|
|
}
|
2021-08-19 07:57:32 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
async blockProcessingCompleteHandler (job: any): Promise<void> {
|
2021-08-30 05:46:54 +00:00
|
|
|
const { data: { request: { data } } } = job;
|
|
|
|
const { kind } = data;
|
2021-08-19 07:57:32 +00:00
|
|
|
|
2021-08-30 05:46:54 +00:00
|
|
|
switch (kind) {
|
|
|
|
case JOB_KIND_INDEX:
|
2021-11-29 13:07:11 +00:00
|
|
|
await this._handleIndexingComplete(data);
|
2021-08-30 05:46:54 +00:00
|
|
|
break;
|
2021-08-19 07:57:32 +00:00
|
|
|
|
2021-08-30 05:46:54 +00:00
|
|
|
case JOB_KIND_PRUNE:
|
2021-11-29 13:07:11 +00:00
|
|
|
await this._handlePruningComplete(data);
|
2021-08-30 05:46:54 +00:00
|
|
|
break;
|
2021-08-19 07:57:32 +00:00
|
|
|
|
2021-08-30 05:46:54 +00:00
|
|
|
default:
|
|
|
|
throw new Error(`Invalid Job kind ${kind} in complete handler of QUEUE_BLOCK_PROCESSING.`);
|
2021-08-19 07:57:32 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-12-10 05:14:10 +00:00
|
|
|
async eventProcessingCompleteHandler (job: any): Promise<EventInterface[]> {
|
|
|
|
const { data: { request: { data: { blockHash } } } } = job;
|
|
|
|
assert(blockHash);
|
2021-08-19 07:57:32 +00:00
|
|
|
|
2021-12-10 05:14:10 +00:00
|
|
|
const blockProgress = await this._indexer.getBlockProgress(blockHash);
|
|
|
|
assert(blockProgress);
|
2021-08-19 07:57:32 +00:00
|
|
|
|
2021-12-10 05:14:10 +00:00
|
|
|
await this.publishBlockProgressToSubscribers(blockProgress);
|
2021-10-20 12:19:44 +00:00
|
|
|
|
2021-12-10 05:14:10 +00:00
|
|
|
return this._indexer.getBlockEvents(
|
|
|
|
blockProgress.blockHash,
|
|
|
|
{
|
2021-12-13 10:08:34 +00:00
|
|
|
eventName: [
|
|
|
|
{ value: UNKNOWN_EVENT_NAME, not: true, operator: 'equals' }
|
|
|
|
]
|
|
|
|
},
|
|
|
|
{
|
|
|
|
orderBy: 'index',
|
|
|
|
orderDirection: OrderDirection.asc
|
2021-12-10 05:14:10 +00:00
|
|
|
}
|
|
|
|
);
|
2021-08-19 07:57:32 +00:00
|
|
|
}
|
|
|
|
|
2021-08-18 10:20:44 +00:00
|
|
|
async publishBlockProgressToSubscribers (blockProgress: BlockProgressInterface): Promise<void> {
|
2021-10-12 10:32:56 +00:00
|
|
|
const { cid, blockHash, blockNumber, numEvents, numProcessedEvents, isComplete } = blockProgress;
|
2021-08-18 10:20:44 +00:00
|
|
|
|
|
|
|
// Publishing the event here will result in pushing the payload to GQL subscribers for `onAddressEvent(address)`.
|
|
|
|
await this._pubsub.publish(BlockProgressEvent, {
|
|
|
|
onBlockProgressEvent: {
|
2021-10-12 10:32:56 +00:00
|
|
|
cid,
|
2021-08-18 10:20:44 +00:00
|
|
|
blockHash,
|
|
|
|
blockNumber,
|
|
|
|
numEvents,
|
|
|
|
numProcessedEvents,
|
|
|
|
isComplete
|
|
|
|
}
|
|
|
|
});
|
|
|
|
}
|
|
|
|
|
2021-08-30 05:46:54 +00:00
|
|
|
async _handleIndexingComplete (jobData: any): Promise<void> {
|
2022-10-20 13:16:56 +00:00
|
|
|
const { blockNumber, priority } = jobData;
|
2021-08-30 05:46:54 +00:00
|
|
|
|
2022-10-20 13:16:56 +00:00
|
|
|
const blockProgressEntities = await this._indexer.getBlocksAtHeight(Number(blockNumber), false);
|
2021-12-16 11:46:48 +00:00
|
|
|
|
2022-10-20 13:16:56 +00:00
|
|
|
// Log a warning and return if block entries not found.
|
|
|
|
if (blockProgressEntities.length === 0) {
|
|
|
|
log(`block not indexed at height ${blockNumber}`);
|
|
|
|
return;
|
|
|
|
}
|
2021-11-30 10:52:07 +00:00
|
|
|
|
2022-10-20 13:16:56 +00:00
|
|
|
const syncStatus = await this._indexer.updateSyncStatusIndexedBlock(blockProgressEntities[0].blockHash, Number(blockNumber));
|
|
|
|
log(`Job onComplete indexing block ${blockProgressEntities[0].blockHash} ${blockNumber}`);
|
|
|
|
|
|
|
|
// Create pruning job if required.
|
|
|
|
if (syncStatus && syncStatus.latestIndexedBlockNumber > (syncStatus.latestCanonicalBlockNumber + MAX_REORG_DEPTH)) {
|
|
|
|
await createPruningJob(this._jobQueue, syncStatus.latestCanonicalBlockNumber, priority);
|
2021-12-16 11:46:48 +00:00
|
|
|
}
|
2021-08-30 05:46:54 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
async _handlePruningComplete (jobData: any): Promise<void> {
|
|
|
|
const { pruneBlockHeight } = jobData;
|
|
|
|
log(`Job onComplete pruning at height ${pruneBlockHeight}`);
|
|
|
|
}
|
2021-08-18 10:20:44 +00:00
|
|
|
}
|