Refactor peer CLI to be run from watchers (#331)

* Refactor peer CLI to be run from watchers

* Participate in chat protocol only through chat CLI
This commit is contained in:
prathamesh0 2023-02-27 14:15:26 +05:30 committed by GitHub
parent 90d60f54a6
commit 0400546996
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 209 additions and 106 deletions

View File

@ -43,9 +43,13 @@ A basic CLI to pass messages between peers using `stdin`/`stdout`
```bash ```bash
# In packages/cli # In packages/cli
yarn chat --relay-node <RELAY_NODE_URL> yarn chat --relay-multiaddr <RELAY_MULTIADDR> --max-connections [MAX_CONNECTIONS] --dial-timeout [DIAL_TIMEOUT] --max-relay-connections [MAX_RELAY_CONNECTIONS] --peer-id-file [PEER_ID_FILE_PATH]
``` ```
* `relay-node`: multiaddr of a primary hop enabled relay node * `relay-multiaddr (r)`: multiaddr of a primary hop enabled relay node
* `max-connections`: max number of connections for this peer
* `dial-timeout`: timeout for dial to peers (ms)
* `max-relay-connections`: max number of relay node connections for this peer
* `peer-id-file (f)`: file path for peer id to be used (json)
* The process starts reading from `stdin` and outputs messages from others peers to `stdout`. * The process starts reading from `stdin` and outputs messages from others peers over the `/chat/1.0.0` protocol to `stdout`.

View File

@ -8,7 +8,7 @@
"build": "yarn clean && tsc && yarn copy-assets", "build": "yarn clean && tsc && yarn copy-assets",
"clean": "rm -rf ./dist", "clean": "rm -rf ./dist",
"copy-assets": "copyfiles -u 1 src/**/*.gql dist/", "copy-assets": "copyfiles -u 1 src/**/*.gql dist/",
"chat": "node dist/chat.js" "chat": "DEBUG='vulcanize:*, laconic:*' node dist/chat.js"
}, },
"dependencies": { "dependencies": {
"@cerc-io/peer": "^0.2.29", "@cerc-io/peer": "^0.2.29",

View File

@ -3,35 +3,25 @@
// //
import * as readline from 'readline'; import * as readline from 'readline';
import { hideBin } from 'yargs/helpers'; import debug from 'debug';
import yargs from 'yargs';
// @ts-expect-error https://github.com/microsoft/TypeScript/issues/49721#issuecomment-1319854183 // @ts-expect-error https://github.com/microsoft/TypeScript/issues/49721#issuecomment-1319854183
import { PeerId } from '@libp2p/interface-peer-id'; import { PeerId } from '@libp2p/interface-peer-id';
import { PeerCmd } from './peer';
const log = debug('vulcanize:chat');
const TEST_TOPIC = 'test'; const TEST_TOPIC = 'test';
interface Arguments {
relayNode: string;
}
async function main (): Promise<void> { async function main (): Promise<void> {
const argv: Arguments = _getArgv(); const peerCmd = new PeerCmd();
const peer = await peerCmd.exec(TEST_TOPIC);
// https://adamcoster.com/blog/commonjs-and-esm-importexport-compatibility-examples#importing-esm-into-commonjs-cjs
const { Peer } = await import('@cerc-io/peer');
const peer = new Peer(argv.relayNode, true);
await peer.init({});
peer.subscribeMessage((peerId: PeerId, message: string) => { peer.subscribeMessage((peerId: PeerId, message: string) => {
console.log(`> ${peerId.toString()} > ${message}`); log(`> ${peerId.toString()} > ${message}`);
}); });
peer.subscribeTopic(TEST_TOPIC, (peerId, data) => {
console.log(`> ${peerId.toString()} > ${data}`);
});
console.log(`Peer ID: ${peer.peerId?.toString()}`);
const rl = readline.createInterface({ const rl = readline.createInterface({
input: process.stdin, input: process.stdin,
output: process.stdout output: process.stdout
@ -41,19 +31,7 @@ async function main (): Promise<void> {
peer.broadcastMessage(input); peer.broadcastMessage(input);
}); });
console.log('Reading input...'); log('Reading input...');
}
function _getArgv (): any {
return yargs(hideBin(process.argv)).parserConfiguration({
'parse-numbers': false
}).options({
relayNode: {
type: 'string',
describe: 'Relay node URL',
demandOption: true
}
}).argv;
} }
main().catch(err => { main().catch(err => {

View File

@ -14,3 +14,4 @@ export * from './server';
export * from './job-runner'; export * from './job-runner';
export * from './index-block'; export * from './index-block';
export * from './fill'; export * from './fill';
export * from './peer';

92
packages/cli/src/peer.ts Normal file
View File

@ -0,0 +1,92 @@
//
// Copyright 2023 Vulcanize, Inc.
//
import { hideBin } from 'yargs/helpers';
import yargs from 'yargs';
import debug from 'debug';
import {
PeerInitConfig,
PeerIdObj
// @ts-expect-error https://github.com/microsoft/TypeScript/issues/49721#issuecomment-1319854183
} from '@cerc-io/peer';
import { readPeerId } from './utils';
const log = debug('vulcanize:peer');
interface Arguments {
relayMultiaddr: string;
maxConnections: number;
dialTimeout: number;
maxRelayConnections: number;
peerIdFile: string;
}
export class PeerCmd {
async exec (pubSubTopic?: string, parseLibp2pMessage?: (log: debug.Debugger, peerId: string, data: any) => void): Promise<any> {
const argv: Arguments = _getArgv();
const { Peer } = await import('@cerc-io/peer');
const peer = new Peer(argv.relayMultiaddr, true);
let peerIdObj: PeerIdObj | undefined;
if (argv.peerIdFile) {
peerIdObj = readPeerId(argv.peerIdFile);
}
const peerNodeInit: PeerInitConfig = {
maxConnections: argv.maxConnections,
dialTimeout: argv.dialTimeout,
maxRelayConnections: argv.maxRelayConnections
};
await peer.init(peerNodeInit, peerIdObj);
log(`Peer ID: ${peer.peerId?.toString()}`);
// Subscribe this peer to a pubsub topic if provided
if (pubSubTopic) {
peer.subscribeTopic(pubSubTopic, (peerId, data) => {
if (parseLibp2pMessage) {
parseLibp2pMessage(log, peerId.toString(), data);
} else {
log(`> ${peerId.toString()} > ${data}`);
}
});
}
return peer;
}
}
function _getArgv (): any {
return yargs(hideBin(process.argv)).parserConfiguration({
'parse-numbers': false
}).options({
relayMultiaddr: {
type: 'string',
alias: 'r',
describe: 'Multiaddr of the primary relay node for this peer',
demandOption: true
},
maxConnections: {
type: 'number',
describe: 'Max number of connections for a peer'
},
dialTimeout: {
type: 'number',
describe: 'Timeout for dial to peers (ms)'
},
maxRelayConnections: {
type: 'number',
describe: 'Max number of relay node connections for a peer'
},
peerIdFile: {
type: 'string',
alias: 'f',
describe: 'Peer id file path (json)'
}
}).argv;
}

View File

@ -105,7 +105,7 @@ export class ServerCmd {
async exec ( async exec (
createResolvers: (indexer: IndexerInterface, eventWatcher: EventWatcher) => Promise<any>, createResolvers: (indexer: IndexerInterface, eventWatcher: EventWatcher) => Promise<any>,
typeDefs: TypeSource, typeDefs: TypeSource,
parseLibp2pMessage?: (peerId: string, data: any) => void parseLibp2pMessage?: (log: debug.Debugger, peerId: string, data: any) => void
): Promise<{ ): Promise<{
app: Application, app: Application,
server: ApolloServer server: ApolloServer
@ -146,7 +146,7 @@ export class ServerCmd {
async _startP2PNodes ( async _startP2PNodes (
p2pConfig: P2PConfig, p2pConfig: P2PConfig,
parseLibp2pMessage?: (peerId: string, data: any) => void parseLibp2pMessage?: (log: debug.Debugger, peerId: string, data: any) => void
): Promise<void> { ): Promise<void> {
const { createRelayNode, Peer } = await import('@cerc-io/peer'); const { createRelayNode, Peer } = await import('@cerc-io/peer');
const { const {
@ -206,7 +206,7 @@ export class ServerCmd {
peer.subscribeTopic(peerConfig.pubSubTopic, (peerId, data) => { peer.subscribeTopic(peerConfig.pubSubTopic, (peerId, data) => {
if (parseLibp2pMessage) { if (parseLibp2pMessage) {
parseLibp2pMessage(peerId.toString(), data); parseLibp2pMessage(log, peerId.toString(), data);
} }
}); });

View File

@ -23,7 +23,8 @@
"import-state": "DEBUG=vulcanize:* node --enable-source-maps dist/cli/import-state.js", "import-state": "DEBUG=vulcanize:* node --enable-source-maps dist/cli/import-state.js",
"import-state:dev": "DEBUG=vulcanize:* ts-node src/cli/import-state.ts", "import-state:dev": "DEBUG=vulcanize:* ts-node src/cli/import-state.ts",
"inspect-cid": "DEBUG=vulcanize:* ts-node src/cli/inspect-cid.ts", "inspect-cid": "DEBUG=vulcanize:* ts-node src/cli/inspect-cid.ts",
"index-block": "DEBUG=vulcanize:* ts-node src/cli/index-block.ts" "index-block": "DEBUG=vulcanize:* ts-node src/cli/index-block.ts",
"peer": "DEBUG='vulcanize:*, laconic:*' node --enable-source-maps dist/cli/peer.js"
}, },
"repository": { "repository": {
"type": "git", "type": "git",

View File

@ -0,0 +1,20 @@
import debug from 'debug';
import { PeerCmd } from '@cerc-io/cli';
import { parseLibp2pMessage } from '../libp2p-utils';
const log = debug('vulcanize:peer');
const MOBYMASK_TOPIC = 'mobymask';
export const main = async (): Promise<any> => {
const peerCmd = new PeerCmd();
await peerCmd.exec(MOBYMASK_TOPIC, parseLibp2pMessage);
};
main().then(() => {
log('Starting peer...');
}).catch(err => {
log(err);
});

View File

@ -0,0 +1,73 @@
//
// Copyright 2023 Vulcanize, Inc.
//
import debug from 'debug';
import { ethers } from 'ethers';
import { abi as PhisherRegistryABI } from './artifacts/PhisherRegistry.json';
const contractInterface = new ethers.utils.Interface(PhisherRegistryABI);
const MESSAGE_KINDS = {
INVOKE: 'invoke',
REVOKE: 'revoke'
};
export function parseLibp2pMessage (log: debug.Debugger, peerId: string, data: any): void {
log('Received a message on mobymask P2P network from peer:', peerId);
const { kind, message } = data;
switch (kind) {
case MESSAGE_KINDS.INVOKE: {
_parseInvocation(log, message);
break;
}
case MESSAGE_KINDS.REVOKE: {
_parseRevocation(log, message);
break;
}
default: {
log(`libp2p message of unknown kind ${kind}`);
log(JSON.stringify(message, null, 2));
break;
}
}
log('------------------------------------------');
}
function _parseInvocation (log: debug.Debugger, msg: any): void {
log('Signed invocations:');
log(JSON.stringify(msg, null, 2));
const [{ invocations: { batch: invocationsList } }] = msg;
Array.from(invocationsList).forEach((invocation: any) => {
const txData = invocation.transaction.data;
const decoded = contractInterface.parseTransaction({ data: txData });
log(`method: ${decoded.name}, value: ${decoded.args[0]}`);
});
}
function _parseRevocation (log: debug.Debugger, msg: any): void {
const { signedDelegation, signedIntendedRevocation } = msg;
log('Signed delegation:');
log(JSON.stringify(signedDelegation, null, 2));
log('Signed intention to revoke:');
const stringifiedSignedIntendedRevocation = JSON.stringify(
signedIntendedRevocation,
(key, value) => {
if (key === 'delegationHash' && value.type === 'Buffer') {
// Show hex value for delegationHash instead of Buffer
return ethers.utils.hexlify(Buffer.from(value));
}
return value;
},
2
);
log(stringifiedSignedIntendedRevocation);
}

View File

@ -6,19 +6,16 @@ import fs from 'fs';
import path from 'path'; import path from 'path';
import 'reflect-metadata'; import 'reflect-metadata';
import debug from 'debug'; import debug from 'debug';
import { ethers } from 'ethers';
import { ServerCmd } from '@cerc-io/cli'; import { ServerCmd } from '@cerc-io/cli';
import { createResolvers } from './resolvers'; import { createResolvers } from './resolvers';
import { Indexer } from './indexer'; import { Indexer } from './indexer';
import { Database } from './database'; import { Database } from './database';
import { abi as PhisherRegistryABI } from './artifacts/PhisherRegistry.json'; import { parseLibp2pMessage } from './libp2p-utils';
const log = debug('vulcanize:server'); const log = debug('vulcanize:server');
const contractInterface = new ethers.utils.Interface(PhisherRegistryABI);
export const main = async (): Promise<any> => { export const main = async (): Promise<any> => {
const serverCmd = new ServerCmd(); const serverCmd = new ServerCmd();
await serverCmd.init(Database); await serverCmd.init(Database);
@ -28,69 +25,6 @@ export const main = async (): Promise<any> => {
return serverCmd.exec(createResolvers, typeDefs, parseLibp2pMessage); return serverCmd.exec(createResolvers, typeDefs, parseLibp2pMessage);
}; };
const MESSAGE_KINDS = {
INVOKE: 'invoke',
REVOKE: 'revoke'
};
function parseLibp2pMessage (peerId: string, data: any): void {
log('Received a message on mobymask P2P network from peer:', peerId);
const { kind, message } = data;
switch (kind) {
case MESSAGE_KINDS.INVOKE: {
_parseInvocation(message);
break;
}
case MESSAGE_KINDS.REVOKE: {
_parseRevocation(message);
break;
}
default: {
log(`libp2p message of unknown kind ${kind}`);
log(JSON.stringify(message, null, 2));
break;
}
}
log('------------------------------------------');
}
function _parseInvocation (msg: any): void {
log('Signed invocations:');
log(JSON.stringify(msg, null, 2));
const [{ invocations: { batch: invocationsList } }] = msg;
Array.from(invocationsList).forEach((invocation: any) => {
const txData = invocation.transaction.data;
const decoded = contractInterface.parseTransaction({ data: txData });
log(`method: ${decoded.name}, value: ${decoded.args[0]}`);
});
}
function _parseRevocation (msg: any): void {
const { signedDelegation, signedIntendedRevocation } = msg;
log('Signed delegation:');
log(JSON.stringify(signedDelegation, null, 2));
log('Signed intention to revoke:');
const stringifiedSignedIntendedRevocation = JSON.stringify(
signedIntendedRevocation,
(key, value) => {
if (key === 'delegationHash' && value.type === 'Buffer') {
// Show hex value for delegationHash instead of Buffer
return ethers.utils.hexlify(Buffer.from(value));
}
return value;
},
2
);
log(stringifiedSignedIntendedRevocation);
}
main().then(() => { main().then(() => {
log('Starting server...'); log('Starting server...');
}).catch(err => { }).catch(err => {