Factory PoolCreated event handler (#120)

* Setup handler for PoolCreated event.

* Create Pool entity.

* Subscribe to uni-watcher for watching events.

* Refactor code to create GraphQLClient in ipld-eth-client.

Co-authored-by: nikugogoi <95nikass@gmail.com>
Co-authored-by: nabarun <nabarun@deepstacksoft.com>
This commit is contained in:
Ashwin Phatak 2021-07-06 16:55:11 +05:30 committed by GitHub
parent 9c60895352
commit aec9281fb8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
23 changed files with 393 additions and 117 deletions

View File

@ -0,0 +1 @@
export * from './src/client';

View File

@ -3,6 +3,7 @@
"version": "0.1.0",
"description": "ERC20 Watcher",
"private": true,
"main": "index.ts",
"scripts": {
"server": "DEBUG=vulcanize:* nodemon src/server.ts -f environments/local.toml",
"server:mock": "MOCK=1 nodemon src/server.ts -f environments/local.toml",

View File

@ -0,0 +1,29 @@
import { gql } from 'apollo-server-express';
import { GraphQLClient } from '@vulcanize/ipld-eth-client';
import { querySymbol } from './queries';
interface Config {
gqlEndpoint: string;
gqlSubscriptionEndpoint: string;
}
export class Client {
_config: Config;
_client: GraphQLClient;
constructor (config: Config) {
this._config = config;
this._client = new GraphQLClient(config);
}
async getSymbol (blockHash: string | undefined, token: string): Promise<any> {
const { symbol } = await this._client.query(
gql(querySymbol),
{ blockHash, token }
);
return symbol;
}
}

View File

@ -1,2 +1,3 @@
export * from './src/eth-client';
export * from './src/utils';
export * from './src/graphql-client';

View File

@ -1,18 +1,10 @@
import assert from 'assert';
import debug from 'debug';
import fetch from 'cross-fetch';
import { SubscriptionClient } from 'subscriptions-transport-ws';
import ws from 'ws';
import { ApolloClient, NormalizedCacheObject, split, HttpLink, InMemoryCache } from '@apollo/client/core';
import { getMainDefinition } from '@apollo/client/utilities';
import { WebSocketLink } from '@apollo/client/link/ws';
import { Cache } from '@vulcanize/cache';
import ethQueries from './eth-queries';
import { padKey } from './utils';
const log = debug('vulcanize:eth-client');
import { GraphQLClient } from './graphql-client';
interface Config {
gqlEndpoint: string;
@ -28,7 +20,7 @@ interface Vars {
export class EthClient {
_config: Config;
_client: ApolloClient<NormalizedCacheObject>;
_graphqlClient: GraphQLClient;
_cache: Cache | undefined;
constructor (config: Config) {
@ -39,45 +31,7 @@ export class EthClient {
assert(gqlEndpoint, 'Missing gql endpoint');
assert(gqlSubscriptionEndpoint, 'Missing gql subscription endpoint');
// https://www.apollographql.com/docs/react/data/subscriptions/
const subscriptionClient = new SubscriptionClient(gqlSubscriptionEndpoint, {
reconnect: true,
connectionCallback: (error: Error[]) => {
if (error) {
log('Subscription client connection error', error[0].message);
} else {
log('Subscription client connected successfully');
}
}
}, ws);
subscriptionClient.onError(error => {
log('Subscription client error', error.message);
});
const httpLink = new HttpLink({
uri: gqlEndpoint,
fetch
});
const wsLink = new WebSocketLink(subscriptionClient);
const splitLink = split(
({ query }) => {
const definition = getMainDefinition(query);
return (
definition.kind === 'OperationDefinition' &&
definition.operation === 'subscription'
);
},
wsLink,
httpLink
);
this._client = new ApolloClient({
link: splitLink,
cache: new InMemoryCache()
});
this._graphqlClient = new GraphQLClient({ gqlEndpoint, gqlSubscriptionEndpoint });
this._cache = cache;
}
@ -107,9 +61,7 @@ export class EthClient {
}
async getBlockWithTransactions (blockNumber: string): Promise<any> {
const { data: result } = await this._client.query({ query: ethQueries.getBlockWithTransactions, variables: { blockNumber } });
return result;
return this._graphqlClient.query(ethQueries.getBlockWithTransactions, { blockNumber });
}
async getLogs (vars: Vars): Promise<any> {
@ -120,27 +72,11 @@ export class EthClient {
}
async watchLogs (onNext: (value: any) => void): Promise<ZenObservable.Subscription> {
const observable = await this._client.subscribe({
query: ethQueries.subscribeLogs
});
return observable.subscribe({
next (data) {
onNext(data);
}
});
return this._graphqlClient.subscribe(ethQueries.subscribeLogs, onNext);
}
async watchTransactions (onNext: (value: any) => void): Promise<ZenObservable.Subscription> {
const observable = await this._client.subscribe({
query: ethQueries.subscribeTransactions
});
return observable.subscribe({
next (data) {
onNext(data);
}
});
return this._graphqlClient.subscribe(ethQueries.subscribeTransactions, onNext);
}
async _getCachedOrFetch (queryName: keyof typeof ethQueries, vars: Vars): Promise<any> {
@ -158,7 +94,7 @@ export class EthClient {
}
// Result not cached or cache disabled, need to perform an upstream GQL query.
const { data: result } = await this._client.query({ query: ethQueries[queryName], variables: vars });
const result = await this._graphqlClient.query(ethQueries[queryName], vars);
// Cache the result and return it, if cache is enabled.
if (this._cache) {

View File

@ -0,0 +1,86 @@
import assert from 'assert';
import debug from 'debug';
import fetch from 'cross-fetch';
import { SubscriptionClient } from 'subscriptions-transport-ws';
import ws from 'ws';
import { ApolloClient, NormalizedCacheObject, split, HttpLink, InMemoryCache, DocumentNode, TypedDocumentNode } from '@apollo/client/core';
import { getMainDefinition } from '@apollo/client/utilities';
import { WebSocketLink } from '@apollo/client/link/ws';
const log = debug('vulcanize:client');
interface Config {
gqlEndpoint: string;
gqlSubscriptionEndpoint: string;
}
export class GraphQLClient {
_config: Config;
_client: ApolloClient<NormalizedCacheObject>;
constructor (config: Config) {
this._config = config;
const { gqlEndpoint, gqlSubscriptionEndpoint } = config;
assert(gqlEndpoint, 'Missing gql endpoint');
assert(gqlSubscriptionEndpoint, 'Missing gql subscription endpoint');
// https://www.apollographql.com/docs/react/data/subscriptions/
const subscriptionClient = new SubscriptionClient(gqlSubscriptionEndpoint, {
reconnect: true,
connectionCallback: (error: Error[]) => {
if (error) {
log('Subscription client connection error', error[0].message);
} else {
log('Subscription client connected successfully');
}
}
}, ws);
subscriptionClient.onError(error => {
log('Subscription client error', error.message);
});
const httpLink = new HttpLink({
uri: gqlEndpoint,
fetch
});
const wsLink = new WebSocketLink(subscriptionClient);
const splitLink = split(
({ query }) => {
const definition = getMainDefinition(query);
return (
definition.kind === 'OperationDefinition' &&
definition.operation === 'subscription'
);
},
wsLink,
httpLink
);
this._client = new ApolloClient({
link: splitLink,
cache: new InMemoryCache()
});
}
async subscribe (query: DocumentNode, onNext: (value: any) => void): Promise<ZenObservable.Subscription> {
const observable = await this._client.subscribe({ query });
return observable.subscribe({
next (data) {
onNext(data);
}
});
}
async query (query: DocumentNode | TypedDocumentNode, variables: { [key: string]: any }): Promise<any> {
const { data: result } = await this._client.query({ query, variables });
return result;
}
}

View File

@ -1,6 +1,6 @@
[server]
host = "127.0.0.1"
port = 3003
port = 3004
[database]
type = "postgres"
@ -22,10 +22,18 @@
subscribersDir = "src/subscriber"
[upstream]
gqlEndpoint = "http://127.0.0.1:8083/graphql"
gqlEndpoint = "http://127.0.0.1:8082/graphql"
gqlSubscriptionEndpoint = "http://127.0.0.1:5000/graphql"
[upstream.cache]
name = "requests"
enabled = false
deleteOnStart = false
[upstream.uniWatcher]
gqlEndpoint = "http://127.0.0.1:3003/graphql"
gqlSubscriptionEndpoint = "http://127.0.0.1:3003/graphql"
[upstream.tokenWatcher]
gqlEndpoint = "http://127.0.0.1:3001/graphql"
gqlSubscriptionEndpoint = "http://127.0.0.1:3001/graphql"

View File

@ -6,9 +6,11 @@
"private": true,
"dependencies": {
"@vulcanize/cache": "^0.1.0",
"@vulcanize/erc20-watcher": "^0.1.0",
"@vulcanize/ipld-eth-client": "^0.1.0",
"apollo-server-express": "^2.25.0",
"apollo-type-bigint": "^0.1.3"
"apollo-type-bigint": "^0.1.3",
"typeorm": "^0.2.32"
},
"scripts": {
"server": "DEBUG=vulcanize:* nodemon src/server.ts -f environments/local.toml",

View File

@ -18,7 +18,15 @@ export interface Config {
gqlEndpoint: string;
gqlSubscriptionEndpoint: string;
traceProviderEndpoint: string;
cache: CacheConfig
cache: CacheConfig;
uniWatcher: {
gqlEndpoint: string;
gqlSubscriptionEndpoint: string;
};
tokenWatcher: {
gqlEndpoint: string;
gqlSubscriptionEndpoint: string;
}
}
jobQueue: {
dbConnectionString: string;

View File

@ -2,7 +2,10 @@ import assert from 'assert';
import { Connection, ConnectionOptions, createConnection, DeepPartial } from 'typeorm';
import { SnakeNamingStrategy } from 'typeorm-naming-strategies';
import { Factory } from './entity/Factory';
import { Pool } from './entity/Pool';
import { Event } from './entity/Event';
import { Token } from './entity/Token';
import { EventSyncProgress } from './entity/EventProgress';
export class Database {
@ -27,6 +30,67 @@ export class Database {
return this._conn.close();
}
async loadFactory ({ id, blockNumber }: DeepPartial<Factory>): Promise<Factory> {
return this._conn.transaction(async (tx) => {
const repo = tx.getRepository(Factory);
let entity = await repo.createQueryBuilder('factory')
.where('id = :id AND block_number <= :blockNumber', {
id,
blockNumber
})
.getOne();
if (!entity) {
entity = repo.create({ blockNumber, id });
entity = await repo.save(entity);
}
return entity;
});
}
async loadPool ({ id, blockNumber }: DeepPartial<Pool>): Promise<Pool> {
return this._conn.transaction(async (tx) => {
const repo = tx.getRepository(Pool);
let entity = await repo.createQueryBuilder('pool')
.where('id = :id AND block_number <= :blockNumber', {
id,
blockNumber
})
.getOne();
if (!entity) {
entity = repo.create({ blockNumber, id });
entity = await repo.save(entity);
}
return entity;
});
}
async loadToken ({ id, blockNumber }: DeepPartial<Token>, getValues: () => Promise<DeepPartial<Token>>): Promise<Token> {
return this._conn.transaction(async (tx) => {
const repo = tx.getRepository(Token);
let entity = await repo.createQueryBuilder('token')
.where('id = :id AND block_number <= :blockNumber', {
id,
blockNumber
})
.getOne();
if (!entity) {
const tokenValues = await getValues();
entity = repo.create({ blockNumber, id, ...tokenValues });
entity = await repo.save(entity);
}
return entity;
});
}
// Returns true if events have already been synced for the (block, token) combination.
async didSyncEvents ({ blockHash, token }: { blockHash: string, token: string }): Promise<boolean> {
const numRows = await this._conn.getRepository(EventSyncProgress)

View File

@ -0,0 +1,14 @@
import { Entity, Column, Index, PrimaryColumn } from 'typeorm';
@Entity()
@Index(['blockNumber', 'id'], { unique: true })
export class Factory {
@PrimaryColumn('varchar', { length: 42 })
id!: string;
@Column('numeric')
blockNumber!: number;
@Column('numeric', { default: 0 })
poolCount!: number;
}

View File

@ -0,0 +1,11 @@
import { Entity, PrimaryColumn, Column, Index } from 'typeorm';
@Entity()
@Index(['blockNumber', 'id'])
export class Pool {
@PrimaryColumn('varchar', { length: 42 })
id!: string;
@Column('numeric')
blockNumber!: number;
}

View File

@ -0,0 +1,14 @@
import { Entity, PrimaryColumn, Column, Index } from 'typeorm';
@Entity()
@Index(['blockNumber', 'id'])
export class Token {
@PrimaryColumn('varchar', { length: 42 })
id!: string;
@Column('numeric')
blockNumber!: number;
@Column('varchar')
symbol!: string;
}

View File

@ -1,59 +1,94 @@
import assert from 'assert';
import debug from 'debug';
import _ from 'lodash';
import { Client as UniClient } from '@vulcanize/uni-watcher';
import { Client as ERC20Client } from '@vulcanize/erc20-watcher';
import { EthClient } from '@vulcanize/ipld-eth-client';
import { Indexer } from './indexer';
import { Database } from './database';
const log = debug('vulcanize:events');
interface PoolCreatedEvent {
token0: string;
token1: string;
fee: bigint;
tickSpacing: bigint;
pool: string;
}
interface ResultEvent {
proof: {
data: string
}
event: {
__typename: string;
[key: string]: any;
}
}
export class EventWatcher {
_ethClient: EthClient
_indexer: Indexer
_subscription: ZenObservable.Subscription | undefined
_db: Database
_subscription?: ZenObservable.Subscription
_uniClient: UniClient
_erc20Client: ERC20Client
constructor (ethClient: EthClient, indexer: Indexer) {
assert(ethClient);
assert(indexer);
constructor (db: Database, uniClient: UniClient, erc20Client: ERC20Client) {
assert(db);
this._ethClient = ethClient;
this._indexer = indexer;
this._db = db;
this._uniClient = uniClient;
this._erc20Client = erc20Client;
}
async start (): Promise<void> {
assert(!this._subscription, 'subscription already started');
log('Started watching upstream logs...');
this._subscription = await this._ethClient.watchLogs(async (value) => {
const receipt = _.get(value, 'data.listen.relatedNode');
log('watchLogs', JSON.stringify(receipt, null, 2));
// Check if this log is for a contract we care about.
const { logContracts } = receipt;
if (logContracts && logContracts.length) {
for (let logIndex = 0; logIndex < logContracts.length; logIndex++) {
const contractAddress = logContracts[logIndex];
const isWatchedContract = await this._indexer.isUniswapContract(contractAddress);
if (isWatchedContract) {
// TODO: Move processing to background task runner.
const { ethTransactionCidByTxId: { ethHeaderCidByHeaderId: { blockHash } } } = receipt;
await this._indexer.getEvents(blockHash, contractAddress, null);
// Trigger other indexer methods based on event topic.
await this._indexer.processEvent(blockHash, contractAddress, receipt, logIndex);
}
}
}
});
log('Started watching upstream events...');
this._subscription = await this._uniClient.watchEvents(this._handleEvents.bind(this));
}
async stop (): Promise<void> {
if (this._subscription) {
log('Stopped watching upstream logs');
log('Stopped watching upstream events');
this._subscription.unsubscribe();
}
}
async _handleEvents ({ blockHash, blockNumber, contract, event }: { blockHash: string, blockNumber: number, contract: string, event: ResultEvent}): Promise<void> {
// TODO: Process proof (proof.data) in event.
const { event: { __typename: eventType, ...eventValues } } = event;
switch (eventType) {
case 'PoolCreatedEvent':
this._handlePoolCreated(blockHash, blockNumber, contract, eventValues as PoolCreatedEvent);
break;
default:
break;
}
}
async _handlePoolCreated (blockHash: string, blockNumber: number, contractAddress: string, poolCreatedEvent: PoolCreatedEvent): Promise<void> {
const { token0: token0Address, token1: token1Address, fee, tickSpacing, pool: poolAddress } = poolCreatedEvent;
// Load factory.
const factory = await this._db.loadFactory({ blockNumber, id: contractAddress });
factory.poolCount = factory.poolCount + 1;
// Create new Pool entity.
const pool = this._db.loadPool({ blockNumber, id: poolAddress });
// TODO: Load Token entities.
const getTokenValues = async (tokenAddress: string) => {
const { value: symbol } = await this._erc20Client.getSymbol(blockHash, tokenAddress);
return { symbol };
};
const token0 = this._db.loadToken({ blockNumber, id: token0Address }, () => getTokenValues(token0Address));
const token1 = this._db.loadToken({ blockNumber, id: token1Address }, () => getTokenValues(token1Address));
// TODO: Update Token entities.
// TODO: Update Pool entity.
// TODO: Save entities to DB.
}
}

View File

@ -23,6 +23,11 @@ export interface ValueResult {
}
}
export interface BlockHeight {
number: number;
hash: string;
}
type EventsResult = Array<{
event: {
from?: string;

View File

@ -10,6 +10,8 @@ import { createServer } from 'http';
import { getCache } from '@vulcanize/cache';
import { EthClient } from '@vulcanize/ipld-eth-client';
import { Client as ERC20Client } from '@vulcanize/erc20-watcher';
import { Client as UniClient } from '@vulcanize/uni-watcher';
import typeDefs from './schema';
@ -46,20 +48,22 @@ export const main = async (): Promise<any> => {
await db.init();
assert(upstream, 'Missing upstream config');
const { gqlEndpoint, gqlSubscriptionEndpoint, cache: cacheConfig } = upstream;
const { gqlEndpoint, gqlSubscriptionEndpoint, cache: cacheConfig, uniWatcher, tokenWatcher } = upstream;
assert(gqlEndpoint, 'Missing upstream gqlEndpoint');
assert(gqlSubscriptionEndpoint, 'Missing upstream gqlSubscriptionEndpoint');
const cache = await getCache(cacheConfig);
const ethClient = new EthClient({ gqlEndpoint, gqlSubscriptionEndpoint, cache });
const uniClient = new UniClient(uniWatcher);
// Note: In-memory pubsub works fine for now, as each watcher is a single process anyway.
// Later: https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries
const pubsub = new PubSub();
const erc20Client = new ERC20Client(tokenWatcher);
const indexer = new Indexer(db, ethClient, pubsub);
const eventWatcher = new EventWatcher(ethClient, indexer);
const eventWatcher = new EventWatcher(db, uniClient, erc20Client);
await eventWatcher.start();
const resolvers = process.env.MOCK ? await createMockResolvers() : await createResolvers(indexer);

View File

@ -0,0 +1 @@
export * from './src/client';

View File

@ -3,6 +3,7 @@
"version": "0.1.0",
"description": "Uniswap v3 Watcher",
"private": true,
"main": "index.ts",
"scripts": {
"server": "DEBUG=vulcanize:* nodemon src/server.ts -f environments/local.toml",
"server:mock": "MOCK=1 nodemon src/server.ts -f environments/local.toml",

View File

@ -0,0 +1,50 @@
import { gql } from 'apollo-server-express';
import { GraphQLClient } from '@vulcanize/ipld-eth-client';
interface Config {
gqlEndpoint: string;
gqlSubscriptionEndpoint: string;
}
export class Client {
_config: Config;
_client: GraphQLClient;
constructor (config: Config) {
this._config = config;
this._client = new GraphQLClient(config);
}
async watchEvents (onNext: (value: any) => void): Promise<ZenObservable.Subscription> {
return this._client.subscribe(
gql`
subscription SubscriptionReceipt {
onEvent {
blockHash
blockNumber
contract
event {
proof {
data
}
event {
__typename
... on PoolCreatedEvent {
token0
token1
fee
tickSpacing
pool
}
}
}
}
}
`,
({ data }) => {
onNext(data.onEvent);
}
);
}
}

View File

@ -1,5 +1,6 @@
import { SignerWithAddress } from "@nomiclabs/hardhat-ethers/signers";
import { task } from "hardhat/config";
import '@nomiclabs/hardhat-ethers';
// This is a sample Hardhat task. To learn how to create your own go to
// https://hardhat.org/guides/create-task.html

View File

@ -3,6 +3,7 @@ import {
abi as FACTORY_ABI,
} from '@uniswap/v3-core/artifacts/contracts/UniswapV3Factory.sol/UniswapV3Factory.json'
import { ContractTransaction } from "ethers";
import '@nomiclabs/hardhat-ethers';
task("create-pool", "Creates pool using Factory contract")
.addParam('factory', 'Address of factory contract', undefined, types.string)

View File

@ -1,8 +1,9 @@
import { task, types } from "hardhat/config";
import { task } from "hardhat/config";
import {
abi as FACTORY_ABI,
bytecode as FACTORY_BYTECODE,
} from '@uniswap/v3-core/artifacts/contracts/UniswapV3Factory.sol/UniswapV3Factory.json'
} from '@uniswap/v3-core/artifacts/contracts/UniswapV3Factory.sol/UniswapV3Factory.json';
import '@nomiclabs/hardhat-ethers';
task("deploy-factory", "Deploys Factory contract")
.setAction(async (_, hre) => {

View File

@ -1,10 +1,12 @@
import { task, types } from "hardhat/config";
import '@nomiclabs/hardhat-ethers';
task("deploy-token", "Deploys new token")
.addParam('name', 'Name of the token', undefined, types.string)
.addParam('symbol', 'Symbol of the token', undefined, types.string)
.setAction(async (args, hre) => {
const { name, symbol } = args
await hre.run("compile");
const Token = await hre.ethers.getContractFactory('ERC20Token');
const token = await Token.deploy(name, symbol);