improve libp2p connected peer metrics (#5314)
* patch rust-yamux dep * improve libp2p connected peer metrics
This commit is contained in:
parent
65c4ff0775
commit
a89ff100af
3
Cargo.lock
generated
3
Cargo.lock
generated
@ -9971,8 +9971,7 @@ dependencies = [
|
|||||||
[[package]]
|
[[package]]
|
||||||
name = "yamux"
|
name = "yamux"
|
||||||
version = "0.13.1"
|
version = "0.13.1"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "git+https://github.com/sigp/rust-yamux.git#6689e227a48258a52347cd1d984adfc94afc6f7a"
|
||||||
checksum = "ad1d0148b89300047e72994bee99ecdabd15a9166a7b70c8b8c37c314dcc9002"
|
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"futures",
|
"futures",
|
||||||
"instant",
|
"instant",
|
||||||
|
@ -231,6 +231,9 @@ validator_client = { path = "validator_client" }
|
|||||||
validator_dir = { path = "common/validator_dir" }
|
validator_dir = { path = "common/validator_dir" }
|
||||||
warp_utils = { path = "common/warp_utils" }
|
warp_utils = { path = "common/warp_utils" }
|
||||||
|
|
||||||
|
[patch.crates-io]
|
||||||
|
yamux = { git = "https://github.com/sigp/rust-yamux.git" }
|
||||||
|
|
||||||
[profile.maxperf]
|
[profile.maxperf]
|
||||||
inherits = "release"
|
inherits = "release"
|
||||||
lto = "fat"
|
lto = "fat"
|
||||||
|
@ -68,7 +68,7 @@ use std::path::PathBuf;
|
|||||||
use std::pin::Pin;
|
use std::pin::Pin;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use sysinfo::{System, SystemExt};
|
use sysinfo::{System, SystemExt};
|
||||||
use system_health::observe_system_health_bn;
|
use system_health::{observe_nat, observe_system_health_bn};
|
||||||
use task_spawner::{Priority, TaskSpawner};
|
use task_spawner::{Priority, TaskSpawner};
|
||||||
use tokio::sync::{
|
use tokio::sync::{
|
||||||
mpsc::{Sender, UnboundedSender},
|
mpsc::{Sender, UnboundedSender},
|
||||||
@ -3965,13 +3965,7 @@ pub fn serve<T: BeaconChainTypes>(
|
|||||||
.and(warp::path::end())
|
.and(warp::path::end())
|
||||||
.then(|task_spawner: TaskSpawner<T::EthSpec>| {
|
.then(|task_spawner: TaskSpawner<T::EthSpec>| {
|
||||||
task_spawner.blocking_json_task(Priority::P1, move || {
|
task_spawner.blocking_json_task(Priority::P1, move || {
|
||||||
Ok(api_types::GenericResponse::from(
|
Ok(api_types::GenericResponse::from(observe_nat()))
|
||||||
lighthouse_network::metrics::NAT_OPEN
|
|
||||||
.as_ref()
|
|
||||||
.map(|v| v.get())
|
|
||||||
.unwrap_or(0)
|
|
||||||
!= 0,
|
|
||||||
))
|
|
||||||
})
|
})
|
||||||
});
|
});
|
||||||
|
|
||||||
|
@ -1004,7 +1004,10 @@ impl<TSpec: EthSpec> NetworkBehaviour for Discovery<TSpec> {
|
|||||||
discv5::Event::SocketUpdated(socket_addr) => {
|
discv5::Event::SocketUpdated(socket_addr) => {
|
||||||
info!(self.log, "Address updated"; "ip" => %socket_addr.ip(), "udp_port" => %socket_addr.port());
|
info!(self.log, "Address updated"; "ip" => %socket_addr.ip(), "udp_port" => %socket_addr.port());
|
||||||
metrics::inc_counter(&metrics::ADDRESS_UPDATE_COUNT);
|
metrics::inc_counter(&metrics::ADDRESS_UPDATE_COUNT);
|
||||||
metrics::check_nat();
|
// We have SOCKET_UPDATED messages. This occurs when discovery has a majority of
|
||||||
|
// users reporting an external port and our ENR gets updated.
|
||||||
|
// Which means we are able to do NAT traversal.
|
||||||
|
metrics::set_gauge_vec(&metrics::NAT_OPEN, &["discv5"], 1);
|
||||||
// Discv5 will have updated our local ENR. We save the updated version
|
// Discv5 will have updated our local ENR. We save the updated version
|
||||||
// to disk.
|
// to disk.
|
||||||
|
|
||||||
|
@ -1,9 +1,10 @@
|
|||||||
pub use lighthouse_metrics::*;
|
pub use lighthouse_metrics::*;
|
||||||
|
|
||||||
lazy_static! {
|
lazy_static! {
|
||||||
pub static ref NAT_OPEN: Result<IntCounter> = try_create_int_counter(
|
pub static ref NAT_OPEN: Result<IntGaugeVec> = try_create_int_gauge_vec(
|
||||||
"nat_open",
|
"nat_open",
|
||||||
"An estimate indicating if the local node is exposed to the internet."
|
"An estimate indicating if the local node is reachable from external nodes",
|
||||||
|
&["protocol"]
|
||||||
);
|
);
|
||||||
pub static ref ADDRESS_UPDATE_COUNT: Result<IntCounter> = try_create_int_counter(
|
pub static ref ADDRESS_UPDATE_COUNT: Result<IntCounter> = try_create_int_counter(
|
||||||
"libp2p_address_update_total",
|
"libp2p_address_update_total",
|
||||||
@ -14,6 +15,9 @@ lazy_static! {
|
|||||||
"Count of libp2p peers currently connected"
|
"Count of libp2p peers currently connected"
|
||||||
);
|
);
|
||||||
|
|
||||||
|
pub static ref PEERS_CONNECTED_MULTI: Result<IntGaugeVec> =
|
||||||
|
try_create_int_gauge_vec("libp2p_peers_multi", "Count of libp2p peers currently connected", &["direction", "transport"]);
|
||||||
|
|
||||||
pub static ref TCP_PEERS_CONNECTED: Result<IntGauge> = try_create_int_gauge(
|
pub static ref TCP_PEERS_CONNECTED: Result<IntGauge> = try_create_int_gauge(
|
||||||
"libp2p_tcp_peers",
|
"libp2p_tcp_peers",
|
||||||
"Count of libp2p peers currently connected via TCP"
|
"Count of libp2p peers currently connected via TCP"
|
||||||
@ -32,13 +36,10 @@ lazy_static! {
|
|||||||
"libp2p_peer_disconnect_event_total",
|
"libp2p_peer_disconnect_event_total",
|
||||||
"Count of libp2p peer disconnect events"
|
"Count of libp2p peer disconnect events"
|
||||||
);
|
);
|
||||||
pub static ref DISCOVERY_SENT_BYTES: Result<IntGauge> = try_create_int_gauge(
|
pub static ref DISCOVERY_BYTES: Result<IntGaugeVec> = try_create_int_gauge_vec(
|
||||||
"discovery_sent_bytes",
|
"discovery_bytes",
|
||||||
"The number of bytes sent in discovery"
|
"The number of bytes sent and received in discovery",
|
||||||
);
|
&["direction"]
|
||||||
pub static ref DISCOVERY_RECV_BYTES: Result<IntGauge> = try_create_int_gauge(
|
|
||||||
"discovery_recv_bytes",
|
|
||||||
"The number of bytes received in discovery"
|
|
||||||
);
|
);
|
||||||
pub static ref DISCOVERY_QUEUE: Result<IntGauge> = try_create_int_gauge(
|
pub static ref DISCOVERY_QUEUE: Result<IntGauge> = try_create_int_gauge(
|
||||||
"discovery_queue_size",
|
"discovery_queue_size",
|
||||||
@ -135,17 +136,6 @@ lazy_static! {
|
|||||||
&["type"]
|
&["type"]
|
||||||
);
|
);
|
||||||
|
|
||||||
/*
|
|
||||||
* Inbound/Outbound peers
|
|
||||||
*/
|
|
||||||
/// The number of peers that dialed us.
|
|
||||||
pub static ref NETWORK_INBOUND_PEERS: Result<IntGauge> =
|
|
||||||
try_create_int_gauge("network_inbound_peers","The number of peers that are currently connected that have dialed us.");
|
|
||||||
|
|
||||||
/// The number of peers that we dialed us.
|
|
||||||
pub static ref NETWORK_OUTBOUND_PEERS: Result<IntGauge> =
|
|
||||||
try_create_int_gauge("network_outbound_peers","The number of peers that are currently connected that we dialed.");
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Peer Reporting
|
* Peer Reporting
|
||||||
*/
|
*/
|
||||||
@ -156,31 +146,11 @@ lazy_static! {
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Checks if we consider the NAT open.
|
|
||||||
///
|
|
||||||
/// Conditions for an open NAT:
|
|
||||||
/// 1. We have 1 or more SOCKET_UPDATED messages. This occurs when discovery has a majority of
|
|
||||||
/// users reporting an external port and our ENR gets updated.
|
|
||||||
/// 2. We have 0 SOCKET_UPDATED messages (can be true if the port was correct on boot), then we
|
|
||||||
/// rely on whether we have any inbound messages. If we have no socket update messages, but
|
|
||||||
/// manage to get at least one inbound peer, we are exposed correctly.
|
|
||||||
pub fn check_nat() {
|
|
||||||
// NAT is already deemed open.
|
|
||||||
if NAT_OPEN.as_ref().map(|v| v.get()).unwrap_or(0) != 0 {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
if ADDRESS_UPDATE_COUNT.as_ref().map(|v| v.get()).unwrap_or(0) != 0
|
|
||||||
|| NETWORK_INBOUND_PEERS.as_ref().map(|v| v.get()).unwrap_or(0) != 0_i64
|
|
||||||
{
|
|
||||||
inc_counter(&NAT_OPEN);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn scrape_discovery_metrics() {
|
pub fn scrape_discovery_metrics() {
|
||||||
let metrics =
|
let metrics =
|
||||||
discv5::metrics::Metrics::from(discv5::Discv5::<discv5::DefaultProtocolId>::raw_metrics());
|
discv5::metrics::Metrics::from(discv5::Discv5::<discv5::DefaultProtocolId>::raw_metrics());
|
||||||
set_float_gauge(&DISCOVERY_REQS, metrics.unsolicited_requests_per_second);
|
set_float_gauge(&DISCOVERY_REQS, metrics.unsolicited_requests_per_second);
|
||||||
set_gauge(&DISCOVERY_SESSIONS, metrics.active_sessions as i64);
|
set_gauge(&DISCOVERY_SESSIONS, metrics.active_sessions as i64);
|
||||||
set_gauge(&DISCOVERY_SENT_BYTES, metrics.bytes_sent as i64);
|
set_gauge_vec(&DISCOVERY_BYTES, &["inbound"], metrics.bytes_recv as i64);
|
||||||
set_gauge(&DISCOVERY_RECV_BYTES, metrics.bytes_recv as i64);
|
set_gauge_vec(&DISCOVERY_BYTES, &["outbound"], metrics.bytes_sent as i64);
|
||||||
}
|
}
|
||||||
|
@ -10,7 +10,7 @@ use delay_map::HashSetDelay;
|
|||||||
use discv5::Enr;
|
use discv5::Enr;
|
||||||
use libp2p::identify::Info as IdentifyInfo;
|
use libp2p::identify::Info as IdentifyInfo;
|
||||||
use lru_cache::LRUTimeCache;
|
use lru_cache::LRUTimeCache;
|
||||||
use peerdb::{client::ClientKind, BanOperation, BanResult, ScoreUpdateResult};
|
use peerdb::{BanOperation, BanResult, ScoreUpdateResult};
|
||||||
use rand::seq::SliceRandom;
|
use rand::seq::SliceRandom;
|
||||||
use slog::{debug, error, trace, warn};
|
use slog::{debug, error, trace, warn};
|
||||||
use smallvec::SmallVec;
|
use smallvec::SmallVec;
|
||||||
@ -18,7 +18,6 @@ use std::{
|
|||||||
sync::Arc,
|
sync::Arc,
|
||||||
time::{Duration, Instant},
|
time::{Duration, Instant},
|
||||||
};
|
};
|
||||||
use strum::IntoEnumIterator;
|
|
||||||
use types::{EthSpec, SyncSubnetId};
|
use types::{EthSpec, SyncSubnetId};
|
||||||
|
|
||||||
pub use libp2p::core::Multiaddr;
|
pub use libp2p::core::Multiaddr;
|
||||||
@ -719,46 +718,6 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// This function updates metrics for all connected peers.
|
|
||||||
fn update_connected_peer_metrics(&self) {
|
|
||||||
// Do nothing if we don't have metrics enabled.
|
|
||||||
if !self.metrics_enabled {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
let mut connected_peer_count = 0;
|
|
||||||
let mut inbound_connected_peers = 0;
|
|
||||||
let mut outbound_connected_peers = 0;
|
|
||||||
let mut clients_per_peer = HashMap::new();
|
|
||||||
|
|
||||||
for (_peer, peer_info) in self.network_globals.peers.read().connected_peers() {
|
|
||||||
connected_peer_count += 1;
|
|
||||||
if let PeerConnectionStatus::Connected { n_in, .. } = peer_info.connection_status() {
|
|
||||||
if *n_in > 0 {
|
|
||||||
inbound_connected_peers += 1;
|
|
||||||
} else {
|
|
||||||
outbound_connected_peers += 1;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
*clients_per_peer
|
|
||||||
.entry(peer_info.client().kind.to_string())
|
|
||||||
.or_default() += 1;
|
|
||||||
}
|
|
||||||
|
|
||||||
metrics::set_gauge(&metrics::PEERS_CONNECTED, connected_peer_count);
|
|
||||||
metrics::set_gauge(&metrics::NETWORK_INBOUND_PEERS, inbound_connected_peers);
|
|
||||||
metrics::set_gauge(&metrics::NETWORK_OUTBOUND_PEERS, outbound_connected_peers);
|
|
||||||
|
|
||||||
for client_kind in ClientKind::iter() {
|
|
||||||
let value = clients_per_peer.get(&client_kind.to_string()).unwrap_or(&0);
|
|
||||||
metrics::set_gauge_vec(
|
|
||||||
&metrics::PEERS_PER_CLIENT,
|
|
||||||
&[client_kind.as_ref()],
|
|
||||||
*value as i64,
|
|
||||||
);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/* Internal functions */
|
/* Internal functions */
|
||||||
|
|
||||||
/// Sets a peer as connected as long as their reputation allows it
|
/// Sets a peer as connected as long as their reputation allows it
|
||||||
|
@ -154,8 +154,8 @@ impl<TSpec: EthSpec> NetworkBehaviour for PeerManager<TSpec> {
|
|||||||
self.on_dial_failure(peer_id);
|
self.on_dial_failure(peer_id);
|
||||||
}
|
}
|
||||||
FromSwarm::ExternalAddrConfirmed(_) => {
|
FromSwarm::ExternalAddrConfirmed(_) => {
|
||||||
// TODO: we likely want to check this against our assumed external tcp
|
// We have an external address confirmed, means we are able to do NAT traversal.
|
||||||
// address
|
metrics::set_gauge_vec(&metrics::NAT_OPEN, &["libp2p"], 1);
|
||||||
}
|
}
|
||||||
_ => {
|
_ => {
|
||||||
// NOTE: FromSwarm is a non exhaustive enum so updates should be based on release
|
// NOTE: FromSwarm is a non exhaustive enum so updates should be based on release
|
||||||
@ -243,14 +243,15 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
|
|||||||
self.events.push(PeerManagerEvent::MetaData(peer_id));
|
self.events.push(PeerManagerEvent::MetaData(peer_id));
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check NAT if metrics are enabled
|
|
||||||
if self.network_globals.local_enr.read().udp4().is_some() {
|
|
||||||
metrics::check_nat();
|
|
||||||
}
|
|
||||||
|
|
||||||
// increment prometheus metrics
|
// increment prometheus metrics
|
||||||
if self.metrics_enabled {
|
if self.metrics_enabled {
|
||||||
let remote_addr = endpoint.get_remote_address();
|
let remote_addr = endpoint.get_remote_address();
|
||||||
|
let direction = if endpoint.is_dialer() {
|
||||||
|
"outbound"
|
||||||
|
} else {
|
||||||
|
"inbound"
|
||||||
|
};
|
||||||
|
|
||||||
match remote_addr.iter().find(|proto| {
|
match remote_addr.iter().find(|proto| {
|
||||||
matches!(
|
matches!(
|
||||||
proto,
|
proto,
|
||||||
@ -258,10 +259,10 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
|
|||||||
)
|
)
|
||||||
}) {
|
}) {
|
||||||
Some(multiaddr::Protocol::QuicV1) => {
|
Some(multiaddr::Protocol::QuicV1) => {
|
||||||
metrics::inc_gauge(&metrics::QUIC_PEERS_CONNECTED);
|
metrics::inc_gauge_vec(&metrics::PEERS_CONNECTED_MULTI, &[direction, "quic"]);
|
||||||
}
|
}
|
||||||
Some(multiaddr::Protocol::Tcp(_)) => {
|
Some(multiaddr::Protocol::Tcp(_)) => {
|
||||||
metrics::inc_gauge(&metrics::TCP_PEERS_CONNECTED);
|
metrics::inc_gauge_vec(&metrics::PEERS_CONNECTED_MULTI, &[direction, "tcp"]);
|
||||||
}
|
}
|
||||||
Some(_) => unreachable!(),
|
Some(_) => unreachable!(),
|
||||||
None => {
|
None => {
|
||||||
@ -269,7 +270,7 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
self.update_connected_peer_metrics();
|
metrics::inc_gauge(&metrics::PEERS_CONNECTED);
|
||||||
metrics::inc_counter(&metrics::PEER_CONNECT_EVENT_COUNT);
|
metrics::inc_counter(&metrics::PEER_CONNECT_EVENT_COUNT);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -339,6 +340,12 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
|
|||||||
let remote_addr = endpoint.get_remote_address();
|
let remote_addr = endpoint.get_remote_address();
|
||||||
// Update the prometheus metrics
|
// Update the prometheus metrics
|
||||||
if self.metrics_enabled {
|
if self.metrics_enabled {
|
||||||
|
let direction = if endpoint.is_dialer() {
|
||||||
|
"outbound"
|
||||||
|
} else {
|
||||||
|
"inbound"
|
||||||
|
};
|
||||||
|
|
||||||
match remote_addr.iter().find(|proto| {
|
match remote_addr.iter().find(|proto| {
|
||||||
matches!(
|
matches!(
|
||||||
proto,
|
proto,
|
||||||
@ -346,15 +353,16 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
|
|||||||
)
|
)
|
||||||
}) {
|
}) {
|
||||||
Some(multiaddr::Protocol::QuicV1) => {
|
Some(multiaddr::Protocol::QuicV1) => {
|
||||||
metrics::dec_gauge(&metrics::QUIC_PEERS_CONNECTED);
|
metrics::dec_gauge_vec(&metrics::PEERS_CONNECTED_MULTI, &[direction, "quic"]);
|
||||||
}
|
}
|
||||||
Some(multiaddr::Protocol::Tcp(_)) => {
|
Some(multiaddr::Protocol::Tcp(_)) => {
|
||||||
metrics::dec_gauge(&metrics::TCP_PEERS_CONNECTED);
|
metrics::dec_gauge_vec(&metrics::PEERS_CONNECTED_MULTI, &[direction, "tcp"]);
|
||||||
}
|
}
|
||||||
// If it's an unknown protocol we already logged when connection was established.
|
// If it's an unknown protocol we already logged when connection was established.
|
||||||
_ => {}
|
_ => {}
|
||||||
};
|
};
|
||||||
self.update_connected_peer_metrics();
|
// Legacy standard metrics.
|
||||||
|
metrics::dec_gauge(&metrics::PEERS_CONNECTED);
|
||||||
metrics::inc_counter(&metrics::PEER_DISCONNECT_EVENT_COUNT);
|
metrics::inc_counter(&metrics::PEER_DISCONNECT_EVENT_COUNT);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -198,6 +198,25 @@ pub fn observe_system_health_vc(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Observes if NAT traversal is possible.
|
||||||
|
pub fn observe_nat() -> bool {
|
||||||
|
let discv5_nat = lighthouse_network::metrics::get_int_gauge(
|
||||||
|
&lighthouse_network::metrics::NAT_OPEN,
|
||||||
|
&["discv5"],
|
||||||
|
)
|
||||||
|
.map(|g| g.get() == 1)
|
||||||
|
.unwrap_or_default();
|
||||||
|
|
||||||
|
let libp2p_nat = lighthouse_network::metrics::get_int_gauge(
|
||||||
|
&lighthouse_network::metrics::NAT_OPEN,
|
||||||
|
&["libp2p"],
|
||||||
|
)
|
||||||
|
.map(|g| g.get() == 1)
|
||||||
|
.unwrap_or_default();
|
||||||
|
|
||||||
|
discv5_nat && libp2p_nat
|
||||||
|
}
|
||||||
|
|
||||||
/// Observes the Beacon Node system health.
|
/// Observes the Beacon Node system health.
|
||||||
pub fn observe_system_health_bn<TSpec: EthSpec>(
|
pub fn observe_system_health_bn<TSpec: EthSpec>(
|
||||||
sysinfo: Arc<RwLock<System>>,
|
sysinfo: Arc<RwLock<System>>,
|
||||||
@ -223,11 +242,7 @@ pub fn observe_system_health_bn<TSpec: EthSpec>(
|
|||||||
.unwrap_or_else(|| (String::from("None"), 0, 0));
|
.unwrap_or_else(|| (String::from("None"), 0, 0));
|
||||||
|
|
||||||
// Determine if the NAT is open or not.
|
// Determine if the NAT is open or not.
|
||||||
let nat_open = lighthouse_network::metrics::NAT_OPEN
|
let nat_open = observe_nat();
|
||||||
.as_ref()
|
|
||||||
.map(|v| v.get())
|
|
||||||
.unwrap_or(0)
|
|
||||||
!= 0;
|
|
||||||
|
|
||||||
SystemHealthBN {
|
SystemHealthBN {
|
||||||
system_health,
|
system_health,
|
||||||
|
Loading…
Reference in New Issue
Block a user