//
// Copyright 2022 Vulcanize, Inc.
//

import { createLibp2p, Libp2p } from 'libp2p';
// For nodejs.
import wrtc from 'wrtc';
import assert from 'assert';
import { pipe } from 'it-pipe';
import * as lp from 'it-length-prefixed';
import map from 'it-map';
import { pushable, Pushable } from 'it-pushable';
import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string';
import { toString as uint8ArrayToString } from 'uint8arrays/to-string';

import { webRTCStar, WebRTCStarTuple } from '@libp2p/webrtc-star';
import { noise } from '@chainsafe/libp2p-noise';
import { mplex } from '@libp2p/mplex';
import type { Stream as P2PStream, Connection } from '@libp2p/interface-connection';
import type { PeerInfo } from '@libp2p/interface-peer-info';
import { PeerId } from '@libp2p/interface-peer-id';
import { multiaddr, Multiaddr } from '@multiformats/multiaddr';
import { bootstrap } from '@libp2p/bootstrap';

/** Protocol identifier registered with libp2p for the message streams. */
export const PROTOCOL = '/chat/1.0.0';

/** Signalling server multiaddr used by `Peer.init` when none is supplied. */
export const DEFAULT_SIGNAL_SERVER_URL = '/ip4/127.0.0.1/tcp/13579/wss/p2p-webrtc-star';

/**
 * Wraps a libp2p node that uses the WebRTC-star transport and exchanges
 * length-prefixed string messages with other peers over `PROTOCOL`.
 *
 * Typical usage: construct, `await init()`, register handlers with
 * `subscribeMessage`, send with `broadcastMessage`, and finally `await close()`.
 */
export class Peer {
  _node?: Libp2p;
  _wrtcStar: WebRTCStarTuple;
  _relayNodeMultiaddr?: Multiaddr;

  /** Peers for which a `peer:connect` event has been seen and no `peer:disconnect` yet. */
  _remotePeerIds: PeerId[] = [];

  /** Outgoing message queue per remote peer, keyed by the peer id string. */
  _peerStreamMap: Map<string, Pushable<string>> = new Map();

  _messageHandlers: Array<(peerId: PeerId, message: string) => void> = [];

  // The event listeners are kept as fields so that close() can pass the very same
  // function objects to removeEventListener; without a listener argument that call
  // removes nothing.
  _onPeerDiscovery = (evt: { detail: PeerInfo }): void => {
    console.log('event peer:discovery', evt);
    this._handleDiscovery(evt.detail);
  };

  _onPeerConnect = (evt: { detail: Connection }): void => {
    console.log('event peer:connect', evt);
    this._handleConnect(evt.detail);
  };

  _onPeerDisconnect = (evt: { detail: Connection }): void => {
    console.log('event peer:disconnect', evt);
    this._handleDisconnect(evt.detail);
  };

  /**
   * @param nodejs - When truthy, the `wrtc` package is passed to the WebRTC-star
   * transport; otherwise the transport is created with its defaults.
   */
  constructor (nodejs?: boolean) {
    // Instantiation in nodejs.
    this._wrtcStar = nodejs ? webRTCStar({ wrtc }) : webRTCStar();
  }

  /** Peer id of the underlying libp2p node, or `undefined` before `init` has created it. */
  get peerId (): PeerId | undefined {
    return this._node?.peerId;
  }

  /**
   * Creates the libp2p node and wires up discovery, connection and protocol handlers.
   *
   * @param signalServerURL - Multiaddr of the WebRTC-star signalling server to listen on.
   * @param relayNodeURL - Optional multiaddr of a relay node; when given it is added
   * as a bootstrap peer in addition to WebRTC-star discovery.
   */
  async init (signalServerURL = DEFAULT_SIGNAL_SERVER_URL, relayNodeURL?: string): Promise<void> {
    // NOTE(review): typed as `any` in the original; presumably the discovery factory
    // types of the two packages do not unify - confirm before tightening.
    // eslint-disable-next-line @typescript-eslint/no-explicit-any
    let peerDiscovery: any;

    if (relayNodeURL) {
      console.log('bootstrapping relay node');
      this._relayNodeMultiaddr = multiaddr(relayNodeURL);

      peerDiscovery = [
        bootstrap({
          list: [this._relayNodeMultiaddr.toString()]
        }),
        this._wrtcStar.discovery
      ];
    } else {
      peerDiscovery = [this._wrtcStar.discovery];
    }

    this._node = await createLibp2p({
      addresses: {
        // Add the signaling server address, along with our PeerId to our multiaddrs list
        // libp2p will automatically attempt to dial to the signaling server so that it can
        // receive inbound connections from other peers
        listen: [
          // Public signal servers
          // '/dns4/wrtc-star1.par.dwebops.pub/tcp/443/wss/p2p-webrtc-star',
          // '/dns4/wrtc-star2.sjc.dwebops.pub/tcp/443/wss/p2p-webrtc-star'
          signalServerURL
        ]
      },
      transports: [
        this._wrtcStar.transport
      ],
      connectionEncryption: [noise()],
      streamMuxers: [mplex()],
      peerDiscovery,
      relay: {
        enabled: true,
        autoRelay: {
          enabled: true,
          maxListeners: 2
        }
      }
    });

    console.log('libp2p node created', this._node);

    // Wait for connection and relay to be bound
    this._node.peerStore.addEventListener('change:multiaddrs', (evt) => {
      assert(this._node);
      const { peerId, multiaddrs } = evt.detail;

      // Log updated self multiaddrs
      if (peerId.equals(this._node.peerId)) {
        console.log('Updated self multiaddrs', this._node.getMultiaddrs().map(addr => addr.toString()));
      } else {
        console.log('Updated other node\'s multiaddrs', multiaddrs.map((addr: Multiaddr) => addr.toString()));
      }
    });

    // Listen for peers discovery
    this._node.addEventListener('peer:discovery', this._onPeerDiscovery);

    // Listen for peers connection
    this._node.connectionManager.addEventListener('peer:connect', this._onPeerConnect);

    // Listen for peers disconnecting
    this._node.connectionManager.addEventListener('peer:disconnect', this._onPeerDisconnect);

    // Handle messages for the protocol
    await this._node.handle(PROTOCOL, async ({ stream, connection }) => {
      this._handleStream(connection.remotePeer, stream);
    });
  }

  /**
   * Detaches the discovery/connect/disconnect listeners, unregisters the protocol
   * handler, ends all outgoing message queues and hangs up on every tracked peer.
   *
   * Throws (via `assert`) when called before `init` has created the node.
   */
  async close (): Promise<void> {
    assert(this._node);
    const node = this._node;

    node.removeEventListener('peer:discovery', this._onPeerDiscovery);
    node.connectionManager.removeEventListener('peer:connect', this._onPeerConnect);
    node.connectionManager.removeEventListener('peer:disconnect', this._onPeerDisconnect);

    await node.unhandle(PROTOCOL);

    // The disconnect listener is already detached, so the bookkeeping that
    // _handleDisconnect would normally do has to happen here.
    const peersToHangUp = this._remotePeerIds;
    this._remotePeerIds = [];

    for (const messageStream of this._peerStreamMap.values()) {
      messageStream.end();
    }
    this._peerStreamMap.clear();

    await Promise.all(peersToHangUp.map(async peerId => node.hangUp(peerId)));
  }

  /** Queues `message` for every peer that currently has an outgoing message stream. */
  broadcastMessage (message: string): void {
    for (const [, stream] of this._peerStreamMap) {
      stream.push(message);
    }
  }

  /**
   * Registers `handler` to be called for every message received from any peer.
   *
   * @returns A function that removes the handler again.
   */
  subscribeMessage (handler: (peerId: PeerId, message: string) => void) : () => void {
    this._messageHandlers.push(handler);

    const unsubscribe = () => {
      this._messageHandlers = this._messageHandlers
        .filter(registeredHandler => registeredHandler !== handler);
    };

    return unsubscribe;
  }

  /** Dials a discovered peer unless it is already tracked as connected. */
  _handleDiscovery (peer: PeerInfo): void {
    console.log('Discovered peer multiaddrs', peer.multiaddrs.map(addr => addr.toString()));

    // Check connected peers as they are discovered repeatedly.
    const discoveredId = peer.id.toString();
    const alreadyConnected = this._remotePeerIds.some(remotePeerId => remotePeerId.toString() === discoveredId);

    if (!alreadyConnected) {
      // This runs from an event listener, so a rejected dial must be handled here
      // or it surfaces as an unhandled promise rejection.
      this._connectPeer(peer).catch((err: unknown) => {
        console.log(`Could not connect to ${discoveredId}`, err);
      });
    }
  }

  /** Records the remote peer of a new connection (once per peer id). */
  _handleConnect (connection: Connection): void {
    const remotePeerId = connection.remotePeer;
    const remoteId = remotePeerId.toString();

    // Avoid duplicate entries if more than one connect event arrives for a peer.
    if (!this._remotePeerIds.some(knownPeerId => knownPeerId.toString() === remoteId)) {
      this._remotePeerIds.push(remotePeerId);
    }

    // Log connected peer
    console.log('Connected to %s', remoteId);
  }

  /** Forgets a disconnected peer and releases its outgoing message queue. */
  _handleDisconnect (connection: Connection): void {
    const disconnectedId = connection.remotePeer.toString();

    this._remotePeerIds = this._remotePeerIds.filter(remotePeerId => remotePeerId.toString() !== disconnectedId);

    // Drop the queue so broadcastMessage stops buffering messages for a peer that is gone.
    const messageStream = this._peerStreamMap.get(disconnectedId);
    if (messageStream) {
      messageStream.end();
      this._peerStreamMap.delete(disconnectedId);
    }

    // Log disconnected peer
    console.log(`Disconnected from ${disconnectedId}`);
  }

  /**
   * Dials `peer`. The configured relay node is dialled by its multiaddr without
   * opening a protocol stream; every other peer gets a `PROTOCOL` stream.
   *
   * A failed protocol dial is logged and swallowed; a failed relay dial rejects.
   */
  async _connectPeer (peer: PeerInfo): Promise<void> {
    assert(this._node);
    console.log(`Dialling peer ${peer.id.toString()}`);

    // Dial them when we discover them
    if (this._relayNodeMultiaddr) {
      const relayNodePeerId = this._relayNodeMultiaddr.getPeerId();

      if (relayNodePeerId && relayNodePeerId === peer.id.toString()) {
        await this._node.dial(this._relayNodeMultiaddr);
        return;
      }
    }

    let stream: P2PStream;
    try {
      stream = await this._node.dialProtocol(peer.id, PROTOCOL);
    } catch (err: unknown) {
      console.log(`Could not dial ${peer.id.toString()}`, err);
      return;
    }

    this._handleStream(peer.id, stream);
  }

  /**
   * Attaches an outgoing message queue and an incoming message reader to `stream`
   * and registers the queue for `peerId` (replacing any previously registered one).
   */
  _handleStream (peerId: PeerId, stream: P2PStream): void {
    console.log('Stream after connection', stream);
    const peerIdString = peerId.toString();
    const messageStream = pushable<string>({ objectMode: true });

    // Send message to pipe from stdin
    pipe(
      // Read from stream (the source)
      messageStream,
      // Turn strings into buffers
      (source) => map(source, (string) => uint8ArrayFromString(string)),
      // Encode with length prefix (so receiving side knows how much data is coming)
      lp.encode(),
      // Write to the stream (the sink)
      stream.sink
    )
      .catch((err: unknown) => {
        console.log(`Error writing to stream for ${peerIdString}`, err);
      })
      .finally(() => {
        // Once the outgoing pipe has settled nothing consumes this queue any more;
        // unregister it unless a newer stream for the same peer has replaced it.
        if (this._peerStreamMap.get(peerIdString) === messageStream) {
          this._peerStreamMap.delete(peerIdString);
        }
      });

    // Handle message from stream
    pipe(
      // Read from the stream (the source)
      stream.source,
      // Decode length-prefixed data
      lp.decode(),
      // Turn buffers into strings
      (source) => map(source, (buf) => uint8ArrayToString(buf.subarray())),
      // Sink function
      async (source) => {
        // For each chunk of data
        for await (const msg of source) {
          this._messageHandlers.forEach(messageHandler => messageHandler(peerId, msg.toString()));
        }
      }
    ).catch((err: unknown) => {
      console.log(`Error reading from stream for ${peerIdString}`, err);
    });

    this._peerStreamMap.set(peerIdString, messageStream);
  }
}