Replace storage calls in uni-info-watcher with eth_calls (#281)

* Replace storage calls in uni-info-watcher with eth_calls

* Handle eth_call reverts and catch them in uni-info-watcher
nikugogoi 2021-10-21 13:24:46 +05:30 committed by GitHub
parent 5cbcd455d2
commit 7953ba2949
16 changed files with 274 additions and 40 deletions
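
At a high level, the change swaps the storage-slot based queries uni-info-watcher consumed from uni-watcher (getPosition, poolIdToPoolKey, getPool) for new positions and callGetPool queries that uni-watcher serves via eth_calls pinned to the block being processed, with a revert treated as "no value" rather than a hard failure. A minimal TypeScript sketch of that pattern, assuming ethers v5 and an archive-capable RPC endpoint; the endpoint and contract address below are placeholders, not values from this commit:

import { Contract, getDefaultProvider, utils } from 'ethers';

// Placeholders for illustration only.
const RPC_ENDPOINT = 'http://127.0.0.1:8545';
const NFPM_ADDRESS = '0x0000000000000000000000000000000000000000';
const NFPM_ABI = [
  'function positions(uint256 tokenId) view returns (uint96 nonce, address operator, address token0, address token1, uint24 fee, int24 tickLower, int24 tickUpper, uint128 liquidity, uint256 feeGrowthInside0LastX128, uint256 feeGrowthInside1LastX128, uint128 tokensOwed0, uint128 tokensOwed1)'
];

async function positionsAtBlock (tokenId: string, blockNumber: number): Promise<any> {
  const provider = getDefaultProvider(RPC_ENDPOINT);
  const contract = new Contract(NFPM_ADDRESS, NFPM_ABI, provider);

  try {
    // eth_call pinned to a historical block, instead of reading raw storage slots.
    return await contract.positions(tokenId, { blockTag: blockNumber });
  } catch (error: any) {
    // The call reverts e.g. when the position was minted and burned in the same block;
    // the watcher treats that as "no position" rather than an error.
    if (error.code === utils.Logger.errors.CALL_EXCEPTION) {
      return undefined;
    }

    throw error;
  }
}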

View File

@ -424,6 +424,20 @@ export class Database implements DatabaseInterface {
return this._baseDatabase.getModelEntities(queryRunner, entity, block, where, queryOptions, relations);
}
async getModelEntitiesNoTx<Entity> (entity: new () => Entity, block: BlockHeight, where: Where = {}, queryOptions: QueryOptions = {}, relations: Relation[] = []): Promise<Entity[]> {
const queryRunner = this._conn.createQueryRunner();
let res;
try {
await queryRunner.connect();
res = await this.getModelEntities(queryRunner, entity, block, where, queryOptions, relations);
} finally {
await queryRunner.release();
}
return res;
}
async saveFactory (queryRunner: QueryRunner, factory: Factory, block: Block): Promise<Factory> {
const repo = queryRunner.manager.getRepository(Factory);
factory.blockNumber = block.number;
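
The new getModelEntitiesNoTx above mirrors getModelEntities but acquires its own query runner, runs outside any transaction and always releases the runner; the indexer below uses it to look up the single Factory row at a block. A short usage sketch, assuming the Factory entity lives at './entity/Factory' (path not shown in this diff):

import { Database } from './database';
import { Factory } from './entity/Factory';

// Sketch: fetch the lone Factory entity at a block without opening a transaction.
async function getFactoryAtBlock (db: Database, blockHash: string): Promise<Factory | undefined> {
  const [factory] = await db.getModelEntitiesNoTx(Factory, { hash: blockHash }, {}, { limit: 1 });

  return factory;
}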

View File

@ -1231,16 +1231,26 @@ export class Indexer implements IndexerInterface {
let position = await this._db.getPosition({ id: tokenId.toString(), blockHash });
if (!position) {
- const nfpmPosition = await this._uniClient.getPosition(blockHash, tokenId);
+ let positionResult;
- // The contract call reverts in situations where the position is minted and deleted in the same block.
- // From my investigation this happens in calls from BancorSwap.
- // (e.g. 0xf7867fa19aa65298fadb8d4f72d0daed5e836f3ba01f0b9b9631cdc6c36bed40)
+ try {
+ ({ value: positionResult } = await this._uniClient.positions(blockHash, contractAddress, tokenId));
+ } catch (error: any) {
+ // The contract call reverts in situations where the position is minted and deleted in the same block.
+ // From my investigation this happens in calls from BancorSwap.
+ // (e.g. 0xf7867fa19aa65298fadb8d4f72d0daed5e836f3ba01f0b9b9631cdc6c36bed40)
- if (nfpmPosition) {
- const { token0: token0Address, token1: token1Address, fee } = await this._uniClient.poolIdToPoolKey(blockHash, nfpmPosition.poolId);
+ if (error.message !== utils.Logger.errors.CALL_EXCEPTION) {
+ log('nfpm positions eth_call failed');
+ throw error;
+ }
+ }
- const { pool: poolAddress } = await this._uniClient.getPool(blockHash, token0Address, token1Address, fee);
+ if (positionResult) {
+ // TODO: In subgraph factory is fetched by hardcoded factory address.
+ // Currently fetching first factory in database as only one exists.
+ const [factory] = await this._db.getModelEntitiesNoTx(Factory, { hash: blockHash }, {}, { limit: 1 });
+ const { value: poolAddress } = await this._uniClient.callGetPool(blockHash, factory.id, positionResult.token0, positionResult.token1, positionResult.fee);
position = new Position();
position.id = tokenId.toString();
@ -1250,23 +1260,23 @@ export class Indexer implements IndexerInterface {
position.pool = pool;
const [token0, token1] = await Promise.all([
- this._db.getTokenNoTx({ id: token0Address, blockHash }),
- this._db.getTokenNoTx({ id: token1Address, blockHash })
+ this._db.getTokenNoTx({ id: positionResult.token0, blockHash }),
+ this._db.getTokenNoTx({ id: positionResult.token1, blockHash })
]);
assert(token0 && token1);
position.token0 = token0;
position.token1 = token1;
const [tickLower, tickUpper] = await Promise.all([
- this._db.getTickNoTx({ id: poolAddress.concat('#').concat(nfpmPosition.tickLower.toString()), blockHash }),
- this._db.getTickNoTx({ id: poolAddress.concat('#').concat(nfpmPosition.tickUpper.toString()), blockHash })
+ this._db.getTickNoTx({ id: poolAddress.concat('#').concat(positionResult.tickLower.toString()), blockHash }),
+ this._db.getTickNoTx({ id: poolAddress.concat('#').concat(positionResult.tickUpper.toString()), blockHash })
]);
assert(tickLower && tickUpper);
position.tickLower = tickLower;
position.tickUpper = tickUpper;
- position.feeGrowthInside0LastX128 = BigInt(nfpmPosition.feeGrowthInside0LastX128.toString());
- position.feeGrowthInside1LastX128 = BigInt(nfpmPosition.feeGrowthInside1LastX128.toString());
+ position.feeGrowthInside0LastX128 = BigInt(positionResult.feeGrowthInside0LastX128.toString());
+ position.feeGrowthInside1LastX128 = BigInt(positionResult.feeGrowthInside1LastX128.toString());
}
}
@ -1274,11 +1284,16 @@ export class Indexer implements IndexerInterface {
}
async _updateFeeVars (position: Position, block: Block, contractAddress: string, tokenId: bigint): Promise<Position> {
- const nfpmPosition = await this._uniClient.getPosition(block.hash, tokenId);
+ try {
+ const { value: positionResult } = await this._uniClient.positions(block.hash, contractAddress, tokenId);
- if (nfpmPosition) {
- position.feeGrowthInside0LastX128 = BigInt(nfpmPosition.feeGrowthInside0LastX128.toString());
- position.feeGrowthInside1LastX128 = BigInt(nfpmPosition.feeGrowthInside1LastX128.toString());
+ if (positionResult) {
+ position.feeGrowthInside0LastX128 = BigInt(positionResult.feeGrowthInside0LastX128.toString());
+ position.feeGrowthInside1LastX128 = BigInt(positionResult.feeGrowthInside1LastX128.toString());
}
+ } catch (error) {
+ log('nfpm positions eth_call failed');
+ log(error);
+ }
return position;
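
For reference, the positionResult destructured above is the decoded return value of NonfungiblePositionManager.positions(tokenId), exposed by uni-watcher through the positions query added later in this commit. A rough TypeScript shape of the fields the indexer relies on (a sketch mirroring the PositionsValue schema type, not a type defined in the codebase):

// Illustrative shape only; BigInt-typed fields are assumed to arrive as strings here,
// and the indexer only ever converts them via .toString() / BigInt().
interface PositionResult {
  nonce: string;
  operator: string;
  token0: string;
  token1: string;
  fee: number;
  tickLower: string;
  tickUpper: string;
  liquidity: string;
  feeGrowthInside0LastX128: string;
  feeGrowthInside1LastX128: string;
  tokensOwed0: string;
  tokensOwed1: string;
}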

View File

@ -16,7 +16,7 @@
[upstream.ethServer]
gqlApiEndpoint = "http://127.0.0.1:8082/graphql"
gqlPostgraphileEndpoint = "http://127.0.0.1:5000/graphql"
rpcProviderEndpoint = "http://127.0.0.1:8545"
rpcProviderEndpoint = "http://127.0.0.1:8081"
[upstream.cache]
name = "requests"
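
rpcProviderEndpoint now also backs the eth_calls added in this commit, so it has to point at an endpoint that can serve eth_call against historical blocks; the server, job-runner, fill and test entry points below turn it into an ethers provider that is handed to the Indexer. A minimal sketch of that wiring, mirroring the getDefaultProvider usage elsewhere in this commit:

import { getDefaultProvider, providers } from 'ethers';

// Sketch: turn the configured endpoint into the provider handed to the Indexer.
function buildEthProvider (rpcProviderEndpoint: string): providers.BaseProvider {
  return getDefaultProvider(rpcProviderEndpoint);
}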

View File

@ -0,0 +1,29 @@
[server]
host = "127.0.0.1"
port = 3003
[database]
type = "postgres"
host = "localhost"
port = 5432
database = "uni-watcher"
username = "postgres"
password = "postgres"
synchronize = true
logging = false
[upstream]
[upstream.ethServer]
gqlApiEndpoint = "http://127.0.0.1:8082/graphql"
gqlPostgraphileEndpoint = "http://127.0.0.1:5000/graphql"
rpcProviderEndpoint = "http://127.0.0.1:8545"
[upstream.cache]
name = "requests"
enabled = false
deleteOnStart = false
[jobQueue]
dbConnectionString = "postgres://postgres:postgres@localhost/uni-watcher-job-queue"
maxCompletionLagInSecs = 300
jobDelayInMilliSecs = 100
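
The new test.toml gives the test suites their own server, database and endpoint settings; the chain-pruning and smoke tests below switch their CONFIG_FILE constant to it and load it with getConfig. Roughly as follows, where the destructured field names are assumptions based on the sections above:

import { getConfig } from '@vulcanize/util';

const CONFIG_FILE = './environments/test.toml';

// Sketch: how the tests below pick up this config.
async function loadTestConfig (): Promise<any> {
  const config = await getConfig(CONFIG_FILE);
  const { upstream, database: dbConfig, jobQueue: jobQueueConfig } = config;

  return { upstream, dbConfig, jobQueueConfig };
}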

View File

@ -6,6 +6,7 @@ import { expect, assert } from 'chai';
import { AssertionError } from 'assert';
import 'mocha';
import _ from 'lodash';
import { getDefaultProvider } from 'ethers';
import { getConfig, JobQueue, JobRunner, JOB_KIND_PRUNE } from '@vulcanize/util';
import { getCache } from '@vulcanize/cache';
@ -17,7 +18,7 @@ import { Database } from './database';
import { BlockProgress } from './entity/BlockProgress';
import { SyncStatus } from './entity/SyncStatus';
- const CONFIG_FILE = './environments/local.toml';
+ const CONFIG_FILE = './environments/test.toml';
describe('chain pruning', () => {
let db: Database;
@ -45,7 +46,7 @@ describe('chain pruning', () => {
// Create an Indexer object.
assert(upstream, 'Missing upstream config');
- const { ethServer: { gqlApiEndpoint, gqlPostgraphileEndpoint }, cache: cacheConfig } = upstream;
+ const { ethServer: { gqlApiEndpoint, gqlPostgraphileEndpoint, rpcProviderEndpoint }, cache: cacheConfig } = upstream;
assert(gqlApiEndpoint, 'Missing upstream ethServer.gqlApiEndpoint');
assert(gqlPostgraphileEndpoint, 'Missing upstream ethServer.gqlPostgraphileEndpoint');
@ -61,7 +62,9 @@ describe('chain pruning', () => {
cache
});
- indexer = new Indexer(db, ethClient, postgraphileClient);
+ const ethProvider = getDefaultProvider(rpcProviderEndpoint);
+ indexer = new Indexer(db, ethClient, postgraphileClient, ethProvider);
assert(indexer, 'Could not create indexer object.');
const { dbConnectionString, maxCompletionLagInSecs } = jobQueueConfig;

View File

@ -27,13 +27,13 @@ export const builder = {
export const handler = async (argv: any): Promise<void> => {
const config = await getConfig(argv.configFile);
await resetJobs(config);
- const { dbConfig, ethClient, postgraphileClient } = await getResetConfig(config);
+ const { dbConfig, ethClient, postgraphileClient, ethProvider } = await getResetConfig(config);
// Initialize database.
const db = new Database(dbConfig);
await db.init();
- const indexer = new Indexer(db, ethClient, postgraphileClient);
+ const indexer = new Indexer(db, ethClient, postgraphileClient, ethProvider);
const syncStatus = await indexer.getSyncStatus();
assert(syncStatus, 'Missing syncStatus');

View File

@ -12,7 +12,9 @@ import {
queryEvents,
subscribeEvents,
queryGetContract,
- queryEventsInRange
+ queryEventsInRange,
+ queryCallGetPool,
+ queryPositions
} from './queries';
export class Client {
@ -84,6 +86,34 @@ export class Client {
return getPool;
}
async callGetPool (blockHash: string, contractAddress: string, key0: string, key1: string, key2: number): Promise<any> {
const { callGetPool } = await this._client.query(
gql(queryCallGetPool),
{
blockHash,
contractAddress,
key0,
key1,
key2
}
);
return callGetPool;
}
async positions (blockHash: string, contractAddress: string, tokenId: bigint): Promise<any> {
const { positions } = await this._client.query(
gql(queryPositions),
{
blockHash,
contractAddress,
tokenId: tokenId.toString()
}
);
return positions;
}
async getContract (type: string): Promise<any> {
const { getContract } = await this._client.query(
gql(queryGetContract),
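
On the consumer side, uni-info-watcher calls these client methods with the revert handling shown in its indexer above; a condensed sketch, where the structural client type is illustrative and stands in for the Client defined in this file:

import { utils } from 'ethers';

// Illustrative structural type; in the codebase this is the uni-watcher Client above.
type PositionsClient = {
  positions (blockHash: string, contractAddress: string, tokenId: bigint): Promise<{ value: any }>;
};

// Sketch: fetch a position, treating a CALL_EXCEPTION revert as "not found".
async function fetchPosition (uniClient: PositionsClient, blockHash: string, contractAddress: string, tokenId: bigint): Promise<any> {
  try {
    const { value } = await uniClient.positions(blockHash, contractAddress, tokenId);

    return value;
  } catch (error: any) {
    // e.g. the position was minted and burned in the same block.
    if (error.message === utils.Logger.errors.CALL_EXCEPTION) {
      return undefined;
    }

    throw error;
  }
}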

View File

@ -8,6 +8,7 @@ import yargs from 'yargs';
import { hideBin } from 'yargs/helpers';
import debug from 'debug';
import { PubSub } from 'apollo-server-express';
import { getDefaultProvider } from 'ethers';
import { getCache } from '@vulcanize/cache';
import { EthClient } from '@vulcanize/ipld-eth-client';
@ -57,7 +58,7 @@ export const main = async (): Promise<any> => {
await db.init();
assert(upstream, 'Missing upstream config');
- const { ethServer: { gqlPostgraphileEndpoint }, cache: cacheConfig } = upstream;
+ const { ethServer: { gqlPostgraphileEndpoint, rpcProviderEndpoint }, cache: cacheConfig } = upstream;
assert(gqlPostgraphileEndpoint, 'Missing upstream ethServer.gqlPostgraphileEndpoint');
const cache = await getCache(cacheConfig);
@ -72,10 +73,12 @@ export const main = async (): Promise<any> => {
cache
});
const ethProvider = getDefaultProvider(rpcProviderEndpoint);
// Note: In-memory pubsub works fine for now, as each watcher is a single process anyway.
// Later: https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries
const pubsub = new PubSub();
- const indexer = new Indexer(db, ethClient, postgraphileClient);
+ const indexer = new Indexer(db, ethClient, postgraphileClient, ethProvider);
const { dbConnectionString, maxCompletionLagInSecs } = jobQueueConfig;
assert(dbConnectionString, 'Missing job queue db connection string');

View File

@ -9,7 +9,7 @@ import { ethers } from 'ethers';
import assert from 'assert';
import { EthClient } from '@vulcanize/ipld-eth-client';
- import { IndexerInterface, Indexer as BaseIndexer } from '@vulcanize/util';
+ import { IndexerInterface, Indexer as BaseIndexer, ValueResult } from '@vulcanize/util';
import { Database } from './database';
import { Event, UNKNOWN_EVENT_NAME } from './entity/Event';
@ -40,16 +40,18 @@ export class Indexer implements IndexerInterface {
_ethClient: EthClient
_postgraphileClient: EthClient
_baseIndexer: BaseIndexer
_ethProvider: ethers.providers.BaseProvider
_factoryContract: ethers.utils.Interface
_poolContract: ethers.utils.Interface
_nfpmContract: ethers.utils.Interface
- constructor (db: Database, ethClient: EthClient, postgraphileClient: EthClient) {
+ constructor (db: Database, ethClient: EthClient, postgraphileClient: EthClient, ethProvider: ethers.providers.BaseProvider) {
this._db = db;
this._ethClient = ethClient;
this._postgraphileClient = postgraphileClient;
this._baseIndexer = new BaseIndexer(this._db, this._ethClient);
this._ethProvider = ethProvider;
this._factoryContract = new ethers.utils.Interface(factoryABI);
this._poolContract = new ethers.utils.Interface(poolABI);
@ -290,6 +292,50 @@ export class Indexer implements IndexerInterface {
};
}
async callGetPool (blockHash: string, contractAddress: string, key0: string, key1: string, key2: number): Promise<ValueResult> {
const contract = new ethers.Contract(contractAddress, factoryABI, this._ethProvider);
const { block: { number } } = await this._ethClient.getBlockByHash(blockHash);
const blockNumber = ethers.BigNumber.from(number).toNumber();
try {
const value = await contract.getPool(key0, key1, key2, { blockTag: blockNumber });
return { value };
} catch (error: any) {
if (error.code === ethers.utils.Logger.errors.CALL_EXCEPTION) {
log('eth_call error');
log(error);
throw new Error(error.code);
}
throw error;
}
}
async positions (blockHash: string, contractAddress: string, tokenId: string): Promise<ValueResult> {
const contract = new ethers.Contract(contractAddress, nfpmABI, this._ethProvider);
const { block: { number } } = await this._ethClient.getBlockByHash(blockHash);
const blockNumber = ethers.BigNumber.from(number).toNumber();
try {
const value = await contract.positions(tokenId, { blockTag: blockNumber });
return { value };
} catch (error: any) {
if (error.code === ethers.utils.Logger.errors.CALL_EXCEPTION) {
log('eth_call error');
log(error);
throw new Error(error.code);
}
throw error;
}
}
async getContract (type: string): Promise<any> {
const contract = await this._db.getLatestContract(type);
return contract;
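
Note the error contract here: on a revert both methods rethrow new Error(error.code), so the string 'CALL_EXCEPTION' reaches GraphQL consumers in the error message, which is what uni-info-watcher's _getPosition checks for. The two methods also share the same call-at-block boilerplate; a possible shared helper, sketched here only and not part of this commit:

import { ethers } from 'ethers';
import debug from 'debug';

const log = debug('vulcanize:indexer');

// Hypothetical helper: run a read-only contract method pinned to a block number,
// mapping reverts to Error('CALL_EXCEPTION') the same way callGetPool and positions do.
async function callAtBlock (contract: ethers.Contract, blockNumber: number, method: string, args: any[]): Promise<{ value: any }> {
  try {
    const value = await contract[method](...args, { blockTag: blockNumber });

    return { value };
  } catch (error: any) {
    if (error.code === ethers.utils.Logger.errors.CALL_EXCEPTION) {
      log('eth_call error');
      log(error);

      throw new Error(error.code);
    }

    throw error;
  }
}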

View File

@ -7,6 +7,7 @@ import 'reflect-metadata';
import yargs from 'yargs';
import { hideBin } from 'yargs/helpers';
import debug from 'debug';
import { getDefaultProvider } from 'ethers';
import { getCache } from '@vulcanize/cache';
import { EthClient } from '@vulcanize/ipld-eth-client';
@ -105,7 +106,7 @@ export const main = async (): Promise<any> => {
await db.init();
assert(upstream, 'Missing upstream config');
- const { ethServer: { gqlApiEndpoint, gqlPostgraphileEndpoint }, cache: cacheConfig } = upstream;
+ const { ethServer: { gqlApiEndpoint, gqlPostgraphileEndpoint, rpcProviderEndpoint }, cache: cacheConfig } = upstream;
assert(gqlApiEndpoint, 'Missing upstream ethServer.gqlApiEndpoint');
assert(gqlPostgraphileEndpoint, 'Missing upstream ethServer.gqlPostgraphileEndpoint');
@ -121,7 +122,9 @@ export const main = async (): Promise<any> => {
cache
});
- const indexer = new Indexer(db, ethClient, postgraphileClient);
+ const ethProvider = getDefaultProvider(rpcProviderEndpoint);
+ const indexer = new Indexer(db, ethClient, postgraphileClient, ethProvider);
assert(jobQueueConfig, 'Missing job queue config');

View File

@ -102,7 +102,7 @@ const resultEvent = `
export const subscribeEvents = gql`
subscription SubscriptionEvents {
onEvent
onEvent
${resultEvent}
}
`;
@ -135,13 +135,38 @@ query getPosition($blockHash: String!, $tokenId: String!) {
}
`;
export const queryPositions = gql`
query getPosition($blockHash: String!, $contractAddress: String!, $tokenId: String!) {
positions(blockHash: $blockHash, contractAddress: $contractAddress, tokenId: $tokenId) {
value {
nonce,
operator,
token0,
token1,
fee,
tickLower,
tickUpper,
liquidity,
feeGrowthInside0LastX128,
feeGrowthInside1LastX128,
tokensOwed0,
tokensOwed1,
}
proof {
data
}
}
}
`;
export const queryPoolIdToPoolKey = gql`
query poolIdToPoolKey($blockHash: String!, $poolId: String!) {
poolIdToPoolKey(blockHash: $blockHash, poolId: $poolId) {
token0
token1
fee
proof {
data
}
@ -160,6 +185,17 @@ query getPool($blockHash: String!, $token0: String!, $token1: String!, $fee: Str
}
`;
export const queryCallGetPool = gql`
query callGetPool($blockHash: String!, $contractAddress: String!, $key0: String!, $key1: String!, $key2: Int!) {
callGetPool(blockHash: $blockHash, contractAddress: $contractAddress, key0: $key0, key1: $key1, key2: $key2) {
value
proof {
data
}
}
}
`;
export const queryGetContract = gql`
query queryGetContract($type: String!) {
getContract(type: $type) {

View File

@ -8,6 +8,7 @@ import debug from 'debug';
import { Indexer } from './indexer';
import { EventWatcher } from './events';
import { ValueResult } from '@vulcanize/util';
const log = debug('vulcanize:resolver');
@ -83,6 +84,11 @@ export const createResolvers = async (indexer: Indexer, eventWatcher: EventWatch
return indexer.position(blockHash, tokenId);
},
positions: (_: any, { blockHash, contractAddress, tokenId }: { blockHash: string, contractAddress: string, tokenId: string }): Promise<ValueResult> => {
log('positions', blockHash, contractAddress, tokenId);
return indexer.positions(blockHash, contractAddress, tokenId);
},
poolIdToPoolKey: (_: any, { blockHash, poolId }: { blockHash: string, poolId: string }) => {
log('poolIdToPoolKey', blockHash, poolId);
return indexer.poolIdToPoolKey(blockHash, poolId);
@ -93,6 +99,11 @@ export const createResolvers = async (indexer: Indexer, eventWatcher: EventWatch
return indexer.getPool(blockHash, token0, token1, fee);
},
callGetPool: (_: any, { blockHash, contractAddress, key0, key1, key2 }: { blockHash: string, contractAddress: string, key0: string, key1: string, key2: number }): Promise<ValueResult> => {
log('callGetPool', blockHash, contractAddress, key0, key1, key2);
return indexer.callGetPool(blockHash, contractAddress, key0, key1, key2);
},
getContract: (_: any, { type }: { type: string }) => {
log('getContract', type);
return indexer.getContract(type);

View File

@ -32,6 +32,12 @@ type ResultGetPool {
proof: Proof
}
type ResultCallGetPool {
value: String!
proof: Proof
}
# Factory Events
# event PoolCreated(address indexed token0, address indexed token1, uint24 indexed fee, int24 tickSpacing, address pool);
@ -71,6 +77,26 @@ type ResultPoolKey {
proof: Proof
}
type PositionsValue {
nonce: BigInt!
operator: String!
token0: String!,
token1: String!,
fee: Int!,
tickLower: BigInt!
tickUpper: BigInt!
liquidity: BigInt!
feeGrowthInside0LastX128: BigInt!
feeGrowthInside1LastX128: BigInt!
tokensOwed0: BigInt!
tokensOwed1: BigInt!
}
type ResultPositions {
value: PositionsValue!
proof: Proof
}
# NonfungiblePositionManager Events
# event IncreaseLiquidity(uint256 indexed tokenId, uint128 liquidity, uint256 amount0, uint256 amount1);
@ -223,6 +249,12 @@ type Query {
poolId: String!
): ResultPoolKey
positions(
blockHash: String!,
contractAddress: String!,
tokenId: String!
): ResultPositions!
# Factory
getPool(
@ -232,6 +264,14 @@ type Query {
fee: String!
): ResultGetPool
callGetPool(
blockHash: String!
contractAddress: String!
key0: String!
key1: String!
key2: Int!
): ResultCallGetPool
# Pool
feeGrowthGlobal0X128(

View File

@ -11,6 +11,7 @@ import { hideBin } from 'yargs/helpers';
import debug from 'debug';
import 'graphql-import-node';
import { createServer } from 'http';
import { getDefaultProvider } from 'ethers';
import { getCache } from '@vulcanize/cache';
import { EthClient } from '@vulcanize/ipld-eth-client';
@ -51,7 +52,7 @@ export const main = async (): Promise<any> => {
await db.init();
assert(upstream, 'Missing upstream config');
- const { ethServer: { gqlApiEndpoint, gqlPostgraphileEndpoint }, cache: cacheConfig } = upstream;
+ const { ethServer: { gqlApiEndpoint, gqlPostgraphileEndpoint, rpcProviderEndpoint }, cache: cacheConfig } = upstream;
assert(gqlApiEndpoint, 'Missing upstream ethServer.gqlApiEndpoint');
assert(gqlPostgraphileEndpoint, 'Missing upstream ethServer.gqlPostgraphileEndpoint');
@ -67,10 +68,12 @@ export const main = async (): Promise<any> => {
cache
});
const ethProvider = getDefaultProvider(rpcProviderEndpoint);
// Note: In-memory pubsub works fine for now, as each watcher is a single process anyway.
// Later: https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries
const pubsub = new PubSub();
- const indexer = new Indexer(db, ethClient, postgraphileClient);
+ const indexer = new Indexer(db, ethClient, postgraphileClient, ethProvider);
assert(jobQueueConfig, 'Missing job queue config');

View File

@ -44,7 +44,7 @@ import {
checksCollectEvent
} from '../test/utils';
- const CONFIG_FILE = './environments/local.toml';
+ const CONFIG_FILE = './environments/test.toml';
describe('uni-watcher', () => {
let factory: Contract;
@ -64,6 +64,7 @@ describe('uni-watcher', () => {
let uniClient: UniClient;
let ethClient: EthClient;
let postgraphileClient: EthClient;
let ethProvider: ethers.providers.JsonRpcProvider;
let signer: Signer;
let recipient: string;
let deadline: number;
@ -106,8 +107,8 @@ describe('uni-watcher', () => {
gqlSubscriptionEndpoint
});
- const provider = new ethers.providers.JsonRpcProvider(rpcProviderEndpoint);
- signer = provider.getSigner();
+ ethProvider = new ethers.providers.JsonRpcProvider(rpcProviderEndpoint);
+ signer = ethProvider.getSigner();
recipient = await signer.getAddress();
// Deadline set to 2 days from current date.
@ -129,7 +130,7 @@ describe('uni-watcher', () => {
factory = new Contract(factoryContract.address, FACTORY_ABI, signer);
// Verifying with the db.
- const indexer = new Indexer(db, ethClient, postgraphileClient);
+ const indexer = new Indexer(db, ethClient, postgraphileClient, ethProvider);
assert(await indexer.isWatchedContract(factory.address), 'Factory contract not added to the database.');
});
@ -264,7 +265,7 @@ describe('uni-watcher', () => {
nfpm = new Contract(nfpmContract.address, NFPM_ABI, signer);
// Verifying with the db.
- const indexer = new Indexer(db, ethClient, postgraphileClient);
+ const indexer = new Indexer(db, ethClient, postgraphileClient, ethProvider);
assert(await indexer.isWatchedContract(nfpm.address), 'NFPM contract not added to the database.');
});

View File

@ -21,7 +21,7 @@ import { Client as UniClient } from '../src/client';
import { Database } from '../src/database';
import { watchContract } from '../src/utils/index';
- const CONFIG_FILE = './environments/local.toml';
+ const CONFIG_FILE = './environments/test.toml';
const deployFactoryContract = async (db: Database, signer: Signer): Promise<Contract> => {
// Deploy factory from uniswap package.