mirror of
https://github.com/cerc-io/watcher-ts
synced 2024-11-19 12:26:19 +00:00
Implement hierarchical query for checking ancestor block during pruning (#225)
* Implement hierarchical query for checking ancestor block. * Prune blocks together in single query. Co-authored-by: nabarun <nabarun@deepstacksoft.com>
This commit is contained in:
parent
2ddedfefe7
commit
885a55e513
@ -725,10 +725,10 @@ export class Database implements DatabaseInterface {
|
||||
return this._baseDatabase.getBlocksAtHeight(repo, height, isPruned);
|
||||
}
|
||||
|
||||
async markBlockAsPruned (queryRunner: QueryRunner, block: BlockProgress): Promise<BlockProgress> {
|
||||
async markBlocksAsPruned (queryRunner: QueryRunner, blocks: BlockProgress[]): Promise<void> {
|
||||
const repo = queryRunner.manager.getRepository(BlockProgress);
|
||||
|
||||
return this._baseDatabase.markBlockAsPruned(repo, block);
|
||||
return this._baseDatabase.markBlocksAsPruned(repo, blocks);
|
||||
}
|
||||
|
||||
async getBlockProgress (blockHash: string): Promise<BlockProgress | undefined> {
|
||||
@ -742,6 +742,22 @@ export class Database implements DatabaseInterface {
|
||||
return this._baseDatabase.updateBlockProgress(repo, blockHash, lastProcessedEventIndex);
|
||||
}
|
||||
|
||||
async getEntities<Entity> (queryRunner: QueryRunner, entity: new () => Entity, findConditions?: FindConditions<Entity>): Promise<Entity[]> {
|
||||
return this._baseDatabase.getEntities(queryRunner, entity, findConditions);
|
||||
}
|
||||
|
||||
async removeEntities<Entity> (queryRunner: QueryRunner, entity: new () => Entity, findConditions?: FindConditions<Entity>): Promise<void> {
|
||||
return this._baseDatabase.removeEntities(queryRunner, entity, findConditions);
|
||||
}
|
||||
|
||||
async isEntityEmpty<Entity> (entity: new () => Entity): Promise<boolean> {
|
||||
return this._baseDatabase.isEntityEmpty(entity);
|
||||
}
|
||||
|
||||
async getAncestorAtDepth (blockHash: string, depth: number): Promise<string> {
|
||||
return this._baseDatabase.getAncestorAtDepth(blockHash, depth);
|
||||
}
|
||||
|
||||
async _getPrevEntityVersion<Entity> (queryRunner: QueryRunner, repo: Repository<Entity>, findOptions: { [key: string]: any }): Promise<Entity | undefined> {
|
||||
// Check whether query is ordered by blockNumber to get the latest entity.
|
||||
assert(findOptions.order.blockNumber);
|
||||
@ -847,16 +863,4 @@ export class Database implements DatabaseInterface {
|
||||
|
||||
return { canonicalBlockNumber, blockHashes };
|
||||
}
|
||||
|
||||
async getEntities<Entity> (queryRunner: QueryRunner, entity: new () => Entity, findConditions?: FindConditions<Entity>): Promise<Entity[]> {
|
||||
return this._baseDatabase.getEntities(queryRunner, entity, findConditions);
|
||||
}
|
||||
|
||||
async removeEntities<Entity> (queryRunner: QueryRunner, entity: new () => Entity, findConditions?: FindConditions<Entity>): Promise<void> {
|
||||
return this._baseDatabase.removeEntities(queryRunner, entity, findConditions);
|
||||
}
|
||||
|
||||
async isEntityEmpty<Entity> (entity: new () => Entity): Promise<boolean> {
|
||||
return this._baseDatabase.isEntityEmpty(entity);
|
||||
}
|
||||
}
|
||||
|
@ -215,12 +215,8 @@ export class Indexer implements IndexerInterface {
|
||||
return this._baseIndexer.getBlocksAtHeight(height, isPruned);
|
||||
}
|
||||
|
||||
async blockIsAncestor (ancestorBlockHash: string, blockHash: string, maxDepth: number): Promise<boolean> {
|
||||
return this._baseIndexer.blockIsAncestor(ancestorBlockHash, blockHash, maxDepth);
|
||||
}
|
||||
|
||||
async markBlockAsPruned (block: BlockProgress): Promise<BlockProgress> {
|
||||
return this._baseIndexer.markBlockAsPruned(block);
|
||||
async markBlocksAsPruned (blocks: BlockProgress[]): Promise<void> {
|
||||
return this._baseIndexer.markBlocksAsPruned(blocks);
|
||||
}
|
||||
|
||||
async updateBlockProgress (blockHash: string, lastProcessedEventIndex: number): Promise<void> {
|
||||
@ -324,6 +320,10 @@ export class Indexer implements IndexerInterface {
|
||||
return res;
|
||||
}
|
||||
|
||||
async getAncestorAtDepth (blockHash: string, depth: number): Promise<string> {
|
||||
return this._baseIndexer.getAncestorAtDepth(blockHash, depth);
|
||||
}
|
||||
|
||||
async _fetchAndSaveEvents (block: DeepPartial<BlockProgress>): Promise<void> {
|
||||
assert(block.blockHash);
|
||||
const events = await this._uniClient.getEvents(block.blockHash);
|
||||
|
@ -148,10 +148,10 @@ export class Database implements DatabaseInterface {
|
||||
return this._baseDatabase.getBlocksAtHeight(repo, height, isPruned);
|
||||
}
|
||||
|
||||
async markBlockAsPruned (queryRunner: QueryRunner, block: BlockProgress): Promise<BlockProgress> {
|
||||
async markBlocksAsPruned (queryRunner: QueryRunner, blocks: BlockProgress[]): Promise<void> {
|
||||
const repo = queryRunner.manager.getRepository(BlockProgress);
|
||||
|
||||
return this._baseDatabase.markBlockAsPruned(repo, block);
|
||||
return this._baseDatabase.markBlocksAsPruned(repo, blocks);
|
||||
}
|
||||
|
||||
async getBlockProgress (blockHash: string): Promise<BlockProgress | undefined> {
|
||||
@ -176,4 +176,8 @@ export class Database implements DatabaseInterface {
|
||||
async isEntityEmpty<Entity> (entity: new () => Entity): Promise<boolean> {
|
||||
return this._baseDatabase.isEntityEmpty(entity);
|
||||
}
|
||||
|
||||
async getAncestorAtDepth (blockHash: string, depth: number): Promise<string> {
|
||||
return this._baseDatabase.getAncestorAtDepth(blockHash, depth);
|
||||
}
|
||||
}
|
||||
|
@ -445,12 +445,8 @@ export class Indexer implements IndexerInterface {
|
||||
return this._baseIndexer.getBlocksAtHeight(height, isPruned);
|
||||
}
|
||||
|
||||
async blockIsAncestor (ancestorBlockHash: string, blockHash: string, maxDepth: number): Promise<boolean> {
|
||||
return this._baseIndexer.blockIsAncestor(ancestorBlockHash, blockHash, maxDepth);
|
||||
}
|
||||
|
||||
async markBlockAsPruned (block: BlockProgress): Promise<BlockProgress> {
|
||||
return this._baseIndexer.markBlockAsPruned(block);
|
||||
async markBlocksAsPruned (blocks: BlockProgress[]): Promise<void> {
|
||||
return this._baseIndexer.markBlocksAsPruned(blocks);
|
||||
}
|
||||
|
||||
async updateBlockProgress (blockHash: string, lastProcessedEventIndex: number): Promise<void> {
|
||||
@ -506,6 +502,15 @@ export class Indexer implements IndexerInterface {
|
||||
};
|
||||
}
|
||||
|
||||
async getContract (type: string): Promise<any> {
|
||||
const contract = await this._db.getLatestContract(type);
|
||||
return contract;
|
||||
}
|
||||
|
||||
async getAncestorAtDepth (blockHash: string, depth: number): Promise<string> {
|
||||
return this._baseIndexer.getAncestorAtDepth(blockHash, depth);
|
||||
}
|
||||
|
||||
// TODO: Move into base/class or framework package.
|
||||
async _getStorageValue (storageLayout: StorageLayout, blockHash: string, token: string, variable: string, ...mappingKeys: any[]): Promise<ValueResult> {
|
||||
return getStorageValue(
|
||||
@ -517,9 +522,4 @@ export class Indexer implements IndexerInterface {
|
||||
...mappingKeys
|
||||
);
|
||||
}
|
||||
|
||||
async getContract (type: string): Promise<any> {
|
||||
const contract = await this._db.getLatestContract(type);
|
||||
return contract;
|
||||
}
|
||||
}
|
||||
|
@ -3,7 +3,7 @@
|
||||
//
|
||||
|
||||
import assert from 'assert';
|
||||
import { Connection, ConnectionOptions, createConnection, DeepPartial, FindConditions, QueryRunner, Repository } from 'typeorm';
|
||||
import { Connection, ConnectionOptions, createConnection, DeepPartial, FindConditions, In, QueryRunner, Repository } from 'typeorm';
|
||||
import { SnakeNamingStrategy } from 'typeorm-naming-strategies';
|
||||
|
||||
import { BlockProgressInterface, EventInterface, SyncStatusInterface } from './types';
|
||||
@ -115,9 +115,10 @@ export class Database {
|
||||
}
|
||||
}
|
||||
|
||||
async markBlockAsPruned (repo: Repository<BlockProgressInterface>, block: BlockProgressInterface): Promise<BlockProgressInterface> {
|
||||
block.isPruned = true;
|
||||
return repo.save(block);
|
||||
async markBlocksAsPruned (repo: Repository<BlockProgressInterface>, blocks: BlockProgressInterface[]): Promise<void> {
|
||||
const ids = blocks.map(({ id }) => id);
|
||||
|
||||
await repo.update({ id: In(ids) }, { isPruned: true });
|
||||
}
|
||||
|
||||
async getEvent (repo: Repository<EventInterface>, id: string): Promise<EventInterface | undefined> {
|
||||
@ -204,4 +205,44 @@ export class Database {
|
||||
const entities = await repo.find(findConditions);
|
||||
await repo.remove(entities);
|
||||
}
|
||||
|
||||
async getAncestorAtDepth (blockHash: string, depth: number): Promise<string> {
|
||||
const heirerchicalQuery = `
|
||||
WITH RECURSIVE cte_query AS
|
||||
(
|
||||
SELECT
|
||||
block_hash,
|
||||
block_number,
|
||||
parent_hash,
|
||||
0 as depth
|
||||
FROM
|
||||
block_progress
|
||||
WHERE
|
||||
block_hash = $1
|
||||
UNION ALL
|
||||
SELECT
|
||||
b.block_hash,
|
||||
b.block_number,
|
||||
b.parent_hash,
|
||||
c.depth + 1
|
||||
FROM
|
||||
block_progress b
|
||||
INNER JOIN
|
||||
cte_query c ON c.parent_hash = b.block_hash
|
||||
WHERE
|
||||
c.depth < $2
|
||||
)
|
||||
SELECT
|
||||
block_hash, block_number
|
||||
FROM
|
||||
cte_query
|
||||
ORDER BY block_number ASC
|
||||
LIMIT 1;
|
||||
`;
|
||||
|
||||
// Get ancestor block hash using heirarchical query.
|
||||
const [{ block_hash: ancestorBlockHash }] = await this._conn.query(heirerchicalQuery, [blockHash, depth]);
|
||||
|
||||
return ancestorBlockHash;
|
||||
}
|
||||
}
|
||||
|
@ -102,39 +102,11 @@ export class Indexer {
|
||||
return this._db.getBlocksAtHeight(height, isPruned);
|
||||
}
|
||||
|
||||
async blockIsAncestor (ancestorBlockHash: string, blockHash: string, maxDepth: number): Promise<boolean> {
|
||||
assert(maxDepth > 0);
|
||||
|
||||
let depth = 0;
|
||||
let currentBlockHash = blockHash;
|
||||
let currentBlock;
|
||||
|
||||
// TODO: Use a hierarchical query to optimize this.
|
||||
while (depth < maxDepth) {
|
||||
depth++;
|
||||
|
||||
currentBlock = await this._db.getBlockProgress(currentBlockHash);
|
||||
if (!currentBlock) {
|
||||
break;
|
||||
} else {
|
||||
if (currentBlock.parentHash === ancestorBlockHash) {
|
||||
return true;
|
||||
}
|
||||
|
||||
// Descend the chain.
|
||||
currentBlockHash = currentBlock.parentHash;
|
||||
}
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
async markBlockAsPruned (block: BlockProgressInterface): Promise<BlockProgressInterface> {
|
||||
async markBlocksAsPruned (blocks: BlockProgressInterface[]): Promise<void> {
|
||||
const dbTx = await this._db.createTransactionRunner();
|
||||
let res;
|
||||
|
||||
try {
|
||||
res = await this._db.markBlockAsPruned(dbTx, block);
|
||||
await this._db.markBlocksAsPruned(dbTx, blocks);
|
||||
await dbTx.commitTransaction();
|
||||
} catch (error) {
|
||||
await dbTx.rollbackTransaction();
|
||||
@ -142,8 +114,6 @@ export class Indexer {
|
||||
} finally {
|
||||
await dbTx.release();
|
||||
}
|
||||
|
||||
return res;
|
||||
}
|
||||
|
||||
async updateBlockProgress (blockHash: string, lastProcessedEventIndex: number): Promise<void> {
|
||||
@ -185,4 +155,8 @@ export class Indexer {
|
||||
async getBlockEvents (blockHash: string): Promise<Array<EventInterface>> {
|
||||
return this._db.getBlockEvents(blockHash);
|
||||
}
|
||||
|
||||
async getAncestorAtDepth (blockHash: string, depth: number): Promise<string> {
|
||||
return this._db.getAncestorAtDepth(blockHash, depth);
|
||||
}
|
||||
}
|
||||
|
@ -128,13 +128,14 @@ export class JobRunner {
|
||||
if (blocksAtHeight.length > 1) {
|
||||
const [indexedBlock] = await this._indexer.getBlocksAtHeight(pruneBlockHeight + MAX_REORG_DEPTH, false);
|
||||
|
||||
for (let i = 0; i < blocksAtHeight.length; i++) {
|
||||
const block = blocksAtHeight[i];
|
||||
// If this block is not reachable from the indexed block at max reorg depth from prune height, mark it as pruned.
|
||||
const isAncestor = await this._indexer.blockIsAncestor(block.blockHash, indexedBlock.blockHash, MAX_REORG_DEPTH);
|
||||
if (!isAncestor) {
|
||||
await this._indexer.markBlockAsPruned(block);
|
||||
}
|
||||
// Get ancestor blockHash from indexed block at prune height.
|
||||
const ancestorBlockHash = await this._indexer.getAncestorAtDepth(indexedBlock.blockHash, MAX_REORG_DEPTH);
|
||||
|
||||
const blocksToBePruned = blocksAtHeight.filter(block => ancestorBlockHash !== block.blockHash);
|
||||
|
||||
if (blocksToBePruned.length) {
|
||||
// Mark blocks pruned which are not the ancestor block.
|
||||
await this._indexer.markBlocksAsPruned(blocksToBePruned);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -46,12 +46,12 @@ export interface IndexerInterface {
|
||||
getBlock (blockHash: string): Promise<any>
|
||||
getBlocksAtHeight (height: number, isPruned: boolean): Promise<BlockProgressInterface[]>;
|
||||
getBlockEvents (blockHash: string): Promise<Array<EventInterface>>
|
||||
blockIsAncestor (ancestorBlockHash: string, blockHash: string, maxDepth: number): Promise<boolean>;
|
||||
getAncestorAtDepth (blockHash: string, depth: number): Promise<string>
|
||||
updateBlockProgress (blockHash: string, lastProcessedEventIndex: number): Promise<void>
|
||||
updateSyncStatusChainHead (blockHash: string, blockNumber: number): Promise<SyncStatusInterface>
|
||||
updateSyncStatusIndexedBlock (blockHash: string, blockNumber: number): Promise<SyncStatusInterface>
|
||||
updateSyncStatusCanonicalBlock (blockHash: string, blockNumber: number): Promise<SyncStatusInterface>
|
||||
markBlockAsPruned (block: BlockProgressInterface): Promise<BlockProgressInterface>;
|
||||
markBlocksAsPruned (blocks: BlockProgressInterface[]): Promise<void>;
|
||||
}
|
||||
|
||||
export interface EventWatcherInterface {
|
||||
@ -67,7 +67,8 @@ export interface DatabaseInterface {
|
||||
getBlockEvents (blockHash: string): Promise<EventInterface[]>;
|
||||
getEvent (id: string): Promise<EventInterface | undefined>
|
||||
getSyncStatus (queryRunner: QueryRunner): Promise<SyncStatusInterface | undefined>
|
||||
markBlockAsPruned (queryRunner: QueryRunner, block: BlockProgressInterface): Promise<BlockProgressInterface>;
|
||||
getAncestorAtDepth (blockHash: string, depth: number): Promise<string>
|
||||
markBlocksAsPruned (queryRunner: QueryRunner, blocks: BlockProgressInterface[]): Promise<void>;
|
||||
updateBlockProgress (queryRunner: QueryRunner, blockHash: string, lastProcessedEventIndex: number): Promise<void>
|
||||
updateSyncStatusIndexedBlock (queryRunner: QueryRunner, blockHash: string, blockNumber: number): Promise<SyncStatusInterface>;
|
||||
updateSyncStatusChainHead (queryRunner: QueryRunner, blockHash: string, blockNumber: number): Promise<SyncStatusInterface>;
|
||||
|
Loading…
Reference in New Issue
Block a user