mirror of
https://github.com/cerc-io/watcher-ts
synced 2024-11-19 20:36:19 +00:00
Denormalize is_pruned flag in eden-watcher (#230)
* Add is_pruned flag to all entities * Mark entities as pruned once the corresponding block gets pruned
This commit is contained in:
parent
62c57d8005
commit
7e5974ccf7
@ -14,6 +14,25 @@ import { SyncStatus } from './entity/SyncStatus';
|
||||
import { StateSyncStatus } from './entity/StateSyncStatus';
|
||||
import { BlockProgress } from './entity/BlockProgress';
|
||||
import { State } from './entity/State';
|
||||
import { Account } from './entity/Account';
|
||||
import { Claim } from './entity/Claim';
|
||||
import { Distribution } from './entity/Distribution';
|
||||
import { Distributor } from './entity/Distributor';
|
||||
import { Epoch } from './entity/Epoch';
|
||||
import { Network } from './entity/Network';
|
||||
import { Producer } from './entity/Producer';
|
||||
import { ProducerEpoch } from './entity/ProducerEpoch';
|
||||
import { ProducerRewardCollectorChange } from './entity/ProducerRewardCollectorChange';
|
||||
import { ProducerSet } from './entity/ProducerSet';
|
||||
import { ProducerSetChange } from './entity/ProducerSetChange';
|
||||
import { RewardSchedule } from './entity/RewardSchedule';
|
||||
import { RewardScheduleEntry } from './entity/RewardScheduleEntry';
|
||||
import { Slash } from './entity/Slash';
|
||||
import { Slot } from './entity/Slot';
|
||||
import { SlotClaim } from './entity/SlotClaim';
|
||||
import { Staker } from './entity/Staker';
|
||||
|
||||
export const ENTITIES = new Set([Account, Claim, Distribution, Distributor, Epoch, Network, Producer, ProducerEpoch, ProducerRewardCollectorChange, ProducerSet, ProducerSetChange, RewardSchedule, RewardScheduleEntry, Slash, Slot, SlotClaim, Staker]);
|
||||
|
||||
export class Database implements DatabaseInterface {
|
||||
_config: ConnectionOptions;
|
||||
|
@ -23,4 +23,7 @@ export class Account {
|
||||
|
||||
@Column('numeric', { transformer: bigintTransformer })
|
||||
totalSlashed!: bigint;
|
||||
|
||||
@Column('boolean', { default: false })
|
||||
isPruned!: boolean
|
||||
}
|
||||
|
@ -61,4 +61,7 @@ export class Block {
|
||||
|
||||
@Column('numeric', { nullable: true, transformer: bigintTransformer })
|
||||
size!: bigint;
|
||||
|
||||
@Column('boolean', { default: false })
|
||||
isPruned!: boolean
|
||||
}
|
||||
|
@ -32,4 +32,7 @@ export class Claim {
|
||||
|
||||
@Column('numeric', { transformer: bigintTransformer })
|
||||
claimed!: bigint;
|
||||
|
||||
@Column('boolean', { default: false })
|
||||
isPruned!: boolean
|
||||
}
|
||||
|
@ -32,4 +32,7 @@ export class Distribution {
|
||||
|
||||
@Column('varchar')
|
||||
metadataURI!: string;
|
||||
|
||||
@Column('boolean', { default: false })
|
||||
isPruned!: boolean
|
||||
}
|
||||
|
@ -18,4 +18,7 @@ export class Distributor {
|
||||
|
||||
@Column('varchar', { nullable: true })
|
||||
currentDistribution!: string;
|
||||
|
||||
@Column('boolean', { default: false })
|
||||
isPruned!: boolean
|
||||
}
|
||||
|
@ -39,4 +39,7 @@ export class Epoch {
|
||||
|
||||
@Column('numeric', { default: 0, transformer: decimalTransformer })
|
||||
producerBlocksRatio!: Decimal;
|
||||
|
||||
@Column('boolean', { default: false })
|
||||
isPruned!: boolean
|
||||
}
|
||||
|
@ -39,4 +39,7 @@ export class Network {
|
||||
// https://github.com/brianc/node-postgres/issues/1943#issuecomment-520500053
|
||||
@Column('varchar', { transformer: bigintArrayTransformer, array: true })
|
||||
stakedPercentiles!: bigint[];
|
||||
|
||||
@Column('boolean', { default: false })
|
||||
isPruned!: boolean
|
||||
}
|
||||
|
@ -31,4 +31,7 @@ export class Producer {
|
||||
|
||||
@Column('numeric', { transformer: bigintTransformer })
|
||||
pendingEpochBlocks!: bigint;
|
||||
|
||||
@Column('boolean', { default: false })
|
||||
isPruned!: boolean
|
||||
}
|
||||
|
@ -33,4 +33,7 @@ export class ProducerEpoch {
|
||||
|
||||
@Column('numeric', { default: 0, transformer: decimalTransformer })
|
||||
blocksProducedRatio!: Decimal;
|
||||
|
||||
@Column('boolean', { default: false })
|
||||
isPruned!: boolean
|
||||
}
|
||||
|
@ -25,4 +25,7 @@ export class ProducerRewardCollectorChange {
|
||||
|
||||
@Column('varchar')
|
||||
rewardCollector!: string;
|
||||
|
||||
@Column('boolean', { default: false })
|
||||
isPruned!: boolean
|
||||
}
|
||||
|
@ -18,4 +18,7 @@ export class ProducerSet {
|
||||
|
||||
@Column('varchar', { array: true })
|
||||
producers!: string[];
|
||||
|
||||
@Column('boolean', { default: false })
|
||||
isPruned!: boolean
|
||||
}
|
||||
|
@ -33,4 +33,7 @@ export class ProducerSetChange {
|
||||
enum: ProducerSetChangeType
|
||||
})
|
||||
changeType!: ProducerSetChangeType;
|
||||
|
||||
@Column('boolean', { default: false })
|
||||
isPruned!: boolean
|
||||
}
|
||||
|
@ -27,4 +27,7 @@ export class RewardSchedule {
|
||||
|
||||
@Column('varchar', { nullable: true })
|
||||
activeRewardScheduleEntry!: string;
|
||||
|
||||
@Column('boolean', { default: false })
|
||||
isPruned!: boolean
|
||||
}
|
||||
|
@ -25,4 +25,7 @@ export class RewardScheduleEntry {
|
||||
|
||||
@Column('numeric', { transformer: bigintTransformer })
|
||||
rewardsPerEpoch!: bigint;
|
||||
|
||||
@Column('boolean', { default: false })
|
||||
isPruned!: boolean
|
||||
}
|
||||
|
@ -26,4 +26,7 @@ export class Slash {
|
||||
|
||||
@Column('numeric', { transformer: bigintTransformer })
|
||||
slashed!: bigint;
|
||||
|
||||
@Column('boolean', { default: false })
|
||||
isPruned!: boolean
|
||||
}
|
||||
|
@ -39,4 +39,7 @@ export class Slot {
|
||||
|
||||
@Column('numeric', { default: 0, transformer: decimalTransformer })
|
||||
taxRatePerDay!: Decimal;
|
||||
|
||||
@Column('boolean', { default: false })
|
||||
isPruned!: boolean
|
||||
}
|
||||
|
@ -39,4 +39,7 @@ export class SlotClaim {
|
||||
|
||||
@Column('numeric', { default: 0, transformer: decimalTransformer })
|
||||
taxRatePerDay!: Decimal;
|
||||
|
||||
@Column('boolean', { default: false })
|
||||
isPruned!: boolean
|
||||
}
|
||||
|
@ -22,4 +22,7 @@ export class Staker {
|
||||
|
||||
@Column('numeric', { nullable: true, transformer: bigintTransformer })
|
||||
rank!: bigint;
|
||||
|
||||
@Column('boolean', { default: false })
|
||||
isPruned!: boolean
|
||||
}
|
||||
|
@ -28,7 +28,7 @@ import {
|
||||
} from '@cerc-io/util';
|
||||
import { GraphWatcher } from '@cerc-io/graph-node';
|
||||
|
||||
import { Database } from './database';
|
||||
import { Database, ENTITIES } from './database';
|
||||
import { Contract } from './entity/Contract';
|
||||
import { Event } from './entity/Event';
|
||||
import { SyncStatus } from './entity/SyncStatus';
|
||||
@ -476,7 +476,9 @@ export class Indexer implements IndexerInterface {
|
||||
}
|
||||
|
||||
async markBlocksAsPruned (blocks: BlockProgress[]): Promise<void> {
|
||||
return this._baseIndexer.markBlocksAsPruned(blocks);
|
||||
await this._baseIndexer.markBlocksAsPruned(blocks);
|
||||
|
||||
await this._graphWatcher.pruneEntities(blocks, ENTITIES);
|
||||
}
|
||||
|
||||
async updateBlockProgress (block: BlockProgress, lastProcessedEventIndex: number): Promise<BlockProgress> {
|
||||
|
@ -8,10 +8,12 @@ import {
|
||||
Connection,
|
||||
ConnectionOptions,
|
||||
FindOneOptions,
|
||||
In,
|
||||
LessThanOrEqual,
|
||||
QueryRunner,
|
||||
Repository,
|
||||
SelectQueryBuilder
|
||||
SelectQueryBuilder,
|
||||
UpdateResult
|
||||
} from 'typeorm';
|
||||
import { ColumnMetadata } from 'typeorm/metadata/ColumnMetadata';
|
||||
import { RawSqlResultsToEntityTransformer } from 'typeorm/query-builder/transformer/RawSqlResultsToEntityTransformer';
|
||||
@ -1032,6 +1034,10 @@ export class Database {
|
||||
const entityValuePromises = entityFields.map(async (field: any) => {
|
||||
const { propertyName } = field;
|
||||
|
||||
if (propertyName === 'isPruned') {
|
||||
return undefined;
|
||||
}
|
||||
|
||||
// Get blockHash property for db entry from block instance.
|
||||
if (propertyName === 'blockHash') {
|
||||
return block.blockHash;
|
||||
@ -1191,14 +1197,6 @@ export class Database {
|
||||
prunedBlockHashes.forEach(blockHash => this.cachedEntities.frothyBlocks.delete(blockHash));
|
||||
}
|
||||
|
||||
_measureCachedPrunedEntities () {
|
||||
const totalEntities = Array.from(this.cachedEntities.latestPrunedEntities.values())
|
||||
.reduce((acc, idEntitiesMap) => acc + idEntitiesMap.size, 0);
|
||||
|
||||
log(`Total entities in cachedEntities.latestPrunedEntities map: ${totalEntities}`);
|
||||
cachePrunedEntitiesCount.set(totalEntities);
|
||||
}
|
||||
|
||||
async transformResults<Entity> (queryRunner: QueryRunner, qb: SelectQueryBuilder<Entity>, rawResults: any[]): Promise<any[]> {
|
||||
const transformer = new RawSqlResultsToEntityTransformer(
|
||||
qb.expressionMap,
|
||||
@ -1210,4 +1208,59 @@ export class Database {
|
||||
assert(qb.expressionMap.mainAlias);
|
||||
return transformer.transform(rawResults, qb.expressionMap.mainAlias);
|
||||
}
|
||||
|
||||
async updateEntity<Entity> (queryRunner: QueryRunner, entity: new () => Entity, criteria: any, update: any): Promise<UpdateResult> {
|
||||
const repo = queryRunner.manager.getRepository(entity);
|
||||
return repo.createQueryBuilder()
|
||||
.update()
|
||||
.set(update)
|
||||
.where(criteria)
|
||||
.execute();
|
||||
}
|
||||
|
||||
async pruneEntities (queryRunner: QueryRunner, blocks: BlockProgressInterface[], entityTypes: Set<new () => any>) {
|
||||
// Assumption: all blocks are at same height
|
||||
assert(blocks.length);
|
||||
const blockNumber = blocks[0].blockNumber;
|
||||
const blockHashes = blocks.map(block => block.blockHash);
|
||||
|
||||
// Get all entities at the block height
|
||||
const entitiesAtBlock = await Promise.all(
|
||||
[...entityTypes].map(entityType => {
|
||||
return this._baseDatabase.getEntities(
|
||||
queryRunner,
|
||||
entityType as any,
|
||||
{
|
||||
select: ['id'] as any,
|
||||
where: { blockNumber }
|
||||
}
|
||||
);
|
||||
})
|
||||
);
|
||||
|
||||
// Extract entity ids from result
|
||||
const entityIds = entitiesAtBlock.map(entities => {
|
||||
return entities.map((entity: any) => entity.id);
|
||||
});
|
||||
|
||||
// Update isPruned flag using fetched entity ids and hashes of blocks to be pruned
|
||||
const updatePromises = [...entityTypes].map((entity, index: number) => {
|
||||
return this.updateEntity(
|
||||
queryRunner,
|
||||
entity as any,
|
||||
{ id: In(entityIds[index]), blockHash: In(blockHashes) },
|
||||
{ isPruned: true }
|
||||
);
|
||||
});
|
||||
|
||||
await Promise.all(updatePromises);
|
||||
}
|
||||
|
||||
_measureCachedPrunedEntities () {
|
||||
const totalEntities = Array.from(this.cachedEntities.latestPrunedEntities.values())
|
||||
.reduce((acc, idEntitiesMap) => acc + idEntitiesMap.size, 0);
|
||||
|
||||
log(`Total entities in cachedEntities.latestPrunedEntities map: ${totalEntities}`);
|
||||
cachePrunedEntitiesCount.set(totalEntities);
|
||||
}
|
||||
}
|
||||
|
@ -358,6 +358,20 @@ export class GraphWatcher {
|
||||
this._database.updateEntityCacheFrothyBlocks(blockProgress, this._indexer.serverConfig.clearEntitiesCacheInterval);
|
||||
}
|
||||
|
||||
async pruneEntities (prunedBlocks: BlockProgressInterface[], entityTypes: Set<new () => any>) {
|
||||
const dbTx = await this._database.createTransactionRunner();
|
||||
|
||||
try {
|
||||
await this._database.pruneEntities(dbTx, prunedBlocks, entityTypes);
|
||||
await dbTx.commitTransaction();
|
||||
} catch (error) {
|
||||
await dbTx.rollbackTransaction();
|
||||
throw error;
|
||||
} finally {
|
||||
await dbTx.release();
|
||||
}
|
||||
}
|
||||
|
||||
pruneEntityCacheFrothyBlocks (canonicalBlockHash: string, canonicalBlockNumber: number) {
|
||||
this._database.pruneEntityCacheFrothyBlocks(canonicalBlockHash, canonicalBlockNumber);
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user