Fix use of prefetchBlocksInMem flag in block processing (#240)

* Fix use of prefetchBlocksInMem flag in block processing

* Rename prefetchedBlocksMap to blockAndEventsMap
This commit is contained in:
nikugogoi 2022-11-18 11:01:09 +05:30 committed by GitHub
parent 92fd3cac03
commit cc8fcffaa1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 47 additions and 37 deletions

View File

@ -52,45 +52,55 @@ export const processBlockByNumberWithCache = async (
* @param job
* @param indexer
* @param jobQueueConfig
* @param prefetchedBlocksMap
* @param blockAndEventsMap
*/
export const fetchBlocksAtHeight = async (
job: any,
indexer: IndexerInterface,
jobQueueConfig: JobQueueConfig,
prefetchedBlocksMap: Map<string, PrefetchedBlock>
blockAndEventsMap: Map<string, PrefetchedBlock>
): Promise<DeepPartial<BlockProgressInterface>[]> => {
const { blockNumber } = job.data;
let blocks = [];
// Try fetching blocks from the db.
const blockProgressEntities = await indexer.getBlocksAtHeight(blockNumber, false);
blocks = blockProgressEntities.map((block: any) => {
block.timestamp = block.blockTimestamp;
// Check for blocks in cache if prefetchBlocksInMem flag set.
if (jobQueueConfig.prefetchBlocksInMem) {
// Get blocks prefetched in memory.
blocks = getPrefetchedBlocksAtHeight(blockAndEventsMap, blockNumber);
log('size:common#fetchBlocksAtHeight-prefetch-_blockAndEventsMap-size:', blockAndEventsMap.size);
}
return block;
});
// If blocks not found in the db:
if (!blocks.length) {
// Check for blocks in cache if prefetchBlocksInMem flag set.
if (jobQueueConfig.prefetchBlocksInMem) {
// Get blocks prefetched in memory.
blocks = getPrefetchedBlocksAtHeight(prefetchedBlocksMap, blockNumber);
// Try fetching blocks from the db.
const blockProgressEntities = await indexer.getBlocksAtHeight(blockNumber, false);
blocks = blockProgressEntities.map((block: any) => {
block.timestamp = block.blockTimestamp;
// If not found in cache, fetch the next batch.
if (!blocks.length) {
log(`common#cache-miss-${blockNumber}`);
return block;
});
}
// Wait for blocks to be prefetched.
console.time('time:common#fetchBlocks-_prefetchBlocks');
await _prefetchBlocks(blockNumber, indexer, jobQueueConfig, prefetchedBlocksMap);
console.timeEnd('time:common#fetchBlocks-_prefetchBlocks');
if (jobQueueConfig.prefetchBlocksInMem && !blocks.length) {
// If blocks not found in the db and cache, fetch next batch.
log(`common#cache-miss-${blockNumber}`);
blocks = getPrefetchedBlocksAtHeight(prefetchedBlocksMap, blockNumber);
}
// Wait for blocks to be prefetched.
console.time('time:common#fetchBlocks-_prefetchBlocks');
await _prefetchBlocks(blockNumber, indexer, jobQueueConfig, blockAndEventsMap);
console.timeEnd('time:common#fetchBlocks-_prefetchBlocks');
log('size:common#_fetchBlocks-_prefetchedBlocksMap-size:', prefetchedBlocksMap.size);
blocks = getPrefetchedBlocksAtHeight(blockAndEventsMap, blockNumber);
}
// Try fetching blocks from eth-server until found.
while (!blocks.length) {
console.time('time:common#_fetchBlocks-eth-server');
blocks = await indexer.getBlocks({ blockNumber });
console.timeEnd('time:common#_fetchBlocks-eth-server');
if (!blocks.length) {
log(`No blocks fetched for block number ${blockNumber}, retrying after ${jobQueueConfig.blockDelayInMilliSecs} ms delay.`);
await wait(jobQueueConfig.blockDelayInMilliSecs);
}
}
@ -118,10 +128,10 @@ export const _prefetchBlocks = async (
blockNumber: number,
indexer: IndexerInterface,
jobQueueConfig: JobQueueConfig,
prefetchedBlocksMap: Map<string, PrefetchedBlock>
blockAndEventsMap: Map<string, PrefetchedBlock>
): Promise<void> => {
// Clear cache of any remaining blocks.
prefetchedBlocksMap.clear();
blockAndEventsMap.clear();
const blocksWithEvents = await _fetchBatchBlocks(
indexer,
@ -131,7 +141,7 @@ export const _prefetchBlocks = async (
);
blocksWithEvents.forEach(({ blockProgress, events }) => {
prefetchedBlocksMap.set(blockProgress.blockHash, { block: blockProgress, events });
blockAndEventsMap.set(blockProgress.blockHash, { block: blockProgress, events });
});
};
@ -360,8 +370,8 @@ export const createCheckpointJob = async (jobQueue: JobQueue, blockHash: string,
);
};
const getPrefetchedBlocksAtHeight = (prefetchedBlocksMap: Map<string, PrefetchedBlock>, blockNumber: number):any[] => {
return Array.from(prefetchedBlocksMap.values())
const getPrefetchedBlocksAtHeight = (blockAndEventsMap: Map<string, PrefetchedBlock>, blockNumber: number):any[] => {
return Array.from(blockAndEventsMap.values())
.filter(({ block }) => Number(block.blockNumber) === blockNumber)
.map(prefetchedBlock => prefetchedBlock.block);
};

View File

@ -39,7 +39,7 @@ export class JobRunner {
_endBlockProcessTimer?: () => void
_shutDown = false
_signalCount = 0
_prefetchedBlocksMap: Map<string, PrefetchedBlock> = new Map()
_blockAndEventsMap: Map<string, PrefetchedBlock> = new Map()
constructor (jobQueueConfig: JobQueueConfig, indexer: IndexerInterface, jobQueue: JobQueue) {
this._indexer = indexer;
@ -56,7 +56,7 @@ export class JobRunner {
job,
this._indexer,
this._jobQueueConfig,
this._prefetchedBlocksMap
this._blockAndEventsMap
);
const indexBlockPromises = blocksToBeIndexed.map(blockToBeIndexed => this._indexBlock(job, blockToBeIndexed));
await Promise.all(indexBlockPromises);
@ -352,7 +352,7 @@ export class JobRunner {
}
if (!blockProgress) {
const prefetchedBlock = this._prefetchedBlocksMap.get(blockHash);
const prefetchedBlock = this._blockAndEventsMap.get(blockHash);
if (prefetchedBlock) {
({ block: blockProgress } = prefetchedBlock);
@ -365,7 +365,7 @@ export class JobRunner {
[blockProgress] = await this._indexer.saveBlockAndFetchEvents({ cid, blockHash, blockNumber, parentHash, blockTimestamp });
console.timeEnd('time:job-runner#_indexBlock-saveBlockAndFetchEvents');
this._prefetchedBlocksMap.set(blockHash, { block: blockProgress, events: [] });
this._blockAndEventsMap.set(blockHash, { block: blockProgress, events: [] });
}
}
@ -382,16 +382,16 @@ export class JobRunner {
async _processEvents (job: any): Promise<void> {
const { blockHash } = job.data;
if (!this._prefetchedBlocksMap.has(blockHash)) {
if (!this._blockAndEventsMap.has(blockHash)) {
console.time('time:job-runner#_processEvents-get-block-progress');
const block = await this._indexer.getBlockProgress(blockHash);
console.timeEnd('time:job-runner#_processEvents-get-block-progress');
assert(block);
this._prefetchedBlocksMap.set(blockHash, { block, events: [] });
this._blockAndEventsMap.set(blockHash, { block, events: [] });
}
const prefetchedBlock = this._prefetchedBlocksMap.get(blockHash);
const prefetchedBlock = this._blockAndEventsMap.get(blockHash);
assert(prefetchedBlock);
const { block } = prefetchedBlock;
@ -404,7 +404,7 @@ export class JobRunner {
lastProcessedBlockNumber.set(block.blockNumber);
lastBlockNumEvents.set(block.numEvents);
this._prefetchedBlocksMap.delete(block.blockHash);
this._blockAndEventsMap.delete(block.blockHash);
if (this._endBlockProcessTimer) {
this._endBlockProcessTimer();