From fc07cc3fdfd9b1d8df1b21de35a7a8857b97fb85 Mon Sep 17 00:00:00 2001 From: divma Date: Thu, 26 Nov 2020 05:23:17 +0000 Subject: [PATCH] Sync metrics (#1975) ## Issue Addressed - Add metrics to keep track of peer counts by sync type - Add metric to keep track of the number of syncing chains in range ## Proposed Changes Plug into the network metrics update interval and also update the counts of peers with respect to their sync status with us. ## Additional Info For the peer counts - Because of the way it is implemented, the numbers won't always match the total peer count in the `libp2p` metric. - Updating the gauge on every change is messy because it would need to be updated on connection (in the `eth2_libp2p` crate, while metrics are defined in the `network` crate), when a Goodbye is sent (for an `IrrelevantPeer`) in either the `beacon_processor` or the `peer_manager`, and on disconnection. Since this is not a critical metric, I think counting once every second is enough. If you think more accuracy is needed we can do that too, but it would be harder to maintain. At the moment those look like this: ![image](https://user-images.githubusercontent.com/26765164/100275387-22137b00-2f60-11eb-93b9-94b0f265240c.png) --- .../src/peer_manager/peer_sync_status.rs | 19 +++++++----- beacon_node/network/src/metrics.rs | 17 +++++++++++ beacon_node/network/src/service.rs | 30 +++++++++++++++++++ .../src/sync/range_sync/chain_collection.rs | 27 ++++++++++------- .../network/src/sync/range_sync/sync_type.rs | 10 ++++++- 5 files changed, 84 insertions(+), 19 deletions(-) diff --git a/beacon_node/eth2_libp2p/src/peer_manager/peer_sync_status.rs b/beacon_node/eth2_libp2p/src/peer_manager/peer_sync_status.rs index 9cf38ceb3..32580beff 100644 --- a/beacon_node/eth2_libp2p/src/peer_manager/peer_sync_status.rs +++ b/beacon_node/eth2_libp2p/src/peer_manager/peer_sync_status.rs @@ -65,17 +65,20 @@ impl PeerSyncStatus { true } } + + pub fn as_str(&self) -> &'static str { + match self { + PeerSyncStatus::Advanced { ..
} => "Advanced", + PeerSyncStatus::Behind { .. } => "Behind", + PeerSyncStatus::Synced { .. } => "Synced", + PeerSyncStatus::Unknown => "Unknown", + PeerSyncStatus::IrrelevantPeer => "Irrelevant", + } + } } impl std::fmt::Display for PeerSyncStatus { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - let rpr = match self { - PeerSyncStatus::Behind { .. } => "Behind", - PeerSyncStatus::Advanced { .. } => "Advanced", - PeerSyncStatus::Synced { .. } => "Synced", - PeerSyncStatus::Unknown => "Unknown", - PeerSyncStatus::IrrelevantPeer => "IrrelevantPeer", - }; - f.write_str(rpr) + f.write_str(self.as_str()) } } diff --git a/beacon_node/network/src/metrics.rs b/beacon_node/network/src/metrics.rs index bf365262b..8ccb76b55 100644 --- a/beacon_node/network/src/metrics.rs +++ b/beacon_node/network/src/metrics.rs @@ -406,6 +406,23 @@ lazy_static! { ); } +lazy_static! { + /* + * Sync related metrics + */ + pub static ref PEERS_PER_SYNC_TYPE: Result = try_create_int_gauge_vec( + "sync_peers_per_status", + "Number of connected peers per sync status type", + &["sync_status"] + ); + pub static ref SYNCING_CHAINS_COUNT: Result = try_create_int_gauge_vec( + "sync_range_chains", + "Number of Syncing chains in range, per range type", + &["range_type"] + ); + +} + pub fn register_attestation_error(error: &AttnError) { match error { AttnError::FutureEpoch { .. 
} => inc_counter(&GOSSIP_ATTESTATION_ERROR_FUTURE_EPOCH), diff --git a/beacon_node/network/src/service.rs b/beacon_node/network/src/service.rs index fdb96b77a..f4d6532c9 100644 --- a/beacon_node/network/src/service.rs +++ b/beacon_node/network/src/service.rs @@ -14,6 +14,7 @@ use eth2_libp2p::{ types::GossipKind, BehaviourEvent, GossipTopic, MessageId, NetworkGlobals, PeerId, TopicHash, }; use eth2_libp2p::{MessageAcceptance, Service as LibP2PService}; +use fnv::FnvHashMap; use futures::prelude::*; use slog::{debug, error, info, o, trace, warn}; use std::{collections::HashMap, net::SocketAddr, sync::Arc, time::Duration}; @@ -277,6 +278,9 @@ fn spawn_service( &service.libp2p.swarm.gs(), &service.network_globals, ); + // update sync metrics + update_sync_metrics(&service.network_globals); + } _ = service.gossipsub_parameter_update.next() => { if let Ok(slot) = service.beacon_chain.slot() { @@ -932,3 +936,29 @@ fn update_gossip_metrics( } } } + +fn update_sync_metrics(network_globals: &Arc>) { + // reset the counts + if metrics::PEERS_PER_SYNC_TYPE + .as_ref() + .map(|metric| metric.reset()) + .is_err() + { + return; + }; + + // count per sync status, the number of connected peers + let mut peers_per_sync_type = FnvHashMap::default(); + for sync_type in network_globals + .peers + .read() + .connected_peers() + .map(|(_peer_id, info)| info.sync_status.as_str()) + { + *peers_per_sync_type.entry(sync_type).or_default() += 1; + } + + for (sync_type, peer_count) in peers_per_sync_type { + metrics::set_gauge_entry(&metrics::PEERS_PER_SYNC_TYPE, &[sync_type], peer_count); + } +} diff --git a/beacon_node/network/src/sync/range_sync/chain_collection.rs b/beacon_node/network/src/sync/range_sync/chain_collection.rs index 180ceb949..e820ce1a9 100644 --- a/beacon_node/network/src/sync/range_sync/chain_collection.rs +++ b/beacon_node/network/src/sync/range_sync/chain_collection.rs @@ -6,6 +6,7 @@ use super::chain::{ChainId, ProcessingResult, RemoveChain, SyncingChain}; use 
super::sync_type::RangeSyncType; use crate::beacon_processor::WorkEvent as BeaconWorkEvent; +use crate::metrics; use crate::sync::network_context::SyncNetworkContext; use beacon_chain::{BeaconChain, BeaconChainTypes}; use eth2_libp2p::PeerId; @@ -63,7 +64,10 @@ impl ChainCollection { } /// Updates the Syncing state of the collection after a chain is removed. - fn on_chain_removed(&mut self, id: &ChainId, was_syncing: bool) { + fn on_chain_removed(&mut self, id: &ChainId, was_syncing: bool, sync_type: RangeSyncType) { + let _ = metrics::get_int_gauge(&metrics::SYNCING_CHAINS_COUNT, &[sync_type.as_str()]) + .map(|m| m.dec()); + match self.state { RangeSyncState::Finalized(ref syncing_id) => { if syncing_id == id { @@ -136,7 +140,7 @@ impl ChainCollection { RangeSyncType::Head => self.head_chains.remove(&id), }; let chain = chain.expect("Chain exists"); - self.on_chain_removed(&id, chain.is_syncing()); + self.on_chain_removed(&id, chain.is_syncing(), sync_type); results.push((chain, sync_type, reason)); } results @@ -160,7 +164,7 @@ impl ChainCollection { // Search in our finalized chains first if let Err(remove_reason) = func(entry.get_mut()) { let chain = entry.remove(); - self.on_chain_removed(&id, chain.is_syncing()); + self.on_chain_removed(&id, chain.is_syncing(), RangeSyncType::Finalized); Ok((Some((chain, remove_reason)), RangeSyncType::Finalized)) } else { Ok((None, RangeSyncType::Finalized)) @@ -169,7 +173,7 @@ impl ChainCollection { // Search in our head chains next if let Err(remove_reason) = func(entry.get_mut()) { let chain = entry.remove(); - self.on_chain_removed(&id, chain.is_syncing()); + self.on_chain_removed(&id, chain.is_syncing(), RangeSyncType::Head); Ok((Some((chain, remove_reason)), RangeSyncType::Head)) } else { Ok((None, RangeSyncType::Head)) @@ -311,7 +315,7 @@ impl ChainCollection { error!(self.log, "Chain removed while switching chains"; "chain" => new_id, "reason" => ?remove_reason); } self.finalized_chains.remove(&new_id); - 
self.on_chain_removed(&new_id, true); + self.on_chain_removed(&new_id, true, RangeSyncType::Finalized); } } } @@ -424,7 +428,7 @@ impl ChainCollection { || chain.available_peers() == 0 { debug!(log_ref, "Purging out of finalized chain"; &chain); - removed_chains.push((*id, chain.is_syncing())); + removed_chains.push((*id, chain.is_syncing(), RangeSyncType::Finalized)); false } else { true @@ -435,7 +439,7 @@ impl ChainCollection { || chain.available_peers() == 0 { debug!(log_ref, "Purging out of date head chain"; &chain); - removed_chains.push((*id, chain.is_syncing())); + removed_chains.push((*id, chain.is_syncing(), RangeSyncType::Head)); false } else { true @@ -443,8 +447,8 @@ impl ChainCollection { }); // update the state of the collection - for (id, was_syncing) in removed_chains { - self.on_chain_removed(&id, was_syncing); + for (id, was_syncing, sync_type) in removed_chains { + self.on_chain_removed(&id, was_syncing, sync_type); } } @@ -480,7 +484,7 @@ impl ChainCollection { error!(self.log, "Chain removed after adding peer"; "chain" => id, "reason" => ?remove_reason); } let chain = entry.remove(); - self.on_chain_removed(&id, chain.is_syncing()); + self.on_chain_removed(&id, chain.is_syncing(), sync_type); } } Entry::Vacant(entry) => { @@ -496,6 +500,9 @@ impl ChainCollection { debug_assert_eq!(new_chain.get_id(), id); debug!(self.log, "New chain added to sync"; "peer_id" => peer_rpr, "sync_type" => ?sync_type, &new_chain); entry.insert(new_chain); + let _ = + metrics::get_int_gauge(&metrics::SYNCING_CHAINS_COUNT, &[sync_type.as_str()]) + .map(|m| m.inc()); } } } diff --git a/beacon_node/network/src/sync/range_sync/sync_type.rs b/beacon_node/network/src/sync/range_sync/sync_type.rs index c5e847bd5..a73f1735b 100644 --- a/beacon_node/network/src/sync/range_sync/sync_type.rs +++ b/beacon_node/network/src/sync/range_sync/sync_type.rs @@ -6,7 +6,7 @@ use eth2_libp2p::SyncInfo; use std::sync::Arc; /// The type of Range sync that should be done relative to our 
current state. -#[derive(Debug)] +#[derive(Debug, Clone, Copy)] pub enum RangeSyncType { /// A finalized chain sync should be started with this peer. Finalized, @@ -39,4 +39,12 @@ impl RangeSyncType { RangeSyncType::Head } } + + /// Get a `str` representation of the `RangeSyncType`. + pub fn as_str(&self) -> &'static str { + match self { + RangeSyncType::Finalized => "Finalized", + RangeSyncType::Head => "Head", + } + } }