diff --git a/packages/uni-watcher/environments/local.toml b/packages/uni-watcher/environments/local.toml index 3649dc84..58f447eb 100644 --- a/packages/uni-watcher/environments/local.toml +++ b/packages/uni-watcher/environments/local.toml @@ -29,6 +29,3 @@ name = "requests" enabled = false deleteOnStart = false - -[contracts] - factory = "0xECfCae19742EE4644b209d75cF15ee6D02A726Ff" diff --git a/packages/uni-watcher/src/cli/watch-contract.ts b/packages/uni-watcher/src/cli/watch-contract.ts new file mode 100644 index 00000000..196e57bb --- /dev/null +++ b/packages/uni-watcher/src/cli/watch-contract.ts @@ -0,0 +1,51 @@ +import assert from 'assert'; +import yargs from 'yargs'; +import 'reflect-metadata'; +import { ethers } from 'ethers'; + +import { Config, getConfig } from '../config'; +import { Database } from '../database'; + +(async () => { + const argv = await yargs.parserConfiguration({ + 'parse-numbers': false + }).options({ + configFile: { + type: 'string', + require: true, + demandOption: true, + describe: 'configuration file path (toml)' + }, + address: { + type: 'string', + require: true, + demandOption: true, + describe: 'Address of the deployed contract' + }, + kind: { + type: 'string', + require: true, + demandOption: true, + describe: 'Kind of contract (factory|pool)' + }, + startingBlock: { + type: 'number', + default: 1, + describe: 'Starting block' + } + }).argv; + + const config: Config = await getConfig(argv.configFile); + const { database: dbConfig } = config; + + assert(dbConfig); + + const db = new Database(dbConfig); + await db.init(); + + // Always use the checksum address (https://docs.ethers.io/v5/api/utils/address/#utils-getAddress). + const address = ethers.utils.getAddress(argv.address); + + await db.saveContract(address, argv.kind, argv.startingBlock); + await db.close(); +})(); diff --git a/packages/uni-watcher/src/config.ts b/packages/uni-watcher/src/config.ts index 927f64b1..b93cd8f0 100644 --- a/packages/uni-watcher/src/config.ts +++ b/packages/uni-watcher/src/config.ts @@ -19,9 +19,6 @@ export interface Config { gqlSubscriptionEndpoint: string; cache: CacheConfig } - contracts: { - factory: string; - } } export const getConfig = async (configFile: string): Promise => { diff --git a/packages/uni-watcher/src/database.ts b/packages/uni-watcher/src/database.ts index fc232917..324b3cb4 100644 --- a/packages/uni-watcher/src/database.ts +++ b/packages/uni-watcher/src/database.ts @@ -3,6 +3,7 @@ import { Connection, ConnectionOptions, createConnection, DeepPartial } from 'ty import { SnakeNamingStrategy } from 'typeorm-naming-strategies'; import { Event } from './entity/Event'; +import { Contract } from './entity/Contract'; import { EventSyncProgress } from './entity/EventProgress'; export class Database { @@ -93,4 +94,27 @@ export class Database { } }); } + + async getContract (address: string): Promise { + return this._conn.getRepository(Contract) + .createQueryBuilder('contract') + .where('address = :address', { address }) + .getOne(); + } + + async saveContract (address: string, kind: string, startingBlock: number): Promise { + await this._conn.transaction(async (tx) => { + const repo = tx.getRepository(Contract); + + const numRows = await repo + .createQueryBuilder() + .where('address = :address', { address }) + .getCount(); + + if (numRows === 0) { + const entity = repo.create({ address, kind, startingBlock }); + await repo.save(entity); + } + }); + } } diff --git a/packages/uni-watcher/src/entity/Contract.ts b/packages/uni-watcher/src/entity/Contract.ts index ba0ddbe2..bf23668e 100644 --- a/packages/uni-watcher/src/entity/Contract.ts +++ b/packages/uni-watcher/src/entity/Contract.ts @@ -1,12 +1,12 @@ -import { Entity, PrimaryGeneratedColumn, Column, Index } from 'typeorm'; +import { Entity, PrimaryColumn, Column } from 'typeorm'; + +export const KIND_FACTORY = 'factory'; + +export const KIND_POOL = 'pool'; @Entity() -@Index(['address'], { unique: true }) export class Contract { - @PrimaryGeneratedColumn() - id!: number; - - @Column('varchar', { length: 42 }) + @PrimaryColumn('varchar', { length: 42 }) address!: string; @Column('varchar', { length: 8 }) diff --git a/packages/uni-watcher/src/events.ts b/packages/uni-watcher/src/events.ts index 05e840c4..9b88bc96 100644 --- a/packages/uni-watcher/src/events.ts +++ b/packages/uni-watcher/src/events.ts @@ -35,15 +35,14 @@ export class EventWatcher { if (logContracts && logContracts.length) { for (let logIndex = 0; logIndex < logContracts.length; logIndex++) { const contractAddress = logContracts[logIndex]; - const isWatchedContract = await this._indexer.isUniswapContract(contractAddress); - if (isWatchedContract) { - // TODO: Move processing to background task runner. - + const uniContract = await this._indexer.isUniswapContract(contractAddress); + if (uniContract) { const { ethTransactionCidByTxId: { ethHeaderCidByHeaderId: { blockHash, blockNumber } } } = receipt; - await this._indexer.getEvents(blockHash, contractAddress, null); + const events = await this._indexer.getEvents(blockHash, contractAddress, null); + const event = events[logIndex]; // Trigger other indexer methods based on event topic. - await this._indexer.processEvent(blockHash, blockNumber, contractAddress, receipt, logIndex); + await this._indexer.processEvent(blockHash, blockNumber, uniContract, receipt, event); } } } diff --git a/packages/uni-watcher/src/indexer.ts b/packages/uni-watcher/src/indexer.ts index b833e9df..8b99022e 100644 --- a/packages/uni-watcher/src/indexer.ts +++ b/packages/uni-watcher/src/indexer.ts @@ -11,16 +11,19 @@ import { GetStorageAt } from '@vulcanize/solidity-mapper'; import { Database } from './database'; import { Event } from './entity/Event'; +import { Contract, KIND_FACTORY, KIND_POOL } from './entity/Contract'; import { Config } from './config'; import factoryABI from './artifacts/factory.json'; const log = debug('vulcanize:indexer'); -type EventsResult = Array<{ +type EventResult = { event: any; proof: string; -}> +}; + +type EventsResult = Array; export class Indexer { _config: Config; @@ -51,10 +54,15 @@ export class Indexer { } async getEvents (blockHash: string, contract: string, name: string | null): Promise { + const uniContract = await this.isUniswapContract(contract); + if (!uniContract) { + throw new Error('Not a uniswap contract'); + } + const didSyncEvents = await this._db.didSyncEvents({ blockHash, contract }); if (!didSyncEvents) { // Fetch and save events first and make a note in the event sync progress table. - await this._fetchAndSaveEvents({ blockHash, contract }); + await this._fetchAndSaveEvents({ blockHash, contract, uniContract }); log('getEvents: db miss, fetching from upstream server'); } @@ -84,21 +92,16 @@ export class Indexer { return result; } - /* eslint-disable */ - async triggerIndexingOnEvent (blockHash: string, contract: string, receipt: any, logIndex: number): Promise { - + async triggerIndexingOnEvent (blockNumber: number, event: EventResult): Promise { + switch (event.event.__typename) { + case 'PoolCreatedEvent': { + const poolContract = ethers.utils.getAddress(event.event.pool); + await this._db.saveContract(poolContract, KIND_POOL, blockNumber); + } + } } - /* eslint-enable */ - - async publishEventToSubscribers (blockHash: string, blockNumber: number, contract: string, logIndex: number): Promise { - // TODO: Optimize this fetching of events. - const events = await this.getEvents(blockHash, contract, null); - - log(JSON.stringify(events, null, 2)); - log(logIndex); - - const event = events[logIndex]; + async publishEventToSubscribers (blockHash: string, blockNumber: number, contract: string, event: EventResult): Promise { log(`pushing event to GQL subscribers: ${event.event.__typename}`); // Publishing the event here will result in pushing the payload to GQL subscribers for `onEvent`. @@ -112,20 +115,21 @@ export class Indexer { }); } - async isUniswapContract (address: string): Promise { - // TODO: Return true for uniswap contracts of interest to the indexer (from config?). - return address != null; + async isUniswapContract (address: string): Promise { + return this._db.getContract(ethers.utils.getAddress(address)); } - async processEvent (blockHash: string, blockNumber: number, contract: string, receipt: any, logIndex: number): Promise { + async processEvent (blockHash: string, blockNumber: number, contract: Contract, receipt: any, event: EventResult): Promise { // Trigger indexing of data based on the event. - await this.triggerIndexingOnEvent(blockHash, contract, receipt, logIndex); + await this.triggerIndexingOnEvent(blockNumber, event); // Also trigger downstream event watcher subscriptions. - await this.publishEventToSubscribers(blockHash, blockNumber, contract, logIndex); + await this.publishEventToSubscribers(blockHash, blockNumber, contract.address, event); } - async _fetchAndSaveEvents ({ blockHash, contract }: { blockHash: string, contract: string }): Promise { + async _fetchAndSaveEvents ({ blockHash, contract, uniContract }: { blockHash: string, contract: string, uniContract: Contract }): Promise { + assert(uniContract); + const logs = await this._ethClient.getLogs({ blockHash, contract }); const dbEvents = logs.map((logObj: any) => { @@ -134,8 +138,7 @@ export class Indexer { let eventName; let eventProps = {}; - // TODO: Get contract kind from contracts table. - if (contract === this._config.contracts.factory) { + if (uniContract.kind === KIND_FACTORY) { const logDescription = this._factoryContract.parseLog({ data, topics }); const { token0, token1, fee, tickSpacing, pool } = logDescription.args;