From 3642efe76ac4cd480d2822a10fb8f089b1771f45 Mon Sep 17 00:00:00 2001 From: Mac L Date: Tue, 21 Feb 2023 20:54:55 +0000 Subject: [PATCH] Cache validator balances and allow them to be served over the HTTP API (#3863) ## Issue Addressed #3804 ## Proposed Changes - Add `total_balance` to the validator monitor and adjust the number of historical epochs which are cached. - Allow certain values in the cache to be served out via the HTTP API without requiring a state read. ## Usage ``` curl -X POST "http://localhost:5052/lighthouse/ui/validator_info" -d '{"indices": [0]}' -H "Content-Type: application/json" | jq ``` ``` { "data": { "validators": { "0": { "info": [ { "epoch": 172981, "total_balance": 36566388519 }, ... { "epoch": 172990, "total_balance": 36566496513 } ] }, "1": { "info": [ { "epoch": 172981, "total_balance": 36355797968 }, ... { "epoch": 172990, "total_balance": 36355905962 } ] } } } } ``` ## Additional Info This requires no historical states to operate which mean it will still function on the freshly checkpoint synced node, however because of this, the values will populate each epoch (up to a maximum of 10 entries). Another benefit of this method, is that we can easily cache any other values which would normally require a state read and serve them via the same endpoint. However, we would need be cautious about not overly increasing block processing time by caching values from complex computations. This also caches some of the validator metrics directly, rather than pulling them from the Prometheus metrics when the API is called. This means when the validator count exceeds the individual monitor threshold, the cached values will still be available. Co-authored-by: Paul Hauner --- Cargo.lock | 1 + .../beacon_chain/src/validator_monitor.rs | 106 +++++++++- beacon_node/http_api/Cargo.toml | 1 + beacon_node/http_api/src/lib.rs | 17 ++ beacon_node/http_api/src/ui.rs | 194 ++++++++++++------ 5 files changed, 247 insertions(+), 72 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 5d5d32157..12b70f58e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3209,6 +3209,7 @@ dependencies = [ "environment", "eth1", "eth2", + "eth2_serde_utils", "eth2_ssz", "execution_layer", "futures", diff --git a/beacon_node/beacon_chain/src/validator_monitor.rs b/beacon_node/beacon_chain/src/validator_monitor.rs index dad5e1517..de2681012 100644 --- a/beacon_node/beacon_chain/src/validator_monitor.rs +++ b/beacon_node/beacon_chain/src/validator_monitor.rs @@ -29,7 +29,7 @@ const TOTAL_LABEL: &str = "total"; /// The validator monitor collects per-epoch data about each monitored validator. Historical data /// will be kept around for `HISTORIC_EPOCHS` before it is pruned. -pub const HISTORIC_EPOCHS: usize = 4; +pub const HISTORIC_EPOCHS: usize = 10; /// Once the validator monitor reaches this number of validators it will stop /// tracking their metrics/logging individually in an effort to reduce @@ -45,7 +45,7 @@ pub enum Error { /// Contains data pertaining to one validator for one epoch. #[derive(Default)] -struct EpochSummary { +pub struct EpochSummary { /* * Attestations with a target in the current epoch. */ @@ -103,6 +103,12 @@ struct EpochSummary { pub proposer_slashings: usize, /// The number of attester slashings observed. pub attester_slashings: usize, + + /* + * Other validator info helpful for the UI. + */ + /// The total balance of the validator. + pub total_balance: Option, } impl EpochSummary { @@ -176,18 +182,60 @@ impl EpochSummary { pub fn register_attester_slashing(&mut self) { self.attester_slashings += 1; } + + pub fn register_validator_total_balance(&mut self, total_balance: u64) { + self.total_balance = Some(total_balance) + } } type SummaryMap = HashMap; +#[derive(Default)] +pub struct ValidatorMetrics { + pub attestation_hits: u64, + pub attestation_misses: u64, + pub attestation_head_hits: u64, + pub attestation_head_misses: u64, + pub attestation_target_hits: u64, + pub attestation_target_misses: u64, +} + +impl ValidatorMetrics { + pub fn increment_hits(&mut self) { + self.attestation_hits += 1; + } + + pub fn increment_misses(&mut self) { + self.attestation_misses += 1; + } + + pub fn increment_target_hits(&mut self) { + self.attestation_target_hits += 1; + } + + pub fn increment_target_misses(&mut self) { + self.attestation_target_misses += 1; + } + + pub fn increment_head_hits(&mut self) { + self.attestation_head_hits += 1; + } + + pub fn increment_head_misses(&mut self) { + self.attestation_head_misses += 1; + } +} + /// A validator that is being monitored by the `ValidatorMonitor`. -struct MonitoredValidator { +pub struct MonitoredValidator { /// A human-readable identifier for the validator. pub id: String, /// The validator index in the state. pub index: Option, /// A history of the validator over time. pub summaries: RwLock, + /// Validator metrics to be exposed over the HTTP API. + pub metrics: RwLock, } impl MonitoredValidator { @@ -198,6 +246,7 @@ impl MonitoredValidator { .unwrap_or_else(|| pubkey.to_string()), index, summaries: <_>::default(), + metrics: <_>::default(), } } @@ -252,6 +301,20 @@ impl MonitoredValidator { fn touch_epoch_summary(&self, epoch: Epoch) { self.with_epoch_summary(epoch, |_| {}); } + + fn get_from_epoch_summary(&self, epoch: Epoch, func: F) -> Option + where + F: Fn(Option<&EpochSummary>) -> Option, + { + let summaries = self.summaries.read(); + func(summaries.get(&epoch)) + } + + pub fn get_total_balance(&self, epoch: Epoch) -> Option { + self.get_from_epoch_summary(epoch, |summary_opt| { + summary_opt.and_then(|summary| summary.total_balance) + }) + } } /// Holds a collection of `MonitoredValidator` and is notified about a variety of events on the P2P @@ -347,12 +410,20 @@ impl ValidatorMonitor { if let Some(i) = monitored_validator.index { monitored_validator.touch_epoch_summary(current_epoch); + let i = i as usize; + + // Cache relevant validator info. + if let Some(balance) = state.balances().get(i) { + monitored_validator.with_epoch_summary(current_epoch, |summary| { + summary.register_validator_total_balance(*balance) + }); + } + // Only log the per-validator metrics if it's enabled. if !self.individual_tracking() { continue; } - let i = i as usize; let id = &monitored_validator.id; if let Some(balance) = state.balances().get(i) { @@ -479,6 +550,25 @@ impl ValidatorMonitor { continue; } + // Store some metrics directly to be re-exposed on the HTTP API. + let mut validator_metrics = monitored_validator.metrics.write(); + if previous_epoch_matched_any { + validator_metrics.increment_hits(); + if previous_epoch_matched_target { + validator_metrics.increment_target_hits() + } else { + validator_metrics.increment_target_misses() + } + if previous_epoch_matched_head { + validator_metrics.increment_head_hits() + } else { + validator_metrics.increment_head_misses() + } + } else { + validator_metrics.increment_misses() + } + drop(validator_metrics); + // Indicates if any attestation made it on-chain. // // For Base states, this will be *any* attestation whatsoever. For Altair states, @@ -717,6 +807,14 @@ impl ValidatorMonitor { self.validators.values().map(|val| val.id.clone()).collect() } + pub fn get_monitored_validator(&self, index: u64) -> Option<&MonitoredValidator> { + if let Some(pubkey) = self.indices.get(&index) { + self.validators.get(pubkey) + } else { + None + } + } + /// If `self.auto_register == true`, add the `validator_index` to `self.monitored_validators`. /// Otherwise, do nothing. pub fn auto_register_local_validator(&mut self, validator_index: u64) { diff --git a/beacon_node/http_api/Cargo.toml b/beacon_node/http_api/Cargo.toml index 077e3aa7c..d7a3a680b 100644 --- a/beacon_node/http_api/Cargo.toml +++ b/beacon_node/http_api/Cargo.toml @@ -36,6 +36,7 @@ tree_hash = "0.4.1" sysinfo = "0.26.5" system_health = { path = "../../common/system_health" } directory = { path = "../../common/directory" } +eth2_serde_utils = "0.1.1" [dev-dependencies] store = { path = "../store" } diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index 60e5d2adf..009775701 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -3025,6 +3025,22 @@ pub fn serve( }, ); + // POST lighthouse/ui/validator_info + let post_lighthouse_ui_validator_info = warp::path("lighthouse") + .and(warp::path("ui")) + .and(warp::path("validator_info")) + .and(warp::path::end()) + .and(warp::body::json()) + .and(chain_filter.clone()) + .and_then( + |request_data: ui::ValidatorInfoRequestData, chain: Arc>| { + blocking_json_task(move || { + ui::get_validator_info(request_data, chain) + .map(api_types::GenericResponse::from) + }) + }, + ); + // GET lighthouse/syncing let get_lighthouse_syncing = warp::path("lighthouse") .and(warp::path("syncing")) @@ -3522,6 +3538,7 @@ pub fn serve( .or(post_lighthouse_database_historical_blocks.boxed()) .or(post_lighthouse_block_rewards.boxed()) .or(post_lighthouse_ui_validator_metrics.boxed()) + .or(post_lighthouse_ui_validator_info.boxed()) .recover(warp_utils::reject::handle_rejection), )) .recover(warp_utils::reject::handle_rejection) diff --git a/beacon_node/http_api/src/ui.rs b/beacon_node/http_api/src/ui.rs index a5b3a8b2f..e8280a796 100644 --- a/beacon_node/http_api/src/ui.rs +++ b/beacon_node/http_api/src/ui.rs @@ -1,5 +1,7 @@ -use beacon_chain::{metrics, BeaconChain, BeaconChainError, BeaconChainTypes}; -use eth2::types::ValidatorStatus; +use beacon_chain::{ + validator_monitor::HISTORIC_EPOCHS, BeaconChain, BeaconChainError, BeaconChainTypes, +}; +use eth2::types::{Epoch, ValidatorStatus}; use serde::{Deserialize, Serialize}; use std::collections::{HashMap, HashSet}; use std::sync::Arc; @@ -71,6 +73,82 @@ pub fn get_validator_count( }) } +#[derive(PartialEq, Serialize, Deserialize)] +pub struct ValidatorInfoRequestData { + #[serde(with = "eth2_serde_utils::quoted_u64_vec")] + indices: Vec, +} + +#[derive(PartialEq, Serialize, Deserialize)] +pub struct ValidatorInfoValues { + #[serde(with = "eth2_serde_utils::quoted_u64")] + epoch: u64, + #[serde(with = "eth2_serde_utils::quoted_u64")] + total_balance: u64, +} + +#[derive(PartialEq, Serialize, Deserialize)] +pub struct ValidatorInfo { + info: Vec, +} + +#[derive(PartialEq, Serialize, Deserialize)] +pub struct ValidatorInfoResponse { + validators: HashMap, +} + +pub fn get_validator_info( + request_data: ValidatorInfoRequestData, + chain: Arc>, +) -> Result { + let current_epoch = chain.epoch().map_err(beacon_chain_error)?; + + let epochs = current_epoch.saturating_sub(HISTORIC_EPOCHS).as_u64()..=current_epoch.as_u64(); + + let validator_ids = chain + .validator_monitor + .read() + .get_all_monitored_validators() + .iter() + .cloned() + .collect::>(); + + let indices = request_data + .indices + .iter() + .map(|index| index.to_string()) + .collect::>(); + + let ids = validator_ids + .intersection(&indices) + .collect::>(); + + let mut validators = HashMap::new(); + + for id in ids { + if let Ok(index) = id.parse::() { + if let Some(validator) = chain + .validator_monitor + .read() + .get_monitored_validator(index) + { + let mut info = vec![]; + for epoch in epochs.clone() { + if let Some(total_balance) = validator.get_total_balance(Epoch::new(epoch)) { + info.push(ValidatorInfoValues { + epoch, + total_balance, + }); + } + } + validators.insert(id.clone(), ValidatorInfo { info }); + } + } + } + + Ok(ValidatorInfoResponse { validators }) +} + #[derive(PartialEq, Serialize, Deserialize)] pub struct ValidatorMetricsRequestData { indices: Vec, @@ -119,76 +197,56 @@ pub fn post_validator_monitor_metrics( let mut validators = HashMap::new(); for id in ids { - let attestation_hits = metrics::get_int_counter( - &metrics::VALIDATOR_MONITOR_PREV_EPOCH_ON_CHAIN_ATTESTER_HIT, - &[id], - ) - .map(|counter| counter.get()) - .unwrap_or(0); - let attestation_misses = metrics::get_int_counter( - &metrics::VALIDATOR_MONITOR_PREV_EPOCH_ON_CHAIN_ATTESTER_MISS, - &[id], - ) - .map(|counter| counter.get()) - .unwrap_or(0); - let attestations = attestation_hits + attestation_misses; - let attestation_hit_percentage: f64 = if attestations == 0 { - 0.0 - } else { - (100 * attestation_hits / attestations) as f64 - }; + if let Ok(index) = id.parse::() { + if let Some(validator) = chain + .validator_monitor + .read() + .get_monitored_validator(index) + { + let val_metrics = validator.metrics.read(); + let attestation_hits = val_metrics.attestation_hits; + let attestation_misses = val_metrics.attestation_misses; + let attestation_head_hits = val_metrics.attestation_head_hits; + let attestation_head_misses = val_metrics.attestation_head_misses; + let attestation_target_hits = val_metrics.attestation_target_hits; + let attestation_target_misses = val_metrics.attestation_target_misses; + drop(val_metrics); - let attestation_head_hits = metrics::get_int_counter( - &metrics::VALIDATOR_MONITOR_PREV_EPOCH_ON_CHAIN_HEAD_ATTESTER_HIT, - &[id], - ) - .map(|counter| counter.get()) - .unwrap_or(0); - let attestation_head_misses = metrics::get_int_counter( - &metrics::VALIDATOR_MONITOR_PREV_EPOCH_ON_CHAIN_HEAD_ATTESTER_MISS, - &[id], - ) - .map(|counter| counter.get()) - .unwrap_or(0); - let head_attestations = attestation_head_hits + attestation_head_misses; - let attestation_head_hit_percentage: f64 = if head_attestations == 0 { - 0.0 - } else { - (100 * attestation_head_hits / head_attestations) as f64 - }; + let attestations = attestation_hits + attestation_misses; + let attestation_hit_percentage: f64 = if attestations == 0 { + 0.0 + } else { + (100 * attestation_hits / attestations) as f64 + }; + let head_attestations = attestation_head_hits + attestation_head_misses; + let attestation_head_hit_percentage: f64 = if head_attestations == 0 { + 0.0 + } else { + (100 * attestation_head_hits / head_attestations) as f64 + }; - let attestation_target_hits = metrics::get_int_counter( - &metrics::VALIDATOR_MONITOR_PREV_EPOCH_ON_CHAIN_TARGET_ATTESTER_HIT, - &[id], - ) - .map(|counter| counter.get()) - .unwrap_or(0); - let attestation_target_misses = metrics::get_int_counter( - &metrics::VALIDATOR_MONITOR_PREV_EPOCH_ON_CHAIN_TARGET_ATTESTER_MISS, - &[id], - ) - .map(|counter| counter.get()) - .unwrap_or(0); - let target_attestations = attestation_target_hits + attestation_target_misses; - let attestation_target_hit_percentage: f64 = if target_attestations == 0 { - 0.0 - } else { - (100 * attestation_target_hits / target_attestations) as f64 - }; + let target_attestations = attestation_target_hits + attestation_target_misses; + let attestation_target_hit_percentage: f64 = if target_attestations == 0 { + 0.0 + } else { + (100 * attestation_target_hits / target_attestations) as f64 + }; - let metrics = ValidatorMetrics { - attestation_hits, - attestation_misses, - attestation_hit_percentage, - attestation_head_hits, - attestation_head_misses, - attestation_head_hit_percentage, - attestation_target_hits, - attestation_target_misses, - attestation_target_hit_percentage, - }; + let metrics = ValidatorMetrics { + attestation_hits, + attestation_misses, + attestation_hit_percentage, + attestation_head_hits, + attestation_head_misses, + attestation_head_hit_percentage, + attestation_target_hits, + attestation_target_misses, + attestation_target_hit_percentage, + }; - validators.insert(id.clone(), metrics); + validators.insert(id.clone(), metrics); + } + } } Ok(ValidatorMetricsResponse { validators })