diff --git a/packages/peer/src/constants.ts b/packages/peer/src/constants.ts index b7f995de..632b75b2 100644 --- a/packages/peer/src/constants.ts +++ b/packages/peer/src/constants.ts @@ -25,6 +25,10 @@ export const RELAY_TAG = { // Currently only checking for relay node export const CONN_CHECK_INTERVAL = 10000; // 10 seconds +// Ping timeout used to check if connection is alive +// Should be lesser than CONN_CHECK_INTERVAL +export const PING_TIMEOUT = 5000; // 5 seconds + // Delay time in ms to redial relay node on failing to connect export const RELAY_REDIAL_DELAY = 5000; // 5 sconds diff --git a/packages/peer/src/index.ts b/packages/peer/src/index.ts index f3e132ba..ecd7ae83 100644 --- a/packages/peer/src/index.ts +++ b/packages/peer/src/index.ts @@ -24,7 +24,7 @@ import { multiaddr, Multiaddr } from '@multiformats/multiaddr'; import { floodsub } from '@libp2p/floodsub'; import { pubsubPeerDiscovery } from '@libp2p/pubsub-peer-discovery'; -import { MAX_CONCURRENT_DIALS_PER_PEER, MAX_CONNECTIONS, MIN_CONNECTIONS, PUBSUB_DISCOVERY_INTERVAL, PUBSUB_SIGNATURE_POLICY, RELAY_TAG, RELAY_REDIAL_DELAY, CONN_CHECK_INTERVAL } from './constants.js'; +import { MAX_CONCURRENT_DIALS_PER_PEER, MAX_CONNECTIONS, MIN_CONNECTIONS, PUBSUB_DISCOVERY_INTERVAL, PUBSUB_SIGNATURE_POLICY, RELAY_TAG, RELAY_REDIAL_DELAY, CONN_CHECK_INTERVAL, PING_TIMEOUT } from './constants.js'; export const CHAT_PROTOCOL = '/chat/1.0.0'; export const DEFAULT_SIGNAL_SERVER_URL = '/ip4/127.0.0.1/tcp/13579/wss/p2p-webrtc-star'; @@ -36,7 +36,6 @@ export class Peer { _wrtcStar: WebRTCStarTuple _relayNodeMultiaddr?: Multiaddr - _remotePeerIds: Set = new Set() _peerStreamMap: Map> = new Map() _messageHandlers: Array<(peerId: PeerId, message: any) => void> = [] _topicHandlers: Map void>> = new Map() @@ -104,6 +103,9 @@ export class Peer { autoDial: false, maxConnections: MAX_CONNECTIONS, minConnections: MIN_CONNECTIONS + }, + ping: { + timeout: PING_TIMEOUT } }); @@ -165,8 +167,9 @@ export class Peer { this._node.pubsub.removeEventListener('message'); await this._node.unhandle(CHAT_PROTOCOL); - this._remotePeerIds.forEach(remotePeerId => this._stopHeartbeatChecks(remotePeerId)); - const hangUpPromises = [...this._remotePeerIds].map(async peerId => this._node?.hangUp(peerId)); + const remotePeerIds = this._node.getPeers(); + remotePeerIds.forEach(remotePeerId => this._stopHeartbeatChecks(remotePeerId)); + const hangUpPromises = remotePeerIds.map(async peerId => this._node?.hangUp(peerId)); await Promise.all(hangUpPromises); } @@ -176,9 +179,9 @@ export class Peer { } } - floodMessage (topic: string, msg: any): void { + async floodMessage (topic: string, msg: any): Promise { assert(this._node); - this._node.pubsub.publish(topic, uint8ArrayFromString(JSON.stringify(msg))); + await this._node.pubsub.publish(topic, uint8ArrayFromString(JSON.stringify(msg))); } subscribeMessage (handler: (peerId: PeerId, message: any) => void) : () => void { @@ -240,12 +243,6 @@ export class Peer { // in connection pruning on crossing peer's maxConnections limit this._node.peerStore.tagPeer(relayPeerId, RELAY_TAG.tag, { value: RELAY_TAG.value }); - // Start heartbeat check for relay node - await this._startHeartbeatChecks( - relayPeerId, - async () => await this._handleRelayDisconnect(relayPeerId) - ); - break; } catch (err) { console.log(`Could not dial relay ${relayMultiaddr.toString()}`, err); @@ -257,21 +254,9 @@ export class Peer { } } - async _handleRelayDisconnect (relayPeerId: PeerId): Promise { - assert(this._node); - - // Close existing connection of relay node - console.log(`closing connections for ${relayPeerId}`); - await this._node.hangUp(relayPeerId); - console.log('closed'); - - // Reconnect to relay node - await this._dialRelay(); - } - _handleDiscovery (peer: PeerInfo): void { // Check connected peers as they are discovered repeatedly. - if (![...this._remotePeerIds].some(remotePeerId => remotePeerId.toString() === peer.id.toString())) { + if (!this._node?.getPeers().some(remotePeerId => remotePeerId.toString() === peer.id.toString())) { console.log(`Discovered peer ${peer.id.toString()} with multiaddrs`, peer.multiaddrs.map(addr => addr.toString())); this._connectPeer(peer); } @@ -279,11 +264,23 @@ export class Peer { async _handleConnect (connection: Connection): Promise { const remotePeerId = connection.remotePeer; - this._remotePeerIds.add(remotePeerId); // Log connected peer console.log(`Connected to ${remotePeerId.toString()} using multiaddr ${connection.remoteAddr.toString()}`); console.log(`Current number of peers connected: ${this._node?.getPeers().length}`); + + // Start heartbeat check peer + await this._startHeartbeatChecks( + remotePeerId, + async () => this._handleDeadConnections(remotePeerId) + ); + } + + async _handleDeadConnections (remotePeerId: PeerId) { + // Close existing connections of remote peer + console.log(`Closing connections for ${remotePeerId}`); + await this._node?.hangUp(remotePeerId); + console.log('Closed'); } async _startHeartbeatChecks (peerId: PeerId, handleDisconnect: () => Promise): Promise { @@ -326,22 +323,25 @@ export class Peer { } } - _handleDisconnect (connection: Connection): void { + async _handleDisconnect (connection: Connection): Promise { assert(this._node); const disconnectedPeerId = connection.remotePeer; - const peerConnections = this._node.getConnections(disconnectedPeerId); - - if (!peerConnections.length) { - // Remove peer if no remaining connections - this._remotePeerIds = new Set([...this._remotePeerIds].filter(remotePeerId => remotePeerId.toString() !== disconnectedPeerId.toString())); - - // Stop connection check for disconnected peer - this._stopHeartbeatChecks(disconnectedPeerId); - } // Log disconnected peer console.log(`Disconnected from ${disconnectedPeerId.toString()} using multiaddr ${connection.remoteAddr.toString()}`); console.log(`Current number of peers connected: ${this._node?.getPeers().length}`); + + const peerConnections = this._node.getConnections(disconnectedPeerId); + + if (!peerConnections.length) { + // Stop connection check for disconnected peer + this._stopHeartbeatChecks(disconnectedPeerId); + + if (disconnectedPeerId.toString() === this._relayNodeMultiaddr?.getPeerId()) { + // Reconnect to relay node if disconnected + await this._dialRelay(); + } + } } _stopHeartbeatChecks (peerId: PeerId): void {