Move event watcher to util (#262)

This commit is contained in:
prathamesh0 2022-11-25 05:49:37 -06:00 committed by GitHub
parent e47aab2ed7
commit 94c8ed9575
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
89 changed files with 111 additions and 590 deletions

View File

@ -10,7 +10,6 @@
"copy-assets": "copyfiles -u 1 src/**/*.gql dist/"
},
"dependencies": {
"@cerc-io/ipld-eth-client": "^0.2.16",
"@cerc-io/util": "^0.2.16",
"@ethersproject/providers": "^5.4.4",
"@graphql-tools/utils": "^9.1.1",
@ -24,6 +23,7 @@
"yargs": "^17.0.1"
},
"devDependencies": {
"@types/express": "^4.17.14",
"@typescript-eslint/eslint-plugin": "^4.25.0",
"@typescript-eslint/parser": "^4.25.0",
"eslint-config-semistandard": "^15.0.1",

View File

@ -17,10 +17,9 @@ import {
IndexerInterface,
ServerConfig,
Clients,
EventWatcherInterface,
EventWatcher,
GraphWatcherInterface
} from '@cerc-io/util';
import { EthClient } from '@cerc-io/ipld-eth-client';
export class BaseCmd {
_config?: Config;
@ -29,7 +28,7 @@ export class BaseCmd {
_jobQueue?: JobQueue
_database?: DatabaseInterface;
_indexer?: IndexerInterface;
_eventWatcher?: EventWatcherInterface;
_eventWatcher?: EventWatcher;
get config (): Config | undefined {
return this._config;
@ -55,7 +54,7 @@ export class BaseCmd {
return this._indexer;
}
get eventWatcher (): EventWatcherInterface | undefined {
get eventWatcher (): EventWatcher | undefined {
return this._eventWatcher;
}
@ -103,7 +102,7 @@ export class BaseCmd {
graphWatcher?: GraphWatcherInterface
) => IndexerInterface,
graphWatcher?: GraphWatcherInterface
) {
): Promise<void> {
assert(this._config);
assert(this._database);
assert(this._clients);
@ -119,14 +118,7 @@ export class BaseCmd {
}
}
async initEventWatcher (
EventWatcher: new(
ethClient: EthClient,
indexer: IndexerInterface,
pubsub: PubSub,
jobQueue: JobQueue
) => EventWatcherInterface
): Promise<void> {
async initEventWatcher (): Promise<void> {
assert(this._clients?.ethClient);
assert(this._indexer);
assert(this._jobQueue);

View File

@ -80,7 +80,7 @@ export class CreateCheckpointCmd {
graphWatcher?: GraphWatcherInterface
) => IndexerInterface,
graphWatcher?: GraphWatcherInterface
) {
): Promise<void> {
return this._baseCmd.initIndexer(Indexer, graphWatcher);
}

View File

@ -81,7 +81,7 @@ export class VerifyCheckpointCmd {
graphWatcher?: GraphWatcherInterface
) => IndexerInterface,
graphWatcher?: GraphWatcherInterface
) {
): Promise<void> {
return this._baseCmd.initIndexer(Indexer, graphWatcher);
}

View File

@ -87,7 +87,7 @@ export class ExportStateCmd {
graphWatcher?: GraphWatcherInterface
) => IndexerInterface,
graphWatcher?: GraphWatcherInterface
) {
): Promise<void> {
return this._baseCmd.initIndexer(Indexer, graphWatcher);
}

View File

@ -8,7 +8,6 @@ import debug from 'debug';
import yargs from 'yargs';
import { hideBin } from 'yargs/helpers';
import { ConnectionOptions } from 'typeorm';
import { PubSub } from 'graphql-subscriptions';
import { JsonRpcProvider } from '@ethersproject/providers';
import {
@ -19,12 +18,10 @@ import {
IndexerInterface,
ServerConfig,
Clients,
EventWatcherInterface,
fillBlocks,
GraphWatcherInterface,
Config
} from '@cerc-io/util';
import { EthClient } from '@cerc-io/ipld-eth-client';
import { BaseCmd } from './base';
@ -63,10 +60,6 @@ export class FillCmd {
return this._baseCmd.database;
}
get indexer (): IndexerInterface | undefined {
return this._baseCmd.indexer;
}
async initConfig<ConfigType> (): Promise<ConfigType> {
this._argv = this._getArgv();
assert(this._argv);
@ -95,16 +88,10 @@ export class FillCmd {
jobQueue: JobQueue,
graphWatcher?: GraphWatcherInterface
) => IndexerInterface,
EventWatcher: new(
ethClient: EthClient,
indexer: IndexerInterface,
pubsub: PubSub,
jobQueue: JobQueue
) => EventWatcherInterface,
graphWatcher?: GraphWatcherInterface
) {
): Promise<void> {
await this._baseCmd.initIndexer(Indexer, graphWatcher);
await this._baseCmd.initEventWatcher(EventWatcher);
await this._baseCmd.initEventWatcher();
}
async exec (contractEntitiesMap: Map<string, string[]> = new Map()): Promise<void> {

View File

@ -9,10 +9,8 @@ import path from 'path';
import fs from 'fs';
import debug from 'debug';
import { ConnectionOptions } from 'typeorm';
import { PubSub } from 'graphql-subscriptions';
import { JsonRpcProvider } from '@ethersproject/providers';
import { EthClient } from '@cerc-io/ipld-eth-client';
import {
DEFAULT_CONFIG_PATH,
JobQueue,
@ -20,7 +18,6 @@ import {
IndexerInterface,
ServerConfig,
Clients,
EventWatcherInterface,
fillBlocks,
StateKind,
GraphWatcherInterface,
@ -91,16 +88,10 @@ export class ImportStateCmd {
jobQueue: JobQueue,
graphWatcher?: GraphWatcherInterface
) => IndexerInterface,
EventWatcher: new(
ethClient: EthClient,
indexer: IndexerInterface,
pubsub: PubSub,
jobQueue: JobQueue
) => EventWatcherInterface,
graphWatcher?: GraphWatcherInterface
) {
): Promise<void> {
await this._baseCmd.initIndexer(Indexer, graphWatcher);
await this._baseCmd.initEventWatcher(EventWatcher);
await this._baseCmd.initEventWatcher();
}
async exec (State: new() => any, graphDb?: GraphDatabase): Promise<void> {

View File

@ -80,7 +80,7 @@ export class IndexBlockCmd {
graphWatcher?: GraphWatcherInterface
) => IndexerInterface,
graphWatcher?: GraphWatcherInterface
) {
): Promise<void> {
return this._baseCmd.initIndexer(Indexer, graphWatcher);
}

View File

@ -83,7 +83,7 @@ export class InspectCIDCmd {
graphWatcher?: GraphWatcherInterface
) => IndexerInterface,
graphWatcher?: GraphWatcherInterface
) {
): Promise<void> {
return this._baseCmd.initIndexer(Indexer, graphWatcher);
}

View File

@ -89,7 +89,7 @@ export class JobRunnerCmd {
graphWatcher?: GraphWatcherInterface
) => IndexerInterface,
graphWatcher?: GraphWatcherInterface
) {
): Promise<void> {
return this._baseCmd.initIndexer(Indexer, graphWatcher);
}

View File

@ -79,7 +79,7 @@ export class ResetWatcherCmd {
graphWatcher?: GraphWatcherInterface
) => IndexerInterface,
graphWatcher?: GraphWatcherInterface
) {
): Promise<void> {
return this._baseCmd.initIndexer(Indexer, graphWatcher);
}

View File

@ -7,12 +7,10 @@ import { hideBin } from 'yargs/helpers';
import 'reflect-metadata';
import assert from 'assert';
import { ConnectionOptions } from 'typeorm';
import { PubSub } from 'graphql-subscriptions';
import express, { Application } from 'express';
import { ApolloServer } from 'apollo-server-express';
import { JsonRpcProvider } from '@ethersproject/providers';
import { EthClient } from '@cerc-io/ipld-eth-client';
import {
DEFAULT_CONFIG_PATH,
JobQueue,
@ -20,10 +18,10 @@ import {
IndexerInterface,
ServerConfig,
Clients,
EventWatcherInterface,
KIND_ACTIVE,
createAndStartServer,
startGQLMetricsServer,
EventWatcher,
GraphWatcherInterface,
Config
} from '@cerc-io/util';
@ -87,20 +85,14 @@ export class ServerCmd {
jobQueue: JobQueue,
graphWatcher?: GraphWatcherInterface
) => IndexerInterface,
EventWatcher: new(
ethClient: EthClient,
indexer: IndexerInterface,
pubsub: PubSub,
jobQueue: JobQueue
) => EventWatcherInterface,
graphWatcher?: GraphWatcherInterface
) {
): Promise<void> {
await this._baseCmd.initIndexer(Indexer, graphWatcher);
await this._baseCmd.initEventWatcher(EventWatcher);
await this._baseCmd.initEventWatcher();
}
async exec (
createResolvers: (indexer: IndexerInterface, eventWatcher: EventWatcherInterface) => Promise<any>,
createResolvers: (indexer: IndexerInterface, eventWatcher: EventWatcher) => Promise<any>,
typeDefs: TypeSource
): Promise<{
app: Application,

View File

@ -82,7 +82,7 @@ export class WatchContractCmd {
graphWatcher?: GraphWatcherInterface
) => IndexerInterface,
graphWatcher?: GraphWatcherInterface
) {
): Promise<void> {
return this._baseCmd.initIndexer(Indexer, graphWatcher);
}

View File

@ -1,21 +0,0 @@
//
// Copyright 2021 Vulcanize, Inc.
//
import fs from 'fs';
import path from 'path';
import Handlebars from 'handlebars';
import { Writable } from 'stream';
const TEMPLATE_FILE = './templates/events-template.handlebars';
/**
* Writes the events file generated from a template to a stream.
* @param outStream A writable output stream to write the events file to.
*/
export function exportEvents (outStream: Writable): void {
const templateString = fs.readFileSync(path.resolve(__dirname, TEMPLATE_FILE)).toString();
const template = Handlebars.compile(templateString);
const events = template({});
outStream.write(events);
}

View File

@ -24,7 +24,6 @@ import { generateArtifacts } from './artifacts';
import { exportPackage } from './package';
import { exportTSConfig } from './tsconfig';
import { exportReadme } from './readme';
import { exportEvents } from './events';
import { exportJobRunner } from './job-runner';
import { exportWatchContract } from './watch-contract';
import { exportLint } from './lint';
@ -226,11 +225,6 @@ function generateWatcher (visitor: Visitor, contracts: any[], config: any) {
: process.stdout;
exportReadme(path.basename(outputDir), config.port, outStream);
outStream = outputDir
? fs.createWriteStream(path.join(outputDir, 'src/events.ts'))
: process.stdout;
exportEvents(outStream);
outStream = outputDir
? fs.createWriteStream(path.join(outputDir, 'src/job-runner.ts'))
: process.stdout;

View File

@ -43,5 +43,6 @@ export const handler = async (argv: any): Promise<void> => {
{{/if}}
await createCheckpointCmd.initIndexer(Indexer{{#if (subgraphPath)}}, graphWatcher{{/if}});
await createCheckpointCmd.exec();
};

View File

@ -35,5 +35,6 @@ export const handler = async (argv: any): Promise<void> => {
);
await verifyCheckpointCmd.initIndexer(Indexer, graphWatcher);
await verifyCheckpointCmd.exec(graphDb);
};

View File

@ -1,70 +0,0 @@
//
// Copyright 2021 Vulcanize, Inc.
//
import assert from 'assert';
import { PubSub } from 'graphql-subscriptions';
import { EthClient } from '@cerc-io/ipld-eth-client';
import {
JobQueue,
EventWatcher as BaseEventWatcher,
EventWatcherInterface,
QUEUE_BLOCK_PROCESSING,
QUEUE_EVENT_PROCESSING,
IndexerInterface
} from '@cerc-io/util';
import { Indexer } from './indexer';
export class EventWatcher implements EventWatcherInterface {
_ethClient: EthClient
_indexer: Indexer
_subscription: ZenObservable.Subscription | undefined
_baseEventWatcher: BaseEventWatcher
_pubsub: PubSub
_jobQueue: JobQueue
constructor (ethClient: EthClient, indexer: IndexerInterface, pubsub: PubSub, jobQueue: JobQueue) {
assert(ethClient);
assert(indexer);
this._ethClient = ethClient;
this._indexer = indexer as Indexer;
this._pubsub = pubsub;
this._jobQueue = jobQueue;
this._baseEventWatcher = new BaseEventWatcher(this._ethClient, this._indexer, this._pubsub, this._jobQueue);
}
getEventIterator (): AsyncIterator<any> {
return this._baseEventWatcher.getEventIterator();
}
getBlockProgressEventIterator (): AsyncIterator<any> {
return this._baseEventWatcher.getBlockProgressEventIterator();
}
async start (): Promise<void> {
assert(!this._subscription, 'subscription already started');
await this.initBlockProcessingOnCompleteHandler();
await this.initEventProcessingOnCompleteHandler();
this._baseEventWatcher.startBlockProcessing();
}
async stop (): Promise<void> {
this._baseEventWatcher.stop();
}
async initBlockProcessingOnCompleteHandler (): Promise<void> {
this._jobQueue.onComplete(QUEUE_BLOCK_PROCESSING, async (job) => {
await this._baseEventWatcher.blockProcessingCompleteHandler(job);
});
}
async initEventProcessingOnCompleteHandler (): Promise<void> {
await this._jobQueue.onComplete(QUEUE_EVENT_PROCESSING, async (job) => {
await this._baseEventWatcher.eventProcessingCompleteHandler(job);
});
}
}

View File

@ -31,6 +31,7 @@ const main = async (): Promise<void> => {
{{/if}}
await exportStateCmd.initIndexer(Indexer{{#if (subgraphPath)}}, graphWatcher{{/if}});
await exportStateCmd.exec();
};

View File

@ -2,7 +2,6 @@
// Copyright 2021 Vulcanize, Inc.
//
import assert from 'assert';
import 'reflect-metadata';
import debug from 'debug';
@ -14,7 +13,6 @@ import { getGraphDbAndWatcher } from '@cerc-io/graph-node';
import { Database{{#if (subgraphPath)}}, ENTITY_QUERY_TYPE_MAP, ENTITY_TO_LATEST_ENTITY_MAP{{/if}} } from './database';
import { Indexer } from './indexer';
import { EventWatcher } from './events';
const log = debug('vulcanize:fill');
@ -33,7 +31,7 @@ export const main = async (): Promise<any> => {
);
{{/if}}
await fillCmd.initIndexer(Indexer, EventWatcher{{#if (subgraphPath)}}, graphWatcher{{/if}});
await fillCmd.initIndexer(Indexer{{#if (subgraphPath)}}, graphWatcher{{/if}});
{{#if (subgraphPath)}}
// Get contractEntitiesMap required for fill-state

View File

@ -12,7 +12,6 @@ import { getGraphDbAndWatcher } from '@cerc-io/graph-node';
import { Database{{#if (subgraphPath)}}, ENTITY_QUERY_TYPE_MAP, ENTITY_TO_LATEST_ENTITY_MAP{{/if}} } from '../database';
import { Indexer } from '../indexer';
import { EventWatcher } from '../events';
import { State } from '../entity/State';
const log = debug('vulcanize:import-state');
@ -32,7 +31,8 @@ export const main = async (): Promise<any> => {
);
{{/if}}
await importStateCmd.initIndexer(Indexer, EventWatcher{{#if (subgraphPath)}}, graphWatcher{{/if}});
await importStateCmd.initIndexer(Indexer{{#if (subgraphPath)}}, graphWatcher{{/if}});
await importStateCmd.exec(State{{#if (subgraphPath)}}, graphDb{{/if}});
};

View File

@ -31,6 +31,7 @@ const main = async (): Promise<void> => {
{{/if}}
await indexBlockCmd.initIndexer(Indexer{{#if (subgraphPath)}}, graphWatcher{{/if}});
await indexBlockCmd.exec();
};

View File

@ -31,6 +31,7 @@ const main = async (): Promise<void> => {
{{/if}}
await inspectCIDCmd.initIndexer(Indexer{{#if (subgraphPath)}}, graphWatcher{{/if}});
await inspectCIDCmd.exec();
};

View File

@ -54,7 +54,6 @@
"ethers": "^5.4.4",
"express": "^4.18.2",
"graphql": "^15.5.0",
"graphql-subscriptions": "^2.0.0",
"json-bigint": "^1.0.0",
"reflect-metadata": "^0.1.13",
"typeorm": "^0.2.32",

View File

@ -36,5 +36,6 @@ export const handler = async (argv: any): Promise<void> => {
{{/if}}
await resetWatcherCmd.initIndexer(Indexer{{#if (subgraphPath)}}, graphWatcher{{/if}});
await resetWatcherCmd.exec();
};

View File

@ -17,7 +17,7 @@ import {
getResultState,
setGQLCacheHints,
IndexerInterface,
EventWatcherInterface
EventWatcher
} from '@cerc-io/util';
import { Indexer } from './indexer';
@ -31,9 +31,8 @@ import { {{query.entityName}} } from './entity/{{query.entityName}}';
const log = debug('vulcanize:resolver');
export const createResolvers = async (indexerArg: IndexerInterface, eventWatcherArg: EventWatcherInterface): Promise<any> => {
export const createResolvers = async (indexerArg: IndexerInterface, eventWatcher: EventWatcher): Promise<any> => {
const indexer = indexerArg as Indexer;
const eventWatcher = eventWatcherArg as EventWatcher;
const gqlCacheConfig = indexer.serverConfig.gqlCache;

View File

@ -15,7 +15,6 @@ import { getGraphDbAndWatcher } from '@cerc-io/graph-node';
import { createResolvers } from './resolvers';
import { Indexer } from './indexer';
import { Database{{#if (subgraphPath)}}, ENTITY_QUERY_TYPE_MAP, ENTITY_TO_LATEST_ENTITY_MAP{{/if}} } from './database';
import { EventWatcher } from './events';
const log = debug('vulcanize:server');
@ -34,9 +33,10 @@ export const main = async (): Promise<any> => {
);
{{/if}}
await serverCmd.initIndexer(Indexer, EventWatcher{{#if (subgraphPath)}}, graphWatcher{{/if}});
await serverCmd.initIndexer(Indexer{{#if (subgraphPath)}}, graphWatcher{{/if}});
const typeDefs = fs.readFileSync(path.join(__dirname, 'schema.gql')).toString();
return serverCmd.exec(createResolvers, typeDefs);
};

View File

@ -31,6 +31,7 @@ const main = async (): Promise<void> => {
{{/if}}
await watchContractCmd.initIndexer(Indexer{{#if (subgraphPath)}}, graphWatcher{{/if}});
await watchContractCmd.exec();
};

View File

@ -51,7 +51,6 @@
"ethers": "^5.4.4",
"express": "^4.18.2",
"graphql": "^15.5.0",
"graphql-subscriptions": "^2.0.0",
"json-bigint": "^1.0.0",
"reflect-metadata": "^0.1.13",
"typeorm": "^0.2.32",

View File

@ -39,5 +39,6 @@ export const handler = async (argv: any): Promise<void> => {
);
await createCheckpointCmd.initIndexer(Indexer, graphWatcher);
await createCheckpointCmd.exec();
};

View File

@ -35,5 +35,6 @@ export const handler = async (argv: any): Promise<void> => {
);
await verifyCheckpointCmd.initIndexer(Indexer, graphWatcher);
await verifyCheckpointCmd.exec(graphDb);
};

View File

@ -27,6 +27,7 @@ const main = async (): Promise<void> => {
);
await exportStateCmd.initIndexer(Indexer, graphWatcher);
await exportStateCmd.exec();
};

View File

@ -10,7 +10,6 @@ import { getGraphDbAndWatcher } from '@cerc-io/graph-node';
import { Database, ENTITY_QUERY_TYPE_MAP, ENTITY_TO_LATEST_ENTITY_MAP } from '../database';
import { Indexer } from '../indexer';
import { EventWatcher } from '../events';
import { State } from '../entity/State';
const log = debug('vulcanize:import-state');
@ -28,7 +27,8 @@ export const main = async (): Promise<any> => {
ENTITY_TO_LATEST_ENTITY_MAP
);
await importStateCmd.initIndexer(Indexer, EventWatcher, graphWatcher);
await importStateCmd.initIndexer(Indexer, graphWatcher);
await importStateCmd.exec(State, graphDb);
};

View File

@ -27,6 +27,7 @@ const main = async (): Promise<void> => {
);
await indexBlockCmd.initIndexer(Indexer, graphWatcher);
await indexBlockCmd.exec();
};

View File

@ -27,6 +27,7 @@ const main = async (): Promise<void> => {
);
await inspectCIDCmd.initIndexer(Indexer, graphWatcher);
await inspectCIDCmd.exec();
};

View File

@ -32,5 +32,6 @@ export const handler = async (argv: any): Promise<void> => {
);
await resetWatcherCmd.initIndexer(Indexer, graphWatcher);
await resetWatcherCmd.exec();
};

View File

@ -27,6 +27,7 @@ const main = async (): Promise<void> => {
);
await watchContractCmd.initIndexer(Indexer, graphWatcher);
await watchContractCmd.exec();
};

View File

@ -1,70 +0,0 @@
//
// Copyright 2021 Vulcanize, Inc.
//
import assert from 'assert';
import { PubSub } from 'graphql-subscriptions';
import { EthClient } from '@cerc-io/ipld-eth-client';
import {
JobQueue,
EventWatcher as BaseEventWatcher,
EventWatcherInterface,
QUEUE_BLOCK_PROCESSING,
QUEUE_EVENT_PROCESSING,
IndexerInterface
} from '@cerc-io/util';
import { Indexer } from './indexer';
export class EventWatcher implements EventWatcherInterface {
_ethClient: EthClient
_indexer: Indexer
_subscription: ZenObservable.Subscription | undefined
_baseEventWatcher: BaseEventWatcher
_pubsub: PubSub
_jobQueue: JobQueue
constructor (ethClient: EthClient, indexer: IndexerInterface, pubsub: PubSub, jobQueue: JobQueue) {
assert(ethClient);
assert(indexer);
this._ethClient = ethClient;
this._indexer = indexer as Indexer;
this._pubsub = pubsub;
this._jobQueue = jobQueue;
this._baseEventWatcher = new BaseEventWatcher(this._ethClient, this._indexer, this._pubsub, this._jobQueue);
}
getEventIterator (): AsyncIterator<any> {
return this._baseEventWatcher.getEventIterator();
}
getBlockProgressEventIterator (): AsyncIterator<any> {
return this._baseEventWatcher.getBlockProgressEventIterator();
}
async start (): Promise<void> {
assert(!this._subscription, 'subscription already started');
await this.initBlockProcessingOnCompleteHandler();
await this.initEventProcessingOnCompleteHandler();
this._baseEventWatcher.startBlockProcessing();
}
async stop (): Promise<void> {
this._baseEventWatcher.stop();
}
async initBlockProcessingOnCompleteHandler (): Promise<void> {
this._jobQueue.onComplete(QUEUE_BLOCK_PROCESSING, async (job) => {
await this._baseEventWatcher.blockProcessingCompleteHandler(job);
});
}
async initEventProcessingOnCompleteHandler (): Promise<void> {
await this._jobQueue.onComplete(QUEUE_EVENT_PROCESSING, async (job) => {
await this._baseEventWatcher.eventProcessingCompleteHandler(job);
});
}
}

View File

@ -2,7 +2,6 @@
// Copyright 2021 Vulcanize, Inc.
//
import assert from 'assert';
import 'reflect-metadata';
import debug from 'debug';
@ -12,7 +11,6 @@ import { getGraphDbAndWatcher } from '@cerc-io/graph-node';
import { Database, ENTITY_QUERY_TYPE_MAP, ENTITY_TO_LATEST_ENTITY_MAP } from './database';
import { Indexer } from './indexer';
import { EventWatcher } from './events';
const log = debug('vulcanize:fill');
@ -29,7 +27,7 @@ export const main = async (): Promise<any> => {
ENTITY_TO_LATEST_ENTITY_MAP
);
await fillCmd.initIndexer(Indexer, EventWatcher, graphWatcher);
await fillCmd.initIndexer(Indexer, graphWatcher);
// Get contractEntitiesMap required for fill-state
// NOTE: Assuming each entity type is only mapped to a single contract

View File

@ -8,11 +8,9 @@ import debug from 'debug';
import Decimal from 'decimal.js';
import { GraphQLResolveInfo, GraphQLScalarType } from 'graphql';
import { BlockHeight, OrderDirection, gqlTotalQueryCount, gqlQueryCount, jsonBigIntStringReplacer, getResultState, setGQLCacheHints, IndexerInterface, EventWatcherInterface } from '@cerc-io/util';
import { BlockHeight, OrderDirection, gqlTotalQueryCount, gqlQueryCount, jsonBigIntStringReplacer, getResultState, setGQLCacheHints, IndexerInterface, EventWatcher } from '@cerc-io/util';
import { Indexer } from './indexer';
import { EventWatcher } from './events';
import { Producer } from './entity/Producer';
import { ProducerSet } from './entity/ProducerSet';
import { ProducerSetChange } from './entity/ProducerSetChange';
@ -34,9 +32,8 @@ import { Account } from './entity/Account';
const log = debug('vulcanize:resolver');
export const createResolvers = async (indexerArg: IndexerInterface, eventWatcherArg: EventWatcherInterface): Promise<any> => {
export const createResolvers = async (indexerArg: IndexerInterface, eventWatcher: EventWatcher): Promise<any> => {
const indexer = indexerArg as Indexer;
const eventWatcher = eventWatcherArg as EventWatcher;
const gqlCacheConfig = indexer.serverConfig.gqlCache;

View File

@ -13,7 +13,6 @@ import { getGraphDbAndWatcher } from '@cerc-io/graph-node';
import { createResolvers } from './resolvers';
import { Indexer } from './indexer';
import { Database, ENTITY_QUERY_TYPE_MAP, ENTITY_TO_LATEST_ENTITY_MAP } from './database';
import { EventWatcher } from './events';
const log = debug('vulcanize:server');
@ -30,7 +29,7 @@ export const main = async (): Promise<any> => {
ENTITY_TO_LATEST_ENTITY_MAP
);
await serverCmd.initIndexer(Indexer, EventWatcher, graphWatcher);
await serverCmd.initIndexer(Indexer, graphWatcher);
const typeDefs = fs.readFileSync(path.join(__dirname, 'schema.gql')).toString();
return serverCmd.exec(createResolvers, typeDefs);

View File

@ -53,7 +53,6 @@
"express": "^4.18.2",
"graphql": "^15.5.0",
"graphql-request": "^3.4.0",
"graphql-subscriptions": "^2.0.0",
"json-bigint": "^1.0.0",
"lodash": "^4.17.21",
"reflect-metadata": "^0.1.13",

View File

@ -21,5 +21,6 @@ export const handler = async (argv: any): Promise<void> => {
const resetWatcherCmd = new ResetWatcherCmd();
await resetWatcherCmd.init(argv, Database);
await resetWatcherCmd.initIndexer(Indexer);
await resetWatcherCmd.exec();
};

View File

@ -16,6 +16,7 @@ const main = async (): Promise<void> => {
const watchContractCmd = new WatchContractCmd();
await watchContractCmd.init(Database);
await watchContractCmd.initIndexer(Indexer);
await watchContractCmd.exec();
};

View File

@ -1,70 +0,0 @@
//
// Copyright 2021 Vulcanize, Inc.
//
import assert from 'assert';
import { PubSub } from 'graphql-subscriptions';
import { EthClient } from '@cerc-io/ipld-eth-client';
import {
JobQueue,
EventWatcher as BaseEventWatcher,
EventWatcherInterface,
QUEUE_BLOCK_PROCESSING,
QUEUE_EVENT_PROCESSING,
IndexerInterface
} from '@cerc-io/util';
import { Indexer } from './indexer';
export class EventWatcher implements EventWatcherInterface {
_ethClient: EthClient
_indexer: Indexer
_subscription: ZenObservable.Subscription | undefined
_baseEventWatcher: BaseEventWatcher
_pubsub: PubSub
_jobQueue: JobQueue
constructor (ethClient: EthClient, indexer: IndexerInterface, pubsub: PubSub, jobQueue: JobQueue) {
assert(ethClient);
assert(indexer);
this._ethClient = ethClient;
this._indexer = indexer as Indexer;
this._pubsub = pubsub;
this._jobQueue = jobQueue;
this._baseEventWatcher = new BaseEventWatcher(this._ethClient, this._indexer, this._pubsub, this._jobQueue);
}
getEventIterator (): AsyncIterator<any> {
return this._baseEventWatcher.getEventIterator();
}
getBlockProgressEventIterator (): AsyncIterator<any> {
return this._baseEventWatcher.getBlockProgressEventIterator();
}
async start (): Promise<void> {
assert(!this._subscription, 'subscription already started');
await this.initBlockProcessingOnCompleteHandler();
await this.initEventProcessingOnCompleteHandler();
this._baseEventWatcher.startBlockProcessing();
}
async stop (): Promise<void> {
this._baseEventWatcher.stop();
}
async initBlockProcessingOnCompleteHandler (): Promise<void> {
this._jobQueue.onComplete(QUEUE_BLOCK_PROCESSING, async (job) => {
await this._baseEventWatcher.blockProcessingCompleteHandler(job);
});
}
async initEventProcessingOnCompleteHandler (): Promise<void> {
await this._jobQueue.onComplete(QUEUE_EVENT_PROCESSING, async (job) => {
await this._baseEventWatcher.eventProcessingCompleteHandler(job);
});
}
}

View File

@ -9,14 +9,13 @@ import { FillCmd } from '@cerc-io/cli';
import { Database } from './database';
import { Indexer } from './indexer';
import { EventWatcher } from './events';
const log = debug('vulcanize:fill');
export const main = async (): Promise<any> => {
const fillCmd = new FillCmd();
await fillCmd.init(Database);
await fillCmd.initIndexer(Indexer, EventWatcher);
await fillCmd.initIndexer(Indexer);
await fillCmd.exec();
};

View File

@ -6,17 +6,15 @@ import assert from 'assert';
import BigInt from 'apollo-type-bigint';
import debug from 'debug';
import { EventWatcherInterface, IndexerInterface, ValueResult } from '@cerc-io/util';
import { EventWatcher, IndexerInterface, ValueResult } from '@cerc-io/util';
import { Indexer } from './indexer';
import { EventWatcher } from './events';
import { CONTRACT_KIND } from './utils/index';
const log = debug('vulcanize:resolver');
export const createResolvers = async (indexerArg: IndexerInterface, eventWatcherArg: EventWatcherInterface): Promise<any> => {
export const createResolvers = async (indexerArg: IndexerInterface, eventWatcher: EventWatcher): Promise<any> => {
const indexer = indexerArg as Indexer;
const eventWatcher = eventWatcherArg as EventWatcher;
return {
BigInt: new BigInt('bigInt'),

View File

@ -13,14 +13,13 @@ import typeDefs from './schema';
import { createResolvers } from './resolvers';
import { Indexer } from './indexer';
import { Database } from './database';
import { EventWatcher } from './events';
const log = debug('vulcanize:server');
export const main = async (): Promise<any> => {
const serverCmd = new ServerCmd();
await serverCmd.init(Database);
await serverCmd.initIndexer(Indexer, EventWatcher);
await serverCmd.initIndexer(Indexer);
return serverCmd.exec(createResolvers, typeDefs);
};

View File

@ -58,7 +58,6 @@
"ethers": "^5.4.4",
"express": "^4.18.2",
"graphql": "^15.5.0",
"graphql-subscriptions": "^2.0.0",
"json-bigint": "^1.0.0",
"reflect-metadata": "^0.1.13",
"typeorm": "^0.2.32",

View File

@ -28,5 +28,6 @@ export const handler = async (argv: any): Promise<void> => {
const createCheckpointCmd = new CreateCheckpointCmd();
await createCheckpointCmd.init(argv, Database);
await createCheckpointCmd.initIndexer(Indexer);
await createCheckpointCmd.exec();
};

View File

@ -16,6 +16,7 @@ const main = async (): Promise<void> => {
const exportStateCmd = new ExportStateCmd();
await exportStateCmd.init(Database);
await exportStateCmd.initIndexer(Indexer);
await exportStateCmd.exec();
};

View File

@ -9,7 +9,6 @@ import { ImportStateCmd } from '@cerc-io/cli';
import { Database } from '../database';
import { Indexer } from '../indexer';
import { EventWatcher } from '../events';
import { State } from '../entity/State';
const log = debug('vulcanize:import-state');
@ -17,7 +16,8 @@ const log = debug('vulcanize:import-state');
export const main = async (): Promise<any> => {
const importStateCmd = new ImportStateCmd();
await importStateCmd.init(Database);
await importStateCmd.initIndexer(Indexer, EventWatcher);
await importStateCmd.initIndexer(Indexer);
await importStateCmd.exec(State);
};

View File

@ -16,6 +16,7 @@ const main = async (): Promise<void> => {
const indexBlockCmd = new IndexBlockCmd();
await indexBlockCmd.init(Database);
await indexBlockCmd.initIndexer(Indexer);
await indexBlockCmd.exec();
};

View File

@ -16,6 +16,7 @@ const main = async (): Promise<void> => {
const inspectCIDCmd = new InspectCIDCmd();
await inspectCIDCmd.init(Database);
await inspectCIDCmd.initIndexer(Indexer);
await inspectCIDCmd.exec();
};

View File

@ -21,5 +21,6 @@ export const handler = async (argv: any): Promise<void> => {
const resetWatcherCmd = new ResetWatcherCmd();
await resetWatcherCmd.init(argv, Database);
await resetWatcherCmd.initIndexer(Indexer);
await resetWatcherCmd.exec();
};

View File

@ -16,6 +16,7 @@ const main = async (): Promise<void> => {
const watchContractCmd = new WatchContractCmd();
await watchContractCmd.init(Database);
await watchContractCmd.initIndexer(Indexer);
await watchContractCmd.exec();
};

View File

@ -1,70 +0,0 @@
//
// Copyright 2021 Vulcanize, Inc.
//
import assert from 'assert';
import { PubSub } from 'graphql-subscriptions';
import { EthClient } from '@cerc-io/ipld-eth-client';
import {
JobQueue,
EventWatcher as BaseEventWatcher,
EventWatcherInterface,
QUEUE_BLOCK_PROCESSING,
QUEUE_EVENT_PROCESSING,
IndexerInterface
} from '@cerc-io/util';
import { Indexer } from './indexer';
export class EventWatcher implements EventWatcherInterface {
_ethClient: EthClient
_indexer: Indexer
_subscription: ZenObservable.Subscription | undefined
_baseEventWatcher: BaseEventWatcher
_pubsub: PubSub
_jobQueue: JobQueue
constructor (ethClient: EthClient, indexer: IndexerInterface, pubsub: PubSub, jobQueue: JobQueue) {
assert(ethClient);
assert(indexer);
this._ethClient = ethClient;
this._indexer = indexer as Indexer;
this._pubsub = pubsub;
this._jobQueue = jobQueue;
this._baseEventWatcher = new BaseEventWatcher(this._ethClient, this._indexer, this._pubsub, this._jobQueue);
}
getEventIterator (): AsyncIterator<any> {
return this._baseEventWatcher.getEventIterator();
}
getBlockProgressEventIterator (): AsyncIterator<any> {
return this._baseEventWatcher.getBlockProgressEventIterator();
}
async start (): Promise<void> {
assert(!this._subscription, 'subscription already started');
await this.initBlockProcessingOnCompleteHandler();
await this.initEventProcessingOnCompleteHandler();
this._baseEventWatcher.startBlockProcessing();
}
async stop (): Promise<void> {
this._baseEventWatcher.stop();
}
async initBlockProcessingOnCompleteHandler (): Promise<void> {
this._jobQueue.onComplete(QUEUE_BLOCK_PROCESSING, async (job) => {
await this._baseEventWatcher.blockProcessingCompleteHandler(job);
});
}
async initEventProcessingOnCompleteHandler (): Promise<void> {
await this._jobQueue.onComplete(QUEUE_EVENT_PROCESSING, async (job) => {
await this._baseEventWatcher.eventProcessingCompleteHandler(job);
});
}
}

View File

@ -9,7 +9,6 @@ import { FillCmd } from '@cerc-io/cli';
import { Database } from './database';
import { Indexer } from './indexer';
import { EventWatcher } from './events';
const log = debug('vulcanize:fill');
@ -17,7 +16,7 @@ export const main = async (): Promise<any> => {
const fillCmd = new FillCmd();
await fillCmd.init(Database);
await fillCmd.initIndexer(Indexer, EventWatcher);
await fillCmd.initIndexer(Indexer);
await fillCmd.exec();
};

View File

@ -8,16 +8,14 @@ import debug from 'debug';
import Decimal from 'decimal.js';
import { GraphQLScalarType } from 'graphql';
import { ValueResult, BlockHeight, getResultState, IndexerInterface, EventWatcherInterface } from '@cerc-io/util';
import { ValueResult, BlockHeight, getResultState, IndexerInterface, EventWatcher } from '@cerc-io/util';
import { Indexer } from './indexer';
import { EventWatcher } from './events';
const log = debug('vulcanize:resolver');
export const createResolvers = async (indexerArg: IndexerInterface, eventWatcherArg: EventWatcherInterface): Promise<any> => {
export const createResolvers = async (indexerArg: IndexerInterface, eventWatcher: EventWatcher): Promise<any> => {
const indexer = indexerArg as Indexer;
const eventWatcher = eventWatcherArg as EventWatcher;
return {
BigInt: new BigInt('bigInt'),

View File

@ -12,14 +12,13 @@ import { ServerCmd } from '@cerc-io/cli';
import { createResolvers } from './resolvers';
import { Indexer } from './indexer';
import { Database } from './database';
import { EventWatcher } from './events';
const log = debug('vulcanize:server');
export const main = async (): Promise<any> => {
const serverCmd = new ServerCmd();
await serverCmd.init(Database);
await serverCmd.initIndexer(Indexer, EventWatcher);
await serverCmd.initIndexer(Indexer);
const typeDefs = fs.readFileSync(path.join(__dirname, 'schema.gql')).toString();
return serverCmd.exec(createResolvers, typeDefs);

View File

@ -51,7 +51,6 @@
"ethers": "^5.4.4",
"express": "^4.18.2",
"graphql": "^15.5.0",
"graphql-subscriptions": "^2.0.0",
"json-bigint": "^1.0.0",
"lodash": "^4.17.21",
"reflect-metadata": "^0.1.13",

View File

@ -39,5 +39,6 @@ export const handler = async (argv: any): Promise<void> => {
);
await createCheckpointCmd.initIndexer(Indexer, graphWatcher);
await createCheckpointCmd.exec();
};

View File

@ -35,5 +35,6 @@ export const handler = async (argv: any): Promise<void> => {
);
await verifyCheckpointCmd.initIndexer(Indexer, graphWatcher);
await verifyCheckpointCmd.exec(graphDb);
};

View File

@ -27,6 +27,7 @@ const main = async (): Promise<void> => {
);
await exportStateCmd.initIndexer(Indexer, graphWatcher);
await exportStateCmd.exec();
};

View File

@ -10,7 +10,6 @@ import { getGraphDbAndWatcher } from '@cerc-io/graph-node';
import { Database, ENTITY_QUERY_TYPE_MAP, ENTITY_TO_LATEST_ENTITY_MAP } from '../database';
import { Indexer } from '../indexer';
import { EventWatcher } from '../events';
import { State } from '../entity/State';
const log = debug('vulcanize:import-state');
@ -28,7 +27,8 @@ export const main = async (): Promise<any> => {
ENTITY_TO_LATEST_ENTITY_MAP
);
await importStateCmd.initIndexer(Indexer, EventWatcher, graphWatcher);
await importStateCmd.initIndexer(Indexer, graphWatcher);
await importStateCmd.exec(State, graphDb);
};

View File

@ -27,6 +27,7 @@ const main = async (): Promise<void> => {
);
await indexBlockCmd.initIndexer(Indexer, graphWatcher);
await indexBlockCmd.exec();
};

View File

@ -27,6 +27,7 @@ const main = async (): Promise<void> => {
);
await inspectCIDCmd.initIndexer(Indexer, graphWatcher);
await inspectCIDCmd.exec();
};

View File

@ -32,5 +32,6 @@ export const handler = async (argv: any): Promise<void> => {
);
await resetWatcherCmd.initIndexer(Indexer, graphWatcher);
await resetWatcherCmd.exec();
};

View File

@ -27,6 +27,7 @@ const main = async (): Promise<void> => {
);
await watchContractCmd.initIndexer(Indexer, graphWatcher);
await watchContractCmd.exec();
};

View File

@ -1,70 +0,0 @@
//
// Copyright 2021 Vulcanize, Inc.
//
import assert from 'assert';
import { PubSub } from 'graphql-subscriptions';
import { EthClient } from '@cerc-io/ipld-eth-client';
import {
JobQueue,
EventWatcher as BaseEventWatcher,
EventWatcherInterface,
QUEUE_BLOCK_PROCESSING,
QUEUE_EVENT_PROCESSING,
IndexerInterface
} from '@cerc-io/util';
import { Indexer } from './indexer';
export class EventWatcher implements EventWatcherInterface {
_ethClient: EthClient
_indexer: Indexer
_subscription: ZenObservable.Subscription | undefined
_baseEventWatcher: BaseEventWatcher
_pubsub: PubSub
_jobQueue: JobQueue
constructor (ethClient: EthClient, indexer: IndexerInterface, pubsub: PubSub, jobQueue: JobQueue) {
assert(ethClient);
assert(indexer);
this._ethClient = ethClient;
this._indexer = indexer as Indexer;
this._pubsub = pubsub;
this._jobQueue = jobQueue;
this._baseEventWatcher = new BaseEventWatcher(this._ethClient, this._indexer, this._pubsub, this._jobQueue);
}
getEventIterator (): AsyncIterator<any> {
return this._baseEventWatcher.getEventIterator();
}
getBlockProgressEventIterator (): AsyncIterator<any> {
return this._baseEventWatcher.getBlockProgressEventIterator();
}
async start (): Promise<void> {
assert(!this._subscription, 'subscription already started');
await this.initBlockProcessingOnCompleteHandler();
await this.initEventProcessingOnCompleteHandler();
this._baseEventWatcher.startBlockProcessing();
}
async stop (): Promise<void> {
this._baseEventWatcher.stop();
}
async initBlockProcessingOnCompleteHandler (): Promise<void> {
this._jobQueue.onComplete(QUEUE_BLOCK_PROCESSING, async (job) => {
await this._baseEventWatcher.blockProcessingCompleteHandler(job);
});
}
async initEventProcessingOnCompleteHandler (): Promise<void> {
await this._jobQueue.onComplete(QUEUE_EVENT_PROCESSING, async (job) => {
await this._baseEventWatcher.eventProcessingCompleteHandler(job);
});
}
}

View File

@ -2,7 +2,6 @@
// Copyright 2021 Vulcanize, Inc.
//
import assert from 'assert';
import 'reflect-metadata';
import debug from 'debug';
@ -12,7 +11,6 @@ import { getGraphDbAndWatcher } from '@cerc-io/graph-node';
import { Database, ENTITY_QUERY_TYPE_MAP, ENTITY_TO_LATEST_ENTITY_MAP } from './database';
import { Indexer } from './indexer';
import { EventWatcher } from './events';
const log = debug('vulcanize:fill');
@ -29,7 +27,7 @@ export const main = async (): Promise<any> => {
ENTITY_TO_LATEST_ENTITY_MAP
);
await fillCmd.initIndexer(Indexer, EventWatcher, graphWatcher);
await fillCmd.initIndexer(Indexer, graphWatcher);
// Get contractEntitiesMap required for fill-state
// NOTE: Assuming each entity type is only mapped to a single contract

View File

@ -17,11 +17,10 @@ import {
getResultState,
setGQLCacheHints,
IndexerInterface,
EventWatcherInterface
EventWatcher
} from '@cerc-io/util';
import { Indexer } from './indexer';
import { EventWatcher } from './events';
import { Author } from './entity/Author';
import { Blog } from './entity/Blog';
@ -29,9 +28,8 @@ import { Category } from './entity/Category';
const log = debug('vulcanize:resolver');
export const createResolvers = async (indexerArg: IndexerInterface, eventWatcherArg: EventWatcherInterface): Promise<any> => {
export const createResolvers = async (indexerArg: IndexerInterface, eventWatcher: EventWatcher): Promise<any> => {
const indexer = indexerArg as Indexer;
const eventWatcher = eventWatcherArg as EventWatcher;
const gqlCacheConfig = indexer.serverConfig.gqlCache;

View File

@ -13,7 +13,6 @@ import { getGraphDbAndWatcher } from '@cerc-io/graph-node';
import { createResolvers } from './resolvers';
import { Indexer } from './indexer';
import { Database, ENTITY_QUERY_TYPE_MAP, ENTITY_TO_LATEST_ENTITY_MAP } from './database';
import { EventWatcher } from './events';
const log = debug('vulcanize:server');
@ -30,7 +29,7 @@ export const main = async (): Promise<any> => {
ENTITY_TO_LATEST_ENTITY_MAP
);
await serverCmd.initIndexer(Indexer, EventWatcher, graphWatcher);
await serverCmd.initIndexer(Indexer, graphWatcher);
const typeDefs = fs.readFileSync(path.join(__dirname, 'schema.gql')).toString();
return serverCmd.exec(createResolvers, typeDefs);

View File

@ -49,7 +49,6 @@
"ethers": "^5.4.4",
"express": "^4.18.2",
"graphql": "^15.5.0",
"graphql-subscriptions": "^2.0.0",
"json-bigint": "^1.0.0",
"reflect-metadata": "^0.1.13",
"typeorm": "^0.2.32",

View File

@ -28,5 +28,6 @@ export const handler = async (argv: any): Promise<void> => {
const createCheckpointCmd = new CreateCheckpointCmd();
await createCheckpointCmd.init(argv, Database);
await createCheckpointCmd.initIndexer(Indexer);
await createCheckpointCmd.exec();
};

View File

@ -16,6 +16,7 @@ const main = async (): Promise<void> => {
const exportStateCmd = new ExportStateCmd();
await exportStateCmd.init(Database);
await exportStateCmd.initIndexer(Indexer);
await exportStateCmd.exec();
};

View File

@ -9,7 +9,6 @@ import { ImportStateCmd } from '@cerc-io/cli';
import { Database } from '../database';
import { Indexer } from '../indexer';
import { EventWatcher } from '../events';
import { State } from '../entity/State';
const log = debug('vulcanize:import-state');
@ -17,7 +16,8 @@ const log = debug('vulcanize:import-state');
export const main = async (): Promise<any> => {
const importStateCmd = new ImportStateCmd();
await importStateCmd.init(Database);
await importStateCmd.initIndexer(Indexer, EventWatcher);
await importStateCmd.initIndexer(Indexer);
await importStateCmd.exec(State);
};

View File

@ -16,6 +16,7 @@ const main = async (): Promise<void> => {
const indexBlockCmd = new IndexBlockCmd();
await indexBlockCmd.init(Database);
await indexBlockCmd.initIndexer(Indexer);
await indexBlockCmd.exec();
};

View File

@ -16,6 +16,7 @@ const main = async (): Promise<void> => {
const inspectCIDCmd = new InspectCIDCmd();
await inspectCIDCmd.init(Database);
await inspectCIDCmd.initIndexer(Indexer);
await inspectCIDCmd.exec();
};

View File

@ -21,5 +21,6 @@ export const handler = async (argv: any): Promise<void> => {
const resetWatcherCmd = new ResetWatcherCmd();
await resetWatcherCmd.init(argv, Database);
await resetWatcherCmd.initIndexer(Indexer);
await resetWatcherCmd.exec();
};

View File

@ -16,6 +16,7 @@ const main = async (): Promise<void> => {
const watchContractCmd = new WatchContractCmd();
await watchContractCmd.init(Database);
await watchContractCmd.initIndexer(Indexer);
await watchContractCmd.exec();
};

View File

@ -1,70 +0,0 @@
//
// Copyright 2021 Vulcanize, Inc.
//
import assert from 'assert';
import { PubSub } from 'graphql-subscriptions';
import { EthClient } from '@cerc-io/ipld-eth-client';
import {
JobQueue,
EventWatcher as BaseEventWatcher,
EventWatcherInterface,
QUEUE_BLOCK_PROCESSING,
QUEUE_EVENT_PROCESSING,
IndexerInterface
} from '@cerc-io/util';
import { Indexer } from './indexer';
export class EventWatcher implements EventWatcherInterface {
_ethClient: EthClient
_indexer: Indexer
_subscription: ZenObservable.Subscription | undefined
_baseEventWatcher: BaseEventWatcher
_pubsub: PubSub
_jobQueue: JobQueue
constructor (ethClient: EthClient, indexer: IndexerInterface, pubsub: PubSub, jobQueue: JobQueue) {
assert(ethClient);
assert(indexer);
this._ethClient = ethClient;
this._indexer = indexer as Indexer;
this._pubsub = pubsub;
this._jobQueue = jobQueue;
this._baseEventWatcher = new BaseEventWatcher(this._ethClient, this._indexer, this._pubsub, this._jobQueue);
}
getEventIterator (): AsyncIterator<any> {
return this._baseEventWatcher.getEventIterator();
}
getBlockProgressEventIterator (): AsyncIterator<any> {
return this._baseEventWatcher.getBlockProgressEventIterator();
}
async start (): Promise<void> {
assert(!this._subscription, 'subscription already started');
await this.initBlockProcessingOnCompleteHandler();
await this.initEventProcessingOnCompleteHandler();
this._baseEventWatcher.startBlockProcessing();
}
async stop (): Promise<void> {
this._baseEventWatcher.stop();
}
async initBlockProcessingOnCompleteHandler (): Promise<void> {
this._jobQueue.onComplete(QUEUE_BLOCK_PROCESSING, async (job) => {
await this._baseEventWatcher.blockProcessingCompleteHandler(job);
});
}
async initEventProcessingOnCompleteHandler (): Promise<void> {
await this._jobQueue.onComplete(QUEUE_EVENT_PROCESSING, async (job) => {
await this._baseEventWatcher.eventProcessingCompleteHandler(job);
});
}
}

View File

@ -9,15 +9,14 @@ import { FillCmd } from '@cerc-io/cli';
import { Database } from './database';
import { Indexer } from './indexer';
import { EventWatcher } from './events';
const log = debug('vulcanize:fill');
export const main = async (): Promise<any> => {
const fillCmd = new FillCmd();
await fillCmd.init(Database);
await fillCmd.initIndexer(Indexer);
await fillCmd.initIndexer(Indexer, EventWatcher);
await fillCmd.exec();
};

View File

@ -8,16 +8,14 @@ import debug from 'debug';
import Decimal from 'decimal.js';
import { GraphQLScalarType } from 'graphql';
import { ValueResult, gqlTotalQueryCount, gqlQueryCount, getResultState, IndexerInterface, EventWatcherInterface } from '@cerc-io/util';
import { ValueResult, gqlTotalQueryCount, gqlQueryCount, getResultState, IndexerInterface, EventWatcher } from '@cerc-io/util';
import { Indexer } from './indexer';
import { EventWatcher } from './events';
const log = debug('vulcanize:resolver');
export const createResolvers = async (indexerArg: IndexerInterface, eventWatcherArg: EventWatcherInterface): Promise<any> => {
export const createResolvers = async (indexerArg: IndexerInterface, eventWatcher: EventWatcher): Promise<any> => {
const indexer = indexerArg as Indexer;
const eventWatcher = eventWatcherArg as EventWatcher;
return {
BigInt: new BigInt('bigInt'),

View File

@ -12,14 +12,13 @@ import { ServerCmd } from '@cerc-io/cli';
import { createResolvers } from './resolvers';
import { Indexer } from './indexer';
import { Database } from './database';
import { EventWatcher } from './events';
const log = debug('vulcanize:server');
export const main = async (): Promise<any> => {
const serverCmd = new ServerCmd();
await serverCmd.init(Database);
await serverCmd.initIndexer(Indexer, EventWatcher);
await serverCmd.initIndexer(Indexer);
const typeDefs = fs.readFileSync(path.join(__dirname, 'schema.gql')).toString();
return serverCmd.exec(createResolvers, typeDefs);

View File

@ -44,7 +44,6 @@
"@types/ws": "^8.5.3",
"@typescript-eslint/eslint-plugin": "^4.25.0",
"@typescript-eslint/parser": "^4.25.0",
"apollo-server-express": "^3.11.1",
"decimal.js": "^10.3.1",
"eslint": "^7.27.0",
"eslint-config-semistandard": "^15.0.1",

View File

@ -23,7 +23,6 @@ export const BlockProgressEvent = 'block-progress-event';
export class EventWatcher {
_ethClient: EthClient
_indexer: IndexerInterface
_subscription?: ZenObservable.Subscription
_pubsub: PubSub
_jobQueue: JobQueue
@ -42,11 +41,22 @@ export class EventWatcher {
return this._pubsub.asyncIterator([BlockProgressEvent]);
}
async stop (): Promise<void> {
if (this._subscription) {
log('Stopped watching upstream blocks');
this._subscription.unsubscribe();
}
async start (): Promise<void> {
await this.initBlockProcessingOnCompleteHandler();
await this.initEventProcessingOnCompleteHandler();
this.startBlockProcessing();
}
async initBlockProcessingOnCompleteHandler (): Promise<void> {
this._jobQueue.onComplete(QUEUE_BLOCK_PROCESSING, async (job) => {
await this.blockProcessingCompleteHandler(job);
});
}
async initEventProcessingOnCompleteHandler (): Promise<void> {
await this._jobQueue.onComplete(QUEUE_EVENT_PROCESSING, async (job) => {
await this.eventProcessingCompleteHandler(job);
});
}
async startBlockProcessing (): Promise<void> {

View File

@ -5,17 +5,18 @@
import debug from 'debug';
import { JobQueue } from './job-queue';
import { EventWatcherInterface, IndexerInterface } from './types';
import { IndexerInterface } from './types';
import { wait } from './misc';
import { processBlockByNumber } from './common';
import { DEFAULT_PREFETCH_BATCH_SIZE } from './constants';
import { EventWatcher } from './events';
const log = debug('vulcanize:fill');
export const fillBlocks = async (
jobQueue: JobQueue,
indexer: IndexerInterface,
eventWatcher: EventWatcherInterface,
eventWatcher: EventWatcher,
blockDelayInMilliSecs: number,
argv: {
startBlock: number,

View File

@ -140,13 +140,6 @@ export interface IndexerInterface {
getResultEvent (event: EventInterface): any
}
export interface EventWatcherInterface {
start (): Promise<void>
getBlockProgressEventIterator (): AsyncIterator<any>
initBlockProcessingOnCompleteHandler (): Promise<void>
initEventProcessingOnCompleteHandler (): Promise<void>
}
export interface DatabaseInterface {
_conn: Connection;
readonly baseDatabase: Database