Ensure connection with peers after going offline (#297)

* Redial relay node on failure and after going offline

* Redial only relay node if not connected

* Refactor and rename methods

* Only close existing connection to relay node

* Hearbeat check only for relay node

* Refactor startHeartbeatCheck method
This commit is contained in:
Nabarun Gogoi 2023-01-19 15:01:40 +05:30 committed by GitHub
parent 9d38306fe9
commit b07e288756
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 129 additions and 21 deletions

View File

@ -14,6 +14,7 @@ declare global {
interface Window { interface Window {
broadcast: (message: string) => void; broadcast: (message: string) => void;
flood: (message: string) => void; flood: (message: string) => void;
peer: Peer;
} }
} }
@ -28,6 +29,9 @@ function App() {
return return
} }
// For debugging
window.peer = peer;
// Subscribe to messages from remote peers // Subscribe to messages from remote peers
const unsubscribeMessage = peer.subscribeMessage((peerId, message) => { const unsubscribeMessage = peer.subscribeMessage((peerId, message) => {
console.log(`${peerId.toString()} > ${message}`) console.log(`${peerId.toString()} > ${message}`)

View File

@ -21,6 +21,13 @@ export const RELAY_TAG = {
value: 100 value: 100
}; };
// Interval time in ms to check connection with ping for connected peer
// Currently only checking for relay node
export const CONN_CHECK_INTERVAL = 10000; // 10 seconds
// Delay time in ms to redial relay node on failing to connect
export const RELAY_REDIAL_DELAY = 5000; // 5 sconds
// Peer connection manager config constants // Peer connection manager config constants
// Number of max concurrent dials per peer // Number of max concurrent dials per peer

View File

@ -24,7 +24,7 @@ import { multiaddr, Multiaddr } from '@multiformats/multiaddr';
import { floodsub } from '@libp2p/floodsub'; import { floodsub } from '@libp2p/floodsub';
import { pubsubPeerDiscovery } from '@libp2p/pubsub-peer-discovery'; import { pubsubPeerDiscovery } from '@libp2p/pubsub-peer-discovery';
import { MAX_CONCURRENT_DIALS_PER_PEER, MAX_CONNECTIONS, MIN_CONNECTIONS, PUBSUB_DISCOVERY_INTERVAL, PUBSUB_SIGNATURE_POLICY, RELAY_TAG } from './constants.js'; import { MAX_CONCURRENT_DIALS_PER_PEER, MAX_CONNECTIONS, MIN_CONNECTIONS, PUBSUB_DISCOVERY_INTERVAL, PUBSUB_SIGNATURE_POLICY, RELAY_TAG, RELAY_REDIAL_DELAY, CONN_CHECK_INTERVAL } from './constants.js';
export const CHAT_PROTOCOL = '/chat/1.0.0'; export const CHAT_PROTOCOL = '/chat/1.0.0';
export const DEFAULT_SIGNAL_SERVER_URL = '/ip4/127.0.0.1/tcp/13579/wss/p2p-webrtc-star'; export const DEFAULT_SIGNAL_SERVER_URL = '/ip4/127.0.0.1/tcp/13579/wss/p2p-webrtc-star';
@ -36,10 +36,11 @@ export class Peer {
_wrtcStar: WebRTCStarTuple _wrtcStar: WebRTCStarTuple
_relayNodeMultiaddr?: Multiaddr _relayNodeMultiaddr?: Multiaddr
_remotePeerIds: PeerId[] = [] _remotePeerIds: Set<PeerId> = new Set()
_peerStreamMap: Map<string, Pushable<any>> = new Map() _peerStreamMap: Map<string, Pushable<any>> = new Map()
_messageHandlers: Array<(peerId: PeerId, message: any) => void> = [] _messageHandlers: Array<(peerId: PeerId, message: any) => void> = []
_topicHandlers: Map<string, Array<(peerId: PeerId, data: any) => void>> = new Map() _topicHandlers: Map<string, Array<(peerId: PeerId, data: any) => void>> = new Map()
_peerHeartbeatIntervalIdsMap: Map<string, NodeJS.Timer> = new Map();
constructor (nodejs?: boolean) { constructor (nodejs?: boolean) {
// Instantiation in nodejs. // Instantiation in nodejs.
@ -110,18 +111,7 @@ export class Peer {
// Dial to the HOP enabled relay node if available // Dial to the HOP enabled relay node if available
if (this._relayNodeMultiaddr) { if (this._relayNodeMultiaddr) {
const relayMultiaddr = this._relayNodeMultiaddr; await this._dialRelay();
console.log(`Dialling relay node ${relayMultiaddr.getPeerId()} using multiaddr ${relayMultiaddr.toString()}`);
await this._node.dial(relayMultiaddr);
// Tag the relay node with a high value to prioritize it's connection
// in connection pruning on crossing peer's maxConnections limit
const relayPeerId = this._node.getPeers().find(
peerId => peerId.toString() === relayMultiaddr.getPeerId()
);
assert(relayPeerId);
this._node.peerStore.tagPeer(relayPeerId, RELAY_TAG.tag, { value: RELAY_TAG.value });
} }
// Listen for change in stored multiaddrs // Listen for change in stored multiaddrs
@ -144,9 +134,9 @@ export class Peer {
}); });
// Listen for peers connection // Listen for peers connection
this._node.connectionManager.addEventListener('peer:connect', (evt) => { this._node.connectionManager.addEventListener('peer:connect', async (evt) => {
console.log('event peer:connect', evt); console.log('event peer:connect', evt);
this._handleConnect(evt.detail); await this._handleConnect(evt.detail);
}); });
// Listen for peers disconnecting // Listen for peers disconnecting
@ -175,7 +165,8 @@ export class Peer {
this._node.pubsub.removeEventListener('message'); this._node.pubsub.removeEventListener('message');
await this._node.unhandle(CHAT_PROTOCOL); await this._node.unhandle(CHAT_PROTOCOL);
const hangUpPromises = this._remotePeerIds.map(async peerId => this._node?.hangUp(peerId)); this._remotePeerIds.forEach(remotePeerId => this._stopHeartbeatChecks(remotePeerId));
const hangUpPromises = [...this._remotePeerIds].map(async peerId => this._node?.hangUp(peerId));
await Promise.all(hangUpPromises); await Promise.all(hangUpPromises);
} }
@ -232,32 +223,138 @@ export class Peer {
return unsubscribe; return unsubscribe;
} }
async _dialRelay (): Promise<void> {
assert(this._relayNodeMultiaddr);
assert(this._node);
const relayMultiaddr = this._relayNodeMultiaddr;
// Keep dialling relay node until it connects
while (true) {
try {
console.log(`Dialling relay node ${relayMultiaddr.getPeerId()} using multiaddr ${relayMultiaddr.toString()}`);
const connection = await this._node.dial(relayMultiaddr);
const relayPeerId = connection.remotePeer;
// TODO: Check if tag already exists. When checking tags issue with relay node connect event
// Tag the relay node with a high value to prioritize it's connection
// in connection pruning on crossing peer's maxConnections limit
this._node.peerStore.tagPeer(relayPeerId, RELAY_TAG.tag, { value: RELAY_TAG.value });
// Start heartbeat check for relay node
await this._startHeartbeatChecks(
relayPeerId,
async () => await this._handleRelayDisconnect(relayPeerId)
);
break;
} catch (err) {
console.log(`Could not dial relay ${relayMultiaddr.toString()}`, err);
// TODO: Use wait method from util package.
// Issue using util package in react app.
await new Promise(resolve => setTimeout(resolve, RELAY_REDIAL_DELAY));
}
}
}
async _handleRelayDisconnect (relayPeerId: PeerId): Promise<void> {
assert(this._node);
// Close existing connection of relay node
console.log(`closing connections for ${relayPeerId}`);
await this._node.hangUp(relayPeerId);
console.log('closed');
// Reconnect to relay node
await this._dialRelay();
}
_handleDiscovery (peer: PeerInfo): void { _handleDiscovery (peer: PeerInfo): void {
// Check connected peers as they are discovered repeatedly. // Check connected peers as they are discovered repeatedly.
if (!this._remotePeerIds.some(remotePeerId => remotePeerId.toString() === peer.id.toString())) { if (![...this._remotePeerIds].some(remotePeerId => remotePeerId.toString() === peer.id.toString())) {
console.log('Discovered peer multiaddrs', peer.multiaddrs.map(addr => addr.toString())); console.log('Discovered peer multiaddrs', peer.multiaddrs.map(addr => addr.toString()));
this._connectPeer(peer); this._connectPeer(peer);
} }
} }
_handleConnect (connection: Connection): void { async _handleConnect (connection: Connection): Promise<void> {
const remotePeerId = connection.remotePeer; const remotePeerId = connection.remotePeer;
this._remotePeerIds.push(remotePeerId); this._remotePeerIds.add(remotePeerId);
// Log connected peer // Log connected peer
console.log(`Connected to ${remotePeerId.toString()} using multiaddr ${connection.remoteAddr.toString()}`); console.log(`Connected to ${remotePeerId.toString()} using multiaddr ${connection.remoteAddr.toString()}`);
console.log(`Current number of peers connected: ${this._node?.getPeers().length}`); console.log(`Current number of peers connected: ${this._node?.getPeers().length}`);
} }
async _startHeartbeatChecks (peerId: PeerId, handleDisconnect: () => Promise<void>): Promise<void> {
if (this._peerHeartbeatIntervalIdsMap.has(peerId.toString())) {
// Do not start connection check interval if already present
return;
}
const intervalId = setInterval(async () => {
await this._validatePing(
peerId,
async () => {
// Check if connection check interval for peer is already cleared
if (!this._peerHeartbeatIntervalIdsMap.has(peerId.toString())) {
return;
}
// Clear and remove check interval for remote peer if not connected
this._stopHeartbeatChecks(peerId);
await handleDisconnect();
}
);
}, CONN_CHECK_INTERVAL);
this._peerHeartbeatIntervalIdsMap.set(peerId.toString(), intervalId);
}
async _validatePing (peerId: PeerId, handleDisconnect: () => Promise<void>): Promise<void> {
assert(this._node);
try {
// Ping remote peer
await this._node.ping(peerId);
} catch (err) {
// On error i.e. no pong
console.log(`Not connected to peer ${peerId.toString()}`);
await handleDisconnect();
}
}
_handleDisconnect (connection: Connection): void { _handleDisconnect (connection: Connection): void {
assert(this._node);
const disconnectedPeerId = connection.remotePeer; const disconnectedPeerId = connection.remotePeer;
this._remotePeerIds = this._remotePeerIds.filter(remotePeerId => remotePeerId.toString() !== disconnectedPeerId.toString()); const peerConnections = this._node.getConnections(disconnectedPeerId);
if (!peerConnections.length) {
// Remove peer if no remaining connections
this._remotePeerIds = new Set([...this._remotePeerIds].filter(remotePeerId => remotePeerId.toString() !== disconnectedPeerId.toString()));
// Stop connection check for disconnected peer
this._stopHeartbeatChecks(disconnectedPeerId);
}
// Log disconnected peer // Log disconnected peer
console.log(`Disconnected from ${disconnectedPeerId.toString()} using multiaddr ${connection.remoteAddr.toString()}`); console.log(`Disconnected from ${disconnectedPeerId.toString()} using multiaddr ${connection.remoteAddr.toString()}`);
console.log(`Current number of peers connected: ${this._node?.getPeers().length}`); console.log(`Current number of peers connected: ${this._node?.getPeers().length}`);
} }
_stopHeartbeatChecks (peerId: PeerId): void {
// Clear check interval for disconnected peer
const intervalId = this._peerHeartbeatIntervalIdsMap.get(peerId.toString());
if (intervalId) {
clearInterval(intervalId);
}
this._peerHeartbeatIntervalIdsMap.delete(peerId.toString());
}
async _connectPeer (peer: PeerInfo): Promise<void> { async _connectPeer (peer: PeerInfo): Promise<void> {
assert(this._node); assert(this._node);