Implement failover for RPC endpoints in watcher (#506)

* Handle RPC endpoint server errors and switch to failover endpoints

* Add config maxNewBlockRetries for switching to failover endpoint

* Upgrade package versions

* Move unknown events removal after event processing for historical sync

* Rename doFailOverEndpoints to switchClients
Nabarun Gogoi 2024-05-09 16:03:06 +05:30 committed by GitHub
parent 6d837dc824
commit c9696c3d9f
25 changed files with 195 additions and 62 deletions

View File

@@ -2,7 +2,7 @@
   "packages": [
     "packages/*"
   ],
-  "version": "0.2.85",
+  "version": "0.2.86",
   "npmClient": "yarn",
   "useWorkspaces": true,
   "command": {

View File

@@ -1,6 +1,6 @@
 {
   "name": "@cerc-io/cache",
-  "version": "0.2.85",
+  "version": "0.2.86",
   "description": "Generic object cache",
   "main": "dist/index.js",
   "scripts": {

View File

@@ -1,6 +1,6 @@
 {
   "name": "@cerc-io/cli",
-  "version": "0.2.85",
+  "version": "0.2.86",
   "main": "dist/index.js",
   "license": "AGPL-3.0",
   "scripts": {
@@ -15,13 +15,13 @@
   },
   "dependencies": {
     "@apollo/client": "^3.7.1",
-    "@cerc-io/cache": "^0.2.85",
-    "@cerc-io/ipld-eth-client": "^0.2.85",
+    "@cerc-io/cache": "^0.2.86",
+    "@cerc-io/ipld-eth-client": "^0.2.86",
     "@cerc-io/libp2p": "^0.42.2-laconic-0.1.4",
     "@cerc-io/nitro-node": "^0.1.15",
-    "@cerc-io/peer": "^0.2.85",
-    "@cerc-io/rpc-eth-client": "^0.2.85",
-    "@cerc-io/util": "^0.2.85",
+    "@cerc-io/peer": "^0.2.86",
+    "@cerc-io/rpc-eth-client": "^0.2.86",
+    "@cerc-io/util": "^0.2.86",
     "@ethersproject/providers": "^5.4.4",
     "@graphql-tools/utils": "^9.1.1",
     "@ipld/dag-cbor": "^8.0.0",

View File

@@ -7,6 +7,8 @@ import { hideBin } from 'yargs/helpers';
 import 'reflect-metadata';
 import assert from 'assert';
 import { ConnectionOptions } from 'typeorm';
+import { errors } from 'ethers';
+import debug from 'debug';
 
 import { JsonRpcProvider } from '@ethersproject/providers';
 import {
@@ -20,10 +22,14 @@ import {
   GraphWatcherInterface,
   startMetricsServer,
   Config,
-  UpstreamConfig
+  UpstreamConfig,
+  NEW_BLOCK_MAX_RETRIES_ERROR
 } from '@cerc-io/util';
 
 import { BaseCmd } from './base';
+import { initClients } from './utils/index';
+
+const log = debug('vulcanize:job-runner');
 
 interface Arguments {
   configFile: string;
@@ -33,6 +39,10 @@ export class JobRunnerCmd {
   _argv?: Arguments;
   _baseCmd: BaseCmd;
 
+  _currentEndpointIndex = {
+    rpcProviderEndpoint: 0
+  };
+
   constructor () {
     this._baseCmd = new BaseCmd();
   }
@@ -110,7 +120,27 @@ export class JobRunnerCmd {
       await indexer.addContracts();
     }
 
-    const jobRunner = new JobRunner(config.jobQueue, indexer, jobQueue);
+    const jobRunner = new JobRunner(
+      config.jobQueue,
+      indexer,
+      jobQueue,
+      async (error: any) => {
+        // Check if it is a server error or timeout from ethers.js
+        // https://docs.ethers.org/v5/api/utils/logger/#errors--server-error
+        // https://docs.ethers.org/v5/api/utils/logger/#errors--timeout
+        if (error.code === errors.SERVER_ERROR || error.code === errors.TIMEOUT || error.message === NEW_BLOCK_MAX_RETRIES_ERROR) {
+          const oldRpcEndpoint = config.upstream.ethServer.rpcProviderEndpoints[this._currentEndpointIndex.rpcProviderEndpoint];
+          ++this._currentEndpointIndex.rpcProviderEndpoint;
+
+          if (this._currentEndpointIndex.rpcProviderEndpoint === config.upstream.ethServer.rpcProviderEndpoints.length) {
+            this._currentEndpointIndex.rpcProviderEndpoint = 0;
+          }
+
+          const { ethClient, ethProvider } = await initClients(config, this._currentEndpointIndex);
+          indexer.switchClients({ ethClient, ethProvider });
+          log(`RPC endpoint ${oldRpcEndpoint} is not working; failing over to new RPC endpoint ${ethProvider.connection.url}`);
+        }
+      });
 
     // Delete all active and pending (before completed) jobs to start job-runner without old queued jobs
     await jobRunner.jobQueue.deleteAllJobs('completed');
@@ -121,7 +151,7 @@ export class JobRunnerCmd {
     await startJobRunner(jobRunner);
     jobRunner.handleShutdown();
 
-    await startMetricsServer(config, indexer);
+    await startMetricsServer(config, indexer, this._currentEndpointIndex);
   }
 
   _getArgv (): any {
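
The handler above advances a shared endpoint index modulo the number of configured endpoints, re-initializes the ETH client and provider against the newly selected endpoint, and swaps them into the indexer via switchClients. A minimal standalone sketch of that round-robin selection, using a hypothetical RoundRobinEndpoints helper that is not part of the watcher codebase:

// Hypothetical helper illustrating the round-robin endpoint selection used above.
class RoundRobinEndpoints {
  private index = 0;

  constructor (private readonly endpoints: string[]) {
    if (!endpoints.length) {
      throw new Error('No RPC endpoints configured');
    }
  }

  // Endpoint currently in use.
  current (): string {
    return this.endpoints[this.index];
  }

  // Advance to the next endpoint, wrapping back to the first one after the last.
  next (): string {
    this.index = (this.index + 1) % this.endpoints.length;
    return this.current();
  }
}

// Usage: on a server error, timeout or max-retries error, fail over to the next endpoint.
const endpoints = new RoundRobinEndpoints(['http://127.0.0.1:8081', 'http://127.0.0.1:8082']);
console.log(endpoints.current()); // http://127.0.0.1:8081
console.log(endpoints.next()); // http://127.0.0.1:8082
console.log(endpoints.next()); // http://127.0.0.1:8081 (wrapped around)

Keeping the index in an object (_currentEndpointIndex) lets the same reference be handed to initClients on every failover and to startMetricsServer at startup.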

View File

@@ -22,7 +22,7 @@ export function readPeerId (filePath: string): PeerIdObj {
   return JSON.parse(peerIdJson);
 }
 
-export const initClients = async (config: Config): Promise<{
+export const initClients = async (config: Config, endpointIndexes = { rpcProviderEndpoint: 0 }): Promise<{
   ethClient: EthClient,
   ethProvider: providers.JsonRpcProvider
 }> => {
@@ -32,9 +32,10 @@ export const initClients = async (config: Config): Promise<{
   assert(dbConfig, 'Missing database config');
   assert(upstreamConfig, 'Missing upstream config');
 
-  const { ethServer: { gqlApiEndpoint, rpcProviderEndpoint, rpcClient = false }, cache: cacheConfig } = upstreamConfig;
-  assert(rpcProviderEndpoint, 'Missing upstream ethServer.rpcProviderEndpoint');
+  const { ethServer: { gqlApiEndpoint, rpcProviderEndpoints, rpcClient = false }, cache: cacheConfig } = upstreamConfig;
+  assert(rpcProviderEndpoints, 'Missing upstream ethServer.rpcProviderEndpoints');
+  assert(rpcProviderEndpoints.length, 'No endpoints configured in ethServer.rpcProviderEndpoints');
 
   const cache = await getCache(cacheConfig);
@@ -42,12 +43,13 @@ export const initClients = async (config: Config): Promise<{
   if (rpcClient) {
     ethClient = new RpcEthClient({
-      rpcEndpoint: rpcProviderEndpoint,
+      rpcEndpoint: rpcProviderEndpoints[endpointIndexes.rpcProviderEndpoint],
       cache
     });
   } else {
     assert(gqlApiEndpoint, 'Missing upstream ethServer.gqlApiEndpoint');
 
+    // TODO: Implement failover for GQL endpoint
     ethClient = new GqlEthClient({
       gqlEndpoint: gqlApiEndpoint,
       cache
@@ -55,7 +57,7 @@ export const initClients = async (config: Config): Promise<{
   }
 
   const ethProvider = getCustomProvider({
-    url: rpcProviderEndpoint,
+    url: rpcProviderEndpoints[endpointIndexes.rpcProviderEndpoint],
     allowGzip: true
   });

View File

@@ -1,6 +1,6 @@
 {
   "name": "@cerc-io/codegen",
-  "version": "0.2.85",
+  "version": "0.2.86",
   "description": "Code generator",
   "private": true,
   "main": "index.js",
@@ -20,7 +20,7 @@
   },
   "homepage": "https://github.com/cerc-io/watcher-ts#readme",
   "dependencies": {
-    "@cerc-io/util": "^0.2.85",
+    "@cerc-io/util": "^0.2.86",
     "@graphql-tools/load-files": "^6.5.2",
     "@npmcli/package-json": "^5.0.0",
     "@poanet/solidity-flattener": "https://github.com/vulcanize/solidity-flattener.git",

View File

@@ -63,7 +63,9 @@
 [upstream]
   [upstream.ethServer]
     gqlApiEndpoint = "http://127.0.0.1:8082/graphql"
-    rpcProviderEndpoint = "http://127.0.0.1:8081"
+    rpcProviderEndpoints = [
+      "http://127.0.0.1:8081"
+    ]
 
     # Boolean flag to specify if rpc-eth-client should be used for RPC endpoint instead of ipld-eth-client (ipld-eth-server GQL client)
     rpcClient = false
@@ -100,3 +102,6 @@
   # Max block range of historical processing after which it waits for completion of events processing
   # If set to -1 historical processing does not wait for events processing and completes till latest canonical block
   historicalMaxFetchAhead = 10000
+
+  # Max number of retries to fetch new block after which watcher will failover to other RPC endpoints
+  maxNewBlockRetries = 3

View File

@@ -199,6 +199,15 @@ export class Indexer implements IndexerInterface {
     await this._baseIndexer.fetchStateStatus();
   }
 
+  switchClients ({ ethClient, ethProvider }: { ethClient: EthClient, ethProvider: BaseProvider }): void {
+    this._ethClient = ethClient;
+    this._ethProvider = ethProvider;
+    this._baseIndexer.switchClients({ ethClient, ethProvider });
+    {{#if (subgraphPath)}}
+    this._graphWatcher.switchClients({ ethClient, ethProvider });
+    {{/if}}
+  }
+
   {{#if (subgraphPath)}}
   async getMetaData (block: BlockHeight): Promise<ResultMeta | null> {
     return this._baseIndexer.getMetaData(block);

View File

@@ -41,12 +41,12 @@
   "homepage": "https://github.com/cerc-io/watcher-ts#readme",
   "dependencies": {
     "@apollo/client": "^3.3.19",
-    "@cerc-io/cli": "^0.2.85",
-    "@cerc-io/ipld-eth-client": "^0.2.85",
-    "@cerc-io/solidity-mapper": "^0.2.85",
-    "@cerc-io/util": "^0.2.85",
+    "@cerc-io/cli": "^0.2.86",
+    "@cerc-io/ipld-eth-client": "^0.2.86",
+    "@cerc-io/solidity-mapper": "^0.2.86",
+    "@cerc-io/util": "^0.2.86",
     {{#if (subgraphPath)}}
-    "@cerc-io/graph-node": "^0.2.85",
+    "@cerc-io/graph-node": "^0.2.86",
     {{/if}}
     "@ethersproject/providers": "^5.4.4",
     "debug": "^4.3.1",

View File

@@ -1,10 +1,10 @@
 {
   "name": "@cerc-io/graph-node",
-  "version": "0.2.85",
+  "version": "0.2.86",
   "main": "dist/index.js",
   "license": "AGPL-3.0",
   "devDependencies": {
-    "@cerc-io/solidity-mapper": "^0.2.85",
+    "@cerc-io/solidity-mapper": "^0.2.86",
     "@ethersproject/providers": "^5.4.4",
     "@graphprotocol/graph-ts": "^0.22.0",
     "@nomiclabs/hardhat-ethers": "^2.0.2",
@@ -51,9 +51,9 @@
   "dependencies": {
     "@apollo/client": "^3.3.19",
     "@cerc-io/assemblyscript": "0.19.10-watcher-ts-0.1.2",
-    "@cerc-io/cache": "^0.2.85",
-    "@cerc-io/ipld-eth-client": "^0.2.85",
-    "@cerc-io/util": "^0.2.85",
+    "@cerc-io/cache": "^0.2.86",
+    "@cerc-io/ipld-eth-client": "^0.2.86",
+    "@cerc-io/util": "^0.2.86",
     "@types/json-diff": "^0.5.2",
     "@types/yargs": "^17.0.0",
     "bn.js": "^4.11.9",

View File

@@ -129,6 +129,11 @@ export class GraphWatcher {
     this.fillEventSignatureMap();
   }
 
+  async switchClients ({ ethClient, ethProvider }: { ethClient: EthClient, ethProvider: providers.BaseProvider }) {
+    this._ethClient = ethClient;
+    this._ethProvider = ethProvider;
+  }
+
   fillEventSignatureMap () {
     this._dataSources.forEach(contract => {
       if (contract.kind === 'ethereum/contract' && contract.mapping.kind === 'ethereum/events') {

View File

@@ -2,6 +2,7 @@
 
 import assert from 'assert';
 import { DeepPartial, FindConditions, FindManyOptions } from 'typeorm';
+import { providers } from 'ethers';
 
 import {
   IndexerInterface,
@@ -338,4 +339,8 @@ export class Indexer implements IndexerInterface {
   async getFullTransactions (txHashList: string[]): Promise<EthFullTransaction[]> {
     return [];
   }
+
+  switchClients ({ ethClient, ethProvider }: { ethClient: EthClient, ethProvider: providers.BaseProvider }): void {
+    return undefined;
+  }
 }

View File

@@ -1,6 +1,6 @@
 {
   "name": "@cerc-io/ipld-eth-client",
-  "version": "0.2.85",
+  "version": "0.2.86",
   "description": "IPLD ETH Client",
   "main": "dist/index.js",
   "scripts": {
@@ -20,8 +20,8 @@
   "homepage": "https://github.com/cerc-io/watcher-ts#readme",
   "dependencies": {
     "@apollo/client": "^3.7.1",
-    "@cerc-io/cache": "^0.2.85",
-    "@cerc-io/util": "^0.2.85",
+    "@cerc-io/cache": "^0.2.86",
+    "@cerc-io/util": "^0.2.86",
     "cross-fetch": "^3.1.4",
     "debug": "^4.3.1",
     "ethers": "^5.4.4",

View File

@@ -1,6 +1,6 @@
 {
   "name": "@cerc-io/peer",
-  "version": "0.2.85",
+  "version": "0.2.86",
   "description": "libp2p module",
   "main": "dist/index.js",
   "exports": "./dist/index.js",

View File

@@ -1,6 +1,6 @@
 {
   "name": "@cerc-io/rpc-eth-client",
-  "version": "0.2.85",
+  "version": "0.2.86",
   "description": "RPC ETH Client",
   "main": "dist/index.js",
   "scripts": {
@@ -19,9 +19,9 @@
   },
   "homepage": "https://github.com/cerc-io/watcher-ts#readme",
   "dependencies": {
-    "@cerc-io/cache": "^0.2.85",
-    "@cerc-io/ipld-eth-client": "^0.2.85",
-    "@cerc-io/util": "^0.2.85",
+    "@cerc-io/cache": "^0.2.86",
+    "@cerc-io/ipld-eth-client": "^0.2.86",
+    "@cerc-io/util": "^0.2.86",
     "chai": "^4.3.4",
     "ethers": "^5.4.4",
     "left-pad": "^1.3.0",

View File

@@ -1,6 +1,6 @@
 {
   "name": "@cerc-io/solidity-mapper",
-  "version": "0.2.85",
+  "version": "0.2.86",
   "main": "dist/index.js",
   "license": "AGPL-3.0",
   "devDependencies": {

View File

@@ -1,6 +1,6 @@
 {
   "name": "@cerc-io/test",
-  "version": "0.2.85",
+  "version": "0.2.86",
   "main": "dist/index.js",
   "license": "AGPL-3.0",
   "private": true,

View File

@@ -1,6 +1,6 @@
 {
   "name": "@cerc-io/tracing-client",
-  "version": "0.2.85",
+  "version": "0.2.86",
   "description": "ETH VM tracing client",
   "main": "dist/index.js",
   "scripts": {

View File

@@ -1,13 +1,13 @@
 {
   "name": "@cerc-io/util",
-  "version": "0.2.85",
+  "version": "0.2.86",
   "main": "dist/index.js",
   "license": "AGPL-3.0",
   "dependencies": {
     "@apollo/utils.keyvaluecache": "^1.0.1",
     "@cerc-io/nitro-node": "^0.1.15",
-    "@cerc-io/peer": "^0.2.85",
-    "@cerc-io/solidity-mapper": "^0.2.85",
+    "@cerc-io/peer": "^0.2.86",
+    "@cerc-io/solidity-mapper": "^0.2.86",
     "@cerc-io/ts-channel": "1.0.3-ts-nitro-0.1.1",
     "@ethersproject/properties": "^5.7.0",
     "@ethersproject/providers": "^5.4.4",
@@ -52,7 +52,7 @@
     "yargs": "^17.0.1"
   },
   "devDependencies": {
-    "@cerc-io/cache": "^0.2.85",
+    "@cerc-io/cache": "^0.2.86",
     "@nomiclabs/hardhat-waffle": "^2.0.1",
     "@types/bunyan": "^1.8.8",
     "@types/express": "^4.17.14",

View File

@@ -22,6 +22,8 @@ const DEFAULT_EVENTS_IN_BATCH = 50;
 const log = debug('vulcanize:common');
 const JSONbigNative = JSONbig({ useNativeBigInt: true });
 
+export const NEW_BLOCK_MAX_RETRIES_ERROR = 'Reached max retries for fetching new block';
+
 export interface PrefetchedBlock {
   block?: BlockProgressInterface;
   events: DeepPartial<EventInterface>[];
@@ -65,6 +67,7 @@ export const fetchBlocksAtHeight = async (
   blockAndEventsMap: Map<string, PrefetchedBlock>
 ): Promise<DeepPartial<BlockProgressInterface>[]> => {
   let blocks: EthFullBlock[] = [];
+  let newBlockRetries = 0;
 
   // Try fetching blocks from eth-server until found.
   while (!blocks.length) {
@@ -84,6 +87,13 @@ export const fetchBlocksAtHeight = async (
 
     if (!blocks.length) {
       log(`No blocks fetched for block number ${blockNumber}, retrying after ${jobQueueConfig.blockDelayInMilliSecs} ms delay.`);
+
+      // Check number of retries for fetching new block
+      if (jobQueueConfig.maxNewBlockRetries && newBlockRetries > jobQueueConfig.maxNewBlockRetries) {
+        throw new Error(NEW_BLOCK_MAX_RETRIES_ERROR);
+      }
+
+      newBlockRetries++;
       await wait(jobQueueConfig.blockDelayInMilliSecs);
     } else {
       blocks.forEach(block => {
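
fetchBlocksAtHeight now counts consecutive empty responses and, once jobQueueConfig.maxNewBlockRetries is exceeded, throws the NEW_BLOCK_MAX_RETRIES_ERROR sentinel; when the option is unset it keeps retrying indefinitely, as before. A generic sketch of this bounded-retry pattern, using a hypothetical retryUntilNonEmpty helper that is not part of the codebase:

// Retries `fn` until it returns a non-empty array; throws `errorMessage` once `maxRetries` is exceeded.
// If `maxRetries` is undefined the loop retries indefinitely, mirroring the config default.
const wait = (ms: number): Promise<void> => new Promise(resolve => setTimeout(resolve, ms));

async function retryUntilNonEmpty<T> (
  fn: () => Promise<T[]>,
  delayMs: number,
  errorMessage: string,
  maxRetries?: number
): Promise<T[]> {
  let retries = 0;
  let result: T[] = [];

  while (!result.length) {
    result = await fn();

    if (!result.length) {
      if (maxRetries !== undefined && retries > maxRetries) {
        throw new Error(errorMessage);
      }

      retries++;
      await wait(delayMs);
    }
  }

  return result;
}

Throwing a dedicated sentinel message lets the job error handler in the CLI distinguish "no new block after repeated polling" from ordinary RPC server errors or timeouts while routing all three cases to the same failover path.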

View File

@@ -22,15 +22,22 @@ export interface JobQueueConfig {
   lazyUpdateBlockProgress?: boolean;
   subgraphEventsOrder: boolean;
   blockDelayInMilliSecs: number;
 
   // Block range in which logs are fetched during historical blocks processing
   historicalLogsBlockRange?: number;
 
   // Max block range of historical processing after which it waits for completion of events processing
   // If set to -1 historical processing does not wait for events processing and completes till latest canonical block
   historicalMaxFetchAhead?: number;
 
   // Boolean to switch between modes of processing events when starting the server
   // Setting to true will fetch filtered events and required blocks in a range of blocks and then process them
   // Setting to false will fetch blocks consecutively with its events and then process them (Behaviour is followed in realtime processing near head)
   useBlockRanges: boolean;
+
+  // Max number of retries to fetch new block after which watcher will failover to other RPC endpoints
+  // Infinitely retry if not set
+  maxNewBlockRetries?: number;
 }
 
 export interface GQLCacheConfig {
@@ -254,16 +261,21 @@ export interface UpstreamConfig {
   cache: CacheConfig;
   ethServer: {
     gqlApiEndpoint: string;
-    rpcProviderEndpoint: string;
+    rpcProviderEndpoints: string[];
     rpcProviderMutationEndpoint: string;
 
     // Boolean flag to specify if rpc-eth-client should be used for RPC endpoint instead of ipld-eth-client (ipld-eth-server GQL client)
     rpcClient: boolean;
 
     // Boolean flag to specify if rpcProviderEndpoint is an FEVM RPC endpoint
     isFEVM: boolean;
 
     // Boolean flag to filter event logs by contracts
     filterLogsByAddresses: boolean;
 
     // Boolean flag to filter event logs by topics
     filterLogsByTopics: boolean;
 
     payments: EthServerPaymentsConfig;
   }
 
   traceProviderEndpoint: string;
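
Taken together, the upstream RPC endpoint is now a list and the new retry cap is optional. A sketch of just the affected fields with illustrative values (the second endpoint URL is hypothetical), assuming both interfaces are exported from @cerc-io/util:

import { JobQueueConfig, UpstreamConfig } from '@cerc-io/util';

// Only the fields touched by this change are shown; the other required fields are omitted via Pick.
const ethServerFailoverFields: Pick<UpstreamConfig['ethServer'], 'gqlApiEndpoint' | 'rpcProviderEndpoints' | 'rpcClient'> = {
  gqlApiEndpoint: 'http://127.0.0.1:8082/graphql',
  rpcProviderEndpoints: [
    'http://127.0.0.1:8081', // index 0, used at startup
    'https://rpc.example.com' // hypothetical failover endpoint
  ],
  rpcClient: true
};

const jobQueueFailoverFields: Pick<JobQueueConfig, 'blockDelayInMilliSecs' | 'maxNewBlockRetries'> = {
  blockDelayInMilliSecs: 2000, // delay between new-block fetch attempts (illustrative value)
  maxNewBlockRetries: 3 // omit to keep retrying the current endpoint indefinitely
};

The watcher always starts from index 0 of rpcProviderEndpoints; later entries are only used after the job error handler triggers a failover.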

View File

@@ -138,6 +138,11 @@ export class Indexer {
     this._getStorageAt = this._ethClient.getStorageAt.bind(this._ethClient);
   }
 
+  switchClients ({ ethClient, ethProvider }: { ethClient: EthClient, ethProvider: ethers.providers.BaseProvider }): void {
+    this._ethClient = ethClient;
+    this._ethProvider = ethProvider;
+  }
+
   async fetchContracts (): Promise<void> {
     assert(this._db.getContracts);

View File

@@ -63,23 +63,46 @@ export class JobRunner {
   _signalCount = 0;
   _errorInEventsProcessing = false;
 
-  constructor (jobQueueConfig: JobQueueConfig, indexer: IndexerInterface, jobQueue: JobQueue) {
+  _jobErrorHandler: (error: Error) => Promise<void>;
+
+  constructor (
+    jobQueueConfig: JobQueueConfig,
+    indexer: IndexerInterface,
+    jobQueue: JobQueue,
+    // eslint-disable-next-line @typescript-eslint/no-empty-function
+    jobErrorHandler: (error: Error) => Promise<void> = async () => {}
+  ) {
     this._indexer = indexer;
     this.jobQueue = jobQueue;
     this._jobQueueConfig = jobQueueConfig;
+    this._jobErrorHandler = jobErrorHandler;
   }
 
   async subscribeBlockProcessingQueue (): Promise<void> {
     await this.jobQueue.subscribe(
       QUEUE_BLOCK_PROCESSING,
-      async (job) => this.processBlock(job)
+      async (job) => {
+        try {
+          await this.processBlock(job);
+        } catch (error) {
+          this._jobErrorHandler(error as Error);
+          throw error;
+        }
+      }
     );
   }
 
   async subscribeHistoricalProcessingQueue (): Promise<void> {
     await this.jobQueue.subscribe(
       QUEUE_HISTORICAL_PROCESSING,
-      async (job) => this.processHistoricalBlocks(job),
+      async (job) => {
+        try {
+          await this.processHistoricalBlocks(job);
+        } catch (error) {
+          this._jobErrorHandler(error as Error);
+          throw error;
+        }
+      },
       {
         teamSize: 1
       }
@@ -89,7 +112,14 @@ export class JobRunner {
   async subscribeEventProcessingQueue (): Promise<void> {
     await this.jobQueue.subscribe(
       QUEUE_EVENT_PROCESSING,
-      async (job) => this.processEvent(job as PgBoss.JobWithMetadataDoneCallback<EventsJobData | ContractJobData, object>),
+      async (job) => {
+        try {
+          await this.processEvent(job as PgBoss.JobWithMetadataDoneCallback<EventsJobData | ContractJobData, object>);
+        } catch (error) {
+          this._jobErrorHandler(error as Error);
+          throw error;
+        }
+      },
       {
         teamSize: 1,
         includeMetadata: true
@@ -100,14 +130,28 @@ export class JobRunner {
   async subscribeHooksQueue (): Promise<void> {
     await this.jobQueue.subscribe(
       QUEUE_HOOKS,
-      async (job) => this.processHooks(job)
+      async (job) => {
+        try {
+          await this.processHooks(job);
+        } catch (error) {
+          this._jobErrorHandler(error as Error);
+          throw error;
+        }
+      }
     );
   }
 
   async subscribeBlockCheckpointQueue (): Promise<void> {
     await this.jobQueue.subscribe(
       QUEUE_BLOCK_CHECKPOINT,
-      async (job) => this.processCheckpoint(job)
+      async (job) => {
+        try {
+          await this.processCheckpoint(job);
+        } catch (error) {
+          this._jobErrorHandler(error as Error);
+          throw error;
+        }
+      }
     );
   }
@@ -587,11 +631,6 @@ export class JobRunner {
 
         // Do not throw error and complete the job as block will be processed after parent block processing.
         return;
-      } else {
-        // Remove the unknown events of the parent block if it is marked complete.
-        console.time('time:job-runner#_indexBlock-remove-unknown-events');
-        await this._indexer.removeUnknownEvents(parentBlock);
-        console.timeEnd('time:job-runner#_indexBlock-remove-unknown-events');
       }
     }
@@ -708,7 +747,15 @@ export class JobRunner {
       lastBlockNumEvents.set(block.numEvents);
       this._endBlockProcessTimer = lastBlockProcessDuration.startTimer();
 
-      await this._indexer.updateSyncStatusProcessedBlock(block.blockHash, block.blockNumber);
+      console.time('time:job-runner#_processEvents-update-status-and-remove-unknown-events');
+      await Promise.all([
+        // Update latest processed block in SyncStatus
+        this._indexer.updateSyncStatusProcessedBlock(block.blockHash, block.blockNumber),
+        // Remove the unknown events from processed block
+        this._indexer.removeUnknownEvents(block)
+      ]);
+      console.timeEnd('time:job-runner#_processEvents-update-status-and-remove-unknown-events');
 
       if (retryCount > 0) {
         await Promise.all([
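
Every subscribe* method now wraps its processor in the same try/catch: the error is forwarded to the injected jobErrorHandler (the CLI passes its failover callback; the default is a no-op) and then rethrown so the job queue still records the failure and applies its usual retry behaviour. A sketch of that pattern factored into a reusable wrapper; withJobErrorHandler is a hypothetical helper, while the actual change inlines the try/catch in each subscription:

// Hypothetical wrapper around a queue processor: report the error, then rethrow it
// so the job queue still marks the job as failed.
type JobErrorHandler = (error: Error) => Promise<void>;

const withJobErrorHandler = <TJob, TResult> (
  processor: (job: TJob) => Promise<TResult>,
  jobErrorHandler: JobErrorHandler
) => async (job: TJob): Promise<TResult> => {
  try {
    return await processor(job);
  } catch (error) {
    await jobErrorHandler(error as Error);
    throw error;
  }
};

// Usage sketch against the subscriptions above:
// await this.jobQueue.subscribe(
//   QUEUE_BLOCK_PROCESSING,
//   withJobErrorHandler(async (job) => this.processBlock(job), this._jobErrorHandler)
// );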

View File

@@ -98,7 +98,7 @@ isSyncingHistoricalBlocks.set(Number(undefined));
 // Export metrics on a server
 const app: Application = express();
 
-export const startMetricsServer = async (config: Config, indexer: IndexerInterface): Promise<void> => {
+export const startMetricsServer = async (config: Config, indexer: IndexerInterface, endpointIndexes = { rpcProviderEndpoint: 0 }): Promise<void> => {
   if (!config.metrics) {
     log('Metrics is disabled. To enable add metrics host and port.');
     return;
@@ -128,7 +128,7 @@ export const startMetricsServer = async (config: Config, indexer: IndexerInterface): Promise<void> => {
   await registerDBSizeMetrics(config);
 
-  await registerUpstreamChainHeadMetrics(config);
+  await registerUpstreamChainHeadMetrics(config, endpointIndexes.rpcProviderEndpoint);
 
   // Collect default metrics
   client.collectDefaultMetrics();
@@ -179,8 +179,8 @@ const registerDBSizeMetrics = async ({ database, jobQueue }: Config): Promise<void> => {
   });
 };
 
-const registerUpstreamChainHeadMetrics = async ({ upstream }: Config): Promise<void> => {
-  const ethRpcProvider = new JsonRpcProvider(upstream.ethServer.rpcProviderEndpoint);
+const registerUpstreamChainHeadMetrics = async ({ upstream }: Config, rpcProviderEndpointIndex: number): Promise<void> => {
+  const ethRpcProvider = new JsonRpcProvider(upstream.ethServer.rpcProviderEndpoints[rpcProviderEndpointIndex]);
 
   // eslint-disable-next-line no-new
   new client.Gauge({
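
Note that registerUpstreamChainHeadMetrics builds its JsonRpcProvider once, from the endpoint index passed in at startup, so the chain-head metric does not follow later failovers. For comparison, a sketch of a gauge that re-resolves the endpoint on every scrape; the metric name, endpoint list and index object are hypothetical, and it assumes prom-client's collect callback:

import { Gauge } from 'prom-client';
import { providers } from 'ethers';

// Illustrative values; in the watcher these would come from config and the shared endpoint index.
const rpcProviderEndpoints = ['http://127.0.0.1:8081', 'https://rpc.example.com'];
const currentEndpointIndex = { rpcProviderEndpoint: 0 };

// eslint-disable-next-line no-new
new Gauge({
  name: 'example_chain_head_block_number',
  help: 'Latest block number reported by the currently selected upstream RPC endpoint (example)',
  async collect () {
    // Re-create the provider per scrape so the gauge follows endpoint failovers.
    const provider = new providers.JsonRpcProvider(rpcProviderEndpoints[currentEndpointIndex.rpcProviderEndpoint]);
    this.set(await provider.getBlockNumber());
  }
});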

View File

@@ -3,7 +3,7 @@
 //
 
 import { Connection, DeepPartial, EntityTarget, FindConditions, FindManyOptions, ObjectLiteral, QueryRunner } from 'typeorm';
-import { Transaction } from 'ethers';
+import { Transaction, providers } from 'ethers';
 
 import { MappingKey, StorageLayout } from '@cerc-io/solidity-mapper';
 
@@ -161,6 +161,8 @@ export interface IndexerInterface {
   readonly serverConfig: ServerConfig
   readonly upstreamConfig: UpstreamConfig
   readonly storageLayoutMap: Map<string, StorageLayout>
+  // eslint-disable-next-line no-use-before-define
+  readonly graphWatcher?: GraphWatcherInterface
   init (): Promise<void>
   getBlockProgress (blockHash: string): Promise<BlockProgressInterface | undefined>
   getBlockProgressEntities (where: FindConditions<BlockProgressInterface>, options: FindManyOptions<BlockProgressInterface>): Promise<BlockProgressInterface[]>
@@ -234,6 +236,7 @@ export interface IndexerInterface {
   clearProcessedBlockData (block: BlockProgressInterface): Promise<void>
   getResultEvent (event: EventInterface): any
   getFullTransactions (txHashList: string[]): Promise<EthFullTransaction[]>
+  switchClients (clients: { ethClient: EthClient, ethProvider: providers.BaseProvider }): void
 }
 
 export interface DatabaseInterface {