Standard beacon api updates (#1831)

## Issue Addressed

Resolves #1809 
Resolves #1824
Resolves #1818
Resolves #1828 (hopefully)

## Proposed Changes

- add `validator_index` to the proposer duties endpoint
- add the ability to query for historical proposer duties
- `StateId` deserialization now fails with a 400 warp rejection
- add the `validator_balances` endpoint
- update the `aggregate_and_proofs` endpoint to accept an array
- updates the attester duties endpoint from a `GET` to a `POST`
- reduces the number of times we query for proposer duties from once per slot per validator to only once per slot 


Co-authored-by: realbigsean <seananderson33@gmail.com>
Co-authored-by: Paul Hauner <paul@paulhauner.com>
This commit is contained in:
realbigsean 2020-11-09 23:13:56 +00:00
parent 556190ff46
commit f8da151b0b
10 changed files with 723 additions and 323 deletions

View File

@ -984,9 +984,9 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
///
/// - `VerifiedUnaggregatedAttestation`
/// - `VerifiedAggregatedAttestation`
pub fn apply_attestation_to_fork_choice<'a>(
pub fn apply_attestation_to_fork_choice(
&self,
verified: &'a impl SignatureVerifiedAttestation<T>,
verified: &impl SignatureVerifiedAttestation<T>,
) -> Result<(), Error> {
let _timer = metrics::start_timer(&metrics::FORK_CHOICE_PROCESS_ATTESTATION_TIMES);

View File

@ -89,6 +89,7 @@ impl BeaconProposerCache {
Ok(ProposerData {
pubkey: PublicKeyBytes::from(pubkey),
validator_index: i as u64,
slot,
})
})

View File

@ -39,7 +39,7 @@ use std::sync::Arc;
use tokio::sync::mpsc::UnboundedSender;
use types::{
Attestation, AttestationDuty, AttesterSlashing, CloneConfig, CommitteeCache, Epoch, EthSpec,
Hash256, ProposerSlashing, PublicKey, RelativeEpoch, SignedAggregateAndProof,
Hash256, ProposerSlashing, PublicKey, PublicKeyBytes, RelativeEpoch, SignedAggregateAndProof,
SignedBeaconBlock, SignedVoluntaryExit, Slot, YamlConfig,
};
use warp::{http::Response, Filter};
@ -390,7 +390,13 @@ pub fn serve<T: BeaconChainTypes>(
let beacon_states_path = eth1_v1
.and(warp::path("beacon"))
.and(warp::path("states"))
.and(warp::path::param::<StateId>())
.and(warp::path::param::<StateId>().or_else(|_| {
blocking_task(|| {
Err(warp_utils::reject::custom_bad_request(
"Invalid state ID".to_string(),
))
})
}))
.and(chain_filter.clone());
// GET beacon/states/{state_id}/root
@ -435,6 +441,50 @@ pub fn serve<T: BeaconChainTypes>(
})
});
// GET beacon/states/{state_id}/validator_balances?id
let get_beacon_state_validator_balances = beacon_states_path
.clone()
.and(warp::path("validator_balances"))
.and(warp::path::end())
.and(warp::query::<api_types::ValidatorBalancesQuery>())
.and_then(
|state_id: StateId,
chain: Arc<BeaconChain<T>>,
query: api_types::ValidatorBalancesQuery| {
blocking_json_task(move || {
state_id
.map_state(&chain, |state| {
Ok(state
.validators
.iter()
.zip(state.balances.iter())
.enumerate()
// filter by validator id(s) if provided
.filter(|(index, (validator, _))| {
query.id.as_ref().map_or(true, |ids| {
ids.0.iter().any(|id| match id {
ValidatorId::PublicKey(pubkey) => {
&validator.pubkey == pubkey
}
ValidatorId::Index(param_index) => {
*param_index == *index as u64
}
})
})
})
.map(|(index, (_, balance))| {
Some(api_types::ValidatorBalanceData {
index: index as u64,
balance: *balance,
})
})
.collect::<Vec<_>>())
})
.map(api_types::GenericResponse::from)
})
},
);
// GET beacon/states/{state_id}/validators?id,status
let get_beacon_state_validators = beacon_states_path
.clone()
@ -747,7 +797,7 @@ pub fn serve<T: BeaconChainTypes>(
* beacon/blocks
*/
// POST beacon/blocks/{block_id}
// POST beacon/blocks
let post_beacon_blocks = eth1_v1
.and(warp::path("beacon"))
.and(warp::path("blocks"))
@ -1370,18 +1420,158 @@ pub fn serve<T: BeaconChainTypes>(
* validator
*/
// GET validator/duties/attester/{epoch}
let get_validator_duties_attester = eth1_v1
// GET validator/duties/proposer/{epoch}
let get_validator_duties_proposer = eth1_v1
.and(warp::path("validator"))
.and(warp::path("duties"))
.and(warp::path("proposer"))
.and(warp::path::param::<Epoch>())
.and(warp::path::end())
.and(not_while_syncing_filter.clone())
.and(chain_filter.clone())
.and(beacon_proposer_cache())
.and_then(
|epoch: Epoch,
chain: Arc<BeaconChain<T>>,
beacon_proposer_cache: Arc<Mutex<BeaconProposerCache>>| {
blocking_json_task(move || {
let current_epoch = chain
.epoch()
.map_err(warp_utils::reject::beacon_chain_error)?;
if epoch > current_epoch {
return Err(warp_utils::reject::custom_bad_request(format!(
"request epoch {} is ahead of the current epoch {}",
epoch, current_epoch
)));
}
if epoch == current_epoch {
beacon_proposer_cache
.lock()
.get_proposers(&chain, epoch)
.map(api_types::GenericResponse::from)
} else {
let state =
StateId::slot(epoch.start_slot(T::EthSpec::slots_per_epoch()))
.state(&chain)?;
epoch
.slot_iter(T::EthSpec::slots_per_epoch())
.map(|slot| {
state
.get_beacon_proposer_index(slot, &chain.spec)
.map_err(warp_utils::reject::beacon_state_error)
.and_then(|i| {
let pubkey =
chain.validator_pubkey(i)
.map_err(warp_utils::reject::beacon_chain_error)?
.ok_or_else(||
warp_utils::reject::beacon_chain_error(
BeaconChainError::ValidatorPubkeyCacheIncomplete(i)
)
)?;
Ok(api_types::ProposerData {
pubkey: PublicKeyBytes::from(pubkey),
validator_index: i as u64,
slot,
})
})
})
.collect::<Result<Vec<api_types::ProposerData>, _>>()
.map(api_types::GenericResponse::from)
}
})
},
);
// GET validator/blocks/{slot}
let get_validator_blocks = eth1_v1
.and(warp::path("validator"))
.and(warp::path("blocks"))
.and(warp::path::param::<Slot>())
.and(warp::path::end())
.and(not_while_syncing_filter.clone())
.and(warp::query::<api_types::ValidatorBlocksQuery>())
.and(chain_filter.clone())
.and_then(
|slot: Slot, query: api_types::ValidatorBlocksQuery, chain: Arc<BeaconChain<T>>| {
blocking_json_task(move || {
let randao_reveal = (&query.randao_reveal).try_into().map_err(|e| {
warp_utils::reject::custom_bad_request(format!(
"randao reveal is not valid BLS signature: {:?}",
e
))
})?;
chain
.produce_block(randao_reveal, slot, query.graffiti.map(Into::into))
.map(|block_and_state| block_and_state.0)
.map(api_types::GenericResponse::from)
.map_err(warp_utils::reject::block_production_error)
})
},
);
// GET validator/attestation_data?slot,committee_index
let get_validator_attestation_data = eth1_v1
.and(warp::path("validator"))
.and(warp::path("attestation_data"))
.and(warp::path::end())
.and(warp::query::<api_types::ValidatorAttestationDataQuery>())
.and(not_while_syncing_filter.clone())
.and(chain_filter.clone())
.and_then(
|query: api_types::ValidatorAttestationDataQuery, chain: Arc<BeaconChain<T>>| {
blocking_json_task(move || {
chain
.produce_unaggregated_attestation(query.slot, query.committee_index)
.map(|attestation| attestation.data)
.map(api_types::GenericResponse::from)
.map_err(warp_utils::reject::beacon_chain_error)
})
},
);
// GET validator/aggregate_attestation?attestation_data_root,slot
let get_validator_aggregate_attestation = eth1_v1
.and(warp::path("validator"))
.and(warp::path("aggregate_attestation"))
.and(warp::path::end())
.and(warp::query::<api_types::ValidatorAggregateAttestationQuery>())
.and(not_while_syncing_filter.clone())
.and(chain_filter.clone())
.and_then(
|query: api_types::ValidatorAggregateAttestationQuery, chain: Arc<BeaconChain<T>>| {
blocking_json_task(move || {
chain
.get_aggregated_attestation_by_slot_and_root(
query.slot,
&query.attestation_data_root,
)
.map(api_types::GenericResponse::from)
.ok_or_else(|| {
warp_utils::reject::custom_not_found(
"no matching aggregate found".to_string(),
)
})
})
},
);
// POST validator/duties/attester/{epoch}
let post_validator_duties_attester = eth1_v1
.and(warp::path("validator"))
.and(warp::path("duties"))
.and(warp::path("attester"))
.and(warp::path::param::<Epoch>())
.and(warp::path::end())
.and(not_while_syncing_filter.clone())
.and(warp::query::<api_types::ValidatorDutiesQuery>())
.and(warp::body::json())
.and(chain_filter.clone())
.and_then(
|epoch: Epoch, query: api_types::ValidatorDutiesQuery, chain: Arc<BeaconChain<T>>| {
|epoch: Epoch, indices: api_types::ValidatorIndexData, chain: Arc<BeaconChain<T>>| {
blocking_json_task(move || {
let current_epoch = chain
.epoch()
@ -1397,30 +1587,22 @@ pub fn serve<T: BeaconChainTypes>(
let validator_count = StateId::head()
.map_state(&chain, |state| Ok(state.validators.len() as u64))?;
let indices = query
.index
.as_ref()
.map(|index| index.0.clone())
.map(Result::Ok)
.unwrap_or_else(|| {
Ok::<_, warp::Rejection>((0..validator_count).collect())
})?;
let pubkeys = indices
.into_iter()
.filter(|i| *i < validator_count as u64)
.0
.iter()
.filter(|i| **i < validator_count as u64)
.map(|i| {
let pubkey = chain
.validator_pubkey(i as usize)
.validator_pubkey(*i as usize)
.map_err(warp_utils::reject::beacon_chain_error)?
.ok_or_else(|| {
warp_utils::reject::custom_bad_request(format!(
"unknown validator index {}",
i
*i
))
})?;
Ok((i, pubkey))
Ok((*i, pubkey))
})
.collect::<Result<Vec<_>, warp::Rejection>>()?;
@ -1536,103 +1718,6 @@ pub fn serve<T: BeaconChainTypes>(
},
);
// GET validator/duties/proposer/{epoch}
let get_validator_duties_proposer = eth1_v1
.and(warp::path("validator"))
.and(warp::path("duties"))
.and(warp::path("proposer"))
.and(warp::path::param::<Epoch>())
.and(warp::path::end())
.and(not_while_syncing_filter.clone())
.and(chain_filter.clone())
.and(beacon_proposer_cache())
.and_then(
|epoch: Epoch,
chain: Arc<BeaconChain<T>>,
beacon_proposer_cache: Arc<Mutex<BeaconProposerCache>>| {
blocking_json_task(move || {
beacon_proposer_cache
.lock()
.get_proposers(&chain, epoch)
.map(api_types::GenericResponse::from)
})
},
);
// GET validator/blocks/{slot}
let get_validator_blocks = eth1_v1
.and(warp::path("validator"))
.and(warp::path("blocks"))
.and(warp::path::param::<Slot>())
.and(warp::path::end())
.and(not_while_syncing_filter.clone())
.and(warp::query::<api_types::ValidatorBlocksQuery>())
.and(chain_filter.clone())
.and_then(
|slot: Slot, query: api_types::ValidatorBlocksQuery, chain: Arc<BeaconChain<T>>| {
blocking_json_task(move || {
let randao_reveal = (&query.randao_reveal).try_into().map_err(|e| {
warp_utils::reject::custom_bad_request(format!(
"randao reveal is not valid BLS signature: {:?}",
e
))
})?;
chain
.produce_block(randao_reveal, slot, query.graffiti.map(Into::into))
.map(|block_and_state| block_and_state.0)
.map(api_types::GenericResponse::from)
.map_err(warp_utils::reject::block_production_error)
})
},
);
// GET validator/attestation_data?slot,committee_index
let get_validator_attestation_data = eth1_v1
.and(warp::path("validator"))
.and(warp::path("attestation_data"))
.and(warp::path::end())
.and(warp::query::<api_types::ValidatorAttestationDataQuery>())
.and(not_while_syncing_filter.clone())
.and(chain_filter.clone())
.and_then(
|query: api_types::ValidatorAttestationDataQuery, chain: Arc<BeaconChain<T>>| {
blocking_json_task(move || {
chain
.produce_unaggregated_attestation(query.slot, query.committee_index)
.map(|attestation| attestation.data)
.map(api_types::GenericResponse::from)
.map_err(warp_utils::reject::beacon_chain_error)
})
},
);
// GET validator/aggregate_attestation?attestation_data_root,slot
let get_validator_aggregate_attestation = eth1_v1
.and(warp::path("validator"))
.and(warp::path("aggregate_attestation"))
.and(warp::path::end())
.and(warp::query::<api_types::ValidatorAggregateAttestationQuery>())
.and(not_while_syncing_filter.clone())
.and(chain_filter.clone())
.and_then(
|query: api_types::ValidatorAggregateAttestationQuery, chain: Arc<BeaconChain<T>>| {
blocking_json_task(move || {
chain
.get_aggregated_attestation_by_slot_and_root(
query.slot,
&query.attestation_data_root,
)
.map(api_types::GenericResponse::from)
.ok_or_else(|| {
warp_utils::reject::custom_not_found(
"no matching aggregate found".to_string(),
)
})
})
},
);
// POST validator/aggregate_and_proofs
let post_validator_aggregate_and_proofs = eth1_v1
.and(warp::path("validator"))
@ -1642,53 +1727,81 @@ pub fn serve<T: BeaconChainTypes>(
.and(chain_filter.clone())
.and(warp::body::json())
.and(network_tx_filter.clone())
.and(log_filter.clone())
.and_then(
|chain: Arc<BeaconChain<T>>,
aggregate: SignedAggregateAndProof<T::EthSpec>,
network_tx: UnboundedSender<NetworkMessage<T::EthSpec>>| {
aggregates: Vec<SignedAggregateAndProof<T::EthSpec>>,
network_tx: UnboundedSender<NetworkMessage<T::EthSpec>>, log: Logger| {
blocking_json_task(move || {
let aggregate =
let mut verified_aggregates = Vec::with_capacity(aggregates.len());
let mut messages = Vec::with_capacity(aggregates.len());
let mut failures = Vec::new();
// Verify that all messages in the post are valid before processing further
for (index, aggregate) in aggregates.as_slice().iter().enumerate() {
match chain.verify_aggregated_attestation_for_gossip(aggregate.clone()) {
Ok(aggregate) => aggregate,
Ok(verified_aggregate) => {
messages.push(PubsubMessage::AggregateAndProofAttestation(Box::new(
verified_aggregate.aggregate().clone(),
)));
verified_aggregates.push((index, verified_aggregate));
}
// If we already know the attestation, don't broadcast it or attempt to
// further verify it. Return success.
//
// It's reasonably likely that two different validators produce
// identical aggregates, especially if they're using the same beacon
// node.
Err(AttnError::AttestationAlreadyKnown(_)) => return Ok(()),
Err(AttnError::AttestationAlreadyKnown(_)) => continue,
Err(e) => {
return Err(warp_utils::reject::object_invalid(format!(
"gossip verification failed: {:?}",
e
)));
error!(log,
"Failure verifying aggregate and proofs";
"error" => format!("{:?}", e),
"request_index" => index,
"aggregator_index" => aggregate.message.aggregator_index,
"attestation_index" => aggregate.message.aggregate.data.index,
"attestation_slot" => aggregate.message.aggregate.data.slot,
);
failures.push(api_types::Failure::new(index, format!("Verification: {:?}", e)));
},
}
}
};
publish_pubsub_message(
&network_tx,
PubsubMessage::AggregateAndProofAttestation(Box::new(
aggregate.aggregate().clone(),
)),
)?;
// Publish aggregate attestations to the libp2p network
if !messages.is_empty() {
publish_network_message(&network_tx, NetworkMessage::Publish { messages })?;
}
chain
.apply_attestation_to_fork_choice(&aggregate)
.map_err(|e| {
warp_utils::reject::broadcast_without_import(format!(
"not applied to fork choice: {:?}",
e
// Import aggregate attestations
for (index, verified_aggregate) in verified_aggregates {
if let Err(e) = chain.apply_attestation_to_fork_choice(&verified_aggregate) {
error!(log,
"Failure applying verified aggregate attestation to fork choice";
"error" => format!("{:?}", e),
"request_index" => index,
"aggregator_index" => verified_aggregate.aggregate().message.aggregator_index,
"attestation_index" => verified_aggregate.attestation().data.index,
"attestation_slot" => verified_aggregate.attestation().data.slot,
);
failures.push(api_types::Failure::new(index, format!("Fork choice: {:?}", e)));
}
if let Err(e) = chain.add_to_block_inclusion_pool(verified_aggregate) {
warn!(log,
"Could not add verified aggregate attestation to the inclusion pool";
"error" => format!("{:?}", e),
"request_index" => index,
);
failures.push(api_types::Failure::new(index, format!("Op pool: {:?}", e)));
}
}
if !failures.is_empty() {
Err(warp_utils::reject::indexed_bad_request("error processing aggregate and proofs".to_string(),
failures
))
})?;
chain.add_to_block_inclusion_pool(aggregate).map_err(|e| {
warp_utils::reject::broadcast_without_import(format!(
"not applied to block inclusion pool: {:?}",
e
))
})?;
} else {
Ok(())
}
})
},
);
@ -1935,9 +2048,11 @@ pub fn serve<T: BeaconChainTypes>(
let routes = warp::get()
.and(
get_beacon_genesis
.boxed()
.or(get_beacon_state_root.boxed())
.or(get_beacon_state_fork.boxed())
.or(get_beacon_state_finality_checkpoints.boxed())
.or(get_beacon_state_validator_balances.boxed())
.or(get_beacon_state_validators.boxed())
.or(get_beacon_state_validators_id.boxed())
.or(get_beacon_state_committees.boxed())
@ -1961,7 +2076,6 @@ pub fn serve<T: BeaconChainTypes>(
.or(get_node_health.boxed())
.or(get_node_peers_by_id.boxed())
.or(get_node_peers.boxed())
.or(get_validator_duties_attester.boxed())
.or(get_validator_duties_proposer.boxed())
.or(get_validator_blocks.boxed())
.or(get_validator_attestation_data.boxed())
@ -1976,23 +2090,19 @@ pub fn serve<T: BeaconChainTypes>(
.or(get_lighthouse_eth1_syncing.boxed())
.or(get_lighthouse_eth1_block_cache.boxed())
.or(get_lighthouse_eth1_deposit_cache.boxed())
.or(get_lighthouse_beacon_states_ssz.boxed())
.boxed(),
.or(get_lighthouse_beacon_states_ssz.boxed()),
)
.or(warp::post()
.and(
.or(warp::post().and(
post_beacon_blocks
.boxed()
.or(post_beacon_pool_attestations.boxed())
.or(post_beacon_pool_attester_slashings.boxed())
.or(post_beacon_pool_proposer_slashings.boxed())
.or(post_beacon_pool_voluntary_exits.boxed())
.or(post_validator_duties_attester.boxed())
.or(post_validator_aggregate_and_proofs.boxed())
.or(post_validator_beacon_committee_subscriptions.boxed())
.boxed(),
)
.boxed())
.boxed()
// Maps errors into HTTP responses.
.or(post_validator_beacon_committee_subscriptions.boxed()),
))
.recover(warp_utils::reject::handle_rejection)
.with(slog_logging(log.clone()))
.with(prometheus_metrics())

View File

@ -411,6 +411,73 @@ impl ApiTester {
self
}
pub async fn test_beacon_states_validator_balances(self) -> Self {
for state_id in self.interesting_state_ids() {
for validator_indices in self.interesting_validator_indices() {
let state_opt = self.get_state(state_id);
let validators: Vec<Validator> = match state_opt.as_ref() {
Some(state) => state.validators.clone().into(),
None => vec![],
};
let validator_index_ids = validator_indices
.iter()
.cloned()
.map(|i| ValidatorId::Index(i))
.collect::<Vec<ValidatorId>>();
let validator_pubkey_ids = validator_indices
.iter()
.cloned()
.map(|i| {
ValidatorId::PublicKey(
validators
.get(i as usize)
.map_or(PublicKeyBytes::empty(), |val| val.pubkey.clone()),
)
})
.collect::<Vec<ValidatorId>>();
let result_index_ids = self
.client
.get_beacon_states_validator_balances(
state_id,
Some(validator_index_ids.as_slice()),
)
.await
.unwrap()
.map(|res| res.data);
let result_pubkey_ids = self
.client
.get_beacon_states_validator_balances(
state_id,
Some(validator_pubkey_ids.as_slice()),
)
.await
.unwrap()
.map(|res| res.data);
let expected = state_opt.map(|state| {
let mut validators = Vec::with_capacity(validator_indices.len());
for i in validator_indices {
if i < state.balances.len() as u64 {
validators.push(ValidatorBalanceData {
index: i as u64,
balance: state.balances[i as usize],
});
}
}
validators
});
assert_eq!(result_index_ids, expected, "{:?}", state_id);
assert_eq!(result_pubkey_ids, expected, "{:?}", state_id);
}
}
self
}
pub async fn test_beacon_states_validators(self) -> Self {
for state_id in self.interesting_state_ids() {
for statuses in self.interesting_validator_statuses() {
@ -1235,7 +1302,7 @@ impl ApiTester {
if epoch > current_epoch + 1 {
assert_eq!(
self.client
.get_validator_duties_attester(epoch, Some(&indices))
.post_validator_duties_attester(epoch, indices.as_slice())
.await
.unwrap_err()
.status()
@ -1247,7 +1314,7 @@ impl ApiTester {
let results = self
.client
.get_validator_duties_attester(epoch, Some(&indices))
.post_validator_duties_attester(epoch, indices.as_slice())
.await
.unwrap()
.data;
@ -1336,7 +1403,11 @@ impl ApiTester {
.unwrap();
let pubkey = state.validators[index].pubkey.clone().into();
ProposerData { pubkey, slot }
ProposerData {
pubkey,
validator_index: index as u64,
slot,
}
})
.collect::<Vec<_>>();
@ -1473,17 +1544,17 @@ impl ApiTester {
let fork = head.beacon_state.fork;
let genesis_validators_root = self.chain.genesis_validators_root;
let mut duties = vec![];
for i in 0..self.validator_keypairs.len() {
duties.push(
self.client
.get_validator_duties_attester(epoch, Some(&[i as u64]))
let duties = self
.client
.post_validator_duties_attester(
epoch,
(0..self.validator_keypairs.len() as u64)
.collect::<Vec<u64>>()
.as_slice(),
)
.await
.unwrap()
.data[0]
.clone(),
)
}
.data;
let (i, kp, duty, proof) = self
.validator_keypairs
@ -1554,7 +1625,7 @@ impl ApiTester {
let aggregate = self.get_aggregate().await;
self.client
.post_validator_aggregate_and_proof::<E>(&aggregate)
.post_validator_aggregate_and_proof::<E>(&[aggregate])
.await
.unwrap();
@ -1569,7 +1640,7 @@ impl ApiTester {
aggregate.message.aggregate.data.slot += 1;
self.client
.post_validator_aggregate_and_proof::<E>(&aggregate)
.post_validator_aggregate_and_proof::<E>(&[aggregate])
.await
.unwrap_err();
@ -1700,6 +1771,8 @@ async fn beacon_get() {
.await
.test_beacon_states_validators()
.await
.test_beacon_states_validator_balances()
.await
.test_beacon_states_committees()
.await
.test_beacon_states_validator_id()

View File

@ -28,6 +28,8 @@ pub enum Error {
Reqwest(reqwest::Error),
/// The server returned an error message where the body was able to be parsed.
ServerMessage(ErrorMessage),
/// The server returned an error message with an array of errors.
ServerIndexedMessage(IndexedErrorMessage),
/// The server returned an error message where the body was unable to be parsed.
StatusCode(StatusCode),
/// The supplied URL is badly formatted. It should look something like `http://127.0.0.1:5052`.
@ -50,6 +52,7 @@ impl Error {
match self {
Error::Reqwest(error) => error.status(),
Error::ServerMessage(msg) => StatusCode::try_from(msg.code).ok(),
Error::ServerIndexedMessage(msg) => StatusCode::try_from(msg.code).ok(),
Error::StatusCode(status) => Some(*status),
Error::InvalidUrl(_) => None,
Error::InvalidSecret(_) => None,
@ -137,6 +140,26 @@ impl BeaconNodeHttpClient {
Ok(())
}
/// Perform a HTTP POST request, returning a JSON response.
async fn post_with_response<T: DeserializeOwned, U: IntoUrl, V: Serialize>(
&self,
url: U,
body: &V,
) -> Result<T, Error> {
let response = self
.client
.post(url)
.json(body)
.send()
.await
.map_err(Error::Reqwest)?;
ok_or_error(response)
.await?
.json()
.await
.map_err(Error::Reqwest)
}
/// `GET beacon/genesis`
///
/// ## Errors
@ -210,6 +233,35 @@ impl BeaconNodeHttpClient {
self.get_opt(path).await
}
/// `GET beacon/states/{state_id}/validator_balances?id`
///
/// Returns `Ok(None)` on a 404 error.
pub async fn get_beacon_states_validator_balances(
&self,
state_id: StateId,
ids: Option<&[ValidatorId]>,
) -> Result<Option<GenericResponse<Vec<ValidatorBalanceData>>>, Error> {
let mut path = self.eth_path()?;
path.path_segments_mut()
.map_err(|()| Error::InvalidUrl(self.server.clone()))?
.push("beacon")
.push("states")
.push(&state_id.to_string())
.push("validator_balances");
if let Some(ids) = ids {
let id_string = ids
.iter()
.map(|i| i.to_string())
.collect::<Vec<_>>()
.join(",");
path.query_pairs_mut().append_pair("id", &id_string);
}
self.get_opt(path).await
}
/// `GET beacon/states/{state_id}/validators?id,status`
///
/// Returns `Ok(None)` on a 404 error.
@ -713,37 +765,6 @@ impl BeaconNodeHttpClient {
self.get(path).await
}
/// `GET validator/duties/attester/{epoch}?index`
///
/// ## Note
///
/// The `index` query parameter accepts a list of validator indices.
pub async fn get_validator_duties_attester(
&self,
epoch: Epoch,
index: Option<&[u64]>,
) -> Result<GenericResponse<Vec<AttesterData>>, Error> {
let mut path = self.eth_path()?;
path.path_segments_mut()
.map_err(|()| Error::InvalidUrl(self.server.clone()))?
.push("validator")
.push("duties")
.push("attester")
.push(&epoch.to_string());
if let Some(index) = index {
let string = index
.iter()
.map(|i| i.to_string())
.collect::<Vec<_>>()
.join(",");
path.query_pairs_mut().append_pair("index", &string);
}
self.get(path).await
}
/// `GET validator/duties/proposer/{epoch}`
pub async fn get_validator_duties_proposer(
&self,
@ -830,10 +851,28 @@ impl BeaconNodeHttpClient {
self.get_opt(path).await
}
/// `POST validator/duties/attester/{epoch}`
pub async fn post_validator_duties_attester(
&self,
epoch: Epoch,
indices: &[u64],
) -> Result<GenericResponse<Vec<AttesterData>>, Error> {
let mut path = self.eth_path()?;
path.path_segments_mut()
.map_err(|()| Error::InvalidUrl(self.server.clone()))?
.push("validator")
.push("duties")
.push("attester")
.push(&epoch.to_string());
self.post_with_response(path, &indices).await
}
/// `POST validator/aggregate_and_proofs`
pub async fn post_validator_aggregate_and_proof<T: EthSpec>(
&self,
aggregate: &SignedAggregateAndProof<T>,
aggregates: &[SignedAggregateAndProof<T>],
) -> Result<(), Error> {
let mut path = self.eth_path()?;
@ -842,7 +881,14 @@ impl BeaconNodeHttpClient {
.push("validator")
.push("aggregate_and_proofs");
self.post(path, aggregate).await?;
let response = self
.client
.post(path)
.json(aggregates)
.send()
.await
.map_err(Error::Reqwest)?;
ok_or_indexed_error(response).await?;
Ok(())
}
@ -878,3 +924,17 @@ async fn ok_or_error(response: Response) -> Result<Response, Error> {
Err(Error::StatusCode(status))
}
}
/// Returns `Ok(response)` if the response is a `200 OK` response. Otherwise, creates an
/// appropriate indexed error message.
async fn ok_or_indexed_error(response: Response) -> Result<Response, Error> {
let status = response.status();
if status == StatusCode::OK {
Ok(response)
} else if let Ok(message) = response.json().await {
Err(Error::ServerIndexedMessage(message))
} else {
Err(Error::StatusCode(status))
}
}

View File

@ -18,6 +18,30 @@ pub struct ErrorMessage {
pub stacktraces: Vec<String>,
}
/// An indexed API error serializable to JSON.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct IndexedErrorMessage {
pub code: u16,
pub message: String,
pub failures: Vec<Failure>,
}
/// A single failure in an index of API errors, serializable to JSON.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct Failure {
pub index: u64,
pub message: String,
}
impl Failure {
pub fn new(index: usize, message: String) -> Self {
Self {
index: index as u64,
message,
}
}
}
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct GenesisData {
#[serde(with = "serde_utils::quoted_u64")]
@ -206,6 +230,14 @@ pub struct ValidatorData {
pub validator: Validator,
}
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct ValidatorBalanceData {
#[serde(with = "serde_utils::quoted_u64")]
pub index: u64,
#[serde(with = "serde_utils::quoted_u64")]
pub balance: u64,
}
// TODO: This does not currently match the spec, but I'm going to try and change the spec using
// this proposal:
//
@ -415,10 +447,14 @@ impl<T: FromStr> TryFrom<String> for QueryVec<T> {
}
#[derive(Clone, Deserialize)]
pub struct ValidatorDutiesQuery {
pub index: Option<QueryVec<u64>>,
pub struct ValidatorBalancesQuery {
pub id: Option<QueryVec<ValidatorId>>,
}
#[derive(Clone, Serialize, Deserialize)]
#[serde(transparent)]
pub struct ValidatorIndexData(#[serde(with = "serde_utils::quoted_u64_vec")] pub Vec<u64>);
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct AttesterData {
pub pubkey: PublicKeyBytes,
@ -438,6 +474,8 @@ pub struct AttesterData {
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct ProposerData {
pub pubkey: PublicKeyBytes,
#[serde(with = "serde_utils::quoted_u64")]
pub validator_index: u64,
pub slot: Slot,
}

View File

@ -1,4 +1,4 @@
use eth2::types::ErrorMessage;
use eth2::types::{ErrorMessage, Failure, IndexedErrorMessage};
use std::convert::Infallible;
use warp::{http::StatusCode, reject::Reject};
@ -110,12 +110,37 @@ pub fn invalid_auth(msg: String) -> warp::reject::Rejection {
warp::reject::custom(InvalidAuthorization(msg))
}
#[derive(Debug)]
pub struct IndexedBadRequestErrors {
pub message: String,
pub failures: Vec<Failure>,
}
impl Reject for IndexedBadRequestErrors {}
pub fn indexed_bad_request(message: String, failures: Vec<Failure>) -> warp::reject::Rejection {
warp::reject::custom(IndexedBadRequestErrors { message, failures })
}
/// This function receives a `Rejection` and tries to return a custom
/// value, otherwise simply passes the rejection along.
pub async fn handle_rejection(err: warp::Rejection) -> Result<impl warp::Reply, Infallible> {
let code;
let message;
if let Some(e) = err.find::<crate::reject::IndexedBadRequestErrors>() {
message = format!("BAD_REQUEST: {}", e.message);
code = StatusCode::BAD_REQUEST;
let json = warp::reply::json(&IndexedErrorMessage {
code: code.as_u16(),
message,
failures: e.failures.clone(),
});
return Ok(warp::reply::with_status(json, code));
}
if err.is_not_found() {
code = StatusCode::NOT_FOUND;
message = "NOT_FOUND".to_string();

View File

@ -437,6 +437,8 @@ impl<T: SlotClock + 'static, E: EthSpec> AttestationService<T, E> {
.ok_or_else(|| format!("No aggregate available for {:?}", attestation_data))?
.data;
let mut signed_aggregate_and_proofs = Vec::new();
for duty_and_proof in validator_duties {
let selection_proof = if let Some(proof) = duty_and_proof.selection_proof.as_ref() {
proof
@ -462,44 +464,53 @@ impl<T: SlotClock + 'static, E: EthSpec> AttestationService<T, E> {
continue;
}
let signed_aggregate_and_proof = if let Some(aggregate) =
self.validator_store.produce_signed_aggregate_and_proof(
if let Some(aggregate) = self.validator_store.produce_signed_aggregate_and_proof(
pubkey,
validator_index,
aggregated_attestation.clone(),
selection_proof.clone(),
) {
aggregate
signed_aggregate_and_proofs.push(aggregate);
} else {
crit!(log, "Failed to sign attestation");
continue;
};
}
let attestation = &signed_aggregate_and_proof.message.aggregate;
if !signed_aggregate_and_proofs.is_empty() {
match self
.beacon_node
.post_validator_aggregate_and_proof(&signed_aggregate_and_proof)
.post_validator_aggregate_and_proof(signed_aggregate_and_proofs.as_slice())
.await
{
Ok(()) => info!(
Ok(()) => {
for signed_aggregate_and_proof in signed_aggregate_and_proofs {
let attestation = &signed_aggregate_and_proof.message.aggregate;
info!(
log,
"Successfully published attestation";
"Successfully published attestations";
"aggregator" => signed_aggregate_and_proof.message.aggregator_index,
"signatures" => attestation.aggregation_bits.num_set_bits(),
"head_block" => format!("{:?}", attestation.data.beacon_block_root),
"committee_index" => attestation.data.index,
"slot" => attestation.data.slot.as_u64(),
"type" => "aggregated",
),
Err(e) => crit!(
);
}
}
Err(e) => {
for signed_aggregate_and_proof in signed_aggregate_and_proofs {
let attestation = &signed_aggregate_and_proof.message.aggregate;
crit!(
log,
"Failed to publish attestation";
"error" => e.to_string(),
"committee_index" => attestation.data.index,
"slot" => attestation.data.slot.as_u64(),
"type" => "aggregated",
),
);
}
}
}
}

View File

@ -106,6 +106,10 @@ impl DutyAndProof {
pub fn validator_pubkey(&self) -> &PublicKey {
&self.duty.validator_pubkey
}
pub fn validator_index(&self) -> Option<u64> {
self.duty.validator_index
}
}
impl Into<DutyAndProof> for ValidatorDuty {
@ -229,6 +233,14 @@ impl DutiesStore {
.collect()
}
fn get_index(&self, pubkey: &PublicKey, epoch: Epoch) -> Option<u64> {
self.store
.read()
.get(pubkey)?
.get(&epoch)?
.validator_index()
}
fn is_aggregator(&self, validator_pubkey: &PublicKey, epoch: Epoch) -> Option<bool> {
Some(
self.store
@ -588,13 +600,25 @@ impl<T: SlotClock + 'static, E: EthSpec> DutiesService<T, E> {
let mut replaced = 0;
let mut invalid = 0;
// Determine which pubkeys we already know the index of by checking the duties store for
// the current epoch.
let pubkeys: Vec<(PublicKey, Option<u64>)> = self
.validator_store
.voting_pubkeys()
.into_iter()
.map(|pubkey| {
let index = self.store.get_index(&pubkey, current_epoch);
(pubkey, index)
})
.collect();
let mut validator_subscriptions = vec![];
for pubkey in self.validator_store.voting_pubkeys() {
let remote_duties = match ValidatorDuty::download(
let remote_duties: Vec<ValidatorDuty> = match ValidatorDuty::download(
&self.beacon_node,
current_epoch,
request_epoch,
pubkey,
pubkeys,
&log,
)
.await
{
@ -605,12 +629,13 @@ impl<T: SlotClock + 'static, E: EthSpec> DutiesService<T, E> {
"Failed to download validator duties";
"error" => e
);
continue;
vec![]
}
};
remote_duties.iter().for_each(|remote_duty| {
// Convert the remote duties into our local representation.
let duties: DutyAndProof = remote_duties.clone().into();
let duties: DutyAndProof = remote_duty.clone().into();
let validator_pubkey = duties.duty.validator_pubkey.clone();
@ -628,9 +653,9 @@ impl<T: SlotClock + 'static, E: EthSpec> DutiesService<T, E> {
debug!(
log,
"First duty assignment for validator";
"proposal_slots" => format!("{:?}", &remote_duties.block_proposal_slots),
"attestation_slot" => format!("{:?}", &remote_duties.attestation_slot),
"validator" => format!("{:?}", &remote_duties.validator_pubkey)
"proposal_slots" => format!("{:?}", &remote_duty.block_proposal_slots),
"attestation_slot" => format!("{:?}", &remote_duty.attestation_slot),
"validator" => format!("{:?}", &remote_duty.validator_pubkey)
);
new_validator += 1;
}
@ -645,7 +670,7 @@ impl<T: SlotClock + 'static, E: EthSpec> DutiesService<T, E> {
self.store.is_aggregator(&validator_pubkey, request_epoch)
{
if outcome.is_subscription_candidate() {
if let Some(subscription) = remote_duties.subscription(is_aggregator) {
if let Some(subscription) = remote_duty.subscription(is_aggregator) {
validator_subscriptions.push(subscription)
}
}
@ -657,7 +682,7 @@ impl<T: SlotClock + 'static, E: EthSpec> DutiesService<T, E> {
"error" => e
),
}
}
});
if invalid > 0 {
error!(

View File

@ -3,6 +3,8 @@ use eth2::{
BeaconNodeHttpClient,
};
use serde::{Deserialize, Serialize};
use slog::{error, Logger};
use std::collections::HashMap;
use types::{CommitteeIndex, Epoch, PublicKey, PublicKeyBytes, Slot};
/// This struct is being used as a shim since we deprecated the `rest_api` in favour of `http_api`.
@ -33,10 +35,10 @@ pub struct ValidatorDuty {
impl ValidatorDuty {
/// Instantiate `Self` as if there are no known dutes for `validator_pubkey`.
fn no_duties(validator_pubkey: PublicKey) -> Self {
fn no_duties(validator_pubkey: PublicKey, validator_index: Option<u64>) -> Self {
ValidatorDuty {
validator_pubkey,
validator_index: None,
validator_index,
attestation_slot: None,
attestation_committee_index: None,
attestation_committee_position: None,
@ -53,59 +55,114 @@ impl ValidatorDuty {
beacon_node: &BeaconNodeHttpClient,
current_epoch: Epoch,
request_epoch: Epoch,
pubkey: PublicKey,
) -> Result<ValidatorDuty, String> {
let pubkey_bytes = PublicKeyBytes::from(&pubkey);
let validator_index = if let Some(index) = beacon_node
mut pubkeys: Vec<(PublicKey, Option<u64>)>,
log: &Logger,
) -> Result<Vec<ValidatorDuty>, String> {
for (pubkey, index_opt) in &mut pubkeys {
if index_opt.is_none() {
*index_opt = beacon_node
.get_beacon_states_validator_id(
StateId::Head,
&ValidatorId::PublicKey(pubkey_bytes.clone()),
&ValidatorId::PublicKey(PublicKeyBytes::from(&*pubkey)),
)
.await
.map_err(|e| format!("Failed to get validator index: {}", e))?
.map(|body| body.data.index)
{
index
} else {
return Ok(Self::no_duties(pubkey));
};
.map_err(|e| {
error!(
log,
"Failed to obtain validator index";
"pubkey" => ?pubkey,
"error" => ?e
)
})
// Supress the error since we've already logged an error and we don't want to
// stop the rest of the code.
.ok()
.and_then(|body_opt| body_opt.map(|body| body.data.index));
}
}
if let Some(attester) = beacon_node
.get_validator_duties_attester(request_epoch, Some(&[validator_index]))
.await
.map_err(|e| format!("Failed to get attester duties: {}", e))?
.data
.first()
{
let block_proposal_slots = if current_epoch == request_epoch {
// Query for all block proposer duties in the current epoch and map the response by index.
let proposal_slots_by_index: HashMap<u64, Vec<Slot>> = if current_epoch == request_epoch {
beacon_node
.get_validator_duties_proposer(current_epoch)
.await
.map_err(|e| format!("Failed to get proposer indices: {}", e))?
.data
.map(|resp| resp.data)
// Exit early if there's an error.
.map_err(|e| format!("Failed to get proposer indices: {:?}", e))?
.into_iter()
.filter(|data| data.pubkey == pubkey_bytes)
.map(|data| data.slot)
.collect()
.fold(
HashMap::with_capacity(pubkeys.len()),
|mut map, proposer_data| {
map.entry(proposer_data.validator_index)
.or_insert_with(Vec::new)
.push(proposer_data.slot);
map
},
)
} else {
vec![]
HashMap::new()
};
Ok(ValidatorDuty {
let query_indices = pubkeys
.iter()
.filter_map(|(_, index_opt)| *index_opt)
.collect::<Vec<_>>();
let attester_data_map = beacon_node
.post_validator_duties_attester(request_epoch, query_indices.as_slice())
.await
.map(|resp| resp.data)
// Exit early if there's an error.
.map_err(|e| format!("Failed to get attester duties: {:?}", e))?
.into_iter()
.fold(
HashMap::with_capacity(pubkeys.len()),
|mut map, attester_data| {
map.insert(attester_data.validator_index, attester_data);
map
},
);
let duties = pubkeys
.into_iter()
.map(|(pubkey, index_opt)| {
if let Some(index) = index_opt {
if let Some(attester_data) = attester_data_map.get(&index) {
match attester_data.pubkey.decompress() {
Ok(pubkey) => ValidatorDuty {
validator_pubkey: pubkey,
validator_index: Some(attester.validator_index),
attestation_slot: Some(attester.slot),
attestation_committee_index: Some(attester.committee_index),
attestation_committee_position: Some(attester.validator_committee_index as usize),
committee_count_at_slot: Some(attester.committees_at_slot),
committee_length: Some(attester.committee_length),
block_proposal_slots: Some(block_proposal_slots),
})
} else {
Ok(Self::no_duties(pubkey))
validator_index: Some(attester_data.validator_index),
attestation_slot: Some(attester_data.slot),
attestation_committee_index: Some(attester_data.committee_index),
attestation_committee_position: Some(
attester_data.validator_committee_index as usize,
),
committee_count_at_slot: Some(attester_data.committees_at_slot),
committee_length: Some(attester_data.committee_length),
block_proposal_slots: proposal_slots_by_index
.get(&attester_data.validator_index)
.cloned(),
},
Err(e) => {
error!(
log,
"Could not deserialize validator public key";
"error" => format!("{:?}", e),
"validator_index" => attester_data.validator_index
);
Self::no_duties(pubkey, Some(index))
}
}
} else {
Self::no_duties(pubkey, Some(index))
}
} else {
Self::no_duties(pubkey, None)
}
})
.collect();
Ok(duties)
}
/// Return `true` if these validator duties are equal, ignoring their `block_proposal_slots`.
pub fn eq_ignoring_proposal_slots(&self, other: &Self) -> bool {