Return HTTP 202 to indicate processing error.
- A processing error of a validator's block or attestation should not prevent publishing. Now a 202 error is returned, to indicate that it has not been processed, but has still been published. - Added a publish_attestation function to the API, handling POST requests for /beacon/validator/attestation.
This commit is contained in:
parent
00a5f003c4
commit
23ce271b5f
@ -10,7 +10,8 @@ pub enum ApiError {
|
|||||||
BadRequest(String),
|
BadRequest(String),
|
||||||
NotFound(String),
|
NotFound(String),
|
||||||
UnsupportedType(String),
|
UnsupportedType(String),
|
||||||
ImATeapot(String), // Just in case.
|
ImATeapot(String), // Just in case.
|
||||||
|
ProcessingError(String), // A 202 error, for when a block/attestation cannot be processed, but still transmitted.
|
||||||
}
|
}
|
||||||
|
|
||||||
pub type ApiResult = Result<Response<Body>, ApiError>;
|
pub type ApiResult = Result<Response<Body>, ApiError>;
|
||||||
@ -25,6 +26,7 @@ impl ApiError {
|
|||||||
ApiError::NotFound(desc) => (StatusCode::NOT_FOUND, desc),
|
ApiError::NotFound(desc) => (StatusCode::NOT_FOUND, desc),
|
||||||
ApiError::UnsupportedType(desc) => (StatusCode::UNSUPPORTED_MEDIA_TYPE, desc),
|
ApiError::UnsupportedType(desc) => (StatusCode::UNSUPPORTED_MEDIA_TYPE, desc),
|
||||||
ApiError::ImATeapot(desc) => (StatusCode::IM_A_TEAPOT, desc),
|
ApiError::ImATeapot(desc) => (StatusCode::IM_A_TEAPOT, desc),
|
||||||
|
ApiError::ProcessingError(desc) => (StatusCode::ACCEPTED, desc),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -34,7 +36,7 @@ impl Into<Response<Body>> for ApiError {
|
|||||||
let status_code = self.status_code();
|
let status_code = self.status_code();
|
||||||
Response::builder()
|
Response::builder()
|
||||||
.status(status_code.0)
|
.status(status_code.0)
|
||||||
.header("content-type", "text/plain")
|
.header("content-type", "text/plain; charset=utf-8")
|
||||||
.body(Body::from(status_code.1))
|
.body(Body::from(status_code.1))
|
||||||
.expect("Response should always be created.")
|
.expect("Response should always be created.")
|
||||||
}
|
}
|
||||||
|
@ -2,7 +2,9 @@ use crate::{ApiError, ApiResult};
|
|||||||
use beacon_chain::{BeaconChain, BeaconChainTypes};
|
use beacon_chain::{BeaconChain, BeaconChainTypes};
|
||||||
use bls::PublicKey;
|
use bls::PublicKey;
|
||||||
use eth2_libp2p::{PubsubMessage, Topic};
|
use eth2_libp2p::{PubsubMessage, Topic};
|
||||||
use eth2_libp2p::{BEACON_BLOCK_TOPIC, TOPIC_ENCODING_POSTFIX, TOPIC_PREFIX};
|
use eth2_libp2p::{
|
||||||
|
BEACON_ATTESTATION_TOPIC, BEACON_BLOCK_TOPIC, TOPIC_ENCODING_POSTFIX, TOPIC_PREFIX,
|
||||||
|
};
|
||||||
use hex;
|
use hex;
|
||||||
use http::header;
|
use http::header;
|
||||||
use hyper::{Body, Request};
|
use hyper::{Body, Request};
|
||||||
@ -12,7 +14,7 @@ use ssz::Encode;
|
|||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use store::{iter::AncestorIter, Store};
|
use store::{iter::AncestorIter, Store};
|
||||||
use tokio::sync::mpsc;
|
use tokio::sync::mpsc;
|
||||||
use types::{BeaconBlock, BeaconState, EthSpec, Hash256, RelativeEpoch, Slot};
|
use types::{Attestation, BeaconBlock, BeaconState, EthSpec, Hash256, RelativeEpoch, Slot};
|
||||||
|
|
||||||
/// Parse a slot from a `0x` preixed string.
|
/// Parse a slot from a `0x` preixed string.
|
||||||
///
|
///
|
||||||
@ -227,7 +229,7 @@ pub fn publish_beacon_block_to_network<T: BeaconChainTypes + 'static>(
|
|||||||
// Publish the block to the p2p network via gossipsub.
|
// Publish the block to the p2p network via gossipsub.
|
||||||
if let Err(e) = chan.write().try_send(NetworkMessage::Publish {
|
if let Err(e) = chan.write().try_send(NetworkMessage::Publish {
|
||||||
topics: vec![topic],
|
topics: vec![topic],
|
||||||
message: message,
|
message,
|
||||||
}) {
|
}) {
|
||||||
return Err(ApiError::ServerError(format!(
|
return Err(ApiError::ServerError(format!(
|
||||||
"Unable to send new block to network: {:?}",
|
"Unable to send new block to network: {:?}",
|
||||||
@ -238,6 +240,32 @@ pub fn publish_beacon_block_to_network<T: BeaconChainTypes + 'static>(
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn publish_attestation_to_network<T: BeaconChainTypes + 'static>(
|
||||||
|
chan: Arc<RwLock<mpsc::UnboundedSender<NetworkMessage>>>,
|
||||||
|
attestation: Attestation<T::EthSpec>,
|
||||||
|
) -> Result<(), ApiError> {
|
||||||
|
// create the network topic to send on
|
||||||
|
let topic_string = format!(
|
||||||
|
"/{}/{}/{}",
|
||||||
|
TOPIC_PREFIX, BEACON_ATTESTATION_TOPIC, TOPIC_ENCODING_POSTFIX
|
||||||
|
);
|
||||||
|
let topic = Topic::new(topic_string);
|
||||||
|
let message = PubsubMessage::Attestation(attestation.as_ssz_bytes());
|
||||||
|
|
||||||
|
// Publish the attestation to the p2p network via gossipsub.
|
||||||
|
if let Err(e) = chan.write().try_send(NetworkMessage::Publish {
|
||||||
|
topics: vec![topic],
|
||||||
|
message,
|
||||||
|
}) {
|
||||||
|
return Err(ApiError::ServerError(format!(
|
||||||
|
"Unable to send new attestation to network: {:?}",
|
||||||
|
e
|
||||||
|
)));
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod test {
|
mod test {
|
||||||
use super::*;
|
use super::*;
|
||||||
|
@ -140,7 +140,7 @@ impl<T: BeaconChainTypes> Service for ApiService<T> {
|
|||||||
into_boxfut(validator::get_new_attestation::<T>(req))
|
into_boxfut(validator::get_new_attestation::<T>(req))
|
||||||
}
|
}
|
||||||
(&Method::POST, "/beacon/validator/attestation") => {
|
(&Method::POST, "/beacon/validator/attestation") => {
|
||||||
into_boxfut(helpers::implementation_pending_response(req))
|
validator::publish_attestation::<T>(req)
|
||||||
}
|
}
|
||||||
|
|
||||||
(&Method::GET, "/beacon/state") => into_boxfut(beacon::get_state::<T>(req)),
|
(&Method::GET, "/beacon/state") => into_boxfut(beacon::get_state::<T>(req)),
|
||||||
@ -164,6 +164,7 @@ impl<T: BeaconChainTypes> Service for ApiService<T> {
|
|||||||
(&Method::GET, "/spec/eth2_config") => into_boxfut(spec::get_eth2_config::<T>(req)),
|
(&Method::GET, "/spec/eth2_config") => into_boxfut(spec::get_eth2_config::<T>(req)),
|
||||||
|
|
||||||
(&Method::GET, "/metrics") => into_boxfut(metrics::get_prometheus::<T>(req)),
|
(&Method::GET, "/metrics") => into_boxfut(metrics::get_prometheus::<T>(req)),
|
||||||
|
|
||||||
_ => Box::new(futures::future::err(ApiError::NotFound(
|
_ => Box::new(futures::future::err(ApiError::NotFound(
|
||||||
"Request path and/or method not found.".to_owned(),
|
"Request path and/or method not found.".to_owned(),
|
||||||
))),
|
))),
|
||||||
|
@ -1,10 +1,10 @@
|
|||||||
use crate::helpers::{
|
use crate::helpers::{
|
||||||
check_content_type_for_json, get_beacon_chain_from_request, get_logger_from_request,
|
check_content_type_for_json, get_beacon_chain_from_request, get_logger_from_request,
|
||||||
parse_pubkey, publish_beacon_block_to_network,
|
parse_pubkey, publish_attestation_to_network, publish_beacon_block_to_network,
|
||||||
};
|
};
|
||||||
use crate::response_builder::ResponseBuilder;
|
use crate::response_builder::ResponseBuilder;
|
||||||
use crate::{ApiError, ApiResult, BoxFut, UrlQuery};
|
use crate::{ApiError, ApiResult, BoxFut, UrlQuery};
|
||||||
use beacon_chain::{BeaconChainTypes, BlockProcessingOutcome};
|
use beacon_chain::{AttestationProcessingOutcome, BeaconChainTypes, BlockProcessingOutcome};
|
||||||
use bls::{AggregateSignature, PublicKey, Signature};
|
use bls::{AggregateSignature, PublicKey, Signature};
|
||||||
use futures::future::Future;
|
use futures::future::Future;
|
||||||
use futures::stream::Stream;
|
use futures::stream::Stream;
|
||||||
@ -228,16 +228,16 @@ pub fn publish_beacon_block<T: BeaconChainTypes + 'static>(req: Request<Body>) -
|
|||||||
publish_beacon_block_to_network::<T>(network_chan, block)
|
publish_beacon_block_to_network::<T>(network_chan, block)
|
||||||
}
|
}
|
||||||
Ok(outcome) => {
|
Ok(outcome) => {
|
||||||
warn!(log, "Block could not be processed, but is being sent to the network anyway."; "block_slot" => slot, "outcome" => format!("{:?}", outcome));
|
warn!(log, "BeaconBlock could not be processed, but is being sent to the network anyway."; "outcome" => format!("{:?}", outcome));
|
||||||
//TODO need to send to network and return http 202
|
publish_beacon_block_to_network::<T>(network_chan, block)?;
|
||||||
Err(ApiError::BadRequest(format!(
|
Err(ApiError::ProcessingError(format!(
|
||||||
"The BeaconBlock could not be processed: {:?}",
|
"The BeaconBlock could not be processed, but has still been published: {:?}",
|
||||||
outcome
|
outcome
|
||||||
)))
|
)))
|
||||||
}
|
}
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
Err(ApiError::ServerError(format!(
|
Err(ApiError::ServerError(format!(
|
||||||
"Unable to process block: {:?}",
|
"Error while processing block: {:?}",
|
||||||
e
|
e
|
||||||
)))
|
)))
|
||||||
}
|
}
|
||||||
@ -351,3 +351,61 @@ pub fn get_new_attestation<T: BeaconChainTypes + 'static>(req: Request<Body>) ->
|
|||||||
|
|
||||||
ResponseBuilder::new(&req)?.body(&attestation)
|
ResponseBuilder::new(&req)?.body(&attestation)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// HTTP Handler to publish an Attestation, which has been signed by a validator.
|
||||||
|
pub fn publish_attestation<T: BeaconChainTypes + 'static>(req: Request<Body>) -> BoxFut {
|
||||||
|
let _ = try_future!(check_content_type_for_json(&req));
|
||||||
|
let log = get_logger_from_request(&req);
|
||||||
|
let beacon_chain = try_future!(get_beacon_chain_from_request::<T>(&req));
|
||||||
|
// Get the network sending channel from the request, for later transmission
|
||||||
|
let network_chan = req
|
||||||
|
.extensions()
|
||||||
|
.get::<Arc<RwLock<mpsc::UnboundedSender<NetworkMessage>>>>()
|
||||||
|
.expect("Should always get the network channel from the request, since we put it in there.")
|
||||||
|
.clone();
|
||||||
|
|
||||||
|
let response_builder = ResponseBuilder::new(&req);
|
||||||
|
|
||||||
|
let body = req.into_body();
|
||||||
|
trace!(
|
||||||
|
log,
|
||||||
|
"Got the request body, now going to parse it into an attesation."
|
||||||
|
);
|
||||||
|
Box::new(body
|
||||||
|
.concat2()
|
||||||
|
.map_err(|e| ApiError::ServerError(format!("Unable to get request body: {:?}",e)))
|
||||||
|
.map(|chunk| chunk.iter().cloned().collect::<Vec<u8>>())
|
||||||
|
.and_then(|chunks| {
|
||||||
|
serde_json::from_slice(&chunks.as_slice()).map_err(|e| {
|
||||||
|
ApiError::BadRequest(format!(
|
||||||
|
"Unable to deserialize JSON into a BeaconBlock: {:?}",
|
||||||
|
e
|
||||||
|
))
|
||||||
|
})
|
||||||
|
})
|
||||||
|
.and_then(move |attestation: Attestation<T::EthSpec>| {
|
||||||
|
match beacon_chain.process_attestation(attestation.clone()) {
|
||||||
|
Ok(AttestationProcessingOutcome::Processed) => {
|
||||||
|
// Block was processed, publish via gossipsub
|
||||||
|
info!(log, "Processed valid attestation from API, transmitting to network.");
|
||||||
|
publish_attestation_to_network::<T>(network_chan, attestation)
|
||||||
|
}
|
||||||
|
Ok(outcome) => {
|
||||||
|
warn!(log, "Attestation could not be processed, but is being sent to the network anyway."; "outcome" => format!("{:?}", outcome));
|
||||||
|
publish_attestation_to_network::<T>(network_chan, attestation)?;
|
||||||
|
Err(ApiError::ProcessingError(format!(
|
||||||
|
"The Attestation could not be processed, but has still been published: {:?}",
|
||||||
|
outcome
|
||||||
|
)))
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
Err(ApiError::ServerError(format!(
|
||||||
|
"Error while processing attestation: {:?}",
|
||||||
|
e
|
||||||
|
)))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}).and_then(|_| {
|
||||||
|
response_builder?.body_no_ssz(&())
|
||||||
|
}))
|
||||||
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user