Mirror of https://github.com/cerc-io/watcher-ts, synced 2025-04-15 11:41:16 +00:00.

Changelog:
* Add subgraph schema types to the generated schema
* Add queries for subgraph entities
* Add entity generation for subgraph entities
* Call subgraph event handler in indexer
* Refactor subgraph schema and entity generation
* Add resolvers generation for subgraph entities
* Get event signature in the event
* Add NonNullType check for field type in entity generation
* Auto-diff based on store set
* Use contract address from data source in loader
* Change subgraph-schema arg to subgraph-path arg

119 lines | 3.7 KiB | Handlebars
//
|
|
// Copyright 2021 Vulcanize, Inc.
|
|
//
|
|
|
|
import assert from 'assert';
|
|
import 'reflect-metadata';
|
|
import yargs from 'yargs';
|
|
import { hideBin } from 'yargs/helpers';
|
|
import debug from 'debug';
|
|
import { PubSub } from 'apollo-server-express';
|
|
import fs from 'fs';
|
|
import path from 'path';
|
|
|
|
import { getConfig, fillBlocks, JobQueue, DEFAULT_CONFIG_PATH, Config, initClients } from '@vulcanize/util';
|
|
import { GraphWatcher, Database as GraphDatabase } from '@vulcanize/graph-node';
|
|
import * as codec from '@ipld/dag-cbor';
|
|
|
|
import { Database } from '../database';
|
|
import { Indexer } from '../indexer';
|
|
import { EventWatcher } from '../events';
|
|
import { IPLDBlock } from '../entity/IPLDBlock';
|
|
|
|
// Namespaced debug logger; enable output with DEBUG=vulcanize:import-state.
const log = debug('vulcanize:import-state');
|
|
|
|
/**
 * Imports a previously exported watcher state snapshot: fills the snapshot
 * block, registers watched contracts, and restores IPLD checkpoint entities.
 *
 * CLI options:
 *   -f, --configFile  configuration file path (toml)
 *   -i, --importFile  import file path (dag-cbor encoded snapshot)
 */
export const main = async (): Promise<any> => {
  // Disable yargs number coercion so numeric-looking option values
  // (e.g. hex addresses, block numbers) are kept as strings.
  const argv = await yargs(hideBin(process.argv)).parserConfiguration({
    'parse-numbers': false
  }).options({
    configFile: {
      alias: 'f',
      type: 'string',
      demandOption: true,
      describe: 'configuration file path (toml)',
      default: DEFAULT_CONFIG_PATH
    },
    importFile: {
      alias: 'i',
      type: 'string',
      demandOption: true,
      describe: 'Import file path (JSON)'
    }
  }).argv;

  const config: Config = await getConfig(argv.configFile);
  const { ethClient, postgraphileClient, ethProvider } = await initClients(config);

  const db = new Database(config.database);
  await db.init();

  // Separate database wrapper for the graph-node watcher; loads the
  // generated subgraph entity classes from the sibling entity/ directory.
  const graphDb = new GraphDatabase(config.database, path.resolve(__dirname, 'entity/*'));
  await graphDb.init();

  const graphWatcher = new GraphWatcher(graphDb, postgraphileClient, config.server.subgraphPath);

  // Note: In-memory pubsub works fine for now, as each watcher is a single process anyway.
  // Later: https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries
  const pubsub = new PubSub();
  const indexer = new Indexer(config.server, db, ethClient, postgraphileClient, ethProvider, graphWatcher);

  // The watcher and indexer reference each other: wire the indexer into the
  // watcher before initializing it.
  graphWatcher.setIndexer(indexer);
  await graphWatcher.init();

  const jobQueueConfig = config.jobQueue;
  assert(jobQueueConfig, 'Missing job queue config');

  const { dbConnectionString, maxCompletionLagInSecs } = jobQueueConfig;
  assert(dbConnectionString, 'Missing job queue db connection string');

  const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs });
  await jobQueue.start();

  const eventWatcher = new EventWatcher(config.upstream, ethClient, postgraphileClient, indexer, pubsub, jobQueue);

  // Import data. The file is decoded as dag-cbor, although the CLI describe
  // text above says "JSON" — NOTE(review): confirm the intended file format.
  const importFilePath = path.resolve(argv.importFile);
  const encodedImportData = fs.readFileSync(importFilePath);
  const importData = codec.decode(Buffer.from(encodedImportData)) as any;

  // Fill the snapshot block (start and end are the same block).
  await fillBlocks(
    jobQueue,
    indexer,
    postgraphileClient,
    eventWatcher,
    config.upstream.ethServer.blockDelayInMilliSecs,
    {
      startBlock: importData.snapshotBlock.blockNumber,
      endBlock: importData.snapshotBlock.blockNumber
    }
  );

  // Fill the Contracts.
  for (const contract of importData.contracts) {
    await db.saveContract(contract.address, contract.kind, contract.checkpoint, contract.startingBlock);
  }

  // Get the snapshot block; fillBlocks above should have indexed it.
  const block = await indexer.getBlockProgress(importData.snapshotBlock.blockHash);
  assert(block);

  // Fill the IPLDBlocks, attaching each checkpoint to the snapshot block.
  for (const checkpoint of importData.ipldCheckpoints) {
    let ipldBlock = new IPLDBlock();

    ipldBlock = Object.assign(ipldBlock, checkpoint);
    ipldBlock.block = block;

    // Re-encode the checkpoint payload to dag-cbor bytes for storage.
    ipldBlock.data = Buffer.from(codec.encode(ipldBlock.data));

    await db.saveOrUpdateIPLDBlock(ipldBlock);
  }
};
|
|
|
|
main().catch(err => {
|
|
log(err);
|
|
}).finally(() => {
|
|
process.exit(0);
|
|
});
|