Implement POST validators/validator_balances APIs (#4872)

* Add POST for fetching validators from state

* Implement POST for balances

* Tests
This commit is contained in:
Michael Sproul 2023-12-08 12:09:36 +11:00 committed by GitHub
parent e02adbf7bf
commit b882519d2f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 305 additions and 108 deletions

View File

@ -26,6 +26,7 @@ pub mod test_utils;
mod ui;
mod validator;
mod validator_inclusion;
mod validators;
mod version;
use crate::produce_block::{produce_blinded_block_v2, produce_block_v2, produce_block_v3};
@ -41,7 +42,8 @@ use bytes::Bytes;
use directory::DEFAULT_ROOT_DIR;
use eth2::types::{
self as api_types, BroadcastValidation, EndpointVersion, ForkChoice, ForkChoiceNode,
PublishBlockRequest, ValidatorId, ValidatorStatus,
PublishBlockRequest, ValidatorBalancesRequestBody, ValidatorId, ValidatorStatus,
ValidatorsRequestBody,
};
use lighthouse_network::{types::SyncState, EnrExt, NetworkGlobals, PeerId, PubsubMessage};
use lighthouse_version::version_with_platform;
@ -663,47 +665,32 @@ pub fn serve<T: BeaconChainTypes>(
query_res: Result<api_types::ValidatorBalancesQuery, warp::Rejection>| {
task_spawner.blocking_json_task(Priority::P1, move || {
let query = query_res?;
let (data, execution_optimistic, finalized) = state_id
.map_state_and_execution_optimistic_and_finalized(
&chain,
|state, execution_optimistic, finalized| {
Ok((
state
.validators()
.iter()
.zip(state.balances().iter())
.enumerate()
// filter by validator id(s) if provided
.filter(|(index, (validator, _))| {
query.id.as_ref().map_or(true, |ids| {
ids.iter().any(|id| match id {
ValidatorId::PublicKey(pubkey) => {
&validator.pubkey == pubkey
}
ValidatorId::Index(param_index) => {
*param_index == *index as u64
}
crate::validators::get_beacon_state_validator_balances(
state_id,
chain,
query.id.as_deref(),
)
})
})
})
.map(|(index, (_, balance))| {
Some(api_types::ValidatorBalanceData {
index: index as u64,
balance: *balance,
})
})
.collect::<Vec<_>>(),
execution_optimistic,
finalized,
))
},
)?;
);
Ok(api_types::ExecutionOptimisticFinalizedResponse {
data,
execution_optimistic: Some(execution_optimistic),
finalized: Some(finalized),
})
// POST beacon/states/{state_id}/validator_balances
let post_beacon_state_validator_balances = beacon_states_path
.clone()
.and(warp::path("validator_balances"))
.and(warp::path::end())
.and(warp::body::json())
.then(
|state_id: StateId,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>,
query: ValidatorBalancesRequestBody| {
task_spawner.blocking_json_task(Priority::P1, move || {
crate::validators::get_beacon_state_validator_balances(
state_id,
chain,
Some(&query.ids),
)
})
},
);
@ -721,69 +708,34 @@ pub fn serve<T: BeaconChainTypes>(
query_res: Result<api_types::ValidatorsQuery, warp::Rejection>| {
task_spawner.blocking_json_task(Priority::P1, move || {
let query = query_res?;
let (data, execution_optimistic, finalized) = state_id
.map_state_and_execution_optimistic_and_finalized(
&chain,
|state, execution_optimistic, finalized| {
let epoch = state.current_epoch();
let far_future_epoch = chain.spec.far_future_epoch;
Ok((
state
.validators()
.iter()
.zip(state.balances().iter())
.enumerate()
// filter by validator id(s) if provided
.filter(|(index, (validator, _))| {
query.id.as_ref().map_or(true, |ids| {
ids.iter().any(|id| match id {
ValidatorId::PublicKey(pubkey) => {
&validator.pubkey == pubkey
}
ValidatorId::Index(param_index) => {
*param_index == *index as u64
}
crate::validators::get_beacon_state_validators(
state_id,
chain,
&query.id,
&query.status,
)
})
})
})
// filter by status(es) if provided and map the result
.filter_map(|(index, (validator, balance))| {
let status = api_types::ValidatorStatus::from_validator(
validator,
epoch,
far_future_epoch,
},
);
let status_matches =
query.status.as_ref().map_or(true, |statuses| {
statuses.contains(&status)
|| statuses.contains(&status.superstatus())
});
if status_matches {
Some(api_types::ValidatorData {
index: index as u64,
balance: *balance,
status,
validator: validator.clone(),
})
} else {
None
}
})
.collect::<Vec<_>>(),
execution_optimistic,
finalized,
))
},
)?;
Ok(api_types::ExecutionOptimisticFinalizedResponse {
data,
execution_optimistic: Some(execution_optimistic),
finalized: Some(finalized),
})
// POST beacon/states/{state_id}/validators
let post_beacon_state_validators = beacon_states_path
.clone()
.and(warp::path("validators"))
.and(warp::path::end())
.and(warp::body::json())
.then(
|state_id: StateId,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>,
query: ValidatorsRequestBody| {
task_spawner.blocking_json_task(Priority::P1, move || {
crate::validators::get_beacon_state_validators(
state_id,
chain,
&query.ids,
&query.statuses,
)
})
},
);
@ -4709,6 +4661,8 @@ pub fn serve<T: BeaconChainTypes>(
.uor(post_beacon_pool_voluntary_exits)
.uor(post_beacon_pool_sync_committees)
.uor(post_beacon_pool_bls_to_execution_changes)
.uor(post_beacon_state_validators)
.uor(post_beacon_state_validator_balances)
.uor(post_beacon_rewards_attestations)
.uor(post_beacon_rewards_sync_committee)
.uor(post_validator_duties_attester)

View File

@ -0,0 +1,119 @@
use crate::state_id::StateId;
use beacon_chain::{BeaconChain, BeaconChainTypes};
use eth2::types::{
self as api_types, ExecutionOptimisticFinalizedResponse, ValidatorBalanceData, ValidatorData,
ValidatorId, ValidatorStatus,
};
use std::sync::Arc;
pub fn get_beacon_state_validators<T: BeaconChainTypes>(
state_id: StateId,
chain: Arc<BeaconChain<T>>,
query_ids: &Option<Vec<ValidatorId>>,
query_statuses: &Option<Vec<ValidatorStatus>>,
) -> Result<ExecutionOptimisticFinalizedResponse<Vec<ValidatorData>>, warp::Rejection> {
let (data, execution_optimistic, finalized) = state_id
.map_state_and_execution_optimistic_and_finalized(
&chain,
|state, execution_optimistic, finalized| {
let epoch = state.current_epoch();
let far_future_epoch = chain.spec.far_future_epoch;
Ok((
state
.validators()
.iter()
.zip(state.balances().iter())
.enumerate()
// filter by validator id(s) if provided
.filter(|(index, (validator, _))| {
query_ids.as_ref().map_or(true, |ids| {
ids.iter().any(|id| match id {
ValidatorId::PublicKey(pubkey) => &validator.pubkey == pubkey,
ValidatorId::Index(param_index) => {
*param_index == *index as u64
}
})
})
})
// filter by status(es) if provided and map the result
.filter_map(|(index, (validator, balance))| {
let status = api_types::ValidatorStatus::from_validator(
validator,
epoch,
far_future_epoch,
);
let status_matches = query_statuses.as_ref().map_or(true, |statuses| {
statuses.contains(&status)
|| statuses.contains(&status.superstatus())
});
if status_matches {
Some(ValidatorData {
index: index as u64,
balance: *balance,
status,
validator: validator.clone(),
})
} else {
None
}
})
.collect::<Vec<_>>(),
execution_optimistic,
finalized,
))
},
)?;
Ok(ExecutionOptimisticFinalizedResponse {
data,
execution_optimistic: Some(execution_optimistic),
finalized: Some(finalized),
})
}
pub fn get_beacon_state_validator_balances<T: BeaconChainTypes>(
state_id: StateId,
chain: Arc<BeaconChain<T>>,
optional_ids: Option<&[ValidatorId]>,
) -> Result<ExecutionOptimisticFinalizedResponse<Vec<ValidatorBalanceData>>, warp::Rejection> {
let (data, execution_optimistic, finalized) = state_id
.map_state_and_execution_optimistic_and_finalized(
&chain,
|state, execution_optimistic, finalized| {
Ok((
state
.validators()
.iter()
.zip(state.balances().iter())
.enumerate()
// filter by validator id(s) if provided
.filter(|(index, (validator, _))| {
optional_ids.map_or(true, |ids| {
ids.iter().any(|id| match id {
ValidatorId::PublicKey(pubkey) => &validator.pubkey == pubkey,
ValidatorId::Index(param_index) => {
*param_index == *index as u64
}
})
})
})
.map(|(index, (_, balance))| ValidatorBalanceData {
index: index as u64,
balance: *balance,
})
.collect::<Vec<_>>(),
execution_optimistic,
finalized,
))
},
)?;
Ok(api_types::ExecutionOptimisticFinalizedResponse {
data,
execution_optimistic: Some(execution_optimistic),
finalized: Some(finalized),
})
}

View File

@ -850,6 +850,18 @@ impl ApiTester {
.await
.unwrap()
.map(|res| res.data);
let result_post_index_ids = self
.client
.post_beacon_states_validator_balances(state_id.0, validator_index_ids)
.await
.unwrap()
.map(|res| res.data);
let result_post_pubkey_ids = self
.client
.post_beacon_states_validator_balances(state_id.0, validator_pubkey_ids)
.await
.unwrap()
.map(|res| res.data);
let expected = state_opt.map(|(state, _execution_optimistic, _finalized)| {
let mut validators = Vec::with_capacity(validator_indices.len());
@ -868,6 +880,8 @@ impl ApiTester {
assert_eq!(result_index_ids, expected, "{:?}", state_id);
assert_eq!(result_pubkey_ids, expected, "{:?}", state_id);
assert_eq!(result_post_index_ids, expected, "{:?}", state_id);
assert_eq!(result_post_pubkey_ids, expected, "{:?}", state_id);
}
}
@ -913,7 +927,6 @@ impl ApiTester {
.await
.unwrap()
.map(|res| res.data);
let result_pubkey_ids = self
.client
.get_beacon_states_validators(
@ -924,6 +937,18 @@ impl ApiTester {
.await
.unwrap()
.map(|res| res.data);
let post_result_index_ids = self
.client
.post_beacon_states_validators(state_id.0, Some(validator_index_ids), None)
.await
.unwrap()
.map(|res| res.data);
let post_result_pubkey_ids = self
.client
.post_beacon_states_validators(state_id.0, Some(validator_pubkey_ids), None)
.await
.unwrap()
.map(|res| res.data);
let expected = state_opt.map(|state| {
let epoch = state.current_epoch();
@ -959,6 +984,8 @@ impl ApiTester {
assert_eq!(result_index_ids, expected, "{:?}", state_id);
assert_eq!(result_pubkey_ids, expected, "{:?}", state_id);
assert_eq!(post_result_index_ids, expected, "{:?}", state_id);
assert_eq!(post_result_pubkey_ids, expected, "{:?}", state_id);
}
}
}

View File

@ -317,6 +317,18 @@ impl BeaconNodeHttpClient {
.map_err(Into::into)
}
async fn post_with_opt_response<T: Serialize, U: IntoUrl, R: DeserializeOwned>(
&self,
url: U,
body: &T,
) -> Result<Option<R>, Error> {
if let Some(response) = self.post_generic(url, body, None).await.optional()? {
response.json().await.map_err(Into::into)
} else {
Ok(None)
}
}
/// Perform a HTTP POST request with a custom timeout.
async fn post_with_timeout<T: Serialize, U: IntoUrl>(
&self,
@ -524,6 +536,29 @@ impl BeaconNodeHttpClient {
self.get_opt(path).await
}
/// `POST beacon/states/{state_id}/validator_balances`
///
/// Returns `Ok(None)` on a 404 error.
pub async fn post_beacon_states_validator_balances(
&self,
state_id: StateId,
ids: Vec<ValidatorId>,
) -> Result<Option<ExecutionOptimisticFinalizedResponse<Vec<ValidatorBalanceData>>>, Error>
{
let mut path = self.eth_path(V1)?;
path.path_segments_mut()
.map_err(|()| Error::InvalidUrl(self.server.clone()))?
.push("beacon")
.push("states")
.push(&state_id.to_string())
.push("validator_balances");
let request = ValidatorBalancesRequestBody { ids };
self.post_with_opt_response(path, &request).await
}
/// `GET beacon/states/{state_id}/validators?id,status`
///
/// Returns `Ok(None)` on a 404 error.
@ -563,6 +598,29 @@ impl BeaconNodeHttpClient {
self.get_opt(path).await
}
/// `POST beacon/states/{state_id}/validators`
///
/// Returns `Ok(None)` on a 404 error.
pub async fn post_beacon_states_validators(
&self,
state_id: StateId,
ids: Option<Vec<ValidatorId>>,
statuses: Option<Vec<ValidatorStatus>>,
) -> Result<Option<ExecutionOptimisticFinalizedResponse<Vec<ValidatorData>>>, Error> {
let mut path = self.eth_path(V1)?;
path.path_segments_mut()
.map_err(|()| Error::InvalidUrl(self.server.clone()))?
.push("beacon")
.push("states")
.push(&state_id.to_string())
.push("validators");
let request = ValidatorsRequestBody { ids, statuses };
self.post_with_opt_response(path, &request).await
}
/// `GET beacon/states/{state_id}/committees?slot,index,epoch`
///
/// Returns `Ok(None)` on a 404 error.

View File

@ -278,17 +278,18 @@ pub struct FinalityCheckpointsData {
}
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(try_from = "&str")]
#[serde(into = "String")]
#[serde(try_from = "std::borrow::Cow<str>")]
pub enum ValidatorId {
PublicKey(PublicKeyBytes),
Index(u64),
}
impl TryFrom<&str> for ValidatorId {
impl TryFrom<std::borrow::Cow<'_, str>> for ValidatorId {
type Error = String;
fn try_from(s: &str) -> Result<Self, Self::Error> {
Self::from_str(s)
fn try_from(s: std::borrow::Cow<str>) -> Result<Self, Self::Error> {
Self::from_str(&s)
}
}
@ -317,6 +318,12 @@ impl fmt::Display for ValidatorId {
}
}
impl From<ValidatorId> for String {
fn from(id: ValidatorId) -> String {
id.to_string()
}
}
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct ValidatorData {
#[serde(with = "serde_utils::quoted_u64")]
@ -492,6 +499,15 @@ pub struct ValidatorsQuery {
pub status: Option<Vec<ValidatorStatus>>,
}
#[derive(Debug, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct ValidatorsRequestBody {
#[serde(default)]
pub ids: Option<Vec<ValidatorId>>,
#[serde(default)]
pub statuses: Option<Vec<ValidatorStatus>>,
}
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct CommitteeData {
#[serde(with = "serde_utils::quoted_u64")]
@ -656,6 +672,12 @@ pub struct ValidatorBalancesQuery {
pub id: Option<Vec<ValidatorId>>,
}
#[derive(Clone, Serialize, Deserialize)]
#[serde(transparent)]
pub struct ValidatorBalancesRequestBody {
pub ids: Vec<ValidatorId>,
}
#[derive(Clone, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct BlobIndicesQuery {
@ -1879,3 +1901,20 @@ pub struct BlobsBundle<E: EthSpec> {
#[serde(with = "ssz_types::serde_utils::list_of_hex_fixed_vec")]
pub blobs: BlobsList<E>,
}
#[cfg(test)]
mod test {
use super::*;
#[test]
fn validator_id_serde() {
let id_str = "\"1\"";
let x: ValidatorId = serde_json::from_str(id_str).unwrap();
assert_eq!(x, ValidatorId::Index(1));
assert_eq!(serde_json::to_string(&x).unwrap(), id_str);
let pubkey_str = "\"0xb824b5ede33a7b05a378a84b183b4bc7e7db894ce48b659f150c97d359edca2f503081d6678d1200f582ec7cafa9caf2\"";
let y: ValidatorId = serde_json::from_str(pubkey_str).unwrap();
assert_eq!(serde_json::to_string(&y).unwrap(), pubkey_str);
}
}