mirror of
https://github.com/cerc-io/watcher-ts
synced 2025-01-07 20:08:06 +00:00
Load relations for single entity GQL query with default limit 100 (#182)
* Check endBlock greater than chainHeadBlock in fill blocks CLI * Load relations for single entity with limit 100 * Use single tx query runner for entity GQL query * Remove assert for CLI input checks
This commit is contained in:
parent
8960f67f1b
commit
5a7dcbd20f
@ -9,6 +9,7 @@ import {
|
||||
ConnectionOptions,
|
||||
FindOneOptions,
|
||||
LessThanOrEqual,
|
||||
QueryRunner,
|
||||
Repository
|
||||
} from 'typeorm';
|
||||
|
||||
@ -49,6 +50,10 @@ export class Database {
|
||||
return this._baseDatabase.close();
|
||||
}
|
||||
|
||||
async createTransactionRunner (): Promise<QueryRunner> {
|
||||
return this._baseDatabase.createTransactionRunner();
|
||||
}
|
||||
|
||||
async getEntity<Entity> (entity: (new () => Entity) | string, id: string, blockHash?: string): Promise<Entity | undefined> {
|
||||
const queryRunner = this._conn.createQueryRunner();
|
||||
|
||||
@ -107,114 +112,183 @@ export class Database {
|
||||
return entities.map((entity: any) => entity.id);
|
||||
}
|
||||
|
||||
async getEntityWithRelations<Entity> (entity: (new () => Entity), id: string, relationsMap: Map<any, { [key: string]: any }>, block: BlockHeight = {}): Promise<Entity | undefined> {
|
||||
const queryRunner = this._conn.createQueryRunner();
|
||||
async getEntityWithRelations<Entity> (queryRunner: QueryRunner, entity: (new () => Entity), id: string, relationsMap: Map<any, { [key: string]: any }>, block: BlockHeight = {}, depth = 1): Promise<Entity | undefined> {
|
||||
let { hash: blockHash, number: blockNumber } = block;
|
||||
const repo = queryRunner.manager.getRepository(entity);
|
||||
const whereOptions: any = { id };
|
||||
|
||||
try {
|
||||
const repo = queryRunner.manager.getRepository(entity);
|
||||
if (blockNumber) {
|
||||
whereOptions.blockNumber = LessThanOrEqual(blockNumber);
|
||||
}
|
||||
|
||||
const whereOptions: any = { id };
|
||||
if (blockHash) {
|
||||
whereOptions.blockHash = blockHash;
|
||||
const block = await this._baseDatabase.getBlockProgress(queryRunner.manager.getRepository('block_progress'), blockHash);
|
||||
blockNumber = block?.blockNumber;
|
||||
}
|
||||
|
||||
if (blockNumber) {
|
||||
whereOptions.blockNumber = LessThanOrEqual(blockNumber);
|
||||
const findOptions = {
|
||||
where: whereOptions,
|
||||
order: {
|
||||
blockNumber: 'DESC'
|
||||
}
|
||||
};
|
||||
|
||||
if (blockHash) {
|
||||
whereOptions.blockHash = blockHash;
|
||||
const block = await this._baseDatabase.getBlockProgress(queryRunner.manager.getRepository('block_progress'), blockHash);
|
||||
blockNumber = block?.blockNumber;
|
||||
}
|
||||
let entityData: any = await repo.findOne(findOptions as FindOneOptions<Entity>);
|
||||
|
||||
const findOptions = {
|
||||
where: whereOptions,
|
||||
order: {
|
||||
blockNumber: 'DESC'
|
||||
}
|
||||
};
|
||||
if (!entityData && findOptions.where.blockHash) {
|
||||
entityData = await this._baseDatabase.getPrevEntityVersion(queryRunner, repo, findOptions);
|
||||
}
|
||||
|
||||
let entityData: any = await repo.findOne(findOptions as FindOneOptions<Entity>);
|
||||
// Get relational fields
|
||||
if (entityData) {
|
||||
entityData = await this.loadEntityRelations(queryRunner, block, relationsMap, entity, entityData, depth);
|
||||
}
|
||||
|
||||
if (!entityData && findOptions.where.blockHash) {
|
||||
entityData = await this._baseDatabase.getPrevEntityVersion(queryRunner, repo, findOptions);
|
||||
}
|
||||
|
||||
// Get relational fields
|
||||
if (entityData) {
|
||||
[entityData] = await this.loadRelations(block, relationsMap, entity, [entityData], 1);
|
||||
}
|
||||
return entityData;
|
||||
}
|
||||
|
||||
async loadEntityRelations<Entity> (queryRunner: QueryRunner, block: BlockHeight, relationsMap: Map<any, { [key: string]: any }>, entity: new () => Entity, entityData: any, depth: number): Promise<Entity> {
|
||||
// Only support two-level nesting of relations
|
||||
if (depth > 2) {
|
||||
return entityData;
|
||||
} finally {
|
||||
await queryRunner.release();
|
||||
}
|
||||
|
||||
const relations = relationsMap.get(entity);
|
||||
if (relations === undefined) {
|
||||
return entityData;
|
||||
}
|
||||
|
||||
const relationPromises = Object.entries(relations)
|
||||
.map(async ([field, data]) => {
|
||||
const { entity: relationEntity, isArray, isDerived, field: foreignKey } = data;
|
||||
|
||||
if (isDerived) {
|
||||
const where: Where = {
|
||||
[foreignKey]: [{
|
||||
value: entityData.id,
|
||||
not: false,
|
||||
operator: 'equals'
|
||||
}]
|
||||
};
|
||||
|
||||
const relatedEntities = await this.getEntities(
|
||||
queryRunner,
|
||||
relationEntity,
|
||||
relationsMap,
|
||||
block,
|
||||
where,
|
||||
{ limit: DEFAULT_LIMIT },
|
||||
depth + 1
|
||||
);
|
||||
|
||||
entityData[field] = relatedEntities;
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
if (isArray) {
|
||||
const where: Where = {
|
||||
id: [{
|
||||
value: entityData[field],
|
||||
not: false,
|
||||
operator: 'in'
|
||||
}]
|
||||
};
|
||||
|
||||
const relatedEntities = await this.getEntities(
|
||||
queryRunner,
|
||||
relationEntity,
|
||||
relationsMap,
|
||||
block,
|
||||
where,
|
||||
{ limit: DEFAULT_LIMIT },
|
||||
depth + 1
|
||||
);
|
||||
|
||||
entityData[field] = relatedEntities;
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
// field is neither an array nor derivedFrom
|
||||
const relatedEntity = await this.getEntityWithRelations(
|
||||
queryRunner,
|
||||
relationEntity,
|
||||
entityData[field],
|
||||
relationsMap,
|
||||
block,
|
||||
depth + 1
|
||||
);
|
||||
|
||||
entityData[field] = relatedEntity;
|
||||
});
|
||||
|
||||
await Promise.all(relationPromises);
|
||||
|
||||
return entityData;
|
||||
}
|
||||
|
||||
async getEntities<Entity> (entity: new () => Entity, relationsMap: Map<any, { [key: string]: any }>, block: BlockHeight, where: Where = {}, queryOptions: QueryOptions = {}, depth = 1): Promise<Entity[]> {
|
||||
const queryRunner = this._conn.createQueryRunner();
|
||||
try {
|
||||
const repo = queryRunner.manager.getRepository(entity);
|
||||
const { tableName } = repo.metadata;
|
||||
async getEntities<Entity> (queryRunner: QueryRunner, entity: new () => Entity, relationsMap: Map<any, { [key: string]: any }>, block: BlockHeight, where: Where = {}, queryOptions: QueryOptions = {}, depth = 1): Promise<Entity[]> {
|
||||
const repo = queryRunner.manager.getRepository(entity);
|
||||
const { tableName } = repo.metadata;
|
||||
|
||||
let subQuery = repo.createQueryBuilder('subTable')
|
||||
.select('subTable.id', 'id')
|
||||
.addSelect('MAX(subTable.block_number)', 'block_number')
|
||||
.addFrom('block_progress', 'blockProgress')
|
||||
.where('subTable.block_hash = blockProgress.block_hash')
|
||||
.andWhere('blockProgress.is_pruned = :isPruned', { isPruned: false })
|
||||
.groupBy('subTable.id');
|
||||
let subQuery = repo.createQueryBuilder('subTable')
|
||||
.select('subTable.id', 'id')
|
||||
.addSelect('MAX(subTable.block_number)', 'block_number')
|
||||
.addFrom('block_progress', 'blockProgress')
|
||||
.where('subTable.block_hash = blockProgress.block_hash')
|
||||
.andWhere('blockProgress.is_pruned = :isPruned', { isPruned: false })
|
||||
.groupBy('subTable.id');
|
||||
|
||||
if (block.hash) {
|
||||
const { canonicalBlockNumber, blockHashes } = await this._baseDatabase.getFrothyRegion(queryRunner, block.hash);
|
||||
if (block.hash) {
|
||||
const { canonicalBlockNumber, blockHashes } = await this._baseDatabase.getFrothyRegion(queryRunner, block.hash);
|
||||
|
||||
subQuery = subQuery
|
||||
.andWhere(new Brackets(qb => {
|
||||
qb.where('subTable.block_hash IN (:...blockHashes)', { blockHashes })
|
||||
.orWhere('subTable.block_number <= :canonicalBlockNumber', { canonicalBlockNumber });
|
||||
}));
|
||||
}
|
||||
|
||||
if (block.number) {
|
||||
subQuery = subQuery.andWhere('subTable.block_number <= :blockNumber', { blockNumber: block.number });
|
||||
}
|
||||
|
||||
let selectQueryBuilder = repo.createQueryBuilder(tableName)
|
||||
.innerJoin(
|
||||
`(${subQuery.getQuery()})`,
|
||||
'latestEntities',
|
||||
`${tableName}.id = "latestEntities"."id" AND ${tableName}.block_number = "latestEntities"."block_number"`
|
||||
)
|
||||
.setParameters(subQuery.getParameters());
|
||||
|
||||
selectQueryBuilder = this._baseDatabase.buildQuery(repo, selectQueryBuilder, where);
|
||||
|
||||
if (queryOptions.orderBy) {
|
||||
selectQueryBuilder = this._baseDatabase.orderQuery(repo, selectQueryBuilder, queryOptions);
|
||||
}
|
||||
|
||||
selectQueryBuilder = this._baseDatabase.orderQuery(repo, selectQueryBuilder, { ...queryOptions, orderBy: 'id' });
|
||||
|
||||
if (queryOptions.skip) {
|
||||
selectQueryBuilder = selectQueryBuilder.offset(queryOptions.skip);
|
||||
}
|
||||
|
||||
if (queryOptions.limit) {
|
||||
selectQueryBuilder = selectQueryBuilder.limit(queryOptions.limit);
|
||||
}
|
||||
|
||||
const entities = await selectQueryBuilder.getMany();
|
||||
|
||||
if (!entities.length) {
|
||||
return [];
|
||||
}
|
||||
|
||||
return this.loadRelations(block, relationsMap, entity, entities, depth);
|
||||
} finally {
|
||||
await queryRunner.release();
|
||||
subQuery = subQuery
|
||||
.andWhere(new Brackets(qb => {
|
||||
qb.where('subTable.block_hash IN (:...blockHashes)', { blockHashes })
|
||||
.orWhere('subTable.block_number <= :canonicalBlockNumber', { canonicalBlockNumber });
|
||||
}));
|
||||
}
|
||||
|
||||
if (block.number) {
|
||||
subQuery = subQuery.andWhere('subTable.block_number <= :blockNumber', { blockNumber: block.number });
|
||||
}
|
||||
|
||||
let selectQueryBuilder = repo.createQueryBuilder(tableName)
|
||||
.innerJoin(
|
||||
`(${subQuery.getQuery()})`,
|
||||
'latestEntities',
|
||||
`${tableName}.id = "latestEntities"."id" AND ${tableName}.block_number = "latestEntities"."block_number"`
|
||||
)
|
||||
.setParameters(subQuery.getParameters());
|
||||
|
||||
selectQueryBuilder = this._baseDatabase.buildQuery(repo, selectQueryBuilder, where);
|
||||
|
||||
if (queryOptions.orderBy) {
|
||||
selectQueryBuilder = this._baseDatabase.orderQuery(repo, selectQueryBuilder, queryOptions);
|
||||
}
|
||||
|
||||
selectQueryBuilder = this._baseDatabase.orderQuery(repo, selectQueryBuilder, { ...queryOptions, orderBy: 'id' });
|
||||
|
||||
if (queryOptions.skip) {
|
||||
selectQueryBuilder = selectQueryBuilder.offset(queryOptions.skip);
|
||||
}
|
||||
|
||||
if (queryOptions.limit) {
|
||||
selectQueryBuilder = selectQueryBuilder.limit(queryOptions.limit);
|
||||
}
|
||||
|
||||
const entities = await selectQueryBuilder.getMany();
|
||||
|
||||
if (!entities.length) {
|
||||
return [];
|
||||
}
|
||||
|
||||
return this.loadEntitiesRelations(queryRunner, block, relationsMap, entity, entities, depth);
|
||||
}
|
||||
|
||||
async loadRelations<Entity> (block: BlockHeight, relationsMap: Map<any, { [key: string]: any }>, entity: new () => Entity, entities: Entity[], depth: number): Promise<Entity[]> {
|
||||
async loadEntitiesRelations<Entity> (queryRunner: QueryRunner, block: BlockHeight, relationsMap: Map<any, { [key: string]: any }>, entity: new () => Entity, entities: Entity[], depth: number): Promise<Entity[]> {
|
||||
// Only support two-level nesting of relations
|
||||
if (depth > 2) {
|
||||
return entities;
|
||||
@ -238,6 +312,7 @@ export class Database {
|
||||
};
|
||||
|
||||
const relatedEntities = await this.getEntities(
|
||||
queryRunner,
|
||||
relationEntity,
|
||||
relationsMap,
|
||||
block,
|
||||
@ -288,6 +363,7 @@ export class Database {
|
||||
};
|
||||
|
||||
const relatedEntities = await this.getEntities(
|
||||
queryRunner,
|
||||
relationEntity,
|
||||
relationsMap,
|
||||
block,
|
||||
@ -325,6 +401,7 @@ export class Database {
|
||||
};
|
||||
|
||||
const relatedEntities = await this.getEntities(
|
||||
queryRunner,
|
||||
relationEntity,
|
||||
relationsMap,
|
||||
block,
|
||||
|
@ -258,52 +258,72 @@ export class GraphWatcher {
|
||||
}
|
||||
|
||||
async getEntity<Entity> (entity: new () => Entity, id: string, relationsMap: Map<any, { [key: string]: any }>, block?: BlockHeight): Promise<any> {
|
||||
// Get entity from the database.
|
||||
const result = await this._database.getEntityWithRelations(entity, id, relationsMap, block);
|
||||
const dbTx = await this._database.createTransactionRunner();
|
||||
|
||||
// Resolve any field name conflicts in the entity result.
|
||||
return resolveEntityFieldConflicts(result);
|
||||
try {
|
||||
// Get entity from the database.
|
||||
const result = await this._database.getEntityWithRelations(dbTx, entity, id, relationsMap, block);
|
||||
await dbTx.commitTransaction();
|
||||
|
||||
// Resolve any field name conflicts in the entity result.
|
||||
return resolveEntityFieldConflicts(result);
|
||||
} catch (error) {
|
||||
await dbTx.rollbackTransaction();
|
||||
throw error;
|
||||
} finally {
|
||||
await dbTx.release();
|
||||
}
|
||||
}
|
||||
|
||||
async getEntities<Entity> (entity: new () => Entity, relationsMap: Map<any, { [key: string]: any }>, block: BlockHeight, where: { [key: string]: any } = {}, queryOptions: QueryOptions): Promise<any> {
|
||||
where = Object.entries(where).reduce((acc: { [key: string]: any }, [fieldWithSuffix, value]) => {
|
||||
const [field, ...suffix] = fieldWithSuffix.split('_');
|
||||
const dbTx = await this._database.createTransactionRunner();
|
||||
|
||||
if (!acc[field]) {
|
||||
acc[field] = [];
|
||||
try {
|
||||
where = Object.entries(where).reduce((acc: { [key: string]: any }, [fieldWithSuffix, value]) => {
|
||||
const [field, ...suffix] = fieldWithSuffix.split('_');
|
||||
|
||||
if (!acc[field]) {
|
||||
acc[field] = [];
|
||||
}
|
||||
|
||||
const filter = {
|
||||
value,
|
||||
not: false,
|
||||
operator: 'equals'
|
||||
};
|
||||
|
||||
let operator = suffix.shift();
|
||||
|
||||
if (operator === 'not') {
|
||||
filter.not = true;
|
||||
operator = suffix.shift();
|
||||
}
|
||||
|
||||
if (operator) {
|
||||
filter.operator = operator;
|
||||
}
|
||||
|
||||
acc[field].push(filter);
|
||||
|
||||
return acc;
|
||||
}, {});
|
||||
|
||||
if (!queryOptions.limit) {
|
||||
queryOptions.limit = DEFAULT_LIMIT;
|
||||
}
|
||||
|
||||
const filter = {
|
||||
value,
|
||||
not: false,
|
||||
operator: 'equals'
|
||||
};
|
||||
// Get entities from the database.
|
||||
const entities = await this._database.getEntities(dbTx, entity, relationsMap, block, where, queryOptions);
|
||||
await dbTx.commitTransaction();
|
||||
|
||||
let operator = suffix.shift();
|
||||
|
||||
if (operator === 'not') {
|
||||
filter.not = true;
|
||||
operator = suffix.shift();
|
||||
}
|
||||
|
||||
if (operator) {
|
||||
filter.operator = operator;
|
||||
}
|
||||
|
||||
acc[field].push(filter);
|
||||
|
||||
return acc;
|
||||
}, {});
|
||||
|
||||
if (!queryOptions.limit) {
|
||||
queryOptions.limit = DEFAULT_LIMIT;
|
||||
// Resolve any field name conflicts in the entity result.
|
||||
return entities.map(entity => resolveEntityFieldConflicts(entity));
|
||||
} catch (error) {
|
||||
await dbTx.rollbackTransaction();
|
||||
throw error;
|
||||
} finally {
|
||||
await dbTx.release();
|
||||
}
|
||||
|
||||
// Get entities from the database.
|
||||
const entities = await this._database.getEntities(entity, relationsMap, block, where, queryOptions);
|
||||
|
||||
// Resolve any field name conflicts in the entity result.
|
||||
return entities.map(entity => resolveEntityFieldConflicts(entity));
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -2,7 +2,6 @@
|
||||
// Copyright 2021 Vulcanize, Inc.
|
||||
//
|
||||
|
||||
import assert from 'assert';
|
||||
import debug from 'debug';
|
||||
|
||||
import { JobQueue } from './job-queue';
|
||||
@ -27,7 +26,10 @@ export const fillBlocks = async (
|
||||
}
|
||||
): Promise<any> => {
|
||||
let { startBlock, endBlock, prefetch = false, batchBlocks = DEFAULT_PREFETCH_BATCH_SIZE } = argv;
|
||||
assert(startBlock <= endBlock, 'endBlock should be greater than or equal to startBlock');
|
||||
|
||||
if (startBlock > endBlock) {
|
||||
throw new Error(`endBlock ${endBlock} should be greater than or equal to startBlock ${startBlock}`);
|
||||
}
|
||||
|
||||
const syncStatus = await indexer.getSyncStatus();
|
||||
|
||||
@ -45,6 +47,10 @@ export const fillBlocks = async (
|
||||
throw new Error(`Missing blocks between startBlock ${startBlock} and chainHeadBlockNumber ${syncStatus.chainHeadBlockNumber}`);
|
||||
}
|
||||
|
||||
if (endBlock <= syncStatus.chainHeadBlockNumber) {
|
||||
throw new Error(`endBlock ${endBlock} should be greater than chainHeadBlockNumber ${syncStatus.chainHeadBlockNumber}`);
|
||||
}
|
||||
|
||||
startBlock = syncStatus.chainHeadBlockNumber + 1;
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user