Reset latest and frothy entity tables on watcher reset (#236)

This commit is contained in:
nikugogoi 2022-11-17 10:14:59 +05:30 committed by GitHub
parent 693c23d192
commit 79e903b396
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 103 additions and 57 deletions

View File

@ -8,7 +8,7 @@ import assert from 'assert';
import { getConfig, initClients, JobQueue, Config } from '@cerc-io/util';
import { GraphWatcher, Database as GraphDatabase } from '@cerc-io/graph-node';
import { Database } from '../../database';
import { Database, ENTITY_TO_LATEST_ENTITY_MAP } from '../../database';
import { Indexer } from '../../indexer';
const log = debug('vulcanize:checkpoint-create');
@ -37,7 +37,7 @@ export const handler = async (argv: any): Promise<void> => {
const db = new Database(config.database);
await db.init();
const graphDb = new GraphDatabase(config.server, db.baseDatabase);
const graphDb = new GraphDatabase(config.server, db.baseDatabase, ENTITY_TO_LATEST_ENTITY_MAP);
await graphDb.init();
const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server);

View File

@ -8,7 +8,7 @@ import assert from 'assert';
import { getConfig, initClients, JobQueue, Config, verifyCheckpointData } from '@cerc-io/util';
import { GraphWatcher, Database as GraphDatabase } from '@cerc-io/graph-node';
import { Database } from '../../database';
import { Database, ENTITY_TO_LATEST_ENTITY_MAP } from '../../database';
import { Indexer } from '../../indexer';
const log = debug('vulcanize:checkpoint-verify');
@ -33,7 +33,7 @@ export const handler = async (argv: any): Promise<void> => {
const db = new Database(config.database);
await db.init();
const graphDb = new GraphDatabase(config.server, db.baseDatabase);
const graphDb = new GraphDatabase(config.server, db.baseDatabase, ENTITY_TO_LATEST_ENTITY_MAP);
await graphDb.init();
const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server);

View File

@ -13,7 +13,7 @@ import { Config, DEFAULT_CONFIG_PATH, getConfig, initClients, JobQueue, StateKin
import { GraphWatcher, Database as GraphDatabase } from '@cerc-io/graph-node';
import * as codec from '@ipld/dag-cbor';
import { Database } from '../database';
import { Database, ENTITY_TO_LATEST_ENTITY_MAP } from '../database';
import { Indexer } from '../indexer';
const log = debug('vulcanize:export-state');
@ -47,7 +47,7 @@ const main = async (): Promise<void> => {
const db = new Database(config.database);
await db.init();
const graphDb = new GraphDatabase(config.server, db.baseDatabase);
const graphDb = new GraphDatabase(config.server, db.baseDatabase, ENTITY_TO_LATEST_ENTITY_MAP);
await graphDb.init();
const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server);

View File

@ -15,7 +15,7 @@ import { getConfig, fillBlocks, JobQueue, DEFAULT_CONFIG_PATH, Config, initClien
import { GraphWatcher, Database as GraphDatabase } from '@cerc-io/graph-node';
import * as codec from '@ipld/dag-cbor';
import { Database } from '../database';
import { Database, ENTITY_TO_LATEST_ENTITY_MAP } from '../database';
import { Indexer } from '../indexer';
import { EventWatcher } from '../events';
import { State } from '../entity/State';
@ -47,7 +47,7 @@ export const main = async (): Promise<any> => {
const db = new Database(config.database);
await db.init();
const graphDb = new GraphDatabase(config.server, db.baseDatabase);
const graphDb = new GraphDatabase(config.server, db.baseDatabase, ENTITY_TO_LATEST_ENTITY_MAP);
await graphDb.init();
const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server);

View File

@ -10,7 +10,7 @@ import assert from 'assert';
import { Config, DEFAULT_CONFIG_PATH, getConfig, initClients, JobQueue, indexBlock } from '@cerc-io/util';
import { GraphWatcher, Database as GraphDatabase } from '@cerc-io/graph-node';
import { Database } from '../database';
import { Database, ENTITY_TO_LATEST_ENTITY_MAP } from '../database';
import { Indexer } from '../indexer';
const log = debug('vulcanize:index-block');
@ -41,7 +41,7 @@ const main = async (): Promise<void> => {
const db = new Database(config.database);
await db.init();
const graphDb = new GraphDatabase(config.server, db.baseDatabase);
const graphDb = new GraphDatabase(config.server, db.baseDatabase, ENTITY_TO_LATEST_ENTITY_MAP);
await graphDb.init();
const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server);

View File

@ -11,7 +11,7 @@ import util from 'util';
import { Config, DEFAULT_CONFIG_PATH, getConfig, initClients, JobQueue } from '@cerc-io/util';
import { GraphWatcher, Database as GraphDatabase } from '@cerc-io/graph-node';
import { Database } from '../database';
import { Database, ENTITY_TO_LATEST_ENTITY_MAP } from '../database';
import { Indexer } from '../indexer';
const log = debug('vulcanize:inspect-cid');
@ -42,7 +42,7 @@ const main = async (): Promise<void> => {
const db = new Database(config.database);
await db.init();
const graphDb = new GraphDatabase(config.server, db.baseDatabase);
const graphDb = new GraphDatabase(config.server, db.baseDatabase, ENTITY_TO_LATEST_ENTITY_MAP);
await graphDb.init();
const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server);

View File

@ -8,7 +8,7 @@ import assert from 'assert';
import { getConfig, initClients, resetJobs, JobQueue } from '@cerc-io/util';
import { GraphWatcher, Database as GraphDatabase } from '@cerc-io/graph-node';
import { Database } from '../../database';
import { Database, ENTITY_TO_LATEST_ENTITY_MAP } from '../../database';
import { Indexer } from '../../indexer';
const log = debug('vulcanize:reset-watcher');
@ -32,7 +32,7 @@ export const handler = async (argv: any): Promise<void> => {
const db = new Database(config.database);
await db.init();
const graphDb = new GraphDatabase(config.server, db.baseDatabase);
const graphDb = new GraphDatabase(config.server, db.baseDatabase, ENTITY_TO_LATEST_ENTITY_MAP);
await graphDb.init();
const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server);
@ -53,5 +53,6 @@ export const handler = async (argv: any): Promise<void> => {
await graphWatcher.init();
await indexer.resetWatcherToBlock(argv.blockNumber);
await indexer.resetLatestEntities(argv.blockNumber);
log('Reset watcher successfully');
};

View File

@ -10,7 +10,7 @@ import assert from 'assert';
import { Config, DEFAULT_CONFIG_PATH, getConfig, initClients, JobQueue } from '@cerc-io/util';
import { GraphWatcher, Database as GraphDatabase } from '@cerc-io/graph-node';
import { Database } from '../database';
import { Database, ENTITY_TO_LATEST_ENTITY_MAP } from '../database';
import { Indexer } from '../indexer';
const log = debug('vulcanize:watch-contract');
@ -58,7 +58,7 @@ const main = async (): Promise<void> => {
const db = new Database(config.database);
await db.init();
const graphDb = new GraphDatabase(config.server, db.baseDatabase);
const graphDb = new GraphDatabase(config.server, db.baseDatabase, ENTITY_TO_LATEST_ENTITY_MAP);
await graphDb.init();
const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server);

View File

@ -34,6 +34,8 @@ import { Staker } from './entity/Staker';
export const ENTITIES = new Set([Account, Claim, Distribution, Distributor, Epoch, Network, Producer, ProducerEpoch, ProducerRewardCollectorChange, ProducerSet, ProducerSetChange, RewardSchedule, RewardScheduleEntry, Slash, Slot, SlotClaim, Staker]);
export const ENTITY_TO_LATEST_ENTITY_MAP: Map<any, any> = new Map();
export class Database implements DatabaseInterface {
_config: ConnectionOptions;
_conn!: Connection;

View File

@ -7,15 +7,15 @@ import { EventSubscriber, EntitySubscriberInterface, InsertEvent, UpdateEvent }
import { afterEntityInsertOrUpdate } from '@cerc-io/graph-node';
import { FrothyEntity } from './FrothyEntity';
import { ENTITIES } from '../database';
import { ENTITIES, ENTITY_TO_LATEST_ENTITY_MAP } from '../database';
@EventSubscriber()
export class EntitySubscriber implements EntitySubscriberInterface {
async afterInsert (event: InsertEvent<any>): Promise<void> {
await afterEntityInsertOrUpdate(FrothyEntity, ENTITIES, event);
await afterEntityInsertOrUpdate(FrothyEntity, ENTITIES, event, ENTITY_TO_LATEST_ENTITY_MAP);
}
async afterUpdate (event: UpdateEvent<any>): Promise<void> {
await afterEntityInsertOrUpdate(FrothyEntity, ENTITIES, event);
await afterEntityInsertOrUpdate(FrothyEntity, ENTITIES, event, ENTITY_TO_LATEST_ENTITY_MAP);
}
}

View File

@ -12,7 +12,7 @@ import { PubSub } from 'graphql-subscriptions';
import { Config, getConfig, fillBlocks, JobQueue, DEFAULT_CONFIG_PATH, initClients } from '@cerc-io/util';
import { GraphWatcher, Database as GraphDatabase } from '@cerc-io/graph-node';
import { Database } from './database';
import { Database, ENTITY_TO_LATEST_ENTITY_MAP } from './database';
import { Indexer } from './indexer';
import { EventWatcher } from './events';
import { fillState } from './fill-state';
@ -65,7 +65,7 @@ export const main = async (): Promise<any> => {
const db = new Database(config.database);
await db.init();
const graphDb = new GraphDatabase(config.server, db.baseDatabase);
const graphDb = new GraphDatabase(config.server, db.baseDatabase, ENTITY_TO_LATEST_ENTITY_MAP);
await graphDb.init();
const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server);

View File

@ -489,6 +489,10 @@ export class Indexer implements IndexerInterface {
await this._graphWatcher.pruneFrothyEntities(FrothyEntity, blockNumber);
}
async resetLatestEntities (blockNumber: number): Promise<void> {
await this._graphWatcher.resetLatestEntities(blockNumber);
}
async updateBlockProgress (block: BlockProgress, lastProcessedEventIndex: number): Promise<BlockProgress> {
return this._baseIndexer.updateBlockProgress(block, lastProcessedEventIndex);
}

View File

@ -25,7 +25,7 @@ import {
import { GraphWatcher, Database as GraphDatabase } from '@cerc-io/graph-node';
import { Indexer } from './indexer';
import { Database } from './database';
import { Database, ENTITY_TO_LATEST_ENTITY_MAP } from './database';
const log = debug('vulcanize:job-runner');
@ -94,7 +94,7 @@ export const main = async (): Promise<any> => {
const db = new Database(config.database);
await db.init();
const graphDb = new GraphDatabase(config.server, db.baseDatabase);
const graphDb = new GraphDatabase(config.server, db.baseDatabase, ENTITY_TO_LATEST_ENTITY_MAP);
await graphDb.init();
const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server);

View File

@ -18,7 +18,7 @@ import { GraphWatcher, Database as GraphDatabase } from '@cerc-io/graph-node';
import { createResolvers } from './resolvers';
import { Indexer } from './indexer';
import { Database } from './database';
import { Database, ENTITY_TO_LATEST_ENTITY_MAP } from './database';
import { EventWatcher } from './events';
const log = debug('vulcanize:server');
@ -42,7 +42,7 @@ export const main = async (): Promise<any> => {
const db = new Database(config.database);
await db.init();
const graphDb = new GraphDatabase(config.server, db.baseDatabase);
const graphDb = new GraphDatabase(config.server, db.baseDatabase, ENTITY_TO_LATEST_ENTITY_MAP);
await graphDb.init();
const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server);

View File

@ -6,10 +6,10 @@ import assert from 'assert';
import {
Brackets,
Connection,
ConnectionOptions,
FindOneOptions,
In,
LessThanOrEqual,
MoreThan,
QueryRunner,
Repository,
SelectQueryBuilder,
@ -34,7 +34,7 @@ import {
Where
} from '@cerc-io/util';
import { Block, fromEntityValue, fromStateEntityValues, resolveEntityFieldConflicts, toEntityValue } from './utils';
import { Block, fromEntityValue, fromStateEntityValues, getLatestEntityFromEntity, resolveEntityFieldConflicts, toEntityValue } from './utils';
const log = debug('vulcanize:graph-database');
@ -1388,11 +1388,11 @@ export class Database {
);
// Update isPruned flag using fetched entity ids and hashes of blocks to be pruned
const updatePromises = [...entityTypes].map((entity) => {
const updatePromises = [...entityTypes].map((entityType) => {
return this.updateEntity(
queryRunner,
entity as any,
{ id: In(entityIdsMap.get(entity.name) || []), blockHash: In(blockHashes) },
entityType as any,
{ id: In(entityIdsMap.get(entityType.name) || []), blockHash: In(blockHashes) },
{ isPruned: true }
);
});
@ -1407,41 +1407,62 @@ export class Database {
async updateNonCanonicalLatestEntities (queryRunner: QueryRunner, blockNumber: number, nonCanonicalBlockHashes: string[]): Promise<void> {
// Update latest entity tables with canonical entries
await Promise.all(
Array.from(this._entityToLatestEntityMap.entries()).map(async ([entity, latestEntity]) => {
Array.from(this._entityToLatestEntityMap.entries()).map(async ([entityType, latestEntityType]) => {
// Get entries for non canonical blocks
const nonCanonicalLatestEntities = await this._baseDatabase.getEntities(queryRunner, latestEntity, { where: { blockHash: In(nonCanonicalBlockHashes) } });
const nonCanonicalLatestEntities = await this._baseDatabase.getEntities(queryRunner, latestEntityType, { where: { blockHash: In(nonCanonicalBlockHashes) } });
await Promise.all(nonCanonicalLatestEntities.map(async (nonCanonicalLatestEntity: any) => {
// Get pruned version for the non canonical entity
const repo = queryRunner.manager.getRepository(entity);
const prunedVersion = await this._baseDatabase.getLatestPrunedEntityWithoutJoin(repo, nonCanonicalLatestEntity.id, blockNumber);
// If found, update the latestEntity entry for the id
// Else, delete the latestEntity entry for the id
if (prunedVersion) {
return this.updateEntity(
queryRunner,
latestEntity,
{ id: nonCanonicalLatestEntity.id },
prunedVersion
);
} else {
return this._baseDatabase.removeEntities(
queryRunner,
latestEntity,
{ where: { id: nonCanonicalLatestEntity.id } }
);
}
}));
// Canonicalize latest entity table at given block height
await this.canonicalizeLatestEntity(queryRunner, entityType, latestEntityType, nonCanonicalLatestEntities, blockNumber);
})
);
}
async canonicalizeLatestEntity (queryRunner: QueryRunner, entityType: any, latestEntityType: any, entities: any[], blockNumber: number): Promise<void> {
await Promise.all(entities.map(async (entity: any) => {
// Get latest pruned (canonical) version for the given entity
const repo = queryRunner.manager.getRepository(entity);
const prunedVersion = await this._baseDatabase.getLatestPrunedEntity(repo, entity.id, blockNumber);
// If found, update the latestEntity entry for the id
// Else, delete the latestEntity entry for the id
if (prunedVersion) {
// Create a latest entity instance and insert in the db
const latestEntityRepo = queryRunner.manager.getRepository(latestEntityType);
const latestEntity = getLatestEntityFromEntity(latestEntityRepo, prunedVersion);
await this.updateEntity(
queryRunner,
latestEntityType,
{ id: entity.id },
latestEntity
);
} else {
await this._baseDatabase.removeEntities(
queryRunner,
latestEntityType,
{ where: { id: entity.id } }
);
}
}));
}
async pruneFrothyEntities<Entity> (queryRunner: QueryRunner, frothyEntityType: new () => Entity, blockNumber: number): Promise<void> {
// Remove frothy entity entries at | below the prune block height
return this._baseDatabase.removeEntities(queryRunner, frothyEntityType, { where: { blockNumber: LessThanOrEqual(blockNumber) } });
}
async resetLatestEntities (queryRunner: QueryRunner, blockNumber: number): Promise<void> {
await Promise.all(
Array.from(this._entityToLatestEntityMap.entries()).map(async ([entityType, latestEntityType]) => {
// Get entries above the reset block
const entitiesToReset = await this._baseDatabase.getEntities(queryRunner, latestEntityType, { where: { blockNumber: MoreThan(blockNumber) } });
// Canonicalize latest entity table at the reset block height
await this.canonicalizeLatestEntity(queryRunner, entityType, latestEntityType, entitiesToReset, blockNumber);
})
);
}
_measureCachedPrunedEntities () {
const totalEntities = Array.from(this.cachedEntities.latestPrunedEntities.values())
.reduce((acc, idEntitiesMap) => acc + idEntitiesMap.size, 0);

View File

@ -3,7 +3,7 @@ import path from 'path';
import fs from 'fs-extra';
import debug from 'debug';
import yaml from 'js-yaml';
import { EntityTarget, InsertEvent, UpdateEvent, ValueTransformer } from 'typeorm';
import { DeepPartial, EntityTarget, InsertEvent, Repository, UpdateEvent, ValueTransformer } from 'typeorm';
import { ColumnMetadata } from 'typeorm/metadata/ColumnMetadata';
import assert from 'assert';
import _ from 'lodash';
@ -939,11 +939,10 @@ export const afterEntityInsertOrUpdate = async<Entity> (
// Get latest entity's fields to be updated
const latestEntityRepo = event.manager.getRepository(entityTarget);
const latestEntityFields = latestEntityRepo.metadata.columns.map(column => column.propertyName);
const fieldsToUpdate = latestEntityRepo.metadata.columns.map(column => column.databaseName).filter(val => val !== 'id');
// Create a latest entity instance and upsert in the db
const latestEntity = event.manager.create(entityTarget, _.pick(entity, latestEntityFields));
const latestEntity = getLatestEntityFromEntity(latestEntityRepo, entity);
await event.manager.createQueryBuilder()
.insert()
.into(entityTarget)
@ -953,3 +952,8 @@ export const afterEntityInsertOrUpdate = async<Entity> (
)
.execute();
};
export function getLatestEntityFromEntity<Entity> (latestEntityRepo: Repository<Entity>, entity: any): Entity {
const latestEntityFields = latestEntityRepo.metadata.columns.map(column => column.propertyName);
return latestEntityRepo.create(_.pick(entity, latestEntityFields) as DeepPartial<Entity>);
}

View File

@ -386,6 +386,20 @@ export class GraphWatcher {
}
}
async resetLatestEntities (blockNumber: number): Promise<void> {
const dbTx = await this._database.createTransactionRunner();
try {
await this._database.resetLatestEntities(dbTx, blockNumber);
dbTx.commitTransaction();
} catch (error) {
await dbTx.rollbackTransaction();
throw error;
} finally {
await dbTx.release();
}
}
pruneEntityCacheFrothyBlocks (canonicalBlockHash: string, canonicalBlockNumber: number) {
this._database.pruneEntityCacheFrothyBlocks(canonicalBlockHash, canonicalBlockNumber);
}