diff --git a/packages/codegen/src/templates/database-template.handlebars b/packages/codegen/src/templates/database-template.handlebars
index 159e52ed..5cf60b1f 100644
--- a/packages/codegen/src/templates/database-template.handlebars
+++ b/packages/codegen/src/templates/database-template.handlebars
@@ -253,6 +253,12 @@ export class Database implements DatabaseInterface {
     return this._baseDatabase.updateSyncStatusChainHead(repo, blockHash, blockNumber, force);
   }
 
+  async forceUpdateSyncStatus (queryRunner: QueryRunner, blockHash: string, blockNumber: number): Promise<SyncStatus> {
+    const repo = queryRunner.manager.getRepository(SyncStatus);
+
+    return this._baseDatabase.forceUpdateSyncStatus(repo, blockHash, blockNumber);
+  }
+
   async getSyncStatus (queryRunner: QueryRunner): Promise<SyncStatus | undefined> {
     const repo = queryRunner.manager.getRepository(SyncStatus);
 
@@ -271,6 +277,12 @@
     return this._baseDatabase.getBlocksAtHeight(repo, height, isPruned);
   }
 
+  async getLatestProcessedBlockProgress (isPruned: boolean): Promise<BlockProgress | undefined> {
+    const repo = this._conn.getRepository(BlockProgress);
+
+    return this._baseDatabase.getLatestProcessedBlockProgress(repo, isPruned);
+  }
+
   async markBlocksAsPruned (queryRunner: QueryRunner, blocks: BlockProgress[]): Promise<void> {
     const repo = queryRunner.manager.getRepository(BlockProgress);
 
diff --git a/packages/codegen/src/templates/indexer-template.handlebars b/packages/codegen/src/templates/indexer-template.handlebars
index 134cd9ac..e884f237 100644
--- a/packages/codegen/src/templates/indexer-template.handlebars
+++ b/packages/codegen/src/templates/indexer-template.handlebars
@@ -509,7 +509,7 @@ export class Indexer implements IndexerInterface {
     if (!this._serverConfig.enableState) {
       return;
     }
-    
+
     const dbTx = await this._db.createTransactionRunner();
     let res;
 
@@ -637,6 +637,10 @@
     return syncStatus;
   }
 
+  async forceUpdateSyncStatus (blockHash: string, blockNumber: number): Promise<SyncStatus> {
+    return this._baseIndexer.forceUpdateSyncStatus(blockHash, blockNumber);
+  }
+
   async getEvent (id: string): Promise<Event | undefined> {
     return this._baseIndexer.getEvent(id);
   }
@@ -653,6 +657,10 @@
     return this._baseIndexer.getBlocksAtHeight(height, isPruned);
   }
 
+  async getLatestProcessedBlockProgress (isPruned: boolean): Promise<BlockProgress | undefined> {
+    return this._db.getLatestProcessedBlockProgress(isPruned);
+  }
+
   async fetchEventsAndSaveBlocks (blocks: DeepPartial<BlockProgress>[]): Promise<{ blockProgress: BlockProgress, events: DeepPartial<Event>[] }[]> {
     return this._baseIndexer.fetchEventsAndSaveBlocks(blocks, this._eventSignaturesMap, this.parseEventNameAndArgs.bind(this));
   }
diff --git a/packages/codegen/src/templates/job-runner-template.handlebars b/packages/codegen/src/templates/job-runner-template.handlebars
index 41756141..46f5685c 100644
--- a/packages/codegen/src/templates/job-runner-template.handlebars
+++ b/packages/codegen/src/templates/job-runner-template.handlebars
@@ -34,6 +34,7 @@ export const main = async (): Promise<any> => {
 
   await jobRunnerCmd.exec(async (jobRunner: JobRunner): Promise<void> => {
     await jobRunner.subscribeBlockProcessingQueue();
+    await jobRunner.subscribeHistoricalProcessingQueue();
     await jobRunner.subscribeEventProcessingQueue();
     await jobRunner.subscribeBlockCheckpointQueue();
     await jobRunner.subscribeHooksQueue();
diff --git a/packages/graph-node/test/utils/indexer.ts b/packages/graph-node/test/utils/indexer.ts
index 41d469f5..6af710c3 100644
--- a/packages/graph-node/test/utils/indexer.ts
+++ b/packages/graph-node/test/utils/indexer.ts
@@ -93,6 +93,12 @@ export class Indexer implements IndexerInterface {
     return [];
   }
 
+  async getLatestProcessedBlockProgress (isPruned: boolean): Promise<BlockProgressInterface | undefined> {
+    assert(isPruned);
+
+    return undefined;
+  }
+
   async getBlockEvents (blockHash: string): Promise<Array<EventInterface>> {
     assert(blockHash);
 
@@ -150,6 +156,13 @@
     return {} as SyncStatusInterface;
   }
 
+  async forceUpdateSyncStatus (blockHash: string, blockNumber: number): Promise<SyncStatusInterface> {
+    assert(blockNumber);
+    assert(blockHash);
+
+    return {} as SyncStatusInterface;
+  }
+
   async markBlocksAsPruned (blocks: BlockProgressInterface[]): Promise<void> {
     assert(blocks);
 
diff --git a/packages/util/src/common.ts b/packages/util/src/common.ts
index 8860bec4..52803e98 100644
--- a/packages/util/src/common.ts
+++ b/packages/util/src/common.ts
@@ -129,7 +129,7 @@ export const fetchBlocksAtHeight = async (
       cid,
       blockHash,
       parentHash,
-      blockTimestamp: timestamp
+      blockTimestamp: Number(timestamp)
     });
   }
 
@@ -182,6 +182,9 @@ export const _fetchBatchBlocks = async (
 
   while (true) {
     console.time('time:common#fetchBatchBlocks-getBlocks');
+    // TODO: Fetch logs by filter before fetching blocks
+    // TODO: Fetch only blocks needed for returned logs
+    // TODO: Save blocks and logs to DB
     const blockPromises = blockNumbers.map(async blockNumber => indexer.getBlocks({ blockNumber }));
     const settledResults = await Promise.allSettled(blockPromises);
 
@@ -238,7 +241,7 @@
   }
 
   blocks.forEach(block => {
-    block.blockTimestamp = block.timestamp;
+    block.blockTimestamp = Number(block.timestamp);
     block.blockNumber = Number(block.blockNumber);
   });
 
@@ -265,7 +268,7 @@ export const processBatchEvents = async (indexer: IndexerInterface, block: Block
 
   if (indexer.processBlockAfterEvents) {
     if (!dbBlock.isComplete) {
-      await indexer.processBlockAfterEvents(block.blockHash, block.blockNumber);
+      await indexer.processBlockAfterEvents(dbBlock.blockHash, dbBlock.blockNumber);
     }
   }
 
diff --git a/packages/util/src/constants.ts b/packages/util/src/constants.ts
index a5b974cf..8861a660 100644
--- a/packages/util/src/constants.ts
+++ b/packages/util/src/constants.ts
@@ -6,6 +6,7 @@ export const MAX_REORG_DEPTH = 16;
 export const DIFF_MERGE_BATCH_SIZE = 10000;
 
 export const QUEUE_BLOCK_PROCESSING = 'block-processing';
+export const QUEUE_HISTORICAL_PROCESSING = 'historical-processing';
 export const QUEUE_EVENT_PROCESSING = 'event-processing';
 export const QUEUE_CHAIN_PRUNING = 'chain-pruning';
 export const QUEUE_BLOCK_CHECKPOINT = 'block-checkpoint';
diff --git a/packages/util/src/database.ts b/packages/util/src/database.ts
index bfc6ac8a..cae40647 100644
--- a/packages/util/src/database.ts
+++ b/packages/util/src/database.ts
@@ -166,6 +166,26 @@
     return await repo.save(entity);
   }
 
+  async forceUpdateSyncStatus (repo: Repository<SyncStatusInterface>, blockHash: string, blockNumber: number): Promise<SyncStatusInterface> {
+    let entity = await repo.findOne();
+
+    if (!entity) {
+      entity = repo.create({
+        initialIndexedBlockHash: blockHash,
+        initialIndexedBlockNumber: blockNumber
+      });
+    }
+
+    entity.chainHeadBlockHash = blockHash;
+    entity.chainHeadBlockNumber = blockNumber;
+    entity.latestCanonicalBlockHash = blockHash;
+    entity.latestCanonicalBlockNumber = blockNumber;
+    entity.latestIndexedBlockHash = blockHash;
+    entity.latestIndexedBlockNumber = blockNumber;
+
+    return await repo.save(entity);
+  }
+
   async getBlockProgress (repo: Repository<BlockProgressInterface>, blockHash: string): Promise<BlockProgressInterface | undefined> {
     return repo.findOne({ where: { blockHash } });
   }
@@ -182,6 +202,14 @@
       .getMany();
   }
 
+  async getLatestProcessedBlockProgress (repo: Repository<BlockProgressInterface>, isPruned: boolean): Promise<BlockProgressInterface | undefined> {
+    return repo.createQueryBuilder('block_progress')
+      .where('is_pruned = :isPruned AND is_complete = :isComplete', { isPruned, isComplete: true })
+      .orderBy('block_number', 'DESC')
+      .limit(1)
+      .getOne();
+  }
+
   async saveBlockProgress (repo: Repository<BlockProgressInterface>, block: DeepPartial<BlockProgressInterface>): Promise<BlockProgressInterface> {
     blockProgressCount.inc(1);
 
diff --git a/packages/util/src/events.ts b/packages/util/src/events.ts
index 6fbe092f..e0560595 100644
--- a/packages/util/src/events.ts
+++ b/packages/util/src/events.ts
@@ -5,12 +5,15 @@
 import assert from 'assert';
 import debug from 'debug';
 import { PubSub } from 'graphql-subscriptions';
+import PgBoss from 'pg-boss';
+import { constants } from 'ethers';
 
 import { JobQueue } from './job-queue';
 import { BlockProgressInterface, EventInterface, IndexerInterface, EthClient } from './types';
-import { MAX_REORG_DEPTH, JOB_KIND_PRUNE, JOB_KIND_INDEX, UNKNOWN_EVENT_NAME, JOB_KIND_EVENTS, QUEUE_BLOCK_PROCESSING, QUEUE_EVENT_PROCESSING } from './constants';
+import { MAX_REORG_DEPTH, JOB_KIND_PRUNE, JOB_KIND_INDEX, UNKNOWN_EVENT_NAME, JOB_KIND_EVENTS, QUEUE_BLOCK_PROCESSING, QUEUE_EVENT_PROCESSING, QUEUE_HISTORICAL_PROCESSING } from './constants';
 import { createPruningJob, processBlockByNumber } from './common';
 import { OrderDirection } from './database';
+import { HISTORICAL_BLOCKS_BATCH_SIZE, HistoricalJobData } from './job-runner';
 
 const EVENT = 'event';
 
@@ -26,6 +29,7 @@
   _shutDown = false;
   _signalCount = 0;
+  _historicalProcessingEndBlockNumber = 0;
 
   constructor (ethClient: EthClient, indexer: IndexerInterface, pubsub: PubSub, jobQueue: JobQueue) {
     this._ethClient = ethClient;
@@ -44,6 +48,7 @@
   async start (): Promise<void> {
     await this.initBlockProcessingOnCompleteHandler();
+    await this.initHistoricalProcessingOnCompleteHandler();
     await this.initEventProcessingOnCompleteHandler();
     this.startBlockProcessing();
@@ -57,6 +62,12 @@
     });
   }
 
+  async initHistoricalProcessingOnCompleteHandler (): Promise<void> {
+    this._jobQueue.onComplete(QUEUE_HISTORICAL_PROCESSING, async (job) => {
+      await this.historicalProcessingCompleteHandler(job);
+    });
+  }
+
   async initEventProcessingOnCompleteHandler (): Promise<void> {
     await this._jobQueue.onComplete(QUEUE_EVENT_PROCESSING, async (job) => {
       await this.eventProcessingCompleteHandler(job);
@@ -64,17 +75,44 @@
   }
 
   async startBlockProcessing (): Promise<void> {
-    const syncStatus = await this._indexer.getSyncStatus();
-    let startBlockNumber: number;
+    // Get latest block in chain and sync status from DB.
+    const [{ block: latestBlock }, syncStatus] = await Promise.all([
+      this._ethClient.getBlockByHash(),
+      this._indexer.getSyncStatus()
+    ]);
 
-    if (!syncStatus) {
-      // Get latest block in chain.
-      const { block: currentBlock } = await this._ethClient.getBlockByHash();
-      startBlockNumber = currentBlock.number;
-    } else {
+    const latestCanonicalBlockNumber = latestBlock.number - MAX_REORG_DEPTH;
+    let startBlockNumber = latestBlock.number;
+
+    if (syncStatus) {
       startBlockNumber = syncStatus.chainHeadBlockNumber + 1;
     }
 
+    // Check if starting block for watcher is before latest canonical block
+    if (startBlockNumber < latestCanonicalBlockNumber) {
+      await this.startHistoricalBlockProcessing(startBlockNumber, latestCanonicalBlockNumber);
+
+      return;
+    }
+
+    await this.startRealtimeBlockProcessing(startBlockNumber);
+  }
+
+  async startHistoricalBlockProcessing (startBlockNumber: number, endBlockNumber: number): Promise<void> {
+    this._historicalProcessingEndBlockNumber = endBlockNumber;
+    log(`Starting historical block processing up to block ${this._historicalProcessingEndBlockNumber}`);
+
+    // Push job for historical block processing
+    await this._jobQueue.pushJob(
+      QUEUE_HISTORICAL_PROCESSING,
+      {
+        blockNumber: startBlockNumber
+      }
+    );
+  }
+
+  async startRealtimeBlockProcessing (startBlockNumber: number): Promise<void> {
+    log(`Starting realtime block processing from block ${startBlockNumber}`);
 
     await processBlockByNumber(this._jobQueue, startBlockNumber);
 
     // Creating an AsyncIterable from AsyncIterator to iterate over the values.
@@ -139,6 +177,67 @@
     }
   }
 
+  async historicalProcessingCompleteHandler (job: PgBoss.Job): Promise<void> {
+    const { id, data: { failed, request: { data } } } = job;
+    const { blockNumber }: HistoricalJobData = data;
+
+    if (failed) {
+      log(`Job ${id} for queue ${QUEUE_HISTORICAL_PROCESSING} failed`);
+      return;
+    }
+
+    // TODO: Get batch size from config
+    const nextBatchStartBlockNumber = blockNumber + HISTORICAL_BLOCKS_BATCH_SIZE + 1;
+    log(`Historical block processing completed for block range: ${blockNumber} to ${nextBatchStartBlockNumber}`);
+
+    // Check if historical processing endBlock / latest canonical block is reached
+    if (nextBatchStartBlockNumber > this._historicalProcessingEndBlockNumber) {
+      let newSyncStatusBlock: {
+        blockNumber: number;
+        blockHash: string;
+      } | undefined;
+
+      // Fetch latest processed block from DB
+      const latestProcessedBlock = await this._indexer.getLatestProcessedBlockProgress(false);
+
+      if (latestProcessedBlock) {
+        if (latestProcessedBlock.blockNumber > this._historicalProcessingEndBlockNumber) {
+          // Set new sync status to latest processed block
+          newSyncStatusBlock = {
+            blockHash: latestProcessedBlock.blockHash,
+            blockNumber: latestProcessedBlock.blockNumber
+          };
+        }
+      }
+
+      if (!newSyncStatusBlock) {
+        const [block] = await this._indexer.getBlocks({ blockNumber: this._historicalProcessingEndBlockNumber });
+
+        newSyncStatusBlock = {
+          // At latestCanonicalBlockNumber height null block might be returned in case of FEVM
+          blockHash: block ? block.blockHash : constants.AddressZero,
+          blockNumber: this._historicalProcessingEndBlockNumber
+        };
+      }
+
+      // Update sync status to max of latest processed block or latest canonical block
+      const syncStatus = await this._indexer.forceUpdateSyncStatus(newSyncStatusBlock.blockHash, newSyncStatusBlock.blockNumber);
+      log(`Sync status canonical block updated to ${syncStatus.latestCanonicalBlockNumber}`);
+      // Start realtime processing
+      this.startBlockProcessing();
+
+      return;
+    }
+
+    // Push job for next batch of blocks
+    await this._jobQueue.pushJob(
+      QUEUE_HISTORICAL_PROCESSING,
+      {
+        blockNumber: nextBatchStartBlockNumber
+      }
+    );
+  }
+
   async eventProcessingCompleteHandler (job: any): Promise<void> {
     const { id, data: { request, failed, state, createdOn } } = job;
 
diff --git a/packages/util/src/fill.ts b/packages/util/src/fill.ts
index b4cd082e..8e5ed001 100644
--- a/packages/util/src/fill.ts
+++ b/packages/util/src/fill.ts
@@ -130,7 +130,13 @@
      const blockProgress = await indexer.getBlockProgress(blockHash);
 
      if (!blockProgress) {
-        await indexer.saveBlockAndFetchEvents({ cid, blockHash, blockNumber, parentHash, blockTimestamp: timestamp });
+        await indexer.saveBlockAndFetchEvents({
+          cid,
+          blockHash,
+          blockNumber: Number(blockNumber),
+          parentHash,
+          blockTimestamp: Number(timestamp)
+        });
      }
    });
 
diff --git a/packages/util/src/index-block.ts b/packages/util/src/index-block.ts
index 8969c504..b8ffdfd8 100644
--- a/packages/util/src/index-block.ts
+++ b/packages/util/src/index-block.ts
@@ -22,7 +22,8 @@
     const blocks = await indexer.getBlocks({ blockNumber: argv.block });
 
     blockProgressEntities = blocks.map((block: any): Partial<BlockProgressInterface> => {
-      block.blockTimestamp = block.timestamp;
+      block.blockTimestamp = Number(block.timestamp);
+      block.blockNumber = Number(block.blockNumber);
 
       return block;
     });
diff --git a/packages/util/src/indexer.ts b/packages/util/src/indexer.ts
index bced3783..663b0e7f 100644
--- a/packages/util/src/indexer.ts
+++ b/packages/util/src/indexer.ts
@@ -194,6 +194,23 @@
     return res;
   }
 
+  async forceUpdateSyncStatus (blockHash: string, blockNumber: number): Promise<SyncStatusInterface> {
+    const dbTx = await this._db.createTransactionRunner();
+    let res;
+
+    try {
+      res = await this._db.forceUpdateSyncStatus(dbTx, blockHash, blockNumber);
+      await dbTx.commitTransaction();
+    } catch (error) {
+      await dbTx.rollbackTransaction();
+      throw error;
+    } finally {
+      await dbTx.release();
+    }
+
+    return res;
+  }
+
   async getBlocks (blockFilter: { blockNumber?: number, blockHash?: string }): Promise<any> {
     assert(blockFilter.blockHash || blockFilter.blockNumber);
     const result = await this._ethClient.getBlocks(blockFilter);
diff --git a/packages/util/src/job-queue.ts b/packages/util/src/job-queue.ts
index 88a7d56c..febc5a13 100644
--- a/packages/util/src/job-queue.ts
+++ b/packages/util/src/job-queue.ts
@@ -13,7 +13,7 @@ interface Config {
   maxCompletionLag: number
 }
 
-type JobCallback = (job: any) => Promise<any>;
+type JobCallback = (job: PgBoss.JobWithDoneCallback<any, any>) => Promise<any>;
 
 const JOBS_PER_INTERVAL = 5;
 
@@ -93,7 +93,7 @@
         teamSize: JOBS_PER_INTERVAL,
         teamConcurrency: 1
       },
-      async (job: any) => {
+      async (job) => {
        try {
          log(`Processing queue ${queue} job ${job.id}...`);
          await callback(job);
@@ -114,7 +114,7 @@
         teamSize: JOBS_PER_INTERVAL,
         teamConcurrency: 1
       },
-      async (job: any) => {
+      async (job: PgBoss.JobWithDoneCallback<any, any>) => {
        try {
          const { id, data: { failed, createdOn } } = job;
          log(`Job onComplete for queue ${queue} job ${id} created ${createdOn} success ${!failed}`);
@@ -128,7 +128,7 @@
     );
   }
 
-  async markComplete (job: any): Promise<void> {
+  async markComplete (job: PgBoss.Job): Promise<void> {
     this._boss.complete(job.id);
   }
 
diff --git a/packages/util/src/job-runner.ts b/packages/util/src/job-runner.ts
index 9cc90e1b..aa1afadb 100644
--- a/packages/util/src/job-runner.ts
+++ b/packages/util/src/job-runner.ts
@@ -6,6 +6,7 @@ import assert from 'assert';
 import debug from 'debug';
 import { ethers } from 'ethers';
 import { DeepPartial, In } from 'typeorm';
+import PgBoss from 'pg-boss';
 
 import { JobQueueConfig } from './config';
 import {
@@ -17,7 +18,8 @@
   QUEUE_BLOCK_PROCESSING,
   QUEUE_EVENT_PROCESSING,
   QUEUE_BLOCK_CHECKPOINT,
-  QUEUE_HOOKS
+  QUEUE_HOOKS,
+  QUEUE_HISTORICAL_PROCESSING
 } from './constants';
 import { JobQueue } from './job-queue';
 import { BlockProgressInterface, EventInterface, IndexerInterface } from './types';
@@ -34,6 +36,16 @@ import { lastBlockNumEvents, lastBlockProcessDuration, lastProcessedBlockNumber
 
 const log = debug('vulcanize:job-runner');
 
+// Wait time for retrying events processing on error (in ms)
+const EVENTS_PROCESSING_RETRY_WAIT = 2000;
+
+// TODO: Get batch size from config
+export const HISTORICAL_BLOCKS_BATCH_SIZE = 100;
+
+export interface HistoricalJobData {
+  blockNumber: number;
+}
+
 export class JobRunner {
   jobQueue: JobQueue;
   _indexer: IndexerInterface;
@@ -57,6 +69,12 @@
     });
   }
 
+  async subscribeHistoricalProcessingQueue (): Promise<void> {
+    await this.jobQueue.subscribe(QUEUE_HISTORICAL_PROCESSING, async (job) => {
+      await this.processHistoricalBlocks(job);
+    });
+  }
+
   async subscribeEventProcessingQueue (): Promise<void> {
     await this.jobQueue.subscribe(QUEUE_EVENT_PROCESSING, async (job) => {
       await this.processEvent(job);
@@ -129,6 +147,28 @@
     await this.jobQueue.markComplete(job);
   }
 
+  async processHistoricalBlocks (job: PgBoss.JobWithDoneCallback<any, any>): Promise<void> {
+    const { data: { blockNumber: startBlock } } = job;
+    const endBlock = startBlock + HISTORICAL_BLOCKS_BATCH_SIZE;
+
+    log(`Processing historical blocks from ${startBlock} to ${endBlock}`);
+    // TODO: Use method from common.ts to fetch and save filtered logs and blocks
+    const blocks: BlockProgressInterface[] = [];
+
+    // Push event processing job for each block
+    const pushJobForBlockPromises = blocks.map(async block => this.jobQueue.pushJob(
+      QUEUE_EVENT_PROCESSING,
+      {
+        kind: JOB_KIND_EVENTS,
+        blockHash: block.blockHash,
+        publish: true
+      }
+    ));
+
+    await Promise.all(pushJobForBlockPromises);
+    await this.jobQueue.markComplete(job);
+  }
+
   async processEvent (job: any): Promise<void> {
     const { data: { kind } } = job;
 
@@ -360,8 +400,9 @@
     console.timeEnd('time:job-runner#_indexBlock-get-block-progress-entities');
 
     // Check if parent block has been processed yet, if not, push a high priority job to process that first and abort.
-    // However, don't go beyond the `latestCanonicalBlockHash` from SyncStatus as we have to assume the reorg can't be that deep.
-    if (blockHash !== syncStatus.latestCanonicalBlockHash) {
+    // However, don't go beyond the `latestCanonicalBlockNumber` from SyncStatus as we have to assume the reorg can't be that deep.
+    // latestCanonicalBlockNumber is used to handle null blocks in case of FEVM.
+    if (blockNumber > syncStatus.latestCanonicalBlockNumber) {
       // Create a higher priority job to index parent block and then abort.
       // We don't have to worry about aborting as this job will get retried later.
       const newPriority = (priority || 0) + 1;
@@ -382,9 +423,9 @@
         kind: JOB_KIND_INDEX,
         cid: parentCid,
         blockHash: parentHash,
-        blockNumber: parentBlockNumber,
+        blockNumber: Number(parentBlockNumber),
         parentHash: grandparentHash,
-        timestamp: parentTimestamp,
+        timestamp: Number(parentTimestamp),
         priority: newPriority
       }, { priority: newPriority });
 
@@ -418,8 +459,6 @@
         await this._indexer.removeUnknownEvents(parentBlock);
         console.timeEnd('time:job-runner#_indexBlock-remove-unknown-events');
       }
-    } else {
-      blockProgress = parentBlock;
     }
 
     if (!blockProgress) {
@@ -442,7 +481,9 @@
       }
     }
 
-    await this._indexer.processBlock(blockProgress);
+    if (!blockProgress.isComplete) {
+      await this._indexer.processBlock(blockProgress);
+    }
 
     // Push job to event processing queue.
     // Block with all events processed or no events will not be processed again due to check in _processEvents.
@@ -455,40 +496,62 @@
   async _processEvents (job: any): Promise<void> {
     const { blockHash } = job.data;
 
-    if (!this._blockAndEventsMap.has(blockHash)) {
-      console.time('time:job-runner#_processEvents-get-block-progress');
-      const block = await this._indexer.getBlockProgress(blockHash);
-      console.timeEnd('time:job-runner#_processEvents-get-block-progress');
+    try {
+      if (!this._blockAndEventsMap.has(blockHash)) {
+        console.time('time:job-runner#_processEvents-get-block-progress');
+        const block = await this._indexer.getBlockProgress(blockHash);
+        console.timeEnd('time:job-runner#_processEvents-get-block-progress');
 
-      assert(block);
-      this._blockAndEventsMap.set(blockHash, { block, events: [] });
-    }
+        assert(block);
+        this._blockAndEventsMap.set(blockHash, { block, events: [] });
+      }
 
-    const prefetchedBlock = this._blockAndEventsMap.get(blockHash);
-    assert(prefetchedBlock);
+      const prefetchedBlock = this._blockAndEventsMap.get(blockHash);
+      assert(prefetchedBlock);
 
-    const { block } = prefetchedBlock;
+      const { block } = prefetchedBlock;
 
-    console.time('time:job-runner#_processEvents-events');
-    await processBatchEvents(this._indexer, block, this._jobQueueConfig.eventsInBatch, this._jobQueueConfig.subgraphEventsOrder);
-    console.timeEnd('time:job-runner#_processEvents-events');
+      console.time('time:job-runner#_processEvents-events');
+      await processBatchEvents(this._indexer, block, this._jobQueueConfig.eventsInBatch, this._jobQueueConfig.subgraphEventsOrder);
+      console.timeEnd('time:job-runner#_processEvents-events');
 
-    // Update metrics
-    lastProcessedBlockNumber.set(block.blockNumber);
-    lastBlockNumEvents.set(block.numEvents);
+      // Update metrics
+      lastProcessedBlockNumber.set(block.blockNumber);
+      lastBlockNumEvents.set(block.numEvents);
 
-    this._blockAndEventsMap.delete(block.blockHash);
+      this._blockAndEventsMap.delete(block.blockHash);
 
-    if (this._endBlockProcessTimer) {
-      this._endBlockProcessTimer();
-    }
+      if (this._endBlockProcessTimer) {
+        this._endBlockProcessTimer();
+      }
 
-    this._endBlockProcessTimer = lastBlockProcessDuration.startTimer();
+      this._endBlockProcessTimer = lastBlockProcessDuration.startTimer();
 
-    if (this._shutDown) {
-      log(`Graceful shutdown after processing block ${block.blockNumber}`);
-      this.jobQueue.stop();
-      process.exit(0);
+      if (this._shutDown) {
+        log(`Graceful shutdown after processing block ${block.blockNumber}`);
+        this.jobQueue.stop();
+        process.exit(0);
+      }
+    } catch (error) {
+      log(`Error in processing events for block ${blockHash}`);
+      log(error);
+
+      // TODO: Remove processed entities for current block to avoid reprocessing of events
+
+      // Catch event processing error and push to job queue after some time with higher priority
+      log(`Retrying event processing after ${EVENTS_PROCESSING_RETRY_WAIT} ms`);
+      await wait(EVENTS_PROCESSING_RETRY_WAIT);
+      await this.jobQueue.pushJob(
+        QUEUE_EVENT_PROCESSING,
+        {
+          kind: JOB_KIND_EVENTS,
+          blockHash: blockHash,
+          publish: true
+        },
+        {
+          priority: 1
+        }
+      );
     }
   }
diff --git a/packages/util/src/types.ts b/packages/util/src/types.ts
index 89c73b5b..c41b462b 100644
--- a/packages/util/src/types.ts
+++ b/packages/util/src/types.ts
@@ -91,6 +91,7 @@ export interface IndexerInterface {
   getStateSyncStatus (): Promise<StateSyncStatusInterface | undefined>
   getBlocks (blockFilter: { blockHash?: string, blockNumber?: number }): Promise<any>
   getBlocksAtHeight (height: number, isPruned: boolean): Promise<BlockProgressInterface[]>
+  getLatestProcessedBlockProgress (isPruned: boolean): Promise<BlockProgressInterface | undefined>
   getLatestCanonicalBlock (): Promise<BlockProgressInterface>
   getLatestStateIndexedBlock (): Promise<BlockProgressInterface>
   getBlockEvents (blockHash: string, where: Where, queryOptions: QueryOptions): Promise<Array<EventInterface>>
@@ -102,6 +103,7 @@
   updateSyncStatusChainHead (blockHash: string, blockNumber: number, force?: boolean): Promise<SyncStatusInterface>
   updateSyncStatusIndexedBlock (blockHash: string, blockNumber: number, force?: boolean): Promise<SyncStatusInterface>
   updateSyncStatusCanonicalBlock (blockHash: string, blockNumber: number, force?: boolean): Promise<SyncStatusInterface>
+  forceUpdateSyncStatus (blockHash: string, blockNumber: number): Promise<SyncStatusInterface>
   updateStateSyncStatusIndexedBlock (blockNumber: number, force?: boolean): Promise<StateSyncStatusInterface>
   updateStateSyncStatusCheckpointBlock (blockNumber: number, force?: boolean): Promise<StateSyncStatusInterface>
   markBlocksAsPruned (blocks: BlockProgressInterface[]): Promise<void>
@@ -164,6 +166,7 @@
   updateSyncStatusIndexedBlock (queryRunner: QueryRunner, blockHash: string, blockNumber: number, force?: boolean): Promise<SyncStatusInterface>;
   updateSyncStatusChainHead (queryRunner: QueryRunner, blockHash: string, blockNumber: number, force?: boolean): Promise<SyncStatusInterface>;
   updateSyncStatusCanonicalBlock (queryRunner: QueryRunner, blockHash: string, blockNumber: number, force?: boolean): Promise<SyncStatusInterface>;
+  forceUpdateSyncStatus (queryRunner: QueryRunner, blockHash: string, blockNumber: number): Promise<SyncStatusInterface>;
   saveEvents (queryRunner: QueryRunner, events: DeepPartial<EventInterface>[]): Promise<void>;
   saveBlockWithEvents (queryRunner: QueryRunner, block: DeepPartial<BlockProgressInterface>, events: DeepPartial<EventInterface>[]): Promise<BlockProgressInterface>;
   saveEventEntity (queryRunner: QueryRunner, entity: EventInterface): Promise<EventInterface>;