Implement peer connection and sending messages between peers

This commit is contained in:
nabarun 2022-12-20 22:29:53 +05:30
parent ee7383add6
commit 4856b8b43c
3 changed files with 785 additions and 202 deletions

View File

@ -21,7 +21,7 @@
"scripts": {
"build": "tsc",
"dev": "node dist/index.js",
"test": ""
"signal-server": "webrtc-star --port=13579 --host=127.0.0.1"
},
"dependencies": {
"@chainsafe/libp2p-noise": "^10.2.0",
@ -29,10 +29,13 @@
"@libp2p/mplex": "^7.1.1",
"@libp2p/webrtc-star": "^5.0.3",
"@libp2p/websockets": "^5.0.2",
"it-map": "^2.0.0",
"it-pipe": "^2.0.5",
"libp2p": "^0.41.0",
"wrtc": "^0.4.7"
},
"devDependencies": {
"@libp2p/webrtc-star-signalling-server": "^2.0.5",
"@typescript-eslint/eslint-plugin": "^4.25.0",
"@typescript-eslint/parser": "^4.25.0",
"eslint": "^7.27.0",

View File

@ -5,18 +5,23 @@
import { createLibp2p, Libp2p } from 'libp2p'
// For nodejs.
import wrtc from 'wrtc'
import assert from 'assert'
import { pipe } from 'it-pipe'
import * as lp from 'it-length-prefixed'
import map from 'it-map'
import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string'
import { toString as uint8ArrayToString } from 'uint8arrays/to-string'
import { webSockets } from '@libp2p/websockets'
import { webRTCStar, WebRTCStarTuple } from '@libp2p/webrtc-star'
import { noise } from '@chainsafe/libp2p-noise'
import { mplex } from '@libp2p/mplex'
import { bootstrap } from '@libp2p/bootstrap'
import type { Stream } from '@libp2p/interface-connection'
import type { PeerInfo } from '@libp2p/interface-peer-info'
import { PeerId } from '@libp2p/interface-peer-id'
// Known peers addresses
const bootstrapMultiaddrs = [
'/dnsaddr/bootstrap.libp2p.io/p2p/QmbLHAnMoJPWSCR5Zhtx6BHJX9KiKNN6tpvbUcqanj75Nb',
'/dnsaddr/bootstrap.libp2p.io/p2p/QmNnooDu7bfjPFoTZYxMNLWUQJyrVwtbZg5gBMjTezGAJN'
]
const PROTOCOL = '/chat/1.0.0';
const SIGNAL_SERVER_URL = '/ip4/127.0.0.1/tcp/13579/wss/p2p-webrtc-star';
export class Peer {
_node?: Libp2p
@ -25,6 +30,9 @@ export class Peer {
constructor () {
// Instantiation in nodejs.
this._wrtcStar = webRTCStar({ wrtc });
// Read utf-8 from stdin
process.stdin.setEncoding('utf8')
}
async init () {
@ -34,26 +42,28 @@ export class Peer {
// libp2p will automatically attempt to dial to the signaling server so that it can
// receive inbound connections from other peers
listen: [
'/dns4/wrtc-star1.par.dwebops.pub/tcp/443/wss/p2p-webrtc-star',
'/dns4/wrtc-star2.sjc.dwebops.pub/tcp/443/wss/p2p-webrtc-star'
// '/dns4/wrtc-star1.par.dwebops.pub/tcp/443/wss/p2p-webrtc-star',
// '/dns4/wrtc-star2.sjc.dwebops.pub/tcp/443/wss/p2p-webrtc-star'
SIGNAL_SERVER_URL
]
},
transports: [
webSockets(),
// webSockets(),
this._wrtcStar.transport
],
connectionEncryption: [noise()],
streamMuxers: [mplex()],
peerDiscovery: [
this._wrtcStar.discovery,
bootstrap({
list: bootstrapMultiaddrs, // provide array of multiaddrs
})
// bootstrap({
// list: bootstrapMultiaddrs, // provide array of multiaddrs
// })
],
})
this._node.addEventListener('peer:discovery', (evt) => {
console.log('Discovered %s', evt.detail.id.toString()) // Log discovered peer
const peer = evt.detail
this.connectPeer(peer)
})
this._node.connectionManager.addEventListener('peer:connect', (evt) => {
@ -66,8 +76,61 @@ export class Peer {
console.log(`Disconnected from ${connection.remotePeer.toString()}`)
})
// Handle messages for the protocol
await this._node.handle(PROTOCOL, async ({ stream, connection }) => {
this._handleStream(connection.remotePeer, stream)
})
console.log(`libp2p id is ${this._node.peerId.toString()}`)
}
async connectPeer (peer: PeerInfo) {
assert(this._node)
console.log(`Found peer ${peer.id.toString()}`)
try {
// dial them when we discover them
const stream = await this._node.dialProtocol(peer.id, PROTOCOL)
this._handleStream(peer.id, stream)
} catch (err) {
console.log("dial failed for peer.id", peer.id)
}
}
_handleStream (peerId: PeerId, stream: Stream) {
// Send message to pipe from stdin
pipe(
// Read from stdin (the source)
// TODO: Implement write stream for browser
process.stdin,
// Turn strings into buffers
(source) => map(source, (string) => uint8ArrayFromString(string)),
// Encode with length prefix (so receiving side knows how much data is coming)
lp.encode(),
// Write to the stream (the sink)
stream.sink
)
// log message from stream
pipe(
// Read from the stream (the source)
// TODO: Implement read stream for browser
stream.source,
// Decode length-prefixed data
lp.decode(),
// Turn buffers into strings
(source) => map(source, (buf) => uint8ArrayToString(buf.subarray())),
// Sink function
async function (source) {
// For each chunk of data
for await (const msg of source) {
// Output the data as a utf8 string
console.log(peerId.toString() + '> ' + msg.toString().replace('\n', ''))
}
}
)
}
}
const peer = new Peer();

893
yarn.lock

File diff suppressed because it is too large Load Diff