Support filtering by topics when fetching logs in rpc-eth-client (#441)

* Support filtering by topics when fetching logs

* Include event signatures from all contracts in topics list

* Refactor common code

* Update package versions
This commit is contained in:
prathamesh0 2023-10-26 17:25:56 +05:30 committed by GitHub
parent 43463af1f2
commit a8e59eb6b9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
21 changed files with 179 additions and 148 deletions

View File

@ -2,7 +2,7 @@
"packages": [ "packages": [
"packages/*" "packages/*"
], ],
"version": "0.2.66", "version": "0.2.67",
"npmClient": "yarn", "npmClient": "yarn",
"useWorkspaces": true, "useWorkspaces": true,
"command": { "command": {

View File

@ -1,6 +1,6 @@
{ {
"name": "@cerc-io/cache", "name": "@cerc-io/cache",
"version": "0.2.66", "version": "0.2.67",
"description": "Generic object cache", "description": "Generic object cache",
"main": "dist/index.js", "main": "dist/index.js",
"scripts": { "scripts": {

View File

@ -1,6 +1,6 @@
{ {
"name": "@cerc-io/cli", "name": "@cerc-io/cli",
"version": "0.2.66", "version": "0.2.67",
"main": "dist/index.js", "main": "dist/index.js",
"license": "AGPL-3.0", "license": "AGPL-3.0",
"scripts": { "scripts": {
@ -12,13 +12,13 @@
}, },
"dependencies": { "dependencies": {
"@apollo/client": "^3.7.1", "@apollo/client": "^3.7.1",
"@cerc-io/cache": "^0.2.66", "@cerc-io/cache": "^0.2.67",
"@cerc-io/ipld-eth-client": "^0.2.66", "@cerc-io/ipld-eth-client": "^0.2.67",
"@cerc-io/libp2p": "^0.42.2-laconic-0.1.4", "@cerc-io/libp2p": "^0.42.2-laconic-0.1.4",
"@cerc-io/nitro-node": "^0.1.15", "@cerc-io/nitro-node": "^0.1.15",
"@cerc-io/peer": "^0.2.66", "@cerc-io/peer": "^0.2.67",
"@cerc-io/rpc-eth-client": "^0.2.66", "@cerc-io/rpc-eth-client": "^0.2.67",
"@cerc-io/util": "^0.2.66", "@cerc-io/util": "^0.2.67",
"@ethersproject/providers": "^5.4.4", "@ethersproject/providers": "^5.4.4",
"@graphql-tools/utils": "^9.1.1", "@graphql-tools/utils": "^9.1.1",
"@ipld/dag-cbor": "^8.0.0", "@ipld/dag-cbor": "^8.0.0",

View File

@ -1,6 +1,6 @@
{ {
"name": "@cerc-io/codegen", "name": "@cerc-io/codegen",
"version": "0.2.66", "version": "0.2.67",
"description": "Code generator", "description": "Code generator",
"private": true, "private": true,
"main": "index.js", "main": "index.js",
@ -20,7 +20,7 @@
}, },
"homepage": "https://github.com/cerc-io/watcher-ts#readme", "homepage": "https://github.com/cerc-io/watcher-ts#readme",
"dependencies": { "dependencies": {
"@cerc-io/util": "^0.2.66", "@cerc-io/util": "^0.2.67",
"@graphql-tools/load-files": "^6.5.2", "@graphql-tools/load-files": "^6.5.2",
"@poanet/solidity-flattener": "https://github.com/vulcanize/solidity-flattener.git", "@poanet/solidity-flattener": "https://github.com/vulcanize/solidity-flattener.git",
"@solidity-parser/parser": "^0.13.2", "@solidity-parser/parser": "^0.13.2",

View File

@ -23,8 +23,11 @@
clearEntitiesCacheInterval = 1000 clearEntitiesCacheInterval = 1000
{{/if}} {{/if}}
# Boolean to filter logs by contract. # Boolean to filter logs by contracts.
filterLogs = false filterLogsByAddresses = false
# Boolean to filter logs by topics.
filterLogsByTopics = false
# Max block range for which to return events in eventsInRange GQL query. # Max block range for which to return events in eventsInRange GQL query.
# Use -1 for skipping check on block range. # Use -1 for skipping check on block range.

View File

@ -96,6 +96,7 @@ export class Indexer implements IndexerInterface {
_abiMap: Map<string, JsonFragment[]>; _abiMap: Map<string, JsonFragment[]>;
_storageLayoutMap: Map<string, StorageLayout>; _storageLayoutMap: Map<string, StorageLayout>;
_contractMap: Map<string, ethers.utils.Interface>; _contractMap: Map<string, ethers.utils.Interface>;
_eventSignaturesMap: Map<string, string[]>;
{{#if (subgraphPath)}} {{#if (subgraphPath)}}
_entityTypesMap: Map<string, { [key: string]: string }>; _entityTypesMap: Map<string, { [key: string]: string }>;
@ -121,6 +122,9 @@ export class Indexer implements IndexerInterface {
this._abiMap = new Map(); this._abiMap = new Map();
this._storageLayoutMap = new Map(); this._storageLayoutMap = new Map();
this._contractMap = new Map(); this._contractMap = new Map();
this._eventSignaturesMap = new Map();
let contractInterface: ethers.utils.Interface;
let eventSignatures: string[];
{{#each contracts as | contract |}} {{#each contracts as | contract |}}
const { abi: {{contract.contractName}}ABI{{#if contract.contractStorageLayout}}, storageLayout: {{contract.contractName}}StorageLayout{{/if}} } = {{contract.contractName}}Artifacts; const { abi: {{contract.contractName}}ABI{{#if contract.contractStorageLayout}}, storageLayout: {{contract.contractName}}StorageLayout{{/if}} } = {{contract.contractName}}Artifacts;
@ -129,11 +133,19 @@ export class Indexer implements IndexerInterface {
assert({{contract.contractName}}ABI); assert({{contract.contractName}}ABI);
this._abiMap.set(KIND_{{capitalize contract.contractName}}, {{contract.contractName}}ABI); this._abiMap.set(KIND_{{capitalize contract.contractName}}, {{contract.contractName}}ABI);
contractInterface = new ethers.utils.Interface({{contract.contractName}}ABI);
this._contractMap.set(KIND_{{capitalize contract.contractName}}, contractInterface);
eventSignatures = Object.values(contractInterface.events).map(value => {
return contractInterface.getEventTopic(value);
});
this._eventSignaturesMap.set(KIND_{{capitalize contract.contractName}}, eventSignatures);
{{#if contract.contractStorageLayout}} {{#if contract.contractStorageLayout}}
assert({{contract.contractName}}StorageLayout); assert({{contract.contractName}}StorageLayout);
this._storageLayoutMap.set(KIND_{{capitalize contract.contractName}}, {{contract.contractName}}StorageLayout); this._storageLayoutMap.set(KIND_{{capitalize contract.contractName}}, {{contract.contractName}}StorageLayout);
{{/if}} {{/if}}
this._contractMap.set(KIND_{{capitalize contract.contractName}}, new ethers.utils.Interface({{contract.contractName}}ABI));
{{/each}} {{/each}}
{{#if (subgraphPath)}} {{#if (subgraphPath)}}
@ -642,7 +654,7 @@ export class Indexer implements IndexerInterface {
} }
async fetchEventsAndSaveBlocks (blocks: DeepPartial<BlockProgress>[]): Promise<{ blockProgress: BlockProgress, events: DeepPartial<Event>[] }[]> { async fetchEventsAndSaveBlocks (blocks: DeepPartial<BlockProgress>[]): Promise<{ blockProgress: BlockProgress, events: DeepPartial<Event>[] }[]> {
return this._baseIndexer.fetchEventsAndSaveBlocks(blocks, this.parseEventNameAndArgs.bind(this)); return this._baseIndexer.fetchEventsAndSaveBlocks(blocks, this._eventSignaturesMap, this.parseEventNameAndArgs.bind(this));
} }
async saveBlockAndFetchEvents (block: DeepPartial<BlockProgress>): Promise<[BlockProgress, DeepPartial<Event>[]]> { async saveBlockAndFetchEvents (block: DeepPartial<BlockProgress>): Promise<[BlockProgress, DeepPartial<Event>[]]> {
@ -762,7 +774,7 @@ export class Indexer implements IndexerInterface {
assert(blockHash); assert(blockHash);
assert(blockNumber); assert(blockNumber);
const dbEvents = await this._baseIndexer.fetchEvents(blockHash, blockNumber, this.parseEventNameAndArgs.bind(this)); const dbEvents = await this._baseIndexer.fetchEvents(blockHash, blockNumber, this._eventSignaturesMap, this.parseEventNameAndArgs.bind(this));
const dbTx = await this._db.createTransactionRunner(); const dbTx = await this._db.createTransactionRunner();
try { try {

View File

@ -41,12 +41,12 @@
"homepage": "https://github.com/cerc-io/watcher-ts#readme", "homepage": "https://github.com/cerc-io/watcher-ts#readme",
"dependencies": { "dependencies": {
"@apollo/client": "^3.3.19", "@apollo/client": "^3.3.19",
"@cerc-io/cli": "^0.2.65", "@cerc-io/cli": "^0.2.67",
"@cerc-io/ipld-eth-client": "^0.2.65", "@cerc-io/ipld-eth-client": "^0.2.67",
"@cerc-io/solidity-mapper": "^0.2.65", "@cerc-io/solidity-mapper": "^0.2.67",
"@cerc-io/util": "^0.2.65", "@cerc-io/util": "^0.2.67",
{{#if (subgraphPath)}} {{#if (subgraphPath)}}
"@cerc-io/graph-node": "^0.2.65", "@cerc-io/graph-node": "^0.2.67",
{{/if}} {{/if}}
"@ethersproject/providers": "^5.4.4", "@ethersproject/providers": "^5.4.4",
"apollo-type-bigint": "^0.1.3", "apollo-type-bigint": "^0.1.3",

View File

@ -1,10 +1,10 @@
{ {
"name": "@cerc-io/graph-node", "name": "@cerc-io/graph-node",
"version": "0.2.66", "version": "0.2.67",
"main": "dist/index.js", "main": "dist/index.js",
"license": "AGPL-3.0", "license": "AGPL-3.0",
"devDependencies": { "devDependencies": {
"@cerc-io/solidity-mapper": "^0.2.66", "@cerc-io/solidity-mapper": "^0.2.67",
"@ethersproject/providers": "^5.4.4", "@ethersproject/providers": "^5.4.4",
"@graphprotocol/graph-ts": "^0.22.0", "@graphprotocol/graph-ts": "^0.22.0",
"@nomiclabs/hardhat-ethers": "^2.0.2", "@nomiclabs/hardhat-ethers": "^2.0.2",
@ -51,9 +51,9 @@
"dependencies": { "dependencies": {
"@apollo/client": "^3.3.19", "@apollo/client": "^3.3.19",
"@cerc-io/assemblyscript": "0.19.10-watcher-ts-0.1.2", "@cerc-io/assemblyscript": "0.19.10-watcher-ts-0.1.2",
"@cerc-io/cache": "^0.2.66", "@cerc-io/cache": "^0.2.67",
"@cerc-io/ipld-eth-client": "^0.2.66", "@cerc-io/ipld-eth-client": "^0.2.67",
"@cerc-io/util": "^0.2.66", "@cerc-io/util": "^0.2.67",
"@types/json-diff": "^0.5.2", "@types/json-diff": "^0.5.2",
"@types/yargs": "^17.0.0", "@types/yargs": "^17.0.0",
"bn.js": "^4.11.9", "bn.js": "^4.11.9",

View File

@ -1,6 +1,6 @@
{ {
"name": "@cerc-io/ipld-eth-client", "name": "@cerc-io/ipld-eth-client",
"version": "0.2.66", "version": "0.2.67",
"description": "IPLD ETH Client", "description": "IPLD ETH Client",
"main": "dist/index.js", "main": "dist/index.js",
"scripts": { "scripts": {
@ -20,8 +20,8 @@
"homepage": "https://github.com/cerc-io/watcher-ts#readme", "homepage": "https://github.com/cerc-io/watcher-ts#readme",
"dependencies": { "dependencies": {
"@apollo/client": "^3.7.1", "@apollo/client": "^3.7.1",
"@cerc-io/cache": "^0.2.66", "@cerc-io/cache": "^0.2.67",
"@cerc-io/util": "^0.2.66", "@cerc-io/util": "^0.2.67",
"cross-fetch": "^3.1.4", "cross-fetch": "^3.1.4",
"debug": "^4.3.1", "debug": "^4.3.1",
"ethers": "^5.4.4", "ethers": "^5.4.4",

View File

@ -133,6 +133,7 @@ export class EthClient implements EthClientInterface {
}; };
} }
// TODO: Support filtering logs using topics
async getLogs (vars: Vars): Promise<any> { async getLogs (vars: Vars): Promise<any> {
console.time(`time:eth-client#getLogs-${JSON.stringify(vars)}`); console.time(`time:eth-client#getLogs-${JSON.stringify(vars)}`);
const result = await this._getCachedOrFetch('getLogs', vars); const result = await this._getCachedOrFetch('getLogs', vars);

View File

@ -1,6 +1,6 @@
{ {
"name": "@cerc-io/peer", "name": "@cerc-io/peer",
"version": "0.2.66", "version": "0.2.67",
"description": "libp2p module", "description": "libp2p module",
"main": "dist/index.js", "main": "dist/index.js",
"exports": "./dist/index.js", "exports": "./dist/index.js",

View File

@ -1,6 +1,6 @@
{ {
"name": "@cerc-io/rpc-eth-client", "name": "@cerc-io/rpc-eth-client",
"version": "0.2.66", "version": "0.2.67",
"description": "RPC ETH Client", "description": "RPC ETH Client",
"main": "dist/index.js", "main": "dist/index.js",
"scripts": { "scripts": {
@ -19,9 +19,9 @@
}, },
"homepage": "https://github.com/cerc-io/watcher-ts#readme", "homepage": "https://github.com/cerc-io/watcher-ts#readme",
"dependencies": { "dependencies": {
"@cerc-io/cache": "^0.2.66", "@cerc-io/cache": "^0.2.67",
"@cerc-io/ipld-eth-client": "^0.2.66", "@cerc-io/ipld-eth-client": "^0.2.67",
"@cerc-io/util": "^0.2.66", "@cerc-io/util": "^0.2.67",
"chai": "^4.3.4", "chai": "^4.3.4",
"ethers": "^5.4.4", "ethers": "^5.4.4",
"left-pad": "^1.3.0", "left-pad": "^1.3.0",

View File

@ -21,6 +21,7 @@ interface Vars {
contract?: string; contract?: string;
slot?: string; slot?: string;
addresses?: string[]; addresses?: string[];
topics?: string[][];
fromBlock?: number; fromBlock?: number;
toBlock?: number; toBlock?: number;
} }
@ -235,12 +236,18 @@ export class EthClient implements EthClientInterface {
async getLogs (vars: { async getLogs (vars: {
blockHash: string, blockHash: string,
blockNumber: string, blockNumber: string,
addresses?: string[] addresses?: string[],
topics?: string[][]
}): Promise<any> { }): Promise<any> {
const blockNumber = Number(vars.blockNumber); const blockNumber = Number(vars.blockNumber);
console.time(`time:eth-client#getLogs-${JSON.stringify(vars)}`); console.time(`time:eth-client#getLogs-${JSON.stringify(vars)}`);
const result = await this._getLogs({ fromBlock: blockNumber, toBlock: blockNumber, addresses: vars.addresses }); const result = await this._getLogs({
fromBlock: blockNumber,
toBlock: blockNumber,
addresses: vars.addresses,
topics: vars.topics
});
console.timeEnd(`time:eth-client#getLogs-${JSON.stringify(vars)}`); console.timeEnd(`time:eth-client#getLogs-${JSON.stringify(vars)}`);
return result; return result;
@ -249,10 +256,16 @@ export class EthClient implements EthClientInterface {
async getLogsForBlockRange (vars: { async getLogsForBlockRange (vars: {
fromBlock?: number, fromBlock?: number,
toBlock?: number, toBlock?: number,
addresses?: string[] addresses?: string[],
topics?: string[][]
}): Promise<any> { }): Promise<any> {
console.time(`time:eth-client#getLogsForBlockRange-${JSON.stringify(vars)}`); console.time(`time:eth-client#getLogsForBlockRange-${JSON.stringify(vars)}`);
const result = await this._getLogs({ fromBlock: Number(vars.fromBlock), toBlock: Number(vars.toBlock), addresses: vars.addresses }); const result = await this._getLogs({
fromBlock: Number(vars.fromBlock),
toBlock: Number(vars.toBlock),
addresses: vars.addresses,
topics: vars.topics
});
console.timeEnd(`time:eth-client#getLogsForBlockRange-${JSON.stringify(vars)}`); console.timeEnd(`time:eth-client#getLogsForBlockRange-${JSON.stringify(vars)}`);
return result; return result;
@ -262,9 +275,10 @@ export class EthClient implements EthClientInterface {
async _getLogs (vars: { async _getLogs (vars: {
fromBlock?: number, fromBlock?: number,
toBlock?: number, toBlock?: number,
addresses?: string[] addresses?: string[],
topics?: string[][]
}): Promise<any> { }): Promise<any> {
const { fromBlock, toBlock, addresses = [] } = vars; const { fromBlock, toBlock, addresses = [], topics } = vars;
const result = await this._getCachedOrFetch( const result = await this._getCachedOrFetch(
'getLogs', 'getLogs',
@ -273,7 +287,8 @@ export class EthClient implements EthClientInterface {
const logsByAddressPromises = addresses?.map(address => this._provider.getLogs({ const logsByAddressPromises = addresses?.map(address => this._provider.getLogs({
fromBlock, fromBlock,
toBlock, toBlock,
address address,
topics
})); }));
const logsByAddress = await Promise.all(logsByAddressPromises); const logsByAddress = await Promise.all(logsByAddressPromises);
let logs = logsByAddress.flat(); let logs = logsByAddress.flat();
@ -282,7 +297,8 @@ export class EthClient implements EthClientInterface {
if (!addresses.length) { if (!addresses.length) {
logs = await this._provider.getLogs({ logs = await this._provider.getLogs({
fromBlock, fromBlock,
toBlock toBlock,
topics
}); });
} }

View File

@ -1,6 +1,6 @@
{ {
"name": "@cerc-io/solidity-mapper", "name": "@cerc-io/solidity-mapper",
"version": "0.2.66", "version": "0.2.67",
"main": "dist/index.js", "main": "dist/index.js",
"license": "AGPL-3.0", "license": "AGPL-3.0",
"devDependencies": { "devDependencies": {

View File

@ -1,6 +1,6 @@
{ {
"name": "@cerc-io/test", "name": "@cerc-io/test",
"version": "0.2.66", "version": "0.2.67",
"main": "dist/index.js", "main": "dist/index.js",
"license": "AGPL-3.0", "license": "AGPL-3.0",
"private": true, "private": true,

View File

@ -1,6 +1,6 @@
{ {
"name": "@cerc-io/tracing-client", "name": "@cerc-io/tracing-client",
"version": "0.2.66", "version": "0.2.67",
"description": "ETH VM tracing client", "description": "ETH VM tracing client",
"main": "dist/index.js", "main": "dist/index.js",
"scripts": { "scripts": {

View File

@ -1,13 +1,13 @@
{ {
"name": "@cerc-io/util", "name": "@cerc-io/util",
"version": "0.2.66", "version": "0.2.67",
"main": "dist/index.js", "main": "dist/index.js",
"license": "AGPL-3.0", "license": "AGPL-3.0",
"dependencies": { "dependencies": {
"@apollo/utils.keyvaluecache": "^1.0.1", "@apollo/utils.keyvaluecache": "^1.0.1",
"@cerc-io/nitro-node": "^0.1.15", "@cerc-io/nitro-node": "^0.1.15",
"@cerc-io/peer": "^0.2.66", "@cerc-io/peer": "^0.2.67",
"@cerc-io/solidity-mapper": "^0.2.66", "@cerc-io/solidity-mapper": "^0.2.67",
"@cerc-io/ts-channel": "1.0.3-ts-nitro-0.1.1", "@cerc-io/ts-channel": "1.0.3-ts-nitro-0.1.1",
"@ethersproject/properties": "^5.7.0", "@ethersproject/properties": "^5.7.0",
"@ethersproject/providers": "^5.4.4", "@ethersproject/providers": "^5.4.4",
@ -51,7 +51,7 @@
"yargs": "^17.0.1" "yargs": "^17.0.1"
}, },
"devDependencies": { "devDependencies": {
"@cerc-io/cache": "^0.2.66", "@cerc-io/cache": "^0.2.67",
"@nomiclabs/hardhat-waffle": "^2.0.1", "@nomiclabs/hardhat-waffle": "^2.0.1",
"@types/bunyan": "^1.8.8", "@types/bunyan": "^1.8.8",
"@types/express": "^4.17.14", "@types/express": "^4.17.14",

View File

@ -258,9 +258,9 @@ export const _fetchBatchBlocks = async (
export const processBatchEvents = async (indexer: IndexerInterface, block: BlockProgressInterface, eventsInBatch: number, subgraphEventsOrder: boolean): Promise<void> => { export const processBatchEvents = async (indexer: IndexerInterface, block: BlockProgressInterface, eventsInBatch: number, subgraphEventsOrder: boolean): Promise<void> => {
let dbBlock: BlockProgressInterface, dbEvents: EventInterface[]; let dbBlock: BlockProgressInterface, dbEvents: EventInterface[];
if (subgraphEventsOrder) { if (subgraphEventsOrder) {
({ dbBlock, dbEvents } = await processEventsInSubgraphOrder(indexer, block, eventsInBatch)); ({ dbBlock, dbEvents } = await _processEventsInSubgraphOrder(indexer, block, eventsInBatch || DEFAULT_EVENTS_IN_BATCH));
} else { } else {
({ dbBlock, dbEvents } = await processEvents(indexer, block, eventsInBatch)); ({ dbBlock, dbEvents } = await _processEvents(indexer, block, eventsInBatch || DEFAULT_EVENTS_IN_BATCH));
} }
if (indexer.processBlockAfterEvents) { if (indexer.processBlockAfterEvents) {
@ -279,25 +279,20 @@ export const processBatchEvents = async (indexer: IndexerInterface, block: Block
console.timeEnd('time:common#processBatchEvents-updateBlockProgress-saveEvents'); console.timeEnd('time:common#processBatchEvents-updateBlockProgress-saveEvents');
}; };
export const processEvents = async (indexer: IndexerInterface, block: BlockProgressInterface, eventsInBatch: number): Promise<{ dbBlock: BlockProgressInterface, dbEvents: EventInterface[] }> => { const _processEvents = async (indexer: IndexerInterface, block: BlockProgressInterface, eventsInBatch: number): Promise<{ dbBlock: BlockProgressInterface, dbEvents: EventInterface[] }> => {
const dbEvents: EventInterface[] = []; const dbEvents: EventInterface[] = [];
let page = 0;
// Check if block processing is complete. let page = 0;
while (block.numProcessedEvents < block.numEvents) { let numFetchedEvents = 0;
// Check if we are out of events.
while (numFetchedEvents < block.numEvents) {
console.time('time:common#processEvents-fetching_events_batch'); console.time('time:common#processEvents-fetching_events_batch');
// Fetch events in batches // Fetch events in batches
const events = await indexer.getBlockEvents( const events = await _getEventsBatch(indexer, block.blockHash, eventsInBatch, page);
block.blockHash, page++;
{}, numFetchedEvents += events.length;
{
skip: (page++) * (eventsInBatch || DEFAULT_EVENTS_IN_BATCH),
limit: eventsInBatch || DEFAULT_EVENTS_IN_BATCH,
orderBy: 'index',
orderDirection: OrderDirection.asc
}
);
console.timeEnd('time:common#processEvents-fetching_events_batch'); console.timeEnd('time:common#processEvents-fetching_events_batch');
@ -308,7 +303,7 @@ export const processEvents = async (indexer: IndexerInterface, block: BlockProgr
console.time('time:common#processEvents-processing_events_batch'); console.time('time:common#processEvents-processing_events_batch');
// Process events in loop // Process events in loop
for (const event of events) { for (let event of events) {
// Skipping check for order of events processing since logIndex in FEVM is not index of log in block // Skipping check for order of events processing since logIndex in FEVM is not index of log in block
// Check was introduced to avoid reprocessing block events incase of restarts. But currently on restarts, unprocessed block is removed and reprocessed from first event log // Check was introduced to avoid reprocessing block events incase of restarts. But currently on restarts, unprocessed block is removed and reprocessed from first event log
// if (event.index <= block.lastProcessedEventIndex) { // if (event.index <= block.lastProcessedEventIndex) {
@ -321,20 +316,8 @@ export const processEvents = async (indexer: IndexerInterface, block: BlockProgr
// We might not have parsed this event yet. This can happen if the contract was added // We might not have parsed this event yet. This can happen if the contract was added
// as a result of a previous event in the same block. // as a result of a previous event in the same block.
if (event.eventName === UNKNOWN_EVENT_NAME) { if (event.eventName === UNKNOWN_EVENT_NAME) {
const logObj = JSONbigNative.parse(event.extraInfo); // Parse the unknown event and save updated event to the db
event = _parseUnknownEvent(indexer, event, watchedContract.kind);
assert(indexer.parseEventNameAndArgs);
assert(typeof watchedContract !== 'boolean');
const { eventName, eventInfo, eventSignature } = indexer.parseEventNameAndArgs(watchedContract.kind, logObj);
event.eventName = eventName;
event.eventInfo = JSONbigNative.stringify(eventInfo);
event.extraInfo = JSONbigNative.stringify({
...logObj,
eventSignature
});
// Save updated event to the db
dbEvents.push(event); dbEvents.push(event);
} }
@ -351,30 +334,23 @@ export const processEvents = async (indexer: IndexerInterface, block: BlockProgr
return { dbBlock: block, dbEvents }; return { dbBlock: block, dbEvents };
}; };
export const processEventsInSubgraphOrder = async (indexer: IndexerInterface, block: BlockProgressInterface, eventsInBatch: number): Promise<{ dbBlock: BlockProgressInterface, dbEvents: EventInterface[] }> => { const _processEventsInSubgraphOrder = async (indexer: IndexerInterface, block: BlockProgressInterface, eventsInBatch: number): Promise<{ dbBlock: BlockProgressInterface, dbEvents: EventInterface[] }> => {
// Create list of initially watched contracts // Create list of initially watched contracts
const initiallyWatchedContracts: string[] = indexer.getWatchedContracts().map(contract => contract.address); const initiallyWatchedContracts: string[] = indexer.getWatchedContracts().map(contract => contract.address);
const unwatchedContractEvents: EventInterface[] = []; const unwatchedContractEvents: EventInterface[] = [];
const dbEvents: EventInterface[] = []; const dbEvents: EventInterface[] = [];
let page = 0; let page = 0;
let numFetchedEvents = 0;
// Check if we are out of events. // Check if we are out of events.
let numFetchedEvents = 0;
while (numFetchedEvents < block.numEvents) { while (numFetchedEvents < block.numEvents) {
console.time('time:common#processEventsInSubgraphOrder-fetching_events_batch'); console.time('time:common#processEventsInSubgraphOrder-fetching_events_batch');
// Fetch events in batches // Fetch events in batches
const events = await indexer.getBlockEvents( const events = await _getEventsBatch(indexer, block.blockHash, eventsInBatch, page);
block.blockHash, page++;
{},
{
skip: (page++) * (eventsInBatch || DEFAULT_EVENTS_IN_BATCH),
limit: eventsInBatch || DEFAULT_EVENTS_IN_BATCH,
orderBy: 'index',
orderDirection: OrderDirection.asc
}
);
numFetchedEvents += events.length; numFetchedEvents += events.length;
console.timeEnd('time:common#processEventsInSubgraphOrder-fetching_events_batch'); console.timeEnd('time:common#processEventsInSubgraphOrder-fetching_events_batch');
@ -397,12 +373,6 @@ export const processEventsInSubgraphOrder = async (indexer: IndexerInterface, bl
// Process known events in a loop // Process known events in a loop
for (const event of watchedContractEvents) { for (const event of watchedContractEvents) {
// Skipping check for order of events processing since logIndex in FEVM is not index of log in block
// Check was introduced to avoid reprocessing block events incase of restarts. But currently on restarts, unprocessed block is removed and reprocessed from first event log
// if (event.index <= block.lastProcessedEventIndex) {
// throw new Error(`Events received out of order for block number ${block.blockNumber} hash ${block.blockHash}, got event index ${eventIndex} and lastProcessedEventIndex ${block.lastProcessedEventIndex}, aborting`);
// }
await indexer.processEvent(event); await indexer.processEvent(event);
block.lastProcessedEventIndex = event.index; block.lastProcessedEventIndex = event.index;
@ -415,27 +385,15 @@ export const processEventsInSubgraphOrder = async (indexer: IndexerInterface, bl
console.time('time:common#processEventsInSubgraphOrder-processing_unwatched_events'); console.time('time:common#processEventsInSubgraphOrder-processing_unwatched_events');
// At last, process all the events of newly watched contracts // At last, process all the events of newly watched contracts
for (const event of unwatchedContractEvents) { for (let event of unwatchedContractEvents) {
const watchedContract = indexer.isWatchedContract(event.contract); const watchedContract = indexer.isWatchedContract(event.contract);
if (watchedContract) { if (watchedContract) {
// We might not have parsed this event yet. This can happen if the contract was added // We might not have parsed this event yet. This can happen if the contract was added
// as a result of a previous event in the same block. // as a result of a previous event in the same block.
if (event.eventName === UNKNOWN_EVENT_NAME) { if (event.eventName === UNKNOWN_EVENT_NAME) {
const logObj = JSONbigNative.parse(event.extraInfo); // Parse the unknown event and save updated event to the db
event = _parseUnknownEvent(indexer, event, watchedContract.kind);
assert(indexer.parseEventNameAndArgs);
assert(typeof watchedContract !== 'boolean');
const { eventName, eventInfo, eventSignature } = indexer.parseEventNameAndArgs(watchedContract.kind, logObj);
event.eventName = eventName;
event.eventInfo = JSONbigNative.stringify(eventInfo);
event.extraInfo = JSONbigNative.stringify({
...logObj,
eventSignature
});
// Save updated event to the db
dbEvents.push(event); dbEvents.push(event);
} }
@ -451,6 +409,35 @@ export const processEventsInSubgraphOrder = async (indexer: IndexerInterface, bl
return { dbBlock: block, dbEvents }; return { dbBlock: block, dbEvents };
}; };
const _getEventsBatch = async (indexer: IndexerInterface, blockHash: string, eventsInBatch: number, page: number): Promise<EventInterface[]> => {
return indexer.getBlockEvents(
blockHash,
{},
{
skip: page * eventsInBatch,
limit: eventsInBatch,
orderBy: 'index',
orderDirection: OrderDirection.asc
}
);
};
const _parseUnknownEvent = (indexer: IndexerInterface, event: EventInterface, contractKind: string): EventInterface => {
const logObj = JSONbigNative.parse(event.extraInfo);
assert(indexer.parseEventNameAndArgs);
const { eventName, eventInfo, eventSignature } = indexer.parseEventNameAndArgs(contractKind, logObj);
event.eventName = eventName;
event.eventInfo = JSONbigNative.stringify(eventInfo);
event.extraInfo = JSONbigNative.stringify({
...logObj,
eventSignature
});
return event;
};
/** /**
* Create pruning job in QUEUE_BLOCK_PROCESSING. * Create pruning job in QUEUE_BLOCK_PROCESSING.
* @param jobQueue * @param jobQueue

View File

@ -200,7 +200,8 @@ export interface ServerConfig {
subgraphPath: string; subgraphPath: string;
enableState: boolean; enableState: boolean;
wasmRestartBlocksInterval: number; wasmRestartBlocksInterval: number;
filterLogs: boolean; filterLogsByAddresses: boolean;
filterLogsByTopics: boolean;
maxEventsBlockRange: number; maxEventsBlockRange: number;
clearEntitiesCacheInterval: number; clearEntitiesCacheInterval: number;

View File

@ -265,7 +265,7 @@ export class Indexer {
// For each of the given blocks, fetches events and saves them along with the block to db // For each of the given blocks, fetches events and saves them along with the block to db
// Returns an array with [block, events] for all the given blocks // Returns an array with [block, events] for all the given blocks
async fetchEventsAndSaveBlocks (blocks: DeepPartial<BlockProgressInterface>[], parseEventNameAndArgs: (kind: string, logObj: any) => any): Promise<{ blockProgress: BlockProgressInterface, events: DeepPartial<EventInterface>[] }[]> { async fetchEventsAndSaveBlocks (blocks: DeepPartial<BlockProgressInterface>[], eventSignaturesMap: Map<string, string[]>, parseEventNameAndArgs: (kind: string, logObj: any) => any): Promise<{ blockProgress: BlockProgressInterface, events: DeepPartial<EventInterface>[] }[]> {
if (!blocks.length) { if (!blocks.length) {
return []; return [];
} }
@ -274,7 +274,7 @@ export class Indexer {
const toBlock = blocks[blocks.length - 1].blockNumber; const toBlock = blocks[blocks.length - 1].blockNumber;
log(`fetchEventsAndSaveBlocks#fetchEventsForBlocks: fetching from upstream server for range [${fromBlock}, ${toBlock}]`); log(`fetchEventsAndSaveBlocks#fetchEventsForBlocks: fetching from upstream server for range [${fromBlock}, ${toBlock}]`);
const dbEventsMap = await this.fetchEventsForBlocks(blocks, parseEventNameAndArgs); const dbEventsMap = await this.fetchEventsForBlocks(blocks, eventSignaturesMap, parseEventNameAndArgs);
const blocksWithEventsPromises = blocks.map(async block => { const blocksWithEventsPromises = blocks.map(async block => {
const blockHash = block.blockHash; const blockHash = block.blockHash;
@ -291,31 +291,25 @@ export class Indexer {
} }
// Fetch events (to be saved to db) for a block range // Fetch events (to be saved to db) for a block range
async fetchEventsForBlocks (blocks: DeepPartial<BlockProgressInterface>[], parseEventNameAndArgs: (kind: string, logObj: any) => any): Promise<Map<string, DeepPartial<EventInterface>[]>> { async fetchEventsForBlocks (blocks: DeepPartial<BlockProgressInterface>[], eventSignaturesMap: Map<string, string[]>, parseEventNameAndArgs: (kind: string, logObj: any) => any): Promise<Map<string, DeepPartial<EventInterface>[]>> {
if (!blocks.length) { if (!blocks.length) {
return new Map(); return new Map();
} }
// Fetch logs for block range of given blocks // Fetch logs for block range of given blocks
let logs: any;
const fromBlock = blocks[0].blockNumber; const fromBlock = blocks[0].blockNumber;
const toBlock = blocks[blocks.length - 1].blockNumber; const toBlock = blocks[blocks.length - 1].blockNumber;
assert(this._ethClient.getLogsForBlockRange, 'getLogsForBlockRange() not implemented in ethClient'); assert(this._ethClient.getLogsForBlockRange, 'getLogsForBlockRange() not implemented in ethClient');
if (this._serverConfig.filterLogs) {
const watchedContracts = this.getWatchedContracts();
const addresses = watchedContracts.map((watchedContract): string => {
return watchedContract.address;
});
({ logs } = await this._ethClient.getLogsForBlockRange({ const { addresses, topics } = this._createLogsFilters(eventSignaturesMap);
const { logs } = await this._ethClient.getLogsForBlockRange({
fromBlock, fromBlock,
toBlock, toBlock,
addresses addresses,
})); topics
} else { });
({ logs } = await this._ethClient.getLogsForBlockRange({ fromBlock, toBlock }));
}
// Skip further processing if no relevant logs found in the entire block range // Skip further processing if no relevant logs found in the entire block range
if (!logs.length) { if (!logs.length) {
@ -380,23 +374,15 @@ export class Indexer {
} }
// Fetch events (to be saved to db) for a particular block // Fetch events (to be saved to db) for a particular block
async fetchEvents (blockHash: string, blockNumber: number, parseEventNameAndArgs: (kind: string, logObj: any) => any): Promise<DeepPartial<EventInterface>[]> { async fetchEvents (blockHash: string, blockNumber: number, eventSignaturesMap: Map<string, string[]>, parseEventNameAndArgs: (kind: string, logObj: any) => any): Promise<DeepPartial<EventInterface>[]> {
let logsPromise: Promise<any>; const { addresses, topics } = this._createLogsFilters(eventSignaturesMap);
if (this._serverConfig.filterLogs) { const logsPromise = await this._ethClient.getLogs({
const watchedContracts = this.getWatchedContracts();
const addresses = watchedContracts.map((watchedContract): string => {
return watchedContract.address;
});
logsPromise = this._ethClient.getLogs({
blockHash, blockHash,
blockNumber: blockNumber.toString(), blockNumber: blockNumber.toString(),
addresses addresses,
topics
}); });
} else {
logsPromise = this._ethClient.getLogs({ blockHash, blockNumber: blockNumber.toString() });
}
const transactionsPromise = this._ethClient.getBlockWithTransactions({ blockHash, blockNumber }); const transactionsPromise = this._ethClient.getBlockWithTransactions({ blockHash, blockNumber });
@ -1198,6 +1184,29 @@ export class Indexer {
this._stateStatusMap[address] = _.merge(oldStateStatus, stateStatus); this._stateStatusMap[address] = _.merge(oldStateStatus, stateStatus);
} }
_createLogsFilters (eventSignaturesMap: Map<string, string[]>): { addresses: string[] | undefined, topics: string[][] | undefined } {
let addresses: string[] | undefined;
let eventSignatures: string[] | undefined;
if (this._serverConfig.filterLogsByAddresses) {
const watchedContracts = this.getWatchedContracts();
addresses = watchedContracts.map((watchedContract): string => {
return watchedContract.address;
});
}
if (this._serverConfig.filterLogsByTopics) {
const eventSignaturesSet = new Set<string>();
eventSignaturesMap.forEach(sigs => sigs.forEach(sig => {
eventSignaturesSet.add(sig);
}));
eventSignatures = Array.from(eventSignaturesSet);
}
return { addresses, topics: eventSignatures && [eventSignatures] };
}
parseEvent (logDescription: ethers.utils.LogDescription): { eventName: string, eventInfo: any, eventSignature: string } { parseEvent (logDescription: ethers.utils.LogDescription): { eventName: string, eventInfo: any, eventSignature: string } {
const eventInfo = logDescription.eventFragment.inputs.reduce((acc: any, input, index) => { const eventInfo = logDescription.eventFragment.inputs.reduce((acc: any, input, index) => {
acc[input.name] = this._parseLogArg(input, logDescription.args[index]); acc[input.name] = this._parseLogArg(input, logDescription.args[index]);

View File

@ -220,12 +220,14 @@ export interface EthClient {
getLogs(vars: { getLogs(vars: {
blockHash: string, blockHash: string,
blockNumber: string, blockNumber: string,
addresses?: string[] addresses?: string[],
topics?: string[][]
}): Promise<any>; }): Promise<any>;
getLogsForBlockRange?: (vars: { getLogsForBlockRange?: (vars: {
fromBlock?: number, fromBlock?: number,
toBlock?: number, toBlock?: number,
addresses?: string[] addresses?: string[],
topics?: string[][]
}) => Promise<any>; }) => Promise<any>;
} }