diff --git a/lerna.json b/lerna.json index 554d8101..9f146d02 100644 --- a/lerna.json +++ b/lerna.json @@ -2,7 +2,7 @@ "packages": [ "packages/*" ], - "version": "0.2.85", + "version": "0.2.86", "npmClient": "yarn", "useWorkspaces": true, "command": { diff --git a/packages/cache/package.json b/packages/cache/package.json index 149acf4f..ffa6bd8b 100644 --- a/packages/cache/package.json +++ b/packages/cache/package.json @@ -1,6 +1,6 @@ { "name": "@cerc-io/cache", - "version": "0.2.85", + "version": "0.2.86", "description": "Generic object cache", "main": "dist/index.js", "scripts": { diff --git a/packages/cli/package.json b/packages/cli/package.json index b0be89e2..3e2fe44a 100644 --- a/packages/cli/package.json +++ b/packages/cli/package.json @@ -1,6 +1,6 @@ { "name": "@cerc-io/cli", - "version": "0.2.85", + "version": "0.2.86", "main": "dist/index.js", "license": "AGPL-3.0", "scripts": { @@ -15,13 +15,13 @@ }, "dependencies": { "@apollo/client": "^3.7.1", - "@cerc-io/cache": "^0.2.85", - "@cerc-io/ipld-eth-client": "^0.2.85", + "@cerc-io/cache": "^0.2.86", + "@cerc-io/ipld-eth-client": "^0.2.86", "@cerc-io/libp2p": "^0.42.2-laconic-0.1.4", "@cerc-io/nitro-node": "^0.1.15", - "@cerc-io/peer": "^0.2.85", - "@cerc-io/rpc-eth-client": "^0.2.85", - "@cerc-io/util": "^0.2.85", + "@cerc-io/peer": "^0.2.86", + "@cerc-io/rpc-eth-client": "^0.2.86", + "@cerc-io/util": "^0.2.86", "@ethersproject/providers": "^5.4.4", "@graphql-tools/utils": "^9.1.1", "@ipld/dag-cbor": "^8.0.0", diff --git a/packages/cli/src/job-runner.ts b/packages/cli/src/job-runner.ts index 10cb39dc..294a6bc9 100644 --- a/packages/cli/src/job-runner.ts +++ b/packages/cli/src/job-runner.ts @@ -7,6 +7,8 @@ import { hideBin } from 'yargs/helpers'; import 'reflect-metadata'; import assert from 'assert'; import { ConnectionOptions } from 'typeorm'; +import { errors } from 'ethers'; +import debug from 'debug'; import { JsonRpcProvider } from '@ethersproject/providers'; import { @@ -20,10 +22,14 @@ import { GraphWatcherInterface, startMetricsServer, Config, - UpstreamConfig + UpstreamConfig, + NEW_BLOCK_MAX_RETRIES_ERROR } from '@cerc-io/util'; import { BaseCmd } from './base'; +import { initClients } from './utils/index'; + +const log = debug('vulcanize:job-runner'); interface Arguments { configFile: string; @@ -33,6 +39,10 @@ export class JobRunnerCmd { _argv?: Arguments; _baseCmd: BaseCmd; + _currentEndpointIndex = { + rpcProviderEndpoint: 0 + }; + constructor () { this._baseCmd = new BaseCmd(); } @@ -110,7 +120,27 @@ export class JobRunnerCmd { await indexer.addContracts(); } - const jobRunner = new JobRunner(config.jobQueue, indexer, jobQueue); + const jobRunner = new JobRunner( + config.jobQueue, + indexer, + jobQueue, + async (error: any) => { + // Check if it is a server error or timeout from ethers.js + // https://docs.ethers.org/v5/api/utils/logger/#errors--server-error + // https://docs.ethers.org/v5/api/utils/logger/#errors--timeout + if (error.code === errors.SERVER_ERROR || error.code === errors.TIMEOUT || error.message === NEW_BLOCK_MAX_RETRIES_ERROR) { + const oldRpcEndpoint = config.upstream.ethServer.rpcProviderEndpoints[this._currentEndpointIndex.rpcProviderEndpoint]; + ++this._currentEndpointIndex.rpcProviderEndpoint; + + if (this._currentEndpointIndex.rpcProviderEndpoint === config.upstream.ethServer.rpcProviderEndpoints.length) { + this._currentEndpointIndex.rpcProviderEndpoint = 0; + } + + const { ethClient, ethProvider } = await initClients(config, this._currentEndpointIndex); + indexer.switchClients({ ethClient, ethProvider }); + log(`RPC endpoint ${oldRpcEndpoint} is not working; failing over to new RPC endpoint ${ethProvider.connection.url}`); + } + }); // Delete all active and pending (before completed) jobs to start job-runner without old queued jobs await jobRunner.jobQueue.deleteAllJobs('completed'); @@ -121,7 +151,7 @@ export class JobRunnerCmd { await startJobRunner(jobRunner); jobRunner.handleShutdown(); - await startMetricsServer(config, indexer); + await startMetricsServer(config, indexer, this._currentEndpointIndex); } _getArgv (): any { diff --git a/packages/cli/src/utils/index.ts b/packages/cli/src/utils/index.ts index 556e18ba..d2b25ea0 100644 --- a/packages/cli/src/utils/index.ts +++ b/packages/cli/src/utils/index.ts @@ -22,7 +22,7 @@ export function readPeerId (filePath: string): PeerIdObj { return JSON.parse(peerIdJson); } -export const initClients = async (config: Config): Promise<{ +export const initClients = async (config: Config, endpointIndexes = { rpcProviderEndpoint: 0 }): Promise<{ ethClient: EthClient, ethProvider: providers.JsonRpcProvider }> => { @@ -32,9 +32,10 @@ export const initClients = async (config: Config): Promise<{ assert(dbConfig, 'Missing database config'); assert(upstreamConfig, 'Missing upstream config'); - const { ethServer: { gqlApiEndpoint, rpcProviderEndpoint, rpcClient = false }, cache: cacheConfig } = upstreamConfig; + const { ethServer: { gqlApiEndpoint, rpcProviderEndpoints, rpcClient = false }, cache: cacheConfig } = upstreamConfig; - assert(rpcProviderEndpoint, 'Missing upstream ethServer.rpcProviderEndpoint'); + assert(rpcProviderEndpoints, 'Missing upstream ethServer.rpcProviderEndpoints'); + assert(rpcProviderEndpoints.length, 'No endpoints configured in ethServer.rpcProviderEndpoints'); const cache = await getCache(cacheConfig); @@ -42,12 +43,13 @@ export const initClients = async (config: Config): Promise<{ if (rpcClient) { ethClient = new RpcEthClient({ - rpcEndpoint: rpcProviderEndpoint, + rpcEndpoint: rpcProviderEndpoints[endpointIndexes.rpcProviderEndpoint], cache }); } else { assert(gqlApiEndpoint, 'Missing upstream ethServer.gqlApiEndpoint'); + // TODO: Implement failover for GQL endpoint ethClient = new GqlEthClient({ gqlEndpoint: gqlApiEndpoint, cache @@ -55,7 +57,7 @@ export const initClients = async (config: Config): Promise<{ } const ethProvider = getCustomProvider({ - url: rpcProviderEndpoint, + url: rpcProviderEndpoints[endpointIndexes.rpcProviderEndpoint], allowGzip: true }); diff --git a/packages/codegen/package.json b/packages/codegen/package.json index 0ca9715a..b0e13adb 100644 --- a/packages/codegen/package.json +++ b/packages/codegen/package.json @@ -1,6 +1,6 @@ { "name": "@cerc-io/codegen", - "version": "0.2.85", + "version": "0.2.86", "description": "Code generator", "private": true, "main": "index.js", @@ -20,7 +20,7 @@ }, "homepage": "https://github.com/cerc-io/watcher-ts#readme", "dependencies": { - "@cerc-io/util": "^0.2.85", + "@cerc-io/util": "^0.2.86", "@graphql-tools/load-files": "^6.5.2", "@npmcli/package-json": "^5.0.0", "@poanet/solidity-flattener": "https://github.com/vulcanize/solidity-flattener.git", diff --git a/packages/codegen/src/templates/config-template.handlebars b/packages/codegen/src/templates/config-template.handlebars index 2ec00181..96ab41f7 100644 --- a/packages/codegen/src/templates/config-template.handlebars +++ b/packages/codegen/src/templates/config-template.handlebars @@ -63,7 +63,9 @@ [upstream] [upstream.ethServer] gqlApiEndpoint = "http://127.0.0.1:8082/graphql" - rpcProviderEndpoint = "http://127.0.0.1:8081" + rpcProviderEndpoints = [ + "http://127.0.0.1:8081" + ] # Boolean flag to specify if rpc-eth-client should be used for RPC endpoint instead of ipld-eth-client (ipld-eth-server GQL client) rpcClient = false @@ -100,3 +102,6 @@ # Max block range of historical processing after which it waits for completion of events processing # If set to -1 historical processing does not wait for events processing and completes till latest canonical block historicalMaxFetchAhead = 10000 + + # Max number of retries to fetch new block after which watcher will failover to other RPC endpoints + maxNewBlockRetries = 3 diff --git a/packages/codegen/src/templates/indexer-template.handlebars b/packages/codegen/src/templates/indexer-template.handlebars index 371023db..b44b7eaa 100644 --- a/packages/codegen/src/templates/indexer-template.handlebars +++ b/packages/codegen/src/templates/indexer-template.handlebars @@ -199,6 +199,15 @@ export class Indexer implements IndexerInterface { await this._baseIndexer.fetchStateStatus(); } + switchClients ({ ethClient, ethProvider }: { ethClient: EthClient, ethProvider: BaseProvider }): void { + this._ethClient = ethClient; + this._ethProvider = ethProvider; + this._baseIndexer.switchClients({ ethClient, ethProvider }); + {{#if (subgraphPath)}} + this._graphWatcher.switchClients({ ethClient, ethProvider }); + {{/if}} + } + {{#if (subgraphPath)}} async getMetaData (block: BlockHeight): Promise { return this._baseIndexer.getMetaData(block); diff --git a/packages/codegen/src/templates/package-template.handlebars b/packages/codegen/src/templates/package-template.handlebars index 991e7e92..67101c45 100644 --- a/packages/codegen/src/templates/package-template.handlebars +++ b/packages/codegen/src/templates/package-template.handlebars @@ -41,12 +41,12 @@ "homepage": "https://github.com/cerc-io/watcher-ts#readme", "dependencies": { "@apollo/client": "^3.3.19", - "@cerc-io/cli": "^0.2.85", - "@cerc-io/ipld-eth-client": "^0.2.85", - "@cerc-io/solidity-mapper": "^0.2.85", - "@cerc-io/util": "^0.2.85", + "@cerc-io/cli": "^0.2.86", + "@cerc-io/ipld-eth-client": "^0.2.86", + "@cerc-io/solidity-mapper": "^0.2.86", + "@cerc-io/util": "^0.2.86", {{#if (subgraphPath)}} - "@cerc-io/graph-node": "^0.2.85", + "@cerc-io/graph-node": "^0.2.86", {{/if}} "@ethersproject/providers": "^5.4.4", "debug": "^4.3.1", diff --git a/packages/graph-node/package.json b/packages/graph-node/package.json index 1a618e3f..d0dc5db0 100644 --- a/packages/graph-node/package.json +++ b/packages/graph-node/package.json @@ -1,10 +1,10 @@ { "name": "@cerc-io/graph-node", - "version": "0.2.85", + "version": "0.2.86", "main": "dist/index.js", "license": "AGPL-3.0", "devDependencies": { - "@cerc-io/solidity-mapper": "^0.2.85", + "@cerc-io/solidity-mapper": "^0.2.86", "@ethersproject/providers": "^5.4.4", "@graphprotocol/graph-ts": "^0.22.0", "@nomiclabs/hardhat-ethers": "^2.0.2", @@ -51,9 +51,9 @@ "dependencies": { "@apollo/client": "^3.3.19", "@cerc-io/assemblyscript": "0.19.10-watcher-ts-0.1.2", - "@cerc-io/cache": "^0.2.85", - "@cerc-io/ipld-eth-client": "^0.2.85", - "@cerc-io/util": "^0.2.85", + "@cerc-io/cache": "^0.2.86", + "@cerc-io/ipld-eth-client": "^0.2.86", + "@cerc-io/util": "^0.2.86", "@types/json-diff": "^0.5.2", "@types/yargs": "^17.0.0", "bn.js": "^4.11.9", diff --git a/packages/graph-node/src/watcher.ts b/packages/graph-node/src/watcher.ts index be497666..595653ab 100644 --- a/packages/graph-node/src/watcher.ts +++ b/packages/graph-node/src/watcher.ts @@ -129,6 +129,11 @@ export class GraphWatcher { this.fillEventSignatureMap(); } + async switchClients ({ ethClient, ethProvider }: { ethClient: EthClient, ethProvider: providers.BaseProvider }) { + this._ethClient = ethClient; + this._ethProvider = ethProvider; + } + fillEventSignatureMap () { this._dataSources.forEach(contract => { if (contract.kind === 'ethereum/contract' && contract.mapping.kind === 'ethereum/events') { diff --git a/packages/graph-node/test/utils/indexer.ts b/packages/graph-node/test/utils/indexer.ts index 9f6a1a37..6344a33b 100644 --- a/packages/graph-node/test/utils/indexer.ts +++ b/packages/graph-node/test/utils/indexer.ts @@ -2,6 +2,7 @@ import assert from 'assert'; import { DeepPartial, FindConditions, FindManyOptions } from 'typeorm'; +import { providers } from 'ethers'; import { IndexerInterface, @@ -338,4 +339,8 @@ export class Indexer implements IndexerInterface { async getFullTransactions (txHashList: string[]): Promise { return []; } + + switchClients ({ ethClient, ethProvider }: { ethClient: EthClient, ethProvider: providers.BaseProvider }): void { + return undefined; + } } diff --git a/packages/ipld-eth-client/package.json b/packages/ipld-eth-client/package.json index fbe1e5d4..3e264a28 100644 --- a/packages/ipld-eth-client/package.json +++ b/packages/ipld-eth-client/package.json @@ -1,6 +1,6 @@ { "name": "@cerc-io/ipld-eth-client", - "version": "0.2.85", + "version": "0.2.86", "description": "IPLD ETH Client", "main": "dist/index.js", "scripts": { @@ -20,8 +20,8 @@ "homepage": "https://github.com/cerc-io/watcher-ts#readme", "dependencies": { "@apollo/client": "^3.7.1", - "@cerc-io/cache": "^0.2.85", - "@cerc-io/util": "^0.2.85", + "@cerc-io/cache": "^0.2.86", + "@cerc-io/util": "^0.2.86", "cross-fetch": "^3.1.4", "debug": "^4.3.1", "ethers": "^5.4.4", diff --git a/packages/peer/package.json b/packages/peer/package.json index a939c76a..47f9350e 100644 --- a/packages/peer/package.json +++ b/packages/peer/package.json @@ -1,6 +1,6 @@ { "name": "@cerc-io/peer", - "version": "0.2.85", + "version": "0.2.86", "description": "libp2p module", "main": "dist/index.js", "exports": "./dist/index.js", diff --git a/packages/rpc-eth-client/package.json b/packages/rpc-eth-client/package.json index 18d163d6..40f1504f 100644 --- a/packages/rpc-eth-client/package.json +++ b/packages/rpc-eth-client/package.json @@ -1,6 +1,6 @@ { "name": "@cerc-io/rpc-eth-client", - "version": "0.2.85", + "version": "0.2.86", "description": "RPC ETH Client", "main": "dist/index.js", "scripts": { @@ -19,9 +19,9 @@ }, "homepage": "https://github.com/cerc-io/watcher-ts#readme", "dependencies": { - "@cerc-io/cache": "^0.2.85", - "@cerc-io/ipld-eth-client": "^0.2.85", - "@cerc-io/util": "^0.2.85", + "@cerc-io/cache": "^0.2.86", + "@cerc-io/ipld-eth-client": "^0.2.86", + "@cerc-io/util": "^0.2.86", "chai": "^4.3.4", "ethers": "^5.4.4", "left-pad": "^1.3.0", diff --git a/packages/solidity-mapper/package.json b/packages/solidity-mapper/package.json index b2fbe75f..805bba32 100644 --- a/packages/solidity-mapper/package.json +++ b/packages/solidity-mapper/package.json @@ -1,6 +1,6 @@ { "name": "@cerc-io/solidity-mapper", - "version": "0.2.85", + "version": "0.2.86", "main": "dist/index.js", "license": "AGPL-3.0", "devDependencies": { diff --git a/packages/test/package.json b/packages/test/package.json index 2999dceb..5631a10c 100644 --- a/packages/test/package.json +++ b/packages/test/package.json @@ -1,6 +1,6 @@ { "name": "@cerc-io/test", - "version": "0.2.85", + "version": "0.2.86", "main": "dist/index.js", "license": "AGPL-3.0", "private": true, diff --git a/packages/tracing-client/package.json b/packages/tracing-client/package.json index 13b99b60..48a846a3 100644 --- a/packages/tracing-client/package.json +++ b/packages/tracing-client/package.json @@ -1,6 +1,6 @@ { "name": "@cerc-io/tracing-client", - "version": "0.2.85", + "version": "0.2.86", "description": "ETH VM tracing client", "main": "dist/index.js", "scripts": { diff --git a/packages/util/package.json b/packages/util/package.json index ecbbd919..df555dd8 100644 --- a/packages/util/package.json +++ b/packages/util/package.json @@ -1,13 +1,13 @@ { "name": "@cerc-io/util", - "version": "0.2.85", + "version": "0.2.86", "main": "dist/index.js", "license": "AGPL-3.0", "dependencies": { "@apollo/utils.keyvaluecache": "^1.0.1", "@cerc-io/nitro-node": "^0.1.15", - "@cerc-io/peer": "^0.2.85", - "@cerc-io/solidity-mapper": "^0.2.85", + "@cerc-io/peer": "^0.2.86", + "@cerc-io/solidity-mapper": "^0.2.86", "@cerc-io/ts-channel": "1.0.3-ts-nitro-0.1.1", "@ethersproject/properties": "^5.7.0", "@ethersproject/providers": "^5.4.4", @@ -52,7 +52,7 @@ "yargs": "^17.0.1" }, "devDependencies": { - "@cerc-io/cache": "^0.2.85", + "@cerc-io/cache": "^0.2.86", "@nomiclabs/hardhat-waffle": "^2.0.1", "@types/bunyan": "^1.8.8", "@types/express": "^4.17.14", diff --git a/packages/util/src/common.ts b/packages/util/src/common.ts index 53388e6d..9933a939 100644 --- a/packages/util/src/common.ts +++ b/packages/util/src/common.ts @@ -22,6 +22,8 @@ const DEFAULT_EVENTS_IN_BATCH = 50; const log = debug('vulcanize:common'); const JSONbigNative = JSONbig({ useNativeBigInt: true }); +export const NEW_BLOCK_MAX_RETRIES_ERROR = 'Reached max retries for fetching new block'; + export interface PrefetchedBlock { block?: BlockProgressInterface; events: DeepPartial[]; @@ -65,6 +67,7 @@ export const fetchBlocksAtHeight = async ( blockAndEventsMap: Map ): Promise[]> => { let blocks: EthFullBlock[] = []; + let newBlockRetries = 0; // Try fetching blocks from eth-server until found. while (!blocks.length) { @@ -84,6 +87,13 @@ export const fetchBlocksAtHeight = async ( if (!blocks.length) { log(`No blocks fetched for block number ${blockNumber}, retrying after ${jobQueueConfig.blockDelayInMilliSecs} ms delay.`); + + // Check number of retries for fetching new block + if (jobQueueConfig.maxNewBlockRetries && newBlockRetries > jobQueueConfig.maxNewBlockRetries) { + throw new Error(NEW_BLOCK_MAX_RETRIES_ERROR); + } + + newBlockRetries++; await wait(jobQueueConfig.blockDelayInMilliSecs); } else { blocks.forEach(block => { diff --git a/packages/util/src/config.ts b/packages/util/src/config.ts index 0fa067d6..773dd8ed 100644 --- a/packages/util/src/config.ts +++ b/packages/util/src/config.ts @@ -22,15 +22,22 @@ export interface JobQueueConfig { lazyUpdateBlockProgress?: boolean; subgraphEventsOrder: boolean; blockDelayInMilliSecs: number; + // Block range in which logs are fetched during historical blocks processing historicalLogsBlockRange?: number; + // Max block range of historical processing after which it waits for completion of events processing // If set to -1 historical processing does not wait for events processing and completes till latest canonical block historicalMaxFetchAhead?: number; + // Boolean to switch between modes of processing events when starting the server // Setting to true will fetch filtered events and required blocks in a range of blocks and then process them // Setting to false will fetch blocks consecutively with its events and then process them (Behaviour is followed in realtime processing near head) useBlockRanges: boolean; + + // Max number of retries to fetch new block after which watcher will failover to other RPC endpoints + // Infinitely retry if not set + maxNewBlockRetries?: number; } export interface GQLCacheConfig { @@ -254,16 +261,21 @@ export interface UpstreamConfig { cache: CacheConfig; ethServer: { gqlApiEndpoint: string; - rpcProviderEndpoint: string; + rpcProviderEndpoints: string[]; rpcProviderMutationEndpoint: string; + // Boolean flag to specify if rpc-eth-client should be used for RPC endpoint instead of ipld-eth-client (ipld-eth-server GQL client) rpcClient: boolean; + // Boolean flag to specify if rpcProviderEndpoint is an FEVM RPC endpoint isFEVM: boolean; + // Boolean flag to filter event logs by contracts filterLogsByAddresses: boolean; + // Boolean flag to filter event logs by topics filterLogsByTopics: boolean; + payments: EthServerPaymentsConfig; } traceProviderEndpoint: string; diff --git a/packages/util/src/indexer.ts b/packages/util/src/indexer.ts index 7aa85cab..df9a11a9 100644 --- a/packages/util/src/indexer.ts +++ b/packages/util/src/indexer.ts @@ -138,6 +138,11 @@ export class Indexer { this._getStorageAt = this._ethClient.getStorageAt.bind(this._ethClient); } + switchClients ({ ethClient, ethProvider }: { ethClient: EthClient, ethProvider: ethers.providers.BaseProvider }): void { + this._ethClient = ethClient; + this._ethProvider = ethProvider; + } + async fetchContracts (): Promise { assert(this._db.getContracts); diff --git a/packages/util/src/job-runner.ts b/packages/util/src/job-runner.ts index 05ed6994..ca7f227a 100644 --- a/packages/util/src/job-runner.ts +++ b/packages/util/src/job-runner.ts @@ -63,23 +63,46 @@ export class JobRunner { _signalCount = 0; _errorInEventsProcessing = false; - constructor (jobQueueConfig: JobQueueConfig, indexer: IndexerInterface, jobQueue: JobQueue) { + _jobErrorHandler: (error: Error) => Promise; + + constructor ( + jobQueueConfig: JobQueueConfig, + indexer: IndexerInterface, + jobQueue: JobQueue, + // eslint-disable-next-line @typescript-eslint/no-empty-function + jobErrorHandler: (error: Error) => Promise = async () => {} + ) { this._indexer = indexer; this.jobQueue = jobQueue; this._jobQueueConfig = jobQueueConfig; + this._jobErrorHandler = jobErrorHandler; } async subscribeBlockProcessingQueue (): Promise { await this.jobQueue.subscribe( QUEUE_BLOCK_PROCESSING, - async (job) => this.processBlock(job) + async (job) => { + try { + await this.processBlock(job); + } catch (error) { + this._jobErrorHandler(error as Error); + throw error; + } + } ); } async subscribeHistoricalProcessingQueue (): Promise { await this.jobQueue.subscribe( QUEUE_HISTORICAL_PROCESSING, - async (job) => this.processHistoricalBlocks(job), + async (job) => { + try { + await this.processHistoricalBlocks(job); + } catch (error) { + this._jobErrorHandler(error as Error); + throw error; + } + }, { teamSize: 1 } @@ -89,7 +112,14 @@ export class JobRunner { async subscribeEventProcessingQueue (): Promise { await this.jobQueue.subscribe( QUEUE_EVENT_PROCESSING, - async (job) => this.processEvent(job as PgBoss.JobWithMetadataDoneCallback), + async (job) => { + try { + await this.processEvent(job as PgBoss.JobWithMetadataDoneCallback); + } catch (error) { + this._jobErrorHandler(error as Error); + throw error; + } + }, { teamSize: 1, includeMetadata: true @@ -100,14 +130,28 @@ export class JobRunner { async subscribeHooksQueue (): Promise { await this.jobQueue.subscribe( QUEUE_HOOKS, - async (job) => this.processHooks(job) + async (job) => { + try { + await this.processHooks(job); + } catch (error) { + this._jobErrorHandler(error as Error); + throw error; + } + } ); } async subscribeBlockCheckpointQueue (): Promise { await this.jobQueue.subscribe( QUEUE_BLOCK_CHECKPOINT, - async (job) => this.processCheckpoint(job) + async (job) => { + try { + this.processCheckpoint(job); + } catch (error) { + this._jobErrorHandler(error as Error); + throw error; + } + } ); } @@ -587,11 +631,6 @@ export class JobRunner { // Do not throw error and complete the job as block will be processed after parent block processing. return; - } else { - // Remove the unknown events of the parent block if it is marked complete. - console.time('time:job-runner#_indexBlock-remove-unknown-events'); - await this._indexer.removeUnknownEvents(parentBlock); - console.timeEnd('time:job-runner#_indexBlock-remove-unknown-events'); } } @@ -708,7 +747,15 @@ export class JobRunner { lastBlockNumEvents.set(block.numEvents); this._endBlockProcessTimer = lastBlockProcessDuration.startTimer(); - await this._indexer.updateSyncStatusProcessedBlock(block.blockHash, block.blockNumber); + + console.time('time:job-runner#_processEvents-update-status-and-remove-unknown-events'); + await Promise.all([ + // Update latest processed block in SyncStatus + this._indexer.updateSyncStatusProcessedBlock(block.blockHash, block.blockNumber), + // Remove the unknown events from processed block + this._indexer.removeUnknownEvents(block) + ]); + console.timeEnd('time:job-runner#_processEvents-update-status-and-remove-unknown-events'); if (retryCount > 0) { await Promise.all([ diff --git a/packages/util/src/metrics.ts b/packages/util/src/metrics.ts index 9ddefbb4..873ba3ca 100644 --- a/packages/util/src/metrics.ts +++ b/packages/util/src/metrics.ts @@ -98,7 +98,7 @@ isSyncingHistoricalBlocks.set(Number(undefined)); // Export metrics on a server const app: Application = express(); -export const startMetricsServer = async (config: Config, indexer: IndexerInterface): Promise => { +export const startMetricsServer = async (config: Config, indexer: IndexerInterface, endpointIndexes = { rpcProviderEndpoint: 0 }): Promise => { if (!config.metrics) { log('Metrics is disabled. To enable add metrics host and port.'); return; @@ -128,7 +128,7 @@ export const startMetricsServer = async (config: Config, indexer: IndexerInterfa await registerDBSizeMetrics(config); - await registerUpstreamChainHeadMetrics(config); + await registerUpstreamChainHeadMetrics(config, endpointIndexes.rpcProviderEndpoint); // Collect default metrics client.collectDefaultMetrics(); @@ -179,8 +179,8 @@ const registerDBSizeMetrics = async ({ database, jobQueue }: Config): Promise => { - const ethRpcProvider = new JsonRpcProvider(upstream.ethServer.rpcProviderEndpoint); +const registerUpstreamChainHeadMetrics = async ({ upstream }: Config, rpcProviderEndpointIndex: number): Promise => { + const ethRpcProvider = new JsonRpcProvider(upstream.ethServer.rpcProviderEndpoints[rpcProviderEndpointIndex]); // eslint-disable-next-line no-new new client.Gauge({ diff --git a/packages/util/src/types.ts b/packages/util/src/types.ts index 74650357..c01d2f63 100644 --- a/packages/util/src/types.ts +++ b/packages/util/src/types.ts @@ -3,7 +3,7 @@ // import { Connection, DeepPartial, EntityTarget, FindConditions, FindManyOptions, ObjectLiteral, QueryRunner } from 'typeorm'; -import { Transaction } from 'ethers'; +import { Transaction, providers } from 'ethers'; import { MappingKey, StorageLayout } from '@cerc-io/solidity-mapper'; @@ -161,6 +161,8 @@ export interface IndexerInterface { readonly serverConfig: ServerConfig readonly upstreamConfig: UpstreamConfig readonly storageLayoutMap: Map + // eslint-disable-next-line no-use-before-define + readonly graphWatcher?: GraphWatcherInterface init (): Promise getBlockProgress (blockHash: string): Promise getBlockProgressEntities (where: FindConditions, options: FindManyOptions): Promise @@ -234,6 +236,7 @@ export interface IndexerInterface { clearProcessedBlockData (block: BlockProgressInterface): Promise getResultEvent (event: EventInterface): any getFullTransactions (txHashList: string[]): Promise + switchClients (clients: { ethClient: EthClient, ethProvider: providers.BaseProvider }): void } export interface DatabaseInterface {