Fix peer count metrics (#5404)

* Set the peers_per_client metrics directly, rather than using increment/decrement

* Move PEERS_CONNECTED related update to the same place

* Move PEERS_CONNECTED_MULTI related update to the same place

* Rename

* Remove unused variables
This commit is contained in:
Akihito Nakano 2024-03-20 15:45:16 +09:00 committed by GitHub
parent aa592853df
commit 7117772fb3
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 75 additions and 68 deletions

View File

@ -26,6 +26,8 @@ pub use libp2p::identity::Keypair;
#[allow(clippy::mutable_key_type)] // PeerId in hashmaps are no longer permitted by clippy #[allow(clippy::mutable_key_type)] // PeerId in hashmaps are no longer permitted by clippy
pub mod peerdb; pub mod peerdb;
use crate::peer_manager::peerdb::client::ClientKind;
use libp2p::multiaddr;
pub use peerdb::peer_info::{ pub use peerdb::peer_info::{
ConnectionDirection, PeerConnectionStatus, PeerConnectionStatus::*, PeerInfo, ConnectionDirection, PeerConnectionStatus, PeerConnectionStatus::*, PeerInfo,
}; };
@ -33,6 +35,8 @@ use peerdb::score::{PeerAction, ReportSource};
pub use peerdb::sync_status::{SyncInfo, SyncStatus}; pub use peerdb::sync_status::{SyncInfo, SyncStatus};
use std::collections::{hash_map::Entry, HashMap}; use std::collections::{hash_map::Entry, HashMap};
use std::net::IpAddr; use std::net::IpAddr;
use strum::IntoEnumIterator;
pub mod config; pub mod config;
mod network_behaviour; mod network_behaviour;
@ -464,19 +468,6 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
"observed_address" => ?info.observed_addr, "observed_address" => ?info.observed_addr,
"protocols" => ?info.protocols "protocols" => ?info.protocols
); );
// update the peer client kind metric if the peer is connected
if matches!(
peer_info.connection_status(),
PeerConnectionStatus::Connected { .. }
| PeerConnectionStatus::Disconnecting { .. }
) {
metrics::inc_gauge_vec(
&metrics::PEERS_PER_CLIENT,
&[peer_info.client().kind.as_ref()],
);
metrics::dec_gauge_vec(&metrics::PEERS_PER_CLIENT, &[previous_kind.as_ref()]);
}
} }
} else { } else {
error!(self.log, "Received an Identify response from an unknown peer"; "peer_id" => peer_id.to_string()); error!(self.log, "Received an Identify response from an unknown peer"; "peer_id" => peer_id.to_string());
@ -812,11 +803,8 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
// start a ping and status timer for the peer // start a ping and status timer for the peer
self.status_peers.insert(*peer_id); self.status_peers.insert(*peer_id);
let connected_peers = self.network_globals.connected_peers() as i64;
// increment prometheus metrics // increment prometheus metrics
metrics::inc_counter(&metrics::PEER_CONNECT_EVENT_COUNT); metrics::inc_counter(&metrics::PEER_CONNECT_EVENT_COUNT);
metrics::set_gauge(&metrics::PEERS_CONNECTED, connected_peers);
true true
} }
@ -1267,6 +1255,70 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
); );
} }
} }
// Update peer count related metrics.
fn update_peer_count_metrics(&self) {
let mut peers_connected = 0;
let mut clients_per_peer = HashMap::new();
let mut peers_connected_mutli: HashMap<(&str, &str), i32> = HashMap::new();
for (_, peer_info) in self.network_globals.peers.read().connected_peers() {
peers_connected += 1;
*clients_per_peer
.entry(peer_info.client().kind.to_string())
.or_default() += 1;
let direction = match peer_info.connection_direction() {
Some(ConnectionDirection::Incoming) => "inbound",
Some(ConnectionDirection::Outgoing) => "outbound",
None => "none",
};
// Note: the `transport` is set to `unknown` if the `listening_addresses` list is empty.
// This situation occurs when the peer is initially registered in PeerDB, but the peer
// info has not yet been updated at `PeerManager::identify`.
let transport = peer_info
.listening_addresses()
.iter()
.find_map(|addr| {
addr.iter().find_map(|proto| match proto {
multiaddr::Protocol::QuicV1 => Some("quic"),
multiaddr::Protocol::Tcp(_) => Some("tcp"),
_ => None,
})
})
.unwrap_or("unknown");
*peers_connected_mutli
.entry((direction, transport))
.or_default() += 1;
}
// PEERS_CONNECTED
metrics::set_gauge(&metrics::PEERS_CONNECTED, peers_connected);
// PEERS_PER_CLIENT
for client_kind in ClientKind::iter() {
let value = clients_per_peer.get(&client_kind.to_string()).unwrap_or(&0);
metrics::set_gauge_vec(
&metrics::PEERS_PER_CLIENT,
&[client_kind.as_ref()],
*value as i64,
);
}
// PEERS_CONNECTED_MULTI
for direction in ["inbound", "outbound", "none"] {
for transport in ["quic", "tcp", "unknown"] {
metrics::set_gauge_vec(
&metrics::PEERS_CONNECTED_MULTI,
&[direction, transport],
*peers_connected_mutli
.get(&(direction, transport))
.unwrap_or(&0) as i64,
);
}
}
}
} }
enum ConnectingType { enum ConnectingType {

View File

@ -4,7 +4,7 @@ use std::net::IpAddr;
use std::task::{Context, Poll}; use std::task::{Context, Poll};
use futures::StreamExt; use futures::StreamExt;
use libp2p::core::{multiaddr, ConnectedPoint}; use libp2p::core::ConnectedPoint;
use libp2p::identity::PeerId; use libp2p::identity::PeerId;
use libp2p::swarm::behaviour::{ConnectionClosed, ConnectionEstablished, DialFailure, FromSwarm}; use libp2p::swarm::behaviour::{ConnectionClosed, ConnectionEstablished, DialFailure, FromSwarm};
use libp2p::swarm::dial_opts::{DialOpts, PeerCondition}; use libp2p::swarm::dial_opts::{DialOpts, PeerCondition};
@ -243,35 +243,11 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
self.events.push(PeerManagerEvent::MetaData(peer_id)); self.events.push(PeerManagerEvent::MetaData(peer_id));
} }
// increment prometheus metrics // Update the prometheus metrics
if self.metrics_enabled { if self.metrics_enabled {
let remote_addr = endpoint.get_remote_address();
let direction = if endpoint.is_dialer() {
"outbound"
} else {
"inbound"
};
match remote_addr.iter().find(|proto| {
matches!(
proto,
multiaddr::Protocol::QuicV1 | multiaddr::Protocol::Tcp(_)
)
}) {
Some(multiaddr::Protocol::QuicV1) => {
metrics::inc_gauge_vec(&metrics::PEERS_CONNECTED_MULTI, &[direction, "quic"]);
}
Some(multiaddr::Protocol::Tcp(_)) => {
metrics::inc_gauge_vec(&metrics::PEERS_CONNECTED_MULTI, &[direction, "tcp"]);
}
Some(_) => unreachable!(),
None => {
error!(self.log, "Connection established via unknown transport"; "addr" => %remote_addr)
}
};
metrics::inc_gauge(&metrics::PEERS_CONNECTED);
metrics::inc_counter(&metrics::PEER_CONNECT_EVENT_COUNT); metrics::inc_counter(&metrics::PEER_CONNECT_EVENT_COUNT);
self.update_peer_count_metrics();
} }
// Count dialing peers in the limit if the peer dialed us. // Count dialing peers in the limit if the peer dialed us.
@ -309,7 +285,7 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
fn on_connection_closed( fn on_connection_closed(
&mut self, &mut self,
peer_id: PeerId, peer_id: PeerId,
endpoint: &ConnectedPoint, _endpoint: &ConnectedPoint,
remaining_established: usize, remaining_established: usize,
) { ) {
if remaining_established > 0 { if remaining_established > 0 {
@ -337,33 +313,12 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
// reference so that peer manager can track this peer. // reference so that peer manager can track this peer.
self.inject_disconnect(&peer_id); self.inject_disconnect(&peer_id);
let remote_addr = endpoint.get_remote_address();
// Update the prometheus metrics // Update the prometheus metrics
if self.metrics_enabled { if self.metrics_enabled {
let direction = if endpoint.is_dialer() {
"outbound"
} else {
"inbound"
};
match remote_addr.iter().find(|proto| {
matches!(
proto,
multiaddr::Protocol::QuicV1 | multiaddr::Protocol::Tcp(_)
)
}) {
Some(multiaddr::Protocol::QuicV1) => {
metrics::dec_gauge_vec(&metrics::PEERS_CONNECTED_MULTI, &[direction, "quic"]);
}
Some(multiaddr::Protocol::Tcp(_)) => {
metrics::dec_gauge_vec(&metrics::PEERS_CONNECTED_MULTI, &[direction, "tcp"]);
}
// If it's an unknown protocol we already logged when connection was established.
_ => {}
};
// Legacy standard metrics. // Legacy standard metrics.
metrics::dec_gauge(&metrics::PEERS_CONNECTED);
metrics::inc_counter(&metrics::PEER_DISCONNECT_EVENT_COUNT); metrics::inc_counter(&metrics::PEER_DISCONNECT_EVENT_COUNT);
self.update_peer_count_metrics();
} }
} }