From 79e903b3966690f6e3abb4cae5e09177c5527ea1 Mon Sep 17 00:00:00 2001 From: nikugogoi Date: Thu, 17 Nov 2022 10:14:59 +0530 Subject: [PATCH] Reset latest and frothy entity tables on watcher reset (#236) --- .../src/cli/checkpoint-cmds/create.ts | 4 +- .../src/cli/checkpoint-cmds/verify.ts | 4 +- packages/eden-watcher/src/cli/export-state.ts | 4 +- packages/eden-watcher/src/cli/import-state.ts | 4 +- packages/eden-watcher/src/cli/index-block.ts | 4 +- packages/eden-watcher/src/cli/inspect-cid.ts | 4 +- .../src/cli/reset-cmds/watcher.ts | 5 +- .../eden-watcher/src/cli/watch-contract.ts | 4 +- packages/eden-watcher/src/database.ts | 2 + .../eden-watcher/src/entity/Subscriber.ts | 6 +- packages/eden-watcher/src/fill.ts | 4 +- packages/eden-watcher/src/indexer.ts | 4 + packages/eden-watcher/src/job-runner.ts | 4 +- packages/eden-watcher/src/server.ts | 4 +- packages/graph-node/src/database.ts | 79 ++++++++++++------- packages/graph-node/src/utils.ts | 10 ++- packages/graph-node/src/watcher.ts | 14 ++++ 17 files changed, 103 insertions(+), 57 deletions(-) diff --git a/packages/eden-watcher/src/cli/checkpoint-cmds/create.ts b/packages/eden-watcher/src/cli/checkpoint-cmds/create.ts index aa80849c..2524e7f9 100644 --- a/packages/eden-watcher/src/cli/checkpoint-cmds/create.ts +++ b/packages/eden-watcher/src/cli/checkpoint-cmds/create.ts @@ -8,7 +8,7 @@ import assert from 'assert'; import { getConfig, initClients, JobQueue, Config } from '@cerc-io/util'; import { GraphWatcher, Database as GraphDatabase } from '@cerc-io/graph-node'; -import { Database } from '../../database'; +import { Database, ENTITY_TO_LATEST_ENTITY_MAP } from '../../database'; import { Indexer } from '../../indexer'; const log = debug('vulcanize:checkpoint-create'); @@ -37,7 +37,7 @@ export const handler = async (argv: any): Promise => { const db = new Database(config.database); await db.init(); - const graphDb = new GraphDatabase(config.server, db.baseDatabase); + const graphDb = new GraphDatabase(config.server, db.baseDatabase, ENTITY_TO_LATEST_ENTITY_MAP); await graphDb.init(); const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server); diff --git a/packages/eden-watcher/src/cli/checkpoint-cmds/verify.ts b/packages/eden-watcher/src/cli/checkpoint-cmds/verify.ts index 035d5ec2..0ab8e93c 100644 --- a/packages/eden-watcher/src/cli/checkpoint-cmds/verify.ts +++ b/packages/eden-watcher/src/cli/checkpoint-cmds/verify.ts @@ -8,7 +8,7 @@ import assert from 'assert'; import { getConfig, initClients, JobQueue, Config, verifyCheckpointData } from '@cerc-io/util'; import { GraphWatcher, Database as GraphDatabase } from '@cerc-io/graph-node'; -import { Database } from '../../database'; +import { Database, ENTITY_TO_LATEST_ENTITY_MAP } from '../../database'; import { Indexer } from '../../indexer'; const log = debug('vulcanize:checkpoint-verify'); @@ -33,7 +33,7 @@ export const handler = async (argv: any): Promise => { const db = new Database(config.database); await db.init(); - const graphDb = new GraphDatabase(config.server, db.baseDatabase); + const graphDb = new GraphDatabase(config.server, db.baseDatabase, ENTITY_TO_LATEST_ENTITY_MAP); await graphDb.init(); const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server); diff --git a/packages/eden-watcher/src/cli/export-state.ts b/packages/eden-watcher/src/cli/export-state.ts index dd7b1182..f29c60c8 100644 --- a/packages/eden-watcher/src/cli/export-state.ts +++ b/packages/eden-watcher/src/cli/export-state.ts @@ -13,7 +13,7 @@ import { Config, DEFAULT_CONFIG_PATH, getConfig, initClients, JobQueue, StateKin import { GraphWatcher, Database as GraphDatabase } from '@cerc-io/graph-node'; import * as codec from '@ipld/dag-cbor'; -import { Database } from '../database'; +import { Database, ENTITY_TO_LATEST_ENTITY_MAP } from '../database'; import { Indexer } from '../indexer'; const log = debug('vulcanize:export-state'); @@ -47,7 +47,7 @@ const main = async (): Promise => { const db = new Database(config.database); await db.init(); - const graphDb = new GraphDatabase(config.server, db.baseDatabase); + const graphDb = new GraphDatabase(config.server, db.baseDatabase, ENTITY_TO_LATEST_ENTITY_MAP); await graphDb.init(); const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server); diff --git a/packages/eden-watcher/src/cli/import-state.ts b/packages/eden-watcher/src/cli/import-state.ts index d6654793..a4b56b8c 100644 --- a/packages/eden-watcher/src/cli/import-state.ts +++ b/packages/eden-watcher/src/cli/import-state.ts @@ -15,7 +15,7 @@ import { getConfig, fillBlocks, JobQueue, DEFAULT_CONFIG_PATH, Config, initClien import { GraphWatcher, Database as GraphDatabase } from '@cerc-io/graph-node'; import * as codec from '@ipld/dag-cbor'; -import { Database } from '../database'; +import { Database, ENTITY_TO_LATEST_ENTITY_MAP } from '../database'; import { Indexer } from '../indexer'; import { EventWatcher } from '../events'; import { State } from '../entity/State'; @@ -47,7 +47,7 @@ export const main = async (): Promise => { const db = new Database(config.database); await db.init(); - const graphDb = new GraphDatabase(config.server, db.baseDatabase); + const graphDb = new GraphDatabase(config.server, db.baseDatabase, ENTITY_TO_LATEST_ENTITY_MAP); await graphDb.init(); const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server); diff --git a/packages/eden-watcher/src/cli/index-block.ts b/packages/eden-watcher/src/cli/index-block.ts index e7f21481..47f85e13 100644 --- a/packages/eden-watcher/src/cli/index-block.ts +++ b/packages/eden-watcher/src/cli/index-block.ts @@ -10,7 +10,7 @@ import assert from 'assert'; import { Config, DEFAULT_CONFIG_PATH, getConfig, initClients, JobQueue, indexBlock } from '@cerc-io/util'; import { GraphWatcher, Database as GraphDatabase } from '@cerc-io/graph-node'; -import { Database } from '../database'; +import { Database, ENTITY_TO_LATEST_ENTITY_MAP } from '../database'; import { Indexer } from '../indexer'; const log = debug('vulcanize:index-block'); @@ -41,7 +41,7 @@ const main = async (): Promise => { const db = new Database(config.database); await db.init(); - const graphDb = new GraphDatabase(config.server, db.baseDatabase); + const graphDb = new GraphDatabase(config.server, db.baseDatabase, ENTITY_TO_LATEST_ENTITY_MAP); await graphDb.init(); const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server); diff --git a/packages/eden-watcher/src/cli/inspect-cid.ts b/packages/eden-watcher/src/cli/inspect-cid.ts index fe4d8fcb..ea855423 100644 --- a/packages/eden-watcher/src/cli/inspect-cid.ts +++ b/packages/eden-watcher/src/cli/inspect-cid.ts @@ -11,7 +11,7 @@ import util from 'util'; import { Config, DEFAULT_CONFIG_PATH, getConfig, initClients, JobQueue } from '@cerc-io/util'; import { GraphWatcher, Database as GraphDatabase } from '@cerc-io/graph-node'; -import { Database } from '../database'; +import { Database, ENTITY_TO_LATEST_ENTITY_MAP } from '../database'; import { Indexer } from '../indexer'; const log = debug('vulcanize:inspect-cid'); @@ -42,7 +42,7 @@ const main = async (): Promise => { const db = new Database(config.database); await db.init(); - const graphDb = new GraphDatabase(config.server, db.baseDatabase); + const graphDb = new GraphDatabase(config.server, db.baseDatabase, ENTITY_TO_LATEST_ENTITY_MAP); await graphDb.init(); const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server); diff --git a/packages/eden-watcher/src/cli/reset-cmds/watcher.ts b/packages/eden-watcher/src/cli/reset-cmds/watcher.ts index 665dc69a..e69412cb 100644 --- a/packages/eden-watcher/src/cli/reset-cmds/watcher.ts +++ b/packages/eden-watcher/src/cli/reset-cmds/watcher.ts @@ -8,7 +8,7 @@ import assert from 'assert'; import { getConfig, initClients, resetJobs, JobQueue } from '@cerc-io/util'; import { GraphWatcher, Database as GraphDatabase } from '@cerc-io/graph-node'; -import { Database } from '../../database'; +import { Database, ENTITY_TO_LATEST_ENTITY_MAP } from '../../database'; import { Indexer } from '../../indexer'; const log = debug('vulcanize:reset-watcher'); @@ -32,7 +32,7 @@ export const handler = async (argv: any): Promise => { const db = new Database(config.database); await db.init(); - const graphDb = new GraphDatabase(config.server, db.baseDatabase); + const graphDb = new GraphDatabase(config.server, db.baseDatabase, ENTITY_TO_LATEST_ENTITY_MAP); await graphDb.init(); const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server); @@ -53,5 +53,6 @@ export const handler = async (argv: any): Promise => { await graphWatcher.init(); await indexer.resetWatcherToBlock(argv.blockNumber); + await indexer.resetLatestEntities(argv.blockNumber); log('Reset watcher successfully'); }; diff --git a/packages/eden-watcher/src/cli/watch-contract.ts b/packages/eden-watcher/src/cli/watch-contract.ts index 863f0688..4b12bae4 100644 --- a/packages/eden-watcher/src/cli/watch-contract.ts +++ b/packages/eden-watcher/src/cli/watch-contract.ts @@ -10,7 +10,7 @@ import assert from 'assert'; import { Config, DEFAULT_CONFIG_PATH, getConfig, initClients, JobQueue } from '@cerc-io/util'; import { GraphWatcher, Database as GraphDatabase } from '@cerc-io/graph-node'; -import { Database } from '../database'; +import { Database, ENTITY_TO_LATEST_ENTITY_MAP } from '../database'; import { Indexer } from '../indexer'; const log = debug('vulcanize:watch-contract'); @@ -58,7 +58,7 @@ const main = async (): Promise => { const db = new Database(config.database); await db.init(); - const graphDb = new GraphDatabase(config.server, db.baseDatabase); + const graphDb = new GraphDatabase(config.server, db.baseDatabase, ENTITY_TO_LATEST_ENTITY_MAP); await graphDb.init(); const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server); diff --git a/packages/eden-watcher/src/database.ts b/packages/eden-watcher/src/database.ts index 8f2542a7..02476e4a 100644 --- a/packages/eden-watcher/src/database.ts +++ b/packages/eden-watcher/src/database.ts @@ -34,6 +34,8 @@ import { Staker } from './entity/Staker'; export const ENTITIES = new Set([Account, Claim, Distribution, Distributor, Epoch, Network, Producer, ProducerEpoch, ProducerRewardCollectorChange, ProducerSet, ProducerSetChange, RewardSchedule, RewardScheduleEntry, Slash, Slot, SlotClaim, Staker]); +export const ENTITY_TO_LATEST_ENTITY_MAP: Map = new Map(); + export class Database implements DatabaseInterface { _config: ConnectionOptions; _conn!: Connection; diff --git a/packages/eden-watcher/src/entity/Subscriber.ts b/packages/eden-watcher/src/entity/Subscriber.ts index 93c77fe4..d9b29c9b 100644 --- a/packages/eden-watcher/src/entity/Subscriber.ts +++ b/packages/eden-watcher/src/entity/Subscriber.ts @@ -7,15 +7,15 @@ import { EventSubscriber, EntitySubscriberInterface, InsertEvent, UpdateEvent } import { afterEntityInsertOrUpdate } from '@cerc-io/graph-node'; import { FrothyEntity } from './FrothyEntity'; -import { ENTITIES } from '../database'; +import { ENTITIES, ENTITY_TO_LATEST_ENTITY_MAP } from '../database'; @EventSubscriber() export class EntitySubscriber implements EntitySubscriberInterface { async afterInsert (event: InsertEvent): Promise { - await afterEntityInsertOrUpdate(FrothyEntity, ENTITIES, event); + await afterEntityInsertOrUpdate(FrothyEntity, ENTITIES, event, ENTITY_TO_LATEST_ENTITY_MAP); } async afterUpdate (event: UpdateEvent): Promise { - await afterEntityInsertOrUpdate(FrothyEntity, ENTITIES, event); + await afterEntityInsertOrUpdate(FrothyEntity, ENTITIES, event, ENTITY_TO_LATEST_ENTITY_MAP); } } diff --git a/packages/eden-watcher/src/fill.ts b/packages/eden-watcher/src/fill.ts index 3169df8d..e9a717cf 100644 --- a/packages/eden-watcher/src/fill.ts +++ b/packages/eden-watcher/src/fill.ts @@ -12,7 +12,7 @@ import { PubSub } from 'graphql-subscriptions'; import { Config, getConfig, fillBlocks, JobQueue, DEFAULT_CONFIG_PATH, initClients } from '@cerc-io/util'; import { GraphWatcher, Database as GraphDatabase } from '@cerc-io/graph-node'; -import { Database } from './database'; +import { Database, ENTITY_TO_LATEST_ENTITY_MAP } from './database'; import { Indexer } from './indexer'; import { EventWatcher } from './events'; import { fillState } from './fill-state'; @@ -65,7 +65,7 @@ export const main = async (): Promise => { const db = new Database(config.database); await db.init(); - const graphDb = new GraphDatabase(config.server, db.baseDatabase); + const graphDb = new GraphDatabase(config.server, db.baseDatabase, ENTITY_TO_LATEST_ENTITY_MAP); await graphDb.init(); const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server); diff --git a/packages/eden-watcher/src/indexer.ts b/packages/eden-watcher/src/indexer.ts index c8ea1cc0..0ebb9f87 100644 --- a/packages/eden-watcher/src/indexer.ts +++ b/packages/eden-watcher/src/indexer.ts @@ -489,6 +489,10 @@ export class Indexer implements IndexerInterface { await this._graphWatcher.pruneFrothyEntities(FrothyEntity, blockNumber); } + async resetLatestEntities (blockNumber: number): Promise { + await this._graphWatcher.resetLatestEntities(blockNumber); + } + async updateBlockProgress (block: BlockProgress, lastProcessedEventIndex: number): Promise { return this._baseIndexer.updateBlockProgress(block, lastProcessedEventIndex); } diff --git a/packages/eden-watcher/src/job-runner.ts b/packages/eden-watcher/src/job-runner.ts index 34643789..97af3e7e 100644 --- a/packages/eden-watcher/src/job-runner.ts +++ b/packages/eden-watcher/src/job-runner.ts @@ -25,7 +25,7 @@ import { import { GraphWatcher, Database as GraphDatabase } from '@cerc-io/graph-node'; import { Indexer } from './indexer'; -import { Database } from './database'; +import { Database, ENTITY_TO_LATEST_ENTITY_MAP } from './database'; const log = debug('vulcanize:job-runner'); @@ -94,7 +94,7 @@ export const main = async (): Promise => { const db = new Database(config.database); await db.init(); - const graphDb = new GraphDatabase(config.server, db.baseDatabase); + const graphDb = new GraphDatabase(config.server, db.baseDatabase, ENTITY_TO_LATEST_ENTITY_MAP); await graphDb.init(); const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server); diff --git a/packages/eden-watcher/src/server.ts b/packages/eden-watcher/src/server.ts index 771e7127..d2f32dc1 100644 --- a/packages/eden-watcher/src/server.ts +++ b/packages/eden-watcher/src/server.ts @@ -18,7 +18,7 @@ import { GraphWatcher, Database as GraphDatabase } from '@cerc-io/graph-node'; import { createResolvers } from './resolvers'; import { Indexer } from './indexer'; -import { Database } from './database'; +import { Database, ENTITY_TO_LATEST_ENTITY_MAP } from './database'; import { EventWatcher } from './events'; const log = debug('vulcanize:server'); @@ -42,7 +42,7 @@ export const main = async (): Promise => { const db = new Database(config.database); await db.init(); - const graphDb = new GraphDatabase(config.server, db.baseDatabase); + const graphDb = new GraphDatabase(config.server, db.baseDatabase, ENTITY_TO_LATEST_ENTITY_MAP); await graphDb.init(); const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server); diff --git a/packages/graph-node/src/database.ts b/packages/graph-node/src/database.ts index 47d17bc5..376d6287 100644 --- a/packages/graph-node/src/database.ts +++ b/packages/graph-node/src/database.ts @@ -6,10 +6,10 @@ import assert from 'assert'; import { Brackets, Connection, - ConnectionOptions, FindOneOptions, In, LessThanOrEqual, + MoreThan, QueryRunner, Repository, SelectQueryBuilder, @@ -34,7 +34,7 @@ import { Where } from '@cerc-io/util'; -import { Block, fromEntityValue, fromStateEntityValues, resolveEntityFieldConflicts, toEntityValue } from './utils'; +import { Block, fromEntityValue, fromStateEntityValues, getLatestEntityFromEntity, resolveEntityFieldConflicts, toEntityValue } from './utils'; const log = debug('vulcanize:graph-database'); @@ -1388,11 +1388,11 @@ export class Database { ); // Update isPruned flag using fetched entity ids and hashes of blocks to be pruned - const updatePromises = [...entityTypes].map((entity) => { + const updatePromises = [...entityTypes].map((entityType) => { return this.updateEntity( queryRunner, - entity as any, - { id: In(entityIdsMap.get(entity.name) || []), blockHash: In(blockHashes) }, + entityType as any, + { id: In(entityIdsMap.get(entityType.name) || []), blockHash: In(blockHashes) }, { isPruned: true } ); }); @@ -1407,41 +1407,62 @@ export class Database { async updateNonCanonicalLatestEntities (queryRunner: QueryRunner, blockNumber: number, nonCanonicalBlockHashes: string[]): Promise { // Update latest entity tables with canonical entries await Promise.all( - Array.from(this._entityToLatestEntityMap.entries()).map(async ([entity, latestEntity]) => { + Array.from(this._entityToLatestEntityMap.entries()).map(async ([entityType, latestEntityType]) => { // Get entries for non canonical blocks - const nonCanonicalLatestEntities = await this._baseDatabase.getEntities(queryRunner, latestEntity, { where: { blockHash: In(nonCanonicalBlockHashes) } }); + const nonCanonicalLatestEntities = await this._baseDatabase.getEntities(queryRunner, latestEntityType, { where: { blockHash: In(nonCanonicalBlockHashes) } }); - await Promise.all(nonCanonicalLatestEntities.map(async (nonCanonicalLatestEntity: any) => { - // Get pruned version for the non canonical entity - const repo = queryRunner.manager.getRepository(entity); - const prunedVersion = await this._baseDatabase.getLatestPrunedEntityWithoutJoin(repo, nonCanonicalLatestEntity.id, blockNumber); - - // If found, update the latestEntity entry for the id - // Else, delete the latestEntity entry for the id - if (prunedVersion) { - return this.updateEntity( - queryRunner, - latestEntity, - { id: nonCanonicalLatestEntity.id }, - prunedVersion - ); - } else { - return this._baseDatabase.removeEntities( - queryRunner, - latestEntity, - { where: { id: nonCanonicalLatestEntity.id } } - ); - } - })); + // Canonicalize latest entity table at given block height + await this.canonicalizeLatestEntity(queryRunner, entityType, latestEntityType, nonCanonicalLatestEntities, blockNumber); }) ); } + async canonicalizeLatestEntity (queryRunner: QueryRunner, entityType: any, latestEntityType: any, entities: any[], blockNumber: number): Promise { + await Promise.all(entities.map(async (entity: any) => { + // Get latest pruned (canonical) version for the given entity + const repo = queryRunner.manager.getRepository(entity); + const prunedVersion = await this._baseDatabase.getLatestPrunedEntity(repo, entity.id, blockNumber); + + // If found, update the latestEntity entry for the id + // Else, delete the latestEntity entry for the id + if (prunedVersion) { + // Create a latest entity instance and insert in the db + const latestEntityRepo = queryRunner.manager.getRepository(latestEntityType); + const latestEntity = getLatestEntityFromEntity(latestEntityRepo, prunedVersion); + + await this.updateEntity( + queryRunner, + latestEntityType, + { id: entity.id }, + latestEntity + ); + } else { + await this._baseDatabase.removeEntities( + queryRunner, + latestEntityType, + { where: { id: entity.id } } + ); + } + })); + } + async pruneFrothyEntities (queryRunner: QueryRunner, frothyEntityType: new () => Entity, blockNumber: number): Promise { // Remove frothy entity entries at | below the prune block height return this._baseDatabase.removeEntities(queryRunner, frothyEntityType, { where: { blockNumber: LessThanOrEqual(blockNumber) } }); } + async resetLatestEntities (queryRunner: QueryRunner, blockNumber: number): Promise { + await Promise.all( + Array.from(this._entityToLatestEntityMap.entries()).map(async ([entityType, latestEntityType]) => { + // Get entries above the reset block + const entitiesToReset = await this._baseDatabase.getEntities(queryRunner, latestEntityType, { where: { blockNumber: MoreThan(blockNumber) } }); + + // Canonicalize latest entity table at the reset block height + await this.canonicalizeLatestEntity(queryRunner, entityType, latestEntityType, entitiesToReset, blockNumber); + }) + ); + } + _measureCachedPrunedEntities () { const totalEntities = Array.from(this.cachedEntities.latestPrunedEntities.values()) .reduce((acc, idEntitiesMap) => acc + idEntitiesMap.size, 0); diff --git a/packages/graph-node/src/utils.ts b/packages/graph-node/src/utils.ts index 8c04bc61..67890128 100644 --- a/packages/graph-node/src/utils.ts +++ b/packages/graph-node/src/utils.ts @@ -3,7 +3,7 @@ import path from 'path'; import fs from 'fs-extra'; import debug from 'debug'; import yaml from 'js-yaml'; -import { EntityTarget, InsertEvent, UpdateEvent, ValueTransformer } from 'typeorm'; +import { DeepPartial, EntityTarget, InsertEvent, Repository, UpdateEvent, ValueTransformer } from 'typeorm'; import { ColumnMetadata } from 'typeorm/metadata/ColumnMetadata'; import assert from 'assert'; import _ from 'lodash'; @@ -939,11 +939,10 @@ export const afterEntityInsertOrUpdate = async ( // Get latest entity's fields to be updated const latestEntityRepo = event.manager.getRepository(entityTarget); - const latestEntityFields = latestEntityRepo.metadata.columns.map(column => column.propertyName); const fieldsToUpdate = latestEntityRepo.metadata.columns.map(column => column.databaseName).filter(val => val !== 'id'); // Create a latest entity instance and upsert in the db - const latestEntity = event.manager.create(entityTarget, _.pick(entity, latestEntityFields)); + const latestEntity = getLatestEntityFromEntity(latestEntityRepo, entity); await event.manager.createQueryBuilder() .insert() .into(entityTarget) @@ -953,3 +952,8 @@ export const afterEntityInsertOrUpdate = async ( ) .execute(); }; + +export function getLatestEntityFromEntity (latestEntityRepo: Repository, entity: any): Entity { + const latestEntityFields = latestEntityRepo.metadata.columns.map(column => column.propertyName); + return latestEntityRepo.create(_.pick(entity, latestEntityFields) as DeepPartial); +} diff --git a/packages/graph-node/src/watcher.ts b/packages/graph-node/src/watcher.ts index 4287569c..8ae593e1 100644 --- a/packages/graph-node/src/watcher.ts +++ b/packages/graph-node/src/watcher.ts @@ -386,6 +386,20 @@ export class GraphWatcher { } } + async resetLatestEntities (blockNumber: number): Promise { + const dbTx = await this._database.createTransactionRunner(); + try { + await this._database.resetLatestEntities(dbTx, blockNumber); + + dbTx.commitTransaction(); + } catch (error) { + await dbTx.rollbackTransaction(); + throw error; + } finally { + await dbTx.release(); + } + } + pruneEntityCacheFrothyBlocks (canonicalBlockHash: string, canonicalBlockNumber: number) { this._database.pruneEntityCacheFrothyBlocks(canonicalBlockHash, canonicalBlockNumber); }