From 13addd8241d27397d49fa97fca5c7ded2c8dfc8b Mon Sep 17 00:00:00 2001 From: Nabarun Date: Thu, 20 Jun 2024 16:36:55 +0530 Subject: [PATCH] Refactor methods for switching client to indexer --- packages/cli/src/base.ts | 2 +- packages/cli/src/job-runner.ts | 34 ++--------- packages/cli/src/utils/index.ts | 10 +-- .../src/templates/config-template.handlebars | 8 +-- packages/util/src/config.ts | 6 +- packages/util/src/indexer.ts | 61 ++++++++++++++++++- packages/util/src/job-runner.ts | 53 ++++++---------- packages/util/src/metrics.ts | 18 +++--- packages/util/src/types.ts | 5 +- 9 files changed, 101 insertions(+), 96 deletions(-) diff --git a/packages/cli/src/base.ts b/packages/cli/src/base.ts index 290786ce..85eb1f5b 100644 --- a/packages/cli/src/base.ts +++ b/packages/cli/src/base.ts @@ -96,7 +96,7 @@ export class BaseCmd { this._jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs }); await this._jobQueue.start(); - const { ethClient, ethProvider } = await initClients(this._config); + const { ethClient, ethProvider } = await initClients(this._config.upstream); this._ethProvider = ethProvider; this._clients = { ethClient, ...clients }; } diff --git a/packages/cli/src/job-runner.ts b/packages/cli/src/job-runner.ts index 7a49db44..1e18f243 100644 --- a/packages/cli/src/job-runner.ts +++ b/packages/cli/src/job-runner.ts @@ -7,7 +7,6 @@ import { hideBin } from 'yargs/helpers'; import 'reflect-metadata'; import assert from 'assert'; import { ConnectionOptions } from 'typeorm'; -import debug from 'debug'; import { JsonRpcProvider } from '@ethersproject/providers'; import { @@ -21,14 +20,10 @@ import { GraphWatcherInterface, startMetricsServer, Config, - UpstreamConfig, - setActiveUpstreamEndpointMetric + UpstreamConfig } from '@cerc-io/util'; import { BaseCmd } from './base'; -import { initClients } from './utils/index'; - -const log = debug('vulcanize:job-runner'); interface Arguments { configFile: string; @@ -38,10 +33,6 @@ export class JobRunnerCmd { _argv?: Arguments; _baseCmd: BaseCmd; - _currentEndpointIndex = { - rpcProviderEndpoint: 0 - }; - constructor () { this._baseCmd = new BaseCmd(); } @@ -122,25 +113,8 @@ export class JobRunnerCmd { const jobRunner = new JobRunner( config.jobQueue, indexer, - jobQueue, - config.upstream.ethServer.rpcProviderEndpoints[this._currentEndpointIndex.rpcProviderEndpoint], - async (): Promise => { - const oldRpcEndpoint = config.upstream.ethServer.rpcProviderEndpoints[this._currentEndpointIndex.rpcProviderEndpoint]; - ++this._currentEndpointIndex.rpcProviderEndpoint; - - if (this._currentEndpointIndex.rpcProviderEndpoint === config.upstream.ethServer.rpcProviderEndpoints.length) { - this._currentEndpointIndex.rpcProviderEndpoint = 0; - } - - const { ethClient, ethProvider } = await initClients(config, this._currentEndpointIndex); - indexer.switchClients({ ethClient, ethProvider }); - setActiveUpstreamEndpointMetric(config, this._currentEndpointIndex.rpcProviderEndpoint); - - const newRpcEndpoint = ethProvider.connection.url; - log(`Switching RPC endpoint from ${oldRpcEndpoint} to endpoint ${newRpcEndpoint}`); - - return newRpcEndpoint; - }); + jobQueue + ); // Delete all active and pending (before completed) jobs to start job-runner without old queued jobs await jobRunner.jobQueue.deleteAllJobs('completed'); @@ -151,7 +125,7 @@ export class JobRunnerCmd { await startJobRunner(jobRunner); jobRunner.handleShutdown(); - await startMetricsServer(config, jobQueue, indexer, this._currentEndpointIndex); + await startMetricsServer(config, jobQueue, indexer); } _getArgv (): any { diff --git a/packages/cli/src/utils/index.ts b/packages/cli/src/utils/index.ts index d2b25ea0..09d52378 100644 --- a/packages/cli/src/utils/index.ts +++ b/packages/cli/src/utils/index.ts @@ -9,7 +9,7 @@ import { providers } from 'ethers'; // @ts-expect-error https://github.com/microsoft/TypeScript/issues/49721#issuecomment-1319854183 import { PeerIdObj } from '@cerc-io/peer'; -import { Config, EthClient, getCustomProvider } from '@cerc-io/util'; +import { EthClient, UpstreamConfig, getCustomProvider } from '@cerc-io/util'; import { getCache } from '@cerc-io/cache'; import { EthClient as GqlEthClient } from '@cerc-io/ipld-eth-client'; import { EthClient as RpcEthClient } from '@cerc-io/rpc-eth-client'; @@ -22,16 +22,10 @@ export function readPeerId (filePath: string): PeerIdObj { return JSON.parse(peerIdJson); } -export const initClients = async (config: Config, endpointIndexes = { rpcProviderEndpoint: 0 }): Promise<{ +export const initClients = async (upstreamConfig: UpstreamConfig, endpointIndexes = { rpcProviderEndpoint: 0 }): Promise<{ ethClient: EthClient, ethProvider: providers.JsonRpcProvider }> => { - const { database: dbConfig, upstream: upstreamConfig, server: serverConfig } = config; - - assert(serverConfig, 'Missing server config'); - assert(dbConfig, 'Missing database config'); - assert(upstreamConfig, 'Missing upstream config'); - const { ethServer: { gqlApiEndpoint, rpcProviderEndpoints, rpcClient = false }, cache: cacheConfig } = upstreamConfig; assert(rpcProviderEndpoints, 'Missing upstream ethServer.rpcProviderEndpoints'); diff --git a/packages/codegen/src/templates/config-template.handlebars b/packages/codegen/src/templates/config-template.handlebars index 7cedadef..50f513f7 100644 --- a/packages/codegen/src/templates/config-template.handlebars +++ b/packages/codegen/src/templates/config-template.handlebars @@ -83,6 +83,10 @@ # Boolean flag to filter event logs by topics filterLogsByTopics = true + # Switch clients if eth_getLogs call takes more than threshold (in secs) + # Set to 0 for disabling switching + getLogsClientSwitchThresholdInSecs = 0 + [upstream.cache] name = "requests" enabled = false @@ -96,10 +100,6 @@ subgraphEventsOrder = true blockDelayInMilliSecs = 2000 - // Switch clients if eth_getLogs call takes more than threshold (in secs) - // (set to 0 to disable switching) - getLogsClientSwitchThresholdInSecs = 0 - # Number of blocks by which block processing lags behind head blockProcessingOffset = 0 diff --git a/packages/util/src/config.ts b/packages/util/src/config.ts index fa41d447..b36fbade 100644 --- a/packages/util/src/config.ts +++ b/packages/util/src/config.ts @@ -23,9 +23,6 @@ export interface JobQueueConfig { subgraphEventsOrder: boolean; blockDelayInMilliSecs: number; - // Switch clients if eth_getLogs call takes more than threshold (in secs) - getLogsClientSwitchThresholdInSecs?: number; - // Number of blocks by which block processing lags behind head (default: 0) blockProcessingOffset?: number; @@ -292,6 +289,9 @@ export interface UpstreamConfig { // Boolean flag to filter event logs by topics filterLogsByTopics: boolean; + // Switch clients if eth_getLogs call takes more than threshold (in secs) + getLogsClientSwitchThresholdInSecs?: number; + payments: EthServerPaymentsConfig; } traceProviderEndpoint: string; diff --git a/packages/util/src/indexer.ts b/packages/util/src/indexer.ts index 6fe500ea..fdd1f438 100644 --- a/packages/util/src/indexer.ts +++ b/packages/util/src/indexer.ts @@ -33,6 +33,7 @@ import { JobQueue } from './job-queue'; import { Where, QueryOptions, BlockHeight } from './database'; import { ServerConfig, UpstreamConfig } from './config'; import { createOrUpdateStateData, StateDataMeta } from './state-helper'; +import { ethRpcRequestDuration, setActiveUpstreamEndpointMetric } from './metrics'; const DEFAULT_MAX_EVENTS_BLOCK_RANGE = 1000; @@ -113,12 +114,16 @@ export class Indexer { _db: DatabaseInterface; _ethClient: EthClient; _getStorageAt: GetStorageAt; - _ethProvider: ethers.providers.BaseProvider; + _ethProvider: ethers.providers.JsonRpcProvider; _jobQueue: JobQueue; _watchedContracts: { [key: string]: ContractInterface } = {}; _stateStatusMap: { [key: string]: StateStatus } = {}; + _currentEndpointIndex = { + rpcProviderEndpoint: 0 + }; + constructor ( config: { server: ServerConfig; @@ -126,7 +131,7 @@ export class Indexer { }, db: DatabaseInterface, ethClient: EthClient, - ethProvider: ethers.providers.BaseProvider, + ethProvider: ethers.providers.JsonRpcProvider, jobQueue: JobQueue ) { this._serverConfig = config.server; @@ -136,11 +141,61 @@ export class Indexer { this._ethProvider = ethProvider; this._jobQueue = jobQueue; this._getStorageAt = this._ethClient.getStorageAt.bind(this._ethClient); + + setActiveUpstreamEndpointMetric( + this._upstreamConfig, + this._currentEndpointIndex.rpcProviderEndpoint + ); } - switchClients ({ ethClient, ethProvider }: { ethClient: EthClient, ethProvider: ethers.providers.BaseProvider }): void { + async switchClients ( + initClients: (upstreamConfig: UpstreamConfig, endpointIndexes: typeof this._currentEndpointIndex) => Promise<{ + ethClient: EthClient, + ethProvider: ethers.providers.JsonRpcProvider + }> + ): Promise<{ ethClient: EthClient, ethProvider: ethers.providers.JsonRpcProvider }> { + const oldRpcEndpoint = this._upstreamConfig.ethServer.rpcProviderEndpoints[this._currentEndpointIndex.rpcProviderEndpoint]; + ++this._currentEndpointIndex.rpcProviderEndpoint; + + if (this._currentEndpointIndex.rpcProviderEndpoint === this._upstreamConfig.ethServer.rpcProviderEndpoints.length) { + this._currentEndpointIndex.rpcProviderEndpoint = 0; + } + + const { ethClient, ethProvider } = await initClients(this._upstreamConfig, this._currentEndpointIndex); + setActiveUpstreamEndpointMetric( + this._upstreamConfig, + this._currentEndpointIndex.rpcProviderEndpoint + ); + + const newRpcEndpoint = ethProvider.connection.url; + log(`Switching RPC endpoint from ${oldRpcEndpoint} to endpoint ${newRpcEndpoint}`); + this._ethClient = ethClient; this._ethProvider = ethProvider; + return { ethClient, ethProvider }; + } + + async isGetLogsRequestsSlow (): Promise { + const threshold = this._upstreamConfig.ethServer.getLogsClientSwitchThresholdInSecs; + + if (threshold) { + const getLogsLabels = { + method: 'eth_getLogs', + provider: this._ethProvider.connection.url + }; + + const ethRpcRequestDurationMetrics = await ethRpcRequestDuration.get(); + + const currentProviderDuration = ethRpcRequestDurationMetrics.values.find( + val => val.labels.method === getLogsLabels.method && val.labels.provider === getLogsLabels.provider + ); + + if (currentProviderDuration) { + return currentProviderDuration.value > threshold; + } + } + + return false; } async fetchContracts (): Promise { diff --git a/packages/util/src/job-runner.ts b/packages/util/src/job-runner.ts index f6c81402..f544c290 100644 --- a/packages/util/src/job-runner.ts +++ b/packages/util/src/job-runner.ts @@ -32,7 +32,7 @@ import { fetchAndSaveFilteredLogsAndBlocks, NEW_BLOCK_MAX_RETRIES_ERROR } from './common'; -import { ethRpcRequestDuration, isSyncingHistoricalBlocks, lastBlockNumEvents, lastBlockProcessDuration, lastProcessedBlockNumber } from './metrics'; +import { isSyncingHistoricalBlocks, lastBlockNumEvents, lastBlockProcessDuration, lastProcessedBlockNumber } from './metrics'; const log = debug('vulcanize:job-runner'); @@ -64,22 +64,14 @@ export class JobRunner { _signalCount = 0; _errorInEventsProcessing = false; - _currentRpcProviderEndpoint: string; - _switchClients: () => Promise; - constructor ( jobQueueConfig: JobQueueConfig, indexer: IndexerInterface, - jobQueue: JobQueue, - currentRpcProviderEndpoint: string, - // eslint-disable-next-line @typescript-eslint/no-empty-function - switchClients: () => Promise = async () => '' + jobQueue: JobQueue ) { this._indexer = indexer; this.jobQueue = jobQueue; this._jobQueueConfig = jobQueueConfig; - this._currentRpcProviderEndpoint = currentRpcProviderEndpoint; - this._switchClients = switchClients; } async subscribeBlockProcessingQueue (): Promise { @@ -192,7 +184,9 @@ export class JobRunner { } // Switch clients if getLogs requests are too slow - await this._switchClientOnSlowGetLogsRequests(); + if (await this._indexer.isGetLogsRequestsSlow()) { + await this._indexer.switchClients(); + } break; } @@ -300,7 +294,9 @@ export class JobRunner { this._historicalProcessingCompletedUpto = endBlock; // Switch clients if getLogs requests are too slow - await this._switchClientOnSlowGetLogsRequests(); + if (await this._indexer.isGetLogsRequestsSlow()) { + await this._indexer.switchClients(); + } if (endBlock < processingEndBlockNumber) { // If endBlock is lesser than processingEndBlockNumber push new historical job @@ -800,29 +796,18 @@ export class JobRunner { } async _jobErrorHandler (error: any): Promise { - // Check if it is a server error or timeout from ethers.js - // https://docs.ethers.org/v5/api/utils/logger/#errors--server-error - // https://docs.ethers.org/v5/api/utils/logger/#errors--timeout - if (error.code === ethersErrors.SERVER_ERROR || error.code === ethersErrors.TIMEOUT || error.message === NEW_BLOCK_MAX_RETRIES_ERROR) { + if ( + // Switch client if it is a server error from ethers.js + // https://docs.ethers.org/v5/api/utils/logger/#errors--server-error + error.code === ethersErrors.SERVER_ERROR || + // Switch client if it is a timeout error from ethers.js + // https://docs.ethers.org/v5/api/utils/logger/#errors--timeout + error.code === ethersErrors.TIMEOUT || + // Switch client if error is for max retries to get new block at head + error.message === NEW_BLOCK_MAX_RETRIES_ERROR + ) { log('RPC endpoint is not working; failing over to another one'); - this._currentRpcProviderEndpoint = await this._switchClients(); - } - } - - async _switchClientOnSlowGetLogsRequests (): Promise { - const threshold = this._jobQueueConfig.getLogsClientSwitchThresholdInSecs; - if (threshold) { - const getLogsLabels = { - method: 'eth_getLogs', - provider: this._currentRpcProviderEndpoint - }; - - const ethRpcRequestDurationMetrics = await ethRpcRequestDuration.get(); - const currentProviderDuration = ethRpcRequestDurationMetrics.values.find(val => val.labels.method === getLogsLabels.method && val.labels.provider === getLogsLabels.provider); - if (currentProviderDuration && currentProviderDuration.value > threshold) { - log(`eth_getLogs call with current RPC endpoint took too long (${currentProviderDuration.value} sec); switching over to another one`); - this._currentRpcProviderEndpoint = await this._switchClients(); - } + await this._indexer.switchClients(); } } diff --git a/packages/util/src/metrics.ts b/packages/util/src/metrics.ts index 80d31fc7..2a42b50e 100644 --- a/packages/util/src/metrics.ts +++ b/packages/util/src/metrics.ts @@ -10,7 +10,7 @@ import assert from 'assert'; import { ethers } from 'ethers'; import JsonRpcProvider = ethers.providers.JsonRpcProvider; -import { Config } from './config'; +import { Config, UpstreamConfig } from './config'; import { IndexerInterface } from './types'; import { JobQueue } from './job-queue'; @@ -103,7 +103,7 @@ const upstreamEndpointsMetric = new client.Gauge({ // Export metrics on a server const app: Application = express(); -export const startMetricsServer = async (config: Config, jobQueue: JobQueue, indexer: IndexerInterface, endpointIndexes = { rpcProviderEndpoint: 0 }): Promise => { +export const startMetricsServer = async (config: Config, jobQueue: JobQueue, indexer: IndexerInterface): Promise => { if (!config.metrics) { log('Metrics is disabled. To enable add metrics host and port.'); return; @@ -134,11 +134,9 @@ export const startMetricsServer = async (config: Config, jobQueue: JobQueue, ind await registerWatcherConfigMetrics(config); - setActiveUpstreamEndpointMetric(config, endpointIndexes.rpcProviderEndpoint); - await registerDBSizeMetrics(config); - await registerUpstreamChainHeadMetrics(config, endpointIndexes.rpcProviderEndpoint); + await registerUpstreamChainHeadMetrics(); await registerWatcherInfoMetrics(); @@ -159,14 +157,14 @@ export const startMetricsServer = async (config: Config, jobQueue: JobQueue, ind // ETH RPC provider used for upstream chain head metrics let ethRpcProvider: JsonRpcProvider | undefined; -export const setActiveUpstreamEndpointMetric = ({ upstream }: Config, currentEndpointIndex: number): void => { - const endpoints = upstream.ethServer.rpcProviderEndpoints; +export const setActiveUpstreamEndpointMetric = (upstreamConfig: UpstreamConfig, currentEndpointIndex: number): void => { + const endpoints = upstreamConfig.ethServer.rpcProviderEndpoints; endpoints.forEach((endpoint, index) => { upstreamEndpointsMetric.set({ provider: endpoint }, Number(index === currentEndpointIndex)); }); - ethRpcProvider = new JsonRpcProvider(upstream.ethServer.rpcProviderEndpoints[currentEndpointIndex]); + ethRpcProvider = new JsonRpcProvider(upstreamConfig.ethServer.rpcProviderEndpoints[currentEndpointIndex]); }; const registerDBSizeMetrics = async ({ database, jobQueue }: Config): Promise => { @@ -204,9 +202,7 @@ const registerDBSizeMetrics = async ({ database, jobQueue }: Config): Promise => { - ethRpcProvider = new JsonRpcProvider(upstream.ethServer.rpcProviderEndpoints[rpcProviderEndpointIndex]); - +const registerUpstreamChainHeadMetrics = async (): Promise => { // eslint-disable-next-line no-new new client.Gauge({ name: 'latest_upstream_block_number', diff --git a/packages/util/src/types.ts b/packages/util/src/types.ts index 78888491..47943366 100644 --- a/packages/util/src/types.ts +++ b/packages/util/src/types.ts @@ -3,7 +3,7 @@ // import { Connection, DeepPartial, EntityTarget, FindConditions, FindManyOptions, ObjectLiteral, QueryRunner } from 'typeorm'; -import { Transaction, providers } from 'ethers'; +import { Transaction } from 'ethers'; import { MappingKey, StorageLayout } from '@cerc-io/solidity-mapper'; @@ -236,7 +236,8 @@ export interface IndexerInterface { clearProcessedBlockData (block: BlockProgressInterface): Promise getResultEvent (event: EventInterface): any getFullTransactions (txHashList: string[]): Promise - switchClients (clients: { ethClient: EthClient, ethProvider: providers.BaseProvider }): void + isGetLogsRequestsSlow(): Promise + switchClients(): Promise } export interface DatabaseInterface {