Switch upstream endpoint if getLogs requests are too slow

This commit is contained in:
Prathamesh Musale 2024-06-14 10:09:30 +05:30 committed by Nabarun
parent ff471da287
commit 2d845f7959
4 changed files with 65 additions and 24 deletions

View File

@ -7,7 +7,6 @@ import { hideBin } from 'yargs/helpers';
import 'reflect-metadata'; import 'reflect-metadata';
import assert from 'assert'; import assert from 'assert';
import { ConnectionOptions } from 'typeorm'; import { ConnectionOptions } from 'typeorm';
import { errors } from 'ethers';
import debug from 'debug'; import debug from 'debug';
import { JsonRpcProvider } from '@ethersproject/providers'; import { JsonRpcProvider } from '@ethersproject/providers';
@ -23,7 +22,6 @@ import {
startMetricsServer, startMetricsServer,
Config, Config,
UpstreamConfig, UpstreamConfig,
NEW_BLOCK_MAX_RETRIES_ERROR,
setActiveUpstreamEndpointMetric setActiveUpstreamEndpointMetric
} from '@cerc-io/util'; } from '@cerc-io/util';
@ -125,11 +123,8 @@ export class JobRunnerCmd {
config.jobQueue, config.jobQueue,
indexer, indexer,
jobQueue, jobQueue,
async (error: any) => { config.upstream.ethServer.rpcProviderEndpoints[this._currentEndpointIndex.rpcProviderEndpoint],
// Check if it is a server error or timeout from ethers.js async (): Promise<string> => {
// https://docs.ethers.org/v5/api/utils/logger/#errors--server-error
// https://docs.ethers.org/v5/api/utils/logger/#errors--timeout
if (error.code === errors.SERVER_ERROR || error.code === errors.TIMEOUT || error.message === NEW_BLOCK_MAX_RETRIES_ERROR) {
const oldRpcEndpoint = config.upstream.ethServer.rpcProviderEndpoints[this._currentEndpointIndex.rpcProviderEndpoint]; const oldRpcEndpoint = config.upstream.ethServer.rpcProviderEndpoints[this._currentEndpointIndex.rpcProviderEndpoint];
++this._currentEndpointIndex.rpcProviderEndpoint; ++this._currentEndpointIndex.rpcProviderEndpoint;
@ -141,8 +136,10 @@ export class JobRunnerCmd {
indexer.switchClients({ ethClient, ethProvider }); indexer.switchClients({ ethClient, ethProvider });
setActiveUpstreamEndpointMetric(config, this._currentEndpointIndex.rpcProviderEndpoint); setActiveUpstreamEndpointMetric(config, this._currentEndpointIndex.rpcProviderEndpoint);
log(`RPC endpoint ${oldRpcEndpoint} is not working; failing over to new RPC endpoint ${ethProvider.connection.url}`); const newRpcEndpoint = ethProvider.connection.url;
} log(`Switching RPC endpoint from ${oldRpcEndpoint} to endpoint ${newRpcEndpoint}`);
return newRpcEndpoint;
}); });
// Delete all active and pending (before completed) jobs to start job-runner without old queued jobs // Delete all active and pending (before completed) jobs to start job-runner without old queued jobs

View File

@ -96,6 +96,10 @@
subgraphEventsOrder = true subgraphEventsOrder = true
blockDelayInMilliSecs = 2000 blockDelayInMilliSecs = 2000
// Switch clients if eth_getLogs call takes more than threshold (in secs)
// (set to 0 to disable switching)
getLogsClientSwitchThresholdInSecs = 0
# Number of blocks by which block processing lags behind head # Number of blocks by which block processing lags behind head
blockProcessingOffset = 0 blockProcessingOffset = 0

View File

@ -23,6 +23,9 @@ export interface JobQueueConfig {
subgraphEventsOrder: boolean; subgraphEventsOrder: boolean;
blockDelayInMilliSecs: number; blockDelayInMilliSecs: number;
// Switch clients if eth_getLogs call takes more than threshold (in secs)
getLogsClientSwitchThresholdInSecs?: number;
// Number of blocks by which block processing lags behind head (default: 0) // Number of blocks by which block processing lags behind head (default: 0)
blockProcessingOffset?: number; blockProcessingOffset?: number;

View File

@ -4,7 +4,7 @@
import assert from 'assert'; import assert from 'assert';
import debug from 'debug'; import debug from 'debug';
import { constants, ethers } from 'ethers'; import { constants, ethers, errors as ethersErrors } from 'ethers';
import { DeepPartial, In } from 'typeorm'; import { DeepPartial, In } from 'typeorm';
import PgBoss from 'pg-boss'; import PgBoss from 'pg-boss';
@ -29,9 +29,10 @@ import {
processBatchEvents, processBatchEvents,
PrefetchedBlock, PrefetchedBlock,
fetchBlocksAtHeight, fetchBlocksAtHeight,
fetchAndSaveFilteredLogsAndBlocks fetchAndSaveFilteredLogsAndBlocks,
NEW_BLOCK_MAX_RETRIES_ERROR
} from './common'; } from './common';
import { isSyncingHistoricalBlocks, lastBlockNumEvents, lastBlockProcessDuration, lastProcessedBlockNumber } from './metrics'; import { ethRpcRequestDuration, isSyncingHistoricalBlocks, lastBlockNumEvents, lastBlockProcessDuration, lastProcessedBlockNumber } from './metrics';
const log = debug('vulcanize:job-runner'); const log = debug('vulcanize:job-runner');
@ -63,19 +64,22 @@ export class JobRunner {
_signalCount = 0; _signalCount = 0;
_errorInEventsProcessing = false; _errorInEventsProcessing = false;
_jobErrorHandler: (error: Error) => Promise<void>; _currentRpcProviderEndpoint: string;
_switchClients: () => Promise<string>;
constructor ( constructor (
jobQueueConfig: JobQueueConfig, jobQueueConfig: JobQueueConfig,
indexer: IndexerInterface, indexer: IndexerInterface,
jobQueue: JobQueue, jobQueue: JobQueue,
currentRpcProviderEndpoint: string,
// eslint-disable-next-line @typescript-eslint/no-empty-function // eslint-disable-next-line @typescript-eslint/no-empty-function
jobErrorHandler: (error: Error) => Promise<void> = async () => {} switchClients: () => Promise<string> = async () => ''
) { ) {
this._indexer = indexer; this._indexer = indexer;
this.jobQueue = jobQueue; this.jobQueue = jobQueue;
this._jobQueueConfig = jobQueueConfig; this._jobQueueConfig = jobQueueConfig;
this._jobErrorHandler = jobErrorHandler; this._currentRpcProviderEndpoint = currentRpcProviderEndpoint;
this._switchClients = switchClients;
} }
async subscribeBlockProcessingQueue (): Promise<void> { async subscribeBlockProcessingQueue (): Promise<void> {
@ -187,6 +191,9 @@ export class JobRunner {
await Promise.all(indexBlockPromises); await Promise.all(indexBlockPromises);
} }
// Switch clients if getLogs requests are too slow
await this._switchClientOnSlowGetLogsRequests();
break; break;
} }
@ -292,6 +299,9 @@ export class JobRunner {
this._historicalProcessingCompletedUpto = endBlock; this._historicalProcessingCompletedUpto = endBlock;
// Switch clients if getLogs requests are too slow
await this._switchClientOnSlowGetLogsRequests();
if (endBlock < processingEndBlockNumber) { if (endBlock < processingEndBlockNumber) {
// If endBlock is lesser than processingEndBlockNumber push new historical job // If endBlock is lesser than processingEndBlockNumber push new historical job
await this.jobQueue.pushJob( await this.jobQueue.pushJob(
@ -789,6 +799,33 @@ export class JobRunner {
} }
} }
async _jobErrorHandler (error: any): Promise<void> {
// Check if it is a server error or timeout from ethers.js
// https://docs.ethers.org/v5/api/utils/logger/#errors--server-error
// https://docs.ethers.org/v5/api/utils/logger/#errors--timeout
if (error.code === ethersErrors.SERVER_ERROR || error.code === ethersErrors.TIMEOUT || error.message === NEW_BLOCK_MAX_RETRIES_ERROR) {
log('RPC endpoint is not working; failing over to another one');
this._currentRpcProviderEndpoint = await this._switchClients();
}
}
async _switchClientOnSlowGetLogsRequests (): Promise<void> {
const threshold = this._jobQueueConfig.getLogsClientSwitchThresholdInSecs;
if (threshold) {
const getLogsLabels = {
method: 'eth_getLogs',
provider: this._currentRpcProviderEndpoint
};
const ethRpcRequestDurationMetrics = await ethRpcRequestDuration.get();
const currentProviderDuration = ethRpcRequestDurationMetrics.values.find(val => val.labels.method === getLogsLabels.method && val.labels.provider === getLogsLabels.provider);
if (currentProviderDuration && currentProviderDuration.value > threshold) {
log(`eth_getLogs call with current RPC endpoint took too long (${currentProviderDuration.value} sec); switching over to another one`);
this._currentRpcProviderEndpoint = await this._switchClients();
}
}
}
_updateWatchedContracts (job: any): void { _updateWatchedContracts (job: any): void {
const { data: { contract } } = job; const { data: { contract } } = job;
this._indexer.cacheContract(contract); this._indexer.cacheContract(contract);