Support topics filtering in getLogs ETH RPC API (#537)

* Store event topics in separate columns in db

* Store event data in a separate column in db

* Support topics filter in eth_getLogs RPC API

* Make RPC server path configurable

* Sort logs result by log index
This commit is contained in:
prathamesh0 2024-09-18 15:37:13 +05:30 committed by GitHub
parent d413d724c7
commit a585500012
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 171 additions and 35 deletions

View File

@ -44,6 +44,44 @@ columns:
columnOptions:
- option: length
value: 256
- name: topic0
pgType: varchar
tsType: string
columnType: Column
columnOptions:
- option: length
value: 66
- name: topic1
pgType: varchar
tsType: string | null
columnType: Column
columnOptions:
- option: length
value: 66
- option: nullable
value: true
- name: topic2
pgType: varchar
tsType: string | null
columnType: Column
columnOptions:
- option: length
value: 66
- option: nullable
value: true
- name: topic3
pgType: varchar
tsType: string | null
columnType: Column
columnOptions:
- option: length
value: 66
- option: nullable
value: true
- name: data
pgType: varchar
tsType: string
columnType: Column
- name: eventInfo
pgType: text
tsType: string

View File

@ -25,13 +25,7 @@
# Flag to specify whether RPC endpoint supports block hash as block tag parameter
rpcSupportsBlockHashParam = true
# Enable ETH JSON RPC server at /rpc
enableEthRPCServer = true
# Max number of logs that can be returned in a single getLogs request (default: 10000)
ethGetLogsResultLimit = 10000
# Server GQL config
# GQL server config
[server.gql]
path = "/graphql"
@ -55,6 +49,14 @@
timeTravelMaxAge = 86400 # 1 day
{{/if}}
# ETH RPC server config
[server.ethRPC]
enabled = true
path = "/rpc"
# Max number of logs that can be returned in a single getLogs request (default: 10000)
getLogsResultLimit = 10000
[metrics]
host = "127.0.0.1"
port = 9000

View File

@ -227,6 +227,18 @@ export interface GQLConfig {
logDir?: string;
}
// ETH RPC server config
export interface EthRPCConfig {
// Enable ETH JSON RPC server
enabled: boolean;
// Path to expose the RPC server at
path?: string;
// Max number of logs that can be returned in a single getLogs request
getLogsResultLimit?: number;
}
export interface ServerConfig {
host: string;
port: number;
@ -253,11 +265,8 @@ export interface ServerConfig {
// https://ethereum.org/en/developers/docs/apis/json-rpc/#default-block
rpcSupportsBlockHashParam: boolean;
// Enable ETH JSON RPC server at /rpc
enableEthRPCServer: boolean;
// Max number of logs that can be returned in a single getLogs request
ethGetLogsResultLimit?: number;
// ETH JSON RPC server config
ethRPC: EthRPCConfig;
}
export interface FundingAmountsConfig {

View File

@ -18,8 +18,9 @@ const ERROR_CONTRACT_METHOD_NOT_FOUND = 'Contract method not found';
const ERROR_METHOD_NOT_IMPLEMENTED = 'Method not implemented';
const ERROR_INVALID_BLOCK_TAG = 'Invalid block tag';
const ERROR_INVALID_BLOCK_HASH = 'Invalid block hash';
const ERROR_INVALID_CONTRACT_ADDRESS = 'Invalid contract address';
const ERROR_INVALID_TOPICS = 'Invalid topics';
const ERROR_BLOCK_NOT_FOUND = 'Block not found';
const ERROR_TOPICS_FILTER_NOT_SUPPORTED = 'Topics filter not supported';
const ERROR_LIMIT_EXCEEDED = 'Query results exceeds limit';
const DEFAULT_BLOCK_TAG = 'latest';
@ -114,20 +115,14 @@ export const createEthRPCHandlers = async (
// Parse arg params into where options
const where: FindConditions<EventInterface> = {};
// TODO: Support topics filter
if (params.topics) {
throw new ErrorWithCode(CODE_INVALID_PARAMS, ERROR_TOPICS_FILTER_NOT_SUPPORTED);
}
// Address filter, address or a list of addresses
if (params.address) {
if (Array.isArray(params.address)) {
if (params.address.length > 0) {
where.contract = In(params.address);
}
} else {
where.contract = Equal(params.address);
}
buildAddressFilter(params.address, where);
}
// Topics filter
if (params.topics) {
buildTopicsFilter(params.topics, where);
}
// Block hash takes precedence over fromBlock / toBlock if provided
@ -158,8 +153,14 @@ export const createEthRPCHandlers = async (
// Fetch events from the db
// Load block relation
const resultLimit = indexer.serverConfig.ethGetLogsResultLimit || DEFAULT_ETH_GET_LOGS_RESULT_LIMIT;
const events = await indexer.getEvents({ where, relations: ['block'], take: resultLimit + 1 });
const resultLimit = indexer.serverConfig.ethRPC.getLogsResultLimit || DEFAULT_ETH_GET_LOGS_RESULT_LIMIT;
const events = await indexer.getEvents({
where,
relations: ['block'],
// TODO: Use querybuilder to order by block number
order: { block: 'ASC', index: 'ASC' },
take: resultLimit + 1
});
// Limit number of results can be returned by a single query
if (events.length > resultLimit) {
@ -229,10 +230,82 @@ const parseEthGetLogsBlockTag = async (indexer: IndexerInterface, blockTag: stri
throw new ErrorWithCode(CODE_INVALID_PARAMS, ERROR_INVALID_BLOCK_TAG);
};
const buildAddressFilter = (address: any, where: FindConditions<EventInterface>): void => {
if (Array.isArray(address)) {
// Validate input addresses
address.forEach((add: string) => {
if (!utils.isHexString(add, 20)) {
throw new ErrorWithCode(CODE_INVALID_PARAMS, `${ERROR_INVALID_CONTRACT_ADDRESS}: expected hex string of size 20`);
}
});
if (address.length > 0) {
where.contract = In(address);
}
} else {
// Validate input address
if (!utils.isHexString(address, 20)) {
throw new ErrorWithCode(CODE_INVALID_PARAMS, `${ERROR_INVALID_CONTRACT_ADDRESS}: expected hex string of size 20`);
}
where.contract = Equal(address);
}
};
type TopicColumn = 'topic0' | 'topic1' | 'topic2' | 'topic3';
const buildTopicsFilter = (topics: any, where: FindConditions<EventInterface>): void => {
// Check that topics is an array of size <= 4
if (!Array.isArray(topics)) {
throw new ErrorWithCode(CODE_INVALID_PARAMS, ERROR_INVALID_TOPICS);
}
if (topics.length > 4) {
throw new ErrorWithCode(CODE_INVALID_PARAMS, `${ERROR_INVALID_TOPICS}: exceeds max topics`);
}
for (let i = 0; i < topics.length; i++) {
addTopicCondition(topics[i], `topic${i}` as TopicColumn, where);
}
};
const addTopicCondition = (
topicFilter: string[] | string,
topicIndex: TopicColumn,
where: FindConditions<EventInterface>
): any => {
if (Array.isArray(topicFilter)) {
// Validate input topics
topicFilter.forEach((topic: string) => {
if (!utils.isHexString(topic, 32)) {
throw new ErrorWithCode(CODE_INVALID_PARAMS, `${ERROR_INVALID_TOPICS}: expected hex string of size 32 for ${topicIndex}`);
}
});
if (topicFilter.length > 0) {
where[topicIndex] = In(topicFilter);
}
} else {
// Validate input address
if (!utils.isHexString(topicFilter, 32)) {
throw new ErrorWithCode(CODE_INVALID_PARAMS, `${ERROR_INVALID_TOPICS}: expected hex string of size 32 for ${topicIndex}`);
}
where[topicIndex] = Equal(topicFilter);
}
};
const transformEventsToLogs = async (events: Array<EventInterface>): Promise<any[]> => {
return events.map(event => {
const parsedExtraInfo = JSON.parse(event.extraInfo);
const topics: string[] = [];
[event.topic0, event.topic1, event.topic2, event.topic3].forEach(topic => {
if (topic) {
topics.push(topic);
}
});
return {
address: event.contract.toLowerCase(),
blockHash: event.block.blockHash,
@ -240,8 +313,8 @@ const transformEventsToLogs = async (events: Array<EventInterface>): Promise<any
transactionHash: event.txHash,
transactionIndex: `0x${parsedExtraInfo.tx.index.toString(16)}`,
logIndex: `0x${parsedExtraInfo.logIndex.toString(16)}`,
data: parsedExtraInfo.data,
topics: parsedExtraInfo.topics,
data: event.data,
topics,
removed: event.block.isPruned
};
});

View File

@ -670,7 +670,9 @@ export class Indexer {
let eventName = UNKNOWN_EVENT_NAME;
let eventInfo = {};
const tx = transactionMap[txHash];
const extraInfo: { [key: string]: any } = { topics, data, tx, logIndex };
const extraInfo: { [key: string]: any } = { tx, logIndex };
const [topic0, topic1, topic2, topic3] = topics as string[];
const contract = ethers.utils.getAddress(address);
const watchedContracts = this.isContractAddressWatched(contract);
@ -694,6 +696,11 @@ export class Indexer {
txHash,
contract,
eventName,
topic0,
topic1,
topic2,
topic3,
data,
eventInfo: JSONbigNative.stringify(eventInfo),
extraInfo: JSONbigNative.stringify(extraInfo),
proof: JSONbigNative.stringify({

View File

@ -24,7 +24,7 @@ import { PaymentsManager, paymentsPlugin } from './payments';
const log = debug('vulcanize:server');
const DEFAULT_GQL_PATH = '/graphql';
const ETH_RPC_PATH = '/rpc';
const DEFAULT_ETH_RPC_PATH = '/rpc';
export const createAndStartServer = async (
app: Application,
@ -102,13 +102,15 @@ export const createAndStartServer = async (
path: gqlPath
});
if (serverConfig.enableEthRPCServer) {
const rpcPath = serverConfig.ethRPC?.path ?? DEFAULT_ETH_RPC_PATH;
if (serverConfig.ethRPC?.enabled) {
// Create a JSON-RPC server to handle ETH RPC calls
const rpcServer = jayson.Server(ethRPCHandlers);
// Mount the JSON-RPC server to ETH_RPC_PATH
app.use(
ETH_RPC_PATH,
rpcPath,
jsonParser(),
(req: any, res: any, next: () => void) => {
// Convert all GET requests to POST to avoid getting rejected from jayson server middleware
@ -124,8 +126,8 @@ export const createAndStartServer = async (
httpServer.listen(port, host, () => {
log(`GQL server is listening on http://${host}:${port}${server.graphqlPath}`);
if (serverConfig.enableEthRPCServer) {
log(`ETH JSON RPC server is listening on http://${host}:${port}${ETH_RPC_PATH}`);
if (serverConfig.ethRPC?.enabled) {
log(`ETH JSON RPC server is listening on http://${host}:${port}${rpcPath}`);
}
});

View File

@ -62,6 +62,11 @@ export interface EventInterface {
index: number;
contract: string;
eventName: string;
topic0: string;
topic1: string | null;
topic2: string | null;
topic3: string | null;
data: string;
eventInfo: string;
extraInfo: string;
proof: string;