From b63a93d8a0c85c3646049386fb50cebac031b43b Mon Sep 17 00:00:00 2001 From: Nabarun Gogoi Date: Thu, 26 Oct 2023 11:21:48 +0530 Subject: [PATCH] Skip checking of events order while processing due to incorrect `logIndex` in FEVM (#438) * Skip log index order in events processing * Use json-bigint stringify in processBatchEvents * Set eventSignature in event processing for contracts watched later * Check for null block in async caching of block sizes --- .../src/templates/config-template.handlebars | 3 ++ packages/util/src/block-size-cache.ts | 30 ++++++++---- packages/util/src/common.ts | 48 ++++++++----------- packages/util/src/constants.ts | 2 + packages/util/src/database.ts | 4 -- 5 files changed, 47 insertions(+), 40 deletions(-) diff --git a/packages/codegen/src/templates/config-template.handlebars b/packages/codegen/src/templates/config-template.handlebars index 35e76ed4..8a3d7da2 100644 --- a/packages/codegen/src/templates/config-template.handlebars +++ b/packages/codegen/src/templates/config-template.handlebars @@ -67,6 +67,9 @@ gqlApiEndpoint = "http://127.0.0.1:8082/graphql" rpcProviderEndpoint = "http://127.0.0.1:8081" + # Flag to specify if rpc-eth-client should be used from RPC endpoint instead of ipld-eth-client (ipld-eth-server GQL client) + rpcClient = false + [upstream.cache] name = "requests" enabled = false diff --git a/packages/util/src/block-size-cache.ts b/packages/util/src/block-size-cache.ts index d27b466c..d2c7a2c2 100644 --- a/packages/util/src/block-size-cache.ts +++ b/packages/util/src/block-size-cache.ts @@ -2,9 +2,11 @@ // Copyright 2022 Vulcanize, Inc. // -import { utils, providers } from 'ethers'; +import { utils, providers, errors } from 'ethers'; import debug from 'debug'; +import { NULL_BLOCK_ERROR } from './constants'; + const log = debug('vulcanize:block-size-cache'); // Number of blocks to cache after current block being processed. @@ -44,16 +46,26 @@ const cacheBlockSizesAsync = async (provider: providers.JsonRpcProvider, blockNu // Start prefetching blocks after latest height in blockSizeMap. for (let i = startBlockHeight; i <= endBlockHeight; i++) { - console.time(`time:misc#cacheBlockSizesAsync-eth_getBlockByNumber-${i}`); - const block = await provider.send('eth_getBlockByNumber', [utils.hexStripZeros(utils.hexlify(i)), false]); + try { + console.time(`time:misc#cacheBlockSizesAsync-eth_getBlockByNumber-${i}`); + const block = await provider.send('eth_getBlockByNumber', [utils.hexStripZeros(utils.hexlify(i)), false]); - if (block) { - const { size, hash } = block; - blockSizeMap.set(hash, { size, blockNumber: i }); - } else { - log(`No block found at height ${i}`); + if (block) { + const { size, hash } = block; + blockSizeMap.set(hash, { size, blockNumber: i }); + } else { + log(`No block found at height ${i}`); + } + } catch (err: any) { + // Handle null block error in case of Lotus EVM + if (!(err.code === errors.SERVER_ERROR && err.error && err.error.message === NULL_BLOCK_ERROR)) { + throw err; + } + + log(`Block ${i} requested was null (FEVM); Fetching next block`); + } finally { + console.timeEnd(`time:misc#cacheBlockSizesAsync-eth_getBlockByNumber-${i}`); } - console.timeEnd(`time:misc#cacheBlockSizesAsync-eth_getBlockByNumber-${i}`); } } diff --git a/packages/util/src/common.ts b/packages/util/src/common.ts index 412ffa6e..f780c33c 100644 --- a/packages/util/src/common.ts +++ b/packages/util/src/common.ts @@ -2,6 +2,7 @@ import debug from 'debug'; import assert from 'assert'; import { DeepPartial } from 'typeorm'; import { errors } from 'ethers'; +import JSONbig from 'json-bigint'; import { QUEUE_BLOCK_PROCESSING, @@ -9,7 +10,8 @@ import { QUEUE_BLOCK_CHECKPOINT, JOB_KIND_PRUNE, JOB_KIND_INDEX, - UNKNOWN_EVENT_NAME + UNKNOWN_EVENT_NAME, + NULL_BLOCK_ERROR } from './constants'; import { JobQueue } from './job-queue'; import { BlockProgressInterface, IndexerInterface, EventInterface } from './types'; @@ -20,6 +22,7 @@ import { JobQueueConfig } from './config'; const DEFAULT_EVENTS_IN_BATCH = 50; const log = debug('vulcanize:common'); +const JSONbigNative = JSONbig({ useNativeBigInt: true }); export interface PrefetchedBlock { block: BlockProgressInterface; @@ -104,7 +107,7 @@ export const fetchBlocksAtHeight = async ( } } catch (err: any) { // Handle null block error in case of Lotus EVM - if (!(err.code === errors.SERVER_ERROR && err.error && err.error.message === 'requested epoch was a null round')) { + if (!(err.code === errors.SERVER_ERROR && err.error && err.error.message === NULL_BLOCK_ERROR)) { throw err; } @@ -195,7 +198,7 @@ export const _fetchBatchBlocks = async ( // Handle null block error in case of Lotus EVM // Otherwise, rethrow error const err = result.reason; - if (!(err.code === errors.SERVER_ERROR && err.error && err.error.message === 'requested epoch was a null round')) { + if (!(err.code === errors.SERVER_ERROR && err.error && err.error.message === NULL_BLOCK_ERROR)) { throw err; } @@ -253,6 +256,8 @@ export const _fetchBatchBlocks = async ( * @param eventsInBatch */ export const processBatchEvents = async (indexer: IndexerInterface, block: BlockProgressInterface, eventsInBatch: number): Promise => { + let page = 0; + // Check if block processing is complete. while (block.numProcessedEvents < block.numEvents) { console.time('time:common#processBacthEvents-fetching_events_batch'); @@ -260,12 +265,9 @@ export const processBatchEvents = async (indexer: IndexerInterface, block: Block // Fetch events in batches const events = await indexer.getBlockEvents( block.blockHash, + {}, { - index: [ - { value: block.lastProcessedEventIndex + 1, operator: 'gte', not: false } - ] - }, - { + skip: (page++) * (eventsInBatch || DEFAULT_EVENTS_IN_BATCH), limit: eventsInBatch || DEFAULT_EVENTS_IN_BATCH, orderBy: 'index', orderDirection: OrderDirection.asc @@ -282,23 +284,11 @@ export const processBatchEvents = async (indexer: IndexerInterface, block: Block // Process events in loop for (let event of events) { - const eventIndex = event.index; - - // Check that events are processed in order. - if (eventIndex <= block.lastProcessedEventIndex) { - throw new Error(`Events received out of order for block number ${block.blockNumber} hash ${block.blockHash}, got event index ${eventIndex} and lastProcessedEventIndex ${block.lastProcessedEventIndex}, aborting`); - } - - // Check if previous event in block has been processed exactly before this and abort if not. - // Skip check if logs fetched are filtered by contract address. - if (!indexer.serverConfig.filterLogs) { - const prevIndex = eventIndex - 1; - - if (prevIndex !== block.lastProcessedEventIndex) { - throw new Error(`Events received out of order for block number ${block.blockNumber} hash ${block.blockHash},` + - ` prev event index ${prevIndex}, got event index ${event.index} and lastProcessedEventIndex ${block.lastProcessedEventIndex}, aborting`); - } - } + // Skipping check for order of events processing since logIndex in FEVM is not index of log in block + // Check was introduced to avoid reprocessing block events incase of restarts. But currently on restarts, unprocessed block is removed and reprocessed from first event log + // if (event.index <= block.lastProcessedEventIndex) { + // throw new Error(`Events received out of order for block number ${block.blockNumber} hash ${block.blockHash}, got event index ${eventIndex} and lastProcessedEventIndex ${block.lastProcessedEventIndex}, aborting`); + // } const watchedContract = indexer.isWatchedContract(event.contract); @@ -310,10 +300,14 @@ export const processBatchEvents = async (indexer: IndexerInterface, block: Block assert(indexer.parseEventNameAndArgs); assert(typeof watchedContract !== 'boolean'); - const { eventName, eventInfo } = indexer.parseEventNameAndArgs(watchedContract.kind, logObj); + const { eventName, eventInfo, eventSignature } = indexer.parseEventNameAndArgs(watchedContract.kind, logObj); event.eventName = eventName; - event.eventInfo = JSON.stringify(eventInfo); + event.eventInfo = JSONbigNative.stringify(eventInfo); + event.extraInfo = JSONbigNative.stringify({ + ...logObj, + eventSignature + }); event = await indexer.saveEventEntity(event); } diff --git a/packages/util/src/constants.ts b/packages/util/src/constants.ts index ba1b464a..a5b974cf 100644 --- a/packages/util/src/constants.ts +++ b/packages/util/src/constants.ts @@ -29,3 +29,5 @@ export const DEFAULT_PREFETCH_BATCH_SIZE = 10; export const DEFAULT_MAX_GQL_CACHE_SIZE = Math.pow(2, 20) * 8; // 8 MB export const SUPPORTED_PAID_RPC_METHODS = ['eth_getBlockByHash', 'eth_getStorageAt', 'eth_getBlockByNumber']; + +export const NULL_BLOCK_ERROR = 'requested epoch was a null round'; diff --git a/packages/util/src/database.ts b/packages/util/src/database.ts index 3f85d04f..25c31741 100644 --- a/packages/util/src/database.ts +++ b/packages/util/src/database.ts @@ -190,10 +190,6 @@ export class Database { async updateBlockProgress (repo: Repository, block: BlockProgressInterface, lastProcessedEventIndex: number): Promise { if (!block.isComplete) { - if (lastProcessedEventIndex <= block.lastProcessedEventIndex) { - throw new Error(`Events processed out of order ${block.blockHash}, was ${block.lastProcessedEventIndex}, got ${lastProcessedEventIndex}`); - } - block.lastProcessedEventIndex = lastProcessedEventIndex; block.numProcessedEvents++; }