From 695723955f7199c82d16c7d074482132e84513ea Mon Sep 17 00:00:00 2001 From: Nabarun Gogoi Date: Thu, 9 Nov 2023 18:42:37 +0530 Subject: [PATCH] Prefetch block and txs in historical processing instead of fetching them in events processing (#460) * Set gzip true in ethersjs provider * Add timer logs and use StaticJsonRpcProvider * Fetch block data in historical processing and cache in map * Fetch txs required for event logs in historical processing * Process events with prefetched block and txs data in realtime processing * Clear old TODOs --- packages/cli/src/create-state-gql.ts | 6 +- packages/cli/src/utils/index.ts | 5 +- packages/graph-node/src/watcher.ts | 28 ++-- packages/graph-node/test/utils/indexer.ts | 19 ++- packages/ipld-eth-client/src/eth-client.ts | 8 +- packages/ipld-eth-client/src/eth-queries.ts | 1 + packages/rpc-eth-client/src/eth-client.ts | 49 ++++--- packages/util/src/common.ts | 84 +++++++++--- packages/util/src/events.ts | 8 ++ packages/util/src/graph/utils.ts | 2 +- packages/util/src/index-block.ts | 11 +- packages/util/src/indexer.ts | 138 ++++++++++++------- packages/util/src/job-runner.ts | 60 +++++++-- packages/util/src/misc.ts | 56 +++----- packages/util/src/types.ts | 142 ++++++++++++-------- 15 files changed, 398 insertions(+), 219 deletions(-) diff --git a/packages/cli/src/create-state-gql.ts b/packages/cli/src/create-state-gql.ts index bf9caff4..09a5ad41 100644 --- a/packages/cli/src/create-state-gql.ts +++ b/packages/cli/src/create-state-gql.ts @@ -129,8 +129,10 @@ export class CreateStateFromGQLCmd { } const blockProgress: Partial = { - ...block, - blockNumber: Number(block.blockNumber) + cid: block.cid, + blockTimestamp: Number(block.timestamp), + blockNumber: Number(block.blockNumber), + blockHash: block.blockHash }; // Get watched contracts using subgraph dataSources diff --git a/packages/cli/src/utils/index.ts b/packages/cli/src/utils/index.ts index df9133ee..ee985bc8 100644 --- a/packages/cli/src/utils/index.ts +++ b/packages/cli/src/utils/index.ts @@ -54,7 +54,10 @@ export const initClients = async (config: Config): Promise<{ }); } - const ethProvider = getCustomProvider(rpcProviderEndpoint); + const ethProvider = getCustomProvider({ + url: rpcProviderEndpoint, + allowGzip: true + }); return { ethClient, diff --git a/packages/graph-node/src/watcher.ts b/packages/graph-node/src/watcher.ts index bfc86945..672473c8 100644 --- a/packages/graph-node/src/watcher.ts +++ b/packages/graph-node/src/watcher.ts @@ -31,7 +31,9 @@ import { FILTER_CHANGE_BLOCK, Where, Filter, - OPERATOR_MAP + OPERATOR_MAP, + ExtraEventData, + EthFullTransaction } from '@cerc-io/util'; import { Context, GraphData, instantiate } from './loader'; @@ -149,12 +151,12 @@ export class GraphWatcher { } } - async handleEvent (eventData: any) { + async handleEvent (eventData: any, extraData: ExtraEventData) { const { contract, event, eventSignature, block, tx: { hash: txHash }, eventIndex } = eventData; // Check if block data is already fetched by a previous event in the same block. if (!this._context.block || this._context.block.blockHash !== block.hash) { - this._context.block = await getFullBlock(this._ethClient, this._ethProvider, block.hash, block.number); + this._context.block = getFullBlock(extraData.ethFullBlock); } const blockData = this._context.block; @@ -197,7 +199,7 @@ export class GraphWatcher { const eventFragment = contractInterface.getEvent(eventSignature); - const tx = await this._getTransactionData(txHash, Number(blockData.blockNumber)); + const tx = this._getTransactionData(txHash, extraData.ethFullTransactions); const data = { block: blockData, @@ -208,9 +210,13 @@ export class GraphWatcher { }; // Create ethereum event to be passed to the wasm event handler. + console.time(`time:graph-watcher#handleEvent-createEvent-block-${block.number}-event-${eventSignature}`); const ethereumEvent = await createEvent(instanceExports, contract, data); + console.timeEnd(`time:graph-watcher#handleEvent-createEvent-block-${block.number}-event-${eventSignature}`); try { + console.time(`time:graph-watcher#handleEvent-exec-${dataSource.name}-event-handler-${eventSignature}`); await this._handleMemoryError(instanceExports[eventHandler.handler](ethereumEvent), dataSource.name); + console.timeEnd(`time:graph-watcher#handleEvent-exec-${dataSource.name}-event-handler-${eventSignature}`); } catch (error) { this._clearCachedEntities(); throw error; @@ -237,10 +243,11 @@ export class GraphWatcher { continue; } - // Check if block data is already fetched in handleEvent method for the same block. - if (!this._context.block || this._context.block.blockHash !== blockHash) { - this._context.block = await getFullBlock(this._ethClient, this._ethProvider, blockHash, blockNumber); - } + // TODO: Use extraData full block + // // Check if block data is already fetched in handleEvent method for the same block. + // if (!this._context.block || this._context.block.blockHash !== blockHash) { + // this._context.block = await getFullBlock(this._ethClient, this._ethProvider, blockHash, blockNumber); + // } const blockData = this._context.block; assert(blockData); @@ -445,15 +452,14 @@ export class GraphWatcher { } } - async _getTransactionData (txHash: string, blockNumber: number): Promise { + _getTransactionData (txHash: string, ethFullTransactions: EthFullTransaction[]): Transaction { let transaction = this._transactionsMap.get(txHash); if (transaction) { return transaction; } - transaction = await getFullTransaction(this._ethClient, txHash, blockNumber); - assert(transaction); + transaction = getFullTransaction(txHash, ethFullTransactions); this._transactionsMap.set(txHash, transaction); return transaction; diff --git a/packages/graph-node/test/utils/indexer.ts b/packages/graph-node/test/utils/indexer.ts index aa2f8b1d..76e2e787 100644 --- a/packages/graph-node/test/utils/indexer.ts +++ b/packages/graph-node/test/utils/indexer.ts @@ -18,7 +18,9 @@ import { ResultEvent, StateKind, EthClient, - UpstreamConfig + UpstreamConfig, + EthFullTransaction, + EthFullBlock } from '@cerc-io/util'; import { GetStorageAt, getStorageValue, MappingKey, StorageLayout } from '@cerc-io/solidity-mapper'; @@ -117,7 +119,12 @@ export class Indexer implements IndexerInterface { return []; } - async fetchAndSaveFilteredEventsAndBlocks (startBlock: number, endBlock: number): Promise<{ blockProgress: BlockProgressInterface, events: DeepPartial[] }[]> { + async fetchAndSaveFilteredEventsAndBlocks (startBlock: number, endBlock: number): Promise<{ + blockProgress: BlockProgressInterface; + events: DeepPartial[]; + ethFullBlock: EthFullBlock; + ethFullTransactions: EthFullTransaction[]; + }[]> { assert(startBlock); assert(endBlock); @@ -132,8 +139,12 @@ export class Indexer implements IndexerInterface { return []; } - async saveBlockAndFetchEvents (block: BlockProgressInterface): Promise<[BlockProgressInterface, DeepPartial[]]> { - return [block, []]; + async saveBlockAndFetchEvents (block: BlockProgressInterface): Promise<[ + BlockProgressInterface, + DeepPartial[], + EthFullTransaction[] + ]> { + return [block, [], []]; } async removeUnknownEvents (block: BlockProgressInterface): Promise { diff --git a/packages/ipld-eth-client/src/eth-client.ts b/packages/ipld-eth-client/src/eth-client.ts index 572a39ef..633c1bae 100644 --- a/packages/ipld-eth-client/src/eth-client.ts +++ b/packages/ipld-eth-client/src/eth-client.ts @@ -5,7 +5,7 @@ import assert from 'assert'; import { Cache } from '@cerc-io/cache'; -import { EthClient as EthClientInterface, FullTransaction } from '@cerc-io/util'; +import { EthClient as EthClientInterface, EthFullTransaction } from '@cerc-io/util'; import ethQueries from './eth-queries'; import { padKey } from './utils'; @@ -93,7 +93,7 @@ export class EthClient implements EthClientInterface { async getFullBlocks ({ blockNumber, blockHash }: { blockNumber?: number, blockHash?: string }): Promise { console.time(`time:eth-client#getFullBlocks-${JSON.stringify({ blockNumber, blockHash })}`); - const result = await this._graphqlClient.query( + const { allEthHeaderCids } = await this._graphqlClient.query( ethQueries.getFullBlocks, { blockNumber: blockNumber?.toString(), @@ -102,10 +102,10 @@ export class EthClient implements EthClientInterface { ); console.timeEnd(`time:eth-client#getFullBlocks-${JSON.stringify({ blockNumber, blockHash })}`); - return result; + return allEthHeaderCids.nodes; } - async getFullTransaction (txHash: string, blockNumber?: number): Promise { + async getFullTransaction (txHash: string, blockNumber?: number): Promise { console.time(`time:eth-client#getFullTransaction-${JSON.stringify({ txHash, blockNumber })}`); const result = await this._graphqlClient.query( ethQueries.getFullTransaction, diff --git a/packages/ipld-eth-client/src/eth-queries.ts b/packages/ipld-eth-client/src/eth-queries.ts index a54b67ee..17e5e7b3 100644 --- a/packages/ipld-eth-client/src/eth-queries.ts +++ b/packages/ipld-eth-client/src/eth-queries.ts @@ -75,6 +75,7 @@ query allEthHeaderCids($blockNumber: BigInt, $blockHash: String) { } `; +// TODO: Get block size from ipld-eth-server export const getFullBlocks = gql` query allEthHeaderCids($blockNumber: BigInt, $blockHash: String) { allEthHeaderCids(condition: { blockNumber: $blockNumber, blockHash: $blockHash }) { diff --git a/packages/rpc-eth-client/src/eth-client.ts b/packages/rpc-eth-client/src/eth-client.ts index ad447c4e..14ab70a5 100644 --- a/packages/rpc-eth-client/src/eth-client.ts +++ b/packages/rpc-eth-client/src/eth-client.ts @@ -6,7 +6,7 @@ import assert from 'assert'; import { errors, providers, utils } from 'ethers'; import { Cache } from '@cerc-io/cache'; -import { encodeHeader, escapeHexString, EthClient as EthClientInterface, FullTransaction } from '@cerc-io/util'; +import { encodeHeader, escapeHexString, EthClient as EthClientInterface, EthFullTransaction } from '@cerc-io/util'; import { padKey } from '@cerc-io/ipld-eth-client'; export interface Config { @@ -32,7 +32,10 @@ export class EthClient implements EthClientInterface { constructor (config: Config) { const { rpcEndpoint, cache } = config; assert(rpcEndpoint, 'Missing RPC endpoint'); - this._provider = new providers.JsonRpcProvider(rpcEndpoint); + this._provider = new providers.StaticJsonRpcProvider({ + url: rpcEndpoint, + allowGzip: true + }); this._cache = cache; } @@ -171,39 +174,33 @@ export class EthClient implements EthClientInterface { const rlpData = encodeHeader(header); - const allEthHeaderCids = { - nodes: [ - { - blockNumber: this._provider.formatter.number(rawBlock.number).toString(), - blockHash: this._provider.formatter.hash(rawBlock.hash), - parentHash: this._provider.formatter.hash(rawBlock.parentHash), - timestamp: this._provider.formatter.number(rawBlock.timestamp).toString(), - stateRoot: this._provider.formatter.hash(rawBlock.stateRoot), - td: this._provider.formatter.bigNumber(rawBlock.totalDifficulty).toString(), - txRoot: this._provider.formatter.hash(rawBlock.transactionsRoot), - receiptRoot: this._provider.formatter.hash(rawBlock.receiptsRoot), - uncleRoot: this._provider.formatter.hash(rawBlock.sha3Uncles), - bloom: escapeHexString(this._provider.formatter.hex(rawBlock.logsBloom)), - blockByMhKey: { - data: escapeHexString(rlpData) - } - } - ] - }; - - return { allEthHeaderCids }; + return [{ + blockNumber: this._provider.formatter.number(rawBlock.number).toString(), + blockHash: this._provider.formatter.hash(rawBlock.hash), + parentHash: this._provider.formatter.hash(rawBlock.parentHash), + timestamp: this._provider.formatter.number(rawBlock.timestamp).toString(), + stateRoot: this._provider.formatter.hash(rawBlock.stateRoot), + td: this._provider.formatter.bigNumber(rawBlock.totalDifficulty).toString(), + txRoot: this._provider.formatter.hash(rawBlock.transactionsRoot), + receiptRoot: this._provider.formatter.hash(rawBlock.receiptsRoot), + uncleRoot: this._provider.formatter.hash(rawBlock.sha3Uncles), + bloom: escapeHexString(this._provider.formatter.hex(rawBlock.logsBloom)), + size: this._provider.formatter.number(rawBlock.size).toString(), + blockByMhKey: { + data: escapeHexString(rlpData) + } + }]; } - async getFullTransaction (txHash: string): Promise { + async getFullTransaction (txHash: string): Promise { console.time(`time:eth-client#getFullTransaction-${JSON.stringify({ txHash })}`); const tx = await this._provider.getTransaction(txHash); console.timeEnd(`time:eth-client#getFullTransaction-${JSON.stringify({ txHash })}`); - const txReceipt = await tx.wait(); return { ethTransactionCidByTxHash: { txHash: tx.hash, - index: txReceipt.transactionIndex, + index: (tx as any).transactionIndex, src: tx.from, dst: tx.to }, diff --git a/packages/util/src/common.ts b/packages/util/src/common.ts index 97f9183f..67774f15 100644 --- a/packages/util/src/common.ts +++ b/packages/util/src/common.ts @@ -14,7 +14,7 @@ import { NULL_BLOCK_ERROR } from './constants'; import { JobQueue } from './job-queue'; -import { BlockProgressInterface, IndexerInterface, EventInterface } from './types'; +import { BlockProgressInterface, IndexerInterface, EventInterface, EthFullTransaction, EthFullBlock } from './types'; import { wait } from './misc'; import { OrderDirection } from './database'; import { JobQueueConfig } from './config'; @@ -27,6 +27,8 @@ const JSONbigNative = JSONbig({ useNativeBigInt: true }); export interface PrefetchedBlock { block: BlockProgressInterface; events: DeepPartial[]; + ethFullBlock: EthFullBlock; + ethFullTransactions: EthFullTransaction[]; } /** @@ -104,6 +106,20 @@ export const fetchBlocksAtHeight = async ( if (!blocks.length) { log(`No blocks fetched for block number ${blockNumber}, retrying after ${jobQueueConfig.blockDelayInMilliSecs} ms delay.`); await wait(jobQueueConfig.blockDelayInMilliSecs); + } else { + blocks.forEach(block => { + blockAndEventsMap.set( + block.blockHash, + { + // Block is set later in job-runner when saving to database + block: {} as BlockProgressInterface, + events: [], + ethFullBlock: block, + // Transactions are set later in job-runner when fetching events + ethFullTransactions: [] + } + ); + }); } } catch (err: any) { // Handle null block error in case of Lotus EVM @@ -153,15 +169,15 @@ export const fetchAndSaveFilteredLogsAndBlocks = async ( ): Promise => { // Fetch filtered logs and required blocks console.time('time:common#fetchAndSaveFilteredLogsAndBlocks-fetchAndSaveFilteredEventsAndBlocks'); - const blocksWithEvents = await indexer.fetchAndSaveFilteredEventsAndBlocks(startBlock, endBlock); + const blocksData = await indexer.fetchAndSaveFilteredEventsAndBlocks(startBlock, endBlock); console.timeEnd('time:common#fetchAndSaveFilteredLogsAndBlocks-fetchAndSaveFilteredEventsAndBlocks'); // Set blocks with events in blockAndEventsMap cache - blocksWithEvents.forEach(({ blockProgress, events }) => { - blockAndEventsMap.set(blockProgress.blockHash, { block: blockProgress, events }); + blocksData.forEach(({ blockProgress, events, ethFullBlock, ethFullTransactions }) => { + blockAndEventsMap.set(blockProgress.blockHash, { block: blockProgress, events, ethFullBlock, ethFullTransactions }); }); - return blocksWithEvents.map(({ blockProgress }) => blockProgress); + return blocksData.map(({ blockProgress }) => blockProgress); }; export const _prefetchBlocks = async ( @@ -181,7 +197,15 @@ export const _prefetchBlocks = async ( ); blocksWithEvents.forEach(({ blockProgress, events }) => { - blockAndEventsMap.set(blockProgress.blockHash, { block: blockProgress, events }); + blockAndEventsMap.set( + blockProgress.blockHash, + { + block: blockProgress, + events, + // TODO: Set ethFullBlock and ethFullTransactions + ethFullBlock: {} as EthFullBlock, + ethFullTransactions: [] + }); }); }; @@ -283,17 +307,23 @@ export const _fetchBatchBlocks = async ( */ export const processBatchEvents = async ( indexer: IndexerInterface, - block: BlockProgressInterface, - eventsInBatch: number, - subgraphEventsOrder: boolean + data: { + block: BlockProgressInterface; + ethFullBlock: EthFullBlock; + ethFullTransactions: EthFullTransaction[]; + }, + { eventsInBatch, subgraphEventsOrder }: { + eventsInBatch: number; + subgraphEventsOrder: boolean; + } ): Promise => { let dbBlock: BlockProgressInterface, updatedDbEvents: EventInterface[]; let isNewContractWatched = false; if (subgraphEventsOrder) { - ({ dbBlock, updatedDbEvents, isNewContractWatched } = await _processEventsInSubgraphOrder(indexer, block, eventsInBatch || DEFAULT_EVENTS_IN_BATCH)); + ({ dbBlock, updatedDbEvents, isNewContractWatched } = await _processEventsInSubgraphOrder(indexer, data, eventsInBatch || DEFAULT_EVENTS_IN_BATCH)); } else { - ({ dbBlock, updatedDbEvents } = await _processEvents(indexer, block, eventsInBatch || DEFAULT_EVENTS_IN_BATCH)); + ({ dbBlock, updatedDbEvents } = await _processEvents(indexer, data, eventsInBatch || DEFAULT_EVENTS_IN_BATCH)); } if (indexer.processBlockAfterEvents) { @@ -314,7 +344,15 @@ export const processBatchEvents = async ( return isNewContractWatched; }; -const _processEvents = async (indexer: IndexerInterface, block: BlockProgressInterface, eventsInBatch: number): Promise<{ dbBlock: BlockProgressInterface, updatedDbEvents: EventInterface[] }> => { +const _processEvents = async ( + indexer: IndexerInterface, + { block, ethFullBlock, ethFullTransactions }: { + block: BlockProgressInterface; + ethFullBlock: EthFullBlock; + ethFullTransactions: EthFullTransaction[]; + }, + eventsInBatch: number +): Promise<{ dbBlock: BlockProgressInterface, updatedDbEvents: EventInterface[] }> => { const updatedDbEvents: EventInterface[] = []; let page = 0; @@ -356,7 +394,7 @@ const _processEvents = async (indexer: IndexerInterface, block: BlockProgressInt updatedDbEvents.push(event); } - await indexer.processEvent(event); + await indexer.processEvent(event, { ethFullBlock, ethFullTransactions }); } block.lastProcessedEventIndex = event.index; @@ -371,7 +409,15 @@ const _processEvents = async (indexer: IndexerInterface, block: BlockProgressInt return { dbBlock: block, updatedDbEvents: updatedDbEvents }; }; -const _processEventsInSubgraphOrder = async (indexer: IndexerInterface, block: BlockProgressInterface, eventsInBatch: number): Promise<{ dbBlock: BlockProgressInterface, updatedDbEvents: EventInterface[], isNewContractWatched: boolean }> => { +const _processEventsInSubgraphOrder = async ( + indexer: IndexerInterface, + { block, ethFullBlock, ethFullTransactions }: { + block: BlockProgressInterface; + ethFullBlock: EthFullBlock; + ethFullTransactions: EthFullTransaction[]; + }, + eventsInBatch: number +): Promise<{ dbBlock: BlockProgressInterface, updatedDbEvents: EventInterface[], isNewContractWatched: boolean }> => { // Create list of initially watched contracts const initiallyWatchedContracts: string[] = indexer.getWatchedContracts().map(contract => contract.address); const unwatchedContractEvents: EventInterface[] = []; @@ -411,7 +457,9 @@ const _processEventsInSubgraphOrder = async (indexer: IndexerInterface, block: B // Process known events in a loop for (const event of watchedContractEvents) { - await indexer.processEvent(event); + console.time(`time:common#_processEventsInSubgraphOrder-block-${block.blockNumber}-processEvent-${event.eventName}`); + await indexer.processEvent(event, { ethFullBlock, ethFullTransactions }); + console.timeEnd(`time:common#_processEventsInSubgraphOrder-block-${block.blockNumber}-processEvent-${event.eventName}`); block.lastProcessedEventIndex = event.index; block.numProcessedEvents++; @@ -430,7 +478,9 @@ const _processEventsInSubgraphOrder = async (indexer: IndexerInterface, block: B if (indexer.upstreamConfig.ethServer.filterLogsByAddresses) { // Fetch and parse events for newly watched contracts const newContracts = watchedContracts.filter(contract => !initiallyWatchedContracts.includes(contract)); + console.time(`time:common#_processEventsInSubgraphOrder-fetchEventsForContracts-block-${block.blockNumber}-unwatched-contract`); const events = await indexer.fetchEventsForContracts(block.blockHash, block.blockNumber, newContracts); + console.timeEnd(`time:common#_processEventsInSubgraphOrder-fetchEventsForContracts-block-${block.blockNumber}-unwatched-contract`); events.forEach(event => { event.block = block; @@ -457,7 +507,9 @@ const _processEventsInSubgraphOrder = async (indexer: IndexerInterface, block: B console.time('time:common#processEventsInSubgraphOrder-processing_initially_unwatched_events'); // In the end process events of newly watched contracts for (const updatedDbEvent of updatedDbEvents) { - await indexer.processEvent(updatedDbEvent); + console.time(`time:common#processEventsInSubgraphOrder-block-${block.blockNumber}-updated-processEvent-${updatedDbEvent.eventName}`); + await indexer.processEvent(updatedDbEvent, { ethFullBlock, ethFullTransactions }); + console.timeEnd(`time:common#processEventsInSubgraphOrder-block-${block.blockNumber}-updated-processEvent-${updatedDbEvent.eventName}`); block.lastProcessedEventIndex = Math.max(block.lastProcessedEventIndex + 1, updatedDbEvent.index); block.numProcessedEvents++; diff --git a/packages/util/src/events.ts b/packages/util/src/events.ts index 654732a9..5970c87c 100644 --- a/packages/util/src/events.ts +++ b/packages/util/src/events.ts @@ -33,6 +33,7 @@ export class EventWatcher { _indexer: IndexerInterface; _pubsub: PubSub; _jobQueue: JobQueue; + _realtimeProcessingStarted = false; _shutDown = false; _signalCount = 0; @@ -135,6 +136,13 @@ export class EventWatcher { log(`Starting realtime block processing from block ${startBlockNumber}`); await processBlockByNumber(this._jobQueue, startBlockNumber); + // Check if realtime processing already started and avoid resubscribing to block progress event + if (this._realtimeProcessingStarted) { + return; + } + + this._realtimeProcessingStarted = true; + // Creating an AsyncIterable from AsyncIterator to iterate over the values. // https://www.codementor.io/@tiagolopesferreira/asynchronous-iterators-in-javascript-jl1yg8la1#for-wait-of const blockProgressEventIterable = { diff --git a/packages/util/src/graph/utils.ts b/packages/util/src/graph/utils.ts index 57c7163d..c327db0e 100644 --- a/packages/util/src/graph/utils.ts +++ b/packages/util/src/graph/utils.ts @@ -34,7 +34,7 @@ export interface Transaction { hash: string; index: number; from: string; - to: string; + to?: string; value: string; gasLimit: string; gasPrice?: string; diff --git a/packages/util/src/index-block.ts b/packages/util/src/index-block.ts index b8ffdfd8..61a6e467 100644 --- a/packages/util/src/index-block.ts +++ b/packages/util/src/index-block.ts @@ -6,6 +6,7 @@ import assert from 'assert'; import { BlockProgressInterface, IndexerInterface } from './types'; import { processBatchEvents } from './common'; +import { EthFullBlock } from '.'; export const indexBlock = async ( indexer: IndexerInterface, @@ -46,6 +47,14 @@ export const indexBlock = async ( assert(indexer.processBlock); await indexer.processBlock(blockProgress); - await processBatchEvents(indexer, blockProgress, eventsInBatch, subgraphEventsOrder); + await processBatchEvents( + indexer, + { + block: blockProgress, + // TODO: Set ethFullBlock and ethFullTransactions + ethFullBlock: {} as EthFullBlock, + ethFullTransactions: [] + }, + { eventsInBatch, subgraphEventsOrder }); } }; diff --git a/packages/util/src/indexer.ts b/packages/util/src/indexer.ts index 8a4ff473..6ec2be96 100644 --- a/packages/util/src/indexer.ts +++ b/packages/util/src/indexer.ts @@ -24,7 +24,9 @@ import { StateKind, EthClient, ContractJobData, - EventsQueueJobKind + EventsQueueJobKind, + EthFullBlock, + EthFullTransaction } from './types'; import { UNKNOWN_EVENT_NAME, QUEUE_EVENT_PROCESSING, DIFF_MERGE_BATCH_SIZE } from './constants'; import { JobQueue } from './job-queue'; @@ -100,6 +102,11 @@ export type ResultMeta = { hasIndexingErrors: boolean; }; +export type ExtraEventData = { + ethFullBlock: EthFullBlock; + ethFullTransactions: EthFullTransaction[]; +} + export class Indexer { _serverConfig: ServerConfig; _upstreamConfig: UpstreamConfig; @@ -284,10 +291,9 @@ export class Indexer { return res; } - async getBlocks (blockFilter: { blockNumber?: number, blockHash?: string }): Promise { + async getBlocks (blockFilter: { blockNumber?: number, blockHash?: string }): Promise { assert(blockFilter.blockHash || blockFilter.blockNumber); - const result = await this._ethClient.getBlocks(blockFilter); - const { allEthHeaderCids: { nodes: blocks } } = result; + const blocks = await this._ethClient.getFullBlocks(blockFilter); if (!blocks.length) { try { @@ -461,7 +467,12 @@ export class Indexer { kind: string, logObj: { topics: string[]; data: string } ) => { eventName: string; eventInfo: {[key: string]: any}; eventSignature: string } - ): Promise<{ blockProgress: BlockProgressInterface, events: DeepPartial[] }[]> { + ): Promise<{ + blockProgress: BlockProgressInterface, + events: DeepPartial[], + ethFullBlock: EthFullBlock, + ethFullTransactions: EthFullTransaction[] + }[]> { assert(this._ethClient.getLogsForBlockRange, 'getLogsForBlockRange() not implemented in ethClient'); const { addresses, topics } = this._createLogsFilters(eventSignaturesMap); @@ -474,45 +485,67 @@ export class Indexer { }); const blockLogsMap = this._reduceLogsToBlockLogsMap(logs); + // Create unique list of tx required + const txHashes = Array.from([ + ...new Set(logs.map((log: any) => log.transaction.hash)) + ]); // Fetch blocks with transactions for the logs returned console.time(`time:indexer#fetchAndSaveFilteredEventsAndBlocks-fetch-blocks-txs-${fromBlock}-${toBlock}`); - const blocksWithTxPromises = Array.from(blockLogsMap.keys()).map(async (blockHash) => { - const result = await this._ethClient.getBlockWithTransactions({ blockHash }); + const blocksPromises = Array.from(blockLogsMap.keys()).map(async (blockHash) => { + const [fullBlock] = await this._ethClient.getFullBlocks({ blockHash }); - const { - allEthHeaderCids: { - nodes: [ - { - ethTransactionCidsByHeaderId: { - nodes: transactions - }, - ...block - } - ] - } - } = result; + const block = { + ...fullBlock, + blockTimestamp: Number(fullBlock.timestamp), + blockNumber: Number(fullBlock.blockNumber) + }; - block.blockTimestamp = Number(block.timestamp); - block.blockNumber = Number(block.blockNumber); - - return { block, transactions } as { block: DeepPartial; transactions: any[] }; + return { block, fullBlock } as { block: DeepPartial; fullBlock: EthFullBlock }; }); - const blockWithTxs = await Promise.all(blocksWithTxPromises); + const ethFullTxPromises = txHashes.map(async txHash => { + return this._ethClient.getFullTransaction(txHash); + }); + + const blocks = await Promise.all(blocksPromises); + const ethFullTxs = await Promise.all(ethFullTxPromises); + + const ethFullTxsMap = ethFullTxs.reduce((acc: Map, ethFullTx) => { + acc.set(ethFullTx.ethTransactionCidByTxHash.txHash, ethFullTx); + return acc; + }, new Map()); + console.timeEnd(`time:indexer#fetchAndSaveFilteredEventsAndBlocks-fetch-blocks-txs-${fromBlock}-${toBlock}`); // Map db ready events according to blockhash console.time(`time:indexer#fetchAndSaveFilteredEventsAndBlocks-db-save-blocks-events-${fromBlock}-${toBlock}`); - const blockWithDbEventsPromises = blockWithTxs.map(async ({ block, transactions }) => { + const blockWithDbEventsPromises = blocks.map(async ({ block, fullBlock }) => { const blockHash = block.blockHash; assert(blockHash); const logs = blockLogsMap.get(blockHash) || []; - const events = this.createDbEventsFromLogsAndTxs(blockHash, logs, transactions, parseEventNameAndArgs); + const txHashes = Array.from([ + ...new Set(logs.map((log: any) => log.transaction.hash)) + ]); + + const blockEthFullTxs = txHashes.map(txHash => ethFullTxsMap.get(txHash)) as EthFullTransaction[]; + + const events = this.createDbEventsFromLogsAndTxs( + blockHash, + logs, + blockEthFullTxs.map(ethFullTx => ethFullTx?.ethTransactionCidByTxHash), + parseEventNameAndArgs + ); const [blockProgress] = await this.saveBlockWithEvents(block, events); - return { blockProgress, events: [] }; + return { + blockProgress, + ethFullBlock: fullBlock, + ethFullTransactions: blockEthFullTxs, + block, + events: [] + }; }); const blocksWithDbEvents = await Promise.all(blockWithDbEventsPromises); @@ -536,48 +569,57 @@ export class Indexer { } // Fetch events (to be saved to db) for a particular block - async fetchEvents (blockHash: string, blockNumber: number, eventSignaturesMap: Map, parseEventNameAndArgs: (kind: string, logObj: any) => any): Promise[]> { + async fetchEvents (blockHash: string, blockNumber: number, eventSignaturesMap: Map, parseEventNameAndArgs: (kind: string, logObj: any) => any): Promise<{ events: DeepPartial[], transactions: EthFullTransaction[]}> { const { addresses, topics } = this._createLogsFilters(eventSignaturesMap); const { logs, transactions } = await this._fetchLogsAndTransactions(blockHash, blockNumber, addresses, topics); - return this.createDbEventsFromLogsAndTxs(blockHash, logs, transactions, parseEventNameAndArgs); + const events = this.createDbEventsFromLogsAndTxs( + blockHash, + logs, + transactions.map(tx => tx.ethTransactionCidByTxHash), + parseEventNameAndArgs + ); + + return { events, transactions }; } async fetchEventsForContracts (blockHash: string, blockNumber: number, addresses: string[], eventSignaturesMap: Map, parseEventNameAndArgs: (kind: string, logObj: any) => any): Promise[]> { const { topics } = this._createLogsFilters(eventSignaturesMap); const { logs, transactions } = await this._fetchLogsAndTransactions(blockHash, blockNumber, addresses, topics); - return this.createDbEventsFromLogsAndTxs(blockHash, logs, transactions, parseEventNameAndArgs); + return this.createDbEventsFromLogsAndTxs( + blockHash, + logs, + transactions.map(tx => tx.ethTransactionCidByTxHash), + parseEventNameAndArgs + ); } - async _fetchLogsAndTransactions (blockHash: string, blockNumber: number, addresses?: string[], topics?: string[][]): Promise<{ logs: any[]; transactions: any[] }> { - const logsPromise = await this._ethClient.getLogs({ + async _fetchLogsAndTransactions (blockHash: string, blockNumber: number, addresses?: string[], topics?: string[][]): Promise<{ logs: any[]; transactions: EthFullTransaction[] }> { + const { logs } = await this._ethClient.getLogs({ blockHash, blockNumber: blockNumber.toString(), addresses, topics }); - const transactionsPromise = this._ethClient.getBlockWithTransactions({ blockHash, blockNumber }); - - const [ - { logs }, - { - allEthHeaderCids: { - nodes: [ - { - ethTransactionCidsByHeaderId: { - nodes: transactions - } - } - ] - } - } - ] = await Promise.all([logsPromise, transactionsPromise]); + const transactions = await this._fetchTxsFromLogs(logs); return { logs, transactions }; } + async _fetchTxsFromLogs (logs: any[]): Promise { + const txHashes = Array.from([ + ...new Set(logs.map((log) => log.transaction.hash)) + ]); + + const ethFullTxPromises = txHashes.map(async txHash => { + return this._ethClient.getFullTransaction(txHash); + }); + + return Promise.all(ethFullTxPromises); + } + // Create events to be saved to db for a block given blockHash, logs, transactions and a parser function createDbEventsFromLogsAndTxs (blockHash: string, logs: any, transactions: any, parseEventNameAndArgs: (kind: string, logObj: any) => any): DeepPartial[] { const transactionMap = transactions.reduce((acc: {[key: string]: any}, transaction: {[key: string]: any}) => { diff --git a/packages/util/src/job-runner.ts b/packages/util/src/job-runner.ts index 3d624cce..c0867aad 100644 --- a/packages/util/src/job-runner.ts +++ b/packages/util/src/job-runner.ts @@ -502,6 +502,20 @@ export class JobRunner { throw new Error(message); } + blocks.forEach(block => { + this._blockAndEventsMap.set( + block.blockHash, + { + // block is set later in job when saving to database + block: {} as BlockProgressInterface, + events: [], + ethFullBlock: block, + // Transactions are set later in job when fetching events + ethFullTransactions: [] + } + ); + }); + const [{ cid: parentCid, blockNumber: parentBlockNumber, parentHash: grandparentHash, timestamp: parentTimestamp }] = blocks; await this.jobQueue.pushJob(QUEUE_BLOCK_PROCESSING, { @@ -549,7 +563,9 @@ export class JobRunner { if (!blockProgress) { const prefetchedBlock = this._blockAndEventsMap.get(blockHash); - if (prefetchedBlock) { + // Check if prefetched block is set properly + // prefetchedBlock.block is an empty object when running in realtime processing + if (prefetchedBlock && prefetchedBlock.block.blockHash) { ({ block: blockProgress } = prefetchedBlock); } else { // Delay required to process block. @@ -558,11 +574,20 @@ export class JobRunner { console.time('time:job-runner#_indexBlock-saveBlockAndFetchEvents'); log(`_indexBlock#saveBlockAndFetchEvents: fetching from upstream server ${blockHash}`); - [blockProgress] = await this._indexer.saveBlockAndFetchEvents({ cid, blockHash, blockNumber, parentHash, blockTimestamp }); + let ethFullTransactions; + [blockProgress,, ethFullTransactions] = await this._indexer.saveBlockAndFetchEvents({ cid, blockHash, blockNumber, parentHash, blockTimestamp }); log(`_indexBlock#saveBlockAndFetchEvents: fetched for block: ${blockProgress.blockHash} num events: ${blockProgress.numEvents}`); console.timeEnd('time:job-runner#_indexBlock-saveBlockAndFetchEvents'); + const data = this._blockAndEventsMap.get(blockHash); + assert(data); - this._blockAndEventsMap.set(blockHash, { block: blockProgress, events: [] }); + this._blockAndEventsMap.set( + blockHash, + { + ...data, + block: blockProgress, + ethFullTransactions + }); } } @@ -588,26 +613,33 @@ export class JobRunner { const { blockHash, isRetryAttempt } = jobData; try { - if (!this._blockAndEventsMap.has(blockHash)) { - console.time('time:job-runner#_processEvents-get-block-progress'); - const block = await this._indexer.getBlockProgress(blockHash); - console.timeEnd('time:job-runner#_processEvents-get-block-progress'); + // NOTE: blockAndEventsMap should contain block as watcher is reset + // if (!this._blockAndEventsMap.has(blockHash)) { + // console.time('time:job-runner#_processEvents-get-block-progress'); + // const block = await this._indexer.getBlockProgress(blockHash); + // console.timeEnd('time:job-runner#_processEvents-get-block-progress'); - assert(block); - this._blockAndEventsMap.set(blockHash, { block, events: [] }); - } + // assert(block); + // this._blockAndEventsMap.set(blockHash, { block, events: [] }); + // } const prefetchedBlock = this._blockAndEventsMap.get(blockHash); assert(prefetchedBlock); - const { block } = prefetchedBlock; + const { block, ethFullBlock, ethFullTransactions } = prefetchedBlock; log(`Processing events for block ${block.blockNumber}`); console.time(`time:job-runner#_processEvents-events-${block.blockNumber}`); const isNewContractWatched = await processBatchEvents( this._indexer, - block, - this._jobQueueConfig.eventsInBatch, - this._jobQueueConfig.subgraphEventsOrder + { + block, + ethFullBlock, + ethFullTransactions + }, + { + eventsInBatch: this._jobQueueConfig.eventsInBatch, + subgraphEventsOrder: this._jobQueueConfig.subgraphEventsOrder + } ); console.timeEnd(`time:job-runner#_processEvents-events-${block.blockNumber}`); diff --git a/packages/util/src/misc.ts b/packages/util/src/misc.ts index b9b79e4b..546fbbf4 100644 --- a/packages/util/src/misc.ts +++ b/packages/util/src/misc.ts @@ -18,10 +18,10 @@ import { GQLCacheConfig, Config } from './config'; import { JobQueue } from './job-queue'; import { GraphDecimal } from './graph/graph-decimal'; import * as EthDecoder from './eth'; -import { getCachedBlockSize } from './block-size-cache'; import { ResultEvent } from './indexer'; -import { EventInterface, EthClient } from './types'; +import { EventInterface, EthFullBlock, EthFullTransaction } from './types'; import { BlockHeight } from './database'; +import { Transaction } from './graph/utils'; const JSONbigNative = JSONbig({ useNativeBigInt: true }); @@ -154,7 +154,7 @@ export const getResetYargs = (): yargs.Argv => { }; export const getCustomProvider = (url?: utils.ConnectionInfo | string, network?: providers.Networkish): providers.JsonRpcProvider => { - const provider = new providers.JsonRpcProvider(url, network); + const provider = new providers.StaticJsonRpcProvider(url, network); provider.formatter = new CustomFormatter(); return provider; }; @@ -182,52 +182,40 @@ class CustomFormatter extends providers.Formatter { } } -export const getFullBlock = async (ethClient: EthClient, ethProvider: providers.BaseProvider, blockHash: string, blockNumber: number): Promise => { - const { - allEthHeaderCids: { - nodes: [ - fullBlock - ] - } - } = await ethClient.getFullBlocks({ blockHash, blockNumber }); - - assert(fullBlock.blockByMhKey); - +export const getFullBlock = (ethFullBlock: EthFullBlock): any => { // Decode the header data. - const header = EthDecoder.decodeHeader(EthDecoder.decodeData(fullBlock.blockByMhKey.data)); + const header = EthDecoder.decodeHeader(EthDecoder.decodeData(ethFullBlock.blockByMhKey.data)); assert(header); - // TODO: Calculate size from rlp encoded data. - // Get block info from JSON RPC API provided by ipld-eth-server. - const provider = ethProvider as providers.JsonRpcProvider; - const size = await getCachedBlockSize(provider, blockHash, Number(fullBlock.blockNumber)); - return { - headerId: fullBlock.id, - cid: fullBlock.cid, - blockNumber: fullBlock.blockNumber, - blockHash: fullBlock.blockHash, - parentHash: fullBlock.parentHash, - timestamp: fullBlock.timestamp, - stateRoot: fullBlock.stateRoot, - td: fullBlock.td, - txRoot: fullBlock.txRoot, - receiptRoot: fullBlock.receiptRoot, - uncleHash: fullBlock.uncleRoot, + headerId: ethFullBlock.id, + cid: ethFullBlock.cid, + blockNumber: ethFullBlock.blockNumber, + blockHash: ethFullBlock.blockHash, + parentHash: ethFullBlock.parentHash, + timestamp: ethFullBlock.timestamp, + stateRoot: ethFullBlock.stateRoot, + td: ethFullBlock.td, + txRoot: ethFullBlock.txRoot, + receiptRoot: ethFullBlock.receiptRoot, + uncleHash: ethFullBlock.uncleRoot, difficulty: header.Difficulty.toString(), gasLimit: header.GasLimit.toString(), gasUsed: header.GasUsed.toString(), author: header.Beneficiary, - size: BigInt(size).toString(), + size: ethFullBlock.size, baseFee: header.BaseFee?.toString() }; }; -export const getFullTransaction = async (ethClient: EthClient, txHash: string, blockNumber: number): Promise => { +export const getFullTransaction = (txHash: string, ethFullTransactions: EthFullTransaction[]): Transaction => { + const ethFullTransaction = ethFullTransactions.find(ethFullTransaction => ethFullTransaction.ethTransactionCidByTxHash.txHash === txHash); + assert(ethFullTransaction); + let { ethTransactionCidByTxHash: fullTx, data: txData - } = await ethClient.getFullTransaction(txHash, blockNumber); + } = ethFullTransaction; // Check if txData does not exist when using ipld-eth-client if (!txData) { diff --git a/packages/util/src/types.ts b/packages/util/src/types.ts index 67ad6400..bbc567b2 100644 --- a/packages/util/src/types.ts +++ b/packages/util/src/types.ts @@ -9,7 +9,7 @@ import { MappingKey, StorageLayout } from '@cerc-io/solidity-mapper'; import { ServerConfig, UpstreamConfig } from './config'; import { Where, QueryOptions, Database } from './database'; -import { ValueResult, StateStatus } from './indexer'; +import { ValueResult, StateStatus, ExtraEventData } from './indexer'; import { JOB_KIND_CONTRACT, JOB_KIND_EVENTS } from './constants'; export enum StateKind { @@ -84,6 +84,77 @@ export interface StateInterface { data: Buffer; } +export interface EthFullTransaction { + ethTransactionCidByTxHash: { + txHash: string; + index: number; + src: string; + dst?: string; + blockByMhKey?: { + data: string; + } + }, + data?: Transaction; +} + +export interface EthFullBlock { + id?: string, + cid?: string; + blockNumber: string; + blockHash: string; + parentHash: string; + timestamp: string; + stateRoot: string; + td: string; + txRoot: string; + receiptRoot: string; + uncleRoot: string; + bloom: string; + size: string; + blockByMhKey: { + data: string; + } +} + +export interface EthClient { + getStorageAt({ blockHash, contract, slot }: { + blockHash: string; + contract: string; + slot: string; + }): Promise<{ + value: string; + proof: { + data: string; + }; + }>; + getBlockWithTransactions({ blockNumber, blockHash }: { + blockNumber?: number; + blockHash?: string; + }): Promise; + getBlocks({ blockNumber, blockHash }: { + blockNumber?: number; + blockHash?: string; + }): Promise; + getFullBlocks({ blockNumber, blockHash }: { + blockNumber?: number; + blockHash?: string; + }): Promise; + getFullTransaction(txHash: string, blockNumber?: number): Promise; + getBlockByHash(blockHash?: string): Promise; + getLogs(vars: { + blockHash: string, + blockNumber: string, + addresses?: string[], + topics?: string[][] + }): Promise; + getLogsForBlockRange?: (vars: { + fromBlock?: number, + toBlock?: number, + addresses?: string[], + topics?: string[][] + }) => Promise; +} + export interface IndexerInterface { readonly serverConfig: ServerConfig readonly upstreamConfig: UpstreamConfig @@ -95,15 +166,24 @@ export interface IndexerInterface { getEvent (id: string): Promise getSyncStatus (): Promise getStateSyncStatus (): Promise - getBlocks (blockFilter: { blockHash?: string, blockNumber?: number }): Promise + getBlocks (blockFilter: { blockHash?: string, blockNumber?: number }): Promise getBlocksAtHeight (height: number, isPruned: boolean): Promise getLatestCanonicalBlock (): Promise getLatestStateIndexedBlock (): Promise getBlockEvents (blockHash: string, where: Where, queryOptions: QueryOptions): Promise> getAncestorAtDepth (blockHash: string, depth: number): Promise fetchEventsAndSaveBlocks (blocks: DeepPartial[]): Promise<{ blockProgress: BlockProgressInterface, events: DeepPartial[] }[]> - saveBlockAndFetchEvents (block: DeepPartial): Promise<[BlockProgressInterface, DeepPartial[]]> - fetchAndSaveFilteredEventsAndBlocks (startBlock: number, endBlock: number): Promise<{ blockProgress: BlockProgressInterface, events: DeepPartial[] }[]> + saveBlockAndFetchEvents (block: DeepPartial): Promise<[ + BlockProgressInterface, + DeepPartial[], + EthFullTransaction[] + ]> + fetchAndSaveFilteredEventsAndBlocks (startBlock: number, endBlock: number): Promise<{ + blockProgress: BlockProgressInterface, + events: DeepPartial[], + ethFullBlock: EthFullBlock, + ethFullTransactions: EthFullTransaction[] + }[]> fetchEventsForContracts (blockHash: string, blockNumber: number, addresses: string[]): Promise[]> removeUnknownEvents (block: BlockProgressInterface): Promise updateBlockProgress (block: BlockProgressInterface, lastProcessedEventIndex: number): Promise @@ -117,7 +197,7 @@ export interface IndexerInterface { markBlocksAsPruned (blocks: BlockProgressInterface[]): Promise saveEventEntity (dbEvent: EventInterface): Promise saveEvents (dbEvents: DeepPartial[]): Promise - processEvent (event: EventInterface): Promise + processEvent (event: EventInterface, extraData: ExtraEventData): Promise parseEventNameAndArgs?: (kind: string, logObj: any) => any isWatchedContract: (address: string) => ContractInterface | undefined; getWatchedContracts: () => ContractInterface[] @@ -204,58 +284,6 @@ export interface GraphWatcherInterface { setIndexer (indexer: IndexerInterface): void; } -export interface FullTransaction { - ethTransactionCidByTxHash: { - txHash: string; - index: number; - src: string; - dst?: string; - blockByMhKey?: { - data: string; - } - }, - data?: Transaction; -} - -export interface EthClient { - getStorageAt({ blockHash, contract, slot }: { - blockHash: string; - contract: string; - slot: string; - }): Promise<{ - value: string; - proof: { - data: string; - }; - }>; - getBlockWithTransactions({ blockNumber, blockHash }: { - blockNumber?: number; - blockHash?: string; - }): Promise; - getBlocks({ blockNumber, blockHash }: { - blockNumber?: number; - blockHash?: string; - }): Promise; - getFullBlocks({ blockNumber, blockHash }: { - blockNumber?: number; - blockHash?: string; - }): Promise; - getFullTransaction(txHash: string, blockNumber?: number): Promise; - getBlockByHash(blockHash?: string): Promise; - getLogs(vars: { - blockHash: string, - blockNumber: string, - addresses?: string[], - topics?: string[][] - }): Promise; - getLogsForBlockRange?: (vars: { - fromBlock?: number, - toBlock?: number, - addresses?: string[], - topics?: string[][] - }) => Promise; -} - export type Clients = { ethClient: EthClient; [key: string]: any;