Use browser metrics package in peer (#322)

* Use browser metrics package

* Get latency from heartbeat check

* Use metrics in nodejs CLI

* Avoid disconnect from primary relay on reaching limit

* Fix relay connections count
This commit is contained in:
Nabarun Gogoi 2023-02-20 11:19:57 +05:30 committed by GitHub
parent 83ad5d80a7
commit 054600ccc4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 107 additions and 35 deletions

View File

@ -28,6 +28,7 @@
"dependencies": {
"@cerc-io/libp2p": "0.42.2-laconic-0.1.1",
"@cerc-io/webrtc-direct": "^5.0.0-laconic-0.1.3",
"@cerc-io/prometheus-metrics": "1.1.4",
"@chainsafe/libp2p-noise": "^11.0.0",
"@libp2p/floodsub": "^6.0.0",
"@libp2p/mplex": "^7.1.1",

View File

@ -7,12 +7,17 @@ import type { PeerId } from '@libp2p/interface-peer-id';
import { CONN_CHECK_INTERVAL } from './constants.js';
interface PeerData {
intervalId: NodeJS.Timer;
latencyValues: Array<number>;
}
/**
* Used for tracking heartbeat of connected remote peers
*/
export class PeerHearbeatChecker {
_node: Libp2p;
_peerHeartbeatIntervalIdsMap: Map<string, NodeJS.Timer> = new Map();
_peerMap: Map<string, PeerData> = new Map()
constructor (node: Libp2p) {
this._node = node;
@ -24,29 +29,68 @@ export class PeerHearbeatChecker {
* @param handleDisconnect
*/
async start (peerId: PeerId, handleDisconnect: () => Promise<void>): Promise<void> {
if (this._peerHeartbeatIntervalIdsMap.has(peerId.toString())) {
const peerIdString = peerId.toString();
if (this._peerMap.has(peerIdString)) {
// Do not start connection check interval if already present
return;
}
const handlePingDisconnect = async () => {
// Check if connection check interval for peer is already cleared
if (!this._peerMap.get(peerIdString)) {
return;
}
// Clear and remove check interval for remote peer if not connected
this.stop(peerId);
await handleDisconnect();
};
const intervalId = setInterval(async () => {
await this._validatePing(
peerId,
async () => {
// Check if connection check interval for peer is already cleared
if (!this._peerHeartbeatIntervalIdsMap.has(peerId.toString())) {
return;
}
// Clear and remove check interval for remote peer if not connected
this.stop(peerId);
await handleDisconnect();
}
handlePingDisconnect
);
}, CONN_CHECK_INTERVAL);
this._peerHeartbeatIntervalIdsMap.set(peerId.toString(), intervalId);
this._peerMap.set(
peerIdString,
{
intervalId,
latencyValues: []
}
);
await this._validatePing(
peerId,
handlePingDisconnect
);
}
/**
* Method to stop heartbeat checks for a peer
* @param peerId
*/
stop (peerId: PeerId): void {
// Clear check interval for disconnected peer
const peerData = this._peerMap.get(peerId.toString());
if (peerData) {
clearInterval(peerData.intervalId);
}
this._peerMap.delete(peerId.toString());
}
/**
* Get latency data for peer
*/
getLatencyData (peerId: PeerId): Array<number> {
const latencyValues = this._peerMap.get(peerId.toString())?.latencyValues;
return latencyValues ?? [];
}
/**
@ -57,7 +101,17 @@ export class PeerHearbeatChecker {
async _validatePing (peerId: PeerId, handleDisconnect: () => Promise<void>): Promise<void> {
try {
// Ping remote peer
await this._node.ping(peerId);
const latency = await this._node.ping(peerId);
const latencyValues = this._peerMap.get(peerId.toString())?.latencyValues;
if (latencyValues) {
const length = latencyValues.unshift(latency);
if (length > 5) {
latencyValues.pop();
}
}
} catch (err) {
// On error i.e. no pong
console.log(`Not connected to peer ${peerId.toString()}`);
@ -65,19 +119,4 @@ export class PeerHearbeatChecker {
await handleDisconnect();
}
}
/**
* Method to stop heartbeat checks for a peer
* @param peerId
*/
stop (peerId: PeerId): void {
// Clear check interval for disconnected peer
const intervalId = this._peerHeartbeatIntervalIdsMap.get(peerId.toString());
if (intervalId) {
clearInterval(intervalId);
}
this._peerHeartbeatIntervalIdsMap.delete(peerId.toString());
}
}

View File

@ -26,6 +26,7 @@ import { createFromJSON, createEd25519PeerId } from '@libp2p/peer-id-factory';
import { multiaddr, Multiaddr } from '@multiformats/multiaddr';
import { floodsub } from '@libp2p/floodsub';
import { pubsubPeerDiscovery } from '@libp2p/pubsub-peer-discovery';
import { PrometheusMetrics } from '@cerc-io/prometheus-metrics';
import {
MAX_CONCURRENT_DIALS_PER_PEER,
@ -63,6 +64,7 @@ export class Peer {
_peerStreamMap: Map<string, Pushable<any>> = new Map()
_messageHandlers: Array<(peerId: PeerId, message: any) => void> = []
_topicHandlers: Map<string, Array<(peerId: PeerId, data: any) => void>> = new Map()
_metrics = new PrometheusMetrics()
constructor (relayNodeURL: string, nodejs?: boolean) {
this._relayNodeMultiaddr = multiaddr(relayNodeURL);
@ -91,6 +93,10 @@ export class Peer {
return this._relayNodeMultiaddr;
}
get metrics (): PrometheusMetrics {
return this._metrics;
}
async init (
peerIdObj?: PeerIdObj,
maxRelayConnections = DEFAULT_MAX_RELAY_CONNECTIONS
@ -135,7 +141,8 @@ export class Peer {
},
ping: {
timeout: PING_TIMEOUT
}
},
metrics: () => this._metrics
});
} catch (err: any) {
console.log('Could not initialize a libp2p node', err);
@ -279,6 +286,14 @@ export class Peer {
return multiaddrString === this._relayNodeMultiaddr.toString();
}
getLatencyData (peerId: PeerId): Array<number> {
if (this._peerHeartbeatChecker) {
return this._peerHeartbeatChecker.getLatencyData(peerId);
}
return [];
}
async _handleChangeProtocols ({ peerId, protocols }: { peerId: PeerId, protocols: string[] }) {
assert(this._node);
@ -369,14 +384,14 @@ export class Peer {
console.log(`Connected to ${remotePeerIdString} using multiaddr ${remoteAddrString}`);
if (this.isRelayPeerMultiaddr(remoteAddrString)) {
this._numRelayConnections++;
// Check if relay connections limit has already been reached
if (this._numRelayConnections >= maxRelayConnections) {
if (this._numRelayConnections > maxRelayConnections && !this.isPrimaryRelay(remoteAddrString)) {
console.log(`Closing connection to relay ${remotePeerIdString} as max relay connections limit reached`);
await connection.close();
return;
}
this._numRelayConnections++;
}
// Manage connections and streams

View File

@ -373,6 +373,18 @@
wherearewe "^2.0.0"
xsalsa20 "^1.1.0"
"@cerc-io/prometheus-metrics@1.1.4":
version "1.1.4"
resolved "https://git.vdb.to/api/packages/cerc-io/npm/%40cerc-io%2Fprometheus-metrics/-/1.1.4/prometheus-metrics-1.1.4.tgz#51006b0b5bf6168394390c78072a1c0bb2b02f28"
integrity sha512-Mqg7o1Wer8zKv3/0NWB1sCMmW8hyYI0Fw58d/MR62+5EDZ2yPhwMUrLZUhyqdo3qXJzxMylAPSVx8URDcthmKA==
dependencies:
"@libp2p/interface-connection" "^3.0.2"
"@libp2p/interface-metrics" "^4.0.2"
"@libp2p/logger" "^2.0.2"
it-foreach "^1.0.0"
it-stream-types "^1.0.4"
promjs "^0.4.2"
"@cerc-io/webrtc-direct@^5.0.0-laconic-0.1.3":
version "5.0.0-laconic-0.1.3"
resolved "https://git.vdb.to/api/packages/cerc-io/npm/%40cerc-io%2Fwebrtc-direct/-/5.0.0-laconic-0.1.3/webrtc-direct-5.0.0-laconic-0.1.3.tgz#14802ba88899c904bddc327082d96cb541523ffb"
@ -2620,7 +2632,7 @@
interface-datastore "^7.0.0"
multiformats "^10.0.0"
"@libp2p/logger@^2.0.1", "@libp2p/logger@^2.0.5":
"@libp2p/logger@^2.0.1", "@libp2p/logger@^2.0.2", "@libp2p/logger@^2.0.5":
version "2.0.5"
resolved "https://registry.yarnpkg.com/@libp2p/logger/-/logger-2.0.5.tgz#cf0ee695ba21471fd085a7fda3e534e03946ad20"
integrity sha512-WEhxsc7+gsfuTcljI4vSgW/H2f18aBaC+JiO01FcX841Wxe9szjzHdBLDh9eqygUlzoK0LEeIBfctN7ibzus5A==
@ -13505,6 +13517,11 @@ promise-to-callback@^1.0.0:
is-fn "^1.0.0"
set-immediate-shim "^1.0.1"
promjs@^0.4.2:
version "0.4.2"
resolved "https://registry.yarnpkg.com/promjs/-/promjs-0.4.2.tgz#9c2b4a60e00c1a0ecb69a3c1c322d1cfb47a300d"
integrity sha512-qvHcTU9xwEieFOf2Qnf5JYPKkdJU2lRbJfJvJspw6XpnoH7VPmNfnJJnOLPfN8ODJMBLRt8wEPVjxyyn0Or6RQ==
promzard@^0.3.0:
version "0.3.0"
resolved "https://registry.npmjs.org/promzard/-/promzard-0.3.0.tgz"