diff --git a/packages/erc20-watcher/src/cli/reset-cmds/state.ts b/packages/erc20-watcher/src/cli/reset-cmds/state.ts index edf78ecc..5cff688e 100644 --- a/packages/erc20-watcher/src/cli/reset-cmds/state.ts +++ b/packages/erc20-watcher/src/cli/reset-cmds/state.ts @@ -6,7 +6,7 @@ import debug from 'debug'; import { MoreThan } from 'typeorm'; import assert from 'assert'; -import { getConfig, getResetConfig, resetJobs } from '@vulcanize/util'; +import { getConfig, getResetConfig, JobQueue, resetJobs } from '@vulcanize/util'; import { Database } from '../../database'; import { Indexer } from '../../indexer'; @@ -29,13 +29,21 @@ export const builder = { export const handler = async (argv: any): Promise => { const config = await getConfig(argv.configFile); await resetJobs(config); + const { jobQueue: jobQueueConfig } = config; const { dbConfig, serverConfig, ethClient, postgraphileClient, ethProvider } = await getResetConfig(config); // Initialize database. const db = new Database(dbConfig); await db.init(); - const indexer = new Indexer(db, ethClient, postgraphileClient, ethProvider, serverConfig.mode); + assert(jobQueueConfig, 'Missing job queue config'); + + const { dbConnectionString, maxCompletionLagInSecs } = jobQueueConfig; + assert(dbConnectionString, 'Missing job queue db connection string'); + + const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs }); + + const indexer = new Indexer(db, ethClient, postgraphileClient, ethProvider, jobQueue, serverConfig.mode); const syncStatus = await indexer.getSyncStatus(); assert(syncStatus, 'Missing syncStatus'); diff --git a/packages/erc20-watcher/src/cli/watch-contract.ts b/packages/erc20-watcher/src/cli/watch-contract.ts index 8f4f0eb8..60e0d662 100644 --- a/packages/erc20-watcher/src/cli/watch-contract.ts +++ b/packages/erc20-watcher/src/cli/watch-contract.ts @@ -5,11 +5,11 @@ import assert from 'assert'; import yargs from 'yargs'; import 'reflect-metadata'; -import { ethers } from 'ethers'; -import { Config, DEFAULT_CONFIG_PATH, getConfig } from '@vulcanize/util'; +import { Config, DEFAULT_CONFIG_PATH, getConfig, getResetConfig, JobQueue } from '@vulcanize/util'; import { Database } from '../database'; +import { Indexer } from '../indexer'; (async () => { const argv = await yargs.parserConfiguration({ @@ -37,16 +37,27 @@ import { Database } from '../database'; }).argv; const config: Config = await getConfig(argv.configFile); - const { database: dbConfig } = config; + const { database: dbConfig, server: { mode }, jobQueue: jobQueueConfig } = config; + const { ethClient, postgraphileClient, ethProvider } = await getResetConfig(config); assert(dbConfig); const db = new Database(dbConfig); await db.init(); - // Always use the checksum address (https://docs.ethers.io/v5/api/utils/address/#utils-getAddress). 
- const address = ethers.utils.getAddress(argv.address); + assert(jobQueueConfig, 'Missing job queue config'); + + const { dbConnectionString, maxCompletionLagInSecs } = jobQueueConfig; + assert(dbConnectionString, 'Missing job queue db connection string'); + + const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs }); + await jobQueue.start(); + + const indexer = new Indexer(db, ethClient, postgraphileClient, ethProvider, jobQueue, mode); + + await indexer.watchContract(argv.address, argv.startingBlock); - await db.saveContract(address, argv.startingBlock); await db.close(); + await jobQueue.stop(); + process.exit(); })(); diff --git a/packages/erc20-watcher/src/database.ts b/packages/erc20-watcher/src/database.ts index 2fa9cf3f..0d7a5319 100644 --- a/packages/erc20-watcher/src/database.ts +++ b/packages/erc20-watcher/src/database.ts @@ -15,8 +15,6 @@ import { Event } from './entity/Event'; import { SyncStatus } from './entity/SyncStatus'; import { BlockProgress } from './entity/BlockProgress'; -const CONTRACT_KIND = 'token'; - export class Database { _config: ConnectionOptions _conn!: Connection @@ -76,10 +74,10 @@ export class Database { return repo.save(entity); } - async getContract (address: string): Promise { + async getContracts (): Promise { const repo = this._conn.getRepository(Contract); - return this._baseDatabase.getContract(repo, address); + return this._baseDatabase.getContracts(repo); } async createTransactionRunner (): Promise { @@ -116,12 +114,10 @@ export class Database { return this._baseDatabase.saveEvents(blockRepo, eventRepo, block, events); } - async saveContract (address: string, startingBlock: number): Promise { - await this._conn.transaction(async (tx) => { - const repo = tx.getRepository(Contract); + async saveContract (queryRunner: QueryRunner, address: string, kind: string, startingBlock: number): Promise { + const repo = queryRunner.manager.getRepository(Contract); - return this._baseDatabase.saveContract(repo, address, startingBlock, CONTRACT_KIND); - }); + return this._baseDatabase.saveContract(repo, address, startingBlock, kind); } async updateSyncStatusIndexedBlock (queryRunner: QueryRunner, blockHash: string, blockNumber: number, force = false): Promise { @@ -171,10 +167,10 @@ export class Database { return this._baseDatabase.getBlockProgress(repo, blockHash); } - async updateBlockProgress (queryRunner: QueryRunner, blockHash: string, lastProcessedEventIndex: number): Promise { + async updateBlockProgress (queryRunner: QueryRunner, block: BlockProgress, lastProcessedEventIndex: number): Promise { const repo = queryRunner.manager.getRepository(BlockProgress); - return this._baseDatabase.updateBlockProgress(repo, blockHash, lastProcessedEventIndex); + return this._baseDatabase.updateBlockProgress(repo, block, lastProcessedEventIndex); } async removeEntities (queryRunner: QueryRunner, entity: new () => Entity, findConditions?: FindManyOptions | FindConditions): Promise { diff --git a/packages/erc20-watcher/src/fill.ts b/packages/erc20-watcher/src/fill.ts index a515c294..6643d2c6 100644 --- a/packages/erc20-watcher/src/fill.ts +++ b/packages/erc20-watcher/src/fill.ts @@ -77,7 +77,6 @@ export const main = async (): Promise => { // Note: In-memory pubsub works fine for now, as each watcher is a single process anyway. 
// Later: https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries const pubsub = new PubSub(); - const indexer = new Indexer(db, ethClient, postgraphileClient, ethProvider, mode); const { dbConnectionString, maxCompletionLagInSecs } = jobQueueConfig; assert(dbConnectionString, 'Missing job queue db connection string'); @@ -85,6 +84,8 @@ export const main = async (): Promise => { const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs }); await jobQueue.start(); + const indexer = new Indexer(db, ethClient, postgraphileClient, ethProvider, jobQueue, mode); + const eventWatcher = new EventWatcher(upstream, ethClient, postgraphileClient, indexer, pubsub, jobQueue); assert(jobQueueConfig, 'Missing job queue config'); diff --git a/packages/erc20-watcher/src/indexer.ts b/packages/erc20-watcher/src/indexer.ts index 48915ba9..67ef4a0c 100644 --- a/packages/erc20-watcher/src/indexer.ts +++ b/packages/erc20-watcher/src/indexer.ts @@ -12,7 +12,7 @@ import { BaseProvider } from '@ethersproject/providers'; import { EthClient } from '@vulcanize/ipld-eth-client'; import { StorageLayout } from '@vulcanize/solidity-mapper'; -import { EventInterface, Indexer as BaseIndexer, ValueResult, UNKNOWN_EVENT_NAME } from '@vulcanize/util'; +import { EventInterface, Indexer as BaseIndexer, ValueResult, UNKNOWN_EVENT_NAME, JobQueue } from '@vulcanize/util'; import { Database } from './database'; import { Event } from './entity/Event'; @@ -29,6 +29,8 @@ const ETH_CALL_MODE = 'eth_call'; const TRANSFER_EVENT = 'Transfer'; const APPROVAL_EVENT = 'Approval'; +const CONTRACT_KIND = 'token'; + interface EventResult { event: { from?: string; @@ -53,7 +55,7 @@ export class Indexer { _contract: ethers.utils.Interface _serverMode: string - constructor (db: Database, ethClient: EthClient, postgraphileClient: EthClient, ethProvider: BaseProvider, serverMode: string) { + constructor (db: Database, ethClient: EthClient, postgraphileClient: EthClient, ethProvider: BaseProvider, jobQueue: JobQueue, serverMode: string) { assert(db); assert(ethClient); @@ -62,7 +64,7 @@ export class Indexer { this._postgraphileClient = postgraphileClient; this._ethProvider = ethProvider; this._serverMode = serverMode; - this._baseIndexer = new BaseIndexer(this._db, this._ethClient, this._postgraphileClient, this._ethProvider); + this._baseIndexer = new BaseIndexer(this._db, this._ethClient, this._postgraphileClient, this._ethProvider, jobQueue); const { abi, storageLayout } = artifacts; @@ -290,13 +292,6 @@ export class Indexer { return { eventName, eventInfo }; } - async watchContract (address: string, startingBlock: number): Promise { - // Always use the checksum address (https://docs.ethers.io/v5/api/utils/address/#utils-getAddress). 
- await this._db.saveContract(ethers.utils.getAddress(address), startingBlock); - - return true; - } - async getEventsByFilter (blockHash: string, contract: string, name: string | null): Promise> { return this._baseIndexer.getEventsByFilter(blockHash, contract, name); } @@ -305,6 +300,10 @@ export class Indexer { return this._baseIndexer.isWatchedContract(address); } + async watchContract (address: string, startingBlock: number): Promise { + return this._baseIndexer.watchContract(address, CONTRACT_KIND, startingBlock); + } + async saveEventEntity (dbEvent: Event): Promise { return this._baseIndexer.saveEventEntity(dbEvent); } @@ -365,8 +364,8 @@ export class Indexer { return this._baseIndexer.markBlocksAsPruned(blocks); } - async updateBlockProgress (blockHash: string, lastProcessedEventIndex: number): Promise { - return this._baseIndexer.updateBlockProgress(blockHash, lastProcessedEventIndex); + async updateBlockProgress (block: BlockProgress, lastProcessedEventIndex: number): Promise { + return this._baseIndexer.updateBlockProgress(block, lastProcessedEventIndex); } async getAncestorAtDepth (blockHash: string, depth: number): Promise { diff --git a/packages/erc20-watcher/src/job-runner.ts b/packages/erc20-watcher/src/job-runner.ts index f13033ca..5d894c80 100644 --- a/packages/erc20-watcher/src/job-runner.ts +++ b/packages/erc20-watcher/src/job-runner.ts @@ -56,12 +56,16 @@ export class JobRunner { await this._jobQueue.subscribe(QUEUE_EVENT_PROCESSING, async (job) => { const event = await this._baseJobRunner.processEvent(job); + if (!event) { + return; + } + const watchedContract = await this._indexer.isWatchedContract(event.contract); if (watchedContract) { await this._indexer.processEvent(event); } - await this._indexer.updateBlockProgress(event.block.blockHash, event.index); + await this._indexer.updateBlockProgress(event.block, event.index); await this._jobQueue.markComplete(job); }); } @@ -107,7 +111,6 @@ export const main = async (): Promise => { }); const ethProvider = getCustomProvider(rpcProviderEndpoint); - const indexer = new Indexer(db, ethClient, postgraphileClient, ethProvider, mode); assert(jobQueueConfig, 'Missing job queue config'); @@ -117,6 +120,8 @@ export const main = async (): Promise => { const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs }); await jobQueue.start(); + const indexer = new Indexer(db, ethClient, postgraphileClient, ethProvider, jobQueue, mode); + const jobRunner = new JobRunner(jobQueueConfig, indexer, jobQueue); await jobRunner.start(); }; diff --git a/packages/erc20-watcher/src/resolvers.ts b/packages/erc20-watcher/src/resolvers.ts index bc5aad96..f5c9548e 100644 --- a/packages/erc20-watcher/src/resolvers.ts +++ b/packages/erc20-watcher/src/resolvers.ts @@ -34,9 +34,11 @@ export const createResolvers = async (indexer: Indexer, eventWatcher: EventWatch }, Mutation: { - watchToken: (_: any, { token, startingBlock = 1 }: { token: string, startingBlock: number }): Promise => { + watchToken: async (_: any, { token, startingBlock = 1 }: { token: string, startingBlock: number }): Promise => { log('watchToken', token, startingBlock); - return indexer.watchContract(token, startingBlock); + await indexer.watchContract(token, startingBlock); + + return true; } }, diff --git a/packages/erc20-watcher/src/server.ts b/packages/erc20-watcher/src/server.ts index fadb90ca..b2502eaf 100644 --- a/packages/erc20-watcher/src/server.ts +++ b/packages/erc20-watcher/src/server.ts @@ -72,7 +72,6 @@ export const main = async (): 
Promise => { // Note: In-memory pubsub works fine for now, as each watcher is a single process anyway. // Later: https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries const pubsub = new PubSub(); - const indexer = new Indexer(db, ethClient, postgraphileClient, ethProvider, mode); assert(jobQueueConfig, 'Missing job queue config'); @@ -80,6 +79,9 @@ export const main = async (): Promise => { assert(dbConnectionString, 'Missing job queue db connection string'); const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs }); + + const indexer = new Indexer(db, ethClient, postgraphileClient, ethProvider, jobQueue, mode); + const eventWatcher = new EventWatcher(upstream, ethClient, postgraphileClient, indexer, pubsub, jobQueue); if (watcherKind === KIND_ACTIVE) { diff --git a/packages/uni-info-watcher/src/cli/reset-cmds/state.ts b/packages/uni-info-watcher/src/cli/reset-cmds/state.ts index 57dcf036..5762c6da 100644 --- a/packages/uni-info-watcher/src/cli/reset-cmds/state.ts +++ b/packages/uni-info-watcher/src/cli/reset-cmds/state.ts @@ -6,7 +6,7 @@ import debug from 'debug'; import { MoreThan } from 'typeorm'; import assert from 'assert'; -import { getConfig, getResetConfig, resetJobs } from '@vulcanize/util'; +import { getConfig, getResetConfig, JobQueue, resetJobs } from '@vulcanize/util'; import { Client as ERC20Client } from '@vulcanize/erc20-watcher'; import { Client as UniClient } from '@vulcanize/uni-watcher'; @@ -38,6 +38,7 @@ export const builder = { export const handler = async (argv: any): Promise => { const config = await getConfig(argv.configFile); await resetJobs(config); + const { jobQueue: jobQueueConfig } = config; const { dbConfig, serverConfig, upstreamConfig, ethClient, postgraphileClient, ethProvider } = await getResetConfig(config); // Initialize database. 
@@ -52,7 +53,15 @@ export const handler = async (argv: any): Promise => { const uniClient = new UniClient(uniWatcher); const erc20Client = new ERC20Client(tokenWatcher); - const indexer = new Indexer(db, uniClient, erc20Client, ethClient, postgraphileClient, ethProvider, serverConfig.mode); + assert(jobQueueConfig, 'Missing job queue config'); + + const { dbConnectionString, maxCompletionLagInSecs } = jobQueueConfig; + assert(dbConnectionString, 'Missing job queue db connection string'); + + const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs }); + await jobQueue.start(); + + const indexer = new Indexer(db, uniClient, erc20Client, ethClient, postgraphileClient, ethProvider, jobQueue, serverConfig.mode); const syncStatus = await indexer.getSyncStatus(); assert(syncStatus, 'Missing syncStatus'); diff --git a/packages/uni-info-watcher/src/database.ts b/packages/uni-info-watcher/src/database.ts index 1e9a6ee9..6bfa3598 100644 --- a/packages/uni-info-watcher/src/database.ts +++ b/packages/uni-info-watcher/src/database.ts @@ -663,10 +663,10 @@ export class Database implements DatabaseInterface { return this._baseDatabase.getBlockProgress(repo, blockHash); } - async updateBlockProgress (queryRunner: QueryRunner, blockHash: string, lastProcessedEventIndex: number): Promise { + async updateBlockProgress (queryRunner: QueryRunner, block: BlockProgress, lastProcessedEventIndex: number): Promise { const repo = queryRunner.manager.getRepository(BlockProgress); - return this._baseDatabase.updateBlockProgress(repo, blockHash, lastProcessedEventIndex); + return this._baseDatabase.updateBlockProgress(repo, block, lastProcessedEventIndex); } async getEntities (queryRunner: QueryRunner, entity: new () => Entity, findConditions?: FindConditions): Promise { diff --git a/packages/uni-info-watcher/src/fill.ts b/packages/uni-info-watcher/src/fill.ts index 79ce104b..45689e24 100644 --- a/packages/uni-info-watcher/src/fill.ts +++ b/packages/uni-info-watcher/src/fill.ts @@ -82,7 +82,6 @@ export const main = async (): Promise => { // Note: In-memory pubsub works fine for now, as each watcher is a single process anyway. 
// Later: https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries const pubsub = new PubSub(); - const indexer = new Indexer(db, uniClient, erc20Client, ethClient, postgraphileClient, ethProvider, mode); assert(jobQueueConfig, 'Missing job queue config'); const { dbConnectionString, maxCompletionLagInSecs } = jobQueueConfig; @@ -91,6 +90,8 @@ export const main = async (): Promise => { const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs }); await jobQueue.start(); + const indexer = new Indexer(db, uniClient, erc20Client, ethClient, postgraphileClient, ethProvider, jobQueue, mode); + const eventWatcher = new EventWatcher(upstream, ethClient, postgraphileClient, indexer, pubsub, jobQueue); await fillBlocks(jobQueue, indexer, postgraphileClient, eventWatcher, blockDelayInMilliSecs, argv); diff --git a/packages/uni-info-watcher/src/indexer.ts b/packages/uni-info-watcher/src/indexer.ts index 1901af5e..c64771e3 100644 --- a/packages/uni-info-watcher/src/indexer.ts +++ b/packages/uni-info-watcher/src/indexer.ts @@ -11,7 +11,7 @@ import { providers, utils, BigNumber } from 'ethers'; import { Client as UniClient } from '@vulcanize/uni-watcher'; import { Client as ERC20Client } from '@vulcanize/erc20-watcher'; import { EthClient } from '@vulcanize/ipld-eth-client'; -import { IndexerInterface, Indexer as BaseIndexer, QueryOptions, OrderDirection, BlockHeight, Relation, GraphDecimal } from '@vulcanize/util'; +import { IndexerInterface, Indexer as BaseIndexer, QueryOptions, OrderDirection, BlockHeight, Relation, GraphDecimal, JobQueue } from '@vulcanize/util'; import { findEthPerToken, getEthPriceInUSD, getTrackedAmountUSD, sqrtPriceX96ToTokenPrices, WHITELIST_TOKENS } from './utils/pricing'; import { updatePoolDayData, updatePoolHourData, updateTickDayData, updateTokenDayData, updateTokenHourData, updateUniswapDayData } from './utils/interval-updates'; @@ -48,7 +48,7 @@ export class Indexer implements IndexerInterface { _baseIndexer: BaseIndexer _isDemo: boolean - constructor (db: Database, uniClient: UniClient, erc20Client: ERC20Client, ethClient: EthClient, postgraphileClient: EthClient, ethProvider: providers.BaseProvider, mode: string) { + constructor (db: Database, uniClient: UniClient, erc20Client: ERC20Client, ethClient: EthClient, postgraphileClient: EthClient, ethProvider: providers.BaseProvider, jobQueue: JobQueue, mode: string) { assert(db); assert(uniClient); assert(erc20Client); @@ -59,14 +59,14 @@ export class Indexer implements IndexerInterface { this._erc20Client = erc20Client; this._ethClient = ethClient; this._postgraphileClient = postgraphileClient; - this._baseIndexer = new BaseIndexer(this._db, this._ethClient, this._postgraphileClient, ethProvider); + this._baseIndexer = new BaseIndexer(this._db, this._ethClient, this._postgraphileClient, ethProvider, jobQueue); this._isDemo = mode === 'demo'; } getResultEvent (event: Event): ResultEvent { const block = event.block; const eventFields = JSON.parse(event.eventInfo); - const { tx } = JSON.parse(event.extraInfo); + const { tx, eventIndex } = JSON.parse(event.extraInfo); return { block: { @@ -78,7 +78,7 @@ export class Indexer implements IndexerInterface { tx, contract: event.contract, - eventIndex: event.index, + eventIndex, event: { __typename: event.eventName, @@ -346,8 +346,8 @@ export class Indexer implements IndexerInterface { return this._baseIndexer.getBlocksAtHeight(height, isPruned); } - async updateBlockProgress (blockHash: string, 
lastProcessedEventIndex: number): Promise { - return this._baseIndexer.updateBlockProgress(blockHash, lastProcessedEventIndex); + async updateBlockProgress (block: BlockProgress, lastProcessedEventIndex: number): Promise { + return this._baseIndexer.updateBlockProgress(block, lastProcessedEventIndex); } async _fetchAndSaveEvents (block: DeepPartial): Promise { @@ -365,10 +365,10 @@ export class Indexer implements IndexerInterface { } = events[i]; const { __typename: eventName, ...eventInfo } = event; - const extraInfo = { tx }; + const extraInfo = { tx, eventIndex }; dbEvents.push({ - index: eventIndex, + index: i, txHash: tx.hash, contract, eventName, diff --git a/packages/uni-info-watcher/src/job-runner.ts b/packages/uni-info-watcher/src/job-runner.ts index ac4fa153..f3f71b87 100644 --- a/packages/uni-info-watcher/src/job-runner.ts +++ b/packages/uni-info-watcher/src/job-runner.ts @@ -58,12 +58,16 @@ export class JobRunner { await this._jobQueue.subscribe(QUEUE_EVENT_PROCESSING, async (job) => { const event = await this._baseJobRunner.processEvent(job); + if (!event) { + return; + } + // Check if event is processed. if (!event.block.isComplete && event.index !== event.block.lastProcessedEventIndex) { await this._indexer.processEvent(event); } - await this._indexer.updateBlockProgress(event.block.blockHash, event.index); + await this._indexer.updateBlockProgress(event.block, event.index); await this._jobQueue.markComplete(job); }); } @@ -132,8 +136,6 @@ export const main = async (): Promise => { const erc20Client = new ERC20Client(tokenWatcher); const ethProvider = getCustomProvider(rpcProviderEndpoint); - const indexer = new Indexer(db, uniClient, erc20Client, ethClient, postgraphileClient, ethProvider, mode); - assert(jobQueueConfig, 'Missing job queue config'); const { dbConnectionString, maxCompletionLagInSecs } = jobQueueConfig; @@ -142,6 +144,8 @@ export const main = async (): Promise => { const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs }); await jobQueue.start(); + const indexer = new Indexer(db, uniClient, erc20Client, ethClient, postgraphileClient, ethProvider, jobQueue, mode); + const jobRunner = new JobRunner(jobQueueConfig, indexer, jobQueue); await jobRunner.start(); }; diff --git a/packages/uni-info-watcher/src/server.ts b/packages/uni-info-watcher/src/server.ts index 555c5b83..8cb85f8f 100644 --- a/packages/uni-info-watcher/src/server.ts +++ b/packages/uni-info-watcher/src/server.ts @@ -82,7 +82,6 @@ export const main = async (): Promise => { const uniClient = new UniClient(uniWatcher); const erc20Client = new ERC20Client(tokenWatcher); const ethProvider = getCustomProvider(rpcProviderEndpoint); - const indexer = new Indexer(db, uniClient, erc20Client, ethClient, postgraphileClient, ethProvider, mode); assert(jobQueueConfig, 'Missing job queue config'); @@ -92,6 +91,8 @@ export const main = async (): Promise => { const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs }); await jobQueue.start(); + const indexer = new Indexer(db, uniClient, erc20Client, ethClient, postgraphileClient, ethProvider, jobQueue, mode); + const pubSub = new PubSub(); const eventWatcher = new EventWatcher(upstream, ethClient, postgraphileClient, indexer, pubSub, jobQueue); await eventWatcher.start(); diff --git a/packages/uni-watcher/src/chain-pruning.test.ts b/packages/uni-watcher/src/chain-pruning.test.ts index 95d1e8a2..26869864 100644 --- a/packages/uni-watcher/src/chain-pruning.test.ts +++ 
b/packages/uni-watcher/src/chain-pruning.test.ts @@ -63,14 +63,14 @@ describe('chain pruning', () => { const ethProvider = getCustomProvider(rpcProviderEndpoint); - indexer = new Indexer(db, ethClient, postgraphileClient, ethProvider); - assert(indexer, 'Could not create indexer object.'); - const { dbConnectionString, maxCompletionLagInSecs } = jobQueueConfig; assert(dbConnectionString, 'Missing job queue db connection string'); const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs }); + indexer = new Indexer(db, ethClient, postgraphileClient, ethProvider, jobQueue); + assert(indexer, 'Could not create indexer object.'); + jobRunner = new JobRunner(jobQueueConfig, indexer, jobQueue); }); diff --git a/packages/uni-watcher/src/cli/reset-cmds/state.ts b/packages/uni-watcher/src/cli/reset-cmds/state.ts index d7f1d749..2bdaa9b7 100644 --- a/packages/uni-watcher/src/cli/reset-cmds/state.ts +++ b/packages/uni-watcher/src/cli/reset-cmds/state.ts @@ -6,7 +6,7 @@ import debug from 'debug'; import { MoreThan } from 'typeorm'; import assert from 'assert'; -import { getConfig, getResetConfig, resetJobs } from '@vulcanize/util'; +import { getConfig, getResetConfig, JobQueue, resetJobs } from '@vulcanize/util'; import { Database } from '../../database'; import { Indexer } from '../../indexer'; @@ -27,13 +27,21 @@ export const builder = { export const handler = async (argv: any): Promise => { const config = await getConfig(argv.configFile); await resetJobs(config); + const { jobQueue: jobQueueConfig } = config; const { dbConfig, ethClient, postgraphileClient, ethProvider } = await getResetConfig(config); // Initialize database. const db = new Database(dbConfig); await db.init(); - const indexer = new Indexer(db, ethClient, postgraphileClient, ethProvider); + assert(jobQueueConfig, 'Missing job queue config'); + + const { dbConnectionString, maxCompletionLagInSecs } = jobQueueConfig; + assert(dbConnectionString, 'Missing job queue db connection string'); + + const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs }); + + const indexer = new Indexer(db, ethClient, postgraphileClient, ethProvider, jobQueue); const syncStatus = await indexer.getSyncStatus(); assert(syncStatus, 'Missing syncStatus'); diff --git a/packages/uni-watcher/src/cli/watch-contract.ts b/packages/uni-watcher/src/cli/watch-contract.ts index 6caf226d..c289080a 100644 --- a/packages/uni-watcher/src/cli/watch-contract.ts +++ b/packages/uni-watcher/src/cli/watch-contract.ts @@ -6,10 +6,10 @@ import assert from 'assert'; import yargs from 'yargs'; import 'reflect-metadata'; -import { Config, DEFAULT_CONFIG_PATH, getConfig } from '@vulcanize/util'; +import { Config, DEFAULT_CONFIG_PATH, getConfig, getResetConfig, JobQueue } from '@vulcanize/util'; import { Database } from '../database'; -import { watchContract } from '../utils/index'; +import { Indexer } from '../indexer'; (async () => { const argv = await yargs.parserConfiguration({ @@ -43,14 +43,28 @@ import { watchContract } from '../utils/index'; }).argv; const config: Config = await getConfig(argv.configFile); - const { database: dbConfig } = config; + const { database: dbConfig, jobQueue: jobQueueConfig } = config; + const { ethClient, postgraphileClient, ethProvider } = await getResetConfig(config); assert(dbConfig); const db = new Database(dbConfig); await db.init(); - await watchContract(db, argv.address, argv.kind, argv.startingBlock); + assert(jobQueueConfig, 'Missing job queue config'); + + const { 
dbConnectionString, maxCompletionLagInSecs } = jobQueueConfig; + assert(dbConnectionString, 'Missing job queue db connection string'); + + const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs }); + await jobQueue.start(); + + const indexer = new Indexer(db, ethClient, postgraphileClient, ethProvider, jobQueue); + await indexer.init(); + + await indexer.watchContract(argv.address, argv.kind, argv.startingBlock); await db.close(); + await jobQueue.stop(); + process.exit(); })(); diff --git a/packages/uni-watcher/src/database.ts b/packages/uni-watcher/src/database.ts index 9da38ae9..27d8b57b 100644 --- a/packages/uni-watcher/src/database.ts +++ b/packages/uni-watcher/src/database.ts @@ -45,13 +45,13 @@ export class Database implements DatabaseInterface { .getOne(); } - async getContract (address: string): Promise { + async getContracts (): Promise { const repo = this._conn.getRepository(Contract); - return this._baseDatabase.getContract(repo, address); + return this._baseDatabase.getContracts(repo); } - async saveContract (queryRunner: QueryRunner, address: string, kind: string, startingBlock: number): Promise { + async saveContract (queryRunner: QueryRunner, address: string, kind: string, startingBlock: number): Promise { const repo = queryRunner.manager.getRepository(Contract); return this._baseDatabase.saveContract(repo, address, startingBlock, kind); @@ -138,10 +138,10 @@ export class Database implements DatabaseInterface { return this._baseDatabase.getBlockProgress(repo, blockHash); } - async updateBlockProgress (queryRunner: QueryRunner, blockHash: string, lastProcessedEventIndex: number): Promise { + async updateBlockProgress (queryRunner: QueryRunner, block: BlockProgress, lastProcessedEventIndex: number): Promise { const repo = queryRunner.manager.getRepository(BlockProgress); - return this._baseDatabase.updateBlockProgress(repo, blockHash, lastProcessedEventIndex); + return this._baseDatabase.updateBlockProgress(repo, block, lastProcessedEventIndex); } async getEntities (queryRunner: QueryRunner, entity: new () => Entity, findConditions?: FindConditions): Promise { diff --git a/packages/uni-watcher/src/fill.ts b/packages/uni-watcher/src/fill.ts index 2c2db607..8652f0b0 100644 --- a/packages/uni-watcher/src/fill.ts +++ b/packages/uni-watcher/src/fill.ts @@ -77,7 +77,6 @@ export const main = async (): Promise => { // Note: In-memory pubsub works fine for now, as each watcher is a single process anyway. 
// Later: https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries const pubsub = new PubSub(); - const indexer = new Indexer(db, ethClient, postgraphileClient, ethProvider); const { dbConnectionString, maxCompletionLagInSecs } = jobQueueConfig; assert(dbConnectionString, 'Missing job queue db connection string'); @@ -85,6 +84,9 @@ export const main = async (): Promise => { const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs }); await jobQueue.start(); + const indexer = new Indexer(db, ethClient, postgraphileClient, ethProvider, jobQueue); + await indexer.init(); + const eventWatcher = new EventWatcher(upstream, ethClient, postgraphileClient, indexer, pubsub, jobQueue); assert(jobQueueConfig, 'Missing job queue config'); diff --git a/packages/uni-watcher/src/indexer.ts b/packages/uni-watcher/src/indexer.ts index e4344ddc..81d08e61 100644 --- a/packages/uni-watcher/src/indexer.ts +++ b/packages/uni-watcher/src/indexer.ts @@ -9,7 +9,7 @@ import { ethers } from 'ethers'; import assert from 'assert'; import { EthClient } from '@vulcanize/ipld-eth-client'; -import { IndexerInterface, Indexer as BaseIndexer, ValueResult } from '@vulcanize/util'; +import { IndexerInterface, Indexer as BaseIndexer, ValueResult, JobQueue } from '@vulcanize/util'; import { Database } from './database'; import { Event, UNKNOWN_EVENT_NAME } from './entity/Event'; @@ -46,18 +46,22 @@ export class Indexer implements IndexerInterface { _poolContract: ethers.utils.Interface _nfpmContract: ethers.utils.Interface - constructor (db: Database, ethClient: EthClient, postgraphileClient: EthClient, ethProvider: ethers.providers.BaseProvider) { + constructor (db: Database, ethClient: EthClient, postgraphileClient: EthClient, ethProvider: ethers.providers.BaseProvider, jobQueue: JobQueue) { this._db = db; this._ethClient = ethClient; this._postgraphileClient = postgraphileClient; this._ethProvider = ethProvider; - this._baseIndexer = new BaseIndexer(this._db, this._ethClient, this._postgraphileClient, this._ethProvider); + this._baseIndexer = new BaseIndexer(this._db, this._ethClient, this._postgraphileClient, this._ethProvider, jobQueue); this._factoryContract = new ethers.utils.Interface(factoryABI); this._poolContract = new ethers.utils.Interface(poolABI); this._nfpmContract = new ethers.utils.Interface(nfpmABI); } + async init (): Promise { + await this._baseIndexer.fetchContracts(); + } + getResultEvent (event: Event): ResultEvent { const block = event.block; const eventFields = JSON.parse(event.eventInfo); @@ -97,7 +101,7 @@ export class Indexer implements IndexerInterface { switch (re.event.__typename) { case 'PoolCreatedEvent': { const poolContract = ethers.utils.getAddress(re.event.pool); - await this._db.saveContract(dbTx, poolContract, KIND_POOL, dbEvent.block.blockNumber); + await this.watchContract(poolContract, KIND_POOL, dbEvent.block.blockNumber); } } } @@ -343,6 +347,14 @@ export class Indexer implements IndexerInterface { return this._baseIndexer.isWatchedContract(address); } + async watchContract (address: string, kind: string, startingBlock: number): Promise { + return this._baseIndexer.watchContract(address, kind, startingBlock); + } + + cacheContract (contract: Contract): void { + return this._baseIndexer.cacheContract(contract); + } + async saveEventEntity (dbEvent: Event): Promise { return this._baseIndexer.saveEventEntity(dbEvent); } @@ -404,8 +416,8 @@ export class Indexer implements IndexerInterface { return 
this._baseIndexer.markBlocksAsPruned(blocks); } - async updateBlockProgress (blockHash: string, lastProcessedEventIndex: number): Promise { - return this._baseIndexer.updateBlockProgress(blockHash, lastProcessedEventIndex); + async updateBlockProgress (block: BlockProgress, lastProcessedEventIndex: number): Promise { + return this._baseIndexer.updateBlockProgress(block, lastProcessedEventIndex); } async getAncestorAtDepth (blockHash: string, depth: number): Promise { diff --git a/packages/uni-watcher/src/job-runner.ts b/packages/uni-watcher/src/job-runner.ts index f0a3ec04..1b9f5043 100644 --- a/packages/uni-watcher/src/job-runner.ts +++ b/packages/uni-watcher/src/job-runner.ts @@ -55,12 +55,17 @@ export class JobRunner { async subscribeEventProcessingQueue (): Promise { await this._jobQueue.subscribe(QUEUE_EVENT_PROCESSING, async (job) => { - const event = await this._baseJobRunner.processEvent(job); + // TODO: Support two kind of jobs on the event processing queue. + // 1) processEvent => Current single event + // 2) processEvents => Event range (multiple events) + let event = await this._baseJobRunner.processEvent(job); - let dbEvent; - const { data: { id } } = job; + if (!event) { + return; + } const watchedContract = await this._indexer.isWatchedContract(event.contract); + if (watchedContract) { // We might not have parsed this event yet. This can happen if the contract was added // as a result of a previous event in the same block. @@ -69,16 +74,13 @@ export class JobRunner { const { eventName, eventInfo } = this._indexer.parseEventNameAndArgs(watchedContract.kind, logObj); event.eventName = eventName; event.eventInfo = JSON.stringify(eventInfo); - dbEvent = await this._indexer.saveEventEntity(event); + event = await this._indexer.saveEventEntity(event); } - dbEvent = await this._indexer.getEvent(id); - assert(dbEvent); - - await this._indexer.processEvent(dbEvent); + await this._indexer.processEvent(event); } - await this._indexer.updateBlockProgress(event.block.blockHash, event.index); + await this._indexer.updateBlockProgress(event.block, event.index); await this._jobQueue.markComplete(job); }); } @@ -125,8 +127,6 @@ export const main = async (): Promise => { const ethProvider = getCustomProvider(rpcProviderEndpoint); - const indexer = new Indexer(db, ethClient, postgraphileClient, ethProvider); - assert(jobQueueConfig, 'Missing job queue config'); const { dbConnectionString, maxCompletionLagInSecs } = jobQueueConfig; @@ -135,6 +135,9 @@ export const main = async (): Promise => { const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs }); await jobQueue.start(); + const indexer = new Indexer(db, ethClient, postgraphileClient, ethProvider, jobQueue); + await indexer.init(); + const jobRunner = new JobRunner(jobQueueConfig, indexer, jobQueue); await jobRunner.start(); }; diff --git a/packages/uni-watcher/src/server.ts b/packages/uni-watcher/src/server.ts index b995bdad..db1c84fd 100644 --- a/packages/uni-watcher/src/server.ts +++ b/packages/uni-watcher/src/server.ts @@ -72,7 +72,6 @@ export const main = async (): Promise => { // Note: In-memory pubsub works fine for now, as each watcher is a single process anyway. 
// Later: https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries const pubsub = new PubSub(); - const indexer = new Indexer(db, ethClient, postgraphileClient, ethProvider); assert(jobQueueConfig, 'Missing job queue config'); @@ -82,6 +81,9 @@ export const main = async (): Promise => { const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs }); await jobQueue.start(); + const indexer = new Indexer(db, ethClient, postgraphileClient, ethProvider, jobQueue); + await indexer.init(); + const eventWatcher = new EventWatcher(upstream, ethClient, postgraphileClient, indexer, pubsub, jobQueue); await eventWatcher.start(); diff --git a/packages/uni-watcher/src/smoke.test.ts b/packages/uni-watcher/src/smoke.test.ts index af40f6e1..3690616e 100644 --- a/packages/uni-watcher/src/smoke.test.ts +++ b/packages/uni-watcher/src/smoke.test.ts @@ -8,7 +8,8 @@ import 'mocha'; import { Config, - getConfig + getConfig, + JobQueue } from '@vulcanize/util'; import { deployTokens, @@ -65,6 +66,7 @@ describe('uni-watcher', () => { let ethClient: EthClient; let postgraphileClient: EthClient; let ethProvider: ethers.providers.JsonRpcProvider; + let jobQueue: JobQueue; let signer: Signer; let recipient: string; let deadline: number; @@ -72,7 +74,7 @@ describe('uni-watcher', () => { before(async () => { config = await getConfig(CONFIG_FILE); - const { database: dbConfig, upstream, server: { host, port } } = config; + const { database: dbConfig, upstream, server: { host, port }, jobQueue: jobQueueConfig } = config; assert(dbConfig, 'Missing dbConfig.'); assert(upstream, 'Missing upstream.'); assert(host, 'Missing host.'); @@ -115,6 +117,9 @@ describe('uni-watcher', () => { const deadlineDate = new Date(); deadlineDate.setDate(deadlineDate.getDate() + 2); deadline = Math.floor(deadlineDate.getTime() / 1000); + + const { dbConnectionString, maxCompletionLagInSecs } = jobQueueConfig; + jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs }); }); after(async () => { @@ -130,7 +135,8 @@ describe('uni-watcher', () => { factory = new Contract(factoryContract.address, FACTORY_ABI, signer); // Verifying with the db. - const indexer = new Indexer(db, ethClient, postgraphileClient, ethProvider); + const indexer = new Indexer(db, ethClient, postgraphileClient, ethProvider, jobQueue); + await indexer.init(); assert(await indexer.isWatchedContract(factory.address), 'Factory contract not added to the database.'); }); @@ -265,7 +271,8 @@ describe('uni-watcher', () => { nfpm = new Contract(nfpmContract.address, NFPM_ABI, signer); // Verifying with the db. - const indexer = new Indexer(db, ethClient, postgraphileClient, ethProvider); + const indexer = new Indexer(db, ethClient, postgraphileClient, ethProvider, jobQueue); + await indexer.init(); assert(await indexer.isWatchedContract(nfpm.address), 'NFPM contract not added to the database.'); }); diff --git a/packages/uni-watcher/src/utils/index.ts b/packages/uni-watcher/src/utils/index.ts index 3a3caa3c..3b70ff2e 100644 --- a/packages/uni-watcher/src/utils/index.ts +++ b/packages/uni-watcher/src/utils/index.ts @@ -2,27 +2,8 @@ // Copyright 2021 Vulcanize, Inc. 
// -import { ethers } from 'ethers'; - -import { Database } from '../database'; import { Client as UniClient } from '../client'; -export async function watchContract (db: Database, address: string, kind: string, startingBlock: number): Promise { - // Always use the checksum address (https://docs.ethers.io/v5/api/utils/address/#utils-getAddress). - const contractAddress = ethers.utils.getAddress(address); - const dbTx = await db.createTransactionRunner(); - - try { - await db.saveContract(dbTx, contractAddress, kind, startingBlock); - await dbTx.commitTransaction(); - } catch (error) { - await dbTx.rollbackTransaction(); - throw error; - } finally { - await dbTx.release(); - } -} - export const watchEvent = async (uniClient: UniClient, eventType: string): Promise => { return new Promise((resolve, reject) => { (async () => { diff --git a/packages/uni-watcher/test/init.ts b/packages/uni-watcher/test/init.ts index bea76618..b3f9a352 100644 --- a/packages/uni-watcher/test/init.ts +++ b/packages/uni-watcher/test/init.ts @@ -6,7 +6,7 @@ import { Contract, ethers, Signer } from 'ethers'; import assert from 'assert'; import { - getConfig + getConfig, getResetConfig, JobQueue } from '@vulcanize/util'; import { deployWETH9Token, @@ -19,23 +19,23 @@ import { import { Client as UniClient } from '../src/client'; import { Database } from '../src/database'; -import { watchContract } from '../src/utils/index'; +import { Indexer } from '../src/indexer'; const CONFIG_FILE = './environments/test.toml'; -const deployFactoryContract = async (db: Database, signer: Signer): Promise => { +const deployFactoryContract = async (indexer: Indexer, signer: Signer): Promise => { // Deploy factory from uniswap package. const Factory = new ethers.ContractFactory(FACTORY_ABI, FACTORY_BYTECODE, signer); const factory = await Factory.deploy(); assert(factory.address, 'Factory contract not deployed.'); // Watch factory contract. - await watchContract(db, factory.address, 'factory', 100); + await indexer.watchContract(factory.address, 'factory', 100); return factory; }; -const deployNFPMContract = async (db: Database, signer: Signer, factory: Contract): Promise => { +const deployNFPMContract = async (indexer: Indexer, signer: Signer, factory: Contract): Promise => { // Deploy weth9 token. const weth9Address = await deployWETH9Token(signer); assert(weth9Address, 'WETH9 token not deployed.'); @@ -45,18 +45,19 @@ const deployNFPMContract = async (db: Database, signer: Signer, factory: Contrac assert(nfpm.address, 'NFPM contract not deployed.'); // Watch NFPM contract. - await watchContract(db, nfpm.address, 'nfpm', 100); + await indexer.watchContract(nfpm.address, 'nfpm', 100); }; const main = async () => { // Get config. const config = await getConfig(CONFIG_FILE); - const { database: dbConfig, server: { host, port }, upstream: { ethServer: { rpcProviderEndpoint } } } = config; + const { database: dbConfig, server: { host, port }, jobQueue: jobQueueConfig } = config; assert(dbConfig, 'Missing dbConfig.'); assert(host, 'Missing host.'); assert(port, 'Missing port.'); - assert(rpcProviderEndpoint, 'Missing rpcProviderEndpoint.'); + + const { ethClient, postgraphileClient, ethProvider } = await getResetConfig(config); // Initialize uniClient. 
const endpoint = `http://${host}:${port}/graphql`; @@ -71,14 +72,22 @@ const main = async () => { const db = new Database(dbConfig); await db.init(); - const provider = new ethers.providers.JsonRpcProvider(rpcProviderEndpoint); + const provider = ethProvider as ethers.providers.JsonRpcProvider; const signer = provider.getSigner(); + const { dbConnectionString, maxCompletionLagInSecs } = jobQueueConfig; + assert(dbConnectionString, 'Missing job queue db connection string'); + + const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs }); + await jobQueue.start(); + + const indexer = new Indexer(db, ethClient, postgraphileClient, ethProvider, jobQueue); + let factory: Contract; // Checking whether factory is deployed. const factoryContract = await uniClient.getContract('factory'); if (factoryContract == null) { - factory = await deployFactoryContract(db, signer); + factory = await deployFactoryContract(indexer, signer); } else { factory = new Contract(factoryContract.address, FACTORY_ABI, signer); } @@ -86,7 +95,7 @@ const main = async () => { // Checking whether NFPM is deployed. const nfpmContract = await uniClient.getContract('nfpm'); if (nfpmContract == null) { - await deployNFPMContract(db, signer, factory); + await deployNFPMContract(indexer, signer, factory); } // Closing the database. diff --git a/packages/util/src/constants.ts b/packages/util/src/constants.ts index 8f54bf4d..7595da90 100644 --- a/packages/util/src/constants.ts +++ b/packages/util/src/constants.ts @@ -11,6 +11,9 @@ export const QUEUE_CHAIN_PRUNING = 'chain-pruning'; export const JOB_KIND_INDEX = 'index'; export const JOB_KIND_PRUNE = 'prune'; +export const JOB_KIND_CONTRACT = 'contract'; +export const JOB_KIND_EVENT = 'event'; + export const DEFAULT_CONFIG_PATH = 'environments/local.toml'; export const UNKNOWN_EVENT_NAME = '__unknown__'; diff --git a/packages/util/src/database.ts b/packages/util/src/database.ts index 203ac165..fece54f9 100644 --- a/packages/util/src/database.ts +++ b/packages/util/src/database.ts @@ -153,20 +153,20 @@ export class Database { .getMany(); } - async updateBlockProgress (repo: Repository, blockHash: string, lastProcessedEventIndex: number): Promise { - const entity = await repo.findOne({ where: { blockHash } }); - if (entity && !entity.isComplete) { - if (lastProcessedEventIndex <= entity.lastProcessedEventIndex) { - throw new Error(`Events processed out of order ${blockHash}, was ${entity.lastProcessedEventIndex}, got ${lastProcessedEventIndex}`); + async updateBlockProgress (repo: Repository, block: BlockProgressInterface, lastProcessedEventIndex: number): Promise { + if (!block.isComplete) { + if (lastProcessedEventIndex <= block.lastProcessedEventIndex) { + throw new Error(`Events processed out of order ${block.blockHash}, was ${block.lastProcessedEventIndex}, got ${lastProcessedEventIndex}`); } - entity.lastProcessedEventIndex = lastProcessedEventIndex; - entity.numProcessedEvents++; - if (entity.numProcessedEvents >= entity.numEvents) { - entity.isComplete = true; + block.lastProcessedEventIndex = lastProcessedEventIndex; + block.numProcessedEvents++; + if (block.numProcessedEvents >= block.numEvents) { + block.isComplete = true; } - await repo.save(entity); + const { id, ...blockData } = block; + await repo.update(id, blockData); } } @@ -550,21 +550,24 @@ export class Database { return { canonicalBlockNumber, blockHashes }; } - async getContract (repo: Repository, address: string): Promise { + async getContracts (repo: Repository): Promise { 
return repo.createQueryBuilder('contract') - .where('address = :address', { address }) - .getOne(); + .getMany(); } - async saveContract (repo: Repository, address: string, startingBlock: number, kind?: string): Promise { - const numRows = await repo + async saveContract (repo: Repository, address: string, startingBlock: number, kind?: string): Promise { + const contract = await repo .createQueryBuilder() .where('address = :address', { address }) - .getCount(); + .getOne(); - if (numRows === 0) { - const entity = repo.create({ address, kind, startingBlock }); - await repo.save(entity); + const entity = repo.create({ address, kind, startingBlock }); + + // If contract already present, overwrite fields. + if (contract) { + entity.id = contract.id; } + + return repo.save(entity); } } diff --git a/packages/util/src/indexer.ts b/packages/util/src/indexer.ts index 47906acf..c0e875dd 100644 --- a/packages/util/src/indexer.ts +++ b/packages/util/src/indexer.ts @@ -11,7 +11,8 @@ import { EthClient } from '@vulcanize/ipld-eth-client'; import { GetStorageAt, getStorageValue, StorageLayout } from '@vulcanize/solidity-mapper'; import { BlockProgressInterface, DatabaseInterface, EventInterface, SyncStatusInterface, ContractInterface } from './types'; -import { UNKNOWN_EVENT_NAME } from './constants'; +import { UNKNOWN_EVENT_NAME, JOB_KIND_CONTRACT, QUEUE_EVENT_PROCESSING } from './constants'; +import { JobQueue } from './job-queue'; const MAX_EVENTS_BLOCK_RANGE = 1000; @@ -30,15 +31,31 @@ export class Indexer { _postgraphileClient: EthClient; _getStorageAt: GetStorageAt; _ethProvider: ethers.providers.BaseProvider; + _jobQueue: JobQueue; - constructor (db: DatabaseInterface, ethClient: EthClient, postgraphileClient: EthClient, ethProvider: ethers.providers.BaseProvider) { + _watchedContracts: { [key: string]: ContractInterface } = {}; + + constructor (db: DatabaseInterface, ethClient: EthClient, postgraphileClient: EthClient, ethProvider: ethers.providers.BaseProvider, jobQueue: JobQueue) { this._db = db; this._ethClient = ethClient; this._postgraphileClient = postgraphileClient; this._ethProvider = ethProvider; + this._jobQueue = jobQueue; this._getStorageAt = this._ethClient.getStorageAt.bind(this._ethClient); } + async fetchContracts (): Promise { + assert(this._db.getContracts); + + const contracts = await this._db.getContracts(); + + this._watchedContracts = contracts.reduce((acc: { [key: string]: ContractInterface }, contract) => { + acc[contract.address] = contract; + + return acc; + }, {}); + } + async getSyncStatus (): Promise { const dbTx = await this._db.createTransactionRunner(); let res; @@ -152,12 +169,12 @@ export class Indexer { } } - async updateBlockProgress (blockHash: string, lastProcessedEventIndex: number): Promise { + async updateBlockProgress (block: BlockProgressInterface, lastProcessedEventIndex: number): Promise { const dbTx = await this._db.createTransactionRunner(); let res; try { - res = await this._db.updateBlockProgress(dbTx, blockHash, lastProcessedEventIndex); + res = await this._db.updateBlockProgress(dbTx, block, lastProcessedEventIndex); await dbTx.commitTransaction(); } catch (error) { await dbTx.rollbackTransaction(); @@ -281,9 +298,39 @@ export class Indexer { } async isWatchedContract (address : string): Promise { - assert(this._db.getContract); + return this._watchedContracts[address]; + } - return this._db.getContract(ethers.utils.getAddress(address)); + async watchContract (address: string, kind: string, startingBlock: number): Promise { + 
assert(this._db.saveContract); + const dbTx = await this._db.createTransactionRunner(); + + // Always use the checksum address (https://docs.ethers.io/v5/api/utils/address/#utils-getAddress). + const contractAddress = ethers.utils.getAddress(address); + + try { + const contract = await this._db.saveContract(dbTx, contractAddress, kind, startingBlock); + this.cacheContract(contract); + await dbTx.commitTransaction(); + + await this._jobQueue.pushJob( + QUEUE_EVENT_PROCESSING, + { + kind: JOB_KIND_CONTRACT, + contract + }, + { priority: 1 } + ); + } catch (error) { + await dbTx.rollbackTransaction(); + throw error; + } finally { + await dbTx.release(); + } + } + + cacheContract (contract: ContractInterface): void { + this._watchedContracts[contract.address] = contract; } async getStorageValue (storageLayout: StorageLayout, blockHash: string, token: string, variable: string, ...mappingKeys: any[]): Promise { diff --git a/packages/util/src/job-queue.ts b/packages/util/src/job-queue.ts index 1711d095..a6cda25d 100644 --- a/packages/util/src/job-queue.ts +++ b/packages/util/src/job-queue.ts @@ -52,6 +52,10 @@ export class JobQueue { await this._boss.start(); } + async stop (): Promise { + await this._boss.stop(); + } + async subscribe (queue: string, callback: JobCallback): Promise { return await this._boss.subscribe( queue, diff --git a/packages/util/src/job-runner.ts b/packages/util/src/job-runner.ts index 9dc1d0a3..3bb97efb 100644 --- a/packages/util/src/job-runner.ts +++ b/packages/util/src/job-runner.ts @@ -8,7 +8,7 @@ import { wait } from './misc'; import { createPruningJob } from './common'; import { JobQueueConfig } from './config'; -import { JOB_KIND_INDEX, JOB_KIND_PRUNE, MAX_REORG_DEPTH, QUEUE_BLOCK_PROCESSING, QUEUE_EVENT_PROCESSING } from './constants'; +import { JOB_KIND_INDEX, JOB_KIND_PRUNE, JOB_KIND_EVENT, JOB_KIND_CONTRACT, MAX_REORG_DEPTH, QUEUE_BLOCK_PROCESSING, QUEUE_EVENT_PROCESSING } from './constants'; import { JobQueue } from './job-queue'; import { EventInterface, IndexerInterface, SyncStatusInterface } from './types'; @@ -46,34 +46,20 @@ export class JobRunner { } } - async processEvent (job: any): Promise { - const { data: { id } } = job; + async processEvent (job: any): Promise { + const { data: { kind } } = job; - log(`Processing event ${id}`); + switch (kind) { + case JOB_KIND_EVENT: + return this._processEvent(job); - const dbEvent = await this._indexer.getEvent(id); - assert(dbEvent); + case JOB_KIND_CONTRACT: + return this._updateWatchedContracts(job); - const event = dbEvent; - - const blockProgress = await this._indexer.getBlockProgress(event.block.blockHash); - assert(blockProgress); - - const events = await this._indexer.getBlockEvents(event.block.blockHash); - const eventIndex = events.findIndex((e: any) => e.id === event.id); - assert(eventIndex !== -1); - - // Check if previous event in block has been processed exactly before this and abort if not. - if (eventIndex > 0) { // Skip the first event in the block. 
- const prevIndex = eventIndex - 1; - const prevEvent = events[prevIndex]; - if (prevEvent.index !== blockProgress.lastProcessedEventIndex) { - throw new Error(`Events received out of order for block number ${event.block.blockNumber} hash ${event.block.blockHash},` + - ` prev event index ${prevEvent.index}, got event index ${event.index} and lastProcessedEventIndex ${blockProgress.lastProcessedEventIndex}, aborting`); - } + default: + log(`Invalid Job kind ${kind} in QUEUE_EVENT_PROCESSING.`); + break; } - - return event; } async _pruneChain (job: any, syncStatus: SyncStatusInterface): Promise { @@ -180,8 +166,37 @@ export class JobRunner { const events = await this._indexer.getOrFetchBlockEvents({ blockHash, blockNumber, parentHash, blockTimestamp: timestamp }); for (let ei = 0; ei < events.length; ei++) { - await this._jobQueue.pushJob(QUEUE_EVENT_PROCESSING, { id: events[ei].id, publish: true }); + await this._jobQueue.pushJob(QUEUE_EVENT_PROCESSING, { kind: JOB_KIND_EVENT, id: events[ei].id, publish: true }); } } } + + async _processEvent (job: any): Promise { + const { data: { id } } = job; + + log(`Processing event ${id}`); + + const event = await this._indexer.getEvent(id); + assert(event); + const eventIndex = event.index; + + // Check if previous event in block has been processed exactly before this and abort if not. + if (eventIndex > 0) { // Skip the first event in the block. + const prevIndex = eventIndex - 1; + + if (prevIndex !== event.block.lastProcessedEventIndex) { + throw new Error(`Events received out of order for block number ${event.block.blockNumber} hash ${event.block.blockHash},` + + ` prev event index ${prevIndex}, got event index ${event.index} and lastProcessedEventIndex ${event.block.lastProcessedEventIndex}, aborting`); + } + } + + return event; + } + + async _updateWatchedContracts (job: any): Promise { + const { data: { contract } } = job; + + assert(this._indexer.cacheContract); + this._indexer.cacheContract(contract); + } } diff --git a/packages/util/src/types.ts b/packages/util/src/types.ts index 785c51e0..285489f3 100644 --- a/packages/util/src/types.ts +++ b/packages/util/src/types.ts @@ -56,11 +56,12 @@ export interface IndexerInterface { getAncestorAtDepth (blockHash: string, depth: number): Promise getOrFetchBlockEvents (block: DeepPartial): Promise> removeUnknownEvents (block: BlockProgressInterface): Promise - updateBlockProgress (blockHash: string, lastProcessedEventIndex: number): Promise + updateBlockProgress (block: BlockProgressInterface, lastProcessedEventIndex: number): Promise updateSyncStatusChainHead (blockHash: string, blockNumber: number): Promise updateSyncStatusIndexedBlock (blockHash: string, blockNumber: number, force?: boolean): Promise updateSyncStatusCanonicalBlock (blockHash: string, blockNumber: number, force?: boolean): Promise markBlocksAsPruned (blocks: BlockProgressInterface[]): Promise; + cacheContract?: (contract: ContractInterface) => void; } export interface EventWatcherInterface { @@ -80,12 +81,13 @@ export interface DatabaseInterface { getProcessedBlockCountForRange (fromBlockNumber: number, toBlockNumber: number): Promise<{ expected: number, actual: number }>; getEventsInRange (fromBlockNumber: number, toBlockNumber: number): Promise>; markBlocksAsPruned (queryRunner: QueryRunner, blocks: BlockProgressInterface[]): Promise; - updateBlockProgress (queryRunner: QueryRunner, blockHash: string, lastProcessedEventIndex: number): Promise + updateBlockProgress (queryRunner: QueryRunner, block: BlockProgressInterface, 
lastProcessedEventIndex: number): Promise<void>
   updateSyncStatusIndexedBlock (queryRunner: QueryRunner, blockHash: string, blockNumber: number, force?: boolean): Promise<SyncStatusInterface>;
   updateSyncStatusChainHead (queryRunner: QueryRunner, blockHash: string, blockNumber: number): Promise<SyncStatusInterface>;
   updateSyncStatusCanonicalBlock (queryRunner: QueryRunner, blockHash: string, blockNumber: number, force?: boolean): Promise<SyncStatusInterface>;
   saveEvents (queryRunner: QueryRunner, block: DeepPartial<BlockProgressInterface>, events: DeepPartial<EventInterface>[]): Promise<void>;
   saveEventEntity (queryRunner: QueryRunner, entity: EventInterface): Promise<EventInterface>;
   removeEntities<Entity> (queryRunner: QueryRunner, entity: new () => Entity, findConditions?: FindManyOptions<Entity> | FindConditions<Entity>): Promise<void>;
-  getContract?: (address: string) => Promise<ContractInterface | undefined>
+  getContracts?: () => Promise<ContractInterface[]>
+  saveContract?: (queryRunner: QueryRunner, contractAddress: string, kind: string, startingBlock: number) => Promise<ContractInterface>
 }
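
Note: after this change every watcher Indexer takes a started JobQueue, and the uni-watcher Indexer additionally needs init() to populate its in-memory watched-contract cache before use. The following sketch is not part of the diff; it only restates the wiring pattern used by the updated CLI entry points, assuming the uni-watcher package's Database/Indexer and a config file with database, upstream and jobQueue sections:

```typescript
import assert from 'assert';
import 'reflect-metadata';

import { Config, getConfig, getResetConfig, JobQueue } from '@vulcanize/util';

import { Database } from './database';
import { Indexer } from './indexer';

// Builds a ready-to-use uni-watcher Indexer from a config file.
export const createIndexer = async (configFile: string): Promise<Indexer> => {
  const config: Config = await getConfig(configFile);
  const { database: dbConfig, jobQueue: jobQueueConfig } = config;
  const { ethClient, postgraphileClient, ethProvider } = await getResetConfig(config);

  assert(dbConfig);
  const db = new Database(dbConfig);
  await db.init();

  assert(jobQueueConfig, 'Missing job queue config');
  const { dbConnectionString, maxCompletionLagInSecs } = jobQueueConfig;
  assert(dbConnectionString, 'Missing job queue db connection string');

  // The job queue is now a required Indexer dependency; watchContract uses it to
  // notify the job-runner process about newly watched contracts.
  const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs });
  await jobQueue.start();

  const indexer = new Indexer(db, ethClient, postgraphileClient, ethProvider, jobQueue);
  await indexer.init(); // Loads watched contracts from the database into memory.

  // Callers remain responsible for db.close() and jobQueue.stop() on shutdown,
  // as the updated watch-contract CLI does.
  return indexer;
};
```

The erc20-watcher and uni-info-watcher indexers follow the same pattern, with the job queue argument inserted just before the existing mode argument.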
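
Note: QUEUE_EVENT_PROCESSING now carries two job kinds, JOB_KIND_EVENT and JOB_KIND_CONTRACT. For contract jobs the base JobRunner.processEvent only refreshes the indexer's in-memory contract cache and returns undefined, which is why every watcher subscriber now guards on the returned value. A minimal sketch of that subscriber shape, mirroring the erc20-watcher job-runner and assuming the base JobRunner is importable from @vulcanize/util (aliased here as BaseJobRunner):

```typescript
import { JobQueue, JobRunner as BaseJobRunner, QUEUE_EVENT_PROCESSING } from '@vulcanize/util';

import { Indexer } from './indexer';

// Event-processing subscription as the watchers implement it after this change.
export const subscribeEventProcessing = async (jobQueue: JobQueue, baseJobRunner: BaseJobRunner, indexer: Indexer): Promise<void> => {
  await jobQueue.subscribe(QUEUE_EVENT_PROCESSING, async (job: any) => {
    // Returns undefined for JOB_KIND_CONTRACT jobs (cache refresh only); there is
    // no event to process in that case.
    const event = await baseJobRunner.processEvent(job);

    if (!event) {
      return;
    }

    const watchedContract = await indexer.isWatchedContract(event.contract);

    if (watchedContract) {
      await indexer.processEvent(event);
    }

    // The already-loaded BlockProgress entity is passed instead of the block hash,
    // so progress is updated without re-fetching the block row.
    await indexer.updateBlockProgress(event.block, event.index);
    await jobQueue.markComplete(job);
  });
};
```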