diff --git a/packages/cli/src/base.ts b/packages/cli/src/base.ts index 72cfd64f..2e18bb57 100644 --- a/packages/cli/src/base.ts +++ b/packages/cli/src/base.ts @@ -78,7 +78,9 @@ export class BaseCmd { jobQueue: JobQueue, graphWatcher?: GraphWatcher ) => IndexerInterface, - clients: { [key: string]: any } = {} + clients: { [key: string]: any } = {}, + entityQueryTypeMap?: Map, + entityToLatestEntityMap?: Map ): Promise { assert(this._config); @@ -100,7 +102,7 @@ export class BaseCmd { // Check if subgraph watcher. if (this._config.server.subgraphPath) { - const graphWatcher = await this._getGraphWatcher(this._database.baseDatabase); + const graphWatcher = await this._getGraphWatcher(this._database.baseDatabase, entityQueryTypeMap, entityToLatestEntityMap); this._indexer = new Indexer(this._config.server, this._database, this._clients, ethProvider, this._jobQueue, graphWatcher); await this._indexer.init(); @@ -130,12 +132,16 @@ export class BaseCmd { this._eventWatcher = new EventWatcher(this._clients.ethClient, this._indexer, pubsub, this._jobQueue); } - async _getGraphWatcher (baseDatabase: BaseDatabase): Promise { + async _getGraphWatcher ( + baseDatabase: BaseDatabase, + entityQueryTypeMap?: Map, + entityToLatestEntityMap?: Map + ): Promise { assert(this._config); assert(this._clients?.ethClient); assert(this._ethProvider); - this._graphDb = new GraphDatabase(this._config.server, baseDatabase); + this._graphDb = new GraphDatabase(this._config.server, baseDatabase, entityQueryTypeMap, entityToLatestEntityMap); await this._graphDb.init(); return new GraphWatcher(this._graphDb, this._clients.ethClient, this._ethProvider, this._config.server); diff --git a/packages/cli/src/index.ts b/packages/cli/src/index.ts index 47c4c425..0ac33f18 100644 --- a/packages/cli/src/index.ts +++ b/packages/cli/src/index.ts @@ -10,3 +10,4 @@ export * from './checkpoint/verify'; export * from './inspect-cid'; export * from './import-state'; export * from './export-state'; +export * from './server'; diff --git a/packages/cli/src/server.ts b/packages/cli/src/server.ts new file mode 100644 index 00000000..337641a2 --- /dev/null +++ b/packages/cli/src/server.ts @@ -0,0 +1,128 @@ +// +// Copyright 2022 Vulcanize, Inc. +// + +import yargs from 'yargs'; +import { hideBin } from 'yargs/helpers'; +import 'reflect-metadata'; +import fs from 'fs'; +import path from 'path'; +import assert from 'assert'; +import { ConnectionOptions } from 'typeorm'; +import { PubSub } from 'graphql-subscriptions'; +import express, { Application } from 'express'; +import { ApolloServer } from 'apollo-server-express'; + +import { JsonRpcProvider } from '@ethersproject/providers'; +import { GraphWatcher } from '@cerc-io/graph-node'; +import { EthClient } from '@cerc-io/ipld-eth-client'; +import { + DEFAULT_CONFIG_PATH, + JobQueue, + DatabaseInterface, + IndexerInterface, + ServerConfig, + Clients, + EventWatcherInterface, + KIND_ACTIVE, + createAndStartServer, + startGQLMetricsServer +} from '@cerc-io/util'; +import { TypeSource } from '@graphql-tools/utils'; + +import { BaseCmd } from './base'; + +interface Arguments { + configFile: string; + importFile: string; +} + +export class ServerCmd { + _argv?: Arguments + _baseCmd: BaseCmd; + + constructor () { + this._baseCmd = new BaseCmd(); + } + + async initConfig (): Promise { + this._argv = this._getArgv(); + assert(this._argv); + + return this._baseCmd.initConfig(this._argv.configFile); + } + + async init ( + Database: new (config: ConnectionOptions, + serverConfig?: ServerConfig + ) => DatabaseInterface, + Indexer: new ( + serverConfig: ServerConfig, + db: DatabaseInterface, + clients: Clients, + ethProvider: JsonRpcProvider, + jobQueue: JobQueue, + graphWatcher?: GraphWatcher + ) => IndexerInterface, + EventWatcher: new( + ethClient: EthClient, + indexer: IndexerInterface, + pubsub: PubSub, + jobQueue: JobQueue + ) => EventWatcherInterface, + clients: { [key: string]: any } = {}, + entityQueryTypeMap?: Map, + entityToLatestEntityMap?: Map + ): Promise { + await this.initConfig(); + + await this._baseCmd.init(Database, Indexer, clients, entityQueryTypeMap, entityToLatestEntityMap); + await this._baseCmd.initEventWatcher(EventWatcher); + } + + async exec ( + createResolvers: (indexer: IndexerInterface, eventWatcher: EventWatcherInterface) => Promise, + typeDefs: TypeSource + ): Promise<{ + app: Application, + server: ApolloServer + }> { + const config = this._baseCmd.config; + const jobQueue = this._baseCmd.jobQueue; + const indexer = this._baseCmd.indexer; + const eventWatcher = this._baseCmd.eventWatcher; + + assert(config); + assert(jobQueue); + assert(indexer); + assert(eventWatcher); + + if (config.server.kind === KIND_ACTIVE) { + // Delete jobs to prevent creating jobs after completion of processing previous block. + await jobQueue.deleteAllJobs(); + await eventWatcher.start(); + } + + const resolvers = await createResolvers(indexer, eventWatcher); + + // Create an Express app + const app: Application = express(); + const server = await createAndStartServer(app, typeDefs, resolvers, config.server); + + await startGQLMetricsServer(config); + + return { app, server }; + } + + _getArgv (): any { + return yargs(hideBin(process.argv)) + .option('f', { + alias: 'config-file', + demandOption: true, + describe: 'configuration file path (toml)', + type: 'string', + default: DEFAULT_CONFIG_PATH + }) + .argv; + } +} diff --git a/packages/eden-watcher/src/database.ts b/packages/eden-watcher/src/database.ts index e1cc786d..a7ad3965 100644 --- a/packages/eden-watcher/src/database.ts +++ b/packages/eden-watcher/src/database.ts @@ -34,7 +34,8 @@ import { Staker } from './entity/Staker'; export const SUBGRAPH_ENTITIES = new Set([Account, Claim, Distribution, Distributor, Epoch, Network, Producer, ProducerEpoch, ProducerRewardCollectorChange, ProducerSet, ProducerSetChange, RewardSchedule, RewardScheduleEntry, Slash, Slot, SlotClaim, Staker]); export const ENTITIES = [...SUBGRAPH_ENTITIES]; -export const ENTITY_TO_LATEST_ENTITY_MAP: Map = new Map(); +export const ENTITY_TO_LATEST_ENTITY_MAP = new Map(); +export const ENTITY_QUERY_TYPE_MAP = new Map(); export class Database implements DatabaseInterface { _config: ConnectionOptions; diff --git a/packages/eden-watcher/src/resolvers.ts b/packages/eden-watcher/src/resolvers.ts index a81a759d..4841c021 100644 --- a/packages/eden-watcher/src/resolvers.ts +++ b/packages/eden-watcher/src/resolvers.ts @@ -8,7 +8,7 @@ import debug from 'debug'; import Decimal from 'decimal.js'; import { GraphQLResolveInfo, GraphQLScalarType } from 'graphql'; -import { BlockHeight, OrderDirection, gqlTotalQueryCount, gqlQueryCount, jsonBigIntStringReplacer, getResultState, setGQLCacheHints } from '@cerc-io/util'; +import { BlockHeight, OrderDirection, gqlTotalQueryCount, gqlQueryCount, jsonBigIntStringReplacer, getResultState, setGQLCacheHints, IndexerInterface, EventWatcherInterface } from '@cerc-io/util'; import { Indexer } from './indexer'; import { EventWatcher } from './events'; @@ -34,8 +34,9 @@ import { Account } from './entity/Account'; const log = debug('vulcanize:resolver'); -export const createResolvers = async (indexer: Indexer, eventWatcher: EventWatcher): Promise => { - assert(indexer); +export const createResolvers = async (indexerArg: IndexerInterface, eventWatcherArg: EventWatcherInterface): Promise => { + const indexer = indexerArg as Indexer; + const eventWatcher = eventWatcherArg as EventWatcher; const gqlCacheConfig = indexer.serverConfig.gqlCache; diff --git a/packages/eden-watcher/src/server.ts b/packages/eden-watcher/src/server.ts index 6e90bfaf..e2f8cca2 100644 --- a/packages/eden-watcher/src/server.ts +++ b/packages/eden-watcher/src/server.ts @@ -4,86 +4,26 @@ import fs from 'fs'; import path from 'path'; -import assert from 'assert'; import 'reflect-metadata'; -import express, { Application } from 'express'; -import { PubSub } from 'graphql-subscriptions'; -import yargs from 'yargs'; -import { hideBin } from 'yargs/helpers'; import debug from 'debug'; import 'graphql-import-node'; -import { DEFAULT_CONFIG_PATH, getConfig, Config, JobQueue, KIND_ACTIVE, initClients, startGQLMetricsServer, createAndStartServer } from '@cerc-io/util'; -import { GraphWatcher, Database as GraphDatabase } from '@cerc-io/graph-node'; +import { ServerCmd } from '@cerc-io/cli'; import { createResolvers } from './resolvers'; import { Indexer } from './indexer'; -import { Database, ENTITY_TO_LATEST_ENTITY_MAP } from './database'; +import { Database, ENTITY_QUERY_TYPE_MAP, ENTITY_TO_LATEST_ENTITY_MAP } from './database'; import { EventWatcher } from './events'; const log = debug('vulcanize:server'); export const main = async (): Promise => { - const argv = await yargs(hideBin(process.argv)) - .option('f', { - alias: 'config-file', - demandOption: true, - describe: 'configuration file path (toml)', - type: 'string', - default: DEFAULT_CONFIG_PATH - }) - .argv; + const serverCmd = new ServerCmd(); + await serverCmd.init(Database, Indexer, EventWatcher, ENTITY_QUERY_TYPE_MAP, ENTITY_TO_LATEST_ENTITY_MAP); - const config: Config = await getConfig(argv.f); - const { ethClient, ethProvider } = await initClients(config); - - const { kind: watcherKind } = config.server; - - const db = new Database(config.database); - await db.init(); - - const graphDb = new GraphDatabase(config.server, db.baseDatabase, ENTITY_TO_LATEST_ENTITY_MAP); - await graphDb.init(); - - const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server); - - // Note: In-memory pubsub works fine for now, as each watcher is a single process anyway. - // Later: https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries - const pubsub = new PubSub(); - - const jobQueueConfig = config.jobQueue; - assert(jobQueueConfig, 'Missing job queue config'); - - const { dbConnectionString, maxCompletionLagInSecs } = jobQueueConfig; - assert(dbConnectionString, 'Missing job queue db connection string'); - - const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs }); - - const indexer = new Indexer(config.server, db, { ethClient }, ethProvider, jobQueue, graphWatcher); - await indexer.init(); - - graphWatcher.setIndexer(indexer); - await graphWatcher.init(); - - const eventWatcher = new EventWatcher(ethClient, indexer, pubsub, jobQueue); - - if (watcherKind === KIND_ACTIVE) { - await jobQueue.start(); - // Delete jobs to prevent creating jobs after completion of processing previous block. - await jobQueue.deleteAllJobs(); - await eventWatcher.start(); - } - - const resolvers = await createResolvers(indexer, eventWatcher); const typeDefs = fs.readFileSync(path.join(__dirname, 'schema.gql')).toString(); - // Create an Express app - const app: Application = express(); - const server = createAndStartServer(app, typeDefs, resolvers, config.server); - - startGQLMetricsServer(config); - - return { app, server }; + return serverCmd.exec(createResolvers, typeDefs); }; main().then(() => { diff --git a/packages/erc20-watcher/src/events.ts b/packages/erc20-watcher/src/events.ts index f7bc0374..4cb6e10d 100644 --- a/packages/erc20-watcher/src/events.ts +++ b/packages/erc20-watcher/src/events.ts @@ -11,7 +11,8 @@ import { EventWatcher as BaseEventWatcher, EventWatcherInterface, QUEUE_BLOCK_PROCESSING, - QUEUE_EVENT_PROCESSING + QUEUE_EVENT_PROCESSING, + IndexerInterface } from '@cerc-io/util'; import { Indexer } from './indexer'; @@ -24,12 +25,12 @@ export class EventWatcher implements EventWatcherInterface { _pubsub: PubSub _jobQueue: JobQueue - constructor (ethClient: EthClient, indexer: Indexer, pubsub: PubSub, jobQueue: JobQueue) { + constructor (ethClient: EthClient, indexer: IndexerInterface, pubsub: PubSub, jobQueue: JobQueue) { assert(ethClient); assert(indexer); this._ethClient = ethClient; - this._indexer = indexer; + this._indexer = indexer as Indexer; this._pubsub = pubsub; this._jobQueue = jobQueue; this._baseEventWatcher = new BaseEventWatcher(this._ethClient, this._indexer, this._pubsub, this._jobQueue); diff --git a/packages/erc20-watcher/src/resolvers.ts b/packages/erc20-watcher/src/resolvers.ts index d015df46..6a9839d6 100644 --- a/packages/erc20-watcher/src/resolvers.ts +++ b/packages/erc20-watcher/src/resolvers.ts @@ -6,7 +6,7 @@ import assert from 'assert'; import BigInt from 'apollo-type-bigint'; import debug from 'debug'; -import { ValueResult } from '@cerc-io/util'; +import { EventWatcherInterface, IndexerInterface, ValueResult } from '@cerc-io/util'; import { Indexer } from './indexer'; import { EventWatcher } from './events'; @@ -14,8 +14,9 @@ import { CONTRACT_KIND } from './utils/index'; const log = debug('vulcanize:resolver'); -export const createResolvers = async (indexer: Indexer, eventWatcher: EventWatcher): Promise => { - assert(indexer); +export const createResolvers = async (indexerArg: IndexerInterface, eventWatcherArg: EventWatcherInterface): Promise => { + const indexer = indexerArg as Indexer; + const eventWatcher = eventWatcherArg as EventWatcher; return { BigInt: new BigInt('bigInt'), diff --git a/packages/erc20-watcher/src/server.ts b/packages/erc20-watcher/src/server.ts index 4bb71909..ee0cb27e 100644 --- a/packages/erc20-watcher/src/server.ts +++ b/packages/erc20-watcher/src/server.ts @@ -2,19 +2,12 @@ // Copyright 2021 Vulcanize, Inc. // -import assert from 'assert'; -import 'reflect-metadata'; -import express, { Application } from 'express'; -import { PubSub } from 'graphql-subscriptions'; -import yargs from 'yargs'; -import { hideBin } from 'yargs/helpers'; import debug from 'debug'; import 'graphql-import-node'; -import { DEFAULT_CONFIG_PATH, getConfig, Config, JobQueue, KIND_ACTIVE, initClients, startGQLMetricsServer, createAndStartServer } from '@cerc-io/util'; +import { ServerCmd } from '@cerc-io/cli'; import typeDefs from './schema'; - import { createResolvers as createMockResolvers } from './mock/resolvers'; import { createResolvers } from './resolvers'; import { Indexer } from './indexer'; @@ -24,57 +17,10 @@ import { EventWatcher } from './events'; const log = debug('vulcanize:server'); export const main = async (): Promise => { - const argv = await yargs(hideBin(process.argv)) - .option('f', { - alias: 'config-file', - demandOption: true, - describe: 'configuration file path (toml)', - type: 'string', - default: DEFAULT_CONFIG_PATH - }) - .argv; + const serverCmd = new ServerCmd(); + await serverCmd.init(Database, Indexer, EventWatcher); - const config: Config = await getConfig(argv.f); - const { ethClient, ethProvider } = await initClients(config); - - const { kind: watcherKind } = config.server; - - const db = new Database(config.database); - await db.init(); - - // Note: In-memory pubsub works fine for now, as each watcher is a single process anyway. - // Later: https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries - const pubsub = new PubSub(); - - const jobQueueConfig = config.jobQueue; - assert(jobQueueConfig, 'Missing job queue config'); - - const { dbConnectionString, maxCompletionLagInSecs } = jobQueueConfig; - assert(dbConnectionString, 'Missing job queue db connection string'); - - const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs }); - - const indexer = new Indexer(config.server, db, { ethClient }, ethProvider, jobQueue); - await indexer.init(); - - const eventWatcher = new EventWatcher(ethClient, indexer, pubsub, jobQueue); - - if (watcherKind === KIND_ACTIVE) { - await jobQueue.start(); - // Delete jobs to prevent creating jobs after completion of processing previous block. - await jobQueue.deleteAllJobs(); - await eventWatcher.start(); - } - - const resolvers = process.env.MOCK ? await createMockResolvers() : await createResolvers(indexer, eventWatcher); - - // Create an Express app - const app: Application = express(); - const server = createAndStartServer(app, typeDefs, resolvers, config.server); - - startGQLMetricsServer(config); - - return { app, server }; + return process.env.MOCK ? serverCmd.exec(createMockResolvers, typeDefs) : serverCmd.exec(createResolvers, typeDefs); }; main().then(() => { diff --git a/packages/erc721-watcher/src/resolvers.ts b/packages/erc721-watcher/src/resolvers.ts index 1dca46ac..c576cb21 100644 --- a/packages/erc721-watcher/src/resolvers.ts +++ b/packages/erc721-watcher/src/resolvers.ts @@ -8,15 +8,16 @@ import debug from 'debug'; import Decimal from 'decimal.js'; import { GraphQLScalarType } from 'graphql'; -import { ValueResult, BlockHeight, getResultState } from '@cerc-io/util'; +import { ValueResult, BlockHeight, getResultState, IndexerInterface, EventWatcherInterface } from '@cerc-io/util'; import { Indexer } from './indexer'; import { EventWatcher } from './events'; const log = debug('vulcanize:resolver'); -export const createResolvers = async (indexer: Indexer, eventWatcher: EventWatcher): Promise => { - assert(indexer); +export const createResolvers = async (indexerArg: IndexerInterface, eventWatcherArg: EventWatcherInterface): Promise => { + const indexer = indexerArg as Indexer; + const eventWatcher = eventWatcherArg as EventWatcher; return { BigInt: new BigInt('bigInt'), diff --git a/packages/erc721-watcher/src/server.ts b/packages/erc721-watcher/src/server.ts index 81da07df..5c769659 100644 --- a/packages/erc721-watcher/src/server.ts +++ b/packages/erc721-watcher/src/server.ts @@ -4,16 +4,11 @@ import fs from 'fs'; import path from 'path'; -import assert from 'assert'; import 'reflect-metadata'; -import express, { Application } from 'express'; -import { PubSub } from 'graphql-subscriptions'; -import yargs from 'yargs'; -import { hideBin } from 'yargs/helpers'; import debug from 'debug'; import 'graphql-import-node'; -import { DEFAULT_CONFIG_PATH, getConfig, Config, JobQueue, KIND_ACTIVE, initClients, startGQLMetricsServer, createAndStartServer } from '@cerc-io/util'; +import { ServerCmd } from '@cerc-io/cli'; import { createResolvers } from './resolvers'; import { Indexer } from './indexer'; @@ -23,58 +18,12 @@ import { EventWatcher } from './events'; const log = debug('vulcanize:server'); export const main = async (): Promise => { - const argv = await yargs(hideBin(process.argv)) - .option('f', { - alias: 'config-file', - demandOption: true, - describe: 'configuration file path (toml)', - type: 'string', - default: DEFAULT_CONFIG_PATH - }) - .argv; + const serverCmd = new ServerCmd(); + await serverCmd.init(Database, Indexer, EventWatcher); - const config: Config = await getConfig(argv.f); - const { ethClient, ethProvider } = await initClients(config); - - const { kind: watcherKind } = config.server; - - const db = new Database(config.database); - await db.init(); - - // Note: In-memory pubsub works fine for now, as each watcher is a single process anyway. - // Later: https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries - const pubsub = new PubSub(); - - const jobQueueConfig = config.jobQueue; - assert(jobQueueConfig, 'Missing job queue config'); - - const { dbConnectionString, maxCompletionLagInSecs } = jobQueueConfig; - assert(dbConnectionString, 'Missing job queue db connection string'); - - const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs }); - - const indexer = new Indexer(config.server, db, { ethClient }, ethProvider, jobQueue); - await indexer.init(); - - const eventWatcher = new EventWatcher(ethClient, indexer, pubsub, jobQueue); - - if (watcherKind === KIND_ACTIVE) { - await jobQueue.start(); - // Delete jobs to prevent creating jobs after completion of processing previous block. - await jobQueue.deleteAllJobs(); - await eventWatcher.start(); - } - - const resolvers = await createResolvers(indexer, eventWatcher); const typeDefs = fs.readFileSync(path.join(__dirname, 'schema.gql')).toString(); - // Create an Express app - const app: Application = express(); - const server = createAndStartServer(app, typeDefs, resolvers, config.server); - - startGQLMetricsServer(config); - - return { app, server }; + return serverCmd.exec(createResolvers, typeDefs); }; main().then(() => { diff --git a/packages/graph-test-watcher/src/database.ts b/packages/graph-test-watcher/src/database.ts index ec7d49a5..89d8afce 100644 --- a/packages/graph-test-watcher/src/database.ts +++ b/packages/graph-test-watcher/src/database.ts @@ -24,6 +24,7 @@ import { Category } from './entity/Category'; export const SUBGRAPH_ENTITIES = new Set([Author, Blog, Category]); export const ENTITIES = [_Test, GetMethod, ...SUBGRAPH_ENTITIES]; export const ENTITY_TO_LATEST_ENTITY_MAP: Map = new Map(); +export const ENTITY_QUERY_TYPE_MAP = new Map(); export class Database implements DatabaseInterface { _config: ConnectionOptions; diff --git a/packages/graph-test-watcher/src/resolvers.ts b/packages/graph-test-watcher/src/resolvers.ts index 403b7a62..8edbb706 100644 --- a/packages/graph-test-watcher/src/resolvers.ts +++ b/packages/graph-test-watcher/src/resolvers.ts @@ -8,7 +8,17 @@ import debug from 'debug'; import Decimal from 'decimal.js'; import { GraphQLResolveInfo, GraphQLScalarType } from 'graphql'; -import { ValueResult, BlockHeight, gqlTotalQueryCount, gqlQueryCount, jsonBigIntStringReplacer, getResultState, setGQLCacheHints } from '@cerc-io/util'; +import { + ValueResult, + BlockHeight, + gqlTotalQueryCount, + gqlQueryCount, + jsonBigIntStringReplacer, + getResultState, + setGQLCacheHints, + IndexerInterface, + EventWatcherInterface +} from '@cerc-io/util'; import { Indexer } from './indexer'; import { EventWatcher } from './events'; @@ -19,8 +29,9 @@ import { Category } from './entity/Category'; const log = debug('vulcanize:resolver'); -export const createResolvers = async (indexer: Indexer, eventWatcher: EventWatcher): Promise => { - assert(indexer); +export const createResolvers = async (indexerArg: IndexerInterface, eventWatcherArg: EventWatcherInterface): Promise => { + const indexer = indexerArg as Indexer; + const eventWatcher = eventWatcherArg as EventWatcher; const gqlCacheConfig = indexer.serverConfig.gqlCache; diff --git a/packages/graph-test-watcher/src/server.ts b/packages/graph-test-watcher/src/server.ts index 6e90bfaf..e2f8cca2 100644 --- a/packages/graph-test-watcher/src/server.ts +++ b/packages/graph-test-watcher/src/server.ts @@ -4,86 +4,26 @@ import fs from 'fs'; import path from 'path'; -import assert from 'assert'; import 'reflect-metadata'; -import express, { Application } from 'express'; -import { PubSub } from 'graphql-subscriptions'; -import yargs from 'yargs'; -import { hideBin } from 'yargs/helpers'; import debug from 'debug'; import 'graphql-import-node'; -import { DEFAULT_CONFIG_PATH, getConfig, Config, JobQueue, KIND_ACTIVE, initClients, startGQLMetricsServer, createAndStartServer } from '@cerc-io/util'; -import { GraphWatcher, Database as GraphDatabase } from '@cerc-io/graph-node'; +import { ServerCmd } from '@cerc-io/cli'; import { createResolvers } from './resolvers'; import { Indexer } from './indexer'; -import { Database, ENTITY_TO_LATEST_ENTITY_MAP } from './database'; +import { Database, ENTITY_QUERY_TYPE_MAP, ENTITY_TO_LATEST_ENTITY_MAP } from './database'; import { EventWatcher } from './events'; const log = debug('vulcanize:server'); export const main = async (): Promise => { - const argv = await yargs(hideBin(process.argv)) - .option('f', { - alias: 'config-file', - demandOption: true, - describe: 'configuration file path (toml)', - type: 'string', - default: DEFAULT_CONFIG_PATH - }) - .argv; + const serverCmd = new ServerCmd(); + await serverCmd.init(Database, Indexer, EventWatcher, ENTITY_QUERY_TYPE_MAP, ENTITY_TO_LATEST_ENTITY_MAP); - const config: Config = await getConfig(argv.f); - const { ethClient, ethProvider } = await initClients(config); - - const { kind: watcherKind } = config.server; - - const db = new Database(config.database); - await db.init(); - - const graphDb = new GraphDatabase(config.server, db.baseDatabase, ENTITY_TO_LATEST_ENTITY_MAP); - await graphDb.init(); - - const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server); - - // Note: In-memory pubsub works fine for now, as each watcher is a single process anyway. - // Later: https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries - const pubsub = new PubSub(); - - const jobQueueConfig = config.jobQueue; - assert(jobQueueConfig, 'Missing job queue config'); - - const { dbConnectionString, maxCompletionLagInSecs } = jobQueueConfig; - assert(dbConnectionString, 'Missing job queue db connection string'); - - const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs }); - - const indexer = new Indexer(config.server, db, { ethClient }, ethProvider, jobQueue, graphWatcher); - await indexer.init(); - - graphWatcher.setIndexer(indexer); - await graphWatcher.init(); - - const eventWatcher = new EventWatcher(ethClient, indexer, pubsub, jobQueue); - - if (watcherKind === KIND_ACTIVE) { - await jobQueue.start(); - // Delete jobs to prevent creating jobs after completion of processing previous block. - await jobQueue.deleteAllJobs(); - await eventWatcher.start(); - } - - const resolvers = await createResolvers(indexer, eventWatcher); const typeDefs = fs.readFileSync(path.join(__dirname, 'schema.gql')).toString(); - // Create an Express app - const app: Application = express(); - const server = createAndStartServer(app, typeDefs, resolvers, config.server); - - startGQLMetricsServer(config); - - return { app, server }; + return serverCmd.exec(createResolvers, typeDefs); }; main().then(() => { diff --git a/packages/mobymask-watcher/src/resolvers.ts b/packages/mobymask-watcher/src/resolvers.ts index 7992de60..ba264794 100644 --- a/packages/mobymask-watcher/src/resolvers.ts +++ b/packages/mobymask-watcher/src/resolvers.ts @@ -8,15 +8,16 @@ import debug from 'debug'; import Decimal from 'decimal.js'; import { GraphQLScalarType } from 'graphql'; -import { ValueResult, gqlTotalQueryCount, gqlQueryCount, getResultState } from '@cerc-io/util'; +import { ValueResult, gqlTotalQueryCount, gqlQueryCount, getResultState, IndexerInterface, EventWatcherInterface } from '@cerc-io/util'; import { Indexer } from './indexer'; import { EventWatcher } from './events'; const log = debug('vulcanize:resolver'); -export const createResolvers = async (indexer: Indexer, eventWatcher: EventWatcher): Promise => { - assert(indexer); +export const createResolvers = async (indexerArg: IndexerInterface, eventWatcherArg: EventWatcherInterface): Promise => { + const indexer = indexerArg as Indexer; + const eventWatcher = eventWatcherArg as EventWatcher; return { BigInt: new BigInt('bigInt'), diff --git a/packages/mobymask-watcher/src/server.ts b/packages/mobymask-watcher/src/server.ts index 81da07df..5c769659 100644 --- a/packages/mobymask-watcher/src/server.ts +++ b/packages/mobymask-watcher/src/server.ts @@ -4,16 +4,11 @@ import fs from 'fs'; import path from 'path'; -import assert from 'assert'; import 'reflect-metadata'; -import express, { Application } from 'express'; -import { PubSub } from 'graphql-subscriptions'; -import yargs from 'yargs'; -import { hideBin } from 'yargs/helpers'; import debug from 'debug'; import 'graphql-import-node'; -import { DEFAULT_CONFIG_PATH, getConfig, Config, JobQueue, KIND_ACTIVE, initClients, startGQLMetricsServer, createAndStartServer } from '@cerc-io/util'; +import { ServerCmd } from '@cerc-io/cli'; import { createResolvers } from './resolvers'; import { Indexer } from './indexer'; @@ -23,58 +18,12 @@ import { EventWatcher } from './events'; const log = debug('vulcanize:server'); export const main = async (): Promise => { - const argv = await yargs(hideBin(process.argv)) - .option('f', { - alias: 'config-file', - demandOption: true, - describe: 'configuration file path (toml)', - type: 'string', - default: DEFAULT_CONFIG_PATH - }) - .argv; + const serverCmd = new ServerCmd(); + await serverCmd.init(Database, Indexer, EventWatcher); - const config: Config = await getConfig(argv.f); - const { ethClient, ethProvider } = await initClients(config); - - const { kind: watcherKind } = config.server; - - const db = new Database(config.database); - await db.init(); - - // Note: In-memory pubsub works fine for now, as each watcher is a single process anyway. - // Later: https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries - const pubsub = new PubSub(); - - const jobQueueConfig = config.jobQueue; - assert(jobQueueConfig, 'Missing job queue config'); - - const { dbConnectionString, maxCompletionLagInSecs } = jobQueueConfig; - assert(dbConnectionString, 'Missing job queue db connection string'); - - const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs }); - - const indexer = new Indexer(config.server, db, { ethClient }, ethProvider, jobQueue); - await indexer.init(); - - const eventWatcher = new EventWatcher(ethClient, indexer, pubsub, jobQueue); - - if (watcherKind === KIND_ACTIVE) { - await jobQueue.start(); - // Delete jobs to prevent creating jobs after completion of processing previous block. - await jobQueue.deleteAllJobs(); - await eventWatcher.start(); - } - - const resolvers = await createResolvers(indexer, eventWatcher); const typeDefs = fs.readFileSync(path.join(__dirname, 'schema.gql')).toString(); - // Create an Express app - const app: Application = express(); - const server = createAndStartServer(app, typeDefs, resolvers, config.server); - - startGQLMetricsServer(config); - - return { app, server }; + return serverCmd.exec(createResolvers, typeDefs); }; main().then(() => { diff --git a/packages/util/src/types.ts b/packages/util/src/types.ts index 374ce025..d0787960 100644 --- a/packages/util/src/types.ts +++ b/packages/util/src/types.ts @@ -135,6 +135,7 @@ export interface IndexerInterface { } export interface EventWatcherInterface { + start (): Promise getBlockProgressEventIterator (): AsyncIterator initBlockProcessingOnCompleteHandler (): Promise initEventProcessingOnCompleteHandler (): Promise