diff --git a/packages/uni-info-watcher/src/database.md b/packages/uni-info-watcher/src/database.md new file mode 100644 index 00000000..5e38e6c7 --- /dev/null +++ b/packages/uni-info-watcher/src/database.md @@ -0,0 +1,87 @@ +# database + +## Hierarchical Queries + +For fetching previous entity that would be updated at a particular blockHash, we need to traverse the parent hashes. As the same entity might be present on a different branch chain with different values. These branches occur in the frothy region and so a recursive query is done to get the blockHash of the previous entity in this region. + +Let the blockHash be `0xBlockHash` and the entity id be `entityId`, then the hierarchical query is + +```pgsql +WITH RECURSIVE cte_query AS +( + SELECT + b.block_hash, + b.block_number, + b.parent_hash, + 1 as depth, + e.id + FROM + block_progress b + LEFT JOIN + entityTable e ON e.block_hash = b.block_hash + WHERE + b.block_hash = '0xBlockHash' + UNION ALL + SELECT + b.block_hash, + b.block_number, + b.parent_hash, + c.depth + 1, + e.id + FROM + block_progress b + LEFT JOIN + ${repo.metadata.tableName} e + ON e.block_hash = b.block_hash + AND e.id = 'entityId' + INNER JOIN + cte_query c ON c.parent_hash = b.block_hash + WHERE + c.id IS NULL AND c.depth < 16 +) +SELECT + block_hash, block_number, id +FROM + cte_query +ORDER BY block_number ASC +LIMIT 1; +``` + +The second WHERE clause checks that the loop continues only till MAX_REORG_DEPTH `16` which specifies the frothy region or stop when the entity is found. + +The resulting blockHash is then used to fetch the previous entity. + +For fetching multiple entities, we fetch all the blockHashes in the frothy region. So it fetches the entities from the correct branch in the frothy and then from the pruned region. + +Hierarchical query for getting blockHashes in the frothy region + +```pgsql +WITH RECURSIVE cte_query AS +( + SELECT + block_hash, + block_number, + parent_hash, + 1 as depth + FROM + block_progress + WHERE + block_hash = '0xBlockHash' + UNION ALL + SELECT + b.block_hash, + b.block_number, + b.parent_hash, + c.depth + 1 + FROM + block_progress b + INNER JOIN + cte_query c ON c.parent_hash = b.block_hash + WHERE + c.depth < 16 +) +SELECT + block_hash, block_number +FROM + cte_query; +``` diff --git a/packages/uni-info-watcher/src/database.ts b/packages/uni-info-watcher/src/database.ts index 2b7128f1..74fd16ca 100644 --- a/packages/uni-info-watcher/src/database.ts +++ b/packages/uni-info-watcher/src/database.ts @@ -3,7 +3,7 @@ // import assert from 'assert'; -import { Brackets, Connection, ConnectionOptions, createConnection, DeepPartial, FindConditions, FindOneOptions, In, LessThanOrEqual, QueryRunner, Repository } from 'typeorm'; +import { Brackets, Connection, ConnectionOptions, createConnection, DeepPartial, FindConditions, FindOneOptions, LessThanOrEqual, QueryRunner, Repository } from 'typeorm'; import { SnakeNamingStrategy } from 'typeorm-naming-strategies'; import { MAX_REORG_DEPTH } from '@vulcanize/util'; @@ -458,7 +458,7 @@ export class Database { .where(`subTable.id = ${tableName}.id`); if (block.hash) { - const { canonicalBlockNumber, blockHashes } = await this._getBranchInfo(queryRunner, block.hash); + const { canonicalBlockNumber, blockHashes } = await this._getFrothyRegion(queryRunner, block.hash); subQuery = subQuery .andWhere(new Brackets(qb => { @@ -783,38 +783,107 @@ export class Database { } async _getPrevEntityVersion (queryRunner: QueryRunner, repo: Repository, findOptions: { [key: string]: any }): Promise { + // Check whether query is ordered by blockNumber to get the latest entity. assert(findOptions.order.blockNumber); - const { canonicalBlockNumber, blockHashes } = await this._getBranchInfo(queryRunner, findOptions.where.blockHash); - findOptions.where.blockHash = In(blockHashes); - let entity = await repo.findOne(findOptions); - if (!entity) { - delete findOptions.where.blockHash; - findOptions.where.blockNumber = LessThanOrEqual(canonicalBlockNumber); - entity = await repo.findOne(findOptions); + // Hierarchical query for getting the entity in the frothy region. + // TODO: Use syncStatus.latestCanonicalBlockNumber instead of MAX_REORG_DEPTH after pruning is implemented. + const heirerchicalQuery = ` + WITH RECURSIVE cte_query AS + ( + SELECT + b.block_hash, + b.block_number, + b.parent_hash, + 1 as depth, + e.id + FROM + block_progress b + LEFT JOIN + ${repo.metadata.tableName} e ON e.block_hash = b.block_hash + WHERE + b.block_hash = $1 + UNION ALL + SELECT + b.block_hash, + b.block_number, + b.parent_hash, + c.depth + 1, + e.id + FROM + block_progress b + LEFT JOIN + ${repo.metadata.tableName} e + ON e.block_hash = b.block_hash + AND e.id = $2 + INNER JOIN + cte_query c ON c.parent_hash = b.block_hash + WHERE + c.id IS NULL AND c.depth < $3 + ) + SELECT + block_hash, block_number, id + FROM + cte_query + ORDER BY block_number ASC + LIMIT 1; + `; + + // Fetching blockHash for previous entity in frothy region. + const [{ block_hash: blockHash, block_number: blockNumber, id }] = await queryRunner.query(heirerchicalQuery, [findOptions.where.blockHash, findOptions.where.id, MAX_REORG_DEPTH]); + + if (id) { + // Entity found in frothy region. + findOptions.where.blockHash = blockHash; + return repo.findOne(findOptions); } - return entity; + // If entity not found in frothy region get latest entity in the pruned region. + delete findOptions.where.blockHash; + const canonicalBlockNumber = blockNumber + 1; + findOptions.where.blockNumber = LessThanOrEqual(canonicalBlockNumber); + return repo.findOne(findOptions); } - async _getBranchInfo (queryRunner: QueryRunner, blockHash: string): Promise<{ canonicalBlockNumber: number, blockHashes: string[] }> { - const blockRepo = queryRunner.manager.getRepository(BlockProgress); - let block = await blockRepo.findOne({ blockHash }); - assert(block); + async _getFrothyRegion (queryRunner: QueryRunner, blockHash: string): Promise<{ canonicalBlockNumber: number, blockHashes: string[] }> { + // TODO: Use syncStatus.latestCanonicalBlockNumber instead of MAX_REORG_DEPTH after pruning is implemented. + const heirerchicalQuery = ` + WITH RECURSIVE cte_query AS + ( + SELECT + block_hash, + block_number, + parent_hash, + 1 as depth + FROM + block_progress + WHERE + block_hash = $1 + UNION ALL + SELECT + b.block_hash, + b.block_number, + b.parent_hash, + c.depth + 1 + FROM + block_progress b + INNER JOIN + cte_query c ON c.parent_hash = b.block_hash + WHERE + c.depth < $2 + ) + SELECT + block_hash, block_number + FROM + cte_query; + `; - // TODO: Should be calcualted from chainHeadBlockNumber? - const canonicalBlockNumber = block.blockNumber - MAX_REORG_DEPTH; + // Get blocks in the frothy region using heirarchical query. + const blocks = await queryRunner.query(heirerchicalQuery, [blockHash, MAX_REORG_DEPTH]); + const blockHashes = blocks.map(({ block_hash: blockHash }: any) => blockHash); - const syncStatus = await this.getSyncStatus(queryRunner); - assert(syncStatus); - const blockHashes = [block.blockHash]; - - while (block.blockNumber > canonicalBlockNumber && block.blockNumber > syncStatus.latestCanonicalBlockNumber) { - blockHash = block.parentHash; - block = await blockRepo.findOne({ blockHash }); - assert(block); - blockHashes.push(block.blockHash); - } + // Canonical block is the block after the last block in frothy region. + const canonicalBlockNumber = blocks[blocks.length - 1].block_number + 1; return { canonicalBlockNumber, blockHashes }; } diff --git a/packages/uni-info-watcher/src/indexer.ts b/packages/uni-info-watcher/src/indexer.ts index 51d33c54..6faf0ae5 100644 --- a/packages/uni-info-watcher/src/indexer.ts +++ b/packages/uni-info-watcher/src/indexer.ts @@ -260,7 +260,7 @@ export class Indexer { let res; try { - res = this._db.getBundle(dbTx, { id, blockHash: block.hash, blockNumber: block.number }); + res = await this._db.getBundle(dbTx, { id, blockHash: block.hash, blockNumber: block.number }); await dbTx.commitTransaction(); } catch (error) { await dbTx.rollbackTransaction(); @@ -294,7 +294,7 @@ export class Indexer { let res; try { - res = this._db.getToken(dbTx, { id, blockHash: block.hash, blockNumber: block.number }); + res = await this._db.getToken(dbTx, { id, blockHash: block.hash, blockNumber: block.number }); await dbTx.commitTransaction(); } catch (error) { await dbTx.rollbackTransaction();