Update pool/attestations and committees endpoints (#1899)

## Issue Addressed

Catching up on a few eth2 spec updates:

## Proposed Changes

- adding query params to the `GET pool/attestations` endpoint
- allowing the `POST pool/attestations` endpoint to accept an array of attestations
    - batching attestation submission
- moving `epoch` from a path param to a query param in the `committees` endpoint

## Additional Info


Co-authored-by: realbigsean <seananderson33@gmail.com>
This commit is contained in:
realbigsean 2020-11-18 23:31:39 +00:00
parent 3408de8151
commit 79fd9b32b9
6 changed files with 250 additions and 127 deletions

View File

@ -599,39 +599,34 @@ pub fn serve<T: BeaconChainTypes>(
}, },
); );
// GET beacon/states/{state_id}/committees/{epoch} // GET beacon/states/{state_id}/committees?slot,index,epoch
let get_beacon_state_committees = beacon_states_path let get_beacon_state_committees = beacon_states_path
.clone() .clone()
.and(warp::path("committees")) .and(warp::path("committees"))
.and(warp::path::param::<Epoch>())
.and(warp::query::<api_types::CommitteesQuery>()) .and(warp::query::<api_types::CommitteesQuery>())
.and(warp::path::end()) .and(warp::path::end())
.and_then( .and_then(
|state_id: StateId, |state_id: StateId, chain: Arc<BeaconChain<T>>, query: api_types::CommitteesQuery| {
chain: Arc<BeaconChain<T>>, // the api spec says if the epoch is not present then the epoch of the state should be used
epoch: Epoch, let query_state_id = query.epoch.map_or(state_id, |epoch| {
query: api_types::CommitteesQuery| { StateId::slot(epoch.start_slot(T::EthSpec::slots_per_epoch()))
});
blocking_json_task(move || { blocking_json_task(move || {
state_id.map_state(&chain, |state| { query_state_id.map_state(&chain, |state| {
let relative_epoch = let epoch = state.slot.epoch(T::EthSpec::slots_per_epoch());
RelativeEpoch::from_epoch(state.current_epoch(), epoch).map_err(
|_| {
warp_utils::reject::custom_bad_request(format!(
"state is epoch {} and only previous, current and next epochs are supported",
state.current_epoch()
))
},
)?;
let committee_cache = if state let committee_cache = if state
.committee_cache_is_initialized(relative_epoch) .committee_cache_is_initialized(RelativeEpoch::Current)
{ {
state.committee_cache(relative_epoch).map(Cow::Borrowed) state
.committee_cache(RelativeEpoch::Current)
.map(Cow::Borrowed)
} else { } else {
CommitteeCache::initialized(state, epoch, &chain.spec).map(Cow::Owned) CommitteeCache::initialized(state, epoch, &chain.spec).map(Cow::Owned)
} }
.map_err(BeaconChainError::BeaconStateError) .map_err(BeaconChainError::BeaconStateError)
.map_err(warp_utils::reject::beacon_chain_error)?; .map_err(warp_utils::reject::beacon_chain_error)?;
// Use either the supplied slot or all slots in the epoch. // Use either the supplied slot or all slots in the epoch.
let slots = query.slot.map(|slot| vec![slot]).unwrap_or_else(|| { let slots = query.slot.map(|slot| vec![slot]).unwrap_or_else(|| {
@ -659,11 +654,11 @@ pub fn serve<T: BeaconChainTypes>(
let committee = committee_cache let committee = committee_cache
.get_beacon_committee(slot, index) .get_beacon_committee(slot, index)
.ok_or_else(|| { .ok_or_else(|| {
warp_utils::reject::custom_bad_request(format!( warp_utils::reject::custom_bad_request(format!(
"committee index {} does not exist in epoch {}", "committee index {} does not exist in epoch {}",
index, epoch index, epoch
)) ))
})?; })?;
response.push(api_types::CommitteeData { response.push(api_types::CommitteeData {
index, index,
@ -906,63 +901,119 @@ pub fn serve<T: BeaconChainTypes>(
.and(warp::path::end()) .and(warp::path::end())
.and(warp::body::json()) .and(warp::body::json())
.and(network_tx_filter.clone()) .and(network_tx_filter.clone())
.and(log_filter.clone())
.and_then( .and_then(
|chain: Arc<BeaconChain<T>>, |chain: Arc<BeaconChain<T>>,
attestation: Attestation<T::EthSpec>, attestations: Vec<Attestation<T::EthSpec>>,
network_tx: UnboundedSender<NetworkMessage<T::EthSpec>>| { network_tx: UnboundedSender<NetworkMessage<T::EthSpec>>,
log: Logger| {
blocking_json_task(move || { blocking_json_task(move || {
let attestation = chain let mut failures = Vec::new();
.verify_unaggregated_attestation_for_gossip(attestation.clone(), None)
.map_err(|e| {
warp_utils::reject::object_invalid(format!(
"gossip verification failed: {:?}",
e
))
})?;
publish_pubsub_message( for (index, attestation) in attestations.as_slice().iter().enumerate() {
&network_tx, let attestation = match chain
PubsubMessage::Attestation(Box::new(( .verify_unaggregated_attestation_for_gossip(attestation.clone(), None)
attestation.subnet_id(), {
attestation.attestation().clone(), Ok(attestation) => attestation,
))), Err(e) => {
)?; error!(log,
"Failure verifying attestation for gossip";
"error" => ?e,
"request_index" => index,
"committee_index" => attestation.data.index,
"attestation_slot" => attestation.data.slot,
);
failures.push(api_types::Failure::new(
index,
format!("Verification: {:?}", e),
));
// skip to the next attestation so we do not publish this one to gossip
continue;
}
};
chain publish_pubsub_message(
.apply_attestation_to_fork_choice(&attestation) &network_tx,
.map_err(|e| { PubsubMessage::Attestation(Box::new((
warp_utils::reject::broadcast_without_import(format!( attestation.subnet_id(),
"not applied to fork choice: {:?}", attestation.attestation().clone(),
e ))),
)) )?;
})?;
chain let committee_index = attestation.attestation().data.index;
.add_to_naive_aggregation_pool(attestation) let slot = attestation.attestation().data.slot;
.map_err(|e| {
warp_utils::reject::broadcast_without_import(format!(
"not applied to naive aggregation pool: {:?}",
e
))
})?;
Ok(()) if let Err(e) = chain.apply_attestation_to_fork_choice(&attestation) {
error!(log,
"Failure applying verified attestation to fork choice";
"error" => ?e,
"request_index" => index,
"committee_index" => committee_index,
"slot" => slot,
);
failures.push(api_types::Failure::new(
index,
format!("Fork choice: {:?}", e),
));
};
if let Err(e) = chain.add_to_naive_aggregation_pool(attestation) {
error!(log,
"Failure adding verified attestation to the naive aggregation pool";
"error" => ?e,
"request_index" => index,
"committee_index" => committee_index,
"slot" => slot,
);
failures.push(api_types::Failure::new(
index,
format!("Naive aggregation pool: {:?}", e),
));
}
}
if failures.is_empty() {
Ok(())
} else {
Err(warp_utils::reject::indexed_bad_request(
"error processing attestations".to_string(),
failures,
))
}
}) })
}, },
); );
// GET beacon/pool/attestations // GET beacon/pool/attestations?committee_index,slot
let get_beacon_pool_attestations = beacon_pool_path let get_beacon_pool_attestations = beacon_pool_path
.clone() .clone()
.and(warp::path("attestations")) .and(warp::path("attestations"))
.and(warp::path::end()) .and(warp::path::end())
.and_then(|chain: Arc<BeaconChain<T>>| { .and(warp::query::<api_types::AttestationPoolQuery>())
blocking_json_task(move || { .and_then(
let mut attestations = chain.op_pool.get_all_attestations(); |chain: Arc<BeaconChain<T>>, query: api_types::AttestationPoolQuery| {
attestations.extend(chain.naive_aggregation_pool.read().iter().cloned()); blocking_json_task(move || {
Ok(api_types::GenericResponse::from(attestations)) let query_filter = |attestation: &Attestation<T::EthSpec>| {
}) query
}); .slot
.map_or(true, |slot| slot == attestation.data.slot)
&& query
.committee_index
.map_or(true, |index| index == attestation.data.index)
};
let mut attestations = chain.op_pool.get_filtered_attestations(query_filter);
attestations.extend(
chain
.naive_aggregation_pool
.read()
.iter()
.cloned()
.filter(query_filter),
);
Ok(api_types::GenericResponse::from(attestations))
})
},
);
// POST beacon/pool/attester_slashings // POST beacon/pool/attester_slashings
let post_beacon_pool_attester_slashings = beacon_pool_path let post_beacon_pool_attester_slashings = beacon_pool_path

View File

@ -6,6 +6,7 @@ use beacon_chain::{
}; };
use discv5::enr::{CombinedKey, EnrBuilder}; use discv5::enr::{CombinedKey, EnrBuilder};
use environment::null_logger; use environment::null_logger;
use eth2::Error;
use eth2::{types::*, BeaconNodeHttpClient, Url}; use eth2::{types::*, BeaconNodeHttpClient, Url};
use eth2_libp2p::{ use eth2_libp2p::{
rpc::methods::MetaData, rpc::methods::MetaData,
@ -624,14 +625,10 @@ impl ApiTester {
for state_id in self.interesting_state_ids() { for state_id in self.interesting_state_ids() {
let mut state_opt = self.get_state(state_id); let mut state_opt = self.get_state(state_id);
let epoch = state_opt let epoch_opt = state_opt.as_ref().map(|state| state.current_epoch());
.as_ref()
.map(|state| state.current_epoch())
.unwrap_or_else(|| Epoch::new(0));
let results = self let results = self
.client .client
.get_beacon_states_committees(state_id, epoch, None, None) .get_beacon_states_committees(state_id, None, None, epoch_opt)
.await .await
.unwrap() .unwrap()
.map(|res| res.data); .map(|res| res.data);
@ -641,11 +638,10 @@ impl ApiTester {
} }
let state = state_opt.as_mut().expect("result should be none"); let state = state_opt.as_mut().expect("result should be none");
state.build_all_committee_caches(&self.chain.spec).unwrap(); state.build_all_committee_caches(&self.chain.spec).unwrap();
let committees = state let committees = state
.get_beacon_committees_at_epoch( .get_beacon_committees_at_epoch(RelativeEpoch::Current)
RelativeEpoch::from_epoch(state.current_epoch(), epoch).unwrap(),
)
.unwrap(); .unwrap();
for (i, result) in results.unwrap().into_iter().enumerate() { for (i, result) in results.unwrap().into_iter().enumerate() {
@ -886,45 +882,60 @@ impl ApiTester {
} }
pub async fn test_post_beacon_pool_attestations_valid(mut self) -> Self { pub async fn test_post_beacon_pool_attestations_valid(mut self) -> Self {
for attestation in &self.attestations { self.client
self.client .post_beacon_pool_attestations(self.attestations.as_slice())
.post_beacon_pool_attestations(attestation) .await
.await .unwrap();
.unwrap();
assert!( assert!(
self.network_rx.try_recv().is_ok(), self.network_rx.try_recv().is_ok(),
"valid attestation should be sent to network" "valid attestation should be sent to network"
); );
}
self self
} }
pub async fn test_post_beacon_pool_attestations_invalid(mut self) -> Self { pub async fn test_post_beacon_pool_attestations_invalid(mut self) -> Self {
let mut attestations = Vec::new();
for attestation in &self.attestations { for attestation in &self.attestations {
let mut attestation = attestation.clone(); let mut invalid_attestation = attestation.clone();
attestation.data.slot += 1; invalid_attestation.data.slot += 1;
assert!(self // add both to ensure we only fail on invalid attestations
.client attestations.push(attestation.clone());
.post_beacon_pool_attestations(&attestation) attestations.push(invalid_attestation);
.await
.is_err());
assert!(
self.network_rx.try_recv().is_err(),
"invalid attestation should not be sent to network"
);
} }
let err = self
.client
.post_beacon_pool_attestations(attestations.as_slice())
.await
.unwrap_err();
match err {
Error::ServerIndexedMessage(IndexedErrorMessage {
code,
message: _,
failures,
}) => {
assert_eq!(code, 400);
assert_eq!(failures.len(), self.attestations.len());
}
_ => panic!("query did not fail correctly"),
}
assert!(
self.network_rx.try_recv().is_ok(),
"if some attestations are valid, we should send them to the network"
);
self self
} }
pub async fn test_get_beacon_pool_attestations(self) -> Self { pub async fn test_get_beacon_pool_attestations(self) -> Self {
let result = self let result = self
.client .client
.get_beacon_pool_attestations() .get_beacon_pool_attestations(None, None)
.await .await
.unwrap() .unwrap()
.data; .data;

View File

@ -344,6 +344,22 @@ impl<T: EthSpec> OperationPool<T> {
.collect() .collect()
} }
/// Returns all known `Attestation` objects that pass the provided filter.
///
/// This method may return objects that are invalid for block inclusion.
pub fn get_filtered_attestations<F>(&self, filter: F) -> Vec<Attestation<T>>
where
F: Fn(&Attestation<T>) -> bool,
{
self.attestations
.read()
.iter()
.map(|(_, attns)| attns.iter().cloned())
.flatten()
.filter(filter)
.collect()
}
/// Returns all known `AttesterSlashing` objects. /// Returns all known `AttesterSlashing` objects.
/// ///
/// This method may return objects that are invalid for block inclusion. /// This method may return objects that are invalid for block inclusion.

View File

@ -301,15 +301,15 @@ impl BeaconNodeHttpClient {
self.get_opt(path).await self.get_opt(path).await
} }
/// `GET beacon/states/{state_id}/committees?slot,index` /// `GET beacon/states/{state_id}/committees?slot,index,epoch`
/// ///
/// Returns `Ok(None)` on a 404 error. /// Returns `Ok(None)` on a 404 error.
pub async fn get_beacon_states_committees( pub async fn get_beacon_states_committees(
&self, &self,
state_id: StateId, state_id: StateId,
epoch: Epoch,
slot: Option<Slot>, slot: Option<Slot>,
index: Option<u64>, index: Option<u64>,
epoch: Option<Epoch>,
) -> Result<Option<GenericResponse<Vec<CommitteeData>>>, Error> { ) -> Result<Option<GenericResponse<Vec<CommitteeData>>>, Error> {
let mut path = self.eth_path()?; let mut path = self.eth_path()?;
@ -318,8 +318,7 @@ impl BeaconNodeHttpClient {
.push("beacon") .push("beacon")
.push("states") .push("states")
.push(&state_id.to_string()) .push(&state_id.to_string())
.push("committees") .push("committees");
.push(&epoch.to_string());
if let Some(slot) = slot { if let Some(slot) = slot {
path.query_pairs_mut() path.query_pairs_mut()
@ -331,6 +330,11 @@ impl BeaconNodeHttpClient {
.append_pair("index", &index.to_string()); .append_pair("index", &index.to_string());
} }
if let Some(epoch) = epoch {
path.query_pairs_mut()
.append_pair("epoch", &epoch.to_string());
}
self.get_opt(path).await self.get_opt(path).await
} }
@ -479,7 +483,7 @@ impl BeaconNodeHttpClient {
/// `POST beacon/pool/attestations` /// `POST beacon/pool/attestations`
pub async fn post_beacon_pool_attestations<T: EthSpec>( pub async fn post_beacon_pool_attestations<T: EthSpec>(
&self, &self,
attestation: &Attestation<T>, attestations: &[Attestation<T>],
) -> Result<(), Error> { ) -> Result<(), Error> {
let mut path = self.eth_path()?; let mut path = self.eth_path()?;
@ -489,14 +493,23 @@ impl BeaconNodeHttpClient {
.push("pool") .push("pool")
.push("attestations"); .push("attestations");
self.post(path, attestation).await?; let response = self
.client
.post(path)
.json(attestations)
.send()
.await
.map_err(Error::Reqwest)?;
ok_or_indexed_error(response).await?;
Ok(()) Ok(())
} }
/// `GET beacon/pool/attestations` /// `GET beacon/pool/attestations?slot,committee_index`
pub async fn get_beacon_pool_attestations<T: EthSpec>( pub async fn get_beacon_pool_attestations<T: EthSpec>(
&self, &self,
slot: Option<Slot>,
committee_index: Option<u64>,
) -> Result<GenericResponse<Vec<Attestation<T>>>, Error> { ) -> Result<GenericResponse<Vec<Attestation<T>>>, Error> {
let mut path = self.eth_path()?; let mut path = self.eth_path()?;
@ -506,6 +519,16 @@ impl BeaconNodeHttpClient {
.push("pool") .push("pool")
.push("attestations"); .push("attestations");
if let Some(slot) = slot {
path.query_pairs_mut()
.append_pair("slot", &slot.to_string());
}
if let Some(index) = committee_index {
path.query_pairs_mut()
.append_pair("committee_index", &index.to_string());
}
self.get(path).await self.get(path).await
} }

View File

@ -349,6 +349,13 @@ impl fmt::Display for ValidatorStatus {
pub struct CommitteesQuery { pub struct CommitteesQuery {
pub slot: Option<Slot>, pub slot: Option<Slot>,
pub index: Option<u64>, pub index: Option<u64>,
pub epoch: Option<Epoch>,
}
#[derive(Serialize, Deserialize)]
pub struct AttestationPoolQuery {
pub slot: Option<Slot>,
pub committee_index: Option<u64>,
} }
#[derive(Deserialize)] #[derive(Deserialize)]

View File

@ -329,6 +329,8 @@ impl<T: SlotClock + 'static, E: EthSpec> AttestationService<T, E> {
.map_err(|e| format!("Failed to produce attestation data: {:?}", e))? .map_err(|e| format!("Failed to produce attestation data: {:?}", e))?
.data; .data;
let mut attestations = Vec::with_capacity(validator_duties.len());
for duty in validator_duties { for duty in validator_duties {
// Ensure that all required fields are present in the validator duty. // Ensure that all required fields are present in the validator duty.
let ( let (
@ -370,39 +372,52 @@ impl<T: SlotClock + 'static, E: EthSpec> AttestationService<T, E> {
signature: AggregateSignature::infinity(), signature: AggregateSignature::infinity(),
}; };
self.validator_store if self
.validator_store
.sign_attestation( .sign_attestation(
duty.validator_pubkey(), duty.validator_pubkey(),
validator_committee_position, validator_committee_position,
&mut attestation, &mut attestation,
current_epoch, current_epoch,
) )
.ok_or_else(|| "Failed to sign attestation".to_string())?; .is_some()
match self
.beacon_node
.post_beacon_pool_attestations(&attestation)
.await
{ {
Ok(()) => info!( attestations.push(attestation);
} else {
crit!(
log, log,
"Successfully published attestation"; "Failed to sign attestation";
"head_block" => format!("{:?}", attestation.data.beacon_block_root), "committee_index" => committee_index,
"committee_index" => attestation.data.index,
"slot" => attestation.data.slot.as_u64(),
"type" => "unaggregated",
),
Err(e) => error!(
log,
"Unable to publish attestation";
"error" => e.to_string(),
"committee_index" => attestation.data.index,
"slot" => slot.as_u64(), "slot" => slot.as_u64(),
"type" => "unaggregated", );
), continue;
} }
} }
match self
.beacon_node
.post_beacon_pool_attestations(attestations.as_slice())
.await
{
Ok(()) => info!(
log,
"Successfully published attestations";
"count" => attestations.len(),
"head_block" => ?attestation_data.beacon_block_root,
"committee_index" => attestation_data.index,
"slot" => attestation_data.slot.as_u64(),
"type" => "unaggregated",
),
Err(e) => error!(
log,
"Unable to publish attestations";
"error" => ?e,
"committee_index" => attestation_data.index,
"slot" => slot.as_u64(),
"type" => "unaggregated",
),
}
Ok(Some(attestation_data)) Ok(Some(attestation_data))
} }