diff --git a/packages/util/src/config.ts b/packages/util/src/config.ts index b36fbade..4c5feb3a 100644 --- a/packages/util/src/config.ts +++ b/packages/util/src/config.ts @@ -29,6 +29,11 @@ export interface JobQueueConfig { // Block range in which logs are fetched during historical blocks processing historicalLogsBlockRange?: number; + // Factor to find the next multiple endBlock number when deciding eth_getLogs block range + // Used to fetch logs in aligned block ranges (paired with caching proxy server) + // If set to 0, historicalLogsBlockRange will be used to instead to decide block ranges + historicalLogsBlockRangeEndFactor?: number + // Max block range of historical processing after which it waits for completion of events processing // If set to -1 historical processing does not wait for events processing and completes till latest canonical block historicalMaxFetchAhead?: number; diff --git a/packages/util/src/events.ts b/packages/util/src/events.ts index ace4da5a..564b9f47 100644 --- a/packages/util/src/events.ts +++ b/packages/util/src/events.ts @@ -152,6 +152,12 @@ export class EventWatcher { endBlockNumber = Math.min(startBlockNumber + historicalMaxFetchAhead, endBlockNumber); } + if (this._config.jobQueue.historicalLogsBlockRangeEndFactor) { + // Set endBlockNumber to a multiple lower than computed endBlockNumber (or canonical block) + // For using aligned block ranges + endBlockNumber = Math.floor(endBlockNumber / this._config.jobQueue.historicalLogsBlockRangeEndFactor) * this._config.jobQueue.historicalLogsBlockRangeEndFactor; + } + this._historicalProcessingEndBlockNumber = endBlockNumber; log(`Starting historical block processing in batches from ${startBlockNumber} up to block ${this._historicalProcessingEndBlockNumber}`); diff --git a/packages/util/src/job-runner.ts b/packages/util/src/job-runner.ts index f544c290..6d08c4bf 100644 --- a/packages/util/src/job-runner.ts +++ b/packages/util/src/job-runner.ts @@ -252,7 +252,15 @@ export class JobRunner { this._lastHistoricalProcessingEndBlockNumber = processingEndBlockNumber; const logsBlockRange = this._jobQueueConfig.historicalLogsBlockRange ?? DEFAULT_HISTORICAL_LOGS_BLOCK_RANGE; - const endBlock = Math.min(startBlock + logsBlockRange, processingEndBlockNumber); + let rangeEndBlock = startBlock + logsBlockRange; + + if (this._jobQueueConfig.historicalLogsBlockRangeEndFactor) { + // Set rangeEndBlock to the next multiple of historicalLogsBlockRangeEndFactor after startBlock number + // For using aligned block ranges + rangeEndBlock = Math.ceil(startBlock / this._jobQueueConfig.historicalLogsBlockRangeEndFactor) * this._jobQueueConfig.historicalLogsBlockRangeEndFactor; + } + + const endBlock = Math.min(rangeEndBlock, processingEndBlockNumber); log(`Processing historical blocks from ${startBlock} to ${endBlock}`); const blocks = await fetchAndSaveFilteredLogsAndBlocks(