Refactor code in peer package

This commit is contained in:
nabarun 2022-12-21 18:10:43 +05:30
parent 1e0fffc1f9
commit 055a0b920a
4 changed files with 104 additions and 63 deletions

View File

@ -12,13 +12,21 @@ function App() {
const [peer, setPeer] = useState<Peer>()
useEffect(() => {
if (peer) {
peer.init(process.env.REACT_APP_SIGNAL_SERVER)
(async () => {
if (peer) {
await peer.init(process.env.REACT_APP_SIGNAL_SERVER)
window.broadcast = (message: string) => {
peer.broadcastMessage(message)
peer.subscribeMessage((peerId, message) => {
console.log(`${peerId.toString()} > ${message}`)
})
window.broadcast = (message: string) => {
peer.broadcastMessage(message)
}
console.log(`Peer ID is ${peer.peerId!.toString()}`);
}
}
})()
}, [peer])
useEffect(() => {

View File

@ -20,6 +20,7 @@
"keywords": [],
"scripts": {
"build": "tsc",
"lint": "eslint .",
"dev": "node dist/index.js",
"signal-server": "webrtc-star --port=13579 --host=0.0.0.0"
},

View File

@ -2,24 +2,24 @@
// Copyright 2022 Vulcanize, Inc.
//
import { createLibp2p, Libp2p } from 'libp2p'
import { createLibp2p, Libp2p } from 'libp2p';
// For nodejs.
import wrtc from 'wrtc'
import assert from 'assert'
import { pipe, Source } from 'it-pipe'
import * as lp from 'it-length-prefixed'
import map from 'it-map'
import { pushable, Pushable } from 'it-pushable'
import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string'
import { toString as uint8ArrayToString } from 'uint8arrays/to-string'
// import wrtc from 'wrtc';
import assert from 'assert';
import { pipe } from 'it-pipe';
import * as lp from 'it-length-prefixed';
import map from 'it-map';
import { pushable, Pushable } from 'it-pushable';
import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string';
import { toString as uint8ArrayToString } from 'uint8arrays/to-string';
import { webSockets } from '@libp2p/websockets'
import { webRTCStar, WebRTCStarTuple } from '@libp2p/webrtc-star'
import { noise } from '@chainsafe/libp2p-noise'
import { mplex } from '@libp2p/mplex'
import type { Stream as P2PStream } from '@libp2p/interface-connection'
import type { PeerInfo } from '@libp2p/interface-peer-info'
import { PeerId } from '@libp2p/interface-peer-id'
import { webSockets } from '@libp2p/websockets';
import { webRTCStar, WebRTCStarTuple } from '@libp2p/webrtc-star';
import { noise } from '@chainsafe/libp2p-noise';
import { mplex } from '@libp2p/mplex';
import type { Stream as P2PStream, Connection } from '@libp2p/interface-connection';
import type { PeerInfo } from '@libp2p/interface-peer-info';
import { PeerId } from '@libp2p/interface-peer-id';
const PROTOCOL = '/chat/1.0.0';
const DEFAULT_SIGNAL_SERVER_URL = '/ip4/127.0.0.1/tcp/13579/wss/p2p-webrtc-star';
@ -27,8 +27,10 @@ const DEFAULT_SIGNAL_SERVER_URL = '/ip4/127.0.0.1/tcp/13579/wss/p2p-webrtc-star'
export class Peer {
_node?: Libp2p
_wrtcStar: WebRTCStarTuple
_remotePeerIds: PeerId[] = []
_peerStreamMap: Map<string, Pushable<string>> = new Map()
_messageHandlers: Array<(peerId: PeerId, message: string) => void> = []
constructor () {
// Instantiation in nodejs.
@ -36,13 +38,18 @@ export class Peer {
this._wrtcStar = webRTCStar();
}
async init (signalServerURL = DEFAULT_SIGNAL_SERVER_URL) {
get peerId (): PeerId | undefined {
return this._node?.peerId;
}
async init (signalServerURL = DEFAULT_SIGNAL_SERVER_URL): Promise<void> {
this._node = await createLibp2p({
addresses: {
// Add the signaling server address, along with our PeerId to our multiaddrs list
// libp2p will automatically attempt to dial to the signaling server so that it can
// receive inbound connections from other peers
listen: [
// Public signal servers
// '/dns4/wrtc-star1.par.dwebops.pub/tcp/443/wss/p2p-webrtc-star',
// '/dns4/wrtc-star2.sjc.dwebops.pub/tcp/443/wss/p2p-webrtc-star'
signalServerURL
@ -54,57 +61,83 @@ export class Peer {
connectionEncryption: [noise()],
streamMuxers: [mplex()],
peerDiscovery: [
this._wrtcStar.discovery,
],
})
this._wrtcStar.discovery
]
});
// Listen for peers discovery
this._node.addEventListener('peer:discovery', (evt) => {
const peer = evt.detail
this._handleDiscovery(evt.detail);
});
if (!this._remotePeerIds.some(remotePeerId => remotePeerId.toString() === peer.id.toString())) {
this._connectPeer(peer)
}
})
// Listen for peers connection
this._node.connectionManager.addEventListener('peer:connect', (evt) => {
const remotePeerId = evt.detail.remotePeer;
this._remotePeerIds.push(remotePeerId)
console.log('Connected to %s', remotePeerId.toString()) // Log connected peer
})
this._handleConnect(evt.detail);
});
// Listen for peers disconnecting
this._node.connectionManager.addEventListener('peer:disconnect', (evt) => {
const remotePeerId = evt.detail.remotePeer;
this._remotePeerIds = this._remotePeerIds.filter(remotePeerId);
console.log(`Disconnected from ${remotePeerId.toString()}`)
})
this._handleDisconnect(evt.detail);
});
// Handle messages for the protocol
await this._node.handle(PROTOCOL, async ({ stream, connection }) => {
this._handleStream(connection.remotePeer, stream)
})
console.log(`libp2p id is ${this._node.peerId.toString()}`)
this._handleStream(connection.remotePeer, stream);
});
}
broadcastMessage (message: string) {
for (let [, stream] of this._peerStreamMap) {
stream.push(message)
broadcastMessage (message: string): void {
for (const [, stream] of this._peerStreamMap) {
stream.push(message);
}
}
async _connectPeer (peer: PeerInfo) {
assert(this._node)
console.log(`Found peer ${peer.id.toString()}`)
subscribeMessage (handler: (peerId: PeerId, message: string) => void) : () => void {
this._messageHandlers.push(handler);
// dial them when we discover them
const stream = await this._node.dialProtocol(peer.id, PROTOCOL)
const unsubscribe = () => {
this._messageHandlers = this._messageHandlers
.filter(registeredHandler => registeredHandler !== handler);
};
this._handleStream(peer.id, stream)
return unsubscribe;
}
_handleStream (peerId: PeerId, stream: P2PStream) {
const messageStream = pushable<string>({ objectMode: true })
_handleDiscovery (peer: PeerInfo): void {
// Check connected peers as they are discovered repeatedly.
if (!this._remotePeerIds.some(remotePeerId => remotePeerId.toString() === peer.id.toString())) {
this._connectPeer(peer);
}
}
_handleConnect (connection: Connection): void {
const remotePeerId = connection.remotePeer;
this._remotePeerIds.push(remotePeerId);
// Log connected peer
console.log('Connected to %s', remotePeerId.toString());
}
_handleDisconnect (connection: Connection): void {
const disconnectedPeerId = connection.remotePeer;
this._remotePeerIds = this._remotePeerIds.filter(remotePeerId => remotePeerId.toString() !== disconnectedPeerId.toString());
// Log disconnected peer
console.log(`Disconnected from ${disconnectedPeerId.toString()}`);
}
async _connectPeer (peer: PeerInfo): Promise<void> {
assert(this._node);
console.log(`Found peer ${peer.id.toString()}`);
// Dial them when we discover them
const stream = await this._node.dialProtocol(peer.id, PROTOCOL);
this._handleStream(peer.id, stream);
}
_handleStream (peerId: PeerId, stream: P2PStream): void {
const messageStream = pushable<string>({ objectMode: true });
// Send message to pipe from stdin
pipe(
@ -116,9 +149,9 @@ export class Peer {
lp.encode(),
// Write to the stream (the sink)
stream.sink
)
);
// log message from stream
// Handle message from stream
pipe(
// Read from the stream (the source)
// TODO: Implement read stream for browser
@ -128,15 +161,14 @@ export class Peer {
// Turn buffers into strings
(source) => map(source, (buf) => uint8ArrayToString(buf.subarray())),
// Sink function
async function (source) {
async (source) => {
// For each chunk of data
for await (const msg of source) {
// Output the data as a utf8 string
console.log(peerId.toString() + '> ' + msg.toString().replace('\n', ''))
this._messageHandlers.forEach(messageHandler => messageHandler(peerId, msg.toString()));
}
}
)
);
this._peerStreamMap.set(peerId.toString(), messageStream)
this._peerStreamMap.set(peerId.toString(), messageStream);
}
}

View File

@ -5975,7 +5975,7 @@
semver "^7.3.5"
tsutils "^3.21.0"
"@typescript-eslint/eslint-plugin@^5.47.0", "@typescript-eslint/eslint-plugin@^5.5.0":
"@typescript-eslint/eslint-plugin@^5.5.0":
version "5.47.0"
resolved "https://registry.npmjs.org/@typescript-eslint/eslint-plugin/-/eslint-plugin-5.47.0.tgz"
integrity sha512-AHZtlXAMGkDmyLuLZsRpH3p4G/1iARIwc/T0vIem2YB+xW6pZaXYXzCBnZSF/5fdM97R9QqZWZ+h3iW10XgevQ==
@ -6019,7 +6019,7 @@
"@typescript-eslint/typescript-estree" "4.33.0"
debug "^4.3.1"
"@typescript-eslint/parser@^5.47.0", "@typescript-eslint/parser@^5.5.0":
"@typescript-eslint/parser@^5.5.0":
version "5.47.0"
resolved "https://registry.npmjs.org/@typescript-eslint/parser/-/parser-5.47.0.tgz"
integrity sha512-udPU4ckK+R1JWCGdQC4Qa27NtBg7w020ffHqGyAK8pAgOVuNw7YaKXGChk+udh+iiGIJf6/E/0xhVXyPAbsczw==