//
// Copyright 2021 Vulcanize, Inc.
//

import assert from 'assert';
import {
  Brackets,
  Connection,
  ConnectionOptions,
  createConnection,
  DeepPartial,
  FindConditions,
  FindManyOptions,
  In,
  QueryRunner,
  Repository,
  SelectQueryBuilder
} from 'typeorm';
import { SnakeNamingStrategy } from 'typeorm-naming-strategies';
import _ from 'lodash';

import { BlockProgressInterface, ContractInterface, EventInterface, SyncStatusInterface } from './types';
import { MAX_REORG_DEPTH, UNKNOWN_EVENT_NAME } from './constants';

// Pagination defaults applied when the caller's QueryOptions omit limit / skip.
const DEFAULT_LIMIT = 100;
const DEFAULT_SKIP = 0;

// Maps filter operator names (see Where) to the SQL operator emitted by Database._buildQuery.
const OPERATOR_MAP = {
  equals: '=',
  gt: '>',
  lt: '<',
  gte: '>=',
  lte: '<=',
  in: 'IN',
  contains: 'LIKE',
  starts: 'LIKE',
  ends: 'LIKE'
};

// Number of events written per INSERT statement in Database.saveEvents.
const INSERT_EVENTS_BATCH = 100;

/** Identifies the block at which model entities are queried; both fields are optional. */
export interface BlockHeight {
  number?: number;
  hash?: string;
}

export enum OrderDirection {
  asc = 'asc',
  desc = 'desc'
}

/** Pagination and ordering options accepted by the query helpers. */
export interface QueryOptions {
  limit?: number;
  skip?: number;
  orderBy?: string;
  orderDirection?: OrderDirection;
}

/** Filters keyed by entity property name; every filter becomes an AND-ed condition. */
export interface Where {
  [key: string]: [{
    value: any;
    not: boolean;
    operator: keyof typeof OPERATOR_MAP;
  }]
}

/** A relation to join: either a 'parent.property' path or an explicit property / alias pair. */
export type Relation = string | { property: string, alias: string }

/**
 * Helpers over a TypeORM connection for sync status, block progress, events,
 * contracts and block-versioned model entities.
 */
export class Database {
  _config: ConnectionOptions
  _conn!: Connection

  constructor (config: ConnectionOptions) {
    assert(config);
    this._config = config;
  }

  /**
   * Creates the connection using the stored config and a snake_case naming strategy.
   * Asserts that no connection has been created yet.
   */
  async init (): Promise<Connection> {
    assert(!this._conn);

    this._conn = await createConnection({
      ...this._config,
      namingStrategy: new SnakeNamingStrategy()
    });

    return this._conn;
  }

  /** Closes the connection created by init(). */
  async close (): Promise<void> {
    return this._conn.close();
  }

  /**
   * Creates a query runner, connects it and starts a transaction on it.
   * The caller is responsible for committing / rolling back and releasing the runner.
   */
  async createTransactionRunner (): Promise<QueryRunner> {
    const queryRunner = this._conn.createQueryRunner();

    try {
      await queryRunner.connect();
      await queryRunner.startTransaction();
    } catch (error) {
      // Don't leak the runner if the transaction could not be started.
      await queryRunner.release();
      throw error;
    }

    return queryRunner;
  }

  /** Returns the row found by an unfiltered findOne() on the sync status repo, if any. */
  async getSyncStatus (repo: Repository<SyncStatusInterface>): Promise<SyncStatusInterface | undefined> {
    return repo.findOne();
  }

  /**
   * Sets the latest indexed block when `force` is set or `blockNumber` is not
   * lower than the stored one. Asserts that a sync status row exists.
   */
  async updateSyncStatusIndexedBlock (repo: Repository<SyncStatusInterface>, blockHash: string, blockNumber: number, force = false): Promise<SyncStatusInterface> {
    const entity = await repo.findOne();
    assert(entity);

    if (force || blockNumber >= entity.latestIndexedBlockNumber) {
      entity.latestIndexedBlockHash = blockHash;
      entity.latestIndexedBlockNumber = blockNumber;
    }

    return await repo.save(entity);
  }

  /**
   * Sets the latest canonical block when `force` is set or `blockNumber` is not
   * lower than the stored one. Asserts that a sync status row exists.
   */
  async updateSyncStatusCanonicalBlock (repo: Repository<SyncStatusInterface>, blockHash: string, blockNumber: number, force = false): Promise<SyncStatusInterface> {
    const entity = await repo.findOne();
    assert(entity);

    if (force || blockNumber >= entity.latestCanonicalBlockNumber) {
      entity.latestCanonicalBlockHash = blockHash;
      entity.latestCanonicalBlockNumber = blockNumber;
    }

    return await repo.save(entity);
  }

  /**
   * Sets the chain head block when `force` is set or `blockNumber` is not lower
   * than the stored one. Creates the sync status row if none exists yet.
   */
  async updateSyncStatusChainHead (repo: Repository<SyncStatusInterface>, blockHash: string, blockNumber: number, force = false): Promise<SyncStatusInterface> {
    let entity = await repo.findOne();

    if (!entity) {
      // First run: seed chain head and canonical block with the given block; nothing indexed yet.
      entity = repo.create({
        chainHeadBlockHash: blockHash,
        chainHeadBlockNumber: blockNumber,
        latestCanonicalBlockHash: blockHash,
        latestCanonicalBlockNumber: blockNumber,
        latestIndexedBlockHash: '',
        latestIndexedBlockNumber: -1
      });
    }

    if (force || blockNumber >= entity.chainHeadBlockNumber) {
      entity.chainHeadBlockHash = blockHash;
      entity.chainHeadBlockNumber = blockNumber;
    }

    return await repo.save(entity);
  }

  /** Looks up the block progress row with the given block hash. */
  async getBlockProgress (repo: Repository<BlockProgressInterface>, blockHash: string): Promise<BlockProgressInterface | undefined> {
    return repo.findOne({ where: { blockHash } });
  }

  /**
   * Finds block progress rows matching `where`.
   * Note: `options.where` on the passed-in object is overwritten with `where`.
   */
  async getBlockProgressEntities (repo: Repository<BlockProgressInterface>, where: FindConditions<BlockProgressInterface>, options: FindManyOptions<BlockProgressInterface>): Promise<BlockProgressInterface[]> {
    options.where = where;

    return repo.find(options);
  }

  /** Returns the block progress rows at `height` having the given pruned flag. */
  async getBlocksAtHeight (repo: Repository<BlockProgressInterface>, height: number, isPruned: boolean): Promise<BlockProgressInterface[]> {
    return repo.createQueryBuilder('block_progress')
      .where('block_number = :height AND is_pruned = :isPruned', { height, isPruned })
      .getMany();
  }

  /**
   * Records one more processed event for an incomplete block and persists it;
   * a block that is already complete is returned untouched.
   *
   * @throws Error if `lastProcessedEventIndex` is not greater than the stored index.
   */
  async updateBlockProgress (repo: Repository<BlockProgressInterface>, block: BlockProgressInterface, lastProcessedEventIndex: number): Promise<BlockProgressInterface> {
    if (!block.isComplete) {
      if (lastProcessedEventIndex <= block.lastProcessedEventIndex) {
        throw new Error(`Events processed out of order ${block.blockHash}, was ${block.lastProcessedEventIndex}, got ${lastProcessedEventIndex}`);
      }

      block.lastProcessedEventIndex = lastProcessedEventIndex;
      block.numProcessedEvents++;
      if (block.numProcessedEvents >= block.numEvents) {
        block.isComplete = true;
      }

      const { generatedMaps } = await repo.createQueryBuilder()
        .update()
        .set(block)
        .where('id = :id', { id: block.id })
        .whereEntity(block)
        .returning('*')
        .execute();

      // Continue with the row as returned by the UPDATE ... RETURNING statement.
      block = generatedMaps[0] as BlockProgressInterface;
    }

    return block;
  }

  /** Sets is_pruned on the given blocks; an empty list is a no-op. */
  async markBlocksAsPruned (repo: Repository<BlockProgressInterface>, blocks: BlockProgressInterface[]): Promise<void> {
    if (!blocks.length) {
      // Nothing to update; also avoids building an empty IN () condition.
      return;
    }

    const ids = blocks.map(({ id }) => id);

    await repo.update({ id: In(ids) }, { isPruned: true });
  }

  /** Loads an event by id together with its `block` relation. */
  async getEvent (repo: Repository<EventInterface>, id: string): Promise<EventInterface | undefined> {
    return repo.findOne(id, { relations: ['block'] });
  }

  /**
   * Returns events of the non-pruned block with the given hash, filtered by
   * `where`, ordered per `queryOptions` (then by event id) and paginated.
   */
  async getBlockEvents (repo: Repository<EventInterface>, blockHash: string, where: Where = {}, queryOptions: QueryOptions = {}): Promise<EventInterface[]> {
    let queryBuilder = repo.createQueryBuilder('event')
      .innerJoinAndSelect('event.block', 'block')
      .where('block.block_hash = :blockHash AND block.is_pruned = false', { blockHash });

    queryBuilder = this._buildQuery(repo, queryBuilder, where, queryOptions);
    queryBuilder.addOrderBy('event.id', 'ASC');

    const { limit = DEFAULT_LIMIT, skip = DEFAULT_SKIP } = queryOptions;

    queryBuilder = queryBuilder.offset(skip)
      .limit(limit);

    return queryBuilder.getMany();
  }

  /**
   * Saves a block progress row for `block`, then bulk inserts `events` linked to it.
   * Asserts that the block has a hash and a non-negative number and timestamp.
   *
   * NOTE(review): no transaction is started here; presumably the repos are bound
   * to a caller-managed transaction — confirm against callers.
   */
  async saveEvents (blockRepo: Repository<BlockProgressInterface>, eventRepo: Repository<EventInterface>, block: DeepPartial<BlockProgressInterface>, events: DeepPartial<EventInterface>[]): Promise<BlockProgressInterface> {
    const { cid, blockHash, blockNumber, blockTimestamp, parentHash } = block;

    assert(blockHash);
    assert(blockNumber !== undefined);
    assert(blockNumber > -1);
    assert(blockTimestamp !== undefined);
    assert(blockTimestamp > -1);

    // (1) Add an entry to the block progress table.
    const numEvents = events.length;

    const entity = blockRepo.create({
      cid,
      blockHash,
      parentHash,
      blockNumber,
      blockTimestamp,
      numEvents,
      numProcessedEvents: 0,
      lastProcessedEventIndex: -1,
      // A block without events has nothing left to process.
      isComplete: (numEvents === 0)
    });

    const blockProgress = await blockRepo.save(entity);

    // (2) Bulk insert the events, in batches of INSERT_EVENTS_BATCH, linked to the saved block.
    events.forEach(event => {
      event.block = blockProgress;
    });

    const eventBatches = _.chunk(events, INSERT_EVENTS_BATCH);

    const insertPromises = eventBatches.map(async eventBatch => {
      await eventRepo.createQueryBuilder()
        .insert()
        .values(eventBatch)
        .updateEntity(false)
        .execute();
    });

    await Promise.all(insertPromises);

    return blockProgress;
  }

  /** Finds entities of the given class through the query runner's manager. */
  async getEntities<Entity> (queryRunner: QueryRunner, entity: new () => Entity, findConditions?: FindConditions<Entity>): Promise<Entity[]> {
    const repo = queryRunner.manager.getRepository(entity);

    const entities = await repo.find(findConditions);
    return entities;
  }

  /** Returns true when the table of the given entity class has no rows. */
  async isEntityEmpty<Entity> (entity: new () => Entity): Promise<boolean> {
    const queryRunner = this._conn.createQueryRunner();

    try {
      await queryRunner.connect();

      // A single row is enough to decide; avoid loading the whole table.
      const rows = await queryRunner.manager.getRepository(entity).find({ take: 1 });

      return rows.length === 0;
    } finally {
      await queryRunner.release();
    }
  }

  /** Loads the entities matching `findConditions` and removes them. */
  async removeEntities<Entity> (queryRunner: QueryRunner, entity: new () => Entity, findConditions?: FindManyOptions<Entity> | FindConditions<Entity>): Promise<void> {
    const repo = queryRunner.manager.getRepository(entity);

    const entities = await repo.find(findConditions);

    await repo.remove(entities);
  }

  /**
   * Follows parent_hash links in block_progress starting at `blockHash`, for at
   * most `depth` steps, and returns the hash of the lowest-numbered block reached.
   * Asserts that the starting block exists in block_progress.
   */
  async getAncestorAtDepth (blockHash: string, depth: number): Promise<string> {
    const hierarchicalQuery = `
      WITH RECURSIVE cte_query AS
      (
        SELECT
          block_hash,
          block_number,
          parent_hash,
          0 as depth
        FROM
          block_progress
        WHERE
          block_hash = $1
        UNION ALL
          SELECT
            b.block_hash,
            b.block_number,
            b.parent_hash,
            c.depth + 1
          FROM
            block_progress b
          INNER JOIN
            cte_query c ON c.parent_hash = b.block_hash
          WHERE
            c.depth < $2
      )
      SELECT
        block_hash, block_number
      FROM
        cte_query
      ORDER BY block_number ASC
      LIMIT 1;
    `;

    // Get ancestor block hash using hierarchical query.
    const rows = await this._conn.query(hierarchicalQuery, [blockHash, depth]);
    assert(rows.length, `Block ${blockHash} not found in block_progress`);

    const [{ block_hash: ancestorBlockHash }] = rows;

    return ancestorBlockHash;
  }

  /**
   * Compares the number of block heights in [fromBlockNumber, toBlockNumber]
   * (`expected`) with the number of distinct heights in that range having a
   * complete block (`actual`).
   */
  async getProcessedBlockCountForRange (repo: Repository<BlockProgressInterface>, fromBlockNumber: number, toBlockNumber: number): Promise<{ expected: number, actual: number }> {
    const blockNumbers = _.range(fromBlockNumber, toBlockNumber + 1);
    const expected = blockNumbers.length;

    const { count: actual } = await repo
      .createQueryBuilder('block_progress')
      .select('COUNT(DISTINCT(block_number))', 'count')
      .where('block_number IN (:...blockNumbers) AND is_complete = :isComplete', { blockNumbers, isComplete: true })
      .getRawOne();

    // The raw count is parsed because it is not guaranteed to arrive as a number.
    return { expected, actual: parseInt(actual) };
  }

  /**
   * Returns events (with their block) from non-pruned blocks in the given block
   * range, excluding events named UNKNOWN_EVENT_NAME, ordered by event id.
   */
  async getEventsInRange (repo: Repository<EventInterface>, fromBlockNumber: number, toBlockNumber: number): Promise<Array<EventInterface>> {
    const events = repo.createQueryBuilder('event')
      .innerJoinAndSelect('event.block', 'block')
      .where('block_number >= :fromBlockNumber AND block_number <= :toBlockNumber AND event_name <> :eventName AND is_pruned = false', {
        fromBlockNumber,
        toBlockNumber,
        eventName: UNKNOWN_EVENT_NAME
      })
      .addOrderBy('event.id', 'ASC')
      .getMany();

    return events;
  }

  /** Persists the given event entity. */
  async saveEventEntity (repo: Repository<EventInterface>, entity: EventInterface): Promise<EventInterface> {
    return await repo.save(entity);
  }

  /**
   * Returns, for each entity id, the version with the highest block number that
   * satisfies the `block` constraints, with `where` filters, ordering, pagination
   * and the requested relation joins applied.
   *
   * When `block.hash` is given, versions are limited to blocks returned by
   * getFrothyRegion() for that hash or at / below its canonical block number.
   * When `block.number` is given, versions are limited to that height or lower.
   */
  async getModelEntities<Entity> (queryRunner: QueryRunner, entity: new () => Entity, block: BlockHeight, where: Where = {}, queryOptions: QueryOptions = {}, relations: Relation[] = []): Promise<Entity[]> {
    const repo = queryRunner.manager.getRepository(entity);
    const { tableName } = repo.metadata;

    // Correlated sub-query selecting the latest allowed block number per entity id.
    let subQuery = repo.createQueryBuilder('subTable')
      .select('MAX(subTable.block_number)')
      .where(`subTable.id = ${tableName}.id`);

    if (block.hash) {
      const { canonicalBlockNumber, blockHashes } = await this.getFrothyRegion(queryRunner, block.hash);

      subQuery = subQuery
        .andWhere(new Brackets(qb => {
          qb.where('subTable.block_hash IN (:...blockHashes)', { blockHashes })
            .orWhere('subTable.block_number <= :canonicalBlockNumber', { canonicalBlockNumber });
        }));
    }

    // Compare against undefined so that a height of 0 is honoured as a filter.
    if (block.number !== undefined) {
      subQuery = subQuery.andWhere('subTable.block_number <= :blockNumber', { blockNumber: block.number });
    }

    let selectQueryBuilder = repo.createQueryBuilder(tableName)
      .where(`${tableName}.block_number IN (${subQuery.getQuery()})`)
      .setParameters(subQuery.getParameters());

    relations.forEach(relation => {
      let alias, property;

      if (typeof relation === 'string') {
        // For a 'parent.property' path the join alias is the part after the dot.
        [, alias] = relation.split('.');
        property = relation;
      } else {
        alias = relation.alias;
        property = relation.property;
      }

      selectQueryBuilder = selectQueryBuilder.leftJoinAndSelect(property, alias);
    });

    selectQueryBuilder = this._buildQuery(repo, selectQueryBuilder, where, queryOptions);

    const { limit = DEFAULT_LIMIT, skip = DEFAULT_SKIP } = queryOptions;

    selectQueryBuilder = selectQueryBuilder.skip(skip)
      .take(limit);

    return selectQueryBuilder.getMany();
  }

  /**
   * Finds the version of an entity visible from `findOptions.where.blockHash`.
   *
   * Walks up to MAX_REORG_DEPTH blocks along parent_hash links looking for a
   * version of `findOptions.where.id`; if none is found there, falls back to the
   * highest-numbered version in a non-pruned block at or below the computed bound.
   *
   * Note: `findOptions.where.blockHash` is overwritten with the resolved block hash.
   * Asserts that the starting block exists in block_progress.
   */
  async getPrevEntityVersion<Entity> (queryRunner: QueryRunner, repo: Repository<Entity>, findOptions: { [key: string]: any }): Promise<Entity | undefined> {
    // Hierarchical query for getting the entity in the frothy region.
    // The recursion stops descending from a block as soon as the entity is found in it.
    const hierarchicalQuery = `
      WITH RECURSIVE cte_query AS
      (
        SELECT
          b.block_hash,
          b.block_number,
          b.parent_hash,
          1 as depth,
          e.id
        FROM
          block_progress b
          LEFT JOIN
            ${repo.metadata.tableName} e ON e.block_hash = b.block_hash
            AND e.id = $2
        WHERE
          b.block_hash = $1
        UNION ALL
          SELECT
            b.block_hash,
            b.block_number,
            b.parent_hash,
            c.depth + 1,
            e.id
          FROM
            block_progress b
            LEFT JOIN
              ${repo.metadata.tableName} e
              ON e.block_hash = b.block_hash
              AND e.id = $2
            INNER JOIN
              cte_query c ON c.parent_hash = b.block_hash
              WHERE
                c.id IS NULL AND c.depth < $3
      )
      SELECT
        block_hash, block_number, id
      FROM
        cte_query
      ORDER BY block_number ASC
      LIMIT 1;
    `;

    // Fetching blockHash for previous entity in frothy region.
    const rows = await queryRunner.query(hierarchicalQuery, [findOptions.where.blockHash, findOptions.where.id, MAX_REORG_DEPTH]);
    assert(rows.length, `Block ${findOptions.where.blockHash} not found in block_progress`);

    const [{ block_hash: blockHash, block_number: blockNumber, id }] = rows;

    if (id) {
      // Entity found in frothy region.
      findOptions.where.blockHash = blockHash;
    } else {
      // If entity not found in frothy region get latest entity in the pruned region.
      // Filter out entities from pruned blocks.
      // NOTE(review): blockNumber is the lowest block reached above, so `+ 1` lies
      // inside the walked range rather than below it — confirm the intended bound.
      const canonicalBlockNumber = blockNumber + 1;

      const entityInPrunedRegion:any = await repo.createQueryBuilder('entity')
        .innerJoinAndSelect('block_progress', 'block', 'block.block_hash = entity.block_hash')
        .where('block.is_pruned = false')
        .andWhere('entity.id = :id', { id: findOptions.where.id })
        .andWhere('entity.block_number <= :canonicalBlockNumber', { canonicalBlockNumber })
        .orderBy('entity.block_number', 'DESC')
        .limit(1)
        .getOne();

      findOptions.where.blockHash = entityInPrunedRegion?.blockHash;
    }

    return repo.findOne(findOptions);
  }

  /**
   * Collects the hashes of up to MAX_REORG_DEPTH blocks reached from `blockHash`
   * by following parent_hash links, plus the block number bound derived from the
   * last row returned.
   * Asserts that the starting block exists in block_progress.
   */
  async getFrothyRegion (queryRunner: QueryRunner, blockHash: string): Promise<{ canonicalBlockNumber: number, blockHashes: string[] }> {
    const hierarchicalQuery = `
      WITH RECURSIVE cte_query AS
      (
        SELECT
          block_hash,
          block_number,
          parent_hash,
          1 as depth
        FROM
          block_progress
        WHERE
          block_hash = $1
        UNION ALL
          SELECT
            b.block_hash,
            b.block_number,
            b.parent_hash,
            c.depth + 1
          FROM
            block_progress b
          INNER JOIN
            cte_query c ON c.parent_hash = b.block_hash
          WHERE
            c.depth < $2
      )
      SELECT
        block_hash, block_number
      FROM
        cte_query;
    `;

    // Get blocks in the frothy region using hierarchical query.
    const blocks = await queryRunner.query(hierarchicalQuery, [blockHash, MAX_REORG_DEPTH]);
    assert(blocks.length, `Block ${blockHash} not found in block_progress`);

    const blockHashes = blocks.map(({ block_hash: hash }: any) => hash);

    // Canonical block is the block after the last block in frothy region.
    // NOTE(review): the query has no ORDER BY, so "last" relies on the row order
    // produced by the recursive CTE; the `+ 1` offset also looks suspicious given
    // that ancestors have lower block numbers — confirm both.
    const canonicalBlockNumber = blocks[blocks.length - 1].block_number + 1;

    return { canonicalBlockNumber, blockHashes };
  }

  /** Returns all rows of the contract repo. */
  async getContracts (repo: Repository<ContractInterface>): Promise<ContractInterface[]> {
    return repo.createQueryBuilder('contract')
      .getMany();
  }

  /** Inserts a contract, or overwrites the fields of the existing row with the same address. */
  async saveContract (repo: Repository<ContractInterface>, address: string, kind: string, checkpoint: boolean, startingBlock: number): Promise<ContractInterface> {
    const contract = await repo
      .createQueryBuilder()
      .where('address = :address', { address })
      .getOne();

    const entity = repo.create({
      address,
      kind,
      checkpoint,
      startingBlock
    });

    // If contract already present, overwrite fields.
    if (contract) {
      entity.id = contract.id;
    }

    return repo.save(entity);
  }

  /**
   * Applies the `where` filters (AND-ed together) and the `orderBy` option to the
   * given select query builder and returns it.
   *
   * Each filter is bound through a uniquely named parameter (`<field><index>`).
   * For the LIKE based operators the wildcards are part of the bound value:
   * `contains` matches `%value%`, `starts` matches `value%` and `ends` matches
   * `%value`. An empty `in` list matches nothing (everything when negated).
   *
   * Asserts that every filtered / ordered field is a column of the entity.
   */
  _buildQuery<Entity> (repo: Repository<Entity>, selectQueryBuilder: SelectQueryBuilder<Entity>, where: Where = {}, queryOptions: QueryOptions = {}): SelectQueryBuilder<Entity> {
    const { tableName } = repo.metadata;

    Object.entries(where).forEach(([field, filters]) => {
      filters.forEach((filter, index) => {
        // Form the where clause.
        const { not, operator } = filter;
        let { value } = filter;

        const columnMetadata = repo.metadata.findColumnWithPropertyName(field);
        assert(columnMetadata);

        const variableName = `${field}${index}`;
        let whereClause = `${tableName}.${columnMetadata.propertyAliasName} `;

        if (not) {
          // 'equals' negates to '!=', every other operator is prefixed with NOT.
          whereClause += operator === 'equals' ? '!' : 'NOT ';
        }

        whereClause += `${OPERATOR_MAP[operator]} `;

        if (operator === 'in') {
          whereClause += `(:...${variableName})`;

          if (!value.length) {
            // An empty list cannot be expanded into IN (); use the equivalent constant.
            whereClause = not ? 'TRUE' : 'FALSE';
          }
        } else {
          // Convert to string type value as bigint type throws error in query.
          value = value.toString();
          whereClause += `:${variableName}`;

          // Wildcards must be part of the bound value; placing them around the
          // placeholder produces invalid SQL.
          if (operator === 'contains') {
            value = `%${value}%`;
          } else if (operator === 'starts') {
            value = `${value}%`;
          } else if (operator === 'ends') {
            value = `%${value}`;
          }
        }

        selectQueryBuilder = selectQueryBuilder.andWhere(whereClause, { [variableName]: value });
      });
    });

    const { orderBy, orderDirection } = queryOptions;

    if (orderBy) {
      const columnMetadata = repo.metadata.findColumnWithPropertyName(orderBy);
      assert(columnMetadata);

      // Ascending unless 'desc' is requested explicitly.
      selectQueryBuilder = selectQueryBuilder.orderBy(
        `${tableName}.${columnMetadata.propertyAliasName}`,
        orderDirection === 'desc' ? 'DESC' : 'ASC'
      );
    }

    return selectQueryBuilder;
  }
}