From 2d845f79593d612157777f60dcb2fd712e8aac10 Mon Sep 17 00:00:00 2001 From: Prathamesh Musale Date: Fri, 14 Jun 2024 10:09:30 +0530 Subject: [PATCH] Switch upstream endpoint if getLogs requests are too slow --- packages/cli/src/job-runner.ts | 33 ++++++------- .../src/templates/config-template.handlebars | 4 ++ packages/util/src/config.ts | 3 ++ packages/util/src/job-runner.ts | 49 ++++++++++++++++--- 4 files changed, 65 insertions(+), 24 deletions(-) diff --git a/packages/cli/src/job-runner.ts b/packages/cli/src/job-runner.ts index c9f2d720..7a49db44 100644 --- a/packages/cli/src/job-runner.ts +++ b/packages/cli/src/job-runner.ts @@ -7,7 +7,6 @@ import { hideBin } from 'yargs/helpers'; import 'reflect-metadata'; import assert from 'assert'; import { ConnectionOptions } from 'typeorm'; -import { errors } from 'ethers'; import debug from 'debug'; import { JsonRpcProvider } from '@ethersproject/providers'; @@ -23,7 +22,6 @@ import { startMetricsServer, Config, UpstreamConfig, - NEW_BLOCK_MAX_RETRIES_ERROR, setActiveUpstreamEndpointMetric } from '@cerc-io/util'; @@ -125,24 +123,23 @@ export class JobRunnerCmd { config.jobQueue, indexer, jobQueue, - async (error: any) => { - // Check if it is a server error or timeout from ethers.js - // https://docs.ethers.org/v5/api/utils/logger/#errors--server-error - // https://docs.ethers.org/v5/api/utils/logger/#errors--timeout - if (error.code === errors.SERVER_ERROR || error.code === errors.TIMEOUT || error.message === NEW_BLOCK_MAX_RETRIES_ERROR) { - const oldRpcEndpoint = config.upstream.ethServer.rpcProviderEndpoints[this._currentEndpointIndex.rpcProviderEndpoint]; - ++this._currentEndpointIndex.rpcProviderEndpoint; + config.upstream.ethServer.rpcProviderEndpoints[this._currentEndpointIndex.rpcProviderEndpoint], + async (): Promise => { + const oldRpcEndpoint = config.upstream.ethServer.rpcProviderEndpoints[this._currentEndpointIndex.rpcProviderEndpoint]; + ++this._currentEndpointIndex.rpcProviderEndpoint; - if 
(this._currentEndpointIndex.rpcProviderEndpoint === config.upstream.ethServer.rpcProviderEndpoints.length) { - this._currentEndpointIndex.rpcProviderEndpoint = 0; - } - - const { ethClient, ethProvider } = await initClients(config, this._currentEndpointIndex); - indexer.switchClients({ ethClient, ethProvider }); - setActiveUpstreamEndpointMetric(config, this._currentEndpointIndex.rpcProviderEndpoint); - - log(`RPC endpoint ${oldRpcEndpoint} is not working; failing over to new RPC endpoint ${ethProvider.connection.url}`); + if (this._currentEndpointIndex.rpcProviderEndpoint === config.upstream.ethServer.rpcProviderEndpoints.length) { + this._currentEndpointIndex.rpcProviderEndpoint = 0; } + + const { ethClient, ethProvider } = await initClients(config, this._currentEndpointIndex); + indexer.switchClients({ ethClient, ethProvider }); + setActiveUpstreamEndpointMetric(config, this._currentEndpointIndex.rpcProviderEndpoint); + + const newRpcEndpoint = ethProvider.connection.url; + log(`Switching RPC endpoint from ${oldRpcEndpoint} to endpoint ${newRpcEndpoint}`); + + return newRpcEndpoint; }); // Delete all active and pending (before completed) jobs to start job-runner without old queued jobs diff --git a/packages/codegen/src/templates/config-template.handlebars b/packages/codegen/src/templates/config-template.handlebars index 1f69980c..7cedadef 100644 --- a/packages/codegen/src/templates/config-template.handlebars +++ b/packages/codegen/src/templates/config-template.handlebars @@ -96,6 +96,10 @@ subgraphEventsOrder = true blockDelayInMilliSecs = 2000 + # Switch clients if eth_getLogs call takes more than threshold (in secs) + # (set to 0 to disable switching) + getLogsClientSwitchThresholdInSecs = 0 + # Number of blocks by which block processing lags behind head blockProcessingOffset = 0 diff --git a/packages/util/src/config.ts b/packages/util/src/config.ts index 5f5326cd..fa41d447 100644 --- a/packages/util/src/config.ts +++ b/packages/util/src/config.ts @@ -23,6 
+23,9 @@ export interface JobQueueConfig { subgraphEventsOrder: boolean; blockDelayInMilliSecs: number; + // Switch clients if eth_getLogs call takes more than threshold (in secs) + getLogsClientSwitchThresholdInSecs?: number; + // Number of blocks by which block processing lags behind head (default: 0) blockProcessingOffset?: number; diff --git a/packages/util/src/job-runner.ts b/packages/util/src/job-runner.ts index ca7f227a..f6c81402 100644 --- a/packages/util/src/job-runner.ts +++ b/packages/util/src/job-runner.ts @@ -4,7 +4,7 @@ import assert from 'assert'; import debug from 'debug'; -import { constants, ethers } from 'ethers'; +import { constants, ethers, errors as ethersErrors } from 'ethers'; import { DeepPartial, In } from 'typeorm'; import PgBoss from 'pg-boss'; @@ -29,9 +29,10 @@ import { processBatchEvents, PrefetchedBlock, fetchBlocksAtHeight, - fetchAndSaveFilteredLogsAndBlocks + fetchAndSaveFilteredLogsAndBlocks, + NEW_BLOCK_MAX_RETRIES_ERROR } from './common'; -import { isSyncingHistoricalBlocks, lastBlockNumEvents, lastBlockProcessDuration, lastProcessedBlockNumber } from './metrics'; +import { ethRpcRequestDuration, isSyncingHistoricalBlocks, lastBlockNumEvents, lastBlockProcessDuration, lastProcessedBlockNumber } from './metrics'; const log = debug('vulcanize:job-runner'); @@ -63,19 +64,22 @@ export class JobRunner { _signalCount = 0; _errorInEventsProcessing = false; - _jobErrorHandler: (error: Error) => Promise; + _currentRpcProviderEndpoint: string; + _switchClients: () => Promise; constructor ( jobQueueConfig: JobQueueConfig, indexer: IndexerInterface, jobQueue: JobQueue, + currentRpcProviderEndpoint: string, // eslint-disable-next-line @typescript-eslint/no-empty-function - jobErrorHandler: (error: Error) => Promise = async () => {} + switchClients: () => Promise = async () => '' ) { this._indexer = indexer; this.jobQueue = jobQueue; this._jobQueueConfig = jobQueueConfig; - this._jobErrorHandler = jobErrorHandler; + 
this._currentRpcProviderEndpoint = currentRpcProviderEndpoint; + this._switchClients = switchClients; } async subscribeBlockProcessingQueue (): Promise { @@ -187,6 +191,9 @@ export class JobRunner { await Promise.all(indexBlockPromises); } + // Switch clients if getLogs requests are too slow + await this._switchClientOnSlowGetLogsRequests(); + break; } @@ -292,6 +299,9 @@ export class JobRunner { this._historicalProcessingCompletedUpto = endBlock; + // Switch clients if getLogs requests are too slow + await this._switchClientOnSlowGetLogsRequests(); + if (endBlock < processingEndBlockNumber) { // If endBlock is lesser than processingEndBlockNumber push new historical job await this.jobQueue.pushJob( @@ -789,6 +799,33 @@ export class JobRunner { } } + async _jobErrorHandler (error: any): Promise { + // Check if it is a server error or timeout from ethers.js + // https://docs.ethers.org/v5/api/utils/logger/#errors--server-error + // https://docs.ethers.org/v5/api/utils/logger/#errors--timeout + if (error.code === ethersErrors.SERVER_ERROR || error.code === ethersErrors.TIMEOUT || error.message === NEW_BLOCK_MAX_RETRIES_ERROR) { + log('RPC endpoint is not working; failing over to another one'); + this._currentRpcProviderEndpoint = await this._switchClients(); + } + } + + async _switchClientOnSlowGetLogsRequests (): Promise { + const threshold = this._jobQueueConfig.getLogsClientSwitchThresholdInSecs; + if (threshold) { + const getLogsLabels = { + method: 'eth_getLogs', + provider: this._currentRpcProviderEndpoint + }; + + const ethRpcRequestDurationMetrics = await ethRpcRequestDuration.get(); + const currentProviderDuration = ethRpcRequestDurationMetrics.values.find(val => val.labels.method === getLogsLabels.method && val.labels.provider === getLogsLabels.provider); + if (currentProviderDuration && currentProviderDuration.value > threshold) { + log(`eth_getLogs call with current RPC endpoint took too long (${currentProviderDuration.value} sec); switching over to 
another one`); + this._currentRpcProviderEndpoint = await this._switchClients(); + } + } + } + _updateWatchedContracts (job: any): void { const { data: { contract } } = job; this._indexer.cacheContract(contract);