Add API to compute discrete validator attestation performance (#2874)

## Issue Addressed

N/A

## Proposed Changes

Add a HTTP API which can be used to compute the attestation performances of a validator (or all validators) over a discrete range of epochs.
Performances can be computed for a single validator, or for the global validator set. 

## Usage
### Request
The API can be used as follows:
```
curl "http://localhost:5052/lighthouse/analysis/attestation_performance/{validator_index}?start_epoch=57730&end_epoch=57732"
```
Alternatively, to compute performances for the global validator set:
```
curl "http://localhost:5052/lighthouse/analysis/attestation_performance/global?start_epoch=57730&end_epoch=57732"
```

### Response
The response is JSON formatted as follows:
```
[
  {
    "index": 72,
    "epochs": {
      "57730": {
        "active": true,
        "head": false,
        "target": false,
        "source": false
      },
      "57731": {
        "active": true,
        "head": true,
        "target": true,
        "source": true,
        "delay": 1
      },
      "57732": {
        "active": true,
        "head": true,
        "target": true,
        "source": true,
        "delay": 1
      },
    }
  }
]
```
> Note that the `"epochs"` are not guaranteed to be in ascending order. 

## Additional Info

- This API is intended to be used in our upcoming validator analysis tooling (#2873) and will likely not be very useful for regular users. Some advanced users or block explorers may find this API useful however.
- The request range is limited to 100 epochs (since the range is inclusive and it also computes the `end_epoch` it's actually 101 epochs) to prevent Lighthouse using exceptionally large amounts of memory.
This commit is contained in:
Mac L 2022-01-27 22:58:31 +00:00
parent 782abdcab5
commit e05142b798
4 changed files with 277 additions and 0 deletions

View File

@ -0,0 +1,216 @@
use beacon_chain::{BeaconChain, BeaconChainError, BeaconChainTypes};
use eth2::lighthouse::{
AttestationPerformance, AttestationPerformanceQuery, AttestationPerformanceStatistics,
};
use state_processing::{
per_epoch_processing::altair::participation_cache::Error as ParticipationCacheError,
per_epoch_processing::EpochProcessingSummary, BlockReplayError, BlockReplayer,
};
use std::sync::Arc;
use types::{BeaconState, BeaconStateError, EthSpec, Hash256, SignedBeaconBlock};
use warp_utils::reject::{beacon_chain_error, custom_bad_request, custom_server_error};
const MAX_REQUEST_RANGE_EPOCHS: usize = 100;
const BLOCK_ROOT_CHUNK_SIZE: usize = 100;
#[derive(Debug)]
enum AttestationPerformanceError {
BlockReplay(BlockReplayError),
BeaconState(BeaconStateError),
ParticipationCache(ParticipationCacheError),
UnableToFindValidator(usize),
}
impl From<BlockReplayError> for AttestationPerformanceError {
fn from(e: BlockReplayError) -> Self {
Self::BlockReplay(e)
}
}
impl From<BeaconStateError> for AttestationPerformanceError {
fn from(e: BeaconStateError) -> Self {
Self::BeaconState(e)
}
}
impl From<ParticipationCacheError> for AttestationPerformanceError {
fn from(e: ParticipationCacheError) -> Self {
Self::ParticipationCache(e)
}
}
pub fn get_attestation_performance<T: BeaconChainTypes>(
target: String,
query: AttestationPerformanceQuery,
chain: Arc<BeaconChain<T>>,
) -> Result<Vec<AttestationPerformance>, warp::Rejection> {
let spec = &chain.spec;
// We increment by 2 here so that when we build the state from the `prior_slot` it is
// still 1 epoch ahead of the first epoch we want to analyse.
// This ensures the `.is_previous_epoch_X` functions on `EpochProcessingSummary` return results
// for the correct epoch.
let start_epoch = query.start_epoch + 2;
let start_slot = start_epoch.start_slot(T::EthSpec::slots_per_epoch());
let prior_slot = start_slot - 1;
let end_epoch = query.end_epoch + 2;
let end_slot = end_epoch.end_slot(T::EthSpec::slots_per_epoch());
// Ensure end_epoch is smaller than the current epoch - 1.
let current_epoch = chain.epoch().map_err(beacon_chain_error)?;
if query.end_epoch >= current_epoch - 1 {
return Err(custom_bad_request(format!(
"end_epoch must be less than the current epoch - 1. current: {}, end: {}",
current_epoch, query.end_epoch
)));
}
// Check query is valid.
if start_epoch > end_epoch {
return Err(custom_bad_request(format!(
"start_epoch must not be larger than end_epoch. start: {}, end: {}",
query.start_epoch, query.end_epoch
)));
}
// The response size can grow exceptionally large therefore we should check that the
// query is within permitted bounds to prevent potential OOM errors.
if (end_epoch - start_epoch).as_usize() > MAX_REQUEST_RANGE_EPOCHS {
return Err(custom_bad_request(format!(
"end_epoch must not exceed start_epoch by more than 100 epochs. start: {}, end: {}",
query.start_epoch, query.end_epoch
)));
}
// Either use the global validator set, or the specified index.
let index_range = if target.to_lowercase() == "global" {
chain
.with_head(|head| Ok((0..head.beacon_state.validators().len() as u64).collect()))
.map_err(beacon_chain_error)?
} else {
vec![target.parse::<u64>().map_err(|_| {
custom_bad_request(format!(
"Invalid validator index: {:?}",
target.to_lowercase()
))
})?]
};
// Load block roots.
let mut block_roots: Vec<Hash256> = chain
.forwards_iter_block_roots_until(start_slot, end_slot)
.map_err(beacon_chain_error)?
.map(|res| res.map(|(root, _)| root))
.collect::<Result<Vec<Hash256>, _>>()
.map_err(beacon_chain_error)?;
block_roots.dedup();
// Load first block so we can get its parent.
let first_block_root = block_roots.first().ok_or_else(|| {
custom_server_error(
"No blocks roots could be loaded. Ensure the beacon node is synced.".to_string(),
)
})?;
let first_block = chain
.get_block(first_block_root)
.and_then(|maybe_block| {
maybe_block.ok_or(BeaconChainError::MissingBeaconBlock(*first_block_root))
})
.map_err(beacon_chain_error)?;
// Load the block of the prior slot which will be used to build the starting state.
let prior_block = chain
.get_block(&first_block.parent_root())
.and_then(|maybe_block| {
maybe_block
.ok_or_else(|| BeaconChainError::MissingBeaconBlock(first_block.parent_root()))
})
.map_err(beacon_chain_error)?;
// Load state for block replay.
let state_root = prior_block.state_root();
let state = chain
.get_state(&state_root, Some(prior_slot))
.and_then(|maybe_state| maybe_state.ok_or(BeaconChainError::MissingBeaconState(state_root)))
.map_err(beacon_chain_error)?;
// Allocate an AttestationPerformance vector for each validator in the range.
let mut perfs: Vec<AttestationPerformance> =
AttestationPerformance::initialize(index_range.clone());
let post_slot_hook = |state: &mut BeaconState<T::EthSpec>,
summary: Option<EpochProcessingSummary<T::EthSpec>>,
_is_skip_slot: bool|
-> Result<(), AttestationPerformanceError> {
// If a `summary` was not output then an epoch boundary was not crossed
// so we move onto the next slot.
if let Some(summary) = summary {
for (position, i) in index_range.iter().enumerate() {
let index = *i as usize;
let val = perfs
.get_mut(position)
.ok_or(AttestationPerformanceError::UnableToFindValidator(index))?;
// We are two epochs ahead since the summary is generated for
// `state.previous_epoch()` then `summary.is_previous_epoch_X` functions return
// data for the epoch before that.
let epoch = state.previous_epoch().as_u64() - 1;
let is_active = summary.is_active_unslashed_in_previous_epoch(index);
let received_source_reward = summary.is_previous_epoch_source_attester(index)?;
let received_head_reward = summary.is_previous_epoch_head_attester(index)?;
let received_target_reward = summary.is_previous_epoch_target_attester(index)?;
let inclusion_delay = summary
.previous_epoch_inclusion_info(index)
.map(|info| info.delay);
let perf = AttestationPerformanceStatistics {
active: is_active,
head: received_head_reward,
target: received_target_reward,
source: received_source_reward,
delay: inclusion_delay,
};
val.epochs.insert(epoch, perf);
}
}
Ok(())
};
// Initialize block replayer
let mut replayer = BlockReplayer::new(state, spec)
.no_state_root_iter()
.no_signature_verification()
.minimal_block_root_verification()
.post_slot_hook(Box::new(post_slot_hook));
// Iterate through block roots in chunks to reduce load on memory.
for block_root_chunks in block_roots.chunks(BLOCK_ROOT_CHUNK_SIZE) {
// Load blocks from the block root chunks.
let blocks = block_root_chunks
.iter()
.map(|root| {
chain
.get_block(root)
.and_then(|maybe_block| {
maybe_block.ok_or(BeaconChainError::MissingBeaconBlock(*root))
})
.map_err(beacon_chain_error)
})
.collect::<Result<Vec<SignedBeaconBlock<T::EthSpec>>, _>>()?;
replayer = replayer
.apply_blocks(blocks, None)
.map_err(|e| custom_server_error(format!("{:?}", e)))?;
}
drop(replayer);
Ok(perfs)
}

View File

@ -5,6 +5,7 @@
//! There are also some additional, non-standard endpoints behind the `/lighthouse/` path which are //! There are also some additional, non-standard endpoints behind the `/lighthouse/` path which are
//! used for development. //! used for development.
mod attestation_performance;
mod attester_duties; mod attester_duties;
mod block_id; mod block_id;
mod block_rewards; mod block_rewards;
@ -2541,7 +2542,9 @@ pub fn serve<T: BeaconChainTypes>(
}, },
); );
// GET lighthouse/analysis/block_rewards
let get_lighthouse_block_rewards = warp::path("lighthouse") let get_lighthouse_block_rewards = warp::path("lighthouse")
.and(warp::path("analysis"))
.and(warp::path("block_rewards")) .and(warp::path("block_rewards"))
.and(warp::query::<eth2::lighthouse::BlockRewardsQuery>()) .and(warp::query::<eth2::lighthouse::BlockRewardsQuery>())
.and(warp::path::end()) .and(warp::path::end())
@ -2551,6 +2554,20 @@ pub fn serve<T: BeaconChainTypes>(
blocking_json_task(move || block_rewards::get_block_rewards(query, chain, log)) blocking_json_task(move || block_rewards::get_block_rewards(query, chain, log))
}); });
// GET lighthouse/analysis/attestation_performance/{index}
let get_lighthouse_attestation_performance = warp::path("lighthouse")
.and(warp::path("analysis"))
.and(warp::path("attestation_performance"))
.and(warp::path::param::<String>())
.and(warp::query::<eth2::lighthouse::AttestationPerformanceQuery>())
.and(warp::path::end())
.and(chain_filter.clone())
.and_then(|target, query, chain: Arc<BeaconChain<T>>| {
blocking_json_task(move || {
attestation_performance::get_attestation_performance(target, query, chain)
})
});
let get_events = eth1_v1 let get_events = eth1_v1
.and(warp::path("events")) .and(warp::path("events"))
.and(warp::path::end()) .and(warp::path::end())
@ -2676,6 +2693,7 @@ pub fn serve<T: BeaconChainTypes>(
.or(get_lighthouse_staking.boxed()) .or(get_lighthouse_staking.boxed())
.or(get_lighthouse_database_info.boxed()) .or(get_lighthouse_database_info.boxed())
.or(get_lighthouse_block_rewards.boxed()) .or(get_lighthouse_block_rewards.boxed())
.or(get_lighthouse_attestation_performance.boxed())
.or(get_events.boxed()), .or(get_events.boxed()),
) )
.or(warp::post().and( .or(warp::post().and(

View File

@ -1,5 +1,6 @@
//! This module contains endpoints that are non-standard and only available on Lighthouse servers. //! This module contains endpoints that are non-standard and only available on Lighthouse servers.
mod attestation_performance;
mod block_rewards; mod block_rewards;
use crate::{ use crate::{
@ -14,6 +15,9 @@ use ssz::four_byte_option_impl;
use ssz_derive::{Decode, Encode}; use ssz_derive::{Decode, Encode};
use store::{AnchorInfo, Split}; use store::{AnchorInfo, Split};
pub use attestation_performance::{
AttestationPerformance, AttestationPerformanceQuery, AttestationPerformanceStatistics,
};
pub use block_rewards::{AttestationRewards, BlockReward, BlockRewardMeta, BlockRewardsQuery}; pub use block_rewards::{AttestationRewards, BlockReward, BlockRewardMeta, BlockRewardsQuery};
pub use lighthouse_network::{types::SyncState, PeerInfo}; pub use lighthouse_network::{types::SyncState, PeerInfo};

View File

@ -0,0 +1,39 @@
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use types::Epoch;
#[derive(Debug, Default, PartialEq, Clone, Serialize, Deserialize)]
pub struct AttestationPerformanceStatistics {
pub active: bool,
pub head: bool,
pub target: bool,
pub source: bool,
#[serde(skip_serializing_if = "Option::is_none")]
pub delay: Option<u64>,
}
#[derive(Debug, Default, PartialEq, Clone, Serialize, Deserialize)]
pub struct AttestationPerformance {
pub index: u64,
pub epochs: HashMap<u64, AttestationPerformanceStatistics>,
}
impl AttestationPerformance {
pub fn initialize(indices: Vec<u64>) -> Vec<Self> {
let mut vec = Vec::with_capacity(indices.len());
for index in indices {
vec.push(Self {
index,
..Default::default()
})
}
vec
}
}
/// Query parameters for the `/lighthouse/analysis/attestation_performance` endpoint.
#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
pub struct AttestationPerformanceQuery {
pub start_epoch: Epoch,
pub end_epoch: Epoch,
}