Add isFEVM flag in config to avoid filtering event logs by topics (#454)

* Pass upstream config to indexer instance

* Add isFEVM flag and refactor watcher config fields

* Codegen changes for indexer

* Add missing getter in dummy indexer for graph-node tests
This commit is contained in:
Nabarun Gogoi 2023-11-07 12:07:49 +05:30 committed by GitHub
parent dd92b4feb2
commit 8547876764
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
22 changed files with 157 additions and 64 deletions

View File

@ -17,7 +17,8 @@ import {
ServerConfig,
Clients,
EventWatcher,
GraphWatcherInterface
GraphWatcherInterface,
UpstreamConfig
} from '@cerc-io/util';
import { initClients } from './utils/index';
@ -102,7 +103,10 @@ export class BaseCmd {
async initIndexer (
Indexer: new (
serverConfig: ServerConfig,
config: {
server: ServerConfig;
upstream: UpstreamConfig;
},
db: DatabaseInterface,
clients: Clients,
ethProvider: JsonRpcProvider,
@ -117,7 +121,12 @@ export class BaseCmd {
assert(this._ethProvider);
assert(this._jobQueue);
this._indexer = new Indexer(this._config.server, this._database, this._clients, this._ethProvider, this._jobQueue, graphWatcher);
const config = {
server: this._config.server,
upstream: this._config.upstream
};
this._indexer = new Indexer(config, this._database, this._clients, this._ethProvider, this._jobQueue, graphWatcher);
await this._indexer.init();
if (graphWatcher) {

View File

@ -15,7 +15,8 @@ import {
ServerConfig,
Clients,
GraphWatcherInterface,
Config
Config,
UpstreamConfig
} from '@cerc-io/util';
import { BaseCmd } from '../base';
@ -72,7 +73,10 @@ export class CreateCheckpointCmd {
async initIndexer (
Indexer: new (
serverConfig: ServerConfig,
config: {
server: ServerConfig;
upstream: UpstreamConfig;
},
db: DatabaseInterface,
clients: Clients,
ethProvider: JsonRpcProvider,

View File

@ -17,7 +17,8 @@ import {
verifyCheckpointData,
GraphDatabase,
GraphWatcherInterface,
Config
Config,
UpstreamConfig
} from '@cerc-io/util';
import { BaseCmd } from '../base';
@ -73,7 +74,10 @@ export class VerifyCheckpointCmd {
async initIndexer (
Indexer: new (
serverConfig: ServerConfig,
config: {
server: ServerConfig;
upstream: UpstreamConfig;
},
db: DatabaseInterface,
clients: Clients,
ethProvider: JsonRpcProvider,

View File

@ -27,7 +27,8 @@ import {
StateKind,
createOrUpdateStateData,
getContractEntitiesMap,
prepareEntityStateFromGQLResponse
prepareEntityStateFromGQLResponse,
UpstreamConfig
} from '@cerc-io/util';
import { GraphQLClient } from '@cerc-io/ipld-eth-client';
@ -97,7 +98,10 @@ export class CreateStateFromGQLCmd {
async initIndexer (
Indexer: new (
serverConfig: ServerConfig,
config: {
server: ServerConfig;
upstream: UpstreamConfig;
},
db: DatabaseInterface,
clients: Clients,
ethProvider: JsonRpcProvider,

View File

@ -20,7 +20,8 @@ import {
StateKind,
Clients,
GraphWatcherInterface,
Config
Config,
UpstreamConfig
} from '@cerc-io/util';
import { BaseCmd } from './base';
@ -78,7 +79,10 @@ export class ExportStateCmd {
async initIndexer (
Indexer: new (
serverConfig: ServerConfig,
config: {
server: ServerConfig;
upstream: UpstreamConfig;
},
db: DatabaseInterface,
clients: Clients,
ethProvider: JsonRpcProvider,

View File

@ -20,7 +20,8 @@ import {
Clients,
fillBlocks,
GraphWatcherInterface,
Config
Config,
UpstreamConfig
} from '@cerc-io/util';
import { BaseCmd } from './base';
@ -81,7 +82,10 @@ export class FillCmd {
async initIndexer (
Indexer: new (
serverConfig: ServerConfig,
config: {
server: ServerConfig;
upstream: UpstreamConfig;
},
db: DatabaseInterface,
clients: Clients,
ethProvider: JsonRpcProvider,

View File

@ -23,7 +23,8 @@ import {
GraphWatcherInterface,
GraphDatabase,
updateEntitiesFromState,
Config
Config,
UpstreamConfig
} from '@cerc-io/util';
import { BaseCmd } from './base';
@ -80,7 +81,10 @@ export class ImportStateCmd {
async initIndexer (
Indexer: new (
serverConfig: ServerConfig,
config: {
server: ServerConfig;
upstream: UpstreamConfig;
},
db: DatabaseInterface,
clients: Clients,
ethProvider: JsonRpcProvider,

View File

@ -17,7 +17,8 @@ import {
Clients,
indexBlock,
GraphWatcherInterface,
Config
Config,
UpstreamConfig
} from '@cerc-io/util';
import { BaseCmd } from './base';
@ -72,7 +73,10 @@ export class IndexBlockCmd {
async initIndexer (
Indexer: new (
serverConfig: ServerConfig,
config: {
server: ServerConfig;
upstream: UpstreamConfig;
},
db: DatabaseInterface,
clients: Clients,
ethProvider: JsonRpcProvider,

View File

@ -18,7 +18,8 @@ import {
ServerConfig,
Clients,
GraphWatcherInterface,
Config
Config,
UpstreamConfig
} from '@cerc-io/util';
import { BaseCmd } from './base';
@ -75,7 +76,10 @@ export class InspectCIDCmd {
async initIndexer (
Indexer: new (
serverConfig: ServerConfig,
config: {
server: ServerConfig;
upstream: UpstreamConfig;
},
db: DatabaseInterface,
clients: Clients,
ethProvider: JsonRpcProvider,

View File

@ -19,7 +19,8 @@ import {
JobRunner,
GraphWatcherInterface,
startMetricsServer,
Config
Config,
UpstreamConfig
} from '@cerc-io/util';
import { BaseCmd } from './base';
@ -81,7 +82,10 @@ export class JobRunnerCmd {
async initIndexer (
Indexer: new (
serverConfig: ServerConfig,
config: {
server: ServerConfig;
upstream: UpstreamConfig;
},
db: DatabaseInterface,
clients: Clients,
ethProvider: JsonRpcProvider,

View File

@ -15,7 +15,8 @@ import {
ServerConfig,
Clients,
GraphWatcherInterface,
Config
Config,
UpstreamConfig
} from '@cerc-io/util';
import { BaseCmd } from '../base';
@ -71,7 +72,10 @@ export class ResetWatcherCmd {
async initIndexer (
Indexer: new (
serverConfig: ServerConfig,
config: {
server: ServerConfig;
upstream: UpstreamConfig;
},
db: DatabaseInterface,
clients: Clients,
ethProvider: JsonRpcProvider,

View File

@ -28,7 +28,8 @@ import {
Config,
PaymentsManager,
Consensus,
readParty
readParty,
UpstreamConfig
} from '@cerc-io/util';
import { TypeSource } from '@graphql-tools/utils';
import type {
@ -111,7 +112,10 @@ export class ServerCmd {
async initIndexer (
Indexer: new (
serverConfig: ServerConfig,
config: {
server: ServerConfig;
upstream: UpstreamConfig;
},
db: DatabaseInterface,
clients: Clients,
ethProvider: JsonRpcProvider,

View File

@ -16,7 +16,8 @@ import {
ServerConfig,
Clients,
GraphWatcherInterface,
Config
Config,
UpstreamConfig
} from '@cerc-io/util';
import { BaseCmd } from './base';
@ -74,7 +75,10 @@ export class WatchContractCmd {
async initIndexer (
Indexer: new (
serverConfig: ServerConfig,
config: {
server: ServerConfig;
upstream: UpstreamConfig;
},
db: DatabaseInterface,
clients: Clients,
ethProvider: JsonRpcProvider,

View File

@ -23,17 +23,6 @@
clearEntitiesCacheInterval = 1000
{{/if}}
# Boolean to filter logs by contracts.
filterLogsByAddresses = false
# Boolean to filter logs by topics.
filterLogsByTopics = false
# Boolean to switch between modes of processing events when starting the server.
# Setting to true will fetch filtered events and required blocks in a range of blocks and then process them.
# Setting to false will fetch blocks consecutively with its events and then process them (Behaviour is followed in realtime processing near head).
useBlockRanges = true
# Max block range for which to return events in eventsInRange GQL query.
# Use -1 for skipping check on block range.
maxEventsBlockRange = 1000
@ -75,9 +64,17 @@
gqlApiEndpoint = "http://127.0.0.1:8082/graphql"
rpcProviderEndpoint = "http://127.0.0.1:8081"
# Flag to specify if rpc-eth-client should be used from RPC endpoint instead of ipld-eth-client (ipld-eth-server GQL client)
# Boolean flag to specify if rpc-eth-client should be used for RPC endpoint instead of ipld-eth-client (ipld-eth-server GQL client)
rpcClient = false
# Boolean flag to specify if rpcProviderEndpoint is an FEVM RPC endpoint
isFEVM = false
# Boolean flag to filter event logs by contracts
filterLogsByAddresses = false
# Boolean flag to filter event logs by topics
filterLogsByTopics = false
[upstream.cache]
name = "requests"
enabled = false
@ -93,6 +90,11 @@
prefetchBlocksInMem = true
prefetchBlockCount = 10
# Boolean to switch between modes of processing events when starting the server.
# Setting to true will fetch filtered events and required blocks in a range of blocks and then process them.
# Setting to false will fetch blocks consecutively with its events and then process them (Behaviour is followed in realtime processing near head).
useBlockRanges = true
# Block range in which logs are fetched during historical blocks processing
historicalLogsBlockRange = 2000

View File

@ -42,7 +42,8 @@ import {
getResultEvent,
DatabaseInterface,
Clients,
EthClient
EthClient,
UpstreamConfig
} from '@cerc-io/util';
{{#if (subgraphPath)}}
import { GraphWatcher } from '@cerc-io/graph-node';
@ -89,6 +90,7 @@ export class Indexer implements IndexerInterface {
_ethProvider: BaseProvider;
_baseIndexer: BaseIndexer;
_serverConfig: ServerConfig;
_upstreamConfig: UpstreamConfig;
{{#if (subgraphPath)}}
_graphWatcher: GraphWatcher;
{{/if}}
@ -105,15 +107,28 @@ export class Indexer implements IndexerInterface {
_subgraphStateMap: Map<string, any>;
{{/if}}
constructor (serverConfig: ServerConfig, db: DatabaseInterface, clients: Clients, ethProvider: BaseProvider, jobQueue: JobQueue{{#if (subgraphPath)}}, graphWatcher?: GraphWatcherInterface{{/if}}) {
constructor (
config: {
server: ServerConfig;
upstream: UpstreamConfig;
},
db: DatabaseInterface,
clients: Clients,
ethProvider: BaseProvider,
jobQueue: JobQueue{{#if (subgraphPath)}},{{/if}}
{{#if (subgraphPath)}}
graphWatcher?: GraphWatcherInterface
{{/if}}
) {
assert(db);
assert(clients.ethClient);
this._db = db as Database;
this._ethClient = clients.ethClient;
this._ethProvider = ethProvider;
this._serverConfig = serverConfig;
this._baseIndexer = new BaseIndexer(this._serverConfig, this._db, this._ethClient, this._ethProvider, jobQueue);
this._serverConfig = config.server;
this._upstreamConfig = config.upstream;
this._baseIndexer = new BaseIndexer(config, this._db, this._ethClient, this._ethProvider, jobQueue);
{{#if (subgraphPath)}}
assert(graphWatcher);
this._graphWatcher = graphWatcher as GraphWatcher;
@ -163,6 +178,10 @@ export class Indexer implements IndexerInterface {
return this._serverConfig;
}
get upstreamConfig (): UpstreamConfig {
return this._upstreamConfig;
}
get storageLayoutMap (): Map<string, StorageLayout> {
return this._storageLayoutMap;
}

View File

@ -17,7 +17,8 @@ import {
getResultEvent,
ResultEvent,
StateKind,
EthClient
EthClient,
UpstreamConfig
} from '@cerc-io/util';
import { GetStorageAt, getStorageValue, MappingKey, StorageLayout } from '@cerc-io/solidity-mapper';
@ -37,6 +38,10 @@ export class Indexer implements IndexerInterface {
return {} as ServerConfig;
}
get upstreamConfig () {
return {} as UpstreamConfig;
}
get storageLayoutMap (): Map<string, StorageLayout> {
return this._storageLayoutMap;
}

View File

@ -427,7 +427,7 @@ const _processEventsInSubgraphOrder = async (indexer: IndexerInterface, block: B
isNewContractWatched = true;
// Check if filterLogsByAddresses is set to true
if (indexer.serverConfig.filterLogsByAddresses) {
if (indexer.upstreamConfig.ethServer.filterLogsByAddresses) {
// Fetch and parse events for newly watched contracts
const newContracts = watchedContracts.filter(contract => !initiallyWatchedContracts.includes(contract));
const events = await indexer.fetchEventsForContracts(block.blockHash, block.blockNumber, newContracts);

View File

@ -29,6 +29,10 @@ export interface JobQueueConfig {
// Max block range of historical processing after which it waits for completion of events processing
// If set to -1 historical processing does not wait for events processing and completes till latest canonical block
historicalMaxFetchAhead?: number;
// Boolean to switch between modes of processing events when starting the server
// Setting to true will fetch filtered events and required blocks in a range of blocks and then process them
// Setting to false will fetch blocks consecutively with its events and then process them (Behaviour is followed in realtime processing near head)
useBlockRanges: boolean;
}
export interface GQLCacheConfig {
@ -205,26 +209,19 @@ export interface ServerConfig {
subgraphPath: string;
enableState: boolean;
wasmRestartBlocksInterval: number;
filterLogsByAddresses: boolean;
filterLogsByTopics: boolean;
maxEventsBlockRange: number;
clearEntitiesCacheInterval: number;
// Boolean to switch between modes of processing events when starting the server.
// Setting to true will fetch filtered events and required blocks in a range of blocks and then process them.
// Setting to false will fetch blocks consecutively with its events and then process them (Behaviour is followed in realtime processing near head).
useBlockRanges: boolean;
// Boolean to skip updating entity fields required in state creation and not required in the frontend.
// Boolean to skip updating entity fields required in state creation and not required in the frontend
skipStateFieldsUpdate: boolean;
// Max GQL API requests to process simultaneously (defaults to 1).
// Max GQL API requests to process simultaneously (defaults to 1)
maxSimultaneousRequests?: number;
// Max GQL API requests in queue until reject (defaults to -1, means do not reject).
// Max GQL API requests in queue until reject (defaults to -1, means do not reject)
maxRequestQueueLimit?: number;
// Boolean to load GQL query nested entity relations sequentially.
// Boolean to load GQL query nested entity relations sequentially
loadRelationsSequential: boolean;
// GQL cache-control max-age settings (in seconds)
@ -260,7 +257,14 @@ export interface UpstreamConfig {
gqlApiEndpoint: string;
rpcProviderEndpoint: string;
rpcProviderMutationEndpoint: string;
// Boolean flag to specify if rpc-eth-client should be used for RPC endpoint instead of ipld-eth-client (ipld-eth-server GQL client)
rpcClient: boolean;
// Boolean flag to specify if rpcProviderEndpoint is an FEVM RPC endpoint
isFEVM: boolean;
// Boolean flag to filter event logs by contracts
filterLogsByAddresses: boolean;
// Boolean flag to filter event logs by topics
filterLogsByTopics: boolean;
payments: EthServerPaymentsConfig;
}
traceProviderEndpoint: string;

View File

@ -103,7 +103,7 @@ export class EventWatcher {
// Check if filter for logs is enabled
// Check if starting block for watcher is before latest canonical block
if (this._config.server.useBlockRanges && startBlockNumber < latestCanonicalBlockNumber) {
if (this._config.jobQueue.useBlockRanges && startBlockNumber < latestCanonicalBlockNumber) {
let endBlockNumber = latestCanonicalBlockNumber;
const historicalMaxFetchAhead = this._config.jobQueue.historicalMaxFetchAhead ?? DEFAULT_HISTORICAL_MAX_FETCH_AHEAD;

View File

@ -27,7 +27,7 @@ import {
import { UNKNOWN_EVENT_NAME, JOB_KIND_CONTRACT, QUEUE_EVENT_PROCESSING, DIFF_MERGE_BATCH_SIZE } from './constants';
import { JobQueue } from './job-queue';
import { Where, QueryOptions } from './database';
import { ServerConfig } from './config';
import { ServerConfig, UpstreamConfig } from './config';
import { createOrUpdateStateData, StateDataMeta } from './state-helper';
const DEFAULT_MAX_EVENTS_BLOCK_RANGE = 1000;
@ -90,6 +90,7 @@ export type ResultEvent = {
export class Indexer {
_serverConfig: ServerConfig;
_upstreamConfig: UpstreamConfig;
_db: DatabaseInterface;
_ethClient: EthClient;
_getStorageAt: GetStorageAt;
@ -100,13 +101,17 @@ export class Indexer {
_stateStatusMap: { [key: string]: StateStatus } = {};
constructor (
serverConfig: ServerConfig,
config: {
server: ServerConfig;
upstream: UpstreamConfig;
},
db: DatabaseInterface,
ethClient: EthClient,
ethProvider: ethers.providers.BaseProvider,
jobQueue: JobQueue
) {
this._serverConfig = serverConfig;
this._serverConfig = config.server;
this._upstreamConfig = config.upstream;
this._db = db;
this._ethClient = ethClient;
this._ethProvider = ethProvider;
@ -1289,14 +1294,14 @@ export class Indexer {
let addresses: string[] | undefined;
let eventSignatures: string[] | undefined;
if (this._serverConfig.filterLogsByAddresses) {
if (this._upstreamConfig.ethServer.filterLogsByAddresses) {
const watchedContracts = this.getWatchedContracts();
addresses = watchedContracts.map((watchedContract): string => {
return watchedContract.address;
});
}
if (this._serverConfig.filterLogsByTopics) {
if (this._upstreamConfig.ethServer.filterLogsByTopics && !this._upstreamConfig.ethServer.isFEVM) {
const eventSignaturesSet = new Set<string>();
eventSignaturesMap.forEach(sigs => sigs.forEach(sig => {
eventSignaturesSet.add(sig);

View File

@ -572,7 +572,7 @@ export class JobRunner {
this._blockAndEventsMap.delete(block.blockHash);
// Check if new contract was added and filterLogsByAddresses is set to true
if (isNewContractWatched && this._indexer.serverConfig.filterLogsByAddresses) {
if (isNewContractWatched && this._indexer.upstreamConfig.ethServer.filterLogsByAddresses) {
// Delete jobs for any pending events processing
await this.jobQueue.deleteJobs(QUEUE_EVENT_PROCESSING);

View File

@ -6,7 +6,7 @@ import { Connection, DeepPartial, EntityTarget, FindConditions, FindManyOptions,
import { MappingKey, StorageLayout } from '@cerc-io/solidity-mapper';
import { ServerConfig } from './config';
import { ServerConfig, UpstreamConfig } from './config';
import { Where, QueryOptions, Database } from './database';
import { ValueResult, StateStatus } from './indexer';
@ -81,6 +81,7 @@ export interface StateInterface {
export interface IndexerInterface {
readonly serverConfig: ServerConfig
readonly upstreamConfig: UpstreamConfig
readonly storageLayoutMap: Map<string, StorageLayout>
init (): Promise<void>
getBlockProgress (blockHash: string): Promise<BlockProgressInterface | undefined>