Add metrics to VC (#1954)
## Issue Addressed NA ## Proposed Changes - Adds a HTTP server to the VC which provides Prometheus metrics. - Moves the health metrics into the `lighthouse_metrics` crate so it can be shared between BN/VC. - Sprinkle some metrics around the VC. - Update the book to indicate that we now have VC metrics. - Shifts the "waiting for genesis" logic later in the `ProductionValidatorClient::new_from_cli` - This is worth attention during the review. ## Additional Info - ~~`clippy` has some new lints that are failing. I'll deal with that in another PR.~~
This commit is contained in:
parent
50558e61f7
commit
26741944b1
4
Cargo.lock
generated
4
Cargo.lock
generated
@ -7382,8 +7382,10 @@ dependencies = [
|
||||
"futures 0.3.8",
|
||||
"hex",
|
||||
"hyper 0.13.9",
|
||||
"lazy_static",
|
||||
"libc",
|
||||
"libsecp256k1",
|
||||
"lighthouse_metrics",
|
||||
"lighthouse_version",
|
||||
"logging",
|
||||
"parking_lot 0.11.1",
|
||||
@ -7535,6 +7537,8 @@ dependencies = [
|
||||
"beacon_chain",
|
||||
"eth2",
|
||||
"headers",
|
||||
"lazy_static",
|
||||
"lighthouse_metrics",
|
||||
"safe_arith",
|
||||
"serde",
|
||||
"state_processing",
|
||||
|
@ -1,10 +1,6 @@
|
||||
//! This crate provides a HTTP server that is solely dedicated to serving the `/metrics` endpoint.
|
||||
//!
|
||||
//! For other endpoints, see the `http_api` crate.
|
||||
|
||||
#[macro_use]
|
||||
extern crate lazy_static;
|
||||
|
||||
mod metrics;
|
||||
|
||||
use beacon_chain::{BeaconChain, BeaconChainTypes};
|
||||
|
@ -1,45 +1,9 @@
|
||||
use crate::Context;
|
||||
use beacon_chain::BeaconChainTypes;
|
||||
use eth2::lighthouse::Health;
|
||||
use lighthouse_metrics::{Encoder, TextEncoder};
|
||||
|
||||
pub use lighthouse_metrics::*;
|
||||
|
||||
lazy_static! {
|
||||
pub static ref PROCESS_NUM_THREADS: Result<IntGauge> = try_create_int_gauge(
|
||||
"process_num_threads",
|
||||
"Number of threads used by the current process"
|
||||
);
|
||||
pub static ref PROCESS_RES_MEM: Result<IntGauge> = try_create_int_gauge(
|
||||
"process_resident_memory_bytes",
|
||||
"Resident memory used by the current process"
|
||||
);
|
||||
pub static ref PROCESS_VIRT_MEM: Result<IntGauge> = try_create_int_gauge(
|
||||
"process_virtual_memory_bytes",
|
||||
"Virtual memory used by the current process"
|
||||
);
|
||||
pub static ref SYSTEM_VIRT_MEM_TOTAL: Result<IntGauge> =
|
||||
try_create_int_gauge("system_virt_mem_total_bytes", "Total system virtual memory");
|
||||
pub static ref SYSTEM_VIRT_MEM_AVAILABLE: Result<IntGauge> = try_create_int_gauge(
|
||||
"system_virt_mem_available_bytes",
|
||||
"Available system virtual memory"
|
||||
);
|
||||
pub static ref SYSTEM_VIRT_MEM_USED: Result<IntGauge> =
|
||||
try_create_int_gauge("system_virt_mem_used_bytes", "Used system virtual memory");
|
||||
pub static ref SYSTEM_VIRT_MEM_FREE: Result<IntGauge> =
|
||||
try_create_int_gauge("system_virt_mem_free_bytes", "Free system virtual memory");
|
||||
pub static ref SYSTEM_VIRT_MEM_PERCENTAGE: Result<Gauge> = try_create_float_gauge(
|
||||
"system_virt_mem_percentage",
|
||||
"Percentage of used virtual memory"
|
||||
);
|
||||
pub static ref SYSTEM_LOADAVG_1: Result<Gauge> =
|
||||
try_create_float_gauge("system_loadavg_1", "Loadavg over 1 minute");
|
||||
pub static ref SYSTEM_LOADAVG_5: Result<Gauge> =
|
||||
try_create_float_gauge("system_loadavg_5", "Loadavg over 5 minutes");
|
||||
pub static ref SYSTEM_LOADAVG_15: Result<Gauge> =
|
||||
try_create_float_gauge("system_loadavg_15", "Loadavg over 15 minutes");
|
||||
}
|
||||
|
||||
pub fn gather_prometheus_metrics<T: BeaconChainTypes>(
|
||||
ctx: &Context<T>,
|
||||
) -> std::result::Result<String, String> {
|
||||
@ -75,27 +39,7 @@ pub fn gather_prometheus_metrics<T: BeaconChainTypes>(
|
||||
|
||||
eth2_libp2p::scrape_discovery_metrics();
|
||||
|
||||
// This will silently fail if we are unable to observe the health. This is desired behaviour
|
||||
// since we don't support `Health` for all platforms.
|
||||
if let Ok(health) = Health::observe() {
|
||||
set_gauge(&PROCESS_NUM_THREADS, health.pid_num_threads as i64);
|
||||
set_gauge(&PROCESS_RES_MEM, health.pid_mem_resident_set_size as i64);
|
||||
set_gauge(&PROCESS_VIRT_MEM, health.pid_mem_virtual_memory_size as i64);
|
||||
set_gauge(&SYSTEM_VIRT_MEM_TOTAL, health.sys_virt_mem_total as i64);
|
||||
set_gauge(
|
||||
&SYSTEM_VIRT_MEM_AVAILABLE,
|
||||
health.sys_virt_mem_available as i64,
|
||||
);
|
||||
set_gauge(&SYSTEM_VIRT_MEM_USED, health.sys_virt_mem_used as i64);
|
||||
set_gauge(&SYSTEM_VIRT_MEM_FREE, health.sys_virt_mem_free as i64);
|
||||
set_float_gauge(
|
||||
&SYSTEM_VIRT_MEM_PERCENTAGE,
|
||||
health.sys_virt_mem_percent as f64,
|
||||
);
|
||||
set_float_gauge(&SYSTEM_LOADAVG_1, health.sys_loadavg_1);
|
||||
set_float_gauge(&SYSTEM_LOADAVG_5, health.sys_loadavg_5);
|
||||
set_float_gauge(&SYSTEM_LOADAVG_15, health.sys_loadavg_15);
|
||||
}
|
||||
warp_utils::metrics::scrape_health_metrics();
|
||||
|
||||
encoder
|
||||
.encode(&lighthouse_metrics::gather(), &mut buffer)
|
||||
|
@ -30,5 +30,21 @@ curl localhost:5054/metrics
|
||||
|
||||
## Validator Client Metrics
|
||||
|
||||
The validator client does not *yet* expose metrics, however this functionality
|
||||
is expected to be implemented in late-September 2020.
|
||||
|
||||
By default, these metrics are disabled but can be enabled with the `--metrics`
|
||||
flag. Use the `--metrics-address`, `--metrics-port` and
|
||||
`--metrics-allow-origin` flags to customize the metrics server.
|
||||
|
||||
### Example
|
||||
|
||||
Start a validator client with the metrics server enabled:
|
||||
|
||||
```bash
|
||||
lighthouse vc --metrics
|
||||
```
|
||||
|
||||
Check to ensure that the metrics are available on the default port:
|
||||
|
||||
```bash
|
||||
curl localhost:5064/metrics
|
||||
```
|
||||
|
@ -16,3 +16,5 @@ safe_arith = { path = "../../consensus/safe_arith" }
|
||||
serde = { version = "1.0.116", features = ["derive"] }
|
||||
tokio = { version = "0.2.22", features = ["sync"] }
|
||||
headers = "0.3.2"
|
||||
lighthouse_metrics = { path = "../lighthouse_metrics" }
|
||||
lazy_static = "1.4.0"
|
||||
|
@ -2,5 +2,6 @@
|
||||
//! Lighthouse project. E.g., the `http_api` and `http_metrics` crates.
|
||||
|
||||
pub mod cors;
|
||||
pub mod metrics;
|
||||
pub mod reject;
|
||||
pub mod task;
|
||||
|
61
common/warp_utils/src/metrics.rs
Normal file
61
common/warp_utils/src/metrics.rs
Normal file
@ -0,0 +1,61 @@
|
||||
use eth2::lighthouse::Health;
|
||||
use lighthouse_metrics::*;
|
||||
|
||||
lazy_static::lazy_static! {
|
||||
pub static ref PROCESS_NUM_THREADS: Result<IntGauge> = try_create_int_gauge(
|
||||
"process_num_threads",
|
||||
"Number of threads used by the current process"
|
||||
);
|
||||
pub static ref PROCESS_RES_MEM: Result<IntGauge> = try_create_int_gauge(
|
||||
"process_resident_memory_bytes",
|
||||
"Resident memory used by the current process"
|
||||
);
|
||||
pub static ref PROCESS_VIRT_MEM: Result<IntGauge> = try_create_int_gauge(
|
||||
"process_virtual_memory_bytes",
|
||||
"Virtual memory used by the current process"
|
||||
);
|
||||
pub static ref SYSTEM_VIRT_MEM_TOTAL: Result<IntGauge> =
|
||||
try_create_int_gauge("system_virt_mem_total_bytes", "Total system virtual memory");
|
||||
pub static ref SYSTEM_VIRT_MEM_AVAILABLE: Result<IntGauge> = try_create_int_gauge(
|
||||
"system_virt_mem_available_bytes",
|
||||
"Available system virtual memory"
|
||||
);
|
||||
pub static ref SYSTEM_VIRT_MEM_USED: Result<IntGauge> =
|
||||
try_create_int_gauge("system_virt_mem_used_bytes", "Used system virtual memory");
|
||||
pub static ref SYSTEM_VIRT_MEM_FREE: Result<IntGauge> =
|
||||
try_create_int_gauge("system_virt_mem_free_bytes", "Free system virtual memory");
|
||||
pub static ref SYSTEM_VIRT_MEM_PERCENTAGE: Result<Gauge> = try_create_float_gauge(
|
||||
"system_virt_mem_percentage",
|
||||
"Percentage of used virtual memory"
|
||||
);
|
||||
pub static ref SYSTEM_LOADAVG_1: Result<Gauge> =
|
||||
try_create_float_gauge("system_loadavg_1", "Loadavg over 1 minute");
|
||||
pub static ref SYSTEM_LOADAVG_5: Result<Gauge> =
|
||||
try_create_float_gauge("system_loadavg_5", "Loadavg over 5 minutes");
|
||||
pub static ref SYSTEM_LOADAVG_15: Result<Gauge> =
|
||||
try_create_float_gauge("system_loadavg_15", "Loadavg over 15 minutes");
|
||||
}
|
||||
|
||||
pub fn scrape_health_metrics() {
|
||||
// This will silently fail if we are unable to observe the health. This is desired behaviour
|
||||
// since we don't support `Health` for all platforms.
|
||||
if let Ok(health) = Health::observe() {
|
||||
set_gauge(&PROCESS_NUM_THREADS, health.pid_num_threads as i64);
|
||||
set_gauge(&PROCESS_RES_MEM, health.pid_mem_resident_set_size as i64);
|
||||
set_gauge(&PROCESS_VIRT_MEM, health.pid_mem_virtual_memory_size as i64);
|
||||
set_gauge(&SYSTEM_VIRT_MEM_TOTAL, health.sys_virt_mem_total as i64);
|
||||
set_gauge(
|
||||
&SYSTEM_VIRT_MEM_AVAILABLE,
|
||||
health.sys_virt_mem_available as i64,
|
||||
);
|
||||
set_gauge(&SYSTEM_VIRT_MEM_USED, health.sys_virt_mem_used as i64);
|
||||
set_gauge(&SYSTEM_VIRT_MEM_FREE, health.sys_virt_mem_free as i64);
|
||||
set_float_gauge(
|
||||
&SYSTEM_VIRT_MEM_PERCENTAGE,
|
||||
health.sys_virt_mem_percent as f64,
|
||||
);
|
||||
set_float_gauge(&SYSTEM_LOADAVG_1, health.sys_loadavg_1);
|
||||
set_float_gauge(&SYSTEM_LOADAVG_5, health.sys_loadavg_5);
|
||||
set_float_gauge(&SYSTEM_LOADAVG_15, health.sys_loadavg_15);
|
||||
}
|
||||
}
|
@ -59,3 +59,5 @@ libsecp256k1 = "0.3.5"
|
||||
ring = "0.16.12"
|
||||
rand = "0.7.3"
|
||||
scrypt = { version = "0.3.1", default-features = false }
|
||||
lighthouse_metrics = { path = "../common/lighthouse_metrics" }
|
||||
lazy_static = "1.4.0"
|
||||
|
@ -1,5 +1,6 @@
|
||||
use crate::{
|
||||
duties_service::{DutiesService, DutyAndProof},
|
||||
http_metrics::metrics,
|
||||
validator_store::ValidatorStore,
|
||||
};
|
||||
use environment::RuntimeContext;
|
||||
@ -240,6 +241,10 @@ impl<T: SlotClock + 'static, E: EthSpec> AttestationService<T, E> {
|
||||
aggregate_production_instant: Instant,
|
||||
) -> Result<(), ()> {
|
||||
let log = self.context.log();
|
||||
let attestations_timer = metrics::start_timer_vec(
|
||||
&metrics::ATTESTATION_SERVICE_TIMES,
|
||||
&[metrics::ATTESTATIONS],
|
||||
);
|
||||
|
||||
// There's not need to produce `Attestation` or `SignedAggregateAndProof` if we do not have
|
||||
// any validators for the given `slot` and `committee_index`.
|
||||
@ -263,6 +268,8 @@ impl<T: SlotClock + 'static, E: EthSpec> AttestationService<T, E> {
|
||||
)
|
||||
})?;
|
||||
|
||||
drop(attestations_timer);
|
||||
|
||||
// Step 2.
|
||||
//
|
||||
// If an attestation was produced, make an aggregate.
|
||||
@ -273,6 +280,12 @@ impl<T: SlotClock + 'static, E: EthSpec> AttestationService<T, E> {
|
||||
// even if the instant has already elapsed.
|
||||
delay_until(aggregate_production_instant).await;
|
||||
|
||||
// Start the metrics timer *after* we've done the delay.
|
||||
let _aggregates_timer = metrics::start_timer_vec(
|
||||
&metrics::ATTESTATION_SERVICE_TIMES,
|
||||
&[metrics::AGGREGATES],
|
||||
);
|
||||
|
||||
// Then download, sign and publish a `SignedAggregateAndProof` for each
|
||||
// validator that is elected to aggregate for this `slot` and
|
||||
// `committee_index`.
|
||||
|
@ -1,4 +1,4 @@
|
||||
use crate::validator_store::ValidatorStore;
|
||||
use crate::{http_metrics::metrics, validator_store::ValidatorStore};
|
||||
use environment::RuntimeContext;
|
||||
use eth2::{types::Graffiti, BeaconNodeHttpClient};
|
||||
use futures::channel::mpsc::Receiver;
|
||||
@ -137,6 +137,8 @@ impl<T: SlotClock + 'static, E: EthSpec> BlockService<T, E> {
|
||||
/// Attempt to produce a block for any block producers in the `ValidatorStore`.
|
||||
async fn do_update(&self, notification: BlockServiceNotification) -> Result<(), ()> {
|
||||
let log = self.context.log();
|
||||
let _timer =
|
||||
metrics::start_timer_vec(&metrics::BLOCK_SERVICE_TIMES, &[metrics::FULL_UPDATE]);
|
||||
|
||||
let slot = self.slot_clock.now().ok_or_else(move || {
|
||||
crit!(log, "Duties manager failed to read slot clock");
|
||||
@ -208,6 +210,8 @@ impl<T: SlotClock + 'static, E: EthSpec> BlockService<T, E> {
|
||||
/// Produce a block at the given slot for validator_pubkey
|
||||
async fn publish_block(self, slot: Slot, validator_pubkey: PublicKey) -> Result<(), String> {
|
||||
let log = self.context.log();
|
||||
let _timer =
|
||||
metrics::start_timer_vec(&metrics::BLOCK_SERVICE_TIMES, &[metrics::BEACON_BLOCK]);
|
||||
|
||||
let current_slot = self
|
||||
.slot_clock
|
||||
|
@ -138,4 +138,37 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> {
|
||||
address of this server (e.g., http://localhost:5062).")
|
||||
.takes_value(true),
|
||||
)
|
||||
/* Prometheus metrics HTTP server related arguments */
|
||||
.arg(
|
||||
Arg::with_name("metrics")
|
||||
.long("metrics")
|
||||
.help("Enable the Prometheus metrics HTTP server. Disabled by default.")
|
||||
.takes_value(false),
|
||||
)
|
||||
.arg(
|
||||
Arg::with_name("metrics-address")
|
||||
.long("metrics-address")
|
||||
.value_name("ADDRESS")
|
||||
.help("Set the listen address for the Prometheus metrics HTTP server.")
|
||||
.default_value("127.0.0.1")
|
||||
.takes_value(true),
|
||||
)
|
||||
.arg(
|
||||
Arg::with_name("metrics-port")
|
||||
.long("metrics-port")
|
||||
.value_name("PORT")
|
||||
.help("Set the listen TCP port for the Prometheus metrics HTTP server.")
|
||||
.default_value("5064")
|
||||
.takes_value(true),
|
||||
)
|
||||
.arg(
|
||||
Arg::with_name("metrics-allow-origin")
|
||||
.long("metrics-allow-origin")
|
||||
.value_name("ORIGIN")
|
||||
.help("Set the value of the Access-Control-Allow-Origin response HTTP header. \
|
||||
Use * to allow any origin (not recommended in production). \
|
||||
If no value is supplied, the CORS allowed origin is set to the listen \
|
||||
address of this server (e.g., http://localhost:5064).")
|
||||
.takes_value(true),
|
||||
)
|
||||
}
|
||||
|
@ -1,4 +1,4 @@
|
||||
use crate::http_api;
|
||||
use crate::{http_api, http_metrics};
|
||||
use clap::ArgMatches;
|
||||
use clap_utils::{parse_optional, parse_required};
|
||||
use directory::{
|
||||
@ -9,6 +9,7 @@ use eth2::types::Graffiti;
|
||||
use serde_derive::{Deserialize, Serialize};
|
||||
use slog::{warn, Logger};
|
||||
use std::fs;
|
||||
use std::net::Ipv4Addr;
|
||||
use std::path::PathBuf;
|
||||
use types::GRAFFITI_BYTES_LEN;
|
||||
|
||||
@ -38,6 +39,8 @@ pub struct Config {
|
||||
pub graffiti: Option<Graffiti>,
|
||||
/// Configuration for the HTTP REST API.
|
||||
pub http_api: http_api::Config,
|
||||
/// Configuration for the HTTP REST API.
|
||||
pub http_metrics: http_metrics::Config,
|
||||
}
|
||||
|
||||
impl Default for Config {
|
||||
@ -61,6 +64,7 @@ impl Default for Config {
|
||||
init_slashing_protection: false,
|
||||
graffiti: None,
|
||||
http_api: <_>::default(),
|
||||
http_metrics: <_>::default(),
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -166,6 +170,35 @@ impl Config {
|
||||
config.http_api.allow_origin = Some(allow_origin.to_string());
|
||||
}
|
||||
|
||||
/*
|
||||
* Prometheus metrics HTTP server
|
||||
*/
|
||||
|
||||
if cli_args.is_present("metrics") {
|
||||
config.http_metrics.enabled = true;
|
||||
}
|
||||
|
||||
if let Some(address) = cli_args.value_of("metrics-address") {
|
||||
config.http_metrics.listen_addr = address
|
||||
.parse::<Ipv4Addr>()
|
||||
.map_err(|_| "metrics-address is not a valid IPv4 address.")?;
|
||||
}
|
||||
|
||||
if let Some(port) = cli_args.value_of("metrics-port") {
|
||||
config.http_metrics.listen_port = port
|
||||
.parse::<u16>()
|
||||
.map_err(|_| "metrics-port is not a valid u16.")?;
|
||||
}
|
||||
|
||||
if let Some(allow_origin) = cli_args.value_of("metrics-allow-origin") {
|
||||
// Pre-validate the config value to give feedback to the user on node startup, instead of
|
||||
// as late as when the first API response is produced.
|
||||
hyper::header::HeaderValue::from_str(allow_origin)
|
||||
.map_err(|_| "Invalid allow-origin value")?;
|
||||
|
||||
config.http_metrics.allow_origin = Some(allow_origin.to_string());
|
||||
}
|
||||
|
||||
Ok(config)
|
||||
}
|
||||
}
|
||||
|
@ -1,6 +1,6 @@
|
||||
use crate::{
|
||||
block_service::BlockServiceNotification, is_synced::is_synced, validator_duty::ValidatorDuty,
|
||||
validator_store::ValidatorStore,
|
||||
block_service::BlockServiceNotification, http_metrics::metrics, is_synced::is_synced,
|
||||
validator_duty::ValidatorDuty, validator_store::ValidatorStore,
|
||||
};
|
||||
use environment::RuntimeContext;
|
||||
use eth2::BeaconNodeHttpClient;
|
||||
@ -511,6 +511,8 @@ impl<T: SlotClock + 'static, E: EthSpec> DutiesService<T, E> {
|
||||
spec: &ChainSpec,
|
||||
) {
|
||||
let log = self.context.log();
|
||||
let _timer =
|
||||
metrics::start_timer_vec(&metrics::DUTIES_SERVICE_TIMES, &[metrics::FULL_UPDATE]);
|
||||
|
||||
if !is_synced(&self.beacon_node, &self.slot_clock, None).await
|
||||
&& !self.allow_unsynced_beacon_node
|
||||
|
@ -1,3 +1,4 @@
|
||||
use crate::http_metrics::metrics;
|
||||
use environment::RuntimeContext;
|
||||
use eth2::{types::StateId, BeaconNodeHttpClient};
|
||||
use futures::StreamExt;
|
||||
@ -161,6 +162,9 @@ impl<T: SlotClock + 'static> ForkService<T> {
|
||||
|
||||
/// Attempts to download the `Fork` from the server.
|
||||
async fn do_update(self) -> Result<(), ()> {
|
||||
let _timer =
|
||||
metrics::start_timer_vec(&metrics::FORK_SERVICE_TIMES, &[metrics::FULL_UPDATE]);
|
||||
|
||||
let fork = self
|
||||
.inner
|
||||
.beacon_node
|
||||
|
146
validator_client/src/http_metrics/metrics.rs
Normal file
146
validator_client/src/http_metrics/metrics.rs
Normal file
@ -0,0 +1,146 @@
|
||||
use super::Context;
|
||||
use slot_clock::SlotClock;
|
||||
use std::time::{SystemTime, UNIX_EPOCH};
|
||||
use types::EthSpec;
|
||||
|
||||
pub const SUCCESS: &str = "success";
|
||||
pub const SLASHABLE: &str = "slashable";
|
||||
pub const SAME_DATA: &str = "same_data";
|
||||
pub const UNREGISTERED: &str = "unregistered";
|
||||
pub const FULL_UPDATE: &str = "full_update";
|
||||
pub const BEACON_BLOCK: &str = "beacon_block";
|
||||
pub const ATTESTATIONS: &str = "attestations";
|
||||
pub const AGGREGATES: &str = "aggregates";
|
||||
pub const CURRENT_EPOCH: &str = "current_epoch";
|
||||
pub const NEXT_EPOCH: &str = "next_epoch";
|
||||
|
||||
pub use lighthouse_metrics::*;
|
||||
|
||||
lazy_static::lazy_static! {
|
||||
pub static ref GENESIS_DISTANCE: Result<IntGauge> = try_create_int_gauge(
|
||||
"vc_genesis_distance_seconds",
|
||||
"Distance between now and genesis time"
|
||||
);
|
||||
pub static ref ENABLED_VALIDATORS_COUNT: Result<IntGauge> = try_create_int_gauge(
|
||||
"vc_validators_enabled_count",
|
||||
"Number of enabled validators"
|
||||
);
|
||||
pub static ref TOTAL_VALIDATORS_COUNT: Result<IntGauge> = try_create_int_gauge(
|
||||
"vc_validators_total_count",
|
||||
"Number of total validators (enabled and disabled)"
|
||||
);
|
||||
|
||||
pub static ref SIGNED_BLOCKS_TOTAL: Result<IntCounterVec> = try_create_int_counter_vec(
|
||||
"vc_signed_beacon_blocks_total",
|
||||
"Total count of attempted block signings",
|
||||
&["status"]
|
||||
);
|
||||
pub static ref SIGNED_ATTESTATIONS_TOTAL: Result<IntCounterVec> = try_create_int_counter_vec(
|
||||
"vc_signed_attestations_total",
|
||||
"Total count of attempted Attestation signings",
|
||||
&["status"]
|
||||
);
|
||||
pub static ref SIGNED_AGGREGATES_TOTAL: Result<IntCounterVec> = try_create_int_counter_vec(
|
||||
"vc_signed_aggregates_total",
|
||||
"Total count of attempted SignedAggregateAndProof signings",
|
||||
&["status"]
|
||||
);
|
||||
pub static ref SIGNED_SELECTION_PROOFS_TOTAL: Result<IntCounterVec> = try_create_int_counter_vec(
|
||||
"vc_signed_selection_proofs_total",
|
||||
"Total count of attempted SelectionProof signings",
|
||||
&["status"]
|
||||
);
|
||||
pub static ref DUTIES_SERVICE_TIMES: Result<HistogramVec> = try_create_histogram_vec(
|
||||
"vc_duties_service_task_times_seconds",
|
||||
"Duration to perform duties service tasks",
|
||||
&["task"]
|
||||
);
|
||||
pub static ref FORK_SERVICE_TIMES: Result<HistogramVec> = try_create_histogram_vec(
|
||||
"vc_fork_service_task_times_seconds",
|
||||
"Duration to perform fork service tasks",
|
||||
&["task"]
|
||||
);
|
||||
pub static ref ATTESTATION_SERVICE_TIMES: Result<HistogramVec> = try_create_histogram_vec(
|
||||
"vc_attestation_service_task_times_seconds",
|
||||
"Duration to perform attestation service tasks",
|
||||
&["task"]
|
||||
);
|
||||
pub static ref BLOCK_SERVICE_TIMES: Result<HistogramVec> = try_create_histogram_vec(
|
||||
"vc_beacon_block_service_task_times_seconds",
|
||||
"Duration to perform beacon block service tasks",
|
||||
&["task"]
|
||||
);
|
||||
pub static ref PROPOSER_COUNT: Result<IntGaugeVec> = try_create_int_gauge_vec(
|
||||
"vc_beacon_block_proposer_count",
|
||||
"Number of beacon block proposers on this host",
|
||||
&["task"]
|
||||
);
|
||||
pub static ref ATTESTER_COUNT: Result<IntGaugeVec> = try_create_int_gauge_vec(
|
||||
"vc_beacon_attester_count",
|
||||
"Number of attesters on this host",
|
||||
&["task"]
|
||||
);
|
||||
}
|
||||
|
||||
pub fn gather_prometheus_metrics<T: EthSpec>(
|
||||
ctx: &Context<T>,
|
||||
) -> std::result::Result<String, String> {
|
||||
let mut buffer = vec![];
|
||||
let encoder = TextEncoder::new();
|
||||
|
||||
{
|
||||
let shared = ctx.shared.read();
|
||||
|
||||
if let Some(genesis_time) = shared.genesis_time {
|
||||
if let Ok(now) = SystemTime::now().duration_since(UNIX_EPOCH) {
|
||||
let distance = now.as_secs() as i64 - genesis_time as i64;
|
||||
set_gauge(&GENESIS_DISTANCE, distance);
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(validator_store) = &shared.validator_store {
|
||||
let initialized_validators_lock = validator_store.initialized_validators();
|
||||
let initialized_validators = initialized_validators_lock.read();
|
||||
|
||||
set_gauge(
|
||||
&ENABLED_VALIDATORS_COUNT,
|
||||
initialized_validators.num_enabled() as i64,
|
||||
);
|
||||
set_gauge(
|
||||
&TOTAL_VALIDATORS_COUNT,
|
||||
initialized_validators.num_total() as i64,
|
||||
);
|
||||
}
|
||||
|
||||
if let Some(duties_service) = &shared.duties_service {
|
||||
if let Some(slot) = duties_service.slot_clock.now() {
|
||||
let current_epoch = slot.epoch(T::slots_per_epoch());
|
||||
let next_epoch = current_epoch + 1;
|
||||
|
||||
set_int_gauge(
|
||||
&PROPOSER_COUNT,
|
||||
&[CURRENT_EPOCH],
|
||||
duties_service.proposer_count(current_epoch) as i64,
|
||||
);
|
||||
set_int_gauge(
|
||||
&ATTESTER_COUNT,
|
||||
&[CURRENT_EPOCH],
|
||||
duties_service.attester_count(current_epoch) as i64,
|
||||
);
|
||||
set_int_gauge(
|
||||
&ATTESTER_COUNT,
|
||||
&[NEXT_EPOCH],
|
||||
duties_service.attester_count(next_epoch) as i64,
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
warp_utils::metrics::scrape_health_metrics();
|
||||
|
||||
encoder
|
||||
.encode(&lighthouse_metrics::gather(), &mut buffer)
|
||||
.unwrap();
|
||||
|
||||
String::from_utf8(buffer).map_err(|e| format!("Failed to encode prometheus info: {:?}", e))
|
||||
}
|
149
validator_client/src/http_metrics/mod.rs
Normal file
149
validator_client/src/http_metrics/mod.rs
Normal file
@ -0,0 +1,149 @@
|
||||
//! This crate provides a HTTP server that is solely dedicated to serving the `/metrics` endpoint.
|
||||
//!
|
||||
//! For other endpoints, see the `http_api` crate.
|
||||
pub mod metrics;
|
||||
|
||||
use crate::{DutiesService, ValidatorStore};
|
||||
use lighthouse_version::version_with_platform;
|
||||
use parking_lot::RwLock;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use slog::{crit, info, Logger};
|
||||
use slot_clock::SystemTimeSlotClock;
|
||||
use std::future::Future;
|
||||
use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};
|
||||
use std::sync::Arc;
|
||||
use types::EthSpec;
|
||||
use warp::{http::Response, Filter};
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum Error {
|
||||
Warp(warp::Error),
|
||||
Other(String),
|
||||
}
|
||||
|
||||
impl From<warp::Error> for Error {
|
||||
fn from(e: warp::Error) -> Self {
|
||||
Error::Warp(e)
|
||||
}
|
||||
}
|
||||
|
||||
impl From<String> for Error {
|
||||
fn from(e: String) -> Self {
|
||||
Error::Other(e)
|
||||
}
|
||||
}
|
||||
|
||||
/// Contains objects which have shared access from inside/outside of the metrics server.
|
||||
pub struct Shared<T: EthSpec> {
|
||||
pub validator_store: Option<ValidatorStore<SystemTimeSlotClock, T>>,
|
||||
pub duties_service: Option<DutiesService<SystemTimeSlotClock, T>>,
|
||||
pub genesis_time: Option<u64>,
|
||||
}
|
||||
|
||||
/// A wrapper around all the items required to spawn the HTTP server.
|
||||
///
|
||||
/// The server will gracefully handle the case where any fields are `None`.
|
||||
pub struct Context<T: EthSpec> {
|
||||
pub config: Config,
|
||||
pub shared: RwLock<Shared<T>>,
|
||||
pub log: Logger,
|
||||
}
|
||||
|
||||
/// Configuration for the HTTP server.
|
||||
#[derive(PartialEq, Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct Config {
|
||||
pub enabled: bool,
|
||||
pub listen_addr: Ipv4Addr,
|
||||
pub listen_port: u16,
|
||||
pub allow_origin: Option<String>,
|
||||
}
|
||||
|
||||
impl Default for Config {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
enabled: false,
|
||||
listen_addr: Ipv4Addr::new(127, 0, 0, 1),
|
||||
listen_port: 5064,
|
||||
allow_origin: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Creates a server that will serve requests using information from `ctx`.
|
||||
///
|
||||
/// The server will shut down gracefully when the `shutdown` future resolves.
|
||||
///
|
||||
/// ## Returns
|
||||
///
|
||||
/// This function will bind the server to the provided address and then return a tuple of:
|
||||
///
|
||||
/// - `SocketAddr`: the address that the HTTP server will listen on.
|
||||
/// - `Future`: the actual server future that will need to be awaited.
|
||||
///
|
||||
/// ## Errors
|
||||
///
|
||||
/// Returns an error if the server is unable to bind or there is another error during
|
||||
/// configuration.
|
||||
pub fn serve<T: EthSpec>(
|
||||
ctx: Arc<Context<T>>,
|
||||
shutdown: impl Future<Output = ()> + Send + Sync + 'static,
|
||||
) -> Result<(SocketAddr, impl Future<Output = ()>), Error> {
|
||||
let config = &ctx.config;
|
||||
let log = ctx.log.clone();
|
||||
|
||||
// Configure CORS.
|
||||
let cors_builder = {
|
||||
let builder = warp::cors()
|
||||
.allow_method("GET")
|
||||
.allow_headers(vec!["Content-Type"]);
|
||||
|
||||
warp_utils::cors::set_builder_origins(
|
||||
builder,
|
||||
config.allow_origin.as_deref(),
|
||||
(config.listen_addr, config.listen_port),
|
||||
)?
|
||||
};
|
||||
|
||||
// Sanity check.
|
||||
if !config.enabled {
|
||||
crit!(log, "Cannot start disabled metrics HTTP server");
|
||||
return Err(Error::Other(
|
||||
"A disabled metrics server should not be started".to_string(),
|
||||
));
|
||||
}
|
||||
|
||||
let inner_ctx = ctx.clone();
|
||||
let routes = warp::get()
|
||||
.and(warp::path("metrics"))
|
||||
.map(move || inner_ctx.clone())
|
||||
.and_then(|ctx: Arc<Context<T>>| async move {
|
||||
Ok::<_, warp::Rejection>(
|
||||
metrics::gather_prometheus_metrics(&ctx)
|
||||
.map(|body| Response::builder().status(200).body(body).unwrap())
|
||||
.unwrap_or_else(|e| {
|
||||
Response::builder()
|
||||
.status(500)
|
||||
.body(format!("Unable to gather metrics: {:?}", e))
|
||||
.unwrap()
|
||||
}),
|
||||
)
|
||||
})
|
||||
// Add a `Server` header.
|
||||
.map(|reply| warp::reply::with_header(reply, "Server", &version_with_platform()))
|
||||
.with(cors_builder.build());
|
||||
|
||||
let (listening_socket, server) = warp::serve(routes).try_bind_with_graceful_shutdown(
|
||||
SocketAddrV4::new(config.listen_addr, config.listen_port),
|
||||
async {
|
||||
shutdown.await;
|
||||
},
|
||||
)?;
|
||||
|
||||
info!(
|
||||
log,
|
||||
"Metrics HTTP server started";
|
||||
"listen_address" => listening_socket.to_string(),
|
||||
);
|
||||
|
||||
Ok((listening_socket, server))
|
||||
}
|
@ -4,6 +4,7 @@ mod cli;
|
||||
mod config;
|
||||
mod duties_service;
|
||||
mod fork_service;
|
||||
mod http_metrics;
|
||||
mod initialized_validators;
|
||||
mod is_synced;
|
||||
mod key_cache;
|
||||
@ -28,6 +29,7 @@ use futures::channel::mpsc;
|
||||
use http_api::ApiSecret;
|
||||
use initialized_validators::InitializedValidators;
|
||||
use notifier::spawn_notifier;
|
||||
use parking_lot::RwLock;
|
||||
use slashing_protection::{SlashingDatabase, SLASHING_PROTECTION_FILENAME};
|
||||
use slog::{error, info, warn, Logger};
|
||||
use slot_clock::SlotClock;
|
||||
@ -49,6 +51,7 @@ const WAITING_FOR_GENESIS_POLL_TIME: Duration = Duration::from_secs(12);
|
||||
/// The global timeout for HTTP requests to the beacon node.
|
||||
const HTTP_TIMEOUT: Duration = Duration::from_secs(12);
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct ProductionValidatorClient<T: EthSpec> {
|
||||
context: RuntimeContext<T>,
|
||||
duties_service: DutiesService<SystemTimeSlotClock, T>,
|
||||
@ -57,6 +60,7 @@ pub struct ProductionValidatorClient<T: EthSpec> {
|
||||
attestation_service: AttestationService<SystemTimeSlotClock, T>,
|
||||
validator_store: ValidatorStore<SystemTimeSlotClock, T>,
|
||||
http_api_listen_addr: Option<SocketAddr>,
|
||||
http_metrics_ctx: Option<Arc<http_metrics::Context<T>>>,
|
||||
config: Config,
|
||||
}
|
||||
|
||||
@ -84,6 +88,36 @@ impl<T: EthSpec> ProductionValidatorClient<T> {
|
||||
"validator_dir" => format!("{:?}", config.validator_dir),
|
||||
);
|
||||
|
||||
// Optionally start the metrics server.
|
||||
let http_metrics_ctx = if config.http_metrics.enabled {
|
||||
let shared = http_metrics::Shared {
|
||||
validator_store: None,
|
||||
genesis_time: None,
|
||||
duties_service: None,
|
||||
};
|
||||
|
||||
let ctx: Arc<http_metrics::Context<T>> = Arc::new(http_metrics::Context {
|
||||
config: config.http_metrics.clone(),
|
||||
shared: RwLock::new(shared),
|
||||
log: log.clone(),
|
||||
});
|
||||
|
||||
let exit = context.executor.exit();
|
||||
|
||||
let (_listen_addr, server) = http_metrics::serve(ctx.clone(), exit)
|
||||
.map_err(|e| format!("Unable to start metrics API server: {:?}", e))?;
|
||||
|
||||
context
|
||||
.clone()
|
||||
.executor
|
||||
.spawn_without_exit(async move { server.await }, "metrics-api");
|
||||
|
||||
Some(ctx)
|
||||
} else {
|
||||
info!(log, "HTTP metrics server is disabled");
|
||||
None
|
||||
};
|
||||
|
||||
let mut validator_defs = ValidatorDefinitions::open_or_create(&config.validator_dir)
|
||||
.map_err(|e| format!("Unable to open or create validator definitions: {:?}", e))?;
|
||||
|
||||
@ -186,6 +220,11 @@ impl<T: EthSpec> ProductionValidatorClient<T> {
|
||||
() = context.executor.exit() => return Err("Shutting down".to_string())
|
||||
};
|
||||
|
||||
// Update the metrics server.
|
||||
if let Some(ctx) = &http_metrics_ctx {
|
||||
ctx.shared.write().genesis_time = Some(genesis_time);
|
||||
}
|
||||
|
||||
let slot_clock = SystemTimeSlotClock::new(
|
||||
context.eth2_config.spec.genesis_slot,
|
||||
Duration::from_secs(genesis_time),
|
||||
@ -221,6 +260,12 @@ impl<T: EthSpec> ProductionValidatorClient<T> {
|
||||
.allow_unsynced_beacon_node(config.allow_unsynced_beacon_node)
|
||||
.build()?;
|
||||
|
||||
// Update the metrics server.
|
||||
if let Some(ctx) = &http_metrics_ctx {
|
||||
ctx.shared.write().validator_store = Some(validator_store.clone());
|
||||
ctx.shared.write().duties_service = Some(duties_service.clone());
|
||||
}
|
||||
|
||||
let block_service = BlockServiceBuilder::new()
|
||||
.slot_clock(slot_clock.clone())
|
||||
.validator_store(validator_store.clone())
|
||||
@ -233,10 +278,16 @@ impl<T: EthSpec> ProductionValidatorClient<T> {
|
||||
.duties_service(duties_service.clone())
|
||||
.slot_clock(slot_clock)
|
||||
.validator_store(validator_store.clone())
|
||||
.beacon_node(beacon_node)
|
||||
.beacon_node(beacon_node.clone())
|
||||
.runtime_context(context.service_context("attestation".into()))
|
||||
.build()?;
|
||||
|
||||
// Wait until genesis has occured.
|
||||
//
|
||||
// It seems most sensible to move this into the `start_service` function, but I'm caution
|
||||
// of making too many changes this close to genesis (<1 week).
|
||||
wait_for_genesis(&beacon_node, genesis_time, &context).await?;
|
||||
|
||||
Ok(Self {
|
||||
context,
|
||||
duties_service,
|
||||
@ -246,6 +297,7 @@ impl<T: EthSpec> ProductionValidatorClient<T> {
|
||||
validator_store,
|
||||
config,
|
||||
http_api_listen_addr: None,
|
||||
http_metrics_ctx,
|
||||
})
|
||||
}
|
||||
|
||||
@ -320,7 +372,7 @@ async fn init_from_beacon_node<E: EthSpec>(
|
||||
context: &RuntimeContext<E>,
|
||||
) -> Result<(u64, Hash256), String> {
|
||||
// Wait for the beacon node to come online.
|
||||
wait_for_node(beacon_node, context.log()).await?;
|
||||
wait_for_connectivity(beacon_node, context.log()).await?;
|
||||
|
||||
let yaml_config = beacon_node
|
||||
.get_config_spec()
|
||||
@ -367,10 +419,18 @@ async fn init_from_beacon_node<E: EthSpec>(
|
||||
delay_for(RETRY_DELAY).await;
|
||||
};
|
||||
|
||||
Ok((genesis.genesis_time, genesis.genesis_validators_root))
|
||||
}
|
||||
|
||||
async fn wait_for_genesis<E: EthSpec>(
|
||||
beacon_node: &BeaconNodeHttpClient,
|
||||
genesis_time: u64,
|
||||
context: &RuntimeContext<E>,
|
||||
) -> Result<(), String> {
|
||||
let now = SystemTime::now()
|
||||
.duration_since(UNIX_EPOCH)
|
||||
.map_err(|e| format!("Unable to read system time: {:?}", e))?;
|
||||
let genesis_time = Duration::from_secs(genesis.genesis_time);
|
||||
let genesis_time = Duration::from_secs(genesis_time);
|
||||
|
||||
// If the time now is less than (prior to) genesis, then delay until the
|
||||
// genesis instant.
|
||||
@ -404,12 +464,15 @@ async fn init_from_beacon_node<E: EthSpec>(
|
||||
);
|
||||
}
|
||||
|
||||
Ok((genesis.genesis_time, genesis.genesis_validators_root))
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Request the version from the node, looping back and trying again on failure. Exit once the node
|
||||
/// has been contacted.
|
||||
async fn wait_for_node(beacon_node: &BeaconNodeHttpClient, log: &Logger) -> Result<(), String> {
|
||||
async fn wait_for_connectivity(
|
||||
beacon_node: &BeaconNodeHttpClient,
|
||||
log: &Logger,
|
||||
) -> Result<(), String> {
|
||||
// Try to get the version string from the node, looping until success is returned.
|
||||
loop {
|
||||
let log = log.clone();
|
||||
|
@ -1,4 +1,6 @@
|
||||
use crate::{fork_service::ForkService, initialized_validators::InitializedValidators};
|
||||
use crate::{
|
||||
fork_service::ForkService, http_metrics::metrics, initialized_validators::InitializedValidators,
|
||||
};
|
||||
use account_utils::{validator_definitions::ValidatorDefinition, ZeroizeString};
|
||||
use parking_lot::RwLock;
|
||||
use slashing_protection::{NotSafe, Safe, SlashingDatabase};
|
||||
@ -186,6 +188,8 @@ impl<T: SlotClock + 'static, E: EthSpec> ValidatorStore<T, E> {
|
||||
let validators = self.validators.read();
|
||||
let voting_keypair = validators.voting_keypair(validator_pubkey)?;
|
||||
|
||||
metrics::inc_counter_vec(&metrics::SIGNED_BLOCKS_TOTAL, &[metrics::SUCCESS]);
|
||||
|
||||
Some(block.sign(
|
||||
&voting_keypair.sk,
|
||||
&fork,
|
||||
@ -198,6 +202,7 @@ impl<T: SlotClock + 'static, E: EthSpec> ValidatorStore<T, E> {
|
||||
self.log,
|
||||
"Skipping signing of previously signed block";
|
||||
);
|
||||
metrics::inc_counter_vec(&metrics::SIGNED_BLOCKS_TOTAL, &[metrics::SAME_DATA]);
|
||||
None
|
||||
}
|
||||
Err(NotSafe::UnregisteredValidator(pk)) => {
|
||||
@ -207,6 +212,7 @@ impl<T: SlotClock + 'static, E: EthSpec> ValidatorStore<T, E> {
|
||||
"msg" => "Carefully consider running with --init-slashing-protection (see --help)",
|
||||
"public_key" => format!("{:?}", pk)
|
||||
);
|
||||
metrics::inc_counter_vec(&metrics::SIGNED_BLOCKS_TOTAL, &[metrics::UNREGISTERED]);
|
||||
None
|
||||
}
|
||||
Err(e) => {
|
||||
@ -215,6 +221,7 @@ impl<T: SlotClock + 'static, E: EthSpec> ValidatorStore<T, E> {
|
||||
"Not signing slashable block";
|
||||
"error" => format!("{:?}", e)
|
||||
);
|
||||
metrics::inc_counter_vec(&metrics::SIGNED_BLOCKS_TOTAL, &[metrics::SLASHABLE]);
|
||||
None
|
||||
}
|
||||
}
|
||||
@ -270,6 +277,8 @@ impl<T: SlotClock + 'static, E: EthSpec> ValidatorStore<T, E> {
|
||||
})
|
||||
.ok()?;
|
||||
|
||||
metrics::inc_counter_vec(&metrics::SIGNED_ATTESTATIONS_TOTAL, &[metrics::SUCCESS]);
|
||||
|
||||
Some(())
|
||||
}
|
||||
Ok(Safe::SameData) => {
|
||||
@ -277,6 +286,10 @@ impl<T: SlotClock + 'static, E: EthSpec> ValidatorStore<T, E> {
|
||||
self.log,
|
||||
"Skipping signing of previously signed attestation"
|
||||
);
|
||||
metrics::inc_counter_vec(
|
||||
&metrics::SIGNED_ATTESTATIONS_TOTAL,
|
||||
&[metrics::SAME_DATA],
|
||||
);
|
||||
None
|
||||
}
|
||||
Err(NotSafe::UnregisteredValidator(pk)) => {
|
||||
@ -286,6 +299,10 @@ impl<T: SlotClock + 'static, E: EthSpec> ValidatorStore<T, E> {
|
||||
"msg" => "Carefully consider running with --init-slashing-protection (see --help)",
|
||||
"public_key" => format!("{:?}", pk)
|
||||
);
|
||||
metrics::inc_counter_vec(
|
||||
&metrics::SIGNED_ATTESTATIONS_TOTAL,
|
||||
&[metrics::UNREGISTERED],
|
||||
);
|
||||
None
|
||||
}
|
||||
Err(e) => {
|
||||
@ -295,6 +312,10 @@ impl<T: SlotClock + 'static, E: EthSpec> ValidatorStore<T, E> {
|
||||
"attestation" => format!("{:?}", attestation.data),
|
||||
"error" => format!("{:?}", e)
|
||||
);
|
||||
metrics::inc_counter_vec(
|
||||
&metrics::SIGNED_ATTESTATIONS_TOTAL,
|
||||
&[metrics::SLASHABLE],
|
||||
);
|
||||
None
|
||||
}
|
||||
}
|
||||
@ -314,6 +335,8 @@ impl<T: SlotClock + 'static, E: EthSpec> ValidatorStore<T, E> {
|
||||
let validators = self.validators.read();
|
||||
let voting_keypair = &validators.voting_keypair(validator_pubkey)?;
|
||||
|
||||
metrics::inc_counter_vec(&metrics::SIGNED_AGGREGATES_TOTAL, &[metrics::SUCCESS]);
|
||||
|
||||
Some(SignedAggregateAndProof::from_aggregate(
|
||||
validator_index,
|
||||
aggregate,
|
||||
@ -335,6 +358,8 @@ impl<T: SlotClock + 'static, E: EthSpec> ValidatorStore<T, E> {
|
||||
let validators = self.validators.read();
|
||||
let voting_keypair = &validators.voting_keypair(validator_pubkey)?;
|
||||
|
||||
metrics::inc_counter_vec(&metrics::SIGNED_SELECTION_PROOFS_TOTAL, &[metrics::SUCCESS]);
|
||||
|
||||
Some(SelectionProof::new::<E>(
|
||||
slot,
|
||||
&voting_keypair.sk,
|
||||
|
Loading…
Reference in New Issue
Block a user