Implement switching endpoints after slow eth_getLogs RPC requests (#525)

* Switch upstream endpoint if getLogs requests are too slow

* Refactor methods for switching client to indexer

* Update codegen indexer template

* Add dummy methods in graph-node test Indexer

* Upgrade package versions to 0.2.101

---------

Co-authored-by: Prathamesh Musale <prathamesh.musale0@gmail.com>
This commit is contained in:
Nabarun Gogoi 2024-06-20 17:57:01 +05:30 committed by GitHub
parent ff471da287
commit b9a899aec1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
24 changed files with 162 additions and 108 deletions

View File

@ -2,7 +2,7 @@
"packages": [ "packages": [
"packages/*" "packages/*"
], ],
"version": "0.2.100", "version": "0.2.101",
"npmClient": "yarn", "npmClient": "yarn",
"useWorkspaces": true, "useWorkspaces": true,
"command": { "command": {

View File

@ -1,6 +1,6 @@
{ {
"name": "@cerc-io/cache", "name": "@cerc-io/cache",
"version": "0.2.100", "version": "0.2.101",
"description": "Generic object cache", "description": "Generic object cache",
"main": "dist/index.js", "main": "dist/index.js",
"scripts": { "scripts": {

View File

@ -1,6 +1,6 @@
{ {
"name": "@cerc-io/cli", "name": "@cerc-io/cli",
"version": "0.2.100", "version": "0.2.101",
"main": "dist/index.js", "main": "dist/index.js",
"license": "AGPL-3.0", "license": "AGPL-3.0",
"scripts": { "scripts": {
@ -15,13 +15,13 @@
}, },
"dependencies": { "dependencies": {
"@apollo/client": "^3.7.1", "@apollo/client": "^3.7.1",
"@cerc-io/cache": "^0.2.100", "@cerc-io/cache": "^0.2.101",
"@cerc-io/ipld-eth-client": "^0.2.100", "@cerc-io/ipld-eth-client": "^0.2.101",
"@cerc-io/libp2p": "^0.42.2-laconic-0.1.4", "@cerc-io/libp2p": "^0.42.2-laconic-0.1.4",
"@cerc-io/nitro-node": "^0.1.15", "@cerc-io/nitro-node": "^0.1.15",
"@cerc-io/peer": "^0.2.100", "@cerc-io/peer": "^0.2.101",
"@cerc-io/rpc-eth-client": "^0.2.100", "@cerc-io/rpc-eth-client": "^0.2.101",
"@cerc-io/util": "^0.2.100", "@cerc-io/util": "^0.2.101",
"@ethersproject/providers": "^5.4.4", "@ethersproject/providers": "^5.4.4",
"@graphql-tools/utils": "^9.1.1", "@graphql-tools/utils": "^9.1.1",
"@ipld/dag-cbor": "^8.0.0", "@ipld/dag-cbor": "^8.0.0",

View File

@ -96,7 +96,7 @@ export class BaseCmd {
this._jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs }); this._jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs });
await this._jobQueue.start(); await this._jobQueue.start();
const { ethClient, ethProvider } = await initClients(this._config); const { ethClient, ethProvider } = await initClients(this._config.upstream);
this._ethProvider = ethProvider; this._ethProvider = ethProvider;
this._clients = { ethClient, ...clients }; this._clients = { ethClient, ...clients };
} }

View File

@ -7,8 +7,6 @@ import { hideBin } from 'yargs/helpers';
import 'reflect-metadata'; import 'reflect-metadata';
import assert from 'assert'; import assert from 'assert';
import { ConnectionOptions } from 'typeorm'; import { ConnectionOptions } from 'typeorm';
import { errors } from 'ethers';
import debug from 'debug';
import { JsonRpcProvider } from '@ethersproject/providers'; import { JsonRpcProvider } from '@ethersproject/providers';
import { import {
@ -22,15 +20,10 @@ import {
GraphWatcherInterface, GraphWatcherInterface,
startMetricsServer, startMetricsServer,
Config, Config,
UpstreamConfig, UpstreamConfig
NEW_BLOCK_MAX_RETRIES_ERROR,
setActiveUpstreamEndpointMetric
} from '@cerc-io/util'; } from '@cerc-io/util';
import { BaseCmd } from './base'; import { BaseCmd } from './base';
import { initClients } from './utils/index';
const log = debug('vulcanize:job-runner');
interface Arguments { interface Arguments {
configFile: string; configFile: string;
@ -40,10 +33,6 @@ export class JobRunnerCmd {
_argv?: Arguments; _argv?: Arguments;
_baseCmd: BaseCmd; _baseCmd: BaseCmd;
_currentEndpointIndex = {
rpcProviderEndpoint: 0
};
constructor () { constructor () {
this._baseCmd = new BaseCmd(); this._baseCmd = new BaseCmd();
} }
@ -124,26 +113,8 @@ export class JobRunnerCmd {
const jobRunner = new JobRunner( const jobRunner = new JobRunner(
config.jobQueue, config.jobQueue,
indexer, indexer,
jobQueue, jobQueue
async (error: any) => { );
// Check if it is a server error or timeout from ethers.js
// https://docs.ethers.org/v5/api/utils/logger/#errors--server-error
// https://docs.ethers.org/v5/api/utils/logger/#errors--timeout
if (error.code === errors.SERVER_ERROR || error.code === errors.TIMEOUT || error.message === NEW_BLOCK_MAX_RETRIES_ERROR) {
const oldRpcEndpoint = config.upstream.ethServer.rpcProviderEndpoints[this._currentEndpointIndex.rpcProviderEndpoint];
++this._currentEndpointIndex.rpcProviderEndpoint;
if (this._currentEndpointIndex.rpcProviderEndpoint === config.upstream.ethServer.rpcProviderEndpoints.length) {
this._currentEndpointIndex.rpcProviderEndpoint = 0;
}
const { ethClient, ethProvider } = await initClients(config, this._currentEndpointIndex);
indexer.switchClients({ ethClient, ethProvider });
setActiveUpstreamEndpointMetric(config, this._currentEndpointIndex.rpcProviderEndpoint);
log(`RPC endpoint ${oldRpcEndpoint} is not working; failing over to new RPC endpoint ${ethProvider.connection.url}`);
}
});
// Delete all active and pending (before completed) jobs to start job-runner without old queued jobs // Delete all active and pending (before completed) jobs to start job-runner without old queued jobs
await jobRunner.jobQueue.deleteAllJobs('completed'); await jobRunner.jobQueue.deleteAllJobs('completed');
@ -154,7 +125,7 @@ export class JobRunnerCmd {
await startJobRunner(jobRunner); await startJobRunner(jobRunner);
jobRunner.handleShutdown(); jobRunner.handleShutdown();
await startMetricsServer(config, jobQueue, indexer, this._currentEndpointIndex); await startMetricsServer(config, jobQueue, indexer);
} }
_getArgv (): any { _getArgv (): any {

View File

@ -9,7 +9,7 @@ import { providers } from 'ethers';
// @ts-expect-error https://github.com/microsoft/TypeScript/issues/49721#issuecomment-1319854183 // @ts-expect-error https://github.com/microsoft/TypeScript/issues/49721#issuecomment-1319854183
import { PeerIdObj } from '@cerc-io/peer'; import { PeerIdObj } from '@cerc-io/peer';
import { Config, EthClient, getCustomProvider } from '@cerc-io/util'; import { EthClient, UpstreamConfig, getCustomProvider } from '@cerc-io/util';
import { getCache } from '@cerc-io/cache'; import { getCache } from '@cerc-io/cache';
import { EthClient as GqlEthClient } from '@cerc-io/ipld-eth-client'; import { EthClient as GqlEthClient } from '@cerc-io/ipld-eth-client';
import { EthClient as RpcEthClient } from '@cerc-io/rpc-eth-client'; import { EthClient as RpcEthClient } from '@cerc-io/rpc-eth-client';
@ -22,16 +22,10 @@ export function readPeerId (filePath: string): PeerIdObj {
return JSON.parse(peerIdJson); return JSON.parse(peerIdJson);
} }
export const initClients = async (config: Config, endpointIndexes = { rpcProviderEndpoint: 0 }): Promise<{ export const initClients = async (upstreamConfig: UpstreamConfig, endpointIndexes = { rpcProviderEndpoint: 0 }): Promise<{
ethClient: EthClient, ethClient: EthClient,
ethProvider: providers.JsonRpcProvider ethProvider: providers.JsonRpcProvider
}> => { }> => {
const { database: dbConfig, upstream: upstreamConfig, server: serverConfig } = config;
assert(serverConfig, 'Missing server config');
assert(dbConfig, 'Missing database config');
assert(upstreamConfig, 'Missing upstream config');
const { ethServer: { gqlApiEndpoint, rpcProviderEndpoints, rpcClient = false }, cache: cacheConfig } = upstreamConfig; const { ethServer: { gqlApiEndpoint, rpcProviderEndpoints, rpcClient = false }, cache: cacheConfig } = upstreamConfig;
assert(rpcProviderEndpoints, 'Missing upstream ethServer.rpcProviderEndpoints'); assert(rpcProviderEndpoints, 'Missing upstream ethServer.rpcProviderEndpoints');

View File

@ -1,6 +1,6 @@
{ {
"name": "@cerc-io/codegen", "name": "@cerc-io/codegen",
"version": "0.2.100", "version": "0.2.101",
"description": "Code generator", "description": "Code generator",
"private": true, "private": true,
"main": "index.js", "main": "index.js",
@ -20,7 +20,7 @@
}, },
"homepage": "https://github.com/cerc-io/watcher-ts#readme", "homepage": "https://github.com/cerc-io/watcher-ts#readme",
"dependencies": { "dependencies": {
"@cerc-io/util": "^0.2.100", "@cerc-io/util": "^0.2.101",
"@graphql-tools/load-files": "^6.5.2", "@graphql-tools/load-files": "^6.5.2",
"@npmcli/package-json": "^5.0.0", "@npmcli/package-json": "^5.0.0",
"@poanet/solidity-flattener": "https://github.com/vulcanize/solidity-flattener.git", "@poanet/solidity-flattener": "https://github.com/vulcanize/solidity-flattener.git",

View File

@ -83,6 +83,10 @@
# Boolean flag to filter event logs by topics # Boolean flag to filter event logs by topics
filterLogsByTopics = true filterLogsByTopics = true
# Switch clients if eth_getLogs call takes more than threshold (in secs)
# Set to 0 for disabling switching
getLogsClientSwitchThresholdInSecs = 0
[upstream.cache] [upstream.cache]
name = "requests" name = "requests"
enabled = false enabled = false

View File

@ -8,13 +8,12 @@ import debug from 'debug';
{{#if queries}} {{#if queries}}
import JSONbig from 'json-bigint'; import JSONbig from 'json-bigint';
{{/if}} {{/if}}
import { ethers, constants } from 'ethers'; import { ethers, constants, providers } from 'ethers';
{{#if (subgraphPath)}} {{#if (subgraphPath)}}
import { GraphQLResolveInfo } from 'graphql'; import { GraphQLResolveInfo } from 'graphql';
{{/if}} {{/if}}
import { JsonFragment } from '@ethersproject/abi'; import { JsonFragment } from '@ethersproject/abi';
import { BaseProvider } from '@ethersproject/providers';
import { MappingKey, StorageLayout } from '@cerc-io/solidity-mapper'; import { MappingKey, StorageLayout } from '@cerc-io/solidity-mapper';
import { import {
Indexer as BaseIndexer, Indexer as BaseIndexer,
@ -49,6 +48,7 @@ import {
EthFullTransaction, EthFullTransaction,
ExtraEventData ExtraEventData
} from '@cerc-io/util'; } from '@cerc-io/util';
import { initClients } from '@cerc-io/cli';
{{#if (subgraphPath)}} {{#if (subgraphPath)}}
import { GraphWatcher } from '@cerc-io/graph-node'; import { GraphWatcher } from '@cerc-io/graph-node';
{{/if}} {{/if}}
@ -91,7 +91,7 @@ const {{capitalize event}}_EVENT = '{{event}}';
export class Indexer implements IndexerInterface { export class Indexer implements IndexerInterface {
_db: Database; _db: Database;
_ethClient: EthClient; _ethClient: EthClient;
_ethProvider: BaseProvider; _ethProvider: providers.JsonRpcProvider;
_baseIndexer: BaseIndexer; _baseIndexer: BaseIndexer;
_serverConfig: ServerConfig; _serverConfig: ServerConfig;
_upstreamConfig: UpstreamConfig; _upstreamConfig: UpstreamConfig;
@ -118,7 +118,7 @@ export class Indexer implements IndexerInterface {
}, },
db: DatabaseInterface, db: DatabaseInterface,
clients: Clients, clients: Clients,
ethProvider: BaseProvider, ethProvider: providers.JsonRpcProvider,
jobQueue: JobQueue{{#if (subgraphPath)}},{{/if}} jobQueue: JobQueue{{#if (subgraphPath)}},{{/if}}
{{#if (subgraphPath)}} {{#if (subgraphPath)}}
graphWatcher?: GraphWatcherInterface graphWatcher?: GraphWatcherInterface
@ -199,15 +199,19 @@ export class Indexer implements IndexerInterface {
await this._baseIndexer.fetchStateStatus(); await this._baseIndexer.fetchStateStatus();
} }
switchClients ({ ethClient, ethProvider }: { ethClient: EthClient, ethProvider: BaseProvider }): void { async switchClients (): Promise<void> {
const { ethClient, ethProvider } = await this._baseIndexer.switchClients(initClients);
this._ethClient = ethClient; this._ethClient = ethClient;
this._ethProvider = ethProvider; this._ethProvider = ethProvider;
this._baseIndexer.switchClients({ ethClient, ethProvider });
{{#if (subgraphPath)}} {{#if (subgraphPath)}}
this._graphWatcher.switchClients({ ethClient, ethProvider }); this._graphWatcher.switchClients({ ethClient, ethProvider });
{{/if}} {{/if}}
} }
async isGetLogsRequestsSlow (): Promise<boolean> {
return this._baseIndexer.isGetLogsRequestsSlow();
}
{{#if (subgraphPath)}} {{#if (subgraphPath)}}
async getMetaData (block: BlockHeight): Promise<ResultMeta | null> { async getMetaData (block: BlockHeight): Promise<ResultMeta | null> {
return this._baseIndexer.getMetaData(block); return this._baseIndexer.getMetaData(block);

View File

@ -41,12 +41,12 @@
"homepage": "https://github.com/cerc-io/watcher-ts#readme", "homepage": "https://github.com/cerc-io/watcher-ts#readme",
"dependencies": { "dependencies": {
"@apollo/client": "^3.3.19", "@apollo/client": "^3.3.19",
"@cerc-io/cli": "^0.2.100", "@cerc-io/cli": "^0.2.101",
"@cerc-io/ipld-eth-client": "^0.2.100", "@cerc-io/ipld-eth-client": "^0.2.101",
"@cerc-io/solidity-mapper": "^0.2.100", "@cerc-io/solidity-mapper": "^0.2.101",
"@cerc-io/util": "^0.2.100", "@cerc-io/util": "^0.2.101",
{{#if (subgraphPath)}} {{#if (subgraphPath)}}
"@cerc-io/graph-node": "^0.2.100", "@cerc-io/graph-node": "^0.2.101",
{{/if}} {{/if}}
"@ethersproject/providers": "^5.4.4", "@ethersproject/providers": "^5.4.4",
"debug": "^4.3.1", "debug": "^4.3.1",

View File

@ -1,10 +1,10 @@
{ {
"name": "@cerc-io/graph-node", "name": "@cerc-io/graph-node",
"version": "0.2.100", "version": "0.2.101",
"main": "dist/index.js", "main": "dist/index.js",
"license": "AGPL-3.0", "license": "AGPL-3.0",
"devDependencies": { "devDependencies": {
"@cerc-io/solidity-mapper": "^0.2.100", "@cerc-io/solidity-mapper": "^0.2.101",
"@ethersproject/providers": "^5.4.4", "@ethersproject/providers": "^5.4.4",
"@graphprotocol/graph-ts": "^0.22.0", "@graphprotocol/graph-ts": "^0.22.0",
"@nomiclabs/hardhat-ethers": "^2.0.2", "@nomiclabs/hardhat-ethers": "^2.0.2",
@ -51,9 +51,9 @@
"dependencies": { "dependencies": {
"@apollo/client": "^3.3.19", "@apollo/client": "^3.3.19",
"@cerc-io/assemblyscript": "0.19.10-watcher-ts-0.1.2", "@cerc-io/assemblyscript": "0.19.10-watcher-ts-0.1.2",
"@cerc-io/cache": "^0.2.100", "@cerc-io/cache": "^0.2.101",
"@cerc-io/ipld-eth-client": "^0.2.100", "@cerc-io/ipld-eth-client": "^0.2.101",
"@cerc-io/util": "^0.2.100", "@cerc-io/util": "^0.2.101",
"@types/json-diff": "^0.5.2", "@types/json-diff": "^0.5.2",
"@types/yargs": "^17.0.0", "@types/yargs": "^17.0.0",
"bn.js": "^4.11.9", "bn.js": "^4.11.9",

View File

@ -340,7 +340,11 @@ export class Indexer implements IndexerInterface {
return []; return [];
} }
switchClients ({ ethClient, ethProvider }: { ethClient: EthClient, ethProvider: providers.BaseProvider }): void { async switchClients (): Promise<void> {
return undefined; return undefined;
} }
async isGetLogsRequestsSlow (): Promise<boolean> {
return false;
}
} }

View File

@ -1,6 +1,6 @@
{ {
"name": "@cerc-io/ipld-eth-client", "name": "@cerc-io/ipld-eth-client",
"version": "0.2.100", "version": "0.2.101",
"description": "IPLD ETH Client", "description": "IPLD ETH Client",
"main": "dist/index.js", "main": "dist/index.js",
"scripts": { "scripts": {
@ -20,8 +20,8 @@
"homepage": "https://github.com/cerc-io/watcher-ts#readme", "homepage": "https://github.com/cerc-io/watcher-ts#readme",
"dependencies": { "dependencies": {
"@apollo/client": "^3.7.1", "@apollo/client": "^3.7.1",
"@cerc-io/cache": "^0.2.100", "@cerc-io/cache": "^0.2.101",
"@cerc-io/util": "^0.2.100", "@cerc-io/util": "^0.2.101",
"cross-fetch": "^3.1.4", "cross-fetch": "^3.1.4",
"debug": "^4.3.1", "debug": "^4.3.1",
"ethers": "^5.4.4", "ethers": "^5.4.4",

View File

@ -1,6 +1,6 @@
{ {
"name": "@cerc-io/peer", "name": "@cerc-io/peer",
"version": "0.2.100", "version": "0.2.101",
"description": "libp2p module", "description": "libp2p module",
"main": "dist/index.js", "main": "dist/index.js",
"exports": "./dist/index.js", "exports": "./dist/index.js",

View File

@ -1,6 +1,6 @@
{ {
"name": "@cerc-io/rpc-eth-client", "name": "@cerc-io/rpc-eth-client",
"version": "0.2.100", "version": "0.2.101",
"description": "RPC ETH Client", "description": "RPC ETH Client",
"main": "dist/index.js", "main": "dist/index.js",
"scripts": { "scripts": {
@ -19,9 +19,9 @@
}, },
"homepage": "https://github.com/cerc-io/watcher-ts#readme", "homepage": "https://github.com/cerc-io/watcher-ts#readme",
"dependencies": { "dependencies": {
"@cerc-io/cache": "^0.2.100", "@cerc-io/cache": "^0.2.101",
"@cerc-io/ipld-eth-client": "^0.2.100", "@cerc-io/ipld-eth-client": "^0.2.101",
"@cerc-io/util": "^0.2.100", "@cerc-io/util": "^0.2.101",
"chai": "^4.3.4", "chai": "^4.3.4",
"ethers": "^5.4.4", "ethers": "^5.4.4",
"left-pad": "^1.3.0", "left-pad": "^1.3.0",

View File

@ -1,6 +1,6 @@
{ {
"name": "@cerc-io/solidity-mapper", "name": "@cerc-io/solidity-mapper",
"version": "0.2.100", "version": "0.2.101",
"main": "dist/index.js", "main": "dist/index.js",
"license": "AGPL-3.0", "license": "AGPL-3.0",
"devDependencies": { "devDependencies": {

View File

@ -1,6 +1,6 @@
{ {
"name": "@cerc-io/test", "name": "@cerc-io/test",
"version": "0.2.100", "version": "0.2.101",
"main": "dist/index.js", "main": "dist/index.js",
"license": "AGPL-3.0", "license": "AGPL-3.0",
"private": true, "private": true,

View File

@ -1,6 +1,6 @@
{ {
"name": "@cerc-io/tracing-client", "name": "@cerc-io/tracing-client",
"version": "0.2.100", "version": "0.2.101",
"description": "ETH VM tracing client", "description": "ETH VM tracing client",
"main": "dist/index.js", "main": "dist/index.js",
"scripts": { "scripts": {

View File

@ -1,13 +1,13 @@
{ {
"name": "@cerc-io/util", "name": "@cerc-io/util",
"version": "0.2.100", "version": "0.2.101",
"main": "dist/index.js", "main": "dist/index.js",
"license": "AGPL-3.0", "license": "AGPL-3.0",
"dependencies": { "dependencies": {
"@apollo/utils.keyvaluecache": "^1.0.1", "@apollo/utils.keyvaluecache": "^1.0.1",
"@cerc-io/nitro-node": "^0.1.15", "@cerc-io/nitro-node": "^0.1.15",
"@cerc-io/peer": "^0.2.100", "@cerc-io/peer": "^0.2.101",
"@cerc-io/solidity-mapper": "^0.2.100", "@cerc-io/solidity-mapper": "^0.2.101",
"@cerc-io/ts-channel": "1.0.3-ts-nitro-0.1.1", "@cerc-io/ts-channel": "1.0.3-ts-nitro-0.1.1",
"@ethersproject/properties": "^5.7.0", "@ethersproject/properties": "^5.7.0",
"@ethersproject/providers": "^5.4.4", "@ethersproject/providers": "^5.4.4",
@ -54,7 +54,7 @@
"yargs": "^17.0.1" "yargs": "^17.0.1"
}, },
"devDependencies": { "devDependencies": {
"@cerc-io/cache": "^0.2.100", "@cerc-io/cache": "^0.2.101",
"@nomiclabs/hardhat-waffle": "^2.0.1", "@nomiclabs/hardhat-waffle": "^2.0.1",
"@types/bunyan": "^1.8.8", "@types/bunyan": "^1.8.8",
"@types/express": "^4.17.14", "@types/express": "^4.17.14",

View File

@ -289,6 +289,9 @@ export interface UpstreamConfig {
// Boolean flag to filter event logs by topics // Boolean flag to filter event logs by topics
filterLogsByTopics: boolean; filterLogsByTopics: boolean;
// Switch clients if eth_getLogs call takes more than threshold (in secs)
getLogsClientSwitchThresholdInSecs?: number;
payments: EthServerPaymentsConfig; payments: EthServerPaymentsConfig;
} }
traceProviderEndpoint: string; traceProviderEndpoint: string;

View File

@ -33,6 +33,7 @@ import { JobQueue } from './job-queue';
import { Where, QueryOptions, BlockHeight } from './database'; import { Where, QueryOptions, BlockHeight } from './database';
import { ServerConfig, UpstreamConfig } from './config'; import { ServerConfig, UpstreamConfig } from './config';
import { createOrUpdateStateData, StateDataMeta } from './state-helper'; import { createOrUpdateStateData, StateDataMeta } from './state-helper';
import { ethRpcRequestDuration, setActiveUpstreamEndpointMetric } from './metrics';
const DEFAULT_MAX_EVENTS_BLOCK_RANGE = 1000; const DEFAULT_MAX_EVENTS_BLOCK_RANGE = 1000;
@ -113,12 +114,16 @@ export class Indexer {
_db: DatabaseInterface; _db: DatabaseInterface;
_ethClient: EthClient; _ethClient: EthClient;
_getStorageAt: GetStorageAt; _getStorageAt: GetStorageAt;
_ethProvider: ethers.providers.BaseProvider; _ethProvider: ethers.providers.JsonRpcProvider;
_jobQueue: JobQueue; _jobQueue: JobQueue;
_watchedContracts: { [key: string]: ContractInterface } = {}; _watchedContracts: { [key: string]: ContractInterface } = {};
_stateStatusMap: { [key: string]: StateStatus } = {}; _stateStatusMap: { [key: string]: StateStatus } = {};
_currentEndpointIndex = {
rpcProviderEndpoint: 0
};
constructor ( constructor (
config: { config: {
server: ServerConfig; server: ServerConfig;
@ -126,7 +131,7 @@ export class Indexer {
}, },
db: DatabaseInterface, db: DatabaseInterface,
ethClient: EthClient, ethClient: EthClient,
ethProvider: ethers.providers.BaseProvider, ethProvider: ethers.providers.JsonRpcProvider,
jobQueue: JobQueue jobQueue: JobQueue
) { ) {
this._serverConfig = config.server; this._serverConfig = config.server;
@ -136,11 +141,61 @@ export class Indexer {
this._ethProvider = ethProvider; this._ethProvider = ethProvider;
this._jobQueue = jobQueue; this._jobQueue = jobQueue;
this._getStorageAt = this._ethClient.getStorageAt.bind(this._ethClient); this._getStorageAt = this._ethClient.getStorageAt.bind(this._ethClient);
setActiveUpstreamEndpointMetric(
this._upstreamConfig,
this._currentEndpointIndex.rpcProviderEndpoint
);
} }
switchClients ({ ethClient, ethProvider }: { ethClient: EthClient, ethProvider: ethers.providers.BaseProvider }): void { async switchClients (
initClients: (upstreamConfig: UpstreamConfig, endpointIndexes: typeof this._currentEndpointIndex) => Promise<{
ethClient: EthClient,
ethProvider: ethers.providers.JsonRpcProvider
}>
): Promise<{ ethClient: EthClient, ethProvider: ethers.providers.JsonRpcProvider }> {
const oldRpcEndpoint = this._upstreamConfig.ethServer.rpcProviderEndpoints[this._currentEndpointIndex.rpcProviderEndpoint];
++this._currentEndpointIndex.rpcProviderEndpoint;
if (this._currentEndpointIndex.rpcProviderEndpoint === this._upstreamConfig.ethServer.rpcProviderEndpoints.length) {
this._currentEndpointIndex.rpcProviderEndpoint = 0;
}
const { ethClient, ethProvider } = await initClients(this._upstreamConfig, this._currentEndpointIndex);
setActiveUpstreamEndpointMetric(
this._upstreamConfig,
this._currentEndpointIndex.rpcProviderEndpoint
);
const newRpcEndpoint = ethProvider.connection.url;
log(`Switching RPC endpoint from ${oldRpcEndpoint} to endpoint ${newRpcEndpoint}`);
this._ethClient = ethClient; this._ethClient = ethClient;
this._ethProvider = ethProvider; this._ethProvider = ethProvider;
return { ethClient, ethProvider };
}
async isGetLogsRequestsSlow (): Promise<boolean> {
const threshold = this._upstreamConfig.ethServer.getLogsClientSwitchThresholdInSecs;
if (threshold) {
const getLogsLabels = {
method: 'eth_getLogs',
provider: this._ethProvider.connection.url
};
const ethRpcRequestDurationMetrics = await ethRpcRequestDuration.get();
const currentProviderDuration = ethRpcRequestDurationMetrics.values.find(
val => val.labels.method === getLogsLabels.method && val.labels.provider === getLogsLabels.provider
);
if (currentProviderDuration) {
return currentProviderDuration.value > threshold;
}
}
return false;
} }
async fetchContracts (): Promise<void> { async fetchContracts (): Promise<void> {

View File

@ -4,7 +4,7 @@
import assert from 'assert'; import assert from 'assert';
import debug from 'debug'; import debug from 'debug';
import { constants, ethers } from 'ethers'; import { constants, ethers, errors as ethersErrors } from 'ethers';
import { DeepPartial, In } from 'typeorm'; import { DeepPartial, In } from 'typeorm';
import PgBoss from 'pg-boss'; import PgBoss from 'pg-boss';
@ -29,7 +29,8 @@ import {
processBatchEvents, processBatchEvents,
PrefetchedBlock, PrefetchedBlock,
fetchBlocksAtHeight, fetchBlocksAtHeight,
fetchAndSaveFilteredLogsAndBlocks fetchAndSaveFilteredLogsAndBlocks,
NEW_BLOCK_MAX_RETRIES_ERROR
} from './common'; } from './common';
import { isSyncingHistoricalBlocks, lastBlockNumEvents, lastBlockProcessDuration, lastProcessedBlockNumber } from './metrics'; import { isSyncingHistoricalBlocks, lastBlockNumEvents, lastBlockProcessDuration, lastProcessedBlockNumber } from './metrics';
@ -63,19 +64,14 @@ export class JobRunner {
_signalCount = 0; _signalCount = 0;
_errorInEventsProcessing = false; _errorInEventsProcessing = false;
_jobErrorHandler: (error: Error) => Promise<void>;
constructor ( constructor (
jobQueueConfig: JobQueueConfig, jobQueueConfig: JobQueueConfig,
indexer: IndexerInterface, indexer: IndexerInterface,
jobQueue: JobQueue, jobQueue: JobQueue
// eslint-disable-next-line @typescript-eslint/no-empty-function
jobErrorHandler: (error: Error) => Promise<void> = async () => {}
) { ) {
this._indexer = indexer; this._indexer = indexer;
this.jobQueue = jobQueue; this.jobQueue = jobQueue;
this._jobQueueConfig = jobQueueConfig; this._jobQueueConfig = jobQueueConfig;
this._jobErrorHandler = jobErrorHandler;
} }
async subscribeBlockProcessingQueue (): Promise<void> { async subscribeBlockProcessingQueue (): Promise<void> {
@ -187,6 +183,11 @@ export class JobRunner {
await Promise.all(indexBlockPromises); await Promise.all(indexBlockPromises);
} }
// Switch clients if getLogs requests are too slow
if (await this._indexer.isGetLogsRequestsSlow()) {
await this._indexer.switchClients();
}
break; break;
} }
@ -292,6 +293,11 @@ export class JobRunner {
this._historicalProcessingCompletedUpto = endBlock; this._historicalProcessingCompletedUpto = endBlock;
// Switch clients if getLogs requests are too slow
if (await this._indexer.isGetLogsRequestsSlow()) {
await this._indexer.switchClients();
}
if (endBlock < processingEndBlockNumber) { if (endBlock < processingEndBlockNumber) {
// If endBlock is lesser than processingEndBlockNumber push new historical job // If endBlock is lesser than processingEndBlockNumber push new historical job
await this.jobQueue.pushJob( await this.jobQueue.pushJob(
@ -789,6 +795,22 @@ export class JobRunner {
} }
} }
async _jobErrorHandler (error: any): Promise<void> {
if (
// Switch client if it is a server error from ethers.js
// https://docs.ethers.org/v5/api/utils/logger/#errors--server-error
error.code === ethersErrors.SERVER_ERROR ||
// Switch client if it is a timeout error from ethers.js
// https://docs.ethers.org/v5/api/utils/logger/#errors--timeout
error.code === ethersErrors.TIMEOUT ||
// Switch client if error is for max retries to get new block at head
error.message === NEW_BLOCK_MAX_RETRIES_ERROR
) {
log('RPC endpoint is not working; failing over to another one');
await this._indexer.switchClients();
}
}
_updateWatchedContracts (job: any): void { _updateWatchedContracts (job: any): void {
const { data: { contract } } = job; const { data: { contract } } = job;
this._indexer.cacheContract(contract); this._indexer.cacheContract(contract);

View File

@ -10,7 +10,7 @@ import assert from 'assert';
import { ethers } from 'ethers'; import { ethers } from 'ethers';
import JsonRpcProvider = ethers.providers.JsonRpcProvider; import JsonRpcProvider = ethers.providers.JsonRpcProvider;
import { Config } from './config'; import { Config, UpstreamConfig } from './config';
import { IndexerInterface } from './types'; import { IndexerInterface } from './types';
import { JobQueue } from './job-queue'; import { JobQueue } from './job-queue';
@ -103,7 +103,7 @@ const upstreamEndpointsMetric = new client.Gauge({
// Export metrics on a server // Export metrics on a server
const app: Application = express(); const app: Application = express();
export const startMetricsServer = async (config: Config, jobQueue: JobQueue, indexer: IndexerInterface, endpointIndexes = { rpcProviderEndpoint: 0 }): Promise<void> => { export const startMetricsServer = async (config: Config, jobQueue: JobQueue, indexer: IndexerInterface): Promise<void> => {
if (!config.metrics) { if (!config.metrics) {
log('Metrics is disabled. To enable add metrics host and port.'); log('Metrics is disabled. To enable add metrics host and port.');
return; return;
@ -134,11 +134,9 @@ export const startMetricsServer = async (config: Config, jobQueue: JobQueue, ind
await registerWatcherConfigMetrics(config); await registerWatcherConfigMetrics(config);
setActiveUpstreamEndpointMetric(config, endpointIndexes.rpcProviderEndpoint);
await registerDBSizeMetrics(config); await registerDBSizeMetrics(config);
await registerUpstreamChainHeadMetrics(config, endpointIndexes.rpcProviderEndpoint); await registerUpstreamChainHeadMetrics();
await registerWatcherInfoMetrics(); await registerWatcherInfoMetrics();
@ -159,14 +157,14 @@ export const startMetricsServer = async (config: Config, jobQueue: JobQueue, ind
// ETH RPC provider used for upstream chain head metrics // ETH RPC provider used for upstream chain head metrics
let ethRpcProvider: JsonRpcProvider | undefined; let ethRpcProvider: JsonRpcProvider | undefined;
export const setActiveUpstreamEndpointMetric = ({ upstream }: Config, currentEndpointIndex: number): void => { export const setActiveUpstreamEndpointMetric = (upstreamConfig: UpstreamConfig, currentEndpointIndex: number): void => {
const endpoints = upstream.ethServer.rpcProviderEndpoints; const endpoints = upstreamConfig.ethServer.rpcProviderEndpoints;
endpoints.forEach((endpoint, index) => { endpoints.forEach((endpoint, index) => {
upstreamEndpointsMetric.set({ provider: endpoint }, Number(index === currentEndpointIndex)); upstreamEndpointsMetric.set({ provider: endpoint }, Number(index === currentEndpointIndex));
}); });
ethRpcProvider = new JsonRpcProvider(upstream.ethServer.rpcProviderEndpoints[currentEndpointIndex]); ethRpcProvider = new JsonRpcProvider(upstreamConfig.ethServer.rpcProviderEndpoints[currentEndpointIndex]);
}; };
const registerDBSizeMetrics = async ({ database, jobQueue }: Config): Promise<void> => { const registerDBSizeMetrics = async ({ database, jobQueue }: Config): Promise<void> => {
@ -204,9 +202,7 @@ const registerDBSizeMetrics = async ({ database, jobQueue }: Config): Promise<vo
}); });
}; };
const registerUpstreamChainHeadMetrics = async ({ upstream }: Config, rpcProviderEndpointIndex: number): Promise<void> => { const registerUpstreamChainHeadMetrics = async (): Promise<void> => {
ethRpcProvider = new JsonRpcProvider(upstream.ethServer.rpcProviderEndpoints[rpcProviderEndpointIndex]);
// eslint-disable-next-line no-new // eslint-disable-next-line no-new
new client.Gauge({ new client.Gauge({
name: 'latest_upstream_block_number', name: 'latest_upstream_block_number',

View File

@ -3,7 +3,7 @@
// //
import { Connection, DeepPartial, EntityTarget, FindConditions, FindManyOptions, ObjectLiteral, QueryRunner } from 'typeorm'; import { Connection, DeepPartial, EntityTarget, FindConditions, FindManyOptions, ObjectLiteral, QueryRunner } from 'typeorm';
import { Transaction, providers } from 'ethers'; import { Transaction } from 'ethers';
import { MappingKey, StorageLayout } from '@cerc-io/solidity-mapper'; import { MappingKey, StorageLayout } from '@cerc-io/solidity-mapper';
@ -236,7 +236,8 @@ export interface IndexerInterface {
clearProcessedBlockData (block: BlockProgressInterface): Promise<void> clearProcessedBlockData (block: BlockProgressInterface): Promise<void>
getResultEvent (event: EventInterface): any getResultEvent (event: EventInterface): any
getFullTransactions (txHashList: string[]): Promise<EthFullTransaction[]> getFullTransactions (txHashList: string[]): Promise<EthFullTransaction[]>
switchClients (clients: { ethClient: EthClient, ethProvider: providers.BaseProvider }): void isGetLogsRequestsSlow(): Promise<boolean>
switchClients(): Promise<void>
} }
export interface DatabaseInterface { export interface DatabaseInterface {