Process events in singe job and avoid block progress query to improve performance (#306)

* Avoid database query by passing event directly to job-queue

* Avoid block progress query by returning from update query

* Process batch of events for a block in a single job

* Fix smoke test for subscribed events and use teamSize for job queue
This commit is contained in:
nikugogoi 2021-12-10 10:44:10 +05:30 committed by GitHub
parent a2ad139769
commit f56f7a823f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
26 changed files with 281 additions and 237 deletions

View File

@ -84,16 +84,22 @@ export class EventWatcher {
return;
}
const dbEvent = await this._baseEventWatcher.eventProcessingCompleteHandler(job);
const dbEvents = await this._baseEventWatcher.eventProcessingCompleteHandler(job);
const timeElapsedInSeconds = (Date.now() - Date.parse(createdOn)) / 1000;
log(`Job onComplete event ${request.data.id} publish ${!!request.data.publish}`);
if (!failed && state === 'completed' && request.data.publish) {
// Check for max acceptable lag time between request and sending results to live subscribers.
if (timeElapsedInSeconds <= this._jobQueue.maxCompletionLag) {
await this.publishEventToSubscribers(dbEvent, timeElapsedInSeconds);
} else {
log(`event ${request.data.id} is too old (${timeElapsedInSeconds}s), not broadcasting to live subscribers`);
// Cannot publish individual event as they are processed together in a single job.
// TODO: Use a different pubsub to publish event from job-runner.
// https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries
for (const dbEvent of dbEvents) {
log(`Job onComplete event ${dbEvent.id} publish ${!!request.data.publish}`);
if (!failed && state === 'completed' && request.data.publish) {
// Check for max acceptable lag time between request and sending results to live subscribers.
if (timeElapsedInSeconds <= this._jobQueue.maxCompletionLag) {
await this.publishEventToSubscribers(dbEvent, timeElapsedInSeconds);
} else {
log(`event ${dbEvent.id} is too old (${timeElapsedInSeconds}s), not broadcasting to live subscribers`);
}
}
}
});

View File

@ -30,3 +30,4 @@
dbConnectionString = "postgres://postgres:postgres@localhost/erc20-watcher-job-queue"
maxCompletionLagInSecs = 300
jobDelayInMilliSecs = 100
eventsInBatch = 50

View File

@ -101,10 +101,10 @@ export class Database {
return this._baseDatabase.saveEventEntity(repo, entity);
}
async getBlockEvents (blockHash: string, where: FindConditions<Event>): Promise<Event[]> {
async getBlockEvents (blockHash: string, options: FindManyOptions<Event>): Promise<Event[]> {
const repo = this._conn.getRepository(Event);
return this._baseDatabase.getBlockEvents(repo, blockHash, where);
return this._baseDatabase.getBlockEvents(repo, blockHash, options);
}
async saveEvents (queryRunner: QueryRunner, block: DeepPartial<BlockProgress>, events: DeepPartial<Event>[]): Promise<void> {
@ -167,7 +167,7 @@ export class Database {
return this._baseDatabase.getBlockProgress(repo, blockHash);
}
async updateBlockProgress (queryRunner: QueryRunner, block: BlockProgress, lastProcessedEventIndex: number): Promise<void> {
async updateBlockProgress (queryRunner: QueryRunner, block: BlockProgress, lastProcessedEventIndex: number): Promise<BlockProgress> {
const repo = queryRunner.manager.getRepository(BlockProgress);
return this._baseDatabase.updateBlockProgress(repo, block, lastProcessedEventIndex);

View File

@ -84,16 +84,22 @@ export class EventWatcher {
return;
}
const dbEvent = await this._baseEventWatcher.eventProcessingCompleteHandler(job);
const dbEvents = await this._baseEventWatcher.eventProcessingCompleteHandler(job);
const timeElapsedInSeconds = (Date.now() - Date.parse(createdOn)) / 1000;
log(`Job onComplete event ${request.data.id} publish ${!!request.data.publish}`);
if (!failed && state === 'completed' && request.data.publish) {
// Check for max acceptable lag time between request and sending results to live subscribers.
if (timeElapsedInSeconds <= this._jobQueue.maxCompletionLag) {
await this.publishEventToSubscribers(dbEvent, timeElapsedInSeconds);
} else {
log(`event ${request.data.id} is too old (${timeElapsedInSeconds}s), not broadcasting to live subscribers`);
// Cannot publish individual event as they are processed together in a single job.
// TODO: Use a different pubsub to publish event from job-runner.
// https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries
for (const dbEvent of dbEvents) {
log(`Job onComplete event ${dbEvent.id} publish ${!!request.data.publish}`);
if (!failed && state === 'completed' && request.data.publish) {
// Check for max acceptable lag time between request and sending results to live subscribers.
if (timeElapsedInSeconds <= this._jobQueue.maxCompletionLag) {
await this.publishEventToSubscribers(dbEvent, timeElapsedInSeconds);
} else {
log(`event ${dbEvent.id} is too old (${timeElapsedInSeconds}s), not broadcasting to live subscribers`);
}
}
}
});

View File

@ -5,7 +5,7 @@
import assert from 'assert';
import debug from 'debug';
import { JsonFragment } from '@ethersproject/abi';
import { DeepPartial } from 'typeorm';
import { DeepPartial, FindManyOptions } from 'typeorm';
import JSONbig from 'json-bigint';
import { ethers } from 'ethers';
import { BaseProvider } from '@ethersproject/providers';
@ -352,8 +352,8 @@ export class Indexer {
return this._baseIndexer.getOrFetchBlockEvents(block, this._fetchAndSaveEvents.bind(this));
}
async getBlockEvents (blockHash: string): Promise<Array<Event>> {
return this._baseIndexer.getBlockEvents(blockHash);
async getBlockEvents (blockHash: string, options: FindManyOptions<Event>): Promise<Array<Event>> {
return this._baseIndexer.getBlockEvents(blockHash, options);
}
async removeUnknownEvents (block: BlockProgress): Promise<void> {
@ -364,7 +364,7 @@ export class Indexer {
return this._baseIndexer.markBlocksAsPruned(blocks);
}
async updateBlockProgress (block: BlockProgress, lastProcessedEventIndex: number): Promise<void> {
async updateBlockProgress (block: BlockProgress, lastProcessedEventIndex: number): Promise<BlockProgress> {
return this._baseIndexer.updateBlockProgress(block, lastProcessedEventIndex);
}

View File

@ -47,26 +47,12 @@ export class JobRunner {
async subscribeBlockProcessingQueue (): Promise<void> {
await this._jobQueue.subscribe(QUEUE_BLOCK_PROCESSING, async (job) => {
await this._baseJobRunner.processBlock(job);
await this._jobQueue.markComplete(job);
});
}
async subscribeEventProcessingQueue (): Promise<void> {
await this._jobQueue.subscribe(QUEUE_EVENT_PROCESSING, async (job) => {
const event = await this._baseJobRunner.processEvent(job);
if (!event) {
return;
}
const watchedContract = await this._indexer.isWatchedContract(event.contract);
if (watchedContract) {
await this._indexer.processEvent(event);
}
await this._indexer.updateBlockProgress(event.block, event.index);
await this._jobQueue.markComplete(job);
await this._baseJobRunner.processEvent(job);
});
}
}

View File

@ -39,3 +39,4 @@
dbConnectionString = "postgres://postgres:postgres@localhost/uni-info-watcher-job-queue"
maxCompletionLagInSecs = 300
jobDelayInMilliSecs = 1000
eventsInBatch = 50

View File

@ -39,3 +39,4 @@
dbConnectionString = "postgres://postgres:postgres@localhost/uni-info-watcher-job-queue"
maxCompletionLagInSecs = 300
jobDelayInMilliSecs = 1000
eventsInBatch = 50

View File

@ -603,10 +603,10 @@ export class Database implements DatabaseInterface {
return this._baseDatabase.saveEventEntity(repo, entity);
}
async getBlockEvents (blockHash: string, where: FindConditions<Event>): Promise<Event[]> {
async getBlockEvents (blockHash: string, options?: FindManyOptions<Event>): Promise<Event[]> {
const repo = this._conn.getRepository(Event);
return this._baseDatabase.getBlockEvents(repo, blockHash, where);
return this._baseDatabase.getBlockEvents(repo, blockHash, options);
}
async saveEvents (queryRunner: QueryRunner, block: DeepPartial<BlockProgress>, events: DeepPartial<Event>[]): Promise<void> {
@ -663,7 +663,7 @@ export class Database implements DatabaseInterface {
return this._baseDatabase.getBlockProgress(repo, blockHash);
}
async updateBlockProgress (queryRunner: QueryRunner, block: BlockProgress, lastProcessedEventIndex: number): Promise<void> {
async updateBlockProgress (queryRunner: QueryRunner, block: BlockProgress, lastProcessedEventIndex: number): Promise<BlockProgress> {
const repo = queryRunner.manager.getRepository(BlockProgress);
return this._baseDatabase.updateBlockProgress(repo, block, lastProcessedEventIndex);

View File

@ -4,7 +4,7 @@
import assert from 'assert';
import debug from 'debug';
import { DeepPartial, QueryRunner } from 'typeorm';
import { DeepPartial, FindManyOptions, QueryRunner } from 'typeorm';
import JSONbig from 'json-bigint';
import { providers, utils, BigNumber } from 'ethers';
@ -197,6 +197,10 @@ export class Indexer implements IndexerInterface {
};
}
async saveEventEntity (dbEvent: Event): Promise<Event> {
return this._baseIndexer.saveEventEntity(dbEvent);
}
async markBlocksAsPruned (blocks: BlockProgress[]): Promise<void> {
return this._baseIndexer.markBlocksAsPruned(blocks);
}
@ -306,8 +310,8 @@ export class Indexer implements IndexerInterface {
return this._baseIndexer.getOrFetchBlockEvents(block, this._fetchAndSaveEvents.bind(this));
}
async getBlockEvents (blockHash: string): Promise<Array<Event>> {
return this._baseIndexer.getBlockEvents(blockHash);
async getBlockEvents (blockHash: string, options: FindManyOptions<Event>): Promise<Array<Event>> {
return this._baseIndexer.getBlockEvents(blockHash, options);
}
async removeUnknownEvents (block: BlockProgress): Promise<void> {
@ -346,7 +350,7 @@ export class Indexer implements IndexerInterface {
return this._baseIndexer.getBlocksAtHeight(height, isPruned);
}
async updateBlockProgress (block: BlockProgress, lastProcessedEventIndex: number): Promise<void> {
async updateBlockProgress (block: BlockProgress, lastProcessedEventIndex: number): Promise<BlockProgress> {
return this._baseIndexer.updateBlockProgress(block, lastProcessedEventIndex);
}

View File

@ -49,26 +49,12 @@ export class JobRunner {
async subscribeBlockProcessingQueue (): Promise<void> {
await this._jobQueue.subscribe(QUEUE_BLOCK_PROCESSING, async (job) => {
await this._baseJobRunner.processBlock(job);
await this._jobQueue.markComplete(job);
});
}
async subscribeEventProcessingQueue (): Promise<void> {
await this._jobQueue.subscribe(QUEUE_EVENT_PROCESSING, async (job) => {
const event = await this._baseJobRunner.processEvent(job);
if (!event) {
return;
}
// Check if event is processed.
if (!event.block.isComplete && event.index !== event.block.lastProcessedEventIndex) {
await this._indexer.processEvent(event);
}
await this._indexer.updateBlockProgress(event.block, event.index);
await this._jobQueue.markComplete(job);
await this._baseJobRunner.processEvent(job);
});
}
}

View File

@ -768,20 +768,16 @@ describe('uni-info-watcher', () => {
fee
});
eventType = 'MintEvent';
await Promise.all([
transaction,
watchEvent(uniClient, eventType)
[eventValue] = await Promise.all([
// Wait for TransferEvent and get eventValue.
watchEvent(uniClient, 'TransferEvent'),
// Wait for MintEvent.
watchEvent(uniClient, 'MintEvent'),
// Wait for IncreaseLiquidityEvent.
watchEvent(uniClient, 'IncreaseLiquidityEvent'),
transaction
]);
// Wait for TransferEvent.
eventType = 'TransferEvent';
eventValue = await watchEvent(uniClient, eventType);
// Wait for IncreaseLiquidityEvent.
eventType = 'IncreaseLiquidityEvent';
await watchEvent(uniClient, eventType);
// Sleeping for 15 sec for the events to be processed.
await wait(15000);
});
@ -831,7 +827,6 @@ describe('uni-info-watcher', () => {
let oldPosition: any;
let eventValue: any;
let eventType: string;
const tokenId = 1;
const amount0Desired = 15;
@ -856,16 +851,14 @@ describe('uni-info-watcher', () => {
deadline
});
eventType = 'MintEvent';
await Promise.all([
transaction,
watchEvent(uniClient, eventType)
[eventValue] = await Promise.all([
// Wait for IncreaseLiquidityEvent and get eventValue.
watchEvent(uniClient, 'IncreaseLiquidityEvent'),
// Wait for MintEvent.
watchEvent(uniClient, 'MintEvent'),
transaction
]);
// Wait for IncreaseLiquidityEvent.
eventType = 'IncreaseLiquidityEvent';
eventValue = await watchEvent(uniClient, eventType);
// Sleeping for 15 sec for the events to be processed.
await wait(15000);
});
@ -906,7 +899,6 @@ describe('uni-info-watcher', () => {
let oldPosition: any;
let eventValue: any;
let eventType: string;
const tokenId = 1;
const liquidity = 5;
@ -929,16 +921,14 @@ describe('uni-info-watcher', () => {
deadline
});
eventType = 'BurnEvent';
await Promise.all([
transaction,
watchEvent(uniClient, eventType)
[eventValue] = await Promise.all([
// Wait for DecreaseLiquidityEvent and get eventValue.
watchEvent(uniClient, 'DecreaseLiquidityEvent'),
// Wait for BurnEvent
watchEvent(uniClient, 'BurnEvent'),
transaction
]);
// Wait for DecreaseLiquidityEvent.
eventType = 'DecreaseLiquidityEvent';
eventValue = await watchEvent(uniClient, eventType);
// Sleeping for 15 sec for the events to be processed.
await wait(15000);
});
@ -978,8 +968,6 @@ describe('uni-info-watcher', () => {
// Checked entities: Transaction.
// Unchecked entities: Position.
let eventType: string;
const tokenId = 1;
const amount0Max = 15;
const amount1Max = 15;
@ -993,16 +981,14 @@ describe('uni-info-watcher', () => {
amount1Max
});
eventType = 'BurnEvent';
await Promise.all([
transaction,
watchEvent(uniClient, eventType)
// Wait for BurnEvent.
watchEvent(uniClient, 'BurnEvent'),
// Wait for CollectEvent.
watchEvent(uniClient, 'CollectEvent')
]);
// Wait for CollectEvent.
eventType = 'CollectEvent';
await watchEvent(uniClient, eventType);
// Sleeping for 10 sec for the events to be processed.
await wait(10000);
});

View File

@ -28,3 +28,4 @@
dbConnectionString = "postgres://postgres:postgres@localhost/uni-watcher-job-queue"
maxCompletionLagInSecs = 300
jobDelayInMilliSecs = 100
eventsInBatch = 50

View File

@ -28,3 +28,4 @@
dbConnectionString = "postgres://postgres:postgres@localhost/uni-watcher-job-queue"
maxCompletionLagInSecs = 300
jobDelayInMilliSecs = 100
eventsInBatch = 50

View File

@ -78,10 +78,10 @@ export class Database implements DatabaseInterface {
return this._baseDatabase.saveEventEntity(repo, entity);
}
async getBlockEvents (blockHash: string, where: FindConditions<Event>): Promise<Event[]> {
async getBlockEvents (blockHash: string, options: FindManyOptions<Event>): Promise<Event[]> {
const repo = this._conn.getRepository(Event);
return this._baseDatabase.getBlockEvents(repo, blockHash, where);
return this._baseDatabase.getBlockEvents(repo, blockHash, options);
}
async saveEvents (queryRunner: QueryRunner, block: DeepPartial<BlockProgress>, events: DeepPartial<Event>[]): Promise<void> {
@ -138,7 +138,7 @@ export class Database implements DatabaseInterface {
return this._baseDatabase.getBlockProgress(repo, blockHash);
}
async updateBlockProgress (queryRunner: QueryRunner, block: BlockProgress, lastProcessedEventIndex: number): Promise<void> {
async updateBlockProgress (queryRunner: QueryRunner, block: BlockProgress, lastProcessedEventIndex: number): Promise<BlockProgress> {
const repo = queryRunner.manager.getRepository(BlockProgress);
return this._baseDatabase.updateBlockProgress(repo, block, lastProcessedEventIndex);

View File

@ -81,16 +81,22 @@ export class EventWatcher implements EventWatcherInterface {
return;
}
const dbEvent = await this._baseEventWatcher.eventProcessingCompleteHandler(job);
const dbEvents = await this._baseEventWatcher.eventProcessingCompleteHandler(job);
const timeElapsedInSeconds = (Date.now() - Date.parse(createdOn)) / 1000;
log(`Job onComplete event ${request.data.id} publish ${!!request.data.publish}`);
if (!failed && state === 'completed' && request.data.publish) {
// Check for max acceptable lag time between request and sending results to live subscribers.
if (timeElapsedInSeconds <= this._jobQueue.maxCompletionLag) {
await this.publishUniswapEventToSubscribers(dbEvent, timeElapsedInSeconds);
} else {
log(`event ${request.data.id} is too old (${timeElapsedInSeconds}s), not broadcasting to live subscribers`);
// Cannot publish individual event as they are processed together in a single job.
// TODO: Use a different pubsub to publish event from job-runner.
// https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries
for (const dbEvent of dbEvents) {
log(`Job onComplete event ${dbEvent.id} publish ${!!request.data.publish}`);
if (!failed && state === 'completed' && request.data.publish) {
// Check for max acceptable lag time between request and sending results to live subscribers.
if (timeElapsedInSeconds <= this._jobQueue.maxCompletionLag) {
await this.publishUniswapEventToSubscribers(dbEvent, timeElapsedInSeconds);
} else {
log(`event ${dbEvent.id} is too old (${timeElapsedInSeconds}s), not broadcasting to live subscribers`);
}
}
}
});

View File

@ -3,7 +3,7 @@
//
import debug from 'debug';
import { DeepPartial, QueryRunner } from 'typeorm';
import { DeepPartial, FindManyOptions, QueryRunner } from 'typeorm';
import JSONbig from 'json-bigint';
import { ethers } from 'ethers';
import assert from 'assert';
@ -372,8 +372,8 @@ export class Indexer implements IndexerInterface {
return this._baseIndexer.getOrFetchBlockEvents(block, this._fetchAndSaveEvents.bind(this));
}
async getBlockEvents (blockHash: string): Promise<Array<Event>> {
return this._baseIndexer.getBlockEvents(blockHash);
async getBlockEvents (blockHash: string, options: FindManyOptions<Event>): Promise<Array<Event>> {
return this._baseIndexer.getBlockEvents(blockHash, options);
}
async removeUnknownEvents (block: BlockProgress): Promise<void> {
@ -416,7 +416,7 @@ export class Indexer implements IndexerInterface {
return this._baseIndexer.markBlocksAsPruned(blocks);
}
async updateBlockProgress (block: BlockProgress, lastProcessedEventIndex: number): Promise<void> {
async updateBlockProgress (block: BlockProgress, lastProcessedEventIndex: number): Promise<BlockProgress> {
return this._baseIndexer.updateBlockProgress(block, lastProcessedEventIndex);
}
@ -426,19 +426,24 @@ export class Indexer implements IndexerInterface {
async _fetchAndSaveEvents ({ blockHash }: DeepPartial<BlockProgress>): Promise<void> {
assert(blockHash);
let { block, logs } = await this._ethClient.getLogs({ blockHash });
const {
allEthHeaderCids: {
nodes: [
{
ethTransactionCidsByHeaderId: {
nodes: transactions
const logsPromise = this._ethClient.getLogs({ blockHash });
const transactionsPromise = this._postgraphileClient.getBlockWithTransactions({ blockHash });
let [
{ block, logs },
{
allEthHeaderCids: {
nodes: [
{
ethTransactionCidsByHeaderId: {
nodes: transactions
}
}
}
]
]
}
}
} = await this._postgraphileClient.getBlockWithTransactions({ blockHash });
] = await Promise.all([logsPromise, transactionsPromise]);
const transactionMap = transactions.reduce((acc: {[key: string]: any}, transaction: {[key: string]: any}) => {
acc[transaction.txHash] = transaction;

View File

@ -23,7 +23,6 @@ import {
import { Indexer } from './indexer';
import { Database } from './database';
import { UNKNOWN_EVENT_NAME } from './entity/Event';
const log = debug('vulcanize:job-runner');
@ -48,40 +47,12 @@ export class JobRunner {
async subscribeBlockProcessingQueue (): Promise<void> {
await this._jobQueue.subscribe(QUEUE_BLOCK_PROCESSING, async (job) => {
await this._baseJobRunner.processBlock(job);
await this._jobQueue.markComplete(job);
});
}
async subscribeEventProcessingQueue (): Promise<void> {
await this._jobQueue.subscribe(QUEUE_EVENT_PROCESSING, async (job) => {
// TODO: Support two kind of jobs on the event processing queue.
// 1) processEvent => Current single event
// 2) processEvents => Event range (multiple events)
let event = await this._baseJobRunner.processEvent(job);
if (!event) {
return;
}
const watchedContract = await this._indexer.isWatchedContract(event.contract);
if (watchedContract) {
// We might not have parsed this event yet. This can happen if the contract was added
// as a result of a previous event in the same block.
if (event.eventName === UNKNOWN_EVENT_NAME) {
const logObj = JSON.parse(event.extraInfo);
const { eventName, eventInfo } = this._indexer.parseEventNameAndArgs(watchedContract.kind, logObj);
event.eventName = eventName;
event.eventInfo = JSON.stringify(eventInfo);
event = await this._indexer.saveEventEntity(event);
}
await this._indexer.processEvent(event);
}
await this._indexer.updateBlockProgress(event.block, event.index);
await this._jobQueue.markComplete(job);
await this._baseJobRunner.processEvent(job);
});
}
}

View File

@ -21,6 +21,7 @@ export interface JobQueueConfig {
dbConnectionString: string;
maxCompletionLagInSecs: number;
jobDelayInMilliSecs?: number;
eventsInBatch: number;
}
interface ServerConfig {

View File

@ -11,8 +11,8 @@ export const QUEUE_CHAIN_PRUNING = 'chain-pruning';
export const JOB_KIND_INDEX = 'index';
export const JOB_KIND_PRUNE = 'prune';
export const JOB_KIND_EVENTS = 'events';
export const JOB_KIND_CONTRACT = 'contract';
export const JOB_KIND_EVENT = 'event';
export const DEFAULT_CONFIG_PATH = 'environments/local.toml';

View File

@ -153,7 +153,7 @@ export class Database {
.getMany();
}
async updateBlockProgress (repo: Repository<BlockProgressInterface>, block: BlockProgressInterface, lastProcessedEventIndex: number): Promise<void> {
async updateBlockProgress (repo: Repository<BlockProgressInterface>, block: BlockProgressInterface, lastProcessedEventIndex: number): Promise<BlockProgressInterface> {
if (!block.isComplete) {
if (lastProcessedEventIndex <= block.lastProcessedEventIndex) {
throw new Error(`Events processed out of order ${block.blockHash}, was ${block.lastProcessedEventIndex}, got ${lastProcessedEventIndex}`);
@ -165,9 +165,18 @@ export class Database {
block.isComplete = true;
}
const { id, ...blockData } = block;
await repo.update(id, blockData);
const { generatedMaps } = await repo.createQueryBuilder()
.update(block)
.set(block)
.where('id = :id', { id: block.id })
.whereEntity(block)
.returning('*')
.execute();
block = generatedMaps[0] as BlockProgressInterface;
}
return block;
}
async markBlocksAsPruned (repo: Repository<BlockProgressInterface>, blocks: BlockProgressInterface[]): Promise<void> {
@ -180,19 +189,26 @@ export class Database {
return repo.findOne(id, { relations: ['block'] });
}
async getBlockEvents (repo: Repository<EventInterface>, blockHash: string, where: FindConditions<EventInterface> = {}): Promise<EventInterface[]> {
where.block = {
...where.block,
blockHash
async getBlockEvents (repo: Repository<EventInterface>, blockHash: string, options: FindManyOptions<EventInterface> = {}): Promise<EventInterface[]> {
if (!Array.isArray(options.where)) {
options.where = [options.where || {}];
}
options.where.forEach((where: FindConditions<EventInterface> = {}) => {
where.block = {
...where.block,
blockHash
};
});
options.relations = ['block'];
options.order = {
...options.order,
id: 'ASC'
};
return repo.find({
where,
relations: ['block'],
order: {
id: 'ASC'
}
});
return repo.find(options);
}
async saveEvents (blockRepo: Repository<BlockProgressInterface>, eventRepo: Repository<EventInterface>, block: DeepPartial<BlockProgressInterface>, events: DeepPartial<EventInterface>[]): Promise<void> {

View File

@ -5,12 +5,13 @@
import assert from 'assert';
import debug from 'debug';
import { PubSub } from 'apollo-server-express';
import { Not } from 'typeorm';
import { EthClient } from '@vulcanize/ipld-eth-client';
import { JobQueue } from './job-queue';
import { BlockProgressInterface, EventInterface, IndexerInterface } from './types';
import { MAX_REORG_DEPTH, JOB_KIND_PRUNE, JOB_KIND_INDEX } from './constants';
import { MAX_REORG_DEPTH, JOB_KIND_PRUNE, JOB_KIND_INDEX, UNKNOWN_EVENT_NAME } from './constants';
import { createPruningJob, processBlockByNumber } from './common';
import { UpstreamConfig } from './config';
@ -99,23 +100,30 @@ export class EventWatcher {
}
}
async eventProcessingCompleteHandler (job: any): Promise<EventInterface> {
const { data: { request } } = job;
async eventProcessingCompleteHandler (job: any): Promise<EventInterface[]> {
const { data: { request: { data: { blockHash } } } } = job;
assert(blockHash);
const dbEvent = await this._indexer.getEvent(request.data.id);
assert(dbEvent);
const blockProgress = await this._indexer.getBlockProgress(blockHash);
assert(blockProgress);
const blockProgress = await this._indexer.getBlockProgress(dbEvent.block.blockHash);
await this.publishBlockProgressToSubscribers(blockProgress);
if (blockProgress) {
await this.publishBlockProgressToSubscribers(blockProgress);
if (blockProgress.isComplete) {
await this._indexer.removeUnknownEvents(blockProgress);
}
if (blockProgress.isComplete) {
await this._indexer.removeUnknownEvents(blockProgress);
}
return dbEvent;
return this._indexer.getBlockEvents(
blockProgress.blockHash,
{
where: {
eventName: Not(UNKNOWN_EVENT_NAME)
},
order: {
index: 'ASC'
}
}
);
}
async publishBlockProgressToSubscribers (blockProgress: BlockProgressInterface): Promise<void> {

View File

@ -3,7 +3,7 @@
//
import assert from 'assert';
import { DeepPartial, FindConditions, Not } from 'typeorm';
import { DeepPartial, FindConditions, FindManyOptions, Not } from 'typeorm';
import debug from 'debug';
import { ethers } from 'ethers';
@ -169,21 +169,20 @@ export class Indexer {
}
}
async updateBlockProgress (block: BlockProgressInterface, lastProcessedEventIndex: number): Promise<void> {
async updateBlockProgress (block: BlockProgressInterface, lastProcessedEventIndex: number): Promise<BlockProgressInterface> {
const dbTx = await this._db.createTransactionRunner();
let res;
try {
res = await this._db.updateBlockProgress(dbTx, block, lastProcessedEventIndex);
const updatedBlock = await this._db.updateBlockProgress(dbTx, block, lastProcessedEventIndex);
await dbTx.commitTransaction();
return updatedBlock;
} catch (error) {
await dbTx.rollbackTransaction();
throw error;
} finally {
await dbTx.release();
}
return res;
}
async getEvent (id: string): Promise<EventInterface | undefined> {
@ -205,8 +204,8 @@ export class Indexer {
return events;
}
async getBlockEvents (blockHash: string): Promise<Array<EventInterface>> {
return this._db.getBlockEvents(blockHash);
async getBlockEvents (blockHash: string, options: FindManyOptions<EventInterface> = {}): Promise<Array<EventInterface>> {
return this._db.getBlockEvents(blockHash, options);
}
async getEventsByFilter (blockHash: string, contract: string, name: string | null): Promise<Array<EventInterface>> {
@ -229,7 +228,7 @@ export class Indexer {
where.eventName = name;
}
const events = await this._db.getBlockEvents(blockHash, where);
const events = await this._db.getBlockEvents(blockHash, { where });
log(`getEvents: db hit, num events: ${events.length}`);
return events;

View File

@ -60,22 +60,17 @@ export class JobQueue {
return await this._boss.subscribe(
queue,
{
includeMetadata: true,
batchSize: JOBS_PER_INTERVAL
teamSize: JOBS_PER_INTERVAL,
teamConcurrency: 1
},
async (jobs: any) => {
// TODO: Debug jobs not fetched in order from database and use teamSize instead of batchSize.
jobs = jobs.sort((a: any, b: any) => a.createdon - b.createdon);
for (const job of jobs) {
try {
log(`Processing queue ${queue} job ${job.id}...`);
await callback(job);
} catch (error) {
log(`Error in queue ${queue} job ${job.id}`);
log(error);
throw error;
}
async (job: any) => {
try {
log(`Processing queue ${queue} job ${job.id}...`);
await callback(job);
} catch (error) {
log(`Error in queue ${queue} job ${job.id}`);
log(error);
throw error;
}
}
);
@ -97,7 +92,7 @@ export class JobQueue {
assert(this._boss);
const jobId = await this._boss.publish(queue, job, options);
log(`Created job in queue ${queue}: ${jobId} data: ${job.id}`);
log(`Created job in queue ${queue}: ${jobId}`);
}
async deleteAllJobs (): Promise<void> {

View File

@ -4,13 +4,16 @@
import assert from 'assert';
import debug from 'debug';
import { MoreThanOrEqual } from 'typeorm';
import { JobQueueConfig } from './config';
import { JOB_KIND_INDEX, JOB_KIND_PRUNE, JOB_KIND_EVENTS, JOB_KIND_CONTRACT, MAX_REORG_DEPTH, QUEUE_BLOCK_PROCESSING, QUEUE_EVENT_PROCESSING, UNKNOWN_EVENT_NAME } from './constants';
import { JobQueue } from './job-queue';
import { EventInterface, IndexerInterface, SyncStatusInterface, BlockProgressInterface } from './types';
import { wait } from './misc';
import { createPruningJob } from './common';
import { JobQueueConfig } from './config';
import { JOB_KIND_INDEX, JOB_KIND_PRUNE, JOB_KIND_EVENT, JOB_KIND_CONTRACT, MAX_REORG_DEPTH, QUEUE_BLOCK_PROCESSING, QUEUE_EVENT_PROCESSING } from './constants';
import { JobQueue } from './job-queue';
import { EventInterface, IndexerInterface, SyncStatusInterface } from './types';
const DEFAULT_EVENTS_IN_BATCH = 50;
const log = debug('vulcanize:job-runner');
@ -18,6 +21,7 @@ export class JobRunner {
_indexer: IndexerInterface
_jobQueue: JobQueue
_jobQueueConfig: JobQueueConfig
_blockInProcess?: BlockProgressInterface
constructor (jobQueueConfig: JobQueueConfig, indexer: IndexerInterface, jobQueue: JobQueue) {
this._jobQueueConfig = jobQueueConfig;
@ -44,22 +48,28 @@ export class JobRunner {
log(`Invalid Job kind ${kind} in QUEUE_BLOCK_PROCESSING.`);
break;
}
await this._jobQueue.markComplete(job);
}
async processEvent (job: any): Promise<EventInterface | void> {
const { data: { kind } } = job;
switch (kind) {
case JOB_KIND_EVENT:
return this._processEvent(job);
case JOB_KIND_EVENTS:
await this._processEvents(job);
break;
case JOB_KIND_CONTRACT:
return this._updateWatchedContracts(job);
await this._updateWatchedContracts(job);
break;
default:
log(`Invalid Job kind ${kind} in QUEUE_EVENT_PROCESSING.`);
break;
}
await this._jobQueue.markComplete(job);
}
async _pruneChain (job: any, syncStatus: SyncStatusInterface): Promise<void> {
@ -165,32 +175,81 @@ export class JobRunner {
await wait(jobDelayInMilliSecs);
const events = await this._indexer.getOrFetchBlockEvents({ blockHash, blockNumber, parentHash, blockTimestamp: timestamp });
for (let ei = 0; ei < events.length; ei++) {
await this._jobQueue.pushJob(QUEUE_EVENT_PROCESSING, { kind: JOB_KIND_EVENT, id: events[ei].id, publish: true });
if (events.length) {
const block = events[0].block;
await this._jobQueue.pushJob(QUEUE_EVENT_PROCESSING, { kind: JOB_KIND_EVENTS, blockHash: block.blockHash, publish: true });
}
}
}
async _processEvent (job: any): Promise<EventInterface> {
const { data: { id } } = job;
async _processEvents (job: any): Promise<void> {
const { blockHash } = job.data;
log(`Processing event ${id}`);
let block = await this._indexer.getBlockProgress(blockHash);
assert(block);
const event = await this._indexer.getEvent(id);
assert(event);
const eventIndex = event.index;
while (!block.isComplete) {
// Fetch events in batches
const events: EventInterface[] = await this._indexer.getBlockEvents(
blockHash,
{
take: this._jobQueueConfig.eventsInBatch || DEFAULT_EVENTS_IN_BATCH,
where: {
index: MoreThanOrEqual(block.lastProcessedEventIndex + 1)
},
order: {
index: 'ASC'
}
}
);
// Check if previous event in block has been processed exactly before this and abort if not.
if (eventIndex > 0) { // Skip the first event in the block.
const prevIndex = eventIndex - 1;
for (let event of events) {
// Process events in loop
if (prevIndex !== event.block.lastProcessedEventIndex) {
throw new Error(`Events received out of order for block number ${event.block.blockNumber} hash ${event.block.blockHash},` +
` prev event index ${prevIndex}, got event index ${event.index} and lastProcessedEventIndex ${event.block.lastProcessedEventIndex}, aborting`);
const eventIndex = event.index;
log(`Processing event ${event.id} index ${eventIndex}`);
// Check if previous event in block has been processed exactly before this and abort if not.
if (eventIndex > 0) { // Skip the first event in the block.
const prevIndex = eventIndex - 1;
if (prevIndex !== block.lastProcessedEventIndex) {
throw new Error(`Events received out of order for block number ${block.blockNumber} hash ${block.blockHash},` +
` prev event index ${prevIndex}, got event index ${event.index} and lastProcessedEventIndex ${block.lastProcessedEventIndex}, aborting`);
}
}
let watchedContract;
if (!this._indexer.isWatchedContract) {
// uni-info-watcher indexer doesn't have watched contracts implementation.
watchedContract = true;
} else {
watchedContract = await this._indexer.isWatchedContract(event.contract);
}
if (watchedContract) {
// We might not have parsed this event yet. This can happen if the contract was added
// as a result of a previous event in the same block.
if (event.eventName === UNKNOWN_EVENT_NAME) {
const logObj = JSON.parse(event.extraInfo);
assert(this._indexer.parseEventNameAndArgs);
assert(typeof watchedContract !== 'boolean');
const { eventName, eventInfo } = this._indexer.parseEventNameAndArgs(watchedContract.kind, logObj);
event.eventName = eventName;
event.eventInfo = JSON.stringify(eventInfo);
event = await this._indexer.saveEventEntity(event);
}
await this._indexer.processEvent(event);
}
block = await this._indexer.updateBlockProgress(block, event.index);
}
}
return event;
}
async _updateWatchedContracts (job: any): Promise<void> {

View File

@ -2,7 +2,7 @@
// Copyright 2021 Vulcanize, Inc.
//
import { DeepPartial, FindConditions, FindManyOptions, QueryRunner } from 'typeorm';
import { Connection, DeepPartial, FindConditions, FindManyOptions, QueryRunner } from 'typeorm';
export interface BlockProgressInterface {
id: number;
@ -52,15 +52,19 @@ export interface IndexerInterface {
getSyncStatus (): Promise<SyncStatusInterface | undefined>;
getBlocks (blockFilter: { blockHash?: string, blockNumber?: number }): Promise<any>
getBlocksAtHeight (height: number, isPruned: boolean): Promise<BlockProgressInterface[]>;
getBlockEvents (blockHash: string): Promise<Array<EventInterface>>
getBlockEvents (blockHash: string, options: FindManyOptions<EventInterface>): Promise<Array<EventInterface>>
getAncestorAtDepth (blockHash: string, depth: number): Promise<string>
getOrFetchBlockEvents (block: DeepPartial<BlockProgressInterface>): Promise<Array<EventInterface>>
removeUnknownEvents (block: BlockProgressInterface): Promise<void>
updateBlockProgress (block: BlockProgressInterface, lastProcessedEventIndex: number): Promise<void>
updateBlockProgress (block: BlockProgressInterface, lastProcessedEventIndex: number): Promise<BlockProgressInterface>
updateSyncStatusChainHead (blockHash: string, blockNumber: number): Promise<SyncStatusInterface>
updateSyncStatusIndexedBlock (blockHash: string, blockNumber: number, force?: boolean): Promise<SyncStatusInterface>
updateSyncStatusCanonicalBlock (blockHash: string, blockNumber: number, force?: boolean): Promise<SyncStatusInterface>
markBlocksAsPruned (blocks: BlockProgressInterface[]): Promise<void>;
saveEventEntity (dbEvent: EventInterface): Promise<EventInterface>;
processEvent (event: EventInterface): Promise<void>;
parseEventNameAndArgs?: (kind: string, logObj: any) => any;
isWatchedContract?: (address: string) => Promise<ContractInterface | undefined>;
cacheContract?: (contract: ContractInterface) => void;
}
@ -71,17 +75,18 @@ export interface EventWatcherInterface {
}
export interface DatabaseInterface {
_conn: Connection;
createTransactionRunner(): Promise<QueryRunner>;
getBlocksAtHeight (height: number, isPruned: boolean): Promise<BlockProgressInterface[]>;
getBlockProgress (blockHash: string): Promise<BlockProgressInterface | undefined>;
getBlockEvents (blockHash: string, where?: FindConditions<EventInterface>): Promise<EventInterface[]>;
getBlockEvents (blockHash: string, where?: FindManyOptions<EventInterface>): Promise<EventInterface[]>;
getEvent (id: string): Promise<EventInterface | undefined>
getSyncStatus (queryRunner: QueryRunner): Promise<SyncStatusInterface | undefined>
getAncestorAtDepth (blockHash: string, depth: number): Promise<string>
getProcessedBlockCountForRange (fromBlockNumber: number, toBlockNumber: number): Promise<{ expected: number, actual: number }>;
getEventsInRange (fromBlockNumber: number, toBlockNumber: number): Promise<Array<EventInterface>>;
markBlocksAsPruned (queryRunner: QueryRunner, blocks: BlockProgressInterface[]): Promise<void>;
updateBlockProgress (queryRunner: QueryRunner, block: BlockProgressInterface, lastProcessedEventIndex: number): Promise<void>
updateBlockProgress (queryRunner: QueryRunner, block: BlockProgressInterface, lastProcessedEventIndex: number): Promise<BlockProgressInterface>
updateSyncStatusIndexedBlock (queryRunner: QueryRunner, blockHash: string, blockNumber: number, force?: boolean): Promise<SyncStatusInterface>;
updateSyncStatusChainHead (queryRunner: QueryRunner, blockHash: string, blockNumber: number): Promise<SyncStatusInterface>;
updateSyncStatusCanonicalBlock (queryRunner: QueryRunner, blockHash: string, blockNumber: number, force?: boolean): Promise<SyncStatusInterface>;