watcher-ts/packages/cli/src/server.ts

352 lines
9.4 KiB
TypeScript

//
// Copyright 2022 Vulcanize, Inc.
//
import debug from 'debug';
import path from 'path';
import yargs from 'yargs';
import { hideBin } from 'yargs/helpers';
import 'reflect-metadata';
import assert from 'assert';
import { ConnectionOptions } from 'typeorm';
import express, { Application } from 'express';
import { ApolloServer } from 'apollo-server-express';
import winston from 'winston';
import { JsonRpcProvider } from '@ethersproject/providers';
import {
DEFAULT_CONFIG_PATH,
JobQueue,
DatabaseInterface,
IndexerInterface,
ServerConfig,
Clients,
KIND_ACTIVE,
createAndStartServer,
startGQLMetricsServer,
EventWatcher,
GraphWatcherInterface,
Config,
PaymentsManager,
Consensus,
readParty,
UpstreamConfig,
fillBlocks,
createGQLLogger,
createEthRPCHandlers
} from '@cerc-io/util';
import { TypeSource } from '@graphql-tools/utils';
import type {
RelayNodeInitConfig,
PeerInitConfig,
PeerIdObj,
Peer
// @ts-expect-error https://github.com/microsoft/TypeScript/issues/49721#issuecomment-1319854183
} from '@cerc-io/peer';
import { utils } from '@cerc-io/nitro-node';
// @ts-expect-error TODO: Resolve (Not able to find the type declarations)
import type { Libp2p } from '@cerc-io/libp2p';
import { BaseCmd } from './base';
import { readPeerId, getStartBlock } from './utils/index';
// Debug logger for this module, created under the 'vulcanize:server' namespace
const log = debug('vulcanize:server');
/**
 * Command line arguments consumed by the server command.
 */
interface Arguments {
// Configuration file path (TOML), supplied through the `-f` / `--config-file` option
configFile: string;
}
/**
 * Server command.
 *
 * Wraps a `BaseCmd` and exposes the steps needed to bring a server up:
 * loading config, initializing the database and indexer, optionally starting
 * p2p (relay / peer), consensus and Nitro nodes, and finally starting the
 * GQL server through `exec`.
 */
export class ServerCmd {
  _argv?: Arguments;
  _baseCmd: BaseCmd;
  _peer?: Peer;
  _nitro?: utils.Nitro;
  _consensus?: Consensus;

  constructor () {
    this._baseCmd = new BaseCmd();
  }

  /** Config held by the underlying base command. */
  get config (): Config {
    return this._baseCmd.config;
  }

  /** Clients held by the underlying base command. */
  get clients (): Clients {
    return this._baseCmd.clients;
  }

  /** Ethereum JSON-RPC provider held by the underlying base command. */
  get ethProvider (): JsonRpcProvider {
    return this._baseCmd.ethProvider;
  }

  /** Database held by the underlying base command. */
  get database (): DatabaseInterface {
    return this._baseCmd.database;
  }

  /** Peer node created by `initP2P`, or `undefined` if none was created. */
  get peer (): Peer | undefined {
    return this._peer;
  }

  /** Nitro node created by `initNitro`, or `undefined` if none was created. */
  get nitro (): utils.Nitro | undefined {
    return this._nitro;
  }

  /** Consensus engine created by `initConsensus`, or `undefined` if none was created. */
  get consensus (): Consensus | undefined {
    return this._consensus;
  }

  /**
   * Parses the command line arguments and loads the config from the given config file.
   *
   * @returns The config as returned by the base command.
   */
  async initConfig<ConfigType> (): Promise<ConfigType> {
    this._argv = this._getArgv();
    assert(this._argv);

    return this._baseCmd.initConfig(this._argv.configFile);
  }

  /**
   * Loads the config and then initializes the base command.
   *
   * @param Database - Database constructor forwarded to the base command.
   * @param clients - Additional clients forwarded to the base command.
   */
  async init (
    Database: new (
      config: ConnectionOptions,
      serverConfig?: ServerConfig
    ) => DatabaseInterface,
    clients: { [key: string]: any } = {}
  ): Promise<void> {
    await this.initConfig();
    await this._baseCmd.init(Database, clients);
  }

  /**
   * Initializes the indexer followed by the event watcher on the base command.
   *
   * @param Indexer - Indexer constructor forwarded to the base command.
   * @param graphWatcher - Optional graph watcher forwarded to the base command.
   */
  async initIndexer (
    Indexer: new (
      config: {
        server: ServerConfig;
        upstream: UpstreamConfig;
      },
      db: DatabaseInterface,
      clients: Clients,
      ethProvider: JsonRpcProvider,
      jobQueue: JobQueue,
      graphWatcher?: GraphWatcherInterface
    ) => IndexerInterface,
    graphWatcher?: GraphWatcherInterface
  ): Promise<void> {
    await this._baseCmd.initIndexer(Indexer, graphWatcher);
    await this._baseCmd.initEventWatcher();
  }

  /**
   * Starts the p2p nodes described by `server.p2p` in the config.
   *
   * A relay node is created when `enableRelay` is set and a peer node when
   * `enablePeer` is set. The created peer is also stored on this command.
   *
   * @returns The created relay node and / or peer; an empty object when no
   * p2p config is present.
   */
  async initP2P (): Promise<{ relayNode?: Libp2p, peer?: Peer }> {
    let relayNode: Libp2p | undefined;

    // Start P2P nodes if config provided
    const p2pConfig = this._baseCmd.config.server.p2p;
    if (!p2pConfig) {
      return {};
    }

    // Loaded dynamically (single import for both the factories and the default constants)
    const {
      createRelayNode,
      Peer,
      RELAY_DEFAULT_HOST,
      RELAY_DEFAULT_PORT,
      RELAY_DEFAULT_MAX_DIAL_RETRY,
      RELAY_REDIAL_INTERVAL,
      DEFAULT_PING_INTERVAL,
      DIAL_TIMEOUT
    } = await import('@cerc-io/peer');

    // Run the relay node if enabled
    if (p2pConfig.enableRelay) {
      const relayConfig = p2pConfig.relay;
      assert(relayConfig, 'Relay config not set');

      let peerIdObj: PeerIdObj | undefined;
      if (relayConfig.peerIdFile) {
        peerIdObj = readPeerId(relayConfig.peerIdFile);
      }

      // Unset relay options fall back to the defaults exported by the peer package
      const relayNodeInit: RelayNodeInitConfig = {
        host: relayConfig.host ?? RELAY_DEFAULT_HOST,
        port: relayConfig.port ?? RELAY_DEFAULT_PORT,
        announceDomain: relayConfig.announce,
        relayPeers: relayConfig.relayPeers ?? [],
        denyMultiaddrs: relayConfig.denyMultiaddrs ?? [],
        dialTimeout: relayConfig.dialTimeout ?? DIAL_TIMEOUT,
        pingInterval: relayConfig.pingInterval ?? DEFAULT_PING_INTERVAL,
        redialInterval: relayConfig.redialInterval ?? RELAY_REDIAL_INTERVAL,
        maxDialRetry: relayConfig.maxDialRetry ?? RELAY_DEFAULT_MAX_DIAL_RETRY,
        peerIdObj,
        pubsub: relayConfig.pubsub,
        enableDebugInfo: relayConfig.enableDebugInfo
      };

      relayNode = await createRelayNode(relayNodeInit);
    }

    // Run a peer node if enabled
    if (p2pConfig.enablePeer) {
      const peerConfig = p2pConfig.peer;
      assert(peerConfig, 'Peer config not set');

      let peerIdObj: PeerIdObj | undefined;
      if (peerConfig.peerIdFile) {
        peerIdObj = readPeerId(peerConfig.peerIdFile);
      }

      this._peer = new Peer(peerConfig.relayMultiaddr, true);

      const peerNodeInit: PeerInitConfig = {
        pingInterval: peerConfig.pingInterval,
        pingTimeout: peerConfig.pingTimeout,
        denyMultiaddrs: peerConfig.denyMultiaddrs,
        maxRelayConnections: peerConfig.maxRelayConnections,
        relayRedialInterval: peerConfig.relayRedialInterval,
        maxConnections: peerConfig.maxConnections,
        dialTimeout: peerConfig.dialTimeout,
        pubsub: peerConfig.pubsub,
        directPeers: peerConfig.directPeers,
        enableDebugInfo: peerConfig.enableDebugInfo
      };

      await this._peer.init(peerNodeInit, peerIdObj);
      log(`Peer ID: ${this._peer.peerId?.toString()}`);
    }

    return { relayNode, peer: this._peer };
  }

  /**
   * Sets up the consensus engine when it is enabled in the p2p config.
   *
   * Does nothing when the p2p or consensus config is missing, or when either
   * the peer or the consensus engine is disabled. Requires the peer created
   * by `initP2P`.
   *
   * @returns The started consensus engine, or `undefined` when not set up.
   */
  async initConsensus (): Promise<Consensus | undefined> {
    const p2pConfig = this._baseCmd.config.server.p2p;
    if (!p2pConfig || !p2pConfig.consensus) {
      return;
    }

    const { consensus: consensusConfig } = p2pConfig;

    // Setup consensus engine if enabled
    // Consensus requires p2p peer to be enabled
    if (!p2pConfig.enablePeer || !consensusConfig.enabled) {
      return;
    }

    assert(this.peer, 'Peer not initialized; call initP2P() before initConsensus()');
    const watcherPartyPeers = readParty(consensusConfig.watcherPartyPeersFile);

    // Create and initialize the consensus engine
    this._consensus = new Consensus({
      peer: this.peer,
      publicKey: consensusConfig.publicKey,
      privateKey: consensusConfig.privateKey,
      partyPeers: watcherPartyPeers
    });

    // Connect registers the required p2p protocol handlers and starts the engine
    this._consensus.connect();
    log('Consensus engine started');

    return this._consensus;
  }

  /**
   * Starts a Nitro node when the peer is enabled and a Nitro config is present.
   *
   * Does nothing when the p2p config is missing, the peer is disabled or no
   * Nitro config is set. Requires the peer created by `initP2P`.
   *
   * @param nitroContractAddresses - Contract addresses passed to the Nitro node setup.
   * @returns The started Nitro node, or `undefined` when not set up.
   */
  async initNitro (nitroContractAddresses: { [key: string]: string }): Promise<utils.Nitro | undefined> {
    // The p2p section may be absent from the config (same guard as initP2P / initConsensus);
    // destructuring it blindly would throw a TypeError instead of skipping Nitro setup
    const p2pConfig = this._baseCmd.config.server.p2p;
    const nitroConfig = p2pConfig?.nitro;

    // Nitro requires p2p peer to be enabled
    if (!p2pConfig?.enablePeer || !nitroConfig) {
      return;
    }

    assert(this.peer, 'Peer not initialized; call initP2P() before initNitro()');

    // Start a Nitro node
    log(`Using chain URL ${nitroConfig.chainUrl} for Nitro node`);
    this._nitro = await utils.Nitro.setupNode(
      nitroConfig.privateKey,
      nitroConfig.chainUrl,
      nitroConfig.chainPrivateKey,
      nitroContractAddresses,
      this.peer,
      true,
      // Store location falls back to a local directory when not configured
      path.resolve(nitroConfig.store || './out/nitro-data')
    );

    log(`Nitro node started with address: ${this._nitro.node.address}`);

    return this._nitro;
  }

  /**
   * Starts the server.
   *
   * For a server of kind `KIND_ACTIVE`, job queue cleanup is performed, the
   * start block is filled if there is no sync status yet, and the event
   * watcher is started. Afterwards the GQL logger, resolvers and ETH RPC
   * handlers are created, the server is started on a new Express app and the
   * GQL metrics server is started.
   *
   * @param createResolvers - Factory producing the GQL resolvers.
   * @param typeDefs - GQL type definitions.
   * @param paymentsManager - Optional payments manager forwarded to the server.
   * @returns The Express app and the started Apollo server.
   */
  async exec (
    createResolvers: (
      indexer: IndexerInterface,
      eventWatcher: EventWatcher,
      gqlLogger: winston.Logger
    ) => Promise<any>,
    typeDefs: TypeSource,
    paymentsManager?: PaymentsManager
  ): Promise<{
    app: Application,
    server: ApolloServer
  }> {
    const config = this._baseCmd.config;
    const jobQueue = this._baseCmd.jobQueue;
    const indexer = this._baseCmd.indexer;
    const eventWatcher = this._baseCmd.eventWatcher;
    const ethProvider = this._baseCmd.ethProvider;

    assert(config);
    assert(jobQueue);
    assert(indexer);
    assert(eventWatcher);

    if (config.server.kind === KIND_ACTIVE) {
      // Delete all active and pending (before completed) jobs to prevent creating jobs after completion of processing previous block
      await jobQueue.deleteAllJobs('completed');

      const syncStatus = await indexer.getSyncStatus();

      if (!syncStatus) {
        // No sync status yet: run fillBlocks over the single-block range [startBlock, startBlock]
        const contracts = await this.database.getContracts();
        const startBlock = getStartBlock(contracts);

        await fillBlocks(
          jobQueue,
          indexer,
          eventWatcher,
          config.jobQueue.blockDelayInMilliSecs,
          {
            startBlock,
            endBlock: startBlock
          }
        );
      }

      await eventWatcher.start();
    }

    const gqlLogger = createGQLLogger(config.server.gql.logDir);
    const resolvers = await createResolvers(indexer, eventWatcher, gqlLogger);
    const ethRPCHandlers = await createEthRPCHandlers(indexer, ethProvider);

    // Create an Express app
    const app: Application = express();

    const server = await createAndStartServer(
      app,
      typeDefs,
      resolvers,
      ethRPCHandlers,
      config.server,
      paymentsManager
    );

    await startGQLMetricsServer(config);

    return { app, server };
  }

  /**
   * Parses the process arguments.
   *
   * Declares the `-f` / `--config-file` option, which defaults to
   * `DEFAULT_CONFIG_PATH`.
   */
  _getArgv (): any {
    return yargs(hideBin(process.argv))
      .option('f', {
        alias: 'config-file',
        demandOption: true,
        describe: 'configuration file path (toml)',
        type: 'string',
        default: DEFAULT_CONFIG_PATH
      })
      .argv;
  }
}