Broadcast peer info over floodsub when requested (#332)

* Refactor discovery handler

* Broadcast peer info over floodsub on requests

* Broadcast peer info from relay nodes

* Make debug reponse handler optional

* Register debug info request handler on peer init

* Move debug info types to types dir

* Return method to unsubscribe from the debug topic

* Make debug info flag optional for relay nodes

* Restructure peer connection info data

* Refactor getting peer info to be used in react app

* Refactor duplicate code to utils

* Rename peer methods

---------

Co-authored-by: nabarun <nabarun@deepstacksoft.com>
This commit is contained in:
prathamesh0 2023-03-06 10:20:51 +05:30 committed by GitHub
parent 0400546996
commit c46d5c3f33
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 332 additions and 31 deletions

View File

@ -43,7 +43,7 @@ A basic CLI to pass messages between peers using `stdin`/`stdout`
```bash
# In packages/cli
yarn chat --relay-multiaddr <RELAY_MULTIADDR> --max-connections [MAX_CONNECTIONS] --dial-timeout [DIAL_TIMEOUT] --max-relay-connections [MAX_RELAY_CONNECTIONS] --peer-id-file [PEER_ID_FILE_PATH]
yarn chat --relay-multiaddr <RELAY_MULTIADDR> --max-connections [MAX_CONNECTIONS] --dial-timeout [DIAL_TIMEOUT] --max-relay-connections [MAX_RELAY_CONNECTIONS] --peer-id-file [PEER_ID_FILE_PATH] --enable-debug-info [ENABLE_DEBUG_INFO]
```
* `relay-multiaddr (r)`: multiaddr of a primary hop enabled relay node
@ -51,5 +51,6 @@ A basic CLI to pass messages between peers using `stdin`/`stdout`
* `dial-timeout`: timeout for dial to peers (ms)
* `max-relay-connections`: max number of relay node connections for this peer
* `peer-id-file (f)`: file path for peer id to be used (json)
* `enable-debug-info`: Whether to broadcast node's info over floodsub on request
* The process starts reading from `stdin` and outputs messages from others peers over the `/chat/1.0.0` protocol to `stdout`.

View File

@ -23,6 +23,7 @@ interface Arguments {
dialTimeout: number;
maxRelayConnections: number;
peerIdFile: string;
enableDebugInfo: boolean;
}
export class PeerCmd {
@ -40,7 +41,8 @@ export class PeerCmd {
const peerNodeInit: PeerInitConfig = {
maxConnections: argv.maxConnections,
dialTimeout: argv.dialTimeout,
maxRelayConnections: argv.maxRelayConnections
maxRelayConnections: argv.maxRelayConnections,
enableDebugInfo: argv.enableDebugInfo
};
await peer.init(peerNodeInit, peerIdObj);
@ -87,6 +89,11 @@ function _getArgv (): any {
type: 'string',
alias: 'f',
describe: 'Peer id file path (json)'
},
enableDebugInfo: {
type: 'boolean',
describe: 'Whether to participate in exchanging debug info over floodsub',
default: false
}
}).argv;
}

View File

@ -177,7 +177,8 @@ export class ServerCmd {
pingInterval: relayConfig.pingInterval ?? DEFAULT_PING_INTERVAL,
redialInterval: relayConfig.redialInterval ?? RELAY_REDIAL_INTERVAL,
maxDialRetry: relayConfig.maxDialRetry ?? RELAY_DEFAULT_MAX_DIAL_RETRY,
peerIdObj
peerIdObj,
enableDebugInfo: relayConfig.enableDebugInfo
};
await createRelayNode(relayNodeInit);
}
@ -200,7 +201,8 @@ export class ServerCmd {
maxRelayConnections: peerConfig.maxRelayConnections,
relayRedialInterval: peerConfig.relayRedialInterval,
maxConnections: peerConfig.maxConnections,
dialTimeout: peerConfig.dialTimeout
dialTimeout: peerConfig.dialTimeout,
enableDebugInfo: peerConfig.enableDebugInfo
};
await peer.init(peerNodeInit, peerIdObj);

View File

@ -24,6 +24,7 @@ interface Arguments {
pingInterval: number;
redialInterval: number;
maxDialRetry: number;
enableDebugInfo?: boolean;
}
async function main (): Promise<void> {
@ -63,7 +64,8 @@ async function main (): Promise<void> {
dialTimeout: argv.dialTimeout,
pingInterval: argv.pingInterval,
redialInterval: argv.redialInterval,
maxDialRetry: argv.maxDialRetry
maxDialRetry: argv.maxDialRetry,
enableDebugInfo: argv.enableDebugInfo
};
await createRelayNode(relayNodeInit);
}
@ -118,6 +120,10 @@ function _getArgv (): Arguments {
type: 'number',
describe: 'Maximum number of dial retries to be attempted to a relay peer',
default: RELAY_DEFAULT_MAX_DIAL_RETRY
},
enableDebugInfo: {
type: 'boolean',
describe: "Whether to broadcast node's info over floodsub on request"
}
// https://github.com/yargs/yargs/blob/main/docs/typescript.md?plain=1#L83
}).parseSync();

View File

@ -2,6 +2,10 @@
// Copyright 2023 Vulcanize, Inc.
//
export const P2P_CIRCUIT_ID = 'p2p-circuit';
export const CHAT_PROTOCOL = '/chat/1.0.0';
export const DEBUG_INFO_TOPIC = 'debug-info';
// How often a peer should broadcast it's peer data over pubsub discovery topic
// (interval at which other peers get corresponding discovery event)
export const PUBSUB_DISCOVERY_INTERVAL = 10000; // 10 seconds

View File

@ -13,3 +13,4 @@ export {
DEFAULT_PING_INTERVAL,
DIAL_TIMEOUT
} from './constants.js';
export { DebugMsg } from './types/debug-info.js';

View File

@ -38,15 +38,17 @@ import {
RELAY_TAG,
RELAY_REDIAL_INTERVAL,
DEFAULT_MAX_RELAY_CONNECTIONS,
DEFAULT_PING_TIMEOUT
DEFAULT_PING_TIMEOUT,
P2P_CIRCUIT_ID,
CHAT_PROTOCOL,
DEBUG_INFO_TOPIC
} from './constants.js';
import { PeerHearbeatChecker } from './peer-heartbeat-checker.js';
import { dialWithRetry } from './utils/index.js';
const P2P_CIRCUIT_ID = 'p2p-circuit';
export const CHAT_PROTOCOL = '/chat/1.0.0';
import { debugInfoRequestHandler, dialWithRetry, getConnectionsInfo, getSelfInfo } from './utils/index.js';
import { ConnectionType, DebugPeerInfo, DebugRequest, PeerConnectionInfo, PeerSelfInfo } from './types/debug-info.js';
const ERR_PEER_ALREADY_TAGGED = 'Peer already tagged';
const ERR_DEBUG_INFO_NOT_ENABLED = 'Debug info not enabled';
export interface PeerIdObj {
id: string;
@ -62,6 +64,7 @@ export interface PeerInitConfig {
maxConnections?: number;
minConnections?: number;
dialTimeout?: number;
enableDebugInfo?: boolean;
}
export class Peer {
@ -75,6 +78,8 @@ export class Peer {
_relayRedialInterval?: number
_maxRelayConnections?: number
_debugInfoEnabled?: boolean
_peerStreamMap: Map<string, Pushable<any>> = new Map()
_messageHandlers: Array<(peerId: PeerId, message: any) => void> = []
_topicHandlers: Map<string, Array<(peerId: PeerId, data: any) => void>> = new Map()
@ -116,6 +121,7 @@ _peerStreamMap: Map<string, Pushable<any>> = new Map()
async init (initOptions: PeerInitConfig, peerIdObj?: PeerIdObj): Promise<void> {
this._relayRedialInterval = initOptions.relayRedialInterval;
this._maxRelayConnections = initOptions.maxRelayConnections;
this._debugInfoEnabled = initOptions.enableDebugInfo;
const pingTimeout = initOptions.pingTimeout ?? DEFAULT_PING_TIMEOUT;
try {
@ -227,6 +233,11 @@ _peerStreamMap: Map<string, Pushable<any>> = new Map()
this._node.pubsub.addEventListener('message', (evt) => {
this._handlePubSubMessage(evt.detail);
});
if (this._debugInfoEnabled) {
console.log('Debug info enabled');
this._registerDebugInfoRequestHandler();
}
}
async close (): Promise<void> {
@ -247,6 +258,53 @@ _peerStreamMap: Map<string, Pushable<any>> = new Map()
await Promise.all(hangUpPromises);
}
async getInfo (): Promise<DebugPeerInfo> {
assert(this.node);
assert(this.peerId);
const selfInfo: PeerSelfInfo = this.getPeerSelfInfo();
const connInfo: PeerConnectionInfo[] = this.getPeerConnectionsInfo();
const metrics = await this.metrics.getMetricsAsMap();
return {
selfInfo,
connInfo,
metrics
};
}
getPeerSelfInfo (): PeerSelfInfo {
assert(this._node);
const selfInfo = getSelfInfo(this._node);
return {
...selfInfo,
primaryRelayMultiaddr: this.relayNodeMultiaddr.toString(),
primaryRelayPeerId: this.relayNodeMultiaddr.getPeerId()
};
}
getPeerConnectionsInfo (): PeerConnectionInfo[] {
assert(this._node);
assert(this._peerHeartbeatChecker);
const connectionsInfo = getConnectionsInfo(this._node, this._peerHeartbeatChecker);
return connectionsInfo.map(connectionInfo => {
const peerConnectionInfo: PeerConnectionInfo = {
...connectionInfo,
isPeerRelay: this.isRelayPeerMultiaddr(connectionInfo.multiaddr),
isPeerRelayPrimary: this.isPrimaryRelay(connectionInfo.multiaddr)
};
if (peerConnectionInfo.type === ConnectionType.Relayed) {
peerConnectionInfo.hopRelayPeerId = multiaddr(peerConnectionInfo.multiaddr).decapsulate('p2p-circuit/p2p').getPeerId();
}
return peerConnectionInfo;
});
}
broadcastMessage (message: any): void {
for (const [, stream] of this._peerStreamMap) {
stream.push(message);
@ -258,6 +316,17 @@ _peerStreamMap: Map<string, Pushable<any>> = new Map()
await this._node.pubsub.publish(topic, uint8ArrayFromString(JSON.stringify(msg)));
}
async requestPeerInfo (): Promise<void> {
assert(this._node);
if (!this._debugInfoEnabled) {
throw new Error(ERR_DEBUG_INFO_NOT_ENABLED);
}
const request: DebugRequest = { type: 'Request' };
await this.floodMessage(DEBUG_INFO_TOPIC, request);
}
subscribeMessage (handler: (peerId: PeerId, message: any) => void) : () => void {
this._messageHandlers.push(handler);
@ -300,6 +369,14 @@ _peerStreamMap: Map<string, Pushable<any>> = new Map()
return unsubscribe;
}
subscribeDebugInfo (handler: (peerId: PeerId, data: any) => void): () => void {
if (!this._debugInfoEnabled) {
throw new Error(ERR_DEBUG_INFO_NOT_ENABLED);
}
return this.subscribeTopic(DEBUG_INFO_TOPIC, handler);
}
isRelayPeerMultiaddr (multiaddrString: string): boolean {
// Multiaddr not having p2p-circuit id or webrtc-star id is of a relay node
return !(multiaddrString.includes(P2P_CIRCUIT_ID) || multiaddrString.includes(P2P_WEBRTC_STAR_ID));
@ -377,24 +454,26 @@ _peerStreamMap: Map<string, Pushable<any>> = new Map()
_handleDiscovery (peer: PeerInfo, maxRelayConnections = DEFAULT_MAX_RELAY_CONNECTIONS): void {
// Check connected peers as they are discovered repeatedly.
if (!this._node?.getPeers().some(remotePeerId => remotePeerId.toString() === peer.id.toString())) {
let isRelayPeer = false;
for (const multiaddr of peer.multiaddrs) {
if (this.isRelayPeerMultiaddr(multiaddr.toString())) {
isRelayPeer = true;
break;
}
}
// Check relay connections limit if it's a relay peer
if (isRelayPeer && this._numRelayConnections >= maxRelayConnections) {
// console.log(`Ignoring discovered relay node ${peer.id.toString()} as max relay connections limit reached`);
return;
}
console.log(`Discovered peer ${peer.id.toString()} with multiaddrs`, peer.multiaddrs.map(addr => addr.toString()));
this._connectPeer(peer);
if (this._node?.getPeers().some(remotePeerId => remotePeerId.toString() === peer.id.toString())) {
return;
}
let isRelayPeer = false;
for (const multiaddr of peer.multiaddrs) {
if (this.isRelayPeerMultiaddr(multiaddr.toString())) {
isRelayPeer = true;
break;
}
}
// Check relay connections limit if it's a relay peer
if (isRelayPeer && this._numRelayConnections >= maxRelayConnections) {
// console.log(`Ignoring discovered relay node ${peer.id.toString()} as max relay connections limit reached`);
return;
}
console.log(`Discovered peer ${peer.id.toString()} with multiaddrs`, peer.multiaddrs.map(addr => addr.toString()));
this._connectPeer(peer);
}
async _handleConnect (connection: Connection, maxRelayConnections = DEFAULT_MAX_RELAY_CONNECTIONS): Promise<void> {
@ -569,6 +648,19 @@ _peerStreamMap: Map<string, Pushable<any>> = new Map()
handler(msg.from, dataObj);
});
}
_registerDebugInfoRequestHandler (): void {
this.subscribeTopic(DEBUG_INFO_TOPIC, async (peerId: PeerId, msg: any): Promise<void> => {
assert(this._node);
await debugInfoRequestHandler({
node: this._node,
getPeerInfo: this.getInfo.bind(this),
peerId,
msg
});
});
}
}
export async function createPeerId (): Promise<PeerIdObj> {

View File

@ -5,16 +5,20 @@
import { Libp2p, createLibp2p } from '@cerc-io/libp2p';
import wrtc from 'wrtc';
import debug from 'debug';
import assert from 'assert';
import { toString as uint8ArrayToString } from 'uint8arrays/to-string';
import { noise } from '@chainsafe/libp2p-noise';
import { mplex } from '@libp2p/mplex';
import { WebRTCDirectNodeType, webRTCDirect } from '@cerc-io/webrtc-direct';
import { floodsub } from '@libp2p/floodsub';
import { pubsubPeerDiscovery } from '@libp2p/pubsub-peer-discovery';
import type { Message } from '@libp2p/interface-pubsub';
import type { Connection } from '@libp2p/interface-connection';
import { multiaddr } from '@multiformats/multiaddr';
import type { PeerId } from '@libp2p/interface-peer-id';
import { createFromJSON } from '@libp2p/peer-id-factory';
import { PrometheusMetrics } from '@cerc-io/prometheus-metrics';
import {
HOP_TIMEOUT,
@ -22,11 +26,13 @@ import {
PUBSUB_DISCOVERY_INTERVAL,
PUBSUB_SIGNATURE_POLICY,
WEBRTC_PORT_RANGE,
MAX_CONCURRENT_DIALS_PER_PEER
MAX_CONCURRENT_DIALS_PER_PEER,
DEBUG_INFO_TOPIC
} from './constants.js';
import { PeerHearbeatChecker } from './peer-heartbeat-checker.js';
import { dialWithRetry } from './utils/index.js';
import { debugInfoRequestHandler, dialWithRetry, getConnectionsInfo, getSelfInfo } from './utils/index.js';
import { PeerIdObj } from './peer.js';
import { SelfInfo, ConnectionInfo } from './types/debug-info.js';
const log = debug('laconic:relay');
@ -41,6 +47,7 @@ export interface RelayNodeInitConfig {
pingTimeout?: number;
redialInterval: number;
maxDialRetry: number;
enableDebugInfo?: boolean;
}
export async function createRelayNode (init: RelayNodeInitConfig): Promise<Libp2p> {
@ -58,6 +65,8 @@ export async function createRelayNode (init: RelayNodeInitConfig): Promise<Libp2
const pingTimeout = init.pingTimeout ?? DEFAULT_PING_TIMEOUT;
const metrics = new PrometheusMetrics();
const node = await createLibp2p({
peerId,
addresses: {
@ -98,7 +107,8 @@ export async function createRelayNode (init: RelayNodeInitConfig): Promise<Libp2
},
ping: {
timeout: pingTimeout
}
},
metrics: () => metrics
});
const peerHeartbeatChecker = new PeerHearbeatChecker(
@ -159,6 +169,11 @@ export async function createRelayNode (init: RelayNodeInitConfig): Promise<Libp2
await _dialRelayPeers(node, init.relayPeers, init.maxDialRetry, init.redialInterval);
}
if (init.enableDebugInfo) {
log('Debug info enabled');
await _subscribeToDebugTopic(node, peerHeartbeatChecker, metrics);
}
return node;
}
@ -182,3 +197,38 @@ async function _handleDeadConnections (node: Libp2p, remotePeerId: PeerId): Prom
await node.hangUp(remotePeerId);
log('Closed');
}
async function _subscribeToDebugTopic (node: Libp2p, peerHeartbeatChecker: PeerHearbeatChecker, metrics: PrometheusMetrics): Promise<void> {
node.pubsub.subscribe(DEBUG_INFO_TOPIC);
// Listen for pubsub messages
node.pubsub.addEventListener('message', async (evt) => {
const msg: Message = evt.detail;
// Messages should be signed since globalSignaturePolicy is set to 'StrictSign'
assert(msg.type === 'signed');
if (msg.topic === DEBUG_INFO_TOPIC) {
const dataObj = JSON.parse(uint8ArrayToString(msg.data));
await debugInfoRequestHandler({
node,
getPeerInfo: async () => _getRelayPeerInfo(node, peerHeartbeatChecker, metrics),
peerId: msg.from,
msg: dataObj
});
}
});
}
async function _getRelayPeerInfo (node: Libp2p, peerHeartbeatChecker: PeerHearbeatChecker, metrics: PrometheusMetrics): Promise<any> {
const selfInfo: SelfInfo = getSelfInfo(node);
const connInfo: ConnectionInfo[] = getConnectionsInfo(node, peerHeartbeatChecker);
const metricsMap = await metrics.getMetricsAsMap();
return {
selfInfo,
connInfo,
metrics: metricsMap
};
}

View File

@ -0,0 +1,61 @@
//
// Copyright 2023 Vulcanize, Inc.
//
import type { Direction } from '@libp2p/interface-connection';
import { RegistryMetricData } from '@cerc-io/prometheus-metrics';
export interface SelfInfo {
peerId: string;
multiaddrs: string[];
}
export interface PeerSelfInfo extends SelfInfo {
primaryRelayMultiaddr: string;
primaryRelayPeerId: string | null;
}
export enum ConnectionType {
Relayed = 'relayed',
Direct = 'direct'
}
export interface ConnectionInfo {
id: string;
peerId: string;
multiaddr: string;
direction: Direction;
status: string;
latency: number[];
type: ConnectionType;
}
export interface PeerConnectionInfo extends ConnectionInfo {
isPeerRelay: boolean;
isPeerRelayPrimary: boolean;
hopRelayPeerId?: string | null;
}
export interface DebugPeerInfo {
selfInfo: PeerSelfInfo;
connInfo: PeerConnectionInfo[];
metrics: Map<string, RegistryMetricData<any>>;
}
export interface DebugRelayInfo {
selfInfo: SelfInfo;
connInfo: ConnectionInfo[];
metrics: Map<string, RegistryMetricData<any>>;
}
export interface DebugRequest {
type: 'Request'
}
export interface DebugResponse {
type: 'Response',
dst: string,
peerInfo: DebugPeerInfo | DebugRelayInfo
}
export type DebugMsg = DebugRequest | DebugResponse

View File

@ -2,9 +2,19 @@
// Copyright 2023 Vulcanize, Inc.
//
import { uniqueNamesGenerator, adjectives, colors, names } from 'unique-names-generator';
import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string';
import debug from 'debug';
import { Libp2p } from '@cerc-io/libp2p';
import { Multiaddr } from '@multiformats/multiaddr';
import { uniqueNamesGenerator, adjectives, colors, names } from 'unique-names-generator';
import type { PeerId } from '@libp2p/interface-peer-id';
import { ConnectionInfo, ConnectionType, DebugMsg, DebugPeerInfo, DebugResponse, SelfInfo } from '../types/debug-info.js';
import { DEBUG_INFO_TOPIC } from '../constants.js';
import { PeerHearbeatChecker } from '../peer-heartbeat-checker.js';
const log = debug('laconic:utils');
interface DialWithRetryOptions {
redialInterval: number
@ -63,3 +73,64 @@ export const getPseudonymForPeerId = (peerId: string): string => {
separator: '-'
});
};
/**
* Handler for pubsub debug info request
* @param peerId
* @param msg
*/
export const debugInfoRequestHandler = async (
params: {
node: Libp2p,
getPeerInfo: () => Promise<DebugPeerInfo>
peerId: PeerId,
msg: any,
}): Promise<void> => {
const { node, peerId, msg, getPeerInfo } = params;
const debugMsg = msg as DebugMsg;
const msgType = debugMsg.type;
if (msgType === 'Request') {
log('got a debug info request from', peerId.toString());
const peerInfo: DebugPeerInfo = await getPeerInfo();
const response: DebugResponse = {
type: 'Response',
dst: peerId.toString(),
peerInfo
};
await node.pubsub.publish(DEBUG_INFO_TOPIC, uint8ArrayFromString(JSON.stringify(response)));
}
};
/**
* Method to get self node info
* @param node
* @returns
*/
export const getSelfInfo = (node: Libp2p): SelfInfo => {
return {
peerId: node.peerId.toString(),
multiaddrs: node.getMultiaddrs().map(multiaddr => multiaddr.toString())
};
};
/**
* Method to get connections info
* @param node
* @param peerHeartbeatChecker
* @returns
*/
export const getConnectionsInfo = (node: Libp2p, peerHeartbeatChecker: PeerHearbeatChecker): ConnectionInfo[] => {
return node.getConnections().map(connection => {
return {
id: connection.id,
peerId: connection.remotePeer.toString(),
multiaddr: connection.remoteAddr.toString(),
direction: connection.stat.direction,
status: connection.stat.status,
type: connection.remoteAddr.toString().includes('p2p-circuit/p2p') ? ConnectionType.Relayed : ConnectionType.Direct,
latency: peerHeartbeatChecker.getLatencyData(connection.remotePeer)
};
});
};

View File

@ -64,6 +64,9 @@ export interface RelayConfig {
// Max number of dial retries to be attempted to a relay peer
maxDialRetry?: number;
// Broadcast node's info over floodsub on requests
enableDebugInfo?: boolean;
}
// Peer config
@ -94,6 +97,9 @@ export interface PeerConfig {
// Peer id file path (json)
peerIdFile?: string;
// Participate in exchange of debug info over floodsub
enableDebugInfo?: boolean;
}
// P2P config