diff --git a/packages/codegen/src/templates/indexer-template.handlebars b/packages/codegen/src/templates/indexer-template.handlebars index 351c63ca..589e6178 100644 --- a/packages/codegen/src/templates/indexer-template.handlebars +++ b/packages/codegen/src/templates/indexer-template.handlebars @@ -759,6 +759,19 @@ export class Indexer implements IndexerInterface { await this.resetLatestEntities(blockNumber); {{/if}} } + + async clearProcessedBlockData (block: BlockProgress): Promise { + {{#if (subgraphPath)}} + const entities = [...ENTITIES, FrothyEntity]; + {{else}} + const entities = [...ENTITIES]; + {{/if}} + await this._baseIndexer.clearProcessedBlockData(block, entities); + {{#if (subgraphPath)}} + + await this.resetLatestEntities(block.blockNumber); + {{/if}} + } {{#if (subgraphPath)}} getEntityTypesMap (): Map { diff --git a/packages/graph-node/test/utils/indexer.ts b/packages/graph-node/test/utils/indexer.ts index 76e2e787..110dc661 100644 --- a/packages/graph-node/test/utils/indexer.ts +++ b/packages/graph-node/test/utils/indexer.ts @@ -318,6 +318,10 @@ export class Indexer implements IndexerInterface { return undefined; } + async clearProcessedBlockData (block: BlockProgressInterface): Promise { + return undefined; + } + cacheContract (contract: ContractInterface): void { return undefined; } diff --git a/packages/util/src/events.ts b/packages/util/src/events.ts index 6ad77193..fbba9245 100644 --- a/packages/util/src/events.ts +++ b/packages/util/src/events.ts @@ -66,32 +66,36 @@ export class EventWatcher { } async initBlockProcessingOnCompleteHandler (): Promise { - this._jobQueue.onComplete(QUEUE_BLOCK_PROCESSING, async (job) => { - await this.blockProcessingCompleteHandler(job); - }); + this._jobQueue.onComplete( + QUEUE_BLOCK_PROCESSING, + async (job) => this.blockProcessingCompleteHandler(job) + ); } async initHistoricalProcessingOnCompleteHandler (): Promise { - this._jobQueue.onComplete(QUEUE_HISTORICAL_PROCESSING, async (job) => { - await this.historicalProcessingCompleteHandler(job); - }); + this._jobQueue.onComplete( + QUEUE_HISTORICAL_PROCESSING, + async (job) => this.historicalProcessingCompleteHandler(job) + ); } async initEventProcessingOnCompleteHandler (): Promise { - await this._jobQueue.onComplete(QUEUE_EVENT_PROCESSING, async (job) => { - await this.eventProcessingCompleteHandler(job); - }); + await this._jobQueue.onComplete( + QUEUE_EVENT_PROCESSING, + async (job) => this.eventProcessingCompleteHandler(job as PgBoss.JobWithMetadata), + { includeMetadata: true } + ); } async startBlockProcessing (): Promise { // Get latest block in chain and sync status from DB. const [{ block: latestBlock }, syncStatus] = await Promise.all([ this._ethClient.getBlockByHash(), - this._indexer.getSyncStatus() + this._indexer.getSyncStatus(), + // Wait for events job queue to be empty before starting historical or realtime processing + this._jobQueue.waitForEmptyQueue(QUEUE_EVENT_PROCESSING) ]); - // Wait for events job queue to be empty before starting historical or realtime processing - await this._jobQueue.waitForEmptyQueue(QUEUE_EVENT_PROCESSING); const historicalProcessingQueueSize = await this._jobQueue.getQueueSize(QUEUE_HISTORICAL_PROCESSING, 'completed'); // Stop if there are active or pending historical processing jobs @@ -110,14 +114,7 @@ export class EventWatcher { // Check if filter for logs is enabled // Check if starting block for watcher is before latest canonical block if (this._config.jobQueue.useBlockRanges && startBlockNumber < latestCanonicalBlockNumber) { - let endBlockNumber = latestCanonicalBlockNumber; - const historicalMaxFetchAhead = this._config.jobQueue.historicalMaxFetchAhead ?? DEFAULT_HISTORICAL_MAX_FETCH_AHEAD; - - if (historicalMaxFetchAhead > 0) { - endBlockNumber = Math.min(startBlockNumber + historicalMaxFetchAhead, endBlockNumber); - } - - await this.startHistoricalBlockProcessing(startBlockNumber, endBlockNumber); + await this.startHistoricalBlockProcessing(startBlockNumber, latestCanonicalBlockNumber); return; } @@ -125,7 +122,19 @@ export class EventWatcher { await this.startRealtimeBlockProcessing(startBlockNumber); } - async startHistoricalBlockProcessing (startBlockNumber: number, endBlockNumber: number): Promise { + async startHistoricalBlockProcessing (startBlockNumber: number, latestCanonicalBlockNumber: number): Promise { + if (this._realtimeProcessingStarted) { + // Do not start historical processing if realtime processing has already started + return; + } + + let endBlockNumber = latestCanonicalBlockNumber; + const historicalMaxFetchAhead = this._config.jobQueue.historicalMaxFetchAhead ?? DEFAULT_HISTORICAL_MAX_FETCH_AHEAD; + + if (historicalMaxFetchAhead > 0) { + endBlockNumber = Math.min(startBlockNumber + historicalMaxFetchAhead, endBlockNumber); + } + this._historicalProcessingEndBlockNumber = endBlockNumber; log(`Starting historical block processing in batches from ${startBlockNumber} up to block ${this._historicalProcessingEndBlockNumber}`); @@ -140,7 +149,7 @@ export class EventWatcher { } async startRealtimeBlockProcessing (startBlockNumber: number): Promise { - // Check if realtime processing already started and avoid resubscribing to block progress event + // Check if realtime processing already started if (this._realtimeProcessingStarted) { return; } @@ -246,8 +255,8 @@ export class EventWatcher { ); } - async eventProcessingCompleteHandler (job: PgBoss.Job): Promise { - const { id, data: { request: { data }, failed, state, createdOn } } = job; + async eventProcessingCompleteHandler (job: PgBoss.JobWithMetadata): Promise { + const { id, retrycount, data: { request: { data }, failed, state, createdOn } } = job; if (failed) { log(`Job ${id} for queue ${QUEUE_EVENT_PROCESSING} failed`); @@ -261,14 +270,21 @@ export class EventWatcher { } const { blockHash, publish }: EventsJobData = data; + const blockProgress = await this._indexer.getBlockProgress(blockHash); + assert(blockProgress); + + // Check if job was retried + if (retrycount > 0) { + // Reset watcher to remove any data after this block + await this._indexer.resetWatcherToBlock(blockProgress.blockNumber); + // Start block processing (Try restarting historical processing or continue realtime processing) + this.startBlockProcessing(); + } // Check if publish is set to true // Events and blocks are not published in historical processing // GQL subscription events will not be triggered if publish is set to false if (publish) { - const blockProgress = await this._indexer.getBlockProgress(blockHash); - assert(blockProgress); - await this.publishBlockProgressToSubscribers(blockProgress); const dbEvents = await this._indexer.getBlockEvents( diff --git a/packages/util/src/indexer.ts b/packages/util/src/indexer.ts index 6ec2be96..3dcd152f 100644 --- a/packages/util/src/indexer.ts +++ b/packages/util/src/indexer.ts @@ -3,7 +3,7 @@ // import assert from 'assert'; -import { DeepPartial, EntityTarget, FindConditions, FindManyOptions, MoreThan } from 'typeorm'; +import { DeepPartial, EntityTarget, Equal, FindConditions, FindManyOptions, MoreThan } from 'typeorm'; import debug from 'debug'; import JSONbig from 'json-bigint'; import { ethers } from 'ethers'; @@ -1355,6 +1355,7 @@ export class Indexer { } await this._db.deleteEntitiesByConditions(dbTx, 'contract', { startingBlock: MoreThan(blockNumber) }); + this._clearWatchedContracts((watchedContracts) => watchedContracts.startingBlock > blockNumber); await this._db.deleteEntitiesByConditions(dbTx, 'block_progress', { blockNumber: MoreThan(blockNumber) }); @@ -1396,6 +1397,36 @@ export class Indexer { } } + async clearProcessedBlockData (block: BlockProgressInterface, entities: EntityTarget<{ blockNumber: number }>[]): Promise { + const dbTx = await this._db.createTransactionRunner(); + + try { + for (const entity of entities) { + await this._db.deleteEntitiesByConditions(dbTx, entity, { blockHash: Equal(block.blockHash) }); + } + + await this._db.deleteEntitiesByConditions(dbTx, 'contract', { startingBlock: Equal(block.blockNumber) }); + this._clearWatchedContracts((watchedContracts) => watchedContracts.startingBlock === block.blockNumber); + + dbTx.commitTransaction(); + } catch (error) { + await dbTx.rollbackTransaction(); + throw error; + } finally { + await dbTx.release(); + } + } + + _clearWatchedContracts (removFilter: (watchedContract: ContractInterface) => boolean): void { + this._watchedContracts = Object.values(this._watchedContracts) + .filter(watchedContract => !removFilter(watchedContract)) + .reduce((acc: {[key: string]: ContractInterface}, watchedContract) => { + acc[watchedContract.address] = watchedContract; + + return acc; + }, {}); + } + updateStateStatusMap (address: string, stateStatus: StateStatus): void { // Get and update State status for the contract. const oldStateStatus = this._stateStatusMap[address]; diff --git a/packages/util/src/job-queue.ts b/packages/util/src/job-queue.ts index b93f8e7d..8374a996 100644 --- a/packages/util/src/job-queue.ts +++ b/packages/util/src/job-queue.ts @@ -14,7 +14,7 @@ interface Config { maxCompletionLag: number } -type JobCallback = (job: PgBoss.JobWithDoneCallback) => Promise; +type JobCompleteCallback = (job: PgBoss.Job | PgBoss.JobWithMetadata) => Promise; // Default number of jobs fetched from DB per polling interval (newJobCheckInterval) const DEFAULT_JOBS_PER_INTERVAL = 5; @@ -91,13 +91,17 @@ export class JobQueue { await this._boss.stop(); } - async subscribe (queue: string, callback: JobCallback, subscribeOptions: PgBoss.SubscribeOptions = {}): Promise { + async subscribe ( + queue: string, + callback: PgBoss.SubscribeHandler, + options: PgBoss.SubscribeOptions = {} + ): Promise { return await this._boss.subscribe( queue, { teamSize: DEFAULT_JOBS_PER_INTERVAL, teamConcurrency: 1, - ...subscribeOptions + ...options }, async (job) => { try { @@ -113,12 +117,13 @@ export class JobQueue { ); } - async onComplete (queue: string, callback: JobCallback): Promise { + async onComplete (queue: string, callback: JobCompleteCallback, options: PgBoss.SubscribeOptions = {}): Promise { return await this._boss.onComplete( queue, { teamSize: DEFAULT_JOBS_PER_INTERVAL, - teamConcurrency: 1 + teamConcurrency: 1, + ...options }, async (job: PgBoss.JobWithDoneCallback) => { try { diff --git a/packages/util/src/job-runner.ts b/packages/util/src/job-runner.ts index 6f2e72c9..bf192503 100644 --- a/packages/util/src/job-runner.ts +++ b/packages/util/src/job-runner.ts @@ -20,7 +20,7 @@ import { QUEUE_HISTORICAL_PROCESSING } from './constants'; import { JobQueue } from './job-queue'; -import { BlockProgressInterface, ContractJobData, EventInterface, EventsJobData, EventsQueueJobKind, IndexerInterface } from './types'; +import { BlockProgressInterface, ContractJobData, EventsJobData, EventsQueueJobKind, IndexerInterface } from './types'; import { wait } from './misc'; import { createPruningJob, @@ -35,9 +35,6 @@ import { lastBlockNumEvents, lastBlockProcessDuration, lastProcessedBlockNumber const log = debug('vulcanize:job-runner'); -// Wait time for retrying events processing on error (in ms) -const EVENTS_PROCESSING_RETRY_WAIT = 2000; - const DEFAULT_HISTORICAL_LOGS_BLOCK_RANGE = 2000; export interface HistoricalJobData { @@ -72,17 +69,16 @@ export class JobRunner { } async subscribeBlockProcessingQueue (): Promise { - await this.jobQueue.subscribe(QUEUE_BLOCK_PROCESSING, async (job) => { - await this.processBlock(job); - }); + await this.jobQueue.subscribe( + QUEUE_BLOCK_PROCESSING, + async (job) => this.processBlock(job) + ); } async subscribeHistoricalProcessingQueue (): Promise { await this.jobQueue.subscribe( QUEUE_HISTORICAL_PROCESSING, - async (job) => { - await this.processHistoricalBlocks(job); - }, + async (job) => this.processHistoricalBlocks(job), { teamSize: 1 } @@ -92,25 +88,26 @@ export class JobRunner { async subscribeEventProcessingQueue (): Promise { await this.jobQueue.subscribe( QUEUE_EVENT_PROCESSING, - async (job) => { - await this.processEvent(job); - }, + async (job) => this.processEvent(job as PgBoss.JobWithMetadataDoneCallback), { - teamSize: 1 + teamSize: 1, + includeMetadata: true } ); } async subscribeHooksQueue (): Promise { - await this.jobQueue.subscribe(QUEUE_HOOKS, async (job) => { - await this.processHooks(job); - }); + await this.jobQueue.subscribe( + QUEUE_HOOKS, + async (job) => this.processHooks(job) + ); } async subscribeBlockCheckpointQueue (): Promise { - await this.jobQueue.subscribe(QUEUE_BLOCK_CHECKPOINT, async (job) => { - await this.processCheckpoint(job); - }); + await this.jobQueue.subscribe( + QUEUE_BLOCK_CHECKPOINT, + async (job) => this.processCheckpoint(job) + ); } async processBlock (job: any): Promise { @@ -209,33 +206,37 @@ export class JobRunner { endBlock ); - let batchEndBlockHash = constants.AddressZero; const blocksLength = blocks.length; if (blocksLength) { - // Push event processing job for each block - const pushEventProcessingJobsForBlocksPromise = this._pushEventProcessingJobsForBlocks(blocks); + // TODO: Add pg-boss option to get queue size of jobs in a single state + const [pendingEventQueueSize, createdEventQueuSize] = await Promise.all([ + this.jobQueue.getQueueSize(QUEUE_EVENT_PROCESSING), + this.jobQueue.getQueueSize(QUEUE_EVENT_PROCESSING, 'retry') + ]); - if (blocks[blocksLength - 1].blockNumber === endBlock) { - // If in blocks returned end block is same as the batch end block, set batchEndBlockHash - batchEndBlockHash = blocks[blocksLength - 1].blockHash; - } else { - // Else fetch block hash from upstream for batch end block - const [block] = await this._indexer.getBlocks({ blockNumber: endBlock }); + const retryEventQueueSize = pendingEventQueueSize - createdEventQueuSize; - if (block) { - batchEndBlockHash = block.blockHash; - } + if (retryEventQueueSize > 0) { + log(`${QUEUE_EVENT_PROCESSING} queue consists ${retryEventQueueSize} job(s) in retry state. Aborting pushing blocks to queue from historical processing`); + await this.jobQueue.markComplete( + job, + { isComplete: false, endBlock } + ); + + return; } - await pushEventProcessingJobsForBlocksPromise; + // Push event processing job for each block + await this._pushEventProcessingJobsForBlocks(blocks); } // Update sync status canonical, indexed and chain head block to end block + // Update with zero hash as they won't be used during historical processing await Promise.all([ - this._indexer.updateSyncStatusCanonicalBlock(batchEndBlockHash, endBlock, true), - this._indexer.updateSyncStatusIndexedBlock(batchEndBlockHash, endBlock, true), - this._indexer.updateSyncStatusChainHead(batchEndBlockHash, endBlock, true) + this._indexer.updateSyncStatusCanonicalBlock(constants.HashZero, endBlock, true), + this._indexer.updateSyncStatusIndexedBlock(constants.HashZero, endBlock, true), + this._indexer.updateSyncStatusChainHead(constants.HashZero, endBlock, true) ]); log(`Sync status canonical, indexed and chain head block updated to ${endBlock}`); @@ -254,7 +255,6 @@ export class JobRunner { const eventsProcessingJob: EventsJobData = { kind: EventsQueueJobKind.EVENTS, blockHash: block.blockHash, - isRetryAttempt: false, // Avoid publishing GQL subscription event in historical processing // Publishing when realtime processing is listening to events will cause problems publish: false @@ -264,12 +264,12 @@ export class JobRunner { } } - async processEvent (job: PgBoss.JobWithDoneCallback): Promise { - const { data: jobData } = job; + async processEvent (job: PgBoss.JobWithMetadataDoneCallback): Promise { + const { data: jobData, retrycount: retryCount } = job; switch (jobData.kind) { case EventsQueueJobKind.EVENTS: - await this._processEvents(jobData); + await this._processEvents(jobData, retryCount); break; case EventsQueueJobKind.CONTRACT: @@ -278,6 +278,13 @@ export class JobRunner { } await this.jobQueue.markComplete(job); + + // Shutdown after job gets marked as complete + if (this._shutDown) { + log(`Graceful shutdown after ${QUEUE_EVENT_PROCESSING} queue ${jobData.kind} kind job ${job.id}`); + this.jobQueue.stop(); + process.exit(0); + } } async processHooks (job: any): Promise { @@ -602,7 +609,6 @@ export class JobRunner { const eventsProcessingJob: EventsJobData = { kind: EventsQueueJobKind.EVENTS, blockHash: blockProgress.blockHash, - isRetryAttempt: false, publish: true }; await this.jobQueue.pushJob(QUEUE_EVENT_PROCESSING, eventsProcessingJob); @@ -611,23 +617,13 @@ export class JobRunner { log(`time:job-runner#_indexBlock: ${indexBlockDuration}ms`); } - async _processEvents (jobData: EventsJobData): Promise { - const { blockHash, isRetryAttempt } = jobData; + async _processEvents (jobData: EventsJobData, retryCount: number): Promise { + const { blockHash } = jobData; + const prefetchedBlock = this._blockAndEventsMap.get(blockHash); + assert(prefetchedBlock); + const { block, ethFullBlock, ethFullTransactions } = prefetchedBlock; try { - // NOTE: blockAndEventsMap should contain block as watcher is reset - // if (!this._blockAndEventsMap.has(blockHash)) { - // console.time('time:job-runner#_processEvents-get-block-progress'); - // const block = await this._indexer.getBlockProgress(blockHash); - // console.timeEnd('time:job-runner#_processEvents-get-block-progress'); - - // assert(block); - // this._blockAndEventsMap.set(blockHash, { block, events: [] }); - // } - - const prefetchedBlock = this._blockAndEventsMap.get(blockHash); - assert(prefetchedBlock); - const { block, ethFullBlock, ethFullTransactions } = prefetchedBlock; log(`Processing events for block ${block.blockNumber}`); console.time(`time:job-runner#_processEvents-events-${block.blockNumber}`); @@ -684,37 +680,25 @@ export class JobRunner { await this._indexer.updateSyncStatusProcessedBlock(block.blockHash, block.blockNumber); // If this was a retry attempt, unset the indexing error flag in sync status - if (isRetryAttempt) { + if (retryCount > 0) { await this._indexer.updateSyncStatusIndexingError(false); } - - // TODO: Shutdown after job gets marked as complete - if (this._shutDown) { - log(`Graceful shutdown after processing block ${block.blockNumber}`); - this.jobQueue.stop(); - process.exit(0); - } } catch (error) { - log(`Error in processing events for block ${blockHash}`); - log(error); + log(`Error in processing events for block ${block.blockNumber} hash ${block.blockHash}`); - // Set the indexing error flag in sync status - await this._indexer.updateSyncStatusIndexingError(true); + await Promise.all([ + // Remove processed data for current block to avoid reprocessing of events + this._indexer.clearProcessedBlockData(block), + // Delete all pending event processing jobs + this.jobQueue.deleteJobs(QUEUE_EVENT_PROCESSING), + // Delete all pending historical processing jobs + this.jobQueue.deleteJobs(QUEUE_HISTORICAL_PROCESSING, 'active'), + // Set the indexing error flag in sync status + this._indexer.updateSyncStatusIndexingError(true) + ]); - // TODO: Remove processed entities for current block to avoid reprocessing of events - - // Catch event processing error and push job again to job queue with higher priority - log(`Retrying event processing after ${EVENTS_PROCESSING_RETRY_WAIT} ms`); - const eventsProcessingRetryJob: EventsJobData = { ...jobData, isRetryAttempt: true }; - - await this.jobQueue.pushJob( - QUEUE_EVENT_PROCESSING, - eventsProcessingRetryJob, - { priority: 1 } - ); - - // Wait for some time before retrying job - await wait(EVENTS_PROCESSING_RETRY_WAIT); + // Error logged in job-queue handler + throw error; } } diff --git a/packages/util/src/types.ts b/packages/util/src/types.ts index bbc567b2..13f76dfb 100644 --- a/packages/util/src/types.ts +++ b/packages/util/src/types.ts @@ -229,6 +229,7 @@ export interface IndexerInterface { saveOrUpdateState (state: StateInterface): Promise removeStates (blockNumber: number, kind: StateKind): Promise resetWatcherToBlock (blockNumber: number): Promise + clearProcessedBlockData (block: BlockProgressInterface): Promise getResultEvent (event: EventInterface): any } @@ -297,7 +298,6 @@ export enum EventsQueueJobKind { export interface EventsJobData { kind: EventsQueueJobKind.EVENTS; blockHash: string; - isRetryAttempt: boolean; publish: boolean; }