Use webrtc-direct protocol with peer signalling integrated into relay node (#307)

* Use webrtc-direct transport with pubsub based discovery

* Use changes to integrate signalling using relay node

* Add an indicator for peer connection type (direct/relayed)

* Catch errors while creting a libp2p node

* Catch errors while dialling peers

* Catch errors when dialling from relay node

* Pass CLI arg for port to listen on for relay node

* Subscribe chat CLI to pubsub topic

* Update yarn lockfile

* Update webrtc-direct dependency in package json

* Update webrtc-direct version

* Update yarn lockfile
This commit is contained in:
prathamesh0 2023-02-02 17:48:35 +05:30 committed by Ashwin Phatak
parent 453ee6a473
commit a1af962d99
6 changed files with 330 additions and 693 deletions

View File

@ -17,13 +17,6 @@ A basic CLI to pass messages between peers using `stdin`/`stdout`
yarn build
```
* (Optional) Run a local signalling server:
```bash
# In packages/peer
yarn signal-server
```
* (Optional) Create and export a peer id for the relay node:
```bash
@ -37,10 +30,10 @@ A basic CLI to pass messages between peers using `stdin`/`stdout`
```bash
# In packages/peer
yarn relay-node --signal-server [SIGNAL_SERVER_URL] --peer-id-file [PEER_ID_FILE_PATH] --relay-peers [RELAY_PEERS_FILE_PATH]
yarn relay-node --port [LISTEN_PORT] --peer-id-file [PEER_ID_FILE_PATH] --relay-peers [RELAY_PEERS_FILE_PATH]
```
* `signal-server`: multiaddr of a signalling server (default: local signalling server multiaddr)
* `port`: Port to start listening on (default: `9090`)
* `peer-id-file`: file path for peer id to be used (json)
* `relay-peers`: file path for relay peer multiaddr(s) to dial on startup (json)
@ -48,10 +41,9 @@ A basic CLI to pass messages between peers using `stdin`/`stdout`
```bash
# In packages/cli
yarn chat --signal-server [SIGNAL_SERVER_URL] --relay-node [RELAY_NODE_URL]
yarn chat --relay-node <RELAY_NODE_URL>
```
* `signal-server`: multiaddr of a signalling server (default: local signalling server multiaddr)
* `relay-node`: multiaddr of a primary hop enabled relay node
* The process starts reading from `stdin` and outputs messages from others peers to `stdout`.

View File

@ -9,26 +9,27 @@ import yargs from 'yargs';
// @ts-expect-error https://github.com/microsoft/TypeScript/issues/49721#issuecomment-1319854183
import { PeerId } from '@libp2p/interface-peer-id';
const TEST_TOPIC = 'test';
interface Arguments {
signalServer: string;
relayNode: string;
}
async function main (): Promise<void> {
const argv: Arguments = _getArgv();
if (!argv.signalServer) {
console.log('Using the default signalling server URL');
}
// https://adamcoster.com/blog/commonjs-and-esm-importexport-compatibility-examples#importing-esm-into-commonjs-cjs
const { Peer } = await import('@cerc-io/peer');
const peer = new Peer(true);
await peer.init(argv.signalServer, argv.relayNode);
const peer = new Peer(argv.relayNode, true);
await peer.init();
peer.subscribeMessage((peerId: PeerId, message: string) => {
console.log(`> ${peerId.toString()} > ${message}`);
});
peer.subscribeTopic(TEST_TOPIC, (peerId, data) => {
console.log(`> ${peerId.toString()} > ${data}`);
});
console.log(`Peer ID: ${peer.peerId?.toString()}`);
const rl = readline.createInterface({
@ -47,13 +48,10 @@ function _getArgv (): any {
return yargs(hideBin(process.argv)).parserConfiguration({
'parse-numbers': false
}).options({
signalServer: {
type: 'string',
describe: 'Signalling server URL'
},
relayNode: {
type: 'string',
describe: 'Relay node URL'
describe: 'Relay node URL',
demandOption: true
}
}).argv;
}

View File

@ -22,7 +22,6 @@
"build": "tsc",
"lint": "eslint .",
"dev": "node dist/index.js",
"signal-server": "webrtc-star --port=13579 --host=0.0.0.0",
"create-peer": "node dist/create-peer.js",
"relay-node": "node dist/relay.js"
},
@ -33,7 +32,7 @@
"@libp2p/mplex": "^7.1.1",
"@libp2p/peer-id-factory": "^2.0.0",
"@libp2p/pubsub-peer-discovery": "^8.0.0",
"@libp2p/webrtc-star": "^5.0.3",
"@cerc-io/webrtc-direct": "^5.0.0-laconic-0.1.1",
"@multiformats/multiaddr": "^11.1.4",
"debug": "^4.3.1",
"it-length-prefixed": "^8.0.4",
@ -41,12 +40,12 @@
"it-pipe": "^2.0.5",
"it-pushable": "^3.1.2",
"node-pre-gyp": "^0.13.0",
"p-event": "^5.0.1",
"uint8arrays": "^4.0.3",
"wrtc": "^0.4.7",
"yargs": "^17.0.1"
},
"devDependencies": {
"@libp2p/webrtc-star-signalling-server": "^3.0.0",
"@types/node": "16.11.7",
"@typescript-eslint/eslint-plugin": "^5.47.1",
"@typescript-eslint/parser": "^5.47.1",

View File

@ -13,9 +13,10 @@ import { pushable, Pushable } from 'it-pushable';
import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string';
import { toString as uint8ArrayToString } from 'uint8arrays/to-string';
import { webRTCStar, WebRTCStarTuple } from '@libp2p/webrtc-star';
import { webRTCDirect, WebRTCDirectComponents, P2P_WEBRTC_STAR_ID } from '@cerc-io/webrtc-direct';
import { noise } from '@chainsafe/libp2p-noise';
import { mplex } from '@libp2p/mplex';
import type { Transport } from '@libp2p/interface-transport';
import type { Stream as P2PStream, Connection } from '@libp2p/interface-connection';
import type { PeerInfo } from '@libp2p/interface-peer-info';
import type { Message } from '@libp2p/interface-pubsub';
@ -27,26 +28,30 @@ import { pubsubPeerDiscovery } from '@libp2p/pubsub-peer-discovery';
import { MAX_CONCURRENT_DIALS_PER_PEER, MAX_CONNECTIONS, MIN_CONNECTIONS, PUBSUB_DISCOVERY_INTERVAL, PUBSUB_SIGNATURE_POLICY, RELAY_TAG, RELAY_REDIAL_DELAY, CONN_CHECK_INTERVAL, PING_TIMEOUT } from './constants.js';
export const CHAT_PROTOCOL = '/chat/1.0.0';
export const DEFAULT_SIGNAL_SERVER_URL = '/ip4/127.0.0.1/tcp/13579/wss/p2p-webrtc-star';
export const ERR_PROTOCOL_SELECTION = 'protocol selection failed';
export class Peer {
_node?: Libp2p
_wrtcStar: WebRTCStarTuple
_relayNodeMultiaddr?: Multiaddr
_wrtcTransport: (components: WebRTCDirectComponents) => Transport
_relayNodeMultiaddr: Multiaddr
_peerStreamMap: Map<string, Pushable<any>> = new Map()
_messageHandlers: Array<(peerId: PeerId, message: any) => void> = []
_topicHandlers: Map<string, Array<(peerId: PeerId, data: any) => void>> = new Map()
_peerHeartbeatIntervalIdsMap: Map<string, NodeJS.Timer> = new Map();
constructor (nodejs?: boolean) {
constructor (relayNodeURL: string, nodejs?: boolean) {
this._relayNodeMultiaddr = multiaddr(relayNodeURL);
const relayPeerId = this._relayNodeMultiaddr.getPeerId();
assert(relayPeerId);
// Instantiation in nodejs.
if (nodejs) {
this._wrtcStar = webRTCStar({ wrtc });
this._wrtcTransport = webRTCDirect({ wrtc, relayPeerId, enableSignalling: true });
} else {
this._wrtcStar = webRTCStar();
this._wrtcTransport = webRTCDirect({ relayPeerId, enableSignalling: true });
}
}
@ -58,39 +63,24 @@ export class Peer {
return this._node;
}
async init (signalServerURL = DEFAULT_SIGNAL_SERVER_URL, relayNodeURL?: string): Promise<void> {
let peerDiscovery: any;
if (relayNodeURL) {
this._relayNodeMultiaddr = multiaddr(relayNodeURL);
peerDiscovery = [
pubsubPeerDiscovery({
interval: PUBSUB_DISCOVERY_INTERVAL
})
];
} else {
peerDiscovery = [this._wrtcStar.discovery];
}
async init (): Promise<void> {
try {
this._node = await createLibp2p({
addresses: {
// Add the signaling server address, along with our PeerId to our multiaddrs list
// libp2p will automatically attempt to dial to the signaling server so that it can
// receive inbound connections from other peers
listen: [
// Public signal servers
// '/dns4/wrtc-star1.par.dwebops.pub/tcp/443/wss/p2p-webrtc-star',
// '/dns4/wrtc-star2.sjc.dwebops.pub/tcp/443/wss/p2p-webrtc-star'
signalServerURL
]
// Use existing protocol id in multiaddr to listen through signalling channel to relay node
// Allows direct webrtc connection to a peer if possible (eg. peers on a same network)
listen: [`${this._relayNodeMultiaddr.toString()}/${P2P_WEBRTC_STAR_ID}`]
},
transports: [
this._wrtcStar.transport
],
transports: [this._wrtcTransport],
connectionEncryption: [noise()],
streamMuxers: [mplex()],
pubsub: floodsub({ globalSignaturePolicy: PUBSUB_SIGNATURE_POLICY }),
peerDiscovery,
peerDiscovery: [
// Use pubsub based discovery; relay server acts as a peer discovery source
pubsubPeerDiscovery({
interval: PUBSUB_DISCOVERY_INTERVAL
})
],
relay: {
enabled: true,
autoRelay: {
@ -99,7 +89,7 @@ export class Peer {
}
},
connectionManager: {
maxDialsPerPeer: MAX_CONCURRENT_DIALS_PER_PEER, // Number of max concurrent dials per peer
maxDialsPerPeer: MAX_CONCURRENT_DIALS_PER_PEER,
autoDial: false,
maxConnections: MAX_CONNECTIONS,
minConnections: MIN_CONNECTIONS,
@ -109,13 +99,15 @@ export class Peer {
timeout: PING_TIMEOUT
}
});
} catch (err: any) {
console.log('Could not initialize a libp2p node', err);
return;
}
console.log('libp2p node created', this._node);
// Dial to the HOP enabled relay node if available
if (this._relayNodeMultiaddr) {
// Dial to the HOP enabled relay node
await this._dialRelay();
}
// Listen for change in stored multiaddrs
this._node.peerStore.addEventListener('change:multiaddrs', (evt) => {
@ -169,6 +161,7 @@ export class Peer {
async close (): Promise<void> {
assert(this._node);
this._node.peerStore.removeEventListener('change:multiaddrs');
this._node.removeEventListener('peer:discovery');
this._node.removeEventListener('peer:connect');
this._node.removeEventListener('peer:disconnect');
@ -259,7 +252,6 @@ export class Peer {
}
async _dialRelay (): Promise<void> {
assert(this._relayNodeMultiaddr);
assert(this._node);
const relayMultiaddr = this._relayNodeMultiaddr;
@ -325,6 +317,7 @@ export class Peer {
console.log('Closed');
}
try {
// Open stream in new connection for chat protocol
const protocols = await this._node.peerStore.protoBook.get(remotePeerId);
@ -334,6 +327,9 @@ export class Peer {
const stream = await connection.newStream([CHAT_PROTOCOL]);
this._handleStream(remotePeerId, stream);
}
} catch (err: any) {
console.log(`Could not create a new protocol stream with ${remotePeerId.toString()}`, err);
}
}
console.log(`Current number of peers connected: ${this._node.getPeers().length}`);
@ -429,9 +425,14 @@ export class Peer {
// Dial them when we discover them
const peerIdString = peer.id.toString();
try {
console.log(`Dialling peer ${peerIdString}`);
// When dialling with peer id, all multiaddr(s) (direct/relayed) of the discovered peer are dialled in parallel
await this._node.dial(peer.id);
} catch (err: any) {
console.log(`Could not dial ${peerIdString}`, err);
}
}
_handleStream (peerId: PeerId, stream: P2PStream): void {

View File

@ -12,29 +12,28 @@ import debug from 'debug';
import { noise } from '@chainsafe/libp2p-noise';
import { mplex } from '@libp2p/mplex';
import { webRTCStar, WebRTCStarTuple } from '@libp2p/webrtc-star';
import { webRTCDirect } from '@cerc-io/webrtc-direct';
import { floodsub } from '@libp2p/floodsub';
import { pubsubPeerDiscovery } from '@libp2p/pubsub-peer-discovery';
import { createFromJSON } from '@libp2p/peer-id-factory';
import type { Connection } from '@libp2p/interface-connection';
import { DEFAULT_SIGNAL_SERVER_URL } from './index.js';
import { HOP_TIMEOUT, PUBSUB_DISCOVERY_INTERVAL, PUBSUB_SIGNATURE_POLICY } from './constants.js';
import { multiaddr } from '@multiformats/multiaddr';
import { HOP_TIMEOUT, PUBSUB_DISCOVERY_INTERVAL, PUBSUB_SIGNATURE_POLICY } from './constants.js';
const log = debug('laconic:relay');
const DEFAULT_HOST = '0.0.0.0';
const DEFAULT_PORT = 9090;
interface Arguments {
signalServer: string;
port: number;
peerIdFile: string;
relayPeers: string;
}
async function main (): Promise<void> {
const argv: Arguments = _getArgv();
if (!argv.signalServer) {
console.log('Using the default signalling server URL');
}
let peerId: any;
if (argv.peerIdFile) {
@ -48,17 +47,15 @@ async function main (): Promise<void> {
console.log('Creating a new peer id');
}
const wrtcStar: WebRTCStarTuple = webRTCStar({ wrtc });
const listenPort = argv.port ? argv.port : DEFAULT_PORT;
const listenMultiaddr = `/ip4/${DEFAULT_HOST}/tcp/${listenPort}/http/p2p-webrtc-direct`;
const node = await createLibp2p({
peerId,
addresses: {
listen: [
argv.signalServer || DEFAULT_SIGNAL_SERVER_URL
]
listen: [listenMultiaddr]
},
transports: [
wrtcStar.transport
],
transports: [webRTCDirect({ wrtc, enableSignalling: true })],
connectionEncryption: [noise()],
streamMuxers: [mplex()],
pubsub: floodsub({ globalSignaturePolicy: PUBSUB_SIGNATURE_POLICY }),
@ -123,9 +120,9 @@ function _getArgv (): any {
return yargs(hideBin(process.argv)).parserConfiguration({
'parse-numbers': false
}).options({
signalServer: {
type: 'string',
describe: 'Signalling server URL'
port: {
type: 'number',
describe: 'Port to start listening on'
},
peerIdFile: {
type: 'string',
@ -141,9 +138,14 @@ function _getArgv (): any {
async function _dialRelayPeers (node: Libp2p, relayPeersList: string[]): Promise<void> {
relayPeersList.forEach(async (relayPeer) => {
const relayMultiaddr = multiaddr(relayPeer);
const peerIdString = relayMultiaddr.getPeerId()?.toString();
console.log(`Dialling relay node ${relayMultiaddr.getPeerId()} using multiaddr ${relayMultiaddr.toString()}`);
try {
console.log(`Dialling relay node ${peerIdString} using multiaddr ${relayMultiaddr.toString()}`);
await node.dial(relayMultiaddr);
} catch (err: any) {
console.log(`Could not dial ${peerIdString}`, err);
}
});
}

799
yarn.lock

File diff suppressed because it is too large Load Diff