mirror of
https://github.com/cerc-io/watcher-ts
synced 2025-07-29 19:32:06 +00:00
Skip checking of events order while processing due to incorrect logIndex
in FEVM (#438)
* Skip log index order in events processing * Use json-bigint stringify in processBatchEvents * Set eventSignature in event processing for contracts watched later * Check for null block in async caching of block sizes
This commit is contained in:
parent
8bba0aeb94
commit
b63a93d8a0
@ -67,6 +67,9 @@
|
||||
gqlApiEndpoint = "http://127.0.0.1:8082/graphql"
|
||||
rpcProviderEndpoint = "http://127.0.0.1:8081"
|
||||
|
||||
# Flag to specify if rpc-eth-client should be used from RPC endpoint instead of ipld-eth-client (ipld-eth-server GQL client)
|
||||
rpcClient = false
|
||||
|
||||
[upstream.cache]
|
||||
name = "requests"
|
||||
enabled = false
|
||||
|
@ -2,9 +2,11 @@
|
||||
// Copyright 2022 Vulcanize, Inc.
|
||||
//
|
||||
|
||||
import { utils, providers } from 'ethers';
|
||||
import { utils, providers, errors } from 'ethers';
|
||||
import debug from 'debug';
|
||||
|
||||
import { NULL_BLOCK_ERROR } from './constants';
|
||||
|
||||
const log = debug('vulcanize:block-size-cache');
|
||||
|
||||
// Number of blocks to cache after current block being processed.
|
||||
@ -44,6 +46,7 @@ const cacheBlockSizesAsync = async (provider: providers.JsonRpcProvider, blockNu
|
||||
|
||||
// Start prefetching blocks after latest height in blockSizeMap.
|
||||
for (let i = startBlockHeight; i <= endBlockHeight; i++) {
|
||||
try {
|
||||
console.time(`time:misc#cacheBlockSizesAsync-eth_getBlockByNumber-${i}`);
|
||||
const block = await provider.send('eth_getBlockByNumber', [utils.hexStripZeros(utils.hexlify(i)), false]);
|
||||
|
||||
@ -53,9 +56,18 @@ const cacheBlockSizesAsync = async (provider: providers.JsonRpcProvider, blockNu
|
||||
} else {
|
||||
log(`No block found at height ${i}`);
|
||||
}
|
||||
} catch (err: any) {
|
||||
// Handle null block error in case of Lotus EVM
|
||||
if (!(err.code === errors.SERVER_ERROR && err.error && err.error.message === NULL_BLOCK_ERROR)) {
|
||||
throw err;
|
||||
}
|
||||
|
||||
log(`Block ${i} requested was null (FEVM); Fetching next block`);
|
||||
} finally {
|
||||
console.timeEnd(`time:misc#cacheBlockSizesAsync-eth_getBlockByNumber-${i}`);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// At interval clear previous blocks below height blockNumber from map.
|
||||
if (blockNumber % BLOCK_SIZE_MAP_CLEAR_HEIGHT_INTERVAL === 0) {
|
||||
|
@ -2,6 +2,7 @@ import debug from 'debug';
|
||||
import assert from 'assert';
|
||||
import { DeepPartial } from 'typeorm';
|
||||
import { errors } from 'ethers';
|
||||
import JSONbig from 'json-bigint';
|
||||
|
||||
import {
|
||||
QUEUE_BLOCK_PROCESSING,
|
||||
@ -9,7 +10,8 @@ import {
|
||||
QUEUE_BLOCK_CHECKPOINT,
|
||||
JOB_KIND_PRUNE,
|
||||
JOB_KIND_INDEX,
|
||||
UNKNOWN_EVENT_NAME
|
||||
UNKNOWN_EVENT_NAME,
|
||||
NULL_BLOCK_ERROR
|
||||
} from './constants';
|
||||
import { JobQueue } from './job-queue';
|
||||
import { BlockProgressInterface, IndexerInterface, EventInterface } from './types';
|
||||
@ -20,6 +22,7 @@ import { JobQueueConfig } from './config';
|
||||
const DEFAULT_EVENTS_IN_BATCH = 50;
|
||||
|
||||
const log = debug('vulcanize:common');
|
||||
const JSONbigNative = JSONbig({ useNativeBigInt: true });
|
||||
|
||||
export interface PrefetchedBlock {
|
||||
block: BlockProgressInterface;
|
||||
@ -104,7 +107,7 @@ export const fetchBlocksAtHeight = async (
|
||||
}
|
||||
} catch (err: any) {
|
||||
// Handle null block error in case of Lotus EVM
|
||||
if (!(err.code === errors.SERVER_ERROR && err.error && err.error.message === 'requested epoch was a null round')) {
|
||||
if (!(err.code === errors.SERVER_ERROR && err.error && err.error.message === NULL_BLOCK_ERROR)) {
|
||||
throw err;
|
||||
}
|
||||
|
||||
@ -195,7 +198,7 @@ export const _fetchBatchBlocks = async (
|
||||
// Handle null block error in case of Lotus EVM
|
||||
// Otherwise, rethrow error
|
||||
const err = result.reason;
|
||||
if (!(err.code === errors.SERVER_ERROR && err.error && err.error.message === 'requested epoch was a null round')) {
|
||||
if (!(err.code === errors.SERVER_ERROR && err.error && err.error.message === NULL_BLOCK_ERROR)) {
|
||||
throw err;
|
||||
}
|
||||
|
||||
@ -253,6 +256,8 @@ export const _fetchBatchBlocks = async (
|
||||
* @param eventsInBatch
|
||||
*/
|
||||
export const processBatchEvents = async (indexer: IndexerInterface, block: BlockProgressInterface, eventsInBatch: number): Promise<void> => {
|
||||
let page = 0;
|
||||
|
||||
// Check if block processing is complete.
|
||||
while (block.numProcessedEvents < block.numEvents) {
|
||||
console.time('time:common#processBacthEvents-fetching_events_batch');
|
||||
@ -260,12 +265,9 @@ export const processBatchEvents = async (indexer: IndexerInterface, block: Block
|
||||
// Fetch events in batches
|
||||
const events = await indexer.getBlockEvents(
|
||||
block.blockHash,
|
||||
{},
|
||||
{
|
||||
index: [
|
||||
{ value: block.lastProcessedEventIndex + 1, operator: 'gte', not: false }
|
||||
]
|
||||
},
|
||||
{
|
||||
skip: (page++) * (eventsInBatch || DEFAULT_EVENTS_IN_BATCH),
|
||||
limit: eventsInBatch || DEFAULT_EVENTS_IN_BATCH,
|
||||
orderBy: 'index',
|
||||
orderDirection: OrderDirection.asc
|
||||
@ -282,23 +284,11 @@ export const processBatchEvents = async (indexer: IndexerInterface, block: Block
|
||||
|
||||
// Process events in loop
|
||||
for (let event of events) {
|
||||
const eventIndex = event.index;
|
||||
|
||||
// Check that events are processed in order.
|
||||
if (eventIndex <= block.lastProcessedEventIndex) {
|
||||
throw new Error(`Events received out of order for block number ${block.blockNumber} hash ${block.blockHash}, got event index ${eventIndex} and lastProcessedEventIndex ${block.lastProcessedEventIndex}, aborting`);
|
||||
}
|
||||
|
||||
// Check if previous event in block has been processed exactly before this and abort if not.
|
||||
// Skip check if logs fetched are filtered by contract address.
|
||||
if (!indexer.serverConfig.filterLogs) {
|
||||
const prevIndex = eventIndex - 1;
|
||||
|
||||
if (prevIndex !== block.lastProcessedEventIndex) {
|
||||
throw new Error(`Events received out of order for block number ${block.blockNumber} hash ${block.blockHash},` +
|
||||
` prev event index ${prevIndex}, got event index ${event.index} and lastProcessedEventIndex ${block.lastProcessedEventIndex}, aborting`);
|
||||
}
|
||||
}
|
||||
// Skipping check for order of events processing since logIndex in FEVM is not index of log in block
|
||||
// Check was introduced to avoid reprocessing block events incase of restarts. But currently on restarts, unprocessed block is removed and reprocessed from first event log
|
||||
// if (event.index <= block.lastProcessedEventIndex) {
|
||||
// throw new Error(`Events received out of order for block number ${block.blockNumber} hash ${block.blockHash}, got event index ${eventIndex} and lastProcessedEventIndex ${block.lastProcessedEventIndex}, aborting`);
|
||||
// }
|
||||
|
||||
const watchedContract = indexer.isWatchedContract(event.contract);
|
||||
|
||||
@ -310,10 +300,14 @@ export const processBatchEvents = async (indexer: IndexerInterface, block: Block
|
||||
|
||||
assert(indexer.parseEventNameAndArgs);
|
||||
assert(typeof watchedContract !== 'boolean');
|
||||
const { eventName, eventInfo } = indexer.parseEventNameAndArgs(watchedContract.kind, logObj);
|
||||
const { eventName, eventInfo, eventSignature } = indexer.parseEventNameAndArgs(watchedContract.kind, logObj);
|
||||
|
||||
event.eventName = eventName;
|
||||
event.eventInfo = JSON.stringify(eventInfo);
|
||||
event.eventInfo = JSONbigNative.stringify(eventInfo);
|
||||
event.extraInfo = JSONbigNative.stringify({
|
||||
...logObj,
|
||||
eventSignature
|
||||
});
|
||||
event = await indexer.saveEventEntity(event);
|
||||
}
|
||||
|
||||
|
@ -29,3 +29,5 @@ export const DEFAULT_PREFETCH_BATCH_SIZE = 10;
|
||||
export const DEFAULT_MAX_GQL_CACHE_SIZE = Math.pow(2, 20) * 8; // 8 MB
|
||||
|
||||
export const SUPPORTED_PAID_RPC_METHODS = ['eth_getBlockByHash', 'eth_getStorageAt', 'eth_getBlockByNumber'];
|
||||
|
||||
export const NULL_BLOCK_ERROR = 'requested epoch was a null round';
|
||||
|
@ -190,10 +190,6 @@ export class Database {
|
||||
|
||||
async updateBlockProgress (repo: Repository<BlockProgressInterface>, block: BlockProgressInterface, lastProcessedEventIndex: number): Promise<BlockProgressInterface> {
|
||||
if (!block.isComplete) {
|
||||
if (lastProcessedEventIndex <= block.lastProcessedEventIndex) {
|
||||
throw new Error(`Events processed out of order ${block.blockHash}, was ${block.lastProcessedEventIndex}, got ${lastProcessedEventIndex}`);
|
||||
}
|
||||
|
||||
block.lastProcessedEventIndex = lastProcessedEventIndex;
|
||||
block.numProcessedEvents++;
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user