diff --git a/beacon_node/http_api/src/attestation_performance.rs b/beacon_node/http_api/src/attestation_performance.rs new file mode 100644 index 000000000..5cd9894ad --- /dev/null +++ b/beacon_node/http_api/src/attestation_performance.rs @@ -0,0 +1,216 @@ +use beacon_chain::{BeaconChain, BeaconChainError, BeaconChainTypes}; +use eth2::lighthouse::{ + AttestationPerformance, AttestationPerformanceQuery, AttestationPerformanceStatistics, +}; +use state_processing::{ + per_epoch_processing::altair::participation_cache::Error as ParticipationCacheError, + per_epoch_processing::EpochProcessingSummary, BlockReplayError, BlockReplayer, +}; +use std::sync::Arc; +use types::{BeaconState, BeaconStateError, EthSpec, Hash256, SignedBeaconBlock}; +use warp_utils::reject::{beacon_chain_error, custom_bad_request, custom_server_error}; + +const MAX_REQUEST_RANGE_EPOCHS: usize = 100; +const BLOCK_ROOT_CHUNK_SIZE: usize = 100; + +#[derive(Debug)] +enum AttestationPerformanceError { + BlockReplay(BlockReplayError), + BeaconState(BeaconStateError), + ParticipationCache(ParticipationCacheError), + UnableToFindValidator(usize), +} + +impl From for AttestationPerformanceError { + fn from(e: BlockReplayError) -> Self { + Self::BlockReplay(e) + } +} + +impl From for AttestationPerformanceError { + fn from(e: BeaconStateError) -> Self { + Self::BeaconState(e) + } +} + +impl From for AttestationPerformanceError { + fn from(e: ParticipationCacheError) -> Self { + Self::ParticipationCache(e) + } +} + +pub fn get_attestation_performance( + target: String, + query: AttestationPerformanceQuery, + chain: Arc>, +) -> Result, warp::Rejection> { + let spec = &chain.spec; + // We increment by 2 here so that when we build the state from the `prior_slot` it is + // still 1 epoch ahead of the first epoch we want to analyse. + // This ensures the `.is_previous_epoch_X` functions on `EpochProcessingSummary` return results + // for the correct epoch. + let start_epoch = query.start_epoch + 2; + let start_slot = start_epoch.start_slot(T::EthSpec::slots_per_epoch()); + let prior_slot = start_slot - 1; + + let end_epoch = query.end_epoch + 2; + let end_slot = end_epoch.end_slot(T::EthSpec::slots_per_epoch()); + + // Ensure end_epoch is smaller than the current epoch - 1. + let current_epoch = chain.epoch().map_err(beacon_chain_error)?; + if query.end_epoch >= current_epoch - 1 { + return Err(custom_bad_request(format!( + "end_epoch must be less than the current epoch - 1. current: {}, end: {}", + current_epoch, query.end_epoch + ))); + } + + // Check query is valid. + if start_epoch > end_epoch { + return Err(custom_bad_request(format!( + "start_epoch must not be larger than end_epoch. start: {}, end: {}", + query.start_epoch, query.end_epoch + ))); + } + + // The response size can grow exceptionally large therefore we should check that the + // query is within permitted bounds to prevent potential OOM errors. + if (end_epoch - start_epoch).as_usize() > MAX_REQUEST_RANGE_EPOCHS { + return Err(custom_bad_request(format!( + "end_epoch must not exceed start_epoch by more than 100 epochs. start: {}, end: {}", + query.start_epoch, query.end_epoch + ))); + } + + // Either use the global validator set, or the specified index. + let index_range = if target.to_lowercase() == "global" { + chain + .with_head(|head| Ok((0..head.beacon_state.validators().len() as u64).collect())) + .map_err(beacon_chain_error)? + } else { + vec![target.parse::().map_err(|_| { + custom_bad_request(format!( + "Invalid validator index: {:?}", + target.to_lowercase() + )) + })?] + }; + + // Load block roots. + let mut block_roots: Vec = chain + .forwards_iter_block_roots_until(start_slot, end_slot) + .map_err(beacon_chain_error)? + .map(|res| res.map(|(root, _)| root)) + .collect::, _>>() + .map_err(beacon_chain_error)?; + block_roots.dedup(); + + // Load first block so we can get its parent. + let first_block_root = block_roots.first().ok_or_else(|| { + custom_server_error( + "No blocks roots could be loaded. Ensure the beacon node is synced.".to_string(), + ) + })?; + let first_block = chain + .get_block(first_block_root) + .and_then(|maybe_block| { + maybe_block.ok_or(BeaconChainError::MissingBeaconBlock(*first_block_root)) + }) + .map_err(beacon_chain_error)?; + + // Load the block of the prior slot which will be used to build the starting state. + let prior_block = chain + .get_block(&first_block.parent_root()) + .and_then(|maybe_block| { + maybe_block + .ok_or_else(|| BeaconChainError::MissingBeaconBlock(first_block.parent_root())) + }) + .map_err(beacon_chain_error)?; + + // Load state for block replay. + let state_root = prior_block.state_root(); + let state = chain + .get_state(&state_root, Some(prior_slot)) + .and_then(|maybe_state| maybe_state.ok_or(BeaconChainError::MissingBeaconState(state_root))) + .map_err(beacon_chain_error)?; + + // Allocate an AttestationPerformance vector for each validator in the range. + let mut perfs: Vec = + AttestationPerformance::initialize(index_range.clone()); + + let post_slot_hook = |state: &mut BeaconState, + summary: Option>, + _is_skip_slot: bool| + -> Result<(), AttestationPerformanceError> { + // If a `summary` was not output then an epoch boundary was not crossed + // so we move onto the next slot. + if let Some(summary) = summary { + for (position, i) in index_range.iter().enumerate() { + let index = *i as usize; + + let val = perfs + .get_mut(position) + .ok_or(AttestationPerformanceError::UnableToFindValidator(index))?; + + // We are two epochs ahead since the summary is generated for + // `state.previous_epoch()` then `summary.is_previous_epoch_X` functions return + // data for the epoch before that. + let epoch = state.previous_epoch().as_u64() - 1; + + let is_active = summary.is_active_unslashed_in_previous_epoch(index); + + let received_source_reward = summary.is_previous_epoch_source_attester(index)?; + + let received_head_reward = summary.is_previous_epoch_head_attester(index)?; + + let received_target_reward = summary.is_previous_epoch_target_attester(index)?; + + let inclusion_delay = summary + .previous_epoch_inclusion_info(index) + .map(|info| info.delay); + + let perf = AttestationPerformanceStatistics { + active: is_active, + head: received_head_reward, + target: received_target_reward, + source: received_source_reward, + delay: inclusion_delay, + }; + + val.epochs.insert(epoch, perf); + } + } + Ok(()) + }; + + // Initialize block replayer + let mut replayer = BlockReplayer::new(state, spec) + .no_state_root_iter() + .no_signature_verification() + .minimal_block_root_verification() + .post_slot_hook(Box::new(post_slot_hook)); + + // Iterate through block roots in chunks to reduce load on memory. + for block_root_chunks in block_roots.chunks(BLOCK_ROOT_CHUNK_SIZE) { + // Load blocks from the block root chunks. + let blocks = block_root_chunks + .iter() + .map(|root| { + chain + .get_block(root) + .and_then(|maybe_block| { + maybe_block.ok_or(BeaconChainError::MissingBeaconBlock(*root)) + }) + .map_err(beacon_chain_error) + }) + .collect::>, _>>()?; + + replayer = replayer + .apply_blocks(blocks, None) + .map_err(|e| custom_server_error(format!("{:?}", e)))?; + } + + drop(replayer); + + Ok(perfs) +} diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index deadf6854..b30af858f 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -5,6 +5,7 @@ //! There are also some additional, non-standard endpoints behind the `/lighthouse/` path which are //! used for development. +mod attestation_performance; mod attester_duties; mod block_id; mod block_rewards; @@ -2541,7 +2542,9 @@ pub fn serve( }, ); + // GET lighthouse/analysis/block_rewards let get_lighthouse_block_rewards = warp::path("lighthouse") + .and(warp::path("analysis")) .and(warp::path("block_rewards")) .and(warp::query::()) .and(warp::path::end()) @@ -2551,6 +2554,20 @@ pub fn serve( blocking_json_task(move || block_rewards::get_block_rewards(query, chain, log)) }); + // GET lighthouse/analysis/attestation_performance/{index} + let get_lighthouse_attestation_performance = warp::path("lighthouse") + .and(warp::path("analysis")) + .and(warp::path("attestation_performance")) + .and(warp::path::param::()) + .and(warp::query::()) + .and(warp::path::end()) + .and(chain_filter.clone()) + .and_then(|target, query, chain: Arc>| { + blocking_json_task(move || { + attestation_performance::get_attestation_performance(target, query, chain) + }) + }); + let get_events = eth1_v1 .and(warp::path("events")) .and(warp::path::end()) @@ -2676,6 +2693,7 @@ pub fn serve( .or(get_lighthouse_staking.boxed()) .or(get_lighthouse_database_info.boxed()) .or(get_lighthouse_block_rewards.boxed()) + .or(get_lighthouse_attestation_performance.boxed()) .or(get_events.boxed()), ) .or(warp::post().and( diff --git a/common/eth2/src/lighthouse.rs b/common/eth2/src/lighthouse.rs index 10601556f..adf73d8b9 100644 --- a/common/eth2/src/lighthouse.rs +++ b/common/eth2/src/lighthouse.rs @@ -1,5 +1,6 @@ //! This module contains endpoints that are non-standard and only available on Lighthouse servers. +mod attestation_performance; mod block_rewards; use crate::{ @@ -14,6 +15,9 @@ use ssz::four_byte_option_impl; use ssz_derive::{Decode, Encode}; use store::{AnchorInfo, Split}; +pub use attestation_performance::{ + AttestationPerformance, AttestationPerformanceQuery, AttestationPerformanceStatistics, +}; pub use block_rewards::{AttestationRewards, BlockReward, BlockRewardMeta, BlockRewardsQuery}; pub use lighthouse_network::{types::SyncState, PeerInfo}; diff --git a/common/eth2/src/lighthouse/attestation_performance.rs b/common/eth2/src/lighthouse/attestation_performance.rs new file mode 100644 index 000000000..5ce1d90a3 --- /dev/null +++ b/common/eth2/src/lighthouse/attestation_performance.rs @@ -0,0 +1,39 @@ +use serde::{Deserialize, Serialize}; +use std::collections::HashMap; +use types::Epoch; + +#[derive(Debug, Default, PartialEq, Clone, Serialize, Deserialize)] +pub struct AttestationPerformanceStatistics { + pub active: bool, + pub head: bool, + pub target: bool, + pub source: bool, + #[serde(skip_serializing_if = "Option::is_none")] + pub delay: Option, +} + +#[derive(Debug, Default, PartialEq, Clone, Serialize, Deserialize)] +pub struct AttestationPerformance { + pub index: u64, + pub epochs: HashMap, +} + +impl AttestationPerformance { + pub fn initialize(indices: Vec) -> Vec { + let mut vec = Vec::with_capacity(indices.len()); + for index in indices { + vec.push(Self { + index, + ..Default::default() + }) + } + vec + } +} + +/// Query parameters for the `/lighthouse/analysis/attestation_performance` endpoint. +#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)] +pub struct AttestationPerformanceQuery { + pub start_epoch: Epoch, + pub end_epoch: Epoch, +}