Watch for Factory PoolCreated event. (#119)

This commit is contained in:
Ashwin Phatak 2021-07-05 16:15:54 +05:30 committed by GitHub
parent da758aceaa
commit 8d2a4c6b14
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 129 additions and 160 deletions

View File

@ -92,7 +92,7 @@ export class Database {
async getEventsByName ({ blockHash, token, eventName }: { blockHash: string, token: string, eventName: string }): Promise<Event[] | undefined> { async getEventsByName ({ blockHash, token, eventName }: { blockHash: string, token: string, eventName: string }): Promise<Event[] | undefined> {
return this._conn.getRepository(Event) return this._conn.getRepository(Event)
.createQueryBuilder('event') .createQueryBuilder('event')
.where('block_hash = :blockHash AND token = :token AND :eventName = :eventName', { .where('block_hash = :blockHash AND token = :token AND event_name = :eventName', {
blockHash, blockHash,
token, token,
eventName eventName

View File

@ -22,10 +22,13 @@
subscribersDir = "src/subscriber" subscribersDir = "src/subscriber"
[upstream] [upstream]
gqlEndpoint = "http://127.0.0.1:8083/graphql" gqlEndpoint = "http://127.0.0.1:8082/graphql"
gqlSubscriptionEndpoint = "http://127.0.0.1:5000/graphql" gqlSubscriptionEndpoint = "http://127.0.0.1:5000/graphql"
[upstream.cache] [upstream.cache]
name = "requests" name = "requests"
enabled = false enabled = false
deleteOnStart = false deleteOnStart = false
[contracts]
factory = "0xF3eeD405056b4BB1ce100761325f82F2b6Da2D82"

View File

@ -19,6 +19,9 @@ export interface Config {
gqlSubscriptionEndpoint: string; gqlSubscriptionEndpoint: string;
cache: CacheConfig cache: CacheConfig
} }
contracts: {
factory: string;
}
} }
export const getConfig = async (configFile: string): Promise<Config> => { export const getConfig = async (configFile: string): Promise<Config> => {

View File

@ -28,41 +28,41 @@ export class Database {
} }
// Returns true if events have already been synced for the (block, token) combination. // Returns true if events have already been synced for the (block, token) combination.
async didSyncEvents ({ blockHash, token }: { blockHash: string, token: string }): Promise<boolean> { async didSyncEvents ({ blockHash, contract }: { blockHash: string, contract: string }): Promise<boolean> {
const numRows = await this._conn.getRepository(EventSyncProgress) const numRows = await this._conn.getRepository(EventSyncProgress)
.createQueryBuilder() .createQueryBuilder()
.where('block_hash = :blockHash AND token = :token', { .where('block_hash = :blockHash AND contract = :contract', {
blockHash, blockHash,
token contract
}) })
.getCount(); .getCount();
return numRows > 0; return numRows > 0;
} }
async getEvents ({ blockHash, token }: { blockHash: string, token: string }): Promise<Event[]> { async getEvents ({ blockHash, contract }: { blockHash: string, contract: string }): Promise<Event[]> {
return this._conn.getRepository(Event) return this._conn.getRepository(Event)
.createQueryBuilder('event') .createQueryBuilder('event')
.where('block_hash = :blockHash AND token = :token', { .where('block_hash = :blockHash AND contract = :contract', {
blockHash, blockHash,
token contract
}) })
.addOrderBy('id', 'ASC') .addOrderBy('id', 'ASC')
.getMany(); .getMany();
} }
async getEventsByName ({ blockHash, token, eventName }: { blockHash: string, token: string, eventName: string }): Promise<Event[] | undefined> { async getEventsByName ({ blockHash, contract, eventName }: { blockHash: string, contract: string, eventName: string }): Promise<Event[] | undefined> {
return this._conn.getRepository(Event) return this._conn.getRepository(Event)
.createQueryBuilder('event') .createQueryBuilder('event')
.where('block_hash = :blockHash AND token = :token AND :eventName = :eventName', { .where('block_hash = :blockHash AND contract = :contract AND event_name = :eventName', {
blockHash, blockHash,
token, contract,
eventName eventName
}) })
.getMany(); .getMany();
} }
async saveEvents ({ blockHash, token, events }: { blockHash: string, token: string, events: DeepPartial<Event>[] }): Promise<void> { async saveEvents ({ blockHash, contract, events }: { blockHash: string, contract: string, events: DeepPartial<Event>[] }): Promise<void> {
// In a transaction: // In a transaction:
// (1) Save all the events in the database. // (1) Save all the events in the database.
// (2) Add an entry to the event progress table. // (2) Add an entry to the event progress table.
@ -73,9 +73,9 @@ export class Database {
// Check sync progress inside the transaction. // Check sync progress inside the transaction.
const numRows = await repo const numRows = await repo
.createQueryBuilder() .createQueryBuilder()
.where('block_hash = :blockHash AND token = :token', { .where('block_hash = :blockHash AND contract = :contract', {
blockHash, blockHash,
token contract
}) })
.getCount(); .getCount();
@ -88,7 +88,7 @@ export class Database {
.execute(); .execute();
// Update event sync progress. // Update event sync progress.
const progress = repo.create({ blockHash, token }); const progress = repo.create({ blockHash, contract });
await repo.save(progress); await repo.save(progress);
} }
}); });

View File

@ -0,0 +1,17 @@
import { Entity, PrimaryGeneratedColumn, Column, Index } from 'typeorm';
@Entity()
@Index(['address'], { unique: true })
export class Contract {
@PrimaryGeneratedColumn()
id!: number;
@Column('varchar', { length: 42 })
address!: string;
@Column('varchar', { length: 8 })
kind!: string;
@Column('numeric')
startingBlock!: number;
}

View File

@ -2,7 +2,7 @@ import { Entity, PrimaryGeneratedColumn, Column, Index } from 'typeorm';
@Entity() @Entity()
// Index to query all events for a contract efficiently. // Index to query all events for a contract efficiently.
@Index(['blockHash', 'token']) @Index(['blockHash', 'contract'])
export class Event { export class Event {
@PrimaryGeneratedColumn() @PrimaryGeneratedColumn()
id!: number; id!: number;
@ -11,11 +11,15 @@ export class Event {
blockHash!: string; blockHash!: string;
@Column('varchar', { length: 42 }) @Column('varchar', { length: 42 })
token!: string; contract!: string;
@Column('varchar', { length: 256 }) @Column('varchar', { length: 256 })
eventName!: string; eventName!: string;
// TODO: Polymorphic relationships?
@Column('text')
eventData!: string;
@Column('text') @Column('text')
proof!: string; proof!: string;
} }

View File

@ -7,7 +7,7 @@ import { Entity, PrimaryGeneratedColumn, Column, Index } from 'typeorm';
// yet been synced from upstream. // yet been synced from upstream.
// //
@Entity() @Entity()
@Index(['blockHash', 'token'], { unique: true }) @Index(['blockHash', 'contract'], { unique: true })
export class EventSyncProgress { export class EventSyncProgress {
@PrimaryGeneratedColumn() @PrimaryGeneratedColumn()
id!: number; id!: number;
@ -16,5 +16,5 @@ export class EventSyncProgress {
blockHash!: string; blockHash!: string;
@Column('varchar', { length: 42 }) @Column('varchar', { length: 42 })
token!: string; contract!: string;
} }

View File

@ -1,107 +1,73 @@
import assert from 'assert'; import assert from 'assert';
import debug from 'debug'; import debug from 'debug';
import { invert } from 'lodash'; import _ from 'lodash';
import { JsonFragment } from '@ethersproject/abi';
import { DeepPartial } from 'typeorm'; import { DeepPartial } from 'typeorm';
import JSONbig from 'json-bigint'; import JSONbig from 'json-bigint';
import { ethers } from 'ethers'; import { ethers } from 'ethers';
import { PubSub } from 'apollo-server-express'; import { PubSub } from 'apollo-server-express';
import { EthClient, topictoAddress } from '@vulcanize/ipld-eth-client'; import { EthClient } from '@vulcanize/ipld-eth-client';
import { getEventNameTopics, getStorageValue, GetStorageAt, StorageLayout } from '@vulcanize/solidity-mapper'; import { GetStorageAt } from '@vulcanize/solidity-mapper';
import { Database } from './database'; import { Database } from './database';
import { Event } from './entity/Event'; import { Event } from './entity/Event';
import { Config } from './config';
import factoryABI from './artifacts/factory.json';
const log = debug('vulcanize:indexer'); const log = debug('vulcanize:indexer');
interface Artifacts {
abi: JsonFragment[];
storageLayout: StorageLayout;
}
export interface ValueResult {
value: string | bigint;
proof: {
data: string;
}
}
type EventsResult = Array<{ type EventsResult = Array<{
event: { event: any;
from?: string;
to?: string;
owner?: string;
spender?: string;
value?: BigInt;
__typename: string;
}
proof: string; proof: string;
}> }>
export class Indexer { export class Indexer {
_config: Config;
_db: Database _db: Database
_ethClient: EthClient _ethClient: EthClient
_pubsub: PubSub _pubsub: PubSub
_getStorageAt: GetStorageAt _getStorageAt: GetStorageAt
// _abi: JsonFragment[] _factoryContract: ethers.utils.Interface
// _storageLayout: StorageLayout
// _contract: ethers.utils.Interface
constructor (db: Database, ethClient: EthClient, pubsub: PubSub) { constructor (config: Config, db: Database, ethClient: EthClient, pubsub: PubSub) {
assert(config);
assert(db); assert(db);
assert(ethClient); assert(ethClient);
assert(pubsub); assert(pubsub);
// const { abi, storageLayout } = artifacts; this._config = config;
// assert(abi);
// assert(storageLayout);
this._db = db; this._db = db;
this._ethClient = ethClient; this._ethClient = ethClient;
this._pubsub = pubsub; this._pubsub = pubsub;
this._getStorageAt = this._ethClient.getStorageAt.bind(this._ethClient); this._getStorageAt = this._ethClient.getStorageAt.bind(this._ethClient);
// this._abi = abi; this._factoryContract = new ethers.utils.Interface(factoryABI);
// this._storageLayout = storageLayout;
// this._contract = new ethers.utils.Interface(this._abi);
} }
getEventIterator (): AsyncIterator<any> { getEventIterator (): AsyncIterator<any> {
return this._pubsub.asyncIterator(['event']); return this._pubsub.asyncIterator(['event']);
} }
async getEvents (blockHash: string, token: string, name: string | null): Promise<EventsResult> { async getEvents (blockHash: string, contract: string, name: string | null): Promise<EventsResult> {
const didSyncEvents = await this._db.didSyncEvents({ blockHash, token }); const didSyncEvents = await this._db.didSyncEvents({ blockHash, contract });
if (!didSyncEvents) { if (!didSyncEvents) {
// Fetch and save events first and make a note in the event sync progress table. // Fetch and save events first and make a note in the event sync progress table.
await this._fetchAndSaveEvents({ blockHash, token }); await this._fetchAndSaveEvents({ blockHash, contract });
log('getEvents: db miss, fetching from upstream server'); log('getEvents: db miss, fetching from upstream server');
} }
assert(await this._db.didSyncEvents({ blockHash, token })); assert(await this._db.didSyncEvents({ blockHash, contract }));
const events = await this._db.getEvents({ blockHash, token }); const events = await this._db.getEvents({ blockHash, contract });
log('getEvents: db hit'); log(`getEvents: db hit, num events: ${events.length}`);
const result = events const result = events
// TODO: Filter using db WHERE condition when name is not empty. // TODO: Filter using db WHERE condition when name is not empty.
.filter(event => !name || name === event.eventName) .filter(event => !name || name === event.eventName)
.map(e => { .map(e => {
const eventFields: { const eventFields = JSON.parse(e.eventData);
from?: string,
to?: string,
value?: BigInt,
owner?: string,
spender?: string,
} = {};
switch (e.eventName) {
// TODO: Handle events.
}
return { return {
event: { event: {
@ -118,112 +84,87 @@ export class Indexer {
return result; return result;
} }
async triggerIndexingOnEvent (blockHash: string, token: string, receipt: any, logIndex: number): Promise<void> { /* eslint-disable */
const topics = []; async triggerIndexingOnEvent (blockHash: string, contract: string, receipt: any, logIndex: number): Promise<void> {
// We only care about the event type for now.
const data = '0x0000000000000000000000000000000000000000000000000000000000000000';
topics.push(receipt.topic0S[logIndex]);
topics.push(receipt.topic1S[logIndex]);
topics.push(receipt.topic2S[logIndex]);
// const { name: eventName, args } = this._contract.parseLog({ topics, data });
// log(`trigger indexing on event: ${eventName} ${args}`);
// What data we index depends on the kind of event.
// switch (eventName) {
// TODO: Index event.
// }
} }
/* eslint-enable */
async publishEventToSubscribers (blockHash: string, token: string, logIndex: number): Promise<void> { async publishEventToSubscribers (blockHash: string, contract: string, logIndex: number): Promise<void> {
// TODO: Optimize this fetching of events. // TODO: Optimize this fetching of events.
const events = await this.getEvents(blockHash, token, null); const events = await this.getEvents(blockHash, contract, null);
log(JSON.stringify(events, null, 2));
log(logIndex);
const event = events[logIndex]; const event = events[logIndex];
log(`pushing event to GQL subscribers: ${event.event.__typename}`); log(`pushing event to GQL subscribers: ${event.event.__typename}`);
// Publishing the event here will result in pushing the payload to GQL subscribers for `onTokenEvent`. // Publishing the event here will result in pushing the payload to GQL subscribers for `onEvent`.
await this._pubsub.publish('event', { await this._pubsub.publish('event', {
onTokenEvent: { onEvent: {
blockHash, blockHash,
token, contract,
event event
} }
}); });
} }
async isUniswapContract (address: String): Promise<boolean> { async isUniswapContract (address: string): Promise<boolean> {
// TODO: Return true for uniswap contracts of interest to the indexer (from config?). // TODO: Return true for uniswap contracts of interest to the indexer (from config?).
return address != null; return address != null;
} }
async processEvent (blockHash: string, token: string, receipt: any, logIndex: number): Promise<void> { async processEvent (blockHash: string, contract: string, receipt: any, logIndex: number): Promise<void> {
// Trigger indexing of data based on the event. // Trigger indexing of data based on the event.
await this.triggerIndexingOnEvent(blockHash, token, receipt, logIndex); await this.triggerIndexingOnEvent(blockHash, contract, receipt, logIndex);
// Also trigger downstream event watcher subscriptions. // Also trigger downstream event watcher subscriptions.
await this.publishEventToSubscribers(blockHash, token, logIndex); await this.publishEventToSubscribers(blockHash, contract, logIndex);
} }
// TODO: Move into base/class or framework package. async _fetchAndSaveEvents ({ blockHash, contract }: { blockHash: string, contract: string }): Promise<void> {
async _getStorageValue (blockHash: string, token: string, variable: string, ...mappingKeys: string[]): Promise<ValueResult> { const logs = await this._ethClient.getLogs({ blockHash, contract });
return {
value: '', const dbEvents = logs.map((logObj: any) => {
proof: { const { topics, data, cid, ipldBlock } = logObj;
data: ''
let eventName;
let eventProps = {};
// TODO: Get contract kind from contracts table.
if (contract === this._config.contracts.factory) {
const logDescription = this._factoryContract.parseLog({ data, topics });
const { token0, token1, fee, tickSpacing, pool } = logDescription.args;
eventName = logDescription.name;
eventProps = { token0, token1, fee, tickSpacing, pool };
} }
};
// return getStorageValue( let event: DeepPartial<Event> | undefined;
// this._storageLayout, if (eventName) {
// this._getStorageAt, event = {
// blockHash, blockHash,
// token, contract,
// variable, eventName,
// ...mappingKeys eventData: JSONbig.stringify({ ...eventProps }),
// ); proof: JSONbig.stringify({
} data: JSONbig.stringify({
blockHash,
async _fetchAndSaveEvents ({ blockHash, token }: { blockHash: string, token: string }): Promise<void> { receipt: {
const logs = await this._ethClient.getLogs({ blockHash, contract: token }); cid,
ipldBlock
const eventNameToTopic = {}; // getEventNameTopics(this._abi); }
const logTopicToEventName = invert(eventNameToTopic); })
const dbEvents = logs.map((log: any) => {
const { topics, data: value, cid, ipldBlock } = log;
const [topic0, topic1, topic2] = topics;
const eventName = logTopicToEventName[topic0];
const address1 = topictoAddress(topic1);
const address2 = topictoAddress(topic2);
const event: DeepPartial<Event> = {
blockHash,
token,
eventName,
proof: JSONbig.stringify({
data: JSONbig.stringify({
blockHash,
receipt: {
cid,
ipldBlock
}
}) })
}) };
};
switch (eventName) {
// TODO: Handle event.
} }
return event; return event;
}); });
await this._db.saveEvents({ blockHash, token, events: dbEvents }); const events: DeepPartial<Event>[] = _.compact(dbEvents);
await this._db.saveEvents({ blockHash, contract, events });
} }
} }

View File

@ -2,7 +2,7 @@ import assert from 'assert';
import BigInt from 'apollo-type-bigint'; import BigInt from 'apollo-type-bigint';
import debug from 'debug'; import debug from 'debug';
import { Indexer, ValueResult } from './indexer'; import { Indexer } from './indexer';
const log = debug('vulcanize:resolver'); const log = debug('vulcanize:resolver');
@ -13,32 +13,34 @@ export const createResolvers = async (indexer: Indexer): Promise<any> => {
BigInt: new BigInt('bigInt'), BigInt: new BigInt('bigInt'),
ERC20Event: { ERC20Event: {
__resolveType() { __resolveType () {
return null; return null;
} }
}, },
FactoryEvent: { FactoryEvent: {
__resolveType() { __resolveType () {
return null; return null;
} }
}, },
NonFungiblePositionManagerEvent: { NonFungiblePositionManagerEvent: {
__resolveType() { __resolveType () {
return null; return null;
} }
}, },
PoolEvent: { PoolEvent: {
__resolveType() { __resolveType () {
return null; return null;
} }
}, },
Event: { Event: {
__resolveType() { __resolveType: (obj: any) => {
return null; assert(obj.__typename);
return obj.__typename;
} }
}, },
@ -50,9 +52,9 @@ export const createResolvers = async (indexer: Indexer): Promise<any> => {
Query: { Query: {
events: async (_: any, { blockHash, token, name }: { blockHash: string, token: string, name: string }) => { events: async (_: any, { blockHash, contract, name }: { blockHash: string, contract: string, name: string }) => {
log('events', blockHash, token, name || ''); log('events', blockHash, contract, name || '');
return indexer.getEvents(blockHash, token, name); return indexer.getEvents(blockHash, contract, name);
} }
} }
}; };

View File

@ -11,7 +11,6 @@ import { createServer } from 'http';
import { getCache } from '@vulcanize/cache'; import { getCache } from '@vulcanize/cache';
import { EthClient } from '@vulcanize/ipld-eth-client'; import { EthClient } from '@vulcanize/ipld-eth-client';
import artifacts from './artifacts/ERC20.json';
import typeDefs from './schema'; import typeDefs from './schema';
import { createResolvers as createMockResolvers } from './mock/resolvers'; import { createResolvers as createMockResolvers } from './mock/resolvers';
@ -58,7 +57,7 @@ export const main = async (): Promise<any> => {
// Note: In-memory pubsub works fine for now, as each watcher is a single process anyway. // Note: In-memory pubsub works fine for now, as each watcher is a single process anyway.
// Later: https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries // Later: https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries
const pubsub = new PubSub(); const pubsub = new PubSub();
const indexer = new Indexer(db, ethClient, pubsub); const indexer = new Indexer(config, db, ethClient, pubsub);
const eventWatcher = new EventWatcher(ethClient, indexer); const eventWatcher = new EventWatcher(ethClient, indexer);
await eventWatcher.start(); await eventWatcher.start();