From a56ade96fa6ae1cd123315a1724654a40d8efdd6 Mon Sep 17 00:00:00 2001 From: prathamesh0 <42446521+prathamesh0@users.noreply.github.com> Date: Fri, 17 Feb 2023 16:05:20 +0530 Subject: [PATCH] Refactor peer package for usage in watchers (#324) * Refactor protocol change handler * Refactor protocol stream creation * Refactor Peer constructor * Refactor peer CLIs to a separate cli folder * Move peer node setup to a separate file * Add a getter method for primary relay node mutiaddr * Close new relay peer connection if limit already reached --- packages/peer/package.json | 4 +- packages/peer/src/{ => cli}/create-peer.ts | 2 +- packages/peer/src/cli/relay.ts | 103 ++++ packages/peer/src/index.ts | 506 +------------------- packages/peer/src/peer.ts | 527 +++++++++++++++++++++ packages/peer/src/relay.ts | 112 +---- 6 files changed, 655 insertions(+), 599 deletions(-) rename packages/peer/src/{ => cli}/create-peer.ts (96%) create mode 100644 packages/peer/src/cli/relay.ts create mode 100644 packages/peer/src/peer.ts diff --git a/packages/peer/package.json b/packages/peer/package.json index 79051f0a..81e7e0a6 100644 --- a/packages/peer/package.json +++ b/packages/peer/package.json @@ -22,8 +22,8 @@ "build": "tsc", "lint": "eslint .", "dev": "node dist/index.js", - "create-peer": "node dist/create-peer.js", - "relay-node": "node dist/relay.js" + "create-peer": "node dist/cli/create-peer.js", + "relay-node": "node dist/cli/relay.js" }, "dependencies": { "@cerc-io/libp2p": "0.42.2-laconic-0.1.1", diff --git a/packages/peer/src/create-peer.ts b/packages/peer/src/cli/create-peer.ts similarity index 96% rename from packages/peer/src/create-peer.ts rename to packages/peer/src/cli/create-peer.ts index 50d2ec4a..b03d0b02 100644 --- a/packages/peer/src/create-peer.ts +++ b/packages/peer/src/cli/create-peer.ts @@ -7,7 +7,7 @@ import path from 'path'; import { hideBin } from 'yargs/helpers'; import yargs from 'yargs'; -import { createPeerId } from './index.js'; +import { createPeerId } from '../peer.js'; interface Arguments { file: string; diff --git a/packages/peer/src/cli/relay.ts b/packages/peer/src/cli/relay.ts new file mode 100644 index 00000000..78ca0911 --- /dev/null +++ b/packages/peer/src/cli/relay.ts @@ -0,0 +1,103 @@ +import yargs from 'yargs'; +import { hideBin } from 'yargs/helpers'; +import fs from 'fs'; +import path from 'path'; + +import { RelayNodeInit, createRelayNode } from '../relay.js'; +import { PeerIdObj } from '../peer.js'; + +const DEFAULT_HOST = '127.0.0.1'; +const DEFAULT_PORT = 9090; +const DEFAULT_MAX_DIAL_RETRY = 5; + +interface Arguments { + host: string; + port: number; + announce?: string; + peerIdFile?: string; + relayPeers?: string; + maxDialRetry: number; +} + +async function main (): Promise { + const argv: Arguments = _getArgv(); + let peerIdObj: PeerIdObj | undefined; + let relayPeersList: string[] = []; + + if (argv.peerIdFile) { + const peerIdFilePath = path.resolve(argv.peerIdFile); + console.log(`Reading peer id from file ${peerIdFilePath}`); + + const peerIdJson = fs.readFileSync(peerIdFilePath, 'utf-8'); + peerIdObj = JSON.parse(peerIdJson); + } else { + console.log('Creating a new peer id'); + } + + if (argv.relayPeers) { + const relayPeersFilePath = path.resolve(argv.relayPeers); + + if (!fs.existsSync(relayPeersFilePath)) { + console.log(`File at given path ${relayPeersFilePath} not found, exiting`); + process.exit(); + } + + console.log(`Reading relay peer multiaddr(s) from file ${relayPeersFilePath}`); + const relayPeersListObj = fs.readFileSync(relayPeersFilePath, 'utf-8'); + relayPeersList = JSON.parse(relayPeersListObj); + } + + const relayNodeInit: RelayNodeInit = { + host: argv.host, + port: argv.port, + announceDomain: argv.announce, + relayPeers: relayPeersList, + maxDialRetry: argv.maxDialRetry, + peerIdObj + }; + await createRelayNode(relayNodeInit); +} + +function _getArgv (): any { + return yargs(hideBin(process.argv)).parserConfiguration({ + 'parse-numbers': false + }).options({ + host: { + type: 'string', + alias: 'h', + default: DEFAULT_HOST, + describe: 'Host to bind to' + }, + port: { + type: 'number', + alias: 'p', + default: DEFAULT_PORT, + describe: 'Port to start listening on' + }, + announce: { + type: 'string', + alias: 'a', + describe: 'Domain name to be used in the announce address' + }, + peerIdFile: { + type: 'string', + alias: 'f', + describe: 'Relay Peer Id file path (json)' + }, + relayPeers: { + type: 'string', + alias: 'r', + describe: 'Relay peer multiaddr(s) list file path (json)' + }, + maxDialRetry: { + type: 'number', + describe: 'Maximum number of retries for dialling a relay peer', + default: DEFAULT_MAX_DIAL_RETRY + } + // https://github.com/yargs/yargs/blob/main/docs/typescript.md?plain=1#L83 + }).parseSync(); +} + +main().catch(err => { + console.log(err); +}); diff --git a/packages/peer/src/index.ts b/packages/peer/src/index.ts index f94404cd..db018d84 100644 --- a/packages/peer/src/index.ts +++ b/packages/peer/src/index.ts @@ -2,507 +2,5 @@ // Copyright 2022 Vulcanize, Inc. // -import { createLibp2p, Libp2p } from '@cerc-io/libp2p'; -// For nodejs. -import wrtc from 'wrtc'; -import assert from 'assert'; -import { Buffer } from 'buffer'; -import { pipe } from 'it-pipe'; -import * as lp from 'it-length-prefixed'; -import map from 'it-map'; -import { pushable, Pushable } from 'it-pushable'; -import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string'; -import { toString as uint8ArrayToString } from 'uint8arrays/to-string'; - -import { webRTCDirect, WebRTCDirectComponents, P2P_WEBRTC_STAR_ID, WebRTCDirectNodeType } from '@cerc-io/webrtc-direct'; -import { noise } from '@chainsafe/libp2p-noise'; -import { mplex } from '@libp2p/mplex'; -import type { Transport } from '@libp2p/interface-transport'; -import type { Stream as P2PStream, Connection } from '@libp2p/interface-connection'; -import type { PeerInfo } from '@libp2p/interface-peer-info'; -import type { Message } from '@libp2p/interface-pubsub'; -import type { PeerId } from '@libp2p/interface-peer-id'; -import { createFromJSON, createEd25519PeerId } from '@libp2p/peer-id-factory'; -import { multiaddr, Multiaddr } from '@multiformats/multiaddr'; -import { floodsub } from '@libp2p/floodsub'; -import { pubsubPeerDiscovery } from '@libp2p/pubsub-peer-discovery'; - -import { MAX_CONCURRENT_DIALS_PER_PEER, MAX_CONNECTIONS, MIN_CONNECTIONS, PUBSUB_DISCOVERY_INTERVAL, PUBSUB_SIGNATURE_POLICY, RELAY_TAG, RELAY_REDIAL_DELAY, PING_TIMEOUT, DEFAULT_MAX_RELAY_CONNECTIONS } from './constants.js'; -import { PeerHearbeatChecker } from './peer-heartbeat-checker.js'; -import { dialWithRetry } from './utils/index.js'; - -const P2P_CIRCUIT_ID = 'p2p-circuit'; -export const CHAT_PROTOCOL = '/chat/1.0.0'; - -const ERR_PEER_ALREADY_TAGGED = 'Peer already tagged'; - -type PeerIdObj = { - id: string - privKey: string - pubKey: string -}; - -export class Peer { - _node?: Libp2p - _peerHeartbeatChecker?: PeerHearbeatChecker - _wrtcTransport: (components: WebRTCDirectComponents) => Transport - _relayNodeMultiaddr: Multiaddr - _numRelayConnections = 0 - - _peerStreamMap: Map> = new Map() - _messageHandlers: Array<(peerId: PeerId, message: any) => void> = [] - _topicHandlers: Map void>> = new Map() - - constructor (relayNodeURL: string, nodejs?: boolean) { - this._relayNodeMultiaddr = multiaddr(relayNodeURL); - - const relayPeerId = this._relayNodeMultiaddr.getPeerId(); - assert(relayPeerId); - - // Instantiation in nodejs. - if (nodejs) { - this._wrtcTransport = webRTCDirect({ - wrtc, - enableSignalling: true, - nodeType: WebRTCDirectNodeType.Peer, - relayPeerId - }); - } else { - this._wrtcTransport = webRTCDirect({ relayPeerId, enableSignalling: true }); - } - } - - get peerId (): PeerId | undefined { - return this._node?.peerId; - } - - get node (): Libp2p | undefined { - return this._node; - } - - async init ( - peerIdObj?: PeerIdObj, - maxRelayConnections = DEFAULT_MAX_RELAY_CONNECTIONS - ): Promise { - try { - let peerId: PeerId | undefined; - if (peerIdObj) { - peerId = await createFromJSON(peerIdObj); - } - - this._node = await createLibp2p({ - peerId, - addresses: { - // Use existing protocol id in multiaddr to listen through signalling channel to relay node - // Allows direct webrtc connection to a peer if possible (eg. peers on a same network) - listen: [`${this._relayNodeMultiaddr.toString()}/${P2P_WEBRTC_STAR_ID}`] - }, - transports: [this._wrtcTransport], - connectionEncryption: [noise()], - streamMuxers: [mplex()], - pubsub: floodsub({ globalSignaturePolicy: PUBSUB_SIGNATURE_POLICY }), - peerDiscovery: [ - // Use pubsub based discovery; relay server acts as a peer discovery source - pubsubPeerDiscovery({ - interval: PUBSUB_DISCOVERY_INTERVAL - }) - ], - relay: { - enabled: true, - autoRelay: { - enabled: true, - maxListeners: 2 - } - }, - connectionManager: { - maxDialsPerPeer: MAX_CONCURRENT_DIALS_PER_PEER, - autoDial: false, - maxConnections: MAX_CONNECTIONS, - minConnections: MIN_CONNECTIONS, - keepMultipleConnections: true // Set true to get connections with multiple multiaddr - }, - ping: { - timeout: PING_TIMEOUT - } - }); - } catch (err: any) { - console.log('Could not initialize a libp2p node', err); - return; - } - - console.log('libp2p node created', this._node); - this._peerHeartbeatChecker = new PeerHearbeatChecker(this._node); - - // Dial to the HOP enabled relay node - await this._dialRelay(); - - // Listen for change in stored multiaddrs - this._node.peerStore.addEventListener('change:multiaddrs', (evt) => { - assert(this._node); - const { peerId, multiaddrs } = evt.detail; - - // Log updated self multiaddrs - if (peerId.equals(this._node.peerId)) { - console.log('Updated self multiaddrs', this._node.getMultiaddrs().map(addr => addr.toString())); - } else { - console.log('Updated peer node multiaddrs', multiaddrs.map((addr: Multiaddr) => addr.toString())); - } - }); - - // Listen for change in peer protocols - this._node.peerStore.addEventListener('change:protocols', async (evt) => { - assert(this._node); - console.log('event change:protocols', evt); - await this._handleChangeProtocols(evt.detail); - }); - - // Listen for peers discovery - this._node.addEventListener('peer:discovery', (evt) => { - // console.log('event peer:discovery', evt); - this._handleDiscovery(evt.detail, maxRelayConnections); - }); - - // Listen for peers connection - this._node.addEventListener('peer:connect', async (evt) => { - console.log('event peer:connect', evt); - await this._handleConnect(evt.detail); - }); - - // Listen for peers disconnecting - // peer:disconnect event is trigerred when all connections to a peer close - // https://github.com/libp2p/js-libp2p-interfaces/blob/master/packages/interface-libp2p/src/index.ts#L64 - this._node.addEventListener('peer:disconnect', (evt) => { - console.log('event peer:disconnect', evt); - this._handleDisconnect(evt.detail); - }); - - // Handle messages for the protocol - await this._node.handle(CHAT_PROTOCOL, async ({ stream, connection }) => { - this._handleStream(connection.remotePeer, stream); - }); - - // Listen for pubsub messages - this._node.pubsub.addEventListener('message', (evt) => { - this._handleMessage(evt.detail); - }); - } - - async close (): Promise { - assert(this._node); - - this._node.peerStore.removeEventListener('change:multiaddrs'); - this._node.removeEventListener('peer:discovery'); - this._node.removeEventListener('peer:connect'); - this._node.removeEventListener('peer:disconnect'); - this._node.peerStore.removeEventListener('change:multiaddrs'); - this._node.peerStore.removeEventListener('change:protocols'); - this._node.pubsub.removeEventListener('message'); - - await this._node.unhandle(CHAT_PROTOCOL); - const remotePeerIds = this._node.getPeers(); - remotePeerIds.forEach(remotePeerId => this._peerHeartbeatChecker?.stop(remotePeerId)); - const hangUpPromises = remotePeerIds.map(async peerId => this._node?.hangUp(peerId)); - await Promise.all(hangUpPromises); - } - - broadcastMessage (message: any): void { - for (const [, stream] of this._peerStreamMap) { - stream.push(message); - } - } - - async floodMessage (topic: string, msg: any): Promise { - assert(this._node); - await this._node.pubsub.publish(topic, uint8ArrayFromString(JSON.stringify(msg))); - } - - subscribeMessage (handler: (peerId: PeerId, message: any) => void) : () => void { - this._messageHandlers.push(handler); - - const unsubscribe = () => { - this._messageHandlers = this._messageHandlers - .filter(registeredHandler => registeredHandler !== handler); - }; - - return unsubscribe; - } - - subscribeTopic (topic: string, handler: (peerId: PeerId, data: any) => void): () => void { - assert(this._node); - - // Subscribe node to the topic - this._node.pubsub.subscribe(topic); - - // Register provided handler for the topic - if (!this._topicHandlers.has(topic)) { - this._topicHandlers.set(topic, [handler]); - } else { - this._topicHandlers.get(topic)?.push(handler); - } - - // Create a unsubscribe callback - const unsubscribe = () => { - // Remove handler from registered handlers for the topic - const filteredTopicHandlers = this._topicHandlers.get(topic) - ?.filter(registeredHandler => registeredHandler !== handler); - - if (filteredTopicHandlers?.length) { - this._topicHandlers.set(topic, filteredTopicHandlers); - } else { - // Remove topic from map and unsubscribe node from the topic if no handlers left - this._topicHandlers.delete(topic); - this._node?.pubsub.unsubscribe(topic); - } - }; - - return unsubscribe; - } - - async _handleChangeProtocols ({ peerId, protocols }: { peerId: PeerId, protocols: string[] }) { - assert(this._node); - - // Handle protocol and open stream from only one peer - if (this._node.peerId.toString() > peerId.toString()) { - return; - } - - // Return if stream is self peer or chat protocol is not handled by remote peer - if (peerId.equals(this._node.peerId) || !protocols.includes(CHAT_PROTOCOL)) { - return; - } - - const [connection] = this._node.getConnections(peerId); - - // Open stream if connection exists and it doesn't already have a stream with chat protocol - if (connection && !connection.streams.some(stream => stream.stat.protocol === CHAT_PROTOCOL)) { - const stream = await connection.newStream([CHAT_PROTOCOL]); - this._handleStream(peerId, stream); - } - } - - async _dialRelay (): Promise { - assert(this._node); - const relayMultiaddr = this._relayNodeMultiaddr; - console.log('Dialling relay node'); - - const connection = await dialWithRetry( - this._node, - relayMultiaddr, - { - redialDelay: RELAY_REDIAL_DELAY, - maxRetry: Infinity - } - ); - - const relayPeerId = connection.remotePeer; - - // Tag the relay node with a high value to prioritize it's connection - // in connection pruning on crossing peer's maxConnections limit - this._node.peerStore.tagPeer(relayPeerId, RELAY_TAG.tag, { value: RELAY_TAG.value }).catch((err: Error) => { - // TODO: Check if tag already exists - // If awaited on the getTags / tagPeer method, relay node connect event is not triggered - // const peerTags = await this._node.peerStore.getTags(relayPeerId); - - // Ignore the error thrown on retagging a peer on reconnect - if (err.message === ERR_PEER_ALREADY_TAGGED) { - return; - } - - throw err; - }); - } - - _isRelayPeerMultiaddr (multiaddrString: string): boolean { - // Multiaddr not having p2p-circuit id or webrtc-star id is of a relay node - return !(multiaddrString.includes(P2P_CIRCUIT_ID) || multiaddrString.includes(P2P_WEBRTC_STAR_ID)); - } - - _handleDiscovery (peer: PeerInfo, maxRelayConnections: number): void { - // Check connected peers as they are discovered repeatedly. - if (!this._node?.getPeers().some(remotePeerId => remotePeerId.toString() === peer.id.toString())) { - let isRelayPeer = false; - for (const multiaddr of peer.multiaddrs) { - if (this._isRelayPeerMultiaddr(multiaddr.toString())) { - isRelayPeer = true; - break; - } - } - - // Check relay connections limit if it's a relay peer - if (isRelayPeer && this._numRelayConnections >= maxRelayConnections) { - // console.log(`Ignoring discovered relay node ${peer.id.toString()} as max relay connections limit reached`); - return; - } - - console.log(`Discovered peer ${peer.id.toString()} with multiaddrs`, peer.multiaddrs.map(addr => addr.toString())); - this._connectPeer(peer); - } - } - - async _handleConnect (connection: Connection): Promise { - assert(this._node); - const remotePeerId = connection.remotePeer; - const remoteAddrString = connection.remoteAddr.toString(); - - // Log connected peer - console.log(`Connected to ${remotePeerId.toString()} using multiaddr ${remoteAddrString}`); - - if (this._isRelayPeerMultiaddr(remoteAddrString)) { - this._numRelayConnections++; - } - - // Manage connections and stream if peer id is smaller to break symmetry - if (this._node.peerId.toString() < remotePeerId.toString()) { - const remoteConnections = this._node.getConnections(remotePeerId); - - // Keep only one connection with a peer - if (remoteConnections.length > 1) { - // Close new connection if using relayed multiaddr - if (connection.remoteAddr.protoNames().includes(P2P_CIRCUIT_ID)) { - console.log('Closing new connection for already connected peer'); - await connection.close(); - console.log('Closed'); - - return; - } - - console.log('Closing exisiting connections for new webrtc connection'); - // Close existing connections if new connection is not using relayed multiaddr (so it is a webrtc connection) - const closeConnectionPromises = remoteConnections.filter(remoteConnection => remoteConnection.id !== connection.id) - .map(remoteConnection => remoteConnection.close()); - - await Promise.all(closeConnectionPromises); - console.log('Closed'); - } - - try { - // Open stream in new connection for chat protocol - const protocols = await this._node.peerStore.protoBook.get(remotePeerId); - - // Dial if protocol is handled by remote peer - // The chat protocol may not be updated in the list and will be handled later on change:protocols event - if (protocols.includes(CHAT_PROTOCOL)) { - const stream = await connection.newStream([CHAT_PROTOCOL]); - this._handleStream(remotePeerId, stream); - } - } catch (err: any) { - console.log(`Could not create a new protocol stream with ${remotePeerId.toString()}`, err); - } - } - - console.log(`Current number of peers connected: ${this._node.getPeers().length}`); - - // Start heartbeat check for peer - await this._peerHeartbeatChecker?.start( - remotePeerId, - async () => this._handleDeadConnections(remotePeerId) - ); - } - - async _handleDeadConnections (remotePeerId: PeerId) { - // Close existing connections of remote peer - console.log(`Closing connections for ${remotePeerId}`); - await this._node?.hangUp(remotePeerId); - console.log('Closed'); - } - - async _handleDisconnect (connection: Connection): Promise { - assert(this._node); - const disconnectedPeerId = connection.remotePeer; - const remoteAddrString = connection.remoteAddr.toString(); - - // Log disconnected peer - console.log(`Disconnected from ${disconnectedPeerId.toString()} using multiaddr ${remoteAddrString}`); - console.log(`Current number of peers connected: ${this._node?.getPeers().length}`); - - if (this._isRelayPeerMultiaddr(remoteAddrString)) { - this._numRelayConnections--; - } - - // Stop connection check for disconnected peer - this._peerHeartbeatChecker?.stop(disconnectedPeerId); - - if (disconnectedPeerId.toString() === this._relayNodeMultiaddr?.getPeerId()) { - // Reconnect to relay node if disconnected - await this._dialRelay(); - } - } - - async _connectPeer (peer: PeerInfo): Promise { - assert(this._node); - - // Dial them when we discover them - const peerIdString = peer.id.toString(); - - try { - console.log(`Dialling peer ${peerIdString}`); - // When dialling with peer id, all multiaddr(s) (direct/relayed) of the discovered peer are dialled in parallel - await this._node.dial(peer.id); - } catch (err: any) { - console.log(`Could not dial ${peerIdString}`, err); - } - } - - _handleStream (peerId: PeerId, stream: P2PStream): void { - // console.log('Stream after connection', stream); - const messageStream = pushable({ objectMode: true }); - - // Send message to pipe from stdin - pipe( - // Read from stream (the source) - messageStream, - // Turn objects into buffers - (source) => map(source, (value) => { - return uint8ArrayFromString(JSON.stringify(value)); - }), - // Encode with length prefix (so receiving side knows how much data is coming) - lp.encode(), - // Write to the stream (the sink) - stream.sink - ); - - // Handle message from stream - pipe( - // Read from the stream (the source) - stream.source, - // Decode length-prefixed data - lp.decode(), - // Turn buffers into objects - (source) => map(source, (buf) => { - return JSON.parse(uint8ArrayToString(buf.subarray())); - }), - // Sink function - async (source) => { - // For each chunk of data - for await (const msg of source) { - this._messageHandlers.forEach(messageHandler => messageHandler(peerId, msg)); - } - } - ); - - // TODO: Check if stream already exists for peer id - this._peerStreamMap.set(peerId.toString(), messageStream); - } - - _handleMessage (msg: Message): void { - // Messages should be signed since globalSignaturePolicy is set to 'StrictSign' - assert(msg.type === 'signed'); - - // Send msg data to registered topic handlers - this._topicHandlers.get(msg.topic)?.forEach(handler => { - const dataObj = JSON.parse(uint8ArrayToString(msg.data)); - handler(msg.from, dataObj); - }); - } -} - -export async function createPeerId (): Promise { - const peerId = await createEd25519PeerId(); - assert(peerId.privateKey); - - return { - id: peerId.toString(), - privKey: Buffer.from(peerId.privateKey).toString('base64'), - pubKey: Buffer.from(peerId.publicKey).toString('base64') - }; -} +export { Peer, PeerIdObj, createPeerId } from './peer.js'; +export { RelayNodeInit, createRelayNode } from './relay.js'; diff --git a/packages/peer/src/peer.ts b/packages/peer/src/peer.ts new file mode 100644 index 00000000..ae17d279 --- /dev/null +++ b/packages/peer/src/peer.ts @@ -0,0 +1,527 @@ +// +// Copyright 2023 Vulcanize, Inc. +// + +import { createLibp2p, Libp2p } from '@cerc-io/libp2p'; +// For nodejs. +import wrtc from 'wrtc'; +import assert from 'assert'; +import { Buffer } from 'buffer'; +import { pipe } from 'it-pipe'; +import * as lp from 'it-length-prefixed'; +import map from 'it-map'; +import { pushable, Pushable } from 'it-pushable'; +import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string'; +import { toString as uint8ArrayToString } from 'uint8arrays/to-string'; + +import { webRTCDirect, WebRTCDirectComponents, P2P_WEBRTC_STAR_ID, WebRTCDirectNodeType, WebRTCDirectInit } from '@cerc-io/webrtc-direct'; +import { noise } from '@chainsafe/libp2p-noise'; +import { mplex } from '@libp2p/mplex'; +import type { Transport } from '@libp2p/interface-transport'; +import type { Stream as P2PStream, Connection } from '@libp2p/interface-connection'; +import type { PeerInfo } from '@libp2p/interface-peer-info'; +import type { Message } from '@libp2p/interface-pubsub'; +import type { PeerId } from '@libp2p/interface-peer-id'; +import { createFromJSON, createEd25519PeerId } from '@libp2p/peer-id-factory'; +import { multiaddr, Multiaddr } from '@multiformats/multiaddr'; +import { floodsub } from '@libp2p/floodsub'; +import { pubsubPeerDiscovery } from '@libp2p/pubsub-peer-discovery'; + +import { MAX_CONCURRENT_DIALS_PER_PEER, MAX_CONNECTIONS, MIN_CONNECTIONS, PUBSUB_DISCOVERY_INTERVAL, PUBSUB_SIGNATURE_POLICY, RELAY_TAG, RELAY_REDIAL_DELAY, PING_TIMEOUT, DEFAULT_MAX_RELAY_CONNECTIONS } from './constants.js'; +import { PeerHearbeatChecker } from './peer-heartbeat-checker.js'; +import { dialWithRetry } from './utils/index.js'; + +const P2P_CIRCUIT_ID = 'p2p-circuit'; +export const CHAT_PROTOCOL = '/chat/1.0.0'; + +const ERR_PEER_ALREADY_TAGGED = 'Peer already tagged'; + +export type PeerIdObj = { + id: string; + privKey: string; + pubKey: string; +}; + +export class Peer { + _node?: Libp2p + _peerHeartbeatChecker?: PeerHearbeatChecker + _wrtcTransport: (components: WebRTCDirectComponents) => Transport + _relayNodeMultiaddr: Multiaddr + _numRelayConnections = 0 + + _peerStreamMap: Map> = new Map() + _messageHandlers: Array<(peerId: PeerId, message: any) => void> = [] + _topicHandlers: Map void>> = new Map() + + constructor (relayNodeURL: string, nodejs?: boolean) { + this._relayNodeMultiaddr = multiaddr(relayNodeURL); + + const relayPeerId = this._relayNodeMultiaddr.getPeerId(); + assert(relayPeerId); + + const initOptions: WebRTCDirectInit = { + wrtc: nodejs ? wrtc : undefined, // Instantiation in nodejs + enableSignalling: true, + nodeType: WebRTCDirectNodeType.Peer, + relayPeerId + }; + this._wrtcTransport = webRTCDirect(initOptions); + } + + get peerId (): PeerId | undefined { + return this._node?.peerId; + } + + get node (): Libp2p | undefined { + return this._node; + } + + get relayNodeMultiaddr (): Multiaddr { + return this._relayNodeMultiaddr; + } + + async init ( + peerIdObj?: PeerIdObj, + maxRelayConnections = DEFAULT_MAX_RELAY_CONNECTIONS + ): Promise { + try { + let peerId: PeerId | undefined; + if (peerIdObj) { + peerId = await createFromJSON(peerIdObj); + } + + this._node = await createLibp2p({ + peerId, + addresses: { + // Use existing protocol id in multiaddr to listen through signalling channel to relay node + // Allows direct webrtc connection to a peer if possible (eg. peers on a same network) + listen: [`${this._relayNodeMultiaddr.toString()}/${P2P_WEBRTC_STAR_ID}`] + }, + transports: [this._wrtcTransport], + connectionEncryption: [noise()], + streamMuxers: [mplex()], + pubsub: floodsub({ globalSignaturePolicy: PUBSUB_SIGNATURE_POLICY }), + peerDiscovery: [ + // Use pubsub based discovery; relay server acts as a peer discovery source + pubsubPeerDiscovery({ + interval: PUBSUB_DISCOVERY_INTERVAL + }) + ], + relay: { + enabled: true, + autoRelay: { + enabled: true, + maxListeners: 2 + } + }, + connectionManager: { + maxDialsPerPeer: MAX_CONCURRENT_DIALS_PER_PEER, + autoDial: false, + maxConnections: MAX_CONNECTIONS, + minConnections: MIN_CONNECTIONS, + keepMultipleConnections: true // Set true to get connections with multiple multiaddr + }, + ping: { + timeout: PING_TIMEOUT + } + }); + } catch (err: any) { + console.log('Could not initialize a libp2p node', err); + return; + } + + console.log('libp2p node created', this._node); + this._peerHeartbeatChecker = new PeerHearbeatChecker(this._node); + + // Dial to the HOP enabled relay node + await this._dialRelay(); + + // Listen for change in stored multiaddrs + this._node.peerStore.addEventListener('change:multiaddrs', (evt) => { + assert(this._node); + const { peerId, multiaddrs } = evt.detail; + + // Log updated self multiaddrs + if (peerId.equals(this._node.peerId)) { + console.log('Updated self multiaddrs', this._node.getMultiaddrs().map(addr => addr.toString())); + } else { + console.log('Updated peer node multiaddrs', multiaddrs.map((addr: Multiaddr) => addr.toString())); + } + }); + + // Listen for change in peer protocols + this._node.peerStore.addEventListener('change:protocols', async (evt) => { + assert(this._node); + console.log('event change:protocols', evt); + await this._handleChangeProtocols(evt.detail); + }); + + // Listen for peers discovery + this._node.addEventListener('peer:discovery', (evt) => { + // console.log('event peer:discovery', evt); + this._handleDiscovery(evt.detail, maxRelayConnections); + }); + + // Listen for peers connection + this._node.addEventListener('peer:connect', async (evt) => { + console.log('event peer:connect', evt); + await this._handleConnect(evt.detail, maxRelayConnections); + }); + + // Listen for peers disconnecting + // peer:disconnect event is trigerred when all connections to a peer close + // https://github.com/libp2p/js-libp2p-interfaces/blob/master/packages/interface-libp2p/src/index.ts#L64 + this._node.addEventListener('peer:disconnect', (evt) => { + console.log('event peer:disconnect', evt); + this._handleDisconnect(evt.detail); + }); + + // Handle messages for the protocol + await this._node.handle(CHAT_PROTOCOL, async ({ stream, connection }) => { + this._handleStream(connection.remotePeer, stream); + }); + + // Listen for pubsub messages + this._node.pubsub.addEventListener('message', (evt) => { + this._handlePubSubMessage(evt.detail); + }); + } + + async close (): Promise { + assert(this._node); + + this._node.peerStore.removeEventListener('change:multiaddrs'); + this._node.removeEventListener('peer:discovery'); + this._node.removeEventListener('peer:connect'); + this._node.removeEventListener('peer:disconnect'); + this._node.peerStore.removeEventListener('change:multiaddrs'); + this._node.peerStore.removeEventListener('change:protocols'); + this._node.pubsub.removeEventListener('message'); + + await this._node.unhandle(CHAT_PROTOCOL); + const remotePeerIds = this._node.getPeers(); + remotePeerIds.forEach(remotePeerId => this._peerHeartbeatChecker?.stop(remotePeerId)); + const hangUpPromises = remotePeerIds.map(async peerId => this._node?.hangUp(peerId)); + await Promise.all(hangUpPromises); + } + + broadcastMessage (message: any): void { + for (const [, stream] of this._peerStreamMap) { + stream.push(message); + } + } + + async floodMessage (topic: string, msg: any): Promise { + assert(this._node); + await this._node.pubsub.publish(topic, uint8ArrayFromString(JSON.stringify(msg))); + } + + subscribeMessage (handler: (peerId: PeerId, message: any) => void) : () => void { + this._messageHandlers.push(handler); + + const unsubscribe = () => { + this._messageHandlers = this._messageHandlers + .filter(registeredHandler => registeredHandler !== handler); + }; + + return unsubscribe; + } + + subscribeTopic (topic: string, handler: (peerId: PeerId, data: any) => void): () => void { + assert(this._node); + + // Subscribe node to the topic + this._node.pubsub.subscribe(topic); + + // Register provided handler for the topic + if (!this._topicHandlers.has(topic)) { + this._topicHandlers.set(topic, [handler]); + } else { + this._topicHandlers.get(topic)?.push(handler); + } + + // Create a unsubscribe callback + const unsubscribe = () => { + // Remove handler from registered handlers for the topic + const filteredTopicHandlers = this._topicHandlers.get(topic) + ?.filter(registeredHandler => registeredHandler !== handler); + + if (filteredTopicHandlers?.length) { + this._topicHandlers.set(topic, filteredTopicHandlers); + } else { + // Remove topic from map and unsubscribe node from the topic if no handlers left + this._topicHandlers.delete(topic); + this._node?.pubsub.unsubscribe(topic); + } + }; + + return unsubscribe; + } + + async _handleChangeProtocols ({ peerId, protocols }: { peerId: PeerId, protocols: string[] }) { + assert(this._node); + + // Ignore self protocol changes + if (peerId.equals(this._node.peerId)) { + return; + } + + // Ignore if chat protocol is not handled by remote peer + if (!protocols.includes(CHAT_PROTOCOL)) { + return; + } + + // Handle protocol and open stream from only one side + if (this._node.peerId.toString() > peerId.toString()) { + return; + } + + const [connection] = this._node.getConnections(peerId); + + // Open stream if connection exists and it doesn't already have a stream with chat protocol + if (connection && !connection.streams.some(stream => stream.stat.protocol === CHAT_PROTOCOL)) { + await this._createProtocolStream(connection, CHAT_PROTOCOL); + } + } + + async _dialRelay (): Promise { + assert(this._node); + const relayMultiaddr = this._relayNodeMultiaddr; + console.log('Dialling relay node'); + + const connection = await dialWithRetry( + this._node, + relayMultiaddr, + { + redialDelay: RELAY_REDIAL_DELAY, + maxRetry: Infinity + } + ); + + const relayPeerId = connection.remotePeer; + + // Tag the relay node with a high value to prioritize it's connection + // in connection pruning on crossing peer's maxConnections limit + this._node.peerStore.tagPeer(relayPeerId, RELAY_TAG.tag, { value: RELAY_TAG.value }).catch((err: Error) => { + // TODO: Check if tag already exists + // If awaited on the getTags / tagPeer method, relay node connect event is not triggered + // const peerTags = await this._node.peerStore.getTags(relayPeerId); + + // Ignore the error thrown on retagging a peer on reconnect + if (err.message === ERR_PEER_ALREADY_TAGGED) { + return; + } + + throw err; + }); + } + + _isRelayPeerMultiaddr (multiaddrString: string): boolean { + // Multiaddr not having p2p-circuit id or webrtc-star id is of a relay node + return !(multiaddrString.includes(P2P_CIRCUIT_ID) || multiaddrString.includes(P2P_WEBRTC_STAR_ID)); + } + + _handleDiscovery (peer: PeerInfo, maxRelayConnections: number): void { + // Check connected peers as they are discovered repeatedly. + if (!this._node?.getPeers().some(remotePeerId => remotePeerId.toString() === peer.id.toString())) { + let isRelayPeer = false; + for (const multiaddr of peer.multiaddrs) { + if (this._isRelayPeerMultiaddr(multiaddr.toString())) { + isRelayPeer = true; + break; + } + } + + // Check relay connections limit if it's a relay peer + if (isRelayPeer && this._numRelayConnections >= maxRelayConnections) { + // console.log(`Ignoring discovered relay node ${peer.id.toString()} as max relay connections limit reached`); + return; + } + + console.log(`Discovered peer ${peer.id.toString()} with multiaddrs`, peer.multiaddrs.map(addr => addr.toString())); + this._connectPeer(peer); + } + } + + async _handleConnect (connection: Connection, maxRelayConnections: number): Promise { + assert(this._node); + const remotePeerId = connection.remotePeer; + const remotePeerIdString = connection.remotePeer.toString(); + const remoteAddrString = connection.remoteAddr.toString(); + + // Log connected peer + console.log(`Connected to ${remotePeerIdString} using multiaddr ${remoteAddrString}`); + + if (this._isRelayPeerMultiaddr(remoteAddrString)) { + // Check if relay connections limit has already been reached + if (this._numRelayConnections >= maxRelayConnections) { + console.log(`Closing connection to relay ${remotePeerIdString} as max relay connections limit reached`); + await connection.close(); + return; + } + + this._numRelayConnections++; + } + + // Manage connections and streams + // Check if peer id is smaller to break symmetry + if (this._node.peerId.toString() < remotePeerIdString) { + const remoteConnections = this._node.getConnections(remotePeerId); + + // Keep only one connection with a peer + if (remoteConnections.length > 1) { + // Close new connection if using relayed multiaddr + if (connection.remoteAddr.protoNames().includes(P2P_CIRCUIT_ID)) { + console.log('Closing new relayed connection in favor of existing connection'); + await connection.close(); + console.log('Closed'); + + return; + } + + console.log('Closing exisiting connections in favor of new webrtc connection'); + // Close existing connections if new connection is not using relayed multiaddr (so it is a webrtc connection) + const closeConnectionPromises = remoteConnections.filter(remoteConnection => remoteConnection.id !== connection.id) + .map(remoteConnection => remoteConnection.close()); + + await Promise.all(closeConnectionPromises); + console.log('Closed'); + } + + // Open stream in new connection for chat protocol (if handled by remote peer) + const protocols = await this._node.peerStore.protoBook.get(remotePeerId); + + // The chat protocol may not be updated in the list and will be handled later on change:protocols event + if (protocols.includes(CHAT_PROTOCOL)) { + await this._createProtocolStream(connection, CHAT_PROTOCOL); + } + } + + console.log(`Current number of peers connected: ${this._node.getPeers().length}`); + + // Start heartbeat check for peer + await this._peerHeartbeatChecker?.start( + remotePeerId, + async () => this._handleDeadConnections(remotePeerId) + ); + } + + async _createProtocolStream (connection: Connection, protocol: string) { + assert(this._node); + const remotePeerId = connection.remotePeer; + + try { + const stream = await connection.newStream([protocol]); + this._handleStream(remotePeerId, stream); + } catch (err: any) { + console.log(`Could not create a new ${protocol} stream with ${remotePeerId.toString()}`, err); + } + } + + async _handleDeadConnections (remotePeerId: PeerId) { + // Close existing connections of remote peer + console.log(`Closing connections for ${remotePeerId}`); + await this._node?.hangUp(remotePeerId); + console.log('Closed'); + } + + async _handleDisconnect (connection: Connection): Promise { + assert(this._node); + const disconnectedPeerId = connection.remotePeer; + const remoteAddrString = connection.remoteAddr.toString(); + + // Log disconnected peer + console.log(`Disconnected from ${disconnectedPeerId.toString()} using multiaddr ${remoteAddrString}`); + console.log(`Current number of peers connected: ${this._node?.getPeers().length}`); + + if (this._isRelayPeerMultiaddr(remoteAddrString)) { + this._numRelayConnections--; + } + + // Stop connection check for disconnected peer + this._peerHeartbeatChecker?.stop(disconnectedPeerId); + + if (disconnectedPeerId.toString() === this._relayNodeMultiaddr?.getPeerId()) { + // Reconnect to relay node if disconnected + await this._dialRelay(); + } + } + + async _connectPeer (peer: PeerInfo): Promise { + assert(this._node); + + // Dial them when we discover them + const peerIdString = peer.id.toString(); + + try { + console.log(`Dialling peer ${peerIdString}`); + // When dialling with peer id, all multiaddr(s) (direct/relayed) of the discovered peer are dialled in parallel + await this._node.dial(peer.id); + } catch (err: any) { + console.log(`Could not dial ${peerIdString}`, err); + } + } + + _handleStream (peerId: PeerId, stream: P2PStream): void { + // console.log('Stream after connection', stream); + const messageStream = pushable({ objectMode: true }); + + // Send message to pipe from stdin + pipe( + // Read from stream (the source) + messageStream, + // Turn objects into buffers + (source) => map(source, (value) => { + return uint8ArrayFromString(JSON.stringify(value)); + }), + // Encode with length prefix (so receiving side knows how much data is coming) + lp.encode(), + // Write to the stream (the sink) + stream.sink + ); + + // Handle message from stream + pipe( + // Read from the stream (the source) + stream.source, + // Decode length-prefixed data + lp.decode(), + // Turn buffers into objects + (source) => map(source, (buf) => { + return JSON.parse(uint8ArrayToString(buf.subarray())); + }), + // Sink function + async (source) => { + // For each chunk of data + for await (const msg of source) { + this._messageHandlers.forEach(messageHandler => messageHandler(peerId, msg)); + } + } + ); + + // TODO: Check if stream already exists for peer id + this._peerStreamMap.set(peerId.toString(), messageStream); + } + + _handlePubSubMessage (msg: Message): void { + // Messages should be signed since globalSignaturePolicy is set to 'StrictSign' + assert(msg.type === 'signed'); + + // Send msg data to registered topic handlers + this._topicHandlers.get(msg.topic)?.forEach(handler => { + const dataObj = JSON.parse(uint8ArrayToString(msg.data)); + handler(msg.from, dataObj); + }); + } +} + +export async function createPeerId (): Promise { + const peerId = await createEd25519PeerId(); + assert(peerId.privateKey); + + return { + id: peerId.toString(), + privKey: Buffer.from(peerId.privateKey).toString('base64'), + pubKey: Buffer.from(peerId.publicKey).toString('base64') + }; +} diff --git a/packages/peer/src/relay.ts b/packages/peer/src/relay.ts index f86edc57..d355dc34 100644 --- a/packages/peer/src/relay.ts +++ b/packages/peer/src/relay.ts @@ -4,10 +4,6 @@ import { Libp2p, createLibp2p } from '@cerc-io/libp2p'; import wrtc from 'wrtc'; -import { hideBin } from 'yargs/helpers'; -import yargs from 'yargs'; -import fs from 'fs'; -import path from 'path'; import debug from 'debug'; import { noise } from '@chainsafe/libp2p-noise'; @@ -15,64 +11,38 @@ import { mplex } from '@libp2p/mplex'; import { WebRTCDirectNodeType, webRTCDirect } from '@cerc-io/webrtc-direct'; import { floodsub } from '@libp2p/floodsub'; import { pubsubPeerDiscovery } from '@libp2p/pubsub-peer-discovery'; -import { createFromJSON } from '@libp2p/peer-id-factory'; import type { Connection } from '@libp2p/interface-connection'; import { multiaddr } from '@multiformats/multiaddr'; import type { PeerId } from '@libp2p/interface-peer-id'; +import { createFromJSON } from '@libp2p/peer-id-factory'; import { HOP_TIMEOUT, PUBSUB_DISCOVERY_INTERVAL, PUBSUB_SIGNATURE_POLICY, WEBRTC_PORT_RANGE, RELAY_REDIAL_DELAY } from './constants.js'; import { PeerHearbeatChecker } from './peer-heartbeat-checker.js'; import { dialWithRetry } from './utils/index.js'; +import { PeerIdObj } from './peer.js'; const log = debug('laconic:relay'); -const DEFAULT_HOST = '127.0.0.1'; -const DEFAULT_PORT = 9090; -const DEFAULT_MAX_DIAL_RETRY = 5; - -interface Arguments { +export interface RelayNodeInit { host: string; port: number; - announce?: string; - peerIdFile?: string; - relayPeers?: string; + announceDomain?: string; + relayPeers: string[]; maxDialRetry: number; + peerIdObj?: PeerIdObj; } -async function main (): Promise { - const argv: Arguments = _getArgv(); - let peerId: PeerId | undefined; - let relayPeersList: string[] = []; - - if (argv.peerIdFile) { - const peerIdFilePath = path.resolve(argv.peerIdFile); - console.log(`Reading peer id from file ${peerIdFilePath}`); - - const peerIdObj = fs.readFileSync(peerIdFilePath, 'utf-8'); - const peerIdJson = JSON.parse(peerIdObj); - peerId = await createFromJSON(peerIdJson); - } else { - console.log('Creating a new peer id'); - } - - if (argv.relayPeers) { - const relayPeersFilePath = path.resolve(argv.relayPeers); - - if (!fs.existsSync(relayPeersFilePath)) { - console.log(`File at given path ${relayPeersFilePath} not found, exiting`); - process.exit(); - } - - console.log(`Reading relay peer multiaddr(s) from file ${relayPeersFilePath}`); - const relayPeersListObj = fs.readFileSync(relayPeersFilePath, 'utf-8'); - relayPeersList = JSON.parse(relayPeersListObj); - } - - const listenMultiaddrs = [`/ip4/${argv.host}/tcp/${argv.port}/http/p2p-webrtc-direct`]; +export async function createRelayNode (init: RelayNodeInit): Promise { + const listenMultiaddrs = [`/ip4/${init.host}/tcp/${init.port}/http/p2p-webrtc-direct`]; const announceMultiaddrs = []; - if (argv.announce) { - announceMultiaddrs.push(`/dns4/${argv.announce}/tcp/443/https/p2p-webrtc-direct`); + if (init.announceDomain) { + announceMultiaddrs.push(`/dns4/${init.announceDomain}/tcp/443/https/p2p-webrtc-direct`); + } + + let peerId: PeerId | undefined; + if (init.peerIdObj) { + peerId = await createFromJSON(init.peerIdObj); } const node = await createLibp2p({ @@ -148,62 +118,24 @@ async function main (): Promise { peerHeartbeatChecker.stop(connection.remotePeer); // Redial if disconnected peer is in relayPeers list - if (relayPeersList.includes(remoteAddr.toString())) { + if (init.relayPeers.includes(remoteAddr.toString())) { await dialWithRetry( node, remoteAddr, { redialDelay: RELAY_REDIAL_DELAY, - maxRetry: argv.maxDialRetry + maxRetry: init.maxDialRetry } ).catch((error: Error) => console.log(error.message)); } }); - if (relayPeersList.length) { + if (init.relayPeers.length) { console.log('Dialling relay peers'); - await _dialRelayPeers(node, relayPeersList, argv.maxDialRetry); + await _dialRelayPeers(node, init.relayPeers, init.maxDialRetry); } -} -function _getArgv (): Arguments { - return yargs(hideBin(process.argv)).parserConfiguration({ - 'parse-numbers': false - }).options({ - host: { - type: 'string', - alias: 'h', - default: DEFAULT_HOST, - describe: 'Host to bind to' - }, - port: { - type: 'number', - alias: 'p', - default: DEFAULT_PORT, - describe: 'Port to start listening on' - }, - announce: { - type: 'string', - alias: 'a', - describe: 'Domain name to be used in the announce address' - }, - peerIdFile: { - type: 'string', - alias: 'f', - describe: 'Relay Peer Id file path (json)' - }, - relayPeers: { - type: 'string', - alias: 'r', - describe: 'Relay peer multiaddr(s) list file path (json)' - }, - maxDialRetry: { - type: 'number', - describe: 'Maximum number of retries for dialling a relay peer', - default: DEFAULT_MAX_DIAL_RETRY - } - // https://github.com/yargs/yargs/blob/main/docs/typescript.md?plain=1#L83 - }).parseSync(); + return node; } async function _dialRelayPeers (node: Libp2p, relayPeersList: string[], maxDialRetry: number): Promise { @@ -226,7 +158,3 @@ async function _handleDeadConnections (node: Libp2p, remotePeerId: PeerId): Prom await node.hangUp(remotePeerId); log('Closed'); } - -main().catch(err => { - console.log(err); -});