diff --git a/Cargo.lock b/Cargo.lock index 8c8fbb7d1..45250ffa5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7382,8 +7382,10 @@ dependencies = [ "futures 0.3.8", "hex", "hyper 0.13.9", + "lazy_static", "libc", "libsecp256k1", + "lighthouse_metrics", "lighthouse_version", "logging", "parking_lot 0.11.1", @@ -7535,6 +7537,8 @@ dependencies = [ "beacon_chain", "eth2", "headers", + "lazy_static", + "lighthouse_metrics", "safe_arith", "serde", "state_processing", diff --git a/beacon_node/http_metrics/src/lib.rs b/beacon_node/http_metrics/src/lib.rs index ce59578b9..e3238caae 100644 --- a/beacon_node/http_metrics/src/lib.rs +++ b/beacon_node/http_metrics/src/lib.rs @@ -1,10 +1,6 @@ //! This crate provides a HTTP server that is solely dedicated to serving the `/metrics` endpoint. //! //! For other endpoints, see the `http_api` crate. - -#[macro_use] -extern crate lazy_static; - mod metrics; use beacon_chain::{BeaconChain, BeaconChainTypes}; diff --git a/beacon_node/http_metrics/src/metrics.rs b/beacon_node/http_metrics/src/metrics.rs index bcd803c40..3d1b125ea 100644 --- a/beacon_node/http_metrics/src/metrics.rs +++ b/beacon_node/http_metrics/src/metrics.rs @@ -1,45 +1,9 @@ use crate::Context; use beacon_chain::BeaconChainTypes; -use eth2::lighthouse::Health; use lighthouse_metrics::{Encoder, TextEncoder}; pub use lighthouse_metrics::*; -lazy_static! 
{ - pub static ref PROCESS_NUM_THREADS: Result = try_create_int_gauge( - "process_num_threads", - "Number of threads used by the current process" - ); - pub static ref PROCESS_RES_MEM: Result = try_create_int_gauge( - "process_resident_memory_bytes", - "Resident memory used by the current process" - ); - pub static ref PROCESS_VIRT_MEM: Result = try_create_int_gauge( - "process_virtual_memory_bytes", - "Virtual memory used by the current process" - ); - pub static ref SYSTEM_VIRT_MEM_TOTAL: Result = - try_create_int_gauge("system_virt_mem_total_bytes", "Total system virtual memory"); - pub static ref SYSTEM_VIRT_MEM_AVAILABLE: Result = try_create_int_gauge( - "system_virt_mem_available_bytes", - "Available system virtual memory" - ); - pub static ref SYSTEM_VIRT_MEM_USED: Result = - try_create_int_gauge("system_virt_mem_used_bytes", "Used system virtual memory"); - pub static ref SYSTEM_VIRT_MEM_FREE: Result = - try_create_int_gauge("system_virt_mem_free_bytes", "Free system virtual memory"); - pub static ref SYSTEM_VIRT_MEM_PERCENTAGE: Result = try_create_float_gauge( - "system_virt_mem_percentage", - "Percentage of used virtual memory" - ); - pub static ref SYSTEM_LOADAVG_1: Result = - try_create_float_gauge("system_loadavg_1", "Loadavg over 1 minute"); - pub static ref SYSTEM_LOADAVG_5: Result = - try_create_float_gauge("system_loadavg_5", "Loadavg over 5 minutes"); - pub static ref SYSTEM_LOADAVG_15: Result = - try_create_float_gauge("system_loadavg_15", "Loadavg over 15 minutes"); -} - pub fn gather_prometheus_metrics( ctx: &Context, ) -> std::result::Result { @@ -75,27 +39,7 @@ pub fn gather_prometheus_metrics( eth2_libp2p::scrape_discovery_metrics(); - // This will silently fail if we are unable to observe the health. This is desired behaviour - // since we don't support `Health` for all platforms. 
- if let Ok(health) = Health::observe() { - set_gauge(&PROCESS_NUM_THREADS, health.pid_num_threads as i64); - set_gauge(&PROCESS_RES_MEM, health.pid_mem_resident_set_size as i64); - set_gauge(&PROCESS_VIRT_MEM, health.pid_mem_virtual_memory_size as i64); - set_gauge(&SYSTEM_VIRT_MEM_TOTAL, health.sys_virt_mem_total as i64); - set_gauge( - &SYSTEM_VIRT_MEM_AVAILABLE, - health.sys_virt_mem_available as i64, - ); - set_gauge(&SYSTEM_VIRT_MEM_USED, health.sys_virt_mem_used as i64); - set_gauge(&SYSTEM_VIRT_MEM_FREE, health.sys_virt_mem_free as i64); - set_float_gauge( - &SYSTEM_VIRT_MEM_PERCENTAGE, - health.sys_virt_mem_percent as f64, - ); - set_float_gauge(&SYSTEM_LOADAVG_1, health.sys_loadavg_1); - set_float_gauge(&SYSTEM_LOADAVG_5, health.sys_loadavg_5); - set_float_gauge(&SYSTEM_LOADAVG_15, health.sys_loadavg_15); - } + warp_utils::metrics::scrape_health_metrics(); encoder .encode(&lighthouse_metrics::gather(), &mut buffer) diff --git a/book/src/advanced_metrics.md b/book/src/advanced_metrics.md index 6c901862e..0d1aa345b 100644 --- a/book/src/advanced_metrics.md +++ b/book/src/advanced_metrics.md @@ -30,5 +30,21 @@ curl localhost:5054/metrics ## Validator Client Metrics -The validator client does not *yet* expose metrics, however this functionality -is expected to be implemented in late-September 2020. + +By default, these metrics are disabled but can be enabled with the `--metrics` +flag. Use the `--metrics-address`, `--metrics-port` and +`--metrics-allow-origin` flags to customize the metrics server. 
+ +### Example + +Start a validator client with the metrics server enabled: + +```bash +lighthouse vc --metrics +``` + +Check to ensure that the metrics are available on the default port: + +```bash +curl localhost:5064/metrics +``` diff --git a/common/warp_utils/Cargo.toml b/common/warp_utils/Cargo.toml index 1fc88abab..cf77cccd3 100644 --- a/common/warp_utils/Cargo.toml +++ b/common/warp_utils/Cargo.toml @@ -16,3 +16,5 @@ safe_arith = { path = "../../consensus/safe_arith" } serde = { version = "1.0.116", features = ["derive"] } tokio = { version = "0.2.22", features = ["sync"] } headers = "0.3.2" +lighthouse_metrics = { path = "../lighthouse_metrics" } +lazy_static = "1.4.0" diff --git a/common/warp_utils/src/lib.rs b/common/warp_utils/src/lib.rs index b645d85dc..5f37dde87 100644 --- a/common/warp_utils/src/lib.rs +++ b/common/warp_utils/src/lib.rs @@ -2,5 +2,6 @@ //! Lighthouse project. E.g., the `http_api` and `http_metrics` crates. pub mod cors; +pub mod metrics; pub mod reject; pub mod task; diff --git a/common/warp_utils/src/metrics.rs b/common/warp_utils/src/metrics.rs new file mode 100644 index 000000000..dc42aa6b3 --- /dev/null +++ b/common/warp_utils/src/metrics.rs @@ -0,0 +1,61 @@ +use eth2::lighthouse::Health; +use lighthouse_metrics::*; + +lazy_static::lazy_static! 
{ + pub static ref PROCESS_NUM_THREADS: Result = try_create_int_gauge( + "process_num_threads", + "Number of threads used by the current process" + ); + pub static ref PROCESS_RES_MEM: Result = try_create_int_gauge( + "process_resident_memory_bytes", + "Resident memory used by the current process" + ); + pub static ref PROCESS_VIRT_MEM: Result = try_create_int_gauge( + "process_virtual_memory_bytes", + "Virtual memory used by the current process" + ); + pub static ref SYSTEM_VIRT_MEM_TOTAL: Result = + try_create_int_gauge("system_virt_mem_total_bytes", "Total system virtual memory"); + pub static ref SYSTEM_VIRT_MEM_AVAILABLE: Result = try_create_int_gauge( + "system_virt_mem_available_bytes", + "Available system virtual memory" + ); + pub static ref SYSTEM_VIRT_MEM_USED: Result = + try_create_int_gauge("system_virt_mem_used_bytes", "Used system virtual memory"); + pub static ref SYSTEM_VIRT_MEM_FREE: Result = + try_create_int_gauge("system_virt_mem_free_bytes", "Free system virtual memory"); + pub static ref SYSTEM_VIRT_MEM_PERCENTAGE: Result = try_create_float_gauge( + "system_virt_mem_percentage", + "Percentage of used virtual memory" + ); + pub static ref SYSTEM_LOADAVG_1: Result = + try_create_float_gauge("system_loadavg_1", "Loadavg over 1 minute"); + pub static ref SYSTEM_LOADAVG_5: Result = + try_create_float_gauge("system_loadavg_5", "Loadavg over 5 minutes"); + pub static ref SYSTEM_LOADAVG_15: Result = + try_create_float_gauge("system_loadavg_15", "Loadavg over 15 minutes"); +} + +pub fn scrape_health_metrics() { + // This will silently fail if we are unable to observe the health. This is desired behaviour + // since we don't support `Health` for all platforms. 
+ if let Ok(health) = Health::observe() { + set_gauge(&PROCESS_NUM_THREADS, health.pid_num_threads as i64); + set_gauge(&PROCESS_RES_MEM, health.pid_mem_resident_set_size as i64); + set_gauge(&PROCESS_VIRT_MEM, health.pid_mem_virtual_memory_size as i64); + set_gauge(&SYSTEM_VIRT_MEM_TOTAL, health.sys_virt_mem_total as i64); + set_gauge( + &SYSTEM_VIRT_MEM_AVAILABLE, + health.sys_virt_mem_available as i64, + ); + set_gauge(&SYSTEM_VIRT_MEM_USED, health.sys_virt_mem_used as i64); + set_gauge(&SYSTEM_VIRT_MEM_FREE, health.sys_virt_mem_free as i64); + set_float_gauge( + &SYSTEM_VIRT_MEM_PERCENTAGE, + health.sys_virt_mem_percent as f64, + ); + set_float_gauge(&SYSTEM_LOADAVG_1, health.sys_loadavg_1); + set_float_gauge(&SYSTEM_LOADAVG_5, health.sys_loadavg_5); + set_float_gauge(&SYSTEM_LOADAVG_15, health.sys_loadavg_15); + } +} diff --git a/validator_client/Cargo.toml b/validator_client/Cargo.toml index 43868cf5f..382105ff5 100644 --- a/validator_client/Cargo.toml +++ b/validator_client/Cargo.toml @@ -59,3 +59,5 @@ libsecp256k1 = "0.3.5" ring = "0.16.12" rand = "0.7.3" scrypt = { version = "0.3.1", default-features = false } +lighthouse_metrics = { path = "../common/lighthouse_metrics" } +lazy_static = "1.4.0" diff --git a/validator_client/src/attestation_service.rs b/validator_client/src/attestation_service.rs index fbad0d18e..fa4457eb1 100644 --- a/validator_client/src/attestation_service.rs +++ b/validator_client/src/attestation_service.rs @@ -1,5 +1,6 @@ use crate::{ duties_service::{DutiesService, DutyAndProof}, + http_metrics::metrics, validator_store::ValidatorStore, }; use environment::RuntimeContext; @@ -240,6 +241,10 @@ impl AttestationService { aggregate_production_instant: Instant, ) -> Result<(), ()> { let log = self.context.log(); + let attestations_timer = metrics::start_timer_vec( + &metrics::ATTESTATION_SERVICE_TIMES, + &[metrics::ATTESTATIONS], + ); // There's not need to produce `Attestation` or `SignedAggregateAndProof` if we do not have // any 
validators for the given `slot` and `committee_index`. @@ -263,6 +268,8 @@ impl AttestationService { ) })?; + drop(attestations_timer); + // Step 2. // // If an attestation was produced, make an aggregate. @@ -273,6 +280,12 @@ impl AttestationService { // even if the instant has already elapsed. delay_until(aggregate_production_instant).await; + // Start the metrics timer *after* we've done the delay. + let _aggregates_timer = metrics::start_timer_vec( + &metrics::ATTESTATION_SERVICE_TIMES, + &[metrics::AGGREGATES], + ); + // Then download, sign and publish a `SignedAggregateAndProof` for each // validator that is elected to aggregate for this `slot` and // `committee_index`. diff --git a/validator_client/src/block_service.rs b/validator_client/src/block_service.rs index bf52cacfc..2eb3c70b5 100644 --- a/validator_client/src/block_service.rs +++ b/validator_client/src/block_service.rs @@ -1,4 +1,4 @@ -use crate::validator_store::ValidatorStore; +use crate::{http_metrics::metrics, validator_store::ValidatorStore}; use environment::RuntimeContext; use eth2::{types::Graffiti, BeaconNodeHttpClient}; use futures::channel::mpsc::Receiver; @@ -137,6 +137,8 @@ impl BlockService { /// Attempt to produce a block for any block producers in the `ValidatorStore`. 
async fn do_update(&self, notification: BlockServiceNotification) -> Result<(), ()> { let log = self.context.log(); + let _timer = + metrics::start_timer_vec(&metrics::BLOCK_SERVICE_TIMES, &[metrics::FULL_UPDATE]); let slot = self.slot_clock.now().ok_or_else(move || { crit!(log, "Duties manager failed to read slot clock"); @@ -208,6 +210,8 @@ impl BlockService { /// Produce a block at the given slot for validator_pubkey async fn publish_block(self, slot: Slot, validator_pubkey: PublicKey) -> Result<(), String> { let log = self.context.log(); + let _timer = + metrics::start_timer_vec(&metrics::BLOCK_SERVICE_TIMES, &[metrics::BEACON_BLOCK]); let current_slot = self .slot_clock diff --git a/validator_client/src/cli.rs b/validator_client/src/cli.rs index 2f63932b9..bdc5e7d05 100644 --- a/validator_client/src/cli.rs +++ b/validator_client/src/cli.rs @@ -138,4 +138,37 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> { address of this server (e.g., http://localhost:5062).") .takes_value(true), ) + /* Prometheus metrics HTTP server related arguments */ + .arg( + Arg::with_name("metrics") + .long("metrics") + .help("Enable the Prometheus metrics HTTP server. Disabled by default.") + .takes_value(false), + ) + .arg( + Arg::with_name("metrics-address") + .long("metrics-address") + .value_name("ADDRESS") + .help("Set the listen address for the Prometheus metrics HTTP server.") + .default_value("127.0.0.1") + .takes_value(true), + ) + .arg( + Arg::with_name("metrics-port") + .long("metrics-port") + .value_name("PORT") + .help("Set the listen TCP port for the Prometheus metrics HTTP server.") + .default_value("5064") + .takes_value(true), + ) + .arg( + Arg::with_name("metrics-allow-origin") + .long("metrics-allow-origin") + .value_name("ORIGIN") + .help("Set the value of the Access-Control-Allow-Origin response HTTP header. \ + Use * to allow any origin (not recommended in production). 
\ + If no value is supplied, the CORS allowed origin is set to the listen \ + address of this server (e.g., http://localhost:5064).") + .takes_value(true), + ) } diff --git a/validator_client/src/config.rs b/validator_client/src/config.rs index f26eaf9e0..13a9a9de5 100644 --- a/validator_client/src/config.rs +++ b/validator_client/src/config.rs @@ -1,4 +1,4 @@ -use crate::http_api; +use crate::{http_api, http_metrics}; use clap::ArgMatches; use clap_utils::{parse_optional, parse_required}; use directory::{ @@ -9,6 +9,7 @@ use eth2::types::Graffiti; use serde_derive::{Deserialize, Serialize}; use slog::{warn, Logger}; use std::fs; +use std::net::Ipv4Addr; use std::path::PathBuf; use types::GRAFFITI_BYTES_LEN; @@ -38,6 +39,8 @@ pub struct Config { pub graffiti: Option, /// Configuration for the HTTP REST API. pub http_api: http_api::Config, + /// Configuration for the Prometheus metrics HTTP server. + pub http_metrics: http_metrics::Config, } impl Default for Config { @@ -61,6 +64,7 @@ impl Default for Config { init_slashing_protection: false, graffiti: None, http_api: <_>::default(), + http_metrics: <_>::default(), } } } @@ -166,6 +170,35 @@ impl Config { config.http_api.allow_origin = Some(allow_origin.to_string()); } + /* + * Prometheus metrics HTTP server + */ + + if cli_args.is_present("metrics") { + config.http_metrics.enabled = true; + } + + if let Some(address) = cli_args.value_of("metrics-address") { + config.http_metrics.listen_addr = address + .parse::() + .map_err(|_| "metrics-address is not a valid IPv4 address.")?; + } + + if let Some(port) = cli_args.value_of("metrics-port") { + config.http_metrics.listen_port = port + .parse::() + .map_err(|_| "metrics-port is not a valid u16.")?; + } + + if let Some(allow_origin) = cli_args.value_of("metrics-allow-origin") { + // Pre-validate the config value to give feedback to the user on node startup, instead of + // as late as when the first API response is produced.
+ hyper::header::HeaderValue::from_str(allow_origin) + .map_err(|_| "Invalid allow-origin value")?; + + config.http_metrics.allow_origin = Some(allow_origin.to_string()); + } + Ok(config) } } diff --git a/validator_client/src/duties_service.rs b/validator_client/src/duties_service.rs index 8bba39300..b266f5d38 100644 --- a/validator_client/src/duties_service.rs +++ b/validator_client/src/duties_service.rs @@ -1,6 +1,6 @@ use crate::{ - block_service::BlockServiceNotification, is_synced::is_synced, validator_duty::ValidatorDuty, - validator_store::ValidatorStore, + block_service::BlockServiceNotification, http_metrics::metrics, is_synced::is_synced, + validator_duty::ValidatorDuty, validator_store::ValidatorStore, }; use environment::RuntimeContext; use eth2::BeaconNodeHttpClient; @@ -511,6 +511,8 @@ impl DutiesService { spec: &ChainSpec, ) { let log = self.context.log(); + let _timer = + metrics::start_timer_vec(&metrics::DUTIES_SERVICE_TIMES, &[metrics::FULL_UPDATE]); if !is_synced(&self.beacon_node, &self.slot_clock, None).await && !self.allow_unsynced_beacon_node diff --git a/validator_client/src/fork_service.rs b/validator_client/src/fork_service.rs index 58665ee01..7af171b67 100644 --- a/validator_client/src/fork_service.rs +++ b/validator_client/src/fork_service.rs @@ -1,3 +1,4 @@ +use crate::http_metrics::metrics; use environment::RuntimeContext; use eth2::{types::StateId, BeaconNodeHttpClient}; use futures::StreamExt; @@ -161,6 +162,9 @@ impl ForkService { /// Attempts to download the `Fork` from the server. 
async fn do_update(self) -> Result<(), ()> { + let _timer = + metrics::start_timer_vec(&metrics::FORK_SERVICE_TIMES, &[metrics::FULL_UPDATE]); + let fork = self .inner .beacon_node diff --git a/validator_client/src/http_metrics/metrics.rs b/validator_client/src/http_metrics/metrics.rs new file mode 100644 index 000000000..beb9e2d16 --- /dev/null +++ b/validator_client/src/http_metrics/metrics.rs @@ -0,0 +1,146 @@ +use super::Context; +use slot_clock::SlotClock; +use std::time::{SystemTime, UNIX_EPOCH}; +use types::EthSpec; + +pub const SUCCESS: &str = "success"; +pub const SLASHABLE: &str = "slashable"; +pub const SAME_DATA: &str = "same_data"; +pub const UNREGISTERED: &str = "unregistered"; +pub const FULL_UPDATE: &str = "full_update"; +pub const BEACON_BLOCK: &str = "beacon_block"; +pub const ATTESTATIONS: &str = "attestations"; +pub const AGGREGATES: &str = "aggregates"; +pub const CURRENT_EPOCH: &str = "current_epoch"; +pub const NEXT_EPOCH: &str = "next_epoch"; + +pub use lighthouse_metrics::*; + +lazy_static::lazy_static! 
{ + pub static ref GENESIS_DISTANCE: Result = try_create_int_gauge( + "vc_genesis_distance_seconds", + "Distance between now and genesis time" + ); + pub static ref ENABLED_VALIDATORS_COUNT: Result = try_create_int_gauge( + "vc_validators_enabled_count", + "Number of enabled validators" + ); + pub static ref TOTAL_VALIDATORS_COUNT: Result = try_create_int_gauge( + "vc_validators_total_count", + "Number of total validators (enabled and disabled)" + ); + + pub static ref SIGNED_BLOCKS_TOTAL: Result = try_create_int_counter_vec( + "vc_signed_beacon_blocks_total", + "Total count of attempted block signings", + &["status"] + ); + pub static ref SIGNED_ATTESTATIONS_TOTAL: Result = try_create_int_counter_vec( + "vc_signed_attestations_total", + "Total count of attempted Attestation signings", + &["status"] + ); + pub static ref SIGNED_AGGREGATES_TOTAL: Result = try_create_int_counter_vec( + "vc_signed_aggregates_total", + "Total count of attempted SignedAggregateAndProof signings", + &["status"] + ); + pub static ref SIGNED_SELECTION_PROOFS_TOTAL: Result = try_create_int_counter_vec( + "vc_signed_selection_proofs_total", + "Total count of attempted SelectionProof signings", + &["status"] + ); + pub static ref DUTIES_SERVICE_TIMES: Result = try_create_histogram_vec( + "vc_duties_service_task_times_seconds", + "Duration to perform duties service tasks", + &["task"] + ); + pub static ref FORK_SERVICE_TIMES: Result = try_create_histogram_vec( + "vc_fork_service_task_times_seconds", + "Duration to perform fork service tasks", + &["task"] + ); + pub static ref ATTESTATION_SERVICE_TIMES: Result = try_create_histogram_vec( + "vc_attestation_service_task_times_seconds", + "Duration to perform attestation service tasks", + &["task"] + ); + pub static ref BLOCK_SERVICE_TIMES: Result = try_create_histogram_vec( + "vc_beacon_block_service_task_times_seconds", + "Duration to perform beacon block service tasks", + &["task"] + ); + pub static ref PROPOSER_COUNT: Result = 
try_create_int_gauge_vec( + "vc_beacon_block_proposer_count", + "Number of beacon block proposers on this host", + &["task"] + ); + pub static ref ATTESTER_COUNT: Result = try_create_int_gauge_vec( + "vc_beacon_attester_count", + "Number of attesters on this host", + &["task"] + ); +} + +pub fn gather_prometheus_metrics( + ctx: &Context, +) -> std::result::Result { + let mut buffer = vec![]; + let encoder = TextEncoder::new(); + + { + let shared = ctx.shared.read(); + + if let Some(genesis_time) = shared.genesis_time { + if let Ok(now) = SystemTime::now().duration_since(UNIX_EPOCH) { + let distance = now.as_secs() as i64 - genesis_time as i64; + set_gauge(&GENESIS_DISTANCE, distance); + } + } + + if let Some(validator_store) = &shared.validator_store { + let initialized_validators_lock = validator_store.initialized_validators(); + let initialized_validators = initialized_validators_lock.read(); + + set_gauge( + &ENABLED_VALIDATORS_COUNT, + initialized_validators.num_enabled() as i64, + ); + set_gauge( + &TOTAL_VALIDATORS_COUNT, + initialized_validators.num_total() as i64, + ); + } + + if let Some(duties_service) = &shared.duties_service { + if let Some(slot) = duties_service.slot_clock.now() { + let current_epoch = slot.epoch(T::slots_per_epoch()); + let next_epoch = current_epoch + 1; + + set_int_gauge( + &PROPOSER_COUNT, + &[CURRENT_EPOCH], + duties_service.proposer_count(current_epoch) as i64, + ); + set_int_gauge( + &ATTESTER_COUNT, + &[CURRENT_EPOCH], + duties_service.attester_count(current_epoch) as i64, + ); + set_int_gauge( + &ATTESTER_COUNT, + &[NEXT_EPOCH], + duties_service.attester_count(next_epoch) as i64, + ); + } + } + } + + warp_utils::metrics::scrape_health_metrics(); + + encoder + .encode(&lighthouse_metrics::gather(), &mut buffer) + .unwrap(); + + String::from_utf8(buffer).map_err(|e| format!("Failed to encode prometheus info: {:?}", e)) +} diff --git a/validator_client/src/http_metrics/mod.rs b/validator_client/src/http_metrics/mod.rs new file 
mode 100644 index 000000000..32227172a --- /dev/null +++ b/validator_client/src/http_metrics/mod.rs @@ -0,0 +1,149 @@ +//! This module provides a HTTP server that is solely dedicated to serving the `/metrics` endpoint. +//! +//! For other endpoints, see the `http_api` module. +pub mod metrics; + +use crate::{DutiesService, ValidatorStore}; +use lighthouse_version::version_with_platform; +use parking_lot::RwLock; +use serde::{Deserialize, Serialize}; +use slog::{crit, info, Logger}; +use slot_clock::SystemTimeSlotClock; +use std::future::Future; +use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4}; +use std::sync::Arc; +use types::EthSpec; +use warp::{http::Response, Filter}; + +#[derive(Debug)] +pub enum Error { + Warp(warp::Error), + Other(String), +} + +impl From for Error { + fn from(e: warp::Error) -> Self { + Error::Warp(e) + } +} + +impl From for Error { + fn from(e: String) -> Self { + Error::Other(e) + } +} + +/// Contains objects which have shared access from inside/outside of the metrics server. +pub struct Shared { + pub validator_store: Option>, + pub duties_service: Option>, + pub genesis_time: Option, +} + +/// A wrapper around all the items required to spawn the HTTP server. +/// +/// The server will gracefully handle the case where any fields are `None`. +pub struct Context { + pub config: Config, + pub shared: RwLock>, + pub log: Logger, +} + +/// Configuration for the HTTP server. +#[derive(PartialEq, Debug, Clone, Serialize, Deserialize)] +pub struct Config { + pub enabled: bool, + pub listen_addr: Ipv4Addr, + pub listen_port: u16, + pub allow_origin: Option, +} + +impl Default for Config { + fn default() -> Self { + Self { + enabled: false, + listen_addr: Ipv4Addr::new(127, 0, 0, 1), + listen_port: 5064, + allow_origin: None, + } + } +} + +/// Creates a server that will serve requests using information from `ctx`. +/// +/// The server will shut down gracefully when the `shutdown` future resolves.
+/// +/// ## Returns +/// +/// This function will bind the server to the provided address and then return a tuple of: +/// +/// - `SocketAddr`: the address that the HTTP server will listen on. +/// - `Future`: the actual server future that will need to be awaited. +/// +/// ## Errors +/// +/// Returns an error if the server is unable to bind or there is another error during +/// configuration. +pub fn serve( + ctx: Arc>, + shutdown: impl Future + Send + Sync + 'static, +) -> Result<(SocketAddr, impl Future), Error> { + let config = &ctx.config; + let log = ctx.log.clone(); + + // Configure CORS. + let cors_builder = { + let builder = warp::cors() + .allow_method("GET") + .allow_headers(vec!["Content-Type"]); + + warp_utils::cors::set_builder_origins( + builder, + config.allow_origin.as_deref(), + (config.listen_addr, config.listen_port), + )? + }; + + // Sanity check. + if !config.enabled { + crit!(log, "Cannot start disabled metrics HTTP server"); + return Err(Error::Other( + "A disabled metrics server should not be started".to_string(), + )); + } + + let inner_ctx = ctx.clone(); + let routes = warp::get() + .and(warp::path("metrics")) + .map(move || inner_ctx.clone()) + .and_then(|ctx: Arc>| async move { + Ok::<_, warp::Rejection>( + metrics::gather_prometheus_metrics(&ctx) + .map(|body| Response::builder().status(200).body(body).unwrap()) + .unwrap_or_else(|e| { + Response::builder() + .status(500) + .body(format!("Unable to gather metrics: {:?}", e)) + .unwrap() + }), + ) + }) + // Add a `Server` header. 
+ .map(|reply| warp::reply::with_header(reply, "Server", &version_with_platform())) + .with(cors_builder.build()); + + let (listening_socket, server) = warp::serve(routes).try_bind_with_graceful_shutdown( + SocketAddrV4::new(config.listen_addr, config.listen_port), + async { + shutdown.await; + }, + )?; + + info!( + log, + "Metrics HTTP server started"; + "listen_address" => listening_socket.to_string(), + ); + + Ok((listening_socket, server)) +} diff --git a/validator_client/src/lib.rs b/validator_client/src/lib.rs index e614fd136..06350f2f9 100644 --- a/validator_client/src/lib.rs +++ b/validator_client/src/lib.rs @@ -4,6 +4,7 @@ mod cli; mod config; mod duties_service; mod fork_service; +mod http_metrics; mod initialized_validators; mod is_synced; mod key_cache; @@ -28,6 +29,7 @@ use futures::channel::mpsc; use http_api::ApiSecret; use initialized_validators::InitializedValidators; use notifier::spawn_notifier; +use parking_lot::RwLock; use slashing_protection::{SlashingDatabase, SLASHING_PROTECTION_FILENAME}; use slog::{error, info, warn, Logger}; use slot_clock::SlotClock; @@ -49,6 +51,7 @@ const WAITING_FOR_GENESIS_POLL_TIME: Duration = Duration::from_secs(12); /// The global timeout for HTTP requests to the beacon node. const HTTP_TIMEOUT: Duration = Duration::from_secs(12); +#[derive(Clone)] pub struct ProductionValidatorClient { context: RuntimeContext, duties_service: DutiesService, @@ -57,6 +60,7 @@ pub struct ProductionValidatorClient { attestation_service: AttestationService, validator_store: ValidatorStore, http_api_listen_addr: Option, + http_metrics_ctx: Option>>, config: Config, } @@ -84,6 +88,36 @@ impl ProductionValidatorClient { "validator_dir" => format!("{:?}", config.validator_dir), ); + // Optionally start the metrics server. 
+ let http_metrics_ctx = if config.http_metrics.enabled { + let shared = http_metrics::Shared { + validator_store: None, + genesis_time: None, + duties_service: None, + }; + + let ctx: Arc> = Arc::new(http_metrics::Context { + config: config.http_metrics.clone(), + shared: RwLock::new(shared), + log: log.clone(), + }); + + let exit = context.executor.exit(); + + let (_listen_addr, server) = http_metrics::serve(ctx.clone(), exit) + .map_err(|e| format!("Unable to start metrics API server: {:?}", e))?; + + context + .clone() + .executor + .spawn_without_exit(async move { server.await }, "metrics-api"); + + Some(ctx) + } else { + info!(log, "HTTP metrics server is disabled"); + None + }; + let mut validator_defs = ValidatorDefinitions::open_or_create(&config.validator_dir) .map_err(|e| format!("Unable to open or create validator definitions: {:?}", e))?; @@ -186,6 +220,11 @@ impl ProductionValidatorClient { () = context.executor.exit() => return Err("Shutting down".to_string()) }; + // Update the metrics server. + if let Some(ctx) = &http_metrics_ctx { + ctx.shared.write().genesis_time = Some(genesis_time); + } + let slot_clock = SystemTimeSlotClock::new( context.eth2_config.spec.genesis_slot, Duration::from_secs(genesis_time), @@ -221,6 +260,12 @@ impl ProductionValidatorClient { .allow_unsynced_beacon_node(config.allow_unsynced_beacon_node) .build()?; + // Update the metrics server. 
+ if let Some(ctx) = &http_metrics_ctx { + ctx.shared.write().validator_store = Some(validator_store.clone()); + ctx.shared.write().duties_service = Some(duties_service.clone()); + } + let block_service = BlockServiceBuilder::new() .slot_clock(slot_clock.clone()) .validator_store(validator_store.clone()) @@ -233,10 +278,16 @@ impl ProductionValidatorClient { .duties_service(duties_service.clone()) .slot_clock(slot_clock) .validator_store(validator_store.clone()) - .beacon_node(beacon_node) + .beacon_node(beacon_node.clone()) .runtime_context(context.service_context("attestation".into())) .build()?; + // Wait until genesis has occurred. + // + // It seems most sensible to move this into the `start_service` function, but I'm cautious + // of making too many changes this close to genesis (<1 week). + wait_for_genesis(&beacon_node, genesis_time, &context).await?; + Ok(Self { context, duties_service, @@ -246,6 +297,7 @@ impl ProductionValidatorClient { validator_store, config, http_api_listen_addr: None, + http_metrics_ctx, }) } @@ -320,7 +372,7 @@ async fn init_from_beacon_node( context: &RuntimeContext, ) -> Result<(u64, Hash256), String> { // Wait for the beacon node to come online. - wait_for_node(beacon_node, context.log()).await?; + wait_for_connectivity(beacon_node, context.log()).await?; let yaml_config = beacon_node .get_config_spec() @@ -367,10 +419,18 @@ async fn init_from_beacon_node( delay_for(RETRY_DELAY).await; }; + Ok((genesis.genesis_time, genesis.genesis_validators_root)) +} + +async fn wait_for_genesis( + beacon_node: &BeaconNodeHttpClient, + genesis_time: u64, + context: &RuntimeContext, +) -> Result<(), String> { let now = SystemTime::now() .duration_since(UNIX_EPOCH) .map_err(|e| format!("Unable to read system time: {:?}", e))?; - let genesis_time = Duration::from_secs(genesis.genesis_time); + let genesis_time = Duration::from_secs(genesis_time); // If the time now is less than (prior to) genesis, then delay until the // genesis instant.
@@ -404,12 +464,15 @@ async fn init_from_beacon_node( ); } - Ok((genesis.genesis_time, genesis.genesis_validators_root)) + Ok(()) } /// Request the version from the node, looping back and trying again on failure. Exit once the node /// has been contacted. -async fn wait_for_node(beacon_node: &BeaconNodeHttpClient, log: &Logger) -> Result<(), String> { +async fn wait_for_connectivity( + beacon_node: &BeaconNodeHttpClient, + log: &Logger, +) -> Result<(), String> { // Try to get the version string from the node, looping until success is returned. loop { let log = log.clone(); diff --git a/validator_client/src/validator_store.rs b/validator_client/src/validator_store.rs index 66b75874a..7cc46c9cd 100644 --- a/validator_client/src/validator_store.rs +++ b/validator_client/src/validator_store.rs @@ -1,4 +1,6 @@ -use crate::{fork_service::ForkService, initialized_validators::InitializedValidators}; +use crate::{ + fork_service::ForkService, http_metrics::metrics, initialized_validators::InitializedValidators, +}; use account_utils::{validator_definitions::ValidatorDefinition, ZeroizeString}; use parking_lot::RwLock; use slashing_protection::{NotSafe, Safe, SlashingDatabase}; @@ -186,6 +188,8 @@ impl ValidatorStore { let validators = self.validators.read(); let voting_keypair = validators.voting_keypair(validator_pubkey)?; + metrics::inc_counter_vec(&metrics::SIGNED_BLOCKS_TOTAL, &[metrics::SUCCESS]); + Some(block.sign( &voting_keypair.sk, &fork, @@ -198,6 +202,7 @@ impl ValidatorStore { self.log, "Skipping signing of previously signed block"; ); + metrics::inc_counter_vec(&metrics::SIGNED_BLOCKS_TOTAL, &[metrics::SAME_DATA]); None } Err(NotSafe::UnregisteredValidator(pk)) => { @@ -207,6 +212,7 @@ impl ValidatorStore { "msg" => "Carefully consider running with --init-slashing-protection (see --help)", "public_key" => format!("{:?}", pk) ); + metrics::inc_counter_vec(&metrics::SIGNED_BLOCKS_TOTAL, &[metrics::UNREGISTERED]); None } Err(e) => { @@ -215,6 +221,7 @@ impl 
ValidatorStore { "Not signing slashable block"; "error" => format!("{:?}", e) ); + metrics::inc_counter_vec(&metrics::SIGNED_BLOCKS_TOTAL, &[metrics::SLASHABLE]); None } } @@ -270,6 +277,8 @@ impl ValidatorStore { }) .ok()?; + metrics::inc_counter_vec(&metrics::SIGNED_ATTESTATIONS_TOTAL, &[metrics::SUCCESS]); + Some(()) } Ok(Safe::SameData) => { @@ -277,6 +286,10 @@ impl ValidatorStore { self.log, "Skipping signing of previously signed attestation" ); + metrics::inc_counter_vec( + &metrics::SIGNED_ATTESTATIONS_TOTAL, + &[metrics::SAME_DATA], + ); None } Err(NotSafe::UnregisteredValidator(pk)) => { @@ -286,6 +299,10 @@ impl ValidatorStore { "msg" => "Carefully consider running with --init-slashing-protection (see --help)", "public_key" => format!("{:?}", pk) ); + metrics::inc_counter_vec( + &metrics::SIGNED_ATTESTATIONS_TOTAL, + &[metrics::UNREGISTERED], + ); None } Err(e) => { @@ -295,6 +312,10 @@ impl ValidatorStore { "attestation" => format!("{:?}", attestation.data), "error" => format!("{:?}", e) ); + metrics::inc_counter_vec( + &metrics::SIGNED_ATTESTATIONS_TOTAL, + &[metrics::SLASHABLE], + ); None } } @@ -314,6 +335,8 @@ impl ValidatorStore { let validators = self.validators.read(); let voting_keypair = &validators.voting_keypair(validator_pubkey)?; + metrics::inc_counter_vec(&metrics::SIGNED_AGGREGATES_TOTAL, &[metrics::SUCCESS]); + Some(SignedAggregateAndProof::from_aggregate( validator_index, aggregate, @@ -335,6 +358,8 @@ impl ValidatorStore { let validators = self.validators.read(); let voting_keypair = &validators.voting_keypair(validator_pubkey)?; + metrics::inc_counter_vec(&metrics::SIGNED_SELECTION_PROOFS_TOTAL, &[metrics::SUCCESS]); + Some(SelectionProof::new::( slot, &voting_keypair.sk,