Prioritize direct webrtc connection between peers (#306)

* Prioritize direct webrtc connection between peers

* Close event listener and add comment
This commit is contained in:
Nabarun Gogoi 2023-02-01 10:48:34 +05:30 committed by Ashwin Phatak
parent a5658c6344
commit cc445d5caf

View File

@ -102,7 +102,8 @@ export class Peer {
maxDialsPerPeer: MAX_CONCURRENT_DIALS_PER_PEER, // Number of max concurrent dials per peer
autoDial: false,
maxConnections: MAX_CONNECTIONS,
minConnections: MIN_CONNECTIONS
minConnections: MIN_CONNECTIONS,
keepMultipleConnections: true // Set true to get connections with multiple multiaddr
},
ping: {
timeout: PING_TIMEOUT
@ -129,6 +130,13 @@ export class Peer {
}
});
// Listen for change in peer protocols
this._node.peerStore.addEventListener('change:protocols', async (evt) => {
assert(this._node);
console.log('event change:protocols', evt);
await this._handleChangeProtocols(evt.detail);
});
// Listen for peers discovery
this._node.addEventListener('peer:discovery', (evt) => {
// console.log('event peer:discovery', evt);
@ -164,6 +172,8 @@ export class Peer {
this._node.removeEventListener('peer:discovery');
this._node.removeEventListener('peer:connect');
this._node.removeEventListener('peer:disconnect');
this._node.peerStore.removeEventListener('change:multiaddrs');
this._node.peerStore.removeEventListener('change:protocols');
this._node.pubsub.removeEventListener('message');
await this._node.unhandle(CHAT_PROTOCOL);
@ -226,6 +236,28 @@ export class Peer {
return unsubscribe;
}
async _handleChangeProtocols ({ peerId, protocols }: { peerId: PeerId, protocols: string[] }) {
assert(this._node);
// Handle protocol and open stream from only one peer
if (this._node.peerId.toString() > peerId.toString()) {
return;
}
// Return if stream is self peer or chat protocol is not handled by remote peer
if (peerId.equals(this._node.peerId) || !protocols.includes(CHAT_PROTOCOL)) {
return;
}
const [connection] = this._node.getConnections(peerId);
// Open stream if connection exists and it doesn't already have a stream with chat protocol
if (connection && !connection.streams.some(stream => stream.stat.protocol === CHAT_PROTOCOL)) {
const stream = await connection.newStream([CHAT_PROTOCOL]);
this._handleStream(peerId, stream);
}
}
async _dialRelay (): Promise<void> {
assert(this._relayNodeMultiaddr);
assert(this._node);
@ -265,22 +297,43 @@ export class Peer {
async _handleConnect (connection: Connection): Promise<void> {
assert(this._node);
const remotePeerId = connection.remotePeer;
const remoteConnections = this._node.getConnections(remotePeerId);
// Log connected peer
console.log(`Connected to ${remotePeerId.toString()} using multiaddr ${connection.remoteAddr.toString()}`);
// Keep only one connection with a peer.
if (remoteConnections.length > 1) {
// Close new connection from peer having the smaller peer id.
if (this._node.peerId.toString() < remotePeerId.toString()) {
console.log('Closing new connection for already connected peer');
// Close new connection as protocol stream is opened in the first connection that is established.
await connection.close();
// Manage connections and stream if peer id is smaller to break symmetry
if (this._node.peerId.toString() < remotePeerId.toString()) {
const remoteConnections = this._node.getConnections(remotePeerId);
// Keep only one connection with a peer
if (remoteConnections.length > 1) {
// Close new connection if using relayed multiaddr
if (connection.remoteAddr.protoNames().includes('p2p-circuit')) {
console.log('Closing new connection for already connected peer');
await connection.close();
console.log('Closed');
return;
}
console.log('Closing exisiting connections for new webrtc connection');
// Close existing connections if new connection is not using relayed multiaddr (so it is a webrtc connection)
const closeConnectionPromises = remoteConnections.filter(remoteConnection => remoteConnection.id !== connection.id)
.map(remoteConnection => remoteConnection.close());
await Promise.all(closeConnectionPromises);
console.log('Closed');
}
return;
// Open stream in new connection for chat protocol
const protocols = await this._node.peerStore.protoBook.get(remotePeerId);
// Dial if protocol is handled by remote peer
// The chat protocol may not be updated in the list and will be handled later on change:protocols event
if (protocols.includes(CHAT_PROTOCOL)) {
const stream = await connection.newStream([CHAT_PROTOCOL]);
this._handleStream(remotePeerId, stream);
}
}
console.log(`Current number of peers connected: ${this._node.getPeers().length}`);
@ -376,20 +429,9 @@ export class Peer {
// Dial them when we discover them
const peerIdString = peer.id.toString();
try {
console.log(`Dialling peer ${peerIdString}`);
// When dialling with peer id, all multiaddr(s) (direct/relayed) of the discovered peer are dialled in parallel
const stream = await this._node.dialProtocol(peer.id, CHAT_PROTOCOL);
this._handleStream(peer.id, stream);
} catch (err: any) {
// Check if protocol negotiation failed (dial still succeeds)
// (happens in case of dialProtocol to relay nodes since they don't handle CHAT_PROTOCOL)
if ((err as Error).message === ERR_PROTOCOL_SELECTION) {
console.log(`Protocol selection failed with peer ${peerIdString}`);
} else {
console.log(`Could not dial ${peerIdString}`, err);
}
}
console.log(`Dialling peer ${peerIdString}`);
// When dialling with peer id, all multiaddr(s) (direct/relayed) of the discovered peer are dialled in parallel
await this._node.dial(peer.id);
}
_handleStream (peerId: PeerId, stream: P2PStream): void {