Discovery metrics (#1276)
* Silky smooth squash * Add discovery metrics * Fix gauge metric, increase discv5 sessions * Formatting
This commit is contained in:
parent
e379ad0f4e
commit
f3380c00b8
@ -110,7 +110,7 @@ impl Default for Config {
|
||||
// discv5 configuration
|
||||
let discv5_config = Discv5ConfigBuilder::new()
|
||||
.enable_packet_filter()
|
||||
.session_cache_capacity(100)
|
||||
.session_cache_capacity(1000)
|
||||
.request_timeout(Duration::from_secs(4))
|
||||
.request_retries(2)
|
||||
.enr_peer_update_min(2) // prevents NAT's should be raised for mainnet
|
||||
|
@ -20,6 +20,7 @@ pub use discv5;
|
||||
pub use libp2p::gossipsub::{MessageId, Topic, TopicHash};
|
||||
pub use libp2p::{core::ConnectedPoint, PeerId, Swarm};
|
||||
pub use libp2p::{multiaddr, Multiaddr};
|
||||
pub use metrics::scrape_discovery_metrics;
|
||||
pub use peer_manager::discovery;
|
||||
pub use peer_manager::{
|
||||
client::Client,
|
||||
|
@ -21,8 +21,39 @@ lazy_static! {
|
||||
"discovery_queue_size",
|
||||
"The number of discovery queries awaiting execution"
|
||||
);
|
||||
pub static ref DISCOVERY_REQS: Result<IntGauge> = try_create_int_gauge(
|
||||
pub static ref DISCOVERY_REQS: Result<Gauge> = try_create_float_gauge(
|
||||
"discovery_requests",
|
||||
"The number of unsolicited discovery requests per second"
|
||||
);
|
||||
pub static ref DISCOVERY_SESSIONS: Result<IntGauge> = try_create_int_gauge(
|
||||
"discovery_sessions",
|
||||
"The number of active discovery sessions with peers"
|
||||
);
|
||||
pub static ref DISCOVERY_REQS_IP: Result<GaugeVec> = try_create_float_gauge_vec(
|
||||
"discovery_reqs_per_ip",
|
||||
"Unsolicited discovery requests per ip per second",
|
||||
&["Addresses"]
|
||||
);
|
||||
}
|
||||
|
||||
pub fn scrape_discovery_metrics() {
|
||||
let metrics = discv5::metrics::Metrics::from(discv5::Discv5::raw_metrics());
|
||||
|
||||
set_float_gauge(&DISCOVERY_REQS, metrics.unsolicited_requests_per_second);
|
||||
|
||||
set_gauge(&DISCOVERY_SESSIONS, metrics.active_sessions as i64);
|
||||
|
||||
let process_gauge_vec = |gauge: &Result<GaugeVec>, metrics: discv5::metrics::Metrics| {
|
||||
if let Ok(gauge_vec) = gauge {
|
||||
gauge_vec.reset();
|
||||
for (ip, value) in metrics.requests_per_ip_per_second.iter() {
|
||||
if let Ok(metric) = gauge_vec.get_metric_with_label_values(&[&format!("{:?}", ip)])
|
||||
{
|
||||
metric.set(*value);
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
process_gauge_vec(&DISCOVERY_REQS_IP, metrics);
|
||||
}
|
||||
|
@ -12,9 +12,8 @@ use crate::{error, Enr, NetworkConfig, NetworkGlobals};
|
||||
use discv5::{enr::NodeId, Discv5, Discv5Event};
|
||||
use enr::{BITFIELD_ENR_KEY, ETH2_ENR_KEY};
|
||||
use futures::prelude::*;
|
||||
use libp2p::core::PeerId;
|
||||
// use libp2p::multiaddr::Protocol;
|
||||
use futures::stream::FuturesUnordered;
|
||||
use libp2p::core::PeerId;
|
||||
use lru::LruCache;
|
||||
use slog::{crit, debug, info, trace, warn};
|
||||
use ssz::{Decode, Encode};
|
||||
@ -197,11 +196,11 @@ impl<TSpec: EthSpec> Discovery<TSpec> {
|
||||
});
|
||||
}
|
||||
|
||||
// start the discv5 service.
|
||||
// Start the discv5 service.
|
||||
discv5.start(listen_socket);
|
||||
debug!(log, "Discovery service started");
|
||||
|
||||
// obtain the event stream
|
||||
// Obtain the event stream
|
||||
let event_stream = EventStream::Awaiting(Box::pin(discv5.event_stream()));
|
||||
|
||||
Ok(Self {
|
||||
@ -224,6 +223,10 @@ impl<TSpec: EthSpec> Discovery<TSpec> {
|
||||
|
||||
/// This adds a new `FindPeers` query to the queue if one doesn't already exist.
|
||||
pub fn discover_peers(&mut self) {
|
||||
// If we are in the process of a query, don't bother queuing a new one.
|
||||
if self.find_peer_active {
|
||||
return;
|
||||
}
|
||||
// If there is not already a find peer's query queued, add one
|
||||
let query = QueryType::FindPeers;
|
||||
if !self.queued_queries.contains(&query) {
|
||||
@ -426,6 +429,8 @@ impl<TSpec: EthSpec> Discovery<TSpec> {
|
||||
None => {} // Queue is empty
|
||||
}
|
||||
}
|
||||
// Update the queue metric
|
||||
metrics::set_gauge(&metrics::DISCOVERY_QUEUE, self.queued_queries.len() as i64);
|
||||
}
|
||||
|
||||
// Returns a boolean indicating if we are currently processing the maximum number of
|
||||
|
@ -104,6 +104,7 @@ pub fn get_prometheus<T: BeaconChainTypes>(
|
||||
slot_clock::scrape_for_metrics::<T::EthSpec, T::SlotClock>(&beacon_chain.slot_clock);
|
||||
store::scrape_for_metrics(&db_path, &freezer_db_path);
|
||||
beacon_chain::scrape_for_metrics(&beacon_chain);
|
||||
eth2_libp2p::scrape_discovery_metrics();
|
||||
|
||||
// This will silently fail if we are unable to observe the health. This is desired behaviour
|
||||
// since we don't support `Health` for all platforms.
|
||||
|
@ -57,7 +57,8 @@
|
||||
use prometheus::{HistogramOpts, HistogramTimer, Opts};
|
||||
|
||||
pub use prometheus::{
|
||||
Encoder, Gauge, Histogram, HistogramVec, IntCounter, IntGauge, IntGaugeVec, Result, TextEncoder,
|
||||
Encoder, Gauge, GaugeVec, Histogram, HistogramVec, IntCounter, IntGauge, IntGaugeVec, Result,
|
||||
TextEncoder,
|
||||
};
|
||||
|
||||
/// Collect all the metrics for reporting.
|
||||
@ -127,6 +128,19 @@ pub fn try_create_int_gauge_vec(
|
||||
Ok(counter_vec)
|
||||
}
|
||||
|
||||
/// Attempts to crate a `GaugeVec`, returning `Err` if the registry does not accept the gauge
|
||||
/// (potentially due to naming conflict).
|
||||
pub fn try_create_float_gauge_vec(
|
||||
name: &str,
|
||||
help: &str,
|
||||
label_names: &[&str],
|
||||
) -> Result<GaugeVec> {
|
||||
let opts = Opts::new(name, help);
|
||||
let counter_vec = GaugeVec::new(opts, label_names)?;
|
||||
prometheus::register(Box::new(counter_vec.clone()))?;
|
||||
Ok(counter_vec)
|
||||
}
|
||||
|
||||
pub fn get_int_gauge(int_gauge_vec: &Result<IntGaugeVec>, name: &[&str]) -> Option<IntGauge> {
|
||||
if let Ok(int_gauge_vec) = int_gauge_vec {
|
||||
Some(int_gauge_vec.get_metric_with_label_values(name).ok()?)
|
||||
@ -177,6 +191,12 @@ pub fn set_gauge(gauge: &Result<IntGauge>, value: i64) {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn set_float_gauge(gauge: &Result<Gauge>, value: f64) {
|
||||
if let Ok(gauge) = gauge {
|
||||
gauge.set(value);
|
||||
}
|
||||
}
|
||||
|
||||
pub fn inc_gauge(gauge: &Result<IntGauge>) {
|
||||
if let Ok(gauge) = gauge {
|
||||
gauge.inc();
|
||||
@ -195,12 +215,6 @@ pub fn maybe_set_gauge(gauge: &Result<IntGauge>, value_opt: Option<i64>) {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn set_float_gauge(gauge: &Result<Gauge>, value: f64) {
|
||||
if let Ok(gauge) = gauge {
|
||||
gauge.set(value);
|
||||
}
|
||||
}
|
||||
|
||||
pub fn maybe_set_float_gauge(gauge: &Result<Gauge>, value_opt: Option<f64>) {
|
||||
if let Some(value) = value_opt {
|
||||
set_float_gauge(gauge, value)
|
||||
|
Loading…
Reference in New Issue
Block a user