Fetch event logs for a block range in a single upstream call (#433)

* Support fetching logs for a block range in rpc-eth-client

* Add a method to fetch block events for multiple blocks at once

* Add a method to save blocks with fetched events in a block range

* Fix transactions destructuring

* Fix get logs call args

* Add a separate ETH client method to get logs in a block range

* Codegen changes
This commit is contained in:
prathamesh0 2023-10-23 16:10:09 +05:30 committed by GitHub
parent 45b7489115
commit e8d8476bef
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 201 additions and 25 deletions

View File

@ -16,6 +16,9 @@ columns:
pgType: varchar pgType: varchar
tsType: string tsType: string
columnType: Column columnType: Column
columnOptions:
- option: nullable
value: true
- name: blockHash - name: blockHash
pgType: varchar pgType: varchar
tsType: string tsType: string

View File

@ -625,6 +625,10 @@ export class Indexer implements IndexerInterface {
return this._baseIndexer.getBlocksAtHeight(height, isPruned); return this._baseIndexer.getBlocksAtHeight(height, isPruned);
} }
async fetchEventsAndSaveBlocks (blocks: DeepPartial<BlockProgress>[]): Promise<{ blockProgress: BlockProgress, events: DeepPartial<Event>[] }[]> {
return this._baseIndexer.fetchEventsAndSaveBlocks(blocks, this.parseEventNameAndArgs.bind(this))
}
async saveBlockAndFetchEvents (block: DeepPartial<BlockProgress>): Promise<[BlockProgress, DeepPartial<Event>[]]> { async saveBlockAndFetchEvents (block: DeepPartial<BlockProgress>): Promise<[BlockProgress, DeepPartial<Event>[]]> {
return this._saveBlockAndFetchEvents(block); return this._saveBlockAndFetchEvents(block);
} }

View File

@ -106,6 +106,12 @@ export class Indexer implements IndexerInterface {
return ''; return '';
} }
async fetchEventsAndSaveBlocks (blocks: DeepPartial<BlockProgressInterface>[]): Promise<{ blockProgress: BlockProgressInterface, events: DeepPartial<EventInterface>[] }[]> {
assert(blocks);
return [];
}
async saveBlockAndFetchEvents (block: BlockProgressInterface): Promise<[BlockProgressInterface, DeepPartial<EventInterface>[]]> { async saveBlockAndFetchEvents (block: BlockProgressInterface): Promise<[BlockProgressInterface, DeepPartial<EventInterface>[]]> {
return [block, []]; return [block, []];
} }

View File

@ -144,6 +144,9 @@ export class EthClient implements EthClientInterface {
return { logs: getLogs }; return { logs: getLogs };
} }
// TODO: Implement
// async getLogsForBlockRange(): Promise<any> {}
async _getCachedOrFetch (queryName: keyof typeof ethQueries, vars: Vars): Promise<any> { async _getCachedOrFetch (queryName: keyof typeof ethQueries, vars: Vars): Promise<any> {
const keyObj = { const keyObj = {
queryName, queryName,

View File

@ -21,6 +21,8 @@ interface Vars {
contract?: string; contract?: string;
slot?: string; slot?: string;
addresses?: string[]; addresses?: string[];
fromBlock?: number;
toBlock?: number;
} }
export class EthClient implements EthClientInterface { export class EthClient implements EthClientInterface {
@ -230,17 +232,47 @@ export class EthClient implements EthClientInterface {
}; };
} }
async getLogs (vars: { blockHash: string, blockNumber: string, addresses?: string[] }): Promise<any> { async getLogs (vars: {
const { blockNumber, addresses = [] } = vars; blockHash: string,
blockNumber: string,
addresses?: string[]
}): Promise<any> {
const blockNumber = Number(vars.blockNumber);
console.time(`time:eth-client#getLogs-${JSON.stringify(vars)}`); console.time(`time:eth-client#getLogs-${JSON.stringify(vars)}`);
const result = await this._getLogs({ fromBlock: blockNumber, toBlock: blockNumber, addresses: vars.addresses });
console.timeEnd(`time:eth-client#getLogs-${JSON.stringify(vars)}`);
return result;
}
async getLogsForBlockRange (vars: {
fromBlock?: number,
toBlock?: number,
addresses?: string[]
}): Promise<any> {
console.time(`time:eth-client#getLogsForBlockRange-${JSON.stringify(vars)}`);
const result = await this._getLogs({ fromBlock: Number(vars.fromBlock), toBlock: Number(vars.toBlock), addresses: vars.addresses });
console.timeEnd(`time:eth-client#getLogsForBlockRange-${JSON.stringify(vars)}`);
return result;
}
// TODO: Implement return type
async _getLogs (vars: {
fromBlock?: number,
toBlock?: number,
addresses?: string[]
}): Promise<any> {
const { fromBlock, toBlock, addresses = [] } = vars;
const result = await this._getCachedOrFetch( const result = await this._getCachedOrFetch(
'getLogs', 'getLogs',
vars, vars,
async () => { async () => {
const logsByAddressPromises = addresses?.map(address => this._provider.getLogs({ const logsByAddressPromises = addresses?.map(address => this._provider.getLogs({
fromBlock: Number(blockNumber), fromBlock,
toBlock: Number(blockNumber), toBlock,
address address
})); }));
const logsByAddress = await Promise.all(logsByAddressPromises); const logsByAddress = await Promise.all(logsByAddressPromises);
@ -249,8 +281,8 @@ export class EthClient implements EthClientInterface {
// If no addresses provided to filter // If no addresses provided to filter
if (!logs.length) { if (!logs.length) {
logs = await this._provider.getLogs({ logs = await this._provider.getLogs({
fromBlock: Number(blockNumber), fromBlock,
toBlock: Number(blockNumber) toBlock
}); });
} }
@ -272,10 +304,11 @@ export class EthClient implements EthClientInterface {
acc.set(txReceipt.transactionHash, txReceipt); acc.set(txReceipt.transactionHash, txReceipt);
return acc; return acc;
}, new Map<string, providers.TransactionReceipt>()); }, new Map<string, providers.TransactionReceipt>());
console.timeEnd(`time:eth-client#getLogs-${JSON.stringify(vars)}`);
return { return {
logs: result.map((log) => ({ logs: result.map((log) => ({
// blockHash required for sorting logs fetched in a block range
blockHash: log.blockHash,
account: { account: {
address: log.address address: log.address
}, },

View File

@ -196,34 +196,22 @@ export const _fetchBatchBlocks = async (
await wait(jobQueueConfig.blockDelayInMilliSecs); await wait(jobQueueConfig.blockDelayInMilliSecs);
} }
// Flatten array as there can be multiple blocks at the same height
blocks = blocks.flat(); blocks = blocks.flat();
if (jobQueueConfig.jobDelayInMilliSecs) { if (jobQueueConfig.jobDelayInMilliSecs) {
await wait(jobQueueConfig.jobDelayInMilliSecs); await wait(jobQueueConfig.jobDelayInMilliSecs);
} }
console.time('time:common#fetchBatchBlocks-saveBlockAndFetchEvents'); blocks.forEach(block => {
const blockAndEventsPromises = blocks.map(async block => {
block.blockTimestamp = block.timestamp; block.blockTimestamp = block.timestamp;
try {
log(`_fetchBatchBlocks#saveBlockAndFetchEvents: fetching from upstream server ${block.blockHash}`);
const [blockProgress, events] = await indexer.saveBlockAndFetchEvents(block);
log(`_fetchBatchBlocks#saveBlockAndFetchEvents: fetched for block: ${blockProgress.blockHash} num events: ${blockProgress.numEvents}`);
return { blockProgress, events };
} catch (error) {
log(error);
return null;
}
}); });
const blockAndEventsList = await Promise.all(blockAndEventsPromises); console.time('time:common#fetchBatchBlocks-fetchEventsAndSaveBlocks');
console.timeEnd('time:common#fetchBatchBlocks-saveBlockAndFetchEvents'); const blockAndEventsList = await indexer.fetchEventsAndSaveBlocks(blocks);
console.timeEnd('time:common#fetchBatchBlocks-fetchEventsAndSaveBlocks');
return blockAndEventsList.filter(blockAndEvent => blockAndEvent !== null) as { return blockAndEventsList;
blockProgress: BlockProgressInterface,
events: DeepPartial<EventInterface>[]
}[];
}; };
/** /**

View File

@ -263,6 +263,117 @@ export class Indexer {
return this._db.getEvent(id); return this._db.getEvent(id);
} }
// For each of the given blocks, fetches events and saves them along with the block to db
// Returns an array with [block, events] for all the given blocks
async fetchEventsAndSaveBlocks (blocks: DeepPartial<BlockProgressInterface>[], parseEventNameAndArgs: (kind: string, logObj: any) => any): Promise<{ blockProgress: BlockProgressInterface, events: DeepPartial<EventInterface>[] }[]> {
const fromBlock = blocks[0].blockNumber;
const toBlock = blocks[blocks.length - 1].blockNumber;
log(`fetchEventsAndSaveBlocks#fetchEventsForBlocks: fetching from upstream server for range [${fromBlock}, ${toBlock}]`);
const dbEventsMap = await this.fetchEventsForBlocks(blocks, parseEventNameAndArgs);
const blocksWithEventsPromises = blocks.map(async block => {
const blockHash = block.blockHash;
assert(blockHash);
const blockToSave = {
cid: block.cid,
blockHash: block.blockHash,
blockNumber: block.blockNumber,
blockTimestamp: block.blockTimestamp,
parentHash: block.parentHash
};
const dbEvents = dbEventsMap.get(blockHash) || [];
const [blockProgress] = await this.saveBlockWithEvents(blockToSave, dbEvents);
log(`fetchEventsAndSaveBlocks#fetchEventsForBlocks: fetched for block: ${blockHash} num events: ${blockProgress.numEvents}`);
return { blockProgress, events: [] };
});
return Promise.all(blocksWithEventsPromises);
}
// Fetch events (to be saved to db) for a block range
async fetchEventsForBlocks (blocks: DeepPartial<BlockProgressInterface>[], parseEventNameAndArgs: (kind: string, logObj: any) => any): Promise<Map<string, DeepPartial<EventInterface>[]>> {
if (!blocks.length) {
return new Map();
}
// Fetch logs for block range of given blocks
let logsPromise: Promise<any>;
const fromBlock = blocks[0].blockNumber;
const toBlock = blocks[blocks.length - 1].blockNumber;
assert(this._ethClient.getLogsForBlockRange, 'getLogsForBlockRange() not implemented in ethClient');
if (this._serverConfig.filterLogs) {
const watchedContracts = this.getWatchedContracts();
const addresses = watchedContracts.map((watchedContract): string => {
return watchedContract.address;
});
logsPromise = this._ethClient.getLogsForBlockRange({
fromBlock,
toBlock,
addresses
});
} else {
logsPromise = this._ethClient.getLogsForBlockRange({ fromBlock, toBlock });
}
// Fetch transactions for given blocks
const transactionsMap: Map<string, any> = new Map();
const transactionPromises = blocks.map(async (block) => {
assert(block.blockHash);
const blockWithTransactions = await this._ethClient.getBlockWithTransactions({ blockHash: block.blockHash, blockNumber: block.blockNumber });
const {
allEthHeaderCids: {
nodes: [
{
ethTransactionCidsByHeaderId: {
nodes: transactions
}
}
]
}
} = blockWithTransactions;
transactionsMap.set(block.blockHash, transactions);
});
const [{ logs }] = await Promise.all([logsPromise, ...transactionPromises]);
// Sort logs according to blockhash
const logsMap: Map<string, any> = new Map();
logs.forEach((log: any) => {
const { blockHash: logBlockHash } = log;
assert(typeof logBlockHash === 'string');
if (!logsMap.has(logBlockHash)) {
logsMap.set(logBlockHash, []);
}
logsMap.get(logBlockHash).push(log);
});
// Map db ready events according to blockhash
const dbEventsMap: Map<string, DeepPartial<EventInterface>[]> = new Map();
blocks.forEach(block => {
const blockHash = block.blockHash;
assert(blockHash);
const logs = logsMap.get(blockHash) || [];
const transactions = transactionsMap.get(blockHash);
const dbEvents = this.createDbEventsFromLogsAndTxs(blockHash, logs, transactions, parseEventNameAndArgs);
dbEventsMap.set(blockHash, dbEvents);
});
return dbEventsMap;
}
// Fetch events (to be saved to db) for a particular block
async fetchEvents (blockHash: string, blockNumber: number, parseEventNameAndArgs: (kind: string, logObj: any) => any): Promise<DeepPartial<EventInterface>[]> { async fetchEvents (blockHash: string, blockNumber: number, parseEventNameAndArgs: (kind: string, logObj: any) => any): Promise<DeepPartial<EventInterface>[]> {
let logsPromise: Promise<any>; let logsPromise: Promise<any>;
@ -298,6 +409,11 @@ export class Indexer {
} }
] = await Promise.all([logsPromise, transactionsPromise]); ] = await Promise.all([logsPromise, transactionsPromise]);
return this.createDbEventsFromLogsAndTxs(blockHash, logs, transactions, parseEventNameAndArgs);
}
// Create events to be saved to db for a block given blockHash, logs, transactions and a parser function
createDbEventsFromLogsAndTxs (blockHash: string, logs: any, transactions: any, parseEventNameAndArgs: (kind: string, logObj: any) => any): DeepPartial<EventInterface>[] {
const transactionMap = transactions.reduce((acc: {[key: string]: any}, transaction: {[key: string]: any}) => { const transactionMap = transactions.reduce((acc: {[key: string]: any}, transaction: {[key: string]: any}) => {
acc[transaction.txHash] = transaction; acc[transaction.txHash] = transaction;
return acc; return acc;
@ -365,6 +481,23 @@ export class Indexer {
return dbEvents; return dbEvents;
} }
async saveBlockWithEvents (block: DeepPartial<BlockProgressInterface>, events: DeepPartial<EventInterface>[]): Promise<[BlockProgressInterface, DeepPartial<EventInterface>[]]> {
const dbTx = await this._db.createTransactionRunner();
try {
console.time(`time:indexer#_saveBlockWithEvents-db-save-${block.blockNumber}`);
const blockProgress = await this._db.saveBlockWithEvents(dbTx, block, events);
await dbTx.commitTransaction();
console.timeEnd(`time:indexer#_saveBlockWithEvents-db-save-${block.blockNumber}`);
return [blockProgress, []];
} catch (error) {
await dbTx.rollbackTransaction();
throw error;
} finally {
await dbTx.release();
}
}
async saveBlockProgress (block: DeepPartial<BlockProgressInterface>): Promise<BlockProgressInterface> { async saveBlockProgress (block: DeepPartial<BlockProgressInterface>): Promise<BlockProgressInterface> {
const dbTx = await this._db.createTransactionRunner(); const dbTx = await this._db.createTransactionRunner();
let res; let res;

View File

@ -95,6 +95,7 @@ export interface IndexerInterface {
getLatestStateIndexedBlock (): Promise<BlockProgressInterface> getLatestStateIndexedBlock (): Promise<BlockProgressInterface>
getBlockEvents (blockHash: string, where: Where, queryOptions: QueryOptions): Promise<Array<EventInterface>> getBlockEvents (blockHash: string, where: Where, queryOptions: QueryOptions): Promise<Array<EventInterface>>
getAncestorAtDepth (blockHash: string, depth: number): Promise<string> getAncestorAtDepth (blockHash: string, depth: number): Promise<string>
fetchEventsAndSaveBlocks (blocks: DeepPartial<BlockProgressInterface>[]): Promise<{ blockProgress: BlockProgressInterface, events: DeepPartial<EventInterface>[] }[]>
saveBlockAndFetchEvents (block: DeepPartial<BlockProgressInterface>): Promise<[BlockProgressInterface, DeepPartial<EventInterface>[]]> saveBlockAndFetchEvents (block: DeepPartial<BlockProgressInterface>): Promise<[BlockProgressInterface, DeepPartial<EventInterface>[]]>
removeUnknownEvents (block: BlockProgressInterface): Promise<void> removeUnknownEvents (block: BlockProgressInterface): Promise<void>
updateBlockProgress (block: BlockProgressInterface, lastProcessedEventIndex: number): Promise<BlockProgressInterface> updateBlockProgress (block: BlockProgressInterface, lastProcessedEventIndex: number): Promise<BlockProgressInterface>
@ -219,6 +220,11 @@ export interface EthClient {
blockNumber: string, blockNumber: string,
addresses?: string[] addresses?: string[]
}): Promise<any>; }): Promise<any>;
getLogsForBlockRange?: (vars: {
fromBlock?: number,
toBlock?: number,
addresses?: string[]
}) => Promise<any>;
} }
export type Clients = { export type Clients = {