Event handlers should update db atomically (#186)

* Run db operations inside event handlers atomically using transaction.

* Implement database transaction for Pool Initialize event.

* Implement typeorm transaction without callback.

* Implement transaction for NFPM event handlers.

Co-authored-by: nabarun <nabarun@deepstacksoft.com>
This commit is contained in:
Ashwin Phatak 2021-08-06 10:25:56 +05:30 committed by GitHub
parent b6fe8a8c47
commit 0487c05ee1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 889 additions and 727 deletions

View File

@ -1,5 +1,5 @@
import assert from 'assert';
import { Brackets, Connection, ConnectionOptions, createConnection, DeepPartial, FindConditions, FindOneOptions, In, LessThanOrEqual, Repository } from 'typeorm';
import { Brackets, Connection, ConnectionOptions, createConnection, DeepPartial, FindConditions, FindOneOptions, In, LessThanOrEqual, QueryRunner, Repository } from 'typeorm';
import { SnakeNamingStrategy } from 'typeorm-naming-strategies';
import { MAX_REORG_DEPTH } from '@vulcanize/util';
@ -87,8 +87,15 @@ export class Database {
return this._conn.close();
}
async getFactory ({ id, blockHash }: DeepPartial<Factory>): Promise<Factory | undefined> {
const repo = this._conn.getRepository(Factory);
async createTransactionRunner (): Promise<QueryRunner> {
const queryRunner = this._conn.createQueryRunner();
await queryRunner.connect();
await queryRunner.startTransaction();
return queryRunner;
}
async getFactory (queryRunner: QueryRunner, { id, blockHash }: DeepPartial<Factory>): Promise<Factory | undefined> {
const repo = queryRunner.manager.getRepository(Factory);
const whereOptions: FindConditions<Factory> = { id };
if (blockHash) {
@ -111,8 +118,8 @@ export class Database {
return entity;
}
async getBundle ({ id, blockHash, blockNumber }: DeepPartial<Bundle>): Promise<Bundle | undefined> {
const repo = this._conn.getRepository(Bundle);
async getBundle (queryRunner: QueryRunner, { id, blockHash, blockNumber }: DeepPartial<Bundle>): Promise<Bundle | undefined> {
const repo = queryRunner.manager.getRepository(Bundle);
const whereOptions: FindConditions<Bundle> = { id };
if (blockHash) {
@ -139,8 +146,8 @@ export class Database {
return entity;
}
async getToken ({ id, blockHash }: DeepPartial<Token>): Promise<Token | undefined> {
const repo = this._conn.getRepository(Token);
async getToken (queryRunner: QueryRunner, { id, blockHash }: DeepPartial<Token>): Promise<Token | undefined> {
const repo = queryRunner.manager.getRepository(Token);
const whereOptions: FindConditions<Token> = { id };
if (blockHash) {
@ -164,8 +171,14 @@ export class Database {
return entity;
}
async getPool ({ id, blockHash, blockNumber }: DeepPartial<Pool>): Promise<Pool | undefined> {
const repo = this._conn.getRepository(Pool);
async getTokenNoTx ({ id, blockHash }: DeepPartial<Token>): Promise<Token | undefined> {
const queryRunner = this._conn.createQueryRunner();
await queryRunner.connect();
return this.getToken(queryRunner, { id, blockHash });
}
async getPool (queryRunner: QueryRunner, { id, blockHash, blockNumber }: DeepPartial<Pool>): Promise<Pool | undefined> {
const repo = queryRunner.manager.getRepository(Pool);
const whereOptions: FindConditions<Pool> = { id };
if (blockHash) {
@ -193,6 +206,12 @@ export class Database {
return entity;
}
async getPoolNoTx ({ id, blockHash, blockNumber }: DeepPartial<Pool>): Promise<Pool | undefined> {
const queryRunner = this._conn.createQueryRunner();
await queryRunner.connect();
return this.getPool(queryRunner, { id, blockHash, blockNumber });
}
async getPosition ({ id, blockHash }: DeepPartial<Position>): Promise<Position | undefined> {
const repo = this._conn.getRepository(Position);
const whereOptions: FindConditions<Position> = { id };
@ -218,8 +237,8 @@ export class Database {
return entity;
}
async getTick ({ id, blockHash }: DeepPartial<Tick>): Promise<Tick | undefined> {
const repo = this._conn.getRepository(Tick);
async getTick (queryRunner: QueryRunner, { id, blockHash }: DeepPartial<Tick>): Promise<Tick | undefined> {
const repo = queryRunner.manager.getRepository(Tick);
const whereOptions: FindConditions<Tick> = { id };
if (blockHash) {
@ -243,8 +262,14 @@ export class Database {
return entity;
}
async getPoolDayData ({ id, blockHash }: DeepPartial<PoolDayData>): Promise<PoolDayData | undefined> {
const repo = this._conn.getRepository(PoolDayData);
async getTickNoTx ({ id, blockHash }: DeepPartial<Tick>): Promise<Tick | undefined> {
const queryRunner = this._conn.createQueryRunner();
await queryRunner.connect();
return this.getTick(queryRunner, { id, blockHash });
}
async getPoolDayData (queryRunner: QueryRunner, { id, blockHash }: DeepPartial<PoolDayData>): Promise<PoolDayData | undefined> {
const repo = queryRunner.manager.getRepository(PoolDayData);
const whereOptions: FindConditions<PoolDayData> = { id };
if (blockHash) {
@ -268,8 +293,8 @@ export class Database {
return entity;
}
async getPoolHourData ({ id, blockHash }: DeepPartial<PoolHourData>): Promise<PoolHourData | undefined> {
const repo = this._conn.getRepository(PoolHourData);
async getPoolHourData (queryRunner: QueryRunner, { id, blockHash }: DeepPartial<PoolHourData>): Promise<PoolHourData | undefined> {
const repo = queryRunner.manager.getRepository(PoolHourData);
const whereOptions: FindConditions<PoolHourData> = { id };
if (blockHash) {
@ -292,8 +317,8 @@ export class Database {
return entity;
}
async getUniswapDayData ({ id, blockHash }: DeepPartial<UniswapDayData>): Promise<UniswapDayData | undefined> {
const repo = this._conn.getRepository(UniswapDayData);
async getUniswapDayData (queryRunner: QueryRunner, { id, blockHash }: DeepPartial<UniswapDayData>): Promise<UniswapDayData | undefined> {
const repo = queryRunner.manager.getRepository(UniswapDayData);
const whereOptions: FindConditions<UniswapDayData> = { id };
if (blockHash) {
@ -316,8 +341,8 @@ export class Database {
return entity;
}
async getTokenDayData ({ id, blockHash }: DeepPartial<TokenDayData>): Promise<TokenDayData | undefined> {
const repo = this._conn.getRepository(TokenDayData);
async getTokenDayData (queryRunner: QueryRunner, { id, blockHash }: DeepPartial<TokenDayData>): Promise<TokenDayData | undefined> {
const repo = queryRunner.manager.getRepository(TokenDayData);
const whereOptions: FindConditions<TokenDayData> = { id };
if (blockHash) {
@ -340,8 +365,8 @@ export class Database {
return entity;
}
async getTokenHourData ({ id, blockHash }: DeepPartial<TokenHourData>): Promise<TokenHourData | undefined> {
const repo = this._conn.getRepository(TokenHourData);
async getTokenHourData (queryRunner: QueryRunner, { id, blockHash }: DeepPartial<TokenHourData>): Promise<TokenHourData | undefined> {
const repo = queryRunner.manager.getRepository(TokenHourData);
const whereOptions: FindConditions<TokenHourData> = { id };
if (blockHash) {
@ -364,8 +389,8 @@ export class Database {
return entity;
}
async getTransaction ({ id, blockHash }: DeepPartial<Transaction>): Promise<Transaction | undefined> {
const repo = this._conn.getRepository(Transaction);
async getTransaction (queryRunner: QueryRunner, { id, blockHash }: DeepPartial<Transaction>): Promise<Transaction | undefined> {
const repo = queryRunner.manager.getRepository(Transaction);
const whereOptions: FindConditions<Transaction> = { id };
if (blockHash) {
@ -388,8 +413,8 @@ export class Database {
return entity;
}
async getEntities<Entity> (entity: new () => Entity, block: BlockHeight, where: Where = {}, queryOptions: QueryOptions = {}, relations: string[] = []): Promise<Entity[]> {
const repo = this._conn.getRepository(entity);
async getEntities<Entity> (queryRunner: QueryRunner, entity: new () => Entity, block: BlockHeight, where: Where = {}, queryOptions: QueryOptions = {}, relations: string[] = []): Promise<Entity[]> {
const repo = queryRunner.manager.getRepository(entity);
const { tableName } = repo.metadata;
let subQuery = repo.createQueryBuilder('subTable')
@ -468,148 +493,116 @@ export class Database {
return selectQueryBuilder.getMany();
}
async saveFactory (factory: Factory, block: Block): Promise<Factory> {
return this._conn.transaction(async (tx) => {
const repo = tx.getRepository(Factory);
async saveFactory (queryRunner: QueryRunner, factory: Factory, block: Block): Promise<Factory> {
const repo = queryRunner.manager.getRepository(Factory);
factory.blockNumber = block.number;
factory.blockHash = block.hash;
return repo.save(factory);
});
}
async saveBundle (bundle: Bundle, block: Block): Promise<Bundle> {
return this._conn.transaction(async (tx) => {
const repo = tx.getRepository(Bundle);
async saveBundle (queryRunner: QueryRunner, bundle: Bundle, block: Block): Promise<Bundle> {
const repo = queryRunner.manager.getRepository(Bundle);
bundle.blockNumber = block.number;
bundle.blockHash = block.hash;
return repo.save(bundle);
});
}
async savePool (pool: Pool, block: Block): Promise<Pool> {
return this._conn.transaction(async (tx) => {
const repo = tx.getRepository(Pool);
async savePool (queryRunner: QueryRunner, pool: Pool, block: Block): Promise<Pool> {
const repo = queryRunner.manager.getRepository(Pool);
pool.blockNumber = block.number;
pool.blockHash = block.hash;
return repo.save(pool);
});
}
async savePoolDayData (poolDayData: PoolDayData, block: Block): Promise<PoolDayData> {
return this._conn.transaction(async (tx) => {
const repo = tx.getRepository(PoolDayData);
async savePoolDayData (queryRunner: QueryRunner, poolDayData: PoolDayData, block: Block): Promise<PoolDayData> {
const repo = queryRunner.manager.getRepository(PoolDayData);
poolDayData.blockNumber = block.number;
poolDayData.blockHash = block.hash;
return repo.save(poolDayData);
});
}
async savePoolHourData (poolHourData: PoolHourData, block: Block): Promise<PoolHourData> {
return this._conn.transaction(async (tx) => {
const repo = tx.getRepository(PoolHourData);
async savePoolHourData (queryRunner: QueryRunner, poolHourData: PoolHourData, block: Block): Promise<PoolHourData> {
const repo = queryRunner.manager.getRepository(PoolHourData);
poolHourData.blockNumber = block.number;
poolHourData.blockHash = block.hash;
return repo.save(poolHourData);
});
}
async saveToken (token: Token, block: Block): Promise<Token> {
return this._conn.transaction(async (tx) => {
const repo = tx.getRepository(Token);
async saveToken (queryRunner: QueryRunner, token: Token, block: Block): Promise<Token> {
const repo = queryRunner.manager.getRepository(Token);
token.blockNumber = block.number;
token.blockHash = block.hash;
return repo.save(token);
});
}
async saveTransaction (transaction: Transaction, block: Block): Promise<Transaction> {
return this._conn.transaction(async (tx) => {
const repo = tx.getRepository(Transaction);
async saveTransaction (queryRunner: QueryRunner, transaction: Transaction, block: Block): Promise<Transaction> {
const repo = queryRunner.manager.getRepository(Transaction);
transaction.blockNumber = block.number;
transaction.blockHash = block.hash;
return repo.save(transaction);
});
}
async saveUniswapDayData (uniswapDayData: UniswapDayData, block: Block): Promise<UniswapDayData> {
return this._conn.transaction(async (tx) => {
const repo = tx.getRepository(UniswapDayData);
async saveUniswapDayData (queryRunner: QueryRunner, uniswapDayData: UniswapDayData, block: Block): Promise<UniswapDayData> {
const repo = queryRunner.manager.getRepository(UniswapDayData);
uniswapDayData.blockNumber = block.number;
uniswapDayData.blockHash = block.hash;
return repo.save(uniswapDayData);
});
}
async saveTokenDayData (tokenDayData: TokenDayData, block: Block): Promise<TokenDayData> {
return this._conn.transaction(async (tx) => {
const repo = tx.getRepository(TokenDayData);
async saveTokenDayData (queryRunner: QueryRunner, tokenDayData: TokenDayData, block: Block): Promise<TokenDayData> {
const repo = queryRunner.manager.getRepository(TokenDayData);
tokenDayData.blockNumber = block.number;
tokenDayData.blockHash = block.hash;
return repo.save(tokenDayData);
});
}
async saveTokenHourData (tokenHourData: TokenHourData, block: Block): Promise<TokenHourData> {
return this._conn.transaction(async (tx) => {
const repo = tx.getRepository(TokenHourData);
async saveTokenHourData (queryRunner: QueryRunner, tokenHourData: TokenHourData, block: Block): Promise<TokenHourData> {
const repo = queryRunner.manager.getRepository(TokenHourData);
tokenHourData.blockNumber = block.number;
tokenHourData.blockHash = block.hash;
return repo.save(tokenHourData);
});
}
async saveTick (tick: Tick, block: Block): Promise<Tick> {
return this._conn.transaction(async (tx) => {
const repo = tx.getRepository(Tick);
async saveTick (queryRunner: QueryRunner, tick: Tick, block: Block): Promise<Tick> {
const repo = queryRunner.manager.getRepository(Tick);
tick.blockNumber = block.number;
tick.blockHash = block.hash;
return repo.save(tick);
});
}
async savePosition (position: Position, block: Block): Promise<Position> {
return this._conn.transaction(async (tx) => {
const repo = tx.getRepository(Position);
async savePosition (queryRunner: QueryRunner, position: Position, block: Block): Promise<Position> {
const repo = queryRunner.manager.getRepository(Position);
position.blockNumber = block.number;
position.blockHash = block.hash;
return repo.save(position);
});
}
async savePositionSnapshot (positionSnapshot: PositionSnapshot, block: Block): Promise<PositionSnapshot> {
return this._conn.transaction(async (tx) => {
const repo = tx.getRepository(PositionSnapshot);
async savePositionSnapshot (queryRunner: QueryRunner, positionSnapshot: PositionSnapshot, block: Block): Promise<PositionSnapshot> {
const repo = queryRunner.manager.getRepository(PositionSnapshot);
positionSnapshot.blockNumber = block.number;
positionSnapshot.blockHash = block.hash;
return repo.save(positionSnapshot);
});
}
async saveMint (mint: Mint, block: Block): Promise<Mint> {
return this._conn.transaction(async (tx) => {
const repo = tx.getRepository(Mint);
async saveMint (queryRunner: QueryRunner, mint: Mint, block: Block): Promise<Mint> {
const repo = queryRunner.manager.getRepository(Mint);
mint.blockNumber = block.number;
mint.blockHash = block.hash;
return repo.save(mint);
});
}
async saveBurn (burn: Burn, block: Block): Promise<Burn> {
return this._conn.transaction(async (tx) => {
const repo = tx.getRepository(Burn);
async saveBurn (queryRunner: QueryRunner, burn: Burn, block: Block): Promise<Burn> {
const repo = queryRunner.manager.getRepository(Burn);
burn.blockNumber = block.number;
burn.blockHash = block.hash;
return repo.save(burn);
});
}
async saveSwap (swap: Swap, block: Block): Promise<Swap> {
return this._conn.transaction(async (tx) => {
const repo = tx.getRepository(Swap);
async saveSwap (queryRunner: QueryRunner, swap: Swap, block: Block): Promise<Swap> {
const repo = queryRunner.manager.getRepository(Swap);
swap.blockNumber = block.number;
swap.blockHash = block.hash;
return repo.save(swap);
});
}
// Returns true if events have already been synced for the (block, token) combination.

View File

@ -1,6 +1,6 @@
import assert from 'assert';
import debug from 'debug';
import { DeepPartial } from 'typeorm';
import { DeepPartial, QueryRunner } from 'typeorm';
import JSONbig from 'json-bigint';
import { utils } from 'ethers';
import { Client as UniClient } from '@vulcanize/uni-watcher';
@ -189,18 +189,61 @@ export class Indexer {
}
async getBundle (id: string, block: BlockHeight): Promise<Bundle | undefined> {
return this._db.getBundle({ id, blockHash: block.hash, blockNumber: block.number });
const dbTx = await this._db.createTransactionRunner();
let res;
try {
res = this._db.getBundle(dbTx, { id, blockHash: block.hash, blockNumber: block.number });
await dbTx.commitTransaction();
} catch (error) {
await dbTx.rollbackTransaction();
throw error;
} finally {
await dbTx.release();
}
return res;
}
async getPool (id: string, block: BlockHeight): Promise<Pool | undefined> {
return this._db.getPool({ id, blockHash: block.hash, blockNumber: block.number });
const dbTx = await this._db.createTransactionRunner();
let res;
try {
res = await this._db.getPool(dbTx, { id, blockHash: block.hash, blockNumber: block.number });
await dbTx.commitTransaction();
} catch (error) {
await dbTx.rollbackTransaction();
throw error;
} finally {
await dbTx.release();
}
return res;
}
async getToken (id: string, block: BlockHeight): Promise<Token | undefined> {
return this._db.getToken({ id, blockHash: block.hash, blockNumber: block.number });
const dbTx = await this._db.createTransactionRunner();
let res;
try {
res = this._db.getToken(dbTx, { id, blockHash: block.hash, blockNumber: block.number });
await dbTx.commitTransaction();
} catch (error) {
await dbTx.rollbackTransaction();
throw error;
} finally {
await dbTx.release();
}
return res;
}
async getEntities<Entity> (entity: new () => Entity, block: BlockHeight, where: { [key: string]: any } = {}, queryOptions: QueryOptions, relations?: string[]): Promise<Entity[]> {
const dbTx = await this._db.createTransactionRunner();
let res;
try {
where = Object.entries(where).reduce((acc: { [key: string]: any }, [fieldWithSuffix, value]) => {
const [field, ...suffix] = fieldWithSuffix.split('_');
@ -224,7 +267,15 @@ export class Indexer {
return acc;
}, {});
const res = await this._db.getEntities(entity, block, where, queryOptions, relations);
res = await this._db.getEntities(dbTx, entity, block, where, queryOptions, relations);
dbTx.commitTransaction();
} catch (error) {
await dbTx.rollbackTransaction();
throw error;
} finally {
await dbTx.release();
}
return res;
}
@ -266,18 +317,37 @@ export class Indexer {
return;
}
// Get Tokens.
let [token0, token1] = await Promise.all([
this._db.getTokenNoTx({ blockHash: block.hash, id: token0Address }),
this._db.getTokenNoTx({ blockHash: block.hash, id: token1Address })
]);
// Create Tokens if not present.
if (!token0) {
token0 = await this._initToken(block, token0Address);
}
if (!token1) {
token1 = await this._initToken(block, token1Address);
}
// Save entities to DB.
const dbTx = await this._db.createTransactionRunner();
try {
// Load factory.
let factory = await this._db.getFactory({ blockHash: block.hash, id: contractAddress });
let factory = await this._db.getFactory(dbTx, { blockHash: block.hash, id: contractAddress });
if (!factory) {
factory = new Factory();
factory.id = contractAddress;
factory = await this._db.saveFactory(factory, block);
factory = await this._db.saveFactory(dbTx, factory, block);
// Create new bundle for tracking eth price.
const bundle = new Bundle();
bundle.id = '1';
await this._db.saveBundle(bundle, block);
await this._db.saveBundle(dbTx, bundle, block);
}
// Update Factory.
@ -286,25 +356,15 @@ export class Indexer {
let pool = new Pool();
pool.id = poolAddress;
// Get Tokens.
let [token0, token1] = await Promise.all([
this._db.getToken({ blockHash: block.hash, id: token0Address }),
this._db.getToken({ blockHash: block.hash, id: token1Address })
]);
// Create Tokens if not present.
if (!token0) {
token0 = await this._createToken(block, token0Address);
}
if (!token1) {
token1 = await this._createToken(block, token1Address);
}
token0 = await this._db.saveToken(dbTx, token0, block);
token1 = await this._db.saveToken(dbTx, token1, block);
pool.token0 = token0;
pool.token1 = token1;
pool.feeTier = BigInt(fee);
pool = await this._db.savePool(pool, block);
// Skipping adding createdAtTimestamp field as it is not queried in frontend subgraph.
pool = await this._db.savePool(dbTx, pool, block);
// Update white listed pools.
if (WHITELIST_TOKENS.includes(token0.id)) {
@ -315,19 +375,23 @@ export class Indexer {
token0.whitelistPools.push(pool);
}
// Skipping adding createdAtTimestamp field as it is not queried in frontend subgraph.
// Save entities to DB.
await this._db.saveToken(token0, block);
await this._db.saveToken(token1, block);
await this._db.saveFactory(factory, block);
await this._db.saveToken(dbTx, token0, block);
await this._db.saveToken(dbTx, token1, block);
await this._db.saveFactory(dbTx, factory, block);
await dbTx.commitTransaction();
} catch (error) {
await dbTx.rollbackTransaction();
throw error;
} finally {
await dbTx.release();
}
}
/**
* Create new Token.
* @param tokenAddress
*/
async _createToken (block: Block, tokenAddress: string): Promise<Token> {
async _initToken (block: Block, tokenAddress: string): Promise<Token> {
const token = new Token();
token.id = tokenAddress;
@ -341,55 +405,70 @@ export class Indexer {
token.totalSupply = totalSupply;
token.decimals = decimals;
return this._db.saveToken(token, block);
return token;
}
async _handleInitialize (block: Block, contractAddress: string, tx: Transaction, initializeEvent: InitializeEvent): Promise<void> {
const { sqrtPriceX96, tick } = initializeEvent;
const pool = await this._db.getPool({ id: contractAddress, blockHash: block.hash });
const dbTx = await this._db.createTransactionRunner();
try {
const pool = await this._db.getPool(dbTx, { id: contractAddress, blockHash: block.hash });
assert(pool, `Pool ${contractAddress} not found.`);
// Update Pool.
pool.sqrtPrice = BigInt(sqrtPriceX96);
pool.tick = BigInt(tick);
this._db.savePool(pool, block);
// Update ETH price now that prices could have changed.
const bundle = await this._db.getBundle(dbTx, { id: '1', blockHash: block.hash });
assert(bundle);
bundle.ethPriceUSD = await getEthPriceInUSD(this._db, dbTx, block);
// Update token prices.
const [token0, token1] = await Promise.all([
this._db.getToken({ id: pool.token0.id, blockHash: block.hash }),
this._db.getToken({ id: pool.token1.id, blockHash: block.hash })
this._db.getToken(dbTx, { id: pool.token0.id, blockHash: block.hash }),
this._db.getToken(dbTx, { id: pool.token1.id, blockHash: block.hash })
]);
// Update ETH price now that prices could have changed.
const bundle = await this._db.getBundle({ id: '1', blockHash: block.hash });
assert(bundle);
bundle.ethPriceUSD = await getEthPriceInUSD(this._db, block);
this._db.saveBundle(bundle, block);
await updatePoolDayData(this._db, { contractAddress, block });
await updatePoolHourData(this._db, { contractAddress, block });
assert(token0 && token1, 'Pool tokens not found.');
token0.derivedETH = await findEthPerToken(token0);
token1.derivedETH = await findEthPerToken(token1);
this._db.savePool(dbTx, pool, block);
this._db.saveBundle(dbTx, bundle, block);
await updatePoolDayData(this._db, dbTx, { contractAddress, block });
await updatePoolHourData(this._db, dbTx, { contractAddress, block });
await Promise.all([
this._db.saveToken(token0, block),
this._db.saveToken(token1, block)
this._db.saveToken(dbTx, token0, block),
this._db.saveToken(dbTx, token1, block)
]);
await dbTx.commitTransaction();
} catch (error) {
await dbTx.rollbackTransaction();
throw error;
} finally {
await dbTx.release();
}
}
async _handleMint (block: Block, contractAddress: string, tx: Transaction, mintEvent: MintEvent): Promise<void> {
const bundle = await this._db.getBundle({ id: '1', blockHash: block.hash });
const dbTx = await this._db.createTransactionRunner();
try {
const bundle = await this._db.getBundle(dbTx, { id: '1', blockHash: block.hash });
assert(bundle);
const poolAddress = contractAddress;
const pool = await this._db.getPool({ id: poolAddress, blockHash: block.hash });
const pool = await this._db.getPool(dbTx, { id: poolAddress, blockHash: block.hash });
assert(pool);
// TODO: In subgraph factory is fetched by hardcoded factory address.
// Currently fetching first factory in database as only one exists.
const [factory] = await this._db.getEntities(Factory, { hash: block.hash }, {}, { limit: 1 });
const [factory] = await this._db.getEntities(dbTx, Factory, { hash: block.hash }, {}, { limit: 1 });
const token0 = pool.token0;
const token1 = pool.token1;
@ -442,7 +521,7 @@ export class Indexer {
factory.totalValueLockedETH = factory.totalValueLockedETH.plus(pool.totalValueLockedETH);
factory.totalValueLockedUSD = factory.totalValueLockedETH.times(bundle.ethPriceUSD);
const transaction = await loadTransaction(this._db, { block, tx });
const transaction = await loadTransaction(this._db, dbTx, { block, tx });
const mint = new Mint();
mint.id = transaction.id + '#' + pool.txCount.toString();
@ -468,15 +547,15 @@ export class Indexer {
const lowerTickId = poolAddress + '#' + mintEvent.tickLower.toString();
const upperTickId = poolAddress + '#' + mintEvent.tickUpper.toString();
let lowerTick = await this._db.getTick({ id: lowerTickId, blockHash: block.hash });
let upperTick = await this._db.getTick({ id: upperTickId, blockHash: block.hash });
let lowerTick = await this._db.getTick(dbTx, { id: lowerTickId, blockHash: block.hash });
let upperTick = await this._db.getTick(dbTx, { id: upperTickId, blockHash: block.hash });
if (!lowerTick) {
lowerTick = await createTick(this._db, lowerTickId, BigInt(lowerTickIdx), pool, block);
lowerTick = await createTick(this._db, dbTx, lowerTickId, BigInt(lowerTickIdx), pool, block);
}
if (!upperTick) {
upperTick = await createTick(this._db, upperTickId, BigInt(upperTickIdx), pool, block);
upperTick = await createTick(this._db, dbTx, upperTickId, BigInt(upperTickIdx), pool, block);
}
const amount = BigInt(mintEvent.amount);
@ -488,41 +567,53 @@ export class Indexer {
// TODO: Update Tick's volume, fees, and liquidity provider count.
// Computing these on the tick level requires reimplementing some of the swapping code from v3-core.
await updateUniswapDayData(this._db, { block, contractAddress });
await updatePoolDayData(this._db, { block, contractAddress });
await updatePoolHourData(this._db, { block, contractAddress });
await updateTokenDayData(this._db, token0, { block });
await updateTokenDayData(this._db, token1, { block });
await updateTokenHourData(this._db, token0, { block });
await updateTokenHourData(this._db, token1, { block });
await updateUniswapDayData(this._db, dbTx, { block, contractAddress });
await updateTokenDayData(this._db, dbTx, token0, { block });
await updateTokenDayData(this._db, dbTx, token1, { block });
await updateTokenHourData(this._db, dbTx, token0, { block });
await updateTokenHourData(this._db, dbTx, token1, { block });
await updatePoolDayData(this._db, dbTx, { block, contractAddress });
await updatePoolHourData(this._db, dbTx, { block, contractAddress });
await Promise.all([
this._db.saveToken(token0, block),
this._db.saveToken(token1, block)
this._db.saveToken(dbTx, token0, block),
this._db.saveToken(dbTx, token1, block)
]);
await this._db.savePool(pool, block);
await this._db.saveFactory(factory, block);
await this._db.saveMint(mint, block);
await this._db.savePool(dbTx, pool, block);
await this._db.saveFactory(dbTx, factory, block);
await this._db.saveMint(dbTx, mint, block);
await Promise.all([
await this._db.saveTick(lowerTick, block),
await this._db.saveTick(upperTick, block)
await this._db.saveTick(dbTx, lowerTick, block),
await this._db.saveTick(dbTx, upperTick, block)
]);
// Skipping update inner tick vars and tick day data as they are not queried.
await dbTx.commitTransaction();
} catch (error) {
await dbTx.rollbackTransaction();
throw error;
} finally {
await dbTx.release();
}
}
async _handleBurn (block: Block, contractAddress: string, tx: Transaction, burnEvent: BurnEvent): Promise<void> {
const bundle = await this._db.getBundle({ id: '1', blockHash: block.hash });
const dbTx = await this._db.createTransactionRunner();
try {
const bundle = await this._db.getBundle(dbTx, { id: '1', blockHash: block.hash });
assert(bundle);
const poolAddress = contractAddress;
const pool = await this._db.getPool({ id: poolAddress, blockHash: block.hash });
const pool = await this._db.getPool(dbTx, { id: poolAddress, blockHash: block.hash });
assert(pool);
// TODO: In subgraph factory is fetched by hardcoded factory address.
// Currently fetching first factory in database as only one exists.
const [factory] = await this._db.getEntities(Factory, { hash: block.hash }, {}, { limit: 1 });
const [factory] = await this._db.getEntities(dbTx, Factory, { hash: block.hash }, {}, { limit: 1 });
const token0 = pool.token0;
const token1 = pool.token1;
@ -576,7 +667,7 @@ export class Indexer {
factory.totalValueLockedUSD = factory.totalValueLockedETH.times(bundle.ethPriceUSD);
// Burn entity.
const transaction = await loadTransaction(this._db, { block, tx });
const transaction = await loadTransaction(this._db, dbTx, { block, tx });
const burn = new Burn();
burn.id = transaction.id + '#' + pool.txCount.toString();
@ -597,8 +688,8 @@ export class Indexer {
// Tick entities.
const lowerTickId = poolAddress + '#' + (burnEvent.tickLower).toString();
const upperTickId = poolAddress + '#' + (burnEvent.tickUpper).toString();
const lowerTick = await this._db.getTick({ id: lowerTickId, blockHash: block.hash });
const upperTick = await this._db.getTick({ id: upperTickId, blockHash: block.hash });
const lowerTick = await this._db.getTick(dbTx, { id: lowerTickId, blockHash: block.hash });
const upperTick = await this._db.getTick(dbTx, { id: upperTickId, blockHash: block.hash });
assert(lowerTick && upperTick);
const amount = BigInt(burnEvent.amount);
lowerTick.liquidityGross = BigInt(lowerTick.liquidityGross) - amount;
@ -606,40 +697,51 @@ export class Indexer {
upperTick.liquidityGross = BigInt(upperTick.liquidityGross) - amount;
upperTick.liquidityNet = BigInt(upperTick.liquidityNet) + amount;
await updateUniswapDayData(this._db, { block, contractAddress });
await updatePoolDayData(this._db, { block, contractAddress });
await updatePoolHourData(this._db, { block, contractAddress });
await updateTokenDayData(this._db, token0, { block });
await updateTokenDayData(this._db, token0, { block });
await updateTokenHourData(this._db, token0, { block });
await updateTokenHourData(this._db, token0, { block });
await updateUniswapDayData(this._db, dbTx, { block, contractAddress });
await updateTokenDayData(this._db, dbTx, token0, { block });
await updateTokenDayData(this._db, dbTx, token0, { block });
await updateTokenHourData(this._db, dbTx, token0, { block });
await updateTokenHourData(this._db, dbTx, token0, { block });
await updatePoolDayData(this._db, dbTx, { block, contractAddress });
await updatePoolHourData(this._db, dbTx, { block, contractAddress });
await Promise.all([
this._db.saveToken(dbTx, token0, block),
this._db.saveToken(dbTx, token1, block)
]);
await this._db.savePool(dbTx, pool, block);
await this._db.saveFactory(dbTx, factory, block);
// Skipping update Tick fee and Tick day data as they are not queried.
await Promise.all([
await this._db.saveTick(lowerTick, block),
await this._db.saveTick(upperTick, block)
await this._db.saveTick(dbTx, lowerTick, block),
await this._db.saveTick(dbTx, upperTick, block)
]);
await Promise.all([
this._db.saveToken(token0, block),
this._db.saveToken(token1, block)
]);
await this._db.savePool(pool, block);
await this._db.saveFactory(factory, block);
await this._db.saveBurn(burn, block);
await this._db.saveBurn(dbTx, burn, block);
await dbTx.commitTransaction();
} catch (error) {
await dbTx.rollbackTransaction();
throw error;
} finally {
await dbTx.release();
}
}
async _handleSwap (block: Block, contractAddress: string, tx: Transaction, swapEvent: SwapEvent): Promise<void> {
const bundle = await this._db.getBundle({ id: '1', blockHash: block.hash });
const dbTx = await this._db.createTransactionRunner();
try {
const bundle = await this._db.getBundle(dbTx, { id: '1', blockHash: block.hash });
assert(bundle);
// TODO: In subgraph factory is fetched by hardcoded factory address.
// Currently fetching first factory in database as only one exists.
const [factory] = await this._db.getEntities(Factory, { hash: block.hash }, {}, { limit: 1 });
const [factory] = await this._db.getEntities(dbTx, Factory, { hash: block.hash }, {}, { limit: 1 });
const pool = await this._db.getPool({ id: contractAddress, blockHash: block.hash });
const pool = await this._db.getPool(dbTx, { id: contractAddress, blockHash: block.hash });
assert(pool);
// Hot fix for bad pricing.
@ -648,8 +750,8 @@ export class Indexer {
}
const [token0, token1] = await Promise.all([
this._db.getToken({ id: pool.token0.id, blockHash: block.hash }),
this._db.getToken({ id: pool.token1.id, blockHash: block.hash })
this._db.getToken(dbTx, { id: pool.token0.id, blockHash: block.hash }),
this._db.getToken(dbTx, { id: pool.token1.id, blockHash: block.hash })
]);
assert(token0 && token1, 'Pool tokens not found.');
@ -676,7 +778,7 @@ export class Indexer {
const amount1USD = amount1ETH.times(bundle.ethPriceUSD);
// Get amount that should be tracked only - div 2 because cant count both input and output as volume.
const trackedAmountUSD = await getTrackedAmountUSD(this._db, amount0Abs, token0, amount1Abs, token1);
const trackedAmountUSD = await getTrackedAmountUSD(this._db, dbTx, amount0Abs, token0, amount1Abs, token1);
const amountTotalUSDTracked = trackedAmountUSD.div(new Decimal('2'));
const amountTotalETHTracked = safeDiv(amountTotalUSDTracked, bundle.ethPriceUSD);
const amountTotalUSDUntracked = amount0USD.plus(amount1USD).div(new Decimal('2'));
@ -731,11 +833,9 @@ export class Indexer {
const prices = sqrtPriceX96ToTokenPrices(pool.sqrtPrice, token0 as Token, token1 as Token);
pool.token0Price = prices[0];
pool.token1Price = prices[1];
this._db.savePool(pool, block);
// Update USD pricing.
bundle.ethPriceUSD = await getEthPriceInUSD(this._db, block);
this._db.saveBundle(bundle, block);
bundle.ethPriceUSD = await getEthPriceInUSD(this._db, dbTx, block);
token0.derivedETH = await findEthPerToken(token0);
token1.derivedETH = await findEthPerToken(token1);
@ -755,7 +855,7 @@ export class Indexer {
token1.totalValueLockedUSD = token1.totalValueLocked.times(token1.derivedETH).times(bundle.ethPriceUSD);
// Create Swap event
const transaction = await loadTransaction(this._db, { block, tx });
const transaction = await loadTransaction(this._db, dbTx, { block, tx });
const swap = new Swap();
swap.id = transaction.id + '#' + pool.txCount.toString();
@ -776,13 +876,13 @@ export class Indexer {
// Skipping update pool fee growth as they are not queried.
// Interval data.
const uniswapDayData = await updateUniswapDayData(this._db, { block, contractAddress });
const poolDayData = await updatePoolDayData(this._db, { block, contractAddress });
const poolHourData = await updatePoolHourData(this._db, { block, contractAddress });
const token0DayData = await updateTokenDayData(this._db, token0, { block });
const token1DayData = await updateTokenDayData(this._db, token0, { block });
const token0HourData = await updateTokenHourData(this._db, token0, { block });
const token1HourData = await updateTokenHourData(this._db, token0, { block });
const uniswapDayData = await updateUniswapDayData(this._db, dbTx, { block, contractAddress });
const poolDayData = await updatePoolDayData(this._db, dbTx, { block, contractAddress });
const poolHourData = await updatePoolHourData(this._db, dbTx, { block, contractAddress });
const token0DayData = await updateTokenDayData(this._db, dbTx, token0, { block });
const token1DayData = await updateTokenDayData(this._db, dbTx, token0, { block });
const token0HourData = await updateTokenHourData(this._db, dbTx, token0, { block });
const token1HourData = await updateTokenHourData(this._db, dbTx, token0, { block });
// Update volume metrics.
uniswapDayData.volumeETH = uniswapDayData.volumeETH.plus(amountTotalETHTracked);
@ -819,21 +919,29 @@ export class Indexer {
token1HourData.untrackedVolumeUSD = token1HourData.untrackedVolumeUSD.plus(amountTotalUSDTracked);
token1HourData.feesUSD = token1HourData.feesUSD.plus(feesUSD);
await this._db.saveSwap(swap, block);
await this._db.saveTokenDayData(token0DayData, block);
await this._db.saveTokenDayData(token1DayData, block);
await this._db.saveUniswapDayData(uniswapDayData, block);
await this._db.savePoolDayData(poolDayData, block);
await this._db.saveFactory(factory, block);
await this._db.savePool(pool, block);
await this._db.saveToken(token0, block);
await this._db.saveToken(token1, block);
await this._db.saveBundle(dbTx, bundle, block);
await this._db.saveSwap(dbTx, swap, block);
await this._db.saveTokenDayData(dbTx, token0DayData, block);
await this._db.saveTokenDayData(dbTx, token1DayData, block);
await this._db.saveUniswapDayData(dbTx, uniswapDayData, block);
await this._db.savePoolDayData(dbTx, poolDayData, block);
await this._db.saveFactory(dbTx, factory, block);
await this._db.savePool(dbTx, pool, block);
await this._db.saveToken(dbTx, token0, block);
await this._db.saveToken(dbTx, token1, block);
// Skipping update of inner vars of current or crossed ticks as they are not queried.
await dbTx.commitTransaction();
} catch (error) {
await dbTx.rollbackTransaction();
throw error;
} finally {
await dbTx.release();
}
}
async _handleIncreaseLiquidity (block: Block, contractAddress: string, tx: Transaction, event: IncreaseLiquidityEvent): Promise<void> {
const position = await this._getPosition(block, contractAddress, tx, BigInt(event.tokenId));
let position = await this._getPosition(block, contractAddress, tx, BigInt(event.tokenId));
// position was not able to be fetched.
if (position === null) {
@ -845,6 +953,16 @@ export class Indexer {
return;
}
await this._updateFeeVars(position, block, contractAddress, BigInt(event.tokenId));
const dbTx = await this._db.createTransactionRunner();
try {
if (!position.transaction) {
const transaction = await loadTransaction(this._db, dbTx, { block, tx });
position.transaction = transaction;
position = await this._db.savePosition(dbTx, position, block);
}
const token0 = position.token0;
const token1 = position.token1;
@ -855,11 +973,16 @@ export class Indexer {
position.depositedToken0 = position.depositedToken0.plus(amount0);
position.depositedToken1 = position.depositedToken1.plus(amount1);
await this._updateFeeVars(position, block, contractAddress, BigInt(event.tokenId));
await this._db.savePosition(dbTx, position, block);
await this._db.savePosition(position, block);
await this._savePositionSnapshot(position, block, tx);
await this._savePositionSnapshot(dbTx, position, block, tx);
await dbTx.commitTransaction();
} catch (error) {
await dbTx.rollbackTransaction();
throw error;
} finally {
await dbTx.release();
}
}
async _handleDecreaseLiquidity (block: Block, contractAddress: string, tx: Transaction, event: DecreaseLiquidityEvent): Promise<void> {
@ -875,6 +998,16 @@ export class Indexer {
return;
}
position = await this._updateFeeVars(position, block, contractAddress, BigInt(event.tokenId));
const dbTx = await this._db.createTransactionRunner();
try {
if (!position.transaction) {
const transaction = await loadTransaction(this._db, dbTx, { block, tx });
position.transaction = transaction;
position = await this._db.savePosition(dbTx, position, block);
}
const token0 = position.token0;
const token1 = position.token1;
const amount0 = convertTokenToDecimal(BigInt(event.amount0), BigInt(token0.decimals));
@ -884,11 +1017,16 @@ export class Indexer {
position.depositedToken0 = position.depositedToken0.plus(amount0);
position.depositedToken1 = position.depositedToken1.plus(amount1);
position = await this._updateFeeVars(position, block, contractAddress, BigInt(event.tokenId));
await this._db.savePosition(dbTx, position, block);
await this._db.savePosition(position, block);
await this._savePositionSnapshot(position, block, tx);
await this._savePositionSnapshot(dbTx, position, block, tx);
await dbTx.commitTransaction();
} catch (error) {
await dbTx.rollbackTransaction();
throw error;
} finally {
await dbTx.release();
}
}
async _handleCollect (block: Block, contractAddress: string, tx: Transaction, event: CollectEvent): Promise<void> {
@ -904,6 +1042,16 @@ export class Indexer {
return;
}
position = await this._updateFeeVars(position, block, contractAddress, BigInt(event.tokenId));
const dbTx = await this._db.createTransactionRunner();
try {
if (!position.transaction) {
const transaction = await loadTransaction(this._db, dbTx, { block, tx });
position.transaction = transaction;
position = await this._db.savePosition(dbTx, position, block);
}
const token0 = position.token0;
const token1 = position.token1;
const amount0 = convertTokenToDecimal(BigInt(event.amount0), BigInt(token0.decimals));
@ -911,24 +1059,45 @@ export class Indexer {
position.collectedFeesToken0 = position.collectedFeesToken0.plus(amount0);
position.collectedFeesToken1 = position.collectedFeesToken1.plus(amount1);
position = await this._updateFeeVars(position, block, contractAddress, BigInt(event.tokenId));
await this._db.savePosition(dbTx, position, block);
await this._db.savePosition(position, block);
await this._savePositionSnapshot(position, block, tx);
await this._savePositionSnapshot(dbTx, position, block, tx);
await dbTx.commitTransaction();
} catch (error) {
await dbTx.rollbackTransaction();
throw error;
} finally {
await dbTx.release();
}
}
async _handleTransfer (block: Block, contractAddress: string, tx: Transaction, event: TransferEvent): Promise<void> {
const position = await this._getPosition(block, contractAddress, tx, BigInt(event.tokenId));
let position = await this._getPosition(block, contractAddress, tx, BigInt(event.tokenId));
// Position was not able to be fetched.
if (position === null) {
return;
}
position.owner = event.to;
await this._db.savePosition(position, block);
const dbTx = await this._db.createTransactionRunner();
await this._savePositionSnapshot(position, block, tx);
try {
if (!position.transaction) {
const transaction = await loadTransaction(this._db, dbTx, { block, tx });
position.transaction = transaction;
position = await this._db.savePosition(dbTx, position, block);
}
position.owner = event.to;
await this._db.savePosition(dbTx, position, block);
await this._savePositionSnapshot(dbTx, position, block, tx);
await dbTx.commitTransaction();
} catch (error) {
await dbTx.rollbackTransaction();
throw error;
} finally {
await dbTx.release();
}
}
async _getPosition (block: Block, contractAddress: string, tx: Transaction, tokenId: bigint): Promise<Position | null> {
@ -950,33 +1119,28 @@ export class Indexer {
position = new Position();
position.id = tokenId.toString();
const pool = await this._db.getPool({ id: poolAddress, blockHash });
const pool = await this._db.getPoolNoTx({ id: poolAddress, blockHash });
assert(pool);
position.pool = pool;
const [token0, token1] = await Promise.all([
this._db.getToken({ id: token0Address, blockHash }),
this._db.getToken({ id: token0Address, blockHash })
this._db.getTokenNoTx({ id: token0Address, blockHash }),
this._db.getTokenNoTx({ id: token0Address, blockHash })
]);
assert(token0 && token1);
position.token0 = token0;
position.token1 = token1;
const [tickLower, tickUpper] = await Promise.all([
this._db.getTick({ id: poolAddress.concat('#').concat(nfpmPosition.tickLower.toString()), blockHash }),
this._db.getTick({ id: poolAddress.concat('#').concat(nfpmPosition.tickUpper.toString()), blockHash })
this._db.getTickNoTx({ id: poolAddress.concat('#').concat(nfpmPosition.tickLower.toString()), blockHash }),
this._db.getTickNoTx({ id: poolAddress.concat('#').concat(nfpmPosition.tickUpper.toString()), blockHash })
]);
assert(tickLower && tickUpper);
position.tickLower = tickLower;
position.tickUpper = tickUpper;
const transaction = await loadTransaction(this._db, { block, tx });
position.transaction = transaction;
position.feeGrowthInside0LastX128 = BigInt(nfpmPosition.feeGrowthInside0LastX128.toString());
position.feeGrowthInside1LastX128 = BigInt(nfpmPosition.feeGrowthInside1LastX128.toString());
position = await this._db.savePosition(position, block);
}
}
@ -994,7 +1158,7 @@ export class Indexer {
return position;
}
async _savePositionSnapshot (position: Position, block: Block, tx: Transaction): Promise<void> {
async _savePositionSnapshot (dbTx: QueryRunner, position: Position, block: Block, tx: Transaction): Promise<void> {
const positionSnapshot = new PositionSnapshot();
positionSnapshot.id = position.id.concat('#').concat(block.number.toString());
positionSnapshot.blockNumber = block.number;
@ -1009,10 +1173,10 @@ export class Indexer {
positionSnapshot.withdrawnToken1 = position.withdrawnToken1;
positionSnapshot.collectedFeesToken0 = position.collectedFeesToken0;
positionSnapshot.collectedFeesToken1 = position.collectedFeesToken1;
positionSnapshot.transaction = await loadTransaction(this._db, { block, tx });
positionSnapshot.transaction = await loadTransaction(this._db, dbTx, { block, tx });
positionSnapshot.feeGrowthInside0LastX128 = position.feeGrowthInside0LastX128;
positionSnapshot.feeGrowthInside1LastX128 = position.feeGrowthInside1LastX128;
await this._db.savePositionSnapshot(positionSnapshot, block);
await this._db.savePositionSnapshot(dbTx, positionSnapshot, block);
}
}

View File

@ -1,5 +1,6 @@
import Decimal from 'decimal.js';
import { BigNumber } from 'ethers';
import { QueryRunner } from 'typeorm';
import { Transaction as TransactionEntity } from '../entity/Transaction';
import { Database } from '../database';
@ -23,9 +24,9 @@ export const convertTokenToDecimal = (tokenAmount: bigint, exchangeDecimals: big
return (new Decimal(tokenAmount.toString())).div(exponentToBigDecimal(exchangeDecimals));
};
export const loadTransaction = async (db: Database, event: { block: Block, tx: Transaction }): Promise<TransactionEntity> => {
export const loadTransaction = async (db: Database, dbTx: QueryRunner, event: { block: Block, tx: Transaction }): Promise<TransactionEntity> => {
const { tx, block } = event;
let transaction = await db.getTransaction({ id: tx.hash, blockHash: block.hash });
let transaction = await db.getTransaction(dbTx, { id: tx.hash, blockHash: block.hash });
if (!transaction) {
transaction = new TransactionEntity();
@ -35,7 +36,7 @@ export const loadTransaction = async (db: Database, event: { block: Block, tx: T
transaction.blockNumber = block.number;
transaction.timestamp = BigInt(block.timestamp);
return db.saveTransaction(transaction, block);
return db.saveTransaction(dbTx, transaction, block);
};
// Return 0 if denominator is 0 in division.

View File

@ -1,5 +1,6 @@
import assert from 'assert';
import { BigNumber } from 'ethers';
import { QueryRunner } from 'typeorm';
import { Database } from '../database';
import { Factory } from '../entity/Factory';
@ -16,17 +17,17 @@ import { Block } from '../events';
* @param db
* @param event
*/
export const updateUniswapDayData = async (db: Database, event: { contractAddress: string, block: Block }): Promise<UniswapDayData> => {
export const updateUniswapDayData = async (db: Database, dbTx: QueryRunner, event: { contractAddress: string, block: Block }): Promise<UniswapDayData> => {
const { block } = event;
// TODO: In subgraph factory is fetched by hardcoded factory address.
// Currently fetching first factory in database as only one exists.
const [factory] = await db.getEntities(Factory, { hash: block.hash }, {}, { limit: 1 });
const [factory] = await db.getEntities(dbTx, Factory, { hash: block.hash }, {}, { limit: 1 });
const dayID = Math.floor(block.timestamp / 86400); // Rounded.
const dayStartTimestamp = dayID * 86400;
let uniswapDayData = await db.getUniswapDayData({ id: dayID.toString(), blockHash: block.hash });
let uniswapDayData = await db.getUniswapDayData(dbTx, { id: dayID.toString(), blockHash: block.hash });
if (!uniswapDayData) {
uniswapDayData = new UniswapDayData();
@ -38,10 +39,10 @@ export const updateUniswapDayData = async (db: Database, event: { contractAddres
uniswapDayData.tvlUSD = factory.totalValueLockedUSD;
uniswapDayData.txCount = factory.txCount;
return db.saveUniswapDayData(uniswapDayData, block);
return db.saveUniswapDayData(dbTx, uniswapDayData, block);
};
export const updatePoolDayData = async (db: Database, event: { contractAddress: string, block: Block }): Promise<PoolDayData> => {
export const updatePoolDayData = async (db: Database, dbTx: QueryRunner, event: { contractAddress: string, block: Block }): Promise<PoolDayData> => {
const { contractAddress, block } = event;
const dayID = Math.floor(block.timestamp / 86400);
const dayStartTimestamp = dayID * 86400;
@ -50,10 +51,10 @@ export const updatePoolDayData = async (db: Database, event: { contractAddress:
.concat('-')
.concat(dayID.toString());
const pool = await db.getPool({ id: contractAddress, blockHash: block.hash });
const pool = await db.getPool(dbTx, { id: contractAddress, blockHash: block.hash });
assert(pool);
let poolDayData = await db.getPoolDayData({ id: dayPoolID, blockHash: block.hash });
let poolDayData = await db.getPoolDayData(dbTx, { id: dayPoolID, blockHash: block.hash });
if (!poolDayData) {
poolDayData = new PoolDayData();
@ -64,7 +65,7 @@ export const updatePoolDayData = async (db: Database, event: { contractAddress:
poolDayData.high = pool.token0Price;
poolDayData.low = pool.token0Price;
poolDayData.close = pool.token0Price;
poolDayData = await db.savePoolDayData(poolDayData, block);
poolDayData = await db.savePoolDayData(dbTx, poolDayData, block);
}
if (Number(pool.token0Price) > Number(poolDayData.high)) {
@ -84,12 +85,12 @@ export const updatePoolDayData = async (db: Database, event: { contractAddress:
poolDayData.tick = pool.tick;
poolDayData.tvlUSD = pool.totalValueLockedUSD;
poolDayData.txCount = BigInt(BigNumber.from(poolDayData.txCount).add(1).toHexString());
poolDayData = await db.savePoolDayData(poolDayData, block);
poolDayData = await db.savePoolDayData(dbTx, poolDayData, block);
return poolDayData;
};
export const updatePoolHourData = async (db: Database, event: { contractAddress: string, block: Block }): Promise<PoolHourData> => {
export const updatePoolHourData = async (db: Database, dbTx: QueryRunner, event: { contractAddress: string, block: Block }): Promise<PoolHourData> => {
const { contractAddress, block } = event;
const hourIndex = Math.floor(block.timestamp / 3600); // Get unique hour within unix history.
const hourStartUnix = hourIndex * 3600; // Want the rounded effect.
@ -98,10 +99,10 @@ export const updatePoolHourData = async (db: Database, event: { contractAddress:
.concat('-')
.concat(hourIndex.toString());
const pool = await db.getPool({ id: contractAddress, blockHash: block.hash });
const pool = await db.getPool(dbTx, { id: contractAddress, blockHash: block.hash });
assert(pool);
let poolHourData = await db.getPoolHourData({ id: hourPoolID, blockHash: block.hash });
let poolHourData = await db.getPoolHourData(dbTx, { id: hourPoolID, blockHash: block.hash });
if (!poolHourData) {
poolHourData = new PoolHourData();
@ -112,7 +113,7 @@ export const updatePoolHourData = async (db: Database, event: { contractAddress:
poolHourData.high = pool.token0Price;
poolHourData.low = pool.token0Price;
poolHourData.close = pool.token0Price;
poolHourData = await db.savePoolHourData(poolHourData, block);
poolHourData = await db.savePoolHourData(dbTx, poolHourData, block);
}
if (Number(pool.token0Price) > Number(poolHourData.high)) {
@ -132,14 +133,14 @@ export const updatePoolHourData = async (db: Database, event: { contractAddress:
poolHourData.tick = pool.tick;
poolHourData.tvlUSD = pool.totalValueLockedUSD;
poolHourData.txCount = BigInt(BigNumber.from(poolHourData.txCount).add(1).toHexString());
poolHourData = await db.savePoolHourData(poolHourData, block);
poolHourData = await db.savePoolHourData(dbTx, poolHourData, block);
return poolHourData;
};
export const updateTokenDayData = async (db: Database, token: Token, event: { block: Block }): Promise<TokenDayData> => {
export const updateTokenDayData = async (db: Database, dbTx: QueryRunner, token: Token, event: { block: Block }): Promise<TokenDayData> => {
const { block } = event;
const bundle = await db.getBundle({ id: '1', blockHash: block.hash });
const bundle = await db.getBundle(dbTx, { id: '1', blockHash: block.hash });
assert(bundle);
const dayID = Math.floor(block.timestamp / 86400);
const dayStartTimestamp = dayID * 86400;
@ -150,7 +151,7 @@ export const updateTokenDayData = async (db: Database, token: Token, event: { bl
const tokenPrice = token.derivedETH.times(bundle.ethPriceUSD);
let tokenDayData = await db.getTokenDayData({ id: tokenDayID, blockHash: block.hash });
let tokenDayData = await db.getTokenDayData(dbTx, { id: tokenDayID, blockHash: block.hash });
if (!tokenDayData) {
tokenDayData = new TokenDayData();
@ -178,12 +179,12 @@ export const updateTokenDayData = async (db: Database, token: Token, event: { bl
tokenDayData.priceUSD = token.derivedETH.times(bundle.ethPriceUSD);
tokenDayData.totalValueLocked = token.totalValueLocked;
tokenDayData.totalValueLockedUSD = token.totalValueLockedUSD;
return db.saveTokenDayData(tokenDayData, block);
return db.saveTokenDayData(dbTx, tokenDayData, block);
};
export const updateTokenHourData = async (db: Database, token: Token, event: { block: Block }): Promise<TokenHourData> => {
export const updateTokenHourData = async (db: Database, dbTx: QueryRunner, token: Token, event: { block: Block }): Promise<TokenHourData> => {
const { block } = event;
const bundle = await db.getBundle({ id: '1', blockHash: block.hash });
const bundle = await db.getBundle(dbTx, { id: '1', blockHash: block.hash });
assert(bundle);
const hourIndex = Math.floor(block.timestamp / 3600); // Get unique hour within unix history.
const hourStartUnix = hourIndex * 3600; // Want the rounded effect.
@ -194,7 +195,7 @@ export const updateTokenHourData = async (db: Database, token: Token, event: { b
const tokenPrice = token.derivedETH.times(bundle.ethPriceUSD);
let tokenHourData = await db.getTokenHourData({ id: tokenHourID, blockHash: block.hash });
let tokenHourData = await db.getTokenHourData(dbTx, { id: tokenHourID, blockHash: block.hash });
if (!tokenHourData) {
tokenHourData = new TokenHourData();
@ -222,5 +223,5 @@ export const updateTokenHourData = async (db: Database, token: Token, event: { b
tokenHourData.priceUSD = tokenPrice;
tokenHourData.totalValueLocked = token.totalValueLocked;
tokenHourData.totalValueLockedUSD = token.totalValueLockedUSD;
return db.saveTokenHourData(tokenHourData, block);
return db.saveTokenHourData(dbTx, tokenHourData, block);
};

View File

@ -1,6 +1,7 @@
import assert from 'assert';
import Decimal from 'decimal.js';
import { BigNumber } from 'ethers';
import { QueryRunner } from 'typeorm';
import { exponentToBigDecimal, safeDiv } from '.';
import { Database } from '../database';
@ -55,9 +56,9 @@ export const sqrtPriceX96ToTokenPrices = (sqrtPriceX96: bigint, token0: Token, t
return [price0, price1];
};
export const getEthPriceInUSD = async (db: Database, block: Block): Promise<Decimal> => {
export const getEthPriceInUSD = async (db: Database, dbTx: QueryRunner, block: Block): Promise<Decimal> => {
// Fetch eth prices for each stablecoin.
const usdcPool = await db.getPool({ id: USDC_WETH_03_POOL, blockHash: block.hash }); // DAI is token0.
const usdcPool = await db.getPool(dbTx, { id: USDC_WETH_03_POOL, blockHash: block.hash }); // DAI is token0.
if (usdcPool) {
return usdcPool.token0Price;
@ -122,12 +123,13 @@ export const findEthPerToken = async (token: Token): Promise<Decimal> => {
*/
export const getTrackedAmountUSD = async (
db: Database,
dbTx: QueryRunner,
tokenAmount0: Decimal,
token0: Token,
tokenAmount1: Decimal,
token1: Token
): Promise<Decimal> => {
const bundle = await db.getBundle({ id: '1' });
const bundle = await db.getBundle(dbTx, { id: '1' });
assert(bundle);
const price0USD = token0.derivedETH.times(bundle.ethPriceUSD);
const price1USD = token1.derivedETH.times(bundle.ethPriceUSD);

View File

@ -1,4 +1,5 @@
import Decimal from 'decimal.js';
import { QueryRunner } from 'typeorm';
import { Pool } from '../entity/Pool';
import { Database } from '../database';
@ -6,7 +7,7 @@ import { bigDecimalExponated, safeDiv } from '.';
import { Tick } from '../entity/Tick';
import { Block } from '../events';
export const createTick = async (db: Database, tickId: string, tickIdx: bigint, pool: Pool, block: Block): Promise<Tick> => {
export const createTick = async (db: Database, dbTx: QueryRunner, tickId: string, tickIdx: bigint, pool: Pool, block: Block): Promise<Tick> => {
const tick = new Tick();
tick.id = tickId;
tick.tickIdx = tickIdx;
@ -19,5 +20,5 @@ export const createTick = async (db: Database, tickId: string, tickIdx: bigint,
tick.price0 = price0;
tick.price1 = safeDiv(new Decimal(1), price0);
return db.saveTick(tick, block);
return db.saveTick(dbTx, tick, block);
};