Handle missing blocks by making eth call to ipld-eth-server (#285)

This commit is contained in:
nikugogoi 2021-10-22 15:20:11 +05:30 committed by GitHub
parent 6f3b8029b2
commit 737d9a9b6e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 113 additions and 27 deletions

View File

@ -69,7 +69,7 @@ export class Indexer {
this._ethClient = ethClient;
this._ethProvider = ethProvider;
this._postgraphileClient = postgraphileClient;
this._baseIndexer = new BaseIndexer(this._db, this._ethClient);
this._baseIndexer = new BaseIndexer(this._db, this._ethClient, this._ethProvider);
const { abi, storageLayout } = artifacts;

View File

@ -7,7 +7,7 @@ import debug from 'debug';
import { JsonFragment } from '@ethersproject/abi';
import { DeepPartial } from 'typeorm';
import JSONbig from 'json-bigint';
import { BigNumber, ethers } from 'ethers';
import { ethers } from 'ethers';
import { BaseProvider } from '@ethersproject/providers';
import { EthClient } from '@vulcanize/ipld-eth-client';
@ -60,7 +60,7 @@ export class Indexer {
this._ethClient = ethClient;
this._ethProvider = ethProvider;
this._serverMode = serverMode;
this._baseIndexer = new BaseIndexer(this._db, this._ethClient);
this._baseIndexer = new BaseIndexer(this._db, this._ethClient, this._ethProvider);
const { abi, storageLayout } = artifacts;
@ -117,8 +117,7 @@ export class Indexer {
log('balanceOf: db miss, fetching from upstream server');
let result: ValueResult;
const { block: { number } } = await this._ethClient.getBlockByHash(blockHash);
const blockNumber = BigNumber.from(number).toNumber();
const { block: { number: blockNumber } } = await this._ethClient.getBlockByHash(blockHash);
if (this._serverMode === ETH_CALL_MODE) {
const contract = new ethers.Contract(token, this._abi, this._ethProvider);
@ -155,8 +154,7 @@ export class Indexer {
log('allowance: db miss, fetching from upstream server');
let result: ValueResult;
const { block: { number } } = await this._ethClient.getBlockByHash(blockHash);
const blockNumber = BigNumber.from(number).toNumber();
const { block: { number: blockNumber } } = await this._ethClient.getBlockByHash(blockHash);
if (this._serverMode === ETH_CALL_MODE) {
const contract = new ethers.Contract(token, this._abi, this._ethProvider);

View File

@ -77,7 +77,11 @@ export class EthClient {
}
async getBlockByHash (blockHash: string): Promise<any> {
return this._graphqlClient.query(ethQueries.getBlockByHash, { blockHash });
const { block } = await this._graphqlClient.query(ethQueries.getBlockByHash, { blockHash });
block.number = parseInt(block.number, 16);
block.timestamp = parseInt(block.timestamp, 16);
return { block };
}
async getLogs (vars: Vars): Promise<any> {

View File

@ -19,7 +19,7 @@
[upstream.ethServer]
gqlApiEndpoint = "http://127.0.0.1:8082/graphql"
gqlPostgraphileEndpoint = "http://127.0.0.1:5000/graphql"
rpcProviderEndpoint = "http://127.0.0.1:8545"
rpcProviderEndpoint = "http://127.0.0.1:8081"
[upstream.cache]
name = "requests"

View File

@ -0,0 +1,40 @@
[server]
host = "127.0.0.1"
port = 3004
# Use mode demo when running watcher locally.
# Mode demo whitelists all tokens so that entity values get updated.
mode = "demo"
[database]
type = "postgres"
host = "localhost"
port = 5432
database = "uni-info-watcher"
username = "postgres"
password = "postgres"
synchronize = true
logging = false
[upstream]
[upstream.ethServer]
gqlApiEndpoint = "http://127.0.0.1:8082/graphql"
gqlPostgraphileEndpoint = "http://127.0.0.1:5000/graphql"
rpcProviderEndpoint = "http://127.0.0.1:8545"
[upstream.cache]
name = "requests"
enabled = false
deleteOnStart = false
[upstream.uniWatcher]
gqlEndpoint = "http://127.0.0.1:3003/graphql"
gqlSubscriptionEndpoint = "http://127.0.0.1:3003/graphql"
[upstream.tokenWatcher]
gqlEndpoint = "http://127.0.0.1:3001/graphql"
gqlSubscriptionEndpoint = "http://127.0.0.1:3001/graphql"
[jobQueue]
dbConnectionString = "postgres://postgres:postgres@localhost/uni-info-watcher-job-queue"
maxCompletionLagInSecs = 300
jobDelayInMilliSecs = 1000

View File

@ -38,7 +38,7 @@ export const builder = {
export const handler = async (argv: any): Promise<void> => {
const config = await getConfig(argv.configFile);
await resetJobs(config);
const { dbConfig, serverConfig, upstreamConfig, ethClient } = await getResetConfig(config);
const { dbConfig, serverConfig, upstreamConfig, ethClient, ethProvider } = await getResetConfig(config);
// Initialize database.
const db = new Database(dbConfig);
@ -52,7 +52,7 @@ export const handler = async (argv: any): Promise<void> => {
const uniClient = new UniClient(uniWatcher);
const erc20Client = new ERC20Client(tokenWatcher);
const indexer = new Indexer(db, uniClient, erc20Client, ethClient, serverConfig.mode);
const indexer = new Indexer(db, uniClient, erc20Client, ethClient, ethProvider, serverConfig.mode);
const syncStatus = await indexer.getSyncStatus();
assert(syncStatus, 'Missing syncStatus');

View File

@ -7,6 +7,7 @@ import 'reflect-metadata';
import yargs from 'yargs';
import { hideBin } from 'yargs/helpers';
import debug from 'debug';
import { getDefaultProvider } from 'ethers';
import { getCache } from '@vulcanize/cache';
import { EthClient } from '@vulcanize/ipld-eth-client';
@ -59,7 +60,7 @@ export const main = async (): Promise<any> => {
await db.init();
assert(upstream, 'Missing upstream config');
const { ethServer: { gqlPostgraphileEndpoint }, cache: cacheConfig, uniWatcher, tokenWatcher } = upstream;
const { ethServer: { gqlPostgraphileEndpoint, rpcProviderEndpoint }, cache: cacheConfig, uniWatcher, tokenWatcher } = upstream;
assert(gqlPostgraphileEndpoint, 'Missing upstream ethServer.gqlPostgraphileEndpoint');
const cache = await getCache(cacheConfig);
@ -71,11 +72,12 @@ export const main = async (): Promise<any> => {
const uniClient = new UniClient(uniWatcher);
const erc20Client = new ERC20Client(tokenWatcher);
const ethProvider = getDefaultProvider(rpcProviderEndpoint);
// Note: In-memory pubsub works fine for now, as each watcher is a single process anyway.
// Later: https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries
const pubsub = new PubSub();
const indexer = new Indexer(db, uniClient, erc20Client, ethClient, mode);
const indexer = new Indexer(db, uniClient, erc20Client, ethClient, ethProvider, mode);
assert(jobQueueConfig, 'Missing job queue config');
const { dbConnectionString, maxCompletionLagInSecs } = jobQueueConfig;

View File

@ -18,7 +18,7 @@ import { BlockProgress } from './entity/BlockProgress';
import { SyncStatus } from './entity/SyncStatus';
import { Token } from './entity/Token';
const CONFIG_FILE = './environments/local.toml';
const CONFIG_FILE = './environments/test.toml';
describe('getPrevEntityVersion', () => {
let db: Database;

View File

@ -6,7 +6,7 @@ import assert from 'assert';
import debug from 'debug';
import { DeepPartial, QueryRunner } from 'typeorm';
import JSONbig from 'json-bigint';
import { utils } from 'ethers';
import { providers, utils } from 'ethers';
import { Client as UniClient } from '@vulcanize/uni-watcher';
import { Client as ERC20Client } from '@vulcanize/erc20-watcher';
@ -47,7 +47,7 @@ export class Indexer implements IndexerInterface {
_baseIndexer: BaseIndexer
_isDemo: boolean
constructor (db: Database, uniClient: UniClient, erc20Client: ERC20Client, ethClient: EthClient, mode: string) {
constructor (db: Database, uniClient: UniClient, erc20Client: ERC20Client, ethClient: EthClient, ethProvider: providers.BaseProvider, mode: string) {
assert(db);
assert(uniClient);
assert(erc20Client);
@ -57,7 +57,7 @@ export class Indexer implements IndexerInterface {
this._uniClient = uniClient;
this._erc20Client = erc20Client;
this._ethClient = ethClient;
this._baseIndexer = new BaseIndexer(this._db, this._ethClient);
this._baseIndexer = new BaseIndexer(this._db, this._ethClient, ethProvider);
this._isDemo = mode === 'demo';
}

View File

@ -7,6 +7,7 @@ import 'reflect-metadata';
import yargs from 'yargs';
import { hideBin } from 'yargs/helpers';
import debug from 'debug';
import { getDefaultProvider } from 'ethers';
import { Client as ERC20Client } from '@vulcanize/erc20-watcher';
import { Client as UniClient } from '@vulcanize/uni-watcher';
@ -90,7 +91,21 @@ export const main = async (): Promise<any> => {
await db.init();
assert(upstream, 'Missing upstream config');
const { uniWatcher: { gqlEndpoint, gqlSubscriptionEndpoint }, tokenWatcher, cache: cacheConfig, ethServer: { gqlApiEndpoint, gqlPostgraphileEndpoint } } = upstream;
const {
uniWatcher: {
gqlEndpoint,
gqlSubscriptionEndpoint
},
tokenWatcher,
cache: cacheConfig,
ethServer: {
gqlApiEndpoint,
gqlPostgraphileEndpoint,
rpcProviderEndpoint
}
} = upstream;
assert(gqlEndpoint, 'Missing upstream uniWatcher.gqlEndpoint');
assert(gqlSubscriptionEndpoint, 'Missing upstream uniWatcher.gqlSubscriptionEndpoint');
@ -107,8 +122,9 @@ export const main = async (): Promise<any> => {
});
const erc20Client = new ERC20Client(tokenWatcher);
const ethProvider = getDefaultProvider(rpcProviderEndpoint);
const indexer = new Indexer(db, uniClient, erc20Client, ethClient, mode);
const indexer = new Indexer(db, uniClient, erc20Client, ethClient, ethProvider, mode);
assert(jobQueueConfig, 'Missing job queue config');

View File

@ -11,6 +11,7 @@ import { hideBin } from 'yargs/helpers';
import debug from 'debug';
import 'graphql-import-node';
import { createServer } from 'http';
import { getDefaultProvider } from 'ethers';
import { Client as ERC20Client } from '@vulcanize/erc20-watcher';
import { Client as UniClient } from '@vulcanize/uni-watcher';
@ -56,7 +57,8 @@ export const main = async (): Promise<any> => {
const {
ethServer: {
gqlApiEndpoint,
gqlPostgraphileEndpoint
gqlPostgraphileEndpoint,
rpcProviderEndpoint
},
uniWatcher,
tokenWatcher,
@ -75,7 +77,8 @@ export const main = async (): Promise<any> => {
const uniClient = new UniClient(uniWatcher);
const erc20Client = new ERC20Client(tokenWatcher);
const indexer = new Indexer(db, uniClient, erc20Client, ethClient, mode);
const ethProvider = getDefaultProvider(rpcProviderEndpoint);
const indexer = new Indexer(db, uniClient, erc20Client, ethClient, ethProvider, mode);
assert(jobQueueConfig, 'Missing job queue config');

View File

@ -42,7 +42,7 @@ import {
fetchTransaction
} from '../test/utils';
const CONFIG_FILE = './environments/local.toml';
const CONFIG_FILE = './environments/test.toml';
describe('uni-info-watcher', () => {
let factory: Contract;

View File

@ -50,8 +50,8 @@ export class Indexer implements IndexerInterface {
this._db = db;
this._ethClient = ethClient;
this._postgraphileClient = postgraphileClient;
this._baseIndexer = new BaseIndexer(this._db, this._ethClient);
this._ethProvider = ethProvider;
this._baseIndexer = new BaseIndexer(this._db, this._ethClient, this._ethProvider);
this._factoryContract = new ethers.utils.Interface(factoryABI);
this._poolContract = new ethers.utils.Interface(poolABI);

View File

@ -14,6 +14,7 @@ import { BlockProgressInterface, DatabaseInterface, EventInterface, SyncStatusIn
import { UNKNOWN_EVENT_NAME } from './constants';
const MAX_EVENTS_BLOCK_RANGE = 1000;
const MISSING_BLOCKS_ERROR = 'sql: no rows in result set';
const log = debug('vulcanize:indexer');
@ -27,12 +28,14 @@ export interface ValueResult {
export class Indexer {
_db: DatabaseInterface;
_ethClient: EthClient;
_getStorageAt: GetStorageAt
_getStorageAt: GetStorageAt;
_ethProvider: ethers.providers.BaseProvider;
constructor (db: DatabaseInterface, ethClient: EthClient) {
constructor (db: DatabaseInterface, ethClient: EthClient, ethProvider: ethers.providers.BaseProvider) {
this._db = db;
this._ethClient = ethClient;
this._getStorageAt = this._ethClient.getStorageAt.bind(this._ethClient);
this._ethProvider = ethProvider;
}
async getSyncStatus (): Promise<SyncStatusInterface | undefined> {
@ -104,8 +107,28 @@ export class Indexer {
}
async getBlock (blockHash: string): Promise<any> {
const { block } = await this._ethClient.getLogs({ blockHash });
return block;
try {
const { block } = await this._ethClient.getBlockByHash(blockHash);
return block;
} catch (error) {
// If block is not present in header_cids, eth_getBlockByHash call is made to update statediff.
if (error instanceof Error && error.message === MISSING_BLOCKS_ERROR) {
try {
await this._ethProvider.getBlock(blockHash);
} catch (error: any) {
// eth_getBlockByHash will update statediff but takes some time.
// The block is not returned immediately and an error is thrown so that it is fetched in the next job retry.
if (error.code === ethers.utils.Logger.errors.SERVER_ERROR) {
throw new Error('Block not found');
}
throw error;
}
}
throw error;
}
}
async getBlockProgress (blockHash: string): Promise<BlockProgressInterface | undefined> {