Maintain connection between relay nodes in federated relay network (#315)

* Maintain connection between relay nodes in federated relay network

* Stop heartbeat check in relay node for disconnected peers

* Add maxDialRetry option in relay node
This commit is contained in:
Nabarun Gogoi 2023-02-15 18:59:34 +05:30 committed by GitHub
parent e47f864966
commit 91c1c35da8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 152 additions and 64 deletions

View File

@ -29,11 +29,12 @@ import { pubsubPeerDiscovery } from '@libp2p/pubsub-peer-discovery';
import { MAX_CONCURRENT_DIALS_PER_PEER, MAX_CONNECTIONS, MIN_CONNECTIONS, PUBSUB_DISCOVERY_INTERVAL, PUBSUB_SIGNATURE_POLICY, RELAY_TAG, RELAY_REDIAL_DELAY, PING_TIMEOUT, DEFAULT_MAX_RELAY_CONNECTIONS } from './constants.js'; import { MAX_CONCURRENT_DIALS_PER_PEER, MAX_CONNECTIONS, MIN_CONNECTIONS, PUBSUB_DISCOVERY_INTERVAL, PUBSUB_SIGNATURE_POLICY, RELAY_TAG, RELAY_REDIAL_DELAY, PING_TIMEOUT, DEFAULT_MAX_RELAY_CONNECTIONS } from './constants.js';
import { PeerHearbeatChecker } from './peer-heartbeat-checker.js'; import { PeerHearbeatChecker } from './peer-heartbeat-checker.js';
import { dialWithRetry } from './utils/index.js';
const P2P_CIRCUIT_ID = 'p2p-circuit'; const P2P_CIRCUIT_ID = 'p2p-circuit';
export const CHAT_PROTOCOL = '/chat/1.0.0'; export const CHAT_PROTOCOL = '/chat/1.0.0';
export const ERR_PROTOCOL_SELECTION = 'protocol selection failed'; const ERR_PEER_ALREADY_TAGGED = 'Peer already tagged';
type PeerIdObj = { type PeerIdObj = {
id: string id: string
@ -168,6 +169,8 @@ export class Peer {
}); });
// Listen for peers disconnecting // Listen for peers disconnecting
// peer:disconnect event is trigerred when all connections to a peer close
// https://github.com/libp2p/js-libp2p-interfaces/blob/master/packages/interface-libp2p/src/index.ts#L64
this._node.addEventListener('peer:disconnect', (evt) => { this._node.addEventListener('peer:disconnect', (evt) => {
console.log('event peer:disconnect', evt); console.log('event peer:disconnect', evt);
this._handleDisconnect(evt.detail); this._handleDisconnect(evt.detail);
@ -280,28 +283,33 @@ export class Peer {
async _dialRelay (): Promise<void> { async _dialRelay (): Promise<void> {
assert(this._node); assert(this._node);
const relayMultiaddr = this._relayNodeMultiaddr; const relayMultiaddr = this._relayNodeMultiaddr;
console.log('Dialling relay node');
const connection = await dialWithRetry(
this._node,
relayMultiaddr,
{
redialDelay: RELAY_REDIAL_DELAY,
maxRetry: Infinity
}
);
// Keep dialling relay node until it connects
while (true) {
try {
console.log(`Dialling relay node ${relayMultiaddr.getPeerId()} using multiaddr ${relayMultiaddr.toString()}`);
const connection = await this._node.dial(relayMultiaddr);
const relayPeerId = connection.remotePeer; const relayPeerId = connection.remotePeer;
// TODO: Check if tag already exists. When checking tags issue with relay node connect event
// Tag the relay node with a high value to prioritize it's connection // Tag the relay node with a high value to prioritize it's connection
// in connection pruning on crossing peer's maxConnections limit // in connection pruning on crossing peer's maxConnections limit
this._node.peerStore.tagPeer(relayPeerId, RELAY_TAG.tag, { value: RELAY_TAG.value }); this._node.peerStore.tagPeer(relayPeerId, RELAY_TAG.tag, { value: RELAY_TAG.value }).catch((err: Error) => {
// TODO: Check if tag already exists
// If awaited on the getTags / tagPeer method, relay node connect event is not triggered
// const peerTags = await this._node.peerStore.getTags(relayPeerId);
break; // Ignore the error thrown on retagging a peer on reconnect
} catch (err) { if (err.message === ERR_PEER_ALREADY_TAGGED) {
console.log(`Could not dial relay ${relayMultiaddr.toString()}`, err); return;
}
// TODO: Use wait method from util package. throw err;
// Issue using util package in react app. });
await new Promise(resolve => setTimeout(resolve, RELAY_REDIAL_DELAY));
}
}
} }
_isRelayPeerMultiaddr (multiaddrString: string): boolean { _isRelayPeerMultiaddr (multiaddrString: string): boolean {
@ -411,9 +419,6 @@ export class Peer {
this._numRelayConnections--; this._numRelayConnections--;
} }
const peerConnections = this._node.getConnections(disconnectedPeerId);
if (!peerConnections.length) {
// Stop connection check for disconnected peer // Stop connection check for disconnected peer
this._peerHeartbeatChecker?.stop(disconnectedPeerId); this._peerHeartbeatChecker?.stop(disconnectedPeerId);
@ -422,7 +427,6 @@ export class Peer {
await this._dialRelay(); await this._dialRelay();
} }
} }
}
async _connectPeer (peer: PeerInfo): Promise<void> { async _connectPeer (peer: PeerInfo): Promise<void> {
assert(this._node); assert(this._node);

View File

@ -20,23 +20,30 @@ import type { Connection } from '@libp2p/interface-connection';
import { multiaddr } from '@multiformats/multiaddr'; import { multiaddr } from '@multiformats/multiaddr';
import type { PeerId } from '@libp2p/interface-peer-id'; import type { PeerId } from '@libp2p/interface-peer-id';
import { HOP_TIMEOUT, PUBSUB_DISCOVERY_INTERVAL, PUBSUB_SIGNATURE_POLICY, WEBRTC_PORT_RANGE } from './constants.js'; import { HOP_TIMEOUT, PUBSUB_DISCOVERY_INTERVAL, PUBSUB_SIGNATURE_POLICY, WEBRTC_PORT_RANGE, RELAY_REDIAL_DELAY } from './constants.js';
import { PeerHearbeatChecker } from './peer-heartbeat-checker.js'; import { PeerHearbeatChecker } from './peer-heartbeat-checker.js';
import { dialWithRetry } from './utils/index.js';
const log = debug('laconic:relay'); const log = debug('laconic:relay');
const DEFAULT_HOST = '127.0.0.1';
const DEFAULT_PORT = 9090;
const DEFAULT_MAX_DIAL_RETRY = 5;
interface Arguments { interface Arguments {
host: string; host: string;
port: number; port: number;
announce: string; announce?: string;
peerIdFile: string; peerIdFile?: string;
relayPeers: string; relayPeers?: string;
maxDialRetry: number;
} }
async function main (): Promise<void> { async function main (): Promise<void> {
const argv: Arguments = _getArgv(); const argv: Arguments = _getArgv();
let peerId: PeerId | undefined;
let relayPeersList: string[] = [];
let peerId: any;
if (argv.peerIdFile) { if (argv.peerIdFile) {
const peerIdFilePath = path.resolve(argv.peerIdFile); const peerIdFilePath = path.resolve(argv.peerIdFile);
console.log(`Reading peer id from file ${peerIdFilePath}`); console.log(`Reading peer id from file ${peerIdFilePath}`);
@ -48,6 +55,19 @@ async function main (): Promise<void> {
console.log('Creating a new peer id'); console.log('Creating a new peer id');
} }
if (argv.relayPeers) {
const relayPeersFilePath = path.resolve(argv.relayPeers);
if (!fs.existsSync(relayPeersFilePath)) {
console.log(`File at given path ${relayPeersFilePath} not found, exiting`);
process.exit();
}
console.log(`Reading relay peer multiaddr(s) from file ${relayPeersFilePath}`);
const relayPeersListObj = fs.readFileSync(relayPeersFilePath, 'utf-8');
relayPeersList = JSON.parse(relayPeersListObj);
}
const listenMultiaddrs = [`/ip4/${argv.host}/tcp/${argv.port}/http/p2p-webrtc-direct`]; const listenMultiaddrs = [`/ip4/${argv.host}/tcp/${argv.port}/http/p2p-webrtc-direct`];
const announceMultiaddrs = []; const announceMultiaddrs = [];
@ -114,43 +134,52 @@ async function main (): Promise<void> {
}); });
// Listen for peers disconnecting // Listen for peers disconnecting
node.addEventListener('peer:disconnect', (evt) => { // peer:disconnect event is trigerred when all connections to a peer close
// console.log('event peer:disconnect', evt); // https://github.com/libp2p/js-libp2p-interfaces/blob/master/packages/interface-libp2p/src/index.ts#L64
node.addEventListener('peer:disconnect', async (evt) => {
log('event peer:disconnect', evt);
// Log disconnected peer // Log disconnected peer
const connection: Connection = evt.detail; const connection: Connection = evt.detail;
log(`Disconnected from ${connection.remotePeer.toString()} using multiaddr ${connection.remoteAddr.toString()}`); const remoteAddr = connection.remoteAddr;
log(`Disconnected from ${connection.remotePeer.toString()} using multiaddr ${remoteAddr.toString()}`);
// Stop connection check for disconnected peer
peerHeartbeatChecker.stop(connection.remotePeer);
// Redial if disconnected peer is in relayPeers list
if (relayPeersList.includes(remoteAddr.toString())) {
await dialWithRetry(
node,
remoteAddr,
{
redialDelay: RELAY_REDIAL_DELAY,
maxRetry: argv.maxDialRetry
}
).catch((error: Error) => console.log(error.message));
}
}); });
if (argv.relayPeers) { if (relayPeersList.length) {
const relayPeersFilePath = path.resolve(argv.relayPeers); console.log('Dialling relay peers');
await _dialRelayPeers(node, relayPeersList, argv.maxDialRetry);
if (!fs.existsSync(relayPeersFilePath)) {
console.log(`File at given path ${relayPeersFilePath} not found, exiting`);
process.exit();
}
console.log(`Reading relay peer multiaddr(s) from file ${relayPeersFilePath}`);
const relayPeersListObj = fs.readFileSync(relayPeersFilePath, 'utf-8');
const relayPeersList: string[] = JSON.parse(relayPeersListObj);
await _dialRelayPeers(node, relayPeersList);
} }
} }
function _getArgv (): any { function _getArgv (): Arguments {
return yargs(hideBin(process.argv)).parserConfiguration({ return yargs(hideBin(process.argv)).parserConfiguration({
'parse-numbers': false 'parse-numbers': false
}).options({ }).options({
host: { host: {
type: 'string', type: 'string',
alias: 'h', alias: 'h',
default: '127.0.0.1', default: DEFAULT_HOST,
describe: 'Host to bind to' describe: 'Host to bind to'
}, },
port: { port: {
type: 'number', type: 'number',
alias: 'p', alias: 'p',
default: '9090', default: DEFAULT_PORT,
describe: 'Port to start listening on' describe: 'Port to start listening on'
}, },
announce: { announce: {
@ -167,21 +196,27 @@ function _getArgv (): any {
type: 'string', type: 'string',
alias: 'r', alias: 'r',
describe: 'Relay peer multiaddr(s) list file path (json)' describe: 'Relay peer multiaddr(s) list file path (json)'
},
maxDialRetry: {
type: 'number',
describe: 'Maximum number of retries for dialling a relay peer',
default: DEFAULT_MAX_DIAL_RETRY
} }
}).argv; // https://github.com/yargs/yargs/blob/main/docs/typescript.md?plain=1#L83
}).parseSync();
} }
async function _dialRelayPeers (node: Libp2p, relayPeersList: string[]): Promise<void> { async function _dialRelayPeers (node: Libp2p, relayPeersList: string[], maxDialRetry: number): Promise<void> {
relayPeersList.forEach(async (relayPeer) => { relayPeersList.forEach(async (relayPeer) => {
const relayMultiaddr = multiaddr(relayPeer); const relayMultiaddr = multiaddr(relayPeer);
const peerIdString = relayMultiaddr.getPeerId()?.toString(); await dialWithRetry(
node,
try { relayMultiaddr,
console.log(`Dialling relay node ${peerIdString} using multiaddr ${relayMultiaddr.toString()}`); {
await node.dial(relayMultiaddr); redialDelay: RELAY_REDIAL_DELAY,
} catch (err: any) { maxRetry: maxDialRetry
console.log(`Could not dial ${peerIdString}`, err);
} }
).catch((error: Error) => console.log(error.message));
}); });
} }

View File

@ -0,0 +1,49 @@
//
// Copyright 2023 Vulcanize, Inc.
//
import { Libp2p } from '@cerc-io/libp2p';
import { Multiaddr } from '@multiformats/multiaddr';
interface DialWithRetryOptions {
redialDelay: number
maxRetry: number
}
const DEFAULT_DIAL_RETRY_OPTIONS: DialWithRetryOptions = {
redialDelay: 5000, // ms
maxRetry: 5
};
/**
* Method to dial remote peer multiaddr with retry on failure
* Number of retries can be configured using options.maxRetry
* @param node
* @param multiaddr
* @param options
*/
export const dialWithRetry = async (node: Libp2p, multiaddr: Multiaddr, options: Partial<DialWithRetryOptions>) => {
const { redialDelay, maxRetry } = {
...DEFAULT_DIAL_RETRY_OPTIONS,
...options
};
// Keep dialling node until it connects
for (let i = 0; i < maxRetry; i++) {
try {
console.log(`Dialling node ${multiaddr.getPeerId()} using multiaddr ${multiaddr.toString()}`);
const connection = await node.dial(multiaddr);
return connection;
} catch (err) {
console.log(`Could not dial node ${multiaddr.toString()}`, err);
console.log(`Retrying after ${redialDelay}ms`);
// TODO: Use wait method from util package.
// Issue using util package in react app.
await new Promise(resolve => setTimeout(resolve, redialDelay));
}
}
throw new Error(`Stopping dial retry after ${maxRetry} attempts for multiaddr ${multiaddr.toString()}`);
};