From bb1345c696da3d241a64eba731caecf6c77b24f3 Mon Sep 17 00:00:00 2001 From: prathamesh0 <42446521+prathamesh0@users.noreply.github.com> Date: Mon, 1 Nov 2021 11:31:54 +0530 Subject: [PATCH] Refactor code to initialize clients (#290) * Refactor CLI code * Refactor code to initialize clients * Don't return config from function to initialize clients --- .../templates/checkpoint-template.handlebars | 36 ++-------- .../export-state-template.handlebars | 35 ++-------- .../src/templates/fill-template.handlebars | 45 +++--------- .../import-state-template.handlebars | 42 +++-------- .../templates/inspect-cid-template.handlebars | 35 ++-------- .../templates/job-runner-template.handlebars | 40 +++-------- .../templates/reset-state-template.handlebars | 8 +-- .../src/templates/server-template.handlebars | 43 +++--------- .../watch-contract-template.handlebars | 36 ++-------- .../erc20-watcher/src/cli/reset-cmds/state.ts | 10 +-- .../erc20-watcher/src/cli/watch-contract.ts | 4 +- packages/erc20-watcher/src/fill.ts | 45 +++--------- packages/erc20-watcher/src/job-runner.ts | 47 +++---------- packages/erc20-watcher/src/server.ts | 38 ++-------- .../src/cli/reset-cmds/state.ts | 10 +-- packages/uni-info-watcher/src/fill.ts | 39 +++-------- packages/uni-info-watcher/src/job-runner.ts | 70 ++++--------------- packages/uni-info-watcher/src/server.ts | 46 ++---------- .../uni-watcher/src/cli/reset-cmds/state.ts | 11 ++- .../uni-watcher/src/cli/watch-contract.ts | 4 +- packages/uni-watcher/src/fill.ts | 42 +++-------- packages/uni-watcher/src/job-runner.ts | 45 +++--------- packages/uni-watcher/src/server.ts | 37 ++-------- packages/util/index.ts | 2 +- packages/util/src/config.ts | 13 ++-- .../src/{ipldHelper.ts => ipld-helper.ts} | 0 packages/util/src/job-runner.ts | 6 +- 27 files changed, 162 insertions(+), 627 deletions(-) rename packages/util/src/{ipldHelper.ts => ipld-helper.ts} (100%) diff --git a/packages/codegen/src/templates/checkpoint-template.handlebars b/packages/codegen/src/templates/checkpoint-template.handlebars index 3bb49d97..03cbbd53 100644 --- a/packages/codegen/src/templates/checkpoint-template.handlebars +++ b/packages/codegen/src/templates/checkpoint-template.handlebars @@ -2,15 +2,11 @@ // Copyright 2021 Vulcanize, Inc. // -import assert from 'assert'; import yargs from 'yargs'; import 'reflect-metadata'; import debug from 'debug'; -import { getDefaultProvider } from 'ethers'; -import { Config, DEFAULT_CONFIG_PATH, getConfig } from '@vulcanize/util'; -import { getCache } from '@vulcanize/cache'; -import { EthClient } from '@vulcanize/ipld-eth-client'; +import { Config, DEFAULT_CONFIG_PATH, getConfig, initClients } from '@vulcanize/util'; import { Database } from '../database'; import { Indexer } from '../indexer'; @@ -42,36 +38,12 @@ const main = async (): Promise => { }).argv; const config: Config = await getConfig(argv.configFile); + const { ethClient, postgraphileClient, ethProvider } = await initClients(config); - const { upstream, database: dbConfig, server: serverConfig } = config; - - assert(upstream, 'Missing upstream config'); - assert(dbConfig, 'Missing database config'); - assert(serverConfig, 'Missing server config'); - - const db = new Database(dbConfig); + const db = new Database(config.database); await db.init(); - const { ethServer: { gqlApiEndpoint, gqlPostgraphileEndpoint, rpcProviderEndpoint }, cache: cacheConfig } = upstream; - assert(gqlApiEndpoint, 'Missing upstream ethServer.gqlApiEndpoint'); - assert(gqlPostgraphileEndpoint, 'Missing upstream ethServer.gqlPostgraphileEndpoint'); - - const cache = await getCache(cacheConfig); - - const ethClient = new EthClient({ - gqlEndpoint: gqlApiEndpoint, - gqlSubscriptionEndpoint: gqlPostgraphileEndpoint, - cache - }); - - const postgraphileClient = new EthClient({ - gqlEndpoint: gqlPostgraphileEndpoint, - cache - }); - - const ethProvider = getDefaultProvider(rpcProviderEndpoint); - - const indexer = new Indexer(serverConfig, db, ethClient, postgraphileClient, ethProvider); + const indexer = new Indexer(config.server, db, ethClient, postgraphileClient, ethProvider); const blockHash = await indexer.processCLICheckpoint(argv.address, argv.blockHash); log(`Created a checkpoint for contract ${argv.address} at block-hash ${blockHash}`); diff --git a/packages/codegen/src/templates/export-state-template.handlebars b/packages/codegen/src/templates/export-state-template.handlebars index 070103d2..a85d710e 100644 --- a/packages/codegen/src/templates/export-state-template.handlebars +++ b/packages/codegen/src/templates/export-state-template.handlebars @@ -6,13 +6,10 @@ import assert from 'assert'; import yargs from 'yargs'; import 'reflect-metadata'; import debug from 'debug'; -import { getDefaultProvider } from 'ethers'; import fs from 'fs'; import path from 'path'; -import { Config, DEFAULT_CONFIG_PATH, getConfig } from '@vulcanize/util'; -import { getCache } from '@vulcanize/cache'; -import { EthClient } from '@vulcanize/ipld-eth-client'; +import { Config, DEFAULT_CONFIG_PATH, getConfig, initClients } from '@vulcanize/util'; import * as codec from '@ipld/dag-cbor'; import { Database } from '../database'; @@ -40,36 +37,12 @@ const main = async (): Promise => { }).argv; const config: Config = await getConfig(argv.configFile); + const { ethClient, postgraphileClient, ethProvider } = await initClients(config); - const { upstream, database: dbConfig, server: serverConfig } = config; - - assert(upstream, 'Missing upstream config'); - assert(dbConfig, 'Missing database config'); - assert(serverConfig, 'Missing server config'); - - const db = new Database(dbConfig); + const db = new Database(config.database); await db.init(); - const { ethServer: { gqlApiEndpoint, gqlPostgraphileEndpoint, rpcProviderEndpoint }, cache: cacheConfig } = upstream; - assert(gqlApiEndpoint, 'Missing upstream ethServer.gqlApiEndpoint'); - assert(gqlPostgraphileEndpoint, 'Missing upstream ethServer.gqlPostgraphileEndpoint'); - - const cache = await getCache(cacheConfig); - - const ethClient = new EthClient({ - gqlEndpoint: gqlApiEndpoint, - gqlSubscriptionEndpoint: gqlPostgraphileEndpoint, - cache - }); - - const postgraphileClient = new EthClient({ - gqlEndpoint: gqlPostgraphileEndpoint, - cache - }); - - const ethProvider = getDefaultProvider(rpcProviderEndpoint); - - const indexer = new Indexer(serverConfig, db, ethClient, postgraphileClient, ethProvider); + const indexer = new Indexer(config.server, db, ethClient, postgraphileClient, ethProvider); const exportData: any = { snapshotBlock: {}, diff --git a/packages/codegen/src/templates/fill-template.handlebars b/packages/codegen/src/templates/fill-template.handlebars index f19e0dac..60a5c481 100644 --- a/packages/codegen/src/templates/fill-template.handlebars +++ b/packages/codegen/src/templates/fill-template.handlebars @@ -9,9 +9,7 @@ import { hideBin } from 'yargs/helpers'; import debug from 'debug'; import { PubSub } from 'apollo-server-express'; -import { getCache } from '@vulcanize/cache'; -import { EthClient } from '@vulcanize/ipld-eth-client'; -import { getConfig, fillBlocks, JobQueue, DEFAULT_CONFIG_PATH, getCustomProvider } from '@vulcanize/util'; +import { Config, getConfig, fillBlocks, JobQueue, DEFAULT_CONFIG_PATH, initClients } from '@vulcanize/util'; import { Database } from './database'; import { Indexer } from './indexer'; @@ -42,40 +40,19 @@ export const main = async (): Promise => { } }).argv; - const config = await getConfig(argv.configFile); + const config: Config = await getConfig(argv.configFile); + const { ethClient, postgraphileClient, ethProvider } = await initClients(config); - const { upstream, database: dbConfig, jobQueue: jobQueueConfig, server: serverConfig } = config; - - assert(upstream, 'Missing upstream config'); - assert(dbConfig, 'Missing database config'); - assert(serverConfig, 'Missing server config'); - - const db = new Database(dbConfig); + const db = new Database(config.database); await db.init(); - assert(upstream, 'Missing upstream config'); - const { ethServer: { gqlApiEndpoint, gqlPostgraphileEndpoint, rpcProviderEndpoint, blockDelayInMilliSecs }, cache: cacheConfig } = upstream; - assert(gqlPostgraphileEndpoint, 'Missing upstream ethServer.gqlPostgraphileEndpoint'); - - const cache = await getCache(cacheConfig); - - const ethClient = new EthClient({ - gqlEndpoint: gqlApiEndpoint, - gqlSubscriptionEndpoint: gqlPostgraphileEndpoint, - cache - }); - - const postgraphileClient = new EthClient({ - gqlEndpoint: gqlPostgraphileEndpoint, - cache - }); - - const ethProvider = getCustomProvider(rpcProviderEndpoint); - // Note: In-memory pubsub works fine for now, as each watcher is a single process anyway. // Later: https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries const pubsub = new PubSub(); - const indexer = new Indexer(serverConfig, db, ethClient, postgraphileClient, ethProvider); + const indexer = new Indexer(config.server, db, ethClient, postgraphileClient, ethProvider); + + const jobQueueConfig = config.jobQueue; + assert(jobQueueConfig, 'Missing job queue config'); const { dbConnectionString, maxCompletionLagInSecs } = jobQueueConfig; assert(dbConnectionString, 'Missing job queue db connection string'); @@ -83,11 +60,9 @@ export const main = async (): Promise => { const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs }); await jobQueue.start(); - const eventWatcher = new EventWatcher(upstream, ethClient, postgraphileClient, indexer, pubsub, jobQueue); + const eventWatcher = new EventWatcher(config.upstream, ethClient, postgraphileClient, indexer, pubsub, jobQueue); - assert(jobQueueConfig, 'Missing job queue config'); - - await fillBlocks(jobQueue, indexer, postgraphileClient, eventWatcher, blockDelayInMilliSecs, argv); + await fillBlocks(jobQueue, indexer, postgraphileClient, eventWatcher, argv); }; main().catch(err => { diff --git a/packages/codegen/src/templates/import-state-template.handlebars b/packages/codegen/src/templates/import-state-template.handlebars index 6127f610..26eace38 100644 --- a/packages/codegen/src/templates/import-state-template.handlebars +++ b/packages/codegen/src/templates/import-state-template.handlebars @@ -8,13 +8,10 @@ import yargs from 'yargs'; import { hideBin } from 'yargs/helpers'; import debug from 'debug'; import { PubSub } from 'apollo-server-express'; -import { getDefaultProvider } from 'ethers'; import fs from 'fs'; import path from 'path'; -import { getCache } from '@vulcanize/cache'; -import { EthClient } from '@vulcanize/ipld-eth-client'; -import { getConfig, fillBlocks, JobQueue, DEFAULT_CONFIG_PATH } from '@vulcanize/util'; +import { getConfig, fillBlocks, JobQueue, DEFAULT_CONFIG_PATH, Config, initClients } from '@vulcanize/util'; import * as codec from '@ipld/dag-cbor'; import { Database } from '../database'; @@ -43,40 +40,19 @@ export const main = async (): Promise => { } }).argv; - const config = await getConfig(argv.configFile); + const config: Config = await getConfig(argv.configFile); + const { ethClient, postgraphileClient, ethProvider } = await initClients(config); - const { upstream, database: dbConfig, jobQueue: jobQueueConfig, server: serverConfig } = config; - - assert(upstream, 'Missing upstream config'); - assert(dbConfig, 'Missing database config'); - assert(serverConfig, 'Missing server config'); - - const db = new Database(dbConfig); + const db = new Database(config.database); await db.init(); - assert(upstream, 'Missing upstream config'); - const { ethServer: { gqlPostgraphileEndpoint, rpcProviderEndpoint }, cache: cacheConfig } = upstream; - assert(gqlPostgraphileEndpoint, 'Missing upstream ethServer.gqlPostgraphileEndpoint'); - - const cache = await getCache(cacheConfig); - - const ethClient = new EthClient({ - gqlEndpoint: gqlPostgraphileEndpoint, - gqlSubscriptionEndpoint: gqlPostgraphileEndpoint, - cache - }); - - const postgraphileClient = new EthClient({ - gqlEndpoint: gqlPostgraphileEndpoint, - cache - }); - - const ethProvider = getDefaultProvider(rpcProviderEndpoint); - // Note: In-memory pubsub works fine for now, as each watcher is a single process anyway. // Later: https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries const pubsub = new PubSub(); - const indexer = new Indexer(serverConfig, db, ethClient, postgraphileClient, ethProvider); + const indexer = new Indexer(config.server, db, ethClient, postgraphileClient, ethProvider); + + const jobQueueConfig = config.jobQueue; + assert(jobQueueConfig, 'Missing job queue config'); const { dbConnectionString, maxCompletionLagInSecs } = jobQueueConfig; assert(dbConnectionString, 'Missing job queue db connection string'); @@ -86,8 +62,6 @@ export const main = async (): Promise => { const eventWatcher = new EventWatcher(ethClient, indexer, pubsub, jobQueue); - assert(jobQueueConfig, 'Missing job queue config'); - // Import data. const importFilePath = path.resolve(argv.importFile); const encodedImportData = fs.readFileSync(importFilePath); diff --git a/packages/codegen/src/templates/inspect-cid-template.handlebars b/packages/codegen/src/templates/inspect-cid-template.handlebars index 4ac727c7..5f3258d9 100644 --- a/packages/codegen/src/templates/inspect-cid-template.handlebars +++ b/packages/codegen/src/templates/inspect-cid-template.handlebars @@ -6,12 +6,9 @@ import assert from 'assert'; import yargs from 'yargs'; import 'reflect-metadata'; import debug from 'debug'; -import { getDefaultProvider } from 'ethers'; import util from 'util'; -import { Config, DEFAULT_CONFIG_PATH, getConfig } from '@vulcanize/util'; -import { getCache } from '@vulcanize/cache'; -import { EthClient } from '@vulcanize/ipld-eth-client'; +import { Config, DEFAULT_CONFIG_PATH, getConfig, initClients } from '@vulcanize/util'; import { Database } from '../database'; import { Indexer } from '../indexer'; @@ -39,36 +36,12 @@ const main = async (): Promise => { }).argv; const config: Config = await getConfig(argv.configFile); + const { ethClient, postgraphileClient, ethProvider } = await initClients(config); - const { upstream, database: dbConfig, server: serverConfig } = config; - - assert(upstream, 'Missing upstream config'); - assert(dbConfig, 'Missing database config'); - assert(serverConfig, 'Missing server config'); - - const db = new Database(dbConfig); + const db = new Database(config.database); await db.init(); - const { ethServer: { gqlApiEndpoint, gqlPostgraphileEndpoint, rpcProviderEndpoint }, cache: cacheConfig } = upstream; - assert(gqlApiEndpoint, 'Missing upstream ethServer.gqlApiEndpoint'); - assert(gqlPostgraphileEndpoint, 'Missing upstream ethServer.gqlPostgraphileEndpoint'); - - const cache = await getCache(cacheConfig); - - const ethClient = new EthClient({ - gqlEndpoint: gqlApiEndpoint, - gqlSubscriptionEndpoint: gqlPostgraphileEndpoint, - cache - }); - - const postgraphileClient = new EthClient({ - gqlEndpoint: gqlPostgraphileEndpoint, - cache - }); - - const ethProvider = getDefaultProvider(rpcProviderEndpoint); - - const indexer = new Indexer(serverConfig, db, ethClient, postgraphileClient, ethProvider); + const indexer = new Indexer(config.server, db, ethClient, postgraphileClient, ethProvider); const ipldBlock = await indexer.getIPLDBlockByCid(argv.cid); assert(ipldBlock, 'IPLDBlock for the provided CID doesn\'t exist.'); diff --git a/packages/codegen/src/templates/job-runner-template.handlebars b/packages/codegen/src/templates/job-runner-template.handlebars index ed87d6f8..3c795e1b 100644 --- a/packages/codegen/src/templates/job-runner-template.handlebars +++ b/packages/codegen/src/templates/job-runner-template.handlebars @@ -7,11 +7,9 @@ import 'reflect-metadata'; import yargs from 'yargs'; import { hideBin } from 'yargs/helpers'; import debug from 'debug'; - -import { getCache } from '@vulcanize/cache'; -import { EthClient } from '@vulcanize/ipld-eth-client'; import { getConfig, + Config, JobQueue, JobRunner as BaseJobRunner, QUEUE_BLOCK_PROCESSING, @@ -21,7 +19,7 @@ import { QUEUE_IPFS, JobQueueConfig, DEFAULT_CONFIG_PATH, - getCustomProvider + initClients } from '@vulcanize/util'; import { Indexer } from './indexer'; @@ -123,37 +121,15 @@ export const main = async (): Promise => { }) .argv; - const config = await getConfig(argv.f); + const config: Config = await getConfig(argv.f); + const { ethClient, postgraphileClient, ethProvider } = await initClients(config); - const { upstream, database: dbConfig, jobQueue: jobQueueConfig, server: serverConfig } = config; - - assert(upstream, 'Missing upstream config'); - assert(dbConfig, 'Missing database config'); - assert(serverConfig, 'Missing server config'); - - const db = new Database(dbConfig); + const db = new Database(config.database); await db.init(); - const { ethServer: { gqlApiEndpoint, gqlPostgraphileEndpoint, rpcProviderEndpoint }, cache: cacheConfig } = upstream; - assert(gqlApiEndpoint, 'Missing upstream ethServer.gqlApiEndpoint'); - assert(gqlPostgraphileEndpoint, 'Missing upstream ethServer.gqlPostgraphileEndpoint'); - - const cache = await getCache(cacheConfig); - - const ethClient = new EthClient({ - gqlEndpoint: gqlApiEndpoint, - gqlSubscriptionEndpoint: gqlPostgraphileEndpoint, - cache - }); - - const postgraphileClient = new EthClient({ - gqlEndpoint: gqlPostgraphileEndpoint, - cache - }); - - const ethProvider = getCustomProvider(rpcProviderEndpoint); - const indexer = new Indexer(db, ethClient, postgraphileClient, ethProvider); + const indexer = new Indexer(config.server, db, ethClient, postgraphileClient, ethProvider); + const jobQueueConfig = config.jobQueue; assert(jobQueueConfig, 'Missing job queue config'); const { dbConnectionString, maxCompletionLagInSecs } = jobQueueConfig; @@ -162,7 +138,7 @@ export const main = async (): Promise => { const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs }); await jobQueue.start(); - const jobRunner = new JobRunner(jobQueueConfig, serverConfig, indexer, jobQueue); + const jobRunner = new JobRunner(jobQueueConfig, config.server, indexer, jobQueue); await jobRunner.start(); }; diff --git a/packages/codegen/src/templates/reset-state-template.handlebars b/packages/codegen/src/templates/reset-state-template.handlebars index 480ea63c..c68aeb2c 100644 --- a/packages/codegen/src/templates/reset-state-template.handlebars +++ b/packages/codegen/src/templates/reset-state-template.handlebars @@ -6,7 +6,7 @@ import debug from 'debug'; import { MoreThan } from 'typeorm'; import assert from 'assert'; -import { getConfig, getResetConfig, resetJobs } from '@vulcanize/util'; +import { getConfig, initClients, resetJobs } from '@vulcanize/util'; import { Database } from '../../database'; import { Indexer } from '../../indexer'; @@ -31,13 +31,13 @@ export const builder = { export const handler = async (argv: any): Promise => { const config = await getConfig(argv.configFile); await resetJobs(config); - const { dbConfig, serverConfig, ethClient, postgraphileClient, ethProvider } = await getResetConfig(config); + const { ethClient, postgraphileClient, ethProvider } = await initClients(config); // Initialize database. - const db = new Database(dbConfig); + const db = new Database(config.database); await db.init(); - const indexer = new Indexer(serverConfig, db, ethClient, postgraphileClient, ethProvider); + const indexer = new Indexer(config.server, db, ethClient, postgraphileClient, ethProvider); const syncStatus = await indexer.getSyncStatus(); assert(syncStatus, 'Missing syncStatus'); diff --git a/packages/codegen/src/templates/server-template.handlebars b/packages/codegen/src/templates/server-template.handlebars index dead94c8..66148502 100644 --- a/packages/codegen/src/templates/server-template.handlebars +++ b/packages/codegen/src/templates/server-template.handlebars @@ -14,9 +14,7 @@ import debug from 'debug'; import 'graphql-import-node'; import { createServer } from 'http'; -import { getCache } from '@vulcanize/cache'; -import { EthClient } from '@vulcanize/ipld-eth-client'; -import { DEFAULT_CONFIG_PATH, getConfig, JobQueue, KIND_ACTIVE, getCustomProvider } from '@vulcanize/util'; +import { DEFAULT_CONFIG_PATH, getConfig, Config, JobQueue, KIND_ACTIVE, initClients } from '@vulcanize/util'; import { createResolvers } from './resolvers'; import { Indexer } from './indexer'; @@ -36,51 +34,28 @@ export const main = async (): Promise => { }) .argv; - const config = await getConfig(argv.f); + const config: Config = await getConfig(argv.f); + const { ethClient, postgraphileClient, ethProvider } = await initClients(config); - const { upstream, database: dbConfig, jobQueue: jobQueueConfig, server: serverConfig } = config; + const { host, port, kind: watcherKind } = config.server; - assert(upstream, 'Missing upstream config'); - assert(dbConfig, 'Missing database config'); - assert(serverConfig, 'Missing server config'); - - const { host, port, kind: watcherKind } = serverConfig; - - const db = new Database(dbConfig); + const db = new Database(config.database); await db.init(); - const { ethServer: { gqlApiEndpoint, gqlPostgraphileEndpoint, rpcProviderEndpoint }, cache: cacheConfig } = upstream; - assert(gqlApiEndpoint, 'Missing upstream ethServer.gqlApiEndpoint'); - assert(gqlPostgraphileEndpoint, 'Missing upstream ethServer.gqlPostgraphileEndpoint'); - - const cache = await getCache(cacheConfig); - - const ethClient = new EthClient({ - gqlEndpoint: gqlApiEndpoint, - gqlSubscriptionEndpoint: gqlPostgraphileEndpoint, - cache - }); - - const postgraphileClient = new EthClient({ - gqlEndpoint: gqlPostgraphileEndpoint, - cache - }); - - const ethProvider = getCustomProvider(rpcProviderEndpoint); - - const indexer = new Indexer(serverConfig, db, ethClient, postgraphileClient, ethProvider); - // Note: In-memory pubsub works fine for now, as each watcher is a single process anyway. // Later: https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries const pubsub = new PubSub(); + const indexer = new Indexer(config.server, db, ethClient, ethProvider); + const jobQueueConfig = config.jobQueue; assert(jobQueueConfig, 'Missing job queue config'); + const { dbConnectionString, maxCompletionLagInSecs } = jobQueueConfig; assert(dbConnectionString, 'Missing job queue db connection string'); const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs }); - const eventWatcher = new EventWatcher(upstream, ethClient, postgraphileClient, indexer, pubsub, jobQueue); + const eventWatcher = new EventWatcher(config.upstream, ethClient, postgraphileClient, indexer, pubsub, jobQueue); if (watcherKind === KIND_ACTIVE) { await jobQueue.start(); diff --git a/packages/codegen/src/templates/watch-contract-template.handlebars b/packages/codegen/src/templates/watch-contract-template.handlebars index c57d703c..30a32df3 100644 --- a/packages/codegen/src/templates/watch-contract-template.handlebars +++ b/packages/codegen/src/templates/watch-contract-template.handlebars @@ -2,15 +2,11 @@ // Copyright 2021 Vulcanize, Inc. // -import assert from 'assert'; import yargs from 'yargs'; import 'reflect-metadata'; import debug from 'debug'; -import { getDefaultProvider } from 'ethers'; -import { Config, DEFAULT_CONFIG_PATH, getConfig } from '@vulcanize/util'; -import { getCache } from '@vulcanize/cache'; -import { EthClient } from '@vulcanize/ipld-eth-client'; +import { Config, DEFAULT_CONFIG_PATH, getConfig, initClients } from '@vulcanize/util'; import { Database } from '../database'; import { Indexer } from '../indexer'; @@ -54,36 +50,12 @@ const main = async (): Promise => { }).argv; const config: Config = await getConfig(argv.configFile); + const { ethClient, postgraphileClient, ethProvider } = await initClients(config); - const { upstream, database: dbConfig, server: serverConfig } = config; - - assert(upstream, 'Missing upstream config'); - assert(dbConfig, 'Missing database config'); - assert(serverConfig, 'Missing server config'); - - const db = new Database(dbConfig); + const db = new Database(config.database); await db.init(); - const { ethServer: { gqlApiEndpoint, gqlPostgraphileEndpoint, rpcProviderEndpoint }, cache: cacheConfig } = upstream; - assert(gqlApiEndpoint, 'Missing upstream ethServer.gqlApiEndpoint'); - assert(gqlPostgraphileEndpoint, 'Missing upstream ethServer.gqlPostgraphileEndpoint'); - - const cache = await getCache(cacheConfig); - - const ethClient = new EthClient({ - gqlEndpoint: gqlApiEndpoint, - gqlSubscriptionEndpoint: gqlPostgraphileEndpoint, - cache - }); - - const postgraphileClient = new EthClient({ - gqlEndpoint: gqlPostgraphileEndpoint, - cache - }); - - const ethProvider = getDefaultProvider(rpcProviderEndpoint); - - const indexer = new Indexer(serverConfig, db, ethClient, postgraphileClient, ethProvider); + const indexer = new Indexer(config.server, db, ethClient, postgraphileClient, ethProvider); await indexer.watchContract(argv.address, argv.kind, argv.checkpoint, argv.startingBlock); await db.close(); diff --git a/packages/erc20-watcher/src/cli/reset-cmds/state.ts b/packages/erc20-watcher/src/cli/reset-cmds/state.ts index 738cae75..87bb9e7e 100644 --- a/packages/erc20-watcher/src/cli/reset-cmds/state.ts +++ b/packages/erc20-watcher/src/cli/reset-cmds/state.ts @@ -6,7 +6,7 @@ import debug from 'debug'; import { MoreThan } from 'typeorm'; import assert from 'assert'; -import { getConfig, getResetConfig, JobQueue, resetJobs } from '@vulcanize/util'; +import { getConfig, initClients, JobQueue, resetJobs } from '@vulcanize/util'; import { Database } from '../../database'; import { Indexer } from '../../indexer'; @@ -29,13 +29,13 @@ export const builder = { export const handler = async (argv: any): Promise => { const config = await getConfig(argv.configFile); await resetJobs(config); - const { jobQueue: jobQueueConfig } = config; - const { dbConfig, serverConfig, ethClient, postgraphileClient, ethProvider } = await getResetConfig(config); + const { ethClient, postgraphileClient, ethProvider } = await initClients(config); // Initialize database. - const db = new Database(dbConfig); + const db = new Database(config.database); await db.init(); + const { jobQueue: jobQueueConfig } = config; assert(jobQueueConfig, 'Missing job queue config'); const { dbConnectionString, maxCompletionLagInSecs } = jobQueueConfig; @@ -43,7 +43,7 @@ export const handler = async (argv: any): Promise => { const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs }); - const indexer = new Indexer(db, ethClient, postgraphileClient, ethProvider, jobQueue, serverConfig.mode); + const indexer = new Indexer(db, ethClient, postgraphileClient, ethProvider, jobQueue, config.server.mode); const syncStatus = await indexer.getSyncStatus(); assert(syncStatus, 'Missing syncStatus'); diff --git a/packages/erc20-watcher/src/cli/watch-contract.ts b/packages/erc20-watcher/src/cli/watch-contract.ts index 26a13786..73ec918f 100644 --- a/packages/erc20-watcher/src/cli/watch-contract.ts +++ b/packages/erc20-watcher/src/cli/watch-contract.ts @@ -6,7 +6,7 @@ import assert from 'assert'; import yargs from 'yargs'; import 'reflect-metadata'; -import { Config, DEFAULT_CONFIG_PATH, getConfig, getResetConfig, JobQueue } from '@vulcanize/util'; +import { Config, DEFAULT_CONFIG_PATH, getConfig, initClients, JobQueue } from '@vulcanize/util'; import { Database } from '../database'; import { Indexer } from '../indexer'; @@ -43,7 +43,7 @@ import { Indexer } from '../indexer'; const config: Config = await getConfig(argv.configFile); const { database: dbConfig, server: { mode }, jobQueue: jobQueueConfig } = config; - const { ethClient, postgraphileClient, ethProvider } = await getResetConfig(config); + const { ethClient, postgraphileClient, ethProvider } = await initClients(config); assert(dbConfig); diff --git a/packages/erc20-watcher/src/fill.ts b/packages/erc20-watcher/src/fill.ts index 98aa9565..ac93355e 100644 --- a/packages/erc20-watcher/src/fill.ts +++ b/packages/erc20-watcher/src/fill.ts @@ -9,9 +9,7 @@ import { hideBin } from 'yargs/helpers'; import debug from 'debug'; import { PubSub } from 'apollo-server-express'; -import { getCache } from '@vulcanize/cache'; -import { EthClient } from '@vulcanize/ipld-eth-client'; -import { getConfig, fillBlocks, JobQueue, DEFAULT_CONFIG_PATH, getCustomProvider } from '@vulcanize/util'; +import { Config, getConfig, fillBlocks, JobQueue, DEFAULT_CONFIG_PATH, initClients } from '@vulcanize/util'; import { Database } from './database'; import { Indexer } from './indexer'; @@ -55,53 +53,30 @@ export const main = async (): Promise => { } }).argv; - const config = await getConfig(argv.configFile); + const config: Config = await getConfig(argv.configFile); + const { ethClient, postgraphileClient, ethProvider } = await initClients(config); - assert(config.server, 'Missing server config'); - - const { upstream, database: dbConfig, jobQueue: jobQueueConfig, server: { mode } } = config; - - assert(dbConfig, 'Missing database config'); - - const db = new Database(dbConfig); + const db = new Database(config.database); await db.init(); - assert(upstream, 'Missing upstream config'); - const { ethServer: { gqlApiEndpoint, gqlPostgraphileEndpoint, rpcProviderEndpoint, blockDelayInMilliSecs }, cache: cacheConfig } = upstream; - assert(gqlPostgraphileEndpoint, 'Missing upstream ethServer.gqlPostgraphileEndpoint'); - - const cache = await getCache(cacheConfig); - - const ethClient = new EthClient({ - gqlEndpoint: gqlApiEndpoint, - gqlSubscriptionEndpoint: gqlPostgraphileEndpoint, - cache - }); - - const postgraphileClient = new EthClient({ - gqlEndpoint: gqlPostgraphileEndpoint, - cache - }); - - const ethProvider = getCustomProvider(rpcProviderEndpoint); - // Note: In-memory pubsub works fine for now, as each watcher is a single process anyway. // Later: https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries const pubsub = new PubSub(); + const jobQueueConfig = config.jobQueue; + assert(jobQueueConfig, 'Missing job queue config'); + const { dbConnectionString, maxCompletionLagInSecs } = jobQueueConfig; assert(dbConnectionString, 'Missing job queue db connection string'); const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs }); await jobQueue.start(); - const indexer = new Indexer(db, ethClient, postgraphileClient, ethProvider, jobQueue, mode); + const indexer = new Indexer(db, ethClient, postgraphileClient, ethProvider, jobQueue, config.server.mode); - const eventWatcher = new EventWatcher(upstream, ethClient, postgraphileClient, indexer, pubsub, jobQueue); + const eventWatcher = new EventWatcher(config.upstream, ethClient, postgraphileClient, indexer, pubsub, jobQueue); - assert(jobQueueConfig, 'Missing job queue config'); - - await fillBlocks(jobQueue, indexer, eventWatcher, blockDelayInMilliSecs, argv); + await fillBlocks(jobQueue, indexer, eventWatcher, config.upstream.ethServer.blockDelayInMilliSecs, argv); }; main().catch(err => { diff --git a/packages/erc20-watcher/src/job-runner.ts b/packages/erc20-watcher/src/job-runner.ts index ad1ae113..526a6987 100644 --- a/packages/erc20-watcher/src/job-runner.ts +++ b/packages/erc20-watcher/src/job-runner.ts @@ -8,18 +8,16 @@ import yargs from 'yargs'; import { hideBin } from 'yargs/helpers'; import debug from 'debug'; -import { getCache } from '@vulcanize/cache'; -import { EthClient } from '@vulcanize/ipld-eth-client'; import { getConfig, + Config, JobQueue, JobRunner as BaseJobRunner, QUEUE_BLOCK_PROCESSING, QUEUE_EVENT_PROCESSING, JobQueueConfig, DEFAULT_CONFIG_PATH, - getCustomProvider, - ServerConfig + initClients } from '@vulcanize/util'; import { Indexer } from './indexer'; @@ -32,14 +30,12 @@ export class JobRunner { _jobQueue: JobQueue _baseJobRunner: BaseJobRunner _jobQueueConfig: JobQueueConfig - _serverConfig: ServerConfig - constructor (jobQueueConfig: JobQueueConfig, serverConfig: ServerConfig, indexer: Indexer, jobQueue: JobQueue) { + constructor (jobQueueConfig: JobQueueConfig, indexer: Indexer, jobQueue: JobQueue) { this._jobQueueConfig = jobQueueConfig; this._indexer = indexer; this._jobQueue = jobQueue; - this._serverConfig = serverConfig; - this._baseJobRunner = new BaseJobRunner(this._jobQueueConfig, this._serverConfig, this._indexer, this._jobQueue); + this._baseJobRunner = new BaseJobRunner(this._jobQueueConfig, this._indexer, this._jobQueue); } async start (): Promise { @@ -71,36 +67,13 @@ export const main = async (): Promise => { }) .argv; - const config = await getConfig(argv.f); + const config: Config = await getConfig(argv.f); + const { ethClient, postgraphileClient, ethProvider } = await initClients(config); - const { upstream, database: dbConfig, jobQueue: jobQueueConfig, server: serverConfig } = config; - - assert(upstream, 'Missing upstream config'); - assert(dbConfig, 'Missing database config'); - assert(serverConfig, 'Missing server config'); - - const db = new Database(dbConfig); + const db = new Database(config.database); await db.init(); - const { ethServer: { gqlApiEndpoint, gqlPostgraphileEndpoint, rpcProviderEndpoint }, cache: cacheConfig } = upstream; - assert(gqlApiEndpoint, 'Missing upstream ethServer.gqlApiEndpoint'); - assert(gqlPostgraphileEndpoint, 'Missing upstream ethServer.gqlPostgraphileEndpoint'); - - const cache = await getCache(cacheConfig); - - const ethClient = new EthClient({ - gqlEndpoint: gqlApiEndpoint, - gqlSubscriptionEndpoint: gqlPostgraphileEndpoint, - cache - }); - - const postgraphileClient = new EthClient({ - gqlEndpoint: gqlPostgraphileEndpoint, - cache - }); - - const ethProvider = getCustomProvider(rpcProviderEndpoint); - + const jobQueueConfig = config.jobQueue; assert(jobQueueConfig, 'Missing job queue config'); const { dbConnectionString, maxCompletionLagInSecs } = jobQueueConfig; @@ -109,9 +82,9 @@ export const main = async (): Promise => { const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs }); await jobQueue.start(); - const indexer = new Indexer(db, ethClient, postgraphileClient, ethProvider, jobQueue, serverConfig.mode); + const indexer = new Indexer(db, ethClient, postgraphileClient, ethProvider, jobQueue, config.server.mode); - const jobRunner = new JobRunner(jobQueueConfig, serverConfig, indexer, jobQueue); + const jobRunner = new JobRunner(jobQueueConfig, indexer, jobQueue); await jobRunner.start(); }; diff --git a/packages/erc20-watcher/src/server.ts b/packages/erc20-watcher/src/server.ts index 7787e191..18d59181 100644 --- a/packages/erc20-watcher/src/server.ts +++ b/packages/erc20-watcher/src/server.ts @@ -12,9 +12,7 @@ import debug from 'debug'; import 'graphql-import-node'; import { createServer } from 'http'; -import { getCache } from '@vulcanize/cache'; -import { EthClient } from '@vulcanize/ipld-eth-client'; -import { DEFAULT_CONFIG_PATH, getConfig, getCustomProvider, JobQueue, KIND_ACTIVE } from '@vulcanize/util'; +import { DEFAULT_CONFIG_PATH, getConfig, Config, JobQueue, KIND_ACTIVE, initClients } from '@vulcanize/util'; import typeDefs from './schema'; @@ -37,43 +35,19 @@ export const main = async (): Promise => { }) .argv; - const config = await getConfig(argv.f); - - assert(config.server, 'Missing server config'); + const config: Config = await getConfig(argv.f); + const { ethClient, postgraphileClient, ethProvider } = await initClients(config); const { host, port, mode, kind: watcherKind } = config.server; - const { upstream, database: dbConfig, jobQueue: jobQueueConfig } = config; - - assert(dbConfig, 'Missing database config'); - - const db = new Database(dbConfig); + const db = new Database(config.database); await db.init(); - assert(upstream, 'Missing upstream config'); - const { ethServer: { gqlApiEndpoint, gqlPostgraphileEndpoint, rpcProviderEndpoint }, cache: cacheConfig } = upstream; - assert(gqlApiEndpoint, 'Missing upstream ethServer.gqlApiEndpoint'); - assert(gqlPostgraphileEndpoint, 'Missing upstream ethServer.gqlPostgraphileEndpoint'); - - const cache = await getCache(cacheConfig); - - const ethClient = new EthClient({ - gqlEndpoint: gqlApiEndpoint, - gqlSubscriptionEndpoint: gqlPostgraphileEndpoint, - cache - }); - - const postgraphileClient = new EthClient({ - gqlEndpoint: gqlPostgraphileEndpoint, - cache - }); - - const ethProvider = getCustomProvider(rpcProviderEndpoint); - // Note: In-memory pubsub works fine for now, as each watcher is a single process anyway. // Later: https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries const pubsub = new PubSub(); + const jobQueueConfig = config.jobQueue; assert(jobQueueConfig, 'Missing job queue config'); const { dbConnectionString, maxCompletionLagInSecs } = jobQueueConfig; @@ -83,7 +57,7 @@ export const main = async (): Promise => { const indexer = new Indexer(db, ethClient, postgraphileClient, ethProvider, jobQueue, mode); - const eventWatcher = new EventWatcher(upstream, ethClient, postgraphileClient, indexer, pubsub, jobQueue); + const eventWatcher = new EventWatcher(config.upstream, ethClient, postgraphileClient, indexer, pubsub, jobQueue); if (watcherKind === KIND_ACTIVE) { await jobQueue.start(); diff --git a/packages/uni-info-watcher/src/cli/reset-cmds/state.ts b/packages/uni-info-watcher/src/cli/reset-cmds/state.ts index 809b159b..4d551926 100644 --- a/packages/uni-info-watcher/src/cli/reset-cmds/state.ts +++ b/packages/uni-info-watcher/src/cli/reset-cmds/state.ts @@ -6,7 +6,7 @@ import debug from 'debug'; import { MoreThan } from 'typeorm'; import assert from 'assert'; -import { getConfig, getResetConfig, JobQueue, resetJobs } from '@vulcanize/util'; +import { getConfig, initClients, JobQueue, resetJobs } from '@vulcanize/util'; import { Client as ERC20Client } from '@vulcanize/erc20-watcher'; import { Client as UniClient } from '@vulcanize/uni-watcher'; @@ -39,16 +39,16 @@ export const handler = async (argv: any): Promise => { const config = await getConfig(argv.configFile); await resetJobs(config); const { jobQueue: jobQueueConfig } = config; - const { dbConfig, serverConfig, upstreamConfig, ethClient, postgraphileClient, ethProvider } = await getResetConfig(config); + const { ethClient, postgraphileClient, ethProvider } = await initClients(config); // Initialize database. - const db = new Database(dbConfig); + const db = new Database(config.database); await db.init(); const { uniWatcher, tokenWatcher - } = upstreamConfig; + } = config.upstream; const uniClient = new UniClient(uniWatcher); const erc20Client = new ERC20Client(tokenWatcher); @@ -61,7 +61,7 @@ export const handler = async (argv: any): Promise => { const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs }); await jobQueue.start(); - const indexer = new Indexer(db, uniClient, erc20Client, ethClient, postgraphileClient, ethProvider, jobQueue, serverConfig.mode); + const indexer = new Indexer(db, uniClient, erc20Client, ethClient, postgraphileClient, ethProvider, jobQueue, config.server.mode); const syncStatus = await indexer.getSyncStatus(); assert(syncStatus, 'Missing syncStatus'); diff --git a/packages/uni-info-watcher/src/fill.ts b/packages/uni-info-watcher/src/fill.ts index 0f46f700..29b0ca1e 100644 --- a/packages/uni-info-watcher/src/fill.ts +++ b/packages/uni-info-watcher/src/fill.ts @@ -8,9 +8,7 @@ import yargs from 'yargs'; import { hideBin } from 'yargs/helpers'; import debug from 'debug'; -import { getCache } from '@vulcanize/cache'; -import { EthClient } from '@vulcanize/ipld-eth-client'; -import { getConfig, fillBlocks, JobQueue, DEFAULT_CONFIG_PATH, getCustomProvider } from '@vulcanize/util'; +import { getConfig, Config, fillBlocks, JobQueue, DEFAULT_CONFIG_PATH, initClients } from '@vulcanize/util'; import { Client as UniClient } from '@vulcanize/uni-watcher'; import { Client as ERC20Client } from '@vulcanize/erc20-watcher'; @@ -57,52 +55,33 @@ export const main = async (): Promise => { } }).argv; - const config = await getConfig(argv.configFile); + const config: Config = await getConfig(argv.configFile); + const { ethClient, postgraphileClient, ethProvider } = await initClients(config); - assert(config.server, 'Missing server config'); - - const { upstream, database: dbConfig, jobQueue: jobQueueConfig, server: { mode } } = config; - - assert(dbConfig, 'Missing database config'); - - const db = new Database(dbConfig); + const db = new Database(config.database); await db.init(); - assert(upstream, 'Missing upstream config'); - const { ethServer: { gqlApiEndpoint, gqlPostgraphileEndpoint, rpcProviderEndpoint, blockDelayInMilliSecs }, cache: cacheConfig, uniWatcher, tokenWatcher } = upstream; - assert(gqlPostgraphileEndpoint, 'Missing upstream ethServer.gqlPostgraphileEndpoint'); - - const cache = await getCache(cacheConfig); - - const ethClient = new EthClient({ - gqlEndpoint: gqlApiEndpoint, - gqlSubscriptionEndpoint: gqlPostgraphileEndpoint, - cache - }); - - const postgraphileClient = new EthClient({ - gqlEndpoint: gqlPostgraphileEndpoint, - cache - }); + const { uniWatcher, tokenWatcher, ethServer: { blockDelayInMilliSecs } } = config.upstream; const uniClient = new UniClient(uniWatcher); const erc20Client = new ERC20Client(tokenWatcher); - const ethProvider = getCustomProvider(rpcProviderEndpoint); // Note: In-memory pubsub works fine for now, as each watcher is a single process anyway. // Later: https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries const pubsub = new PubSub(); + const jobQueueConfig = config.jobQueue; assert(jobQueueConfig, 'Missing job queue config'); + const { dbConnectionString, maxCompletionLagInSecs } = jobQueueConfig; assert(dbConnectionString, 'Missing job queue db connection string'); const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs }); await jobQueue.start(); - const indexer = new Indexer(db, uniClient, erc20Client, ethClient, postgraphileClient, ethProvider, jobQueue, mode); + const indexer = new Indexer(db, uniClient, erc20Client, ethClient, postgraphileClient, ethProvider, jobQueue, config.server.mode); - const eventWatcher = new EventWatcher(upstream, ethClient, postgraphileClient, indexer, pubsub, jobQueue); + const eventWatcher = new EventWatcher(config.upstream, ethClient, postgraphileClient, indexer, pubsub, jobQueue); await fillBlocks(jobQueue, indexer, eventWatcher, blockDelayInMilliSecs, argv); }; diff --git a/packages/uni-info-watcher/src/job-runner.ts b/packages/uni-info-watcher/src/job-runner.ts index fc964e17..30b49a3e 100644 --- a/packages/uni-info-watcher/src/job-runner.ts +++ b/packages/uni-info-watcher/src/job-runner.ts @@ -10,18 +10,17 @@ import debug from 'debug'; import { Client as ERC20Client } from '@vulcanize/erc20-watcher'; import { Client as UniClient } from '@vulcanize/uni-watcher'; -import { getCache } from '@vulcanize/cache'; -import { EthClient } from '@vulcanize/ipld-eth-client'; + import { getConfig, + Config, JobQueue, QUEUE_BLOCK_PROCESSING, QUEUE_EVENT_PROCESSING, JobRunner as BaseJobRunner, JobQueueConfig, DEFAULT_CONFIG_PATH, - getCustomProvider, - ServerConfig + initClients } from '@vulcanize/util'; import { Indexer } from './indexer'; @@ -34,14 +33,12 @@ export class JobRunner { _jobQueue: JobQueue _baseJobRunner: BaseJobRunner _jobQueueConfig: JobQueueConfig - _serverConfig: ServerConfig - constructor (jobQueueConfig: JobQueueConfig, serverConfig: ServerConfig, indexer: Indexer, jobQueue: JobQueue) { + constructor (jobQueueConfig: JobQueueConfig, indexer: Indexer, jobQueue: JobQueue) { this._jobQueueConfig = jobQueueConfig; this._indexer = indexer; this._jobQueue = jobQueue; - this._serverConfig = serverConfig; - this._baseJobRunner = new BaseJobRunner(this._jobQueueConfig, this._serverConfig, this._indexer, this._jobQueue); + this._baseJobRunner = new BaseJobRunner(this._jobQueueConfig, this._indexer, this._jobQueue); } async start (): Promise { @@ -73,58 +70,21 @@ export const main = async (): Promise => { }) .argv; - const config = await getConfig(argv.f); + const config: Config = await getConfig(argv.f); + const { ethClient, postgraphileClient, ethProvider } = await initClients(config); - const { upstream, database: dbConfig, jobQueue: jobQueueConfig, server: serverConfig } = config; - - assert(upstream, 'Missing upstream config'); - assert(dbConfig, 'Missing database config'); - assert(serverConfig, 'Missing server config'); - - const db = new Database(dbConfig); + const db = new Database(config.database); await db.init(); - assert(upstream, 'Missing upstream config'); - const { - uniWatcher: { - gqlEndpoint, - gqlSubscriptionEndpoint - }, - tokenWatcher, - cache: cacheConfig, - ethServer: { - gqlApiEndpoint, - gqlPostgraphileEndpoint, - rpcProviderEndpoint - } - } = upstream; - - assert(gqlApiEndpoint, 'Missing upstream ethServer.gqlApiEndpoint'); - assert(gqlEndpoint, 'Missing upstream uniWatcher.gqlEndpoint'); - assert(gqlSubscriptionEndpoint, 'Missing upstream uniWatcher.gqlSubscriptionEndpoint'); - - const cache = await getCache(cacheConfig); - - const ethClient = new EthClient({ - gqlEndpoint: gqlApiEndpoint, - gqlSubscriptionEndpoint: gqlPostgraphileEndpoint, - cache - }); - - const postgraphileClient = new EthClient({ - gqlEndpoint: gqlPostgraphileEndpoint, - cache - }); - - const uniClient = new UniClient({ - gqlEndpoint, - gqlSubscriptionEndpoint - }); + uniWatcher, + tokenWatcher + } = config.upstream; + const uniClient = new UniClient(uniWatcher); const erc20Client = new ERC20Client(tokenWatcher); - const ethProvider = getCustomProvider(rpcProviderEndpoint); + const jobQueueConfig = config.jobQueue; assert(jobQueueConfig, 'Missing job queue config'); const { dbConnectionString, maxCompletionLagInSecs } = jobQueueConfig; @@ -133,9 +93,9 @@ export const main = async (): Promise => { const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs }); await jobQueue.start(); - const indexer = new Indexer(db, uniClient, erc20Client, ethClient, postgraphileClient, ethProvider, jobQueue, serverConfig.mode); + const indexer = new Indexer(db, uniClient, erc20Client, ethClient, postgraphileClient, ethProvider, jobQueue, config.server.mode); - const jobRunner = new JobRunner(jobQueueConfig, serverConfig, indexer, jobQueue); + const jobRunner = new JobRunner(jobQueueConfig, indexer, jobQueue); await jobRunner.start(); }; diff --git a/packages/uni-info-watcher/src/server.ts b/packages/uni-info-watcher/src/server.ts index 85cbad57..9a15edaf 100644 --- a/packages/uni-info-watcher/src/server.ts +++ b/packages/uni-info-watcher/src/server.ts @@ -14,9 +14,7 @@ import { createServer } from 'http'; import { Client as ERC20Client } from '@vulcanize/erc20-watcher'; import { Client as UniClient } from '@vulcanize/uni-watcher'; -import { EthClient } from '@vulcanize/ipld-eth-client'; -import { DEFAULT_CONFIG_PATH, getConfig, getCustomProvider, JobQueue } from '@vulcanize/util'; -import { getCache } from '@vulcanize/cache'; +import { DEFAULT_CONFIG_PATH, getConfig, Config, getCustomProvider, JobQueue, initClients } from '@vulcanize/util'; import typeDefs from './schema'; @@ -39,51 +37,21 @@ export const main = async (): Promise => { }) .argv; - const config = await getConfig(argv.f); - - assert(config.server, 'Missing server config'); + const config: Config = await getConfig(argv.f); + const { ethClient, postgraphileClient } = await initClients(config); const { host, port, mode } = config.server; - const { upstream, database: dbConfig, jobQueue: jobQueueConfig } = config; - - assert(dbConfig, 'Missing database config'); - - const db = new Database(dbConfig); + const db = new Database(config.database); await db.init(); - assert(upstream, 'Missing upstream config'); - const { - ethServer: { - gqlApiEndpoint, - gqlPostgraphileEndpoint, - rpcProviderEndpoint - }, - uniWatcher, - tokenWatcher, - cache: cacheConfig - } = upstream; - - assert(gqlApiEndpoint, 'Missing upstream ethServer.gqlApiEndpoint'); - assert(gqlPostgraphileEndpoint, 'Missing upstream ethServer.gqlPostgraphileEndpoint'); - - const cache = await getCache(cacheConfig); - - const ethClient = new EthClient({ - gqlEndpoint: gqlApiEndpoint, - gqlSubscriptionEndpoint: gqlPostgraphileEndpoint, - cache - }); - - const postgraphileClient = new EthClient({ - gqlEndpoint: gqlPostgraphileEndpoint, - cache - }); + const { uniWatcher, tokenWatcher, ethServer: { rpcProviderEndpoint } } = config.upstream; const uniClient = new UniClient(uniWatcher); const erc20Client = new ERC20Client(tokenWatcher); const ethProvider = getCustomProvider(rpcProviderEndpoint); + const jobQueueConfig = config.jobQueue; assert(jobQueueConfig, 'Missing job queue config'); const { dbConnectionString, maxCompletionLagInSecs } = jobQueueConfig; @@ -95,7 +63,7 @@ export const main = async (): Promise => { const indexer = new Indexer(db, uniClient, erc20Client, ethClient, postgraphileClient, ethProvider, jobQueue, mode); const pubSub = new PubSub(); - const eventWatcher = new EventWatcher(upstream, ethClient, postgraphileClient, indexer, pubSub, jobQueue); + const eventWatcher = new EventWatcher(config.upstream, ethClient, postgraphileClient, indexer, pubSub, jobQueue); await eventWatcher.start(); const resolvers = process.env.MOCK ? await createMockResolvers() : await createResolvers(indexer, eventWatcher); diff --git a/packages/uni-watcher/src/cli/reset-cmds/state.ts b/packages/uni-watcher/src/cli/reset-cmds/state.ts index 4ac9b58a..65ddfe44 100644 --- a/packages/uni-watcher/src/cli/reset-cmds/state.ts +++ b/packages/uni-watcher/src/cli/reset-cmds/state.ts @@ -6,7 +6,7 @@ import debug from 'debug'; import { MoreThan } from 'typeorm'; import assert from 'assert'; -import { getConfig, getResetConfig, JobQueue, resetJobs } from '@vulcanize/util'; +import { getConfig, initClients, resetJobs, JobQueue } from '@vulcanize/util'; import { Database } from '../../database'; import { Indexer } from '../../indexer'; @@ -27,16 +27,15 @@ export const builder = { export const handler = async (argv: any): Promise => { const config = await getConfig(argv.configFile); await resetJobs(config); - const { jobQueue: jobQueueConfig } = config; - const { dbConfig, ethClient, postgraphileClient, ethProvider } = await getResetConfig(config); + const { ethClient, postgraphileClient, ethProvider } = await initClients(config); // Initialize database. - const db = new Database(dbConfig); + const db = new Database(config.database); await db.init(); - assert(jobQueueConfig, 'Missing job queue config'); + assert(config.jobQueue, 'Missing job queue config'); - const { dbConnectionString, maxCompletionLagInSecs } = jobQueueConfig; + const { dbConnectionString, maxCompletionLagInSecs } = config.jobQueue; assert(dbConnectionString, 'Missing job queue db connection string'); const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs }); diff --git a/packages/uni-watcher/src/cli/watch-contract.ts b/packages/uni-watcher/src/cli/watch-contract.ts index 16e37a83..bd908179 100644 --- a/packages/uni-watcher/src/cli/watch-contract.ts +++ b/packages/uni-watcher/src/cli/watch-contract.ts @@ -6,7 +6,7 @@ import assert from 'assert'; import yargs from 'yargs'; import 'reflect-metadata'; -import { Config, DEFAULT_CONFIG_PATH, getConfig, getResetConfig, JobQueue } from '@vulcanize/util'; +import { Config, DEFAULT_CONFIG_PATH, getConfig, initClients, JobQueue } from '@vulcanize/util'; import { Database } from '../database'; import { Indexer } from '../indexer'; @@ -49,7 +49,7 @@ import { Indexer } from '../indexer'; const config: Config = await getConfig(argv.configFile); const { database: dbConfig, jobQueue: jobQueueConfig } = config; - const { ethClient, postgraphileClient, ethProvider } = await getResetConfig(config); + const { ethClient, postgraphileClient, ethProvider } = await initClients(config); assert(dbConfig); diff --git a/packages/uni-watcher/src/fill.ts b/packages/uni-watcher/src/fill.ts index 2141fe27..f5598eaf 100644 --- a/packages/uni-watcher/src/fill.ts +++ b/packages/uni-watcher/src/fill.ts @@ -9,9 +9,7 @@ import { hideBin } from 'yargs/helpers'; import debug from 'debug'; import { PubSub } from 'apollo-server-express'; -import { getCache } from '@vulcanize/cache'; -import { EthClient } from '@vulcanize/ipld-eth-client'; -import { getConfig, fillBlocks, JobQueue, DEFAULT_CONFIG_PATH, getCustomProvider } from '@vulcanize/util'; +import { getConfig, Config, fillBlocks, JobQueue, DEFAULT_CONFIG_PATH, initClients } from '@vulcanize/util'; import { Database } from './database'; import { Indexer } from './indexer'; @@ -55,39 +53,19 @@ export const main = async (): Promise => { } }).argv; - const config = await getConfig(argv.configFile); + const config: Config = await getConfig(argv.configFile); + const { ethClient, postgraphileClient, ethProvider } = await initClients(config); - assert(config.server, 'Missing server config'); - - const { upstream, database: dbConfig, jobQueue: jobQueueConfig } = config; - - assert(dbConfig, 'Missing database config'); - - const db = new Database(dbConfig); + const db = new Database(config.database); await db.init(); - assert(upstream, 'Missing upstream config'); - const { ethServer: { gqlApiEndpoint, gqlPostgraphileEndpoint, rpcProviderEndpoint, blockDelayInMilliSecs }, cache: cacheConfig } = upstream; - assert(gqlPostgraphileEndpoint, 'Missing upstream ethServer.gqlPostgraphileEndpoint'); - - const cache = await getCache(cacheConfig); - const ethClient = new EthClient({ - gqlEndpoint: gqlApiEndpoint, - gqlSubscriptionEndpoint: gqlPostgraphileEndpoint, - cache - }); - - const postgraphileClient = new EthClient({ - gqlEndpoint: gqlPostgraphileEndpoint, - cache - }); - - const ethProvider = getCustomProvider(rpcProviderEndpoint); - // Note: In-memory pubsub works fine for now, as each watcher is a single process anyway. // Later: https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries const pubsub = new PubSub(); + const jobQueueConfig = config.jobQueue; + assert(jobQueueConfig, 'Missing job queue config'); + const { dbConnectionString, maxCompletionLagInSecs } = jobQueueConfig; assert(dbConnectionString, 'Missing job queue db connection string'); @@ -97,11 +75,9 @@ export const main = async (): Promise => { const indexer = new Indexer(db, ethClient, postgraphileClient, ethProvider, jobQueue); await indexer.init(); - const eventWatcher = new EventWatcher(upstream, ethClient, postgraphileClient, indexer, pubsub, jobQueue); + const eventWatcher = new EventWatcher(config.upstream, ethClient, postgraphileClient, indexer, pubsub, jobQueue); - assert(jobQueueConfig, 'Missing job queue config'); - - await fillBlocks(jobQueue, indexer, eventWatcher, blockDelayInMilliSecs, argv); + await fillBlocks(jobQueue, indexer, eventWatcher, config.upstream.ethServer.blockDelayInMilliSecs, argv); }; main().catch(err => { diff --git a/packages/uni-watcher/src/job-runner.ts b/packages/uni-watcher/src/job-runner.ts index 770908ac..585d00cd 100644 --- a/packages/uni-watcher/src/job-runner.ts +++ b/packages/uni-watcher/src/job-runner.ts @@ -8,18 +8,16 @@ import yargs from 'yargs'; import { hideBin } from 'yargs/helpers'; import debug from 'debug'; -import { getCache } from '@vulcanize/cache'; -import { EthClient } from '@vulcanize/ipld-eth-client'; import { getConfig, + Config, JobQueue, JobRunner as BaseJobRunner, QUEUE_BLOCK_PROCESSING, QUEUE_EVENT_PROCESSING, JobQueueConfig, DEFAULT_CONFIG_PATH, - getCustomProvider, - ServerConfig + initClients } from '@vulcanize/util'; import { Indexer } from './indexer'; @@ -32,14 +30,12 @@ export class JobRunner { _jobQueue: JobQueue _baseJobRunner: BaseJobRunner _jobQueueConfig: JobQueueConfig - _serverConfig: ServerConfig - constructor (jobQueueConfig: JobQueueConfig, serverConfig: ServerConfig, indexer: Indexer, jobQueue: JobQueue) { + constructor (jobQueueConfig: JobQueueConfig, indexer: Indexer, jobQueue: JobQueue) { this._jobQueueConfig = jobQueueConfig; this._indexer = indexer; this._jobQueue = jobQueue; - this._serverConfig = serverConfig; - this._baseJobRunner = new BaseJobRunner(this._jobQueueConfig, this._serverConfig, this._indexer, this._jobQueue); + this._baseJobRunner = new BaseJobRunner(this._jobQueueConfig, this._indexer, this._jobQueue); } async start (): Promise { @@ -71,36 +67,13 @@ export const main = async (): Promise => { }) .argv; - const config = await getConfig(argv.f); + const config: Config = await getConfig(argv.f); + const { ethClient, postgraphileClient, ethProvider } = await initClients(config); - const { upstream, database: dbConfig, jobQueue: jobQueueConfig, server: serverConfig } = config; - - assert(upstream, 'Missing upstream config'); - assert(dbConfig, 'Missing database config'); - assert(serverConfig, 'Missing server config'); - - const db = new Database(dbConfig); + const db = new Database(config.database); await db.init(); - assert(upstream, 'Missing upstream config'); - const { ethServer: { gqlApiEndpoint, gqlPostgraphileEndpoint, rpcProviderEndpoint }, cache: cacheConfig } = upstream; - assert(gqlApiEndpoint, 'Missing upstream ethServer.gqlApiEndpoint'); - assert(gqlPostgraphileEndpoint, 'Missing upstream ethServer.gqlPostgraphileEndpoint'); - - const cache = await getCache(cacheConfig); - const ethClient = new EthClient({ - gqlEndpoint: gqlApiEndpoint, - gqlSubscriptionEndpoint: gqlPostgraphileEndpoint, - cache - }); - - const postgraphileClient = new EthClient({ - gqlEndpoint: gqlPostgraphileEndpoint, - cache - }); - - const ethProvider = getCustomProvider(rpcProviderEndpoint); - + const jobQueueConfig = config.jobQueue; assert(jobQueueConfig, 'Missing job queue config'); const { dbConnectionString, maxCompletionLagInSecs } = jobQueueConfig; @@ -112,7 +85,7 @@ export const main = async (): Promise => { const indexer = new Indexer(db, ethClient, postgraphileClient, ethProvider, jobQueue); await indexer.init(); - const jobRunner = new JobRunner(jobQueueConfig, serverConfig, indexer, jobQueue); + const jobRunner = new JobRunner(jobQueueConfig, indexer, jobQueue); await jobRunner.start(); }; diff --git a/packages/uni-watcher/src/server.ts b/packages/uni-watcher/src/server.ts index 85a0fb63..41a1ded0 100644 --- a/packages/uni-watcher/src/server.ts +++ b/packages/uni-watcher/src/server.ts @@ -12,9 +12,7 @@ import debug from 'debug'; import 'graphql-import-node'; import { createServer } from 'http'; -import { getCache } from '@vulcanize/cache'; -import { EthClient } from '@vulcanize/ipld-eth-client'; -import { DEFAULT_CONFIG_PATH, getConfig, getCustomProvider, JobQueue } from '@vulcanize/util'; +import { DEFAULT_CONFIG_PATH, getConfig, Config, JobQueue, initClients } from '@vulcanize/util'; import typeDefs from './schema'; @@ -37,42 +35,19 @@ export const main = async (): Promise => { }) .argv; - const config = await getConfig(argv.f); - - assert(config.server, 'Missing server config'); + const config: Config = await getConfig(argv.f); + const { ethClient, postgraphileClient, ethProvider } = await initClients(config); const { host, port } = config.server; - const { upstream, database: dbConfig, jobQueue: jobQueueConfig } = config; - - assert(dbConfig, 'Missing database config'); - - const db = new Database(dbConfig); + const db = new Database(config.database); await db.init(); - assert(upstream, 'Missing upstream config'); - const { ethServer: { gqlApiEndpoint, gqlPostgraphileEndpoint, rpcProviderEndpoint }, cache: cacheConfig } = upstream; - assert(gqlApiEndpoint, 'Missing upstream ethServer.gqlApiEndpoint'); - assert(gqlPostgraphileEndpoint, 'Missing upstream ethServer.gqlPostgraphileEndpoint'); - - const cache = await getCache(cacheConfig); - const ethClient = new EthClient({ - gqlEndpoint: gqlApiEndpoint, - gqlSubscriptionEndpoint: gqlPostgraphileEndpoint, - cache - }); - - const postgraphileClient = new EthClient({ - gqlEndpoint: gqlPostgraphileEndpoint, - cache - }); - - const ethProvider = getCustomProvider(rpcProviderEndpoint); - // Note: In-memory pubsub works fine for now, as each watcher is a single process anyway. // Later: https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries const pubsub = new PubSub(); + const jobQueueConfig = config.jobQueue; assert(jobQueueConfig, 'Missing job queue config'); const { dbConnectionString, maxCompletionLagInSecs } = jobQueueConfig; @@ -84,7 +59,7 @@ export const main = async (): Promise => { const indexer = new Indexer(db, ethClient, postgraphileClient, ethProvider, jobQueue); await indexer.init(); - const eventWatcher = new EventWatcher(upstream, ethClient, postgraphileClient, indexer, pubsub, jobQueue); + const eventWatcher = new EventWatcher(config.upstream, ethClient, postgraphileClient, indexer, pubsub, jobQueue); await eventWatcher.start(); const resolvers = process.env.MOCK ? await createMockResolvers() : await createResolvers(indexer, eventWatcher); diff --git a/packages/util/index.ts b/packages/util/index.ts index 734f7a3b..afd30aea 100644 --- a/packages/util/index.ts +++ b/packages/util/index.ts @@ -12,5 +12,5 @@ export * from './src/events'; export * from './src/types'; export * from './src/indexer'; export * from './src/job-runner'; +export * from './src/ipld-helper'; export * from './src/graph-decimal'; -export * from './src/ipldHelper'; diff --git a/packages/util/src/config.ts b/packages/util/src/config.ts index 98521c1b..60118a4e 100644 --- a/packages/util/src/config.ts +++ b/packages/util/src/config.ts @@ -9,9 +9,9 @@ import toml from 'toml'; import debug from 'debug'; import { ConnectionOptions } from 'typeorm'; -import { BaseProvider } from '@ethersproject/providers'; import { Config as CacheConfig, getCache } from '@vulcanize/cache'; import { EthClient } from '@vulcanize/ipld-eth-client'; +import { BaseProvider } from '@ethersproject/providers'; import { getCustomProvider } from './misc'; @@ -73,10 +73,7 @@ export const getConfig = async (configFile: string): Promise => { return config; }; -export const getResetConfig = async (config: Config): Promise<{ - dbConfig: ConnectionOptions, - serverConfig: ServerConfig, - upstreamConfig: UpstreamConfig, +export const initClients = async (config: Config): Promise<{ ethClient: EthClient, postgraphileClient: EthClient, ethProvider: BaseProvider @@ -85,9 +82,10 @@ export const getResetConfig = async (config: Config): Promise<{ assert(serverConfig, 'Missing server config'); assert(dbConfig, 'Missing database config'); - assert(upstreamConfig, 'Missing upstream config'); + const { ethServer: { gqlApiEndpoint, gqlPostgraphileEndpoint, rpcProviderEndpoint }, cache: cacheConfig } = upstreamConfig; + assert(gqlApiEndpoint, 'Missing upstream ethServer.gqlApiEndpoint'); assert(gqlPostgraphileEndpoint, 'Missing upstream ethServer.gqlPostgraphileEndpoint'); assert(rpcProviderEndpoint, 'Missing upstream ethServer.rpcProviderEndpoint'); @@ -108,9 +106,6 @@ export const getResetConfig = async (config: Config): Promise<{ const ethProvider = getCustomProvider(rpcProviderEndpoint); return { - dbConfig, - serverConfig, - upstreamConfig, ethClient, postgraphileClient, ethProvider diff --git a/packages/util/src/ipldHelper.ts b/packages/util/src/ipld-helper.ts similarity index 100% rename from packages/util/src/ipldHelper.ts rename to packages/util/src/ipld-helper.ts diff --git a/packages/util/src/job-runner.ts b/packages/util/src/job-runner.ts index 6cefa79d..4b5ba9c4 100644 --- a/packages/util/src/job-runner.ts +++ b/packages/util/src/job-runner.ts @@ -6,7 +6,7 @@ import assert from 'assert'; import debug from 'debug'; import { In } from 'typeorm'; -import { JobQueueConfig, ServerConfig } from './config'; +import { JobQueueConfig } from './config'; import { JOB_KIND_INDEX, JOB_KIND_PRUNE, @@ -32,13 +32,11 @@ export class JobRunner { _jobQueue: JobQueue _jobQueueConfig: JobQueueConfig _blockProcessStartTime?: Date - _serverConfig: ServerConfig - constructor (jobQueueConfig: JobQueueConfig, serverConfig: ServerConfig, indexer: IndexerInterface, jobQueue: JobQueue) { + constructor (jobQueueConfig: JobQueueConfig, indexer: IndexerInterface, jobQueue: JobQueue) { this._indexer = indexer; this._jobQueue = jobQueue; this._jobQueueConfig = jobQueueConfig; - this._serverConfig = serverConfig; } async processBlock (job: any): Promise {