Save watched contracts in db. (#122)

This commit is contained in:
Ashwin Phatak 2021-07-06 15:58:03 +05:30 committed by GitHub
parent 0b6a33561b
commit 9c60895352
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 114 additions and 43 deletions

View File

@ -29,6 +29,3 @@
name = "requests"
enabled = false
deleteOnStart = false
[contracts]
factory = "0xECfCae19742EE4644b209d75cF15ee6D02A726Ff"

View File

@ -0,0 +1,51 @@
import assert from 'assert';
import yargs from 'yargs';
import 'reflect-metadata';
import { ethers } from 'ethers';
import { Config, getConfig } from '../config';
import { Database } from '../database';
(async () => {
const argv = await yargs.parserConfiguration({
'parse-numbers': false
}).options({
configFile: {
type: 'string',
require: true,
demandOption: true,
describe: 'configuration file path (toml)'
},
address: {
type: 'string',
require: true,
demandOption: true,
describe: 'Address of the deployed contract'
},
kind: {
type: 'string',
require: true,
demandOption: true,
describe: 'Kind of contract (factory|pool)'
},
startingBlock: {
type: 'number',
default: 1,
describe: 'Starting block'
}
}).argv;
const config: Config = await getConfig(argv.configFile);
const { database: dbConfig } = config;
assert(dbConfig);
const db = new Database(dbConfig);
await db.init();
// Always use the checksum address (https://docs.ethers.io/v5/api/utils/address/#utils-getAddress).
const address = ethers.utils.getAddress(argv.address);
await db.saveContract(address, argv.kind, argv.startingBlock);
await db.close();
})();

View File

@ -19,9 +19,6 @@ export interface Config {
gqlSubscriptionEndpoint: string;
cache: CacheConfig
}
contracts: {
factory: string;
}
}
export const getConfig = async (configFile: string): Promise<Config> => {

View File

@ -3,6 +3,7 @@ import { Connection, ConnectionOptions, createConnection, DeepPartial } from 'ty
import { SnakeNamingStrategy } from 'typeorm-naming-strategies';
import { Event } from './entity/Event';
import { Contract } from './entity/Contract';
import { EventSyncProgress } from './entity/EventProgress';
export class Database {
@ -93,4 +94,27 @@ export class Database {
}
});
}
async getContract (address: string): Promise<Contract | undefined> {
return this._conn.getRepository(Contract)
.createQueryBuilder('contract')
.where('address = :address', { address })
.getOne();
}
async saveContract (address: string, kind: string, startingBlock: number): Promise<void> {
await this._conn.transaction(async (tx) => {
const repo = tx.getRepository(Contract);
const numRows = await repo
.createQueryBuilder()
.where('address = :address', { address })
.getCount();
if (numRows === 0) {
const entity = repo.create({ address, kind, startingBlock });
await repo.save(entity);
}
});
}
}

View File

@ -1,12 +1,12 @@
import { Entity, PrimaryGeneratedColumn, Column, Index } from 'typeorm';
import { Entity, PrimaryColumn, Column } from 'typeorm';
export const KIND_FACTORY = 'factory';
export const KIND_POOL = 'pool';
@Entity()
@Index(['address'], { unique: true })
export class Contract {
@PrimaryGeneratedColumn()
id!: number;
@Column('varchar', { length: 42 })
@PrimaryColumn('varchar', { length: 42 })
address!: string;
@Column('varchar', { length: 8 })

View File

@ -35,15 +35,14 @@ export class EventWatcher {
if (logContracts && logContracts.length) {
for (let logIndex = 0; logIndex < logContracts.length; logIndex++) {
const contractAddress = logContracts[logIndex];
const isWatchedContract = await this._indexer.isUniswapContract(contractAddress);
if (isWatchedContract) {
// TODO: Move processing to background task runner.
const uniContract = await this._indexer.isUniswapContract(contractAddress);
if (uniContract) {
const { ethTransactionCidByTxId: { ethHeaderCidByHeaderId: { blockHash, blockNumber } } } = receipt;
await this._indexer.getEvents(blockHash, contractAddress, null);
const events = await this._indexer.getEvents(blockHash, contractAddress, null);
const event = events[logIndex];
// Trigger other indexer methods based on event topic.
await this._indexer.processEvent(blockHash, blockNumber, contractAddress, receipt, logIndex);
await this._indexer.processEvent(blockHash, blockNumber, uniContract, receipt, event);
}
}
}

View File

@ -11,16 +11,19 @@ import { GetStorageAt } from '@vulcanize/solidity-mapper';
import { Database } from './database';
import { Event } from './entity/Event';
import { Contract, KIND_FACTORY, KIND_POOL } from './entity/Contract';
import { Config } from './config';
import factoryABI from './artifacts/factory.json';
const log = debug('vulcanize:indexer');
type EventsResult = Array<{
type EventResult = {
event: any;
proof: string;
}>
};
type EventsResult = Array<EventResult>;
export class Indexer {
_config: Config;
@ -51,10 +54,15 @@ export class Indexer {
}
async getEvents (blockHash: string, contract: string, name: string | null): Promise<EventsResult> {
const uniContract = await this.isUniswapContract(contract);
if (!uniContract) {
throw new Error('Not a uniswap contract');
}
const didSyncEvents = await this._db.didSyncEvents({ blockHash, contract });
if (!didSyncEvents) {
// Fetch and save events first and make a note in the event sync progress table.
await this._fetchAndSaveEvents({ blockHash, contract });
await this._fetchAndSaveEvents({ blockHash, contract, uniContract });
log('getEvents: db miss, fetching from upstream server');
}
@ -84,21 +92,16 @@ export class Indexer {
return result;
}
/* eslint-disable */
async triggerIndexingOnEvent (blockHash: string, contract: string, receipt: any, logIndex: number): Promise<void> {
async triggerIndexingOnEvent (blockNumber: number, event: EventResult): Promise<void> {
switch (event.event.__typename) {
case 'PoolCreatedEvent': {
const poolContract = ethers.utils.getAddress(event.event.pool);
await this._db.saveContract(poolContract, KIND_POOL, blockNumber);
}
}
}
/* eslint-enable */
async publishEventToSubscribers (blockHash: string, blockNumber: number, contract: string, logIndex: number): Promise<void> {
// TODO: Optimize this fetching of events.
const events = await this.getEvents(blockHash, contract, null);
log(JSON.stringify(events, null, 2));
log(logIndex);
const event = events[logIndex];
async publishEventToSubscribers (blockHash: string, blockNumber: number, contract: string, event: EventResult): Promise<void> {
log(`pushing event to GQL subscribers: ${event.event.__typename}`);
// Publishing the event here will result in pushing the payload to GQL subscribers for `onEvent`.
@ -112,20 +115,21 @@ export class Indexer {
});
}
async isUniswapContract (address: string): Promise<boolean> {
// TODO: Return true for uniswap contracts of interest to the indexer (from config?).
return address != null;
async isUniswapContract (address: string): Promise<Contract | undefined> {
return this._db.getContract(ethers.utils.getAddress(address));
}
async processEvent (blockHash: string, blockNumber: number, contract: string, receipt: any, logIndex: number): Promise<void> {
async processEvent (blockHash: string, blockNumber: number, contract: Contract, receipt: any, event: EventResult): Promise<void> {
// Trigger indexing of data based on the event.
await this.triggerIndexingOnEvent(blockHash, contract, receipt, logIndex);
await this.triggerIndexingOnEvent(blockNumber, event);
// Also trigger downstream event watcher subscriptions.
await this.publishEventToSubscribers(blockHash, blockNumber, contract, logIndex);
await this.publishEventToSubscribers(blockHash, blockNumber, contract.address, event);
}
async _fetchAndSaveEvents ({ blockHash, contract }: { blockHash: string, contract: string }): Promise<void> {
async _fetchAndSaveEvents ({ blockHash, contract, uniContract }: { blockHash: string, contract: string, uniContract: Contract }): Promise<void> {
assert(uniContract);
const logs = await this._ethClient.getLogs({ blockHash, contract });
const dbEvents = logs.map((logObj: any) => {
@ -134,8 +138,7 @@ export class Indexer {
let eventName;
let eventProps = {};
// TODO: Get contract kind from contracts table.
if (contract === this._config.contracts.factory) {
if (uniContract.kind === KIND_FACTORY) {
const logDescription = this._factoryContract.parseLog({ data, topics });
const { token0, token1, fee, tickSpacing, pool } = logDescription.args;