Prefetch block and txs in historical processing instead of fetching them in events processing (#460)

* Set gzip true in ethersjs provider

* Add timer logs and use StaticJsonRpcProvider

* Fetch block data in historical processing and cache in map

* Fetch txs required for event logs in historical processing

* Process events with prefetched block and txs data in realtime processing

* Clear old TODOs
This commit is contained in:
Nabarun Gogoi 2023-11-09 18:42:37 +05:30 committed by GitHub
parent c7bcd4c276
commit 695723955f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 398 additions and 219 deletions

View File

@ -129,8 +129,10 @@ export class CreateStateFromGQLCmd {
}
const blockProgress: Partial<BlockProgressInterface> = {
...block,
blockNumber: Number(block.blockNumber)
cid: block.cid,
blockTimestamp: Number(block.timestamp),
blockNumber: Number(block.blockNumber),
blockHash: block.blockHash
};
// Get watched contracts using subgraph dataSources

View File

@ -54,7 +54,10 @@ export const initClients = async (config: Config): Promise<{
});
}
const ethProvider = getCustomProvider(rpcProviderEndpoint);
const ethProvider = getCustomProvider({
url: rpcProviderEndpoint,
allowGzip: true
});
return {
ethClient,

View File

@ -31,7 +31,9 @@ import {
FILTER_CHANGE_BLOCK,
Where,
Filter,
OPERATOR_MAP
OPERATOR_MAP,
ExtraEventData,
EthFullTransaction
} from '@cerc-io/util';
import { Context, GraphData, instantiate } from './loader';
@ -149,12 +151,12 @@ export class GraphWatcher {
}
}
async handleEvent (eventData: any) {
async handleEvent (eventData: any, extraData: ExtraEventData) {
const { contract, event, eventSignature, block, tx: { hash: txHash }, eventIndex } = eventData;
// Check if block data is already fetched by a previous event in the same block.
if (!this._context.block || this._context.block.blockHash !== block.hash) {
this._context.block = await getFullBlock(this._ethClient, this._ethProvider, block.hash, block.number);
this._context.block = getFullBlock(extraData.ethFullBlock);
}
const blockData = this._context.block;
@ -197,7 +199,7 @@ export class GraphWatcher {
const eventFragment = contractInterface.getEvent(eventSignature);
const tx = await this._getTransactionData(txHash, Number(blockData.blockNumber));
const tx = this._getTransactionData(txHash, extraData.ethFullTransactions);
const data = {
block: blockData,
@ -208,9 +210,13 @@ export class GraphWatcher {
};
// Create ethereum event to be passed to the wasm event handler.
console.time(`time:graph-watcher#handleEvent-createEvent-block-${block.number}-event-${eventSignature}`);
const ethereumEvent = await createEvent(instanceExports, contract, data);
console.timeEnd(`time:graph-watcher#handleEvent-createEvent-block-${block.number}-event-${eventSignature}`);
try {
console.time(`time:graph-watcher#handleEvent-exec-${dataSource.name}-event-handler-${eventSignature}`);
await this._handleMemoryError(instanceExports[eventHandler.handler](ethereumEvent), dataSource.name);
console.timeEnd(`time:graph-watcher#handleEvent-exec-${dataSource.name}-event-handler-${eventSignature}`);
} catch (error) {
this._clearCachedEntities();
throw error;
@ -237,10 +243,11 @@ export class GraphWatcher {
continue;
}
// Check if block data is already fetched in handleEvent method for the same block.
if (!this._context.block || this._context.block.blockHash !== blockHash) {
this._context.block = await getFullBlock(this._ethClient, this._ethProvider, blockHash, blockNumber);
}
// TODO: Use extraData full block
// // Check if block data is already fetched in handleEvent method for the same block.
// if (!this._context.block || this._context.block.blockHash !== blockHash) {
// this._context.block = await getFullBlock(this._ethClient, this._ethProvider, blockHash, blockNumber);
// }
const blockData = this._context.block;
assert(blockData);
@ -445,15 +452,14 @@ export class GraphWatcher {
}
}
async _getTransactionData (txHash: string, blockNumber: number): Promise<Transaction> {
_getTransactionData (txHash: string, ethFullTransactions: EthFullTransaction[]): Transaction {
let transaction = this._transactionsMap.get(txHash);
if (transaction) {
return transaction;
}
transaction = await getFullTransaction(this._ethClient, txHash, blockNumber);
assert(transaction);
transaction = getFullTransaction(txHash, ethFullTransactions);
this._transactionsMap.set(txHash, transaction);
return transaction;

View File

@ -18,7 +18,9 @@ import {
ResultEvent,
StateKind,
EthClient,
UpstreamConfig
UpstreamConfig,
EthFullTransaction,
EthFullBlock
} from '@cerc-io/util';
import { GetStorageAt, getStorageValue, MappingKey, StorageLayout } from '@cerc-io/solidity-mapper';
@ -117,7 +119,12 @@ export class Indexer implements IndexerInterface {
return [];
}
async fetchAndSaveFilteredEventsAndBlocks (startBlock: number, endBlock: number): Promise<{ blockProgress: BlockProgressInterface, events: DeepPartial<EventInterface>[] }[]> {
async fetchAndSaveFilteredEventsAndBlocks (startBlock: number, endBlock: number): Promise<{
blockProgress: BlockProgressInterface;
events: DeepPartial<EventInterface>[];
ethFullBlock: EthFullBlock;
ethFullTransactions: EthFullTransaction[];
}[]> {
assert(startBlock);
assert(endBlock);
@ -132,8 +139,12 @@ export class Indexer implements IndexerInterface {
return [];
}
async saveBlockAndFetchEvents (block: BlockProgressInterface): Promise<[BlockProgressInterface, DeepPartial<EventInterface>[]]> {
return [block, []];
async saveBlockAndFetchEvents (block: BlockProgressInterface): Promise<[
BlockProgressInterface,
DeepPartial<EventInterface>[],
EthFullTransaction[]
]> {
return [block, [], []];
}
async removeUnknownEvents (block: BlockProgressInterface): Promise<void> {

View File

@ -5,7 +5,7 @@
import assert from 'assert';
import { Cache } from '@cerc-io/cache';
import { EthClient as EthClientInterface, FullTransaction } from '@cerc-io/util';
import { EthClient as EthClientInterface, EthFullTransaction } from '@cerc-io/util';
import ethQueries from './eth-queries';
import { padKey } from './utils';
@ -93,7 +93,7 @@ export class EthClient implements EthClientInterface {
async getFullBlocks ({ blockNumber, blockHash }: { blockNumber?: number, blockHash?: string }): Promise<any> {
console.time(`time:eth-client#getFullBlocks-${JSON.stringify({ blockNumber, blockHash })}`);
const result = await this._graphqlClient.query(
const { allEthHeaderCids } = await this._graphqlClient.query(
ethQueries.getFullBlocks,
{
blockNumber: blockNumber?.toString(),
@ -102,10 +102,10 @@ export class EthClient implements EthClientInterface {
);
console.timeEnd(`time:eth-client#getFullBlocks-${JSON.stringify({ blockNumber, blockHash })}`);
return result;
return allEthHeaderCids.nodes;
}
async getFullTransaction (txHash: string, blockNumber?: number): Promise<FullTransaction> {
async getFullTransaction (txHash: string, blockNumber?: number): Promise<EthFullTransaction> {
console.time(`time:eth-client#getFullTransaction-${JSON.stringify({ txHash, blockNumber })}`);
const result = await this._graphqlClient.query(
ethQueries.getFullTransaction,

View File

@ -75,6 +75,7 @@ query allEthHeaderCids($blockNumber: BigInt, $blockHash: String) {
}
`;
// TODO: Get block size from ipld-eth-server
export const getFullBlocks = gql`
query allEthHeaderCids($blockNumber: BigInt, $blockHash: String) {
allEthHeaderCids(condition: { blockNumber: $blockNumber, blockHash: $blockHash }) {

View File

@ -6,7 +6,7 @@ import assert from 'assert';
import { errors, providers, utils } from 'ethers';
import { Cache } from '@cerc-io/cache';
import { encodeHeader, escapeHexString, EthClient as EthClientInterface, FullTransaction } from '@cerc-io/util';
import { encodeHeader, escapeHexString, EthClient as EthClientInterface, EthFullTransaction } from '@cerc-io/util';
import { padKey } from '@cerc-io/ipld-eth-client';
export interface Config {
@ -32,7 +32,10 @@ export class EthClient implements EthClientInterface {
constructor (config: Config) {
const { rpcEndpoint, cache } = config;
assert(rpcEndpoint, 'Missing RPC endpoint');
this._provider = new providers.JsonRpcProvider(rpcEndpoint);
this._provider = new providers.StaticJsonRpcProvider({
url: rpcEndpoint,
allowGzip: true
});
this._cache = cache;
}
@ -171,39 +174,33 @@ export class EthClient implements EthClientInterface {
const rlpData = encodeHeader(header);
const allEthHeaderCids = {
nodes: [
{
blockNumber: this._provider.formatter.number(rawBlock.number).toString(),
blockHash: this._provider.formatter.hash(rawBlock.hash),
parentHash: this._provider.formatter.hash(rawBlock.parentHash),
timestamp: this._provider.formatter.number(rawBlock.timestamp).toString(),
stateRoot: this._provider.formatter.hash(rawBlock.stateRoot),
td: this._provider.formatter.bigNumber(rawBlock.totalDifficulty).toString(),
txRoot: this._provider.formatter.hash(rawBlock.transactionsRoot),
receiptRoot: this._provider.formatter.hash(rawBlock.receiptsRoot),
uncleRoot: this._provider.formatter.hash(rawBlock.sha3Uncles),
bloom: escapeHexString(this._provider.formatter.hex(rawBlock.logsBloom)),
blockByMhKey: {
data: escapeHexString(rlpData)
}
}
]
};
return { allEthHeaderCids };
return [{
blockNumber: this._provider.formatter.number(rawBlock.number).toString(),
blockHash: this._provider.formatter.hash(rawBlock.hash),
parentHash: this._provider.formatter.hash(rawBlock.parentHash),
timestamp: this._provider.formatter.number(rawBlock.timestamp).toString(),
stateRoot: this._provider.formatter.hash(rawBlock.stateRoot),
td: this._provider.formatter.bigNumber(rawBlock.totalDifficulty).toString(),
txRoot: this._provider.formatter.hash(rawBlock.transactionsRoot),
receiptRoot: this._provider.formatter.hash(rawBlock.receiptsRoot),
uncleRoot: this._provider.formatter.hash(rawBlock.sha3Uncles),
bloom: escapeHexString(this._provider.formatter.hex(rawBlock.logsBloom)),
size: this._provider.formatter.number(rawBlock.size).toString(),
blockByMhKey: {
data: escapeHexString(rlpData)
}
}];
}
async getFullTransaction (txHash: string): Promise<FullTransaction> {
async getFullTransaction (txHash: string): Promise<EthFullTransaction> {
console.time(`time:eth-client#getFullTransaction-${JSON.stringify({ txHash })}`);
const tx = await this._provider.getTransaction(txHash);
console.timeEnd(`time:eth-client#getFullTransaction-${JSON.stringify({ txHash })}`);
const txReceipt = await tx.wait();
return {
ethTransactionCidByTxHash: {
txHash: tx.hash,
index: txReceipt.transactionIndex,
index: (tx as any).transactionIndex,
src: tx.from,
dst: tx.to
},

View File

@ -14,7 +14,7 @@ import {
NULL_BLOCK_ERROR
} from './constants';
import { JobQueue } from './job-queue';
import { BlockProgressInterface, IndexerInterface, EventInterface } from './types';
import { BlockProgressInterface, IndexerInterface, EventInterface, EthFullTransaction, EthFullBlock } from './types';
import { wait } from './misc';
import { OrderDirection } from './database';
import { JobQueueConfig } from './config';
@ -27,6 +27,8 @@ const JSONbigNative = JSONbig({ useNativeBigInt: true });
export interface PrefetchedBlock {
block: BlockProgressInterface;
events: DeepPartial<EventInterface>[];
ethFullBlock: EthFullBlock;
ethFullTransactions: EthFullTransaction[];
}
/**
@ -104,6 +106,20 @@ export const fetchBlocksAtHeight = async (
if (!blocks.length) {
log(`No blocks fetched for block number ${blockNumber}, retrying after ${jobQueueConfig.blockDelayInMilliSecs} ms delay.`);
await wait(jobQueueConfig.blockDelayInMilliSecs);
} else {
blocks.forEach(block => {
blockAndEventsMap.set(
block.blockHash,
{
// Block is set later in job-runner when saving to database
block: {} as BlockProgressInterface,
events: [],
ethFullBlock: block,
// Transactions are set later in job-runner when fetching events
ethFullTransactions: []
}
);
});
}
} catch (err: any) {
// Handle null block error in case of Lotus EVM
@ -153,15 +169,15 @@ export const fetchAndSaveFilteredLogsAndBlocks = async (
): Promise<BlockProgressInterface[]> => {
// Fetch filtered logs and required blocks
console.time('time:common#fetchAndSaveFilteredLogsAndBlocks-fetchAndSaveFilteredEventsAndBlocks');
const blocksWithEvents = await indexer.fetchAndSaveFilteredEventsAndBlocks(startBlock, endBlock);
const blocksData = await indexer.fetchAndSaveFilteredEventsAndBlocks(startBlock, endBlock);
console.timeEnd('time:common#fetchAndSaveFilteredLogsAndBlocks-fetchAndSaveFilteredEventsAndBlocks');
// Set blocks with events in blockAndEventsMap cache
blocksWithEvents.forEach(({ blockProgress, events }) => {
blockAndEventsMap.set(blockProgress.blockHash, { block: blockProgress, events });
blocksData.forEach(({ blockProgress, events, ethFullBlock, ethFullTransactions }) => {
blockAndEventsMap.set(blockProgress.blockHash, { block: blockProgress, events, ethFullBlock, ethFullTransactions });
});
return blocksWithEvents.map(({ blockProgress }) => blockProgress);
return blocksData.map(({ blockProgress }) => blockProgress);
};
export const _prefetchBlocks = async (
@ -181,7 +197,15 @@ export const _prefetchBlocks = async (
);
blocksWithEvents.forEach(({ blockProgress, events }) => {
blockAndEventsMap.set(blockProgress.blockHash, { block: blockProgress, events });
blockAndEventsMap.set(
blockProgress.blockHash,
{
block: blockProgress,
events,
// TODO: Set ethFullBlock and ethFullTransactions
ethFullBlock: {} as EthFullBlock,
ethFullTransactions: []
});
});
};
@ -283,17 +307,23 @@ export const _fetchBatchBlocks = async (
*/
export const processBatchEvents = async (
indexer: IndexerInterface,
block: BlockProgressInterface,
eventsInBatch: number,
subgraphEventsOrder: boolean
data: {
block: BlockProgressInterface;
ethFullBlock: EthFullBlock;
ethFullTransactions: EthFullTransaction[];
},
{ eventsInBatch, subgraphEventsOrder }: {
eventsInBatch: number;
subgraphEventsOrder: boolean;
}
): Promise<boolean> => {
let dbBlock: BlockProgressInterface, updatedDbEvents: EventInterface[];
let isNewContractWatched = false;
if (subgraphEventsOrder) {
({ dbBlock, updatedDbEvents, isNewContractWatched } = await _processEventsInSubgraphOrder(indexer, block, eventsInBatch || DEFAULT_EVENTS_IN_BATCH));
({ dbBlock, updatedDbEvents, isNewContractWatched } = await _processEventsInSubgraphOrder(indexer, data, eventsInBatch || DEFAULT_EVENTS_IN_BATCH));
} else {
({ dbBlock, updatedDbEvents } = await _processEvents(indexer, block, eventsInBatch || DEFAULT_EVENTS_IN_BATCH));
({ dbBlock, updatedDbEvents } = await _processEvents(indexer, data, eventsInBatch || DEFAULT_EVENTS_IN_BATCH));
}
if (indexer.processBlockAfterEvents) {
@ -314,7 +344,15 @@ export const processBatchEvents = async (
return isNewContractWatched;
};
const _processEvents = async (indexer: IndexerInterface, block: BlockProgressInterface, eventsInBatch: number): Promise<{ dbBlock: BlockProgressInterface, updatedDbEvents: EventInterface[] }> => {
const _processEvents = async (
indexer: IndexerInterface,
{ block, ethFullBlock, ethFullTransactions }: {
block: BlockProgressInterface;
ethFullBlock: EthFullBlock;
ethFullTransactions: EthFullTransaction[];
},
eventsInBatch: number
): Promise<{ dbBlock: BlockProgressInterface, updatedDbEvents: EventInterface[] }> => {
const updatedDbEvents: EventInterface[] = [];
let page = 0;
@ -356,7 +394,7 @@ const _processEvents = async (indexer: IndexerInterface, block: BlockProgressInt
updatedDbEvents.push(event);
}
await indexer.processEvent(event);
await indexer.processEvent(event, { ethFullBlock, ethFullTransactions });
}
block.lastProcessedEventIndex = event.index;
@ -371,7 +409,15 @@ const _processEvents = async (indexer: IndexerInterface, block: BlockProgressInt
return { dbBlock: block, updatedDbEvents: updatedDbEvents };
};
const _processEventsInSubgraphOrder = async (indexer: IndexerInterface, block: BlockProgressInterface, eventsInBatch: number): Promise<{ dbBlock: BlockProgressInterface, updatedDbEvents: EventInterface[], isNewContractWatched: boolean }> => {
const _processEventsInSubgraphOrder = async (
indexer: IndexerInterface,
{ block, ethFullBlock, ethFullTransactions }: {
block: BlockProgressInterface;
ethFullBlock: EthFullBlock;
ethFullTransactions: EthFullTransaction[];
},
eventsInBatch: number
): Promise<{ dbBlock: BlockProgressInterface, updatedDbEvents: EventInterface[], isNewContractWatched: boolean }> => {
// Create list of initially watched contracts
const initiallyWatchedContracts: string[] = indexer.getWatchedContracts().map(contract => contract.address);
const unwatchedContractEvents: EventInterface[] = [];
@ -411,7 +457,9 @@ const _processEventsInSubgraphOrder = async (indexer: IndexerInterface, block: B
// Process known events in a loop
for (const event of watchedContractEvents) {
await indexer.processEvent(event);
console.time(`time:common#_processEventsInSubgraphOrder-block-${block.blockNumber}-processEvent-${event.eventName}`);
await indexer.processEvent(event, { ethFullBlock, ethFullTransactions });
console.timeEnd(`time:common#_processEventsInSubgraphOrder-block-${block.blockNumber}-processEvent-${event.eventName}`);
block.lastProcessedEventIndex = event.index;
block.numProcessedEvents++;
@ -430,7 +478,9 @@ const _processEventsInSubgraphOrder = async (indexer: IndexerInterface, block: B
if (indexer.upstreamConfig.ethServer.filterLogsByAddresses) {
// Fetch and parse events for newly watched contracts
const newContracts = watchedContracts.filter(contract => !initiallyWatchedContracts.includes(contract));
console.time(`time:common#_processEventsInSubgraphOrder-fetchEventsForContracts-block-${block.blockNumber}-unwatched-contract`);
const events = await indexer.fetchEventsForContracts(block.blockHash, block.blockNumber, newContracts);
console.timeEnd(`time:common#_processEventsInSubgraphOrder-fetchEventsForContracts-block-${block.blockNumber}-unwatched-contract`);
events.forEach(event => {
event.block = block;
@ -457,7 +507,9 @@ const _processEventsInSubgraphOrder = async (indexer: IndexerInterface, block: B
console.time('time:common#processEventsInSubgraphOrder-processing_initially_unwatched_events');
// In the end process events of newly watched contracts
for (const updatedDbEvent of updatedDbEvents) {
await indexer.processEvent(updatedDbEvent);
console.time(`time:common#processEventsInSubgraphOrder-block-${block.blockNumber}-updated-processEvent-${updatedDbEvent.eventName}`);
await indexer.processEvent(updatedDbEvent, { ethFullBlock, ethFullTransactions });
console.timeEnd(`time:common#processEventsInSubgraphOrder-block-${block.blockNumber}-updated-processEvent-${updatedDbEvent.eventName}`);
block.lastProcessedEventIndex = Math.max(block.lastProcessedEventIndex + 1, updatedDbEvent.index);
block.numProcessedEvents++;

View File

@ -33,6 +33,7 @@ export class EventWatcher {
_indexer: IndexerInterface;
_pubsub: PubSub;
_jobQueue: JobQueue;
_realtimeProcessingStarted = false;
_shutDown = false;
_signalCount = 0;
@ -135,6 +136,13 @@ export class EventWatcher {
log(`Starting realtime block processing from block ${startBlockNumber}`);
await processBlockByNumber(this._jobQueue, startBlockNumber);
// Check if realtime processing already started and avoid resubscribing to block progress event
if (this._realtimeProcessingStarted) {
return;
}
this._realtimeProcessingStarted = true;
// Creating an AsyncIterable from AsyncIterator to iterate over the values.
// https://www.codementor.io/@tiagolopesferreira/asynchronous-iterators-in-javascript-jl1yg8la1#for-wait-of
const blockProgressEventIterable = {

View File

@ -34,7 +34,7 @@ export interface Transaction {
hash: string;
index: number;
from: string;
to: string;
to?: string;
value: string;
gasLimit: string;
gasPrice?: string;

View File

@ -6,6 +6,7 @@ import assert from 'assert';
import { BlockProgressInterface, IndexerInterface } from './types';
import { processBatchEvents } from './common';
import { EthFullBlock } from '.';
export const indexBlock = async (
indexer: IndexerInterface,
@ -46,6 +47,14 @@ export const indexBlock = async (
assert(indexer.processBlock);
await indexer.processBlock(blockProgress);
await processBatchEvents(indexer, blockProgress, eventsInBatch, subgraphEventsOrder);
await processBatchEvents(
indexer,
{
block: blockProgress,
// TODO: Set ethFullBlock and ethFullTransactions
ethFullBlock: {} as EthFullBlock,
ethFullTransactions: []
},
{ eventsInBatch, subgraphEventsOrder });
}
};

View File

@ -24,7 +24,9 @@ import {
StateKind,
EthClient,
ContractJobData,
EventsQueueJobKind
EventsQueueJobKind,
EthFullBlock,
EthFullTransaction
} from './types';
import { UNKNOWN_EVENT_NAME, QUEUE_EVENT_PROCESSING, DIFF_MERGE_BATCH_SIZE } from './constants';
import { JobQueue } from './job-queue';
@ -100,6 +102,11 @@ export type ResultMeta = {
hasIndexingErrors: boolean;
};
export type ExtraEventData = {
ethFullBlock: EthFullBlock;
ethFullTransactions: EthFullTransaction[];
}
export class Indexer {
_serverConfig: ServerConfig;
_upstreamConfig: UpstreamConfig;
@ -284,10 +291,9 @@ export class Indexer {
return res;
}
async getBlocks (blockFilter: { blockNumber?: number, blockHash?: string }): Promise<any> {
async getBlocks (blockFilter: { blockNumber?: number, blockHash?: string }): Promise<EthFullBlock[]> {
assert(blockFilter.blockHash || blockFilter.blockNumber);
const result = await this._ethClient.getBlocks(blockFilter);
const { allEthHeaderCids: { nodes: blocks } } = result;
const blocks = await this._ethClient.getFullBlocks(blockFilter);
if (!blocks.length) {
try {
@ -461,7 +467,12 @@ export class Indexer {
kind: string,
logObj: { topics: string[]; data: string }
) => { eventName: string; eventInfo: {[key: string]: any}; eventSignature: string }
): Promise<{ blockProgress: BlockProgressInterface, events: DeepPartial<EventInterface>[] }[]> {
): Promise<{
blockProgress: BlockProgressInterface,
events: DeepPartial<EventInterface>[],
ethFullBlock: EthFullBlock,
ethFullTransactions: EthFullTransaction[]
}[]> {
assert(this._ethClient.getLogsForBlockRange, 'getLogsForBlockRange() not implemented in ethClient');
const { addresses, topics } = this._createLogsFilters(eventSignaturesMap);
@ -474,45 +485,67 @@ export class Indexer {
});
const blockLogsMap = this._reduceLogsToBlockLogsMap(logs);
// Create unique list of tx required
const txHashes = Array.from([
...new Set<string>(logs.map((log: any) => log.transaction.hash))
]);
// Fetch blocks with transactions for the logs returned
console.time(`time:indexer#fetchAndSaveFilteredEventsAndBlocks-fetch-blocks-txs-${fromBlock}-${toBlock}`);
const blocksWithTxPromises = Array.from(blockLogsMap.keys()).map(async (blockHash) => {
const result = await this._ethClient.getBlockWithTransactions({ blockHash });
const blocksPromises = Array.from(blockLogsMap.keys()).map(async (blockHash) => {
const [fullBlock] = await this._ethClient.getFullBlocks({ blockHash });
const {
allEthHeaderCids: {
nodes: [
{
ethTransactionCidsByHeaderId: {
nodes: transactions
},
...block
}
]
}
} = result;
const block = {
...fullBlock,
blockTimestamp: Number(fullBlock.timestamp),
blockNumber: Number(fullBlock.blockNumber)
};
block.blockTimestamp = Number(block.timestamp);
block.blockNumber = Number(block.blockNumber);
return { block, transactions } as { block: DeepPartial<BlockProgressInterface>; transactions: any[] };
return { block, fullBlock } as { block: DeepPartial<BlockProgressInterface>; fullBlock: EthFullBlock };
});
const blockWithTxs = await Promise.all(blocksWithTxPromises);
const ethFullTxPromises = txHashes.map(async txHash => {
return this._ethClient.getFullTransaction(txHash);
});
const blocks = await Promise.all(blocksPromises);
const ethFullTxs = await Promise.all(ethFullTxPromises);
const ethFullTxsMap = ethFullTxs.reduce((acc: Map<string, EthFullTransaction>, ethFullTx) => {
acc.set(ethFullTx.ethTransactionCidByTxHash.txHash, ethFullTx);
return acc;
}, new Map());
console.timeEnd(`time:indexer#fetchAndSaveFilteredEventsAndBlocks-fetch-blocks-txs-${fromBlock}-${toBlock}`);
// Map db ready events according to blockhash
console.time(`time:indexer#fetchAndSaveFilteredEventsAndBlocks-db-save-blocks-events-${fromBlock}-${toBlock}`);
const blockWithDbEventsPromises = blockWithTxs.map(async ({ block, transactions }) => {
const blockWithDbEventsPromises = blocks.map(async ({ block, fullBlock }) => {
const blockHash = block.blockHash;
assert(blockHash);
const logs = blockLogsMap.get(blockHash) || [];
const events = this.createDbEventsFromLogsAndTxs(blockHash, logs, transactions, parseEventNameAndArgs);
const txHashes = Array.from([
...new Set<string>(logs.map((log: any) => log.transaction.hash))
]);
const blockEthFullTxs = txHashes.map(txHash => ethFullTxsMap.get(txHash)) as EthFullTransaction[];
const events = this.createDbEventsFromLogsAndTxs(
blockHash,
logs,
blockEthFullTxs.map(ethFullTx => ethFullTx?.ethTransactionCidByTxHash),
parseEventNameAndArgs
);
const [blockProgress] = await this.saveBlockWithEvents(block, events);
return { blockProgress, events: [] };
return {
blockProgress,
ethFullBlock: fullBlock,
ethFullTransactions: blockEthFullTxs,
block,
events: []
};
});
const blocksWithDbEvents = await Promise.all(blockWithDbEventsPromises);
@ -536,48 +569,57 @@ export class Indexer {
}
// Fetch events (to be saved to db) for a particular block
async fetchEvents (blockHash: string, blockNumber: number, eventSignaturesMap: Map<string, string[]>, parseEventNameAndArgs: (kind: string, logObj: any) => any): Promise<DeepPartial<EventInterface>[]> {
async fetchEvents (blockHash: string, blockNumber: number, eventSignaturesMap: Map<string, string[]>, parseEventNameAndArgs: (kind: string, logObj: any) => any): Promise<{ events: DeepPartial<EventInterface>[], transactions: EthFullTransaction[]}> {
const { addresses, topics } = this._createLogsFilters(eventSignaturesMap);
const { logs, transactions } = await this._fetchLogsAndTransactions(blockHash, blockNumber, addresses, topics);
return this.createDbEventsFromLogsAndTxs(blockHash, logs, transactions, parseEventNameAndArgs);
const events = this.createDbEventsFromLogsAndTxs(
blockHash,
logs,
transactions.map(tx => tx.ethTransactionCidByTxHash),
parseEventNameAndArgs
);
return { events, transactions };
}
async fetchEventsForContracts (blockHash: string, blockNumber: number, addresses: string[], eventSignaturesMap: Map<string, string[]>, parseEventNameAndArgs: (kind: string, logObj: any) => any): Promise<DeepPartial<EventInterface>[]> {
const { topics } = this._createLogsFilters(eventSignaturesMap);
const { logs, transactions } = await this._fetchLogsAndTransactions(blockHash, blockNumber, addresses, topics);
return this.createDbEventsFromLogsAndTxs(blockHash, logs, transactions, parseEventNameAndArgs);
return this.createDbEventsFromLogsAndTxs(
blockHash,
logs,
transactions.map(tx => tx.ethTransactionCidByTxHash),
parseEventNameAndArgs
);
}
async _fetchLogsAndTransactions (blockHash: string, blockNumber: number, addresses?: string[], topics?: string[][]): Promise<{ logs: any[]; transactions: any[] }> {
const logsPromise = await this._ethClient.getLogs({
async _fetchLogsAndTransactions (blockHash: string, blockNumber: number, addresses?: string[], topics?: string[][]): Promise<{ logs: any[]; transactions: EthFullTransaction[] }> {
const { logs } = await this._ethClient.getLogs({
blockHash,
blockNumber: blockNumber.toString(),
addresses,
topics
});
const transactionsPromise = this._ethClient.getBlockWithTransactions({ blockHash, blockNumber });
const [
{ logs },
{
allEthHeaderCids: {
nodes: [
{
ethTransactionCidsByHeaderId: {
nodes: transactions
}
}
]
}
}
] = await Promise.all([logsPromise, transactionsPromise]);
const transactions = await this._fetchTxsFromLogs(logs);
return { logs, transactions };
}
async _fetchTxsFromLogs (logs: any[]): Promise<EthFullTransaction[]> {
const txHashes = Array.from([
...new Set<string>(logs.map((log) => log.transaction.hash))
]);
const ethFullTxPromises = txHashes.map(async txHash => {
return this._ethClient.getFullTransaction(txHash);
});
return Promise.all(ethFullTxPromises);
}
// Create events to be saved to db for a block given blockHash, logs, transactions and a parser function
createDbEventsFromLogsAndTxs (blockHash: string, logs: any, transactions: any, parseEventNameAndArgs: (kind: string, logObj: any) => any): DeepPartial<EventInterface>[] {
const transactionMap = transactions.reduce((acc: {[key: string]: any}, transaction: {[key: string]: any}) => {

View File

@ -502,6 +502,20 @@ export class JobRunner {
throw new Error(message);
}
blocks.forEach(block => {
this._blockAndEventsMap.set(
block.blockHash,
{
// block is set later in job when saving to database
block: {} as BlockProgressInterface,
events: [],
ethFullBlock: block,
// Transactions are set later in job when fetching events
ethFullTransactions: []
}
);
});
const [{ cid: parentCid, blockNumber: parentBlockNumber, parentHash: grandparentHash, timestamp: parentTimestamp }] = blocks;
await this.jobQueue.pushJob(QUEUE_BLOCK_PROCESSING, {
@ -549,7 +563,9 @@ export class JobRunner {
if (!blockProgress) {
const prefetchedBlock = this._blockAndEventsMap.get(blockHash);
if (prefetchedBlock) {
// Check if prefetched block is set properly
// prefetchedBlock.block is an empty object when running in realtime processing
if (prefetchedBlock && prefetchedBlock.block.blockHash) {
({ block: blockProgress } = prefetchedBlock);
} else {
// Delay required to process block.
@ -558,11 +574,20 @@ export class JobRunner {
console.time('time:job-runner#_indexBlock-saveBlockAndFetchEvents');
log(`_indexBlock#saveBlockAndFetchEvents: fetching from upstream server ${blockHash}`);
[blockProgress] = await this._indexer.saveBlockAndFetchEvents({ cid, blockHash, blockNumber, parentHash, blockTimestamp });
let ethFullTransactions;
[blockProgress,, ethFullTransactions] = await this._indexer.saveBlockAndFetchEvents({ cid, blockHash, blockNumber, parentHash, blockTimestamp });
log(`_indexBlock#saveBlockAndFetchEvents: fetched for block: ${blockProgress.blockHash} num events: ${blockProgress.numEvents}`);
console.timeEnd('time:job-runner#_indexBlock-saveBlockAndFetchEvents');
const data = this._blockAndEventsMap.get(blockHash);
assert(data);
this._blockAndEventsMap.set(blockHash, { block: blockProgress, events: [] });
this._blockAndEventsMap.set(
blockHash,
{
...data,
block: blockProgress,
ethFullTransactions
});
}
}
@ -588,26 +613,33 @@ export class JobRunner {
const { blockHash, isRetryAttempt } = jobData;
try {
if (!this._blockAndEventsMap.has(blockHash)) {
console.time('time:job-runner#_processEvents-get-block-progress');
const block = await this._indexer.getBlockProgress(blockHash);
console.timeEnd('time:job-runner#_processEvents-get-block-progress');
// NOTE: blockAndEventsMap should contain block as watcher is reset
// if (!this._blockAndEventsMap.has(blockHash)) {
// console.time('time:job-runner#_processEvents-get-block-progress');
// const block = await this._indexer.getBlockProgress(blockHash);
// console.timeEnd('time:job-runner#_processEvents-get-block-progress');
assert(block);
this._blockAndEventsMap.set(blockHash, { block, events: [] });
}
// assert(block);
// this._blockAndEventsMap.set(blockHash, { block, events: [] });
// }
const prefetchedBlock = this._blockAndEventsMap.get(blockHash);
assert(prefetchedBlock);
const { block } = prefetchedBlock;
const { block, ethFullBlock, ethFullTransactions } = prefetchedBlock;
log(`Processing events for block ${block.blockNumber}`);
console.time(`time:job-runner#_processEvents-events-${block.blockNumber}`);
const isNewContractWatched = await processBatchEvents(
this._indexer,
block,
this._jobQueueConfig.eventsInBatch,
this._jobQueueConfig.subgraphEventsOrder
{
block,
ethFullBlock,
ethFullTransactions
},
{
eventsInBatch: this._jobQueueConfig.eventsInBatch,
subgraphEventsOrder: this._jobQueueConfig.subgraphEventsOrder
}
);
console.timeEnd(`time:job-runner#_processEvents-events-${block.blockNumber}`);

View File

@ -18,10 +18,10 @@ import { GQLCacheConfig, Config } from './config';
import { JobQueue } from './job-queue';
import { GraphDecimal } from './graph/graph-decimal';
import * as EthDecoder from './eth';
import { getCachedBlockSize } from './block-size-cache';
import { ResultEvent } from './indexer';
import { EventInterface, EthClient } from './types';
import { EventInterface, EthFullBlock, EthFullTransaction } from './types';
import { BlockHeight } from './database';
import { Transaction } from './graph/utils';
const JSONbigNative = JSONbig({ useNativeBigInt: true });
@ -154,7 +154,7 @@ export const getResetYargs = (): yargs.Argv => {
};
export const getCustomProvider = (url?: utils.ConnectionInfo | string, network?: providers.Networkish): providers.JsonRpcProvider => {
const provider = new providers.JsonRpcProvider(url, network);
const provider = new providers.StaticJsonRpcProvider(url, network);
provider.formatter = new CustomFormatter();
return provider;
};
@ -182,52 +182,40 @@ class CustomFormatter extends providers.Formatter {
}
}
export const getFullBlock = async (ethClient: EthClient, ethProvider: providers.BaseProvider, blockHash: string, blockNumber: number): Promise<any> => {
const {
allEthHeaderCids: {
nodes: [
fullBlock
]
}
} = await ethClient.getFullBlocks({ blockHash, blockNumber });
assert(fullBlock.blockByMhKey);
export const getFullBlock = (ethFullBlock: EthFullBlock): any => {
// Decode the header data.
const header = EthDecoder.decodeHeader(EthDecoder.decodeData(fullBlock.blockByMhKey.data));
const header = EthDecoder.decodeHeader(EthDecoder.decodeData(ethFullBlock.blockByMhKey.data));
assert(header);
// TODO: Calculate size from rlp encoded data.
// Get block info from JSON RPC API provided by ipld-eth-server.
const provider = ethProvider as providers.JsonRpcProvider;
const size = await getCachedBlockSize(provider, blockHash, Number(fullBlock.blockNumber));
return {
headerId: fullBlock.id,
cid: fullBlock.cid,
blockNumber: fullBlock.blockNumber,
blockHash: fullBlock.blockHash,
parentHash: fullBlock.parentHash,
timestamp: fullBlock.timestamp,
stateRoot: fullBlock.stateRoot,
td: fullBlock.td,
txRoot: fullBlock.txRoot,
receiptRoot: fullBlock.receiptRoot,
uncleHash: fullBlock.uncleRoot,
headerId: ethFullBlock.id,
cid: ethFullBlock.cid,
blockNumber: ethFullBlock.blockNumber,
blockHash: ethFullBlock.blockHash,
parentHash: ethFullBlock.parentHash,
timestamp: ethFullBlock.timestamp,
stateRoot: ethFullBlock.stateRoot,
td: ethFullBlock.td,
txRoot: ethFullBlock.txRoot,
receiptRoot: ethFullBlock.receiptRoot,
uncleHash: ethFullBlock.uncleRoot,
difficulty: header.Difficulty.toString(),
gasLimit: header.GasLimit.toString(),
gasUsed: header.GasUsed.toString(),
author: header.Beneficiary,
size: BigInt(size).toString(),
size: ethFullBlock.size,
baseFee: header.BaseFee?.toString()
};
};
export const getFullTransaction = async (ethClient: EthClient, txHash: string, blockNumber: number): Promise<any> => {
export const getFullTransaction = (txHash: string, ethFullTransactions: EthFullTransaction[]): Transaction => {
const ethFullTransaction = ethFullTransactions.find(ethFullTransaction => ethFullTransaction.ethTransactionCidByTxHash.txHash === txHash);
assert(ethFullTransaction);
let {
ethTransactionCidByTxHash: fullTx,
data: txData
} = await ethClient.getFullTransaction(txHash, blockNumber);
} = ethFullTransaction;
// Check if txData does not exist when using ipld-eth-client
if (!txData) {

View File

@ -9,7 +9,7 @@ import { MappingKey, StorageLayout } from '@cerc-io/solidity-mapper';
import { ServerConfig, UpstreamConfig } from './config';
import { Where, QueryOptions, Database } from './database';
import { ValueResult, StateStatus } from './indexer';
import { ValueResult, StateStatus, ExtraEventData } from './indexer';
import { JOB_KIND_CONTRACT, JOB_KIND_EVENTS } from './constants';
export enum StateKind {
@ -84,6 +84,77 @@ export interface StateInterface {
data: Buffer;
}
export interface EthFullTransaction {
ethTransactionCidByTxHash: {
txHash: string;
index: number;
src: string;
dst?: string;
blockByMhKey?: {
data: string;
}
},
data?: Transaction;
}
export interface EthFullBlock {
id?: string,
cid?: string;
blockNumber: string;
blockHash: string;
parentHash: string;
timestamp: string;
stateRoot: string;
td: string;
txRoot: string;
receiptRoot: string;
uncleRoot: string;
bloom: string;
size: string;
blockByMhKey: {
data: string;
}
}
export interface EthClient {
getStorageAt({ blockHash, contract, slot }: {
blockHash: string;
contract: string;
slot: string;
}): Promise<{
value: string;
proof: {
data: string;
};
}>;
getBlockWithTransactions({ blockNumber, blockHash }: {
blockNumber?: number;
blockHash?: string;
}): Promise<any>;
getBlocks({ blockNumber, blockHash }: {
blockNumber?: number;
blockHash?: string;
}): Promise<any>;
getFullBlocks({ blockNumber, blockHash }: {
blockNumber?: number;
blockHash?: string;
}): Promise<EthFullBlock[]>;
getFullTransaction(txHash: string, blockNumber?: number): Promise<EthFullTransaction>;
getBlockByHash(blockHash?: string): Promise<any>;
getLogs(vars: {
blockHash: string,
blockNumber: string,
addresses?: string[],
topics?: string[][]
}): Promise<any>;
getLogsForBlockRange?: (vars: {
fromBlock?: number,
toBlock?: number,
addresses?: string[],
topics?: string[][]
}) => Promise<any>;
}
export interface IndexerInterface {
readonly serverConfig: ServerConfig
readonly upstreamConfig: UpstreamConfig
@ -95,15 +166,24 @@ export interface IndexerInterface {
getEvent (id: string): Promise<EventInterface | undefined>
getSyncStatus (): Promise<SyncStatusInterface | undefined>
getStateSyncStatus (): Promise<StateSyncStatusInterface | undefined>
getBlocks (blockFilter: { blockHash?: string, blockNumber?: number }): Promise<any>
getBlocks (blockFilter: { blockHash?: string, blockNumber?: number }): Promise<EthFullBlock[]>
getBlocksAtHeight (height: number, isPruned: boolean): Promise<BlockProgressInterface[]>
getLatestCanonicalBlock (): Promise<BlockProgressInterface | undefined>
getLatestStateIndexedBlock (): Promise<BlockProgressInterface>
getBlockEvents (blockHash: string, where: Where, queryOptions: QueryOptions): Promise<Array<EventInterface>>
getAncestorAtDepth (blockHash: string, depth: number): Promise<string>
fetchEventsAndSaveBlocks (blocks: DeepPartial<BlockProgressInterface>[]): Promise<{ blockProgress: BlockProgressInterface, events: DeepPartial<EventInterface>[] }[]>
saveBlockAndFetchEvents (block: DeepPartial<BlockProgressInterface>): Promise<[BlockProgressInterface, DeepPartial<EventInterface>[]]>
fetchAndSaveFilteredEventsAndBlocks (startBlock: number, endBlock: number): Promise<{ blockProgress: BlockProgressInterface, events: DeepPartial<EventInterface>[] }[]>
saveBlockAndFetchEvents (block: DeepPartial<BlockProgressInterface>): Promise<[
BlockProgressInterface,
DeepPartial<EventInterface>[],
EthFullTransaction[]
]>
fetchAndSaveFilteredEventsAndBlocks (startBlock: number, endBlock: number): Promise<{
blockProgress: BlockProgressInterface,
events: DeepPartial<EventInterface>[],
ethFullBlock: EthFullBlock,
ethFullTransactions: EthFullTransaction[]
}[]>
fetchEventsForContracts (blockHash: string, blockNumber: number, addresses: string[]): Promise<DeepPartial<EventInterface>[]>
removeUnknownEvents (block: BlockProgressInterface): Promise<void>
updateBlockProgress (block: BlockProgressInterface, lastProcessedEventIndex: number): Promise<BlockProgressInterface>
@ -117,7 +197,7 @@ export interface IndexerInterface {
markBlocksAsPruned (blocks: BlockProgressInterface[]): Promise<void>
saveEventEntity (dbEvent: EventInterface): Promise<EventInterface>
saveEvents (dbEvents: DeepPartial<EventInterface>[]): Promise<void>
processEvent (event: EventInterface): Promise<void>
processEvent (event: EventInterface, extraData: ExtraEventData): Promise<void>
parseEventNameAndArgs?: (kind: string, logObj: any) => any
isWatchedContract: (address: string) => ContractInterface | undefined;
getWatchedContracts: () => ContractInterface[]
@ -204,58 +284,6 @@ export interface GraphWatcherInterface {
setIndexer (indexer: IndexerInterface): void;
}
export interface FullTransaction {
ethTransactionCidByTxHash: {
txHash: string;
index: number;
src: string;
dst?: string;
blockByMhKey?: {
data: string;
}
},
data?: Transaction;
}
export interface EthClient {
getStorageAt({ blockHash, contract, slot }: {
blockHash: string;
contract: string;
slot: string;
}): Promise<{
value: string;
proof: {
data: string;
};
}>;
getBlockWithTransactions({ blockNumber, blockHash }: {
blockNumber?: number;
blockHash?: string;
}): Promise<any>;
getBlocks({ blockNumber, blockHash }: {
blockNumber?: number;
blockHash?: string;
}): Promise<any>;
getFullBlocks({ blockNumber, blockHash }: {
blockNumber?: number;
blockHash?: string;
}): Promise<any>;
getFullTransaction(txHash: string, blockNumber?: number): Promise<FullTransaction>;
getBlockByHash(blockHash?: string): Promise<any>;
getLogs(vars: {
blockHash: string,
blockNumber: string,
addresses?: string[],
topics?: string[][]
}): Promise<any>;
getLogsForBlockRange?: (vars: {
fromBlock?: number,
toBlock?: number,
addresses?: string[],
topics?: string[][]
}) => Promise<any>;
}
export type Clients = {
ethClient: EthClient;
[key: string]: any;