From a654b79df329552a2961b69b8af141842f953155 Mon Sep 17 00:00:00 2001
From: Ashwin Phatak
Date: Mon, 28 Jun 2021 17:12:53 +0530
Subject: [PATCH] Address watcher block filler (#105)

* Address watcher block filler.

* Update docs.
---
 README.md                                    |  2 +
 packages/address-watcher/README.md           | 43 ++++++++++++++++++-
 packages/address-watcher/src/database.ts     | 36 ++++++++++++++++
 .../src/entity/BlockProgress.ts              | 20 +++++++++
 packages/address-watcher/src/fill.ts         | 21 ++++++---
 packages/address-watcher/src/indexer.ts      | 13 +++++-
 packages/address-watcher/src/resolvers.ts    |  4 ++
 packages/address-watcher/src/schema.ts       | 11 +++++
 packages/address-watcher/src/tx-watcher.ts   | 30 ++++++++++++-
 9 files changed, 169 insertions(+), 11 deletions(-)
 create mode 100644 packages/address-watcher/src/entity/BlockProgress.ts

diff --git a/README.md b/README.md
index a689ade0..d5bbbdb3 100644
--- a/README.md
+++ b/README.md
@@ -21,6 +21,8 @@ sudo su - postgres
 createdb erc20-watcher
 ```
 
+Update the `upstream` config in `environments/local.toml` and provide the `ipld-eth-server` GQL API and the `indexer-db` postgraphile endpoints.
+
 Run the watcher:
 
 ```bash
diff --git a/packages/address-watcher/README.md b/packages/address-watcher/README.md
index 5f0ddf4d..fff2b4a6 100644
--- a/packages/address-watcher/README.md
+++ b/packages/address-watcher/README.md
@@ -2,7 +2,14 @@
 
 ## Setup
 
-Enable the `pgcrypto` extension on the database (https://github.com/timgit/pg-boss/blob/master/docs/usage.md#intro).
+Create a postgres12 database for the job queue:
+
+```
+sudo su - postgres
+createdb job-queue
+```
+
+Enable the `pgcrypto` extension on the job queue database (https://github.com/timgit/pg-boss/blob/master/docs/usage.md#intro).
 
 Example:
 
@@ -17,3 +24,37 @@ job-queue=# CREATE EXTENSION pgcrypto;
 CREATE EXTENSION
 job-queue=# exit
 ```
+
+Create a postgres12 database for the address watcher:
+
+```
+sudo su - postgres
+createdb address-watcher
+```
+
+Update `environments/local.toml` with database connection settings for both databases.
+
+Update the `upstream` config in `environments/local.toml` and provide the `ipld-eth-server` GQL API, the `indexer-db` postgraphile and the tracing API (`debug_traceTransaction` RPC provider) endpoints.
+
+## Run
+
+Run the following scripts in different terminals.
+
+
+GQL server:
+
+```
+yarn server
+```
+
+Job runner for processing the tracing requests queue:
+
+```
+yarn job-runner
+```
+
+To fill a block range:
+
+```
+yarn fill --startBlock 1 --endBlock 1000
+```
diff --git a/packages/address-watcher/src/database.ts b/packages/address-watcher/src/database.ts
index e12fac72..7e453dfe 100644
--- a/packages/address-watcher/src/database.ts
+++ b/packages/address-watcher/src/database.ts
@@ -3,6 +3,7 @@ import { Connection, ConnectionOptions, createConnection, DeepPartial } from 'ty
 import { SnakeNamingStrategy } from 'typeorm-naming-strategies';
 
 import { Account } from './entity/Account';
+import { BlockProgress } from './entity/BlockProgress';
 import { Trace } from './entity/Trace';
 
 export class Database {
@@ -93,4 +94,39 @@ export class Database {
       .orderBy({ block_number: 'ASC' })
       .getMany();
   }
+
+  async getBlockProgress (blockHash: string): Promise<BlockProgress | undefined> {
+    const repo = this._conn.getRepository(BlockProgress);
+    return repo.findOne({ where: { blockHash } });
+  }
+
+  async initBlockProgress (blockHash: string, blockNumber: number, numTx: number): Promise<void> {
+    await this._conn.transaction(async (tx) => {
+      const repo = tx.getRepository(BlockProgress);
+
+      const numRows = await repo
+        .createQueryBuilder()
+        .where('block_hash = :blockHash', { blockHash })
+        .getCount();
+
+      if (numRows === 0) {
+        const entity = repo.create({ blockHash, blockNumber, numTx, numTracedTx: 0, isComplete: (numTx === 0) });
+        await repo.save(entity);
+      }
+    });
+  }
+
+  async updateBlockProgress (blockHash: string): Promise<void> {
+    await this._conn.transaction(async (tx) => {
+      const repo = tx.getRepository(BlockProgress);
+      const entity = await repo.findOne({ where: { blockHash } });
+      if (entity && !entity.isComplete) {
+        entity.numTracedTx++;
+        if (entity.numTracedTx >= entity.numTx) {
+          entity.isComplete = true;
+        }
+        await repo.save(entity);
+      }
+    });
+  }
 }
diff --git a/packages/address-watcher/src/entity/BlockProgress.ts b/packages/address-watcher/src/entity/BlockProgress.ts
new file mode 100644
index 00000000..45bf040f
--- /dev/null
+++ b/packages/address-watcher/src/entity/BlockProgress.ts
@@ -0,0 +1,20 @@
+import { Entity, PrimaryColumn, Column, Index } from 'typeorm';
+
+@Entity()
+@Index(['blockNumber'])
+export class BlockProgress {
+  @PrimaryColumn('varchar', { length: 66 })
+  blockHash!: string;
+
+  @Column('numeric')
+  blockNumber!: number;
+
+  @Column('numeric')
+  numTx!: number;
+
+  @Column('numeric')
+  numTracedTx!: number;
+
+  @Column('boolean')
+  isComplete!: boolean
+}
diff --git a/packages/address-watcher/src/fill.ts b/packages/address-watcher/src/fill.ts
index d1e9889b..f51aca48 100644
--- a/packages/address-watcher/src/fill.ts
+++ b/packages/address-watcher/src/fill.ts
@@ -76,14 +76,21 @@ export const main = async (): Promise<void> => {
     const { allEthHeaderCids: { nodes: blockNodes } } = result;
     for (let bi = 0; bi < blockNodes.length; bi++) {
       const { blockHash, ethTransactionCidsByHeaderId: { nodes: txNodes } } = blockNodes[bi];
-      for (let ti = 0; ti < txNodes.length; ti++) {
-        const { txHash } = txNodes[ti];
-        log(`Filling block number ${blockNumber}, block hash ${blockHash}, tx hash ${txHash}`);
+      const blockProgress = await db.getBlockProgress(blockHash);
+      if (blockProgress) {
+        log(`Block number ${blockNumber}, block hash ${blockHash} already known, skip filling`);
+      } else {
+        await db.initBlockProgress(blockHash, blockNumber, txNodes.length);
 
-        // Never push appearances from fill jobs to GQL subscribers, as this command can be run multiple times
-        // for the same block range, and/or process the same block in multiple different runs spread over a
-        // period of time. Also, the tx's are probably too old anyway for publishing.
-        await jobQueue.pushJob(QUEUE_TX_TRACING, { txHash, publish: false });
+        for (let ti = 0; ti < txNodes.length; ti++) {
+          const { txHash } = txNodes[ti];
+          log(`Filling block number ${blockNumber}, block hash ${blockHash}, tx hash ${txHash}`);
+
+          // Never push appearances from fill jobs to GQL subscribers, as this command can be run multiple times
+          // for the same block range, and/or process the same block in multiple different runs spread over a
+          // period of time. Also, the tx's are probably too old anyway for publishing.
+          await jobQueue.pushJob(QUEUE_TX_TRACING, { txHash, blockHash, publish: false, publishBlockProgress: true });
+        }
       }
     }
   }
diff --git a/packages/address-watcher/src/indexer.ts b/packages/address-watcher/src/indexer.ts
index 61347420..92b62099 100644
--- a/packages/address-watcher/src/indexer.ts
+++ b/packages/address-watcher/src/indexer.ts
@@ -10,6 +10,7 @@ import { addressesInTrace } from './util';
 import { Database } from './database';
 import { Trace } from './entity/Trace';
 import { Account } from './entity/Account';
+import { BlockProgress } from './entity/BlockProgress';
 
 const log = debug('vulcanize:indexer');
 
@@ -50,9 +51,9 @@ export class Indexer {
   async traceTxAndIndexAppearances (txHash: string): Promise<Trace> {
     let entity = await this._db.getTrace(txHash);
     if (entity) {
-      log('traceTx: db hit');
+      log(`traceTx: db hit ${txHash}`);
     } else {
-      log('traceTx: db miss, fetching from tracing API server');
+      log(`traceTx: db miss, fetching from tracing API server ${txHash}`);
 
       const tx = await this._tracingClient.getTx(txHash);
       const trace = await this._tracingClient.getTxTrace(txHash, 'callTraceWithAddresses', '15s');
@@ -77,6 +78,14 @@ export class Indexer {
     return this._db.getAppearances(address, fromBlockNumber, toBlockNumber);
   }
 
+  async getBlockProgress (blockHash: string): Promise<BlockProgress | undefined> {
+    return this._db.getBlockProgress(blockHash);
+  }
+
+  async updateBlockProgress (blockHash: string): Promise<void> {
+    return this._db.updateBlockProgress(blockHash);
+  }
+
   async _indexAppearances (trace: Trace): Promise<void> {
     const traceObj = JSON.parse(trace.trace);
 
diff --git a/packages/address-watcher/src/resolvers.ts b/packages/address-watcher/src/resolvers.ts
index 2f2ff59f..db13cbfb 100644
--- a/packages/address-watcher/src/resolvers.ts
+++ b/packages/address-watcher/src/resolvers.ts
@@ -28,6 +28,10 @@ export const createResolvers = async (indexer: Indexer, txWatcher: TxWatcher): P
           return payload.onAddressEvent.address === ethers.utils.getAddress(variables.address);
         }
       )
+    },
+
+    onBlockProgressEvent: {
+      subscribe: () => txWatcher.getBlockProgressEventIterator()
     }
   },
 
diff --git a/packages/address-watcher/src/schema.ts b/packages/address-watcher/src/schema.ts
index 9c573615..00568f05 100644
--- a/packages/address-watcher/src/schema.ts
+++ b/packages/address-watcher/src/schema.ts
@@ -16,6 +16,14 @@ type WatchedAddressEvent {
   txTrace: TxTrace!
 }
 
+type BlockProgressEvent {
+  blockNumber: Int!
+  blockHash: String!
+  numTx: Int!
+  numTracedTx: Int!
+  isComplete: Boolean!
+}
+
 #
 # Queries
 #
@@ -48,6 +56,9 @@ type Subscription {
 
   # Watch for address events (at head of chain).
   onAddressEvent(address: String!): WatchedAddressEvent!
+
+  # Watch for block progress events from filler process.
+  onBlockProgressEvent: BlockProgressEvent!
 }
 
 #
diff --git a/packages/address-watcher/src/tx-watcher.ts b/packages/address-watcher/src/tx-watcher.ts
index 28686044..5691d6df 100644
--- a/packages/address-watcher/src/tx-watcher.ts
+++ b/packages/address-watcher/src/tx-watcher.ts
@@ -7,10 +7,12 @@ import { EthClient } from '@vulcanize/ipld-eth-client';
 
 import { Indexer } from './indexer';
 import { JobQueue } from './job-queue';
+import { BlockProgress } from './entity/BlockProgress';
 
 const log = debug('vulcanize:tx-watcher');
 
 export const AddressEvent = 'address-event';
+export const BlockProgressEvent = 'block-progress-event';
 export const QUEUE_TX_TRACING = 'tx-tracing';
 
 export class TxWatcher {
@@ -31,6 +33,10 @@ export class TxWatcher {
     return this._pubsub.asyncIterator([AddressEvent]);
   }
 
+  getBlockProgressEventIterator (): AsyncIterator<any> {
+    return this._pubsub.asyncIterator([BlockProgressEvent]);
+  }
+
   async start (): Promise<void> {
     assert(!this._watchTxSubscription, 'subscription already started');
 
@@ -38,6 +44,13 @@ export class TxWatcher {
 
     this._jobQueue.onComplete(QUEUE_TX_TRACING, async (job) => {
       const { data: { request, failed, state, createdOn } } = job;
+
+      await this._indexer.updateBlockProgress(request.data.blockHash);
+      const blockProgress = await this._indexer.getBlockProgress(request.data.blockHash);
+      if (blockProgress && request.data.publishBlockProgress) {
+        await this.publishBlockProgressToSubscribers(blockProgress);
+      }
+
       const timeElapsedInSeconds = (Date.now() - Date.parse(createdOn)) / 1000;
       log(`Job onComplete tx ${request.data.txHash} publish ${!!request.data.publish}`);
       if (!failed && state === 'completed' && request.data.publish) {
@@ -53,7 +66,7 @@ export class TxWatcher {
     this._watchTxSubscription = await this._ethClient.watchTransactions(async (value) => {
       const { txHash, ethHeaderCidByHeaderId: { blockHash, blockNumber } } = _.get(value, 'data.listen.relatedNode');
       log('watchTransaction', JSON.stringify({ txHash, blockHash, blockNumber }, null, 2));
-      await this._jobQueue.pushJob(QUEUE_TX_TRACING, { txHash, publish: true });
+      await this._jobQueue.pushJob(QUEUE_TX_TRACING, { txHash, blockHash, publish: true });
     });
   }
 
@@ -85,6 +98,21 @@ export class TxWatcher {
     }
   }
 
+  async publishBlockProgressToSubscribers (blockProgress: BlockProgress): Promise<void> {
+    const { blockHash, blockNumber, numTx, numTracedTx, isComplete } = blockProgress;
+
+    // Publishing the event here will result in pushing the payload to GQL subscribers for `onBlockProgressEvent`.
+    await this._pubsub.publish(BlockProgressEvent, {
+      onBlockProgressEvent: {
+        blockHash,
+        blockNumber,
+        numTx,
+        numTracedTx,
+        isComplete
+      }
+    });
+  }
+
   async stop (): Promise<void> {
     if (this._watchTxSubscription) {
       log('Stopped watching upstream tx');
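
For context, the fill progress published by this patch can be observed through the new `onBlockProgressEvent` GQL subscription. The sketch below is not part of the patch: it assumes the watcher's GQL server accepts WebSocket subscriptions speaking the `graphql-ws` protocol, and the endpoint URL/port are placeholders; if the server uses a different sub-protocol (e.g. subscriptions-transport-ws), swap the client library accordingly.

```ts
// Minimal progress monitor sketch, assuming a graphql-ws compatible endpoint.
import { createClient } from 'graphql-ws';
import WebSocket from 'ws';

const client = createClient({
  url: 'ws://localhost:3001/graphql', // placeholder endpoint, not from the patch
  webSocketImpl: WebSocket
});

const query = `
  subscription {
    onBlockProgressEvent {
      blockHash
      blockNumber
      numTx
      numTracedTx
      isComplete
    }
  }
`;

// Log fill progress as tx-tracing jobs complete; call dispose() to close the socket.
const dispose = client.subscribe(
  { query },
  {
    next: (result: any) => {
      const e = result.data?.onBlockProgressEvent;
      console.log(`block ${e.blockNumber}: ${e.numTracedTx}/${e.numTx} traced, complete=${e.isComplete}`);
    },
    error: (err: unknown) => console.error('subscription error', err),
    complete: () => console.log('subscription closed')
  }
);
```

Note that, as the patch stands, only jobs pushed by `yarn fill` set `publishBlockProgress: true`; jobs queued by the live tx watcher do not, so these events are primarily emitted while a fill run is in progress.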