diff --git a/packages/cli/src/base.ts b/packages/cli/src/base.ts index 0ac7832a..290786ce 100644 --- a/packages/cli/src/base.ts +++ b/packages/cli/src/base.ts @@ -17,7 +17,8 @@ import { ServerConfig, Clients, EventWatcher, - GraphWatcherInterface + GraphWatcherInterface, + UpstreamConfig } from '@cerc-io/util'; import { initClients } from './utils/index'; @@ -102,7 +103,10 @@ export class BaseCmd { async initIndexer ( Indexer: new ( - serverConfig: ServerConfig, + config: { + server: ServerConfig; + upstream: UpstreamConfig; + }, db: DatabaseInterface, clients: Clients, ethProvider: JsonRpcProvider, @@ -117,7 +121,12 @@ export class BaseCmd { assert(this._ethProvider); assert(this._jobQueue); - this._indexer = new Indexer(this._config.server, this._database, this._clients, this._ethProvider, this._jobQueue, graphWatcher); + const config = { + server: this._config.server, + upstream: this._config.upstream + }; + + this._indexer = new Indexer(config, this._database, this._clients, this._ethProvider, this._jobQueue, graphWatcher); await this._indexer.init(); if (graphWatcher) { diff --git a/packages/cli/src/checkpoint/create.ts b/packages/cli/src/checkpoint/create.ts index 86d43829..2d0509ff 100644 --- a/packages/cli/src/checkpoint/create.ts +++ b/packages/cli/src/checkpoint/create.ts @@ -15,7 +15,8 @@ import { ServerConfig, Clients, GraphWatcherInterface, - Config + Config, + UpstreamConfig } from '@cerc-io/util'; import { BaseCmd } from '../base'; @@ -72,7 +73,10 @@ export class CreateCheckpointCmd { async initIndexer ( Indexer: new ( - serverConfig: ServerConfig, + config: { + server: ServerConfig; + upstream: UpstreamConfig; + }, db: DatabaseInterface, clients: Clients, ethProvider: JsonRpcProvider, diff --git a/packages/cli/src/checkpoint/verify.ts b/packages/cli/src/checkpoint/verify.ts index 56922a1e..6ec1bcb4 100644 --- a/packages/cli/src/checkpoint/verify.ts +++ b/packages/cli/src/checkpoint/verify.ts @@ -17,7 +17,8 @@ import { verifyCheckpointData, GraphDatabase, GraphWatcherInterface, - Config + Config, + UpstreamConfig } from '@cerc-io/util'; import { BaseCmd } from '../base'; @@ -73,7 +74,10 @@ export class VerifyCheckpointCmd { async initIndexer ( Indexer: new ( - serverConfig: ServerConfig, + config: { + server: ServerConfig; + upstream: UpstreamConfig; + }, db: DatabaseInterface, clients: Clients, ethProvider: JsonRpcProvider, diff --git a/packages/cli/src/create-state-gql.ts b/packages/cli/src/create-state-gql.ts index 6cd17c1a..bf9caff4 100644 --- a/packages/cli/src/create-state-gql.ts +++ b/packages/cli/src/create-state-gql.ts @@ -27,7 +27,8 @@ import { StateKind, createOrUpdateStateData, getContractEntitiesMap, - prepareEntityStateFromGQLResponse + prepareEntityStateFromGQLResponse, + UpstreamConfig } from '@cerc-io/util'; import { GraphQLClient } from '@cerc-io/ipld-eth-client'; @@ -97,7 +98,10 @@ export class CreateStateFromGQLCmd { async initIndexer ( Indexer: new ( - serverConfig: ServerConfig, + config: { + server: ServerConfig; + upstream: UpstreamConfig; + }, db: DatabaseInterface, clients: Clients, ethProvider: JsonRpcProvider, diff --git a/packages/cli/src/export-state.ts b/packages/cli/src/export-state.ts index 0c724c76..e3cf0a67 100644 --- a/packages/cli/src/export-state.ts +++ b/packages/cli/src/export-state.ts @@ -20,7 +20,8 @@ import { StateKind, Clients, GraphWatcherInterface, - Config + Config, + UpstreamConfig } from '@cerc-io/util'; import { BaseCmd } from './base'; @@ -78,7 +79,10 @@ export class ExportStateCmd { async initIndexer ( Indexer: new ( - serverConfig: ServerConfig, + config: { + server: ServerConfig; + upstream: UpstreamConfig; + }, db: DatabaseInterface, clients: Clients, ethProvider: JsonRpcProvider, diff --git a/packages/cli/src/fill.ts b/packages/cli/src/fill.ts index cdba1263..8957e15a 100644 --- a/packages/cli/src/fill.ts +++ b/packages/cli/src/fill.ts @@ -20,7 +20,8 @@ import { Clients, fillBlocks, GraphWatcherInterface, - Config + Config, + UpstreamConfig } from '@cerc-io/util'; import { BaseCmd } from './base'; @@ -81,7 +82,10 @@ export class FillCmd { async initIndexer ( Indexer: new ( - serverConfig: ServerConfig, + config: { + server: ServerConfig; + upstream: UpstreamConfig; + }, db: DatabaseInterface, clients: Clients, ethProvider: JsonRpcProvider, diff --git a/packages/cli/src/import-state.ts b/packages/cli/src/import-state.ts index 3f536479..e6cafbff 100644 --- a/packages/cli/src/import-state.ts +++ b/packages/cli/src/import-state.ts @@ -23,7 +23,8 @@ import { GraphWatcherInterface, GraphDatabase, updateEntitiesFromState, - Config + Config, + UpstreamConfig } from '@cerc-io/util'; import { BaseCmd } from './base'; @@ -80,7 +81,10 @@ export class ImportStateCmd { async initIndexer ( Indexer: new ( - serverConfig: ServerConfig, + config: { + server: ServerConfig; + upstream: UpstreamConfig; + }, db: DatabaseInterface, clients: Clients, ethProvider: JsonRpcProvider, diff --git a/packages/cli/src/index-block.ts b/packages/cli/src/index-block.ts index a0c95fd7..6fabdc0b 100644 --- a/packages/cli/src/index-block.ts +++ b/packages/cli/src/index-block.ts @@ -17,7 +17,8 @@ import { Clients, indexBlock, GraphWatcherInterface, - Config + Config, + UpstreamConfig } from '@cerc-io/util'; import { BaseCmd } from './base'; @@ -72,7 +73,10 @@ export class IndexBlockCmd { async initIndexer ( Indexer: new ( - serverConfig: ServerConfig, + config: { + server: ServerConfig; + upstream: UpstreamConfig; + }, db: DatabaseInterface, clients: Clients, ethProvider: JsonRpcProvider, diff --git a/packages/cli/src/inspect-cid.ts b/packages/cli/src/inspect-cid.ts index 70ac870d..be77437f 100644 --- a/packages/cli/src/inspect-cid.ts +++ b/packages/cli/src/inspect-cid.ts @@ -18,7 +18,8 @@ import { ServerConfig, Clients, GraphWatcherInterface, - Config + Config, + UpstreamConfig } from '@cerc-io/util'; import { BaseCmd } from './base'; @@ -75,7 +76,10 @@ export class InspectCIDCmd { async initIndexer ( Indexer: new ( - serverConfig: ServerConfig, + config: { + server: ServerConfig; + upstream: UpstreamConfig; + }, db: DatabaseInterface, clients: Clients, ethProvider: JsonRpcProvider, diff --git a/packages/cli/src/job-runner.ts b/packages/cli/src/job-runner.ts index 959ddd2f..921cb19a 100644 --- a/packages/cli/src/job-runner.ts +++ b/packages/cli/src/job-runner.ts @@ -19,7 +19,8 @@ import { JobRunner, GraphWatcherInterface, startMetricsServer, - Config + Config, + UpstreamConfig } from '@cerc-io/util'; import { BaseCmd } from './base'; @@ -81,7 +82,10 @@ export class JobRunnerCmd { async initIndexer ( Indexer: new ( - serverConfig: ServerConfig, + config: { + server: ServerConfig; + upstream: UpstreamConfig; + }, db: DatabaseInterface, clients: Clients, ethProvider: JsonRpcProvider, diff --git a/packages/cli/src/reset/watcher.ts b/packages/cli/src/reset/watcher.ts index 1257f509..9e60b466 100644 --- a/packages/cli/src/reset/watcher.ts +++ b/packages/cli/src/reset/watcher.ts @@ -15,7 +15,8 @@ import { ServerConfig, Clients, GraphWatcherInterface, - Config + Config, + UpstreamConfig } from '@cerc-io/util'; import { BaseCmd } from '../base'; @@ -71,7 +72,10 @@ export class ResetWatcherCmd { async initIndexer ( Indexer: new ( - serverConfig: ServerConfig, + config: { + server: ServerConfig; + upstream: UpstreamConfig; + }, db: DatabaseInterface, clients: Clients, ethProvider: JsonRpcProvider, diff --git a/packages/cli/src/server.ts b/packages/cli/src/server.ts index 9a970033..c96a8ab4 100644 --- a/packages/cli/src/server.ts +++ b/packages/cli/src/server.ts @@ -28,7 +28,8 @@ import { Config, PaymentsManager, Consensus, - readParty + readParty, + UpstreamConfig } from '@cerc-io/util'; import { TypeSource } from '@graphql-tools/utils'; import type { @@ -111,7 +112,10 @@ export class ServerCmd { async initIndexer ( Indexer: new ( - serverConfig: ServerConfig, + config: { + server: ServerConfig; + upstream: UpstreamConfig; + }, db: DatabaseInterface, clients: Clients, ethProvider: JsonRpcProvider, diff --git a/packages/cli/src/watch-contract.ts b/packages/cli/src/watch-contract.ts index 6f443690..3847d24c 100644 --- a/packages/cli/src/watch-contract.ts +++ b/packages/cli/src/watch-contract.ts @@ -16,7 +16,8 @@ import { ServerConfig, Clients, GraphWatcherInterface, - Config + Config, + UpstreamConfig } from '@cerc-io/util'; import { BaseCmd } from './base'; @@ -74,7 +75,10 @@ export class WatchContractCmd { async initIndexer ( Indexer: new ( - serverConfig: ServerConfig, + config: { + server: ServerConfig; + upstream: UpstreamConfig; + }, db: DatabaseInterface, clients: Clients, ethProvider: JsonRpcProvider, diff --git a/packages/codegen/src/templates/config-template.handlebars b/packages/codegen/src/templates/config-template.handlebars index 8576d7a6..205aa6a9 100644 --- a/packages/codegen/src/templates/config-template.handlebars +++ b/packages/codegen/src/templates/config-template.handlebars @@ -23,17 +23,6 @@ clearEntitiesCacheInterval = 1000 {{/if}} - # Boolean to filter logs by contracts. - filterLogsByAddresses = false - - # Boolean to filter logs by topics. - filterLogsByTopics = false - - # Boolean to switch between modes of processing events when starting the server. - # Setting to true will fetch filtered events and required blocks in a range of blocks and then process them. - # Setting to false will fetch blocks consecutively with its events and then process them (Behaviour is followed in realtime processing near head). - useBlockRanges = true - # Max block range for which to return events in eventsInRange GQL query. # Use -1 for skipping check on block range. maxEventsBlockRange = 1000 @@ -75,9 +64,17 @@ gqlApiEndpoint = "http://127.0.0.1:8082/graphql" rpcProviderEndpoint = "http://127.0.0.1:8081" - # Flag to specify if rpc-eth-client should be used from RPC endpoint instead of ipld-eth-client (ipld-eth-server GQL client) + # Boolean flag to specify if rpc-eth-client should be used for RPC endpoint instead of ipld-eth-client (ipld-eth-server GQL client) rpcClient = false + # Boolean flag to specify if rpcProviderEndpoint is an FEVM RPC endpoint + isFEVM = false + + # Boolean flag to filter event logs by contracts + filterLogsByAddresses = false + # Boolean flag to filter event logs by topics + filterLogsByTopics = false + [upstream.cache] name = "requests" enabled = false @@ -93,6 +90,11 @@ prefetchBlocksInMem = true prefetchBlockCount = 10 + # Boolean to switch between modes of processing events when starting the server. + # Setting to true will fetch filtered events and required blocks in a range of blocks and then process them. + # Setting to false will fetch blocks consecutively with its events and then process them (Behaviour is followed in realtime processing near head). + useBlockRanges = true + # Block range in which logs are fetched during historical blocks processing historicalLogsBlockRange = 2000 diff --git a/packages/codegen/src/templates/indexer-template.handlebars b/packages/codegen/src/templates/indexer-template.handlebars index 1b4c06b5..d3af7c63 100644 --- a/packages/codegen/src/templates/indexer-template.handlebars +++ b/packages/codegen/src/templates/indexer-template.handlebars @@ -42,7 +42,8 @@ import { getResultEvent, DatabaseInterface, Clients, - EthClient + EthClient, + UpstreamConfig } from '@cerc-io/util'; {{#if (subgraphPath)}} import { GraphWatcher } from '@cerc-io/graph-node'; @@ -89,6 +90,7 @@ export class Indexer implements IndexerInterface { _ethProvider: BaseProvider; _baseIndexer: BaseIndexer; _serverConfig: ServerConfig; + _upstreamConfig: UpstreamConfig; {{#if (subgraphPath)}} _graphWatcher: GraphWatcher; {{/if}} @@ -105,15 +107,28 @@ export class Indexer implements IndexerInterface { _subgraphStateMap: Map; {{/if}} - constructor (serverConfig: ServerConfig, db: DatabaseInterface, clients: Clients, ethProvider: BaseProvider, jobQueue: JobQueue{{#if (subgraphPath)}}, graphWatcher?: GraphWatcherInterface{{/if}}) { + constructor ( + config: { + server: ServerConfig; + upstream: UpstreamConfig; + }, + db: DatabaseInterface, + clients: Clients, + ethProvider: BaseProvider, + jobQueue: JobQueue{{#if (subgraphPath)}},{{/if}} + {{#if (subgraphPath)}} + graphWatcher?: GraphWatcherInterface + {{/if}} + ) { assert(db); assert(clients.ethClient); this._db = db as Database; this._ethClient = clients.ethClient; this._ethProvider = ethProvider; - this._serverConfig = serverConfig; - this._baseIndexer = new BaseIndexer(this._serverConfig, this._db, this._ethClient, this._ethProvider, jobQueue); + this._serverConfig = config.server; + this._upstreamConfig = config.upstream; + this._baseIndexer = new BaseIndexer(config, this._db, this._ethClient, this._ethProvider, jobQueue); {{#if (subgraphPath)}} assert(graphWatcher); this._graphWatcher = graphWatcher as GraphWatcher; @@ -163,6 +178,10 @@ export class Indexer implements IndexerInterface { return this._serverConfig; } + get upstreamConfig (): UpstreamConfig { + return this._upstreamConfig; + } + get storageLayoutMap (): Map { return this._storageLayoutMap; } diff --git a/packages/graph-node/test/utils/indexer.ts b/packages/graph-node/test/utils/indexer.ts index b77d7ad1..5d256cf3 100644 --- a/packages/graph-node/test/utils/indexer.ts +++ b/packages/graph-node/test/utils/indexer.ts @@ -17,7 +17,8 @@ import { getResultEvent, ResultEvent, StateKind, - EthClient + EthClient, + UpstreamConfig } from '@cerc-io/util'; import { GetStorageAt, getStorageValue, MappingKey, StorageLayout } from '@cerc-io/solidity-mapper'; @@ -37,6 +38,10 @@ export class Indexer implements IndexerInterface { return {} as ServerConfig; } + get upstreamConfig () { + return {} as UpstreamConfig; + } + get storageLayoutMap (): Map { return this._storageLayoutMap; } diff --git a/packages/util/src/common.ts b/packages/util/src/common.ts index 571f1d48..d1f8cba5 100644 --- a/packages/util/src/common.ts +++ b/packages/util/src/common.ts @@ -427,7 +427,7 @@ const _processEventsInSubgraphOrder = async (indexer: IndexerInterface, block: B isNewContractWatched = true; // Check if filterLogsByAddresses is set to true - if (indexer.serverConfig.filterLogsByAddresses) { + if (indexer.upstreamConfig.ethServer.filterLogsByAddresses) { // Fetch and parse events for newly watched contracts const newContracts = watchedContracts.filter(contract => !initiallyWatchedContracts.includes(contract)); const events = await indexer.fetchEventsForContracts(block.blockHash, block.blockNumber, newContracts); diff --git a/packages/util/src/config.ts b/packages/util/src/config.ts index c30e43da..718d0c1f 100644 --- a/packages/util/src/config.ts +++ b/packages/util/src/config.ts @@ -29,6 +29,10 @@ export interface JobQueueConfig { // Max block range of historical processing after which it waits for completion of events processing // If set to -1 historical processing does not wait for events processing and completes till latest canonical block historicalMaxFetchAhead?: number; + // Boolean to switch between modes of processing events when starting the server + // Setting to true will fetch filtered events and required blocks in a range of blocks and then process them + // Setting to false will fetch blocks consecutively with its events and then process them (Behaviour is followed in realtime processing near head) + useBlockRanges: boolean; } export interface GQLCacheConfig { @@ -205,26 +209,19 @@ export interface ServerConfig { subgraphPath: string; enableState: boolean; wasmRestartBlocksInterval: number; - filterLogsByAddresses: boolean; - filterLogsByTopics: boolean; maxEventsBlockRange: number; clearEntitiesCacheInterval: number; - // Boolean to switch between modes of processing events when starting the server. - // Setting to true will fetch filtered events and required blocks in a range of blocks and then process them. - // Setting to false will fetch blocks consecutively with its events and then process them (Behaviour is followed in realtime processing near head). - useBlockRanges: boolean; - - // Boolean to skip updating entity fields required in state creation and not required in the frontend. + // Boolean to skip updating entity fields required in state creation and not required in the frontend skipStateFieldsUpdate: boolean; - // Max GQL API requests to process simultaneously (defaults to 1). + // Max GQL API requests to process simultaneously (defaults to 1) maxSimultaneousRequests?: number; - // Max GQL API requests in queue until reject (defaults to -1, means do not reject). + // Max GQL API requests in queue until reject (defaults to -1, means do not reject) maxRequestQueueLimit?: number; - // Boolean to load GQL query nested entity relations sequentially. + // Boolean to load GQL query nested entity relations sequentially loadRelationsSequential: boolean; // GQL cache-control max-age settings (in seconds) @@ -260,7 +257,14 @@ export interface UpstreamConfig { gqlApiEndpoint: string; rpcProviderEndpoint: string; rpcProviderMutationEndpoint: string; + // Boolean flag to specify if rpc-eth-client should be used for RPC endpoint instead of ipld-eth-client (ipld-eth-server GQL client) rpcClient: boolean; + // Boolean flag to specify if rpcProviderEndpoint is an FEVM RPC endpoint + isFEVM: boolean; + // Boolean flag to filter event logs by contracts + filterLogsByAddresses: boolean; + // Boolean flag to filter event logs by topics + filterLogsByTopics: boolean; payments: EthServerPaymentsConfig; } traceProviderEndpoint: string; diff --git a/packages/util/src/events.ts b/packages/util/src/events.ts index 8cd55561..16e81fdf 100644 --- a/packages/util/src/events.ts +++ b/packages/util/src/events.ts @@ -103,7 +103,7 @@ export class EventWatcher { // Check if filter for logs is enabled // Check if starting block for watcher is before latest canonical block - if (this._config.server.useBlockRanges && startBlockNumber < latestCanonicalBlockNumber) { + if (this._config.jobQueue.useBlockRanges && startBlockNumber < latestCanonicalBlockNumber) { let endBlockNumber = latestCanonicalBlockNumber; const historicalMaxFetchAhead = this._config.jobQueue.historicalMaxFetchAhead ?? DEFAULT_HISTORICAL_MAX_FETCH_AHEAD; diff --git a/packages/util/src/indexer.ts b/packages/util/src/indexer.ts index 60220d36..0737ebd2 100644 --- a/packages/util/src/indexer.ts +++ b/packages/util/src/indexer.ts @@ -27,7 +27,7 @@ import { import { UNKNOWN_EVENT_NAME, JOB_KIND_CONTRACT, QUEUE_EVENT_PROCESSING, DIFF_MERGE_BATCH_SIZE } from './constants'; import { JobQueue } from './job-queue'; import { Where, QueryOptions } from './database'; -import { ServerConfig } from './config'; +import { ServerConfig, UpstreamConfig } from './config'; import { createOrUpdateStateData, StateDataMeta } from './state-helper'; const DEFAULT_MAX_EVENTS_BLOCK_RANGE = 1000; @@ -90,6 +90,7 @@ export type ResultEvent = { export class Indexer { _serverConfig: ServerConfig; + _upstreamConfig: UpstreamConfig; _db: DatabaseInterface; _ethClient: EthClient; _getStorageAt: GetStorageAt; @@ -100,13 +101,17 @@ export class Indexer { _stateStatusMap: { [key: string]: StateStatus } = {}; constructor ( - serverConfig: ServerConfig, + config: { + server: ServerConfig; + upstream: UpstreamConfig; + }, db: DatabaseInterface, ethClient: EthClient, ethProvider: ethers.providers.BaseProvider, jobQueue: JobQueue ) { - this._serverConfig = serverConfig; + this._serverConfig = config.server; + this._upstreamConfig = config.upstream; this._db = db; this._ethClient = ethClient; this._ethProvider = ethProvider; @@ -1289,14 +1294,14 @@ export class Indexer { let addresses: string[] | undefined; let eventSignatures: string[] | undefined; - if (this._serverConfig.filterLogsByAddresses) { + if (this._upstreamConfig.ethServer.filterLogsByAddresses) { const watchedContracts = this.getWatchedContracts(); addresses = watchedContracts.map((watchedContract): string => { return watchedContract.address; }); } - if (this._serverConfig.filterLogsByTopics) { + if (this._upstreamConfig.ethServer.filterLogsByTopics && !this._upstreamConfig.ethServer.isFEVM) { const eventSignaturesSet = new Set(); eventSignaturesMap.forEach(sigs => sigs.forEach(sig => { eventSignaturesSet.add(sig); diff --git a/packages/util/src/job-runner.ts b/packages/util/src/job-runner.ts index 4f35b567..ef5333ff 100644 --- a/packages/util/src/job-runner.ts +++ b/packages/util/src/job-runner.ts @@ -572,7 +572,7 @@ export class JobRunner { this._blockAndEventsMap.delete(block.blockHash); // Check if new contract was added and filterLogsByAddresses is set to true - if (isNewContractWatched && this._indexer.serverConfig.filterLogsByAddresses) { + if (isNewContractWatched && this._indexer.upstreamConfig.ethServer.filterLogsByAddresses) { // Delete jobs for any pending events processing await this.jobQueue.deleteJobs(QUEUE_EVENT_PROCESSING); diff --git a/packages/util/src/types.ts b/packages/util/src/types.ts index 1aba64c8..878b9bd8 100644 --- a/packages/util/src/types.ts +++ b/packages/util/src/types.ts @@ -6,7 +6,7 @@ import { Connection, DeepPartial, EntityTarget, FindConditions, FindManyOptions, import { MappingKey, StorageLayout } from '@cerc-io/solidity-mapper'; -import { ServerConfig } from './config'; +import { ServerConfig, UpstreamConfig } from './config'; import { Where, QueryOptions, Database } from './database'; import { ValueResult, StateStatus } from './indexer'; @@ -81,6 +81,7 @@ export interface StateInterface { export interface IndexerInterface { readonly serverConfig: ServerConfig + readonly upstreamConfig: UpstreamConfig readonly storageLayoutMap: Map init (): Promise getBlockProgress (blockHash: string): Promise