improve libp2p connected peer metrics (#4870)

* improve libp2p connected peer metrics

* separate discv5 port from libp2p for NAT open

* use metric family for DISCOVERY_BYTES

* Merge branch 'unstable' of https://github.com/sigp/lighthouse into improve-metrics
This commit is contained in:
João Oliveira 2024-02-08 02:40:54 +00:00 committed by GitHub
parent 0b59d10ab6
commit 0c3fef59b3
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 65 additions and 100 deletions

View File

@ -67,7 +67,7 @@ use std::path::PathBuf;
use std::pin::Pin;
use std::sync::Arc;
use sysinfo::{System, SystemExt};
use system_health::observe_system_health_bn;
use system_health::{observe_nat, observe_system_health_bn};
use task_spawner::{Priority, TaskSpawner};
use tokio::sync::{
mpsc::{Sender, UnboundedSender},
@ -4071,13 +4071,7 @@ pub fn serve<T: BeaconChainTypes>(
.and(warp::path::end())
.then(|task_spawner: TaskSpawner<T::EthSpec>| {
task_spawner.blocking_json_task(Priority::P1, move || {
Ok(api_types::GenericResponse::from(
lighthouse_network::metrics::NAT_OPEN
.as_ref()
.map(|v| v.get())
.unwrap_or(0)
!= 0,
))
Ok(api_types::GenericResponse::from(observe_nat()))
})
});

View File

@ -1003,11 +1003,13 @@ impl<TSpec: EthSpec> NetworkBehaviour for Discovery<TSpec> {
}
discv5::Event::SocketUpdated(socket_addr) => {
info!(self.log, "Address updated"; "ip" => %socket_addr.ip(), "udp_port" => %socket_addr.port());
metrics::inc_counter(&metrics::ADDRESS_UPDATE_COUNT);
metrics::check_nat();
// We have SOCKET_UPDATED messages. This occurs when discovery has a majority of
// users reporting an external port and our ENR gets updated.
// Which means we are able to do NAT traversal.
metrics::set_gauge_vec(&metrics::NAT_OPEN, &["discv5"], 1);
// Discv5 will have updated our local ENR. We save the updated version
// to disk.
if (self.update_ports.tcp4 && socket_addr.is_ipv4())
|| (self.update_ports.tcp6 && socket_addr.is_ipv6())
{

View File

@ -1,29 +1,17 @@
pub use lighthouse_metrics::*;
lazy_static! {
pub static ref NAT_OPEN: Result<IntCounter> = try_create_int_counter(
pub static ref NAT_OPEN: Result<IntGaugeVec> = try_create_int_gauge_vec(
"nat_open",
"An estimate indicating if the local node is exposed to the internet."
"An estimate indicating if the local node is reachable from external nodes",
&["protocol"]
);
pub static ref ADDRESS_UPDATE_COUNT: Result<IntCounter> = try_create_int_counter(
"libp2p_address_update_total",
"Count of libp2p socked updated events (when our view of our IP address has changed)"
);
pub static ref PEERS_CONNECTED: Result<IntGauge> = try_create_int_gauge(
"libp2p_peers",
"Count of libp2p peers currently connected"
);
pub static ref TCP_PEERS_CONNECTED: Result<IntGauge> = try_create_int_gauge(
"libp2p_tcp_peers",
"Count of libp2p peers currently connected via TCP"
);
pub static ref QUIC_PEERS_CONNECTED: Result<IntGauge> = try_create_int_gauge(
"libp2p_quic_peers",
"Count of libp2p peers currently connected via QUIC"
);
pub static ref PEERS_CONNECTED: Result<IntGaugeVec> =
try_create_int_gauge_vec("libp2p_peers", "Count of libp2p peers currently connected", &["direction", "transport"]);
pub static ref PEER_CONNECT_EVENT_COUNT: Result<IntCounter> = try_create_int_counter(
"libp2p_peer_connect_event_total",
"Count of libp2p peer connect events (not the current number of connected peers)"
@ -32,13 +20,10 @@ lazy_static! {
"libp2p_peer_disconnect_event_total",
"Count of libp2p peer disconnect events"
);
pub static ref DISCOVERY_SENT_BYTES: Result<IntGauge> = try_create_int_gauge(
"discovery_sent_bytes",
"The number of bytes sent in discovery"
);
pub static ref DISCOVERY_RECV_BYTES: Result<IntGauge> = try_create_int_gauge(
"discovery_recv_bytes",
"The number of bytes received in discovery"
pub static ref DISCOVERY_BYTES: Result<IntGaugeVec> = try_create_int_gauge_vec(
"discovery_bytes",
"The number of bytes sent and received in discovery",
&["direction"]
);
pub static ref DISCOVERY_QUEUE: Result<IntGauge> = try_create_int_gauge(
"discovery_queue_size",
@ -135,17 +120,6 @@ lazy_static! {
&["type"]
);
/*
* Inbound/Outbound peers
*/
/// The number of peers that dialed us.
pub static ref NETWORK_INBOUND_PEERS: Result<IntGauge> =
try_create_int_gauge("network_inbound_peers","The number of peers that are currently connected that have dialed us.");
/// The number of peers that we dialed us.
pub static ref NETWORK_OUTBOUND_PEERS: Result<IntGauge> =
try_create_int_gauge("network_outbound_peers","The number of peers that are currently connected that we dialed.");
/*
* Peer Reporting
*/
@ -156,31 +130,11 @@ lazy_static! {
);
}
/// Checks if we consider the NAT open.
///
/// Conditions for an open NAT:
/// 1. We have 1 or more SOCKET_UPDATED messages. This occurs when discovery has a majority of
/// users reporting an external port and our ENR gets updated.
/// 2. We have 0 SOCKET_UPDATED messages (can be true if the port was correct on boot), then we
/// rely on whether we have any inbound messages. If we have no socket update messages, but
/// manage to get at least one inbound peer, we are exposed correctly.
pub fn check_nat() {
// NAT is already deemed open.
if NAT_OPEN.as_ref().map(|v| v.get()).unwrap_or(0) != 0 {
return;
}
if ADDRESS_UPDATE_COUNT.as_ref().map(|v| v.get()).unwrap_or(0) != 0
|| NETWORK_INBOUND_PEERS.as_ref().map(|v| v.get()).unwrap_or(0) != 0_i64
{
inc_counter(&NAT_OPEN);
}
}
pub fn scrape_discovery_metrics() {
let metrics =
discv5::metrics::Metrics::from(discv5::Discv5::<discv5::DefaultProtocolId>::raw_metrics());
set_float_gauge(&DISCOVERY_REQS, metrics.unsolicited_requests_per_second);
set_gauge(&DISCOVERY_SESSIONS, metrics.active_sessions as i64);
set_gauge(&DISCOVERY_SENT_BYTES, metrics.bytes_sent as i64);
set_gauge(&DISCOVERY_RECV_BYTES, metrics.bytes_recv as i64);
set_gauge_vec(&DISCOVERY_BYTES, &["inbound"], metrics.bytes_recv as i64);
set_gauge_vec(&DISCOVERY_BYTES, &["outbound"], metrics.bytes_sent as i64);
}

View File

@ -726,29 +726,14 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
return;
}
let mut connected_peer_count = 0;
let mut inbound_connected_peers = 0;
let mut outbound_connected_peers = 0;
let mut clients_per_peer = HashMap::new();
for (_peer, peer_info) in self.network_globals.peers.read().connected_peers() {
connected_peer_count += 1;
if let PeerConnectionStatus::Connected { n_in, .. } = peer_info.connection_status() {
if *n_in > 0 {
inbound_connected_peers += 1;
} else {
outbound_connected_peers += 1;
}
}
*clients_per_peer
.entry(peer_info.client().kind.to_string())
.or_default() += 1;
}
metrics::set_gauge(&metrics::PEERS_CONNECTED, connected_peer_count);
metrics::set_gauge(&metrics::NETWORK_INBOUND_PEERS, inbound_connected_peers);
metrics::set_gauge(&metrics::NETWORK_OUTBOUND_PEERS, outbound_connected_peers);
for client_kind in ClientKind::iter() {
let value = clients_per_peer.get(&client_kind.to_string()).unwrap_or(&0);
metrics::set_gauge_vec(
@ -853,11 +838,8 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
// start a ping and status timer for the peer
self.status_peers.insert(*peer_id);
let connected_peers = self.network_globals.connected_peers() as i64;
// increment prometheus metrics
metrics::inc_counter(&metrics::PEER_CONNECT_EVENT_COUNT);
metrics::set_gauge(&metrics::PEERS_CONNECTED, connected_peers);
true
}

View File

@ -154,6 +154,8 @@ impl<TSpec: EthSpec> NetworkBehaviour for PeerManager<TSpec> {
self.on_dial_failure(peer_id);
}
FromSwarm::ExternalAddrConfirmed(_) => {
// We have an external address confirmed, means we are able to do NAT traversal.
metrics::set_gauge_vec(&metrics::NAT_OPEN, &["libp2p"], 1);
// TODO: we likely want to check this against our assumed external tcp
// address
}
@ -243,14 +245,14 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
self.events.push(PeerManagerEvent::MetaData(peer_id));
}
// Check NAT if metrics are enabled
if self.network_globals.local_enr.read().udp4().is_some() {
metrics::check_nat();
}
// increment prometheus metrics
if self.metrics_enabled {
let remote_addr = endpoint.get_remote_address();
let direction = if endpoint.is_dialer() {
"outbound"
} else {
"inbound"
};
match remote_addr.iter().find(|proto| {
matches!(
proto,
@ -258,10 +260,10 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
)
}) {
Some(multiaddr::Protocol::QuicV1) => {
metrics::inc_gauge(&metrics::QUIC_PEERS_CONNECTED);
metrics::inc_gauge_vec(&metrics::PEERS_CONNECTED, &[direction, "quic"]);
}
Some(multiaddr::Protocol::Tcp(_)) => {
metrics::inc_gauge(&metrics::TCP_PEERS_CONNECTED);
metrics::inc_gauge_vec(&metrics::PEERS_CONNECTED, &[direction, "tcp"]);
}
Some(_) => unreachable!(),
None => {
@ -339,6 +341,12 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
let remote_addr = endpoint.get_remote_address();
// Update the prometheus metrics
if self.metrics_enabled {
let direction = if endpoint.is_dialer() {
"outbound"
} else {
"inbound"
};
match remote_addr.iter().find(|proto| {
matches!(
proto,
@ -346,10 +354,10 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
)
}) {
Some(multiaddr::Protocol::QuicV1) => {
metrics::dec_gauge(&metrics::QUIC_PEERS_CONNECTED);
metrics::dec_gauge_vec(&metrics::PEERS_CONNECTED, &[direction, "quic"]);
}
Some(multiaddr::Protocol::Tcp(_)) => {
metrics::dec_gauge(&metrics::TCP_PEERS_CONNECTED);
metrics::dec_gauge_vec(&metrics::PEERS_CONNECTED, &[direction, "tcp"]);
}
// If it's an unknown protocol we already logged when connection was established.
_ => {}

View File

@ -244,6 +244,16 @@ pub fn inc_counter_vec(int_counter_vec: &Result<IntCounterVec>, name: &[&str]) {
}
}
/// Sets the `int_counter_vec` with the given `name` to the `amount`,
/// should only be called with `ammount`s equal or above the current value
/// as per Prometheus spec, the `counter` type should only go up.
pub fn set_counter_vec_by(int_counter_vec: &Result<IntCounterVec>, name: &[&str], amount: u64) {
if let Some(counter) = get_int_counter(int_counter_vec, name) {
counter.reset();
counter.inc_by(amount);
}
}
pub fn inc_counter_vec_by(int_counter_vec: &Result<IntCounterVec>, name: &[&str], amount: u64) {
if let Some(counter) = get_int_counter(int_counter_vec, name) {
counter.inc_by(amount);

View File

@ -198,6 +198,25 @@ pub fn observe_system_health_vc(
}
}
/// Observes if NAT traversal is possible.
pub fn observe_nat() -> bool {
let discv5_nat = lighthouse_network::metrics::get_int_gauge(
&lighthouse_network::metrics::NAT_OPEN,
&["discv5"],
)
.map(|g| g.get() == 1)
.unwrap_or_default();
let libp2p_nat = lighthouse_network::metrics::get_int_gauge(
&lighthouse_network::metrics::NAT_OPEN,
&["libp2p"],
)
.map(|g| g.get() == 1)
.unwrap_or_default();
discv5_nat && libp2p_nat
}
/// Observes the Beacon Node system health.
pub fn observe_system_health_bn<TSpec: EthSpec>(
sysinfo: Arc<RwLock<System>>,
@ -223,11 +242,7 @@ pub fn observe_system_health_bn<TSpec: EthSpec>(
.unwrap_or_else(|| (String::from("None"), 0, 0));
// Determine if the NAT is open or not.
let nat_open = lighthouse_network::metrics::NAT_OPEN
.as_ref()
.map(|v| v.get())
.unwrap_or(0)
!= 0;
let nat_open = observe_nat();
SystemHealthBN {
system_health,