mirror of
https://github.com/cerc-io/watcher-ts
synced 2025-01-21 10:39:06 +00:00
Refactor code in graph-node to use in uniswap watcher (#205)
* Refactor code in graph-node to use in uniswap * Move over database methods to fetch entities for GQL query
This commit is contained in:
parent
5af90bd388
commit
668875b3a0
13
packages/graph-node/.npmignore
Normal file
13
packages/graph-node/.npmignore
Normal file
@ -0,0 +1,13 @@
|
||||
assembly
|
||||
environments
|
||||
/src/
|
||||
/test/
|
||||
hardhat.config.ts
|
||||
index.ts
|
||||
tsconfig.json
|
||||
.eslintrc.json
|
||||
.eslintignore
|
||||
.env
|
||||
.env.example
|
||||
.mocharc.yml
|
||||
ipld-demo.md
|
@ -3,7 +3,6 @@
|
||||
"version": "0.2.13",
|
||||
"main": "dist/index.js",
|
||||
"license": "AGPL-3.0",
|
||||
"private": true,
|
||||
"devDependencies": {
|
||||
"@graphprotocol/graph-ts": "^0.22.0",
|
||||
"@nomiclabs/hardhat-ethers": "^2.0.2",
|
||||
|
@ -10,9 +10,11 @@ import {
|
||||
FindOneOptions,
|
||||
LessThanOrEqual,
|
||||
QueryRunner,
|
||||
Repository
|
||||
Repository,
|
||||
SelectQueryBuilder
|
||||
} from 'typeorm';
|
||||
import { ColumnMetadata } from 'typeorm/metadata/ColumnMetadata';
|
||||
import { RawSqlResultsToEntityTransformer } from 'typeorm/query-builder/transformer/RawSqlResultsToEntityTransformer';
|
||||
import { SelectionNode } from 'graphql';
|
||||
import _ from 'lodash';
|
||||
import debug from 'debug';
|
||||
@ -20,16 +22,27 @@ import debug from 'debug';
|
||||
import {
|
||||
BlockHeight,
|
||||
BlockProgressInterface,
|
||||
cachePrunedEntitiesCount,
|
||||
Database as BaseDatabase,
|
||||
eventProcessingLoadEntityCacheHitCount,
|
||||
eventProcessingLoadEntityCount,
|
||||
eventProcessingLoadEntityDBQueryDuration,
|
||||
QueryOptions,
|
||||
Where
|
||||
} from '@cerc-io/util';
|
||||
|
||||
import { Block, fromEntityValue, fromStateEntityValues, toEntityValue } from './utils';
|
||||
import { Block, fromEntityValue, fromStateEntityValues, resolveEntityFieldConflicts, toEntityValue } from './utils';
|
||||
|
||||
const log = debug('vulcanize:graph-database');
|
||||
|
||||
export const DEFAULT_LIMIT = 100;
|
||||
const DEFAULT_CLEAR_ENTITIES_CACHE_INTERVAL = 1000;
|
||||
|
||||
export enum ENTITY_QUERY_TYPE {
|
||||
SINGULAR,
|
||||
DISTINCT_ON,
|
||||
GROUP_BY
|
||||
}
|
||||
|
||||
interface CachedEntities {
|
||||
frothyBlocks: Map<
|
||||
@ -47,13 +60,14 @@ export class Database {
|
||||
_config: ConnectionOptions
|
||||
_conn!: Connection
|
||||
_baseDatabase: BaseDatabase
|
||||
_entityQueryTypeMap: Map<new() => any, ENTITY_QUERY_TYPE>
|
||||
|
||||
_cachedEntities: CachedEntities = {
|
||||
frothyBlocks: new Map(),
|
||||
latestPrunedEntities: new Map()
|
||||
}
|
||||
|
||||
constructor (config: ConnectionOptions, entitiesPath: string) {
|
||||
constructor (config: ConnectionOptions, entitiesPath: string, entityQueryTypeMap: Map<new() => any, ENTITY_QUERY_TYPE> = new Map()) {
|
||||
assert(config);
|
||||
|
||||
this._config = {
|
||||
@ -63,6 +77,7 @@ export class Database {
|
||||
};
|
||||
|
||||
this._baseDatabase = new BaseDatabase(this._config);
|
||||
this._entityQueryTypeMap = entityQueryTypeMap;
|
||||
}
|
||||
|
||||
get cachedEntities () {
|
||||
@ -81,6 +96,77 @@ export class Database {
|
||||
return this._baseDatabase.createTransactionRunner();
|
||||
}
|
||||
|
||||
async getModelEntity<Entity> (repo: Repository<Entity>, whereOptions: any): Promise<Entity | undefined> {
|
||||
eventProcessingLoadEntityCount.inc();
|
||||
|
||||
const findOptions = {
|
||||
where: whereOptions,
|
||||
order: {
|
||||
blockNumber: 'DESC'
|
||||
}
|
||||
};
|
||||
|
||||
if (findOptions.where.blockHash) {
|
||||
// Check cache only if latestPrunedEntities is updated.
|
||||
// latestPrunedEntities is updated when frothyBlocks is filled till canonical block height.
|
||||
if (this._cachedEntities.latestPrunedEntities.size > 0) {
|
||||
let frothyBlock = this._cachedEntities.frothyBlocks.get(findOptions.where.blockHash);
|
||||
let canonicalBlockNumber = -1;
|
||||
|
||||
// Loop through frothy region until latest entity is found.
|
||||
while (frothyBlock) {
|
||||
const entity = frothyBlock.entities
|
||||
.get(repo.metadata.tableName)
|
||||
?.get(findOptions.where.id);
|
||||
|
||||
if (entity) {
|
||||
eventProcessingLoadEntityCacheHitCount.inc();
|
||||
return _.cloneDeep(entity) as Entity;
|
||||
}
|
||||
|
||||
canonicalBlockNumber = frothyBlock.blockNumber + 1;
|
||||
frothyBlock = this._cachedEntities.frothyBlocks.get(frothyBlock.parentHash);
|
||||
}
|
||||
|
||||
// Canonical block number is not assigned if blockHash does not exist in frothy region.
|
||||
// Get latest pruned entity from cache only if blockHash exists in frothy region.
|
||||
// i.e. Latest entity in cache is the version before frothy region.
|
||||
if (canonicalBlockNumber > -1) {
|
||||
// If entity not found in frothy region get latest entity in the pruned region.
|
||||
// Check if latest entity is cached in pruned region.
|
||||
const entity = this._cachedEntities.latestPrunedEntities
|
||||
.get(repo.metadata.tableName)
|
||||
?.get(findOptions.where.id);
|
||||
|
||||
if (entity) {
|
||||
eventProcessingLoadEntityCacheHitCount.inc();
|
||||
return _.cloneDeep(entity) as Entity;
|
||||
}
|
||||
|
||||
// Get latest pruned entity from DB if not found in cache.
|
||||
const endTimer = eventProcessingLoadEntityDBQueryDuration.startTimer();
|
||||
const dbEntity = await this._baseDatabase.getLatestPrunedEntity(repo, findOptions.where.id, canonicalBlockNumber);
|
||||
endTimer();
|
||||
|
||||
if (dbEntity) {
|
||||
// Update latest pruned entity in cache.
|
||||
this.cacheUpdatedEntity(repo, dbEntity, true);
|
||||
}
|
||||
|
||||
return dbEntity;
|
||||
}
|
||||
}
|
||||
|
||||
const endTimer = eventProcessingLoadEntityDBQueryDuration.startTimer();
|
||||
const dbEntity = await this._baseDatabase.getPrevEntityVersion(repo.queryRunner!, repo, findOptions);
|
||||
endTimer();
|
||||
|
||||
return dbEntity;
|
||||
}
|
||||
|
||||
return repo.findOne(findOptions);
|
||||
}
|
||||
|
||||
async getEntity<Entity> (entityName: string, id: string, blockHash?: string): Promise<Entity | undefined> {
|
||||
const queryRunner = this._conn.createQueryRunner();
|
||||
|
||||
@ -93,74 +179,11 @@ export class Database {
|
||||
whereOptions.blockHash = blockHash;
|
||||
}
|
||||
|
||||
const findOptions = {
|
||||
where: whereOptions,
|
||||
order: {
|
||||
blockNumber: 'DESC'
|
||||
}
|
||||
};
|
||||
|
||||
if (findOptions.where.blockHash) {
|
||||
// Check cache only if latestPrunedEntities is updated.
|
||||
// latestPrunedEntities is updated when frothyBlocks is filled till canonical block height.
|
||||
if (this._cachedEntities.latestPrunedEntities.size > 0) {
|
||||
let frothyBlock = this._cachedEntities.frothyBlocks.get(findOptions.where.blockHash);
|
||||
let canonicalBlockNumber = -1;
|
||||
|
||||
// Loop through frothy region until latest entity is found.
|
||||
while (frothyBlock) {
|
||||
const entity = frothyBlock.entities
|
||||
.get(repo.metadata.tableName)
|
||||
?.get(findOptions.where.id);
|
||||
|
||||
if (entity) {
|
||||
eventProcessingLoadEntityCacheHitCount.inc();
|
||||
return _.cloneDeep(entity) as Entity;
|
||||
}
|
||||
|
||||
canonicalBlockNumber = frothyBlock.blockNumber + 1;
|
||||
frothyBlock = this._cachedEntities.frothyBlocks.get(frothyBlock.parentHash);
|
||||
}
|
||||
|
||||
// Canonical block number is not assigned if blockHash does not exist in frothy region.
|
||||
// Get latest pruned entity from cache only if blockHash exists in frothy region.
|
||||
// i.e. Latest entity in cache is the version before frothy region.
|
||||
if (canonicalBlockNumber > -1) {
|
||||
// If entity not found in frothy region get latest entity in the pruned region.
|
||||
// Check if latest entity is cached in pruned region.
|
||||
const entity = this._cachedEntities.latestPrunedEntities
|
||||
.get(repo.metadata.tableName)
|
||||
?.get(findOptions.where.id);
|
||||
|
||||
if (entity) {
|
||||
eventProcessingLoadEntityCacheHitCount.inc();
|
||||
return _.cloneDeep(entity) as Entity;
|
||||
}
|
||||
|
||||
// Get latest pruned entity from DB if not found in cache.
|
||||
const endTimer = eventProcessingLoadEntityDBQueryDuration.startTimer();
|
||||
const dbEntity = await this._baseDatabase.getLatestPrunedEntity(repo, findOptions.where.id, canonicalBlockNumber);
|
||||
endTimer();
|
||||
|
||||
if (dbEntity) {
|
||||
// Update latest pruned entity in cache.
|
||||
this.cacheUpdatedEntity(entityName, dbEntity, true);
|
||||
}
|
||||
|
||||
return dbEntity;
|
||||
}
|
||||
}
|
||||
|
||||
const endTimer = eventProcessingLoadEntityDBQueryDuration.startTimer();
|
||||
const dbEntity = await this._baseDatabase.getPrevEntityVersion(repo.queryRunner!, repo, findOptions);
|
||||
endTimer();
|
||||
|
||||
return dbEntity;
|
||||
}
|
||||
|
||||
return repo.findOne(findOptions as FindOneOptions<any>);
|
||||
const entity = await this.getModelEntity(repo, whereOptions);
|
||||
return entity;
|
||||
} catch (error) {
|
||||
console.log(error);
|
||||
log(error);
|
||||
throw error;
|
||||
} finally {
|
||||
await queryRunner.release();
|
||||
}
|
||||
@ -342,6 +365,42 @@ export class Database {
|
||||
where: Where = {},
|
||||
queryOptions: QueryOptions = {},
|
||||
selections: ReadonlyArray<SelectionNode> = []
|
||||
): Promise<Entity[]> {
|
||||
let entities: Entity[];
|
||||
|
||||
// Use different suitable query patterns based on entities.
|
||||
switch (this._entityQueryTypeMap.get(entity)) {
|
||||
case ENTITY_QUERY_TYPE.SINGULAR:
|
||||
entities = await this.getEntitiesSingular(queryRunner, entity, block, where);
|
||||
break;
|
||||
|
||||
case ENTITY_QUERY_TYPE.DISTINCT_ON:
|
||||
entities = await this.getEntitiesDistinctOn(queryRunner, entity, block, where, queryOptions);
|
||||
break;
|
||||
|
||||
default:
|
||||
// Use group by query if entity query type is not specified in map.
|
||||
entities = await this.getEntitiesGroupBy(queryRunner, entity, block, where, queryOptions);
|
||||
break;
|
||||
}
|
||||
|
||||
if (!entities.length) {
|
||||
return [];
|
||||
}
|
||||
|
||||
entities = await this.loadEntitiesRelations(queryRunner, block, relationsMap, entity, entities, selections);
|
||||
// Resolve any field name conflicts in the entity result.
|
||||
entities = entities.map(entity => resolveEntityFieldConflicts(entity));
|
||||
|
||||
return entities;
|
||||
}
|
||||
|
||||
async getEntitiesGroupBy<Entity> (
|
||||
queryRunner: QueryRunner,
|
||||
entity: new () => Entity,
|
||||
block: BlockHeight,
|
||||
where: Where = {},
|
||||
queryOptions: QueryOptions = {}
|
||||
): Promise<Entity[]> {
|
||||
const repo = queryRunner.manager.getRepository(entity);
|
||||
const { tableName } = repo.metadata;
|
||||
@ -394,11 +453,105 @@ export class Database {
|
||||
|
||||
const entities = await selectQueryBuilder.getMany();
|
||||
|
||||
if (!entities.length) {
|
||||
return [];
|
||||
return entities;
|
||||
}
|
||||
|
||||
async getEntitiesDistinctOn<Entity> (
|
||||
queryRunner: QueryRunner,
|
||||
entity: new () => Entity,
|
||||
block: BlockHeight,
|
||||
where: Where = {},
|
||||
queryOptions: QueryOptions = {}
|
||||
): Promise<Entity[]> {
|
||||
const repo = queryRunner.manager.getRepository(entity);
|
||||
|
||||
let subQuery = repo.createQueryBuilder('subTable')
|
||||
.distinctOn(['subTable.id'])
|
||||
.addFrom('block_progress', 'blockProgress')
|
||||
.where('subTable.block_hash = blockProgress.block_hash')
|
||||
.andWhere('blockProgress.is_pruned = :isPruned', { isPruned: false })
|
||||
.addOrderBy('subTable.id', 'ASC')
|
||||
.addOrderBy('subTable.block_number', 'DESC');
|
||||
|
||||
if (block.hash) {
|
||||
const { canonicalBlockNumber, blockHashes } = await this._baseDatabase.getFrothyRegion(queryRunner, block.hash);
|
||||
|
||||
subQuery = subQuery
|
||||
.andWhere(new Brackets(qb => {
|
||||
qb.where('subTable.block_hash IN (:...blockHashes)', { blockHashes })
|
||||
.orWhere('subTable.block_number <= :canonicalBlockNumber', { canonicalBlockNumber });
|
||||
}));
|
||||
}
|
||||
|
||||
return this.loadEntitiesRelations(queryRunner, block, relationsMap, entity, entities, selections);
|
||||
if (block.number) {
|
||||
subQuery = subQuery.andWhere('subTable.block_number <= :blockNumber', { blockNumber: block.number });
|
||||
}
|
||||
|
||||
subQuery = this._baseDatabase.buildQuery(repo, subQuery, where);
|
||||
|
||||
let selectQueryBuilder = queryRunner.manager.createQueryBuilder()
|
||||
.from(
|
||||
`(${subQuery.getQuery()})`,
|
||||
'latestEntities'
|
||||
)
|
||||
.setParameters(subQuery.getParameters());
|
||||
|
||||
if (queryOptions.orderBy) {
|
||||
selectQueryBuilder = this._baseDatabase.orderQuery(repo, selectQueryBuilder, queryOptions, 'subTable_');
|
||||
if (queryOptions.orderBy !== 'id') {
|
||||
selectQueryBuilder = this._baseDatabase.orderQuery(repo, selectQueryBuilder, { ...queryOptions, orderBy: 'id' }, 'subTable_');
|
||||
}
|
||||
}
|
||||
|
||||
if (queryOptions.skip) {
|
||||
selectQueryBuilder = selectQueryBuilder.offset(queryOptions.skip);
|
||||
}
|
||||
|
||||
if (queryOptions.limit) {
|
||||
selectQueryBuilder = selectQueryBuilder.limit(queryOptions.limit);
|
||||
}
|
||||
|
||||
let entities = await selectQueryBuilder.getRawMany();
|
||||
entities = await this._transformResults(queryRunner, repo.createQueryBuilder('subTable'), entities);
|
||||
|
||||
return entities as Entity[];
|
||||
}
|
||||
|
||||
async getEntitiesSingular<Entity> (
|
||||
queryRunner: QueryRunner,
|
||||
entity: new () => Entity,
|
||||
block: BlockHeight,
|
||||
where: Where = {}
|
||||
): Promise<Entity[]> {
|
||||
const repo = queryRunner.manager.getRepository(entity);
|
||||
const { tableName } = repo.metadata;
|
||||
|
||||
let selectQueryBuilder = repo.createQueryBuilder(tableName)
|
||||
.addFrom('block_progress', 'blockProgress')
|
||||
.where(`${tableName}.block_hash = blockProgress.block_hash`)
|
||||
.andWhere('blockProgress.is_pruned = :isPruned', { isPruned: false })
|
||||
.addOrderBy(`${tableName}.block_number`, 'DESC')
|
||||
.limit(1);
|
||||
|
||||
if (block.hash) {
|
||||
const { canonicalBlockNumber, blockHashes } = await this._baseDatabase.getFrothyRegion(queryRunner, block.hash);
|
||||
|
||||
selectQueryBuilder = selectQueryBuilder
|
||||
.andWhere(new Brackets(qb => {
|
||||
qb.where(`${tableName}.block_hash IN (:...blockHashes)`, { blockHashes })
|
||||
.orWhere(`${tableName}.block_number <= :canonicalBlockNumber`, { canonicalBlockNumber });
|
||||
}));
|
||||
}
|
||||
|
||||
if (block.number) {
|
||||
selectQueryBuilder = selectQueryBuilder.andWhere(`${tableName}.block_number <= :blockNumber`, { blockNumber: block.number });
|
||||
}
|
||||
|
||||
selectQueryBuilder = this._baseDatabase.buildQuery(repo, selectQueryBuilder, where);
|
||||
|
||||
const entities = await selectQueryBuilder.getMany();
|
||||
|
||||
return entities as Entity[];
|
||||
}
|
||||
|
||||
async loadEntitiesRelations<Entity> (
|
||||
@ -676,8 +829,12 @@ export class Database {
|
||||
}, {});
|
||||
}
|
||||
|
||||
cacheUpdatedEntity (entityName: string, entity: any, pruned = false): void {
|
||||
cacheUpdatedEntityByName (entityName: string, entity: any, pruned = false): void {
|
||||
const repo = this._conn.getRepository(entityName);
|
||||
this.cacheUpdatedEntity(repo, entity, pruned);
|
||||
}
|
||||
|
||||
cacheUpdatedEntity<Entity> (repo: Repository<Entity>, entity: any, pruned = false): void {
|
||||
const tableName = repo.metadata.tableName;
|
||||
|
||||
if (pruned) {
|
||||
@ -713,4 +870,76 @@ export class Database {
|
||||
|
||||
return this._baseDatabase.getBlocksAtHeight(repo, height, isPruned);
|
||||
}
|
||||
|
||||
updateEntityCacheFrothyBlocks (blockProgress: BlockProgressInterface, clearEntitiesCacheInterval = DEFAULT_CLEAR_ENTITIES_CACHE_INTERVAL): void {
|
||||
// Set latest block in frothy region to cachedEntities.frothyBlocks map.
|
||||
if (!this.cachedEntities.frothyBlocks.has(blockProgress.blockHash)) {
|
||||
this.cachedEntities.frothyBlocks.set(
|
||||
blockProgress.blockHash,
|
||||
{
|
||||
blockNumber: blockProgress.blockNumber,
|
||||
parentHash: blockProgress.parentHash,
|
||||
entities: new Map()
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
log(`Size of cachedEntities.frothyBlocks map: ${this.cachedEntities.frothyBlocks.size}`);
|
||||
this._measureCachedPrunedEntities();
|
||||
|
||||
// Check if it is time to clear entities cache.
|
||||
if (blockProgress.blockNumber % clearEntitiesCacheInterval === 0) {
|
||||
log(`Clearing cachedEntities.latestPrunedEntities at block ${blockProgress.blockNumber}`);
|
||||
// Clearing only pruned region as frothy region cache gets updated in pruning queue.
|
||||
this.cachedEntities.latestPrunedEntities.clear();
|
||||
log(`Cleared cachedEntities.latestPrunedEntities. Map size: ${this.cachedEntities.latestPrunedEntities.size}`);
|
||||
}
|
||||
}
|
||||
|
||||
pruneEntityCacheFrothyBlocks (canonicalBlockHash: string, canonicalBlockNumber: number) {
|
||||
const canonicalBlock = this.cachedEntities.frothyBlocks.get(canonicalBlockHash);
|
||||
|
||||
if (canonicalBlock) {
|
||||
// Update latestPrunedEntities map with entities from latest canonical block.
|
||||
canonicalBlock.entities.forEach((entityIdMap, entityTableName) => {
|
||||
entityIdMap.forEach((data, id) => {
|
||||
let entityIdMap = this.cachedEntities.latestPrunedEntities.get(entityTableName);
|
||||
|
||||
if (!entityIdMap) {
|
||||
entityIdMap = new Map();
|
||||
}
|
||||
|
||||
entityIdMap.set(id, data);
|
||||
this.cachedEntities.latestPrunedEntities.set(entityTableName, entityIdMap);
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
// Remove pruned blocks from frothyBlocks.
|
||||
const prunedBlockHashes = Array.from(this.cachedEntities.frothyBlocks.entries())
|
||||
.filter(([, value]) => value.blockNumber <= canonicalBlockNumber)
|
||||
.map(([blockHash]) => blockHash);
|
||||
|
||||
prunedBlockHashes.forEach(blockHash => this.cachedEntities.frothyBlocks.delete(blockHash));
|
||||
}
|
||||
|
||||
_measureCachedPrunedEntities () {
|
||||
const totalEntities = Array.from(this.cachedEntities.latestPrunedEntities.values())
|
||||
.reduce((acc, idEntitiesMap) => acc + idEntitiesMap.size, 0);
|
||||
|
||||
log(`Total entities in cachedEntities.latestPrunedEntities map: ${totalEntities}`);
|
||||
cachePrunedEntitiesCount.set(totalEntities);
|
||||
}
|
||||
|
||||
async _transformResults<Entity> (queryRunner: QueryRunner, qb: SelectQueryBuilder<Entity>, rawResults: any[]): Promise<any[]> {
|
||||
const transformer = new RawSqlResultsToEntityTransformer(
|
||||
qb.expressionMap,
|
||||
queryRunner.manager.connection.driver,
|
||||
[],
|
||||
[],
|
||||
queryRunner
|
||||
);
|
||||
assert(qb.expressionMap.mainAlias);
|
||||
return transformer.transform(rawResults, qb.expressionMap.mainAlias);
|
||||
}
|
||||
}
|
||||
|
@ -1,3 +1,6 @@
|
||||
export * from './watcher';
|
||||
export * from './database';
|
||||
export { prepareEntityState } from './utils';
|
||||
export {
|
||||
prepareEntityState,
|
||||
updateEntitiesFromState
|
||||
} from './utils';
|
||||
|
@ -15,7 +15,7 @@ import debug from 'debug';
|
||||
|
||||
import { BaseProvider } from '@ethersproject/providers';
|
||||
import loader from '@vulcanize/assemblyscript/lib/loader';
|
||||
import { IndexerInterface, GraphDecimal, getGraphDigitsAndExp, eventProcessingLoadEntityCount } from '@cerc-io/util';
|
||||
import { IndexerInterface, GraphDecimal, getGraphDigitsAndExp } from '@cerc-io/util';
|
||||
|
||||
import { TypeId, Level } from './types';
|
||||
import {
|
||||
@ -74,7 +74,6 @@ export const instantiate = async (
|
||||
const entityId = __getString(id);
|
||||
|
||||
assert(context.block);
|
||||
eventProcessingLoadEntityCount.inc();
|
||||
const entityData = await database.getEntity(entityName, entityId, context.block.blockHash);
|
||||
|
||||
if (!entityData) {
|
||||
@ -97,7 +96,7 @@ export const instantiate = async (
|
||||
assert(context.block);
|
||||
const dbData = await database.fromGraphEntity(instanceExports, context.block, entityName, entityInstance);
|
||||
const dbEntity = await database.saveEntity(entityName, dbData);
|
||||
database.cacheUpdatedEntity(entityName, dbEntity);
|
||||
database.cacheUpdatedEntityByName(entityName, dbEntity);
|
||||
|
||||
// Update the in-memory subgraph state if not disabled.
|
||||
// TODO: enableSubgraphState
|
||||
|
@ -7,10 +7,11 @@ import { ValueTransformer } from 'typeorm';
|
||||
import { ColumnMetadata } from 'typeorm/metadata/ColumnMetadata';
|
||||
import assert from 'assert';
|
||||
|
||||
import { GraphDecimal, jsonBigIntStringReplacer } from '@cerc-io/util';
|
||||
import { GraphDecimal, IndexerInterface, jsonBigIntStringReplacer, StateInterface } from '@cerc-io/util';
|
||||
import { MappingKey, StorageLayout } from '@cerc-io/solidity-mapper';
|
||||
|
||||
import { TypeId, EthereumValueKind, ValueKind } from './types';
|
||||
import { MappingKey, StorageLayout } from '@cerc-io/solidity-mapper';
|
||||
import { Database } from './database';
|
||||
|
||||
const log = debug('vulcanize:utils');
|
||||
|
||||
@ -873,3 +874,26 @@ export const fromStateEntityValues = (
|
||||
|
||||
return stateEntity[propertyName];
|
||||
};
|
||||
|
||||
export const updateEntitiesFromState = async (database: Database, indexer: IndexerInterface, state: StateInterface) => {
|
||||
const data = indexer.getStateData(state);
|
||||
|
||||
// Get relations for subgraph entity
|
||||
assert(indexer.getRelationsMap);
|
||||
const relationsMap = indexer.getRelationsMap();
|
||||
|
||||
for (const [entityName, entities] of Object.entries(data.state)) {
|
||||
const result = Array.from(relationsMap.entries())
|
||||
.find(([key]) => key.name === entityName);
|
||||
|
||||
const relations = result ? result[1] : {};
|
||||
|
||||
log(`Updating entities from State for entity ${entityName}`);
|
||||
console.time(`time:watcher#GraphWatcher-updateEntitiesFromState-update-entity-${entityName}`);
|
||||
for (const [id, entityData] of Object.entries(entities as any)) {
|
||||
const dbData = database.fromState(state.block, entityName, entityData, relations);
|
||||
await database.saveEntity(entityName, dbData);
|
||||
}
|
||||
console.timeEnd(`time:watcher#GraphWatcher-updateEntitiesFromState-update-entity-${entityName}`);
|
||||
}
|
||||
};
|
||||
|
@ -12,9 +12,9 @@ import { SelectionNode } from 'graphql';
|
||||
|
||||
import { ResultObject } from '@vulcanize/assemblyscript/lib/loader';
|
||||
import { EthClient } from '@cerc-io/ipld-eth-client';
|
||||
import { getFullBlock, BlockHeight, ServerConfig, getFullTransaction, QueryOptions, StateInterface, IndexerInterface, BlockProgressInterface, cachePrunedEntitiesCount } from '@cerc-io/util';
|
||||
import { getFullBlock, BlockHeight, ServerConfig, getFullTransaction, QueryOptions, StateInterface, IndexerInterface, BlockProgressInterface } from '@cerc-io/util';
|
||||
|
||||
import { createBlock, createEvent, getSubgraphConfig, resolveEntityFieldConflicts, Transaction } from './utils';
|
||||
import { createBlock, createEvent, getSubgraphConfig, resolveEntityFieldConflicts, Transaction, updateEntitiesFromState } from './utils';
|
||||
import { Context, GraphData, instantiate } from './loader';
|
||||
import { Database, DEFAULT_LIMIT } from './database';
|
||||
|
||||
@ -338,9 +338,6 @@ export class GraphWatcher {
|
||||
// Get entities from the database.
|
||||
const entities = await this._database.getEntities(dbTx, entity, relationsMap, block, where, queryOptions, selections);
|
||||
await dbTx.commitTransaction();
|
||||
|
||||
// Resolve any field name conflicts in the entity result.
|
||||
return entities.map(entity => resolveEntityFieldConflicts(entity));
|
||||
} catch (error) {
|
||||
await dbTx.rollbackTransaction();
|
||||
throw error;
|
||||
@ -351,79 +348,16 @@ export class GraphWatcher {
|
||||
|
||||
async updateEntitiesFromState (state: StateInterface) {
|
||||
assert(this._indexer);
|
||||
const data = this._indexer.getStateData(state);
|
||||
|
||||
for (const [entityName, entities] of Object.entries(data.state)) {
|
||||
// Get relations for subgraph entity
|
||||
assert(this._indexer.getRelationsMap);
|
||||
const relationsMap = this._indexer.getRelationsMap();
|
||||
|
||||
const result = Array.from(relationsMap.entries())
|
||||
.find(([key]) => key.name === entityName);
|
||||
|
||||
const relations = result ? result[1] : {};
|
||||
|
||||
log(`Updating entities from State for entity ${entityName}`);
|
||||
console.time(`time:watcher#GraphWatcher-updateEntitiesFromState-update-entity-${entityName}`);
|
||||
for (const [id, entityData] of Object.entries(entities as any)) {
|
||||
const dbData = this._database.fromState(state.block, entityName, entityData, relations);
|
||||
await this._database.saveEntity(entityName, dbData);
|
||||
}
|
||||
console.timeEnd(`time:watcher#GraphWatcher-updateEntitiesFromState-update-entity-${entityName}`);
|
||||
}
|
||||
await updateEntitiesFromState(this._database, this._indexer, state);
|
||||
}
|
||||
|
||||
updateEntityCacheFrothyBlocks (blockProgress: BlockProgressInterface): void {
|
||||
// Set latest block in frothy region to cachedEntities.frothyBlocks map.
|
||||
if (!this._database.cachedEntities.frothyBlocks.has(blockProgress.blockHash)) {
|
||||
this._database.cachedEntities.frothyBlocks.set(
|
||||
blockProgress.blockHash,
|
||||
{
|
||||
blockNumber: blockProgress.blockNumber,
|
||||
parentHash: blockProgress.parentHash,
|
||||
entities: new Map()
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
log(`Size of cachedEntities.frothyBlocks map: ${this._database.cachedEntities.frothyBlocks.size}`);
|
||||
this._measureCachedPrunedEntities();
|
||||
|
||||
assert(this._indexer);
|
||||
// Check if it is time to clear entities cache.
|
||||
if (blockProgress.blockNumber % this._indexer.serverConfig.clearEntitiesCacheInterval === 0) {
|
||||
log(`Clearing cachedEntities.latestPrunedEntities at block ${blockProgress.blockNumber}`);
|
||||
// Clearing only pruned region as frothy region cache gets updated in pruning queue.
|
||||
this._database.cachedEntities.latestPrunedEntities.clear();
|
||||
log(`Cleared cachedEntities.latestPrunedEntities. Map size: ${this._database.cachedEntities.latestPrunedEntities.size}`);
|
||||
}
|
||||
this._database.updateEntityCacheFrothyBlocks(blockProgress, this._indexer.serverConfig.clearEntitiesCacheInterval);
|
||||
}
|
||||
|
||||
pruneEntityCacheFrothyBlocks (canonicalBlockHash: string, canonicalBlockNumber: number) {
|
||||
const canonicalBlock = this._database.cachedEntities.frothyBlocks.get(canonicalBlockHash);
|
||||
|
||||
if (canonicalBlock) {
|
||||
// Update latestPrunedEntities map with entities from latest canonical block.
|
||||
canonicalBlock.entities.forEach((entityIdMap, entityTableName) => {
|
||||
entityIdMap.forEach((data, id) => {
|
||||
let entityIdMap = this._database.cachedEntities.latestPrunedEntities.get(entityTableName);
|
||||
|
||||
if (!entityIdMap) {
|
||||
entityIdMap = new Map();
|
||||
}
|
||||
|
||||
entityIdMap.set(id, data);
|
||||
this._database.cachedEntities.latestPrunedEntities.set(entityTableName, entityIdMap);
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
// Remove pruned blocks from frothyBlocks.
|
||||
const prunedBlockHashes = Array.from(this._database.cachedEntities.frothyBlocks.entries())
|
||||
.filter(([, value]) => value.blockNumber <= canonicalBlockNumber)
|
||||
.map(([blockHash]) => blockHash);
|
||||
|
||||
prunedBlockHashes.forEach(blockHash => this._database.cachedEntities.frothyBlocks.delete(blockHash));
|
||||
this._database.pruneEntityCacheFrothyBlocks(canonicalBlockHash, canonicalBlockNumber);
|
||||
}
|
||||
|
||||
_clearCachedEntities () {
|
||||
@ -431,14 +365,6 @@ export class GraphWatcher {
|
||||
this._database.cachedEntities.latestPrunedEntities.clear();
|
||||
}
|
||||
|
||||
_measureCachedPrunedEntities () {
|
||||
const totalEntities = Array.from(this._database.cachedEntities.latestPrunedEntities.values())
|
||||
.reduce((acc, idEntitiesMap) => acc + idEntitiesMap.size, 0);
|
||||
|
||||
log(`Total entities in cachedEntities.latestPrunedEntities map: ${totalEntities}`);
|
||||
cachePrunedEntitiesCount.set(totalEntities);
|
||||
}
|
||||
|
||||
/**
|
||||
* Method to reinstantiate WASM instance for specified dataSource.
|
||||
* @param dataSourceName
|
||||
|
@ -925,8 +925,8 @@ export class Indexer {
|
||||
data.state = _.merge(data.state, diff.state);
|
||||
}
|
||||
|
||||
i = endBlockHeight;
|
||||
console.timeEnd(`time:indexer#_mergeDiffsInRange-${i}-${endBlockHeight}-${contractAddress}`);
|
||||
i = endBlockHeight;
|
||||
}
|
||||
|
||||
return data;
|
||||
|
Loading…
Reference in New Issue
Block a user