diff --git a/packages/cli/package.json b/packages/cli/package.json index 494fa4bd..7b2978f8 100644 --- a/packages/cli/package.json +++ b/packages/cli/package.json @@ -10,7 +10,6 @@ "copy-assets": "copyfiles -u 1 src/**/*.gql dist/" }, "dependencies": { - "@cerc-io/ipld-eth-client": "^0.2.16", "@cerc-io/util": "^0.2.16", "@ethersproject/providers": "^5.4.4", "@graphql-tools/utils": "^9.1.1", @@ -24,6 +23,7 @@ "yargs": "^17.0.1" }, "devDependencies": { + "@types/express": "^4.17.14", "@typescript-eslint/eslint-plugin": "^4.25.0", "@typescript-eslint/parser": "^4.25.0", "eslint-config-semistandard": "^15.0.1", diff --git a/packages/cli/src/base.ts b/packages/cli/src/base.ts index 6e7e2f16..706e4b6f 100644 --- a/packages/cli/src/base.ts +++ b/packages/cli/src/base.ts @@ -17,10 +17,9 @@ import { IndexerInterface, ServerConfig, Clients, - EventWatcherInterface, + EventWatcher, GraphWatcherInterface } from '@cerc-io/util'; -import { EthClient } from '@cerc-io/ipld-eth-client'; export class BaseCmd { _config?: Config; @@ -29,7 +28,7 @@ export class BaseCmd { _jobQueue?: JobQueue _database?: DatabaseInterface; _indexer?: IndexerInterface; - _eventWatcher?: EventWatcherInterface; + _eventWatcher?: EventWatcher; get config (): Config | undefined { return this._config; @@ -55,7 +54,7 @@ export class BaseCmd { return this._indexer; } - get eventWatcher (): EventWatcherInterface | undefined { + get eventWatcher (): EventWatcher | undefined { return this._eventWatcher; } @@ -103,7 +102,7 @@ export class BaseCmd { graphWatcher?: GraphWatcherInterface ) => IndexerInterface, graphWatcher?: GraphWatcherInterface - ) { + ): Promise { assert(this._config); assert(this._database); assert(this._clients); @@ -119,14 +118,7 @@ export class BaseCmd { } } - async initEventWatcher ( - EventWatcher: new( - ethClient: EthClient, - indexer: IndexerInterface, - pubsub: PubSub, - jobQueue: JobQueue - ) => EventWatcherInterface - ): Promise { + async initEventWatcher (): Promise { assert(this._clients?.ethClient); assert(this._indexer); assert(this._jobQueue); diff --git a/packages/cli/src/checkpoint/create.ts b/packages/cli/src/checkpoint/create.ts index 61476813..dc76ccaa 100644 --- a/packages/cli/src/checkpoint/create.ts +++ b/packages/cli/src/checkpoint/create.ts @@ -80,7 +80,7 @@ export class CreateCheckpointCmd { graphWatcher?: GraphWatcherInterface ) => IndexerInterface, graphWatcher?: GraphWatcherInterface - ) { + ): Promise { return this._baseCmd.initIndexer(Indexer, graphWatcher); } diff --git a/packages/cli/src/checkpoint/verify.ts b/packages/cli/src/checkpoint/verify.ts index 9806e87b..0a8dd1b2 100644 --- a/packages/cli/src/checkpoint/verify.ts +++ b/packages/cli/src/checkpoint/verify.ts @@ -81,7 +81,7 @@ export class VerifyCheckpointCmd { graphWatcher?: GraphWatcherInterface ) => IndexerInterface, graphWatcher?: GraphWatcherInterface - ) { + ): Promise { return this._baseCmd.initIndexer(Indexer, graphWatcher); } diff --git a/packages/cli/src/export-state.ts b/packages/cli/src/export-state.ts index 0ded6b70..66ac9a0e 100644 --- a/packages/cli/src/export-state.ts +++ b/packages/cli/src/export-state.ts @@ -87,7 +87,7 @@ export class ExportStateCmd { graphWatcher?: GraphWatcherInterface ) => IndexerInterface, graphWatcher?: GraphWatcherInterface - ) { + ): Promise { return this._baseCmd.initIndexer(Indexer, graphWatcher); } diff --git a/packages/cli/src/fill.ts b/packages/cli/src/fill.ts index 90cf076a..afcf180b 100644 --- a/packages/cli/src/fill.ts +++ b/packages/cli/src/fill.ts @@ -8,7 +8,6 @@ import debug from 'debug'; import yargs from 'yargs'; import { hideBin } from 'yargs/helpers'; import { ConnectionOptions } from 'typeorm'; -import { PubSub } from 'graphql-subscriptions'; import { JsonRpcProvider } from '@ethersproject/providers'; import { @@ -19,12 +18,10 @@ import { IndexerInterface, ServerConfig, Clients, - EventWatcherInterface, fillBlocks, GraphWatcherInterface, Config } from '@cerc-io/util'; -import { EthClient } from '@cerc-io/ipld-eth-client'; import { BaseCmd } from './base'; @@ -63,10 +60,6 @@ export class FillCmd { return this._baseCmd.database; } - get indexer (): IndexerInterface | undefined { - return this._baseCmd.indexer; - } - async initConfig (): Promise { this._argv = this._getArgv(); assert(this._argv); @@ -95,16 +88,10 @@ export class FillCmd { jobQueue: JobQueue, graphWatcher?: GraphWatcherInterface ) => IndexerInterface, - EventWatcher: new( - ethClient: EthClient, - indexer: IndexerInterface, - pubsub: PubSub, - jobQueue: JobQueue - ) => EventWatcherInterface, graphWatcher?: GraphWatcherInterface - ) { + ): Promise { await this._baseCmd.initIndexer(Indexer, graphWatcher); - await this._baseCmd.initEventWatcher(EventWatcher); + await this._baseCmd.initEventWatcher(); } async exec (contractEntitiesMap: Map = new Map()): Promise { diff --git a/packages/cli/src/import-state.ts b/packages/cli/src/import-state.ts index e042b2ac..7102faa1 100644 --- a/packages/cli/src/import-state.ts +++ b/packages/cli/src/import-state.ts @@ -9,10 +9,8 @@ import path from 'path'; import fs from 'fs'; import debug from 'debug'; import { ConnectionOptions } from 'typeorm'; -import { PubSub } from 'graphql-subscriptions'; import { JsonRpcProvider } from '@ethersproject/providers'; -import { EthClient } from '@cerc-io/ipld-eth-client'; import { DEFAULT_CONFIG_PATH, JobQueue, @@ -20,7 +18,6 @@ import { IndexerInterface, ServerConfig, Clients, - EventWatcherInterface, fillBlocks, StateKind, GraphWatcherInterface, @@ -91,16 +88,10 @@ export class ImportStateCmd { jobQueue: JobQueue, graphWatcher?: GraphWatcherInterface ) => IndexerInterface, - EventWatcher: new( - ethClient: EthClient, - indexer: IndexerInterface, - pubsub: PubSub, - jobQueue: JobQueue - ) => EventWatcherInterface, graphWatcher?: GraphWatcherInterface - ) { + ): Promise { await this._baseCmd.initIndexer(Indexer, graphWatcher); - await this._baseCmd.initEventWatcher(EventWatcher); + await this._baseCmd.initEventWatcher(); } async exec (State: new() => any, graphDb?: GraphDatabase): Promise { diff --git a/packages/cli/src/index-block.ts b/packages/cli/src/index-block.ts index 7d08caf8..1c252d83 100644 --- a/packages/cli/src/index-block.ts +++ b/packages/cli/src/index-block.ts @@ -80,7 +80,7 @@ export class IndexBlockCmd { graphWatcher?: GraphWatcherInterface ) => IndexerInterface, graphWatcher?: GraphWatcherInterface - ) { + ): Promise { return this._baseCmd.initIndexer(Indexer, graphWatcher); } diff --git a/packages/cli/src/inspect-cid.ts b/packages/cli/src/inspect-cid.ts index 61b9111e..bf50daab 100644 --- a/packages/cli/src/inspect-cid.ts +++ b/packages/cli/src/inspect-cid.ts @@ -83,7 +83,7 @@ export class InspectCIDCmd { graphWatcher?: GraphWatcherInterface ) => IndexerInterface, graphWatcher?: GraphWatcherInterface - ) { + ): Promise { return this._baseCmd.initIndexer(Indexer, graphWatcher); } diff --git a/packages/cli/src/job-runner.ts b/packages/cli/src/job-runner.ts index 9e624b39..99ac9cb4 100644 --- a/packages/cli/src/job-runner.ts +++ b/packages/cli/src/job-runner.ts @@ -89,7 +89,7 @@ export class JobRunnerCmd { graphWatcher?: GraphWatcherInterface ) => IndexerInterface, graphWatcher?: GraphWatcherInterface - ) { + ): Promise { return this._baseCmd.initIndexer(Indexer, graphWatcher); } diff --git a/packages/cli/src/reset/watcher.ts b/packages/cli/src/reset/watcher.ts index c92d6aba..194f090c 100644 --- a/packages/cli/src/reset/watcher.ts +++ b/packages/cli/src/reset/watcher.ts @@ -79,7 +79,7 @@ export class ResetWatcherCmd { graphWatcher?: GraphWatcherInterface ) => IndexerInterface, graphWatcher?: GraphWatcherInterface - ) { + ): Promise { return this._baseCmd.initIndexer(Indexer, graphWatcher); } diff --git a/packages/cli/src/server.ts b/packages/cli/src/server.ts index 6541de5f..45742d58 100644 --- a/packages/cli/src/server.ts +++ b/packages/cli/src/server.ts @@ -7,12 +7,10 @@ import { hideBin } from 'yargs/helpers'; import 'reflect-metadata'; import assert from 'assert'; import { ConnectionOptions } from 'typeorm'; -import { PubSub } from 'graphql-subscriptions'; import express, { Application } from 'express'; import { ApolloServer } from 'apollo-server-express'; import { JsonRpcProvider } from '@ethersproject/providers'; -import { EthClient } from '@cerc-io/ipld-eth-client'; import { DEFAULT_CONFIG_PATH, JobQueue, @@ -20,10 +18,10 @@ import { IndexerInterface, ServerConfig, Clients, - EventWatcherInterface, KIND_ACTIVE, createAndStartServer, startGQLMetricsServer, + EventWatcher, GraphWatcherInterface, Config } from '@cerc-io/util'; @@ -87,20 +85,14 @@ export class ServerCmd { jobQueue: JobQueue, graphWatcher?: GraphWatcherInterface ) => IndexerInterface, - EventWatcher: new( - ethClient: EthClient, - indexer: IndexerInterface, - pubsub: PubSub, - jobQueue: JobQueue - ) => EventWatcherInterface, graphWatcher?: GraphWatcherInterface - ) { + ): Promise { await this._baseCmd.initIndexer(Indexer, graphWatcher); - await this._baseCmd.initEventWatcher(EventWatcher); + await this._baseCmd.initEventWatcher(); } async exec ( - createResolvers: (indexer: IndexerInterface, eventWatcher: EventWatcherInterface) => Promise, + createResolvers: (indexer: IndexerInterface, eventWatcher: EventWatcher) => Promise, typeDefs: TypeSource ): Promise<{ app: Application, diff --git a/packages/cli/src/watch-contract.ts b/packages/cli/src/watch-contract.ts index bf32a7fb..bc566629 100644 --- a/packages/cli/src/watch-contract.ts +++ b/packages/cli/src/watch-contract.ts @@ -82,7 +82,7 @@ export class WatchContractCmd { graphWatcher?: GraphWatcherInterface ) => IndexerInterface, graphWatcher?: GraphWatcherInterface - ) { + ): Promise { return this._baseCmd.initIndexer(Indexer, graphWatcher); } diff --git a/packages/codegen/src/events.ts b/packages/codegen/src/events.ts deleted file mode 100644 index 51720dc0..00000000 --- a/packages/codegen/src/events.ts +++ /dev/null @@ -1,21 +0,0 @@ -// -// Copyright 2021 Vulcanize, Inc. -// - -import fs from 'fs'; -import path from 'path'; -import Handlebars from 'handlebars'; -import { Writable } from 'stream'; - -const TEMPLATE_FILE = './templates/events-template.handlebars'; - -/** - * Writes the events file generated from a template to a stream. - * @param outStream A writable output stream to write the events file to. - */ -export function exportEvents (outStream: Writable): void { - const templateString = fs.readFileSync(path.resolve(__dirname, TEMPLATE_FILE)).toString(); - const template = Handlebars.compile(templateString); - const events = template({}); - outStream.write(events); -} diff --git a/packages/codegen/src/generate-code.ts b/packages/codegen/src/generate-code.ts index 1eddf688..5b80b153 100644 --- a/packages/codegen/src/generate-code.ts +++ b/packages/codegen/src/generate-code.ts @@ -24,7 +24,6 @@ import { generateArtifacts } from './artifacts'; import { exportPackage } from './package'; import { exportTSConfig } from './tsconfig'; import { exportReadme } from './readme'; -import { exportEvents } from './events'; import { exportJobRunner } from './job-runner'; import { exportWatchContract } from './watch-contract'; import { exportLint } from './lint'; @@ -226,11 +225,6 @@ function generateWatcher (visitor: Visitor, contracts: any[], config: any) { : process.stdout; exportReadme(path.basename(outputDir), config.port, outStream); - outStream = outputDir - ? fs.createWriteStream(path.join(outputDir, 'src/events.ts')) - : process.stdout; - exportEvents(outStream); - outStream = outputDir ? fs.createWriteStream(path.join(outputDir, 'src/job-runner.ts')) : process.stdout; diff --git a/packages/codegen/src/templates/checkpoint-create-template.handlebars b/packages/codegen/src/templates/checkpoint-create-template.handlebars index 9f584493..beb64803 100644 --- a/packages/codegen/src/templates/checkpoint-create-template.handlebars +++ b/packages/codegen/src/templates/checkpoint-create-template.handlebars @@ -43,5 +43,6 @@ export const handler = async (argv: any): Promise => { {{/if}} await createCheckpointCmd.initIndexer(Indexer{{#if (subgraphPath)}}, graphWatcher{{/if}}); + await createCheckpointCmd.exec(); }; diff --git a/packages/codegen/src/templates/checkpoint-verify-template.handlebars b/packages/codegen/src/templates/checkpoint-verify-template.handlebars index 3fdfda99..c3861260 100644 --- a/packages/codegen/src/templates/checkpoint-verify-template.handlebars +++ b/packages/codegen/src/templates/checkpoint-verify-template.handlebars @@ -35,5 +35,6 @@ export const handler = async (argv: any): Promise => { ); await verifyCheckpointCmd.initIndexer(Indexer, graphWatcher); + await verifyCheckpointCmd.exec(graphDb); }; diff --git a/packages/codegen/src/templates/events-template.handlebars b/packages/codegen/src/templates/events-template.handlebars deleted file mode 100644 index 4cb6e10d..00000000 --- a/packages/codegen/src/templates/events-template.handlebars +++ /dev/null @@ -1,70 +0,0 @@ -// -// Copyright 2021 Vulcanize, Inc. -// - -import assert from 'assert'; -import { PubSub } from 'graphql-subscriptions'; - -import { EthClient } from '@cerc-io/ipld-eth-client'; -import { - JobQueue, - EventWatcher as BaseEventWatcher, - EventWatcherInterface, - QUEUE_BLOCK_PROCESSING, - QUEUE_EVENT_PROCESSING, - IndexerInterface -} from '@cerc-io/util'; - -import { Indexer } from './indexer'; - -export class EventWatcher implements EventWatcherInterface { - _ethClient: EthClient - _indexer: Indexer - _subscription: ZenObservable.Subscription | undefined - _baseEventWatcher: BaseEventWatcher - _pubsub: PubSub - _jobQueue: JobQueue - - constructor (ethClient: EthClient, indexer: IndexerInterface, pubsub: PubSub, jobQueue: JobQueue) { - assert(ethClient); - assert(indexer); - - this._ethClient = ethClient; - this._indexer = indexer as Indexer; - this._pubsub = pubsub; - this._jobQueue = jobQueue; - this._baseEventWatcher = new BaseEventWatcher(this._ethClient, this._indexer, this._pubsub, this._jobQueue); - } - - getEventIterator (): AsyncIterator { - return this._baseEventWatcher.getEventIterator(); - } - - getBlockProgressEventIterator (): AsyncIterator { - return this._baseEventWatcher.getBlockProgressEventIterator(); - } - - async start (): Promise { - assert(!this._subscription, 'subscription already started'); - - await this.initBlockProcessingOnCompleteHandler(); - await this.initEventProcessingOnCompleteHandler(); - this._baseEventWatcher.startBlockProcessing(); - } - - async stop (): Promise { - this._baseEventWatcher.stop(); - } - - async initBlockProcessingOnCompleteHandler (): Promise { - this._jobQueue.onComplete(QUEUE_BLOCK_PROCESSING, async (job) => { - await this._baseEventWatcher.blockProcessingCompleteHandler(job); - }); - } - - async initEventProcessingOnCompleteHandler (): Promise { - await this._jobQueue.onComplete(QUEUE_EVENT_PROCESSING, async (job) => { - await this._baseEventWatcher.eventProcessingCompleteHandler(job); - }); - } -} diff --git a/packages/codegen/src/templates/export-state-template.handlebars b/packages/codegen/src/templates/export-state-template.handlebars index dcbcd694..344cb897 100644 --- a/packages/codegen/src/templates/export-state-template.handlebars +++ b/packages/codegen/src/templates/export-state-template.handlebars @@ -31,6 +31,7 @@ const main = async (): Promise => { {{/if}} await exportStateCmd.initIndexer(Indexer{{#if (subgraphPath)}}, graphWatcher{{/if}}); + await exportStateCmd.exec(); }; diff --git a/packages/codegen/src/templates/fill-template.handlebars b/packages/codegen/src/templates/fill-template.handlebars index fba4dc55..f24cdf20 100644 --- a/packages/codegen/src/templates/fill-template.handlebars +++ b/packages/codegen/src/templates/fill-template.handlebars @@ -2,7 +2,6 @@ // Copyright 2021 Vulcanize, Inc. // -import assert from 'assert'; import 'reflect-metadata'; import debug from 'debug'; @@ -14,7 +13,6 @@ import { getGraphDbAndWatcher } from '@cerc-io/graph-node'; import { Database{{#if (subgraphPath)}}, ENTITY_QUERY_TYPE_MAP, ENTITY_TO_LATEST_ENTITY_MAP{{/if}} } from './database'; import { Indexer } from './indexer'; -import { EventWatcher } from './events'; const log = debug('vulcanize:fill'); @@ -33,7 +31,7 @@ export const main = async (): Promise => { ); {{/if}} - await fillCmd.initIndexer(Indexer, EventWatcher{{#if (subgraphPath)}}, graphWatcher{{/if}}); + await fillCmd.initIndexer(Indexer{{#if (subgraphPath)}}, graphWatcher{{/if}}); {{#if (subgraphPath)}} // Get contractEntitiesMap required for fill-state diff --git a/packages/codegen/src/templates/import-state-template.handlebars b/packages/codegen/src/templates/import-state-template.handlebars index 95cbb3f1..5b674af8 100644 --- a/packages/codegen/src/templates/import-state-template.handlebars +++ b/packages/codegen/src/templates/import-state-template.handlebars @@ -12,7 +12,6 @@ import { getGraphDbAndWatcher } from '@cerc-io/graph-node'; import { Database{{#if (subgraphPath)}}, ENTITY_QUERY_TYPE_MAP, ENTITY_TO_LATEST_ENTITY_MAP{{/if}} } from '../database'; import { Indexer } from '../indexer'; -import { EventWatcher } from '../events'; import { State } from '../entity/State'; const log = debug('vulcanize:import-state'); @@ -32,7 +31,8 @@ export const main = async (): Promise => { ); {{/if}} - await importStateCmd.initIndexer(Indexer, EventWatcher{{#if (subgraphPath)}}, graphWatcher{{/if}}); + await importStateCmd.initIndexer(Indexer{{#if (subgraphPath)}}, graphWatcher{{/if}}); + await importStateCmd.exec(State{{#if (subgraphPath)}}, graphDb{{/if}}); }; diff --git a/packages/codegen/src/templates/index-block-template.handlebars b/packages/codegen/src/templates/index-block-template.handlebars index 6044860f..ec0d82d0 100644 --- a/packages/codegen/src/templates/index-block-template.handlebars +++ b/packages/codegen/src/templates/index-block-template.handlebars @@ -31,6 +31,7 @@ const main = async (): Promise => { {{/if}} await indexBlockCmd.initIndexer(Indexer{{#if (subgraphPath)}}, graphWatcher{{/if}}); + await indexBlockCmd.exec(); }; diff --git a/packages/codegen/src/templates/inspect-cid-template.handlebars b/packages/codegen/src/templates/inspect-cid-template.handlebars index a6812f3d..4ad8c2df 100644 --- a/packages/codegen/src/templates/inspect-cid-template.handlebars +++ b/packages/codegen/src/templates/inspect-cid-template.handlebars @@ -31,6 +31,7 @@ const main = async (): Promise => { {{/if}} await inspectCIDCmd.initIndexer(Indexer{{#if (subgraphPath)}}, graphWatcher{{/if}}); + await inspectCIDCmd.exec(); }; diff --git a/packages/codegen/src/templates/package-template.handlebars b/packages/codegen/src/templates/package-template.handlebars index aeb9b201..c8000441 100644 --- a/packages/codegen/src/templates/package-template.handlebars +++ b/packages/codegen/src/templates/package-template.handlebars @@ -54,7 +54,6 @@ "ethers": "^5.4.4", "express": "^4.18.2", "graphql": "^15.5.0", - "graphql-subscriptions": "^2.0.0", "json-bigint": "^1.0.0", "reflect-metadata": "^0.1.13", "typeorm": "^0.2.32", diff --git a/packages/codegen/src/templates/reset-watcher-template.handlebars b/packages/codegen/src/templates/reset-watcher-template.handlebars index a70007cb..7b6bd5dc 100644 --- a/packages/codegen/src/templates/reset-watcher-template.handlebars +++ b/packages/codegen/src/templates/reset-watcher-template.handlebars @@ -36,5 +36,6 @@ export const handler = async (argv: any): Promise => { {{/if}} await resetWatcherCmd.initIndexer(Indexer{{#if (subgraphPath)}}, graphWatcher{{/if}}); + await resetWatcherCmd.exec(); }; diff --git a/packages/codegen/src/templates/resolvers-template.handlebars b/packages/codegen/src/templates/resolvers-template.handlebars index 9e3881fe..b7e1f71c 100644 --- a/packages/codegen/src/templates/resolvers-template.handlebars +++ b/packages/codegen/src/templates/resolvers-template.handlebars @@ -17,7 +17,7 @@ import { getResultState, setGQLCacheHints, IndexerInterface, - EventWatcherInterface + EventWatcher } from '@cerc-io/util'; import { Indexer } from './indexer'; @@ -31,9 +31,8 @@ import { {{query.entityName}} } from './entity/{{query.entityName}}'; const log = debug('vulcanize:resolver'); -export const createResolvers = async (indexerArg: IndexerInterface, eventWatcherArg: EventWatcherInterface): Promise => { +export const createResolvers = async (indexerArg: IndexerInterface, eventWatcher: EventWatcher): Promise => { const indexer = indexerArg as Indexer; - const eventWatcher = eventWatcherArg as EventWatcher; const gqlCacheConfig = indexer.serverConfig.gqlCache; diff --git a/packages/codegen/src/templates/server-template.handlebars b/packages/codegen/src/templates/server-template.handlebars index af0f1118..171cf4d5 100644 --- a/packages/codegen/src/templates/server-template.handlebars +++ b/packages/codegen/src/templates/server-template.handlebars @@ -15,7 +15,6 @@ import { getGraphDbAndWatcher } from '@cerc-io/graph-node'; import { createResolvers } from './resolvers'; import { Indexer } from './indexer'; import { Database{{#if (subgraphPath)}}, ENTITY_QUERY_TYPE_MAP, ENTITY_TO_LATEST_ENTITY_MAP{{/if}} } from './database'; -import { EventWatcher } from './events'; const log = debug('vulcanize:server'); @@ -34,9 +33,10 @@ export const main = async (): Promise => { ); {{/if}} - await serverCmd.initIndexer(Indexer, EventWatcher{{#if (subgraphPath)}}, graphWatcher{{/if}}); + await serverCmd.initIndexer(Indexer{{#if (subgraphPath)}}, graphWatcher{{/if}}); const typeDefs = fs.readFileSync(path.join(__dirname, 'schema.gql')).toString(); + return serverCmd.exec(createResolvers, typeDefs); }; diff --git a/packages/codegen/src/templates/watch-contract-template.handlebars b/packages/codegen/src/templates/watch-contract-template.handlebars index 2c277d54..d32d4e13 100644 --- a/packages/codegen/src/templates/watch-contract-template.handlebars +++ b/packages/codegen/src/templates/watch-contract-template.handlebars @@ -31,6 +31,7 @@ const main = async (): Promise => { {{/if}} await watchContractCmd.initIndexer(Indexer{{#if (subgraphPath)}}, graphWatcher{{/if}}); + await watchContractCmd.exec(); }; diff --git a/packages/eden-watcher/package.json b/packages/eden-watcher/package.json index dee70d9e..74fd498c 100644 --- a/packages/eden-watcher/package.json +++ b/packages/eden-watcher/package.json @@ -51,7 +51,6 @@ "ethers": "^5.4.4", "express": "^4.18.2", "graphql": "^15.5.0", - "graphql-subscriptions": "^2.0.0", "json-bigint": "^1.0.0", "reflect-metadata": "^0.1.13", "typeorm": "^0.2.32", diff --git a/packages/eden-watcher/src/cli/checkpoint-cmds/create.ts b/packages/eden-watcher/src/cli/checkpoint-cmds/create.ts index f2f1971d..f4b91aa0 100644 --- a/packages/eden-watcher/src/cli/checkpoint-cmds/create.ts +++ b/packages/eden-watcher/src/cli/checkpoint-cmds/create.ts @@ -39,5 +39,6 @@ export const handler = async (argv: any): Promise => { ); await createCheckpointCmd.initIndexer(Indexer, graphWatcher); + await createCheckpointCmd.exec(); }; diff --git a/packages/eden-watcher/src/cli/checkpoint-cmds/verify.ts b/packages/eden-watcher/src/cli/checkpoint-cmds/verify.ts index 3fdfda99..c3861260 100644 --- a/packages/eden-watcher/src/cli/checkpoint-cmds/verify.ts +++ b/packages/eden-watcher/src/cli/checkpoint-cmds/verify.ts @@ -35,5 +35,6 @@ export const handler = async (argv: any): Promise => { ); await verifyCheckpointCmd.initIndexer(Indexer, graphWatcher); + await verifyCheckpointCmd.exec(graphDb); }; diff --git a/packages/eden-watcher/src/cli/export-state.ts b/packages/eden-watcher/src/cli/export-state.ts index 3f6de118..e8e1fc2e 100644 --- a/packages/eden-watcher/src/cli/export-state.ts +++ b/packages/eden-watcher/src/cli/export-state.ts @@ -27,6 +27,7 @@ const main = async (): Promise => { ); await exportStateCmd.initIndexer(Indexer, graphWatcher); + await exportStateCmd.exec(); }; diff --git a/packages/eden-watcher/src/cli/import-state.ts b/packages/eden-watcher/src/cli/import-state.ts index 1b0368d9..0ab2593c 100644 --- a/packages/eden-watcher/src/cli/import-state.ts +++ b/packages/eden-watcher/src/cli/import-state.ts @@ -10,7 +10,6 @@ import { getGraphDbAndWatcher } from '@cerc-io/graph-node'; import { Database, ENTITY_QUERY_TYPE_MAP, ENTITY_TO_LATEST_ENTITY_MAP } from '../database'; import { Indexer } from '../indexer'; -import { EventWatcher } from '../events'; import { State } from '../entity/State'; const log = debug('vulcanize:import-state'); @@ -28,7 +27,8 @@ export const main = async (): Promise => { ENTITY_TO_LATEST_ENTITY_MAP ); - await importStateCmd.initIndexer(Indexer, EventWatcher, graphWatcher); + await importStateCmd.initIndexer(Indexer, graphWatcher); + await importStateCmd.exec(State, graphDb); }; diff --git a/packages/eden-watcher/src/cli/index-block.ts b/packages/eden-watcher/src/cli/index-block.ts index 78d4ec7b..32998111 100644 --- a/packages/eden-watcher/src/cli/index-block.ts +++ b/packages/eden-watcher/src/cli/index-block.ts @@ -27,6 +27,7 @@ const main = async (): Promise => { ); await indexBlockCmd.initIndexer(Indexer, graphWatcher); + await indexBlockCmd.exec(); }; diff --git a/packages/eden-watcher/src/cli/inspect-cid.ts b/packages/eden-watcher/src/cli/inspect-cid.ts index 2dc25314..4453e4d0 100644 --- a/packages/eden-watcher/src/cli/inspect-cid.ts +++ b/packages/eden-watcher/src/cli/inspect-cid.ts @@ -27,6 +27,7 @@ const main = async (): Promise => { ); await inspectCIDCmd.initIndexer(Indexer, graphWatcher); + await inspectCIDCmd.exec(); }; diff --git a/packages/eden-watcher/src/cli/reset-cmds/watcher.ts b/packages/eden-watcher/src/cli/reset-cmds/watcher.ts index c0a92058..cdbe328f 100644 --- a/packages/eden-watcher/src/cli/reset-cmds/watcher.ts +++ b/packages/eden-watcher/src/cli/reset-cmds/watcher.ts @@ -32,5 +32,6 @@ export const handler = async (argv: any): Promise => { ); await resetWatcherCmd.initIndexer(Indexer, graphWatcher); + await resetWatcherCmd.exec(); }; diff --git a/packages/eden-watcher/src/cli/watch-contract.ts b/packages/eden-watcher/src/cli/watch-contract.ts index 9b2291dc..687c4921 100644 --- a/packages/eden-watcher/src/cli/watch-contract.ts +++ b/packages/eden-watcher/src/cli/watch-contract.ts @@ -27,6 +27,7 @@ const main = async (): Promise => { ); await watchContractCmd.initIndexer(Indexer, graphWatcher); + await watchContractCmd.exec(); }; diff --git a/packages/eden-watcher/src/events.ts b/packages/eden-watcher/src/events.ts deleted file mode 100644 index 4cb6e10d..00000000 --- a/packages/eden-watcher/src/events.ts +++ /dev/null @@ -1,70 +0,0 @@ -// -// Copyright 2021 Vulcanize, Inc. -// - -import assert from 'assert'; -import { PubSub } from 'graphql-subscriptions'; - -import { EthClient } from '@cerc-io/ipld-eth-client'; -import { - JobQueue, - EventWatcher as BaseEventWatcher, - EventWatcherInterface, - QUEUE_BLOCK_PROCESSING, - QUEUE_EVENT_PROCESSING, - IndexerInterface -} from '@cerc-io/util'; - -import { Indexer } from './indexer'; - -export class EventWatcher implements EventWatcherInterface { - _ethClient: EthClient - _indexer: Indexer - _subscription: ZenObservable.Subscription | undefined - _baseEventWatcher: BaseEventWatcher - _pubsub: PubSub - _jobQueue: JobQueue - - constructor (ethClient: EthClient, indexer: IndexerInterface, pubsub: PubSub, jobQueue: JobQueue) { - assert(ethClient); - assert(indexer); - - this._ethClient = ethClient; - this._indexer = indexer as Indexer; - this._pubsub = pubsub; - this._jobQueue = jobQueue; - this._baseEventWatcher = new BaseEventWatcher(this._ethClient, this._indexer, this._pubsub, this._jobQueue); - } - - getEventIterator (): AsyncIterator { - return this._baseEventWatcher.getEventIterator(); - } - - getBlockProgressEventIterator (): AsyncIterator { - return this._baseEventWatcher.getBlockProgressEventIterator(); - } - - async start (): Promise { - assert(!this._subscription, 'subscription already started'); - - await this.initBlockProcessingOnCompleteHandler(); - await this.initEventProcessingOnCompleteHandler(); - this._baseEventWatcher.startBlockProcessing(); - } - - async stop (): Promise { - this._baseEventWatcher.stop(); - } - - async initBlockProcessingOnCompleteHandler (): Promise { - this._jobQueue.onComplete(QUEUE_BLOCK_PROCESSING, async (job) => { - await this._baseEventWatcher.blockProcessingCompleteHandler(job); - }); - } - - async initEventProcessingOnCompleteHandler (): Promise { - await this._jobQueue.onComplete(QUEUE_EVENT_PROCESSING, async (job) => { - await this._baseEventWatcher.eventProcessingCompleteHandler(job); - }); - } -} diff --git a/packages/eden-watcher/src/fill.ts b/packages/eden-watcher/src/fill.ts index 1971e4b5..c3f41aee 100644 --- a/packages/eden-watcher/src/fill.ts +++ b/packages/eden-watcher/src/fill.ts @@ -2,7 +2,6 @@ // Copyright 2021 Vulcanize, Inc. // -import assert from 'assert'; import 'reflect-metadata'; import debug from 'debug'; @@ -12,7 +11,6 @@ import { getGraphDbAndWatcher } from '@cerc-io/graph-node'; import { Database, ENTITY_QUERY_TYPE_MAP, ENTITY_TO_LATEST_ENTITY_MAP } from './database'; import { Indexer } from './indexer'; -import { EventWatcher } from './events'; const log = debug('vulcanize:fill'); @@ -29,7 +27,7 @@ export const main = async (): Promise => { ENTITY_TO_LATEST_ENTITY_MAP ); - await fillCmd.initIndexer(Indexer, EventWatcher, graphWatcher); + await fillCmd.initIndexer(Indexer, graphWatcher); // Get contractEntitiesMap required for fill-state // NOTE: Assuming each entity type is only mapped to a single contract diff --git a/packages/eden-watcher/src/resolvers.ts b/packages/eden-watcher/src/resolvers.ts index 4841c021..bf1c1283 100644 --- a/packages/eden-watcher/src/resolvers.ts +++ b/packages/eden-watcher/src/resolvers.ts @@ -8,11 +8,9 @@ import debug from 'debug'; import Decimal from 'decimal.js'; import { GraphQLResolveInfo, GraphQLScalarType } from 'graphql'; -import { BlockHeight, OrderDirection, gqlTotalQueryCount, gqlQueryCount, jsonBigIntStringReplacer, getResultState, setGQLCacheHints, IndexerInterface, EventWatcherInterface } from '@cerc-io/util'; +import { BlockHeight, OrderDirection, gqlTotalQueryCount, gqlQueryCount, jsonBigIntStringReplacer, getResultState, setGQLCacheHints, IndexerInterface, EventWatcher } from '@cerc-io/util'; import { Indexer } from './indexer'; -import { EventWatcher } from './events'; - import { Producer } from './entity/Producer'; import { ProducerSet } from './entity/ProducerSet'; import { ProducerSetChange } from './entity/ProducerSetChange'; @@ -34,9 +32,8 @@ import { Account } from './entity/Account'; const log = debug('vulcanize:resolver'); -export const createResolvers = async (indexerArg: IndexerInterface, eventWatcherArg: EventWatcherInterface): Promise => { +export const createResolvers = async (indexerArg: IndexerInterface, eventWatcher: EventWatcher): Promise => { const indexer = indexerArg as Indexer; - const eventWatcher = eventWatcherArg as EventWatcher; const gqlCacheConfig = indexer.serverConfig.gqlCache; diff --git a/packages/eden-watcher/src/server.ts b/packages/eden-watcher/src/server.ts index 43b4caea..12899c82 100644 --- a/packages/eden-watcher/src/server.ts +++ b/packages/eden-watcher/src/server.ts @@ -13,7 +13,6 @@ import { getGraphDbAndWatcher } from '@cerc-io/graph-node'; import { createResolvers } from './resolvers'; import { Indexer } from './indexer'; import { Database, ENTITY_QUERY_TYPE_MAP, ENTITY_TO_LATEST_ENTITY_MAP } from './database'; -import { EventWatcher } from './events'; const log = debug('vulcanize:server'); @@ -30,7 +29,7 @@ export const main = async (): Promise => { ENTITY_TO_LATEST_ENTITY_MAP ); - await serverCmd.initIndexer(Indexer, EventWatcher, graphWatcher); + await serverCmd.initIndexer(Indexer, graphWatcher); const typeDefs = fs.readFileSync(path.join(__dirname, 'schema.gql')).toString(); return serverCmd.exec(createResolvers, typeDefs); diff --git a/packages/erc20-watcher/package.json b/packages/erc20-watcher/package.json index d71796ad..4e60ad68 100644 --- a/packages/erc20-watcher/package.json +++ b/packages/erc20-watcher/package.json @@ -53,7 +53,6 @@ "express": "^4.18.2", "graphql": "^15.5.0", "graphql-request": "^3.4.0", - "graphql-subscriptions": "^2.0.0", "json-bigint": "^1.0.0", "lodash": "^4.17.21", "reflect-metadata": "^0.1.13", diff --git a/packages/erc20-watcher/src/cli/reset-cmds/watcher.ts b/packages/erc20-watcher/src/cli/reset-cmds/watcher.ts index 2dfdd68c..c6e651f5 100644 --- a/packages/erc20-watcher/src/cli/reset-cmds/watcher.ts +++ b/packages/erc20-watcher/src/cli/reset-cmds/watcher.ts @@ -21,5 +21,6 @@ export const handler = async (argv: any): Promise => { const resetWatcherCmd = new ResetWatcherCmd(); await resetWatcherCmd.init(argv, Database); await resetWatcherCmd.initIndexer(Indexer); + await resetWatcherCmd.exec(); }; diff --git a/packages/erc20-watcher/src/cli/watch-contract.ts b/packages/erc20-watcher/src/cli/watch-contract.ts index 1934c09f..b68568e7 100644 --- a/packages/erc20-watcher/src/cli/watch-contract.ts +++ b/packages/erc20-watcher/src/cli/watch-contract.ts @@ -16,6 +16,7 @@ const main = async (): Promise => { const watchContractCmd = new WatchContractCmd(); await watchContractCmd.init(Database); await watchContractCmd.initIndexer(Indexer); + await watchContractCmd.exec(); }; diff --git a/packages/erc20-watcher/src/events.ts b/packages/erc20-watcher/src/events.ts deleted file mode 100644 index 4cb6e10d..00000000 --- a/packages/erc20-watcher/src/events.ts +++ /dev/null @@ -1,70 +0,0 @@ -// -// Copyright 2021 Vulcanize, Inc. -// - -import assert from 'assert'; -import { PubSub } from 'graphql-subscriptions'; - -import { EthClient } from '@cerc-io/ipld-eth-client'; -import { - JobQueue, - EventWatcher as BaseEventWatcher, - EventWatcherInterface, - QUEUE_BLOCK_PROCESSING, - QUEUE_EVENT_PROCESSING, - IndexerInterface -} from '@cerc-io/util'; - -import { Indexer } from './indexer'; - -export class EventWatcher implements EventWatcherInterface { - _ethClient: EthClient - _indexer: Indexer - _subscription: ZenObservable.Subscription | undefined - _baseEventWatcher: BaseEventWatcher - _pubsub: PubSub - _jobQueue: JobQueue - - constructor (ethClient: EthClient, indexer: IndexerInterface, pubsub: PubSub, jobQueue: JobQueue) { - assert(ethClient); - assert(indexer); - - this._ethClient = ethClient; - this._indexer = indexer as Indexer; - this._pubsub = pubsub; - this._jobQueue = jobQueue; - this._baseEventWatcher = new BaseEventWatcher(this._ethClient, this._indexer, this._pubsub, this._jobQueue); - } - - getEventIterator (): AsyncIterator { - return this._baseEventWatcher.getEventIterator(); - } - - getBlockProgressEventIterator (): AsyncIterator { - return this._baseEventWatcher.getBlockProgressEventIterator(); - } - - async start (): Promise { - assert(!this._subscription, 'subscription already started'); - - await this.initBlockProcessingOnCompleteHandler(); - await this.initEventProcessingOnCompleteHandler(); - this._baseEventWatcher.startBlockProcessing(); - } - - async stop (): Promise { - this._baseEventWatcher.stop(); - } - - async initBlockProcessingOnCompleteHandler (): Promise { - this._jobQueue.onComplete(QUEUE_BLOCK_PROCESSING, async (job) => { - await this._baseEventWatcher.blockProcessingCompleteHandler(job); - }); - } - - async initEventProcessingOnCompleteHandler (): Promise { - await this._jobQueue.onComplete(QUEUE_EVENT_PROCESSING, async (job) => { - await this._baseEventWatcher.eventProcessingCompleteHandler(job); - }); - } -} diff --git a/packages/erc20-watcher/src/fill.ts b/packages/erc20-watcher/src/fill.ts index f37755e1..185e0b64 100644 --- a/packages/erc20-watcher/src/fill.ts +++ b/packages/erc20-watcher/src/fill.ts @@ -9,14 +9,13 @@ import { FillCmd } from '@cerc-io/cli'; import { Database } from './database'; import { Indexer } from './indexer'; -import { EventWatcher } from './events'; const log = debug('vulcanize:fill'); export const main = async (): Promise => { const fillCmd = new FillCmd(); await fillCmd.init(Database); - await fillCmd.initIndexer(Indexer, EventWatcher); + await fillCmd.initIndexer(Indexer); await fillCmd.exec(); }; diff --git a/packages/erc20-watcher/src/resolvers.ts b/packages/erc20-watcher/src/resolvers.ts index 6a9839d6..dd14bee1 100644 --- a/packages/erc20-watcher/src/resolvers.ts +++ b/packages/erc20-watcher/src/resolvers.ts @@ -6,17 +6,15 @@ import assert from 'assert'; import BigInt from 'apollo-type-bigint'; import debug from 'debug'; -import { EventWatcherInterface, IndexerInterface, ValueResult } from '@cerc-io/util'; +import { EventWatcher, IndexerInterface, ValueResult } from '@cerc-io/util'; import { Indexer } from './indexer'; -import { EventWatcher } from './events'; import { CONTRACT_KIND } from './utils/index'; const log = debug('vulcanize:resolver'); -export const createResolvers = async (indexerArg: IndexerInterface, eventWatcherArg: EventWatcherInterface): Promise => { +export const createResolvers = async (indexerArg: IndexerInterface, eventWatcher: EventWatcher): Promise => { const indexer = indexerArg as Indexer; - const eventWatcher = eventWatcherArg as EventWatcher; return { BigInt: new BigInt('bigInt'), diff --git a/packages/erc20-watcher/src/server.ts b/packages/erc20-watcher/src/server.ts index 692b85a9..b4a677b6 100644 --- a/packages/erc20-watcher/src/server.ts +++ b/packages/erc20-watcher/src/server.ts @@ -13,14 +13,13 @@ import typeDefs from './schema'; import { createResolvers } from './resolvers'; import { Indexer } from './indexer'; import { Database } from './database'; -import { EventWatcher } from './events'; const log = debug('vulcanize:server'); export const main = async (): Promise => { const serverCmd = new ServerCmd(); await serverCmd.init(Database); - await serverCmd.initIndexer(Indexer, EventWatcher); + await serverCmd.initIndexer(Indexer); return serverCmd.exec(createResolvers, typeDefs); }; diff --git a/packages/erc721-watcher/package.json b/packages/erc721-watcher/package.json index b29a08f5..e3098940 100644 --- a/packages/erc721-watcher/package.json +++ b/packages/erc721-watcher/package.json @@ -58,7 +58,6 @@ "ethers": "^5.4.4", "express": "^4.18.2", "graphql": "^15.5.0", - "graphql-subscriptions": "^2.0.0", "json-bigint": "^1.0.0", "reflect-metadata": "^0.1.13", "typeorm": "^0.2.32", diff --git a/packages/erc721-watcher/src/cli/checkpoint-cmds/create.ts b/packages/erc721-watcher/src/cli/checkpoint-cmds/create.ts index 75b09ba9..01408fe5 100644 --- a/packages/erc721-watcher/src/cli/checkpoint-cmds/create.ts +++ b/packages/erc721-watcher/src/cli/checkpoint-cmds/create.ts @@ -28,5 +28,6 @@ export const handler = async (argv: any): Promise => { const createCheckpointCmd = new CreateCheckpointCmd(); await createCheckpointCmd.init(argv, Database); await createCheckpointCmd.initIndexer(Indexer); + await createCheckpointCmd.exec(); }; diff --git a/packages/erc721-watcher/src/cli/export-state.ts b/packages/erc721-watcher/src/cli/export-state.ts index 5a9bbcfb..d0ae922d 100644 --- a/packages/erc721-watcher/src/cli/export-state.ts +++ b/packages/erc721-watcher/src/cli/export-state.ts @@ -16,6 +16,7 @@ const main = async (): Promise => { const exportStateCmd = new ExportStateCmd(); await exportStateCmd.init(Database); await exportStateCmd.initIndexer(Indexer); + await exportStateCmd.exec(); }; diff --git a/packages/erc721-watcher/src/cli/import-state.ts b/packages/erc721-watcher/src/cli/import-state.ts index 8980144e..79771fcd 100644 --- a/packages/erc721-watcher/src/cli/import-state.ts +++ b/packages/erc721-watcher/src/cli/import-state.ts @@ -9,7 +9,6 @@ import { ImportStateCmd } from '@cerc-io/cli'; import { Database } from '../database'; import { Indexer } from '../indexer'; -import { EventWatcher } from '../events'; import { State } from '../entity/State'; const log = debug('vulcanize:import-state'); @@ -17,7 +16,8 @@ const log = debug('vulcanize:import-state'); export const main = async (): Promise => { const importStateCmd = new ImportStateCmd(); await importStateCmd.init(Database); - await importStateCmd.initIndexer(Indexer, EventWatcher); + await importStateCmd.initIndexer(Indexer); + await importStateCmd.exec(State); }; diff --git a/packages/erc721-watcher/src/cli/index-block.ts b/packages/erc721-watcher/src/cli/index-block.ts index fab95b2c..4ebad2a0 100644 --- a/packages/erc721-watcher/src/cli/index-block.ts +++ b/packages/erc721-watcher/src/cli/index-block.ts @@ -16,6 +16,7 @@ const main = async (): Promise => { const indexBlockCmd = new IndexBlockCmd(); await indexBlockCmd.init(Database); await indexBlockCmd.initIndexer(Indexer); + await indexBlockCmd.exec(); }; diff --git a/packages/erc721-watcher/src/cli/inspect-cid.ts b/packages/erc721-watcher/src/cli/inspect-cid.ts index d708a1b3..740cfdf3 100644 --- a/packages/erc721-watcher/src/cli/inspect-cid.ts +++ b/packages/erc721-watcher/src/cli/inspect-cid.ts @@ -16,6 +16,7 @@ const main = async (): Promise => { const inspectCIDCmd = new InspectCIDCmd(); await inspectCIDCmd.init(Database); await inspectCIDCmd.initIndexer(Indexer); + await inspectCIDCmd.exec(); }; diff --git a/packages/erc721-watcher/src/cli/reset-cmds/watcher.ts b/packages/erc721-watcher/src/cli/reset-cmds/watcher.ts index 2dfdd68c..c6e651f5 100644 --- a/packages/erc721-watcher/src/cli/reset-cmds/watcher.ts +++ b/packages/erc721-watcher/src/cli/reset-cmds/watcher.ts @@ -21,5 +21,6 @@ export const handler = async (argv: any): Promise => { const resetWatcherCmd = new ResetWatcherCmd(); await resetWatcherCmd.init(argv, Database); await resetWatcherCmd.initIndexer(Indexer); + await resetWatcherCmd.exec(); }; diff --git a/packages/erc721-watcher/src/cli/watch-contract.ts b/packages/erc721-watcher/src/cli/watch-contract.ts index 1934c09f..b68568e7 100644 --- a/packages/erc721-watcher/src/cli/watch-contract.ts +++ b/packages/erc721-watcher/src/cli/watch-contract.ts @@ -16,6 +16,7 @@ const main = async (): Promise => { const watchContractCmd = new WatchContractCmd(); await watchContractCmd.init(Database); await watchContractCmd.initIndexer(Indexer); + await watchContractCmd.exec(); }; diff --git a/packages/erc721-watcher/src/events.ts b/packages/erc721-watcher/src/events.ts deleted file mode 100644 index 4cb6e10d..00000000 --- a/packages/erc721-watcher/src/events.ts +++ /dev/null @@ -1,70 +0,0 @@ -// -// Copyright 2021 Vulcanize, Inc. -// - -import assert from 'assert'; -import { PubSub } from 'graphql-subscriptions'; - -import { EthClient } from '@cerc-io/ipld-eth-client'; -import { - JobQueue, - EventWatcher as BaseEventWatcher, - EventWatcherInterface, - QUEUE_BLOCK_PROCESSING, - QUEUE_EVENT_PROCESSING, - IndexerInterface -} from '@cerc-io/util'; - -import { Indexer } from './indexer'; - -export class EventWatcher implements EventWatcherInterface { - _ethClient: EthClient - _indexer: Indexer - _subscription: ZenObservable.Subscription | undefined - _baseEventWatcher: BaseEventWatcher - _pubsub: PubSub - _jobQueue: JobQueue - - constructor (ethClient: EthClient, indexer: IndexerInterface, pubsub: PubSub, jobQueue: JobQueue) { - assert(ethClient); - assert(indexer); - - this._ethClient = ethClient; - this._indexer = indexer as Indexer; - this._pubsub = pubsub; - this._jobQueue = jobQueue; - this._baseEventWatcher = new BaseEventWatcher(this._ethClient, this._indexer, this._pubsub, this._jobQueue); - } - - getEventIterator (): AsyncIterator { - return this._baseEventWatcher.getEventIterator(); - } - - getBlockProgressEventIterator (): AsyncIterator { - return this._baseEventWatcher.getBlockProgressEventIterator(); - } - - async start (): Promise { - assert(!this._subscription, 'subscription already started'); - - await this.initBlockProcessingOnCompleteHandler(); - await this.initEventProcessingOnCompleteHandler(); - this._baseEventWatcher.startBlockProcessing(); - } - - async stop (): Promise { - this._baseEventWatcher.stop(); - } - - async initBlockProcessingOnCompleteHandler (): Promise { - this._jobQueue.onComplete(QUEUE_BLOCK_PROCESSING, async (job) => { - await this._baseEventWatcher.blockProcessingCompleteHandler(job); - }); - } - - async initEventProcessingOnCompleteHandler (): Promise { - await this._jobQueue.onComplete(QUEUE_EVENT_PROCESSING, async (job) => { - await this._baseEventWatcher.eventProcessingCompleteHandler(job); - }); - } -} diff --git a/packages/erc721-watcher/src/fill.ts b/packages/erc721-watcher/src/fill.ts index 4c0eaead..ba71cb0b 100644 --- a/packages/erc721-watcher/src/fill.ts +++ b/packages/erc721-watcher/src/fill.ts @@ -9,7 +9,6 @@ import { FillCmd } from '@cerc-io/cli'; import { Database } from './database'; import { Indexer } from './indexer'; -import { EventWatcher } from './events'; const log = debug('vulcanize:fill'); @@ -17,7 +16,7 @@ export const main = async (): Promise => { const fillCmd = new FillCmd(); await fillCmd.init(Database); - await fillCmd.initIndexer(Indexer, EventWatcher); + await fillCmd.initIndexer(Indexer); await fillCmd.exec(); }; diff --git a/packages/erc721-watcher/src/resolvers.ts b/packages/erc721-watcher/src/resolvers.ts index c576cb21..a1b7d4ad 100644 --- a/packages/erc721-watcher/src/resolvers.ts +++ b/packages/erc721-watcher/src/resolvers.ts @@ -8,16 +8,14 @@ import debug from 'debug'; import Decimal from 'decimal.js'; import { GraphQLScalarType } from 'graphql'; -import { ValueResult, BlockHeight, getResultState, IndexerInterface, EventWatcherInterface } from '@cerc-io/util'; +import { ValueResult, BlockHeight, getResultState, IndexerInterface, EventWatcher } from '@cerc-io/util'; import { Indexer } from './indexer'; -import { EventWatcher } from './events'; const log = debug('vulcanize:resolver'); -export const createResolvers = async (indexerArg: IndexerInterface, eventWatcherArg: EventWatcherInterface): Promise => { +export const createResolvers = async (indexerArg: IndexerInterface, eventWatcher: EventWatcher): Promise => { const indexer = indexerArg as Indexer; - const eventWatcher = eventWatcherArg as EventWatcher; return { BigInt: new BigInt('bigInt'), diff --git a/packages/erc721-watcher/src/server.ts b/packages/erc721-watcher/src/server.ts index 35a6240a..ff23134f 100644 --- a/packages/erc721-watcher/src/server.ts +++ b/packages/erc721-watcher/src/server.ts @@ -12,14 +12,13 @@ import { ServerCmd } from '@cerc-io/cli'; import { createResolvers } from './resolvers'; import { Indexer } from './indexer'; import { Database } from './database'; -import { EventWatcher } from './events'; const log = debug('vulcanize:server'); export const main = async (): Promise => { const serverCmd = new ServerCmd(); await serverCmd.init(Database); - await serverCmd.initIndexer(Indexer, EventWatcher); + await serverCmd.initIndexer(Indexer); const typeDefs = fs.readFileSync(path.join(__dirname, 'schema.gql')).toString(); return serverCmd.exec(createResolvers, typeDefs); diff --git a/packages/graph-test-watcher/package.json b/packages/graph-test-watcher/package.json index 3ae637e1..21683479 100644 --- a/packages/graph-test-watcher/package.json +++ b/packages/graph-test-watcher/package.json @@ -51,7 +51,6 @@ "ethers": "^5.4.4", "express": "^4.18.2", "graphql": "^15.5.0", - "graphql-subscriptions": "^2.0.0", "json-bigint": "^1.0.0", "lodash": "^4.17.21", "reflect-metadata": "^0.1.13", diff --git a/packages/graph-test-watcher/src/cli/checkpoint-cmds/create.ts b/packages/graph-test-watcher/src/cli/checkpoint-cmds/create.ts index f2f1971d..f4b91aa0 100644 --- a/packages/graph-test-watcher/src/cli/checkpoint-cmds/create.ts +++ b/packages/graph-test-watcher/src/cli/checkpoint-cmds/create.ts @@ -39,5 +39,6 @@ export const handler = async (argv: any): Promise => { ); await createCheckpointCmd.initIndexer(Indexer, graphWatcher); + await createCheckpointCmd.exec(); }; diff --git a/packages/graph-test-watcher/src/cli/checkpoint-cmds/verify.ts b/packages/graph-test-watcher/src/cli/checkpoint-cmds/verify.ts index 3fdfda99..c3861260 100644 --- a/packages/graph-test-watcher/src/cli/checkpoint-cmds/verify.ts +++ b/packages/graph-test-watcher/src/cli/checkpoint-cmds/verify.ts @@ -35,5 +35,6 @@ export const handler = async (argv: any): Promise => { ); await verifyCheckpointCmd.initIndexer(Indexer, graphWatcher); + await verifyCheckpointCmd.exec(graphDb); }; diff --git a/packages/graph-test-watcher/src/cli/export-state.ts b/packages/graph-test-watcher/src/cli/export-state.ts index 3f6de118..e8e1fc2e 100644 --- a/packages/graph-test-watcher/src/cli/export-state.ts +++ b/packages/graph-test-watcher/src/cli/export-state.ts @@ -27,6 +27,7 @@ const main = async (): Promise => { ); await exportStateCmd.initIndexer(Indexer, graphWatcher); + await exportStateCmd.exec(); }; diff --git a/packages/graph-test-watcher/src/cli/import-state.ts b/packages/graph-test-watcher/src/cli/import-state.ts index 1b0368d9..0ab2593c 100644 --- a/packages/graph-test-watcher/src/cli/import-state.ts +++ b/packages/graph-test-watcher/src/cli/import-state.ts @@ -10,7 +10,6 @@ import { getGraphDbAndWatcher } from '@cerc-io/graph-node'; import { Database, ENTITY_QUERY_TYPE_MAP, ENTITY_TO_LATEST_ENTITY_MAP } from '../database'; import { Indexer } from '../indexer'; -import { EventWatcher } from '../events'; import { State } from '../entity/State'; const log = debug('vulcanize:import-state'); @@ -28,7 +27,8 @@ export const main = async (): Promise => { ENTITY_TO_LATEST_ENTITY_MAP ); - await importStateCmd.initIndexer(Indexer, EventWatcher, graphWatcher); + await importStateCmd.initIndexer(Indexer, graphWatcher); + await importStateCmd.exec(State, graphDb); }; diff --git a/packages/graph-test-watcher/src/cli/index-block.ts b/packages/graph-test-watcher/src/cli/index-block.ts index 78d4ec7b..32998111 100644 --- a/packages/graph-test-watcher/src/cli/index-block.ts +++ b/packages/graph-test-watcher/src/cli/index-block.ts @@ -27,6 +27,7 @@ const main = async (): Promise => { ); await indexBlockCmd.initIndexer(Indexer, graphWatcher); + await indexBlockCmd.exec(); }; diff --git a/packages/graph-test-watcher/src/cli/inspect-cid.ts b/packages/graph-test-watcher/src/cli/inspect-cid.ts index 2dc25314..4453e4d0 100644 --- a/packages/graph-test-watcher/src/cli/inspect-cid.ts +++ b/packages/graph-test-watcher/src/cli/inspect-cid.ts @@ -27,6 +27,7 @@ const main = async (): Promise => { ); await inspectCIDCmd.initIndexer(Indexer, graphWatcher); + await inspectCIDCmd.exec(); }; diff --git a/packages/graph-test-watcher/src/cli/reset-cmds/watcher.ts b/packages/graph-test-watcher/src/cli/reset-cmds/watcher.ts index c0a92058..cdbe328f 100644 --- a/packages/graph-test-watcher/src/cli/reset-cmds/watcher.ts +++ b/packages/graph-test-watcher/src/cli/reset-cmds/watcher.ts @@ -32,5 +32,6 @@ export const handler = async (argv: any): Promise => { ); await resetWatcherCmd.initIndexer(Indexer, graphWatcher); + await resetWatcherCmd.exec(); }; diff --git a/packages/graph-test-watcher/src/cli/watch-contract.ts b/packages/graph-test-watcher/src/cli/watch-contract.ts index 9b2291dc..687c4921 100644 --- a/packages/graph-test-watcher/src/cli/watch-contract.ts +++ b/packages/graph-test-watcher/src/cli/watch-contract.ts @@ -27,6 +27,7 @@ const main = async (): Promise => { ); await watchContractCmd.initIndexer(Indexer, graphWatcher); + await watchContractCmd.exec(); }; diff --git a/packages/graph-test-watcher/src/events.ts b/packages/graph-test-watcher/src/events.ts deleted file mode 100644 index 4cb6e10d..00000000 --- a/packages/graph-test-watcher/src/events.ts +++ /dev/null @@ -1,70 +0,0 @@ -// -// Copyright 2021 Vulcanize, Inc. -// - -import assert from 'assert'; -import { PubSub } from 'graphql-subscriptions'; - -import { EthClient } from '@cerc-io/ipld-eth-client'; -import { - JobQueue, - EventWatcher as BaseEventWatcher, - EventWatcherInterface, - QUEUE_BLOCK_PROCESSING, - QUEUE_EVENT_PROCESSING, - IndexerInterface -} from '@cerc-io/util'; - -import { Indexer } from './indexer'; - -export class EventWatcher implements EventWatcherInterface { - _ethClient: EthClient - _indexer: Indexer - _subscription: ZenObservable.Subscription | undefined - _baseEventWatcher: BaseEventWatcher - _pubsub: PubSub - _jobQueue: JobQueue - - constructor (ethClient: EthClient, indexer: IndexerInterface, pubsub: PubSub, jobQueue: JobQueue) { - assert(ethClient); - assert(indexer); - - this._ethClient = ethClient; - this._indexer = indexer as Indexer; - this._pubsub = pubsub; - this._jobQueue = jobQueue; - this._baseEventWatcher = new BaseEventWatcher(this._ethClient, this._indexer, this._pubsub, this._jobQueue); - } - - getEventIterator (): AsyncIterator { - return this._baseEventWatcher.getEventIterator(); - } - - getBlockProgressEventIterator (): AsyncIterator { - return this._baseEventWatcher.getBlockProgressEventIterator(); - } - - async start (): Promise { - assert(!this._subscription, 'subscription already started'); - - await this.initBlockProcessingOnCompleteHandler(); - await this.initEventProcessingOnCompleteHandler(); - this._baseEventWatcher.startBlockProcessing(); - } - - async stop (): Promise { - this._baseEventWatcher.stop(); - } - - async initBlockProcessingOnCompleteHandler (): Promise { - this._jobQueue.onComplete(QUEUE_BLOCK_PROCESSING, async (job) => { - await this._baseEventWatcher.blockProcessingCompleteHandler(job); - }); - } - - async initEventProcessingOnCompleteHandler (): Promise { - await this._jobQueue.onComplete(QUEUE_EVENT_PROCESSING, async (job) => { - await this._baseEventWatcher.eventProcessingCompleteHandler(job); - }); - } -} diff --git a/packages/graph-test-watcher/src/fill.ts b/packages/graph-test-watcher/src/fill.ts index 83745e35..a15c9f43 100644 --- a/packages/graph-test-watcher/src/fill.ts +++ b/packages/graph-test-watcher/src/fill.ts @@ -2,7 +2,6 @@ // Copyright 2021 Vulcanize, Inc. // -import assert from 'assert'; import 'reflect-metadata'; import debug from 'debug'; @@ -12,7 +11,6 @@ import { getGraphDbAndWatcher } from '@cerc-io/graph-node'; import { Database, ENTITY_QUERY_TYPE_MAP, ENTITY_TO_LATEST_ENTITY_MAP } from './database'; import { Indexer } from './indexer'; -import { EventWatcher } from './events'; const log = debug('vulcanize:fill'); @@ -29,7 +27,7 @@ export const main = async (): Promise => { ENTITY_TO_LATEST_ENTITY_MAP ); - await fillCmd.initIndexer(Indexer, EventWatcher, graphWatcher); + await fillCmd.initIndexer(Indexer, graphWatcher); // Get contractEntitiesMap required for fill-state // NOTE: Assuming each entity type is only mapped to a single contract diff --git a/packages/graph-test-watcher/src/resolvers.ts b/packages/graph-test-watcher/src/resolvers.ts index 8edbb706..b80ea97a 100644 --- a/packages/graph-test-watcher/src/resolvers.ts +++ b/packages/graph-test-watcher/src/resolvers.ts @@ -17,11 +17,10 @@ import { getResultState, setGQLCacheHints, IndexerInterface, - EventWatcherInterface + EventWatcher } from '@cerc-io/util'; import { Indexer } from './indexer'; -import { EventWatcher } from './events'; import { Author } from './entity/Author'; import { Blog } from './entity/Blog'; @@ -29,9 +28,8 @@ import { Category } from './entity/Category'; const log = debug('vulcanize:resolver'); -export const createResolvers = async (indexerArg: IndexerInterface, eventWatcherArg: EventWatcherInterface): Promise => { +export const createResolvers = async (indexerArg: IndexerInterface, eventWatcher: EventWatcher): Promise => { const indexer = indexerArg as Indexer; - const eventWatcher = eventWatcherArg as EventWatcher; const gqlCacheConfig = indexer.serverConfig.gqlCache; diff --git a/packages/graph-test-watcher/src/server.ts b/packages/graph-test-watcher/src/server.ts index 43b4caea..12899c82 100644 --- a/packages/graph-test-watcher/src/server.ts +++ b/packages/graph-test-watcher/src/server.ts @@ -13,7 +13,6 @@ import { getGraphDbAndWatcher } from '@cerc-io/graph-node'; import { createResolvers } from './resolvers'; import { Indexer } from './indexer'; import { Database, ENTITY_QUERY_TYPE_MAP, ENTITY_TO_LATEST_ENTITY_MAP } from './database'; -import { EventWatcher } from './events'; const log = debug('vulcanize:server'); @@ -30,7 +29,7 @@ export const main = async (): Promise => { ENTITY_TO_LATEST_ENTITY_MAP ); - await serverCmd.initIndexer(Indexer, EventWatcher, graphWatcher); + await serverCmd.initIndexer(Indexer, graphWatcher); const typeDefs = fs.readFileSync(path.join(__dirname, 'schema.gql')).toString(); return serverCmd.exec(createResolvers, typeDefs); diff --git a/packages/mobymask-watcher/package.json b/packages/mobymask-watcher/package.json index e092114d..f0ce83be 100644 --- a/packages/mobymask-watcher/package.json +++ b/packages/mobymask-watcher/package.json @@ -49,7 +49,6 @@ "ethers": "^5.4.4", "express": "^4.18.2", "graphql": "^15.5.0", - "graphql-subscriptions": "^2.0.0", "json-bigint": "^1.0.0", "reflect-metadata": "^0.1.13", "typeorm": "^0.2.32", diff --git a/packages/mobymask-watcher/src/cli/checkpoint-cmds/create.ts b/packages/mobymask-watcher/src/cli/checkpoint-cmds/create.ts index 75b09ba9..01408fe5 100644 --- a/packages/mobymask-watcher/src/cli/checkpoint-cmds/create.ts +++ b/packages/mobymask-watcher/src/cli/checkpoint-cmds/create.ts @@ -28,5 +28,6 @@ export const handler = async (argv: any): Promise => { const createCheckpointCmd = new CreateCheckpointCmd(); await createCheckpointCmd.init(argv, Database); await createCheckpointCmd.initIndexer(Indexer); + await createCheckpointCmd.exec(); }; diff --git a/packages/mobymask-watcher/src/cli/export-state.ts b/packages/mobymask-watcher/src/cli/export-state.ts index 5a9bbcfb..d0ae922d 100644 --- a/packages/mobymask-watcher/src/cli/export-state.ts +++ b/packages/mobymask-watcher/src/cli/export-state.ts @@ -16,6 +16,7 @@ const main = async (): Promise => { const exportStateCmd = new ExportStateCmd(); await exportStateCmd.init(Database); await exportStateCmd.initIndexer(Indexer); + await exportStateCmd.exec(); }; diff --git a/packages/mobymask-watcher/src/cli/import-state.ts b/packages/mobymask-watcher/src/cli/import-state.ts index 8980144e..79771fcd 100644 --- a/packages/mobymask-watcher/src/cli/import-state.ts +++ b/packages/mobymask-watcher/src/cli/import-state.ts @@ -9,7 +9,6 @@ import { ImportStateCmd } from '@cerc-io/cli'; import { Database } from '../database'; import { Indexer } from '../indexer'; -import { EventWatcher } from '../events'; import { State } from '../entity/State'; const log = debug('vulcanize:import-state'); @@ -17,7 +16,8 @@ const log = debug('vulcanize:import-state'); export const main = async (): Promise => { const importStateCmd = new ImportStateCmd(); await importStateCmd.init(Database); - await importStateCmd.initIndexer(Indexer, EventWatcher); + await importStateCmd.initIndexer(Indexer); + await importStateCmd.exec(State); }; diff --git a/packages/mobymask-watcher/src/cli/index-block.ts b/packages/mobymask-watcher/src/cli/index-block.ts index fab95b2c..4ebad2a0 100644 --- a/packages/mobymask-watcher/src/cli/index-block.ts +++ b/packages/mobymask-watcher/src/cli/index-block.ts @@ -16,6 +16,7 @@ const main = async (): Promise => { const indexBlockCmd = new IndexBlockCmd(); await indexBlockCmd.init(Database); await indexBlockCmd.initIndexer(Indexer); + await indexBlockCmd.exec(); }; diff --git a/packages/mobymask-watcher/src/cli/inspect-cid.ts b/packages/mobymask-watcher/src/cli/inspect-cid.ts index d708a1b3..740cfdf3 100644 --- a/packages/mobymask-watcher/src/cli/inspect-cid.ts +++ b/packages/mobymask-watcher/src/cli/inspect-cid.ts @@ -16,6 +16,7 @@ const main = async (): Promise => { const inspectCIDCmd = new InspectCIDCmd(); await inspectCIDCmd.init(Database); await inspectCIDCmd.initIndexer(Indexer); + await inspectCIDCmd.exec(); }; diff --git a/packages/mobymask-watcher/src/cli/reset-cmds/watcher.ts b/packages/mobymask-watcher/src/cli/reset-cmds/watcher.ts index 2dfdd68c..c6e651f5 100644 --- a/packages/mobymask-watcher/src/cli/reset-cmds/watcher.ts +++ b/packages/mobymask-watcher/src/cli/reset-cmds/watcher.ts @@ -21,5 +21,6 @@ export const handler = async (argv: any): Promise => { const resetWatcherCmd = new ResetWatcherCmd(); await resetWatcherCmd.init(argv, Database); await resetWatcherCmd.initIndexer(Indexer); + await resetWatcherCmd.exec(); }; diff --git a/packages/mobymask-watcher/src/cli/watch-contract.ts b/packages/mobymask-watcher/src/cli/watch-contract.ts index 1934c09f..b68568e7 100644 --- a/packages/mobymask-watcher/src/cli/watch-contract.ts +++ b/packages/mobymask-watcher/src/cli/watch-contract.ts @@ -16,6 +16,7 @@ const main = async (): Promise => { const watchContractCmd = new WatchContractCmd(); await watchContractCmd.init(Database); await watchContractCmd.initIndexer(Indexer); + await watchContractCmd.exec(); }; diff --git a/packages/mobymask-watcher/src/events.ts b/packages/mobymask-watcher/src/events.ts deleted file mode 100644 index 4cb6e10d..00000000 --- a/packages/mobymask-watcher/src/events.ts +++ /dev/null @@ -1,70 +0,0 @@ -// -// Copyright 2021 Vulcanize, Inc. -// - -import assert from 'assert'; -import { PubSub } from 'graphql-subscriptions'; - -import { EthClient } from '@cerc-io/ipld-eth-client'; -import { - JobQueue, - EventWatcher as BaseEventWatcher, - EventWatcherInterface, - QUEUE_BLOCK_PROCESSING, - QUEUE_EVENT_PROCESSING, - IndexerInterface -} from '@cerc-io/util'; - -import { Indexer } from './indexer'; - -export class EventWatcher implements EventWatcherInterface { - _ethClient: EthClient - _indexer: Indexer - _subscription: ZenObservable.Subscription | undefined - _baseEventWatcher: BaseEventWatcher - _pubsub: PubSub - _jobQueue: JobQueue - - constructor (ethClient: EthClient, indexer: IndexerInterface, pubsub: PubSub, jobQueue: JobQueue) { - assert(ethClient); - assert(indexer); - - this._ethClient = ethClient; - this._indexer = indexer as Indexer; - this._pubsub = pubsub; - this._jobQueue = jobQueue; - this._baseEventWatcher = new BaseEventWatcher(this._ethClient, this._indexer, this._pubsub, this._jobQueue); - } - - getEventIterator (): AsyncIterator { - return this._baseEventWatcher.getEventIterator(); - } - - getBlockProgressEventIterator (): AsyncIterator { - return this._baseEventWatcher.getBlockProgressEventIterator(); - } - - async start (): Promise { - assert(!this._subscription, 'subscription already started'); - - await this.initBlockProcessingOnCompleteHandler(); - await this.initEventProcessingOnCompleteHandler(); - this._baseEventWatcher.startBlockProcessing(); - } - - async stop (): Promise { - this._baseEventWatcher.stop(); - } - - async initBlockProcessingOnCompleteHandler (): Promise { - this._jobQueue.onComplete(QUEUE_BLOCK_PROCESSING, async (job) => { - await this._baseEventWatcher.blockProcessingCompleteHandler(job); - }); - } - - async initEventProcessingOnCompleteHandler (): Promise { - await this._jobQueue.onComplete(QUEUE_EVENT_PROCESSING, async (job) => { - await this._baseEventWatcher.eventProcessingCompleteHandler(job); - }); - } -} diff --git a/packages/mobymask-watcher/src/fill.ts b/packages/mobymask-watcher/src/fill.ts index 4c0eaead..0cfffd06 100644 --- a/packages/mobymask-watcher/src/fill.ts +++ b/packages/mobymask-watcher/src/fill.ts @@ -9,15 +9,14 @@ import { FillCmd } from '@cerc-io/cli'; import { Database } from './database'; import { Indexer } from './indexer'; -import { EventWatcher } from './events'; const log = debug('vulcanize:fill'); export const main = async (): Promise => { const fillCmd = new FillCmd(); await fillCmd.init(Database); + await fillCmd.initIndexer(Indexer); - await fillCmd.initIndexer(Indexer, EventWatcher); await fillCmd.exec(); }; diff --git a/packages/mobymask-watcher/src/resolvers.ts b/packages/mobymask-watcher/src/resolvers.ts index ba264794..5a1fb6e1 100644 --- a/packages/mobymask-watcher/src/resolvers.ts +++ b/packages/mobymask-watcher/src/resolvers.ts @@ -8,16 +8,14 @@ import debug from 'debug'; import Decimal from 'decimal.js'; import { GraphQLScalarType } from 'graphql'; -import { ValueResult, gqlTotalQueryCount, gqlQueryCount, getResultState, IndexerInterface, EventWatcherInterface } from '@cerc-io/util'; +import { ValueResult, gqlTotalQueryCount, gqlQueryCount, getResultState, IndexerInterface, EventWatcher } from '@cerc-io/util'; import { Indexer } from './indexer'; -import { EventWatcher } from './events'; const log = debug('vulcanize:resolver'); -export const createResolvers = async (indexerArg: IndexerInterface, eventWatcherArg: EventWatcherInterface): Promise => { +export const createResolvers = async (indexerArg: IndexerInterface, eventWatcher: EventWatcher): Promise => { const indexer = indexerArg as Indexer; - const eventWatcher = eventWatcherArg as EventWatcher; return { BigInt: new BigInt('bigInt'), diff --git a/packages/mobymask-watcher/src/server.ts b/packages/mobymask-watcher/src/server.ts index 35a6240a..ff23134f 100644 --- a/packages/mobymask-watcher/src/server.ts +++ b/packages/mobymask-watcher/src/server.ts @@ -12,14 +12,13 @@ import { ServerCmd } from '@cerc-io/cli'; import { createResolvers } from './resolvers'; import { Indexer } from './indexer'; import { Database } from './database'; -import { EventWatcher } from './events'; const log = debug('vulcanize:server'); export const main = async (): Promise => { const serverCmd = new ServerCmd(); await serverCmd.init(Database); - await serverCmd.initIndexer(Indexer, EventWatcher); + await serverCmd.initIndexer(Indexer); const typeDefs = fs.readFileSync(path.join(__dirname, 'schema.gql')).toString(); return serverCmd.exec(createResolvers, typeDefs); diff --git a/packages/util/package.json b/packages/util/package.json index 820db202..af3342cb 100644 --- a/packages/util/package.json +++ b/packages/util/package.json @@ -44,7 +44,6 @@ "@types/ws": "^8.5.3", "@typescript-eslint/eslint-plugin": "^4.25.0", "@typescript-eslint/parser": "^4.25.0", - "apollo-server-express": "^3.11.1", "decimal.js": "^10.3.1", "eslint": "^7.27.0", "eslint-config-semistandard": "^15.0.1", diff --git a/packages/util/src/events.ts b/packages/util/src/events.ts index f6761c72..9956b4d6 100644 --- a/packages/util/src/events.ts +++ b/packages/util/src/events.ts @@ -23,7 +23,6 @@ export const BlockProgressEvent = 'block-progress-event'; export class EventWatcher { _ethClient: EthClient _indexer: IndexerInterface - _subscription?: ZenObservable.Subscription _pubsub: PubSub _jobQueue: JobQueue @@ -42,11 +41,22 @@ export class EventWatcher { return this._pubsub.asyncIterator([BlockProgressEvent]); } - async stop (): Promise { - if (this._subscription) { - log('Stopped watching upstream blocks'); - this._subscription.unsubscribe(); - } + async start (): Promise { + await this.initBlockProcessingOnCompleteHandler(); + await this.initEventProcessingOnCompleteHandler(); + this.startBlockProcessing(); + } + + async initBlockProcessingOnCompleteHandler (): Promise { + this._jobQueue.onComplete(QUEUE_BLOCK_PROCESSING, async (job) => { + await this.blockProcessingCompleteHandler(job); + }); + } + + async initEventProcessingOnCompleteHandler (): Promise { + await this._jobQueue.onComplete(QUEUE_EVENT_PROCESSING, async (job) => { + await this.eventProcessingCompleteHandler(job); + }); } async startBlockProcessing (): Promise { diff --git a/packages/util/src/fill.ts b/packages/util/src/fill.ts index 3afdd9f0..b4cd082e 100644 --- a/packages/util/src/fill.ts +++ b/packages/util/src/fill.ts @@ -5,17 +5,18 @@ import debug from 'debug'; import { JobQueue } from './job-queue'; -import { EventWatcherInterface, IndexerInterface } from './types'; +import { IndexerInterface } from './types'; import { wait } from './misc'; import { processBlockByNumber } from './common'; import { DEFAULT_PREFETCH_BATCH_SIZE } from './constants'; +import { EventWatcher } from './events'; const log = debug('vulcanize:fill'); export const fillBlocks = async ( jobQueue: JobQueue, indexer: IndexerInterface, - eventWatcher: EventWatcherInterface, + eventWatcher: EventWatcher, blockDelayInMilliSecs: number, argv: { startBlock: number, diff --git a/packages/util/src/types.ts b/packages/util/src/types.ts index db0c4162..2802bee3 100644 --- a/packages/util/src/types.ts +++ b/packages/util/src/types.ts @@ -140,13 +140,6 @@ export interface IndexerInterface { getResultEvent (event: EventInterface): any } -export interface EventWatcherInterface { - start (): Promise - getBlockProgressEventIterator (): AsyncIterator - initBlockProcessingOnCompleteHandler (): Promise - initEventProcessingOnCompleteHandler (): Promise -} - export interface DatabaseInterface { _conn: Connection; readonly baseDatabase: Database