From bad5ec754a56baa6838c5920ddb672db4ecabcf6 Mon Sep 17 00:00:00 2001 From: prathamesh0 <42446521+prathamesh0@users.noreply.github.com> Date: Thu, 13 Oct 2022 06:37:46 -0500 Subject: [PATCH] Prefetch blocks with events in memory (#199) * Prefetch blocks along with events in memory * Fix prefetched block destructuring * Wait while blocks are being prefetched * Retry only if block not returned by upstream eth-server * Log time taken to prefetch blocks * Avoid creating a separate job for prefetching blocks * Skip syncStatus check while fetching blocks * Convert block number string to number for comparison * Prefetch blocks in a batch only when required * Keep existing pattern for watchers in this repo * Fix logging while fetching a batch of blocks --- packages/util/index.ts | 1 + packages/util/src/common.ts | 192 ++++++++++++++++++++++++++++++++++- packages/util/src/config.ts | 3 + packages/util/src/indexer.ts | 4 +- packages/util/src/types.ts | 1 + 5 files changed, 196 insertions(+), 5 deletions(-) diff --git a/packages/util/index.ts b/packages/util/index.ts index ac18ea9f..4869babe 100644 --- a/packages/util/index.ts +++ b/packages/util/index.ts @@ -18,3 +18,4 @@ export * from './src/ipfs'; export * from './src/index-block'; export * from './src/metrics'; export * from './src/gql-metrics'; +export * from './src/common'; diff --git a/packages/util/src/common.ts b/packages/util/src/common.ts index 7bae952d..50c3e05f 100644 --- a/packages/util/src/common.ts +++ b/packages/util/src/common.ts @@ -1,16 +1,23 @@ import debug from 'debug'; import assert from 'assert'; +import { DeepPartial } from 'typeorm'; -import { JOB_KIND_PRUNE, QUEUE_BLOCK_PROCESSING, JOB_KIND_INDEX, UNKNOWN_EVENT_NAME } from './constants'; +import { QUEUE_BLOCK_PROCESSING, JOB_KIND_PRUNE, JOB_KIND_INDEX, UNKNOWN_EVENT_NAME } from './constants'; import { JobQueue } from './job-queue'; -import { BlockProgressInterface, IndexerInterface } from './types'; +import { BlockProgressInterface, IndexerInterface, EventInterface } from './types'; import { wait } from './misc'; import { OrderDirection } from './database'; +import { JobQueueConfig } from './config'; const DEFAULT_EVENTS_IN_BATCH = 50; const log = debug('vulcanize:common'); +export interface PrefetchedBlock { + block: any; + events: DeepPartial[]; +} + /** * Create pruning job in QUEUE_BLOCK_PROCESSING. * @param jobQueue @@ -39,7 +46,7 @@ export const createPruningJob = async (jobQueue: JobQueue, latestCanonicalBlockN * Method to fetch block by number and push to job queue. * @param jobQueue * @param indexer - * @param ethClient + * @param blockDelayInMilliSecs * @param blockNumber */ export const processBlockByNumber = async ( @@ -101,6 +108,179 @@ export const processBlockByNumber = async ( } }; +/** + * Create a processing job in QUEUE_BLOCK_PROCESSING. + * @param jobQueue + * @param blockNumber + */ +export const processBlockByNumberWithCache = async ( + jobQueue: JobQueue, + blockNumber: number +): Promise => { + log(`Process block ${blockNumber}`); + + // TODO Check syncStatus if blockNumber already processed (might cause problems on restart). + + await jobQueue.pushJob( + QUEUE_BLOCK_PROCESSING, + { + kind: JOB_KIND_INDEX, + blockNumber + } + ); +}; + +/** + * Method to fetch all blocks at a height. + * @param job + * @param indexer + * @param jobQueueConfig + * @param prefetchedBlocksMap + */ +export const fetchBlocksAtHeight = async ( + job: any, + indexer: IndexerInterface, + jobQueueConfig: JobQueueConfig, + prefetchedBlocksMap: Map +): Promise[]> => { + const { blockNumber } = job.data; + let blocks = []; + + // Check for blocks in cache if prefetchBlocksInMem flag set. + if (jobQueueConfig.prefetchBlocksInMem) { + // Get blocks prefetched in memory. + blocks = getPrefetchedBlocksAtHeight(prefetchedBlocksMap, blockNumber); + + // If not found in cache, fetch the next batch. + if (!blocks.length) { + // Wait for blocks to be prefetched. + console.time('time:common#fetchBlocks-_prefetchBlocks'); + await _prefetchBlocks(blockNumber, indexer, jobQueueConfig, prefetchedBlocksMap); + console.timeEnd('time:common#fetchBlocks-_prefetchBlocks'); + + blocks = getPrefetchedBlocksAtHeight(prefetchedBlocksMap, blockNumber); + } + + log('size:common#_fetchBlocks-_prefetchedBlocksMap-size:', prefetchedBlocksMap.size); + } + + if (!blocks.length) { + log(`common#cache-miss-${blockNumber}`); + const blockProgressEntities = await indexer.getBlocksAtHeight(blockNumber, false); + + blocks = blockProgressEntities.map((block: any) => { + block.timestamp = block.blockTimestamp; + + return block; + }); + } + + // Try fetching blocks from eth-server until found. + while (!blocks.length) { + console.time('time:common#_fetchBlocks-eth-server'); + blocks = await indexer.getBlocks({ blockNumber }); + console.timeEnd('time:common#_fetchBlocks-eth-server'); + + if (!blocks.length) { + log(`No blocks fetched for block number ${blockNumber}, retrying after ${jobQueueConfig.blockDelayInMilliSecs} ms delay.`); + assert(jobQueueConfig.blockDelayInMilliSecs); + await wait(jobQueueConfig.blockDelayInMilliSecs); + } + } + + const blocksToBeIndexed: DeepPartial[] = []; + for (const block of blocks) { + const { cid, blockHash, blockNumber, parentHash, timestamp } = block; + + blocksToBeIndexed.push({ + blockNumber: Number(blockNumber), + cid, + blockHash, + parentHash, + blockTimestamp: timestamp + }); + } + + await indexer.updateSyncStatusChainHead(blocks[0].blockHash, blocks[0].blockNumber); + + return blocksToBeIndexed; +}; + +export const _prefetchBlocks = async ( + blockNumber: number, + indexer: IndexerInterface, + jobQueueConfig: JobQueueConfig, + prefetchedBlocksMap: Map +): Promise => { + // Clear cache of any remaining blocks. + prefetchedBlocksMap.clear(); + + const blocksWithEvents = await _fetchBatchBlocks( + indexer, + jobQueueConfig, + blockNumber, + blockNumber + jobQueueConfig.prefetchBlockCount + ); + + blocksWithEvents.forEach(({ block, events }) => { + prefetchedBlocksMap.set(block.blockHash, { block, events }); + }); +}; + +/** + * Method to fetch blocks (with events) in the given range. + * @param indexer + * @param jobQueueConfig + * @param startBlock + * @param endBlock + */ +export const _fetchBatchBlocks = async (indexer: IndexerInterface, jobQueueConfig: JobQueueConfig, startBlock: number, endBlock: number): Promise => { + let blockNumbers = [...Array(endBlock - startBlock).keys()].map(n => n + startBlock); + let blocks = []; + + // Fetch blocks again if there are missing blocks. + while (true) { + console.time('time:common#fetchBatchBlocks-getBlocks'); + const blockPromises = blockNumbers.map(async blockNumber => indexer.getBlocks({ blockNumber })); + const res = await Promise.all(blockPromises); + console.timeEnd('time:common#fetchBatchBlocks-getBlocks'); + + const missingIndex = res.findIndex(blocks => blocks.length === 0); + + // TODO Continue to process available blocks instead of retrying for whole range. + if (missingIndex < 0) { + blocks = blocks.concat(res); + break; + } + + log('missing block number:', blockNumbers[missingIndex]); + + blocks.push(res.slice(0, missingIndex)); + blockNumbers = blockNumbers.slice(missingIndex); + + assert(jobQueueConfig.blockDelayInMilliSecs); + await wait(jobQueueConfig.blockDelayInMilliSecs); + } + + blocks = blocks.flat(); + + if (jobQueueConfig.jobDelayInMilliSecs) { + await wait(jobQueueConfig.jobDelayInMilliSecs); + } + + // TODO Catch errors and continue to process available events instead of retrying for whole range because of an error. + const blockAndEventPromises = blocks.map(async block => { + block.blockTimestamp = block.timestamp; + + assert(indexer.fetchBlockEvents); + const events = await indexer.fetchBlockEvents(block); + + return { block, events }; + }); + + return Promise.all(blockAndEventPromises); +}; + /** * Process events in batches for a block. * @param indexer @@ -201,3 +381,9 @@ export const processBatchEvents = async (indexer: IndexerInterface, block: Block await indexer.updateBlockProgress(block, block.lastProcessedEventIndex); console.timeEnd('time:common#processBatchEvents-updateBlockProgress'); }; + +const getPrefetchedBlocksAtHeight = (prefetchedBlocksMap: Map, blockNumber: number):any[] => { + return Array.from(prefetchedBlocksMap.values()) + .filter(({ block }) => Number(block.blockNumber) === blockNumber) + .map(prefetchedBlock => prefetchedBlock.block); +}; diff --git a/packages/util/src/config.ts b/packages/util/src/config.ts index d8a42112..b0298bfd 100644 --- a/packages/util/src/config.ts +++ b/packages/util/src/config.ts @@ -24,6 +24,9 @@ export interface JobQueueConfig { eventsInBatch: number; lazyUpdateBlockProgress?: boolean; subgraphEventsOrder: boolean; + blockDelayInMilliSecs?: number; + prefetchBlocksInMem: boolean; + prefetchBlockCount: number; } export interface ServerConfig { diff --git a/packages/util/src/indexer.ts b/packages/util/src/indexer.ts index 4579a2d6..1ec532ae 100644 --- a/packages/util/src/indexer.ts +++ b/packages/util/src/indexer.ts @@ -246,9 +246,9 @@ export class Indexer { assert(block.blockHash); log(`getBlockEvents: fetching from upstream server ${block.blockHash}`); - console.time('time:indexer#fetchBlockEvents-fetchAndSaveEvents'); + console.time(`time:indexer#fetchBlockEvents-fetchAndSaveEvents-${block.blockHash}`); const events = await fetchEvents(block); - console.timeEnd('time:indexer#fetchBlockEvents-fetchAndSaveEvents'); + console.timeEnd(`time:indexer#fetchBlockEvents-fetchAndSaveEvents-${block.blockHash}`); log(`getBlockEvents: fetched for block: ${block.blockHash} num events: ${events.length}`); return events; diff --git a/packages/util/src/types.ts b/packages/util/src/types.ts index a7279444..74d85c50 100644 --- a/packages/util/src/types.ts +++ b/packages/util/src/types.ts @@ -91,6 +91,7 @@ export interface IndexerInterface { getBlockEvents (blockHash: string, where: Where, queryOptions: QueryOptions): Promise> getAncestorAtDepth (blockHash: string, depth: number): Promise fetchBlockWithEvents (block: DeepPartial): Promise + fetchBlockEvents?: (block: DeepPartial) => Promise[]> removeUnknownEvents (block: BlockProgressInterface): Promise updateBlockProgress (block: BlockProgressInterface, lastProcessedEventIndex: number): Promise updateSyncStatusChainHead (blockHash: string, blockNumber: number, force?: boolean): Promise