mirror of
https://github.com/cerc-io/watcher-ts
synced 2025-01-22 19:19:05 +00:00
Initial commit
This commit is contained in:
parent
3d8451feca
commit
8193ab4ad9
46
packages/peer/.aegir.cjs
Normal file
46
packages/peer/.aegir.cjs
Normal file
@ -0,0 +1,46 @@
|
||||
'use strict'
|
||||
|
||||
// const wrtc = require('wrtc')
|
||||
|
||||
// TODO: Temporary fix per wrtc issue
|
||||
// https://github.com/node-webrtc/node-webrtc/issues/636#issuecomment-774171409
|
||||
// process.on('beforeExit', (code) => process.exit(code))
|
||||
|
||||
async function before () {
|
||||
const { createRelayNode } = await import('./dist/src/relay.js')
|
||||
const { createFromJSON } = await import('@libp2p/peer-id-factory')
|
||||
|
||||
// const { relayPeerIdJson } = await import('./dist/test/relay-peer-id.json', {
|
||||
// assert: { type: 'json' }
|
||||
// })
|
||||
const { RELAY_PORT } = await import('./dist/test/constants.js')
|
||||
|
||||
const relayPeerIdJson = {
|
||||
"id": "12D3KooWRxmi5GXThHcLzadFGS7KWwMmYMsVpMjZpbgV6QQ1Cd68",
|
||||
"privKey": "CAESQDCAhwGVSQMYLysaTO+XAg31aig68n5A8aNdvhehjhCL7+JBFphTnaTND+6XSlP621nktg/i43ajZi9T23vmQZE=",
|
||||
"pubKey": "CAESIO/iQRaYU52kzQ/ul0pT+ttZ5LYP4uN2o2YvU9t75kGR"
|
||||
}
|
||||
const RELAY_PEER_ID = await createFromJSON(relayPeerIdJson);
|
||||
|
||||
console.log('RELAY_PEER_ID', RELAY_PEER_ID)
|
||||
console.log('RELAY_PORT', RELAY_PORT)
|
||||
|
||||
await createRelayNode(RELAY_PORT, [], RELAY_PEER_ID);
|
||||
}
|
||||
|
||||
/** @type {import('aegir').PartialOptions} */
|
||||
module.exports = {
|
||||
test: {
|
||||
build: false,
|
||||
target: ['browser'],
|
||||
runner: 'browser',
|
||||
// runner: 'node',
|
||||
files: ['dist/**/*.test.js'],
|
||||
// browser: {
|
||||
// config: {
|
||||
// browser: 'chromium'
|
||||
// }
|
||||
// },
|
||||
before
|
||||
}
|
||||
}
|
@ -2,8 +2,8 @@
|
||||
"name": "@cerc-io/peer",
|
||||
"version": "0.2.27",
|
||||
"description": "libp2p module",
|
||||
"main": "dist/index.js",
|
||||
"exports": "./dist/index.js",
|
||||
"main": "dist/src/index.js",
|
||||
"exports": "./dist/src/index.js",
|
||||
"type": "module",
|
||||
"license": "AGPL-version-3.0",
|
||||
"private": false,
|
||||
@ -21,9 +21,10 @@
|
||||
"scripts": {
|
||||
"build": "tsc",
|
||||
"lint": "eslint .",
|
||||
"dev": "node dist/index.js",
|
||||
"create-peer": "node dist/create-peer.js",
|
||||
"relay-node": "node dist/relay.js"
|
||||
"test": "mocha dist/**/*.test.js",
|
||||
"dev": "node dist/src/index.js",
|
||||
"create-peer": "node dist/cli/create-peer.js",
|
||||
"relay-node": "node dist/cli/relay.js"
|
||||
},
|
||||
"dependencies": {
|
||||
"@cerc-io/libp2p": "0.42.2-laconic-0.1.1",
|
||||
@ -35,20 +36,26 @@
|
||||
"@libp2p/pubsub-peer-discovery": "^8.0.0",
|
||||
"@multiformats/multiaddr": "^11.1.4",
|
||||
"debug": "^4.3.1",
|
||||
"install": "^0.13.0",
|
||||
"it-length-prefixed": "^8.0.4",
|
||||
"it-map": "^2.0.0",
|
||||
"it-pipe": "^2.0.5",
|
||||
"it-pushable": "^3.1.2",
|
||||
"node-pre-gyp": "^0.13.0",
|
||||
"p-event": "^5.0.1",
|
||||
"playwright-test": "^8.2.0",
|
||||
"uint8arrays": "^4.0.3",
|
||||
"wrtc": "^0.4.7",
|
||||
"yargs": "^17.0.1"
|
||||
},
|
||||
"devDependencies": {
|
||||
"@types/chai": "^4.2.19",
|
||||
"@types/mocha": "^8.2.3",
|
||||
"@types/node": "16.11.7",
|
||||
"@typescript-eslint/eslint-plugin": "^5.47.1",
|
||||
"@typescript-eslint/parser": "^5.47.1",
|
||||
"aegir": "^38.1.2",
|
||||
"chai": "^4.3.4",
|
||||
"eslint": "^7.27.0",
|
||||
"eslint-config-semistandard": "^15.0.1",
|
||||
"eslint-config-standard": "^16.0.3",
|
||||
@ -56,6 +63,7 @@
|
||||
"eslint-plugin-node": "^11.1.0",
|
||||
"eslint-plugin-promise": "^5.1.0",
|
||||
"eslint-plugin-standard": "^5.0.0",
|
||||
"mocha": "^8.4.0",
|
||||
"typescript": "^4.9.4"
|
||||
}
|
||||
}
|
||||
|
72
packages/peer/src/cli/relay.ts
Normal file
72
packages/peer/src/cli/relay.ts
Normal file
@ -0,0 +1,72 @@
|
||||
import { hideBin } from 'yargs/helpers';
|
||||
import yargs from 'yargs';
|
||||
import fs from 'fs';
|
||||
import path from 'path';
|
||||
|
||||
import { createFromJSON } from '@libp2p/peer-id-factory';
|
||||
import type { PeerId } from '@libp2p/interface-peer-id';
|
||||
|
||||
import { createRelayNode, DEFAULT_PORT } from '../relay.js';
|
||||
|
||||
interface Arguments {
|
||||
port: number;
|
||||
peerIdFile: string;
|
||||
relayPeers: string;
|
||||
}
|
||||
|
||||
async function main (): Promise<void> {
|
||||
const argv: Arguments = _getArgv();
|
||||
let peerId: PeerId | undefined;
|
||||
let relayPeersList: string[] = [];
|
||||
|
||||
if (argv.peerIdFile) {
|
||||
const peerIdFilePath = path.resolve(argv.peerIdFile);
|
||||
console.log(`Reading peer id from file ${peerIdFilePath}`);
|
||||
|
||||
const peerIdObj = fs.readFileSync(peerIdFilePath, 'utf-8');
|
||||
const peerIdJson = JSON.parse(peerIdObj);
|
||||
peerId = await createFromJSON(peerIdJson);
|
||||
} else {
|
||||
console.log('Creating a new peer id');
|
||||
}
|
||||
|
||||
if (argv.relayPeers) {
|
||||
const relayPeersFilePath = path.resolve(argv.relayPeers);
|
||||
|
||||
if (!fs.existsSync(relayPeersFilePath)) {
|
||||
console.log(`File at given path ${relayPeersFilePath} not found, exiting`);
|
||||
process.exit();
|
||||
}
|
||||
|
||||
console.log(`Reading relay peer multiaddr(s) from file ${relayPeersFilePath}`);
|
||||
const relayPeersListObj = fs.readFileSync(relayPeersFilePath, 'utf-8');
|
||||
relayPeersList = JSON.parse(relayPeersListObj);
|
||||
}
|
||||
|
||||
const listenPort = argv.port ? argv.port : DEFAULT_PORT;
|
||||
|
||||
await createRelayNode(listenPort, relayPeersList, peerId);
|
||||
}
|
||||
|
||||
function _getArgv (): any {
|
||||
return yargs(hideBin(process.argv)).parserConfiguration({
|
||||
'parse-numbers': false
|
||||
}).options({
|
||||
port: {
|
||||
type: 'number',
|
||||
describe: 'Port to start listening on'
|
||||
},
|
||||
peerIdFile: {
|
||||
type: 'string',
|
||||
describe: 'Relay Peer Id file path (json)'
|
||||
},
|
||||
relayPeers: {
|
||||
type: 'string',
|
||||
describe: 'Relay peer multiaddr(s) list file path (json)'
|
||||
}
|
||||
}).argv;
|
||||
}
|
||||
|
||||
main().catch(err => {
|
||||
console.log(err);
|
||||
});
|
@ -1,453 +1,6 @@
|
||||
//
|
||||
// Copyright 2022 Vulcanize, Inc.
|
||||
// Copyright 2023 Vulcanize, Inc.
|
||||
//
|
||||
|
||||
import { createLibp2p, Libp2p } from '@cerc-io/libp2p';
|
||||
// For nodejs.
|
||||
import wrtc from 'wrtc';
|
||||
import assert from 'assert';
|
||||
import { pipe } from 'it-pipe';
|
||||
import * as lp from 'it-length-prefixed';
|
||||
import map from 'it-map';
|
||||
import { pushable, Pushable } from 'it-pushable';
|
||||
import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string';
|
||||
import { toString as uint8ArrayToString } from 'uint8arrays/to-string';
|
||||
|
||||
import { webRTCDirect, WebRTCDirectComponents, P2P_WEBRTC_STAR_ID, WebRTCDirectNodeType, WebRTCDirectInit } from '@cerc-io/webrtc-direct';
|
||||
import { noise } from '@chainsafe/libp2p-noise';
|
||||
import { mplex } from '@libp2p/mplex';
|
||||
import type { Transport } from '@libp2p/interface-transport';
|
||||
import type { Stream as P2PStream, Connection } from '@libp2p/interface-connection';
|
||||
import type { PeerInfo } from '@libp2p/interface-peer-info';
|
||||
import type { Message } from '@libp2p/interface-pubsub';
|
||||
import type { PeerId } from '@libp2p/interface-peer-id';
|
||||
import { multiaddr, Multiaddr } from '@multiformats/multiaddr';
|
||||
import { floodsub } from '@libp2p/floodsub';
|
||||
import { pubsubPeerDiscovery } from '@libp2p/pubsub-peer-discovery';
|
||||
|
||||
import { MAX_CONCURRENT_DIALS_PER_PEER, MAX_CONNECTIONS, MIN_CONNECTIONS, PUBSUB_DISCOVERY_INTERVAL, PUBSUB_SIGNATURE_POLICY, RELAY_TAG, RELAY_REDIAL_DELAY, PING_TIMEOUT } from './constants.js';
|
||||
import { PeerHearbeatChecker } from './peer-heartbeat-checker.js';
|
||||
|
||||
export const CHAT_PROTOCOL = '/chat/1.0.0';
|
||||
|
||||
export const ERR_PROTOCOL_SELECTION = 'protocol selection failed';
|
||||
|
||||
export class Peer {
|
||||
_node?: Libp2p
|
||||
_peerHeartbeatChecker?: PeerHearbeatChecker
|
||||
_wrtcTransport: (components: WebRTCDirectComponents) => Transport
|
||||
_relayNodeMultiaddr: Multiaddr
|
||||
|
||||
_peerStreamMap: Map<string, Pushable<any>> = new Map()
|
||||
_messageHandlers: Array<(peerId: PeerId, message: any) => void> = []
|
||||
_topicHandlers: Map<string, Array<(peerId: PeerId, data: any) => void>> = new Map()
|
||||
|
||||
constructor (relayNodeURL: string, nodejs?: boolean) {
|
||||
this._relayNodeMultiaddr = multiaddr(relayNodeURL);
|
||||
|
||||
const relayPeerId = this._relayNodeMultiaddr.getPeerId();
|
||||
assert(relayPeerId);
|
||||
|
||||
const initOptions: WebRTCDirectInit = {
|
||||
wrtc: nodejs ? wrtc : undefined, // Instantiation in nodejs
|
||||
enableSignalling: true,
|
||||
nodeType: WebRTCDirectNodeType.Peer,
|
||||
relayPeerId
|
||||
};
|
||||
this._wrtcTransport = webRTCDirect(initOptions);
|
||||
}
|
||||
|
||||
get peerId (): PeerId | undefined {
|
||||
return this._node?.peerId;
|
||||
}
|
||||
|
||||
get node (): Libp2p | undefined {
|
||||
return this._node;
|
||||
}
|
||||
|
||||
async init (): Promise<void> {
|
||||
try {
|
||||
this._node = await createLibp2p({
|
||||
addresses: {
|
||||
// Use existing protocol id in multiaddr to listen through signalling channel to relay node
|
||||
// Allows direct webrtc connection to a peer if possible (eg. peers on a same network)
|
||||
listen: [`${this._relayNodeMultiaddr.toString()}/${P2P_WEBRTC_STAR_ID}`]
|
||||
},
|
||||
transports: [this._wrtcTransport],
|
||||
connectionEncryption: [noise()],
|
||||
streamMuxers: [mplex()],
|
||||
pubsub: floodsub({ globalSignaturePolicy: PUBSUB_SIGNATURE_POLICY }),
|
||||
peerDiscovery: [
|
||||
// Use pubsub based discovery; relay server acts as a peer discovery source
|
||||
pubsubPeerDiscovery({
|
||||
interval: PUBSUB_DISCOVERY_INTERVAL
|
||||
})
|
||||
],
|
||||
relay: {
|
||||
enabled: true,
|
||||
autoRelay: {
|
||||
enabled: true,
|
||||
maxListeners: 2
|
||||
}
|
||||
},
|
||||
connectionManager: {
|
||||
maxDialsPerPeer: MAX_CONCURRENT_DIALS_PER_PEER,
|
||||
autoDial: false,
|
||||
maxConnections: MAX_CONNECTIONS,
|
||||
minConnections: MIN_CONNECTIONS,
|
||||
keepMultipleConnections: true // Set true to get connections with multiple multiaddr
|
||||
},
|
||||
ping: {
|
||||
timeout: PING_TIMEOUT
|
||||
}
|
||||
});
|
||||
} catch (err: any) {
|
||||
console.log('Could not initialize a libp2p node', err);
|
||||
return;
|
||||
}
|
||||
|
||||
console.log('libp2p node created', this._node);
|
||||
this._peerHeartbeatChecker = new PeerHearbeatChecker(this._node);
|
||||
|
||||
// Dial to the HOP enabled relay node
|
||||
await this._dialRelay();
|
||||
|
||||
// Listen for change in stored multiaddrs
|
||||
this._node.peerStore.addEventListener('change:multiaddrs', (evt) => {
|
||||
assert(this._node);
|
||||
const { peerId, multiaddrs } = evt.detail;
|
||||
|
||||
// Log updated self multiaddrs
|
||||
if (peerId.equals(this._node.peerId)) {
|
||||
console.log('Updated self multiaddrs', this._node.getMultiaddrs().map(addr => addr.toString()));
|
||||
} else {
|
||||
console.log('Updated peer node multiaddrs', multiaddrs.map((addr: Multiaddr) => addr.toString()));
|
||||
}
|
||||
});
|
||||
|
||||
// Listen for change in peer protocols
|
||||
this._node.peerStore.addEventListener('change:protocols', async (evt) => {
|
||||
assert(this._node);
|
||||
console.log('event change:protocols', evt);
|
||||
await this._handleChangeProtocols(evt.detail);
|
||||
});
|
||||
|
||||
// Listen for peers discovery
|
||||
this._node.addEventListener('peer:discovery', (evt) => {
|
||||
// console.log('event peer:discovery', evt);
|
||||
this._handleDiscovery(evt.detail);
|
||||
});
|
||||
|
||||
// Listen for peers connection
|
||||
this._node.addEventListener('peer:connect', async (evt) => {
|
||||
console.log('event peer:connect', evt);
|
||||
await this._handleConnect(evt.detail);
|
||||
});
|
||||
|
||||
// Listen for peers disconnecting
|
||||
this._node.addEventListener('peer:disconnect', (evt) => {
|
||||
console.log('event peer:disconnect', evt);
|
||||
this._handleDisconnect(evt.detail);
|
||||
});
|
||||
|
||||
// Handle messages for the protocol
|
||||
await this._node.handle(CHAT_PROTOCOL, async ({ stream, connection }) => {
|
||||
this._handleStream(connection.remotePeer, stream);
|
||||
});
|
||||
|
||||
// Listen for pubsub messages
|
||||
this._node.pubsub.addEventListener('message', (evt) => {
|
||||
this._handlePubSubMessage(evt.detail);
|
||||
});
|
||||
}
|
||||
|
||||
async close (): Promise<void> {
|
||||
assert(this._node);
|
||||
|
||||
this._node.peerStore.removeEventListener('change:multiaddrs');
|
||||
this._node.removeEventListener('peer:discovery');
|
||||
this._node.removeEventListener('peer:connect');
|
||||
this._node.removeEventListener('peer:disconnect');
|
||||
this._node.peerStore.removeEventListener('change:multiaddrs');
|
||||
this._node.peerStore.removeEventListener('change:protocols');
|
||||
this._node.pubsub.removeEventListener('message');
|
||||
|
||||
await this._node.unhandle(CHAT_PROTOCOL);
|
||||
const remotePeerIds = this._node.getPeers();
|
||||
remotePeerIds.forEach(remotePeerId => this._peerHeartbeatChecker?.stop(remotePeerId));
|
||||
const hangUpPromises = remotePeerIds.map(async peerId => this._node?.hangUp(peerId));
|
||||
await Promise.all(hangUpPromises);
|
||||
}
|
||||
|
||||
broadcastMessage (message: any): void {
|
||||
for (const [, stream] of this._peerStreamMap) {
|
||||
stream.push(message);
|
||||
}
|
||||
}
|
||||
|
||||
async floodMessage (topic: string, msg: any): Promise<void> {
|
||||
assert(this._node);
|
||||
await this._node.pubsub.publish(topic, uint8ArrayFromString(JSON.stringify(msg)));
|
||||
}
|
||||
|
||||
subscribeMessage (handler: (peerId: PeerId, message: any) => void) : () => void {
|
||||
this._messageHandlers.push(handler);
|
||||
|
||||
const unsubscribe = () => {
|
||||
this._messageHandlers = this._messageHandlers
|
||||
.filter(registeredHandler => registeredHandler !== handler);
|
||||
};
|
||||
|
||||
return unsubscribe;
|
||||
}
|
||||
|
||||
subscribeTopic (topic: string, handler: (peerId: PeerId, data: any) => void): () => void {
|
||||
assert(this._node);
|
||||
|
||||
// Subscribe node to the topic
|
||||
this._node.pubsub.subscribe(topic);
|
||||
|
||||
// Register provided handler for the topic
|
||||
if (!this._topicHandlers.has(topic)) {
|
||||
this._topicHandlers.set(topic, [handler]);
|
||||
} else {
|
||||
this._topicHandlers.get(topic)?.push(handler);
|
||||
}
|
||||
|
||||
// Create a unsubscribe callback
|
||||
const unsubscribe = () => {
|
||||
// Remove handler from registered handlers for the topic
|
||||
const filteredTopicHandlers = this._topicHandlers.get(topic)
|
||||
?.filter(registeredHandler => registeredHandler !== handler);
|
||||
|
||||
if (filteredTopicHandlers?.length) {
|
||||
this._topicHandlers.set(topic, filteredTopicHandlers);
|
||||
} else {
|
||||
// Remove topic from map and unsubscribe node from the topic if no handlers left
|
||||
this._topicHandlers.delete(topic);
|
||||
this._node?.pubsub.unsubscribe(topic);
|
||||
}
|
||||
};
|
||||
|
||||
return unsubscribe;
|
||||
}
|
||||
|
||||
async _handleChangeProtocols ({ peerId, protocols }: { peerId: PeerId, protocols: string[] }) {
|
||||
assert(this._node);
|
||||
|
||||
// Ignore self protocol changes
|
||||
if (peerId.equals(this._node.peerId)) {
|
||||
return;
|
||||
}
|
||||
|
||||
// Ignore if chat protocol is not handled by remote peer
|
||||
if (!protocols.includes(CHAT_PROTOCOL)) {
|
||||
return;
|
||||
}
|
||||
|
||||
// Handle protocol and open stream from only one side
|
||||
if (this._node.peerId.toString() > peerId.toString()) {
|
||||
return;
|
||||
}
|
||||
|
||||
const [connection] = this._node.getConnections(peerId);
|
||||
|
||||
// Open stream if connection exists and it doesn't already have a stream with chat protocol
|
||||
if (connection && !connection.streams.some(stream => stream.stat.protocol === CHAT_PROTOCOL)) {
|
||||
await this._createProtocolStream(connection, CHAT_PROTOCOL);
|
||||
}
|
||||
}
|
||||
|
||||
async _dialRelay (): Promise<void> {
|
||||
assert(this._node);
|
||||
const relayMultiaddr = this._relayNodeMultiaddr;
|
||||
|
||||
// Keep dialling relay node until it connects
|
||||
while (true) {
|
||||
try {
|
||||
console.log(`Dialling relay node ${relayMultiaddr.getPeerId()} using multiaddr ${relayMultiaddr.toString()}`);
|
||||
const connection = await this._node.dial(relayMultiaddr);
|
||||
const relayPeerId = connection.remotePeer;
|
||||
|
||||
// TODO: Check if tag already exists. When checking tags issue with relay node connect event
|
||||
// Tag the relay node with a high value to prioritize it's connection
|
||||
// in connection pruning on crossing peer's maxConnections limit
|
||||
this._node.peerStore.tagPeer(relayPeerId, RELAY_TAG.tag, { value: RELAY_TAG.value });
|
||||
|
||||
break;
|
||||
} catch (err) {
|
||||
console.log(`Could not dial relay ${relayMultiaddr.toString()}`, err);
|
||||
|
||||
// TODO: Use wait method from util package.
|
||||
// Issue using util package in react app.
|
||||
await new Promise(resolve => setTimeout(resolve, RELAY_REDIAL_DELAY));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
_handleDiscovery (peer: PeerInfo): void {
|
||||
// Check connected peers as they are discovered repeatedly.
|
||||
if (!this._node?.getPeers().some(remotePeerId => remotePeerId.toString() === peer.id.toString())) {
|
||||
console.log(`Discovered peer ${peer.id.toString()} with multiaddrs`, peer.multiaddrs.map(addr => addr.toString()));
|
||||
this._connectPeer(peer);
|
||||
}
|
||||
}
|
||||
|
||||
async _handleConnect (connection: Connection): Promise<void> {
|
||||
assert(this._node);
|
||||
const remotePeerId = connection.remotePeer;
|
||||
|
||||
// Log connected peer
|
||||
console.log(`Connected to ${remotePeerId.toString()} using multiaddr ${connection.remoteAddr.toString()}`);
|
||||
|
||||
// Manage connections and streams
|
||||
// Check if peer id is smaller to break symmetry
|
||||
if (this._node.peerId.toString() < remotePeerId.toString()) {
|
||||
const remoteConnections = this._node.getConnections(remotePeerId);
|
||||
|
||||
// Keep only one connection with a peer
|
||||
if (remoteConnections.length > 1) {
|
||||
// Close new connection if using relayed multiaddr
|
||||
if (connection.remoteAddr.protoNames().includes('p2p-circuit')) {
|
||||
console.log('Closing new relayed connection in favor of existing connection');
|
||||
await connection.close();
|
||||
console.log('Closed');
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
console.log('Closing exisiting connections in favor of new webrtc connection');
|
||||
// Close existing connections if new connection is not using relayed multiaddr (so it is a webrtc connection)
|
||||
const closeConnectionPromises = remoteConnections.filter(remoteConnection => remoteConnection.id !== connection.id)
|
||||
.map(remoteConnection => remoteConnection.close());
|
||||
|
||||
await Promise.all(closeConnectionPromises);
|
||||
console.log('Closed');
|
||||
}
|
||||
|
||||
// Open stream in new connection for chat protocol (if handled by remote peer)
|
||||
const protocols = await this._node.peerStore.protoBook.get(remotePeerId);
|
||||
|
||||
// The chat protocol may not be updated in the list and will be handled later on change:protocols event
|
||||
if (protocols.includes(CHAT_PROTOCOL)) {
|
||||
await this._createProtocolStream(connection, CHAT_PROTOCOL);
|
||||
}
|
||||
}
|
||||
|
||||
console.log(`Current number of peers connected: ${this._node.getPeers().length}`);
|
||||
|
||||
// Start heartbeat check for peer
|
||||
await this._peerHeartbeatChecker?.start(
|
||||
remotePeerId,
|
||||
async () => this._handleDeadConnections(remotePeerId)
|
||||
);
|
||||
}
|
||||
|
||||
async _createProtocolStream (connection: Connection, protocol: string) {
|
||||
assert(this._node);
|
||||
const remotePeerId = connection.remotePeer;
|
||||
|
||||
try {
|
||||
const stream = await connection.newStream([protocol]);
|
||||
this._handleStream(remotePeerId, stream);
|
||||
} catch (err: any) {
|
||||
console.log(`Could not create a new ${protocol} stream with ${remotePeerId.toString()}`, err);
|
||||
}
|
||||
}
|
||||
|
||||
async _handleDeadConnections (remotePeerId: PeerId) {
|
||||
// Close existing connections of remote peer
|
||||
console.log(`Closing connections for ${remotePeerId}`);
|
||||
await this._node?.hangUp(remotePeerId);
|
||||
console.log('Closed');
|
||||
}
|
||||
|
||||
async _handleDisconnect (connection: Connection): Promise<void> {
|
||||
assert(this._node);
|
||||
const disconnectedPeerId = connection.remotePeer;
|
||||
|
||||
// Log disconnected peer
|
||||
console.log(`Disconnected from ${disconnectedPeerId.toString()} using multiaddr ${connection.remoteAddr.toString()}`);
|
||||
console.log(`Current number of peers connected: ${this._node?.getPeers().length}`);
|
||||
|
||||
const peerConnections = this._node.getConnections(disconnectedPeerId);
|
||||
|
||||
// If no connections left to the peer
|
||||
if (!peerConnections.length) {
|
||||
// Stop the heartbeat check
|
||||
this._peerHeartbeatChecker?.stop(disconnectedPeerId);
|
||||
|
||||
if (disconnectedPeerId.toString() === this._relayNodeMultiaddr?.getPeerId()) {
|
||||
// Reconnect to relay node if disconnected
|
||||
await this._dialRelay();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async _connectPeer (peer: PeerInfo): Promise<void> {
|
||||
assert(this._node);
|
||||
|
||||
// Dial them when we discover them
|
||||
const peerIdString = peer.id.toString();
|
||||
|
||||
try {
|
||||
console.log(`Dialling peer ${peerIdString}`);
|
||||
// When dialling with peer id, all multiaddr(s) (direct/relayed) of the discovered peer are dialled in parallel
|
||||
await this._node.dial(peer.id);
|
||||
} catch (err: any) {
|
||||
console.log(`Could not dial ${peerIdString}`, err);
|
||||
}
|
||||
}
|
||||
|
||||
_handleStream (peerId: PeerId, stream: P2PStream): void {
|
||||
// console.log('Stream after connection', stream);
|
||||
const messageStream = pushable<any>({ objectMode: true });
|
||||
|
||||
// Send message to pipe from stdin
|
||||
pipe(
|
||||
// Read from stream (the source)
|
||||
messageStream,
|
||||
// Turn objects into buffers
|
||||
(source) => map(source, (value) => {
|
||||
return uint8ArrayFromString(JSON.stringify(value));
|
||||
}),
|
||||
// Encode with length prefix (so receiving side knows how much data is coming)
|
||||
lp.encode(),
|
||||
// Write to the stream (the sink)
|
||||
stream.sink
|
||||
);
|
||||
|
||||
// Handle message from stream
|
||||
pipe(
|
||||
// Read from the stream (the source)
|
||||
stream.source,
|
||||
// Decode length-prefixed data
|
||||
lp.decode(),
|
||||
// Turn buffers into objects
|
||||
(source) => map(source, (buf) => {
|
||||
return JSON.parse(uint8ArrayToString(buf.subarray()));
|
||||
}),
|
||||
// Sink function
|
||||
async (source) => {
|
||||
// For each chunk of data
|
||||
for await (const msg of source) {
|
||||
this._messageHandlers.forEach(messageHandler => messageHandler(peerId, msg));
|
||||
}
|
||||
}
|
||||
);
|
||||
|
||||
// TODO: Check if stream already exists for peer id
|
||||
this._peerStreamMap.set(peerId.toString(), messageStream);
|
||||
}
|
||||
|
||||
_handlePubSubMessage (msg: Message): void {
|
||||
// Messages should be signed since globalSignaturePolicy is set to 'StrictSign'
|
||||
assert(msg.type === 'signed');
|
||||
|
||||
// Send msg data to registered topic handlers
|
||||
this._topicHandlers.get(msg.topic)?.forEach(handler => {
|
||||
const dataObj = JSON.parse(uint8ArrayToString(msg.data));
|
||||
handler(msg.from, dataObj);
|
||||
});
|
||||
}
|
||||
}
|
||||
export { Peer } from './peer.js';
|
||||
export { createRelayNode } from './relay.js';
|
||||
|
455
packages/peer/src/peer.ts
Normal file
455
packages/peer/src/peer.ts
Normal file
@ -0,0 +1,455 @@
|
||||
//
|
||||
// Copyright 2022 Vulcanize, Inc.
|
||||
//
|
||||
|
||||
import { createLibp2p, Libp2p } from '@cerc-io/libp2p';
|
||||
// For nodejs.
|
||||
import wrtc from 'wrtc';
|
||||
import assert from 'assert';
|
||||
import { pipe } from 'it-pipe';
|
||||
import * as lp from 'it-length-prefixed';
|
||||
import map from 'it-map';
|
||||
import { pushable, Pushable } from 'it-pushable';
|
||||
import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string';
|
||||
import { toString as uint8ArrayToString } from 'uint8arrays/to-string';
|
||||
|
||||
import { webRTCDirect, WebRTCDirectComponents, P2P_WEBRTC_STAR_ID, WebRTCDirectNodeType, WebRTCDirectInit } from '@cerc-io/webrtc-direct';
|
||||
import { noise } from '@chainsafe/libp2p-noise';
|
||||
import { mplex } from '@libp2p/mplex';
|
||||
import type { Transport } from '@libp2p/interface-transport';
|
||||
import type { Stream as P2PStream, Connection } from '@libp2p/interface-connection';
|
||||
import type { PeerInfo } from '@libp2p/interface-peer-info';
|
||||
import type { Message } from '@libp2p/interface-pubsub';
|
||||
import type { PeerId } from '@libp2p/interface-peer-id';
|
||||
import { multiaddr, Multiaddr } from '@multiformats/multiaddr';
|
||||
import { floodsub } from '@libp2p/floodsub';
|
||||
import { pubsubPeerDiscovery } from '@libp2p/pubsub-peer-discovery';
|
||||
|
||||
import { MAX_CONCURRENT_DIALS_PER_PEER, MAX_CONNECTIONS, MIN_CONNECTIONS, PUBSUB_DISCOVERY_INTERVAL, PUBSUB_SIGNATURE_POLICY, RELAY_TAG, RELAY_REDIAL_DELAY, PING_TIMEOUT } from './constants.js';
|
||||
import { PeerHearbeatChecker } from './peer-heartbeat-checker.js';
|
||||
|
||||
export const CHAT_PROTOCOL = '/chat/1.0.0';
|
||||
|
||||
export const ERR_PROTOCOL_SELECTION = 'protocol selection failed';
|
||||
|
||||
export class Peer {
|
||||
_node?: Libp2p
|
||||
_peerHeartbeatChecker?: PeerHearbeatChecker
|
||||
_wrtcTransport: (components: WebRTCDirectComponents) => Transport
|
||||
_relayNodeMultiaddr: Multiaddr
|
||||
|
||||
_peerStreamMap: Map<string, Pushable<any>> = new Map()
|
||||
_messageHandlers: Array<(peerId: PeerId, message: any) => void> = []
|
||||
_topicHandlers: Map<string, Array<(peerId: PeerId, data: any) => void>> = new Map()
|
||||
|
||||
constructor (relayNodeURL: string, nodejs?: boolean) {
|
||||
this._relayNodeMultiaddr = multiaddr(relayNodeURL);
|
||||
|
||||
const relayPeerId = this._relayNodeMultiaddr.getPeerId();
|
||||
assert(relayPeerId);
|
||||
|
||||
const initOptions: WebRTCDirectInit = {
|
||||
wrtc: nodejs ? wrtc : undefined, // Instantiation in nodejs
|
||||
enableSignalling: true,
|
||||
nodeType: WebRTCDirectNodeType.Peer,
|
||||
relayPeerId
|
||||
};
|
||||
this._wrtcTransport = webRTCDirect(initOptions);
|
||||
}
|
||||
|
||||
get peerId (): PeerId | undefined {
|
||||
return this._node?.peerId;
|
||||
}
|
||||
|
||||
get node (): Libp2p | undefined {
|
||||
return this._node;
|
||||
}
|
||||
|
||||
async init (): Promise<void> {
|
||||
try {
|
||||
this._node = await createLibp2p({
|
||||
addresses: {
|
||||
// Use existing protocol id in multiaddr to listen through signalling channel to relay node
|
||||
// Allows direct webrtc connection to a peer if possible (eg. peers on a same network)
|
||||
listen: [`${this._relayNodeMultiaddr.toString()}/${P2P_WEBRTC_STAR_ID}`]
|
||||
},
|
||||
transports: [this._wrtcTransport],
|
||||
connectionEncryption: [noise()],
|
||||
streamMuxers: [mplex()],
|
||||
pubsub: floodsub({ globalSignaturePolicy: PUBSUB_SIGNATURE_POLICY }),
|
||||
peerDiscovery: [
|
||||
// Use pubsub based discovery; relay server acts as a peer discovery source
|
||||
pubsubPeerDiscovery({
|
||||
interval: PUBSUB_DISCOVERY_INTERVAL
|
||||
})
|
||||
],
|
||||
relay: {
|
||||
enabled: true,
|
||||
autoRelay: {
|
||||
enabled: true,
|
||||
maxListeners: 2
|
||||
}
|
||||
},
|
||||
connectionManager: {
|
||||
maxDialsPerPeer: MAX_CONCURRENT_DIALS_PER_PEER,
|
||||
autoDial: false,
|
||||
maxConnections: MAX_CONNECTIONS,
|
||||
minConnections: MIN_CONNECTIONS,
|
||||
keepMultipleConnections: true // Set true to get connections with multiple multiaddr
|
||||
},
|
||||
ping: {
|
||||
timeout: PING_TIMEOUT
|
||||
}
|
||||
});
|
||||
} catch (err: any) {
|
||||
console.log('Could not initialize a libp2p node', err);
|
||||
return;
|
||||
}
|
||||
|
||||
console.log('libp2p node created', this._node);
|
||||
console.log('started', this._node.isStarted());
|
||||
console.log('peerid', this._node.peerId.toString());
|
||||
this._peerHeartbeatChecker = new PeerHearbeatChecker(this._node);
|
||||
|
||||
// Dial to the HOP enabled relay node
|
||||
await this._dialRelay();
|
||||
|
||||
// Listen for change in stored multiaddrs
|
||||
this._node.peerStore.addEventListener('change:multiaddrs', (evt) => {
|
||||
assert(this._node);
|
||||
const { peerId, multiaddrs } = evt.detail;
|
||||
|
||||
// Log updated self multiaddrs
|
||||
if (peerId.equals(this._node.peerId)) {
|
||||
console.log('Updated self multiaddrs', this._node.getMultiaddrs().map(addr => addr.toString()));
|
||||
} else {
|
||||
console.log('Updated peer node multiaddrs', multiaddrs.map((addr: Multiaddr) => addr.toString()));
|
||||
}
|
||||
});
|
||||
|
||||
// Listen for change in peer protocols
|
||||
this._node.peerStore.addEventListener('change:protocols', async (evt) => {
|
||||
assert(this._node);
|
||||
console.log('event change:protocols', evt);
|
||||
await this._handleChangeProtocols(evt.detail);
|
||||
});
|
||||
|
||||
// Listen for peers discovery
|
||||
this._node.addEventListener('peer:discovery', (evt) => {
|
||||
// console.log('event peer:discovery', evt);
|
||||
this._handleDiscovery(evt.detail);
|
||||
});
|
||||
|
||||
// Listen for peers connection
|
||||
this._node.addEventListener('peer:connect', async (evt) => {
|
||||
console.log('event peer:connect', evt);
|
||||
await this._handleConnect(evt.detail);
|
||||
});
|
||||
|
||||
// Listen for peers disconnecting
|
||||
this._node.addEventListener('peer:disconnect', (evt) => {
|
||||
console.log('event peer:disconnect', evt);
|
||||
this._handleDisconnect(evt.detail);
|
||||
});
|
||||
|
||||
// Handle messages for the protocol
|
||||
await this._node.handle(CHAT_PROTOCOL, async ({ stream, connection }) => {
|
||||
this._handleStream(connection.remotePeer, stream);
|
||||
});
|
||||
|
||||
// Listen for pubsub messages
|
||||
this._node.pubsub.addEventListener('message', (evt) => {
|
||||
this._handlePubSubMessage(evt.detail);
|
||||
});
|
||||
}
|
||||
|
||||
async close (): Promise<void> {
|
||||
assert(this._node);
|
||||
|
||||
this._node.peerStore.removeEventListener('change:multiaddrs');
|
||||
this._node.removeEventListener('peer:discovery');
|
||||
this._node.removeEventListener('peer:connect');
|
||||
this._node.removeEventListener('peer:disconnect');
|
||||
this._node.peerStore.removeEventListener('change:multiaddrs');
|
||||
this._node.peerStore.removeEventListener('change:protocols');
|
||||
this._node.pubsub.removeEventListener('message');
|
||||
|
||||
await this._node.unhandle(CHAT_PROTOCOL);
|
||||
const remotePeerIds = this._node.getPeers();
|
||||
remotePeerIds.forEach(remotePeerId => this._peerHeartbeatChecker?.stop(remotePeerId));
|
||||
const hangUpPromises = remotePeerIds.map(async peerId => this._node?.hangUp(peerId));
|
||||
await Promise.all(hangUpPromises);
|
||||
}
|
||||
|
||||
broadcastMessage (message: any): void {
|
||||
for (const [, stream] of this._peerStreamMap) {
|
||||
stream.push(message);
|
||||
}
|
||||
}
|
||||
|
||||
async floodMessage (topic: string, msg: any): Promise<void> {
|
||||
assert(this._node);
|
||||
await this._node.pubsub.publish(topic, uint8ArrayFromString(JSON.stringify(msg)));
|
||||
}
|
||||
|
||||
subscribeMessage (handler: (peerId: PeerId, message: any) => void) : () => void {
|
||||
this._messageHandlers.push(handler);
|
||||
|
||||
const unsubscribe = () => {
|
||||
this._messageHandlers = this._messageHandlers
|
||||
.filter(registeredHandler => registeredHandler !== handler);
|
||||
};
|
||||
|
||||
return unsubscribe;
|
||||
}
|
||||
|
||||
subscribeTopic (topic: string, handler: (peerId: PeerId, data: any) => void): () => void {
|
||||
assert(this._node);
|
||||
|
||||
// Subscribe node to the topic
|
||||
this._node.pubsub.subscribe(topic);
|
||||
|
||||
// Register provided handler for the topic
|
||||
if (!this._topicHandlers.has(topic)) {
|
||||
this._topicHandlers.set(topic, [handler]);
|
||||
} else {
|
||||
this._topicHandlers.get(topic)?.push(handler);
|
||||
}
|
||||
|
||||
// Create a unsubscribe callback
|
||||
const unsubscribe = () => {
|
||||
// Remove handler from registered handlers for the topic
|
||||
const filteredTopicHandlers = this._topicHandlers.get(topic)
|
||||
?.filter(registeredHandler => registeredHandler !== handler);
|
||||
|
||||
if (filteredTopicHandlers?.length) {
|
||||
this._topicHandlers.set(topic, filteredTopicHandlers);
|
||||
} else {
|
||||
// Remove topic from map and unsubscribe node from the topic if no handlers left
|
||||
this._topicHandlers.delete(topic);
|
||||
this._node?.pubsub.unsubscribe(topic);
|
||||
}
|
||||
};
|
||||
|
||||
return unsubscribe;
|
||||
}
|
||||
|
||||
async _handleChangeProtocols ({ peerId, protocols }: { peerId: PeerId, protocols: string[] }) {
|
||||
assert(this._node);
|
||||
|
||||
// Ignore self protocol changes
|
||||
if (peerId.equals(this._node.peerId)) {
|
||||
return;
|
||||
}
|
||||
|
||||
// Ignore if chat protocol is not handled by remote peer
|
||||
if (!protocols.includes(CHAT_PROTOCOL)) {
|
||||
return;
|
||||
}
|
||||
|
||||
// Handle protocol and open stream from only one side
|
||||
if (this._node.peerId.toString() > peerId.toString()) {
|
||||
return;
|
||||
}
|
||||
|
||||
const [connection] = this._node.getConnections(peerId);
|
||||
|
||||
// Open stream if connection exists and it doesn't already have a stream with chat protocol
|
||||
if (connection && !connection.streams.some(stream => stream.stat.protocol === CHAT_PROTOCOL)) {
|
||||
await this._createProtocolStream(connection, CHAT_PROTOCOL);
|
||||
}
|
||||
}
|
||||
|
||||
async _dialRelay (): Promise<void> {
|
||||
assert(this._node);
|
||||
const relayMultiaddr = this._relayNodeMultiaddr;
|
||||
|
||||
// Keep dialling relay node until it connects
|
||||
while (true) {
|
||||
try {
|
||||
console.log(`Dialling relay node ${relayMultiaddr.getPeerId()} using multiaddr ${relayMultiaddr.toString()}`);
|
||||
const connection = await this._node.dial(relayMultiaddr);
|
||||
const relayPeerId = connection.remotePeer;
|
||||
|
||||
// TODO: Check if tag already exists. When checking tags issue with relay node connect event
|
||||
// Tag the relay node with a high value to prioritize it's connection
|
||||
// in connection pruning on crossing peer's maxConnections limit
|
||||
this._node.peerStore.tagPeer(relayPeerId, RELAY_TAG.tag, { value: RELAY_TAG.value });
|
||||
|
||||
break;
|
||||
} catch (err) {
|
||||
console.log(`Could not dial relay ${relayMultiaddr.toString()}`, err);
|
||||
|
||||
// TODO: Use wait method from util package.
|
||||
// Issue using util package in react app.
|
||||
await new Promise(resolve => setTimeout(resolve, RELAY_REDIAL_DELAY));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
_handleDiscovery (peer: PeerInfo): void {
|
||||
// Check connected peers as they are discovered repeatedly.
|
||||
if (!this._node?.getPeers().some(remotePeerId => remotePeerId.toString() === peer.id.toString())) {
|
||||
console.log(`Discovered peer ${peer.id.toString()} with multiaddrs`, peer.multiaddrs.map(addr => addr.toString()));
|
||||
this._connectPeer(peer);
|
||||
}
|
||||
}
|
||||
|
||||
async _handleConnect (connection: Connection): Promise<void> {
|
||||
assert(this._node);
|
||||
const remotePeerId = connection.remotePeer;
|
||||
|
||||
// Log connected peer
|
||||
console.log(`Connected to ${remotePeerId.toString()} using multiaddr ${connection.remoteAddr.toString()}`);
|
||||
|
||||
// Manage connections and streams
|
||||
// Check if peer id is smaller to break symmetry
|
||||
if (this._node.peerId.toString() < remotePeerId.toString()) {
|
||||
const remoteConnections = this._node.getConnections(remotePeerId);
|
||||
|
||||
// Keep only one connection with a peer
|
||||
if (remoteConnections.length > 1) {
|
||||
// Close new connection if using relayed multiaddr
|
||||
if (connection.remoteAddr.protoNames().includes('p2p-circuit')) {
|
||||
console.log('Closing new relayed connection in favor of existing connection');
|
||||
await connection.close();
|
||||
console.log('Closed');
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
console.log('Closing exisiting connections in favor of new webrtc connection');
|
||||
// Close existing connections if new connection is not using relayed multiaddr (so it is a webrtc connection)
|
||||
const closeConnectionPromises = remoteConnections.filter(remoteConnection => remoteConnection.id !== connection.id)
|
||||
.map(remoteConnection => remoteConnection.close());
|
||||
|
||||
await Promise.all(closeConnectionPromises);
|
||||
console.log('Closed');
|
||||
}
|
||||
|
||||
// Open stream in new connection for chat protocol (if handled by remote peer)
|
||||
const protocols = await this._node.peerStore.protoBook.get(remotePeerId);
|
||||
|
||||
// The chat protocol may not be updated in the list and will be handled later on change:protocols event
|
||||
if (protocols.includes(CHAT_PROTOCOL)) {
|
||||
await this._createProtocolStream(connection, CHAT_PROTOCOL);
|
||||
}
|
||||
}
|
||||
|
||||
console.log(`Current number of peers connected: ${this._node.getPeers().length}`);
|
||||
|
||||
// Start heartbeat check for peer
|
||||
await this._peerHeartbeatChecker?.start(
|
||||
remotePeerId,
|
||||
async () => this._handleDeadConnections(remotePeerId)
|
||||
);
|
||||
}
|
||||
|
||||
async _createProtocolStream (connection: Connection, protocol: string) {
|
||||
assert(this._node);
|
||||
const remotePeerId = connection.remotePeer;
|
||||
|
||||
try {
|
||||
const stream = await connection.newStream([protocol]);
|
||||
this._handleStream(remotePeerId, stream);
|
||||
} catch (err: any) {
|
||||
console.log(`Could not create a new ${protocol} stream with ${remotePeerId.toString()}`, err);
|
||||
}
|
||||
}
|
||||
|
||||
async _handleDeadConnections (remotePeerId: PeerId) {
|
||||
// Close existing connections of remote peer
|
||||
console.log(`Closing connections for ${remotePeerId}`);
|
||||
await this._node?.hangUp(remotePeerId);
|
||||
console.log('Closed');
|
||||
}
|
||||
|
||||
async _handleDisconnect (connection: Connection): Promise<void> {
|
||||
assert(this._node);
|
||||
const disconnectedPeerId = connection.remotePeer;
|
||||
|
||||
// Log disconnected peer
|
||||
console.log(`Disconnected from ${disconnectedPeerId.toString()} using multiaddr ${connection.remoteAddr.toString()}`);
|
||||
console.log(`Current number of peers connected: ${this._node?.getPeers().length}`);
|
||||
|
||||
const peerConnections = this._node.getConnections(disconnectedPeerId);
|
||||
|
||||
// If no connections left to the peer
|
||||
if (!peerConnections.length) {
|
||||
// Stop the heartbeat check
|
||||
this._peerHeartbeatChecker?.stop(disconnectedPeerId);
|
||||
|
||||
if (disconnectedPeerId.toString() === this._relayNodeMultiaddr?.getPeerId()) {
|
||||
// Reconnect to relay node if disconnected
|
||||
await this._dialRelay();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async _connectPeer (peer: PeerInfo): Promise<void> {
|
||||
assert(this._node);
|
||||
|
||||
// Dial them when we discover them
|
||||
const peerIdString = peer.id.toString();
|
||||
|
||||
try {
|
||||
console.log(`Dialling peer ${peerIdString}`);
|
||||
// When dialling with peer id, all multiaddr(s) (direct/relayed) of the discovered peer are dialled in parallel
|
||||
await this._node.dial(peer.id);
|
||||
} catch (err: any) {
|
||||
console.log(`Could not dial ${peerIdString}`, err);
|
||||
}
|
||||
}
|
||||
|
||||
_handleStream (peerId: PeerId, stream: P2PStream): void {
|
||||
// console.log('Stream after connection', stream);
|
||||
const messageStream = pushable<any>({ objectMode: true });
|
||||
|
||||
// Send message to pipe from stdin
|
||||
pipe(
|
||||
// Read from stream (the source)
|
||||
messageStream,
|
||||
// Turn objects into buffers
|
||||
(source) => map(source, (value) => {
|
||||
return uint8ArrayFromString(JSON.stringify(value));
|
||||
}),
|
||||
// Encode with length prefix (so receiving side knows how much data is coming)
|
||||
lp.encode(),
|
||||
// Write to the stream (the sink)
|
||||
stream.sink
|
||||
);
|
||||
|
||||
// Handle message from stream
|
||||
pipe(
|
||||
// Read from the stream (the source)
|
||||
stream.source,
|
||||
// Decode length-prefixed data
|
||||
lp.decode(),
|
||||
// Turn buffers into objects
|
||||
(source) => map(source, (buf) => {
|
||||
return JSON.parse(uint8ArrayToString(buf.subarray()));
|
||||
}),
|
||||
// Sink function
|
||||
async (source) => {
|
||||
// For each chunk of data
|
||||
for await (const msg of source) {
|
||||
this._messageHandlers.forEach(messageHandler => messageHandler(peerId, msg));
|
||||
}
|
||||
}
|
||||
);
|
||||
|
||||
// TODO: Check if stream already exists for peer id
|
||||
this._peerStreamMap.set(peerId.toString(), messageStream);
|
||||
}
|
||||
|
||||
_handlePubSubMessage (msg: Message): void {
|
||||
// Messages should be signed since globalSignaturePolicy is set to 'StrictSign'
|
||||
assert(msg.type === 'signed');
|
||||
|
||||
// Send msg data to registered topic handlers
|
||||
this._topicHandlers.get(msg.topic)?.forEach(handler => {
|
||||
const dataObj = JSON.parse(uint8ArrayToString(msg.data));
|
||||
handler(msg.from, dataObj);
|
||||
});
|
||||
}
|
||||
}
|
@ -4,10 +4,6 @@
|
||||
|
||||
import { Libp2p, createLibp2p } from '@cerc-io/libp2p';
|
||||
import wrtc from 'wrtc';
|
||||
import { hideBin } from 'yargs/helpers';
|
||||
import yargs from 'yargs';
|
||||
import fs from 'fs';
|
||||
import path from 'path';
|
||||
import debug from 'debug';
|
||||
|
||||
import { noise } from '@chainsafe/libp2p-noise';
|
||||
@ -15,7 +11,6 @@ import { mplex } from '@libp2p/mplex';
|
||||
import { WebRTCDirectNodeType, webRTCDirect } from '@cerc-io/webrtc-direct';
|
||||
import { floodsub } from '@libp2p/floodsub';
|
||||
import { pubsubPeerDiscovery } from '@libp2p/pubsub-peer-discovery';
|
||||
import { createFromJSON } from '@libp2p/peer-id-factory';
|
||||
import type { Connection } from '@libp2p/interface-connection';
|
||||
import { multiaddr } from '@multiformats/multiaddr';
|
||||
import type { PeerId } from '@libp2p/interface-peer-id';
|
||||
@ -25,31 +20,10 @@ import { PeerHearbeatChecker } from './peer-heartbeat-checker.js';
|
||||
|
||||
const log = debug('laconic:relay');
|
||||
|
||||
const DEFAULT_HOST = '0.0.0.0';
|
||||
const DEFAULT_PORT = 9090;
|
||||
export const DEFAULT_HOST = '0.0.0.0';
|
||||
export const DEFAULT_PORT = 9090;
|
||||
|
||||
interface Arguments {
|
||||
port: number;
|
||||
peerIdFile: string;
|
||||
relayPeers: string;
|
||||
}
|
||||
|
||||
async function main (): Promise<void> {
|
||||
const argv: Arguments = _getArgv();
|
||||
|
||||
let peerId: any;
|
||||
if (argv.peerIdFile) {
|
||||
const peerIdFilePath = path.resolve(argv.peerIdFile);
|
||||
console.log(`Reading peer id from file ${peerIdFilePath}`);
|
||||
|
||||
const peerIdObj = fs.readFileSync(peerIdFilePath, 'utf-8');
|
||||
const peerIdJson = JSON.parse(peerIdObj);
|
||||
peerId = await createFromJSON(peerIdJson);
|
||||
} else {
|
||||
console.log('Creating a new peer id');
|
||||
}
|
||||
|
||||
const listenPort = argv.port ? argv.port : DEFAULT_PORT;
|
||||
export async function createRelayNode (listenPort: number, relayPeersList: string[], peerId?: PeerId): Promise<Libp2p> {
|
||||
const listenMultiaddr = `/ip4/${DEFAULT_HOST}/tcp/${listenPort}/http/p2p-webrtc-direct`;
|
||||
|
||||
const node = await createLibp2p({
|
||||
@ -115,39 +89,12 @@ async function main (): Promise<void> {
|
||||
log(`Disconnected from ${connection.remotePeer.toString()} using multiaddr ${connection.remoteAddr.toString()}`);
|
||||
});
|
||||
|
||||
if (argv.relayPeers) {
|
||||
const relayPeersFilePath = path.resolve(argv.relayPeers);
|
||||
|
||||
if (!fs.existsSync(relayPeersFilePath)) {
|
||||
console.log(`File at given path ${relayPeersFilePath} not found, exiting`);
|
||||
process.exit();
|
||||
}
|
||||
|
||||
console.log(`Reading relay peer multiaddr(s) from file ${relayPeersFilePath}`);
|
||||
const relayPeersListObj = fs.readFileSync(relayPeersFilePath, 'utf-8');
|
||||
const relayPeersList: string[] = JSON.parse(relayPeersListObj);
|
||||
|
||||
if (relayPeersList.length) {
|
||||
console.log('Dialling relay peers');
|
||||
await _dialRelayPeers(node, relayPeersList);
|
||||
}
|
||||
}
|
||||
|
||||
function _getArgv (): any {
|
||||
return yargs(hideBin(process.argv)).parserConfiguration({
|
||||
'parse-numbers': false
|
||||
}).options({
|
||||
port: {
|
||||
type: 'number',
|
||||
describe: 'Port to start listening on'
|
||||
},
|
||||
peerIdFile: {
|
||||
type: 'string',
|
||||
describe: 'Relay Peer Id file path (json)'
|
||||
},
|
||||
relayPeers: {
|
||||
type: 'string',
|
||||
describe: 'Relay peer multiaddr(s) list file path (json)'
|
||||
}
|
||||
}).argv;
|
||||
return node;
|
||||
}
|
||||
|
||||
async function _dialRelayPeers (node: Libp2p, relayPeersList: string[]): Promise<void> {
|
||||
@ -170,7 +117,3 @@ async function _handleDeadConnections (node: Libp2p, remotePeerId: PeerId): Prom
|
||||
await node.hangUp(remotePeerId);
|
||||
log('Closed');
|
||||
}
|
||||
|
||||
main().catch(err => {
|
||||
console.log(err);
|
||||
});
|
||||
|
40
packages/peer/src/sample.ts
Normal file
40
packages/peer/src/sample.ts
Normal file
@ -0,0 +1,40 @@
|
||||
// import * as mafmt from '@multiformats/mafmt';
|
||||
// import { multiaddr } from '@multiformats/multiaddr';
|
||||
|
||||
export interface ConnectRequest {
|
||||
type: 'ConnectRequest'
|
||||
src: string
|
||||
dst: string
|
||||
signal: string
|
||||
}
|
||||
|
||||
// ConnectResponse is made by a peer to another peer on a ConnectRequest to establish a direct webrtc connection
|
||||
export interface ConnectResponse {
|
||||
type: 'ConnectResponse'
|
||||
src: string
|
||||
dst: string
|
||||
signal: string
|
||||
}
|
||||
|
||||
async function main () {
|
||||
// const ma = multiaddr('/dns4/173-255-252-134.ip.linodeusercontent.com/tcp/443/https/p2p-webrtc-direct/p2p/12D3KooWENbU4KTaLgfdQVC5Ths6EewQJjYo4AjtPx2ykRrooT51');
|
||||
// console.log(ma.protoNames());
|
||||
// const x = undefined;
|
||||
// console.log(ma.getPeerId() === x);
|
||||
// console.log(ma.decapsulateCode(421));
|
||||
// console.log(ma.decapsulateCode(421).getPeerId());
|
||||
// console.log(mafmt.WebRTCDirect.matches(ma.decapsulateCode(421)));
|
||||
|
||||
const obj = {
|
||||
type: 'JoinRequest',
|
||||
peerId: 'peer-id'
|
||||
};
|
||||
const x: string = JSON.stringify(obj);
|
||||
const msg = JSON.parse(x) as ConnectRequest | ConnectResponse;
|
||||
console.log(typeof msg);
|
||||
console.log(msg);
|
||||
}
|
||||
|
||||
main().catch(err => {
|
||||
console.log(err);
|
||||
});
|
42
packages/peer/src/setups.test.ts
Normal file
42
packages/peer/src/setups.test.ts
Normal file
@ -0,0 +1,42 @@
|
||||
import assert from 'assert';
|
||||
import { isBrowser } from 'wherearewe';
|
||||
import { createFromJSON } from '@libp2p/peer-id-factory';
|
||||
import { multiaddr } from '@multiformats/multiaddr';
|
||||
|
||||
import { Peer } from './peer.js';
|
||||
import { RELAY_LISTEN_ADDR } from '../test/constants.js';
|
||||
|
||||
import relayPeerIdJson from '../test/relay-peer-id.json' assert { type: 'json' };
|
||||
|
||||
describe('simple network setup', async () => {
|
||||
const RELAY_PEER_ID = await createFromJSON(relayPeerIdJson);
|
||||
const RELAY_MULTIADDR = multiaddr(`${RELAY_LISTEN_ADDR.toString()}/p2p/${RELAY_PEER_ID.toString()}`);
|
||||
|
||||
// before(async () => {
|
||||
// try {
|
||||
// await createRelayNode(RELAY_PORT, [], RELAY_PEER_ID);
|
||||
// } catch (error) {
|
||||
// console.log('error', error);
|
||||
// }
|
||||
// });
|
||||
|
||||
it('peer connects to the relay node', async () => {
|
||||
if (isBrowser) {
|
||||
console.log('browser env');
|
||||
} else {
|
||||
console.log('not a browser env');
|
||||
}
|
||||
|
||||
const peer = new Peer(RELAY_MULTIADDR.toString());
|
||||
await peer.init();
|
||||
|
||||
const node = peer.node;
|
||||
assert(node);
|
||||
|
||||
node?.addEventListener('peer:connect', async (evt) => {
|
||||
console.log('event peer:connect', evt);
|
||||
const connection = evt.detail;
|
||||
console.log('connected to node', connection.remotePeer.toString());
|
||||
});
|
||||
});
|
||||
});
|
9
packages/peer/test/constants.ts
Normal file
9
packages/peer/test/constants.ts
Normal file
@ -0,0 +1,9 @@
|
||||
import { multiaddr } from '@multiformats/multiaddr';
|
||||
// import { createFromJSON } from '@libp2p/peer-id-factory';
|
||||
|
||||
// import relayPeerIdJson from './relay-peer-id.json' assert { type: 'json' };
|
||||
|
||||
export const RELAY_PORT = 12345;
|
||||
// export const RELAY_PEER_ID = await createFromJSON(relayPeerIdJson);
|
||||
export const RELAY_LISTEN_ADDR = multiaddr(`/ip4/0.0.0.0/tcp/${RELAY_PORT}/http/p2p-webrtc-direct`);
|
||||
// export const RELAY_MULTIADDR = multiaddr(`${RELAY_LISTEN_ADDR.toString()}/p2p/${RELAY_PEER_ID.toString()}`);
|
5
packages/peer/test/relay-peer-id.json
Normal file
5
packages/peer/test/relay-peer-id.json
Normal file
@ -0,0 +1,5 @@
|
||||
{
|
||||
"id": "12D3KooWRxmi5GXThHcLzadFGS7KWwMmYMsVpMjZpbgV6QQ1Cd68",
|
||||
"privKey": "CAESQDCAhwGVSQMYLysaTO+XAg31aig68n5A8aNdvhehjhCL7+JBFphTnaTND+6XSlP621nktg/i43ajZi9T23vmQZE=",
|
||||
"pubKey": "CAESIO/iQRaYU52kzQ/ul0pT+ttZ5LYP4uN2o2YvU9t75kGR"
|
||||
}
|
@ -4,8 +4,8 @@
|
||||
|
||||
/* Basic Options */
|
||||
// "incremental": true, /* Enable incremental compilation */
|
||||
"target": "es5", /* Specify ECMAScript target version: 'ES3' (default), 'ES5', 'ES2015', 'ES2016', 'ES2017', 'ES2018', 'ES2019', 'ES2020', 'ES2021', or 'ESNEXT'. */
|
||||
"module": "node16", /* Specify module code generation: 'none', 'commonjs', 'amd', 'system', 'umd', 'es2015', 'es2020', or 'ESNext'. */
|
||||
"target": "ES2020", /* Specify ECMAScript target version: 'ES3' (default), 'ES5', 'ES2015', 'ES2016', 'ES2017', 'ES2018', 'ES2019', 'ES2020', 'ES2021', or 'ESNEXT'. */
|
||||
"module": "ESNext", /* Specify module code generation: 'none', 'commonjs', 'amd', 'system', 'umd', 'es2015', 'es2020', or 'ESNext'. */
|
||||
"lib": [ "ES5", "ES6", "ES2020" ], /* Specify library files to be included in the compilation. */
|
||||
// "allowJs": true, /* Allow javascript files to be compiled. */
|
||||
// "checkJs": true, /* Report errors in .js files. */
|
||||
|
Loading…
Reference in New Issue
Block a user