mirror of
https://github.com/cerc-io/watcher-ts
synced 2025-08-04 21:24:08 +00:00
Merge 6810130ef4
into b572eb9032
This commit is contained in:
commit
39de0f3d59
@ -29,6 +29,11 @@ export interface JobQueueConfig {
|
|||||||
// Block range in which logs are fetched during historical blocks processing
|
// Block range in which logs are fetched during historical blocks processing
|
||||||
historicalLogsBlockRange?: number;
|
historicalLogsBlockRange?: number;
|
||||||
|
|
||||||
|
// Factor to find the next multiple endBlock number when deciding eth_getLogs block range
|
||||||
|
// Used to fetch logs in aligned block ranges (paired with caching proxy server)
|
||||||
|
// If set to 0, historicalLogsBlockRange will be used to instead to decide block ranges
|
||||||
|
historicalLogsBlockRangeEndFactor?: number
|
||||||
|
|
||||||
// Max block range of historical processing after which it waits for completion of events processing
|
// Max block range of historical processing after which it waits for completion of events processing
|
||||||
// If set to -1 historical processing does not wait for events processing and completes till latest canonical block
|
// If set to -1 historical processing does not wait for events processing and completes till latest canonical block
|
||||||
historicalMaxFetchAhead?: number;
|
historicalMaxFetchAhead?: number;
|
||||||
|
@ -152,6 +152,18 @@ export class EventWatcher {
|
|||||||
endBlockNumber = Math.min(startBlockNumber + historicalMaxFetchAhead, endBlockNumber);
|
endBlockNumber = Math.min(startBlockNumber + historicalMaxFetchAhead, endBlockNumber);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (this._config.jobQueue.historicalLogsBlockRangeEndFactor) {
|
||||||
|
// Set endBlockNumber to a multiple lower than computed endBlockNumber (or canonical block)
|
||||||
|
// For using aligned block ranges
|
||||||
|
endBlockNumber = Math.floor(endBlockNumber / this._config.jobQueue.historicalLogsBlockRangeEndFactor) * this._config.jobQueue.historicalLogsBlockRangeEndFactor;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (endBlockNumber < startBlockNumber) {
|
||||||
|
await this.startRealtimeBlockProcessing(startBlockNumber);
|
||||||
|
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
this._historicalProcessingEndBlockNumber = endBlockNumber;
|
this._historicalProcessingEndBlockNumber = endBlockNumber;
|
||||||
log(`Starting historical block processing in batches from ${startBlockNumber} up to block ${this._historicalProcessingEndBlockNumber}`);
|
log(`Starting historical block processing in batches from ${startBlockNumber} up to block ${this._historicalProcessingEndBlockNumber}`);
|
||||||
|
|
||||||
|
@ -469,10 +469,27 @@ export class Indexer {
|
|||||||
topics
|
topics
|
||||||
});
|
});
|
||||||
|
|
||||||
const blockLogsMap = this._reduceLogsToBlockLogsMap(logs);
|
let blockLogsMap = this._reduceLogsToBlockLogsMap(logs);
|
||||||
// Create unique list of tx required
|
|
||||||
|
// Filter blocks which have no events from watched contracts
|
||||||
|
blockLogsMap = Array.from(blockLogsMap.entries())
|
||||||
|
.filter(([, logs]) => {
|
||||||
|
return logs.some(log => {
|
||||||
|
const contractAddress = ethers.utils.getAddress(log.account.address);
|
||||||
|
return this.isContractAddressWatched(contractAddress)?.length;
|
||||||
|
});
|
||||||
|
})
|
||||||
|
.reduce((acc, [blockHash, logs]) => {
|
||||||
|
acc.set(blockHash, logs);
|
||||||
|
return acc;
|
||||||
|
}, new Map());
|
||||||
|
|
||||||
|
// Create unique list of txs required
|
||||||
const txHashes = Array.from([
|
const txHashes = Array.from([
|
||||||
...new Set<string>(logs.map((log: any) => log.transaction.hash))
|
...new Set<string>(
|
||||||
|
Array.from(blockLogsMap.values())
|
||||||
|
.flat()
|
||||||
|
.map((log: any) => log.transaction.hash))
|
||||||
]);
|
]);
|
||||||
|
|
||||||
// Fetch blocks with transactions for the logs returned
|
// Fetch blocks with transactions for the logs returned
|
||||||
@ -543,7 +560,7 @@ export class Indexer {
|
|||||||
return blocksWithDbEvents;
|
return blocksWithDbEvents;
|
||||||
}
|
}
|
||||||
|
|
||||||
_reduceLogsToBlockLogsMap (logs: any[]): Map<string, any> {
|
_reduceLogsToBlockLogsMap (logs: any[]): Map<string, any[]> {
|
||||||
return logs.reduce((acc: Map<string, any>, log: any) => {
|
return logs.reduce((acc: Map<string, any>, log: any) => {
|
||||||
const { blockHash: logBlockHash } = log;
|
const { blockHash: logBlockHash } = log;
|
||||||
assert(typeof logBlockHash === 'string');
|
assert(typeof logBlockHash === 'string');
|
||||||
|
@ -252,7 +252,15 @@ export class JobRunner {
|
|||||||
|
|
||||||
this._lastHistoricalProcessingEndBlockNumber = processingEndBlockNumber;
|
this._lastHistoricalProcessingEndBlockNumber = processingEndBlockNumber;
|
||||||
const logsBlockRange = this._jobQueueConfig.historicalLogsBlockRange ?? DEFAULT_HISTORICAL_LOGS_BLOCK_RANGE;
|
const logsBlockRange = this._jobQueueConfig.historicalLogsBlockRange ?? DEFAULT_HISTORICAL_LOGS_BLOCK_RANGE;
|
||||||
const endBlock = Math.min(startBlock + logsBlockRange, processingEndBlockNumber);
|
let rangeEndBlock = startBlock + logsBlockRange;
|
||||||
|
|
||||||
|
if (this._jobQueueConfig.historicalLogsBlockRangeEndFactor) {
|
||||||
|
// Set rangeEndBlock to the next multiple of historicalLogsBlockRangeEndFactor after startBlock number
|
||||||
|
// For using aligned block ranges
|
||||||
|
rangeEndBlock = Math.ceil(startBlock / this._jobQueueConfig.historicalLogsBlockRangeEndFactor) * this._jobQueueConfig.historicalLogsBlockRangeEndFactor;
|
||||||
|
}
|
||||||
|
|
||||||
|
const endBlock = Math.min(rangeEndBlock, processingEndBlockNumber);
|
||||||
log(`Processing historical blocks from ${startBlock} to ${endBlock}`);
|
log(`Processing historical blocks from ${startBlock} to ${endBlock}`);
|
||||||
|
|
||||||
const blocks = await fetchAndSaveFilteredLogsAndBlocks(
|
const blocks = await fetchAndSaveFilteredLogsAndBlocks(
|
||||||
@ -720,9 +728,9 @@ export class JobRunner {
|
|||||||
|
|
||||||
this._blockAndEventsMap.delete(block.blockHash);
|
this._blockAndEventsMap.delete(block.blockHash);
|
||||||
|
|
||||||
// Check if new contract was added and filterLogsByAddresses is set to true
|
// Check if new contract was added
|
||||||
if (isNewContractWatched && this._indexer.upstreamConfig.ethServer.filterLogsByAddresses) {
|
if (isNewContractWatched) {
|
||||||
// Check if historical processing is running and that current block is being processed was trigerred by historical processing
|
// Check if historical processing is running and that current block being processed was trigerred by historical processing
|
||||||
if (this._historicalProcessingCompletedUpto && this._historicalProcessingCompletedUpto > block.blockNumber) {
|
if (this._historicalProcessingCompletedUpto && this._historicalProcessingCompletedUpto > block.blockNumber) {
|
||||||
const nextBlockNumberToProcess = block.blockNumber + 1;
|
const nextBlockNumberToProcess = block.blockNumber + 1;
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user