From 8a720ef175ee5a818c7b12d8431122bfbd3518be Mon Sep 17 00:00:00 2001 From: Nabarun Gogoi Date: Mon, 6 Nov 2023 11:34:48 +0530 Subject: [PATCH] Wait for events processing to complete before continuing historical blocks processing (#450) * Wait for events queue to be empty before continuing historical processing * Make historicalLogsBlockRange and historicalMaxFetchAhead configurable * Perform single RPC request for multiple addresses --- packages/cli/src/base.ts | 8 ++- packages/cli/src/job-runner.ts | 2 +- packages/cli/src/server.ts | 4 +- .../src/templates/config-template.handlebars | 7 +++ packages/rpc-eth-client/src/eth-client.ts | 43 ++++++------- packages/util/src/config.ts | 5 ++ packages/util/src/events.ts | 60 +++++++++++++------ packages/util/src/job-queue.ts | 18 +++++- packages/util/src/job-runner.ts | 27 +++++---- packages/util/src/misc.ts | 2 +- 10 files changed, 115 insertions(+), 61 deletions(-) diff --git a/packages/cli/src/base.ts b/packages/cli/src/base.ts index 8a60b6dc..0ac7832a 100644 --- a/packages/cli/src/base.ts +++ b/packages/cli/src/base.ts @@ -135,6 +135,12 @@ export class BaseCmd { // Note: In-memory pubsub works fine for now, as each watcher is a single process anyway. // Later: https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries const pubsub = new PubSub(); - this._eventWatcher = new EventWatcher(this._config.server, this._clients.ethClient, this._indexer, pubsub, this._jobQueue); + + const config = { + server: this._config.server, + jobQueue: this._config.jobQueue + }; + + this._eventWatcher = new EventWatcher(config, this._clients.ethClient, this._indexer, pubsub, this._jobQueue); } } diff --git a/packages/cli/src/job-runner.ts b/packages/cli/src/job-runner.ts index 25ea53de..edac3072 100644 --- a/packages/cli/src/job-runner.ts +++ b/packages/cli/src/job-runner.ts @@ -108,7 +108,7 @@ export class JobRunnerCmd { const jobRunner = new JobRunner(config.jobQueue, indexer, jobQueue); - await jobRunner.jobQueue.deleteAllJobs(); + await jobRunner.jobQueue.deleteAllJobs('completed'); await jobRunner.resetToPrevIndexedBlock(); await startJobRunner(jobRunner); diff --git a/packages/cli/src/server.ts b/packages/cli/src/server.ts index 9a2b4a00..866945ed 100644 --- a/packages/cli/src/server.ts +++ b/packages/cli/src/server.ts @@ -281,8 +281,8 @@ export class ServerCmd { assert(eventWatcher); if (config.server.kind === KIND_ACTIVE) { - // Delete jobs to prevent creating jobs after completion of processing previous block. - await jobQueue.deleteAllJobs(); + // Delete jobs before completed state to prevent creating jobs after completion of processing previous block. 
+ await jobQueue.deleteAllJobs('completed'); await eventWatcher.start(); } diff --git a/packages/codegen/src/templates/config-template.handlebars b/packages/codegen/src/templates/config-template.handlebars index b198842c..8576d7a6 100644 --- a/packages/codegen/src/templates/config-template.handlebars +++ b/packages/codegen/src/templates/config-template.handlebars @@ -92,3 +92,10 @@ blockDelayInMilliSecs = 2000 prefetchBlocksInMem = true prefetchBlockCount = 10 + + # Block range in which logs are fetched during historical blocks processing + historicalLogsBlockRange = 2000 + + # Max block range of historical processing after which it waits for completion of events processing + # If set to -1 historical processing does not wait for events processing and completes till latest canonical block + historicalMaxFetchAhead = 10000 diff --git a/packages/rpc-eth-client/src/eth-client.ts b/packages/rpc-eth-client/src/eth-client.ts index ae796e38..9b181fc3 100644 --- a/packages/rpc-eth-client/src/eth-client.ts +++ b/packages/rpc-eth-client/src/eth-client.ts @@ -4,7 +4,6 @@ import assert from 'assert'; import { errors, providers, utils } from 'ethers'; -import { TransactionReceipt } from '@ethersproject/abstract-provider'; import { Cache } from '@cerc-io/cache'; import { encodeHeader, escapeHexString, getRawTransaction, EthClient as EthClientInterface } from '@cerc-io/util'; @@ -80,7 +79,7 @@ export class EthClient implements EthClientInterface { nodes: result.transactions.map((transaction) => ({ txHash: transaction.hash, // Transactions with block should be of type TransactionReceipt - index: (transaction as unknown as TransactionReceipt).transactionIndex, + index: (transaction as unknown as providers.TransactionReceipt).transactionIndex, src: transaction.from, dst: transaction.to })) @@ -239,12 +238,9 @@ export class EthClient implements EthClientInterface { addresses?: string[], topics?: string[][] }): Promise { - const blockNumber = Number(vars.blockNumber); - console.time(`time:eth-client#getLogs-${JSON.stringify(vars)}`); const result = await this._getLogs({ - fromBlock: blockNumber, - toBlock: blockNumber, + blockHash: vars.blockHash, addresses: vars.addresses, topics: vars.topics }); @@ -273,36 +269,35 @@ export class EthClient implements EthClientInterface { // TODO: Implement return type async _getLogs (vars: { + blockHash?: string, fromBlock?: number, toBlock?: number, addresses?: string[], topics?: string[][] }): Promise { - const { fromBlock, toBlock, addresses = [], topics } = vars; + const { blockHash, fromBlock, toBlock, addresses = [], topics } = vars; const result = await this._getCachedOrFetch( 'getLogs', vars, async () => { - const logsByAddressPromises = addresses?.map(address => this._provider.getLogs({ - fromBlock, - toBlock, - address, - topics - })); - const logsByAddress = await Promise.all(logsByAddressPromises); - let logs = logsByAddress.flat(); - - // If no addresses provided to filter - if (!addresses.length) { - logs = await this._provider.getLogs({ - fromBlock, - toBlock, + const ethLogs = await this._provider.send( + 'eth_getLogs', + [{ + address: addresses.map(address => address.toLowerCase()), + fromBlock: fromBlock && utils.hexlify(fromBlock), + toBlock: toBlock && utils.hexlify(toBlock), + blockHash, topics - }); - } + }] + ); - return logs.map(log => { + // Format raw eth_getLogs response + const logs: providers.Log[] = providers.Formatter.arrayOf( + this._provider.formatter.filterLog.bind(this._provider.formatter) + )(ethLogs); + + return logs.map((log) => { 
log.address = log.address.toLowerCase(); return log; }); diff --git a/packages/util/src/config.ts b/packages/util/src/config.ts index fe13d2e5..c30e43da 100644 --- a/packages/util/src/config.ts +++ b/packages/util/src/config.ts @@ -24,6 +24,11 @@ export interface JobQueueConfig { blockDelayInMilliSecs: number; prefetchBlocksInMem: boolean; prefetchBlockCount: number; + // Block range in which logs are fetched during historical blocks processing + historicalLogsBlockRange?: number; + // Max block range of historical processing after which it waits for completion of events processing + // If set to -1 historical processing does not wait for events processing and completes till latest canonical block + historicalMaxFetchAhead?: number; } export interface GQLCacheConfig { diff --git a/packages/util/src/events.ts b/packages/util/src/events.ts index c8be242a..8cd55561 100644 --- a/packages/util/src/events.ts +++ b/packages/util/src/events.ts @@ -13,20 +13,27 @@ import { BlockProgressInterface, EventInterface, IndexerInterface, EthClient } f import { MAX_REORG_DEPTH, JOB_KIND_PRUNE, JOB_KIND_INDEX, UNKNOWN_EVENT_NAME, JOB_KIND_EVENTS, QUEUE_BLOCK_PROCESSING, QUEUE_EVENT_PROCESSING, QUEUE_HISTORICAL_PROCESSING } from './constants'; import { createPruningJob, processBlockByNumber } from './common'; import { OrderDirection } from './database'; -import { HISTORICAL_BLOCKS_BATCH_SIZE, HistoricalJobData } from './job-runner'; -import { ServerConfig } from './config'; +import { HistoricalJobData, HistoricalJobResponseData } from './job-runner'; +import { JobQueueConfig, ServerConfig } from './config'; +import { wait } from './misc'; const EVENT = 'event'; -// TODO: Make configurable -const HISTORICAL_MAX_FETCH_AHEAD = 20_000; +// Time to wait for events queue to be empty +const EMPTY_EVENTS_QUEUE_WAIT_TIME = 5000; + +const DEFAULT_HISTORICAL_MAX_FETCH_AHEAD = 20_000; const log = debug('vulcanize:events'); export const BlockProgressEvent = 'block-progress-event'; +interface Config { + server: ServerConfig; + jobQueue: JobQueueConfig; +} export class EventWatcher { - _serverConfig: ServerConfig; + _config: Config; _ethClient: EthClient; _indexer: IndexerInterface; _pubsub: PubSub; @@ -36,8 +43,8 @@ export class EventWatcher { _signalCount = 0; _historicalProcessingEndBlockNumber = 0; - constructor (serverConfig: ServerConfig, ethClient: EthClient, indexer: IndexerInterface, pubsub: PubSub, jobQueue: JobQueue) { - this._serverConfig = serverConfig; + constructor (config: Config, ethClient: EthClient, indexer: IndexerInterface, pubsub: PubSub, jobQueue: JobQueue) { + this._config = config; this._ethClient = ethClient; this._indexer = indexer; this._pubsub = pubsub; @@ -96,11 +103,12 @@ export class EventWatcher { // Check if filter for logs is enabled // Check if starting block for watcher is before latest canonical block - if (this._serverConfig.useBlockRanges && startBlockNumber < latestCanonicalBlockNumber) { + if (this._config.server.useBlockRanges && startBlockNumber < latestCanonicalBlockNumber) { let endBlockNumber = latestCanonicalBlockNumber; + const historicalMaxFetchAhead = this._config.jobQueue.historicalMaxFetchAhead ?? 
DEFAULT_HISTORICAL_MAX_FETCH_AHEAD; - if (HISTORICAL_MAX_FETCH_AHEAD > 0) { - endBlockNumber = Math.min(startBlockNumber + HISTORICAL_MAX_FETCH_AHEAD, endBlockNumber); + if (historicalMaxFetchAhead > 0) { + endBlockNumber = Math.min(startBlockNumber + historicalMaxFetchAhead, endBlockNumber); } await this.startHistoricalBlockProcessing(startBlockNumber, endBlockNumber); @@ -112,10 +120,11 @@ export class EventWatcher { } async startHistoricalBlockProcessing (startBlockNumber: number, endBlockNumber: number): Promise { - // TODO: Wait for events job queue to be empty so that historical processing does not move far ahead + // Wait for events job queue to be empty so that historical processing does not move far ahead + await this._waitForEmptyEventsQueue(); this._historicalProcessingEndBlockNumber = endBlockNumber; - log(`Starting historical block processing up to block ${this._historicalProcessingEndBlockNumber}`); + log(`Starting historical block processing in batches from ${startBlockNumber} up to block ${this._historicalProcessingEndBlockNumber}`); // Push job for historical block processing await this._jobQueue.pushJob( @@ -127,6 +136,19 @@ export class EventWatcher { ); } + async _waitForEmptyEventsQueue (): Promise { + while (true) { + // Get queue size for active and pending jobs + const queueSize = await this._jobQueue.getQueueSize(QUEUE_EVENT_PROCESSING, 'completed'); + + if (queueSize === 0) { + break; + } + + await wait(EMPTY_EVENTS_QUEUE_WAIT_TIME); + } + } + async startRealtimeBlockProcessing (startBlockNumber: number): Promise { log(`Starting realtime block processing from block ${startBlockNumber}`); await processBlockByNumber(this._jobQueue, startBlockNumber); @@ -194,20 +216,22 @@ export class EventWatcher { } async historicalProcessingCompleteHandler (job: PgBoss.Job): Promise { - const { id, data: { failed, request: { data } } } = job; - const { blockNumber, isComplete }: HistoricalJobData = data; + const { id, data: { failed, request: { data }, response } } = job; + const { blockNumber }: HistoricalJobData = data; + const { isComplete, endBlock: batchEndBlockNumber }: HistoricalJobResponseData = response; - if (failed || isComplete) { + if (failed || !isComplete) { log(`Job ${id} for queue ${QUEUE_HISTORICAL_PROCESSING} failed`); return; } - // TODO: Get batch size from config - const batchEndBlockNumber = Math.min(blockNumber + HISTORICAL_BLOCKS_BATCH_SIZE, this._historicalProcessingEndBlockNumber); + // endBlock exists if isComplete is true + assert(batchEndBlockNumber); + const nextBatchStartBlockNumber = batchEndBlockNumber + 1; log(`Historical block processing completed for block range: ${blockNumber} to ${batchEndBlockNumber}`); - // Check if historical processing endBlock / latest canonical block is reached + // Check if historical processing end block is reached if (nextBatchStartBlockNumber > this._historicalProcessingEndBlockNumber) { const [block] = await this._indexer.getBlocks({ blockNumber: this._historicalProcessingEndBlockNumber }); const historicalProcessingEndBlockHash = block ? 
block.blockHash : constants.AddressZero; diff --git a/packages/util/src/job-queue.ts b/packages/util/src/job-queue.ts index 53920dea..b6b05839 100644 --- a/packages/util/src/job-queue.ts +++ b/packages/util/src/job-queue.ts @@ -129,7 +129,7 @@ export class JobQueue { } async markComplete (job: PgBoss.Job, data: object = {}): Promise { - this._boss.complete(job.id, { ...job.data, ...data }); + await this._boss.complete(job.id, data); } async pushJob (queue: string, job: any, options: PgBoss.PublishOptions = {}): Promise { @@ -139,7 +139,19 @@ export class JobQueue { log(`Created job in queue ${queue}: ${jobId}`); } - async deleteAllJobs (): Promise { - await this._boss.deleteAllQueues(); + async deleteAllJobs (before: PgBoss.Subscription['state'] = 'active'): Promise { + // Workaround for incorrect type of pg-boss deleteAllQueues method + const deleteAllQueues = this._boss.deleteAllQueues.bind(this._boss) as (options: { before: PgBoss.Subscription['state'] }) => Promise; + await deleteAllQueues({ before }); + } + + async deleteJobs (name: string, before: PgBoss.Subscription['state'] = 'active'): Promise { + // Workaround for incorrect type of pg-boss deleteAllQueues method + const deleteQueue = this._boss.deleteQueue.bind(this._boss) as (name: string, options: { before: PgBoss.Subscription['state'] }) => Promise; + await deleteQueue(name, { before }); + } + + async getQueueSize (name: string, before: PgBoss.Subscription['state'] = 'active'): Promise { + return this._boss.getQueueSize(name, { before }); } } diff --git a/packages/util/src/job-runner.ts b/packages/util/src/job-runner.ts index e387de96..595a9858 100644 --- a/packages/util/src/job-runner.ts +++ b/packages/util/src/job-runner.ts @@ -40,13 +40,16 @@ const log = debug('vulcanize:job-runner'); // Wait time for retrying events processing on error (in ms) const EVENTS_PROCESSING_RETRY_WAIT = 2000; -// TODO: Get batch size from config -export const HISTORICAL_BLOCKS_BATCH_SIZE = 2000; +const DEFAULT_HISTORICAL_LOGS_BLOCK_RANGE = 2000; export interface HistoricalJobData { blockNumber: number; processingEndBlockNumber: number; - isComplete?: boolean; +} + +export interface HistoricalJobResponseData { + isComplete: boolean; + endBlock?: number; } export class JobRunner { @@ -154,12 +157,12 @@ export class JobRunner { await this.jobQueue.markComplete(job); } - async processHistoricalBlocks (job: PgBoss.JobWithDoneCallback): Promise { + async processHistoricalBlocks (job: PgBoss.JobWithDoneCallback): Promise { const { data: { blockNumber: startBlock, processingEndBlockNumber } } = job; if (this._historicalProcessingCompletedUpto) { if (startBlock < this._historicalProcessingCompletedUpto) { - await this.jobQueue.deleteAllJobs(); + await this.jobQueue.deleteJobs(QUEUE_HISTORICAL_PROCESSING); // Remove all watcher blocks and events data if startBlock is less than this._historicalProcessingCompletedUpto // This occurs when new contract is added (with filterLogsByAddresses set to true) and historical processing is restarted from a previous block @@ -168,10 +171,10 @@ export class JobRunner { } else { // Check that startBlock is one greater than previous batch end block if (startBlock - 1 !== this._historicalProcessingCompletedUpto) { - // TODO: Debug jobQueue deleteAllJobs not working + // TODO: Debug jobQueue deleteJobs for historical processing not working await this.jobQueue.markComplete( job, - { isComplete: true } + { isComplete: false } ); return; @@ -180,7 +183,8 @@ export class JobRunner { } this._lastHistoricalProcessingEndBlockNumber 
= processingEndBlockNumber; - const endBlock = Math.min(startBlock + HISTORICAL_BLOCKS_BATCH_SIZE, processingEndBlockNumber); + const logsBlockRange = this._jobQueueConfig.historicalLogsBlockRange ?? DEFAULT_HISTORICAL_LOGS_BLOCK_RANGE; + const endBlock = Math.min(startBlock + logsBlockRange, processingEndBlockNumber); log(`Processing historical blocks from ${startBlock} to ${endBlock}`); const blocks = await fetchAndSaveFilteredLogsAndBlocks( @@ -207,7 +211,7 @@ export class JobRunner { await this.jobQueue.markComplete( job, - { isComplete: true } + { isComplete: true, endBlock } ); } @@ -570,8 +574,8 @@ export class JobRunner { // Check if new contract was added and filterLogsByAddresses is set to true if (isNewContractWatched && this._indexer.serverConfig.filterLogsByAddresses) { - // Delete jobs for any pending events and blocks processing - await this.jobQueue.deleteAllJobs(); + // Delete jobs for any pending events processing + await this.jobQueue.deleteJobs(QUEUE_EVENT_PROCESSING); // Check if historical processing is running and that current block is being processed was trigerred by historical processing if (this._historicalProcessingCompletedUpto && this._historicalProcessingCompletedUpto > block.blockNumber) { @@ -610,6 +614,7 @@ export class JobRunner { // Catch event processing error and push to job queue after some time with higher priority log(`Retrying event processing after ${EVENTS_PROCESSING_RETRY_WAIT} ms`); await wait(EVENTS_PROCESSING_RETRY_WAIT); + // TODO: Stop next job in queue from processing next await this.jobQueue.pushJob( QUEUE_EVENT_PROCESSING, job.data, diff --git a/packages/util/src/misc.ts b/packages/util/src/misc.ts index 8892527b..11627327 100644 --- a/packages/util/src/misc.ts +++ b/packages/util/src/misc.ts @@ -132,7 +132,7 @@ export const resetJobs = async (config: Config): Promise => { const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs }); await jobQueue.start(); - await jobQueue.deleteAllJobs(); + await jobQueue.deleteAllJobs('completed'); }; export const getResetYargs = (): yargs.Argv => {
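
The headline change above is that historical block processing now pauses until the event-processing queue has drained. Below is a minimal standalone sketch of that polling idea, assuming pg-boss v6+ and a hypothetical DATABASE_URL connection string; getQueueSize(name, { before: 'completed' }) counts jobs that have not yet reached the completed state, mirroring the JobQueue.getQueueSize() helper added in this patch.

```ts
import PgBoss from 'pg-boss';

const QUEUE_EVENT_PROCESSING = 'event-processing';
// Time to wait between polls of the events queue (ms)
const EMPTY_EVENTS_QUEUE_WAIT_TIME = 5000;

const wait = (ms: number): Promise<void> => new Promise(resolve => setTimeout(resolve, ms));

// Resolve only once no event-processing jobs remain in a pre-completed state
async function waitForEmptyEventsQueue (boss: PgBoss): Promise<void> {
  while (true) {
    const queueSize = await boss.getQueueSize(QUEUE_EVENT_PROCESSING, { before: 'completed' });

    if (queueSize === 0) {
      break;
    }

    await wait(EMPTY_EVENTS_QUEUE_WAIT_TIME);
  }
}

(async () => {
  // DATABASE_URL is a placeholder for the watcher's job-queue Postgres connection string
  const boss = new PgBoss(process.env.DATABASE_URL ?? '');
  await boss.start();

  await waitForEmptyEventsQueue(boss);
  console.log('Events queue drained; safe to continue historical processing');

  await boss.stop();
})();
```

Together with historicalLogsBlockRange (the per-batch block range for log fetching) and historicalMaxFetchAhead (how far historical processing may run ahead of event processing), this keeps the historical fetcher from racing far ahead of unprocessed events.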
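
The rpc-eth-client change replaces one provider.getLogs call per watched address with a single eth_getLogs request carrying an address array. A minimal standalone sketch of that batched call, assuming ethers v5 and a hypothetical ETH_RPC_URL endpoint (the formatter plumbing mirrors what _getLogs does in the diff above):

```ts
import { providers, utils } from 'ethers';

const provider = new providers.JsonRpcProvider(process.env.ETH_RPC_URL);

// Fetch logs for several contract addresses in one RPC call;
// eth_getLogs accepts an array for the address filter.
async function getLogsForAddresses (
  addresses: string[],
  fromBlock: number,
  toBlock: number,
  topics?: string[][]
): Promise<providers.Log[]> {
  const rawLogs = await provider.send('eth_getLogs', [{
    address: addresses.map(address => address.toLowerCase()),
    fromBlock: utils.hexlify(fromBlock),
    toBlock: utils.hexlify(toBlock),
    topics
  }]);

  // Normalize the raw JSON-RPC response into ethers Log objects
  return providers.Formatter.arrayOf(
    provider.formatter.filterLog.bind(provider.formatter)
  )(rawLogs);
}

// Example usage (hypothetical addresses and block range)
// const logs = await getLogsForAddresses(['0x...', '0x...'], 1_000_000, 1_002_000);
```

Compared with the previous per-address Promise.all fan-out, a single request keeps the RPC node from being hit with one query per watched contract for every block range.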