From 5a7dcbd20fe9aee11243fef24a3382daa68c5a88 Mon Sep 17 00:00:00 2001 From: nikugogoi Date: Tue, 13 Sep 2022 17:24:14 +0530 Subject: [PATCH] Load relations for single entity GQL query with default limit 100 (#182) * Check endBlock greater than chainHeadBlock in fill blocks CLI * Load relations for single entity with limit 100 * Use single tx query runner for entity GQL query * Remove assert for CLI input checks --- packages/graph-node/src/database.ts | 253 ++++++++++++++++++---------- packages/graph-node/src/watcher.ts | 94 +++++++---- packages/util/src/fill.ts | 10 +- 3 files changed, 230 insertions(+), 127 deletions(-) diff --git a/packages/graph-node/src/database.ts b/packages/graph-node/src/database.ts index 46203aff..e90c7e62 100644 --- a/packages/graph-node/src/database.ts +++ b/packages/graph-node/src/database.ts @@ -9,6 +9,7 @@ import { ConnectionOptions, FindOneOptions, LessThanOrEqual, + QueryRunner, Repository } from 'typeorm'; @@ -49,6 +50,10 @@ export class Database { return this._baseDatabase.close(); } + async createTransactionRunner (): Promise { + return this._baseDatabase.createTransactionRunner(); + } + async getEntity (entity: (new () => Entity) | string, id: string, blockHash?: string): Promise { const queryRunner = this._conn.createQueryRunner(); @@ -107,114 +112,183 @@ export class Database { return entities.map((entity: any) => entity.id); } - async getEntityWithRelations (entity: (new () => Entity), id: string, relationsMap: Map, block: BlockHeight = {}): Promise { - const queryRunner = this._conn.createQueryRunner(); + async getEntityWithRelations (queryRunner: QueryRunner, entity: (new () => Entity), id: string, relationsMap: Map, block: BlockHeight = {}, depth = 1): Promise { let { hash: blockHash, number: blockNumber } = block; + const repo = queryRunner.manager.getRepository(entity); + const whereOptions: any = { id }; - try { - const repo = queryRunner.manager.getRepository(entity); + if (blockNumber) { + whereOptions.blockNumber = LessThanOrEqual(blockNumber); + } - const whereOptions: any = { id }; + if (blockHash) { + whereOptions.blockHash = blockHash; + const block = await this._baseDatabase.getBlockProgress(queryRunner.manager.getRepository('block_progress'), blockHash); + blockNumber = block?.blockNumber; + } - if (blockNumber) { - whereOptions.blockNumber = LessThanOrEqual(blockNumber); + const findOptions = { + where: whereOptions, + order: { + blockNumber: 'DESC' } + }; - if (blockHash) { - whereOptions.blockHash = blockHash; - const block = await this._baseDatabase.getBlockProgress(queryRunner.manager.getRepository('block_progress'), blockHash); - blockNumber = block?.blockNumber; - } + let entityData: any = await repo.findOne(findOptions as FindOneOptions); - const findOptions = { - where: whereOptions, - order: { - blockNumber: 'DESC' - } - }; + if (!entityData && findOptions.where.blockHash) { + entityData = await this._baseDatabase.getPrevEntityVersion(queryRunner, repo, findOptions); + } - let entityData: any = await repo.findOne(findOptions as FindOneOptions); + // Get relational fields + if (entityData) { + entityData = await this.loadEntityRelations(queryRunner, block, relationsMap, entity, entityData, depth); + } - if (!entityData && findOptions.where.blockHash) { - entityData = await this._baseDatabase.getPrevEntityVersion(queryRunner, repo, findOptions); - } - - // Get relational fields - if (entityData) { - [entityData] = await this.loadRelations(block, relationsMap, entity, [entityData], 1); - } + return entityData; + } + async loadEntityRelations (queryRunner: QueryRunner, block: BlockHeight, relationsMap: Map, entity: new () => Entity, entityData: any, depth: number): Promise { + // Only support two-level nesting of relations + if (depth > 2) { return entityData; - } finally { - await queryRunner.release(); } + + const relations = relationsMap.get(entity); + if (relations === undefined) { + return entityData; + } + + const relationPromises = Object.entries(relations) + .map(async ([field, data]) => { + const { entity: relationEntity, isArray, isDerived, field: foreignKey } = data; + + if (isDerived) { + const where: Where = { + [foreignKey]: [{ + value: entityData.id, + not: false, + operator: 'equals' + }] + }; + + const relatedEntities = await this.getEntities( + queryRunner, + relationEntity, + relationsMap, + block, + where, + { limit: DEFAULT_LIMIT }, + depth + 1 + ); + + entityData[field] = relatedEntities; + + return; + } + + if (isArray) { + const where: Where = { + id: [{ + value: entityData[field], + not: false, + operator: 'in' + }] + }; + + const relatedEntities = await this.getEntities( + queryRunner, + relationEntity, + relationsMap, + block, + where, + { limit: DEFAULT_LIMIT }, + depth + 1 + ); + + entityData[field] = relatedEntities; + + return; + } + + // field is neither an array nor derivedFrom + const relatedEntity = await this.getEntityWithRelations( + queryRunner, + relationEntity, + entityData[field], + relationsMap, + block, + depth + 1 + ); + + entityData[field] = relatedEntity; + }); + + await Promise.all(relationPromises); + + return entityData; } - async getEntities (entity: new () => Entity, relationsMap: Map, block: BlockHeight, where: Where = {}, queryOptions: QueryOptions = {}, depth = 1): Promise { - const queryRunner = this._conn.createQueryRunner(); - try { - const repo = queryRunner.manager.getRepository(entity); - const { tableName } = repo.metadata; + async getEntities (queryRunner: QueryRunner, entity: new () => Entity, relationsMap: Map, block: BlockHeight, where: Where = {}, queryOptions: QueryOptions = {}, depth = 1): Promise { + const repo = queryRunner.manager.getRepository(entity); + const { tableName } = repo.metadata; - let subQuery = repo.createQueryBuilder('subTable') - .select('subTable.id', 'id') - .addSelect('MAX(subTable.block_number)', 'block_number') - .addFrom('block_progress', 'blockProgress') - .where('subTable.block_hash = blockProgress.block_hash') - .andWhere('blockProgress.is_pruned = :isPruned', { isPruned: false }) - .groupBy('subTable.id'); + let subQuery = repo.createQueryBuilder('subTable') + .select('subTable.id', 'id') + .addSelect('MAX(subTable.block_number)', 'block_number') + .addFrom('block_progress', 'blockProgress') + .where('subTable.block_hash = blockProgress.block_hash') + .andWhere('blockProgress.is_pruned = :isPruned', { isPruned: false }) + .groupBy('subTable.id'); - if (block.hash) { - const { canonicalBlockNumber, blockHashes } = await this._baseDatabase.getFrothyRegion(queryRunner, block.hash); + if (block.hash) { + const { canonicalBlockNumber, blockHashes } = await this._baseDatabase.getFrothyRegion(queryRunner, block.hash); - subQuery = subQuery - .andWhere(new Brackets(qb => { - qb.where('subTable.block_hash IN (:...blockHashes)', { blockHashes }) - .orWhere('subTable.block_number <= :canonicalBlockNumber', { canonicalBlockNumber }); - })); - } - - if (block.number) { - subQuery = subQuery.andWhere('subTable.block_number <= :blockNumber', { blockNumber: block.number }); - } - - let selectQueryBuilder = repo.createQueryBuilder(tableName) - .innerJoin( - `(${subQuery.getQuery()})`, - 'latestEntities', - `${tableName}.id = "latestEntities"."id" AND ${tableName}.block_number = "latestEntities"."block_number"` - ) - .setParameters(subQuery.getParameters()); - - selectQueryBuilder = this._baseDatabase.buildQuery(repo, selectQueryBuilder, where); - - if (queryOptions.orderBy) { - selectQueryBuilder = this._baseDatabase.orderQuery(repo, selectQueryBuilder, queryOptions); - } - - selectQueryBuilder = this._baseDatabase.orderQuery(repo, selectQueryBuilder, { ...queryOptions, orderBy: 'id' }); - - if (queryOptions.skip) { - selectQueryBuilder = selectQueryBuilder.offset(queryOptions.skip); - } - - if (queryOptions.limit) { - selectQueryBuilder = selectQueryBuilder.limit(queryOptions.limit); - } - - const entities = await selectQueryBuilder.getMany(); - - if (!entities.length) { - return []; - } - - return this.loadRelations(block, relationsMap, entity, entities, depth); - } finally { - await queryRunner.release(); + subQuery = subQuery + .andWhere(new Brackets(qb => { + qb.where('subTable.block_hash IN (:...blockHashes)', { blockHashes }) + .orWhere('subTable.block_number <= :canonicalBlockNumber', { canonicalBlockNumber }); + })); } + + if (block.number) { + subQuery = subQuery.andWhere('subTable.block_number <= :blockNumber', { blockNumber: block.number }); + } + + let selectQueryBuilder = repo.createQueryBuilder(tableName) + .innerJoin( + `(${subQuery.getQuery()})`, + 'latestEntities', + `${tableName}.id = "latestEntities"."id" AND ${tableName}.block_number = "latestEntities"."block_number"` + ) + .setParameters(subQuery.getParameters()); + + selectQueryBuilder = this._baseDatabase.buildQuery(repo, selectQueryBuilder, where); + + if (queryOptions.orderBy) { + selectQueryBuilder = this._baseDatabase.orderQuery(repo, selectQueryBuilder, queryOptions); + } + + selectQueryBuilder = this._baseDatabase.orderQuery(repo, selectQueryBuilder, { ...queryOptions, orderBy: 'id' }); + + if (queryOptions.skip) { + selectQueryBuilder = selectQueryBuilder.offset(queryOptions.skip); + } + + if (queryOptions.limit) { + selectQueryBuilder = selectQueryBuilder.limit(queryOptions.limit); + } + + const entities = await selectQueryBuilder.getMany(); + + if (!entities.length) { + return []; + } + + return this.loadEntitiesRelations(queryRunner, block, relationsMap, entity, entities, depth); } - async loadRelations (block: BlockHeight, relationsMap: Map, entity: new () => Entity, entities: Entity[], depth: number): Promise { + async loadEntitiesRelations (queryRunner: QueryRunner, block: BlockHeight, relationsMap: Map, entity: new () => Entity, entities: Entity[], depth: number): Promise { // Only support two-level nesting of relations if (depth > 2) { return entities; @@ -238,6 +312,7 @@ export class Database { }; const relatedEntities = await this.getEntities( + queryRunner, relationEntity, relationsMap, block, @@ -288,6 +363,7 @@ export class Database { }; const relatedEntities = await this.getEntities( + queryRunner, relationEntity, relationsMap, block, @@ -325,6 +401,7 @@ export class Database { }; const relatedEntities = await this.getEntities( + queryRunner, relationEntity, relationsMap, block, diff --git a/packages/graph-node/src/watcher.ts b/packages/graph-node/src/watcher.ts index 1eaefff8..99bb796c 100644 --- a/packages/graph-node/src/watcher.ts +++ b/packages/graph-node/src/watcher.ts @@ -258,52 +258,72 @@ export class GraphWatcher { } async getEntity (entity: new () => Entity, id: string, relationsMap: Map, block?: BlockHeight): Promise { - // Get entity from the database. - const result = await this._database.getEntityWithRelations(entity, id, relationsMap, block); + const dbTx = await this._database.createTransactionRunner(); - // Resolve any field name conflicts in the entity result. - return resolveEntityFieldConflicts(result); + try { + // Get entity from the database. + const result = await this._database.getEntityWithRelations(dbTx, entity, id, relationsMap, block); + await dbTx.commitTransaction(); + + // Resolve any field name conflicts in the entity result. + return resolveEntityFieldConflicts(result); + } catch (error) { + await dbTx.rollbackTransaction(); + throw error; + } finally { + await dbTx.release(); + } } async getEntities (entity: new () => Entity, relationsMap: Map, block: BlockHeight, where: { [key: string]: any } = {}, queryOptions: QueryOptions): Promise { - where = Object.entries(where).reduce((acc: { [key: string]: any }, [fieldWithSuffix, value]) => { - const [field, ...suffix] = fieldWithSuffix.split('_'); + const dbTx = await this._database.createTransactionRunner(); - if (!acc[field]) { - acc[field] = []; + try { + where = Object.entries(where).reduce((acc: { [key: string]: any }, [fieldWithSuffix, value]) => { + const [field, ...suffix] = fieldWithSuffix.split('_'); + + if (!acc[field]) { + acc[field] = []; + } + + const filter = { + value, + not: false, + operator: 'equals' + }; + + let operator = suffix.shift(); + + if (operator === 'not') { + filter.not = true; + operator = suffix.shift(); + } + + if (operator) { + filter.operator = operator; + } + + acc[field].push(filter); + + return acc; + }, {}); + + if (!queryOptions.limit) { + queryOptions.limit = DEFAULT_LIMIT; } - const filter = { - value, - not: false, - operator: 'equals' - }; + // Get entities from the database. + const entities = await this._database.getEntities(dbTx, entity, relationsMap, block, where, queryOptions); + await dbTx.commitTransaction(); - let operator = suffix.shift(); - - if (operator === 'not') { - filter.not = true; - operator = suffix.shift(); - } - - if (operator) { - filter.operator = operator; - } - - acc[field].push(filter); - - return acc; - }, {}); - - if (!queryOptions.limit) { - queryOptions.limit = DEFAULT_LIMIT; + // Resolve any field name conflicts in the entity result. + return entities.map(entity => resolveEntityFieldConflicts(entity)); + } catch (error) { + await dbTx.rollbackTransaction(); + throw error; + } finally { + await dbTx.release(); } - - // Get entities from the database. - const entities = await this._database.getEntities(entity, relationsMap, block, where, queryOptions); - - // Resolve any field name conflicts in the entity result. - return entities.map(entity => resolveEntityFieldConflicts(entity)); } /** diff --git a/packages/util/src/fill.ts b/packages/util/src/fill.ts index 521197da..63b5522a 100644 --- a/packages/util/src/fill.ts +++ b/packages/util/src/fill.ts @@ -2,7 +2,6 @@ // Copyright 2021 Vulcanize, Inc. // -import assert from 'assert'; import debug from 'debug'; import { JobQueue } from './job-queue'; @@ -27,7 +26,10 @@ export const fillBlocks = async ( } ): Promise => { let { startBlock, endBlock, prefetch = false, batchBlocks = DEFAULT_PREFETCH_BATCH_SIZE } = argv; - assert(startBlock <= endBlock, 'endBlock should be greater than or equal to startBlock'); + + if (startBlock > endBlock) { + throw new Error(`endBlock ${endBlock} should be greater than or equal to startBlock ${startBlock}`); + } const syncStatus = await indexer.getSyncStatus(); @@ -45,6 +47,10 @@ export const fillBlocks = async ( throw new Error(`Missing blocks between startBlock ${startBlock} and chainHeadBlockNumber ${syncStatus.chainHeadBlockNumber}`); } + if (endBlock <= syncStatus.chainHeadBlockNumber) { + throw new Error(`endBlock ${endBlock} should be greater than chainHeadBlockNumber ${syncStatus.chainHeadBlockNumber}`); + } + startBlock = syncStatus.chainHeadBlockNumber + 1; }