Add a basic test to demonstrate a p2p connection (#336)

* Add a test to demonstrate a p2p connection

* Use debug for logs

* Perform peer intialization in a separate test
This commit is contained in:
prathamesh0 2023-03-09 17:36:27 +05:30 committed by GitHub
parent 1ba731915d
commit 787991c432
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 182 additions and 36 deletions

View File

@ -18,7 +18,7 @@
"scripts": {
"lint": "lerna run lint --stream -- --max-warnings=0",
"test:init": "lerna run test:init --stream --ignore @cerc-io/*-watcher",
"test": "lerna run test --stream --ignore @cerc-io/*-watcher",
"test": "lerna run test --stream --ignore @cerc-io/*-watcher --ignore @cerc-io/peer",
"build": "lerna run build --stream",
"build:watch": "lerna run build --stream --parallel -- -w",
"prepare": "husky install",

View File

@ -0,0 +1 @@
RELAY = <RELAY-MULTIADDR>

View File

@ -0,0 +1,4 @@
require:
- 'ts-node/register'
- 'dotenv/config'
timeout: '300000'

View File

@ -19,11 +19,13 @@
"bugs": "",
"keywords": [],
"scripts": {
"build": "tsc",
"build": "yarn clean && tsc",
"clean": "rm -rf ./dist",
"lint": "eslint .",
"dev": "node dist/index.js",
"create-peer": "node dist/cli/create-peer.js",
"relay-node": "DEBUG='laconic:*' node dist/cli/relay.js"
"relay-node": "DEBUG='laconic:*' node dist/cli/relay.js",
"test" : "mocha dist/peer.test.js"
},
"dependencies": {
"@cerc-io/libp2p": "0.42.2-laconic-0.1.1",
@ -36,11 +38,14 @@
"@libp2p/pubsub-peer-discovery": "^8.0.0",
"@multiformats/multiaddr": "^11.1.4",
"buffer": "^6.0.3",
"chai": "^4.3.4",
"debug": "^4.3.1",
"dotenv": "^16.0.3",
"it-length-prefixed": "^8.0.4",
"it-map": "^2.0.0",
"it-pipe": "^2.0.5",
"it-pushable": "^3.1.2",
"mocha": "^8.4.0",
"node-pre-gyp": "^0.13.0",
"p-event": "^5.0.1",
"uint8arrays": "^4.0.3",
@ -49,6 +54,8 @@
"yargs": "^17.0.1"
},
"devDependencies": {
"@types/chai": "^4.2.19",
"@types/mocha": "^8.2.2",
"@types/node": "16.11.7",
"@typescript-eslint/eslint-plugin": "^5.47.1",
"@typescript-eslint/parser": "^5.47.1",

View File

@ -0,0 +1,127 @@
/* eslint-disable no-unused-expressions */
import * as dotenv from 'dotenv';
import path from 'path';
import 'mocha';
import assert from 'assert';
import { expect } from 'chai';
import { pEvent } from 'p-event';
import debug from 'debug';
import { Connection } from '@libp2p/interface-connection';
import { multiaddr } from '@multiformats/multiaddr';
import { Peer } from './peer.js';
const log = debug('laconic:test');
const PEER_CONNECTION_TIMEOUT = 15 * 1000; // 15s
// Get relay node address from the .env file
dotenv.config({ path: path.resolve('./.env') });
describe('basic p2p tests', () => {
let peers: Peer[];
const relayMultiAddr = process.env.RELAY;
it('peers get initialized', async () => {
assert(relayMultiAddr, 'Relay multiaddr not provided');
peers = [
new Peer(relayMultiAddr, true),
new Peer(relayMultiAddr, true)
];
await Promise.all(peers.map(async (peer) => {
await peer.init({});
}));
});
it('peers get connected to the primary relay node', async () => {
await Promise.all(peers.map(async (peer) => {
assert(peer.node);
// Wait for a connection to be established
await pEvent(peer.node, 'peer:connect');
const connections = peer.node.getConnections();
assert(connections, 'No peer connections found');
const expectedPeerId = multiaddr(relayMultiAddr).getPeerId()?.toString();
const connectedPeerIds = connections?.map(connection => connection.remotePeer.toString());
expect(connectedPeerIds).to.include(expectedPeerId);
}));
});
it('peers discover and get connected to each other', async () => {
const connectionPromises = peers.map(async (peer, index) => {
assert(peer.node);
const otherPeersId = peers[1 - index].node?.peerId.toString();
return new Promise<void>((resolve, reject) => {
peer.node?.addEventListener('peer:connect', async (event) => {
const connection: Connection = event.detail;
// Resolve after getting connected to the other peer
if (connection.remotePeer.toString() === otherPeersId) {
resolve();
}
});
setTimeout(() => {
reject(new Error('Peer connection timed out'));
}, PEER_CONNECTION_TIMEOUT);
});
});
await Promise.all(connectionPromises);
});
it('peers are able to communicate over a topic', async () => {
const pubSubTopic = 'dummy-topic';
const msgFromPeer1 = 'Hello from peer1';
const msgFromPeer2 = 'Hello from peer2';
let messageReceivedByPeer1 = false;
let messageReceivedByPeer2 = false;
peers[0].subscribeTopic(pubSubTopic, (peerId, data) => {
if (data === msgFromPeer2) {
messageReceivedByPeer1 = true;
}
log(`${peerId.toString()} > ${data}`);
});
peers[1].subscribeTopic(pubSubTopic, (peerId, data) => {
if (data === msgFromPeer1) {
messageReceivedByPeer2 = true;
}
log(`${peerId.toString()} > ${data}`);
});
// Wait for the connection between peers to be stabilized
// Peers upgrade to a direct connection from a relayed one if possible
await sleep(3000);
peers[0].floodMessage(pubSubTopic, msgFromPeer1);
peers[1].floodMessage(pubSubTopic, msgFromPeer2);
await sleep(2000);
expect(messageReceivedByPeer1).to.be.true;
expect(messageReceivedByPeer2).to.be.true;
});
after('cleanup', async () => {
// Doing peer.close() runs into the following error:
// Pure virtual function called!
// Aborted (core dumped)
// So just exit the process to stop the peers
process.exit(0);
});
});
function sleep (ms : number) {
return new Promise(resolve => setTimeout(resolve, ms));
}

View File

@ -2,8 +2,6 @@
// Copyright 2023 Vulcanize, Inc.
//
import { createLibp2p, Libp2p } from '@cerc-io/libp2p';
// For nodejs.
import wrtc from 'wrtc';
import assert from 'assert';
import { Buffer } from 'buffer';
@ -13,7 +11,9 @@ import map from 'it-map';
import { pushable, Pushable } from 'it-pushable';
import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string';
import { toString as uint8ArrayToString } from 'uint8arrays/to-string';
import debug from 'debug';
import { createLibp2p, Libp2p } from '@cerc-io/libp2p';
import { webRTCDirect, WebRTCDirectComponents, P2P_WEBRTC_STAR_ID, WebRTCDirectNodeType, WebRTCDirectInit } from '@cerc-io/webrtc-direct';
import { noise } from '@chainsafe/libp2p-noise';
import { mplex } from '@libp2p/mplex';
@ -47,6 +47,8 @@ import { PeerHearbeatChecker } from './peer-heartbeat-checker.js';
import { debugInfoRequestHandler, dialWithRetry, getConnectionsInfo, getPseudonymForPeerId, getSelfInfo } from './utils/index.js';
import { ConnectionType, DebugPeerInfo, DebugRequest, PeerConnectionInfo, PeerSelfInfo } from './types/debug-info.js';
const log = debug('laconic:peer');
const ERR_PEER_ALREADY_TAGGED = 'Peer already tagged';
const ERR_DEBUG_INFO_NOT_ENABLED = 'Debug info not enabled';
@ -91,7 +93,7 @@ _peerStreamMap: Map<string, Pushable<any>> = new Map()
const relayPeerId = this._relayNodeMultiaddr.getPeerId();
assert(relayPeerId);
console.log(`Using peer ${relayPeerId.toString()} (${getPseudonymForPeerId(relayPeerId.toString())}) as the primary relay node`);
log(`Using peer ${relayPeerId.toString()} (${getPseudonymForPeerId(relayPeerId.toString())}) as the primary relay node`);
const initOptions: WebRTCDirectInit = {
wrtc: nodejs ? wrtc : undefined, // Instantiation in nodejs
@ -168,11 +170,11 @@ _peerStreamMap: Map<string, Pushable<any>> = new Map()
metrics: () => this._metrics
});
} catch (err: any) {
console.log('Could not initialize a libp2p node', err);
log('Could not initialize a libp2p node', err);
return;
}
console.log('libp2p node created', this._node);
log('libp2p node created', this._node);
this._peerHeartbeatChecker = new PeerHearbeatChecker(
this._node,
{
@ -191,28 +193,28 @@ _peerStreamMap: Map<string, Pushable<any>> = new Map()
// Log updated self multiaddrs
if (peerId.equals(this._node.peerId)) {
console.log('Updated self multiaddrs', this._node.getMultiaddrs().map(addr => addr.toString()));
log('Updated self multiaddrs', this._node.getMultiaddrs().map(addr => addr.toString()));
} else {
console.log('Updated peer node multiaddrs', multiaddrs.map((addr: Multiaddr) => addr.toString()));
log('Updated peer node multiaddrs', multiaddrs.map((addr: Multiaddr) => addr.toString()));
}
});
// Listen for change in peer protocols
this._node.peerStore.addEventListener('change:protocols', async (evt) => {
assert(this._node);
console.log('event change:protocols', evt);
log('event change:protocols', evt);
await this._handleChangeProtocols(evt.detail);
});
// Listen for peers discovery
this._node.addEventListener('peer:discovery', (evt) => {
// console.log('event peer:discovery', evt);
// log('event peer:discovery', evt);
this._handleDiscovery(evt.detail, this._maxRelayConnections);
});
// Listen for peers connection
this._node.addEventListener('peer:connect', async (evt) => {
console.log('event peer:connect', evt);
// log('event peer:connect', evt);
await this._handleConnect(evt.detail, this._maxRelayConnections);
});
@ -220,7 +222,7 @@ _peerStreamMap: Map<string, Pushable<any>> = new Map()
// peer:disconnect event is trigerred when all connections to a peer close
// https://github.com/libp2p/js-libp2p-interfaces/blob/master/packages/interface-libp2p/src/index.ts#L64
this._node.addEventListener('peer:disconnect', (evt) => {
console.log('event peer:disconnect', evt);
// log('event peer:disconnect', evt);
this._handleDisconnect(evt.detail);
});
@ -235,7 +237,7 @@ _peerStreamMap: Map<string, Pushable<any>> = new Map()
});
if (this._debugInfoEnabled) {
console.log('Debug info enabled');
log('Debug info enabled');
this._registerDebugInfoRequestHandler();
}
}
@ -423,7 +425,7 @@ _peerStreamMap: Map<string, Pushable<any>> = new Map()
async _dialRelay (redialInterval = RELAY_REDIAL_INTERVAL): Promise<void> {
assert(this._node);
const relayMultiaddr = this._relayNodeMultiaddr;
console.log('Dialling primary relay node');
log('Dialling primary relay node');
const connection = await dialWithRetry(
this._node,
@ -468,11 +470,11 @@ _peerStreamMap: Map<string, Pushable<any>> = new Map()
// Check relay connections limit if it's a relay peer
if (isRelayPeer && this._numRelayConnections >= maxRelayConnections) {
// console.log(`Ignoring discovered relay node ${peer.id.toString()} as max relay connections limit reached`);
// log(`Ignoring discovered relay node ${peer.id.toString()} as max relay connections limit reached`);
return;
}
console.log(`Discovered peer ${peer.id.toString()} (${getPseudonymForPeerId(peer.id.toString())}) with multiaddrs`, peer.multiaddrs.map(addr => addr.toString()));
log(`Discovered peer ${peer.id.toString()} (${getPseudonymForPeerId(peer.id.toString())}) with multiaddrs`, peer.multiaddrs.map(addr => addr.toString()));
this._connectPeer(peer);
}
@ -483,7 +485,7 @@ _peerStreamMap: Map<string, Pushable<any>> = new Map()
const remoteAddrString = connection.remoteAddr.toString();
// Log connected peer
console.log(`Connected to ${remotePeerIdString} (${getPseudonymForPeerId(remotePeerIdString)}) using multiaddr ${remoteAddrString}`);
log(`Connected to ${remotePeerIdString} (${getPseudonymForPeerId(remotePeerIdString)}) using multiaddr ${remoteAddrString}`);
const isRemoteARelayPeer = this.isRelayPeerMultiaddr(remoteAddrString);
@ -492,7 +494,7 @@ _peerStreamMap: Map<string, Pushable<any>> = new Map()
// Check if relay connections limit has already been reached
if (this._numRelayConnections > maxRelayConnections && !this.isPrimaryRelay(remoteAddrString)) {
console.log(`Closing connection to relay ${remotePeerIdString} (${getPseudonymForPeerId(remotePeerIdString)}) as max relay connections limit reached`);
log(`Closing connection to relay ${remotePeerIdString} (${getPseudonymForPeerId(remotePeerIdString)}) as max relay connections limit reached`);
await connection.close();
return;
}
@ -507,20 +509,20 @@ _peerStreamMap: Map<string, Pushable<any>> = new Map()
if (remoteConnections.length > 1) {
// Close new connection if using relayed multiaddr
if (connection.remoteAddr.protoNames().includes(P2P_CIRCUIT_ID)) {
console.log(`Closing new relayed connection with ${remotePeerIdString} (${getPseudonymForPeerId(remotePeerIdString)}) in favor of existing connection`);
log(`Closing new relayed connection with ${remotePeerIdString} (${getPseudonymForPeerId(remotePeerIdString)}) in favor of existing connection`);
await connection.close();
console.log('Closed');
log('Closed');
return;
}
console.log(`Closing exisiting connections with ${remotePeerIdString} (${getPseudonymForPeerId(remotePeerIdString)}) in favor of new webrtc connection`);
log(`Closing exisiting connections with ${remotePeerIdString} (${getPseudonymForPeerId(remotePeerIdString)}) in favor of new webrtc connection`);
// Close existing connections if new connection is not using relayed multiaddr (so it is a webrtc connection)
const closeConnectionPromises = remoteConnections.filter(remoteConnection => remoteConnection.id !== connection.id)
.map(remoteConnection => remoteConnection.close());
await Promise.all(closeConnectionPromises);
console.log('Closed');
log('Closed');
}
// Open stream in new connection for chat protocol (if handled by remote peer)
@ -532,7 +534,7 @@ _peerStreamMap: Map<string, Pushable<any>> = new Map()
}
}
console.log(`Current number of peers connected: ${this._node.getPeers().length}`);
log(`Current number of peers connected: ${this._node.getPeers().length}`);
// Start heartbeat check for peer
await this._peerHeartbeatChecker?.start(
@ -549,15 +551,15 @@ _peerStreamMap: Map<string, Pushable<any>> = new Map()
const stream = await connection.newStream([protocol]);
this._handleStream(remotePeerId, stream);
} catch (err: any) {
console.log(`Could not create a new ${protocol} stream with ${remotePeerId.toString()} (${getPseudonymForPeerId(remotePeerId.toString())})`, err);
log(`Could not create a new ${protocol} stream with ${remotePeerId.toString()} (${getPseudonymForPeerId(remotePeerId.toString())})`, err);
}
}
async _handleDeadConnections (remotePeerId: PeerId) {
// Close existing connections of remote peer
console.log(`Closing connections for ${remotePeerId} (${getPseudonymForPeerId(remotePeerId.toString())})`);
log(`Closing connections for ${remotePeerId} (${getPseudonymForPeerId(remotePeerId.toString())})`);
await this._node?.hangUp(remotePeerId);
console.log('Closed');
log('Closed');
}
async _handleDisconnect (connection: Connection): Promise<void> {
@ -566,8 +568,8 @@ _peerStreamMap: Map<string, Pushable<any>> = new Map()
const remoteAddrString = connection.remoteAddr.toString();
// Log disconnected peer
console.log(`Disconnected from ${disconnectedPeerId.toString()} (${getPseudonymForPeerId(disconnectedPeerId.toString())}) using multiaddr ${remoteAddrString}`);
console.log(`Current number of peers connected: ${this._node?.getPeers().length}`);
log(`Disconnected from ${disconnectedPeerId.toString()} (${getPseudonymForPeerId(disconnectedPeerId.toString())}) using multiaddr ${remoteAddrString}`);
log(`Current number of peers connected: ${this._node?.getPeers().length}`);
if (this.isRelayPeerMultiaddr(remoteAddrString)) {
this._numRelayConnections--;
@ -589,16 +591,16 @@ _peerStreamMap: Map<string, Pushable<any>> = new Map()
const peerIdString = peer.id.toString();
try {
console.log(`Dialling peer ${peerIdString} (${getPseudonymForPeerId(peerIdString)})`);
log(`Dialling peer ${peerIdString} (${getPseudonymForPeerId(peerIdString)})`);
// When dialling with peer id, all multiaddr(s) (direct/relayed) of the discovered peer are dialled in parallel
await this._node.dial(peer.id);
} catch (err: any) {
console.log(`Could not dial ${peerIdString} (${getPseudonymForPeerId(peerIdString)})`, err);
log(`Could not dial ${peerIdString} (${getPseudonymForPeerId(peerIdString)})`, err);
}
}
_handleStream (peerId: PeerId, stream: P2PStream): void {
// console.log('Stream after connection', stream);
// log('Stream after connection', stream);
const messageStream = pushable<any>({ objectMode: true });
// Send message to pipe from stdin

View File

@ -46,13 +46,13 @@ export const dialWithRetry = async (node: Libp2p, multiaddr: Multiaddr, options:
assert(peerId);
try {
console.log(`Dialling node ${peerId} (${getPseudonymForPeerId(peerId.toString())}) using multiaddr ${multiaddr.toString()}`);
log(`Dialling node ${peerId} (${getPseudonymForPeerId(peerId.toString())}) using multiaddr ${multiaddr.toString()}`);
const connection = await node.dial(multiaddr);
return connection;
} catch (err) {
console.log(`Could not dial node ${multiaddr.toString()} (${getPseudonymForPeerId(peerId.toString())})`, err);
console.log(`Retrying after ${redialInterval}ms`);
log(`Could not dial node ${multiaddr.toString()} (${getPseudonymForPeerId(peerId.toString())})`, err);
log(`Retrying after ${redialInterval}ms`);
// TODO: Use wait method from util package.
// Issue using util package in react app.

View File

@ -7049,6 +7049,11 @@ dotenv@^10.0.0:
resolved "https://registry.npmjs.org/dotenv/-/dotenv-10.0.0.tgz"
integrity sha512-rlBi9d8jpv9Sf1klPjNfFAuWDjKLwTIJJ/VxtoTwIR6hnZxcEOQCZg2oIL3MWBYw5GpUDKOEnND7LXTbIpQ03Q==
dotenv@^16.0.3:
version "16.0.3"
resolved "https://registry.yarnpkg.com/dotenv/-/dotenv-16.0.3.tgz#115aec42bac5053db3c456db30cc243a5a836a07"
integrity sha512-7GO6HghkA5fYG9TYnNxi14/7K9f5occMlp3zXAuSxn7CKCxt9xbNWG7yF8hTCSUchlfWSe3uLmlPfigevRItzQ==
dotenv@^8.2.0:
version "8.6.0"
resolved "https://registry.npmjs.org/dotenv/-/dotenv-8.6.0.tgz"