diff --git a/packages/peer/package.json b/packages/peer/package.json index 43419345..ebcc8018 100644 --- a/packages/peer/package.json +++ b/packages/peer/package.json @@ -28,6 +28,7 @@ "dependencies": { "@cerc-io/libp2p": "0.42.2-laconic-0.1.1", "@cerc-io/webrtc-direct": "^5.0.0-laconic-0.1.3", + "@cerc-io/prometheus-metrics": "1.1.4", "@chainsafe/libp2p-noise": "^11.0.0", "@libp2p/floodsub": "^6.0.0", "@libp2p/mplex": "^7.1.1", diff --git a/packages/peer/src/peer-heartbeat-checker.ts b/packages/peer/src/peer-heartbeat-checker.ts index 0b9ad1c2..70af0a9e 100644 --- a/packages/peer/src/peer-heartbeat-checker.ts +++ b/packages/peer/src/peer-heartbeat-checker.ts @@ -7,12 +7,17 @@ import type { PeerId } from '@libp2p/interface-peer-id'; import { CONN_CHECK_INTERVAL } from './constants.js'; +interface PeerData { + intervalId: NodeJS.Timer; + latencyValues: Array; +} + /** * Used for tracking heartbeat of connected remote peers */ export class PeerHearbeatChecker { _node: Libp2p; - _peerHeartbeatIntervalIdsMap: Map = new Map(); + _peerMap: Map = new Map() constructor (node: Libp2p) { this._node = node; @@ -24,29 +29,68 @@ export class PeerHearbeatChecker { * @param handleDisconnect */ async start (peerId: PeerId, handleDisconnect: () => Promise): Promise { - if (this._peerHeartbeatIntervalIdsMap.has(peerId.toString())) { + const peerIdString = peerId.toString(); + + if (this._peerMap.has(peerIdString)) { // Do not start connection check interval if already present return; } + const handlePingDisconnect = async () => { + // Check if connection check interval for peer is already cleared + if (!this._peerMap.get(peerIdString)) { + return; + } + + // Clear and remove check interval for remote peer if not connected + this.stop(peerId); + + await handleDisconnect(); + }; + const intervalId = setInterval(async () => { await this._validatePing( peerId, - async () => { - // Check if connection check interval for peer is already cleared - if (!this._peerHeartbeatIntervalIdsMap.has(peerId.toString())) { - return; - } - - // Clear and remove check interval for remote peer if not connected - this.stop(peerId); - - await handleDisconnect(); - } + handlePingDisconnect ); }, CONN_CHECK_INTERVAL); - this._peerHeartbeatIntervalIdsMap.set(peerId.toString(), intervalId); + this._peerMap.set( + peerIdString, + { + intervalId, + latencyValues: [] + } + ); + + await this._validatePing( + peerId, + handlePingDisconnect + ); + } + + /** + * Method to stop heartbeat checks for a peer + * @param peerId + */ + stop (peerId: PeerId): void { + // Clear check interval for disconnected peer + const peerData = this._peerMap.get(peerId.toString()); + + if (peerData) { + clearInterval(peerData.intervalId); + } + + this._peerMap.delete(peerId.toString()); + } + + /** + * Get latency data for peer + */ + getLatencyData (peerId: PeerId): Array { + const latencyValues = this._peerMap.get(peerId.toString())?.latencyValues; + + return latencyValues ?? []; } /** @@ -57,7 +101,17 @@ export class PeerHearbeatChecker { async _validatePing (peerId: PeerId, handleDisconnect: () => Promise): Promise { try { // Ping remote peer - await this._node.ping(peerId); + const latency = await this._node.ping(peerId); + + const latencyValues = this._peerMap.get(peerId.toString())?.latencyValues; + + if (latencyValues) { + const length = latencyValues.unshift(latency); + + if (length > 5) { + latencyValues.pop(); + } + } } catch (err) { // On error i.e. no pong console.log(`Not connected to peer ${peerId.toString()}`); @@ -65,19 +119,4 @@ export class PeerHearbeatChecker { await handleDisconnect(); } } - - /** - * Method to stop heartbeat checks for a peer - * @param peerId - */ - stop (peerId: PeerId): void { - // Clear check interval for disconnected peer - const intervalId = this._peerHeartbeatIntervalIdsMap.get(peerId.toString()); - - if (intervalId) { - clearInterval(intervalId); - } - - this._peerHeartbeatIntervalIdsMap.delete(peerId.toString()); - } } diff --git a/packages/peer/src/peer.ts b/packages/peer/src/peer.ts index a9b6f5c1..87c30136 100644 --- a/packages/peer/src/peer.ts +++ b/packages/peer/src/peer.ts @@ -26,6 +26,7 @@ import { createFromJSON, createEd25519PeerId } from '@libp2p/peer-id-factory'; import { multiaddr, Multiaddr } from '@multiformats/multiaddr'; import { floodsub } from '@libp2p/floodsub'; import { pubsubPeerDiscovery } from '@libp2p/pubsub-peer-discovery'; +import { PrometheusMetrics } from '@cerc-io/prometheus-metrics'; import { MAX_CONCURRENT_DIALS_PER_PEER, @@ -63,6 +64,7 @@ export class Peer { _peerStreamMap: Map> = new Map() _messageHandlers: Array<(peerId: PeerId, message: any) => void> = [] _topicHandlers: Map void>> = new Map() + _metrics = new PrometheusMetrics() constructor (relayNodeURL: string, nodejs?: boolean) { this._relayNodeMultiaddr = multiaddr(relayNodeURL); @@ -91,6 +93,10 @@ export class Peer { return this._relayNodeMultiaddr; } + get metrics (): PrometheusMetrics { + return this._metrics; + } + async init ( peerIdObj?: PeerIdObj, maxRelayConnections = DEFAULT_MAX_RELAY_CONNECTIONS @@ -135,7 +141,8 @@ export class Peer { }, ping: { timeout: PING_TIMEOUT - } + }, + metrics: () => this._metrics }); } catch (err: any) { console.log('Could not initialize a libp2p node', err); @@ -279,6 +286,14 @@ export class Peer { return multiaddrString === this._relayNodeMultiaddr.toString(); } + getLatencyData (peerId: PeerId): Array { + if (this._peerHeartbeatChecker) { + return this._peerHeartbeatChecker.getLatencyData(peerId); + } + + return []; + } + async _handleChangeProtocols ({ peerId, protocols }: { peerId: PeerId, protocols: string[] }) { assert(this._node); @@ -369,14 +384,14 @@ export class Peer { console.log(`Connected to ${remotePeerIdString} using multiaddr ${remoteAddrString}`); if (this.isRelayPeerMultiaddr(remoteAddrString)) { + this._numRelayConnections++; + // Check if relay connections limit has already been reached - if (this._numRelayConnections >= maxRelayConnections) { + if (this._numRelayConnections > maxRelayConnections && !this.isPrimaryRelay(remoteAddrString)) { console.log(`Closing connection to relay ${remotePeerIdString} as max relay connections limit reached`); await connection.close(); return; } - - this._numRelayConnections++; } // Manage connections and streams diff --git a/yarn.lock b/yarn.lock index cf12cd71..0f128bae 100644 --- a/yarn.lock +++ b/yarn.lock @@ -373,6 +373,18 @@ wherearewe "^2.0.0" xsalsa20 "^1.1.0" +"@cerc-io/prometheus-metrics@1.1.4": + version "1.1.4" + resolved "https://git.vdb.to/api/packages/cerc-io/npm/%40cerc-io%2Fprometheus-metrics/-/1.1.4/prometheus-metrics-1.1.4.tgz#51006b0b5bf6168394390c78072a1c0bb2b02f28" + integrity sha512-Mqg7o1Wer8zKv3/0NWB1sCMmW8hyYI0Fw58d/MR62+5EDZ2yPhwMUrLZUhyqdo3qXJzxMylAPSVx8URDcthmKA== + dependencies: + "@libp2p/interface-connection" "^3.0.2" + "@libp2p/interface-metrics" "^4.0.2" + "@libp2p/logger" "^2.0.2" + it-foreach "^1.0.0" + it-stream-types "^1.0.4" + promjs "^0.4.2" + "@cerc-io/webrtc-direct@^5.0.0-laconic-0.1.3": version "5.0.0-laconic-0.1.3" resolved "https://git.vdb.to/api/packages/cerc-io/npm/%40cerc-io%2Fwebrtc-direct/-/5.0.0-laconic-0.1.3/webrtc-direct-5.0.0-laconic-0.1.3.tgz#14802ba88899c904bddc327082d96cb541523ffb" @@ -2620,7 +2632,7 @@ interface-datastore "^7.0.0" multiformats "^10.0.0" -"@libp2p/logger@^2.0.1", "@libp2p/logger@^2.0.5": +"@libp2p/logger@^2.0.1", "@libp2p/logger@^2.0.2", "@libp2p/logger@^2.0.5": version "2.0.5" resolved "https://registry.yarnpkg.com/@libp2p/logger/-/logger-2.0.5.tgz#cf0ee695ba21471fd085a7fda3e534e03946ad20" integrity sha512-WEhxsc7+gsfuTcljI4vSgW/H2f18aBaC+JiO01FcX841Wxe9szjzHdBLDh9eqygUlzoK0LEeIBfctN7ibzus5A== @@ -13505,6 +13517,11 @@ promise-to-callback@^1.0.0: is-fn "^1.0.0" set-immediate-shim "^1.0.1" +promjs@^0.4.2: + version "0.4.2" + resolved "https://registry.yarnpkg.com/promjs/-/promjs-0.4.2.tgz#9c2b4a60e00c1a0ecb69a3c1c322d1cfb47a300d" + integrity sha512-qvHcTU9xwEieFOf2Qnf5JYPKkdJU2lRbJfJvJspw6XpnoH7VPmNfnJJnOLPfN8ODJMBLRt8wEPVjxyyn0Or6RQ== + promzard@^0.3.0: version "0.3.0" resolved "https://registry.npmjs.org/promzard/-/promzard-0.3.0.tgz"