Prefetch blocks with events in memory (#199)

* Prefetch blocks along with events in memory

* Fix prefetched block destructuring

* Wait while blocks are being prefetched

* Retry only if block not returned by upstream eth-server

* Log time taken to prefetch blocks

* Avoid creating a separate job for prefetching blocks

* Skip syncStatus check while fetching blocks

* Convert block number string to number for comparison

* Prefetch blocks in a batch only when required

* Keep existing pattern for watchers in this repo

* Fix logging while fetching a batch of blocks
This commit is contained in:
prathamesh0 2022-10-13 06:37:46 -05:00 committed by GitHub
parent 0ef458734e
commit bad5ec754a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 196 additions and 5 deletions

View File

@ -18,3 +18,4 @@ export * from './src/ipfs';
export * from './src/index-block';
export * from './src/metrics';
export * from './src/gql-metrics';
export * from './src/common';

View File

@ -1,16 +1,23 @@
import debug from 'debug';
import assert from 'assert';
import { DeepPartial } from 'typeorm';
import { JOB_KIND_PRUNE, QUEUE_BLOCK_PROCESSING, JOB_KIND_INDEX, UNKNOWN_EVENT_NAME } from './constants';
import { QUEUE_BLOCK_PROCESSING, JOB_KIND_PRUNE, JOB_KIND_INDEX, UNKNOWN_EVENT_NAME } from './constants';
import { JobQueue } from './job-queue';
import { BlockProgressInterface, IndexerInterface } from './types';
import { BlockProgressInterface, IndexerInterface, EventInterface } from './types';
import { wait } from './misc';
import { OrderDirection } from './database';
import { JobQueueConfig } from './config';
const DEFAULT_EVENTS_IN_BATCH = 50;
const log = debug('vulcanize:common');
export interface PrefetchedBlock {
block: any;
events: DeepPartial<EventInterface>[];
}
/**
* Create pruning job in QUEUE_BLOCK_PROCESSING.
* @param jobQueue
@ -39,7 +46,7 @@ export const createPruningJob = async (jobQueue: JobQueue, latestCanonicalBlockN
* Method to fetch block by number and push to job queue.
* @param jobQueue
* @param indexer
* @param ethClient
* @param blockDelayInMilliSecs
* @param blockNumber
*/
export const processBlockByNumber = async (
@ -101,6 +108,179 @@ export const processBlockByNumber = async (
}
};
/**
* Create a processing job in QUEUE_BLOCK_PROCESSING.
* @param jobQueue
* @param blockNumber
*/
export const processBlockByNumberWithCache = async (
jobQueue: JobQueue,
blockNumber: number
): Promise<void> => {
log(`Process block ${blockNumber}`);
// TODO Check syncStatus if blockNumber already processed (might cause problems on restart).
await jobQueue.pushJob(
QUEUE_BLOCK_PROCESSING,
{
kind: JOB_KIND_INDEX,
blockNumber
}
);
};
/**
* Method to fetch all blocks at a height.
* @param job
* @param indexer
* @param jobQueueConfig
* @param prefetchedBlocksMap
*/
export const fetchBlocksAtHeight = async (
job: any,
indexer: IndexerInterface,
jobQueueConfig: JobQueueConfig,
prefetchedBlocksMap: Map<string, PrefetchedBlock>
): Promise<DeepPartial<BlockProgressInterface>[]> => {
const { blockNumber } = job.data;
let blocks = [];
// Check for blocks in cache if prefetchBlocksInMem flag set.
if (jobQueueConfig.prefetchBlocksInMem) {
// Get blocks prefetched in memory.
blocks = getPrefetchedBlocksAtHeight(prefetchedBlocksMap, blockNumber);
// If not found in cache, fetch the next batch.
if (!blocks.length) {
// Wait for blocks to be prefetched.
console.time('time:common#fetchBlocks-_prefetchBlocks');
await _prefetchBlocks(blockNumber, indexer, jobQueueConfig, prefetchedBlocksMap);
console.timeEnd('time:common#fetchBlocks-_prefetchBlocks');
blocks = getPrefetchedBlocksAtHeight(prefetchedBlocksMap, blockNumber);
}
log('size:common#_fetchBlocks-_prefetchedBlocksMap-size:', prefetchedBlocksMap.size);
}
if (!blocks.length) {
log(`common#cache-miss-${blockNumber}`);
const blockProgressEntities = await indexer.getBlocksAtHeight(blockNumber, false);
blocks = blockProgressEntities.map((block: any) => {
block.timestamp = block.blockTimestamp;
return block;
});
}
// Try fetching blocks from eth-server until found.
while (!blocks.length) {
console.time('time:common#_fetchBlocks-eth-server');
blocks = await indexer.getBlocks({ blockNumber });
console.timeEnd('time:common#_fetchBlocks-eth-server');
if (!blocks.length) {
log(`No blocks fetched for block number ${blockNumber}, retrying after ${jobQueueConfig.blockDelayInMilliSecs} ms delay.`);
assert(jobQueueConfig.blockDelayInMilliSecs);
await wait(jobQueueConfig.blockDelayInMilliSecs);
}
}
const blocksToBeIndexed: DeepPartial<BlockProgressInterface>[] = [];
for (const block of blocks) {
const { cid, blockHash, blockNumber, parentHash, timestamp } = block;
blocksToBeIndexed.push({
blockNumber: Number(blockNumber),
cid,
blockHash,
parentHash,
blockTimestamp: timestamp
});
}
await indexer.updateSyncStatusChainHead(blocks[0].blockHash, blocks[0].blockNumber);
return blocksToBeIndexed;
};
export const _prefetchBlocks = async (
blockNumber: number,
indexer: IndexerInterface,
jobQueueConfig: JobQueueConfig,
prefetchedBlocksMap: Map<string, PrefetchedBlock>
): Promise<void> => {
// Clear cache of any remaining blocks.
prefetchedBlocksMap.clear();
const blocksWithEvents = await _fetchBatchBlocks(
indexer,
jobQueueConfig,
blockNumber,
blockNumber + jobQueueConfig.prefetchBlockCount
);
blocksWithEvents.forEach(({ block, events }) => {
prefetchedBlocksMap.set(block.blockHash, { block, events });
});
};
/**
* Method to fetch blocks (with events) in the given range.
* @param indexer
* @param jobQueueConfig
* @param startBlock
* @param endBlock
*/
export const _fetchBatchBlocks = async (indexer: IndexerInterface, jobQueueConfig: JobQueueConfig, startBlock: number, endBlock: number): Promise<any[]> => {
let blockNumbers = [...Array(endBlock - startBlock).keys()].map(n => n + startBlock);
let blocks = [];
// Fetch blocks again if there are missing blocks.
while (true) {
console.time('time:common#fetchBatchBlocks-getBlocks');
const blockPromises = blockNumbers.map(async blockNumber => indexer.getBlocks({ blockNumber }));
const res = await Promise.all(blockPromises);
console.timeEnd('time:common#fetchBatchBlocks-getBlocks');
const missingIndex = res.findIndex(blocks => blocks.length === 0);
// TODO Continue to process available blocks instead of retrying for whole range.
if (missingIndex < 0) {
blocks = blocks.concat(res);
break;
}
log('missing block number:', blockNumbers[missingIndex]);
blocks.push(res.slice(0, missingIndex));
blockNumbers = blockNumbers.slice(missingIndex);
assert(jobQueueConfig.blockDelayInMilliSecs);
await wait(jobQueueConfig.blockDelayInMilliSecs);
}
blocks = blocks.flat();
if (jobQueueConfig.jobDelayInMilliSecs) {
await wait(jobQueueConfig.jobDelayInMilliSecs);
}
// TODO Catch errors and continue to process available events instead of retrying for whole range because of an error.
const blockAndEventPromises = blocks.map(async block => {
block.blockTimestamp = block.timestamp;
assert(indexer.fetchBlockEvents);
const events = await indexer.fetchBlockEvents(block);
return { block, events };
});
return Promise.all(blockAndEventPromises);
};
/**
* Process events in batches for a block.
* @param indexer
@ -201,3 +381,9 @@ export const processBatchEvents = async (indexer: IndexerInterface, block: Block
await indexer.updateBlockProgress(block, block.lastProcessedEventIndex);
console.timeEnd('time:common#processBatchEvents-updateBlockProgress');
};
const getPrefetchedBlocksAtHeight = (prefetchedBlocksMap: Map<string, PrefetchedBlock>, blockNumber: number):any[] => {
return Array.from(prefetchedBlocksMap.values())
.filter(({ block }) => Number(block.blockNumber) === blockNumber)
.map(prefetchedBlock => prefetchedBlock.block);
};

View File

@ -24,6 +24,9 @@ export interface JobQueueConfig {
eventsInBatch: number;
lazyUpdateBlockProgress?: boolean;
subgraphEventsOrder: boolean;
blockDelayInMilliSecs?: number;
prefetchBlocksInMem: boolean;
prefetchBlockCount: number;
}
export interface ServerConfig {

View File

@ -246,9 +246,9 @@ export class Indexer {
assert(block.blockHash);
log(`getBlockEvents: fetching from upstream server ${block.blockHash}`);
console.time('time:indexer#fetchBlockEvents-fetchAndSaveEvents');
console.time(`time:indexer#fetchBlockEvents-fetchAndSaveEvents-${block.blockHash}`);
const events = await fetchEvents(block);
console.timeEnd('time:indexer#fetchBlockEvents-fetchAndSaveEvents');
console.timeEnd(`time:indexer#fetchBlockEvents-fetchAndSaveEvents-${block.blockHash}`);
log(`getBlockEvents: fetched for block: ${block.blockHash} num events: ${events.length}`);
return events;

View File

@ -91,6 +91,7 @@ export interface IndexerInterface {
getBlockEvents (blockHash: string, where: Where, queryOptions: QueryOptions): Promise<Array<EventInterface>>
getAncestorAtDepth (blockHash: string, depth: number): Promise<string>
fetchBlockWithEvents (block: DeepPartial<BlockProgressInterface>): Promise<BlockProgressInterface>
fetchBlockEvents?: (block: DeepPartial<BlockProgressInterface>) => Promise<DeepPartial<EventInterface>[]>
removeUnknownEvents (block: BlockProgressInterface): Promise<void>
updateBlockProgress (block: BlockProgressInterface, lastProcessedEventIndex: number): Promise<BlockProgressInterface>
updateSyncStatusChainHead (blockHash: string, blockNumber: number, force?: boolean): Promise<SyncStatusInterface>