From 0487c05ee1a8aa7bf463ffb0625891dcf0e7682a Mon Sep 17 00:00:00 2001 From: Ashwin Phatak Date: Fri, 6 Aug 2021 10:25:56 +0530 Subject: [PATCH] Event handlers should update db atomically (#186) * Run db operations inside event handlers atomically using transaction. * Implement database transaction for Pool Initialize event. * Implement typeorm transaction without callback. * Implement transaction for NFPM event handlers. Co-authored-by: nabarun --- packages/uni-info-watcher/src/database.ts | 267 ++-- packages/uni-info-watcher/src/indexer.ts | 1284 ++++++++++------- packages/uni-info-watcher/src/utils/index.ts | 7 +- .../src/utils/interval-updates.ts | 45 +- .../uni-info-watcher/src/utils/pricing.ts | 8 +- packages/uni-info-watcher/src/utils/tick.ts | 5 +- 6 files changed, 889 insertions(+), 727 deletions(-) diff --git a/packages/uni-info-watcher/src/database.ts b/packages/uni-info-watcher/src/database.ts index 98a4323f..1385dafd 100644 --- a/packages/uni-info-watcher/src/database.ts +++ b/packages/uni-info-watcher/src/database.ts @@ -1,5 +1,5 @@ import assert from 'assert'; -import { Brackets, Connection, ConnectionOptions, createConnection, DeepPartial, FindConditions, FindOneOptions, In, LessThanOrEqual, Repository } from 'typeorm'; +import { Brackets, Connection, ConnectionOptions, createConnection, DeepPartial, FindConditions, FindOneOptions, In, LessThanOrEqual, QueryRunner, Repository } from 'typeorm'; import { SnakeNamingStrategy } from 'typeorm-naming-strategies'; import { MAX_REORG_DEPTH } from '@vulcanize/util'; @@ -87,8 +87,15 @@ export class Database { return this._conn.close(); } - async getFactory ({ id, blockHash }: DeepPartial): Promise { - const repo = this._conn.getRepository(Factory); + async createTransactionRunner (): Promise { + const queryRunner = this._conn.createQueryRunner(); + await queryRunner.connect(); + await queryRunner.startTransaction(); + return queryRunner; + } + + async getFactory (queryRunner: QueryRunner, { id, blockHash }: DeepPartial): Promise { + const repo = queryRunner.manager.getRepository(Factory); const whereOptions: FindConditions = { id }; if (blockHash) { @@ -111,8 +118,8 @@ export class Database { return entity; } - async getBundle ({ id, blockHash, blockNumber }: DeepPartial): Promise { - const repo = this._conn.getRepository(Bundle); + async getBundle (queryRunner: QueryRunner, { id, blockHash, blockNumber }: DeepPartial): Promise { + const repo = queryRunner.manager.getRepository(Bundle); const whereOptions: FindConditions = { id }; if (blockHash) { @@ -139,8 +146,8 @@ export class Database { return entity; } - async getToken ({ id, blockHash }: DeepPartial): Promise { - const repo = this._conn.getRepository(Token); + async getToken (queryRunner: QueryRunner, { id, blockHash }: DeepPartial): Promise { + const repo = queryRunner.manager.getRepository(Token); const whereOptions: FindConditions = { id }; if (blockHash) { @@ -164,8 +171,14 @@ export class Database { return entity; } - async getPool ({ id, blockHash, blockNumber }: DeepPartial): Promise { - const repo = this._conn.getRepository(Pool); + async getTokenNoTx ({ id, blockHash }: DeepPartial): Promise { + const queryRunner = this._conn.createQueryRunner(); + await queryRunner.connect(); + return this.getToken(queryRunner, { id, blockHash }); + } + + async getPool (queryRunner: QueryRunner, { id, blockHash, blockNumber }: DeepPartial): Promise { + const repo = queryRunner.manager.getRepository(Pool); const whereOptions: FindConditions = { id }; if (blockHash) { @@ -193,6 +206,12 @@ export class Database { return entity; } + async getPoolNoTx ({ id, blockHash, blockNumber }: DeepPartial): Promise { + const queryRunner = this._conn.createQueryRunner(); + await queryRunner.connect(); + return this.getPool(queryRunner, { id, blockHash, blockNumber }); + } + async getPosition ({ id, blockHash }: DeepPartial): Promise { const repo = this._conn.getRepository(Position); const whereOptions: FindConditions = { id }; @@ -218,8 +237,8 @@ export class Database { return entity; } - async getTick ({ id, blockHash }: DeepPartial): Promise { - const repo = this._conn.getRepository(Tick); + async getTick (queryRunner: QueryRunner, { id, blockHash }: DeepPartial): Promise { + const repo = queryRunner.manager.getRepository(Tick); const whereOptions: FindConditions = { id }; if (blockHash) { @@ -243,8 +262,14 @@ export class Database { return entity; } - async getPoolDayData ({ id, blockHash }: DeepPartial): Promise { - const repo = this._conn.getRepository(PoolDayData); + async getTickNoTx ({ id, blockHash }: DeepPartial): Promise { + const queryRunner = this._conn.createQueryRunner(); + await queryRunner.connect(); + return this.getTick(queryRunner, { id, blockHash }); + } + + async getPoolDayData (queryRunner: QueryRunner, { id, blockHash }: DeepPartial): Promise { + const repo = queryRunner.manager.getRepository(PoolDayData); const whereOptions: FindConditions = { id }; if (blockHash) { @@ -268,8 +293,8 @@ export class Database { return entity; } - async getPoolHourData ({ id, blockHash }: DeepPartial): Promise { - const repo = this._conn.getRepository(PoolHourData); + async getPoolHourData (queryRunner: QueryRunner, { id, blockHash }: DeepPartial): Promise { + const repo = queryRunner.manager.getRepository(PoolHourData); const whereOptions: FindConditions = { id }; if (blockHash) { @@ -292,8 +317,8 @@ export class Database { return entity; } - async getUniswapDayData ({ id, blockHash }: DeepPartial): Promise { - const repo = this._conn.getRepository(UniswapDayData); + async getUniswapDayData (queryRunner: QueryRunner, { id, blockHash }: DeepPartial): Promise { + const repo = queryRunner.manager.getRepository(UniswapDayData); const whereOptions: FindConditions = { id }; if (blockHash) { @@ -316,8 +341,8 @@ export class Database { return entity; } - async getTokenDayData ({ id, blockHash }: DeepPartial): Promise { - const repo = this._conn.getRepository(TokenDayData); + async getTokenDayData (queryRunner: QueryRunner, { id, blockHash }: DeepPartial): Promise { + const repo = queryRunner.manager.getRepository(TokenDayData); const whereOptions: FindConditions = { id }; if (blockHash) { @@ -340,8 +365,8 @@ export class Database { return entity; } - async getTokenHourData ({ id, blockHash }: DeepPartial): Promise { - const repo = this._conn.getRepository(TokenHourData); + async getTokenHourData (queryRunner: QueryRunner, { id, blockHash }: DeepPartial): Promise { + const repo = queryRunner.manager.getRepository(TokenHourData); const whereOptions: FindConditions = { id }; if (blockHash) { @@ -364,8 +389,8 @@ export class Database { return entity; } - async getTransaction ({ id, blockHash }: DeepPartial): Promise { - const repo = this._conn.getRepository(Transaction); + async getTransaction (queryRunner: QueryRunner, { id, blockHash }: DeepPartial): Promise { + const repo = queryRunner.manager.getRepository(Transaction); const whereOptions: FindConditions = { id }; if (blockHash) { @@ -388,8 +413,8 @@ export class Database { return entity; } - async getEntities (entity: new () => Entity, block: BlockHeight, where: Where = {}, queryOptions: QueryOptions = {}, relations: string[] = []): Promise { - const repo = this._conn.getRepository(entity); + async getEntities (queryRunner: QueryRunner, entity: new () => Entity, block: BlockHeight, where: Where = {}, queryOptions: QueryOptions = {}, relations: string[] = []): Promise { + const repo = queryRunner.manager.getRepository(entity); const { tableName } = repo.metadata; let subQuery = repo.createQueryBuilder('subTable') @@ -468,148 +493,116 @@ export class Database { return selectQueryBuilder.getMany(); } - async saveFactory (factory: Factory, block: Block): Promise { - return this._conn.transaction(async (tx) => { - const repo = tx.getRepository(Factory); - factory.blockNumber = block.number; - factory.blockHash = block.hash; - return repo.save(factory); - }); + async saveFactory (queryRunner: QueryRunner, factory: Factory, block: Block): Promise { + const repo = queryRunner.manager.getRepository(Factory); + factory.blockNumber = block.number; + factory.blockHash = block.hash; + return repo.save(factory); } - async saveBundle (bundle: Bundle, block: Block): Promise { - return this._conn.transaction(async (tx) => { - const repo = tx.getRepository(Bundle); - bundle.blockNumber = block.number; - bundle.blockHash = block.hash; - return repo.save(bundle); - }); + async saveBundle (queryRunner: QueryRunner, bundle: Bundle, block: Block): Promise { + const repo = queryRunner.manager.getRepository(Bundle); + bundle.blockNumber = block.number; + bundle.blockHash = block.hash; + return repo.save(bundle); } - async savePool (pool: Pool, block: Block): Promise { - return this._conn.transaction(async (tx) => { - const repo = tx.getRepository(Pool); - pool.blockNumber = block.number; - pool.blockHash = block.hash; - return repo.save(pool); - }); + async savePool (queryRunner: QueryRunner, pool: Pool, block: Block): Promise { + const repo = queryRunner.manager.getRepository(Pool); + pool.blockNumber = block.number; + pool.blockHash = block.hash; + return repo.save(pool); } - async savePoolDayData (poolDayData: PoolDayData, block: Block): Promise { - return this._conn.transaction(async (tx) => { - const repo = tx.getRepository(PoolDayData); - poolDayData.blockNumber = block.number; - poolDayData.blockHash = block.hash; - return repo.save(poolDayData); - }); + async savePoolDayData (queryRunner: QueryRunner, poolDayData: PoolDayData, block: Block): Promise { + const repo = queryRunner.manager.getRepository(PoolDayData); + poolDayData.blockNumber = block.number; + poolDayData.blockHash = block.hash; + return repo.save(poolDayData); } - async savePoolHourData (poolHourData: PoolHourData, block: Block): Promise { - return this._conn.transaction(async (tx) => { - const repo = tx.getRepository(PoolHourData); - poolHourData.blockNumber = block.number; - poolHourData.blockHash = block.hash; - return repo.save(poolHourData); - }); + async savePoolHourData (queryRunner: QueryRunner, poolHourData: PoolHourData, block: Block): Promise { + const repo = queryRunner.manager.getRepository(PoolHourData); + poolHourData.blockNumber = block.number; + poolHourData.blockHash = block.hash; + return repo.save(poolHourData); } - async saveToken (token: Token, block: Block): Promise { - return this._conn.transaction(async (tx) => { - const repo = tx.getRepository(Token); - token.blockNumber = block.number; - token.blockHash = block.hash; - return repo.save(token); - }); + async saveToken (queryRunner: QueryRunner, token: Token, block: Block): Promise { + const repo = queryRunner.manager.getRepository(Token); + token.blockNumber = block.number; + token.blockHash = block.hash; + return repo.save(token); } - async saveTransaction (transaction: Transaction, block: Block): Promise { - return this._conn.transaction(async (tx) => { - const repo = tx.getRepository(Transaction); - transaction.blockNumber = block.number; - transaction.blockHash = block.hash; - return repo.save(transaction); - }); + async saveTransaction (queryRunner: QueryRunner, transaction: Transaction, block: Block): Promise { + const repo = queryRunner.manager.getRepository(Transaction); + transaction.blockNumber = block.number; + transaction.blockHash = block.hash; + return repo.save(transaction); } - async saveUniswapDayData (uniswapDayData: UniswapDayData, block: Block): Promise { - return this._conn.transaction(async (tx) => { - const repo = tx.getRepository(UniswapDayData); - uniswapDayData.blockNumber = block.number; - uniswapDayData.blockHash = block.hash; - return repo.save(uniswapDayData); - }); + async saveUniswapDayData (queryRunner: QueryRunner, uniswapDayData: UniswapDayData, block: Block): Promise { + const repo = queryRunner.manager.getRepository(UniswapDayData); + uniswapDayData.blockNumber = block.number; + uniswapDayData.blockHash = block.hash; + return repo.save(uniswapDayData); } - async saveTokenDayData (tokenDayData: TokenDayData, block: Block): Promise { - return this._conn.transaction(async (tx) => { - const repo = tx.getRepository(TokenDayData); - tokenDayData.blockNumber = block.number; - tokenDayData.blockHash = block.hash; - return repo.save(tokenDayData); - }); + async saveTokenDayData (queryRunner: QueryRunner, tokenDayData: TokenDayData, block: Block): Promise { + const repo = queryRunner.manager.getRepository(TokenDayData); + tokenDayData.blockNumber = block.number; + tokenDayData.blockHash = block.hash; + return repo.save(tokenDayData); } - async saveTokenHourData (tokenHourData: TokenHourData, block: Block): Promise { - return this._conn.transaction(async (tx) => { - const repo = tx.getRepository(TokenHourData); - tokenHourData.blockNumber = block.number; - tokenHourData.blockHash = block.hash; - return repo.save(tokenHourData); - }); + async saveTokenHourData (queryRunner: QueryRunner, tokenHourData: TokenHourData, block: Block): Promise { + const repo = queryRunner.manager.getRepository(TokenHourData); + tokenHourData.blockNumber = block.number; + tokenHourData.blockHash = block.hash; + return repo.save(tokenHourData); } - async saveTick (tick: Tick, block: Block): Promise { - return this._conn.transaction(async (tx) => { - const repo = tx.getRepository(Tick); - tick.blockNumber = block.number; - tick.blockHash = block.hash; - return repo.save(tick); - }); + async saveTick (queryRunner: QueryRunner, tick: Tick, block: Block): Promise { + const repo = queryRunner.manager.getRepository(Tick); + tick.blockNumber = block.number; + tick.blockHash = block.hash; + return repo.save(tick); } - async savePosition (position: Position, block: Block): Promise { - return this._conn.transaction(async (tx) => { - const repo = tx.getRepository(Position); - position.blockNumber = block.number; - position.blockHash = block.hash; - return repo.save(position); - }); + async savePosition (queryRunner: QueryRunner, position: Position, block: Block): Promise { + const repo = queryRunner.manager.getRepository(Position); + position.blockNumber = block.number; + position.blockHash = block.hash; + return repo.save(position); } - async savePositionSnapshot (positionSnapshot: PositionSnapshot, block: Block): Promise { - return this._conn.transaction(async (tx) => { - const repo = tx.getRepository(PositionSnapshot); - positionSnapshot.blockNumber = block.number; - positionSnapshot.blockHash = block.hash; - return repo.save(positionSnapshot); - }); + async savePositionSnapshot (queryRunner: QueryRunner, positionSnapshot: PositionSnapshot, block: Block): Promise { + const repo = queryRunner.manager.getRepository(PositionSnapshot); + positionSnapshot.blockNumber = block.number; + positionSnapshot.blockHash = block.hash; + return repo.save(positionSnapshot); } - async saveMint (mint: Mint, block: Block): Promise { - return this._conn.transaction(async (tx) => { - const repo = tx.getRepository(Mint); - mint.blockNumber = block.number; - mint.blockHash = block.hash; - return repo.save(mint); - }); + async saveMint (queryRunner: QueryRunner, mint: Mint, block: Block): Promise { + const repo = queryRunner.manager.getRepository(Mint); + mint.blockNumber = block.number; + mint.blockHash = block.hash; + return repo.save(mint); } - async saveBurn (burn: Burn, block: Block): Promise { - return this._conn.transaction(async (tx) => { - const repo = tx.getRepository(Burn); - burn.blockNumber = block.number; - burn.blockHash = block.hash; - return repo.save(burn); - }); + async saveBurn (queryRunner: QueryRunner, burn: Burn, block: Block): Promise { + const repo = queryRunner.manager.getRepository(Burn); + burn.blockNumber = block.number; + burn.blockHash = block.hash; + return repo.save(burn); } - async saveSwap (swap: Swap, block: Block): Promise { - return this._conn.transaction(async (tx) => { - const repo = tx.getRepository(Swap); - swap.blockNumber = block.number; - swap.blockHash = block.hash; - return repo.save(swap); - }); + async saveSwap (queryRunner: QueryRunner, swap: Swap, block: Block): Promise { + const repo = queryRunner.manager.getRepository(Swap); + swap.blockNumber = block.number; + swap.blockHash = block.hash; + return repo.save(swap); } // Returns true if events have already been synced for the (block, token) combination. diff --git a/packages/uni-info-watcher/src/indexer.ts b/packages/uni-info-watcher/src/indexer.ts index 95543dd7..7c62c889 100644 --- a/packages/uni-info-watcher/src/indexer.ts +++ b/packages/uni-info-watcher/src/indexer.ts @@ -1,6 +1,6 @@ import assert from 'assert'; import debug from 'debug'; -import { DeepPartial } from 'typeorm'; +import { DeepPartial, QueryRunner } from 'typeorm'; import JSONbig from 'json-bigint'; import { utils } from 'ethers'; import { Client as UniClient } from '@vulcanize/uni-watcher'; @@ -189,42 +189,93 @@ export class Indexer { } async getBundle (id: string, block: BlockHeight): Promise { - return this._db.getBundle({ id, blockHash: block.hash, blockNumber: block.number }); + const dbTx = await this._db.createTransactionRunner(); + let res; + + try { + res = this._db.getBundle(dbTx, { id, blockHash: block.hash, blockNumber: block.number }); + await dbTx.commitTransaction(); + } catch (error) { + await dbTx.rollbackTransaction(); + throw error; + } finally { + await dbTx.release(); + } + + return res; } async getPool (id: string, block: BlockHeight): Promise { - return this._db.getPool({ id, blockHash: block.hash, blockNumber: block.number }); + const dbTx = await this._db.createTransactionRunner(); + let res; + + try { + res = await this._db.getPool(dbTx, { id, blockHash: block.hash, blockNumber: block.number }); + await dbTx.commitTransaction(); + } catch (error) { + await dbTx.rollbackTransaction(); + throw error; + } finally { + await dbTx.release(); + } + + return res; } async getToken (id: string, block: BlockHeight): Promise { - return this._db.getToken({ id, blockHash: block.hash, blockNumber: block.number }); + const dbTx = await this._db.createTransactionRunner(); + let res; + + try { + res = this._db.getToken(dbTx, { id, blockHash: block.hash, blockNumber: block.number }); + await dbTx.commitTransaction(); + } catch (error) { + await dbTx.rollbackTransaction(); + throw error; + } finally { + await dbTx.release(); + } + + return res; } async getEntities (entity: new () => Entity, block: BlockHeight, where: { [key: string]: any } = {}, queryOptions: QueryOptions, relations?: string[]): Promise { - where = Object.entries(where).reduce((acc: { [key: string]: any }, [fieldWithSuffix, value]) => { - const [field, ...suffix] = fieldWithSuffix.split('_'); + const dbTx = await this._db.createTransactionRunner(); + let res; - acc[field] = { - value, - not: false, - operator: 'equals' - }; + try { + where = Object.entries(where).reduce((acc: { [key: string]: any }, [fieldWithSuffix, value]) => { + const [field, ...suffix] = fieldWithSuffix.split('_'); - let operator = suffix.shift(); + acc[field] = { + value, + not: false, + operator: 'equals' + }; - if (operator === 'not') { - acc[field].not = true; - operator = suffix.shift(); - } + let operator = suffix.shift(); - if (operator) { - acc[field].operator = operator; - } + if (operator === 'not') { + acc[field].not = true; + operator = suffix.shift(); + } - return acc; - }, {}); + if (operator) { + acc[field].operator = operator; + } + + return acc; + }, {}); + + res = await this._db.getEntities(dbTx, entity, block, where, queryOptions, relations); + dbTx.commitTransaction(); + } catch (error) { + await dbTx.rollbackTransaction(); + throw error; + } finally { + await dbTx.release(); + } - const res = await this._db.getEntities(entity, block, where, queryOptions, relations); return res; } @@ -266,68 +317,81 @@ export class Indexer { return; } - // Load factory. - let factory = await this._db.getFactory({ blockHash: block.hash, id: contractAddress }); - - if (!factory) { - factory = new Factory(); - factory.id = contractAddress; - factory = await this._db.saveFactory(factory, block); - - // Create new bundle for tracking eth price. - const bundle = new Bundle(); - bundle.id = '1'; - await this._db.saveBundle(bundle, block); - } - - // Update Factory. - factory.poolCount = BigInt(factory.poolCount) + BigInt(1); - - let pool = new Pool(); - pool.id = poolAddress; - // Get Tokens. let [token0, token1] = await Promise.all([ - this._db.getToken({ blockHash: block.hash, id: token0Address }), - this._db.getToken({ blockHash: block.hash, id: token1Address }) + this._db.getTokenNoTx({ blockHash: block.hash, id: token0Address }), + this._db.getTokenNoTx({ blockHash: block.hash, id: token1Address }) ]); // Create Tokens if not present. if (!token0) { - token0 = await this._createToken(block, token0Address); + token0 = await this._initToken(block, token0Address); } if (!token1) { - token1 = await this._createToken(block, token1Address); + token1 = await this._initToken(block, token1Address); } - pool.token0 = token0; - pool.token1 = token1; - pool.feeTier = BigInt(fee); - pool = await this._db.savePool(pool, block); - - // Update white listed pools. - if (WHITELIST_TOKENS.includes(token0.id)) { - token1.whitelistPools.push(pool); - } - - if (WHITELIST_TOKENS.includes(token1.id)) { - token0.whitelistPools.push(pool); - } - - // Skipping adding createdAtTimestamp field as it is not queried in frontend subgraph. - // Save entities to DB. - await this._db.saveToken(token0, block); - await this._db.saveToken(token1, block); - await this._db.saveFactory(factory, block); + const dbTx = await this._db.createTransactionRunner(); + + try { + // Load factory. + let factory = await this._db.getFactory(dbTx, { blockHash: block.hash, id: contractAddress }); + + if (!factory) { + factory = new Factory(); + factory.id = contractAddress; + factory = await this._db.saveFactory(dbTx, factory, block); + + // Create new bundle for tracking eth price. + const bundle = new Bundle(); + bundle.id = '1'; + await this._db.saveBundle(dbTx, bundle, block); + } + + // Update Factory. + factory.poolCount = BigInt(factory.poolCount) + BigInt(1); + + let pool = new Pool(); + pool.id = poolAddress; + + token0 = await this._db.saveToken(dbTx, token0, block); + token1 = await this._db.saveToken(dbTx, token1, block); + pool.token0 = token0; + pool.token1 = token1; + pool.feeTier = BigInt(fee); + + // Skipping adding createdAtTimestamp field as it is not queried in frontend subgraph. + + pool = await this._db.savePool(dbTx, pool, block); + + // Update white listed pools. + if (WHITELIST_TOKENS.includes(token0.id)) { + token1.whitelistPools.push(pool); + } + + if (WHITELIST_TOKENS.includes(token1.id)) { + token0.whitelistPools.push(pool); + } + + await this._db.saveToken(dbTx, token0, block); + await this._db.saveToken(dbTx, token1, block); + await this._db.saveFactory(dbTx, factory, block); + await dbTx.commitTransaction(); + } catch (error) { + await dbTx.rollbackTransaction(); + throw error; + } finally { + await dbTx.release(); + } } /** * Create new Token. * @param tokenAddress */ - async _createToken (block: Block, tokenAddress: string): Promise { + async _initToken (block: Block, tokenAddress: string): Promise { const token = new Token(); token.id = tokenAddress; @@ -341,499 +405,543 @@ export class Indexer { token.totalSupply = totalSupply; token.decimals = decimals; - return this._db.saveToken(token, block); + return token; } async _handleInitialize (block: Block, contractAddress: string, tx: Transaction, initializeEvent: InitializeEvent): Promise { const { sqrtPriceX96, tick } = initializeEvent; - const pool = await this._db.getPool({ id: contractAddress, blockHash: block.hash }); - assert(pool, `Pool ${contractAddress} not found.`); + const dbTx = await this._db.createTransactionRunner(); - // Update Pool. - pool.sqrtPrice = BigInt(sqrtPriceX96); - pool.tick = BigInt(tick); - this._db.savePool(pool, block); + try { + const pool = await this._db.getPool(dbTx, { id: contractAddress, blockHash: block.hash }); + assert(pool, `Pool ${contractAddress} not found.`); - // Update token prices. - const [token0, token1] = await Promise.all([ - this._db.getToken({ id: pool.token0.id, blockHash: block.hash }), - this._db.getToken({ id: pool.token1.id, blockHash: block.hash }) - ]); + // Update Pool. + pool.sqrtPrice = BigInt(sqrtPriceX96); + pool.tick = BigInt(tick); - // Update ETH price now that prices could have changed. - const bundle = await this._db.getBundle({ id: '1', blockHash: block.hash }); - assert(bundle); - bundle.ethPriceUSD = await getEthPriceInUSD(this._db, block); - this._db.saveBundle(bundle, block); + // Update ETH price now that prices could have changed. + const bundle = await this._db.getBundle(dbTx, { id: '1', blockHash: block.hash }); + assert(bundle); + bundle.ethPriceUSD = await getEthPriceInUSD(this._db, dbTx, block); - await updatePoolDayData(this._db, { contractAddress, block }); - await updatePoolHourData(this._db, { contractAddress, block }); + // Update token prices. + const [token0, token1] = await Promise.all([ + this._db.getToken(dbTx, { id: pool.token0.id, blockHash: block.hash }), + this._db.getToken(dbTx, { id: pool.token1.id, blockHash: block.hash }) + ]); - assert(token0 && token1, 'Pool tokens not found.'); + assert(token0 && token1, 'Pool tokens not found.'); - token0.derivedETH = await findEthPerToken(token0); - token1.derivedETH = await findEthPerToken(token1); + token0.derivedETH = await findEthPerToken(token0); + token1.derivedETH = await findEthPerToken(token1); - await Promise.all([ - this._db.saveToken(token0, block), - this._db.saveToken(token1, block) - ]); + this._db.savePool(dbTx, pool, block); + this._db.saveBundle(dbTx, bundle, block); + + await updatePoolDayData(this._db, dbTx, { contractAddress, block }); + await updatePoolHourData(this._db, dbTx, { contractAddress, block }); + + await Promise.all([ + this._db.saveToken(dbTx, token0, block), + this._db.saveToken(dbTx, token1, block) + ]); + + await dbTx.commitTransaction(); + } catch (error) { + await dbTx.rollbackTransaction(); + throw error; + } finally { + await dbTx.release(); + } } async _handleMint (block: Block, contractAddress: string, tx: Transaction, mintEvent: MintEvent): Promise { - const bundle = await this._db.getBundle({ id: '1', blockHash: block.hash }); - assert(bundle); - const poolAddress = contractAddress; - const pool = await this._db.getPool({ id: poolAddress, blockHash: block.hash }); - assert(pool); + const dbTx = await this._db.createTransactionRunner(); - // TODO: In subgraph factory is fetched by hardcoded factory address. - // Currently fetching first factory in database as only one exists. - const [factory] = await this._db.getEntities(Factory, { hash: block.hash }, {}, { limit: 1 }); + try { + const bundle = await this._db.getBundle(dbTx, { id: '1', blockHash: block.hash }); + assert(bundle); + const poolAddress = contractAddress; + const pool = await this._db.getPool(dbTx, { id: poolAddress, blockHash: block.hash }); + assert(pool); - const token0 = pool.token0; - const token1 = pool.token1; - const amount0 = convertTokenToDecimal(mintEvent.amount0, BigInt(token0.decimals)); - const amount1 = convertTokenToDecimal(mintEvent.amount1, BigInt(token1.decimals)); + // TODO: In subgraph factory is fetched by hardcoded factory address. + // Currently fetching first factory in database as only one exists. + const [factory] = await this._db.getEntities(dbTx, Factory, { hash: block.hash }, {}, { limit: 1 }); - const amountUSD = amount0 - .times(token0.derivedETH.times(bundle.ethPriceUSD)) - .plus(amount1.times(token1.derivedETH.times(bundle.ethPriceUSD))); + const token0 = pool.token0; + const token1 = pool.token1; + const amount0 = convertTokenToDecimal(mintEvent.amount0, BigInt(token0.decimals)); + const amount1 = convertTokenToDecimal(mintEvent.amount1, BigInt(token1.decimals)); - // Reset tvl aggregates until new amounts calculated. - factory.totalValueLockedETH = factory.totalValueLockedETH.minus(pool.totalValueLockedETH); + const amountUSD = amount0 + .times(token0.derivedETH.times(bundle.ethPriceUSD)) + .plus(amount1.times(token1.derivedETH.times(bundle.ethPriceUSD))); - // Update globals. - factory.txCount = BigInt(factory.txCount) + BigInt(1); + // Reset tvl aggregates until new amounts calculated. + factory.totalValueLockedETH = factory.totalValueLockedETH.minus(pool.totalValueLockedETH); - // Update token0 data. - token0.txCount = BigInt(token0.txCount) + BigInt(1); - token0.totalValueLocked = token0.totalValueLocked.plus(amount0); - token0.totalValueLockedUSD = token0.totalValueLocked.times(token0.derivedETH.times(bundle.ethPriceUSD)); + // Update globals. + factory.txCount = BigInt(factory.txCount) + BigInt(1); - // Update token1 data. - token1.txCount = BigInt(token1.txCount) + BigInt(1); - token1.totalValueLocked = token1.totalValueLocked.plus(amount1); - token1.totalValueLockedUSD = token1.totalValueLocked.times(token1.derivedETH.times(bundle.ethPriceUSD)); + // Update token0 data. + token0.txCount = BigInt(token0.txCount) + BigInt(1); + token0.totalValueLocked = token0.totalValueLocked.plus(amount0); + token0.totalValueLockedUSD = token0.totalValueLocked.times(token0.derivedETH.times(bundle.ethPriceUSD)); - // Pool data. - pool.txCount = BigInt(pool.txCount) + BigInt(1); + // Update token1 data. + token1.txCount = BigInt(token1.txCount) + BigInt(1); + token1.totalValueLocked = token1.totalValueLocked.plus(amount1); + token1.totalValueLockedUSD = token1.totalValueLocked.times(token1.derivedETH.times(bundle.ethPriceUSD)); - // Pools liquidity tracks the currently active liquidity given pools current tick. - // We only want to update it on mint if the new position includes the current tick. - if (pool.tick !== null) { - if ( - BigInt(mintEvent.tickLower) <= BigInt(pool.tick) && - BigInt(mintEvent.tickUpper) > BigInt(pool.tick) - ) { - pool.liquidity = BigInt(pool.liquidity) + BigInt(mintEvent.amount); + // Pool data. + pool.txCount = BigInt(pool.txCount) + BigInt(1); + + // Pools liquidity tracks the currently active liquidity given pools current tick. + // We only want to update it on mint if the new position includes the current tick. + if (pool.tick !== null) { + if ( + BigInt(mintEvent.tickLower) <= BigInt(pool.tick) && + BigInt(mintEvent.tickUpper) > BigInt(pool.tick) + ) { + pool.liquidity = BigInt(pool.liquidity) + BigInt(mintEvent.amount); + } } + + pool.totalValueLockedToken0 = pool.totalValueLockedToken0.plus(amount0); + pool.totalValueLockedToken1 = pool.totalValueLockedToken1.plus(amount1); + + pool.totalValueLockedETH = pool.totalValueLockedToken0.times(token0.derivedETH) + .plus(pool.totalValueLockedToken1.times(token1.derivedETH)); + + pool.totalValueLockedUSD = pool.totalValueLockedETH.times(bundle.ethPriceUSD); + + // Reset aggregates with new amounts. + factory.totalValueLockedETH = factory.totalValueLockedETH.plus(pool.totalValueLockedETH); + factory.totalValueLockedUSD = factory.totalValueLockedETH.times(bundle.ethPriceUSD); + + const transaction = await loadTransaction(this._db, dbTx, { block, tx }); + + const mint = new Mint(); + mint.id = transaction.id + '#' + pool.txCount.toString(); + mint.transaction = transaction; + mint.timestamp = transaction.timestamp; + mint.pool = pool; + mint.token0 = pool.token0; + mint.token1 = pool.token1; + mint.owner = mintEvent.owner; + mint.sender = mintEvent.sender; + mint.origin = tx.from; + mint.amount = mintEvent.amount; + mint.amount0 = amount0; + mint.amount1 = amount1; + mint.amountUSD = amountUSD; + mint.tickLower = mintEvent.tickLower; + mint.tickUpper = mintEvent.tickUpper; + + // Tick entities. + const lowerTickIdx = mintEvent.tickLower; + const upperTickIdx = mintEvent.tickUpper; + + const lowerTickId = poolAddress + '#' + mintEvent.tickLower.toString(); + const upperTickId = poolAddress + '#' + mintEvent.tickUpper.toString(); + + let lowerTick = await this._db.getTick(dbTx, { id: lowerTickId, blockHash: block.hash }); + let upperTick = await this._db.getTick(dbTx, { id: upperTickId, blockHash: block.hash }); + + if (!lowerTick) { + lowerTick = await createTick(this._db, dbTx, lowerTickId, BigInt(lowerTickIdx), pool, block); + } + + if (!upperTick) { + upperTick = await createTick(this._db, dbTx, upperTickId, BigInt(upperTickIdx), pool, block); + } + + const amount = BigInt(mintEvent.amount); + lowerTick.liquidityGross = BigInt(lowerTick.liquidityGross) + amount; + lowerTick.liquidityNet = BigInt(lowerTick.liquidityNet) + amount; + upperTick.liquidityGross = BigInt(upperTick.liquidityGross) + amount; + upperTick.liquidityNet = BigInt(upperTick.liquidityNet) + amount; + + // TODO: Update Tick's volume, fees, and liquidity provider count. + // Computing these on the tick level requires reimplementing some of the swapping code from v3-core. + + await updateUniswapDayData(this._db, dbTx, { block, contractAddress }); + await updateTokenDayData(this._db, dbTx, token0, { block }); + await updateTokenDayData(this._db, dbTx, token1, { block }); + await updateTokenHourData(this._db, dbTx, token0, { block }); + await updateTokenHourData(this._db, dbTx, token1, { block }); + + await updatePoolDayData(this._db, dbTx, { block, contractAddress }); + await updatePoolHourData(this._db, dbTx, { block, contractAddress }); + + await Promise.all([ + this._db.saveToken(dbTx, token0, block), + this._db.saveToken(dbTx, token1, block) + ]); + + await this._db.savePool(dbTx, pool, block); + await this._db.saveFactory(dbTx, factory, block); + + await this._db.saveMint(dbTx, mint, block); + + await Promise.all([ + await this._db.saveTick(dbTx, lowerTick, block), + await this._db.saveTick(dbTx, upperTick, block) + ]); + + // Skipping update inner tick vars and tick day data as they are not queried. + await dbTx.commitTransaction(); + } catch (error) { + await dbTx.rollbackTransaction(); + throw error; + } finally { + await dbTx.release(); } - - pool.totalValueLockedToken0 = pool.totalValueLockedToken0.plus(amount0); - pool.totalValueLockedToken1 = pool.totalValueLockedToken1.plus(amount1); - - pool.totalValueLockedETH = pool.totalValueLockedToken0.times(token0.derivedETH) - .plus(pool.totalValueLockedToken1.times(token1.derivedETH)); - - pool.totalValueLockedUSD = pool.totalValueLockedETH.times(bundle.ethPriceUSD); - - // Reset aggregates with new amounts. - factory.totalValueLockedETH = factory.totalValueLockedETH.plus(pool.totalValueLockedETH); - factory.totalValueLockedUSD = factory.totalValueLockedETH.times(bundle.ethPriceUSD); - - const transaction = await loadTransaction(this._db, { block, tx }); - - const mint = new Mint(); - mint.id = transaction.id + '#' + pool.txCount.toString(); - mint.transaction = transaction; - mint.timestamp = transaction.timestamp; - mint.pool = pool; - mint.token0 = pool.token0; - mint.token1 = pool.token1; - mint.owner = mintEvent.owner; - mint.sender = mintEvent.sender; - mint.origin = tx.from; - mint.amount = mintEvent.amount; - mint.amount0 = amount0; - mint.amount1 = amount1; - mint.amountUSD = amountUSD; - mint.tickLower = mintEvent.tickLower; - mint.tickUpper = mintEvent.tickUpper; - - // Tick entities. - const lowerTickIdx = mintEvent.tickLower; - const upperTickIdx = mintEvent.tickUpper; - - const lowerTickId = poolAddress + '#' + mintEvent.tickLower.toString(); - const upperTickId = poolAddress + '#' + mintEvent.tickUpper.toString(); - - let lowerTick = await this._db.getTick({ id: lowerTickId, blockHash: block.hash }); - let upperTick = await this._db.getTick({ id: upperTickId, blockHash: block.hash }); - - if (!lowerTick) { - lowerTick = await createTick(this._db, lowerTickId, BigInt(lowerTickIdx), pool, block); - } - - if (!upperTick) { - upperTick = await createTick(this._db, upperTickId, BigInt(upperTickIdx), pool, block); - } - - const amount = BigInt(mintEvent.amount); - lowerTick.liquidityGross = BigInt(lowerTick.liquidityGross) + amount; - lowerTick.liquidityNet = BigInt(lowerTick.liquidityNet) + amount; - upperTick.liquidityGross = BigInt(upperTick.liquidityGross) + amount; - upperTick.liquidityNet = BigInt(upperTick.liquidityNet) + amount; - - // TODO: Update Tick's volume, fees, and liquidity provider count. - // Computing these on the tick level requires reimplementing some of the swapping code from v3-core. - - await updateUniswapDayData(this._db, { block, contractAddress }); - await updatePoolDayData(this._db, { block, contractAddress }); - await updatePoolHourData(this._db, { block, contractAddress }); - await updateTokenDayData(this._db, token0, { block }); - await updateTokenDayData(this._db, token1, { block }); - await updateTokenHourData(this._db, token0, { block }); - await updateTokenHourData(this._db, token1, { block }); - - await Promise.all([ - this._db.saveToken(token0, block), - this._db.saveToken(token1, block) - ]); - - await this._db.savePool(pool, block); - await this._db.saveFactory(factory, block); - await this._db.saveMint(mint, block); - - await Promise.all([ - await this._db.saveTick(lowerTick, block), - await this._db.saveTick(upperTick, block) - ]); - - // Skipping update inner tick vars and tick day data as they are not queried. } async _handleBurn (block: Block, contractAddress: string, tx: Transaction, burnEvent: BurnEvent): Promise { - const bundle = await this._db.getBundle({ id: '1', blockHash: block.hash }); - assert(bundle); - const poolAddress = contractAddress; - const pool = await this._db.getPool({ id: poolAddress, blockHash: block.hash }); - assert(pool); + const dbTx = await this._db.createTransactionRunner(); - // TODO: In subgraph factory is fetched by hardcoded factory address. - // Currently fetching first factory in database as only one exists. - const [factory] = await this._db.getEntities(Factory, { hash: block.hash }, {}, { limit: 1 }); + try { + const bundle = await this._db.getBundle(dbTx, { id: '1', blockHash: block.hash }); + assert(bundle); + const poolAddress = contractAddress; + const pool = await this._db.getPool(dbTx, { id: poolAddress, blockHash: block.hash }); + assert(pool); - const token0 = pool.token0; - const token1 = pool.token1; - const amount0 = convertTokenToDecimal(burnEvent.amount0, BigInt(token0.decimals)); - const amount1 = convertTokenToDecimal(burnEvent.amount1, BigInt(token1.decimals)); + // TODO: In subgraph factory is fetched by hardcoded factory address. + // Currently fetching first factory in database as only one exists. + const [factory] = await this._db.getEntities(dbTx, Factory, { hash: block.hash }, {}, { limit: 1 }); - const amountUSD = amount0 - .times(token0.derivedETH.times(bundle.ethPriceUSD)) - .plus(amount1.times(token1.derivedETH.times(bundle.ethPriceUSD))); + const token0 = pool.token0; + const token1 = pool.token1; + const amount0 = convertTokenToDecimal(burnEvent.amount0, BigInt(token0.decimals)); + const amount1 = convertTokenToDecimal(burnEvent.amount1, BigInt(token1.decimals)); - // Reset tvl aggregates until new amounts calculated. - factory.totalValueLockedETH = factory.totalValueLockedETH.minus(pool.totalValueLockedETH); + const amountUSD = amount0 + .times(token0.derivedETH.times(bundle.ethPriceUSD)) + .plus(amount1.times(token1.derivedETH.times(bundle.ethPriceUSD))); - // Update globals. - factory.txCount = BigInt(factory.txCount) + BigInt(1); + // Reset tvl aggregates until new amounts calculated. + factory.totalValueLockedETH = factory.totalValueLockedETH.minus(pool.totalValueLockedETH); - // Update token0 data. - token0.txCount = BigInt(token0.txCount) + BigInt(1); - token0.totalValueLocked = token0.totalValueLocked.minus(amount0); - token0.totalValueLockedUSD = token0.totalValueLocked.times(token0.derivedETH.times(bundle.ethPriceUSD)); + // Update globals. + factory.txCount = BigInt(factory.txCount) + BigInt(1); - // Update token1 data. - token1.txCount = BigInt(token1.txCount) + BigInt(1); - token1.totalValueLocked = token1.totalValueLocked.minus(amount1); - token1.totalValueLockedUSD = token1.totalValueLocked.times(token1.derivedETH.times(bundle.ethPriceUSD)); + // Update token0 data. + token0.txCount = BigInt(token0.txCount) + BigInt(1); + token0.totalValueLocked = token0.totalValueLocked.minus(amount0); + token0.totalValueLockedUSD = token0.totalValueLocked.times(token0.derivedETH.times(bundle.ethPriceUSD)); - // Pool data. - pool.txCount = BigInt(pool.txCount) + BigInt(1); + // Update token1 data. + token1.txCount = BigInt(token1.txCount) + BigInt(1); + token1.totalValueLocked = token1.totalValueLocked.minus(amount1); + token1.totalValueLockedUSD = token1.totalValueLocked.times(token1.derivedETH.times(bundle.ethPriceUSD)); - // Pools liquidity tracks the currently active liquidity given pools current tick. - // We only want to update it on burn if the position being burnt includes the current tick. - if ( - pool.tick !== null && - burnEvent.tickLower <= pool.tick && - burnEvent.tickUpper > pool.tick - ) { - pool.liquidity = pool.liquidity - burnEvent.amount; + // Pool data. + pool.txCount = BigInt(pool.txCount) + BigInt(1); + + // Pools liquidity tracks the currently active liquidity given pools current tick. + // We only want to update it on burn if the position being burnt includes the current tick. + if ( + pool.tick !== null && + burnEvent.tickLower <= pool.tick && + burnEvent.tickUpper > pool.tick + ) { + pool.liquidity = pool.liquidity - burnEvent.amount; + } + + pool.totalValueLockedToken0 = pool.totalValueLockedToken0.minus(amount0); + pool.totalValueLockedToken1 = pool.totalValueLockedToken1.minus(amount1); + + pool.totalValueLockedETH = pool.totalValueLockedToken0 + .times(token0.derivedETH) + .plus(pool.totalValueLockedToken1.times(token1.derivedETH)); + + pool.totalValueLockedUSD = pool.totalValueLockedETH.times(bundle.ethPriceUSD); + + // Reset aggregates with new amounts. + factory.totalValueLockedETH = factory.totalValueLockedETH.plus(pool.totalValueLockedETH); + factory.totalValueLockedUSD = factory.totalValueLockedETH.times(bundle.ethPriceUSD); + + // Burn entity. + const transaction = await loadTransaction(this._db, dbTx, { block, tx }); + + const burn = new Burn(); + burn.id = transaction.id + '#' + pool.txCount.toString(); + burn.transaction = transaction; + burn.timestamp = transaction.timestamp; + burn.pool = pool; + burn.token0 = pool.token0; + burn.token1 = pool.token1; + burn.owner = burnEvent.owner; + burn.origin = tx.from; + burn.amount = burnEvent.amount; + burn.amount0 = amount0; + burn.amount1 = amount1; + burn.amountUSD = amountUSD; + burn.tickLower = burnEvent.tickLower; + burn.tickUpper = burnEvent.tickUpper; + + // Tick entities. + const lowerTickId = poolAddress + '#' + (burnEvent.tickLower).toString(); + const upperTickId = poolAddress + '#' + (burnEvent.tickUpper).toString(); + const lowerTick = await this._db.getTick(dbTx, { id: lowerTickId, blockHash: block.hash }); + const upperTick = await this._db.getTick(dbTx, { id: upperTickId, blockHash: block.hash }); + assert(lowerTick && upperTick); + const amount = BigInt(burnEvent.amount); + lowerTick.liquidityGross = BigInt(lowerTick.liquidityGross) - amount; + lowerTick.liquidityNet = BigInt(lowerTick.liquidityNet) - amount; + upperTick.liquidityGross = BigInt(upperTick.liquidityGross) - amount; + upperTick.liquidityNet = BigInt(upperTick.liquidityNet) + amount; + + await updateUniswapDayData(this._db, dbTx, { block, contractAddress }); + await updateTokenDayData(this._db, dbTx, token0, { block }); + await updateTokenDayData(this._db, dbTx, token0, { block }); + await updateTokenHourData(this._db, dbTx, token0, { block }); + await updateTokenHourData(this._db, dbTx, token0, { block }); + await updatePoolDayData(this._db, dbTx, { block, contractAddress }); + await updatePoolHourData(this._db, dbTx, { block, contractAddress }); + + await Promise.all([ + this._db.saveToken(dbTx, token0, block), + this._db.saveToken(dbTx, token1, block) + ]); + + await this._db.savePool(dbTx, pool, block); + await this._db.saveFactory(dbTx, factory, block); + + // Skipping update Tick fee and Tick day data as they are not queried. + + await Promise.all([ + await this._db.saveTick(dbTx, lowerTick, block), + await this._db.saveTick(dbTx, upperTick, block) + ]); + + await this._db.saveBurn(dbTx, burn, block); + await dbTx.commitTransaction(); + } catch (error) { + await dbTx.rollbackTransaction(); + throw error; + } finally { + await dbTx.release(); } - - pool.totalValueLockedToken0 = pool.totalValueLockedToken0.minus(amount0); - pool.totalValueLockedToken1 = pool.totalValueLockedToken1.minus(amount1); - - pool.totalValueLockedETH = pool.totalValueLockedToken0 - .times(token0.derivedETH) - .plus(pool.totalValueLockedToken1.times(token1.derivedETH)); - - pool.totalValueLockedUSD = pool.totalValueLockedETH.times(bundle.ethPriceUSD); - - // Reset aggregates with new amounts. - factory.totalValueLockedETH = factory.totalValueLockedETH.plus(pool.totalValueLockedETH); - factory.totalValueLockedUSD = factory.totalValueLockedETH.times(bundle.ethPriceUSD); - - // Burn entity. - const transaction = await loadTransaction(this._db, { block, tx }); - - const burn = new Burn(); - burn.id = transaction.id + '#' + pool.txCount.toString(); - burn.transaction = transaction; - burn.timestamp = transaction.timestamp; - burn.pool = pool; - burn.token0 = pool.token0; - burn.token1 = pool.token1; - burn.owner = burnEvent.owner; - burn.origin = tx.from; - burn.amount = burnEvent.amount; - burn.amount0 = amount0; - burn.amount1 = amount1; - burn.amountUSD = amountUSD; - burn.tickLower = burnEvent.tickLower; - burn.tickUpper = burnEvent.tickUpper; - - // Tick entities. - const lowerTickId = poolAddress + '#' + (burnEvent.tickLower).toString(); - const upperTickId = poolAddress + '#' + (burnEvent.tickUpper).toString(); - const lowerTick = await this._db.getTick({ id: lowerTickId, blockHash: block.hash }); - const upperTick = await this._db.getTick({ id: upperTickId, blockHash: block.hash }); - assert(lowerTick && upperTick); - const amount = BigInt(burnEvent.amount); - lowerTick.liquidityGross = BigInt(lowerTick.liquidityGross) - amount; - lowerTick.liquidityNet = BigInt(lowerTick.liquidityNet) - amount; - upperTick.liquidityGross = BigInt(upperTick.liquidityGross) - amount; - upperTick.liquidityNet = BigInt(upperTick.liquidityNet) + amount; - - await updateUniswapDayData(this._db, { block, contractAddress }); - await updatePoolDayData(this._db, { block, contractAddress }); - await updatePoolHourData(this._db, { block, contractAddress }); - await updateTokenDayData(this._db, token0, { block }); - await updateTokenDayData(this._db, token0, { block }); - await updateTokenHourData(this._db, token0, { block }); - await updateTokenHourData(this._db, token0, { block }); - - // Skipping update Tick fee and Tick day data as they are not queried. - - await Promise.all([ - await this._db.saveTick(lowerTick, block), - await this._db.saveTick(upperTick, block) - ]); - - await Promise.all([ - this._db.saveToken(token0, block), - this._db.saveToken(token1, block) - ]); - - await this._db.savePool(pool, block); - await this._db.saveFactory(factory, block); - await this._db.saveBurn(burn, block); } async _handleSwap (block: Block, contractAddress: string, tx: Transaction, swapEvent: SwapEvent): Promise { - const bundle = await this._db.getBundle({ id: '1', blockHash: block.hash }); - assert(bundle); + const dbTx = await this._db.createTransactionRunner(); - // TODO: In subgraph factory is fetched by hardcoded factory address. - // Currently fetching first factory in database as only one exists. - const [factory] = await this._db.getEntities(Factory, { hash: block.hash }, {}, { limit: 1 }); + try { + const bundle = await this._db.getBundle(dbTx, { id: '1', blockHash: block.hash }); + assert(bundle); - const pool = await this._db.getPool({ id: contractAddress, blockHash: block.hash }); - assert(pool); + // TODO: In subgraph factory is fetched by hardcoded factory address. + // Currently fetching first factory in database as only one exists. + const [factory] = await this._db.getEntities(dbTx, Factory, { hash: block.hash }, {}, { limit: 1 }); - // Hot fix for bad pricing. - if (pool.id === '0x9663f2ca0454accad3e094448ea6f77443880454') { - return; + const pool = await this._db.getPool(dbTx, { id: contractAddress, blockHash: block.hash }); + assert(pool); + + // Hot fix for bad pricing. + if (pool.id === '0x9663f2ca0454accad3e094448ea6f77443880454') { + return; + } + + const [token0, token1] = await Promise.all([ + this._db.getToken(dbTx, { id: pool.token0.id, blockHash: block.hash }), + this._db.getToken(dbTx, { id: pool.token1.id, blockHash: block.hash }) + ]); + + assert(token0 && token1, 'Pool tokens not found.'); + + // Amounts - 0/1 are token deltas. Can be positive or negative. + const amount0 = convertTokenToDecimal(swapEvent.amount0, BigInt(token0.decimals)); + const amount1 = convertTokenToDecimal(swapEvent.amount1, BigInt(token1.decimals)); + + // Need absolute amounts for volume. + let amount0Abs = amount0; + let amount1Abs = amount1; + + if (amount0.lt(new Decimal(0))) { + amount0Abs = amount0.times(new Decimal('-1')); + } + + if (amount1.lt(new Decimal(0))) { + amount1Abs = amount1.times(new Decimal('-1')); + } + + const amount0ETH = amount0Abs.times(token0.derivedETH); + const amount1ETH = amount1Abs.times(token1.derivedETH); + const amount0USD = amount0ETH.times(bundle.ethPriceUSD); + const amount1USD = amount1ETH.times(bundle.ethPriceUSD); + + // Get amount that should be tracked only - div 2 because cant count both input and output as volume. + const trackedAmountUSD = await getTrackedAmountUSD(this._db, dbTx, amount0Abs, token0, amount1Abs, token1); + const amountTotalUSDTracked = trackedAmountUSD.div(new Decimal('2')); + const amountTotalETHTracked = safeDiv(amountTotalUSDTracked, bundle.ethPriceUSD); + const amountTotalUSDUntracked = amount0USD.plus(amount1USD).div(new Decimal('2')); + + const feesETH = amountTotalETHTracked.times(pool.feeTier.toString()).div(new Decimal('1000000')); + const feesUSD = amountTotalUSDTracked.times(pool.feeTier.toString()).div(new Decimal('1000000')); + + // Global updates. + factory.txCount = BigInt(factory.txCount) + BigInt(1); + factory.totalVolumeETH = factory.totalVolumeETH.plus(amountTotalETHTracked); + factory.totalVolumeUSD = factory.totalVolumeUSD.plus(amountTotalUSDTracked); + factory.untrackedVolumeUSD = factory.untrackedVolumeUSD.plus(amountTotalUSDUntracked); + factory.totalFeesETH = factory.totalFeesETH.plus(feesETH); + factory.totalFeesUSD = factory.totalFeesUSD.plus(feesUSD); + + // Reset aggregate tvl before individual pool tvl updates. + const currentPoolTvlETH = pool.totalValueLockedETH; + factory.totalValueLockedETH = factory.totalValueLockedETH.minus(currentPoolTvlETH); + + // pool volume + pool.volumeToken0 = pool.volumeToken0.plus(amount0Abs); + pool.volumeToken1 = pool.volumeToken1.plus(amount1Abs); + pool.volumeUSD = pool.volumeUSD.plus(amountTotalUSDTracked); + pool.untrackedVolumeUSD = pool.untrackedVolumeUSD.plus(amountTotalUSDUntracked); + pool.feesUSD = pool.feesUSD.plus(feesUSD); + pool.txCount = BigInt(pool.txCount) + BigInt(1); + + // Update the pool with the new active liquidity, price, and tick. + pool.liquidity = swapEvent.liquidity; + pool.tick = BigInt(swapEvent.tick); + pool.sqrtPrice = swapEvent.sqrtPriceX96; + pool.totalValueLockedToken0 = pool.totalValueLockedToken0.plus(amount0); + pool.totalValueLockedToken1 = pool.totalValueLockedToken1.plus(amount1); + + // Update token0 data. + token0.volume = token0.volume.plus(amount0Abs); + token0.totalValueLocked = token0.totalValueLocked.plus(amount0); + token0.volumeUSD = token0.volumeUSD.plus(amountTotalUSDTracked); + token0.untrackedVolumeUSD = token0.untrackedVolumeUSD.plus(amountTotalUSDUntracked); + token0.feesUSD = token0.feesUSD.plus(feesUSD); + token0.txCount = BigInt(token0.txCount) + BigInt(1); + + // Update token1 data. + token1.volume = token1.volume.plus(amount1Abs); + token1.totalValueLocked = token1.totalValueLocked.plus(amount1); + token1.volumeUSD = token1.volumeUSD.plus(amountTotalUSDTracked); + token1.untrackedVolumeUSD = token1.untrackedVolumeUSD.plus(amountTotalUSDUntracked); + token1.feesUSD = token1.feesUSD.plus(feesUSD); + token1.txCount = BigInt(token1.txCount) + BigInt(1); + + // Updated pool rates. + const prices = sqrtPriceX96ToTokenPrices(pool.sqrtPrice, token0 as Token, token1 as Token); + pool.token0Price = prices[0]; + pool.token1Price = prices[1]; + + // Update USD pricing. + bundle.ethPriceUSD = await getEthPriceInUSD(this._db, dbTx, block); + token0.derivedETH = await findEthPerToken(token0); + token1.derivedETH = await findEthPerToken(token1); + + /** + * Things afffected by new USD rates. + */ + pool.totalValueLockedETH = pool.totalValueLockedToken0 + .times(token0.derivedETH) + .plus(pool.totalValueLockedToken1.times(token1.derivedETH)); + + pool.totalValueLockedUSD = pool.totalValueLockedETH.times(bundle.ethPriceUSD); + + factory.totalValueLockedETH = factory.totalValueLockedETH.plus(pool.totalValueLockedETH); + factory.totalValueLockedUSD = factory.totalValueLockedETH.times(bundle.ethPriceUSD); + + token0.totalValueLockedUSD = token0.totalValueLocked.times(token0.derivedETH).times(bundle.ethPriceUSD); + token1.totalValueLockedUSD = token1.totalValueLocked.times(token1.derivedETH).times(bundle.ethPriceUSD); + + // Create Swap event + const transaction = await loadTransaction(this._db, dbTx, { block, tx }); + + const swap = new Swap(); + swap.id = transaction.id + '#' + pool.txCount.toString(); + swap.transaction = transaction; + swap.timestamp = transaction.timestamp; + swap.pool = pool; + swap.token0 = pool.token0; + swap.token1 = pool.token1; + swap.sender = swapEvent.sender; + swap.origin = tx.from; + swap.recipient = swapEvent.recipient; + swap.amount0 = amount0; + swap.amount1 = amount1; + swap.amountUSD = amountTotalUSDTracked; + swap.tick = BigInt(swapEvent.tick); + swap.sqrtPriceX96 = swapEvent.sqrtPriceX96; + + // Skipping update pool fee growth as they are not queried. + + // Interval data. + const uniswapDayData = await updateUniswapDayData(this._db, dbTx, { block, contractAddress }); + const poolDayData = await updatePoolDayData(this._db, dbTx, { block, contractAddress }); + const poolHourData = await updatePoolHourData(this._db, dbTx, { block, contractAddress }); + const token0DayData = await updateTokenDayData(this._db, dbTx, token0, { block }); + const token1DayData = await updateTokenDayData(this._db, dbTx, token0, { block }); + const token0HourData = await updateTokenHourData(this._db, dbTx, token0, { block }); + const token1HourData = await updateTokenHourData(this._db, dbTx, token0, { block }); + + // Update volume metrics. + uniswapDayData.volumeETH = uniswapDayData.volumeETH.plus(amountTotalETHTracked); + uniswapDayData.volumeUSD = uniswapDayData.volumeUSD.plus(amountTotalUSDTracked); + uniswapDayData.feesUSD = uniswapDayData.feesUSD.plus(feesUSD); + + poolDayData.volumeUSD = poolDayData.volumeUSD.plus(amountTotalUSDTracked); + poolDayData.volumeToken0 = poolDayData.volumeToken0.plus(amount0Abs); + poolDayData.volumeToken1 = poolDayData.volumeToken1.plus(amount1Abs); + poolDayData.feesUSD = poolDayData.feesUSD.plus(feesUSD); + + poolHourData.volumeUSD = poolHourData.volumeUSD.plus(amountTotalUSDTracked); + poolHourData.volumeToken0 = poolHourData.volumeToken0.plus(amount0Abs); + poolHourData.volumeToken1 = poolHourData.volumeToken1.plus(amount1Abs); + poolHourData.feesUSD = poolHourData.feesUSD.plus(feesUSD); + + token0DayData.volume = token0DayData.volume.plus(amount0Abs); + token0DayData.volumeUSD = token0DayData.volumeUSD.plus(amountTotalUSDTracked); + token0DayData.untrackedVolumeUSD = token0DayData.untrackedVolumeUSD.plus(amountTotalUSDTracked); + token0DayData.feesUSD = token0DayData.feesUSD.plus(feesUSD); + + token0HourData.volume = token0HourData.volume.plus(amount0Abs); + token0HourData.volumeUSD = token0HourData.volumeUSD.plus(amountTotalUSDTracked); + token0HourData.untrackedVolumeUSD = token0HourData.untrackedVolumeUSD.plus(amountTotalUSDTracked); + token0HourData.feesUSD = token0HourData.feesUSD.plus(feesUSD); + + token1DayData.volume = token1DayData.volume.plus(amount1Abs); + token1DayData.volumeUSD = token1DayData.volumeUSD.plus(amountTotalUSDTracked); + token1DayData.untrackedVolumeUSD = token1DayData.untrackedVolumeUSD.plus(amountTotalUSDTracked); + token1DayData.feesUSD = token1DayData.feesUSD.plus(feesUSD); + + token1HourData.volume = token1HourData.volume.plus(amount1Abs); + token1HourData.volumeUSD = token1HourData.volumeUSD.plus(amountTotalUSDTracked); + token1HourData.untrackedVolumeUSD = token1HourData.untrackedVolumeUSD.plus(amountTotalUSDTracked); + token1HourData.feesUSD = token1HourData.feesUSD.plus(feesUSD); + + await this._db.saveBundle(dbTx, bundle, block); + await this._db.saveSwap(dbTx, swap, block); + await this._db.saveTokenDayData(dbTx, token0DayData, block); + await this._db.saveTokenDayData(dbTx, token1DayData, block); + await this._db.saveUniswapDayData(dbTx, uniswapDayData, block); + await this._db.savePoolDayData(dbTx, poolDayData, block); + await this._db.saveFactory(dbTx, factory, block); + await this._db.savePool(dbTx, pool, block); + await this._db.saveToken(dbTx, token0, block); + await this._db.saveToken(dbTx, token1, block); + + // Skipping update of inner vars of current or crossed ticks as they are not queried. + await dbTx.commitTransaction(); + } catch (error) { + await dbTx.rollbackTransaction(); + throw error; + } finally { + await dbTx.release(); } - - const [token0, token1] = await Promise.all([ - this._db.getToken({ id: pool.token0.id, blockHash: block.hash }), - this._db.getToken({ id: pool.token1.id, blockHash: block.hash }) - ]); - - assert(token0 && token1, 'Pool tokens not found.'); - - // Amounts - 0/1 are token deltas. Can be positive or negative. - const amount0 = convertTokenToDecimal(swapEvent.amount0, BigInt(token0.decimals)); - const amount1 = convertTokenToDecimal(swapEvent.amount1, BigInt(token1.decimals)); - - // Need absolute amounts for volume. - let amount0Abs = amount0; - let amount1Abs = amount1; - - if (amount0.lt(new Decimal(0))) { - amount0Abs = amount0.times(new Decimal('-1')); - } - - if (amount1.lt(new Decimal(0))) { - amount1Abs = amount1.times(new Decimal('-1')); - } - - const amount0ETH = amount0Abs.times(token0.derivedETH); - const amount1ETH = amount1Abs.times(token1.derivedETH); - const amount0USD = amount0ETH.times(bundle.ethPriceUSD); - const amount1USD = amount1ETH.times(bundle.ethPriceUSD); - - // Get amount that should be tracked only - div 2 because cant count both input and output as volume. - const trackedAmountUSD = await getTrackedAmountUSD(this._db, amount0Abs, token0, amount1Abs, token1); - const amountTotalUSDTracked = trackedAmountUSD.div(new Decimal('2')); - const amountTotalETHTracked = safeDiv(amountTotalUSDTracked, bundle.ethPriceUSD); - const amountTotalUSDUntracked = amount0USD.plus(amount1USD).div(new Decimal('2')); - - const feesETH = amountTotalETHTracked.times(pool.feeTier.toString()).div(new Decimal('1000000')); - const feesUSD = amountTotalUSDTracked.times(pool.feeTier.toString()).div(new Decimal('1000000')); - - // Global updates. - factory.txCount = BigInt(factory.txCount) + BigInt(1); - factory.totalVolumeETH = factory.totalVolumeETH.plus(amountTotalETHTracked); - factory.totalVolumeUSD = factory.totalVolumeUSD.plus(amountTotalUSDTracked); - factory.untrackedVolumeUSD = factory.untrackedVolumeUSD.plus(amountTotalUSDUntracked); - factory.totalFeesETH = factory.totalFeesETH.plus(feesETH); - factory.totalFeesUSD = factory.totalFeesUSD.plus(feesUSD); - - // Reset aggregate tvl before individual pool tvl updates. - const currentPoolTvlETH = pool.totalValueLockedETH; - factory.totalValueLockedETH = factory.totalValueLockedETH.minus(currentPoolTvlETH); - - // pool volume - pool.volumeToken0 = pool.volumeToken0.plus(amount0Abs); - pool.volumeToken1 = pool.volumeToken1.plus(amount1Abs); - pool.volumeUSD = pool.volumeUSD.plus(amountTotalUSDTracked); - pool.untrackedVolumeUSD = pool.untrackedVolumeUSD.plus(amountTotalUSDUntracked); - pool.feesUSD = pool.feesUSD.plus(feesUSD); - pool.txCount = BigInt(pool.txCount) + BigInt(1); - - // Update the pool with the new active liquidity, price, and tick. - pool.liquidity = swapEvent.liquidity; - pool.tick = BigInt(swapEvent.tick); - pool.sqrtPrice = swapEvent.sqrtPriceX96; - pool.totalValueLockedToken0 = pool.totalValueLockedToken0.plus(amount0); - pool.totalValueLockedToken1 = pool.totalValueLockedToken1.plus(amount1); - - // Update token0 data. - token0.volume = token0.volume.plus(amount0Abs); - token0.totalValueLocked = token0.totalValueLocked.plus(amount0); - token0.volumeUSD = token0.volumeUSD.plus(amountTotalUSDTracked); - token0.untrackedVolumeUSD = token0.untrackedVolumeUSD.plus(amountTotalUSDUntracked); - token0.feesUSD = token0.feesUSD.plus(feesUSD); - token0.txCount = BigInt(token0.txCount) + BigInt(1); - - // Update token1 data. - token1.volume = token1.volume.plus(amount1Abs); - token1.totalValueLocked = token1.totalValueLocked.plus(amount1); - token1.volumeUSD = token1.volumeUSD.plus(amountTotalUSDTracked); - token1.untrackedVolumeUSD = token1.untrackedVolumeUSD.plus(amountTotalUSDUntracked); - token1.feesUSD = token1.feesUSD.plus(feesUSD); - token1.txCount = BigInt(token1.txCount) + BigInt(1); - - // Updated pool rates. - const prices = sqrtPriceX96ToTokenPrices(pool.sqrtPrice, token0 as Token, token1 as Token); - pool.token0Price = prices[0]; - pool.token1Price = prices[1]; - this._db.savePool(pool, block); - - // Update USD pricing. - bundle.ethPriceUSD = await getEthPriceInUSD(this._db, block); - this._db.saveBundle(bundle, block); - token0.derivedETH = await findEthPerToken(token0); - token1.derivedETH = await findEthPerToken(token1); - - /** - * Things afffected by new USD rates. - */ - pool.totalValueLockedETH = pool.totalValueLockedToken0 - .times(token0.derivedETH) - .plus(pool.totalValueLockedToken1.times(token1.derivedETH)); - - pool.totalValueLockedUSD = pool.totalValueLockedETH.times(bundle.ethPriceUSD); - - factory.totalValueLockedETH = factory.totalValueLockedETH.plus(pool.totalValueLockedETH); - factory.totalValueLockedUSD = factory.totalValueLockedETH.times(bundle.ethPriceUSD); - - token0.totalValueLockedUSD = token0.totalValueLocked.times(token0.derivedETH).times(bundle.ethPriceUSD); - token1.totalValueLockedUSD = token1.totalValueLocked.times(token1.derivedETH).times(bundle.ethPriceUSD); - - // Create Swap event - const transaction = await loadTransaction(this._db, { block, tx }); - - const swap = new Swap(); - swap.id = transaction.id + '#' + pool.txCount.toString(); - swap.transaction = transaction; - swap.timestamp = transaction.timestamp; - swap.pool = pool; - swap.token0 = pool.token0; - swap.token1 = pool.token1; - swap.sender = swapEvent.sender; - swap.origin = tx.from; - swap.recipient = swapEvent.recipient; - swap.amount0 = amount0; - swap.amount1 = amount1; - swap.amountUSD = amountTotalUSDTracked; - swap.tick = BigInt(swapEvent.tick); - swap.sqrtPriceX96 = swapEvent.sqrtPriceX96; - - // Skipping update pool fee growth as they are not queried. - - // Interval data. - const uniswapDayData = await updateUniswapDayData(this._db, { block, contractAddress }); - const poolDayData = await updatePoolDayData(this._db, { block, contractAddress }); - const poolHourData = await updatePoolHourData(this._db, { block, contractAddress }); - const token0DayData = await updateTokenDayData(this._db, token0, { block }); - const token1DayData = await updateTokenDayData(this._db, token0, { block }); - const token0HourData = await updateTokenHourData(this._db, token0, { block }); - const token1HourData = await updateTokenHourData(this._db, token0, { block }); - - // Update volume metrics. - uniswapDayData.volumeETH = uniswapDayData.volumeETH.plus(amountTotalETHTracked); - uniswapDayData.volumeUSD = uniswapDayData.volumeUSD.plus(amountTotalUSDTracked); - uniswapDayData.feesUSD = uniswapDayData.feesUSD.plus(feesUSD); - - poolDayData.volumeUSD = poolDayData.volumeUSD.plus(amountTotalUSDTracked); - poolDayData.volumeToken0 = poolDayData.volumeToken0.plus(amount0Abs); - poolDayData.volumeToken1 = poolDayData.volumeToken1.plus(amount1Abs); - poolDayData.feesUSD = poolDayData.feesUSD.plus(feesUSD); - - poolHourData.volumeUSD = poolHourData.volumeUSD.plus(amountTotalUSDTracked); - poolHourData.volumeToken0 = poolHourData.volumeToken0.plus(amount0Abs); - poolHourData.volumeToken1 = poolHourData.volumeToken1.plus(amount1Abs); - poolHourData.feesUSD = poolHourData.feesUSD.plus(feesUSD); - - token0DayData.volume = token0DayData.volume.plus(amount0Abs); - token0DayData.volumeUSD = token0DayData.volumeUSD.plus(amountTotalUSDTracked); - token0DayData.untrackedVolumeUSD = token0DayData.untrackedVolumeUSD.plus(amountTotalUSDTracked); - token0DayData.feesUSD = token0DayData.feesUSD.plus(feesUSD); - - token0HourData.volume = token0HourData.volume.plus(amount0Abs); - token0HourData.volumeUSD = token0HourData.volumeUSD.plus(amountTotalUSDTracked); - token0HourData.untrackedVolumeUSD = token0HourData.untrackedVolumeUSD.plus(amountTotalUSDTracked); - token0HourData.feesUSD = token0HourData.feesUSD.plus(feesUSD); - - token1DayData.volume = token1DayData.volume.plus(amount1Abs); - token1DayData.volumeUSD = token1DayData.volumeUSD.plus(amountTotalUSDTracked); - token1DayData.untrackedVolumeUSD = token1DayData.untrackedVolumeUSD.plus(amountTotalUSDTracked); - token1DayData.feesUSD = token1DayData.feesUSD.plus(feesUSD); - - token1HourData.volume = token1HourData.volume.plus(amount1Abs); - token1HourData.volumeUSD = token1HourData.volumeUSD.plus(amountTotalUSDTracked); - token1HourData.untrackedVolumeUSD = token1HourData.untrackedVolumeUSD.plus(amountTotalUSDTracked); - token1HourData.feesUSD = token1HourData.feesUSD.plus(feesUSD); - - await this._db.saveSwap(swap, block); - await this._db.saveTokenDayData(token0DayData, block); - await this._db.saveTokenDayData(token1DayData, block); - await this._db.saveUniswapDayData(uniswapDayData, block); - await this._db.savePoolDayData(poolDayData, block); - await this._db.saveFactory(factory, block); - await this._db.savePool(pool, block); - await this._db.saveToken(token0, block); - await this._db.saveToken(token1, block); - - // Skipping update of inner vars of current or crossed ticks as they are not queried. } async _handleIncreaseLiquidity (block: Block, contractAddress: string, tx: Transaction, event: IncreaseLiquidityEvent): Promise { - const position = await this._getPosition(block, contractAddress, tx, BigInt(event.tokenId)); + let position = await this._getPosition(block, contractAddress, tx, BigInt(event.tokenId)); // position was not able to be fetched. if (position === null) { @@ -845,21 +953,36 @@ export class Indexer { return; } - const token0 = position.token0; - const token1 = position.token1; - - const amount0 = convertTokenToDecimal(BigInt(event.amount0), BigInt(token0.decimals)); - const amount1 = convertTokenToDecimal(BigInt(event.amount1), BigInt(token1.decimals)); - - position.liquidity = BigInt(position.liquidity) + BigInt(event.liquidity); - position.depositedToken0 = position.depositedToken0.plus(amount0); - position.depositedToken1 = position.depositedToken1.plus(amount1); - await this._updateFeeVars(position, block, contractAddress, BigInt(event.tokenId)); + const dbTx = await this._db.createTransactionRunner(); - await this._db.savePosition(position, block); + try { + if (!position.transaction) { + const transaction = await loadTransaction(this._db, dbTx, { block, tx }); + position.transaction = transaction; + position = await this._db.savePosition(dbTx, position, block); + } - await this._savePositionSnapshot(position, block, tx); + const token0 = position.token0; + const token1 = position.token1; + + const amount0 = convertTokenToDecimal(BigInt(event.amount0), BigInt(token0.decimals)); + const amount1 = convertTokenToDecimal(BigInt(event.amount1), BigInt(token1.decimals)); + + position.liquidity = BigInt(position.liquidity) + BigInt(event.liquidity); + position.depositedToken0 = position.depositedToken0.plus(amount0); + position.depositedToken1 = position.depositedToken1.plus(amount1); + + await this._db.savePosition(dbTx, position, block); + + await this._savePositionSnapshot(dbTx, position, block, tx); + await dbTx.commitTransaction(); + } catch (error) { + await dbTx.rollbackTransaction(); + throw error; + } finally { + await dbTx.release(); + } } async _handleDecreaseLiquidity (block: Block, contractAddress: string, tx: Transaction, event: DecreaseLiquidityEvent): Promise { @@ -875,20 +998,35 @@ export class Indexer { return; } - const token0 = position.token0; - const token1 = position.token1; - const amount0 = convertTokenToDecimal(BigInt(event.amount0), BigInt(token0.decimals)); - const amount1 = convertTokenToDecimal(BigInt(event.amount1), BigInt(token1.decimals)); - - position.liquidity = BigInt(position.liquidity) - BigInt(event.liquidity); - position.depositedToken0 = position.depositedToken0.plus(amount0); - position.depositedToken1 = position.depositedToken1.plus(amount1); - position = await this._updateFeeVars(position, block, contractAddress, BigInt(event.tokenId)); + const dbTx = await this._db.createTransactionRunner(); - await this._db.savePosition(position, block); + try { + if (!position.transaction) { + const transaction = await loadTransaction(this._db, dbTx, { block, tx }); + position.transaction = transaction; + position = await this._db.savePosition(dbTx, position, block); + } - await this._savePositionSnapshot(position, block, tx); + const token0 = position.token0; + const token1 = position.token1; + const amount0 = convertTokenToDecimal(BigInt(event.amount0), BigInt(token0.decimals)); + const amount1 = convertTokenToDecimal(BigInt(event.amount1), BigInt(token1.decimals)); + + position.liquidity = BigInt(position.liquidity) - BigInt(event.liquidity); + position.depositedToken0 = position.depositedToken0.plus(amount0); + position.depositedToken1 = position.depositedToken1.plus(amount1); + + await this._db.savePosition(dbTx, position, block); + + await this._savePositionSnapshot(dbTx, position, block, tx); + await dbTx.commitTransaction(); + } catch (error) { + await dbTx.rollbackTransaction(); + throw error; + } finally { + await dbTx.release(); + } } async _handleCollect (block: Block, contractAddress: string, tx: Transaction, event: CollectEvent): Promise { @@ -904,31 +1042,62 @@ export class Indexer { return; } - const token0 = position.token0; - const token1 = position.token1; - const amount0 = convertTokenToDecimal(BigInt(event.amount0), BigInt(token0.decimals)); - const amount1 = convertTokenToDecimal(BigInt(event.amount1), BigInt(token1.decimals)); - position.collectedFeesToken0 = position.collectedFeesToken0.plus(amount0); - position.collectedFeesToken1 = position.collectedFeesToken1.plus(amount1); - position = await this._updateFeeVars(position, block, contractAddress, BigInt(event.tokenId)); + const dbTx = await this._db.createTransactionRunner(); - await this._db.savePosition(position, block); + try { + if (!position.transaction) { + const transaction = await loadTransaction(this._db, dbTx, { block, tx }); + position.transaction = transaction; + position = await this._db.savePosition(dbTx, position, block); + } - await this._savePositionSnapshot(position, block, tx); + const token0 = position.token0; + const token1 = position.token1; + const amount0 = convertTokenToDecimal(BigInt(event.amount0), BigInt(token0.decimals)); + const amount1 = convertTokenToDecimal(BigInt(event.amount1), BigInt(token1.decimals)); + position.collectedFeesToken0 = position.collectedFeesToken0.plus(amount0); + position.collectedFeesToken1 = position.collectedFeesToken1.plus(amount1); + + await this._db.savePosition(dbTx, position, block); + + await this._savePositionSnapshot(dbTx, position, block, tx); + await dbTx.commitTransaction(); + } catch (error) { + await dbTx.rollbackTransaction(); + throw error; + } finally { + await dbTx.release(); + } } async _handleTransfer (block: Block, contractAddress: string, tx: Transaction, event: TransferEvent): Promise { - const position = await this._getPosition(block, contractAddress, tx, BigInt(event.tokenId)); + let position = await this._getPosition(block, contractAddress, tx, BigInt(event.tokenId)); // Position was not able to be fetched. if (position === null) { return; } - position.owner = event.to; - await this._db.savePosition(position, block); + const dbTx = await this._db.createTransactionRunner(); - await this._savePositionSnapshot(position, block, tx); + try { + if (!position.transaction) { + const transaction = await loadTransaction(this._db, dbTx, { block, tx }); + position.transaction = transaction; + position = await this._db.savePosition(dbTx, position, block); + } + + position.owner = event.to; + await this._db.savePosition(dbTx, position, block); + + await this._savePositionSnapshot(dbTx, position, block, tx); + await dbTx.commitTransaction(); + } catch (error) { + await dbTx.rollbackTransaction(); + throw error; + } finally { + await dbTx.release(); + } } async _getPosition (block: Block, contractAddress: string, tx: Transaction, tokenId: bigint): Promise { @@ -950,33 +1119,28 @@ export class Indexer { position = new Position(); position.id = tokenId.toString(); - const pool = await this._db.getPool({ id: poolAddress, blockHash }); + const pool = await this._db.getPoolNoTx({ id: poolAddress, blockHash }); assert(pool); position.pool = pool; const [token0, token1] = await Promise.all([ - this._db.getToken({ id: token0Address, blockHash }), - this._db.getToken({ id: token0Address, blockHash }) + this._db.getTokenNoTx({ id: token0Address, blockHash }), + this._db.getTokenNoTx({ id: token0Address, blockHash }) ]); assert(token0 && token1); position.token0 = token0; position.token1 = token1; const [tickLower, tickUpper] = await Promise.all([ - this._db.getTick({ id: poolAddress.concat('#').concat(nfpmPosition.tickLower.toString()), blockHash }), - this._db.getTick({ id: poolAddress.concat('#').concat(nfpmPosition.tickUpper.toString()), blockHash }) + this._db.getTickNoTx({ id: poolAddress.concat('#').concat(nfpmPosition.tickLower.toString()), blockHash }), + this._db.getTickNoTx({ id: poolAddress.concat('#').concat(nfpmPosition.tickUpper.toString()), blockHash }) ]); assert(tickLower && tickUpper); position.tickLower = tickLower; position.tickUpper = tickUpper; - const transaction = await loadTransaction(this._db, { block, tx }); - position.transaction = transaction; - position.feeGrowthInside0LastX128 = BigInt(nfpmPosition.feeGrowthInside0LastX128.toString()); position.feeGrowthInside1LastX128 = BigInt(nfpmPosition.feeGrowthInside1LastX128.toString()); - - position = await this._db.savePosition(position, block); } } @@ -994,7 +1158,7 @@ export class Indexer { return position; } - async _savePositionSnapshot (position: Position, block: Block, tx: Transaction): Promise { + async _savePositionSnapshot (dbTx: QueryRunner, position: Position, block: Block, tx: Transaction): Promise { const positionSnapshot = new PositionSnapshot(); positionSnapshot.id = position.id.concat('#').concat(block.number.toString()); positionSnapshot.blockNumber = block.number; @@ -1009,10 +1173,10 @@ export class Indexer { positionSnapshot.withdrawnToken1 = position.withdrawnToken1; positionSnapshot.collectedFeesToken0 = position.collectedFeesToken0; positionSnapshot.collectedFeesToken1 = position.collectedFeesToken1; - positionSnapshot.transaction = await loadTransaction(this._db, { block, tx }); + positionSnapshot.transaction = await loadTransaction(this._db, dbTx, { block, tx }); positionSnapshot.feeGrowthInside0LastX128 = position.feeGrowthInside0LastX128; positionSnapshot.feeGrowthInside1LastX128 = position.feeGrowthInside1LastX128; - await this._db.savePositionSnapshot(positionSnapshot, block); + await this._db.savePositionSnapshot(dbTx, positionSnapshot, block); } } diff --git a/packages/uni-info-watcher/src/utils/index.ts b/packages/uni-info-watcher/src/utils/index.ts index 309e5bc1..33159d7d 100644 --- a/packages/uni-info-watcher/src/utils/index.ts +++ b/packages/uni-info-watcher/src/utils/index.ts @@ -1,5 +1,6 @@ import Decimal from 'decimal.js'; import { BigNumber } from 'ethers'; +import { QueryRunner } from 'typeorm'; import { Transaction as TransactionEntity } from '../entity/Transaction'; import { Database } from '../database'; @@ -23,9 +24,9 @@ export const convertTokenToDecimal = (tokenAmount: bigint, exchangeDecimals: big return (new Decimal(tokenAmount.toString())).div(exponentToBigDecimal(exchangeDecimals)); }; -export const loadTransaction = async (db: Database, event: { block: Block, tx: Transaction }): Promise => { +export const loadTransaction = async (db: Database, dbTx: QueryRunner, event: { block: Block, tx: Transaction }): Promise => { const { tx, block } = event; - let transaction = await db.getTransaction({ id: tx.hash, blockHash: block.hash }); + let transaction = await db.getTransaction(dbTx, { id: tx.hash, blockHash: block.hash }); if (!transaction) { transaction = new TransactionEntity(); @@ -35,7 +36,7 @@ export const loadTransaction = async (db: Database, event: { block: Block, tx: T transaction.blockNumber = block.number; transaction.timestamp = BigInt(block.timestamp); - return db.saveTransaction(transaction, block); + return db.saveTransaction(dbTx, transaction, block); }; // Return 0 if denominator is 0 in division. diff --git a/packages/uni-info-watcher/src/utils/interval-updates.ts b/packages/uni-info-watcher/src/utils/interval-updates.ts index aa8ecf0d..9d78acdd 100644 --- a/packages/uni-info-watcher/src/utils/interval-updates.ts +++ b/packages/uni-info-watcher/src/utils/interval-updates.ts @@ -1,5 +1,6 @@ import assert from 'assert'; import { BigNumber } from 'ethers'; +import { QueryRunner } from 'typeorm'; import { Database } from '../database'; import { Factory } from '../entity/Factory'; @@ -16,17 +17,17 @@ import { Block } from '../events'; * @param db * @param event */ -export const updateUniswapDayData = async (db: Database, event: { contractAddress: string, block: Block }): Promise => { +export const updateUniswapDayData = async (db: Database, dbTx: QueryRunner, event: { contractAddress: string, block: Block }): Promise => { const { block } = event; // TODO: In subgraph factory is fetched by hardcoded factory address. // Currently fetching first factory in database as only one exists. - const [factory] = await db.getEntities(Factory, { hash: block.hash }, {}, { limit: 1 }); + const [factory] = await db.getEntities(dbTx, Factory, { hash: block.hash }, {}, { limit: 1 }); const dayID = Math.floor(block.timestamp / 86400); // Rounded. const dayStartTimestamp = dayID * 86400; - let uniswapDayData = await db.getUniswapDayData({ id: dayID.toString(), blockHash: block.hash }); + let uniswapDayData = await db.getUniswapDayData(dbTx, { id: dayID.toString(), blockHash: block.hash }); if (!uniswapDayData) { uniswapDayData = new UniswapDayData(); @@ -38,10 +39,10 @@ export const updateUniswapDayData = async (db: Database, event: { contractAddres uniswapDayData.tvlUSD = factory.totalValueLockedUSD; uniswapDayData.txCount = factory.txCount; - return db.saveUniswapDayData(uniswapDayData, block); + return db.saveUniswapDayData(dbTx, uniswapDayData, block); }; -export const updatePoolDayData = async (db: Database, event: { contractAddress: string, block: Block }): Promise => { +export const updatePoolDayData = async (db: Database, dbTx: QueryRunner, event: { contractAddress: string, block: Block }): Promise => { const { contractAddress, block } = event; const dayID = Math.floor(block.timestamp / 86400); const dayStartTimestamp = dayID * 86400; @@ -50,10 +51,10 @@ export const updatePoolDayData = async (db: Database, event: { contractAddress: .concat('-') .concat(dayID.toString()); - const pool = await db.getPool({ id: contractAddress, blockHash: block.hash }); + const pool = await db.getPool(dbTx, { id: contractAddress, blockHash: block.hash }); assert(pool); - let poolDayData = await db.getPoolDayData({ id: dayPoolID, blockHash: block.hash }); + let poolDayData = await db.getPoolDayData(dbTx, { id: dayPoolID, blockHash: block.hash }); if (!poolDayData) { poolDayData = new PoolDayData(); @@ -64,7 +65,7 @@ export const updatePoolDayData = async (db: Database, event: { contractAddress: poolDayData.high = pool.token0Price; poolDayData.low = pool.token0Price; poolDayData.close = pool.token0Price; - poolDayData = await db.savePoolDayData(poolDayData, block); + poolDayData = await db.savePoolDayData(dbTx, poolDayData, block); } if (Number(pool.token0Price) > Number(poolDayData.high)) { @@ -84,12 +85,12 @@ export const updatePoolDayData = async (db: Database, event: { contractAddress: poolDayData.tick = pool.tick; poolDayData.tvlUSD = pool.totalValueLockedUSD; poolDayData.txCount = BigInt(BigNumber.from(poolDayData.txCount).add(1).toHexString()); - poolDayData = await db.savePoolDayData(poolDayData, block); + poolDayData = await db.savePoolDayData(dbTx, poolDayData, block); return poolDayData; }; -export const updatePoolHourData = async (db: Database, event: { contractAddress: string, block: Block }): Promise => { +export const updatePoolHourData = async (db: Database, dbTx: QueryRunner, event: { contractAddress: string, block: Block }): Promise => { const { contractAddress, block } = event; const hourIndex = Math.floor(block.timestamp / 3600); // Get unique hour within unix history. const hourStartUnix = hourIndex * 3600; // Want the rounded effect. @@ -98,10 +99,10 @@ export const updatePoolHourData = async (db: Database, event: { contractAddress: .concat('-') .concat(hourIndex.toString()); - const pool = await db.getPool({ id: contractAddress, blockHash: block.hash }); + const pool = await db.getPool(dbTx, { id: contractAddress, blockHash: block.hash }); assert(pool); - let poolHourData = await db.getPoolHourData({ id: hourPoolID, blockHash: block.hash }); + let poolHourData = await db.getPoolHourData(dbTx, { id: hourPoolID, blockHash: block.hash }); if (!poolHourData) { poolHourData = new PoolHourData(); @@ -112,7 +113,7 @@ export const updatePoolHourData = async (db: Database, event: { contractAddress: poolHourData.high = pool.token0Price; poolHourData.low = pool.token0Price; poolHourData.close = pool.token0Price; - poolHourData = await db.savePoolHourData(poolHourData, block); + poolHourData = await db.savePoolHourData(dbTx, poolHourData, block); } if (Number(pool.token0Price) > Number(poolHourData.high)) { @@ -132,14 +133,14 @@ export const updatePoolHourData = async (db: Database, event: { contractAddress: poolHourData.tick = pool.tick; poolHourData.tvlUSD = pool.totalValueLockedUSD; poolHourData.txCount = BigInt(BigNumber.from(poolHourData.txCount).add(1).toHexString()); - poolHourData = await db.savePoolHourData(poolHourData, block); + poolHourData = await db.savePoolHourData(dbTx, poolHourData, block); return poolHourData; }; -export const updateTokenDayData = async (db: Database, token: Token, event: { block: Block }): Promise => { +export const updateTokenDayData = async (db: Database, dbTx: QueryRunner, token: Token, event: { block: Block }): Promise => { const { block } = event; - const bundle = await db.getBundle({ id: '1', blockHash: block.hash }); + const bundle = await db.getBundle(dbTx, { id: '1', blockHash: block.hash }); assert(bundle); const dayID = Math.floor(block.timestamp / 86400); const dayStartTimestamp = dayID * 86400; @@ -150,7 +151,7 @@ export const updateTokenDayData = async (db: Database, token: Token, event: { bl const tokenPrice = token.derivedETH.times(bundle.ethPriceUSD); - let tokenDayData = await db.getTokenDayData({ id: tokenDayID, blockHash: block.hash }); + let tokenDayData = await db.getTokenDayData(dbTx, { id: tokenDayID, blockHash: block.hash }); if (!tokenDayData) { tokenDayData = new TokenDayData(); @@ -178,12 +179,12 @@ export const updateTokenDayData = async (db: Database, token: Token, event: { bl tokenDayData.priceUSD = token.derivedETH.times(bundle.ethPriceUSD); tokenDayData.totalValueLocked = token.totalValueLocked; tokenDayData.totalValueLockedUSD = token.totalValueLockedUSD; - return db.saveTokenDayData(tokenDayData, block); + return db.saveTokenDayData(dbTx, tokenDayData, block); }; -export const updateTokenHourData = async (db: Database, token: Token, event: { block: Block }): Promise => { +export const updateTokenHourData = async (db: Database, dbTx: QueryRunner, token: Token, event: { block: Block }): Promise => { const { block } = event; - const bundle = await db.getBundle({ id: '1', blockHash: block.hash }); + const bundle = await db.getBundle(dbTx, { id: '1', blockHash: block.hash }); assert(bundle); const hourIndex = Math.floor(block.timestamp / 3600); // Get unique hour within unix history. const hourStartUnix = hourIndex * 3600; // Want the rounded effect. @@ -194,7 +195,7 @@ export const updateTokenHourData = async (db: Database, token: Token, event: { b const tokenPrice = token.derivedETH.times(bundle.ethPriceUSD); - let tokenHourData = await db.getTokenHourData({ id: tokenHourID, blockHash: block.hash }); + let tokenHourData = await db.getTokenHourData(dbTx, { id: tokenHourID, blockHash: block.hash }); if (!tokenHourData) { tokenHourData = new TokenHourData(); @@ -222,5 +223,5 @@ export const updateTokenHourData = async (db: Database, token: Token, event: { b tokenHourData.priceUSD = tokenPrice; tokenHourData.totalValueLocked = token.totalValueLocked; tokenHourData.totalValueLockedUSD = token.totalValueLockedUSD; - return db.saveTokenHourData(tokenHourData, block); + return db.saveTokenHourData(dbTx, tokenHourData, block); }; diff --git a/packages/uni-info-watcher/src/utils/pricing.ts b/packages/uni-info-watcher/src/utils/pricing.ts index b1bc7dc0..85a4b385 100644 --- a/packages/uni-info-watcher/src/utils/pricing.ts +++ b/packages/uni-info-watcher/src/utils/pricing.ts @@ -1,6 +1,7 @@ import assert from 'assert'; import Decimal from 'decimal.js'; import { BigNumber } from 'ethers'; +import { QueryRunner } from 'typeorm'; import { exponentToBigDecimal, safeDiv } from '.'; import { Database } from '../database'; @@ -55,9 +56,9 @@ export const sqrtPriceX96ToTokenPrices = (sqrtPriceX96: bigint, token0: Token, t return [price0, price1]; }; -export const getEthPriceInUSD = async (db: Database, block: Block): Promise => { +export const getEthPriceInUSD = async (db: Database, dbTx: QueryRunner, block: Block): Promise => { // Fetch eth prices for each stablecoin. - const usdcPool = await db.getPool({ id: USDC_WETH_03_POOL, blockHash: block.hash }); // DAI is token0. + const usdcPool = await db.getPool(dbTx, { id: USDC_WETH_03_POOL, blockHash: block.hash }); // DAI is token0. if (usdcPool) { return usdcPool.token0Price; @@ -122,12 +123,13 @@ export const findEthPerToken = async (token: Token): Promise => { */ export const getTrackedAmountUSD = async ( db: Database, + dbTx: QueryRunner, tokenAmount0: Decimal, token0: Token, tokenAmount1: Decimal, token1: Token ): Promise => { - const bundle = await db.getBundle({ id: '1' }); + const bundle = await db.getBundle(dbTx, { id: '1' }); assert(bundle); const price0USD = token0.derivedETH.times(bundle.ethPriceUSD); const price1USD = token1.derivedETH.times(bundle.ethPriceUSD); diff --git a/packages/uni-info-watcher/src/utils/tick.ts b/packages/uni-info-watcher/src/utils/tick.ts index cbadea5b..b53e7e39 100644 --- a/packages/uni-info-watcher/src/utils/tick.ts +++ b/packages/uni-info-watcher/src/utils/tick.ts @@ -1,4 +1,5 @@ import Decimal from 'decimal.js'; +import { QueryRunner } from 'typeorm'; import { Pool } from '../entity/Pool'; import { Database } from '../database'; @@ -6,7 +7,7 @@ import { bigDecimalExponated, safeDiv } from '.'; import { Tick } from '../entity/Tick'; import { Block } from '../events'; -export const createTick = async (db: Database, tickId: string, tickIdx: bigint, pool: Pool, block: Block): Promise => { +export const createTick = async (db: Database, dbTx: QueryRunner, tickId: string, tickIdx: bigint, pool: Pool, block: Block): Promise => { const tick = new Tick(); tick.id = tickId; tick.tickIdx = tickIdx; @@ -19,5 +20,5 @@ export const createTick = async (db: Database, tickId: string, tickIdx: bigint, tick.price0 = price0; tick.price1 = safeDiv(new Decimal(1), price0); - return db.saveTick(tick, block); + return db.saveTick(dbTx, tick, block); };