Sync metrics (#1975)

## Issue Addressed
- Add metrics to keep track of peer counts by sync type
- Add metric to keep track of the number of syncing chains in range

## Proposed Changes
Hook into the existing network metrics update interval and, on each tick, also update the counts of peers grouped by their sync status with respect to us.

## Additional Info
For the peer counts:
- Because of the way it is implemented, the numbers won't always add up to the total peer count in the `libp2p` metric.
- Updating the gauge on every change is messy because it would need to happen on connection (in the `eth2_libp2p` crate, while the metrics are defined in the `network` crate), on a Goodbye being sent for an `IrrelevantPeer` (either in the `beacon_processor` or the `peer_manager`), and on disconnection. Since this is not a critical metric, I think counting once every second is enough; a minimal sketch of this periodic approach is shown after this list. If more accuracy is needed we can do that too, but it would be harder to maintain.
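
If it helps to picture the periodic approach, here is a minimal, self-contained sketch of the reset–recount–set pattern using the `prometheus` crate directly (Lighthouse goes through its `lighthouse_metrics` wrappers instead). The metric and label names match the ones added in this PR, but the hard-coded peer list and the `main` harness are purely illustrative:

```rust
use prometheus::{Encoder, IntGaugeVec, Opts, Registry, TextEncoder};
use std::collections::HashMap;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // A gauge vector keyed by a single "sync_status" label, in the spirit of
    // the `sync_peers_per_status` metric added here.
    let registry = Registry::new();
    let peers_per_status = IntGaugeVec::new(
        Opts::new(
            "sync_peers_per_status",
            "Number of connected peers per sync status type",
        ),
        &["sync_status"],
    )?;
    registry.register(Box::new(peers_per_status.clone()))?;

    // Stand-in for the peer table we would read once per metrics tick.
    let connected_peer_statuses = ["Synced", "Synced", "Behind", "Advanced"];

    // One "tick" of the periodic update: clear every label, recount, then set.
    peers_per_status.reset();
    let mut counts: HashMap<&str, i64> = HashMap::new();
    for status in connected_peer_statuses {
        *counts.entry(status).or_default() += 1;
    }
    for (status, count) in counts {
        peers_per_status.with_label_values(&[status]).set(count);
    }

    // Print the result in Prometheus text exposition format, e.g.
    // sync_peers_per_status{sync_status="Synced"} 2
    let mut buffer = Vec::new();
    TextEncoder::new().encode(&registry.gather(), &mut buffer)?;
    println!("{}", String::from_utf8(buffer)?);
    Ok(())
}
```

Because the gauge vector is reset before each recount, statuses with no remaining peers simply stop being exported instead of going stale.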

At the moment the new metrics look like this:
![image](https://user-images.githubusercontent.com/26765164/100275387-22137b00-2f60-11eb-93b9-94b0f265240c.png)
divma committed 2020-11-26 05:23:17 +00:00
commit fc07cc3fdf, parent 26741944b1
5 changed files with 84 additions and 19 deletions

@@ -65,17 +65,20 @@ impl PeerSyncStatus {
             true
         }
     }
+
+    pub fn as_str(&self) -> &'static str {
+        match self {
+            PeerSyncStatus::Advanced { .. } => "Advanced",
+            PeerSyncStatus::Behind { .. } => "Behind",
+            PeerSyncStatus::Synced { .. } => "Synced",
+            PeerSyncStatus::Unknown => "Unknown",
+            PeerSyncStatus::IrrelevantPeer => "Irrelevant",
+        }
+    }
 }
 
 impl std::fmt::Display for PeerSyncStatus {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        let rpr = match self {
-            PeerSyncStatus::Behind { .. } => "Behind",
-            PeerSyncStatus::Advanced { .. } => "Advanced",
-            PeerSyncStatus::Synced { .. } => "Synced",
-            PeerSyncStatus::Unknown => "Unknown",
-            PeerSyncStatus::IrrelevantPeer => "IrrelevantPeer",
-        };
-        f.write_str(rpr)
+        f.write_str(self.as_str())
     }
 }

@@ -406,6 +406,23 @@ lazy_static! {
     );
 }
 
+lazy_static! {
+    /*
+     * Sync related metrics
+     */
+    pub static ref PEERS_PER_SYNC_TYPE: Result<IntGaugeVec> = try_create_int_gauge_vec(
+        "sync_peers_per_status",
+        "Number of connected peers per sync status type",
+        &["sync_status"]
+    );
+
+    pub static ref SYNCING_CHAINS_COUNT: Result<IntGaugeVec> = try_create_int_gauge_vec(
+        "sync_range_chains",
+        "Number of Syncing chains in range, per range type",
+        &["range_type"]
+    );
+}
+
 pub fn register_attestation_error(error: &AttnError) {
     match error {
         AttnError::FutureEpoch { .. } => inc_counter(&GOSSIP_ATTESTATION_ERROR_FUTURE_EPOCH),

@@ -14,6 +14,7 @@ use eth2_libp2p::{
     types::GossipKind, BehaviourEvent, GossipTopic, MessageId, NetworkGlobals, PeerId, TopicHash,
 };
 use eth2_libp2p::{MessageAcceptance, Service as LibP2PService};
+use fnv::FnvHashMap;
 use futures::prelude::*;
 use slog::{debug, error, info, o, trace, warn};
 use std::{collections::HashMap, net::SocketAddr, sync::Arc, time::Duration};
@@ -277,6 +278,9 @@ fn spawn_service<T: BeaconChainTypes>(
                         &service.libp2p.swarm.gs(),
                         &service.network_globals,
                     );
+
+                    // update sync metrics
+                    update_sync_metrics(&service.network_globals);
                 }
                 _ = service.gossipsub_parameter_update.next() => {
                     if let Ok(slot) = service.beacon_chain.slot() {
@@ -932,3 +936,29 @@ fn update_gossip_metrics<T: EthSpec>(
         }
     }
 }
+
+fn update_sync_metrics<T: EthSpec>(network_globals: &Arc<NetworkGlobals<T>>) {
+    // reset the counts
+    if metrics::PEERS_PER_SYNC_TYPE
+        .as_ref()
+        .map(|metric| metric.reset())
+        .is_err()
+    {
+        return;
+    };
+
+    // count per sync status, the number of connected peers
+    let mut peers_per_sync_type = FnvHashMap::default();
+    for sync_type in network_globals
+        .peers
+        .read()
+        .connected_peers()
+        .map(|(_peer_id, info)| info.sync_status.as_str())
+    {
+        *peers_per_sync_type.entry(sync_type).or_default() += 1;
+    }
+
+    for (sync_type, peer_count) in peers_per_sync_type {
+        metrics::set_gauge_entry(&metrics::PEERS_PER_SYNC_TYPE, &[sync_type], peer_count);
+    }
+}

@@ -6,6 +6,7 @@
 use super::chain::{ChainId, ProcessingResult, RemoveChain, SyncingChain};
 use super::sync_type::RangeSyncType;
 use crate::beacon_processor::WorkEvent as BeaconWorkEvent;
+use crate::metrics;
 use crate::sync::network_context::SyncNetworkContext;
 use beacon_chain::{BeaconChain, BeaconChainTypes};
 use eth2_libp2p::PeerId;
@@ -63,7 +64,10 @@ impl<T: BeaconChainTypes> ChainCollection<T> {
     }
 
     /// Updates the Syncing state of the collection after a chain is removed.
-    fn on_chain_removed(&mut self, id: &ChainId, was_syncing: bool) {
+    fn on_chain_removed(&mut self, id: &ChainId, was_syncing: bool, sync_type: RangeSyncType) {
+        let _ = metrics::get_int_gauge(&metrics::SYNCING_CHAINS_COUNT, &[sync_type.as_str()])
+            .map(|m| m.dec());
+
         match self.state {
             RangeSyncState::Finalized(ref syncing_id) => {
                 if syncing_id == id {
@@ -136,7 +140,7 @@ impl<T: BeaconChainTypes> ChainCollection<T> {
                 RangeSyncType::Head => self.head_chains.remove(&id),
             };
             let chain = chain.expect("Chain exists");
-            self.on_chain_removed(&id, chain.is_syncing());
+            self.on_chain_removed(&id, chain.is_syncing(), sync_type);
             results.push((chain, sync_type, reason));
         }
         results
@@ -160,7 +164,7 @@ impl<T: BeaconChainTypes> ChainCollection<T> {
             // Search in our finalized chains first
            if let Err(remove_reason) = func(entry.get_mut()) {
                 let chain = entry.remove();
-                self.on_chain_removed(&id, chain.is_syncing());
+                self.on_chain_removed(&id, chain.is_syncing(), RangeSyncType::Finalized);
                 Ok((Some((chain, remove_reason)), RangeSyncType::Finalized))
             } else {
                 Ok((None, RangeSyncType::Finalized))
@@ -169,7 +173,7 @@ impl<T: BeaconChainTypes> ChainCollection<T> {
             // Search in our head chains next
             if let Err(remove_reason) = func(entry.get_mut()) {
                 let chain = entry.remove();
-                self.on_chain_removed(&id, chain.is_syncing());
+                self.on_chain_removed(&id, chain.is_syncing(), RangeSyncType::Head);
                 Ok((Some((chain, remove_reason)), RangeSyncType::Head))
             } else {
                 Ok((None, RangeSyncType::Head))
@@ -311,7 +315,7 @@ impl<T: BeaconChainTypes> ChainCollection<T> {
                     error!(self.log, "Chain removed while switching chains"; "chain" => new_id, "reason" => ?remove_reason);
                 }
                 self.finalized_chains.remove(&new_id);
-                self.on_chain_removed(&new_id, true);
+                self.on_chain_removed(&new_id, true, RangeSyncType::Finalized);
             }
         }
     }
@@ -424,7 +428,7 @@ impl<T: BeaconChainTypes> ChainCollection<T> {
                 || chain.available_peers() == 0
             {
                 debug!(log_ref, "Purging out of finalized chain"; &chain);
-                removed_chains.push((*id, chain.is_syncing()));
+                removed_chains.push((*id, chain.is_syncing(), RangeSyncType::Finalized));
                 false
             } else {
                 true
@@ -435,7 +439,7 @@ impl<T: BeaconChainTypes> ChainCollection<T> {
                 || chain.available_peers() == 0
             {
                 debug!(log_ref, "Purging out of date head chain"; &chain);
-                removed_chains.push((*id, chain.is_syncing()));
+                removed_chains.push((*id, chain.is_syncing(), RangeSyncType::Head));
                 false
             } else {
                 true
@@ -443,8 +447,8 @@ impl<T: BeaconChainTypes> ChainCollection<T> {
         });
 
         // update the state of the collection
-        for (id, was_syncing) in removed_chains {
-            self.on_chain_removed(&id, was_syncing);
+        for (id, was_syncing, sync_type) in removed_chains {
+            self.on_chain_removed(&id, was_syncing, sync_type);
         }
     }
@@ -480,7 +484,7 @@ impl<T: BeaconChainTypes> ChainCollection<T> {
                         error!(self.log, "Chain removed after adding peer"; "chain" => id, "reason" => ?remove_reason);
                     }
                     let chain = entry.remove();
-                    self.on_chain_removed(&id, chain.is_syncing());
+                    self.on_chain_removed(&id, chain.is_syncing(), sync_type);
                 }
             }
             Entry::Vacant(entry) => {
@@ -496,6 +500,9 @@ impl<T: BeaconChainTypes> ChainCollection<T> {
                 debug_assert_eq!(new_chain.get_id(), id);
                 debug!(self.log, "New chain added to sync"; "peer_id" => peer_rpr, "sync_type" => ?sync_type, &new_chain);
                 entry.insert(new_chain);
+                let _ =
+                    metrics::get_int_gauge(&metrics::SYNCING_CHAINS_COUNT, &[sync_type.as_str()])
+                        .map(|m| m.inc());
             }
         }
     }

@@ -6,7 +6,7 @@ use eth2_libp2p::SyncInfo;
 use std::sync::Arc;
 
 /// The type of Range sync that should be done relative to our current state.
-#[derive(Debug)]
+#[derive(Debug, Clone, Copy)]
 pub enum RangeSyncType {
     /// A finalized chain sync should be started with this peer.
     Finalized,
@@ -39,4 +39,12 @@ impl RangeSyncType {
             RangeSyncType::Head
         }
     }
+
+    /// Get a `str` representation of the `RangeSyncType`.
+    pub fn as_str(&self) -> &'static str {
+        match self {
+            RangeSyncType::Finalized => "Finalized",
+            RangeSyncType::Head => "Head",
+        }
+    }
 }