mirror of
https://github.com/cerc-io/watcher-ts
synced 2025-07-28 19:12:06 +00:00
Implement heirarchical query for getting previous entity (#216)
* Implement heirarchical query for getting previous entity. * Check entity id in hierarchical query to limit number of recursions. Co-authored-by: nabarun <nabarun@deepstacksoft.com>
This commit is contained in:
parent
9e1ed70d18
commit
23bfcc02dc
87
packages/uni-info-watcher/src/database.md
Normal file
87
packages/uni-info-watcher/src/database.md
Normal file
@ -0,0 +1,87 @@
|
|||||||
|
# database
|
||||||
|
|
||||||
|
## Hierarchical Queries
|
||||||
|
|
||||||
|
For fetching previous entity that would be updated at a particular blockHash, we need to traverse the parent hashes. As the same entity might be present on a different branch chain with different values. These branches occur in the frothy region and so a recursive query is done to get the blockHash of the previous entity in this region.
|
||||||
|
|
||||||
|
Let the blockHash be `0xBlockHash` and the entity id be `entityId`, then the hierarchical query is
|
||||||
|
|
||||||
|
```pgsql
|
||||||
|
WITH RECURSIVE cte_query AS
|
||||||
|
(
|
||||||
|
SELECT
|
||||||
|
b.block_hash,
|
||||||
|
b.block_number,
|
||||||
|
b.parent_hash,
|
||||||
|
1 as depth,
|
||||||
|
e.id
|
||||||
|
FROM
|
||||||
|
block_progress b
|
||||||
|
LEFT JOIN
|
||||||
|
entityTable e ON e.block_hash = b.block_hash
|
||||||
|
WHERE
|
||||||
|
b.block_hash = '0xBlockHash'
|
||||||
|
UNION ALL
|
||||||
|
SELECT
|
||||||
|
b.block_hash,
|
||||||
|
b.block_number,
|
||||||
|
b.parent_hash,
|
||||||
|
c.depth + 1,
|
||||||
|
e.id
|
||||||
|
FROM
|
||||||
|
block_progress b
|
||||||
|
LEFT JOIN
|
||||||
|
${repo.metadata.tableName} e
|
||||||
|
ON e.block_hash = b.block_hash
|
||||||
|
AND e.id = 'entityId'
|
||||||
|
INNER JOIN
|
||||||
|
cte_query c ON c.parent_hash = b.block_hash
|
||||||
|
WHERE
|
||||||
|
c.id IS NULL AND c.depth < 16
|
||||||
|
)
|
||||||
|
SELECT
|
||||||
|
block_hash, block_number, id
|
||||||
|
FROM
|
||||||
|
cte_query
|
||||||
|
ORDER BY block_number ASC
|
||||||
|
LIMIT 1;
|
||||||
|
```
|
||||||
|
|
||||||
|
The second WHERE clause checks that the loop continues only till MAX_REORG_DEPTH `16` which specifies the frothy region or stop when the entity is found.
|
||||||
|
|
||||||
|
The resulting blockHash is then used to fetch the previous entity.
|
||||||
|
|
||||||
|
For fetching multiple entities, we fetch all the blockHashes in the frothy region. So it fetches the entities from the correct branch in the frothy and then from the pruned region.
|
||||||
|
|
||||||
|
Hierarchical query for getting blockHashes in the frothy region
|
||||||
|
|
||||||
|
```pgsql
|
||||||
|
WITH RECURSIVE cte_query AS
|
||||||
|
(
|
||||||
|
SELECT
|
||||||
|
block_hash,
|
||||||
|
block_number,
|
||||||
|
parent_hash,
|
||||||
|
1 as depth
|
||||||
|
FROM
|
||||||
|
block_progress
|
||||||
|
WHERE
|
||||||
|
block_hash = '0xBlockHash'
|
||||||
|
UNION ALL
|
||||||
|
SELECT
|
||||||
|
b.block_hash,
|
||||||
|
b.block_number,
|
||||||
|
b.parent_hash,
|
||||||
|
c.depth + 1
|
||||||
|
FROM
|
||||||
|
block_progress b
|
||||||
|
INNER JOIN
|
||||||
|
cte_query c ON c.parent_hash = b.block_hash
|
||||||
|
WHERE
|
||||||
|
c.depth < 16
|
||||||
|
)
|
||||||
|
SELECT
|
||||||
|
block_hash, block_number
|
||||||
|
FROM
|
||||||
|
cte_query;
|
||||||
|
```
|
@ -3,7 +3,7 @@
|
|||||||
//
|
//
|
||||||
|
|
||||||
import assert from 'assert';
|
import assert from 'assert';
|
||||||
import { Brackets, Connection, ConnectionOptions, createConnection, DeepPartial, FindConditions, FindOneOptions, In, LessThanOrEqual, QueryRunner, Repository } from 'typeorm';
|
import { Brackets, Connection, ConnectionOptions, createConnection, DeepPartial, FindConditions, FindOneOptions, LessThanOrEqual, QueryRunner, Repository } from 'typeorm';
|
||||||
import { SnakeNamingStrategy } from 'typeorm-naming-strategies';
|
import { SnakeNamingStrategy } from 'typeorm-naming-strategies';
|
||||||
import { MAX_REORG_DEPTH } from '@vulcanize/util';
|
import { MAX_REORG_DEPTH } from '@vulcanize/util';
|
||||||
|
|
||||||
@ -458,7 +458,7 @@ export class Database {
|
|||||||
.where(`subTable.id = ${tableName}.id`);
|
.where(`subTable.id = ${tableName}.id`);
|
||||||
|
|
||||||
if (block.hash) {
|
if (block.hash) {
|
||||||
const { canonicalBlockNumber, blockHashes } = await this._getBranchInfo(queryRunner, block.hash);
|
const { canonicalBlockNumber, blockHashes } = await this._getFrothyRegion(queryRunner, block.hash);
|
||||||
|
|
||||||
subQuery = subQuery
|
subQuery = subQuery
|
||||||
.andWhere(new Brackets(qb => {
|
.andWhere(new Brackets(qb => {
|
||||||
@ -783,38 +783,107 @@ export class Database {
|
|||||||
}
|
}
|
||||||
|
|
||||||
async _getPrevEntityVersion<Entity> (queryRunner: QueryRunner, repo: Repository<Entity>, findOptions: { [key: string]: any }): Promise<Entity | undefined> {
|
async _getPrevEntityVersion<Entity> (queryRunner: QueryRunner, repo: Repository<Entity>, findOptions: { [key: string]: any }): Promise<Entity | undefined> {
|
||||||
|
// Check whether query is ordered by blockNumber to get the latest entity.
|
||||||
assert(findOptions.order.blockNumber);
|
assert(findOptions.order.blockNumber);
|
||||||
const { canonicalBlockNumber, blockHashes } = await this._getBranchInfo(queryRunner, findOptions.where.blockHash);
|
|
||||||
findOptions.where.blockHash = In(blockHashes);
|
|
||||||
let entity = await repo.findOne(findOptions);
|
|
||||||
|
|
||||||
if (!entity) {
|
// Hierarchical query for getting the entity in the frothy region.
|
||||||
delete findOptions.where.blockHash;
|
// TODO: Use syncStatus.latestCanonicalBlockNumber instead of MAX_REORG_DEPTH after pruning is implemented.
|
||||||
findOptions.where.blockNumber = LessThanOrEqual(canonicalBlockNumber);
|
const heirerchicalQuery = `
|
||||||
entity = await repo.findOne(findOptions);
|
WITH RECURSIVE cte_query AS
|
||||||
|
(
|
||||||
|
SELECT
|
||||||
|
b.block_hash,
|
||||||
|
b.block_number,
|
||||||
|
b.parent_hash,
|
||||||
|
1 as depth,
|
||||||
|
e.id
|
||||||
|
FROM
|
||||||
|
block_progress b
|
||||||
|
LEFT JOIN
|
||||||
|
${repo.metadata.tableName} e ON e.block_hash = b.block_hash
|
||||||
|
WHERE
|
||||||
|
b.block_hash = $1
|
||||||
|
UNION ALL
|
||||||
|
SELECT
|
||||||
|
b.block_hash,
|
||||||
|
b.block_number,
|
||||||
|
b.parent_hash,
|
||||||
|
c.depth + 1,
|
||||||
|
e.id
|
||||||
|
FROM
|
||||||
|
block_progress b
|
||||||
|
LEFT JOIN
|
||||||
|
${repo.metadata.tableName} e
|
||||||
|
ON e.block_hash = b.block_hash
|
||||||
|
AND e.id = $2
|
||||||
|
INNER JOIN
|
||||||
|
cte_query c ON c.parent_hash = b.block_hash
|
||||||
|
WHERE
|
||||||
|
c.id IS NULL AND c.depth < $3
|
||||||
|
)
|
||||||
|
SELECT
|
||||||
|
block_hash, block_number, id
|
||||||
|
FROM
|
||||||
|
cte_query
|
||||||
|
ORDER BY block_number ASC
|
||||||
|
LIMIT 1;
|
||||||
|
`;
|
||||||
|
|
||||||
|
// Fetching blockHash for previous entity in frothy region.
|
||||||
|
const [{ block_hash: blockHash, block_number: blockNumber, id }] = await queryRunner.query(heirerchicalQuery, [findOptions.where.blockHash, findOptions.where.id, MAX_REORG_DEPTH]);
|
||||||
|
|
||||||
|
if (id) {
|
||||||
|
// Entity found in frothy region.
|
||||||
|
findOptions.where.blockHash = blockHash;
|
||||||
|
return repo.findOne(findOptions);
|
||||||
}
|
}
|
||||||
|
|
||||||
return entity;
|
// If entity not found in frothy region get latest entity in the pruned region.
|
||||||
|
delete findOptions.where.blockHash;
|
||||||
|
const canonicalBlockNumber = blockNumber + 1;
|
||||||
|
findOptions.where.blockNumber = LessThanOrEqual(canonicalBlockNumber);
|
||||||
|
return repo.findOne(findOptions);
|
||||||
}
|
}
|
||||||
|
|
||||||
async _getBranchInfo (queryRunner: QueryRunner, blockHash: string): Promise<{ canonicalBlockNumber: number, blockHashes: string[] }> {
|
async _getFrothyRegion (queryRunner: QueryRunner, blockHash: string): Promise<{ canonicalBlockNumber: number, blockHashes: string[] }> {
|
||||||
const blockRepo = queryRunner.manager.getRepository(BlockProgress);
|
// TODO: Use syncStatus.latestCanonicalBlockNumber instead of MAX_REORG_DEPTH after pruning is implemented.
|
||||||
let block = await blockRepo.findOne({ blockHash });
|
const heirerchicalQuery = `
|
||||||
assert(block);
|
WITH RECURSIVE cte_query AS
|
||||||
|
(
|
||||||
|
SELECT
|
||||||
|
block_hash,
|
||||||
|
block_number,
|
||||||
|
parent_hash,
|
||||||
|
1 as depth
|
||||||
|
FROM
|
||||||
|
block_progress
|
||||||
|
WHERE
|
||||||
|
block_hash = $1
|
||||||
|
UNION ALL
|
||||||
|
SELECT
|
||||||
|
b.block_hash,
|
||||||
|
b.block_number,
|
||||||
|
b.parent_hash,
|
||||||
|
c.depth + 1
|
||||||
|
FROM
|
||||||
|
block_progress b
|
||||||
|
INNER JOIN
|
||||||
|
cte_query c ON c.parent_hash = b.block_hash
|
||||||
|
WHERE
|
||||||
|
c.depth < $2
|
||||||
|
)
|
||||||
|
SELECT
|
||||||
|
block_hash, block_number
|
||||||
|
FROM
|
||||||
|
cte_query;
|
||||||
|
`;
|
||||||
|
|
||||||
// TODO: Should be calcualted from chainHeadBlockNumber?
|
// Get blocks in the frothy region using heirarchical query.
|
||||||
const canonicalBlockNumber = block.blockNumber - MAX_REORG_DEPTH;
|
const blocks = await queryRunner.query(heirerchicalQuery, [blockHash, MAX_REORG_DEPTH]);
|
||||||
|
const blockHashes = blocks.map(({ block_hash: blockHash }: any) => blockHash);
|
||||||
|
|
||||||
const syncStatus = await this.getSyncStatus(queryRunner);
|
// Canonical block is the block after the last block in frothy region.
|
||||||
assert(syncStatus);
|
const canonicalBlockNumber = blocks[blocks.length - 1].block_number + 1;
|
||||||
const blockHashes = [block.blockHash];
|
|
||||||
|
|
||||||
while (block.blockNumber > canonicalBlockNumber && block.blockNumber > syncStatus.latestCanonicalBlockNumber) {
|
|
||||||
blockHash = block.parentHash;
|
|
||||||
block = await blockRepo.findOne({ blockHash });
|
|
||||||
assert(block);
|
|
||||||
blockHashes.push(block.blockHash);
|
|
||||||
}
|
|
||||||
|
|
||||||
return { canonicalBlockNumber, blockHashes };
|
return { canonicalBlockNumber, blockHashes };
|
||||||
}
|
}
|
||||||
|
@ -260,7 +260,7 @@ export class Indexer {
|
|||||||
let res;
|
let res;
|
||||||
|
|
||||||
try {
|
try {
|
||||||
res = this._db.getBundle(dbTx, { id, blockHash: block.hash, blockNumber: block.number });
|
res = await this._db.getBundle(dbTx, { id, blockHash: block.hash, blockNumber: block.number });
|
||||||
await dbTx.commitTransaction();
|
await dbTx.commitTransaction();
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
await dbTx.rollbackTransaction();
|
await dbTx.rollbackTransaction();
|
||||||
@ -294,7 +294,7 @@ export class Indexer {
|
|||||||
let res;
|
let res;
|
||||||
|
|
||||||
try {
|
try {
|
||||||
res = this._db.getToken(dbTx, { id, blockHash: block.hash, blockNumber: block.number });
|
res = await this._db.getToken(dbTx, { id, blockHash: block.hash, blockNumber: block.number });
|
||||||
await dbTx.commitTransaction();
|
await dbTx.commitTransaction();
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
await dbTx.rollbackTransaction();
|
await dbTx.rollbackTransaction();
|
||||||
|
Loading…
Reference in New Issue
Block a user