Refactoring pruning and reorg handling code (#227)

* Refactor db methods getPrevEntityVersion and getFrothyRegion.

* Filter out entities from pruned blocks.

* Pull saveContract and getModelEntities to util.

Co-authored-by: nabarun <nabarun@deepstacksoft.com>
This commit is contained in:
Ashwin Phatak 2021-08-25 12:47:53 +05:30 committed by GitHub
parent 3ff2fdf11b
commit afd7c954a2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 325 additions and 326 deletions

View File

@ -9,7 +9,8 @@
"server:mock": "MOCK=1 nodemon src/server.ts -f environments/local.toml",
"test": "mocha -r ts-node/register src/**/*.spec.ts",
"lint": "eslint .",
"build": "tsc"
"build": "tsc",
"watch:contract": "ts-node src/cli/watch-contract.ts --configFile environments/local.toml"
},
"repository": {
"type": "git",

View File

@ -7,7 +7,8 @@ import yargs from 'yargs';
import 'reflect-metadata';
import { ethers } from 'ethers';
import { Config, getConfig } from '../config';
import { Config, getConfig } from '@vulcanize/util';
import { Database } from '../database';
(async () => {

View File

@ -3,8 +3,9 @@
//
import assert from 'assert';
import { Connection, ConnectionOptions, createConnection, DeepPartial } from 'typeorm';
import { SnakeNamingStrategy } from 'typeorm-naming-strategies';
import { Connection, ConnectionOptions, DeepPartial } from 'typeorm';
import { Database as BaseDatabase } from '@vulcanize/util';
import { Allowance } from './entity/Allowance';
import { Balance } from './entity/Balance';
@ -15,23 +16,20 @@ import { EventSyncProgress } from './entity/EventProgress';
export class Database {
_config: ConnectionOptions
_conn!: Connection
_baseDatabase: BaseDatabase;
constructor (config: ConnectionOptions) {
assert(config);
this._config = config;
this._baseDatabase = new BaseDatabase(this._config);
}
async init (): Promise<void> {
assert(!this._conn);
this._conn = await createConnection({
...this._config,
namingStrategy: new SnakeNamingStrategy()
});
this._conn = await this._baseDatabase.init();
}
async close (): Promise<void> {
return this._conn.close();
return this._baseDatabase.close();
}
async getBalance ({ blockHash, token, owner }: { blockHash: string, token: string, owner: string }): Promise<Balance | undefined> {
@ -149,15 +147,7 @@ export class Database {
await this._conn.transaction(async (tx) => {
const repo = tx.getRepository(Contract);
const numRows = await repo
.createQueryBuilder()
.where('address = :address', { address })
.getCount();
if (numRows === 0) {
const entity = repo.create({ address, startingBlock });
await repo.save(entity);
}
return this._baseDatabase.saveContract(repo, address, startingBlock);
});
}
}

View File

@ -4,8 +4,8 @@
import { gql } from '@apollo/client/core';
import { GraphQLClient, GraphQLConfig } from '@vulcanize/ipld-eth-client';
import { BlockHeight, OrderDirection } from '@vulcanize/util';
import { BlockHeight, OrderDirection } from './indexer';
import {
queryBundles,
queryBurns,

View File

@ -3,11 +3,24 @@
//
import assert from 'assert';
import { Brackets, Connection, ConnectionOptions, DeepPartial, FindConditions, FindOneOptions, LessThanOrEqual, QueryRunner, Repository } from 'typeorm';
import {
Connection,
ConnectionOptions,
DeepPartial,
FindConditions,
FindOneOptions,
LessThanOrEqual,
QueryRunner
} from 'typeorm';
import { MAX_REORG_DEPTH, Database as BaseDatabase, DatabaseInterface } from '@vulcanize/util';
import {
Database as BaseDatabase,
DatabaseInterface,
BlockHeight,
QueryOptions,
Where
} from '@vulcanize/util';
import { EventSyncProgress } from './entity/EventProgress';
import { Factory } from './entity/Factory';
import { Pool } from './entity/Pool';
import { Event } from './entity/Event';
@ -29,46 +42,6 @@ import { BlockProgress } from './entity/BlockProgress';
import { Block } from './events';
import { SyncStatus } from './entity/SyncStatus';
const DEFAULT_LIMIT = 100;
const DEFAULT_SKIP = 0;
const OPERATOR_MAP = {
equals: '=',
gt: '>',
lt: '<',
gte: '>=',
lte: '<=',
in: 'IN',
contains: 'LIKE',
starts: 'LIKE',
ends: 'LIKE'
};
export interface BlockHeight {
number?: number;
hash?: string;
}
export enum OrderDirection {
asc = 'asc',
desc = 'desc'
}
export interface QueryOptions {
limit?: number;
skip?: number;
orderBy?: string;
orderDirection?: OrderDirection;
}
interface Where {
[key: string]: [{
value: any;
not: boolean;
operator: keyof typeof OPERATOR_MAP;
}]
}
export class Database implements DatabaseInterface {
_config: ConnectionOptions
_conn!: Connection
@ -106,7 +79,7 @@ export class Database implements DatabaseInterface {
let entity = await repo.findOne(findOptions as FindOneOptions<Factory>);
if (!entity && findOptions.where.blockHash) {
entity = await this._getPrevEntityVersion(queryRunner, repo, findOptions);
entity = await this._baseDatabase.getPrevEntityVersion(queryRunner, repo, findOptions);
}
return entity;
@ -134,7 +107,7 @@ export class Database implements DatabaseInterface {
let entity = await repo.findOne(findOptions as FindOneOptions<Bundle>);
if (!entity && findOptions.where.blockHash) {
entity = await this._getPrevEntityVersion(queryRunner, repo, findOptions);
entity = await this._baseDatabase.getPrevEntityVersion(queryRunner, repo, findOptions);
}
return entity;
@ -159,7 +132,7 @@ export class Database implements DatabaseInterface {
let entity = await repo.findOne(findOptions as FindOneOptions<Token>);
if (!entity && findOptions.where.blockHash) {
entity = await this._getPrevEntityVersion(queryRunner, repo, findOptions);
entity = await this._baseDatabase.getPrevEntityVersion(queryRunner, repo, findOptions);
}
return entity;
@ -202,7 +175,7 @@ export class Database implements DatabaseInterface {
let entity = await repo.findOne(findOptions as FindOneOptions<Pool>);
if (!entity && findOptions.where.blockHash) {
entity = await this._getPrevEntityVersion(queryRunner, repo, findOptions);
entity = await this._baseDatabase.getPrevEntityVersion(queryRunner, repo, findOptions);
}
return entity;
@ -246,7 +219,7 @@ export class Database implements DatabaseInterface {
entity = await repo.findOne(findOptions as FindOneOptions<Position>);
if (!entity && findOptions.where.blockHash) {
entity = await this._getPrevEntityVersion(queryRunner, repo, findOptions);
entity = await this._baseDatabase.getPrevEntityVersion(queryRunner, repo, findOptions);
}
} finally {
await queryRunner.release();
@ -274,7 +247,7 @@ export class Database implements DatabaseInterface {
let entity = await repo.findOne(findOptions as FindOneOptions<Tick>);
if (!entity && findOptions.where.blockHash) {
entity = await this._getPrevEntityVersion(queryRunner, repo, findOptions);
entity = await this._baseDatabase.getPrevEntityVersion(queryRunner, repo, findOptions);
}
return entity;
@ -313,7 +286,7 @@ export class Database implements DatabaseInterface {
let entity = await repo.findOne(findOptions as FindOneOptions<PoolDayData>);
if (!entity && findOptions.where.blockHash) {
entity = await this._getPrevEntityVersion(queryRunner, repo, findOptions);
entity = await this._baseDatabase.getPrevEntityVersion(queryRunner, repo, findOptions);
}
return entity;
@ -337,7 +310,7 @@ export class Database implements DatabaseInterface {
let entity = await repo.findOne(findOptions as FindOneOptions<PoolHourData>);
if (!entity && findOptions.where.blockHash) {
entity = await this._getPrevEntityVersion(queryRunner, repo, findOptions);
entity = await this._baseDatabase.getPrevEntityVersion(queryRunner, repo, findOptions);
}
return entity;
@ -361,7 +334,7 @@ export class Database implements DatabaseInterface {
let entity = await repo.findOne(findOptions as FindOneOptions<UniswapDayData>);
if (!entity && findOptions.where.blockHash) {
entity = await this._getPrevEntityVersion(queryRunner, repo, findOptions);
entity = await this._baseDatabase.getPrevEntityVersion(queryRunner, repo, findOptions);
}
return entity;
@ -385,7 +358,7 @@ export class Database implements DatabaseInterface {
let entity = await repo.findOne(findOptions as FindOneOptions<TokenDayData>);
if (!entity && findOptions.where.blockHash) {
entity = await this._getPrevEntityVersion(queryRunner, repo, findOptions);
entity = await this._baseDatabase.getPrevEntityVersion(queryRunner, repo, findOptions);
}
return entity;
@ -409,7 +382,7 @@ export class Database implements DatabaseInterface {
let entity = await repo.findOne(findOptions as FindOneOptions<TokenHourData>);
if (!entity && findOptions.where.blockHash) {
entity = await this._getPrevEntityVersion(queryRunner, repo, findOptions);
entity = await this._baseDatabase.getPrevEntityVersion(queryRunner, repo, findOptions);
}
return entity;
@ -433,93 +406,14 @@ export class Database implements DatabaseInterface {
let entity = await repo.findOne(findOptions as FindOneOptions<Transaction>);
if (!entity && findOptions.where.blockHash) {
entity = await this._getPrevEntityVersion(queryRunner, repo, findOptions);
entity = await this._baseDatabase.getPrevEntityVersion(queryRunner, repo, findOptions);
}
return entity;
}
async getUniswapEntities<Entity> (queryRunner: QueryRunner, entity: new () => Entity, block: BlockHeight, where: Where = {}, queryOptions: QueryOptions = {}, relations: string[] = []): Promise<Entity[]> {
const repo = queryRunner.manager.getRepository(entity);
const { tableName } = repo.metadata;
let subQuery = repo.createQueryBuilder('subTable')
.select('MAX(subTable.block_number)')
.where(`subTable.id = ${tableName}.id`);
if (block.hash) {
const { canonicalBlockNumber, blockHashes } = await this._getFrothyRegion(queryRunner, block.hash);
subQuery = subQuery
.andWhere(new Brackets(qb => {
qb.where('subTable.block_hash IN (:...blockHashes)', { blockHashes })
.orWhere('subTable.block_number <= :canonicalBlockNumber', { canonicalBlockNumber });
}));
}
if (block.number) {
subQuery = subQuery.andWhere('subTable.block_number <= :blockNumber', { blockNumber: block.number });
}
let selectQueryBuilder = repo.createQueryBuilder(tableName)
.where(`${tableName}.block_number IN (${subQuery.getQuery()})`)
.setParameters(subQuery.getParameters());
relations.forEach(relation => {
selectQueryBuilder = selectQueryBuilder.leftJoinAndSelect(`${repo.metadata.tableName}.${relation}`, relation);
});
Object.entries(where).forEach(([field, filters]) => {
filters.forEach((filter, index) => {
// Form the where clause.
const { not, operator, value } = filter;
const columnMetadata = repo.metadata.findColumnWithPropertyName(field);
assert(columnMetadata);
let whereClause = `${tableName}.${columnMetadata.propertyAliasName} `;
if (not) {
if (operator === 'equals') {
whereClause += '!';
} else {
whereClause += 'NOT ';
}
}
whereClause += `${OPERATOR_MAP[operator]} `;
if (['contains', 'starts'].some(el => el === operator)) {
whereClause += '%:';
} else if (operator === 'in') {
whereClause += '(:...';
} else {
whereClause += ':';
}
const variableName = `${field}${index}`;
whereClause += variableName;
if (['contains', 'ends'].some(el => el === operator)) {
whereClause += '%';
} else if (operator === 'in') {
whereClause += ')';
}
selectQueryBuilder = selectQueryBuilder.andWhere(whereClause, { [variableName]: value });
});
});
const { limit = DEFAULT_LIMIT, orderBy, orderDirection, skip = DEFAULT_SKIP } = queryOptions;
selectQueryBuilder = selectQueryBuilder.skip(skip)
.take(limit);
if (orderBy) {
const columnMetadata = repo.metadata.findColumnWithPropertyName(orderBy);
assert(columnMetadata);
selectQueryBuilder = selectQueryBuilder.orderBy(`${tableName}.${columnMetadata.propertyAliasName}`, orderDirection === 'desc' ? 'DESC' : 'ASC');
}
return selectQueryBuilder.getMany();
async getModelEntities<Entity> (queryRunner: QueryRunner, entity: new () => Entity, block: BlockHeight, where: Where = {}, queryOptions: QueryOptions = {}, relations: string[] = []): Promise<Entity[]> {
return this._baseDatabase.getModelEntities(queryRunner, entity, block, where, queryOptions, relations);
}
async saveFactory (queryRunner: QueryRunner, factory: Factory, block: Block): Promise<Factory> {
@ -634,44 +528,6 @@ export class Database implements DatabaseInterface {
return repo.save(swap);
}
// Returns true if events have already been synced for the (block, token) combination.
async didSyncEvents ({ blockHash, token }: { blockHash: string, token: string }): Promise<boolean> {
const numRows = await this._conn.getRepository(EventSyncProgress)
.createQueryBuilder()
.where('block_hash = :blockHash AND token = :token', {
blockHash,
token
})
.getCount();
return numRows > 0;
}
async getEvents ({ blockHash, token }: { blockHash: string, token: string }): Promise<Event[]> {
return this._conn.getRepository(Event)
.createQueryBuilder('event')
.innerJoinAndSelect('event.block', 'block')
.where('block_hash = :blockHash AND token = :token', {
blockHash,
token
})
.addOrderBy('event.id', 'ASC')
.getMany();
}
async getEventsByName ({ blockHash, token, eventName }: { blockHash: string, token: string, eventName: string }): Promise<Event[] | undefined> {
return this._conn.getRepository(Event)
.createQueryBuilder('event')
.innerJoinAndSelect('event.block', 'block')
.where('block_hash = :blockHash AND token = :token AND :eventName = :eventName', {
blockHash,
token,
eventName
})
.addOrderBy('event.id', 'ASC')
.getMany();
}
async createTransactionRunner (): Promise<QueryRunner> {
return this._baseDatabase.createTransactionRunner();
}
@ -774,110 +630,4 @@ export class Database implements DatabaseInterface {
async getAncestorAtDepth (blockHash: string, depth: number): Promise<string> {
return this._baseDatabase.getAncestorAtDepth(blockHash, depth);
}
async _getPrevEntityVersion<Entity> (queryRunner: QueryRunner, repo: Repository<Entity>, findOptions: { [key: string]: any }): Promise<Entity | undefined> {
// Check whether query is ordered by blockNumber to get the latest entity.
assert(findOptions.order.blockNumber);
// Hierarchical query for getting the entity in the frothy region.
// TODO: Use syncStatus.latestCanonicalBlockNumber instead of MAX_REORG_DEPTH after pruning is implemented.
const heirerchicalQuery = `
WITH RECURSIVE cte_query AS
(
SELECT
b.block_hash,
b.block_number,
b.parent_hash,
1 as depth,
e.id
FROM
block_progress b
LEFT JOIN
${repo.metadata.tableName} e ON e.block_hash = b.block_hash
WHERE
b.block_hash = $1
UNION ALL
SELECT
b.block_hash,
b.block_number,
b.parent_hash,
c.depth + 1,
e.id
FROM
block_progress b
LEFT JOIN
${repo.metadata.tableName} e
ON e.block_hash = b.block_hash
AND e.id = $2
INNER JOIN
cte_query c ON c.parent_hash = b.block_hash
WHERE
c.id IS NULL AND c.depth < $3
)
SELECT
block_hash, block_number, id
FROM
cte_query
ORDER BY block_number ASC
LIMIT 1;
`;
// Fetching blockHash for previous entity in frothy region.
const [{ block_hash: blockHash, block_number: blockNumber, id }] = await queryRunner.query(heirerchicalQuery, [findOptions.where.blockHash, findOptions.where.id, MAX_REORG_DEPTH]);
if (id) {
// Entity found in frothy region.
findOptions.where.blockHash = blockHash;
return repo.findOne(findOptions);
}
// If entity not found in frothy region get latest entity in the pruned region.
delete findOptions.where.blockHash;
const canonicalBlockNumber = blockNumber + 1;
findOptions.where.blockNumber = LessThanOrEqual(canonicalBlockNumber);
return repo.findOne(findOptions);
}
async _getFrothyRegion (queryRunner: QueryRunner, blockHash: string): Promise<{ canonicalBlockNumber: number, blockHashes: string[] }> {
// TODO: Use syncStatus.latestCanonicalBlockNumber instead of MAX_REORG_DEPTH after pruning is implemented.
const heirerchicalQuery = `
WITH RECURSIVE cte_query AS
(
SELECT
block_hash,
block_number,
parent_hash,
1 as depth
FROM
block_progress
WHERE
block_hash = $1
UNION ALL
SELECT
b.block_hash,
b.block_number,
b.parent_hash,
c.depth + 1
FROM
block_progress b
INNER JOIN
cte_query c ON c.parent_hash = b.block_hash
WHERE
c.depth < $2
)
SELECT
block_hash, block_number
FROM
cte_query;
`;
// Get blocks in the frothy region using heirarchical query.
const blocks = await queryRunner.query(heirerchicalQuery, [blockHash, MAX_REORG_DEPTH]);
const blockHashes = blocks.map(({ block_hash: blockHash }: any) => blockHash);
// Canonical block is the block after the last block in frothy region.
const canonicalBlockNumber = blocks[blocks.length - 1].block_number + 1;
return { canonicalBlockNumber, blockHashes };
}
}

View File

@ -11,7 +11,7 @@ import { utils } from 'ethers';
import { Client as UniClient } from '@vulcanize/uni-watcher';
import { Client as ERC20Client } from '@vulcanize/erc20-watcher';
import { EthClient } from '@vulcanize/ipld-eth-client';
import { IndexerInterface, Indexer as BaseIndexer } from '@vulcanize/util';
import { IndexerInterface, Indexer as BaseIndexer, QueryOptions, OrderDirection, BlockHeight } from '@vulcanize/util';
import { findEthPerToken, getEthPriceInUSD, getTrackedAmountUSD, sqrtPriceX96ToTokenPrices, WHITELIST_TOKENS } from './utils/pricing';
import { updatePoolDayData, updatePoolHourData, updateTokenDayData, updateTokenHourData, updateUniswapDayData } from './utils/interval-updates';
@ -20,7 +20,7 @@ import { convertTokenToDecimal, loadTransaction, safeDiv } from './utils';
import { createTick } from './utils/tick';
import Decimal from 'decimal.js';
import { Position } from './entity/Position';
import { Database, QueryOptions, OrderDirection, BlockHeight } from './database';
import { Database } from './database';
import { Event } from './entity/Event';
import { ResultEvent, Block, Transaction, PoolCreatedEvent, InitializeEvent, MintEvent, BurnEvent, SwapEvent, IncreaseLiquidityEvent, DecreaseLiquidityEvent, CollectEvent, TransferEvent } from './events';
import { Factory } from './entity/Factory';
@ -264,7 +264,7 @@ export class Indexer implements IndexerInterface {
return acc;
}, {});
res = await this._db.getUniswapEntities(dbTx, entity, block, where, queryOptions, relations);
res = await this._db.getModelEntities(dbTx, entity, block, where, queryOptions, relations);
dbTx.commitTransaction();
} catch (error) {
await dbTx.rollbackTransaction();
@ -524,7 +524,7 @@ export class Indexer implements IndexerInterface {
// TODO: In subgraph factory is fetched by hardcoded factory address.
// Currently fetching first factory in database as only one exists.
const [factory] = await this._db.getUniswapEntities(dbTx, Factory, { hash: block.hash }, {}, { limit: 1 });
const [factory] = await this._db.getModelEntities(dbTx, Factory, { hash: block.hash }, {}, { limit: 1 });
const token0 = pool.token0;
const token1 = pool.token1;
@ -669,7 +669,7 @@ export class Indexer implements IndexerInterface {
// TODO: In subgraph factory is fetched by hardcoded factory address.
// Currently fetching first factory in database as only one exists.
const [factory] = await this._db.getUniswapEntities(dbTx, Factory, { hash: block.hash }, {}, { limit: 1 });
const [factory] = await this._db.getModelEntities(dbTx, Factory, { hash: block.hash }, {}, { limit: 1 });
const token0 = pool.token0;
const token1 = pool.token1;
@ -796,7 +796,7 @@ export class Indexer implements IndexerInterface {
// TODO: In subgraph factory is fetched by hardcoded factory address.
// Currently fetching first factory in database as only one exists.
const [factory] = await this._db.getUniswapEntities(dbTx, Factory, { hash: block.hash }, {}, { limit: 1 });
const [factory] = await this._db.getModelEntities(dbTx, Factory, { hash: block.hash }, {}, { limit: 1 });
const pool = await this._db.getPool(dbTx, { id: contractAddress, blockHash: block.hash });
assert(pool);

View File

@ -6,9 +6,9 @@
import debug from 'debug';
import BigInt from 'apollo-type-bigint';
import { BlockHeight, OrderDirection } from '@vulcanize/util';
import { Data, Entity, NO_OF_BLOCKS } from './data';
import { BlockHeight } from '../resolvers';
import { OrderDirection } from '../database';
const log = debug('vulcanize:test');

View File

@ -6,7 +6,9 @@ import assert from 'assert';
import BigInt from 'apollo-type-bigint';
import debug from 'debug';
import { Indexer, OrderDirection, BlockHeight } from './indexer';
import { BlockHeight, OrderDirection } from '@vulcanize/util';
import { Indexer } from './indexer';
import { Burn } from './entity/Burn';
import { Bundle } from './entity/Bundle';
import { Factory } from './entity/Factory';

View File

@ -10,7 +10,8 @@ import _ from 'lodash';
import {
Config,
getConfig,
wait
wait,
OrderDirection
} from '@vulcanize/util';
import {
deployTokens,
@ -40,7 +41,6 @@ import {
checkTokenHourData,
fetchTransaction
} from '../test/utils';
import { OrderDirection } from './indexer';
const NETWORK_RPC_URL = 'http://localhost:8545';

View File

@ -26,7 +26,7 @@ export const updateUniswapDayData = async (db: Database, dbTx: QueryRunner, even
// TODO: In subgraph factory is fetched by hardcoded factory address.
// Currently fetching first factory in database as only one exists.
const [factory] = await db.getUniswapEntities(dbTx, Factory, { hash: block.hash }, {}, { limit: 1 });
const [factory] = await db.getModelEntities(dbTx, Factory, { hash: block.hash }, {}, { limit: 1 });
const dayID = Math.floor(block.timestamp / 86400); // Rounded.
const dayStartTimestamp = dayID * 86400;

View File

@ -7,9 +7,10 @@ import { ethers } from 'ethers';
import Decimal from 'decimal.js';
import _ from 'lodash';
import { OrderDirection } from '@vulcanize/util';
import { insertNDummyBlocks } from '@vulcanize/util/test';
import { Database, OrderDirection } from '../src/database';
import { Database } from '../src/database';
import { Block } from '../src/events';
import { Token } from '../src/entity/Token';
import { Client } from '../src/client';

View File

@ -49,15 +49,7 @@ export class Database implements DatabaseInterface {
async saveContract (queryRunner: QueryRunner, address: string, kind: string, startingBlock: number): Promise<void> {
const repo = queryRunner.manager.getRepository(Contract);
const numRows = await repo
.createQueryBuilder()
.where('address = :address', { address })
.getCount();
if (numRows === 0) {
const entity = repo.create({ address, kind, startingBlock });
await repo.save(entity);
}
return this._baseDatabase.saveContract(repo, address, startingBlock, kind);
}
async createTransactionRunner (): Promise<QueryRunner> {

View File

@ -3,13 +3,63 @@
//
import assert from 'assert';
import { Connection, ConnectionOptions, createConnection, DeepPartial, FindConditions, In, QueryRunner, Repository } from 'typeorm';
import {
Brackets,
Connection,
ConnectionOptions,
createConnection,
DeepPartial,
FindConditions,
In,
QueryRunner,
Repository
} from 'typeorm';
import { SnakeNamingStrategy } from 'typeorm-naming-strategies';
import _ from 'lodash';
import { BlockProgressInterface, EventInterface, SyncStatusInterface } from './types';
import { BlockProgressInterface, ContractInterface, EventInterface, SyncStatusInterface } from './types';
import { MAX_REORG_DEPTH } from './constants';
const UNKNOWN_EVENT_NAME = '__unknown__';
const DEFAULT_LIMIT = 100;
const DEFAULT_SKIP = 0;
const OPERATOR_MAP = {
equals: '=',
gt: '>',
lt: '<',
gte: '>=',
lte: '<=',
in: 'IN',
contains: 'LIKE',
starts: 'LIKE',
ends: 'LIKE'
};
export interface BlockHeight {
number?: number;
hash?: string;
}
export enum OrderDirection {
asc = 'asc',
desc = 'desc'
}
export interface QueryOptions {
limit?: number;
skip?: number;
orderBy?: string;
orderDirection?: OrderDirection;
}
export interface Where {
[key: string]: [{
value: any;
not: boolean;
operator: keyof typeof OPERATOR_MAP;
}]
}
export class Database {
_config: ConnectionOptions
@ -265,7 +315,7 @@ export class Database {
async getEventsInRange (repo: Repository<EventInterface>, fromBlockNumber: number, toBlockNumber: number): Promise<Array<EventInterface>> {
const events = repo.createQueryBuilder('event')
.innerJoinAndSelect('event.block', 'block')
.where('block_number >= :fromBlockNumber AND block_number <= :toBlockNumber AND event_name <> :eventName', {
.where('block_number >= :fromBlockNumber AND block_number <= :toBlockNumber AND event_name <> :eventName AND is_pruned = false', {
fromBlockNumber,
toBlockNumber,
eventName: UNKNOWN_EVENT_NAME
@ -279,4 +329,209 @@ export class Database {
async saveEventEntity (repo: Repository<EventInterface>, entity: EventInterface): Promise<EventInterface> {
return await repo.save(entity);
}
async getModelEntities<Entity> (queryRunner: QueryRunner, entity: new () => Entity, block: BlockHeight, where: Where = {}, queryOptions: QueryOptions = {}, relations: string[] = []): Promise<Entity[]> {
const repo = queryRunner.manager.getRepository(entity);
const { tableName } = repo.metadata;
let subQuery = repo.createQueryBuilder('subTable')
.select('MAX(subTable.block_number)')
.where(`subTable.id = ${tableName}.id`);
if (block.hash) {
const { canonicalBlockNumber, blockHashes } = await this.getFrothyRegion(queryRunner, block.hash);
subQuery = subQuery
.andWhere(new Brackets(qb => {
qb.where('subTable.block_hash IN (:...blockHashes)', { blockHashes })
.orWhere('subTable.block_number <= :canonicalBlockNumber', { canonicalBlockNumber });
}));
}
if (block.number) {
subQuery = subQuery.andWhere('subTable.block_number <= :blockNumber', { blockNumber: block.number });
}
let selectQueryBuilder = repo.createQueryBuilder(tableName)
.where(`${tableName}.block_number IN (${subQuery.getQuery()})`)
.setParameters(subQuery.getParameters());
relations.forEach(relation => {
selectQueryBuilder = selectQueryBuilder.leftJoinAndSelect(`${repo.metadata.tableName}.${relation}`, relation);
});
Object.entries(where).forEach(([field, filters]) => {
filters.forEach((filter, index) => {
// Form the where clause.
const { not, operator, value } = filter;
const columnMetadata = repo.metadata.findColumnWithPropertyName(field);
assert(columnMetadata);
let whereClause = `${tableName}.${columnMetadata.propertyAliasName} `;
if (not) {
if (operator === 'equals') {
whereClause += '!';
} else {
whereClause += 'NOT ';
}
}
whereClause += `${OPERATOR_MAP[operator]} `;
if (['contains', 'starts'].some(el => el === operator)) {
whereClause += '%:';
} else if (operator === 'in') {
whereClause += '(:...';
} else {
whereClause += ':';
}
const variableName = `${field}${index}`;
whereClause += variableName;
if (['contains', 'ends'].some(el => el === operator)) {
whereClause += '%';
} else if (operator === 'in') {
whereClause += ')';
}
selectQueryBuilder = selectQueryBuilder.andWhere(whereClause, { [variableName]: value });
});
});
const { limit = DEFAULT_LIMIT, orderBy, orderDirection, skip = DEFAULT_SKIP } = queryOptions;
selectQueryBuilder = selectQueryBuilder.skip(skip)
.take(limit);
if (orderBy) {
const columnMetadata = repo.metadata.findColumnWithPropertyName(orderBy);
assert(columnMetadata);
selectQueryBuilder = selectQueryBuilder.orderBy(`${tableName}.${columnMetadata.propertyAliasName}`, orderDirection === 'desc' ? 'DESC' : 'ASC');
}
return selectQueryBuilder.getMany();
}
async getPrevEntityVersion<Entity> (queryRunner: QueryRunner, repo: Repository<Entity>, findOptions: { [key: string]: any }): Promise<Entity | undefined> {
// Hierarchical query for getting the entity in the frothy region.
const heirerchicalQuery = `
WITH RECURSIVE cte_query AS
(
SELECT
b.block_hash,
b.block_number,
b.parent_hash,
1 as depth,
e.id
FROM
block_progress b
LEFT JOIN
${repo.metadata.tableName} e ON e.block_hash = b.block_hash
WHERE
b.block_hash = $1
UNION ALL
SELECT
b.block_hash,
b.block_number,
b.parent_hash,
c.depth + 1,
e.id
FROM
block_progress b
LEFT JOIN
${repo.metadata.tableName} e
ON e.block_hash = b.block_hash
AND e.id = $2
INNER JOIN
cte_query c ON c.parent_hash = b.block_hash
WHERE
c.id IS NULL AND c.depth < $3
)
SELECT
block_hash, block_number, id
FROM
cte_query
ORDER BY block_number ASC
LIMIT 1;
`;
// Fetching blockHash for previous entity in frothy region.
const [{ block_hash: blockHash, block_number: blockNumber, id }] = await queryRunner.query(heirerchicalQuery, [findOptions.where.blockHash, findOptions.where.id, MAX_REORG_DEPTH]);
if (id) {
// Entity found in frothy region.
findOptions.where.blockHash = blockHash;
} else {
// If entity not found in frothy region get latest entity in the pruned region.
// Filter out entities from pruned blocks.
const canonicalBlockNumber = blockNumber + 1;
const entityInPrunedRegion:any = await repo.createQueryBuilder('entity')
.innerJoinAndSelect('block_progress', 'block', 'block.block_hash = entity.block_hash')
.where('block.is_pruned = false')
.andWhere('entity.id = :id', { id: findOptions.where.id })
.andWhere('entity.block_number <= :canonicalBlockNumber', { canonicalBlockNumber })
.orderBy('entity.block_number', 'DESC')
.limit(1)
.getOne();
findOptions.where.blockHash = entityInPrunedRegion?.blockHash;
}
return repo.findOne(findOptions);
}
async getFrothyRegion (queryRunner: QueryRunner, blockHash: string): Promise<{ canonicalBlockNumber: number, blockHashes: string[] }> {
const heirerchicalQuery = `
WITH RECURSIVE cte_query AS
(
SELECT
block_hash,
block_number,
parent_hash,
1 as depth
FROM
block_progress
WHERE
block_hash = $1
UNION ALL
SELECT
b.block_hash,
b.block_number,
b.parent_hash,
c.depth + 1
FROM
block_progress b
INNER JOIN
cte_query c ON c.parent_hash = b.block_hash
WHERE
c.depth < $2
)
SELECT
block_hash, block_number
FROM
cte_query;
`;
// Get blocks in the frothy region using heirarchical query.
const blocks = await queryRunner.query(heirerchicalQuery, [blockHash, MAX_REORG_DEPTH]);
const blockHashes = blocks.map(({ block_hash: blockHash }: any) => blockHash);
// Canonical block is the block after the last block in frothy region.
const canonicalBlockNumber = blocks[blocks.length - 1].block_number + 1;
return { canonicalBlockNumber, blockHashes };
}
async saveContract (repo: Repository<ContractInterface>, address: string, startingBlock: number, kind?: string): Promise<void> {
const numRows = await repo
.createQueryBuilder()
.where('address = :address', { address })
.getCount();
if (numRows === 0) {
const entity = repo.create({ address, kind, startingBlock });
await repo.save(entity);
}
}
}

View File

@ -39,6 +39,13 @@ export interface EventInterface {
proof: string;
}
export interface ContractInterface {
id: number;
address: string;
startingBlock: number;
kind?: string;
}
export interface IndexerInterface {
getBlockProgress (blockHash: string): Promise<BlockProgressInterface | undefined>
getEvent (id: string): Promise<EventInterface | undefined>