From c46d5c3f33b026f9dc44450a6c939cefd2eea053 Mon Sep 17 00:00:00 2001 From: prathamesh0 <42446521+prathamesh0@users.noreply.github.com> Date: Mon, 6 Mar 2023 10:20:51 +0530 Subject: [PATCH] Broadcast peer info over floodsub when requested (#332) * Refactor discovery handler * Broadcast peer info over floodsub on requests * Broadcast peer info from relay nodes * Make debug reponse handler optional * Register debug info request handler on peer init * Move debug info types to types dir * Return method to unsubscribe from the debug topic * Make debug info flag optional for relay nodes * Restructure peer connection info data * Refactor getting peer info to be used in react app * Refactor duplicate code to utils * Rename peer methods --------- Co-authored-by: nabarun --- packages/cli/README.md | 3 +- packages/cli/src/peer.ts | 9 +- packages/cli/src/server.ts | 6 +- packages/peer/src/cli/relay.ts | 8 +- packages/peer/src/constants.ts | 4 + packages/peer/src/index.ts | 1 + packages/peer/src/peer.ts | 136 +++++++++++++++++++++----- packages/peer/src/relay.ts | 56 ++++++++++- packages/peer/src/types/debug-info.ts | 61 ++++++++++++ packages/peer/src/utils/index.ts | 73 +++++++++++++- packages/util/src/config.ts | 6 ++ 11 files changed, 332 insertions(+), 31 deletions(-) create mode 100644 packages/peer/src/types/debug-info.ts diff --git a/packages/cli/README.md b/packages/cli/README.md index 1eacc7ab..ff2fc93e 100644 --- a/packages/cli/README.md +++ b/packages/cli/README.md @@ -43,7 +43,7 @@ A basic CLI to pass messages between peers using `stdin`/`stdout` ```bash # In packages/cli - yarn chat --relay-multiaddr --max-connections [MAX_CONNECTIONS] --dial-timeout [DIAL_TIMEOUT] --max-relay-connections [MAX_RELAY_CONNECTIONS] --peer-id-file [PEER_ID_FILE_PATH] + yarn chat --relay-multiaddr --max-connections [MAX_CONNECTIONS] --dial-timeout [DIAL_TIMEOUT] --max-relay-connections [MAX_RELAY_CONNECTIONS] --peer-id-file [PEER_ID_FILE_PATH] --enable-debug-info [ENABLE_DEBUG_INFO] ``` * `relay-multiaddr (r)`: multiaddr of a primary hop enabled relay node @@ -51,5 +51,6 @@ A basic CLI to pass messages between peers using `stdin`/`stdout` * `dial-timeout`: timeout for dial to peers (ms) * `max-relay-connections`: max number of relay node connections for this peer * `peer-id-file (f)`: file path for peer id to be used (json) + * `enable-debug-info`: Whether to broadcast node's info over floodsub on request * The process starts reading from `stdin` and outputs messages from others peers over the `/chat/1.0.0` protocol to `stdout`. diff --git a/packages/cli/src/peer.ts b/packages/cli/src/peer.ts index a3d5ebc6..c690543c 100644 --- a/packages/cli/src/peer.ts +++ b/packages/cli/src/peer.ts @@ -23,6 +23,7 @@ interface Arguments { dialTimeout: number; maxRelayConnections: number; peerIdFile: string; + enableDebugInfo: boolean; } export class PeerCmd { @@ -40,7 +41,8 @@ export class PeerCmd { const peerNodeInit: PeerInitConfig = { maxConnections: argv.maxConnections, dialTimeout: argv.dialTimeout, - maxRelayConnections: argv.maxRelayConnections + maxRelayConnections: argv.maxRelayConnections, + enableDebugInfo: argv.enableDebugInfo }; await peer.init(peerNodeInit, peerIdObj); @@ -87,6 +89,11 @@ function _getArgv (): any { type: 'string', alias: 'f', describe: 'Peer id file path (json)' + }, + enableDebugInfo: { + type: 'boolean', + describe: 'Whether to participate in exchanging debug info over floodsub', + default: false } }).argv; } diff --git a/packages/cli/src/server.ts b/packages/cli/src/server.ts index e2a1f703..e7f9eb27 100644 --- a/packages/cli/src/server.ts +++ b/packages/cli/src/server.ts @@ -177,7 +177,8 @@ export class ServerCmd { pingInterval: relayConfig.pingInterval ?? DEFAULT_PING_INTERVAL, redialInterval: relayConfig.redialInterval ?? RELAY_REDIAL_INTERVAL, maxDialRetry: relayConfig.maxDialRetry ?? RELAY_DEFAULT_MAX_DIAL_RETRY, - peerIdObj + peerIdObj, + enableDebugInfo: relayConfig.enableDebugInfo }; await createRelayNode(relayNodeInit); } @@ -200,7 +201,8 @@ export class ServerCmd { maxRelayConnections: peerConfig.maxRelayConnections, relayRedialInterval: peerConfig.relayRedialInterval, maxConnections: peerConfig.maxConnections, - dialTimeout: peerConfig.dialTimeout + dialTimeout: peerConfig.dialTimeout, + enableDebugInfo: peerConfig.enableDebugInfo }; await peer.init(peerNodeInit, peerIdObj); diff --git a/packages/peer/src/cli/relay.ts b/packages/peer/src/cli/relay.ts index ab09073f..00702f13 100644 --- a/packages/peer/src/cli/relay.ts +++ b/packages/peer/src/cli/relay.ts @@ -24,6 +24,7 @@ interface Arguments { pingInterval: number; redialInterval: number; maxDialRetry: number; + enableDebugInfo?: boolean; } async function main (): Promise { @@ -63,7 +64,8 @@ async function main (): Promise { dialTimeout: argv.dialTimeout, pingInterval: argv.pingInterval, redialInterval: argv.redialInterval, - maxDialRetry: argv.maxDialRetry + maxDialRetry: argv.maxDialRetry, + enableDebugInfo: argv.enableDebugInfo }; await createRelayNode(relayNodeInit); } @@ -118,6 +120,10 @@ function _getArgv (): Arguments { type: 'number', describe: 'Maximum number of dial retries to be attempted to a relay peer', default: RELAY_DEFAULT_MAX_DIAL_RETRY + }, + enableDebugInfo: { + type: 'boolean', + describe: "Whether to broadcast node's info over floodsub on request" } // https://github.com/yargs/yargs/blob/main/docs/typescript.md?plain=1#L83 }).parseSync(); diff --git a/packages/peer/src/constants.ts b/packages/peer/src/constants.ts index f936da19..d9a30e22 100644 --- a/packages/peer/src/constants.ts +++ b/packages/peer/src/constants.ts @@ -2,6 +2,10 @@ // Copyright 2023 Vulcanize, Inc. // +export const P2P_CIRCUIT_ID = 'p2p-circuit'; +export const CHAT_PROTOCOL = '/chat/1.0.0'; +export const DEBUG_INFO_TOPIC = 'debug-info'; + // How often a peer should broadcast it's peer data over pubsub discovery topic // (interval at which other peers get corresponding discovery event) export const PUBSUB_DISCOVERY_INTERVAL = 10000; // 10 seconds diff --git a/packages/peer/src/index.ts b/packages/peer/src/index.ts index 9461803c..2082e9e2 100644 --- a/packages/peer/src/index.ts +++ b/packages/peer/src/index.ts @@ -13,3 +13,4 @@ export { DEFAULT_PING_INTERVAL, DIAL_TIMEOUT } from './constants.js'; +export { DebugMsg } from './types/debug-info.js'; diff --git a/packages/peer/src/peer.ts b/packages/peer/src/peer.ts index ea76ef5c..925ce9c7 100644 --- a/packages/peer/src/peer.ts +++ b/packages/peer/src/peer.ts @@ -38,15 +38,17 @@ import { RELAY_TAG, RELAY_REDIAL_INTERVAL, DEFAULT_MAX_RELAY_CONNECTIONS, - DEFAULT_PING_TIMEOUT + DEFAULT_PING_TIMEOUT, + P2P_CIRCUIT_ID, + CHAT_PROTOCOL, + DEBUG_INFO_TOPIC } from './constants.js'; import { PeerHearbeatChecker } from './peer-heartbeat-checker.js'; -import { dialWithRetry } from './utils/index.js'; - -const P2P_CIRCUIT_ID = 'p2p-circuit'; -export const CHAT_PROTOCOL = '/chat/1.0.0'; +import { debugInfoRequestHandler, dialWithRetry, getConnectionsInfo, getSelfInfo } from './utils/index.js'; +import { ConnectionType, DebugPeerInfo, DebugRequest, PeerConnectionInfo, PeerSelfInfo } from './types/debug-info.js'; const ERR_PEER_ALREADY_TAGGED = 'Peer already tagged'; +const ERR_DEBUG_INFO_NOT_ENABLED = 'Debug info not enabled'; export interface PeerIdObj { id: string; @@ -62,6 +64,7 @@ export interface PeerInitConfig { maxConnections?: number; minConnections?: number; dialTimeout?: number; + enableDebugInfo?: boolean; } export class Peer { @@ -75,6 +78,8 @@ export class Peer { _relayRedialInterval?: number _maxRelayConnections?: number + _debugInfoEnabled?: boolean + _peerStreamMap: Map> = new Map() _messageHandlers: Array<(peerId: PeerId, message: any) => void> = [] _topicHandlers: Map void>> = new Map() @@ -116,6 +121,7 @@ _peerStreamMap: Map> = new Map() async init (initOptions: PeerInitConfig, peerIdObj?: PeerIdObj): Promise { this._relayRedialInterval = initOptions.relayRedialInterval; this._maxRelayConnections = initOptions.maxRelayConnections; + this._debugInfoEnabled = initOptions.enableDebugInfo; const pingTimeout = initOptions.pingTimeout ?? DEFAULT_PING_TIMEOUT; try { @@ -227,6 +233,11 @@ _peerStreamMap: Map> = new Map() this._node.pubsub.addEventListener('message', (evt) => { this._handlePubSubMessage(evt.detail); }); + + if (this._debugInfoEnabled) { + console.log('Debug info enabled'); + this._registerDebugInfoRequestHandler(); + } } async close (): Promise { @@ -247,6 +258,53 @@ _peerStreamMap: Map> = new Map() await Promise.all(hangUpPromises); } + async getInfo (): Promise { + assert(this.node); + assert(this.peerId); + + const selfInfo: PeerSelfInfo = this.getPeerSelfInfo(); + const connInfo: PeerConnectionInfo[] = this.getPeerConnectionsInfo(); + const metrics = await this.metrics.getMetricsAsMap(); + + return { + selfInfo, + connInfo, + metrics + }; + } + + getPeerSelfInfo (): PeerSelfInfo { + assert(this._node); + + const selfInfo = getSelfInfo(this._node); + + return { + ...selfInfo, + primaryRelayMultiaddr: this.relayNodeMultiaddr.toString(), + primaryRelayPeerId: this.relayNodeMultiaddr.getPeerId() + }; + } + + getPeerConnectionsInfo (): PeerConnectionInfo[] { + assert(this._node); + assert(this._peerHeartbeatChecker); + const connectionsInfo = getConnectionsInfo(this._node, this._peerHeartbeatChecker); + + return connectionsInfo.map(connectionInfo => { + const peerConnectionInfo: PeerConnectionInfo = { + ...connectionInfo, + isPeerRelay: this.isRelayPeerMultiaddr(connectionInfo.multiaddr), + isPeerRelayPrimary: this.isPrimaryRelay(connectionInfo.multiaddr) + }; + + if (peerConnectionInfo.type === ConnectionType.Relayed) { + peerConnectionInfo.hopRelayPeerId = multiaddr(peerConnectionInfo.multiaddr).decapsulate('p2p-circuit/p2p').getPeerId(); + } + + return peerConnectionInfo; + }); + } + broadcastMessage (message: any): void { for (const [, stream] of this._peerStreamMap) { stream.push(message); @@ -258,6 +316,17 @@ _peerStreamMap: Map> = new Map() await this._node.pubsub.publish(topic, uint8ArrayFromString(JSON.stringify(msg))); } + async requestPeerInfo (): Promise { + assert(this._node); + + if (!this._debugInfoEnabled) { + throw new Error(ERR_DEBUG_INFO_NOT_ENABLED); + } + + const request: DebugRequest = { type: 'Request' }; + await this.floodMessage(DEBUG_INFO_TOPIC, request); + } + subscribeMessage (handler: (peerId: PeerId, message: any) => void) : () => void { this._messageHandlers.push(handler); @@ -300,6 +369,14 @@ _peerStreamMap: Map> = new Map() return unsubscribe; } + subscribeDebugInfo (handler: (peerId: PeerId, data: any) => void): () => void { + if (!this._debugInfoEnabled) { + throw new Error(ERR_DEBUG_INFO_NOT_ENABLED); + } + + return this.subscribeTopic(DEBUG_INFO_TOPIC, handler); + } + isRelayPeerMultiaddr (multiaddrString: string): boolean { // Multiaddr not having p2p-circuit id or webrtc-star id is of a relay node return !(multiaddrString.includes(P2P_CIRCUIT_ID) || multiaddrString.includes(P2P_WEBRTC_STAR_ID)); @@ -377,24 +454,26 @@ _peerStreamMap: Map> = new Map() _handleDiscovery (peer: PeerInfo, maxRelayConnections = DEFAULT_MAX_RELAY_CONNECTIONS): void { // Check connected peers as they are discovered repeatedly. - if (!this._node?.getPeers().some(remotePeerId => remotePeerId.toString() === peer.id.toString())) { - let isRelayPeer = false; - for (const multiaddr of peer.multiaddrs) { - if (this.isRelayPeerMultiaddr(multiaddr.toString())) { - isRelayPeer = true; - break; - } - } - - // Check relay connections limit if it's a relay peer - if (isRelayPeer && this._numRelayConnections >= maxRelayConnections) { - // console.log(`Ignoring discovered relay node ${peer.id.toString()} as max relay connections limit reached`); - return; - } - - console.log(`Discovered peer ${peer.id.toString()} with multiaddrs`, peer.multiaddrs.map(addr => addr.toString())); - this._connectPeer(peer); + if (this._node?.getPeers().some(remotePeerId => remotePeerId.toString() === peer.id.toString())) { + return; } + + let isRelayPeer = false; + for (const multiaddr of peer.multiaddrs) { + if (this.isRelayPeerMultiaddr(multiaddr.toString())) { + isRelayPeer = true; + break; + } + } + + // Check relay connections limit if it's a relay peer + if (isRelayPeer && this._numRelayConnections >= maxRelayConnections) { + // console.log(`Ignoring discovered relay node ${peer.id.toString()} as max relay connections limit reached`); + return; + } + + console.log(`Discovered peer ${peer.id.toString()} with multiaddrs`, peer.multiaddrs.map(addr => addr.toString())); + this._connectPeer(peer); } async _handleConnect (connection: Connection, maxRelayConnections = DEFAULT_MAX_RELAY_CONNECTIONS): Promise { @@ -569,6 +648,19 @@ _peerStreamMap: Map> = new Map() handler(msg.from, dataObj); }); } + + _registerDebugInfoRequestHandler (): void { + this.subscribeTopic(DEBUG_INFO_TOPIC, async (peerId: PeerId, msg: any): Promise => { + assert(this._node); + + await debugInfoRequestHandler({ + node: this._node, + getPeerInfo: this.getInfo.bind(this), + peerId, + msg + }); + }); + } } export async function createPeerId (): Promise { diff --git a/packages/peer/src/relay.ts b/packages/peer/src/relay.ts index 15279036..8291d240 100644 --- a/packages/peer/src/relay.ts +++ b/packages/peer/src/relay.ts @@ -5,16 +5,20 @@ import { Libp2p, createLibp2p } from '@cerc-io/libp2p'; import wrtc from 'wrtc'; import debug from 'debug'; +import assert from 'assert'; +import { toString as uint8ArrayToString } from 'uint8arrays/to-string'; import { noise } from '@chainsafe/libp2p-noise'; import { mplex } from '@libp2p/mplex'; import { WebRTCDirectNodeType, webRTCDirect } from '@cerc-io/webrtc-direct'; import { floodsub } from '@libp2p/floodsub'; import { pubsubPeerDiscovery } from '@libp2p/pubsub-peer-discovery'; +import type { Message } from '@libp2p/interface-pubsub'; import type { Connection } from '@libp2p/interface-connection'; import { multiaddr } from '@multiformats/multiaddr'; import type { PeerId } from '@libp2p/interface-peer-id'; import { createFromJSON } from '@libp2p/peer-id-factory'; +import { PrometheusMetrics } from '@cerc-io/prometheus-metrics'; import { HOP_TIMEOUT, @@ -22,11 +26,13 @@ import { PUBSUB_DISCOVERY_INTERVAL, PUBSUB_SIGNATURE_POLICY, WEBRTC_PORT_RANGE, - MAX_CONCURRENT_DIALS_PER_PEER + MAX_CONCURRENT_DIALS_PER_PEER, + DEBUG_INFO_TOPIC } from './constants.js'; import { PeerHearbeatChecker } from './peer-heartbeat-checker.js'; -import { dialWithRetry } from './utils/index.js'; +import { debugInfoRequestHandler, dialWithRetry, getConnectionsInfo, getSelfInfo } from './utils/index.js'; import { PeerIdObj } from './peer.js'; +import { SelfInfo, ConnectionInfo } from './types/debug-info.js'; const log = debug('laconic:relay'); @@ -41,6 +47,7 @@ export interface RelayNodeInitConfig { pingTimeout?: number; redialInterval: number; maxDialRetry: number; + enableDebugInfo?: boolean; } export async function createRelayNode (init: RelayNodeInitConfig): Promise { @@ -58,6 +65,8 @@ export async function createRelayNode (init: RelayNodeInitConfig): Promise metrics }); const peerHeartbeatChecker = new PeerHearbeatChecker( @@ -159,6 +169,11 @@ export async function createRelayNode (init: RelayNodeInitConfig): Promise { + node.pubsub.subscribe(DEBUG_INFO_TOPIC); + + // Listen for pubsub messages + node.pubsub.addEventListener('message', async (evt) => { + const msg: Message = evt.detail; + + // Messages should be signed since globalSignaturePolicy is set to 'StrictSign' + assert(msg.type === 'signed'); + + if (msg.topic === DEBUG_INFO_TOPIC) { + const dataObj = JSON.parse(uint8ArrayToString(msg.data)); + + await debugInfoRequestHandler({ + node, + getPeerInfo: async () => _getRelayPeerInfo(node, peerHeartbeatChecker, metrics), + peerId: msg.from, + msg: dataObj + }); + } + }); +} + +async function _getRelayPeerInfo (node: Libp2p, peerHeartbeatChecker: PeerHearbeatChecker, metrics: PrometheusMetrics): Promise { + const selfInfo: SelfInfo = getSelfInfo(node); + const connInfo: ConnectionInfo[] = getConnectionsInfo(node, peerHeartbeatChecker); + const metricsMap = await metrics.getMetricsAsMap(); + + return { + selfInfo, + connInfo, + metrics: metricsMap + }; +} diff --git a/packages/peer/src/types/debug-info.ts b/packages/peer/src/types/debug-info.ts new file mode 100644 index 00000000..a0412f9b --- /dev/null +++ b/packages/peer/src/types/debug-info.ts @@ -0,0 +1,61 @@ +// +// Copyright 2023 Vulcanize, Inc. +// + +import type { Direction } from '@libp2p/interface-connection'; +import { RegistryMetricData } from '@cerc-io/prometheus-metrics'; + +export interface SelfInfo { + peerId: string; + multiaddrs: string[]; +} + +export interface PeerSelfInfo extends SelfInfo { + primaryRelayMultiaddr: string; + primaryRelayPeerId: string | null; +} + +export enum ConnectionType { + Relayed = 'relayed', + Direct = 'direct' +} + +export interface ConnectionInfo { + id: string; + peerId: string; + multiaddr: string; + direction: Direction; + status: string; + latency: number[]; + type: ConnectionType; +} + +export interface PeerConnectionInfo extends ConnectionInfo { + isPeerRelay: boolean; + isPeerRelayPrimary: boolean; + hopRelayPeerId?: string | null; +} + +export interface DebugPeerInfo { + selfInfo: PeerSelfInfo; + connInfo: PeerConnectionInfo[]; + metrics: Map>; +} + +export interface DebugRelayInfo { + selfInfo: SelfInfo; + connInfo: ConnectionInfo[]; + metrics: Map>; +} + +export interface DebugRequest { + type: 'Request' +} + +export interface DebugResponse { + type: 'Response', + dst: string, + peerInfo: DebugPeerInfo | DebugRelayInfo +} + +export type DebugMsg = DebugRequest | DebugResponse diff --git a/packages/peer/src/utils/index.ts b/packages/peer/src/utils/index.ts index 7f3bcc6d..e258089a 100644 --- a/packages/peer/src/utils/index.ts +++ b/packages/peer/src/utils/index.ts @@ -2,9 +2,19 @@ // Copyright 2023 Vulcanize, Inc. // +import { uniqueNamesGenerator, adjectives, colors, names } from 'unique-names-generator'; +import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string'; +import debug from 'debug'; + import { Libp2p } from '@cerc-io/libp2p'; import { Multiaddr } from '@multiformats/multiaddr'; -import { uniqueNamesGenerator, adjectives, colors, names } from 'unique-names-generator'; +import type { PeerId } from '@libp2p/interface-peer-id'; + +import { ConnectionInfo, ConnectionType, DebugMsg, DebugPeerInfo, DebugResponse, SelfInfo } from '../types/debug-info.js'; +import { DEBUG_INFO_TOPIC } from '../constants.js'; +import { PeerHearbeatChecker } from '../peer-heartbeat-checker.js'; + +const log = debug('laconic:utils'); interface DialWithRetryOptions { redialInterval: number @@ -63,3 +73,64 @@ export const getPseudonymForPeerId = (peerId: string): string => { separator: '-' }); }; + +/** + * Handler for pubsub debug info request + * @param peerId + * @param msg + */ +export const debugInfoRequestHandler = async ( + params: { + node: Libp2p, + getPeerInfo: () => Promise + peerId: PeerId, + msg: any, +}): Promise => { + const { node, peerId, msg, getPeerInfo } = params; + const debugMsg = msg as DebugMsg; + const msgType = debugMsg.type; + + if (msgType === 'Request') { + log('got a debug info request from', peerId.toString()); + const peerInfo: DebugPeerInfo = await getPeerInfo(); + const response: DebugResponse = { + type: 'Response', + dst: peerId.toString(), + peerInfo + }; + + await node.pubsub.publish(DEBUG_INFO_TOPIC, uint8ArrayFromString(JSON.stringify(response))); + } +}; + +/** + * Method to get self node info + * @param node + * @returns + */ +export const getSelfInfo = (node: Libp2p): SelfInfo => { + return { + peerId: node.peerId.toString(), + multiaddrs: node.getMultiaddrs().map(multiaddr => multiaddr.toString()) + }; +}; + +/** + * Method to get connections info + * @param node + * @param peerHeartbeatChecker + * @returns + */ +export const getConnectionsInfo = (node: Libp2p, peerHeartbeatChecker: PeerHearbeatChecker): ConnectionInfo[] => { + return node.getConnections().map(connection => { + return { + id: connection.id, + peerId: connection.remotePeer.toString(), + multiaddr: connection.remoteAddr.toString(), + direction: connection.stat.direction, + status: connection.stat.status, + type: connection.remoteAddr.toString().includes('p2p-circuit/p2p') ? ConnectionType.Relayed : ConnectionType.Direct, + latency: peerHeartbeatChecker.getLatencyData(connection.remotePeer) + }; + }); +}; diff --git a/packages/util/src/config.ts b/packages/util/src/config.ts index 9a01fae5..ea3b68a0 100644 --- a/packages/util/src/config.ts +++ b/packages/util/src/config.ts @@ -64,6 +64,9 @@ export interface RelayConfig { // Max number of dial retries to be attempted to a relay peer maxDialRetry?: number; + + // Broadcast node's info over floodsub on requests + enableDebugInfo?: boolean; } // Peer config @@ -94,6 +97,9 @@ export interface PeerConfig { // Peer id file path (json) peerIdFile?: string; + + // Participate in exchange of debug info over floodsub + enableDebugInfo?: boolean; } // P2P config