diff --git a/packages/address-watcher/environments/local.toml b/packages/address-watcher/environments/local.toml index 8f64ab18..c6ca617b 100644 --- a/packages/address-watcher/environments/local.toml +++ b/packages/address-watcher/environments/local.toml @@ -35,4 +35,4 @@ [jobQueue] dbConnectionString = "postgres://postgres:postgres@localhost/address-watcher-job-queue" - maxCompletionLag = 300 + maxCompletionLagInSecs = 300 diff --git a/packages/address-watcher/src/fill.ts b/packages/address-watcher/src/fill.ts index bd342501..2dac033b 100644 --- a/packages/address-watcher/src/fill.ts +++ b/packages/address-watcher/src/fill.ts @@ -67,10 +67,10 @@ export const main = async (): Promise => { assert(jobQueueConfig, 'Missing job queue config'); - const { dbConnectionString, maxCompletionLag } = jobQueueConfig; + const { dbConnectionString, maxCompletionLagInSecs } = jobQueueConfig; assert(dbConnectionString, 'Missing job queue db connection string'); - const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag }); + const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs }); await jobQueue.start(); for (let blockNumber = argv.startBlock; blockNumber <= argv.endBlock; blockNumber++) { diff --git a/packages/address-watcher/src/job-runner.ts b/packages/address-watcher/src/job-runner.ts index 096348ae..59228ab7 100644 --- a/packages/address-watcher/src/job-runner.ts +++ b/packages/address-watcher/src/job-runner.ts @@ -59,10 +59,10 @@ export const main = async (): Promise => { assert(jobQueueConfig, 'Missing job queue config'); - const { dbConnectionString, maxCompletionLag } = jobQueueConfig; + const { dbConnectionString, maxCompletionLagInSecs } = jobQueueConfig; assert(dbConnectionString, 'Missing job queue db connection string'); - const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag }); + const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs }); await jobQueue.start(); await jobQueue.subscribe(QUEUE_TX_TRACING, async (job) => { diff --git a/packages/address-watcher/src/server.ts b/packages/address-watcher/src/server.ts index aad84771..a7e5cb2d 100644 --- a/packages/address-watcher/src/server.ts +++ b/packages/address-watcher/src/server.ts @@ -67,11 +67,11 @@ export const main = async (): Promise => { assert(jobQueueConfig, 'Missing job queue config'); - const { dbConnectionString, maxCompletionLag } = jobQueueConfig; + const { dbConnectionString, maxCompletionLagInSecs } = jobQueueConfig; assert(dbConnectionString, 'Missing job queue db connection string'); assert(dbConnectionString, 'Missing job queue max completion lag time (seconds)'); - const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag }); + const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs }); await jobQueue.start(); // Note: In-memory pubsub works fine for now, as each watcher is a single process anyway. diff --git a/packages/ipld-eth-client/src/graphql-client.ts b/packages/ipld-eth-client/src/graphql-client.ts index 79e694c0..8a72e61f 100644 --- a/packages/ipld-eth-client/src/graphql-client.ts +++ b/packages/ipld-eth-client/src/graphql-client.ts @@ -8,7 +8,17 @@ import fetch from 'cross-fetch'; import { SubscriptionClient } from 'subscriptions-transport-ws'; import ws from 'ws'; -import { ApolloClient, NormalizedCacheObject, split, HttpLink, InMemoryCache, DocumentNode, TypedDocumentNode, from } from '@apollo/client/core'; +import { + ApolloClient, + NormalizedCacheObject, + split, + HttpLink, + InMemoryCache, + DocumentNode, + TypedDocumentNode, + from, + DefaultOptions +} from '@apollo/client/core'; import { getMainDefinition } from '@apollo/client/utilities'; import { WebSocketLink } from '@apollo/client/link/ws'; @@ -71,9 +81,19 @@ export class GraphQLClient { link = splitLink; } + const defaultOptions: DefaultOptions = { + watchQuery: { + fetchPolicy: 'no-cache' + }, + query: { + fetchPolicy: 'no-cache' + } + }; + this._client = new ApolloClient({ link, - cache: new InMemoryCache() + cache: new InMemoryCache(), + defaultOptions }); } diff --git a/packages/uni-info-watcher/environments/local.toml b/packages/uni-info-watcher/environments/local.toml index ca075378..885a22bb 100644 --- a/packages/uni-info-watcher/environments/local.toml +++ b/packages/uni-info-watcher/environments/local.toml @@ -41,5 +41,5 @@ [jobQueue] dbConnectionString = "postgres://postgres:postgres@localhost/uni-info-watcher-job-queue" - maxCompletionLag = 300 - jobDelay = 1000 + maxCompletionLagInSecs = 300 + jobDelayInMilliSecs = 1000 diff --git a/packages/uni-info-watcher/src/client.ts b/packages/uni-info-watcher/src/client.ts new file mode 100644 index 00000000..8abe35c1 --- /dev/null +++ b/packages/uni-info-watcher/src/client.ts @@ -0,0 +1,241 @@ +// +// Copyright 2021 Vulcanize, Inc. +// + +import { gql } from '@apollo/client/core'; +import { GraphQLClient, GraphQLConfig } from '@vulcanize/ipld-eth-client'; + +import { BlockHeight, OrderDirection } from './indexer'; +import { + queryBundles, + queryBurns, + queryFactories, + queryMints, + queryPoolById, + queryPoolDayDatas, + queryPools, + queryPositions, + querySwaps, + queryTicks, + queryToken, + queryTokenDayDatas, + queryTokenHourDatas, + queryTransactions, + queryUniswapDayDatas +} from './queries'; + +export class Client { + _config: GraphQLConfig; + _client: GraphQLClient; + + constructor (config: GraphQLConfig) { + this._config = config; + + this._client = new GraphQLClient(config); + } + + async getToken (tokenId: string, block?: BlockHeight): Promise { + const { token } = await this._client.query( + gql(queryToken), + { + block, + id: tokenId + } + ); + + return token; + } + + async getFactories (first?: number, block?: BlockHeight): Promise { + const { factories } = await this._client.query( + gql(queryFactories), + { + block, + first + } + ); + + return factories; + } + + async getBundles (first?: number, block?: BlockHeight): Promise { + const { bundles } = await this._client.query( + gql(queryBundles), + { + block, + first + } + ); + + return bundles; + } + + async getPoolById (id: string): Promise { + const { pool } = await this._client.query( + gql(queryPoolById), + { + id + } + ); + + return pool; + } + + async getTicks (where?: any, skip?: number, first?: number, block?: BlockHeight): Promise { + const { ticks } = await this._client.query( + gql(queryTicks), + { + where, + skip, + first, + block + } + ); + + return ticks; + } + + async getPools (where?: any, first?: number, orderBy?: string, orderDirection?: OrderDirection): Promise { + const { pools } = await this._client.query( + gql(queryPools), + { + where, + first, + orderBy, + orderDirection + } + ); + + return pools; + } + + async getUniswapDayDatas (where?: any, skip?: number, first?: number, orderBy?: string, orderDirection?: OrderDirection): Promise { + const { uniswapDayDatas } = await this._client.query( + gql(queryUniswapDayDatas), + { + where, + skip, + first, + orderBy, + orderDirection + } + ); + + return uniswapDayDatas; + } + + async getPoolDayDatas (where?: any, skip?: number, first?: number, orderBy?: string, orderDirection?: OrderDirection): Promise { + const { poolDayDatas } = await this._client.query( + gql(queryPoolDayDatas), + { + where, + skip, + first, + orderBy, + orderDirection + } + ); + + return poolDayDatas; + } + + async getTokenDayDatas (where?: any, skip?: number, first?: number, orderBy?: string, orderDirection?: OrderDirection): Promise { + const { tokenDayDatas } = await this._client.query( + gql(queryTokenDayDatas), + { + where, + skip, + first, + orderBy, + orderDirection + } + ); + + return tokenDayDatas; + } + + async getTokenHourDatas (where?: any, skip?: number, first?: number, orderBy?: string, orderDirection?: OrderDirection): Promise { + const { tokenHourDatas } = await this._client.query( + gql(queryTokenHourDatas), + { + where, + skip, + first, + orderBy, + orderDirection + } + ); + + return tokenHourDatas; + } + + async getMints (where?: any, first?: number, orderBy?: string, orderDirection?: OrderDirection): Promise { + const { mints } = await this._client.query( + gql(queryMints), + { + where, + first, + orderBy, + orderDirection + } + ); + + return mints; + } + + async getBurns (where?: any, first?: number, orderBy?: string, orderDirection?: OrderDirection): Promise { + const { burns } = await this._client.query( + gql(queryBurns), + { + where, + first, + orderBy, + orderDirection + } + ); + + return burns; + } + + async getSwaps (where?: any, first?: number, orderBy?: string, orderDirection?: OrderDirection): Promise { + const { swaps } = await this._client.query( + gql(querySwaps), + { + where, + first, + orderBy, + orderDirection + } + ); + + return swaps; + } + + async getTransactions (first?: number, { orderBy, mintOrderBy, burnOrderBy, swapOrderBy }: {[key: string]: string} = {}, orderDirection?: OrderDirection): Promise { + const { transactions } = await this._client.query( + gql(queryTransactions), + { + first, + orderBy, + orderDirection, + mintOrderBy, + burnOrderBy, + swapOrderBy + } + ); + + return transactions; + } + + async getPositions (where?: any, first?: number): Promise { + const { positions } = await this._client.query( + gql(queryPositions), + { + where, + first + } + ); + + return positions; + } +} diff --git a/packages/uni-info-watcher/src/database.ts b/packages/uni-info-watcher/src/database.ts index 61ded81a..82a8b612 100644 --- a/packages/uni-info-watcher/src/database.ts +++ b/packages/uni-info-watcher/src/database.ts @@ -88,10 +88,6 @@ export class Database implements DatabaseInterface { return this._baseDatabase.close(); } - async createTransactionRunner (): Promise { - return this._baseDatabase.createTransactionRunner(); - } - async getFactory (queryRunner: QueryRunner, { id, blockHash }: DeepPartial): Promise { const repo = queryRunner.manager.getRepository(Factory); const whereOptions: FindConditions = { id }; @@ -651,12 +647,6 @@ export class Database implements DatabaseInterface { return numRows > 0; } - async getBlockEvents (blockHash: string): Promise { - const repo = this._conn.getRepository(Event); - - return this._baseDatabase.getBlockEvents(repo, blockHash); - } - async getEvents ({ blockHash, token }: { blockHash: string, token: string }): Promise { return this._conn.getRepository(Event) .createQueryBuilder('event') @@ -682,6 +672,33 @@ export class Database implements DatabaseInterface { .getMany(); } + async createTransactionRunner (): Promise { + return this._baseDatabase.createTransactionRunner(); + } + + async getProcessedBlockCountForRange (fromBlockNumber: number, toBlockNumber: number): Promise<{ expected: number, actual: number }> { + const repo = this._conn.getRepository(BlockProgress); + + return this._baseDatabase.getProcessedBlockCountForRange(repo, fromBlockNumber, toBlockNumber); + } + + async getEventsInRange (fromBlockNumber: number, toBlockNumber: number): Promise> { + const repo = this._conn.getRepository(Event); + + return this._baseDatabase.getEventsInRange(repo, fromBlockNumber, toBlockNumber); + } + + async saveEventEntity (queryRunner: QueryRunner, entity: Event): Promise { + const repo = queryRunner.manager.getRepository(Event); + return this._baseDatabase.saveEventEntity(repo, entity); + } + + async getBlockEvents (blockHash: string): Promise { + const repo = this._conn.getRepository(Event); + + return this._baseDatabase.getBlockEvents(repo, blockHash); + } + async saveEvents (queryRunner: QueryRunner, block: DeepPartial, events: DeepPartial[]): Promise { const blockRepo = queryRunner.manager.getRepository(BlockProgress); const eventRepo = queryRunner.manager.getRepository(Event); diff --git a/packages/uni-info-watcher/src/fill.ts b/packages/uni-info-watcher/src/fill.ts index 9c3c3f00..d1325a22 100644 --- a/packages/uni-info-watcher/src/fill.ts +++ b/packages/uni-info-watcher/src/fill.ts @@ -77,10 +77,10 @@ export const main = async (): Promise => { const indexer = new Indexer(db, uniClient, erc20Client, ethClient); assert(jobQueueConfig, 'Missing job queue config'); - const { dbConnectionString, maxCompletionLag } = jobQueueConfig; + const { dbConnectionString, maxCompletionLagInSecs } = jobQueueConfig; assert(dbConnectionString, 'Missing job queue db connection string'); - const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag }); + const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs }); await jobQueue.start(); const eventWatcher = new EventWatcher(ethClient, indexer, pubsub, jobQueue); diff --git a/packages/uni-info-watcher/src/indexer.ts b/packages/uni-info-watcher/src/indexer.ts index 24507bda..abf178c3 100644 --- a/packages/uni-info-watcher/src/indexer.ts +++ b/packages/uni-info-watcher/src/indexer.ts @@ -90,14 +90,6 @@ export class Indexer implements IndexerInterface { }; } - async getOrFetchBlockEvents (block: DeepPartial): Promise> { - return this._baseIndexer.getOrFetchBlockEvents(block, this._fetchAndSaveEvents.bind(this)); - } - - async getBlockEvents (blockHash: string): Promise> { - return this._baseIndexer.getBlockEvents(blockHash); - } - async processEvent (dbEvent: Event): Promise { const resultEvent = this.getResultEvent(dbEvent); @@ -159,26 +151,6 @@ export class Indexer implements IndexerInterface { log('Event processing completed for', eventName); } - async updateSyncStatusIndexedBlock (blockHash: string, blockNumber: number): Promise { - return this._baseIndexer.updateSyncStatusIndexedBlock(blockHash, blockNumber); - } - - async updateSyncStatusChainHead (blockHash: string, blockNumber: number): Promise { - return this._baseIndexer.updateSyncStatusChainHead(blockHash, blockNumber); - } - - async updateSyncStatusCanonicalBlock (blockHash: string, blockNumber: number): Promise { - return this._baseIndexer.updateSyncStatusCanonicalBlock(blockHash, blockNumber); - } - - async getSyncStatus (): Promise { - return this._baseIndexer.getSyncStatus(); - } - - async getBlock (blockHash: string): Promise { - return this._baseIndexer.getBlock(blockHash); - } - async getBlocks (where: { [key: string]: any } = {}, queryOptions: QueryOptions): Promise { if (where.timestamp_gt) { where.blockTimestamp_gt = where.timestamp_gt; @@ -203,26 +175,10 @@ export class Indexer implements IndexerInterface { })); } - async getEvent (id: string): Promise { - return this._baseIndexer.getEvent(id); - } - - async getBlockProgress (blockHash: string): Promise { - return this._baseIndexer.getBlockProgress(blockHash); - } - - async getBlocksAtHeight (height: number, isPruned: boolean): Promise { - return this._baseIndexer.getBlocksAtHeight(height, isPruned); - } - async markBlocksAsPruned (blocks: BlockProgress[]): Promise { return this._baseIndexer.markBlocksAsPruned(blocks); } - async updateBlockProgress (blockHash: string, lastProcessedEventIndex: number): Promise { - return this._baseIndexer.updateBlockProgress(blockHash, lastProcessedEventIndex); - } - async getBundle (id: string, block: BlockHeight): Promise { const dbTx = await this._db.createTransactionRunner(); let res; @@ -324,6 +280,50 @@ export class Indexer implements IndexerInterface { return this._baseIndexer.getAncestorAtDepth(blockHash, depth); } + async getOrFetchBlockEvents (block: DeepPartial): Promise> { + return this._baseIndexer.getOrFetchBlockEvents(block, this._fetchAndSaveEvents.bind(this)); + } + + async getBlockEvents (blockHash: string): Promise> { + return this._baseIndexer.getBlockEvents(blockHash); + } + + async updateSyncStatusIndexedBlock (blockHash: string, blockNumber: number): Promise { + return this._baseIndexer.updateSyncStatusIndexedBlock(blockHash, blockNumber); + } + + async updateSyncStatusChainHead (blockHash: string, blockNumber: number): Promise { + return this._baseIndexer.updateSyncStatusChainHead(blockHash, blockNumber); + } + + async updateSyncStatusCanonicalBlock (blockHash: string, blockNumber: number): Promise { + return this._baseIndexer.updateSyncStatusCanonicalBlock(blockHash, blockNumber); + } + + async getSyncStatus (): Promise { + return this._baseIndexer.getSyncStatus(); + } + + async getBlock (blockHash: string): Promise { + return this._baseIndexer.getBlock(blockHash); + } + + async getEvent (id: string): Promise { + return this._baseIndexer.getEvent(id); + } + + async getBlockProgress (blockHash: string): Promise { + return this._baseIndexer.getBlockProgress(blockHash); + } + + async getBlocksAtHeight (height: number, isPruned: boolean): Promise { + return this._baseIndexer.getBlocksAtHeight(height, isPruned); + } + + async updateBlockProgress (blockHash: string, lastProcessedEventIndex: number): Promise { + return this._baseIndexer.updateBlockProgress(blockHash, lastProcessedEventIndex); + } + async _fetchAndSaveEvents (block: DeepPartial): Promise { assert(block.blockHash); const events = await this._uniClient.getEvents(block.blockHash); diff --git a/packages/uni-info-watcher/src/job-runner.ts b/packages/uni-info-watcher/src/job-runner.ts index 9b408ae6..f76fe6f4 100644 --- a/packages/uni-info-watcher/src/job-runner.ts +++ b/packages/uni-info-watcher/src/job-runner.ts @@ -10,9 +10,17 @@ import debug from 'debug'; import { Client as ERC20Client } from '@vulcanize/erc20-watcher'; import { Client as UniClient } from '@vulcanize/uni-watcher'; -import { getConfig, JobQueue, QUEUE_BLOCK_PROCESSING, QUEUE_EVENT_PROCESSING, QUEUE_CHAIN_PRUNING, JobRunner as BaseJobRunner, wait, JobQueueConfig } from '@vulcanize/util'; import { getCache } from '@vulcanize/cache'; import { EthClient } from '@vulcanize/ipld-eth-client'; +import { + getConfig, + JobQueue, + QUEUE_BLOCK_PROCESSING, + QUEUE_EVENT_PROCESSING, + QUEUE_CHAIN_PRUNING, + JobRunner as BaseJobRunner, + JobQueueConfig +} from '@vulcanize/util'; import { Indexer } from './indexer'; import { Database } from './database'; @@ -29,7 +37,7 @@ export class JobRunner { this._indexer = indexer; this._jobQueue = jobQueue; this._jobQueueConfig = jobQueueConfig; - this._baseJobRunner = new BaseJobRunner(this._indexer, this._jobQueue); + this._baseJobRunner = new BaseJobRunner(this._jobQueueConfig, this._indexer, this._jobQueue); } async start (): Promise { @@ -42,24 +50,6 @@ export class JobRunner { await this._jobQueue.subscribe(QUEUE_BLOCK_PROCESSING, async (job) => { await this._baseJobRunner.processBlock(job); - const { data: { blockHash, blockNumber, parentHash, timestamp } } = job; - - // Check if block is being already processed. - // TODO: Debug issue block getting processed twice without this check. Can reproduce with NFPM.mint(). - const blockProgress = await this._indexer.getBlockProgress(blockHash); - - if (!blockProgress) { - const { jobDelay } = this._jobQueueConfig; - assert(jobDelay); - // Delay to allow uni-watcher to process block. - await wait(jobDelay); - const events = await this._indexer.getOrFetchBlockEvents({ blockHash, blockNumber, parentHash, blockTimestamp: timestamp }); - - for (let ei = 0; ei < events.length; ei++) { - await this._jobQueue.pushJob(QUEUE_EVENT_PROCESSING, { id: events[ei].id, publish: true }); - } - } - await this._jobQueue.markComplete(job); }); } @@ -130,11 +120,10 @@ export const main = async (): Promise => { assert(jobQueueConfig, 'Missing job queue config'); - const { dbConnectionString, maxCompletionLag, jobDelay } = jobQueueConfig; - assert(jobDelay, 'Missing job delay time'); + const { dbConnectionString, maxCompletionLagInSecs } = jobQueueConfig; assert(dbConnectionString, 'Missing job queue db connection string'); - const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag }); + const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs }); await jobQueue.start(); const jobRunner = new JobRunner(jobQueueConfig, indexer, jobQueue); diff --git a/packages/uni-info-watcher/src/mock/server.spec.ts b/packages/uni-info-watcher/src/mock/server.spec.ts index aafd019c..82e0aefa 100644 --- a/packages/uni-info-watcher/src/mock/server.spec.ts +++ b/packages/uni-info-watcher/src/mock/server.spec.ts @@ -6,11 +6,11 @@ import 'mocha'; import { expect } from 'chai'; import { GraphQLClient } from 'graphql-request'; -import { queryBundle } from '../queries'; +import { queryBundles } from '../queries'; import { Data } from './data'; describe('server', () => { - const client = new GraphQLClient('http://localhost:3003/graphql'); + const client = new GraphQLClient('http://localhost:3004/graphql'); const data = Data.getInstance(); it('query bundle', async () => { @@ -21,9 +21,9 @@ describe('server', () => { const { id, blockNumber, ethPriceUSD } = bundles[i]; // Bundle query. - const result = await client.request(queryBundle, { id, blockNumber }); - expect(result.bundle.id).to.equal(id); - expect(result.bundle.ethPriceUSD).to.equal(ethPriceUSD); + const [bundle] = await client.request(queryBundles, { first: 1, block: { number: blockNumber } }); + expect(bundle.id).to.equal(id); + expect(bundle.ethPriceUSD).to.equal(ethPriceUSD); } }); }); diff --git a/packages/uni-info-watcher/src/queries.ts b/packages/uni-info-watcher/src/queries.ts index ce6c79e4..d15a569a 100644 --- a/packages/uni-info-watcher/src/queries.ts +++ b/packages/uni-info-watcher/src/queries.ts @@ -4,11 +4,280 @@ import { gql } from 'graphql-request'; -export const queryBundle = gql` -query getBundle($id: ID!, $blockNumber: Int!) { - bundle(id: $id, block: { number: $blockNumber }) { +const resultPool = ` +{ + id, + feeTier, + liquidity, + sqrtPrice, + tick, + token0 { + id + }, + token0Price, + token1 { + id + }, + token1Price, + totalValueLockedToken0, + totalValueLockedToken1, + totalValueLockedUSD, + txCount, + volumeUSD, +} +`; + +export const queryToken = gql` +query queryToken($id: ID!, $block: Block_height) { + token(id: $id, block: $block) { + derivedETH + feesUSD + id + name + symbol + totalValueLocked + totalValueLockedUSD + txCount + volume + volumeUSD + } +}`; + +export const queryFactories = gql` +query queryFactories($block: Block_height, $first: Int) { + factories(first: $first, block: $block) { + id + totalFeesUSD + totalValueLockedUSD + totalVolumeUSD + txCount + } +}`; + +export const queryBundles = gql` +query queryBundles($block: Block_height, $first: Int) { + bundles(first: $first, block: $block) { id ethPriceUSD } -} -`; +}`; + +// Getting Pool by id. +export const queryPoolById = gql` +query queryPoolById($id: ID!) { + pool(id: $id) + ${resultPool} +}`; + +export const queryTicks = gql` +query queryTicks($skip: Int, $first: Int, $where: Tick_filter, $block: Block_height) { + ticks(skip: $skip, first: $first, where: $where, block: $block) { + id + liquidityGross + liquidityNet + price0 + price1 + tickIdx + } +}`; + +// Getting Pool(s). +export const queryPools = gql` +query queryPools($where: Pool_filter, $first: Int, $orderBy: Pool_orderBy, $orderDirection: OrderDirection) { + pools(where: $where, first: $first, orderBy: $orderBy, orderDirection: $orderDirection) + ${resultPool} +}`; + +// Getting UniswapDayData(s). +export const queryUniswapDayDatas = gql` +query queryUniswapDayDatas($first: Int, $skip: Int, $orderBy: UniswapDayData_orderBy, $orderDirection: OrderDirection, $where: UniswapDayData_filter) { + uniswapDayDatas(first: $first, skip: $skip, orderBy: $orderBy, orderDirection: $orderDirection, where: $where) { + id, + date, + tvlUSD, + volumeUSD + } +}`; + +// Getting PoolDayData(s). +export const queryPoolDayDatas = gql` +query queryPoolDayDatas($first: Int, $skip: Int, $orderBy: PoolDayData_orderBy, $orderDirection: OrderDirection, $where: PoolDayData_filter) { + poolDayDatas(first: $first, skip: $skip, orderBy: $orderBy, orderDirection: $orderDirection, where: $where) { + id, + date, + tvlUSD, + volumeUSD + } +}`; + +// Getting TokenDayDatas(s). +export const queryTokenDayDatas = gql` +query queryTokenDayData($first: Int, $skip: Int, $orderBy: TokenDayData_orderBy, $orderDirection: OrderDirection, $where: TokenDayData_filter) { + tokenDayDatas(first: $first, skip: $skip, orderBy: $orderBy, orderDirection: $orderDirection, where: $where) { + id, + date, + totalValueLockedUSD, + volumeUSD + } +}`; + +// Getting TokenDayDatas(s). +export const queryTokenHourDatas = gql` +query queryTokenHourData($first: Int, $skip: Int, $orderBy: TokenHourData_orderBy, $orderDirection: OrderDirection, $where: TokenHourData_filter) { + tokenHourDatas(first: $first, skip: $skip, orderBy: $orderBy, orderDirection: $orderDirection, where: $where) { + id, + low, + high, + open, + close, + periodStartUnix + } +}`; + +// Getting mint(s). +export const queryMints = gql` +query queryMints( + $first: Int, + $orderBy: Mint_orderBy, + $orderDirection: OrderDirection, + $where: Mint_filter) { + mints( + first: $first, + orderBy: $orderBy, + orderDirection: $orderDirection, + where: $where) { + amount0, + amount1, + amountUSD, + id, + origin, + owner, + sender, + timestamp, + pool { + id + }, + transaction { + id + } + } +}`; + +// Getting burns(s). +export const queryBurns = gql` +query queryBurns( + $first: Int, + $orderBy: Burn_orderBy, + $orderDirection: OrderDirection, + $where: Burn_filter) { + burns( + first: $first, + orderBy: $orderBy, + orderDirection: $orderDirection, + where: $where) { + amount0, + amount1, + amountUSD, + id, + origin, + owner, + timestamp, + pool { + id + }, + transaction { + id + } + } +}`; + +// Getting swap(s) . +export const querySwaps = gql` +query querySwaps( + $first: Int, + $orderBy: Swap_orderBy, + $orderDirection: OrderDirection, + $where: Swap_filter) { + swaps( + first: $first, + orderBy: $orderBy, + orderDirection: $orderDirection, + where: $where) { + amount0, + amount1, + amountUSD, + id, + origin, + timestamp, + pool { + id + }, + transaction { + id + } + } +}`; + +// Getting transactions(s). +export const queryTransactions = gql` +query queryTransactions( + $first: Int, + $orderBy: Transaction_orderBy, + $mintOrderBy: Mint_orderBy, + $burnOrderBy: Burn_orderBy, + $swapOrderBy: Swap_orderBy, + $orderDirection: OrderDirection) { + transactions( + first: $first, + orderBy: $orderBy, + orderDirection: $orderDirection) { + id, + mints( first: $first, orderBy: $mintOrderBy, orderDirection: $orderDirection) { + id, + timestamp + }, + burns( first: $first, orderBy: $burnOrderBy, orderDirection: $orderDirection) { + id, + timestamp + }, + swaps( first: $first, orderBy: $swapOrderBy, orderDirection: $orderDirection) { + id, + timestamp + }, + timestamp + } +}`; + +// Getting positions. +export const queryPositions = gql` +query queryPositions($first: Int, $where: Position_filter) { + positions(first: $first, where: $where) { + id, + pool { + id + }, + token0 { + id + }, + token1 { + id + }, + tickLower { + id + }, + tickUpper { + id + }, + transaction { + id + }, + liquidity, + depositedToken0, + depositedToken1, + collectedFeesToken0, + collectedFeesToken1, + owner, + feeGrowthInside0LastX128, + feeGrowthInside1LastX128 + } +}`; diff --git a/packages/uni-info-watcher/src/server.ts b/packages/uni-info-watcher/src/server.ts index 30f95955..5bb403c3 100644 --- a/packages/uni-info-watcher/src/server.ts +++ b/packages/uni-info-watcher/src/server.ts @@ -78,10 +78,10 @@ export const main = async (): Promise => { assert(jobQueueConfig, 'Missing job queue config'); - const { dbConnectionString, maxCompletionLag } = jobQueueConfig; + const { dbConnectionString, maxCompletionLagInSecs } = jobQueueConfig; assert(dbConnectionString, 'Missing job queue db connection string'); - const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag }); + const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs }); await jobQueue.start(); const pubSub = new PubSub(); diff --git a/packages/uni-info-watcher/src/smoke.test.ts b/packages/uni-info-watcher/src/smoke.test.ts index 6fb31fd9..9cc5a154 100644 --- a/packages/uni-info-watcher/src/smoke.test.ts +++ b/packages/uni-info-watcher/src/smoke.test.ts @@ -4,7 +4,6 @@ import { expect } from 'chai'; import { ethers, Contract, Signer, constants } from 'ethers'; -import { request } from 'graphql-request'; import 'mocha'; import _ from 'lodash'; @@ -33,18 +32,7 @@ import { abi as POOL_ABI } from '@uniswap/v3-core/artifacts/contracts/UniswapV3Pool.sol/UniswapV3Pool.json'; -import { - queryFactory, - queryBundle, - queryToken, - queryPoolsByTokens, - queryPoolById, - queryMints, - queryTicks, - queryBurns, - querySwaps, - queryPositions -} from '../test/queries'; +import { Client } from './client'; import { checkUniswapDayData, checkPoolDayData, @@ -52,6 +40,7 @@ import { checkTokenHourData, fetchTransaction } from '../test/utils'; +import { OrderDirection } from './indexer'; const NETWORK_RPC_URL = 'http://localhost:8545'; @@ -70,8 +59,8 @@ describe('uni-info-watcher', () => { let signer: Signer; let recipient: string; let config: Config; - let endpoint: string; let uniClient: UniClient; + let client: Client; before(async () => { const provider = new ethers.providers.JsonRpcProvider(NETWORK_RPC_URL); @@ -82,33 +71,40 @@ describe('uni-info-watcher', () => { config = await getConfig(configFile); const { upstream, server: { host, port } } = config; - endpoint = `http://${host}:${port}/graphql`; + const endpoint = `http://${host}:${port}/graphql`; - const { uniWatcher: { gqlEndpoint, gqlSubscriptionEndpoint } } = upstream; + let { uniWatcher: { gqlEndpoint, gqlSubscriptionEndpoint } } = upstream; uniClient = new UniClient({ gqlEndpoint, gqlSubscriptionEndpoint }); + + gqlEndpoint = endpoint; + gqlSubscriptionEndpoint = endpoint; + client = new Client({ + gqlEndpoint, + gqlSubscriptionEndpoint + }); }); it('should have a Factory entity', async () => { // Getting the Factory from uni-info-watcher graphQL endpoint. - const data = await request(endpoint, queryFactory); - expect(data.factories).to.not.be.empty; + const factories = await client.getFactories(1); + expect(factories).to.not.be.empty; // Initializing the factory variable. - const factoryAddress = data.factories[0].id; + const factoryAddress = factories[0].id; factory = new ethers.Contract(factoryAddress, FACTORY_ABI, signer); expect(factory.address).to.not.be.empty; }); it('should have a Bundle entity', async () => { // Getting the Bundle from uni-info-watcher graphQL endpoint. - const data = await request(endpoint, queryBundle); - expect(data.bundles).to.not.be.empty; + const bundles = await client.getBundles(1); + expect(bundles).to.not.be.empty; const bundleId = '1'; - expect(data.bundles[0].id).to.equal(bundleId); + expect(bundles[0].id).to.equal(bundleId); }); describe('PoolCreatedEvent', () => { @@ -126,11 +122,11 @@ describe('uni-info-watcher', () => { it('should not have Token entities', async () => { // Check that Token entities are absent. - const data0 = await request(endpoint, queryToken, { id: token0Address }); - expect(data0.token).to.be.null; + const token0 = await client.getToken(token0Address); + expect(token0).to.be.null; - const data1 = await request(endpoint, queryToken, { id: token0Address }); - expect(data1.token).to.be.null; + const token1 = await client.getToken(token1Address); + expect(token1).to.be.null; }); it('should trigger PoolCreatedEvent', async () => { @@ -147,29 +143,30 @@ describe('uni-info-watcher', () => { it('should create Token entities', async () => { // Check that Token entities are present. - const data0 = await request(endpoint, queryToken, { id: token0Address }); - expect(data0.token).to.not.be.null; + const token0 = await client.getToken(token0Address); + expect(token0).to.not.be.null; - const data1 = await request(endpoint, queryToken, { id: token0Address }); - expect(data1.token).to.not.be.null; + const token1 = await client.getToken(token1Address); + expect(token1).to.not.be.null; }); it('should create a Pool entity', async () => { // Checked values: feeTier - const variables = { - tokens: [token0Address, token1Address] + const poolWhere = { + token0_in: [token0Address, token1Address], + token1_in: [token0Address, token1Address] }; // Getting the Pool that has the deployed tokens. - const data = await request(endpoint, queryPoolsByTokens, variables); - expect(data.pools).to.have.lengthOf(1); + const pools = await client.getPools(poolWhere); + expect(pools).to.have.lengthOf(1); // Initializing the pool variable. - const poolAddress = data.pools[0].id; + const poolAddress = pools[0].id; pool = new Contract(poolAddress, POOL_ABI, signer); expect(pool.address).to.not.be.empty; - expect(data.pools[0].feeTier).to.be.equal(fee.toString()); + expect(pools[0].feeTier).to.be.equal(fee.toString()); // Initializing the token variables. token0Address = await pool.token0(); @@ -187,9 +184,9 @@ describe('uni-info-watcher', () => { const tick = TICK_MIN; it('should not have pool entity initialized', async () => { - const data = await request(endpoint, queryPoolById, { id: pool.address }); - expect(data.pool.sqrtPrice).to.not.be.equal(sqrtPrice); - expect(data.pool.tick).to.be.null; + const poolData = await client.getPoolById(pool.address); + expect(poolData.sqrtPrice).to.not.be.equal(sqrtPrice); + expect(poolData.tick).to.be.null; }); it('should trigger InitializeEvent', async () => { @@ -207,13 +204,13 @@ describe('uni-info-watcher', () => { it('should update Pool entity', async () => { // Checked values: sqrtPrice, tick. - const data = await request(endpoint, queryPoolById, { id: pool.address }); - expect(data.pool.sqrtPrice).to.be.equal(sqrtPrice); - expect(data.pool.tick).to.be.equal(tick.toString()); + const poolData = await client.getPoolById(pool.address); + expect(poolData.sqrtPrice).to.be.equal(sqrtPrice); + expect(poolData.tick).to.be.equal(tick.toString()); }); it('should update PoolDayData entity', async () => { - checkPoolDayData(endpoint, pool.address); + checkPoolDayData(client, pool.address); }); }); @@ -244,19 +241,14 @@ describe('uni-info-watcher', () => { await approveToken(token1, poolCallee.address, approveAmount); // Get initial entity values. - let data: any; + const factories = await client.getFactories(1); + oldFactory = factories[0]; - data = await request(endpoint, queryFactory); - oldFactory = data.factories[0]; + oldToken0 = await client.getToken(token0.address); - data = await request(endpoint, queryToken, { id: token0.address }); - oldToken0 = data.token; + oldToken1 = await client.getToken(token1.address); - data = await request(endpoint, queryToken, { id: token1.address }); - oldToken1 = data.token; - - data = await request(endpoint, queryPoolById, { id: pool.address }); - oldPool = data.pool; + oldPool = await client.getPoolById(pool.address); }); it('should trigger MintEvent', async () => { @@ -275,13 +267,9 @@ describe('uni-info-watcher', () => { // Checked values: txCount. // Unchecked values: totalValueLocked, totalValueLockedUSD. - let data: any; + const newToken0 = await client.getToken(token0.address); - data = await request(endpoint, queryToken, { id: token0.address }); - const newToken0 = data.token; - - data = await request(endpoint, queryToken, { id: token1.address }); - const newToken1 = data.token; + const newToken1 = await client.getToken(token1.address); expect(newToken0.txCount).to.be.equal((BigInt(oldToken0.txCount) + BigInt(1)).toString()); expect(newToken1.txCount).to.be.equal((BigInt(oldToken1.txCount) + BigInt(1)).toString()); @@ -291,8 +279,8 @@ describe('uni-info-watcher', () => { // Checked values: txCount. // Unchecked values: totalValueLockedUSD. - const data = await request(endpoint, queryFactory); - const newFactory = data.factories[0]; + const factories = await client.getFactories(1); + const newFactory = factories[0]; expect(newFactory.txCount).to.be.equal((BigInt(oldFactory.txCount) + BigInt(1)).toString()); }); @@ -310,8 +298,7 @@ describe('uni-info-watcher', () => { } } - const data = await request(endpoint, queryPoolById, { id: pool.address }); - const newPool = data.pool; + const newPool = await client.getPoolById(pool.address); expect(newPool.txCount).to.be.equal((BigInt(oldPool.txCount) + BigInt(1)).toString()); expect(BigInt(newPool.liquidity)).to.be.equal(expectedLiquidity); @@ -320,7 +307,7 @@ describe('uni-info-watcher', () => { it('should create a Transaction entity', async () => { // Checked values: mints, burns, swaps. - const transaction: any = await fetchTransaction(endpoint); + const transaction: any = await fetchTransaction(client); expectedTxID = transaction.id; expectedTxTimestamp = transaction.timestamp; @@ -338,22 +325,15 @@ describe('uni-info-watcher', () => { // Unchecked values: amount0, amount1, amountUSD. // Get the latest Mint. - let data: any; - const variables = { - first: 1, - orderBy: 'timestamp', - orderDirection: 'desc', - pool: pool.address - }; - data = await request(endpoint, queryMints, variables); - expect(data.mints).to.not.be.empty; + const mints = await client.getMints({ pool: pool.address }, 1, 'timestamp', OrderDirection.desc); + expect(mints).to.not.be.empty; - const mint = data.mints[0]; + const mint = mints[0]; const txID = mint.id.split('#')[0]; const txCountID = mint.id.split('#')[1]; - data = await request(endpoint, queryPoolById, { id: pool.address }); - const poolTxCount = data.pool.txCount; + const poolData = await client.getPoolById(pool.address); + const poolTxCount = poolData.txCount; const expectedOrigin = recipient; const expectedOwner = recipient; const expectedSender = poolCallee.address; @@ -373,11 +353,11 @@ describe('uni-info-watcher', () => { // Checked values: liquidityGross, liquidityNet. // Unchecked values: id, price0, price1. - const data = await request(endpoint, queryTicks, { pool: pool.address }); - expect(data.ticks).to.not.be.empty; + const ticks = await client.getTicks({ poolAddress: pool.address }); + expect(ticks).to.not.be.empty; - const lowerTick: any = _.filter(data.ticks, { tickIdx: tickLower.toString() })[0]; - const upperTick: any = _.filter(data.ticks, { tickIdx: tickUpper.toString() })[0]; + const lowerTick: any = _.filter(ticks, { tickIdx: tickLower.toString() })[0]; + const upperTick: any = _.filter(ticks, { tickIdx: tickUpper.toString() })[0]; expect(lowerTick.liquidityGross).to.be.equal(amount.toString()); expect(lowerTick.liquidityNet).to.be.equal(amount.toString()); @@ -386,21 +366,21 @@ describe('uni-info-watcher', () => { }); it('should update UniswapDayData entity', async () => { - checkUniswapDayData(endpoint); + checkUniswapDayData(client); }); it('should update PoolDayData entity', async () => { - checkPoolDayData(endpoint, pool.address); + checkPoolDayData(client, pool.address); }); it('should update TokenDayData entities', async () => { - checkTokenDayData(endpoint, token0.address); - checkTokenDayData(endpoint, token1.address); + checkTokenDayData(client, token0.address); + checkTokenDayData(client, token1.address); }); it('should update TokenHourData entities', async () => { - checkTokenHourData(endpoint, token0.address); - checkTokenHourData(endpoint, token1.address); + checkTokenHourData(client, token0.address); + checkTokenHourData(client, token1.address); }); }); @@ -421,25 +401,20 @@ describe('uni-info-watcher', () => { before(async () => { // Get initial entity values. - let data: any; + const factories = await await client.getFactories(1); + oldFactory = factories[0]; - data = await request(endpoint, queryFactory); - oldFactory = data.factories[0]; + oldToken0 = await client.getToken(token0.address); - data = await request(endpoint, queryToken, { id: token0.address }); - oldToken0 = data.token; + oldToken1 = await client.getToken(token1.address); - data = await request(endpoint, queryToken, { id: token1.address }); - oldToken1 = data.token; + oldPool = await client.getPoolById(pool.address); - data = await request(endpoint, queryPoolById, { id: pool.address }); - oldPool = data.pool; + const ticks = await client.getTicks({ poolAddress: pool.address }); + expect(ticks).to.not.be.empty; - data = await request(endpoint, queryTicks, { pool: pool.address }); - expect(data.ticks).to.not.be.empty; - - oldLowerTick = _.filter(data.ticks, { tickIdx: tickLower.toString() })[0]; - oldUpperTick = _.filter(data.ticks, { tickIdx: tickUpper.toString() })[0]; + oldLowerTick = _.filter(ticks, { tickIdx: tickLower.toString() })[0]; + oldUpperTick = _.filter(ticks, { tickIdx: tickUpper.toString() })[0]; }); it('should trigger BurnEvent', async () => { @@ -458,13 +433,9 @@ describe('uni-info-watcher', () => { // Checked values: txCount. // Unchecked values: totalValueLocked, totalValueLockedUSD. - let data: any; + const newToken0 = await client.getToken(token0.address); - data = await request(endpoint, queryToken, { id: token0.address }); - const newToken0 = data.token; - - data = await request(endpoint, queryToken, { id: token1.address }); - const newToken1 = data.token; + const newToken1 = await client.getToken(token1.address); expect(newToken0.txCount).to.be.equal((BigInt(oldToken0.txCount) + BigInt(1)).toString()); expect(newToken1.txCount).to.be.equal((BigInt(oldToken1.txCount) + BigInt(1)).toString()); @@ -474,8 +445,8 @@ describe('uni-info-watcher', () => { // Checked values: txCount. // Unchecked values: totalValueLockedUSD. - const data = await request(endpoint, queryFactory); - const newFactory = data.factories[0]; + const factories = await client.getFactories(1); + const newFactory = factories[0]; expect(newFactory.txCount).to.be.equal((BigInt(oldFactory.txCount) + BigInt(1)).toString()); }); @@ -493,8 +464,7 @@ describe('uni-info-watcher', () => { } } - const data = await request(endpoint, queryPoolById, { id: pool.address }); - const newPool = data.pool; + const newPool = await client.getPoolById(pool.address); expect(newPool.txCount).to.be.equal((BigInt(oldPool.txCount) + BigInt(1)).toString()); expect(BigInt(newPool.liquidity)).to.be.equal(expectedLiquidity); @@ -503,7 +473,7 @@ describe('uni-info-watcher', () => { it('should create a Transaction entity', async () => { // Checked values: mints, burns, swaps. - const transaction: any = await fetchTransaction(endpoint); + const transaction: any = await fetchTransaction(client); expectedTxID = transaction.id; expectedTxTimestamp = transaction.timestamp; @@ -521,23 +491,15 @@ describe('uni-info-watcher', () => { // Unchecked values: amount0, amount1, amountUSD. // Get the latest Burn. - let data: any; - const variables = { - first: 1, - orderBy: 'timestamp', - orderDirection: 'desc', - pool: pool.address - }; + const burns = await client.getBurns({ pool: pool.address }, 1, 'timestamp', OrderDirection.desc); + expect(burns).to.not.be.empty; - data = await request(endpoint, queryBurns, variables); - expect(data.burns).to.not.be.empty; - - const burn = data.burns[0]; + const burn = burns[0]; const txID = burn.id.split('#')[0]; const txCountID = burn.id.split('#')[1]; - data = await request(endpoint, queryPoolById, { id: pool.address }); - const poolTxCount = data.pool.txCount; + const poolData = await client.getPoolById(pool.address); + const poolTxCount = poolData.txCount; const expectedOrigin = recipient; const expectedOwner = recipient; @@ -555,11 +517,11 @@ describe('uni-info-watcher', () => { // Checked values: liquidityGross, liquidityNet. // Unchecked values: id, price0, price1. - const data = await request(endpoint, queryTicks, { pool: pool.address }); - expect(data.ticks).to.not.be.empty; + const ticks = await client.getTicks({ poolAddress: pool.address }); + expect(ticks).to.not.be.empty; - const newLowerTick: any = _.filter(data.ticks, { tickIdx: tickLower.toString() })[0]; - const newUpperTick: any = _.filter(data.ticks, { tickIdx: tickUpper.toString() })[0]; + const newLowerTick: any = _.filter(ticks, { tickIdx: tickLower.toString() })[0]; + const newUpperTick: any = _.filter(ticks, { tickIdx: tickUpper.toString() })[0]; const expectedLLG = BigInt(oldLowerTick.liquidityGross) - BigInt(amount); const expectedLN = BigInt(oldLowerTick.liquidityNet) - BigInt(amount); @@ -573,21 +535,21 @@ describe('uni-info-watcher', () => { }); it('should update UniswapDayData entity', async () => { - checkUniswapDayData(endpoint); + checkUniswapDayData(client); }); it('should update PoolDayData entity', async () => { - checkPoolDayData(endpoint, pool.address); + checkPoolDayData(client, pool.address); }); it('should update TokenDayData entities', async () => { - checkTokenDayData(endpoint, token0.address); - checkTokenDayData(endpoint, token1.address); + checkTokenDayData(client, token0.address); + checkTokenDayData(client, token1.address); }); it('should update TokenHourData entities', async () => { - checkTokenHourData(endpoint, token0.address); - checkTokenHourData(endpoint, token1.address); + checkTokenHourData(client, token0.address); + checkTokenHourData(client, token1.address); }); }); @@ -608,19 +570,14 @@ describe('uni-info-watcher', () => { before(async () => { // Get initial entity values. - let data: any; + const factories = await client.getFactories(1); + oldFactory = factories[0]; - data = await request(endpoint, queryFactory); - oldFactory = data.factories[0]; + oldToken0 = await client.getToken(token0.address); - data = await request(endpoint, queryToken, { id: token0.address }); - oldToken0 = data.token; + oldToken1 = await client.getToken(token1.address); - data = await request(endpoint, queryToken, { id: token1.address }); - oldToken1 = data.token; - - data = await request(endpoint, queryPoolById, { id: pool.address }); - oldPool = data.pool; + oldPool = await client.getPoolById(pool.address); }); it('should trigger SwapEvent', async () => { @@ -640,13 +597,9 @@ describe('uni-info-watcher', () => { // Checked values: txCount. // Unchecked values: derivedETH, feesUSD, totalValueLocked, totalValueLockedUSD, volume, volumeUSD. - let data: any; + const newToken0 = await client.getToken(token0.address); - data = await request(endpoint, queryToken, { id: token0.address }); - const newToken0 = data.token; - - data = await request(endpoint, queryToken, { id: token1.address }); - const newToken1 = data.token; + const newToken1 = await client.getToken(token1.address); expect(newToken0.txCount).to.be.equal((BigInt(oldToken0.txCount) + BigInt(1)).toString()); expect(newToken1.txCount).to.be.equal((BigInt(oldToken1.txCount) + BigInt(1)).toString()); @@ -656,8 +609,8 @@ describe('uni-info-watcher', () => { // Checked values: txCount. // Unchecked values: totalFeesUSD, totalValueLockedUSD, totalVolumeUSD. - const data = await request(endpoint, queryFactory); - const newFactory = data.factories[0]; + const factories = await client.getFactories(1); + const newFactory = factories[0]; expect(newFactory.txCount).to.be.equal((BigInt(oldFactory.txCount) + BigInt(1)).toString()); }); @@ -669,8 +622,7 @@ describe('uni-info-watcher', () => { const expectedTick = eventValue.event.tick; const expectedSqrtPrice = eventValue.event.sqrtPriceX96; - const data = await request(endpoint, queryPoolById, { id: pool.address }); - const newPool = data.pool; + const newPool = await client.getPoolById(pool.address); expect(newPool.txCount).to.be.equal((BigInt(oldPool.txCount) + BigInt(1)).toString()); expect(newPool.liquidity).to.be.equal(expectedLiquidity); @@ -681,7 +633,7 @@ describe('uni-info-watcher', () => { it('should create a Transaction entity', async () => { // Checked values: mints, burns, swaps. - const transaction: any = await fetchTransaction(endpoint); + const transaction: any = await fetchTransaction(client); expectedTxID = transaction.id; expectedTxTimestamp = transaction.timestamp; @@ -698,23 +650,15 @@ describe('uni-info-watcher', () => { // Checked values: id, origin, timestamp, pool, transaction. // Unchecked values: amount0, amount1, amountUSD. - let data: any; - const variables = { - first: 1, - orderBy: 'timestamp', - orderDirection: 'desc', - pool: pool.address - }; + const swaps = await client.getSwaps({ pool: pool.address }, 1, 'timestamp', OrderDirection.desc); + expect(swaps).to.not.be.empty; - data = await request(endpoint, querySwaps, variables); - expect(data.swaps).to.not.be.empty; - - const swap = data.swaps[0]; + const swap = swaps[0]; const txID = swap.id.split('#')[0]; const txCountID = swap.id.split('#')[1]; - data = await request(endpoint, queryPoolById, { id: pool.address }); - const poolTxCount = data.pool.txCount; + const poolData = await client.getPoolById(pool.address); + const poolTxCount = poolData.txCount; const expectedOrigin = recipient; expect(txID).to.be.equal(expectedTxID); @@ -727,21 +671,21 @@ describe('uni-info-watcher', () => { }); it('should update UniswapDayData entity', async () => { - checkUniswapDayData(endpoint); + checkUniswapDayData(client); }); it('should update PoolDayData entity', async () => { - checkPoolDayData(endpoint, pool.address); + checkPoolDayData(client, pool.address); }); it('should update TokenDayData entities', async () => { - checkTokenDayData(endpoint, token0.address); - checkTokenDayData(endpoint, token1.address); + checkTokenDayData(client, token0.address); + checkTokenDayData(client, token1.address); }); it('should update TokenHourData entities', async () => { - checkTokenHourData(endpoint, token0.address); - checkTokenHourData(endpoint, token1.address); + checkTokenHourData(client, token0.address); + checkTokenHourData(client, token1.address); }); }); @@ -841,7 +785,7 @@ describe('uni-info-watcher', () => { it('should create a Transaction entity', async () => { // Checked values: mints, burns, swaps. - const transaction: any = await fetchTransaction(endpoint); + const transaction: any = await fetchTransaction(client); expectedTxID = transaction.id; const expectedTxTimestamp = transaction.timestamp; @@ -854,15 +798,15 @@ describe('uni-info-watcher', () => { expect(timestamp).to.be.equal(expectedTxTimestamp); }); - it('should createa a Position entity', async () => { + it('should create a Position entity', async () => { // Checked values: pool, token0, token1, tickLower, tickUpper, transaction, owner. // Unchecked values: feeGrowthInside0LastX128, feeGrowthInside0LastX128. // Get the Position using tokenId. - const data = await request(endpoint, queryPositions, { id: Number(eventValue.event.tokenId) }); - expect(data.positions).to.not.be.empty; + const positions = await client.getPositions({ id: Number(eventValue.event.tokenId) }, 1); + expect(positions).to.not.be.empty; - const position = data.positions[0]; + const position = positions[0]; const positionTickLower = position.tickLower.id.split('#')[1]; const positionTickUpper = position.tickUpper.id.split('#')[1]; @@ -894,8 +838,8 @@ describe('uni-info-watcher', () => { before(async () => { // Get initial entity values. - const data = await request(endpoint, queryPositions, { id: Number(tokenId) }); - oldPosition = data.positions[0]; + const positions = await client.getPositions({ id: Number(tokenId) }, 1); + oldPosition = positions[0]; }); it('should trigger IncreaseLiquidityEvent', async () => { @@ -926,7 +870,7 @@ describe('uni-info-watcher', () => { it('should create a Transaction entity', async () => { // Checked values: mints, burns, swaps. - const transaction: any = await fetchTransaction(endpoint); + const transaction: any = await fetchTransaction(client); const expectedTxTimestamp = transaction.timestamp; @@ -943,10 +887,10 @@ describe('uni-info-watcher', () => { // Unchecked values: depositedToken0, depositedToken1, feeGrowthInside0LastX128, feeGrowthInside0LastX128. // Get the Position using tokenId. - const data = await request(endpoint, queryPositions, { id: Number(eventValue.event.tokenId) }); - expect(data.positions).to.not.be.empty; + const positions = await client.getPositions({ id: Number(eventValue.event.tokenId) }, 1); + expect(positions).to.not.be.empty; - const position = data.positions[0]; + const position = positions[0]; const expectedLiquidity = BigInt(oldPosition.liquidity) + BigInt(eventValue.event.liquidity); @@ -969,8 +913,8 @@ describe('uni-info-watcher', () => { before(async () => { // Get initial entity values. - const data = await request(endpoint, queryPositions, { id: Number(tokenId) }); - oldPosition = data.positions[0]; + const positions = await client.getPositions({ id: Number(tokenId) }, 1); + oldPosition = positions[0]; }); it('should trigger DecreaseLiquidityEvent', async () => { @@ -1000,7 +944,7 @@ describe('uni-info-watcher', () => { it('should create a Transaction entity', async () => { // Checked values: mints, burns, swaps. - const transaction: any = await fetchTransaction(endpoint); + const transaction: any = await fetchTransaction(client); const expectedTxTimestamp = transaction.timestamp; @@ -1017,10 +961,10 @@ describe('uni-info-watcher', () => { // Unchecked values: depositedToken0, depositedToken1, feeGrowthInside0LastX128, feeGrowthInside0LastX128. // Get the Position using tokenId. - const data = await request(endpoint, queryPositions, { id: Number(eventValue.event.tokenId) }); - expect(data.positions).to.not.be.empty; + const positions = await client.getPositions({ id: Number(eventValue.event.tokenId) }, 1); + expect(positions).to.not.be.empty; - const position = data.positions[0]; + const position = positions[0]; const expectedLiquidity = BigInt(oldPosition.liquidity) - BigInt(eventValue.event.liquidity); @@ -1064,7 +1008,7 @@ describe('uni-info-watcher', () => { it('should create a Transaction entity', async () => { // Checked values: mints, burns, swaps. - const transaction: any = await fetchTransaction(endpoint); + const transaction: any = await fetchTransaction(client); const expectedTxTimestamp = transaction.timestamp; diff --git a/packages/uni-info-watcher/test/queries.ts b/packages/uni-info-watcher/test/queries.ts deleted file mode 100644 index 6e2215d8..00000000 --- a/packages/uni-info-watcher/test/queries.ts +++ /dev/null @@ -1,292 +0,0 @@ -// -// Copyright 2021 Vulcanize, Inc. -// - -import { gql } from 'graphql-request'; - -export const queryToken = gql` -query queryToken($id: ID!) { - token(id: $id) { - derivedETH - feesUSD - id - name - symbol - totalValueLocked - totalValueLockedUSD - txCount - volume - volumeUSD - } -}`; - -// Getting the first Factory entity. -export const queryFactory = gql` -{ - factories(first: 1) { - id - totalFeesUSD - totalValueLockedUSD - totalVolumeUSD - txCount - } -}`; - -// Getting the first Bundle entity. -export const queryBundle = gql` -{ - bundles(first: 1) { - id - ethPriceUSD - } -}`; - -// Getting Pool by id. -export const queryPoolById = gql` -query queryPoolById($id: ID!) { - pool(id: $id) { - feeTier - id - liquidity - sqrtPrice - tick - token0Price - token1Price - totalValueLockedToken0 - totalValueLockedToken1 - totalValueLockedUSD - txCount - volumeUSD - } -}`; - -// Getting Tick(s) filtered by pool. -export const queryTicks = gql` -query queryTicksByPool($pool: String) { - ticks(where: { poolAddress: $pool }) { - id - liquidityGross - liquidityNet - price0 - price1 - tickIdx - } -}`; - -// Getting Pool(s) filtered by tokens. -export const queryPoolsByTokens = gql` -query queryPoolsByTokens($tokens: [String!]) { - pools(where: { token0_in: $tokens, token1_in: $tokens }) { - id, - feeTier - } -}`; - -// Getting UniswapDayData(s). -export const queryUniswapDayData = gql` -query queryUniswapDayData($first: Int, $orderBy: UniswapDayData_orderBy, $orderDirection: OrderDirection) { - uniswapDayDatas(first: $first, orderBy: $orderBy, orderDirection: $orderDirection) { - id, - date, - tvlUSD - } -}`; - -// Getting PoolDayData(s) filtered by pool and ordered by date. -export const queryPoolDayData = gql` -query queryPoolDayData($first: Int, $orderBy: PoolDayData_orderBy, $orderDirection: OrderDirection, $pool: String) { - poolDayDatas(first: $first, orderBy: $orderBy, orderDirection: $orderDirection, where: { pool: $pool }) { - id, - date, - tvlUSD - } -}`; - -// Getting TokenDayDatas(s) filtered by token and ordered by date. -export const queryTokenDayData = gql` -query queryTokenDayData($first: Int, $orderBy: TokenDayData_orderBy, $orderDirection: OrderDirection, $token: String) { - tokenDayDatas(first: $first, orderBy: $orderBy, orderDirection: $orderDirection, where: { token: $token }) { - id, - date, - totalValueLockedUSD - } -}`; - -// Getting TokenDayDatas(s) filtered by token and ordered by date. -export const queryTokenHourData = gql` -query queryTokenHourData($first: Int, $orderBy: TokenHourData_orderBy, $orderDirection: OrderDirection, $token: String) { - tokenHourDatas(first: $first, orderBy: $orderBy, orderDirection: $orderDirection, where: { token: $token }) { - id, - low, - high, - open, - close, - periodStartUnix - } -}`; - -// Getting mint(s) filtered by pool, tokens and ordered by timestamp. -export const queryMints = gql` -query queryMints( - $first: Int, - $orderBy: Mint_orderBy, - $orderDirection: OrderDirection, - $pool: String, - $token0: String, - $token1: String) { - mints( - first: $first, - orderBy: $orderBy, - orderDirection: $orderDirection, - where: { - pool: $pool, - token0: $token0, - token1: $token1 - }) { - amount0, - amount1, - amountUSD, - id, - origin, - owner, - sender, - timestamp, - pool { - id - }, - transaction { - id - } - } -}`; - -// Getting burns(s) filtered by pool, tokens and ordered by timestamp. -export const queryBurns = gql` -query queryBurns( - $first: Int, - $orderBy: Burn_orderBy, - $orderDirection: OrderDirection, - $pool: String, - $token0: String, - $token1: String) { - burns( - first: $first, - orderBy: $orderBy, - orderDirection: $orderDirection, - where: { - pool: $pool, - token0: $token0, - token1: $token1 - }) { - amount0, - amount1, - amountUSD, - id, - origin, - owner, - timestamp, - pool { - id - }, - transaction { - id - } - } -}`; - -// Getting burns(s) filtered by pool, tokens and ordered by timestamp. -export const querySwaps = gql` -query querySwaps( - $first: Int, - $orderBy: Swap_orderBy, - $orderDirection: OrderDirection, - $pool: String, - $token0: String, - $token1: String) { - swaps( - first: $first, - orderBy: $orderBy, - orderDirection: $orderDirection, - where: { - pool: $pool, - token0: $token0, - token1: $token1 - }) { - amount0, - amount1, - amountUSD, - id, - origin, - timestamp, - pool { - id - }, - transaction { - id - } - } -}`; - -// Getting transactions(s) ordered by timestamp. -export const queryTransactions = gql` -query queryTransactions( - $first: Int, - $orderBy: Transaction_orderBy, - $mintOrderBy: Mint_orderBy, - $burnOrderBy: Burn_orderBy, - $swapOrderBy: Swap_orderBy, - $orderDirection: OrderDirection) { - transactions( - first: $first, - orderBy: $orderBy, - orderDirection: $orderDirection) { - id, - mints( first: $first, orderBy: $mintOrderBy, orderDirection: $orderDirection) { - id, - timestamp - }, - burns( first: $first, orderBy: $burnOrderBy, orderDirection: $orderDirection) { - id, - timestamp - }, - swaps( first: $first, orderBy: $swapOrderBy, orderDirection: $orderDirection) { - id, - timestamp - }, - timestamp - } -}`; - -// Getting position filtered by id. -export const queryPositions = gql` -query queryPositions($first: Int, $id: ID) { - positions(first: $first, where: { id: $id }) { - id, - pool { - id - }, - token0 { - id - }, - token1 { - id - }, - tickLower { - id - }, - tickUpper { - id - }, - transaction { - id - }, - liquidity, - depositedToken0, - depositedToken1, - collectedFeesToken0, - collectedFeesToken1, - owner, - feeGrowthInside0LastX128, - feeGrowthInside1LastX128 - } -}`; diff --git a/packages/uni-info-watcher/test/utils.ts b/packages/uni-info-watcher/test/utils.ts index 89f1c43c..d000e442 100644 --- a/packages/uni-info-watcher/test/utils.ts +++ b/packages/uni-info-watcher/test/utils.ts @@ -4,75 +4,53 @@ import { expect } from 'chai'; import { ethers } from 'ethers'; -import { request } from 'graphql-request'; import Decimal from 'decimal.js'; import _ from 'lodash'; import { insertNDummyBlocks } from '@vulcanize/util/test'; -import { - queryFactory, - queryBundle, - queryToken, - queryPoolById, - queryPoolDayData, - queryUniswapDayData, - queryTokenDayData, - queryTokenHourData, - queryTransactions -} from '../test/queries'; -import { Database } from '../src/database'; +import { Database, OrderDirection } from '../src/database'; import { Block } from '../src/events'; import { Token } from '../src/entity/Token'; +import { Client } from '../src/client'; -export const checkUniswapDayData = async (endpoint: string): Promise => { +export const checkUniswapDayData = async (client: Client): Promise => { // Checked values: date, tvlUSD. // Unchecked values: volumeUSD. // Get the latest UniswapDayData. - const variables = { - first: 1, - orderBy: 'date', - orderDirection: 'desc' - }; - const data = await request(endpoint, queryUniswapDayData, variables); - expect(data.uniswapDayDatas).to.not.be.empty; + const uniswapDayDatas = await client.getUniswapDayDatas({}, 0, 1, 'date', OrderDirection.desc); + expect(uniswapDayDatas).to.not.be.empty; - const id: string = data.uniswapDayDatas[0].id; + const id: string = uniswapDayDatas[0].id; const dayID = Number(id); - const date = data.uniswapDayDatas[0].date; - const tvlUSD = data.uniswapDayDatas[0].tvlUSD; + const date = uniswapDayDatas[0].date; + const tvlUSD = uniswapDayDatas[0].tvlUSD; const dayStartTimestamp = dayID * 86400; - const factoryData = await request(endpoint, queryFactory); - const totalValueLockedUSD: string = factoryData.factories[0].totalValueLockedUSD; + const factories = await client.getFactories(1); + const totalValueLockedUSD: string = factories[0].totalValueLockedUSD; expect(date).to.be.equal(dayStartTimestamp); expect(tvlUSD).to.be.equal(totalValueLockedUSD); }; -export const checkPoolDayData = async (endpoint: string, poolAddress: string): Promise => { +export const checkPoolDayData = async (client: Client, poolAddress: string): Promise => { // Checked values: id, date, tvlUSD. // Unchecked values: volumeUSD. // Get the latest PoolDayData. - const variables = { - first: 1, - orderBy: 'date', - orderDirection: 'desc', - pool: poolAddress - }; - const data = await request(endpoint, queryPoolDayData, variables); - expect(data.poolDayDatas).to.not.be.empty; + const poolDayDatas = await client.getPoolDayDatas({ pool: poolAddress }, 0, 1, 'date', OrderDirection.desc); + expect(poolDayDatas).to.not.be.empty; - const dayPoolID: string = data.poolDayDatas[0].id; + const dayPoolID: string = poolDayDatas[0].id; const poolID: string = dayPoolID.split('-')[0]; const dayID = Number(dayPoolID.split('-')[1]); - const date = data.poolDayDatas[0].date; - const tvlUSD = data.poolDayDatas[0].tvlUSD; + const date = poolDayDatas[0].date; + const tvlUSD = poolDayDatas[0].tvlUSD; const dayStartTimestamp = dayID * 86400; - const poolData = await request(endpoint, queryPoolById, { id: poolAddress }); + const poolData = await client.getPoolById(poolAddress); const totalValueLockedUSD: string = poolData.pool.totalValueLockedUSD; expect(poolID).to.be.equal(poolAddress); @@ -80,28 +58,22 @@ export const checkPoolDayData = async (endpoint: string, poolAddress: string): P expect(tvlUSD).to.be.equal(totalValueLockedUSD); }; -export const checkTokenDayData = async (endpoint: string, tokenAddress: string): Promise => { +export const checkTokenDayData = async (client: Client, tokenAddress: string): Promise => { // Checked values: id, date, totalValueLockedUSD. // Unchecked values: volumeUSD. // Get the latest TokenDayData. - const variables = { - first: 1, - orderBy: 'date', - orderDirection: 'desc', - token: tokenAddress - }; - const data = await request(endpoint, queryTokenDayData, variables); - expect(data.tokenDayDatas).to.not.be.empty; + const tokenDayDatas = await client.getTokenDayDatas({ token: tokenAddress }, 0, 1, 'date', OrderDirection.desc); + expect(tokenDayDatas).to.not.be.empty; - const tokenDayID: string = data.tokenDayDatas[0].id; + const tokenDayID: string = tokenDayDatas[0].id; const tokenID: string = tokenDayID.split('-')[0]; const dayID = Number(tokenDayID.split('-')[1]); - const date = data.tokenDayDatas[0].date; - const tvlUSD = data.tokenDayDatas[0].totalValueLockedUSD; + const date = tokenDayDatas[0].date; + const tvlUSD = tokenDayDatas[0].totalValueLockedUSD; const dayStartTimestamp = dayID * 86400; - const tokenData = await request(endpoint, queryToken, { id: tokenAddress }); + const tokenData = await client.getToken(tokenAddress); const totalValueLockedUSD: string = tokenData.token.totalValueLockedUSD; expect(tokenID).to.be.equal(tokenAddress); @@ -109,33 +81,27 @@ export const checkTokenDayData = async (endpoint: string, tokenAddress: string): expect(tvlUSD).to.be.equal(totalValueLockedUSD); }; -export const checkTokenHourData = async (endpoint: string, tokenAddress: string): Promise => { +export const checkTokenHourData = async (client: Client, tokenAddress: string): Promise => { // Checked values: id, periodStartUnix, low, high, open, close. // Unchecked values: // Get the latest TokenHourData. - const variables = { - first: 1, - orderBy: 'periodStartUnix', - orderDirection: 'desc', - token: tokenAddress - }; - const data = await request(endpoint, queryTokenHourData, variables); - expect(data.tokenHourDatas).to.not.be.empty; + const tokenHourDatas = await client.getTokenHourDatas({ token: tokenAddress }, 0, 1, 'periodStartUnix', OrderDirection.desc); + expect(tokenHourDatas).to.not.be.empty; - const tokenHourID: string = data.tokenHourDatas[0].id; + const tokenHourID: string = tokenHourDatas[0].id; const tokenID: string = tokenHourID.split('-')[0]; const hourIndex = Number(tokenHourID.split('-')[1]); - const periodStartUnix = data.tokenHourDatas[0].periodStartUnix; - const low = data.tokenHourDatas[0].low; - const high = data.tokenHourDatas[0].high; - const open = data.tokenHourDatas[0].open; - const close = data.tokenHourDatas[0].close; + const periodStartUnix = tokenHourDatas[0].periodStartUnix; + const low = tokenHourDatas[0].low; + const high = tokenHourDatas[0].high; + const open = tokenHourDatas[0].open; + const close = tokenHourDatas[0].close; const hourStartUnix = hourIndex * 3600; - const tokenData = await request(endpoint, queryToken, { id: tokenAddress }); - const bundleData = await request(endpoint, queryBundle); - const tokenPrice = new Decimal(tokenData.token.derivedETH).times(bundleData.bundles[0].ethPriceUSD); + const tokenData = await client.getToken(tokenAddress); + const bundles = await client.getBundles(1); + const tokenPrice = new Decimal(tokenData.token.derivedETH).times(bundles[0].ethPriceUSD); expect(tokenID).to.be.equal(tokenAddress); expect(periodStartUnix).to.be.equal(hourStartUnix); @@ -145,22 +111,22 @@ export const checkTokenHourData = async (endpoint: string, tokenAddress: string) expect(close).to.be.equal(tokenPrice.toString()); }; -export const fetchTransaction = async (endpoint: string): Promise<{transaction: any}> => { +export const fetchTransaction = async (client: Client): Promise<{transaction: any}> => { // Get the latest Transaction. // Get only the latest mint, burn and swap entity in the transaction. + const transactions = await client.getTransactions( + 1, + { + orderBy: 'timestamp', + mintOrderBy: 'timestamp', + burnOrderBy: 'timestamp', + swapOrderBy: 'timestamp' + }, + OrderDirection.desc + ); - const variables = { - first: 1, - orderBy: 'timestamp', - mintOrderBy: 'timestamp', - burnOrderBy: 'timestamp', - swapOrderBy: 'timestamp', - orderDirection: 'desc' - }; - - const data = await request(endpoint, queryTransactions, variables); - expect(data.transactions).to.not.be.empty; - const transaction = data.transactions[0]; + expect(transactions).to.not.be.empty; + const transaction = transactions[0]; expect(transaction.mints).to.be.an.instanceOf(Array); expect(transaction.burns).to.be.an.instanceOf(Array); diff --git a/packages/uni-watcher/environments/local.toml b/packages/uni-watcher/environments/local.toml index ba8a40a2..728fdcfc 100644 --- a/packages/uni-watcher/environments/local.toml +++ b/packages/uni-watcher/environments/local.toml @@ -33,4 +33,5 @@ [jobQueue] dbConnectionString = "postgres://postgres:postgres@localhost/uni-watcher-job-queue" - maxCompletionLag = 300 + maxCompletionLagInSecs = 300 + jobDelayInMilliSecs = 100 diff --git a/packages/uni-watcher/src/chain-pruning.test.ts b/packages/uni-watcher/src/chain-pruning.test.ts index 0a9ed1c9..72f28804 100644 --- a/packages/uni-watcher/src/chain-pruning.test.ts +++ b/packages/uni-watcher/src/chain-pruning.test.ts @@ -63,9 +63,12 @@ describe('chain pruning', () => { indexer = new Indexer(db, ethClient, postgraphileClient); assert(indexer, 'Could not create indexer object.'); - const jobQueue = new JobQueue(jobQueueConfig); + const { dbConnectionString, maxCompletionLagInSecs } = jobQueueConfig; + assert(dbConnectionString, 'Missing job queue db connection string'); - jobRunner = new JobRunner(indexer, jobQueue); + const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs }); + + jobRunner = new JobRunner(jobQueueConfig, indexer, jobQueue); }); afterEach(async () => { diff --git a/packages/uni-watcher/src/client.ts b/packages/uni-watcher/src/client.ts index b5abeb08..e5e96ae1 100644 --- a/packages/uni-watcher/src/client.ts +++ b/packages/uni-watcher/src/client.ts @@ -5,7 +5,15 @@ import { gql } from '@apollo/client/core'; import { GraphQLClient, GraphQLConfig } from '@vulcanize/ipld-eth-client'; -import { queryGetPool, queryPoolIdToPoolKey, queryPosition, queryEvents, subscribeEvents, queryGetContract } from './queries'; +import { + queryGetPool, + queryPoolIdToPoolKey, + queryPosition, + queryEvents, + subscribeEvents, + queryGetContract, + queryEventsInRange +} from './queries'; export class Client { _config: GraphQLConfig; @@ -86,4 +94,16 @@ export class Client { return getContract; } + + async getEventsInRange (fromblockNumber: number, toBlockNumber: number): Promise { + const { events } = await this._client.query( + gql(queryEventsInRange), + { + fromblockNumber, + toBlockNumber + } + ); + + return events; + } } diff --git a/packages/uni-watcher/src/database.ts b/packages/uni-watcher/src/database.ts index f46cd650..55818294 100644 --- a/packages/uni-watcher/src/database.ts +++ b/packages/uni-watcher/src/database.ts @@ -3,12 +3,11 @@ // import assert from 'assert'; -import _ from 'lodash'; import { Connection, ConnectionOptions, DeepPartial, QueryRunner, FindConditions } from 'typeorm'; import { Database as BaseDatabase, DatabaseInterface } from '@vulcanize/util'; -import { Event, UNKNOWN_EVENT_NAME } from './entity/Event'; +import { Event } from './entity/Event'; import { Contract } from './entity/Contract'; import { BlockProgress } from './entity/BlockProgress'; import { SyncStatus } from './entity/SyncStatus'; @@ -32,45 +31,62 @@ export class Database implements DatabaseInterface { return this._baseDatabase.close(); } + async getContract (address: string): Promise { + return this._conn.getRepository(Contract) + .createQueryBuilder('contract') + .where('address = :address', { address }) + .getOne(); + } + + async getLatestContract (kind: string): Promise { + return this._conn.getRepository(Contract) + .createQueryBuilder('contract') + .where('kind = :kind', { kind }) + .orderBy('id', 'DESC') + .getOne(); + } + + async saveContract (queryRunner: QueryRunner, address: string, kind: string, startingBlock: number): Promise { + const repo = queryRunner.manager.getRepository(Contract); + + const numRows = await repo + .createQueryBuilder() + .where('address = :address', { address }) + .getCount(); + + if (numRows === 0) { + const entity = repo.create({ address, kind, startingBlock }); + await repo.save(entity); + } + } + async createTransactionRunner (): Promise { return this._baseDatabase.createTransactionRunner(); } + async getProcessedBlockCountForRange (fromBlockNumber: number, toBlockNumber: number): Promise<{ expected: number, actual: number }> { + const repo = this._conn.getRepository(BlockProgress); + + return this._baseDatabase.getProcessedBlockCountForRange(repo, fromBlockNumber, toBlockNumber); + } + + async getEventsInRange (fromBlockNumber: number, toBlockNumber: number): Promise> { + const repo = this._conn.getRepository(Event); + + return this._baseDatabase.getEventsInRange(repo, fromBlockNumber, toBlockNumber); + } + + async saveEventEntity (queryRunner: QueryRunner, entity: Event): Promise { + const repo = queryRunner.manager.getRepository(Event); + return this._baseDatabase.saveEventEntity(repo, entity); + } + async getBlockEvents (blockHash: string): Promise { const repo = this._conn.getRepository(Event); return this._baseDatabase.getBlockEvents(repo, blockHash); } - async getProcessedBlockCountForRange (fromBlockNumber: number, toBlockNumber: number): Promise<{ expected: number, actual: number }> { - const blockNumbers = _.range(fromBlockNumber, toBlockNumber + 1); - const expected = blockNumbers.length; - - const repo = this._conn.getRepository(BlockProgress); - const { count: actual } = await repo - .createQueryBuilder('block_progress') - .select('COUNT(DISTINCT(block_number))', 'count') - .where('block_number IN (:...blockNumbers) AND is_complete = :isComplete', { blockNumbers, isComplete: true }) - .getRawOne(); - - return { expected, actual: parseInt(actual) }; - } - - async getEventsInRange (fromBlockNumber: number, toBlockNumber: number): Promise> { - const events = await this._conn.getRepository(Event) - .createQueryBuilder('event') - .innerJoinAndSelect('event.block', 'block') - .where('block_number >= :fromBlockNumber AND block_number <= :toBlockNumber AND event_name <> :eventName', { - fromBlockNumber, - toBlockNumber, - eventName: UNKNOWN_EVENT_NAME - }) - .addOrderBy('event.id', 'ASC') - .getMany(); - - return events; - } - async saveEvents (queryRunner: QueryRunner, block: DeepPartial, events: DeepPartial[]): Promise { const blockRepo = queryRunner.manager.getRepository(BlockProgress); const eventRepo = queryRunner.manager.getRepository(Event); @@ -108,40 +124,6 @@ export class Database implements DatabaseInterface { return this._baseDatabase.getEvent(repo, id); } - async saveEventEntity (queryRunner: QueryRunner, entity: Event): Promise { - const repo = queryRunner.manager.getRepository(Event); - return await repo.save(entity); - } - - async getContract (address: string): Promise { - return this._conn.getRepository(Contract) - .createQueryBuilder('contract') - .where('address = :address', { address }) - .getOne(); - } - - async getLatestContract (kind: string): Promise { - return this._conn.getRepository(Contract) - .createQueryBuilder('contract') - .where('kind = :kind', { kind }) - .orderBy('id', 'DESC') - .getOne(); - } - - async saveContract (queryRunner: QueryRunner, address: string, kind: string, startingBlock: number): Promise { - const repo = queryRunner.manager.getRepository(Contract); - - const numRows = await repo - .createQueryBuilder() - .where('address = :address', { address }) - .getCount(); - - if (numRows === 0) { - const entity = repo.create({ address, kind, startingBlock }); - await repo.save(entity); - } - } - async getBlocksAtHeight (height: number, isPruned: boolean): Promise { const repo = this._conn.getRepository(BlockProgress); diff --git a/packages/uni-watcher/src/fill.ts b/packages/uni-watcher/src/fill.ts index 83033fc1..6087b408 100644 --- a/packages/uni-watcher/src/fill.ts +++ b/packages/uni-watcher/src/fill.ts @@ -76,10 +76,10 @@ export const main = async (): Promise => { const pubsub = new PubSub(); const indexer = new Indexer(db, ethClient, postgraphileClient); - const { dbConnectionString, maxCompletionLag } = jobQueueConfig; + const { dbConnectionString, maxCompletionLagInSecs } = jobQueueConfig; assert(dbConnectionString, 'Missing job queue db connection string'); - const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag }); + const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs }); await jobQueue.start(); const eventWatcher = new EventWatcher(ethClient, indexer, pubsub, jobQueue); diff --git a/packages/uni-watcher/src/indexer.ts b/packages/uni-watcher/src/indexer.ts index 4b624150..5e910937 100644 --- a/packages/uni-watcher/src/indexer.ts +++ b/packages/uni-watcher/src/indexer.ts @@ -22,9 +22,6 @@ import { abi as factoryABI, storageLayout as factoryStorageLayout } from './arti import { abi as nfpmABI, storageLayout as nfpmStorageLayout } from './artifacts/NonfungiblePositionManager.json'; import poolABI from './artifacts/pool.json'; -// TODO: Move to config. -const MAX_EVENTS_BLOCK_RANGE = 1000; - const log = debug('vulcanize:indexer'); type ResultEvent = { @@ -102,15 +99,6 @@ export class Indexer implements IndexerInterface { }; } - // Note: Some event names might be unknown at this point, as earlier events might not yet be processed. - async getOrFetchBlockEvents (block: DeepPartial): Promise> { - return this._baseIndexer.getOrFetchBlockEvents(block, this._fetchAndSaveEvents.bind(this)); - } - - async getBlockEvents (blockHash: string): Promise> { - return this._baseIndexer.getBlockEvents(blockHash); - } - async getEventsByFilter (blockHash: string, contract: string, name: string | null): Promise> { if (contract) { const uniContract = await this.isUniswapContract(contract); @@ -304,6 +292,109 @@ export class Indexer implements IndexerInterface { return { eventName, eventInfo }; } + async position (blockHash: string, tokenId: string): Promise { + const nfpmContract = await this._db.getLatestContract('nfpm'); + assert(nfpmContract, 'No NFPM contract watched.'); + const { value, proof } = await this._getStorageValue(nfpmStorageLayout, blockHash, nfpmContract.address, '_positions', BigInt(tokenId)); + + return { + ...value, + proof + }; + } + + async poolIdToPoolKey (blockHash: string, poolId: string): Promise { + const nfpmContract = await this._db.getLatestContract('nfpm'); + assert(nfpmContract, 'No NFPM contract watched.'); + const { value, proof } = await this._getStorageValue(nfpmStorageLayout, blockHash, nfpmContract.address, '_poolIdToPoolKey', BigInt(poolId)); + + return { + ...value, + proof + }; + } + + async getPool (blockHash: string, token0: string, token1: string, fee: string): Promise { + const factoryContract = await this._db.getLatestContract('factory'); + assert(factoryContract, 'No Factory contract watched.'); + const { value, proof } = await this._getStorageValue(factoryStorageLayout, blockHash, factoryContract.address, 'getPool', token0, token1, BigInt(fee)); + + return { + pool: value, + proof + }; + } + + async getContract (type: string): Promise { + const contract = await this._db.getLatestContract(type); + return contract; + } + + async saveEventEntity (dbEvent: Event): Promise { + return this._baseIndexer.saveEventEntity(dbEvent); + } + + async getProcessedBlockCountForRange (fromBlockNumber: number, toBlockNumber: number): Promise<{ expected: number, actual: number }> { + return this._baseIndexer.getProcessedBlockCountForRange(fromBlockNumber, toBlockNumber); + } + + async getEventsInRange (fromBlockNumber: number, toBlockNumber: number): Promise> { + return this._baseIndexer.getEventsInRange(fromBlockNumber, toBlockNumber); + } + + // Note: Some event names might be unknown at this point, as earlier events might not yet be processed. + async getOrFetchBlockEvents (block: DeepPartial): Promise> { + return this._baseIndexer.getOrFetchBlockEvents(block, this._fetchAndSaveEvents.bind(this)); + } + + async getBlockEvents (blockHash: string): Promise> { + return this._baseIndexer.getBlockEvents(blockHash); + } + + async updateSyncStatusIndexedBlock (blockHash: string, blockNumber: number): Promise { + return this._baseIndexer.updateSyncStatusIndexedBlock(blockHash, blockNumber); + } + + async updateSyncStatusChainHead (blockHash: string, blockNumber: number): Promise { + return this._baseIndexer.updateSyncStatusChainHead(blockHash, blockNumber); + } + + async updateSyncStatusCanonicalBlock (blockHash: string, blockNumber: number): Promise { + return this._baseIndexer.updateSyncStatusCanonicalBlock(blockHash, blockNumber); + } + + async getSyncStatus (): Promise { + return this._baseIndexer.getSyncStatus(); + } + + async getBlock (blockHash: string): Promise { + return this._baseIndexer.getBlock(blockHash); + } + + async getEvent (id: string): Promise { + return this._baseIndexer.getEvent(id); + } + + async getBlockProgress (blockHash: string): Promise { + return this._baseIndexer.getBlockProgress(blockHash); + } + + async getBlocksAtHeight (height: number, isPruned: boolean): Promise { + return this._baseIndexer.getBlocksAtHeight(height, isPruned); + } + + async markBlocksAsPruned (blocks: BlockProgress[]): Promise { + return this._baseIndexer.markBlocksAsPruned(blocks); + } + + async updateBlockProgress (blockHash: string, lastProcessedEventIndex: number): Promise { + return this._baseIndexer.updateBlockProgress(blockHash, lastProcessedEventIndex); + } + + async getAncestorAtDepth (blockHash: string, depth: number): Promise { + return this._baseIndexer.getAncestorAtDepth(blockHash, depth); + } + async _fetchAndSaveEvents ({ blockHash }: DeepPartial): Promise { assert(blockHash); let { block, logs } = await this._ethClient.getLogs({ blockHash }); @@ -396,121 +487,6 @@ export class Indexer implements IndexerInterface { } } - async updateSyncStatusIndexedBlock (blockHash: string, blockNumber: number): Promise { - return this._baseIndexer.updateSyncStatusIndexedBlock(blockHash, blockNumber); - } - - async updateSyncStatusChainHead (blockHash: string, blockNumber: number): Promise { - return this._baseIndexer.updateSyncStatusChainHead(blockHash, blockNumber); - } - - async updateSyncStatusCanonicalBlock (blockHash: string, blockNumber: number): Promise { - return this._baseIndexer.updateSyncStatusCanonicalBlock(blockHash, blockNumber); - } - - async getSyncStatus (): Promise { - return this._baseIndexer.getSyncStatus(); - } - - async getBlock (blockHash: string): Promise { - return this._baseIndexer.getBlock(blockHash); - } - - async getEvent (id: string): Promise { - return this._baseIndexer.getEvent(id); - } - - async saveEventEntity (dbEvent: Event): Promise { - const dbTx = await this._db.createTransactionRunner(); - let res; - - try { - res = this._db.saveEventEntity(dbTx, dbEvent); - await dbTx.commitTransaction(); - } catch (error) { - await dbTx.rollbackTransaction(); - throw error; - } finally { - await dbTx.release(); - } - - return res; - } - - async getBlockProgress (blockHash: string): Promise { - return this._baseIndexer.getBlockProgress(blockHash); - } - - async getBlocksAtHeight (height: number, isPruned: boolean): Promise { - return this._baseIndexer.getBlocksAtHeight(height, isPruned); - } - - async markBlocksAsPruned (blocks: BlockProgress[]): Promise { - return this._baseIndexer.markBlocksAsPruned(blocks); - } - - async updateBlockProgress (blockHash: string, lastProcessedEventIndex: number): Promise { - return this._baseIndexer.updateBlockProgress(blockHash, lastProcessedEventIndex); - } - - async getProcessedBlockCountForRange (fromBlockNumber: number, toBlockNumber: number): Promise<{ expected: number, actual: number }> { - return this._db.getProcessedBlockCountForRange(fromBlockNumber, toBlockNumber); - } - - async getEventsInRange (fromBlockNumber: number, toBlockNumber: number): Promise> { - if (toBlockNumber <= fromBlockNumber) { - throw new Error('toBlockNumber should be greater than fromBlockNumber'); - } - - if ((toBlockNumber - fromBlockNumber) > MAX_EVENTS_BLOCK_RANGE) { - throw new Error(`Max range (${MAX_EVENTS_BLOCK_RANGE}) exceeded`); - } - - return this._db.getEventsInRange(fromBlockNumber, toBlockNumber); - } - - async position (blockHash: string, tokenId: string): Promise { - const nfpmContract = await this._db.getLatestContract('nfpm'); - assert(nfpmContract, 'No NFPM contract watched.'); - const { value, proof } = await this._getStorageValue(nfpmStorageLayout, blockHash, nfpmContract.address, '_positions', BigInt(tokenId)); - - return { - ...value, - proof - }; - } - - async poolIdToPoolKey (blockHash: string, poolId: string): Promise { - const nfpmContract = await this._db.getLatestContract('nfpm'); - assert(nfpmContract, 'No NFPM contract watched.'); - const { value, proof } = await this._getStorageValue(nfpmStorageLayout, blockHash, nfpmContract.address, '_poolIdToPoolKey', BigInt(poolId)); - - return { - ...value, - proof - }; - } - - async getPool (blockHash: string, token0: string, token1: string, fee: string): Promise { - const factoryContract = await this._db.getLatestContract('factory'); - assert(factoryContract, 'No Factory contract watched.'); - const { value, proof } = await this._getStorageValue(factoryStorageLayout, blockHash, factoryContract.address, 'getPool', token0, token1, BigInt(fee)); - - return { - pool: value, - proof - }; - } - - async getContract (type: string): Promise { - const contract = await this._db.getLatestContract(type); - return contract; - } - - async getAncestorAtDepth (blockHash: string, depth: number): Promise { - return this._baseIndexer.getAncestorAtDepth(blockHash, depth); - } - // TODO: Move into base/class or framework package. async _getStorageValue (storageLayout: StorageLayout, blockHash: string, token: string, variable: string, ...mappingKeys: any[]): Promise { return getStorageValue( diff --git a/packages/uni-watcher/src/job-runner.ts b/packages/uni-watcher/src/job-runner.ts index 96d2f00c..32b6593e 100644 --- a/packages/uni-watcher/src/job-runner.ts +++ b/packages/uni-watcher/src/job-runner.ts @@ -10,7 +10,15 @@ import debug from 'debug'; import { getCache } from '@vulcanize/cache'; import { EthClient } from '@vulcanize/ipld-eth-client'; -import { getConfig, JobQueue, JobRunner as BaseJobRunner, QUEUE_BLOCK_PROCESSING, QUEUE_EVENT_PROCESSING, QUEUE_CHAIN_PRUNING } from '@vulcanize/util'; +import { + getConfig, + JobQueue, + JobRunner as BaseJobRunner, + QUEUE_BLOCK_PROCESSING, + QUEUE_EVENT_PROCESSING, + QUEUE_CHAIN_PRUNING, + JobQueueConfig +} from '@vulcanize/util'; import { Indexer } from './indexer'; import { Database } from './database'; @@ -22,11 +30,13 @@ export class JobRunner { _indexer: Indexer _jobQueue: JobQueue _baseJobRunner: BaseJobRunner + _jobQueueConfig: JobQueueConfig - constructor (indexer: Indexer, jobQueue: JobQueue) { + constructor (jobQueueConfig: JobQueueConfig, indexer: Indexer, jobQueue: JobQueue) { this._indexer = indexer; this._jobQueue = jobQueue; - this._baseJobRunner = new BaseJobRunner(this._indexer, this._jobQueue); + this._jobQueueConfig = jobQueueConfig; + this._baseJobRunner = new BaseJobRunner(this._jobQueueConfig, this._indexer, this._jobQueue); } async start (): Promise { @@ -39,13 +49,6 @@ export class JobRunner { await this._jobQueue.subscribe(QUEUE_BLOCK_PROCESSING, async (job) => { await this._baseJobRunner.processBlock(job); - const { data: { blockHash, blockNumber, parentHash, timestamp } } = job; - - const events = await this._indexer.getOrFetchBlockEvents({ blockHash, blockNumber, parentHash, blockTimestamp: timestamp }); - for (let ei = 0; ei < events.length; ei++) { - await this._jobQueue.pushJob(QUEUE_EVENT_PROCESSING, { id: events[ei].id, publish: true }); - } - await this._jobQueue.markComplete(job); }); } @@ -130,13 +133,13 @@ export const main = async (): Promise => { assert(jobQueueConfig, 'Missing job queue config'); - const { dbConnectionString, maxCompletionLag } = jobQueueConfig; + const { dbConnectionString, maxCompletionLagInSecs } = jobQueueConfig; assert(dbConnectionString, 'Missing job queue db connection string'); - const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag }); + const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs }); await jobQueue.start(); - const jobRunner = new JobRunner(indexer, jobQueue); + const jobRunner = new JobRunner(jobQueueConfig, indexer, jobQueue); await jobRunner.start(); }; diff --git a/packages/uni-watcher/src/queries.ts b/packages/uni-watcher/src/queries.ts index 5b6abf91..41b29f32 100644 --- a/packages/uni-watcher/src/queries.ts +++ b/packages/uni-watcher/src/queries.ts @@ -167,3 +167,10 @@ query queryGetContract($type: String!) { } } `; + +export const queryEventsInRange = gql` +query getEventsInRange($fromBlockNumber: Int!, $toBlockNumber: Int!) { + eventsInRange(fromBlockNumber: $fromBlockNumber, toBlockNumber: $toBlockNumber) + ${resultEvent} +} +`; diff --git a/packages/uni-watcher/src/server.ts b/packages/uni-watcher/src/server.ts index 142bc7da..d0385790 100644 --- a/packages/uni-watcher/src/server.ts +++ b/packages/uni-watcher/src/server.ts @@ -73,10 +73,10 @@ export const main = async (): Promise => { assert(jobQueueConfig, 'Missing job queue config'); - const { dbConnectionString, maxCompletionLag } = jobQueueConfig; + const { dbConnectionString, maxCompletionLagInSecs } = jobQueueConfig; assert(dbConnectionString, 'Missing job queue db connection string'); - const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag }); + const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs }); await jobQueue.start(); const eventWatcher = new EventWatcher(ethClient, indexer, pubsub, jobQueue); diff --git a/packages/util/src/config.ts b/packages/util/src/config.ts index f45922c4..dc8d99c6 100644 --- a/packages/util/src/config.ts +++ b/packages/util/src/config.ts @@ -14,8 +14,8 @@ const log = debug('vulcanize:config'); export interface JobQueueConfig { dbConnectionString: string; - maxCompletionLag: number; - jobDelay?: number; + maxCompletionLagInSecs: number; + jobDelayInMilliSecs?: number; } export interface Config { diff --git a/packages/util/src/database.ts b/packages/util/src/database.ts index 68268e2a..20380e54 100644 --- a/packages/util/src/database.ts +++ b/packages/util/src/database.ts @@ -5,9 +5,12 @@ import assert from 'assert'; import { Connection, ConnectionOptions, createConnection, DeepPartial, FindConditions, In, QueryRunner, Repository } from 'typeorm'; import { SnakeNamingStrategy } from 'typeorm-naming-strategies'; +import _ from 'lodash'; import { BlockProgressInterface, EventInterface, SyncStatusInterface } from './types'; +const UNKNOWN_EVENT_NAME = '__unknown__'; + export class Database { _config: ConnectionOptions _conn!: Connection @@ -245,4 +248,35 @@ export class Database { return ancestorBlockHash; } + + async getProcessedBlockCountForRange (repo: Repository, fromBlockNumber: number, toBlockNumber: number): Promise<{ expected: number, actual: number }> { + const blockNumbers = _.range(fromBlockNumber, toBlockNumber + 1); + const expected = blockNumbers.length; + + const { count: actual } = await repo + .createQueryBuilder('block_progress') + .select('COUNT(DISTINCT(block_number))', 'count') + .where('block_number IN (:...blockNumbers) AND is_complete = :isComplete', { blockNumbers, isComplete: true }) + .getRawOne(); + + return { expected, actual: parseInt(actual) }; + } + + async getEventsInRange (repo: Repository, fromBlockNumber: number, toBlockNumber: number): Promise> { + const events = repo.createQueryBuilder('event') + .innerJoinAndSelect('event.block', 'block') + .where('block_number >= :fromBlockNumber AND block_number <= :toBlockNumber AND event_name <> :eventName', { + fromBlockNumber, + toBlockNumber, + eventName: UNKNOWN_EVENT_NAME + }) + .addOrderBy('event.id', 'ASC') + .getMany(); + + return events; + } + + async saveEventEntity (repo: Repository, entity: EventInterface): Promise { + return await repo.save(entity); + } } diff --git a/packages/util/src/indexer.ts b/packages/util/src/indexer.ts index 5a01994f..2e1486fa 100644 --- a/packages/util/src/indexer.ts +++ b/packages/util/src/indexer.ts @@ -10,6 +10,8 @@ import { EthClient } from '@vulcanize/ipld-eth-client'; import { BlockProgressInterface, DatabaseInterface, EventInterface, SyncStatusInterface } from './types'; +const MAX_EVENTS_BLOCK_RANGE = 1000; + const log = debug('vulcanize:indexer'); export class Indexer { @@ -159,4 +161,37 @@ export class Indexer { async getAncestorAtDepth (blockHash: string, depth: number): Promise { return this._db.getAncestorAtDepth(blockHash, depth); } + + async saveEventEntity (dbEvent: EventInterface): Promise { + const dbTx = await this._db.createTransactionRunner(); + let res; + + try { + res = this._db.saveEventEntity(dbTx, dbEvent); + await dbTx.commitTransaction(); + } catch (error) { + await dbTx.rollbackTransaction(); + throw error; + } finally { + await dbTx.release(); + } + + return res; + } + + async getProcessedBlockCountForRange (fromBlockNumber: number, toBlockNumber: number): Promise<{ expected: number, actual: number }> { + return this._db.getProcessedBlockCountForRange(fromBlockNumber, toBlockNumber); + } + + async getEventsInRange (fromBlockNumber: number, toBlockNumber: number): Promise> { + if (toBlockNumber <= fromBlockNumber) { + throw new Error('toBlockNumber should be greater than fromBlockNumber'); + } + + if ((toBlockNumber - fromBlockNumber) > MAX_EVENTS_BLOCK_RANGE) { + throw new Error(`Max range (${MAX_EVENTS_BLOCK_RANGE}) exceeded`); + } + + return this._db.getEventsInRange(fromBlockNumber, toBlockNumber); + } } diff --git a/packages/util/src/job-runner.ts b/packages/util/src/job-runner.ts index 88eb4ea1..c69a9d31 100644 --- a/packages/util/src/job-runner.ts +++ b/packages/util/src/job-runner.ts @@ -4,8 +4,10 @@ import assert from 'assert'; import debug from 'debug'; +import { wait } from '.'; -import { MAX_REORG_DEPTH, QUEUE_BLOCK_PROCESSING } from './constants'; +import { JobQueueConfig } from './config'; +import { MAX_REORG_DEPTH, QUEUE_BLOCK_PROCESSING, QUEUE_EVENT_PROCESSING } from './constants'; import { JobQueue } from './job-queue'; import { EventInterface, IndexerInterface } from './types'; @@ -14,14 +16,16 @@ const log = debug('vulcanize:job-runner'); export class JobRunner { _indexer: IndexerInterface _jobQueue: JobQueue + _jobQueueConfig: JobQueueConfig - constructor (indexer: IndexerInterface, jobQueue: JobQueue) { + constructor (jobQueueConfig: JobQueueConfig, indexer: IndexerInterface, jobQueue: JobQueue) { this._indexer = indexer; this._jobQueue = jobQueue; + this._jobQueueConfig = jobQueueConfig; } async processBlock (job: any): Promise { - const { data: { blockHash, blockNumber, parentHash, priority } } = job; + const { data: { blockHash, blockNumber, parentHash, priority, timestamp } } = job; log(`Processing block number ${blockNumber} hash ${blockHash} `); @@ -63,6 +67,21 @@ export class JobRunner { throw new Error(message); } } + + // Check if block is being already processed. + const blockProgress = await this._indexer.getBlockProgress(blockHash); + + if (!blockProgress) { + const { jobDelayInMilliSecs = 0 } = this._jobQueueConfig; + + // Delay required to process block. + await wait(jobDelayInMilliSecs); + const events = await this._indexer.getOrFetchBlockEvents({ blockHash, blockNumber, parentHash, blockTimestamp: timestamp }); + + for (let ei = 0; ei < events.length; ei++) { + await this._jobQueue.pushJob(QUEUE_EVENT_PROCESSING, { id: events[ei].id, publish: true }); + } + } } async processEvent (job: any): Promise { diff --git a/packages/util/src/types.ts b/packages/util/src/types.ts index fe2cc3fa..32946503 100644 --- a/packages/util/src/types.ts +++ b/packages/util/src/types.ts @@ -47,6 +47,7 @@ export interface IndexerInterface { getBlocksAtHeight (height: number, isPruned: boolean): Promise; getBlockEvents (blockHash: string): Promise> getAncestorAtDepth (blockHash: string, depth: number): Promise + getOrFetchBlockEvents (block: DeepPartial): Promise> updateBlockProgress (blockHash: string, lastProcessedEventIndex: number): Promise updateSyncStatusChainHead (blockHash: string, blockNumber: number): Promise updateSyncStatusIndexedBlock (blockHash: string, blockNumber: number): Promise @@ -68,11 +69,14 @@ export interface DatabaseInterface { getEvent (id: string): Promise getSyncStatus (queryRunner: QueryRunner): Promise getAncestorAtDepth (blockHash: string, depth: number): Promise + getProcessedBlockCountForRange (fromBlockNumber: number, toBlockNumber: number): Promise<{ expected: number, actual: number }>; + getEventsInRange (fromBlockNumber: number, toBlockNumber: number): Promise>; markBlocksAsPruned (queryRunner: QueryRunner, blocks: BlockProgressInterface[]): Promise; updateBlockProgress (queryRunner: QueryRunner, blockHash: string, lastProcessedEventIndex: number): Promise updateSyncStatusIndexedBlock (queryRunner: QueryRunner, blockHash: string, blockNumber: number): Promise; updateSyncStatusChainHead (queryRunner: QueryRunner, blockHash: string, blockNumber: number): Promise; updateSyncStatusCanonicalBlock (queryRunner: QueryRunner, blockHash: string, blockNumber: number): Promise; saveEvents (queryRunner: QueryRunner, block: DeepPartial, events: DeepPartial[]): Promise; - removeEntities (queryRunner: QueryRunner, entity: new () => Entity, findConditions?: FindConditions): Promise + saveEventEntity (queryRunner: QueryRunner, entity: EventInterface): Promise; + removeEntities (queryRunner: QueryRunner, entity: new () => Entity, findConditions?: FindConditions): Promise; }