watcher-ts/packages/eden-watcher/src/server.ts

94 lines
3.0 KiB
TypeScript
Raw Normal View History

//
// Copyright 2021 Vulcanize, Inc.
//
import fs from 'fs';
import path from 'path';
import assert from 'assert';
import 'reflect-metadata';
import express, { Application } from 'express';
import { PubSub } from 'graphql-subscriptions';
import yargs from 'yargs';
import { hideBin } from 'yargs/helpers';
import debug from 'debug';
import 'graphql-import-node';
import { DEFAULT_CONFIG_PATH, getConfig, Config, JobQueue, KIND_ACTIVE, initClients, startGQLMetricsServer, createAndStartServer } from '@cerc-io/util';
import { GraphWatcher, Database as GraphDatabase } from '@cerc-io/graph-node';
import { createResolvers } from './resolvers';
import { Indexer } from './indexer';
import { Database, ENTITY_TO_LATEST_ENTITY_MAP } from './database';
import { EventWatcher } from './events';
// Module-wide logger created with the `debug` package under the 'vulcanize:server' namespace.
const log = debug('vulcanize:server');
/**
 * Boots the watcher server process.
 *
 * Loads the config file named by the `-f` / `--config-file` CLI option, then
 * constructs and initialises, in this order: the watcher database, the graph
 * database, the graph watcher, the job queue, the indexer and the event watcher.
 * When the configured watcher kind equals `KIND_ACTIVE`, the job queue is started,
 * its jobs are deleted and the event watcher is started. Finally the schema and
 * resolvers are handed to `createAndStartServer` and `startGQLMetricsServer` is called.
 *
 * @returns An object with the Express `app` and the value returned by
 *   `createAndStartServer` as `server`.
 * @throws AssertionError when the job queue config, or its db connection string, is missing.
 */
export const main = async (): Promise<any> => {
  // --- Command line ---------------------------------------------------------
  const cliArgs = await yargs(hideBin(process.argv))
    .option('f', {
      alias: 'config-file',
      demandOption: true,
      describe: 'configuration file path (toml)',
      type: 'string',
      default: DEFAULT_CONFIG_PATH
    })
    .argv;

  // --- Config and upstream clients -------------------------------------------
  const config: Config = await getConfig(cliArgs.f);
  const clients = await initClients(config);
  const ethClient = clients.ethClient;
  const ethProvider = clients.ethProvider;

  const serverConfig = config.server;
  const watcherKind = serverConfig.kind;

  // --- Storage ----------------------------------------------------------------
  const database = new Database(config.database);
  await database.init();

  const graphDatabase = new GraphDatabase(serverConfig, database.baseDatabase, ENTITY_TO_LATEST_ENTITY_MAP);
  await graphDatabase.init();

  const graphWatcher = new GraphWatcher(graphDatabase, ethClient, ethProvider, serverConfig);

  // An in-memory pubsub is sufficient for the moment because every watcher runs as a single process.
  // Possible replacements: https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries
  const pubsub = new PubSub();

  // --- Job queue --------------------------------------------------------------
  const queueConfig = config.jobQueue;
  assert(queueConfig, 'Missing job queue config');

  const connectionString = queueConfig.dbConnectionString;
  const maxCompletionLag = queueConfig.maxCompletionLagInSecs;
  assert(connectionString, 'Missing job queue db connection string');

  const jobQueue = new JobQueue({ dbConnectionString: connectionString, maxCompletionLag });

  // --- Indexer and watchers ---------------------------------------------------
  const indexer = new Indexer(serverConfig, database, { ethClient }, ethProvider, jobQueue, graphWatcher);
  await indexer.init();

  // The graph watcher is given the indexer before it is initialised.
  graphWatcher.setIndexer(indexer);
  await graphWatcher.init();

  const eventWatcher = new EventWatcher(ethClient, indexer, pubsub, jobQueue);

  const isActiveWatcher = watcherKind === KIND_ACTIVE;
  if (isActiveWatcher) {
    await jobQueue.start();

    // Remove existing jobs so that no jobs get created after the previous block has finished processing.
    await jobQueue.deleteAllJobs();

    await eventWatcher.start();
  }

  // --- GQL server ---------------------------------------------------------------
  const resolvers = await createResolvers(indexer, eventWatcher);

  const schemaPath = path.join(__dirname, 'schema.gql');
  const typeDefs = fs.readFileSync(schemaPath).toString();

  // Express application that createAndStartServer receives.
  const app: Application = express();
  const server = createAndStartServer(app, typeDefs, resolvers, serverConfig);

  startGQLMetricsServer(config);

  return { app, server };
};
// Entry point: run the bootstrap as soon as this module is loaded.
main().then(() => {
  log('Starting server...');
}).catch(err => {
  // `log` is a `debug` logger, so depending on how logging is enabled this line may
  // print nothing. Record the failure in the exit code as well, so that whatever
  // launched the process can tell that startup did not succeed.
  log(err);
  process.exitCode = 1;
});