Further restructuring futures API.

- Adding try_future! macros where necessary
 - Returning ApiResult and mapping it to future instead
 - Upgrading POST publish block to return a future directly
This commit is contained in:
Luke Anderson 2019-09-11 18:02:00 +10:00
parent ebd97730d5
commit 2739ee83f9
No known key found for this signature in database
GPG Key ID: 44408169EC61E228
5 changed files with 68 additions and 60 deletions

View File

@ -203,7 +203,7 @@ pub fn get_state<T: BeaconChainTypes + 'static>(req: Request<Body>) -> BoxFut {
try_future!(parse_slot(&value))
)),
("root", value) => {
let root = &try_future!(parse_root(&value));
let root: &Hash256 = &try_future!(parse_root(&value));
let state = try_future!(try_future!(beacon_chain.store.get(root))
.ok_or_else(|| ApiError::NotFound(format!("No state for root: {:?}", root))));

View File

@ -23,6 +23,13 @@ pub fn success_response<T: Serialize + Encode>(req: Request<Body>, item: &T) ->
})
}
pub fn success_response_2<T: Serialize + Encode>(req: Request<Body>, item: &T) -> ApiResult {
ResponseBuilder::new(&req).body(item)
}
pub fn success_response_2_json<T: Serialize>(req: Request<Body>, item: &T) -> ApiResult {
ResponseBuilder::new(&req).body_json(item)
}
pub fn success_response_json<T: Serialize>(req: Request<Body>, item: &T) -> BoxFut {
if let Err(e) = check_content_type_for_json(&req) {
return Box::new(futures::future::err(e));
@ -213,11 +220,10 @@ pub fn state_root_at_slot<T: BeaconChainTypes>(
}
}
pub fn implementation_pending_response(_req: Request<Body>) -> BoxFut {
ApiError::NotImplemented(
pub fn implementation_pending_response(_req: Request<Body>) -> ApiResult {
Err(ApiError::NotImplemented(
"API endpoint has not yet been implemented, but is planned to be soon.".to_owned(),
)
.into()
))
}
pub fn get_beacon_chain_from_request<T: BeaconChainTypes + 'static>(

View File

@ -33,6 +33,7 @@ use std::sync::Arc;
use tokio::runtime::TaskExecutor;
use tokio::sync::mpsc;
use url_query::UrlQuery;
use futures::future::IntoFuture;
pub use beacon::{BlockResponse, HeadResponse, StateResponse};
pub use config::Config as ApiConfig;
@ -81,9 +82,10 @@ impl<T: BeaconChainTypes> Service for ApiService<T> {
// Route the request to the correct handler.
let result = match (req.method(), path.as_ref()) {
// Methods for Client
(&Method::GET, "/node/version") => node::get_version(req),
(&Method::GET, "/node/genesis_time") => node::get_genesis_time::<T>(req),
(&Method::GET, "/node/syncing") => helpers::implementation_pending_response(req),
(&Method::GET, "/node/version") => Box::new(node::get_version(req).into_future()),
(&Method::GET, "/node/genesis_time") => Box::new(node::get_genesis_time::<T>(req).into_future()),
(&Method::GET, "/node/syncing") => Box::new(helpers::implementation_pending_response(req).into_future()),
/*
// Methods for Network
(&Method::GET, "/network/enr") => network::get_enr::<T>(req),
@ -112,11 +114,12 @@ impl<T: BeaconChainTypes> Service for ApiService<T> {
helpers::implementation_pending_response(req)
}
/*
// Methods for Validator
(&Method::GET, "/beacon/validator/duties") => validator::get_validator_duties::<T>(req),
(&Method::GET, "/beacon/validator/block") => validator::get_new_beacon_block::<T>(req),
//(&Method::POST, "/beacon/validator/block") => validator::publish_beacon_block::<T>(req),
*/
(&Method::POST, "/beacon/validator/block") => validator::publish_beacon_block::<T>(req),
/*
(&Method::GET, "/beacon/validator/attestation") => {
validator::get_new_attestation::<T>(req)
}

View File

@ -5,19 +5,13 @@ use hyper::{Body, Request};
use version;
/// Read the version string from the current Lighthouse build.
pub fn get_version(req: Request<Body>) -> BoxFut {
success_response_json(req, &version::version())
pub fn get_version(req: Request<Body>) -> ApiResult {
success_response_2_json(req, &version::version())
}
/// Read the genesis time from the current beacon chain state.
pub fn get_genesis_time<T: BeaconChainTypes + 'static>(req: Request<Body>) -> BoxFut {
let bc = get_beacon_chain_from_request::<T>(&req);
let (_beacon_chain, head_state) = match bc {
Ok((bc, hs)) => (bc, hs),
Err(e) => {
return e.into();
}
};
pub fn get_genesis_time<T: BeaconChainTypes + 'static>(req: Request<Body>) -> ApiResult {
let (_beacon_chain, head_state) = get_beacon_chain_from_request::<T>(&req)?;
let gen_time: u64 = head_state.genesis_time;
success_response(req, &gen_time)
success_response_2(req, &gen_time)
}

View File

@ -1,5 +1,6 @@
use crate::helpers::*;
use crate::{ApiError, ApiResult, UrlQuery};
use crate::response_builder::ResponseBuilder;
use crate::{ApiError, ApiResult, UrlQuery, BoxFut};
use beacon_chain::{BeaconChainTypes, BlockProcessingOutcome};
use bls::{AggregateSignature, PublicKey, Signature};
use futures::future::Future;
@ -204,10 +205,10 @@ pub fn get_new_beacon_block<T: BeaconChainTypes + 'static>(req: Request<Body>) -
}
/// HTTP Handler to publish a BeaconBlock, which has been signed by a validator.
pub fn publish_beacon_block<T: BeaconChainTypes + 'static>(req: Request<Body>) -> ApiResult {
let _ = check_content_type_for_json(&req)?;
pub fn publish_beacon_block<T: BeaconChainTypes + 'static>(req: Request<Body>) -> BoxFut {
let _ = try_future!(check_content_type_for_json(&req));
let log = get_logger_from_request(&req);
let (beacon_chain, _head_state) = get_beacon_chain_from_request::<T>(&req)?;
let (beacon_chain, _head_state) = try_future!(get_beacon_chain_from_request::<T>(&req));
// Get the network sending channel from the request, for later transmission
let network_chan = req
.extensions()
@ -215,49 +216,53 @@ pub fn publish_beacon_block<T: BeaconChainTypes + 'static>(req: Request<Body>) -
.expect("Should always get the network channel from the request, since we put it in there.")
.clone();
let response_builder = ResponseBuilder::new(&req);
let body = req.into_body();
trace!(
log,
"Got the request body, now going to parse it into a block."
);
let block_future = body
Box::new(body
.concat2()
.map(move |chunk| chunk.iter().cloned().collect::<Vec<u8>>())
.map(|chunks| {
let block_result: Result<BeaconBlock<T::EthSpec>, ApiError> =
serde_json::from_slice(&chunks.as_slice()).map_err(|e| {
ApiError::InvalidQueryParams(format!(
"Unable to deserialize JSON into a BeaconBlock: {:?}",
.map_err(|e| ApiError::ServerError(format!("Unable to get request body: {:?}",e)))
.map(|chunk| chunk.iter().cloned().collect::<Vec<u8>>())
.and_then(|chunks| {
serde_json::from_slice(&chunks.as_slice()).map_err(|e| {
ApiError::InvalidQueryParams(format!(
"Unable to deserialize JSON into a BeaconBlock: {:?}",
e
))
})
})
.and_then(move |block: BeaconBlock<T::EthSpec>| {
let slot = block.slot;
match beacon_chain.process_block(block.clone()) {
Ok(BlockProcessingOutcome::Processed { block_root }) => {
// Block was processed, publish via gossipsub
info!(log, "Processed valid block from API, transmitting to network."; "block_slot" => slot, "block_root" => format!("{}", block_root));
publish_beacon_block_to_network::<T>(network_chan, block)
}
Ok(outcome) => {
warn!(log, "Block could not be processed, but is being sent to the network anyway."; "block_slot" => slot, "outcome" => format!("{:?}", outcome));
//TODO need to send to network and return http 202
Err(ApiError::InvalidQueryParams(format!(
"The BeaconBlock could not be processed: {:?}",
outcome
)))
}
Err(e) => {
Err(ApiError::ServerError(format!(
"Unable to process block: {:?}",
e
))
});
block_result
});
let block = block_future.wait()??;
trace!(log, "BeaconBlock successfully parsed from JSON");
match beacon_chain.process_block(block.clone()) {
Ok(BlockProcessingOutcome::Processed { block_root }) => {
// Block was processed, publish via gossipsub
info!(log, "Processed valid block from API, transmitting to network."; "block_slot" => block.slot, "block_root" => format!("{}", block_root));
publish_beacon_block_to_network::<T>(network_chan, block)?;
}
Ok(outcome) => {
warn!(log, "Block could not be processed, but is being sent to the network anyway."; "block_slot" => block.slot, "outcome" => format!("{:?}", outcome));
//TODO need to send to network and return http 202
return Err(ApiError::InvalidQueryParams(format!(
"The BeaconBlock could not be processed: {:?}",
outcome
)));
}
Err(e) => {
return Err(ApiError::ServerError(format!(
"Unable to process block: {:?}",
e
)));
}
}
)))
}
}
}).and_then(|_| {
response_builder.body_json(&())
}))
Ok(success_response_old(Body::empty()))
}
/// HTTP Handler to produce a new Attestation from the current state, ready to be signed by a validator.