diff --git a/beacon_node/rest_api/src/beacon.rs b/beacon_node/rest_api/src/beacon.rs index 66f5e7731..4c57e4770 100644 --- a/beacon_node/rest_api/src/beacon.rs +++ b/beacon_node/rest_api/src/beacon.rs @@ -1,5 +1,6 @@ -use super::{success_response, ApiResult, ResponseBuilder}; -use crate::{helpers::*, ApiError, UrlQuery}; +use crate::helpers::*; +use crate::response_builder::ResponseBuilder; +use crate::{ApiError, ApiResult, BoxFut, NetworkService, UrlQuery}; use beacon_chain::{BeaconChain, BeaconChainTypes}; use hyper::{Body, Request}; use serde::Serialize; @@ -57,7 +58,7 @@ pub fn get_head(req: Request) -> ApiResult let json: String = serde_json::to_string(&head) .map_err(|e| ApiError::ServerError(format!("Unable to serialize HeadResponse: {:?}", e)))?; - Ok(success_response(Body::from(json))) + Ok(success_response_old(Body::from(json))) } #[derive(Serialize, Encode)] @@ -121,7 +122,7 @@ pub fn get_block_root(req: Request) -> ApiR let json: String = serde_json::to_string(&root) .map_err(|e| ApiError::ServerError(format!("Unable to serialize root: {:?}", e)))?; - Ok(success_response(Body::from(json))) + Ok(success_response_old(Body::from(json))) } /// HTTP handler to return the `Fork` of the current head. @@ -132,7 +133,7 @@ pub fn get_fork(req: Request) -> ApiResult ApiError::ServerError(format!("Unable to serialize BeaconState::Fork: {:?}", e)) })?; - Ok(success_response(Body::from(json))) + Ok(success_response_old(Body::from(json))) } /// HTTP handler to return the set of validators for an `Epoch` @@ -243,7 +244,7 @@ pub fn get_state_root(req: Request) -> ApiR let json: String = serde_json::to_string(&root) .map_err(|e| ApiError::ServerError(format!("Unable to serialize root: {:?}", e)))?; - Ok(success_response(Body::from(json))) + Ok(success_response_old(Body::from(json))) } /// HTTP handler to return the highest finalized slot. @@ -261,7 +262,7 @@ pub fn get_current_finalized_checkpoint( let json: String = serde_json::to_string(&checkpoint) .map_err(|e| ApiError::ServerError(format!("Unable to serialize checkpoint: {:?}", e)))?; - Ok(success_response(Body::from(json))) + Ok(success_response_old(Body::from(json))) } /// HTTP handler to return a `BeaconState` at the genesis block. diff --git a/beacon_node/rest_api/src/error.rs b/beacon_node/rest_api/src/error.rs index b6b1bbfb5..138affae4 100644 --- a/beacon_node/rest_api/src/error.rs +++ b/beacon_node/rest_api/src/error.rs @@ -1,3 +1,4 @@ +use crate::BoxFut; use hyper::{Body, Method, Request, Response, Server, StatusCode}; use std::error::Error as StdError; @@ -37,6 +38,12 @@ impl Into> for ApiError { } } +impl Into for ApiError { + fn into(self) -> BoxFut { + Box::new(futures::future::err(self)) + } +} + impl From for ApiError { fn from(e: store::Error) -> ApiError { ApiError::ServerError(format!("Database error: {:?}", e)) @@ -55,6 +62,12 @@ impl From for ApiError { } } +impl From for ApiError { + fn from(e: hyper::error::Error) -> ApiError { + ApiError::ServerError(format!("Networking error: {:?}", e)) + } +} + impl StdError for ApiError { fn cause(&self) -> Option<&StdError> { None diff --git a/beacon_node/rest_api/src/helpers.rs b/beacon_node/rest_api/src/helpers.rs index bff7d9ece..006deb268 100644 --- a/beacon_node/rest_api/src/helpers.rs +++ b/beacon_node/rest_api/src/helpers.rs @@ -1,19 +1,46 @@ -use crate::{ApiError, ApiResult}; +use crate::response_builder::ResponseBuilder; +use crate::{ApiError, ApiResult, BoxFut}; use beacon_chain::{BeaconChain, BeaconChainTypes}; use bls::PublicKey; use eth2_libp2p::{PubsubMessage, Topic}; use eth2_libp2p::{BEACON_BLOCK_TOPIC, TOPIC_ENCODING_POSTFIX, TOPIC_PREFIX}; use hex; use http::header; -use hyper::{Body, Request}; +use hyper::{Body, Request, Response, StatusCode}; use network::NetworkMessage; use parking_lot::RwLock; +use serde::Serialize; use ssz::Encode; use std::sync::Arc; use store::{iter::AncestorIter, Store}; use tokio::sync::mpsc; use types::{BeaconBlock, BeaconState, EthSpec, Hash256, RelativeEpoch, Slot}; +pub fn success_response(req: Request, item: &T) -> BoxFut { + Box::new(match ResponseBuilder::new(&req).body(item) { + Ok(resp) => futures::future::ok(resp), + Err(e) => futures::future::err(e), + }) +} + +pub fn success_response_json(req: Request, item: &T) -> BoxFut { + if let Err(e) = check_content_type_for_json(&req) { + return Box::new(futures::future::err(e)); + } + Box::new(match ResponseBuilder::new(&req).body_json(item) { + Ok(resp) => futures::future::ok(resp), + Err(e) => futures::future::err(e), + }) +} + +pub fn success_response_old(body: Body) -> Response { + Response::builder() + .status(StatusCode::OK) + .header("content-type", "application/json") + .body(body) + .expect("We should always be able to make response from the success body.") +} + /// Parse a slot from a `0x` preixed string. /// /// E.g., `"1234"` @@ -24,6 +51,21 @@ pub fn parse_slot(string: &str) -> Result { .map_err(|e| ApiError::InvalidQueryParams(format!("Unable to parse slot: {:?}", e))) } +/// Checks the provided request to ensure that the `content-type` header. +/// +/// The content-type header should either be omitted, in which case JSON is assumed, or it should +/// explicity specify `application/json`. If anything else is provided, an error is returned. +pub fn check_content_type_for_json(req: &Request) -> Result<(), ApiError> { + match req.headers().get(header::CONTENT_TYPE) { + Some(h) if h == "application/json" => Ok(()), + Some(h) => Err(ApiError::InvalidQueryParams(format!( + "The provided content-type {:?} is not available, this endpoint only supports json.", + h + ))), + _ => Ok(()), + } +} + /// Parse a root from a `0x` preixed string. /// /// E.g., `"0x0000000000000000000000000000000000000000000000000000000000000000"` @@ -42,21 +84,6 @@ pub fn parse_root(string: &str) -> Result { } } -/// Checks the provided request to ensure that the `content-type` header. -/// -/// The content-type header should either be omitted, in which case JSON is assumed, or it should -/// explicity specify `application/json`. If anything else is provided, an error is returned. -pub fn check_content_type_for_json(req: &Request) -> Result<(), ApiError> { - match req.headers().get(header::CONTENT_TYPE) { - Some(h) if h == "application/json" => Ok(()), - Some(h) => Err(ApiError::InvalidQueryParams(format!( - "The provided content-type {:?} is not available, it must be JSON.", - h - ))), - _ => Ok(()), - } -} - /// Parse a PublicKey from a `0x` prefixed hex string pub fn parse_pubkey(string: &str) -> Result { const PREFIX: &str = "0x"; @@ -186,10 +213,11 @@ pub fn state_root_at_slot( } } -pub fn implementation_pending_response(_req: Request) -> ApiResult { - Err(ApiError::NotImplemented( +pub fn implementation_pending_response(_req: Request) -> BoxFut { + ApiError::NotImplemented( "API endpoint has not yet been implemented, but is planned to be soon.".to_owned(), - )) + ) + .into() } pub fn get_beacon_chain_from_request( diff --git a/beacon_node/rest_api/src/lib.rs b/beacon_node/rest_api/src/lib.rs index 0852dd1a3..dc4abc2bf 100644 --- a/beacon_node/rest_api/src/lib.rs +++ b/beacon_node/rest_api/src/lib.rs @@ -24,7 +24,6 @@ use hyper::server::conn::AddrStream; use hyper::service::{MakeService, Service}; use hyper::{Body, Method, Request, Response, Server, StatusCode}; use parking_lot::RwLock; -use response_builder::ResponseBuilder; use slog::{info, o, warn}; use std::ops::Deref; use std::path::PathBuf; @@ -59,6 +58,8 @@ impl Service for ApiService { metrics::inc_counter(&metrics::REQUEST_COUNT); let timer = metrics::start_timer(&metrics::REQUEST_RESPONSE_TIME); + // Add all the useful bits into the request, so that we can pull them out in the individual + // functions. req.extensions_mut() .insert::(self.log.clone()); req.extensions_mut() @@ -90,6 +91,7 @@ impl Service for ApiService { (&Method::GET, "/network/listen_port") => network::get_listen_port::(req), (&Method::GET, "/network/listen_addresses") => network::get_listen_addresses::(req), + /* // Methods for Beacon Node (&Method::GET, "/beacon/head") => beacon::get_head::(req), (&Method::GET, "/beacon/block") => beacon::get_block::(req), @@ -137,13 +139,13 @@ impl Service for ApiService { (&Method::GET, "/spec/eth2_config") => spec::get_eth2_config::(req), (&Method::GET, "/metrics") => metrics::get_prometheus::(req), - - _ => Err(ApiError::NotFound( + */ + _ => Box::new(futures::future::err(ApiError::NotFound( "Request path and/or method not found.".to_owned(), - )), + ))), }; - let response = match result { + let response = match result.wait() { // Return the `hyper::Response`. Ok(response) => { metrics::inc_counter(&metrics::SUCCESS_COUNT); @@ -228,14 +230,6 @@ pub fn start_server( Ok(exit_signal) } -fn success_response(body: Body) -> Response { - Response::builder() - .status(StatusCode::OK) - .header("content-type", "application/json") - .body(body) - .expect("We should always be able to make response from the success body.") -} - #[derive(Clone)] pub struct DBPath(PathBuf); diff --git a/beacon_node/rest_api/src/metrics.rs b/beacon_node/rest_api/src/metrics.rs index 62a769de1..d430db3f5 100644 --- a/beacon_node/rest_api/src/metrics.rs +++ b/beacon_node/rest_api/src/metrics.rs @@ -1,4 +1,5 @@ -use crate::{helpers::*, success_response, ApiError, ApiResult, DBPath}; +use crate::helpers::*; +use crate::{ApiError, ApiResult, DBPath}; use beacon_chain::BeaconChainTypes; use http::HeaderValue; use hyper::{Body, Request}; @@ -62,7 +63,7 @@ pub fn get_prometheus(req: Request) -> ApiR String::from_utf8(buffer) .map(|string| { - let mut response = success_response(Body::from(string)); + let mut response = success_response_old(Body::from(string)); // Need to change the header to text/plain for prometheus response.headers_mut().insert( "content-type", diff --git a/beacon_node/rest_api/src/network.rs b/beacon_node/rest_api/src/network.rs index 4f1f53bb9..e037d43f0 100644 --- a/beacon_node/rest_api/src/network.rs +++ b/beacon_node/rest_api/src/network.rs @@ -1,4 +1,5 @@ -use crate::{success_response, ApiError, ApiResult, NetworkService}; +use crate::helpers::*; +use crate::{ApiError, BoxFut, NetworkService}; use beacon_chain::BeaconChainTypes; use eth2_libp2p::{Enr, Multiaddr, PeerId}; use hyper::{Body, Request}; @@ -7,92 +8,75 @@ use std::sync::Arc; /// HTTP handler to return the list of libp2p multiaddr the client is listening on. /// /// Returns a list of `Multiaddr`, serialized according to their `serde` impl. -pub fn get_listen_addresses(req: Request) -> ApiResult { +pub fn get_listen_addresses(req: Request) -> BoxFut { let network = req .extensions() .get::>>() - .ok_or_else(|| ApiError::ServerError("NetworkService extension missing".to_string()))?; - + .expect("The network service should always be there, we put it there"); let multiaddresses: Vec = network.listen_multiaddrs(); - - Ok(success_response(Body::from( - serde_json::to_string(&multiaddresses) - .map_err(|e| ApiError::ServerError(format!("Unable to serialize Enr: {:?}", e)))?, - ))) + success_response_json(req, &multiaddresses) } /// HTTP handler to return the network port the client is listening on. /// /// Returns the TCP port number in its plain form (which is also valid JSON serialization) -pub fn get_listen_port(req: Request) -> ApiResult { +pub fn get_listen_port(req: Request) -> BoxFut { let network = req .extensions() .get::>>() - .ok_or_else(|| ApiError::ServerError("NetworkService extension missing".to_string()))?; + .expect("The network service should always be there, we put it there") + .clone(); - Ok(success_response(Body::from( - serde_json::to_string(&network.listen_port()) - .map_err(|e| ApiError::ServerError(format!("Unable to serialize port: {:?}", e)))?, - ))) + success_response(req, &network.listen_port()) } /// HTTP handler to return the Discv5 ENR from the client's libp2p service. /// /// ENR is encoded as base64 string. -pub fn get_enr(req: Request) -> ApiResult { +pub fn get_enr(req: Request) -> BoxFut { let network = req .extensions() .get::>>() - .ok_or_else(|| ApiError::ServerError("NetworkService extension missing".to_string()))?; + .expect("The network service should always be there, we put it there"); let enr: Enr = network.local_enr(); - - Ok(success_response(Body::from( - serde_json::to_string(&enr.to_base64()) - .map_err(|e| ApiError::ServerError(format!("Unable to serialize Enr: {:?}", e)))?, - ))) + success_response_json(req, &enr.to_base64()) } /// HTTP handler to return the `PeerId` from the client's libp2p service. /// /// PeerId is encoded as base58 string. -pub fn get_peer_id(req: Request) -> ApiResult { +pub fn get_peer_id(req: Request) -> BoxFut { let network = req .extensions() .get::>>() - .ok_or_else(|| ApiError::ServerError("NetworkService extension missing".to_string()))?; + .expect("The network service should always be there, we put it there"); let peer_id: PeerId = network.local_peer_id(); - Ok(success_response(Body::from( - serde_json::to_string(&peer_id.to_base58()) - .map_err(|e| ApiError::ServerError(format!("Unable to serialize Enr: {:?}", e)))?, - ))) + success_response_json(req, &peer_id.to_base58()) } /// HTTP handler to return the number of peers connected in the client's libp2p service. -pub fn get_peer_count(req: Request) -> ApiResult { +pub fn get_peer_count(req: Request) -> BoxFut { let network = req .extensions() .get::>>() - .ok_or_else(|| ApiError::ServerError("NetworkService extension missing".to_string()))?; + .expect("The network service should always be there, we put it there"); let connected_peers: usize = network.connected_peers(); - Ok(success_response(Body::from( - serde_json::to_string(&connected_peers) - .map_err(|e| ApiError::ServerError(format!("Unable to serialize Enr: {:?}", e)))?, - ))) + success_response(req, &connected_peers) } /// HTTP handler to return the list of peers connected to the client's libp2p service. /// /// Peers are presented as a list of `PeerId::to_string()`. -pub fn get_peer_list(req: Request) -> ApiResult { +pub fn get_peer_list(req: Request) -> BoxFut { let network = req .extensions() .get::>>() - .ok_or_else(|| ApiError::ServerError("NetworkService extension missing".to_string()))?; + .expect("The network service should always be there, we put it there"); let connected_peers: Vec = network .connected_peer_set() @@ -100,9 +84,5 @@ pub fn get_peer_list(req: Request) -> ApiResult { .map(PeerId::to_string) .collect(); - Ok(success_response(Body::from( - serde_json::to_string(&connected_peers).map_err(|e| { - ApiError::ServerError(format!("Unable to serialize Vec: {:?}", e)) - })?, - ))) + success_response_json(req, &connected_peers) } diff --git a/beacon_node/rest_api/src/node.rs b/beacon_node/rest_api/src/node.rs index c75d3ba20..8ca7fb48a 100644 --- a/beacon_node/rest_api/src/node.rs +++ b/beacon_node/rest_api/src/node.rs @@ -1,25 +1,23 @@ -use crate::helpers::get_beacon_chain_from_request; -use crate::{success_response, ApiResult}; +use crate::helpers::*; +use crate::{ApiResult, BoxFut}; use beacon_chain::BeaconChainTypes; use hyper::{Body, Request}; use version; /// Read the version string from the current Lighthouse build. -pub fn get_version(_req: Request) -> ApiResult { - let body = Body::from( - serde_json::to_string(&version::version()) - .expect("Version should always be serialializable as JSON."), - ); - Ok(success_response(body)) +pub fn get_version(req: Request) -> BoxFut { + success_response_json(req, &version::version()) } /// Read the genesis time from the current beacon chain state. -pub fn get_genesis_time(req: Request) -> ApiResult { - let (_beacon_chain, head_state) = get_beacon_chain_from_request::(&req)?; +pub fn get_genesis_time(req: Request) -> BoxFut { + let bc = get_beacon_chain_from_request::(&req); + let (_beacon_chain, head_state) = match bc { + Ok((bc, hs)) => (bc, hs), + Err(e) => { + return e.into(); + } + }; let gen_time: u64 = head_state.genesis_time; - let body = Body::from( - serde_json::to_string(&gen_time) - .expect("Genesis should time always have a valid JSON serialization."), - ); - Ok(success_response(body)) + success_response(req, &gen_time) } diff --git a/beacon_node/rest_api/src/response_builder.rs b/beacon_node/rest_api/src/response_builder.rs index c1df4892c..b48b9e41a 100644 --- a/beacon_node/rest_api/src/response_builder.rs +++ b/beacon_node/rest_api/src/response_builder.rs @@ -27,15 +27,9 @@ impl ResponseBuilder { pub fn body(self, item: &T) -> ApiResult { let (body, content_type) = match self.encoding { - Encoding::JSON => ( - Body::from(serde_json::to_string(&item).map_err(|e| { - ApiError::ServerError(format!( - "Unable to serialize response body as JSON: {:?}", - e - )) - })?), - "application/json", - ), + Encoding::JSON => { + return self.body_json(item); + } Encoding::SSZ => (Body::from(item.as_ssz_bytes()), "application/ssz"), Encoding::YAML => ( Body::from(serde_yaml::to_string(&item).map_err(|e| { @@ -54,4 +48,17 @@ impl ResponseBuilder { .body(Body::from(body)) .map_err(|e| ApiError::ServerError(format!("Failed to build response: {:?}", e))) } + + pub fn body_json(self, item: &T) -> ApiResult { + Response::builder() + .status(StatusCode::OK) + .header("content-type", "application/json") + .body(Body::from(serde_json::to_string(&item).map_err(|e| { + ApiError::ServerError(format!( + "Unable to serialize response body as JSON: {:?}", + e + )) + })?)) + .map_err(|e| ApiError::ServerError(format!("Failed to build response: {:?}", e))) + } } diff --git a/beacon_node/rest_api/src/spec.rs b/beacon_node/rest_api/src/spec.rs index ad168faf1..3132f3cd4 100644 --- a/beacon_node/rest_api/src/spec.rs +++ b/beacon_node/rest_api/src/spec.rs @@ -1,4 +1,4 @@ -use super::{success_response, ApiResult}; +use super::ApiResult; use crate::helpers::*; use crate::ApiError; use beacon_chain::BeaconChainTypes; @@ -14,7 +14,7 @@ pub fn get_spec(req: Request) -> ApiResult let json: String = serde_json::to_string(&beacon_chain.spec) .map_err(|e| ApiError::ServerError(format!("Unable to serialize spec: {:?}", e)))?; - Ok(success_response(Body::from(json))) + Ok(success_response_old(Body::from(json))) } /// HTTP handler to return the full Eth2Config object. @@ -27,7 +27,7 @@ pub fn get_eth2_config(req: Request) -> Api let json: String = serde_json::to_string(eth2_config.as_ref()) .map_err(|e| ApiError::ServerError(format!("Unable to serialize Eth2Config: {:?}", e)))?; - Ok(success_response(Body::from(json))) + Ok(success_response_old(Body::from(json))) } /// HTTP handler to return the full spec object. @@ -35,5 +35,5 @@ pub fn get_slots_per_epoch(_req: Request) - let json: String = serde_json::to_string(&T::EthSpec::slots_per_epoch()) .map_err(|e| ApiError::ServerError(format!("Unable to serialize epoch: {:?}", e)))?; - Ok(success_response(Body::from(json))) + Ok(success_response_old(Body::from(json))) } diff --git a/beacon_node/rest_api/src/validator.rs b/beacon_node/rest_api/src/validator.rs index 8b2bbd2ac..7d236e0cf 100644 --- a/beacon_node/rest_api/src/validator.rs +++ b/beacon_node/rest_api/src/validator.rs @@ -1,5 +1,5 @@ -use super::{success_response, ApiResult}; -use crate::{helpers::*, ApiError, UrlQuery}; +use crate::helpers::*; +use crate::{ApiError, ApiResult, UrlQuery}; use beacon_chain::{BeaconChainTypes, BlockProcessingOutcome}; use bls::{AggregateSignature, PublicKey, Signature}; use futures::future::Future; @@ -160,7 +160,7 @@ pub fn get_validator_duties(req: Request) - serde_json::to_string(&duties) .expect("We should always be able to serialize the duties we created."), ); - Ok(success_response(body)) + Ok(success_response_old(body)) } /// HTTP Handler to produce a new BeaconBlock from the current state, ready to be signed by a validator. @@ -200,12 +200,10 @@ pub fn get_new_beacon_block(req: Request) - serde_json::to_string(&new_block) .expect("We should always be able to serialize a new block that we produced."), ); - Ok(success_response(body)) + Ok(success_response_old(body)) } -/* - - HTTP Handler to publish a BeaconBlock, which has been signed by a validator. +/// HTTP Handler to publish a BeaconBlock, which has been signed by a validator. pub fn publish_beacon_block(req: Request) -> ApiResult { let _ = check_content_type_for_json(&req)?; let log = get_logger_from_request(&req); @@ -222,7 +220,7 @@ pub fn publish_beacon_block(req: Request) - log, "Got the request body, now going to parse it into a block." ); - let block = body + let block_future = body .concat2() .map(move |chunk| chunk.iter().cloned().collect::>()) .map(|chunks| { @@ -235,22 +233,8 @@ pub fn publish_beacon_block(req: Request) - }); block_result }); - - .map_err(|e| ApiError::ServerError(format!("Unable parse request body: {:?}", e))) - .and_then(|body| { - trace!(log, "parsing json"); - let block_result: Result, ApiError> = - serde_json::from_slice(&body.as_slice()).map_err(|e| { - ApiError::InvalidQueryParams(format!( - "Unable to deserialize JSON into a BeaconBlock: {:?}", - e - )) - }); - block_result - }); - tokio::run(block_future); - let block = block_future.wait()?; - trace!(log, "BeaconBlock successfully parsed from JSON"; "block" => serde_json::to_string(&block).expect("We should always be able to serialize a block that we just created.")); + let block = block_future.wait()??; + trace!(log, "BeaconBlock successfully parsed from JSON"); match beacon_chain.process_block(block.clone()) { Ok(BlockProcessingOutcome::Processed { block_root }) => { // Block was processed, publish via gossipsub @@ -273,9 +257,8 @@ pub fn publish_beacon_block(req: Request) - } } - Ok(success_response(Body::empty())) + Ok(success_response_old(Body::empty())) } - */ /// HTTP Handler to produce a new Attestation from the current state, ready to be signed by a validator. pub fn get_new_attestation(req: Request) -> ApiResult { @@ -377,5 +360,5 @@ pub fn get_new_attestation(req: Request) -> serde_json::to_string(&attestation) .expect("We should always be able to serialize a new attestation that we produced."), ); - Ok(success_response(body)) + Ok(success_response_old(body)) }