watcher-ts/packages/cli/src/import-state.ts
Nabarun Gogoi cd29b47ecc
Implement peer package to send messages between peers (#279)
* Initial implementation of class and discovering other peers

* Implement peer connection and sending messages between peers

* Add react app and use peer package for broadcasting

* Maintain stream for each remote peer

* Refactor code in peer package

* Add serve package for react-app

* Add readme for running react app

* Add peer package readme

* Add logs for events with details

* Add a chat CLI using peer package (#280)

* Add a flag to instantiate Peer for nodejs

* Add a basic chat CLI using peer

* Add a signal server arg to chat CLI

* Add instructions for chat CLI

* Fix typescript and ESM issues after adding peer package (#282)

* Fix build issues in util package

* Update eslint TS plugins

* Scope react app package name

* Convert cli package back to CJS and dynamically import ESM

* Upgrade ts-node version

* Fix tests

* Setup a relay node and pubsub based discovery (#284)

* Add a script to setup a relay node

* Use pubsub based peer discovery

* Add peer multiaddr to connection log

* Catch relay node dial errors

* Increase discovery interval and dial all multiaddr

* Add UI to display self peer ID and multiaddrs

* Add UI for displaying live remote connections

* Send js objects in peer broadcast messages

* Add react-peer package for using peer in react app

* Reduce disconnect frequency between peers (#287)

* Restrict number of max concurrent dials per peer

* Increase hop relay timeout to 1 day

* Self review changes

* Review changes

* Increase pubsub discovery interval and add steps to create a peer id (#290)

* Increase pubsub discovery interval

* Disable autodial to avoid peer dials without protocol on a reconnect

* Add steps to create a peer id and use for relay node

* Add back dependency to run signalling server

* Avoid bootstrapping and dial to relay node directly

Co-authored-by: prathamesh0 <42446521+prathamesh0@users.noreply.github.com>
Co-authored-by: prathamesh0 <prathamesh.musale0@gmail.com>
2023-01-10 20:10:27 +05:30

195 lines
5.2 KiB
TypeScript

//
// Copyright 2022 Vulcanize, Inc.
//
import yargs from 'yargs';
import 'reflect-metadata';
import assert from 'assert';
import path from 'path';
import fs from 'fs';
import debug from 'debug';
import { ConnectionOptions } from 'typeorm';
import { JsonRpcProvider } from '@ethersproject/providers';
import {
DEFAULT_CONFIG_PATH,
JobQueue,
DatabaseInterface,
IndexerInterface,
ServerConfig,
Clients,
fillBlocks,
StateKind,
GraphWatcherInterface,
GraphDatabase,
updateEntitiesFromState,
Config
} from '@cerc-io/util';
import { BaseCmd } from './base';
// Debug logger for this command, registered under the 'vulcanize:import-state' namespace.
const log = debug('vulcanize:import-state');
// Parsed command-line arguments for the import-state command
// (the matching CLI options are declared in ImportStateCmd._getArgv).
interface Arguments {
// Path of the configuration file; handed to BaseCmd.initConfig.
configFile: string;
// Path of the snapshot file to import; resolved and read in ImportStateCmd.exec.
importFile: string;
}
/**
 * CLI command that imports a watcher state snapshot from a file.
 *
 * All shared setup (config, clients, database, indexer, job queue, event
 * watcher) is delegated to an internal `BaseCmd`. `exec` asserts that the
 * CLI args, config, job queue, database, indexer and event watcher are
 * available, so `init` and `initIndexer` are expected to have completed first.
 */
export class ImportStateCmd {
  /** Parsed CLI arguments; populated by `initConfig`. */
  _argv?: Arguments;

  /** Shared command helper that owns config, clients, database and indexer. */
  _baseCmd: BaseCmd;

  constructor () {
    this._baseCmd = new BaseCmd();
  }

  /** Configuration held by the underlying base command. */
  get config (): Config {
    return this._baseCmd.config;
  }

  /** Clients held by the underlying base command. */
  get clients (): Clients {
    return this._baseCmd.clients;
  }

  /** Ethereum JSON-RPC provider held by the underlying base command. */
  get ethProvider (): JsonRpcProvider {
    return this._baseCmd.ethProvider;
  }

  /** Database instance held by the underlying base command. */
  get database (): DatabaseInterface {
    return this._baseCmd.database;
  }

  /**
   * Parses the CLI arguments, stores them on the instance and delegates
   * config initialisation to the base command using the `configFile` option.
   *
   * @returns Whatever `BaseCmd.initConfig` resolves to for the given file.
   */
  async initConfig<ConfigType> (): Promise<ConfigType> {
    this._argv = this._getArgv();
    assert(this._argv);

    return this._baseCmd.initConfig(this._argv.configFile);
  }

  /**
   * Initialises config (see `initConfig`) and then the base command.
   *
   * @param Database Constructor of the watcher's database class; forwarded to `BaseCmd.init`.
   * @param clients Optional map of extra clients; forwarded to `BaseCmd.init`.
   */
  async init (
    Database: new (
      config: ConnectionOptions,
      serverConfig?: ServerConfig
    ) => DatabaseInterface,
    clients: { [key: string]: any } = {}
  ): Promise<void> {
    await this.initConfig();
    await this._baseCmd.init(Database, clients);
  }

  /**
   * Initialises the indexer and then the event watcher through the base command.
   *
   * @param Indexer Constructor of the watcher's indexer class; forwarded to `BaseCmd.initIndexer`.
   * @param graphWatcher Optional graph watcher; forwarded to `BaseCmd.initIndexer`.
   */
  async initIndexer (
    Indexer: new (
      serverConfig: ServerConfig,
      db: DatabaseInterface,
      clients: Clients,
      ethProvider: JsonRpcProvider,
      jobQueue: JobQueue,
      graphWatcher?: GraphWatcherInterface
    ) => IndexerInterface,
    graphWatcher?: GraphWatcherInterface
  ): Promise<void> {
    await this._baseCmd.initIndexer(Indexer, graphWatcher);
    await this._baseCmd.initEventWatcher();
  }

  /**
   * Runs the import: reads and decodes the snapshot file, fills the snapshot
   * block, registers the contracts, saves the state checkpoints and marks the
   * snapshot block as fully processed.
   *
   * The database is closed when this method finishes, whether the import
   * succeeded or threw.
   *
   * @param State Zero-argument constructor of the watcher's State entity; one
   *   instance is created per imported checkpoint.
   * @param graphDb Optional graph database. When provided and the indexer
   *   exposes `getRelationsMap`, entities are updated from each saved state.
   * @throws AssertionError if the CLI args, config, job queue, database,
   *   indexer or event watcher are missing, or if the snapshot block cannot
   *   be found after filling.
   */
  async exec (State: new() => any, graphDb?: GraphDatabase): Promise<void> {
    assert(this._argv);
    const { importFile } = this._argv;

    const config = this._baseCmd.config;
    const jobQueue = this._baseCmd.jobQueue;
    const database = this._baseCmd.database;
    const indexer = this._baseCmd.indexer;
    const eventWatcher = this._baseCmd.eventWatcher;

    assert(config);
    assert(jobQueue);
    assert(database);
    assert(indexer);
    assert(eventWatcher);

    try {
      // Import data.
      const importFilePath = path.resolve(importFile);
      const encodedImportData = fs.readFileSync(importFilePath);

      // Loaded with a dynamic import rather than a top-level one.
      const codec = await import('@ipld/dag-cbor');
      const importData = codec.decode(Buffer.from(encodedImportData)) as any;

      // Fill the snapshot block (start and end are the same block).
      await fillBlocks(
        jobQueue,
        indexer,
        eventWatcher,
        config.jobQueue.blockDelayInMilliSecs,
        {
          prefetch: true,
          startBlock: importData.snapshotBlock.blockNumber,
          endBlock: importData.snapshotBlock.blockNumber
        }
      );

      // Fill the Contracts.
      // Awaited so that each registration has finished (and any failure has
      // surfaced) before states are written; harmless if the call is synchronous.
      for (const contract of importData.contracts) {
        await indexer.watchContract(contract.address, contract.kind, contract.checkpoint, contract.startingBlock);
      }

      // Get the snapshot block.
      const block = await indexer.getBlockProgress(importData.snapshotBlock.blockHash);
      assert(block);

      // Fill the States.
      for (const checkpoint of importData.stateCheckpoints) {
        let state = new State();

        state = Object.assign(state, checkpoint);
        state.block = block;

        // The decoded checkpoint data is re-encoded to dag-cbor bytes before saving.
        state.data = Buffer.from(codec.encode(state.data));

        state = await indexer.saveOrUpdateState(state);

        // Fill entities using State if:
        //   relationsMap defined for the watcher,
        //   graphDb instance is available
        // TODO: Fill latest entity tables
        if (indexer.getRelationsMap && graphDb) {
          await updateEntitiesFromState(graphDb, indexer, state);
        }
      }

      // Mark snapshot block as completely processed.
      block.isComplete = true;
      await indexer.updateBlockProgress(block, block.lastProcessedEventIndex);
      await indexer.updateSyncStatusChainHead(block.blockHash, block.blockNumber);
      await indexer.updateSyncStatusIndexedBlock(block.blockHash, block.blockNumber);
      await indexer.updateStateSyncStatusIndexedBlock(block.blockNumber);
      await indexer.updateStateSyncStatusCheckpointBlock(block.blockNumber);

      // The 'diff_staged' and 'init' State entries are unnecessary as checkpoints have been already created for the snapshot block.
      await indexer.removeStates(block.blockNumber, StateKind.Init);
      await indexer.removeStates(block.blockNumber, StateKind.DiffStaged);

      log(`Import completed for snapshot block at height ${block.blockNumber}`);
    } finally {
      // Release the database connection even when the import fails part-way.
      await database.close();
    }
  }

  /**
   * Declares and parses the command-line options for this command.
   *
   * Number parsing is disabled, so option values are kept as strings.
   *
   * @returns The parsed yargs arguments (`configFile`, `importFile`).
   */
  _getArgv (): any {
    return yargs.parserConfiguration({
      'parse-numbers': false
    }).options({
      configFile: {
        alias: 'f',
        type: 'string',
        require: true,
        demandOption: true,
        describe: 'Configuration file path (toml)',
        default: DEFAULT_CONFIG_PATH
      },
      importFile: {
        alias: 'i',
        type: 'string',
        demandOption: true,
        // NOTE(review): the help text says JSON, but exec decodes this file
        // with @ipld/dag-cbor; confirm and update the description.
        describe: 'Import file path (JSON)'
      }
    }).argv;
  }
}