Refactoring, extract uni-info-watcher GQL client (#226)

* Refactor common code in uni-watcher and uni-info-watcher.

* Implement client.ts in uni-info-watcher for queries and use in smoke-test.

* Disable cache in graphql client.

Co-authored-by: nabarun <nabarun@deepstacksoft.com>
This commit is contained in:
Ashwin Phatak 2021-08-24 11:55:29 +05:30 committed by GitHub
parent 885a55e513
commit 3ff2fdf11b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
31 changed files with 1126 additions and 888 deletions

View File

@ -35,4 +35,4 @@
[jobQueue] [jobQueue]
dbConnectionString = "postgres://postgres:postgres@localhost/address-watcher-job-queue" dbConnectionString = "postgres://postgres:postgres@localhost/address-watcher-job-queue"
maxCompletionLag = 300 maxCompletionLagInSecs = 300

View File

@ -67,10 +67,10 @@ export const main = async (): Promise<any> => {
assert(jobQueueConfig, 'Missing job queue config'); assert(jobQueueConfig, 'Missing job queue config');
const { dbConnectionString, maxCompletionLag } = jobQueueConfig; const { dbConnectionString, maxCompletionLagInSecs } = jobQueueConfig;
assert(dbConnectionString, 'Missing job queue db connection string'); assert(dbConnectionString, 'Missing job queue db connection string');
const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag }); const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs });
await jobQueue.start(); await jobQueue.start();
for (let blockNumber = argv.startBlock; blockNumber <= argv.endBlock; blockNumber++) { for (let blockNumber = argv.startBlock; blockNumber <= argv.endBlock; blockNumber++) {

View File

@ -59,10 +59,10 @@ export const main = async (): Promise<any> => {
assert(jobQueueConfig, 'Missing job queue config'); assert(jobQueueConfig, 'Missing job queue config');
const { dbConnectionString, maxCompletionLag } = jobQueueConfig; const { dbConnectionString, maxCompletionLagInSecs } = jobQueueConfig;
assert(dbConnectionString, 'Missing job queue db connection string'); assert(dbConnectionString, 'Missing job queue db connection string');
const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag }); const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs });
await jobQueue.start(); await jobQueue.start();
await jobQueue.subscribe(QUEUE_TX_TRACING, async (job) => { await jobQueue.subscribe(QUEUE_TX_TRACING, async (job) => {

View File

@ -67,11 +67,11 @@ export const main = async (): Promise<any> => {
assert(jobQueueConfig, 'Missing job queue config'); assert(jobQueueConfig, 'Missing job queue config');
const { dbConnectionString, maxCompletionLag } = jobQueueConfig; const { dbConnectionString, maxCompletionLagInSecs } = jobQueueConfig;
assert(dbConnectionString, 'Missing job queue db connection string'); assert(dbConnectionString, 'Missing job queue db connection string');
assert(dbConnectionString, 'Missing job queue max completion lag time (seconds)'); assert(dbConnectionString, 'Missing job queue max completion lag time (seconds)');
const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag }); const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs });
await jobQueue.start(); await jobQueue.start();
// Note: In-memory pubsub works fine for now, as each watcher is a single process anyway. // Note: In-memory pubsub works fine for now, as each watcher is a single process anyway.

View File

@ -8,7 +8,17 @@ import fetch from 'cross-fetch';
import { SubscriptionClient } from 'subscriptions-transport-ws'; import { SubscriptionClient } from 'subscriptions-transport-ws';
import ws from 'ws'; import ws from 'ws';
import { ApolloClient, NormalizedCacheObject, split, HttpLink, InMemoryCache, DocumentNode, TypedDocumentNode, from } from '@apollo/client/core'; import {
ApolloClient,
NormalizedCacheObject,
split,
HttpLink,
InMemoryCache,
DocumentNode,
TypedDocumentNode,
from,
DefaultOptions
} from '@apollo/client/core';
import { getMainDefinition } from '@apollo/client/utilities'; import { getMainDefinition } from '@apollo/client/utilities';
import { WebSocketLink } from '@apollo/client/link/ws'; import { WebSocketLink } from '@apollo/client/link/ws';
@ -71,9 +81,19 @@ export class GraphQLClient {
link = splitLink; link = splitLink;
} }
const defaultOptions: DefaultOptions = {
watchQuery: {
fetchPolicy: 'no-cache'
},
query: {
fetchPolicy: 'no-cache'
}
};
this._client = new ApolloClient({ this._client = new ApolloClient({
link, link,
cache: new InMemoryCache() cache: new InMemoryCache(),
defaultOptions
}); });
} }

View File

@ -41,5 +41,5 @@
[jobQueue] [jobQueue]
dbConnectionString = "postgres://postgres:postgres@localhost/uni-info-watcher-job-queue" dbConnectionString = "postgres://postgres:postgres@localhost/uni-info-watcher-job-queue"
maxCompletionLag = 300 maxCompletionLagInSecs = 300
jobDelay = 1000 jobDelayInMilliSecs = 1000

View File

@ -0,0 +1,241 @@
//
// Copyright 2021 Vulcanize, Inc.
//
import { gql } from '@apollo/client/core';
import { GraphQLClient, GraphQLConfig } from '@vulcanize/ipld-eth-client';
import { BlockHeight, OrderDirection } from './indexer';
import {
queryBundles,
queryBurns,
queryFactories,
queryMints,
queryPoolById,
queryPoolDayDatas,
queryPools,
queryPositions,
querySwaps,
queryTicks,
queryToken,
queryTokenDayDatas,
queryTokenHourDatas,
queryTransactions,
queryUniswapDayDatas
} from './queries';
export class Client {
_config: GraphQLConfig;
_client: GraphQLClient;
constructor (config: GraphQLConfig) {
this._config = config;
this._client = new GraphQLClient(config);
}
async getToken (tokenId: string, block?: BlockHeight): Promise<any> {
const { token } = await this._client.query(
gql(queryToken),
{
block,
id: tokenId
}
);
return token;
}
async getFactories (first?: number, block?: BlockHeight): Promise<any> {
const { factories } = await this._client.query(
gql(queryFactories),
{
block,
first
}
);
return factories;
}
async getBundles (first?: number, block?: BlockHeight): Promise<any> {
const { bundles } = await this._client.query(
gql(queryBundles),
{
block,
first
}
);
return bundles;
}
async getPoolById (id: string): Promise<any> {
const { pool } = await this._client.query(
gql(queryPoolById),
{
id
}
);
return pool;
}
async getTicks (where?: any, skip?: number, first?: number, block?: BlockHeight): Promise<any> {
const { ticks } = await this._client.query(
gql(queryTicks),
{
where,
skip,
first,
block
}
);
return ticks;
}
async getPools (where?: any, first?: number, orderBy?: string, orderDirection?: OrderDirection): Promise<any> {
const { pools } = await this._client.query(
gql(queryPools),
{
where,
first,
orderBy,
orderDirection
}
);
return pools;
}
async getUniswapDayDatas (where?: any, skip?: number, first?: number, orderBy?: string, orderDirection?: OrderDirection): Promise<any> {
const { uniswapDayDatas } = await this._client.query(
gql(queryUniswapDayDatas),
{
where,
skip,
first,
orderBy,
orderDirection
}
);
return uniswapDayDatas;
}
async getPoolDayDatas (where?: any, skip?: number, first?: number, orderBy?: string, orderDirection?: OrderDirection): Promise<any> {
const { poolDayDatas } = await this._client.query(
gql(queryPoolDayDatas),
{
where,
skip,
first,
orderBy,
orderDirection
}
);
return poolDayDatas;
}
async getTokenDayDatas (where?: any, skip?: number, first?: number, orderBy?: string, orderDirection?: OrderDirection): Promise<any> {
const { tokenDayDatas } = await this._client.query(
gql(queryTokenDayDatas),
{
where,
skip,
first,
orderBy,
orderDirection
}
);
return tokenDayDatas;
}
async getTokenHourDatas (where?: any, skip?: number, first?: number, orderBy?: string, orderDirection?: OrderDirection): Promise<any> {
const { tokenHourDatas } = await this._client.query(
gql(queryTokenHourDatas),
{
where,
skip,
first,
orderBy,
orderDirection
}
);
return tokenHourDatas;
}
async getMints (where?: any, first?: number, orderBy?: string, orderDirection?: OrderDirection): Promise<any> {
const { mints } = await this._client.query(
gql(queryMints),
{
where,
first,
orderBy,
orderDirection
}
);
return mints;
}
async getBurns (where?: any, first?: number, orderBy?: string, orderDirection?: OrderDirection): Promise<any> {
const { burns } = await this._client.query(
gql(queryBurns),
{
where,
first,
orderBy,
orderDirection
}
);
return burns;
}
async getSwaps (where?: any, first?: number, orderBy?: string, orderDirection?: OrderDirection): Promise<any> {
const { swaps } = await this._client.query(
gql(querySwaps),
{
where,
first,
orderBy,
orderDirection
}
);
return swaps;
}
async getTransactions (first?: number, { orderBy, mintOrderBy, burnOrderBy, swapOrderBy }: {[key: string]: string} = {}, orderDirection?: OrderDirection): Promise<any> {
const { transactions } = await this._client.query(
gql(queryTransactions),
{
first,
orderBy,
orderDirection,
mintOrderBy,
burnOrderBy,
swapOrderBy
}
);
return transactions;
}
async getPositions (where?: any, first?: number): Promise<any> {
const { positions } = await this._client.query(
gql(queryPositions),
{
where,
first
}
);
return positions;
}
}

View File

@ -88,10 +88,6 @@ export class Database implements DatabaseInterface {
return this._baseDatabase.close(); return this._baseDatabase.close();
} }
async createTransactionRunner (): Promise<QueryRunner> {
return this._baseDatabase.createTransactionRunner();
}
async getFactory (queryRunner: QueryRunner, { id, blockHash }: DeepPartial<Factory>): Promise<Factory | undefined> { async getFactory (queryRunner: QueryRunner, { id, blockHash }: DeepPartial<Factory>): Promise<Factory | undefined> {
const repo = queryRunner.manager.getRepository(Factory); const repo = queryRunner.manager.getRepository(Factory);
const whereOptions: FindConditions<Factory> = { id }; const whereOptions: FindConditions<Factory> = { id };
@ -651,12 +647,6 @@ export class Database implements DatabaseInterface {
return numRows > 0; return numRows > 0;
} }
async getBlockEvents (blockHash: string): Promise<Event[]> {
const repo = this._conn.getRepository(Event);
return this._baseDatabase.getBlockEvents(repo, blockHash);
}
async getEvents ({ blockHash, token }: { blockHash: string, token: string }): Promise<Event[]> { async getEvents ({ blockHash, token }: { blockHash: string, token: string }): Promise<Event[]> {
return this._conn.getRepository(Event) return this._conn.getRepository(Event)
.createQueryBuilder('event') .createQueryBuilder('event')
@ -682,6 +672,33 @@ export class Database implements DatabaseInterface {
.getMany(); .getMany();
} }
async createTransactionRunner (): Promise<QueryRunner> {
return this._baseDatabase.createTransactionRunner();
}
async getProcessedBlockCountForRange (fromBlockNumber: number, toBlockNumber: number): Promise<{ expected: number, actual: number }> {
const repo = this._conn.getRepository(BlockProgress);
return this._baseDatabase.getProcessedBlockCountForRange(repo, fromBlockNumber, toBlockNumber);
}
async getEventsInRange (fromBlockNumber: number, toBlockNumber: number): Promise<Array<Event>> {
const repo = this._conn.getRepository(Event);
return this._baseDatabase.getEventsInRange(repo, fromBlockNumber, toBlockNumber);
}
async saveEventEntity (queryRunner: QueryRunner, entity: Event): Promise<Event> {
const repo = queryRunner.manager.getRepository(Event);
return this._baseDatabase.saveEventEntity(repo, entity);
}
async getBlockEvents (blockHash: string): Promise<Event[]> {
const repo = this._conn.getRepository(Event);
return this._baseDatabase.getBlockEvents(repo, blockHash);
}
async saveEvents (queryRunner: QueryRunner, block: DeepPartial<BlockProgress>, events: DeepPartial<Event>[]): Promise<void> { async saveEvents (queryRunner: QueryRunner, block: DeepPartial<BlockProgress>, events: DeepPartial<Event>[]): Promise<void> {
const blockRepo = queryRunner.manager.getRepository(BlockProgress); const blockRepo = queryRunner.manager.getRepository(BlockProgress);
const eventRepo = queryRunner.manager.getRepository(Event); const eventRepo = queryRunner.manager.getRepository(Event);

View File

@ -77,10 +77,10 @@ export const main = async (): Promise<any> => {
const indexer = new Indexer(db, uniClient, erc20Client, ethClient); const indexer = new Indexer(db, uniClient, erc20Client, ethClient);
assert(jobQueueConfig, 'Missing job queue config'); assert(jobQueueConfig, 'Missing job queue config');
const { dbConnectionString, maxCompletionLag } = jobQueueConfig; const { dbConnectionString, maxCompletionLagInSecs } = jobQueueConfig;
assert(dbConnectionString, 'Missing job queue db connection string'); assert(dbConnectionString, 'Missing job queue db connection string');
const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag }); const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs });
await jobQueue.start(); await jobQueue.start();
const eventWatcher = new EventWatcher(ethClient, indexer, pubsub, jobQueue); const eventWatcher = new EventWatcher(ethClient, indexer, pubsub, jobQueue);

View File

@ -90,14 +90,6 @@ export class Indexer implements IndexerInterface {
}; };
} }
async getOrFetchBlockEvents (block: DeepPartial<BlockProgress>): Promise<Array<Event>> {
return this._baseIndexer.getOrFetchBlockEvents(block, this._fetchAndSaveEvents.bind(this));
}
async getBlockEvents (blockHash: string): Promise<Array<Event>> {
return this._baseIndexer.getBlockEvents(blockHash);
}
async processEvent (dbEvent: Event): Promise<void> { async processEvent (dbEvent: Event): Promise<void> {
const resultEvent = this.getResultEvent(dbEvent); const resultEvent = this.getResultEvent(dbEvent);
@ -159,26 +151,6 @@ export class Indexer implements IndexerInterface {
log('Event processing completed for', eventName); log('Event processing completed for', eventName);
} }
async updateSyncStatusIndexedBlock (blockHash: string, blockNumber: number): Promise<SyncStatus> {
return this._baseIndexer.updateSyncStatusIndexedBlock(blockHash, blockNumber);
}
async updateSyncStatusChainHead (blockHash: string, blockNumber: number): Promise<SyncStatus> {
return this._baseIndexer.updateSyncStatusChainHead(blockHash, blockNumber);
}
async updateSyncStatusCanonicalBlock (blockHash: string, blockNumber: number): Promise<SyncStatus> {
return this._baseIndexer.updateSyncStatusCanonicalBlock(blockHash, blockNumber);
}
async getSyncStatus (): Promise<SyncStatus | undefined> {
return this._baseIndexer.getSyncStatus();
}
async getBlock (blockHash: string): Promise<any> {
return this._baseIndexer.getBlock(blockHash);
}
async getBlocks (where: { [key: string]: any } = {}, queryOptions: QueryOptions): Promise<any> { async getBlocks (where: { [key: string]: any } = {}, queryOptions: QueryOptions): Promise<any> {
if (where.timestamp_gt) { if (where.timestamp_gt) {
where.blockTimestamp_gt = where.timestamp_gt; where.blockTimestamp_gt = where.timestamp_gt;
@ -203,26 +175,10 @@ export class Indexer implements IndexerInterface {
})); }));
} }
async getEvent (id: string): Promise<Event | undefined> {
return this._baseIndexer.getEvent(id);
}
async getBlockProgress (blockHash: string): Promise<BlockProgress | undefined> {
return this._baseIndexer.getBlockProgress(blockHash);
}
async getBlocksAtHeight (height: number, isPruned: boolean): Promise<BlockProgress[]> {
return this._baseIndexer.getBlocksAtHeight(height, isPruned);
}
async markBlocksAsPruned (blocks: BlockProgress[]): Promise<void> { async markBlocksAsPruned (blocks: BlockProgress[]): Promise<void> {
return this._baseIndexer.markBlocksAsPruned(blocks); return this._baseIndexer.markBlocksAsPruned(blocks);
} }
async updateBlockProgress (blockHash: string, lastProcessedEventIndex: number): Promise<void> {
return this._baseIndexer.updateBlockProgress(blockHash, lastProcessedEventIndex);
}
async getBundle (id: string, block: BlockHeight): Promise<Bundle | undefined> { async getBundle (id: string, block: BlockHeight): Promise<Bundle | undefined> {
const dbTx = await this._db.createTransactionRunner(); const dbTx = await this._db.createTransactionRunner();
let res; let res;
@ -324,6 +280,50 @@ export class Indexer implements IndexerInterface {
return this._baseIndexer.getAncestorAtDepth(blockHash, depth); return this._baseIndexer.getAncestorAtDepth(blockHash, depth);
} }
async getOrFetchBlockEvents (block: DeepPartial<BlockProgress>): Promise<Array<Event>> {
return this._baseIndexer.getOrFetchBlockEvents(block, this._fetchAndSaveEvents.bind(this));
}
async getBlockEvents (blockHash: string): Promise<Array<Event>> {
return this._baseIndexer.getBlockEvents(blockHash);
}
async updateSyncStatusIndexedBlock (blockHash: string, blockNumber: number): Promise<SyncStatus> {
return this._baseIndexer.updateSyncStatusIndexedBlock(blockHash, blockNumber);
}
async updateSyncStatusChainHead (blockHash: string, blockNumber: number): Promise<SyncStatus> {
return this._baseIndexer.updateSyncStatusChainHead(blockHash, blockNumber);
}
async updateSyncStatusCanonicalBlock (blockHash: string, blockNumber: number): Promise<SyncStatus> {
return this._baseIndexer.updateSyncStatusCanonicalBlock(blockHash, blockNumber);
}
async getSyncStatus (): Promise<SyncStatus | undefined> {
return this._baseIndexer.getSyncStatus();
}
async getBlock (blockHash: string): Promise<any> {
return this._baseIndexer.getBlock(blockHash);
}
async getEvent (id: string): Promise<Event | undefined> {
return this._baseIndexer.getEvent(id);
}
async getBlockProgress (blockHash: string): Promise<BlockProgress | undefined> {
return this._baseIndexer.getBlockProgress(blockHash);
}
async getBlocksAtHeight (height: number, isPruned: boolean): Promise<BlockProgress[]> {
return this._baseIndexer.getBlocksAtHeight(height, isPruned);
}
async updateBlockProgress (blockHash: string, lastProcessedEventIndex: number): Promise<void> {
return this._baseIndexer.updateBlockProgress(blockHash, lastProcessedEventIndex);
}
async _fetchAndSaveEvents (block: DeepPartial<BlockProgress>): Promise<void> { async _fetchAndSaveEvents (block: DeepPartial<BlockProgress>): Promise<void> {
assert(block.blockHash); assert(block.blockHash);
const events = await this._uniClient.getEvents(block.blockHash); const events = await this._uniClient.getEvents(block.blockHash);

View File

@ -10,9 +10,17 @@ import debug from 'debug';
import { Client as ERC20Client } from '@vulcanize/erc20-watcher'; import { Client as ERC20Client } from '@vulcanize/erc20-watcher';
import { Client as UniClient } from '@vulcanize/uni-watcher'; import { Client as UniClient } from '@vulcanize/uni-watcher';
import { getConfig, JobQueue, QUEUE_BLOCK_PROCESSING, QUEUE_EVENT_PROCESSING, QUEUE_CHAIN_PRUNING, JobRunner as BaseJobRunner, wait, JobQueueConfig } from '@vulcanize/util';
import { getCache } from '@vulcanize/cache'; import { getCache } from '@vulcanize/cache';
import { EthClient } from '@vulcanize/ipld-eth-client'; import { EthClient } from '@vulcanize/ipld-eth-client';
import {
getConfig,
JobQueue,
QUEUE_BLOCK_PROCESSING,
QUEUE_EVENT_PROCESSING,
QUEUE_CHAIN_PRUNING,
JobRunner as BaseJobRunner,
JobQueueConfig
} from '@vulcanize/util';
import { Indexer } from './indexer'; import { Indexer } from './indexer';
import { Database } from './database'; import { Database } from './database';
@ -29,7 +37,7 @@ export class JobRunner {
this._indexer = indexer; this._indexer = indexer;
this._jobQueue = jobQueue; this._jobQueue = jobQueue;
this._jobQueueConfig = jobQueueConfig; this._jobQueueConfig = jobQueueConfig;
this._baseJobRunner = new BaseJobRunner(this._indexer, this._jobQueue); this._baseJobRunner = new BaseJobRunner(this._jobQueueConfig, this._indexer, this._jobQueue);
} }
async start (): Promise<void> { async start (): Promise<void> {
@ -42,24 +50,6 @@ export class JobRunner {
await this._jobQueue.subscribe(QUEUE_BLOCK_PROCESSING, async (job) => { await this._jobQueue.subscribe(QUEUE_BLOCK_PROCESSING, async (job) => {
await this._baseJobRunner.processBlock(job); await this._baseJobRunner.processBlock(job);
const { data: { blockHash, blockNumber, parentHash, timestamp } } = job;
// Check if block is being already processed.
// TODO: Debug issue block getting processed twice without this check. Can reproduce with NFPM.mint().
const blockProgress = await this._indexer.getBlockProgress(blockHash);
if (!blockProgress) {
const { jobDelay } = this._jobQueueConfig;
assert(jobDelay);
// Delay to allow uni-watcher to process block.
await wait(jobDelay);
const events = await this._indexer.getOrFetchBlockEvents({ blockHash, blockNumber, parentHash, blockTimestamp: timestamp });
for (let ei = 0; ei < events.length; ei++) {
await this._jobQueue.pushJob(QUEUE_EVENT_PROCESSING, { id: events[ei].id, publish: true });
}
}
await this._jobQueue.markComplete(job); await this._jobQueue.markComplete(job);
}); });
} }
@ -130,11 +120,10 @@ export const main = async (): Promise<any> => {
assert(jobQueueConfig, 'Missing job queue config'); assert(jobQueueConfig, 'Missing job queue config');
const { dbConnectionString, maxCompletionLag, jobDelay } = jobQueueConfig; const { dbConnectionString, maxCompletionLagInSecs } = jobQueueConfig;
assert(jobDelay, 'Missing job delay time');
assert(dbConnectionString, 'Missing job queue db connection string'); assert(dbConnectionString, 'Missing job queue db connection string');
const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag }); const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs });
await jobQueue.start(); await jobQueue.start();
const jobRunner = new JobRunner(jobQueueConfig, indexer, jobQueue); const jobRunner = new JobRunner(jobQueueConfig, indexer, jobQueue);

View File

@ -6,11 +6,11 @@ import 'mocha';
import { expect } from 'chai'; import { expect } from 'chai';
import { GraphQLClient } from 'graphql-request'; import { GraphQLClient } from 'graphql-request';
import { queryBundle } from '../queries'; import { queryBundles } from '../queries';
import { Data } from './data'; import { Data } from './data';
describe('server', () => { describe('server', () => {
const client = new GraphQLClient('http://localhost:3003/graphql'); const client = new GraphQLClient('http://localhost:3004/graphql');
const data = Data.getInstance(); const data = Data.getInstance();
it('query bundle', async () => { it('query bundle', async () => {
@ -21,9 +21,9 @@ describe('server', () => {
const { id, blockNumber, ethPriceUSD } = bundles[i]; const { id, blockNumber, ethPriceUSD } = bundles[i];
// Bundle query. // Bundle query.
const result = await client.request(queryBundle, { id, blockNumber }); const [bundle] = await client.request(queryBundles, { first: 1, block: { number: blockNumber } });
expect(result.bundle.id).to.equal(id); expect(bundle.id).to.equal(id);
expect(result.bundle.ethPriceUSD).to.equal(ethPriceUSD); expect(bundle.ethPriceUSD).to.equal(ethPriceUSD);
} }
}); });
}); });

View File

@ -4,11 +4,280 @@
import { gql } from 'graphql-request'; import { gql } from 'graphql-request';
export const queryBundle = gql` const resultPool = `
query getBundle($id: ID!, $blockNumber: Int!) { {
bundle(id: $id, block: { number: $blockNumber }) { id,
feeTier,
liquidity,
sqrtPrice,
tick,
token0 {
id
},
token0Price,
token1 {
id
},
token1Price,
totalValueLockedToken0,
totalValueLockedToken1,
totalValueLockedUSD,
txCount,
volumeUSD,
}
`;
export const queryToken = gql`
query queryToken($id: ID!, $block: Block_height) {
token(id: $id, block: $block) {
derivedETH
feesUSD
id
name
symbol
totalValueLocked
totalValueLockedUSD
txCount
volume
volumeUSD
}
}`;
export const queryFactories = gql`
query queryFactories($block: Block_height, $first: Int) {
factories(first: $first, block: $block) {
id
totalFeesUSD
totalValueLockedUSD
totalVolumeUSD
txCount
}
}`;
export const queryBundles = gql`
query queryBundles($block: Block_height, $first: Int) {
bundles(first: $first, block: $block) {
id id
ethPriceUSD ethPriceUSD
} }
} }`;
`;
// Getting Pool by id.
export const queryPoolById = gql`
query queryPoolById($id: ID!) {
pool(id: $id)
${resultPool}
}`;
export const queryTicks = gql`
query queryTicks($skip: Int, $first: Int, $where: Tick_filter, $block: Block_height) {
ticks(skip: $skip, first: $first, where: $where, block: $block) {
id
liquidityGross
liquidityNet
price0
price1
tickIdx
}
}`;
// Getting Pool(s).
export const queryPools = gql`
query queryPools($where: Pool_filter, $first: Int, $orderBy: Pool_orderBy, $orderDirection: OrderDirection) {
pools(where: $where, first: $first, orderBy: $orderBy, orderDirection: $orderDirection)
${resultPool}
}`;
// Getting UniswapDayData(s).
export const queryUniswapDayDatas = gql`
query queryUniswapDayDatas($first: Int, $skip: Int, $orderBy: UniswapDayData_orderBy, $orderDirection: OrderDirection, $where: UniswapDayData_filter) {
uniswapDayDatas(first: $first, skip: $skip, orderBy: $orderBy, orderDirection: $orderDirection, where: $where) {
id,
date,
tvlUSD,
volumeUSD
}
}`;
// Getting PoolDayData(s).
export const queryPoolDayDatas = gql`
query queryPoolDayDatas($first: Int, $skip: Int, $orderBy: PoolDayData_orderBy, $orderDirection: OrderDirection, $where: PoolDayData_filter) {
poolDayDatas(first: $first, skip: $skip, orderBy: $orderBy, orderDirection: $orderDirection, where: $where) {
id,
date,
tvlUSD,
volumeUSD
}
}`;
// Getting TokenDayDatas(s).
export const queryTokenDayDatas = gql`
query queryTokenDayData($first: Int, $skip: Int, $orderBy: TokenDayData_orderBy, $orderDirection: OrderDirection, $where: TokenDayData_filter) {
tokenDayDatas(first: $first, skip: $skip, orderBy: $orderBy, orderDirection: $orderDirection, where: $where) {
id,
date,
totalValueLockedUSD,
volumeUSD
}
}`;
// Getting TokenDayDatas(s).
export const queryTokenHourDatas = gql`
query queryTokenHourData($first: Int, $skip: Int, $orderBy: TokenHourData_orderBy, $orderDirection: OrderDirection, $where: TokenHourData_filter) {
tokenHourDatas(first: $first, skip: $skip, orderBy: $orderBy, orderDirection: $orderDirection, where: $where) {
id,
low,
high,
open,
close,
periodStartUnix
}
}`;
// Getting mint(s).
export const queryMints = gql`
query queryMints(
$first: Int,
$orderBy: Mint_orderBy,
$orderDirection: OrderDirection,
$where: Mint_filter) {
mints(
first: $first,
orderBy: $orderBy,
orderDirection: $orderDirection,
where: $where) {
amount0,
amount1,
amountUSD,
id,
origin,
owner,
sender,
timestamp,
pool {
id
},
transaction {
id
}
}
}`;
// Getting burns(s).
export const queryBurns = gql`
query queryBurns(
$first: Int,
$orderBy: Burn_orderBy,
$orderDirection: OrderDirection,
$where: Burn_filter) {
burns(
first: $first,
orderBy: $orderBy,
orderDirection: $orderDirection,
where: $where) {
amount0,
amount1,
amountUSD,
id,
origin,
owner,
timestamp,
pool {
id
},
transaction {
id
}
}
}`;
// Getting swap(s) .
export const querySwaps = gql`
query querySwaps(
$first: Int,
$orderBy: Swap_orderBy,
$orderDirection: OrderDirection,
$where: Swap_filter) {
swaps(
first: $first,
orderBy: $orderBy,
orderDirection: $orderDirection,
where: $where) {
amount0,
amount1,
amountUSD,
id,
origin,
timestamp,
pool {
id
},
transaction {
id
}
}
}`;
// Getting transactions(s).
export const queryTransactions = gql`
query queryTransactions(
$first: Int,
$orderBy: Transaction_orderBy,
$mintOrderBy: Mint_orderBy,
$burnOrderBy: Burn_orderBy,
$swapOrderBy: Swap_orderBy,
$orderDirection: OrderDirection) {
transactions(
first: $first,
orderBy: $orderBy,
orderDirection: $orderDirection) {
id,
mints( first: $first, orderBy: $mintOrderBy, orderDirection: $orderDirection) {
id,
timestamp
},
burns( first: $first, orderBy: $burnOrderBy, orderDirection: $orderDirection) {
id,
timestamp
},
swaps( first: $first, orderBy: $swapOrderBy, orderDirection: $orderDirection) {
id,
timestamp
},
timestamp
}
}`;
// Getting positions.
export const queryPositions = gql`
query queryPositions($first: Int, $where: Position_filter) {
positions(first: $first, where: $where) {
id,
pool {
id
},
token0 {
id
},
token1 {
id
},
tickLower {
id
},
tickUpper {
id
},
transaction {
id
},
liquidity,
depositedToken0,
depositedToken1,
collectedFeesToken0,
collectedFeesToken1,
owner,
feeGrowthInside0LastX128,
feeGrowthInside1LastX128
}
}`;

View File

@ -78,10 +78,10 @@ export const main = async (): Promise<any> => {
assert(jobQueueConfig, 'Missing job queue config'); assert(jobQueueConfig, 'Missing job queue config');
const { dbConnectionString, maxCompletionLag } = jobQueueConfig; const { dbConnectionString, maxCompletionLagInSecs } = jobQueueConfig;
assert(dbConnectionString, 'Missing job queue db connection string'); assert(dbConnectionString, 'Missing job queue db connection string');
const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag }); const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs });
await jobQueue.start(); await jobQueue.start();
const pubSub = new PubSub(); const pubSub = new PubSub();

View File

@ -4,7 +4,6 @@
import { expect } from 'chai'; import { expect } from 'chai';
import { ethers, Contract, Signer, constants } from 'ethers'; import { ethers, Contract, Signer, constants } from 'ethers';
import { request } from 'graphql-request';
import 'mocha'; import 'mocha';
import _ from 'lodash'; import _ from 'lodash';
@ -33,18 +32,7 @@ import {
abi as POOL_ABI abi as POOL_ABI
} from '@uniswap/v3-core/artifacts/contracts/UniswapV3Pool.sol/UniswapV3Pool.json'; } from '@uniswap/v3-core/artifacts/contracts/UniswapV3Pool.sol/UniswapV3Pool.json';
import { import { Client } from './client';
queryFactory,
queryBundle,
queryToken,
queryPoolsByTokens,
queryPoolById,
queryMints,
queryTicks,
queryBurns,
querySwaps,
queryPositions
} from '../test/queries';
import { import {
checkUniswapDayData, checkUniswapDayData,
checkPoolDayData, checkPoolDayData,
@ -52,6 +40,7 @@ import {
checkTokenHourData, checkTokenHourData,
fetchTransaction fetchTransaction
} from '../test/utils'; } from '../test/utils';
import { OrderDirection } from './indexer';
const NETWORK_RPC_URL = 'http://localhost:8545'; const NETWORK_RPC_URL = 'http://localhost:8545';
@ -70,8 +59,8 @@ describe('uni-info-watcher', () => {
let signer: Signer; let signer: Signer;
let recipient: string; let recipient: string;
let config: Config; let config: Config;
let endpoint: string;
let uniClient: UniClient; let uniClient: UniClient;
let client: Client;
before(async () => { before(async () => {
const provider = new ethers.providers.JsonRpcProvider(NETWORK_RPC_URL); const provider = new ethers.providers.JsonRpcProvider(NETWORK_RPC_URL);
@ -82,33 +71,40 @@ describe('uni-info-watcher', () => {
config = await getConfig(configFile); config = await getConfig(configFile);
const { upstream, server: { host, port } } = config; const { upstream, server: { host, port } } = config;
endpoint = `http://${host}:${port}/graphql`; const endpoint = `http://${host}:${port}/graphql`;
const { uniWatcher: { gqlEndpoint, gqlSubscriptionEndpoint } } = upstream; let { uniWatcher: { gqlEndpoint, gqlSubscriptionEndpoint } } = upstream;
uniClient = new UniClient({ uniClient = new UniClient({
gqlEndpoint, gqlEndpoint,
gqlSubscriptionEndpoint gqlSubscriptionEndpoint
}); });
gqlEndpoint = endpoint;
gqlSubscriptionEndpoint = endpoint;
client = new Client({
gqlEndpoint,
gqlSubscriptionEndpoint
});
}); });
it('should have a Factory entity', async () => { it('should have a Factory entity', async () => {
// Getting the Factory from uni-info-watcher graphQL endpoint. // Getting the Factory from uni-info-watcher graphQL endpoint.
const data = await request(endpoint, queryFactory); const factories = await client.getFactories(1);
expect(data.factories).to.not.be.empty; expect(factories).to.not.be.empty;
// Initializing the factory variable. // Initializing the factory variable.
const factoryAddress = data.factories[0].id; const factoryAddress = factories[0].id;
factory = new ethers.Contract(factoryAddress, FACTORY_ABI, signer); factory = new ethers.Contract(factoryAddress, FACTORY_ABI, signer);
expect(factory.address).to.not.be.empty; expect(factory.address).to.not.be.empty;
}); });
it('should have a Bundle entity', async () => { it('should have a Bundle entity', async () => {
// Getting the Bundle from uni-info-watcher graphQL endpoint. // Getting the Bundle from uni-info-watcher graphQL endpoint.
const data = await request(endpoint, queryBundle); const bundles = await client.getBundles(1);
expect(data.bundles).to.not.be.empty; expect(bundles).to.not.be.empty;
const bundleId = '1'; const bundleId = '1';
expect(data.bundles[0].id).to.equal(bundleId); expect(bundles[0].id).to.equal(bundleId);
}); });
describe('PoolCreatedEvent', () => { describe('PoolCreatedEvent', () => {
@ -126,11 +122,11 @@ describe('uni-info-watcher', () => {
it('should not have Token entities', async () => { it('should not have Token entities', async () => {
// Check that Token entities are absent. // Check that Token entities are absent.
const data0 = await request(endpoint, queryToken, { id: token0Address }); const token0 = await client.getToken(token0Address);
expect(data0.token).to.be.null; expect(token0).to.be.null;
const data1 = await request(endpoint, queryToken, { id: token0Address }); const token1 = await client.getToken(token1Address);
expect(data1.token).to.be.null; expect(token1).to.be.null;
}); });
it('should trigger PoolCreatedEvent', async () => { it('should trigger PoolCreatedEvent', async () => {
@ -147,29 +143,30 @@ describe('uni-info-watcher', () => {
it('should create Token entities', async () => { it('should create Token entities', async () => {
// Check that Token entities are present. // Check that Token entities are present.
const data0 = await request(endpoint, queryToken, { id: token0Address }); const token0 = await client.getToken(token0Address);
expect(data0.token).to.not.be.null; expect(token0).to.not.be.null;
const data1 = await request(endpoint, queryToken, { id: token0Address }); const token1 = await client.getToken(token1Address);
expect(data1.token).to.not.be.null; expect(token1).to.not.be.null;
}); });
it('should create a Pool entity', async () => { it('should create a Pool entity', async () => {
// Checked values: feeTier // Checked values: feeTier
const variables = { const poolWhere = {
tokens: [token0Address, token1Address] token0_in: [token0Address, token1Address],
token1_in: [token0Address, token1Address]
}; };
// Getting the Pool that has the deployed tokens. // Getting the Pool that has the deployed tokens.
const data = await request(endpoint, queryPoolsByTokens, variables); const pools = await client.getPools(poolWhere);
expect(data.pools).to.have.lengthOf(1); expect(pools).to.have.lengthOf(1);
// Initializing the pool variable. // Initializing the pool variable.
const poolAddress = data.pools[0].id; const poolAddress = pools[0].id;
pool = new Contract(poolAddress, POOL_ABI, signer); pool = new Contract(poolAddress, POOL_ABI, signer);
expect(pool.address).to.not.be.empty; expect(pool.address).to.not.be.empty;
expect(data.pools[0].feeTier).to.be.equal(fee.toString()); expect(pools[0].feeTier).to.be.equal(fee.toString());
// Initializing the token variables. // Initializing the token variables.
token0Address = await pool.token0(); token0Address = await pool.token0();
@ -187,9 +184,9 @@ describe('uni-info-watcher', () => {
const tick = TICK_MIN; const tick = TICK_MIN;
it('should not have pool entity initialized', async () => { it('should not have pool entity initialized', async () => {
const data = await request(endpoint, queryPoolById, { id: pool.address }); const poolData = await client.getPoolById(pool.address);
expect(data.pool.sqrtPrice).to.not.be.equal(sqrtPrice); expect(poolData.sqrtPrice).to.not.be.equal(sqrtPrice);
expect(data.pool.tick).to.be.null; expect(poolData.tick).to.be.null;
}); });
it('should trigger InitializeEvent', async () => { it('should trigger InitializeEvent', async () => {
@ -207,13 +204,13 @@ describe('uni-info-watcher', () => {
it('should update Pool entity', async () => { it('should update Pool entity', async () => {
// Checked values: sqrtPrice, tick. // Checked values: sqrtPrice, tick.
const data = await request(endpoint, queryPoolById, { id: pool.address }); const poolData = await client.getPoolById(pool.address);
expect(data.pool.sqrtPrice).to.be.equal(sqrtPrice); expect(poolData.sqrtPrice).to.be.equal(sqrtPrice);
expect(data.pool.tick).to.be.equal(tick.toString()); expect(poolData.tick).to.be.equal(tick.toString());
}); });
it('should update PoolDayData entity', async () => { it('should update PoolDayData entity', async () => {
checkPoolDayData(endpoint, pool.address); checkPoolDayData(client, pool.address);
}); });
}); });
@ -244,19 +241,14 @@ describe('uni-info-watcher', () => {
await approveToken(token1, poolCallee.address, approveAmount); await approveToken(token1, poolCallee.address, approveAmount);
// Get initial entity values. // Get initial entity values.
let data: any; const factories = await client.getFactories(1);
oldFactory = factories[0];
data = await request(endpoint, queryFactory); oldToken0 = await client.getToken(token0.address);
oldFactory = data.factories[0];
data = await request(endpoint, queryToken, { id: token0.address }); oldToken1 = await client.getToken(token1.address);
oldToken0 = data.token;
data = await request(endpoint, queryToken, { id: token1.address }); oldPool = await client.getPoolById(pool.address);
oldToken1 = data.token;
data = await request(endpoint, queryPoolById, { id: pool.address });
oldPool = data.pool;
}); });
it('should trigger MintEvent', async () => { it('should trigger MintEvent', async () => {
@ -275,13 +267,9 @@ describe('uni-info-watcher', () => {
// Checked values: txCount. // Checked values: txCount.
// Unchecked values: totalValueLocked, totalValueLockedUSD. // Unchecked values: totalValueLocked, totalValueLockedUSD.
let data: any; const newToken0 = await client.getToken(token0.address);
data = await request(endpoint, queryToken, { id: token0.address }); const newToken1 = await client.getToken(token1.address);
const newToken0 = data.token;
data = await request(endpoint, queryToken, { id: token1.address });
const newToken1 = data.token;
expect(newToken0.txCount).to.be.equal((BigInt(oldToken0.txCount) + BigInt(1)).toString()); expect(newToken0.txCount).to.be.equal((BigInt(oldToken0.txCount) + BigInt(1)).toString());
expect(newToken1.txCount).to.be.equal((BigInt(oldToken1.txCount) + BigInt(1)).toString()); expect(newToken1.txCount).to.be.equal((BigInt(oldToken1.txCount) + BigInt(1)).toString());
@ -291,8 +279,8 @@ describe('uni-info-watcher', () => {
// Checked values: txCount. // Checked values: txCount.
// Unchecked values: totalValueLockedUSD. // Unchecked values: totalValueLockedUSD.
const data = await request(endpoint, queryFactory); const factories = await client.getFactories(1);
const newFactory = data.factories[0]; const newFactory = factories[0];
expect(newFactory.txCount).to.be.equal((BigInt(oldFactory.txCount) + BigInt(1)).toString()); expect(newFactory.txCount).to.be.equal((BigInt(oldFactory.txCount) + BigInt(1)).toString());
}); });
@ -310,8 +298,7 @@ describe('uni-info-watcher', () => {
} }
} }
const data = await request(endpoint, queryPoolById, { id: pool.address }); const newPool = await client.getPoolById(pool.address);
const newPool = data.pool;
expect(newPool.txCount).to.be.equal((BigInt(oldPool.txCount) + BigInt(1)).toString()); expect(newPool.txCount).to.be.equal((BigInt(oldPool.txCount) + BigInt(1)).toString());
expect(BigInt(newPool.liquidity)).to.be.equal(expectedLiquidity); expect(BigInt(newPool.liquidity)).to.be.equal(expectedLiquidity);
@ -320,7 +307,7 @@ describe('uni-info-watcher', () => {
it('should create a Transaction entity', async () => { it('should create a Transaction entity', async () => {
// Checked values: mints, burns, swaps. // Checked values: mints, burns, swaps.
const transaction: any = await fetchTransaction(endpoint); const transaction: any = await fetchTransaction(client);
expectedTxID = transaction.id; expectedTxID = transaction.id;
expectedTxTimestamp = transaction.timestamp; expectedTxTimestamp = transaction.timestamp;
@ -338,22 +325,15 @@ describe('uni-info-watcher', () => {
// Unchecked values: amount0, amount1, amountUSD. // Unchecked values: amount0, amount1, amountUSD.
// Get the latest Mint. // Get the latest Mint.
let data: any; const mints = await client.getMints({ pool: pool.address }, 1, 'timestamp', OrderDirection.desc);
const variables = { expect(mints).to.not.be.empty;
first: 1,
orderBy: 'timestamp',
orderDirection: 'desc',
pool: pool.address
};
data = await request(endpoint, queryMints, variables);
expect(data.mints).to.not.be.empty;
const mint = data.mints[0]; const mint = mints[0];
const txID = mint.id.split('#')[0]; const txID = mint.id.split('#')[0];
const txCountID = mint.id.split('#')[1]; const txCountID = mint.id.split('#')[1];
data = await request(endpoint, queryPoolById, { id: pool.address }); const poolData = await client.getPoolById(pool.address);
const poolTxCount = data.pool.txCount; const poolTxCount = poolData.txCount;
const expectedOrigin = recipient; const expectedOrigin = recipient;
const expectedOwner = recipient; const expectedOwner = recipient;
const expectedSender = poolCallee.address; const expectedSender = poolCallee.address;
@ -373,11 +353,11 @@ describe('uni-info-watcher', () => {
// Checked values: liquidityGross, liquidityNet. // Checked values: liquidityGross, liquidityNet.
// Unchecked values: id, price0, price1. // Unchecked values: id, price0, price1.
const data = await request(endpoint, queryTicks, { pool: pool.address }); const ticks = await client.getTicks({ poolAddress: pool.address });
expect(data.ticks).to.not.be.empty; expect(ticks).to.not.be.empty;
const lowerTick: any = _.filter(data.ticks, { tickIdx: tickLower.toString() })[0]; const lowerTick: any = _.filter(ticks, { tickIdx: tickLower.toString() })[0];
const upperTick: any = _.filter(data.ticks, { tickIdx: tickUpper.toString() })[0]; const upperTick: any = _.filter(ticks, { tickIdx: tickUpper.toString() })[0];
expect(lowerTick.liquidityGross).to.be.equal(amount.toString()); expect(lowerTick.liquidityGross).to.be.equal(amount.toString());
expect(lowerTick.liquidityNet).to.be.equal(amount.toString()); expect(lowerTick.liquidityNet).to.be.equal(amount.toString());
@ -386,21 +366,21 @@ describe('uni-info-watcher', () => {
}); });
it('should update UniswapDayData entity', async () => { it('should update UniswapDayData entity', async () => {
checkUniswapDayData(endpoint); checkUniswapDayData(client);
}); });
it('should update PoolDayData entity', async () => { it('should update PoolDayData entity', async () => {
checkPoolDayData(endpoint, pool.address); checkPoolDayData(client, pool.address);
}); });
it('should update TokenDayData entities', async () => { it('should update TokenDayData entities', async () => {
checkTokenDayData(endpoint, token0.address); checkTokenDayData(client, token0.address);
checkTokenDayData(endpoint, token1.address); checkTokenDayData(client, token1.address);
}); });
it('should update TokenHourData entities', async () => { it('should update TokenHourData entities', async () => {
checkTokenHourData(endpoint, token0.address); checkTokenHourData(client, token0.address);
checkTokenHourData(endpoint, token1.address); checkTokenHourData(client, token1.address);
}); });
}); });
@ -421,25 +401,20 @@ describe('uni-info-watcher', () => {
before(async () => { before(async () => {
// Get initial entity values. // Get initial entity values.
let data: any; const factories = await await client.getFactories(1);
oldFactory = factories[0];
data = await request(endpoint, queryFactory); oldToken0 = await client.getToken(token0.address);
oldFactory = data.factories[0];
data = await request(endpoint, queryToken, { id: token0.address }); oldToken1 = await client.getToken(token1.address);
oldToken0 = data.token;
data = await request(endpoint, queryToken, { id: token1.address }); oldPool = await client.getPoolById(pool.address);
oldToken1 = data.token;
data = await request(endpoint, queryPoolById, { id: pool.address }); const ticks = await client.getTicks({ poolAddress: pool.address });
oldPool = data.pool; expect(ticks).to.not.be.empty;
data = await request(endpoint, queryTicks, { pool: pool.address }); oldLowerTick = _.filter(ticks, { tickIdx: tickLower.toString() })[0];
expect(data.ticks).to.not.be.empty; oldUpperTick = _.filter(ticks, { tickIdx: tickUpper.toString() })[0];
oldLowerTick = _.filter(data.ticks, { tickIdx: tickLower.toString() })[0];
oldUpperTick = _.filter(data.ticks, { tickIdx: tickUpper.toString() })[0];
}); });
it('should trigger BurnEvent', async () => { it('should trigger BurnEvent', async () => {
@ -458,13 +433,9 @@ describe('uni-info-watcher', () => {
// Checked values: txCount. // Checked values: txCount.
// Unchecked values: totalValueLocked, totalValueLockedUSD. // Unchecked values: totalValueLocked, totalValueLockedUSD.
let data: any; const newToken0 = await client.getToken(token0.address);
data = await request(endpoint, queryToken, { id: token0.address }); const newToken1 = await client.getToken(token1.address);
const newToken0 = data.token;
data = await request(endpoint, queryToken, { id: token1.address });
const newToken1 = data.token;
expect(newToken0.txCount).to.be.equal((BigInt(oldToken0.txCount) + BigInt(1)).toString()); expect(newToken0.txCount).to.be.equal((BigInt(oldToken0.txCount) + BigInt(1)).toString());
expect(newToken1.txCount).to.be.equal((BigInt(oldToken1.txCount) + BigInt(1)).toString()); expect(newToken1.txCount).to.be.equal((BigInt(oldToken1.txCount) + BigInt(1)).toString());
@ -474,8 +445,8 @@ describe('uni-info-watcher', () => {
// Checked values: txCount. // Checked values: txCount.
// Unchecked values: totalValueLockedUSD. // Unchecked values: totalValueLockedUSD.
const data = await request(endpoint, queryFactory); const factories = await client.getFactories(1);
const newFactory = data.factories[0]; const newFactory = factories[0];
expect(newFactory.txCount).to.be.equal((BigInt(oldFactory.txCount) + BigInt(1)).toString()); expect(newFactory.txCount).to.be.equal((BigInt(oldFactory.txCount) + BigInt(1)).toString());
}); });
@ -493,8 +464,7 @@ describe('uni-info-watcher', () => {
} }
} }
const data = await request(endpoint, queryPoolById, { id: pool.address }); const newPool = await client.getPoolById(pool.address);
const newPool = data.pool;
expect(newPool.txCount).to.be.equal((BigInt(oldPool.txCount) + BigInt(1)).toString()); expect(newPool.txCount).to.be.equal((BigInt(oldPool.txCount) + BigInt(1)).toString());
expect(BigInt(newPool.liquidity)).to.be.equal(expectedLiquidity); expect(BigInt(newPool.liquidity)).to.be.equal(expectedLiquidity);
@ -503,7 +473,7 @@ describe('uni-info-watcher', () => {
it('should create a Transaction entity', async () => { it('should create a Transaction entity', async () => {
// Checked values: mints, burns, swaps. // Checked values: mints, burns, swaps.
const transaction: any = await fetchTransaction(endpoint); const transaction: any = await fetchTransaction(client);
expectedTxID = transaction.id; expectedTxID = transaction.id;
expectedTxTimestamp = transaction.timestamp; expectedTxTimestamp = transaction.timestamp;
@ -521,23 +491,15 @@ describe('uni-info-watcher', () => {
// Unchecked values: amount0, amount1, amountUSD. // Unchecked values: amount0, amount1, amountUSD.
// Get the latest Burn. // Get the latest Burn.
let data: any; const burns = await client.getBurns({ pool: pool.address }, 1, 'timestamp', OrderDirection.desc);
const variables = { expect(burns).to.not.be.empty;
first: 1,
orderBy: 'timestamp',
orderDirection: 'desc',
pool: pool.address
};
data = await request(endpoint, queryBurns, variables); const burn = burns[0];
expect(data.burns).to.not.be.empty;
const burn = data.burns[0];
const txID = burn.id.split('#')[0]; const txID = burn.id.split('#')[0];
const txCountID = burn.id.split('#')[1]; const txCountID = burn.id.split('#')[1];
data = await request(endpoint, queryPoolById, { id: pool.address }); const poolData = await client.getPoolById(pool.address);
const poolTxCount = data.pool.txCount; const poolTxCount = poolData.txCount;
const expectedOrigin = recipient; const expectedOrigin = recipient;
const expectedOwner = recipient; const expectedOwner = recipient;
@ -555,11 +517,11 @@ describe('uni-info-watcher', () => {
// Checked values: liquidityGross, liquidityNet. // Checked values: liquidityGross, liquidityNet.
// Unchecked values: id, price0, price1. // Unchecked values: id, price0, price1.
const data = await request(endpoint, queryTicks, { pool: pool.address }); const ticks = await client.getTicks({ poolAddress: pool.address });
expect(data.ticks).to.not.be.empty; expect(ticks).to.not.be.empty;
const newLowerTick: any = _.filter(data.ticks, { tickIdx: tickLower.toString() })[0]; const newLowerTick: any = _.filter(ticks, { tickIdx: tickLower.toString() })[0];
const newUpperTick: any = _.filter(data.ticks, { tickIdx: tickUpper.toString() })[0]; const newUpperTick: any = _.filter(ticks, { tickIdx: tickUpper.toString() })[0];
const expectedLLG = BigInt(oldLowerTick.liquidityGross) - BigInt(amount); const expectedLLG = BigInt(oldLowerTick.liquidityGross) - BigInt(amount);
const expectedLN = BigInt(oldLowerTick.liquidityNet) - BigInt(amount); const expectedLN = BigInt(oldLowerTick.liquidityNet) - BigInt(amount);
@ -573,21 +535,21 @@ describe('uni-info-watcher', () => {
}); });
it('should update UniswapDayData entity', async () => { it('should update UniswapDayData entity', async () => {
checkUniswapDayData(endpoint); checkUniswapDayData(client);
}); });
it('should update PoolDayData entity', async () => { it('should update PoolDayData entity', async () => {
checkPoolDayData(endpoint, pool.address); checkPoolDayData(client, pool.address);
}); });
it('should update TokenDayData entities', async () => { it('should update TokenDayData entities', async () => {
checkTokenDayData(endpoint, token0.address); checkTokenDayData(client, token0.address);
checkTokenDayData(endpoint, token1.address); checkTokenDayData(client, token1.address);
}); });
it('should update TokenHourData entities', async () => { it('should update TokenHourData entities', async () => {
checkTokenHourData(endpoint, token0.address); checkTokenHourData(client, token0.address);
checkTokenHourData(endpoint, token1.address); checkTokenHourData(client, token1.address);
}); });
}); });
@ -608,19 +570,14 @@ describe('uni-info-watcher', () => {
before(async () => { before(async () => {
// Get initial entity values. // Get initial entity values.
let data: any; const factories = await client.getFactories(1);
oldFactory = factories[0];
data = await request(endpoint, queryFactory); oldToken0 = await client.getToken(token0.address);
oldFactory = data.factories[0];
data = await request(endpoint, queryToken, { id: token0.address }); oldToken1 = await client.getToken(token1.address);
oldToken0 = data.token;
data = await request(endpoint, queryToken, { id: token1.address }); oldPool = await client.getPoolById(pool.address);
oldToken1 = data.token;
data = await request(endpoint, queryPoolById, { id: pool.address });
oldPool = data.pool;
}); });
it('should trigger SwapEvent', async () => { it('should trigger SwapEvent', async () => {
@ -640,13 +597,9 @@ describe('uni-info-watcher', () => {
// Checked values: txCount. // Checked values: txCount.
// Unchecked values: derivedETH, feesUSD, totalValueLocked, totalValueLockedUSD, volume, volumeUSD. // Unchecked values: derivedETH, feesUSD, totalValueLocked, totalValueLockedUSD, volume, volumeUSD.
let data: any; const newToken0 = await client.getToken(token0.address);
data = await request(endpoint, queryToken, { id: token0.address }); const newToken1 = await client.getToken(token1.address);
const newToken0 = data.token;
data = await request(endpoint, queryToken, { id: token1.address });
const newToken1 = data.token;
expect(newToken0.txCount).to.be.equal((BigInt(oldToken0.txCount) + BigInt(1)).toString()); expect(newToken0.txCount).to.be.equal((BigInt(oldToken0.txCount) + BigInt(1)).toString());
expect(newToken1.txCount).to.be.equal((BigInt(oldToken1.txCount) + BigInt(1)).toString()); expect(newToken1.txCount).to.be.equal((BigInt(oldToken1.txCount) + BigInt(1)).toString());
@ -656,8 +609,8 @@ describe('uni-info-watcher', () => {
// Checked values: txCount. // Checked values: txCount.
// Unchecked values: totalFeesUSD, totalValueLockedUSD, totalVolumeUSD. // Unchecked values: totalFeesUSD, totalValueLockedUSD, totalVolumeUSD.
const data = await request(endpoint, queryFactory); const factories = await client.getFactories(1);
const newFactory = data.factories[0]; const newFactory = factories[0];
expect(newFactory.txCount).to.be.equal((BigInt(oldFactory.txCount) + BigInt(1)).toString()); expect(newFactory.txCount).to.be.equal((BigInt(oldFactory.txCount) + BigInt(1)).toString());
}); });
@ -669,8 +622,7 @@ describe('uni-info-watcher', () => {
const expectedTick = eventValue.event.tick; const expectedTick = eventValue.event.tick;
const expectedSqrtPrice = eventValue.event.sqrtPriceX96; const expectedSqrtPrice = eventValue.event.sqrtPriceX96;
const data = await request(endpoint, queryPoolById, { id: pool.address }); const newPool = await client.getPoolById(pool.address);
const newPool = data.pool;
expect(newPool.txCount).to.be.equal((BigInt(oldPool.txCount) + BigInt(1)).toString()); expect(newPool.txCount).to.be.equal((BigInt(oldPool.txCount) + BigInt(1)).toString());
expect(newPool.liquidity).to.be.equal(expectedLiquidity); expect(newPool.liquidity).to.be.equal(expectedLiquidity);
@ -681,7 +633,7 @@ describe('uni-info-watcher', () => {
it('should create a Transaction entity', async () => { it('should create a Transaction entity', async () => {
// Checked values: mints, burns, swaps. // Checked values: mints, burns, swaps.
const transaction: any = await fetchTransaction(endpoint); const transaction: any = await fetchTransaction(client);
expectedTxID = transaction.id; expectedTxID = transaction.id;
expectedTxTimestamp = transaction.timestamp; expectedTxTimestamp = transaction.timestamp;
@ -698,23 +650,15 @@ describe('uni-info-watcher', () => {
// Checked values: id, origin, timestamp, pool, transaction. // Checked values: id, origin, timestamp, pool, transaction.
// Unchecked values: amount0, amount1, amountUSD. // Unchecked values: amount0, amount1, amountUSD.
let data: any; const swaps = await client.getSwaps({ pool: pool.address }, 1, 'timestamp', OrderDirection.desc);
const variables = { expect(swaps).to.not.be.empty;
first: 1,
orderBy: 'timestamp',
orderDirection: 'desc',
pool: pool.address
};
data = await request(endpoint, querySwaps, variables); const swap = swaps[0];
expect(data.swaps).to.not.be.empty;
const swap = data.swaps[0];
const txID = swap.id.split('#')[0]; const txID = swap.id.split('#')[0];
const txCountID = swap.id.split('#')[1]; const txCountID = swap.id.split('#')[1];
data = await request(endpoint, queryPoolById, { id: pool.address }); const poolData = await client.getPoolById(pool.address);
const poolTxCount = data.pool.txCount; const poolTxCount = poolData.txCount;
const expectedOrigin = recipient; const expectedOrigin = recipient;
expect(txID).to.be.equal(expectedTxID); expect(txID).to.be.equal(expectedTxID);
@ -727,21 +671,21 @@ describe('uni-info-watcher', () => {
}); });
it('should update UniswapDayData entity', async () => { it('should update UniswapDayData entity', async () => {
checkUniswapDayData(endpoint); checkUniswapDayData(client);
}); });
it('should update PoolDayData entity', async () => { it('should update PoolDayData entity', async () => {
checkPoolDayData(endpoint, pool.address); checkPoolDayData(client, pool.address);
}); });
it('should update TokenDayData entities', async () => { it('should update TokenDayData entities', async () => {
checkTokenDayData(endpoint, token0.address); checkTokenDayData(client, token0.address);
checkTokenDayData(endpoint, token1.address); checkTokenDayData(client, token1.address);
}); });
it('should update TokenHourData entities', async () => { it('should update TokenHourData entities', async () => {
checkTokenHourData(endpoint, token0.address); checkTokenHourData(client, token0.address);
checkTokenHourData(endpoint, token1.address); checkTokenHourData(client, token1.address);
}); });
}); });
@ -841,7 +785,7 @@ describe('uni-info-watcher', () => {
it('should create a Transaction entity', async () => { it('should create a Transaction entity', async () => {
// Checked values: mints, burns, swaps. // Checked values: mints, burns, swaps.
const transaction: any = await fetchTransaction(endpoint); const transaction: any = await fetchTransaction(client);
expectedTxID = transaction.id; expectedTxID = transaction.id;
const expectedTxTimestamp = transaction.timestamp; const expectedTxTimestamp = transaction.timestamp;
@ -854,15 +798,15 @@ describe('uni-info-watcher', () => {
expect(timestamp).to.be.equal(expectedTxTimestamp); expect(timestamp).to.be.equal(expectedTxTimestamp);
}); });
it('should createa a Position entity', async () => { it('should create a Position entity', async () => {
// Checked values: pool, token0, token1, tickLower, tickUpper, transaction, owner. // Checked values: pool, token0, token1, tickLower, tickUpper, transaction, owner.
// Unchecked values: feeGrowthInside0LastX128, feeGrowthInside0LastX128. // Unchecked values: feeGrowthInside0LastX128, feeGrowthInside0LastX128.
// Get the Position using tokenId. // Get the Position using tokenId.
const data = await request(endpoint, queryPositions, { id: Number(eventValue.event.tokenId) }); const positions = await client.getPositions({ id: Number(eventValue.event.tokenId) }, 1);
expect(data.positions).to.not.be.empty; expect(positions).to.not.be.empty;
const position = data.positions[0]; const position = positions[0];
const positionTickLower = position.tickLower.id.split('#')[1]; const positionTickLower = position.tickLower.id.split('#')[1];
const positionTickUpper = position.tickUpper.id.split('#')[1]; const positionTickUpper = position.tickUpper.id.split('#')[1];
@ -894,8 +838,8 @@ describe('uni-info-watcher', () => {
before(async () => { before(async () => {
// Get initial entity values. // Get initial entity values.
const data = await request(endpoint, queryPositions, { id: Number(tokenId) }); const positions = await client.getPositions({ id: Number(tokenId) }, 1);
oldPosition = data.positions[0]; oldPosition = positions[0];
}); });
it('should trigger IncreaseLiquidityEvent', async () => { it('should trigger IncreaseLiquidityEvent', async () => {
@ -926,7 +870,7 @@ describe('uni-info-watcher', () => {
it('should create a Transaction entity', async () => { it('should create a Transaction entity', async () => {
// Checked values: mints, burns, swaps. // Checked values: mints, burns, swaps.
const transaction: any = await fetchTransaction(endpoint); const transaction: any = await fetchTransaction(client);
const expectedTxTimestamp = transaction.timestamp; const expectedTxTimestamp = transaction.timestamp;
@ -943,10 +887,10 @@ describe('uni-info-watcher', () => {
// Unchecked values: depositedToken0, depositedToken1, feeGrowthInside0LastX128, feeGrowthInside0LastX128. // Unchecked values: depositedToken0, depositedToken1, feeGrowthInside0LastX128, feeGrowthInside0LastX128.
// Get the Position using tokenId. // Get the Position using tokenId.
const data = await request(endpoint, queryPositions, { id: Number(eventValue.event.tokenId) }); const positions = await client.getPositions({ id: Number(eventValue.event.tokenId) }, 1);
expect(data.positions).to.not.be.empty; expect(positions).to.not.be.empty;
const position = data.positions[0]; const position = positions[0];
const expectedLiquidity = BigInt(oldPosition.liquidity) + BigInt(eventValue.event.liquidity); const expectedLiquidity = BigInt(oldPosition.liquidity) + BigInt(eventValue.event.liquidity);
@ -969,8 +913,8 @@ describe('uni-info-watcher', () => {
before(async () => { before(async () => {
// Get initial entity values. // Get initial entity values.
const data = await request(endpoint, queryPositions, { id: Number(tokenId) }); const positions = await client.getPositions({ id: Number(tokenId) }, 1);
oldPosition = data.positions[0]; oldPosition = positions[0];
}); });
it('should trigger DecreaseLiquidityEvent', async () => { it('should trigger DecreaseLiquidityEvent', async () => {
@ -1000,7 +944,7 @@ describe('uni-info-watcher', () => {
it('should create a Transaction entity', async () => { it('should create a Transaction entity', async () => {
// Checked values: mints, burns, swaps. // Checked values: mints, burns, swaps.
const transaction: any = await fetchTransaction(endpoint); const transaction: any = await fetchTransaction(client);
const expectedTxTimestamp = transaction.timestamp; const expectedTxTimestamp = transaction.timestamp;
@ -1017,10 +961,10 @@ describe('uni-info-watcher', () => {
// Unchecked values: depositedToken0, depositedToken1, feeGrowthInside0LastX128, feeGrowthInside0LastX128. // Unchecked values: depositedToken0, depositedToken1, feeGrowthInside0LastX128, feeGrowthInside0LastX128.
// Get the Position using tokenId. // Get the Position using tokenId.
const data = await request(endpoint, queryPositions, { id: Number(eventValue.event.tokenId) }); const positions = await client.getPositions({ id: Number(eventValue.event.tokenId) }, 1);
expect(data.positions).to.not.be.empty; expect(positions).to.not.be.empty;
const position = data.positions[0]; const position = positions[0];
const expectedLiquidity = BigInt(oldPosition.liquidity) - BigInt(eventValue.event.liquidity); const expectedLiquidity = BigInt(oldPosition.liquidity) - BigInt(eventValue.event.liquidity);
@ -1064,7 +1008,7 @@ describe('uni-info-watcher', () => {
it('should create a Transaction entity', async () => { it('should create a Transaction entity', async () => {
// Checked values: mints, burns, swaps. // Checked values: mints, burns, swaps.
const transaction: any = await fetchTransaction(endpoint); const transaction: any = await fetchTransaction(client);
const expectedTxTimestamp = transaction.timestamp; const expectedTxTimestamp = transaction.timestamp;

View File

@ -1,292 +0,0 @@
//
// Copyright 2021 Vulcanize, Inc.
//
import { gql } from 'graphql-request';
export const queryToken = gql`
query queryToken($id: ID!) {
token(id: $id) {
derivedETH
feesUSD
id
name
symbol
totalValueLocked
totalValueLockedUSD
txCount
volume
volumeUSD
}
}`;
// Getting the first Factory entity.
export const queryFactory = gql`
{
factories(first: 1) {
id
totalFeesUSD
totalValueLockedUSD
totalVolumeUSD
txCount
}
}`;
// Getting the first Bundle entity.
export const queryBundle = gql`
{
bundles(first: 1) {
id
ethPriceUSD
}
}`;
// Getting Pool by id.
export const queryPoolById = gql`
query queryPoolById($id: ID!) {
pool(id: $id) {
feeTier
id
liquidity
sqrtPrice
tick
token0Price
token1Price
totalValueLockedToken0
totalValueLockedToken1
totalValueLockedUSD
txCount
volumeUSD
}
}`;
// Getting Tick(s) filtered by pool.
export const queryTicks = gql`
query queryTicksByPool($pool: String) {
ticks(where: { poolAddress: $pool }) {
id
liquidityGross
liquidityNet
price0
price1
tickIdx
}
}`;
// Getting Pool(s) filtered by tokens.
export const queryPoolsByTokens = gql`
query queryPoolsByTokens($tokens: [String!]) {
pools(where: { token0_in: $tokens, token1_in: $tokens }) {
id,
feeTier
}
}`;
// Getting UniswapDayData(s).
export const queryUniswapDayData = gql`
query queryUniswapDayData($first: Int, $orderBy: UniswapDayData_orderBy, $orderDirection: OrderDirection) {
uniswapDayDatas(first: $first, orderBy: $orderBy, orderDirection: $orderDirection) {
id,
date,
tvlUSD
}
}`;
// Getting PoolDayData(s) filtered by pool and ordered by date.
export const queryPoolDayData = gql`
query queryPoolDayData($first: Int, $orderBy: PoolDayData_orderBy, $orderDirection: OrderDirection, $pool: String) {
poolDayDatas(first: $first, orderBy: $orderBy, orderDirection: $orderDirection, where: { pool: $pool }) {
id,
date,
tvlUSD
}
}`;
// Getting TokenDayDatas(s) filtered by token and ordered by date.
export const queryTokenDayData = gql`
query queryTokenDayData($first: Int, $orderBy: TokenDayData_orderBy, $orderDirection: OrderDirection, $token: String) {
tokenDayDatas(first: $first, orderBy: $orderBy, orderDirection: $orderDirection, where: { token: $token }) {
id,
date,
totalValueLockedUSD
}
}`;
// Getting TokenDayDatas(s) filtered by token and ordered by date.
export const queryTokenHourData = gql`
query queryTokenHourData($first: Int, $orderBy: TokenHourData_orderBy, $orderDirection: OrderDirection, $token: String) {
tokenHourDatas(first: $first, orderBy: $orderBy, orderDirection: $orderDirection, where: { token: $token }) {
id,
low,
high,
open,
close,
periodStartUnix
}
}`;
// Getting mint(s) filtered by pool, tokens and ordered by timestamp.
export const queryMints = gql`
query queryMints(
$first: Int,
$orderBy: Mint_orderBy,
$orderDirection: OrderDirection,
$pool: String,
$token0: String,
$token1: String) {
mints(
first: $first,
orderBy: $orderBy,
orderDirection: $orderDirection,
where: {
pool: $pool,
token0: $token0,
token1: $token1
}) {
amount0,
amount1,
amountUSD,
id,
origin,
owner,
sender,
timestamp,
pool {
id
},
transaction {
id
}
}
}`;
// Getting burns(s) filtered by pool, tokens and ordered by timestamp.
export const queryBurns = gql`
query queryBurns(
$first: Int,
$orderBy: Burn_orderBy,
$orderDirection: OrderDirection,
$pool: String,
$token0: String,
$token1: String) {
burns(
first: $first,
orderBy: $orderBy,
orderDirection: $orderDirection,
where: {
pool: $pool,
token0: $token0,
token1: $token1
}) {
amount0,
amount1,
amountUSD,
id,
origin,
owner,
timestamp,
pool {
id
},
transaction {
id
}
}
}`;
// Getting burns(s) filtered by pool, tokens and ordered by timestamp.
export const querySwaps = gql`
query querySwaps(
$first: Int,
$orderBy: Swap_orderBy,
$orderDirection: OrderDirection,
$pool: String,
$token0: String,
$token1: String) {
swaps(
first: $first,
orderBy: $orderBy,
orderDirection: $orderDirection,
where: {
pool: $pool,
token0: $token0,
token1: $token1
}) {
amount0,
amount1,
amountUSD,
id,
origin,
timestamp,
pool {
id
},
transaction {
id
}
}
}`;
// Getting transactions(s) ordered by timestamp.
export const queryTransactions = gql`
query queryTransactions(
$first: Int,
$orderBy: Transaction_orderBy,
$mintOrderBy: Mint_orderBy,
$burnOrderBy: Burn_orderBy,
$swapOrderBy: Swap_orderBy,
$orderDirection: OrderDirection) {
transactions(
first: $first,
orderBy: $orderBy,
orderDirection: $orderDirection) {
id,
mints( first: $first, orderBy: $mintOrderBy, orderDirection: $orderDirection) {
id,
timestamp
},
burns( first: $first, orderBy: $burnOrderBy, orderDirection: $orderDirection) {
id,
timestamp
},
swaps( first: $first, orderBy: $swapOrderBy, orderDirection: $orderDirection) {
id,
timestamp
},
timestamp
}
}`;
// Getting position filtered by id.
export const queryPositions = gql`
query queryPositions($first: Int, $id: ID) {
positions(first: $first, where: { id: $id }) {
id,
pool {
id
},
token0 {
id
},
token1 {
id
},
tickLower {
id
},
tickUpper {
id
},
transaction {
id
},
liquidity,
depositedToken0,
depositedToken1,
collectedFeesToken0,
collectedFeesToken1,
owner,
feeGrowthInside0LastX128,
feeGrowthInside1LastX128
}
}`;

View File

@ -4,75 +4,53 @@
import { expect } from 'chai'; import { expect } from 'chai';
import { ethers } from 'ethers'; import { ethers } from 'ethers';
import { request } from 'graphql-request';
import Decimal from 'decimal.js'; import Decimal from 'decimal.js';
import _ from 'lodash'; import _ from 'lodash';
import { insertNDummyBlocks } from '@vulcanize/util/test'; import { insertNDummyBlocks } from '@vulcanize/util/test';
import { import { Database, OrderDirection } from '../src/database';
queryFactory,
queryBundle,
queryToken,
queryPoolById,
queryPoolDayData,
queryUniswapDayData,
queryTokenDayData,
queryTokenHourData,
queryTransactions
} from '../test/queries';
import { Database } from '../src/database';
import { Block } from '../src/events'; import { Block } from '../src/events';
import { Token } from '../src/entity/Token'; import { Token } from '../src/entity/Token';
import { Client } from '../src/client';
export const checkUniswapDayData = async (endpoint: string): Promise<void> => { export const checkUniswapDayData = async (client: Client): Promise<void> => {
// Checked values: date, tvlUSD. // Checked values: date, tvlUSD.
// Unchecked values: volumeUSD. // Unchecked values: volumeUSD.
// Get the latest UniswapDayData. // Get the latest UniswapDayData.
const variables = { const uniswapDayDatas = await client.getUniswapDayDatas({}, 0, 1, 'date', OrderDirection.desc);
first: 1, expect(uniswapDayDatas).to.not.be.empty;
orderBy: 'date',
orderDirection: 'desc'
};
const data = await request(endpoint, queryUniswapDayData, variables);
expect(data.uniswapDayDatas).to.not.be.empty;
const id: string = data.uniswapDayDatas[0].id; const id: string = uniswapDayDatas[0].id;
const dayID = Number(id); const dayID = Number(id);
const date = data.uniswapDayDatas[0].date; const date = uniswapDayDatas[0].date;
const tvlUSD = data.uniswapDayDatas[0].tvlUSD; const tvlUSD = uniswapDayDatas[0].tvlUSD;
const dayStartTimestamp = dayID * 86400; const dayStartTimestamp = dayID * 86400;
const factoryData = await request(endpoint, queryFactory); const factories = await client.getFactories(1);
const totalValueLockedUSD: string = factoryData.factories[0].totalValueLockedUSD; const totalValueLockedUSD: string = factories[0].totalValueLockedUSD;
expect(date).to.be.equal(dayStartTimestamp); expect(date).to.be.equal(dayStartTimestamp);
expect(tvlUSD).to.be.equal(totalValueLockedUSD); expect(tvlUSD).to.be.equal(totalValueLockedUSD);
}; };
export const checkPoolDayData = async (endpoint: string, poolAddress: string): Promise<void> => { export const checkPoolDayData = async (client: Client, poolAddress: string): Promise<void> => {
// Checked values: id, date, tvlUSD. // Checked values: id, date, tvlUSD.
// Unchecked values: volumeUSD. // Unchecked values: volumeUSD.
// Get the latest PoolDayData. // Get the latest PoolDayData.
const variables = { const poolDayDatas = await client.getPoolDayDatas({ pool: poolAddress }, 0, 1, 'date', OrderDirection.desc);
first: 1, expect(poolDayDatas).to.not.be.empty;
orderBy: 'date',
orderDirection: 'desc',
pool: poolAddress
};
const data = await request(endpoint, queryPoolDayData, variables);
expect(data.poolDayDatas).to.not.be.empty;
const dayPoolID: string = data.poolDayDatas[0].id; const dayPoolID: string = poolDayDatas[0].id;
const poolID: string = dayPoolID.split('-')[0]; const poolID: string = dayPoolID.split('-')[0];
const dayID = Number(dayPoolID.split('-')[1]); const dayID = Number(dayPoolID.split('-')[1]);
const date = data.poolDayDatas[0].date; const date = poolDayDatas[0].date;
const tvlUSD = data.poolDayDatas[0].tvlUSD; const tvlUSD = poolDayDatas[0].tvlUSD;
const dayStartTimestamp = dayID * 86400; const dayStartTimestamp = dayID * 86400;
const poolData = await request(endpoint, queryPoolById, { id: poolAddress }); const poolData = await client.getPoolById(poolAddress);
const totalValueLockedUSD: string = poolData.pool.totalValueLockedUSD; const totalValueLockedUSD: string = poolData.pool.totalValueLockedUSD;
expect(poolID).to.be.equal(poolAddress); expect(poolID).to.be.equal(poolAddress);
@ -80,28 +58,22 @@ export const checkPoolDayData = async (endpoint: string, poolAddress: string): P
expect(tvlUSD).to.be.equal(totalValueLockedUSD); expect(tvlUSD).to.be.equal(totalValueLockedUSD);
}; };
export const checkTokenDayData = async (endpoint: string, tokenAddress: string): Promise<void> => { export const checkTokenDayData = async (client: Client, tokenAddress: string): Promise<void> => {
// Checked values: id, date, totalValueLockedUSD. // Checked values: id, date, totalValueLockedUSD.
// Unchecked values: volumeUSD. // Unchecked values: volumeUSD.
// Get the latest TokenDayData. // Get the latest TokenDayData.
const variables = { const tokenDayDatas = await client.getTokenDayDatas({ token: tokenAddress }, 0, 1, 'date', OrderDirection.desc);
first: 1, expect(tokenDayDatas).to.not.be.empty;
orderBy: 'date',
orderDirection: 'desc',
token: tokenAddress
};
const data = await request(endpoint, queryTokenDayData, variables);
expect(data.tokenDayDatas).to.not.be.empty;
const tokenDayID: string = data.tokenDayDatas[0].id; const tokenDayID: string = tokenDayDatas[0].id;
const tokenID: string = tokenDayID.split('-')[0]; const tokenID: string = tokenDayID.split('-')[0];
const dayID = Number(tokenDayID.split('-')[1]); const dayID = Number(tokenDayID.split('-')[1]);
const date = data.tokenDayDatas[0].date; const date = tokenDayDatas[0].date;
const tvlUSD = data.tokenDayDatas[0].totalValueLockedUSD; const tvlUSD = tokenDayDatas[0].totalValueLockedUSD;
const dayStartTimestamp = dayID * 86400; const dayStartTimestamp = dayID * 86400;
const tokenData = await request(endpoint, queryToken, { id: tokenAddress }); const tokenData = await client.getToken(tokenAddress);
const totalValueLockedUSD: string = tokenData.token.totalValueLockedUSD; const totalValueLockedUSD: string = tokenData.token.totalValueLockedUSD;
expect(tokenID).to.be.equal(tokenAddress); expect(tokenID).to.be.equal(tokenAddress);
@ -109,33 +81,27 @@ export const checkTokenDayData = async (endpoint: string, tokenAddress: string):
expect(tvlUSD).to.be.equal(totalValueLockedUSD); expect(tvlUSD).to.be.equal(totalValueLockedUSD);
}; };
export const checkTokenHourData = async (endpoint: string, tokenAddress: string): Promise<void> => { export const checkTokenHourData = async (client: Client, tokenAddress: string): Promise<void> => {
// Checked values: id, periodStartUnix, low, high, open, close. // Checked values: id, periodStartUnix, low, high, open, close.
// Unchecked values: // Unchecked values:
// Get the latest TokenHourData. // Get the latest TokenHourData.
const variables = { const tokenHourDatas = await client.getTokenHourDatas({ token: tokenAddress }, 0, 1, 'periodStartUnix', OrderDirection.desc);
first: 1, expect(tokenHourDatas).to.not.be.empty;
orderBy: 'periodStartUnix',
orderDirection: 'desc',
token: tokenAddress
};
const data = await request(endpoint, queryTokenHourData, variables);
expect(data.tokenHourDatas).to.not.be.empty;
const tokenHourID: string = data.tokenHourDatas[0].id; const tokenHourID: string = tokenHourDatas[0].id;
const tokenID: string = tokenHourID.split('-')[0]; const tokenID: string = tokenHourID.split('-')[0];
const hourIndex = Number(tokenHourID.split('-')[1]); const hourIndex = Number(tokenHourID.split('-')[1]);
const periodStartUnix = data.tokenHourDatas[0].periodStartUnix; const periodStartUnix = tokenHourDatas[0].periodStartUnix;
const low = data.tokenHourDatas[0].low; const low = tokenHourDatas[0].low;
const high = data.tokenHourDatas[0].high; const high = tokenHourDatas[0].high;
const open = data.tokenHourDatas[0].open; const open = tokenHourDatas[0].open;
const close = data.tokenHourDatas[0].close; const close = tokenHourDatas[0].close;
const hourStartUnix = hourIndex * 3600; const hourStartUnix = hourIndex * 3600;
const tokenData = await request(endpoint, queryToken, { id: tokenAddress }); const tokenData = await client.getToken(tokenAddress);
const bundleData = await request(endpoint, queryBundle); const bundles = await client.getBundles(1);
const tokenPrice = new Decimal(tokenData.token.derivedETH).times(bundleData.bundles[0].ethPriceUSD); const tokenPrice = new Decimal(tokenData.token.derivedETH).times(bundles[0].ethPriceUSD);
expect(tokenID).to.be.equal(tokenAddress); expect(tokenID).to.be.equal(tokenAddress);
expect(periodStartUnix).to.be.equal(hourStartUnix); expect(periodStartUnix).to.be.equal(hourStartUnix);
@ -145,22 +111,22 @@ export const checkTokenHourData = async (endpoint: string, tokenAddress: string)
expect(close).to.be.equal(tokenPrice.toString()); expect(close).to.be.equal(tokenPrice.toString());
}; };
export const fetchTransaction = async (endpoint: string): Promise<{transaction: any}> => { export const fetchTransaction = async (client: Client): Promise<{transaction: any}> => {
// Get the latest Transaction. // Get the latest Transaction.
// Get only the latest mint, burn and swap entity in the transaction. // Get only the latest mint, burn and swap entity in the transaction.
const transactions = await client.getTransactions(
1,
{
orderBy: 'timestamp',
mintOrderBy: 'timestamp',
burnOrderBy: 'timestamp',
swapOrderBy: 'timestamp'
},
OrderDirection.desc
);
const variables = { expect(transactions).to.not.be.empty;
first: 1, const transaction = transactions[0];
orderBy: 'timestamp',
mintOrderBy: 'timestamp',
burnOrderBy: 'timestamp',
swapOrderBy: 'timestamp',
orderDirection: 'desc'
};
const data = await request(endpoint, queryTransactions, variables);
expect(data.transactions).to.not.be.empty;
const transaction = data.transactions[0];
expect(transaction.mints).to.be.an.instanceOf(Array); expect(transaction.mints).to.be.an.instanceOf(Array);
expect(transaction.burns).to.be.an.instanceOf(Array); expect(transaction.burns).to.be.an.instanceOf(Array);

View File

@ -33,4 +33,5 @@
[jobQueue] [jobQueue]
dbConnectionString = "postgres://postgres:postgres@localhost/uni-watcher-job-queue" dbConnectionString = "postgres://postgres:postgres@localhost/uni-watcher-job-queue"
maxCompletionLag = 300 maxCompletionLagInSecs = 300
jobDelayInMilliSecs = 100

View File

@ -63,9 +63,12 @@ describe('chain pruning', () => {
indexer = new Indexer(db, ethClient, postgraphileClient); indexer = new Indexer(db, ethClient, postgraphileClient);
assert(indexer, 'Could not create indexer object.'); assert(indexer, 'Could not create indexer object.');
const jobQueue = new JobQueue(jobQueueConfig); const { dbConnectionString, maxCompletionLagInSecs } = jobQueueConfig;
assert(dbConnectionString, 'Missing job queue db connection string');
jobRunner = new JobRunner(indexer, jobQueue); const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs });
jobRunner = new JobRunner(jobQueueConfig, indexer, jobQueue);
}); });
afterEach(async () => { afterEach(async () => {

View File

@ -5,7 +5,15 @@
import { gql } from '@apollo/client/core'; import { gql } from '@apollo/client/core';
import { GraphQLClient, GraphQLConfig } from '@vulcanize/ipld-eth-client'; import { GraphQLClient, GraphQLConfig } from '@vulcanize/ipld-eth-client';
import { queryGetPool, queryPoolIdToPoolKey, queryPosition, queryEvents, subscribeEvents, queryGetContract } from './queries'; import {
queryGetPool,
queryPoolIdToPoolKey,
queryPosition,
queryEvents,
subscribeEvents,
queryGetContract,
queryEventsInRange
} from './queries';
export class Client { export class Client {
_config: GraphQLConfig; _config: GraphQLConfig;
@ -86,4 +94,16 @@ export class Client {
return getContract; return getContract;
} }
async getEventsInRange (fromblockNumber: number, toBlockNumber: number): Promise<any> {
const { events } = await this._client.query(
gql(queryEventsInRange),
{
fromblockNumber,
toBlockNumber
}
);
return events;
}
} }

View File

@ -3,12 +3,11 @@
// //
import assert from 'assert'; import assert from 'assert';
import _ from 'lodash';
import { Connection, ConnectionOptions, DeepPartial, QueryRunner, FindConditions } from 'typeorm'; import { Connection, ConnectionOptions, DeepPartial, QueryRunner, FindConditions } from 'typeorm';
import { Database as BaseDatabase, DatabaseInterface } from '@vulcanize/util'; import { Database as BaseDatabase, DatabaseInterface } from '@vulcanize/util';
import { Event, UNKNOWN_EVENT_NAME } from './entity/Event'; import { Event } from './entity/Event';
import { Contract } from './entity/Contract'; import { Contract } from './entity/Contract';
import { BlockProgress } from './entity/BlockProgress'; import { BlockProgress } from './entity/BlockProgress';
import { SyncStatus } from './entity/SyncStatus'; import { SyncStatus } from './entity/SyncStatus';
@ -32,45 +31,62 @@ export class Database implements DatabaseInterface {
return this._baseDatabase.close(); return this._baseDatabase.close();
} }
async getContract (address: string): Promise<Contract | undefined> {
return this._conn.getRepository(Contract)
.createQueryBuilder('contract')
.where('address = :address', { address })
.getOne();
}
async getLatestContract (kind: string): Promise<Contract | undefined> {
return this._conn.getRepository(Contract)
.createQueryBuilder('contract')
.where('kind = :kind', { kind })
.orderBy('id', 'DESC')
.getOne();
}
async saveContract (queryRunner: QueryRunner, address: string, kind: string, startingBlock: number): Promise<void> {
const repo = queryRunner.manager.getRepository(Contract);
const numRows = await repo
.createQueryBuilder()
.where('address = :address', { address })
.getCount();
if (numRows === 0) {
const entity = repo.create({ address, kind, startingBlock });
await repo.save(entity);
}
}
async createTransactionRunner (): Promise<QueryRunner> { async createTransactionRunner (): Promise<QueryRunner> {
return this._baseDatabase.createTransactionRunner(); return this._baseDatabase.createTransactionRunner();
} }
async getProcessedBlockCountForRange (fromBlockNumber: number, toBlockNumber: number): Promise<{ expected: number, actual: number }> {
const repo = this._conn.getRepository(BlockProgress);
return this._baseDatabase.getProcessedBlockCountForRange(repo, fromBlockNumber, toBlockNumber);
}
async getEventsInRange (fromBlockNumber: number, toBlockNumber: number): Promise<Array<Event>> {
const repo = this._conn.getRepository(Event);
return this._baseDatabase.getEventsInRange(repo, fromBlockNumber, toBlockNumber);
}
async saveEventEntity (queryRunner: QueryRunner, entity: Event): Promise<Event> {
const repo = queryRunner.manager.getRepository(Event);
return this._baseDatabase.saveEventEntity(repo, entity);
}
async getBlockEvents (blockHash: string): Promise<Event[]> { async getBlockEvents (blockHash: string): Promise<Event[]> {
const repo = this._conn.getRepository(Event); const repo = this._conn.getRepository(Event);
return this._baseDatabase.getBlockEvents(repo, blockHash); return this._baseDatabase.getBlockEvents(repo, blockHash);
} }
async getProcessedBlockCountForRange (fromBlockNumber: number, toBlockNumber: number): Promise<{ expected: number, actual: number }> {
const blockNumbers = _.range(fromBlockNumber, toBlockNumber + 1);
const expected = blockNumbers.length;
const repo = this._conn.getRepository(BlockProgress);
const { count: actual } = await repo
.createQueryBuilder('block_progress')
.select('COUNT(DISTINCT(block_number))', 'count')
.where('block_number IN (:...blockNumbers) AND is_complete = :isComplete', { blockNumbers, isComplete: true })
.getRawOne();
return { expected, actual: parseInt(actual) };
}
async getEventsInRange (fromBlockNumber: number, toBlockNumber: number): Promise<Array<Event>> {
const events = await this._conn.getRepository(Event)
.createQueryBuilder('event')
.innerJoinAndSelect('event.block', 'block')
.where('block_number >= :fromBlockNumber AND block_number <= :toBlockNumber AND event_name <> :eventName', {
fromBlockNumber,
toBlockNumber,
eventName: UNKNOWN_EVENT_NAME
})
.addOrderBy('event.id', 'ASC')
.getMany();
return events;
}
async saveEvents (queryRunner: QueryRunner, block: DeepPartial<BlockProgress>, events: DeepPartial<Event>[]): Promise<void> { async saveEvents (queryRunner: QueryRunner, block: DeepPartial<BlockProgress>, events: DeepPartial<Event>[]): Promise<void> {
const blockRepo = queryRunner.manager.getRepository(BlockProgress); const blockRepo = queryRunner.manager.getRepository(BlockProgress);
const eventRepo = queryRunner.manager.getRepository(Event); const eventRepo = queryRunner.manager.getRepository(Event);
@ -108,40 +124,6 @@ export class Database implements DatabaseInterface {
return this._baseDatabase.getEvent(repo, id); return this._baseDatabase.getEvent(repo, id);
} }
async saveEventEntity (queryRunner: QueryRunner, entity: Event): Promise<Event> {
const repo = queryRunner.manager.getRepository(Event);
return await repo.save(entity);
}
async getContract (address: string): Promise<Contract | undefined> {
return this._conn.getRepository(Contract)
.createQueryBuilder('contract')
.where('address = :address', { address })
.getOne();
}
async getLatestContract (kind: string): Promise<Contract | undefined> {
return this._conn.getRepository(Contract)
.createQueryBuilder('contract')
.where('kind = :kind', { kind })
.orderBy('id', 'DESC')
.getOne();
}
async saveContract (queryRunner: QueryRunner, address: string, kind: string, startingBlock: number): Promise<void> {
const repo = queryRunner.manager.getRepository(Contract);
const numRows = await repo
.createQueryBuilder()
.where('address = :address', { address })
.getCount();
if (numRows === 0) {
const entity = repo.create({ address, kind, startingBlock });
await repo.save(entity);
}
}
async getBlocksAtHeight (height: number, isPruned: boolean): Promise<BlockProgress[]> { async getBlocksAtHeight (height: number, isPruned: boolean): Promise<BlockProgress[]> {
const repo = this._conn.getRepository(BlockProgress); const repo = this._conn.getRepository(BlockProgress);

View File

@ -76,10 +76,10 @@ export const main = async (): Promise<any> => {
const pubsub = new PubSub(); const pubsub = new PubSub();
const indexer = new Indexer(db, ethClient, postgraphileClient); const indexer = new Indexer(db, ethClient, postgraphileClient);
const { dbConnectionString, maxCompletionLag } = jobQueueConfig; const { dbConnectionString, maxCompletionLagInSecs } = jobQueueConfig;
assert(dbConnectionString, 'Missing job queue db connection string'); assert(dbConnectionString, 'Missing job queue db connection string');
const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag }); const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs });
await jobQueue.start(); await jobQueue.start();
const eventWatcher = new EventWatcher(ethClient, indexer, pubsub, jobQueue); const eventWatcher = new EventWatcher(ethClient, indexer, pubsub, jobQueue);

View File

@ -22,9 +22,6 @@ import { abi as factoryABI, storageLayout as factoryStorageLayout } from './arti
import { abi as nfpmABI, storageLayout as nfpmStorageLayout } from './artifacts/NonfungiblePositionManager.json'; import { abi as nfpmABI, storageLayout as nfpmStorageLayout } from './artifacts/NonfungiblePositionManager.json';
import poolABI from './artifacts/pool.json'; import poolABI from './artifacts/pool.json';
// TODO: Move to config.
const MAX_EVENTS_BLOCK_RANGE = 1000;
const log = debug('vulcanize:indexer'); const log = debug('vulcanize:indexer');
type ResultEvent = { type ResultEvent = {
@ -102,15 +99,6 @@ export class Indexer implements IndexerInterface {
}; };
} }
// Note: Some event names might be unknown at this point, as earlier events might not yet be processed.
async getOrFetchBlockEvents (block: DeepPartial<BlockProgress>): Promise<Array<Event>> {
return this._baseIndexer.getOrFetchBlockEvents(block, this._fetchAndSaveEvents.bind(this));
}
async getBlockEvents (blockHash: string): Promise<Array<Event>> {
return this._baseIndexer.getBlockEvents(blockHash);
}
async getEventsByFilter (blockHash: string, contract: string, name: string | null): Promise<Array<Event>> { async getEventsByFilter (blockHash: string, contract: string, name: string | null): Promise<Array<Event>> {
if (contract) { if (contract) {
const uniContract = await this.isUniswapContract(contract); const uniContract = await this.isUniswapContract(contract);
@ -304,6 +292,109 @@ export class Indexer implements IndexerInterface {
return { eventName, eventInfo }; return { eventName, eventInfo };
} }
async position (blockHash: string, tokenId: string): Promise<any> {
const nfpmContract = await this._db.getLatestContract('nfpm');
assert(nfpmContract, 'No NFPM contract watched.');
const { value, proof } = await this._getStorageValue(nfpmStorageLayout, blockHash, nfpmContract.address, '_positions', BigInt(tokenId));
return {
...value,
proof
};
}
async poolIdToPoolKey (blockHash: string, poolId: string): Promise<any> {
const nfpmContract = await this._db.getLatestContract('nfpm');
assert(nfpmContract, 'No NFPM contract watched.');
const { value, proof } = await this._getStorageValue(nfpmStorageLayout, blockHash, nfpmContract.address, '_poolIdToPoolKey', BigInt(poolId));
return {
...value,
proof
};
}
async getPool (blockHash: string, token0: string, token1: string, fee: string): Promise<any> {
const factoryContract = await this._db.getLatestContract('factory');
assert(factoryContract, 'No Factory contract watched.');
const { value, proof } = await this._getStorageValue(factoryStorageLayout, blockHash, factoryContract.address, 'getPool', token0, token1, BigInt(fee));
return {
pool: value,
proof
};
}
async getContract (type: string): Promise<any> {
const contract = await this._db.getLatestContract(type);
return contract;
}
async saveEventEntity (dbEvent: Event): Promise<Event> {
return this._baseIndexer.saveEventEntity(dbEvent);
}
async getProcessedBlockCountForRange (fromBlockNumber: number, toBlockNumber: number): Promise<{ expected: number, actual: number }> {
return this._baseIndexer.getProcessedBlockCountForRange(fromBlockNumber, toBlockNumber);
}
async getEventsInRange (fromBlockNumber: number, toBlockNumber: number): Promise<Array<Event>> {
return this._baseIndexer.getEventsInRange(fromBlockNumber, toBlockNumber);
}
// Note: Some event names might be unknown at this point, as earlier events might not yet be processed.
async getOrFetchBlockEvents (block: DeepPartial<BlockProgress>): Promise<Array<Event>> {
return this._baseIndexer.getOrFetchBlockEvents(block, this._fetchAndSaveEvents.bind(this));
}
async getBlockEvents (blockHash: string): Promise<Array<Event>> {
return this._baseIndexer.getBlockEvents(blockHash);
}
async updateSyncStatusIndexedBlock (blockHash: string, blockNumber: number): Promise<SyncStatus> {
return this._baseIndexer.updateSyncStatusIndexedBlock(blockHash, blockNumber);
}
async updateSyncStatusChainHead (blockHash: string, blockNumber: number): Promise<SyncStatus> {
return this._baseIndexer.updateSyncStatusChainHead(blockHash, blockNumber);
}
async updateSyncStatusCanonicalBlock (blockHash: string, blockNumber: number): Promise<SyncStatus> {
return this._baseIndexer.updateSyncStatusCanonicalBlock(blockHash, blockNumber);
}
async getSyncStatus (): Promise<SyncStatus | undefined> {
return this._baseIndexer.getSyncStatus();
}
async getBlock (blockHash: string): Promise<any> {
return this._baseIndexer.getBlock(blockHash);
}
async getEvent (id: string): Promise<Event | undefined> {
return this._baseIndexer.getEvent(id);
}
async getBlockProgress (blockHash: string): Promise<BlockProgress | undefined> {
return this._baseIndexer.getBlockProgress(blockHash);
}
async getBlocksAtHeight (height: number, isPruned: boolean): Promise<BlockProgress[]> {
return this._baseIndexer.getBlocksAtHeight(height, isPruned);
}
async markBlocksAsPruned (blocks: BlockProgress[]): Promise<void> {
return this._baseIndexer.markBlocksAsPruned(blocks);
}
async updateBlockProgress (blockHash: string, lastProcessedEventIndex: number): Promise<void> {
return this._baseIndexer.updateBlockProgress(blockHash, lastProcessedEventIndex);
}
async getAncestorAtDepth (blockHash: string, depth: number): Promise<string> {
return this._baseIndexer.getAncestorAtDepth(blockHash, depth);
}
async _fetchAndSaveEvents ({ blockHash }: DeepPartial<BlockProgress>): Promise<void> { async _fetchAndSaveEvents ({ blockHash }: DeepPartial<BlockProgress>): Promise<void> {
assert(blockHash); assert(blockHash);
let { block, logs } = await this._ethClient.getLogs({ blockHash }); let { block, logs } = await this._ethClient.getLogs({ blockHash });
@ -396,121 +487,6 @@ export class Indexer implements IndexerInterface {
} }
} }
async updateSyncStatusIndexedBlock (blockHash: string, blockNumber: number): Promise<SyncStatus> {
return this._baseIndexer.updateSyncStatusIndexedBlock(blockHash, blockNumber);
}
async updateSyncStatusChainHead (blockHash: string, blockNumber: number): Promise<SyncStatus> {
return this._baseIndexer.updateSyncStatusChainHead(blockHash, blockNumber);
}
async updateSyncStatusCanonicalBlock (blockHash: string, blockNumber: number): Promise<SyncStatus> {
return this._baseIndexer.updateSyncStatusCanonicalBlock(blockHash, blockNumber);
}
async getSyncStatus (): Promise<SyncStatus | undefined> {
return this._baseIndexer.getSyncStatus();
}
async getBlock (blockHash: string): Promise<any> {
return this._baseIndexer.getBlock(blockHash);
}
async getEvent (id: string): Promise<Event | undefined> {
return this._baseIndexer.getEvent(id);
}
async saveEventEntity (dbEvent: Event): Promise<Event> {
const dbTx = await this._db.createTransactionRunner();
let res;
try {
res = this._db.saveEventEntity(dbTx, dbEvent);
await dbTx.commitTransaction();
} catch (error) {
await dbTx.rollbackTransaction();
throw error;
} finally {
await dbTx.release();
}
return res;
}
async getBlockProgress (blockHash: string): Promise<BlockProgress | undefined> {
return this._baseIndexer.getBlockProgress(blockHash);
}
async getBlocksAtHeight (height: number, isPruned: boolean): Promise<BlockProgress[]> {
return this._baseIndexer.getBlocksAtHeight(height, isPruned);
}
async markBlocksAsPruned (blocks: BlockProgress[]): Promise<void> {
return this._baseIndexer.markBlocksAsPruned(blocks);
}
async updateBlockProgress (blockHash: string, lastProcessedEventIndex: number): Promise<void> {
return this._baseIndexer.updateBlockProgress(blockHash, lastProcessedEventIndex);
}
async getProcessedBlockCountForRange (fromBlockNumber: number, toBlockNumber: number): Promise<{ expected: number, actual: number }> {
return this._db.getProcessedBlockCountForRange(fromBlockNumber, toBlockNumber);
}
async getEventsInRange (fromBlockNumber: number, toBlockNumber: number): Promise<Array<Event>> {
if (toBlockNumber <= fromBlockNumber) {
throw new Error('toBlockNumber should be greater than fromBlockNumber');
}
if ((toBlockNumber - fromBlockNumber) > MAX_EVENTS_BLOCK_RANGE) {
throw new Error(`Max range (${MAX_EVENTS_BLOCK_RANGE}) exceeded`);
}
return this._db.getEventsInRange(fromBlockNumber, toBlockNumber);
}
async position (blockHash: string, tokenId: string): Promise<any> {
const nfpmContract = await this._db.getLatestContract('nfpm');
assert(nfpmContract, 'No NFPM contract watched.');
const { value, proof } = await this._getStorageValue(nfpmStorageLayout, blockHash, nfpmContract.address, '_positions', BigInt(tokenId));
return {
...value,
proof
};
}
async poolIdToPoolKey (blockHash: string, poolId: string): Promise<any> {
const nfpmContract = await this._db.getLatestContract('nfpm');
assert(nfpmContract, 'No NFPM contract watched.');
const { value, proof } = await this._getStorageValue(nfpmStorageLayout, blockHash, nfpmContract.address, '_poolIdToPoolKey', BigInt(poolId));
return {
...value,
proof
};
}
async getPool (blockHash: string, token0: string, token1: string, fee: string): Promise<any> {
const factoryContract = await this._db.getLatestContract('factory');
assert(factoryContract, 'No Factory contract watched.');
const { value, proof } = await this._getStorageValue(factoryStorageLayout, blockHash, factoryContract.address, 'getPool', token0, token1, BigInt(fee));
return {
pool: value,
proof
};
}
async getContract (type: string): Promise<any> {
const contract = await this._db.getLatestContract(type);
return contract;
}
async getAncestorAtDepth (blockHash: string, depth: number): Promise<string> {
return this._baseIndexer.getAncestorAtDepth(blockHash, depth);
}
// TODO: Move into base/class or framework package. // TODO: Move into base/class or framework package.
async _getStorageValue (storageLayout: StorageLayout, blockHash: string, token: string, variable: string, ...mappingKeys: any[]): Promise<ValueResult> { async _getStorageValue (storageLayout: StorageLayout, blockHash: string, token: string, variable: string, ...mappingKeys: any[]): Promise<ValueResult> {
return getStorageValue( return getStorageValue(

View File

@ -10,7 +10,15 @@ import debug from 'debug';
import { getCache } from '@vulcanize/cache'; import { getCache } from '@vulcanize/cache';
import { EthClient } from '@vulcanize/ipld-eth-client'; import { EthClient } from '@vulcanize/ipld-eth-client';
import { getConfig, JobQueue, JobRunner as BaseJobRunner, QUEUE_BLOCK_PROCESSING, QUEUE_EVENT_PROCESSING, QUEUE_CHAIN_PRUNING } from '@vulcanize/util'; import {
getConfig,
JobQueue,
JobRunner as BaseJobRunner,
QUEUE_BLOCK_PROCESSING,
QUEUE_EVENT_PROCESSING,
QUEUE_CHAIN_PRUNING,
JobQueueConfig
} from '@vulcanize/util';
import { Indexer } from './indexer'; import { Indexer } from './indexer';
import { Database } from './database'; import { Database } from './database';
@ -22,11 +30,13 @@ export class JobRunner {
_indexer: Indexer _indexer: Indexer
_jobQueue: JobQueue _jobQueue: JobQueue
_baseJobRunner: BaseJobRunner _baseJobRunner: BaseJobRunner
_jobQueueConfig: JobQueueConfig
constructor (indexer: Indexer, jobQueue: JobQueue) { constructor (jobQueueConfig: JobQueueConfig, indexer: Indexer, jobQueue: JobQueue) {
this._indexer = indexer; this._indexer = indexer;
this._jobQueue = jobQueue; this._jobQueue = jobQueue;
this._baseJobRunner = new BaseJobRunner(this._indexer, this._jobQueue); this._jobQueueConfig = jobQueueConfig;
this._baseJobRunner = new BaseJobRunner(this._jobQueueConfig, this._indexer, this._jobQueue);
} }
async start (): Promise<void> { async start (): Promise<void> {
@ -39,13 +49,6 @@ export class JobRunner {
await this._jobQueue.subscribe(QUEUE_BLOCK_PROCESSING, async (job) => { await this._jobQueue.subscribe(QUEUE_BLOCK_PROCESSING, async (job) => {
await this._baseJobRunner.processBlock(job); await this._baseJobRunner.processBlock(job);
const { data: { blockHash, blockNumber, parentHash, timestamp } } = job;
const events = await this._indexer.getOrFetchBlockEvents({ blockHash, blockNumber, parentHash, blockTimestamp: timestamp });
for (let ei = 0; ei < events.length; ei++) {
await this._jobQueue.pushJob(QUEUE_EVENT_PROCESSING, { id: events[ei].id, publish: true });
}
await this._jobQueue.markComplete(job); await this._jobQueue.markComplete(job);
}); });
} }
@ -130,13 +133,13 @@ export const main = async (): Promise<any> => {
assert(jobQueueConfig, 'Missing job queue config'); assert(jobQueueConfig, 'Missing job queue config');
const { dbConnectionString, maxCompletionLag } = jobQueueConfig; const { dbConnectionString, maxCompletionLagInSecs } = jobQueueConfig;
assert(dbConnectionString, 'Missing job queue db connection string'); assert(dbConnectionString, 'Missing job queue db connection string');
const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag }); const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs });
await jobQueue.start(); await jobQueue.start();
const jobRunner = new JobRunner(indexer, jobQueue); const jobRunner = new JobRunner(jobQueueConfig, indexer, jobQueue);
await jobRunner.start(); await jobRunner.start();
}; };

View File

@ -167,3 +167,10 @@ query queryGetContract($type: String!) {
} }
} }
`; `;
export const queryEventsInRange = gql`
query getEventsInRange($fromBlockNumber: Int!, $toBlockNumber: Int!) {
eventsInRange(fromBlockNumber: $fromBlockNumber, toBlockNumber: $toBlockNumber)
${resultEvent}
}
`;

View File

@ -73,10 +73,10 @@ export const main = async (): Promise<any> => {
assert(jobQueueConfig, 'Missing job queue config'); assert(jobQueueConfig, 'Missing job queue config');
const { dbConnectionString, maxCompletionLag } = jobQueueConfig; const { dbConnectionString, maxCompletionLagInSecs } = jobQueueConfig;
assert(dbConnectionString, 'Missing job queue db connection string'); assert(dbConnectionString, 'Missing job queue db connection string');
const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag }); const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs });
await jobQueue.start(); await jobQueue.start();
const eventWatcher = new EventWatcher(ethClient, indexer, pubsub, jobQueue); const eventWatcher = new EventWatcher(ethClient, indexer, pubsub, jobQueue);

View File

@ -14,8 +14,8 @@ const log = debug('vulcanize:config');
export interface JobQueueConfig { export interface JobQueueConfig {
dbConnectionString: string; dbConnectionString: string;
maxCompletionLag: number; maxCompletionLagInSecs: number;
jobDelay?: number; jobDelayInMilliSecs?: number;
} }
export interface Config { export interface Config {

View File

@ -5,9 +5,12 @@
import assert from 'assert'; import assert from 'assert';
import { Connection, ConnectionOptions, createConnection, DeepPartial, FindConditions, In, QueryRunner, Repository } from 'typeorm'; import { Connection, ConnectionOptions, createConnection, DeepPartial, FindConditions, In, QueryRunner, Repository } from 'typeorm';
import { SnakeNamingStrategy } from 'typeorm-naming-strategies'; import { SnakeNamingStrategy } from 'typeorm-naming-strategies';
import _ from 'lodash';
import { BlockProgressInterface, EventInterface, SyncStatusInterface } from './types'; import { BlockProgressInterface, EventInterface, SyncStatusInterface } from './types';
const UNKNOWN_EVENT_NAME = '__unknown__';
export class Database { export class Database {
_config: ConnectionOptions _config: ConnectionOptions
_conn!: Connection _conn!: Connection
@ -245,4 +248,35 @@ export class Database {
return ancestorBlockHash; return ancestorBlockHash;
} }
async getProcessedBlockCountForRange (repo: Repository<BlockProgressInterface>, fromBlockNumber: number, toBlockNumber: number): Promise<{ expected: number, actual: number }> {
const blockNumbers = _.range(fromBlockNumber, toBlockNumber + 1);
const expected = blockNumbers.length;
const { count: actual } = await repo
.createQueryBuilder('block_progress')
.select('COUNT(DISTINCT(block_number))', 'count')
.where('block_number IN (:...blockNumbers) AND is_complete = :isComplete', { blockNumbers, isComplete: true })
.getRawOne();
return { expected, actual: parseInt(actual) };
}
async getEventsInRange (repo: Repository<EventInterface>, fromBlockNumber: number, toBlockNumber: number): Promise<Array<EventInterface>> {
const events = repo.createQueryBuilder('event')
.innerJoinAndSelect('event.block', 'block')
.where('block_number >= :fromBlockNumber AND block_number <= :toBlockNumber AND event_name <> :eventName', {
fromBlockNumber,
toBlockNumber,
eventName: UNKNOWN_EVENT_NAME
})
.addOrderBy('event.id', 'ASC')
.getMany();
return events;
}
async saveEventEntity (repo: Repository<EventInterface>, entity: EventInterface): Promise<EventInterface> {
return await repo.save(entity);
}
} }

View File

@ -10,6 +10,8 @@ import { EthClient } from '@vulcanize/ipld-eth-client';
import { BlockProgressInterface, DatabaseInterface, EventInterface, SyncStatusInterface } from './types'; import { BlockProgressInterface, DatabaseInterface, EventInterface, SyncStatusInterface } from './types';
const MAX_EVENTS_BLOCK_RANGE = 1000;
const log = debug('vulcanize:indexer'); const log = debug('vulcanize:indexer');
export class Indexer { export class Indexer {
@ -159,4 +161,37 @@ export class Indexer {
async getAncestorAtDepth (blockHash: string, depth: number): Promise<string> { async getAncestorAtDepth (blockHash: string, depth: number): Promise<string> {
return this._db.getAncestorAtDepth(blockHash, depth); return this._db.getAncestorAtDepth(blockHash, depth);
} }
async saveEventEntity (dbEvent: EventInterface): Promise<EventInterface> {
const dbTx = await this._db.createTransactionRunner();
let res;
try {
res = this._db.saveEventEntity(dbTx, dbEvent);
await dbTx.commitTransaction();
} catch (error) {
await dbTx.rollbackTransaction();
throw error;
} finally {
await dbTx.release();
}
return res;
}
async getProcessedBlockCountForRange (fromBlockNumber: number, toBlockNumber: number): Promise<{ expected: number, actual: number }> {
return this._db.getProcessedBlockCountForRange(fromBlockNumber, toBlockNumber);
}
async getEventsInRange (fromBlockNumber: number, toBlockNumber: number): Promise<Array<EventInterface>> {
if (toBlockNumber <= fromBlockNumber) {
throw new Error('toBlockNumber should be greater than fromBlockNumber');
}
if ((toBlockNumber - fromBlockNumber) > MAX_EVENTS_BLOCK_RANGE) {
throw new Error(`Max range (${MAX_EVENTS_BLOCK_RANGE}) exceeded`);
}
return this._db.getEventsInRange(fromBlockNumber, toBlockNumber);
}
} }

View File

@ -4,8 +4,10 @@
import assert from 'assert'; import assert from 'assert';
import debug from 'debug'; import debug from 'debug';
import { wait } from '.';
import { MAX_REORG_DEPTH, QUEUE_BLOCK_PROCESSING } from './constants'; import { JobQueueConfig } from './config';
import { MAX_REORG_DEPTH, QUEUE_BLOCK_PROCESSING, QUEUE_EVENT_PROCESSING } from './constants';
import { JobQueue } from './job-queue'; import { JobQueue } from './job-queue';
import { EventInterface, IndexerInterface } from './types'; import { EventInterface, IndexerInterface } from './types';
@ -14,14 +16,16 @@ const log = debug('vulcanize:job-runner');
export class JobRunner { export class JobRunner {
_indexer: IndexerInterface _indexer: IndexerInterface
_jobQueue: JobQueue _jobQueue: JobQueue
_jobQueueConfig: JobQueueConfig
constructor (indexer: IndexerInterface, jobQueue: JobQueue) { constructor (jobQueueConfig: JobQueueConfig, indexer: IndexerInterface, jobQueue: JobQueue) {
this._indexer = indexer; this._indexer = indexer;
this._jobQueue = jobQueue; this._jobQueue = jobQueue;
this._jobQueueConfig = jobQueueConfig;
} }
async processBlock (job: any): Promise<void> { async processBlock (job: any): Promise<void> {
const { data: { blockHash, blockNumber, parentHash, priority } } = job; const { data: { blockHash, blockNumber, parentHash, priority, timestamp } } = job;
log(`Processing block number ${blockNumber} hash ${blockHash} `); log(`Processing block number ${blockNumber} hash ${blockHash} `);
@ -63,6 +67,21 @@ export class JobRunner {
throw new Error(message); throw new Error(message);
} }
} }
// Check if block is being already processed.
const blockProgress = await this._indexer.getBlockProgress(blockHash);
if (!blockProgress) {
const { jobDelayInMilliSecs = 0 } = this._jobQueueConfig;
// Delay required to process block.
await wait(jobDelayInMilliSecs);
const events = await this._indexer.getOrFetchBlockEvents({ blockHash, blockNumber, parentHash, blockTimestamp: timestamp });
for (let ei = 0; ei < events.length; ei++) {
await this._jobQueue.pushJob(QUEUE_EVENT_PROCESSING, { id: events[ei].id, publish: true });
}
}
} }
async processEvent (job: any): Promise<EventInterface> { async processEvent (job: any): Promise<EventInterface> {

View File

@ -47,6 +47,7 @@ export interface IndexerInterface {
getBlocksAtHeight (height: number, isPruned: boolean): Promise<BlockProgressInterface[]>; getBlocksAtHeight (height: number, isPruned: boolean): Promise<BlockProgressInterface[]>;
getBlockEvents (blockHash: string): Promise<Array<EventInterface>> getBlockEvents (blockHash: string): Promise<Array<EventInterface>>
getAncestorAtDepth (blockHash: string, depth: number): Promise<string> getAncestorAtDepth (blockHash: string, depth: number): Promise<string>
getOrFetchBlockEvents (block: DeepPartial<BlockProgressInterface>): Promise<Array<EventInterface>>
updateBlockProgress (blockHash: string, lastProcessedEventIndex: number): Promise<void> updateBlockProgress (blockHash: string, lastProcessedEventIndex: number): Promise<void>
updateSyncStatusChainHead (blockHash: string, blockNumber: number): Promise<SyncStatusInterface> updateSyncStatusChainHead (blockHash: string, blockNumber: number): Promise<SyncStatusInterface>
updateSyncStatusIndexedBlock (blockHash: string, blockNumber: number): Promise<SyncStatusInterface> updateSyncStatusIndexedBlock (blockHash: string, blockNumber: number): Promise<SyncStatusInterface>
@ -68,11 +69,14 @@ export interface DatabaseInterface {
getEvent (id: string): Promise<EventInterface | undefined> getEvent (id: string): Promise<EventInterface | undefined>
getSyncStatus (queryRunner: QueryRunner): Promise<SyncStatusInterface | undefined> getSyncStatus (queryRunner: QueryRunner): Promise<SyncStatusInterface | undefined>
getAncestorAtDepth (blockHash: string, depth: number): Promise<string> getAncestorAtDepth (blockHash: string, depth: number): Promise<string>
getProcessedBlockCountForRange (fromBlockNumber: number, toBlockNumber: number): Promise<{ expected: number, actual: number }>;
getEventsInRange (fromBlockNumber: number, toBlockNumber: number): Promise<Array<EventInterface>>;
markBlocksAsPruned (queryRunner: QueryRunner, blocks: BlockProgressInterface[]): Promise<void>; markBlocksAsPruned (queryRunner: QueryRunner, blocks: BlockProgressInterface[]): Promise<void>;
updateBlockProgress (queryRunner: QueryRunner, blockHash: string, lastProcessedEventIndex: number): Promise<void> updateBlockProgress (queryRunner: QueryRunner, blockHash: string, lastProcessedEventIndex: number): Promise<void>
updateSyncStatusIndexedBlock (queryRunner: QueryRunner, blockHash: string, blockNumber: number): Promise<SyncStatusInterface>; updateSyncStatusIndexedBlock (queryRunner: QueryRunner, blockHash: string, blockNumber: number): Promise<SyncStatusInterface>;
updateSyncStatusChainHead (queryRunner: QueryRunner, blockHash: string, blockNumber: number): Promise<SyncStatusInterface>; updateSyncStatusChainHead (queryRunner: QueryRunner, blockHash: string, blockNumber: number): Promise<SyncStatusInterface>;
updateSyncStatusCanonicalBlock (queryRunner: QueryRunner, blockHash: string, blockNumber: number): Promise<SyncStatusInterface>; updateSyncStatusCanonicalBlock (queryRunner: QueryRunner, blockHash: string, blockNumber: number): Promise<SyncStatusInterface>;
saveEvents (queryRunner: QueryRunner, block: DeepPartial<BlockProgressInterface>, events: DeepPartial<EventInterface>[]): Promise<void>; saveEvents (queryRunner: QueryRunner, block: DeepPartial<BlockProgressInterface>, events: DeepPartial<EventInterface>[]): Promise<void>;
removeEntities<Entity> (queryRunner: QueryRunner, entity: new () => Entity, findConditions?: FindConditions<Entity>): Promise<void> saveEventEntity (queryRunner: QueryRunner, entity: EventInterface): Promise<EventInterface>;
removeEntities<Entity> (queryRunner: QueryRunner, entity: new () => Entity, findConditions?: FindConditions<Entity>): Promise<void>;
} }