Pass initialization options when starting a peer (#328)

* Pass initialization options when starting a peer

* Update config used for relay and peer nodes in watcher

* Rename types

* Refactor mobymask libp2p message parsing

* Enable laconic debug logs in server command
This commit is contained in:
prathamesh0 2023-02-21 17:57:25 +05:30 committed by GitHub
parent 6fa3ee28b5
commit 6a8b9a2385
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 206 additions and 104 deletions

View File

@ -20,7 +20,7 @@ async function main (): Promise<void> {
// https://adamcoster.com/blog/commonjs-and-esm-importexport-compatibility-examples#importing-esm-into-commonjs-cjs
const { Peer } = await import('@cerc-io/peer');
const peer = new Peer(argv.relayNode, true);
await peer.init();
await peer.init({});
peer.subscribeMessage((peerId: PeerId, message: string) => {
console.log(`> ${peerId.toString()} > ${message}`);

View File

@ -28,8 +28,12 @@ import {
P2PConfig
} from '@cerc-io/util';
import { TypeSource } from '@graphql-tools/utils';
// @ts-expect-error https://github.com/microsoft/TypeScript/issues/49721#issuecomment-1319854183
import { RelayNodeInit, PeerIdObj } from '@cerc-io/peer';
import {
RelayNodeInitConfig,
PeerInitConfig,
PeerIdObj
// @ts-expect-error https://github.com/microsoft/TypeScript/issues/49721#issuecomment-1319854183
} from '@cerc-io/peer';
import { BaseCmd } from './base';
import { readPeerId } from './utils/index';
@ -145,7 +149,13 @@ export class ServerCmd {
parseLibp2pMessage?: (peerId: string, data: any) => void
): Promise<void> {
const { createRelayNode, Peer } = await import('@cerc-io/peer');
const { RELAY_DEFAULT_HOST, RELAY_DEFAULT_PORT, RELAY_DEFAULT_MAX_DIAL_RETRY } = await import('@cerc-io/peer');
const {
RELAY_DEFAULT_HOST,
RELAY_DEFAULT_PORT,
RELAY_DEFAULT_MAX_DIAL_RETRY,
RELAY_REDIAL_INTERVAL,
PING_INTERVAL
} = await import('@cerc-io/peer');
// Run the relay node if enabled
if (p2pConfig.enableRelay) {
@ -157,11 +167,13 @@ export class ServerCmd {
peerIdObj = readPeerId(relayConfig.peerIdFile);
}
const relayNodeInit: RelayNodeInit = {
const relayNodeInit: RelayNodeInitConfig = {
host: relayConfig.host ?? RELAY_DEFAULT_HOST,
port: relayConfig.port ?? RELAY_DEFAULT_PORT,
announceDomain: relayConfig.announce,
relayPeers: relayConfig.relayPeers ?? [],
pingInterval: relayConfig.pingInterval ?? PING_INTERVAL,
redialInterval: relayConfig.redialInterval ?? RELAY_REDIAL_INTERVAL,
maxDialRetry: relayConfig.maxDialRetry ?? RELAY_DEFAULT_MAX_DIAL_RETRY,
peerIdObj
};
@ -170,10 +182,22 @@ export class ServerCmd {
// Run a peer node if enabled
if (p2pConfig.enablePeer) {
const peer = new Peer(p2pConfig.relayMultiaddr, true);
await peer.init();
const peerConfig = p2pConfig.peer;
assert(peerConfig, 'Peer config not set');
peer.subscribeTopic(p2pConfig.pubSubTopic, (peerId, data) => {
const peer = new Peer(peerConfig.relayMultiaddr, true);
const peerNodeInit: PeerInitConfig = {
pingInterval: peerConfig.pingInterval,
pingTimeout: peerConfig.pingTimeout,
maxRelayConnections: peerConfig.maxRelayConnections,
relayRedialInterval: peerConfig.relayRedialInterval,
maxConnections: peerConfig.maxConnections,
dialTimeout: peerConfig.dialTimeout
};
await peer.init(peerNodeInit);
peer.subscribeTopic(peerConfig.pubSubTopic, (peerId, data) => {
if (parseLibp2pMessage) {
parseLibp2pMessage(peerId.toString(), data);
}

View File

@ -20,10 +20,8 @@
maxEventsBlockRange = -1
[server.p2p]
enablePeer = true
relayMultiaddr = ''
pubSubTopic = 'mobymask'
enableRelay = true
enablePeer = true
[server.p2p.relay]
host = "127.0.0.1"
@ -31,6 +29,10 @@
relayPeers = []
peerIdFile = ''
[server.p2p.peer]
relayMultiaddr = ''
pubSubTopic = 'mobymask'
[metrics]
host = "127.0.0.1"
port = 9000

View File

@ -9,7 +9,7 @@
"build": "yarn clean && tsc && yarn copy-assets",
"clean": "rm -rf ./dist",
"copy-assets": "copyfiles -u 1 src/**/*.gql dist/",
"server": "DEBUG=vulcanize:* YARN_CHILD_PROCESS=true node --enable-source-maps dist/server.js",
"server": "DEBUG='vulcanize:*, laconic:*' YARN_CHILD_PROCESS=true node --enable-source-maps dist/server.js",
"server:dev": "DEBUG=vulcanize:* YARN_CHILD_PROCESS=true ts-node src/server.ts",
"job-runner": "DEBUG=vulcanize:* YARN_CHILD_PROCESS=true node --enable-source-maps dist/job-runner.js",
"job-runner:dev": "DEBUG=vulcanize:* YARN_CHILD_PROCESS=true ts-node src/job-runner.ts",

View File

@ -33,28 +33,46 @@ const MESSAGE_KINDS = {
REVOKE: 'revoke'
};
function parseLibp2pMessage (peerId: string, data: any) {
function parseLibp2pMessage (peerId: string, data: any): void {
log('Received a message on mobymask P2P network from peer:', peerId);
const { kind, message } = data;
switch (kind) {
case MESSAGE_KINDS.INVOKE: {
log('Signed invocations:');
log(JSON.stringify(message, null, 2));
_parseInvocation(message);
break;
}
const [{ invocations: { batch: invocationsList } }] = message;
case MESSAGE_KINDS.REVOKE: {
_parseRevocation(message);
break;
}
default: {
log(`libp2p message of unknown kind ${kind}`);
log(JSON.stringify(message, null, 2));
break;
}
}
log('------------------------------------------');
}
function _parseInvocation (msg: any): void {
log('Signed invocations:');
log(JSON.stringify(msg, null, 2));
const [{ invocations: { batch: invocationsList } }] = msg;
Array.from(invocationsList).forEach((invocation: any) => {
const txData = invocation.transaction.data;
const decoded = contractInterface.parseTransaction({ data: txData });
log(`method: ${decoded.name}, value: ${decoded.args[0]}`);
});
}
break;
}
case MESSAGE_KINDS.REVOKE: {
const { signedDelegation, signedIntendedRevocation } = message;
function _parseRevocation (msg: any): void {
const { signedDelegation, signedIntendedRevocation } = msg;
log('Signed delegation:');
log(JSON.stringify(signedDelegation, null, 2));
log('Signed intention to revoke:');
@ -71,15 +89,6 @@ function parseLibp2pMessage (peerId: string, data: any) {
2
);
log(stringifiedSignedIntendedRevocation);
break;
}
default:
break;
}
log('------------------------------------------');
}
main().then(() => {

View File

@ -3,9 +3,9 @@ import { hideBin } from 'yargs/helpers';
import fs from 'fs';
import path from 'path';
import { RelayNodeInit, createRelayNode } from '../relay.js';
import { RelayNodeInitConfig, createRelayNode } from '../relay.js';
import { PeerIdObj } from '../peer.js';
import { RELAY_DEFAULT_HOST, RELAY_DEFAULT_PORT, RELAY_DEFAULT_MAX_DIAL_RETRY } from '../constants.js';
import { RELAY_DEFAULT_HOST, RELAY_DEFAULT_PORT, RELAY_DEFAULT_MAX_DIAL_RETRY, RELAY_REDIAL_INTERVAL, PING_INTERVAL } from '../constants.js';
interface Arguments {
host: string;
@ -13,6 +13,8 @@ interface Arguments {
announce?: string;
peerIdFile?: string;
relayPeers?: string;
pingInterval: number;
redialInterval: number;
maxDialRetry: number;
}
@ -44,13 +46,15 @@ async function main (): Promise<void> {
relayPeersList = JSON.parse(relayPeersListObj);
}
const relayNodeInit: RelayNodeInit = {
const relayNodeInit: RelayNodeInitConfig = {
host: argv.host,
port: argv.port,
peerIdObj,
announceDomain: argv.announce,
relayPeers: relayPeersList,
maxDialRetry: argv.maxDialRetry,
peerIdObj
pingInterval: argv.pingInterval,
redialInterval: argv.redialInterval,
maxDialRetry: argv.maxDialRetry
};
await createRelayNode(relayNodeInit);
}
@ -86,6 +90,16 @@ function _getArgv (): Arguments {
alias: 'r',
describe: 'Relay peer multiaddr(s) list file path (json)'
},
pingInterval: {
type: 'number',
describe: 'Interval to check relay peer connections using ping (ms)',
default: PING_INTERVAL
},
redialInterval: {
type: 'number',
describe: 'Redial interval to relay peer on connection failure (ms)',
default: RELAY_REDIAL_INTERVAL
},
maxDialRetry: {
type: 'number',
describe: 'Maximum number of dial retries to be attempted to a relay peer',

View File

@ -21,16 +21,15 @@ export const RELAY_TAG = {
value: 100
};
// Interval time in ms to check connection with ping for connected peer
// Currently only checking for relay node
export const CONN_CHECK_INTERVAL = 10000; // 10 seconds
// Interval in ms to check peer connections using ping
export const PING_INTERVAL = 10000; // 10 seconds
// Ping timeout used to check if connection is alive
// Should be lesser than CONN_CHECK_INTERVAL
// Should be less than PING_INTERVAL
export const PING_TIMEOUT = 5000; // 5 seconds
// Delay time in ms to redial relay node on failing to connect
export const RELAY_REDIAL_DELAY = 5000; // 5 seconds
// Redial interval (in ms) to relay node on connection failure
export const RELAY_REDIAL_INTERVAL = 5000; // 5 seconds
// Max number of relay node connections for a peer after which it starts igoring them
export const DEFAULT_MAX_RELAY_CONNECTIONS = 2;

View File

@ -2,7 +2,13 @@
// Copyright 2022 Vulcanize, Inc.
//
export { Peer, PeerIdObj, createPeerId } from './peer.js';
export { RelayNodeInit, createRelayNode } from './relay.js';
export { Peer, PeerIdObj, PeerInitConfig, createPeerId } from './peer.js';
export { RelayNodeInitConfig, createRelayNode } from './relay.js';
export { getPseudonymForPeerId } from './utils/index.js';
export { RELAY_DEFAULT_HOST, RELAY_DEFAULT_PORT, RELAY_DEFAULT_MAX_DIAL_RETRY } from './constants.js';
export {
RELAY_DEFAULT_HOST,
RELAY_DEFAULT_PORT,
RELAY_REDIAL_INTERVAL,
RELAY_DEFAULT_MAX_DIAL_RETRY,
PING_INTERVAL
} from './constants.js';

View File

@ -5,7 +5,7 @@
import { Libp2p } from '@cerc-io/libp2p';
import type { PeerId } from '@libp2p/interface-peer-id';
import { CONN_CHECK_INTERVAL } from './constants.js';
import { PING_INTERVAL } from './constants.js';
interface PeerData {
intervalId: NodeJS.Timer;
@ -17,10 +17,12 @@ interface PeerData {
*/
export class PeerHearbeatChecker {
_node: Libp2p;
_pingInterval: number;
_peerMap: Map<string, PeerData> = new Map()
constructor (node: Libp2p) {
constructor (node: Libp2p, pingInterval = PING_INTERVAL) {
this._node = node;
this._pingInterval = pingInterval;
}
/**
@ -53,7 +55,7 @@ export class PeerHearbeatChecker {
peerId,
handlePingDisconnect
);
}, CONN_CHECK_INTERVAL);
}, this._pingInterval);
this._peerMap.set(
peerIdString,

View File

@ -36,7 +36,7 @@ import {
PUBSUB_DISCOVERY_INTERVAL,
PUBSUB_SIGNATURE_POLICY,
RELAY_TAG,
RELAY_REDIAL_DELAY,
RELAY_REDIAL_INTERVAL,
DEFAULT_MAX_RELAY_CONNECTIONS,
PING_TIMEOUT
} from './constants.js';
@ -48,20 +48,35 @@ export const CHAT_PROTOCOL = '/chat/1.0.0';
const ERR_PEER_ALREADY_TAGGED = 'Peer already tagged';
export type PeerIdObj = {
export interface PeerIdObj {
id: string;
privKey: string;
pubKey: string;
};
}
export interface PeerInitConfig {
pingInterval?: number;
pingTimeout?: number;
maxRelayConnections?: number;
relayRedialInterval?: number;
maxConnections?: number;
minConnections?: number;
dialTimeout?: number;
}
export class Peer {
_node?: Libp2p
_peerHeartbeatChecker?: PeerHearbeatChecker
_wrtcTransport: (components: WebRTCDirectComponents) => Transport
_relayNodeMultiaddr: Multiaddr
_numRelayConnections = 0
_peerStreamMap: Map<string, Pushable<any>> = new Map()
_pingInterval?: number
_relayRedialInterval?: number
_maxRelayConnections?: number
_peerStreamMap: Map<string, Pushable<any>> = new Map()
_messageHandlers: Array<(peerId: PeerId, message: any) => void> = []
_topicHandlers: Map<string, Array<(peerId: PeerId, data: any) => void>> = new Map()
_metrics = new PrometheusMetrics()
@ -97,10 +112,11 @@ export class Peer {
return this._metrics;
}
async init (
peerIdObj?: PeerIdObj,
maxRelayConnections = DEFAULT_MAX_RELAY_CONNECTIONS
): Promise<void> {
async init (initOptions: PeerInitConfig, peerIdObj?: PeerIdObj): Promise<void> {
this._pingInterval = initOptions.pingInterval;
this._relayRedialInterval = initOptions.relayRedialInterval;
this._maxRelayConnections = initOptions.maxRelayConnections;
try {
let peerId: PeerId | undefined;
if (peerIdObj) {
@ -134,13 +150,13 @@ export class Peer {
connectionManager: {
maxDialsPerPeer: MAX_CONCURRENT_DIALS_PER_PEER,
autoDial: false,
maxConnections: MAX_CONNECTIONS,
minConnections: MIN_CONNECTIONS,
dialTimeout: DIAL_TIMEOUT,
maxConnections: initOptions.maxConnections ?? MAX_CONNECTIONS,
minConnections: initOptions.minConnections ?? MIN_CONNECTIONS,
dialTimeout: initOptions.dialTimeout ?? DIAL_TIMEOUT,
keepMultipleConnections: true // Set true to get connections with multiple multiaddr
},
ping: {
timeout: PING_TIMEOUT
timeout: initOptions.pingTimeout ?? PING_TIMEOUT
},
metrics: () => this._metrics
});
@ -150,10 +166,10 @@ export class Peer {
}
console.log('libp2p node created', this._node);
this._peerHeartbeatChecker = new PeerHearbeatChecker(this._node);
this._peerHeartbeatChecker = new PeerHearbeatChecker(this._node, this._pingInterval);
// Dial to the HOP enabled primary relay node
await this._dialRelay();
await this._dialRelay(this._relayRedialInterval);
// Listen for change in stored multiaddrs
this._node.peerStore.addEventListener('change:multiaddrs', (evt) => {
@ -178,13 +194,13 @@ export class Peer {
// Listen for peers discovery
this._node.addEventListener('peer:discovery', (evt) => {
// console.log('event peer:discovery', evt);
this._handleDiscovery(evt.detail, maxRelayConnections);
this._handleDiscovery(evt.detail, this._maxRelayConnections);
});
// Listen for peers connection
this._node.addEventListener('peer:connect', async (evt) => {
console.log('event peer:connect', evt);
await this._handleConnect(evt.detail, maxRelayConnections);
await this._handleConnect(evt.detail, this._maxRelayConnections);
});
// Listen for peers disconnecting
@ -320,7 +336,7 @@ export class Peer {
}
}
async _dialRelay (): Promise<void> {
async _dialRelay (redialInterval = RELAY_REDIAL_INTERVAL): Promise<void> {
assert(this._node);
const relayMultiaddr = this._relayNodeMultiaddr;
console.log('Dialling primary relay node');
@ -329,7 +345,7 @@ export class Peer {
this._node,
relayMultiaddr,
{
redialDelay: RELAY_REDIAL_DELAY,
redialInterval: redialInterval,
maxRetry: Infinity
}
);
@ -352,7 +368,7 @@ export class Peer {
});
}
_handleDiscovery (peer: PeerInfo, maxRelayConnections: number): void {
_handleDiscovery (peer: PeerInfo, maxRelayConnections = DEFAULT_MAX_RELAY_CONNECTIONS): void {
// Check connected peers as they are discovered repeatedly.
if (!this._node?.getPeers().some(remotePeerId => remotePeerId.toString() === peer.id.toString())) {
let isRelayPeer = false;
@ -374,7 +390,7 @@ export class Peer {
}
}
async _handleConnect (connection: Connection, maxRelayConnections: number): Promise<void> {
async _handleConnect (connection: Connection, maxRelayConnections = DEFAULT_MAX_RELAY_CONNECTIONS): Promise<void> {
assert(this._node);
const remotePeerId = connection.remotePeer;
const remotePeerIdString = connection.remotePeer.toString();
@ -474,7 +490,7 @@ export class Peer {
if (disconnectedPeerId.toString() === this._relayNodeMultiaddr?.getPeerId()) {
// Reconnect to primary relay node if disconnected
await this._dialRelay();
await this._dialRelay(this._relayRedialInterval);
}
}

View File

@ -16,23 +16,25 @@ import { multiaddr } from '@multiformats/multiaddr';
import type { PeerId } from '@libp2p/interface-peer-id';
import { createFromJSON } from '@libp2p/peer-id-factory';
import { HOP_TIMEOUT, PUBSUB_DISCOVERY_INTERVAL, PUBSUB_SIGNATURE_POLICY, WEBRTC_PORT_RANGE, RELAY_REDIAL_DELAY } from './constants.js';
import { HOP_TIMEOUT, PUBSUB_DISCOVERY_INTERVAL, PUBSUB_SIGNATURE_POLICY, WEBRTC_PORT_RANGE } from './constants.js';
import { PeerHearbeatChecker } from './peer-heartbeat-checker.js';
import { dialWithRetry } from './utils/index.js';
import { PeerIdObj } from './peer.js';
const log = debug('laconic:relay');
export interface RelayNodeInit {
export interface RelayNodeInitConfig {
host: string;
port: number;
peerIdObj?: PeerIdObj;
announceDomain?: string;
relayPeers: string[];
pingInterval: number;
redialInterval: number;
maxDialRetry: number;
peerIdObj?: PeerIdObj;
}
export async function createRelayNode (init: RelayNodeInit): Promise<Libp2p> {
export async function createRelayNode (init: RelayNodeInitConfig): Promise<Libp2p> {
const listenMultiaddrs = [`/ip4/${init.host}/tcp/${init.port}/http/p2p-webrtc-direct`];
const announceMultiaddrs = [];
@ -83,7 +85,7 @@ export async function createRelayNode (init: RelayNodeInit): Promise<Libp2p> {
}
});
const peerHeartbeatChecker = new PeerHearbeatChecker(node);
const peerHeartbeatChecker = new PeerHearbeatChecker(node, init.pingInterval);
console.log(`Relay node started with id ${node.peerId.toString()}`);
console.log('Listening on:');
@ -123,7 +125,7 @@ export async function createRelayNode (init: RelayNodeInit): Promise<Libp2p> {
node,
remoteAddr,
{
redialDelay: RELAY_REDIAL_DELAY,
redialInterval: init.redialInterval,
maxRetry: init.maxDialRetry
}
).catch((error: Error) => console.log(error.message));
@ -132,20 +134,20 @@ export async function createRelayNode (init: RelayNodeInit): Promise<Libp2p> {
if (init.relayPeers.length) {
console.log('Dialling relay peers');
await _dialRelayPeers(node, init.relayPeers, init.maxDialRetry);
await _dialRelayPeers(node, init.relayPeers, init.maxDialRetry, init.redialInterval);
}
return node;
}
async function _dialRelayPeers (node: Libp2p, relayPeersList: string[], maxDialRetry: number): Promise<void> {
async function _dialRelayPeers (node: Libp2p, relayPeersList: string[], maxDialRetry: number, redialInterval: number): Promise<void> {
relayPeersList.forEach(async (relayPeer) => {
const relayMultiaddr = multiaddr(relayPeer);
await dialWithRetry(
node,
relayMultiaddr,
{
redialDelay: RELAY_REDIAL_DELAY,
redialInterval,
maxRetry: maxDialRetry
}
).catch((error: Error) => console.log(error.message));

View File

@ -7,12 +7,12 @@ import { Multiaddr } from '@multiformats/multiaddr';
import { uniqueNamesGenerator, adjectives, colors, names } from 'unique-names-generator';
interface DialWithRetryOptions {
redialDelay: number
redialInterval: number
maxRetry: number
}
const DEFAULT_DIAL_RETRY_OPTIONS: DialWithRetryOptions = {
redialDelay: 5000, // ms
redialInterval: 5000, // ms
maxRetry: 5
};
@ -24,7 +24,7 @@ const DEFAULT_DIAL_RETRY_OPTIONS: DialWithRetryOptions = {
* @param options
*/
export const dialWithRetry = async (node: Libp2p, multiaddr: Multiaddr, options: Partial<DialWithRetryOptions>) => {
const { redialDelay, maxRetry } = {
const { redialInterval, maxRetry } = {
...DEFAULT_DIAL_RETRY_OPTIONS,
...options
};
@ -38,11 +38,11 @@ export const dialWithRetry = async (node: Libp2p, multiaddr: Multiaddr, options:
return connection;
} catch (err) {
console.log(`Could not dial node ${multiaddr.toString()}`, err);
console.log(`Retrying after ${redialDelay}ms`);
console.log(`Retrying after ${redialInterval}ms`);
// TODO: Use wait method from util package.
// Issue using util package in react app.
await new Promise(resolve => setTimeout(resolve, redialDelay));
await new Promise(resolve => setTimeout(resolve, redialInterval));
}
}

View File

@ -44,19 +44,52 @@ export interface RelayConfig {
// Port to start listening on
port?: number;
// Domain name to be used in the announce address
announce?: string;
// Relay peer id file path (json)
peerIdFile?: string;
// Domain name to be used in the announce address
announce?: string;
// Relay peer multiaddr(s) list
relayPeers?: string[];
// Interval in ms to check relay peer connections using ping
pingInterval?: number;
// Redial interval in ms on connection failure
redialInterval?: number;
// Max number of dial retries to be attempted to a relay peer
maxDialRetry?: number;
}
// Peer config
export interface PeerConfig {
// Multiaddr of the primary relay node for this peer
relayMultiaddr: string;
// Pubsub topic to subscribe this peer to
pubSubTopic: string;
// Interval (ms) to check relay peer connections using ping
pingInterval?: number;
// Ping timeout (ms) used to check if connection is alive
pingTimeout?: number;
// Max number of relay node connections for a peer
maxRelayConnections?: number;
// Redial interval (ms) to relay node on connection failure
relayRedialInterval?: number;
// Max number of connections for a peer
maxConnections?: number;
// Timeout (ms) for dial to peers
dialTimeout?: number;
}
// P2P config
export interface P2PConfig {
// Enable relay node
@ -65,12 +98,7 @@ export interface P2PConfig {
// Enable peer node
enablePeer: boolean;
// Multiaddr of the primary relay node for this peer
relayMultiaddr: string;
// Pubsub topic to subscribe this peer to
pubSubTopic: string;
peer: PeerConfig;
}
export interface ServerConfig {