Improve connection check with peers by retrying ping on failures (#330)

* Improve connection check with peers by retrying ping on failures

* Use DEFAULT_PING_INTERVAL in cli

* Log only error message

* Add config option to pass peer id file path

* Finish retrying pings before connection check interval duration

* Handle duplicate connections to relay nodes

* Increase default max dial retries from one relay to another

* Update connection manager config for relay nodes

* Use debug for logs in relay node

---------

Co-authored-by: prathamesh0 <prathamesh.musale0@gmail.com>
This commit is contained in:
Nabarun Gogoi 2023-02-27 12:47:49 +05:30 committed by GitHub
parent 888199b717
commit 90d60f54a6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 143 additions and 53 deletions

View File

@ -154,7 +154,8 @@ export class ServerCmd {
RELAY_DEFAULT_PORT,
RELAY_DEFAULT_MAX_DIAL_RETRY,
RELAY_REDIAL_INTERVAL,
PING_INTERVAL
DEFAULT_PING_INTERVAL,
DIAL_TIMEOUT
} = await import('@cerc-io/peer');
// Run the relay node if enabled
@ -172,7 +173,8 @@ export class ServerCmd {
port: relayConfig.port ?? RELAY_DEFAULT_PORT,
announceDomain: relayConfig.announce,
relayPeers: relayConfig.relayPeers ?? [],
pingInterval: relayConfig.pingInterval ?? PING_INTERVAL,
dialTimeout: relayConfig.dialTimeout ?? DIAL_TIMEOUT,
pingInterval: relayConfig.pingInterval ?? DEFAULT_PING_INTERVAL,
redialInterval: relayConfig.redialInterval ?? RELAY_REDIAL_INTERVAL,
maxDialRetry: relayConfig.maxDialRetry ?? RELAY_DEFAULT_MAX_DIAL_RETRY,
peerIdObj
@ -185,6 +187,11 @@ export class ServerCmd {
const peerConfig = p2pConfig.peer;
assert(peerConfig, 'Peer config not set');
let peerIdObj: PeerIdObj | undefined;
if (peerConfig.peerIdFile) {
peerIdObj = readPeerId(peerConfig.peerIdFile);
}
const peer = new Peer(peerConfig.relayMultiaddr, true);
const peerNodeInit: PeerInitConfig = {
@ -195,7 +202,7 @@ export class ServerCmd {
maxConnections: peerConfig.maxConnections,
dialTimeout: peerConfig.dialTimeout
};
await peer.init(peerNodeInit);
await peer.init(peerNodeInit, peerIdObj);
peer.subscribeTopic(peerConfig.pubSubTopic, (peerId, data) => {
if (parseLibp2pMessage) {

View File

@ -23,12 +23,12 @@
"lint": "eslint .",
"dev": "node dist/index.js",
"create-peer": "node dist/cli/create-peer.js",
"relay-node": "node dist/cli/relay.js"
"relay-node": "DEBUG='laconic:*' node dist/cli/relay.js"
},
"dependencies": {
"@cerc-io/libp2p": "0.42.2-laconic-0.1.1",
"@cerc-io/prometheus-metrics": "1.1.4",
"@cerc-io/webrtc-direct": "^5.0.0-laconic-0.1.3",
"@cerc-io/webrtc-direct": "^5.0.0-laconic-0.1.4",
"@chainsafe/libp2p-noise": "^11.0.0",
"@libp2p/floodsub": "^6.0.0",
"@libp2p/mplex": "^7.1.1",

View File

@ -5,7 +5,14 @@ import path from 'path';
import { RelayNodeInitConfig, createRelayNode } from '../relay.js';
import { PeerIdObj } from '../peer.js';
import { RELAY_DEFAULT_HOST, RELAY_DEFAULT_PORT, RELAY_DEFAULT_MAX_DIAL_RETRY, RELAY_REDIAL_INTERVAL, PING_INTERVAL } from '../constants.js';
import {
RELAY_DEFAULT_HOST,
RELAY_DEFAULT_PORT,
RELAY_DEFAULT_MAX_DIAL_RETRY,
RELAY_REDIAL_INTERVAL,
DEFAULT_PING_INTERVAL,
DIAL_TIMEOUT
} from '../constants.js';
interface Arguments {
host: string;
@ -13,6 +20,7 @@ interface Arguments {
announce?: string;
peerIdFile?: string;
relayPeers?: string;
dialTimeout: number;
pingInterval: number;
redialInterval: number;
maxDialRetry: number;
@ -52,6 +60,7 @@ async function main (): Promise<void> {
peerIdObj,
announceDomain: argv.announce,
relayPeers: relayPeersList,
dialTimeout: argv.dialTimeout,
pingInterval: argv.pingInterval,
redialInterval: argv.redialInterval,
maxDialRetry: argv.maxDialRetry
@ -93,7 +102,12 @@ function _getArgv (): Arguments {
pingInterval: {
type: 'number',
describe: 'Interval to check relay peer connections using ping (ms)',
default: PING_INTERVAL
default: DEFAULT_PING_INTERVAL
},
dialTimeout: {
type: 'number',
describe: 'Timeout for dial to relay peers (ms)',
default: DIAL_TIMEOUT
},
redialInterval: {
type: 'number',

View File

@ -22,11 +22,11 @@ export const RELAY_TAG = {
};
// Interval in ms to check peer connections using ping
export const PING_INTERVAL = 10000; // 10 seconds
export const DEFAULT_PING_INTERVAL = 10000; // 10 seconds
// Ping timeout used to check if connection is alive
// Should be less than PING_INTERVAL
export const PING_TIMEOUT = 5000; // 5 seconds
// Should be less than DEFAULT_PING_INTERVAL
export const DEFAULT_PING_TIMEOUT = 3000; // 3 seconds
// Redial interval (in ms) to relay node on connection failure
export const RELAY_REDIAL_INTERVAL = 5000; // 5 seconds
@ -64,4 +64,4 @@ export const RELAY_DEFAULT_HOST = '127.0.0.1';
export const RELAY_DEFAULT_PORT = 9090;
// Default max number of dial retries to a relay peer
export const RELAY_DEFAULT_MAX_DIAL_RETRY = 5;
export const RELAY_DEFAULT_MAX_DIAL_RETRY = 10;

View File

@ -10,5 +10,6 @@ export {
RELAY_DEFAULT_PORT,
RELAY_REDIAL_INTERVAL,
RELAY_DEFAULT_MAX_DIAL_RETRY,
PING_INTERVAL
DEFAULT_PING_INTERVAL,
DIAL_TIMEOUT
} from './constants.js';

View File

@ -4,25 +4,35 @@
import { Libp2p } from '@cerc-io/libp2p';
import type { PeerId } from '@libp2p/interface-peer-id';
import debug from 'debug';
import { PING_INTERVAL } from './constants.js';
import { DEFAULT_PING_INTERVAL, DEFAULT_PING_TIMEOUT } from './constants.js';
const log = debug('laconic:peer-heartbeat-checker');
interface PeerData {
intervalId: NodeJS.Timer;
latencyValues: Array<number>;
}
interface PeerHearbeatCheckerOptions {
pingInterval: number;
pingTimeout: number;
}
/**
* Used for tracking heartbeat of connected remote peers
*/
export class PeerHearbeatChecker {
_node: Libp2p;
_pingInterval: number;
_pingTimeout: number;
_peerMap: Map<string, PeerData> = new Map()
constructor (node: Libp2p, pingInterval = PING_INTERVAL) {
constructor (node: Libp2p, options: Partial<PeerHearbeatCheckerOptions> = {}) {
this._node = node;
this._pingInterval = pingInterval;
this._pingInterval = options.pingInterval ?? DEFAULT_PING_INTERVAL;
this._pingTimeout = options.pingTimeout ?? DEFAULT_PING_TIMEOUT;
}
/**
@ -101,24 +111,45 @@ export class PeerHearbeatChecker {
* @param handleDisconnect
*/
async _validatePing (peerId: PeerId, handleDisconnect: () => Promise<void>): Promise<void> {
try {
// Ping remote peer
const latency = await this._node.ping(peerId);
// Number of retries depends on the ping interval and ping timeout
const pingRetriesOnFail = Math.floor(this._pingInterval / this._pingTimeout);
let pingSuccess = false;
const latencyValues = this._peerMap.get(peerId.toString())?.latencyValues;
// Loop to retry ping on failure and confirm that there is no connection
// Loop breaks on a successful ping pong
for (let i = 0; !pingSuccess && (i < pingRetriesOnFail); i++) {
const retryDelayPromise = new Promise(resolve => setTimeout(resolve, this._pingTimeout));
if (latencyValues) {
const length = latencyValues.unshift(latency);
try {
// Ping remote peer
const latency = await this._node.ping(peerId);
pingSuccess = true;
if (length > 5) {
latencyValues.pop();
const latencyValues = this._peerMap.get(peerId.toString())?.latencyValues;
if (latencyValues) {
// Update latency values with latest
const length = latencyValues.unshift(latency);
if (length > 5) {
// Pop oldest latency value from list
latencyValues.pop();
}
}
}
} catch (err) {
// On error i.e. no pong
console.log(`Not connected to peer ${peerId.toString()}`);
} catch (err: any) {
// On error i.e. no pong
log(err?.message);
await handleDisconnect();
// Retry after a delay of pingTimeout in case ping fails immediately
await retryDelayPromise;
}
}
if (pingSuccess) {
return;
}
console.log(`Not connected to peer ${peerId.toString()}`);
await handleDisconnect();
}
}

View File

@ -38,7 +38,7 @@ import {
RELAY_TAG,
RELAY_REDIAL_INTERVAL,
DEFAULT_MAX_RELAY_CONNECTIONS,
PING_TIMEOUT
DEFAULT_PING_TIMEOUT
} from './constants.js';
import { PeerHearbeatChecker } from './peer-heartbeat-checker.js';
import { dialWithRetry } from './utils/index.js';
@ -72,7 +72,6 @@ export class Peer {
_relayNodeMultiaddr: Multiaddr
_numRelayConnections = 0
_pingInterval?: number
_relayRedialInterval?: number
_maxRelayConnections?: number
@ -87,6 +86,8 @@ _peerStreamMap: Map<string, Pushable<any>> = new Map()
const relayPeerId = this._relayNodeMultiaddr.getPeerId();
assert(relayPeerId);
console.log(`Using peer ${relayPeerId.toString()} as the primary relay node`);
const initOptions: WebRTCDirectInit = {
wrtc: nodejs ? wrtc : undefined, // Instantiation in nodejs
enableSignalling: true,
@ -113,9 +114,9 @@ _peerStreamMap: Map<string, Pushable<any>> = new Map()
}
async init (initOptions: PeerInitConfig, peerIdObj?: PeerIdObj): Promise<void> {
this._pingInterval = initOptions.pingInterval;
this._relayRedialInterval = initOptions.relayRedialInterval;
this._maxRelayConnections = initOptions.maxRelayConnections;
const pingTimeout = initOptions.pingTimeout ?? DEFAULT_PING_TIMEOUT;
try {
let peerId: PeerId | undefined;
@ -156,7 +157,7 @@ _peerStreamMap: Map<string, Pushable<any>> = new Map()
keepMultipleConnections: true // Set true to get connections with multiple multiaddr
},
ping: {
timeout: initOptions.pingTimeout ?? PING_TIMEOUT
timeout: pingTimeout
},
metrics: () => this._metrics
});
@ -166,7 +167,13 @@ _peerStreamMap: Map<string, Pushable<any>> = new Map()
}
console.log('libp2p node created', this._node);
this._peerHeartbeatChecker = new PeerHearbeatChecker(this._node, this._pingInterval);
this._peerHeartbeatChecker = new PeerHearbeatChecker(
this._node,
{
pingInterval: initOptions.pingInterval,
pingTimeout
}
);
// Dial to the HOP enabled primary relay node
await this._dialRelay(this._relayRedialInterval);
@ -399,7 +406,9 @@ _peerStreamMap: Map<string, Pushable<any>> = new Map()
// Log connected peer
console.log(`Connected to ${remotePeerIdString} using multiaddr ${remoteAddrString}`);
if (this.isRelayPeerMultiaddr(remoteAddrString)) {
const isRemoteARelayPeer = this.isRelayPeerMultiaddr(remoteAddrString);
if (isRemoteARelayPeer) {
this._numRelayConnections++;
// Check if relay connections limit has already been reached
@ -411,22 +420,22 @@ _peerStreamMap: Map<string, Pushable<any>> = new Map()
}
// Manage connections and streams
// Check if peer id is smaller to break symmetry
if (this._node.peerId.toString() < remotePeerIdString) {
// Check if peer id is smaller to break symmetry in case of peer nodes
if (isRemoteARelayPeer || this._node.peerId.toString() < remotePeerIdString) {
const remoteConnections = this._node.getConnections(remotePeerId);
// Keep only one connection with a peer
if (remoteConnections.length > 1) {
// Close new connection if using relayed multiaddr
if (connection.remoteAddr.protoNames().includes(P2P_CIRCUIT_ID)) {
console.log('Closing new relayed connection in favor of existing connection');
console.log(`Closing new relayed connection with ${remotePeerIdString} in favor of existing connection`);
await connection.close();
console.log('Closed');
return;
}
console.log('Closing exisiting connections in favor of new webrtc connection');
console.log(`Closing exisiting connections with ${remotePeerIdString} in favor of new webrtc connection`);
// Close existing connections if new connection is not using relayed multiaddr (so it is a webrtc connection)
const closeConnectionPromises = remoteConnections.filter(remoteConnection => remoteConnection.id !== connection.id)
.map(remoteConnection => remoteConnection.close());

View File

@ -16,7 +16,14 @@ import { multiaddr } from '@multiformats/multiaddr';
import type { PeerId } from '@libp2p/interface-peer-id';
import { createFromJSON } from '@libp2p/peer-id-factory';
import { HOP_TIMEOUT, PUBSUB_DISCOVERY_INTERVAL, PUBSUB_SIGNATURE_POLICY, WEBRTC_PORT_RANGE } from './constants.js';
import {
HOP_TIMEOUT,
DEFAULT_PING_TIMEOUT,
PUBSUB_DISCOVERY_INTERVAL,
PUBSUB_SIGNATURE_POLICY,
WEBRTC_PORT_RANGE,
MAX_CONCURRENT_DIALS_PER_PEER
} from './constants.js';
import { PeerHearbeatChecker } from './peer-heartbeat-checker.js';
import { dialWithRetry } from './utils/index.js';
import { PeerIdObj } from './peer.js';
@ -29,7 +36,9 @@ export interface RelayNodeInitConfig {
peerIdObj?: PeerIdObj;
announceDomain?: string;
relayPeers: string[];
dialTimeout: number;
pingInterval: number;
pingTimeout?: number;
redialInterval: number;
maxDialRetry: number;
}
@ -47,6 +56,8 @@ export async function createRelayNode (init: RelayNodeInitConfig): Promise<Libp2
peerId = await createFromJSON(init.peerIdObj);
}
const pingTimeout = init.pingTimeout ?? DEFAULT_PING_TIMEOUT;
const node = await createLibp2p({
peerId,
addresses: {
@ -81,19 +92,30 @@ export async function createRelayNode (init: RelayNodeInitConfig): Promise<Libp2
}
},
connectionManager: {
autoDial: false
maxDialsPerPeer: MAX_CONCURRENT_DIALS_PER_PEER,
autoDial: false,
dialTimeout: init.dialTimeout
},
ping: {
timeout: pingTimeout
}
});
const peerHeartbeatChecker = new PeerHearbeatChecker(node, init.pingInterval);
const peerHeartbeatChecker = new PeerHearbeatChecker(
node,
{
pingInterval: init.pingInterval,
pingTimeout
}
);
console.log(`Relay node started with id ${node.peerId.toString()}`);
console.log('Listening on:');
node.getMultiaddrs().forEach((ma) => console.log(ma.toString()));
log(`Relay node started with id ${node.peerId.toString()}`);
log('Listening on:');
node.getMultiaddrs().forEach((ma) => log(ma.toString()));
// Listen for peers connection
node.addEventListener('peer:connect', async (evt) => {
// console.log('event peer:connect', evt);
// log('event peer:connect', evt);
// Log connected peer
const connection: Connection = evt.detail;
log(`Connected to ${connection.remotePeer.toString()} using multiaddr ${connection.remoteAddr.toString()}`);
@ -109,7 +131,7 @@ export async function createRelayNode (init: RelayNodeInitConfig): Promise<Libp2
// peer:disconnect event is trigerred when all connections to a peer close
// https://github.com/libp2p/js-libp2p-interfaces/blob/master/packages/interface-libp2p/src/index.ts#L64
node.addEventListener('peer:disconnect', async (evt) => {
log('event peer:disconnect', evt);
// log('event peer:disconnect', evt);
// Log disconnected peer
const connection: Connection = evt.detail;
@ -128,12 +150,12 @@ export async function createRelayNode (init: RelayNodeInitConfig): Promise<Libp2
redialInterval: init.redialInterval,
maxRetry: init.maxDialRetry
}
).catch((error: Error) => console.log(error.message));
).catch((error: Error) => log(error.message));
}
});
if (init.relayPeers.length) {
console.log('Dialling relay peers');
log('Dialling relay peers');
await _dialRelayPeers(node, init.relayPeers, init.maxDialRetry, init.redialInterval);
}
@ -150,7 +172,7 @@ async function _dialRelayPeers (node: Libp2p, relayPeersList: string[], maxDialR
redialInterval,
maxRetry: maxDialRetry
}
).catch((error: Error) => console.log(error.message));
).catch((error: Error) => log(error.message));
});
}

View File

@ -53,6 +53,9 @@ export interface RelayConfig {
// Relay peer multiaddr(s) list
relayPeers?: string[];
// Timeout (ms) for dial to relay peers
dialTimeout?: number;
// Interval in ms to check relay peer connections using ping
pingInterval?: number;
@ -88,6 +91,9 @@ export interface PeerConfig {
// Timeout (ms) for dial to peers
dialTimeout?: number;
// Peer id file path (json)
peerIdFile?: string;
}
// P2P config

View File

@ -385,10 +385,10 @@
it-stream-types "^1.0.4"
promjs "^0.4.2"
"@cerc-io/webrtc-direct@^5.0.0-laconic-0.1.3":
version "5.0.0-laconic-0.1.3"
resolved "https://git.vdb.to/api/packages/cerc-io/npm/%40cerc-io%2Fwebrtc-direct/-/5.0.0-laconic-0.1.3/webrtc-direct-5.0.0-laconic-0.1.3.tgz#14802ba88899c904bddc327082d96cb541523ffb"
integrity sha512-HiRn2eoXbOFM2Dklecr+H76BB0H1H/k4I59Hnjj7tppdlr6wwts9MuA/SX0draXYEiRA5Ft4vXrfo469fGS68A==
"@cerc-io/webrtc-direct@^5.0.0-laconic-0.1.4":
version "5.0.0-laconic-0.1.4"
resolved "https://git.vdb.to/api/packages/cerc-io/npm/%40cerc-io%2Fwebrtc-direct/-/5.0.0-laconic-0.1.4/webrtc-direct-5.0.0-laconic-0.1.4.tgz#5cd7ab544c58a7785f126959f02b30869b643b90"
integrity sha512-5HTnSBc7WoH4Y9SZskUxVAqXvTvgJe6+i8YCESwk2+kimDXo9W8y6C+bZm7B2apK/G9UXkWMMHGrx3w6bgRBEg==
dependencies:
"@cerc-io/webrtc-peer" "^2.0.2-laconic-0.1.4"
"@libp2p/interface-transport" "^2.0.0"