mirror of
https://github.com/cerc-io/watcher-ts
synced 2025-01-23 11:39:05 +00:00
Handle null blocks for FEVM (#437)
* Handle null blocks in chain pruning * Skip FEVM null blocks while prefetching * Fix BaseFee assignment while fetching block in rpc-eth-client * Convert blockNumber field to number type before saving block to db * Avoid saving blocks after null block twice
This commit is contained in:
parent
320bf02938
commit
8bba0aeb94
@ -166,7 +166,7 @@ export class EthClient implements EthClientInterface {
|
||||
Extra: rawBlock.extraData,
|
||||
MixDigest: rawBlock.mixHash,
|
||||
Nonce: BigInt(rawBlock.nonce),
|
||||
BaseFee: rawBlock.baseFeePerGas ?? BigInt(rawBlock.baseFeePerGas)
|
||||
BaseFee: rawBlock.baseFeePerGas && BigInt(rawBlock.baseFeePerGas)
|
||||
};
|
||||
|
||||
const rlpData = encodeHeader(header);
|
||||
|
@ -172,14 +172,45 @@ export const _fetchBatchBlocks = async (
|
||||
blockProgress: BlockProgressInterface,
|
||||
events: DeepPartial<EventInterface>[]
|
||||
}[]> => {
|
||||
const blockNumbers = [...Array(endBlock - startBlock).keys()].map(n => n + startBlock);
|
||||
let blockNumbers = [...Array(endBlock - startBlock).keys()].map(n => n + startBlock);
|
||||
let blocks = [];
|
||||
|
||||
// Fetch blocks again if there are missing blocks.
|
||||
while (true) {
|
||||
console.time('time:common#fetchBatchBlocks-getBlocks');
|
||||
|
||||
const blockPromises = blockNumbers.map(async blockNumber => indexer.getBlocks({ blockNumber }));
|
||||
const res = await Promise.all(blockPromises);
|
||||
const settledResults = await Promise.allSettled(blockPromises);
|
||||
|
||||
const res: any[] = [];
|
||||
for (let index = 0; index < settledResults.length; index++) {
|
||||
const result = settledResults[index];
|
||||
// If fulfilled, return value
|
||||
if (result.status === 'fulfilled') {
|
||||
res.push(result.value);
|
||||
continue;
|
||||
}
|
||||
|
||||
// If rejected, check error
|
||||
// Handle null block error in case of Lotus EVM
|
||||
// Otherwise, rethrow error
|
||||
const err = result.reason;
|
||||
if (!(err.code === errors.SERVER_ERROR && err.error && err.error.message === 'requested epoch was a null round')) {
|
||||
throw err;
|
||||
}
|
||||
|
||||
log(`Block ${blockNumbers[index]} requested was null (FEVM), skipping`);
|
||||
|
||||
// Remove the corresponding block number from the blockNumbers to avoid retrying for the same
|
||||
blockNumbers = blockNumbers.splice(index, 1);
|
||||
|
||||
// Stop the iteration at the first null block found
|
||||
// To avoid saving blocks after the null block
|
||||
// so that they don't conflict with blocks fetched when processBlockByNumber gets called for the null block
|
||||
// TODO: Optimize
|
||||
break;
|
||||
}
|
||||
|
||||
console.timeEnd('time:common#fetchBatchBlocks-getBlocks');
|
||||
|
||||
const firstMissingBlockIndex = res.findIndex(blocks => blocks.length === 0);
|
||||
@ -205,6 +236,7 @@ export const _fetchBatchBlocks = async (
|
||||
|
||||
blocks.forEach(block => {
|
||||
block.blockTimestamp = block.timestamp;
|
||||
block.blockNumber = Number(block.blockNumber);
|
||||
});
|
||||
|
||||
console.time('time:common#fetchBatchBlocks-fetchEventsAndSaveBlocks');
|
||||
|
@ -266,6 +266,10 @@ export class Indexer {
|
||||
// For each of the given blocks, fetches events and saves them along with the block to db
|
||||
// Returns an array with [block, events] for all the given blocks
|
||||
async fetchEventsAndSaveBlocks (blocks: DeepPartial<BlockProgressInterface>[], parseEventNameAndArgs: (kind: string, logObj: any) => any): Promise<{ blockProgress: BlockProgressInterface, events: DeepPartial<EventInterface>[] }[]> {
|
||||
if (!blocks.length) {
|
||||
return [];
|
||||
}
|
||||
|
||||
const fromBlock = blocks[0].blockNumber;
|
||||
const toBlock = blocks[blocks.length - 1].blockNumber;
|
||||
log(`fetchEventsAndSaveBlocks#fetchEventsForBlocks: fetching from upstream server for range [${fromBlock}, ${toBlock}]`);
|
||||
@ -276,16 +280,8 @@ export class Indexer {
|
||||
const blockHash = block.blockHash;
|
||||
assert(blockHash);
|
||||
|
||||
const blockToSave = {
|
||||
cid: block.cid,
|
||||
blockHash: block.blockHash,
|
||||
blockNumber: block.blockNumber,
|
||||
blockTimestamp: block.blockTimestamp,
|
||||
parentHash: block.parentHash
|
||||
};
|
||||
|
||||
const dbEvents = dbEventsMap.get(blockHash) || [];
|
||||
const [blockProgress] = await this.saveBlockWithEvents(blockToSave, dbEvents);
|
||||
const [blockProgress] = await this.saveBlockWithEvents(block, dbEvents);
|
||||
log(`fetchEventsAndSaveBlocks#fetchEventsForBlocks: fetched for block: ${blockHash} num events: ${blockProgress.numEvents}`);
|
||||
|
||||
return { blockProgress, events: [] };
|
||||
@ -494,10 +490,10 @@ export class Indexer {
|
||||
async saveBlockWithEvents (block: DeepPartial<BlockProgressInterface>, events: DeepPartial<EventInterface>[]): Promise<[BlockProgressInterface, DeepPartial<EventInterface>[]]> {
|
||||
const dbTx = await this._db.createTransactionRunner();
|
||||
try {
|
||||
console.time(`time:indexer#_saveBlockWithEvents-db-save-${block.blockNumber}`);
|
||||
console.time(`time:indexer#saveBlockWithEvents-db-save-${block.blockNumber}`);
|
||||
const blockProgress = await this._db.saveBlockWithEvents(dbTx, block, events);
|
||||
await dbTx.commitTransaction();
|
||||
console.timeEnd(`time:indexer#_saveBlockWithEvents-db-save-${block.blockNumber}`);
|
||||
console.timeEnd(`time:indexer#saveBlockWithEvents-db-save-${block.blockNumber}`);
|
||||
|
||||
return [blockProgress, []];
|
||||
} catch (error) {
|
||||
|
@ -4,6 +4,7 @@
|
||||
|
||||
import assert from 'assert';
|
||||
import debug from 'debug';
|
||||
import { ethers } from 'ethers';
|
||||
import { DeepPartial, In } from 'typeorm';
|
||||
|
||||
import { JobQueueConfig } from './config';
|
||||
@ -273,27 +274,29 @@ export class JobRunner {
|
||||
// Check how many branches there are at the given height/block number.
|
||||
const blocksAtHeight = await this._indexer.getBlocksAtHeight(pruneBlockHeight, false);
|
||||
|
||||
// Should be at least 1.
|
||||
assert(blocksAtHeight.length);
|
||||
let newCanonicalBlockHash = ethers.constants.HashZero;
|
||||
|
||||
let newCanonicalBlockHash;
|
||||
// We have more than one node at this height, so prune all nodes not reachable from indexed block at max reorg depth from prune height.
|
||||
// This will lead to orphaned nodes, which will get pruned at the next height.
|
||||
if (blocksAtHeight.length > 1) {
|
||||
const [indexedBlock] = await this._indexer.getBlocksAtHeight(pruneBlockHeight + MAX_REORG_DEPTH, false);
|
||||
// Prune only if blocks exist at pruneBlockHeight
|
||||
// There might be missing null block in FEVM; only update the sync status in such case
|
||||
if (blocksAtHeight.length !== 0) {
|
||||
// We have more than one node at this height, so prune all nodes not reachable from indexed block at max reorg depth from prune height.
|
||||
// This will lead to orphaned nodes, which will get pruned at the next height.
|
||||
if (blocksAtHeight.length > 1) {
|
||||
const [indexedBlock] = await this._indexer.getBlocksAtHeight(pruneBlockHeight + MAX_REORG_DEPTH, false);
|
||||
|
||||
// Get ancestor blockHash from indexed block at prune height.
|
||||
const ancestorBlockHash = await this._indexer.getAncestorAtDepth(indexedBlock.blockHash, MAX_REORG_DEPTH);
|
||||
newCanonicalBlockHash = ancestorBlockHash;
|
||||
// Get ancestor blockHash from indexed block at prune height.
|
||||
const ancestorBlockHash = await this._indexer.getAncestorAtDepth(indexedBlock.blockHash, MAX_REORG_DEPTH);
|
||||
newCanonicalBlockHash = ancestorBlockHash;
|
||||
|
||||
const blocksToBePruned = blocksAtHeight.filter(block => ancestorBlockHash !== block.blockHash);
|
||||
const blocksToBePruned = blocksAtHeight.filter(block => ancestorBlockHash !== block.blockHash);
|
||||
|
||||
if (blocksToBePruned.length) {
|
||||
// Mark blocks pruned which are not the ancestor block.
|
||||
await this._indexer.markBlocksAsPruned(blocksToBePruned);
|
||||
if (blocksToBePruned.length) {
|
||||
// Mark blocks pruned which are not the ancestor block.
|
||||
await this._indexer.markBlocksAsPruned(blocksToBePruned);
|
||||
}
|
||||
} else {
|
||||
newCanonicalBlockHash = blocksAtHeight[0].blockHash;
|
||||
}
|
||||
} else {
|
||||
newCanonicalBlockHash = blocksAtHeight[0].blockHash;
|
||||
}
|
||||
|
||||
// Update the canonical block in the SyncStatus.
|
||||
|
Loading…
Reference in New Issue
Block a user