diff --git a/beacon_node/lighthouse_network/src/metrics.rs b/beacon_node/lighthouse_network/src/metrics.rs index ae02b689d..4650553d8 100644 --- a/beacon_node/lighthouse_network/src/metrics.rs +++ b/beacon_node/lighthouse_network/src/metrics.rs @@ -1,3 +1,6 @@ +use libp2p::bandwidth::BandwidthSinks; +use std::sync::Arc; + pub use lighthouse_metrics::*; lazy_static! { @@ -184,3 +187,46 @@ pub fn scrape_discovery_metrics() { set_gauge(&DISCOVERY_SENT_BYTES, metrics.bytes_sent as i64); set_gauge(&DISCOVERY_RECV_BYTES, metrics.bytes_recv as i64); } + +/// Aggregated `BandwidthSinks` of tcp and quic transports +/// used in libp2p. +pub struct AggregatedBandwidthSinks { + tcp_sinks: Arc, + quic_sinks: Option>, +} + +impl AggregatedBandwidthSinks { + /// Create a new `AggregatedBandwidthSinks`. + pub fn new(tcp_sinks: Arc, quic_sinks: Option>) -> Self { + AggregatedBandwidthSinks { + tcp_sinks, + quic_sinks, + } + } + + /// Total QUIC inbound bandwidth. + pub fn total_quic_inbound(&self) -> u64 { + self.quic_sinks + .as_ref() + .map(|q| q.total_inbound()) + .unwrap_or_default() + } + + /// Total TCP inbound bandwidth. + pub fn total_tcp_inbound(&self) -> u64 { + self.tcp_sinks.total_inbound() + } + + /// Total QUIC outbound bandwidth. + pub fn total_quic_outbound(&self) -> u64 { + self.quic_sinks + .as_ref() + .map(|q| q.total_outbound()) + .unwrap_or_default() + } + + /// Total TCP outbound bandwidth. + pub fn total_tcp_outbound(&self) -> u64 { + self.tcp_sinks.total_outbound() + } +} diff --git a/beacon_node/lighthouse_network/src/service/mod.rs b/beacon_node/lighthouse_network/src/service/mod.rs index 5daa6557e..826446ddb 100644 --- a/beacon_node/lighthouse_network/src/service/mod.rs +++ b/beacon_node/lighthouse_network/src/service/mod.rs @@ -4,6 +4,7 @@ use crate::config::{gossipsub_config, GossipsubConfigParams, NetworkLoad}; use crate::discovery::{ subnet_predicate, DiscoveredPeers, Discovery, FIND_NODE_QUERY_CLOSEST_PEERS, }; +use crate::metrics::AggregatedBandwidthSinks; use crate::peer_manager::{ config::Config as PeerManagerCfg, peerdb::score::PeerAction, peerdb::score::ReportSource, ConnectionDirection, PeerManager, PeerManagerEvent, @@ -24,7 +25,6 @@ use crate::{error, metrics, Enr, NetworkGlobals, PubsubMessage, TopicHash}; use api_types::{PeerRequestId, Request, RequestId, Response}; use futures::stream::StreamExt; use gossipsub_scoring_parameters::{lighthouse_gossip_thresholds, PeerScoreSettings}; -use libp2p::bandwidth::BandwidthSinks; use libp2p::gossipsub::{ self, IdentTopic as Topic, MessageAcceptance, MessageAuthenticity, MessageId, PublishError, TopicScoreParams, @@ -128,7 +128,7 @@ pub struct Network { update_gossipsub_scores: tokio::time::Interval, gossip_cache: GossipCache, /// The bandwidth logger for the underlying libp2p transport. - pub bandwidth: Arc, + pub bandwidth: AggregatedBandwidthSinks, /// This node's PeerId. pub local_peer_id: PeerId, /// Logger for behaviour actions. diff --git a/beacon_node/lighthouse_network/src/service/utils.rs b/beacon_node/lighthouse_network/src/service/utils.rs index 4d05518aa..5b4f7cee9 100644 --- a/beacon_node/lighthouse_network/src/service/utils.rs +++ b/beacon_node/lighthouse_network/src/service/utils.rs @@ -1,3 +1,4 @@ +use crate::metrics::AggregatedBandwidthSinks; use crate::multiaddr::Protocol; use crate::rpc::{MetaData, MetaDataV1, MetaDataV2}; use crate::types::{ @@ -5,7 +6,6 @@ use crate::types::{ }; use crate::{GossipTopic, NetworkConfig}; use futures::future::Either; -use libp2p::bandwidth::BandwidthSinks; use libp2p::core::{multiaddr::Multiaddr, muxing::StreamMuxerBox, transport::Boxed}; use libp2p::gossipsub; use libp2p::identity::{secp256k1, Keypair}; @@ -44,7 +44,7 @@ type BoxedTransport = Boxed<(PeerId, StreamMuxerBox)>; pub fn build_transport( local_private_key: Keypair, quic_support: bool, -) -> std::io::Result<(BoxedTransport, Arc)> { +) -> std::io::Result<(BoxedTransport, AggregatedBandwidthSinks)> { // mplex config let mut mplex_config = libp2p_mplex::MplexConfig::new(); mplex_config.set_max_buffer_size(256); @@ -55,30 +55,39 @@ pub fn build_transport( yamux_config.set_window_update_mode(yamux::WindowUpdateMode::on_read()); // Creates the TCP transport layer - let tcp = libp2p::tcp::tokio::Transport::new(libp2p::tcp::Config::default().nodelay(true)) - .upgrade(core::upgrade::Version::V1) - .authenticate(generate_noise_config(&local_private_key)) - .multiplex(core::upgrade::SelectUpgrade::new( - yamux_config, - mplex_config, - )) - .timeout(Duration::from_secs(10)); + let (tcp, tcp_bandwidth) = + libp2p::tcp::tokio::Transport::new(libp2p::tcp::Config::default().nodelay(true)) + .upgrade(core::upgrade::Version::V1) + .authenticate(generate_noise_config(&local_private_key)) + .multiplex(core::upgrade::SelectUpgrade::new( + yamux_config, + mplex_config, + )) + .timeout(Duration::from_secs(10)) + .with_bandwidth_logging(); let (transport, bandwidth) = if quic_support { // Enables Quic // The default quic configuration suits us for now. let quic_config = libp2p_quic::Config::new(&local_private_key); - tcp.or_transport(libp2p_quic::tokio::Transport::new(quic_config)) + let (quic, quic_bandwidth) = + libp2p_quic::tokio::Transport::new(quic_config).with_bandwidth_logging(); + let transport = tcp + .or_transport(quic) .map(|either_output, _| match either_output { Either::Left((peer_id, muxer)) => (peer_id, StreamMuxerBox::new(muxer)), Either::Right((peer_id, muxer)) => (peer_id, StreamMuxerBox::new(muxer)), }) - .with_bandwidth_logging() + .boxed(); + ( + transport, + AggregatedBandwidthSinks::new(tcp_bandwidth, Some(quic_bandwidth)), + ) } else { - tcp.with_bandwidth_logging() + (tcp, AggregatedBandwidthSinks::new(tcp_bandwidth, None)) }; - // // Enables DNS over the transport. + // Enables DNS over the transport. let transport = libp2p::dns::TokioDnsConfig::system(transport)?.boxed(); Ok((transport, bandwidth)) diff --git a/beacon_node/network/src/metrics.rs b/beacon_node/network/src/metrics.rs index ee0d5e623..e57fd24f2 100644 --- a/beacon_node/network/src/metrics.rs +++ b/beacon_node/network/src/metrics.rs @@ -7,8 +7,8 @@ use beacon_chain::{ use fnv::FnvHashMap; pub use lighthouse_metrics::*; use lighthouse_network::{ - peer_manager::peerdb::client::ClientKind, types::GossipKind, BandwidthSinks, GossipTopic, - Gossipsub, NetworkGlobals, + metrics::AggregatedBandwidthSinks, peer_manager::peerdb::client::ClientKind, types::GossipKind, + GossipTopic, Gossipsub, NetworkGlobals, }; use std::sync::Arc; use strum::IntoEnumIterator; @@ -226,18 +226,8 @@ lazy_static! { /* * Bandwidth metrics */ - pub static ref INBOUND_LIBP2P_BYTES: Result = - try_create_int_gauge("libp2p_inbound_bytes", "The inbound bandwidth over libp2p"); - - pub static ref OUTBOUND_LIBP2P_BYTES: Result = try_create_int_gauge( - "libp2p_outbound_bytes", - "The outbound bandwidth over libp2p" - ); - pub static ref TOTAL_LIBP2P_BANDWIDTH: Result = try_create_int_gauge( - "libp2p_total_bandwidth", - "The total inbound/outbound bandwidth over libp2p" - ); - + pub static ref LIBP2P_BYTES: Result = + try_create_int_counter_vec("libp2p_inbound_bytes", "The bandwidth over libp2p", &["direction", "transport"]); /* * Sync related metrics @@ -328,13 +318,23 @@ lazy_static! { ); } -pub fn update_bandwidth_metrics(bandwidth: Arc) { - set_gauge(&INBOUND_LIBP2P_BYTES, bandwidth.total_inbound() as i64); - set_gauge(&OUTBOUND_LIBP2P_BYTES, bandwidth.total_outbound() as i64); - set_gauge( - &TOTAL_LIBP2P_BANDWIDTH, - (bandwidth.total_inbound() + bandwidth.total_outbound()) as i64, - ); +pub fn update_bandwidth_metrics(bandwidth: &AggregatedBandwidthSinks) { + if let Some(tcp_in_bandwidth) = get_int_counter(&LIBP2P_BYTES, &["inbound", "tcp"]) { + tcp_in_bandwidth.reset(); + tcp_in_bandwidth.inc_by(bandwidth.total_tcp_inbound()); + } + if let Some(tcp_out_bandwidth) = get_int_counter(&LIBP2P_BYTES, &["outbound", "tcp"]) { + tcp_out_bandwidth.reset(); + tcp_out_bandwidth.inc_by(bandwidth.total_tcp_outbound()); + } + if let Some(quic_in_bandwidth) = get_int_counter(&LIBP2P_BYTES, &["inbound", "quic"]) { + quic_in_bandwidth.reset(); + quic_in_bandwidth.inc_by(bandwidth.total_quic_inbound()); + } + if let Some(quic_out_bandwidth) = get_int_counter(&LIBP2P_BYTES, &["outbound", "quic"]) { + quic_out_bandwidth.reset(); + quic_out_bandwidth.inc_by(bandwidth.total_quic_outbound()); + } } pub fn register_finality_update_error(error: &LightClientFinalityUpdateError) { diff --git a/beacon_node/network/src/service.rs b/beacon_node/network/src/service.rs index 7d0dc4b77..5e71341ab 100644 --- a/beacon_node/network/src/service.rs +++ b/beacon_node/network/src/service.rs @@ -497,7 +497,7 @@ impl NetworkService { } } } - metrics::update_bandwidth_metrics(self.libp2p.bandwidth.clone()); + metrics::update_bandwidth_metrics(&self.libp2p.bandwidth); } }; executor.spawn(service_fut, "network");