Changes to improve performance for event processing job (#304)

* Perf improvement TODOs

* Move watch contract to util package

* Get block progress from event instead of querying

* Use index field in event to check order of processing

* Use watched contract map to avoid querying database

* Use update query for blockProgress entity

Co-authored-by: Ashwin Phatak <ashwinpphatak@gmail.com>
This commit is contained in:
nikugogoi 2021-12-08 11:11:29 +05:30 committed by GitHub
parent f89a7a07aa
commit 6f98166c49
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
31 changed files with 327 additions and 176 deletions

View File

@ -6,7 +6,7 @@ import debug from 'debug';
import { MoreThan } from 'typeorm';
import assert from 'assert';
import { getConfig, getResetConfig, resetJobs } from '@vulcanize/util';
import { getConfig, getResetConfig, JobQueue, resetJobs } from '@vulcanize/util';
import { Database } from '../../database';
import { Indexer } from '../../indexer';
@ -29,13 +29,21 @@ export const builder = {
export const handler = async (argv: any): Promise<void> => {
const config = await getConfig(argv.configFile);
await resetJobs(config);
const { jobQueue: jobQueueConfig } = config;
const { dbConfig, serverConfig, ethClient, postgraphileClient, ethProvider } = await getResetConfig(config);
// Initialize database.
const db = new Database(dbConfig);
await db.init();
const indexer = new Indexer(db, ethClient, postgraphileClient, ethProvider, serverConfig.mode);
assert(jobQueueConfig, 'Missing job queue config');
const { dbConnectionString, maxCompletionLagInSecs } = jobQueueConfig;
assert(dbConnectionString, 'Missing job queue db connection string');
const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs });
const indexer = new Indexer(db, ethClient, postgraphileClient, ethProvider, jobQueue, serverConfig.mode);
const syncStatus = await indexer.getSyncStatus();
assert(syncStatus, 'Missing syncStatus');

View File

@ -5,11 +5,11 @@
import assert from 'assert';
import yargs from 'yargs';
import 'reflect-metadata';
import { ethers } from 'ethers';
import { Config, DEFAULT_CONFIG_PATH, getConfig } from '@vulcanize/util';
import { Config, DEFAULT_CONFIG_PATH, getConfig, getResetConfig, JobQueue } from '@vulcanize/util';
import { Database } from '../database';
import { Indexer } from '../indexer';
(async () => {
const argv = await yargs.parserConfiguration({
@ -37,16 +37,27 @@ import { Database } from '../database';
}).argv;
const config: Config = await getConfig(argv.configFile);
const { database: dbConfig } = config;
const { database: dbConfig, server: { mode }, jobQueue: jobQueueConfig } = config;
const { ethClient, postgraphileClient, ethProvider } = await getResetConfig(config);
assert(dbConfig);
const db = new Database(dbConfig);
await db.init();
// Always use the checksum address (https://docs.ethers.io/v5/api/utils/address/#utils-getAddress).
const address = ethers.utils.getAddress(argv.address);
assert(jobQueueConfig, 'Missing job queue config');
const { dbConnectionString, maxCompletionLagInSecs } = jobQueueConfig;
assert(dbConnectionString, 'Missing job queue db connection string');
const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs });
await jobQueue.start();
const indexer = new Indexer(db, ethClient, postgraphileClient, ethProvider, jobQueue, mode);
await indexer.watchContract(argv.address, argv.startingBlock);
await db.saveContract(address, argv.startingBlock);
await db.close();
await jobQueue.stop();
process.exit();
})();

View File

@ -15,8 +15,6 @@ import { Event } from './entity/Event';
import { SyncStatus } from './entity/SyncStatus';
import { BlockProgress } from './entity/BlockProgress';
const CONTRACT_KIND = 'token';
export class Database {
_config: ConnectionOptions
_conn!: Connection
@ -76,10 +74,10 @@ export class Database {
return repo.save(entity);
}
async getContract (address: string): Promise<Contract | undefined> {
async getContracts (): Promise<Contract[]> {
const repo = this._conn.getRepository(Contract);
return this._baseDatabase.getContract(repo, address);
return this._baseDatabase.getContracts(repo);
}
async createTransactionRunner (): Promise<QueryRunner> {
@ -116,12 +114,10 @@ export class Database {
return this._baseDatabase.saveEvents(blockRepo, eventRepo, block, events);
}
async saveContract (address: string, startingBlock: number): Promise<void> {
await this._conn.transaction(async (tx) => {
const repo = tx.getRepository(Contract);
async saveContract (queryRunner: QueryRunner, address: string, kind: string, startingBlock: number): Promise<Contract> {
const repo = queryRunner.manager.getRepository(Contract);
return this._baseDatabase.saveContract(repo, address, startingBlock, CONTRACT_KIND);
});
return this._baseDatabase.saveContract(repo, address, startingBlock, kind);
}
async updateSyncStatusIndexedBlock (queryRunner: QueryRunner, blockHash: string, blockNumber: number, force = false): Promise<SyncStatus> {
@ -171,10 +167,10 @@ export class Database {
return this._baseDatabase.getBlockProgress(repo, blockHash);
}
async updateBlockProgress (queryRunner: QueryRunner, blockHash: string, lastProcessedEventIndex: number): Promise<void> {
async updateBlockProgress (queryRunner: QueryRunner, block: BlockProgress, lastProcessedEventIndex: number): Promise<void> {
const repo = queryRunner.manager.getRepository(BlockProgress);
return this._baseDatabase.updateBlockProgress(repo, blockHash, lastProcessedEventIndex);
return this._baseDatabase.updateBlockProgress(repo, block, lastProcessedEventIndex);
}
async removeEntities<Entity> (queryRunner: QueryRunner, entity: new () => Entity, findConditions?: FindManyOptions<Entity> | FindConditions<Entity>): Promise<void> {

View File

@ -77,7 +77,6 @@ export const main = async (): Promise<any> => {
// Note: In-memory pubsub works fine for now, as each watcher is a single process anyway.
// Later: https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries
const pubsub = new PubSub();
const indexer = new Indexer(db, ethClient, postgraphileClient, ethProvider, mode);
const { dbConnectionString, maxCompletionLagInSecs } = jobQueueConfig;
assert(dbConnectionString, 'Missing job queue db connection string');
@ -85,6 +84,8 @@ export const main = async (): Promise<any> => {
const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs });
await jobQueue.start();
const indexer = new Indexer(db, ethClient, postgraphileClient, ethProvider, jobQueue, mode);
const eventWatcher = new EventWatcher(upstream, ethClient, postgraphileClient, indexer, pubsub, jobQueue);
assert(jobQueueConfig, 'Missing job queue config');

View File

@ -12,7 +12,7 @@ import { BaseProvider } from '@ethersproject/providers';
import { EthClient } from '@vulcanize/ipld-eth-client';
import { StorageLayout } from '@vulcanize/solidity-mapper';
import { EventInterface, Indexer as BaseIndexer, ValueResult, UNKNOWN_EVENT_NAME } from '@vulcanize/util';
import { EventInterface, Indexer as BaseIndexer, ValueResult, UNKNOWN_EVENT_NAME, JobQueue } from '@vulcanize/util';
import { Database } from './database';
import { Event } from './entity/Event';
@ -29,6 +29,8 @@ const ETH_CALL_MODE = 'eth_call';
const TRANSFER_EVENT = 'Transfer';
const APPROVAL_EVENT = 'Approval';
const CONTRACT_KIND = 'token';
interface EventResult {
event: {
from?: string;
@ -53,7 +55,7 @@ export class Indexer {
_contract: ethers.utils.Interface
_serverMode: string
constructor (db: Database, ethClient: EthClient, postgraphileClient: EthClient, ethProvider: BaseProvider, serverMode: string) {
constructor (db: Database, ethClient: EthClient, postgraphileClient: EthClient, ethProvider: BaseProvider, jobQueue: JobQueue, serverMode: string) {
assert(db);
assert(ethClient);
@ -62,7 +64,7 @@ export class Indexer {
this._postgraphileClient = postgraphileClient;
this._ethProvider = ethProvider;
this._serverMode = serverMode;
this._baseIndexer = new BaseIndexer(this._db, this._ethClient, this._postgraphileClient, this._ethProvider);
this._baseIndexer = new BaseIndexer(this._db, this._ethClient, this._postgraphileClient, this._ethProvider, jobQueue);
const { abi, storageLayout } = artifacts;
@ -290,13 +292,6 @@ export class Indexer {
return { eventName, eventInfo };
}
async watchContract (address: string, startingBlock: number): Promise<boolean> {
// Always use the checksum address (https://docs.ethers.io/v5/api/utils/address/#utils-getAddress).
await this._db.saveContract(ethers.utils.getAddress(address), startingBlock);
return true;
}
async getEventsByFilter (blockHash: string, contract: string, name: string | null): Promise<Array<Event>> {
return this._baseIndexer.getEventsByFilter(blockHash, contract, name);
}
@ -305,6 +300,10 @@ export class Indexer {
return this._baseIndexer.isWatchedContract(address);
}
async watchContract (address: string, startingBlock: number): Promise<void> {
return this._baseIndexer.watchContract(address, CONTRACT_KIND, startingBlock);
}
async saveEventEntity (dbEvent: Event): Promise<Event> {
return this._baseIndexer.saveEventEntity(dbEvent);
}
@ -365,8 +364,8 @@ export class Indexer {
return this._baseIndexer.markBlocksAsPruned(blocks);
}
async updateBlockProgress (blockHash: string, lastProcessedEventIndex: number): Promise<void> {
return this._baseIndexer.updateBlockProgress(blockHash, lastProcessedEventIndex);
async updateBlockProgress (block: BlockProgress, lastProcessedEventIndex: number): Promise<void> {
return this._baseIndexer.updateBlockProgress(block, lastProcessedEventIndex);
}
async getAncestorAtDepth (blockHash: string, depth: number): Promise<string> {

View File

@ -56,12 +56,16 @@ export class JobRunner {
await this._jobQueue.subscribe(QUEUE_EVENT_PROCESSING, async (job) => {
const event = await this._baseJobRunner.processEvent(job);
if (!event) {
return;
}
const watchedContract = await this._indexer.isWatchedContract(event.contract);
if (watchedContract) {
await this._indexer.processEvent(event);
}
await this._indexer.updateBlockProgress(event.block.blockHash, event.index);
await this._indexer.updateBlockProgress(event.block, event.index);
await this._jobQueue.markComplete(job);
});
}
@ -107,7 +111,6 @@ export const main = async (): Promise<any> => {
});
const ethProvider = getCustomProvider(rpcProviderEndpoint);
const indexer = new Indexer(db, ethClient, postgraphileClient, ethProvider, mode);
assert(jobQueueConfig, 'Missing job queue config');
@ -117,6 +120,8 @@ export const main = async (): Promise<any> => {
const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs });
await jobQueue.start();
const indexer = new Indexer(db, ethClient, postgraphileClient, ethProvider, jobQueue, mode);
const jobRunner = new JobRunner(jobQueueConfig, indexer, jobQueue);
await jobRunner.start();
};

View File

@ -34,9 +34,11 @@ export const createResolvers = async (indexer: Indexer, eventWatcher: EventWatch
},
Mutation: {
watchToken: (_: any, { token, startingBlock = 1 }: { token: string, startingBlock: number }): Promise<boolean> => {
watchToken: async (_: any, { token, startingBlock = 1 }: { token: string, startingBlock: number }): Promise<boolean> => {
log('watchToken', token, startingBlock);
return indexer.watchContract(token, startingBlock);
await indexer.watchContract(token, startingBlock);
return true;
}
},

View File

@ -72,7 +72,6 @@ export const main = async (): Promise<any> => {
// Note: In-memory pubsub works fine for now, as each watcher is a single process anyway.
// Later: https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries
const pubsub = new PubSub();
const indexer = new Indexer(db, ethClient, postgraphileClient, ethProvider, mode);
assert(jobQueueConfig, 'Missing job queue config');
@ -80,6 +79,9 @@ export const main = async (): Promise<any> => {
assert(dbConnectionString, 'Missing job queue db connection string');
const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs });
const indexer = new Indexer(db, ethClient, postgraphileClient, ethProvider, jobQueue, mode);
const eventWatcher = new EventWatcher(upstream, ethClient, postgraphileClient, indexer, pubsub, jobQueue);
if (watcherKind === KIND_ACTIVE) {

View File

@ -6,7 +6,7 @@ import debug from 'debug';
import { MoreThan } from 'typeorm';
import assert from 'assert';
import { getConfig, getResetConfig, resetJobs } from '@vulcanize/util';
import { getConfig, getResetConfig, JobQueue, resetJobs } from '@vulcanize/util';
import { Client as ERC20Client } from '@vulcanize/erc20-watcher';
import { Client as UniClient } from '@vulcanize/uni-watcher';
@ -38,6 +38,7 @@ export const builder = {
export const handler = async (argv: any): Promise<void> => {
const config = await getConfig(argv.configFile);
await resetJobs(config);
const { jobQueue: jobQueueConfig } = config;
const { dbConfig, serverConfig, upstreamConfig, ethClient, postgraphileClient, ethProvider } = await getResetConfig(config);
// Initialize database.
@ -52,7 +53,15 @@ export const handler = async (argv: any): Promise<void> => {
const uniClient = new UniClient(uniWatcher);
const erc20Client = new ERC20Client(tokenWatcher);
const indexer = new Indexer(db, uniClient, erc20Client, ethClient, postgraphileClient, ethProvider, serverConfig.mode);
assert(jobQueueConfig, 'Missing job queue config');
const { dbConnectionString, maxCompletionLagInSecs } = jobQueueConfig;
assert(dbConnectionString, 'Missing job queue db connection string');
const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs });
await jobQueue.start();
const indexer = new Indexer(db, uniClient, erc20Client, ethClient, postgraphileClient, ethProvider, jobQueue, serverConfig.mode);
const syncStatus = await indexer.getSyncStatus();
assert(syncStatus, 'Missing syncStatus');

View File

@ -663,10 +663,10 @@ export class Database implements DatabaseInterface {
return this._baseDatabase.getBlockProgress(repo, blockHash);
}
async updateBlockProgress (queryRunner: QueryRunner, blockHash: string, lastProcessedEventIndex: number): Promise<void> {
async updateBlockProgress (queryRunner: QueryRunner, block: BlockProgress, lastProcessedEventIndex: number): Promise<void> {
const repo = queryRunner.manager.getRepository(BlockProgress);
return this._baseDatabase.updateBlockProgress(repo, blockHash, lastProcessedEventIndex);
return this._baseDatabase.updateBlockProgress(repo, block, lastProcessedEventIndex);
}
async getEntities<Entity> (queryRunner: QueryRunner, entity: new () => Entity, findConditions?: FindConditions<Entity>): Promise<Entity[]> {

View File

@ -82,7 +82,6 @@ export const main = async (): Promise<any> => {
// Note: In-memory pubsub works fine for now, as each watcher is a single process anyway.
// Later: https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries
const pubsub = new PubSub();
const indexer = new Indexer(db, uniClient, erc20Client, ethClient, postgraphileClient, ethProvider, mode);
assert(jobQueueConfig, 'Missing job queue config');
const { dbConnectionString, maxCompletionLagInSecs } = jobQueueConfig;
@ -91,6 +90,8 @@ export const main = async (): Promise<any> => {
const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs });
await jobQueue.start();
const indexer = new Indexer(db, uniClient, erc20Client, ethClient, postgraphileClient, ethProvider, jobQueue, mode);
const eventWatcher = new EventWatcher(upstream, ethClient, postgraphileClient, indexer, pubsub, jobQueue);
await fillBlocks(jobQueue, indexer, postgraphileClient, eventWatcher, blockDelayInMilliSecs, argv);

View File

@ -11,7 +11,7 @@ import { providers, utils, BigNumber } from 'ethers';
import { Client as UniClient } from '@vulcanize/uni-watcher';
import { Client as ERC20Client } from '@vulcanize/erc20-watcher';
import { EthClient } from '@vulcanize/ipld-eth-client';
import { IndexerInterface, Indexer as BaseIndexer, QueryOptions, OrderDirection, BlockHeight, Relation, GraphDecimal } from '@vulcanize/util';
import { IndexerInterface, Indexer as BaseIndexer, QueryOptions, OrderDirection, BlockHeight, Relation, GraphDecimal, JobQueue } from '@vulcanize/util';
import { findEthPerToken, getEthPriceInUSD, getTrackedAmountUSD, sqrtPriceX96ToTokenPrices, WHITELIST_TOKENS } from './utils/pricing';
import { updatePoolDayData, updatePoolHourData, updateTickDayData, updateTokenDayData, updateTokenHourData, updateUniswapDayData } from './utils/interval-updates';
@ -48,7 +48,7 @@ export class Indexer implements IndexerInterface {
_baseIndexer: BaseIndexer
_isDemo: boolean
constructor (db: Database, uniClient: UniClient, erc20Client: ERC20Client, ethClient: EthClient, postgraphileClient: EthClient, ethProvider: providers.BaseProvider, mode: string) {
constructor (db: Database, uniClient: UniClient, erc20Client: ERC20Client, ethClient: EthClient, postgraphileClient: EthClient, ethProvider: providers.BaseProvider, jobQueue: JobQueue, mode: string) {
assert(db);
assert(uniClient);
assert(erc20Client);
@ -59,14 +59,14 @@ export class Indexer implements IndexerInterface {
this._erc20Client = erc20Client;
this._ethClient = ethClient;
this._postgraphileClient = postgraphileClient;
this._baseIndexer = new BaseIndexer(this._db, this._ethClient, this._postgraphileClient, ethProvider);
this._baseIndexer = new BaseIndexer(this._db, this._ethClient, this._postgraphileClient, ethProvider, jobQueue);
this._isDemo = mode === 'demo';
}
getResultEvent (event: Event): ResultEvent {
const block = event.block;
const eventFields = JSON.parse(event.eventInfo);
const { tx } = JSON.parse(event.extraInfo);
const { tx, eventIndex } = JSON.parse(event.extraInfo);
return {
block: {
@ -78,7 +78,7 @@ export class Indexer implements IndexerInterface {
tx,
contract: event.contract,
eventIndex: event.index,
eventIndex,
event: {
__typename: event.eventName,
@ -346,8 +346,8 @@ export class Indexer implements IndexerInterface {
return this._baseIndexer.getBlocksAtHeight(height, isPruned);
}
async updateBlockProgress (blockHash: string, lastProcessedEventIndex: number): Promise<void> {
return this._baseIndexer.updateBlockProgress(blockHash, lastProcessedEventIndex);
async updateBlockProgress (block: BlockProgress, lastProcessedEventIndex: number): Promise<void> {
return this._baseIndexer.updateBlockProgress(block, lastProcessedEventIndex);
}
async _fetchAndSaveEvents (block: DeepPartial<BlockProgress>): Promise<void> {
@ -365,10 +365,10 @@ export class Indexer implements IndexerInterface {
} = events[i];
const { __typename: eventName, ...eventInfo } = event;
const extraInfo = { tx };
const extraInfo = { tx, eventIndex };
dbEvents.push({
index: eventIndex,
index: i,
txHash: tx.hash,
contract,
eventName,

View File

@ -58,12 +58,16 @@ export class JobRunner {
await this._jobQueue.subscribe(QUEUE_EVENT_PROCESSING, async (job) => {
const event = await this._baseJobRunner.processEvent(job);
if (!event) {
return;
}
// Check if event is processed.
if (!event.block.isComplete && event.index !== event.block.lastProcessedEventIndex) {
await this._indexer.processEvent(event);
}
await this._indexer.updateBlockProgress(event.block.blockHash, event.index);
await this._indexer.updateBlockProgress(event.block, event.index);
await this._jobQueue.markComplete(job);
});
}
@ -132,8 +136,6 @@ export const main = async (): Promise<any> => {
const erc20Client = new ERC20Client(tokenWatcher);
const ethProvider = getCustomProvider(rpcProviderEndpoint);
const indexer = new Indexer(db, uniClient, erc20Client, ethClient, postgraphileClient, ethProvider, mode);
assert(jobQueueConfig, 'Missing job queue config');
const { dbConnectionString, maxCompletionLagInSecs } = jobQueueConfig;
@ -142,6 +144,8 @@ export const main = async (): Promise<any> => {
const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs });
await jobQueue.start();
const indexer = new Indexer(db, uniClient, erc20Client, ethClient, postgraphileClient, ethProvider, jobQueue, mode);
const jobRunner = new JobRunner(jobQueueConfig, indexer, jobQueue);
await jobRunner.start();
};

View File

@ -82,7 +82,6 @@ export const main = async (): Promise<any> => {
const uniClient = new UniClient(uniWatcher);
const erc20Client = new ERC20Client(tokenWatcher);
const ethProvider = getCustomProvider(rpcProviderEndpoint);
const indexer = new Indexer(db, uniClient, erc20Client, ethClient, postgraphileClient, ethProvider, mode);
assert(jobQueueConfig, 'Missing job queue config');
@ -92,6 +91,8 @@ export const main = async (): Promise<any> => {
const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs });
await jobQueue.start();
const indexer = new Indexer(db, uniClient, erc20Client, ethClient, postgraphileClient, ethProvider, jobQueue, mode);
const pubSub = new PubSub();
const eventWatcher = new EventWatcher(upstream, ethClient, postgraphileClient, indexer, pubSub, jobQueue);
await eventWatcher.start();

View File

@ -63,14 +63,14 @@ describe('chain pruning', () => {
const ethProvider = getCustomProvider(rpcProviderEndpoint);
indexer = new Indexer(db, ethClient, postgraphileClient, ethProvider);
assert(indexer, 'Could not create indexer object.');
const { dbConnectionString, maxCompletionLagInSecs } = jobQueueConfig;
assert(dbConnectionString, 'Missing job queue db connection string');
const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs });
indexer = new Indexer(db, ethClient, postgraphileClient, ethProvider, jobQueue);
assert(indexer, 'Could not create indexer object.');
jobRunner = new JobRunner(jobQueueConfig, indexer, jobQueue);
});

View File

@ -6,7 +6,7 @@ import debug from 'debug';
import { MoreThan } from 'typeorm';
import assert from 'assert';
import { getConfig, getResetConfig, resetJobs } from '@vulcanize/util';
import { getConfig, getResetConfig, JobQueue, resetJobs } from '@vulcanize/util';
import { Database } from '../../database';
import { Indexer } from '../../indexer';
@ -27,13 +27,21 @@ export const builder = {
export const handler = async (argv: any): Promise<void> => {
const config = await getConfig(argv.configFile);
await resetJobs(config);
const { jobQueue: jobQueueConfig } = config;
const { dbConfig, ethClient, postgraphileClient, ethProvider } = await getResetConfig(config);
// Initialize database.
const db = new Database(dbConfig);
await db.init();
const indexer = new Indexer(db, ethClient, postgraphileClient, ethProvider);
assert(jobQueueConfig, 'Missing job queue config');
const { dbConnectionString, maxCompletionLagInSecs } = jobQueueConfig;
assert(dbConnectionString, 'Missing job queue db connection string');
const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs });
const indexer = new Indexer(db, ethClient, postgraphileClient, ethProvider, jobQueue);
const syncStatus = await indexer.getSyncStatus();
assert(syncStatus, 'Missing syncStatus');

View File

@ -6,10 +6,10 @@ import assert from 'assert';
import yargs from 'yargs';
import 'reflect-metadata';
import { Config, DEFAULT_CONFIG_PATH, getConfig } from '@vulcanize/util';
import { Config, DEFAULT_CONFIG_PATH, getConfig, getResetConfig, JobQueue } from '@vulcanize/util';
import { Database } from '../database';
import { watchContract } from '../utils/index';
import { Indexer } from '../indexer';
(async () => {
const argv = await yargs.parserConfiguration({
@ -43,14 +43,28 @@ import { watchContract } from '../utils/index';
}).argv;
const config: Config = await getConfig(argv.configFile);
const { database: dbConfig } = config;
const { database: dbConfig, jobQueue: jobQueueConfig } = config;
const { ethClient, postgraphileClient, ethProvider } = await getResetConfig(config);
assert(dbConfig);
const db = new Database(dbConfig);
await db.init();
await watchContract(db, argv.address, argv.kind, argv.startingBlock);
assert(jobQueueConfig, 'Missing job queue config');
const { dbConnectionString, maxCompletionLagInSecs } = jobQueueConfig;
assert(dbConnectionString, 'Missing job queue db connection string');
const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs });
await jobQueue.start();
const indexer = new Indexer(db, ethClient, postgraphileClient, ethProvider, jobQueue);
await indexer.init();
await indexer.watchContract(argv.address, argv.kind, argv.startingBlock);
await db.close();
await jobQueue.stop();
process.exit();
})();

View File

@ -45,13 +45,13 @@ export class Database implements DatabaseInterface {
.getOne();
}
async getContract (address: string): Promise<Contract | undefined> {
async getContracts (): Promise<Contract[]> {
const repo = this._conn.getRepository(Contract);
return this._baseDatabase.getContract(repo, address);
return this._baseDatabase.getContracts(repo);
}
async saveContract (queryRunner: QueryRunner, address: string, kind: string, startingBlock: number): Promise<void> {
async saveContract (queryRunner: QueryRunner, address: string, kind: string, startingBlock: number): Promise<Contract> {
const repo = queryRunner.manager.getRepository(Contract);
return this._baseDatabase.saveContract(repo, address, startingBlock, kind);
@ -138,10 +138,10 @@ export class Database implements DatabaseInterface {
return this._baseDatabase.getBlockProgress(repo, blockHash);
}
async updateBlockProgress (queryRunner: QueryRunner, blockHash: string, lastProcessedEventIndex: number): Promise<void> {
async updateBlockProgress (queryRunner: QueryRunner, block: BlockProgress, lastProcessedEventIndex: number): Promise<void> {
const repo = queryRunner.manager.getRepository(BlockProgress);
return this._baseDatabase.updateBlockProgress(repo, blockHash, lastProcessedEventIndex);
return this._baseDatabase.updateBlockProgress(repo, block, lastProcessedEventIndex);
}
async getEntities<Entity> (queryRunner: QueryRunner, entity: new () => Entity, findConditions?: FindConditions<Entity>): Promise<Entity[]> {

View File

@ -77,7 +77,6 @@ export const main = async (): Promise<any> => {
// Note: In-memory pubsub works fine for now, as each watcher is a single process anyway.
// Later: https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries
const pubsub = new PubSub();
const indexer = new Indexer(db, ethClient, postgraphileClient, ethProvider);
const { dbConnectionString, maxCompletionLagInSecs } = jobQueueConfig;
assert(dbConnectionString, 'Missing job queue db connection string');
@ -85,6 +84,9 @@ export const main = async (): Promise<any> => {
const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs });
await jobQueue.start();
const indexer = new Indexer(db, ethClient, postgraphileClient, ethProvider, jobQueue);
await indexer.init();
const eventWatcher = new EventWatcher(upstream, ethClient, postgraphileClient, indexer, pubsub, jobQueue);
assert(jobQueueConfig, 'Missing job queue config');

View File

@ -9,7 +9,7 @@ import { ethers } from 'ethers';
import assert from 'assert';
import { EthClient } from '@vulcanize/ipld-eth-client';
import { IndexerInterface, Indexer as BaseIndexer, ValueResult } from '@vulcanize/util';
import { IndexerInterface, Indexer as BaseIndexer, ValueResult, JobQueue } from '@vulcanize/util';
import { Database } from './database';
import { Event, UNKNOWN_EVENT_NAME } from './entity/Event';
@ -46,18 +46,22 @@ export class Indexer implements IndexerInterface {
_poolContract: ethers.utils.Interface
_nfpmContract: ethers.utils.Interface
constructor (db: Database, ethClient: EthClient, postgraphileClient: EthClient, ethProvider: ethers.providers.BaseProvider) {
constructor (db: Database, ethClient: EthClient, postgraphileClient: EthClient, ethProvider: ethers.providers.BaseProvider, jobQueue: JobQueue) {
this._db = db;
this._ethClient = ethClient;
this._postgraphileClient = postgraphileClient;
this._ethProvider = ethProvider;
this._baseIndexer = new BaseIndexer(this._db, this._ethClient, this._postgraphileClient, this._ethProvider);
this._baseIndexer = new BaseIndexer(this._db, this._ethClient, this._postgraphileClient, this._ethProvider, jobQueue);
this._factoryContract = new ethers.utils.Interface(factoryABI);
this._poolContract = new ethers.utils.Interface(poolABI);
this._nfpmContract = new ethers.utils.Interface(nfpmABI);
}
async init (): Promise<void> {
await this._baseIndexer.fetchContracts();
}
getResultEvent (event: Event): ResultEvent {
const block = event.block;
const eventFields = JSON.parse(event.eventInfo);
@ -97,7 +101,7 @@ export class Indexer implements IndexerInterface {
switch (re.event.__typename) {
case 'PoolCreatedEvent': {
const poolContract = ethers.utils.getAddress(re.event.pool);
await this._db.saveContract(dbTx, poolContract, KIND_POOL, dbEvent.block.blockNumber);
await this.watchContract(poolContract, KIND_POOL, dbEvent.block.blockNumber);
}
}
}
@ -343,6 +347,14 @@ export class Indexer implements IndexerInterface {
return this._baseIndexer.isWatchedContract(address);
}
async watchContract (address: string, kind: string, startingBlock: number): Promise<void> {
return this._baseIndexer.watchContract(address, kind, startingBlock);
}
cacheContract (contract: Contract): void {
return this._baseIndexer.cacheContract(contract);
}
async saveEventEntity (dbEvent: Event): Promise<Event> {
return this._baseIndexer.saveEventEntity(dbEvent);
}
@ -404,8 +416,8 @@ export class Indexer implements IndexerInterface {
return this._baseIndexer.markBlocksAsPruned(blocks);
}
async updateBlockProgress (blockHash: string, lastProcessedEventIndex: number): Promise<void> {
return this._baseIndexer.updateBlockProgress(blockHash, lastProcessedEventIndex);
async updateBlockProgress (block: BlockProgress, lastProcessedEventIndex: number): Promise<void> {
return this._baseIndexer.updateBlockProgress(block, lastProcessedEventIndex);
}
async getAncestorAtDepth (blockHash: string, depth: number): Promise<string> {

View File

@ -55,12 +55,17 @@ export class JobRunner {
async subscribeEventProcessingQueue (): Promise<void> {
await this._jobQueue.subscribe(QUEUE_EVENT_PROCESSING, async (job) => {
const event = await this._baseJobRunner.processEvent(job);
// TODO: Support two kind of jobs on the event processing queue.
// 1) processEvent => Current single event
// 2) processEvents => Event range (multiple events)
let event = await this._baseJobRunner.processEvent(job);
let dbEvent;
const { data: { id } } = job;
if (!event) {
return;
}
const watchedContract = await this._indexer.isWatchedContract(event.contract);
if (watchedContract) {
// We might not have parsed this event yet. This can happen if the contract was added
// as a result of a previous event in the same block.
@ -69,16 +74,13 @@ export class JobRunner {
const { eventName, eventInfo } = this._indexer.parseEventNameAndArgs(watchedContract.kind, logObj);
event.eventName = eventName;
event.eventInfo = JSON.stringify(eventInfo);
dbEvent = await this._indexer.saveEventEntity(event);
event = await this._indexer.saveEventEntity(event);
}
dbEvent = await this._indexer.getEvent(id);
assert(dbEvent);
await this._indexer.processEvent(dbEvent);
await this._indexer.processEvent(event);
}
await this._indexer.updateBlockProgress(event.block.blockHash, event.index);
await this._indexer.updateBlockProgress(event.block, event.index);
await this._jobQueue.markComplete(job);
});
}
@ -125,8 +127,6 @@ export const main = async (): Promise<any> => {
const ethProvider = getCustomProvider(rpcProviderEndpoint);
const indexer = new Indexer(db, ethClient, postgraphileClient, ethProvider);
assert(jobQueueConfig, 'Missing job queue config');
const { dbConnectionString, maxCompletionLagInSecs } = jobQueueConfig;
@ -135,6 +135,9 @@ export const main = async (): Promise<any> => {
const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs });
await jobQueue.start();
const indexer = new Indexer(db, ethClient, postgraphileClient, ethProvider, jobQueue);
await indexer.init();
const jobRunner = new JobRunner(jobQueueConfig, indexer, jobQueue);
await jobRunner.start();
};

View File

@ -72,7 +72,6 @@ export const main = async (): Promise<any> => {
// Note: In-memory pubsub works fine for now, as each watcher is a single process anyway.
// Later: https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries
const pubsub = new PubSub();
const indexer = new Indexer(db, ethClient, postgraphileClient, ethProvider);
assert(jobQueueConfig, 'Missing job queue config');
@ -82,6 +81,9 @@ export const main = async (): Promise<any> => {
const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs });
await jobQueue.start();
const indexer = new Indexer(db, ethClient, postgraphileClient, ethProvider, jobQueue);
await indexer.init();
const eventWatcher = new EventWatcher(upstream, ethClient, postgraphileClient, indexer, pubsub, jobQueue);
await eventWatcher.start();

View File

@ -8,7 +8,8 @@ import 'mocha';
import {
Config,
getConfig
getConfig,
JobQueue
} from '@vulcanize/util';
import {
deployTokens,
@ -65,6 +66,7 @@ describe('uni-watcher', () => {
let ethClient: EthClient;
let postgraphileClient: EthClient;
let ethProvider: ethers.providers.JsonRpcProvider;
let jobQueue: JobQueue;
let signer: Signer;
let recipient: string;
let deadline: number;
@ -72,7 +74,7 @@ describe('uni-watcher', () => {
before(async () => {
config = await getConfig(CONFIG_FILE);
const { database: dbConfig, upstream, server: { host, port } } = config;
const { database: dbConfig, upstream, server: { host, port }, jobQueue: jobQueueConfig } = config;
assert(dbConfig, 'Missing dbConfig.');
assert(upstream, 'Missing upstream.');
assert(host, 'Missing host.');
@ -115,6 +117,9 @@ describe('uni-watcher', () => {
const deadlineDate = new Date();
deadlineDate.setDate(deadlineDate.getDate() + 2);
deadline = Math.floor(deadlineDate.getTime() / 1000);
const { dbConnectionString, maxCompletionLagInSecs } = jobQueueConfig;
jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs });
});
after(async () => {
@ -130,7 +135,8 @@ describe('uni-watcher', () => {
factory = new Contract(factoryContract.address, FACTORY_ABI, signer);
// Verifying with the db.
const indexer = new Indexer(db, ethClient, postgraphileClient, ethProvider);
const indexer = new Indexer(db, ethClient, postgraphileClient, ethProvider, jobQueue);
await indexer.init();
assert(await indexer.isWatchedContract(factory.address), 'Factory contract not added to the database.');
});
@ -265,7 +271,8 @@ describe('uni-watcher', () => {
nfpm = new Contract(nfpmContract.address, NFPM_ABI, signer);
// Verifying with the db.
const indexer = new Indexer(db, ethClient, postgraphileClient, ethProvider);
const indexer = new Indexer(db, ethClient, postgraphileClient, ethProvider, jobQueue);
await indexer.init();
assert(await indexer.isWatchedContract(nfpm.address), 'NFPM contract not added to the database.');
});

View File

@ -2,27 +2,8 @@
// Copyright 2021 Vulcanize, Inc.
//
import { ethers } from 'ethers';
import { Database } from '../database';
import { Client as UniClient } from '../client';
export async function watchContract (db: Database, address: string, kind: string, startingBlock: number): Promise<void> {
// Always use the checksum address (https://docs.ethers.io/v5/api/utils/address/#utils-getAddress).
const contractAddress = ethers.utils.getAddress(address);
const dbTx = await db.createTransactionRunner();
try {
await db.saveContract(dbTx, contractAddress, kind, startingBlock);
await dbTx.commitTransaction();
} catch (error) {
await dbTx.rollbackTransaction();
throw error;
} finally {
await dbTx.release();
}
}
export const watchEvent = async (uniClient: UniClient, eventType: string): Promise<any> => {
return new Promise((resolve, reject) => {
(async () => {

View File

@ -6,7 +6,7 @@ import { Contract, ethers, Signer } from 'ethers';
import assert from 'assert';
import {
getConfig
getConfig, getResetConfig, JobQueue
} from '@vulcanize/util';
import {
deployWETH9Token,
@ -19,23 +19,23 @@ import {
import { Client as UniClient } from '../src/client';
import { Database } from '../src/database';
import { watchContract } from '../src/utils/index';
import { Indexer } from '../src/indexer';
const CONFIG_FILE = './environments/test.toml';
const deployFactoryContract = async (db: Database, signer: Signer): Promise<Contract> => {
const deployFactoryContract = async (indexer: Indexer, signer: Signer): Promise<Contract> => {
// Deploy factory from uniswap package.
const Factory = new ethers.ContractFactory(FACTORY_ABI, FACTORY_BYTECODE, signer);
const factory = await Factory.deploy();
assert(factory.address, 'Factory contract not deployed.');
// Watch factory contract.
await watchContract(db, factory.address, 'factory', 100);
await indexer.watchContract(factory.address, 'factory', 100);
return factory;
};
const deployNFPMContract = async (db: Database, signer: Signer, factory: Contract): Promise<void> => {
const deployNFPMContract = async (indexer: Indexer, signer: Signer, factory: Contract): Promise<void> => {
// Deploy weth9 token.
const weth9Address = await deployWETH9Token(signer);
assert(weth9Address, 'WETH9 token not deployed.');
@ -45,18 +45,19 @@ const deployNFPMContract = async (db: Database, signer: Signer, factory: Contrac
assert(nfpm.address, 'NFPM contract not deployed.');
// Watch NFPM contract.
await watchContract(db, nfpm.address, 'nfpm', 100);
await indexer.watchContract(nfpm.address, 'nfpm', 100);
};
const main = async () => {
// Get config.
const config = await getConfig(CONFIG_FILE);
const { database: dbConfig, server: { host, port }, upstream: { ethServer: { rpcProviderEndpoint } } } = config;
const { database: dbConfig, server: { host, port }, jobQueue: jobQueueConfig } = config;
assert(dbConfig, 'Missing dbConfig.');
assert(host, 'Missing host.');
assert(port, 'Missing port.');
assert(rpcProviderEndpoint, 'Missing rpcProviderEndpoint.');
const { ethClient, postgraphileClient, ethProvider } = await getResetConfig(config);
// Initialize uniClient.
const endpoint = `http://${host}:${port}/graphql`;
@ -71,14 +72,22 @@ const main = async () => {
const db = new Database(dbConfig);
await db.init();
const provider = new ethers.providers.JsonRpcProvider(rpcProviderEndpoint);
const provider = ethProvider as ethers.providers.JsonRpcProvider;
const signer = provider.getSigner();
const { dbConnectionString, maxCompletionLagInSecs } = jobQueueConfig;
assert(dbConnectionString, 'Missing job queue db connection string');
const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs });
await jobQueue.start();
const indexer = new Indexer(db, ethClient, postgraphileClient, ethProvider, jobQueue);
let factory: Contract;
// Checking whether factory is deployed.
const factoryContract = await uniClient.getContract('factory');
if (factoryContract == null) {
factory = await deployFactoryContract(db, signer);
factory = await deployFactoryContract(indexer, signer);
} else {
factory = new Contract(factoryContract.address, FACTORY_ABI, signer);
}
@ -86,7 +95,7 @@ const main = async () => {
// Checking whether NFPM is deployed.
const nfpmContract = await uniClient.getContract('nfpm');
if (nfpmContract == null) {
await deployNFPMContract(db, signer, factory);
await deployNFPMContract(indexer, signer, factory);
}
// Closing the database.

View File

@ -11,6 +11,9 @@ export const QUEUE_CHAIN_PRUNING = 'chain-pruning';
export const JOB_KIND_INDEX = 'index';
export const JOB_KIND_PRUNE = 'prune';
export const JOB_KIND_CONTRACT = 'contract';
export const JOB_KIND_EVENT = 'event';
export const DEFAULT_CONFIG_PATH = 'environments/local.toml';
export const UNKNOWN_EVENT_NAME = '__unknown__';

View File

@ -153,20 +153,20 @@ export class Database {
.getMany();
}
async updateBlockProgress (repo: Repository<BlockProgressInterface>, blockHash: string, lastProcessedEventIndex: number): Promise<void> {
const entity = await repo.findOne({ where: { blockHash } });
if (entity && !entity.isComplete) {
if (lastProcessedEventIndex <= entity.lastProcessedEventIndex) {
throw new Error(`Events processed out of order ${blockHash}, was ${entity.lastProcessedEventIndex}, got ${lastProcessedEventIndex}`);
async updateBlockProgress (repo: Repository<BlockProgressInterface>, block: BlockProgressInterface, lastProcessedEventIndex: number): Promise<void> {
if (!block.isComplete) {
if (lastProcessedEventIndex <= block.lastProcessedEventIndex) {
throw new Error(`Events processed out of order ${block.blockHash}, was ${block.lastProcessedEventIndex}, got ${lastProcessedEventIndex}`);
}
entity.lastProcessedEventIndex = lastProcessedEventIndex;
entity.numProcessedEvents++;
if (entity.numProcessedEvents >= entity.numEvents) {
entity.isComplete = true;
block.lastProcessedEventIndex = lastProcessedEventIndex;
block.numProcessedEvents++;
if (block.numProcessedEvents >= block.numEvents) {
block.isComplete = true;
}
await repo.save(entity);
const { id, ...blockData } = block;
await repo.update(id, blockData);
}
}
@ -550,21 +550,24 @@ export class Database {
return { canonicalBlockNumber, blockHashes };
}
async getContract (repo: Repository<ContractInterface>, address: string): Promise<ContractInterface | undefined> {
async getContracts (repo: Repository<ContractInterface>): Promise<ContractInterface[]> {
return repo.createQueryBuilder('contract')
.where('address = :address', { address })
.getOne();
.getMany();
}
async saveContract (repo: Repository<ContractInterface>, address: string, startingBlock: number, kind?: string): Promise<void> {
const numRows = await repo
async saveContract (repo: Repository<ContractInterface>, address: string, startingBlock: number, kind?: string): Promise<ContractInterface> {
const contract = await repo
.createQueryBuilder()
.where('address = :address', { address })
.getCount();
.getOne();
if (numRows === 0) {
const entity = repo.create({ address, kind, startingBlock });
await repo.save(entity);
const entity = repo.create({ address, kind, startingBlock });
// If contract already present, overwrite fields.
if (contract) {
entity.id = contract.id;
}
return repo.save(entity);
}
}

View File

@ -11,7 +11,8 @@ import { EthClient } from '@vulcanize/ipld-eth-client';
import { GetStorageAt, getStorageValue, StorageLayout } from '@vulcanize/solidity-mapper';
import { BlockProgressInterface, DatabaseInterface, EventInterface, SyncStatusInterface, ContractInterface } from './types';
import { UNKNOWN_EVENT_NAME } from './constants';
import { UNKNOWN_EVENT_NAME, JOB_KIND_CONTRACT, QUEUE_EVENT_PROCESSING } from './constants';
import { JobQueue } from './job-queue';
const MAX_EVENTS_BLOCK_RANGE = 1000;
@ -30,15 +31,31 @@ export class Indexer {
_postgraphileClient: EthClient;
_getStorageAt: GetStorageAt;
_ethProvider: ethers.providers.BaseProvider;
_jobQueue: JobQueue;
constructor (db: DatabaseInterface, ethClient: EthClient, postgraphileClient: EthClient, ethProvider: ethers.providers.BaseProvider) {
_watchedContracts: { [key: string]: ContractInterface } = {};
constructor (db: DatabaseInterface, ethClient: EthClient, postgraphileClient: EthClient, ethProvider: ethers.providers.BaseProvider, jobQueue: JobQueue) {
this._db = db;
this._ethClient = ethClient;
this._postgraphileClient = postgraphileClient;
this._ethProvider = ethProvider;
this._jobQueue = jobQueue;
this._getStorageAt = this._ethClient.getStorageAt.bind(this._ethClient);
}
async fetchContracts (): Promise<void> {
assert(this._db.getContracts);
const contracts = await this._db.getContracts();
this._watchedContracts = contracts.reduce((acc: { [key: string]: ContractInterface }, contract) => {
acc[contract.address] = contract;
return acc;
}, {});
}
async getSyncStatus (): Promise<SyncStatusInterface | undefined> {
const dbTx = await this._db.createTransactionRunner();
let res;
@ -152,12 +169,12 @@ export class Indexer {
}
}
async updateBlockProgress (blockHash: string, lastProcessedEventIndex: number): Promise<void> {
async updateBlockProgress (block: BlockProgressInterface, lastProcessedEventIndex: number): Promise<void> {
const dbTx = await this._db.createTransactionRunner();
let res;
try {
res = await this._db.updateBlockProgress(dbTx, blockHash, lastProcessedEventIndex);
res = await this._db.updateBlockProgress(dbTx, block, lastProcessedEventIndex);
await dbTx.commitTransaction();
} catch (error) {
await dbTx.rollbackTransaction();
@ -281,9 +298,39 @@ export class Indexer {
}
async isWatchedContract (address : string): Promise<ContractInterface | undefined> {
assert(this._db.getContract);
return this._watchedContracts[address];
}
return this._db.getContract(ethers.utils.getAddress(address));
async watchContract (address: string, kind: string, startingBlock: number): Promise<void> {
assert(this._db.saveContract);
const dbTx = await this._db.createTransactionRunner();
// Always use the checksum address (https://docs.ethers.io/v5/api/utils/address/#utils-getAddress).
const contractAddress = ethers.utils.getAddress(address);
try {
const contract = await this._db.saveContract(dbTx, contractAddress, kind, startingBlock);
this.cacheContract(contract);
await dbTx.commitTransaction();
await this._jobQueue.pushJob(
QUEUE_EVENT_PROCESSING,
{
kind: JOB_KIND_CONTRACT,
contract
},
{ priority: 1 }
);
} catch (error) {
await dbTx.rollbackTransaction();
throw error;
} finally {
await dbTx.release();
}
}
cacheContract (contract: ContractInterface): void {
this._watchedContracts[contract.address] = contract;
}
async getStorageValue (storageLayout: StorageLayout, blockHash: string, token: string, variable: string, ...mappingKeys: any[]): Promise<ValueResult> {

View File

@ -52,6 +52,10 @@ export class JobQueue {
await this._boss.start();
}
async stop (): Promise<void> {
await this._boss.stop();
}
async subscribe (queue: string, callback: JobCallback): Promise<string> {
return await this._boss.subscribe(
queue,

View File

@ -8,7 +8,7 @@ import { wait } from './misc';
import { createPruningJob } from './common';
import { JobQueueConfig } from './config';
import { JOB_KIND_INDEX, JOB_KIND_PRUNE, MAX_REORG_DEPTH, QUEUE_BLOCK_PROCESSING, QUEUE_EVENT_PROCESSING } from './constants';
import { JOB_KIND_INDEX, JOB_KIND_PRUNE, JOB_KIND_EVENT, JOB_KIND_CONTRACT, MAX_REORG_DEPTH, QUEUE_BLOCK_PROCESSING, QUEUE_EVENT_PROCESSING } from './constants';
import { JobQueue } from './job-queue';
import { EventInterface, IndexerInterface, SyncStatusInterface } from './types';
@ -46,34 +46,20 @@ export class JobRunner {
}
}
async processEvent (job: any): Promise<EventInterface> {
const { data: { id } } = job;
async processEvent (job: any): Promise<EventInterface | void> {
const { data: { kind } } = job;
log(`Processing event ${id}`);
switch (kind) {
case JOB_KIND_EVENT:
return this._processEvent(job);
const dbEvent = await this._indexer.getEvent(id);
assert(dbEvent);
case JOB_KIND_CONTRACT:
return this._updateWatchedContracts(job);
const event = dbEvent;
const blockProgress = await this._indexer.getBlockProgress(event.block.blockHash);
assert(blockProgress);
const events = await this._indexer.getBlockEvents(event.block.blockHash);
const eventIndex = events.findIndex((e: any) => e.id === event.id);
assert(eventIndex !== -1);
// Check if previous event in block has been processed exactly before this and abort if not.
if (eventIndex > 0) { // Skip the first event in the block.
const prevIndex = eventIndex - 1;
const prevEvent = events[prevIndex];
if (prevEvent.index !== blockProgress.lastProcessedEventIndex) {
throw new Error(`Events received out of order for block number ${event.block.blockNumber} hash ${event.block.blockHash},` +
` prev event index ${prevEvent.index}, got event index ${event.index} and lastProcessedEventIndex ${blockProgress.lastProcessedEventIndex}, aborting`);
}
default:
log(`Invalid Job kind ${kind} in QUEUE_EVENT_PROCESSING.`);
break;
}
return event;
}
async _pruneChain (job: any, syncStatus: SyncStatusInterface): Promise<void> {
@ -180,8 +166,37 @@ export class JobRunner {
const events = await this._indexer.getOrFetchBlockEvents({ blockHash, blockNumber, parentHash, blockTimestamp: timestamp });
for (let ei = 0; ei < events.length; ei++) {
await this._jobQueue.pushJob(QUEUE_EVENT_PROCESSING, { id: events[ei].id, publish: true });
await this._jobQueue.pushJob(QUEUE_EVENT_PROCESSING, { kind: JOB_KIND_EVENT, id: events[ei].id, publish: true });
}
}
}
async _processEvent (job: any): Promise<EventInterface> {
const { data: { id } } = job;
log(`Processing event ${id}`);
const event = await this._indexer.getEvent(id);
assert(event);
const eventIndex = event.index;
// Check if previous event in block has been processed exactly before this and abort if not.
if (eventIndex > 0) { // Skip the first event in the block.
const prevIndex = eventIndex - 1;
if (prevIndex !== event.block.lastProcessedEventIndex) {
throw new Error(`Events received out of order for block number ${event.block.blockNumber} hash ${event.block.blockHash},` +
` prev event index ${prevIndex}, got event index ${event.index} and lastProcessedEventIndex ${event.block.lastProcessedEventIndex}, aborting`);
}
}
return event;
}
async _updateWatchedContracts (job: any): Promise<void> {
const { data: { contract } } = job;
assert(this._indexer.cacheContract);
this._indexer.cacheContract(contract);
}
}

View File

@ -56,11 +56,12 @@ export interface IndexerInterface {
getAncestorAtDepth (blockHash: string, depth: number): Promise<string>
getOrFetchBlockEvents (block: DeepPartial<BlockProgressInterface>): Promise<Array<EventInterface>>
removeUnknownEvents (block: BlockProgressInterface): Promise<void>
updateBlockProgress (blockHash: string, lastProcessedEventIndex: number): Promise<void>
updateBlockProgress (block: BlockProgressInterface, lastProcessedEventIndex: number): Promise<void>
updateSyncStatusChainHead (blockHash: string, blockNumber: number): Promise<SyncStatusInterface>
updateSyncStatusIndexedBlock (blockHash: string, blockNumber: number, force?: boolean): Promise<SyncStatusInterface>
updateSyncStatusCanonicalBlock (blockHash: string, blockNumber: number, force?: boolean): Promise<SyncStatusInterface>
markBlocksAsPruned (blocks: BlockProgressInterface[]): Promise<void>;
cacheContract?: (contract: ContractInterface) => void;
}
export interface EventWatcherInterface {
@ -80,12 +81,13 @@ export interface DatabaseInterface {
getProcessedBlockCountForRange (fromBlockNumber: number, toBlockNumber: number): Promise<{ expected: number, actual: number }>;
getEventsInRange (fromBlockNumber: number, toBlockNumber: number): Promise<Array<EventInterface>>;
markBlocksAsPruned (queryRunner: QueryRunner, blocks: BlockProgressInterface[]): Promise<void>;
updateBlockProgress (queryRunner: QueryRunner, blockHash: string, lastProcessedEventIndex: number): Promise<void>
updateBlockProgress (queryRunner: QueryRunner, block: BlockProgressInterface, lastProcessedEventIndex: number): Promise<void>
updateSyncStatusIndexedBlock (queryRunner: QueryRunner, blockHash: string, blockNumber: number, force?: boolean): Promise<SyncStatusInterface>;
updateSyncStatusChainHead (queryRunner: QueryRunner, blockHash: string, blockNumber: number): Promise<SyncStatusInterface>;
updateSyncStatusCanonicalBlock (queryRunner: QueryRunner, blockHash: string, blockNumber: number, force?: boolean): Promise<SyncStatusInterface>;
saveEvents (queryRunner: QueryRunner, block: DeepPartial<BlockProgressInterface>, events: DeepPartial<EventInterface>[]): Promise<void>;
saveEventEntity (queryRunner: QueryRunner, entity: EventInterface): Promise<EventInterface>;
removeEntities<Entity> (queryRunner: QueryRunner, entity: new () => Entity, findConditions?: FindManyOptions<Entity> | FindConditions<Entity>): Promise<void>;
getContract?: (address: string) => Promise<ContractInterface | undefined>
getContracts?: () => Promise<ContractInterface[]>
saveContract?: (queryRunner: QueryRunner, contractAddress: string, kind: string, startingBlock: number) => Promise<ContractInterface>
}