From 4bfb007a7e4eeedfc6f14e6662fccff9d81bf9c4 Mon Sep 17 00:00:00 2001 From: prathamesh0 <42446521+prathamesh0@users.noreply.github.com> Date: Wed, 23 Nov 2022 06:50:16 -0600 Subject: [PATCH] Refactor job-runner CLI to cli package (#255) * Fix eden-watcher server initialization * Add an indexer method to watch subgraph contracts * Refactor job-runner CLI to cli package * Move watcher reset commands to refactored code --- packages/cli/src/base.ts | 4 +- packages/cli/src/index.ts | 1 + packages/cli/src/job-runner.ts | 152 ++++++++++++++++++ packages/cli/src/server.ts | 3 - .../src/templates/indexer-template.handlebars | 7 + .../templates/job-runner-template.handlebars | 3 +- packages/eden-watcher/environments/local.toml | 2 +- packages/eden-watcher/src/indexer.ts | 5 + packages/eden-watcher/src/job-runner.ts | 120 ++------------ packages/eden-watcher/src/server.ts | 2 +- packages/erc20-watcher/src/job-runner.ts | 88 +--------- packages/erc721-watcher/src/job-runner.ts | 106 ++---------- packages/graph-test-watcher/src/indexer.ts | 5 + packages/graph-test-watcher/src/job-runner.ts | 120 ++------------ packages/graph-test-watcher/src/server.ts | 2 +- packages/mobymask-watcher/src/job-runner.ts | 106 ++---------- packages/util/src/types.ts | 1 + 17 files changed, 222 insertions(+), 505 deletions(-) create mode 100644 packages/cli/src/job-runner.ts diff --git a/packages/cli/src/base.ts b/packages/cli/src/base.ts index 2e18bb57..78349dd4 100644 --- a/packages/cli/src/base.ts +++ b/packages/cli/src/base.ts @@ -30,8 +30,8 @@ export class BaseCmd { _jobQueue?: JobQueue _database?: DatabaseInterface; _indexer?: IndexerInterface; - _graphDb?: GraphDatabase - _eventWatcher?: EventWatcherInterface + _graphDb?: GraphDatabase; + _eventWatcher?: EventWatcherInterface; get config (): Config | undefined { return this._config; diff --git a/packages/cli/src/index.ts b/packages/cli/src/index.ts index 0ac33f18..5255efd4 100644 --- a/packages/cli/src/index.ts +++ b/packages/cli/src/index.ts @@ -11,3 +11,4 @@ export * from './inspect-cid'; export * from './import-state'; export * from './export-state'; export * from './server'; +export * from './job-runner'; diff --git a/packages/cli/src/job-runner.ts b/packages/cli/src/job-runner.ts new file mode 100644 index 00000000..b03a94a3 --- /dev/null +++ b/packages/cli/src/job-runner.ts @@ -0,0 +1,152 @@ +// +// Copyright 2022 Vulcanize, Inc. +// + +import yargs from 'yargs'; +import { hideBin } from 'yargs/helpers'; +import 'reflect-metadata'; +import assert from 'assert'; +import { ConnectionOptions } from 'typeorm'; + +import { JsonRpcProvider } from '@ethersproject/providers'; +import { GraphWatcher } from '@cerc-io/graph-node'; +import { + DEFAULT_CONFIG_PATH, + JobQueue, + DatabaseInterface, + IndexerInterface, + ServerConfig, + Clients, + JobRunner as BaseJobRunner, + JobQueueConfig, + QUEUE_BLOCK_PROCESSING, + QUEUE_EVENT_PROCESSING, + QUEUE_BLOCK_CHECKPOINT, + QUEUE_HOOKS, + startMetricsServer +} from '@cerc-io/util'; + +import { BaseCmd } from './base'; + +interface Arguments { + configFile: string; +} + +export class JobRunnerCmd { + _argv?: Arguments + _baseCmd: BaseCmd; + + constructor () { + this._baseCmd = new BaseCmd(); + } + + get jobQueue (): JobQueue | undefined { + return this._baseCmd.jobQueue; + } + + get indexer (): IndexerInterface | undefined { + return this._baseCmd.indexer; + } + + async initConfig (): Promise { + this._argv = this._getArgv(); + assert(this._argv); + + return this._baseCmd.initConfig(this._argv.configFile); + } + + async init ( + Database: new (config: ConnectionOptions, + serverConfig?: ServerConfig + ) => DatabaseInterface, + Indexer: new ( + serverConfig: ServerConfig, + db: DatabaseInterface, + clients: Clients, + ethProvider: JsonRpcProvider, + jobQueue: JobQueue, + graphWatcher?: GraphWatcher + ) => IndexerInterface, + clients: { [key: string]: any } = {}, + entityQueryTypeMap?: Map, + entityToLatestEntityMap?: Map + ): Promise { + await this.initConfig(); + + await this._baseCmd.init(Database, Indexer, clients, entityQueryTypeMap, entityToLatestEntityMap); + } + + async exec (startJobRunner: (jobRunner: JobRunner) => Promise): Promise { + const config = this._baseCmd.config; + const jobQueue = this._baseCmd.jobQueue; + const indexer = this._baseCmd.indexer; + + assert(config); + assert(jobQueue); + assert(indexer); + + if (indexer.addContracts) { + await indexer.addContracts(); + } + + const jobRunner = new JobRunner(config.jobQueue, indexer, jobQueue); + + await jobRunner.jobQueue.deleteAllJobs(); + await jobRunner.baseJobRunner.resetToPrevIndexedBlock(); + + await startJobRunner(jobRunner); + jobRunner.baseJobRunner.handleShutdown(); + + await startMetricsServer(config, indexer); + } + + _getArgv (): any { + return yargs(hideBin(process.argv)) + .option('f', { + alias: 'config-file', + demandOption: true, + describe: 'configuration file path (toml)', + type: 'string', + default: DEFAULT_CONFIG_PATH + }) + .argv; + } +} + +export class JobRunner { + jobQueue: JobQueue + baseJobRunner: BaseJobRunner + _indexer: IndexerInterface + _jobQueueConfig: JobQueueConfig + + constructor (jobQueueConfig: JobQueueConfig, indexer: IndexerInterface, jobQueue: JobQueue) { + this._jobQueueConfig = jobQueueConfig; + this._indexer = indexer; + this.jobQueue = jobQueue; + this.baseJobRunner = new BaseJobRunner(this._jobQueueConfig, this._indexer, this.jobQueue); + } + + async subscribeBlockProcessingQueue (): Promise { + await this.jobQueue.subscribe(QUEUE_BLOCK_PROCESSING, async (job) => { + await this.baseJobRunner.processBlock(job); + }); + } + + async subscribeEventProcessingQueue (): Promise { + await this.jobQueue.subscribe(QUEUE_EVENT_PROCESSING, async (job) => { + await this.baseJobRunner.processEvent(job); + }); + } + + async subscribeHooksQueue (): Promise { + await this.jobQueue.subscribe(QUEUE_HOOKS, async (job) => { + await this.baseJobRunner.processHooks(job); + }); + } + + async subscribeBlockCheckpointQueue (): Promise { + await this.jobQueue.subscribe(QUEUE_BLOCK_CHECKPOINT, async (job) => { + await this.baseJobRunner.processCheckpoint(job); + }); + } +} diff --git a/packages/cli/src/server.ts b/packages/cli/src/server.ts index 337641a2..be9b4741 100644 --- a/packages/cli/src/server.ts +++ b/packages/cli/src/server.ts @@ -5,8 +5,6 @@ import yargs from 'yargs'; import { hideBin } from 'yargs/helpers'; import 'reflect-metadata'; -import fs from 'fs'; -import path from 'path'; import assert from 'assert'; import { ConnectionOptions } from 'typeorm'; import { PubSub } from 'graphql-subscriptions'; @@ -34,7 +32,6 @@ import { BaseCmd } from './base'; interface Arguments { configFile: string; - importFile: string; } export class ServerCmd { diff --git a/packages/codegen/src/templates/indexer-template.handlebars b/packages/codegen/src/templates/indexer-template.handlebars index bc73984d..0f084222 100644 --- a/packages/codegen/src/templates/indexer-template.handlebars +++ b/packages/codegen/src/templates/indexer-template.handlebars @@ -491,6 +491,13 @@ export class Indexer implements IndexerInterface { return this._baseIndexer.getLatestStateIndexedBlock(); } + {{#if (subgraphPath)}} + async addContracts (): Promise { + // Watching all the contracts in the subgraph. + await this._graphWatcher.addContracts(); + } + + {{/if}} async watchContract (address: string, kind: string, checkpoint: boolean, startingBlock: number): Promise { return this._baseIndexer.watchContract(address, kind, checkpoint, startingBlock); } diff --git a/packages/codegen/src/templates/job-runner-template.handlebars b/packages/codegen/src/templates/job-runner-template.handlebars index 2c118390..4c82dfc0 100644 --- a/packages/codegen/src/templates/job-runner-template.handlebars +++ b/packages/codegen/src/templates/job-runner-template.handlebars @@ -119,8 +119,7 @@ export const main = async (): Promise => { graphWatcher.setIndexer(indexer); await graphWatcher.init(); - // Watching all the contracts in the subgraph. - await graphWatcher.addContracts(); + await indexer.addContracts(); {{/if}} const jobRunner = new JobRunner(jobQueueConfig, indexer, jobQueue); diff --git a/packages/eden-watcher/environments/local.toml b/packages/eden-watcher/environments/local.toml index 79f3899b..e16fc026 100644 --- a/packages/eden-watcher/environments/local.toml +++ b/packages/eden-watcher/environments/local.toml @@ -72,4 +72,4 @@ eventsInBatch = 50 blockDelayInMilliSecs = 2000 prefetchBlocksInMem = true - prefetchBlockCount = 10 \ No newline at end of file + prefetchBlockCount = 10 diff --git a/packages/eden-watcher/src/indexer.ts b/packages/eden-watcher/src/indexer.ts index ab981791..c90a3b2b 100644 --- a/packages/eden-watcher/src/indexer.ts +++ b/packages/eden-watcher/src/indexer.ts @@ -400,6 +400,11 @@ export class Indexer implements IndexerInterface { return this._baseIndexer.getLatestStateIndexedBlock(); } + async addContracts (): Promise { + // Watching all the contracts in the subgraph. + await this._graphWatcher.addContracts(); + } + async watchContract (address: string, kind: string, checkpoint: boolean, startingBlock: number): Promise { return this._baseIndexer.watchContract(address, kind, checkpoint, startingBlock); } diff --git a/packages/eden-watcher/src/job-runner.ts b/packages/eden-watcher/src/job-runner.ts index 52fccec6..d8d8794f 100644 --- a/packages/eden-watcher/src/job-runner.ts +++ b/packages/eden-watcher/src/job-runner.ts @@ -2,125 +2,25 @@ // Copyright 2021 Vulcanize, Inc. // -import assert from 'assert'; -import 'reflect-metadata'; -import yargs from 'yargs'; -import { hideBin } from 'yargs/helpers'; import debug from 'debug'; -import { - getConfig, - Config, - JobQueue, - JobRunner as BaseJobRunner, - QUEUE_BLOCK_PROCESSING, - QUEUE_EVENT_PROCESSING, - QUEUE_BLOCK_CHECKPOINT, - QUEUE_HOOKS, - JobQueueConfig, - DEFAULT_CONFIG_PATH, - initClients, - startMetricsServer -} from '@cerc-io/util'; -import { GraphWatcher, Database as GraphDatabase } from '@cerc-io/graph-node'; +import { JobRunner, JobRunnerCmd } from '@cerc-io/cli'; import { Indexer } from './indexer'; -import { Database, ENTITY_TO_LATEST_ENTITY_MAP } from './database'; +import { Database, ENTITY_QUERY_TYPE_MAP, ENTITY_TO_LATEST_ENTITY_MAP } from './database'; const log = debug('vulcanize:job-runner'); -export class JobRunner { - _indexer: Indexer - _jobQueue: JobQueue - _baseJobRunner: BaseJobRunner - _jobQueueConfig: JobQueueConfig - - constructor (jobQueueConfig: JobQueueConfig, indexer: Indexer, jobQueue: JobQueue) { - this._jobQueueConfig = jobQueueConfig; - this._indexer = indexer; - this._jobQueue = jobQueue; - this._baseJobRunner = new BaseJobRunner(this._jobQueueConfig, this._indexer, this._jobQueue); - } - - async start (): Promise { - await this._jobQueue.deleteAllJobs(); - await this._baseJobRunner.resetToPrevIndexedBlock(); - await this.subscribeBlockProcessingQueue(); - await this.subscribeEventProcessingQueue(); - await this.subscribeBlockCheckpointQueue(); - await this.subscribeHooksQueue(); - this._baseJobRunner.handleShutdown(); - } - - async subscribeBlockProcessingQueue (): Promise { - await this._jobQueue.subscribe(QUEUE_BLOCK_PROCESSING, async (job) => { - await this._baseJobRunner.processBlock(job); - }); - } - - async subscribeEventProcessingQueue (): Promise { - await this._jobQueue.subscribe(QUEUE_EVENT_PROCESSING, async (job) => { - await this._baseJobRunner.processEvent(job); - }); - } - - async subscribeHooksQueue (): Promise { - await this._jobQueue.subscribe(QUEUE_HOOKS, async (job) => { - await this._baseJobRunner.processHooks(job); - }); - } - - async subscribeBlockCheckpointQueue (): Promise { - await this._jobQueue.subscribe(QUEUE_BLOCK_CHECKPOINT, async (job) => { - await this._baseJobRunner.processCheckpoint(job); - }); - } -} - export const main = async (): Promise => { - const argv = await yargs(hideBin(process.argv)) - .option('f', { - alias: 'config-file', - demandOption: true, - describe: 'configuration file path (toml)', - type: 'string', - default: DEFAULT_CONFIG_PATH - }) - .argv; + const jobRunnerCmd = new JobRunnerCmd(); + await jobRunnerCmd.init(Database, Indexer, {}, ENTITY_QUERY_TYPE_MAP, ENTITY_TO_LATEST_ENTITY_MAP); - const config: Config = await getConfig(argv.f); - const { ethClient, ethProvider } = await initClients(config); - - const db = new Database(config.database); - await db.init(); - - const graphDb = new GraphDatabase(config.server, db.baseDatabase, ENTITY_TO_LATEST_ENTITY_MAP); - await graphDb.init(); - - const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server); - - const jobQueueConfig = config.jobQueue; - assert(jobQueueConfig, 'Missing job queue config'); - - const { dbConnectionString, maxCompletionLagInSecs } = jobQueueConfig; - assert(dbConnectionString, 'Missing job queue db connection string'); - - const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs }); - await jobQueue.start(); - - const indexer = new Indexer(config.server, db, { ethClient }, ethProvider, jobQueue, graphWatcher); - await indexer.init(); - - graphWatcher.setIndexer(indexer); - await graphWatcher.init(); - - // Watching all the contracts in the subgraph. - await graphWatcher.addContracts(); - - const jobRunner = new JobRunner(jobQueueConfig, indexer, jobQueue); - await jobRunner.start(); - - startMetricsServer(config, indexer); + await jobRunnerCmd.exec(async (jobRunner: JobRunner): Promise => { + await jobRunner.subscribeBlockProcessingQueue(); + await jobRunner.subscribeEventProcessingQueue(); + await jobRunner.subscribeBlockCheckpointQueue(); + await jobRunner.subscribeHooksQueue(); + }); }; main().then(() => { diff --git a/packages/eden-watcher/src/server.ts b/packages/eden-watcher/src/server.ts index e2f8cca2..45810884 100644 --- a/packages/eden-watcher/src/server.ts +++ b/packages/eden-watcher/src/server.ts @@ -19,7 +19,7 @@ const log = debug('vulcanize:server'); export const main = async (): Promise => { const serverCmd = new ServerCmd(); - await serverCmd.init(Database, Indexer, EventWatcher, ENTITY_QUERY_TYPE_MAP, ENTITY_TO_LATEST_ENTITY_MAP); + await serverCmd.init(Database, Indexer, EventWatcher, {}, ENTITY_QUERY_TYPE_MAP, ENTITY_TO_LATEST_ENTITY_MAP); const typeDefs = fs.readFileSync(path.join(__dirname, 'schema.gql')).toString(); diff --git a/packages/erc20-watcher/src/job-runner.ts b/packages/erc20-watcher/src/job-runner.ts index 563ffd2e..4eebcb42 100644 --- a/packages/erc20-watcher/src/job-runner.ts +++ b/packages/erc20-watcher/src/job-runner.ts @@ -2,97 +2,23 @@ // Copyright 2021 Vulcanize, Inc. // -import assert from 'assert'; -import 'reflect-metadata'; -import yargs from 'yargs'; -import { hideBin } from 'yargs/helpers'; import debug from 'debug'; -import { - getConfig, - Config, - JobQueue, - JobRunner as BaseJobRunner, - QUEUE_BLOCK_PROCESSING, - QUEUE_EVENT_PROCESSING, - JobQueueConfig, - DEFAULT_CONFIG_PATH, - initClients, - startMetricsServer -} from '@cerc-io/util'; +import { JobRunner, JobRunnerCmd } from '@cerc-io/cli'; import { Indexer } from './indexer'; import { Database } from './database'; const log = debug('vulcanize:job-runner'); -export class JobRunner { - _indexer: Indexer - _jobQueue: JobQueue - _baseJobRunner: BaseJobRunner - _jobQueueConfig: JobQueueConfig - - constructor (jobQueueConfig: JobQueueConfig, indexer: Indexer, jobQueue: JobQueue) { - this._jobQueueConfig = jobQueueConfig; - this._indexer = indexer; - this._jobQueue = jobQueue; - this._baseJobRunner = new BaseJobRunner(this._jobQueueConfig, this._indexer, this._jobQueue); - } - - async start (): Promise { - await this._jobQueue.deleteAllJobs(); - await this._baseJobRunner.resetToPrevIndexedBlock(); - await this.subscribeBlockProcessingQueue(); - await this.subscribeEventProcessingQueue(); - this._baseJobRunner.handleShutdown(); - } - - async subscribeBlockProcessingQueue (): Promise { - await this._jobQueue.subscribe(QUEUE_BLOCK_PROCESSING, async (job) => { - await this._baseJobRunner.processBlock(job); - }); - } - - async subscribeEventProcessingQueue (): Promise { - await this._jobQueue.subscribe(QUEUE_EVENT_PROCESSING, async (job) => { - await this._baseJobRunner.processEvent(job); - }); - } -} - export const main = async (): Promise => { - const argv = await yargs(hideBin(process.argv)) - .option('f', { - alias: 'config-file', - demandOption: true, - describe: 'configuration file path (toml)', - type: 'string', - default: DEFAULT_CONFIG_PATH - }) - .argv; + const jobRunnerCmd = new JobRunnerCmd(); + await jobRunnerCmd.init(Database, Indexer); - const config: Config = await getConfig(argv.f); - const { ethClient, ethProvider } = await initClients(config); - - const db = new Database(config.database); - await db.init(); - - const jobQueueConfig = config.jobQueue; - assert(jobQueueConfig, 'Missing job queue config'); - - const { dbConnectionString, maxCompletionLagInSecs } = jobQueueConfig; - assert(dbConnectionString, 'Missing job queue db connection string'); - - const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs }); - await jobQueue.start(); - - const indexer = new Indexer(config.server, db, { ethClient }, ethProvider, jobQueue); - await indexer.init(); - - const jobRunner = new JobRunner(jobQueueConfig, indexer, jobQueue); - await jobRunner.start(); - - startMetricsServer(config, indexer); + await jobRunnerCmd.exec(async (jobRunner: JobRunner): Promise => { + await jobRunner.subscribeBlockProcessingQueue(); + await jobRunner.subscribeEventProcessingQueue(); + }); }; main().then(() => { diff --git a/packages/erc721-watcher/src/job-runner.ts b/packages/erc721-watcher/src/job-runner.ts index eb29b4a2..b2446ef0 100644 --- a/packages/erc721-watcher/src/job-runner.ts +++ b/packages/erc721-watcher/src/job-runner.ts @@ -2,113 +2,25 @@ // Copyright 2021 Vulcanize, Inc. // -import assert from 'assert'; -import 'reflect-metadata'; -import yargs from 'yargs'; -import { hideBin } from 'yargs/helpers'; import debug from 'debug'; -import { - getConfig, - Config, - JobQueue, - JobRunner as BaseJobRunner, - QUEUE_BLOCK_PROCESSING, - QUEUE_EVENT_PROCESSING, - QUEUE_BLOCK_CHECKPOINT, - QUEUE_HOOKS, - JobQueueConfig, - DEFAULT_CONFIG_PATH, - initClients, - startMetricsServer -} from '@cerc-io/util'; +import { JobRunner, JobRunnerCmd } from '@cerc-io/cli'; import { Indexer } from './indexer'; import { Database } from './database'; const log = debug('vulcanize:job-runner'); -export class JobRunner { - _indexer: Indexer - _jobQueue: JobQueue - _baseJobRunner: BaseJobRunner - _jobQueueConfig: JobQueueConfig - - constructor (jobQueueConfig: JobQueueConfig, indexer: Indexer, jobQueue: JobQueue) { - this._jobQueueConfig = jobQueueConfig; - this._indexer = indexer; - this._jobQueue = jobQueue; - this._baseJobRunner = new BaseJobRunner(this._jobQueueConfig, this._indexer, this._jobQueue); - } - - async start (): Promise { - await this._jobQueue.deleteAllJobs(); - await this._baseJobRunner.resetToPrevIndexedBlock(); - await this.subscribeBlockProcessingQueue(); - await this.subscribeEventProcessingQueue(); - await this.subscribeBlockCheckpointQueue(); - await this.subscribeHooksQueue(); - this._baseJobRunner.handleShutdown(); - } - - async subscribeBlockProcessingQueue (): Promise { - await this._jobQueue.subscribe(QUEUE_BLOCK_PROCESSING, async (job) => { - await this._baseJobRunner.processBlock(job); - }); - } - - async subscribeEventProcessingQueue (): Promise { - await this._jobQueue.subscribe(QUEUE_EVENT_PROCESSING, async (job) => { - await this._baseJobRunner.processEvent(job); - }); - } - - async subscribeHooksQueue (): Promise { - await this._jobQueue.subscribe(QUEUE_HOOKS, async (job) => { - await this._baseJobRunner.processHooks(job); - }); - } - - async subscribeBlockCheckpointQueue (): Promise { - await this._jobQueue.subscribe(QUEUE_BLOCK_CHECKPOINT, async (job) => { - await this._baseJobRunner.processCheckpoint(job); - }); - } -} - export const main = async (): Promise => { - const argv = await yargs(hideBin(process.argv)) - .option('f', { - alias: 'config-file', - demandOption: true, - describe: 'configuration file path (toml)', - type: 'string', - default: DEFAULT_CONFIG_PATH - }) - .argv; + const jobRunnerCmd = new JobRunnerCmd(); + await jobRunnerCmd.init(Database, Indexer); - const config: Config = await getConfig(argv.f); - const { ethClient, ethProvider } = await initClients(config); - - const db = new Database(config.database); - await db.init(); - - const jobQueueConfig = config.jobQueue; - assert(jobQueueConfig, 'Missing job queue config'); - - const { dbConnectionString, maxCompletionLagInSecs } = jobQueueConfig; - assert(dbConnectionString, 'Missing job queue db connection string'); - - const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs }); - await jobQueue.start(); - - const indexer = new Indexer(config.server, db, { ethClient }, ethProvider, jobQueue); - await indexer.init(); - - const jobRunner = new JobRunner(jobQueueConfig, indexer, jobQueue); - await jobRunner.start(); - - startMetricsServer(config, indexer); + await jobRunnerCmd.exec(async (jobRunner: JobRunner): Promise => { + await jobRunner.subscribeBlockProcessingQueue(); + await jobRunner.subscribeEventProcessingQueue(); + await jobRunner.subscribeBlockCheckpointQueue(); + await jobRunner.subscribeHooksQueue(); + }); }; main().then(() => { diff --git a/packages/graph-test-watcher/src/indexer.ts b/packages/graph-test-watcher/src/indexer.ts index fb576727..194d8088 100644 --- a/packages/graph-test-watcher/src/indexer.ts +++ b/packages/graph-test-watcher/src/indexer.ts @@ -431,6 +431,11 @@ export class Indexer implements IndexerInterface { return this._baseIndexer.getLatestStateIndexedBlock(); } + async addContracts (): Promise { + // Watching all the contracts in the subgraph. + await this._graphWatcher.addContracts(); + } + async watchContract (address: string, kind: string, checkpoint: boolean, startingBlock: number): Promise { return this._baseIndexer.watchContract(address, kind, checkpoint, startingBlock); } diff --git a/packages/graph-test-watcher/src/job-runner.ts b/packages/graph-test-watcher/src/job-runner.ts index 52fccec6..b2446ef0 100644 --- a/packages/graph-test-watcher/src/job-runner.ts +++ b/packages/graph-test-watcher/src/job-runner.ts @@ -2,125 +2,25 @@ // Copyright 2021 Vulcanize, Inc. // -import assert from 'assert'; -import 'reflect-metadata'; -import yargs from 'yargs'; -import { hideBin } from 'yargs/helpers'; import debug from 'debug'; -import { - getConfig, - Config, - JobQueue, - JobRunner as BaseJobRunner, - QUEUE_BLOCK_PROCESSING, - QUEUE_EVENT_PROCESSING, - QUEUE_BLOCK_CHECKPOINT, - QUEUE_HOOKS, - JobQueueConfig, - DEFAULT_CONFIG_PATH, - initClients, - startMetricsServer -} from '@cerc-io/util'; -import { GraphWatcher, Database as GraphDatabase } from '@cerc-io/graph-node'; +import { JobRunner, JobRunnerCmd } from '@cerc-io/cli'; import { Indexer } from './indexer'; -import { Database, ENTITY_TO_LATEST_ENTITY_MAP } from './database'; +import { Database } from './database'; const log = debug('vulcanize:job-runner'); -export class JobRunner { - _indexer: Indexer - _jobQueue: JobQueue - _baseJobRunner: BaseJobRunner - _jobQueueConfig: JobQueueConfig - - constructor (jobQueueConfig: JobQueueConfig, indexer: Indexer, jobQueue: JobQueue) { - this._jobQueueConfig = jobQueueConfig; - this._indexer = indexer; - this._jobQueue = jobQueue; - this._baseJobRunner = new BaseJobRunner(this._jobQueueConfig, this._indexer, this._jobQueue); - } - - async start (): Promise { - await this._jobQueue.deleteAllJobs(); - await this._baseJobRunner.resetToPrevIndexedBlock(); - await this.subscribeBlockProcessingQueue(); - await this.subscribeEventProcessingQueue(); - await this.subscribeBlockCheckpointQueue(); - await this.subscribeHooksQueue(); - this._baseJobRunner.handleShutdown(); - } - - async subscribeBlockProcessingQueue (): Promise { - await this._jobQueue.subscribe(QUEUE_BLOCK_PROCESSING, async (job) => { - await this._baseJobRunner.processBlock(job); - }); - } - - async subscribeEventProcessingQueue (): Promise { - await this._jobQueue.subscribe(QUEUE_EVENT_PROCESSING, async (job) => { - await this._baseJobRunner.processEvent(job); - }); - } - - async subscribeHooksQueue (): Promise { - await this._jobQueue.subscribe(QUEUE_HOOKS, async (job) => { - await this._baseJobRunner.processHooks(job); - }); - } - - async subscribeBlockCheckpointQueue (): Promise { - await this._jobQueue.subscribe(QUEUE_BLOCK_CHECKPOINT, async (job) => { - await this._baseJobRunner.processCheckpoint(job); - }); - } -} - export const main = async (): Promise => { - const argv = await yargs(hideBin(process.argv)) - .option('f', { - alias: 'config-file', - demandOption: true, - describe: 'configuration file path (toml)', - type: 'string', - default: DEFAULT_CONFIG_PATH - }) - .argv; + const jobRunnerCmd = new JobRunnerCmd(); + await jobRunnerCmd.init(Database, Indexer); - const config: Config = await getConfig(argv.f); - const { ethClient, ethProvider } = await initClients(config); - - const db = new Database(config.database); - await db.init(); - - const graphDb = new GraphDatabase(config.server, db.baseDatabase, ENTITY_TO_LATEST_ENTITY_MAP); - await graphDb.init(); - - const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server); - - const jobQueueConfig = config.jobQueue; - assert(jobQueueConfig, 'Missing job queue config'); - - const { dbConnectionString, maxCompletionLagInSecs } = jobQueueConfig; - assert(dbConnectionString, 'Missing job queue db connection string'); - - const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs }); - await jobQueue.start(); - - const indexer = new Indexer(config.server, db, { ethClient }, ethProvider, jobQueue, graphWatcher); - await indexer.init(); - - graphWatcher.setIndexer(indexer); - await graphWatcher.init(); - - // Watching all the contracts in the subgraph. - await graphWatcher.addContracts(); - - const jobRunner = new JobRunner(jobQueueConfig, indexer, jobQueue); - await jobRunner.start(); - - startMetricsServer(config, indexer); + await jobRunnerCmd.exec(async (jobRunner: JobRunner): Promise => { + await jobRunner.subscribeBlockProcessingQueue(); + await jobRunner.subscribeEventProcessingQueue(); + await jobRunner.subscribeBlockCheckpointQueue(); + await jobRunner.subscribeHooksQueue(); + }); }; main().then(() => { diff --git a/packages/graph-test-watcher/src/server.ts b/packages/graph-test-watcher/src/server.ts index e2f8cca2..45810884 100644 --- a/packages/graph-test-watcher/src/server.ts +++ b/packages/graph-test-watcher/src/server.ts @@ -19,7 +19,7 @@ const log = debug('vulcanize:server'); export const main = async (): Promise => { const serverCmd = new ServerCmd(); - await serverCmd.init(Database, Indexer, EventWatcher, ENTITY_QUERY_TYPE_MAP, ENTITY_TO_LATEST_ENTITY_MAP); + await serverCmd.init(Database, Indexer, EventWatcher, {}, ENTITY_QUERY_TYPE_MAP, ENTITY_TO_LATEST_ENTITY_MAP); const typeDefs = fs.readFileSync(path.join(__dirname, 'schema.gql')).toString(); diff --git a/packages/mobymask-watcher/src/job-runner.ts b/packages/mobymask-watcher/src/job-runner.ts index eb29b4a2..b2446ef0 100644 --- a/packages/mobymask-watcher/src/job-runner.ts +++ b/packages/mobymask-watcher/src/job-runner.ts @@ -2,113 +2,25 @@ // Copyright 2021 Vulcanize, Inc. // -import assert from 'assert'; -import 'reflect-metadata'; -import yargs from 'yargs'; -import { hideBin } from 'yargs/helpers'; import debug from 'debug'; -import { - getConfig, - Config, - JobQueue, - JobRunner as BaseJobRunner, - QUEUE_BLOCK_PROCESSING, - QUEUE_EVENT_PROCESSING, - QUEUE_BLOCK_CHECKPOINT, - QUEUE_HOOKS, - JobQueueConfig, - DEFAULT_CONFIG_PATH, - initClients, - startMetricsServer -} from '@cerc-io/util'; +import { JobRunner, JobRunnerCmd } from '@cerc-io/cli'; import { Indexer } from './indexer'; import { Database } from './database'; const log = debug('vulcanize:job-runner'); -export class JobRunner { - _indexer: Indexer - _jobQueue: JobQueue - _baseJobRunner: BaseJobRunner - _jobQueueConfig: JobQueueConfig - - constructor (jobQueueConfig: JobQueueConfig, indexer: Indexer, jobQueue: JobQueue) { - this._jobQueueConfig = jobQueueConfig; - this._indexer = indexer; - this._jobQueue = jobQueue; - this._baseJobRunner = new BaseJobRunner(this._jobQueueConfig, this._indexer, this._jobQueue); - } - - async start (): Promise { - await this._jobQueue.deleteAllJobs(); - await this._baseJobRunner.resetToPrevIndexedBlock(); - await this.subscribeBlockProcessingQueue(); - await this.subscribeEventProcessingQueue(); - await this.subscribeBlockCheckpointQueue(); - await this.subscribeHooksQueue(); - this._baseJobRunner.handleShutdown(); - } - - async subscribeBlockProcessingQueue (): Promise { - await this._jobQueue.subscribe(QUEUE_BLOCK_PROCESSING, async (job) => { - await this._baseJobRunner.processBlock(job); - }); - } - - async subscribeEventProcessingQueue (): Promise { - await this._jobQueue.subscribe(QUEUE_EVENT_PROCESSING, async (job) => { - await this._baseJobRunner.processEvent(job); - }); - } - - async subscribeHooksQueue (): Promise { - await this._jobQueue.subscribe(QUEUE_HOOKS, async (job) => { - await this._baseJobRunner.processHooks(job); - }); - } - - async subscribeBlockCheckpointQueue (): Promise { - await this._jobQueue.subscribe(QUEUE_BLOCK_CHECKPOINT, async (job) => { - await this._baseJobRunner.processCheckpoint(job); - }); - } -} - export const main = async (): Promise => { - const argv = await yargs(hideBin(process.argv)) - .option('f', { - alias: 'config-file', - demandOption: true, - describe: 'configuration file path (toml)', - type: 'string', - default: DEFAULT_CONFIG_PATH - }) - .argv; + const jobRunnerCmd = new JobRunnerCmd(); + await jobRunnerCmd.init(Database, Indexer); - const config: Config = await getConfig(argv.f); - const { ethClient, ethProvider } = await initClients(config); - - const db = new Database(config.database); - await db.init(); - - const jobQueueConfig = config.jobQueue; - assert(jobQueueConfig, 'Missing job queue config'); - - const { dbConnectionString, maxCompletionLagInSecs } = jobQueueConfig; - assert(dbConnectionString, 'Missing job queue db connection string'); - - const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs }); - await jobQueue.start(); - - const indexer = new Indexer(config.server, db, { ethClient }, ethProvider, jobQueue); - await indexer.init(); - - const jobRunner = new JobRunner(jobQueueConfig, indexer, jobQueue); - await jobRunner.start(); - - startMetricsServer(config, indexer); + await jobRunnerCmd.exec(async (jobRunner: JobRunner): Promise => { + await jobRunner.subscribeBlockProcessingQueue(); + await jobRunner.subscribeEventProcessingQueue(); + await jobRunner.subscribeBlockCheckpointQueue(); + await jobRunner.subscribeHooksQueue(); + }); }; main().then(() => { diff --git a/packages/util/src/types.ts b/packages/util/src/types.ts index 6acc521a..bc6d7466 100644 --- a/packages/util/src/types.ts +++ b/packages/util/src/types.ts @@ -109,6 +109,7 @@ export interface IndexerInterface { parseEventNameAndArgs?: (kind: string, logObj: any) => any isWatchedContract: (address: string) => ContractInterface | undefined; getContractsByKind?: (kind: string) => ContractInterface[] + addContracts?: () => Promise cacheContract: (contract: ContractInterface) => void; watchContract: (address: string, kind: string, checkpoint: boolean, startingBlock: number) => Promise getEntityTypesMap?: () => Map