Implement graceful shutdown in job-runner (#170)

* Implement graceful shutdown in job-runner

* Improve reset CLI delete query

* Exit job-runner immediately in development

* Implement changes in other watchers
This commit is contained in:
nikugogoi 2022-09-06 19:21:24 +05:30 committed by GitHub
parent 1e284bd07e
commit 288e153287
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
23 changed files with 72 additions and 17 deletions

View File

@ -11,8 +11,8 @@
"copy-assets": "copyfiles -u 1 src/**/*.gql dist/",
"server": "DEBUG=vulcanize:* node --enable-source-maps dist/server.js",
"server:dev": "DEBUG=vulcanize:* ts-node src/server.ts",
"job-runner": "DEBUG=vulcanize:* node --enable-source-maps dist/job-runner.js",
"job-runner:dev": "DEBUG=vulcanize:* ts-node src/job-runner.ts",
"job-runner": "DEBUG=vulcanize:* YARN_CHILD_PROCESS=true node --enable-source-maps dist/job-runner.js",
"job-runner:dev": "DEBUG=vulcanize:* YARN_CHILD_PROCESS=true ts-node src/job-runner.ts",
"watch:contract": "DEBUG=vulcanize:* ts-node src/cli/watch-contract.ts",
"fill": "DEBUG=vulcanize:* ts-node src/fill.ts",
"reset": "DEBUG=vulcanize:* ts-node src/cli/reset.ts",

View File

@ -84,7 +84,7 @@ export const handler = async (argv: any): Promise<void> => {
const entities = [BlockProgress, Producer, ProducerSet, ProducerSetChange, ProducerRewardCollectorChange, RewardScheduleEntry, RewardSchedule, ProducerEpoch, Block, Epoch, SlotClaim, Slot, Staker, Network, Distributor, Distribution, Claim, Slash, Account];
for (const entity of entities) {
await db.removeEntities<any>(dbTx, entity, { blockNumber: MoreThan(argv.blockNumber) });
await db.deleteEntitiesByConditions<any>(dbTx, entity, { blockNumber: MoreThan(argv.blockNumber) });
}
const syncStatus = await indexer.getSyncStatus();

View File

@ -213,6 +213,10 @@ export class Database implements IPLDDatabaseInterface {
return this._baseDatabase.removeEntities(queryRunner, entity, findConditions);
}
async deleteEntitiesByConditions<Entity> (queryRunner: QueryRunner, entity: new () => Entity, findConditions: FindConditions<Entity>): Promise<void> {
await this._baseDatabase.deleteEntitiesByConditions(queryRunner, entity, findConditions);
}
async getAncestorAtDepth (blockHash: string, depth: number): Promise<string> {
return this._baseDatabase.getAncestorAtDepth(blockHash, depth);
}

View File

@ -51,6 +51,7 @@ export class JobRunner {
await this.subscribeBlockCheckpointQueue();
await this.subscribeHooksQueue();
await this.subscribeIPFSQueue();
this._baseJobRunner.handleShutdown();
}
async subscribeBlockProcessingQueue (): Promise<void> {

View File

@ -11,8 +11,8 @@
"server": "DEBUG=vulcanize:* node --enable-source-maps dist/server.js",
"server:dev": "DEBUG=vulcanize:* nodemon --watch src src/server.ts",
"server:mock": "MOCK=1 nodemon src/server.ts",
"job-runner": "DEBUG=vulcanize:* node --enable-source-maps dist/job-runner.js",
"job-runner:dev": "DEBUG=vulcanize:* nodemon --watch src src/job-runner.ts",
"job-runner": "DEBUG=vulcanize:* YARN_CHILD_PROCESS=true node --enable-source-maps dist/job-runner.js",
"job-runner:dev": "DEBUG=vulcanize:* YARN_CHILD_PROCESS=true ts-node src/job-runner.ts",
"watch:contract": "node --enable-source-maps dist/cli/watch-contract.js",
"watch:contract:dev": "ts-node src/cli/watch-contract.ts",
"fill": "DEBUG=vulcanize:* node dist/fill.js",

View File

@ -57,7 +57,7 @@ export const handler = async (argv: any): Promise<void> => {
try {
for (const entity of [BlockProgress, Allowance, Balance]) {
await db.removeEntities<any>(dbTx, entity, { blockNumber: MoreThan(argv.blockNumber) });
await db.deleteEntitiesByConditions<any>(dbTx, entity, { blockNumber: MoreThan(argv.blockNumber) });
}
if (syncStatus.latestIndexedBlockNumber > blockProgress.blockNumber) {

View File

@ -183,6 +183,10 @@ export class Database {
return this._baseDatabase.removeEntities(queryRunner, entity, findConditions);
}
async deleteEntitiesByConditions<Entity> (queryRunner: QueryRunner, entity: new () => Entity, findConditions: FindConditions<Entity>): Promise<void> {
await this._baseDatabase.deleteEntitiesByConditions(queryRunner, entity, findConditions);
}
async getAncestorAtDepth (blockHash: string, depth: number): Promise<string> {
return this._baseDatabase.getAncestorAtDepth(blockHash, depth);
}

View File

@ -42,6 +42,7 @@ export class JobRunner {
async start (): Promise<void> {
await this.subscribeBlockProcessingQueue();
await this.subscribeEventProcessingQueue();
this._baseJobRunner.handleShutdown();
}
async subscribeBlockProcessingQueue (): Promise<void> {

View File

@ -11,8 +11,8 @@
"copy-assets": "copyfiles -u 1 src/**/*.gql dist/",
"server": "DEBUG=vulcanize:* node --enable-source-maps dist/server.js",
"server:dev": "DEBUG=vulcanize:* ts-node src/server.ts",
"job-runner": "DEBUG=vulcanize:* node --enable-source-maps dist/job-runner.js",
"job-runner:dev": "DEBUG=vulcanize:* ts-node src/job-runner.ts",
"job-runner": "DEBUG=vulcanize:* YARN_CHILD_PROCESS=true node --enable-source-maps dist/job-runner.js",
"job-runner:dev": "DEBUG=vulcanize:* YARN_CHILD_PROCESS=true ts-node src/job-runner.ts",
"watch:contract": "DEBUG=vulcanize:* ts-node src/cli/watch-contract.ts",
"fill": "DEBUG=vulcanize:* ts-node src/fill.ts",
"reset": "DEBUG=vulcanize:* ts-node src/cli/reset.ts",

View File

@ -71,7 +71,7 @@ export const handler = async (argv: any): Promise<void> => {
const entities = [BlockProgress, SupportsInterface, BalanceOf, OwnerOf, GetApproved, IsApprovedForAll, Name, Symbol, TokenURI, _Name, _Symbol, _Owners, _Balances, _TokenApprovals, _OperatorApprovals];
for (const entity of entities) {
await db.removeEntities<any>(dbTx, entity, { blockNumber: MoreThan(argv.blockNumber) });
await db.deleteEntitiesByConditions<any>(dbTx, entity, { blockNumber: MoreThan(argv.blockNumber) });
}
const syncStatus = await indexer.getSyncStatus();

View File

@ -474,6 +474,10 @@ export class Database implements IPLDDatabaseInterface {
return this._baseDatabase.removeEntities(queryRunner, entity, findConditions);
}
async deleteEntitiesByConditions<Entity> (queryRunner: QueryRunner, entity: new () => Entity, findConditions: FindConditions<Entity>): Promise<void> {
await this._baseDatabase.deleteEntitiesByConditions(queryRunner, entity, findConditions);
}
async getAncestorAtDepth (blockHash: string, depth: number): Promise<string> {
return this._baseDatabase.getAncestorAtDepth(blockHash, depth);
}

View File

@ -49,6 +49,7 @@ export class JobRunner {
await this.subscribeBlockCheckpointQueue();
await this.subscribeHooksQueue();
await this.subscribeIPFSQueue();
this._baseJobRunner.handleShutdown();
}
async subscribeBlockProcessingQueue (): Promise<void> {

View File

@ -11,8 +11,8 @@
"copy-assets": "copyfiles -u 1 src/**/*.gql dist/",
"server": "DEBUG=vulcanize:* node --enable-source-maps dist/server.js",
"server:dev": "DEBUG=vulcanize:* ts-node src/server.ts",
"job-runner": "DEBUG=vulcanize:* node --enable-source-maps dist/job-runner.js",
"job-runner:dev": "DEBUG=vulcanize:* ts-node src/job-runner.ts",
"job-runner": "DEBUG=vulcanize:* YARN_CHILD_PROCESS=true node --enable-source-maps dist/job-runner.js",
"job-runner:dev": "DEBUG=vulcanize:* YARN_CHILD_PROCESS=true ts-node src/job-runner.ts",
"watch:contract": "DEBUG=vulcanize:* ts-node src/cli/watch-contract.ts",
"fill": "DEBUG=vulcanize:* ts-node src/fill.ts",
"reset": "DEBUG=vulcanize:* ts-node src/cli/reset.ts",

View File

@ -72,7 +72,7 @@ export const handler = async (argv: any): Promise<void> => {
const entities = [BlockProgress, GetMethod, _Test, Author, Category, Blog];
for (const entity of entities) {
await db.removeEntities<any>(dbTx, entity, { blockNumber: MoreThan(argv.blockNumber) });
await db.deleteEntitiesByConditions<any>(dbTx, entity, { blockNumber: MoreThan(argv.blockNumber) });
}
const syncStatus = await indexer.getSyncStatus();

View File

@ -247,6 +247,10 @@ export class Database implements IPLDDatabaseInterface {
return this._baseDatabase.removeEntities(queryRunner, entity, findConditions);
}
async deleteEntitiesByConditions<Entity> (queryRunner: QueryRunner, entity: new () => Entity, findConditions: FindConditions<Entity>): Promise<void> {
await this._baseDatabase.deleteEntitiesByConditions(queryRunner, entity, findConditions);
}
async getAncestorAtDepth (blockHash: string, depth: number): Promise<string> {
return this._baseDatabase.getAncestorAtDepth(blockHash, depth);
}

View File

@ -51,6 +51,7 @@ export class JobRunner {
await this.subscribeBlockCheckpointQueue();
await this.subscribeHooksQueue();
await this.subscribeIPFSQueue();
this._baseJobRunner.handleShutdown();
}
async subscribeBlockProcessingQueue (): Promise<void> {

View File

@ -11,8 +11,8 @@
"copy-assets": "copyfiles -u 1 src/**/*.gql dist/",
"server": "DEBUG=vulcanize:* node --enable-source-maps dist/server.js",
"server:dev": "DEBUG=vulcanize:* ts-node src/server.ts",
"job-runner": "DEBUG=vulcanize:* node --enable-source-maps dist/job-runner.js",
"job-runner:dev": "DEBUG=vulcanize:* ts-node src/job-runner.ts",
"job-runner": "DEBUG=vulcanize:* YARN_CHILD_PROCESS=true node --enable-source-maps dist/job-runner.js",
"job-runner:dev": "DEBUG=vulcanize:* YARN_CHILD_PROCESS=true ts-node src/job-runner.ts",
"watch:contract": "DEBUG=vulcanize:* ts-node src/cli/watch-contract.ts",
"fill": "DEBUG=vulcanize:* ts-node src/fill.ts",
"reset": "DEBUG=vulcanize:* ts-node src/cli/reset.ts",

View File

@ -62,7 +62,7 @@ export const handler = async (argv: any): Promise<void> => {
const entities = [BlockProgress, MultiNonce, _Owner, IsRevoked, IsPhisher, IsMember];
for (const entity of entities) {
await db.removeEntities<any>(dbTx, entity, { blockNumber: MoreThan(argv.blockNumber) });
await db.deleteEntitiesByConditions<any>(dbTx, entity, { blockNumber: MoreThan(argv.blockNumber) });
}
const syncStatus = await indexer.getSyncStatus();

View File

@ -306,6 +306,10 @@ export class Database implements IPLDDatabaseInterface {
return this._baseDatabase.removeEntities(queryRunner, entity, findConditions);
}
async deleteEntitiesByConditions<Entity> (queryRunner: QueryRunner, entity: new () => Entity, findConditions: FindConditions<Entity>): Promise<void> {
await this._baseDatabase.deleteEntitiesByConditions(queryRunner, entity, findConditions);
}
async getAncestorAtDepth (blockHash: string, depth: number): Promise<string> {
return this._baseDatabase.getAncestorAtDepth(blockHash, depth);
}

View File

@ -49,6 +49,7 @@ export class JobRunner {
await this.subscribeBlockCheckpointQueue();
await this.subscribeHooksQueue();
await this.subscribeIPFSQueue();
this._baseJobRunner.handleShutdown();
}
async subscribeBlockProcessingQueue (): Promise<void> {

View File

@ -191,7 +191,7 @@ export const processBatchEvents = async (indexer: IndexerInterface, block: Block
}
if (indexer.processBlockAfterEvents) {
if (!block.isComplete || block.numEvents === 0) {
if (!block.isComplete) {
await indexer.processBlockAfterEvents(block.blockHash);
}
}

View File

@ -251,7 +251,7 @@ export class Database {
numEvents,
numProcessedEvents: 0,
lastProcessedEventIndex: -1,
isComplete: (numEvents === 0)
isComplete: false
});
const blockProgress = await blockRepo.save(entity);
@ -317,6 +317,12 @@ export class Database {
await repo.remove(entities);
}
async deleteEntitiesByConditions<Entity> (queryRunner: QueryRunner, entity: new () => Entity, findConditions: FindConditions<Entity>): Promise<void> {
const repo = queryRunner.manager.getRepository(entity);
await repo.delete(findConditions);
}
async getAncestorAtDepth (blockHash: string, depth: number): Promise<string> {
const heirerchicalQuery = `
WITH RECURSIVE cte_query AS

View File

@ -30,6 +30,8 @@ export class JobRunner {
_jobQueueConfig: JobQueueConfig
_blockProcessStartTime?: Date
_endBlockProcessTimer?: () => void
_shutDown = false
_signalCount = 0
constructor (jobQueueConfig: JobQueueConfig, indexer: IndexerInterface, jobQueue: JobQueue) {
this._indexer = indexer;
@ -78,6 +80,22 @@ export class JobRunner {
await this._jobQueue.markComplete(job);
}
handleShutdown () {
process.on('SIGINT', this._processShutdown.bind(this));
process.on('SIGTERM', this._processShutdown.bind(this));
}
async _processShutdown () {
this._shutDown = true;
this._signalCount++;
if (this._signalCount >= 3 || process.env.YARN_CHILD_PROCESS === 'true') {
// Forceful exit on receiving signal for the 3rd time or if job-runner is a child process of yarn.
this._jobQueue.stop();
process.exit(1);
}
}
async _pruneChain (job: any, syncStatus: SyncStatusInterface): Promise<void> {
console.time('time:job-runner#_pruneChain');
const { pruneBlockHeight } = job.data;
@ -267,6 +285,12 @@ export class JobRunner {
}
this._endBlockProcessTimer = lastBlockProcessDuration.startTimer();
if (this._shutDown) {
log(`Graceful shutdown after processing block ${block.blockNumber}`);
this._jobQueue.stop();
process.exit(0);
}
}
async _updateWatchedContracts (job: any): Promise<void> {