nikugogoi 9d95e49ec9
Change block processing to be pull based (#288)
* Implement pull based watcher for uni-watcher

* Fix same block processed multiple times

* Implement wait time for fetching block from config

* Use blockProgress event to fetch and process next block

* Rename utils index to misc
2021-10-26 17:36:21 +05:30

129 lines
3.7 KiB

// Copyright 2021 Vulcanize, Inc.
import assert from 'assert';
import 'reflect-metadata';
import express, { Application } from 'express';
import { ApolloServer, PubSub } from 'apollo-server-express';
import yargs from 'yargs';
import { hideBin } from 'yargs/helpers';
import debug from 'debug';
import 'graphql-import-node';
import { createServer } from 'http';
import { Client as ERC20Client } from '@vulcanize/erc20-watcher';
import { Client as UniClient } from '@vulcanize/uni-watcher';
import { EthClient } from '@vulcanize/ipld-eth-client';
import { DEFAULT_CONFIG_PATH, getConfig, getCustomProvider, JobQueue } from '@vulcanize/util';
import { getCache } from '@vulcanize/cache';
import typeDefs from './schema';
import { createResolvers as createMockResolvers } from './mock/resolvers';
import { createResolvers } from './resolvers';
import { Indexer } from './indexer';
import { Database } from './database';
import { EventWatcher } from './events';
const log = debug('vulcanize:server');
/**
 * Boots the watcher's GraphQL server.
 *
 * Reads the config file named by the `f` / `config-file` CLI option, then
 * wires up the database, upstream clients, indexer, job queue and event
 * watcher before starting an Apollo server mounted on an Express app.
 *
 * NOTE(review): several lines of this function (closing brackets, the tail of
 * the yargs chain, parts of the `upstream` destructuring and the ApolloServer
 * options) are not present in this copy of the file. The comments below
 * describe only the statements that are visible — confirm against the
 * complete source before relying on them.
 *
 * @returns An object holding the Express `app` and the Apollo `server`.
 * @throws AssertionError when a required config entry is missing (server,
 *   database, upstream, the two ethServer endpoints, job queue, or the job
 *   queue db connection string).
 */
export const main = async (): Promise<any> => {
// Parse CLI arguments; `f` (alias `config-file`) is a required string option.
const argv = await yargs(hideBin(process.argv))
.option('f', {
alias: 'config-file',
demandOption: true,
describe: 'configuration file path (toml)',
type: 'string',
// NOTE(review): the remainder of the option object and of the yargs chain is not visible here.
// Load the configuration from the path given on the command line.
const config = await getConfig(argv.f);
assert(config.server, 'Missing server config');
const { host, port, mode } = config.server;
const { upstream, database: dbConfig, jobQueue: jobQueueConfig } = config;
// The database must be configured and initialised before anything that uses `db`.
assert(dbConfig, 'Missing database config');
const db = new Database(dbConfig);
await db.init();
assert(upstream, 'Missing upstream config');
// NOTE(review): this destructuring is truncated — the bindings for gqlApiEndpoint,
// gqlPostgraphileEndpoint, rpcProviderEndpoint, uniWatcher and tokenWatcher used
// below presumably come from the missing lines; verify against the full file.
const {
ethServer: {
cache: cacheConfig
} = upstream;
assert(gqlApiEndpoint, 'Missing upstream ethServer.gqlApiEndpoint');
assert(gqlPostgraphileEndpoint, 'Missing upstream ethServer.gqlPostgraphileEndpoint');
const cache = await getCache(cacheConfig);
// Client for the eth server: queries go to gqlApiEndpoint, subscriptions to gqlPostgraphileEndpoint.
const ethClient = new EthClient({
gqlEndpoint: gqlApiEndpoint,
gqlSubscriptionEndpoint: gqlPostgraphileEndpoint,
// Second EthClient whose query endpoint is the postgraphile endpoint.
// NOTE(review): the remaining options of both EthClient constructors are not visible here.
const postgraphileClient = new EthClient({
gqlEndpoint: gqlPostgraphileEndpoint,
// Clients for the upstream uni-watcher and erc20-watcher services.
const uniClient = new UniClient(uniWatcher);
const erc20Client = new ERC20Client(tokenWatcher);
const ethProvider = getCustomProvider(rpcProviderEndpoint);
// The indexer is built on the database, the upstream clients, the provider and the server `mode`.
const indexer = new Indexer(db, uniClient, erc20Client, ethClient, ethProvider, mode);
assert(jobQueueConfig, 'Missing job queue config');
const { dbConnectionString, maxCompletionLagInSecs } = jobQueueConfig;
assert(dbConnectionString, 'Missing job queue db connection string');
// Start the job queue before the event watcher, which receives it as a dependency.
const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs });
await jobQueue.start();
// PubSub instance handed to the event watcher (presumably backs GraphQL subscriptions — confirm in ./events).
const pubSub = new PubSub();
const eventWatcher = new EventWatcher(upstream, ethClient, postgraphileClient, indexer, pubSub, jobQueue);
await eventWatcher.start();
// Setting the MOCK environment variable to any non-empty value swaps in the mock resolvers.
const resolvers = process.env.MOCK ? await createMockResolvers() : await createResolvers(indexer, eventWatcher);
const app: Application = express();
// NOTE(review): the ApolloServer options object is not visible in this copy.
const server = new ApolloServer({
await server.start();
// Mount the Apollo middleware on the Express app.
server.applyMiddleware({ app });
// Wrap the Express app in a plain Node HTTP server and begin listening.
const httpServer = createServer(app);
httpServer.listen(port, host, () => {
log(`Server is listening on host ${host} port ${port}`);
// Hand back the app and server to the caller.
return { app, server };
// Module entry point: run main() when this file is loaded.
// NOTE(review): the body of the .catch handler and the closing brackets of the
// statements below are not visible in this copy of the file.
main().then(() => {
// Logged once main() has resolved, i.e. after its setup steps have run.
log('Starting server...');
}).catch(err => {
// Log uncaught exceptions through the debug logger.
process.on('uncaughtException', err => {
log('uncaughtException', err);