Move update sync status outside job-runner (#324)

This commit is contained in:
nikugogoi 2021-12-21 15:46:05 +05:30 committed by GitHub
parent 2bcf579859
commit 7b6f6e468f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 11 additions and 12 deletions

View File

@ -87,6 +87,8 @@ export const processBlockByNumber = async (
}
}
await indexer.updateSyncStatusChainHead(blocks[0].blockHash, blocks[0].blockNumber);
return;
}

View File

@ -77,7 +77,7 @@ export class EventWatcher {
const { onBlockProgressEvent: { blockNumber, isComplete } } = data;
if (isComplete) {
processBlockByNumber(this._jobQueue, this._indexer, blockDelayInMilliSecs, blockNumber + 1);
await processBlockByNumber(this._jobQueue, this._indexer, blockDelayInMilliSecs, blockNumber + 1);
}
}
}

View File

@ -9,7 +9,7 @@ import { In } from 'typeorm';
import { JobQueueConfig } from './config';
import { JOB_KIND_INDEX, JOB_KIND_PRUNE, JOB_KIND_EVENTS, JOB_KIND_CONTRACT, MAX_REORG_DEPTH, QUEUE_BLOCK_PROCESSING, QUEUE_EVENT_PROCESSING, UNKNOWN_EVENT_NAME } from './constants';
import { JobQueue } from './job-queue';
import { EventInterface, IndexerInterface } from './types';
import { EventInterface, IndexerInterface, SyncStatusInterface } from './types';
import { wait } from './misc';
import { createPruningJob } from './common';
import { OrderDirection } from './database';
@ -33,13 +33,16 @@ export class JobRunner {
async processBlock (job: any): Promise<void> {
const { data: { kind } } = job;
const syncStatus = await this._indexer.getSyncStatus();
assert(syncStatus);
switch (kind) {
case JOB_KIND_INDEX:
await this._indexBlock(job);
await this._indexBlock(job, syncStatus);
break;
case JOB_KIND_PRUNE:
await this._pruneChain(job);
await this._pruneChain(job, syncStatus);
break;
default:
@ -70,12 +73,9 @@ export class JobRunner {
await this._jobQueue.markComplete(job);
}
async _pruneChain (job: any): Promise<void> {
async _pruneChain (job: any, syncStatus: SyncStatusInterface): Promise<void> {
const { pruneBlockHeight } = job.data;
const syncStatus = await this._indexer.getSyncStatus();
assert(syncStatus);
log(`Processing chain pruning at ${pruneBlockHeight}`);
// Assert we're at a depth where pruning is safe.
@ -116,7 +116,7 @@ export class JobRunner {
}
}
async _indexBlock (job: any): Promise<void> {
async _indexBlock (job: any, syncStatus: SyncStatusInterface): Promise<void> {
const { data: { blockHash, blockNumber, parentHash, priority, timestamp } } = job;
const indexBlockStartTime = new Date();
@ -131,9 +131,6 @@ export class JobRunner {
this._blockProcessStartTime = indexBlockStartTime;
log(`Processing block number ${blockNumber} hash ${blockHash} `);
const syncStatus = await this._indexer.updateSyncStatusChainHead(blockHash, blockNumber);
assert(syncStatus);
// Check if chain pruning is caught up.
if ((syncStatus.latestIndexedBlockNumber - syncStatus.latestCanonicalBlockNumber) > MAX_REORG_DEPTH) {
await createPruningJob(this._jobQueue, syncStatus.latestCanonicalBlockNumber, priority);