Fix retry on error for events processing job with exponential backoff (#468)

* Throw error in job-runner events processing queue for retry

* Restart block processing after completion of retried events queue job

* Check events queue size in retry state from active historical processing job

* Clear watched contracts from memory map on reset

* Update sync status with zero hash during historical processing

* Update codegen for changes

* Add clearProcessedBlockData in graph-node test dummy indexer
This commit is contained in:
Nabarun Gogoi 2023-11-14 14:53:23 +05:30 committed by GitHub
parent 7b5fbf3d13
commit 803f7c3b3d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 169 additions and 116 deletions

View File

@ -759,6 +759,19 @@ export class Indexer implements IndexerInterface {
await this.resetLatestEntities(blockNumber);
{{/if}}
}
// Clear data stored for the given block so that its events can be processed again.
// Builds the list of entity types and delegates the deletion to the base indexer's
// clearProcessedBlockData.
async clearProcessedBlockData (block: BlockProgress): Promise<void> {
{{#if (subgraphPath)}}
// Subgraph watchers additionally clear FrothyEntity rows
const entities = [...ENTITIES, FrothyEntity];
{{else}}
const entities = [...ENTITIES];
{{/if}}
await this._baseIndexer.clearProcessedBlockData(block, entities);
{{#if (subgraphPath)}}
// Reset latest entities to this block's number (only emitted for subgraph watchers)
await this.resetLatestEntities(block.blockNumber);
{{/if}}
}
{{#if (subgraphPath)}}
getEntityTypesMap (): Map<string, { [key: string]: string }> {

View File

@ -318,6 +318,10 @@ export class Indexer implements IndexerInterface {
return undefined;
}
// No-op stub of clearProcessedBlockData for the test dummy indexer;
// resolves with undefined without touching the given block.
async clearProcessedBlockData (block: BlockProgressInterface): Promise<void> {
  // Intentionally empty
}
cacheContract (contract: ContractInterface): void {
return undefined;
}

View File

@ -66,32 +66,36 @@ export class EventWatcher {
}
async initBlockProcessingOnCompleteHandler (): Promise<void> {
this._jobQueue.onComplete(QUEUE_BLOCK_PROCESSING, async (job) => {
await this.blockProcessingCompleteHandler(job);
});
this._jobQueue.onComplete(
QUEUE_BLOCK_PROCESSING,
async (job) => this.blockProcessingCompleteHandler(job)
);
}
async initHistoricalProcessingOnCompleteHandler (): Promise<void> {
this._jobQueue.onComplete(QUEUE_HISTORICAL_PROCESSING, async (job) => {
await this.historicalProcessingCompleteHandler(job);
});
this._jobQueue.onComplete(
QUEUE_HISTORICAL_PROCESSING,
async (job) => this.historicalProcessingCompleteHandler(job)
);
}
async initEventProcessingOnCompleteHandler (): Promise<void> {
await this._jobQueue.onComplete(QUEUE_EVENT_PROCESSING, async (job) => {
await this.eventProcessingCompleteHandler(job);
});
await this._jobQueue.onComplete(
QUEUE_EVENT_PROCESSING,
async (job) => this.eventProcessingCompleteHandler(job as PgBoss.JobWithMetadata),
{ includeMetadata: true }
);
}
async startBlockProcessing (): Promise<void> {
// Get latest block in chain and sync status from DB.
const [{ block: latestBlock }, syncStatus] = await Promise.all([
this._ethClient.getBlockByHash(),
this._indexer.getSyncStatus()
this._indexer.getSyncStatus(),
// Wait for events job queue to be empty before starting historical or realtime processing
this._jobQueue.waitForEmptyQueue(QUEUE_EVENT_PROCESSING)
]);
// Wait for events job queue to be empty before starting historical or realtime processing
await this._jobQueue.waitForEmptyQueue(QUEUE_EVENT_PROCESSING);
const historicalProcessingQueueSize = await this._jobQueue.getQueueSize(QUEUE_HISTORICAL_PROCESSING, 'completed');
// Stop if there are active or pending historical processing jobs
@ -110,14 +114,7 @@ export class EventWatcher {
// Check if filter for logs is enabled
// Check if starting block for watcher is before latest canonical block
if (this._config.jobQueue.useBlockRanges && startBlockNumber < latestCanonicalBlockNumber) {
let endBlockNumber = latestCanonicalBlockNumber;
const historicalMaxFetchAhead = this._config.jobQueue.historicalMaxFetchAhead ?? DEFAULT_HISTORICAL_MAX_FETCH_AHEAD;
if (historicalMaxFetchAhead > 0) {
endBlockNumber = Math.min(startBlockNumber + historicalMaxFetchAhead, endBlockNumber);
}
await this.startHistoricalBlockProcessing(startBlockNumber, endBlockNumber);
await this.startHistoricalBlockProcessing(startBlockNumber, latestCanonicalBlockNumber);
return;
}
@ -125,7 +122,19 @@ export class EventWatcher {
await this.startRealtimeBlockProcessing(startBlockNumber);
}
async startHistoricalBlockProcessing (startBlockNumber: number, endBlockNumber: number): Promise<void> {
async startHistoricalBlockProcessing (startBlockNumber: number, latestCanonicalBlockNumber: number): Promise<void> {
if (this._realtimeProcessingStarted) {
// Do not start historical processing if realtime processing has already started
return;
}
let endBlockNumber = latestCanonicalBlockNumber;
const historicalMaxFetchAhead = this._config.jobQueue.historicalMaxFetchAhead ?? DEFAULT_HISTORICAL_MAX_FETCH_AHEAD;
if (historicalMaxFetchAhead > 0) {
endBlockNumber = Math.min(startBlockNumber + historicalMaxFetchAhead, endBlockNumber);
}
this._historicalProcessingEndBlockNumber = endBlockNumber;
log(`Starting historical block processing in batches from ${startBlockNumber} up to block ${this._historicalProcessingEndBlockNumber}`);
@ -140,7 +149,7 @@ export class EventWatcher {
}
async startRealtimeBlockProcessing (startBlockNumber: number): Promise<void> {
// Check if realtime processing already started and avoid resubscribing to block progress event
// Check if realtime processing already started
if (this._realtimeProcessingStarted) {
return;
}
@ -246,8 +255,8 @@ export class EventWatcher {
);
}
async eventProcessingCompleteHandler (job: PgBoss.Job<any>): Promise<void> {
const { id, data: { request: { data }, failed, state, createdOn } } = job;
async eventProcessingCompleteHandler (job: PgBoss.JobWithMetadata<any>): Promise<void> {
const { id, retrycount, data: { request: { data }, failed, state, createdOn } } = job;
if (failed) {
log(`Job ${id} for queue ${QUEUE_EVENT_PROCESSING} failed`);
@ -261,14 +270,21 @@ export class EventWatcher {
}
const { blockHash, publish }: EventsJobData = data;
const blockProgress = await this._indexer.getBlockProgress(blockHash);
assert(blockProgress);
// Check if job was retried
if (retrycount > 0) {
// Reset watcher to remove any data after this block
await this._indexer.resetWatcherToBlock(blockProgress.blockNumber);
// Start block processing (Try restarting historical processing or continue realtime processing)
this.startBlockProcessing();
}
// Check if publish is set to true
// Events and blocks are not published in historical processing
// GQL subscription events will not be triggered if publish is set to false
if (publish) {
const blockProgress = await this._indexer.getBlockProgress(blockHash);
assert(blockProgress);
await this.publishBlockProgressToSubscribers(blockProgress);
const dbEvents = await this._indexer.getBlockEvents(

View File

@ -3,7 +3,7 @@
//
import assert from 'assert';
import { DeepPartial, EntityTarget, FindConditions, FindManyOptions, MoreThan } from 'typeorm';
import { DeepPartial, EntityTarget, Equal, FindConditions, FindManyOptions, MoreThan } from 'typeorm';
import debug from 'debug';
import JSONbig from 'json-bigint';
import { ethers } from 'ethers';
@ -1355,6 +1355,7 @@ export class Indexer {
}
await this._db.deleteEntitiesByConditions(dbTx, 'contract', { startingBlock: MoreThan(blockNumber) });
this._clearWatchedContracts((watchedContracts) => watchedContracts.startingBlock > blockNumber);
await this._db.deleteEntitiesByConditions(dbTx, 'block_progress', { blockNumber: MoreThan(blockNumber) });
@ -1396,6 +1397,36 @@ export class Indexer {
}
}
/**
 * Remove data written while processing a single block.
 *
 * Inside one DB transaction this deletes, for every entity type in `entities`,
 * the rows whose blockHash equals `block.blockHash`, and deletes 'contract'
 * rows whose startingBlock equals `block.blockNumber`. Once the transaction is
 * committed, the matching contracts are also dropped from the in-memory
 * watched contracts map.
 *
 * @param block Block whose processed data should be removed.
 * @param entities Entity types to delete rows from.
 * @throws Rethrows any error from the deletes or the commit after rolling back.
 */
async clearProcessedBlockData (block: BlockProgressInterface, entities: EntityTarget<{ blockNumber: number }>[]): Promise<void> {
  const dbTx = await this._db.createTransactionRunner();

  try {
    for (const entity of entities) {
      await this._db.deleteEntitiesByConditions(dbTx, entity, { blockHash: Equal(block.blockHash) });
    }

    await this._db.deleteEntitiesByConditions(dbTx, 'contract', { startingBlock: Equal(block.blockNumber) });

    // The commit must be awaited: otherwise a failed commit becomes an unhandled
    // rejection that skips the rollback below, and release() in the finally block
    // can run while the commit is still in flight.
    await dbTx.commitTransaction();

    // Evict from memory only after the DB change is durable, so a failed commit
    // does not leave the in-memory map out of step with the contract table.
    this._clearWatchedContracts((watchedContracts) => watchedContracts.startingBlock === block.blockNumber);
  } catch (error) {
    await dbTx.rollbackTransaction();
    throw error;
  } finally {
    await dbTx.release();
  }
}
/**
 * Rebuild the in-memory watched contracts map without the contracts selected
 * by the given predicate.
 *
 * @param removFilter Returns true for a watched contract that should be removed.
 *   Contracts that are kept are re-keyed by their `address`.
 */
_clearWatchedContracts (removFilter: (watchedContract: ContractInterface) => boolean): void {
  const retained: {[key: string]: ContractInterface} = {};

  for (const contract of Object.values(this._watchedContracts)) {
    if (removFilter(contract)) {
      // Selected for removal; leave it out of the rebuilt map
      continue;
    }

    retained[contract.address] = contract;
  }

  this._watchedContracts = retained;
}
updateStateStatusMap (address: string, stateStatus: StateStatus): void {
// Get and update State status for the contract.
const oldStateStatus = this._stateStatusMap[address];

View File

@ -14,7 +14,7 @@ interface Config {
maxCompletionLag: number
}
type JobCallback = (job: PgBoss.JobWithDoneCallback<any, any>) => Promise<void>;
type JobCompleteCallback = (job: PgBoss.Job | PgBoss.JobWithMetadata) => Promise<void>;
// Default number of jobs fetched from DB per polling interval (newJobCheckInterval)
const DEFAULT_JOBS_PER_INTERVAL = 5;
@ -91,13 +91,17 @@ export class JobQueue {
await this._boss.stop();
}
async subscribe (queue: string, callback: JobCallback, subscribeOptions: PgBoss.SubscribeOptions = {}): Promise<string> {
async subscribe (
queue: string,
callback: PgBoss.SubscribeHandler<any, any>,
options: PgBoss.SubscribeOptions = {}
): Promise<string> {
return await this._boss.subscribe(
queue,
{
teamSize: DEFAULT_JOBS_PER_INTERVAL,
teamConcurrency: 1,
...subscribeOptions
...options
},
async (job) => {
try {
@ -113,12 +117,13 @@ export class JobQueue {
);
}
async onComplete (queue: string, callback: JobCallback): Promise<string> {
async onComplete (queue: string, callback: JobCompleteCallback, options: PgBoss.SubscribeOptions = {}): Promise<string> {
return await this._boss.onComplete(
queue,
{
teamSize: DEFAULT_JOBS_PER_INTERVAL,
teamConcurrency: 1
teamConcurrency: 1,
...options
},
async (job: PgBoss.JobWithDoneCallback<any, any>) => {
try {

View File

@ -20,7 +20,7 @@ import {
QUEUE_HISTORICAL_PROCESSING
} from './constants';
import { JobQueue } from './job-queue';
import { BlockProgressInterface, ContractJobData, EventInterface, EventsJobData, EventsQueueJobKind, IndexerInterface } from './types';
import { BlockProgressInterface, ContractJobData, EventsJobData, EventsQueueJobKind, IndexerInterface } from './types';
import { wait } from './misc';
import {
createPruningJob,
@ -35,9 +35,6 @@ import { lastBlockNumEvents, lastBlockProcessDuration, lastProcessedBlockNumber
const log = debug('vulcanize:job-runner');
// Wait time for retrying events processing on error (in ms)
const EVENTS_PROCESSING_RETRY_WAIT = 2000;
const DEFAULT_HISTORICAL_LOGS_BLOCK_RANGE = 2000;
export interface HistoricalJobData {
@ -72,17 +69,16 @@ export class JobRunner {
}
async subscribeBlockProcessingQueue (): Promise<void> {
await this.jobQueue.subscribe(QUEUE_BLOCK_PROCESSING, async (job) => {
await this.processBlock(job);
});
await this.jobQueue.subscribe(
QUEUE_BLOCK_PROCESSING,
async (job) => this.processBlock(job)
);
}
async subscribeHistoricalProcessingQueue (): Promise<void> {
await this.jobQueue.subscribe(
QUEUE_HISTORICAL_PROCESSING,
async (job) => {
await this.processHistoricalBlocks(job);
},
async (job) => this.processHistoricalBlocks(job),
{
teamSize: 1
}
@ -92,25 +88,26 @@ export class JobRunner {
async subscribeEventProcessingQueue (): Promise<void> {
await this.jobQueue.subscribe(
QUEUE_EVENT_PROCESSING,
async (job) => {
await this.processEvent(job);
},
async (job) => this.processEvent(job as PgBoss.JobWithMetadataDoneCallback<EventsJobData | ContractJobData, object>),
{
teamSize: 1
teamSize: 1,
includeMetadata: true
}
);
}
async subscribeHooksQueue (): Promise<void> {
await this.jobQueue.subscribe(QUEUE_HOOKS, async (job) => {
await this.processHooks(job);
});
await this.jobQueue.subscribe(
QUEUE_HOOKS,
async (job) => this.processHooks(job)
);
}
async subscribeBlockCheckpointQueue (): Promise<void> {
await this.jobQueue.subscribe(QUEUE_BLOCK_CHECKPOINT, async (job) => {
await this.processCheckpoint(job);
});
await this.jobQueue.subscribe(
QUEUE_BLOCK_CHECKPOINT,
async (job) => this.processCheckpoint(job)
);
}
async processBlock (job: any): Promise<void> {
@ -209,33 +206,37 @@ export class JobRunner {
endBlock
);
let batchEndBlockHash = constants.AddressZero;
const blocksLength = blocks.length;
if (blocksLength) {
// Push event processing job for each block
const pushEventProcessingJobsForBlocksPromise = this._pushEventProcessingJobsForBlocks(blocks);
// TODO: Add pg-boss option to get queue size of jobs in a single state
const [pendingEventQueueSize, createdEventQueuSize] = await Promise.all([
this.jobQueue.getQueueSize(QUEUE_EVENT_PROCESSING),
this.jobQueue.getQueueSize(QUEUE_EVENT_PROCESSING, 'retry')
]);
if (blocks[blocksLength - 1].blockNumber === endBlock) {
// If in blocks returned end block is same as the batch end block, set batchEndBlockHash
batchEndBlockHash = blocks[blocksLength - 1].blockHash;
} else {
// Else fetch block hash from upstream for batch end block
const [block] = await this._indexer.getBlocks({ blockNumber: endBlock });
const retryEventQueueSize = pendingEventQueueSize - createdEventQueuSize;
if (block) {
batchEndBlockHash = block.blockHash;
}
if (retryEventQueueSize > 0) {
log(`${QUEUE_EVENT_PROCESSING} queue consists ${retryEventQueueSize} job(s) in retry state. Aborting pushing blocks to queue from historical processing`);
await this.jobQueue.markComplete(
job,
{ isComplete: false, endBlock }
);
return;
}
await pushEventProcessingJobsForBlocksPromise;
// Push event processing job for each block
await this._pushEventProcessingJobsForBlocks(blocks);
}
// Update sync status canonical, indexed and chain head block to end block
// Update with zero hash as they won't be used during historical processing
await Promise.all([
this._indexer.updateSyncStatusCanonicalBlock(batchEndBlockHash, endBlock, true),
this._indexer.updateSyncStatusIndexedBlock(batchEndBlockHash, endBlock, true),
this._indexer.updateSyncStatusChainHead(batchEndBlockHash, endBlock, true)
this._indexer.updateSyncStatusCanonicalBlock(constants.HashZero, endBlock, true),
this._indexer.updateSyncStatusIndexedBlock(constants.HashZero, endBlock, true),
this._indexer.updateSyncStatusChainHead(constants.HashZero, endBlock, true)
]);
log(`Sync status canonical, indexed and chain head block updated to ${endBlock}`);
@ -254,7 +255,6 @@ export class JobRunner {
const eventsProcessingJob: EventsJobData = {
kind: EventsQueueJobKind.EVENTS,
blockHash: block.blockHash,
isRetryAttempt: false,
// Avoid publishing GQL subscription event in historical processing
// Publishing when realtime processing is listening to events will cause problems
publish: false
@ -264,12 +264,12 @@ export class JobRunner {
}
}
async processEvent (job: PgBoss.JobWithDoneCallback<EventsJobData | ContractJobData, any>): Promise<EventInterface | void> {
const { data: jobData } = job;
async processEvent (job: PgBoss.JobWithMetadataDoneCallback<EventsJobData | ContractJobData, object>): Promise<void> {
const { data: jobData, retrycount: retryCount } = job;
switch (jobData.kind) {
case EventsQueueJobKind.EVENTS:
await this._processEvents(jobData);
await this._processEvents(jobData, retryCount);
break;
case EventsQueueJobKind.CONTRACT:
@ -278,6 +278,13 @@ export class JobRunner {
}
await this.jobQueue.markComplete(job);
// Shutdown after job gets marked as complete
if (this._shutDown) {
log(`Graceful shutdown after ${QUEUE_EVENT_PROCESSING} queue ${jobData.kind} kind job ${job.id}`);
this.jobQueue.stop();
process.exit(0);
}
}
async processHooks (job: any): Promise<void> {
@ -602,7 +609,6 @@ export class JobRunner {
const eventsProcessingJob: EventsJobData = {
kind: EventsQueueJobKind.EVENTS,
blockHash: blockProgress.blockHash,
isRetryAttempt: false,
publish: true
};
await this.jobQueue.pushJob(QUEUE_EVENT_PROCESSING, eventsProcessingJob);
@ -611,23 +617,13 @@ export class JobRunner {
log(`time:job-runner#_indexBlock: ${indexBlockDuration}ms`);
}
async _processEvents (jobData: EventsJobData): Promise<void> {
const { blockHash, isRetryAttempt } = jobData;
async _processEvents (jobData: EventsJobData, retryCount: number): Promise<void> {
const { blockHash } = jobData;
const prefetchedBlock = this._blockAndEventsMap.get(blockHash);
assert(prefetchedBlock);
const { block, ethFullBlock, ethFullTransactions } = prefetchedBlock;
try {
// NOTE: blockAndEventsMap should contain block as watcher is reset
// if (!this._blockAndEventsMap.has(blockHash)) {
// console.time('time:job-runner#_processEvents-get-block-progress');
// const block = await this._indexer.getBlockProgress(blockHash);
// console.timeEnd('time:job-runner#_processEvents-get-block-progress');
// assert(block);
// this._blockAndEventsMap.set(blockHash, { block, events: [] });
// }
const prefetchedBlock = this._blockAndEventsMap.get(blockHash);
assert(prefetchedBlock);
const { block, ethFullBlock, ethFullTransactions } = prefetchedBlock;
log(`Processing events for block ${block.blockNumber}`);
console.time(`time:job-runner#_processEvents-events-${block.blockNumber}`);
@ -684,37 +680,25 @@ export class JobRunner {
await this._indexer.updateSyncStatusProcessedBlock(block.blockHash, block.blockNumber);
// If this was a retry attempt, unset the indexing error flag in sync status
if (isRetryAttempt) {
if (retryCount > 0) {
await this._indexer.updateSyncStatusIndexingError(false);
}
// TODO: Shutdown after job gets marked as complete
if (this._shutDown) {
log(`Graceful shutdown after processing block ${block.blockNumber}`);
this.jobQueue.stop();
process.exit(0);
}
} catch (error) {
log(`Error in processing events for block ${blockHash}`);
log(error);
log(`Error in processing events for block ${block.blockNumber} hash ${block.blockHash}`);
// Set the indexing error flag in sync status
await this._indexer.updateSyncStatusIndexingError(true);
await Promise.all([
// Remove processed data for current block to avoid reprocessing of events
this._indexer.clearProcessedBlockData(block),
// Delete all pending event processing jobs
this.jobQueue.deleteJobs(QUEUE_EVENT_PROCESSING),
// Delete all pending historical processing jobs
this.jobQueue.deleteJobs(QUEUE_HISTORICAL_PROCESSING, 'active'),
// Set the indexing error flag in sync status
this._indexer.updateSyncStatusIndexingError(true)
]);
// TODO: Remove processed entities for current block to avoid reprocessing of events
// Catch event processing error and push job again to job queue with higher priority
log(`Retrying event processing after ${EVENTS_PROCESSING_RETRY_WAIT} ms`);
const eventsProcessingRetryJob: EventsJobData = { ...jobData, isRetryAttempt: true };
await this.jobQueue.pushJob(
QUEUE_EVENT_PROCESSING,
eventsProcessingRetryJob,
{ priority: 1 }
);
// Wait for some time before retrying job
await wait(EVENTS_PROCESSING_RETRY_WAIT);
// Error logged in job-queue handler
throw error;
}
}

View File

@ -229,6 +229,7 @@ export interface IndexerInterface {
saveOrUpdateState (state: StateInterface): Promise<StateInterface>
removeStates (blockNumber: number, kind: StateKind): Promise<void>
resetWatcherToBlock (blockNumber: number): Promise<void>
clearProcessedBlockData (block: BlockProgressInterface): Promise<void>
getResultEvent (event: EventInterface): any
}
@ -297,7 +298,6 @@ export enum EventsQueueJobKind {
export interface EventsJobData {
kind: EventsQueueJobKind.EVENTS;
blockHash: string;
isRetryAttempt: boolean;
publish: boolean;
}