Remove prefetchBlocksInMem flag along with functionality (#474)

* Remove prefetchBlocksInMem flag

* Remove Indexer method fetchEventsAndSaveBlocks

* Throw error if pruning not safe

---------

Co-authored-by: neeraj <neeraj.rtly@gmail.com>
This commit is contained in:
Nabarun Gogoi 2023-11-16 19:03:13 +05:30 committed by GitHub
parent 3e898914ba
commit 7c8f84b1dc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 29 additions and 294 deletions

View File

@ -87,8 +87,6 @@
eventsInBatch = 50
subgraphEventsOrder = true
blockDelayInMilliSecs = 2000
prefetchBlocksInMem = true
prefetchBlockCount = 10
# Boolean to switch between modes of processing events when starting the server.
# Setting to true will fetch filtered events and required blocks in a range of blocks and then process them.

View File

@ -688,10 +688,6 @@ export class Indexer implements IndexerInterface {
return this._baseIndexer.getBlocksAtHeight(height, isPruned);
}
async fetchEventsAndSaveBlocks (blocks: DeepPartial<BlockProgress>[]): Promise<{ blockProgress: BlockProgress, events: DeepPartial<Event>[] }[]> {
return this._baseIndexer.fetchEventsAndSaveBlocks(blocks, this.eventSignaturesMap, this.parseEventNameAndArgs.bind(this));
}
async fetchAndSaveFilteredEventsAndBlocks (startBlock: number, endBlock: number): Promise<{
blockProgress: BlockProgress,
events: DeepPartial<Event>[],

View File

@ -114,12 +114,6 @@ export class Indexer implements IndexerInterface {
return '';
}
async fetchEventsAndSaveBlocks (blocks: DeepPartial<BlockProgressInterface>[]): Promise<{ blockProgress: BlockProgressInterface, events: DeepPartial<EventInterface>[] }[]> {
assert(blocks);
return [];
}
async fetchAndSaveFilteredEventsAndBlocks (startBlock: number, endBlock: number): Promise<{
blockProgress: BlockProgressInterface;
events: DeepPartial<EventInterface>[];

View File

@ -68,34 +68,13 @@ export const fetchBlocksAtHeight = async (
): Promise<DeepPartial<BlockProgressInterface>[]> => {
let blocks = [];
// Check for blocks in cache if prefetchBlocksInMem flag set.
if (jobQueueConfig.prefetchBlocksInMem) {
// Get blocks prefetched in memory.
blocks = getPrefetchedBlocksAtHeight(blockAndEventsMap, blockNumber);
log('size:common#fetchBlocksAtHeight-prefetch-_blockAndEventsMap-size:', blockAndEventsMap.size);
}
// Try fetching blocks from the db.
const blockProgressEntities = await indexer.getBlocksAtHeight(blockNumber, false);
blocks = blockProgressEntities.map((block: any) => {
block.timestamp = block.blockTimestamp;
if (!blocks.length) {
// Try fetching blocks from the db.
const blockProgressEntities = await indexer.getBlocksAtHeight(blockNumber, false);
blocks = blockProgressEntities.map((block: any) => {
block.timestamp = block.blockTimestamp;
return block;
});
}
if (jobQueueConfig.prefetchBlocksInMem && !blocks.length) {
// If blocks not found in the db and cache, fetch next batch.
log(`common#cache-miss-${blockNumber}`);
// Wait for blocks to be prefetched.
console.time('time:common#fetchBlocks-_prefetchBlocks');
await _prefetchBlocks(blockNumber, indexer, jobQueueConfig, blockAndEventsMap);
console.timeEnd('time:common#fetchBlocks-_prefetchBlocks');
blocks = getPrefetchedBlocksAtHeight(blockAndEventsMap, blockNumber);
}
return block;
});
// Try fetching blocks from eth-server until found.
while (!blocks.length) {
@ -180,125 +159,6 @@ export const fetchAndSaveFilteredLogsAndBlocks = async (
return blocksData.map(({ blockProgress }) => blockProgress);
};
export const _prefetchBlocks = async (
blockNumber: number,
indexer: IndexerInterface,
jobQueueConfig: JobQueueConfig,
blockAndEventsMap: Map<string, PrefetchedBlock>
): Promise<void> => {
// Clear cache of any remaining blocks.
blockAndEventsMap.clear();
const blocksWithEvents = await _fetchBatchBlocks(
indexer,
jobQueueConfig,
blockNumber,
blockNumber + jobQueueConfig.prefetchBlockCount
);
blocksWithEvents.forEach(({ blockProgress, events }) => {
blockAndEventsMap.set(
blockProgress.blockHash,
{
block: blockProgress,
events,
// TODO: Set ethFullBlock and ethFullTransactions
ethFullBlock: {} as EthFullBlock,
ethFullTransactions: []
});
});
};
/**
* Method to fetch blocks (with events) in the given range.
* @param indexer
* @param jobQueueConfig
* @param startBlock
* @param endBlock
*/
export const _fetchBatchBlocks = async (
indexer: IndexerInterface,
jobQueueConfig: JobQueueConfig,
startBlock: number,
endBlock: number
): Promise<{
blockProgress: BlockProgressInterface,
events: DeepPartial<EventInterface>[]
}[]> => {
let blockNumbers = [...Array(endBlock - startBlock).keys()].map(n => n + startBlock);
let blocks = [];
// Fetch blocks again if there are missing blocks.
while (true) {
console.time('time:common#fetchBatchBlocks-getBlocks');
const blockPromises = blockNumbers.map(async blockNumber => indexer.getBlocks({ blockNumber }));
const settledResults = await Promise.allSettled(blockPromises);
const res: any[] = [];
for (let index = 0; index < settledResults.length; index++) {
const result = settledResults[index];
// If fulfilled, return value
if (result.status === 'fulfilled') {
res.push(result.value);
continue;
}
// If rejected, check error
// Handle null block error in case of Lotus EVM
// Otherwise, rethrow error
const err = result.reason;
if (!(err.code === errors.SERVER_ERROR && err.error && err.error.message === NULL_BLOCK_ERROR)) {
throw err;
}
log(`Block ${blockNumbers[index]} requested was null (FEVM), skipping`);
// Remove the corresponding block number from the blockNumbers to avoid retrying for the same
blockNumbers = blockNumbers.splice(index, 1);
// Stop the iteration at the first null block found
// To avoid saving blocks after the null block
// so that they don't conflict with blocks fetched when processBlockByNumber gets called for the null block
// TODO: Optimize
break;
}
console.timeEnd('time:common#fetchBatchBlocks-getBlocks');
const firstMissingBlockIndex = res.findIndex(blocks => blocks.length === 0);
if (firstMissingBlockIndex === -1) {
blocks = res;
break;
} else if (firstMissingBlockIndex > 0) {
blocks = res.slice(0, firstMissingBlockIndex);
break;
}
log(`No blocks fetched for block number ${blockNumbers[0]}, retrying after ${jobQueueConfig.blockDelayInMilliSecs} ms delay.`);
await wait(jobQueueConfig.blockDelayInMilliSecs);
}
// Flatten array as there can be multiple blocks at the same height
blocks = blocks.flat();
if (jobQueueConfig.jobDelayInMilliSecs) {
await wait(jobQueueConfig.jobDelayInMilliSecs);
}
blocks.forEach(block => {
block.blockTimestamp = Number(block.timestamp);
block.blockNumber = Number(block.blockNumber);
});
console.time('time:common#fetchBatchBlocks-fetchEventsAndSaveBlocks');
const blockAndEventsList = await indexer.fetchEventsAndSaveBlocks(blocks);
console.timeEnd('time:common#fetchBatchBlocks-fetchEventsAndSaveBlocks');
return blockAndEventsList;
};
/**
* Process events in batches for a block.
* @param indexer
@ -602,9 +462,3 @@ export const createCheckpointJob = async (jobQueue: JobQueue, blockHash: string,
}
);
};
const getPrefetchedBlocksAtHeight = (blockAndEventsMap: Map<string, PrefetchedBlock>, blockNumber: number):any[] => {
return Array.from(blockAndEventsMap.values())
.filter(({ block }) => Number(block.blockNumber) === blockNumber)
.map(prefetchedBlock => prefetchedBlock.block);
};

View File

@ -22,8 +22,6 @@ export interface JobQueueConfig {
lazyUpdateBlockProgress?: boolean;
subgraphEventsOrder: boolean;
blockDelayInMilliSecs: number;
prefetchBlocksInMem: boolean;
prefetchBlockCount: number;
// Block range in which logs are fetched during historical blocks processing
historicalLogsBlockRange?: number;
// Max block range of historical processing after which it waits for completion of events processing

View File

@ -359,106 +359,6 @@ export class Indexer {
return this._db.getEvent(id);
}
// For each of the given blocks, fetches events and saves them along with the block to db
// Returns an array with [block, events] for all the given blocks
async fetchEventsAndSaveBlocks (blocks: DeepPartial<BlockProgressInterface>[], eventSignaturesMap: Map<string, string[]>, parseEventNameAndArgs: (kind: string, logObj: any) => any): Promise<{ blockProgress: BlockProgressInterface, events: DeepPartial<EventInterface>[] }[]> {
if (!blocks.length) {
return [];
}
const fromBlock = blocks[0].blockNumber;
const toBlock = blocks[blocks.length - 1].blockNumber;
log(`fetchEventsAndSaveBlocks#fetchEventsForBlocks: fetching from upstream server for range [${fromBlock}, ${toBlock}]`);
const dbEventsMap = await this.fetchEventsForBlocks(blocks, eventSignaturesMap, parseEventNameAndArgs);
const blocksWithEventsPromises = blocks.map(async block => {
const blockHash = block.blockHash;
assert(blockHash);
const dbEvents = dbEventsMap.get(blockHash) || [];
const [blockProgress] = await this.saveBlockWithEvents(block, dbEvents);
log(`fetchEventsAndSaveBlocks#fetchEventsForBlocks: fetched for block: ${blockHash} num events: ${blockProgress.numEvents}`);
return { blockProgress, events: [] };
});
return Promise.all(blocksWithEventsPromises);
}
// Fetch events (to be saved to db) for a block range
async fetchEventsForBlocks (blocks: DeepPartial<BlockProgressInterface>[], eventSignaturesMap: Map<string, string[]>, parseEventNameAndArgs: (kind: string, logObj: any) => any): Promise<Map<string, DeepPartial<EventInterface>[]>> {
if (!blocks.length) {
return new Map();
}
// Fetch logs for block range of given blocks
const fromBlock = blocks[0].blockNumber;
const toBlock = blocks[blocks.length - 1].blockNumber;
assert(this._ethClient.getLogsForBlockRange, 'getLogsForBlockRange() not implemented in ethClient');
const { addresses, topics } = this._createLogsFilters(eventSignaturesMap);
const { logs } = await this._ethClient.getLogsForBlockRange({
fromBlock,
toBlock,
addresses,
topics
});
// Skip further processing if no relevant logs found in the entire block range
if (!logs.length) {
return new Map();
}
// Sort logs according to blockhash
const blockLogsMap = this._reduceLogsToBlockLogsMap(logs);
// Fetch transactions for given blocks
const transactionsMap: Map<string, any> = new Map();
const transactionPromises = blocks.map(async (block) => {
assert(block.blockHash);
// Skip fetching txs if no relevant logs found in this block
if (!blockLogsMap.has(block.blockHash)) {
return;
}
const blockWithTransactions = await this._ethClient.getBlockWithTransactions({ blockHash: block.blockHash, blockNumber: block.blockNumber });
const {
allEthHeaderCids: {
nodes: [
{
ethTransactionCidsByHeaderId: {
nodes: transactions
}
}
]
}
} = blockWithTransactions;
transactionsMap.set(block.blockHash, transactions);
});
await Promise.all(transactionPromises);
// Map db ready events according to blockhash
const dbEventsMap: Map<string, DeepPartial<EventInterface>[]> = new Map();
blocks.forEach(block => {
const blockHash = block.blockHash;
assert(blockHash);
const logs = blockLogsMap.get(blockHash) || [];
const transactions = transactionsMap.get(blockHash) || [];
const dbEvents = this.createDbEventsFromLogsAndTxs(blockHash, logs, transactions, parseEventNameAndArgs);
dbEventsMap.set(blockHash, dbEvents);
});
return dbEventsMap;
}
async fetchAndSaveFilteredEventsAndBlocks (
fromBlock: number,
toBlock: number,

View File

@ -423,7 +423,11 @@ export class JobRunner {
log(`Processing chain pruning at ${pruneBlockHeight}`);
// Assert we're at a depth where pruning is safe.
assert(syncStatus.latestIndexedBlockNumber >= (pruneBlockHeight + MAX_REORG_DEPTH));
if (!(syncStatus.latestIndexedBlockNumber >= (pruneBlockHeight + MAX_REORG_DEPTH))) {
const message = `Pruning is not safe at height ${pruneBlockHeight}, latest indexed block height ${syncStatus.latestIndexedBlockNumber}`;
log(message);
throw new Error(message);
}
// Check that we haven't already pruned at this depth.
if (syncStatus.latestCanonicalBlockNumber >= pruneBlockHeight) {
@ -585,34 +589,26 @@ export class JobRunner {
}
if (!blockProgress) {
const prefetchedBlock = this._blockAndEventsMap.get(blockHash);
// Delay required to process block.
const { jobDelayInMilliSecs = 0 } = this._jobQueueConfig;
await wait(jobDelayInMilliSecs);
// Check if prefetched block is set properly
// prefetchedBlock.block is an empty object when running in realtime processing
if (prefetchedBlock && prefetchedBlock.block.blockHash) {
({ block: blockProgress } = prefetchedBlock);
} else {
// Delay required to process block.
const { jobDelayInMilliSecs = 0 } = this._jobQueueConfig;
await wait(jobDelayInMilliSecs);
console.time('time:job-runner#_indexBlock-saveBlockAndFetchEvents');
log(`_indexBlock#saveBlockAndFetchEvents: fetching from upstream server ${blockHash}`);
let ethFullTransactions;
[blockProgress, , ethFullTransactions] = await this._indexer.saveBlockAndFetchEvents({ cid, blockHash, blockNumber, parentHash, blockTimestamp });
log(`_indexBlock#saveBlockAndFetchEvents: fetched for block: ${blockProgress.blockHash} num events: ${blockProgress.numEvents}`);
console.timeEnd('time:job-runner#_indexBlock-saveBlockAndFetchEvents');
const data = this._blockAndEventsMap.get(blockHash);
assert(data);
console.time('time:job-runner#_indexBlock-saveBlockAndFetchEvents');
log(`_indexBlock#saveBlockAndFetchEvents: fetching from upstream server ${blockHash}`);
let ethFullTransactions;
[blockProgress,, ethFullTransactions] = await this._indexer.saveBlockAndFetchEvents({ cid, blockHash, blockNumber, parentHash, blockTimestamp });
log(`_indexBlock#saveBlockAndFetchEvents: fetched for block: ${blockProgress.blockHash} num events: ${blockProgress.numEvents}`);
console.timeEnd('time:job-runner#_indexBlock-saveBlockAndFetchEvents');
const data = this._blockAndEventsMap.get(blockHash);
assert(data);
this._blockAndEventsMap.set(
blockHash,
{
...data,
block: blockProgress,
ethFullTransactions
});
}
this._blockAndEventsMap.set(
blockHash,
{
...data,
block: blockProgress,
ethFullTransactions
});
}
if (!blockProgress.isComplete) {

View File

@ -173,7 +173,6 @@ export interface IndexerInterface {
getLatestStateIndexedBlock (): Promise<BlockProgressInterface>
getBlockEvents (blockHash: string, where: Where, queryOptions: QueryOptions): Promise<Array<EventInterface>>
getAncestorAtDepth (blockHash: string, depth: number): Promise<string>
fetchEventsAndSaveBlocks (blocks: DeepPartial<BlockProgressInterface>[]): Promise<{ blockProgress: BlockProgressInterface, events: DeepPartial<EventInterface>[] }[]>
saveBlockAndFetchEvents (block: DeepPartial<BlockProgressInterface>): Promise<[
BlockProgressInterface,
DeepPartial<EventInterface>[],