From f2c5f6777770f0d635bf0abd61ba5a25bf56eded Mon Sep 17 00:00:00 2001 From: Nabarun Gogoi Date: Wed, 8 Nov 2023 14:28:53 +0530 Subject: [PATCH] Handle restarts during historical processing in watcher (#455) * Reset to latest processed block on restarting job-runner * Update sync status during historical processing in job-runner * Codegen changes * Use sync status latest processed block for subgraph _meta GQL query * Set job per interval for subscribing events queue to 1 * Fix events processing skipped for blocks after template create --- packages/cli/src/job-runner.ts | 3 +- .../codegen/src/data/entities/SyncStatus.yaml | 11 ++ packages/codegen/src/schema.ts | 18 ++- .../templates/database-template.handlebars | 6 +- .../src/templates/indexer-template.handlebars | 6 +- packages/graph-node/test/utils/indexer.ts | 7 +- packages/rpc-eth-client/src/eth-client.ts | 4 +- packages/util/src/common.ts | 8 +- packages/util/src/database.ts | 29 ++-- packages/util/src/events.ts | 30 +---- packages/util/src/indexer.ts | 59 +++++---- packages/util/src/job-queue.ts | 29 +++- packages/util/src/job-runner.ts | 124 ++++++++++++------ packages/util/src/types.ts | 10 +- 14 files changed, 207 insertions(+), 137 deletions(-) diff --git a/packages/cli/src/job-runner.ts b/packages/cli/src/job-runner.ts index 1cd9814f..10cb39dc 100644 --- a/packages/cli/src/job-runner.ts +++ b/packages/cli/src/job-runner.ts @@ -114,7 +114,8 @@ export class JobRunnerCmd { // Delete all active and pending (before completed) jobs to start job-runner without old queued jobs await jobRunner.jobQueue.deleteAllJobs('completed'); - await jobRunner.resetToPrevIndexedBlock(); + + await jobRunner.resetToLatestProcessedBlock(); await indexer.updateSyncStatusIndexingError(false); await startJobRunner(jobRunner); diff --git a/packages/codegen/src/data/entities/SyncStatus.yaml b/packages/codegen/src/data/entities/SyncStatus.yaml index 1fb487f7..ce2b7a4d 100644 --- a/packages/codegen/src/data/entities/SyncStatus.yaml +++ b/packages/codegen/src/data/entities/SyncStatus.yaml @@ -27,6 +27,17 @@ columns: pgType: integer tsType: number columnType: Column + - name: latestProcessedBlockHash + pgType: varchar + tsType: string + columnType: Column + columnOptions: + - option: length + value: 66 + - name: latestProcessedBlockNumber + pgType: integer + tsType: number + columnType: Column - name: latestCanonicalBlockHash pgType: varchar tsType: string diff --git a/packages/codegen/src/schema.ts b/packages/codegen/src/schema.ts index 4d372256..88a6f146 100644 --- a/packages/codegen/src/schema.ts +++ b/packages/codegen/src/schema.ts @@ -460,16 +460,28 @@ export class Schema { } _addMeta (): void { - const typeComposer = this._composer.createObjectTC({ + // Create the Block type. + const metaBlocktypeComposer = this._composer.createObjectTC({ + name: '_MetaBlock_', + fields: { + hash: 'Bytes', + number: 'Int!', + timestamp: 'Int' + } + }); + + this._composer.addSchemaMustHaveType(metaBlocktypeComposer); + + const metaTypeComposer = this._composer.createObjectTC({ name: '_Meta_', fields: { - block: this._composer.getOTC('_Block_').NonNull, + block: metaBlocktypeComposer.NonNull, deployment: { type: new GraphQLNonNull(GraphQLString) }, hasIndexingErrors: { type: new GraphQLNonNull(GraphQLBoolean) } } }); - this._composer.addSchemaMustHaveType(typeComposer); + this._composer.addSchemaMustHaveType(metaTypeComposer); this._composer.Query.addFields({ _meta: { diff --git a/packages/codegen/src/templates/database-template.handlebars b/packages/codegen/src/templates/database-template.handlebars index 760e8827..d2435f03 100644 --- a/packages/codegen/src/templates/database-template.handlebars +++ b/packages/codegen/src/templates/database-template.handlebars @@ -253,13 +253,13 @@ export class Database implements DatabaseInterface { return this._baseDatabase.updateSyncStatusChainHead(repo, blockHash, blockNumber, force); } - async forceUpdateSyncStatus (queryRunner: QueryRunner, blockHash: string, blockNumber: number): Promise { + async updateSyncStatusProcessedBlock (queryRunner: QueryRunner, blockHash: string, blockNumber: number, force = false): Promise { const repo = queryRunner.manager.getRepository(SyncStatus); - return this._baseDatabase.forceUpdateSyncStatus(repo, blockHash, blockNumber); + return this._baseDatabase.updateSyncStatusProcessedBlock(repo, blockHash, blockNumber, force); } - async updateSyncStatusIndexingError (queryRunner: QueryRunner, hasIndexingError: boolean): Promise { + async updateSyncStatusIndexingError (queryRunner: QueryRunner, hasIndexingError: boolean): Promise { const repo = queryRunner.manager.getRepository(SyncStatus); return this._baseDatabase.updateSyncStatusIndexingError(repo, hasIndexingError); diff --git a/packages/codegen/src/templates/indexer-template.handlebars b/packages/codegen/src/templates/indexer-template.handlebars index 163774fd..f1f13e8b 100644 --- a/packages/codegen/src/templates/indexer-template.handlebars +++ b/packages/codegen/src/templates/indexer-template.handlebars @@ -661,11 +661,11 @@ export class Indexer implements IndexerInterface { return syncStatus; } - async forceUpdateSyncStatus (blockHash: string, blockNumber: number): Promise { - return this._baseIndexer.forceUpdateSyncStatus(blockHash, blockNumber); + async updateSyncStatusProcessedBlock (blockHash: string, blockNumber: number, force = false): Promise { + return this._baseIndexer.updateSyncStatusProcessedBlock(blockHash, blockNumber, force); } - async updateSyncStatusIndexingError (hasIndexingError: boolean): Promise { + async updateSyncStatusIndexingError (hasIndexingError: boolean): Promise { return this._baseIndexer.updateSyncStatusIndexingError(hasIndexingError); } diff --git a/packages/graph-node/test/utils/indexer.ts b/packages/graph-node/test/utils/indexer.ts index 7f09464e..aa2f8b1d 100644 --- a/packages/graph-node/test/utils/indexer.ts +++ b/packages/graph-node/test/utils/indexer.ts @@ -170,17 +170,18 @@ export class Indexer implements IndexerInterface { return {} as SyncStatusInterface; } - async forceUpdateSyncStatus (blockHash: string, blockNumber: number): Promise { + async updateSyncStatusProcessedBlock (blockHash: string, blockNumber: number, force?: boolean): Promise { assert(blockNumber); assert(blockHash); + assert(force); return {} as SyncStatusInterface; } - async updateSyncStatusIndexingError (hasIndexingError: boolean): Promise { + async updateSyncStatusIndexingError (hasIndexingError: boolean): Promise { assert(hasIndexingError); - return {} as SyncStatusInterface; + return undefined; } async markBlocksAsPruned (blocks: BlockProgressInterface[]): Promise { diff --git a/packages/rpc-eth-client/src/eth-client.ts b/packages/rpc-eth-client/src/eth-client.ts index 9b181fc3..e636b92c 100644 --- a/packages/rpc-eth-client/src/eth-client.ts +++ b/packages/rpc-eth-client/src/eth-client.ts @@ -285,8 +285,8 @@ export class EthClient implements EthClientInterface { 'eth_getLogs', [{ address: addresses.map(address => address.toLowerCase()), - fromBlock: fromBlock && utils.hexlify(fromBlock), - toBlock: toBlock && utils.hexlify(toBlock), + fromBlock: fromBlock && utils.hexValue(fromBlock), + toBlock: toBlock && utils.hexValue(toBlock), blockHash, topics }] diff --git a/packages/util/src/common.ts b/packages/util/src/common.ts index d1f8cba5..97f9183f 100644 --- a/packages/util/src/common.ts +++ b/packages/util/src/common.ts @@ -384,20 +384,20 @@ const _processEventsInSubgraphOrder = async (indexer: IndexerInterface, block: B // Check if we are out of events. while (numFetchedEvents < block.numEvents) { - console.time('time:common#processEventsInSubgraphOrder-fetching_events_batch'); + console.time(`time:common#processEventsInSubgraphOrder-fetching_events_batch-${block.blockNumber}`); // Fetch events in batches const events = await _getEventsBatch(indexer, block.blockHash, eventsInBatch, page); page++; numFetchedEvents += events.length; - console.timeEnd('time:common#processEventsInSubgraphOrder-fetching_events_batch'); + console.timeEnd(`time:common#processEventsInSubgraphOrder-fetching_events_batch-${block.blockNumber}`); if (events.length) { log(`Processing events batch from index ${events[0].index} to ${events[0].index + events.length - 1}`); } - console.time('time:common#processEventsInSubgraphOrder-processing_events_batch'); + console.time(`time:common#processEventsInSubgraphOrder-processing_events_batch-${block.blockNumber}`); // First process events for initially watched contracts const watchedContractEvents: EventInterface[] = []; @@ -417,7 +417,7 @@ const _processEventsInSubgraphOrder = async (indexer: IndexerInterface, block: B block.numProcessedEvents++; } - console.timeEnd('time:common#processEventsInSubgraphOrder-processing_events_batch'); + console.timeEnd(`time:common#processEventsInSubgraphOrder-processing_events_batch-${block.blockNumber}`); } const watchedContracts = indexer.getWatchedContracts().map(contract => contract.address); diff --git a/packages/util/src/database.ts b/packages/util/src/database.ts index a4b0e402..01b01d30 100644 --- a/packages/util/src/database.ts +++ b/packages/util/src/database.ts @@ -169,6 +169,8 @@ export class Database { latestCanonicalBlockNumber: blockNumber, latestIndexedBlockHash: '', latestIndexedBlockNumber: -1, + latestProcessedBlockHash: '', + latestProcessedBlockNumber: -1, initialIndexedBlockHash: blockHash, initialIndexedBlockNumber: blockNumber }); @@ -182,29 +184,24 @@ export class Database { return await repo.save(entity); } - async forceUpdateSyncStatus (repo: Repository, blockHash: string, blockNumber: number): Promise { - let entity = await repo.findOne(); + async updateSyncStatusProcessedBlock (repo: Repository, blockHash: string, blockNumber: number, force = false): Promise { + const entity = await repo.findOne(); + assert(entity); - if (!entity) { - entity = repo.create({ - initialIndexedBlockHash: blockHash, - initialIndexedBlockNumber: blockNumber - }); + if (force || blockNumber >= entity.latestProcessedBlockNumber) { + entity.latestProcessedBlockHash = blockHash; + entity.latestProcessedBlockNumber = blockNumber; } - entity.chainHeadBlockHash = blockHash; - entity.chainHeadBlockNumber = blockNumber; - entity.latestCanonicalBlockHash = blockHash; - entity.latestCanonicalBlockNumber = blockNumber; - entity.latestIndexedBlockHash = blockHash; - entity.latestIndexedBlockNumber = blockNumber; - return await repo.save(entity); } - async updateSyncStatusIndexingError (repo: Repository, hasIndexingError: boolean): Promise { + async updateSyncStatusIndexingError (repo: Repository, hasIndexingError: boolean): Promise { const entity = await repo.findOne(); - assert(entity); + + if (!entity) { + return; + } entity.hasIndexingError = hasIndexingError; diff --git a/packages/util/src/events.ts b/packages/util/src/events.ts index a29e8d8d..654732a9 100644 --- a/packages/util/src/events.ts +++ b/packages/util/src/events.ts @@ -6,7 +6,6 @@ import assert from 'assert'; import debug from 'debug'; import { PubSub } from 'graphql-subscriptions'; import PgBoss from 'pg-boss'; -import { constants } from 'ethers'; import { JobQueue } from './job-queue'; import { BlockProgressInterface, EventInterface, IndexerInterface, EthClient, EventsJobData, EventsQueueJobKind } from './types'; @@ -15,13 +14,9 @@ import { createPruningJob, processBlockByNumber } from './common'; import { OrderDirection } from './database'; import { HistoricalJobData, HistoricalJobResponseData } from './job-runner'; import { JobQueueConfig, ServerConfig } from './config'; -import { wait } from './misc'; const EVENT = 'event'; -// Time to wait for events queue to be empty -const EMPTY_EVENTS_QUEUE_WAIT_TIME = 5000; - const DEFAULT_HISTORICAL_MAX_FETCH_AHEAD = 20_000; const log = debug('vulcanize:events'); @@ -121,7 +116,7 @@ export class EventWatcher { async startHistoricalBlockProcessing (startBlockNumber: number, endBlockNumber: number): Promise { // Wait for events job queue to be empty so that historical processing does not move far ahead - await this._waitForEmptyEventsQueue(); + await this._jobQueue.waitForEmptyQueue(QUEUE_EVENT_PROCESSING); this._historicalProcessingEndBlockNumber = endBlockNumber; log(`Starting historical block processing in batches from ${startBlockNumber} up to block ${this._historicalProcessingEndBlockNumber}`); @@ -136,19 +131,6 @@ export class EventWatcher { ); } - async _waitForEmptyEventsQueue (): Promise { - while (true) { - // Get queue size for active and pending jobs - const queueSize = await this._jobQueue.getQueueSize(QUEUE_EVENT_PROCESSING, 'completed'); - - if (queueSize === 0) { - break; - } - - await wait(EMPTY_EVENTS_QUEUE_WAIT_TIME); - } - } - async startRealtimeBlockProcessing (startBlockNumber: number): Promise { log(`Starting realtime block processing from block ${startBlockNumber}`); await processBlockByNumber(this._jobQueue, startBlockNumber); @@ -233,16 +215,6 @@ export class EventWatcher { // Check if historical processing end block is reached if (nextBatchStartBlockNumber > this._historicalProcessingEndBlockNumber) { - const [block] = await this._indexer.getBlocks({ blockNumber: this._historicalProcessingEndBlockNumber }); - const historicalProcessingEndBlockHash = block ? block.blockHash : constants.AddressZero; - - // Update sync status chain head and canonical block to end block of historical processing - const [syncStatus] = await Promise.all([ - this._indexer.updateSyncStatusCanonicalBlock(historicalProcessingEndBlockHash, this._historicalProcessingEndBlockNumber, true), - this._indexer.updateSyncStatusChainHead(historicalProcessingEndBlockHash, this._historicalProcessingEndBlockNumber, true) - ]); - log(`Sync status canonical block updated to ${syncStatus.latestCanonicalBlockNumber}`); - // Start realtime processing this.startBlockProcessing(); return; diff --git a/packages/util/src/indexer.ts b/packages/util/src/indexer.ts index 7aa40115..8a4ff473 100644 --- a/packages/util/src/indexer.ts +++ b/packages/util/src/indexer.ts @@ -92,11 +92,9 @@ export type ResultEvent = { export type ResultMeta = { block: { - cid: string | null; - hash: string; + hash: string | null; number: number; - timestamp: number; - parentHash: string; + timestamp: number | null; }; deployment: string; hasIndexingErrors: boolean; @@ -146,37 +144,42 @@ export class Indexer { } async getMetaData (block: BlockHeight): Promise { - let resultBlock: BlockProgressInterface | undefined; + const resultBlock: ResultMeta['block'] = { + hash: block.hash ?? null, + number: block.number ?? 0, + timestamp: null + }; const syncStatus = await this.getSyncStatus(); assert(syncStatus); if (block.hash) { - resultBlock = await this.getBlockProgress(block.hash); + const blockProgress = await this.getBlockProgress(block.hash); + assert(blockProgress, 'No block with hash found'); + resultBlock.number = blockProgress.blockNumber; + resultBlock.timestamp = blockProgress.blockTimestamp; } else { - const blockHeight = block.number ? block.number : syncStatus.latestIndexedBlockNumber - 1; + let blockHeight = block.number; + + if (!blockHeight) { + blockHeight = syncStatus.latestProcessedBlockNumber; + } // Get all the blocks at a height - const blocksAtHeight = await this.getBlocksAtHeight(blockHeight, false); + const [blockProgress] = await this.getBlocksAtHeight(blockHeight, false); - if (blocksAtHeight.length) { - resultBlock = blocksAtHeight[0]; + if (blockProgress) { + resultBlock.hash = blockProgress.blockHash; + resultBlock.number = blockProgress.blockNumber; + resultBlock.timestamp = blockProgress.blockTimestamp; } } - return resultBlock - ? { - block: { - cid: resultBlock.cid, - number: resultBlock.blockNumber, - hash: resultBlock.blockHash, - timestamp: resultBlock.blockTimestamp, - parentHash: resultBlock.parentHash - }, - deployment: '', - hasIndexingErrors: syncStatus.hasIndexingError - } - : null; + return { + block: resultBlock, + hasIndexingErrors: syncStatus.hasIndexingError, + deployment: '' + }; } async getSyncStatus (): Promise { @@ -247,12 +250,12 @@ export class Indexer { return res; } - async forceUpdateSyncStatus (blockHash: string, blockNumber: number): Promise { + async updateSyncStatusProcessedBlock (blockHash: string, blockNumber: number, force = false): Promise { const dbTx = await this._db.createTransactionRunner(); let res; try { - res = await this._db.forceUpdateSyncStatus(dbTx, blockHash, blockNumber); + res = await this._db.updateSyncStatusProcessedBlock(dbTx, blockHash, blockNumber, force); await dbTx.commitTransaction(); } catch (error) { await dbTx.rollbackTransaction(); @@ -264,7 +267,7 @@ export class Indexer { return res; } - async updateSyncStatusIndexingError (hasIndexingError: boolean): Promise { + async updateSyncStatusIndexingError (hasIndexingError: boolean): Promise { const dbTx = await this._db.createTransactionRunner(); let res; @@ -1320,6 +1323,10 @@ export class Indexer { await this.updateSyncStatusIndexedBlock(blockProgress.blockHash, blockProgress.blockNumber, true); } + if (syncStatus.latestProcessedBlockNumber > blockProgress.blockNumber) { + await this.updateSyncStatusProcessedBlock(blockProgress.blockHash, blockProgress.blockNumber, true); + } + if (syncStatus.latestCanonicalBlockNumber > blockProgress.blockNumber) { await this.updateSyncStatusCanonicalBlock(blockProgress.blockHash, blockProgress.blockNumber, true); } diff --git a/packages/util/src/job-queue.ts b/packages/util/src/job-queue.ts index b6b05839..b93f8e7d 100644 --- a/packages/util/src/job-queue.ts +++ b/packages/util/src/job-queue.ts @@ -7,6 +7,7 @@ import debug from 'debug'; import PgBoss from 'pg-boss'; import { jobCount, lastJobCompletedOn } from './metrics'; +import { wait } from './misc'; interface Config { dbConnectionString: string @@ -15,7 +16,11 @@ interface Config { type JobCallback = (job: PgBoss.JobWithDoneCallback) => Promise; -const JOBS_PER_INTERVAL = 5; +// Default number of jobs fetched from DB per polling interval (newJobCheckInterval) +const DEFAULT_JOBS_PER_INTERVAL = 5; + +// Interval time to check for events queue to be empty +const EMPTY_QUEUE_CHECK_INTERVAL = 5000; const log = debug('vulcanize:job-queue'); @@ -86,12 +91,13 @@ export class JobQueue { await this._boss.stop(); } - async subscribe (queue: string, callback: JobCallback): Promise { + async subscribe (queue: string, callback: JobCallback, subscribeOptions: PgBoss.SubscribeOptions = {}): Promise { return await this._boss.subscribe( queue, { - teamSize: JOBS_PER_INTERVAL, - teamConcurrency: 1 + teamSize: DEFAULT_JOBS_PER_INTERVAL, + teamConcurrency: 1, + ...subscribeOptions }, async (job) => { try { @@ -111,7 +117,7 @@ export class JobQueue { return await this._boss.onComplete( queue, { - teamSize: JOBS_PER_INTERVAL, + teamSize: DEFAULT_JOBS_PER_INTERVAL, teamConcurrency: 1 }, async (job: PgBoss.JobWithDoneCallback) => { @@ -154,4 +160,17 @@ export class JobQueue { async getQueueSize (name: string, before: PgBoss.Subscription['state'] = 'active'): Promise { return this._boss.getQueueSize(name, { before }); } + + async waitForEmptyQueue (queue: string): Promise { + while (true) { + // Get queue size for active and pending jobs + const queueSize = await this.getQueueSize(queue, 'completed'); + + if (queueSize === 0) { + break; + } + + await wait(EMPTY_QUEUE_CHECK_INTERVAL); + } + } } diff --git a/packages/util/src/job-runner.ts b/packages/util/src/job-runner.ts index f284460a..44dd91ae 100644 --- a/packages/util/src/job-runner.ts +++ b/packages/util/src/job-runner.ts @@ -4,7 +4,7 @@ import assert from 'assert'; import debug from 'debug'; -import { ethers } from 'ethers'; +import { constants, ethers } from 'ethers'; import { DeepPartial, In } from 'typeorm'; import PgBoss from 'pg-boss'; @@ -78,15 +78,27 @@ export class JobRunner { } async subscribeHistoricalProcessingQueue (): Promise { - await this.jobQueue.subscribe(QUEUE_HISTORICAL_PROCESSING, async (job) => { - await this.processHistoricalBlocks(job); - }); + await this.jobQueue.subscribe( + QUEUE_HISTORICAL_PROCESSING, + async (job) => { + await this.processHistoricalBlocks(job); + }, + { + teamSize: 1 + } + ); } async subscribeEventProcessingQueue (): Promise { - await this.jobQueue.subscribe(QUEUE_EVENT_PROCESSING, async (job) => { - await this.processEvent(job); - }); + await this.jobQueue.subscribe( + QUEUE_EVENT_PROCESSING, + async (job) => { + await this.processEvent(job); + }, + { + teamSize: 1 + } + ); } async subscribeHooksQueue (): Promise { @@ -162,6 +174,9 @@ export class JobRunner { if (startBlock < this._historicalProcessingCompletedUpto) { await this.jobQueue.deleteJobs(QUEUE_HISTORICAL_PROCESSING); + // Wait for events queue to be empty + await this.jobQueue.waitForEmptyQueue(QUEUE_EVENT_PROCESSING); + // Remove all watcher blocks and events data if startBlock is less than this._historicalProcessingCompletedUpto // This occurs when new contract is added (with filterLogsByAddresses set to true) and historical processing is restarted from a previous block log(`Restarting historical processing from block ${startBlock}`); @@ -191,8 +206,48 @@ export class JobRunner { endBlock ); + let batchEndBlockHash = constants.AddressZero; + const blocksLength = blocks.length; + + if (blocksLength) { + // Push event processing job for each block + const pushEventProcessingJobsForBlocksPromise = this._pushEventProcessingJobsForBlocks(blocks); + + if (blocks[blocksLength - 1].blockNumber === endBlock) { + // If in blocks returned end block is same as the batch end block, set batchEndBlockHash + batchEndBlockHash = blocks[blocksLength - 1].blockHash; + } else { + // Else fetch block hash from upstream for batch end block + const [block] = await this._indexer.getBlocks({ blockNumber: endBlock }); + + if (block) { + batchEndBlockHash = block.blockHash; + } + } + + await pushEventProcessingJobsForBlocksPromise; + } + + // Update sync status canonical, indexed and chain head block to end block + await Promise.all([ + this._indexer.updateSyncStatusCanonicalBlock(batchEndBlockHash, endBlock, true), + this._indexer.updateSyncStatusIndexedBlock(batchEndBlockHash, endBlock, true), + this._indexer.updateSyncStatusChainHead(batchEndBlockHash, endBlock, true) + ]); + log(`Sync status canonical, indexed and chain head block updated to ${endBlock}`); + + this._historicalProcessingCompletedUpto = endBlock; + + await this.jobQueue.markComplete( + job, + { isComplete: true, endBlock } + ); + } + + async _pushEventProcessingJobsForBlocks (blocks: BlockProgressInterface[]): Promise { // Push event processing job for each block - const pushJobForBlockPromises = blocks.map(async block => { + // const pushJobForBlockPromises = blocks.map(async block => { + for (const block of blocks) { const eventsProcessingJob: EventsJobData = { kind: EventsQueueJobKind.EVENTS, blockHash: block.blockHash, @@ -201,16 +256,9 @@ export class JobRunner { // Publishing when realtime processing is listening to events will cause problems publish: false }; - this.jobQueue.pushJob(QUEUE_EVENT_PROCESSING, eventsProcessingJob); - }); - await Promise.all(pushJobForBlockPromises); - this._historicalProcessingCompletedUpto = endBlock; - - await this.jobQueue.markComplete( - job, - { isComplete: true, endBlock } - ); + await this.jobQueue.pushJob(QUEUE_EVENT_PROCESSING, eventsProcessingJob); + } } async processEvent (job: PgBoss.JobWithDoneCallback): Promise { @@ -305,7 +353,7 @@ export class JobRunner { await this.jobQueue.markComplete(job); } - async resetToPrevIndexedBlock (): Promise { + async resetToLatestProcessedBlock (): Promise { const syncStatus = await this._indexer.getSyncStatus(); // Watcher running for first time if syncStatus does not exist @@ -313,17 +361,13 @@ export class JobRunner { return; } - const blockProgress = await this._indexer.getBlockProgress(syncStatus.latestIndexedBlockHash); + const blockProgress = await this._indexer.getBlockProgress(syncStatus.latestProcessedBlockHash); assert(blockProgress); + assert(blockProgress.isComplete); - // Don't reset to previous block if block is complete (all events processed) - if (blockProgress.isComplete) { - return; - } - - // Resetting to block before latest indexed block as all events should be processed in the previous block. - // Reprocessing of events in subgraph watchers is not possible as DB transaction is not implemented. - await this._indexer.resetWatcherToBlock(syncStatus.latestIndexedBlockNumber - 1); + // Resetting to block with events that have been processed completely + // Reprocessing of block events in subgraph watchers is not possible as DB transaction is not implemented. + await this._indexer.resetWatcherToBlock(blockProgress.blockNumber); } handleShutdown (): void { @@ -554,17 +598,17 @@ export class JobRunner { const prefetchedBlock = this._blockAndEventsMap.get(blockHash); assert(prefetchedBlock); - const { block } = prefetchedBlock; + log(`Processing events for block ${block.blockNumber}`); - console.time('time:job-runner#_processEvents-events'); + console.time(`time:job-runner#_processEvents-events-${block.blockNumber}`); const isNewContractWatched = await processBatchEvents( this._indexer, block, this._jobQueueConfig.eventsInBatch, this._jobQueueConfig.subgraphEventsOrder ); - console.timeEnd('time:job-runner#_processEvents-events'); + console.timeEnd(`time:job-runner#_processEvents-events-${block.blockNumber}`); // Update metrics lastProcessedBlockNumber.set(block.blockNumber); @@ -574,13 +618,13 @@ export class JobRunner { // Check if new contract was added and filterLogsByAddresses is set to true if (isNewContractWatched && this._indexer.upstreamConfig.ethServer.filterLogsByAddresses) { - // Delete jobs for any pending events processing - await this.jobQueue.deleteJobs(QUEUE_EVENT_PROCESSING); - // Check if historical processing is running and that current block is being processed was trigerred by historical processing if (this._historicalProcessingCompletedUpto && this._historicalProcessingCompletedUpto > block.blockNumber) { const nextBlockNumberToProcess = block.blockNumber + 1; + // Delete jobs for any pending historical processing + await this.jobQueue.deleteJobs(QUEUE_HISTORICAL_PROCESSING); + // Push a new job to restart historical blocks processing after current block log('New contract added in historical processing with filterLogsByAddresses set to true'); await this.jobQueue.pushJob( @@ -592,6 +636,9 @@ export class JobRunner { { priority: 1 } ); } + + // Delete jobs for any pending events processing + await this.jobQueue.deleteJobs(QUEUE_EVENT_PROCESSING); } if (this._endBlockProcessTimer) { @@ -599,6 +646,7 @@ export class JobRunner { } this._endBlockProcessTimer = lastBlockProcessDuration.startTimer(); + await this._indexer.updateSyncStatusProcessedBlock(block.blockHash, block.blockNumber); // If this was a retry attempt, unset the indexing error flag in sync status if (isRetryAttempt) { @@ -620,18 +668,18 @@ export class JobRunner { // TODO: Remove processed entities for current block to avoid reprocessing of events - // Catch event processing error and push to job queue after some time with higher priority + // Catch event processing error and push job again to job queue with higher priority log(`Retrying event processing after ${EVENTS_PROCESSING_RETRY_WAIT} ms`); - await wait(EVENTS_PROCESSING_RETRY_WAIT); - - // TODO: Stop job for next block in queue (in historical processing) - const eventsProcessingRetryJob: EventsJobData = { ...jobData, isRetryAttempt: true }; + await this.jobQueue.pushJob( QUEUE_EVENT_PROCESSING, eventsProcessingRetryJob, { priority: 1 } ); + + // Wait for some time before retrying job + await wait(EVENTS_PROCESSING_RETRY_WAIT); } } diff --git a/packages/util/src/types.ts b/packages/util/src/types.ts index 83770c65..ca5a6d9b 100644 --- a/packages/util/src/types.ts +++ b/packages/util/src/types.ts @@ -39,6 +39,8 @@ export interface SyncStatusInterface { chainHeadBlockNumber: number; latestIndexedBlockHash: string; latestIndexedBlockNumber: number; + latestProcessedBlockHash: string; + latestProcessedBlockNumber: number; latestCanonicalBlockHash: string; latestCanonicalBlockNumber: number; initialIndexedBlockHash: string; @@ -107,8 +109,8 @@ export interface IndexerInterface { updateSyncStatusChainHead (blockHash: string, blockNumber: number, force?: boolean): Promise updateSyncStatusIndexedBlock (blockHash: string, blockNumber: number, force?: boolean): Promise updateSyncStatusCanonicalBlock (blockHash: string, blockNumber: number, force?: boolean): Promise - forceUpdateSyncStatus (blockHash: string, blockNumber: number): Promise - updateSyncStatusIndexingError (hasIndexingError: boolean): Promise + updateSyncStatusIndexingError (hasIndexingError: boolean): Promise + updateSyncStatusProcessedBlock (blockHash: string, blockNumber: number, force?: boolean): Promise updateStateSyncStatusIndexedBlock (blockNumber: number, force?: boolean): Promise updateStateSyncStatusCheckpointBlock (blockNumber: number, force?: boolean): Promise markBlocksAsPruned (blocks: BlockProgressInterface[]): Promise @@ -171,8 +173,8 @@ export interface DatabaseInterface { updateSyncStatusIndexedBlock (queryRunner: QueryRunner, blockHash: string, blockNumber: number, force?: boolean): Promise; updateSyncStatusChainHead (queryRunner: QueryRunner, blockHash: string, blockNumber: number, force?: boolean): Promise; updateSyncStatusCanonicalBlock (queryRunner: QueryRunner, blockHash: string, blockNumber: number, force?: boolean): Promise; - forceUpdateSyncStatus (queryRunner: QueryRunner, blockHash: string, blockNumber: number): Promise; - updateSyncStatusIndexingError (queryRunner: QueryRunner, hasIndexingError: boolean): Promise; + updateSyncStatusIndexingError (queryRunner: QueryRunner, hasIndexingError: boolean): Promise; + updateSyncStatusProcessedBlock (queryRunner: QueryRunner, blockHash: string, blockNumber: number, force?: boolean): Promise; saveEvents (queryRunner: QueryRunner, events: DeepPartial[]): Promise; saveBlockWithEvents (queryRunner: QueryRunner, block: DeepPartial, events: DeepPartial[]): Promise; saveEventEntity (queryRunner: QueryRunner, entity: EventInterface): Promise;