From a8e59eb6b9ed445f939abbbcc5057490cc074f76 Mon Sep 17 00:00:00 2001 From: prathamesh0 <42446521+prathamesh0@users.noreply.github.com> Date: Thu, 26 Oct 2023 17:25:56 +0530 Subject: [PATCH] Support filtering by topics when fetching logs in rpc-eth-client (#441) * Support filtering by topics when fetching logs * Include event signatures from all contracts in topics list * Refactor common code * Update package versions --- lerna.json | 2 +- packages/cache/package.json | 2 +- packages/cli/package.json | 12 +- packages/codegen/package.json | 4 +- .../src/templates/config-template.handlebars | 7 +- .../src/templates/indexer-template.handlebars | 18 ++- .../src/templates/package-template.handlebars | 10 +- packages/graph-node/package.json | 10 +- packages/ipld-eth-client/package.json | 6 +- packages/ipld-eth-client/src/eth-client.ts | 1 + packages/peer/package.json | 2 +- packages/rpc-eth-client/package.json | 8 +- packages/rpc-eth-client/src/eth-client.ts | 32 +++-- packages/solidity-mapper/package.json | 2 +- packages/test/package.json | 2 +- packages/tracing-client/package.json | 2 +- packages/util/package.json | 8 +- packages/util/src/common.ts | 115 ++++++++---------- packages/util/src/config.ts | 3 +- packages/util/src/indexer.ts | 75 +++++++----- packages/util/src/types.ts | 6 +- 21 files changed, 179 insertions(+), 148 deletions(-) diff --git a/lerna.json b/lerna.json index fe2d9912..4203543b 100644 --- a/lerna.json +++ b/lerna.json @@ -2,7 +2,7 @@ "packages": [ "packages/*" ], - "version": "0.2.66", + "version": "0.2.67", "npmClient": "yarn", "useWorkspaces": true, "command": { diff --git a/packages/cache/package.json b/packages/cache/package.json index 53d3f6b7..93ac72d5 100644 --- a/packages/cache/package.json +++ b/packages/cache/package.json @@ -1,6 +1,6 @@ { "name": "@cerc-io/cache", - "version": "0.2.66", + "version": "0.2.67", "description": "Generic object cache", "main": "dist/index.js", "scripts": { diff --git a/packages/cli/package.json b/packages/cli/package.json index 0cd3a80d..b479adf9 100644 --- a/packages/cli/package.json +++ b/packages/cli/package.json @@ -1,6 +1,6 @@ { "name": "@cerc-io/cli", - "version": "0.2.66", + "version": "0.2.67", "main": "dist/index.js", "license": "AGPL-3.0", "scripts": { @@ -12,13 +12,13 @@ }, "dependencies": { "@apollo/client": "^3.7.1", - "@cerc-io/cache": "^0.2.66", - "@cerc-io/ipld-eth-client": "^0.2.66", + "@cerc-io/cache": "^0.2.67", + "@cerc-io/ipld-eth-client": "^0.2.67", "@cerc-io/libp2p": "^0.42.2-laconic-0.1.4", "@cerc-io/nitro-node": "^0.1.15", - "@cerc-io/peer": "^0.2.66", - "@cerc-io/rpc-eth-client": "^0.2.66", - "@cerc-io/util": "^0.2.66", + "@cerc-io/peer": "^0.2.67", + "@cerc-io/rpc-eth-client": "^0.2.67", + "@cerc-io/util": "^0.2.67", "@ethersproject/providers": "^5.4.4", "@graphql-tools/utils": "^9.1.1", "@ipld/dag-cbor": "^8.0.0", diff --git a/packages/codegen/package.json b/packages/codegen/package.json index 83a1e212..26fabd83 100644 --- a/packages/codegen/package.json +++ b/packages/codegen/package.json @@ -1,6 +1,6 @@ { "name": "@cerc-io/codegen", - "version": "0.2.66", + "version": "0.2.67", "description": "Code generator", "private": true, "main": "index.js", @@ -20,7 +20,7 @@ }, "homepage": "https://github.com/cerc-io/watcher-ts#readme", "dependencies": { - "@cerc-io/util": "^0.2.66", + "@cerc-io/util": "^0.2.67", "@graphql-tools/load-files": "^6.5.2", "@poanet/solidity-flattener": "https://github.com/vulcanize/solidity-flattener.git", "@solidity-parser/parser": "^0.13.2", diff --git a/packages/codegen/src/templates/config-template.handlebars b/packages/codegen/src/templates/config-template.handlebars index 8a3d7da2..bf50e7e2 100644 --- a/packages/codegen/src/templates/config-template.handlebars +++ b/packages/codegen/src/templates/config-template.handlebars @@ -23,8 +23,11 @@ clearEntitiesCacheInterval = 1000 {{/if}} - # Boolean to filter logs by contract. - filterLogs = false + # Boolean to filter logs by contracts. + filterLogsByAddresses = false + + # Boolean to filter logs by topics. + filterLogsByTopics = false # Max block range for which to return events in eventsInRange GQL query. # Use -1 for skipping check on block range. diff --git a/packages/codegen/src/templates/indexer-template.handlebars b/packages/codegen/src/templates/indexer-template.handlebars index 1983f6f7..134cd9ac 100644 --- a/packages/codegen/src/templates/indexer-template.handlebars +++ b/packages/codegen/src/templates/indexer-template.handlebars @@ -96,6 +96,7 @@ export class Indexer implements IndexerInterface { _abiMap: Map; _storageLayoutMap: Map; _contractMap: Map; + _eventSignaturesMap: Map; {{#if (subgraphPath)}} _entityTypesMap: Map; @@ -121,6 +122,9 @@ export class Indexer implements IndexerInterface { this._abiMap = new Map(); this._storageLayoutMap = new Map(); this._contractMap = new Map(); + this._eventSignaturesMap = new Map(); + let contractInterface: ethers.utils.Interface; + let eventSignatures: string[]; {{#each contracts as | contract |}} const { abi: {{contract.contractName}}ABI{{#if contract.contractStorageLayout}}, storageLayout: {{contract.contractName}}StorageLayout{{/if}} } = {{contract.contractName}}Artifacts; @@ -129,11 +133,19 @@ export class Indexer implements IndexerInterface { assert({{contract.contractName}}ABI); this._abiMap.set(KIND_{{capitalize contract.contractName}}, {{contract.contractName}}ABI); + + contractInterface = new ethers.utils.Interface({{contract.contractName}}ABI); + this._contractMap.set(KIND_{{capitalize contract.contractName}}, contractInterface); + + eventSignatures = Object.values(contractInterface.events).map(value => { + return contractInterface.getEventTopic(value); + }); + this._eventSignaturesMap.set(KIND_{{capitalize contract.contractName}}, eventSignatures); {{#if contract.contractStorageLayout}} + assert({{contract.contractName}}StorageLayout); this._storageLayoutMap.set(KIND_{{capitalize contract.contractName}}, {{contract.contractName}}StorageLayout); {{/if}} - this._contractMap.set(KIND_{{capitalize contract.contractName}}, new ethers.utils.Interface({{contract.contractName}}ABI)); {{/each}} {{#if (subgraphPath)}} @@ -642,7 +654,7 @@ export class Indexer implements IndexerInterface { } async fetchEventsAndSaveBlocks (blocks: DeepPartial[]): Promise<{ blockProgress: BlockProgress, events: DeepPartial[] }[]> { - return this._baseIndexer.fetchEventsAndSaveBlocks(blocks, this.parseEventNameAndArgs.bind(this)); + return this._baseIndexer.fetchEventsAndSaveBlocks(blocks, this._eventSignaturesMap, this.parseEventNameAndArgs.bind(this)); } async saveBlockAndFetchEvents (block: DeepPartial): Promise<[BlockProgress, DeepPartial[]]> { @@ -762,7 +774,7 @@ export class Indexer implements IndexerInterface { assert(blockHash); assert(blockNumber); - const dbEvents = await this._baseIndexer.fetchEvents(blockHash, blockNumber, this.parseEventNameAndArgs.bind(this)); + const dbEvents = await this._baseIndexer.fetchEvents(blockHash, blockNumber, this._eventSignaturesMap, this.parseEventNameAndArgs.bind(this)); const dbTx = await this._db.createTransactionRunner(); try { diff --git a/packages/codegen/src/templates/package-template.handlebars b/packages/codegen/src/templates/package-template.handlebars index db560dbe..86fb1174 100644 --- a/packages/codegen/src/templates/package-template.handlebars +++ b/packages/codegen/src/templates/package-template.handlebars @@ -41,12 +41,12 @@ "homepage": "https://github.com/cerc-io/watcher-ts#readme", "dependencies": { "@apollo/client": "^3.3.19", - "@cerc-io/cli": "^0.2.65", - "@cerc-io/ipld-eth-client": "^0.2.65", - "@cerc-io/solidity-mapper": "^0.2.65", - "@cerc-io/util": "^0.2.65", + "@cerc-io/cli": "^0.2.67", + "@cerc-io/ipld-eth-client": "^0.2.67", + "@cerc-io/solidity-mapper": "^0.2.67", + "@cerc-io/util": "^0.2.67", {{#if (subgraphPath)}} - "@cerc-io/graph-node": "^0.2.65", + "@cerc-io/graph-node": "^0.2.67", {{/if}} "@ethersproject/providers": "^5.4.4", "apollo-type-bigint": "^0.1.3", diff --git a/packages/graph-node/package.json b/packages/graph-node/package.json index 4f7ffaca..c0fcb4e3 100644 --- a/packages/graph-node/package.json +++ b/packages/graph-node/package.json @@ -1,10 +1,10 @@ { "name": "@cerc-io/graph-node", - "version": "0.2.66", + "version": "0.2.67", "main": "dist/index.js", "license": "AGPL-3.0", "devDependencies": { - "@cerc-io/solidity-mapper": "^0.2.66", + "@cerc-io/solidity-mapper": "^0.2.67", "@ethersproject/providers": "^5.4.4", "@graphprotocol/graph-ts": "^0.22.0", "@nomiclabs/hardhat-ethers": "^2.0.2", @@ -51,9 +51,9 @@ "dependencies": { "@apollo/client": "^3.3.19", "@cerc-io/assemblyscript": "0.19.10-watcher-ts-0.1.2", - "@cerc-io/cache": "^0.2.66", - "@cerc-io/ipld-eth-client": "^0.2.66", - "@cerc-io/util": "^0.2.66", + "@cerc-io/cache": "^0.2.67", + "@cerc-io/ipld-eth-client": "^0.2.67", + "@cerc-io/util": "^0.2.67", "@types/json-diff": "^0.5.2", "@types/yargs": "^17.0.0", "bn.js": "^4.11.9", diff --git a/packages/ipld-eth-client/package.json b/packages/ipld-eth-client/package.json index b2c767f4..b8c84fff 100644 --- a/packages/ipld-eth-client/package.json +++ b/packages/ipld-eth-client/package.json @@ -1,6 +1,6 @@ { "name": "@cerc-io/ipld-eth-client", - "version": "0.2.66", + "version": "0.2.67", "description": "IPLD ETH Client", "main": "dist/index.js", "scripts": { @@ -20,8 +20,8 @@ "homepage": "https://github.com/cerc-io/watcher-ts#readme", "dependencies": { "@apollo/client": "^3.7.1", - "@cerc-io/cache": "^0.2.66", - "@cerc-io/util": "^0.2.66", + "@cerc-io/cache": "^0.2.67", + "@cerc-io/util": "^0.2.67", "cross-fetch": "^3.1.4", "debug": "^4.3.1", "ethers": "^5.4.4", diff --git a/packages/ipld-eth-client/src/eth-client.ts b/packages/ipld-eth-client/src/eth-client.ts index 815ce291..351d38b8 100644 --- a/packages/ipld-eth-client/src/eth-client.ts +++ b/packages/ipld-eth-client/src/eth-client.ts @@ -133,6 +133,7 @@ export class EthClient implements EthClientInterface { }; } + // TODO: Support filtering logs using topics async getLogs (vars: Vars): Promise { console.time(`time:eth-client#getLogs-${JSON.stringify(vars)}`); const result = await this._getCachedOrFetch('getLogs', vars); diff --git a/packages/peer/package.json b/packages/peer/package.json index bcc60a3b..94dcd342 100644 --- a/packages/peer/package.json +++ b/packages/peer/package.json @@ -1,6 +1,6 @@ { "name": "@cerc-io/peer", - "version": "0.2.66", + "version": "0.2.67", "description": "libp2p module", "main": "dist/index.js", "exports": "./dist/index.js", diff --git a/packages/rpc-eth-client/package.json b/packages/rpc-eth-client/package.json index ee26a478..f85dee59 100644 --- a/packages/rpc-eth-client/package.json +++ b/packages/rpc-eth-client/package.json @@ -1,6 +1,6 @@ { "name": "@cerc-io/rpc-eth-client", - "version": "0.2.66", + "version": "0.2.67", "description": "RPC ETH Client", "main": "dist/index.js", "scripts": { @@ -19,9 +19,9 @@ }, "homepage": "https://github.com/cerc-io/watcher-ts#readme", "dependencies": { - "@cerc-io/cache": "^0.2.66", - "@cerc-io/ipld-eth-client": "^0.2.66", - "@cerc-io/util": "^0.2.66", + "@cerc-io/cache": "^0.2.67", + "@cerc-io/ipld-eth-client": "^0.2.67", + "@cerc-io/util": "^0.2.67", "chai": "^4.3.4", "ethers": "^5.4.4", "left-pad": "^1.3.0", diff --git a/packages/rpc-eth-client/src/eth-client.ts b/packages/rpc-eth-client/src/eth-client.ts index 0da998f0..ae796e38 100644 --- a/packages/rpc-eth-client/src/eth-client.ts +++ b/packages/rpc-eth-client/src/eth-client.ts @@ -21,6 +21,7 @@ interface Vars { contract?: string; slot?: string; addresses?: string[]; + topics?: string[][]; fromBlock?: number; toBlock?: number; } @@ -235,12 +236,18 @@ export class EthClient implements EthClientInterface { async getLogs (vars: { blockHash: string, blockNumber: string, - addresses?: string[] + addresses?: string[], + topics?: string[][] }): Promise { const blockNumber = Number(vars.blockNumber); console.time(`time:eth-client#getLogs-${JSON.stringify(vars)}`); - const result = await this._getLogs({ fromBlock: blockNumber, toBlock: blockNumber, addresses: vars.addresses }); + const result = await this._getLogs({ + fromBlock: blockNumber, + toBlock: blockNumber, + addresses: vars.addresses, + topics: vars.topics + }); console.timeEnd(`time:eth-client#getLogs-${JSON.stringify(vars)}`); return result; @@ -249,10 +256,16 @@ export class EthClient implements EthClientInterface { async getLogsForBlockRange (vars: { fromBlock?: number, toBlock?: number, - addresses?: string[] + addresses?: string[], + topics?: string[][] }): Promise { console.time(`time:eth-client#getLogsForBlockRange-${JSON.stringify(vars)}`); - const result = await this._getLogs({ fromBlock: Number(vars.fromBlock), toBlock: Number(vars.toBlock), addresses: vars.addresses }); + const result = await this._getLogs({ + fromBlock: Number(vars.fromBlock), + toBlock: Number(vars.toBlock), + addresses: vars.addresses, + topics: vars.topics + }); console.timeEnd(`time:eth-client#getLogsForBlockRange-${JSON.stringify(vars)}`); return result; @@ -262,9 +275,10 @@ export class EthClient implements EthClientInterface { async _getLogs (vars: { fromBlock?: number, toBlock?: number, - addresses?: string[] + addresses?: string[], + topics?: string[][] }): Promise { - const { fromBlock, toBlock, addresses = [] } = vars; + const { fromBlock, toBlock, addresses = [], topics } = vars; const result = await this._getCachedOrFetch( 'getLogs', @@ -273,7 +287,8 @@ export class EthClient implements EthClientInterface { const logsByAddressPromises = addresses?.map(address => this._provider.getLogs({ fromBlock, toBlock, - address + address, + topics })); const logsByAddress = await Promise.all(logsByAddressPromises); let logs = logsByAddress.flat(); @@ -282,7 +297,8 @@ export class EthClient implements EthClientInterface { if (!addresses.length) { logs = await this._provider.getLogs({ fromBlock, - toBlock + toBlock, + topics }); } diff --git a/packages/solidity-mapper/package.json b/packages/solidity-mapper/package.json index 392cd73c..7cbe572d 100644 --- a/packages/solidity-mapper/package.json +++ b/packages/solidity-mapper/package.json @@ -1,6 +1,6 @@ { "name": "@cerc-io/solidity-mapper", - "version": "0.2.66", + "version": "0.2.67", "main": "dist/index.js", "license": "AGPL-3.0", "devDependencies": { diff --git a/packages/test/package.json b/packages/test/package.json index 99f9ad07..085c84c5 100644 --- a/packages/test/package.json +++ b/packages/test/package.json @@ -1,6 +1,6 @@ { "name": "@cerc-io/test", - "version": "0.2.66", + "version": "0.2.67", "main": "dist/index.js", "license": "AGPL-3.0", "private": true, diff --git a/packages/tracing-client/package.json b/packages/tracing-client/package.json index 3895edd7..f40a1ef1 100644 --- a/packages/tracing-client/package.json +++ b/packages/tracing-client/package.json @@ -1,6 +1,6 @@ { "name": "@cerc-io/tracing-client", - "version": "0.2.66", + "version": "0.2.67", "description": "ETH VM tracing client", "main": "dist/index.js", "scripts": { diff --git a/packages/util/package.json b/packages/util/package.json index a3d4932d..dd7bdead 100644 --- a/packages/util/package.json +++ b/packages/util/package.json @@ -1,13 +1,13 @@ { "name": "@cerc-io/util", - "version": "0.2.66", + "version": "0.2.67", "main": "dist/index.js", "license": "AGPL-3.0", "dependencies": { "@apollo/utils.keyvaluecache": "^1.0.1", "@cerc-io/nitro-node": "^0.1.15", - "@cerc-io/peer": "^0.2.66", - "@cerc-io/solidity-mapper": "^0.2.66", + "@cerc-io/peer": "^0.2.67", + "@cerc-io/solidity-mapper": "^0.2.67", "@cerc-io/ts-channel": "1.0.3-ts-nitro-0.1.1", "@ethersproject/properties": "^5.7.0", "@ethersproject/providers": "^5.4.4", @@ -51,7 +51,7 @@ "yargs": "^17.0.1" }, "devDependencies": { - "@cerc-io/cache": "^0.2.66", + "@cerc-io/cache": "^0.2.67", "@nomiclabs/hardhat-waffle": "^2.0.1", "@types/bunyan": "^1.8.8", "@types/express": "^4.17.14", diff --git a/packages/util/src/common.ts b/packages/util/src/common.ts index 150c597f..8860bec4 100644 --- a/packages/util/src/common.ts +++ b/packages/util/src/common.ts @@ -258,9 +258,9 @@ export const _fetchBatchBlocks = async ( export const processBatchEvents = async (indexer: IndexerInterface, block: BlockProgressInterface, eventsInBatch: number, subgraphEventsOrder: boolean): Promise => { let dbBlock: BlockProgressInterface, dbEvents: EventInterface[]; if (subgraphEventsOrder) { - ({ dbBlock, dbEvents } = await processEventsInSubgraphOrder(indexer, block, eventsInBatch)); + ({ dbBlock, dbEvents } = await _processEventsInSubgraphOrder(indexer, block, eventsInBatch || DEFAULT_EVENTS_IN_BATCH)); } else { - ({ dbBlock, dbEvents } = await processEvents(indexer, block, eventsInBatch)); + ({ dbBlock, dbEvents } = await _processEvents(indexer, block, eventsInBatch || DEFAULT_EVENTS_IN_BATCH)); } if (indexer.processBlockAfterEvents) { @@ -279,25 +279,20 @@ export const processBatchEvents = async (indexer: IndexerInterface, block: Block console.timeEnd('time:common#processBatchEvents-updateBlockProgress-saveEvents'); }; -export const processEvents = async (indexer: IndexerInterface, block: BlockProgressInterface, eventsInBatch: number): Promise<{ dbBlock: BlockProgressInterface, dbEvents: EventInterface[] }> => { +const _processEvents = async (indexer: IndexerInterface, block: BlockProgressInterface, eventsInBatch: number): Promise<{ dbBlock: BlockProgressInterface, dbEvents: EventInterface[] }> => { const dbEvents: EventInterface[] = []; - let page = 0; - // Check if block processing is complete. - while (block.numProcessedEvents < block.numEvents) { + let page = 0; + let numFetchedEvents = 0; + + // Check if we are out of events. + while (numFetchedEvents < block.numEvents) { console.time('time:common#processEvents-fetching_events_batch'); // Fetch events in batches - const events = await indexer.getBlockEvents( - block.blockHash, - {}, - { - skip: (page++) * (eventsInBatch || DEFAULT_EVENTS_IN_BATCH), - limit: eventsInBatch || DEFAULT_EVENTS_IN_BATCH, - orderBy: 'index', - orderDirection: OrderDirection.asc - } - ); + const events = await _getEventsBatch(indexer, block.blockHash, eventsInBatch, page); + page++; + numFetchedEvents += events.length; console.timeEnd('time:common#processEvents-fetching_events_batch'); @@ -308,7 +303,7 @@ export const processEvents = async (indexer: IndexerInterface, block: BlockProgr console.time('time:common#processEvents-processing_events_batch'); // Process events in loop - for (const event of events) { + for (let event of events) { // Skipping check for order of events processing since logIndex in FEVM is not index of log in block // Check was introduced to avoid reprocessing block events incase of restarts. But currently on restarts, unprocessed block is removed and reprocessed from first event log // if (event.index <= block.lastProcessedEventIndex) { @@ -321,20 +316,8 @@ export const processEvents = async (indexer: IndexerInterface, block: BlockProgr // We might not have parsed this event yet. This can happen if the contract was added // as a result of a previous event in the same block. if (event.eventName === UNKNOWN_EVENT_NAME) { - const logObj = JSONbigNative.parse(event.extraInfo); - - assert(indexer.parseEventNameAndArgs); - assert(typeof watchedContract !== 'boolean'); - const { eventName, eventInfo, eventSignature } = indexer.parseEventNameAndArgs(watchedContract.kind, logObj); - - event.eventName = eventName; - event.eventInfo = JSONbigNative.stringify(eventInfo); - event.extraInfo = JSONbigNative.stringify({ - ...logObj, - eventSignature - }); - - // Save updated event to the db + // Parse the unknown event and save updated event to the db + event = _parseUnknownEvent(indexer, event, watchedContract.kind); dbEvents.push(event); } @@ -351,30 +334,23 @@ export const processEvents = async (indexer: IndexerInterface, block: BlockProgr return { dbBlock: block, dbEvents }; }; -export const processEventsInSubgraphOrder = async (indexer: IndexerInterface, block: BlockProgressInterface, eventsInBatch: number): Promise<{ dbBlock: BlockProgressInterface, dbEvents: EventInterface[] }> => { +const _processEventsInSubgraphOrder = async (indexer: IndexerInterface, block: BlockProgressInterface, eventsInBatch: number): Promise<{ dbBlock: BlockProgressInterface, dbEvents: EventInterface[] }> => { // Create list of initially watched contracts const initiallyWatchedContracts: string[] = indexer.getWatchedContracts().map(contract => contract.address); const unwatchedContractEvents: EventInterface[] = []; const dbEvents: EventInterface[] = []; + let page = 0; + let numFetchedEvents = 0; // Check if we are out of events. - let numFetchedEvents = 0; while (numFetchedEvents < block.numEvents) { console.time('time:common#processEventsInSubgraphOrder-fetching_events_batch'); // Fetch events in batches - const events = await indexer.getBlockEvents( - block.blockHash, - {}, - { - skip: (page++) * (eventsInBatch || DEFAULT_EVENTS_IN_BATCH), - limit: eventsInBatch || DEFAULT_EVENTS_IN_BATCH, - orderBy: 'index', - orderDirection: OrderDirection.asc - } - ); + const events = await _getEventsBatch(indexer, block.blockHash, eventsInBatch, page); + page++; numFetchedEvents += events.length; console.timeEnd('time:common#processEventsInSubgraphOrder-fetching_events_batch'); @@ -397,12 +373,6 @@ export const processEventsInSubgraphOrder = async (indexer: IndexerInterface, bl // Process known events in a loop for (const event of watchedContractEvents) { - // Skipping check for order of events processing since logIndex in FEVM is not index of log in block - // Check was introduced to avoid reprocessing block events incase of restarts. But currently on restarts, unprocessed block is removed and reprocessed from first event log - // if (event.index <= block.lastProcessedEventIndex) { - // throw new Error(`Events received out of order for block number ${block.blockNumber} hash ${block.blockHash}, got event index ${eventIndex} and lastProcessedEventIndex ${block.lastProcessedEventIndex}, aborting`); - // } - await indexer.processEvent(event); block.lastProcessedEventIndex = event.index; @@ -415,27 +385,15 @@ export const processEventsInSubgraphOrder = async (indexer: IndexerInterface, bl console.time('time:common#processEventsInSubgraphOrder-processing_unwatched_events'); // At last, process all the events of newly watched contracts - for (const event of unwatchedContractEvents) { + for (let event of unwatchedContractEvents) { const watchedContract = indexer.isWatchedContract(event.contract); if (watchedContract) { // We might not have parsed this event yet. This can happen if the contract was added // as a result of a previous event in the same block. if (event.eventName === UNKNOWN_EVENT_NAME) { - const logObj = JSONbigNative.parse(event.extraInfo); - - assert(indexer.parseEventNameAndArgs); - assert(typeof watchedContract !== 'boolean'); - const { eventName, eventInfo, eventSignature } = indexer.parseEventNameAndArgs(watchedContract.kind, logObj); - - event.eventName = eventName; - event.eventInfo = JSONbigNative.stringify(eventInfo); - event.extraInfo = JSONbigNative.stringify({ - ...logObj, - eventSignature - }); - - // Save updated event to the db + // Parse the unknown event and save updated event to the db + event = _parseUnknownEvent(indexer, event, watchedContract.kind); dbEvents.push(event); } @@ -451,6 +409,35 @@ export const processEventsInSubgraphOrder = async (indexer: IndexerInterface, bl return { dbBlock: block, dbEvents }; }; +const _getEventsBatch = async (indexer: IndexerInterface, blockHash: string, eventsInBatch: number, page: number): Promise => { + return indexer.getBlockEvents( + blockHash, + {}, + { + skip: page * eventsInBatch, + limit: eventsInBatch, + orderBy: 'index', + orderDirection: OrderDirection.asc + } + ); +}; + +const _parseUnknownEvent = (indexer: IndexerInterface, event: EventInterface, contractKind: string): EventInterface => { + const logObj = JSONbigNative.parse(event.extraInfo); + + assert(indexer.parseEventNameAndArgs); + const { eventName, eventInfo, eventSignature } = indexer.parseEventNameAndArgs(contractKind, logObj); + + event.eventName = eventName; + event.eventInfo = JSONbigNative.stringify(eventInfo); + event.extraInfo = JSONbigNative.stringify({ + ...logObj, + eventSignature + }); + + return event; +}; + /** * Create pruning job in QUEUE_BLOCK_PROCESSING. * @param jobQueue diff --git a/packages/util/src/config.ts b/packages/util/src/config.ts index e74d8010..06faf4cf 100644 --- a/packages/util/src/config.ts +++ b/packages/util/src/config.ts @@ -200,7 +200,8 @@ export interface ServerConfig { subgraphPath: string; enableState: boolean; wasmRestartBlocksInterval: number; - filterLogs: boolean; + filterLogsByAddresses: boolean; + filterLogsByTopics: boolean; maxEventsBlockRange: number; clearEntitiesCacheInterval: number; diff --git a/packages/util/src/indexer.ts b/packages/util/src/indexer.ts index cc3c7c00..bced3783 100644 --- a/packages/util/src/indexer.ts +++ b/packages/util/src/indexer.ts @@ -265,7 +265,7 @@ export class Indexer { // For each of the given blocks, fetches events and saves them along with the block to db // Returns an array with [block, events] for all the given blocks - async fetchEventsAndSaveBlocks (blocks: DeepPartial[], parseEventNameAndArgs: (kind: string, logObj: any) => any): Promise<{ blockProgress: BlockProgressInterface, events: DeepPartial[] }[]> { + async fetchEventsAndSaveBlocks (blocks: DeepPartial[], eventSignaturesMap: Map, parseEventNameAndArgs: (kind: string, logObj: any) => any): Promise<{ blockProgress: BlockProgressInterface, events: DeepPartial[] }[]> { if (!blocks.length) { return []; } @@ -274,7 +274,7 @@ export class Indexer { const toBlock = blocks[blocks.length - 1].blockNumber; log(`fetchEventsAndSaveBlocks#fetchEventsForBlocks: fetching from upstream server for range [${fromBlock}, ${toBlock}]`); - const dbEventsMap = await this.fetchEventsForBlocks(blocks, parseEventNameAndArgs); + const dbEventsMap = await this.fetchEventsForBlocks(blocks, eventSignaturesMap, parseEventNameAndArgs); const blocksWithEventsPromises = blocks.map(async block => { const blockHash = block.blockHash; @@ -291,31 +291,25 @@ export class Indexer { } // Fetch events (to be saved to db) for a block range - async fetchEventsForBlocks (blocks: DeepPartial[], parseEventNameAndArgs: (kind: string, logObj: any) => any): Promise[]>> { + async fetchEventsForBlocks (blocks: DeepPartial[], eventSignaturesMap: Map, parseEventNameAndArgs: (kind: string, logObj: any) => any): Promise[]>> { if (!blocks.length) { return new Map(); } // Fetch logs for block range of given blocks - let logs: any; const fromBlock = blocks[0].blockNumber; const toBlock = blocks[blocks.length - 1].blockNumber; assert(this._ethClient.getLogsForBlockRange, 'getLogsForBlockRange() not implemented in ethClient'); - if (this._serverConfig.filterLogs) { - const watchedContracts = this.getWatchedContracts(); - const addresses = watchedContracts.map((watchedContract): string => { - return watchedContract.address; - }); - ({ logs } = await this._ethClient.getLogsForBlockRange({ - fromBlock, - toBlock, - addresses - })); - } else { - ({ logs } = await this._ethClient.getLogsForBlockRange({ fromBlock, toBlock })); - } + const { addresses, topics } = this._createLogsFilters(eventSignaturesMap); + + const { logs } = await this._ethClient.getLogsForBlockRange({ + fromBlock, + toBlock, + addresses, + topics + }); // Skip further processing if no relevant logs found in the entire block range if (!logs.length) { @@ -380,23 +374,15 @@ export class Indexer { } // Fetch events (to be saved to db) for a particular block - async fetchEvents (blockHash: string, blockNumber: number, parseEventNameAndArgs: (kind: string, logObj: any) => any): Promise[]> { - let logsPromise: Promise; + async fetchEvents (blockHash: string, blockNumber: number, eventSignaturesMap: Map, parseEventNameAndArgs: (kind: string, logObj: any) => any): Promise[]> { + const { addresses, topics } = this._createLogsFilters(eventSignaturesMap); - if (this._serverConfig.filterLogs) { - const watchedContracts = this.getWatchedContracts(); - const addresses = watchedContracts.map((watchedContract): string => { - return watchedContract.address; - }); - - logsPromise = this._ethClient.getLogs({ - blockHash, - blockNumber: blockNumber.toString(), - addresses - }); - } else { - logsPromise = this._ethClient.getLogs({ blockHash, blockNumber: blockNumber.toString() }); - } + const logsPromise = await this._ethClient.getLogs({ + blockHash, + blockNumber: blockNumber.toString(), + addresses, + topics + }); const transactionsPromise = this._ethClient.getBlockWithTransactions({ blockHash, blockNumber }); @@ -1198,6 +1184,29 @@ export class Indexer { this._stateStatusMap[address] = _.merge(oldStateStatus, stateStatus); } + _createLogsFilters (eventSignaturesMap: Map): { addresses: string[] | undefined, topics: string[][] | undefined } { + let addresses: string[] | undefined; + let eventSignatures: string[] | undefined; + + if (this._serverConfig.filterLogsByAddresses) { + const watchedContracts = this.getWatchedContracts(); + addresses = watchedContracts.map((watchedContract): string => { + return watchedContract.address; + }); + } + + if (this._serverConfig.filterLogsByTopics) { + const eventSignaturesSet = new Set(); + eventSignaturesMap.forEach(sigs => sigs.forEach(sig => { + eventSignaturesSet.add(sig); + })); + + eventSignatures = Array.from(eventSignaturesSet); + } + + return { addresses, topics: eventSignatures && [eventSignatures] }; + } + parseEvent (logDescription: ethers.utils.LogDescription): { eventName: string, eventInfo: any, eventSignature: string } { const eventInfo = logDescription.eventFragment.inputs.reduce((acc: any, input, index) => { acc[input.name] = this._parseLogArg(input, logDescription.args[index]); diff --git a/packages/util/src/types.ts b/packages/util/src/types.ts index 839e4181..89c73b5b 100644 --- a/packages/util/src/types.ts +++ b/packages/util/src/types.ts @@ -220,12 +220,14 @@ export interface EthClient { getLogs(vars: { blockHash: string, blockNumber: string, - addresses?: string[] + addresses?: string[], + topics?: string[][] }): Promise; getLogsForBlockRange?: (vars: { fromBlock?: number, toBlock?: number, - addresses?: string[] + addresses?: string[], + topics?: string[][] }) => Promise; }