diff --git a/README.md b/README.md index 9222ddcc..cca34fb5 100644 --- a/README.md +++ b/README.md @@ -21,7 +21,7 @@ The default config files used by the watchers assume the following services are #### Note * In `vulcanize/ipld-eth-server`, add the following statement to `[ethereum]` section in `environments/config.toml`: - + `chainConfig = "./chain.json" # ETH_CHAIN_CONFIG` ### Databases @@ -46,11 +46,24 @@ createdb uni-info-watcher Create the databases for the job queues and enable the `pgcrypto` extension on them (https://github.com/timgit/pg-boss/blob/master/docs/usage.md#intro): ``` +createdb erc20-watcher-job-queue createdb address-watcher-job-queue createdb uni-watcher-job-queue createdb uni-info-watcher-job-queue ``` +``` +postgres@tesla:~$ psql -U postgres -h localhost erc20-watcher-job-queue +Password for user postgres: +psql (12.7 (Ubuntu 12.7-1.pgdg18.04+1)) +SSL connection (protocol: TLSv1.3, cipher: TLS_AES_256_GCM_SHA384, bits: 256, compression: off) +Type "help" for help. + +erc20-watcher-job-queue=# CREATE EXTENSION pgcrypto; +CREATE EXTENSION +erc20-watcher-job-queue=# exit +``` + ``` postgres@tesla:~$ psql -U postgres -h localhost address-watcher-job-queue Password for user postgres: diff --git a/packages/erc20-watcher/README.md b/packages/erc20-watcher/README.md index 847819e2..fa0ca998 100644 --- a/packages/erc20-watcher/README.md +++ b/packages/erc20-watcher/README.md @@ -2,15 +2,38 @@ ## Setup -Create a postgres12 database and provide connection settings in `environments/local.toml`. +Create a postgres12 database for the job queue: -For example: +``` +sudo su - postgres +createdb erc20-watcher-job-queue +``` + +Enable the `pgcrypto` extension on the job queue database (https://github.com/timgit/pg-boss/blob/master/docs/usage.md#intro). + +Example: + +``` +postgres@tesla:~$ psql -U postgres -h localhost erc20-watcher-job-queue +Password for user postgres: +psql (12.7 (Ubuntu 12.7-1.pgdg18.04+1)) +SSL connection (protocol: TLSv1.3, cipher: TLS_AES_256_GCM_SHA384, bits: 256, compression: off) +Type "help" for help. + +erc20-watcher-job-queue=# CREATE EXTENSION pgcrypto; +CREATE EXTENSION +erc20-watcher-job-queue=# exit +``` + +Create a postgres12 database for the erc20 watcher: ``` sudo su - postgres createdb erc20-watcher ``` +Update `environments/local.toml` with database connection settings for both the databases. + Update the `upstream` config in `environments/local.toml` and provide the `ipld-eth-server` GQL API and the `indexer-db` postgraphile endpoints. ## Run @@ -24,17 +47,59 @@ yarn build Run the watcher: ```bash -yarn run server +$ yarn server # For development. -yarn run server:dev +$ yarn server:dev # For specifying config file. -yarn run server -f environments/local.toml +$ yarn server -f environments/local.toml +``` + +Start the job runner: + +```bash +$ yarn job-runner + +# For development. +$ yarn job-runner:dev + +# For specifying config file. +$ yarn job-runner -f environments/local.toml ``` GQL console: http://localhost:3001/graphql +Start watching a token: + +```bash +$ yarn watch:contract --address 0xTokenAddress --startingBlock + +# For specifying config file. +$ yarn watch:contract -f environments/local.toml --address 0xTokenAddress --startingBlock +``` + +Example: + +```bash +$ yarn watch:contract --address 0xfE0034a874c2707c23F91D7409E9036F5e08ac34 --startingBlock 100 +``` + +To fill a block range: + +```bash +yarn fill --startBlock --endBlock + +# For specifying config file. +$ yarn fill -f environments/local.toml --startBlock --endBlock +``` + +Example: + +```bash +$ yarn fill --startBlock 1000 --endBlock 2000 +``` + ### Example GQL Queries ```text diff --git a/packages/erc20-watcher/environments/local.toml b/packages/erc20-watcher/environments/local.toml index 853febd5..e718d9b8 100644 --- a/packages/erc20-watcher/environments/local.toml +++ b/packages/erc20-watcher/environments/local.toml @@ -23,3 +23,8 @@ name = "requests" enabled = false deleteOnStart = false + +[jobQueue] + dbConnectionString = "postgres://postgres:postgres@localhost/erc20-watcher-job-queue" + maxCompletionLagInSecs = 300 + jobDelayInMilliSecs = 100 diff --git a/packages/erc20-watcher/hardhat.config.ts b/packages/erc20-watcher/hardhat.config.ts index b3419e70..a0a14b2c 100644 --- a/packages/erc20-watcher/hardhat.config.ts +++ b/packages/erc20-watcher/hardhat.config.ts @@ -6,6 +6,8 @@ import '@nomiclabs/hardhat-waffle'; import './test/tasks/token-deploy'; import './test/tasks/token-transfer'; +import './test/tasks/token-approve'; +import './test/tasks/token-transfer-from'; import './test/tasks/block-latest'; // You need to export an object to set up your config diff --git a/packages/erc20-watcher/package.json b/packages/erc20-watcher/package.json index 453e1011..aecbf9c4 100644 --- a/packages/erc20-watcher/package.json +++ b/packages/erc20-watcher/package.json @@ -11,12 +11,20 @@ "server": "DEBUG=vulcanize:* node --enable-source-maps dist/server.js", "server:dev": "DEBUG=vulcanize:* nodemon --watch src src/server.ts", "server:mock": "MOCK=1 nodemon src/server.ts", + "job-runner": "DEBUG=vulcanize:* node --enable-source-maps dist/job-runner.js", + "job-runner:dev": "DEBUG=vulcanize:* nodemon --watch src src/job-runner.ts", "watch:contract": "node --enable-source-maps dist/cli/watch-contract.js", "watch:contract:dev": "ts-node src/cli/watch-contract.ts", + "fill": "DEBUG=vulcanize:* node dist/fill.js", + "fill:dev": "DEBUG=vulcanize:* ts-node src/fill.ts", "token:deploy": "hardhat --network localhost token-deploy", "token:deploy:docker": "hardhat --network docker token-deploy", "token:transfer": "hardhat --network localhost token-transfer", "token:transfer:docker": "hardhat --network docker token-transfer", + "token:approve": "hardhat --network localhost token-approve", + "token:approve:docker": "hardhat --network docker token-approve", + "token:transfer-from": "hardhat --network localhost token-transfer-from", + "token:transfer-from:docker": "hardhat --network docker token-transfer-from", "block:latest": "hardhat --network localhost block-latest", "block:latest:docker": "hardhat --network docker block-latest" }, diff --git a/packages/erc20-watcher/src/database.ts b/packages/erc20-watcher/src/database.ts index 73f8f8b6..1afbbbeb 100644 --- a/packages/erc20-watcher/src/database.ts +++ b/packages/erc20-watcher/src/database.ts @@ -3,7 +3,7 @@ // import assert from 'assert'; -import { Connection, ConnectionOptions, DeepPartial } from 'typeorm'; +import { Connection, ConnectionOptions, DeepPartial, FindConditions, QueryRunner } from 'typeorm'; import path from 'path'; import { Database as BaseDatabase } from '@vulcanize/util'; @@ -12,7 +12,10 @@ import { Allowance } from './entity/Allowance'; import { Balance } from './entity/Balance'; import { Contract } from './entity/Contract'; import { Event } from './entity/Event'; -import { EventSyncProgress } from './entity/EventProgress'; +import { SyncStatus } from './entity/SyncStatus'; +import { BlockProgress } from './entity/BlockProgress'; + +const CONTRACT_KIND = 'token'; export class Database { _config: ConnectionOptions @@ -73,87 +76,112 @@ export class Database { return repo.save(entity); } - // Returns true if events have already been synced for the (block, token) combination. - async didSyncEvents ({ blockHash, token }: { blockHash: string, token: string }): Promise { - const numRows = await this._conn.getRepository(EventSyncProgress) - .createQueryBuilder() - .where('block_hash = :blockHash AND token = :token', { - blockHash, - token - }) - .getCount(); + async getContract (address: string): Promise { + const repo = this._conn.getRepository(Contract); - return numRows > 0; + return this._baseDatabase.getContract(repo, address); } - async getEvents ({ blockHash, token }: { blockHash: string, token: string }): Promise { - return this._conn.getRepository(Event) - .createQueryBuilder('event') - .where('block_hash = :blockHash AND token = :token', { - blockHash, - token - }) - .addOrderBy('id', 'ASC') - .getMany(); + async createTransactionRunner (): Promise { + return this._baseDatabase.createTransactionRunner(); } - async getEventsByName ({ blockHash, token, eventName }: { blockHash: string, token: string, eventName: string }): Promise { - return this._conn.getRepository(Event) - .createQueryBuilder('event') - .where('block_hash = :blockHash AND token = :token AND event_name = :eventName', { - blockHash, - token, - eventName - }) - .getMany(); + async getProcessedBlockCountForRange (fromBlockNumber: number, toBlockNumber: number): Promise<{ expected: number, actual: number }> { + const repo = this._conn.getRepository(BlockProgress); + + return this._baseDatabase.getProcessedBlockCountForRange(repo, fromBlockNumber, toBlockNumber); } - async saveEvents ({ blockHash, token, events }: { blockHash: string, token: string, events: DeepPartial[] }): Promise { - // In a transaction: - // (1) Save all the events in the database. - // (2) Add an entry to the event progress table. + async getEventsInRange (fromBlockNumber: number, toBlockNumber: number): Promise> { + const repo = this._conn.getRepository(Event); - await this._conn.transaction(async (tx) => { - const repo = tx.getRepository(EventSyncProgress); - - // Check sync progress inside the transaction. - const numRows = await repo - .createQueryBuilder() - .where('block_hash = :blockHash AND token = :token', { - blockHash, - token - }) - .getCount(); - - if (numRows === 0) { - // Bulk insert events. - await tx.createQueryBuilder() - .insert() - .into(Event) - .values(events) - .execute(); - - // Update event sync progress. - const progress = repo.create({ blockHash, token }); - await repo.save(progress); - } - }); + return this._baseDatabase.getEventsInRange(repo, fromBlockNumber, toBlockNumber); } - async isWatchedContract (address: string): Promise { - const numRows = await this._conn.getRepository(Contract) - .createQueryBuilder() - .where('address = :address', { address }) - .getCount(); + async saveEventEntity (queryRunner: QueryRunner, entity: Event): Promise { + const repo = queryRunner.manager.getRepository(Event); + return this._baseDatabase.saveEventEntity(repo, entity); + } - return numRows > 0; + async getBlockEvents (blockHash: string, where: FindConditions): Promise { + const repo = this._conn.getRepository(Event); + + return this._baseDatabase.getBlockEvents(repo, blockHash, where); + } + + async saveEvents (queryRunner: QueryRunner, block: DeepPartial, events: DeepPartial[]): Promise { + const blockRepo = queryRunner.manager.getRepository(BlockProgress); + const eventRepo = queryRunner.manager.getRepository(Event); + + return this._baseDatabase.saveEvents(blockRepo, eventRepo, block, events); } async saveContract (address: string, startingBlock: number): Promise { await this._conn.transaction(async (tx) => { const repo = tx.getRepository(Contract); - return this._baseDatabase.saveContract(repo, address, startingBlock); + return this._baseDatabase.saveContract(repo, address, startingBlock, CONTRACT_KIND); }); } + + async updateSyncStatusIndexedBlock (queryRunner: QueryRunner, blockHash: string, blockNumber: number): Promise { + const repo = queryRunner.manager.getRepository(SyncStatus); + + return this._baseDatabase.updateSyncStatusIndexedBlock(repo, blockHash, blockNumber); + } + + async updateSyncStatusCanonicalBlock (queryRunner: QueryRunner, blockHash: string, blockNumber: number): Promise { + const repo = queryRunner.manager.getRepository(SyncStatus); + + return this._baseDatabase.updateSyncStatusCanonicalBlock(repo, blockHash, blockNumber); + } + + async updateSyncStatusChainHead (queryRunner: QueryRunner, blockHash: string, blockNumber: number): Promise { + const repo = queryRunner.manager.getRepository(SyncStatus); + + return this._baseDatabase.updateSyncStatusChainHead(repo, blockHash, blockNumber); + } + + async getSyncStatus (queryRunner: QueryRunner): Promise { + const repo = queryRunner.manager.getRepository(SyncStatus); + + return this._baseDatabase.getSyncStatus(repo); + } + + async getEvent (id: string): Promise { + const repo = this._conn.getRepository(Event); + + return this._baseDatabase.getEvent(repo, id); + } + + async getBlocksAtHeight (height: number, isPruned: boolean): Promise { + const repo = this._conn.getRepository(BlockProgress); + + return this._baseDatabase.getBlocksAtHeight(repo, height, isPruned); + } + + async markBlocksAsPruned (queryRunner: QueryRunner, blocks: BlockProgress[]): Promise { + const repo = queryRunner.manager.getRepository(BlockProgress); + + return this._baseDatabase.markBlocksAsPruned(repo, blocks); + } + + async getBlockProgress (blockHash: string): Promise { + const repo = this._conn.getRepository(BlockProgress); + return this._baseDatabase.getBlockProgress(repo, blockHash); + } + + async updateBlockProgress (queryRunner: QueryRunner, blockHash: string, lastProcessedEventIndex: number): Promise { + const repo = queryRunner.manager.getRepository(BlockProgress); + + return this._baseDatabase.updateBlockProgress(repo, blockHash, lastProcessedEventIndex); + } + + async removeEntities (queryRunner: QueryRunner, entity: new () => Entity, findConditions?: FindConditions): Promise { + return this._baseDatabase.removeEntities(queryRunner, entity, findConditions); + } + + async getAncestorAtDepth (blockHash: string, depth: number): Promise { + return this._baseDatabase.getAncestorAtDepth(blockHash, depth); + } } diff --git a/packages/erc20-watcher/src/entity/BlockProgress.ts b/packages/erc20-watcher/src/entity/BlockProgress.ts new file mode 100644 index 00000000..e67cf2dd --- /dev/null +++ b/packages/erc20-watcher/src/entity/BlockProgress.ts @@ -0,0 +1,43 @@ +// +// Copyright 2021 Vulcanize, Inc. +// + +import { Entity, PrimaryGeneratedColumn, Column, Index } from 'typeorm'; + +import { BlockProgressInterface } from '@vulcanize/util'; + +@Entity() +@Index(['blockHash'], { unique: true }) +@Index(['blockNumber']) +@Index(['parentHash']) +export class BlockProgress implements BlockProgressInterface { + @PrimaryGeneratedColumn() + id!: number; + + @Column('varchar', { length: 66 }) + blockHash!: string; + + @Column('varchar', { length: 66 }) + parentHash!: string; + + @Column('integer') + blockNumber!: number; + + @Column('integer') + blockTimestamp!: number; + + @Column('integer') + numEvents!: number; + + @Column('integer') + numProcessedEvents!: number; + + @Column('integer') + lastProcessedEventIndex!: number; + + @Column('boolean') + isComplete!: boolean + + @Column('boolean', { default: false }) + isPruned!: boolean +} diff --git a/packages/erc20-watcher/src/entity/Contract.ts b/packages/erc20-watcher/src/entity/Contract.ts index 272c33c7..83c99dcb 100644 --- a/packages/erc20-watcher/src/entity/Contract.ts +++ b/packages/erc20-watcher/src/entity/Contract.ts @@ -13,6 +13,9 @@ export class Contract { @Column('varchar', { length: 42 }) address!: string; + @Column('varchar', { length: 8 }) + kind!: string; + @Column('integer') startingBlock!: number; } diff --git a/packages/erc20-watcher/src/entity/Event.ts b/packages/erc20-watcher/src/entity/Event.ts index 5fe951db..7971bf56 100644 --- a/packages/erc20-watcher/src/entity/Event.ts +++ b/packages/erc20-watcher/src/entity/Event.ts @@ -2,48 +2,42 @@ // Copyright 2021 Vulcanize, Inc. // -import { Entity, PrimaryGeneratedColumn, Column, Index } from 'typeorm'; +import { Entity, PrimaryGeneratedColumn, Column, Index, ManyToOne } from 'typeorm'; +import { BlockProgress } from './BlockProgress'; + +export const UNKNOWN_EVENT_NAME = '__unknown__'; @Entity() // Index to query all events for a contract efficiently. -@Index(['blockHash', 'token']) -// Index to query 'Transfer' events efficiently. -@Index(['blockHash', 'token', 'eventName', 'transferFrom', 'transferTo']) -// Index to query 'Approval' events efficiently. -@Index(['blockHash', 'token', 'eventName', 'approvalOwner', 'approvalSpender']) +@Index(['block', 'contract']) +// Index to query events by name efficiently. +@Index(['block', 'contract', 'eventName']) export class Event { @PrimaryGeneratedColumn() id!: number; + @ManyToOne(() => BlockProgress) + block!: BlockProgress; + @Column('varchar', { length: 66 }) - blockHash!: string; + txHash!: string; + + // Index of the log in the block. + @Column('integer') + index!: number; @Column('varchar', { length: 42 }) - token!: string; + contract!: string; @Column('varchar', { length: 256 }) eventName!: string; + @Column('text') + eventInfo!: string; + + @Column('text') + extraInfo!: string; + @Column('text') proof!: string; - - // Transfer event columns. - @Column('varchar', { length: 42, nullable: true }) - transferFrom!: string; - - @Column('varchar', { length: 42, nullable: true }) - transferTo!: string; - - @Column('numeric', { nullable: true }) - transferValue!: bigint; - - // Approval event columns. - @Column('varchar', { length: 42, nullable: true }) - approvalOwner!: string; - - @Column('varchar', { length: 42, nullable: true }) - approvalSpender!: string; - - @Column('numeric', { nullable: true }) - approvalValue!: bigint; } diff --git a/packages/erc20-watcher/src/entity/EventProgress.ts b/packages/erc20-watcher/src/entity/EventProgress.ts deleted file mode 100644 index ffb48946..00000000 --- a/packages/erc20-watcher/src/entity/EventProgress.ts +++ /dev/null @@ -1,24 +0,0 @@ -// -// Copyright 2021 Vulcanize, Inc. -// - -import { Entity, PrimaryGeneratedColumn, Column, Index } from 'typeorm'; - -// Stores a row if events for a (block, token) combination have already been fetched. -// -// Required as a particular block may not have events from a particular contract, -// and we need to differentiate between that case and the case where data hasn't -// yet been synced from upstream. -// -@Entity() -@Index(['blockHash', 'token'], { unique: true }) -export class EventSyncProgress { - @PrimaryGeneratedColumn() - id!: number; - - @Column('varchar', { length: 66 }) - blockHash!: string; - - @Column('varchar', { length: 42 }) - token!: string; -} diff --git a/packages/erc20-watcher/src/entity/SyncStatus.ts b/packages/erc20-watcher/src/entity/SyncStatus.ts new file mode 100644 index 00000000..e5a52802 --- /dev/null +++ b/packages/erc20-watcher/src/entity/SyncStatus.ts @@ -0,0 +1,37 @@ +// +// Copyright 2021 Vulcanize, Inc. +// + +import { Entity, PrimaryGeneratedColumn, Column } from 'typeorm'; + +import { SyncStatusInterface } from '@vulcanize/util'; + +@Entity() +export class SyncStatus implements SyncStatusInterface { + @PrimaryGeneratedColumn() + id!: number; + + // Latest block hash and number from the chain itself. + @Column('varchar', { length: 66 }) + chainHeadBlockHash!: string; + + @Column('integer') + chainHeadBlockNumber!: number; + + // Most recent block hash that's been indexed. + @Column('varchar', { length: 66 }) + latestIndexedBlockHash!: string; + + // Most recent block number that's been indexed. + @Column('integer') + latestIndexedBlockNumber!: number; + + // Most recent block hash and number that we can consider as part + // of the canonical/finalized chain. Reorgs older than this block + // cannot be processed and processing will halt. + @Column('varchar', { length: 66 }) + latestCanonicalBlockHash!: string; + + @Column('integer') + latestCanonicalBlockNumber!: number; +} diff --git a/packages/erc20-watcher/src/events.ts b/packages/erc20-watcher/src/events.ts index 7381ad1b..35e91dd2 100644 --- a/packages/erc20-watcher/src/events.ts +++ b/packages/erc20-watcher/src/events.ts @@ -4,11 +4,20 @@ import assert from 'assert'; import debug from 'debug'; -import _ from 'lodash'; +import { PubSub } from 'apollo-server-express'; import { EthClient } from '@vulcanize/ipld-eth-client'; +import { + JobQueue, + EventWatcher as BaseEventWatcher, + QUEUE_BLOCK_PROCESSING, + QUEUE_EVENT_PROCESSING +} from '@vulcanize/util'; import { Indexer } from './indexer'; +import { Event, UNKNOWN_EVENT_NAME } from './entity/Event'; + +const EVENT = 'event'; const log = debug('vulcanize:events'); @@ -16,48 +25,88 @@ export class EventWatcher { _ethClient: EthClient _indexer: Indexer _subscription: ZenObservable.Subscription | undefined + _baseEventWatcher: BaseEventWatcher + _pubsub: PubSub + _jobQueue: JobQueue - constructor (ethClient: EthClient, indexer: Indexer) { + constructor (ethClient: EthClient, indexer: Indexer, pubsub: PubSub, jobQueue: JobQueue) { assert(ethClient); assert(indexer); this._ethClient = ethClient; this._indexer = indexer; + this._pubsub = pubsub; + this._jobQueue = jobQueue; + this._baseEventWatcher = new BaseEventWatcher(this._ethClient, this._indexer, this._pubsub, this._jobQueue); + } + + getEventIterator (): AsyncIterator { + return this._pubsub.asyncIterator([EVENT]); + } + + getBlockProgressEventIterator (): AsyncIterator { + return this._baseEventWatcher.getBlockProgressEventIterator(); } async start (): Promise { assert(!this._subscription, 'subscription already started'); - log('Started watching upstream logs...'); + await this.watchBlocksAtChainHead(); + await this.initBlockProcessingOnCompleteHandler(); + await this.initEventProcessingOnCompleteHandler(); + } - this._subscription = await this._ethClient.watchLogs(async (value) => { - const receipt = _.get(value, 'data.listen.relatedNode'); - log('watchLogs', JSON.stringify(receipt, null, 2)); + async stop (): Promise { + this._baseEventWatcher.stop(); + } - // Check if this log is for a contract we care about. - const { logContracts } = receipt; - if (logContracts && logContracts.length) { - for (let logIndex = 0; logIndex < logContracts.length; logIndex++) { - const contractAddress = logContracts[logIndex]; - const isWatchedContract = await this._indexer.isWatchedContract(contractAddress); - if (isWatchedContract) { - // TODO: Move processing to background task runner. + async watchBlocksAtChainHead (): Promise { + log('Started watching upstream blocks...'); + this._subscription = await this._ethClient.watchBlocks(async (value) => { + await this._baseEventWatcher.blocksHandler(value); + }); + } - const { ethTransactionCidByTxId: { ethHeaderCidByHeaderId: { blockHash, blockNumber } } } = receipt; - await this._indexer.getEvents(blockHash, contractAddress, null); + async initBlockProcessingOnCompleteHandler (): Promise { + this._jobQueue.onComplete(QUEUE_BLOCK_PROCESSING, async (job) => { + await this._baseEventWatcher.blockProcessingCompleteHandler(job); + }); + } - // Trigger other indexer methods based on event topic. - await this._indexer.processEvent(blockHash, blockNumber, contractAddress, receipt, logIndex); - } + async initEventProcessingOnCompleteHandler (): Promise { + await this._jobQueue.onComplete(QUEUE_EVENT_PROCESSING, async (job) => { + const dbEvent = await this._baseEventWatcher.eventProcessingCompleteHandler(job); + + const { data: { request, failed, state, createdOn } } = job; + + const timeElapsedInSeconds = (Date.now() - Date.parse(createdOn)) / 1000; + log(`Job onComplete event ${request.data.id} publish ${!!request.data.publish}`); + if (!failed && state === 'completed' && request.data.publish) { + // Check for max acceptable lag time between request and sending results to live subscribers. + if (timeElapsedInSeconds <= this._jobQueue.maxCompletionLag) { + await this.publishEventToSubscribers(dbEvent, timeElapsedInSeconds); + } else { + log(`event ${request.data.id} is too old (${timeElapsedInSeconds}s), not broadcasting to live subscribers`); } } }); } - async stop (): Promise { - if (this._subscription) { - log('Stopped watching upstream logs'); - this._subscription.unsubscribe(); + async publishEventToSubscribers (dbEvent: Event, timeElapsedInSeconds: number): Promise { + if (dbEvent && dbEvent.eventName !== UNKNOWN_EVENT_NAME) { + const { block: { blockHash }, contract: token } = dbEvent; + const resultEvent = this._indexer.getResultEvent(dbEvent); + + log(`pushing event to GQL subscribers (${timeElapsedInSeconds}s elapsed): ${resultEvent.event.__typename}`); + + // Publishing the event here will result in pushing the payload to GQL subscribers for `onEvent`. + await this._pubsub.publish(EVENT, { + onTokenEvent: { + blockHash, + token, + event: resultEvent + } + }); } } } diff --git a/packages/erc20-watcher/src/fill.ts b/packages/erc20-watcher/src/fill.ts new file mode 100644 index 00000000..e8ed9bd6 --- /dev/null +++ b/packages/erc20-watcher/src/fill.ts @@ -0,0 +1,95 @@ +// +// Copyright 2021 Vulcanize, Inc. +// + +import assert from 'assert'; +import 'reflect-metadata'; +import yargs from 'yargs'; +import { hideBin } from 'yargs/helpers'; +import debug from 'debug'; +import { PubSub } from 'apollo-server-express'; +import { getDefaultProvider } from 'ethers'; + +import { getCache } from '@vulcanize/cache'; +import { EthClient } from '@vulcanize/ipld-eth-client'; +import { getConfig, fillBlocks, JobQueue, DEFAULT_CONFIG_PATH } from '@vulcanize/util'; + +import { Database } from './database'; +import { Indexer } from './indexer'; +import { EventWatcher } from './events'; + +const log = debug('vulcanize:server'); + +export const main = async (): Promise => { + const argv = await yargs(hideBin(process.argv)).parserConfiguration({ + 'parse-numbers': false + }).options({ + configFile: { + alias: 'f', + type: 'string', + require: true, + demandOption: true, + describe: 'configuration file path (toml)', + default: DEFAULT_CONFIG_PATH + }, + startBlock: { + type: 'number', + require: true, + demandOption: true, + describe: 'Block number to start processing at' + }, + endBlock: { + type: 'number', + require: true, + demandOption: true, + describe: 'Block number to stop processing at' + } + }).argv; + + const config = await getConfig(argv.configFile); + + assert(config.server, 'Missing server config'); + + const { upstream, database: dbConfig, jobQueue: jobQueueConfig, server: { mode } } = config; + + assert(dbConfig, 'Missing database config'); + + const db = new Database(dbConfig); + await db.init(); + + assert(upstream, 'Missing upstream config'); + const { ethServer: { gqlPostgraphileEndpoint, rpcProviderEndpoint }, cache: cacheConfig } = upstream; + assert(gqlPostgraphileEndpoint, 'Missing upstream ethServer.gqlPostgraphileEndpoint'); + + const cache = await getCache(cacheConfig); + const ethClient = new EthClient({ + gqlEndpoint: gqlPostgraphileEndpoint, + gqlSubscriptionEndpoint: gqlPostgraphileEndpoint, + cache + }); + + const ethProvider = getDefaultProvider(rpcProviderEndpoint); + + // Note: In-memory pubsub works fine for now, as each watcher is a single process anyway. + // Later: https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries + const pubsub = new PubSub(); + const indexer = new Indexer(db, ethClient, ethProvider, mode); + + const { dbConnectionString, maxCompletionLagInSecs } = jobQueueConfig; + assert(dbConnectionString, 'Missing job queue db connection string'); + + const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs }); + await jobQueue.start(); + + const eventWatcher = new EventWatcher(ethClient, indexer, pubsub, jobQueue); + + assert(jobQueueConfig, 'Missing job queue config'); + + await fillBlocks(jobQueue, indexer, ethClient, eventWatcher, argv); +}; + +main().then(() => { + process.exit(); +}).catch(err => { + log(err); +}); diff --git a/packages/erc20-watcher/src/indexer.ts b/packages/erc20-watcher/src/indexer.ts index e64aa781..026cb4ba 100644 --- a/packages/erc20-watcher/src/indexer.ts +++ b/packages/erc20-watcher/src/indexer.ts @@ -4,38 +4,32 @@ import assert from 'assert'; import debug from 'debug'; -import { invert } from 'lodash'; import { JsonFragment } from '@ethersproject/abi'; import { DeepPartial } from 'typeorm'; import JSONbig from 'json-bigint'; import { BigNumber, ethers } from 'ethers'; import { BaseProvider } from '@ethersproject/providers'; -import { PubSub } from 'apollo-server-express'; -import { EthClient, topictoAddress } from '@vulcanize/ipld-eth-client'; -import { getEventNameTopics, getStorageValue, GetStorageAt, StorageLayout } from '@vulcanize/solidity-mapper'; +import { EthClient } from '@vulcanize/ipld-eth-client'; +import { StorageLayout } from '@vulcanize/solidity-mapper'; +import { EventInterface, Indexer as BaseIndexer, ValueResult } from '@vulcanize/util'; import { Database } from './database'; -import { Event } from './entity/Event'; +import { Event, UNKNOWN_EVENT_NAME } from './entity/Event'; import { fetchTokenDecimals, fetchTokenName, fetchTokenSymbol, fetchTokenTotalSupply } from './utils'; +import { SyncStatus } from './entity/SyncStatus'; +import artifacts from './artifacts/ERC20.json'; +import { BlockProgress } from './entity/BlockProgress'; +import { Contract } from './entity/Contract'; const log = debug('vulcanize:indexer'); const ETH_CALL_MODE = 'eth_call'; -interface Artifacts { - abi: JsonFragment[]; - storageLayout: StorageLayout; -} +const TRANSFER_EVENT = 'Transfer'; +const APPROVAL_EVENT = 'Approval'; -export interface ValueResult { - value: string | bigint; - proof?: { - data: string; - } -} - -type EventsResult = Array<{ +interface EventResult { event: { from?: string; to?: string; @@ -45,46 +39,51 @@ type EventsResult = Array<{ __typename: string; } proof?: string; -}> +} export class Indexer { _db: Database _ethClient: EthClient - _pubsub: PubSub - _getStorageAt: GetStorageAt _ethProvider: BaseProvider + _baseIndexer: BaseIndexer _abi: JsonFragment[] _storageLayout: StorageLayout _contract: ethers.utils.Interface _serverMode: string - constructor (db: Database, ethClient: EthClient, ethProvider: BaseProvider, pubsub: PubSub, artifacts: Artifacts, serverMode: string) { + constructor (db: Database, ethClient: EthClient, ethProvider: BaseProvider, serverMode: string) { assert(db); assert(ethClient); - assert(pubsub); - assert(artifacts); + + this._db = db; + this._ethClient = ethClient; + this._ethProvider = ethProvider; + this._serverMode = serverMode; + this._baseIndexer = new BaseIndexer(this._db, this._ethClient); const { abi, storageLayout } = artifacts; assert(abi); assert(storageLayout); - this._db = db; - this._ethClient = ethClient; - this._ethProvider = ethProvider; - this._pubsub = pubsub; - this._getStorageAt = this._ethClient.getStorageAt.bind(this._ethClient); - this._serverMode = serverMode; - this._abi = abi; this._storageLayout = storageLayout; this._contract = new ethers.utils.Interface(this._abi); } - getEventIterator (): AsyncIterator { - return this._pubsub.asyncIterator(['event']); + getResultEvent (event: Event): EventResult { + const eventFields = JSON.parse(event.eventInfo); + + return { + event: { + __typename: `${event.eventName}Event`, + ...eventFields + }, + // TODO: Return proof only if requested. + proof: JSON.parse(event.proof) + }; } async totalSupply (blockHash: string, token: string): Promise { @@ -95,7 +94,7 @@ export class Indexer { result = { value }; } else { - result = await this._getStorageValue(blockHash, token, '_totalSupply'); + result = await this._baseIndexer.getStorageValue(this._storageLayout, blockHash, token, '_totalSupply'); } // https://github.com/GoogleChromeLabs/jsbi/issues/30#issuecomment-521460510 @@ -130,7 +129,7 @@ export class Indexer { value: BigInt(value.toString()) }; } else { - result = await this._getStorageValue(blockHash, token, '_balances', owner); + result = await this._baseIndexer.getStorageValue(this._storageLayout, blockHash, token, '_balances', owner); } log(JSONbig.stringify(result, null, 2)); @@ -165,7 +164,7 @@ export class Indexer { value: BigInt(value.toString()) }; } else { - result = await this._getStorageValue(blockHash, token, '_allowances', owner, spender); + result = await this._baseIndexer.getStorageValue(this._storageLayout, blockHash, token, '_allowances', owner, spender); } // log(JSONbig.stringify(result, null, 2)); @@ -184,7 +183,7 @@ export class Indexer { result = { value }; } else { - result = await this._getStorageValue(blockHash, token, '_name'); + result = await this._baseIndexer.getStorageValue(this._storageLayout, blockHash, token, '_name'); } // log(JSONbig.stringify(result, null, 2)); @@ -200,7 +199,7 @@ export class Indexer { result = { value }; } else { - result = await this._getStorageValue(blockHash, token, '_symbol'); + result = await this._baseIndexer.getStorageValue(this._storageLayout, blockHash, token, '_symbol'); } // log(JSONbig.stringify(result, null, 2)); @@ -224,88 +223,24 @@ export class Indexer { return result; } - async getEvents (blockHash: string, token: string, name: string | null): Promise { - const didSyncEvents = await this._db.didSyncEvents({ blockHash, token }); - if (!didSyncEvents) { - // Fetch and save events first and make a note in the event sync progress table. - await this._fetchAndSaveEvents({ blockHash, token }); - log('getEvents: db miss, fetching from upstream server'); - } - - assert(await this._db.didSyncEvents({ blockHash, token })); - - const events = await this._db.getEvents({ blockHash, token }); - log('getEvents: db hit'); - - const result = events - // TODO: Filter using db WHERE condition when name is not empty. - .filter(event => !name || name === event.eventName) - .map(e => { - const eventFields: { - from?: string, - to?: string, - value?: BigInt, - owner?: string, - spender?: string, - } = {}; - - switch (e.eventName) { - case 'Transfer': { - eventFields.from = e.transferFrom; - eventFields.to = e.transferTo; - eventFields.value = e.transferValue; - break; - } - case 'Approval': { - eventFields.owner = e.approvalOwner; - eventFields.spender = e.approvalSpender; - eventFields.value = e.approvalValue; - break; - } - } - - return { - event: { - __typename: `${e.eventName}Event`, - ...eventFields - }, - // TODO: Return proof only if requested. - proof: JSON.parse(e.proof) - }; - }); - - // log(JSONbig.stringify(result, null, 2)); - - return result; - } - - async triggerIndexingOnEvent (blockHash: string, blockNumber: number, token: string, receipt: any, logIndex: number): Promise { - const topics = []; - - // We only care about the event type for now. - const data = '0x0000000000000000000000000000000000000000000000000000000000000000'; - - topics.push(receipt.topic0S[logIndex]); - topics.push(receipt.topic1S[logIndex]); - topics.push(receipt.topic2S[logIndex]); - - const { name: eventName, args } = this._contract.parseLog({ topics, data }); - log(`trigger indexing on event: ${eventName} ${args}`); + async triggerIndexingOnEvent (event: Event): Promise { + const { eventName, eventInfo, contract: token, block: { blockHash } } = event; + const eventFields = JSON.parse(eventInfo); // What data we index depends on the kind of event. switch (eventName) { - case 'Transfer': { + case TRANSFER_EVENT: { // On a transfer, balances for both parties change. // Therefore, trigger indexing for both sender and receiver. - const [from, to] = args; + const { from, to } = eventFields; await this.balanceOf(blockHash, token, from); await this.balanceOf(blockHash, token, to); break; } - case 'Approval': { + case APPROVAL_EVENT: { // Update allowance for (owner, spender) combination. - const [owner, spender] = args; + const { owner, spender } = eventFields; await this.allowance(blockHash, token, owner, spender); break; @@ -313,35 +248,44 @@ export class Indexer { } } - async publishEventToSubscribers (blockHash: string, token: string, logIndex: number): Promise { - // TODO: Optimize this fetching of events. - const events = await this.getEvents(blockHash, token, null); - const event = events[logIndex]; - - log(`pushing event to GQL subscribers: ${event.event.__typename}`); - - // Publishing the event here will result in pushing the payload to GQL subscribers for `onTokenEvent`. - await this._pubsub.publish('event', { - onTokenEvent: { - blockHash, - token, - event - } - }); - } - - async processEvent (blockHash: string, blockNumber: number, token: string, receipt: any, logIndex: number): Promise { + async processEvent (event: Event): Promise { // Trigger indexing of data based on the event. - await this.triggerIndexingOnEvent(blockHash, blockNumber, token, receipt, logIndex); - - // Also trigger downstream event watcher subscriptions. - await this.publishEventToSubscribers(blockHash, token, logIndex); + await this.triggerIndexingOnEvent(event); } - async isWatchedContract (address : string): Promise { - assert(address); + parseEventNameAndArgs (kind: string, logObj: any): any { + let eventName = UNKNOWN_EVENT_NAME; + let eventInfo = {}; - return this._db.isWatchedContract(ethers.utils.getAddress(address)); + const { topics, data } = logObj; + const logDescription = this._contract.parseLog({ data, topics }); + + switch (logDescription.name) { + case TRANSFER_EVENT: { + eventName = logDescription.name; + const [from, to, value] = logDescription.args; + eventInfo = { + from, + to, + value: value.toString() + }; + + break; + } + case APPROVAL_EVENT: { + eventName = logDescription.name; + const [owner, spender, value] = logDescription.args; + eventInfo = { + owner, + spender, + value: value.toString() + }; + + break; + } + } + + return { eventName, eventInfo }; } async watchContract (address: string, startingBlock: number): Promise { @@ -351,67 +295,156 @@ export class Indexer { return true; } - // TODO: Move into base/class or framework package. - async _getStorageValue (blockHash: string, token: string, variable: string, ...mappingKeys: string[]): Promise { - return getStorageValue( - this._storageLayout, - this._getStorageAt, - blockHash, - token, - variable, - ...mappingKeys - ); + async getEventsByFilter (blockHash: string, contract: string, name: string | null): Promise> { + return this._baseIndexer.getEventsByFilter(blockHash, contract, name); } - async _fetchAndSaveEvents ({ blockHash, token }: { blockHash: string, token: string }): Promise { - const { logs } = await this._ethClient.getLogs({ blockHash, contract: token }); + async isWatchedContract (address : string): Promise { + return this._baseIndexer.isWatchedContract(address); + } - const eventNameToTopic = getEventNameTopics(this._abi); - const logTopicToEventName = invert(eventNameToTopic); + async saveEventEntity (dbEvent: Event): Promise { + return this._baseIndexer.saveEventEntity(dbEvent); + } - const dbEvents = logs.map((log: any) => { - const { topics, data: value, cid, ipldBlock } = log; + async getProcessedBlockCountForRange (fromBlockNumber: number, toBlockNumber: number): Promise<{ expected: number, actual: number }> { + return this._baseIndexer.getProcessedBlockCountForRange(fromBlockNumber, toBlockNumber); + } - const [topic0, topic1, topic2] = topics; + async getEventsInRange (fromBlockNumber: number, toBlockNumber: number): Promise> { + return this._baseIndexer.getEventsInRange(fromBlockNumber, toBlockNumber); + } - const eventName = logTopicToEventName[topic0]; - const address1 = topictoAddress(topic1); - const address2 = topictoAddress(topic2); + async updateSyncStatusIndexedBlock (blockHash: string, blockNumber: number): Promise { + return this._baseIndexer.updateSyncStatusIndexedBlock(blockHash, blockNumber); + } - const event: DeepPartial = { - blockHash, - token, - eventName, + async updateSyncStatusChainHead (blockHash: string, blockNumber: number): Promise { + return this._baseIndexer.updateSyncStatusChainHead(blockHash, blockNumber); + } - proof: JSONbig.stringify({ - data: JSONbig.stringify({ - blockHash, - receipt: { - cid, - ipldBlock - } + async updateSyncStatusCanonicalBlock (blockHash: string, blockNumber: number): Promise { + return this._baseIndexer.updateSyncStatusCanonicalBlock(blockHash, blockNumber); + } + + async getSyncStatus (): Promise { + return this._baseIndexer.getSyncStatus(); + } + + async getBlock (blockHash: string): Promise { + return this._baseIndexer.getBlock(blockHash); + } + + async getEvent (id: string): Promise { + return this._baseIndexer.getEvent(id); + } + + async getBlockProgress (blockHash: string): Promise { + return this._baseIndexer.getBlockProgress(blockHash); + } + + async getBlocksAtHeight (height: number, isPruned: boolean): Promise { + return this._baseIndexer.getBlocksAtHeight(height, isPruned); + } + + async getOrFetchBlockEvents (block: DeepPartial): Promise> { + return this._baseIndexer.getOrFetchBlockEvents(block, this._fetchAndSaveEvents.bind(this)); + } + + async getBlockEvents (blockHash: string): Promise> { + return this._baseIndexer.getBlockEvents(blockHash); + } + + async markBlocksAsPruned (blocks: BlockProgress[]): Promise { + return this._baseIndexer.markBlocksAsPruned(blocks); + } + + async updateBlockProgress (blockHash: string, lastProcessedEventIndex: number): Promise { + return this._baseIndexer.updateBlockProgress(blockHash, lastProcessedEventIndex); + } + + async getAncestorAtDepth (blockHash: string, depth: number): Promise { + return this._baseIndexer.getAncestorAtDepth(blockHash, depth); + } + + async _fetchAndSaveEvents ({ blockHash }: DeepPartial): Promise { + assert(blockHash); + let { block, logs } = await this._ethClient.getLogs({ blockHash }); + + const dbEvents: Array> = []; + + for (let li = 0; li < logs.length; li++) { + const logObj = logs[li]; + const { + topics, + data, + index: logIndex, + cid, + ipldBlock, + account: { + address + }, + transaction: { + hash: txHash + }, + receiptCID, + status + } = logObj; + + if (status) { + let eventName = UNKNOWN_EVENT_NAME; + let eventInfo = {}; + const extraInfo = { topics, data }; + + const contract = ethers.utils.getAddress(address); + const watchedContract = await this.isWatchedContract(contract); + + if (watchedContract) { + const eventDetails = this.parseEventNameAndArgs(watchedContract.kind, logObj); + eventName = eventDetails.eventName; + eventInfo = eventDetails.eventInfo; + } + + dbEvents.push({ + index: logIndex, + txHash, + contract, + eventName, + eventInfo: JSONbig.stringify(eventInfo), + extraInfo: JSONbig.stringify(extraInfo), + proof: JSONbig.stringify({ + data: JSONbig.stringify({ + blockHash, + receiptCID, + log: { + cid, + ipldBlock + } + }) }) - }) + }); + } else { + log(`Skipping event for receipt ${receiptCID} due to failed transaction.`); + } + } + + const dbTx = await this._db.createTransactionRunner(); + + try { + block = { + blockHash, + blockNumber: block.number, + blockTimestamp: block.timestamp, + parentHash: block.parent.hash }; - switch (eventName) { - case 'Transfer': { - event.transferFrom = address1; - event.transferTo = address2; - event.transferValue = BigInt(value); - break; - } - case 'Approval': { - event.approvalOwner = address1; - event.approvalSpender = address2; - event.approvalValue = BigInt(value); - break; - } - } - - return event; - }); - - await this._db.saveEvents({ blockHash, token, events: dbEvents }); + await this._db.saveEvents(dbTx, block, dbEvents); + await dbTx.commitTransaction(); + } catch (error) { + await dbTx.rollbackTransaction(); + throw error; + } finally { + await dbTx.release(); + } } } diff --git a/packages/erc20-watcher/src/job-runner.ts b/packages/erc20-watcher/src/job-runner.ts new file mode 100644 index 00000000..aa68e8fd --- /dev/null +++ b/packages/erc20-watcher/src/job-runner.ts @@ -0,0 +1,126 @@ +// +// Copyright 2021 Vulcanize, Inc. +// + +import assert from 'assert'; +import 'reflect-metadata'; +import yargs from 'yargs'; +import { hideBin } from 'yargs/helpers'; +import debug from 'debug'; +import { getDefaultProvider } from 'ethers'; + +import { getCache } from '@vulcanize/cache'; +import { EthClient } from '@vulcanize/ipld-eth-client'; +import { + getConfig, + JobQueue, + JobRunner as BaseJobRunner, + QUEUE_BLOCK_PROCESSING, + QUEUE_EVENT_PROCESSING, + JobQueueConfig, + DEFAULT_CONFIG_PATH +} from '@vulcanize/util'; + +import { Indexer } from './indexer'; +import { Database } from './database'; + +const log = debug('vulcanize:job-runner'); + +export class JobRunner { + _indexer: Indexer + _jobQueue: JobQueue + _baseJobRunner: BaseJobRunner + _jobQueueConfig: JobQueueConfig + + constructor (jobQueueConfig: JobQueueConfig, indexer: Indexer, jobQueue: JobQueue) { + this._indexer = indexer; + this._jobQueue = jobQueue; + this._jobQueueConfig = jobQueueConfig; + this._baseJobRunner = new BaseJobRunner(this._jobQueueConfig, this._indexer, this._jobQueue); + } + + async start (): Promise { + await this.subscribeBlockProcessingQueue(); + await this.subscribeEventProcessingQueue(); + } + + async subscribeBlockProcessingQueue (): Promise { + await this._jobQueue.subscribe(QUEUE_BLOCK_PROCESSING, async (job) => { + await this._baseJobRunner.processBlock(job); + + await this._jobQueue.markComplete(job); + }); + } + + async subscribeEventProcessingQueue (): Promise { + await this._jobQueue.subscribe(QUEUE_EVENT_PROCESSING, async (job) => { + const event = await this._baseJobRunner.processEvent(job); + + const watchedContract = await this._indexer.isWatchedContract(event.contract); + if (watchedContract) { + await this._indexer.processEvent(event); + } + + await this._jobQueue.markComplete(job); + }); + } +} + +export const main = async (): Promise => { + const argv = await yargs(hideBin(process.argv)) + .option('f', { + alias: 'config-file', + demandOption: true, + describe: 'configuration file path (toml)', + type: 'string', + default: DEFAULT_CONFIG_PATH + }) + .argv; + + const config = await getConfig(argv.f); + + assert(config.server, 'Missing server config'); + + const { upstream, database: dbConfig, jobQueue: jobQueueConfig, server: { mode } } = config; + + assert(dbConfig, 'Missing database config'); + + const db = new Database(dbConfig); + await db.init(); + + assert(upstream, 'Missing upstream config'); + const { ethServer: { gqlApiEndpoint, gqlPostgraphileEndpoint, rpcProviderEndpoint }, cache: cacheConfig } = upstream; + assert(gqlApiEndpoint, 'Missing upstream ethServer.gqlApiEndpoint'); + assert(gqlPostgraphileEndpoint, 'Missing upstream ethServer.gqlPostgraphileEndpoint'); + + const cache = await getCache(cacheConfig); + const ethClient = new EthClient({ + gqlEndpoint: gqlApiEndpoint, + gqlSubscriptionEndpoint: gqlPostgraphileEndpoint, + cache + }); + + const ethProvider = getDefaultProvider(rpcProviderEndpoint); + const indexer = new Indexer(db, ethClient, ethProvider, mode); + + assert(jobQueueConfig, 'Missing job queue config'); + + const { dbConnectionString, maxCompletionLagInSecs } = jobQueueConfig; + assert(dbConnectionString, 'Missing job queue db connection string'); + + const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs }); + await jobQueue.start(); + + const jobRunner = new JobRunner(jobQueueConfig, indexer, jobQueue); + await jobRunner.start(); +}; + +main().then(() => { + log('Starting job runner...'); +}).catch(err => { + log(err); +}); + +process.on('uncaughtException', err => { + log('uncaughtException', err); +}); diff --git a/packages/erc20-watcher/src/resolvers.ts b/packages/erc20-watcher/src/resolvers.ts index a4ef9c65..bc5aad96 100644 --- a/packages/erc20-watcher/src/resolvers.ts +++ b/packages/erc20-watcher/src/resolvers.ts @@ -6,11 +6,14 @@ import assert from 'assert'; import BigInt from 'apollo-type-bigint'; import debug from 'debug'; -import { Indexer, ValueResult } from './indexer'; +import { ValueResult } from '@vulcanize/util'; + +import { Indexer } from './indexer'; +import { EventWatcher } from './events'; const log = debug('vulcanize:resolver'); -export const createResolvers = async (indexer: Indexer): Promise => { +export const createResolvers = async (indexer: Indexer, eventWatcher: EventWatcher): Promise => { assert(indexer); return { @@ -26,7 +29,7 @@ export const createResolvers = async (indexer: Indexer): Promise => { Subscription: { onTokenEvent: { - subscribe: () => indexer.getEventIterator() + subscribe: () => eventWatcher.getEventIterator() } }, @@ -71,7 +74,26 @@ export const createResolvers = async (indexer: Indexer): Promise => { events: async (_: any, { blockHash, token, name }: { blockHash: string, token: string, name: string }) => { log('events', blockHash, token, name || ''); - return indexer.getEvents(blockHash, token, name); + + const block = await indexer.getBlockProgress(blockHash); + if (!block || !block.isComplete) { + throw new Error(`Block hash ${blockHash} number ${block?.blockNumber} not processed yet`); + } + + const events = await indexer.getEventsByFilter(blockHash, token, name); + return events.map(event => indexer.getResultEvent(event)); + }, + + eventsInRange: async (_: any, { fromBlockNumber, toBlockNumber }: { fromBlockNumber: number, toBlockNumber: number }) => { + log('eventsInRange', fromBlockNumber, toBlockNumber); + + const { expected, actual } = await indexer.getProcessedBlockCountForRange(fromBlockNumber, toBlockNumber); + if (expected !== actual) { + throw new Error(`Range not available, expected ${expected}, got ${actual} blocks in range`); + } + + const events = await indexer.getEventsInRange(fromBlockNumber, toBlockNumber); + return events.map(event => indexer.getResultEvent(event)); } } }; diff --git a/packages/erc20-watcher/src/schema.ts b/packages/erc20-watcher/src/schema.ts index 3f52d95b..c3f02326 100644 --- a/packages/erc20-watcher/src/schema.ts +++ b/packages/erc20-watcher/src/schema.ts @@ -133,6 +133,12 @@ type Query { token: String! name: String ): [ResultEvent!] + + # Get token events in a given block range. + eventsInRange( + fromBlockNumber: Int! + toBlockNumber: Int! + ): [ResultEvent!] } # diff --git a/packages/erc20-watcher/src/server.ts b/packages/erc20-watcher/src/server.ts index 8784d208..45d81673 100644 --- a/packages/erc20-watcher/src/server.ts +++ b/packages/erc20-watcher/src/server.ts @@ -15,9 +15,8 @@ import { getDefaultProvider } from 'ethers'; import { getCache } from '@vulcanize/cache'; import { EthClient } from '@vulcanize/ipld-eth-client'; -import { DEFAULT_CONFIG_PATH, getConfig } from '@vulcanize/util'; +import { DEFAULT_CONFIG_PATH, getConfig, JobQueue } from '@vulcanize/util'; -import artifacts from './artifacts/ERC20.json'; import typeDefs from './schema'; import { createResolvers as createMockResolvers } from './mock/resolvers'; @@ -45,7 +44,7 @@ export const main = async (): Promise => { const { host, port, mode } = config.server; - const { upstream, database: dbConfig } = config; + const { upstream, database: dbConfig, jobQueue: jobQueueConfig } = config; assert(dbConfig, 'Missing database config'); @@ -69,12 +68,20 @@ export const main = async (): Promise => { // Note: In-memory pubsub works fine for now, as each watcher is a single process anyway. // Later: https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries const pubsub = new PubSub(); - const indexer = new Indexer(db, ethClient, ethProvider, pubsub, artifacts, mode); + const indexer = new Indexer(db, ethClient, ethProvider, mode); - const eventWatcher = new EventWatcher(ethClient, indexer); + assert(jobQueueConfig, 'Missing job queue config'); + + const { dbConnectionString, maxCompletionLagInSecs } = jobQueueConfig; + assert(dbConnectionString, 'Missing job queue db connection string'); + + const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs }); + await jobQueue.start(); + + const eventWatcher = new EventWatcher(ethClient, indexer, pubsub, jobQueue); await eventWatcher.start(); - const resolvers = process.env.MOCK ? await createMockResolvers() : await createResolvers(indexer); + const resolvers = process.env.MOCK ? await createMockResolvers() : await createResolvers(indexer, eventWatcher); const app: Application = express(); const server = new ApolloServer({ diff --git a/packages/erc20-watcher/test/tasks/token-approve.ts b/packages/erc20-watcher/test/tasks/token-approve.ts new file mode 100644 index 00000000..2091b50f --- /dev/null +++ b/packages/erc20-watcher/test/tasks/token-approve.ts @@ -0,0 +1,34 @@ +// +// Copyright 2021 Vulcanize, Inc. +// + +import { task, types } from 'hardhat/config'; +import '@nomiclabs/hardhat-ethers'; +import { ContractTransaction, BigNumber } from 'ethers'; + +const DEFAULT_APPROVE_AMOUNT = '1000000000000000000000000'; + +task('token-approve', 'Move tokens to recipient') + .addParam('token', 'Token contract address', undefined, types.string) + .addParam('spender', 'Spender address', undefined, types.string) + .addParam('amount', 'Token amount to transfer', DEFAULT_APPROVE_AMOUNT, types.string) + .setAction(async (args, hre) => { + const { token: tokenAddress, amount, spender } = args; + await hre.run('compile'); + const Token = await hre.ethers.getContractFactory('GLDToken'); + const token = Token.attach(tokenAddress); + + const transaction: ContractTransaction = await token.approve(spender, BigNumber.from(amount)); + const receipt = await transaction.wait(); + + if (receipt.events) { + const TransferEvent = receipt.events.find(el => el.event === 'Approval'); + + if (TransferEvent && TransferEvent.args) { + console.log('Approval Event'); + console.log('owner:', TransferEvent.args.owner.toString()); + console.log('spender:', TransferEvent.args.spender.toString()); + console.log('value:', TransferEvent.args.value.toString()); + } + } + }); diff --git a/packages/erc20-watcher/test/tasks/token-transfer-from.ts b/packages/erc20-watcher/test/tasks/token-transfer-from.ts new file mode 100644 index 00000000..09712e24 --- /dev/null +++ b/packages/erc20-watcher/test/tasks/token-transfer-from.ts @@ -0,0 +1,37 @@ +// +// Copyright 2021 Vulcanize, Inc. +// + +import { task, types } from 'hardhat/config'; +import '@nomiclabs/hardhat-ethers'; +import { ContractTransaction } from 'ethers'; + +task('token-transfer-from', 'Send tokens as spender') + .addParam('token', 'Token contract address', undefined, types.string) + .addParam('spenderKey', 'Spender private key', undefined, types.string) + .addParam('to', 'Transfer recipient address', undefined, types.string) + .addParam('amount', 'Token amount to transfer', undefined, types.int) + .setAction(async (args, hre) => { + const { token: tokenAddress, to, amount, spenderKey } = args; + await hre.run('compile'); + const [owner] = await hre.ethers.getSigners(); + const wallet = new hre.ethers.Wallet(spenderKey, hre.ethers.provider); + const Token = await hre.ethers.getContractFactory('GLDToken'); + let token = Token.attach(tokenAddress); + + token = token.connect(wallet); + const transaction: ContractTransaction = await token.transferFrom(owner.address, to, amount); + + const receipt = await transaction.wait(); + + if (receipt.events) { + const TransferEvent = receipt.events.find(el => el.event === 'Transfer'); + + if (TransferEvent && TransferEvent.args) { + console.log('Transfer Event'); + console.log('from:', TransferEvent.args.from.toString()); + console.log('to:', TransferEvent.args.to.toString()); + console.log('value:', TransferEvent.args.value.toString()); + } + } + }); diff --git a/packages/ipld-eth-client/src/eth-client.ts b/packages/ipld-eth-client/src/eth-client.ts index f7bb710d..a1ba84fc 100644 --- a/packages/ipld-eth-client/src/eth-client.ts +++ b/packages/ipld-eth-client/src/eth-client.ts @@ -103,10 +103,6 @@ export class EthClient { return this._graphqlClient.subscribe(ethQueries.subscribeBlocks, onNext); } - async watchLogs (onNext: (value: any) => void): Promise { - return this._graphqlClient.subscribe(ethQueries.subscribeLogs, onNext); - } - async watchTransactions (onNext: (value: any) => void): Promise { return this._graphqlClient.subscribe(ethQueries.subscribeTransactions, onNext); } diff --git a/packages/ipld-eth-client/src/eth-queries.ts b/packages/ipld-eth-client/src/eth-queries.ts index 110328dc..dcf45b2c 100644 --- a/packages/ipld-eth-client/src/eth-queries.ts +++ b/packages/ipld-eth-client/src/eth-queries.ts @@ -28,6 +28,8 @@ query getLogs($blockHash: Bytes32!, $contract: Address) { index cid ipldBlock + receiptCID + status } block(hash: $blockHash) { number @@ -75,31 +77,6 @@ query block($blockHash: Bytes32) { } `; -export const subscribeLogs = gql` -subscription SubscriptionReceipt { - listen(topic: "receipt_cids") { - relatedNode { - ... on ReceiptCid { - logContracts - topic0S - topic1S - topic2S - topic3S - contract - ethTransactionCidByTxId { - txHash - ethHeaderCidByHeaderId { - blockHash - blockNumber - parentHash - } - } - } - } - } -} -`; - export const subscribeBlocks = gql` subscription { listen(topic: "header_cids") { @@ -137,7 +114,6 @@ export default { getLogs, getBlockWithTransactions, getBlockByHash, - subscribeLogs, subscribeBlocks, subscribeTransactions }; diff --git a/packages/lighthouse-watcher/src/indexer.ts b/packages/lighthouse-watcher/src/indexer.ts index 7be83235..f90ffe27 100644 --- a/packages/lighthouse-watcher/src/indexer.ts +++ b/packages/lighthouse-watcher/src/indexer.ts @@ -116,54 +116,61 @@ export class Indexer { }, transaction: { hash: txHash - } + }, + receiptCID, + status } = logObj; - const tx = transactionMap[txHash]; - assert(ethers.utils.getAddress(address) === contract); + if (status) { + const tx = transactionMap[txHash]; + assert(ethers.utils.getAddress(address) === contract); - const eventDetails = this.parseEventNameAndArgs(logObj); - const eventName = eventDetails.eventName; - const eventInfo = eventDetails.eventInfo; + const eventDetails = this.parseEventNameAndArgs(logObj); + const eventName = eventDetails.eventName; + const eventInfo = eventDetails.eventInfo; - const { - hash, - number, - timestamp, - parent: { - hash: parentHash - } - } = block; - - events.push({ - block: { + const { hash, number, timestamp, - parentHash - }, - eventIndex: logIndex, - tx: { - hash: txHash, - index: tx.index, - from: tx.src, - to: tx.dst - }, - contract, - event: { - __typename: `${eventName}Event`, - ...eventInfo - }, - proof: { - data: JSONbig.stringify({ - blockHash: hash, - receipt: { - cid, - ipldBlock - } - }) - } - }); + parent: { + hash: parentHash + } + } = block; + + events.push({ + block: { + hash, + number, + timestamp, + parentHash + }, + eventIndex: logIndex, + tx: { + hash: txHash, + index: tx.index, + from: tx.src, + to: tx.dst + }, + contract, + event: { + __typename: `${eventName}Event`, + ...eventInfo + }, + proof: { + data: JSONbig.stringify({ + blockHash, + receiptCID, + log: { + cid, + ipldBlock + } + }) + } + }); + } else { + log(`Skipping event for receipt ${receiptCID} due to failed transaction.`); + } } return events; diff --git a/packages/uni-info-watcher/package.json b/packages/uni-info-watcher/package.json index d773ba16..24a30ea1 100644 --- a/packages/uni-info-watcher/package.json +++ b/packages/uni-info-watcher/package.json @@ -5,23 +5,23 @@ "license": "AGPL-3.0", "private": true, "dependencies": { + "@apollo/client": "^3.3.19", "@vulcanize/cache": "^0.1.0", "@vulcanize/erc20-watcher": "^0.1.0", "@vulcanize/ipld-eth-client": "^0.1.0", - "@vulcanize/util": "^0.1.0", "@vulcanize/uni-watcher": "^0.1.0", - "@apollo/client": "^3.3.19", + "@vulcanize/util": "^0.1.0", "apollo-server-express": "^2.25.0", "apollo-type-bigint": "^0.1.3", - "decimal.js": "^10.3.1", - "typeorm": "^0.2.32", "debug": "^4.3.1", - "reflect-metadata": "^0.1.13", - "graphql-request": "^3.4.0", - "yargs": "^17.0.1", - "json-bigint": "^1.0.0", + "decimal.js": "^10.3.1", "express": "^4.17.1", - "graphql-import-node": "^0.0.4" + "graphql-import-node": "^0.0.4", + "graphql-request": "^3.4.0", + "json-bigint": "^1.0.0", + "reflect-metadata": "^0.1.13", + "typeorm": "^0.2.32", + "yargs": "^17.0.1" }, "scripts": { "lint": "eslint .", @@ -56,13 +56,13 @@ "eslint-plugin-node": "^11.1.0", "eslint-plugin-promise": "^5.1.0", "eslint-plugin-standard": "^5.0.0", + "ethers": "^5.2.0", "get-graphql-schema": "^2.1.2", "graphql-schema-linter": "^2.0.1", + "lodash": "^4.17.21", "mocha": "^8.4.0", "nodemon": "^2.0.7", "ts-node": "^10.0.0", - "typescript": "^4.3.2", - "ethers": "^5.2.0", - "lodash": "^4.17.21" + "typescript": "^4.3.2" } } diff --git a/packages/uni-info-watcher/src/database.ts b/packages/uni-info-watcher/src/database.ts index 473891a8..b4c127f8 100644 --- a/packages/uni-info-watcher/src/database.ts +++ b/packages/uni-info-watcher/src/database.ts @@ -556,10 +556,10 @@ export class Database implements DatabaseInterface { return this._baseDatabase.saveEventEntity(repo, entity); } - async getBlockEvents (blockHash: string): Promise { + async getBlockEvents (blockHash: string, where: FindConditions): Promise { const repo = this._conn.getRepository(Event); - return this._baseDatabase.getBlockEvents(repo, blockHash); + return this._baseDatabase.getBlockEvents(repo, blockHash, where); } async saveEvents (queryRunner: QueryRunner, block: DeepPartial, events: DeepPartial[]): Promise { diff --git a/packages/uni-info-watcher/src/indexer.ts b/packages/uni-info-watcher/src/indexer.ts index 2ad02512..bff41eb3 100644 --- a/packages/uni-info-watcher/src/indexer.ts +++ b/packages/uni-info-watcher/src/indexer.ts @@ -37,13 +37,6 @@ const SYNC_DELTA = 5; const log = debug('vulcanize:indexer'); -export interface ValueResult { - value: string | bigint; - proof: { - data: string; - } -} - export { OrderDirection, BlockHeight }; export class Indexer implements IndexerInterface { diff --git a/packages/uni-watcher/package.json b/packages/uni-watcher/package.json index e21d4b19..6b09e51b 100644 --- a/packages/uni-watcher/package.json +++ b/packages/uni-watcher/package.json @@ -32,12 +32,12 @@ }, "homepage": "https://github.com/vulcanize/watcher-ts#readme", "dependencies": { + "@apollo/client": "^3.3.19", "@types/lodash": "^4.14.168", "@vulcanize/cache": "^0.1.0", "@vulcanize/ipld-eth-client": "^0.1.0", "@vulcanize/solidity-mapper": "^0.1.0", "@vulcanize/util": "^0.1.0", - "@apollo/client": "^3.3.19", "apollo-server-express": "^2.25.0", "apollo-type-bigint": "^0.1.3", "debug": "^4.3.1", diff --git a/packages/uni-watcher/src/database.ts b/packages/uni-watcher/src/database.ts index 70f97779..9698f39b 100644 --- a/packages/uni-watcher/src/database.ts +++ b/packages/uni-watcher/src/database.ts @@ -37,13 +37,6 @@ export class Database implements DatabaseInterface { return this._baseDatabase.close(); } - async getContract (address: string): Promise { - return this._conn.getRepository(Contract) - .createQueryBuilder('contract') - .where('address = :address', { address }) - .getOne(); - } - async getLatestContract (kind: string): Promise { return this._conn.getRepository(Contract) .createQueryBuilder('contract') @@ -52,6 +45,12 @@ export class Database implements DatabaseInterface { .getOne(); } + async getContract (address: string): Promise { + const repo = this._conn.getRepository(Contract); + + return this._baseDatabase.getContract(repo, address); + } + async saveContract (queryRunner: QueryRunner, address: string, kind: string, startingBlock: number): Promise { const repo = queryRunner.manager.getRepository(Contract); @@ -79,10 +78,10 @@ export class Database implements DatabaseInterface { return this._baseDatabase.saveEventEntity(repo, entity); } - async getBlockEvents (blockHash: string): Promise { + async getBlockEvents (blockHash: string, where: FindConditions): Promise { const repo = this._conn.getRepository(Event); - return this._baseDatabase.getBlockEvents(repo, blockHash); + return this._baseDatabase.getBlockEvents(repo, blockHash, where); } async saveEvents (queryRunner: QueryRunner, block: DeepPartial, events: DeepPartial[]): Promise { diff --git a/packages/uni-watcher/src/indexer.ts b/packages/uni-watcher/src/indexer.ts index 73b16c28..624c0afe 100644 --- a/packages/uni-watcher/src/indexer.ts +++ b/packages/uni-watcher/src/indexer.ts @@ -9,7 +9,6 @@ import { ethers } from 'ethers'; import assert from 'assert'; import { EthClient } from '@vulcanize/ipld-eth-client'; -import { GetStorageAt, getStorageValue, StorageLayout } from '@vulcanize/solidity-mapper'; import { IndexerInterface, Indexer as BaseIndexer } from '@vulcanize/util'; import { Database } from './database'; @@ -36,18 +35,10 @@ type ResultEvent = { proof: string; }; -interface ValueResult { - value: any; - proof: { - data: string; - } -} - export class Indexer implements IndexerInterface { _db: Database _ethClient: EthClient _postgraphileClient: EthClient - _getStorageAt: GetStorageAt _baseIndexer: BaseIndexer _factoryContract: ethers.utils.Interface @@ -58,7 +49,6 @@ export class Indexer implements IndexerInterface { this._db = db; this._ethClient = ethClient; this._postgraphileClient = postgraphileClient; - this._getStorageAt = this._ethClient.getStorageAt.bind(this._ethClient); this._baseIndexer = new BaseIndexer(this._db, this._ethClient); this._factoryContract = new ethers.utils.Interface(factoryABI); @@ -99,27 +89,6 @@ export class Indexer implements IndexerInterface { }; } - async getEventsByFilter (blockHash: string, contract: string, name: string | null): Promise> { - if (contract) { - const uniContract = await this.isUniswapContract(contract); - if (!uniContract) { - throw new Error('Not a uniswap contract'); - } - } - - const events = await this._db.getBlockEvents(blockHash); - log(`getEvents: db hit, num events: ${events.length}`); - - // Filtering. - const result = events - // TODO: Filter using db WHERE condition on contract. - .filter(event => !contract || contract === event.contract) - // TODO: Filter using db WHERE condition when name is not empty. - .filter(event => !name || name === event.eventName); - - return result; - } - async triggerIndexingOnEvent (dbTx: QueryRunner, dbEvent: Event): Promise { const re = this.getResultEvent(dbEvent); @@ -131,10 +100,6 @@ export class Indexer implements IndexerInterface { } } - async isUniswapContract (address: string): Promise { - return this._db.getContract(ethers.utils.getAddress(address)); - } - async processEvent (event: Event): Promise { const dbTx = await this._db.createTransactionRunner(); @@ -295,7 +260,7 @@ export class Indexer implements IndexerInterface { async position (blockHash: string, tokenId: string): Promise { const nfpmContract = await this._db.getLatestContract('nfpm'); assert(nfpmContract, 'No NFPM contract watched.'); - const { value, proof } = await this._getStorageValue(nfpmStorageLayout, blockHash, nfpmContract.address, '_positions', BigInt(tokenId)); + const { value, proof } = await this._baseIndexer.getStorageValue(nfpmStorageLayout, blockHash, nfpmContract.address, '_positions', BigInt(tokenId)); return { ...value, @@ -306,7 +271,7 @@ export class Indexer implements IndexerInterface { async poolIdToPoolKey (blockHash: string, poolId: string): Promise { const nfpmContract = await this._db.getLatestContract('nfpm'); assert(nfpmContract, 'No NFPM contract watched.'); - const { value, proof } = await this._getStorageValue(nfpmStorageLayout, blockHash, nfpmContract.address, '_poolIdToPoolKey', BigInt(poolId)); + const { value, proof } = await this._baseIndexer.getStorageValue(nfpmStorageLayout, blockHash, nfpmContract.address, '_poolIdToPoolKey', BigInt(poolId)); return { ...value, @@ -317,7 +282,7 @@ export class Indexer implements IndexerInterface { async getPool (blockHash: string, token0: string, token1: string, fee: string): Promise { const factoryContract = await this._db.getLatestContract('factory'); assert(factoryContract, 'No Factory contract watched.'); - const { value, proof } = await this._getStorageValue(factoryStorageLayout, blockHash, factoryContract.address, 'getPool', token0, token1, BigInt(fee)); + const { value, proof } = await this._baseIndexer.getStorageValue(factoryStorageLayout, blockHash, factoryContract.address, 'getPool', token0, token1, BigInt(fee)); return { pool: value, @@ -330,6 +295,14 @@ export class Indexer implements IndexerInterface { return contract; } + async getEventsByFilter (blockHash: string, contract: string, name: string | null): Promise> { + return this._baseIndexer.getEventsByFilter(blockHash, contract, name); + } + + async isWatchedContract (address: string): Promise { + return this._baseIndexer.isWatchedContract(address); + } + async saveEventEntity (dbEvent: Event): Promise { return this._baseIndexer.saveEventEntity(dbEvent); } @@ -431,40 +404,47 @@ export class Indexer implements IndexerInterface { }, transaction: { hash: txHash - } + }, + receiptCID, + status } = logObj; - let eventName = UNKNOWN_EVENT_NAME; - let eventInfo = {}; - const tx = transactionMap[txHash]; - const extraInfo = { topics, data, tx }; + if (status) { + let eventName = UNKNOWN_EVENT_NAME; + let eventInfo = {}; + const tx = transactionMap[txHash]; + const extraInfo = { topics, data, tx }; - const contract = ethers.utils.getAddress(address); - const uniContract = await this.isUniswapContract(contract); + const contract = ethers.utils.getAddress(address); + const uniContract = await this.isWatchedContract(contract); - if (uniContract) { - const eventDetails = this.parseEventNameAndArgs(uniContract.kind, logObj); - eventName = eventDetails.eventName; - eventInfo = eventDetails.eventInfo; - } + if (uniContract) { + const eventDetails = this.parseEventNameAndArgs(uniContract.kind, logObj); + eventName = eventDetails.eventName; + eventInfo = eventDetails.eventInfo; + } - dbEvents.push({ - index: logIndex, - txHash, - contract, - eventName, - eventInfo: JSONbig.stringify(eventInfo), - extraInfo: JSONbig.stringify(extraInfo), - proof: JSONbig.stringify({ - data: JSONbig.stringify({ - blockHash, - receipt: { - cid, - ipldBlock - } + dbEvents.push({ + index: logIndex, + txHash, + contract, + eventName, + eventInfo: JSONbig.stringify(eventInfo), + extraInfo: JSONbig.stringify(extraInfo), + proof: JSONbig.stringify({ + data: JSONbig.stringify({ + blockHash, + receiptCID, + log: { + cid, + ipldBlock + } + }) }) - }) - }); + }); + } else { + log(`Skipping event for receipt ${receiptCID} due to failed transaction.`); + } } const dbTx = await this._db.createTransactionRunner(); @@ -486,16 +466,4 @@ export class Indexer implements IndexerInterface { await dbTx.release(); } } - - // TODO: Move into base/class or framework package. - async _getStorageValue (storageLayout: StorageLayout, blockHash: string, token: string, variable: string, ...mappingKeys: any[]): Promise { - return getStorageValue( - storageLayout, - this._getStorageAt, - blockHash, - token, - variable, - ...mappingKeys - ); - } } diff --git a/packages/uni-watcher/src/job-runner.ts b/packages/uni-watcher/src/job-runner.ts index 32f59b60..d4f4a945 100644 --- a/packages/uni-watcher/src/job-runner.ts +++ b/packages/uni-watcher/src/job-runner.ts @@ -59,13 +59,13 @@ export class JobRunner { let dbEvent; const { data: { id } } = job; - const uniContract = await this._indexer.isUniswapContract(event.contract); - if (uniContract) { + const watchedContract = await this._indexer.isWatchedContract(event.contract); + if (watchedContract) { // We might not have parsed this event yet. This can happen if the contract was added // as a result of a previous event in the same block. if (event.eventName === UNKNOWN_EVENT_NAME) { const logObj = JSON.parse(event.extraInfo); - const { eventName, eventInfo } = this._indexer.parseEventNameAndArgs(uniContract.kind, logObj); + const { eventName, eventInfo } = this._indexer.parseEventNameAndArgs(watchedContract.kind, logObj); event.eventName = eventName; event.eventInfo = JSON.stringify(eventInfo); dbEvent = await this._indexer.saveEventEntity(event); diff --git a/packages/uni-watcher/src/resolvers.ts b/packages/uni-watcher/src/resolvers.ts index 4985754a..cf55b5ff 100644 --- a/packages/uni-watcher/src/resolvers.ts +++ b/packages/uni-watcher/src/resolvers.ts @@ -8,7 +8,6 @@ import debug from 'debug'; import { Indexer } from './indexer'; import { EventWatcher } from './events'; -import { UNKNOWN_EVENT_NAME } from './entity/Event'; const log = debug('vulcanize:resolver'); @@ -64,8 +63,7 @@ export const createResolvers = async (indexer: Indexer, eventWatcher: EventWatch } const events = await indexer.getEventsByFilter(blockHash, contract, name); - return events.filter(event => event.eventName !== UNKNOWN_EVENT_NAME) - .map(event => indexer.getResultEvent(event)); + return events.map(event => indexer.getResultEvent(event)); }, eventsInRange: async (_: any, { fromBlockNumber, toBlockNumber }: { fromBlockNumber: number, toBlockNumber: number }) => { diff --git a/packages/uni-watcher/src/smoke.test.ts b/packages/uni-watcher/src/smoke.test.ts index 3b6d272e..fef5eed8 100644 --- a/packages/uni-watcher/src/smoke.test.ts +++ b/packages/uni-watcher/src/smoke.test.ts @@ -124,7 +124,7 @@ describe('uni-watcher', () => { // Verifying with the db. const indexer = new Indexer(db, ethClient, postgraphileClient); - assert(await indexer.isUniswapContract(factory.address), 'Factory contract not added to the database.'); + assert(await indexer.isWatchedContract(factory.address), 'Factory contract not added to the database.'); }); it('should deploy 2 tokens', async () => { @@ -259,7 +259,7 @@ describe('uni-watcher', () => { // Verifying with the db. const indexer = new Indexer(db, ethClient, postgraphileClient); - assert(await indexer.isUniswapContract(nfpm.address), 'NFPM contract not added to the database.'); + assert(await indexer.isWatchedContract(nfpm.address), 'NFPM contract not added to the database.'); }); it('should mint specified amount: nfpm', done => { diff --git a/packages/util/package.json b/packages/util/package.json index 7c104689..48f60b4e 100644 --- a/packages/util/package.json +++ b/packages/util/package.json @@ -4,6 +4,7 @@ "main": "dist/index.js", "license": "AGPL-3.0", "dependencies": { + "@vulcanize/solidity-mapper": "^0.1.0", "debug": "^4.3.1", "ethers": "^5.2.0", "fs-extra": "^10.0.0", diff --git a/packages/util/src/constants.ts b/packages/util/src/constants.ts index 953b3f19..215e4309 100644 --- a/packages/util/src/constants.ts +++ b/packages/util/src/constants.ts @@ -12,3 +12,5 @@ export const JOB_KIND_INDEX = 'index'; export const JOB_KIND_PRUNE = 'prune'; export const DEFAULT_CONFIG_PATH = 'environments/local.toml'; + +export const UNKNOWN_EVENT_NAME = '__unknown__'; diff --git a/packages/util/src/database.ts b/packages/util/src/database.ts index ba1221b1..6623ad40 100644 --- a/packages/util/src/database.ts +++ b/packages/util/src/database.ts @@ -18,9 +18,8 @@ import { SnakeNamingStrategy } from 'typeorm-naming-strategies'; import _ from 'lodash'; import { BlockProgressInterface, ContractInterface, EventInterface, SyncStatusInterface } from './types'; -import { MAX_REORG_DEPTH } from './constants'; +import { MAX_REORG_DEPTH, UNKNOWN_EVENT_NAME } from './constants'; -const UNKNOWN_EVENT_NAME = '__unknown__'; const DEFAULT_LIMIT = 100; const DEFAULT_SKIP = 0; @@ -180,12 +179,19 @@ export class Database { return repo.findOne(id, { relations: ['block'] }); } - async getBlockEvents (repo: Repository, blockHash: string): Promise { - return repo.createQueryBuilder('event') - .innerJoinAndSelect('event.block', 'block') - .where('block_hash = :blockHash', { blockHash }) - .addOrderBy('event.id', 'ASC') - .getMany(); + async getBlockEvents (repo: Repository, blockHash: string, where: FindConditions = {}): Promise { + where.block = { + ...where.block, + blockHash + }; + + return repo.find({ + where, + relations: ['block'], + order: { + id: 'ASC' + } + }); } async saveEvents (blockRepo: Repository, eventRepo: Repository, block: DeepPartial, events: DeepPartial[]): Promise { @@ -540,6 +546,12 @@ export class Database { return { canonicalBlockNumber, blockHashes }; } + async getContract (repo: Repository, address: string): Promise { + return repo.createQueryBuilder('contract') + .where('address = :address', { address }) + .getOne(); + } + async saveContract (repo: Repository, address: string, startingBlock: number, kind?: string): Promise { const numRows = await repo .createQueryBuilder() diff --git a/packages/util/src/indexer.ts b/packages/util/src/indexer.ts index 2e1486fa..6268ca5a 100644 --- a/packages/util/src/indexer.ts +++ b/packages/util/src/indexer.ts @@ -3,24 +3,36 @@ // import assert from 'assert'; -import { DeepPartial } from 'typeorm'; +import { DeepPartial, FindConditions, Not } from 'typeorm'; import debug from 'debug'; +import { ethers } from 'ethers'; import { EthClient } from '@vulcanize/ipld-eth-client'; +import { GetStorageAt, getStorageValue, StorageLayout } from '@vulcanize/solidity-mapper'; -import { BlockProgressInterface, DatabaseInterface, EventInterface, SyncStatusInterface } from './types'; +import { BlockProgressInterface, DatabaseInterface, EventInterface, SyncStatusInterface, ContractInterface } from './types'; +import { UNKNOWN_EVENT_NAME } from './constants'; const MAX_EVENTS_BLOCK_RANGE = 1000; const log = debug('vulcanize:indexer'); +export interface ValueResult { + value: any; + proof?: { + data: string; + } +} + export class Indexer { _db: DatabaseInterface; _ethClient: EthClient; + _getStorageAt: GetStorageAt constructor (db: DatabaseInterface, ethClient: EthClient) { this._db = db; this._ethClient = ethClient; + this._getStorageAt = this._ethClient.getStorageAt.bind(this._ethClient); } async getSyncStatus (): Promise { @@ -158,6 +170,32 @@ export class Indexer { return this._db.getBlockEvents(blockHash); } + async getEventsByFilter (blockHash: string, contract: string, name: string | null): Promise> { + if (contract) { + const watchedContract = await this.isWatchedContract(contract); + if (!watchedContract) { + throw new Error('Not a watched contract'); + } + } + + const where: FindConditions = { + eventName: Not(UNKNOWN_EVENT_NAME) + }; + + if (contract) { + where.contract = contract; + } + + if (name) { + where.eventName = name; + } + + const events = await this._db.getBlockEvents(blockHash, where); + log(`getEvents: db hit, num events: ${events.length}`); + + return events; + } + async getAncestorAtDepth (blockHash: string, depth: number): Promise { return this._db.getAncestorAtDepth(blockHash, depth); } @@ -194,4 +232,21 @@ export class Indexer { return this._db.getEventsInRange(fromBlockNumber, toBlockNumber); } + + async isWatchedContract (address : string): Promise { + assert(this._db.getContract); + + return this._db.getContract(ethers.utils.getAddress(address)); + } + + async getStorageValue (storageLayout: StorageLayout, blockHash: string, token: string, variable: string, ...mappingKeys: any[]): Promise { + return getStorageValue( + storageLayout, + this._getStorageAt, + blockHash, + token, + variable, + ...mappingKeys + ); + } } diff --git a/packages/util/src/types.ts b/packages/util/src/types.ts index a87eb13a..0c19a263 100644 --- a/packages/util/src/types.ts +++ b/packages/util/src/types.ts @@ -43,7 +43,7 @@ export interface ContractInterface { id: number; address: string; startingBlock: number; - kind?: string; + kind: string; } export interface IndexerInterface { @@ -72,7 +72,7 @@ export interface DatabaseInterface { createTransactionRunner(): Promise; getBlocksAtHeight (height: number, isPruned: boolean): Promise; getBlockProgress (blockHash: string): Promise; - getBlockEvents (blockHash: string): Promise; + getBlockEvents (blockHash: string, where?: FindConditions): Promise; getEvent (id: string): Promise getSyncStatus (queryRunner: QueryRunner): Promise getAncestorAtDepth (blockHash: string, depth: number): Promise @@ -86,4 +86,5 @@ export interface DatabaseInterface { saveEvents (queryRunner: QueryRunner, block: DeepPartial, events: DeepPartial[]): Promise; saveEventEntity (queryRunner: QueryRunner, entity: EventInterface): Promise; removeEntities (queryRunner: QueryRunner, entity: new () => Entity, findConditions?: FindConditions): Promise; + getContract?: (address: string) => Promise } diff --git a/scripts/reset-dbs.sh b/scripts/reset-dbs.sh index 518cf175..57c5d4ef 100755 --- a/scripts/reset-dbs.sh +++ b/scripts/reset-dbs.sh @@ -17,6 +17,7 @@ then createdb uni-watcher createdb uni-info-watcher + psql -d erc20-watcher-job-queue -c "delete from pgboss.job;" psql -d address-watcher-job-queue -c "delete from pgboss.job;" psql -d uni-watcher-job-queue -c "delete from pgboss.job;" psql -d uni-info-watcher-job-queue -c "delete from pgboss.job;" diff --git a/yarn.lock b/yarn.lock index 2684d10a..b2b2b051 100644 --- a/yarn.lock +++ b/yarn.lock @@ -2644,7 +2644,12 @@ dependencies: "@types/yargs-parser" "*" -"@types/zen-observable@^0.8.0", "@types/zen-observable@^0.8.2": +"@types/zen-observable@0.8.3": + version "0.8.3" + resolved "https://registry.yarnpkg.com/@types/zen-observable/-/zen-observable-0.8.3.tgz#781d360c282436494b32fe7d9f7f8e64b3118aa3" + integrity sha512-fbF6oTd4sGGy0xjHPKAt+eS2CrxJ3+6gQ3FGcBoIJR2TLAyCkCyI8JqZNy+FeON0AhVgNJoUumVoZQjBFUqHkw== + +"@types/zen-observable@^0.8.0": version "0.8.2" resolved "https://registry.yarnpkg.com/@types/zen-observable/-/zen-observable-0.8.2.tgz#808c9fa7e4517274ed555fa158f2de4b4f468e71" integrity sha512-HrCIVMLjE1MOozVoD86622S7aunluLb2PJdPfb3nYiEtohm8mIB/vyv0Fd37AdeMFrTUQXEunw78YloMA3Qilg== @@ -4567,9 +4572,9 @@ chalk@^3.0.0: supports-color "^7.1.0" chalk@^4.0.0, chalk@^4.1.0: - version "4.1.1" - resolved "https://registry.yarnpkg.com/chalk/-/chalk-4.1.1.tgz#c80b3fab28bf6371e6863325eee67e618b77e6ad" - integrity sha512-diHzdDKxcU+bAsUboHLPEDQiw0qEe0qd7SYUn3HgcFlWgbDcfLGswOHYeGrHKzG9z6UYf01d9VFMfZxPM1xZSg== + version "4.1.2" + resolved "https://registry.yarnpkg.com/chalk/-/chalk-4.1.2.tgz#aac4e2b7734a740867aeb16bf02aad556a1e7a01" + integrity sha512-oKnbhFyRIXpUuez8iBMmyEa4nbj4IOQyuhc/wy9kY7/WVPcwIO9VA668Pu8RkO7+0G76SLROeyw9CpQ061i4mA== dependencies: ansi-styles "^4.1.0" supports-color "^7.1.0" @@ -4707,7 +4712,7 @@ cli-cursor@^3.1.0: dependencies: restore-cursor "^3.1.0" -cli-highlight@^2.1.10: +cli-highlight@^2.1.11: version "2.1.11" resolved "https://registry.yarnpkg.com/cli-highlight/-/cli-highlight-2.1.11.tgz#49736fa452f0aaf4fae580e30acb26828d2dc1bf" integrity sha512-9KDcoEVwyUXrjcJNvHD0NFc/hiwe/WPVYIleQh2O1N2Zro5gWJZ/K+3DGn8w8P/F6FxOgzyC5bxDyHIgCSPhGg== @@ -5284,7 +5289,7 @@ debug@3.2.6: dependencies: ms "^2.1.1" -debug@4, debug@4.3.1, debug@^4.0.1, debug@^4.1.0, debug@^4.1.1, debug@^4.3.1: +debug@4, debug@4.3.1, debug@^4.0.1, debug@^4.1.0, debug@^4.1.1: version "4.3.1" resolved "https://registry.yarnpkg.com/debug/-/debug-4.3.1.tgz#f0d229c505e0c6d8c49ac553d1b13dc183f6b2ee" integrity sha512-doEwdvm4PCeK4K3RQN2ZC2BYUBaxwLARCqZmMjtF8a51J2Rb0xpVloFRnCODwqjpwnAoao4pelN8l3RJdv3gRQ== @@ -5298,7 +5303,7 @@ debug@^3.1.0, debug@^3.2.6, debug@^3.2.7: dependencies: ms "^2.1.1" -debug@^4.2.0: +debug@^4.2.0, debug@^4.3.1: version "4.3.2" resolved "https://registry.yarnpkg.com/debug/-/debug-4.3.2.tgz#f0a49c18ac8779e31d4a0c6029dfb76873c7428b" integrity sha512-mOp8wKcvj7XxC78zLgw/ZA+6TSgkoE2C/ienthhRD298T7UNwAg9diBpLRxC0mOezLl4B0xV7M0cCO6P/O0Xhw== @@ -6822,9 +6827,9 @@ fetch-ponyfill@^4.0.0: node-fetch "~1.7.1" figlet@^1.1.1: - version "1.5.0" - resolved "https://registry.yarnpkg.com/figlet/-/figlet-1.5.0.tgz#2db4d00a584e5155a96080632db919213c3e003c" - integrity sha512-ZQJM4aifMpz6H19AW1VqvZ7l4pOE9p7i/3LyxgO2kp+PO/VcDYNqIHEMtkccqIhTXMKci4kjueJr/iCQEaT/Ww== + version "1.5.2" + resolved "https://registry.yarnpkg.com/figlet/-/figlet-1.5.2.tgz#dda34ff233c9a48e36fcff6741aeb5bafe49b634" + integrity sha512-WOn21V8AhyE1QqVfPIVxe3tupJacq1xGkPTB4iagT6o+P2cAgEOOwIxMftr4+ZCTI6d551ij9j61DFr0nsP2uQ== figures@^3.0.0: version "3.2.0" @@ -7782,9 +7787,9 @@ heap@0.2.6: integrity sha1-CH4fELBGky/IWU3Z5tN4r8nR5aw= highlight.js@^10.7.1: - version "10.7.2" - resolved "https://registry.yarnpkg.com/highlight.js/-/highlight.js-10.7.2.tgz#89319b861edc66c48854ed1e6da21ea89f847360" - integrity sha512-oFLl873u4usRM9K63j4ME9u3etNF0PLiJhSQ8rdfuL51Wn3zkD6drf9ZW0dOzjnZI22YYG24z30JcmfCZjMgYg== + version "10.7.3" + resolved "https://registry.yarnpkg.com/highlight.js/-/highlight.js-10.7.3.tgz#697272e3991356e40c3cac566a74eef681756531" + integrity sha512-tzcUFauisWKNHaRkN4Wjl/ZA07gENAjFl3J/c480dprkGTg5EQstgaNFqBfUqCq54kZRIEcreTsAgF/m2quD7A== hmac-drbg@^1.0.1: version "1.0.1" @@ -13204,9 +13209,9 @@ tslib@^1.10.0, tslib@^1.8.1, tslib@^1.9.0, tslib@^1.9.3: integrity sha512-Xni35NKzjgMrwevysHTCArtLDpPvye8zV/0E4EyYn43P7/7qvQwPh9BGkHewbMulVntbigmcT7rdX3BNo9wRJg== tslib@^2.1.0: - version "2.2.0" - resolved "https://registry.yarnpkg.com/tslib/-/tslib-2.2.0.tgz#fb2c475977e35e241311ede2693cee1ec6698f5c" - integrity sha512-gS9GVHRU+RGn5KQM2rllAlR3dU6m7AcpJKdtH8gFvQiC4Otgk98XnmMU+nZenHt/+VhnBPWwgrJsyrdcw6i23w== + version "2.3.1" + resolved "https://registry.yarnpkg.com/tslib/-/tslib-2.3.1.tgz#e8a335add5ceae51aa261d32a490158ef042ef01" + integrity sha512-77EbyPPpMz+FRFRuAFlWMtmgUWGe9UOG2Z25NqCwiIjRhOf5iKGuzSe5P2w1laq+FkRy4p+PCuVkJSGkzTEKVw== tsort@0.0.1: version "0.0.1" @@ -13338,15 +13343,15 @@ typeorm-naming-strategies@^2.0.0: integrity sha512-nsJ5jDjhBBEG6olFmxojkO4yrW7hEv38sH7ZXWWx9wnDoo9uaoH/mo2mBYAh/VKgwoFHBLu+CYxGmzXz2GUMcA== typeorm@^0.2.32: - version "0.2.32" - resolved "https://registry.yarnpkg.com/typeorm/-/typeorm-0.2.32.tgz#544dbfdfe0cd0887548d9bcbd28527ea4f4b3c9b" - integrity sha512-LOBZKZ9As3f8KRMPCUT2H0JZbZfWfkcUnO3w/1BFAbL/X9+cADTF6bczDGGaKVENJ3P8SaKheKmBgpt5h1x+EQ== + version "0.2.37" + resolved "https://registry.yarnpkg.com/typeorm/-/typeorm-0.2.37.tgz#1a5e59216077640694d27c04c99ed3f968d15dc8" + integrity sha512-7rkW0yCgFC24I5T0f3S/twmLSuccPh1SQmxET/oDWn2sSDVzbyWdnItSdKy27CdJGTlKHYtUVeOcMYw5LRsXVw== dependencies: "@sqltools/formatter" "^1.2.2" app-root-path "^3.0.0" buffer "^6.0.3" chalk "^4.1.0" - cli-highlight "^2.1.10" + cli-highlight "^2.1.11" debug "^4.3.1" dotenv "^8.2.0" glob "^7.1.6" @@ -13357,7 +13362,7 @@ typeorm@^0.2.32: tslib "^2.1.0" xml2js "^0.4.23" yargonaut "^1.1.4" - yargs "^16.2.0" + yargs "^17.0.1" zen-observable-ts "^1.0.0" typescript@^4.3.2: @@ -14319,7 +14324,12 @@ yargs-parser@^2.4.1: camelcase "^3.0.0" lodash.assign "^4.0.6" -yargs-parser@^20.2.2, yargs-parser@^20.2.3: +yargs-parser@^20.2.2: + version "20.2.9" + resolved "https://registry.yarnpkg.com/yargs-parser/-/yargs-parser-20.2.9.tgz#2eb7dc3b0289718fc295f362753845c41a0c94ee" + integrity sha512-y11nGElTIV+CT3Zv9t7VKl+Q3hTQoT9a1Qzezhhl6Rp21gJ/IVTW7Z3y9EWXhuUBC2Shnf+DX0antecpAwSP8w== + +yargs-parser@^20.2.3: version "20.2.7" resolved "https://registry.yarnpkg.com/yargs-parser/-/yargs-parser-20.2.7.tgz#61df85c113edfb5a7a4e36eb8aa60ef423cbc90a" integrity sha512-FiNkvbeHzB/syOjIUxFDCnhSfzAL8R5vs40MgLFBorXACCOAEaWu0gRZl14vG8MR9AOJIZbmkjhusqBYZ3HTHw== @@ -14437,14 +14447,14 @@ zen-observable-ts@^0.8.21: zen-observable "^0.8.0" zen-observable-ts@^1.0.0: - version "1.0.0" - resolved "https://registry.yarnpkg.com/zen-observable-ts/-/zen-observable-ts-1.0.0.tgz#30d1202b81d8ba4c489e3781e8ca09abf0075e70" - integrity sha512-KmWcbz+9kKUeAQ8btY8m1SsEFgBcp7h/Uf3V5quhan7ZWdjGsf0JcGLULQiwOZibbFWnHkYq8Nn2AZbJabovQg== + version "1.1.0" + resolved "https://registry.yarnpkg.com/zen-observable-ts/-/zen-observable-ts-1.1.0.tgz#2d1aa9d79b87058e9b75698b92791c1838551f83" + integrity sha512-1h4zlLSqI2cRLPJUHJFL8bCWHhkpuXkF+dbGkRaWjgDIG26DmzyshUMrdV/rL3UnR+mhaX4fRq8LPouq0MYYIA== dependencies: - "@types/zen-observable" "^0.8.2" - zen-observable "^0.8.15" + "@types/zen-observable" "0.8.3" + zen-observable "0.8.15" -zen-observable@^0.8.0, zen-observable@^0.8.14, zen-observable@^0.8.15: +zen-observable@0.8.15, zen-observable@^0.8.0, zen-observable@^0.8.14: version "0.8.15" resolved "https://registry.yarnpkg.com/zen-observable/-/zen-observable-0.8.15.tgz#96415c512d8e3ffd920afd3889604e30b9eaac15" integrity sha512-PQ2PC7R9rslx84ndNBZB/Dkv8V8fZEpk83RLgXtYd0fwUgEjseMn1Dgajh2x6S8QbZAFa9p2qVCEuYZNgve0dQ==