Maintain stream for each remote peer

This commit is contained in:
nabarun 2022-12-21 16:24:44 +05:30
parent d5a1f38db4
commit 1e0fffc1f9
4 changed files with 25 additions and 26 deletions

View File

@ -0,0 +1 @@
REACT_APP_SIGNAL_SERVER=/ip4/127.0.0.1/tcp/13579/ws/p2p-webrtc-star/

View File

@ -1,6 +1,5 @@
import React, { useEffect, useState } from 'react'; import React, { useEffect, useState } from 'react';
import { Peer } from '@cerc-io/peer'; import { Peer } from '@cerc-io/peer';
import { pushable } from 'it-pushable'
import logo from './logo.svg'; import logo from './logo.svg';
import './App.css'; import './App.css';
@ -14,11 +13,10 @@ function App() {
useEffect(() => { useEffect(() => {
if (peer) { if (peer) {
const source = pushable<string>({ objectMode: true }) peer.init(process.env.REACT_APP_SIGNAL_SERVER)
peer.init(undefined, source)
window.broadcast = (message: string) => { window.broadcast = (message: string) => {
source.push(message) peer.broadcastMessage(message)
} }
} }
}, [peer]) }, [peer])

View File

@ -21,7 +21,7 @@
"scripts": { "scripts": {
"build": "tsc", "build": "tsc",
"dev": "node dist/index.js", "dev": "node dist/index.js",
"signal-server": "webrtc-star --port=13579 --host=127.0.0.1" "signal-server": "webrtc-star --port=13579 --host=0.0.0.0"
}, },
"dependencies": { "dependencies": {
"@chainsafe/libp2p-noise": "^10.2.0", "@chainsafe/libp2p-noise": "^10.2.0",

View File

@ -9,6 +9,7 @@ import assert from 'assert'
import { pipe, Source } from 'it-pipe' import { pipe, Source } from 'it-pipe'
import * as lp from 'it-length-prefixed' import * as lp from 'it-length-prefixed'
import map from 'it-map' import map from 'it-map'
import { pushable, Pushable } from 'it-pushable'
import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string' import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string'
import { toString as uint8ArrayToString } from 'uint8arrays/to-string' import { toString as uint8ArrayToString } from 'uint8arrays/to-string'
@ -24,19 +25,18 @@ const PROTOCOL = '/chat/1.0.0';
const DEFAULT_SIGNAL_SERVER_URL = '/ip4/127.0.0.1/tcp/13579/wss/p2p-webrtc-star'; const DEFAULT_SIGNAL_SERVER_URL = '/ip4/127.0.0.1/tcp/13579/wss/p2p-webrtc-star';
export class Peer { export class Peer {
_messageSource?: Source<string>
_node?: Libp2p _node?: Libp2p
_wrtcStar: WebRTCStarTuple _wrtcStar: WebRTCStarTuple
_remotePeerIds: PeerId[] = [] _remotePeerIds: PeerId[] = []
_peerStreamMap: Map<string, Pushable<string>> = new Map()
constructor () { constructor () {
// Instantiation in nodejs. // Instantiation in nodejs.
this._wrtcStar = webRTCStar({ wrtc }); // this._wrtcStar = webRTCStar({ wrtc });
this._wrtcStar = webRTCStar();
} }
async init (signalServerURL = DEFAULT_SIGNAL_SERVER_URL, messageSource: Source<string>) { async init (signalServerURL = DEFAULT_SIGNAL_SERVER_URL) {
this._messageSource = messageSource;
this._node = await createLibp2p({ this._node = await createLibp2p({
addresses: { addresses: {
// Add the signaling server address, along with our PeerId to our multiaddrs list // Add the signaling server address, along with our PeerId to our multiaddrs list
@ -49,16 +49,12 @@ export class Peer {
] ]
}, },
transports: [ transports: [
// webSockets(),
this._wrtcStar.transport this._wrtcStar.transport
], ],
connectionEncryption: [noise()], connectionEncryption: [noise()],
streamMuxers: [mplex()], streamMuxers: [mplex()],
peerDiscovery: [ peerDiscovery: [
this._wrtcStar.discovery, this._wrtcStar.discovery,
// bootstrap({
// list: bootstrapMultiaddrs, // provide array of multiaddrs
// })
], ],
}) })
@ -91,27 +87,29 @@ export class Peer {
console.log(`libp2p id is ${this._node.peerId.toString()}`) console.log(`libp2p id is ${this._node.peerId.toString()}`)
} }
broadcastMessage (message: string) {
for (let [, stream] of this._peerStreamMap) {
stream.push(message)
}
}
async _connectPeer (peer: PeerInfo) { async _connectPeer (peer: PeerInfo) {
assert(this._node) assert(this._node)
console.log(`Found peer ${peer.id.toString()}`) console.log(`Found peer ${peer.id.toString()}`)
try { // dial them when we discover them
// dial them when we discover them const stream = await this._node.dialProtocol(peer.id, PROTOCOL)
const stream = await this._node.dialProtocol(peer.id, PROTOCOL)
this._handleStream(peer.id, stream) this._handleStream(peer.id, stream)
} catch (err) {
console.log("dial failed for peer.id", peer.id)
}
} }
_handleStream (peerId: PeerId, stream: P2PStream) { _handleStream (peerId: PeerId, stream: P2PStream) {
assert(this._messageSource) const messageStream = pushable<string>({ objectMode: true })
// Send message to pipe from stdin // Send message to pipe from stdin
pipe( pipe(
// Read from readable stream (the source) // Read from stream (the source)
this._messageSource, messageStream,
// Turn strings into buffers // Turn strings into buffers
(source) => map(source, (string) => uint8ArrayFromString(string)), (source) => map(source, (string) => uint8ArrayFromString(string)),
// Encode with length prefix (so receiving side knows how much data is coming) // Encode with length prefix (so receiving side knows how much data is coming)
@ -138,5 +136,7 @@ export class Peer {
} }
} }
) )
this._peerStreamMap.set(peerId.toString(), messageStream)
} }
} }