mirror of
https://github.com/cerc-io/watcher-ts
synced 2026-05-12 10:04:08 +00:00
* Handle object type for where clause on nested GQL selections * Handle list type for where clause on nested GQL selections * Generate GQL schema types with arguments on plural fields
1533 lines
53 KiB
TypeScript
1533 lines
53 KiB
TypeScript
//
|
|
// Copyright 2021 Vulcanize, Inc.
|
|
//
|
|
|
|
import assert from 'assert';
|
|
import {
|
|
Connection,
|
|
FindOneOptions,
|
|
In,
|
|
LessThanOrEqual,
|
|
MoreThan,
|
|
QueryRunner,
|
|
Repository,
|
|
SelectQueryBuilder,
|
|
UpdateResult,
|
|
ObjectLiteral
|
|
} from 'typeorm';
|
|
import { ColumnMetadata } from 'typeorm/metadata/ColumnMetadata';
|
|
import { RawSqlResultsToEntityTransformer } from 'typeorm/query-builder/transformer/RawSqlResultsToEntityTransformer';
|
|
import { ArgumentNode, FieldNode, GraphQLResolveInfo, SelectionNode, IntValueNode, EnumValueNode, ObjectValueNode, ObjectFieldNode, ValueNode } from 'graphql';
|
|
import _ from 'lodash';
|
|
import debug from 'debug';
|
|
|
|
import { Database as BaseDatabase, QueryOptions, Where, CanonicalBlockHeight, Filter, OPERATOR_MAP, OrderDirection } from '../database';
|
|
import { BlockProgressInterface } from '../types';
|
|
import { cachePrunedEntitiesCount, eventProcessingLoadEntityCacheHitCount, eventProcessingLoadEntityCount, eventProcessingLoadEntityDBQueryDuration } from '../metrics';
|
|
import { ServerConfig } from '../config';
|
|
import { Block, formatValue, fromEntityValue, getLatestEntityFromEntity, parseEntityValue, resolveEntityFieldConflicts, toEntityValue, JSONbigNative } from './utils';
|
|
import { fromStateEntityValues } from './state-utils';
|
|
import { ValueKind } from './types';
|
|
|
|
// Debug logger for this module, created under the 'vulcanize:graph-database' namespace.
const log = debug('vulcanize:graph-database');
|
|
|
|
/**
 * Key in a `where` filter object that carries the change-block filter.
 * When present, the entity queries in this file add a `block_number >= <value>` condition
 * (using the first filter entry's `value`) and drop the key before building the rest of the query.
 */
export const FILTER_CHANGE_BLOCK = '_change_block';

/** Maximum number of related entities attached to a single parent entity when loading relations. */
export const DEFAULT_LIMIT = 100;

// NOTE(review): not referenced in the visible part of this file; presumably the interval used
// when clearing the entities cache - confirm against its usage.
const DEFAULT_CLEAR_ENTITIES_CACHE_INTERVAL = 1000;
|
|
|
|
/**
 * Query pattern used by `GraphDatabase.getEntities` for entity types that have no
 * latest-entity table. Types without a mapping fall back to `GROUP_BY`.
 */
export enum ENTITY_QUERY_TYPE {
  /** Dispatches to `getEntitiesSingular`. */
  SINGULAR = 0,
  /** Dispatches to `getEntitiesDistinctOn`. */
  DISTINCT_ON = 1,
  /** Dispatches to `getEntitiesGroupBy` (also the default). */
  GROUP_BY = 2,
  /** Dispatches to `getEntitiesUnique`. */
  UNIQUE = 3
}
|
|
|
|
/**
 * In-memory entity cache consulted by `GraphDatabase.getModelEntity`.
 */
interface CachedEntities {
  /**
   * Blocks keyed by block hash. Each entry links to its parent via `parentHash` and holds
   * its entities as: table name -> entity id -> entity data.
   */
  frothyBlocks: Map<string, {
    blockNumber: number;
    parentHash: string;
    entities: Map<string, Map<string, Record<string, any>>>;
  }>;

  /** Latest pruned entity versions as: table name -> entity id -> entity data. */
  latestPrunedEntities: Map<string, Map<string, Record<string, any>>>;
}
|
|
|
|
export class GraphDatabase {
|
|
// Server configuration; `gql.loadRelationsSequential` is read when loading relations.
_serverConfig: ServerConfig;

// TypeORM connection, assigned from the base database in init().
_conn!: Connection;

// Underlying database that query building and transaction handling are delegated to.
_baseDatabase: BaseDatabase;

// Query pattern to use per entity type in getEntities().
_entityQueryTypeMap: Map<new () => any, ENTITY_QUERY_TYPE>;

// Entity type -> its latest-entity type; consulted first in getEntities().
_entityToLatestEntityMap: Map<new () => any, new () => any> = new Map();

// In-memory entity cache, exposed through the cachedEntities getter.
_cachedEntities: CachedEntities = {
  frothyBlocks: new Map(),
  latestPrunedEntities: new Map()
};
|
|
|
|
/**
 * @param serverConfig Server configuration.
 * @param baseDatabase Database that connection handling and query building are delegated to.
 * @param entityQueryTypeMap Query pattern to use per entity type (defaults to an empty map).
 * @param entityToLatestEntityMap Entity type to latest-entity type mapping (defaults to an empty map).
 */
constructor (
  serverConfig: ServerConfig,
  baseDatabase: BaseDatabase,
  entityQueryTypeMap: Map<new () => any, ENTITY_QUERY_TYPE> = new Map(),
  entityToLatestEntityMap: Map<new () => any, new () => any> = new Map()
) {
  // Store the collaborators first, then the per-entity lookup maps.
  this._serverConfig = serverConfig;
  this._baseDatabase = baseDatabase;

  this._entityQueryTypeMap = entityQueryTypeMap;
  this._entityToLatestEntityMap = entityToLatestEntityMap;
}
|
|
|
|
/** The in-memory entity cache (the live object, not a copy). */
get cachedEntities (): CachedEntities {
  const { _cachedEntities: cache } = this;

  return cache;
}
|
|
|
|
/**
 * Picks up the connection from the base database.
 * Fails an assertion if the base database has no connection.
 */
async init (): Promise<void> {
  const { conn } = this._baseDatabase;

  this._conn = conn;
  assert(this._conn);
}
|
|
|
|
/** Closes the underlying base database. */
async close (): Promise<void> {
  await this._baseDatabase.close();
}
|
|
|
|
/** Creates a transaction query runner via the base database. */
async createTransactionRunner (): Promise<QueryRunner> {
  const transactionRunner = await this._baseDatabase.createTransactionRunner();

  return transactionRunner;
}
|
|
|
|
/**
 * Loads an entity matching `whereOptions`, preferring the in-memory cache when a block hash is given.
 *
 * Without `whereOptions.blockHash`, this is a plain `findOne` ordered by block number (descending).
 * With a block hash, and only when `latestPrunedEntities` is populated, the lookup walks the cached
 * frothy blocks from that hash through parent hashes, then the pruned-entities cache, then the DB.
 * If the hash is not in the frothy cache, the previous entity version is loaded from the DB instead.
 *
 * Cache hits are returned as deep clones.
 *
 * @param repo Repository of the entity; must have a `queryRunner` when the DB fallback is used.
 * @param whereOptions Find conditions; `id` and `blockHash` are read for the cache lookup.
 * @returns The entity, or undefined if none was found.
 */
async getModelEntity<Entity extends ObjectLiteral> (repo: Repository<Entity>, whereOptions: any): Promise<Entity | undefined> {
  eventProcessingLoadEntityCount.inc();

  const findOptions = {
    where: whereOptions,
    order: { blockNumber: 'DESC' }
  };
  const { blockHash, id } = findOptions.where;

  // No block hash: nothing to resolve against the cache.
  if (!blockHash) {
    return repo.findOne(findOptions);
  }

  const { frothyBlocks, latestPrunedEntities } = this._cachedEntities;

  // Check cache only if latestPrunedEntities is updated.
  // latestPrunedEntities is updated when frothyBlocks is filled till canonical block height.
  if (latestPrunedEntities.size > 0) {
    const { tableName } = repo.metadata;
    let canonicalBlockNumber = -1;

    // Walk the frothy region from the given block hash through parent hashes.
    for (
      let frothyBlock = frothyBlocks.get(blockHash);
      frothyBlock;
      frothyBlock = frothyBlocks.get(frothyBlock.parentHash)
    ) {
      const frothyEntity = frothyBlock.entities.get(tableName)?.get(id);

      if (frothyEntity) {
        eventProcessingLoadEntityCacheHitCount.inc();
        return _.cloneDeep(frothyEntity) as Entity;
      }

      // Recorded for the last block visited; passed to getLatestPrunedEntity below.
      canonicalBlockNumber = frothyBlock.blockNumber + 1;
    }

    // canonicalBlockNumber stays -1 when the block hash does not exist in the frothy region;
    // the pruned cache is only used when the hash was found there.
    if (canonicalBlockNumber > -1) {
      const prunedEntity = latestPrunedEntities.get(tableName)?.get(id);

      if (prunedEntity) {
        eventProcessingLoadEntityCacheHitCount.inc();
        return _.cloneDeep(prunedEntity) as Entity;
      }

      // Not cached: fetch the latest pruned entity from the DB and cache it.
      const stopPrunedQueryTimer = eventProcessingLoadEntityDBQueryDuration.startTimer();
      const prunedDbEntity = await this._baseDatabase.getLatestPrunedEntity(repo, id, canonicalBlockNumber);
      stopPrunedQueryTimer();

      if (prunedDbEntity) {
        this.cacheUpdatedEntity(repo, prunedDbEntity, true);
      }

      return prunedDbEntity;
    }
  }

  // Cache could not be used: load the previous entity version from the DB.
  assert(repo.queryRunner);
  const stopQueryTimer = eventProcessingLoadEntityDBQueryDuration.startTimer();
  const prevEntityVersion = await this._baseDatabase.getPrevEntityVersion(repo.queryRunner, repo, findOptions);
  stopQueryTimer();

  return prevEntityVersion;
}
|
|
|
|
/**
 * Loads an entity by name and id on a dedicated query runner, which is always released afterwards.
 *
 * @param entityName Entity name used to resolve the repository.
 * @param id Entity id.
 * @param blockHash Optional block hash; added to the find conditions when provided.
 * @returns The entity, or undefined if none was found.
 * @throws Rethrows any lookup error after logging it.
 */
async getEntity<Entity extends ObjectLiteral> (entityName: string, id: string, blockHash?: string): Promise<Entity | undefined> {
  const queryRunner = this._conn.createQueryRunner();

  try {
    const repo: Repository<Entity> = queryRunner.manager.getRepository(entityName);
    const whereOptions: { [key: string]: any } = blockHash ? { id, blockHash } : { id };

    return await this.getModelEntity(repo, whereOptions);
  } catch (error) {
    log(error);
    throw error;
  } finally {
    await queryRunner.release();
  }
}
|
|
|
|
/**
 * Lists the ids of all rows in `tableName` whose block number equals `blockNumber`.
 *
 * @param blockNumber Block number to match.
 * @param tableName Name used to resolve the repository.
 */
async getEntityIdsAtBlockNumber (blockNumber: number, tableName: string): Promise<string[]> {
  const rows = await this._conn
    .getRepository(tableName)
    .find({ select: ['id'], where: { blockNumber } });

  return rows.map((row: any) => row.id);
}
|
|
|
|
/**
 * Tells whether `tableName` has at least one row at the given block number.
 *
 * @param blockNumber Block number to match.
 * @param tableName Name used to resolve the repository.
 */
async isEntityUpdatedAtBlockNumber (blockNumber: number, tableName: string): Promise<boolean> {
  const rowsAtBlock = await this._conn
    .getRepository(tableName)
    .count({ where: { blockNumber } });

  return rowsAtBlock > 0;
}
|
|
|
|
/**
 * Loads a single entity by id at the given block and populates its selected relation fields.
 *
 * A block hash takes precedence over a block number. With a hash, an exact match is tried first and
 * the previous entity version is used as fallback; with a number, the latest row at or below it is used.
 *
 * @param queryRunner Query runner used for all queries.
 * @param entityType Entity class to load.
 * @param id Entity id.
 * @param relationsMap Relation metadata keyed by entity class.
 * @param block Block height (hash and/or number); defaults to no restriction.
 * @param selections GQL selections used to decide which relations to load.
 * @param queryInfo GQL resolve info, used to defragment selections and read field arguments.
 * @returns The entity with relations loaded, or the empty lookup result if not found.
 */
async getEntityWithRelations<Entity extends ObjectLiteral> (
  queryRunner: QueryRunner,
  entityType: (new () => Entity),
  id: string,
  relationsMap: Map<any, { [key: string]: any }>,
  block: CanonicalBlockHeight = {},
  selections: ReadonlyArray<SelectionNode> = [],
  queryInfo: GraphQLResolveInfo
): Promise<Entity | undefined> {
  const repo = queryRunner.manager.getRepository<Entity>(entityType);

  const whereOptions: any = { id };

  if (block.hash) {
    whereOptions.blockHash = block.hash;
  } else if (block.number) {
    whereOptions.blockNumber = LessThanOrEqual(block.number);
  }

  const findOptions = {
    where: whereOptions,
    order: { blockNumber: 'DESC' }
  };

  let entityData = await repo.findOne(findOptions as FindOneOptions<Entity>);

  // No row at exactly this block hash: fall back to the previous version of the entity.
  if (!entityData && findOptions.where.blockHash) {
    entityData = await this._baseDatabase.getPrevEntityVersion(queryRunner, repo, findOptions);
  }

  if (!entityData) {
    return entityData;
  }

  // Get relational fields
  const defragmentedSelections = this._defragmentGQLQuerySelections(selections, queryInfo);

  return this.loadEntityRelations(queryRunner, block, relationsMap, entityType, entityData, defragmentedSelections, queryInfo);
}
|
|
|
|
/**
 * Populates the selected relation fields of a single entity, in place.
 *
 * For every selected field that has relation metadata:
 * - derived relation: loads the related entities whose foreign key equals this entity's id;
 * - array relation: loads the related entities whose id is in the field's id list;
 * - otherwise: loads the single related entity (with its own relations) by the id stored in the field.
 *
 * Filters given as arguments on the GQL field are merged with the relation filter; where both
 * sides hold an array for the same key, the relation filter's array wins.
 *
 * @param queryRunner Query runner used for all queries.
 * @param block Block height the relations are loaded at.
 * @param relationsMap Relation metadata keyed by entity class.
 * @param entityType Class of `entityData`.
 * @param entityData Entity to populate (mutated).
 * @param selections GQL selections of the entity.
 * @param queryInfo GQL resolve info used to read field arguments.
 * @returns `entityData`, after all relation loads have completed.
 */
async loadEntityRelations<Entity> (
  queryRunner: QueryRunner,
  block: CanonicalBlockHeight,
  relationsMap: Map<any, { [key: string]: any }>,
  entityType: new () => Entity, entityData: any,
  selections: ReadonlyArray<SelectionNode> = [],
  queryInfo: GraphQLResolveInfo
): Promise<Entity> {
  const relations = relationsMap.get(entityType);

  if (relations === undefined) {
    return entityData;
  }

  // mergeWith customizer: overwrite an array in the target object with the source array.
  const overwriteArrays = (objValue: any, srcValue: any) => (Array.isArray(objValue) ? srcValue : undefined);

  const loadRelationField = async (selection: SelectionNode): Promise<void> => {
    assert(selection.kind === 'Field');

    const field = selection.name.value;
    const { entity: relationEntity, isArray, isDerived, field: foreignKey } = relations[field];

    // Filter out __typename field in GQL for loading relations.
    const childSelections = (selection.selectionSet?.selections || [])
      .filter(child => !(child.kind === 'Field' && child.name.value === '__typename'));

    // Parse selection's arguments
    const fieldArguments = this._getGQLSelectionFieldArguments(selection, queryInfo);
    const relationQueryOptions = fieldArguments.queryOptions;
    let relationWhere = fieldArguments.where;

    if (isDerived || isArray) {
      // Derived relations match on the foreign key; array relations match on the stored ids.
      const where: Where = isDerived
        ? { [foreignKey]: [{ value: entityData.id, not: false, operator: 'equals' }] }
        : { id: [{ value: entityData[field], not: false, operator: 'in' }] };

      relationWhere = _.mergeWith(relationWhere, where, overwriteArrays);

      const relatedEntities = await this.getEntities(
        queryRunner,
        relationEntity,
        relationsMap,
        block,
        relationWhere,
        relationQueryOptions,
        childSelections,
        queryInfo
      );

      entityData[field] = relatedEntities;

      return;
    }

    // field is neither an array nor derivedFrom
    const relatedEntity = await this.getEntityWithRelations(
      queryRunner,
      relationEntity,
      entityData[field],
      relationsMap,
      block,
      childSelections,
      queryInfo
    );

    entityData[field] = relatedEntity;
  };

  const relationSelections = selections.filter(
    (selection) => selection.kind === 'Field' && Boolean(relations[selection.name.value])
  );

  await Promise.all(relationSelections.map(selection => loadRelationField(selection)));

  return entityData;
}
|
|
|
|
/**
 * Loads entities of a type with their selected relations.
 *
 * Query strategy:
 * - entity has a latest-entity table: lateral query when any block height is given,
 *   otherwise a query on the latest-entity table;
 * - otherwise: the pattern registered in the entity query type map (group-by when none is registered).
 *
 * @param queryRunner Query runner used for all queries.
 * @param entityType Entity class to load.
 * @param relationsMap Relation metadata keyed by entity class.
 * @param block Block height (hash and/or number); defaults to no restriction.
 * @param where Filters to apply.
 * @param queryOptions Ordering / pagination options.
 * @param selections GQL selections for the entity.
 * @param queryInfo GQL resolve info.
 * @returns Matching entities with relations loaded and field name conflicts resolved.
 */
async getEntities<Entity extends ObjectLiteral> (
  queryRunner: QueryRunner,
  entityType: new () => Entity,
  relationsMap: Map<any, { [key: string]: any }>,
  block: CanonicalBlockHeight = {},
  where: Where = {},
  queryOptions: QueryOptions = {},
  selections: ReadonlyArray<SelectionNode> = [],
  queryInfo: GraphQLResolveInfo
): Promise<Entity[]> {
  const latestEntityType = this._entityToLatestEntityMap.get(entityType);
  const defragmentedSelections = this._defragmentGQLQuerySelections(selections, queryInfo);

  let entities: Entity[];

  if (latestEntityType) {
    const blockHeightGiven = Object.keys(block).length > 0;

    if (blockHeightGiven) {
      // Use lateral query for entities with latest entity table.
      entities = await this.getEntitiesLateral(
        queryRunner,
        entityType,
        latestEntityType,
        relationsMap,
        block,
        where,
        queryOptions
      );
    } else {
      // Use latest entity tables if block height not passed.
      entities = await this.getEntitiesLatest(
        queryRunner,
        entityType,
        latestEntityType,
        relationsMap,
        where,
        queryOptions,
        defragmentedSelections
      );
    }
  } else {
    // Use different suitable query patterns based on entities.
    const queryType = this._entityQueryTypeMap.get(entityType);

    if (queryType === ENTITY_QUERY_TYPE.SINGULAR) {
      entities = await this.getEntitiesSingular(queryRunner, entityType, relationsMap, block, where);
    } else if (queryType === ENTITY_QUERY_TYPE.UNIQUE) {
      entities = await this.getEntitiesUnique(queryRunner, entityType, relationsMap, block, where, queryOptions);
    } else if (queryType === ENTITY_QUERY_TYPE.DISTINCT_ON) {
      entities = await this.getEntitiesDistinctOn(queryRunner, entityType, relationsMap, block, where, queryOptions);
    } else {
      // GROUP_BY, or entity query type not specified in map.
      entities = await this.getEntitiesGroupBy(queryRunner, entityType, relationsMap, block, where, queryOptions);
    }
  }

  if (entities.length === 0) {
    return [];
  }

  const entitiesWithRelations = await this.loadEntitiesRelations(
    queryRunner,
    block,
    relationsMap,
    entityType,
    entities,
    defragmentedSelections,
    queryInfo
  );

  // Resolve any field name conflicts in the entity result.
  return entitiesWithRelations.map(entity => resolveEntityFieldConflicts(entity));
}
|
|
|
|
/**
 * Loads the latest non-pruned version of each matching entity using a GROUP BY sub-query.
 *
 * The sub-query selects MAX(block_number) per id among non-pruned rows, restricted by the `id`
 * filter, the change-block filter and the block height. The outer query joins the entity table on
 * (id, block_number), then applies the remaining filters, ordering (always ending with an
 * ordering on id) and pagination.
 *
 * The caller's `where` object is left untouched: filters consumed by the sub-query are removed
 * from a local copy only.
 *
 * @param queryRunner Query runner used for all queries.
 * @param entityType Entity class to load.
 * @param relationsMap Relation metadata keyed by entity class.
 * @param block Block height to query at.
 * @param where Filters to apply.
 * @param queryOptions Ordering / pagination options.
 */
async getEntitiesGroupBy<Entity extends ObjectLiteral> (
  queryRunner: QueryRunner,
  entityType: new () => Entity,
  relationsMap: Map<any, { [key: string]: any }>,
  block: CanonicalBlockHeight,
  where: Where = {},
  queryOptions: QueryOptions = {}
): Promise<Entity[]> {
  const repo = queryRunner.manager.getRepository<Entity>(entityType);
  const { tableName } = repo.metadata;

  // Work on a shallow copy so that deleting consumed filters does not mutate the caller's object.
  const filters: Where = { ...where };

  let subQuery = repo.createQueryBuilder('subTable')
    .select('subTable.id', 'id')
    .addSelect('MAX(subTable.block_number)', 'block_number')
    .where('subTable.is_pruned = :isPruned', { isPruned: false })
    .groupBy('subTable.id');

  // The id filter is applied in the sub-query and must not be applied again in the outer query.
  if (filters.id) {
    subQuery = this._baseDatabase.buildQuery(repo, subQuery, { id: filters.id });
    delete filters.id;
  }

  if (filters[FILTER_CHANGE_BLOCK]) {
    subQuery = subQuery.andWhere('subTable.block_number >= :changeBlockNumber', { changeBlockNumber: filters[FILTER_CHANGE_BLOCK][0].value });
    delete filters[FILTER_CHANGE_BLOCK];
  }

  subQuery = await this._baseDatabase.applyBlockHeightFilter(queryRunner, subQuery, block, 'subTable');

  let selectQueryBuilder = repo.createQueryBuilder(tableName)
    .innerJoin(
      `(${subQuery.getQuery()})`,
      'latestEntities',
      `${tableName}.id = "latestEntities"."id" AND ${tableName}.block_number = "latestEntities"."block_number"`
    )
    .setParameters(subQuery.getParameters())
    .where(`${tableName}.is_pruned = :isPruned`, { isPruned: false });

  selectQueryBuilder = this._baseDatabase.buildQuery(repo, selectQueryBuilder, filters, relationsMap.get(entityType), block);

  if (queryOptions.tsRankBy) {
    selectQueryBuilder = this._baseDatabase.orderTsQuery(repo, selectQueryBuilder, queryOptions);
  }

  if (queryOptions.orderBy) {
    selectQueryBuilder = await this._baseDatabase.orderQuery(repo, selectQueryBuilder, queryOptions, relationsMap.get(entityType), block);
  }

  // Always add an ordering on id after any requested ordering.
  selectQueryBuilder = await this._baseDatabase.orderQuery(repo, selectQueryBuilder, { ...queryOptions, orderBy: 'id' });

  if (queryOptions.skip) {
    selectQueryBuilder = selectQueryBuilder.offset(queryOptions.skip);
  }

  if (queryOptions.limit) {
    selectQueryBuilder = selectQueryBuilder.limit(queryOptions.limit);
  }

  const entities = await selectQueryBuilder.getMany();

  return entities;
}
|
|
|
|
/**
 * Loads the latest non-pruned version of each matching entity using a DISTINCT ON (id) sub-query.
 *
 * The sub-query picks one row per id, ordered by id ascending then block number descending, with
 * all filters and the block height applied. The outer query selects from that sub-query and applies
 * ordering and pagination; the raw rows are then converted via `transformResults`.
 *
 * The caller's `where` object is left untouched: filters consumed early are removed from a
 * local copy only.
 *
 * @param queryRunner Query runner used for all queries.
 * @param entityType Entity class to load.
 * @param relationsMap Relation metadata keyed by entity class.
 * @param block Block height to query at.
 * @param where Filters to apply.
 * @param queryOptions Ordering / pagination options.
 */
async getEntitiesDistinctOn<Entity extends ObjectLiteral> (
  queryRunner: QueryRunner,
  entityType: new () => Entity,
  relationsMap: Map<any, { [key: string]: any }>,
  block: CanonicalBlockHeight,
  where: Where = {},
  queryOptions: QueryOptions = {}
): Promise<Entity[]> {
  const repo = queryRunner.manager.getRepository<Entity>(entityType);

  // Work on a shallow copy so that deleting consumed filters does not mutate the caller's object.
  const filters: Where = { ...where };

  let subQuery = repo.createQueryBuilder('subTable')
    .distinctOn(['subTable.id'])
    .where('subTable.is_pruned = :isPruned', { isPruned: false })
    .addOrderBy('subTable.id', 'ASC')
    .addOrderBy('subTable.block_number', 'DESC');

  // The id filter is applied up front; removing it keeps the later buildQuery from re-applying it.
  if (filters.id) {
    subQuery = this._baseDatabase.buildQuery(repo, subQuery, { id: filters.id });
    delete filters.id;
  }

  if (filters[FILTER_CHANGE_BLOCK]) {
    subQuery = subQuery.andWhere('subTable.block_number >= :changeBlockNumber', { changeBlockNumber: filters[FILTER_CHANGE_BLOCK][0].value });
    delete filters[FILTER_CHANGE_BLOCK];
  }

  subQuery = await this._baseDatabase.applyBlockHeightFilter(queryRunner, subQuery, block, 'subTable');

  subQuery = this._baseDatabase.buildQuery(repo, subQuery, filters, relationsMap.get(entityType), block);

  let selectQueryBuilder = queryRunner.manager.createQueryBuilder()
    .select('"latestEntities".*')
    .from(`(${subQuery.getQuery()})`, 'latestEntities')
    .setParameters(subQuery.getParameters()) as SelectQueryBuilder<Entity>;

  if (queryOptions.tsRankBy) {
    selectQueryBuilder = this._baseDatabase.orderTsQuery(repo, selectQueryBuilder, queryOptions, 'subTable_');
  }

  if (queryOptions.orderBy) {
    selectQueryBuilder = await this._baseDatabase.orderQuery(repo, selectQueryBuilder, queryOptions, relationsMap.get(entityType), block, 'subTable_');

    if (queryOptions.orderBy !== 'id') {
      // Order by id if ordered by some non-id column (for rows having same value for the column ordered on)
      selectQueryBuilder = await this._baseDatabase.orderQuery(repo, selectQueryBuilder, { ...queryOptions, orderBy: 'id' }, {}, {}, 'subTable_');
    }
  }

  if (queryOptions.skip) {
    selectQueryBuilder = selectQueryBuilder.offset(queryOptions.skip);
  }

  if (queryOptions.limit) {
    selectQueryBuilder = selectQueryBuilder.limit(queryOptions.limit);
  }

  // Raw rows use the sub-query's column aliases; convert them using a 'subTable' query builder.
  let entities = await selectQueryBuilder.getRawMany();
  entities = await this.transformResults(queryRunner, repo.createQueryBuilder('subTable'), entities);

  return entities as Entity[];
}
|
|
|
|
/**
 * Loads at most one entity: the non-pruned row with the highest block number that matches the
 * filters at the given block height.
 *
 * The caller's `where` object is left untouched: the change-block filter is removed from a
 * local copy only.
 *
 * @param queryRunner Query runner used for all queries.
 * @param entityType Entity class to load.
 * @param relationsMap Relation metadata keyed by entity class.
 * @param block Block height to query at.
 * @param where Filters to apply.
 */
async getEntitiesSingular<Entity extends ObjectLiteral> (
  queryRunner: QueryRunner,
  entityType: new () => Entity,
  relationsMap: Map<any, { [key: string]: any }>,
  block: CanonicalBlockHeight,
  where: Where = {}
): Promise<Entity[]> {
  const repo = queryRunner.manager.getRepository<Entity>(entityType);
  const { tableName } = repo.metadata;

  // Work on a shallow copy so that deleting the consumed filter does not mutate the caller's object.
  const filters: Where = { ...where };

  let selectQueryBuilder = repo.createQueryBuilder(tableName)
    .where('is_pruned = :isPruned', { isPruned: false })
    .addOrderBy(`${tableName}.block_number`, 'DESC')
    .limit(1);

  if (filters[FILTER_CHANGE_BLOCK]) {
    selectQueryBuilder = selectQueryBuilder.andWhere(`${tableName}.block_number >= :changeBlockNumber`, { changeBlockNumber: filters[FILTER_CHANGE_BLOCK][0].value });
    delete filters[FILTER_CHANGE_BLOCK];
  }

  selectQueryBuilder = await this._baseDatabase.applyBlockHeightFilter(queryRunner, selectQueryBuilder, block, tableName);

  selectQueryBuilder = this._baseDatabase.buildQuery(repo, selectQueryBuilder, filters, relationsMap.get(entityType), block);

  const entities = await selectQueryBuilder.getMany();

  return entities as Entity[];
}
|
|
|
|
/**
 * Loads all non-pruned rows matching the filters at the given block height, without
 * de-duplicating by id, then applies ordering (always ending with an ordering on id) and pagination.
 *
 * The caller's `where` object is left untouched: the change-block filter is removed from a
 * local copy only.
 *
 * @param queryRunner Query runner used for all queries.
 * @param entityType Entity class to load.
 * @param relationsMap Relation metadata keyed by entity class.
 * @param block Block height to query at.
 * @param where Filters to apply.
 * @param queryOptions Ordering / pagination options.
 */
async getEntitiesUnique<Entity extends ObjectLiteral> (
  queryRunner: QueryRunner,
  entityType: new () => Entity,
  relationsMap: Map<any, { [key: string]: any }>,
  block: CanonicalBlockHeight,
  where: Where = {},
  queryOptions: QueryOptions = {}
): Promise<Entity[]> {
  const repo = queryRunner.manager.getRepository<Entity>(entityType);
  const { tableName } = repo.metadata;

  // Work on a shallow copy so that deleting the consumed filter does not mutate the caller's object.
  const filters: Where = { ...where };

  let selectQueryBuilder = repo.createQueryBuilder(tableName)
    .where('is_pruned = :isPruned', { isPruned: false });

  if (filters[FILTER_CHANGE_BLOCK]) {
    selectQueryBuilder = selectQueryBuilder.andWhere(`${tableName}.block_number >= :changeBlockNumber`, { changeBlockNumber: filters[FILTER_CHANGE_BLOCK][0].value });
    delete filters[FILTER_CHANGE_BLOCK];
  }

  selectQueryBuilder = await this._baseDatabase.applyBlockHeightFilter(queryRunner, selectQueryBuilder, block, tableName);

  selectQueryBuilder = this._baseDatabase.buildQuery(repo, selectQueryBuilder, filters, relationsMap.get(entityType), block);

  if (queryOptions.tsRankBy) {
    selectQueryBuilder = this._baseDatabase.orderTsQuery(repo, selectQueryBuilder, queryOptions);
  }

  if (queryOptions.orderBy) {
    selectQueryBuilder = await this._baseDatabase.orderQuery(repo, selectQueryBuilder, queryOptions, relationsMap.get(entityType), block);
  }

  // Always add an ordering on id after any requested ordering.
  selectQueryBuilder = await this._baseDatabase.orderQuery(repo, selectQueryBuilder, { ...queryOptions, orderBy: 'id' });

  if (queryOptions.skip) {
    selectQueryBuilder = selectQueryBuilder.offset(queryOptions.skip);
  }

  if (queryOptions.limit) {
    selectQueryBuilder = selectQueryBuilder.limit(queryOptions.limit);
  }

  const entities = await selectQueryBuilder.getMany();

  return entities as Entity[];
}
|
|
|
|
/**
 * Loads entities through the latest-entity table (used when no block height is given).
 *
 * If every selected field (ignoring `__typename`) is a column of the latest-entity table, only that
 * table is queried. Otherwise the entity table is queried, inner-joined with the latest-entity table
 * on (id, blockHash). Filters and ordering are applied on the `latest` alias.
 *
 * The caller's `where` object is left untouched: the change-block filter is removed from a
 * local copy only.
 *
 * @param queryRunner Query runner used for all queries.
 * @param entityType Entity class to load.
 * @param latestEntity Latest-entity class for `entityType`.
 * @param relationsMap Relation metadata keyed by entity class.
 * @param where Filters to apply.
 * @param queryOptions Ordering / pagination options.
 * @param selections GQL selections, used to decide whether the join is needed.
 */
async getEntitiesLatest<Entity extends ObjectLiteral> (
  queryRunner: QueryRunner,
  entityType: new () => Entity,
  latestEntity: new () => any,
  relationsMap: Map<any, { [key: string]: any }>,
  where: Where = {},
  queryOptions: QueryOptions = {},
  selections: ReadonlyArray<SelectionNode> = []
): Promise<Entity[]> {
  const entityRepo = queryRunner.manager.getRepository<Entity>(entityType);
  const latestEntityRepo = queryRunner.manager.getRepository<any>(latestEntity);
  const latestEntityFields = latestEntityRepo.metadata.columns.map(column => column.propertyName);

  // Work on a shallow copy so that deleting the consumed filter does not mutate the caller's object.
  const filters: Where = { ...where };

  const selectionNotInLatestEntity = selections.filter(selection => selection.kind === 'Field' && selection.name.value !== '__typename')
    .some(selection => {
      assert(selection.kind === 'Field');

      return !latestEntityFields.includes(selection.name.value);
    });

  // Use latest entity table for faster query.
  let repo = latestEntityRepo;
  let selectQueryBuilder = repo.createQueryBuilder('latest');

  if (selectionNotInLatestEntity) {
    // Join with latest entity table if selection field doesn't exist in latest entity.
    repo = entityRepo;

    selectQueryBuilder = repo.createQueryBuilder(repo.metadata.tableName)
      .innerJoin(
        latestEntity,
        'latest',
        `latest.id = ${repo.metadata.tableName}.id AND latest.blockHash = ${repo.metadata.tableName}.blockHash`
      );
  }

  if (filters[FILTER_CHANGE_BLOCK]) {
    selectQueryBuilder = selectQueryBuilder.andWhere('latest.block_number >= :changeBlockNumber', { changeBlockNumber: filters[FILTER_CHANGE_BLOCK][0].value });
    delete filters[FILTER_CHANGE_BLOCK];
  }

  selectQueryBuilder = this._baseDatabase.buildQuery(repo, selectQueryBuilder, filters, relationsMap.get(entityType), {}, 'latest');

  if (queryOptions.tsRankBy) {
    selectQueryBuilder = this._baseDatabase.orderTsQuery(repo, selectQueryBuilder, queryOptions, '', 'latest');
  }

  if (queryOptions.orderBy) {
    selectQueryBuilder = await this._baseDatabase.orderQuery(repo, selectQueryBuilder, queryOptions, relationsMap.get(entityType), {}, '', 'latest');
  }

  // Always add an ordering on id after any requested ordering.
  selectQueryBuilder = await this._baseDatabase.orderQuery(repo, selectQueryBuilder, { ...queryOptions, orderBy: 'id' }, {}, {}, '', 'latest');

  if (queryOptions.skip) {
    selectQueryBuilder = selectQueryBuilder.offset(queryOptions.skip);
  }

  if (queryOptions.limit) {
    selectQueryBuilder = selectQueryBuilder.limit(queryOptions.limit);
  }

  return selectQueryBuilder.getMany();
}
|
|
|
|
/**
 * Loads entities at a block height using a LATERAL sub-query against the latest-entity table.
 *
 * For every row of the latest-entity table (alias `latest`), the lateral sub-query picks the
 * non-pruned entity row with the same id and the highest block number allowed by the change-block
 * filter and the block height. Remaining filters and ordering are applied on the `latest` alias;
 * the raw rows are then converted via `transformResults`.
 *
 * The caller's `where` object is left untouched: the change-block filter is removed from a
 * local copy only.
 *
 * @param queryRunner Query runner used for all queries.
 * @param entityType Entity class to load.
 * @param latestEntity Latest-entity class for `entityType`.
 * @param relationsMap Relation metadata keyed by entity class.
 * @param block Block height to query at.
 * @param where Filters to apply.
 * @param queryOptions Ordering / pagination options.
 */
async getEntitiesLateral<Entity extends ObjectLiteral> (
  queryRunner: QueryRunner,
  entityType: new () => Entity,
  latestEntity: new () => any,
  relationsMap: Map<any, { [key: string]: any }>,
  block: CanonicalBlockHeight,
  where: Where = {},
  queryOptions: QueryOptions = {}
): Promise<Entity[]> {
  const entityRepo = queryRunner.manager.getRepository<Entity>(entityType);
  const latestEntityRepo = queryRunner.manager.getRepository<any>(latestEntity);

  // Work on a shallow copy so that deleting the consumed filter does not mutate the caller's object.
  const filters: Where = { ...where };

  let subQuery = entityRepo.createQueryBuilder('subTable')
    .where('latest.id = subTable.id')
    .andWhere('subTable.is_pruned = :isPruned', { isPruned: false })
    .orderBy('subTable.block_number', 'DESC')
    .limit(1);

  if (filters[FILTER_CHANGE_BLOCK]) {
    subQuery = subQuery.andWhere('subTable.block_number >= :changeBlockNumber', { changeBlockNumber: filters[FILTER_CHANGE_BLOCK][0].value });
    delete filters[FILTER_CHANGE_BLOCK];
  }

  subQuery = await this._baseDatabase.applyBlockHeightFilter(queryRunner, subQuery, block, 'subTable');

  let selectQueryBuilder = latestEntityRepo.createQueryBuilder('latest')
    .select('*')
    .from(
      qb => {
        // Override getQuery so the sub-query is emitted as a LATERAL sub-query.
        // https://stackoverflow.com/a/72026555/10026807
        qb.getQuery = () => `LATERAL (${subQuery.getQuery()})`;
        qb.setParameters(subQuery.getParameters());
        return qb;
      },
      'result'
    ) as SelectQueryBuilder<Entity>;

  selectQueryBuilder = this._baseDatabase.buildQuery(latestEntityRepo, selectQueryBuilder, filters, relationsMap.get(entityType), block, 'latest');

  if (queryOptions.tsRankBy) {
    selectQueryBuilder = this._baseDatabase.orderTsQuery(latestEntityRepo, selectQueryBuilder, queryOptions, '', 'latest');
  }

  if (queryOptions.orderBy) {
    selectQueryBuilder = await this._baseDatabase.orderQuery(latestEntityRepo, selectQueryBuilder, queryOptions, relationsMap.get(entityType), block, '', 'latest');
  }

  // Always add an ordering on id after any requested ordering.
  selectQueryBuilder = await this._baseDatabase.orderQuery(latestEntityRepo, selectQueryBuilder, { ...queryOptions, orderBy: 'id' }, {}, {}, '', 'latest');

  if (queryOptions.skip) {
    selectQueryBuilder = selectQueryBuilder.offset(queryOptions.skip);
  }

  if (queryOptions.limit) {
    selectQueryBuilder = selectQueryBuilder.limit(queryOptions.limit);
  }

  // Convert the raw rows using a 'subTable' query builder on the entity repo.
  let entities = await selectQueryBuilder.getRawMany();
  entities = await this.transformResults(queryRunner, entityRepo.createQueryBuilder('subTable'), entities);

  return entities as Entity[];
}
|
|
|
|
/**
 * Populates the selected relation fields on a list of entities, in place.
 *
 * Only selections that are fields with relation metadata are loaded. Depending on the
 * `gql.loadRelationsSequential` server setting, relations are loaded one after another or concurrently.
 *
 * @param queryRunner Query runner used for all queries.
 * @param block Block height the relations are loaded at.
 * @param relationsMap Relation metadata keyed by entity class.
 * @param entity Class of the given entities.
 * @param entities Entities to populate (mutated).
 * @param selections GQL selections of the entity.
 * @param queryInfo GQL resolve info.
 * @returns The same `entities` array.
 */
async loadEntitiesRelations<Entity> (
  queryRunner: QueryRunner,
  block: CanonicalBlockHeight,
  relationsMap: Map<any, { [key: string]: any }>,
  entity: new () => Entity,
  entities: Entity[],
  selections: ReadonlyArray<SelectionNode> = [],
  queryInfo: GraphQLResolveInfo
): Promise<Entity[]> {
  const relations = relationsMap.get(entity);

  if (relations === undefined) {
    return entities;
  }

  const relationSelections = selections.filter(
    (selection) => selection.kind === 'Field' && Boolean(relations[selection.name.value])
  );

  const loadSelection = async (selection: SelectionNode): Promise<void> => {
    await this.loadRelation(queryRunner, block, relationsMap, relations, entities, selection, queryInfo);
  };

  if (!this._serverConfig.gql.loadRelationsSequential) {
    // Load all relations concurrently.
    await Promise.all(relationSelections.map(selection => loadSelection(selection)));

    return entities;
  }

  // Load relations one at a time.
  for (const selection of relationSelections) {
    await loadSelection(selection);
  }

  return entities;
}
|
|
|
|
/**
 * Loads one relation field for a batch of entities with a single query and assigns the results
 * onto each entity, in place.
 *
 * - Derived relation: loads related entities whose foreign key is one of the batch's ids and groups
 *   them per parent (at most DEFAULT_LIMIT each); parents without matches get an empty array.
 * - Array relation: loads related entities for the union of ids stored on the batch and gives each
 *   entity those whose id it references (at most DEFAULT_LIMIT each), in query result order.
 * - Single relation: if only `id` is selected, wraps the stored id as `{ id }` without querying;
 *   otherwise loads the related entities and replaces the stored id where a match was found.
 *
 * Filters given as arguments on the GQL field are merged with the relation filter; where both
 * sides hold an array for the same key, the relation filter's array wins.
 *
 * @param queryRunner Query runner used for all queries.
 * @param block Block height the relation is loaded at.
 * @param relationsMap Relation metadata keyed by entity class.
 * @param relations Relation metadata of the batch's entity type, keyed by field name.
 * @param entities Entities to populate (mutated).
 * @param selection GQL selection of the relation field; must be a field node.
 * @param queryInfo GQL resolve info used to read field arguments.
 */
async loadRelation<Entity> (
  queryRunner: QueryRunner,
  block: CanonicalBlockHeight,
  relationsMap: Map<any, { [key: string]: any }>,
  relations: { [key: string]: any },
  entities: Entity[],
  selection: SelectionNode,
  queryInfo: GraphQLResolveInfo
): Promise<void> {
  assert(selection.kind === 'Field');

  const field = selection.name.value;
  const { entity: relationEntity, isArray, isDerived, field: foreignKey } = relations[field];

  // Filter out __typename field in GQL for loading relations.
  const childSelections = (selection.selectionSet?.selections || [])
    .filter(child => !(child.kind === 'Field' && child.name.value === '__typename'));

  // Parse selection's arguments
  const fieldArguments = this._getGQLSelectionFieldArguments(selection, queryInfo);
  const relationQueryOptions = fieldArguments.queryOptions;
  let relationWhere = fieldArguments.where;

  // mergeWith customizer: overwrite an array in the target object with the source array.
  const overwriteArrays = (objValue: any, srcValue: any) => (Array.isArray(objValue) ? srcValue : undefined);

  // Merges the relation filter into the field's filters and runs the query for the related entities.
  const queryRelatedEntities = async (where: Where): Promise<any[]> => {
    relationWhere = _.mergeWith(relationWhere, where, overwriteArrays);

    return this.getEntities(
      queryRunner,
      relationEntity,
      relationsMap,
      block,
      relationWhere,
      relationQueryOptions,
      childSelections,
      queryInfo
    );
  };

  if (isDerived) {
    const relatedEntities = await queryRelatedEntities({
      [foreignKey]: [{
        value: entities.map((entity: any) => entity.id),
        not: false,
        operator: 'in'
      }]
    });

    const relatedByParentId: { [key: string]: any[] } = {};

    relatedEntities.forEach((relatedEntity: any) => {
      // Related entity might be loaded with data.
      const parentEntityId = relatedEntity[foreignKey].id ?? relatedEntity[foreignKey];

      if (!relatedByParentId[parentEntityId]) {
        relatedByParentId[parentEntityId] = [];
      }

      if (relatedByParentId[parentEntityId].length < DEFAULT_LIMIT) {
        relatedByParentId[parentEntityId].push(relatedEntity);
      }
    });

    entities.forEach((entity: any) => {
      entity[field] = relatedByParentId[entity.id] || [];
    });

    return;
  }

  if (isArray) {
    // Collect the distinct related ids referenced by the whole batch.
    const relatedIds = new Set<string>();

    entities.forEach((entity: any) => {
      entity[field].forEach((relatedEntityId: string) => relatedIds.add(relatedEntityId));
    });

    const relatedEntities = await queryRelatedEntities({
      id: [{
        value: Array.from(relatedIds),
        not: false,
        operator: 'in'
      }]
    });

    entities.forEach((entity: any) => {
      const referencedIds = new Set<string>(entity[field]);

      // Keep query result order; cap at DEFAULT_LIMIT related entities per entity.
      entity[field] = relatedEntities
        .filter((relatedEntity: any) => referencedIds.has(relatedEntity.id))
        .slice(0, DEFAULT_LIMIT);
    });

    return;
  }

  // field is neither an array nor derivedFrom

  // Avoid loading relation if selections only has id field.
  const firstChild = childSelections[0];

  if (childSelections.length === 1 && firstChild.kind === 'Field' && firstChild.name.value === 'id') {
    entities.forEach((entity: any) => {
      // Set only if field value is not null
      if (entity[field]) {
        entity[field] = { id: entity[field] };
      }
    });

    return;
  }

  const relatedEntities = await queryRelatedEntities({
    id: [{
      value: entities.map((entity: any) => entity[field]),
      not: false,
      operator: 'in'
    }]
  });

  const relatedById: { [key: string]: any } = {};

  relatedEntities.forEach((relatedEntity: any) => {
    relatedById[relatedEntity.id] = relatedEntity;
  });

  entities.forEach((entity: any) => {
    const relatedEntity = relatedById[entity[field]];

    // Leave the stored id in place when no related entity was loaded for it.
    if (relatedEntity) {
      entity[field] = relatedEntity;
    }
  });
}
|
|
|
|
/**
 * Build an entity of the given type from `data` and persist it.
 *
 * @param entityType Name used to look up the repository on the connection.
 * @param data Values handed to the repository's `create`.
 * @returns Whatever the repository's `save` resolves to.
 */
async saveEntity (entityType: string, data: any): Promise<any> {
  const entityRepo = this._conn.getRepository(entityType);
  const entityToPersist: any = entityRepo.create(data);

  return entityRepo.save(entityToPersist);
}
|
|
|
|
/**
 * Create a wasm `Entity` instance and fill it with the values in `data`.
 *
 * Columns `isPruned`, `isRemoved`, `blockHash` and `blockNumber` are skipped.
 * Columns `_blockNumber` / `_blockHash` are filled under the names
 * `blockNumber` / `blockHash`.
 *
 * @param instanceExports Exports of the wasm instance; `Entity` is read from it.
 * @param entityName Entity name used for the repository and `entityTypesMap` lookups.
 * @param data Values handed to `toEntityValue` for every remaining column.
 * @param entityTypesMap Map of entity name to its (field name -> GQL type) mapping.
 * @returns The filled entity instance.
 */
async toGraphEntity (instanceExports: any, entityName: string, data: any, entityTypesMap: Map<string, {[key: string]: string; }>): Promise<any> {
  // Get field types for this entity
  const entityTypes = entityTypesMap.get(entityName);
  assert(entityTypes);

  // TODO: Cache schema/columns.
  const repo = this._conn.getRepository(entityName);
  const entityFields = repo.metadata.columns;

  const { Entity } = instanceExports;
  const entityInstance = await Entity.__new();

  const entityValuePromises = entityFields.filter(({ propertyName }) => {
    // Filter out custom fields from entity to fill the entityInstance (wasm).
    return !['isPruned', 'isRemoved', 'blockHash', 'blockNumber'].includes(propertyName);
  }).map(async (column) => {
    let field = column;

    // Fill _blockNumber as blockNumber and _blockHash as blockHash in the entityInstance (wasm).
    // The column metadata object belongs to the repository metadata and is shared by every
    // caller, so it must not be renamed in place. Use an object that inherits everything
    // from the column and only overrides propertyName.
    // NOTE(review): toEntityValue receives the renamed property name (as before this change);
    // confirm which key of `data` it is expected to read for these columns.
    if (['_blockNumber', '_blockHash'].includes(column.propertyName)) {
      field = Object.assign(Object.create(column), { propertyName: column.propertyName.slice(1) });
    }

    let gqlType = entityTypes[field.propertyName];

    // If the mapped type is present in entityTypesMap, it's a relational field
    // Get the type for id field in that case
    if (entityTypesMap.has(gqlType)) {
      const relatedEntityTypes = entityTypesMap.get(gqlType);
      assert(relatedEntityTypes);

      gqlType = relatedEntityTypes.id;
    }

    return toEntityValue(instanceExports, entityInstance, data, field, gqlType);
  });

  await Promise.all(entityValuePromises);

  return entityInstance;
}
|
|
|
|
/**
 * Create a wasm `Entity` instance and set one entry on it per key of `contextData`.
 *
 * Each entry's `data` is parsed with `JSONbigNative.parse` and converted with
 * `formatValue` using the entry's `type` before being set.
 *
 * @param instanceExports Exports of the wasm instance (`Entity`, `__newString`).
 * @param contextData Map of key -> `{ type, data }`; null/undefined is treated as empty.
 * @returns The filled context instance.
 */
async toGraphContext (instanceExports: any, contextData: any): Promise<any> {
  const { Entity, __newString } = instanceExports;
  const contextInstance = await Entity.__new();

  const contextEntries = Object.entries((contextData ?? {}) as Record<string, { type: ValueKind, data: any }>);

  await Promise.all(contextEntries.map(async ([key, { type, data }]) => {
    const keyPtr = await __newString(key);
    const valuePtr = await formatValue(instanceExports, type, JSONbigNative.parse(data));

    return contextInstance.set(keyPtr, valuePtr);
  }));

  return contextInstance;
}
|
|
|
|
/**
 * Read all entries of a wasm context instance into a plain object.
 *
 * @param instanceExports Exports of the wasm instance
 *   (`__getString`, `__getArray`, `ValueTypedMapEntry`).
 * @param contextInstance Instance whose `entries` are read.
 * @returns Map of key -> `{ type, data }` where `data` is stringified with `JSONbigNative`.
 */
async fromGraphContext (instanceExports: any, contextInstance: any): Promise<{ [key: string]: any }> {
  const { __getString, __getArray, ValueTypedMapEntry } = instanceExports;

  const entryPtrs = __getArray(await contextInstance.entries);

  // Resolve key and parsed value for every entry.
  const parsedEntries = await Promise.all(entryPtrs.map(async (entryPtr: any) => {
    const entry = await ValueTypedMapEntry.wrap(entryPtr);
    const keyPtr = await entry.key;
    const valuePtr = await entry.value;

    const key = await __getString(keyPtr);
    const parsedValue = await parseEntityValue(instanceExports, valuePtr);

    return { key, ...parsedValue };
  }));

  const context: { [key: string]: any } = {};

  parsedEntries.forEach((parsedEntry: any) => {
    const { key, type, data } = parsedEntry;
    context[key] = { type, data: JSONbigNative.stringify(data) };
  });

  return context;
}
|
|
|
|
/**
 * Read the values of a wasm entity instance for every column of `entityName`.
 *
 * @param instanceExports Exports of the wasm instance.
 * @param block Block passed through to `getEntityValues`.
 * @param entityName Entity name used to look up the repository columns.
 * @param entityInstance Wasm entity instance to read from.
 * @returns Result of `getEntityValues` for the entity's columns.
 */
async fromGraphEntity (instanceExports: any, block: Block, entityName: string, entityInstance: any): Promise<{ [key: string]: any }> {
  // TODO: Cache schema/columns.
  const { columns } = this._conn.getRepository(entityName).metadata;

  return this.getEntityValues(instanceExports, block, entityInstance, columns);
}
|
|
|
|
/**
 * Collect a value for every field in `entityFields`, keyed by property name.
 *
 * - `isPruned` / `isRemoved` resolve to undefined.
 * - `blockHash` / `blockNumber` come from `block`.
 * - `_blockNumber` / `_blockHash` are read from the entity instance without the leading underscore.
 * - Everything else is read from the entity instance; array values are unwrapped to their
 *   elements' `data`.
 *
 * @param instanceExports Exports of the wasm instance.
 * @param block Source of `blockHash` and `blockNumber`.
 * @param entityInstance Wasm entity instance to read from.
 * @param entityFields Fields to read; each must expose `propertyName`.
 * @returns Map of property name -> value.
 */
async getEntityValues (instanceExports: any, block: Block, entityInstance: any, entityFields: any): Promise<{ [key: string]: any } > {
  const readFieldValue = async ({ propertyName }: any): Promise<any> => {
    switch (propertyName) {
      case 'isPruned':
      case 'isRemoved':
        return undefined;

      // Get blockHash property for db entry from block instance.
      case 'blockHash':
        return block.blockHash;

      // Get blockNumber property for db entry from block instance.
      case 'blockNumber':
        return block.blockNumber;

      // Get blockNumber as _blockNumber and blockHash as _blockHash from the entityInstance (wasm).
      case '_blockNumber':
      case '_blockHash': {
        const renamedValue = await fromEntityValue(instanceExports, entityInstance, propertyName.slice(1));
        return renamedValue.data;
      }

      default: {
        const entityValue = await fromEntityValue(instanceExports, entityInstance, propertyName);

        return (entityValue.type === ValueKind.ARRAY)
          ? entityValue.data.map((el: any) => el.data)
          : entityValue.data;
      }
    }
  };

  const entityValues = await Promise.all(entityFields.map((field: any) => readFieldValue(field)));

  // Pair every field with the value resolved at the same index.
  const valuesByProperty: { [key: string]: any } = {};

  entityFields.forEach((field: any, index: number) => {
    valuesByProperty[field.propertyName] = entityValues[index];
  });

  return valuesByProperty;
}
|
|
|
|
/**
 * Build entity values from a state entity for every column of `entityName`.
 *
 * @param block Block passed through to `getStateEntityValues`.
 * @param entityName Entity name used to look up the repository columns.
 * @param stateEntity State entity to read values from.
 * @param relations Relations passed through to `getStateEntityValues`.
 * @returns Result of `getStateEntityValues`.
 */
fromState (block: BlockProgressInterface, entityName: string, stateEntity: any, relations: { [key: string]: any } = {}): any {
  const { columns } = this._conn.getRepository(entityName).metadata;

  return this.getStateEntityValues(block, stateEntity, columns, relations);
}
|
|
|
|
/**
 * Collect a value for every column in `entityFields` from a state entity.
 *
 * `blockHash` / `blockNumber` come from `block`; `_blockNumber` / `_blockHash` are looked
 * up in the state entity without the leading underscore; every other column is looked up
 * under its own property name.
 *
 * @param block Source of `blockHash` and `blockNumber`.
 * @param stateEntity State entity handed to `fromStateEntityValues`.
 * @param entityFields Columns to resolve.
 * @param relations Relations handed to `fromStateEntityValues`.
 * @returns Map of property name -> value.
 */
getStateEntityValues (block: BlockProgressInterface, stateEntity: any, entityFields: ColumnMetadata[], relations: { [key: string]: any } = {}): { [key: string]: any } {
  const valuesByProperty: { [key: string]: any } = {};

  entityFields.forEach(({ propertyName, transformer }) => {
    let value: any;

    if (propertyName === 'blockHash') {
      // Get blockHash property for db entry from block instance.
      value = block.blockHash;
    } else if (propertyName === 'blockNumber') {
      // Get blockNumber property for db entry from block instance.
      value = block.blockNumber;
    } else {
      // _blockNumber and _blockHash are stored in the state entity as blockNumber and blockHash.
      const isRenamed = propertyName === '_blockNumber' || propertyName === '_blockHash';
      const stateKey = isRenamed ? propertyName.slice(1) : propertyName;

      value = fromStateEntityValues(stateEntity, stateKey, relations, transformer);
    }

    valuesByProperty[propertyName] = value;
  });

  return valuesByProperty;
}
|
|
|
|
/**
 * Same as `cacheUpdatedEntity`, but takes the entity name instead of a repository.
 *
 * @param entityName Name used to look up the repository on the connection.
 * @param entity Entity to cache.
 * @param pruned Passed through to `cacheUpdatedEntity`.
 */
cacheUpdatedEntityByName (entityName: string, entity: any, pruned = false): void {
  this.cacheUpdatedEntity(this._conn.getRepository<ObjectLiteral>(entityName), entity, pruned);
}
|
|
|
|
/**
 * Store a deep copy of `entity` in the entities cache, keyed by table name and entity id.
 *
 * With `pruned` set the copy goes to `latestPrunedEntities`; otherwise it goes to the
 * frothy block cached for `entity.blockHash`, and nothing happens if that block is not cached.
 *
 * @param repo Repository whose table name is used as the cache key.
 * @param entity Entity to cache; `id` (and `blockHash` when not pruned) are read.
 * @param pruned Selects the pruned cache instead of the frothy block cache.
 */
cacheUpdatedEntity<Entity extends ObjectLiteral> (repo: Repository<Entity>, entity: any, pruned = false): void {
  const { tableName } = repo.metadata;

  // Update frothyBlock only if already present in cache.
  // Might not be present when event processing starts without block processing on job retry.
  const tableCaches = pruned
    ? this._cachedEntities.latestPrunedEntities
    : this._cachedEntities.frothyBlocks.get(entity.blockHash)?.entities;

  if (!tableCaches) {
    return;
  }

  const entityIdMap = tableCaches.get(tableName) ?? new Map();

  entityIdMap.set(entity.id, _.cloneDeep(entity));
  tableCaches.set(tableName, entityIdMap);
}
|
|
|
|
/**
 * Fetch block progress entries at a height through the base database.
 *
 * @param height Passed through to the base database's `getBlocksAtHeight`.
 * @param isPruned Passed through to the base database's `getBlocksAtHeight`.
 * @returns Whatever the base database's `getBlocksAtHeight` resolves to.
 */
async getBlocksAtHeight (height: number, isPruned: boolean): Promise<BlockProgressInterface[]> {
  const blockProgressRepo: Repository<BlockProgressInterface> = this._conn.getRepository('block_progress');

  const blocksAtHeight = this._baseDatabase.getBlocksAtHeight(blockProgressRepo, height, isPruned);

  return blocksAtHeight;
}
|
|
|
|
/**
 * Register a block in the frothy blocks cache and periodically clear the pruned entities cache.
 *
 * @param blockProgress Block to register; `blockHash`, `blockNumber` and `parentHash` are read.
 * @param clearEntitiesCacheInterval The pruned entities cache is cleared whenever the block
 *   number is a multiple of this value.
 */
updateEntityCacheFrothyBlocks (blockProgress: BlockProgressInterface, clearEntitiesCacheInterval = DEFAULT_CLEAR_ENTITIES_CACHE_INTERVAL): void {
  const { blockHash, blockNumber, parentHash } = blockProgress;
  const { frothyBlocks } = this.cachedEntities;

  // Set latest block in frothy region to cachedEntities.frothyBlocks map.
  if (!frothyBlocks.has(blockHash)) {
    frothyBlocks.set(blockHash, { blockNumber, parentHash, entities: new Map() });
  }

  log(`Size of cachedEntities.frothyBlocks map: ${frothyBlocks.size}`);
  this._measureCachedPrunedEntities();

  // Check if it is time to clear entities cache.
  const isClearingDue = blockNumber % clearEntitiesCacheInterval === 0;

  if (!isClearingDue) {
    return;
  }

  log(`Clearing cachedEntities.latestPrunedEntities at block ${blockNumber}`);
  // Clearing only pruned region as frothy region cache gets updated in pruning queue.
  this.cachedEntities.latestPrunedEntities.clear();
  log(`Cleared cachedEntities.latestPrunedEntities. Map size: ${this.cachedEntities.latestPrunedEntities.size}`);
}
|
|
|
|
/**
 * Move the cached entities of the canonical block into the pruned entities cache and drop
 * every cached frothy block at or below the canonical block number.
 *
 * @param canonicalBlockHash Hash of the block whose cached entities are promoted (if cached).
 * @param canonicalBlockNumber Frothy blocks with a block number <= this value are removed.
 */
pruneEntityCacheFrothyBlocks (canonicalBlockHash: string, canonicalBlockNumber: number): void {
  const { frothyBlocks, latestPrunedEntities } = this.cachedEntities;

  // Update latestPrunedEntities map with entities from latest canonical block.
  frothyBlocks.get(canonicalBlockHash)?.entities.forEach((blockEntities, entityTableName) => {
    blockEntities.forEach((data, id) => {
      const prunedEntities = latestPrunedEntities.get(entityTableName) ?? new Map();

      prunedEntities.set(id, data);
      latestPrunedEntities.set(entityTableName, prunedEntities);
    });
  });

  // Remove pruned blocks from frothyBlocks.
  // Hashes are collected first so the map is not modified while it is being walked.
  const prunedBlockHashes: string[] = [];

  frothyBlocks.forEach((frothyBlock, blockHash) => {
    if (frothyBlock.blockNumber <= canonicalBlockNumber) {
      prunedBlockHashes.push(blockHash);
    }
  });

  for (const blockHash of prunedBlockHashes) {
    frothyBlocks.delete(blockHash);
  }
}
|
|
|
|
/**
 * Transform raw SQL result rows into entities using TypeORM's
 * `RawSqlResultsToEntityTransformer` and the query builder's main alias.
 *
 * @param queryRunner Query runner providing the driver; also handed to the transformer.
 * @param qb Query builder whose expression map describes the query.
 * @param rawResults Raw rows to transform.
 * @returns The transformer's output.
 * @throws If the query builder has no main alias (assertion).
 */
async transformResults<Entity> (queryRunner: QueryRunner, qb: SelectQueryBuilder<Entity>, rawResults: any[]): Promise<any[]> {
  const { expressionMap } = qb;
  const { driver } = queryRunner.manager.connection;

  const transformer = new RawSqlResultsToEntityTransformer(expressionMap, driver, [], [], queryRunner);

  const { mainAlias } = expressionMap;
  assert(mainAlias);

  return transformer.transform(rawResults, mainAlias);
}
|
|
|
|
/**
 * Run an UPDATE on the entity's table through a query builder.
 *
 * @param queryRunner Query runner whose manager provides the repository.
 * @param entityType Entity class to update.
 * @param criteria Passed to the query builder's `where`.
 * @param update Passed to the query builder's `set`.
 * @returns Result of executing the update query.
 */
async updateEntity<Entity> (queryRunner: QueryRunner, entityType: new () => Entity, criteria: any, update: any): Promise<UpdateResult> {
  const updateQuery = queryRunner.manager
    .getRepository(entityType)
    .createQueryBuilder()
    .update()
    .set(update)
    .where(criteria);

  return updateQuery.execute();
}
|
|
|
|
/**
 * Mark entities of the given (non-canonical) blocks as pruned and fix up the latest entity tables.
 *
 * Steps:
 * 1. Load the frothy entity rows at the blocks' height and group their `id`s by `name`.
 * 2. For every entity type that has ids at this height, set `isPruned` on rows matching
 *    those ids and the given block hashes.
 * 3. Call `updateNonCanonicalLatestEntities` for the same height and block hashes.
 *
 * @param frothyEntityType Entity class of the frothy entity table; rows expose `name` and `id`.
 * @param queryRunner Query runner used for all queries.
 * @param blocks Blocks to prune; must be non-empty and are assumed to be at the same height.
 * @param entityTypes Entity classes to update; matched to frothy rows by class `name`.
 * @throws If `blocks` is empty (assertion).
 */
async pruneEntities (frothyEntityType: new () => any, queryRunner: QueryRunner, blocks: BlockProgressInterface[], entityTypes: Set<new () => any>): Promise<void> {
  // Assumption: all blocks are at same height
  assert(blocks.length);
  const blockNumber = blocks[0].blockNumber;
  const blockHashes = blocks.map(block => block.blockHash);

  // Get all entities at the block height
  const entitiesAtHeight = await this._baseDatabase.getEntities(queryRunner, frothyEntityType, { where: { blockNumber } });

  // Extract entity ids from result
  // Ids are appended in place; copying the accumulated array for every row is quadratic.
  const entityIdsMap: Map<string, string[]> = new Map();

  entitiesAtHeight.forEach(entity => {
    const entityIds = entityIdsMap.get(entity.name);

    if (entityIds) {
      entityIds.push(entity.id);
    } else {
      entityIdsMap.set(entity.name, [entity.id]);
    }
  });

  // Update isPruned flag using fetched entity ids and hashes of blocks to be pruned
  const updatePromises: Promise<UpdateResult>[] = [];

  entityTypes.forEach((entityType) => {
    const entityIds = entityIdsMap.get(entityType.name);

    // No rows of this type at this height: an update restricted to an empty id list
    // cannot match anything, so skip the query altogether.
    if (!entityIds) {
      return;
    }

    updatePromises.push(
      this.updateEntity(
        queryRunner,
        entityType as any,
        { id: In(entityIds), blockHash: In(blockHashes) },
        { isPruned: true }
      )
    );
  });

  // Simultaneously update isPruned flag for all entities
  await Promise.all(updatePromises);

  // Update latest entity tables with canonical entries
  await this.updateNonCanonicalLatestEntities(queryRunner, blockNumber, blockHashes);
}
|
|
|
|
/**
 * For every (entity type, latest entity type) pair, load the latest entity rows that belong
 * to the given non-canonical blocks and canonicalize them at `blockNumber`.
 *
 * @param queryRunner Query runner used for all queries.
 * @param blockNumber Height passed to `canonicalizeLatestEntity`.
 * @param nonCanonicalBlockHashes Block hashes used to select the latest entity rows.
 */
async updateNonCanonicalLatestEntities (queryRunner: QueryRunner, blockNumber: number, nonCanonicalBlockHashes: string[]): Promise<void> {
  const entityTypePairs = Array.from(this._entityToLatestEntityMap.entries());

  // Update latest entity tables with canonical entries
  const canonicalizations = entityTypePairs.map(async ([entityType, latestEntityType]) => {
    // Get entries for non canonical blocks
    const nonCanonicalLatestEntities = await this._baseDatabase.getEntities(queryRunner, latestEntityType, { where: { blockHash: In(nonCanonicalBlockHashes) } });

    // Canonicalize latest entity table at given block height
    await this.canonicalizeLatestEntity(queryRunner, entityType, latestEntityType, nonCanonicalLatestEntities, blockNumber);
  });

  await Promise.all(canonicalizations);
}
|
|
|
|
/**
 * Bring the latest entity table in line with the latest pruned version of each given entity.
 *
 * For every entity, the latest pruned version at `blockNumber` is looked up through the base
 * database. If one exists, the latest entity row with that id is updated from it; otherwise
 * the latest entity row with that id is removed.
 *
 * @param queryRunner Query runner used for all queries.
 * @param entityType Entity type whose pruned versions are looked up.
 * @param latestEntityType Latest entity type whose rows are updated or removed.
 * @param entities Entities to process; only `id` is read.
 * @param blockNumber Height passed to `getLatestPrunedEntity`.
 */
async canonicalizeLatestEntity (queryRunner: QueryRunner, entityType: any, latestEntityType: any, entities: any[], blockNumber: number): Promise<void> {
  const { manager } = queryRunner;
  const repo = manager.getRepository<ObjectLiteral>(entityType);
  const latestEntityRepo = manager.getRepository<ObjectLiteral>(latestEntityType);

  const canonicalizeOne = async ({ id }: any): Promise<void> => {
    // Get latest pruned (canonical) version for the given entity
    const prunedVersion = await this._baseDatabase.getLatestPrunedEntity(repo, id, blockNumber);

    // No canonical version: delete the latestEntity entry for the id
    if (!prunedVersion) {
      await this._baseDatabase.removeEntities(
        queryRunner,
        latestEntityType,
        { where: { id } }
      );

      return;
    }

    // Canonical version found: create a latest entity instance and update the entry for the id
    const latestEntity = getLatestEntityFromEntity(latestEntityRepo, prunedVersion);

    await this.updateEntity(queryRunner, latestEntityType, { id }, latestEntity);
  };

  await Promise.all(entities.map((entity: any) => canonicalizeOne(entity)));
}
|
|
|
|
/**
 * Remove frothy entity entries whose block number is less than or equal to `blockNumber`.
 *
 * @param queryRunner Query runner used for the removal.
 * @param frothyEntityType Entity class of the frothy entity table.
 * @param blockNumber Prune block height (inclusive upper bound).
 */
async pruneFrothyEntities<Entity> (queryRunner: QueryRunner, frothyEntityType: new () => Entity, blockNumber: number): Promise<void> {
  // Remove frothy entity entries at | below the prune block height
  const removal = this._baseDatabase.removeEntities(queryRunner, frothyEntityType, { where: { blockNumber: LessThanOrEqual(blockNumber) } });

  return removal;
}
|
|
|
|
/**
 * For every (entity type, latest entity type) pair, load the latest entity rows above
 * `blockNumber` and canonicalize them at `blockNumber`.
 *
 * @param queryRunner Query runner used for all queries.
 * @param blockNumber Reset block height.
 */
async resetLatestEntities (queryRunner: QueryRunner, blockNumber: number): Promise<void> {
  const entityTypePairs = Array.from(this._entityToLatestEntityMap.entries());

  const resets = entityTypePairs.map(async ([entityType, latestEntityType]) => {
    // Get entries above the reset block
    const entitiesToReset = await this._baseDatabase.getEntities(queryRunner, latestEntityType, { where: { blockNumber: MoreThan(blockNumber) } });

    // Canonicalize latest entity table at the reset block height
    await this.canonicalizeLatestEntity(queryRunner, entityType, latestEntityType, entitiesToReset, blockNumber);
  });

  await Promise.all(resets);
}
|
|
|
|
/**
 * Convert a GQL-style `where` object into the internal `Where` filter structure.
 *
 * Keys are interpreted as follows:
 * - `_change_block`: `{ number_gte }` is stored as a single filter on the block number.
 * - `and` / `or`: an array of where objects, each converted recursively.
 * - `<field>_` (trailing underscore): nested filter on a relation field, converted recursively.
 * - `<field>[_not][_<op>...]`: filter on `field`; the operator defaults to `equals`, `not`
 *   negates it, and a trailing `nocase` selects the `<op>_nocase` operator.
 *
 * @param where Where object to convert; undefined and null are treated as empty.
 * @returns Map of field name (or `and` / `or` / `_change_block`) to its filters.
 * @throws If `_change_block.number_gte` is not a number or an `and` / `or` value is not an
 *   array (assertions).
 */
buildFilter (where: { [key: string]: any } = {}): Where {
  // A where explicitly set to null is not replaced by the default parameter value.
  return Object.entries(where ?? {}).reduce((acc: Where, [fieldWithSuffix, value]) => {
    if (fieldWithSuffix === FILTER_CHANGE_BLOCK) {
      // Check the type only: 0 is a valid block number and must not be rejected as falsy.
      assert(typeof value.number_gte === 'number');

      acc[FILTER_CHANGE_BLOCK] = [{
        value: value.number_gte,
        not: false
      }];

      return acc;
    }

    if (['and', 'or'].includes(fieldWithSuffix)) {
      assert(Array.isArray(value));

      // Parse all the combinations given in the array
      acc[fieldWithSuffix] = value.map(w => {
        return this.buildFilter(w);
      });

      return acc;
    }

    const [field, ...suffix] = fieldWithSuffix.split('_');

    if (!acc[field]) {
      acc[field] = [];
    }

    let op = suffix.shift();

    // If op is "" (different from undefined), it means it's a nested filter on a relation field
    if (op === '') {
      (acc[field] as Filter[]).push({
        // Parse nested filter value
        value: this.buildFilter(value),
        not: false,
        operator: 'nested'
      });

      return acc;
    }

    const filter: Filter = {
      value,
      not: false,
      operator: 'equals'
    };

    // A leading "not" negates the filter; the actual operator (if any) follows it.
    if (op === 'not') {
      filter.not = true;
      op = suffix.shift();
    }

    if (op) {
      filter.operator = op as keyof typeof OPERATOR_MAP;
    }

    // If filter field ends with "nocase", use case insensitive version of the operator
    if (suffix[suffix.length - 1] === 'nocase') {
      filter.operator = `${op}_nocase` as keyof typeof OPERATOR_MAP;
    }

    (acc[field] as Filter[]).push(filter);

    return acc;
  }, {});
}
|
|
|
|
/**
 * Count the entities held in `cachedEntities.latestPrunedEntities` across all tables,
 * log the total and publish it on the `cachePrunedEntitiesCount` metric.
 */
_measureCachedPrunedEntities (): void {
  let totalEntities = 0;

  this.cachedEntities.latestPrunedEntities.forEach((idEntitiesMap) => {
    totalEntities += idEntitiesMap.size;
  });

  log(`Total entities in cachedEntities.latestPrunedEntities map: ${totalEntities}`);
  cachePrunedEntitiesCount.set(totalEntities);
}
|
|
|
|
/**
 * Replace every fragment spread in `selections` with the selections of the fragment it names.
 *
 * Only one level is expanded: selections taken from a fragment are copied as they are.
 *
 * @param selections Selections to expand.
 * @param queryInfo Resolve info whose `fragments` are used to look up fragment definitions.
 * @returns New array with fragment spreads replaced; other selections keep their order.
 */
_defragmentGQLQuerySelections (selections: ReadonlyArray<SelectionNode>, queryInfo: GraphQLResolveInfo): SelectionNode[] {
  const defragmented: SelectionNode[] = [];

  selections.forEach((selection) => {
    if (selection.kind !== 'FragmentSpread') {
      defragmented.push(selection);
      return;
    }

    const fragmentSelections = queryInfo.fragments[selection.name.value].selectionSet.selections;
    defragmented.push(...fragmentSelections);
  });

  return defragmented;
}
|
|
|
|
/**
 * Extract the `where` filter and query options from the arguments of a GQL selection field.
 *
 * Recognized arguments are `where`, `first`, `skip`, `orderBy` and `orderDirection`. Each may
 * be given inline or as a reference to a query variable. `limit` falls back to
 * `DEFAULT_LIMIT` when `first` is missing or falsy.
 *
 * @param fieldNode Field node whose arguments are read.
 * @param queryInfo Resolve info providing the variable values.
 * @returns The built `where` filter and the query options.
 * @throws Error if the field has an argument other than the recognized ones.
 */
_getGQLSelectionFieldArguments (fieldNode: FieldNode, queryInfo: GraphQLResolveInfo): { where: Where, queryOptions: QueryOptions } {
  let where: Where = {};
  const queryOptions: QueryOptions = {};

  // Value of a scalar / enum argument: looked up in the variables if it is a variable
  // reference, otherwise taken from the literal node.
  const resolveArgValue = (valueNode: ValueNode): unknown => {
    return (valueNode.kind === 'Variable')
      ? queryInfo.variableValues[valueNode.name.value]
      : (valueNode as IntValueNode | EnumValueNode).value;
  };

  fieldNode.arguments?.forEach((arg: ArgumentNode) => {
    switch (arg.name.value) {
      case 'where': {
        // The where argument can be an inline object, a variable reference or null;
        // _parseGQLFieldValue resolves all of them (an inline object goes through
        // _buildWhereFromGQLArgValue). A missing or null value means no filter.
        const whereValue = this._parseGQLFieldValue(arg.value, queryInfo);
        where = this.buildFilter(whereValue ?? {});
        break;
      }

      case 'first':
        queryOptions.limit = Number(resolveArgValue(arg.value));
        break;

      case 'skip':
        queryOptions.skip = Number(resolveArgValue(arg.value));
        break;

      case 'orderBy':
        queryOptions.orderBy = String(resolveArgValue(arg.value));
        break;

      case 'orderDirection':
        queryOptions.orderDirection = resolveArgValue(arg.value) as OrderDirection;
        break;

      default:
        throw new Error('Unrecognized query argument');
    }
  });

  queryOptions.limit = queryOptions.limit || DEFAULT_LIMIT;

  return { where, queryOptions };
}
|
|
|
|
/**
 * Convert a GQL object value node into a plain object, parsing every field value with
 * `_parseGQLFieldValue`.
 *
 * @param whereArgValue Object value node to convert.
 * @param queryInfo Resolve info passed through to `_parseGQLFieldValue`.
 * @returns Map of field name -> parsed value.
 */
_buildWhereFromGQLArgValue (whereArgValue: ObjectValueNode, queryInfo: GraphQLResolveInfo): { [key: string]: any } {
  const whereObject: { [key: string]: any } = {};

  whereArgValue.fields.forEach((fieldNode: ObjectFieldNode) => {
    whereObject[fieldNode.name.value] = this._parseGQLFieldValue(fieldNode.value, queryInfo);
  });

  return whereObject;
}
|
|
|
|
/**
 * Convert a GQL value node into a plain JS value.
 *
 * - Variable: looked up in `queryInfo.variableValues`.
 * - Null: `null`.
 * - List: each element parsed recursively.
 * - Object: converted with `_buildWhereFromGQLArgValue`.
 * - Boolean / enum / float / int / string: the node's `value` as is.
 *
 * @param value Value node to convert.
 * @param queryInfo Resolve info providing the variable values.
 * @returns The converted value.
 */
_parseGQLFieldValue (value: ValueNode, queryInfo: GraphQLResolveInfo): any {
  if (value.kind === 'Variable') {
    return queryInfo.variableValues[value.name.value];
  }

  if (value.kind === 'NullValue') {
    return null;
  }

  if (value.kind === 'ListValue') {
    return value.values.map((valueNode) => this._parseGQLFieldValue(valueNode, queryInfo));
  }

  if (value.kind === 'ObjectValue') {
    return this._buildWhereFromGQLArgValue(value, queryInfo);
  }

  if (
    value.kind === 'BooleanValue' ||
    value.kind === 'EnumValue' ||
    value.kind === 'FloatValue' ||
    value.kind === 'IntValue' ||
    value.kind === 'StringValue'
  ) {
    return value.value;
  }

  return undefined;
}
|
|
}
|