Merge watcher job-runner into base job-runner (#260)

This commit is contained in:
prathamesh0 2022-11-24 04:56:40 -06:00 committed by GitHub
parent aba0c665f3
commit 590482ee53
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 58 additions and 67 deletions

View File

@ -49,7 +49,8 @@ export class ExportStateCmd {
}
async init (
Database: new (config: ConnectionOptions,
Database: new (
config: ConnectionOptions,
serverConfig?: ServerConfig
) => DatabaseInterface,
Indexer: new (

View File

@ -57,7 +57,8 @@ export class FillCmd {
}
async init (
Database: new (config: ConnectionOptions,
Database: new (
config: ConnectionOptions,
serverConfig?: ServerConfig
) => DatabaseInterface,
Indexer: new (

View File

@ -52,7 +52,8 @@ export class ImportStateCmd {
}
async init (
Database: new (config: ConnectionOptions,
Database: new (
config: ConnectionOptions,
serverConfig?: ServerConfig
) => DatabaseInterface,
Indexer: new (

View File

@ -17,7 +17,7 @@ import {
IndexerInterface,
ServerConfig,
Clients,
WatcherJobRunner as JobRunner,
JobRunner,
startMetricsServer
} from '@cerc-io/util';
@ -51,7 +51,8 @@ export class JobRunnerCmd {
}
async init (
Database: new (config: ConnectionOptions,
Database: new (
config: ConnectionOptions,
serverConfig?: ServerConfig
) => DatabaseInterface,
Indexer: new (
@ -87,10 +88,10 @@ export class JobRunnerCmd {
const jobRunner = new JobRunner(config.jobQueue, indexer, jobQueue);
await jobRunner.jobQueue.deleteAllJobs();
await jobRunner.baseJobRunner.resetToPrevIndexedBlock();
await jobRunner.resetToPrevIndexedBlock();
await startJobRunner(jobRunner);
jobRunner.baseJobRunner.handleShutdown();
jobRunner.handleShutdown();
await startMetricsServer(config, indexer);
}

View File

@ -50,7 +50,8 @@ export class ServerCmd {
}
async init (
Database: new (config: ConnectionOptions,
Database: new (
config: ConnectionOptions,
serverConfig?: ServerConfig
) => DatabaseInterface,
Indexer: new (

View File

@ -5,7 +5,7 @@
import debug from 'debug';
import { JobRunnerCmd } from '@cerc-io/cli';
import { WatcherJobRunner as JobRunner } from '@cerc-io/util';
import { JobRunner } from '@cerc-io/util';
import { Indexer } from './indexer';
import { Database, ENTITY_QUERY_TYPE_MAP, ENTITY_TO_LATEST_ENTITY_MAP } from './database';

View File

@ -5,7 +5,7 @@
import debug from 'debug';
import { JobRunnerCmd } from '@cerc-io/cli';
import { WatcherJobRunner as JobRunner } from '@cerc-io/util';
import { JobRunner } from '@cerc-io/util';
import { Indexer } from './indexer';
import { Database } from './database';

View File

@ -5,7 +5,7 @@
import debug from 'debug';
import { JobRunnerCmd } from '@cerc-io/cli';
import { WatcherJobRunner as JobRunner } from '@cerc-io/util';
import { JobRunner } from '@cerc-io/util';
import { Indexer } from './indexer';
import { Database } from './database';

View File

@ -5,7 +5,7 @@
import debug from 'debug';
import { JobRunnerCmd } from '@cerc-io/cli';
import { WatcherJobRunner as JobRunner } from '@cerc-io/util';
import { JobRunner } from '@cerc-io/util';
import { Indexer } from './indexer';
import { Database } from './database';

View File

@ -5,7 +5,7 @@
import debug from 'debug';
import { JobRunnerCmd } from '@cerc-io/cli';
import { WatcherJobRunner as JobRunner } from '@cerc-io/util';
import { JobRunner } from '@cerc-io/util';
import { Indexer } from './indexer';
import { Database } from './database';

View File

@ -34,8 +34,8 @@ import { lastBlockNumEvents, lastBlockProcessDuration, lastProcessedBlockNumber
const log = debug('vulcanize:job-runner');
export class JobRunner {
jobQueue: JobQueue
_indexer: IndexerInterface
_jobQueue: JobQueue
_jobQueueConfig: JobQueueConfig
_blockProcessStartTime?: Date
_endBlockProcessTimer?: () => void
@ -45,10 +45,34 @@ export class JobRunner {
constructor (jobQueueConfig: JobQueueConfig, indexer: IndexerInterface, jobQueue: JobQueue) {
this._indexer = indexer;
this._jobQueue = jobQueue;
this.jobQueue = jobQueue;
this._jobQueueConfig = jobQueueConfig;
}
async subscribeBlockProcessingQueue (): Promise<void> {
await this.jobQueue.subscribe(QUEUE_BLOCK_PROCESSING, async (job) => {
await this.processBlock(job);
});
}
async subscribeEventProcessingQueue (): Promise<void> {
await this.jobQueue.subscribe(QUEUE_EVENT_PROCESSING, async (job) => {
await this.processEvent(job);
});
}
async subscribeHooksQueue (): Promise<void> {
await this.jobQueue.subscribe(QUEUE_HOOKS, async (job) => {
await this.processHooks(job);
});
}
async subscribeBlockCheckpointQueue (): Promise<void> {
await this.jobQueue.subscribe(QUEUE_BLOCK_CHECKPOINT, async (job) => {
await this.processCheckpoint(job);
});
}
async processBlock (job: any): Promise<void> {
const { data: { kind } } = job;
@ -70,7 +94,7 @@ export class JobRunner {
// Create a hooks job for parent block of latestCanonicalBlock pruning for first block is skipped as it is assumed to be a canonical block.
const latestCanonicalBlock = await this._indexer.getLatestCanonicalBlock();
await createHooksJob(this._jobQueue, latestCanonicalBlock.parentHash, latestCanonicalBlock.blockNumber - 1);
await createHooksJob(this.jobQueue, latestCanonicalBlock.parentHash, latestCanonicalBlock.blockNumber - 1);
break;
}
@ -79,7 +103,7 @@ export class JobRunner {
break;
}
await this._jobQueue.markComplete(job);
await this.jobQueue.markComplete(job);
}
async processEvent (job: any): Promise<EventInterface | void> {
@ -99,7 +123,7 @@ export class JobRunner {
break;
}
await this._jobQueue.markComplete(job);
await this.jobQueue.markComplete(job);
}
async processHooks (job: any): Promise<void> {
@ -112,7 +136,7 @@ export class JobRunner {
if (stateSyncStatus.latestIndexedBlockNumber < (blockNumber - 1)) {
// Create hooks job for parent block.
const [parentBlock] = await this._indexer.getBlocksAtHeight(blockNumber - 1, false);
await createHooksJob(this._jobQueue, parentBlock.blockHash, parentBlock.blockNumber);
await createHooksJob(this.jobQueue, parentBlock.blockHash, parentBlock.blockNumber);
const message = `State for blockNumber ${blockNumber - 1} not indexed yet, aborting`;
log(message);
@ -134,9 +158,9 @@ export class JobRunner {
await this._indexer.updateStateSyncStatusIndexedBlock(blockNumber);
// Create a checkpoint job after completion of a hooks job.
await createCheckpointJob(this._jobQueue, blockHash, blockNumber);
await createCheckpointJob(this.jobQueue, blockHash, blockNumber);
await this._jobQueue.markComplete(job);
await this.jobQueue.markComplete(job);
}
async processCheckpoint (job: any): Promise<void> {
@ -150,7 +174,7 @@ export class JobRunner {
if (stateSyncStatus.latestCheckpointBlockNumber < (blockNumber - 1)) {
// Create a checkpoint job for parent block.
const [parentBlock] = await this._indexer.getBlocksAtHeight(blockNumber - 1, false);
await createCheckpointJob(this._jobQueue, parentBlock.blockHash, parentBlock.blockNumber);
await createCheckpointJob(this.jobQueue, parentBlock.blockHash, parentBlock.blockNumber);
const message = `Checkpoints for blockNumber ${blockNumber - 1} not processed yet, aborting`;
log(message);
@ -171,7 +195,7 @@ export class JobRunner {
// Update the stateSyncStatus.
await this._indexer.updateStateSyncStatusCheckpointBlock(blockNumber);
await this._jobQueue.markComplete(job);
await this.jobQueue.markComplete(job);
}
async resetToPrevIndexedBlock (): Promise<void> {
@ -196,7 +220,7 @@ export class JobRunner {
if (this._signalCount >= 3 || process.env.YARN_CHILD_PROCESS === 'true') {
// Forceful exit on receiving signal for the 3rd time or if job-runner is a child process of yarn.
this._jobQueue.stop();
this.jobQueue.stop();
process.exit(1);
}
}
@ -272,7 +296,7 @@ export class JobRunner {
// Check if chain pruning is caught up.
if ((syncStatus.latestIndexedBlockNumber - syncStatus.latestCanonicalBlockNumber) > MAX_REORG_DEPTH) {
await createPruningJob(this._jobQueue, syncStatus.latestCanonicalBlockNumber, priority);
await createPruningJob(this.jobQueue, syncStatus.latestCanonicalBlockNumber, priority);
const message = `Chain pruning not caught up yet, latest canonical block number ${syncStatus.latestCanonicalBlockNumber} and latest indexed block number ${syncStatus.latestIndexedBlockNumber}`;
log(message);
@ -311,7 +335,7 @@ export class JobRunner {
const [{ cid: parentCid, blockNumber: parentBlockNumber, parentHash: grandparentHash, timestamp: parentTimestamp }] = blocks;
await this._jobQueue.pushJob(QUEUE_BLOCK_PROCESSING, {
await this.jobQueue.pushJob(QUEUE_BLOCK_PROCESSING, {
kind: JOB_KIND_INDEX,
cid: parentCid,
blockHash: parentHash,
@ -332,7 +356,7 @@ export class JobRunner {
const message = `Indexing incomplete for parent block number ${parentBlock.blockNumber} hash ${parentHash} of block number ${blockNumber} hash ${blockHash}, aborting`;
log(message);
await this._jobQueue.pushJob(QUEUE_BLOCK_PROCESSING, {
await this.jobQueue.pushJob(QUEUE_BLOCK_PROCESSING, {
kind: JOB_KIND_INDEX,
cid: parentBlock.cid,
blockHash: parentHash,
@ -377,7 +401,7 @@ export class JobRunner {
// Push job to event processing queue.
// Block with all events processed or no events will not be processed again due to check in _processEvents.
await this._jobQueue.pushJob(QUEUE_EVENT_PROCESSING, { kind: JOB_KIND_EVENTS, blockHash: blockProgress.blockHash, publish: true });
await this.jobQueue.pushJob(QUEUE_EVENT_PROCESSING, { kind: JOB_KIND_EVENTS, blockHash: blockProgress.blockHash, publish: true });
const indexBlockDuration = new Date().getTime() - indexBlockStartTime.getTime();
log(`time:job-runner#_indexBlock: ${indexBlockDuration}ms`);
@ -418,7 +442,7 @@ export class JobRunner {
if (this._shutDown) {
log(`Graceful shutdown after processing block ${block.blockNumber}`);
this._jobQueue.stop();
this.jobQueue.stop();
process.exit(0);
}
}
@ -429,41 +453,3 @@ export class JobRunner {
this._indexer.updateStateStatusMap(contract.address, {});
}
}
export class WatcherJobRunner {
jobQueue: JobQueue
baseJobRunner: JobRunner
_indexer: IndexerInterface
_jobQueueConfig: JobQueueConfig
constructor (jobQueueConfig: JobQueueConfig, indexer: IndexerInterface, jobQueue: JobQueue) {
this._jobQueueConfig = jobQueueConfig;
this._indexer = indexer;
this.jobQueue = jobQueue;
this.baseJobRunner = new JobRunner(this._jobQueueConfig, this._indexer, this.jobQueue);
}
async subscribeBlockProcessingQueue (): Promise<void> {
await this.jobQueue.subscribe(QUEUE_BLOCK_PROCESSING, async (job) => {
await this.baseJobRunner.processBlock(job);
});
}
async subscribeEventProcessingQueue (): Promise<void> {
await this.jobQueue.subscribe(QUEUE_EVENT_PROCESSING, async (job) => {
await this.baseJobRunner.processEvent(job);
});
}
async subscribeHooksQueue (): Promise<void> {
await this.jobQueue.subscribe(QUEUE_HOOKS, async (job) => {
await this.baseJobRunner.processHooks(job);
});
}
async subscribeBlockCheckpointQueue (): Promise<void> {
await this.jobQueue.subscribe(QUEUE_BLOCK_CHECKPOINT, async (job) => {
await this.baseJobRunner.processCheckpoint(job);
});
}
}