diff --git a/packages/uni-info-watcher/package.json b/packages/uni-info-watcher/package.json index d2b08a15..5fcca024 100644 --- a/packages/uni-info-watcher/package.json +++ b/packages/uni-info-watcher/package.json @@ -24,7 +24,8 @@ "generate:schema": "get-graphql-schema https://api.thegraph.com/subgraphs/name/ianlapham/uniswap-v3-alt > docs/analysis/schema/full-schema.graphql", "lint:schema": "graphql-schema-linter", "smoke-test": "mocha src/smoke.test.ts", - "test:gpev": "mocha src/get-prev-entity.test.ts" + "test:gpev": "mocha src/get-prev-entity.test.ts", + "fill": "DEBUG=vulcanize:* ts-node src/fill.ts -f environments/local.toml" }, "devDependencies": { "@types/chance": "^1.1.2", diff --git a/packages/uni-info-watcher/src/database.ts b/packages/uni-info-watcher/src/database.ts index 74fd16ca..030d5f90 100644 --- a/packages/uni-info-watcher/src/database.ts +++ b/packages/uni-info-watcher/src/database.ts @@ -691,11 +691,11 @@ export class Database { .getMany(); } - async saveEvents (queryRunner: QueryRunner, block: Block, events: DeepPartial[]): Promise { + async saveEvents (queryRunner: QueryRunner, block: DeepPartial, events: DeepPartial[]): Promise { const { - hash: blockHash, - number: blockNumber, - timestamp: blockTimestamp, + blockHash, + blockNumber, + blockTimestamp, parentHash } = block; diff --git a/packages/uni-info-watcher/src/entity/BlockProgress.ts b/packages/uni-info-watcher/src/entity/BlockProgress.ts index 4e8cf9a7..e67cf2dd 100644 --- a/packages/uni-info-watcher/src/entity/BlockProgress.ts +++ b/packages/uni-info-watcher/src/entity/BlockProgress.ts @@ -4,11 +4,13 @@ import { Entity, PrimaryGeneratedColumn, Column, Index } from 'typeorm'; +import { BlockProgressInterface } from '@vulcanize/util'; + @Entity() @Index(['blockHash'], { unique: true }) @Index(['blockNumber']) @Index(['parentHash']) -export class BlockProgress { +export class BlockProgress implements BlockProgressInterface { @PrimaryGeneratedColumn() id!: number; @@ -35,4 +37,7 @@ export class BlockProgress { @Column('boolean') isComplete!: boolean + + @Column('boolean', { default: false }) + isPruned!: boolean } diff --git a/packages/uni-info-watcher/src/events.ts b/packages/uni-info-watcher/src/events.ts index 34d92ee4..aa2b9a8e 100644 --- a/packages/uni-info-watcher/src/events.ts +++ b/packages/uni-info-watcher/src/events.ts @@ -5,9 +5,11 @@ import assert from 'assert'; import debug from 'debug'; import _ from 'lodash'; -import { EthClient } from '@vulcanize/ipld-eth-client'; +import { PubSub } from 'apollo-server-express'; + +import { EthClient } from '@vulcanize/ipld-eth-client'; +import { EventWatcher as BaseEventWatcher, EventWatcherInterface, JobQueue, QUEUE_BLOCK_PROCESSING, QUEUE_EVENT_PROCESSING } from '@vulcanize/util'; -import { JobQueue } from '../../util'; import { Indexer } from './indexer'; const log = debug('vulcanize:events'); @@ -115,28 +117,33 @@ export interface ResultEvent { } } -export const QUEUE_EVENT_PROCESSING = 'event-processing'; -export const QUEUE_BLOCK_PROCESSING = 'block-processing'; - -export class EventWatcher { - _subscription?: ZenObservable.Subscription +export class EventWatcher implements EventWatcherInterface { _ethClient: EthClient - _jobQueue: JobQueue _indexer: Indexer + _subscription?: ZenObservable.Subscription + _pubsub: PubSub + _jobQueue: JobQueue + _eventWatcher: BaseEventWatcher - constructor (indexer: Indexer, ethClient: EthClient, jobQueue: JobQueue) { + constructor (ethClient: EthClient, indexer: Indexer, pubsub: PubSub, jobQueue: JobQueue) { this._ethClient = ethClient; - this._jobQueue = jobQueue; this._indexer = indexer; + this._pubsub = pubsub; + this._jobQueue = jobQueue; + this._eventWatcher = new BaseEventWatcher(this._ethClient, this._indexer, this._pubsub, this._jobQueue); + } + + getBlockProgressEventIterator (): AsyncIterator { + return this._eventWatcher.getBlockProgressEventIterator(); } async start (): Promise { assert(!this._subscription, 'subscription already started'); log('Started watching upstream events...'); - await this._initBlockProcessingOnCompleteHandler(); - await this._initEventProcessingOnCompleteHandler(); - await this._watchBlocksAtChainHead(); + await this.initBlockProcessingOnCompleteHandler(); + await this.initEventProcessingOnCompleteHandler(); + await this.watchBlocksAtChainHead(); } async stop (): Promise { @@ -146,7 +153,26 @@ export class EventWatcher { } } - async _watchBlocksAtChainHead (): Promise { + async initBlockProcessingOnCompleteHandler (): Promise { + await this._jobQueue.onComplete(QUEUE_BLOCK_PROCESSING, async (job) => { + const { data: { request: { data: { blockHash, blockNumber } } } } = job; + log(`Job onComplete block ${blockHash} ${blockNumber}`); + + // Publish block progress event. + const blockProgress = await this._indexer.getBlockProgress(blockHash); + if (blockProgress) { + await this._eventWatcher.publishBlockProgressToSubscribers(blockProgress); + } + }); + } + + async initEventProcessingOnCompleteHandler (): Promise { + await this._jobQueue.onComplete(QUEUE_EVENT_PROCESSING, async (job) => { + await this._eventWatcher.eventProcessingCompleteHandler(job); + }); + } + + async watchBlocksAtChainHead (): Promise { log('Started watching upstream blocks...'); this._subscription = await this._ethClient.watchBlocks(async (value) => { const { blockHash, blockNumber, parentHash, timestamp } = _.get(value, 'data.listen.relatedNode'); @@ -155,34 +181,7 @@ export class EventWatcher { log('watchBlock', blockHash, blockNumber); - const block = { - hash: blockHash, - number: blockNumber, - parentHash, - timestamp - }; - - await this._jobQueue.pushJob(QUEUE_BLOCK_PROCESSING, { block }); - }); - } - - async _initBlockProcessingOnCompleteHandler (): Promise { - this._jobQueue.onComplete(QUEUE_BLOCK_PROCESSING, async (job) => { - const { data: { request: { data: { block } } } } = job; - log(`Job onComplete block ${block.hash} ${block.number}`); - }); - } - - async _initEventProcessingOnCompleteHandler (): Promise { - this._jobQueue.onComplete(QUEUE_EVENT_PROCESSING, async (job) => { - const { data: { request } } = job; - - const dbEvent = await this._indexer.getEvent(request.data.id); - assert(dbEvent); - - await this._indexer.updateBlockProgress(dbEvent.block.blockHash, dbEvent.index); - - log(`Job onComplete event ${request.data.id}`); + await this._jobQueue.pushJob(QUEUE_BLOCK_PROCESSING, { blockHash, blockNumber, parentHash, timestamp }); }); } } diff --git a/packages/uni-info-watcher/src/fill.ts b/packages/uni-info-watcher/src/fill.ts new file mode 100644 index 00000000..9c3c3f00 --- /dev/null +++ b/packages/uni-info-watcher/src/fill.ts @@ -0,0 +1,95 @@ +// +// Copyright 2021 Vulcanize, Inc. +// + +import assert from 'assert'; +import 'reflect-metadata'; +import yargs from 'yargs'; +import { hideBin } from 'yargs/helpers'; +import debug from 'debug'; + +import { getCache } from '@vulcanize/cache'; +import { EthClient } from '@vulcanize/ipld-eth-client'; +import { getConfig, fillBlocks, JobQueue } from '@vulcanize/util'; +import { Client as UniClient } from '@vulcanize/uni-watcher'; +import { Client as ERC20Client } from '@vulcanize/erc20-watcher'; + +import { Database } from './database'; +import { PubSub } from 'apollo-server-express'; +import { Indexer } from './indexer'; +import { EventWatcher } from './events'; + +const log = debug('vulcanize:server'); + +export const main = async (): Promise => { + const argv = await yargs(hideBin(process.argv)).parserConfiguration({ + 'parse-numbers': false + }).options({ + configFile: { + alias: 'f', + type: 'string', + require: true, + demandOption: true, + describe: 'configuration file path (toml)' + }, + startBlock: { + type: 'number', + require: true, + demandOption: true, + describe: 'Block number to start processing at' + }, + endBlock: { + type: 'number', + require: true, + demandOption: true, + describe: 'Block number to stop processing at' + } + }).argv; + + const config = await getConfig(argv.configFile); + + assert(config.server, 'Missing server config'); + + const { upstream, database: dbConfig, jobQueue: jobQueueConfig } = config; + + assert(dbConfig, 'Missing database config'); + + const db = new Database(dbConfig); + await db.init(); + + assert(upstream, 'Missing upstream config'); + const { ethServer: { gqlPostgraphileEndpoint }, cache: cacheConfig, uniWatcher, tokenWatcher } = upstream; + assert(gqlPostgraphileEndpoint, 'Missing upstream ethServer.gqlPostgraphileEndpoint'); + + const cache = await getCache(cacheConfig); + const ethClient = new EthClient({ + gqlEndpoint: gqlPostgraphileEndpoint, + gqlSubscriptionEndpoint: gqlPostgraphileEndpoint, + cache + }); + + const uniClient = new UniClient(uniWatcher); + const erc20Client = new ERC20Client(tokenWatcher); + + // Note: In-memory pubsub works fine for now, as each watcher is a single process anyway. + // Later: https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries + const pubsub = new PubSub(); + const indexer = new Indexer(db, uniClient, erc20Client, ethClient); + + assert(jobQueueConfig, 'Missing job queue config'); + const { dbConnectionString, maxCompletionLag } = jobQueueConfig; + assert(dbConnectionString, 'Missing job queue db connection string'); + + const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag }); + await jobQueue.start(); + + const eventWatcher = new EventWatcher(ethClient, indexer, pubsub, jobQueue); + + await fillBlocks(jobQueue, indexer, ethClient, eventWatcher, argv); +}; + +main().then(() => { + process.exit(); +}).catch(err => { + log(err); +}); diff --git a/packages/uni-info-watcher/src/indexer.ts b/packages/uni-info-watcher/src/indexer.ts index 6faf0ae5..6e762f45 100644 --- a/packages/uni-info-watcher/src/indexer.ts +++ b/packages/uni-info-watcher/src/indexer.ts @@ -7,9 +7,11 @@ import debug from 'debug'; import { DeepPartial, QueryRunner } from 'typeorm'; import JSONbig from 'json-bigint'; import { utils } from 'ethers'; + import { Client as UniClient } from '@vulcanize/uni-watcher'; import { Client as ERC20Client } from '@vulcanize/erc20-watcher'; import { EthClient } from '@vulcanize/ipld-eth-client'; +import { IndexerInterface } from '@vulcanize/util'; import { findEthPerToken, getEthPriceInUSD, getTrackedAmountUSD, sqrtPriceX96ToTokenPrices, WHITELIST_TOKENS } from './utils/pricing'; import { updatePoolDayData, updatePoolHourData, updateTokenDayData, updateTokenHourData, updateUniswapDayData } from './utils/interval-updates'; @@ -42,7 +44,7 @@ export interface ValueResult { export { OrderDirection, BlockHeight }; -export class Indexer { +export class Indexer implements IndexerInterface { _db: Database _uniClient: UniClient _erc20Client: ERC20Client @@ -87,8 +89,9 @@ export class Indexer { } // Note: Some event names might be unknown at this point, as earlier events might not yet be processed. - async getOrFetchBlockEvents (block: Block): Promise> { - const blockProgress = await this._db.getBlockProgress(block.hash); + async getOrFetchBlockEvents (block: DeepPartial): Promise> { + assert(block.blockHash); + const blockProgress = await this._db.getBlockProgress(block.blockHash); if (!blockProgress) { // Fetch and save events first and make a note in the event sync progress table. @@ -96,7 +99,7 @@ export class Indexer { log('getBlockEvents: db miss, fetching from upstream server'); } - const events = await this._db.getBlockEvents(block.hash); + const events = await this._db.getBlockEvents(block.blockHash); log(`getBlockEvents: db hit, num events: ${events.length}`); return events; @@ -352,8 +355,9 @@ export class Indexer { return res; } - async _fetchAndSaveEvents (block: Block): Promise { - const events = await this._uniClient.getEvents(block.hash); + async _fetchAndSaveEvents (block: DeepPartial): Promise { + assert(block.blockHash); + const events = await this._uniClient.getEvents(block.blockHash); const dbEvents: Array> = []; for (let i = 0; i < events.length; i++) { diff --git a/packages/uni-info-watcher/src/job-runner.ts b/packages/uni-info-watcher/src/job-runner.ts index eab1d888..8f99ab58 100644 --- a/packages/uni-info-watcher/src/job-runner.ts +++ b/packages/uni-info-watcher/src/job-runner.ts @@ -10,13 +10,12 @@ import debug from 'debug'; import { Client as ERC20Client } from '@vulcanize/erc20-watcher'; import { Client as UniClient } from '@vulcanize/uni-watcher'; -import { getConfig, JobQueue, wait } from '@vulcanize/util'; +import { getConfig, JobQueue, wait, QUEUE_BLOCK_PROCESSING, QUEUE_EVENT_PROCESSING } from '@vulcanize/util'; import { getCache } from '@vulcanize/cache'; import { EthClient } from '@vulcanize/ipld-eth-client'; import { Indexer } from './indexer'; import { Database } from './database'; -import { QUEUE_BLOCK_PROCESSING, QUEUE_EVENT_PROCESSING } from './events'; import { Event } from './entity/Event'; const log = debug('vulcanize:job-runner'); @@ -73,43 +72,42 @@ export const main = async (): Promise => { await jobQueue.start(); await jobQueue.subscribe(QUEUE_BLOCK_PROCESSING, async (job) => { - const { data: { block, priority } } = job; - log(`Processing block hash ${block.hash} number ${block.number}`); + const { data: { blockHash, blockNumber, parentHash, timestamp, priority } } = job; + log(`Processing block number ${blockNumber} hash ${blockHash} `); + + // Init sync status record if none exists. + let syncStatus = await indexer.getSyncStatus(); + if (!syncStatus) { + syncStatus = await indexer.updateSyncStatus(blockHash, blockNumber); + } // Check if parent block has been processed yet, if not, push a high priority job to process that first and abort. // However, don't go beyond the `latestCanonicalBlockHash` from SyncStatus as we have to assume the reorg can't be that deep. - let syncStatus = await indexer.getSyncStatus(); - if (!syncStatus) { - syncStatus = await indexer.updateSyncStatus(block.hash, block.number); - } - - if (block.hash !== syncStatus.latestCanonicalBlockHash) { - const parent = await indexer.getBlockProgress(block.parentHash); + if (blockHash !== syncStatus.latestCanonicalBlockHash) { + const parent = await indexer.getBlockProgress(parentHash); if (!parent) { - const { number: parentBlockNumber, parent: { hash: grandparentHash }, timestamp: parentTimestamp } = await indexer.getBlock(block.parentHash); + const { number: parentBlockNumber, parent: { hash: grandparentHash }, timestamp: parentTimestamp } = await indexer.getBlock(parentHash); // Create a higher priority job to index parent block and then abort. // We don't have to worry about aborting as this job will get retried later. const newPriority = (priority || 0) + 1; await jobQueue.pushJob(QUEUE_BLOCK_PROCESSING, { - block: { - hash: block.parentHash, - number: parentBlockNumber, - parentHash: grandparentHash, - timestamp: parentTimestamp - }, + blockHash: parentHash, + blockNumber: parentBlockNumber, + parentHash: grandparentHash, + timestamp: parentTimestamp, priority: newPriority }, { priority: newPriority }); - const message = `Parent block number ${parentBlockNumber} hash ${block.parentHash} of block number ${block.number} hash ${block.hash} not fetched yet, aborting`; + const message = `Parent block number ${parentBlockNumber} hash ${parentHash} of block number ${blockNumber} hash ${blockHash} not fetched yet, aborting`; log(message); throw new Error(message); } - if (block.parentHash !== syncStatus.latestCanonicalBlockHash && !parent.isComplete) { + if (parentHash !== syncStatus.latestCanonicalBlockHash && !parent.isComplete) { // Parent block indexing needs to finish before this block can be indexed. - const message = `Indexing incomplete for parent block number ${parent.blockNumber} hash ${block.parentHash} of block number ${block.number} hash ${block.hash}, aborting`; + const message = `Indexing incomplete for parent block number ${parent.blockNumber} hash ${parentHash} of block number ${blockNumber} hash ${blockHash}, aborting`; log(message); throw new Error(message); @@ -117,12 +115,12 @@ export const main = async (): Promise => { } // Check if block is being already processed. - const blockProgress = await indexer.getBlockProgress(block.hash); + const blockProgress = await indexer.getBlockProgress(blockHash); if (!blockProgress) { // Delay to allow uni-watcher to process block. await wait(jobDelay); - const events = await indexer.getOrFetchBlockEvents(block); + const events = await indexer.getOrFetchBlockEvents({ blockHash, blockNumber, parentHash, blockTimestamp: timestamp }); for (let ei = 0; ei < events.length; ei++) { const { id } = events[ei]; diff --git a/packages/uni-info-watcher/src/resolvers.ts b/packages/uni-info-watcher/src/resolvers.ts index 763d294a..795b98a8 100644 --- a/packages/uni-info-watcher/src/resolvers.ts +++ b/packages/uni-info-watcher/src/resolvers.ts @@ -21,17 +21,24 @@ import { TokenHourData } from './entity/TokenHourData'; import { Transaction } from './entity/Transaction'; import { UniswapDayData } from './entity/UniswapDayData'; import { Position } from './entity/Position'; +import { EventWatcher } from './events'; const log = debug('vulcanize:resolver'); export { BlockHeight }; -export const createResolvers = async (indexer: Indexer): Promise => { +export const createResolvers = async (indexer: Indexer, eventWatcher: EventWatcher): Promise => { assert(indexer); return { BigInt: new BigInt('bigInt'), + Subscription: { + onBlockProgressEvent: { + subscribe: () => eventWatcher.getBlockProgressEventIterator() + } + }, + Query: { bundle: async (_: any, { id, block = {} }: { id: string, block: BlockHeight }) => { log('bundle', id, block); diff --git a/packages/uni-info-watcher/src/schema.ts b/packages/uni-info-watcher/src/schema.ts index 2635a70c..db561212 100644 --- a/packages/uni-info-watcher/src/schema.ts +++ b/packages/uni-info-watcher/src/schema.ts @@ -166,6 +166,14 @@ type Block { timestamp: Int! } +type BlockProgressEvent { + blockNumber: Int! + blockHash: String! + numEvents: Int! + numProcessedEvents: Int! + isComplete: Boolean! +} + enum OrderDirection { asc desc @@ -436,4 +444,12 @@ type Query { where: Block_filter ): [Block!]! } + +# +# Subscriptions +# +type Subscription { + # Watch for block progress events from filler process. + onBlockProgressEvent: BlockProgressEvent! +} `; diff --git a/packages/uni-info-watcher/src/server.ts b/packages/uni-info-watcher/src/server.ts index 1cdc308e..30f95955 100644 --- a/packages/uni-info-watcher/src/server.ts +++ b/packages/uni-info-watcher/src/server.ts @@ -5,7 +5,7 @@ import assert from 'assert'; import 'reflect-metadata'; import express, { Application } from 'express'; -import { ApolloServer } from 'apollo-server-express'; +import { ApolloServer, PubSub } from 'apollo-server-express'; import yargs from 'yargs'; import { hideBin } from 'yargs/helpers'; import debug from 'debug'; @@ -84,10 +84,11 @@ export const main = async (): Promise => { const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag }); await jobQueue.start(); - const eventWatcher = new EventWatcher(indexer, ethClient, jobQueue); + const pubSub = new PubSub(); + const eventWatcher = new EventWatcher(ethClient, indexer, pubSub, jobQueue); await eventWatcher.start(); - const resolvers = process.env.MOCK ? await createMockResolvers() : await createResolvers(indexer); + const resolvers = process.env.MOCK ? await createMockResolvers() : await createResolvers(indexer, eventWatcher); const app: Application = express(); const server = new ApolloServer({ diff --git a/packages/uni-info-watcher/test/utils.ts b/packages/uni-info-watcher/test/utils.ts index f5838292..6698f81b 100644 --- a/packages/uni-info-watcher/test/utils.ts +++ b/packages/uni-info-watcher/test/utils.ts @@ -7,6 +7,7 @@ import { ethers } from 'ethers'; import { request } from 'graphql-request'; import Decimal from 'decimal.js'; import _ from 'lodash'; +import { DeepPartial } from 'typeorm'; import { queryFactory, @@ -19,9 +20,10 @@ import { queryTokenHourData, queryTransactions } from '../test/queries'; -import { TestDatabase } from './test-db'; import { Block } from '../src/events'; import { Token } from '../src/entity/Token'; +import { BlockProgress } from '../src/entity/BlockProgress'; +import { TestDatabase } from './test-db'; export const checkUniswapDayData = async (endpoint: string): Promise => { // Checked values: date, tvlUSD. @@ -179,17 +181,23 @@ export const insertDummyBlock = async (db: TestDatabase, parentBlock: Block): Pr const parentHash = parentBlock.hash; const blockNumber = parentBlock.number + 1; - const block: Block = { - number: blockNumber, - hash: blockHash, - timestamp: blockTimestamp, + const block: DeepPartial = { + blockNumber, + blockHash, + blockTimestamp, parentHash }; await db.updateSyncStatus(dbTx, blockHash, blockNumber); await db.saveEvents(dbTx, block, []); await dbTx.commitTransaction(); - return block; + + return { + number: blockNumber, + hash: blockHash, + timestamp: blockTimestamp, + parentHash + }; } catch (error) { await dbTx.rollbackTransaction(); throw error; diff --git a/packages/uni-watcher/src/database.ts b/packages/uni-watcher/src/database.ts index e549fa0f..a0817a68 100644 --- a/packages/uni-watcher/src/database.ts +++ b/packages/uni-watcher/src/database.ts @@ -79,14 +79,12 @@ export class Database { return events; } - async saveEvents (queryRunner: QueryRunner, block: any, events: DeepPartial[]): Promise { + async saveEvents (queryRunner: QueryRunner, block: DeepPartial, events: DeepPartial[]): Promise { const { - hash: blockHash, - number: blockNumber, - timestamp: blockTimestamp, - parent: { - hash: parentHash - } + blockHash, + blockNumber, + blockTimestamp, + parentHash } = block; assert(blockHash); diff --git a/packages/uni-watcher/src/entity/BlockProgress.ts b/packages/uni-watcher/src/entity/BlockProgress.ts index d1666fbd..e67cf2dd 100644 --- a/packages/uni-watcher/src/entity/BlockProgress.ts +++ b/packages/uni-watcher/src/entity/BlockProgress.ts @@ -4,11 +4,13 @@ import { Entity, PrimaryGeneratedColumn, Column, Index } from 'typeorm'; +import { BlockProgressInterface } from '@vulcanize/util'; + @Entity() @Index(['blockHash'], { unique: true }) @Index(['blockNumber']) @Index(['parentHash']) -export class BlockProgress { +export class BlockProgress implements BlockProgressInterface { @PrimaryGeneratedColumn() id!: number; diff --git a/packages/uni-watcher/src/events.ts b/packages/uni-watcher/src/events.ts index 45f11232..da55dae5 100644 --- a/packages/uni-watcher/src/events.ts +++ b/packages/uni-watcher/src/events.ts @@ -8,32 +8,37 @@ import _ from 'lodash'; import { PubSub } from 'apollo-server-express'; import { EthClient } from '@vulcanize/ipld-eth-client'; -import { JobQueue, MAX_REORG_DEPTH } from '@vulcanize/util'; +import { + JobQueue, + EventWatcher as BaseEventWatcher, + MAX_REORG_DEPTH, + QUEUE_BLOCK_PROCESSING, + QUEUE_EVENT_PROCESSING, + QUEUE_CHAIN_PRUNING, + EventWatcherInterface +} from '@vulcanize/util'; import { Indexer } from './indexer'; -import { BlockProgress } from './entity/BlockProgress'; import { Event, UNKNOWN_EVENT_NAME } from './entity/Event'; const log = debug('vulcanize:events'); export const UniswapEvent = 'uniswap-event'; -export const BlockProgressEvent = 'block-progress-event'; -export const QUEUE_EVENT_PROCESSING = 'event-processing'; -export const QUEUE_BLOCK_PROCESSING = 'block-processing'; -export const QUEUE_CHAIN_PRUNING = 'chain-pruning'; -export class EventWatcher { +export class EventWatcher implements EventWatcherInterface { _ethClient: EthClient _indexer: Indexer - _subscription: ZenObservable.Subscription | undefined + _subscription?: ZenObservable.Subscription _pubsub: PubSub _jobQueue: JobQueue + _eventWatcher: BaseEventWatcher constructor (ethClient: EthClient, indexer: Indexer, pubsub: PubSub, jobQueue: JobQueue) { this._ethClient = ethClient; this._indexer = indexer; this._pubsub = pubsub; this._jobQueue = jobQueue; + this._eventWatcher = new BaseEventWatcher(this._ethClient, this._indexer, this._pubsub, this._jobQueue); } getEventIterator (): AsyncIterator { @@ -41,7 +46,7 @@ export class EventWatcher { } getBlockProgressEventIterator (): AsyncIterator { - return this._pubsub.asyncIterator([BlockProgressEvent]); + return this._eventWatcher.getBlockProgressEventIterator(); } async start (): Promise { @@ -56,12 +61,13 @@ export class EventWatcher { async watchBlocksAtChainHead (): Promise { log('Started watching upstream blocks...'); this._subscription = await this._ethClient.watchBlocks(async (value) => { - const { blockHash, blockNumber, parentHash } = _.get(value, 'data.listen.relatedNode'); + const { blockHash, blockNumber, parentHash, timestamp } = _.get(value, 'data.listen.relatedNode'); await this._indexer.updateSyncStatusChainHead(blockHash, blockNumber); log('watchBlock', blockHash, blockNumber); - await this._jobQueue.pushJob(QUEUE_BLOCK_PROCESSING, { blockHash, blockNumber, parentHash }); + + await this._jobQueue.pushJob(QUEUE_BLOCK_PROCESSING, { blockHash, blockNumber, parentHash, timestamp }); }); } @@ -84,30 +90,23 @@ export class EventWatcher { // Publish block progress event. const blockProgress = await this._indexer.getBlockProgress(blockHash); if (blockProgress) { - await this.publishBlockProgressToSubscribers(blockProgress); + await this._eventWatcher.publishBlockProgressToSubscribers(blockProgress); } }); } async initEventProcessingOnCompleteHandler (): Promise { - this._jobQueue.onComplete(QUEUE_EVENT_PROCESSING, async (job) => { + await this._jobQueue.onComplete(QUEUE_EVENT_PROCESSING, async (job) => { + const dbEvent = await this._eventWatcher.eventProcessingCompleteHandler(job); + const { data: { request, failed, state, createdOn } } = job; - const dbEvent = await this._indexer.getEvent(request.data.id); - assert(dbEvent); - - await this._indexer.updateBlockProgress(dbEvent.block.blockHash, dbEvent.index); - const blockProgress = await this._indexer.getBlockProgress(dbEvent.block.blockHash); - if (blockProgress) { - await this.publishBlockProgressToSubscribers(blockProgress); - } - const timeElapsedInSeconds = (Date.now() - Date.parse(createdOn)) / 1000; log(`Job onComplete event ${request.data.id} publish ${!!request.data.publish}`); if (!failed && state === 'completed' && request.data.publish) { // Check for max acceptable lag time between request and sending results to live subscribers. if (timeElapsedInSeconds <= this._jobQueue.maxCompletionLag) { - return await this.publishUniswapEventToSubscribers(dbEvent, timeElapsedInSeconds); + await this.publishUniswapEventToSubscribers(dbEvent, timeElapsedInSeconds); } else { log(`event ${request.data.id} is too old (${timeElapsedInSeconds}s), not broadcasting to live subscribers`); } @@ -142,26 +141,4 @@ export class EventWatcher { }); } } - - async publishBlockProgressToSubscribers (blockProgress: BlockProgress): Promise { - const { blockHash, blockNumber, numEvents, numProcessedEvents, isComplete } = blockProgress; - - // Publishing the event here will result in pushing the payload to GQL subscribers for `onAddressEvent(address)`. - await this._pubsub.publish(BlockProgressEvent, { - onBlockProgressEvent: { - blockHash, - blockNumber, - numEvents, - numProcessedEvents, - isComplete - } - }); - } - - async stop (): Promise { - if (this._subscription) { - log('Stopped watching upstream blocks'); - this._subscription.unsubscribe(); - } - } } diff --git a/packages/uni-watcher/src/fill.ts b/packages/uni-watcher/src/fill.ts index 5f6face3..176b2b52 100644 --- a/packages/uni-watcher/src/fill.ts +++ b/packages/uni-watcher/src/fill.ts @@ -7,13 +7,15 @@ import 'reflect-metadata'; import yargs from 'yargs'; import { hideBin } from 'yargs/helpers'; import debug from 'debug'; +import { PubSub } from 'apollo-server-express'; import { getCache } from '@vulcanize/cache'; import { EthClient } from '@vulcanize/ipld-eth-client'; -import { getConfig, JobQueue } from '@vulcanize/util'; +import { getConfig, fillBlocks, JobQueue } from '@vulcanize/util'; import { Database } from './database'; -import { QUEUE_BLOCK_PROCESSING } from './events'; +import { Indexer } from './indexer'; +import { EventWatcher } from './events'; const log = debug('vulcanize:server'); @@ -64,7 +66,15 @@ export const main = async (): Promise => { cache }); - assert(jobQueueConfig, 'Missing job queue config'); + const postgraphileClient = new EthClient({ + gqlEndpoint: gqlPostgraphileEndpoint, + cache + }); + + // Note: In-memory pubsub works fine for now, as each watcher is a single process anyway. + // Later: https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries + const pubsub = new PubSub(); + const indexer = new Indexer(config, db, ethClient, postgraphileClient); const { dbConnectionString, maxCompletionLag } = jobQueueConfig; assert(dbConnectionString, 'Missing job queue db connection string'); @@ -72,22 +82,11 @@ export const main = async (): Promise => { const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag }); await jobQueue.start(); - for (let blockNumber = argv.startBlock; blockNumber <= argv.endBlock; blockNumber++) { - log(`Fill block ${blockNumber}`); + const eventWatcher = new EventWatcher(ethClient, indexer, pubsub, jobQueue); - // TODO: Add pause between requests so as to not overwhelm the upsteam server. - const result = await ethClient.getBlockWithTransactions({ blockNumber }); - const { allEthHeaderCids: { nodes: blockNodes } } = result; - for (let bi = 0; bi < blockNodes.length; bi++) { - const { blockHash, blockNumber, parentHash } = blockNodes[bi]; - const blockProgress = await db.getBlockProgress(blockHash); - if (blockProgress) { - log(`Block number ${blockNumber}, block hash ${blockHash} already known, skip filling`); - } else { - await jobQueue.pushJob(QUEUE_BLOCK_PROCESSING, { blockHash, blockNumber, parentHash }); - } - } - } + assert(jobQueueConfig, 'Missing job queue config'); + + await fillBlocks(jobQueue, indexer, ethClient, eventWatcher, argv); }; main().then(() => { diff --git a/packages/uni-watcher/src/indexer.ts b/packages/uni-watcher/src/indexer.ts index cdaf7058..ea43fe24 100644 --- a/packages/uni-watcher/src/indexer.ts +++ b/packages/uni-watcher/src/indexer.ts @@ -16,11 +16,11 @@ import { Database } from './database'; import { Event, UNKNOWN_EVENT_NAME } from './entity/Event'; import { BlockProgress } from './entity/BlockProgress'; import { Contract, KIND_FACTORY, KIND_POOL, KIND_NFPM } from './entity/Contract'; +import { SyncStatus } from './entity/SyncStatus'; import { abi as factoryABI, storageLayout as factoryStorageLayout } from './artifacts/factory.json'; import { abi as nfpmABI, storageLayout as nfpmStorageLayout } from './artifacts/NonfungiblePositionManager.json'; import poolABI from './artifacts/pool.json'; -import { SyncStatus } from './entity/SyncStatus'; // TODO: Move to config. const MAX_EVENTS_BLOCK_RANGE = 1000; @@ -103,16 +103,17 @@ export class Indexer { } // Note: Some event names might be unknown at this point, as earlier events might not yet be processed. - async getOrFetchBlockEvents (blockHash: string): Promise> { - const blockProgress = await this._db.getBlockProgress(blockHash); + async getOrFetchBlockEvents (block: DeepPartial): Promise> { + assert(block.blockHash); + const blockProgress = await this._db.getBlockProgress(block.blockHash); if (!blockProgress) { // Fetch and save events first and make a note in the event sync progress table. - log(`getBlockEvents: db miss, fetching from upstream server ${blockHash}`); - await this.fetchAndSaveEvents(blockHash); + log(`getBlockEvents: db miss, fetching from upstream server ${block.blockHash}`); + await this.fetchAndSaveEvents(block); } - const events = await this._db.getBlockEvents(blockHash); - log(`getBlockEvents: db hit, ${blockHash} num events: ${events.length}`); + const events = await this._db.getBlockEvents(block.blockHash); + log(`getBlockEvents: db hit, ${block.blockHash} num events: ${events.length}`); return events; } @@ -314,8 +315,9 @@ export class Indexer { return { eventName, eventInfo }; } - async fetchAndSaveEvents (blockHash: string): Promise { - const { block, logs } = await this._ethClient.getLogs({ blockHash }); + async fetchAndSaveEvents ({ blockHash }: DeepPartial): Promise { + assert(blockHash); + let { block, logs } = await this._ethClient.getLogs({ blockHash }); const { allEthHeaderCids: { @@ -388,6 +390,13 @@ export class Indexer { const dbTx = await this._db.createTransactionRunner(); try { + block = { + blockHash, + blockNumber: block.number, + blockTimestamp: block.timestamp, + parentHash: block.parent.hash + }; + await this._db.saveEvents(dbTx, block, dbEvents); await dbTx.commitTransaction(); } catch (error) { diff --git a/packages/uni-watcher/src/job-runner.ts b/packages/uni-watcher/src/job-runner.ts index 1dce0436..fdaf19be 100644 --- a/packages/uni-watcher/src/job-runner.ts +++ b/packages/uni-watcher/src/job-runner.ts @@ -10,12 +10,11 @@ import debug from 'debug'; import { getCache } from '@vulcanize/cache'; import { EthClient } from '@vulcanize/ipld-eth-client'; -import { getConfig, JobQueue, MAX_REORG_DEPTH } from '@vulcanize/util'; +import { getConfig, JobQueue, MAX_REORG_DEPTH, QUEUE_BLOCK_PROCESSING, QUEUE_EVENT_PROCESSING, QUEUE_CHAIN_PRUNING } from '@vulcanize/util'; import { Indexer } from './indexer'; import { Database } from './database'; import { UNKNOWN_EVENT_NAME, Event } from './entity/Event'; -import { QUEUE_BLOCK_PROCESSING, QUEUE_EVENT_PROCESSING, QUEUE_CHAIN_PRUNING } from './events'; const log = debug('vulcanize:job-runner'); @@ -36,7 +35,7 @@ export class JobRunner { async subscribeBlockProcessingQueue (): Promise { await this._jobQueue.subscribe(QUEUE_BLOCK_PROCESSING, async (job) => { - const { data: { blockHash, blockNumber, parentHash, priority } } = job; + const { data: { blockHash, blockNumber, parentHash, timestamp, priority } } = job; log(`Processing block number ${blockNumber} hash ${blockHash} `); @@ -51,7 +50,7 @@ export class JobRunner { if (blockHash !== syncStatus.latestCanonicalBlockHash) { const parent = await this._indexer.getBlockProgress(parentHash); if (!parent) { - const { number: parentBlockNumber, parent: { hash: grandparentHash } } = await this._indexer.getBlock(parentHash); + const { number: parentBlockNumber, parent: { hash: grandparentHash }, timestamp: parentTimestamp } = await this._indexer.getBlock(parentHash); // Create a higher priority job to index parent block and then abort. // We don't have to worry about aborting as this job will get retried later. @@ -60,6 +59,7 @@ export class JobRunner { blockHash: parentHash, blockNumber: parentBlockNumber, parentHash: grandparentHash, + timestamp: parentTimestamp, priority: newPriority }, { priority: newPriority }); @@ -78,7 +78,7 @@ export class JobRunner { } } - const events = await this._indexer.getOrFetchBlockEvents(blockHash); + const events = await this._indexer.getOrFetchBlockEvents({ blockHash, blockNumber, parentHash, blockTimestamp: timestamp }); for (let ei = 0; ei < events.length; ei++) { await this._jobQueue.pushJob(QUEUE_EVENT_PROCESSING, { id: events[ei].id, publish: true }); } diff --git a/packages/util/index.ts b/packages/util/index.ts index e57648c5..34642c49 100644 --- a/packages/util/index.ts +++ b/packages/util/index.ts @@ -1,5 +1,12 @@ +// +// Copyright 2021 Vulcanize, Inc. +// + export * from './src/config'; export * from './src/database'; export * from './src/job-queue'; export * from './src/constants'; export * from './src/index'; +export * from './src/fill'; +export * from './src/events'; +export * from './src/types'; diff --git a/packages/util/src/config.ts b/packages/util/src/config.ts index 751bdc70..f45922c4 100644 --- a/packages/util/src/config.ts +++ b/packages/util/src/config.ts @@ -12,6 +12,12 @@ import { Config as CacheConfig } from '@vulcanize/cache'; const log = debug('vulcanize:config'); +export interface JobQueueConfig { + dbConnectionString: string; + maxCompletionLag: number; + jobDelay?: number; +} + export interface Config { server: { host: string; @@ -36,11 +42,7 @@ export interface Config { gqlSubscriptionEndpoint: string; } }, - jobQueue: { - dbConnectionString: string; - maxCompletionLag: number; - jobDelay?: number; - } + jobQueue: JobQueueConfig } export const getConfig = async (configFile: string): Promise => { diff --git a/packages/util/src/constants.ts b/packages/util/src/constants.ts index c4fee396..b35f5447 100644 --- a/packages/util/src/constants.ts +++ b/packages/util/src/constants.ts @@ -3,3 +3,7 @@ // export const MAX_REORG_DEPTH = 16; + +export const QUEUE_BLOCK_PROCESSING = 'block-processing'; +export const QUEUE_EVENT_PROCESSING = 'event-processing'; +export const QUEUE_CHAIN_PRUNING = 'chain-pruning'; diff --git a/packages/util/src/events.ts b/packages/util/src/events.ts new file mode 100644 index 00000000..bb833573 --- /dev/null +++ b/packages/util/src/events.ts @@ -0,0 +1,72 @@ +// +// Copyright 2021 Vulcanize, Inc. +// + +import assert from 'assert'; +import debug from 'debug'; +import { PubSub } from 'apollo-server-express'; + +import { EthClient } from '@vulcanize/ipld-eth-client'; + +import { JobQueue } from './job-queue'; +import { BlockProgressInterface, EventInterface, IndexerInterface } from './types'; + +const log = debug('vulcanize:events'); + +export const BlockProgressEvent = 'block-progress-event'; + +export class EventWatcher { + _ethClient: EthClient + _indexer: IndexerInterface + _subscription?: ZenObservable.Subscription + _pubsub: PubSub + _jobQueue: JobQueue + + constructor (ethClient: EthClient, indexer: IndexerInterface, pubsub: PubSub, jobQueue: JobQueue) { + this._ethClient = ethClient; + this._indexer = indexer; + this._pubsub = pubsub; + this._jobQueue = jobQueue; + } + + getBlockProgressEventIterator (): AsyncIterator { + return this._pubsub.asyncIterator([BlockProgressEvent]); + } + + async publishBlockProgressToSubscribers (blockProgress: BlockProgressInterface): Promise { + const { blockHash, blockNumber, numEvents, numProcessedEvents, isComplete } = blockProgress; + + // Publishing the event here will result in pushing the payload to GQL subscribers for `onAddressEvent(address)`. + await this._pubsub.publish(BlockProgressEvent, { + onBlockProgressEvent: { + blockHash, + blockNumber, + numEvents, + numProcessedEvents, + isComplete + } + }); + } + + async stop (): Promise { + if (this._subscription) { + log('Stopped watching upstream blocks'); + this._subscription.unsubscribe(); + } + } + + async eventProcessingCompleteHandler (job: any): Promise { + const { data: { request } } = job; + + const dbEvent = await this._indexer.getEvent(request.data.id); + assert(dbEvent); + + await this._indexer.updateBlockProgress(dbEvent.block.blockHash, dbEvent.index); + const blockProgress = await this._indexer.getBlockProgress(dbEvent.block.blockHash); + if (blockProgress) { + await this.publishBlockProgressToSubscribers(blockProgress); + } + + return dbEvent; + } +} diff --git a/packages/util/src/fill.ts b/packages/util/src/fill.ts new file mode 100644 index 00000000..f68a4b40 --- /dev/null +++ b/packages/util/src/fill.ts @@ -0,0 +1,60 @@ +// +// Copyright 2021 Vulcanize, Inc. +// + +import debug from 'debug'; + +import { EthClient } from '@vulcanize/ipld-eth-client'; + +import { JobQueue } from './job-queue'; +import { QUEUE_BLOCK_PROCESSING } from './constants'; +import { EventWatcherInterface, IndexerInterface } from './types'; + +const log = debug('vulcanize:fill'); + +export const fillBlocks = async ( + jobQueue: JobQueue, + indexer: IndexerInterface, + ethClient: EthClient, + eventWatcher: EventWatcherInterface, + { startBlock, endBlock }: { startBlock: number, endBlock: number} +): Promise => { + await eventWatcher.initBlockProcessingOnCompleteHandler(); + await eventWatcher.initEventProcessingOnCompleteHandler(); + + for (let blockNumber = startBlock; blockNumber <= endBlock; blockNumber++) { + log(`Fill block ${blockNumber}`); + + // TODO: Add pause between requests so as to not overwhelm the upsteam server. + const result = await ethClient.getBlockWithTransactions({ blockNumber }); + const { allEthHeaderCids: { nodes: blockNodes } } = result; + for (let bi = 0; bi < blockNodes.length; bi++) { + const { blockHash, blockNumber, parentHash, timestamp } = blockNodes[bi]; + const blockProgress = await indexer.getBlockProgress(blockHash); + + if (blockProgress) { + log(`Block number ${blockNumber}, block hash ${blockHash} already known, skip filling`); + } else { + await jobQueue.pushJob(QUEUE_BLOCK_PROCESSING, { blockHash, blockNumber, parentHash, timestamp }); + } + } + } + + // Creating an AsyncIterable from AsyncIterator to iterate over the values. + // https://www.codementor.io/@tiagolopesferreira/asynchronous-iterators-in-javascript-jl1yg8la1#for-wait-of + const blockProgressEventIterable = { + // getBlockProgressEventIterator returns an AsyncIterator which can be used to listen to BlockProgress events. + [Symbol.asyncIterator]: eventWatcher.getBlockProgressEventIterator.bind(eventWatcher) + }; + + // Iterate over async iterable. + // https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Statements/for-await...of + for await (const data of blockProgressEventIterable) { + const { onBlockProgressEvent: { blockNumber, isComplete } } = data; + + if (blockNumber >= endBlock && isComplete) { + // Break the async loop if blockProgress event is for the endBlock and processing is complete. + break; + } + } +}; diff --git a/packages/util/src/types.ts b/packages/util/src/types.ts new file mode 100644 index 00000000..37007c71 --- /dev/null +++ b/packages/util/src/types.ts @@ -0,0 +1,48 @@ +// +// Copyright 2021 Vulcanize, Inc. +// + +export interface BlockProgressInterface { + id: number; + blockHash: string; + parentHash: string; + blockNumber: number; + blockTimestamp: number; + numEvents: number; + numProcessedEvents: number; + lastProcessedEventIndex: number; + isComplete: boolean; + isPruned: boolean; +} + +export interface SyncStatusInterface { + id: number; + chainHeadBlockHash: string; + chainHeadBlockNumber: number; + latestCanonicalBlockHash: string; + latestCanonicalBlockNumber: number; +} + +export interface EventInterface { + id: number; + block: BlockProgressInterface; + txHash: string; + index: number; + contract: string; + eventName: string; + eventInfo: string; + extraInfo: string; + proof: string; +} + +export interface IndexerInterface { + getBlockProgress (blockHash: string): Promise + getEvent (id: string): Promise + updateBlockProgress (blockHash: string, lastProcessedEventIndex: number): Promise +} + +export interface EventWatcherInterface { + getBlockProgressEventIterator (): AsyncIterator + initBlockProcessingOnCompleteHandler (): Promise + initEventProcessingOnCompleteHandler (): Promise +}