mirror of
https://github.com/cerc-io/watcher-ts
synced 2025-07-27 02:32:07 +00:00
Refactor methods for switching client to indexer
This commit is contained in:
parent
2d845f7959
commit
13addd8241
@ -96,7 +96,7 @@ export class BaseCmd {
|
||||
this._jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs });
|
||||
await this._jobQueue.start();
|
||||
|
||||
const { ethClient, ethProvider } = await initClients(this._config);
|
||||
const { ethClient, ethProvider } = await initClients(this._config.upstream);
|
||||
this._ethProvider = ethProvider;
|
||||
this._clients = { ethClient, ...clients };
|
||||
}
|
||||
|
@ -7,7 +7,6 @@ import { hideBin } from 'yargs/helpers';
|
||||
import 'reflect-metadata';
|
||||
import assert from 'assert';
|
||||
import { ConnectionOptions } from 'typeorm';
|
||||
import debug from 'debug';
|
||||
|
||||
import { JsonRpcProvider } from '@ethersproject/providers';
|
||||
import {
|
||||
@ -21,14 +20,10 @@ import {
|
||||
GraphWatcherInterface,
|
||||
startMetricsServer,
|
||||
Config,
|
||||
UpstreamConfig,
|
||||
setActiveUpstreamEndpointMetric
|
||||
UpstreamConfig
|
||||
} from '@cerc-io/util';
|
||||
|
||||
import { BaseCmd } from './base';
|
||||
import { initClients } from './utils/index';
|
||||
|
||||
const log = debug('vulcanize:job-runner');
|
||||
|
||||
interface Arguments {
|
||||
configFile: string;
|
||||
@ -38,10 +33,6 @@ export class JobRunnerCmd {
|
||||
_argv?: Arguments;
|
||||
_baseCmd: BaseCmd;
|
||||
|
||||
_currentEndpointIndex = {
|
||||
rpcProviderEndpoint: 0
|
||||
};
|
||||
|
||||
constructor () {
|
||||
this._baseCmd = new BaseCmd();
|
||||
}
|
||||
@ -122,25 +113,8 @@ export class JobRunnerCmd {
|
||||
const jobRunner = new JobRunner(
|
||||
config.jobQueue,
|
||||
indexer,
|
||||
jobQueue,
|
||||
config.upstream.ethServer.rpcProviderEndpoints[this._currentEndpointIndex.rpcProviderEndpoint],
|
||||
async (): Promise<string> => {
|
||||
const oldRpcEndpoint = config.upstream.ethServer.rpcProviderEndpoints[this._currentEndpointIndex.rpcProviderEndpoint];
|
||||
++this._currentEndpointIndex.rpcProviderEndpoint;
|
||||
|
||||
if (this._currentEndpointIndex.rpcProviderEndpoint === config.upstream.ethServer.rpcProviderEndpoints.length) {
|
||||
this._currentEndpointIndex.rpcProviderEndpoint = 0;
|
||||
}
|
||||
|
||||
const { ethClient, ethProvider } = await initClients(config, this._currentEndpointIndex);
|
||||
indexer.switchClients({ ethClient, ethProvider });
|
||||
setActiveUpstreamEndpointMetric(config, this._currentEndpointIndex.rpcProviderEndpoint);
|
||||
|
||||
const newRpcEndpoint = ethProvider.connection.url;
|
||||
log(`Switching RPC endpoint from ${oldRpcEndpoint} to endpoint ${newRpcEndpoint}`);
|
||||
|
||||
return newRpcEndpoint;
|
||||
});
|
||||
jobQueue
|
||||
);
|
||||
|
||||
// Delete all active and pending (before completed) jobs to start job-runner without old queued jobs
|
||||
await jobRunner.jobQueue.deleteAllJobs('completed');
|
||||
@ -151,7 +125,7 @@ export class JobRunnerCmd {
|
||||
await startJobRunner(jobRunner);
|
||||
jobRunner.handleShutdown();
|
||||
|
||||
await startMetricsServer(config, jobQueue, indexer, this._currentEndpointIndex);
|
||||
await startMetricsServer(config, jobQueue, indexer);
|
||||
}
|
||||
|
||||
_getArgv (): any {
|
||||
|
@ -9,7 +9,7 @@ import { providers } from 'ethers';
|
||||
|
||||
// @ts-expect-error https://github.com/microsoft/TypeScript/issues/49721#issuecomment-1319854183
|
||||
import { PeerIdObj } from '@cerc-io/peer';
|
||||
import { Config, EthClient, getCustomProvider } from '@cerc-io/util';
|
||||
import { EthClient, UpstreamConfig, getCustomProvider } from '@cerc-io/util';
|
||||
import { getCache } from '@cerc-io/cache';
|
||||
import { EthClient as GqlEthClient } from '@cerc-io/ipld-eth-client';
|
||||
import { EthClient as RpcEthClient } from '@cerc-io/rpc-eth-client';
|
||||
@ -22,16 +22,10 @@ export function readPeerId (filePath: string): PeerIdObj {
|
||||
return JSON.parse(peerIdJson);
|
||||
}
|
||||
|
||||
export const initClients = async (config: Config, endpointIndexes = { rpcProviderEndpoint: 0 }): Promise<{
|
||||
export const initClients = async (upstreamConfig: UpstreamConfig, endpointIndexes = { rpcProviderEndpoint: 0 }): Promise<{
|
||||
ethClient: EthClient,
|
||||
ethProvider: providers.JsonRpcProvider
|
||||
}> => {
|
||||
const { database: dbConfig, upstream: upstreamConfig, server: serverConfig } = config;
|
||||
|
||||
assert(serverConfig, 'Missing server config');
|
||||
assert(dbConfig, 'Missing database config');
|
||||
assert(upstreamConfig, 'Missing upstream config');
|
||||
|
||||
const { ethServer: { gqlApiEndpoint, rpcProviderEndpoints, rpcClient = false }, cache: cacheConfig } = upstreamConfig;
|
||||
|
||||
assert(rpcProviderEndpoints, 'Missing upstream ethServer.rpcProviderEndpoints');
|
||||
|
@ -83,6 +83,10 @@
|
||||
# Boolean flag to filter event logs by topics
|
||||
filterLogsByTopics = true
|
||||
|
||||
# Switch clients if eth_getLogs call takes more than threshold (in secs)
|
||||
# Set to 0 for disabling switching
|
||||
getLogsClientSwitchThresholdInSecs = 0
|
||||
|
||||
[upstream.cache]
|
||||
name = "requests"
|
||||
enabled = false
|
||||
@ -96,10 +100,6 @@
|
||||
subgraphEventsOrder = true
|
||||
blockDelayInMilliSecs = 2000
|
||||
|
||||
// Switch clients if eth_getLogs call takes more than threshold (in secs)
|
||||
// (set to 0 to disable switching)
|
||||
getLogsClientSwitchThresholdInSecs = 0
|
||||
|
||||
# Number of blocks by which block processing lags behind head
|
||||
blockProcessingOffset = 0
|
||||
|
||||
|
@ -23,9 +23,6 @@ export interface JobQueueConfig {
|
||||
subgraphEventsOrder: boolean;
|
||||
blockDelayInMilliSecs: number;
|
||||
|
||||
// Switch clients if eth_getLogs call takes more than threshold (in secs)
|
||||
getLogsClientSwitchThresholdInSecs?: number;
|
||||
|
||||
// Number of blocks by which block processing lags behind head (default: 0)
|
||||
blockProcessingOffset?: number;
|
||||
|
||||
@ -292,6 +289,9 @@ export interface UpstreamConfig {
|
||||
// Boolean flag to filter event logs by topics
|
||||
filterLogsByTopics: boolean;
|
||||
|
||||
// Switch clients if eth_getLogs call takes more than threshold (in secs)
|
||||
getLogsClientSwitchThresholdInSecs?: number;
|
||||
|
||||
payments: EthServerPaymentsConfig;
|
||||
}
|
||||
traceProviderEndpoint: string;
|
||||
|
@ -33,6 +33,7 @@ import { JobQueue } from './job-queue';
|
||||
import { Where, QueryOptions, BlockHeight } from './database';
|
||||
import { ServerConfig, UpstreamConfig } from './config';
|
||||
import { createOrUpdateStateData, StateDataMeta } from './state-helper';
|
||||
import { ethRpcRequestDuration, setActiveUpstreamEndpointMetric } from './metrics';
|
||||
|
||||
const DEFAULT_MAX_EVENTS_BLOCK_RANGE = 1000;
|
||||
|
||||
@ -113,12 +114,16 @@ export class Indexer {
|
||||
_db: DatabaseInterface;
|
||||
_ethClient: EthClient;
|
||||
_getStorageAt: GetStorageAt;
|
||||
_ethProvider: ethers.providers.BaseProvider;
|
||||
_ethProvider: ethers.providers.JsonRpcProvider;
|
||||
_jobQueue: JobQueue;
|
||||
|
||||
_watchedContracts: { [key: string]: ContractInterface } = {};
|
||||
_stateStatusMap: { [key: string]: StateStatus } = {};
|
||||
|
||||
_currentEndpointIndex = {
|
||||
rpcProviderEndpoint: 0
|
||||
};
|
||||
|
||||
constructor (
|
||||
config: {
|
||||
server: ServerConfig;
|
||||
@ -126,7 +131,7 @@ export class Indexer {
|
||||
},
|
||||
db: DatabaseInterface,
|
||||
ethClient: EthClient,
|
||||
ethProvider: ethers.providers.BaseProvider,
|
||||
ethProvider: ethers.providers.JsonRpcProvider,
|
||||
jobQueue: JobQueue
|
||||
) {
|
||||
this._serverConfig = config.server;
|
||||
@ -136,11 +141,61 @@ export class Indexer {
|
||||
this._ethProvider = ethProvider;
|
||||
this._jobQueue = jobQueue;
|
||||
this._getStorageAt = this._ethClient.getStorageAt.bind(this._ethClient);
|
||||
|
||||
setActiveUpstreamEndpointMetric(
|
||||
this._upstreamConfig,
|
||||
this._currentEndpointIndex.rpcProviderEndpoint
|
||||
);
|
||||
}
|
||||
|
||||
switchClients ({ ethClient, ethProvider }: { ethClient: EthClient, ethProvider: ethers.providers.BaseProvider }): void {
|
||||
async switchClients (
|
||||
initClients: (upstreamConfig: UpstreamConfig, endpointIndexes: typeof this._currentEndpointIndex) => Promise<{
|
||||
ethClient: EthClient,
|
||||
ethProvider: ethers.providers.JsonRpcProvider
|
||||
}>
|
||||
): Promise<{ ethClient: EthClient, ethProvider: ethers.providers.JsonRpcProvider }> {
|
||||
const oldRpcEndpoint = this._upstreamConfig.ethServer.rpcProviderEndpoints[this._currentEndpointIndex.rpcProviderEndpoint];
|
||||
++this._currentEndpointIndex.rpcProviderEndpoint;
|
||||
|
||||
if (this._currentEndpointIndex.rpcProviderEndpoint === this._upstreamConfig.ethServer.rpcProviderEndpoints.length) {
|
||||
this._currentEndpointIndex.rpcProviderEndpoint = 0;
|
||||
}
|
||||
|
||||
const { ethClient, ethProvider } = await initClients(this._upstreamConfig, this._currentEndpointIndex);
|
||||
setActiveUpstreamEndpointMetric(
|
||||
this._upstreamConfig,
|
||||
this._currentEndpointIndex.rpcProviderEndpoint
|
||||
);
|
||||
|
||||
const newRpcEndpoint = ethProvider.connection.url;
|
||||
log(`Switching RPC endpoint from ${oldRpcEndpoint} to endpoint ${newRpcEndpoint}`);
|
||||
|
||||
this._ethClient = ethClient;
|
||||
this._ethProvider = ethProvider;
|
||||
return { ethClient, ethProvider };
|
||||
}
|
||||
|
||||
async isGetLogsRequestsSlow (): Promise<boolean> {
|
||||
const threshold = this._upstreamConfig.ethServer.getLogsClientSwitchThresholdInSecs;
|
||||
|
||||
if (threshold) {
|
||||
const getLogsLabels = {
|
||||
method: 'eth_getLogs',
|
||||
provider: this._ethProvider.connection.url
|
||||
};
|
||||
|
||||
const ethRpcRequestDurationMetrics = await ethRpcRequestDuration.get();
|
||||
|
||||
const currentProviderDuration = ethRpcRequestDurationMetrics.values.find(
|
||||
val => val.labels.method === getLogsLabels.method && val.labels.provider === getLogsLabels.provider
|
||||
);
|
||||
|
||||
if (currentProviderDuration) {
|
||||
return currentProviderDuration.value > threshold;
|
||||
}
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
async fetchContracts (): Promise<void> {
|
||||
|
@ -32,7 +32,7 @@ import {
|
||||
fetchAndSaveFilteredLogsAndBlocks,
|
||||
NEW_BLOCK_MAX_RETRIES_ERROR
|
||||
} from './common';
|
||||
import { ethRpcRequestDuration, isSyncingHistoricalBlocks, lastBlockNumEvents, lastBlockProcessDuration, lastProcessedBlockNumber } from './metrics';
|
||||
import { isSyncingHistoricalBlocks, lastBlockNumEvents, lastBlockProcessDuration, lastProcessedBlockNumber } from './metrics';
|
||||
|
||||
const log = debug('vulcanize:job-runner');
|
||||
|
||||
@ -64,22 +64,14 @@ export class JobRunner {
|
||||
_signalCount = 0;
|
||||
_errorInEventsProcessing = false;
|
||||
|
||||
_currentRpcProviderEndpoint: string;
|
||||
_switchClients: () => Promise<string>;
|
||||
|
||||
constructor (
|
||||
jobQueueConfig: JobQueueConfig,
|
||||
indexer: IndexerInterface,
|
||||
jobQueue: JobQueue,
|
||||
currentRpcProviderEndpoint: string,
|
||||
// eslint-disable-next-line @typescript-eslint/no-empty-function
|
||||
switchClients: () => Promise<string> = async () => ''
|
||||
jobQueue: JobQueue
|
||||
) {
|
||||
this._indexer = indexer;
|
||||
this.jobQueue = jobQueue;
|
||||
this._jobQueueConfig = jobQueueConfig;
|
||||
this._currentRpcProviderEndpoint = currentRpcProviderEndpoint;
|
||||
this._switchClients = switchClients;
|
||||
}
|
||||
|
||||
async subscribeBlockProcessingQueue (): Promise<void> {
|
||||
@ -192,7 +184,9 @@ export class JobRunner {
|
||||
}
|
||||
|
||||
// Switch clients if getLogs requests are too slow
|
||||
await this._switchClientOnSlowGetLogsRequests();
|
||||
if (await this._indexer.isGetLogsRequestsSlow()) {
|
||||
await this._indexer.switchClients();
|
||||
}
|
||||
|
||||
break;
|
||||
}
|
||||
@ -300,7 +294,9 @@ export class JobRunner {
|
||||
this._historicalProcessingCompletedUpto = endBlock;
|
||||
|
||||
// Switch clients if getLogs requests are too slow
|
||||
await this._switchClientOnSlowGetLogsRequests();
|
||||
if (await this._indexer.isGetLogsRequestsSlow()) {
|
||||
await this._indexer.switchClients();
|
||||
}
|
||||
|
||||
if (endBlock < processingEndBlockNumber) {
|
||||
// If endBlock is lesser than processingEndBlockNumber push new historical job
|
||||
@ -800,29 +796,18 @@ export class JobRunner {
|
||||
}
|
||||
|
||||
async _jobErrorHandler (error: any): Promise<void> {
|
||||
// Check if it is a server error or timeout from ethers.js
|
||||
if (
|
||||
// Switch client if it is a server error from ethers.js
|
||||
// https://docs.ethers.org/v5/api/utils/logger/#errors--server-error
|
||||
error.code === ethersErrors.SERVER_ERROR ||
|
||||
// Switch client if it is a timeout error from ethers.js
|
||||
// https://docs.ethers.org/v5/api/utils/logger/#errors--timeout
|
||||
if (error.code === ethersErrors.SERVER_ERROR || error.code === ethersErrors.TIMEOUT || error.message === NEW_BLOCK_MAX_RETRIES_ERROR) {
|
||||
error.code === ethersErrors.TIMEOUT ||
|
||||
// Switch client if error is for max retries to get new block at head
|
||||
error.message === NEW_BLOCK_MAX_RETRIES_ERROR
|
||||
) {
|
||||
log('RPC endpoint is not working; failing over to another one');
|
||||
this._currentRpcProviderEndpoint = await this._switchClients();
|
||||
}
|
||||
}
|
||||
|
||||
async _switchClientOnSlowGetLogsRequests (): Promise<void> {
|
||||
const threshold = this._jobQueueConfig.getLogsClientSwitchThresholdInSecs;
|
||||
if (threshold) {
|
||||
const getLogsLabels = {
|
||||
method: 'eth_getLogs',
|
||||
provider: this._currentRpcProviderEndpoint
|
||||
};
|
||||
|
||||
const ethRpcRequestDurationMetrics = await ethRpcRequestDuration.get();
|
||||
const currentProviderDuration = ethRpcRequestDurationMetrics.values.find(val => val.labels.method === getLogsLabels.method && val.labels.provider === getLogsLabels.provider);
|
||||
if (currentProviderDuration && currentProviderDuration.value > threshold) {
|
||||
log(`eth_getLogs call with current RPC endpoint took too long (${currentProviderDuration.value} sec); switching over to another one`);
|
||||
this._currentRpcProviderEndpoint = await this._switchClients();
|
||||
}
|
||||
await this._indexer.switchClients();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -10,7 +10,7 @@ import assert from 'assert';
|
||||
import { ethers } from 'ethers';
|
||||
import JsonRpcProvider = ethers.providers.JsonRpcProvider;
|
||||
|
||||
import { Config } from './config';
|
||||
import { Config, UpstreamConfig } from './config';
|
||||
import { IndexerInterface } from './types';
|
||||
import { JobQueue } from './job-queue';
|
||||
|
||||
@ -103,7 +103,7 @@ const upstreamEndpointsMetric = new client.Gauge({
|
||||
// Export metrics on a server
|
||||
const app: Application = express();
|
||||
|
||||
export const startMetricsServer = async (config: Config, jobQueue: JobQueue, indexer: IndexerInterface, endpointIndexes = { rpcProviderEndpoint: 0 }): Promise<void> => {
|
||||
export const startMetricsServer = async (config: Config, jobQueue: JobQueue, indexer: IndexerInterface): Promise<void> => {
|
||||
if (!config.metrics) {
|
||||
log('Metrics is disabled. To enable add metrics host and port.');
|
||||
return;
|
||||
@ -134,11 +134,9 @@ export const startMetricsServer = async (config: Config, jobQueue: JobQueue, ind
|
||||
|
||||
await registerWatcherConfigMetrics(config);
|
||||
|
||||
setActiveUpstreamEndpointMetric(config, endpointIndexes.rpcProviderEndpoint);
|
||||
|
||||
await registerDBSizeMetrics(config);
|
||||
|
||||
await registerUpstreamChainHeadMetrics(config, endpointIndexes.rpcProviderEndpoint);
|
||||
await registerUpstreamChainHeadMetrics();
|
||||
|
||||
await registerWatcherInfoMetrics();
|
||||
|
||||
@ -159,14 +157,14 @@ export const startMetricsServer = async (config: Config, jobQueue: JobQueue, ind
|
||||
// ETH RPC provider used for upstream chain head metrics
|
||||
let ethRpcProvider: JsonRpcProvider | undefined;
|
||||
|
||||
export const setActiveUpstreamEndpointMetric = ({ upstream }: Config, currentEndpointIndex: number): void => {
|
||||
const endpoints = upstream.ethServer.rpcProviderEndpoints;
|
||||
export const setActiveUpstreamEndpointMetric = (upstreamConfig: UpstreamConfig, currentEndpointIndex: number): void => {
|
||||
const endpoints = upstreamConfig.ethServer.rpcProviderEndpoints;
|
||||
|
||||
endpoints.forEach((endpoint, index) => {
|
||||
upstreamEndpointsMetric.set({ provider: endpoint }, Number(index === currentEndpointIndex));
|
||||
});
|
||||
|
||||
ethRpcProvider = new JsonRpcProvider(upstream.ethServer.rpcProviderEndpoints[currentEndpointIndex]);
|
||||
ethRpcProvider = new JsonRpcProvider(upstreamConfig.ethServer.rpcProviderEndpoints[currentEndpointIndex]);
|
||||
};
|
||||
|
||||
const registerDBSizeMetrics = async ({ database, jobQueue }: Config): Promise<void> => {
|
||||
@ -204,9 +202,7 @@ const registerDBSizeMetrics = async ({ database, jobQueue }: Config): Promise<vo
|
||||
});
|
||||
};
|
||||
|
||||
const registerUpstreamChainHeadMetrics = async ({ upstream }: Config, rpcProviderEndpointIndex: number): Promise<void> => {
|
||||
ethRpcProvider = new JsonRpcProvider(upstream.ethServer.rpcProviderEndpoints[rpcProviderEndpointIndex]);
|
||||
|
||||
const registerUpstreamChainHeadMetrics = async (): Promise<void> => {
|
||||
// eslint-disable-next-line no-new
|
||||
new client.Gauge({
|
||||
name: 'latest_upstream_block_number',
|
||||
|
@ -3,7 +3,7 @@
|
||||
//
|
||||
|
||||
import { Connection, DeepPartial, EntityTarget, FindConditions, FindManyOptions, ObjectLiteral, QueryRunner } from 'typeorm';
|
||||
import { Transaction, providers } from 'ethers';
|
||||
import { Transaction } from 'ethers';
|
||||
|
||||
import { MappingKey, StorageLayout } from '@cerc-io/solidity-mapper';
|
||||
|
||||
@ -236,7 +236,8 @@ export interface IndexerInterface {
|
||||
clearProcessedBlockData (block: BlockProgressInterface): Promise<void>
|
||||
getResultEvent (event: EventInterface): any
|
||||
getFullTransactions (txHashList: string[]): Promise<EthFullTransaction[]>
|
||||
switchClients (clients: { ethClient: EthClient, ethProvider: providers.BaseProvider }): void
|
||||
isGetLogsRequestsSlow(): Promise<boolean>
|
||||
switchClients(): Promise<void>
|
||||
}
|
||||
|
||||
export interface DatabaseInterface {
|
||||
|
Loading…
Reference in New Issue
Block a user