diff --git a/Cargo.lock b/Cargo.lock
index cb51cac30..611cdf57a 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2442,6 +2442,7 @@ dependencies = [
  "lighthouse_network",
  "lighthouse_version",
  "network",
+ "parking_lot",
  "safe_arith",
  "sensitive_url",
  "serde",
diff --git a/beacon_node/http_api/Cargo.toml b/beacon_node/http_api/Cargo.toml
index 62373b464..315dbb9e5 100644
--- a/beacon_node/http_api/Cargo.toml
+++ b/beacon_node/http_api/Cargo.toml
@@ -27,8 +27,10 @@ slot_clock = { path = "../../common/slot_clock" }
 eth2_ssz = "0.4.1"
 bs58 = "0.4.0"
 futures = "0.3.8"
+parking_lot = "0.11.0"
 safe_arith = {path = "../../consensus/safe_arith"}
+
 [dev-dependencies]
 store = { path = "../store" }
 environment = { path = "../../lighthouse/environment" }
diff --git a/beacon_node/http_api/src/block_packing_efficiency.rs b/beacon_node/http_api/src/block_packing_efficiency.rs
new file mode 100644
index 000000000..d948c0d7d
--- /dev/null
+++ b/beacon_node/http_api/src/block_packing_efficiency.rs
@@ -0,0 +1,382 @@
+use beacon_chain::{BeaconChain, BeaconChainError, BeaconChainTypes};
+use eth2::lighthouse::{
+    BlockPackingEfficiency, BlockPackingEfficiencyQuery, ProposerInfo, UniqueAttestation,
+};
+use parking_lot::Mutex;
+use state_processing::{
+    per_epoch_processing::EpochProcessingSummary, BlockReplayError, BlockReplayer,
+};
+use std::collections::{HashMap, HashSet};
+use std::marker::PhantomData;
+use std::sync::Arc;
+use types::{
+    BeaconCommittee, BeaconState, BeaconStateError, ChainSpec, Epoch, EthSpec, Hash256,
+    OwnedBeaconCommittee, RelativeEpoch, SignedBeaconBlock, Slot,
+};
+use warp_utils::reject::{beacon_chain_error, custom_bad_request, custom_server_error};
+
+/// Load blocks from block roots in chunks to reduce load on memory.
+const BLOCK_ROOT_CHUNK_SIZE: usize = 100;
+
+#[derive(Debug)]
+enum PackingEfficiencyError {
+    BlockReplay(BlockReplayError),
+    BeaconState(BeaconStateError),
+    CommitteeStoreError(Slot),
+    InvalidAttestationError,
+}
+
+impl From<BlockReplayError> for PackingEfficiencyError {
+    fn from(e: BlockReplayError) -> Self {
+        Self::BlockReplay(e)
+    }
+}
+
+impl From<BeaconStateError> for PackingEfficiencyError {
+    fn from(e: BeaconStateError) -> Self {
+        Self::BeaconState(e)
+    }
+}
+
+struct CommitteeStore {
+    current_epoch_committees: Vec<OwnedBeaconCommittee>,
+    previous_epoch_committees: Vec<OwnedBeaconCommittee>,
+}
+
+impl CommitteeStore {
+    fn new() -> Self {
+        CommitteeStore {
+            current_epoch_committees: Vec::new(),
+            previous_epoch_committees: Vec::new(),
+        }
+    }
+}
+
+struct PackingEfficiencyHandler<T: EthSpec> {
+    current_slot: Slot,
+    current_epoch: Epoch,
+    prior_skip_slots: u64,
+    available_attestations: HashSet<UniqueAttestation>,
+    included_attestations: HashMap<UniqueAttestation, u64>,
+    committee_store: CommitteeStore,
+    _phantom: PhantomData<T>,
+}
+
+impl<T: EthSpec> PackingEfficiencyHandler<T> {
+    fn new(
+        start_epoch: Epoch,
+        starting_state: BeaconState<T>,
+        spec: &ChainSpec,
+    ) -> Result<Self, PackingEfficiencyError> {
+        let mut handler = PackingEfficiencyHandler {
+            current_slot: start_epoch.start_slot(T::slots_per_epoch()),
+            current_epoch: start_epoch,
+            prior_skip_slots: 0,
+            available_attestations: HashSet::new(),
+            included_attestations: HashMap::new(),
+            committee_store: CommitteeStore::new(),
+            _phantom: PhantomData::default(),
+        };
+
+        handler.compute_epoch(start_epoch, &starting_state, spec)?;
+        Ok(handler)
+    }
+
+    fn update_slot(&mut self, slot: Slot) {
+        self.current_slot = slot;
+        if slot % T::slots_per_epoch() == 0 {
+            self.current_epoch = Epoch::new(slot.as_u64() / T::slots_per_epoch());
+        }
+    }
+
+    fn prune_included_attestations(&mut self) {
+        let epoch = self.current_epoch;
+        self.included_attestations.retain(|x, _| {
+            x.slot >= Epoch::new(epoch.as_u64().saturating_sub(2)).start_slot(T::slots_per_epoch())
+        });
+    }
+
+    fn prune_available_attestations(&mut self) {
+        let slot = self.current_slot;
+        self.available_attestations
+            .retain(|x| x.slot >= (slot.as_u64().saturating_sub(T::slots_per_epoch())));
+    }
+
+    fn apply_block(
+        &mut self,
+        block: &SignedBeaconBlock<T>,
+    ) -> Result<usize, PackingEfficiencyError> {
+        let block_body = block.message().body();
+        let attestations = block_body.attestations();
+
+        let mut attestations_in_block = HashMap::new();
+        for attestation in attestations.iter() {
+            for (position, voted) in attestation.aggregation_bits.iter().enumerate() {
+                if voted {
+                    let unique_attestation = UniqueAttestation {
+                        slot: attestation.data.slot,
+                        committee_index: attestation.data.index,
+                        committee_position: position,
+                    };
+                    let inclusion_distance: u64 = block
+                        .slot()
+                        .as_u64()
+                        .checked_sub(attestation.data.slot.as_u64())
+                        .ok_or(PackingEfficiencyError::InvalidAttestationError)?;
+
+                    self.available_attestations.remove(&unique_attestation);
+                    attestations_in_block.insert(unique_attestation, inclusion_distance);
+                }
+            }
+        }
+
+        // Remove duplicate attestations as these yield no reward.
+        attestations_in_block.retain(|x, _| self.included_attestations.get(x).is_none());
+        self.included_attestations
+            .extend(attestations_in_block.clone());
+
+        Ok(attestations_in_block.len())
+    }
+
+    fn add_attestations(&mut self, slot: Slot) -> Result<(), PackingEfficiencyError> {
+        let committees = self.get_committees_at_slot(slot)?;
+        for committee in committees {
+            for position in 0..committee.committee.len() {
+                let unique_attestation = UniqueAttestation {
+                    slot,
+                    committee_index: committee.index,
+                    committee_position: position,
+                };
+                self.available_attestations.insert(unique_attestation);
+            }
+        }
+
+        Ok(())
+    }
+
+    fn compute_epoch(
+        &mut self,
+        epoch: Epoch,
+        state: &BeaconState<T>,
+        spec: &ChainSpec,
+    ) -> Result<(), PackingEfficiencyError> {
+        // Free some memory by pruning old attestations from the included set.
+        self.prune_included_attestations();
+
+        let new_committees = if state.committee_cache_is_initialized(RelativeEpoch::Current) {
+            state
+                .get_beacon_committees_at_epoch(RelativeEpoch::Current)?
+                .into_iter()
+                .map(BeaconCommittee::into_owned)
+                .collect::<Vec<_>>()
+        } else {
+            state
+                .initialize_committee_cache(epoch, spec)?
+                .get_all_beacon_committees()?
+                .into_iter()
+                .map(BeaconCommittee::into_owned)
+                .collect::<Vec<_>>()
+        };
+
+        self.committee_store.previous_epoch_committees =
+            self.committee_store.current_epoch_committees.clone();
+
+        self.committee_store.current_epoch_committees = new_committees;
+
+        Ok(())
+    }
+
+    fn get_committees_at_slot(
+        &self,
+        slot: Slot,
+    ) -> Result<Vec<OwnedBeaconCommittee>, PackingEfficiencyError> {
+        let mut committees = Vec::new();
+
+        for committee in &self.committee_store.current_epoch_committees {
+            if committee.slot == slot {
+                committees.push(committee.clone());
+            }
+        }
+        for committee in &self.committee_store.previous_epoch_committees {
+            if committee.slot == slot {
+                committees.push(committee.clone());
+            }
+        }
+
+        if committees.is_empty() {
+            return Err(PackingEfficiencyError::CommitteeStoreError(slot));
+        }
+
+        Ok(committees)
+    }
+}
+
+pub fn get_block_packing_efficiency<T: BeaconChainTypes>(
+    query: BlockPackingEfficiencyQuery,
+    chain: Arc<BeaconChain<T>>,
+) -> Result<Vec<BlockPackingEfficiency>, warp::Rejection> {
+    let spec = &chain.spec;
+
+    let start_epoch = query.start_epoch;
+    let start_slot = start_epoch.start_slot(T::EthSpec::slots_per_epoch());
+    let prior_slot = start_slot - 1;
+
+    let end_epoch = query.end_epoch;
+    let end_slot = end_epoch.end_slot(T::EthSpec::slots_per_epoch());
+
+    // Check query is valid.
+    if start_epoch > end_epoch || start_epoch == 0 {
+        return Err(custom_bad_request(format!(
+            "invalid start and end epochs: {}, {}",
+            start_epoch, end_epoch
+        )));
+    }
+
+    let prior_epoch = start_epoch - 1;
+    let start_slot_of_prior_epoch = prior_epoch.start_slot(T::EthSpec::slots_per_epoch());
+
+    // Load block roots.
+    let mut block_roots: Vec<Hash256> = chain
+        .forwards_iter_block_roots_until(start_slot_of_prior_epoch, end_slot)
+        .map_err(beacon_chain_error)?
+        .collect::<Result<Vec<_>, _>>()
+        .map_err(beacon_chain_error)?
+        .iter()
+        .map(|(root, _)| *root)
+        .collect();
+    block_roots.dedup();
+
+    let first_block_root = block_roots
+        .first()
+        .ok_or_else(|| custom_server_error("no blocks were loaded".to_string()))?;
+
+    let first_block = chain
+        .get_block(first_block_root)
+        .and_then(|maybe_block| {
+            maybe_block.ok_or(BeaconChainError::MissingBeaconBlock(*first_block_root))
+        })
+        .map_err(beacon_chain_error)?;
+
+    // Load state for block replay.
+    let starting_state_root = first_block.state_root();
+
+    let starting_state = chain
+        .get_state(&starting_state_root, Some(prior_slot))
+        .and_then(|maybe_state| {
+            maybe_state.ok_or(BeaconChainError::MissingBeaconState(starting_state_root))
+        })
+        .map_err(beacon_chain_error)?;
+
+    // Initialize response vector.
+    let mut response = Vec::new();
+
+    // Initialize handler.
+    let handler = Arc::new(Mutex::new(
+        PackingEfficiencyHandler::new(prior_epoch, starting_state.clone(), spec)
+            .map_err(|e| custom_server_error(format!("{:?}", e)))?,
+    ));
+
+    let pre_slot_hook =
+        |state: &mut BeaconState<T::EthSpec>| -> Result<(), PackingEfficiencyError> {
+            // Add attestations to `available_attestations`.
+            handler.lock().add_attestations(state.slot())?;
+            Ok(())
+        };
+
+    let post_slot_hook = |state: &mut BeaconState<T::EthSpec>,
+                          _summary: Option<EpochProcessingSummary<T::EthSpec>>,
+                          is_skip_slot: bool|
+     -> Result<(), PackingEfficiencyError> {
+        handler.lock().update_slot(state.slot());
+
+        // Check if this a new epoch.
+        if state.slot() % T::EthSpec::slots_per_epoch() == 0 {
+            handler.lock().compute_epoch(
+                state.slot().epoch(T::EthSpec::slots_per_epoch()),
+                state,
+                spec,
+            )?;
+        }
+
+        if is_skip_slot {
+            handler.lock().prior_skip_slots += 1;
+        }
+
+        // Remove expired attestations.
+        handler.lock().prune_available_attestations();
+
+        Ok(())
+    };
+
+    let pre_block_hook = |_state: &mut BeaconState<T::EthSpec>,
+                          block: &SignedBeaconBlock<T::EthSpec>|
+     -> Result<(), PackingEfficiencyError> {
+        let slot = block.slot();
+
+        let block_message = block.message();
+        // Get block proposer info.
+        let proposer_info = ProposerInfo {
+            validator_index: block_message.proposer_index(),
+            graffiti: block_message.body().graffiti().as_utf8_lossy(),
+        };
+
+        // Store the count of available attestations at this point.
+        // In the future it may be desirable to check that the number of available attestations
+        // does not exceed the maximum possible amount given the length of available committees.
+        let available_count = handler.lock().available_attestations.len();
+
+        // Get all attestations included in the block.
+        let included = handler.lock().apply_block(block)?;
+
+        let efficiency = BlockPackingEfficiency {
+            slot,
+            block_hash: block.canonical_root(),
+            proposer_info,
+            available_attestations: available_count,
+            included_attestations: included,
+            prior_skip_slots: handler.lock().prior_skip_slots,
+        };
+
+        // Write to response.
+        if slot >= start_slot {
+            response.push(efficiency);
+        }
+
+        handler.lock().prior_skip_slots = 0;
+
+        Ok(())
+    };
+
+    // Build BlockReplayer.
+    let mut replayer = BlockReplayer::new(starting_state, spec)
+        .no_state_root_iter()
+        .no_signature_verification()
+        .minimal_block_root_verification()
+        .pre_slot_hook(Box::new(pre_slot_hook))
+        .post_slot_hook(Box::new(post_slot_hook))
+        .pre_block_hook(Box::new(pre_block_hook));
+
+    // Iterate through the block roots, loading blocks in chunks to reduce load on memory.
+    for block_root_chunks in block_roots.chunks(BLOCK_ROOT_CHUNK_SIZE) {
+        // Load blocks from the block root chunks.
+        let blocks = block_root_chunks
+            .iter()
+            .map(|root| {
+                chain
+                    .get_block(root)
+                    .and_then(|maybe_block| {
+                        maybe_block.ok_or(BeaconChainError::MissingBeaconBlock(*root))
+                    })
+                    .map_err(beacon_chain_error)
+            })
+            .collect::<Result<Vec<SignedBeaconBlock<T::EthSpec>>, _>>()?;
+
+        replayer = replayer
+            .apply_blocks(blocks, None)
+            .map_err(|e: PackingEfficiencyError| custom_server_error(format!("{:?}", e)))?;
+    }
+
+    drop(replayer);
+
+    Ok(response)
+}
diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs
index 5ef845858..dcc6528a9 100644
--- a/beacon_node/http_api/src/lib.rs
+++ b/beacon_node/http_api/src/lib.rs
@@ -8,6 +8,7 @@
 mod attestation_performance;
 mod attester_duties;
 mod block_id;
+mod block_packing_efficiency;
 mod block_rewards;
 mod database;
 mod metrics;
@@ -2615,6 +2616,19 @@ pub fn serve(
         })
     });
 
+    // GET lighthouse/analysis/block_packing_efficiency
+    let get_lighthouse_block_packing_efficiency = warp::path("lighthouse")
+        .and(warp::path("analysis"))
+        .and(warp::path("block_packing_efficiency"))
+        .and(warp::query::<eth2::lighthouse::BlockPackingEfficiencyQuery>())
+        .and(warp::path::end())
+        .and(chain_filter.clone())
+        .and_then(|query, chain: Arc<BeaconChain<T>>| {
+            blocking_json_task(move || {
+                block_packing_efficiency::get_block_packing_efficiency(query, chain)
+            })
+        });
+
     let get_events = eth1_v1
         .and(warp::path("events"))
         .and(warp::path::end())
@@ -2741,6 +2755,7 @@
             .or(get_lighthouse_database_info.boxed())
             .or(get_lighthouse_block_rewards.boxed())
             .or(get_lighthouse_attestation_performance.boxed())
+            .or(get_lighthouse_block_packing_efficiency.boxed())
             .or(get_events.boxed()),
         )
         .or(warp::post().and(
diff --git a/common/eth2/src/lighthouse.rs b/common/eth2/src/lighthouse.rs
index adf73d8b9..a2e4a66c4 100644
--- a/common/eth2/src/lighthouse.rs
+++ b/common/eth2/src/lighthouse.rs
@@ -1,6 +1,7 @@
 //! This module contains endpoints that are non-standard and only available on Lighthouse servers.
 
 mod attestation_performance;
+mod block_packing_efficiency;
 mod block_rewards;
 
 use crate::{
@@ -18,6 +19,9 @@ use store::{AnchorInfo, Split};
 
 pub use attestation_performance::{
     AttestationPerformance, AttestationPerformanceQuery, AttestationPerformanceStatistics,
 };
+pub use block_packing_efficiency::{
+    BlockPackingEfficiency, BlockPackingEfficiencyQuery, ProposerInfo, UniqueAttestation,
+};
 pub use block_rewards::{AttestationRewards, BlockReward, BlockRewardMeta, BlockRewardsQuery};
 
 pub use lighthouse_network::{types::SyncState, PeerInfo};
diff --git a/common/eth2/src/lighthouse/block_packing_efficiency.rs b/common/eth2/src/lighthouse/block_packing_efficiency.rs
new file mode 100644
index 000000000..0ad6f4603
--- /dev/null
+++ b/common/eth2/src/lighthouse/block_packing_efficiency.rs
@@ -0,0 +1,34 @@
+use serde::{Deserialize, Serialize};
+use types::{Epoch, Hash256, Slot};
+
+type CommitteePosition = usize;
+type Committee = u64;
+type ValidatorIndex = u64;
+
+#[derive(Debug, Default, PartialEq, Eq, Hash, Clone, Serialize, Deserialize)]
+pub struct UniqueAttestation {
+    pub slot: Slot,
+    pub committee_index: Committee,
+    pub committee_position: CommitteePosition,
+}
+#[derive(Debug, Default, PartialEq, Clone, Serialize, Deserialize)]
+pub struct ProposerInfo {
+    pub validator_index: ValidatorIndex,
+    pub graffiti: String,
+}
+
+#[derive(Debug, Default, PartialEq, Clone, Serialize, Deserialize)]
+pub struct BlockPackingEfficiency {
+    pub slot: Slot,
+    pub block_hash: Hash256,
+    pub proposer_info: ProposerInfo,
+    pub available_attestations: usize,
+    pub included_attestations: usize,
+    pub prior_skip_slots: u64,
+}
+
+#[derive(Debug, Default, PartialEq, Clone, Serialize, Deserialize)]
+pub struct BlockPackingEfficiencyQuery {
+    pub start_epoch: Epoch,
+    pub end_epoch: Epoch,
+}