Add flag to load relations sequentially or concurrently (#225)

* Add flag to load relations sequentially or concurrently

* Fix database init in graph-node test

* Fix graph-node watcher query method and add changes to codegen
This commit is contained in:
nikugogoi 2022-11-14 14:23:46 +05:30 committed by GitHub
parent 13edff143b
commit 6f8ededd52
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
39 changed files with 178 additions and 148 deletions

View File

@ -41,7 +41,7 @@ export const handler = async (argv: any): Promise<void> => {
{{#if (subgraphPath)}}
const graphDb = new GraphDatabase(db.baseDatabase);
const graphDb = new GraphDatabase(config.server, db.baseDatabase);
await graphDb.init();
const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server);

View File

@ -34,7 +34,7 @@ export const handler = async (argv: any): Promise<void> => {
const db = new Database(config.database);
await db.init();
const graphDb = new GraphDatabase(db.baseDatabase);
const graphDb = new GraphDatabase(config.server, db.baseDatabase);
await graphDb.init();
const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server);

View File

@ -59,7 +59,7 @@ const main = async (): Promise<void> => {
await db.init();
{{#if (subgraphPath)}}
const graphDb = new GraphDatabase(db.baseDatabase);
const graphDb = new GraphDatabase(config.server, db.baseDatabase);
await graphDb.init();
const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server);

View File

@ -72,7 +72,7 @@ export const main = async (): Promise<any> => {
await db.init();
{{#if (subgraphPath)}}
const graphDb = new GraphDatabase(db.baseDatabase);
const graphDb = new GraphDatabase(config.server, db.baseDatabase);
await graphDb.init();
const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server);

View File

@ -50,7 +50,7 @@ export const main = async (): Promise<any> => {
await db.init();
{{#if (subgraphPath)}}
const graphDb = new GraphDatabase(db.baseDatabase);
const graphDb = new GraphDatabase(config.server, db.baseDatabase);
await graphDb.init();
const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server);

View File

@ -44,7 +44,7 @@ const main = async (): Promise<void> => {
await db.init();
{{#if (subgraphPath)}}
const graphDb = new GraphDatabase(db.baseDatabase);
const graphDb = new GraphDatabase(config.server, db.baseDatabase);
await graphDb.init();
const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server);

View File

@ -45,7 +45,7 @@ const main = async (): Promise<void> => {
await db.init();
{{#if (subgraphPath)}}
const graphDb = new GraphDatabase(db.baseDatabase);
const graphDb = new GraphDatabase(config.server, db.baseDatabase);
await graphDb.init();
const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server);

View File

@ -97,7 +97,7 @@ export const main = async (): Promise<any> => {
await db.init();
{{#if (subgraphPath)}}
const graphDb = new GraphDatabase(db.baseDatabase);
const graphDb = new GraphDatabase(config.server, db.baseDatabase);
await graphDb.init();
const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server);

View File

@ -41,7 +41,7 @@ export const handler = async (argv: any): Promise<void> => {
await db.init();
{{#if (subgraphPath)}}
const graphDb = new GraphDatabase(db.baseDatabase);
const graphDb = new GraphDatabase(config.server, db.baseDatabase);
await graphDb.init();
const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server);

View File

@ -46,7 +46,7 @@ export const main = async (): Promise<any> => {
await db.init();
{{#if (subgraphPath)}}
const graphDb = new GraphDatabase(db.baseDatabase);
const graphDb = new GraphDatabase(config.server, db.baseDatabase);
await graphDb.init();
const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server);

View File

@ -61,7 +61,7 @@ const main = async (): Promise<void> => {
await db.init();
{{#if (subgraphPath)}}
const graphDb = new GraphDatabase(db.baseDatabase);
const graphDb = new GraphDatabase(config.server, db.baseDatabase);
await graphDb.init();
const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server);

View File

@ -37,7 +37,7 @@ export const handler = async (argv: any): Promise<void> => {
const db = new Database(config.database);
await db.init();
const graphDb = new GraphDatabase(db.baseDatabase);
const graphDb = new GraphDatabase(config.server, db.baseDatabase);
await graphDb.init();
const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server);

View File

@ -33,7 +33,7 @@ export const handler = async (argv: any): Promise<void> => {
const db = new Database(config.database);
await db.init();
const graphDb = new GraphDatabase(db.baseDatabase);
const graphDb = new GraphDatabase(config.server, db.baseDatabase);
await graphDb.init();
const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server);

View File

@ -47,7 +47,7 @@ const main = async (): Promise<void> => {
const db = new Database(config.database);
await db.init();
const graphDb = new GraphDatabase(db.baseDatabase);
const graphDb = new GraphDatabase(config.server, db.baseDatabase);
await graphDb.init();
const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server);

View File

@ -47,7 +47,7 @@ export const main = async (): Promise<any> => {
const db = new Database(config.database);
await db.init();
const graphDb = new GraphDatabase(db.baseDatabase);
const graphDb = new GraphDatabase(config.server, db.baseDatabase);
await graphDb.init();
const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server);

View File

@ -41,7 +41,7 @@ const main = async (): Promise<void> => {
const db = new Database(config.database);
await db.init();
const graphDb = new GraphDatabase(db.baseDatabase);
const graphDb = new GraphDatabase(config.server, db.baseDatabase);
await graphDb.init();
const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server);

View File

@ -42,7 +42,7 @@ const main = async (): Promise<void> => {
const db = new Database(config.database);
await db.init();
const graphDb = new GraphDatabase(db.baseDatabase);
const graphDb = new GraphDatabase(config.server, db.baseDatabase);
await graphDb.init();
const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server);

View File

@ -32,7 +32,7 @@ export const handler = async (argv: any): Promise<void> => {
const db = new Database(config.database);
await db.init();
const graphDb = new GraphDatabase(db.baseDatabase);
const graphDb = new GraphDatabase(config.server, db.baseDatabase);
await graphDb.init();
const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server);

View File

@ -58,7 +58,7 @@ const main = async (): Promise<void> => {
const db = new Database(config.database);
await db.init();
const graphDb = new GraphDatabase(db.baseDatabase);
const graphDb = new GraphDatabase(config.server, db.baseDatabase);
await graphDb.init();
const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server);

View File

@ -65,7 +65,7 @@ export const main = async (): Promise<any> => {
const db = new Database(config.database);
await db.init();
const graphDb = new GraphDatabase(db.baseDatabase);
const graphDb = new GraphDatabase(config.server, db.baseDatabase);
await graphDb.init();
const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server);

View File

@ -94,7 +94,7 @@ export const main = async (): Promise<any> => {
const db = new Database(config.database);
await db.init();
const graphDb = new GraphDatabase(db.baseDatabase);
const graphDb = new GraphDatabase(config.server, db.baseDatabase);
await graphDb.init();
const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server);

View File

@ -43,7 +43,7 @@ export const main = async (): Promise<any> => {
const db = new Database(config.database);
await db.init();
const graphDb = new GraphDatabase(db.baseDatabase);
const graphDb = new GraphDatabase(config.server, db.baseDatabase);
await graphDb.init();
const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server);

View File

@ -127,7 +127,7 @@ export const main = async (): Promise<void> => {
const baseDatabase = new BaseDatabase({ ...watcherConfig.database, entities: [entitiesDir] });
await baseDatabase.init();
db = new Database(baseDatabase);
db = new Database(watcherConfig.server, baseDatabase);
await db.init();
if (config.watcher.verifyState) {

View File

@ -28,6 +28,7 @@ import {
eventProcessingLoadEntityCount,
eventProcessingLoadEntityDBQueryDuration,
QueryOptions,
ServerConfig,
Where
} from '@cerc-io/util';
@ -61,6 +62,7 @@ interface CachedEntities {
}
export class Database {
_serverConfig: ServerConfig
_conn!: Connection
_baseDatabase: BaseDatabase
_entityQueryTypeMap: Map<new() => any, ENTITY_QUERY_TYPE>
@ -70,7 +72,8 @@ export class Database {
latestPrunedEntities: new Map()
}
constructor (baseDatabase: BaseDatabase, entityQueryTypeMap: Map<new() => any, ENTITY_QUERY_TYPE> = new Map()) {
constructor (serverConfig: ServerConfig, baseDatabase: BaseDatabase, entityQueryTypeMap: Map<new() => any, ENTITY_QUERY_TYPE> = new Map()) {
this._serverConfig = serverConfig;
this._baseDatabase = baseDatabase;
this._entityQueryTypeMap = entityQueryTypeMap;
}
@ -815,118 +818,41 @@ export class Database {
const relationSelections = selections.filter((selection) => selection.kind === 'Field' && Boolean(relations[selection.name.value]));
for (const selection of relationSelections) {
assert(selection.kind === 'Field');
const field = selection.name.value;
const { entity: relationEntity, isArray, isDerived, field: foreignKey } = relations[field];
let childSelections = selection.selectionSet?.selections || [];
// Filter out __typename field in GQL for loading relations.
childSelections = childSelections.filter(selection => !(selection.kind === 'Field' && selection.name.value === '__typename'));
if (isDerived) {
const where: Where = {
[foreignKey]: [{
value: entities.map((entity: any) => entity.id),
not: false,
operator: 'in'
}]
};
const relatedEntities = await this.getEntities(
queryRunner,
relationEntity,
relationsMap,
block,
where,
{},
childSelections
);
const relatedEntitiesMap = relatedEntities.reduce((acc: {[key:string]: any[]}, entity: any) => {
// Related entity might be loaded with data.
const parentEntityId = entity[foreignKey].id ?? entity[foreignKey];
if (!acc[parentEntityId]) {
acc[parentEntityId] = [];
}
if (acc[parentEntityId].length < DEFAULT_LIMIT) {
acc[parentEntityId].push(entity);
}
return acc;
}, {});
entities.forEach((entity: any) => {
if (relatedEntitiesMap[entity.id]) {
entity[field] = relatedEntitiesMap[entity.id];
} else {
entity[field] = [];
}
});
continue;
if (this._serverConfig.loadRelationsSequential) {
for (const selection of relationSelections) {
await this.loadRelation(queryRunner, block, relationsMap, relations, entities, selection);
}
} else {
const loadRelationPromises = relationSelections.map(async selection => {
await this.loadRelation(queryRunner, block, relationsMap, relations, entities, selection);
});
if (isArray) {
const relatedIds = entities.reduce((acc: Set<string>, entity: any) => {
entity[field].forEach((relatedEntityId: string) => acc.add(relatedEntityId));
await Promise.all(loadRelationPromises);
}
return acc;
}, new Set());
return entities;
}
const where: Where = {
id: [{
value: Array.from(relatedIds),
not: false,
operator: 'in'
}]
};
async loadRelation<Entity> (
queryRunner: QueryRunner,
block: BlockHeight,
relationsMap: Map<any, { [key: string]: any }>,
relations: { [key: string]: any },
entities: Entity[],
selection: SelectionNode
): Promise<void> {
assert(selection.kind === 'Field');
const field = selection.name.value;
const { entity: relationEntity, isArray, isDerived, field: foreignKey } = relations[field];
let childSelections = selection.selectionSet?.selections || [];
const relatedEntities = await this.getEntities(
queryRunner,
relationEntity,
relationsMap,
block,
where,
{},
childSelections
);
entities.forEach((entity: any) => {
const relatedEntityIds: Set<string> = entity[field].reduce((acc: Set<string>, id: string) => {
acc.add(id);
return acc;
}, new Set());
entity[field] = [];
relatedEntities.forEach((relatedEntity: any) => {
if (relatedEntityIds.has(relatedEntity.id) && entity[field].length < DEFAULT_LIMIT) {
entity[field].push(relatedEntity);
}
});
});
continue;
}
// field is neither an array nor derivedFrom
// Avoid loading relation if selections only has id field.
if (childSelections.length === 1 && childSelections[0].kind === 'Field' && childSelections[0].name.value === 'id') {
entities.forEach((entity: any) => {
entity[field] = { id: entity[field] };
});
continue;
}
// Filter out __typename field in GQL for loading relations.
childSelections = childSelections.filter(selection => !(selection.kind === 'Field' && selection.name.value === '__typename'));
if (isDerived) {
const where: Where = {
id: [{
value: entities.map((entity: any) => entity[field]),
[foreignKey]: [{
value: entities.map((entity: any) => entity.id),
not: false,
operator: 'in'
}]
@ -942,20 +868,116 @@ export class Database {
childSelections
);
const relatedEntitiesMap = relatedEntities.reduce((acc: {[key:string]: any}, entity: any) => {
acc[entity.id] = entity;
const relatedEntitiesMap = relatedEntities.reduce((acc: {[key:string]: any[]}, entity: any) => {
// Related entity might be loaded with data.
const parentEntityId = entity[foreignKey].id ?? entity[foreignKey];
if (!acc[parentEntityId]) {
acc[parentEntityId] = [];
}
if (acc[parentEntityId].length < DEFAULT_LIMIT) {
acc[parentEntityId].push(entity);
}
return acc;
}, {});
entities.forEach((entity: any) => {
if (relatedEntitiesMap[entity[field]]) {
entity[field] = relatedEntitiesMap[entity[field]];
if (relatedEntitiesMap[entity.id]) {
entity[field] = relatedEntitiesMap[entity.id];
} else {
entity[field] = [];
}
});
return;
}
return entities;
if (isArray) {
const relatedIds = entities.reduce((acc: Set<string>, entity: any) => {
entity[field].forEach((relatedEntityId: string) => acc.add(relatedEntityId));
return acc;
}, new Set());
const where: Where = {
id: [{
value: Array.from(relatedIds),
not: false,
operator: 'in'
}]
};
const relatedEntities = await this.getEntities(
queryRunner,
relationEntity,
relationsMap,
block,
where,
{},
childSelections
);
entities.forEach((entity: any) => {
const relatedEntityIds: Set<string> = entity[field].reduce((acc: Set<string>, id: string) => {
acc.add(id);
return acc;
}, new Set());
entity[field] = [];
relatedEntities.forEach((relatedEntity: any) => {
if (relatedEntityIds.has(relatedEntity.id) && entity[field].length < DEFAULT_LIMIT) {
entity[field].push(relatedEntity);
}
});
});
return;
}
// field is neither an array nor derivedFrom
// Avoid loading relation if selections only has id field.
if (childSelections.length === 1 && childSelections[0].kind === 'Field' && childSelections[0].name.value === 'id') {
entities.forEach((entity: any) => {
entity[field] = { id: entity[field] };
});
return;
}
const where: Where = {
id: [{
value: entities.map((entity: any) => entity[field]),
not: false,
operator: 'in'
}]
};
const relatedEntities = await this.getEntities(
queryRunner,
relationEntity,
relationsMap,
block,
where,
{},
childSelections
);
const relatedEntitiesMap = relatedEntities.reduce((acc: {[key:string]: any}, entity: any) => {
acc[entity.id] = entity;
return acc;
}, {});
entities.forEach((entity: any) => {
if (relatedEntitiesMap[entity[field]]) {
entity[field] = relatedEntitiesMap[entity[field]];
}
});
}
async saveEntity (entity: string, data: any): Promise<any> {

View File

@ -338,6 +338,8 @@ export class GraphWatcher {
// Get entities from the database.
const entities = await this._database.getEntities(dbTx, entity, relationsMap, block, where, queryOptions, selections);
await dbTx.commitTransaction();
return entities;
} catch (error) {
await dbTx.rollbackTransaction();
throw error;

View File

@ -9,7 +9,7 @@ import { StorageLayout } from '@cerc-io/solidity-mapper';
import { EventData } from '../../src/utils';
import { Database } from '../../src/database';
import { Indexer } from './indexer';
import { Indexer, ServerConfig } from './indexer';
const NETWORK_URL = 'http://127.0.0.1:8081';
const IPLD_ETH_SERVER_GQL_URL = 'http://127.0.0.1:8082/graphql';
@ -72,7 +72,8 @@ export const getDummyGraphData = (): any => {
export const getTestDatabase = (): Database => {
const baseDatabase = new BaseDatabase({ type: 'postgres' });
return new Database(baseDatabase);
const serverConfig = new ServerConfig();
return new Database(serverConfig, baseDatabase);
};
export const getTestIndexer = (storageLayout?: Map<string, StorageLayout>): Indexer => {

View File

@ -203,7 +203,7 @@ export class Indexer implements IndexerInterface {
}
}
class ServerConfig implements ServerConfigInterface {
export class ServerConfig implements ServerConfigInterface {
host: string;
port: number;
mode: string;
@ -217,6 +217,7 @@ class ServerConfig implements ServerConfigInterface {
maxEventsBlockRange: number;
clearEntitiesCacheInterval: number;
skipStateFieldsUpdate: boolean;
loadRelationsSequential: boolean;
constructor () {
this.host = '';
@ -232,5 +233,6 @@ class ServerConfig implements ServerConfigInterface {
this.maxEventsBlockRange = 0;
this.clearEntitiesCacheInterval = 0;
this.skipStateFieldsUpdate = false;
this.loadRelationsSequential = false;
}
}

View File

@ -37,7 +37,7 @@ export const handler = async (argv: any): Promise<void> => {
const db = new Database(config.database);
await db.init();
const graphDb = new GraphDatabase(db.baseDatabase);
const graphDb = new GraphDatabase(config.server, db.baseDatabase);
await graphDb.init();
const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server);

View File

@ -33,7 +33,7 @@ export const handler = async (argv: any): Promise<void> => {
const db = new Database(config.database);
await db.init();
const graphDb = new GraphDatabase(db.baseDatabase);
const graphDb = new GraphDatabase(config.server, db.baseDatabase);
await graphDb.init();
const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server);

View File

@ -47,7 +47,7 @@ const main = async (): Promise<void> => {
const db = new Database(config.database);
await db.init();
const graphDb = new GraphDatabase(db.baseDatabase);
const graphDb = new GraphDatabase(config.server, db.baseDatabase);
await graphDb.init();
const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server);

View File

@ -47,7 +47,7 @@ export const main = async (): Promise<any> => {
const db = new Database(config.database);
await db.init();
const graphDb = new GraphDatabase(db.baseDatabase);
const graphDb = new GraphDatabase(config.server, db.baseDatabase);
await graphDb.init();
const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server);

View File

@ -41,7 +41,7 @@ const main = async (): Promise<void> => {
const db = new Database(config.database);
await db.init();
const graphDb = new GraphDatabase(db.baseDatabase);
const graphDb = new GraphDatabase(config.server, db.baseDatabase);
await graphDb.init();
const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server);

View File

@ -42,7 +42,7 @@ const main = async (): Promise<void> => {
const db = new Database(config.database);
await db.init();
const graphDb = new GraphDatabase(db.baseDatabase);
const graphDb = new GraphDatabase(config.server, db.baseDatabase);
await graphDb.init();
const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server);

View File

@ -32,7 +32,7 @@ export const handler = async (argv: any): Promise<void> => {
const db = new Database(config.database);
await db.init();
const graphDb = new GraphDatabase(db.baseDatabase);
const graphDb = new GraphDatabase(config.server, db.baseDatabase);
await graphDb.init();
const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server);

View File

@ -58,7 +58,7 @@ const main = async (): Promise<void> => {
const db = new Database(config.database);
await db.init();
const graphDb = new GraphDatabase(db.baseDatabase);
const graphDb = new GraphDatabase(config.server, db.baseDatabase);
await graphDb.init();
const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server);

View File

@ -59,7 +59,7 @@ export const main = async (): Promise<any> => {
const db = new Database(config.database);
await db.init();
const graphDb = new GraphDatabase(db.baseDatabase);
const graphDb = new GraphDatabase(config.server, db.baseDatabase);
await graphDb.init();
const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server);

View File

@ -94,7 +94,7 @@ export const main = async (): Promise<any> => {
const db = new Database(config.database);
await db.init();
const graphDb = new GraphDatabase(db.baseDatabase);
const graphDb = new GraphDatabase(config.server, db.baseDatabase);
await graphDb.init();
const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server);

View File

@ -43,7 +43,7 @@ export const main = async (): Promise<any> => {
const db = new Database(config.database);
await db.init();
const graphDb = new GraphDatabase(db.baseDatabase);
const graphDb = new GraphDatabase(config.server, db.baseDatabase);
await graphDb.init();
const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server);

View File

@ -51,6 +51,9 @@ export interface ServerConfig {
// Max GQL API requests in queue until reject (defaults to -1, means do not reject).
maxRequestQueueLimit?: number;
// Boolean to load GQL query nested entity relations sequentially.
loadRelationsSequential: boolean;
}
export interface UpstreamConfig {