Fix block processing during chain reorg (#498)

* Fix block processing during chain reorg

* Add new method in test dummy indexer

* Add missing semicolon
This commit is contained in:
Nabarun Gogoi 2023-12-28 15:06:47 +05:30 committed by GitHub
parent 78e43bc088
commit a6deed9c27
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 40 additions and 16 deletions

View File

@ -869,4 +869,8 @@ export class Indexer implements IndexerInterface {
await dbTx.release(); await dbTx.release();
} }
} }
async getFullTransactions (txHashList: string[]): Promise<EthFullTransaction[]> {
return this._baseIndexer.getFullTransactions(txHashList);
}
} }

View File

@ -334,4 +334,8 @@ export class Indexer implements IndexerInterface {
async processStateCheckpoint (contractAddress: string, blockHash: string): Promise<boolean> { async processStateCheckpoint (contractAddress: string, blockHash: string): Promise<boolean> {
return false; return false;
} }
async getFullTransactions (txHashList: string[]): Promise<EthFullTransaction[]> {
return [];
}
} }

View File

@ -23,7 +23,7 @@ const log = debug('vulcanize:common');
const JSONbigNative = JSONbig({ useNativeBigInt: true }); const JSONbigNative = JSONbig({ useNativeBigInt: true });
export interface PrefetchedBlock { export interface PrefetchedBlock {
block: BlockProgressInterface; block?: BlockProgressInterface;
events: DeepPartial<EventInterface>[]; events: DeepPartial<EventInterface>[];
ethFullBlock: EthFullBlock; ethFullBlock: EthFullBlock;
ethFullTransactions: EthFullTransaction[]; ethFullTransactions: EthFullTransaction[];
@ -64,15 +64,7 @@ export const fetchBlocksAtHeight = async (
jobQueueConfig: JobQueueConfig, jobQueueConfig: JobQueueConfig,
blockAndEventsMap: Map<string, PrefetchedBlock> blockAndEventsMap: Map<string, PrefetchedBlock>
): Promise<DeepPartial<BlockProgressInterface>[]> => { ): Promise<DeepPartial<BlockProgressInterface>[]> => {
let blocks = []; let blocks: EthFullBlock[] = [];
// Try fetching blocks from the db.
const blockProgressEntities = await indexer.getBlocksAtHeight(blockNumber, false);
blocks = blockProgressEntities.map((block: any) => {
block.timestamp = block.blockTimestamp;
return block;
});
// Try fetching blocks from eth-server until found. // Try fetching blocks from eth-server until found.
while (!blocks.length) { while (!blocks.length) {
@ -82,8 +74,8 @@ export const fetchBlocksAtHeight = async (
// Check if all blocks are null and increment blockNumber to index next block number // Check if all blocks are null and increment blockNumber to index next block number
if (ethFullBlocks.length > 0 && ethFullBlocks.every(block => block === null)) { if (ethFullBlocks.length > 0 && ethFullBlocks.every(block => block === null)) {
blockNumber++;
log(`Block ${blockNumber} requested was null (FEVM); Fetching next block`); log(`Block ${blockNumber} requested was null (FEVM); Fetching next block`);
blockNumber++;
continue; continue;
} }
@ -125,7 +117,7 @@ export const fetchBlocksAtHeight = async (
}); });
} }
await indexer.updateSyncStatusChainHead(blocks[0].blockHash, blocks[0].blockNumber); await indexer.updateSyncStatusChainHead(blocks[0].blockHash, Number(blocks[0].blockNumber));
return blocksToBeIndexed; return blocksToBeIndexed;
}; };

View File

@ -530,11 +530,15 @@ export class Indexer {
} }
async _fetchTxsFromLogs (logs: any[]): Promise<EthFullTransaction[]> { async _fetchTxsFromLogs (logs: any[]): Promise<EthFullTransaction[]> {
const txHashes = Array.from([ const txHashList = Array.from([
...new Set<string>(logs.map((log) => log.transaction.hash)) ...new Set<string>(logs.map((log) => log.transaction.hash))
]); ]);
const ethFullTxPromises = txHashes.map(async txHash => { return this.getFullTransactions(txHashList);
}
async getFullTransactions (txHashList: string[]): Promise<EthFullTransaction[]> {
const ethFullTxPromises = txHashList.map(async txHash => {
return this._ethClient.getFullTransaction(txHash); return this._ethClient.getFullTransaction(txHash);
}); });

View File

@ -587,6 +587,9 @@ export class JobRunner {
} }
} }
const data = this._blockAndEventsMap.get(blockHash);
assert(data);
if (!blockProgress) { if (!blockProgress) {
// Delay required to process block. // Delay required to process block.
const { jobDelayInMilliSecs = 0 } = this._jobQueueConfig; const { jobDelayInMilliSecs = 0 } = this._jobQueueConfig;
@ -598,9 +601,24 @@ export class JobRunner {
[blockProgress, , ethFullTransactions] = await this._indexer.saveBlockAndFetchEvents({ cid, blockHash, blockNumber, parentHash, blockTimestamp }); [blockProgress, , ethFullTransactions] = await this._indexer.saveBlockAndFetchEvents({ cid, blockHash, blockNumber, parentHash, blockTimestamp });
log(`_indexBlock#saveBlockAndFetchEvents: fetched for block: ${blockProgress.blockHash} num events: ${blockProgress.numEvents}`); log(`_indexBlock#saveBlockAndFetchEvents: fetched for block: ${blockProgress.blockHash} num events: ${blockProgress.numEvents}`);
console.timeEnd('time:job-runner#_indexBlock-saveBlockAndFetchEvents'); console.timeEnd('time:job-runner#_indexBlock-saveBlockAndFetchEvents');
const data = this._blockAndEventsMap.get(blockHash);
assert(data);
this._blockAndEventsMap.set(
blockHash,
{
...data,
block: blockProgress,
ethFullTransactions
});
} else {
const events = await this._indexer.getBlockEvents(blockHash, {}, {});
const txHashList = Array.from([
...new Set<string>(events.map((event) => event.txHash))
]);
const ethFullTransactions = await this._indexer.getFullTransactions(txHashList);
// const ethFullTransactions =
this._blockAndEventsMap.set( this._blockAndEventsMap.set(
blockHash, blockHash,
{ {
@ -627,6 +645,7 @@ export class JobRunner {
const prefetchedBlock = this._blockAndEventsMap.get(blockHash); const prefetchedBlock = this._blockAndEventsMap.get(blockHash);
assert(prefetchedBlock); assert(prefetchedBlock);
const { block, ethFullBlock, ethFullTransactions } = prefetchedBlock; const { block, ethFullBlock, ethFullTransactions } = prefetchedBlock;
assert(block, 'BlockProgress not set in blockAndEvents map');
try { try {
log(`Processing events for block ${block.blockNumber}`); log(`Processing events for block ${block.blockNumber}`);

View File

@ -233,6 +233,7 @@ export interface IndexerInterface {
resetWatcherToBlock (blockNumber: number): Promise<void> resetWatcherToBlock (blockNumber: number): Promise<void>
clearProcessedBlockData (block: BlockProgressInterface): Promise<void> clearProcessedBlockData (block: BlockProgressInterface): Promise<void>
getResultEvent (event: EventInterface): any getResultEvent (event: EventInterface): any
getFullTransactions (txHashList: string[]): Promise<EthFullTransaction[]>
} }
export interface DatabaseInterface { export interface DatabaseInterface {