Job queue to process events (#137)

* Job queue to process events.

* Event queue processing changes.
This commit is contained in:
Ashwin Phatak 2021-07-14 18:00:26 +05:30 committed by GitHub
parent a4f5d43bc5
commit 03ceb95a1b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
27 changed files with 525 additions and 293 deletions

View File

@ -42,7 +42,6 @@
"json-bigint": "^1.0.0", "json-bigint": "^1.0.0",
"lodash": "^4.17.21", "lodash": "^4.17.21",
"pg": "^8.6.0", "pg": "^8.6.0",
"pg-boss": "^6.1.0",
"reflect-metadata": "^0.1.13", "reflect-metadata": "^0.1.13",
"toml": "^3.0.0", "toml": "^3.0.0",
"typeorm": "^0.2.32", "typeorm": "^0.2.32",

View File

@ -6,10 +6,9 @@ import debug from 'debug';
import { getCache } from '@vulcanize/cache'; import { getCache } from '@vulcanize/cache';
import { EthClient } from '@vulcanize/ipld-eth-client'; import { EthClient } from '@vulcanize/ipld-eth-client';
import { getConfig, JobQueue } from '@vulcanize/util';
import { Database } from './database'; import { Database } from './database';
import { getConfig } from './config';
import { JobQueue } from './job-queue';
import { QUEUE_TX_TRACING } from './tx-watcher'; import { QUEUE_TX_TRACING } from './tx-watcher';
const log = debug('vulcanize:server'); const log = debug('vulcanize:server');

View File

@ -7,11 +7,10 @@ import debug from 'debug';
import { getCache } from '@vulcanize/cache'; import { getCache } from '@vulcanize/cache';
import { EthClient } from '@vulcanize/ipld-eth-client'; import { EthClient } from '@vulcanize/ipld-eth-client';
import { TracingClient } from '@vulcanize/tracing-client'; import { TracingClient } from '@vulcanize/tracing-client';
import { getConfig, JobQueue } from '@vulcanize/util';
import { Indexer } from './indexer'; import { Indexer } from './indexer';
import { Database } from './database'; import { Database } from './database';
import { getConfig } from './config';
import { JobQueue } from './job-queue';
import { QUEUE_TX_TRACING } from './tx-watcher'; import { QUEUE_TX_TRACING } from './tx-watcher';
const log = debug('vulcanize:server'); const log = debug('vulcanize:server');

View File

@ -10,7 +10,7 @@ import { createServer } from 'http';
import { getCache } from '@vulcanize/cache'; import { getCache } from '@vulcanize/cache';
import { EthClient } from '@vulcanize/ipld-eth-client'; import { EthClient } from '@vulcanize/ipld-eth-client';
import { TracingClient } from '@vulcanize/tracing-client'; import { TracingClient } from '@vulcanize/tracing-client';
import { getConfig } from '@vulcanize/util'; import { getConfig, JobQueue } from '@vulcanize/util';
import typeDefs from './schema'; import typeDefs from './schema';
@ -18,7 +18,6 @@ import { createResolvers } from './resolvers';
import { Indexer } from './indexer'; import { Indexer } from './indexer';
import { Database } from './database'; import { Database } from './database';
import { TxWatcher } from './tx-watcher'; import { TxWatcher } from './tx-watcher';
import { JobQueue } from './job-queue';
const log = debug('vulcanize:server'); const log = debug('vulcanize:server');

View File

@ -4,9 +4,9 @@ import _ from 'lodash';
import { PubSub } from 'apollo-server-express'; import { PubSub } from 'apollo-server-express';
import { EthClient } from '@vulcanize/ipld-eth-client'; import { EthClient } from '@vulcanize/ipld-eth-client';
import { JobQueue } from '@vulcanize/util';
import { Indexer } from './indexer'; import { Indexer } from './indexer';
import { JobQueue } from './job-queue';
import { BlockProgress } from './entity/BlockProgress'; import { BlockProgress } from './entity/BlockProgress';
const log = debug('vulcanize:tx-watcher'); const log = debug('vulcanize:tx-watcher');

View File

@ -291,7 +291,7 @@ export class Indexer {
} }
async _fetchAndSaveEvents ({ blockHash, token }: { blockHash: string, token: string }): Promise<void> { async _fetchAndSaveEvents ({ blockHash, token }: { blockHash: string, token: string }): Promise<void> {
const logs = await this._ethClient.getLogs({ blockHash, contract: token }); const { logs } = await this._ethClient.getLogs({ blockHash, contract: token });
const eventNameToTopic = getEventNameTopics(this._abi); const eventNameToTopic = getEventNameTopics(this._abi);
const logTopicToEventName = invert(eventNameToTopic); const logTopicToEventName = invert(eventNameToTopic);

View File

@ -65,21 +65,15 @@ export class EthClient {
async getLogs (vars: Vars): Promise<any> { async getLogs (vars: Vars): Promise<any> {
const result = await this._getCachedOrFetch('getLogs', vars); const result = await this._getCachedOrFetch('getLogs', vars);
const { getLogs: logs, block: { number: blockNumHex, timestamp: timestampHex } } = result; const { getLogs: resultLogs, block: { number: blockNumHex, timestamp: timestampHex } } = result;
const blockNumber = parseInt(blockNumHex, 16); const block = { hash: vars.blockHash, number: parseInt(blockNumHex, 16), timestamp: parseInt(timestampHex, 16) };
const timestamp = parseInt(timestampHex, 16); const logs = resultLogs.map((logEntry: any) => _.merge({}, logEntry, { transaction: { block }}));
return logs.map((logEntry: any) => { return { logs, block };
return _.merge({}, logEntry, {
transaction: {
block: {
hash: vars.blockHash,
number: blockNumber,
timestamp
} }
}
}); async watchBlocks (onNext: (value: any) => void): Promise<ZenObservable.Subscription> {
}); return this._graphqlClient.subscribe(ethQueries.subscribeBlocks, onNext);
} }
async watchLogs (onNext: (value: any) => void): Promise<ZenObservable.Subscription> { async watchLogs (onNext: (value: any) => void): Promise<ZenObservable.Subscription> {

View File

@ -75,6 +75,19 @@ subscription SubscriptionReceipt {
} }
`; `;
export const subscribeBlocks = gql`
subscription {
listen(topic: "header_cids") {
relatedNode {
... on EthHeaderCid {
blockHash
blockNumber
}
}
}
}
`;
export const subscribeTransactions = gql` export const subscribeTransactions = gql`
subscription SubscriptionHeader { subscription SubscriptionHeader {
listen(topic: "transaction_cids") { listen(topic: "transaction_cids") {
@ -96,5 +109,6 @@ export default {
getLogs, getLogs,
getBlockWithTransactions, getBlockWithTransactions,
subscribeLogs, subscribeLogs,
subscribeBlocks,
subscribeTransactions subscribeTransactions
}; };

View File

@ -193,7 +193,7 @@ export class Indexer {
} }
async _fetchAndSaveEvents ({ blockHash, token }: { blockHash: string, token: string }): Promise<void> { async _fetchAndSaveEvents ({ blockHash, token }: { blockHash: string, token: string }): Promise<void> {
const logs = await this._ethClient.getLogs({ blockHash, contract: token }); const { logs } = await this._ethClient.getLogs({ blockHash, contract: token });
const eventNameToTopic = {}; // getEventNameTopics(this._abi); const eventNameToTopic = {}; // getEventNameTopics(this._abi);
const logTopicToEventName = invert(eventNameToTopic); const logTopicToEventName = invert(eventNameToTopic);

View File

@ -1,5 +1,42 @@
# Uniswap Watcher # Uniswap Watcher
## Setup
Create a postgres12 database for the job queue:
```
sudo su - postgres
createdb uni-watcher-job-queue
```
Enable the `pgcrypto` extension on the job queue database (https://github.com/timgit/pg-boss/blob/master/docs/usage.md#intro).
Example:
```
postgres@tesla:~$ psql -U postgres -h localhost uni-watcher-job-queue
Password for user postgres:
psql (12.7 (Ubuntu 12.7-1.pgdg18.04+1))
SSL connection (protocol: TLSv1.3, cipher: TLS_AES_256_GCM_SHA384, bits: 256, compression: off)
Type "help" for help.
uni-watcher-job-queue=# CREATE EXTENSION pgcrypto;
CREATE EXTENSION
uni-watcher-job-queue=# exit
```
Create a postgres12 database for the address watcher:
```
sudo su - postgres
createdb uni-watcher
```
Update `environments/local.toml` with database connection settings for both the databases.
## Run
Run the server: Run the server:
```bash ```bash

View File

@ -29,3 +29,7 @@
name = "requests" name = "requests"
enabled = false enabled = false
deleteOnStart = false deleteOnStart = false
[jobQueue]
dbConnectionString = "postgres://postgres:postgres@localhost/uni-watcher-job-queue"
maxCompletionLag = 300

View File

@ -7,6 +7,8 @@
"scripts": { "scripts": {
"server": "DEBUG=vulcanize:* nodemon src/server.ts -f environments/local.toml", "server": "DEBUG=vulcanize:* nodemon src/server.ts -f environments/local.toml",
"server:mock": "MOCK=1 nodemon src/server.ts -f environments/local.toml", "server:mock": "MOCK=1 nodemon src/server.ts -f environments/local.toml",
"job-runner": "DEBUG=vulcanize:* nodemon src/job-runner.ts -f environments/local.toml",
"fill": "DEBUG=vulcanize:* ts-node src/fill.ts -f environments/local.toml",
"test": "mocha -r ts-node/register src/**/*.spec.ts", "test": "mocha -r ts-node/register src/**/*.spec.ts",
"lint": "eslint .", "lint": "eslint .",
"build": "tsc", "build": "tsc",

View File

@ -4,7 +4,7 @@ import { SnakeNamingStrategy } from 'typeorm-naming-strategies';
import { Event } from './entity/Event'; import { Event } from './entity/Event';
import { Contract } from './entity/Contract'; import { Contract } from './entity/Contract';
import { EventSyncProgress } from './entity/EventProgress'; import { BlockProgress } from './entity/BlockProgress';
export class Database { export class Database {
_config: ConnectionOptions _config: ConnectionOptions
@ -28,17 +28,7 @@ export class Database {
return this._conn.close(); return this._conn.close();
} }
// Returns true if events have already been synced for the (block, token) combination. async getBlockEvents (blockHash: string): Promise<Event[]> {
async didSyncEvents ({ blockHash }: { blockHash: string }): Promise<boolean> {
const numRows = await this._conn.getRepository(EventSyncProgress)
.createQueryBuilder()
.where('block_hash = :blockHash', { blockHash })
.getCount();
return numRows > 0;
}
async getBlockEvents ({ blockHash }: { blockHash: string }): Promise<Event[]> {
return this._conn.getRepository(Event) return this._conn.getRepository(Event)
.createQueryBuilder('event') .createQueryBuilder('event')
.where('block_hash = :blockHash', { blockHash }) .where('block_hash = :blockHash', { blockHash })
@ -46,7 +36,7 @@ export class Database {
.getMany(); .getMany();
} }
async getEvents ({ blockHash, contract }: { blockHash: string, contract: string }): Promise<Event[]> { async getEvents (blockHash: string, contract: string): Promise<Event[]> {
return this._conn.getRepository(Event) return this._conn.getRepository(Event)
.createQueryBuilder('event') .createQueryBuilder('event')
.where('block_hash = :blockHash AND contract = :contract', { .where('block_hash = :blockHash AND contract = :contract', {
@ -57,7 +47,7 @@ export class Database {
.getMany(); .getMany();
} }
async getEventsByName ({ blockHash, contract, eventName }: { blockHash: string, contract: string, eventName: string }): Promise<Event[] | undefined> { async getEventsByName (blockHash: string, contract: string, eventName: string): Promise<Event[] | undefined> {
return this._conn.getRepository(Event) return this._conn.getRepository(Event)
.createQueryBuilder('event') .createQueryBuilder('event')
.where('block_hash = :blockHash AND contract = :contract AND event_name = :eventName', { .where('block_hash = :blockHash AND contract = :contract AND event_name = :eventName', {
@ -68,35 +58,33 @@ export class Database {
.getMany(); .getMany();
} }
async saveEvents ({ blockHash, events }: { blockHash: string, events: DeepPartial<Event>[] }): Promise<void> { async saveEvents (blockHash: string, blockNumber: number, events: DeepPartial<Event>[]): Promise<void> {
// In a transaction: // In a transaction:
// (1) Save all the events in the database. // (1) Save all the events in the database.
// (2) Add an entry to the event progress table. // (2) Add an entry to the block progress table.
await this._conn.transaction(async (tx) => { await this._conn.transaction(async (tx) => {
const repo = tx.getRepository(EventSyncProgress); const numEvents = events.length;
const blockProgressRepo = tx.getRepository(BlockProgress);
// Check sync progress inside the transaction. const blockProgress = await blockProgressRepo.findOne({ where: { blockHash } });
const numRows = await repo if (!blockProgress) {
.createQueryBuilder()
.where('block_hash = :blockHash', { blockHash })
.getCount();
if (numRows === 0) {
// Bulk insert events. // Bulk insert events.
await tx.createQueryBuilder() await tx.createQueryBuilder().insert().into(Event).values(events).execute();
.insert()
.into(Event)
.values(events)
.execute();
// Update event sync progress. const entity = blockProgressRepo.create({ blockHash, blockNumber, numEvents, numProcessedEvents: 0, isComplete: (numEvents === 0) });
const progress = repo.create({ blockHash }); await blockProgressRepo.save(entity);
await repo.save(progress);
} }
}); });
} }
async getEvent (id: string): Promise<Event | undefined> {
return this._conn.getRepository(Event).findOne(id);
}
async saveEventEntity (entity: Event): Promise<Event> {
const repo = this._conn.getRepository(Event);
return await repo.save(entity);
}
async getContract (address: string): Promise<Contract | undefined> { async getContract (address: string): Promise<Contract | undefined> {
return this._conn.getRepository(Contract) return this._conn.getRepository(Contract)
.createQueryBuilder('contract') .createQueryBuilder('contract')
@ -119,4 +107,23 @@ export class Database {
} }
}); });
} }
async getBlockProgress (blockHash: string): Promise<BlockProgress | undefined> {
const repo = this._conn.getRepository(BlockProgress);
return repo.findOne({ where: { blockHash } });
}
async updateBlockProgress (blockHash: string): Promise<void> {
await this._conn.transaction(async (tx) => {
const repo = tx.getRepository(BlockProgress);
const entity = await repo.findOne({ where: { blockHash } });
if (entity && !entity.isComplete) {
entity.numProcessedEvents++;
if (entity.numProcessedEvents >= entity.numEvents) {
entity.isComplete = true;
}
await repo.save(entity);
}
});
}
} }

View File

@ -0,0 +1,23 @@
import { Entity, PrimaryGeneratedColumn, Column, Index } from 'typeorm';
@Entity()
@Index(['blockHash'], { unique: true })
export class BlockProgress {
@PrimaryGeneratedColumn()
id!: number;
@Column('varchar', { length: 66 })
blockHash!: string;
@Column('numeric')
blockNumber!: number;
@Column('numeric')
numEvents!: number;
@Column('numeric')
numProcessedEvents!: number;
@Column('boolean')
isComplete!: boolean
}

View File

@ -1,12 +1,15 @@
import { Entity, PrimaryColumn, Column } from 'typeorm'; import { Entity, PrimaryGeneratedColumn, Column, Index } from 'typeorm';
export const KIND_FACTORY = 'factory'; export const KIND_FACTORY = 'factory';
export const KIND_POOL = 'pool'; export const KIND_POOL = 'pool';
@Entity() @Entity()
@Index(['address'], { unique: true })
export class Contract { export class Contract {
@PrimaryColumn('varchar', { length: 42 }) @PrimaryGeneratedColumn()
id!: number;
@Column('varchar', { length: 42 })
address!: string; address!: string;
@Column('varchar', { length: 8 }) @Column('varchar', { length: 8 })

View File

@ -1,5 +1,7 @@
import { Entity, PrimaryGeneratedColumn, Column, Index } from 'typeorm'; import { Entity, PrimaryGeneratedColumn, Column, Index } from 'typeorm';
export const UNKNOWN_EVENT_NAME = '__unknown__';
@Entity() @Entity()
// Index to query all events for a contract efficiently. // Index to query all events for a contract efficiently.
@Index(['blockHash', 'contract']) @Index(['blockHash', 'contract'])
@ -36,9 +38,6 @@ export class Event {
@Column('text') @Column('text')
extraInfo!: string; extraInfo!: string;
@Column('boolean', { default: false })
isProcessed!: boolean;
@Column('text') @Column('text')
proof!: string; proof!: string;
} }

View File

@ -1,17 +0,0 @@
import { Entity, PrimaryGeneratedColumn, Column, Index } from 'typeorm';
// Stores a row if events for a (block, token) combination have already been fetched.
//
// Required as a particular block may not have events from a particular contract,
// and we need to differentiate between that case and the case where data hasn't
// yet been synced from upstream.
//
@Entity()
@Index(['blockHash'], { unique: true })
export class EventSyncProgress {
@PrimaryGeneratedColumn()
id!: number;
@Column('varchar', { length: 66 })
blockHash!: string;
}

View File

@ -1,79 +1,115 @@
import assert from 'assert'; import assert from 'assert';
import debug from 'debug'; import debug from 'debug';
import _ from 'lodash'; import _ from 'lodash';
import { PubSub } from 'apollo-server-express';
import { EthClient } from '@vulcanize/ipld-eth-client'; import { EthClient } from '@vulcanize/ipld-eth-client';
import { JobQueue } from '@vulcanize/util';
import { Indexer } from './indexer'; import { Indexer } from './indexer';
import { BlockProgress } from './entity/BlockProgress';
import { UNKNOWN_EVENT_NAME } from './entity/Event';
const log = debug('vulcanize:events'); const log = debug('vulcanize:events');
export const UniswapEvent = 'uniswap-event';
export const BlockProgressEvent = 'block-progress-event';
export const QUEUE_EVENT_PROCESSING = 'event-processing';
export const QUEUE_BLOCK_PROCESSING = 'block-processing';
export class EventWatcher { export class EventWatcher {
_ethClient: EthClient _ethClient: EthClient
_indexer: Indexer _indexer: Indexer
_subscription: ZenObservable.Subscription | undefined _subscription: ZenObservable.Subscription | undefined
_pubsub: PubSub
_jobQueue: JobQueue
constructor (ethClient: EthClient, indexer: Indexer) { constructor (ethClient: EthClient, indexer: Indexer, pubsub: PubSub, jobQueue: JobQueue) {
assert(ethClient);
assert(indexer);
this._ethClient = ethClient; this._ethClient = ethClient;
this._indexer = indexer; this._indexer = indexer;
this._pubsub = pubsub;
this._jobQueue = jobQueue;
}
getEventIterator (): AsyncIterator<any> {
return this._pubsub.asyncIterator([UniswapEvent]);
}
getBlockProgressEventIterator (): AsyncIterator<any> {
return this._pubsub.asyncIterator([BlockProgressEvent]);
} }
async start (): Promise<void> { async start (): Promise<void> {
assert(!this._subscription, 'subscription already started'); assert(!this._subscription, 'subscription already started');
log('Started watching upstream logs...'); log('Started watching upstream blocks...');
this._subscription = await this._ethClient.watchLogs(async (value) => { this._jobQueue.onComplete(QUEUE_BLOCK_PROCESSING, async (job) => {
const receipt = _.get(value, 'data.listen.relatedNode'); const { data: { request: { data: { blockHash, blockNumber } } } } = job;
log('watchLogs', JSON.stringify(receipt, null, 2)); log(`Job onComplete block ${blockHash} ${blockNumber}`);
});
const blocks: string[] = []; this._jobQueue.onComplete(QUEUE_EVENT_PROCESSING, async (job) => {
const { data: { request, failed, state, createdOn } } = job;
const { logContracts } = receipt; await this._indexer.updateBlockProgress(request.data.blockHash);
if (logContracts && logContracts.length) { const blockProgress = await this._indexer.getBlockProgress(request.data.blockHash);
for (let logIndex = 0; logIndex < logContracts.length; logIndex++) { if (blockProgress && request.data.publishBlockProgress) {
const { ethTransactionCidByTxId: { ethHeaderCidByHeaderId: { blockHash } } } = receipt; await this.publishBlockProgressToSubscribers(blockProgress);
await this._indexer.getBlockEvents(blockHash); }
blocks.push(blockHash);
const timeElapsedInSeconds = (Date.now() - Date.parse(createdOn)) / 1000;
log(`Job onComplete event ${request.data.id} publish ${!!request.data.publish}`);
if (!failed && state === 'completed' && request.data.publish) {
// Check for max acceptable lag time between request and sending results to live subscribers.
if (timeElapsedInSeconds <= this._jobQueue.maxCompletionLag) {
return await this.publishUniswapEventToSubscribers(request.data.id, timeElapsedInSeconds);
} else {
log(`event ${request.data.id} is too old (${timeElapsedInSeconds}s), not broadcasting to live subscribers`);
}
}
});
this._subscription = await this._ethClient.watchBlocks(async (value) => {
const { blockHash, blockNumber } = _.get(value, 'data.listen.relatedNode');
log('watchBlock', blockHash, blockNumber);
await this._jobQueue.pushJob(QUEUE_BLOCK_PROCESSING, { blockHash, blockNumber });
});
}
async publishUniswapEventToSubscribers (id: string, timeElapsedInSeconds: number): Promise<void> {
const dbEvent = await this._indexer.getEvent(id);
if (dbEvent && dbEvent.eventName !== UNKNOWN_EVENT_NAME) {
const resultEvent = this._indexer.getResultEvent(dbEvent);
log(`pushing event to GQL subscribers (${timeElapsedInSeconds}s elapsed): ${resultEvent.event.__typename}`);
// Publishing the event here will result in pushing the payload to GQL subscribers for `onEvent`.
await this._pubsub.publish(UniswapEvent, {
onEvent: resultEvent
});
} }
} }
const processedBlocks: any = {}; async publishBlockProgressToSubscribers (blockProgress: BlockProgress): Promise<void> {
if (!blocks.length) { const { blockHash, blockNumber, numEvents, numProcessedEvents, isComplete } = blockProgress;
return;
}
// Process events, if from known uniswap contracts. // Publishing the event here will result in pushing the payload to GQL subscribers for `onAddressEvent(address)`.
for (let bi = 0; bi < blocks.length; bi++) { await this._pubsub.publish(BlockProgressEvent, {
const blockHash = blocks[bi]; onBlockProgressEvent: {
if (processedBlocks[blockHash]) { blockHash,
continue; blockNumber,
} numEvents,
numProcessedEvents,
const events = await this._indexer.getBlockEvents(blockHash); isComplete
for (let ei = 0; ei < events.length; ei++) {
const eventObj = events[ei];
const uniContract = await this._indexer.isUniswapContract(eventObj.contract);
if (uniContract) {
log('event', JSON.stringify(eventObj, null, 2));
// TODO: Move processing to background queue (need sequential processing of events).
// Trigger other indexer methods based on event topic.
await this._indexer.processEvent(eventObj);
}
}
processedBlocks[blockHash] = true;
} }
}); });
} }
async stop (): Promise<void> { async stop (): Promise<void> {
if (this._subscription) { if (this._subscription) {
log('Stopped watching upstream logs'); log('Stopped watching upstream blocks');
this._subscription.unsubscribe(); this._subscription.unsubscribe();
} }
} }

View File

@ -1,17 +1,15 @@
import assert from 'assert';
import debug from 'debug'; import debug from 'debug';
import _ from 'lodash';
import { DeepPartial } from 'typeorm'; import { DeepPartial } from 'typeorm';
import JSONbig from 'json-bigint'; import JSONbig from 'json-bigint';
import { ethers } from 'ethers'; import { ethers } from 'ethers';
import { PubSub } from 'apollo-server-express';
import { EthClient } from '@vulcanize/ipld-eth-client'; import { EthClient } from '@vulcanize/ipld-eth-client';
import { GetStorageAt } from '@vulcanize/solidity-mapper'; import { GetStorageAt } from '@vulcanize/solidity-mapper';
import { Config } from '@vulcanize/util'; import { Config } from '@vulcanize/util';
import { Database } from './database'; import { Database } from './database';
import { Event } from './entity/Event'; import { Event, UNKNOWN_EVENT_NAME } from './entity/Event';
import { BlockProgress } from './entity/BlockProgress';
import { Contract, KIND_FACTORY, KIND_POOL } from './entity/Contract'; import { Contract, KIND_FACTORY, KIND_POOL } from './entity/Contract';
import factoryABI from './artifacts/factory.json'; import factoryABI from './artifacts/factory.json';
@ -35,48 +33,21 @@ export class Indexer {
_config: Config; _config: Config;
_db: Database _db: Database
_ethClient: EthClient _ethClient: EthClient
_pubsub: PubSub
_getStorageAt: GetStorageAt _getStorageAt: GetStorageAt
_factoryContract: ethers.utils.Interface _factoryContract: ethers.utils.Interface
_poolContract: ethers.utils.Interface _poolContract: ethers.utils.Interface
constructor (config: Config, db: Database, ethClient: EthClient, pubsub: PubSub) { constructor (config: Config, db: Database, ethClient: EthClient) {
assert(config);
assert(db);
assert(ethClient);
assert(pubsub);
this._config = config; this._config = config;
this._db = db; this._db = db;
this._ethClient = ethClient; this._ethClient = ethClient;
this._pubsub = pubsub;
this._getStorageAt = this._ethClient.getStorageAt.bind(this._ethClient); this._getStorageAt = this._ethClient.getStorageAt.bind(this._ethClient);
this._factoryContract = new ethers.utils.Interface(factoryABI); this._factoryContract = new ethers.utils.Interface(factoryABI);
this._poolContract = new ethers.utils.Interface(poolABI); this._poolContract = new ethers.utils.Interface(poolABI);
} }
getEventIterator (): AsyncIterator<any> {
return this._pubsub.asyncIterator(['event']);
}
async getBlockEvents (blockHash: string): Promise<Array<Event>> {
const didSyncEvents = await this._db.didSyncEvents({ blockHash });
if (!didSyncEvents) {
// Fetch and save events first and make a note in the event sync progress table.
await this.fetchAndSaveEvents({ blockHash });
log('getEvents: db miss, fetching from upstream server');
}
assert(await this._db.didSyncEvents({ blockHash }));
const events = await this._db.getBlockEvents({ blockHash });
log(`getEvents: db hit, num events: ${events.length}`);
return events;
}
getResultEvent (event: Event): ResultEvent { getResultEvent (event: Event): ResultEvent {
const eventFields = JSON.parse(event.eventInfo); const eventFields = JSON.parse(event.eventInfo);
@ -100,29 +71,40 @@ export class Indexer {
}, },
// TODO: Return proof only if requested. // TODO: Return proof only if requested.
proof: JSON.parse(event.proof), proof: JSON.parse(event.proof)
}; };
} }
async getEvents (blockHash: string, contract: string, name: string | null): Promise<Array<Event>> { // Note: Some event names might be unknown at this point, as earlier events might not yet be processed.
async getOrFetchBlockEvents (blockHash: string): Promise<Array<Event>> {
const blockProgress = await this._db.getBlockProgress(blockHash);
if (!blockProgress) {
// Fetch and save events first and make a note in the event sync progress table.
await this.fetchAndSaveEvents(blockHash);
log('getBlockEvents: db miss, fetching from upstream server');
}
const events = await this._db.getBlockEvents(blockHash);
log(`getBlockEvents: db hit, num events: ${events.length}`);
return events;
}
async getEventsByFilter (blockHash: string, contract: string, name: string | null): Promise<Array<Event>> {
const uniContract = await this.isUniswapContract(contract); const uniContract = await this.isUniswapContract(contract);
if (!uniContract) { if (!uniContract) {
throw new Error('Not a uniswap contract'); throw new Error('Not a uniswap contract');
} }
const didSyncEvents = await this._db.didSyncEvents({ blockHash }); // Fetch block events first.
if (!didSyncEvents) { await this.getOrFetchBlockEvents(blockHash);
// Fetch and save events first and make a note in the event sync progress table.
await this.fetchAndSaveEvents({ blockHash });
log('getEvents: db miss, fetching from upstream server');
}
assert(await this._db.didSyncEvents({ blockHash })); const events = await this._db.getEvents(blockHash, contract);
const events = await this._db.getEvents({ blockHash, contract });
log(`getEvents: db hit, num events: ${events.length}`); log(`getEvents: db hit, num events: ${events.length}`);
// Filtering.
const result = events const result = events
// TODO: Filter using db WHERE condition on contract.
.filter(event => contract === event.contract) .filter(event => contract === event.contract)
// TODO: Filter using db WHERE condition when name is not empty. // TODO: Filter using db WHERE condition when name is not empty.
.filter(event => !name || name === event.eventName); .filter(event => !name || name === event.eventName);
@ -141,17 +123,6 @@ export class Indexer {
} }
} }
async publishEventToSubscribers (dbEvent: Event): Promise<void> {
const resultEvent = this.getResultEvent(dbEvent);
log(`pushing event to GQL subscribers: ${resultEvent.event.__typename}`);
// Publishing the event here will result in pushing the payload to GQL subscribers for `onEvent`.
await this._pubsub.publish('event', {
onEvent: resultEvent
});
}
async isUniswapContract (address: string): Promise<Contract | undefined> { async isUniswapContract (address: string): Promise<Contract | undefined> {
return this._db.getContract(ethers.utils.getAddress(address)); return this._db.getContract(ethers.utils.getAddress(address));
} }
@ -159,48 +130,15 @@ export class Indexer {
async processEvent (event: Event): Promise<void> { async processEvent (event: Event): Promise<void> {
// Trigger indexing of data based on the event. // Trigger indexing of data based on the event.
await this.triggerIndexingOnEvent(event); await this.triggerIndexingOnEvent(event);
// Also trigger downstream event watcher subscriptions.
await this.publishEventToSubscribers(event);
} }
async fetchAndSaveEvents ({ blockHash }: { blockHash: string }): Promise<void> { parseEventNameAndArgs (kind: string, logObj: any): any {
const logs = await this._ethClient.getLogs({ blockHash }); let eventName = UNKNOWN_EVENT_NAME;
const dbEvents: Array<DeepPartial<Event>> = [];
for (let li = 0; li < logs.length; li++) {
const logObj = logs[li];
const {
topics,
data,
index: logIndex,
cid,
ipldBlock,
account: {
address
},
transaction: {
hash: txHash,
block: {
number: blockNumber,
timestamp: blockTimestamp
}
}
} = logObj;
let eventName;
let eventInfo = {}; let eventInfo = {};
let extraInfo = {};
const contract = ethers.utils.getAddress(address); const { topics, data } = logObj;
const uniContract = await this.isUniswapContract(contract);
if (!uniContract) {
// TODO: Can only be known if events are processed serially.
continue;
}
switch (uniContract.kind) { switch (kind) {
case KIND_FACTORY: { case KIND_FACTORY: {
const logDescription = this._factoryContract.parseLog({ data, topics }); const logDescription = this._factoryContract.parseLog({ data, topics });
switch (logDescription.name) { switch (logDescription.name) {
@ -275,7 +213,47 @@ export class Indexer {
} }
} }
if (eventName) { return { eventName, eventInfo };
}
async fetchAndSaveEvents (blockHash: string): Promise<void> {
const { block, logs } = await this._ethClient.getLogs({ blockHash });
const dbEvents: Array<DeepPartial<Event>> = [];
for (let li = 0; li < logs.length; li++) {
const logObj = logs[li];
const {
topics,
data,
index: logIndex,
cid,
ipldBlock,
account: {
address
},
transaction: {
hash: txHash,
block: {
number: blockNumber,
timestamp: blockTimestamp
}
}
} = logObj;
let eventName = UNKNOWN_EVENT_NAME;
let eventInfo = {};
const extraInfo = { topics, data };
const contract = ethers.utils.getAddress(address);
const uniContract = await this.isUniswapContract(contract);
if (uniContract) {
const eventDetails = this.parseEventNameAndArgs(uniContract.kind, logObj);
eventName = eventDetails.eventName;
eventInfo = eventDetails.eventInfo;
}
dbEvents.push({ dbEvents.push({
blockHash, blockHash,
blockNumber, blockNumber,
@ -297,9 +275,23 @@ export class Indexer {
}) })
}); });
} }
await this._db.saveEvents(blockHash, block.number, dbEvents);
} }
const events: DeepPartial<Event>[] = _.compact(dbEvents); async getEvent (id: string): Promise<Event | undefined> {
await this._db.saveEvents({ blockHash, events }); return this._db.getEvent(id);
}
async saveEventEntity (dbEvent: Event): Promise<Event> {
return this._db.saveEventEntity(dbEvent);
}
async getBlockProgress (blockHash: string): Promise<BlockProgress | undefined> {
return this._db.getBlockProgress(blockHash);
}
async updateBlockProgress (blockHash: string): Promise<void> {
return this._db.updateBlockProgress(blockHash);
} }
} }

View File

@ -0,0 +1,103 @@
import assert from 'assert';
import 'reflect-metadata';
import yargs from 'yargs';
import { hideBin } from 'yargs/helpers';
import debug from 'debug';
import { getCache } from '@vulcanize/cache';
import { EthClient } from '@vulcanize/ipld-eth-client';
import { getConfig, JobQueue } from '@vulcanize/util';
import { Indexer } from './indexer';
import { Database } from './database';
import { UNKNOWN_EVENT_NAME } from './entity/Event';
import { QUEUE_BLOCK_PROCESSING, QUEUE_EVENT_PROCESSING } from './events';
const log = debug('vulcanize:job-runner');
export const main = async (): Promise<any> => {
const argv = await yargs(hideBin(process.argv))
.option('f', {
alias: 'config-file',
demandOption: true,
describe: 'configuration file path (toml)',
type: 'string'
})
.argv;
const config = await getConfig(argv.f);
assert(config.server, 'Missing server config');
const { upstream, database: dbConfig, jobQueue: jobQueueConfig } = config;
assert(dbConfig, 'Missing database config');
const db = new Database(dbConfig);
await db.init();
assert(upstream, 'Missing upstream config');
const { gqlEndpoint, gqlSubscriptionEndpoint, cache: cacheConfig } = upstream;
assert(gqlEndpoint, 'Missing upstream gqlEndpoint');
assert(gqlSubscriptionEndpoint, 'Missing upstream gqlSubscriptionEndpoint');
const cache = await getCache(cacheConfig);
const ethClient = new EthClient({ gqlEndpoint, gqlSubscriptionEndpoint, cache });
const indexer = new Indexer(config, db, ethClient);
assert(jobQueueConfig, 'Missing job queue config');
const { dbConnectionString, maxCompletionLag } = jobQueueConfig;
assert(dbConnectionString, 'Missing job queue db connection string');
const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag });
await jobQueue.start();
await jobQueue.subscribe(QUEUE_BLOCK_PROCESSING, async (job) => {
const { data: { blockHash, blockNumber } } = job;
log(`Processing block ${blockHash} ${blockNumber}`);
const events = await indexer.getOrFetchBlockEvents(blockHash);
for (let ei = 0; ei < events.length; ei++) {
const { blockHash, id } = events[ei];
await jobQueue.pushJob(QUEUE_EVENT_PROCESSING, { blockHash, id, publish: true });
}
await jobQueue.markComplete(job);
});
await jobQueue.subscribe(QUEUE_EVENT_PROCESSING, async (job) => {
const { data: { id } } = job;
log(`Processing event ${id}`);
let dbEvent = await indexer.getEvent(id);
assert(dbEvent);
const uniContract = await indexer.isUniswapContract(dbEvent.contract);
if (uniContract) {
// We might not have parsed this event yet. This can happen if the contract was added
// as a result of a previous event in the same block.
if (dbEvent.eventName === UNKNOWN_EVENT_NAME) {
const logObj = JSON.parse(dbEvent.extraInfo);
const { eventName, eventInfo } = indexer.parseEventNameAndArgs(uniContract.kind, logObj);
dbEvent.eventName = eventName;
dbEvent.eventInfo = JSON.stringify(eventInfo);
dbEvent = await indexer.saveEventEntity(dbEvent);
}
await indexer.processEvent(dbEvent);
}
await jobQueue.markComplete(job);
});
};
main().then(() => {
log('Starting job runner...');
}).catch(err => {
log(err);
});

View File

@ -3,12 +3,11 @@ import BigInt from 'apollo-type-bigint';
import debug from 'debug'; import debug from 'debug';
import { Indexer } from './indexer'; import { Indexer } from './indexer';
import { EventWatcher } from './events';
const log = debug('vulcanize:resolver'); const log = debug('vulcanize:resolver');
export const createResolvers = async (indexer: Indexer): Promise<any> => { export const createResolvers = async (indexer: Indexer, eventWatcher: EventWatcher): Promise<any> => {
assert(indexer);
return { return {
BigInt: new BigInt('bigInt'), BigInt: new BigInt('bigInt'),
@ -46,7 +45,11 @@ export const createResolvers = async (indexer: Indexer): Promise<any> => {
Subscription: { Subscription: {
onEvent: { onEvent: {
subscribe: () => indexer.getEventIterator() subscribe: () => eventWatcher.getEventIterator()
},
onBlockProgressEvent: {
subscribe: () => eventWatcher.getBlockProgressEventIterator()
} }
}, },
@ -54,8 +57,14 @@ export const createResolvers = async (indexer: Indexer): Promise<any> => {
events: async (_: any, { blockHash, contract, name }: { blockHash: string, contract: string, name: string }) => { events: async (_: any, { blockHash, contract, name }: { blockHash: string, contract: string, name: string }) => {
log('events', blockHash, contract, name || ''); log('events', blockHash, contract, name || '');
const events = await indexer.getEvents(blockHash, contract, name);
const blockProgress = await indexer.getBlockProgress(blockHash);
if (!blockProgress || !blockProgress.isComplete) {
// TODO: Trigger indexing for the block.
throw new Error('Not available');
}
const events = await indexer.getEventsByFilter(blockHash, contract, name);
return events.map(event => indexer.getResultEvent(event)); return events.map(event => indexer.getResultEvent(event));
} }
} }

View File

@ -187,6 +187,13 @@ type ResultEvent {
proof: Proof proof: Proof
} }
type BlockProgressEvent {
blockNumber: Int!
blockHash: String!
numEvents: Int!
numProcessedEvents: Int!
isComplete: Boolean!
}
# #
# Queries # Queries
@ -245,7 +252,10 @@ type Query {
# #
type Subscription { type Subscription {
# Watch for events (at head of chain). # Watch for Wniswap events (at head of chain).
onEvent: ResultEvent! onEvent: ResultEvent!
# Watch for block progress events from filler process.
onBlockProgressEvent: BlockProgressEvent!
} }
`; `;

View File

@ -10,7 +10,7 @@ import { createServer } from 'http';
import { getCache } from '@vulcanize/cache'; import { getCache } from '@vulcanize/cache';
import { EthClient } from '@vulcanize/ipld-eth-client'; import { EthClient } from '@vulcanize/ipld-eth-client';
import { getConfig } from '@vulcanize/util'; import { getConfig, JobQueue } from '@vulcanize/util';
import typeDefs from './schema'; import typeDefs from './schema';
@ -38,7 +38,7 @@ export const main = async (): Promise<any> => {
const { host, port } = config.server; const { host, port } = config.server;
const { upstream, database: dbConfig } = config; const { upstream, database: dbConfig, jobQueue: jobQueueConfig } = config;
assert(dbConfig, 'Missing database config'); assert(dbConfig, 'Missing database config');
@ -57,12 +57,20 @@ export const main = async (): Promise<any> => {
// Note: In-memory pubsub works fine for now, as each watcher is a single process anyway. // Note: In-memory pubsub works fine for now, as each watcher is a single process anyway.
// Later: https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries // Later: https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries
const pubsub = new PubSub(); const pubsub = new PubSub();
const indexer = new Indexer(config, db, ethClient, pubsub); const indexer = new Indexer(config, db, ethClient);
const eventWatcher = new EventWatcher(ethClient, indexer); assert(jobQueueConfig, 'Missing job queue config');
const { dbConnectionString, maxCompletionLag } = jobQueueConfig;
assert(dbConnectionString, 'Missing job queue db connection string');
const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag });
await jobQueue.start();
const eventWatcher = new EventWatcher(ethClient, indexer, pubsub, jobQueue);
await eventWatcher.start(); await eventWatcher.start();
const resolvers = process.env.MOCK ? await createMockResolvers() : await createResolvers(indexer); const resolvers = process.env.MOCK ? await createMockResolvers() : await createResolvers(indexer, eventWatcher);
const app: Application = express(); const app: Application = express();
const server = new ApolloServer({ const server = new ApolloServer({

View File

@ -14,5 +14,14 @@
}, },
"plugins": [ "plugins": [
"@typescript-eslint" "@typescript-eslint"
],
"rules": {
"@typescript-eslint/no-explicit-any": "off",
"@typescript-eslint/explicit-module-boundary-types": [
"warn",
{
"allowArgumentsExplicitlyTypedAsAny": true
}
] ]
} }
}

View File

@ -1,2 +1,3 @@
export * from './src/config'; export * from './src/config';
export * from './src/database'; export * from './src/database';
export * from './src/job-queue';

View File

@ -7,6 +7,8 @@
"debug": "^4.3.1", "debug": "^4.3.1",
"ethers": "^5.2.0", "ethers": "^5.2.0",
"fs-extra": "^10.0.0", "fs-extra": "^10.0.0",
"pg": "^8.6.0",
"pg-boss": "^6.1.0",
"toml": "^3.0.0" "toml": "^3.0.0"
}, },
"devDependencies": { "devDependencies": {