Misc changes for uni-info-watcher resolvers (#183)

* Implement transaction from to set entity origin field.

* Add all transaction fields specified in schema and add delay in uni-info block processing.

Co-authored-by: nabarun <nabarun@deepstacksoft.com>
This commit is contained in:
Ashwin Phatak 2021-08-04 18:42:59 +05:30 committed by GitHub
parent eea3a5864b
commit 73e0475dfa
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
26 changed files with 153 additions and 89 deletions

View File

@ -73,7 +73,7 @@ export const main = async (): Promise<any> => {
log(`Fill block ${blockNumber}`);
// TODO: Add pause between requests so as to not overwhelm the upstream server.
const result = await ethClient.getBlockWithTransactions(blockNumber.toString());
const result = await ethClient.getBlockWithTransactions({ blockNumber });
const { allEthHeaderCids: { nodes: blockNodes } } = result;
for (let bi = 0; bi < blockNodes.length; bi++) {
const { blockHash, ethTransactionCidsByHeaderId: { nodes: txNodes } } = blockNodes[bi];

View File

@ -100,6 +100,8 @@ export const fetchTokenDecimals = async (ethProvider: BaseProvider, tokenAddress
if (staticTokenDefinition != null) {
return staticTokenDefinition.decimals;
}
decimalValue = 0;
}
return BigInt(decimalValue);

View File

@ -17,7 +17,7 @@ export class StaticTokenDefinition {
// Get all tokens with a static definition
static getStaticDefinitions (): Array<StaticTokenDefinition> {
const staticDefinitions = new Array<StaticTokenDefinition>(6);
const staticDefinitions = [];
// Add DGD.
const tokenDGD = new StaticTokenDefinition(

View File

@ -28,7 +28,6 @@ export class EthClient {
const { gqlEndpoint, gqlSubscriptionEndpoint, cache } = config;
assert(gqlEndpoint, 'Missing gql endpoint');
assert(gqlSubscriptionEndpoint, 'Missing gql subscription endpoint');
this._graphqlClient = new GraphQLClient({ gqlEndpoint, gqlSubscriptionEndpoint });
@ -59,8 +58,14 @@ export class EthClient {
};
}
async getBlockWithTransactions (blockNumber: string): Promise<any> {
return this._graphqlClient.query(ethQueries.getBlockWithTransactions, { blockNumber });
async getBlockWithTransactions ({ blockNumber, blockHash }: { blockNumber?: number, blockHash?: string }): Promise<any> {
return this._graphqlClient.query(
ethQueries.getBlockWithTransactions,
{
blockNumber: blockNumber?.toString(),
blockHash
}
);
}
async getBlockByHash (blockHash: string): Promise<any> {

View File

@ -36,8 +36,8 @@ query getLogs($blockHash: Bytes32!, $contract: Address) {
`;
export const getBlockWithTransactions = gql`
query allEthHeaderCids($blockNumber: BigInt) {
allEthHeaderCids(condition: { blockNumber: $blockNumber }) {
query allEthHeaderCids($blockNumber: BigInt, $blockHash: String) {
allEthHeaderCids(condition: { blockNumber: $blockNumber, blockHash: $blockHash }) {
nodes {
cid
blockNumber
@ -48,6 +48,9 @@ query allEthHeaderCids($blockNumber: BigInt) {
nodes {
cid
txHash
index
src
dst
}
}
}

View File

@ -4,7 +4,7 @@ import fetch from 'cross-fetch';
import { SubscriptionClient } from 'subscriptions-transport-ws';
import ws from 'ws';
import { ApolloClient, NormalizedCacheObject, split, HttpLink, InMemoryCache, DocumentNode, TypedDocumentNode } from '@apollo/client/core';
import { ApolloClient, NormalizedCacheObject, split, HttpLink, InMemoryCache, DocumentNode, TypedDocumentNode, from } from '@apollo/client/core';
import { getMainDefinition } from '@apollo/client/utilities';
import { WebSocketLink } from '@apollo/client/link/ws';
@ -12,7 +12,7 @@ const log = debug('vulcanize:client');
export interface GraphQLConfig {
gqlEndpoint: string;
gqlSubscriptionEndpoint: string;
gqlSubscriptionEndpoint?: string;
}
export class GraphQLClient {
@ -25,45 +25,50 @@ export class GraphQLClient {
const { gqlEndpoint, gqlSubscriptionEndpoint } = config;
assert(gqlEndpoint, 'Missing gql endpoint');
assert(gqlSubscriptionEndpoint, 'Missing gql subscription endpoint');
// https://www.apollographql.com/docs/react/data/subscriptions/
const subscriptionClient = new SubscriptionClient(gqlSubscriptionEndpoint, {
reconnect: true,
connectionCallback: (error: Error[]) => {
if (error) {
log('Subscription client connection error', error[0].message);
} else {
log('Subscription client connected successfully');
}
}
}, ws);
subscriptionClient.onError(error => {
log('Subscription client error', error.message);
});
const httpLink = new HttpLink({
uri: gqlEndpoint,
fetch
});
const wsLink = new WebSocketLink(subscriptionClient);
let link = from([httpLink]);
const splitLink = split(
({ query }) => {
const definition = getMainDefinition(query);
return (
definition.kind === 'OperationDefinition' &&
definition.operation === 'subscription'
);
},
wsLink,
httpLink
);
if (gqlSubscriptionEndpoint) {
// https://www.apollographql.com/docs/react/data/subscriptions/
const subscriptionClient = new SubscriptionClient(gqlSubscriptionEndpoint, {
reconnect: true,
connectionCallback: (error: Error[]) => {
if (error) {
log('Subscription client connection error', error[0].message);
} else {
log('Subscription client connected successfully');
}
}
}, ws);
subscriptionClient.onError(error => {
log('Subscription client error', error.message);
});
const wsLink = new WebSocketLink(subscriptionClient);
const splitLink = split(
({ query }) => {
const definition = getMainDefinition(query);
return (
definition.kind === 'OperationDefinition' &&
definition.operation === 'subscription'
);
},
wsLink,
httpLink
);
link = splitLink;
}
this._client = new ApolloClient({
link: splitLink,
link,
cache: new InMemoryCache()
});
}

View File

@ -42,3 +42,4 @@
[jobQueue]
dbConnectionString = "postgres://postgres:postgres@localhost/uni-info-watcher-job-queue"
maxCompletionLag = 300
jobDelay = 1000

View File

@ -456,9 +456,8 @@ export class Database {
const { limit = DEFAULT_LIMIT, orderBy, orderDirection, skip = DEFAULT_SKIP } = queryOptions;
// TODO: Use skip and take methods. Currently throws error when using with join.
selectQueryBuilder = selectQueryBuilder.offset(skip)
.limit(limit);
selectQueryBuilder = selectQueryBuilder.skip(skip)
.take(limit);
if (orderBy) {
const columnMetadata = repo.metadata.findColumnWithPropertyName(orderBy);
@ -776,15 +775,19 @@ export class Database {
const blockRepo = this._conn.getRepository(BlockProgress);
let block = await blockRepo.findOne({ blockHash });
assert(block);
const blockHashes = [blockHash];
// TODO: Should be calculated from chainHeadBlockNumber?
const canonicalBlockNumber = block.blockNumber - MAX_REORG_DEPTH;
while (block.blockNumber > canonicalBlockNumber) {
blockHashes.push(block.parentHash);
block = await blockRepo.findOne({ blockHash: block.parentHash });
const syncStatus = await this.getSyncStatus();
assert(syncStatus);
const blockHashes = [block.blockHash];
while (block.blockNumber > canonicalBlockNumber && block.blockNumber > syncStatus.latestCanonicalBlockNumber) {
blockHash = block.parentHash;
block = await blockRepo.findOne({ blockHash });
assert(block);
blockHashes.push(block.blockHash);
}
return { canonicalBlockNumber, blockHashes };

View File

@ -36,9 +36,8 @@ export class Burn {
@Column('varchar', { length: 42 })
owner!: string
// TODO: Assign origin with Transaction from address.
// @Column('varchar', { length: 42 })
// origin!: string
@Column('varchar', { length: 42 })
origin!: string
@Column('bigint')
amount!: bigint

View File

@ -27,6 +27,9 @@ export class Event {
@Column('text')
eventInfo!: string;
@Column('text')
extraInfo!: string;
@Column('text')
proof!: string;
}

View File

@ -39,9 +39,8 @@ export class Mint {
@Column('varchar', { length: 42 })
sender!: string
// TODO: Assign origin with Transaction from address.
// @Column('varchar', { length: 42 })
// origin!: string
@Column('varchar', { length: 42 })
origin!: string
@Column('bigint')
amount!: bigint

View File

@ -36,9 +36,8 @@ export class Swap {
@Column('varchar', { length: 42 })
sender!: string
// TODO: Assign origin with Transaction from address.
// @Column('varchar', { length: 42 })
// origin!: string
@Column('varchar', { length: 42 })
origin!: string
@Column('varchar', { length: 42 })
recipient!: string

View File

@ -25,8 +25,7 @@ export class Token {
@Column('numeric', { transformer: decimalTransformer })
totalSupply!: Decimal;
// TODO: Fetch decimals from contract using erc20-watcher. Currently using hardcoded value.
@Column('bigint', { default: 18 })
@Column('bigint')
decimals!: bigint;
@Column('numeric', { default: 0, transformer: decimalTransformer })

View File

@ -95,7 +95,9 @@ export interface Block {
export interface Transaction {
hash: string;
from?: string;
from: string;
to: string;
index: number;
}
export interface ResultEvent {

View File

@ -59,6 +59,7 @@ export class Indexer {
getResultEvent (event: Event): ResultEvent {
const block = event.block;
const eventFields = JSON.parse(event.eventInfo);
const { tx } = JSON.parse(event.extraInfo);
return {
block: {
@ -68,13 +69,10 @@ export class Indexer {
parentHash: block.parentHash
},
tx: {
hash: event.txHash
},
tx,
contract: event.contract,
eventIndex: event.index,
event: {
__typename: event.eventName,
...eventFields
@ -244,6 +242,7 @@ export class Indexer {
} = events[i];
const { __typename: eventName, ...eventInfo } = event;
const extraInfo = { tx };
dbEvents.push({
index: eventIndex,
@ -251,6 +250,7 @@ export class Indexer {
contract,
eventName,
eventInfo: JSONbig.stringify(eventInfo),
extraInfo: JSONbig.stringify(extraInfo),
proof: JSONbig.stringify(proof)
});
}
@ -334,13 +334,12 @@ export class Indexer {
const { value: symbol } = await this._erc20Client.getSymbol(block.hash, tokenAddress);
const { value: name } = await this._erc20Client.getName(block.hash, tokenAddress);
const { value: totalSupply } = await this._erc20Client.getTotalSupply(block.hash, tokenAddress);
// TODO: Decimals not implemented by erc20-watcher.
// const { value: decimals } = await this._erc20Client.getDecimals(blockHash, tokenAddress);
const { value: decimals } = await this._erc20Client.getDecimals(block.hash, tokenAddress);
token.symbol = symbol;
token.name = name;
token.totalSupply = totalSupply;
token.decimals = decimals;
return this._db.saveToken(token, block);
}
@ -454,10 +453,7 @@ export class Indexer {
mint.token1 = pool.token1;
mint.owner = mintEvent.owner;
mint.sender = mintEvent.sender;
// TODO: Assign origin with Transaction from address.
// origin: event.transaction.from
mint.origin = tx.from;
mint.amount = mintEvent.amount;
mint.amount0 = amount0;
mint.amount1 = amount1;
@ -590,10 +586,7 @@ export class Indexer {
burn.token0 = pool.token0;
burn.token1 = pool.token1;
burn.owner = burnEvent.owner;
// TODO: Assign origin with Transaction from address.
// origin: event.transaction.from
burn.origin = tx.from;
burn.amount = burnEvent.amount;
burn.amount0 = amount0;
burn.amount1 = amount1;
@ -772,10 +765,7 @@ export class Indexer {
swap.token0 = pool.token0;
swap.token1 = pool.token1;
swap.sender = swapEvent.sender;
// TODO: Assign origin with Transaction from address.
// origin: event.transaction.from
swap.origin = tx.from;
swap.recipient = swapEvent.recipient;
swap.amount0 = amount0;
swap.amount1 = amount1;

View File

@ -6,7 +6,7 @@ import debug from 'debug';
import { Client as ERC20Client } from '@vulcanize/erc20-watcher';
import { Client as UniClient } from '@vulcanize/uni-watcher';
import { getConfig, JobQueue } from '@vulcanize/util';
import { getConfig, JobQueue, wait } from '@vulcanize/util';
import { getCache } from '@vulcanize/cache';
import { EthClient } from '@vulcanize/ipld-eth-client';
@ -61,7 +61,8 @@ export const main = async (): Promise<any> => {
assert(jobQueueConfig, 'Missing job queue config');
const { dbConnectionString, maxCompletionLag } = jobQueueConfig;
const { dbConnectionString, maxCompletionLag, jobDelay } = jobQueueConfig;
assert(jobDelay, 'Missing job delay time');
assert(dbConnectionString, 'Missing job queue db connection string');
const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag });
@ -114,6 +115,9 @@ export const main = async (): Promise<any> => {
// Check if block is being already processed.
const blockProgress = await indexer.getBlockProgress(block.hash);
if (!blockProgress) {
// Delay to allow uni-watcher to process block.
await wait(jobDelay);
const events = await indexer.getOrFetchBlockEvents(block);
for (let ei = 0; ei < events.length; ei++) {

View File

@ -72,7 +72,7 @@ export const main = async (): Promise<any> => {
log(`Fill block ${blockNumber}`);
// TODO: Add pause between requests so as to not overwhelm the upstream server.
const result = await ethClient.getBlockWithTransactions(blockNumber.toString());
const result = await ethClient.getBlockWithTransactions({ blockNumber });
const { allEthHeaderCids: { nodes: blockNodes } } = result;
for (let bi = 0; bi < blockNodes.length; bi++) {
const { blockHash, blockNumber, parentHash } = blockNodes[bi];

View File

@ -46,16 +46,18 @@ export class Indexer {
_config: Config;
_db: Database
_ethClient: EthClient
_postgraphileClient: EthClient
_getStorageAt: GetStorageAt
_factoryContract: ethers.utils.Interface
_poolContract: ethers.utils.Interface
_nfpmContract: ethers.utils.Interface
constructor (config: Config, db: Database, ethClient: EthClient) {
constructor (config: Config, db: Database, ethClient: EthClient, postgraphileClient: EthClient) {
this._config = config;
this._db = db;
this._ethClient = ethClient;
this._postgraphileClient = postgraphileClient;
this._getStorageAt = this._ethClient.getStorageAt.bind(this._ethClient);
this._factoryContract = new ethers.utils.Interface(factoryABI);
@ -66,6 +68,7 @@ export class Indexer {
getResultEvent (event: Event): ResultEvent {
const block = event.block;
const eventFields = JSON.parse(event.eventInfo);
const { tx } = JSON.parse(event.extraInfo);
return {
block: {
@ -76,7 +79,10 @@ export class Indexer {
},
tx: {
hash: event.txHash
hash: event.txHash,
from: tx.src,
to: tx.dst,
index: tx.index
},
contract: event.contract,
@ -297,6 +303,23 @@ export class Indexer {
async fetchAndSaveEvents (blockHash: string): Promise<void> {
const { block, logs } = await this._ethClient.getLogs({ blockHash });
const {
allEthHeaderCids: {
nodes: [
{
ethTransactionCidsByHeaderId: {
nodes: transactions
}
}
]
}
} = await this._postgraphileClient.getBlockWithTransactions({ blockHash });
const transactionMap = transactions.reduce((acc: {[key: string]: any}, transaction: {[key: string]: any}) => {
acc[transaction.txHash] = transaction;
return acc;
}, {});
const dbEvents: Array<DeepPartial<Event>> = [];
for (let li = 0; li < logs.length; li++) {
@ -317,7 +340,8 @@ export class Indexer {
let eventName = UNKNOWN_EVENT_NAME;
let eventInfo = {};
const extraInfo = { topics, data };
const tx = transactionMap[txHash];
const extraInfo = { topics, data, tx };
const contract = ethers.utils.getAddress(address);
const uniContract = await this.isUniswapContract(contract);

View File

@ -214,7 +214,12 @@ export const main = async (): Promise<any> => {
cache
});
const indexer = new Indexer(config, db, ethClient);
const postgraphileClient = new EthClient({
gqlEndpoint: gqlPostgraphileEndpoint,
cache
});
const indexer = new Indexer(config, db, ethClient, postgraphileClient);
assert(jobQueueConfig, 'Missing job queue config');

View File

@ -10,6 +10,9 @@ const resultEvent = `
}
tx {
hash
from
to
index
}
contract
eventIndex

View File

@ -56,7 +56,7 @@ export const createResolvers = async (indexer: Indexer, eventWatcher: EventWatch
const block = await indexer.getBlockProgress(blockHash);
if (!block || !block.isComplete) {
// TODO: Trigger indexing for the block.
throw new Error('Not available');
throw new Error(`Block hash ${blockHash} number ${block?.blockNumber} not processed yet`);
}
const events = await indexer.getEventsByFilter(blockHash, contract, name);

View File

@ -57,10 +57,15 @@ export const main = async (): Promise<any> => {
cache
});
const postgraphileClient = new EthClient({
gqlEndpoint: gqlPostgraphileEndpoint,
cache
});
// Note: In-memory pubsub works fine for now, as each watcher is a single process anyway.
// Later: https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries
const pubsub = new PubSub();
const indexer = new Indexer(config, db, ethClient);
const indexer = new Indexer(config, db, ethClient, postgraphileClient);
assert(jobQueueConfig, 'Missing job queue config');

View File

@ -77,6 +77,7 @@ describe('uni-watcher', () => {
let db: Database;
let uniClient: UniClient;
let ethClient: EthClient;
let postgraphileClient: EthClient;
let signer: Signer;
let recipient: string;
@ -105,6 +106,11 @@ describe('uni-watcher', () => {
cache
});
postgraphileClient = new EthClient({
gqlEndpoint: gqlPostgraphileEndpoint,
cache
});
const endpoint = `http://${host}:${port}/graphql`;
const gqlEndpoint = endpoint;
const gqlSubscriptionEndpoint = endpoint;
@ -135,7 +141,7 @@ describe('uni-watcher', () => {
await watchContract(db, factory.address, 'factory', 100);
// Verifying with the db.
const indexer = new Indexer(config, db, ethClient);
const indexer = new Indexer(config, db, ethClient, postgraphileClient);
assert(await indexer.isUniswapContract(factory.address), 'Factory contract not added to database.');
});
@ -323,7 +329,7 @@ describe('uni-watcher', () => {
await watchContract(db, nfpm.address, 'nfpm', 100);
// Verifying with the db.
const indexer = new Indexer(config, db, ethClient);
const indexer = new Indexer(config, db, ethClient, postgraphileClient);
assert(await indexer.isUniswapContract(nfpm.address), 'NonfungiblePositionManager contract not added to database.');
});

View File

@ -2,3 +2,4 @@ export * from './src/config';
export * from './src/database';
export * from './src/job-queue';
export * from './src/constants';
export * from './src/index';

View File

@ -35,6 +35,7 @@ export interface Config {
jobQueue: {
dbConnectionString: string;
maxCompletionLag: number;
jobDelay?: number;
}
}

View File

@ -0,0 +1,5 @@
/**
 * Pause asynchronous execution for the given duration.
 * @param time Duration to wait, in milliseconds.
 * @returns A promise that resolves (with no value) once the timeout elapses.
 */
export const wait = async (time: number): Promise<void> => {
  return new Promise<void>((resolve) => {
    setTimeout(resolve, time);
  });
};