mirror of
https://github.com/cerc-io/watcher-ts
synced 2024-11-19 20:36:19 +00:00
Get/create entity based on block hash and parent traversal (#163)
* Watch all blocks and fetch events for each block. * Implement getPrevVersionEntity in database. * Implement getPrevVersionEntity for PoolCreated event. * Implement getPrevEntityVersion for Pool events. * Implement getPrevEntityVersion for NFPM events. Co-authored-by: nabarun <nabarun@deepstacksoft.com>
This commit is contained in:
parent
f8335aad03
commit
a5d6bfa285
@ -63,6 +63,10 @@ export class EthClient {
|
||||
return this._graphqlClient.query(ethQueries.getBlockWithTransactions, { blockNumber });
|
||||
}
|
||||
|
||||
async getBlockByHash (blockHash: string): Promise<any> {
|
||||
return this._graphqlClient.query(ethQueries.getBlockByHash, { blockHash });
|
||||
}
|
||||
|
||||
async getLogs (vars: Vars): Promise<any> {
|
||||
const result = await this._getCachedOrFetch('getLogs', vars);
|
||||
const {
|
||||
|
@ -55,6 +55,20 @@ query allEthHeaderCids($blockNumber: BigInt) {
|
||||
}
|
||||
`;
|
||||
|
||||
export const getBlockByHash = gql`
|
||||
query allEthHeaderCids($blockHash: Bytes32) {
|
||||
allEthHeaderCids(condition: { blockHash: $blockHash }) {
|
||||
nodes {
|
||||
cid
|
||||
blockNumber
|
||||
blockHash
|
||||
parentHash
|
||||
timestamp
|
||||
}
|
||||
}
|
||||
}
|
||||
`;
|
||||
|
||||
export const subscribeLogs = gql`
|
||||
subscription SubscriptionReceipt {
|
||||
listen(topic: "receipt_cids") {
|
||||
@ -88,6 +102,7 @@ subscription {
|
||||
blockHash
|
||||
blockNumber
|
||||
parentHash
|
||||
timestamp
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -115,6 +130,7 @@ export default {
|
||||
getStorageAt,
|
||||
getLogs,
|
||||
getBlockWithTransactions,
|
||||
getBlockByHash,
|
||||
subscribeLogs,
|
||||
subscribeBlocks,
|
||||
subscribeTransactions
|
||||
|
@ -1,6 +1,7 @@
|
||||
import assert from 'assert';
|
||||
import { Connection, ConnectionOptions, createConnection, DeepPartial, FindConditions, FindOneOptions, LessThanOrEqual } from 'typeorm';
|
||||
import { Connection, ConnectionOptions, createConnection, DeepPartial, FindConditions, FindOneOptions, In, LessThanOrEqual, Repository } from 'typeorm';
|
||||
import { SnakeNamingStrategy } from 'typeorm-naming-strategies';
|
||||
import { MAX_REORG_DEPTH } from '@vulcanize/util';
|
||||
|
||||
import { EventSyncProgress } from './entity/EventProgress';
|
||||
import { Factory } from './entity/Factory';
|
||||
@ -22,6 +23,7 @@ import { Position } from './entity/Position';
|
||||
import { PositionSnapshot } from './entity/PositionSnapshot';
|
||||
import { BlockProgress } from './entity/BlockProgress';
|
||||
import { Block } from './events';
|
||||
import { SyncStatus } from './entity/SyncStatus';
|
||||
|
||||
export class Database {
|
||||
_config: ConnectionOptions
|
||||
@ -45,54 +47,63 @@ export class Database {
|
||||
return this._conn.close();
|
||||
}
|
||||
|
||||
async getFactory ({ id, blockNumber }: DeepPartial<Factory>): Promise<Factory | undefined> {
|
||||
async getFactory ({ id, blockHash }: DeepPartial<Factory>): Promise<Factory | undefined> {
|
||||
const repo = this._conn.getRepository(Factory);
|
||||
|
||||
const whereOptions: FindConditions<Factory> = { id };
|
||||
|
||||
if (blockNumber) {
|
||||
whereOptions.blockNumber = LessThanOrEqual(blockNumber);
|
||||
if (blockHash) {
|
||||
whereOptions.blockHash = blockHash;
|
||||
}
|
||||
|
||||
const findOptions: FindOneOptions<Factory> = {
|
||||
const findOptions = {
|
||||
where: whereOptions,
|
||||
order: {
|
||||
blockNumber: 'DESC'
|
||||
}
|
||||
};
|
||||
|
||||
return repo.findOne(findOptions);
|
||||
let entity = await repo.findOne(findOptions as FindOneOptions<Factory>);
|
||||
|
||||
if (!entity && findOptions.where.blockHash) {
|
||||
entity = await this._getPrevEntityVersion(repo, findOptions);
|
||||
}
|
||||
|
||||
return entity;
|
||||
}
|
||||
|
||||
async getBundle ({ id, blockNumber }: DeepPartial<Bundle>): Promise<Bundle | undefined> {
|
||||
async getBundle ({ id, blockHash }: DeepPartial<Bundle>): Promise<Bundle | undefined> {
|
||||
const repo = this._conn.getRepository(Bundle);
|
||||
|
||||
const whereOptions: FindConditions<Bundle> = { id };
|
||||
|
||||
if (blockNumber) {
|
||||
whereOptions.blockNumber = LessThanOrEqual(blockNumber);
|
||||
if (blockHash) {
|
||||
whereOptions.blockHash = blockHash;
|
||||
}
|
||||
|
||||
const findOptions: FindOneOptions<Bundle> = {
|
||||
const findOptions = {
|
||||
where: whereOptions,
|
||||
order: {
|
||||
blockNumber: 'DESC'
|
||||
}
|
||||
};
|
||||
|
||||
return repo.findOne(findOptions);
|
||||
}
|
||||
let entity = await repo.findOne(findOptions as FindOneOptions<Bundle>);
|
||||
|
||||
async getToken ({ id, blockNumber }: DeepPartial<Token>): Promise<Token | undefined> {
|
||||
const repo = this._conn.getRepository(Token);
|
||||
|
||||
const whereOptions: FindConditions<Token> = { id };
|
||||
|
||||
if (blockNumber) {
|
||||
whereOptions.blockNumber = LessThanOrEqual(blockNumber);
|
||||
if (!entity && findOptions.where.blockHash) {
|
||||
entity = await this._getPrevEntityVersion(repo, findOptions);
|
||||
}
|
||||
|
||||
const findOptions: FindOneOptions<Token> = {
|
||||
return entity;
|
||||
}
|
||||
|
||||
async getToken ({ id, blockHash }: DeepPartial<Token>): Promise<Token | undefined> {
|
||||
const repo = this._conn.getRepository(Token);
|
||||
const whereOptions: FindConditions<Token> = { id };
|
||||
|
||||
if (blockHash) {
|
||||
whereOptions.blockHash = blockHash;
|
||||
}
|
||||
|
||||
const findOptions = {
|
||||
where: whereOptions,
|
||||
relations: ['whitelistPools', 'whitelistPools.token0', 'whitelistPools.token1'],
|
||||
order: {
|
||||
@ -100,18 +111,24 @@ export class Database {
|
||||
}
|
||||
};
|
||||
|
||||
return repo.findOne(findOptions);
|
||||
let entity = await repo.findOne(findOptions as FindOneOptions<Token>);
|
||||
|
||||
if (!entity && findOptions.where.blockHash) {
|
||||
entity = await this._getPrevEntityVersion(repo, findOptions);
|
||||
}
|
||||
|
||||
return entity;
|
||||
}
|
||||
|
||||
async getPool ({ id, blockNumber }: DeepPartial<Pool>): Promise<Pool | undefined> {
|
||||
async getPool ({ id, blockHash }: DeepPartial<Pool>): Promise<Pool | undefined> {
|
||||
const repo = this._conn.getRepository(Pool);
|
||||
const whereOptions: FindConditions<Pool> = { id };
|
||||
|
||||
if (blockNumber) {
|
||||
whereOptions.blockNumber = LessThanOrEqual(blockNumber);
|
||||
if (blockHash) {
|
||||
whereOptions.blockHash = blockHash;
|
||||
}
|
||||
|
||||
const findOptions: FindOneOptions<Pool> = {
|
||||
const findOptions = {
|
||||
where: whereOptions,
|
||||
relations: ['token0', 'token1'],
|
||||
order: {
|
||||
@ -119,18 +136,24 @@ export class Database {
|
||||
}
|
||||
};
|
||||
|
||||
return repo.findOne(findOptions);
|
||||
let entity = await repo.findOne(findOptions as FindOneOptions<Pool>);
|
||||
|
||||
if (!entity && findOptions.where.blockHash) {
|
||||
entity = await this._getPrevEntityVersion(repo, findOptions);
|
||||
}
|
||||
|
||||
return entity;
|
||||
}
|
||||
|
||||
async getPosition ({ id, blockNumber }: DeepPartial<Position>): Promise<Position | undefined> {
|
||||
async getPosition ({ id, blockHash }: DeepPartial<Position>): Promise<Position | undefined> {
|
||||
const repo = this._conn.getRepository(Position);
|
||||
const whereOptions: FindConditions<Position> = { id };
|
||||
|
||||
if (blockNumber) {
|
||||
whereOptions.blockNumber = LessThanOrEqual(blockNumber);
|
||||
if (blockHash) {
|
||||
whereOptions.blockHash = blockHash;
|
||||
}
|
||||
|
||||
const findOptions: FindOneOptions<Position> = {
|
||||
const findOptions = {
|
||||
where: whereOptions,
|
||||
relations: ['pool', 'token0', 'token1', 'tickLower', 'tickUpper', 'transaction'],
|
||||
order: {
|
||||
@ -138,18 +161,24 @@ export class Database {
|
||||
}
|
||||
};
|
||||
|
||||
return repo.findOne(findOptions);
|
||||
let entity = await repo.findOne(findOptions as FindOneOptions<Position>);
|
||||
|
||||
if (!entity && findOptions.where.blockHash) {
|
||||
entity = await this._getPrevEntityVersion(repo, findOptions);
|
||||
}
|
||||
|
||||
return entity;
|
||||
}
|
||||
|
||||
async getTick ({ id, blockNumber }: DeepPartial<Tick>): Promise<Tick | undefined> {
|
||||
async getTick ({ id, blockHash }: DeepPartial<Tick>): Promise<Tick | undefined> {
|
||||
const repo = this._conn.getRepository(Tick);
|
||||
const whereOptions: FindConditions<Tick> = { id };
|
||||
|
||||
if (blockNumber) {
|
||||
whereOptions.blockNumber = LessThanOrEqual(blockNumber);
|
||||
if (blockHash) {
|
||||
whereOptions.blockHash = blockHash;
|
||||
}
|
||||
|
||||
const findOptions: FindOneOptions<Tick> = {
|
||||
const findOptions = {
|
||||
where: whereOptions,
|
||||
relations: ['pool'],
|
||||
order: {
|
||||
@ -157,118 +186,160 @@ export class Database {
|
||||
}
|
||||
};
|
||||
|
||||
return repo.findOne(findOptions);
|
||||
let entity = await repo.findOne(findOptions as FindOneOptions<Tick>);
|
||||
|
||||
if (!entity && findOptions.where.blockHash) {
|
||||
entity = await this._getPrevEntityVersion(repo, findOptions);
|
||||
}
|
||||
|
||||
return entity;
|
||||
}
|
||||
|
||||
async getPoolDayData ({ id, blockNumber }: DeepPartial<PoolDayData>): Promise<PoolDayData | undefined> {
|
||||
async getPoolDayData ({ id, blockHash }: DeepPartial<PoolDayData>): Promise<PoolDayData | undefined> {
|
||||
const repo = this._conn.getRepository(PoolDayData);
|
||||
const whereOptions: FindConditions<PoolDayData> = { id };
|
||||
|
||||
if (blockNumber) {
|
||||
whereOptions.blockNumber = LessThanOrEqual(blockNumber);
|
||||
if (blockHash) {
|
||||
whereOptions.blockHash = blockHash;
|
||||
}
|
||||
|
||||
const findOptions: FindOneOptions<PoolDayData> = {
|
||||
const findOptions = {
|
||||
where: whereOptions,
|
||||
order: {
|
||||
blockNumber: 'DESC'
|
||||
}
|
||||
};
|
||||
|
||||
return repo.findOne(findOptions);
|
||||
let entity = await repo.findOne(findOptions as FindOneOptions<PoolDayData>);
|
||||
|
||||
if (!entity && findOptions.where.blockHash) {
|
||||
entity = await this._getPrevEntityVersion(repo, findOptions);
|
||||
}
|
||||
|
||||
return entity;
|
||||
}
|
||||
|
||||
async getPoolHourData ({ id, blockNumber }: DeepPartial<PoolHourData>): Promise<PoolHourData | undefined> {
|
||||
async getPoolHourData ({ id, blockHash }: DeepPartial<PoolHourData>): Promise<PoolHourData | undefined> {
|
||||
const repo = this._conn.getRepository(PoolHourData);
|
||||
const whereOptions: FindConditions<PoolHourData> = { id };
|
||||
|
||||
if (blockNumber) {
|
||||
whereOptions.blockNumber = LessThanOrEqual(blockNumber);
|
||||
if (blockHash) {
|
||||
whereOptions.blockHash = blockHash;
|
||||
}
|
||||
|
||||
const findOptions: FindOneOptions<PoolHourData> = {
|
||||
const findOptions = {
|
||||
where: whereOptions,
|
||||
order: {
|
||||
blockNumber: 'DESC'
|
||||
}
|
||||
};
|
||||
|
||||
return repo.findOne(findOptions);
|
||||
let entity = await repo.findOne(findOptions as FindOneOptions<PoolHourData>);
|
||||
|
||||
if (!entity && findOptions.where.blockHash) {
|
||||
entity = await this._getPrevEntityVersion(repo, findOptions);
|
||||
}
|
||||
|
||||
return entity;
|
||||
}
|
||||
|
||||
async getUniswapDayData ({ id, blockNumber }: DeepPartial<UniswapDayData>): Promise<UniswapDayData | undefined> {
|
||||
async getUniswapDayData ({ id, blockHash }: DeepPartial<UniswapDayData>): Promise<UniswapDayData | undefined> {
|
||||
const repo = this._conn.getRepository(UniswapDayData);
|
||||
const whereOptions: FindConditions<UniswapDayData> = { id };
|
||||
|
||||
if (blockNumber) {
|
||||
whereOptions.blockNumber = LessThanOrEqual(blockNumber);
|
||||
if (blockHash) {
|
||||
whereOptions.blockHash = blockHash;
|
||||
}
|
||||
|
||||
const findOptions: FindOneOptions<UniswapDayData> = {
|
||||
const findOptions = {
|
||||
where: whereOptions,
|
||||
order: {
|
||||
blockNumber: 'DESC'
|
||||
}
|
||||
};
|
||||
|
||||
return repo.findOne(findOptions);
|
||||
let entity = await repo.findOne(findOptions as FindOneOptions<UniswapDayData>);
|
||||
|
||||
if (!entity && findOptions.where.blockHash) {
|
||||
entity = await this._getPrevEntityVersion(repo, findOptions);
|
||||
}
|
||||
|
||||
return entity;
|
||||
}
|
||||
|
||||
async getTokenDayData ({ id, blockNumber }: DeepPartial<TokenDayData>): Promise<TokenDayData | undefined> {
|
||||
async getTokenDayData ({ id, blockHash }: DeepPartial<TokenDayData>): Promise<TokenDayData | undefined> {
|
||||
const repo = this._conn.getRepository(TokenDayData);
|
||||
const whereOptions: FindConditions<TokenDayData> = { id };
|
||||
|
||||
if (blockNumber) {
|
||||
whereOptions.blockNumber = LessThanOrEqual(blockNumber);
|
||||
if (blockHash) {
|
||||
whereOptions.blockHash = blockHash;
|
||||
}
|
||||
|
||||
const findOptions: FindOneOptions<TokenDayData> = {
|
||||
const findOptions = {
|
||||
where: whereOptions,
|
||||
order: {
|
||||
blockNumber: 'DESC'
|
||||
}
|
||||
};
|
||||
|
||||
return repo.findOne(findOptions);
|
||||
let entity = await repo.findOne(findOptions as FindOneOptions<TokenDayData>);
|
||||
|
||||
if (!entity && findOptions.where.blockHash) {
|
||||
entity = await this._getPrevEntityVersion(repo, findOptions);
|
||||
}
|
||||
|
||||
return entity;
|
||||
}
|
||||
|
||||
async getTokenHourData ({ id, blockNumber }: DeepPartial<TokenHourData>): Promise<TokenHourData | undefined> {
|
||||
async getTokenHourData ({ id, blockHash }: DeepPartial<TokenHourData>): Promise<TokenHourData | undefined> {
|
||||
const repo = this._conn.getRepository(TokenHourData);
|
||||
const whereOptions: FindConditions<TokenHourData> = { id };
|
||||
|
||||
if (blockNumber) {
|
||||
whereOptions.blockNumber = LessThanOrEqual(blockNumber);
|
||||
if (blockHash) {
|
||||
whereOptions.blockHash = blockHash;
|
||||
}
|
||||
|
||||
const findOptions: FindOneOptions<TokenHourData> = {
|
||||
const findOptions = {
|
||||
where: whereOptions,
|
||||
order: {
|
||||
blockNumber: 'DESC'
|
||||
}
|
||||
};
|
||||
|
||||
return repo.findOne(findOptions);
|
||||
let entity = await repo.findOne(findOptions as FindOneOptions<TokenHourData>);
|
||||
|
||||
if (!entity && findOptions.where.blockHash) {
|
||||
entity = await this._getPrevEntityVersion(repo, findOptions);
|
||||
}
|
||||
|
||||
return entity;
|
||||
}
|
||||
|
||||
async getTransaction ({ id, blockNumber }: DeepPartial<Transaction>): Promise<Transaction | undefined> {
|
||||
async getTransaction ({ id, blockHash }: DeepPartial<Transaction>): Promise<Transaction | undefined> {
|
||||
const repo = this._conn.getRepository(Transaction);
|
||||
const whereOptions: FindConditions<Transaction> = { id };
|
||||
|
||||
if (blockNumber) {
|
||||
whereOptions.blockNumber = LessThanOrEqual(blockNumber);
|
||||
if (blockHash) {
|
||||
whereOptions.blockHash = blockHash;
|
||||
}
|
||||
|
||||
const findOptions: FindOneOptions<Transaction> = {
|
||||
const findOptions = {
|
||||
where: whereOptions,
|
||||
order: {
|
||||
blockNumber: 'DESC'
|
||||
}
|
||||
};
|
||||
|
||||
return repo.findOne(findOptions);
|
||||
let entity = await repo.findOne(findOptions as FindOneOptions<Transaction>);
|
||||
|
||||
if (!entity && findOptions.where.blockHash) {
|
||||
entity = await this._getPrevEntityVersion(repo, findOptions);
|
||||
}
|
||||
|
||||
return entity;
|
||||
}
|
||||
|
||||
async getFactories ({ blockNumber }: DeepPartial<Factory>, queryOptions: { [key: string]: any }): Promise<Array<Factory>> {
|
||||
async getFactories ({ blockHash }: DeepPartial<Factory>, queryOptions: { [key: string]: any }): Promise<Array<Factory>> {
|
||||
const repo = this._conn.getRepository(Factory);
|
||||
|
||||
let selectQueryBuilder = repo.createQueryBuilder('factory')
|
||||
@ -276,8 +347,12 @@ export class Database {
|
||||
.orderBy('id')
|
||||
.addOrderBy('block_number', 'DESC');
|
||||
|
||||
if (blockNumber) {
|
||||
selectQueryBuilder = selectQueryBuilder.where('block_number <= :blockNumber', { blockNumber });
|
||||
if (blockHash) {
|
||||
const { canonicalBlockNumber, blockHashes } = await this._getBranchInfo(blockHash);
|
||||
|
||||
selectQueryBuilder = selectQueryBuilder
|
||||
.where('block_hash IN (:...blockHashes)', { blockHashes })
|
||||
.orWhere('block_number <= :canonicalBlockNumber', { canonicalBlockNumber });
|
||||
}
|
||||
|
||||
const { limit } = queryOptions;
|
||||
@ -525,6 +600,32 @@ export class Database {
|
||||
return this._conn.getRepository(Event).findOne(id, { relations: ['block'] });
|
||||
}
|
||||
|
||||
async updateSyncStatus (blockHash: string, blockNumber: number): Promise<SyncStatus> {
|
||||
return await this._conn.transaction(async (tx) => {
|
||||
const repo = tx.getRepository(SyncStatus);
|
||||
|
||||
let entity = await repo.findOne();
|
||||
if (!entity) {
|
||||
entity = repo.create({
|
||||
latestCanonicalBlockHash: blockHash,
|
||||
latestCanonicalBlockNumber: blockNumber
|
||||
});
|
||||
}
|
||||
|
||||
if (blockNumber >= entity.latestCanonicalBlockNumber) {
|
||||
entity.chainHeadBlockHash = blockHash;
|
||||
entity.chainHeadBlockNumber = blockNumber;
|
||||
}
|
||||
|
||||
return await repo.save(entity);
|
||||
});
|
||||
}
|
||||
|
||||
async getSyncStatus (): Promise<SyncStatus | undefined> {
|
||||
const repo = this._conn.getRepository(SyncStatus);
|
||||
return repo.findOne();
|
||||
}
|
||||
|
||||
async getBlockProgress (blockHash: string): Promise<BlockProgress | undefined> {
|
||||
const repo = this._conn.getRepository(BlockProgress);
|
||||
return repo.findOne({ where: { blockHash } });
|
||||
@ -543,4 +644,37 @@ export class Database {
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
async _getPrevEntityVersion<Entity> (repo: Repository<Entity>, findOptions: { [key: string]: any }): Promise<Entity | undefined> {
|
||||
assert(findOptions.order.blockNumber);
|
||||
const { canonicalBlockNumber, blockHashes } = await this._getBranchInfo(findOptions.where.blockHash);
|
||||
findOptions.where.blockHash = In(blockHashes)
|
||||
let entity = await repo.findOne(findOptions);
|
||||
|
||||
if (!entity) {
|
||||
delete findOptions.where.blockHash;
|
||||
findOptions.where.blockNumber = LessThanOrEqual(canonicalBlockNumber);
|
||||
entity = await repo.findOne(findOptions);
|
||||
}
|
||||
|
||||
return entity;
|
||||
}
|
||||
|
||||
async _getBranchInfo (blockHash: string): Promise<{ canonicalBlockNumber: number, blockHashes: string[] }> {
|
||||
const blockRepo = this._conn.getRepository(BlockProgress);
|
||||
let block = await blockRepo.findOne({ blockHash });
|
||||
assert(block);
|
||||
const blockHashes = [blockHash];
|
||||
|
||||
// TODO: Should be calcualted from chainHeadBlockNumber?
|
||||
const canonicalBlockNumber = block.blockNumber - MAX_REORG_DEPTH;
|
||||
|
||||
while (block.blockNumber > canonicalBlockNumber) {
|
||||
blockHashes.push(block.parentHash);
|
||||
block = await blockRepo.findOne({ blockHash: block.parentHash });
|
||||
assert(block);
|
||||
}
|
||||
|
||||
return { canonicalBlockNumber, blockHashes };
|
||||
}
|
||||
}
|
||||
|
@ -7,6 +7,7 @@ export class Bundle {
|
||||
@PrimaryColumn('varchar', { length: 1 })
|
||||
id!: string;
|
||||
|
||||
// https://typeorm.io/#/entities/primary-columns
|
||||
@PrimaryColumn('varchar', { length: 66 })
|
||||
blockHash!: string
|
||||
|
||||
|
@ -11,6 +11,7 @@ export class Burn {
|
||||
@PrimaryColumn('varchar')
|
||||
id!: string;
|
||||
|
||||
// https://typeorm.io/#/entities/primary-columns
|
||||
@PrimaryColumn('varchar', { length: 66 })
|
||||
blockHash!: string
|
||||
|
||||
|
@ -7,6 +7,7 @@ export class Factory {
|
||||
@PrimaryColumn('varchar', { length: 42 })
|
||||
id!: string;
|
||||
|
||||
// https://typeorm.io/#/entities/primary-columns
|
||||
@PrimaryColumn('varchar', { length: 66 })
|
||||
blockHash!: string
|
||||
|
||||
|
@ -11,6 +11,7 @@ export class Mint {
|
||||
@PrimaryColumn('varchar')
|
||||
id!: string;
|
||||
|
||||
// https://typeorm.io/#/entities/primary-columns
|
||||
@PrimaryColumn('varchar', { length: 66 })
|
||||
blockHash!: string
|
||||
|
||||
|
@ -9,6 +9,7 @@ export class Pool {
|
||||
@PrimaryColumn('varchar', { length: 42 })
|
||||
id!: string;
|
||||
|
||||
// https://typeorm.io/#/entities/primary-columns
|
||||
@PrimaryColumn('varchar', { length: 66 })
|
||||
blockHash!: string
|
||||
|
||||
|
@ -9,6 +9,7 @@ export class PoolDayData {
|
||||
@PrimaryColumn('varchar')
|
||||
id!: string;
|
||||
|
||||
// https://typeorm.io/#/entities/primary-columns
|
||||
@PrimaryColumn('varchar', { length: 66 })
|
||||
blockHash!: string
|
||||
|
||||
|
@ -9,6 +9,7 @@ export class PoolHourData {
|
||||
@PrimaryColumn('varchar')
|
||||
id!: string;
|
||||
|
||||
// https://typeorm.io/#/entities/primary-columns
|
||||
@PrimaryColumn('varchar', { length: 66 })
|
||||
blockHash!: string
|
||||
|
||||
|
@ -13,6 +13,7 @@ export class Position {
|
||||
@PrimaryColumn('varchar')
|
||||
id!: string;
|
||||
|
||||
// https://typeorm.io/#/entities/primary-columns
|
||||
@PrimaryColumn('varchar', { length: 66 })
|
||||
blockHash!: string
|
||||
|
||||
|
@ -12,6 +12,7 @@ export class PositionSnapshot {
|
||||
@PrimaryColumn('varchar')
|
||||
id!: string;
|
||||
|
||||
// https://typeorm.io/#/entities/primary-columns
|
||||
@PrimaryColumn('varchar', { length: 66 })
|
||||
blockHash!: string
|
||||
|
||||
|
@ -11,6 +11,7 @@ export class Swap {
|
||||
@PrimaryColumn('varchar')
|
||||
id!: string;
|
||||
|
||||
// https://typeorm.io/#/entities/primary-columns
|
||||
@PrimaryColumn('varchar', { length: 66 })
|
||||
blockHash!: string
|
||||
|
||||
|
23
packages/uni-info-watcher/src/entity/SyncStatus.ts
Normal file
23
packages/uni-info-watcher/src/entity/SyncStatus.ts
Normal file
@ -0,0 +1,23 @@
|
||||
import { Entity, PrimaryGeneratedColumn, Column } from 'typeorm';
|
||||
|
||||
@Entity()
|
||||
export class SyncStatus {
|
||||
@PrimaryGeneratedColumn()
|
||||
id!: number;
|
||||
|
||||
// Latest block hash and number from the chain itself.
|
||||
@Column('varchar', { length: 66 })
|
||||
chainHeadBlockHash!: string;
|
||||
|
||||
@Column('integer')
|
||||
chainHeadBlockNumber!: number;
|
||||
|
||||
// Most recent block hash and number that we can consider as part
|
||||
// of the canonical/finalized chain. Reorgs older than this block
|
||||
// cannot be processed and processing will halt.
|
||||
@Column('varchar', { length: 66 })
|
||||
latestCanonicalBlockHash!: string;
|
||||
|
||||
@Column('integer')
|
||||
latestCanonicalBlockNumber!: number;
|
||||
}
|
@ -9,6 +9,7 @@ export class Tick {
|
||||
@PrimaryColumn('varchar')
|
||||
id!: string;
|
||||
|
||||
// https://typeorm.io/#/entities/primary-columns
|
||||
@PrimaryColumn('varchar', { length: 66 })
|
||||
blockHash!: string
|
||||
|
||||
|
@ -9,6 +9,7 @@ export class Token {
|
||||
@PrimaryColumn('varchar', { length: 42 })
|
||||
id!: string;
|
||||
|
||||
// https://typeorm.io/#/entities/primary-columns
|
||||
@PrimaryColumn('varchar', { length: 66 })
|
||||
blockHash!: string
|
||||
|
||||
|
@ -9,6 +9,7 @@ export class TokenDayData {
|
||||
@PrimaryColumn('varchar')
|
||||
id!: string;
|
||||
|
||||
// https://typeorm.io/#/entities/primary-columns
|
||||
@PrimaryColumn('varchar', { length: 66 })
|
||||
blockHash!: string
|
||||
|
||||
|
@ -9,6 +9,7 @@ export class TokenHourData {
|
||||
@PrimaryColumn('varchar')
|
||||
id!: string;
|
||||
|
||||
// https://typeorm.io/#/entities/primary-columns
|
||||
@PrimaryColumn('varchar', { length: 66 })
|
||||
blockHash!: string
|
||||
|
||||
|
@ -11,6 +11,7 @@ export class Transaction {
|
||||
@PrimaryColumn('varchar')
|
||||
id!: string;
|
||||
|
||||
// https://typeorm.io/#/entities/primary-columns
|
||||
@PrimaryColumn('varchar', { length: 66 })
|
||||
blockHash!: string
|
||||
|
||||
|
@ -7,6 +7,7 @@ export class UniswapDayData {
|
||||
@PrimaryColumn('varchar')
|
||||
id!: string;
|
||||
|
||||
// https://typeorm.io/#/entities/primary-columns
|
||||
@PrimaryColumn('varchar', { length: 66 })
|
||||
blockHash!: string
|
||||
|
||||
|
@ -1,6 +1,8 @@
|
||||
import assert from 'assert';
|
||||
import debug from 'debug';
|
||||
import { Client as UniClient } from '@vulcanize/uni-watcher';
|
||||
import _ from 'lodash';
|
||||
import { EthClient } from '@vulcanize/ipld-eth-client';
|
||||
|
||||
import { JobQueue } from '../../util';
|
||||
import { Indexer } from './indexer';
|
||||
|
||||
@ -112,12 +114,12 @@ export const QUEUE_BLOCK_PROCESSING = 'block-processing';
|
||||
|
||||
export class EventWatcher {
|
||||
_subscription?: ZenObservable.Subscription
|
||||
_uniClient: UniClient
|
||||
_ethClient: EthClient
|
||||
_jobQueue: JobQueue
|
||||
_indexer: Indexer
|
||||
|
||||
constructor (indexer: Indexer, uniClient: UniClient, jobQueue: JobQueue) {
|
||||
this._uniClient = uniClient;
|
||||
constructor (indexer: Indexer, ethClient: EthClient, jobQueue: JobQueue) {
|
||||
this._ethClient = ethClient;
|
||||
this._jobQueue = jobQueue;
|
||||
this._indexer = indexer;
|
||||
}
|
||||
@ -126,21 +128,9 @@ export class EventWatcher {
|
||||
assert(!this._subscription, 'subscription already started');
|
||||
log('Started watching upstream events...');
|
||||
|
||||
this._jobQueue.onComplete(QUEUE_BLOCK_PROCESSING, async (job) => {
|
||||
const { data: { request: { data: { block } } } } = job;
|
||||
log(`Job onComplete block ${block.hash} ${block.number}`);
|
||||
});
|
||||
|
||||
this._jobQueue.onComplete(QUEUE_EVENT_PROCESSING, async (job) => {
|
||||
const { data: { request } } = job;
|
||||
|
||||
log(`Job onComplete event ${request.data.id}`);
|
||||
});
|
||||
|
||||
this._subscription = await this._uniClient.watchEvents(async ({ block }: ResultEvent) => {
|
||||
log('watchEvent', block.hash, block.number);
|
||||
return this._jobQueue.pushJob(QUEUE_BLOCK_PROCESSING, { block });
|
||||
});
|
||||
await this._initBlockProcessingOnCompleteHandler();
|
||||
await this._initEventProcessingOnCompleteHandler();
|
||||
await this._watchBlocksAtChainHead();
|
||||
}
|
||||
|
||||
async stop (): Promise<void> {
|
||||
@ -149,4 +139,44 @@ export class EventWatcher {
|
||||
this._subscription.unsubscribe();
|
||||
}
|
||||
}
|
||||
|
||||
async _watchBlocksAtChainHead (): Promise<void> {
|
||||
log('Started watching upstream blocks...');
|
||||
this._subscription = await this._ethClient.watchBlocks(async (value) => {
|
||||
const { blockHash, blockNumber, parentHash, timestamp } = _.get(value, 'data.listen.relatedNode');
|
||||
|
||||
await this._indexer.updateSyncStatus(blockHash, blockNumber);
|
||||
|
||||
log('watchBlock', blockHash, blockNumber);
|
||||
|
||||
const block = {
|
||||
hash: blockHash,
|
||||
number: blockNumber,
|
||||
parentHash,
|
||||
timestamp
|
||||
};
|
||||
|
||||
await this._jobQueue.pushJob(QUEUE_BLOCK_PROCESSING, { block });
|
||||
});
|
||||
}
|
||||
|
||||
async _initBlockProcessingOnCompleteHandler (): Promise<void> {
|
||||
this._jobQueue.onComplete(QUEUE_BLOCK_PROCESSING, async (job) => {
|
||||
const { data: { request: { data: { block } } } } = job;
|
||||
log(`Job onComplete block ${block.hash} ${block.number}`);
|
||||
});
|
||||
}
|
||||
|
||||
async _initEventProcessingOnCompleteHandler (): Promise<void> {
|
||||
this._jobQueue.onComplete(QUEUE_EVENT_PROCESSING, async (job) => {
|
||||
const { data: { request } } = job;
|
||||
|
||||
const dbEvent = await this._indexer.getEvent(request.data.id);
|
||||
assert(dbEvent);
|
||||
|
||||
await this._indexer.updateBlockProgress(dbEvent.block.blockHash);
|
||||
|
||||
log(`Job onComplete event ${request.data.id}`);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
@ -5,6 +5,7 @@ import JSONbig from 'json-bigint';
|
||||
import { utils } from 'ethers';
|
||||
import { Client as UniClient } from '@vulcanize/uni-watcher';
|
||||
import { Client as ERC20Client } from '@vulcanize/erc20-watcher';
|
||||
import { EthClient } from '@vulcanize/ipld-eth-client';
|
||||
|
||||
import { findEthPerToken, getEthPriceInUSD, getTrackedAmountUSD, sqrtPriceX96ToTokenPrices, WHITELIST_TOKENS } from './utils/pricing';
|
||||
import { updatePoolDayData, updatePoolHourData, updateTokenDayData, updateTokenHourData, updateUniswapDayData } from './utils/interval-updates';
|
||||
@ -23,6 +24,8 @@ import { Mint } from './entity/Mint';
|
||||
import { Burn } from './entity/Burn';
|
||||
import { Swap } from './entity/Swap';
|
||||
import { PositionSnapshot } from './entity/PositionSnapshot';
|
||||
import { SyncStatus } from './entity/SyncStatus';
|
||||
import { BlockProgress } from './entity/BlockProgress';
|
||||
|
||||
const log = debug('vulcanize:indexer');
|
||||
|
||||
@ -37,14 +40,18 @@ export class Indexer {
|
||||
_db: Database
|
||||
_uniClient: UniClient
|
||||
_erc20Client: ERC20Client
|
||||
_ethClient: EthClient
|
||||
|
||||
constructor (db: Database, uniClient: UniClient, erc20Client: ERC20Client) {
|
||||
constructor (db: Database, uniClient: UniClient, erc20Client: ERC20Client, ethClient: EthClient) {
|
||||
assert(db);
|
||||
assert(uniClient);
|
||||
assert(erc20Client);
|
||||
assert(ethClient);
|
||||
|
||||
this._db = db;
|
||||
this._uniClient = uniClient;
|
||||
this._erc20Client = erc20Client;
|
||||
this._ethClient = ethClient;
|
||||
}
|
||||
|
||||
getResultEvent (event: Event): ResultEvent {
|
||||
@ -96,9 +103,9 @@ export class Indexer {
|
||||
|
||||
// TODO: Process proof (proof.data) in event.
|
||||
const { contract, tx, block, event } = resultEvent;
|
||||
const { __typename: eventType } = event;
|
||||
const { __typename: eventName } = event;
|
||||
|
||||
switch (eventType) {
|
||||
switch (eventName) {
|
||||
case 'PoolCreatedEvent':
|
||||
log('Factory PoolCreated event', contract);
|
||||
await this._handlePoolCreated(block, contract, tx, event as PoolCreatedEvent);
|
||||
@ -145,14 +152,34 @@ export class Indexer {
|
||||
break;
|
||||
|
||||
default:
|
||||
log('Event not handled', eventName);
|
||||
break;
|
||||
}
|
||||
|
||||
log('Event processing completed for', eventName);
|
||||
}
|
||||
|
||||
async updateSyncStatus (blockHash: string, blockNumber: number): Promise<SyncStatus> {
|
||||
return this._db.updateSyncStatus(blockHash, blockNumber);
|
||||
}
|
||||
|
||||
async getSyncStatus (): Promise<SyncStatus | undefined> {
|
||||
return this._db.getSyncStatus();
|
||||
}
|
||||
|
||||
async getBlock (blockHash: string): Promise<any> {
|
||||
const { block } = await this._ethClient.getLogs({ blockHash });
|
||||
return block;
|
||||
}
|
||||
|
||||
async getEvent (id: string): Promise<Event | undefined> {
|
||||
return this._db.getEvent(id);
|
||||
}
|
||||
|
||||
async getBlockProgress (blockHash: string): Promise<BlockProgress | undefined> {
|
||||
return this._db.getBlockProgress(blockHash);
|
||||
}
|
||||
|
||||
async updateBlockProgress (blockHash: string): Promise<void> {
|
||||
return this._db.updateBlockProgress(blockHash);
|
||||
}
|
||||
@ -186,7 +213,6 @@ export class Indexer {
|
||||
}
|
||||
|
||||
async _handlePoolCreated (block: Block, contractAddress: string, tx: Transaction, poolCreatedEvent: PoolCreatedEvent): Promise<void> {
|
||||
const { number: blockNumber } = block;
|
||||
const { token0: token0Address, token1: token1Address, fee, pool: poolAddress } = poolCreatedEvent;
|
||||
|
||||
// Temp fix from Subgraph mapping code.
|
||||
@ -195,7 +221,7 @@ export class Indexer {
|
||||
}
|
||||
|
||||
// Load factory.
|
||||
let factory = await this._db.getFactory({ blockNumber, id: contractAddress });
|
||||
let factory = await this._db.getFactory({ blockHash: block.hash, id: contractAddress });
|
||||
|
||||
if (!factory) {
|
||||
factory = new Factory();
|
||||
@ -216,8 +242,8 @@ export class Indexer {
|
||||
|
||||
// Get Tokens.
|
||||
let [token0, token1] = await Promise.all([
|
||||
this._db.getToken({ blockNumber, id: token0Address }),
|
||||
this._db.getToken({ blockNumber, id: token1Address })
|
||||
this._db.getToken({ blockHash: block.hash, id: token0Address }),
|
||||
this._db.getToken({ blockHash: block.hash, id: token1Address })
|
||||
]);
|
||||
|
||||
// Create Tokens if not present.
|
||||
@ -274,9 +300,8 @@ export class Indexer {
|
||||
}
|
||||
|
||||
async _handleInitialize (block: Block, contractAddress: string, tx: Transaction, initializeEvent: InitializeEvent): Promise<void> {
|
||||
const { number: blockNumber } = block;
|
||||
const { sqrtPriceX96, tick } = initializeEvent;
|
||||
const pool = await this._db.getPool({ id: contractAddress, blockNumber });
|
||||
const pool = await this._db.getPool({ id: contractAddress, blockHash: block.hash });
|
||||
assert(pool, `Pool ${contractAddress} not found.`);
|
||||
|
||||
// Update Pool.
|
||||
@ -286,14 +311,14 @@ export class Indexer {
|
||||
|
||||
// Update token prices.
|
||||
const [token0, token1] = await Promise.all([
|
||||
this._db.getToken({ id: pool.token0.id, blockNumber }),
|
||||
this._db.getToken({ id: pool.token1.id, blockNumber })
|
||||
this._db.getToken({ id: pool.token0.id, blockHash: block.hash }),
|
||||
this._db.getToken({ id: pool.token1.id, blockHash: block.hash })
|
||||
]);
|
||||
|
||||
// Update ETH price now that prices could have changed.
|
||||
const bundle = await this._db.getBundle({ id: '1', blockNumber });
|
||||
const bundle = await this._db.getBundle({ id: '1', blockHash: block.hash });
|
||||
assert(bundle);
|
||||
bundle.ethPriceUSD = await getEthPriceInUSD(this._db);
|
||||
bundle.ethPriceUSD = await getEthPriceInUSD(this._db, block);
|
||||
this._db.saveBundle(bundle, block);
|
||||
|
||||
await updatePoolDayData(this._db, { contractAddress, block });
|
||||
@ -311,16 +336,15 @@ export class Indexer {
|
||||
}
|
||||
|
||||
async _handleMint (block: Block, contractAddress: string, tx: Transaction, mintEvent: MintEvent): Promise<void> {
|
||||
const { number: blockNumber } = block;
|
||||
const bundle = await this._db.getBundle({ id: '1', blockNumber });
|
||||
const bundle = await this._db.getBundle({ id: '1', blockHash: block.hash });
|
||||
assert(bundle);
|
||||
const poolAddress = contractAddress;
|
||||
const pool = await this._db.getPool({ id: poolAddress, blockNumber });
|
||||
const pool = await this._db.getPool({ id: poolAddress, blockHash: block.hash });
|
||||
assert(pool);
|
||||
|
||||
// TODO: In subgraph factory is fetched by hardcoded factory address.
|
||||
// Currently fetching first factory in database as only one exists.
|
||||
const [factory] = await this._db.getFactories({ blockNumber }, { limit: 1 });
|
||||
const [factory] = await this._db.getFactories({ blockHash: block.hash }, { limit: 1 });
|
||||
|
||||
const token0 = pool.token0;
|
||||
const token1 = pool.token1;
|
||||
@ -402,8 +426,8 @@ export class Indexer {
|
||||
const lowerTickId = poolAddress + '#' + mintEvent.tickLower.toString();
|
||||
const upperTickId = poolAddress + '#' + mintEvent.tickUpper.toString();
|
||||
|
||||
let lowerTick = await this._db.getTick({ id: lowerTickId, blockNumber });
|
||||
let upperTick = await this._db.getTick({ id: upperTickId, blockNumber });
|
||||
let lowerTick = await this._db.getTick({ id: lowerTickId, blockHash: block.hash });
|
||||
let upperTick = await this._db.getTick({ id: upperTickId, blockHash: block.hash });
|
||||
|
||||
if (!lowerTick) {
|
||||
lowerTick = await createTick(this._db, lowerTickId, BigInt(lowerTickIdx), pool, block);
|
||||
@ -448,16 +472,15 @@ export class Indexer {
|
||||
}
|
||||
|
||||
async _handleBurn (block: Block, contractAddress: string, tx: Transaction, burnEvent: BurnEvent): Promise<void> {
|
||||
const { number: blockNumber } = block;
|
||||
const bundle = await this._db.getBundle({ id: '1', blockNumber });
|
||||
const bundle = await this._db.getBundle({ id: '1', blockHash: block.hash });
|
||||
assert(bundle);
|
||||
const poolAddress = contractAddress;
|
||||
const pool = await this._db.getPool({ id: poolAddress, blockNumber });
|
||||
const pool = await this._db.getPool({ id: poolAddress, blockHash: block.hash });
|
||||
assert(pool);
|
||||
|
||||
// TODO: In subgraph factory is fetched by hardcoded factory address.
|
||||
// Currently fetching first factory in database as only one exists.
|
||||
const [factory] = await this._db.getFactories({ blockNumber }, { limit: 1 });
|
||||
const [factory] = await this._db.getFactories({ blockHash: block.hash }, { limit: 1 });
|
||||
|
||||
const token0 = pool.token0;
|
||||
const token1 = pool.token1;
|
||||
@ -535,8 +558,8 @@ export class Indexer {
|
||||
// Tick entities.
|
||||
const lowerTickId = poolAddress + '#' + (burnEvent.tickLower).toString();
|
||||
const upperTickId = poolAddress + '#' + (burnEvent.tickUpper).toString();
|
||||
const lowerTick = await this._db.getTick({ id: lowerTickId, blockNumber });
|
||||
const upperTick = await this._db.getTick({ id: upperTickId, blockNumber });
|
||||
const lowerTick = await this._db.getTick({ id: lowerTickId, blockHash: block.hash });
|
||||
const upperTick = await this._db.getTick({ id: upperTickId, blockHash: block.hash });
|
||||
assert(lowerTick && upperTick);
|
||||
const amount = BigInt(burnEvent.amount);
|
||||
lowerTick.liquidityGross = BigInt(lowerTick.liquidityGross) - amount;
|
||||
@ -570,15 +593,14 @@ export class Indexer {
|
||||
}
|
||||
|
||||
async _handleSwap (block: Block, contractAddress: string, tx: Transaction, swapEvent: SwapEvent): Promise<void> {
|
||||
const { number: blockNumber } = block;
|
||||
const bundle = await this._db.getBundle({ id: '1', blockNumber });
|
||||
const bundle = await this._db.getBundle({ id: '1', blockHash: block.hash });
|
||||
assert(bundle);
|
||||
|
||||
// TODO: In subgraph factory is fetched by hardcoded factory address.
|
||||
// Currently fetching first factory in database as only one exists.
|
||||
const [factory] = await this._db.getFactories({ blockNumber }, { limit: 1 });
|
||||
const [factory] = await this._db.getFactories({ blockHash: block.hash }, { limit: 1 });
|
||||
|
||||
const pool = await this._db.getPool({ id: contractAddress, blockNumber });
|
||||
const pool = await this._db.getPool({ id: contractAddress, blockHash: block.hash });
|
||||
assert(pool);
|
||||
|
||||
// Hot fix for bad pricing.
|
||||
@ -587,8 +609,8 @@ export class Indexer {
|
||||
}
|
||||
|
||||
const [token0, token1] = await Promise.all([
|
||||
this._db.getToken({ id: pool.token0.id, blockNumber }),
|
||||
this._db.getToken({ id: pool.token1.id, blockNumber })
|
||||
this._db.getToken({ id: pool.token0.id, blockHash: block.hash }),
|
||||
this._db.getToken({ id: pool.token1.id, blockHash: block.hash })
|
||||
]);
|
||||
|
||||
assert(token0 && token1, 'Pool tokens not found.');
|
||||
@ -673,7 +695,7 @@ export class Indexer {
|
||||
this._db.savePool(pool, block);
|
||||
|
||||
// Update USD pricing.
|
||||
bundle.ethPriceUSD = await getEthPriceInUSD(this._db);
|
||||
bundle.ethPriceUSD = await getEthPriceInUSD(this._db, block);
|
||||
this._db.saveBundle(bundle, block);
|
||||
token0.derivedETH = await findEthPerToken(token0);
|
||||
token1.derivedETH = await findEthPerToken(token1);
|
||||
@ -874,8 +896,8 @@ export class Indexer {
|
||||
}
|
||||
|
||||
async _getPosition (block: Block, contractAddress: string, tx: Transaction, tokenId: bigint): Promise<Position | null> {
|
||||
const { number: blockNumber, hash: blockHash } = block;
|
||||
let position = await this._db.getPosition({ id: tokenId.toString(), blockNumber });
|
||||
const { hash: blockHash } = block;
|
||||
let position = await this._db.getPosition({ id: tokenId.toString(), blockHash });
|
||||
|
||||
if (!position) {
|
||||
const nfpmPosition = await this._uniClient.getPosition(blockHash, tokenId);
|
||||
@ -892,21 +914,21 @@ export class Indexer {
|
||||
position = new Position();
|
||||
position.id = tokenId.toString();
|
||||
|
||||
const pool = await this._db.getPool({ id: poolAddress, blockNumber });
|
||||
const pool = await this._db.getPool({ id: poolAddress, blockHash });
|
||||
assert(pool);
|
||||
position.pool = pool;
|
||||
|
||||
const [token0, token1] = await Promise.all([
|
||||
this._db.getToken({ id: token0Address, blockNumber }),
|
||||
this._db.getToken({ id: token0Address, blockNumber })
|
||||
this._db.getToken({ id: token0Address, blockHash }),
|
||||
this._db.getToken({ id: token0Address, blockHash })
|
||||
]);
|
||||
assert(token0 && token1);
|
||||
position.token0 = token0;
|
||||
position.token1 = token1;
|
||||
|
||||
const [tickLower, tickUpper] = await Promise.all([
|
||||
this._db.getTick({ id: poolAddress.concat('#').concat(nfpmPosition.tickLower.toString()), blockNumber }),
|
||||
this._db.getTick({ id: poolAddress.concat('#').concat(nfpmPosition.tickUpper.toString()), blockNumber })
|
||||
this._db.getTick({ id: poolAddress.concat('#').concat(nfpmPosition.tickLower.toString()), blockHash }),
|
||||
this._db.getTick({ id: poolAddress.concat('#').concat(nfpmPosition.tickUpper.toString()), blockHash })
|
||||
]);
|
||||
assert(tickLower && tickUpper);
|
||||
position.tickLower = tickLower;
|
||||
|
@ -7,6 +7,8 @@ import debug from 'debug';
|
||||
import { Client as ERC20Client } from '@vulcanize/erc20-watcher';
|
||||
import { Client as UniClient } from '@vulcanize/uni-watcher';
|
||||
import { getConfig, JobQueue } from '@vulcanize/util';
|
||||
import { getCache } from '@vulcanize/cache';
|
||||
import { EthClient } from '@vulcanize/ipld-eth-client';
|
||||
|
||||
import { Indexer } from './indexer';
|
||||
import { Database } from './database';
|
||||
@ -36,10 +38,17 @@ export const main = async (): Promise<any> => {
|
||||
await db.init();
|
||||
|
||||
assert(upstream, 'Missing upstream config');
|
||||
const { uniWatcher: { gqlEndpoint, gqlSubscriptionEndpoint }, tokenWatcher } = upstream;
|
||||
const { uniWatcher: { gqlEndpoint, gqlSubscriptionEndpoint }, tokenWatcher, cache: cacheConfig, ethServer: { gqlApiEndpoint, gqlPostgraphileEndpoint } } = upstream;
|
||||
assert(gqlEndpoint, 'Missing upstream uniWatcher.gqlEndpoint');
|
||||
assert(gqlSubscriptionEndpoint, 'Missing upstream uniWatcher.gqlSubscriptionEndpoint');
|
||||
|
||||
const cache = await getCache(cacheConfig);
|
||||
const ethClient = new EthClient({
|
||||
gqlEndpoint: gqlApiEndpoint,
|
||||
gqlSubscriptionEndpoint: gqlPostgraphileEndpoint,
|
||||
cache
|
||||
});
|
||||
|
||||
const uniClient = new UniClient({
|
||||
gqlEndpoint,
|
||||
gqlSubscriptionEndpoint
|
||||
@ -47,7 +56,7 @@ export const main = async (): Promise<any> => {
|
||||
|
||||
const erc20Client = new ERC20Client(tokenWatcher);
|
||||
|
||||
const indexer = new Indexer(db, uniClient, erc20Client);
|
||||
const indexer = new Indexer(db, uniClient, erc20Client, ethClient);
|
||||
|
||||
assert(jobQueueConfig, 'Missing job queue config');
|
||||
|
||||
@ -58,8 +67,49 @@ export const main = async (): Promise<any> => {
|
||||
await jobQueue.start();
|
||||
|
||||
await jobQueue.subscribe(QUEUE_BLOCK_PROCESSING, async (job) => {
|
||||
const { data: { block } } = job;
|
||||
const { data: { block, priority } } = job;
|
||||
log(`Processing block hash ${block.hash} number ${block.number}`);
|
||||
|
||||
// Check if parent block has been processed yet, if not, push a high priority job to process that first and abort.
|
||||
// However, don't go beyond the `latestCanonicalBlockHash` from SyncStatus as we have to assume the reorg can't be that deep.
|
||||
let syncStatus = await indexer.getSyncStatus();
|
||||
if (!syncStatus) {
|
||||
syncStatus = await indexer.updateSyncStatus(block.hash, block.number);
|
||||
}
|
||||
|
||||
if (block.hash !== syncStatus.latestCanonicalBlockHash) {
|
||||
const parent = await indexer.getBlockProgress(block.parentHash);
|
||||
if (!parent) {
|
||||
const { number: parentBlockNumber, parent: { hash: grandparentHash }, timestamp: parentTimestamp } = await indexer.getBlock(block.parentHash);
|
||||
|
||||
// Create a higher priority job to index parent block and then abort.
|
||||
// We don't have to worry about aborting as this job will get retried later.
|
||||
const newPriority = (priority || 0) + 1;
|
||||
await jobQueue.pushJob(QUEUE_BLOCK_PROCESSING, {
|
||||
block: {
|
||||
hash: block.parentHash,
|
||||
number: parentBlockNumber,
|
||||
parentHash: grandparentHash,
|
||||
timestamp: parentTimestamp
|
||||
},
|
||||
priority: newPriority
|
||||
}, { priority: newPriority });
|
||||
|
||||
const message = `Parent block number ${parentBlockNumber} hash ${block.parentHash} of block number ${block.number} hash ${block.hash} not fetched yet, aborting`;
|
||||
log(message);
|
||||
|
||||
throw new Error(message);
|
||||
}
|
||||
|
||||
if (block.parentHash !== syncStatus.latestCanonicalBlockHash && !parent.isComplete) {
|
||||
// Parent block indexing needs to finish before this block can be indexed.
|
||||
const message = `Indexing incomplete for parent block number ${parent.blockNumber} hash ${block.parentHash} of block number ${block.number} hash ${block.hash}, aborting`;
|
||||
log(message);
|
||||
|
||||
throw new Error(message);
|
||||
}
|
||||
}
|
||||
|
||||
const events = await indexer.getOrFetchBlockEvents(block);
|
||||
|
||||
for (let ei = 0; ei < events.length; ei++) {
|
||||
@ -79,7 +129,6 @@ export const main = async (): Promise<any> => {
|
||||
|
||||
if (!dbEvent.block.isComplete) {
|
||||
await indexer.processEvent(dbEvent);
|
||||
await indexer.updateBlockProgress(dbEvent.block.blockHash);
|
||||
}
|
||||
|
||||
await jobQueue.markComplete(job);
|
||||
|
@ -10,7 +10,9 @@ import { createServer } from 'http';
|
||||
|
||||
import { Client as ERC20Client } from '@vulcanize/erc20-watcher';
|
||||
import { Client as UniClient } from '@vulcanize/uni-watcher';
|
||||
import { EthClient } from '@vulcanize/ipld-eth-client';
|
||||
import { getConfig, JobQueue } from '@vulcanize/util';
|
||||
import { getCache } from '@vulcanize/cache';
|
||||
|
||||
import typeDefs from './schema';
|
||||
|
||||
@ -52,15 +54,23 @@ export const main = async (): Promise<any> => {
|
||||
gqlPostgraphileEndpoint
|
||||
},
|
||||
uniWatcher,
|
||||
tokenWatcher
|
||||
tokenWatcher,
|
||||
cache: cacheConfig
|
||||
} = upstream;
|
||||
|
||||
assert(gqlApiEndpoint, 'Missing upstream ethServer.gqlApiEndpoint');
|
||||
assert(gqlPostgraphileEndpoint, 'Missing upstream ethServer.gqlPostgraphileEndpoint');
|
||||
|
||||
const cache = await getCache(cacheConfig);
|
||||
const ethClient = new EthClient({
|
||||
gqlEndpoint: gqlApiEndpoint,
|
||||
gqlSubscriptionEndpoint: gqlPostgraphileEndpoint,
|
||||
cache
|
||||
});
|
||||
|
||||
const uniClient = new UniClient(uniWatcher);
|
||||
const erc20Client = new ERC20Client(tokenWatcher);
|
||||
const indexer = new Indexer(db, uniClient, erc20Client);
|
||||
const indexer = new Indexer(db, uniClient, erc20Client, ethClient);
|
||||
|
||||
assert(jobQueueConfig, 'Missing job queue config');
|
||||
|
||||
@ -70,7 +80,7 @@ export const main = async (): Promise<any> => {
|
||||
const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag });
|
||||
await jobQueue.start();
|
||||
|
||||
const eventWatcher = new EventWatcher(indexer, uniClient, jobQueue);
|
||||
const eventWatcher = new EventWatcher(indexer, ethClient, jobQueue);
|
||||
await eventWatcher.start();
|
||||
|
||||
const resolvers = process.env.MOCK ? await createMockResolvers() : await createResolvers(indexer);
|
||||
|
@ -25,7 +25,7 @@ export const convertTokenToDecimal = (tokenAmount: bigint, exchangeDecimals: big
|
||||
|
||||
export const loadTransaction = async (db: Database, event: { block: Block, tx: Transaction }): Promise<TransactionEntity> => {
|
||||
const { tx, block } = event;
|
||||
let transaction = await db.getTransaction({ id: tx.hash, blockNumber: block.number });
|
||||
let transaction = await db.getTransaction({ id: tx.hash, blockHash: block.hash });
|
||||
|
||||
if (!transaction) {
|
||||
transaction = new TransactionEntity();
|
||||
|
@ -20,12 +20,12 @@ export const updateUniswapDayData = async (db: Database, event: { contractAddres
|
||||
|
||||
// TODO: In subgraph factory is fetched by hardcoded factory address.
|
||||
// Currently fetching first factory in database as only one exists.
|
||||
const [factory] = await db.getFactories({ blockNumber: block.number }, { limit: 1 });
|
||||
const [factory] = await db.getFactories({ blockHash: block.hash }, { limit: 1 });
|
||||
|
||||
const dayID = Math.floor(block.timestamp / 86400); // Rounded.
|
||||
const dayStartTimestamp = dayID * 86400;
|
||||
|
||||
let uniswapDayData = await db.getUniswapDayData({ id: dayID.toString(), blockNumber: block.number });
|
||||
let uniswapDayData = await db.getUniswapDayData({ id: dayID.toString(), blockHash: block.hash });
|
||||
|
||||
if (!uniswapDayData) {
|
||||
uniswapDayData = new UniswapDayData();
|
||||
@ -49,10 +49,10 @@ export const updatePoolDayData = async (db: Database, event: { contractAddress:
|
||||
.concat('-')
|
||||
.concat(dayID.toString());
|
||||
|
||||
const pool = await db.getPool({ id: contractAddress, blockNumber: block.number });
|
||||
const pool = await db.getPool({ id: contractAddress, blockHash: block.hash });
|
||||
assert(pool);
|
||||
|
||||
let poolDayData = await db.getPoolDayData({ id: dayPoolID, blockNumber: block.number });
|
||||
let poolDayData = await db.getPoolDayData({ id: dayPoolID, blockHash: block.hash });
|
||||
|
||||
if (!poolDayData) {
|
||||
poolDayData = new PoolDayData();
|
||||
@ -97,10 +97,10 @@ export const updatePoolHourData = async (db: Database, event: { contractAddress:
|
||||
.concat('-')
|
||||
.concat(hourIndex.toString());
|
||||
|
||||
const pool = await db.getPool({ id: contractAddress, blockNumber: block.number });
|
||||
const pool = await db.getPool({ id: contractAddress, blockHash: block.hash });
|
||||
assert(pool);
|
||||
|
||||
let poolHourData = await db.getPoolHourData({ id: hourPoolID, blockNumber: block.number });
|
||||
let poolHourData = await db.getPoolHourData({ id: hourPoolID, blockHash: block.hash });
|
||||
|
||||
if (!poolHourData) {
|
||||
poolHourData = new PoolHourData();
|
||||
@ -138,7 +138,7 @@ export const updatePoolHourData = async (db: Database, event: { contractAddress:
|
||||
|
||||
export const updateTokenDayData = async (db: Database, token: Token, event: { block: Block }): Promise<TokenDayData> => {
|
||||
const { block } = event;
|
||||
const bundle = await db.getBundle({ id: '1', blockNumber: block.number });
|
||||
const bundle = await db.getBundle({ id: '1', blockHash: block.hash });
|
||||
assert(bundle);
|
||||
const dayID = Math.floor(block.timestamp / 86400);
|
||||
const dayStartTimestamp = dayID * 86400;
|
||||
@ -149,7 +149,7 @@ export const updateTokenDayData = async (db: Database, token: Token, event: { bl
|
||||
|
||||
const tokenPrice = token.derivedETH.times(bundle.ethPriceUSD);
|
||||
|
||||
let tokenDayData = await db.getTokenDayData({ id: tokenDayID, blockNumber: block.number });
|
||||
let tokenDayData = await db.getTokenDayData({ id: tokenDayID, blockHash: block.hash });
|
||||
|
||||
if (!tokenDayData) {
|
||||
tokenDayData = new TokenDayData();
|
||||
@ -182,7 +182,7 @@ export const updateTokenDayData = async (db: Database, token: Token, event: { bl
|
||||
|
||||
export const updateTokenHourData = async (db: Database, token: Token, event: { block: Block }): Promise<TokenHourData> => {
|
||||
const { block } = event;
|
||||
const bundle = await db.getBundle({ id: '1', blockNumber: block.number });
|
||||
const bundle = await db.getBundle({ id: '1', blockHash: block.hash });
|
||||
assert(bundle);
|
||||
const hourIndex = Math.floor(block.timestamp / 3600); // Get unique hour within unix history.
|
||||
const hourStartUnix = hourIndex * 3600; // Want the rounded effect.
|
||||
@ -193,7 +193,7 @@ export const updateTokenHourData = async (db: Database, token: Token, event: { b
|
||||
|
||||
const tokenPrice = token.derivedETH.times(bundle.ethPriceUSD);
|
||||
|
||||
let tokenHourData = await db.getTokenHourData({ id: tokenHourID, blockNumber: block.number });
|
||||
let tokenHourData = await db.getTokenHourData({ id: tokenHourID, blockHash: block.hash });
|
||||
|
||||
if (!tokenHourData) {
|
||||
tokenHourData = new TokenHourData();
|
||||
|
@ -5,6 +5,7 @@ import { BigNumber } from 'ethers';
|
||||
import { exponentToBigDecimal, safeDiv } from '.';
|
||||
import { Database } from '../database';
|
||||
import { Token } from '../entity/Token';
|
||||
import { Block } from '../events';
|
||||
|
||||
// TODO: Move constants to config.
|
||||
const WETH_ADDRESS = '0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2';
|
||||
@ -54,9 +55,9 @@ export const sqrtPriceX96ToTokenPrices = (sqrtPriceX96: bigint, token0: Token, t
|
||||
return [price0, price1];
|
||||
};
|
||||
|
||||
export const getEthPriceInUSD = async (db: Database): Promise<Decimal> => {
|
||||
export const getEthPriceInUSD = async (db: Database, block: Block): Promise<Decimal> => {
|
||||
// Fetch eth prices for each stablecoin.
|
||||
const usdcPool = await db.getPool({ id: USDC_WETH_03_POOL }); // DAI is token0.
|
||||
const usdcPool = await db.getPool({ id: USDC_WETH_03_POOL, blockHash: block.hash }); // DAI is token0.
|
||||
|
||||
if (usdcPool) {
|
||||
return usdcPool.token0Price;
|
||||
|
@ -1,3 +1,4 @@
|
||||
export * from './src/config';
|
||||
export * from './src/database';
|
||||
export * from './src/job-queue';
|
||||
export * from './src/constants';
|
||||
|
1
packages/util/src/constants.ts
Normal file
1
packages/util/src/constants.ts
Normal file
@ -0,0 +1 @@
|
||||
export const MAX_REORG_DEPTH = 16;
|
Loading…
Reference in New Issue
Block a user