Setup a relay node and pubsub based discovery (#284)

* Add a script to setup a relay node

* Use pubsub based peer discovery

* Add peer multiaddr to connection log
This commit is contained in:
prathamesh0 2023-01-03 15:41:24 +05:30 committed by GitHub
parent fed3ba9c90
commit 84539dffce
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 275 additions and 33 deletions

View File

@ -2,7 +2,7 @@
## chat
A basic CLI to pass messages between peers using stdin/stdout
A basic CLI to pass messages between peers using `stdin`/`stdout`
* Install dependencies:
@ -10,27 +10,37 @@ A basic CLI to pass messages between peers using stdin/stdout
yarn install
```
* Build the peer package:
* Build the `peer` package:
```
cd packages/peer
yarn build
```
* Run a local signalling server (skip if an already running signalling server is available):
* (Optional) Run a local signalling server:
```bash
# In packages/peer
yarn signal-server
```
* (Optional) Run a local relay node:
```bash
# In packages/peer
yarn relay-node --signal-server [SIGNAL_SERVER_URL]
```
* `signal-server`: multiaddr of a signalling server (default: local signalling server multiaddr)
* Start the node:
```bash
# In packages/cli
yarn chat --signalServer [SIGNAL_SERVER_URL]
yarn chat --signal-server [SIGNAL_SERVER_URL] --relay-node [RELAY_NODE_URL]
```
* `signalServer`: multiaddr of a signalling server (default: local signalling server multiaddr)
* `signal-server`: multiaddr of a signalling server (default: local signalling server multiaddr)
* `relay-node`: multiaddr of a hop enabled relay node
* The process starts reading from `stdin` and outputs messages from others peers to `stdout`.

View File

@ -1,3 +1,7 @@
//
// Copyright 2022 Vulcanize, Inc.
//
import * as readline from 'readline';
import { hideBin } from 'yargs/helpers';
import yargs from 'yargs';
@ -7,18 +11,19 @@ import { PeerId } from '@libp2p/interface-peer-id';
interface Arguments {
signalServer: string;
relayNode: string;
}
async function main (): Promise<void> {
const argv: Arguments = _getArgv();
if (!argv.signalServer) {
console.log('Using default signalling server URL');
console.log('Using the default signalling server URL');
}
// https://adamcoster.com/blog/commonjs-and-esm-importexport-compatibility-examples#importing-esm-into-commonjs-cjs
const { Peer } = await import('@cerc-io/peer');
const peer = new Peer(true);
await peer.init(argv.signalServer);
await peer.init(argv.signalServer, argv.relayNode);
peer.subscribeMessage((peerId: PeerId, message: string) => {
console.log(`> ${peerId.toString()} > ${message}`);
@ -45,6 +50,10 @@ function _getArgv (): any {
signalServer: {
type: 'string',
describe: 'Signalling server URL'
},
relayNode: {
type: 'string',
describe: 'Relay node URL'
}
}).argv;
}

View File

@ -1 +1,2 @@
REACT_APP_SIGNAL_SERVER=/ip4/127.0.0.1/tcp/13579/ws/p2p-webrtc-star/
REACT_APP_RELAY_NODE=

View File

@ -4,13 +4,13 @@ This project was bootstrapped with [Create React App](https://github.com/faceboo
## Instructions
* Install dependencies
* Install dependencies:
```bash
yarn install
```
* Build the peer package
* Build the peer package:
```bash
# From repo root
@ -19,14 +19,30 @@ This project was bootstrapped with [Create React App](https://github.com/faceboo
yarn build
```
* Run the signalling server
* (Optional) Run a local signalling server:
```bash
# In packages/peer
yarn signal-server
```
* Start the react app in development mode
* (Optional) Run a local relay node:
```bash
# In packages/peer
yarn relay-node --signal-server [SIGNAL_SERVER_URL]
```
* `signal-server`: multiaddr of a signalling server (default: local signalling server multiaddr)
* Set the signalling server and relay node multiaddrs in the [env](./.env) file:
```
REACT_APP_SIGNAL_SERVER=/ip4/127.0.0.1/tcp/13579/ws/p2p-webrtc-star/
REACT_APP_RELAY_NODE=/ip4/127.0.0.1/tcp/13579/wss/p2p-webrtc-star/p2p/12D3KooWRzH3ZRFP6RDbs2EKA8jSrD4Y6VYtLnCRMj3mYCiMHCJP
```
* Start the react app in development mode:
```bash
# In packages/peer-test-app

View File

@ -10,17 +10,17 @@ declare global {
function App() {
const [peer, setPeer] = useState<Peer>()
useEffect(() => {
(async () => {
if (peer) {
await peer.init(process.env.REACT_APP_SIGNAL_SERVER)
await peer.init(process.env.REACT_APP_SIGNAL_SERVER, process.env.REACT_APP_RELAY_NODE)
console.log(`Peer ID is ${peer.peerId!.toString()}`);
peer.subscribeMessage((peerId, message) => {
console.log(`${peerId.toString()} > ${message}`)
})
window.broadcast = (message: string) => {
peer.broadcastMessage(message)
}

View File

@ -16,5 +16,6 @@
"@typescript-eslint"
],
"rules": {
"@typescript-eslint/no-explicit-any": "off"
}
}

View File

@ -22,14 +22,18 @@
"build": "tsc",
"lint": "eslint .",
"dev": "node dist/index.js",
"signal-server": "webrtc-star --port=13579 --host=0.0.0.0"
"signal-server": "webrtc-star --port=13579 --host=0.0.0.0",
"relay-node": "node dist/relay.js"
},
"dependencies": {
"@chainsafe/libp2p-noise": "^10.2.0",
"@libp2p/bootstrap": "^5.0.2",
"@libp2p/floodsub": "^5.0.0",
"@libp2p/mplex": "^7.1.1",
"@libp2p/pubsub-peer-discovery": "^7.0.1",
"@libp2p/webrtc-star": "^5.0.3",
"@libp2p/websockets": "^5.0.2",
"@multiformats/multiaddr": "^11.1.4",
"assert": "^2.0.0",
"it-map": "^2.0.0",
"it-pipe": "^2.0.5",

View File

@ -19,13 +19,18 @@ import { mplex } from '@libp2p/mplex';
import type { Stream as P2PStream, Connection } from '@libp2p/interface-connection';
import type { PeerInfo } from '@libp2p/interface-peer-info';
import { PeerId } from '@libp2p/interface-peer-id';
import { multiaddr, Multiaddr } from '@multiformats/multiaddr';
import { bootstrap } from '@libp2p/bootstrap';
import { floodsub } from '@libp2p/floodsub';
import { pubsubPeerDiscovery } from '@libp2p/pubsub-peer-discovery';
const PROTOCOL = '/chat/1.0.0';
const DEFAULT_SIGNAL_SERVER_URL = '/ip4/127.0.0.1/tcp/13579/wss/p2p-webrtc-star';
export const PROTOCOL = '/chat/1.0.0';
export const DEFAULT_SIGNAL_SERVER_URL = '/ip4/127.0.0.1/tcp/13579/wss/p2p-webrtc-star';
export class Peer {
_node?: Libp2p
_wrtcStar: WebRTCStarTuple
_relayNodeMultiaddr?: Multiaddr
_remotePeerIds: PeerId[] = []
_peerStreamMap: Map<string, Pushable<string>> = new Map()
@ -44,7 +49,23 @@ export class Peer {
return this._node?.peerId;
}
async init (signalServerURL = DEFAULT_SIGNAL_SERVER_URL): Promise<void> {
async init (signalServerURL = DEFAULT_SIGNAL_SERVER_URL, relayNodeURL?: string): Promise<void> {
let peerDiscovery: any;
if (relayNodeURL) {
console.log('Bootstrapping relay node');
this._relayNodeMultiaddr = multiaddr(relayNodeURL);
peerDiscovery = [
bootstrap({
list: [this._relayNodeMultiaddr.toString()]
}),
pubsubPeerDiscovery({
interval: 1000
})
];
} else {
peerDiscovery = [this._wrtcStar.discovery];
}
this._node = await createLibp2p({
addresses: {
// Add the signaling server address, along with our PeerId to our multiaddrs list
@ -62,13 +83,32 @@ export class Peer {
],
connectionEncryption: [noise()],
streamMuxers: [mplex()],
peerDiscovery: [
this._wrtcStar.discovery
]
pubsub: floodsub(),
peerDiscovery,
relay: {
enabled: true,
autoRelay: {
enabled: true,
maxListeners: 2
}
}
});
console.log('libp2p node created', this._node);
// Listen for change in stored multiaddrs
this._node.peerStore.addEventListener('change:multiaddrs', (evt) => {
assert(this._node);
const { peerId, multiaddrs } = evt.detail;
// Log updated self multiaddrs
if (peerId.equals(this._node.peerId)) {
console.log('Updated self multiaddrs', this._node.getMultiaddrs().map(addr => addr.toString()));
} else {
console.log('Updated other node\'s multiaddrs', multiaddrs.map((addr: Multiaddr) => addr.toString()));
}
});
// Listen for peers discovery
this._node.addEventListener('peer:discovery', (evt) => {
console.log('event peer:discovery', evt);
@ -123,10 +163,9 @@ export class Peer {
}
_handleDiscovery (peer: PeerInfo): void {
console.log('Discovered peer multiaddrs', peer.multiaddrs.map(addr => addr.toString()));
// Check connected peers as they are discovered repeatedly.
if (!this._remotePeerIds.some(remotePeerId => remotePeerId.toString() === peer.id.toString())) {
console.log('Discovered peer multiaddrs', peer.multiaddrs.map(addr => addr.toString()));
this._connectPeer(peer);
}
}
@ -136,7 +175,7 @@ export class Peer {
this._remotePeerIds.push(remotePeerId);
// Log connected peer
console.log('Connected to %s', remotePeerId.toString());
console.log(`Connected to ${remotePeerId.toString()} using multiaddr ${connection.remoteAddr.toString()}`);
}
_handleDisconnect (connection: Connection): void {
@ -151,10 +190,27 @@ export class Peer {
assert(this._node);
console.log(`Dialling peer ${peer.id.toString()}`);
// Dial them when we discover them
const stream = await this._node.dialProtocol(peer.id, PROTOCOL);
// Check if discovered the relay node
if (this._relayNodeMultiaddr) {
const relayNodePeerId = this._relayNodeMultiaddr.getPeerId();
if (relayNodePeerId && relayNodePeerId === peer.id.toString()) {
await this._node.dial(this._relayNodeMultiaddr);
return;
}
}
this._handleStream(peer.id, stream);
// Dial them when we discover them
// Attempt to dial all the multiaddrs of the discovered peer (to connect through relay)
for (const peerMultiaddr of peer.multiaddrs) {
const stream = await this._node.dialProtocol(peerMultiaddr, PROTOCOL).catch(err => {
console.log(`Could not dial ${peerMultiaddr.toString()}`, err);
});
if (stream) {
this._handleStream(peer.id, stream);
break;
}
}
}
_handleStream (peerId: PeerId, stream: P2PStream): void {
@ -190,6 +246,7 @@ export class Peer {
}
);
// TODO: Check if stream already exists for peer id
this._peerStreamMap.set(peerId.toString(), messageStream);
}
}

View File

@ -0,0 +1,79 @@
//
// Copyright 2022 Vulcanize, Inc.
//
import { createLibp2p } from 'libp2p';
import wrtc from 'wrtc';
import { hideBin } from 'yargs/helpers';
import yargs from 'yargs';
import { noise } from '@chainsafe/libp2p-noise';
import { mplex } from '@libp2p/mplex';
import { webRTCStar, WebRTCStarTuple } from '@libp2p/webrtc-star';
import { floodsub } from '@libp2p/floodsub';
import { pubsubPeerDiscovery } from '@libp2p/pubsub-peer-discovery';
import { DEFAULT_SIGNAL_SERVER_URL } from './index.js';
interface Arguments {
signalServer: string;
}
async function main (): Promise<void> {
const argv: Arguments = _getArgv();
if (!argv.signalServer) {
console.log('Using the default signalling server URL');
}
const wrtcStar: WebRTCStarTuple = webRTCStar({ wrtc });
const node = await createLibp2p({
addresses: {
listen: [
argv.signalServer || DEFAULT_SIGNAL_SERVER_URL
]
},
transports: [
wrtcStar.transport
],
connectionEncryption: [noise()],
streamMuxers: [mplex()],
pubsub: floodsub(),
peerDiscovery: [
pubsubPeerDiscovery({
interval: 1000
})
],
relay: {
enabled: true,
hop: {
enabled: true
},
advertise: {
enabled: true
}
}
});
console.log(`Relay node started with id ${node.peerId.toString()}`);
console.log('Listening on:');
node.getMultiaddrs().forEach((ma) => console.log(ma.toString()));
}
function _getArgv (): any {
return yargs(hideBin(process.argv)).parserConfiguration({
'parse-numbers': false
}).options({
signalServer: {
type: 'string',
describe: 'Signalling server URL'
},
relayNode: {
type: 'string',
describe: 'Relay node URL'
}
}).argv;
}
main().catch(err => {
console.log(err);
});

View File

@ -3994,6 +3994,19 @@
protons-runtime "^4.0.1"
uint8arrays "^4.0.2"
"@libp2p/floodsub@^5.0.0":
version "5.0.0"
resolved "https://registry.yarnpkg.com/@libp2p/floodsub/-/floodsub-5.0.0.tgz#4aeb10c89cae6f04d9244ad8cb094494222eb61f"
integrity sha512-B39UW/AWgfVVUl2yJDardmL2kKo1Zd4E+11/rkyjnjbygh944DTLcp3B2gSarqRlyN+x4ChUTKiN75UGajOaog==
dependencies:
"@libp2p/interface-peer-id" "^1.0.2"
"@libp2p/interface-pubsub" "^3.0.0"
"@libp2p/logger" "^2.0.0"
"@libp2p/pubsub" "^5.0.0"
protons-runtime "^4.0.1"
uint8arraylist "^2.1.1"
uint8arrays "^4.0.2"
"@libp2p/interface-address-manager@^2.0.0":
version "2.0.3"
resolved "https://registry.npmjs.org/@libp2p/interface-address-manager/-/interface-address-manager-2.0.3.tgz"
@ -4072,14 +4085,14 @@
"@libp2p/interface-peer-info" "^1.0.0"
"@libp2p/interfaces" "^3.0.0"
"@libp2p/interface-peer-id@^1.0.0", "@libp2p/interface-peer-id@^1.0.2", "@libp2p/interface-peer-id@^1.0.4", "@libp2p/interface-peer-id@^1.1.2":
"@libp2p/interface-peer-id@^1.0.0", "@libp2p/interface-peer-id@^1.0.2", "@libp2p/interface-peer-id@^1.0.4", "@libp2p/interface-peer-id@^1.0.5", "@libp2p/interface-peer-id@^1.1.2":
version "1.1.2"
resolved "https://registry.yarnpkg.com/@libp2p/interface-peer-id/-/interface-peer-id-1.1.2.tgz#22cbfb4707949cd49c3271a871172221d6920049"
integrity sha512-S5iyVzG2EUgxm4NLe8W4ya9kpKuGfHs7Wbbos0wOUB4GXsbIKgOOxIr4yf+xGFgtEBaoximvlLkpob6dn8VFgA==
dependencies:
multiformats "^10.0.0"
"@libp2p/interface-peer-info@^1.0.0", "@libp2p/interface-peer-info@^1.0.3":
"@libp2p/interface-peer-info@^1.0.0", "@libp2p/interface-peer-info@^1.0.2", "@libp2p/interface-peer-info@^1.0.3":
version "1.0.6"
resolved "https://registry.npmjs.org/@libp2p/interface-peer-info/-/interface-peer-info-1.0.6.tgz"
integrity sha512-oi+wV3cLWQCD110rsPhhr0yAV2uifZ3eU7Hvy55TVcKYjV8QdGhWMljJEG/x9KoeMa+elL6Nm6Px0oeD0KA1cw==
@ -4126,7 +4139,7 @@
"@libp2p/interface-peer-id" "^1.0.0"
uint8arraylist "^2.1.2"
"@libp2p/interface-registrar@^2.0.3":
"@libp2p/interface-registrar@^2.0.0", "@libp2p/interface-registrar@^2.0.3":
version "2.0.6"
resolved "https://registry.npmjs.org/@libp2p/interface-registrar/-/interface-registrar-2.0.6.tgz"
integrity sha512-rgpCizjG6HPIfq8/AKlMSREMTbb0PBNVXoMrChOo0vbu9MYfvua4YVlzXVT9jQtlFJHDjtSt9knIRkOAgii/wQ==
@ -4295,6 +4308,58 @@
uint8arraylist "^2.1.1"
uint8arrays "^4.0.2"
"@libp2p/pubsub-peer-discovery@^7.0.1":
version "7.0.1"
resolved "https://registry.yarnpkg.com/@libp2p/pubsub-peer-discovery/-/pubsub-peer-discovery-7.0.1.tgz#2b60c0c8614b838502b9b17beb8f9ac5c61c4d7d"
integrity sha512-ER6SATLR2RVBoHkN/MGTiGHH11Ht2s6C/N6CfORSFMIVeP7J4HUF2ccLfUUaTMRrFgUbTY7SC8H0fFYwsD6Hcw==
dependencies:
"@libp2p/interface-peer-discovery" "^1.0.1"
"@libp2p/interface-peer-id" "^1.0.5"
"@libp2p/interface-peer-info" "^1.0.2"
"@libp2p/interface-pubsub" "^3.0.0"
"@libp2p/interfaces" "^3.0.3"
"@libp2p/logger" "^2.0.1"
"@libp2p/peer-id" "^1.1.15"
"@multiformats/multiaddr" "^11.0.5"
protons-runtime "^4.0.1"
"@libp2p/pubsub@^5.0.0":
version "5.0.1"
resolved "https://registry.yarnpkg.com/@libp2p/pubsub/-/pubsub-5.0.1.tgz#24523e3285cc15faddebe1504485b31124d09e35"
integrity sha512-pQNpUC6KWDKCm7A9bv4tT2t3a7a4IpJdfzHsRBjAaKEcIRgP/s/q0Xn8ySdcggg1fvdjMp5VY6NfuuRbSCu9LA==
dependencies:
"@libp2p/crypto" "^1.0.0"
"@libp2p/interface-connection" "^3.0.1"
"@libp2p/interface-peer-id" "^1.0.2"
"@libp2p/interface-pubsub" "^3.0.0"
"@libp2p/interface-registrar" "^2.0.0"
"@libp2p/interfaces" "^3.0.2"
"@libp2p/logger" "^2.0.0"
"@libp2p/peer-collections" "^2.0.0"
"@libp2p/peer-id" "^1.1.0"
"@libp2p/topology" "^3.0.0"
"@multiformats/multiaddr" "^11.0.0"
abortable-iterator "^4.0.2"
err-code "^3.0.1"
it-length-prefixed "^8.0.2"
it-pipe "^2.0.3"
it-pushable "^3.0.0"
multiformats "^10.0.0"
p-queue "^7.2.0"
uint8arraylist "^2.0.0"
uint8arrays "^4.0.2"
"@libp2p/topology@^3.0.0":
version "3.0.2"
resolved "https://registry.yarnpkg.com/@libp2p/topology/-/topology-3.0.2.tgz#b3c8dffd01d2ce222e867412f6898af9bd08e8fb"
integrity sha512-RDMmA8Us5uxl7sSWGoTIYyzdthjs6xQD1P/vBQPHlqTAjpjPWuCY019cbqK8lP1JCldCB/n2ljSxDJs1J4cweQ==
dependencies:
"@libp2p/interface-peer-id" "^1.0.4"
"@libp2p/interface-registrar" "^2.0.3"
"@libp2p/logger" "^2.0.1"
err-code "^3.0.1"
it-all "^2.0.0"
"@libp2p/tracked-map@^3.0.0":
version "3.0.2"
resolved "https://registry.npmjs.org/@libp2p/tracked-map/-/tracked-map-3.0.2.tgz"
@ -4430,9 +4495,9 @@
dependencies:
"@multiformats/multiaddr" "^11.0.0"
"@multiformats/multiaddr@^11.0.0":
"@multiformats/multiaddr@^11.0.0", "@multiformats/multiaddr@^11.0.5", "@multiformats/multiaddr@^11.1.4":
version "11.1.4"
resolved "https://registry.npmjs.org/@multiformats/multiaddr/-/multiaddr-11.1.4.tgz"
resolved "https://registry.yarnpkg.com/@multiformats/multiaddr/-/multiaddr-11.1.4.tgz#55be2da1d82973af1f9f38415143b894ec2d095c"
integrity sha512-eaFX7Pp5DNVoSk5xlWbmKwHmo1+ab90VT0xzWSocoXj9IkShx+lYm7Zo4tmfX8RnwTNGzBSZqY5G4jmqvYFoPg==
dependencies:
"@chainsafe/is-ip" "^2.0.1"