From 7b6f6e468f48ae5c6ef90733e6fc837433359143 Mon Sep 17 00:00:00 2001 From: nikugogoi Date: Tue, 21 Dec 2021 15:46:05 +0530 Subject: [PATCH] Move update sync status outside job-runner (#324) --- packages/util/src/common.ts | 2 ++ packages/util/src/events.ts | 2 +- packages/util/src/job-runner.ts | 19 ++++++++----------- 3 files changed, 11 insertions(+), 12 deletions(-) diff --git a/packages/util/src/common.ts b/packages/util/src/common.ts index 8c9678b0..58275c49 100644 --- a/packages/util/src/common.ts +++ b/packages/util/src/common.ts @@ -87,6 +87,8 @@ export const processBlockByNumber = async ( } } + await indexer.updateSyncStatusChainHead(blocks[0].blockHash, blocks[0].blockNumber); + return; } diff --git a/packages/util/src/events.ts b/packages/util/src/events.ts index 0e5ac447..9c14af6e 100644 --- a/packages/util/src/events.ts +++ b/packages/util/src/events.ts @@ -77,7 +77,7 @@ export class EventWatcher { const { onBlockProgressEvent: { blockNumber, isComplete } } = data; if (isComplete) { - processBlockByNumber(this._jobQueue, this._indexer, blockDelayInMilliSecs, blockNumber + 1); + await processBlockByNumber(this._jobQueue, this._indexer, blockDelayInMilliSecs, blockNumber + 1); } } } diff --git a/packages/util/src/job-runner.ts b/packages/util/src/job-runner.ts index b8b44b5e..96e279a2 100644 --- a/packages/util/src/job-runner.ts +++ b/packages/util/src/job-runner.ts @@ -9,7 +9,7 @@ import { In } from 'typeorm'; import { JobQueueConfig } from './config'; import { JOB_KIND_INDEX, JOB_KIND_PRUNE, JOB_KIND_EVENTS, JOB_KIND_CONTRACT, MAX_REORG_DEPTH, QUEUE_BLOCK_PROCESSING, QUEUE_EVENT_PROCESSING, UNKNOWN_EVENT_NAME } from './constants'; import { JobQueue } from './job-queue'; -import { EventInterface, IndexerInterface } from './types'; +import { EventInterface, IndexerInterface, SyncStatusInterface } from './types'; import { wait } from './misc'; import { createPruningJob } from './common'; import { OrderDirection } from './database'; @@ -33,13 +33,16 @@ export class JobRunner { async processBlock (job: any): Promise { const { data: { kind } } = job; + const syncStatus = await this._indexer.getSyncStatus(); + assert(syncStatus); + switch (kind) { case JOB_KIND_INDEX: - await this._indexBlock(job); + await this._indexBlock(job, syncStatus); break; case JOB_KIND_PRUNE: - await this._pruneChain(job); + await this._pruneChain(job, syncStatus); break; default: @@ -70,12 +73,9 @@ export class JobRunner { await this._jobQueue.markComplete(job); } - async _pruneChain (job: any): Promise { + async _pruneChain (job: any, syncStatus: SyncStatusInterface): Promise { const { pruneBlockHeight } = job.data; - const syncStatus = await this._indexer.getSyncStatus(); - assert(syncStatus); - log(`Processing chain pruning at ${pruneBlockHeight}`); // Assert we're at a depth where pruning is safe. @@ -116,7 +116,7 @@ export class JobRunner { } } - async _indexBlock (job: any): Promise { + async _indexBlock (job: any, syncStatus: SyncStatusInterface): Promise { const { data: { blockHash, blockNumber, parentHash, priority, timestamp } } = job; const indexBlockStartTime = new Date(); @@ -131,9 +131,6 @@ export class JobRunner { this._blockProcessStartTime = indexBlockStartTime; log(`Processing block number ${blockNumber} hash ${blockHash} `); - const syncStatus = await this._indexer.updateSyncStatusChainHead(blockHash, blockNumber); - assert(syncStatus); - // Check if chain pruning is caught up. if ((syncStatus.latestIndexedBlockNumber - syncStatus.latestCanonicalBlockNumber) > MAX_REORG_DEPTH) { await createPruningJob(this._jobQueue, syncStatus.latestCanonicalBlockNumber, priority);