Fix job-runner stop and restart in uni-watcher (#319)

* Fix job-runner stop and restart in uni-watcher

* Fix blocks processed twice after processing missing parent block
This commit is contained in:
nikugogoi 2021-12-17 11:57:09 +05:30 committed by GitHub
parent d087667177
commit cfd293f11b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 85 additions and 65 deletions

View File

@ -70,6 +70,8 @@ export const handler = async (argv: any): Promise<void> => {
await indexer.updateSyncStatusCanonicalBlock(blockProgress.blockHash, blockProgress.blockNumber, true);
}
await indexer.updateSyncStatusChainHead(blockProgress.blockHash, blockProgress.blockNumber, true);
dbTx.commitTransaction();
} catch (error) {
await dbTx.rollbackTransaction();

View File

@ -132,10 +132,10 @@ export class Database {
return this._baseDatabase.updateSyncStatusCanonicalBlock(repo, blockHash, blockNumber, force);
}
async updateSyncStatusChainHead (queryRunner: QueryRunner, blockHash: string, blockNumber: number): Promise<SyncStatus> {
async updateSyncStatusChainHead (queryRunner: QueryRunner, blockHash: string, blockNumber: number, force = false): Promise<SyncStatus> {
const repo = queryRunner.manager.getRepository(SyncStatus);
return this._baseDatabase.updateSyncStatusChainHead(repo, blockHash, blockNumber);
return this._baseDatabase.updateSyncStatusChainHead(repo, blockHash, blockNumber, force);
}
async getSyncStatus (queryRunner: QueryRunner): Promise<SyncStatus | undefined> {

View File

@ -320,8 +320,8 @@ export class Indexer {
return this._baseIndexer.updateSyncStatusIndexedBlock(blockHash, blockNumber, force);
}
async updateSyncStatusChainHead (blockHash: string, blockNumber: number): Promise<SyncStatus> {
return this._baseIndexer.updateSyncStatusChainHead(blockHash, blockNumber);
async updateSyncStatusChainHead (blockHash: string, blockNumber: number, force = false): Promise<SyncStatus> {
return this._baseIndexer.updateSyncStatusChainHead(blockHash, blockNumber, force);
}
async updateSyncStatusCanonicalBlock (blockHash: string, blockNumber: number, force = false): Promise<SyncStatus> {

View File

@ -88,6 +88,8 @@ export const handler = async (argv: any): Promise<void> => {
await indexer.updateSyncStatusCanonicalBlock(blockProgress.blockHash, blockProgress.blockNumber, true);
}
await indexer.updateSyncStatusChainHead(blockProgress.blockHash, blockProgress.blockNumber, true);
dbTx.commitTransaction();
} catch (error) {
await dbTx.rollbackTransaction();

View File

@ -628,10 +628,10 @@ export class Database implements DatabaseInterface {
return this._baseDatabase.updateSyncStatusCanonicalBlock(repo, blockHash, blockNumber, force);
}
async updateSyncStatusChainHead (queryRunner: QueryRunner, blockHash: string, blockNumber: number): Promise<SyncStatus> {
async updateSyncStatusChainHead (queryRunner: QueryRunner, blockHash: string, blockNumber: number, force = false): Promise<SyncStatus> {
const repo = queryRunner.manager.getRepository(SyncStatus);
return this._baseDatabase.updateSyncStatusChainHead(repo, blockHash, blockNumber);
return this._baseDatabase.updateSyncStatusChainHead(repo, blockHash, blockNumber, force);
}
async getSyncStatus (queryRunner: QueryRunner): Promise<SyncStatus | undefined> {

View File

@ -324,8 +324,8 @@ export class Indexer implements IndexerInterface {
return this._baseIndexer.updateSyncStatusIndexedBlock(blockHash, blockNumber, force);
}
async updateSyncStatusChainHead (blockHash: string, blockNumber: number): Promise<SyncStatus> {
return this._baseIndexer.updateSyncStatusChainHead(blockHash, blockNumber);
async updateSyncStatusChainHead (blockHash: string, blockNumber: number, force = false): Promise<SyncStatus> {
return this._baseIndexer.updateSyncStatusChainHead(blockHash, blockNumber, force);
}
async updateSyncStatusCanonicalBlock (blockHash: string, blockNumber: number, force = false): Promise<SyncStatus> {

View File

@ -64,6 +64,8 @@ export const handler = async (argv: any): Promise<void> => {
await indexer.updateSyncStatusCanonicalBlock(blockProgress.blockHash, blockProgress.blockNumber, true);
}
await indexer.updateSyncStatusChainHead(blockProgress.blockHash, blockProgress.blockNumber, true);
dbTx.commitTransaction();
} catch (error) {
await dbTx.rollbackTransaction();

View File

@ -103,10 +103,10 @@ export class Database implements DatabaseInterface {
return this._baseDatabase.updateSyncStatusCanonicalBlock(repo, blockHash, blockNumber, force);
}
async updateSyncStatusChainHead (queryRunner: QueryRunner, blockHash: string, blockNumber: number): Promise<SyncStatus> {
async updateSyncStatusChainHead (queryRunner: QueryRunner, blockHash: string, blockNumber: number, force = false): Promise<SyncStatus> {
const repo = queryRunner.manager.getRepository(SyncStatus);
return this._baseDatabase.updateSyncStatusChainHead(repo, blockHash, blockNumber);
return this._baseDatabase.updateSyncStatusChainHead(repo, blockHash, blockNumber, force);
}
async getSyncStatus (queryRunner: QueryRunner): Promise<SyncStatus | undefined> {

View File

@ -384,8 +384,8 @@ export class Indexer implements IndexerInterface {
return this._baseIndexer.updateSyncStatusIndexedBlock(blockHash, blockNumber, force);
}
async updateSyncStatusChainHead (blockHash: string, blockNumber: number): Promise<SyncStatus> {
return this._baseIndexer.updateSyncStatusChainHead(blockHash, blockNumber);
async updateSyncStatusChainHead (blockHash: string, blockNumber: number, force = false): Promise<SyncStatus> {
return this._baseIndexer.updateSyncStatusChainHead(blockHash, blockNumber, force);
}
async updateSyncStatusCanonicalBlock (blockHash: string, blockNumber: number, force = false): Promise<SyncStatus> {

View File

@ -9,9 +9,9 @@
"decimal.js": "^10.3.1",
"ethers": "^5.2.0",
"fs-extra": "^10.0.0",
"lodash": "^4.17.21",
"pg-boss": "^6.1.0",
"toml": "^3.0.0",
"lodash": "^4.17.21"
"toml": "^3.0.0"
},
"devDependencies": {
"@types/fs-extra": "^9.0.11",

View File

@ -46,9 +46,16 @@ export const processBlockByNumber = async (
): Promise<void> => {
log(`Process block ${blockNumber}`);
while (true) {
const blockProgressEntities = await indexer.getBlocksAtHeight(blockNumber, false);
console.time('time:common#processBlockByNumber-get-blockProgress-syncStatus');
const [blockProgressEntities, syncStatus] = await Promise.all([
indexer.getBlocksAtHeight(blockNumber, false),
indexer.getSyncStatus()
]);
console.timeEnd('time:common#processBlockByNumber-get-blockProgress-syncStatus');
while (true) {
let blocks = blockProgressEntities.map((block: any) => {
block.timestamp = block.blockTimestamp;
@ -65,12 +72,8 @@ export const processBlockByNumber = async (
for (let bi = 0; bi < blocks.length; bi++) {
const { blockHash, blockNumber, parentHash, timestamp } = blocks[bi];
console.time('time:common#processBlockByNumber-updateSyncStatusChainHead');
const syncStatus = await indexer.updateSyncStatusChainHead(blockHash, blockNumber);
console.timeEnd('time:common#processBlockByNumber-updateSyncStatusChainHead');
// Stop old blocks from getting pushed to job queue. They are already retried after fail.
if (syncStatus.latestIndexedBlockNumber < blockNumber) {
// Stop blocks already pushed to job queue. They are already retried after fail.
if (!syncStatus || syncStatus.chainHeadBlockNumber < blockNumber) {
await jobQueue.pushJob(QUEUE_BLOCK_PROCESSING, { kind: JOB_KIND_INDEX, blockHash, blockNumber, parentHash, timestamp });
}
}

View File

@ -125,7 +125,7 @@ export class Database {
return await repo.save(entity);
}
async updateSyncStatusChainHead (repo: Repository<SyncStatusInterface>, blockHash: string, blockNumber: number): Promise<SyncStatusInterface> {
async updateSyncStatusChainHead (repo: Repository<SyncStatusInterface>, blockHash: string, blockNumber: number, force = false): Promise<SyncStatusInterface> {
let entity = await repo.findOne();
if (!entity) {
entity = repo.create({
@ -138,7 +138,7 @@ export class Database {
});
}
if (blockNumber >= entity.chainHeadBlockNumber) {
if (force || blockNumber >= entity.chainHeadBlockNumber) {
entity.chainHeadBlockHash = blockHash;
entity.chainHeadBlockNumber = blockNumber;
}

View File

@ -50,19 +50,19 @@ export class EventWatcher {
async startBlockProcessing (): Promise<void> {
const syncStatus = await this._indexer.getSyncStatus();
let blockNumber;
let startBlockNumber;
if (!syncStatus) {
// Get latest block in chain.
const { block: currentBlock } = await this._ethClient.getBlockByHash();
blockNumber = currentBlock.number + 1;
startBlockNumber = currentBlock.number;
} else {
blockNumber = syncStatus.latestIndexedBlockNumber + 1;
startBlockNumber = syncStatus.chainHeadBlockNumber + 1;
}
const { ethServer: { blockDelayInMilliSecs } } = this._upstreamConfig;
processBlockByNumber(this._jobQueue, this._indexer, blockDelayInMilliSecs, blockNumber + 1);
processBlockByNumber(this._jobQueue, this._indexer, blockDelayInMilliSecs, startBlockNumber);
// Creating an AsyncIterable from AsyncIterator to iterate over the values.
// https://www.codementor.io/@tiagolopesferreira/asynchronous-iterators-in-javascript-jl1yg8la1#for-wait-of
@ -148,20 +148,19 @@ export class EventWatcher {
const [blockProgress, syncStatus] = await Promise.all([
this._indexer.getBlockProgress(blockHash),
// Update sync progress.
this._indexer.updateSyncStatusIndexedBlock(blockHash, blockNumber)
]);
// Create pruning job if required.
if (syncStatus && syncStatus.latestIndexedBlockNumber > (syncStatus.latestCanonicalBlockNumber + MAX_REORG_DEPTH)) {
await createPruningJob(this._jobQueue, syncStatus.latestCanonicalBlockNumber, priority);
}
// Publish block progress event if no events exist.
// Event for blocks with events will be pusblished from eventProcessingCompleteHandler.
if (blockProgress && blockProgress.numEvents === 0) {
await this.publishBlockProgressToSubscribers(blockProgress);
}
// Create pruning job if required.
if (syncStatus && syncStatus.latestIndexedBlockNumber > (syncStatus.latestCanonicalBlockNumber + MAX_REORG_DEPTH)) {
await createPruningJob(this._jobQueue, syncStatus.latestCanonicalBlockNumber, priority);
}
}
async _handlePruningComplete (jobData: any): Promise<void> {

View File

@ -29,23 +29,19 @@ export const fillBlocks = async (
const syncStatus = await indexer.getSyncStatus();
if (prefetch) {
if (syncStatus) {
if (startBlock > syncStatus.chainHeadBlockNumber + 1) {
throw new Error(`Missing blocks between startBlock ${startBlock} and chainHeadBlockNumber ${syncStatus.chainHeadBlockNumber}`);
}
startBlock = syncStatus.chainHeadBlockNumber + 1;
}
if (prefetch) {
await prefetchBlocks(indexer, blockDelayInMilliSecs, { startBlock, endBlock, batchBlocks });
return;
}
if (syncStatus && syncStatus.latestIndexedBlockNumber > -1) {
if (startBlock > syncStatus.latestIndexedBlockNumber + 1) {
throw new Error(`Missing blocks between startBlock ${startBlock} and latestIndexedBlockNumber ${syncStatus.latestIndexedBlockNumber}`);
}
startBlock = syncStatus.latestIndexedBlockNumber + 1;
}
await eventWatcher.initBlockProcessingOnCompleteHandler();
await eventWatcher.initEventProcessingOnCompleteHandler();
@ -135,7 +131,7 @@ const prefetchBlocks = async (
await Promise.all(fetchBlockPromises);
} catch (error: any) {
log(error.message);
log('Exiting gracefully');
log('Exiting as upstream block not available for prefetch');
process.exit(0);
}
}

View File

@ -91,12 +91,12 @@ export class Indexer {
return res;
}
async updateSyncStatusChainHead (blockHash: string, blockNumber: number): Promise<SyncStatusInterface> {
async updateSyncStatusChainHead (blockHash: string, blockNumber: number, force = false): Promise<SyncStatusInterface> {
const dbTx = await this._db.createTransactionRunner();
let res;
try {
res = await this._db.updateSyncStatusChainHead(dbTx, blockHash, blockNumber);
res = await this._db.updateSyncStatusChainHead(dbTx, blockHash, blockNumber, force);
await dbTx.commitTransaction();
} catch (error) {
await dbTx.rollbackTransaction();

View File

@ -34,6 +34,7 @@ export class JobQueue {
retryDelay: 1,
retryBackoff: true,
// Time before active job fails by expiration.
expireInHours: 24 * 7, // 7 days
retentionDays: 30, // 30 days

View File

@ -9,7 +9,7 @@ import { In } from 'typeorm';
import { JobQueueConfig } from './config';
import { JOB_KIND_INDEX, JOB_KIND_PRUNE, JOB_KIND_EVENTS, JOB_KIND_CONTRACT, MAX_REORG_DEPTH, QUEUE_BLOCK_PROCESSING, QUEUE_EVENT_PROCESSING, UNKNOWN_EVENT_NAME } from './constants';
import { JobQueue } from './job-queue';
import { EventInterface, IndexerInterface, SyncStatusInterface } from './types';
import { EventInterface, IndexerInterface } from './types';
import { wait } from './misc';
import { createPruningJob } from './common';
import { OrderDirection } from './database';
@ -32,16 +32,13 @@ export class JobRunner {
async processBlock (job: any): Promise<void> {
const { data: { kind } } = job;
const syncStatus = await this._indexer.getSyncStatus();
assert(syncStatus);
switch (kind) {
case JOB_KIND_INDEX:
await this._indexBlock(job, syncStatus);
await this._indexBlock(job);
break;
case JOB_KIND_PRUNE:
await this._pruneChain(job, syncStatus);
await this._pruneChain(job);
break;
default:
@ -72,9 +69,12 @@ export class JobRunner {
await this._jobQueue.markComplete(job);
}
async _pruneChain (job: any, syncStatus: SyncStatusInterface): Promise<void> {
async _pruneChain (job: any): Promise<void> {
const { pruneBlockHeight } = job.data;
const syncStatus = await this._indexer.getSyncStatus();
assert(syncStatus);
log(`Processing chain pruning at ${pruneBlockHeight}`);
// Assert we're at a depth where pruning is safe.
@ -115,11 +115,15 @@ export class JobRunner {
}
}
async _indexBlock (job: any, syncStatus: SyncStatusInterface): Promise<void> {
console.time('time:job-runner#_indexBlock');
async _indexBlock (job: any): Promise<void> {
const indexBlockStartTime = new Date();
const { data: { blockHash, blockNumber, parentHash, priority, timestamp } } = job;
log(`Processing block number ${blockNumber} hash ${blockHash} `);
const syncStatus = await this._indexer.updateSyncStatusChainHead(blockHash, blockNumber);
assert(syncStatus);
// Check if chain pruning is caught up.
if ((syncStatus.latestIndexedBlockNumber - syncStatus.latestCanonicalBlockNumber) > MAX_REORG_DEPTH) {
await createPruningJob(this._jobQueue, syncStatus.latestCanonicalBlockNumber, priority);
@ -143,6 +147,10 @@ export class JobRunner {
// Check if parent block has been processed yet, if not, push a high priority job to process that first and abort.
// However, don't go beyond the `latestCanonicalBlockHash` from SyncStatus as we have to assume the reorg can't be that deep.
if (blockHash !== syncStatus.latestCanonicalBlockHash) {
// Create a higher priority job to index parent block and then abort.
// We don't have to worry about aborting as this job will get retried later.
const newPriority = (priority || 0) + 1;
if (!parentBlock || parentBlock.blockHash !== parentHash) {
const blocks = await this._indexer.getBlocks({ blockHash: parentHash });
@ -155,9 +163,6 @@ export class JobRunner {
const [{ blockNumber: parentBlockNumber, parentHash: grandparentHash, timestamp: parentTimestamp }] = blocks;
// Create a higher priority job to index parent block and then abort.
// We don't have to worry about aborting as this job will get retried later.
const newPriority = (priority || 0) + 1;
await this._jobQueue.pushJob(QUEUE_BLOCK_PROCESSING, {
kind: JOB_KIND_INDEX,
blockHash: parentHash,
@ -178,6 +183,15 @@ export class JobRunner {
const message = `Indexing incomplete for parent block number ${parentBlock.blockNumber} hash ${parentHash} of block number ${blockNumber} hash ${blockHash}, aborting`;
log(message);
await this._jobQueue.pushJob(QUEUE_BLOCK_PROCESSING, {
kind: JOB_KIND_INDEX,
blockHash: parentHash,
blockNumber: parentBlock.blockNumber,
parentHash: parentBlock.parentHash,
timestamp: parentBlock.blockTimestamp,
priority: newPriority
}, { priority: newPriority });
throw new Error(message);
}
} else {
@ -192,12 +206,13 @@ export class JobRunner {
blockProgress = await this._indexer.fetchBlockEvents({ blockHash, blockNumber, parentHash, blockTimestamp: timestamp });
}
// Check if block is being already processed.
if (blockProgress.numProcessedEvents === 0 && blockProgress.numEvents) {
// Check if block has unprocessed events.
if (blockProgress.numProcessedEvents < blockProgress.numEvents) {
await this._jobQueue.pushJob(QUEUE_EVENT_PROCESSING, { kind: JOB_KIND_EVENTS, blockHash: blockProgress.blockHash, publish: true });
}
console.timeEnd('time:job-runner#_indexBlock');
const indexBlockDuration = new Date().getTime() - indexBlockStartTime.getTime();
log(`time:job-runner#_indexBlock: ${indexBlockDuration}ms`);
}
async _processEvents (job: any): Promise<void> {

View File

@ -61,7 +61,7 @@ export interface IndexerInterface {
fetchBlockEvents (block: DeepPartial<BlockProgressInterface>): Promise<BlockProgressInterface>
removeUnknownEvents (block: BlockProgressInterface): Promise<void>
updateBlockProgress (block: BlockProgressInterface, lastProcessedEventIndex: number): Promise<BlockProgressInterface>
updateSyncStatusChainHead (blockHash: string, blockNumber: number): Promise<SyncStatusInterface>
updateSyncStatusChainHead (blockHash: string, blockNumber: number, force?: boolean): Promise<SyncStatusInterface>
updateSyncStatusIndexedBlock (blockHash: string, blockNumber: number, force?: boolean): Promise<SyncStatusInterface>
updateSyncStatusCanonicalBlock (blockHash: string, blockNumber: number, force?: boolean): Promise<SyncStatusInterface>
markBlocksAsPruned (blocks: BlockProgressInterface[]): Promise<void>;
@ -93,7 +93,7 @@ export interface DatabaseInterface {
markBlocksAsPruned (queryRunner: QueryRunner, blocks: BlockProgressInterface[]): Promise<void>;
updateBlockProgress (queryRunner: QueryRunner, block: BlockProgressInterface, lastProcessedEventIndex: number): Promise<BlockProgressInterface>
updateSyncStatusIndexedBlock (queryRunner: QueryRunner, blockHash: string, blockNumber: number, force?: boolean): Promise<SyncStatusInterface>;
updateSyncStatusChainHead (queryRunner: QueryRunner, blockHash: string, blockNumber: number): Promise<SyncStatusInterface>;
updateSyncStatusChainHead (queryRunner: QueryRunner, blockHash: string, blockNumber: number, force?: boolean): Promise<SyncStatusInterface>;
updateSyncStatusCanonicalBlock (queryRunner: QueryRunner, blockHash: string, blockNumber: number, force?: boolean): Promise<SyncStatusInterface>;
saveEvents (queryRunner: QueryRunner, block: DeepPartial<BlockProgressInterface>, events: DeepPartial<EventInterface>[]): Promise<BlockProgressInterface>;
saveEventEntity (queryRunner: QueryRunner, entity: EventInterface): Promise<EventInterface>;