Federated relay nodes and limiting connections (#296)

* Pass relay node an optional list of relay peers to connect to

* Catch protocol selection failure on dialling to new relay nodes

* Restrict max number of connections for a peer

* Tag the relay node with a high value to avoid disconnects

* Use debug for connect/disconnect logs in relay nodes

* Ignore incomplete multiaddr on a peer discovery

* Increase max connections for a peer to 10

* Refactor and descriptive comments
This commit is contained in:
prathamesh0 2023-01-19 11:35:09 +05:30 committed by GitHub
parent 1e5485c6ef
commit 9d38306fe9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 118 additions and 25 deletions

View File

@ -37,11 +37,12 @@ A basic CLI to pass messages between peers using `stdin`/`stdout`
```bash
# In packages/peer
yarn relay-node --signal-server [SIGNAL_SERVER_URL] --peer-id-file [PEER_ID_FILE_PATH]
yarn relay-node --signal-server [SIGNAL_SERVER_URL] --peer-id-file [PEER_ID_FILE_PATH] --relay-peers [RELAY_PEERS_FILE_PATH]
```
* `signal-server`: multiaddr of a signalling server (default: local signalling server multiaddr)
* `peer-id-file`: file path for peer id to be used (json)
* `relay-peers`: file path for relay peer multiaddr(s) to dial on startup (json)
* Start the node:
@ -51,6 +52,6 @@ A basic CLI to pass messages between peers using `stdin`/`stdout`
```
* `signal-server`: multiaddr of a signalling server (default: local signalling server multiaddr)
* `relay-node`: multiaddr of a hop enabled relay node
* `relay-node`: multiaddr of a primary hop enabled relay node
* The process starts reading from `stdin` and outputs messages from others peers to `stdout`.

View File

@ -39,13 +39,14 @@ This project was bootstrapped with [Create React App](https://github.com/faceboo
```bash
# In packages/peer
yarn relay-node --signal-server [SIGNAL_SERVER_URL] --peer-id-file [PEER_ID_FILE_PATH]
yarn relay-node --signal-server [SIGNAL_SERVER_URL] --peer-id-file [PEER_ID_FILE_PATH] --relay-peers [RELAY_PEERS_FILE_PATH]
```
* `signal-server`: multiaddr of a signalling server (default: local signalling server multiaddr)
* `peer-id-file`: file path for peer id to be used (json)
* `relay-peers`: file path for relay peer multiaddr(s) to dial on startup (json)
* Set the signalling server and relay node multiaddrs in the [env](./.env) file:
* Set the signalling server and primary relay node multiaddrs in the [env](./.env) file:
```
REACT_APP_SIGNAL_SERVER=/ip4/127.0.0.1/tcp/13579/ws/p2p-webrtc-star/

View File

@ -35,6 +35,7 @@
"@libp2p/pubsub-peer-discovery": "^7.0.1",
"@libp2p/webrtc-star": "^5.0.3",
"@multiformats/multiaddr": "^11.1.4",
"debug": "^4.3.1",
"it-length-prefixed": "^8.0.4",
"it-map": "^2.0.0",
"it-pipe": "^2.0.5",

View File

@ -2,7 +2,32 @@
// Copyright 2023 Vulcanize, Inc.
//
// How often a peer should broadcast it's peer data over pubsub discovery topic
// (interval at which other peers get corresponding discovery event)
export const PUBSUB_DISCOVERY_INTERVAL = 10000; // 10 seconds
// Use StrictSign signature policy to pass signed pubsub messages
// (includes source peer's id with a signature in the message)
export const PUBSUB_SIGNATURE_POLICY = 'StrictSign';
// Relayed connections between peers drop after hop timeout
// (redialled on discovery)
export const HOP_TIMEOUT = 24 * 60 * 60 * 1000; // 1 day
// Connected peers can be given tags according to their priority
// Create a high value tag for prioritizing primary relay node connection
export const RELAY_TAG = {
tag: 'laconic:relay-primary',
value: 100
};
// Peer connection manager config constants
// Number of max concurrent dials per peer
export const MAX_CONCURRENT_DIALS_PER_PEER = 3;
// Max number of connections for a peer after which it starts pruning connections
export const MAX_CONNECTIONS = 10;
// Min number of connections for a peer below which autodial triggers (if enabled)
export const MIN_CONNECTIONS = 0;

View File

@ -24,11 +24,13 @@ import { multiaddr, Multiaddr } from '@multiformats/multiaddr';
import { floodsub } from '@libp2p/floodsub';
import { pubsubPeerDiscovery } from '@libp2p/pubsub-peer-discovery';
import { PUBSUB_DISCOVERY_INTERVAL, PUBSUB_SIGNATURE_POLICY } from './constants.js';
import { MAX_CONCURRENT_DIALS_PER_PEER, MAX_CONNECTIONS, MIN_CONNECTIONS, PUBSUB_DISCOVERY_INTERVAL, PUBSUB_SIGNATURE_POLICY, RELAY_TAG } from './constants.js';
export const CHAT_PROTOCOL = '/chat/1.0.0';
export const DEFAULT_SIGNAL_SERVER_URL = '/ip4/127.0.0.1/tcp/13579/wss/p2p-webrtc-star';
export const ERR_PROTOCOL_SELECTION = 'protocol selection failed';
export class Peer {
_node?: Libp2p
_wrtcStar: WebRTCStarTuple
@ -97,8 +99,10 @@ export class Peer {
}
},
connectionManager: {
maxDialsPerPeer: 3, // Number of max concurrent dials per peer
autoDial: false
maxDialsPerPeer: MAX_CONCURRENT_DIALS_PER_PEER, // Number of max concurrent dials per peer
autoDial: false,
maxConnections: MAX_CONNECTIONS,
minConnections: MIN_CONNECTIONS
}
});
@ -110,6 +114,14 @@ export class Peer {
console.log(`Dialling relay node ${relayMultiaddr.getPeerId()} using multiaddr ${relayMultiaddr.toString()}`);
await this._node.dial(relayMultiaddr);
// Tag the relay node with a high value to prioritize it's connection
// in connection pruning on crossing peer's maxConnections limit
const relayPeerId = this._node.getPeers().find(
peerId => peerId.toString() === relayMultiaddr.getPeerId()
);
assert(relayPeerId);
this._node.peerStore.tagPeer(relayPeerId, RELAY_TAG.tag, { value: RELAY_TAG.value });
}
// Listen for change in stored multiaddrs
@ -249,32 +261,31 @@ export class Peer {
async _connectPeer (peer: PeerInfo): Promise<void> {
assert(this._node);
// Check if discovered the relay node
if (this._relayNodeMultiaddr) {
const relayMultiaddr = this._relayNodeMultiaddr;
const relayNodePeerId = relayMultiaddr.getPeerId();
if (relayNodePeerId && relayNodePeerId === peer.id.toString()) {
console.log(`Dialling relay peer ${peer.id.toString()} using multiaddr ${relayMultiaddr.toString()}`);
await this._node.dial(relayMultiaddr).catch(err => {
console.log(`Could not dial relay ${relayMultiaddr.toString()}`, err);
});
return;
}
}
// Dial them when we discover them
// Attempt to dial all the multiaddrs of the discovered peer (to connect through relay)
for (const peerMultiaddr of peer.multiaddrs) {
// Relay nodes sometimes give an additional multiaddr of signalling server (without peer id) in discovery
// Eg. /ip4/127.0.0.1/tcp/13579/wss/p2p-webrtc-star
// Workaround to avoid dialling multiaddr(s) without peer id
if (!peerMultiaddr.toString().includes('p2p/')) {
continue;
}
try {
console.log(`Dialling peer ${peer.id.toString()} using multiaddr ${peerMultiaddr.toString()}`);
const stream = await this._node.dialProtocol(peerMultiaddr, CHAT_PROTOCOL);
this._handleStream(peer.id, stream);
break;
} catch (err) {
console.log(`Could not dial ${peerMultiaddr.toString()}`, err);
} catch (err: any) {
// Check if protocol negotiation failed (dial still succeeds)
// (happens in case of dialProtocol to relay nodes since they don't handle CHAT_PROTOCOL)
if ((err as Error).message === ERR_PROTOCOL_SELECTION) {
console.log(`Protocol selection failed with peer ${peerMultiaddr}`);
break;
} else {
console.log(`Could not dial ${peerMultiaddr.toString()}`, err);
}
}
}
}

View File

@ -2,12 +2,13 @@
// Copyright 2022 Vulcanize, Inc.
//
import { createLibp2p } from 'libp2p';
import { Libp2p, createLibp2p } from 'libp2p';
import wrtc from 'wrtc';
import { hideBin } from 'yargs/helpers';
import yargs from 'yargs';
import fs from 'fs';
import path from 'path';
import debug from 'debug';
import { noise } from '@chainsafe/libp2p-noise';
import { mplex } from '@libp2p/mplex';
@ -15,13 +16,18 @@ import { webRTCStar, WebRTCStarTuple } from '@libp2p/webrtc-star';
import { floodsub } from '@libp2p/floodsub';
import { pubsubPeerDiscovery } from '@libp2p/pubsub-peer-discovery';
import { createFromJSON } from '@libp2p/peer-id-factory';
import type { Connection } from '@libp2p/interface-connection';
import { DEFAULT_SIGNAL_SERVER_URL } from './index.js';
import { HOP_TIMEOUT, PUBSUB_DISCOVERY_INTERVAL, PUBSUB_SIGNATURE_POLICY } from './constants.js';
import { multiaddr } from '@multiformats/multiaddr';
const log = debug('laconic:relay');
interface Arguments {
signalServer: string;
peerIdFile: string;
relayPeers: string;
}
async function main (): Promise<void> {
@ -70,12 +76,47 @@ async function main (): Promise<void> {
advertise: {
enabled: true
}
},
connectionManager: {
autoDial: false
}
});
console.log(`Relay node started with id ${node.peerId.toString()}`);
console.log('Listening on:');
node.getMultiaddrs().forEach((ma) => console.log(ma.toString()));
console.log();
// Listen for peers connection
node.connectionManager.addEventListener('peer:connect', (evt) => {
// console.log('event peer:connect', evt);
// Log connected peer
const connection: Connection = evt.detail;
log(`Connected to ${connection.remotePeer.toString()} using multiaddr ${connection.remoteAddr.toString()}`);
});
// Listen for peers disconnecting
node.connectionManager.addEventListener('peer:disconnect', (evt) => {
// console.log('event peer:disconnect', evt);
// Log disconnected peer
const connection: Connection = evt.detail;
log(`Disconnected from ${connection.remotePeer.toString()} using multiaddr ${connection.remoteAddr.toString()}`);
});
if (argv.relayPeers) {
const relayPeersFilePath = path.resolve(argv.relayPeers);
if (!fs.existsSync(relayPeersFilePath)) {
console.log(`File at given path ${relayPeersFilePath} not found, exiting`);
process.exit();
}
console.log(`Reading relay peer multiaddr(s) from file ${relayPeersFilePath}`);
const relayPeersListObj = fs.readFileSync(relayPeersFilePath, 'utf-8');
const relayPeersList: string[] = JSON.parse(relayPeersListObj);
await _dialRelayPeers(node, relayPeersList);
}
}
function _getArgv (): any {
@ -89,10 +130,23 @@ function _getArgv (): any {
peerIdFile: {
type: 'string',
describe: 'Relay Peer Id file path (json)'
},
relayPeers: {
type: 'string',
describe: 'Relay peer multiaddr(s) list file path (json)'
}
}).argv;
}
async function _dialRelayPeers (node: Libp2p, relayPeersList: string[]): Promise<void> {
relayPeersList.forEach(async (relayPeer) => {
const relayMultiaddr = multiaddr(relayPeer);
console.log(`Dialling relay node ${relayMultiaddr.getPeerId()} using multiaddr ${relayMultiaddr.toString()}`);
await node.dial(relayMultiaddr);
});
}
main().catch(err => {
console.log(err);
});