Add connection check for all remote peers

This commit is contained in:
nabarun 2023-01-25 14:05:52 +05:30 committed by Ashwin Phatak
parent 69e9402dc1
commit f619c71b42
2 changed files with 40 additions and 36 deletions

View File

@ -25,6 +25,10 @@ export const RELAY_TAG = {
// Currently only checking for relay node // Currently only checking for relay node
export const CONN_CHECK_INTERVAL = 10000; // 10 seconds export const CONN_CHECK_INTERVAL = 10000; // 10 seconds
// Ping timeout used to check if connection is alive
// Should be lesser than CONN_CHECK_INTERVAL
export const PING_TIMEOUT = 5000; // 5 seconds
// Delay time in ms to redial relay node on failing to connect // Delay time in ms to redial relay node on failing to connect
export const RELAY_REDIAL_DELAY = 5000; // 5 sconds export const RELAY_REDIAL_DELAY = 5000; // 5 sconds

View File

@ -24,7 +24,7 @@ import { multiaddr, Multiaddr } from '@multiformats/multiaddr';
import { floodsub } from '@libp2p/floodsub'; import { floodsub } from '@libp2p/floodsub';
import { pubsubPeerDiscovery } from '@libp2p/pubsub-peer-discovery'; import { pubsubPeerDiscovery } from '@libp2p/pubsub-peer-discovery';
import { MAX_CONCURRENT_DIALS_PER_PEER, MAX_CONNECTIONS, MIN_CONNECTIONS, PUBSUB_DISCOVERY_INTERVAL, PUBSUB_SIGNATURE_POLICY, RELAY_TAG, RELAY_REDIAL_DELAY, CONN_CHECK_INTERVAL } from './constants.js'; import { MAX_CONCURRENT_DIALS_PER_PEER, MAX_CONNECTIONS, MIN_CONNECTIONS, PUBSUB_DISCOVERY_INTERVAL, PUBSUB_SIGNATURE_POLICY, RELAY_TAG, RELAY_REDIAL_DELAY, CONN_CHECK_INTERVAL, PING_TIMEOUT } from './constants.js';
export const CHAT_PROTOCOL = '/chat/1.0.0'; export const CHAT_PROTOCOL = '/chat/1.0.0';
export const DEFAULT_SIGNAL_SERVER_URL = '/ip4/127.0.0.1/tcp/13579/wss/p2p-webrtc-star'; export const DEFAULT_SIGNAL_SERVER_URL = '/ip4/127.0.0.1/tcp/13579/wss/p2p-webrtc-star';
@ -36,7 +36,6 @@ export class Peer {
_wrtcStar: WebRTCStarTuple _wrtcStar: WebRTCStarTuple
_relayNodeMultiaddr?: Multiaddr _relayNodeMultiaddr?: Multiaddr
_remotePeerIds: Set<PeerId> = new Set()
_peerStreamMap: Map<string, Pushable<any>> = new Map() _peerStreamMap: Map<string, Pushable<any>> = new Map()
_messageHandlers: Array<(peerId: PeerId, message: any) => void> = [] _messageHandlers: Array<(peerId: PeerId, message: any) => void> = []
_topicHandlers: Map<string, Array<(peerId: PeerId, data: any) => void>> = new Map() _topicHandlers: Map<string, Array<(peerId: PeerId, data: any) => void>> = new Map()
@ -104,6 +103,9 @@ export class Peer {
autoDial: false, autoDial: false,
maxConnections: MAX_CONNECTIONS, maxConnections: MAX_CONNECTIONS,
minConnections: MIN_CONNECTIONS minConnections: MIN_CONNECTIONS
},
ping: {
timeout: PING_TIMEOUT
} }
}); });
@ -165,8 +167,9 @@ export class Peer {
this._node.pubsub.removeEventListener('message'); this._node.pubsub.removeEventListener('message');
await this._node.unhandle(CHAT_PROTOCOL); await this._node.unhandle(CHAT_PROTOCOL);
this._remotePeerIds.forEach(remotePeerId => this._stopHeartbeatChecks(remotePeerId)); const remotePeerIds = this._node.getPeers();
const hangUpPromises = [...this._remotePeerIds].map(async peerId => this._node?.hangUp(peerId)); remotePeerIds.forEach(remotePeerId => this._stopHeartbeatChecks(remotePeerId));
const hangUpPromises = remotePeerIds.map(async peerId => this._node?.hangUp(peerId));
await Promise.all(hangUpPromises); await Promise.all(hangUpPromises);
} }
@ -176,9 +179,9 @@ export class Peer {
} }
} }
floodMessage (topic: string, msg: any): void { async floodMessage (topic: string, msg: any): Promise<void> {
assert(this._node); assert(this._node);
this._node.pubsub.publish(topic, uint8ArrayFromString(JSON.stringify(msg))); await this._node.pubsub.publish(topic, uint8ArrayFromString(JSON.stringify(msg)));
} }
subscribeMessage (handler: (peerId: PeerId, message: any) => void) : () => void { subscribeMessage (handler: (peerId: PeerId, message: any) => void) : () => void {
@ -240,12 +243,6 @@ export class Peer {
// in connection pruning on crossing peer's maxConnections limit // in connection pruning on crossing peer's maxConnections limit
this._node.peerStore.tagPeer(relayPeerId, RELAY_TAG.tag, { value: RELAY_TAG.value }); this._node.peerStore.tagPeer(relayPeerId, RELAY_TAG.tag, { value: RELAY_TAG.value });
// Start heartbeat check for relay node
await this._startHeartbeatChecks(
relayPeerId,
async () => await this._handleRelayDisconnect(relayPeerId)
);
break; break;
} catch (err) { } catch (err) {
console.log(`Could not dial relay ${relayMultiaddr.toString()}`, err); console.log(`Could not dial relay ${relayMultiaddr.toString()}`, err);
@ -257,21 +254,9 @@ export class Peer {
} }
} }
async _handleRelayDisconnect (relayPeerId: PeerId): Promise<void> {
assert(this._node);
// Close existing connection of relay node
console.log(`closing connections for ${relayPeerId}`);
await this._node.hangUp(relayPeerId);
console.log('closed');
// Reconnect to relay node
await this._dialRelay();
}
_handleDiscovery (peer: PeerInfo): void { _handleDiscovery (peer: PeerInfo): void {
// Check connected peers as they are discovered repeatedly. // Check connected peers as they are discovered repeatedly.
if (![...this._remotePeerIds].some(remotePeerId => remotePeerId.toString() === peer.id.toString())) { if (!this._node?.getPeers().some(remotePeerId => remotePeerId.toString() === peer.id.toString())) {
console.log(`Discovered peer ${peer.id.toString()} with multiaddrs`, peer.multiaddrs.map(addr => addr.toString())); console.log(`Discovered peer ${peer.id.toString()} with multiaddrs`, peer.multiaddrs.map(addr => addr.toString()));
this._connectPeer(peer); this._connectPeer(peer);
} }
@ -279,11 +264,23 @@ export class Peer {
async _handleConnect (connection: Connection): Promise<void> { async _handleConnect (connection: Connection): Promise<void> {
const remotePeerId = connection.remotePeer; const remotePeerId = connection.remotePeer;
this._remotePeerIds.add(remotePeerId);
// Log connected peer // Log connected peer
console.log(`Connected to ${remotePeerId.toString()} using multiaddr ${connection.remoteAddr.toString()}`); console.log(`Connected to ${remotePeerId.toString()} using multiaddr ${connection.remoteAddr.toString()}`);
console.log(`Current number of peers connected: ${this._node?.getPeers().length}`); console.log(`Current number of peers connected: ${this._node?.getPeers().length}`);
// Start heartbeat check peer
await this._startHeartbeatChecks(
remotePeerId,
async () => this._handleDeadConnections(remotePeerId)
);
}
async _handleDeadConnections (remotePeerId: PeerId) {
// Close existing connections of remote peer
console.log(`Closing connections for ${remotePeerId}`);
await this._node?.hangUp(remotePeerId);
console.log('Closed');
} }
async _startHeartbeatChecks (peerId: PeerId, handleDisconnect: () => Promise<void>): Promise<void> { async _startHeartbeatChecks (peerId: PeerId, handleDisconnect: () => Promise<void>): Promise<void> {
@ -326,22 +323,25 @@ export class Peer {
} }
} }
_handleDisconnect (connection: Connection): void { async _handleDisconnect (connection: Connection): Promise<void> {
assert(this._node); assert(this._node);
const disconnectedPeerId = connection.remotePeer; const disconnectedPeerId = connection.remotePeer;
const peerConnections = this._node.getConnections(disconnectedPeerId);
if (!peerConnections.length) {
// Remove peer if no remaining connections
this._remotePeerIds = new Set([...this._remotePeerIds].filter(remotePeerId => remotePeerId.toString() !== disconnectedPeerId.toString()));
// Stop connection check for disconnected peer
this._stopHeartbeatChecks(disconnectedPeerId);
}
// Log disconnected peer // Log disconnected peer
console.log(`Disconnected from ${disconnectedPeerId.toString()} using multiaddr ${connection.remoteAddr.toString()}`); console.log(`Disconnected from ${disconnectedPeerId.toString()} using multiaddr ${connection.remoteAddr.toString()}`);
console.log(`Current number of peers connected: ${this._node?.getPeers().length}`); console.log(`Current number of peers connected: ${this._node?.getPeers().length}`);
const peerConnections = this._node.getConnections(disconnectedPeerId);
if (!peerConnections.length) {
// Stop connection check for disconnected peer
this._stopHeartbeatChecks(disconnectedPeerId);
if (disconnectedPeerId.toString() === this._relayNodeMultiaddr?.getPeerId()) {
// Reconnect to relay node if disconnected
await this._dialRelay();
}
}
} }
_stopHeartbeatChecks (peerId: PeerId): void { _stopHeartbeatChecks (peerId: PeerId): void {