Refactor reset CLIs to cli package (#246)

* Refactor reset CLIs to cli package

* Use reset CLIs from cli package in watchers

* Fix method to canonicalize latest entities

* Use composition with common code for refactored CLIs
This commit is contained in:
prathamesh0 2022-11-21 05:14:10 -06:00 committed by GitHub
parent 7520e9012c
commit 1e639c1af3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 311 additions and 265 deletions

View File

@ -11,7 +11,6 @@
},
"dependencies": {
"@cerc-io/graph-node": "^0.2.13",
"@cerc-io/ipld-eth-client": "^0.2.13",
"@cerc-io/util": "^0.2.13",
"@ethersproject/providers": "^5.4.4",
"reflect-metadata": "^0.1.13",

100
packages/cli/src/base.ts Normal file
View File

@ -0,0 +1,100 @@
//
// Copyright 2022 Vulcanize, Inc.
//
import 'reflect-metadata';
import assert from 'assert';
import { ConnectionOptions } from 'typeorm';
import { JsonRpcProvider } from '@ethersproject/providers';
import { GraphWatcher, Database as GraphDatabase } from '@cerc-io/graph-node';
import {
Config,
getConfig,
initClients,
JobQueue,
DatabaseInterface,
IndexerInterface,
ServerConfig,
Database as BaseDatabase,
Clients
} from '@cerc-io/util';
export class BaseCmd {
_config?: Config;
_clients?: Clients;
_ethProvider?: JsonRpcProvider;
_database?: DatabaseInterface;
_indexer?: IndexerInterface;
async initConfig<ConfigType> (configFile: string): Promise<ConfigType> {
if (!this._config) {
this._config = await getConfig(configFile);
}
return this._config as any;
}
async init (
Database: new (
config: ConnectionOptions,
serverConfig?: ServerConfig
) => DatabaseInterface,
Indexer: new (
serverConfig: ServerConfig,
db: DatabaseInterface,
clients: Clients,
ethProvider: JsonRpcProvider,
jobQueue: JobQueue,
graphWatcher?: GraphWatcher
) => IndexerInterface,
clients: { [key: string]: any } = {}
): Promise<{
database: DatabaseInterface,
indexer: IndexerInterface
}> {
assert(this._config);
this._database = new Database(this._config.database, this._config.server);
await this._database.init();
const jobQueueConfig = this._config.jobQueue;
assert(jobQueueConfig, 'Missing job queue config');
const { dbConnectionString, maxCompletionLagInSecs } = jobQueueConfig;
assert(dbConnectionString, 'Missing job queue db connection string');
const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs });
await jobQueue.start();
const { ethClient, ethProvider } = await initClients(this._config);
this._ethProvider = ethProvider;
this._clients = { ethClient, ...clients };
// Check if subgraph watcher.
if (this._config.server.subgraphPath) {
const graphWatcher = await this._getGraphWatcher(this._database.baseDatabase);
this._indexer = new Indexer(this._config.server, this._database, this._clients, ethProvider, jobQueue, graphWatcher);
await this._indexer.init();
graphWatcher.setIndexer(this._indexer);
await graphWatcher.init();
} else {
this._indexer = new Indexer(this._config.server, this._database, this._clients, ethProvider, jobQueue);
await this._indexer.init();
}
return { database: this._database, indexer: this._indexer };
}
async _getGraphWatcher (baseDatabase: BaseDatabase): Promise<GraphWatcher> {
assert(this._config);
assert(this._clients?.ethClient);
assert(this._ethProvider);
const graphDb = new GraphDatabase(this._config.server, baseDatabase);
await graphDb.init();
return new GraphWatcher(graphDb, this._clients.ethClient, this._ethProvider, this._config.server);
}
}

View File

@ -3,3 +3,5 @@
//
export * from './watch-contract';
export * from './reset/watcher';
export * from './reset/state';

View File

@ -0,0 +1,92 @@
//
// Copyright 2022 Vulcanize, Inc.
//
import debug from 'debug';
import 'reflect-metadata';
import assert from 'assert';
import { ConnectionOptions } from 'typeorm';
import {
Config,
getConfig,
DatabaseInterface,
ServerConfig
} from '@cerc-io/util';
const log = debug('vulcanize:reset-state');
interface Arguments {
configFile: string;
blockNumber: number;
}
export class ResetStateCmd {
_argv?: Arguments
_config?: Config;
_database?: DatabaseInterface
async initConfig (configFile: string): Promise<Config> {
this._config = await getConfig(configFile);
assert(this._config);
return this._config;
}
async init (
argv: any,
Database: new (
config: ConnectionOptions,
serverConfig?: ServerConfig
) => DatabaseInterface
): Promise<void> {
this._argv = argv;
if (!this._config) {
await this.initConfig(argv.configFile);
}
assert(this._config);
this._database = new Database(this._config.database, this._config.server);
await this._database.init();
}
async exec (): Promise<void> {
assert(this._argv);
assert(this._database);
// Create a DB transaction
const dbTx = await this._database.createTransactionRunner();
console.time('time:reset-state');
const { blockNumber } = this._argv;
try {
// Delete all State entries after the given block
assert(this._database.removeStatesAfterBlock);
await this._database.removeStatesAfterBlock(dbTx, blockNumber);
// Reset the stateSyncStatus.
const stateSyncStatus = await this._database.getStateSyncStatus();
if (stateSyncStatus) {
if (stateSyncStatus.latestIndexedBlockNumber > blockNumber) {
await this._database.updateStateSyncStatusIndexedBlock(dbTx, blockNumber, true);
}
if (stateSyncStatus.latestCheckpointBlockNumber > blockNumber) {
await this._database.updateStateSyncStatusCheckpointBlock(dbTx, blockNumber, true);
}
}
dbTx.commitTransaction();
} catch (error) {
await dbTx.rollbackTransaction();
throw error;
} finally {
await dbTx.release();
}
console.timeEnd('time:reset-state');
this._database.close();
log(`Reset state successfully to block ${blockNumber}`);
}
}

View File

@ -0,0 +1,72 @@
//
// Copyright 2022 Vulcanize, Inc.
//
import debug from 'debug';
import 'reflect-metadata';
import assert from 'assert';
import { ConnectionOptions } from 'typeorm';
import { JsonRpcProvider } from '@ethersproject/providers';
import { GraphWatcher } from '@cerc-io/graph-node';
import {
JobQueue,
DatabaseInterface,
IndexerInterface,
ServerConfig,
Clients
} from '@cerc-io/util';
import { BaseCmd } from '../base';
const log = debug('vulcanize:reset-watcher');
interface Arguments {
configFile: string;
blockNumber: number;
}
export class ResetWatcherCmd {
_argv?: Arguments
_baseCmd: BaseCmd;
_database?: DatabaseInterface;
_indexer?: IndexerInterface;
constructor () {
this._baseCmd = new BaseCmd();
}
async initConfig<ConfigType> (configFile: string): Promise<ConfigType> {
return this._baseCmd.initConfig(configFile);
}
async init (
argv: any,
Database: new (
config: ConnectionOptions,
serverConfig?: ServerConfig
) => DatabaseInterface,
Indexer: new (
serverConfig: ServerConfig,
db: DatabaseInterface,
clients: Clients,
ethProvider: JsonRpcProvider,
jobQueue: JobQueue,
graphWatcher?: GraphWatcher
) => IndexerInterface,
clients: { [key: string]: any } = {}
): Promise<void> {
this._argv = argv;
await this.initConfig(argv.configFile);
({ database: this._database, indexer: this._indexer } = await this._baseCmd.init(Database, Indexer, clients));
}
async exec (): Promise<void> {
assert(this._argv);
assert(this._indexer);
await this._indexer.resetWatcherToBlock(this._argv.blockNumber);
log('Reset watcher successfully');
}
}

View File

@ -8,23 +8,19 @@ import assert from 'assert';
import { ConnectionOptions } from 'typeorm';
import { JsonRpcProvider } from '@ethersproject/providers';
import { GraphWatcher, Database as GraphDatabase } from '@cerc-io/graph-node';
import { EthClient } from '@cerc-io/ipld-eth-client';
import { GraphWatcher } from '@cerc-io/graph-node';
import {
DEFAULT_CONFIG_PATH,
Config,
getConfig,
initClients,
JobQueue,
DatabaseInterface,
IndexerInterface,
ServerConfig,
Database as BaseDatabase,
Clients
} from '@cerc-io/util';
import { BaseCmd } from './base';
interface Arguments {
[x: string]: unknown;
configFile: string;
address: string;
kind: string;
@ -33,22 +29,20 @@ interface Arguments {
}
export class WatchContractCmd {
_argv?: Arguments
_config?: Config;
_clients?: Clients;
_ethClient?: EthClient;
_ethProvider?: JsonRpcProvider
_database?: DatabaseInterface
_indexer?: IndexerInterface
_argv?: Arguments;
_baseCmd: BaseCmd;
_database?: DatabaseInterface;
_indexer?: IndexerInterface;
constructor () {
this._baseCmd = new BaseCmd();
}
async initConfig<ConfigType> (): Promise<ConfigType> {
this._argv = this._getArgv();
assert(this._argv);
this._config = await getConfig(this._argv.configFile);
assert(this._config);
return this._config as any;
return this._baseCmd.initConfig(this._argv.configFile);
}
async init (
@ -66,40 +60,9 @@ export class WatchContractCmd {
) => IndexerInterface,
clients: { [key: string]: any } = {}
): Promise<void> {
if (!this._config) {
await this.initConfig();
}
assert(this._config);
await this.initConfig();
this._database = new Database(this._config.database, this._config.server);
await this._database.init();
const jobQueueConfig = this._config.jobQueue;
assert(jobQueueConfig, 'Missing job queue config');
const { dbConnectionString, maxCompletionLagInSecs } = jobQueueConfig;
assert(dbConnectionString, 'Missing job queue db connection string');
const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs });
await jobQueue.start();
const { ethClient, ethProvider } = await initClients(this._config);
this._ethClient = ethClient;
this._ethProvider = ethProvider;
this._clients = { ethClient, ...clients };
// Check if subgraph watcher.
if (this._config.server.subgraphPath) {
const graphWatcher = await this._getGraphWatcher(this._database.baseDatabase);
this._indexer = new Indexer(this._config.server, this._database, this._clients, ethProvider, jobQueue, graphWatcher);
await this._indexer.init();
graphWatcher.setIndexer(this._indexer);
await graphWatcher.init();
} else {
this._indexer = new Indexer(this._config.server, this._database, this._clients, ethProvider, jobQueue);
await this._indexer.init();
}
({ database: this._database, indexer: this._indexer } = await this._baseCmd.init(Database, Indexer, clients));
}
async exec (): Promise<void> {
@ -112,17 +75,6 @@ export class WatchContractCmd {
await this._database.close();
}
async _getGraphWatcher (baseDatabase: BaseDatabase): Promise<GraphWatcher> {
assert(this._config);
assert(this._ethClient);
assert(this._ethProvider);
const graphDb = new GraphDatabase(this._config.server, baseDatabase);
await graphDb.init();
return new GraphWatcher(graphDb, this._ethClient, this._ethProvider, this._config.server);
}
_getArgv (): any {
return yargs.parserConfiguration({
'parse-numbers': false

View File

@ -2,14 +2,10 @@
// Copyright 2022 Vulcanize, Inc.
//
import debug from 'debug';
import { getConfig, Config } from '@cerc-io/util';
import { ResetStateCmd } from '@cerc-io/cli';
import { Database } from '../../database';
const log = debug('vulcanize:reset-state');
export const command = 'state';
export const desc = 'Reset State to a given block number';
@ -21,42 +17,8 @@ export const builder = {
};
export const handler = async (argv: any): Promise<void> => {
const { blockNumber } = argv;
const config: Config = await getConfig(argv.configFile);
const resetStateCmd = new ResetStateCmd();
await resetStateCmd.init(argv, Database);
// Initialize database
const db = new Database(config.database);
await db.init();
// Create a DB transaction
const dbTx = await db.createTransactionRunner();
console.time('time:reset-state');
try {
// Delete all State entries after the given block
await db.removeStatesAfterBlock(dbTx, blockNumber);
// Reset the stateSyncStatus.
const stateSyncStatus = await db.getStateSyncStatus();
if (stateSyncStatus) {
if (stateSyncStatus.latestIndexedBlockNumber > blockNumber) {
await db.updateStateSyncStatusIndexedBlock(dbTx, blockNumber, true);
}
if (stateSyncStatus.latestCheckpointBlockNumber > blockNumber) {
await db.updateStateSyncStatusCheckpointBlock(dbTx, blockNumber, true);
}
}
dbTx.commitTransaction();
} catch (error) {
await dbTx.rollbackTransaction();
throw error;
} finally {
await dbTx.release();
}
console.timeEnd('time:reset-state');
log(`Reset state successfully to block ${blockNumber}`);
await resetStateCmd.exec();
};

View File

@ -2,17 +2,11 @@
// Copyright 2021 Vulcanize, Inc.
//
import debug from 'debug';
import assert from 'assert';
import { ResetWatcherCmd } from '@cerc-io/cli';
import { getConfig, initClients, resetJobs, JobQueue, Config } from '@cerc-io/util';
import { GraphWatcher, Database as GraphDatabase } from '@cerc-io/graph-node';
import { Database, ENTITY_TO_LATEST_ENTITY_MAP } from '../../database';
import { Database } from '../../database';
import { Indexer } from '../../indexer';
const log = debug('vulcanize:reset-watcher');
export const command = 'watcher';
export const desc = 'Reset watcher to a block number';
@ -24,35 +18,8 @@ export const builder = {
};
export const handler = async (argv: any): Promise<void> => {
const config: Config = await getConfig(argv.configFile);
await resetJobs(config);
const { ethClient, ethProvider } = await initClients(config);
const resetWatcherCmd = new ResetWatcherCmd();
await resetWatcherCmd.init(argv, Database, Indexer);
// Initialize database.
const db = new Database(config.database);
await db.init();
const graphDb = new GraphDatabase(config.server, db.baseDatabase, ENTITY_TO_LATEST_ENTITY_MAP);
await graphDb.init();
const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server);
const jobQueueConfig = config.jobQueue;
assert(jobQueueConfig, 'Missing job queue config');
const { dbConnectionString, maxCompletionLagInSecs } = jobQueueConfig;
assert(dbConnectionString, 'Missing job queue db connection string');
const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs });
await jobQueue.start();
const indexer = new Indexer(config.server, db, { ethClient }, ethProvider, jobQueue, graphWatcher);
await indexer.init();
graphWatcher.setIndexer(indexer);
await graphWatcher.init();
await indexer.resetWatcherToBlock(argv.blockNumber);
await indexer.resetLatestEntities(argv.blockNumber);
log('Reset watcher successfully');
await resetWatcherCmd.exec();
};

View File

@ -2,16 +2,11 @@
// Copyright 2021 Vulcanize, Inc.
//
import debug from 'debug';
import assert from 'assert';
import { getConfig, initClients, JobQueue, resetJobs, Config } from '@cerc-io/util';
import { ResetWatcherCmd } from '@cerc-io/cli';
import { Database } from '../../database';
import { Indexer } from '../../indexer';
const log = debug('vulcanize:reset-watcher');
export const command = 'watcher';
export const desc = 'Reset watcher to a block number';
@ -23,27 +18,8 @@ export const builder = {
};
export const handler = async (argv: any): Promise<void> => {
const config: Config = await getConfig(argv.configFile);
await resetJobs(config);
const { ethClient, ethProvider } = await initClients(config);
const resetWatcherCmd = new ResetWatcherCmd();
await resetWatcherCmd.init(argv, Database, Indexer);
// Initialize database.
const db = new Database(config.database);
await db.init();
const { jobQueue: jobQueueConfig } = config;
assert(jobQueueConfig, 'Missing job queue config');
const { dbConnectionString, maxCompletionLagInSecs } = jobQueueConfig;
assert(dbConnectionString, 'Missing job queue db connection string');
const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs });
const indexer = new Indexer(config.server, db, { ethClient }, ethProvider, jobQueue);
const syncStatus = await indexer.getSyncStatus();
assert(syncStatus, 'Missing syncStatus');
await indexer.resetWatcherToBlock(argv.blockNumber);
log('Reset watcher successfully');
await resetWatcherCmd.exec();
};

View File

@ -2,16 +2,11 @@
// Copyright 2021 Vulcanize, Inc.
//
import debug from 'debug';
import assert from 'assert';
import { getConfig, initClients, resetJobs, JobQueue, Config } from '@cerc-io/util';
import { ResetWatcherCmd } from '@cerc-io/cli';
import { Database } from '../../database';
import { Indexer } from '../../indexer';
const log = debug('vulcanize:reset-watcher');
export const command = 'watcher';
export const desc = 'Reset watcher to a block number';
@ -23,26 +18,8 @@ export const builder = {
};
export const handler = async (argv: any): Promise<void> => {
const config: Config = await getConfig(argv.configFile);
await resetJobs(config);
const { ethClient, ethProvider } = await initClients(config);
const resetWatcherCmd = new ResetWatcherCmd();
await resetWatcherCmd.init(argv, Database, Indexer);
// Initialize database.
const db = new Database(config.database);
await db.init();
const jobQueueConfig = config.jobQueue;
assert(jobQueueConfig, 'Missing job queue config');
const { dbConnectionString, maxCompletionLagInSecs } = jobQueueConfig;
assert(dbConnectionString, 'Missing job queue db connection string');
const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs });
await jobQueue.start();
const indexer = new Indexer(config.server, db, { ethClient }, ethProvider, jobQueue);
await indexer.init();
await indexer.resetWatcherToBlock(argv.blockNumber);
log('Reset watcher successfully');
await resetWatcherCmd.exec();
};

View File

@ -1223,16 +1223,17 @@ export class Database {
}
async canonicalizeLatestEntity (queryRunner: QueryRunner, entityType: any, latestEntityType: any, entities: any[], blockNumber: number): Promise<void> {
const repo = queryRunner.manager.getRepository(entityType);
const latestEntityRepo = queryRunner.manager.getRepository(latestEntityType);
await Promise.all(entities.map(async (entity: any) => {
// Get latest pruned (canonical) version for the given entity
const repo = queryRunner.manager.getRepository(entity);
const prunedVersion = await this._baseDatabase.getLatestPrunedEntity(repo, entity.id, blockNumber);
// If found, update the latestEntity entry for the id
// Else, delete the latestEntity entry for the id
if (prunedVersion) {
// Create a latest entity instance and insert in the db
const latestEntityRepo = queryRunner.manager.getRepository(latestEntityType);
const latestEntity = getLatestEntityFromEntity(latestEntityRepo, prunedVersion);
await this.updateEntity(

View File

@ -2,17 +2,11 @@
// Copyright 2021 Vulcanize, Inc.
//
import debug from 'debug';
import assert from 'assert';
import { getConfig, initClients, resetJobs, JobQueue, Config } from '@cerc-io/util';
import { GraphWatcher, Database as GraphDatabase } from '@cerc-io/graph-node';
import { ResetWatcherCmd } from '@cerc-io/cli';
import { Database } from '../../database';
import { Indexer } from '../../indexer';
const log = debug('vulcanize:reset-watcher');
export const command = 'watcher';
export const desc = 'Reset watcher to a block number';
@ -24,34 +18,8 @@ export const builder = {
};
export const handler = async (argv: any): Promise<void> => {
const config: Config = await getConfig(argv.configFile);
await resetJobs(config);
const { ethClient, ethProvider } = await initClients(config);
const resetWatcherCmd = new ResetWatcherCmd();
await resetWatcherCmd.init(argv, Database, Indexer);
// Initialize database.
const db = new Database(config.database);
await db.init();
const graphDb = new GraphDatabase(config.server, db.baseDatabase);
await graphDb.init();
const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server);
const jobQueueConfig = config.jobQueue;
assert(jobQueueConfig, 'Missing job queue config');
const { dbConnectionString, maxCompletionLagInSecs } = jobQueueConfig;
assert(dbConnectionString, 'Missing job queue db connection string');
const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs });
await jobQueue.start();
const indexer = new Indexer(config.server, db, { ethClient }, ethProvider, jobQueue, graphWatcher);
await indexer.init();
graphWatcher.setIndexer(indexer);
await graphWatcher.init();
await indexer.resetWatcherToBlock(argv.blockNumber);
log('Reset watcher successfully');
await resetWatcherCmd.exec();
};

View File

@ -2,16 +2,11 @@
// Copyright 2021 Vulcanize, Inc.
//
import debug from 'debug';
import assert from 'assert';
import { getConfig, initClients, resetJobs, JobQueue, Config } from '@cerc-io/util';
import { ResetWatcherCmd } from '@cerc-io/cli';
import { Database } from '../../database';
import { Indexer } from '../../indexer';
const log = debug('vulcanize:reset-watcher');
export const command = 'watcher';
export const desc = 'Reset watcher to a block number';
@ -23,26 +18,8 @@ export const builder = {
};
export const handler = async (argv: any): Promise<void> => {
const config: Config = await getConfig(argv.configFile);
await resetJobs(config);
const { ethClient, ethProvider } = await initClients(config);
const resetWatcherCmd = new ResetWatcherCmd();
await resetWatcherCmd.init(argv, Database, Indexer);
// Initialize database.
const db = new Database(config.database);
await db.init();
const jobQueueConfig = config.jobQueue;
assert(jobQueueConfig, 'Missing job queue config');
const { dbConnectionString, maxCompletionLagInSecs } = jobQueueConfig;
assert(dbConnectionString, 'Missing job queue db connection string');
const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs });
await jobQueue.start();
const indexer = new Indexer(config.server, db, { ethClient }, ethProvider, jobQueue);
await indexer.init();
await indexer.resetWatcherToBlock(argv.blockNumber);
log('Reset watcher successfully');
await resetWatcherCmd.exec();
};

View File

@ -165,8 +165,9 @@ export interface DatabaseInterface {
getStates (where: FindConditions<StateInterface>): Promise<StateInterface[]>
getDiffStatesInRange (contractAddress: string, startBlock: number, endBlock: number): Promise<StateInterface[]>
getNewState (): StateInterface
removeStates(dbTx: QueryRunner, blockNumber: number, kind: StateKind): Promise<void>
saveOrUpdateState (dbTx: QueryRunner, state: StateInterface): Promise<StateInterface>
removeStates(queryRunner: QueryRunner, blockNumber: number, kind: StateKind): Promise<void>
removeStatesAfterBlock?: (queryRunner: QueryRunner, blockNumber: number) => Promise<void>
saveOrUpdateState (queryRunner: QueryRunner, state: StateInterface): Promise<StateInterface>
getStateSyncStatus (): Promise<StateSyncStatusInterface | undefined>
updateStateSyncStatusIndexedBlock (queryRunner: QueryRunner, blockNumber: number, force?: boolean): Promise<StateSyncStatusInterface>
updateStateSyncStatusCheckpointBlock (queryRunner: QueryRunner, blockNumber: number, force?: boolean): Promise<StateSyncStatusInterface>