From e9880693adf0ece680ec939b76ae78ed22ec148f Mon Sep 17 00:00:00 2001 From: Ashwin Phatak Date: Fri, 25 Jun 2021 17:56:58 +0530 Subject: [PATCH] Fill task queue for block range. (#90) --- packages/address-watcher/package.json | 1 + packages/address-watcher/src/fill.ts | 96 +++++++++++++++++++++ packages/address-watcher/src/job-queue.ts | 3 +- packages/address-watcher/src/tx-watcher.ts | 5 +- packages/ipld-eth-client/src/eth-queries.ts | 1 + 5 files changed, 103 insertions(+), 3 deletions(-) create mode 100644 packages/address-watcher/src/fill.ts diff --git a/packages/address-watcher/package.json b/packages/address-watcher/package.json index 8310f61c..8aea95eb 100644 --- a/packages/address-watcher/package.json +++ b/packages/address-watcher/package.json @@ -6,6 +6,7 @@ "scripts": { "server": "DEBUG=vulcanize:* nodemon src/server.ts -f environments/local.toml", "job-runner": "DEBUG=vulcanize:* nodemon src/job-runner.ts -f environments/local.toml", + "fill": "DEBUG=vulcanize:* ts-node src/fill.ts -f environments/local.toml", "test": "mocha -r ts-node/register src/**/*.spec.ts", "lint": "eslint .", "build": "tsc" diff --git a/packages/address-watcher/src/fill.ts b/packages/address-watcher/src/fill.ts new file mode 100644 index 00000000..d1e9889b --- /dev/null +++ b/packages/address-watcher/src/fill.ts @@ -0,0 +1,96 @@ +import assert from 'assert'; +import 'reflect-metadata'; +import yargs from 'yargs'; +import { hideBin } from 'yargs/helpers'; +import debug from 'debug'; + +import { getCache } from '@vulcanize/cache'; +import { EthClient } from '@vulcanize/ipld-eth-client'; + +import { Database } from './database'; +import { getConfig } from './config'; +import { JobQueue } from './job-queue'; +import { QUEUE_TX_TRACING } from './tx-watcher'; + +const log = debug('vulcanize:server'); + +export const main = async (): Promise => { + const argv = await yargs(hideBin(process.argv)).parserConfiguration({ + 'parse-numbers': false + }).options({ + configFile: { + alias: 'f', + type: 'string', + require: true, + demandOption: true, + describe: 'configuration file path (toml)' + }, + startBlock: { + type: 'number', + require: true, + demandOption: true, + describe: 'Block number to start processing at' + }, + endBlock: { + type: 'number', + require: true, + demandOption: true, + describe: 'Block number to stop processing at' + } + }).argv; + + const config = await getConfig(argv.configFile); + + assert(config.server, 'Missing server config'); + + const { upstream, database: dbConfig, jobQueue: jobQueueConfig } = config; + + assert(dbConfig, 'Missing database config'); + + const db = new Database(dbConfig); + await db.init(); + + assert(upstream, 'Missing upstream config'); + const { gqlEndpoint, gqlSubscriptionEndpoint, traceProviderEndpoint, cache: cacheConfig } = upstream; + assert(gqlEndpoint, 'Missing upstream gqlEndpoint'); + assert(gqlSubscriptionEndpoint, 'Missing upstream gqlSubscriptionEndpoint'); + assert(traceProviderEndpoint, 'Missing upstream traceProviderEndpoint'); + + const cache = await getCache(cacheConfig); + + const ethClient = new EthClient({ gqlEndpoint, gqlSubscriptionEndpoint, cache }); + + assert(jobQueueConfig, 'Missing job queue config'); + + const { dbConnectionString, maxCompletionLag } = jobQueueConfig; + assert(dbConnectionString, 'Missing job queue db connection string'); + + const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag }); + await jobQueue.start(); + + for (let blockNumber = argv.startBlock; blockNumber <= argv.endBlock; blockNumber++) { + log(`Fill block ${blockNumber}`); + + // TODO: Add pause between requests so as to not overwhelm the upsteam server. + const result = await ethClient.getBlockWithTransactions(blockNumber.toString()); + const { allEthHeaderCids: { nodes: blockNodes } } = result; + for (let bi = 0; bi < blockNodes.length; bi++) { + const { blockHash, ethTransactionCidsByHeaderId: { nodes: txNodes } } = blockNodes[bi]; + for (let ti = 0; ti < txNodes.length; ti++) { + const { txHash } = txNodes[ti]; + log(`Filling block number ${blockNumber}, block hash ${blockHash}, tx hash ${txHash}`); + + // Never push appearances from fill jobs to GQL subscribers, as this command can be run multiple times + // for the same block range, and/or process the same block in multiple different runs spread over a + // period of time. Also, the tx's are probably too old anyway for publishing. + await jobQueue.pushJob(QUEUE_TX_TRACING, { txHash, publish: false }); + } + } + } +}; + +main().then(() => { + process.exit(); +}).catch(err => { + log(err); +}); diff --git a/packages/address-watcher/src/job-queue.ts b/packages/address-watcher/src/job-queue.ts index 9f2a6f15..bcaa436f 100644 --- a/packages/address-watcher/src/job-queue.ts +++ b/packages/address-watcher/src/job-queue.ts @@ -38,7 +38,8 @@ export class JobQueue { async onComplete (queue: string, callback: JobCallback): Promise { return await this._boss.onComplete(queue, async (job: any) => { - log(`Job complete for queue ${queue} job ${job.id}...`); + const { id, data: { failed, createdOn } } = job; + log(`Job onComplete for queue ${queue} job ${id} created ${createdOn} success ${!failed}`); await callback(job); }); } diff --git a/packages/address-watcher/src/tx-watcher.ts b/packages/address-watcher/src/tx-watcher.ts index 7f34fcf2..28686044 100644 --- a/packages/address-watcher/src/tx-watcher.ts +++ b/packages/address-watcher/src/tx-watcher.ts @@ -39,7 +39,8 @@ export class TxWatcher { this._jobQueue.onComplete(QUEUE_TX_TRACING, async (job) => { const { data: { request, failed, state, createdOn } } = job; const timeElapsedInSeconds = (Date.now() - Date.parse(createdOn)) / 1000; - if (!failed && state === 'completed') { + log(`Job onComplete tx ${request.data.txHash} publish ${!!request.data.publish}`); + if (!failed && state === 'completed' && request.data.publish) { // Check for max acceptable lag time between tracing request and sending results to live subscribers. if (timeElapsedInSeconds <= this._jobQueue.maxCompletionLag) { return await this.publishAddressEventToSubscribers(request.data.txHash, timeElapsedInSeconds); @@ -52,7 +53,7 @@ export class TxWatcher { this._watchTxSubscription = await this._ethClient.watchTransactions(async (value) => { const { txHash, ethHeaderCidByHeaderId: { blockHash, blockNumber } } = _.get(value, 'data.listen.relatedNode'); log('watchTransaction', JSON.stringify({ txHash, blockHash, blockNumber }, null, 2)); - this._jobQueue.pushJob(QUEUE_TX_TRACING, { txHash }); + await this._jobQueue.pushJob(QUEUE_TX_TRACING, { txHash, publish: true }); }); } diff --git a/packages/ipld-eth-client/src/eth-queries.ts b/packages/ipld-eth-client/src/eth-queries.ts index 826bd86c..2ef471c3 100644 --- a/packages/ipld-eth-client/src/eth-queries.ts +++ b/packages/ipld-eth-client/src/eth-queries.ts @@ -31,6 +31,7 @@ query allEthHeaderCids($blockNumber: BigInt) { cid blockNumber blockHash + timestamp ethTransactionCidsByHeaderId { nodes { cid