From b9a899aec1a273fda54ff8126bdeafa8d2bfcc55 Mon Sep 17 00:00:00 2001 From: Nabarun Gogoi Date: Thu, 20 Jun 2024 17:57:01 +0530 Subject: [PATCH] Implement switching endpoints after slow `eth_getLogs` RPC requests (#525) * Switch upstream endpoint if getLogs requests are too slow * Refactor methods for switching client to indexer * Update codegen indexer template * Add dummy methods in graph-node test Indexer * Upgrade package versions to 0.2.101 --------- Co-authored-by: Prathamesh Musale --- lerna.json | 2 +- packages/cache/package.json | 2 +- packages/cli/package.json | 12 ++-- packages/cli/src/base.ts | 2 +- packages/cli/src/job-runner.ts | 37 ++--------- packages/cli/src/utils/index.ts | 10 +-- packages/codegen/package.json | 4 +- .../src/templates/config-template.handlebars | 4 ++ .../src/templates/indexer-template.handlebars | 16 +++-- .../src/templates/package-template.handlebars | 10 +-- packages/graph-node/package.json | 10 +-- packages/graph-node/test/utils/indexer.ts | 6 +- packages/ipld-eth-client/package.json | 6 +- packages/peer/package.json | 2 +- packages/rpc-eth-client/package.json | 8 +-- packages/solidity-mapper/package.json | 2 +- packages/test/package.json | 2 +- packages/tracing-client/package.json | 2 +- packages/util/package.json | 8 +-- packages/util/src/config.ts | 3 + packages/util/src/indexer.ts | 61 ++++++++++++++++++- packages/util/src/job-runner.ts | 38 +++++++++--- packages/util/src/metrics.ts | 18 +++--- packages/util/src/types.ts | 5 +- 24 files changed, 162 insertions(+), 108 deletions(-) diff --git a/lerna.json b/lerna.json index 2a001136..fdeb12bf 100644 --- a/lerna.json +++ b/lerna.json @@ -2,7 +2,7 @@ "packages": [ "packages/*" ], - "version": "0.2.100", + "version": "0.2.101", "npmClient": "yarn", "useWorkspaces": true, "command": { diff --git a/packages/cache/package.json b/packages/cache/package.json index 0661d8db..1757a6f5 100644 --- a/packages/cache/package.json +++ b/packages/cache/package.json @@ -1,6 +1,6 @@ { "name": "@cerc-io/cache", - "version": "0.2.100", + "version": "0.2.101", "description": "Generic object cache", "main": "dist/index.js", "scripts": { diff --git a/packages/cli/package.json b/packages/cli/package.json index 43edb6f1..e805dc57 100644 --- a/packages/cli/package.json +++ b/packages/cli/package.json @@ -1,6 +1,6 @@ { "name": "@cerc-io/cli", - "version": "0.2.100", + "version": "0.2.101", "main": "dist/index.js", "license": "AGPL-3.0", "scripts": { @@ -15,13 +15,13 @@ }, "dependencies": { "@apollo/client": "^3.7.1", - "@cerc-io/cache": "^0.2.100", - "@cerc-io/ipld-eth-client": "^0.2.100", + "@cerc-io/cache": "^0.2.101", + "@cerc-io/ipld-eth-client": "^0.2.101", "@cerc-io/libp2p": "^0.42.2-laconic-0.1.4", "@cerc-io/nitro-node": "^0.1.15", - "@cerc-io/peer": "^0.2.100", - "@cerc-io/rpc-eth-client": "^0.2.100", - "@cerc-io/util": "^0.2.100", + "@cerc-io/peer": "^0.2.101", + "@cerc-io/rpc-eth-client": "^0.2.101", + "@cerc-io/util": "^0.2.101", "@ethersproject/providers": "^5.4.4", "@graphql-tools/utils": "^9.1.1", "@ipld/dag-cbor": "^8.0.0", diff --git a/packages/cli/src/base.ts b/packages/cli/src/base.ts index 290786ce..85eb1f5b 100644 --- a/packages/cli/src/base.ts +++ b/packages/cli/src/base.ts @@ -96,7 +96,7 @@ export class BaseCmd { this._jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs }); await this._jobQueue.start(); - const { ethClient, ethProvider } = await initClients(this._config); + const { ethClient, ethProvider } = await initClients(this._config.upstream); this._ethProvider = ethProvider; this._clients = { ethClient, ...clients }; } diff --git a/packages/cli/src/job-runner.ts b/packages/cli/src/job-runner.ts index c9f2d720..1e18f243 100644 --- a/packages/cli/src/job-runner.ts +++ b/packages/cli/src/job-runner.ts @@ -7,8 +7,6 @@ import { hideBin } from 'yargs/helpers'; import 'reflect-metadata'; import assert from 'assert'; import { ConnectionOptions } from 'typeorm'; -import { errors } from 'ethers'; -import debug from 'debug'; import { JsonRpcProvider } from '@ethersproject/providers'; import { @@ -22,15 +20,10 @@ import { GraphWatcherInterface, startMetricsServer, Config, - UpstreamConfig, - NEW_BLOCK_MAX_RETRIES_ERROR, - setActiveUpstreamEndpointMetric + UpstreamConfig } from '@cerc-io/util'; import { BaseCmd } from './base'; -import { initClients } from './utils/index'; - -const log = debug('vulcanize:job-runner'); interface Arguments { configFile: string; @@ -40,10 +33,6 @@ export class JobRunnerCmd { _argv?: Arguments; _baseCmd: BaseCmd; - _currentEndpointIndex = { - rpcProviderEndpoint: 0 - }; - constructor () { this._baseCmd = new BaseCmd(); } @@ -124,26 +113,8 @@ export class JobRunnerCmd { const jobRunner = new JobRunner( config.jobQueue, indexer, - jobQueue, - async (error: any) => { - // Check if it is a server error or timeout from ethers.js - // https://docs.ethers.org/v5/api/utils/logger/#errors--server-error - // https://docs.ethers.org/v5/api/utils/logger/#errors--timeout - if (error.code === errors.SERVER_ERROR || error.code === errors.TIMEOUT || error.message === NEW_BLOCK_MAX_RETRIES_ERROR) { - const oldRpcEndpoint = config.upstream.ethServer.rpcProviderEndpoints[this._currentEndpointIndex.rpcProviderEndpoint]; - ++this._currentEndpointIndex.rpcProviderEndpoint; - - if (this._currentEndpointIndex.rpcProviderEndpoint === config.upstream.ethServer.rpcProviderEndpoints.length) { - this._currentEndpointIndex.rpcProviderEndpoint = 0; - } - - const { ethClient, ethProvider } = await initClients(config, this._currentEndpointIndex); - indexer.switchClients({ ethClient, ethProvider }); - setActiveUpstreamEndpointMetric(config, this._currentEndpointIndex.rpcProviderEndpoint); - - log(`RPC endpoint ${oldRpcEndpoint} is not working; failing over to new RPC endpoint ${ethProvider.connection.url}`); - } - }); + jobQueue + ); // Delete all active and pending (before completed) jobs to start job-runner without old queued jobs await jobRunner.jobQueue.deleteAllJobs('completed'); @@ -154,7 +125,7 @@ export class JobRunnerCmd { await startJobRunner(jobRunner); jobRunner.handleShutdown(); - await startMetricsServer(config, jobQueue, indexer, this._currentEndpointIndex); + await startMetricsServer(config, jobQueue, indexer); } _getArgv (): any { diff --git a/packages/cli/src/utils/index.ts b/packages/cli/src/utils/index.ts index d2b25ea0..09d52378 100644 --- a/packages/cli/src/utils/index.ts +++ b/packages/cli/src/utils/index.ts @@ -9,7 +9,7 @@ import { providers } from 'ethers'; // @ts-expect-error https://github.com/microsoft/TypeScript/issues/49721#issuecomment-1319854183 import { PeerIdObj } from '@cerc-io/peer'; -import { Config, EthClient, getCustomProvider } from '@cerc-io/util'; +import { EthClient, UpstreamConfig, getCustomProvider } from '@cerc-io/util'; import { getCache } from '@cerc-io/cache'; import { EthClient as GqlEthClient } from '@cerc-io/ipld-eth-client'; import { EthClient as RpcEthClient } from '@cerc-io/rpc-eth-client'; @@ -22,16 +22,10 @@ export function readPeerId (filePath: string): PeerIdObj { return JSON.parse(peerIdJson); } -export const initClients = async (config: Config, endpointIndexes = { rpcProviderEndpoint: 0 }): Promise<{ +export const initClients = async (upstreamConfig: UpstreamConfig, endpointIndexes = { rpcProviderEndpoint: 0 }): Promise<{ ethClient: EthClient, ethProvider: providers.JsonRpcProvider }> => { - const { database: dbConfig, upstream: upstreamConfig, server: serverConfig } = config; - - assert(serverConfig, 'Missing server config'); - assert(dbConfig, 'Missing database config'); - assert(upstreamConfig, 'Missing upstream config'); - const { ethServer: { gqlApiEndpoint, rpcProviderEndpoints, rpcClient = false }, cache: cacheConfig } = upstreamConfig; assert(rpcProviderEndpoints, 'Missing upstream ethServer.rpcProviderEndpoints'); diff --git a/packages/codegen/package.json b/packages/codegen/package.json index 71657a07..a117cb37 100644 --- a/packages/codegen/package.json +++ b/packages/codegen/package.json @@ -1,6 +1,6 @@ { "name": "@cerc-io/codegen", - "version": "0.2.100", + "version": "0.2.101", "description": "Code generator", "private": true, "main": "index.js", @@ -20,7 +20,7 @@ }, "homepage": "https://github.com/cerc-io/watcher-ts#readme", "dependencies": { - "@cerc-io/util": "^0.2.100", + "@cerc-io/util": "^0.2.101", "@graphql-tools/load-files": "^6.5.2", "@npmcli/package-json": "^5.0.0", "@poanet/solidity-flattener": "https://github.com/vulcanize/solidity-flattener.git", diff --git a/packages/codegen/src/templates/config-template.handlebars b/packages/codegen/src/templates/config-template.handlebars index 1f69980c..50f513f7 100644 --- a/packages/codegen/src/templates/config-template.handlebars +++ b/packages/codegen/src/templates/config-template.handlebars @@ -83,6 +83,10 @@ # Boolean flag to filter event logs by topics filterLogsByTopics = true + # Switch clients if eth_getLogs call takes more than threshold (in secs) + # Set to 0 for disabling switching + getLogsClientSwitchThresholdInSecs = 0 + [upstream.cache] name = "requests" enabled = false diff --git a/packages/codegen/src/templates/indexer-template.handlebars b/packages/codegen/src/templates/indexer-template.handlebars index 47c5cb35..0036e331 100644 --- a/packages/codegen/src/templates/indexer-template.handlebars +++ b/packages/codegen/src/templates/indexer-template.handlebars @@ -8,13 +8,12 @@ import debug from 'debug'; {{#if queries}} import JSONbig from 'json-bigint'; {{/if}} -import { ethers, constants } from 'ethers'; +import { ethers, constants, providers } from 'ethers'; {{#if (subgraphPath)}} import { GraphQLResolveInfo } from 'graphql'; {{/if}} import { JsonFragment } from '@ethersproject/abi'; -import { BaseProvider } from '@ethersproject/providers'; import { MappingKey, StorageLayout } from '@cerc-io/solidity-mapper'; import { Indexer as BaseIndexer, @@ -49,6 +48,7 @@ import { EthFullTransaction, ExtraEventData } from '@cerc-io/util'; +import { initClients } from '@cerc-io/cli'; {{#if (subgraphPath)}} import { GraphWatcher } from '@cerc-io/graph-node'; {{/if}} @@ -91,7 +91,7 @@ const {{capitalize event}}_EVENT = '{{event}}'; export class Indexer implements IndexerInterface { _db: Database; _ethClient: EthClient; - _ethProvider: BaseProvider; + _ethProvider: providers.JsonRpcProvider; _baseIndexer: BaseIndexer; _serverConfig: ServerConfig; _upstreamConfig: UpstreamConfig; @@ -118,7 +118,7 @@ export class Indexer implements IndexerInterface { }, db: DatabaseInterface, clients: Clients, - ethProvider: BaseProvider, + ethProvider: providers.JsonRpcProvider, jobQueue: JobQueue{{#if (subgraphPath)}},{{/if}} {{#if (subgraphPath)}} graphWatcher?: GraphWatcherInterface @@ -199,15 +199,19 @@ export class Indexer implements IndexerInterface { await this._baseIndexer.fetchStateStatus(); } - switchClients ({ ethClient, ethProvider }: { ethClient: EthClient, ethProvider: BaseProvider }): void { + async switchClients (): Promise { + const { ethClient, ethProvider } = await this._baseIndexer.switchClients(initClients); this._ethClient = ethClient; this._ethProvider = ethProvider; - this._baseIndexer.switchClients({ ethClient, ethProvider }); {{#if (subgraphPath)}} this._graphWatcher.switchClients({ ethClient, ethProvider }); {{/if}} } + async isGetLogsRequestsSlow (): Promise { + return this._baseIndexer.isGetLogsRequestsSlow(); + } + {{#if (subgraphPath)}} async getMetaData (block: BlockHeight): Promise { return this._baseIndexer.getMetaData(block); diff --git a/packages/codegen/src/templates/package-template.handlebars b/packages/codegen/src/templates/package-template.handlebars index b47d88fe..9913179e 100644 --- a/packages/codegen/src/templates/package-template.handlebars +++ b/packages/codegen/src/templates/package-template.handlebars @@ -41,12 +41,12 @@ "homepage": "https://github.com/cerc-io/watcher-ts#readme", "dependencies": { "@apollo/client": "^3.3.19", - "@cerc-io/cli": "^0.2.100", - "@cerc-io/ipld-eth-client": "^0.2.100", - "@cerc-io/solidity-mapper": "^0.2.100", - "@cerc-io/util": "^0.2.100", + "@cerc-io/cli": "^0.2.101", + "@cerc-io/ipld-eth-client": "^0.2.101", + "@cerc-io/solidity-mapper": "^0.2.101", + "@cerc-io/util": "^0.2.101", {{#if (subgraphPath)}} - "@cerc-io/graph-node": "^0.2.100", + "@cerc-io/graph-node": "^0.2.101", {{/if}} "@ethersproject/providers": "^5.4.4", "debug": "^4.3.1", diff --git a/packages/graph-node/package.json b/packages/graph-node/package.json index fa6d634c..0fd237a7 100644 --- a/packages/graph-node/package.json +++ b/packages/graph-node/package.json @@ -1,10 +1,10 @@ { "name": "@cerc-io/graph-node", - "version": "0.2.100", + "version": "0.2.101", "main": "dist/index.js", "license": "AGPL-3.0", "devDependencies": { - "@cerc-io/solidity-mapper": "^0.2.100", + "@cerc-io/solidity-mapper": "^0.2.101", "@ethersproject/providers": "^5.4.4", "@graphprotocol/graph-ts": "^0.22.0", "@nomiclabs/hardhat-ethers": "^2.0.2", @@ -51,9 +51,9 @@ "dependencies": { "@apollo/client": "^3.3.19", "@cerc-io/assemblyscript": "0.19.10-watcher-ts-0.1.2", - "@cerc-io/cache": "^0.2.100", - "@cerc-io/ipld-eth-client": "^0.2.100", - "@cerc-io/util": "^0.2.100", + "@cerc-io/cache": "^0.2.101", + "@cerc-io/ipld-eth-client": "^0.2.101", + "@cerc-io/util": "^0.2.101", "@types/json-diff": "^0.5.2", "@types/yargs": "^17.0.0", "bn.js": "^4.11.9", diff --git a/packages/graph-node/test/utils/indexer.ts b/packages/graph-node/test/utils/indexer.ts index 6344a33b..e3eda96b 100644 --- a/packages/graph-node/test/utils/indexer.ts +++ b/packages/graph-node/test/utils/indexer.ts @@ -340,7 +340,11 @@ export class Indexer implements IndexerInterface { return []; } - switchClients ({ ethClient, ethProvider }: { ethClient: EthClient, ethProvider: providers.BaseProvider }): void { + async switchClients (): Promise { return undefined; } + + async isGetLogsRequestsSlow (): Promise { + return false; + } } diff --git a/packages/ipld-eth-client/package.json b/packages/ipld-eth-client/package.json index 1ea1ae2e..c2ac0afc 100644 --- a/packages/ipld-eth-client/package.json +++ b/packages/ipld-eth-client/package.json @@ -1,6 +1,6 @@ { "name": "@cerc-io/ipld-eth-client", - "version": "0.2.100", + "version": "0.2.101", "description": "IPLD ETH Client", "main": "dist/index.js", "scripts": { @@ -20,8 +20,8 @@ "homepage": "https://github.com/cerc-io/watcher-ts#readme", "dependencies": { "@apollo/client": "^3.7.1", - "@cerc-io/cache": "^0.2.100", - "@cerc-io/util": "^0.2.100", + "@cerc-io/cache": "^0.2.101", + "@cerc-io/util": "^0.2.101", "cross-fetch": "^3.1.4", "debug": "^4.3.1", "ethers": "^5.4.4", diff --git a/packages/peer/package.json b/packages/peer/package.json index f45d2cc2..a9028c4e 100644 --- a/packages/peer/package.json +++ b/packages/peer/package.json @@ -1,6 +1,6 @@ { "name": "@cerc-io/peer", - "version": "0.2.100", + "version": "0.2.101", "description": "libp2p module", "main": "dist/index.js", "exports": "./dist/index.js", diff --git a/packages/rpc-eth-client/package.json b/packages/rpc-eth-client/package.json index 39daf210..e478ba17 100644 --- a/packages/rpc-eth-client/package.json +++ b/packages/rpc-eth-client/package.json @@ -1,6 +1,6 @@ { "name": "@cerc-io/rpc-eth-client", - "version": "0.2.100", + "version": "0.2.101", "description": "RPC ETH Client", "main": "dist/index.js", "scripts": { @@ -19,9 +19,9 @@ }, "homepage": "https://github.com/cerc-io/watcher-ts#readme", "dependencies": { - "@cerc-io/cache": "^0.2.100", - "@cerc-io/ipld-eth-client": "^0.2.100", - "@cerc-io/util": "^0.2.100", + "@cerc-io/cache": "^0.2.101", + "@cerc-io/ipld-eth-client": "^0.2.101", + "@cerc-io/util": "^0.2.101", "chai": "^4.3.4", "ethers": "^5.4.4", "left-pad": "^1.3.0", diff --git a/packages/solidity-mapper/package.json b/packages/solidity-mapper/package.json index afac51db..49231127 100644 --- a/packages/solidity-mapper/package.json +++ b/packages/solidity-mapper/package.json @@ -1,6 +1,6 @@ { "name": "@cerc-io/solidity-mapper", - "version": "0.2.100", + "version": "0.2.101", "main": "dist/index.js", "license": "AGPL-3.0", "devDependencies": { diff --git a/packages/test/package.json b/packages/test/package.json index 3883fbe0..9c2c9837 100644 --- a/packages/test/package.json +++ b/packages/test/package.json @@ -1,6 +1,6 @@ { "name": "@cerc-io/test", - "version": "0.2.100", + "version": "0.2.101", "main": "dist/index.js", "license": "AGPL-3.0", "private": true, diff --git a/packages/tracing-client/package.json b/packages/tracing-client/package.json index a70f5f5d..f7ad182f 100644 --- a/packages/tracing-client/package.json +++ b/packages/tracing-client/package.json @@ -1,6 +1,6 @@ { "name": "@cerc-io/tracing-client", - "version": "0.2.100", + "version": "0.2.101", "description": "ETH VM tracing client", "main": "dist/index.js", "scripts": { diff --git a/packages/util/package.json b/packages/util/package.json index ab13eef1..568a96a1 100644 --- a/packages/util/package.json +++ b/packages/util/package.json @@ -1,13 +1,13 @@ { "name": "@cerc-io/util", - "version": "0.2.100", + "version": "0.2.101", "main": "dist/index.js", "license": "AGPL-3.0", "dependencies": { "@apollo/utils.keyvaluecache": "^1.0.1", "@cerc-io/nitro-node": "^0.1.15", - "@cerc-io/peer": "^0.2.100", - "@cerc-io/solidity-mapper": "^0.2.100", + "@cerc-io/peer": "^0.2.101", + "@cerc-io/solidity-mapper": "^0.2.101", "@cerc-io/ts-channel": "1.0.3-ts-nitro-0.1.1", "@ethersproject/properties": "^5.7.0", "@ethersproject/providers": "^5.4.4", @@ -54,7 +54,7 @@ "yargs": "^17.0.1" }, "devDependencies": { - "@cerc-io/cache": "^0.2.100", + "@cerc-io/cache": "^0.2.101", "@nomiclabs/hardhat-waffle": "^2.0.1", "@types/bunyan": "^1.8.8", "@types/express": "^4.17.14", diff --git a/packages/util/src/config.ts b/packages/util/src/config.ts index 5f5326cd..b36fbade 100644 --- a/packages/util/src/config.ts +++ b/packages/util/src/config.ts @@ -289,6 +289,9 @@ export interface UpstreamConfig { // Boolean flag to filter event logs by topics filterLogsByTopics: boolean; + // Switch clients if eth_getLogs call takes more than threshold (in secs) + getLogsClientSwitchThresholdInSecs?: number; + payments: EthServerPaymentsConfig; } traceProviderEndpoint: string; diff --git a/packages/util/src/indexer.ts b/packages/util/src/indexer.ts index 6fe500ea..fdd1f438 100644 --- a/packages/util/src/indexer.ts +++ b/packages/util/src/indexer.ts @@ -33,6 +33,7 @@ import { JobQueue } from './job-queue'; import { Where, QueryOptions, BlockHeight } from './database'; import { ServerConfig, UpstreamConfig } from './config'; import { createOrUpdateStateData, StateDataMeta } from './state-helper'; +import { ethRpcRequestDuration, setActiveUpstreamEndpointMetric } from './metrics'; const DEFAULT_MAX_EVENTS_BLOCK_RANGE = 1000; @@ -113,12 +114,16 @@ export class Indexer { _db: DatabaseInterface; _ethClient: EthClient; _getStorageAt: GetStorageAt; - _ethProvider: ethers.providers.BaseProvider; + _ethProvider: ethers.providers.JsonRpcProvider; _jobQueue: JobQueue; _watchedContracts: { [key: string]: ContractInterface } = {}; _stateStatusMap: { [key: string]: StateStatus } = {}; + _currentEndpointIndex = { + rpcProviderEndpoint: 0 + }; + constructor ( config: { server: ServerConfig; @@ -126,7 +131,7 @@ export class Indexer { }, db: DatabaseInterface, ethClient: EthClient, - ethProvider: ethers.providers.BaseProvider, + ethProvider: ethers.providers.JsonRpcProvider, jobQueue: JobQueue ) { this._serverConfig = config.server; @@ -136,11 +141,61 @@ export class Indexer { this._ethProvider = ethProvider; this._jobQueue = jobQueue; this._getStorageAt = this._ethClient.getStorageAt.bind(this._ethClient); + + setActiveUpstreamEndpointMetric( + this._upstreamConfig, + this._currentEndpointIndex.rpcProviderEndpoint + ); } - switchClients ({ ethClient, ethProvider }: { ethClient: EthClient, ethProvider: ethers.providers.BaseProvider }): void { + async switchClients ( + initClients: (upstreamConfig: UpstreamConfig, endpointIndexes: typeof this._currentEndpointIndex) => Promise<{ + ethClient: EthClient, + ethProvider: ethers.providers.JsonRpcProvider + }> + ): Promise<{ ethClient: EthClient, ethProvider: ethers.providers.JsonRpcProvider }> { + const oldRpcEndpoint = this._upstreamConfig.ethServer.rpcProviderEndpoints[this._currentEndpointIndex.rpcProviderEndpoint]; + ++this._currentEndpointIndex.rpcProviderEndpoint; + + if (this._currentEndpointIndex.rpcProviderEndpoint === this._upstreamConfig.ethServer.rpcProviderEndpoints.length) { + this._currentEndpointIndex.rpcProviderEndpoint = 0; + } + + const { ethClient, ethProvider } = await initClients(this._upstreamConfig, this._currentEndpointIndex); + setActiveUpstreamEndpointMetric( + this._upstreamConfig, + this._currentEndpointIndex.rpcProviderEndpoint + ); + + const newRpcEndpoint = ethProvider.connection.url; + log(`Switching RPC endpoint from ${oldRpcEndpoint} to endpoint ${newRpcEndpoint}`); + this._ethClient = ethClient; this._ethProvider = ethProvider; + return { ethClient, ethProvider }; + } + + async isGetLogsRequestsSlow (): Promise { + const threshold = this._upstreamConfig.ethServer.getLogsClientSwitchThresholdInSecs; + + if (threshold) { + const getLogsLabels = { + method: 'eth_getLogs', + provider: this._ethProvider.connection.url + }; + + const ethRpcRequestDurationMetrics = await ethRpcRequestDuration.get(); + + const currentProviderDuration = ethRpcRequestDurationMetrics.values.find( + val => val.labels.method === getLogsLabels.method && val.labels.provider === getLogsLabels.provider + ); + + if (currentProviderDuration) { + return currentProviderDuration.value > threshold; + } + } + + return false; } async fetchContracts (): Promise { diff --git a/packages/util/src/job-runner.ts b/packages/util/src/job-runner.ts index ca7f227a..f544c290 100644 --- a/packages/util/src/job-runner.ts +++ b/packages/util/src/job-runner.ts @@ -4,7 +4,7 @@ import assert from 'assert'; import debug from 'debug'; -import { constants, ethers } from 'ethers'; +import { constants, ethers, errors as ethersErrors } from 'ethers'; import { DeepPartial, In } from 'typeorm'; import PgBoss from 'pg-boss'; @@ -29,7 +29,8 @@ import { processBatchEvents, PrefetchedBlock, fetchBlocksAtHeight, - fetchAndSaveFilteredLogsAndBlocks + fetchAndSaveFilteredLogsAndBlocks, + NEW_BLOCK_MAX_RETRIES_ERROR } from './common'; import { isSyncingHistoricalBlocks, lastBlockNumEvents, lastBlockProcessDuration, lastProcessedBlockNumber } from './metrics'; @@ -63,19 +64,14 @@ export class JobRunner { _signalCount = 0; _errorInEventsProcessing = false; - _jobErrorHandler: (error: Error) => Promise; - constructor ( jobQueueConfig: JobQueueConfig, indexer: IndexerInterface, - jobQueue: JobQueue, - // eslint-disable-next-line @typescript-eslint/no-empty-function - jobErrorHandler: (error: Error) => Promise = async () => {} + jobQueue: JobQueue ) { this._indexer = indexer; this.jobQueue = jobQueue; this._jobQueueConfig = jobQueueConfig; - this._jobErrorHandler = jobErrorHandler; } async subscribeBlockProcessingQueue (): Promise { @@ -187,6 +183,11 @@ export class JobRunner { await Promise.all(indexBlockPromises); } + // Switch clients if getLogs requests are too slow + if (await this._indexer.isGetLogsRequestsSlow()) { + await this._indexer.switchClients(); + } + break; } @@ -292,6 +293,11 @@ export class JobRunner { this._historicalProcessingCompletedUpto = endBlock; + // Switch clients if getLogs requests are too slow + if (await this._indexer.isGetLogsRequestsSlow()) { + await this._indexer.switchClients(); + } + if (endBlock < processingEndBlockNumber) { // If endBlock is lesser than processingEndBlockNumber push new historical job await this.jobQueue.pushJob( @@ -789,6 +795,22 @@ export class JobRunner { } } + async _jobErrorHandler (error: any): Promise { + if ( + // Switch client if it is a server error from ethers.js + // https://docs.ethers.org/v5/api/utils/logger/#errors--server-error + error.code === ethersErrors.SERVER_ERROR || + // Switch client if it is a timeout error from ethers.js + // https://docs.ethers.org/v5/api/utils/logger/#errors--timeout + error.code === ethersErrors.TIMEOUT || + // Switch client if error is for max retries to get new block at head + error.message === NEW_BLOCK_MAX_RETRIES_ERROR + ) { + log('RPC endpoint is not working; failing over to another one'); + await this._indexer.switchClients(); + } + } + _updateWatchedContracts (job: any): void { const { data: { contract } } = job; this._indexer.cacheContract(contract); diff --git a/packages/util/src/metrics.ts b/packages/util/src/metrics.ts index 80d31fc7..2a42b50e 100644 --- a/packages/util/src/metrics.ts +++ b/packages/util/src/metrics.ts @@ -10,7 +10,7 @@ import assert from 'assert'; import { ethers } from 'ethers'; import JsonRpcProvider = ethers.providers.JsonRpcProvider; -import { Config } from './config'; +import { Config, UpstreamConfig } from './config'; import { IndexerInterface } from './types'; import { JobQueue } from './job-queue'; @@ -103,7 +103,7 @@ const upstreamEndpointsMetric = new client.Gauge({ // Export metrics on a server const app: Application = express(); -export const startMetricsServer = async (config: Config, jobQueue: JobQueue, indexer: IndexerInterface, endpointIndexes = { rpcProviderEndpoint: 0 }): Promise => { +export const startMetricsServer = async (config: Config, jobQueue: JobQueue, indexer: IndexerInterface): Promise => { if (!config.metrics) { log('Metrics is disabled. To enable add metrics host and port.'); return; @@ -134,11 +134,9 @@ export const startMetricsServer = async (config: Config, jobQueue: JobQueue, ind await registerWatcherConfigMetrics(config); - setActiveUpstreamEndpointMetric(config, endpointIndexes.rpcProviderEndpoint); - await registerDBSizeMetrics(config); - await registerUpstreamChainHeadMetrics(config, endpointIndexes.rpcProviderEndpoint); + await registerUpstreamChainHeadMetrics(); await registerWatcherInfoMetrics(); @@ -159,14 +157,14 @@ export const startMetricsServer = async (config: Config, jobQueue: JobQueue, ind // ETH RPC provider used for upstream chain head metrics let ethRpcProvider: JsonRpcProvider | undefined; -export const setActiveUpstreamEndpointMetric = ({ upstream }: Config, currentEndpointIndex: number): void => { - const endpoints = upstream.ethServer.rpcProviderEndpoints; +export const setActiveUpstreamEndpointMetric = (upstreamConfig: UpstreamConfig, currentEndpointIndex: number): void => { + const endpoints = upstreamConfig.ethServer.rpcProviderEndpoints; endpoints.forEach((endpoint, index) => { upstreamEndpointsMetric.set({ provider: endpoint }, Number(index === currentEndpointIndex)); }); - ethRpcProvider = new JsonRpcProvider(upstream.ethServer.rpcProviderEndpoints[currentEndpointIndex]); + ethRpcProvider = new JsonRpcProvider(upstreamConfig.ethServer.rpcProviderEndpoints[currentEndpointIndex]); }; const registerDBSizeMetrics = async ({ database, jobQueue }: Config): Promise => { @@ -204,9 +202,7 @@ const registerDBSizeMetrics = async ({ database, jobQueue }: Config): Promise => { - ethRpcProvider = new JsonRpcProvider(upstream.ethServer.rpcProviderEndpoints[rpcProviderEndpointIndex]); - +const registerUpstreamChainHeadMetrics = async (): Promise => { // eslint-disable-next-line no-new new client.Gauge({ name: 'latest_upstream_block_number', diff --git a/packages/util/src/types.ts b/packages/util/src/types.ts index 78888491..47943366 100644 --- a/packages/util/src/types.ts +++ b/packages/util/src/types.ts @@ -3,7 +3,7 @@ // import { Connection, DeepPartial, EntityTarget, FindConditions, FindManyOptions, ObjectLiteral, QueryRunner } from 'typeorm'; -import { Transaction, providers } from 'ethers'; +import { Transaction } from 'ethers'; import { MappingKey, StorageLayout } from '@cerc-io/solidity-mapper'; @@ -236,7 +236,8 @@ export interface IndexerInterface { clearProcessedBlockData (block: BlockProgressInterface): Promise getResultEvent (event: EventInterface): any getFullTransactions (txHashList: string[]): Promise - switchClients (clients: { ethClient: EthClient, ethProvider: providers.BaseProvider }): void + isGetLogsRequestsSlow(): Promise + switchClients(): Promise } export interface DatabaseInterface {