diff --git a/packages/address-watcher/package.json b/packages/address-watcher/package.json index e2a0e0a8..74690a1e 100644 --- a/packages/address-watcher/package.json +++ b/packages/address-watcher/package.json @@ -42,7 +42,6 @@ "json-bigint": "^1.0.0", "lodash": "^4.17.21", "pg": "^8.6.0", - "pg-boss": "^6.1.0", "reflect-metadata": "^0.1.13", "toml": "^3.0.0", "typeorm": "^0.2.32", diff --git a/packages/address-watcher/src/fill.ts b/packages/address-watcher/src/fill.ts index f51aca48..79376f17 100644 --- a/packages/address-watcher/src/fill.ts +++ b/packages/address-watcher/src/fill.ts @@ -6,10 +6,9 @@ import debug from 'debug'; import { getCache } from '@vulcanize/cache'; import { EthClient } from '@vulcanize/ipld-eth-client'; +import { getConfig, JobQueue } from '@vulcanize/util'; import { Database } from './database'; -import { getConfig } from './config'; -import { JobQueue } from './job-queue'; import { QUEUE_TX_TRACING } from './tx-watcher'; const log = debug('vulcanize:server'); diff --git a/packages/address-watcher/src/job-runner.ts b/packages/address-watcher/src/job-runner.ts index 6df5f253..5f70d0cf 100644 --- a/packages/address-watcher/src/job-runner.ts +++ b/packages/address-watcher/src/job-runner.ts @@ -7,11 +7,10 @@ import debug from 'debug'; import { getCache } from '@vulcanize/cache'; import { EthClient } from '@vulcanize/ipld-eth-client'; import { TracingClient } from '@vulcanize/tracing-client'; +import { getConfig, JobQueue } from '@vulcanize/util'; import { Indexer } from './indexer'; import { Database } from './database'; -import { getConfig } from './config'; -import { JobQueue } from './job-queue'; import { QUEUE_TX_TRACING } from './tx-watcher'; const log = debug('vulcanize:server'); diff --git a/packages/address-watcher/src/server.ts b/packages/address-watcher/src/server.ts index e55f51b6..edc88be6 100644 --- a/packages/address-watcher/src/server.ts +++ b/packages/address-watcher/src/server.ts @@ -10,7 +10,7 @@ import { createServer } from 'http'; import { getCache } from '@vulcanize/cache'; import { EthClient } from '@vulcanize/ipld-eth-client'; import { TracingClient } from '@vulcanize/tracing-client'; -import { getConfig } from '@vulcanize/util'; +import { getConfig, JobQueue } from '@vulcanize/util'; import typeDefs from './schema'; @@ -18,7 +18,6 @@ import { createResolvers } from './resolvers'; import { Indexer } from './indexer'; import { Database } from './database'; import { TxWatcher } from './tx-watcher'; -import { JobQueue } from './job-queue'; const log = debug('vulcanize:server'); diff --git a/packages/address-watcher/src/tx-watcher.ts b/packages/address-watcher/src/tx-watcher.ts index 5691d6df..561f64d2 100644 --- a/packages/address-watcher/src/tx-watcher.ts +++ b/packages/address-watcher/src/tx-watcher.ts @@ -4,9 +4,9 @@ import _ from 'lodash'; import { PubSub } from 'apollo-server-express'; import { EthClient } from '@vulcanize/ipld-eth-client'; +import { JobQueue } from '@vulcanize/util'; import { Indexer } from './indexer'; -import { JobQueue } from './job-queue'; import { BlockProgress } from './entity/BlockProgress'; const log = debug('vulcanize:tx-watcher'); diff --git a/packages/erc20-watcher/src/indexer.ts b/packages/erc20-watcher/src/indexer.ts index 02f80d14..bcb3a406 100644 --- a/packages/erc20-watcher/src/indexer.ts +++ b/packages/erc20-watcher/src/indexer.ts @@ -291,7 +291,7 @@ export class Indexer { } async _fetchAndSaveEvents ({ blockHash, token }: { blockHash: string, token: string }): Promise { - const logs = await this._ethClient.getLogs({ blockHash, contract: token }); + const { logs } = await this._ethClient.getLogs({ blockHash, contract: token }); const eventNameToTopic = getEventNameTopics(this._abi); const logTopicToEventName = invert(eventNameToTopic); diff --git a/packages/ipld-eth-client/src/eth-client.ts b/packages/ipld-eth-client/src/eth-client.ts index 2c6fe4a6..74fa3f7d 100644 --- a/packages/ipld-eth-client/src/eth-client.ts +++ b/packages/ipld-eth-client/src/eth-client.ts @@ -65,21 +65,15 @@ export class EthClient { async getLogs (vars: Vars): Promise { const result = await this._getCachedOrFetch('getLogs', vars); - const { getLogs: logs, block: { number: blockNumHex, timestamp: timestampHex } } = result; - const blockNumber = parseInt(blockNumHex, 16); - const timestamp = parseInt(timestampHex, 16); + const { getLogs: resultLogs, block: { number: blockNumHex, timestamp: timestampHex } } = result; + const block = { hash: vars.blockHash, number: parseInt(blockNumHex, 16), timestamp: parseInt(timestampHex, 16) }; + const logs = resultLogs.map((logEntry: any) => _.merge({}, logEntry, { transaction: { block }})); - return logs.map((logEntry: any) => { - return _.merge({}, logEntry, { - transaction: { - block: { - hash: vars.blockHash, - number: blockNumber, - timestamp - } - } - }); - }); + return { logs, block }; + } + + async watchBlocks (onNext: (value: any) => void): Promise { + return this._graphqlClient.subscribe(ethQueries.subscribeBlocks, onNext); } async watchLogs (onNext: (value: any) => void): Promise { diff --git a/packages/ipld-eth-client/src/eth-queries.ts b/packages/ipld-eth-client/src/eth-queries.ts index 5b7e39ff..f6abe0d1 100644 --- a/packages/ipld-eth-client/src/eth-queries.ts +++ b/packages/ipld-eth-client/src/eth-queries.ts @@ -75,6 +75,19 @@ subscription SubscriptionReceipt { } `; +export const subscribeBlocks = gql` +subscription { + listen(topic: "header_cids") { + relatedNode { + ... on EthHeaderCid { + blockHash + blockNumber + } + } + } +} +`; + export const subscribeTransactions = gql` subscription SubscriptionHeader { listen(topic: "transaction_cids") { @@ -96,5 +109,6 @@ export default { getLogs, getBlockWithTransactions, subscribeLogs, + subscribeBlocks, subscribeTransactions }; diff --git a/packages/uni-info-watcher/src/indexer.ts b/packages/uni-info-watcher/src/indexer.ts index 7026c6bc..63bb7faa 100644 --- a/packages/uni-info-watcher/src/indexer.ts +++ b/packages/uni-info-watcher/src/indexer.ts @@ -193,7 +193,7 @@ export class Indexer { } async _fetchAndSaveEvents ({ blockHash, token }: { blockHash: string, token: string }): Promise { - const logs = await this._ethClient.getLogs({ blockHash, contract: token }); + const { logs } = await this._ethClient.getLogs({ blockHash, contract: token }); const eventNameToTopic = {}; // getEventNameTopics(this._abi); const logTopicToEventName = invert(eventNameToTopic); diff --git a/packages/uni-watcher/README.md b/packages/uni-watcher/README.md index 9e916ab2..d4ee5bf0 100644 --- a/packages/uni-watcher/README.md +++ b/packages/uni-watcher/README.md @@ -1,5 +1,42 @@ # Uniswap Watcher +## Setup + +Create a postgres12 database for the job queue: + +``` +sudo su - postgres +createdb uni-watcher-job-queue +``` + +Enable the `pgcrypto` extension on the job queue database (https://github.com/timgit/pg-boss/blob/master/docs/usage.md#intro). + +Example: + +``` +postgres@tesla:~$ psql -U postgres -h localhost uni-watcher-job-queue +Password for user postgres: +psql (12.7 (Ubuntu 12.7-1.pgdg18.04+1)) +SSL connection (protocol: TLSv1.3, cipher: TLS_AES_256_GCM_SHA384, bits: 256, compression: off) +Type "help" for help. + +uni-watcher-job-queue=# CREATE EXTENSION pgcrypto; +CREATE EXTENSION +uni-watcher-job-queue=# exit +``` + +Create a postgres12 database for the address watcher: + +``` +sudo su - postgres +createdb uni-watcher +``` + +Update `environments/local.toml` with database connection settings for both the databases. + + +## Run + Run the server: ```bash @@ -27,7 +64,7 @@ $ yarn watch:contract --address 0xfE0034a874c2707c23F91D7409E9036F5e08ac34 --kin * `yarn lint` Lint files. - + ```bash # Lint fix. $ yarn lint --fix diff --git a/packages/uni-watcher/environments/local.toml b/packages/uni-watcher/environments/local.toml index 58f447eb..5a11a6d0 100644 --- a/packages/uni-watcher/environments/local.toml +++ b/packages/uni-watcher/environments/local.toml @@ -29,3 +29,7 @@ name = "requests" enabled = false deleteOnStart = false + +[jobQueue] + dbConnectionString = "postgres://postgres:postgres@localhost/uni-watcher-job-queue" + maxCompletionLag = 300 diff --git a/packages/uni-watcher/package.json b/packages/uni-watcher/package.json index 9133c353..ad14bad4 100644 --- a/packages/uni-watcher/package.json +++ b/packages/uni-watcher/package.json @@ -7,6 +7,8 @@ "scripts": { "server": "DEBUG=vulcanize:* nodemon src/server.ts -f environments/local.toml", "server:mock": "MOCK=1 nodemon src/server.ts -f environments/local.toml", + "job-runner": "DEBUG=vulcanize:* nodemon src/job-runner.ts -f environments/local.toml", + "fill": "DEBUG=vulcanize:* ts-node src/fill.ts -f environments/local.toml", "test": "mocha -r ts-node/register src/**/*.spec.ts", "lint": "eslint .", "build": "tsc", diff --git a/packages/uni-watcher/src/database.ts b/packages/uni-watcher/src/database.ts index 36dc4fd4..74a86c5d 100644 --- a/packages/uni-watcher/src/database.ts +++ b/packages/uni-watcher/src/database.ts @@ -4,7 +4,7 @@ import { SnakeNamingStrategy } from 'typeorm-naming-strategies'; import { Event } from './entity/Event'; import { Contract } from './entity/Contract'; -import { EventSyncProgress } from './entity/EventProgress'; +import { BlockProgress } from './entity/BlockProgress'; export class Database { _config: ConnectionOptions @@ -28,17 +28,7 @@ export class Database { return this._conn.close(); } - // Returns true if events have already been synced for the (block, token) combination. - async didSyncEvents ({ blockHash }: { blockHash: string }): Promise { - const numRows = await this._conn.getRepository(EventSyncProgress) - .createQueryBuilder() - .where('block_hash = :blockHash', { blockHash }) - .getCount(); - - return numRows > 0; - } - - async getBlockEvents ({ blockHash }: { blockHash: string }): Promise { + async getBlockEvents (blockHash: string): Promise { return this._conn.getRepository(Event) .createQueryBuilder('event') .where('block_hash = :blockHash', { blockHash }) @@ -46,7 +36,7 @@ export class Database { .getMany(); } - async getEvents ({ blockHash, contract }: { blockHash: string, contract: string }): Promise { + async getEvents (blockHash: string, contract: string): Promise { return this._conn.getRepository(Event) .createQueryBuilder('event') .where('block_hash = :blockHash AND contract = :contract', { @@ -57,7 +47,7 @@ export class Database { .getMany(); } - async getEventsByName ({ blockHash, contract, eventName }: { blockHash: string, contract: string, eventName: string }): Promise { + async getEventsByName (blockHash: string, contract: string, eventName: string): Promise { return this._conn.getRepository(Event) .createQueryBuilder('event') .where('block_hash = :blockHash AND contract = :contract AND event_name = :eventName', { @@ -68,35 +58,33 @@ export class Database { .getMany(); } - async saveEvents ({ blockHash, events }: { blockHash: string, events: DeepPartial[] }): Promise { + async saveEvents (blockHash: string, blockNumber: number, events: DeepPartial[]): Promise { // In a transaction: // (1) Save all the events in the database. - // (2) Add an entry to the event progress table. - + // (2) Add an entry to the block progress table. await this._conn.transaction(async (tx) => { - const repo = tx.getRepository(EventSyncProgress); - - // Check sync progress inside the transaction. - const numRows = await repo - .createQueryBuilder() - .where('block_hash = :blockHash', { blockHash }) - .getCount(); - - if (numRows === 0) { + const numEvents = events.length; + const blockProgressRepo = tx.getRepository(BlockProgress); + const blockProgress = await blockProgressRepo.findOne({ where: { blockHash } }); + if (!blockProgress) { // Bulk insert events. - await tx.createQueryBuilder() - .insert() - .into(Event) - .values(events) - .execute(); + await tx.createQueryBuilder().insert().into(Event).values(events).execute(); - // Update event sync progress. - const progress = repo.create({ blockHash }); - await repo.save(progress); + const entity = blockProgressRepo.create({ blockHash, blockNumber, numEvents, numProcessedEvents: 0, isComplete: (numEvents === 0) }); + await blockProgressRepo.save(entity); } }); } + async getEvent (id: string): Promise { + return this._conn.getRepository(Event).findOne(id); + } + + async saveEventEntity (entity: Event): Promise { + const repo = this._conn.getRepository(Event); + return await repo.save(entity); + } + async getContract (address: string): Promise { return this._conn.getRepository(Contract) .createQueryBuilder('contract') @@ -119,4 +107,23 @@ export class Database { } }); } + + async getBlockProgress (blockHash: string): Promise { + const repo = this._conn.getRepository(BlockProgress); + return repo.findOne({ where: { blockHash } }); + } + + async updateBlockProgress (blockHash: string): Promise { + await this._conn.transaction(async (tx) => { + const repo = tx.getRepository(BlockProgress); + const entity = await repo.findOne({ where: { blockHash } }); + if (entity && !entity.isComplete) { + entity.numProcessedEvents++; + if (entity.numProcessedEvents >= entity.numEvents) { + entity.isComplete = true; + } + await repo.save(entity); + } + }); + } } diff --git a/packages/uni-watcher/src/entity/BlockProgress.ts b/packages/uni-watcher/src/entity/BlockProgress.ts new file mode 100644 index 00000000..a6297aa5 --- /dev/null +++ b/packages/uni-watcher/src/entity/BlockProgress.ts @@ -0,0 +1,23 @@ +import { Entity, PrimaryGeneratedColumn, Column, Index } from 'typeorm'; + +@Entity() +@Index(['blockHash'], { unique: true }) +export class BlockProgress { + @PrimaryGeneratedColumn() + id!: number; + + @Column('varchar', { length: 66 }) + blockHash!: string; + + @Column('numeric') + blockNumber!: number; + + @Column('numeric') + numEvents!: number; + + @Column('numeric') + numProcessedEvents!: number; + + @Column('boolean') + isComplete!: boolean +} diff --git a/packages/uni-watcher/src/entity/Contract.ts b/packages/uni-watcher/src/entity/Contract.ts index bf23668e..20530466 100644 --- a/packages/uni-watcher/src/entity/Contract.ts +++ b/packages/uni-watcher/src/entity/Contract.ts @@ -1,12 +1,15 @@ -import { Entity, PrimaryColumn, Column } from 'typeorm'; +import { Entity, PrimaryGeneratedColumn, Column, Index } from 'typeorm'; export const KIND_FACTORY = 'factory'; - export const KIND_POOL = 'pool'; @Entity() +@Index(['address'], { unique: true }) export class Contract { - @PrimaryColumn('varchar', { length: 42 }) + @PrimaryGeneratedColumn() + id!: number; + + @Column('varchar', { length: 42 }) address!: string; @Column('varchar', { length: 8 }) diff --git a/packages/uni-watcher/src/entity/Event.ts b/packages/uni-watcher/src/entity/Event.ts index 58c3bb74..68db9b02 100644 --- a/packages/uni-watcher/src/entity/Event.ts +++ b/packages/uni-watcher/src/entity/Event.ts @@ -1,5 +1,7 @@ import { Entity, PrimaryGeneratedColumn, Column, Index } from 'typeorm'; +export const UNKNOWN_EVENT_NAME = '__unknown__'; + @Entity() // Index to query all events for a contract efficiently. @Index(['blockHash', 'contract']) @@ -36,9 +38,6 @@ export class Event { @Column('text') extraInfo!: string; - @Column('boolean', { default: false }) - isProcessed!: boolean; - @Column('text') proof!: string; } diff --git a/packages/uni-watcher/src/entity/EventProgress.ts b/packages/uni-watcher/src/entity/EventProgress.ts deleted file mode 100644 index 5cd7851f..00000000 --- a/packages/uni-watcher/src/entity/EventProgress.ts +++ /dev/null @@ -1,17 +0,0 @@ -import { Entity, PrimaryGeneratedColumn, Column, Index } from 'typeorm'; - -// Stores a row if events for a (block, token) combination have already been fetched. -// -// Required as a particular block may not have events from a particular contract, -// and we need to differentiate between that case and the case where data hasn't -// yet been synced from upstream. -// -@Entity() -@Index(['blockHash'], { unique: true }) -export class EventSyncProgress { - @PrimaryGeneratedColumn() - id!: number; - - @Column('varchar', { length: 66 }) - blockHash!: string; -} diff --git a/packages/uni-watcher/src/events.ts b/packages/uni-watcher/src/events.ts index c2c86ce2..1b7ac98c 100644 --- a/packages/uni-watcher/src/events.ts +++ b/packages/uni-watcher/src/events.ts @@ -1,79 +1,115 @@ import assert from 'assert'; import debug from 'debug'; import _ from 'lodash'; +import { PubSub } from 'apollo-server-express'; import { EthClient } from '@vulcanize/ipld-eth-client'; +import { JobQueue } from '@vulcanize/util'; import { Indexer } from './indexer'; +import { BlockProgress } from './entity/BlockProgress'; +import { UNKNOWN_EVENT_NAME } from './entity/Event'; const log = debug('vulcanize:events'); +export const UniswapEvent = 'uniswap-event'; +export const BlockProgressEvent = 'block-progress-event'; +export const QUEUE_EVENT_PROCESSING = 'event-processing'; +export const QUEUE_BLOCK_PROCESSING = 'block-processing'; + export class EventWatcher { _ethClient: EthClient _indexer: Indexer _subscription: ZenObservable.Subscription | undefined + _pubsub: PubSub + _jobQueue: JobQueue - constructor (ethClient: EthClient, indexer: Indexer) { - assert(ethClient); - assert(indexer); - + constructor (ethClient: EthClient, indexer: Indexer, pubsub: PubSub, jobQueue: JobQueue) { this._ethClient = ethClient; this._indexer = indexer; + this._pubsub = pubsub; + this._jobQueue = jobQueue; + } + + getEventIterator (): AsyncIterator { + return this._pubsub.asyncIterator([UniswapEvent]); + } + + getBlockProgressEventIterator (): AsyncIterator { + return this._pubsub.asyncIterator([BlockProgressEvent]); } async start (): Promise { assert(!this._subscription, 'subscription already started'); - log('Started watching upstream logs...'); + log('Started watching upstream blocks...'); - this._subscription = await this._ethClient.watchLogs(async (value) => { - const receipt = _.get(value, 'data.listen.relatedNode'); - log('watchLogs', JSON.stringify(receipt, null, 2)); + this._jobQueue.onComplete(QUEUE_BLOCK_PROCESSING, async (job) => { + const { data: { request: { data: { blockHash, blockNumber } } } } = job; + log(`Job onComplete block ${blockHash} ${blockNumber}`); + }); - const blocks: string[] = []; + this._jobQueue.onComplete(QUEUE_EVENT_PROCESSING, async (job) => { + const { data: { request, failed, state, createdOn } } = job; - const { logContracts } = receipt; - if (logContracts && logContracts.length) { - for (let logIndex = 0; logIndex < logContracts.length; logIndex++) { - const { ethTransactionCidByTxId: { ethHeaderCidByHeaderId: { blockHash } } } = receipt; - await this._indexer.getBlockEvents(blockHash); - blocks.push(blockHash); - } + await this._indexer.updateBlockProgress(request.data.blockHash); + const blockProgress = await this._indexer.getBlockProgress(request.data.blockHash); + if (blockProgress && request.data.publishBlockProgress) { + await this.publishBlockProgressToSubscribers(blockProgress); } - const processedBlocks: any = {}; - if (!blocks.length) { - return; + const timeElapsedInSeconds = (Date.now() - Date.parse(createdOn)) / 1000; + log(`Job onComplete event ${request.data.id} publish ${!!request.data.publish}`); + if (!failed && state === 'completed' && request.data.publish) { + // Check for max acceptable lag time between request and sending results to live subscribers. + if (timeElapsedInSeconds <= this._jobQueue.maxCompletionLag) { + return await this.publishUniswapEventToSubscribers(request.data.id, timeElapsedInSeconds); + } else { + log(`event ${request.data.id} is too old (${timeElapsedInSeconds}s), not broadcasting to live subscribers`); + } } + }); - // Process events, if from known uniswap contracts. - for (let bi = 0; bi < blocks.length; bi++) { - const blockHash = blocks[bi]; - if (processedBlocks[blockHash]) { - continue; - } + this._subscription = await this._ethClient.watchBlocks(async (value) => { + const { blockHash, blockNumber } = _.get(value, 'data.listen.relatedNode'); + log('watchBlock', blockHash, blockNumber); + await this._jobQueue.pushJob(QUEUE_BLOCK_PROCESSING, { blockHash, blockNumber }); + }); + } - const events = await this._indexer.getBlockEvents(blockHash); - for (let ei = 0; ei < events.length; ei++) { - const eventObj = events[ei]; - const uniContract = await this._indexer.isUniswapContract(eventObj.contract); - if (uniContract) { - log('event', JSON.stringify(eventObj, null, 2)); + async publishUniswapEventToSubscribers (id: string, timeElapsedInSeconds: number): Promise { + const dbEvent = await this._indexer.getEvent(id); - // TODO: Move processing to background queue (need sequential processing of events). - // Trigger other indexer methods based on event topic. - await this._indexer.processEvent(eventObj); - } - } + if (dbEvent && dbEvent.eventName !== UNKNOWN_EVENT_NAME) { + const resultEvent = this._indexer.getResultEvent(dbEvent); - processedBlocks[blockHash] = true; + log(`pushing event to GQL subscribers (${timeElapsedInSeconds}s elapsed): ${resultEvent.event.__typename}`); + + // Publishing the event here will result in pushing the payload to GQL subscribers for `onEvent`. + await this._pubsub.publish(UniswapEvent, { + onEvent: resultEvent + }); + } + } + + async publishBlockProgressToSubscribers (blockProgress: BlockProgress): Promise { + const { blockHash, blockNumber, numEvents, numProcessedEvents, isComplete } = blockProgress; + + // Publishing the event here will result in pushing the payload to GQL subscribers for `onAddressEvent(address)`. + await this._pubsub.publish(BlockProgressEvent, { + onBlockProgressEvent: { + blockHash, + blockNumber, + numEvents, + numProcessedEvents, + isComplete } }); } async stop (): Promise { if (this._subscription) { - log('Stopped watching upstream logs'); + log('Stopped watching upstream blocks'); this._subscription.unsubscribe(); } } diff --git a/packages/uni-watcher/src/indexer.ts b/packages/uni-watcher/src/indexer.ts index 39b65a04..0922f4c4 100644 --- a/packages/uni-watcher/src/indexer.ts +++ b/packages/uni-watcher/src/indexer.ts @@ -1,17 +1,15 @@ -import assert from 'assert'; import debug from 'debug'; -import _ from 'lodash'; import { DeepPartial } from 'typeorm'; import JSONbig from 'json-bigint'; import { ethers } from 'ethers'; -import { PubSub } from 'apollo-server-express'; import { EthClient } from '@vulcanize/ipld-eth-client'; import { GetStorageAt } from '@vulcanize/solidity-mapper'; import { Config } from '@vulcanize/util'; import { Database } from './database'; -import { Event } from './entity/Event'; +import { Event, UNKNOWN_EVENT_NAME } from './entity/Event'; +import { BlockProgress } from './entity/BlockProgress'; import { Contract, KIND_FACTORY, KIND_POOL } from './entity/Contract'; import factoryABI from './artifacts/factory.json'; @@ -35,48 +33,21 @@ export class Indexer { _config: Config; _db: Database _ethClient: EthClient - _pubsub: PubSub _getStorageAt: GetStorageAt _factoryContract: ethers.utils.Interface _poolContract: ethers.utils.Interface - constructor (config: Config, db: Database, ethClient: EthClient, pubsub: PubSub) { - assert(config); - assert(db); - assert(ethClient); - assert(pubsub); - + constructor (config: Config, db: Database, ethClient: EthClient) { this._config = config; this._db = db; this._ethClient = ethClient; - this._pubsub = pubsub; this._getStorageAt = this._ethClient.getStorageAt.bind(this._ethClient); this._factoryContract = new ethers.utils.Interface(factoryABI); this._poolContract = new ethers.utils.Interface(poolABI); } - getEventIterator (): AsyncIterator { - return this._pubsub.asyncIterator(['event']); - } - - async getBlockEvents (blockHash: string): Promise> { - const didSyncEvents = await this._db.didSyncEvents({ blockHash }); - if (!didSyncEvents) { - // Fetch and save events first and make a note in the event sync progress table. - await this.fetchAndSaveEvents({ blockHash }); - log('getEvents: db miss, fetching from upstream server'); - } - - assert(await this._db.didSyncEvents({ blockHash })); - - const events = await this._db.getBlockEvents({ blockHash }); - log(`getEvents: db hit, num events: ${events.length}`); - - return events; - } - getResultEvent (event: Event): ResultEvent { const eventFields = JSON.parse(event.eventInfo); @@ -100,29 +71,40 @@ export class Indexer { }, // TODO: Return proof only if requested. - proof: JSON.parse(event.proof), + proof: JSON.parse(event.proof) }; } - async getEvents (blockHash: string, contract: string, name: string | null): Promise> { + // Note: Some event names might be unknown at this point, as earlier events might not yet be processed. + async getOrFetchBlockEvents (blockHash: string): Promise> { + const blockProgress = await this._db.getBlockProgress(blockHash); + if (!blockProgress) { + // Fetch and save events first and make a note in the event sync progress table. + await this.fetchAndSaveEvents(blockHash); + log('getBlockEvents: db miss, fetching from upstream server'); + } + + const events = await this._db.getBlockEvents(blockHash); + log(`getBlockEvents: db hit, num events: ${events.length}`); + + return events; + } + + async getEventsByFilter (blockHash: string, contract: string, name: string | null): Promise> { const uniContract = await this.isUniswapContract(contract); if (!uniContract) { throw new Error('Not a uniswap contract'); } - const didSyncEvents = await this._db.didSyncEvents({ blockHash }); - if (!didSyncEvents) { - // Fetch and save events first and make a note in the event sync progress table. - await this.fetchAndSaveEvents({ blockHash }); - log('getEvents: db miss, fetching from upstream server'); - } + // Fetch block events first. + await this.getOrFetchBlockEvents(blockHash); - assert(await this._db.didSyncEvents({ blockHash })); - - const events = await this._db.getEvents({ blockHash, contract }); + const events = await this._db.getEvents(blockHash, contract); log(`getEvents: db hit, num events: ${events.length}`); + // Filtering. const result = events + // TODO: Filter using db WHERE condition on contract. .filter(event => contract === event.contract) // TODO: Filter using db WHERE condition when name is not empty. .filter(event => !name || name === event.eventName); @@ -141,17 +123,6 @@ export class Indexer { } } - async publishEventToSubscribers (dbEvent: Event): Promise { - const resultEvent = this.getResultEvent(dbEvent); - - log(`pushing event to GQL subscribers: ${resultEvent.event.__typename}`); - - // Publishing the event here will result in pushing the payload to GQL subscribers for `onEvent`. - await this._pubsub.publish('event', { - onEvent: resultEvent - }); - } - async isUniswapContract (address: string): Promise { return this._db.getContract(ethers.utils.getAddress(address)); } @@ -159,13 +130,94 @@ export class Indexer { async processEvent (event: Event): Promise { // Trigger indexing of data based on the event. await this.triggerIndexingOnEvent(event); - - // Also trigger downstream event watcher subscriptions. - await this.publishEventToSubscribers(event); } - async fetchAndSaveEvents ({ blockHash }: { blockHash: string }): Promise { - const logs = await this._ethClient.getLogs({ blockHash }); + parseEventNameAndArgs (kind: string, logObj: any): any { + let eventName = UNKNOWN_EVENT_NAME; + let eventInfo = {}; + + const { topics, data } = logObj; + + switch (kind) { + case KIND_FACTORY: { + const logDescription = this._factoryContract.parseLog({ data, topics }); + switch (logDescription.name) { + case 'PoolCreated': { + eventName = logDescription.name; + const { token0, token1, fee, tickSpacing, pool } = logDescription.args; + eventInfo = { token0, token1, fee, tickSpacing, pool }; + + break; + } + } + + break; + } + case KIND_POOL: { + const logDescription = this._poolContract.parseLog({ data, topics }); + switch (logDescription.name) { + case 'Initialize': { + eventName = logDescription.name; + const { sqrtPriceX96, tick } = logDescription.args; + eventInfo = { sqrtPriceX96: sqrtPriceX96.toString(), tick }; + + break; + } + case 'Mint': { + eventName = logDescription.name; + const { sender, owner, tickLower, tickUpper, amount, amount0, amount1 } = logDescription.args; + eventInfo = { + sender, + owner, + tickLower, + tickUpper, + amount: amount.toString(), + amount0: amount0.toString(), + amount1: amount1.toString() + }; + + break; + } + case 'Burn': { + eventName = logDescription.name; + const { owner, tickLower, tickUpper, amount, amount0, amount1 } = logDescription.args; + eventInfo = { + owner, + tickLower, + tickUpper, + amount: amount.toString(), + amount0: amount0.toString(), + amount1: amount1.toString() + }; + + break; + } + case 'Swap': { + eventName = logDescription.name; + const { sender, recipient, amount0, amount1, sqrtPriceX96, liquidity, tick } = logDescription.args; + eventInfo = { + sender, + recipient, + amount0: amount0.toString(), + amount1: amount1.toString(), + sqrtPriceX96: sqrtPriceX96.toString(), + liquidity: liquidity.toString(), + tick + }; + + break; + } + } + + break; + } + } + + return { eventName, eventInfo }; + } + + async fetchAndSaveEvents (blockHash: string): Promise { + const { block, logs } = await this._ethClient.getLogs({ blockHash }); const dbEvents: Array> = []; @@ -189,117 +241,57 @@ export class Indexer { } } = logObj; - let eventName; + let eventName = UNKNOWN_EVENT_NAME; let eventInfo = {}; - let extraInfo = {}; + const extraInfo = { topics, data }; const contract = ethers.utils.getAddress(address); const uniContract = await this.isUniswapContract(contract); - if (!uniContract) { - // TODO: Can only be known if events are processed serially. - continue; + + if (uniContract) { + const eventDetails = this.parseEventNameAndArgs(uniContract.kind, logObj); + eventName = eventDetails.eventName; + eventInfo = eventDetails.eventInfo; } - switch (uniContract.kind) { - case KIND_FACTORY: { - const logDescription = this._factoryContract.parseLog({ data, topics }); - switch (logDescription.name) { - case 'PoolCreated': { - eventName = logDescription.name; - const { token0, token1, fee, tickSpacing, pool } = logDescription.args; - eventInfo = { token0, token1, fee, tickSpacing, pool }; - - break; + dbEvents.push({ + blockHash, + blockNumber, + blockTimestamp, + index: logIndex, + txHash, + contract, + eventName, + eventInfo: JSONbig.stringify(eventInfo), + extraInfo: JSONbig.stringify(extraInfo), + proof: JSONbig.stringify({ + data: JSONbig.stringify({ + blockHash, + receipt: { + cid, + ipldBlock } - } - - break; - } - case KIND_POOL: { - const logDescription = this._poolContract.parseLog({ data, topics }); - switch (logDescription.name) { - case 'Initialize': { - eventName = logDescription.name; - const { sqrtPriceX96, tick } = logDescription.args; - eventInfo = { sqrtPriceX96: sqrtPriceX96.toString(), tick }; - - break; - } - case 'Mint': { - eventName = logDescription.name; - const { sender, owner, tickLower, tickUpper, amount, amount0, amount1 } = logDescription.args; - eventInfo = { - sender, - owner, - tickLower, - tickUpper, - amount: amount.toString(), - amount0: amount0.toString(), - amount1: amount1.toString() - }; - - break; - } - case 'Burn': { - eventName = logDescription.name; - const { owner, tickLower, tickUpper, amount, amount0, amount1 } = logDescription.args; - eventInfo = { - owner, - tickLower, - tickUpper, - amount: amount.toString(), - amount0: amount0.toString(), - amount1: amount1.toString() - }; - - break; - } - case 'Swap': { - eventName = logDescription.name; - const { sender, recipient, amount0, amount1, sqrtPriceX96, liquidity, tick } = logDescription.args; - eventInfo = { - sender, - recipient, - amount0: amount0.toString(), - amount1: amount1.toString(), - sqrtPriceX96: sqrtPriceX96.toString(), - liquidity: liquidity.toString(), - tick - }; - - break; - } - } - - break; - } - } - - if (eventName) { - dbEvents.push({ - blockHash, - blockNumber, - blockTimestamp, - index: logIndex, - txHash, - contract, - eventName, - eventInfo: JSONbig.stringify(eventInfo), - extraInfo: JSONbig.stringify(extraInfo), - proof: JSONbig.stringify({ - data: JSONbig.stringify({ - blockHash, - receipt: { - cid, - ipldBlock - } - }) }) - }); - } + }) + }); } - const events: DeepPartial[] = _.compact(dbEvents); - await this._db.saveEvents({ blockHash, events }); + await this._db.saveEvents(blockHash, block.number, dbEvents); + } + + async getEvent (id: string): Promise { + return this._db.getEvent(id); + } + + async saveEventEntity (dbEvent: Event): Promise { + return this._db.saveEventEntity(dbEvent); + } + + async getBlockProgress (blockHash: string): Promise { + return this._db.getBlockProgress(blockHash); + } + + async updateBlockProgress (blockHash: string): Promise { + return this._db.updateBlockProgress(blockHash); } } diff --git a/packages/uni-watcher/src/job-runner.ts b/packages/uni-watcher/src/job-runner.ts new file mode 100644 index 00000000..c41283db --- /dev/null +++ b/packages/uni-watcher/src/job-runner.ts @@ -0,0 +1,103 @@ +import assert from 'assert'; +import 'reflect-metadata'; +import yargs from 'yargs'; +import { hideBin } from 'yargs/helpers'; +import debug from 'debug'; + +import { getCache } from '@vulcanize/cache'; +import { EthClient } from '@vulcanize/ipld-eth-client'; +import { getConfig, JobQueue } from '@vulcanize/util'; + +import { Indexer } from './indexer'; +import { Database } from './database'; +import { UNKNOWN_EVENT_NAME } from './entity/Event'; +import { QUEUE_BLOCK_PROCESSING, QUEUE_EVENT_PROCESSING } from './events'; + +const log = debug('vulcanize:job-runner'); + +export const main = async (): Promise => { + const argv = await yargs(hideBin(process.argv)) + .option('f', { + alias: 'config-file', + demandOption: true, + describe: 'configuration file path (toml)', + type: 'string' + }) + .argv; + + const config = await getConfig(argv.f); + + assert(config.server, 'Missing server config'); + + const { upstream, database: dbConfig, jobQueue: jobQueueConfig } = config; + + assert(dbConfig, 'Missing database config'); + + const db = new Database(dbConfig); + await db.init(); + + assert(upstream, 'Missing upstream config'); + const { gqlEndpoint, gqlSubscriptionEndpoint, cache: cacheConfig } = upstream; + assert(gqlEndpoint, 'Missing upstream gqlEndpoint'); + assert(gqlSubscriptionEndpoint, 'Missing upstream gqlSubscriptionEndpoint'); + + const cache = await getCache(cacheConfig); + + const ethClient = new EthClient({ gqlEndpoint, gqlSubscriptionEndpoint, cache }); + + const indexer = new Indexer(config, db, ethClient); + + assert(jobQueueConfig, 'Missing job queue config'); + + const { dbConnectionString, maxCompletionLag } = jobQueueConfig; + assert(dbConnectionString, 'Missing job queue db connection string'); + + const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag }); + await jobQueue.start(); + + await jobQueue.subscribe(QUEUE_BLOCK_PROCESSING, async (job) => { + const { data: { blockHash, blockNumber } } = job; + + log(`Processing block ${blockHash} ${blockNumber}`); + + const events = await indexer.getOrFetchBlockEvents(blockHash); + for (let ei = 0; ei < events.length; ei++) { + const { blockHash, id } = events[ei]; + await jobQueue.pushJob(QUEUE_EVENT_PROCESSING, { blockHash, id, publish: true }); + } + + await jobQueue.markComplete(job); + }); + + await jobQueue.subscribe(QUEUE_EVENT_PROCESSING, async (job) => { + const { data: { id } } = job; + + log(`Processing event ${id}`); + + let dbEvent = await indexer.getEvent(id); + assert(dbEvent); + + const uniContract = await indexer.isUniswapContract(dbEvent.contract); + if (uniContract) { + // We might not have parsed this event yet. This can happen if the contract was added + // as a result of a previous event in the same block. + if (dbEvent.eventName === UNKNOWN_EVENT_NAME) { + const logObj = JSON.parse(dbEvent.extraInfo); + const { eventName, eventInfo } = indexer.parseEventNameAndArgs(uniContract.kind, logObj); + dbEvent.eventName = eventName; + dbEvent.eventInfo = JSON.stringify(eventInfo); + dbEvent = await indexer.saveEventEntity(dbEvent); + } + + await indexer.processEvent(dbEvent); + } + + await jobQueue.markComplete(job); + }); +}; + +main().then(() => { + log('Starting job runner...'); +}).catch(err => { + log(err); +}); diff --git a/packages/uni-watcher/src/resolvers.ts b/packages/uni-watcher/src/resolvers.ts index a0f50dca..138fcbf7 100644 --- a/packages/uni-watcher/src/resolvers.ts +++ b/packages/uni-watcher/src/resolvers.ts @@ -3,12 +3,11 @@ import BigInt from 'apollo-type-bigint'; import debug from 'debug'; import { Indexer } from './indexer'; +import { EventWatcher } from './events'; const log = debug('vulcanize:resolver'); -export const createResolvers = async (indexer: Indexer): Promise => { - assert(indexer); - +export const createResolvers = async (indexer: Indexer, eventWatcher: EventWatcher): Promise => { return { BigInt: new BigInt('bigInt'), @@ -46,7 +45,11 @@ export const createResolvers = async (indexer: Indexer): Promise => { Subscription: { onEvent: { - subscribe: () => indexer.getEventIterator() + subscribe: () => eventWatcher.getEventIterator() + }, + + onBlockProgressEvent: { + subscribe: () => eventWatcher.getBlockProgressEventIterator() } }, @@ -54,8 +57,14 @@ export const createResolvers = async (indexer: Indexer): Promise => { events: async (_: any, { blockHash, contract, name }: { blockHash: string, contract: string, name: string }) => { log('events', blockHash, contract, name || ''); - const events = await indexer.getEvents(blockHash, contract, name); + const blockProgress = await indexer.getBlockProgress(blockHash); + if (!blockProgress || !blockProgress.isComplete) { + // TODO: Trigger indexing for the block. + throw new Error('Not available'); + } + + const events = await indexer.getEventsByFilter(blockHash, contract, name); return events.map(event => indexer.getResultEvent(event)); } } diff --git a/packages/uni-watcher/src/schema.ts b/packages/uni-watcher/src/schema.ts index 3d317779..0001e5fd 100644 --- a/packages/uni-watcher/src/schema.ts +++ b/packages/uni-watcher/src/schema.ts @@ -187,6 +187,13 @@ type ResultEvent { proof: Proof } +type BlockProgressEvent { + blockNumber: Int! + blockHash: String! + numEvents: Int! + numProcessedEvents: Int! + isComplete: Boolean! +} # # Queries @@ -245,7 +252,10 @@ type Query { # type Subscription { - # Watch for events (at head of chain). + # Watch for Wniswap events (at head of chain). onEvent: ResultEvent! + + # Watch for block progress events from filler process. + onBlockProgressEvent: BlockProgressEvent! } `; diff --git a/packages/uni-watcher/src/server.ts b/packages/uni-watcher/src/server.ts index d149b1e2..88088a1e 100644 --- a/packages/uni-watcher/src/server.ts +++ b/packages/uni-watcher/src/server.ts @@ -10,7 +10,7 @@ import { createServer } from 'http'; import { getCache } from '@vulcanize/cache'; import { EthClient } from '@vulcanize/ipld-eth-client'; -import { getConfig } from '@vulcanize/util'; +import { getConfig, JobQueue } from '@vulcanize/util'; import typeDefs from './schema'; @@ -38,7 +38,7 @@ export const main = async (): Promise => { const { host, port } = config.server; - const { upstream, database: dbConfig } = config; + const { upstream, database: dbConfig, jobQueue: jobQueueConfig } = config; assert(dbConfig, 'Missing database config'); @@ -57,12 +57,20 @@ export const main = async (): Promise => { // Note: In-memory pubsub works fine for now, as each watcher is a single process anyway. // Later: https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries const pubsub = new PubSub(); - const indexer = new Indexer(config, db, ethClient, pubsub); + const indexer = new Indexer(config, db, ethClient); - const eventWatcher = new EventWatcher(ethClient, indexer); + assert(jobQueueConfig, 'Missing job queue config'); + + const { dbConnectionString, maxCompletionLag } = jobQueueConfig; + assert(dbConnectionString, 'Missing job queue db connection string'); + + const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag }); + await jobQueue.start(); + + const eventWatcher = new EventWatcher(ethClient, indexer, pubsub, jobQueue); await eventWatcher.start(); - const resolvers = process.env.MOCK ? await createMockResolvers() : await createResolvers(indexer); + const resolvers = process.env.MOCK ? await createMockResolvers() : await createResolvers(indexer, eventWatcher); const app: Application = express(); const server = new ApolloServer({ diff --git a/packages/util/.eslintrc.json b/packages/util/.eslintrc.json index acccd0d2..476d529d 100644 --- a/packages/util/.eslintrc.json +++ b/packages/util/.eslintrc.json @@ -14,5 +14,14 @@ }, "plugins": [ "@typescript-eslint" - ] + ], + "rules": { + "@typescript-eslint/no-explicit-any": "off", + "@typescript-eslint/explicit-module-boundary-types": [ + "warn", + { + "allowArgumentsExplicitlyTypedAsAny": true + } + ] + } } diff --git a/packages/util/index.ts b/packages/util/index.ts index 0afa90a9..8aff1cc8 100644 --- a/packages/util/index.ts +++ b/packages/util/index.ts @@ -1,2 +1,3 @@ export * from './src/config'; export * from './src/database'; +export * from './src/job-queue'; diff --git a/packages/util/package.json b/packages/util/package.json index 40cad460..15e08da1 100644 --- a/packages/util/package.json +++ b/packages/util/package.json @@ -7,6 +7,8 @@ "debug": "^4.3.1", "ethers": "^5.2.0", "fs-extra": "^10.0.0", + "pg": "^8.6.0", + "pg-boss": "^6.1.0", "toml": "^3.0.0" }, "devDependencies": { diff --git a/packages/address-watcher/src/job-queue.ts b/packages/util/src/job-queue.ts similarity index 100% rename from packages/address-watcher/src/job-queue.ts rename to packages/util/src/job-queue.ts