diff --git a/packages/uni-info-watcher/src/database.ts b/packages/uni-info-watcher/src/database.ts index efc76d73..61ded81a 100644 --- a/packages/uni-info-watcher/src/database.ts +++ b/packages/uni-info-watcher/src/database.ts @@ -725,10 +725,10 @@ export class Database implements DatabaseInterface { return this._baseDatabase.getBlocksAtHeight(repo, height, isPruned); } - async markBlockAsPruned (queryRunner: QueryRunner, block: BlockProgress): Promise { + async markBlocksAsPruned (queryRunner: QueryRunner, blocks: BlockProgress[]): Promise { const repo = queryRunner.manager.getRepository(BlockProgress); - return this._baseDatabase.markBlockAsPruned(repo, block); + return this._baseDatabase.markBlocksAsPruned(repo, blocks); } async getBlockProgress (blockHash: string): Promise { @@ -742,6 +742,22 @@ export class Database implements DatabaseInterface { return this._baseDatabase.updateBlockProgress(repo, blockHash, lastProcessedEventIndex); } + async getEntities (queryRunner: QueryRunner, entity: new () => Entity, findConditions?: FindConditions): Promise { + return this._baseDatabase.getEntities(queryRunner, entity, findConditions); + } + + async removeEntities (queryRunner: QueryRunner, entity: new () => Entity, findConditions?: FindConditions): Promise { + return this._baseDatabase.removeEntities(queryRunner, entity, findConditions); + } + + async isEntityEmpty (entity: new () => Entity): Promise { + return this._baseDatabase.isEntityEmpty(entity); + } + + async getAncestorAtDepth (blockHash: string, depth: number): Promise { + return this._baseDatabase.getAncestorAtDepth(blockHash, depth); + } + async _getPrevEntityVersion (queryRunner: QueryRunner, repo: Repository, findOptions: { [key: string]: any }): Promise { // Check whether query is ordered by blockNumber to get the latest entity. assert(findOptions.order.blockNumber); @@ -847,16 +863,4 @@ export class Database implements DatabaseInterface { return { canonicalBlockNumber, blockHashes }; } - - async getEntities (queryRunner: QueryRunner, entity: new () => Entity, findConditions?: FindConditions): Promise { - return this._baseDatabase.getEntities(queryRunner, entity, findConditions); - } - - async removeEntities (queryRunner: QueryRunner, entity: new () => Entity, findConditions?: FindConditions): Promise { - return this._baseDatabase.removeEntities(queryRunner, entity, findConditions); - } - - async isEntityEmpty (entity: new () => Entity): Promise { - return this._baseDatabase.isEntityEmpty(entity); - } } diff --git a/packages/uni-info-watcher/src/indexer.ts b/packages/uni-info-watcher/src/indexer.ts index 02947d53..24507bda 100644 --- a/packages/uni-info-watcher/src/indexer.ts +++ b/packages/uni-info-watcher/src/indexer.ts @@ -215,12 +215,8 @@ export class Indexer implements IndexerInterface { return this._baseIndexer.getBlocksAtHeight(height, isPruned); } - async blockIsAncestor (ancestorBlockHash: string, blockHash: string, maxDepth: number): Promise { - return this._baseIndexer.blockIsAncestor(ancestorBlockHash, blockHash, maxDepth); - } - - async markBlockAsPruned (block: BlockProgress): Promise { - return this._baseIndexer.markBlockAsPruned(block); + async markBlocksAsPruned (blocks: BlockProgress[]): Promise { + return this._baseIndexer.markBlocksAsPruned(blocks); } async updateBlockProgress (blockHash: string, lastProcessedEventIndex: number): Promise { @@ -324,6 +320,10 @@ export class Indexer implements IndexerInterface { return res; } + async getAncestorAtDepth (blockHash: string, depth: number): Promise { + return this._baseIndexer.getAncestorAtDepth(blockHash, depth); + } + async _fetchAndSaveEvents (block: DeepPartial): Promise { assert(block.blockHash); const events = await this._uniClient.getEvents(block.blockHash); diff --git a/packages/uni-watcher/src/database.ts b/packages/uni-watcher/src/database.ts index c1b867ac..f46cd650 100644 --- a/packages/uni-watcher/src/database.ts +++ b/packages/uni-watcher/src/database.ts @@ -148,10 +148,10 @@ export class Database implements DatabaseInterface { return this._baseDatabase.getBlocksAtHeight(repo, height, isPruned); } - async markBlockAsPruned (queryRunner: QueryRunner, block: BlockProgress): Promise { + async markBlocksAsPruned (queryRunner: QueryRunner, blocks: BlockProgress[]): Promise { const repo = queryRunner.manager.getRepository(BlockProgress); - return this._baseDatabase.markBlockAsPruned(repo, block); + return this._baseDatabase.markBlocksAsPruned(repo, blocks); } async getBlockProgress (blockHash: string): Promise { @@ -176,4 +176,8 @@ export class Database implements DatabaseInterface { async isEntityEmpty (entity: new () => Entity): Promise { return this._baseDatabase.isEntityEmpty(entity); } + + async getAncestorAtDepth (blockHash: string, depth: number): Promise { + return this._baseDatabase.getAncestorAtDepth(blockHash, depth); + } } diff --git a/packages/uni-watcher/src/indexer.ts b/packages/uni-watcher/src/indexer.ts index 5a152a2a..4b624150 100644 --- a/packages/uni-watcher/src/indexer.ts +++ b/packages/uni-watcher/src/indexer.ts @@ -445,12 +445,8 @@ export class Indexer implements IndexerInterface { return this._baseIndexer.getBlocksAtHeight(height, isPruned); } - async blockIsAncestor (ancestorBlockHash: string, blockHash: string, maxDepth: number): Promise { - return this._baseIndexer.blockIsAncestor(ancestorBlockHash, blockHash, maxDepth); - } - - async markBlockAsPruned (block: BlockProgress): Promise { - return this._baseIndexer.markBlockAsPruned(block); + async markBlocksAsPruned (blocks: BlockProgress[]): Promise { + return this._baseIndexer.markBlocksAsPruned(blocks); } async updateBlockProgress (blockHash: string, lastProcessedEventIndex: number): Promise { @@ -506,6 +502,15 @@ export class Indexer implements IndexerInterface { }; } + async getContract (type: string): Promise { + const contract = await this._db.getLatestContract(type); + return contract; + } + + async getAncestorAtDepth (blockHash: string, depth: number): Promise { + return this._baseIndexer.getAncestorAtDepth(blockHash, depth); + } + // TODO: Move into base/class or framework package. async _getStorageValue (storageLayout: StorageLayout, blockHash: string, token: string, variable: string, ...mappingKeys: any[]): Promise { return getStorageValue( @@ -517,9 +522,4 @@ export class Indexer implements IndexerInterface { ...mappingKeys ); } - - async getContract (type: string): Promise { - const contract = await this._db.getLatestContract(type); - return contract; - } } diff --git a/packages/util/src/database.ts b/packages/util/src/database.ts index 62643596..68268e2a 100644 --- a/packages/util/src/database.ts +++ b/packages/util/src/database.ts @@ -3,7 +3,7 @@ // import assert from 'assert'; -import { Connection, ConnectionOptions, createConnection, DeepPartial, FindConditions, QueryRunner, Repository } from 'typeorm'; +import { Connection, ConnectionOptions, createConnection, DeepPartial, FindConditions, In, QueryRunner, Repository } from 'typeorm'; import { SnakeNamingStrategy } from 'typeorm-naming-strategies'; import { BlockProgressInterface, EventInterface, SyncStatusInterface } from './types'; @@ -115,9 +115,10 @@ export class Database { } } - async markBlockAsPruned (repo: Repository, block: BlockProgressInterface): Promise { - block.isPruned = true; - return repo.save(block); + async markBlocksAsPruned (repo: Repository, blocks: BlockProgressInterface[]): Promise { + const ids = blocks.map(({ id }) => id); + + await repo.update({ id: In(ids) }, { isPruned: true }); } async getEvent (repo: Repository, id: string): Promise { @@ -204,4 +205,44 @@ export class Database { const entities = await repo.find(findConditions); await repo.remove(entities); } + + async getAncestorAtDepth (blockHash: string, depth: number): Promise { + const heirerchicalQuery = ` + WITH RECURSIVE cte_query AS + ( + SELECT + block_hash, + block_number, + parent_hash, + 0 as depth + FROM + block_progress + WHERE + block_hash = $1 + UNION ALL + SELECT + b.block_hash, + b.block_number, + b.parent_hash, + c.depth + 1 + FROM + block_progress b + INNER JOIN + cte_query c ON c.parent_hash = b.block_hash + WHERE + c.depth < $2 + ) + SELECT + block_hash, block_number + FROM + cte_query + ORDER BY block_number ASC + LIMIT 1; + `; + + // Get ancestor block hash using heirarchical query. + const [{ block_hash: ancestorBlockHash }] = await this._conn.query(heirerchicalQuery, [blockHash, depth]); + + return ancestorBlockHash; + } } diff --git a/packages/util/src/indexer.ts b/packages/util/src/indexer.ts index a792c968..5a01994f 100644 --- a/packages/util/src/indexer.ts +++ b/packages/util/src/indexer.ts @@ -102,39 +102,11 @@ export class Indexer { return this._db.getBlocksAtHeight(height, isPruned); } - async blockIsAncestor (ancestorBlockHash: string, blockHash: string, maxDepth: number): Promise { - assert(maxDepth > 0); - - let depth = 0; - let currentBlockHash = blockHash; - let currentBlock; - - // TODO: Use a hierarchical query to optimize this. - while (depth < maxDepth) { - depth++; - - currentBlock = await this._db.getBlockProgress(currentBlockHash); - if (!currentBlock) { - break; - } else { - if (currentBlock.parentHash === ancestorBlockHash) { - return true; - } - - // Descend the chain. - currentBlockHash = currentBlock.parentHash; - } - } - - return false; - } - - async markBlockAsPruned (block: BlockProgressInterface): Promise { + async markBlocksAsPruned (blocks: BlockProgressInterface[]): Promise { const dbTx = await this._db.createTransactionRunner(); - let res; try { - res = await this._db.markBlockAsPruned(dbTx, block); + await this._db.markBlocksAsPruned(dbTx, blocks); await dbTx.commitTransaction(); } catch (error) { await dbTx.rollbackTransaction(); @@ -142,8 +114,6 @@ export class Indexer { } finally { await dbTx.release(); } - - return res; } async updateBlockProgress (blockHash: string, lastProcessedEventIndex: number): Promise { @@ -185,4 +155,8 @@ export class Indexer { async getBlockEvents (blockHash: string): Promise> { return this._db.getBlockEvents(blockHash); } + + async getAncestorAtDepth (blockHash: string, depth: number): Promise { + return this._db.getAncestorAtDepth(blockHash, depth); + } } diff --git a/packages/util/src/job-runner.ts b/packages/util/src/job-runner.ts index 0cdb9b61..88eb4ea1 100644 --- a/packages/util/src/job-runner.ts +++ b/packages/util/src/job-runner.ts @@ -128,13 +128,14 @@ export class JobRunner { if (blocksAtHeight.length > 1) { const [indexedBlock] = await this._indexer.getBlocksAtHeight(pruneBlockHeight + MAX_REORG_DEPTH, false); - for (let i = 0; i < blocksAtHeight.length; i++) { - const block = blocksAtHeight[i]; - // If this block is not reachable from the indexed block at max reorg depth from prune height, mark it as pruned. - const isAncestor = await this._indexer.blockIsAncestor(block.blockHash, indexedBlock.blockHash, MAX_REORG_DEPTH); - if (!isAncestor) { - await this._indexer.markBlockAsPruned(block); - } + // Get ancestor blockHash from indexed block at prune height. + const ancestorBlockHash = await this._indexer.getAncestorAtDepth(indexedBlock.blockHash, MAX_REORG_DEPTH); + + const blocksToBePruned = blocksAtHeight.filter(block => ancestorBlockHash !== block.blockHash); + + if (blocksToBePruned.length) { + // Mark blocks pruned which are not the ancestor block. + await this._indexer.markBlocksAsPruned(blocksToBePruned); } } } diff --git a/packages/util/src/types.ts b/packages/util/src/types.ts index 6fc7782a..fe2cc3fa 100644 --- a/packages/util/src/types.ts +++ b/packages/util/src/types.ts @@ -46,12 +46,12 @@ export interface IndexerInterface { getBlock (blockHash: string): Promise getBlocksAtHeight (height: number, isPruned: boolean): Promise; getBlockEvents (blockHash: string): Promise> - blockIsAncestor (ancestorBlockHash: string, blockHash: string, maxDepth: number): Promise; + getAncestorAtDepth (blockHash: string, depth: number): Promise updateBlockProgress (blockHash: string, lastProcessedEventIndex: number): Promise updateSyncStatusChainHead (blockHash: string, blockNumber: number): Promise updateSyncStatusIndexedBlock (blockHash: string, blockNumber: number): Promise updateSyncStatusCanonicalBlock (blockHash: string, blockNumber: number): Promise - markBlockAsPruned (block: BlockProgressInterface): Promise; + markBlocksAsPruned (blocks: BlockProgressInterface[]): Promise; } export interface EventWatcherInterface { @@ -67,7 +67,8 @@ export interface DatabaseInterface { getBlockEvents (blockHash: string): Promise; getEvent (id: string): Promise getSyncStatus (queryRunner: QueryRunner): Promise - markBlockAsPruned (queryRunner: QueryRunner, block: BlockProgressInterface): Promise; + getAncestorAtDepth (blockHash: string, depth: number): Promise + markBlocksAsPruned (queryRunner: QueryRunner, blocks: BlockProgressInterface[]): Promise; updateBlockProgress (queryRunner: QueryRunner, blockHash: string, lastProcessedEventIndex: number): Promise updateSyncStatusIndexedBlock (queryRunner: QueryRunner, blockHash: string, blockNumber: number): Promise; updateSyncStatusChainHead (queryRunner: QueryRunner, blockHash: string, blockNumber: number): Promise;