From 91c1c35da809b705e591ddb2b40256504ca88590 Mon Sep 17 00:00:00 2001 From: Nabarun Gogoi Date: Wed, 15 Feb 2023 18:59:34 +0530 Subject: [PATCH] Maintain connection between relay nodes in federated relay network (#315) * Maintain connection between relay nodes in federated relay network * Stop heartbeat check in relay node for disconnected peers * Add maxDialRetry option in relay node --- packages/peer/src/index.ts | 64 ++++++++++--------- packages/peer/src/relay.ts | 103 +++++++++++++++++++++---------- packages/peer/src/utils/index.ts | 49 +++++++++++++++ 3 files changed, 152 insertions(+), 64 deletions(-) create mode 100644 packages/peer/src/utils/index.ts diff --git a/packages/peer/src/index.ts b/packages/peer/src/index.ts index 190f213f..f94404cd 100644 --- a/packages/peer/src/index.ts +++ b/packages/peer/src/index.ts @@ -29,11 +29,12 @@ import { pubsubPeerDiscovery } from '@libp2p/pubsub-peer-discovery'; import { MAX_CONCURRENT_DIALS_PER_PEER, MAX_CONNECTIONS, MIN_CONNECTIONS, PUBSUB_DISCOVERY_INTERVAL, PUBSUB_SIGNATURE_POLICY, RELAY_TAG, RELAY_REDIAL_DELAY, PING_TIMEOUT, DEFAULT_MAX_RELAY_CONNECTIONS } from './constants.js'; import { PeerHearbeatChecker } from './peer-heartbeat-checker.js'; +import { dialWithRetry } from './utils/index.js'; const P2P_CIRCUIT_ID = 'p2p-circuit'; export const CHAT_PROTOCOL = '/chat/1.0.0'; -export const ERR_PROTOCOL_SELECTION = 'protocol selection failed'; +const ERR_PEER_ALREADY_TAGGED = 'Peer already tagged'; type PeerIdObj = { id: string @@ -168,6 +169,8 @@ export class Peer { }); // Listen for peers disconnecting + // peer:disconnect event is trigerred when all connections to a peer close + // https://github.com/libp2p/js-libp2p-interfaces/blob/master/packages/interface-libp2p/src/index.ts#L64 this._node.addEventListener('peer:disconnect', (evt) => { console.log('event peer:disconnect', evt); this._handleDisconnect(evt.detail); @@ -280,28 +283,33 @@ export class Peer { async _dialRelay (): Promise { assert(this._node); const relayMultiaddr = this._relayNodeMultiaddr; + console.log('Dialling relay node'); - // Keep dialling relay node until it connects - while (true) { - try { - console.log(`Dialling relay node ${relayMultiaddr.getPeerId()} using multiaddr ${relayMultiaddr.toString()}`); - const connection = await this._node.dial(relayMultiaddr); - const relayPeerId = connection.remotePeer; - - // TODO: Check if tag already exists. When checking tags issue with relay node connect event - // Tag the relay node with a high value to prioritize it's connection - // in connection pruning on crossing peer's maxConnections limit - this._node.peerStore.tagPeer(relayPeerId, RELAY_TAG.tag, { value: RELAY_TAG.value }); - - break; - } catch (err) { - console.log(`Could not dial relay ${relayMultiaddr.toString()}`, err); - - // TODO: Use wait method from util package. - // Issue using util package in react app. - await new Promise(resolve => setTimeout(resolve, RELAY_REDIAL_DELAY)); + const connection = await dialWithRetry( + this._node, + relayMultiaddr, + { + redialDelay: RELAY_REDIAL_DELAY, + maxRetry: Infinity } - } + ); + + const relayPeerId = connection.remotePeer; + + // Tag the relay node with a high value to prioritize it's connection + // in connection pruning on crossing peer's maxConnections limit + this._node.peerStore.tagPeer(relayPeerId, RELAY_TAG.tag, { value: RELAY_TAG.value }).catch((err: Error) => { + // TODO: Check if tag already exists + // If awaited on the getTags / tagPeer method, relay node connect event is not triggered + // const peerTags = await this._node.peerStore.getTags(relayPeerId); + + // Ignore the error thrown on retagging a peer on reconnect + if (err.message === ERR_PEER_ALREADY_TAGGED) { + return; + } + + throw err; + }); } _isRelayPeerMultiaddr (multiaddrString: string): boolean { @@ -411,16 +419,12 @@ export class Peer { this._numRelayConnections--; } - const peerConnections = this._node.getConnections(disconnectedPeerId); + // Stop connection check for disconnected peer + this._peerHeartbeatChecker?.stop(disconnectedPeerId); - if (!peerConnections.length) { - // Stop connection check for disconnected peer - this._peerHeartbeatChecker?.stop(disconnectedPeerId); - - if (disconnectedPeerId.toString() === this._relayNodeMultiaddr?.getPeerId()) { - // Reconnect to relay node if disconnected - await this._dialRelay(); - } + if (disconnectedPeerId.toString() === this._relayNodeMultiaddr?.getPeerId()) { + // Reconnect to relay node if disconnected + await this._dialRelay(); } } diff --git a/packages/peer/src/relay.ts b/packages/peer/src/relay.ts index 4349416c..f86edc57 100644 --- a/packages/peer/src/relay.ts +++ b/packages/peer/src/relay.ts @@ -20,23 +20,30 @@ import type { Connection } from '@libp2p/interface-connection'; import { multiaddr } from '@multiformats/multiaddr'; import type { PeerId } from '@libp2p/interface-peer-id'; -import { HOP_TIMEOUT, PUBSUB_DISCOVERY_INTERVAL, PUBSUB_SIGNATURE_POLICY, WEBRTC_PORT_RANGE } from './constants.js'; +import { HOP_TIMEOUT, PUBSUB_DISCOVERY_INTERVAL, PUBSUB_SIGNATURE_POLICY, WEBRTC_PORT_RANGE, RELAY_REDIAL_DELAY } from './constants.js'; import { PeerHearbeatChecker } from './peer-heartbeat-checker.js'; +import { dialWithRetry } from './utils/index.js'; const log = debug('laconic:relay'); +const DEFAULT_HOST = '127.0.0.1'; +const DEFAULT_PORT = 9090; +const DEFAULT_MAX_DIAL_RETRY = 5; + interface Arguments { host: string; port: number; - announce: string; - peerIdFile: string; - relayPeers: string; + announce?: string; + peerIdFile?: string; + relayPeers?: string; + maxDialRetry: number; } async function main (): Promise { const argv: Arguments = _getArgv(); + let peerId: PeerId | undefined; + let relayPeersList: string[] = []; - let peerId: any; if (argv.peerIdFile) { const peerIdFilePath = path.resolve(argv.peerIdFile); console.log(`Reading peer id from file ${peerIdFilePath}`); @@ -48,6 +55,19 @@ async function main (): Promise { console.log('Creating a new peer id'); } + if (argv.relayPeers) { + const relayPeersFilePath = path.resolve(argv.relayPeers); + + if (!fs.existsSync(relayPeersFilePath)) { + console.log(`File at given path ${relayPeersFilePath} not found, exiting`); + process.exit(); + } + + console.log(`Reading relay peer multiaddr(s) from file ${relayPeersFilePath}`); + const relayPeersListObj = fs.readFileSync(relayPeersFilePath, 'utf-8'); + relayPeersList = JSON.parse(relayPeersListObj); + } + const listenMultiaddrs = [`/ip4/${argv.host}/tcp/${argv.port}/http/p2p-webrtc-direct`]; const announceMultiaddrs = []; @@ -114,43 +134,52 @@ async function main (): Promise { }); // Listen for peers disconnecting - node.addEventListener('peer:disconnect', (evt) => { - // console.log('event peer:disconnect', evt); + // peer:disconnect event is trigerred when all connections to a peer close + // https://github.com/libp2p/js-libp2p-interfaces/blob/master/packages/interface-libp2p/src/index.ts#L64 + node.addEventListener('peer:disconnect', async (evt) => { + log('event peer:disconnect', evt); + // Log disconnected peer const connection: Connection = evt.detail; - log(`Disconnected from ${connection.remotePeer.toString()} using multiaddr ${connection.remoteAddr.toString()}`); + const remoteAddr = connection.remoteAddr; + log(`Disconnected from ${connection.remotePeer.toString()} using multiaddr ${remoteAddr.toString()}`); + + // Stop connection check for disconnected peer + peerHeartbeatChecker.stop(connection.remotePeer); + + // Redial if disconnected peer is in relayPeers list + if (relayPeersList.includes(remoteAddr.toString())) { + await dialWithRetry( + node, + remoteAddr, + { + redialDelay: RELAY_REDIAL_DELAY, + maxRetry: argv.maxDialRetry + } + ).catch((error: Error) => console.log(error.message)); + } }); - if (argv.relayPeers) { - const relayPeersFilePath = path.resolve(argv.relayPeers); - - if (!fs.existsSync(relayPeersFilePath)) { - console.log(`File at given path ${relayPeersFilePath} not found, exiting`); - process.exit(); - } - - console.log(`Reading relay peer multiaddr(s) from file ${relayPeersFilePath}`); - const relayPeersListObj = fs.readFileSync(relayPeersFilePath, 'utf-8'); - const relayPeersList: string[] = JSON.parse(relayPeersListObj); - - await _dialRelayPeers(node, relayPeersList); + if (relayPeersList.length) { + console.log('Dialling relay peers'); + await _dialRelayPeers(node, relayPeersList, argv.maxDialRetry); } } -function _getArgv (): any { +function _getArgv (): Arguments { return yargs(hideBin(process.argv)).parserConfiguration({ 'parse-numbers': false }).options({ host: { type: 'string', alias: 'h', - default: '127.0.0.1', + default: DEFAULT_HOST, describe: 'Host to bind to' }, port: { type: 'number', alias: 'p', - default: '9090', + default: DEFAULT_PORT, describe: 'Port to start listening on' }, announce: { @@ -167,21 +196,27 @@ function _getArgv (): any { type: 'string', alias: 'r', describe: 'Relay peer multiaddr(s) list file path (json)' + }, + maxDialRetry: { + type: 'number', + describe: 'Maximum number of retries for dialling a relay peer', + default: DEFAULT_MAX_DIAL_RETRY } - }).argv; + // https://github.com/yargs/yargs/blob/main/docs/typescript.md?plain=1#L83 + }).parseSync(); } -async function _dialRelayPeers (node: Libp2p, relayPeersList: string[]): Promise { +async function _dialRelayPeers (node: Libp2p, relayPeersList: string[], maxDialRetry: number): Promise { relayPeersList.forEach(async (relayPeer) => { const relayMultiaddr = multiaddr(relayPeer); - const peerIdString = relayMultiaddr.getPeerId()?.toString(); - - try { - console.log(`Dialling relay node ${peerIdString} using multiaddr ${relayMultiaddr.toString()}`); - await node.dial(relayMultiaddr); - } catch (err: any) { - console.log(`Could not dial ${peerIdString}`, err); - } + await dialWithRetry( + node, + relayMultiaddr, + { + redialDelay: RELAY_REDIAL_DELAY, + maxRetry: maxDialRetry + } + ).catch((error: Error) => console.log(error.message)); }); } diff --git a/packages/peer/src/utils/index.ts b/packages/peer/src/utils/index.ts new file mode 100644 index 00000000..9ba6abc6 --- /dev/null +++ b/packages/peer/src/utils/index.ts @@ -0,0 +1,49 @@ +// +// Copyright 2023 Vulcanize, Inc. +// + +import { Libp2p } from '@cerc-io/libp2p'; +import { Multiaddr } from '@multiformats/multiaddr'; + +interface DialWithRetryOptions { + redialDelay: number + maxRetry: number +} + +const DEFAULT_DIAL_RETRY_OPTIONS: DialWithRetryOptions = { + redialDelay: 5000, // ms + maxRetry: 5 +}; + +/** + * Method to dial remote peer multiaddr with retry on failure + * Number of retries can be configured using options.maxRetry + * @param node + * @param multiaddr + * @param options + */ +export const dialWithRetry = async (node: Libp2p, multiaddr: Multiaddr, options: Partial) => { + const { redialDelay, maxRetry } = { + ...DEFAULT_DIAL_RETRY_OPTIONS, + ...options + }; + + // Keep dialling node until it connects + for (let i = 0; i < maxRetry; i++) { + try { + console.log(`Dialling node ${multiaddr.getPeerId()} using multiaddr ${multiaddr.toString()}`); + const connection = await node.dial(multiaddr); + + return connection; + } catch (err) { + console.log(`Could not dial node ${multiaddr.toString()}`, err); + console.log(`Retrying after ${redialDelay}ms`); + + // TODO: Use wait method from util package. + // Issue using util package in react app. + await new Promise(resolve => setTimeout(resolve, redialDelay)); + } + } + + throw new Error(`Stopping dial retry after ${maxRetry} attempts for multiaddr ${multiaddr.toString()}`); +};