diff --git a/packages/cli/package.json b/packages/cli/package.json index d3a535f8..53d6b33a 100644 --- a/packages/cli/package.json +++ b/packages/cli/package.json @@ -11,7 +11,6 @@ }, "dependencies": { "@cerc-io/graph-node": "^0.2.13", - "@cerc-io/ipld-eth-client": "^0.2.13", "@cerc-io/util": "^0.2.13", "@ethersproject/providers": "^5.4.4", "reflect-metadata": "^0.1.13", diff --git a/packages/cli/src/base.ts b/packages/cli/src/base.ts new file mode 100644 index 00000000..c2814166 --- /dev/null +++ b/packages/cli/src/base.ts @@ -0,0 +1,100 @@ +// +// Copyright 2022 Vulcanize, Inc. +// + +import 'reflect-metadata'; +import assert from 'assert'; +import { ConnectionOptions } from 'typeorm'; + +import { JsonRpcProvider } from '@ethersproject/providers'; +import { GraphWatcher, Database as GraphDatabase } from '@cerc-io/graph-node'; +import { + Config, + getConfig, + initClients, + JobQueue, + DatabaseInterface, + IndexerInterface, + ServerConfig, + Database as BaseDatabase, + Clients +} from '@cerc-io/util'; + +export class BaseCmd { + _config?: Config; + _clients?: Clients; + _ethProvider?: JsonRpcProvider; + _database?: DatabaseInterface; + _indexer?: IndexerInterface; + + async initConfig (configFile: string): Promise { + if (!this._config) { + this._config = await getConfig(configFile); + } + + return this._config as any; + } + + async init ( + Database: new ( + config: ConnectionOptions, + serverConfig?: ServerConfig + ) => DatabaseInterface, + Indexer: new ( + serverConfig: ServerConfig, + db: DatabaseInterface, + clients: Clients, + ethProvider: JsonRpcProvider, + jobQueue: JobQueue, + graphWatcher?: GraphWatcher + ) => IndexerInterface, + clients: { [key: string]: any } = {} + ): Promise<{ + database: DatabaseInterface, + indexer: IndexerInterface + }> { + assert(this._config); + + this._database = new Database(this._config.database, this._config.server); + await this._database.init(); + + const jobQueueConfig = this._config.jobQueue; + assert(jobQueueConfig, 'Missing job queue config'); + + const { dbConnectionString, maxCompletionLagInSecs } = jobQueueConfig; + assert(dbConnectionString, 'Missing job queue db connection string'); + + const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs }); + await jobQueue.start(); + + const { ethClient, ethProvider } = await initClients(this._config); + this._ethProvider = ethProvider; + this._clients = { ethClient, ...clients }; + + // Check if subgraph watcher. + if (this._config.server.subgraphPath) { + const graphWatcher = await this._getGraphWatcher(this._database.baseDatabase); + this._indexer = new Indexer(this._config.server, this._database, this._clients, ethProvider, jobQueue, graphWatcher); + await this._indexer.init(); + + graphWatcher.setIndexer(this._indexer); + await graphWatcher.init(); + } else { + this._indexer = new Indexer(this._config.server, this._database, this._clients, ethProvider, jobQueue); + await this._indexer.init(); + } + + return { database: this._database, indexer: this._indexer }; + } + + async _getGraphWatcher (baseDatabase: BaseDatabase): Promise { + assert(this._config); + assert(this._clients?.ethClient); + assert(this._ethProvider); + + const graphDb = new GraphDatabase(this._config.server, baseDatabase); + await graphDb.init(); + + return new GraphWatcher(graphDb, this._clients.ethClient, this._ethProvider, this._config.server); + } +} diff --git a/packages/cli/src/index.ts b/packages/cli/src/index.ts index 81931510..218f6391 100644 --- a/packages/cli/src/index.ts +++ b/packages/cli/src/index.ts @@ -3,3 +3,5 @@ // export * from './watch-contract'; +export * from './reset/watcher'; +export * from './reset/state'; diff --git a/packages/cli/src/reset/state.ts b/packages/cli/src/reset/state.ts new file mode 100644 index 00000000..9750c149 --- /dev/null +++ b/packages/cli/src/reset/state.ts @@ -0,0 +1,92 @@ +// +// Copyright 2022 Vulcanize, Inc. +// + +import debug from 'debug'; +import 'reflect-metadata'; +import assert from 'assert'; +import { ConnectionOptions } from 'typeorm'; + +import { + Config, + getConfig, + DatabaseInterface, + ServerConfig +} from '@cerc-io/util'; + +const log = debug('vulcanize:reset-state'); + +interface Arguments { + configFile: string; + blockNumber: number; +} + +export class ResetStateCmd { + _argv?: Arguments + _config?: Config; + _database?: DatabaseInterface + + async initConfig (configFile: string): Promise { + this._config = await getConfig(configFile); + assert(this._config); + + return this._config; + } + + async init ( + argv: any, + Database: new ( + config: ConnectionOptions, + serverConfig?: ServerConfig + ) => DatabaseInterface + ): Promise { + this._argv = argv; + if (!this._config) { + await this.initConfig(argv.configFile); + } + assert(this._config); + + this._database = new Database(this._config.database, this._config.server); + await this._database.init(); + } + + async exec (): Promise { + assert(this._argv); + assert(this._database); + + // Create a DB transaction + const dbTx = await this._database.createTransactionRunner(); + + console.time('time:reset-state'); + const { blockNumber } = this._argv; + try { + // Delete all State entries after the given block + assert(this._database.removeStatesAfterBlock); + await this._database.removeStatesAfterBlock(dbTx, blockNumber); + + // Reset the stateSyncStatus. + const stateSyncStatus = await this._database.getStateSyncStatus(); + + if (stateSyncStatus) { + if (stateSyncStatus.latestIndexedBlockNumber > blockNumber) { + await this._database.updateStateSyncStatusIndexedBlock(dbTx, blockNumber, true); + } + + if (stateSyncStatus.latestCheckpointBlockNumber > blockNumber) { + await this._database.updateStateSyncStatusCheckpointBlock(dbTx, blockNumber, true); + } + } + + dbTx.commitTransaction(); + } catch (error) { + await dbTx.rollbackTransaction(); + throw error; + } finally { + await dbTx.release(); + } + console.timeEnd('time:reset-state'); + + this._database.close(); + log(`Reset state successfully to block ${blockNumber}`); + } +} diff --git a/packages/cli/src/reset/watcher.ts b/packages/cli/src/reset/watcher.ts new file mode 100644 index 00000000..cc810c5d --- /dev/null +++ b/packages/cli/src/reset/watcher.ts @@ -0,0 +1,72 @@ +// +// Copyright 2022 Vulcanize, Inc. +// + +import debug from 'debug'; +import 'reflect-metadata'; +import assert from 'assert'; +import { ConnectionOptions } from 'typeorm'; + +import { JsonRpcProvider } from '@ethersproject/providers'; +import { GraphWatcher } from '@cerc-io/graph-node'; +import { + JobQueue, + DatabaseInterface, + IndexerInterface, + ServerConfig, + Clients +} from '@cerc-io/util'; + +import { BaseCmd } from '../base'; + +const log = debug('vulcanize:reset-watcher'); + +interface Arguments { + configFile: string; + blockNumber: number; +} + +export class ResetWatcherCmd { + _argv?: Arguments + _baseCmd: BaseCmd; + _database?: DatabaseInterface; + _indexer?: IndexerInterface; + + constructor () { + this._baseCmd = new BaseCmd(); + } + + async initConfig (configFile: string): Promise { + return this._baseCmd.initConfig(configFile); + } + + async init ( + argv: any, + Database: new ( + config: ConnectionOptions, + serverConfig?: ServerConfig + ) => DatabaseInterface, + Indexer: new ( + serverConfig: ServerConfig, + db: DatabaseInterface, + clients: Clients, + ethProvider: JsonRpcProvider, + jobQueue: JobQueue, + graphWatcher?: GraphWatcher + ) => IndexerInterface, + clients: { [key: string]: any } = {} + ): Promise { + this._argv = argv; + await this.initConfig(argv.configFile); + + ({ database: this._database, indexer: this._indexer } = await this._baseCmd.init(Database, Indexer, clients)); + } + + async exec (): Promise { + assert(this._argv); + assert(this._indexer); + + await this._indexer.resetWatcherToBlock(this._argv.blockNumber); + log('Reset watcher successfully'); + } +} diff --git a/packages/cli/src/watch-contract.ts b/packages/cli/src/watch-contract.ts index 16e465b2..d8d073a2 100644 --- a/packages/cli/src/watch-contract.ts +++ b/packages/cli/src/watch-contract.ts @@ -8,23 +8,19 @@ import assert from 'assert'; import { ConnectionOptions } from 'typeorm'; import { JsonRpcProvider } from '@ethersproject/providers'; -import { GraphWatcher, Database as GraphDatabase } from '@cerc-io/graph-node'; -import { EthClient } from '@cerc-io/ipld-eth-client'; +import { GraphWatcher } from '@cerc-io/graph-node'; import { DEFAULT_CONFIG_PATH, - Config, - getConfig, - initClients, JobQueue, DatabaseInterface, IndexerInterface, ServerConfig, - Database as BaseDatabase, Clients } from '@cerc-io/util'; +import { BaseCmd } from './base'; + interface Arguments { - [x: string]: unknown; configFile: string; address: string; kind: string; @@ -33,22 +29,20 @@ interface Arguments { } export class WatchContractCmd { - _argv?: Arguments - _config?: Config; - _clients?: Clients; - _ethClient?: EthClient; - _ethProvider?: JsonRpcProvider - _database?: DatabaseInterface - _indexer?: IndexerInterface + _argv?: Arguments; + _baseCmd: BaseCmd; + _database?: DatabaseInterface; + _indexer?: IndexerInterface; + + constructor () { + this._baseCmd = new BaseCmd(); + } async initConfig (): Promise { this._argv = this._getArgv(); assert(this._argv); - this._config = await getConfig(this._argv.configFile); - assert(this._config); - - return this._config as any; + return this._baseCmd.initConfig(this._argv.configFile); } async init ( @@ -66,40 +60,9 @@ export class WatchContractCmd { ) => IndexerInterface, clients: { [key: string]: any } = {} ): Promise { - if (!this._config) { - await this.initConfig(); - } - assert(this._config); + await this.initConfig(); - this._database = new Database(this._config.database, this._config.server); - await this._database.init(); - - const jobQueueConfig = this._config.jobQueue; - assert(jobQueueConfig, 'Missing job queue config'); - - const { dbConnectionString, maxCompletionLagInSecs } = jobQueueConfig; - assert(dbConnectionString, 'Missing job queue db connection string'); - - const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs }); - await jobQueue.start(); - - const { ethClient, ethProvider } = await initClients(this._config); - this._ethClient = ethClient; - this._ethProvider = ethProvider; - this._clients = { ethClient, ...clients }; - - // Check if subgraph watcher. - if (this._config.server.subgraphPath) { - const graphWatcher = await this._getGraphWatcher(this._database.baseDatabase); - this._indexer = new Indexer(this._config.server, this._database, this._clients, ethProvider, jobQueue, graphWatcher); - await this._indexer.init(); - - graphWatcher.setIndexer(this._indexer); - await graphWatcher.init(); - } else { - this._indexer = new Indexer(this._config.server, this._database, this._clients, ethProvider, jobQueue); - await this._indexer.init(); - } + ({ database: this._database, indexer: this._indexer } = await this._baseCmd.init(Database, Indexer, clients)); } async exec (): Promise { @@ -112,17 +75,6 @@ export class WatchContractCmd { await this._database.close(); } - async _getGraphWatcher (baseDatabase: BaseDatabase): Promise { - assert(this._config); - assert(this._ethClient); - assert(this._ethProvider); - - const graphDb = new GraphDatabase(this._config.server, baseDatabase); - await graphDb.init(); - - return new GraphWatcher(graphDb, this._ethClient, this._ethProvider, this._config.server); - } - _getArgv (): any { return yargs.parserConfiguration({ 'parse-numbers': false diff --git a/packages/eden-watcher/src/cli/reset-cmds/state.ts b/packages/eden-watcher/src/cli/reset-cmds/state.ts index efd8b3c4..33211d6e 100644 --- a/packages/eden-watcher/src/cli/reset-cmds/state.ts +++ b/packages/eden-watcher/src/cli/reset-cmds/state.ts @@ -2,14 +2,10 @@ // Copyright 2022 Vulcanize, Inc. // -import debug from 'debug'; - -import { getConfig, Config } from '@cerc-io/util'; +import { ResetStateCmd } from '@cerc-io/cli'; import { Database } from '../../database'; -const log = debug('vulcanize:reset-state'); - export const command = 'state'; export const desc = 'Reset State to a given block number'; @@ -21,42 +17,8 @@ export const builder = { }; export const handler = async (argv: any): Promise => { - const { blockNumber } = argv; - const config: Config = await getConfig(argv.configFile); + const resetStateCmd = new ResetStateCmd(); + await resetStateCmd.init(argv, Database); - // Initialize database - const db = new Database(config.database); - await db.init(); - - // Create a DB transaction - const dbTx = await db.createTransactionRunner(); - - console.time('time:reset-state'); - try { - // Delete all State entries after the given block - await db.removeStatesAfterBlock(dbTx, blockNumber); - - // Reset the stateSyncStatus. - const stateSyncStatus = await db.getStateSyncStatus(); - - if (stateSyncStatus) { - if (stateSyncStatus.latestIndexedBlockNumber > blockNumber) { - await db.updateStateSyncStatusIndexedBlock(dbTx, blockNumber, true); - } - - if (stateSyncStatus.latestCheckpointBlockNumber > blockNumber) { - await db.updateStateSyncStatusCheckpointBlock(dbTx, blockNumber, true); - } - } - - dbTx.commitTransaction(); - } catch (error) { - await dbTx.rollbackTransaction(); - throw error; - } finally { - await dbTx.release(); - } - console.timeEnd('time:reset-state'); - - log(`Reset state successfully to block ${blockNumber}`); + await resetStateCmd.exec(); }; diff --git a/packages/eden-watcher/src/cli/reset-cmds/watcher.ts b/packages/eden-watcher/src/cli/reset-cmds/watcher.ts index 1b32fbc7..295adc45 100644 --- a/packages/eden-watcher/src/cli/reset-cmds/watcher.ts +++ b/packages/eden-watcher/src/cli/reset-cmds/watcher.ts @@ -2,17 +2,11 @@ // Copyright 2021 Vulcanize, Inc. // -import debug from 'debug'; -import assert from 'assert'; +import { ResetWatcherCmd } from '@cerc-io/cli'; -import { getConfig, initClients, resetJobs, JobQueue, Config } from '@cerc-io/util'; -import { GraphWatcher, Database as GraphDatabase } from '@cerc-io/graph-node'; - -import { Database, ENTITY_TO_LATEST_ENTITY_MAP } from '../../database'; +import { Database } from '../../database'; import { Indexer } from '../../indexer'; -const log = debug('vulcanize:reset-watcher'); - export const command = 'watcher'; export const desc = 'Reset watcher to a block number'; @@ -24,35 +18,8 @@ export const builder = { }; export const handler = async (argv: any): Promise => { - const config: Config = await getConfig(argv.configFile); - await resetJobs(config); - const { ethClient, ethProvider } = await initClients(config); + const resetWatcherCmd = new ResetWatcherCmd(); + await resetWatcherCmd.init(argv, Database, Indexer); - // Initialize database. - const db = new Database(config.database); - await db.init(); - - const graphDb = new GraphDatabase(config.server, db.baseDatabase, ENTITY_TO_LATEST_ENTITY_MAP); - await graphDb.init(); - - const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server); - - const jobQueueConfig = config.jobQueue; - assert(jobQueueConfig, 'Missing job queue config'); - - const { dbConnectionString, maxCompletionLagInSecs } = jobQueueConfig; - assert(dbConnectionString, 'Missing job queue db connection string'); - - const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs }); - await jobQueue.start(); - - const indexer = new Indexer(config.server, db, { ethClient }, ethProvider, jobQueue, graphWatcher); - await indexer.init(); - - graphWatcher.setIndexer(indexer); - await graphWatcher.init(); - - await indexer.resetWatcherToBlock(argv.blockNumber); - await indexer.resetLatestEntities(argv.blockNumber); - log('Reset watcher successfully'); + await resetWatcherCmd.exec(); }; diff --git a/packages/erc20-watcher/src/cli/reset-cmds/watcher.ts b/packages/erc20-watcher/src/cli/reset-cmds/watcher.ts index cfa9bcb4..295adc45 100644 --- a/packages/erc20-watcher/src/cli/reset-cmds/watcher.ts +++ b/packages/erc20-watcher/src/cli/reset-cmds/watcher.ts @@ -2,16 +2,11 @@ // Copyright 2021 Vulcanize, Inc. // -import debug from 'debug'; -import assert from 'assert'; - -import { getConfig, initClients, JobQueue, resetJobs, Config } from '@cerc-io/util'; +import { ResetWatcherCmd } from '@cerc-io/cli'; import { Database } from '../../database'; import { Indexer } from '../../indexer'; -const log = debug('vulcanize:reset-watcher'); - export const command = 'watcher'; export const desc = 'Reset watcher to a block number'; @@ -23,27 +18,8 @@ export const builder = { }; export const handler = async (argv: any): Promise => { - const config: Config = await getConfig(argv.configFile); - await resetJobs(config); - const { ethClient, ethProvider } = await initClients(config); + const resetWatcherCmd = new ResetWatcherCmd(); + await resetWatcherCmd.init(argv, Database, Indexer); - // Initialize database. - const db = new Database(config.database); - await db.init(); - - const { jobQueue: jobQueueConfig } = config; - assert(jobQueueConfig, 'Missing job queue config'); - - const { dbConnectionString, maxCompletionLagInSecs } = jobQueueConfig; - assert(dbConnectionString, 'Missing job queue db connection string'); - - const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs }); - - const indexer = new Indexer(config.server, db, { ethClient }, ethProvider, jobQueue); - - const syncStatus = await indexer.getSyncStatus(); - assert(syncStatus, 'Missing syncStatus'); - - await indexer.resetWatcherToBlock(argv.blockNumber); - log('Reset watcher successfully'); + await resetWatcherCmd.exec(); }; diff --git a/packages/erc721-watcher/src/cli/reset-cmds/watcher.ts b/packages/erc721-watcher/src/cli/reset-cmds/watcher.ts index afafc124..295adc45 100644 --- a/packages/erc721-watcher/src/cli/reset-cmds/watcher.ts +++ b/packages/erc721-watcher/src/cli/reset-cmds/watcher.ts @@ -2,16 +2,11 @@ // Copyright 2021 Vulcanize, Inc. // -import debug from 'debug'; -import assert from 'assert'; - -import { getConfig, initClients, resetJobs, JobQueue, Config } from '@cerc-io/util'; +import { ResetWatcherCmd } from '@cerc-io/cli'; import { Database } from '../../database'; import { Indexer } from '../../indexer'; -const log = debug('vulcanize:reset-watcher'); - export const command = 'watcher'; export const desc = 'Reset watcher to a block number'; @@ -23,26 +18,8 @@ export const builder = { }; export const handler = async (argv: any): Promise => { - const config: Config = await getConfig(argv.configFile); - await resetJobs(config); - const { ethClient, ethProvider } = await initClients(config); + const resetWatcherCmd = new ResetWatcherCmd(); + await resetWatcherCmd.init(argv, Database, Indexer); - // Initialize database. - const db = new Database(config.database); - await db.init(); - - const jobQueueConfig = config.jobQueue; - assert(jobQueueConfig, 'Missing job queue config'); - - const { dbConnectionString, maxCompletionLagInSecs } = jobQueueConfig; - assert(dbConnectionString, 'Missing job queue db connection string'); - - const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs }); - await jobQueue.start(); - - const indexer = new Indexer(config.server, db, { ethClient }, ethProvider, jobQueue); - await indexer.init(); - - await indexer.resetWatcherToBlock(argv.blockNumber); - log('Reset watcher successfully'); + await resetWatcherCmd.exec(); }; diff --git a/packages/graph-node/src/database.ts b/packages/graph-node/src/database.ts index aa5d7bd3..7ba42f1c 100644 --- a/packages/graph-node/src/database.ts +++ b/packages/graph-node/src/database.ts @@ -1223,16 +1223,17 @@ export class Database { } async canonicalizeLatestEntity (queryRunner: QueryRunner, entityType: any, latestEntityType: any, entities: any[], blockNumber: number): Promise { + const repo = queryRunner.manager.getRepository(entityType); + const latestEntityRepo = queryRunner.manager.getRepository(latestEntityType); + await Promise.all(entities.map(async (entity: any) => { // Get latest pruned (canonical) version for the given entity - const repo = queryRunner.manager.getRepository(entity); const prunedVersion = await this._baseDatabase.getLatestPrunedEntity(repo, entity.id, blockNumber); // If found, update the latestEntity entry for the id // Else, delete the latestEntity entry for the id if (prunedVersion) { // Create a latest entity instance and insert in the db - const latestEntityRepo = queryRunner.manager.getRepository(latestEntityType); const latestEntity = getLatestEntityFromEntity(latestEntityRepo, prunedVersion); await this.updateEntity( diff --git a/packages/graph-test-watcher/src/cli/reset-cmds/watcher.ts b/packages/graph-test-watcher/src/cli/reset-cmds/watcher.ts index 59095553..295adc45 100644 --- a/packages/graph-test-watcher/src/cli/reset-cmds/watcher.ts +++ b/packages/graph-test-watcher/src/cli/reset-cmds/watcher.ts @@ -2,17 +2,11 @@ // Copyright 2021 Vulcanize, Inc. // -import debug from 'debug'; -import assert from 'assert'; - -import { getConfig, initClients, resetJobs, JobQueue, Config } from '@cerc-io/util'; -import { GraphWatcher, Database as GraphDatabase } from '@cerc-io/graph-node'; +import { ResetWatcherCmd } from '@cerc-io/cli'; import { Database } from '../../database'; import { Indexer } from '../../indexer'; -const log = debug('vulcanize:reset-watcher'); - export const command = 'watcher'; export const desc = 'Reset watcher to a block number'; @@ -24,34 +18,8 @@ export const builder = { }; export const handler = async (argv: any): Promise => { - const config: Config = await getConfig(argv.configFile); - await resetJobs(config); - const { ethClient, ethProvider } = await initClients(config); + const resetWatcherCmd = new ResetWatcherCmd(); + await resetWatcherCmd.init(argv, Database, Indexer); - // Initialize database. - const db = new Database(config.database); - await db.init(); - - const graphDb = new GraphDatabase(config.server, db.baseDatabase); - await graphDb.init(); - - const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server); - - const jobQueueConfig = config.jobQueue; - assert(jobQueueConfig, 'Missing job queue config'); - - const { dbConnectionString, maxCompletionLagInSecs } = jobQueueConfig; - assert(dbConnectionString, 'Missing job queue db connection string'); - - const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs }); - await jobQueue.start(); - - const indexer = new Indexer(config.server, db, { ethClient }, ethProvider, jobQueue, graphWatcher); - await indexer.init(); - - graphWatcher.setIndexer(indexer); - await graphWatcher.init(); - - await indexer.resetWatcherToBlock(argv.blockNumber); - log('Reset watcher successfully'); + await resetWatcherCmd.exec(); }; diff --git a/packages/mobymask-watcher/src/cli/reset-cmds/watcher.ts b/packages/mobymask-watcher/src/cli/reset-cmds/watcher.ts index afafc124..295adc45 100644 --- a/packages/mobymask-watcher/src/cli/reset-cmds/watcher.ts +++ b/packages/mobymask-watcher/src/cli/reset-cmds/watcher.ts @@ -2,16 +2,11 @@ // Copyright 2021 Vulcanize, Inc. // -import debug from 'debug'; -import assert from 'assert'; - -import { getConfig, initClients, resetJobs, JobQueue, Config } from '@cerc-io/util'; +import { ResetWatcherCmd } from '@cerc-io/cli'; import { Database } from '../../database'; import { Indexer } from '../../indexer'; -const log = debug('vulcanize:reset-watcher'); - export const command = 'watcher'; export const desc = 'Reset watcher to a block number'; @@ -23,26 +18,8 @@ export const builder = { }; export const handler = async (argv: any): Promise => { - const config: Config = await getConfig(argv.configFile); - await resetJobs(config); - const { ethClient, ethProvider } = await initClients(config); + const resetWatcherCmd = new ResetWatcherCmd(); + await resetWatcherCmd.init(argv, Database, Indexer); - // Initialize database. - const db = new Database(config.database); - await db.init(); - - const jobQueueConfig = config.jobQueue; - assert(jobQueueConfig, 'Missing job queue config'); - - const { dbConnectionString, maxCompletionLagInSecs } = jobQueueConfig; - assert(dbConnectionString, 'Missing job queue db connection string'); - - const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs }); - await jobQueue.start(); - - const indexer = new Indexer(config.server, db, { ethClient }, ethProvider, jobQueue); - await indexer.init(); - - await indexer.resetWatcherToBlock(argv.blockNumber); - log('Reset watcher successfully'); + await resetWatcherCmd.exec(); }; diff --git a/packages/util/src/types.ts b/packages/util/src/types.ts index 45b5f444..43d218c4 100644 --- a/packages/util/src/types.ts +++ b/packages/util/src/types.ts @@ -165,8 +165,9 @@ export interface DatabaseInterface { getStates (where: FindConditions): Promise getDiffStatesInRange (contractAddress: string, startBlock: number, endBlock: number): Promise getNewState (): StateInterface - removeStates(dbTx: QueryRunner, blockNumber: number, kind: StateKind): Promise - saveOrUpdateState (dbTx: QueryRunner, state: StateInterface): Promise + removeStates(queryRunner: QueryRunner, blockNumber: number, kind: StateKind): Promise + removeStatesAfterBlock?: (queryRunner: QueryRunner, blockNumber: number) => Promise + saveOrUpdateState (queryRunner: QueryRunner, state: StateInterface): Promise getStateSyncStatus (): Promise updateStateSyncStatusIndexedBlock (queryRunner: QueryRunner, blockNumber: number, force?: boolean): Promise updateStateSyncStatusCheckpointBlock (queryRunner: QueryRunner, blockNumber: number, force?: boolean): Promise