diff --git a/packages/cli/src/export-state.ts b/packages/cli/src/export-state.ts index 7bee68d5..496e08fd 100644 --- a/packages/cli/src/export-state.ts +++ b/packages/cli/src/export-state.ts @@ -49,7 +49,8 @@ export class ExportStateCmd { } async init ( - Database: new (config: ConnectionOptions, + Database: new ( + config: ConnectionOptions, serverConfig?: ServerConfig ) => DatabaseInterface, Indexer: new ( diff --git a/packages/cli/src/fill.ts b/packages/cli/src/fill.ts index 791854ca..63b0bd9e 100644 --- a/packages/cli/src/fill.ts +++ b/packages/cli/src/fill.ts @@ -57,7 +57,8 @@ export class FillCmd { } async init ( - Database: new (config: ConnectionOptions, + Database: new ( + config: ConnectionOptions, serverConfig?: ServerConfig ) => DatabaseInterface, Indexer: new ( diff --git a/packages/cli/src/import-state.ts b/packages/cli/src/import-state.ts index 489278d8..dcda9ea3 100644 --- a/packages/cli/src/import-state.ts +++ b/packages/cli/src/import-state.ts @@ -52,7 +52,8 @@ export class ImportStateCmd { } async init ( - Database: new (config: ConnectionOptions, + Database: new ( + config: ConnectionOptions, serverConfig?: ServerConfig ) => DatabaseInterface, Indexer: new ( diff --git a/packages/cli/src/job-runner.ts b/packages/cli/src/job-runner.ts index 9e69598b..2cc02163 100644 --- a/packages/cli/src/job-runner.ts +++ b/packages/cli/src/job-runner.ts @@ -17,7 +17,7 @@ import { IndexerInterface, ServerConfig, Clients, - WatcherJobRunner as JobRunner, + JobRunner, startMetricsServer } from '@cerc-io/util'; @@ -51,7 +51,8 @@ export class JobRunnerCmd { } async init ( - Database: new (config: ConnectionOptions, + Database: new ( + config: ConnectionOptions, serverConfig?: ServerConfig ) => DatabaseInterface, Indexer: new ( @@ -87,10 +88,10 @@ export class JobRunnerCmd { const jobRunner = new JobRunner(config.jobQueue, indexer, jobQueue); await jobRunner.jobQueue.deleteAllJobs(); - await jobRunner.baseJobRunner.resetToPrevIndexedBlock(); + await jobRunner.resetToPrevIndexedBlock(); await startJobRunner(jobRunner); - jobRunner.baseJobRunner.handleShutdown(); + jobRunner.handleShutdown(); await startMetricsServer(config, indexer); } diff --git a/packages/cli/src/server.ts b/packages/cli/src/server.ts index be9b4741..b54d7f2f 100644 --- a/packages/cli/src/server.ts +++ b/packages/cli/src/server.ts @@ -50,7 +50,8 @@ export class ServerCmd { } async init ( - Database: new (config: ConnectionOptions, + Database: new ( + config: ConnectionOptions, serverConfig?: ServerConfig ) => DatabaseInterface, Indexer: new ( diff --git a/packages/eden-watcher/src/job-runner.ts b/packages/eden-watcher/src/job-runner.ts index a3c3c585..77bb6b8a 100644 --- a/packages/eden-watcher/src/job-runner.ts +++ b/packages/eden-watcher/src/job-runner.ts @@ -5,7 +5,7 @@ import debug from 'debug'; import { JobRunnerCmd } from '@cerc-io/cli'; -import { WatcherJobRunner as JobRunner } from '@cerc-io/util'; +import { JobRunner } from '@cerc-io/util'; import { Indexer } from './indexer'; import { Database, ENTITY_QUERY_TYPE_MAP, ENTITY_TO_LATEST_ENTITY_MAP } from './database'; diff --git a/packages/erc20-watcher/src/job-runner.ts b/packages/erc20-watcher/src/job-runner.ts index 2a336d94..239da068 100644 --- a/packages/erc20-watcher/src/job-runner.ts +++ b/packages/erc20-watcher/src/job-runner.ts @@ -5,7 +5,7 @@ import debug from 'debug'; import { JobRunnerCmd } from '@cerc-io/cli'; -import { WatcherJobRunner as JobRunner } from '@cerc-io/util'; +import { JobRunner } from '@cerc-io/util'; import { Indexer } from './indexer'; import { Database } from './database'; diff --git a/packages/erc721-watcher/src/job-runner.ts b/packages/erc721-watcher/src/job-runner.ts index 84ff73cd..703c4397 100644 --- a/packages/erc721-watcher/src/job-runner.ts +++ b/packages/erc721-watcher/src/job-runner.ts @@ -5,7 +5,7 @@ import debug from 'debug'; import { JobRunnerCmd } from '@cerc-io/cli'; -import { WatcherJobRunner as JobRunner } from '@cerc-io/util'; +import { JobRunner } from '@cerc-io/util'; import { Indexer } from './indexer'; import { Database } from './database'; diff --git a/packages/graph-test-watcher/src/job-runner.ts b/packages/graph-test-watcher/src/job-runner.ts index 84ff73cd..703c4397 100644 --- a/packages/graph-test-watcher/src/job-runner.ts +++ b/packages/graph-test-watcher/src/job-runner.ts @@ -5,7 +5,7 @@ import debug from 'debug'; import { JobRunnerCmd } from '@cerc-io/cli'; -import { WatcherJobRunner as JobRunner } from '@cerc-io/util'; +import { JobRunner } from '@cerc-io/util'; import { Indexer } from './indexer'; import { Database } from './database'; diff --git a/packages/mobymask-watcher/src/job-runner.ts b/packages/mobymask-watcher/src/job-runner.ts index 84ff73cd..703c4397 100644 --- a/packages/mobymask-watcher/src/job-runner.ts +++ b/packages/mobymask-watcher/src/job-runner.ts @@ -5,7 +5,7 @@ import debug from 'debug'; import { JobRunnerCmd } from '@cerc-io/cli'; -import { WatcherJobRunner as JobRunner } from '@cerc-io/util'; +import { JobRunner } from '@cerc-io/util'; import { Indexer } from './indexer'; import { Database } from './database'; diff --git a/packages/util/src/job-runner.ts b/packages/util/src/job-runner.ts index f6a0af3b..be196765 100644 --- a/packages/util/src/job-runner.ts +++ b/packages/util/src/job-runner.ts @@ -34,8 +34,8 @@ import { lastBlockNumEvents, lastBlockProcessDuration, lastProcessedBlockNumber const log = debug('vulcanize:job-runner'); export class JobRunner { + jobQueue: JobQueue _indexer: IndexerInterface - _jobQueue: JobQueue _jobQueueConfig: JobQueueConfig _blockProcessStartTime?: Date _endBlockProcessTimer?: () => void @@ -45,10 +45,34 @@ export class JobRunner { constructor (jobQueueConfig: JobQueueConfig, indexer: IndexerInterface, jobQueue: JobQueue) { this._indexer = indexer; - this._jobQueue = jobQueue; + this.jobQueue = jobQueue; this._jobQueueConfig = jobQueueConfig; } + async subscribeBlockProcessingQueue (): Promise { + await this.jobQueue.subscribe(QUEUE_BLOCK_PROCESSING, async (job) => { + await this.processBlock(job); + }); + } + + async subscribeEventProcessingQueue (): Promise { + await this.jobQueue.subscribe(QUEUE_EVENT_PROCESSING, async (job) => { + await this.processEvent(job); + }); + } + + async subscribeHooksQueue (): Promise { + await this.jobQueue.subscribe(QUEUE_HOOKS, async (job) => { + await this.processHooks(job); + }); + } + + async subscribeBlockCheckpointQueue (): Promise { + await this.jobQueue.subscribe(QUEUE_BLOCK_CHECKPOINT, async (job) => { + await this.processCheckpoint(job); + }); + } + async processBlock (job: any): Promise { const { data: { kind } } = job; @@ -70,7 +94,7 @@ export class JobRunner { // Create a hooks job for parent block of latestCanonicalBlock pruning for first block is skipped as it is assumed to be a canonical block. const latestCanonicalBlock = await this._indexer.getLatestCanonicalBlock(); - await createHooksJob(this._jobQueue, latestCanonicalBlock.parentHash, latestCanonicalBlock.blockNumber - 1); + await createHooksJob(this.jobQueue, latestCanonicalBlock.parentHash, latestCanonicalBlock.blockNumber - 1); break; } @@ -79,7 +103,7 @@ export class JobRunner { break; } - await this._jobQueue.markComplete(job); + await this.jobQueue.markComplete(job); } async processEvent (job: any): Promise { @@ -99,7 +123,7 @@ export class JobRunner { break; } - await this._jobQueue.markComplete(job); + await this.jobQueue.markComplete(job); } async processHooks (job: any): Promise { @@ -112,7 +136,7 @@ export class JobRunner { if (stateSyncStatus.latestIndexedBlockNumber < (blockNumber - 1)) { // Create hooks job for parent block. const [parentBlock] = await this._indexer.getBlocksAtHeight(blockNumber - 1, false); - await createHooksJob(this._jobQueue, parentBlock.blockHash, parentBlock.blockNumber); + await createHooksJob(this.jobQueue, parentBlock.blockHash, parentBlock.blockNumber); const message = `State for blockNumber ${blockNumber - 1} not indexed yet, aborting`; log(message); @@ -134,9 +158,9 @@ export class JobRunner { await this._indexer.updateStateSyncStatusIndexedBlock(blockNumber); // Create a checkpoint job after completion of a hooks job. - await createCheckpointJob(this._jobQueue, blockHash, blockNumber); + await createCheckpointJob(this.jobQueue, blockHash, blockNumber); - await this._jobQueue.markComplete(job); + await this.jobQueue.markComplete(job); } async processCheckpoint (job: any): Promise { @@ -150,7 +174,7 @@ export class JobRunner { if (stateSyncStatus.latestCheckpointBlockNumber < (blockNumber - 1)) { // Create a checkpoint job for parent block. const [parentBlock] = await this._indexer.getBlocksAtHeight(blockNumber - 1, false); - await createCheckpointJob(this._jobQueue, parentBlock.blockHash, parentBlock.blockNumber); + await createCheckpointJob(this.jobQueue, parentBlock.blockHash, parentBlock.blockNumber); const message = `Checkpoints for blockNumber ${blockNumber - 1} not processed yet, aborting`; log(message); @@ -171,7 +195,7 @@ export class JobRunner { // Update the stateSyncStatus. await this._indexer.updateStateSyncStatusCheckpointBlock(blockNumber); - await this._jobQueue.markComplete(job); + await this.jobQueue.markComplete(job); } async resetToPrevIndexedBlock (): Promise { @@ -196,7 +220,7 @@ export class JobRunner { if (this._signalCount >= 3 || process.env.YARN_CHILD_PROCESS === 'true') { // Forceful exit on receiving signal for the 3rd time or if job-runner is a child process of yarn. - this._jobQueue.stop(); + this.jobQueue.stop(); process.exit(1); } } @@ -272,7 +296,7 @@ export class JobRunner { // Check if chain pruning is caught up. if ((syncStatus.latestIndexedBlockNumber - syncStatus.latestCanonicalBlockNumber) > MAX_REORG_DEPTH) { - await createPruningJob(this._jobQueue, syncStatus.latestCanonicalBlockNumber, priority); + await createPruningJob(this.jobQueue, syncStatus.latestCanonicalBlockNumber, priority); const message = `Chain pruning not caught up yet, latest canonical block number ${syncStatus.latestCanonicalBlockNumber} and latest indexed block number ${syncStatus.latestIndexedBlockNumber}`; log(message); @@ -311,7 +335,7 @@ export class JobRunner { const [{ cid: parentCid, blockNumber: parentBlockNumber, parentHash: grandparentHash, timestamp: parentTimestamp }] = blocks; - await this._jobQueue.pushJob(QUEUE_BLOCK_PROCESSING, { + await this.jobQueue.pushJob(QUEUE_BLOCK_PROCESSING, { kind: JOB_KIND_INDEX, cid: parentCid, blockHash: parentHash, @@ -332,7 +356,7 @@ export class JobRunner { const message = `Indexing incomplete for parent block number ${parentBlock.blockNumber} hash ${parentHash} of block number ${blockNumber} hash ${blockHash}, aborting`; log(message); - await this._jobQueue.pushJob(QUEUE_BLOCK_PROCESSING, { + await this.jobQueue.pushJob(QUEUE_BLOCK_PROCESSING, { kind: JOB_KIND_INDEX, cid: parentBlock.cid, blockHash: parentHash, @@ -377,7 +401,7 @@ export class JobRunner { // Push job to event processing queue. // Block with all events processed or no events will not be processed again due to check in _processEvents. - await this._jobQueue.pushJob(QUEUE_EVENT_PROCESSING, { kind: JOB_KIND_EVENTS, blockHash: blockProgress.blockHash, publish: true }); + await this.jobQueue.pushJob(QUEUE_EVENT_PROCESSING, { kind: JOB_KIND_EVENTS, blockHash: blockProgress.blockHash, publish: true }); const indexBlockDuration = new Date().getTime() - indexBlockStartTime.getTime(); log(`time:job-runner#_indexBlock: ${indexBlockDuration}ms`); @@ -418,7 +442,7 @@ export class JobRunner { if (this._shutDown) { log(`Graceful shutdown after processing block ${block.blockNumber}`); - this._jobQueue.stop(); + this.jobQueue.stop(); process.exit(0); } } @@ -429,41 +453,3 @@ export class JobRunner { this._indexer.updateStateStatusMap(contract.address, {}); } } - -export class WatcherJobRunner { - jobQueue: JobQueue - baseJobRunner: JobRunner - _indexer: IndexerInterface - _jobQueueConfig: JobQueueConfig - - constructor (jobQueueConfig: JobQueueConfig, indexer: IndexerInterface, jobQueue: JobQueue) { - this._jobQueueConfig = jobQueueConfig; - this._indexer = indexer; - this.jobQueue = jobQueue; - this.baseJobRunner = new JobRunner(this._jobQueueConfig, this._indexer, this.jobQueue); - } - - async subscribeBlockProcessingQueue (): Promise { - await this.jobQueue.subscribe(QUEUE_BLOCK_PROCESSING, async (job) => { - await this.baseJobRunner.processBlock(job); - }); - } - - async subscribeEventProcessingQueue (): Promise { - await this.jobQueue.subscribe(QUEUE_EVENT_PROCESSING, async (job) => { - await this.baseJobRunner.processEvent(job); - }); - } - - async subscribeHooksQueue (): Promise { - await this.jobQueue.subscribe(QUEUE_HOOKS, async (job) => { - await this.baseJobRunner.processHooks(job); - }); - } - - async subscribeBlockCheckpointQueue (): Promise { - await this.jobQueue.subscribe(QUEUE_BLOCK_CHECKPOINT, async (job) => { - await this.baseJobRunner.processCheckpoint(job); - }); - } -}