diff --git a/packages/cli/src/base.ts b/packages/cli/src/base.ts index c2814166..72cfd64f 100644 --- a/packages/cli/src/base.ts +++ b/packages/cli/src/base.ts @@ -5,6 +5,7 @@ import 'reflect-metadata'; import assert from 'assert'; import { ConnectionOptions } from 'typeorm'; +import { PubSub } from 'graphql-subscriptions'; import { JsonRpcProvider } from '@ethersproject/providers'; import { GraphWatcher, Database as GraphDatabase } from '@cerc-io/graph-node'; @@ -17,15 +18,44 @@ import { IndexerInterface, ServerConfig, Database as BaseDatabase, - Clients + Clients, + EventWatcherInterface } from '@cerc-io/util'; +import { EthClient } from '@cerc-io/ipld-eth-client'; export class BaseCmd { _config?: Config; _clients?: Clients; _ethProvider?: JsonRpcProvider; + _jobQueue?: JobQueue _database?: DatabaseInterface; _indexer?: IndexerInterface; + _graphDb?: GraphDatabase + _eventWatcher?: EventWatcherInterface + + get config (): Config | undefined { + return this._config; + } + + get jobQueue (): JobQueue | undefined { + return this._jobQueue; + } + + get database (): DatabaseInterface | undefined { + return this._database; + } + + get graphDb (): GraphDatabase | undefined { + return this._graphDb; + } + + get indexer (): IndexerInterface | undefined { + return this._indexer; + } + + get eventWatcher (): EventWatcherInterface | undefined { + return this._eventWatcher; + } async initConfig (configFile: string): Promise { if (!this._config) { @@ -49,10 +79,7 @@ export class BaseCmd { graphWatcher?: GraphWatcher ) => IndexerInterface, clients: { [key: string]: any } = {} - ): Promise<{ - database: DatabaseInterface, - indexer: IndexerInterface - }> { + ): Promise { assert(this._config); this._database = new Database(this._config.database, this._config.server); @@ -64,8 +91,8 @@ export class BaseCmd { const { dbConnectionString, maxCompletionLagInSecs } = jobQueueConfig; assert(dbConnectionString, 'Missing job queue db connection string'); - const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs }); - await jobQueue.start(); + this._jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs }); + await this._jobQueue.start(); const { ethClient, ethProvider } = await initClients(this._config); this._ethProvider = ethProvider; @@ -74,17 +101,33 @@ export class BaseCmd { // Check if subgraph watcher. if (this._config.server.subgraphPath) { const graphWatcher = await this._getGraphWatcher(this._database.baseDatabase); - this._indexer = new Indexer(this._config.server, this._database, this._clients, ethProvider, jobQueue, graphWatcher); + this._indexer = new Indexer(this._config.server, this._database, this._clients, ethProvider, this._jobQueue, graphWatcher); await this._indexer.init(); graphWatcher.setIndexer(this._indexer); await graphWatcher.init(); } else { - this._indexer = new Indexer(this._config.server, this._database, this._clients, ethProvider, jobQueue); + this._indexer = new Indexer(this._config.server, this._database, this._clients, ethProvider, this._jobQueue); await this._indexer.init(); } + } - return { database: this._database, indexer: this._indexer }; + async initEventWatcher ( + EventWatcher: new( + ethClient: EthClient, + indexer: IndexerInterface, + pubsub: PubSub, + jobQueue: JobQueue + ) => EventWatcherInterface + ): Promise { + assert(this._clients?.ethClient); + assert(this._indexer); + assert(this._jobQueue); + + // Note: In-memory pubsub works fine for now, as each watcher is a single process anyway. + // Later: https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries + const pubsub = new PubSub(); + this._eventWatcher = new EventWatcher(this._clients.ethClient, this._indexer, pubsub, this._jobQueue); } async _getGraphWatcher (baseDatabase: BaseDatabase): Promise { @@ -92,9 +135,9 @@ export class BaseCmd { assert(this._clients?.ethClient); assert(this._ethProvider); - const graphDb = new GraphDatabase(this._config.server, baseDatabase); - await graphDb.init(); + this._graphDb = new GraphDatabase(this._config.server, baseDatabase); + await this._graphDb.init(); - return new GraphWatcher(graphDb, this._clients.ethClient, this._ethProvider, this._config.server); + return new GraphWatcher(this._graphDb, this._clients.ethClient, this._ethProvider, this._config.server); } } diff --git a/packages/cli/src/checkpoint/create.ts b/packages/cli/src/checkpoint/create.ts index 6128a1e2..94b7622a 100644 --- a/packages/cli/src/checkpoint/create.ts +++ b/packages/cli/src/checkpoint/create.ts @@ -60,17 +60,21 @@ export class CreateCheckpointCmd { this._argv = argv; await this.initConfig(argv.configFile); - ({ database: this._database, indexer: this._indexer } = await this._baseCmd.init(Database, Indexer, clients)); + await this._baseCmd.init(Database, Indexer, clients); } async exec (): Promise { assert(this._argv); - assert(this._database); - assert(this._indexer); - const blockHash = await this._indexer.processCLICheckpoint(this._argv.address, this._argv.blockHash); + const database = this._baseCmd.database; + const indexer = this._baseCmd.indexer; - await this._database.close(); + assert(database); + assert(indexer); + + const blockHash = await indexer.processCLICheckpoint(this._argv.address, this._argv.blockHash); + + await database.close(); log(`Created a checkpoint for contract ${this._argv.address} at block-hash ${blockHash}`); } } diff --git a/packages/cli/src/import-state.ts b/packages/cli/src/import-state.ts new file mode 100644 index 00000000..489278d8 --- /dev/null +++ b/packages/cli/src/import-state.ts @@ -0,0 +1,181 @@ +// +// Copyright 2022 Vulcanize, Inc. +// + +import yargs from 'yargs'; +import 'reflect-metadata'; +import assert from 'assert'; +import path from 'path'; +import fs from 'fs'; +import debug from 'debug'; +import { ConnectionOptions } from 'typeorm'; +import { PubSub } from 'graphql-subscriptions'; + +import { JsonRpcProvider } from '@ethersproject/providers'; +import { GraphWatcher, updateEntitiesFromState } from '@cerc-io/graph-node'; +import { EthClient } from '@cerc-io/ipld-eth-client'; +import { + DEFAULT_CONFIG_PATH, + JobQueue, + DatabaseInterface, + IndexerInterface, + ServerConfig, + Clients, + EventWatcherInterface, + fillBlocks, + StateKind +} from '@cerc-io/util'; +import * as codec from '@ipld/dag-cbor'; + +import { BaseCmd } from './base'; + +const log = debug('vulcanize:import-state'); + +interface Arguments { + configFile: string; + importFile: string; +} + +export class ImportStateCmd { + _argv?: Arguments + _baseCmd: BaseCmd; + + constructor () { + this._baseCmd = new BaseCmd(); + } + + async initConfig (): Promise { + this._argv = this._getArgv(); + assert(this._argv); + + return this._baseCmd.initConfig(this._argv.configFile); + } + + async init ( + Database: new (config: ConnectionOptions, + serverConfig?: ServerConfig + ) => DatabaseInterface, + Indexer: new ( + serverConfig: ServerConfig, + db: DatabaseInterface, + clients: Clients, + ethProvider: JsonRpcProvider, + jobQueue: JobQueue, + graphWatcher?: GraphWatcher + ) => IndexerInterface, + EventWatcher: new( + ethClient: EthClient, + indexer: IndexerInterface, + pubsub: PubSub, + jobQueue: JobQueue + ) => EventWatcherInterface, + clients: { [key: string]: any } = {} + ): Promise { + await this.initConfig(); + + await this._baseCmd.init(Database, Indexer, clients); + await this._baseCmd.initEventWatcher(EventWatcher); + } + + async exec (State: new() => any): Promise { + assert(this._argv); + + const config = this._baseCmd.config; + const jobQueue = this._baseCmd.jobQueue; + const database = this._baseCmd.database; + const indexer = this._baseCmd.indexer; + const eventWatcher = this._baseCmd.eventWatcher; + + assert(config); + assert(jobQueue); + assert(database); + assert(indexer); + assert(eventWatcher); + + // Import data. + const importFilePath = path.resolve(this._argv.importFile); + const encodedImportData = fs.readFileSync(importFilePath); + const importData = codec.decode(Buffer.from(encodedImportData)) as any; + + // Fill the snapshot block. + await fillBlocks( + jobQueue, + indexer, + eventWatcher, + config.jobQueue.blockDelayInMilliSecs, + { + prefetch: true, + startBlock: importData.snapshotBlock.blockNumber, + endBlock: importData.snapshotBlock.blockNumber + } + ); + + // Fill the Contracts. + for (const contract of importData.contracts) { + indexer.watchContract(contract.address, contract.kind, contract.checkpoint, contract.startingBlock); + } + + // Get the snapshot block. + const block = await indexer.getBlockProgress(importData.snapshotBlock.blockHash); + assert(block); + + // Fill the States. + for (const checkpoint of importData.stateCheckpoints) { + let state = new State(); + + state = Object.assign(state, checkpoint); + state.block = block; + state.data = Buffer.from(codec.encode(state.data)); + + state = await indexer.saveOrUpdateState(state); + + // Fill entities using State if: + // relationsMap defined for the watcher, + // graphDb instance is avaiable + // TODO: Fill latest entity tables + if (indexer.getRelationsMap) { + if (this._baseCmd.graphDb) { + await updateEntitiesFromState(this._baseCmd.graphDb, indexer, state); + } else if (database.graphDatabase) { + await updateEntitiesFromState(database.graphDatabase, indexer, state); + } + } + } + + // Mark snapshot block as completely processed. + block.isComplete = true; + await indexer.updateBlockProgress(block, block.lastProcessedEventIndex); + await indexer.updateSyncStatusChainHead(block.blockHash, block.blockNumber); + await indexer.updateSyncStatusIndexedBlock(block.blockHash, block.blockNumber); + await indexer.updateStateSyncStatusIndexedBlock(block.blockNumber); + await indexer.updateStateSyncStatusCheckpointBlock(block.blockNumber); + + // The 'diff_staged' and 'init' State entries are unnecessary as checkpoints have been already created for the snapshot block. + await indexer.removeStates(block.blockNumber, StateKind.Init); + await indexer.removeStates(block.blockNumber, StateKind.DiffStaged); + + log(`Import completed for snapshot block at height ${block.blockNumber}`); + await database.close(); + } + + _getArgv (): any { + return yargs.parserConfiguration({ + 'parse-numbers': false + }).options({ + configFile: { + alias: 'f', + type: 'string', + require: true, + demandOption: true, + describe: 'Configuration file path (toml)', + default: DEFAULT_CONFIG_PATH + }, + importFile: { + alias: 'i', + type: 'string', + demandOption: true, + describe: 'Import file path (JSON)' + } + }).argv; + } +} diff --git a/packages/cli/src/index.ts b/packages/cli/src/index.ts index a99bfe11..f8a94c26 100644 --- a/packages/cli/src/index.ts +++ b/packages/cli/src/index.ts @@ -7,3 +7,4 @@ export * from './reset/watcher'; export * from './reset/state'; export * from './checkpoint/create'; export * from './inspect-cid'; +export * from './import-state'; diff --git a/packages/cli/src/inspect-cid.ts b/packages/cli/src/inspect-cid.ts index 80db02d0..d4563579 100644 --- a/packages/cli/src/inspect-cid.ts +++ b/packages/cli/src/inspect-cid.ts @@ -32,8 +32,6 @@ interface Arguments { export class InspectCIDCmd { _argv?: Arguments _baseCmd: BaseCmd; - _database?: DatabaseInterface; - _indexer?: IndexerInterface; constructor () { this._baseCmd = new BaseCmd(); @@ -63,21 +61,25 @@ export class InspectCIDCmd { ): Promise { await this.initConfig(); - ({ database: this._database, indexer: this._indexer } = await this._baseCmd.init(Database, Indexer, clients)); + await this._baseCmd.init(Database, Indexer, clients); } async exec (): Promise { assert(this._argv); - assert(this._database); - assert(this._indexer); - const state = await this._indexer.getStateByCID(this._argv.cid); + const database = this._baseCmd.database; + const indexer = this._baseCmd.indexer; + + assert(database); + assert(indexer); + + const state = await indexer.getStateByCID(this._argv.cid); assert(state, 'State for the provided CID doesn\'t exist.'); - const stateData = await this._indexer.getStateData(state); + const stateData = await indexer.getStateData(state); log(util.inspect(stateData, false, null)); - await this._database.close(); + await database.close(); } _getArgv (): any { diff --git a/packages/cli/src/reset/watcher.ts b/packages/cli/src/reset/watcher.ts index 12de4472..430bfbc5 100644 --- a/packages/cli/src/reset/watcher.ts +++ b/packages/cli/src/reset/watcher.ts @@ -29,8 +29,6 @@ interface Arguments { export class ResetWatcherCmd { _argv?: Arguments _baseCmd: BaseCmd; - _database?: DatabaseInterface; - _indexer?: IndexerInterface; constructor () { this._baseCmd = new BaseCmd(); @@ -59,17 +57,21 @@ export class ResetWatcherCmd { this._argv = argv; await this.initConfig(argv.configFile); - ({ database: this._database, indexer: this._indexer } = await this._baseCmd.init(Database, Indexer, clients)); + await this._baseCmd.init(Database, Indexer, clients); } async exec (): Promise { assert(this._argv); - assert(this._database); - assert(this._indexer); - await this._indexer.resetWatcherToBlock(this._argv.blockNumber); + const database = this._baseCmd.database; + const indexer = this._baseCmd.indexer; - await this._database.close(); + assert(database); + assert(indexer); + + await indexer.resetWatcherToBlock(this._argv.blockNumber); + + await database.close(); log('Reset watcher successfully'); } } diff --git a/packages/cli/src/watch-contract.ts b/packages/cli/src/watch-contract.ts index d8d073a2..610cb3c0 100644 --- a/packages/cli/src/watch-contract.ts +++ b/packages/cli/src/watch-contract.ts @@ -31,8 +31,6 @@ interface Arguments { export class WatchContractCmd { _argv?: Arguments; _baseCmd: BaseCmd; - _database?: DatabaseInterface; - _indexer?: IndexerInterface; constructor () { this._baseCmd = new BaseCmd(); @@ -62,17 +60,20 @@ export class WatchContractCmd { ): Promise { await this.initConfig(); - ({ database: this._database, indexer: this._indexer } = await this._baseCmd.init(Database, Indexer, clients)); + await this._baseCmd.init(Database, Indexer, clients); } async exec (): Promise { assert(this._argv); - assert(this._database); - assert(this._indexer); - assert(this._indexer.watchContract); - await this._indexer.watchContract(this._argv.address, this._argv.kind, this._argv.checkpoint, this._argv.startingBlock); - await this._database.close(); + const database = this._baseCmd.database; + const indexer = this._baseCmd.indexer; + + assert(database); + assert(indexer); + + await indexer.watchContract(this._argv.address, this._argv.kind, this._argv.checkpoint, this._argv.startingBlock); + await database.close(); } _getArgv (): any { diff --git a/packages/codegen/src/templates/events-template.handlebars b/packages/codegen/src/templates/events-template.handlebars index 377c344e..f7bc0374 100644 --- a/packages/codegen/src/templates/events-template.handlebars +++ b/packages/codegen/src/templates/events-template.handlebars @@ -11,8 +11,7 @@ import { EventWatcher as BaseEventWatcher, EventWatcherInterface, QUEUE_BLOCK_PROCESSING, - QUEUE_EVENT_PROCESSING, - UpstreamConfig + QUEUE_EVENT_PROCESSING } from '@cerc-io/util'; import { Indexer } from './indexer'; @@ -25,7 +24,7 @@ export class EventWatcher implements EventWatcherInterface { _pubsub: PubSub _jobQueue: JobQueue - constructor (upstreamConfig: UpstreamConfig, ethClient: EthClient, indexer: Indexer, pubsub: PubSub, jobQueue: JobQueue) { + constructor (ethClient: EthClient, indexer: Indexer, pubsub: PubSub, jobQueue: JobQueue) { assert(ethClient); assert(indexer); @@ -33,7 +32,7 @@ export class EventWatcher implements EventWatcherInterface { this._indexer = indexer; this._pubsub = pubsub; this._jobQueue = jobQueue; - this._baseEventWatcher = new BaseEventWatcher(upstreamConfig, this._ethClient, this._indexer, this._pubsub, this._jobQueue); + this._baseEventWatcher = new BaseEventWatcher(this._ethClient, this._indexer, this._pubsub, this._jobQueue); } getEventIterator (): AsyncIterator { diff --git a/packages/codegen/src/templates/fill-template.handlebars b/packages/codegen/src/templates/fill-template.handlebars index 59d8daea..4a4c6241 100644 --- a/packages/codegen/src/templates/fill-template.handlebars +++ b/packages/codegen/src/templates/fill-template.handlebars @@ -106,7 +106,7 @@ export const main = async (): Promise => { // Later: https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries const pubsub = new PubSub(); - const eventWatcher = new EventWatcher(config.upstream, ethClient, indexer, pubsub, jobQueue); + const eventWatcher = new EventWatcher(ethClient, indexer, pubsub, jobQueue); await fillBlocks(jobQueue, indexer, eventWatcher, jobQueueConfig.blockDelayInMilliSecs, argv); }; diff --git a/packages/codegen/src/templates/import-state-template.handlebars b/packages/codegen/src/templates/import-state-template.handlebars index 4281a477..dd3c81ef 100644 --- a/packages/codegen/src/templates/import-state-template.handlebars +++ b/packages/codegen/src/templates/import-state-template.handlebars @@ -77,7 +77,7 @@ export const main = async (): Promise => { await graphWatcher.init(); {{/if}} - const eventWatcher = new EventWatcher(config.upstream, ethClient, indexer, pubsub, jobQueue); + const eventWatcher = new EventWatcher(ethClient, indexer, pubsub, jobQueue); // Import data. const importFilePath = path.resolve(argv.importFile); diff --git a/packages/codegen/src/templates/server-template.handlebars b/packages/codegen/src/templates/server-template.handlebars index ef635466..b64aa109 100644 --- a/packages/codegen/src/templates/server-template.handlebars +++ b/packages/codegen/src/templates/server-template.handlebars @@ -71,7 +71,7 @@ export const main = async (): Promise => { await graphWatcher.init(); {{/if}} - const eventWatcher = new EventWatcher(config.upstream, ethClient, indexer, pubsub, jobQueue); + const eventWatcher = new EventWatcher(ethClient, indexer, pubsub, jobQueue); if (watcherKind === KIND_ACTIVE) { await jobQueue.start(); diff --git a/packages/eden-watcher/src/cli/import-state.ts b/packages/eden-watcher/src/cli/import-state.ts index 1182abb2..871f1b00 100644 --- a/packages/eden-watcher/src/cli/import-state.ts +++ b/packages/eden-watcher/src/cli/import-state.ts @@ -2,20 +2,12 @@ // Copyright 2021 Vulcanize, Inc. // -import assert from 'assert'; import 'reflect-metadata'; -import yargs from 'yargs'; -import { hideBin } from 'yargs/helpers'; import debug from 'debug'; -import { PubSub } from 'graphql-subscriptions'; -import fs from 'fs'; -import path from 'path'; -import { getConfig, fillBlocks, JobQueue, DEFAULT_CONFIG_PATH, Config, initClients, StateKind } from '@cerc-io/util'; -import { GraphWatcher, Database as GraphDatabase } from '@cerc-io/graph-node'; -import * as codec from '@ipld/dag-cbor'; +import { ImportStateCmd } from '@cerc-io/cli'; -import { Database, ENTITY_TO_LATEST_ENTITY_MAP } from '../database'; +import { Database } from '../database'; import { Indexer } from '../indexer'; import { EventWatcher } from '../events'; import { State } from '../entity/State'; @@ -23,109 +15,10 @@ import { State } from '../entity/State'; const log = debug('vulcanize:import-state'); export const main = async (): Promise => { - const argv = await yargs(hideBin(process.argv)).parserConfiguration({ - 'parse-numbers': false - }).options({ - configFile: { - alias: 'f', - type: 'string', - demandOption: true, - describe: 'configuration file path (toml)', - default: DEFAULT_CONFIG_PATH - }, - importFile: { - alias: 'i', - type: 'string', - demandOption: true, - describe: 'Import file path (JSON)' - } - }).argv; + const importStateCmd = new ImportStateCmd(); + await importStateCmd.init(Database, Indexer, EventWatcher); - const config: Config = await getConfig(argv.configFile); - const { ethClient, ethProvider } = await initClients(config); - - const db = new Database(config.database); - await db.init(); - - const graphDb = new GraphDatabase(config.server, db.baseDatabase, ENTITY_TO_LATEST_ENTITY_MAP); - await graphDb.init(); - - const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server); - - // Note: In-memory pubsub works fine for now, as each watcher is a single process anyway. - // Later: https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries - const pubsub = new PubSub(); - - const jobQueueConfig = config.jobQueue; - assert(jobQueueConfig, 'Missing job queue config'); - - const { dbConnectionString, maxCompletionLagInSecs } = jobQueueConfig; - assert(dbConnectionString, 'Missing job queue db connection string'); - - const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs }); - await jobQueue.start(); - - const indexer = new Indexer(config.server, db, { ethClient }, ethProvider, jobQueue, graphWatcher); - await indexer.init(); - - graphWatcher.setIndexer(indexer); - await graphWatcher.init(); - - const eventWatcher = new EventWatcher(config.upstream, ethClient, indexer, pubsub, jobQueue); - - // Import data. - const importFilePath = path.resolve(argv.importFile); - const encodedImportData = fs.readFileSync(importFilePath); - const importData = codec.decode(Buffer.from(encodedImportData)) as any; - - // Fill the snapshot block. - await fillBlocks( - jobQueue, - indexer, - eventWatcher, - jobQueueConfig.blockDelayInMilliSecs, - { - prefetch: true, - startBlock: importData.snapshotBlock.blockNumber, - endBlock: importData.snapshotBlock.blockNumber - } - ); - - // Fill the Contracts. - for (const contract of importData.contracts) { - await indexer.watchContract(contract.address, contract.kind, contract.checkpoint, contract.startingBlock); - } - - // Get the snapshot block. - const block = await indexer.getBlockProgress(importData.snapshotBlock.blockHash); - assert(block); - - // Fill the States. - for (const checkpoint of importData.stateCheckpoints) { - let state = new State(); - - state = Object.assign(state, checkpoint); - state.block = block; - - state.data = Buffer.from(codec.encode(state.data)); - - state = await indexer.saveOrUpdateState(state); - await graphWatcher.updateEntitiesFromState(state); - } - - // Mark snapshot block as completely processed. - block.isComplete = true; - await indexer.updateBlockProgress(block, block.lastProcessedEventIndex); - await indexer.updateSyncStatusChainHead(block.blockHash, block.blockNumber); - await indexer.updateSyncStatusIndexedBlock(block.blockHash, block.blockNumber); - await indexer.updateStateSyncStatusIndexedBlock(block.blockNumber); - await indexer.updateStateSyncStatusCheckpointBlock(block.blockNumber); - - // The 'diff_staged' and 'init' State entries are unnecessary as checkpoints have been already created for the snapshot block. - await indexer.removeStates(block.blockNumber, StateKind.Init); - await indexer.removeStates(block.blockNumber, StateKind.DiffStaged); - - log(`Import completed for snapshot block at height ${block.blockNumber}`); + await importStateCmd.exec(State); }; main().catch(err => { diff --git a/packages/eden-watcher/src/events.ts b/packages/eden-watcher/src/events.ts index 377c344e..4cb6e10d 100644 --- a/packages/eden-watcher/src/events.ts +++ b/packages/eden-watcher/src/events.ts @@ -12,7 +12,7 @@ import { EventWatcherInterface, QUEUE_BLOCK_PROCESSING, QUEUE_EVENT_PROCESSING, - UpstreamConfig + IndexerInterface } from '@cerc-io/util'; import { Indexer } from './indexer'; @@ -25,15 +25,15 @@ export class EventWatcher implements EventWatcherInterface { _pubsub: PubSub _jobQueue: JobQueue - constructor (upstreamConfig: UpstreamConfig, ethClient: EthClient, indexer: Indexer, pubsub: PubSub, jobQueue: JobQueue) { + constructor (ethClient: EthClient, indexer: IndexerInterface, pubsub: PubSub, jobQueue: JobQueue) { assert(ethClient); assert(indexer); this._ethClient = ethClient; - this._indexer = indexer; + this._indexer = indexer as Indexer; this._pubsub = pubsub; this._jobQueue = jobQueue; - this._baseEventWatcher = new BaseEventWatcher(upstreamConfig, this._ethClient, this._indexer, this._pubsub, this._jobQueue); + this._baseEventWatcher = new BaseEventWatcher(this._ethClient, this._indexer, this._pubsub, this._jobQueue); } getEventIterator (): AsyncIterator { diff --git a/packages/eden-watcher/src/fill.ts b/packages/eden-watcher/src/fill.ts index 09b25ff6..fd027258 100644 --- a/packages/eden-watcher/src/fill.ts +++ b/packages/eden-watcher/src/fill.ts @@ -96,7 +96,7 @@ export const main = async (): Promise => { // Later: https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries const pubsub = new PubSub(); - const eventWatcher = new EventWatcher(config.upstream, ethClient, indexer, pubsub, jobQueue); + const eventWatcher = new EventWatcher(ethClient, indexer, pubsub, jobQueue); await fillBlocks(jobQueue, indexer, eventWatcher, jobQueueConfig.blockDelayInMilliSecs, argv); }; diff --git a/packages/eden-watcher/src/indexer.ts b/packages/eden-watcher/src/indexer.ts index 6064ca60..ab981791 100644 --- a/packages/eden-watcher/src/indexer.ts +++ b/packages/eden-watcher/src/indexer.ts @@ -758,7 +758,6 @@ export class Indexer implements IndexerInterface { } _populateRelationsMap (): void { - // Needs to be generated by codegen. this._relationsMap.set(ProducerSet, { producers: { entity: Producer, diff --git a/packages/eden-watcher/src/server.ts b/packages/eden-watcher/src/server.ts index 29c81a9f..6e90bfaf 100644 --- a/packages/eden-watcher/src/server.ts +++ b/packages/eden-watcher/src/server.ts @@ -65,7 +65,7 @@ export const main = async (): Promise => { graphWatcher.setIndexer(indexer); await graphWatcher.init(); - const eventWatcher = new EventWatcher(config.upstream, ethClient, indexer, pubsub, jobQueue); + const eventWatcher = new EventWatcher(ethClient, indexer, pubsub, jobQueue); if (watcherKind === KIND_ACTIVE) { await jobQueue.start(); diff --git a/packages/erc20-watcher/src/events.ts b/packages/erc20-watcher/src/events.ts index 377c344e..f7bc0374 100644 --- a/packages/erc20-watcher/src/events.ts +++ b/packages/erc20-watcher/src/events.ts @@ -11,8 +11,7 @@ import { EventWatcher as BaseEventWatcher, EventWatcherInterface, QUEUE_BLOCK_PROCESSING, - QUEUE_EVENT_PROCESSING, - UpstreamConfig + QUEUE_EVENT_PROCESSING } from '@cerc-io/util'; import { Indexer } from './indexer'; @@ -25,7 +24,7 @@ export class EventWatcher implements EventWatcherInterface { _pubsub: PubSub _jobQueue: JobQueue - constructor (upstreamConfig: UpstreamConfig, ethClient: EthClient, indexer: Indexer, pubsub: PubSub, jobQueue: JobQueue) { + constructor (ethClient: EthClient, indexer: Indexer, pubsub: PubSub, jobQueue: JobQueue) { assert(ethClient); assert(indexer); @@ -33,7 +32,7 @@ export class EventWatcher implements EventWatcherInterface { this._indexer = indexer; this._pubsub = pubsub; this._jobQueue = jobQueue; - this._baseEventWatcher = new BaseEventWatcher(upstreamConfig, this._ethClient, this._indexer, this._pubsub, this._jobQueue); + this._baseEventWatcher = new BaseEventWatcher(this._ethClient, this._indexer, this._pubsub, this._jobQueue); } getEventIterator (): AsyncIterator { diff --git a/packages/erc20-watcher/src/fill.ts b/packages/erc20-watcher/src/fill.ts index a85ef072..305b892c 100644 --- a/packages/erc20-watcher/src/fill.ts +++ b/packages/erc20-watcher/src/fill.ts @@ -76,7 +76,7 @@ export const main = async (): Promise => { const indexer = new Indexer(config.server, db, { ethClient }, ethProvider, jobQueue); - const eventWatcher = new EventWatcher(config.upstream, ethClient, indexer, pubsub, jobQueue); + const eventWatcher = new EventWatcher(ethClient, indexer, pubsub, jobQueue); await fillBlocks(jobQueue, indexer, eventWatcher, jobQueueConfig.blockDelayInMilliSecs, argv); }; diff --git a/packages/erc20-watcher/src/indexer.ts b/packages/erc20-watcher/src/indexer.ts index 58d702c3..53ccda1d 100644 --- a/packages/erc20-watcher/src/indexer.ts +++ b/packages/erc20-watcher/src/indexer.ts @@ -12,7 +12,19 @@ import { BaseProvider } from '@ethersproject/providers'; import { EthClient } from '@cerc-io/ipld-eth-client'; import { MappingKey, StorageLayout } from '@cerc-io/solidity-mapper'; -import { IndexerInterface, Indexer as BaseIndexer, ValueResult, JobQueue, Where, QueryOptions, ServerConfig, StateStatus, DatabaseInterface, Clients } from '@cerc-io/util'; +import { + IndexerInterface, + Indexer as BaseIndexer, + ValueResult, + JobQueue, + Where, + QueryOptions, + ServerConfig, + StateStatus, + DatabaseInterface, + Clients, + StateKind +} from '@cerc-io/util'; import { Database, ENTITIES } from './database'; import { Event } from './entity/Event'; @@ -273,6 +285,14 @@ export class Indexer implements IndexerInterface { return undefined; } + async saveOrUpdateState (state: State): Promise { + return {} as State; + } + + async removeStates (blockNumber: number, kind: StateKind): Promise { + // TODO Implement + } + getStateData (state: State): any { return this._baseIndexer.getStateData(state); } diff --git a/packages/erc20-watcher/src/server.ts b/packages/erc20-watcher/src/server.ts index 26482261..a812707e 100644 --- a/packages/erc20-watcher/src/server.ts +++ b/packages/erc20-watcher/src/server.ts @@ -57,7 +57,7 @@ export const main = async (): Promise => { const indexer = new Indexer(config.server, db, { ethClient }, ethProvider, jobQueue); await indexer.init(); - const eventWatcher = new EventWatcher(config.upstream, ethClient, indexer, pubsub, jobQueue); + const eventWatcher = new EventWatcher(ethClient, indexer, pubsub, jobQueue); if (watcherKind === KIND_ACTIVE) { await jobQueue.start(); diff --git a/packages/erc721-watcher/src/cli/import-state.ts b/packages/erc721-watcher/src/cli/import-state.ts index 6947cd78..871f1b00 100644 --- a/packages/erc721-watcher/src/cli/import-state.ts +++ b/packages/erc721-watcher/src/cli/import-state.ts @@ -2,17 +2,10 @@ // Copyright 2021 Vulcanize, Inc. // -import assert from 'assert'; import 'reflect-metadata'; -import yargs from 'yargs'; -import { hideBin } from 'yargs/helpers'; import debug from 'debug'; -import { PubSub } from 'graphql-subscriptions'; -import fs from 'fs'; -import path from 'path'; -import { getConfig, fillBlocks, JobQueue, DEFAULT_CONFIG_PATH, Config, initClients, StateKind } from '@cerc-io/util'; -import * as codec from '@ipld/dag-cbor'; +import { ImportStateCmd } from '@cerc-io/cli'; import { Database } from '../database'; import { Indexer } from '../indexer'; @@ -22,100 +15,10 @@ import { State } from '../entity/State'; const log = debug('vulcanize:import-state'); export const main = async (): Promise => { - const argv = await yargs(hideBin(process.argv)).parserConfiguration({ - 'parse-numbers': false - }).options({ - configFile: { - alias: 'f', - type: 'string', - demandOption: true, - describe: 'configuration file path (toml)', - default: DEFAULT_CONFIG_PATH - }, - importFile: { - alias: 'i', - type: 'string', - demandOption: true, - describe: 'Import file path (JSON)' - } - }).argv; + const importStateCmd = new ImportStateCmd(); + await importStateCmd.init(Database, Indexer, EventWatcher); - const config: Config = await getConfig(argv.configFile); - const { ethClient, ethProvider } = await initClients(config); - - const db = new Database(config.database); - await db.init(); - - // Note: In-memory pubsub works fine for now, as each watcher is a single process anyway. - // Later: https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries - const pubsub = new PubSub(); - - const jobQueueConfig = config.jobQueue; - assert(jobQueueConfig, 'Missing job queue config'); - - const { dbConnectionString, maxCompletionLagInSecs } = jobQueueConfig; - assert(dbConnectionString, 'Missing job queue db connection string'); - - const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs }); - await jobQueue.start(); - - const indexer = new Indexer(config.server, db, { ethClient }, ethProvider, jobQueue); - await indexer.init(); - - const eventWatcher = new EventWatcher(config.upstream, ethClient, indexer, pubsub, jobQueue); - - // Import data. - const importFilePath = path.resolve(argv.importFile); - const encodedImportData = fs.readFileSync(importFilePath); - const importData = codec.decode(Buffer.from(encodedImportData)) as any; - - // Fill the snapshot block. - await fillBlocks( - jobQueue, - indexer, - eventWatcher, - jobQueueConfig.blockDelayInMilliSecs, - { - prefetch: true, - startBlock: importData.snapshotBlock.blockNumber, - endBlock: importData.snapshotBlock.blockNumber - } - ); - - // Fill the Contracts. - for (const contract of importData.contracts) { - await indexer.watchContract(contract.address, contract.kind, contract.checkpoint, contract.startingBlock); - } - - // Get the snapshot block. - const block = await indexer.getBlockProgress(importData.snapshotBlock.blockHash); - assert(block); - - // Fill the States. - for (const checkpoint of importData.stateCheckpoints) { - let state = new State(); - - state = Object.assign(state, checkpoint); - state.block = block; - - state.data = Buffer.from(codec.encode(state.data)); - - state = await indexer.saveOrUpdateState(state); - } - - // Mark snapshot block as completely processed. - block.isComplete = true; - await indexer.updateBlockProgress(block, block.lastProcessedEventIndex); - await indexer.updateSyncStatusChainHead(block.blockHash, block.blockNumber); - await indexer.updateSyncStatusIndexedBlock(block.blockHash, block.blockNumber); - await indexer.updateStateSyncStatusIndexedBlock(block.blockNumber); - await indexer.updateStateSyncStatusCheckpointBlock(block.blockNumber); - - // The 'diff_staged' and 'init' State entries are unnecessary as checkpoints have been already created for the snapshot block. - await indexer.removeStates(block.blockNumber, StateKind.Init); - await indexer.removeStates(block.blockNumber, StateKind.DiffStaged); - - log(`Import completed for snapshot block at height ${block.blockNumber}`); + await importStateCmd.exec(State); }; main().catch(err => { diff --git a/packages/erc721-watcher/src/events.ts b/packages/erc721-watcher/src/events.ts index 377c344e..4cb6e10d 100644 --- a/packages/erc721-watcher/src/events.ts +++ b/packages/erc721-watcher/src/events.ts @@ -12,7 +12,7 @@ import { EventWatcherInterface, QUEUE_BLOCK_PROCESSING, QUEUE_EVENT_PROCESSING, - UpstreamConfig + IndexerInterface } from '@cerc-io/util'; import { Indexer } from './indexer'; @@ -25,15 +25,15 @@ export class EventWatcher implements EventWatcherInterface { _pubsub: PubSub _jobQueue: JobQueue - constructor (upstreamConfig: UpstreamConfig, ethClient: EthClient, indexer: Indexer, pubsub: PubSub, jobQueue: JobQueue) { + constructor (ethClient: EthClient, indexer: IndexerInterface, pubsub: PubSub, jobQueue: JobQueue) { assert(ethClient); assert(indexer); this._ethClient = ethClient; - this._indexer = indexer; + this._indexer = indexer as Indexer; this._pubsub = pubsub; this._jobQueue = jobQueue; - this._baseEventWatcher = new BaseEventWatcher(upstreamConfig, this._ethClient, this._indexer, this._pubsub, this._jobQueue); + this._baseEventWatcher = new BaseEventWatcher(this._ethClient, this._indexer, this._pubsub, this._jobQueue); } getEventIterator (): AsyncIterator { diff --git a/packages/erc721-watcher/src/fill.ts b/packages/erc721-watcher/src/fill.ts index 66e33a47..aaf25f36 100644 --- a/packages/erc721-watcher/src/fill.ts +++ b/packages/erc721-watcher/src/fill.ts @@ -74,7 +74,7 @@ export const main = async (): Promise => { // Later: https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries const pubsub = new PubSub(); - const eventWatcher = new EventWatcher(config.upstream, ethClient, indexer, pubsub, jobQueue); + const eventWatcher = new EventWatcher(ethClient, indexer, pubsub, jobQueue); await fillBlocks(jobQueue, indexer, eventWatcher, jobQueueConfig.blockDelayInMilliSecs, argv); }; diff --git a/packages/erc721-watcher/src/server.ts b/packages/erc721-watcher/src/server.ts index 6e40f88e..82195f3e 100644 --- a/packages/erc721-watcher/src/server.ts +++ b/packages/erc721-watcher/src/server.ts @@ -56,7 +56,7 @@ export const main = async (): Promise => { const indexer = new Indexer(config.server, db, { ethClient }, ethProvider, jobQueue); await indexer.init(); - const eventWatcher = new EventWatcher(config.upstream, ethClient, indexer, pubsub, jobQueue); + const eventWatcher = new EventWatcher(ethClient, indexer, pubsub, jobQueue); if (watcherKind === KIND_ACTIVE) { await jobQueue.start(); diff --git a/packages/graph-node/src/watcher.ts b/packages/graph-node/src/watcher.ts index 8ae593e1..66c5cd94 100644 --- a/packages/graph-node/src/watcher.ts +++ b/packages/graph-node/src/watcher.ts @@ -348,6 +348,7 @@ export class GraphWatcher { } } + // TODO Remove after updating codegen CLIs async updateEntitiesFromState (state: StateInterface) { assert(this._indexer); await updateEntitiesFromState(this._database, this._indexer, state); diff --git a/packages/graph-node/test/utils/indexer.ts b/packages/graph-node/test/utils/indexer.ts index 1c19cd52..42ee273b 100644 --- a/packages/graph-node/test/utils/indexer.ts +++ b/packages/graph-node/test/utils/indexer.ts @@ -13,7 +13,8 @@ import { StateSyncStatusInterface, StateInterface, getResultEvent, - ResultEvent + ResultEvent, + StateKind } from '@cerc-io/util'; import { EthClient } from '@cerc-io/ipld-eth-client'; import { GetStorageAt, getStorageValue, MappingKey, StorageLayout } from '@cerc-io/solidity-mapper'; @@ -188,6 +189,10 @@ export class Indexer implements IndexerInterface { return undefined; } + async watchContract (address: string, kind: string, checkpoint: boolean, startingBlock: number): Promise { + return undefined; + } + async processBlock (blockProgress: BlockProgressInterface): Promise { return undefined; } @@ -208,6 +213,14 @@ export class Indexer implements IndexerInterface { return undefined; } + async saveOrUpdateState (state: StateInterface): Promise { + return {} as StateInterface; + } + + async removeStates (blockNumber: number, kind: StateKind): Promise { + // TODO Implement + } + getStateData (state: StateInterface): any { return undefined; } diff --git a/packages/graph-test-watcher/src/cli/import-state.ts b/packages/graph-test-watcher/src/cli/import-state.ts index 1182abb2..871f1b00 100644 --- a/packages/graph-test-watcher/src/cli/import-state.ts +++ b/packages/graph-test-watcher/src/cli/import-state.ts @@ -2,20 +2,12 @@ // Copyright 2021 Vulcanize, Inc. // -import assert from 'assert'; import 'reflect-metadata'; -import yargs from 'yargs'; -import { hideBin } from 'yargs/helpers'; import debug from 'debug'; -import { PubSub } from 'graphql-subscriptions'; -import fs from 'fs'; -import path from 'path'; -import { getConfig, fillBlocks, JobQueue, DEFAULT_CONFIG_PATH, Config, initClients, StateKind } from '@cerc-io/util'; -import { GraphWatcher, Database as GraphDatabase } from '@cerc-io/graph-node'; -import * as codec from '@ipld/dag-cbor'; +import { ImportStateCmd } from '@cerc-io/cli'; -import { Database, ENTITY_TO_LATEST_ENTITY_MAP } from '../database'; +import { Database } from '../database'; import { Indexer } from '../indexer'; import { EventWatcher } from '../events'; import { State } from '../entity/State'; @@ -23,109 +15,10 @@ import { State } from '../entity/State'; const log = debug('vulcanize:import-state'); export const main = async (): Promise => { - const argv = await yargs(hideBin(process.argv)).parserConfiguration({ - 'parse-numbers': false - }).options({ - configFile: { - alias: 'f', - type: 'string', - demandOption: true, - describe: 'configuration file path (toml)', - default: DEFAULT_CONFIG_PATH - }, - importFile: { - alias: 'i', - type: 'string', - demandOption: true, - describe: 'Import file path (JSON)' - } - }).argv; + const importStateCmd = new ImportStateCmd(); + await importStateCmd.init(Database, Indexer, EventWatcher); - const config: Config = await getConfig(argv.configFile); - const { ethClient, ethProvider } = await initClients(config); - - const db = new Database(config.database); - await db.init(); - - const graphDb = new GraphDatabase(config.server, db.baseDatabase, ENTITY_TO_LATEST_ENTITY_MAP); - await graphDb.init(); - - const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server); - - // Note: In-memory pubsub works fine for now, as each watcher is a single process anyway. - // Later: https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries - const pubsub = new PubSub(); - - const jobQueueConfig = config.jobQueue; - assert(jobQueueConfig, 'Missing job queue config'); - - const { dbConnectionString, maxCompletionLagInSecs } = jobQueueConfig; - assert(dbConnectionString, 'Missing job queue db connection string'); - - const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs }); - await jobQueue.start(); - - const indexer = new Indexer(config.server, db, { ethClient }, ethProvider, jobQueue, graphWatcher); - await indexer.init(); - - graphWatcher.setIndexer(indexer); - await graphWatcher.init(); - - const eventWatcher = new EventWatcher(config.upstream, ethClient, indexer, pubsub, jobQueue); - - // Import data. - const importFilePath = path.resolve(argv.importFile); - const encodedImportData = fs.readFileSync(importFilePath); - const importData = codec.decode(Buffer.from(encodedImportData)) as any; - - // Fill the snapshot block. - await fillBlocks( - jobQueue, - indexer, - eventWatcher, - jobQueueConfig.blockDelayInMilliSecs, - { - prefetch: true, - startBlock: importData.snapshotBlock.blockNumber, - endBlock: importData.snapshotBlock.blockNumber - } - ); - - // Fill the Contracts. - for (const contract of importData.contracts) { - await indexer.watchContract(contract.address, contract.kind, contract.checkpoint, contract.startingBlock); - } - - // Get the snapshot block. - const block = await indexer.getBlockProgress(importData.snapshotBlock.blockHash); - assert(block); - - // Fill the States. - for (const checkpoint of importData.stateCheckpoints) { - let state = new State(); - - state = Object.assign(state, checkpoint); - state.block = block; - - state.data = Buffer.from(codec.encode(state.data)); - - state = await indexer.saveOrUpdateState(state); - await graphWatcher.updateEntitiesFromState(state); - } - - // Mark snapshot block as completely processed. - block.isComplete = true; - await indexer.updateBlockProgress(block, block.lastProcessedEventIndex); - await indexer.updateSyncStatusChainHead(block.blockHash, block.blockNumber); - await indexer.updateSyncStatusIndexedBlock(block.blockHash, block.blockNumber); - await indexer.updateStateSyncStatusIndexedBlock(block.blockNumber); - await indexer.updateStateSyncStatusCheckpointBlock(block.blockNumber); - - // The 'diff_staged' and 'init' State entries are unnecessary as checkpoints have been already created for the snapshot block. - await indexer.removeStates(block.blockNumber, StateKind.Init); - await indexer.removeStates(block.blockNumber, StateKind.DiffStaged); - - log(`Import completed for snapshot block at height ${block.blockNumber}`); + await importStateCmd.exec(State); }; main().catch(err => { diff --git a/packages/graph-test-watcher/src/events.ts b/packages/graph-test-watcher/src/events.ts index 377c344e..4cb6e10d 100644 --- a/packages/graph-test-watcher/src/events.ts +++ b/packages/graph-test-watcher/src/events.ts @@ -12,7 +12,7 @@ import { EventWatcherInterface, QUEUE_BLOCK_PROCESSING, QUEUE_EVENT_PROCESSING, - UpstreamConfig + IndexerInterface } from '@cerc-io/util'; import { Indexer } from './indexer'; @@ -25,15 +25,15 @@ export class EventWatcher implements EventWatcherInterface { _pubsub: PubSub _jobQueue: JobQueue - constructor (upstreamConfig: UpstreamConfig, ethClient: EthClient, indexer: Indexer, pubsub: PubSub, jobQueue: JobQueue) { + constructor (ethClient: EthClient, indexer: IndexerInterface, pubsub: PubSub, jobQueue: JobQueue) { assert(ethClient); assert(indexer); this._ethClient = ethClient; - this._indexer = indexer; + this._indexer = indexer as Indexer; this._pubsub = pubsub; this._jobQueue = jobQueue; - this._baseEventWatcher = new BaseEventWatcher(upstreamConfig, this._ethClient, this._indexer, this._pubsub, this._jobQueue); + this._baseEventWatcher = new BaseEventWatcher(this._ethClient, this._indexer, this._pubsub, this._jobQueue); } getEventIterator (): AsyncIterator { diff --git a/packages/graph-test-watcher/src/fill.ts b/packages/graph-test-watcher/src/fill.ts index 09b25ff6..fd027258 100644 --- a/packages/graph-test-watcher/src/fill.ts +++ b/packages/graph-test-watcher/src/fill.ts @@ -96,7 +96,7 @@ export const main = async (): Promise => { // Later: https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries const pubsub = new PubSub(); - const eventWatcher = new EventWatcher(config.upstream, ethClient, indexer, pubsub, jobQueue); + const eventWatcher = new EventWatcher(ethClient, indexer, pubsub, jobQueue); await fillBlocks(jobQueue, indexer, eventWatcher, jobQueueConfig.blockDelayInMilliSecs, argv); }; diff --git a/packages/graph-test-watcher/src/indexer.ts b/packages/graph-test-watcher/src/indexer.ts index dc632a74..fb576727 100644 --- a/packages/graph-test-watcher/src/indexer.ts +++ b/packages/graph-test-watcher/src/indexer.ts @@ -616,7 +616,6 @@ export class Indexer implements IndexerInterface { } _populateRelationsMap (): void { - // Needs to be generated by codegen. this._relationsMap.set(Author, { blogs: { entity: Blog, diff --git a/packages/graph-test-watcher/src/server.ts b/packages/graph-test-watcher/src/server.ts index 29c81a9f..6e90bfaf 100644 --- a/packages/graph-test-watcher/src/server.ts +++ b/packages/graph-test-watcher/src/server.ts @@ -65,7 +65,7 @@ export const main = async (): Promise => { graphWatcher.setIndexer(indexer); await graphWatcher.init(); - const eventWatcher = new EventWatcher(config.upstream, ethClient, indexer, pubsub, jobQueue); + const eventWatcher = new EventWatcher(ethClient, indexer, pubsub, jobQueue); if (watcherKind === KIND_ACTIVE) { await jobQueue.start(); diff --git a/packages/mobymask-watcher/src/cli/import-state.ts b/packages/mobymask-watcher/src/cli/import-state.ts index 6947cd78..871f1b00 100644 --- a/packages/mobymask-watcher/src/cli/import-state.ts +++ b/packages/mobymask-watcher/src/cli/import-state.ts @@ -2,17 +2,10 @@ // Copyright 2021 Vulcanize, Inc. // -import assert from 'assert'; import 'reflect-metadata'; -import yargs from 'yargs'; -import { hideBin } from 'yargs/helpers'; import debug from 'debug'; -import { PubSub } from 'graphql-subscriptions'; -import fs from 'fs'; -import path from 'path'; -import { getConfig, fillBlocks, JobQueue, DEFAULT_CONFIG_PATH, Config, initClients, StateKind } from '@cerc-io/util'; -import * as codec from '@ipld/dag-cbor'; +import { ImportStateCmd } from '@cerc-io/cli'; import { Database } from '../database'; import { Indexer } from '../indexer'; @@ -22,100 +15,10 @@ import { State } from '../entity/State'; const log = debug('vulcanize:import-state'); export const main = async (): Promise => { - const argv = await yargs(hideBin(process.argv)).parserConfiguration({ - 'parse-numbers': false - }).options({ - configFile: { - alias: 'f', - type: 'string', - demandOption: true, - describe: 'configuration file path (toml)', - default: DEFAULT_CONFIG_PATH - }, - importFile: { - alias: 'i', - type: 'string', - demandOption: true, - describe: 'Import file path (JSON)' - } - }).argv; + const importStateCmd = new ImportStateCmd(); + await importStateCmd.init(Database, Indexer, EventWatcher); - const config: Config = await getConfig(argv.configFile); - const { ethClient, ethProvider } = await initClients(config); - - const db = new Database(config.database); - await db.init(); - - // Note: In-memory pubsub works fine for now, as each watcher is a single process anyway. - // Later: https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries - const pubsub = new PubSub(); - - const jobQueueConfig = config.jobQueue; - assert(jobQueueConfig, 'Missing job queue config'); - - const { dbConnectionString, maxCompletionLagInSecs } = jobQueueConfig; - assert(dbConnectionString, 'Missing job queue db connection string'); - - const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs }); - await jobQueue.start(); - - const indexer = new Indexer(config.server, db, { ethClient }, ethProvider, jobQueue); - await indexer.init(); - - const eventWatcher = new EventWatcher(config.upstream, ethClient, indexer, pubsub, jobQueue); - - // Import data. - const importFilePath = path.resolve(argv.importFile); - const encodedImportData = fs.readFileSync(importFilePath); - const importData = codec.decode(Buffer.from(encodedImportData)) as any; - - // Fill the snapshot block. - await fillBlocks( - jobQueue, - indexer, - eventWatcher, - jobQueueConfig.blockDelayInMilliSecs, - { - prefetch: true, - startBlock: importData.snapshotBlock.blockNumber, - endBlock: importData.snapshotBlock.blockNumber - } - ); - - // Fill the Contracts. - for (const contract of importData.contracts) { - await indexer.watchContract(contract.address, contract.kind, contract.checkpoint, contract.startingBlock); - } - - // Get the snapshot block. - const block = await indexer.getBlockProgress(importData.snapshotBlock.blockHash); - assert(block); - - // Fill the States. - for (const checkpoint of importData.stateCheckpoints) { - let state = new State(); - - state = Object.assign(state, checkpoint); - state.block = block; - - state.data = Buffer.from(codec.encode(state.data)); - - state = await indexer.saveOrUpdateState(state); - } - - // Mark snapshot block as completely processed. - block.isComplete = true; - await indexer.updateBlockProgress(block, block.lastProcessedEventIndex); - await indexer.updateSyncStatusChainHead(block.blockHash, block.blockNumber); - await indexer.updateSyncStatusIndexedBlock(block.blockHash, block.blockNumber); - await indexer.updateStateSyncStatusIndexedBlock(block.blockNumber); - await indexer.updateStateSyncStatusCheckpointBlock(block.blockNumber); - - // The 'diff_staged' and 'init' State entries are unnecessary as checkpoints have been already created for the snapshot block. - await indexer.removeStates(block.blockNumber, StateKind.Init); - await indexer.removeStates(block.blockNumber, StateKind.DiffStaged); - - log(`Import completed for snapshot block at height ${block.blockNumber}`); + await importStateCmd.exec(State); }; main().catch(err => { diff --git a/packages/mobymask-watcher/src/events.ts b/packages/mobymask-watcher/src/events.ts index 377c344e..4cb6e10d 100644 --- a/packages/mobymask-watcher/src/events.ts +++ b/packages/mobymask-watcher/src/events.ts @@ -12,7 +12,7 @@ import { EventWatcherInterface, QUEUE_BLOCK_PROCESSING, QUEUE_EVENT_PROCESSING, - UpstreamConfig + IndexerInterface } from '@cerc-io/util'; import { Indexer } from './indexer'; @@ -25,15 +25,15 @@ export class EventWatcher implements EventWatcherInterface { _pubsub: PubSub _jobQueue: JobQueue - constructor (upstreamConfig: UpstreamConfig, ethClient: EthClient, indexer: Indexer, pubsub: PubSub, jobQueue: JobQueue) { + constructor (ethClient: EthClient, indexer: IndexerInterface, pubsub: PubSub, jobQueue: JobQueue) { assert(ethClient); assert(indexer); this._ethClient = ethClient; - this._indexer = indexer; + this._indexer = indexer as Indexer; this._pubsub = pubsub; this._jobQueue = jobQueue; - this._baseEventWatcher = new BaseEventWatcher(upstreamConfig, this._ethClient, this._indexer, this._pubsub, this._jobQueue); + this._baseEventWatcher = new BaseEventWatcher(this._ethClient, this._indexer, this._pubsub, this._jobQueue); } getEventIterator (): AsyncIterator { diff --git a/packages/mobymask-watcher/src/fill.ts b/packages/mobymask-watcher/src/fill.ts index 66e33a47..aaf25f36 100644 --- a/packages/mobymask-watcher/src/fill.ts +++ b/packages/mobymask-watcher/src/fill.ts @@ -74,7 +74,7 @@ export const main = async (): Promise => { // Later: https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries const pubsub = new PubSub(); - const eventWatcher = new EventWatcher(config.upstream, ethClient, indexer, pubsub, jobQueue); + const eventWatcher = new EventWatcher(ethClient, indexer, pubsub, jobQueue); await fillBlocks(jobQueue, indexer, eventWatcher, jobQueueConfig.blockDelayInMilliSecs, argv); }; diff --git a/packages/mobymask-watcher/src/server.ts b/packages/mobymask-watcher/src/server.ts index 24bb6721..81da07df 100644 --- a/packages/mobymask-watcher/src/server.ts +++ b/packages/mobymask-watcher/src/server.ts @@ -56,7 +56,7 @@ export const main = async (): Promise => { const indexer = new Indexer(config.server, db, { ethClient }, ethProvider, jobQueue); await indexer.init(); - const eventWatcher = new EventWatcher(config.upstream, ethClient, indexer, pubsub, jobQueue); + const eventWatcher = new EventWatcher(ethClient, indexer, pubsub, jobQueue); if (watcherKind === KIND_ACTIVE) { await jobQueue.start(); diff --git a/packages/util/src/events.ts b/packages/util/src/events.ts index ff3b070e..f6761c72 100644 --- a/packages/util/src/events.ts +++ b/packages/util/src/events.ts @@ -12,7 +12,6 @@ import { JobQueue } from './job-queue'; import { BlockProgressInterface, EventInterface, IndexerInterface } from './types'; import { MAX_REORG_DEPTH, JOB_KIND_PRUNE, JOB_KIND_INDEX, UNKNOWN_EVENT_NAME, JOB_KIND_EVENTS, QUEUE_BLOCK_PROCESSING, QUEUE_EVENT_PROCESSING } from './constants'; import { createPruningJob, processBlockByNumber } from './common'; -import { UpstreamConfig } from './config'; import { OrderDirection } from './database'; const EVENT = 'event'; @@ -27,10 +26,8 @@ export class EventWatcher { _subscription?: ZenObservable.Subscription _pubsub: PubSub _jobQueue: JobQueue - _upstreamConfig: UpstreamConfig - constructor (upstreamConfig: UpstreamConfig, ethClient: EthClient, indexer: IndexerInterface, pubsub: PubSub, jobQueue: JobQueue) { - this._upstreamConfig = upstreamConfig; + constructor (ethClient: EthClient, indexer: IndexerInterface, pubsub: PubSub, jobQueue: JobQueue) { this._ethClient = ethClient; this._indexer = indexer; this._pubsub = pubsub; diff --git a/packages/util/src/types.ts b/packages/util/src/types.ts index d9fc4552..59f8e337 100644 --- a/packages/util/src/types.ts +++ b/packages/util/src/types.ts @@ -109,7 +109,7 @@ export interface IndexerInterface { isWatchedContract: (address: string) => ContractInterface | undefined; getContractsByKind?: (kind: string) => ContractInterface[] cacheContract?: (contract: ContractInterface) => void; - watchContract?: (address: string, kind: string, checkpoint: boolean, startingBlock: number) => Promise + watchContract: (address: string, kind: string, checkpoint: boolean, startingBlock: number) => Promise getEntityTypesMap?: () => Map getRelationsMap?: () => Map createDiffStaged?: (contractAddress: string, blockHash: string, data: any) => Promise @@ -125,6 +125,8 @@ export interface IndexerInterface { updateStateStatusMap (address: string, stateStatus: StateStatus): void getStateData (state: StateInterface): any getStateByCID (cid: string): Promise + saveOrUpdateState (state: StateInterface): Promise + removeStates (blockNumber: number, kind: StateKind): Promise resetWatcherToBlock (blockNumber: number): Promise getResultEvent (event: EventInterface): any } @@ -138,6 +140,7 @@ export interface EventWatcherInterface { export interface DatabaseInterface { _conn: Connection; readonly baseDatabase: Database + readonly graphDatabase?: any init (): Promise; close (): Promise; createTransactionRunner (): Promise;