diff --git a/packages/uni-info-watcher/README.md b/packages/uni-info-watcher/README.md index 6ffcb34f..9ef325ee 100644 --- a/packages/uni-info-watcher/README.md +++ b/packages/uni-info-watcher/README.md @@ -2,7 +2,52 @@ ## Instructions -* To start the server run `yarn server`. +### Setup + +Create a postgres12 database for the job queue: + +``` +sudo su - postgres +createdb uni-info-watcher-job-queue +``` + +Enable the `pgcrypto` extension on the job queue database (https://github.com/timgit/pg-boss/blob/master/docs/usage.md#intro). + +Example: + +``` +postgres@tesla:~$ psql -U postgres -h localhost uni-info-watcher-job-queue +Password for user postgres: +psql (12.7 (Ubuntu 12.7-1.pgdg18.04+1)) +SSL connection (protocol: TLSv1.3, cipher: TLS_AES_256_GCM_SHA384, bits: 256, compression: off) +Type "help" for help. + +uni-watcher-job-queue=# CREATE EXTENSION pgcrypto; +CREATE EXTENSION +uni-info-watcher-job-queue=# exit +``` + +Create a postgres12 database for the uni-info watcher: + +``` +sudo su - postgres +createdb uni-info-watcher +``` + +Update `environments/local.toml` with database connection settings for both the databases. + +### Run + +* Start the server: + ```bash + $ yarn server + ``` + +* Start the job runner: + + ```bash + $ yarn job-runner + ``` * Run `yarn server:mock` to run server with mock data. diff --git a/packages/uni-info-watcher/environments/local.toml b/packages/uni-info-watcher/environments/local.toml index c53401eb..07aee124 100644 --- a/packages/uni-info-watcher/environments/local.toml +++ b/packages/uni-info-watcher/environments/local.toml @@ -38,3 +38,7 @@ [upstream.tokenWatcher] gqlEndpoint = "http://127.0.0.1:3001/graphql" gqlSubscriptionEndpoint = "http://127.0.0.1:3001/graphql" + +[jobQueue] + dbConnectionString = "postgres://postgres:postgres@localhost/uni-info-watcher-job-queue" + maxCompletionLag = 300 diff --git a/packages/uni-info-watcher/package.json b/packages/uni-info-watcher/package.json index cca9033f..72a6f346 100644 --- a/packages/uni-info-watcher/package.json +++ b/packages/uni-info-watcher/package.json @@ -17,6 +17,7 @@ "scripts": { "server": "DEBUG=vulcanize:* nodemon src/server.ts -f environments/local.toml", "server:mock": "MOCK=1 nodemon src/server.ts -f environments/local.toml", + "job-runner": "DEBUG=vulcanize:* nodemon src/job-runner.ts -f environments/local.toml", "test": "mocha -r ts-node/register src/**/*.spec.ts", "lint": "eslint .", "build": "tsc", diff --git a/packages/uni-info-watcher/src/database.ts b/packages/uni-info-watcher/src/database.ts index fa6517c3..e78fd405 100644 --- a/packages/uni-info-watcher/src/database.ts +++ b/packages/uni-info-watcher/src/database.ts @@ -20,6 +20,8 @@ import { Burn } from './entity/Burn'; import { Swap } from './entity/Swap'; import { Position } from './entity/Position'; import { PositionSnapshot } from './entity/PositionSnapshot'; +import { BlockProgress } from './entity/BlockProgress'; +import { Block } from './events'; export class Database { _config: ConnectionOptions @@ -633,56 +635,100 @@ export class Database { return numRows > 0; } + async getBlockEvents (blockHash: string): Promise { + return this._conn.getRepository(Event) + .createQueryBuilder('event') + .innerJoinAndSelect('event.block', 'block') + .where('block_hash = :blockHash', { blockHash }) + .addOrderBy('event.id', 'ASC') + .getMany(); + } + async getEvents ({ blockHash, token }: { blockHash: string, token: string }): Promise { return this._conn.getRepository(Event) .createQueryBuilder('event') + .innerJoinAndSelect('event.block', 'block') .where('block_hash = :blockHash AND token = :token', { blockHash, token }) - .addOrderBy('id', 'ASC') + .addOrderBy('event.id', 'ASC') .getMany(); } async getEventsByName ({ blockHash, token, eventName }: { blockHash: string, token: string, eventName: string }): Promise { return this._conn.getRepository(Event) .createQueryBuilder('event') + .innerJoinAndSelect('event.block', 'block') .where('block_hash = :blockHash AND token = :token AND :eventName = :eventName', { blockHash, token, eventName }) + .addOrderBy('event.id', 'ASC') .getMany(); } - async saveEvents ({ blockHash, token, events }: { blockHash: string, token: string, events: DeepPartial[] }): Promise { + async saveEvents (block: Block, events: DeepPartial[]): Promise { + const { + hash: blockHash, + number: blockNumber, + timestamp: blockTimestamp, + parentHash + } = block; + + assert(blockHash); + assert(blockNumber); + assert(blockTimestamp); + assert(parentHash); + // In a transaction: // (1) Save all the events in the database. - // (2) Add an entry to the event progress table. - + // (2) Add an entry to the block progress table. await this._conn.transaction(async (tx) => { - const repo = tx.getRepository(EventSyncProgress); + const numEvents = events.length; + const blockProgressRepo = tx.getRepository(BlockProgress); + let blockProgress = await blockProgressRepo.findOne({ where: { blockHash } }); - // Check sync progress inside the transaction. - const numRows = await repo - .createQueryBuilder() - .where('block_hash = :blockHash AND token = :token', { + if (!blockProgress) { + const entity = blockProgressRepo.create({ blockHash, - token - }) - .getCount(); + parentHash, + blockNumber, + blockTimestamp, + numEvents, + numProcessedEvents: 0, + isComplete: (numEvents === 0) + }); + + blockProgress = await blockProgressRepo.save(entity); - if (numRows === 0) { // Bulk insert events. - await tx.createQueryBuilder() - .insert() - .into(Event) - .values(events) - .execute(); + events.forEach(event => { event.block = blockProgress; }); + await tx.createQueryBuilder().insert().into(Event).values(events).execute(); + } + }); + } - // Update event sync progress. - const progress = repo.create({ blockHash, token }); - await repo.save(progress); + async getEvent (id: string): Promise { + return this._conn.getRepository(Event).findOne(id, { relations: ['block'] }); + } + + async getBlockProgress (blockHash: string): Promise { + const repo = this._conn.getRepository(BlockProgress); + return repo.findOne({ where: { blockHash } }); + } + + async updateBlockProgress (blockHash: string): Promise { + await this._conn.transaction(async (tx) => { + const repo = tx.getRepository(BlockProgress); + const entity = await repo.findOne({ where: { blockHash } }); + if (entity && !entity.isComplete) { + entity.numProcessedEvents++; + if (entity.numProcessedEvents >= entity.numEvents) { + entity.isComplete = true; + } + await repo.save(entity); } }); } diff --git a/packages/uni-info-watcher/src/entity/BlockProgress.ts b/packages/uni-info-watcher/src/entity/BlockProgress.ts new file mode 100644 index 00000000..ef732154 --- /dev/null +++ b/packages/uni-info-watcher/src/entity/BlockProgress.ts @@ -0,0 +1,31 @@ +import { Entity, PrimaryGeneratedColumn, Column, Index } from 'typeorm'; + +@Entity() +@Index(['blockHash'], { unique: true }) +@Index(['blockNumber']) +@Index(['parentHash']) +export class BlockProgress { + @PrimaryGeneratedColumn() + id!: number; + + @Column('varchar', { length: 66 }) + blockHash!: string; + + @Column('varchar', { length: 66 }) + parentHash!: string; + + @Column('integer') + blockNumber!: number; + + @Column('integer') + blockTimestamp!: number; + + @Column('integer') + numEvents!: number; + + @Column('integer') + numProcessedEvents!: number; + + @Column('boolean') + isComplete!: boolean +} diff --git a/packages/uni-info-watcher/src/entity/Burn.ts b/packages/uni-info-watcher/src/entity/Burn.ts index f89a28fd..a2a49460 100644 --- a/packages/uni-info-watcher/src/entity/Burn.ts +++ b/packages/uni-info-watcher/src/entity/Burn.ts @@ -14,7 +14,7 @@ export class Burn { @PrimaryColumn('integer') blockNumber!: number; - @ManyToOne(() => Transaction, transaction => transaction.mints) + @ManyToOne(() => Transaction, transaction => transaction.burns) transaction!: Transaction @Column('bigint') diff --git a/packages/uni-info-watcher/src/entity/Event.ts b/packages/uni-info-watcher/src/entity/Event.ts index 6edfcb4e..b8f6e86f 100644 --- a/packages/uni-info-watcher/src/entity/Event.ts +++ b/packages/uni-info-watcher/src/entity/Event.ts @@ -1,21 +1,32 @@ -import { Entity, PrimaryGeneratedColumn, Column, Index } from 'typeorm'; +import { Entity, PrimaryGeneratedColumn, Column, Index, ManyToOne } from 'typeorm'; +import { BlockProgress } from './BlockProgress'; @Entity() // Index to query all events for a contract efficiently. -@Index(['blockHash', 'token']) +@Index(['contract']) export class Event { @PrimaryGeneratedColumn() id!: number; + @ManyToOne(() => BlockProgress) + block!: BlockProgress; + @Column('varchar', { length: 66 }) - blockHash!: string; + txHash!: string; + + // Index of the log in the block. + @Column('integer') + index!: number; @Column('varchar', { length: 42 }) - token!: string; + contract!: string; @Column('varchar', { length: 256 }) eventName!: string; + @Column('text') + eventInfo!: string; + @Column('text') proof!: string; } diff --git a/packages/uni-info-watcher/src/entity/Swap.ts b/packages/uni-info-watcher/src/entity/Swap.ts index b50df8c5..bff8e6af 100644 --- a/packages/uni-info-watcher/src/entity/Swap.ts +++ b/packages/uni-info-watcher/src/entity/Swap.ts @@ -14,7 +14,7 @@ export class Swap { @PrimaryColumn('integer') blockNumber!: number; - @ManyToOne(() => Transaction, transaction => transaction.mints) + @ManyToOne(() => Transaction, transaction => transaction.swaps) transaction!: Transaction @Column('bigint') diff --git a/packages/uni-info-watcher/src/entity/Transaction.ts b/packages/uni-info-watcher/src/entity/Transaction.ts index ae4aa8a5..4bb9c1ae 100644 --- a/packages/uni-info-watcher/src/entity/Transaction.ts +++ b/packages/uni-info-watcher/src/entity/Transaction.ts @@ -3,6 +3,8 @@ import { Entity, PrimaryColumn, Column, OneToMany } from 'typeorm'; import { decimalTransformer } from '@vulcanize/util'; import { Mint } from './Mint'; +import { Burn } from './Burn'; +import { Swap } from './Swap'; @Entity() export class Transaction { @@ -21,6 +23,9 @@ export class Transaction { @OneToMany(() => Mint, mint => mint.transaction) mints!: Mint[]; - // burns: [Burn]! - // swaps: [Swap]! + @OneToMany(() => Burn, burn => burn.transaction) + burns!: Burn[]; + + @OneToMany(() => Swap, swap => swap.transaction) + swaps!: Swap[]; } diff --git a/packages/uni-info-watcher/src/events.ts b/packages/uni-info-watcher/src/events.ts index ad94359d..e40c9052 100644 --- a/packages/uni-info-watcher/src/events.ts +++ b/packages/uni-info-watcher/src/events.ts @@ -1,21 +1,12 @@ import assert from 'assert'; import debug from 'debug'; import { Client as UniClient } from '@vulcanize/uni-watcher'; -import { Client as ERC20Client } from '@vulcanize/erc20-watcher'; -import { BigNumber, utils } from 'ethers'; - -import { Database } from './database'; -import { findEthPerToken, getEthPriceInUSD, getTrackedAmountUSD, sqrtPriceX96ToTokenPrices, WHITELIST_TOKENS } from './utils/pricing'; -import { updatePoolDayData, updatePoolHourData, updateTokenDayData, updateTokenHourData, updateUniswapDayData } from './utils/interval-updates'; -import { Token } from './entity/Token'; -import { convertTokenToDecimal, loadTransaction, safeDiv } from './utils'; -import { loadTick } from './utils/tick'; -import Decimal from 'decimal.js'; -import { Position } from './entity/Position'; +import { JobQueue } from '../../util'; +import { Indexer } from './indexer'; const log = debug('vulcanize:events'); -interface PoolCreatedEvent { +export interface PoolCreatedEvent { __typename: 'PoolCreatedEvent'; token0: string; token1: string; @@ -24,13 +15,13 @@ interface PoolCreatedEvent { pool: string; } -interface InitializeEvent { +export interface InitializeEvent { __typename: 'InitializeEvent'; sqrtPriceX96: bigint; tick: bigint; } -interface MintEvent { +export interface MintEvent { __typename: 'MintEvent'; sender: string; owner: string; @@ -41,7 +32,7 @@ interface MintEvent { amount1: bigint; } -interface BurnEvent { +export interface BurnEvent { __typename: 'BurnEvent'; owner: string; tickLower: bigint; @@ -51,7 +42,7 @@ interface BurnEvent { amount1: bigint; } -interface SwapEvent { +export interface SwapEvent { __typename: 'SwapEvent'; sender: string; recipient: string; @@ -62,7 +53,7 @@ interface SwapEvent { tick: bigint; } -interface IncreaseLiquidityEvent { +export interface IncreaseLiquidityEvent { __typename: 'IncreaseLiquidityEvent'; tokenId: bigint; liquidity: bigint; @@ -70,7 +61,7 @@ interface IncreaseLiquidityEvent { amount1: bigint; } -interface DecreaseLiquidityEvent { +export interface DecreaseLiquidityEvent { __typename: 'DecreaseLiquidityEvent'; tokenId: bigint; liquidity: bigint; @@ -78,45 +69,63 @@ interface DecreaseLiquidityEvent { amount1: bigint; } -interface Block { +export interface Block { number: number; hash: string; timestamp: number; + parentHash: string; } -interface Transaction { +export interface Transaction { hash: string; - from: string; + from?: string; } -interface ResultEvent { +export interface ResultEvent { block: Block; tx: Transaction; contract: string; + eventIndex: number; event: PoolCreatedEvent | InitializeEvent | MintEvent | BurnEvent | SwapEvent | IncreaseLiquidityEvent | DecreaseLiquidityEvent; proof: { data: string; } } +export const QUEUE_EVENT_PROCESSING = 'event-processing'; +export const QUEUE_BLOCK_PROCESSING = 'block-processing'; + export class EventWatcher { - _db: Database _subscription?: ZenObservable.Subscription _uniClient: UniClient - _erc20Client: ERC20Client + _jobQueue: JobQueue + _indexer: Indexer - constructor (db: Database, uniClient: UniClient, erc20Client: ERC20Client) { - assert(db); - - this._db = db; + constructor (indexer: Indexer, uniClient: UniClient, jobQueue: JobQueue) { this._uniClient = uniClient; - this._erc20Client = erc20Client; + this._jobQueue = jobQueue; + this._indexer = indexer; } async start (): Promise { assert(!this._subscription, 'subscription already started'); log('Started watching upstream events...'); - this._subscription = await this._uniClient.watchEvents(this._handleEvents.bind(this)); + + this._jobQueue.onComplete(QUEUE_BLOCK_PROCESSING, async (job) => { + const { data: { request: { data: { block } } } } = job; + log(`Job onComplete block ${block.hash} ${block.number}`); + }); + + this._jobQueue.onComplete(QUEUE_EVENT_PROCESSING, async (job) => { + const { data: { request } } = job; + + log(`Job onComplete event ${request.data.id}`); + }); + + this._subscription = await this._uniClient.watchEvents(async ({ block }: ResultEvent) => { + log('watchEvent', block.hash, block.number); + return this._jobQueue.pushJob(QUEUE_BLOCK_PROCESSING, { block }); + }); } async stop (): Promise { @@ -125,756 +134,4 @@ export class EventWatcher { this._subscription.unsubscribe(); } } - - async _handleEvents ({ block, tx, contract, event }: ResultEvent): Promise { - // TODO: Process proof (proof.data) in event. - const { __typename: eventType } = event; - - switch (eventType) { - case 'PoolCreatedEvent': - log('Factory PoolCreated event', contract); - this._handlePoolCreated(block, contract, tx, event as PoolCreatedEvent); - break; - - case 'InitializeEvent': - log('Pool Initialize event', contract); - this._handleInitialize(block, contract, tx, event as InitializeEvent); - break; - - case 'MintEvent': - log('Pool Mint event', contract); - this._handleMint(block, contract, tx, event as MintEvent); - break; - - case 'BurnEvent': - log('Pool Burn event', contract); - this._handleBurn(block, contract, tx, event as BurnEvent); - break; - - case 'SwapEvent': - log('Pool Swap event', contract); - this._handleSwap(block, contract, tx, event as SwapEvent); - break; - - case 'IncreaseLiquidityEvent': - log('NFPM IncreaseLiquidity event', contract); - this._handleIncreaseLiquidity(block, contract, tx, event as IncreaseLiquidityEvent); - break; - - case 'DecreaseLiquidityEvent': - log('NFPM DecreaseLiquidity event', contract); - this._handleDecreaseLiquidity(block, contract, tx, event as DecreaseLiquidityEvent); - break; - - default: - break; - } - } - - async _handlePoolCreated (block: Block, contractAddress: string, tx: Transaction, poolCreatedEvent: PoolCreatedEvent): Promise { - const { number: blockNumber, hash: blockHash } = block; - const { token0: token0Address, token1: token1Address, fee, pool: poolAddress } = poolCreatedEvent; - - // Load factory. - const factory = await this._db.loadFactory({ blockNumber, id: contractAddress }); - - // Update Factory. - let factoryPoolCount = BigNumber.from(factory.poolCount); - factoryPoolCount = factoryPoolCount.add(1); - factory.poolCount = BigInt(factoryPoolCount.toHexString()); - - // Get Tokens. - let [token0, token1] = await Promise.all([ - this._db.getToken({ blockNumber, id: token0Address }), - this._db.getToken({ blockNumber, id: token1Address }) - ]); - - // Create Tokens if not present. - if (!token0) { - token0 = await this._createToken(blockHash, blockNumber, token0Address); - } - - if (!token1) { - token1 = await this._createToken(blockHash, blockNumber, token1Address); - } - - // Create new Pool entity. - // Skipping adding createdAtTimestamp field as it is not queried in frontend subgraph. - const pool = await this._db.loadPool({ - blockNumber, - id: poolAddress, - token0: token0, - token1: token1, - feeTier: BigInt(fee) - }); - - // Update white listed pools. - if (WHITELIST_TOKENS.includes(token0.id)) { - token1.whitelistPools.push(pool); - await this._db.saveToken(token1, blockNumber); - } - - if (WHITELIST_TOKENS.includes(token1.id)) { - token0.whitelistPools.push(pool); - await this._db.saveToken(token0, blockNumber); - } - - // Save entities to DB. - await this._db.saveFactory(factory, blockNumber); - } - - /** - * Create new Token. - * @param tokenAddress - */ - async _createToken (blockHash: string, blockNumber: number, tokenAddress: string): Promise { - const { value: symbol } = await this._erc20Client.getSymbol(blockHash, tokenAddress); - const { value: name } = await this._erc20Client.getName(blockHash, tokenAddress); - const { value: totalSupply } = await this._erc20Client.getTotalSupply(blockHash, tokenAddress); - - // TODO: Decimals not implemented by erc20-watcher. - // const { value: decimals } = await this._erc20Client.getDecimals(blockHash, tokenAddress); - - return this._db.loadToken({ - blockNumber, - id: tokenAddress, - symbol, - name, - totalSupply - }); - } - - async _handleInitialize (block: Block, contractAddress: string, tx: Transaction, initializeEvent: InitializeEvent): Promise { - const { number: blockNumber, timestamp: blockTimestamp } = block; - const { sqrtPriceX96, tick } = initializeEvent; - const pool = await this._db.getPool({ id: contractAddress, blockNumber }); - assert(pool, `Pool ${contractAddress} not found.`); - - // Update Pool. - pool.sqrtPrice = BigInt(sqrtPriceX96); - pool.tick = BigInt(tick); - this._db.savePool(pool, blockNumber); - - // Update ETH price now that prices could have changed. - const bundle = await this._db.loadBundle({ id: '1', blockNumber }); - bundle.ethPriceUSD = await getEthPriceInUSD(this._db); - this._db.saveBundle(bundle, blockNumber); - - await updatePoolDayData(this._db, { contractAddress, blockNumber, blockTimestamp }); - await updatePoolHourData(this._db, { contractAddress, blockNumber, blockTimestamp }); - - const [token0, token1] = await Promise.all([ - this._db.getToken({ id: pool.token0.id, blockNumber }), - this._db.getToken({ id: pool.token1.id, blockNumber }) - ]); - - assert(token0 && token1, 'Pool tokens not found.'); - - // Update token prices. - token0.derivedETH = await findEthPerToken(token0); - token1.derivedETH = await findEthPerToken(token1); - - await Promise.all([ - this._db.saveToken(token0, blockNumber), - this._db.saveToken(token1, blockNumber) - ]); - } - - async _handleMint (block: Block, contractAddress: string, tx: Transaction, mintEvent: MintEvent): Promise { - const { number: blockNumber, timestamp: blockTimestamp } = block; - const { hash: txHash } = tx; - const bundle = await this._db.loadBundle({ id: '1', blockNumber }); - const poolAddress = contractAddress; - const pool = await this._db.loadPool({ id: poolAddress, blockNumber }); - - // TODO: In subgraph factory is fetched by hardcoded factory address. - // Currently fetching first factory in database as only one exists. - const [factory] = await this._db.getFactories({ blockNumber }, { limit: 1 }); - - const token0 = pool.token0; - const token1 = pool.token1; - const amount0 = convertTokenToDecimal(mintEvent.amount0, BigInt(token0.decimals)); - const amount1 = convertTokenToDecimal(mintEvent.amount1, BigInt(token1.decimals)); - - const amountUSD = amount0 - .times(token0.derivedETH.times(bundle.ethPriceUSD)) - .plus(amount1.times(token1.derivedETH.times(bundle.ethPriceUSD))); - - // Reset tvl aggregates until new amounts calculated. - factory.totalValueLockedETH = factory.totalValueLockedETH.minus(pool.totalValueLockedETH); - - // Update globals. - factory.txCount = BigInt(factory.txCount) + BigInt(1); - - // Update token0 data. - token0.txCount = BigInt(token0.txCount) + BigInt(1); - token0.totalValueLocked = token0.totalValueLocked.plus(amount0); - token0.totalValueLockedUSD = token0.totalValueLocked.times(token0.derivedETH.times(bundle.ethPriceUSD)); - - // Update token1 data. - token1.txCount = BigInt(token1.txCount) + BigInt(1); - token1.totalValueLocked = token1.totalValueLocked.plus(amount1); - token1.totalValueLockedUSD = token1.totalValueLocked.times(token1.derivedETH.times(bundle.ethPriceUSD)); - - // Pool data. - pool.txCount = BigInt(pool.txCount) + BigInt(1); - - // Pools liquidity tracks the currently active liquidity given pools current tick. - // We only want to update it on mint if the new position includes the current tick. - if (pool.tick !== null) { - if ( - BigInt(mintEvent.tickLower) <= BigInt(pool.tick) && - BigInt(mintEvent.tickUpper) > BigInt(pool.tick) - ) { - pool.liquidity = BigInt(pool.liquidity) + BigInt(mintEvent.amount); - } - } - - pool.totalValueLockedToken0 = pool.totalValueLockedToken0.plus(amount0); - pool.totalValueLockedToken1 = pool.totalValueLockedToken1.plus(amount1); - - pool.totalValueLockedETH = pool.totalValueLockedToken0.times(token0.derivedETH) - .plus(pool.totalValueLockedToken1.times(token1.derivedETH)); - - pool.totalValueLockedUSD = pool.totalValueLockedETH.times(bundle.ethPriceUSD); - - // Reset aggregates with new amounts. - factory.totalValueLockedETH = factory.totalValueLockedETH.plus(pool.totalValueLockedETH); - factory.totalValueLockedUSD = factory.totalValueLockedETH.times(bundle.ethPriceUSD); - - const transaction = await loadTransaction(this._db, { txHash, blockNumber, blockTimestamp }); - - await this._db.loadMint({ - id: transaction.id + '#' + pool.txCount.toString(), - blockNumber, - transaction, - timestamp: transaction.timestamp, - pool, - token0: pool.token0, - token1: pool.token1, - owner: mintEvent.owner, - sender: mintEvent.sender, - - // TODO: Assign origin with Transaction from address. - // origin: event.transaction.from - - amount: mintEvent.amount, - amount0: amount0, - amount1: amount1, - amountUSD: amountUSD, - tickLower: mintEvent.tickLower, - tickUpper: mintEvent.tickUpper - }); - - // Tick entities. - const lowerTickIdx = mintEvent.tickLower; - const upperTickIdx = mintEvent.tickUpper; - - const lowerTickId = poolAddress + '#' + mintEvent.tickLower.toString(); - const upperTickId = poolAddress + '#' + mintEvent.tickUpper.toString(); - - const lowerTick = await loadTick(this._db, lowerTickId, BigInt(lowerTickIdx), pool, blockNumber); - const upperTick = await loadTick(this._db, upperTickId, BigInt(upperTickIdx), pool, blockNumber); - - const amount = BigInt(mintEvent.amount); - lowerTick.liquidityGross = BigInt(lowerTick.liquidityGross) + amount; - lowerTick.liquidityNet = BigInt(lowerTick.liquidityNet) + amount; - upperTick.liquidityGross = BigInt(upperTick.liquidityGross) + amount; - upperTick.liquidityNet = BigInt(upperTick.liquidityNet) + amount; - - // TODO: Update Tick's volume, fees, and liquidity provider count. - // Computing these on the tick level requires reimplementing some of the swapping code from v3-core. - - await updateUniswapDayData(this._db, { blockNumber, contractAddress, blockTimestamp }); - await updatePoolDayData(this._db, { blockNumber, contractAddress, blockTimestamp }); - await updatePoolHourData(this._db, { blockNumber, contractAddress, blockTimestamp }); - await updateTokenDayData(this._db, token0, { blockNumber, blockTimestamp }); - await updateTokenDayData(this._db, token1, { blockNumber, blockTimestamp }); - await updateTokenHourData(this._db, token0, { blockNumber, blockTimestamp }); - await updateTokenHourData(this._db, token1, { blockNumber, blockTimestamp }); - - await Promise.all([ - this._db.saveToken(token0, blockNumber), - this._db.saveToken(token1, blockNumber) - ]); - - await this._db.savePool(pool, blockNumber); - await this._db.saveFactory(factory, blockNumber); - - await Promise.all([ - await this._db.saveTick(lowerTick, blockNumber), - await this._db.saveTick(upperTick, blockNumber) - ]); - - // Skipping update inner tick vars and tick day data as they are not queried. - } - - async _handleBurn (block: Block, contractAddress: string, tx: Transaction, burnEvent: BurnEvent): Promise { - const { number: blockNumber, timestamp: blockTimestamp } = block; - const { hash: txHash } = tx; - const bundle = await this._db.loadBundle({ id: '1', blockNumber }); - const poolAddress = contractAddress; - const pool = await this._db.loadPool({ id: poolAddress, blockNumber }); - - // TODO: In subgraph factory is fetched by hardcoded factory address. - // Currently fetching first factory in database as only one exists. - const [factory] = await this._db.getFactories({ blockNumber }, { limit: 1 }); - - const token0 = pool.token0; - const token1 = pool.token1; - const amount0 = convertTokenToDecimal(burnEvent.amount0, BigInt(token0.decimals)); - const amount1 = convertTokenToDecimal(burnEvent.amount1, BigInt(token1.decimals)); - - const amountUSD = amount0 - .times(token0.derivedETH.times(bundle.ethPriceUSD)) - .plus(amount1.times(token1.derivedETH.times(bundle.ethPriceUSD))); - - // Reset tvl aggregates until new amounts calculated. - factory.totalValueLockedETH = factory.totalValueLockedETH.minus(pool.totalValueLockedETH); - - // Update globals. - factory.txCount = BigInt(factory.txCount) + BigInt(1); - - // Update token0 data. - token0.txCount = BigInt(token0.txCount) + BigInt(1); - token0.totalValueLocked = token0.totalValueLocked.minus(amount0); - token0.totalValueLockedUSD = token0.totalValueLocked.times(token0.derivedETH.times(bundle.ethPriceUSD)); - - // Update token1 data. - token1.txCount = BigInt(token1.txCount) + BigInt(1); - token1.totalValueLocked = token1.totalValueLocked.minus(amount1); - token1.totalValueLockedUSD = token1.totalValueLocked.times(token1.derivedETH.times(bundle.ethPriceUSD)); - - // Pool data. - pool.txCount = BigInt(pool.txCount) + BigInt(1); - - // Pools liquidity tracks the currently active liquidity given pools current tick. - // We only want to update it on burn if the position being burnt includes the current tick. - if ( - pool.tick !== null && - burnEvent.tickLower <= pool.tick && - burnEvent.tickUpper > pool.tick - ) { - pool.liquidity = pool.liquidity - burnEvent.amount; - } - - pool.totalValueLockedToken0 = pool.totalValueLockedToken0.minus(amount0); - pool.totalValueLockedToken1 = pool.totalValueLockedToken1.minus(amount1); - - pool.totalValueLockedETH = pool.totalValueLockedToken0 - .times(token0.derivedETH) - .plus(pool.totalValueLockedToken1.times(token1.derivedETH)); - - pool.totalValueLockedUSD = pool.totalValueLockedETH.times(bundle.ethPriceUSD); - - // Reset aggregates with new amounts. - factory.totalValueLockedETH = factory.totalValueLockedETH.plus(pool.totalValueLockedETH); - factory.totalValueLockedUSD = factory.totalValueLockedETH.times(bundle.ethPriceUSD); - - // Burn entity. - const transaction = await loadTransaction(this._db, { txHash, blockNumber, blockTimestamp }); - - await this._db.loadBurn({ - id: transaction.id + '#' + pool.txCount.toString(), - blockNumber, - transaction, - timestamp: transaction.timestamp, - pool, - token0: pool.token0, - token1: pool.token1, - owner: burnEvent.owner, - - // TODO: Assign origin with Transaction from address. - // origin: event.transaction.from - - amount: burnEvent.amount, - amount0, - amount1, - amountUSD, - tickLower: burnEvent.tickLower, - tickUpper: burnEvent.tickUpper - }); - - // Tick entities. - const lowerTickId = poolAddress + '#' + (burnEvent.tickLower).toString(); - const upperTickId = poolAddress + '#' + (burnEvent.tickUpper).toString(); - const lowerTick = await this._db.loadTick({ id: lowerTickId, blockNumber }); - const upperTick = await this._db.loadTick({ id: upperTickId, blockNumber }); - const amount = BigInt(burnEvent.amount); - lowerTick.liquidityGross = BigInt(lowerTick.liquidityGross) - amount; - lowerTick.liquidityNet = BigInt(lowerTick.liquidityNet) - amount; - upperTick.liquidityGross = BigInt(upperTick.liquidityGross) - amount; - upperTick.liquidityNet = BigInt(upperTick.liquidityNet) + amount; - - await updateUniswapDayData(this._db, { blockNumber, contractAddress, blockTimestamp }); - await updatePoolDayData(this._db, { blockNumber, contractAddress, blockTimestamp }); - await updatePoolHourData(this._db, { blockNumber, contractAddress, blockTimestamp }); - await updateTokenDayData(this._db, token0, { blockNumber, blockTimestamp }); - await updateTokenDayData(this._db, token0, { blockNumber, blockTimestamp }); - await updateTokenHourData(this._db, token0, { blockNumber, blockTimestamp }); - await updateTokenHourData(this._db, token0, { blockNumber, blockTimestamp }); - - // Skipping update Tick fee and Tick day data as they are not queried. - - await Promise.all([ - await this._db.saveTick(lowerTick, blockNumber), - await this._db.saveTick(upperTick, blockNumber) - ]); - - await Promise.all([ - this._db.saveToken(token0, blockNumber), - this._db.saveToken(token1, blockNumber) - ]); - - await this._db.savePool(pool, blockNumber); - await this._db.saveFactory(factory, blockNumber); - } - - async _handleSwap (block: Block, contractAddress: string, tx: Transaction, swapEvent: SwapEvent): Promise { - const { number: blockNumber, timestamp: blockTimestamp } = block; - const { hash: txHash } = tx; - const bundle = await this._db.loadBundle({ id: '1', blockNumber }); - - // TODO: In subgraph factory is fetched by hardcoded factory address. - // Currently fetching first factory in database as only one exists. - const [factory] = await this._db.getFactories({ blockNumber }, { limit: 1 }); - - const pool = await this._db.loadPool({ id: contractAddress, blockNumber }); - - // Hot fix for bad pricing. - if (pool.id === '0x9663f2ca0454accad3e094448ea6f77443880454') { - return; - } - - const [token0, token1] = await Promise.all([ - this._db.getToken({ id: pool.token0.id, blockNumber }), - this._db.getToken({ id: pool.token1.id, blockNumber }) - ]); - - assert(token0 && token1, 'Pool tokens not found.'); - - // Amounts - 0/1 are token deltas. Can be positive or negative. - const amount0 = convertTokenToDecimal(swapEvent.amount0, BigInt(token0.decimals)); - const amount1 = convertTokenToDecimal(swapEvent.amount1, BigInt(token1.decimals)); - - // Need absolute amounts for volume. - let amount0Abs = amount0; - let amount1Abs = amount1; - - if (amount0.lt(new Decimal(0))) { - amount0Abs = amount0.times(new Decimal('-1')); - } - - if (amount1.lt(new Decimal(0))) { - amount1Abs = amount1.times(new Decimal('-1')); - } - - const amount0ETH = amount0Abs.times(token0.derivedETH); - const amount1ETH = amount1Abs.times(token1.derivedETH); - const amount0USD = amount0ETH.times(bundle.ethPriceUSD); - const amount1USD = amount1ETH.times(bundle.ethPriceUSD); - - // Get amount that should be tracked only - div 2 because cant count both input and output as volume. - const trackedAmountUSD = await getTrackedAmountUSD(this._db, amount0Abs, token0, amount1Abs, token1); - const amountTotalUSDTracked = trackedAmountUSD.div(new Decimal('2')); - const amountTotalETHTracked = safeDiv(amountTotalUSDTracked, bundle.ethPriceUSD); - const amountTotalUSDUntracked = amount0USD.plus(amount1USD).div(new Decimal('2')); - - const feesETH = amountTotalETHTracked.times(pool.feeTier.toString()).div(new Decimal('1000000')); - const feesUSD = amountTotalUSDTracked.times(pool.feeTier.toString()).div(new Decimal('1000000')); - - // Global updates. - factory.txCount = BigInt(factory.txCount) + BigInt(1); - factory.totalVolumeETH = factory.totalVolumeETH.plus(amountTotalETHTracked); - factory.totalVolumeUSD = factory.totalVolumeUSD.plus(amountTotalUSDTracked); - factory.untrackedVolumeUSD = factory.untrackedVolumeUSD.plus(amountTotalUSDUntracked); - factory.totalFeesETH = factory.totalFeesETH.plus(feesETH); - factory.totalFeesUSD = factory.totalFeesUSD.plus(feesUSD); - - // Reset aggregate tvl before individual pool tvl updates. - const currentPoolTvlETH = pool.totalValueLockedETH; - factory.totalValueLockedETH = factory.totalValueLockedETH.minus(currentPoolTvlETH); - - // pool volume - pool.volumeToken0 = pool.volumeToken0.plus(amount0Abs); - pool.volumeToken1 = pool.volumeToken1.plus(amount1Abs); - pool.volumeUSD = pool.volumeUSD.plus(amountTotalUSDTracked); - pool.untrackedVolumeUSD = pool.untrackedVolumeUSD.plus(amountTotalUSDUntracked); - pool.feesUSD = pool.feesUSD.plus(feesUSD); - pool.txCount = BigInt(pool.txCount) + BigInt(1); - - // Update the pool with the new active liquidity, price, and tick. - pool.liquidity = swapEvent.liquidity; - pool.tick = BigInt(swapEvent.tick); - pool.sqrtPrice = swapEvent.sqrtPriceX96; - pool.totalValueLockedToken0 = pool.totalValueLockedToken0.plus(amount0); - pool.totalValueLockedToken1 = pool.totalValueLockedToken1.plus(amount1); - - // Update token0 data. - token0.volume = token0.volume.plus(amount0Abs); - token0.totalValueLocked = token0.totalValueLocked.plus(amount0); - token0.volumeUSD = token0.volumeUSD.plus(amountTotalUSDTracked); - token0.untrackedVolumeUSD = token0.untrackedVolumeUSD.plus(amountTotalUSDUntracked); - token0.feesUSD = token0.feesUSD.plus(feesUSD); - token0.txCount = BigInt(token0.txCount) + BigInt(1); - - // Update token1 data. - token1.volume = token1.volume.plus(amount1Abs); - token1.totalValueLocked = token1.totalValueLocked.plus(amount1); - token1.volumeUSD = token1.volumeUSD.plus(amountTotalUSDTracked); - token1.untrackedVolumeUSD = token1.untrackedVolumeUSD.plus(amountTotalUSDUntracked); - token1.feesUSD = token1.feesUSD.plus(feesUSD); - token1.txCount = BigInt(token1.txCount) + BigInt(1); - - // Updated pool rates. - const prices = sqrtPriceX96ToTokenPrices(pool.sqrtPrice, token0 as Token, token1 as Token); - pool.token0Price = prices[0]; - pool.token1Price = prices[1]; - this._db.savePool(pool, blockNumber); - - // Update USD pricing. - bundle.ethPriceUSD = await getEthPriceInUSD(this._db); - this._db.saveBundle(bundle, blockNumber); - token0.derivedETH = await findEthPerToken(token0); - token1.derivedETH = await findEthPerToken(token1); - - /** - * Things afffected by new USD rates. - */ - pool.totalValueLockedETH = pool.totalValueLockedToken0 - .times(token0.derivedETH) - .plus(pool.totalValueLockedToken1.times(token1.derivedETH)); - - pool.totalValueLockedUSD = pool.totalValueLockedETH.times(bundle.ethPriceUSD); - - factory.totalValueLockedETH = factory.totalValueLockedETH.plus(pool.totalValueLockedETH); - factory.totalValueLockedUSD = factory.totalValueLockedETH.times(bundle.ethPriceUSD); - - token0.totalValueLockedUSD = token0.totalValueLocked.times(token0.derivedETH).times(bundle.ethPriceUSD); - token1.totalValueLockedUSD = token1.totalValueLocked.times(token1.derivedETH).times(bundle.ethPriceUSD); - - // Create Swap event - const transaction = await loadTransaction(this._db, { txHash, blockNumber, blockTimestamp }); - - await this._db.loadSwap({ - id: transaction.id + '#' + pool.txCount.toString(), - blockNumber, - transaction, - timestamp: transaction.timestamp, - pool, - token0: pool.token0, - token1: pool.token1, - sender: swapEvent.sender, - - // TODO: Assign origin with Transaction from address. - // origin: event.transaction.from - - recipient: swapEvent.recipient, - amount0: amount0, - amount1: amount1, - amountUSD: amountTotalUSDTracked, - tick: BigInt(swapEvent.tick), - sqrtPriceX96: swapEvent.sqrtPriceX96 - }); - - // Skipping update pool fee growth as they are not queried. - - // Interval data. - const uniswapDayData = await updateUniswapDayData(this._db, { blockNumber, contractAddress, blockTimestamp }); - const poolDayData = await updatePoolDayData(this._db, { blockNumber, contractAddress, blockTimestamp }); - const poolHourData = await updatePoolHourData(this._db, { blockNumber, contractAddress, blockTimestamp }); - const token0DayData = await updateTokenDayData(this._db, token0, { blockNumber, blockTimestamp }); - const token1DayData = await updateTokenDayData(this._db, token0, { blockNumber, blockTimestamp }); - const token0HourData = await updateTokenHourData(this._db, token0, { blockNumber, blockTimestamp }); - const token1HourData = await updateTokenHourData(this._db, token0, { blockNumber, blockTimestamp }); - - // Update volume metrics. - uniswapDayData.volumeETH = uniswapDayData.volumeETH.plus(amountTotalETHTracked); - uniswapDayData.volumeUSD = uniswapDayData.volumeUSD.plus(amountTotalUSDTracked); - uniswapDayData.feesUSD = uniswapDayData.feesUSD.plus(feesUSD); - - poolDayData.volumeUSD = poolDayData.volumeUSD.plus(amountTotalUSDTracked); - poolDayData.volumeToken0 = poolDayData.volumeToken0.plus(amount0Abs); - poolDayData.volumeToken1 = poolDayData.volumeToken1.plus(amount1Abs); - poolDayData.feesUSD = poolDayData.feesUSD.plus(feesUSD); - - poolHourData.volumeUSD = poolHourData.volumeUSD.plus(amountTotalUSDTracked); - poolHourData.volumeToken0 = poolHourData.volumeToken0.plus(amount0Abs); - poolHourData.volumeToken1 = poolHourData.volumeToken1.plus(amount1Abs); - poolHourData.feesUSD = poolHourData.feesUSD.plus(feesUSD); - - token0DayData.volume = token0DayData.volume.plus(amount0Abs); - token0DayData.volumeUSD = token0DayData.volumeUSD.plus(amountTotalUSDTracked); - token0DayData.untrackedVolumeUSD = token0DayData.untrackedVolumeUSD.plus(amountTotalUSDTracked); - token0DayData.feesUSD = token0DayData.feesUSD.plus(feesUSD); - - token0HourData.volume = token0HourData.volume.plus(amount0Abs); - token0HourData.volumeUSD = token0HourData.volumeUSD.plus(amountTotalUSDTracked); - token0HourData.untrackedVolumeUSD = token0HourData.untrackedVolumeUSD.plus(amountTotalUSDTracked); - token0HourData.feesUSD = token0HourData.feesUSD.plus(feesUSD); - - token1DayData.volume = token1DayData.volume.plus(amount1Abs); - token1DayData.volumeUSD = token1DayData.volumeUSD.plus(amountTotalUSDTracked); - token1DayData.untrackedVolumeUSD = token1DayData.untrackedVolumeUSD.plus(amountTotalUSDTracked); - token1DayData.feesUSD = token1DayData.feesUSD.plus(feesUSD); - - token1HourData.volume = token1HourData.volume.plus(amount1Abs); - token1HourData.volumeUSD = token1HourData.volumeUSD.plus(amountTotalUSDTracked); - token1HourData.untrackedVolumeUSD = token1HourData.untrackedVolumeUSD.plus(amountTotalUSDTracked); - token1HourData.feesUSD = token1HourData.feesUSD.plus(feesUSD); - - this._db.saveTokenDayData(token0DayData, blockNumber); - this._db.saveTokenDayData(token1DayData, blockNumber); - this._db.saveUniswapDayData(uniswapDayData, blockNumber); - this._db.savePoolDayData(poolDayData, blockNumber); - this._db.saveFactory(factory, blockNumber); - this._db.savePool(pool, blockNumber); - this._db.saveToken(token0, blockNumber); - this._db.saveToken(token1, blockNumber); - - // Skipping update of inner vars of current or crossed ticks as they are not queried. - } - - async _handleIncreaseLiquidity (block: Block, contractAddress: string, tx: Transaction, event: IncreaseLiquidityEvent): Promise { - const { number: blockNumber } = block; - const position = await this._getPosition(block, contractAddress, tx, BigInt(event.tokenId)); - - // position was not able to be fetched. - if (position === null) { - return; - } - - // Temp fix from Subgraph mapping code. - if (utils.getAddress(position.pool.id) === utils.getAddress('0x8fe8d9bb8eeba3ed688069c3d6b556c9ca258248')) { - return; - } - - const token0 = position.token0; - const token1 = position.token1; - - const amount0 = convertTokenToDecimal(BigInt(event.amount0), BigInt(token0.decimals)); - const amount1 = convertTokenToDecimal(BigInt(event.amount1), BigInt(token1.decimals)); - - position.liquidity = BigInt(position.liquidity) + BigInt(event.liquidity); - position.depositedToken0 = position.depositedToken0.plus(amount0); - position.depositedToken1 = position.depositedToken1.plus(amount1); - - await this._updateFeeVars(position, block, contractAddress, BigInt(event.tokenId)); - - await this._db.savePosition(position, blockNumber); - - await this._savePositionSnapshot(position, block, tx); - } - - async _handleDecreaseLiquidity (block: Block, contractAddress: string, tx: Transaction, event: DecreaseLiquidityEvent): Promise { - const { number: blockNumber } = block; - const position = await this._getPosition(block, contractAddress, tx, BigInt(event.tokenId)); - - // Position was not able to be fetched. - if (position == null) { - return; - } - - // Temp fix from Subgraph mapping code. - if (utils.getAddress(position.pool.id) === utils.getAddress('0x8fe8d9bb8eeba3ed688069c3d6b556c9ca258248')) { - return; - } - - const token0 = position.token0; - const token1 = position.token1; - const amount0 = convertTokenToDecimal(BigInt(event.amount0), BigInt(token0.decimals)); - const amount1 = convertTokenToDecimal(BigInt(event.amount1), BigInt(token1.decimals)); - - position.liquidity = BigInt(position.liquidity) - BigInt(event.liquidity); - position.depositedToken0 = position.depositedToken0.plus(amount0); - position.depositedToken1 = position.depositedToken1.plus(amount1); - - await this._updateFeeVars(position, block, contractAddress, BigInt(event.tokenId)); - - await this._db.savePosition(position, blockNumber); - - await this._savePositionSnapshot(position, block, tx); - } - - async _getPosition (block: Block, contractAddress: string, tx: Transaction, tokenId: bigint): Promise { - const { number: blockNumber, hash: blockHash, timestamp: blockTimestamp } = block; - const { hash: txHash } = tx; - let position = await this._db.getPosition({ id: tokenId.toString(), blockNumber }); - - if (!position) { - const nfpmPosition = await this._uniClient.getPosition(blockHash, tokenId); - - // The contract call reverts in situations where the position is minted and deleted in the same block. - // From my investigation this happens in calls from BancorSwap. - // (e.g. 0xf7867fa19aa65298fadb8d4f72d0daed5e836f3ba01f0b9b9631cdc6c36bed40) - - if (nfpmPosition) { - const { token0: token0Address, token1: token1Address, fee } = await this._uniClient.poolIdToPoolKey(blockHash, nfpmPosition.poolId); - - const { pool: poolAddress } = await this._uniClient.getPool(blockHash, token0Address, token1Address, fee); - - const transaction = await loadTransaction(this._db, { txHash, blockNumber, blockTimestamp }); - const pool = await this._db.getPool({ id: poolAddress, blockNumber }); - - const [token0, token1] = await Promise.all([ - this._db.getToken({ id: token0Address, blockNumber }), - this._db.getToken({ id: token0Address, blockNumber }) - ]); - - const [tickLower, tickUpper] = await Promise.all([ - this._db.getTick({ id: poolAddress.concat('#').concat(nfpmPosition.tickLower.toString()), blockNumber }), - this._db.getTick({ id: poolAddress.concat('#').concat(nfpmPosition.tickUpper.toString()), blockNumber }) - ]); - - position = await this._db.loadPosition({ - id: tokenId.toString(), - blockNumber, - pool, - token0, - token1, - tickLower, - tickUpper, - transaction, - feeGrowthInside0LastX128: BigInt(nfpmPosition.feeGrowthInside0LastX128.toString()), - feeGrowthInside1LastX128: BigInt(nfpmPosition.feeGrowthInside1LastX128.toString()) - }); - } - } - - return position || null; - } - - async _updateFeeVars (position: Position, block: Block, contractAddress: string, tokenId: bigint): Promise { - const nfpmPosition = await this._uniClient.getPosition(block.hash, tokenId); - - if (nfpmPosition) { - position.feeGrowthInside0LastX128 = BigInt(nfpmPosition.feeGrowthInside0LastX128.toString()); - position.feeGrowthInside1LastX128 = BigInt(nfpmPosition.feeGrowthInside1LastX128.toString()); - } - - return position; - } - - async _savePositionSnapshot (position: Position, block: Block, tx: Transaction): Promise { - const transaction = await loadTransaction(this._db, { txHash: tx.hash, blockNumber: block.number, blockTimestamp: block.timestamp }); - - await this._db.loadPositionSnapshot({ - id: position.id.concat('#').concat(block.number.toString()), - blockNumber: block.number, - owner: position.owner, - pool: position.pool, - position: position, - timestamp: block.timestamp, - liquidity: position.liquidity, - depositedToken0: position.depositedToken0, - depositedToken1: position.depositedToken1, - withdrawnToken0: position.withdrawnToken0, - withdrawnToken1: position.withdrawnToken1, - collectedFeesToken0: position.collectedFeesToken0, - collectedFeesToken1: position.collectedFeesToken1, - transaction, - feeGrowthInside0LastX128: position.feeGrowthInside0LastX128, - feeGrowthInside1LastX128: position.feeGrowthInside1LastX128 - }); - } } diff --git a/packages/uni-info-watcher/src/indexer.ts b/packages/uni-info-watcher/src/indexer.ts index 63bb7faa..9f80bc9a 100644 --- a/packages/uni-info-watcher/src/indexer.ts +++ b/packages/uni-info-watcher/src/indexer.ts @@ -1,18 +1,21 @@ import assert from 'assert'; import debug from 'debug'; -import { invert } from 'lodash'; import { DeepPartial } from 'typeorm'; import JSONbig from 'json-bigint'; -import { PubSub } from 'apollo-server-express'; - -import { EthClient } from '@vulcanize/ipld-eth-client'; -import { - GetStorageAt - // StorageLayout -} from '@vulcanize/solidity-mapper'; +import { BigNumber, utils } from 'ethers'; +import { Client as UniClient } from '@vulcanize/uni-watcher'; +import { Client as ERC20Client } from '@vulcanize/erc20-watcher'; +import { findEthPerToken, getEthPriceInUSD, getTrackedAmountUSD, sqrtPriceX96ToTokenPrices, WHITELIST_TOKENS } from './utils/pricing'; +import { updatePoolDayData, updatePoolHourData, updateTokenDayData, updateTokenHourData, updateUniswapDayData } from './utils/interval-updates'; +import { Token } from './entity/Token'; +import { convertTokenToDecimal, loadTransaction, safeDiv } from './utils'; +import { loadTick } from './utils/tick'; +import Decimal from 'decimal.js'; +import { Position } from './entity/Position'; import { Database } from './database'; import { Event } from './entity/Event'; +import { ResultEvent, Block, Transaction, PoolCreatedEvent, InitializeEvent, MintEvent, BurnEvent, SwapEvent, IncreaseLiquidityEvent, DecreaseLiquidityEvent } from './events'; const log = debug('vulcanize:indexer'); @@ -23,211 +26,852 @@ export interface ValueResult { } } -export interface BlockHeight { - number: number; - hash: string; -} - -type EventsResult = Array<{ - event: { - from?: string; - to?: string; - owner?: string; - spender?: string; - value?: BigInt; - __typename: string; - } - proof: string; -}> - export class Indexer { _db: Database - _ethClient: EthClient - _pubsub: PubSub - _getStorageAt: GetStorageAt + _uniClient: UniClient + _erc20Client: ERC20Client - // _abi: JsonFragment[] - // _storageLayout: StorageLayout - // _contract: ethers.utils.Interface - - constructor (db: Database, ethClient: EthClient, pubsub: PubSub) { + constructor (db: Database, uniClient: UniClient, erc20Client: ERC20Client) { assert(db); - assert(ethClient); - assert(pubsub); - - // const { abi, storageLayout } = artifacts; - - // assert(abi); - // assert(storageLayout); + assert(uniClient); this._db = db; - this._ethClient = ethClient; - this._pubsub = pubsub; - this._getStorageAt = this._ethClient.getStorageAt.bind(this._ethClient); - - // this._abi = abi; - // this._storageLayout = storageLayout; - - // this._contract = new ethers.utils.Interface(this._abi); + this._uniClient = uniClient; + this._erc20Client = erc20Client; } - getEventIterator (): AsyncIterator { - return this._pubsub.asyncIterator(['event']); + getResultEvent (event: Event): ResultEvent { + const block = event.block; + const eventFields = JSON.parse(event.eventInfo); + + return { + block: { + hash: block.blockHash, + number: block.blockNumber, + timestamp: block.blockTimestamp, + parentHash: block.parentHash + }, + + tx: { + hash: event.txHash + }, + + contract: event.contract, + + eventIndex: event.index, + event: { + __typename: event.eventName, + ...eventFields + }, + + proof: JSON.parse(event.proof) + }; } - async getEvents (blockHash: string, token: string, name: string | null): Promise { - const didSyncEvents = await this._db.didSyncEvents({ blockHash, token }); - if (!didSyncEvents) { + // Note: Some event names might be unknown at this point, as earlier events might not yet be processed. + async getOrFetchBlockEvents (block: Block): Promise> { + const blockProgress = await this._db.getBlockProgress(block.hash); + + if (!blockProgress) { // Fetch and save events first and make a note in the event sync progress table. - await this._fetchAndSaveEvents({ blockHash, token }); - log('getEvents: db miss, fetching from upstream server'); + await this._fetchAndSaveEvents(block); + log('getBlockEvents: db miss, fetching from upstream server'); } - assert(await this._db.didSyncEvents({ blockHash, token })); + const events = await this._db.getBlockEvents(block.hash); + log(`getBlockEvents: db hit, num events: ${events.length}`); - const events = await this._db.getEvents({ blockHash, token }); - log('getEvents: db hit'); - - const result = events - // TODO: Filter using db WHERE condition when name is not empty. - .filter(event => !name || name === event.eventName) - .map(e => { - const eventFields: { - from?: string, - to?: string, - value?: BigInt, - owner?: string, - spender?: string, - } = {}; - - // switch (e.eventName) { - // // TODO: Handle events. - // } - - return { - event: { - __typename: `${e.eventName}Event`, - ...eventFields - }, - // TODO: Return proof only if requested. - proof: JSON.parse(e.proof) - }; - }); - - // log(JSONbig.stringify(result, null, 2)); - - return result; + return events; } - async triggerIndexingOnEvent (blockHash: string, token: string, receipt: any, logIndex: number): Promise { - const topics = []; + async processEvent (dbEvent: Event): Promise { + const resultEvent = this.getResultEvent(dbEvent); - // We only care about the event type for now. - // const data = '0x0000000000000000000000000000000000000000000000000000000000000000'; + // TODO: Process proof (proof.data) in event. + const { contract, block, tx, event } = resultEvent; + const { __typename: eventType } = event; - topics.push(receipt.topic0S[logIndex]); - topics.push(receipt.topic1S[logIndex]); - topics.push(receipt.topic2S[logIndex]); + switch (eventType) { + case 'PoolCreatedEvent': + log('Factory PoolCreated event', contract); + this._handlePoolCreated(block, contract, tx, event as PoolCreatedEvent); + break; - // const { name: eventName, args } = this._contract.parseLog({ topics, data }); - // log(`trigger indexing on event: ${eventName} ${args}`); + case 'InitializeEvent': + log('Pool Initialize event', contract); + this._handleInitialize(block, contract, tx, event as InitializeEvent); + break; - // What data we index depends on the kind of event. - // switch (eventName) { - // TODO: Index event. - // } + case 'MintEvent': + log('Pool Mint event', contract); + this._handleMint(block, contract, tx, event as MintEvent); + break; + + case 'BurnEvent': + log('Pool Burn event', contract); + this._handleBurn(block, contract, tx, event as BurnEvent); + break; + + case 'SwapEvent': + log('Pool Swap event', contract); + this._handleSwap(block, contract, tx, event as SwapEvent); + break; + + case 'IncreaseLiquidityEvent': + log('NFPM IncreaseLiquidity event', contract); + this._handleIncreaseLiquidity(block, contract, tx, event as IncreaseLiquidityEvent); + break; + + case 'DecreaseLiquidityEvent': + log('NFPM DecreaseLiquidity event', contract); + this._handleDecreaseLiquidity(block, contract, tx, event as DecreaseLiquidityEvent); + break; + + default: + break; + } } - async publishEventToSubscribers (blockHash: string, token: string, logIndex: number): Promise { - // TODO: Optimize this fetching of events. - const events = await this.getEvents(blockHash, token, null); - const event = events[logIndex]; - - log(`pushing event to GQL subscribers: ${event.event.__typename}`); - - // Publishing the event here will result in pushing the payload to GQL subscribers for `onTokenEvent`. - await this._pubsub.publish('event', { - onTokenEvent: { - blockHash, - token, - event - } - }); + async getEvent (id: string): Promise { + return this._db.getEvent(id); } - async isUniswapContract (address: string): Promise { - // TODO: Return true for uniswap contracts of interest to the indexer (from config?). - return address != null; + async updateBlockProgress (blockHash: string): Promise { + return this._db.updateBlockProgress(blockHash); } - async processEvent (blockHash: string, token: string, receipt: any, logIndex: number): Promise { - // Trigger indexing of data based on the event. - await this.triggerIndexingOnEvent(blockHash, token, receipt, logIndex); + async _fetchAndSaveEvents (block: Block): Promise { + const events = await this._uniClient.getEvents(block.hash); + const dbEvents: Array> = []; - // Also trigger downstream event watcher subscriptions. - await this.publishEventToSubscribers(blockHash, token, logIndex); - } + for (let i = 0; i < events.length; i++) { + const { + tx, + contract, + eventIndex, + event, + proof + } = events[i]; - // TODO: Move into base/class or framework package. - async _getStorageValue ( - // blockHash: string, - // token: string, - // variable: string, - // ...mappingKeys: string[] - ): Promise { - return { - value: '', - proof: { - data: '' - } - }; + const { __typename: eventName, ...eventInfo } = event; - // return getStorageValue( - // this._storageLayout, - // this._getStorageAt, - // blockHash, - // token, - // variable, - // ...mappingKeys - // ); - } - - async _fetchAndSaveEvents ({ blockHash, token }: { blockHash: string, token: string }): Promise { - const { logs } = await this._ethClient.getLogs({ blockHash, contract: token }); - - const eventNameToTopic = {}; // getEventNameTopics(this._abi); - const logTopicToEventName = invert(eventNameToTopic); - - const dbEvents = logs.map((log: any) => { - const { topics, cid, ipldBlock } = log; - - const [topic0] = topics; - - const eventName = logTopicToEventName[topic0]; - - const event: DeepPartial = { - blockHash, - token, + dbEvents.push({ + index: eventIndex, + txHash: tx.hash, + contract, eventName, + eventInfo: JSONbig.stringify(eventInfo), + proof: JSONbig.stringify(proof) + }); + } - proof: JSONbig.stringify({ - data: JSONbig.stringify({ - blockHash, - receipt: { - cid, - ipldBlock - } - }) - }) - }; + await this._db.saveEvents(block, dbEvents); + } - // switch (eventName) { - // // TODO: Handle event. - // } + async _handlePoolCreated (block: Block, contractAddress: string, tx: Transaction, poolCreatedEvent: PoolCreatedEvent): Promise { + const { number: blockNumber, hash: blockHash } = block; + const { token0: token0Address, token1: token1Address, fee, pool: poolAddress } = poolCreatedEvent; - return event; + // Load factory. + const factory = await this._db.loadFactory({ blockNumber, id: contractAddress }); + + // Update Factory. + let factoryPoolCount = BigNumber.from(factory.poolCount); + factoryPoolCount = factoryPoolCount.add(1); + factory.poolCount = BigInt(factoryPoolCount.toHexString()); + + // Get Tokens. + let [token0, token1] = await Promise.all([ + this._db.getToken({ blockNumber, id: token0Address }), + this._db.getToken({ blockNumber, id: token1Address }) + ]); + + // Create Tokens if not present. + if (!token0) { + token0 = await this._createToken(blockHash, blockNumber, token0Address); + } + + if (!token1) { + token1 = await this._createToken(blockHash, blockNumber, token1Address); + } + + // Create new Pool entity. + // Skipping adding createdAtTimestamp field as it is not queried in frontend subgraph. + const pool = await this._db.loadPool({ + blockNumber, + id: poolAddress, + token0: token0, + token1: token1, + feeTier: BigInt(fee) }); - await this._db.saveEvents({ blockHash, token, events: dbEvents }); + // Update white listed pools. + if (WHITELIST_TOKENS.includes(token0.id)) { + token1.whitelistPools.push(pool); + await this._db.saveToken(token1, blockNumber); + } + + if (WHITELIST_TOKENS.includes(token1.id)) { + token0.whitelistPools.push(pool); + await this._db.saveToken(token0, blockNumber); + } + + // Save entities to DB. + await this._db.saveFactory(factory, blockNumber); + } + + /** + * Create new Token. + * @param tokenAddress + */ + async _createToken (blockHash: string, blockNumber: number, tokenAddress: string): Promise { + const { value: symbol } = await this._erc20Client.getSymbol(blockHash, tokenAddress); + const { value: name } = await this._erc20Client.getName(blockHash, tokenAddress); + const { value: totalSupply } = await this._erc20Client.getTotalSupply(blockHash, tokenAddress); + + // TODO: Decimals not implemented by erc20-watcher. + // const { value: decimals } = await this._erc20Client.getDecimals(blockHash, tokenAddress); + + return this._db.loadToken({ + blockNumber, + id: tokenAddress, + symbol, + name, + totalSupply + }); + } + + async _handleInitialize (block: Block, contractAddress: string, tx: Transaction, initializeEvent: InitializeEvent): Promise { + const { number: blockNumber, timestamp: blockTimestamp } = block; + const { sqrtPriceX96, tick } = initializeEvent; + const pool = await this._db.getPool({ id: contractAddress, blockNumber }); + assert(pool, `Pool ${contractAddress} not found.`); + + // Update Pool. + pool.sqrtPrice = BigInt(sqrtPriceX96); + pool.tick = BigInt(tick); + this._db.savePool(pool, blockNumber); + + // Update ETH price now that prices could have changed. + const bundle = await this._db.loadBundle({ id: '1', blockNumber }); + bundle.ethPriceUSD = await getEthPriceInUSD(this._db); + this._db.saveBundle(bundle, blockNumber); + + await updatePoolDayData(this._db, { contractAddress, blockNumber, blockTimestamp }); + await updatePoolHourData(this._db, { contractAddress, blockNumber, blockTimestamp }); + + const [token0, token1] = await Promise.all([ + this._db.getToken({ id: pool.token0.id, blockNumber }), + this._db.getToken({ id: pool.token1.id, blockNumber }) + ]); + + assert(token0 && token1, 'Pool tokens not found.'); + + // Update token prices. + token0.derivedETH = await findEthPerToken(token0); + token1.derivedETH = await findEthPerToken(token1); + + await Promise.all([ + this._db.saveToken(token0, blockNumber), + this._db.saveToken(token1, blockNumber) + ]); + } + + async _handleMint (block: Block, contractAddress: string, tx: Transaction, mintEvent: MintEvent): Promise { + const { number: blockNumber, timestamp: blockTimestamp } = block; + const { hash: txHash } = tx; + const bundle = await this._db.loadBundle({ id: '1', blockNumber }); + const poolAddress = contractAddress; + const pool = await this._db.loadPool({ id: poolAddress, blockNumber }); + + // TODO: In subgraph factory is fetched by hardcoded factory address. + // Currently fetching first factory in database as only one exists. + const [factory] = await this._db.getFactories({ blockNumber }, { limit: 1 }); + + const token0 = pool.token0; + const token1 = pool.token1; + const amount0 = convertTokenToDecimal(mintEvent.amount0, BigInt(token0.decimals)); + const amount1 = convertTokenToDecimal(mintEvent.amount1, BigInt(token1.decimals)); + + const amountUSD = amount0 + .times(token0.derivedETH.times(bundle.ethPriceUSD)) + .plus(amount1.times(token1.derivedETH.times(bundle.ethPriceUSD))); + + // Reset tvl aggregates until new amounts calculated. + factory.totalValueLockedETH = factory.totalValueLockedETH.minus(pool.totalValueLockedETH); + + // Update globals. + factory.txCount = BigInt(factory.txCount) + BigInt(1); + + // Update token0 data. + token0.txCount = BigInt(token0.txCount) + BigInt(1); + token0.totalValueLocked = token0.totalValueLocked.plus(amount0); + token0.totalValueLockedUSD = token0.totalValueLocked.times(token0.derivedETH.times(bundle.ethPriceUSD)); + + // Update token1 data. + token1.txCount = BigInt(token1.txCount) + BigInt(1); + token1.totalValueLocked = token1.totalValueLocked.plus(amount1); + token1.totalValueLockedUSD = token1.totalValueLocked.times(token1.derivedETH.times(bundle.ethPriceUSD)); + + // Pool data. + pool.txCount = BigInt(pool.txCount) + BigInt(1); + + // Pools liquidity tracks the currently active liquidity given pools current tick. + // We only want to update it on mint if the new position includes the current tick. + if (pool.tick !== null) { + if ( + BigInt(mintEvent.tickLower) <= BigInt(pool.tick) && + BigInt(mintEvent.tickUpper) > BigInt(pool.tick) + ) { + pool.liquidity = BigInt(pool.liquidity) + BigInt(mintEvent.amount); + } + } + + pool.totalValueLockedToken0 = pool.totalValueLockedToken0.plus(amount0); + pool.totalValueLockedToken1 = pool.totalValueLockedToken1.plus(amount1); + + pool.totalValueLockedETH = pool.totalValueLockedToken0.times(token0.derivedETH) + .plus(pool.totalValueLockedToken1.times(token1.derivedETH)); + + pool.totalValueLockedUSD = pool.totalValueLockedETH.times(bundle.ethPriceUSD); + + // Reset aggregates with new amounts. + factory.totalValueLockedETH = factory.totalValueLockedETH.plus(pool.totalValueLockedETH); + factory.totalValueLockedUSD = factory.totalValueLockedETH.times(bundle.ethPriceUSD); + + const transaction = await loadTransaction(this._db, { txHash, blockNumber, blockTimestamp }); + + await this._db.loadMint({ + id: transaction.id + '#' + pool.txCount.toString(), + blockNumber, + transaction, + timestamp: transaction.timestamp, + pool, + token0: pool.token0, + token1: pool.token1, + owner: mintEvent.owner, + sender: mintEvent.sender, + + // TODO: Assign origin with Transaction from address. + // origin: event.transaction.from + + amount: mintEvent.amount, + amount0: amount0, + amount1: amount1, + amountUSD: amountUSD, + tickLower: mintEvent.tickLower, + tickUpper: mintEvent.tickUpper + }); + + // Tick entities. + const lowerTickIdx = mintEvent.tickLower; + const upperTickIdx = mintEvent.tickUpper; + + const lowerTickId = poolAddress + '#' + mintEvent.tickLower.toString(); + const upperTickId = poolAddress + '#' + mintEvent.tickUpper.toString(); + + const lowerTick = await loadTick(this._db, lowerTickId, BigInt(lowerTickIdx), pool, blockNumber); + const upperTick = await loadTick(this._db, upperTickId, BigInt(upperTickIdx), pool, blockNumber); + + const amount = BigInt(mintEvent.amount); + lowerTick.liquidityGross = BigInt(lowerTick.liquidityGross) + amount; + lowerTick.liquidityNet = BigInt(lowerTick.liquidityNet) + amount; + upperTick.liquidityGross = BigInt(upperTick.liquidityGross) + amount; + upperTick.liquidityNet = BigInt(upperTick.liquidityNet) + amount; + + // TODO: Update Tick's volume, fees, and liquidity provider count. + // Computing these on the tick level requires reimplementing some of the swapping code from v3-core. + + await updateUniswapDayData(this._db, { blockNumber, contractAddress, blockTimestamp }); + await updatePoolDayData(this._db, { blockNumber, contractAddress, blockTimestamp }); + await updatePoolHourData(this._db, { blockNumber, contractAddress, blockTimestamp }); + await updateTokenDayData(this._db, token0, { blockNumber, blockTimestamp }); + await updateTokenDayData(this._db, token1, { blockNumber, blockTimestamp }); + await updateTokenHourData(this._db, token0, { blockNumber, blockTimestamp }); + await updateTokenHourData(this._db, token1, { blockNumber, blockTimestamp }); + + await Promise.all([ + this._db.saveToken(token0, blockNumber), + this._db.saveToken(token1, blockNumber) + ]); + + await this._db.savePool(pool, blockNumber); + await this._db.saveFactory(factory, blockNumber); + + await Promise.all([ + await this._db.saveTick(lowerTick, blockNumber), + await this._db.saveTick(upperTick, blockNumber) + ]); + + // Skipping update inner tick vars and tick day data as they are not queried. + } + + async _handleBurn (block: Block, contractAddress: string, tx: Transaction, burnEvent: BurnEvent): Promise { + const { number: blockNumber, timestamp: blockTimestamp } = block; + const { hash: txHash } = tx; + const bundle = await this._db.loadBundle({ id: '1', blockNumber }); + const poolAddress = contractAddress; + const pool = await this._db.loadPool({ id: poolAddress, blockNumber }); + + // TODO: In subgraph factory is fetched by hardcoded factory address. + // Currently fetching first factory in database as only one exists. + const [factory] = await this._db.getFactories({ blockNumber }, { limit: 1 }); + + const token0 = pool.token0; + const token1 = pool.token1; + const amount0 = convertTokenToDecimal(burnEvent.amount0, BigInt(token0.decimals)); + const amount1 = convertTokenToDecimal(burnEvent.amount1, BigInt(token1.decimals)); + + const amountUSD = amount0 + .times(token0.derivedETH.times(bundle.ethPriceUSD)) + .plus(amount1.times(token1.derivedETH.times(bundle.ethPriceUSD))); + + // Reset tvl aggregates until new amounts calculated. + factory.totalValueLockedETH = factory.totalValueLockedETH.minus(pool.totalValueLockedETH); + + // Update globals. + factory.txCount = BigInt(factory.txCount) + BigInt(1); + + // Update token0 data. + token0.txCount = BigInt(token0.txCount) + BigInt(1); + token0.totalValueLocked = token0.totalValueLocked.minus(amount0); + token0.totalValueLockedUSD = token0.totalValueLocked.times(token0.derivedETH.times(bundle.ethPriceUSD)); + + // Update token1 data. + token1.txCount = BigInt(token1.txCount) + BigInt(1); + token1.totalValueLocked = token1.totalValueLocked.minus(amount1); + token1.totalValueLockedUSD = token1.totalValueLocked.times(token1.derivedETH.times(bundle.ethPriceUSD)); + + // Pool data. + pool.txCount = BigInt(pool.txCount) + BigInt(1); + + // Pools liquidity tracks the currently active liquidity given pools current tick. + // We only want to update it on burn if the position being burnt includes the current tick. + if ( + pool.tick !== null && + burnEvent.tickLower <= pool.tick && + burnEvent.tickUpper > pool.tick + ) { + pool.liquidity = pool.liquidity - burnEvent.amount; + } + + pool.totalValueLockedToken0 = pool.totalValueLockedToken0.minus(amount0); + pool.totalValueLockedToken1 = pool.totalValueLockedToken1.minus(amount1); + + pool.totalValueLockedETH = pool.totalValueLockedToken0 + .times(token0.derivedETH) + .plus(pool.totalValueLockedToken1.times(token1.derivedETH)); + + pool.totalValueLockedUSD = pool.totalValueLockedETH.times(bundle.ethPriceUSD); + + // Reset aggregates with new amounts. + factory.totalValueLockedETH = factory.totalValueLockedETH.plus(pool.totalValueLockedETH); + factory.totalValueLockedUSD = factory.totalValueLockedETH.times(bundle.ethPriceUSD); + + // Burn entity. + const transaction = await loadTransaction(this._db, { txHash, blockNumber, blockTimestamp }); + + await this._db.loadBurn({ + id: transaction.id + '#' + pool.txCount.toString(), + blockNumber, + transaction, + timestamp: transaction.timestamp, + pool, + token0: pool.token0, + token1: pool.token1, + owner: burnEvent.owner, + + // TODO: Assign origin with Transaction from address. + // origin: event.transaction.from + + amount: burnEvent.amount, + amount0, + amount1, + amountUSD, + tickLower: burnEvent.tickLower, + tickUpper: burnEvent.tickUpper + }); + + // Tick entities. + const lowerTickId = poolAddress + '#' + (burnEvent.tickLower).toString(); + const upperTickId = poolAddress + '#' + (burnEvent.tickUpper).toString(); + const lowerTick = await this._db.loadTick({ id: lowerTickId, blockNumber }); + const upperTick = await this._db.loadTick({ id: upperTickId, blockNumber }); + const amount = BigInt(burnEvent.amount); + lowerTick.liquidityGross = BigInt(lowerTick.liquidityGross) - amount; + lowerTick.liquidityNet = BigInt(lowerTick.liquidityNet) - amount; + upperTick.liquidityGross = BigInt(upperTick.liquidityGross) - amount; + upperTick.liquidityNet = BigInt(upperTick.liquidityNet) + amount; + + await updateUniswapDayData(this._db, { blockNumber, contractAddress, blockTimestamp }); + await updatePoolDayData(this._db, { blockNumber, contractAddress, blockTimestamp }); + await updatePoolHourData(this._db, { blockNumber, contractAddress, blockTimestamp }); + await updateTokenDayData(this._db, token0, { blockNumber, blockTimestamp }); + await updateTokenDayData(this._db, token0, { blockNumber, blockTimestamp }); + await updateTokenHourData(this._db, token0, { blockNumber, blockTimestamp }); + await updateTokenHourData(this._db, token0, { blockNumber, blockTimestamp }); + + // Skipping update Tick fee and Tick day data as they are not queried. + + await Promise.all([ + await this._db.saveTick(lowerTick, blockNumber), + await this._db.saveTick(upperTick, blockNumber) + ]); + + await Promise.all([ + this._db.saveToken(token0, blockNumber), + this._db.saveToken(token1, blockNumber) + ]); + + await this._db.savePool(pool, blockNumber); + await this._db.saveFactory(factory, blockNumber); + } + + async _handleSwap (block: Block, contractAddress: string, tx: Transaction, swapEvent: SwapEvent): Promise { + const { number: blockNumber, timestamp: blockTimestamp } = block; + const { hash: txHash } = tx; + const bundle = await this._db.loadBundle({ id: '1', blockNumber }); + + // TODO: In subgraph factory is fetched by hardcoded factory address. + // Currently fetching first factory in database as only one exists. + const [factory] = await this._db.getFactories({ blockNumber }, { limit: 1 }); + + const pool = await this._db.loadPool({ id: contractAddress, blockNumber }); + + // Hot fix for bad pricing. + if (pool.id === '0x9663f2ca0454accad3e094448ea6f77443880454') { + return; + } + + const [token0, token1] = await Promise.all([ + this._db.getToken({ id: pool.token0.id, blockNumber }), + this._db.getToken({ id: pool.token1.id, blockNumber }) + ]); + + assert(token0 && token1, 'Pool tokens not found.'); + + // Amounts - 0/1 are token deltas. Can be positive or negative. + const amount0 = convertTokenToDecimal(swapEvent.amount0, BigInt(token0.decimals)); + const amount1 = convertTokenToDecimal(swapEvent.amount1, BigInt(token1.decimals)); + + // Need absolute amounts for volume. + let amount0Abs = amount0; + let amount1Abs = amount1; + + if (amount0.lt(new Decimal(0))) { + amount0Abs = amount0.times(new Decimal('-1')); + } + + if (amount1.lt(new Decimal(0))) { + amount1Abs = amount1.times(new Decimal('-1')); + } + + const amount0ETH = amount0Abs.times(token0.derivedETH); + const amount1ETH = amount1Abs.times(token1.derivedETH); + const amount0USD = amount0ETH.times(bundle.ethPriceUSD); + const amount1USD = amount1ETH.times(bundle.ethPriceUSD); + + // Get amount that should be tracked only - div 2 because cant count both input and output as volume. + const trackedAmountUSD = await getTrackedAmountUSD(this._db, amount0Abs, token0, amount1Abs, token1); + const amountTotalUSDTracked = trackedAmountUSD.div(new Decimal('2')); + const amountTotalETHTracked = safeDiv(amountTotalUSDTracked, bundle.ethPriceUSD); + const amountTotalUSDUntracked = amount0USD.plus(amount1USD).div(new Decimal('2')); + + const feesETH = amountTotalETHTracked.times(pool.feeTier.toString()).div(new Decimal('1000000')); + const feesUSD = amountTotalUSDTracked.times(pool.feeTier.toString()).div(new Decimal('1000000')); + + // Global updates. + factory.txCount = BigInt(factory.txCount) + BigInt(1); + factory.totalVolumeETH = factory.totalVolumeETH.plus(amountTotalETHTracked); + factory.totalVolumeUSD = factory.totalVolumeUSD.plus(amountTotalUSDTracked); + factory.untrackedVolumeUSD = factory.untrackedVolumeUSD.plus(amountTotalUSDUntracked); + factory.totalFeesETH = factory.totalFeesETH.plus(feesETH); + factory.totalFeesUSD = factory.totalFeesUSD.plus(feesUSD); + + // Reset aggregate tvl before individual pool tvl updates. + const currentPoolTvlETH = pool.totalValueLockedETH; + factory.totalValueLockedETH = factory.totalValueLockedETH.minus(currentPoolTvlETH); + + // pool volume + pool.volumeToken0 = pool.volumeToken0.plus(amount0Abs); + pool.volumeToken1 = pool.volumeToken1.plus(amount1Abs); + pool.volumeUSD = pool.volumeUSD.plus(amountTotalUSDTracked); + pool.untrackedVolumeUSD = pool.untrackedVolumeUSD.plus(amountTotalUSDUntracked); + pool.feesUSD = pool.feesUSD.plus(feesUSD); + pool.txCount = BigInt(pool.txCount) + BigInt(1); + + // Update the pool with the new active liquidity, price, and tick. + pool.liquidity = swapEvent.liquidity; + pool.tick = BigInt(swapEvent.tick); + pool.sqrtPrice = swapEvent.sqrtPriceX96; + pool.totalValueLockedToken0 = pool.totalValueLockedToken0.plus(amount0); + pool.totalValueLockedToken1 = pool.totalValueLockedToken1.plus(amount1); + + // Update token0 data. + token0.volume = token0.volume.plus(amount0Abs); + token0.totalValueLocked = token0.totalValueLocked.plus(amount0); + token0.volumeUSD = token0.volumeUSD.plus(amountTotalUSDTracked); + token0.untrackedVolumeUSD = token0.untrackedVolumeUSD.plus(amountTotalUSDUntracked); + token0.feesUSD = token0.feesUSD.plus(feesUSD); + token0.txCount = BigInt(token0.txCount) + BigInt(1); + + // Update token1 data. + token1.volume = token1.volume.plus(amount1Abs); + token1.totalValueLocked = token1.totalValueLocked.plus(amount1); + token1.volumeUSD = token1.volumeUSD.plus(amountTotalUSDTracked); + token1.untrackedVolumeUSD = token1.untrackedVolumeUSD.plus(amountTotalUSDUntracked); + token1.feesUSD = token1.feesUSD.plus(feesUSD); + token1.txCount = BigInt(token1.txCount) + BigInt(1); + + // Updated pool rates. + const prices = sqrtPriceX96ToTokenPrices(pool.sqrtPrice, token0 as Token, token1 as Token); + pool.token0Price = prices[0]; + pool.token1Price = prices[1]; + this._db.savePool(pool, blockNumber); + + // Update USD pricing. + bundle.ethPriceUSD = await getEthPriceInUSD(this._db); + this._db.saveBundle(bundle, blockNumber); + token0.derivedETH = await findEthPerToken(token0); + token1.derivedETH = await findEthPerToken(token1); + + /** + * Things afffected by new USD rates. + */ + pool.totalValueLockedETH = pool.totalValueLockedToken0 + .times(token0.derivedETH) + .plus(pool.totalValueLockedToken1.times(token1.derivedETH)); + + pool.totalValueLockedUSD = pool.totalValueLockedETH.times(bundle.ethPriceUSD); + + factory.totalValueLockedETH = factory.totalValueLockedETH.plus(pool.totalValueLockedETH); + factory.totalValueLockedUSD = factory.totalValueLockedETH.times(bundle.ethPriceUSD); + + token0.totalValueLockedUSD = token0.totalValueLocked.times(token0.derivedETH).times(bundle.ethPriceUSD); + token1.totalValueLockedUSD = token1.totalValueLocked.times(token1.derivedETH).times(bundle.ethPriceUSD); + + // Create Swap event + const transaction = await loadTransaction(this._db, { txHash, blockNumber, blockTimestamp }); + + await this._db.loadSwap({ + id: transaction.id + '#' + pool.txCount.toString(), + blockNumber, + transaction, + timestamp: transaction.timestamp, + pool, + token0: pool.token0, + token1: pool.token1, + sender: swapEvent.sender, + + // TODO: Assign origin with Transaction from address. + // origin: event.transaction.from + + recipient: swapEvent.recipient, + amount0: amount0, + amount1: amount1, + amountUSD: amountTotalUSDTracked, + tick: BigInt(swapEvent.tick), + sqrtPriceX96: swapEvent.sqrtPriceX96 + }); + + // Skipping update pool fee growth as they are not queried. + + // Interval data. + const uniswapDayData = await updateUniswapDayData(this._db, { blockNumber, contractAddress, blockTimestamp }); + const poolDayData = await updatePoolDayData(this._db, { blockNumber, contractAddress, blockTimestamp }); + const poolHourData = await updatePoolHourData(this._db, { blockNumber, contractAddress, blockTimestamp }); + const token0DayData = await updateTokenDayData(this._db, token0, { blockNumber, blockTimestamp }); + const token1DayData = await updateTokenDayData(this._db, token0, { blockNumber, blockTimestamp }); + const token0HourData = await updateTokenHourData(this._db, token0, { blockNumber, blockTimestamp }); + const token1HourData = await updateTokenHourData(this._db, token0, { blockNumber, blockTimestamp }); + + // Update volume metrics. + uniswapDayData.volumeETH = uniswapDayData.volumeETH.plus(amountTotalETHTracked); + uniswapDayData.volumeUSD = uniswapDayData.volumeUSD.plus(amountTotalUSDTracked); + uniswapDayData.feesUSD = uniswapDayData.feesUSD.plus(feesUSD); + + poolDayData.volumeUSD = poolDayData.volumeUSD.plus(amountTotalUSDTracked); + poolDayData.volumeToken0 = poolDayData.volumeToken0.plus(amount0Abs); + poolDayData.volumeToken1 = poolDayData.volumeToken1.plus(amount1Abs); + poolDayData.feesUSD = poolDayData.feesUSD.plus(feesUSD); + + poolHourData.volumeUSD = poolHourData.volumeUSD.plus(amountTotalUSDTracked); + poolHourData.volumeToken0 = poolHourData.volumeToken0.plus(amount0Abs); + poolHourData.volumeToken1 = poolHourData.volumeToken1.plus(amount1Abs); + poolHourData.feesUSD = poolHourData.feesUSD.plus(feesUSD); + + token0DayData.volume = token0DayData.volume.plus(amount0Abs); + token0DayData.volumeUSD = token0DayData.volumeUSD.plus(amountTotalUSDTracked); + token0DayData.untrackedVolumeUSD = token0DayData.untrackedVolumeUSD.plus(amountTotalUSDTracked); + token0DayData.feesUSD = token0DayData.feesUSD.plus(feesUSD); + + token0HourData.volume = token0HourData.volume.plus(amount0Abs); + token0HourData.volumeUSD = token0HourData.volumeUSD.plus(amountTotalUSDTracked); + token0HourData.untrackedVolumeUSD = token0HourData.untrackedVolumeUSD.plus(amountTotalUSDTracked); + token0HourData.feesUSD = token0HourData.feesUSD.plus(feesUSD); + + token1DayData.volume = token1DayData.volume.plus(amount1Abs); + token1DayData.volumeUSD = token1DayData.volumeUSD.plus(amountTotalUSDTracked); + token1DayData.untrackedVolumeUSD = token1DayData.untrackedVolumeUSD.plus(amountTotalUSDTracked); + token1DayData.feesUSD = token1DayData.feesUSD.plus(feesUSD); + + token1HourData.volume = token1HourData.volume.plus(amount1Abs); + token1HourData.volumeUSD = token1HourData.volumeUSD.plus(amountTotalUSDTracked); + token1HourData.untrackedVolumeUSD = token1HourData.untrackedVolumeUSD.plus(amountTotalUSDTracked); + token1HourData.feesUSD = token1HourData.feesUSD.plus(feesUSD); + + this._db.saveTokenDayData(token0DayData, blockNumber); + this._db.saveTokenDayData(token1DayData, blockNumber); + this._db.saveUniswapDayData(uniswapDayData, blockNumber); + this._db.savePoolDayData(poolDayData, blockNumber); + this._db.saveFactory(factory, blockNumber); + this._db.savePool(pool, blockNumber); + this._db.saveToken(token0, blockNumber); + this._db.saveToken(token1, blockNumber); + + // Skipping update of inner vars of current or crossed ticks as they are not queried. + } + + async _handleIncreaseLiquidity (block: Block, contractAddress: string, tx: Transaction, event: IncreaseLiquidityEvent): Promise { + const { number: blockNumber } = block; + const position = await this._getPosition(block, contractAddress, tx, BigInt(event.tokenId)); + + // position was not able to be fetched. + if (position === null) { + return; + } + + // Temp fix from Subgraph mapping code. + if (utils.getAddress(position.pool.id) === utils.getAddress('0x8fe8d9bb8eeba3ed688069c3d6b556c9ca258248')) { + return; + } + + const token0 = position.token0; + const token1 = position.token1; + + const amount0 = convertTokenToDecimal(BigInt(event.amount0), BigInt(token0.decimals)); + const amount1 = convertTokenToDecimal(BigInt(event.amount1), BigInt(token1.decimals)); + + position.liquidity = BigInt(position.liquidity) + BigInt(event.liquidity); + position.depositedToken0 = position.depositedToken0.plus(amount0); + position.depositedToken1 = position.depositedToken1.plus(amount1); + + await this._updateFeeVars(position, block, contractAddress, BigInt(event.tokenId)); + + await this._db.savePosition(position, blockNumber); + + await this._savePositionSnapshot(position, block, tx); + } + + async _handleDecreaseLiquidity (block: Block, contractAddress: string, tx: Transaction, event: DecreaseLiquidityEvent): Promise { + const { number: blockNumber } = block; + const position = await this._getPosition(block, contractAddress, tx, BigInt(event.tokenId)); + + // Position was not able to be fetched. + if (position == null) { + return; + } + + // Temp fix from Subgraph mapping code. + if (utils.getAddress(position.pool.id) === utils.getAddress('0x8fe8d9bb8eeba3ed688069c3d6b556c9ca258248')) { + return; + } + + const token0 = position.token0; + const token1 = position.token1; + const amount0 = convertTokenToDecimal(BigInt(event.amount0), BigInt(token0.decimals)); + const amount1 = convertTokenToDecimal(BigInt(event.amount1), BigInt(token1.decimals)); + + position.liquidity = BigInt(position.liquidity) - BigInt(event.liquidity); + position.depositedToken0 = position.depositedToken0.plus(amount0); + position.depositedToken1 = position.depositedToken1.plus(amount1); + + await this._updateFeeVars(position, block, contractAddress, BigInt(event.tokenId)); + + await this._db.savePosition(position, blockNumber); + + await this._savePositionSnapshot(position, block, tx); + } + + async _getPosition (block: Block, contractAddress: string, tx: Transaction, tokenId: bigint): Promise { + const { number: blockNumber, hash: blockHash, timestamp: blockTimestamp } = block; + const { hash: txHash } = tx; + let position = await this._db.getPosition({ id: tokenId.toString(), blockNumber }); + + if (!position) { + const nfpmPosition = await this._uniClient.getPosition(blockHash, tokenId); + + // The contract call reverts in situations where the position is minted and deleted in the same block. + // From my investigation this happens in calls from BancorSwap. + // (e.g. 0xf7867fa19aa65298fadb8d4f72d0daed5e836f3ba01f0b9b9631cdc6c36bed40) + + if (nfpmPosition) { + const { token0: token0Address, token1: token1Address, fee } = await this._uniClient.poolIdToPoolKey(blockHash, nfpmPosition.poolId); + + const { pool: poolAddress } = await this._uniClient.getPool(blockHash, token0Address, token1Address, fee); + + const transaction = await loadTransaction(this._db, { txHash, blockNumber, blockTimestamp }); + const pool = await this._db.getPool({ id: poolAddress, blockNumber }); + + const [token0, token1] = await Promise.all([ + this._db.getToken({ id: token0Address, blockNumber }), + this._db.getToken({ id: token0Address, blockNumber }) + ]); + + const [tickLower, tickUpper] = await Promise.all([ + this._db.getTick({ id: poolAddress.concat('#').concat(nfpmPosition.tickLower.toString()), blockNumber }), + this._db.getTick({ id: poolAddress.concat('#').concat(nfpmPosition.tickUpper.toString()), blockNumber }) + ]); + + position = await this._db.loadPosition({ + id: tokenId.toString(), + blockNumber, + pool, + token0, + token1, + tickLower, + tickUpper, + transaction, + feeGrowthInside0LastX128: BigInt(nfpmPosition.feeGrowthInside0LastX128.toString()), + feeGrowthInside1LastX128: BigInt(nfpmPosition.feeGrowthInside1LastX128.toString()) + }); + } + } + + return position || null; + } + + async _updateFeeVars (position: Position, block: Block, contractAddress: string, tokenId: bigint): Promise { + const nfpmPosition = await this._uniClient.getPosition(block.hash, tokenId); + + if (nfpmPosition) { + position.feeGrowthInside0LastX128 = BigInt(nfpmPosition.feeGrowthInside0LastX128.toString()); + position.feeGrowthInside1LastX128 = BigInt(nfpmPosition.feeGrowthInside1LastX128.toString()); + } + + return position; + } + + async _savePositionSnapshot (position: Position, block: Block, tx: Transaction): Promise { + const transaction = await loadTransaction(this._db, { txHash: tx.hash, blockNumber: block.number, blockTimestamp: block.timestamp }); + + await this._db.loadPositionSnapshot({ + id: position.id.concat('#').concat(block.number.toString()), + blockNumber: block.number, + owner: position.owner, + pool: position.pool, + position: position, + timestamp: block.timestamp, + liquidity: position.liquidity, + depositedToken0: position.depositedToken0, + depositedToken1: position.depositedToken1, + withdrawnToken0: position.withdrawnToken0, + withdrawnToken1: position.withdrawnToken1, + collectedFeesToken0: position.collectedFeesToken0, + collectedFeesToken1: position.collectedFeesToken1, + transaction, + feeGrowthInside0LastX128: position.feeGrowthInside0LastX128, + feeGrowthInside1LastX128: position.feeGrowthInside1LastX128 + }); } } diff --git a/packages/uni-info-watcher/src/job-runner.ts b/packages/uni-info-watcher/src/job-runner.ts new file mode 100644 index 00000000..75f8fab0 --- /dev/null +++ b/packages/uni-info-watcher/src/job-runner.ts @@ -0,0 +1,97 @@ +import assert from 'assert'; +import 'reflect-metadata'; +import yargs from 'yargs'; +import { hideBin } from 'yargs/helpers'; +import debug from 'debug'; + +import { Client as ERC20Client } from '@vulcanize/erc20-watcher'; +import { Client as UniClient } from '@vulcanize/uni-watcher'; +import { getConfig, JobQueue } from '@vulcanize/util'; + +import { Indexer } from './indexer'; +import { Database } from './database'; +import { QUEUE_BLOCK_PROCESSING, QUEUE_EVENT_PROCESSING } from './events'; + +const log = debug('vulcanize:job-runner'); + +export const main = async (): Promise => { + const argv = await yargs(hideBin(process.argv)) + .option('f', { + alias: 'config-file', + demandOption: true, + describe: 'configuration file path (toml)', + type: 'string' + }) + .argv; + + const config = await getConfig(argv.f); + + assert(config.server, 'Missing server config'); + + const { upstream, database: dbConfig, jobQueue: jobQueueConfig } = config; + + assert(dbConfig, 'Missing database config'); + + const db = new Database(dbConfig); + await db.init(); + + assert(upstream, 'Missing upstream config'); + const { uniWatcher: { gqlEndpoint, gqlSubscriptionEndpoint }, tokenWatcher } = upstream; + assert(gqlEndpoint, 'Missing upstream uniWatcher.gqlEndpoint'); + assert(gqlSubscriptionEndpoint, 'Missing upstream uniWatcher.gqlSubscriptionEndpoint'); + + const uniClient = new UniClient({ + gqlEndpoint, + gqlSubscriptionEndpoint + }); + + const erc20Client = new ERC20Client(tokenWatcher); + + const indexer = new Indexer(db, uniClient, erc20Client); + + assert(jobQueueConfig, 'Missing job queue config'); + + const { dbConnectionString, maxCompletionLag } = jobQueueConfig; + assert(dbConnectionString, 'Missing job queue db connection string'); + + const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag }); + await jobQueue.start(); + + await jobQueue.subscribe(QUEUE_BLOCK_PROCESSING, async (job) => { + const { data: { block } } = job; + log(`Processing block hash ${block.hash} number ${block.number}`); + const events = await indexer.getOrFetchBlockEvents(block); + + for (let ei = 0; ei < events.length; ei++) { + const { id } = events[ei]; + await jobQueue.pushJob(QUEUE_EVENT_PROCESSING, { id }); + } + + await jobQueue.markComplete(job); + }); + + await jobQueue.subscribe(QUEUE_EVENT_PROCESSING, async (job) => { + const { data: { id } } = job; + + log(`Processing event ${id}`); + const dbEvent = await db.getEvent(id); + assert(dbEvent); + + if (!dbEvent.block.isComplete) { + await indexer.processEvent(dbEvent); + await indexer.updateBlockProgress(dbEvent.block.blockHash); + } + + await jobQueue.markComplete(job); + }); +}; + +main().then(() => { + log('Starting job runner...'); +}).catch(err => { + log(err); +}); + +process.on('uncaughtException', err => { + log('uncaughtException', err); +}); diff --git a/packages/uni-info-watcher/src/server.ts b/packages/uni-info-watcher/src/server.ts index 60e5f704..0c34fa9b 100644 --- a/packages/uni-info-watcher/src/server.ts +++ b/packages/uni-info-watcher/src/server.ts @@ -1,18 +1,16 @@ import assert from 'assert'; import 'reflect-metadata'; import express, { Application } from 'express'; -import { ApolloServer, PubSub } from 'apollo-server-express'; +import { ApolloServer } from 'apollo-server-express'; import yargs from 'yargs'; import { hideBin } from 'yargs/helpers'; import debug from 'debug'; import 'graphql-import-node'; import { createServer } from 'http'; -import { getCache } from '@vulcanize/cache'; -import { EthClient } from '@vulcanize/ipld-eth-client'; import { Client as ERC20Client } from '@vulcanize/erc20-watcher'; import { Client as UniClient } from '@vulcanize/uni-watcher'; -import { getConfig } from '@vulcanize/util'; +import { getConfig, JobQueue } from '@vulcanize/util'; import typeDefs from './schema'; @@ -40,7 +38,7 @@ export const main = async (): Promise => { const { host, port } = config.server; - const { upstream, database: dbConfig } = config; + const { upstream, database: dbConfig, jobQueue: jobQueueConfig } = config; assert(dbConfig, 'Missing database config'); @@ -53,7 +51,6 @@ export const main = async (): Promise => { gqlApiEndpoint, gqlPostgraphileEndpoint }, - cache: cacheConfig, uniWatcher, tokenWatcher } = upstream; @@ -61,22 +58,19 @@ export const main = async (): Promise => { assert(gqlApiEndpoint, 'Missing upstream ethServer.gqlApiEndpoint'); assert(gqlPostgraphileEndpoint, 'Missing upstream ethServer.gqlPostgraphileEndpoint'); - const cache = await getCache(cacheConfig); - const ethClient = new EthClient({ - gqlEndpoint: gqlApiEndpoint, - gqlSubscriptionEndpoint: gqlPostgraphileEndpoint, - cache - }); - const uniClient = new UniClient(uniWatcher); - - // Note: In-memory pubsub works fine for now, as each watcher is a single process anyway. - // Later: https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries - const pubsub = new PubSub(); const erc20Client = new ERC20Client(tokenWatcher); - const indexer = new Indexer(db, ethClient, pubsub); + const indexer = new Indexer(db, uniClient, erc20Client); - const eventWatcher = new EventWatcher(db, uniClient, erc20Client); + assert(jobQueueConfig, 'Missing job queue config'); + + const { dbConnectionString, maxCompletionLag } = jobQueueConfig; + assert(dbConnectionString, 'Missing job queue db connection string'); + + const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag }); + await jobQueue.start(); + + const eventWatcher = new EventWatcher(indexer, uniClient, jobQueue); await eventWatcher.start(); const resolvers = process.env.MOCK ? await createMockResolvers() : await createResolvers(indexer); @@ -105,3 +99,7 @@ main().then(() => { }).catch(err => { log(err); }); + +process.on('uncaughtException', err => { + log('uncaughtException', err); +}); diff --git a/packages/uni-watcher/src/client.ts b/packages/uni-watcher/src/client.ts index 14322d13..a1c7555b 100644 --- a/packages/uni-watcher/src/client.ts +++ b/packages/uni-watcher/src/client.ts @@ -1,7 +1,7 @@ import { gql } from '@apollo/client/core'; import { GraphQLClient, GraphQLConfig } from '@vulcanize/ipld-eth-client'; -import { queryGetPool, queryPoolIdToPoolKey, queryPosition } from './queries'; +import { queryGetPool, queryPoolIdToPoolKey, queryPosition, queryEvents, subscribeEvents } from './queries'; export class Client { _config: GraphQLConfig; @@ -15,89 +15,25 @@ export class Client { async watchEvents (onNext: (value: any) => void): Promise { return this._client.subscribe( - gql` - subscription SubscriptionReceipt { - onEvent { - block { - number - hash - timestamp - } - contract - tx { - hash - } - proof { - data - } - event { - __typename - - ... on PoolCreatedEvent { - token0 - token1 - fee - tickSpacing - pool - } - - ... on InitializeEvent { - sqrtPriceX96 - tick - } - - ... on MintEvent { - sender - owner - tickLower - tickUpper - amount - amount0 - amount1 - } - - ... on BurnEvent { - owner - tickLower - tickUpper - amount - amount0 - amount1 - } - - ... on SwapEvent { - sender - recipient - amount0 - amount1 - sqrtPriceX96 - liquidity - tick - } - - ... on IncreaseLiquidityEvent { - tokenId - liquidity - amount0 - amount1 - } - - ... on DecreaseLiquidityEvent { - tokenId - liquidity - amount0 - amount1 - } - } - } - } - `, + gql(subscribeEvents), ({ data }) => { onNext(data.onEvent); } ); } + async getEvents (blockHash: string, contract?: string): Promise { + const { events } = await this._client.query( + gql(queryEvents), + { + blockHash, + contract + } + ); + + return events; + } + async getPosition (blockHash: string, tokenId: bigint): Promise { const { position } = await this._client.query( gql(queryPosition), diff --git a/packages/uni-watcher/src/database.ts b/packages/uni-watcher/src/database.ts index f56de579..c3f81800 100644 --- a/packages/uni-watcher/src/database.ts +++ b/packages/uni-watcher/src/database.ts @@ -38,31 +38,6 @@ export class Database { .getMany(); } - async getEvents (blockHash: string, contract: string): Promise { - return this._conn.getRepository(Event) - .createQueryBuilder('event') - .innerJoinAndSelect('event.block', 'block') - .where('block_hash = :blockHash AND contract = :contract', { - blockHash, - contract - }) - .addOrderBy('event.id', 'ASC') - .getMany(); - } - - async getEventsByName (blockHash: string, contract: string, eventName: string): Promise { - return this._conn.getRepository(Event) - .createQueryBuilder('event') - .innerJoinAndSelect('event.block', 'block') - .where('block_hash = :blockHash AND contract = :contract AND event_name = :eventName', { - blockHash, - contract, - eventName - }) - .addOrderBy('event.id', 'ASC') - .getMany(); - } - async getProcessedBlockCountForRange (fromBlockNumber: number, toBlockNumber: number): Promise<{ expected: number, actual: number }> { const blockNumbers = _.range(fromBlockNumber, toBlockNumber + 1); const expected = blockNumbers.length; @@ -135,7 +110,7 @@ export class Database { } async getEvent (id: string): Promise { - return this._conn.getRepository(Event).findOne(id, { relations: [ 'block' ]}); + return this._conn.getRepository(Event).findOne(id, { relations: ['block'] }); } async saveEventEntity (entity: Event): Promise { diff --git a/packages/uni-watcher/src/events.ts b/packages/uni-watcher/src/events.ts index 1b7ac98c..20a26fdb 100644 --- a/packages/uni-watcher/src/events.ts +++ b/packages/uni-watcher/src/events.ts @@ -52,8 +52,11 @@ export class EventWatcher { this._jobQueue.onComplete(QUEUE_EVENT_PROCESSING, async (job) => { const { data: { request, failed, state, createdOn } } = job; - await this._indexer.updateBlockProgress(request.data.blockHash); - const blockProgress = await this._indexer.getBlockProgress(request.data.blockHash); + const dbEvent = await this._indexer.getEvent(request.data.id); + assert(dbEvent); + + await this._indexer.updateBlockProgress(dbEvent.block.blockHash); + const blockProgress = await this._indexer.getBlockProgress(dbEvent.block.blockHash); if (blockProgress && request.data.publishBlockProgress) { await this.publishBlockProgressToSubscribers(blockProgress); } diff --git a/packages/uni-watcher/src/indexer.ts b/packages/uni-watcher/src/indexer.ts index db69239a..9444bb36 100644 --- a/packages/uni-watcher/src/indexer.ts +++ b/packages/uni-watcher/src/indexer.ts @@ -107,18 +107,20 @@ export class Indexer { } async getEventsByFilter (blockHash: string, contract: string, name: string | null): Promise> { - const uniContract = await this.isUniswapContract(contract); - if (!uniContract) { - throw new Error('Not a uniswap contract'); + if (contract) { + const uniContract = await this.isUniswapContract(contract); + if (!uniContract) { + throw new Error('Not a uniswap contract'); + } } - const events = await this._db.getEvents(blockHash, contract); + const events = await this._db.getBlockEvents(blockHash); log(`getEvents: db hit, num events: ${events.length}`); // Filtering. const result = events // TODO: Filter using db WHERE condition on contract. - .filter(event => contract === event.contract) + .filter(event => !contract || contract === event.contract) // TODO: Filter using db WHERE condition when name is not empty. .filter(event => !name || name === event.eventName); @@ -340,7 +342,6 @@ export class Indexer { }); } - await this._db.saveEvents(block, dbEvents); } diff --git a/packages/uni-watcher/src/queries.ts b/packages/uni-watcher/src/queries.ts index 0f880a3c..54bd124c 100644 --- a/packages/uni-watcher/src/queries.ts +++ b/packages/uni-watcher/src/queries.ts @@ -1,15 +1,96 @@ import { gql } from 'graphql-request'; -export const queryEvents = gql` -query getEvents($blockHash: String!, $token: String!) { - events(blockHash: $blockHash, token: $token) { - event { - __typename +const resultEvent = ` +{ + block { + number + hash + timestamp + parentHash + } + tx { + hash + } + contract + eventIndex + + event { + __typename + + ... on PoolCreatedEvent { + token0 + token1 + fee + tickSpacing + pool } - proof { - data + + ... on InitializeEvent { + sqrtPriceX96 + tick + } + + ... on MintEvent { + sender + owner + tickLower + tickUpper + amount + amount0 + amount1 + } + + ... on BurnEvent { + owner + tickLower + tickUpper + amount + amount0 + amount1 + } + + ... on SwapEvent { + sender + recipient + amount0 + amount1 + sqrtPriceX96 + liquidity + tick + } + + ... on IncreaseLiquidityEvent { + tokenId + liquidity + amount0 + amount1 + } + + ... on DecreaseLiquidityEvent { + tokenId + liquidity + amount0 + amount1 } } + + proof { + data + } +} +`; + +export const subscribeEvents = gql` + subscription SubscriptionEvents { + onEvent + ${resultEvent} + } +`; + +export const queryEvents = gql` +query getEvents($blockHash: String!, $contract: String) { + events(blockHash: $blockHash, contract: $contract) + ${resultEvent} } `; diff --git a/packages/uni-watcher/src/resolvers.ts b/packages/uni-watcher/src/resolvers.ts index 0d6fcb03..fa8336f5 100644 --- a/packages/uni-watcher/src/resolvers.ts +++ b/packages/uni-watcher/src/resolvers.ts @@ -4,6 +4,7 @@ import debug from 'debug'; import { Indexer } from './indexer'; import { EventWatcher } from './events'; +import { UNKNOWN_EVENT_NAME } from './entity/Event'; const log = debug('vulcanize:resolver'); @@ -59,7 +60,8 @@ export const createResolvers = async (indexer: Indexer, eventWatcher: EventWatch } const events = await indexer.getEventsByFilter(blockHash, contract, name); - return events.map(event => indexer.getResultEvent(event)); + return events.filter(event => event.eventName !== UNKNOWN_EVENT_NAME) + .map(event => indexer.getResultEvent(event)); }, eventsInRange: async (_: any, { fromBlockNumber, toBlockNumber }: { fromBlockNumber: number, toBlockNumber: number }) => { diff --git a/packages/uni-watcher/src/schema.ts b/packages/uni-watcher/src/schema.ts index 86dfa560..6f6b0f50 100644 --- a/packages/uni-watcher/src/schema.ts +++ b/packages/uni-watcher/src/schema.ts @@ -234,7 +234,7 @@ type Query { # Get uniswap events at a certain block, optionally filter by event name. events( blockHash: String! - contract: String! + contract: String name: String ): [ResultEvent!]