mirror of
https://github.com/cerc-io/watcher-ts
synced 2025-01-22 19:19:05 +00:00
Update latest entity tables on chain pruning (#235)
This commit is contained in:
parent
1ad223db4d
commit
693c23d192
@ -482,7 +482,7 @@ export class Indexer implements IndexerInterface {
|
||||
async markBlocksAsPruned (blocks: BlockProgress[]): Promise<void> {
|
||||
await this._baseIndexer.markBlocksAsPruned(blocks);
|
||||
|
||||
await this._graphWatcher.pruneEntities(blocks, ENTITIES);
|
||||
await this._graphWatcher.pruneEntities(FrothyEntity, blocks, ENTITIES);
|
||||
}
|
||||
|
||||
async pruneFrothyEntities (blockNumber: number): Promise<void> {
|
||||
|
@ -1368,42 +1368,73 @@ export class Database {
|
||||
.execute();
|
||||
}
|
||||
|
||||
async pruneEntities (queryRunner: QueryRunner, blocks: BlockProgressInterface[], entityTypes: Set<new () => any>) {
|
||||
async pruneEntities (frothyEntityType: new () => any, queryRunner: QueryRunner, blocks: BlockProgressInterface[], entityTypes: Set<new () => any>) {
|
||||
// Assumption: all blocks are at same height
|
||||
assert(blocks.length);
|
||||
const blockNumber = blocks[0].blockNumber;
|
||||
const blockHashes = blocks.map(block => block.blockHash);
|
||||
|
||||
// Get all entities at the block height
|
||||
const entitiesAtBlock = await Promise.all(
|
||||
[...entityTypes].map(entityType => {
|
||||
return this._baseDatabase.getEntities(
|
||||
queryRunner,
|
||||
entityType as any,
|
||||
{
|
||||
select: ['id'] as any,
|
||||
where: { blockNumber }
|
||||
}
|
||||
);
|
||||
})
|
||||
);
|
||||
const entitiesAtHeight = await this._baseDatabase.getEntities(queryRunner, frothyEntityType, { where: { blockNumber } });
|
||||
|
||||
// Extract entity ids from result
|
||||
const entityIds = entitiesAtBlock.map(entities => {
|
||||
return entities.map((entity: any) => entity.id);
|
||||
});
|
||||
const entityIdsMap: Map<string, string[]> = new Map();
|
||||
|
||||
entitiesAtHeight.forEach(entity =>
|
||||
entityIdsMap.set(
|
||||
entity.name,
|
||||
[...entityIdsMap.get(entity.name) || [], entity.id]
|
||||
)
|
||||
);
|
||||
|
||||
// Update isPruned flag using fetched entity ids and hashes of blocks to be pruned
|
||||
const updatePromises = [...entityTypes].map((entity, index: number) => {
|
||||
const updatePromises = [...entityTypes].map((entity) => {
|
||||
return this.updateEntity(
|
||||
queryRunner,
|
||||
entity as any,
|
||||
{ id: In(entityIds[index]), blockHash: In(blockHashes) },
|
||||
{ id: In(entityIdsMap.get(entity.name) || []), blockHash: In(blockHashes) },
|
||||
{ isPruned: true }
|
||||
);
|
||||
});
|
||||
|
||||
// Simultaneously update isPruned flag for all entities
|
||||
await Promise.all(updatePromises);
|
||||
|
||||
// Update latest entity tables with canonical entries
|
||||
await this.updateNonCanonicalLatestEntities(queryRunner, blockNumber, blockHashes);
|
||||
}
|
||||
|
||||
async updateNonCanonicalLatestEntities (queryRunner: QueryRunner, blockNumber: number, nonCanonicalBlockHashes: string[]): Promise<void> {
|
||||
// Update latest entity tables with canonical entries
|
||||
await Promise.all(
|
||||
Array.from(this._entityToLatestEntityMap.entries()).map(async ([entity, latestEntity]) => {
|
||||
// Get entries for non canonical blocks
|
||||
const nonCanonicalLatestEntities = await this._baseDatabase.getEntities(queryRunner, latestEntity, { where: { blockHash: In(nonCanonicalBlockHashes) } });
|
||||
|
||||
await Promise.all(nonCanonicalLatestEntities.map(async (nonCanonicalLatestEntity: any) => {
|
||||
// Get pruned version for the non canonical entity
|
||||
const repo = queryRunner.manager.getRepository(entity);
|
||||
const prunedVersion = await this._baseDatabase.getLatestPrunedEntityWithoutJoin(repo, nonCanonicalLatestEntity.id, blockNumber);
|
||||
|
||||
// If found, update the latestEntity entry for the id
|
||||
// Else, delete the latestEntity entry for the id
|
||||
if (prunedVersion) {
|
||||
return this.updateEntity(
|
||||
queryRunner,
|
||||
latestEntity,
|
||||
{ id: nonCanonicalLatestEntity.id },
|
||||
prunedVersion
|
||||
);
|
||||
} else {
|
||||
return this._baseDatabase.removeEntities(
|
||||
queryRunner,
|
||||
latestEntity,
|
||||
{ where: { id: nonCanonicalLatestEntity.id } }
|
||||
);
|
||||
}
|
||||
}));
|
||||
})
|
||||
);
|
||||
}
|
||||
|
||||
async pruneFrothyEntities<Entity> (queryRunner: QueryRunner, frothyEntityType: new () => Entity, blockNumber: number): Promise<void> {
|
||||
|
@ -358,11 +358,11 @@ export class GraphWatcher {
|
||||
this._database.updateEntityCacheFrothyBlocks(blockProgress, this._indexer.serverConfig.clearEntitiesCacheInterval);
|
||||
}
|
||||
|
||||
async pruneEntities (prunedBlocks: BlockProgressInterface[], entityTypes: Set<new () => any>) {
|
||||
async pruneEntities (frothyEntityType: new () => any, prunedBlocks: BlockProgressInterface[], entityTypes: Set<new () => any>) {
|
||||
const dbTx = await this._database.createTransactionRunner();
|
||||
|
||||
try {
|
||||
await this._database.pruneEntities(dbTx, prunedBlocks, entityTypes);
|
||||
await this._database.pruneEntities(frothyEntityType, dbTx, prunedBlocks, entityTypes);
|
||||
await dbTx.commitTransaction();
|
||||
} catch (error) {
|
||||
await dbTx.rollbackTransaction();
|
||||
|
@ -495,6 +495,20 @@ export class Database {
|
||||
return entityInPrunedRegion;
|
||||
}
|
||||
|
||||
async getLatestPrunedEntityWithoutJoin<Entity> (repo: Repository<Entity>, id: string, canonicalBlockNumber: number): Promise<Entity | undefined> {
|
||||
// Filter out latest entity from pruned blocks.
|
||||
|
||||
const entityInPrunedRegion = await repo.createQueryBuilder('entity')
|
||||
.where('entity.id = :id', { id })
|
||||
.andWhere('entity.is_pruned = false')
|
||||
.andWhere('entity.block_number <= :canonicalBlockNumber', { canonicalBlockNumber })
|
||||
.orderBy('entity.block_number', 'DESC')
|
||||
.limit(1)
|
||||
.getOne();
|
||||
|
||||
return entityInPrunedRegion;
|
||||
}
|
||||
|
||||
async getFrothyRegion (queryRunner: QueryRunner, blockHash: string): Promise<{ canonicalBlockNumber: number, blockHashes: string[] }> {
|
||||
const heirerchicalQuery = `
|
||||
WITH RECURSIVE cte_query AS
|
||||
|
Loading…
Reference in New Issue
Block a user