mirror of
https://github.com/cerc-io/watcher-ts
synced 2025-01-24 03:59:06 +00:00
Move uni-info-watcher event procesing to job queue. (#156)
* Move event procesing to job queue. * Store block properties in blocks table. Co-authored-by: nabarun <nabarun@deepstacksoft.com>
This commit is contained in:
parent
95486d6553
commit
0d39363467
@ -2,7 +2,52 @@
|
|||||||
|
|
||||||
## Instructions
|
## Instructions
|
||||||
|
|
||||||
* To start the server run `yarn server`.
|
### Setup
|
||||||
|
|
||||||
|
Create a postgres12 database for the job queue:
|
||||||
|
|
||||||
|
```
|
||||||
|
sudo su - postgres
|
||||||
|
createdb uni-info-watcher-job-queue
|
||||||
|
```
|
||||||
|
|
||||||
|
Enable the `pgcrypto` extension on the job queue database (https://github.com/timgit/pg-boss/blob/master/docs/usage.md#intro).
|
||||||
|
|
||||||
|
Example:
|
||||||
|
|
||||||
|
```
|
||||||
|
postgres@tesla:~$ psql -U postgres -h localhost uni-info-watcher-job-queue
|
||||||
|
Password for user postgres:
|
||||||
|
psql (12.7 (Ubuntu 12.7-1.pgdg18.04+1))
|
||||||
|
SSL connection (protocol: TLSv1.3, cipher: TLS_AES_256_GCM_SHA384, bits: 256, compression: off)
|
||||||
|
Type "help" for help.
|
||||||
|
|
||||||
|
uni-watcher-job-queue=# CREATE EXTENSION pgcrypto;
|
||||||
|
CREATE EXTENSION
|
||||||
|
uni-info-watcher-job-queue=# exit
|
||||||
|
```
|
||||||
|
|
||||||
|
Create a postgres12 database for the uni-info watcher:
|
||||||
|
|
||||||
|
```
|
||||||
|
sudo su - postgres
|
||||||
|
createdb uni-info-watcher
|
||||||
|
```
|
||||||
|
|
||||||
|
Update `environments/local.toml` with database connection settings for both the databases.
|
||||||
|
|
||||||
|
### Run
|
||||||
|
|
||||||
|
* Start the server:
|
||||||
|
```bash
|
||||||
|
$ yarn server
|
||||||
|
```
|
||||||
|
|
||||||
|
* Start the job runner:
|
||||||
|
|
||||||
|
```bash
|
||||||
|
$ yarn job-runner
|
||||||
|
```
|
||||||
|
|
||||||
* Run `yarn server:mock` to run server with mock data.
|
* Run `yarn server:mock` to run server with mock data.
|
||||||
|
|
||||||
|
@ -38,3 +38,7 @@
|
|||||||
[upstream.tokenWatcher]
|
[upstream.tokenWatcher]
|
||||||
gqlEndpoint = "http://127.0.0.1:3001/graphql"
|
gqlEndpoint = "http://127.0.0.1:3001/graphql"
|
||||||
gqlSubscriptionEndpoint = "http://127.0.0.1:3001/graphql"
|
gqlSubscriptionEndpoint = "http://127.0.0.1:3001/graphql"
|
||||||
|
|
||||||
|
[jobQueue]
|
||||||
|
dbConnectionString = "postgres://postgres:postgres@localhost/uni-info-watcher-job-queue"
|
||||||
|
maxCompletionLag = 300
|
||||||
|
@ -17,6 +17,7 @@
|
|||||||
"scripts": {
|
"scripts": {
|
||||||
"server": "DEBUG=vulcanize:* nodemon src/server.ts -f environments/local.toml",
|
"server": "DEBUG=vulcanize:* nodemon src/server.ts -f environments/local.toml",
|
||||||
"server:mock": "MOCK=1 nodemon src/server.ts -f environments/local.toml",
|
"server:mock": "MOCK=1 nodemon src/server.ts -f environments/local.toml",
|
||||||
|
"job-runner": "DEBUG=vulcanize:* nodemon src/job-runner.ts -f environments/local.toml",
|
||||||
"test": "mocha -r ts-node/register src/**/*.spec.ts",
|
"test": "mocha -r ts-node/register src/**/*.spec.ts",
|
||||||
"lint": "eslint .",
|
"lint": "eslint .",
|
||||||
"build": "tsc",
|
"build": "tsc",
|
||||||
|
@ -20,6 +20,8 @@ import { Burn } from './entity/Burn';
|
|||||||
import { Swap } from './entity/Swap';
|
import { Swap } from './entity/Swap';
|
||||||
import { Position } from './entity/Position';
|
import { Position } from './entity/Position';
|
||||||
import { PositionSnapshot } from './entity/PositionSnapshot';
|
import { PositionSnapshot } from './entity/PositionSnapshot';
|
||||||
|
import { BlockProgress } from './entity/BlockProgress';
|
||||||
|
import { Block } from './events';
|
||||||
|
|
||||||
export class Database {
|
export class Database {
|
||||||
_config: ConnectionOptions
|
_config: ConnectionOptions
|
||||||
@ -633,56 +635,100 @@ export class Database {
|
|||||||
return numRows > 0;
|
return numRows > 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async getBlockEvents (blockHash: string): Promise<Event[]> {
|
||||||
|
return this._conn.getRepository(Event)
|
||||||
|
.createQueryBuilder('event')
|
||||||
|
.innerJoinAndSelect('event.block', 'block')
|
||||||
|
.where('block_hash = :blockHash', { blockHash })
|
||||||
|
.addOrderBy('event.id', 'ASC')
|
||||||
|
.getMany();
|
||||||
|
}
|
||||||
|
|
||||||
async getEvents ({ blockHash, token }: { blockHash: string, token: string }): Promise<Event[]> {
|
async getEvents ({ blockHash, token }: { blockHash: string, token: string }): Promise<Event[]> {
|
||||||
return this._conn.getRepository(Event)
|
return this._conn.getRepository(Event)
|
||||||
.createQueryBuilder('event')
|
.createQueryBuilder('event')
|
||||||
|
.innerJoinAndSelect('event.block', 'block')
|
||||||
.where('block_hash = :blockHash AND token = :token', {
|
.where('block_hash = :blockHash AND token = :token', {
|
||||||
blockHash,
|
blockHash,
|
||||||
token
|
token
|
||||||
})
|
})
|
||||||
.addOrderBy('id', 'ASC')
|
.addOrderBy('event.id', 'ASC')
|
||||||
.getMany();
|
.getMany();
|
||||||
}
|
}
|
||||||
|
|
||||||
async getEventsByName ({ blockHash, token, eventName }: { blockHash: string, token: string, eventName: string }): Promise<Event[] | undefined> {
|
async getEventsByName ({ blockHash, token, eventName }: { blockHash: string, token: string, eventName: string }): Promise<Event[] | undefined> {
|
||||||
return this._conn.getRepository(Event)
|
return this._conn.getRepository(Event)
|
||||||
.createQueryBuilder('event')
|
.createQueryBuilder('event')
|
||||||
|
.innerJoinAndSelect('event.block', 'block')
|
||||||
.where('block_hash = :blockHash AND token = :token AND :eventName = :eventName', {
|
.where('block_hash = :blockHash AND token = :token AND :eventName = :eventName', {
|
||||||
blockHash,
|
blockHash,
|
||||||
token,
|
token,
|
||||||
eventName
|
eventName
|
||||||
})
|
})
|
||||||
|
.addOrderBy('event.id', 'ASC')
|
||||||
.getMany();
|
.getMany();
|
||||||
}
|
}
|
||||||
|
|
||||||
async saveEvents ({ blockHash, token, events }: { blockHash: string, token: string, events: DeepPartial<Event>[] }): Promise<void> {
|
async saveEvents (block: Block, events: DeepPartial<Event>[]): Promise<void> {
|
||||||
|
const {
|
||||||
|
hash: blockHash,
|
||||||
|
number: blockNumber,
|
||||||
|
timestamp: blockTimestamp,
|
||||||
|
parentHash
|
||||||
|
} = block;
|
||||||
|
|
||||||
|
assert(blockHash);
|
||||||
|
assert(blockNumber);
|
||||||
|
assert(blockTimestamp);
|
||||||
|
assert(parentHash);
|
||||||
|
|
||||||
// In a transaction:
|
// In a transaction:
|
||||||
// (1) Save all the events in the database.
|
// (1) Save all the events in the database.
|
||||||
// (2) Add an entry to the event progress table.
|
// (2) Add an entry to the block progress table.
|
||||||
|
|
||||||
await this._conn.transaction(async (tx) => {
|
await this._conn.transaction(async (tx) => {
|
||||||
const repo = tx.getRepository(EventSyncProgress);
|
const numEvents = events.length;
|
||||||
|
const blockProgressRepo = tx.getRepository(BlockProgress);
|
||||||
|
let blockProgress = await blockProgressRepo.findOne({ where: { blockHash } });
|
||||||
|
|
||||||
// Check sync progress inside the transaction.
|
if (!blockProgress) {
|
||||||
const numRows = await repo
|
const entity = blockProgressRepo.create({
|
||||||
.createQueryBuilder()
|
|
||||||
.where('block_hash = :blockHash AND token = :token', {
|
|
||||||
blockHash,
|
blockHash,
|
||||||
token
|
parentHash,
|
||||||
})
|
blockNumber,
|
||||||
.getCount();
|
blockTimestamp,
|
||||||
|
numEvents,
|
||||||
|
numProcessedEvents: 0,
|
||||||
|
isComplete: (numEvents === 0)
|
||||||
|
});
|
||||||
|
|
||||||
|
blockProgress = await blockProgressRepo.save(entity);
|
||||||
|
|
||||||
if (numRows === 0) {
|
|
||||||
// Bulk insert events.
|
// Bulk insert events.
|
||||||
await tx.createQueryBuilder()
|
events.forEach(event => { event.block = blockProgress; });
|
||||||
.insert()
|
await tx.createQueryBuilder().insert().into(Event).values(events).execute();
|
||||||
.into(Event)
|
}
|
||||||
.values(events)
|
});
|
||||||
.execute();
|
}
|
||||||
|
|
||||||
// Update event sync progress.
|
async getEvent (id: string): Promise<Event | undefined> {
|
||||||
const progress = repo.create({ blockHash, token });
|
return this._conn.getRepository(Event).findOne(id, { relations: ['block'] });
|
||||||
await repo.save(progress);
|
}
|
||||||
|
|
||||||
|
async getBlockProgress (blockHash: string): Promise<BlockProgress | undefined> {
|
||||||
|
const repo = this._conn.getRepository(BlockProgress);
|
||||||
|
return repo.findOne({ where: { blockHash } });
|
||||||
|
}
|
||||||
|
|
||||||
|
async updateBlockProgress (blockHash: string): Promise<void> {
|
||||||
|
await this._conn.transaction(async (tx) => {
|
||||||
|
const repo = tx.getRepository(BlockProgress);
|
||||||
|
const entity = await repo.findOne({ where: { blockHash } });
|
||||||
|
if (entity && !entity.isComplete) {
|
||||||
|
entity.numProcessedEvents++;
|
||||||
|
if (entity.numProcessedEvents >= entity.numEvents) {
|
||||||
|
entity.isComplete = true;
|
||||||
|
}
|
||||||
|
await repo.save(entity);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
31
packages/uni-info-watcher/src/entity/BlockProgress.ts
Normal file
31
packages/uni-info-watcher/src/entity/BlockProgress.ts
Normal file
@ -0,0 +1,31 @@
|
|||||||
|
import { Entity, PrimaryGeneratedColumn, Column, Index } from 'typeorm';
|
||||||
|
|
||||||
|
@Entity()
|
||||||
|
@Index(['blockHash'], { unique: true })
|
||||||
|
@Index(['blockNumber'])
|
||||||
|
@Index(['parentHash'])
|
||||||
|
export class BlockProgress {
|
||||||
|
@PrimaryGeneratedColumn()
|
||||||
|
id!: number;
|
||||||
|
|
||||||
|
@Column('varchar', { length: 66 })
|
||||||
|
blockHash!: string;
|
||||||
|
|
||||||
|
@Column('varchar', { length: 66 })
|
||||||
|
parentHash!: string;
|
||||||
|
|
||||||
|
@Column('integer')
|
||||||
|
blockNumber!: number;
|
||||||
|
|
||||||
|
@Column('integer')
|
||||||
|
blockTimestamp!: number;
|
||||||
|
|
||||||
|
@Column('integer')
|
||||||
|
numEvents!: number;
|
||||||
|
|
||||||
|
@Column('integer')
|
||||||
|
numProcessedEvents!: number;
|
||||||
|
|
||||||
|
@Column('boolean')
|
||||||
|
isComplete!: boolean
|
||||||
|
}
|
@ -14,7 +14,7 @@ export class Burn {
|
|||||||
@PrimaryColumn('integer')
|
@PrimaryColumn('integer')
|
||||||
blockNumber!: number;
|
blockNumber!: number;
|
||||||
|
|
||||||
@ManyToOne(() => Transaction, transaction => transaction.mints)
|
@ManyToOne(() => Transaction, transaction => transaction.burns)
|
||||||
transaction!: Transaction
|
transaction!: Transaction
|
||||||
|
|
||||||
@Column('bigint')
|
@Column('bigint')
|
||||||
|
@ -1,21 +1,32 @@
|
|||||||
import { Entity, PrimaryGeneratedColumn, Column, Index } from 'typeorm';
|
import { Entity, PrimaryGeneratedColumn, Column, Index, ManyToOne } from 'typeorm';
|
||||||
|
import { BlockProgress } from './BlockProgress';
|
||||||
|
|
||||||
@Entity()
|
@Entity()
|
||||||
// Index to query all events for a contract efficiently.
|
// Index to query all events for a contract efficiently.
|
||||||
@Index(['blockHash', 'token'])
|
@Index(['contract'])
|
||||||
export class Event {
|
export class Event {
|
||||||
@PrimaryGeneratedColumn()
|
@PrimaryGeneratedColumn()
|
||||||
id!: number;
|
id!: number;
|
||||||
|
|
||||||
|
@ManyToOne(() => BlockProgress)
|
||||||
|
block!: BlockProgress;
|
||||||
|
|
||||||
@Column('varchar', { length: 66 })
|
@Column('varchar', { length: 66 })
|
||||||
blockHash!: string;
|
txHash!: string;
|
||||||
|
|
||||||
|
// Index of the log in the block.
|
||||||
|
@Column('integer')
|
||||||
|
index!: number;
|
||||||
|
|
||||||
@Column('varchar', { length: 42 })
|
@Column('varchar', { length: 42 })
|
||||||
token!: string;
|
contract!: string;
|
||||||
|
|
||||||
@Column('varchar', { length: 256 })
|
@Column('varchar', { length: 256 })
|
||||||
eventName!: string;
|
eventName!: string;
|
||||||
|
|
||||||
|
@Column('text')
|
||||||
|
eventInfo!: string;
|
||||||
|
|
||||||
@Column('text')
|
@Column('text')
|
||||||
proof!: string;
|
proof!: string;
|
||||||
}
|
}
|
||||||
|
@ -14,7 +14,7 @@ export class Swap {
|
|||||||
@PrimaryColumn('integer')
|
@PrimaryColumn('integer')
|
||||||
blockNumber!: number;
|
blockNumber!: number;
|
||||||
|
|
||||||
@ManyToOne(() => Transaction, transaction => transaction.mints)
|
@ManyToOne(() => Transaction, transaction => transaction.swaps)
|
||||||
transaction!: Transaction
|
transaction!: Transaction
|
||||||
|
|
||||||
@Column('bigint')
|
@Column('bigint')
|
||||||
|
@ -3,6 +3,8 @@ import { Entity, PrimaryColumn, Column, OneToMany } from 'typeorm';
|
|||||||
import { decimalTransformer } from '@vulcanize/util';
|
import { decimalTransformer } from '@vulcanize/util';
|
||||||
|
|
||||||
import { Mint } from './Mint';
|
import { Mint } from './Mint';
|
||||||
|
import { Burn } from './Burn';
|
||||||
|
import { Swap } from './Swap';
|
||||||
|
|
||||||
@Entity()
|
@Entity()
|
||||||
export class Transaction {
|
export class Transaction {
|
||||||
@ -21,6 +23,9 @@ export class Transaction {
|
|||||||
@OneToMany(() => Mint, mint => mint.transaction)
|
@OneToMany(() => Mint, mint => mint.transaction)
|
||||||
mints!: Mint[];
|
mints!: Mint[];
|
||||||
|
|
||||||
// burns: [Burn]!
|
@OneToMany(() => Burn, burn => burn.transaction)
|
||||||
// swaps: [Swap]!
|
burns!: Burn[];
|
||||||
|
|
||||||
|
@OneToMany(() => Swap, swap => swap.transaction)
|
||||||
|
swaps!: Swap[];
|
||||||
}
|
}
|
||||||
|
@ -1,21 +1,12 @@
|
|||||||
import assert from 'assert';
|
import assert from 'assert';
|
||||||
import debug from 'debug';
|
import debug from 'debug';
|
||||||
import { Client as UniClient } from '@vulcanize/uni-watcher';
|
import { Client as UniClient } from '@vulcanize/uni-watcher';
|
||||||
import { Client as ERC20Client } from '@vulcanize/erc20-watcher';
|
import { JobQueue } from '../../util';
|
||||||
import { BigNumber, utils } from 'ethers';
|
import { Indexer } from './indexer';
|
||||||
|
|
||||||
import { Database } from './database';
|
|
||||||
import { findEthPerToken, getEthPriceInUSD, getTrackedAmountUSD, sqrtPriceX96ToTokenPrices, WHITELIST_TOKENS } from './utils/pricing';
|
|
||||||
import { updatePoolDayData, updatePoolHourData, updateTokenDayData, updateTokenHourData, updateUniswapDayData } from './utils/interval-updates';
|
|
||||||
import { Token } from './entity/Token';
|
|
||||||
import { convertTokenToDecimal, loadTransaction, safeDiv } from './utils';
|
|
||||||
import { loadTick } from './utils/tick';
|
|
||||||
import Decimal from 'decimal.js';
|
|
||||||
import { Position } from './entity/Position';
|
|
||||||
|
|
||||||
const log = debug('vulcanize:events');
|
const log = debug('vulcanize:events');
|
||||||
|
|
||||||
interface PoolCreatedEvent {
|
export interface PoolCreatedEvent {
|
||||||
__typename: 'PoolCreatedEvent';
|
__typename: 'PoolCreatedEvent';
|
||||||
token0: string;
|
token0: string;
|
||||||
token1: string;
|
token1: string;
|
||||||
@ -24,13 +15,13 @@ interface PoolCreatedEvent {
|
|||||||
pool: string;
|
pool: string;
|
||||||
}
|
}
|
||||||
|
|
||||||
interface InitializeEvent {
|
export interface InitializeEvent {
|
||||||
__typename: 'InitializeEvent';
|
__typename: 'InitializeEvent';
|
||||||
sqrtPriceX96: bigint;
|
sqrtPriceX96: bigint;
|
||||||
tick: bigint;
|
tick: bigint;
|
||||||
}
|
}
|
||||||
|
|
||||||
interface MintEvent {
|
export interface MintEvent {
|
||||||
__typename: 'MintEvent';
|
__typename: 'MintEvent';
|
||||||
sender: string;
|
sender: string;
|
||||||
owner: string;
|
owner: string;
|
||||||
@ -41,7 +32,7 @@ interface MintEvent {
|
|||||||
amount1: bigint;
|
amount1: bigint;
|
||||||
}
|
}
|
||||||
|
|
||||||
interface BurnEvent {
|
export interface BurnEvent {
|
||||||
__typename: 'BurnEvent';
|
__typename: 'BurnEvent';
|
||||||
owner: string;
|
owner: string;
|
||||||
tickLower: bigint;
|
tickLower: bigint;
|
||||||
@ -51,7 +42,7 @@ interface BurnEvent {
|
|||||||
amount1: bigint;
|
amount1: bigint;
|
||||||
}
|
}
|
||||||
|
|
||||||
interface SwapEvent {
|
export interface SwapEvent {
|
||||||
__typename: 'SwapEvent';
|
__typename: 'SwapEvent';
|
||||||
sender: string;
|
sender: string;
|
||||||
recipient: string;
|
recipient: string;
|
||||||
@ -62,7 +53,7 @@ interface SwapEvent {
|
|||||||
tick: bigint;
|
tick: bigint;
|
||||||
}
|
}
|
||||||
|
|
||||||
interface IncreaseLiquidityEvent {
|
export interface IncreaseLiquidityEvent {
|
||||||
__typename: 'IncreaseLiquidityEvent';
|
__typename: 'IncreaseLiquidityEvent';
|
||||||
tokenId: bigint;
|
tokenId: bigint;
|
||||||
liquidity: bigint;
|
liquidity: bigint;
|
||||||
@ -70,7 +61,7 @@ interface IncreaseLiquidityEvent {
|
|||||||
amount1: bigint;
|
amount1: bigint;
|
||||||
}
|
}
|
||||||
|
|
||||||
interface DecreaseLiquidityEvent {
|
export interface DecreaseLiquidityEvent {
|
||||||
__typename: 'DecreaseLiquidityEvent';
|
__typename: 'DecreaseLiquidityEvent';
|
||||||
tokenId: bigint;
|
tokenId: bigint;
|
||||||
liquidity: bigint;
|
liquidity: bigint;
|
||||||
@ -78,45 +69,63 @@ interface DecreaseLiquidityEvent {
|
|||||||
amount1: bigint;
|
amount1: bigint;
|
||||||
}
|
}
|
||||||
|
|
||||||
interface Block {
|
export interface Block {
|
||||||
number: number;
|
number: number;
|
||||||
hash: string;
|
hash: string;
|
||||||
timestamp: number;
|
timestamp: number;
|
||||||
|
parentHash: string;
|
||||||
}
|
}
|
||||||
|
|
||||||
interface Transaction {
|
export interface Transaction {
|
||||||
hash: string;
|
hash: string;
|
||||||
from: string;
|
from?: string;
|
||||||
}
|
}
|
||||||
|
|
||||||
interface ResultEvent {
|
export interface ResultEvent {
|
||||||
block: Block;
|
block: Block;
|
||||||
tx: Transaction;
|
tx: Transaction;
|
||||||
contract: string;
|
contract: string;
|
||||||
|
eventIndex: number;
|
||||||
event: PoolCreatedEvent | InitializeEvent | MintEvent | BurnEvent | SwapEvent | IncreaseLiquidityEvent | DecreaseLiquidityEvent;
|
event: PoolCreatedEvent | InitializeEvent | MintEvent | BurnEvent | SwapEvent | IncreaseLiquidityEvent | DecreaseLiquidityEvent;
|
||||||
proof: {
|
proof: {
|
||||||
data: string;
|
data: string;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
export const QUEUE_EVENT_PROCESSING = 'event-processing';
|
||||||
|
export const QUEUE_BLOCK_PROCESSING = 'block-processing';
|
||||||
|
|
||||||
export class EventWatcher {
|
export class EventWatcher {
|
||||||
_db: Database
|
|
||||||
_subscription?: ZenObservable.Subscription
|
_subscription?: ZenObservable.Subscription
|
||||||
_uniClient: UniClient
|
_uniClient: UniClient
|
||||||
_erc20Client: ERC20Client
|
_jobQueue: JobQueue
|
||||||
|
_indexer: Indexer
|
||||||
|
|
||||||
constructor (db: Database, uniClient: UniClient, erc20Client: ERC20Client) {
|
constructor (indexer: Indexer, uniClient: UniClient, jobQueue: JobQueue) {
|
||||||
assert(db);
|
|
||||||
|
|
||||||
this._db = db;
|
|
||||||
this._uniClient = uniClient;
|
this._uniClient = uniClient;
|
||||||
this._erc20Client = erc20Client;
|
this._jobQueue = jobQueue;
|
||||||
|
this._indexer = indexer;
|
||||||
}
|
}
|
||||||
|
|
||||||
async start (): Promise<void> {
|
async start (): Promise<void> {
|
||||||
assert(!this._subscription, 'subscription already started');
|
assert(!this._subscription, 'subscription already started');
|
||||||
log('Started watching upstream events...');
|
log('Started watching upstream events...');
|
||||||
this._subscription = await this._uniClient.watchEvents(this._handleEvents.bind(this));
|
|
||||||
|
this._jobQueue.onComplete(QUEUE_BLOCK_PROCESSING, async (job) => {
|
||||||
|
const { data: { request: { data: { block } } } } = job;
|
||||||
|
log(`Job onComplete block ${block.hash} ${block.number}`);
|
||||||
|
});
|
||||||
|
|
||||||
|
this._jobQueue.onComplete(QUEUE_EVENT_PROCESSING, async (job) => {
|
||||||
|
const { data: { request } } = job;
|
||||||
|
|
||||||
|
log(`Job onComplete event ${request.data.id}`);
|
||||||
|
});
|
||||||
|
|
||||||
|
this._subscription = await this._uniClient.watchEvents(async ({ block }: ResultEvent) => {
|
||||||
|
log('watchEvent', block.hash, block.number);
|
||||||
|
return this._jobQueue.pushJob(QUEUE_BLOCK_PROCESSING, { block });
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
async stop (): Promise<void> {
|
async stop (): Promise<void> {
|
||||||
@ -125,756 +134,4 @@ export class EventWatcher {
|
|||||||
this._subscription.unsubscribe();
|
this._subscription.unsubscribe();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async _handleEvents ({ block, tx, contract, event }: ResultEvent): Promise<void> {
|
|
||||||
// TODO: Process proof (proof.data) in event.
|
|
||||||
const { __typename: eventType } = event;
|
|
||||||
|
|
||||||
switch (eventType) {
|
|
||||||
case 'PoolCreatedEvent':
|
|
||||||
log('Factory PoolCreated event', contract);
|
|
||||||
this._handlePoolCreated(block, contract, tx, event as PoolCreatedEvent);
|
|
||||||
break;
|
|
||||||
|
|
||||||
case 'InitializeEvent':
|
|
||||||
log('Pool Initialize event', contract);
|
|
||||||
this._handleInitialize(block, contract, tx, event as InitializeEvent);
|
|
||||||
break;
|
|
||||||
|
|
||||||
case 'MintEvent':
|
|
||||||
log('Pool Mint event', contract);
|
|
||||||
this._handleMint(block, contract, tx, event as MintEvent);
|
|
||||||
break;
|
|
||||||
|
|
||||||
case 'BurnEvent':
|
|
||||||
log('Pool Burn event', contract);
|
|
||||||
this._handleBurn(block, contract, tx, event as BurnEvent);
|
|
||||||
break;
|
|
||||||
|
|
||||||
case 'SwapEvent':
|
|
||||||
log('Pool Swap event', contract);
|
|
||||||
this._handleSwap(block, contract, tx, event as SwapEvent);
|
|
||||||
break;
|
|
||||||
|
|
||||||
case 'IncreaseLiquidityEvent':
|
|
||||||
log('NFPM IncreaseLiquidity event', contract);
|
|
||||||
this._handleIncreaseLiquidity(block, contract, tx, event as IncreaseLiquidityEvent);
|
|
||||||
break;
|
|
||||||
|
|
||||||
case 'DecreaseLiquidityEvent':
|
|
||||||
log('NFPM DecreaseLiquidity event', contract);
|
|
||||||
this._handleDecreaseLiquidity(block, contract, tx, event as DecreaseLiquidityEvent);
|
|
||||||
break;
|
|
||||||
|
|
||||||
default:
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
async _handlePoolCreated (block: Block, contractAddress: string, tx: Transaction, poolCreatedEvent: PoolCreatedEvent): Promise<void> {
|
|
||||||
const { number: blockNumber, hash: blockHash } = block;
|
|
||||||
const { token0: token0Address, token1: token1Address, fee, pool: poolAddress } = poolCreatedEvent;
|
|
||||||
|
|
||||||
// Load factory.
|
|
||||||
const factory = await this._db.loadFactory({ blockNumber, id: contractAddress });
|
|
||||||
|
|
||||||
// Update Factory.
|
|
||||||
let factoryPoolCount = BigNumber.from(factory.poolCount);
|
|
||||||
factoryPoolCount = factoryPoolCount.add(1);
|
|
||||||
factory.poolCount = BigInt(factoryPoolCount.toHexString());
|
|
||||||
|
|
||||||
// Get Tokens.
|
|
||||||
let [token0, token1] = await Promise.all([
|
|
||||||
this._db.getToken({ blockNumber, id: token0Address }),
|
|
||||||
this._db.getToken({ blockNumber, id: token1Address })
|
|
||||||
]);
|
|
||||||
|
|
||||||
// Create Tokens if not present.
|
|
||||||
if (!token0) {
|
|
||||||
token0 = await this._createToken(blockHash, blockNumber, token0Address);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!token1) {
|
|
||||||
token1 = await this._createToken(blockHash, blockNumber, token1Address);
|
|
||||||
}
|
|
||||||
|
|
||||||
// Create new Pool entity.
|
|
||||||
// Skipping adding createdAtTimestamp field as it is not queried in frontend subgraph.
|
|
||||||
const pool = await this._db.loadPool({
|
|
||||||
blockNumber,
|
|
||||||
id: poolAddress,
|
|
||||||
token0: token0,
|
|
||||||
token1: token1,
|
|
||||||
feeTier: BigInt(fee)
|
|
||||||
});
|
|
||||||
|
|
||||||
// Update white listed pools.
|
|
||||||
if (WHITELIST_TOKENS.includes(token0.id)) {
|
|
||||||
token1.whitelistPools.push(pool);
|
|
||||||
await this._db.saveToken(token1, blockNumber);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (WHITELIST_TOKENS.includes(token1.id)) {
|
|
||||||
token0.whitelistPools.push(pool);
|
|
||||||
await this._db.saveToken(token0, blockNumber);
|
|
||||||
}
|
|
||||||
|
|
||||||
// Save entities to DB.
|
|
||||||
await this._db.saveFactory(factory, blockNumber);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Create new Token.
|
|
||||||
* @param tokenAddress
|
|
||||||
*/
|
|
||||||
async _createToken (blockHash: string, blockNumber: number, tokenAddress: string): Promise<Token> {
|
|
||||||
const { value: symbol } = await this._erc20Client.getSymbol(blockHash, tokenAddress);
|
|
||||||
const { value: name } = await this._erc20Client.getName(blockHash, tokenAddress);
|
|
||||||
const { value: totalSupply } = await this._erc20Client.getTotalSupply(blockHash, tokenAddress);
|
|
||||||
|
|
||||||
// TODO: Decimals not implemented by erc20-watcher.
|
|
||||||
// const { value: decimals } = await this._erc20Client.getDecimals(blockHash, tokenAddress);
|
|
||||||
|
|
||||||
return this._db.loadToken({
|
|
||||||
blockNumber,
|
|
||||||
id: tokenAddress,
|
|
||||||
symbol,
|
|
||||||
name,
|
|
||||||
totalSupply
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
async _handleInitialize (block: Block, contractAddress: string, tx: Transaction, initializeEvent: InitializeEvent): Promise<void> {
|
|
||||||
const { number: blockNumber, timestamp: blockTimestamp } = block;
|
|
||||||
const { sqrtPriceX96, tick } = initializeEvent;
|
|
||||||
const pool = await this._db.getPool({ id: contractAddress, blockNumber });
|
|
||||||
assert(pool, `Pool ${contractAddress} not found.`);
|
|
||||||
|
|
||||||
// Update Pool.
|
|
||||||
pool.sqrtPrice = BigInt(sqrtPriceX96);
|
|
||||||
pool.tick = BigInt(tick);
|
|
||||||
this._db.savePool(pool, blockNumber);
|
|
||||||
|
|
||||||
// Update ETH price now that prices could have changed.
|
|
||||||
const bundle = await this._db.loadBundle({ id: '1', blockNumber });
|
|
||||||
bundle.ethPriceUSD = await getEthPriceInUSD(this._db);
|
|
||||||
this._db.saveBundle(bundle, blockNumber);
|
|
||||||
|
|
||||||
await updatePoolDayData(this._db, { contractAddress, blockNumber, blockTimestamp });
|
|
||||||
await updatePoolHourData(this._db, { contractAddress, blockNumber, blockTimestamp });
|
|
||||||
|
|
||||||
const [token0, token1] = await Promise.all([
|
|
||||||
this._db.getToken({ id: pool.token0.id, blockNumber }),
|
|
||||||
this._db.getToken({ id: pool.token1.id, blockNumber })
|
|
||||||
]);
|
|
||||||
|
|
||||||
assert(token0 && token1, 'Pool tokens not found.');
|
|
||||||
|
|
||||||
// Update token prices.
|
|
||||||
token0.derivedETH = await findEthPerToken(token0);
|
|
||||||
token1.derivedETH = await findEthPerToken(token1);
|
|
||||||
|
|
||||||
await Promise.all([
|
|
||||||
this._db.saveToken(token0, blockNumber),
|
|
||||||
this._db.saveToken(token1, blockNumber)
|
|
||||||
]);
|
|
||||||
}
|
|
||||||
|
|
||||||
async _handleMint (block: Block, contractAddress: string, tx: Transaction, mintEvent: MintEvent): Promise<void> {
|
|
||||||
const { number: blockNumber, timestamp: blockTimestamp } = block;
|
|
||||||
const { hash: txHash } = tx;
|
|
||||||
const bundle = await this._db.loadBundle({ id: '1', blockNumber });
|
|
||||||
const poolAddress = contractAddress;
|
|
||||||
const pool = await this._db.loadPool({ id: poolAddress, blockNumber });
|
|
||||||
|
|
||||||
// TODO: In subgraph factory is fetched by hardcoded factory address.
|
|
||||||
// Currently fetching first factory in database as only one exists.
|
|
||||||
const [factory] = await this._db.getFactories({ blockNumber }, { limit: 1 });
|
|
||||||
|
|
||||||
const token0 = pool.token0;
|
|
||||||
const token1 = pool.token1;
|
|
||||||
const amount0 = convertTokenToDecimal(mintEvent.amount0, BigInt(token0.decimals));
|
|
||||||
const amount1 = convertTokenToDecimal(mintEvent.amount1, BigInt(token1.decimals));
|
|
||||||
|
|
||||||
const amountUSD = amount0
|
|
||||||
.times(token0.derivedETH.times(bundle.ethPriceUSD))
|
|
||||||
.plus(amount1.times(token1.derivedETH.times(bundle.ethPriceUSD)));
|
|
||||||
|
|
||||||
// Reset tvl aggregates until new amounts calculated.
|
|
||||||
factory.totalValueLockedETH = factory.totalValueLockedETH.minus(pool.totalValueLockedETH);
|
|
||||||
|
|
||||||
// Update globals.
|
|
||||||
factory.txCount = BigInt(factory.txCount) + BigInt(1);
|
|
||||||
|
|
||||||
// Update token0 data.
|
|
||||||
token0.txCount = BigInt(token0.txCount) + BigInt(1);
|
|
||||||
token0.totalValueLocked = token0.totalValueLocked.plus(amount0);
|
|
||||||
token0.totalValueLockedUSD = token0.totalValueLocked.times(token0.derivedETH.times(bundle.ethPriceUSD));
|
|
||||||
|
|
||||||
// Update token1 data.
|
|
||||||
token1.txCount = BigInt(token1.txCount) + BigInt(1);
|
|
||||||
token1.totalValueLocked = token1.totalValueLocked.plus(amount1);
|
|
||||||
token1.totalValueLockedUSD = token1.totalValueLocked.times(token1.derivedETH.times(bundle.ethPriceUSD));
|
|
||||||
|
|
||||||
// Pool data.
|
|
||||||
pool.txCount = BigInt(pool.txCount) + BigInt(1);
|
|
||||||
|
|
||||||
// Pools liquidity tracks the currently active liquidity given pools current tick.
|
|
||||||
// We only want to update it on mint if the new position includes the current tick.
|
|
||||||
if (pool.tick !== null) {
|
|
||||||
if (
|
|
||||||
BigInt(mintEvent.tickLower) <= BigInt(pool.tick) &&
|
|
||||||
BigInt(mintEvent.tickUpper) > BigInt(pool.tick)
|
|
||||||
) {
|
|
||||||
pool.liquidity = BigInt(pool.liquidity) + BigInt(mintEvent.amount);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pool.totalValueLockedToken0 = pool.totalValueLockedToken0.plus(amount0);
|
|
||||||
pool.totalValueLockedToken1 = pool.totalValueLockedToken1.plus(amount1);
|
|
||||||
|
|
||||||
pool.totalValueLockedETH = pool.totalValueLockedToken0.times(token0.derivedETH)
|
|
||||||
.plus(pool.totalValueLockedToken1.times(token1.derivedETH));
|
|
||||||
|
|
||||||
pool.totalValueLockedUSD = pool.totalValueLockedETH.times(bundle.ethPriceUSD);
|
|
||||||
|
|
||||||
// Reset aggregates with new amounts.
|
|
||||||
factory.totalValueLockedETH = factory.totalValueLockedETH.plus(pool.totalValueLockedETH);
|
|
||||||
factory.totalValueLockedUSD = factory.totalValueLockedETH.times(bundle.ethPriceUSD);
|
|
||||||
|
|
||||||
const transaction = await loadTransaction(this._db, { txHash, blockNumber, blockTimestamp });
|
|
||||||
|
|
||||||
await this._db.loadMint({
|
|
||||||
id: transaction.id + '#' + pool.txCount.toString(),
|
|
||||||
blockNumber,
|
|
||||||
transaction,
|
|
||||||
timestamp: transaction.timestamp,
|
|
||||||
pool,
|
|
||||||
token0: pool.token0,
|
|
||||||
token1: pool.token1,
|
|
||||||
owner: mintEvent.owner,
|
|
||||||
sender: mintEvent.sender,
|
|
||||||
|
|
||||||
// TODO: Assign origin with Transaction from address.
|
|
||||||
// origin: event.transaction.from
|
|
||||||
|
|
||||||
amount: mintEvent.amount,
|
|
||||||
amount0: amount0,
|
|
||||||
amount1: amount1,
|
|
||||||
amountUSD: amountUSD,
|
|
||||||
tickLower: mintEvent.tickLower,
|
|
||||||
tickUpper: mintEvent.tickUpper
|
|
||||||
});
|
|
||||||
|
|
||||||
// Tick entities.
|
|
||||||
const lowerTickIdx = mintEvent.tickLower;
|
|
||||||
const upperTickIdx = mintEvent.tickUpper;
|
|
||||||
|
|
||||||
const lowerTickId = poolAddress + '#' + mintEvent.tickLower.toString();
|
|
||||||
const upperTickId = poolAddress + '#' + mintEvent.tickUpper.toString();
|
|
||||||
|
|
||||||
const lowerTick = await loadTick(this._db, lowerTickId, BigInt(lowerTickIdx), pool, blockNumber);
|
|
||||||
const upperTick = await loadTick(this._db, upperTickId, BigInt(upperTickIdx), pool, blockNumber);
|
|
||||||
|
|
||||||
const amount = BigInt(mintEvent.amount);
|
|
||||||
lowerTick.liquidityGross = BigInt(lowerTick.liquidityGross) + amount;
|
|
||||||
lowerTick.liquidityNet = BigInt(lowerTick.liquidityNet) + amount;
|
|
||||||
upperTick.liquidityGross = BigInt(upperTick.liquidityGross) + amount;
|
|
||||||
upperTick.liquidityNet = BigInt(upperTick.liquidityNet) + amount;
|
|
||||||
|
|
||||||
// TODO: Update Tick's volume, fees, and liquidity provider count.
|
|
||||||
// Computing these on the tick level requires reimplementing some of the swapping code from v3-core.
|
|
||||||
|
|
||||||
await updateUniswapDayData(this._db, { blockNumber, contractAddress, blockTimestamp });
|
|
||||||
await updatePoolDayData(this._db, { blockNumber, contractAddress, blockTimestamp });
|
|
||||||
await updatePoolHourData(this._db, { blockNumber, contractAddress, blockTimestamp });
|
|
||||||
await updateTokenDayData(this._db, token0, { blockNumber, blockTimestamp });
|
|
||||||
await updateTokenDayData(this._db, token1, { blockNumber, blockTimestamp });
|
|
||||||
await updateTokenHourData(this._db, token0, { blockNumber, blockTimestamp });
|
|
||||||
await updateTokenHourData(this._db, token1, { blockNumber, blockTimestamp });
|
|
||||||
|
|
||||||
await Promise.all([
|
|
||||||
this._db.saveToken(token0, blockNumber),
|
|
||||||
this._db.saveToken(token1, blockNumber)
|
|
||||||
]);
|
|
||||||
|
|
||||||
await this._db.savePool(pool, blockNumber);
|
|
||||||
await this._db.saveFactory(factory, blockNumber);
|
|
||||||
|
|
||||||
await Promise.all([
|
|
||||||
await this._db.saveTick(lowerTick, blockNumber),
|
|
||||||
await this._db.saveTick(upperTick, blockNumber)
|
|
||||||
]);
|
|
||||||
|
|
||||||
// Skipping update inner tick vars and tick day data as they are not queried.
|
|
||||||
}
|
|
||||||
|
|
||||||
async _handleBurn (block: Block, contractAddress: string, tx: Transaction, burnEvent: BurnEvent): Promise<void> {
|
|
||||||
const { number: blockNumber, timestamp: blockTimestamp } = block;
|
|
||||||
const { hash: txHash } = tx;
|
|
||||||
const bundle = await this._db.loadBundle({ id: '1', blockNumber });
|
|
||||||
const poolAddress = contractAddress;
|
|
||||||
const pool = await this._db.loadPool({ id: poolAddress, blockNumber });
|
|
||||||
|
|
||||||
// TODO: In subgraph factory is fetched by hardcoded factory address.
|
|
||||||
// Currently fetching first factory in database as only one exists.
|
|
||||||
const [factory] = await this._db.getFactories({ blockNumber }, { limit: 1 });
|
|
||||||
|
|
||||||
const token0 = pool.token0;
|
|
||||||
const token1 = pool.token1;
|
|
||||||
const amount0 = convertTokenToDecimal(burnEvent.amount0, BigInt(token0.decimals));
|
|
||||||
const amount1 = convertTokenToDecimal(burnEvent.amount1, BigInt(token1.decimals));
|
|
||||||
|
|
||||||
const amountUSD = amount0
|
|
||||||
.times(token0.derivedETH.times(bundle.ethPriceUSD))
|
|
||||||
.plus(amount1.times(token1.derivedETH.times(bundle.ethPriceUSD)));
|
|
||||||
|
|
||||||
// Reset tvl aggregates until new amounts calculated.
|
|
||||||
factory.totalValueLockedETH = factory.totalValueLockedETH.minus(pool.totalValueLockedETH);
|
|
||||||
|
|
||||||
// Update globals.
|
|
||||||
factory.txCount = BigInt(factory.txCount) + BigInt(1);
|
|
||||||
|
|
||||||
// Update token0 data.
|
|
||||||
token0.txCount = BigInt(token0.txCount) + BigInt(1);
|
|
||||||
token0.totalValueLocked = token0.totalValueLocked.minus(amount0);
|
|
||||||
token0.totalValueLockedUSD = token0.totalValueLocked.times(token0.derivedETH.times(bundle.ethPriceUSD));
|
|
||||||
|
|
||||||
// Update token1 data.
|
|
||||||
token1.txCount = BigInt(token1.txCount) + BigInt(1);
|
|
||||||
token1.totalValueLocked = token1.totalValueLocked.minus(amount1);
|
|
||||||
token1.totalValueLockedUSD = token1.totalValueLocked.times(token1.derivedETH.times(bundle.ethPriceUSD));
|
|
||||||
|
|
||||||
// Pool data.
|
|
||||||
pool.txCount = BigInt(pool.txCount) + BigInt(1);
|
|
||||||
|
|
||||||
// Pools liquidity tracks the currently active liquidity given pools current tick.
|
|
||||||
// We only want to update it on burn if the position being burnt includes the current tick.
|
|
||||||
if (
|
|
||||||
pool.tick !== null &&
|
|
||||||
burnEvent.tickLower <= pool.tick &&
|
|
||||||
burnEvent.tickUpper > pool.tick
|
|
||||||
) {
|
|
||||||
pool.liquidity = pool.liquidity - burnEvent.amount;
|
|
||||||
}
|
|
||||||
|
|
||||||
pool.totalValueLockedToken0 = pool.totalValueLockedToken0.minus(amount0);
|
|
||||||
pool.totalValueLockedToken1 = pool.totalValueLockedToken1.minus(amount1);
|
|
||||||
|
|
||||||
pool.totalValueLockedETH = pool.totalValueLockedToken0
|
|
||||||
.times(token0.derivedETH)
|
|
||||||
.plus(pool.totalValueLockedToken1.times(token1.derivedETH));
|
|
||||||
|
|
||||||
pool.totalValueLockedUSD = pool.totalValueLockedETH.times(bundle.ethPriceUSD);
|
|
||||||
|
|
||||||
// Reset aggregates with new amounts.
|
|
||||||
factory.totalValueLockedETH = factory.totalValueLockedETH.plus(pool.totalValueLockedETH);
|
|
||||||
factory.totalValueLockedUSD = factory.totalValueLockedETH.times(bundle.ethPriceUSD);
|
|
||||||
|
|
||||||
// Burn entity.
|
|
||||||
const transaction = await loadTransaction(this._db, { txHash, blockNumber, blockTimestamp });
|
|
||||||
|
|
||||||
await this._db.loadBurn({
|
|
||||||
id: transaction.id + '#' + pool.txCount.toString(),
|
|
||||||
blockNumber,
|
|
||||||
transaction,
|
|
||||||
timestamp: transaction.timestamp,
|
|
||||||
pool,
|
|
||||||
token0: pool.token0,
|
|
||||||
token1: pool.token1,
|
|
||||||
owner: burnEvent.owner,
|
|
||||||
|
|
||||||
// TODO: Assign origin with Transaction from address.
|
|
||||||
// origin: event.transaction.from
|
|
||||||
|
|
||||||
amount: burnEvent.amount,
|
|
||||||
amount0,
|
|
||||||
amount1,
|
|
||||||
amountUSD,
|
|
||||||
tickLower: burnEvent.tickLower,
|
|
||||||
tickUpper: burnEvent.tickUpper
|
|
||||||
});
|
|
||||||
|
|
||||||
// Tick entities.
|
|
||||||
const lowerTickId = poolAddress + '#' + (burnEvent.tickLower).toString();
|
|
||||||
const upperTickId = poolAddress + '#' + (burnEvent.tickUpper).toString();
|
|
||||||
const lowerTick = await this._db.loadTick({ id: lowerTickId, blockNumber });
|
|
||||||
const upperTick = await this._db.loadTick({ id: upperTickId, blockNumber });
|
|
||||||
const amount = BigInt(burnEvent.amount);
|
|
||||||
lowerTick.liquidityGross = BigInt(lowerTick.liquidityGross) - amount;
|
|
||||||
lowerTick.liquidityNet = BigInt(lowerTick.liquidityNet) - amount;
|
|
||||||
upperTick.liquidityGross = BigInt(upperTick.liquidityGross) - amount;
|
|
||||||
upperTick.liquidityNet = BigInt(upperTick.liquidityNet) + amount;
|
|
||||||
|
|
||||||
await updateUniswapDayData(this._db, { blockNumber, contractAddress, blockTimestamp });
|
|
||||||
await updatePoolDayData(this._db, { blockNumber, contractAddress, blockTimestamp });
|
|
||||||
await updatePoolHourData(this._db, { blockNumber, contractAddress, blockTimestamp });
|
|
||||||
await updateTokenDayData(this._db, token0, { blockNumber, blockTimestamp });
|
|
||||||
await updateTokenDayData(this._db, token0, { blockNumber, blockTimestamp });
|
|
||||||
await updateTokenHourData(this._db, token0, { blockNumber, blockTimestamp });
|
|
||||||
await updateTokenHourData(this._db, token0, { blockNumber, blockTimestamp });
|
|
||||||
|
|
||||||
// Skipping update Tick fee and Tick day data as they are not queried.
|
|
||||||
|
|
||||||
await Promise.all([
|
|
||||||
await this._db.saveTick(lowerTick, blockNumber),
|
|
||||||
await this._db.saveTick(upperTick, blockNumber)
|
|
||||||
]);
|
|
||||||
|
|
||||||
await Promise.all([
|
|
||||||
this._db.saveToken(token0, blockNumber),
|
|
||||||
this._db.saveToken(token1, blockNumber)
|
|
||||||
]);
|
|
||||||
|
|
||||||
await this._db.savePool(pool, blockNumber);
|
|
||||||
await this._db.saveFactory(factory, blockNumber);
|
|
||||||
}
|
|
||||||
|
|
||||||
async _handleSwap (block: Block, contractAddress: string, tx: Transaction, swapEvent: SwapEvent): Promise<void> {
|
|
||||||
const { number: blockNumber, timestamp: blockTimestamp } = block;
|
|
||||||
const { hash: txHash } = tx;
|
|
||||||
const bundle = await this._db.loadBundle({ id: '1', blockNumber });
|
|
||||||
|
|
||||||
// TODO: In subgraph factory is fetched by hardcoded factory address.
|
|
||||||
// Currently fetching first factory in database as only one exists.
|
|
||||||
const [factory] = await this._db.getFactories({ blockNumber }, { limit: 1 });
|
|
||||||
|
|
||||||
const pool = await this._db.loadPool({ id: contractAddress, blockNumber });
|
|
||||||
|
|
||||||
// Hot fix for bad pricing.
|
|
||||||
if (pool.id === '0x9663f2ca0454accad3e094448ea6f77443880454') {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
const [token0, token1] = await Promise.all([
|
|
||||||
this._db.getToken({ id: pool.token0.id, blockNumber }),
|
|
||||||
this._db.getToken({ id: pool.token1.id, blockNumber })
|
|
||||||
]);
|
|
||||||
|
|
||||||
assert(token0 && token1, 'Pool tokens not found.');
|
|
||||||
|
|
||||||
// Amounts - 0/1 are token deltas. Can be positive or negative.
|
|
||||||
const amount0 = convertTokenToDecimal(swapEvent.amount0, BigInt(token0.decimals));
|
|
||||||
const amount1 = convertTokenToDecimal(swapEvent.amount1, BigInt(token1.decimals));
|
|
||||||
|
|
||||||
// Need absolute amounts for volume.
|
|
||||||
let amount0Abs = amount0;
|
|
||||||
let amount1Abs = amount1;
|
|
||||||
|
|
||||||
if (amount0.lt(new Decimal(0))) {
|
|
||||||
amount0Abs = amount0.times(new Decimal('-1'));
|
|
||||||
}
|
|
||||||
|
|
||||||
if (amount1.lt(new Decimal(0))) {
|
|
||||||
amount1Abs = amount1.times(new Decimal('-1'));
|
|
||||||
}
|
|
||||||
|
|
||||||
const amount0ETH = amount0Abs.times(token0.derivedETH);
|
|
||||||
const amount1ETH = amount1Abs.times(token1.derivedETH);
|
|
||||||
const amount0USD = amount0ETH.times(bundle.ethPriceUSD);
|
|
||||||
const amount1USD = amount1ETH.times(bundle.ethPriceUSD);
|
|
||||||
|
|
||||||
// Get amount that should be tracked only - div 2 because cant count both input and output as volume.
|
|
||||||
const trackedAmountUSD = await getTrackedAmountUSD(this._db, amount0Abs, token0, amount1Abs, token1);
|
|
||||||
const amountTotalUSDTracked = trackedAmountUSD.div(new Decimal('2'));
|
|
||||||
const amountTotalETHTracked = safeDiv(amountTotalUSDTracked, bundle.ethPriceUSD);
|
|
||||||
const amountTotalUSDUntracked = amount0USD.plus(amount1USD).div(new Decimal('2'));
|
|
||||||
|
|
||||||
const feesETH = amountTotalETHTracked.times(pool.feeTier.toString()).div(new Decimal('1000000'));
|
|
||||||
const feesUSD = amountTotalUSDTracked.times(pool.feeTier.toString()).div(new Decimal('1000000'));
|
|
||||||
|
|
||||||
// Global updates.
|
|
||||||
factory.txCount = BigInt(factory.txCount) + BigInt(1);
|
|
||||||
factory.totalVolumeETH = factory.totalVolumeETH.plus(amountTotalETHTracked);
|
|
||||||
factory.totalVolumeUSD = factory.totalVolumeUSD.plus(amountTotalUSDTracked);
|
|
||||||
factory.untrackedVolumeUSD = factory.untrackedVolumeUSD.plus(amountTotalUSDUntracked);
|
|
||||||
factory.totalFeesETH = factory.totalFeesETH.plus(feesETH);
|
|
||||||
factory.totalFeesUSD = factory.totalFeesUSD.plus(feesUSD);
|
|
||||||
|
|
||||||
// Reset aggregate tvl before individual pool tvl updates.
|
|
||||||
const currentPoolTvlETH = pool.totalValueLockedETH;
|
|
||||||
factory.totalValueLockedETH = factory.totalValueLockedETH.minus(currentPoolTvlETH);
|
|
||||||
|
|
||||||
// pool volume
|
|
||||||
pool.volumeToken0 = pool.volumeToken0.plus(amount0Abs);
|
|
||||||
pool.volumeToken1 = pool.volumeToken1.plus(amount1Abs);
|
|
||||||
pool.volumeUSD = pool.volumeUSD.plus(amountTotalUSDTracked);
|
|
||||||
pool.untrackedVolumeUSD = pool.untrackedVolumeUSD.plus(amountTotalUSDUntracked);
|
|
||||||
pool.feesUSD = pool.feesUSD.plus(feesUSD);
|
|
||||||
pool.txCount = BigInt(pool.txCount) + BigInt(1);
|
|
||||||
|
|
||||||
// Update the pool with the new active liquidity, price, and tick.
|
|
||||||
pool.liquidity = swapEvent.liquidity;
|
|
||||||
pool.tick = BigInt(swapEvent.tick);
|
|
||||||
pool.sqrtPrice = swapEvent.sqrtPriceX96;
|
|
||||||
pool.totalValueLockedToken0 = pool.totalValueLockedToken0.plus(amount0);
|
|
||||||
pool.totalValueLockedToken1 = pool.totalValueLockedToken1.plus(amount1);
|
|
||||||
|
|
||||||
// Update token0 data.
|
|
||||||
token0.volume = token0.volume.plus(amount0Abs);
|
|
||||||
token0.totalValueLocked = token0.totalValueLocked.plus(amount0);
|
|
||||||
token0.volumeUSD = token0.volumeUSD.plus(amountTotalUSDTracked);
|
|
||||||
token0.untrackedVolumeUSD = token0.untrackedVolumeUSD.plus(amountTotalUSDUntracked);
|
|
||||||
token0.feesUSD = token0.feesUSD.plus(feesUSD);
|
|
||||||
token0.txCount = BigInt(token0.txCount) + BigInt(1);
|
|
||||||
|
|
||||||
// Update token1 data.
|
|
||||||
token1.volume = token1.volume.plus(amount1Abs);
|
|
||||||
token1.totalValueLocked = token1.totalValueLocked.plus(amount1);
|
|
||||||
token1.volumeUSD = token1.volumeUSD.plus(amountTotalUSDTracked);
|
|
||||||
token1.untrackedVolumeUSD = token1.untrackedVolumeUSD.plus(amountTotalUSDUntracked);
|
|
||||||
token1.feesUSD = token1.feesUSD.plus(feesUSD);
|
|
||||||
token1.txCount = BigInt(token1.txCount) + BigInt(1);
|
|
||||||
|
|
||||||
// Updated pool rates.
|
|
||||||
const prices = sqrtPriceX96ToTokenPrices(pool.sqrtPrice, token0 as Token, token1 as Token);
|
|
||||||
pool.token0Price = prices[0];
|
|
||||||
pool.token1Price = prices[1];
|
|
||||||
this._db.savePool(pool, blockNumber);
|
|
||||||
|
|
||||||
// Update USD pricing.
|
|
||||||
bundle.ethPriceUSD = await getEthPriceInUSD(this._db);
|
|
||||||
this._db.saveBundle(bundle, blockNumber);
|
|
||||||
token0.derivedETH = await findEthPerToken(token0);
|
|
||||||
token1.derivedETH = await findEthPerToken(token1);
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Things afffected by new USD rates.
|
|
||||||
*/
|
|
||||||
pool.totalValueLockedETH = pool.totalValueLockedToken0
|
|
||||||
.times(token0.derivedETH)
|
|
||||||
.plus(pool.totalValueLockedToken1.times(token1.derivedETH));
|
|
||||||
|
|
||||||
pool.totalValueLockedUSD = pool.totalValueLockedETH.times(bundle.ethPriceUSD);
|
|
||||||
|
|
||||||
factory.totalValueLockedETH = factory.totalValueLockedETH.plus(pool.totalValueLockedETH);
|
|
||||||
factory.totalValueLockedUSD = factory.totalValueLockedETH.times(bundle.ethPriceUSD);
|
|
||||||
|
|
||||||
token0.totalValueLockedUSD = token0.totalValueLocked.times(token0.derivedETH).times(bundle.ethPriceUSD);
|
|
||||||
token1.totalValueLockedUSD = token1.totalValueLocked.times(token1.derivedETH).times(bundle.ethPriceUSD);
|
|
||||||
|
|
||||||
// Create Swap event
|
|
||||||
const transaction = await loadTransaction(this._db, { txHash, blockNumber, blockTimestamp });
|
|
||||||
|
|
||||||
await this._db.loadSwap({
|
|
||||||
id: transaction.id + '#' + pool.txCount.toString(),
|
|
||||||
blockNumber,
|
|
||||||
transaction,
|
|
||||||
timestamp: transaction.timestamp,
|
|
||||||
pool,
|
|
||||||
token0: pool.token0,
|
|
||||||
token1: pool.token1,
|
|
||||||
sender: swapEvent.sender,
|
|
||||||
|
|
||||||
// TODO: Assign origin with Transaction from address.
|
|
||||||
// origin: event.transaction.from
|
|
||||||
|
|
||||||
recipient: swapEvent.recipient,
|
|
||||||
amount0: amount0,
|
|
||||||
amount1: amount1,
|
|
||||||
amountUSD: amountTotalUSDTracked,
|
|
||||||
tick: BigInt(swapEvent.tick),
|
|
||||||
sqrtPriceX96: swapEvent.sqrtPriceX96
|
|
||||||
});
|
|
||||||
|
|
||||||
// Skipping update pool fee growth as they are not queried.
|
|
||||||
|
|
||||||
// Interval data.
|
|
||||||
const uniswapDayData = await updateUniswapDayData(this._db, { blockNumber, contractAddress, blockTimestamp });
|
|
||||||
const poolDayData = await updatePoolDayData(this._db, { blockNumber, contractAddress, blockTimestamp });
|
|
||||||
const poolHourData = await updatePoolHourData(this._db, { blockNumber, contractAddress, blockTimestamp });
|
|
||||||
const token0DayData = await updateTokenDayData(this._db, token0, { blockNumber, blockTimestamp });
|
|
||||||
const token1DayData = await updateTokenDayData(this._db, token0, { blockNumber, blockTimestamp });
|
|
||||||
const token0HourData = await updateTokenHourData(this._db, token0, { blockNumber, blockTimestamp });
|
|
||||||
const token1HourData = await updateTokenHourData(this._db, token0, { blockNumber, blockTimestamp });
|
|
||||||
|
|
||||||
// Update volume metrics.
|
|
||||||
uniswapDayData.volumeETH = uniswapDayData.volumeETH.plus(amountTotalETHTracked);
|
|
||||||
uniswapDayData.volumeUSD = uniswapDayData.volumeUSD.plus(amountTotalUSDTracked);
|
|
||||||
uniswapDayData.feesUSD = uniswapDayData.feesUSD.plus(feesUSD);
|
|
||||||
|
|
||||||
poolDayData.volumeUSD = poolDayData.volumeUSD.plus(amountTotalUSDTracked);
|
|
||||||
poolDayData.volumeToken0 = poolDayData.volumeToken0.plus(amount0Abs);
|
|
||||||
poolDayData.volumeToken1 = poolDayData.volumeToken1.plus(amount1Abs);
|
|
||||||
poolDayData.feesUSD = poolDayData.feesUSD.plus(feesUSD);
|
|
||||||
|
|
||||||
poolHourData.volumeUSD = poolHourData.volumeUSD.plus(amountTotalUSDTracked);
|
|
||||||
poolHourData.volumeToken0 = poolHourData.volumeToken0.plus(amount0Abs);
|
|
||||||
poolHourData.volumeToken1 = poolHourData.volumeToken1.plus(amount1Abs);
|
|
||||||
poolHourData.feesUSD = poolHourData.feesUSD.plus(feesUSD);
|
|
||||||
|
|
||||||
token0DayData.volume = token0DayData.volume.plus(amount0Abs);
|
|
||||||
token0DayData.volumeUSD = token0DayData.volumeUSD.plus(amountTotalUSDTracked);
|
|
||||||
token0DayData.untrackedVolumeUSD = token0DayData.untrackedVolumeUSD.plus(amountTotalUSDTracked);
|
|
||||||
token0DayData.feesUSD = token0DayData.feesUSD.plus(feesUSD);
|
|
||||||
|
|
||||||
token0HourData.volume = token0HourData.volume.plus(amount0Abs);
|
|
||||||
token0HourData.volumeUSD = token0HourData.volumeUSD.plus(amountTotalUSDTracked);
|
|
||||||
token0HourData.untrackedVolumeUSD = token0HourData.untrackedVolumeUSD.plus(amountTotalUSDTracked);
|
|
||||||
token0HourData.feesUSD = token0HourData.feesUSD.plus(feesUSD);
|
|
||||||
|
|
||||||
token1DayData.volume = token1DayData.volume.plus(amount1Abs);
|
|
||||||
token1DayData.volumeUSD = token1DayData.volumeUSD.plus(amountTotalUSDTracked);
|
|
||||||
token1DayData.untrackedVolumeUSD = token1DayData.untrackedVolumeUSD.plus(amountTotalUSDTracked);
|
|
||||||
token1DayData.feesUSD = token1DayData.feesUSD.plus(feesUSD);
|
|
||||||
|
|
||||||
token1HourData.volume = token1HourData.volume.plus(amount1Abs);
|
|
||||||
token1HourData.volumeUSD = token1HourData.volumeUSD.plus(amountTotalUSDTracked);
|
|
||||||
token1HourData.untrackedVolumeUSD = token1HourData.untrackedVolumeUSD.plus(amountTotalUSDTracked);
|
|
||||||
token1HourData.feesUSD = token1HourData.feesUSD.plus(feesUSD);
|
|
||||||
|
|
||||||
this._db.saveTokenDayData(token0DayData, blockNumber);
|
|
||||||
this._db.saveTokenDayData(token1DayData, blockNumber);
|
|
||||||
this._db.saveUniswapDayData(uniswapDayData, blockNumber);
|
|
||||||
this._db.savePoolDayData(poolDayData, blockNumber);
|
|
||||||
this._db.saveFactory(factory, blockNumber);
|
|
||||||
this._db.savePool(pool, blockNumber);
|
|
||||||
this._db.saveToken(token0, blockNumber);
|
|
||||||
this._db.saveToken(token1, blockNumber);
|
|
||||||
|
|
||||||
// Skipping update of inner vars of current or crossed ticks as they are not queried.
|
|
||||||
}
|
|
||||||
|
|
||||||
async _handleIncreaseLiquidity (block: Block, contractAddress: string, tx: Transaction, event: IncreaseLiquidityEvent): Promise<void> {
|
|
||||||
const { number: blockNumber } = block;
|
|
||||||
const position = await this._getPosition(block, contractAddress, tx, BigInt(event.tokenId));
|
|
||||||
|
|
||||||
// position was not able to be fetched.
|
|
||||||
if (position === null) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Temp fix from Subgraph mapping code.
|
|
||||||
if (utils.getAddress(position.pool.id) === utils.getAddress('0x8fe8d9bb8eeba3ed688069c3d6b556c9ca258248')) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
const token0 = position.token0;
|
|
||||||
const token1 = position.token1;
|
|
||||||
|
|
||||||
const amount0 = convertTokenToDecimal(BigInt(event.amount0), BigInt(token0.decimals));
|
|
||||||
const amount1 = convertTokenToDecimal(BigInt(event.amount1), BigInt(token1.decimals));
|
|
||||||
|
|
||||||
position.liquidity = BigInt(position.liquidity) + BigInt(event.liquidity);
|
|
||||||
position.depositedToken0 = position.depositedToken0.plus(amount0);
|
|
||||||
position.depositedToken1 = position.depositedToken1.plus(amount1);
|
|
||||||
|
|
||||||
await this._updateFeeVars(position, block, contractAddress, BigInt(event.tokenId));
|
|
||||||
|
|
||||||
await this._db.savePosition(position, blockNumber);
|
|
||||||
|
|
||||||
await this._savePositionSnapshot(position, block, tx);
|
|
||||||
}
|
|
||||||
|
|
||||||
async _handleDecreaseLiquidity (block: Block, contractAddress: string, tx: Transaction, event: DecreaseLiquidityEvent): Promise<void> {
|
|
||||||
const { number: blockNumber } = block;
|
|
||||||
const position = await this._getPosition(block, contractAddress, tx, BigInt(event.tokenId));
|
|
||||||
|
|
||||||
// Position was not able to be fetched.
|
|
||||||
if (position == null) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Temp fix from Subgraph mapping code.
|
|
||||||
if (utils.getAddress(position.pool.id) === utils.getAddress('0x8fe8d9bb8eeba3ed688069c3d6b556c9ca258248')) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
const token0 = position.token0;
|
|
||||||
const token1 = position.token1;
|
|
||||||
const amount0 = convertTokenToDecimal(BigInt(event.amount0), BigInt(token0.decimals));
|
|
||||||
const amount1 = convertTokenToDecimal(BigInt(event.amount1), BigInt(token1.decimals));
|
|
||||||
|
|
||||||
position.liquidity = BigInt(position.liquidity) - BigInt(event.liquidity);
|
|
||||||
position.depositedToken0 = position.depositedToken0.plus(amount0);
|
|
||||||
position.depositedToken1 = position.depositedToken1.plus(amount1);
|
|
||||||
|
|
||||||
await this._updateFeeVars(position, block, contractAddress, BigInt(event.tokenId));
|
|
||||||
|
|
||||||
await this._db.savePosition(position, blockNumber);
|
|
||||||
|
|
||||||
await this._savePositionSnapshot(position, block, tx);
|
|
||||||
}
|
|
||||||
|
|
||||||
async _getPosition (block: Block, contractAddress: string, tx: Transaction, tokenId: bigint): Promise<Position | null> {
|
|
||||||
const { number: blockNumber, hash: blockHash, timestamp: blockTimestamp } = block;
|
|
||||||
const { hash: txHash } = tx;
|
|
||||||
let position = await this._db.getPosition({ id: tokenId.toString(), blockNumber });
|
|
||||||
|
|
||||||
if (!position) {
|
|
||||||
const nfpmPosition = await this._uniClient.getPosition(blockHash, tokenId);
|
|
||||||
|
|
||||||
// The contract call reverts in situations where the position is minted and deleted in the same block.
|
|
||||||
// From my investigation this happens in calls from BancorSwap.
|
|
||||||
// (e.g. 0xf7867fa19aa65298fadb8d4f72d0daed5e836f3ba01f0b9b9631cdc6c36bed40)
|
|
||||||
|
|
||||||
if (nfpmPosition) {
|
|
||||||
const { token0: token0Address, token1: token1Address, fee } = await this._uniClient.poolIdToPoolKey(blockHash, nfpmPosition.poolId);
|
|
||||||
|
|
||||||
const { pool: poolAddress } = await this._uniClient.getPool(blockHash, token0Address, token1Address, fee);
|
|
||||||
|
|
||||||
const transaction = await loadTransaction(this._db, { txHash, blockNumber, blockTimestamp });
|
|
||||||
const pool = await this._db.getPool({ id: poolAddress, blockNumber });
|
|
||||||
|
|
||||||
const [token0, token1] = await Promise.all([
|
|
||||||
this._db.getToken({ id: token0Address, blockNumber }),
|
|
||||||
this._db.getToken({ id: token0Address, blockNumber })
|
|
||||||
]);
|
|
||||||
|
|
||||||
const [tickLower, tickUpper] = await Promise.all([
|
|
||||||
this._db.getTick({ id: poolAddress.concat('#').concat(nfpmPosition.tickLower.toString()), blockNumber }),
|
|
||||||
this._db.getTick({ id: poolAddress.concat('#').concat(nfpmPosition.tickUpper.toString()), blockNumber })
|
|
||||||
]);
|
|
||||||
|
|
||||||
position = await this._db.loadPosition({
|
|
||||||
id: tokenId.toString(),
|
|
||||||
blockNumber,
|
|
||||||
pool,
|
|
||||||
token0,
|
|
||||||
token1,
|
|
||||||
tickLower,
|
|
||||||
tickUpper,
|
|
||||||
transaction,
|
|
||||||
feeGrowthInside0LastX128: BigInt(nfpmPosition.feeGrowthInside0LastX128.toString()),
|
|
||||||
feeGrowthInside1LastX128: BigInt(nfpmPosition.feeGrowthInside1LastX128.toString())
|
|
||||||
});
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return position || null;
|
|
||||||
}
|
|
||||||
|
|
||||||
async _updateFeeVars (position: Position, block: Block, contractAddress: string, tokenId: bigint): Promise<Position> {
|
|
||||||
const nfpmPosition = await this._uniClient.getPosition(block.hash, tokenId);
|
|
||||||
|
|
||||||
if (nfpmPosition) {
|
|
||||||
position.feeGrowthInside0LastX128 = BigInt(nfpmPosition.feeGrowthInside0LastX128.toString());
|
|
||||||
position.feeGrowthInside1LastX128 = BigInt(nfpmPosition.feeGrowthInside1LastX128.toString());
|
|
||||||
}
|
|
||||||
|
|
||||||
return position;
|
|
||||||
}
|
|
||||||
|
|
||||||
async _savePositionSnapshot (position: Position, block: Block, tx: Transaction): Promise<void> {
|
|
||||||
const transaction = await loadTransaction(this._db, { txHash: tx.hash, blockNumber: block.number, blockTimestamp: block.timestamp });
|
|
||||||
|
|
||||||
await this._db.loadPositionSnapshot({
|
|
||||||
id: position.id.concat('#').concat(block.number.toString()),
|
|
||||||
blockNumber: block.number,
|
|
||||||
owner: position.owner,
|
|
||||||
pool: position.pool,
|
|
||||||
position: position,
|
|
||||||
timestamp: block.timestamp,
|
|
||||||
liquidity: position.liquidity,
|
|
||||||
depositedToken0: position.depositedToken0,
|
|
||||||
depositedToken1: position.depositedToken1,
|
|
||||||
withdrawnToken0: position.withdrawnToken0,
|
|
||||||
withdrawnToken1: position.withdrawnToken1,
|
|
||||||
collectedFeesToken0: position.collectedFeesToken0,
|
|
||||||
collectedFeesToken1: position.collectedFeesToken1,
|
|
||||||
transaction,
|
|
||||||
feeGrowthInside0LastX128: position.feeGrowthInside0LastX128,
|
|
||||||
feeGrowthInside1LastX128: position.feeGrowthInside1LastX128
|
|
||||||
});
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
File diff suppressed because it is too large
Load Diff
97
packages/uni-info-watcher/src/job-runner.ts
Normal file
97
packages/uni-info-watcher/src/job-runner.ts
Normal file
@ -0,0 +1,97 @@
|
|||||||
|
import assert from 'assert';
|
||||||
|
import 'reflect-metadata';
|
||||||
|
import yargs from 'yargs';
|
||||||
|
import { hideBin } from 'yargs/helpers';
|
||||||
|
import debug from 'debug';
|
||||||
|
|
||||||
|
import { Client as ERC20Client } from '@vulcanize/erc20-watcher';
|
||||||
|
import { Client as UniClient } from '@vulcanize/uni-watcher';
|
||||||
|
import { getConfig, JobQueue } from '@vulcanize/util';
|
||||||
|
|
||||||
|
import { Indexer } from './indexer';
|
||||||
|
import { Database } from './database';
|
||||||
|
import { QUEUE_BLOCK_PROCESSING, QUEUE_EVENT_PROCESSING } from './events';
|
||||||
|
|
||||||
|
const log = debug('vulcanize:job-runner');
|
||||||
|
|
||||||
|
export const main = async (): Promise<any> => {
|
||||||
|
const argv = await yargs(hideBin(process.argv))
|
||||||
|
.option('f', {
|
||||||
|
alias: 'config-file',
|
||||||
|
demandOption: true,
|
||||||
|
describe: 'configuration file path (toml)',
|
||||||
|
type: 'string'
|
||||||
|
})
|
||||||
|
.argv;
|
||||||
|
|
||||||
|
const config = await getConfig(argv.f);
|
||||||
|
|
||||||
|
assert(config.server, 'Missing server config');
|
||||||
|
|
||||||
|
const { upstream, database: dbConfig, jobQueue: jobQueueConfig } = config;
|
||||||
|
|
||||||
|
assert(dbConfig, 'Missing database config');
|
||||||
|
|
||||||
|
const db = new Database(dbConfig);
|
||||||
|
await db.init();
|
||||||
|
|
||||||
|
assert(upstream, 'Missing upstream config');
|
||||||
|
const { uniWatcher: { gqlEndpoint, gqlSubscriptionEndpoint }, tokenWatcher } = upstream;
|
||||||
|
assert(gqlEndpoint, 'Missing upstream uniWatcher.gqlEndpoint');
|
||||||
|
assert(gqlSubscriptionEndpoint, 'Missing upstream uniWatcher.gqlSubscriptionEndpoint');
|
||||||
|
|
||||||
|
const uniClient = new UniClient({
|
||||||
|
gqlEndpoint,
|
||||||
|
gqlSubscriptionEndpoint
|
||||||
|
});
|
||||||
|
|
||||||
|
const erc20Client = new ERC20Client(tokenWatcher);
|
||||||
|
|
||||||
|
const indexer = new Indexer(db, uniClient, erc20Client);
|
||||||
|
|
||||||
|
assert(jobQueueConfig, 'Missing job queue config');
|
||||||
|
|
||||||
|
const { dbConnectionString, maxCompletionLag } = jobQueueConfig;
|
||||||
|
assert(dbConnectionString, 'Missing job queue db connection string');
|
||||||
|
|
||||||
|
const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag });
|
||||||
|
await jobQueue.start();
|
||||||
|
|
||||||
|
await jobQueue.subscribe(QUEUE_BLOCK_PROCESSING, async (job) => {
|
||||||
|
const { data: { block } } = job;
|
||||||
|
log(`Processing block hash ${block.hash} number ${block.number}`);
|
||||||
|
const events = await indexer.getOrFetchBlockEvents(block);
|
||||||
|
|
||||||
|
for (let ei = 0; ei < events.length; ei++) {
|
||||||
|
const { id } = events[ei];
|
||||||
|
await jobQueue.pushJob(QUEUE_EVENT_PROCESSING, { id });
|
||||||
|
}
|
||||||
|
|
||||||
|
await jobQueue.markComplete(job);
|
||||||
|
});
|
||||||
|
|
||||||
|
await jobQueue.subscribe(QUEUE_EVENT_PROCESSING, async (job) => {
|
||||||
|
const { data: { id } } = job;
|
||||||
|
|
||||||
|
log(`Processing event ${id}`);
|
||||||
|
const dbEvent = await db.getEvent(id);
|
||||||
|
assert(dbEvent);
|
||||||
|
|
||||||
|
if (!dbEvent.block.isComplete) {
|
||||||
|
await indexer.processEvent(dbEvent);
|
||||||
|
await indexer.updateBlockProgress(dbEvent.block.blockHash);
|
||||||
|
}
|
||||||
|
|
||||||
|
await jobQueue.markComplete(job);
|
||||||
|
});
|
||||||
|
};
|
||||||
|
|
||||||
|
main().then(() => {
|
||||||
|
log('Starting job runner...');
|
||||||
|
}).catch(err => {
|
||||||
|
log(err);
|
||||||
|
});
|
||||||
|
|
||||||
|
process.on('uncaughtException', err => {
|
||||||
|
log('uncaughtException', err);
|
||||||
|
});
|
@ -1,18 +1,16 @@
|
|||||||
import assert from 'assert';
|
import assert from 'assert';
|
||||||
import 'reflect-metadata';
|
import 'reflect-metadata';
|
||||||
import express, { Application } from 'express';
|
import express, { Application } from 'express';
|
||||||
import { ApolloServer, PubSub } from 'apollo-server-express';
|
import { ApolloServer } from 'apollo-server-express';
|
||||||
import yargs from 'yargs';
|
import yargs from 'yargs';
|
||||||
import { hideBin } from 'yargs/helpers';
|
import { hideBin } from 'yargs/helpers';
|
||||||
import debug from 'debug';
|
import debug from 'debug';
|
||||||
import 'graphql-import-node';
|
import 'graphql-import-node';
|
||||||
import { createServer } from 'http';
|
import { createServer } from 'http';
|
||||||
|
|
||||||
import { getCache } from '@vulcanize/cache';
|
|
||||||
import { EthClient } from '@vulcanize/ipld-eth-client';
|
|
||||||
import { Client as ERC20Client } from '@vulcanize/erc20-watcher';
|
import { Client as ERC20Client } from '@vulcanize/erc20-watcher';
|
||||||
import { Client as UniClient } from '@vulcanize/uni-watcher';
|
import { Client as UniClient } from '@vulcanize/uni-watcher';
|
||||||
import { getConfig } from '@vulcanize/util';
|
import { getConfig, JobQueue } from '@vulcanize/util';
|
||||||
|
|
||||||
import typeDefs from './schema';
|
import typeDefs from './schema';
|
||||||
|
|
||||||
@ -40,7 +38,7 @@ export const main = async (): Promise<any> => {
|
|||||||
|
|
||||||
const { host, port } = config.server;
|
const { host, port } = config.server;
|
||||||
|
|
||||||
const { upstream, database: dbConfig } = config;
|
const { upstream, database: dbConfig, jobQueue: jobQueueConfig } = config;
|
||||||
|
|
||||||
assert(dbConfig, 'Missing database config');
|
assert(dbConfig, 'Missing database config');
|
||||||
|
|
||||||
@ -53,7 +51,6 @@ export const main = async (): Promise<any> => {
|
|||||||
gqlApiEndpoint,
|
gqlApiEndpoint,
|
||||||
gqlPostgraphileEndpoint
|
gqlPostgraphileEndpoint
|
||||||
},
|
},
|
||||||
cache: cacheConfig,
|
|
||||||
uniWatcher,
|
uniWatcher,
|
||||||
tokenWatcher
|
tokenWatcher
|
||||||
} = upstream;
|
} = upstream;
|
||||||
@ -61,22 +58,19 @@ export const main = async (): Promise<any> => {
|
|||||||
assert(gqlApiEndpoint, 'Missing upstream ethServer.gqlApiEndpoint');
|
assert(gqlApiEndpoint, 'Missing upstream ethServer.gqlApiEndpoint');
|
||||||
assert(gqlPostgraphileEndpoint, 'Missing upstream ethServer.gqlPostgraphileEndpoint');
|
assert(gqlPostgraphileEndpoint, 'Missing upstream ethServer.gqlPostgraphileEndpoint');
|
||||||
|
|
||||||
const cache = await getCache(cacheConfig);
|
|
||||||
const ethClient = new EthClient({
|
|
||||||
gqlEndpoint: gqlApiEndpoint,
|
|
||||||
gqlSubscriptionEndpoint: gqlPostgraphileEndpoint,
|
|
||||||
cache
|
|
||||||
});
|
|
||||||
|
|
||||||
const uniClient = new UniClient(uniWatcher);
|
const uniClient = new UniClient(uniWatcher);
|
||||||
|
|
||||||
// Note: In-memory pubsub works fine for now, as each watcher is a single process anyway.
|
|
||||||
// Later: https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries
|
|
||||||
const pubsub = new PubSub();
|
|
||||||
const erc20Client = new ERC20Client(tokenWatcher);
|
const erc20Client = new ERC20Client(tokenWatcher);
|
||||||
const indexer = new Indexer(db, ethClient, pubsub);
|
const indexer = new Indexer(db, uniClient, erc20Client);
|
||||||
|
|
||||||
const eventWatcher = new EventWatcher(db, uniClient, erc20Client);
|
assert(jobQueueConfig, 'Missing job queue config');
|
||||||
|
|
||||||
|
const { dbConnectionString, maxCompletionLag } = jobQueueConfig;
|
||||||
|
assert(dbConnectionString, 'Missing job queue db connection string');
|
||||||
|
|
||||||
|
const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag });
|
||||||
|
await jobQueue.start();
|
||||||
|
|
||||||
|
const eventWatcher = new EventWatcher(indexer, uniClient, jobQueue);
|
||||||
await eventWatcher.start();
|
await eventWatcher.start();
|
||||||
|
|
||||||
const resolvers = process.env.MOCK ? await createMockResolvers() : await createResolvers(indexer);
|
const resolvers = process.env.MOCK ? await createMockResolvers() : await createResolvers(indexer);
|
||||||
@ -105,3 +99,7 @@ main().then(() => {
|
|||||||
}).catch(err => {
|
}).catch(err => {
|
||||||
log(err);
|
log(err);
|
||||||
});
|
});
|
||||||
|
|
||||||
|
process.on('uncaughtException', err => {
|
||||||
|
log('uncaughtException', err);
|
||||||
|
});
|
||||||
|
@ -1,7 +1,7 @@
|
|||||||
import { gql } from '@apollo/client/core';
|
import { gql } from '@apollo/client/core';
|
||||||
import { GraphQLClient, GraphQLConfig } from '@vulcanize/ipld-eth-client';
|
import { GraphQLClient, GraphQLConfig } from '@vulcanize/ipld-eth-client';
|
||||||
|
|
||||||
import { queryGetPool, queryPoolIdToPoolKey, queryPosition } from './queries';
|
import { queryGetPool, queryPoolIdToPoolKey, queryPosition, queryEvents, subscribeEvents } from './queries';
|
||||||
|
|
||||||
export class Client {
|
export class Client {
|
||||||
_config: GraphQLConfig;
|
_config: GraphQLConfig;
|
||||||
@ -15,89 +15,25 @@ export class Client {
|
|||||||
|
|
||||||
async watchEvents (onNext: (value: any) => void): Promise<ZenObservable.Subscription> {
|
async watchEvents (onNext: (value: any) => void): Promise<ZenObservable.Subscription> {
|
||||||
return this._client.subscribe(
|
return this._client.subscribe(
|
||||||
gql`
|
gql(subscribeEvents),
|
||||||
subscription SubscriptionReceipt {
|
|
||||||
onEvent {
|
|
||||||
block {
|
|
||||||
number
|
|
||||||
hash
|
|
||||||
timestamp
|
|
||||||
}
|
|
||||||
contract
|
|
||||||
tx {
|
|
||||||
hash
|
|
||||||
}
|
|
||||||
proof {
|
|
||||||
data
|
|
||||||
}
|
|
||||||
event {
|
|
||||||
__typename
|
|
||||||
|
|
||||||
... on PoolCreatedEvent {
|
|
||||||
token0
|
|
||||||
token1
|
|
||||||
fee
|
|
||||||
tickSpacing
|
|
||||||
pool
|
|
||||||
}
|
|
||||||
|
|
||||||
... on InitializeEvent {
|
|
||||||
sqrtPriceX96
|
|
||||||
tick
|
|
||||||
}
|
|
||||||
|
|
||||||
... on MintEvent {
|
|
||||||
sender
|
|
||||||
owner
|
|
||||||
tickLower
|
|
||||||
tickUpper
|
|
||||||
amount
|
|
||||||
amount0
|
|
||||||
amount1
|
|
||||||
}
|
|
||||||
|
|
||||||
... on BurnEvent {
|
|
||||||
owner
|
|
||||||
tickLower
|
|
||||||
tickUpper
|
|
||||||
amount
|
|
||||||
amount0
|
|
||||||
amount1
|
|
||||||
}
|
|
||||||
|
|
||||||
... on SwapEvent {
|
|
||||||
sender
|
|
||||||
recipient
|
|
||||||
amount0
|
|
||||||
amount1
|
|
||||||
sqrtPriceX96
|
|
||||||
liquidity
|
|
||||||
tick
|
|
||||||
}
|
|
||||||
|
|
||||||
... on IncreaseLiquidityEvent {
|
|
||||||
tokenId
|
|
||||||
liquidity
|
|
||||||
amount0
|
|
||||||
amount1
|
|
||||||
}
|
|
||||||
|
|
||||||
... on DecreaseLiquidityEvent {
|
|
||||||
tokenId
|
|
||||||
liquidity
|
|
||||||
amount0
|
|
||||||
amount1
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
`,
|
|
||||||
({ data }) => {
|
({ data }) => {
|
||||||
onNext(data.onEvent);
|
onNext(data.onEvent);
|
||||||
}
|
}
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async getEvents (blockHash: string, contract?: string): Promise<any> {
|
||||||
|
const { events } = await this._client.query(
|
||||||
|
gql(queryEvents),
|
||||||
|
{
|
||||||
|
blockHash,
|
||||||
|
contract
|
||||||
|
}
|
||||||
|
);
|
||||||
|
|
||||||
|
return events;
|
||||||
|
}
|
||||||
|
|
||||||
async getPosition (blockHash: string, tokenId: bigint): Promise<any> {
|
async getPosition (blockHash: string, tokenId: bigint): Promise<any> {
|
||||||
const { position } = await this._client.query(
|
const { position } = await this._client.query(
|
||||||
gql(queryPosition),
|
gql(queryPosition),
|
||||||
|
@ -38,31 +38,6 @@ export class Database {
|
|||||||
.getMany();
|
.getMany();
|
||||||
}
|
}
|
||||||
|
|
||||||
async getEvents (blockHash: string, contract: string): Promise<Event[]> {
|
|
||||||
return this._conn.getRepository(Event)
|
|
||||||
.createQueryBuilder('event')
|
|
||||||
.innerJoinAndSelect('event.block', 'block')
|
|
||||||
.where('block_hash = :blockHash AND contract = :contract', {
|
|
||||||
blockHash,
|
|
||||||
contract
|
|
||||||
})
|
|
||||||
.addOrderBy('event.id', 'ASC')
|
|
||||||
.getMany();
|
|
||||||
}
|
|
||||||
|
|
||||||
async getEventsByName (blockHash: string, contract: string, eventName: string): Promise<Event[] | undefined> {
|
|
||||||
return this._conn.getRepository(Event)
|
|
||||||
.createQueryBuilder('event')
|
|
||||||
.innerJoinAndSelect('event.block', 'block')
|
|
||||||
.where('block_hash = :blockHash AND contract = :contract AND event_name = :eventName', {
|
|
||||||
blockHash,
|
|
||||||
contract,
|
|
||||||
eventName
|
|
||||||
})
|
|
||||||
.addOrderBy('event.id', 'ASC')
|
|
||||||
.getMany();
|
|
||||||
}
|
|
||||||
|
|
||||||
async getProcessedBlockCountForRange (fromBlockNumber: number, toBlockNumber: number): Promise<{ expected: number, actual: number }> {
|
async getProcessedBlockCountForRange (fromBlockNumber: number, toBlockNumber: number): Promise<{ expected: number, actual: number }> {
|
||||||
const blockNumbers = _.range(fromBlockNumber, toBlockNumber + 1);
|
const blockNumbers = _.range(fromBlockNumber, toBlockNumber + 1);
|
||||||
const expected = blockNumbers.length;
|
const expected = blockNumbers.length;
|
||||||
@ -135,7 +110,7 @@ export class Database {
|
|||||||
}
|
}
|
||||||
|
|
||||||
async getEvent (id: string): Promise<Event | undefined> {
|
async getEvent (id: string): Promise<Event | undefined> {
|
||||||
return this._conn.getRepository(Event).findOne(id, { relations: [ 'block' ]});
|
return this._conn.getRepository(Event).findOne(id, { relations: ['block'] });
|
||||||
}
|
}
|
||||||
|
|
||||||
async saveEventEntity (entity: Event): Promise<Event> {
|
async saveEventEntity (entity: Event): Promise<Event> {
|
||||||
|
@ -52,8 +52,11 @@ export class EventWatcher {
|
|||||||
this._jobQueue.onComplete(QUEUE_EVENT_PROCESSING, async (job) => {
|
this._jobQueue.onComplete(QUEUE_EVENT_PROCESSING, async (job) => {
|
||||||
const { data: { request, failed, state, createdOn } } = job;
|
const { data: { request, failed, state, createdOn } } = job;
|
||||||
|
|
||||||
await this._indexer.updateBlockProgress(request.data.blockHash);
|
const dbEvent = await this._indexer.getEvent(request.data.id);
|
||||||
const blockProgress = await this._indexer.getBlockProgress(request.data.blockHash);
|
assert(dbEvent);
|
||||||
|
|
||||||
|
await this._indexer.updateBlockProgress(dbEvent.block.blockHash);
|
||||||
|
const blockProgress = await this._indexer.getBlockProgress(dbEvent.block.blockHash);
|
||||||
if (blockProgress && request.data.publishBlockProgress) {
|
if (blockProgress && request.data.publishBlockProgress) {
|
||||||
await this.publishBlockProgressToSubscribers(blockProgress);
|
await this.publishBlockProgressToSubscribers(blockProgress);
|
||||||
}
|
}
|
||||||
|
@ -107,18 +107,20 @@ export class Indexer {
|
|||||||
}
|
}
|
||||||
|
|
||||||
async getEventsByFilter (blockHash: string, contract: string, name: string | null): Promise<Array<Event>> {
|
async getEventsByFilter (blockHash: string, contract: string, name: string | null): Promise<Array<Event>> {
|
||||||
const uniContract = await this.isUniswapContract(contract);
|
if (contract) {
|
||||||
if (!uniContract) {
|
const uniContract = await this.isUniswapContract(contract);
|
||||||
throw new Error('Not a uniswap contract');
|
if (!uniContract) {
|
||||||
|
throw new Error('Not a uniswap contract');
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
const events = await this._db.getEvents(blockHash, contract);
|
const events = await this._db.getBlockEvents(blockHash);
|
||||||
log(`getEvents: db hit, num events: ${events.length}`);
|
log(`getEvents: db hit, num events: ${events.length}`);
|
||||||
|
|
||||||
// Filtering.
|
// Filtering.
|
||||||
const result = events
|
const result = events
|
||||||
// TODO: Filter using db WHERE condition on contract.
|
// TODO: Filter using db WHERE condition on contract.
|
||||||
.filter(event => contract === event.contract)
|
.filter(event => !contract || contract === event.contract)
|
||||||
// TODO: Filter using db WHERE condition when name is not empty.
|
// TODO: Filter using db WHERE condition when name is not empty.
|
||||||
.filter(event => !name || name === event.eventName);
|
.filter(event => !name || name === event.eventName);
|
||||||
|
|
||||||
@ -340,7 +342,6 @@ export class Indexer {
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
await this._db.saveEvents(block, dbEvents);
|
await this._db.saveEvents(block, dbEvents);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1,15 +1,96 @@
|
|||||||
import { gql } from 'graphql-request';
|
import { gql } from 'graphql-request';
|
||||||
|
|
||||||
export const queryEvents = gql`
|
const resultEvent = `
|
||||||
query getEvents($blockHash: String!, $token: String!) {
|
{
|
||||||
events(blockHash: $blockHash, token: $token) {
|
block {
|
||||||
event {
|
number
|
||||||
__typename
|
hash
|
||||||
|
timestamp
|
||||||
|
parentHash
|
||||||
|
}
|
||||||
|
tx {
|
||||||
|
hash
|
||||||
|
}
|
||||||
|
contract
|
||||||
|
eventIndex
|
||||||
|
|
||||||
|
event {
|
||||||
|
__typename
|
||||||
|
|
||||||
|
... on PoolCreatedEvent {
|
||||||
|
token0
|
||||||
|
token1
|
||||||
|
fee
|
||||||
|
tickSpacing
|
||||||
|
pool
|
||||||
}
|
}
|
||||||
proof {
|
|
||||||
data
|
... on InitializeEvent {
|
||||||
|
sqrtPriceX96
|
||||||
|
tick
|
||||||
|
}
|
||||||
|
|
||||||
|
... on MintEvent {
|
||||||
|
sender
|
||||||
|
owner
|
||||||
|
tickLower
|
||||||
|
tickUpper
|
||||||
|
amount
|
||||||
|
amount0
|
||||||
|
amount1
|
||||||
|
}
|
||||||
|
|
||||||
|
... on BurnEvent {
|
||||||
|
owner
|
||||||
|
tickLower
|
||||||
|
tickUpper
|
||||||
|
amount
|
||||||
|
amount0
|
||||||
|
amount1
|
||||||
|
}
|
||||||
|
|
||||||
|
... on SwapEvent {
|
||||||
|
sender
|
||||||
|
recipient
|
||||||
|
amount0
|
||||||
|
amount1
|
||||||
|
sqrtPriceX96
|
||||||
|
liquidity
|
||||||
|
tick
|
||||||
|
}
|
||||||
|
|
||||||
|
... on IncreaseLiquidityEvent {
|
||||||
|
tokenId
|
||||||
|
liquidity
|
||||||
|
amount0
|
||||||
|
amount1
|
||||||
|
}
|
||||||
|
|
||||||
|
... on DecreaseLiquidityEvent {
|
||||||
|
tokenId
|
||||||
|
liquidity
|
||||||
|
amount0
|
||||||
|
amount1
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
proof {
|
||||||
|
data
|
||||||
|
}
|
||||||
|
}
|
||||||
|
`;
|
||||||
|
|
||||||
|
export const subscribeEvents = gql`
|
||||||
|
subscription SubscriptionEvents {
|
||||||
|
onEvent
|
||||||
|
${resultEvent}
|
||||||
|
}
|
||||||
|
`;
|
||||||
|
|
||||||
|
export const queryEvents = gql`
|
||||||
|
query getEvents($blockHash: String!, $contract: String) {
|
||||||
|
events(blockHash: $blockHash, contract: $contract)
|
||||||
|
${resultEvent}
|
||||||
}
|
}
|
||||||
`;
|
`;
|
||||||
|
|
||||||
|
@ -4,6 +4,7 @@ import debug from 'debug';
|
|||||||
|
|
||||||
import { Indexer } from './indexer';
|
import { Indexer } from './indexer';
|
||||||
import { EventWatcher } from './events';
|
import { EventWatcher } from './events';
|
||||||
|
import { UNKNOWN_EVENT_NAME } from './entity/Event';
|
||||||
|
|
||||||
const log = debug('vulcanize:resolver');
|
const log = debug('vulcanize:resolver');
|
||||||
|
|
||||||
@ -59,7 +60,8 @@ export const createResolvers = async (indexer: Indexer, eventWatcher: EventWatch
|
|||||||
}
|
}
|
||||||
|
|
||||||
const events = await indexer.getEventsByFilter(blockHash, contract, name);
|
const events = await indexer.getEventsByFilter(blockHash, contract, name);
|
||||||
return events.map(event => indexer.getResultEvent(event));
|
return events.filter(event => event.eventName !== UNKNOWN_EVENT_NAME)
|
||||||
|
.map(event => indexer.getResultEvent(event));
|
||||||
},
|
},
|
||||||
|
|
||||||
eventsInRange: async (_: any, { fromBlockNumber, toBlockNumber }: { fromBlockNumber: number, toBlockNumber: number }) => {
|
eventsInRange: async (_: any, { fromBlockNumber, toBlockNumber }: { fromBlockNumber: number, toBlockNumber: number }) => {
|
||||||
|
@ -234,7 +234,7 @@ type Query {
|
|||||||
# Get uniswap events at a certain block, optionally filter by event name.
|
# Get uniswap events at a certain block, optionally filter by event name.
|
||||||
events(
|
events(
|
||||||
blockHash: String!
|
blockHash: String!
|
||||||
contract: String!
|
contract: String
|
||||||
name: String
|
name: String
|
||||||
): [ResultEvent!]
|
): [ResultEvent!]
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user