From a89ff100af810d2224f15a24bccf40ca43407df0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Oliveira?= Date: Wed, 28 Feb 2024 03:52:55 +0000 Subject: [PATCH] improve libp2p connected peer metrics (#5314) * patch rust-yamux dep * improve libp2p connected peer metrics --- Cargo.lock | 3 +- Cargo.toml | 3 ++ beacon_node/http_api/src/lib.rs | 10 +--- .../lighthouse_network/src/discovery/mod.rs | 5 +- beacon_node/lighthouse_network/src/metrics.rs | 54 +++++-------------- .../src/peer_manager/mod.rs | 43 +-------------- .../src/peer_manager/network_behaviour.rs | 34 +++++++----- common/system_health/src/lib.rs | 25 +++++++-- 8 files changed, 64 insertions(+), 113 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 39d505ffc..8923f12be 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9971,8 +9971,7 @@ dependencies = [ [[package]] name = "yamux" version = "0.13.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ad1d0148b89300047e72994bee99ecdabd15a9166a7b70c8b8c37c314dcc9002" +source = "git+https://github.com/sigp/rust-yamux.git#6689e227a48258a52347cd1d984adfc94afc6f7a" dependencies = [ "futures", "instant", diff --git a/Cargo.toml b/Cargo.toml index 0fc707038..549adaf5c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -231,6 +231,9 @@ validator_client = { path = "validator_client" } validator_dir = { path = "common/validator_dir" } warp_utils = { path = "common/warp_utils" } +[patch.crates-io] +yamux = { git = "https://github.com/sigp/rust-yamux.git" } + [profile.maxperf] inherits = "release" lto = "fat" diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index a9b245e79..b39450d73 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -68,7 +68,7 @@ use std::path::PathBuf; use std::pin::Pin; use std::sync::Arc; use sysinfo::{System, SystemExt}; -use system_health::observe_system_health_bn; +use system_health::{observe_nat, observe_system_health_bn}; use task_spawner::{Priority, TaskSpawner}; use tokio::sync::{ mpsc::{Sender, UnboundedSender}, @@ -3965,13 +3965,7 @@ pub fn serve( .and(warp::path::end()) .then(|task_spawner: TaskSpawner| { task_spawner.blocking_json_task(Priority::P1, move || { - Ok(api_types::GenericResponse::from( - lighthouse_network::metrics::NAT_OPEN - .as_ref() - .map(|v| v.get()) - .unwrap_or(0) - != 0, - )) + Ok(api_types::GenericResponse::from(observe_nat())) }) }); diff --git a/beacon_node/lighthouse_network/src/discovery/mod.rs b/beacon_node/lighthouse_network/src/discovery/mod.rs index 6659ba1d2..347c2352f 100644 --- a/beacon_node/lighthouse_network/src/discovery/mod.rs +++ b/beacon_node/lighthouse_network/src/discovery/mod.rs @@ -1004,7 +1004,10 @@ impl NetworkBehaviour for Discovery { discv5::Event::SocketUpdated(socket_addr) => { info!(self.log, "Address updated"; "ip" => %socket_addr.ip(), "udp_port" => %socket_addr.port()); metrics::inc_counter(&metrics::ADDRESS_UPDATE_COUNT); - metrics::check_nat(); + // We have SOCKET_UPDATED messages. This occurs when discovery has a majority of + // users reporting an external port and our ENR gets updated. + // Which means we are able to do NAT traversal. + metrics::set_gauge_vec(&metrics::NAT_OPEN, &["discv5"], 1); // Discv5 will have updated our local ENR. We save the updated version // to disk. diff --git a/beacon_node/lighthouse_network/src/metrics.rs b/beacon_node/lighthouse_network/src/metrics.rs index ae02b689d..fc441f253 100644 --- a/beacon_node/lighthouse_network/src/metrics.rs +++ b/beacon_node/lighthouse_network/src/metrics.rs @@ -1,9 +1,10 @@ pub use lighthouse_metrics::*; lazy_static! { - pub static ref NAT_OPEN: Result = try_create_int_counter( + pub static ref NAT_OPEN: Result = try_create_int_gauge_vec( "nat_open", - "An estimate indicating if the local node is exposed to the internet." + "An estimate indicating if the local node is reachable from external nodes", + &["protocol"] ); pub static ref ADDRESS_UPDATE_COUNT: Result = try_create_int_counter( "libp2p_address_update_total", @@ -14,6 +15,9 @@ lazy_static! { "Count of libp2p peers currently connected" ); + pub static ref PEERS_CONNECTED_MULTI: Result = + try_create_int_gauge_vec("libp2p_peers_multi", "Count of libp2p peers currently connected", &["direction", "transport"]); + pub static ref TCP_PEERS_CONNECTED: Result = try_create_int_gauge( "libp2p_tcp_peers", "Count of libp2p peers currently connected via TCP" @@ -32,13 +36,10 @@ lazy_static! { "libp2p_peer_disconnect_event_total", "Count of libp2p peer disconnect events" ); - pub static ref DISCOVERY_SENT_BYTES: Result = try_create_int_gauge( - "discovery_sent_bytes", - "The number of bytes sent in discovery" - ); - pub static ref DISCOVERY_RECV_BYTES: Result = try_create_int_gauge( - "discovery_recv_bytes", - "The number of bytes received in discovery" + pub static ref DISCOVERY_BYTES: Result = try_create_int_gauge_vec( + "discovery_bytes", + "The number of bytes sent and received in discovery", + &["direction"] ); pub static ref DISCOVERY_QUEUE: Result = try_create_int_gauge( "discovery_queue_size", @@ -135,17 +136,6 @@ lazy_static! { &["type"] ); - /* - * Inbound/Outbound peers - */ - /// The number of peers that dialed us. - pub static ref NETWORK_INBOUND_PEERS: Result = - try_create_int_gauge("network_inbound_peers","The number of peers that are currently connected that have dialed us."); - - /// The number of peers that we dialed us. - pub static ref NETWORK_OUTBOUND_PEERS: Result = - try_create_int_gauge("network_outbound_peers","The number of peers that are currently connected that we dialed."); - /* * Peer Reporting */ @@ -156,31 +146,11 @@ lazy_static! { ); } -/// Checks if we consider the NAT open. -/// -/// Conditions for an open NAT: -/// 1. We have 1 or more SOCKET_UPDATED messages. This occurs when discovery has a majority of -/// users reporting an external port and our ENR gets updated. -/// 2. We have 0 SOCKET_UPDATED messages (can be true if the port was correct on boot), then we -/// rely on whether we have any inbound messages. If we have no socket update messages, but -/// manage to get at least one inbound peer, we are exposed correctly. -pub fn check_nat() { - // NAT is already deemed open. - if NAT_OPEN.as_ref().map(|v| v.get()).unwrap_or(0) != 0 { - return; - } - if ADDRESS_UPDATE_COUNT.as_ref().map(|v| v.get()).unwrap_or(0) != 0 - || NETWORK_INBOUND_PEERS.as_ref().map(|v| v.get()).unwrap_or(0) != 0_i64 - { - inc_counter(&NAT_OPEN); - } -} - pub fn scrape_discovery_metrics() { let metrics = discv5::metrics::Metrics::from(discv5::Discv5::::raw_metrics()); set_float_gauge(&DISCOVERY_REQS, metrics.unsolicited_requests_per_second); set_gauge(&DISCOVERY_SESSIONS, metrics.active_sessions as i64); - set_gauge(&DISCOVERY_SENT_BYTES, metrics.bytes_sent as i64); - set_gauge(&DISCOVERY_RECV_BYTES, metrics.bytes_recv as i64); + set_gauge_vec(&DISCOVERY_BYTES, &["inbound"], metrics.bytes_recv as i64); + set_gauge_vec(&DISCOVERY_BYTES, &["outbound"], metrics.bytes_sent as i64); } diff --git a/beacon_node/lighthouse_network/src/peer_manager/mod.rs b/beacon_node/lighthouse_network/src/peer_manager/mod.rs index e4976a0d3..0aaede3d4 100644 --- a/beacon_node/lighthouse_network/src/peer_manager/mod.rs +++ b/beacon_node/lighthouse_network/src/peer_manager/mod.rs @@ -10,7 +10,7 @@ use delay_map::HashSetDelay; use discv5::Enr; use libp2p::identify::Info as IdentifyInfo; use lru_cache::LRUTimeCache; -use peerdb::{client::ClientKind, BanOperation, BanResult, ScoreUpdateResult}; +use peerdb::{BanOperation, BanResult, ScoreUpdateResult}; use rand::seq::SliceRandom; use slog::{debug, error, trace, warn}; use smallvec::SmallVec; @@ -18,7 +18,6 @@ use std::{ sync::Arc, time::{Duration, Instant}, }; -use strum::IntoEnumIterator; use types::{EthSpec, SyncSubnetId}; pub use libp2p::core::Multiaddr; @@ -719,46 +718,6 @@ impl PeerManager { } } - // This function updates metrics for all connected peers. - fn update_connected_peer_metrics(&self) { - // Do nothing if we don't have metrics enabled. - if !self.metrics_enabled { - return; - } - - let mut connected_peer_count = 0; - let mut inbound_connected_peers = 0; - let mut outbound_connected_peers = 0; - let mut clients_per_peer = HashMap::new(); - - for (_peer, peer_info) in self.network_globals.peers.read().connected_peers() { - connected_peer_count += 1; - if let PeerConnectionStatus::Connected { n_in, .. } = peer_info.connection_status() { - if *n_in > 0 { - inbound_connected_peers += 1; - } else { - outbound_connected_peers += 1; - } - } - *clients_per_peer - .entry(peer_info.client().kind.to_string()) - .or_default() += 1; - } - - metrics::set_gauge(&metrics::PEERS_CONNECTED, connected_peer_count); - metrics::set_gauge(&metrics::NETWORK_INBOUND_PEERS, inbound_connected_peers); - metrics::set_gauge(&metrics::NETWORK_OUTBOUND_PEERS, outbound_connected_peers); - - for client_kind in ClientKind::iter() { - let value = clients_per_peer.get(&client_kind.to_string()).unwrap_or(&0); - metrics::set_gauge_vec( - &metrics::PEERS_PER_CLIENT, - &[client_kind.as_ref()], - *value as i64, - ); - } - } - /* Internal functions */ /// Sets a peer as connected as long as their reputation allows it diff --git a/beacon_node/lighthouse_network/src/peer_manager/network_behaviour.rs b/beacon_node/lighthouse_network/src/peer_manager/network_behaviour.rs index cb60906f6..5dda78a01 100644 --- a/beacon_node/lighthouse_network/src/peer_manager/network_behaviour.rs +++ b/beacon_node/lighthouse_network/src/peer_manager/network_behaviour.rs @@ -154,8 +154,8 @@ impl NetworkBehaviour for PeerManager { self.on_dial_failure(peer_id); } FromSwarm::ExternalAddrConfirmed(_) => { - // TODO: we likely want to check this against our assumed external tcp - // address + // We have an external address confirmed, means we are able to do NAT traversal. + metrics::set_gauge_vec(&metrics::NAT_OPEN, &["libp2p"], 1); } _ => { // NOTE: FromSwarm is a non exhaustive enum so updates should be based on release @@ -243,14 +243,15 @@ impl PeerManager { self.events.push(PeerManagerEvent::MetaData(peer_id)); } - // Check NAT if metrics are enabled - if self.network_globals.local_enr.read().udp4().is_some() { - metrics::check_nat(); - } - // increment prometheus metrics if self.metrics_enabled { let remote_addr = endpoint.get_remote_address(); + let direction = if endpoint.is_dialer() { + "outbound" + } else { + "inbound" + }; + match remote_addr.iter().find(|proto| { matches!( proto, @@ -258,10 +259,10 @@ impl PeerManager { ) }) { Some(multiaddr::Protocol::QuicV1) => { - metrics::inc_gauge(&metrics::QUIC_PEERS_CONNECTED); + metrics::inc_gauge_vec(&metrics::PEERS_CONNECTED_MULTI, &[direction, "quic"]); } Some(multiaddr::Protocol::Tcp(_)) => { - metrics::inc_gauge(&metrics::TCP_PEERS_CONNECTED); + metrics::inc_gauge_vec(&metrics::PEERS_CONNECTED_MULTI, &[direction, "tcp"]); } Some(_) => unreachable!(), None => { @@ -269,7 +270,7 @@ impl PeerManager { } }; - self.update_connected_peer_metrics(); + metrics::inc_gauge(&metrics::PEERS_CONNECTED); metrics::inc_counter(&metrics::PEER_CONNECT_EVENT_COUNT); } @@ -339,6 +340,12 @@ impl PeerManager { let remote_addr = endpoint.get_remote_address(); // Update the prometheus metrics if self.metrics_enabled { + let direction = if endpoint.is_dialer() { + "outbound" + } else { + "inbound" + }; + match remote_addr.iter().find(|proto| { matches!( proto, @@ -346,15 +353,16 @@ impl PeerManager { ) }) { Some(multiaddr::Protocol::QuicV1) => { - metrics::dec_gauge(&metrics::QUIC_PEERS_CONNECTED); + metrics::dec_gauge_vec(&metrics::PEERS_CONNECTED_MULTI, &[direction, "quic"]); } Some(multiaddr::Protocol::Tcp(_)) => { - metrics::dec_gauge(&metrics::TCP_PEERS_CONNECTED); + metrics::dec_gauge_vec(&metrics::PEERS_CONNECTED_MULTI, &[direction, "tcp"]); } // If it's an unknown protocol we already logged when connection was established. _ => {} }; - self.update_connected_peer_metrics(); + // Legacy standard metrics. + metrics::dec_gauge(&metrics::PEERS_CONNECTED); metrics::inc_counter(&metrics::PEER_DISCONNECT_EVENT_COUNT); } } diff --git a/common/system_health/src/lib.rs b/common/system_health/src/lib.rs index d10540e50..ec64ce31a 100644 --- a/common/system_health/src/lib.rs +++ b/common/system_health/src/lib.rs @@ -198,6 +198,25 @@ pub fn observe_system_health_vc( } } +/// Observes if NAT traversal is possible. +pub fn observe_nat() -> bool { + let discv5_nat = lighthouse_network::metrics::get_int_gauge( + &lighthouse_network::metrics::NAT_OPEN, + &["discv5"], + ) + .map(|g| g.get() == 1) + .unwrap_or_default(); + + let libp2p_nat = lighthouse_network::metrics::get_int_gauge( + &lighthouse_network::metrics::NAT_OPEN, + &["libp2p"], + ) + .map(|g| g.get() == 1) + .unwrap_or_default(); + + discv5_nat && libp2p_nat +} + /// Observes the Beacon Node system health. pub fn observe_system_health_bn( sysinfo: Arc>, @@ -223,11 +242,7 @@ pub fn observe_system_health_bn( .unwrap_or_else(|| (String::from("None"), 0, 0)); // Determine if the NAT is open or not. - let nat_open = lighthouse_network::metrics::NAT_OPEN - .as_ref() - .map(|v| v.get()) - .unwrap_or(0) - != 0; + let nat_open = observe_nat(); SystemHealthBN { system_health,