diff --git a/packages/eden-watcher/src/indexer.ts b/packages/eden-watcher/src/indexer.ts index 5a34bc11..c8ea1cc0 100644 --- a/packages/eden-watcher/src/indexer.ts +++ b/packages/eden-watcher/src/indexer.ts @@ -482,7 +482,7 @@ export class Indexer implements IndexerInterface { async markBlocksAsPruned (blocks: BlockProgress[]): Promise { await this._baseIndexer.markBlocksAsPruned(blocks); - await this._graphWatcher.pruneEntities(blocks, ENTITIES); + await this._graphWatcher.pruneEntities(FrothyEntity, blocks, ENTITIES); } async pruneFrothyEntities (blockNumber: number): Promise { diff --git a/packages/graph-node/src/database.ts b/packages/graph-node/src/database.ts index 82f8c55d..47d17bc5 100644 --- a/packages/graph-node/src/database.ts +++ b/packages/graph-node/src/database.ts @@ -1368,42 +1368,73 @@ export class Database { .execute(); } - async pruneEntities (queryRunner: QueryRunner, blocks: BlockProgressInterface[], entityTypes: Set any>) { + async pruneEntities (frothyEntityType: new () => any, queryRunner: QueryRunner, blocks: BlockProgressInterface[], entityTypes: Set any>) { // Assumption: all blocks are at same height assert(blocks.length); const blockNumber = blocks[0].blockNumber; const blockHashes = blocks.map(block => block.blockHash); // Get all entities at the block height - const entitiesAtBlock = await Promise.all( - [...entityTypes].map(entityType => { - return this._baseDatabase.getEntities( - queryRunner, - entityType as any, - { - select: ['id'] as any, - where: { blockNumber } - } - ); - }) - ); + const entitiesAtHeight = await this._baseDatabase.getEntities(queryRunner, frothyEntityType, { where: { blockNumber } }); // Extract entity ids from result - const entityIds = entitiesAtBlock.map(entities => { - return entities.map((entity: any) => entity.id); - }); + const entityIdsMap: Map = new Map(); + + entitiesAtHeight.forEach(entity => + entityIdsMap.set( + entity.name, + [...entityIdsMap.get(entity.name) || [], entity.id] + ) + ); // Update isPruned flag using fetched entity ids and hashes of blocks to be pruned - const updatePromises = [...entityTypes].map((entity, index: number) => { + const updatePromises = [...entityTypes].map((entity) => { return this.updateEntity( queryRunner, entity as any, - { id: In(entityIds[index]), blockHash: In(blockHashes) }, + { id: In(entityIdsMap.get(entity.name) || []), blockHash: In(blockHashes) }, { isPruned: true } ); }); + // Simultaneously update isPruned flag for all entities await Promise.all(updatePromises); + + // Update latest entity tables with canonical entries + await this.updateNonCanonicalLatestEntities(queryRunner, blockNumber, blockHashes); + } + + async updateNonCanonicalLatestEntities (queryRunner: QueryRunner, blockNumber: number, nonCanonicalBlockHashes: string[]): Promise { + // Update latest entity tables with canonical entries + await Promise.all( + Array.from(this._entityToLatestEntityMap.entries()).map(async ([entity, latestEntity]) => { + // Get entries for non canonical blocks + const nonCanonicalLatestEntities = await this._baseDatabase.getEntities(queryRunner, latestEntity, { where: { blockHash: In(nonCanonicalBlockHashes) } }); + + await Promise.all(nonCanonicalLatestEntities.map(async (nonCanonicalLatestEntity: any) => { + // Get pruned version for the non canonical entity + const repo = queryRunner.manager.getRepository(entity); + const prunedVersion = await this._baseDatabase.getLatestPrunedEntityWithoutJoin(repo, nonCanonicalLatestEntity.id, blockNumber); + + // If found, update the latestEntity entry for the id + // Else, delete the latestEntity entry for the id + if (prunedVersion) { + return this.updateEntity( + queryRunner, + latestEntity, + { id: nonCanonicalLatestEntity.id }, + prunedVersion + ); + } else { + return this._baseDatabase.removeEntities( + queryRunner, + latestEntity, + { where: { id: nonCanonicalLatestEntity.id } } + ); + } + })); + }) + ); } async pruneFrothyEntities (queryRunner: QueryRunner, frothyEntityType: new () => Entity, blockNumber: number): Promise { diff --git a/packages/graph-node/src/watcher.ts b/packages/graph-node/src/watcher.ts index 206f3be6..4287569c 100644 --- a/packages/graph-node/src/watcher.ts +++ b/packages/graph-node/src/watcher.ts @@ -358,11 +358,11 @@ export class GraphWatcher { this._database.updateEntityCacheFrothyBlocks(blockProgress, this._indexer.serverConfig.clearEntitiesCacheInterval); } - async pruneEntities (prunedBlocks: BlockProgressInterface[], entityTypes: Set any>) { + async pruneEntities (frothyEntityType: new () => any, prunedBlocks: BlockProgressInterface[], entityTypes: Set any>) { const dbTx = await this._database.createTransactionRunner(); try { - await this._database.pruneEntities(dbTx, prunedBlocks, entityTypes); + await this._database.pruneEntities(frothyEntityType, dbTx, prunedBlocks, entityTypes); await dbTx.commitTransaction(); } catch (error) { await dbTx.rollbackTransaction(); diff --git a/packages/util/src/database.ts b/packages/util/src/database.ts index cfce1392..417874d8 100644 --- a/packages/util/src/database.ts +++ b/packages/util/src/database.ts @@ -495,6 +495,20 @@ export class Database { return entityInPrunedRegion; } + async getLatestPrunedEntityWithoutJoin (repo: Repository, id: string, canonicalBlockNumber: number): Promise { + // Filter out latest entity from pruned blocks. + + const entityInPrunedRegion = await repo.createQueryBuilder('entity') + .where('entity.id = :id', { id }) + .andWhere('entity.is_pruned = false') + .andWhere('entity.block_number <= :canonicalBlockNumber', { canonicalBlockNumber }) + .orderBy('entity.block_number', 'DESC') + .limit(1) + .getOne(); + + return entityInPrunedRegion; + } + async getFrothyRegion (queryRunner: QueryRunner, blockHash: string): Promise<{ canonicalBlockNumber: number, blockHashes: string[] }> { const heirerchicalQuery = ` WITH RECURSIVE cte_query AS