WIP: Trying to restructure ApiService to be async.

This commit is contained in:
Luke Anderson 2019-09-10 10:56:50 +10:00
parent 476cbae577
commit 405a59e8b9
No known key found for this signature in database
GPG Key ID: 44408169EC61E228
2 changed files with 204 additions and 166 deletions

View File

@ -0,0 +1,61 @@
use hyper::{Body, Method, Request, Response, Server, StatusCode};
use std::error::Error as StdError;
type Cause = Box<dyn StdErr + Send + Sync>;
pub struct ApiError {
kind: ApiErrorKind,
cause: Option<Cause>,
}
#[derive(PartialEq, Debug)]
pub enum ApiErrorKind {
MethodNotAllowed(String),
ServerError(String),
NotImplemented(String),
InvalidQueryParams(String),
NotFound(String),
ImATeapot(String), // Just in case.
}
pub type ApiResult = Result<Response<Body>, ApiError>;
impl Into<Response<Body>> for ApiError {
fn into(self) -> Response<Body> {
let status_code: (StatusCode, String) = match self {
ApiError::MethodNotAllowed(desc) => (StatusCode::METHOD_NOT_ALLOWED, desc),
ApiError::ServerError(desc) => (StatusCode::INTERNAL_SERVER_ERROR, desc),
ApiError::NotImplemented(desc) => (StatusCode::NOT_IMPLEMENTED, desc),
ApiError::InvalidQueryParams(desc) => (StatusCode::BAD_REQUEST, desc),
ApiError::NotFound(desc) => (StatusCode::NOT_FOUND, desc),
ApiError::ImATeapot(desc) => (StatusCode::IM_A_TEAPOT, desc),
};
Response::builder()
.status(status_code.0)
.header("content-type", "text/plain")
.body(Body::from(status_code.1))
.expect("Response should always be created.")
}
}
impl From<store::Error> for ApiError {
fn from(e: store::Error) -> ApiError {
ApiError::ServerError(format!("Database error: {:?}", e))
}
}
impl From<types::BeaconStateError> for ApiError {
fn from(e: types::BeaconStateError) -> ApiError {
ApiError::ServerError(format!("BeaconState error: {:?}", e))
}
}
impl From<state_processing::per_slot_processing::Error> for ApiError {
fn from(e: state_processing::per_slot_processing::Error) -> ApiError {
ApiError::ServerError(format!("PerSlotProcessing error: {:?}", e))
}
}
impl std::error::Error for ApiError {
fn cause(&self) -> Option<&Error> {}
}

View File

@ -4,6 +4,7 @@ extern crate network as client_network;
mod beacon; mod beacon;
mod config; mod config;
mod error;
mod helpers; mod helpers;
mod metrics; mod metrics;
mod network; mod network;
@ -32,109 +33,43 @@ use url_query::UrlQuery;
pub use beacon::{BlockResponse, HeadResponse, StateResponse}; pub use beacon::{BlockResponse, HeadResponse, StateResponse};
pub use config::Config as ApiConfig; pub use config::Config as ApiConfig;
use eth2_libp2p::rpc::RequestId;
use serde::ser::StdError;
#[derive(PartialEq, Debug)] type BoxFut = Box<dyn Future<Item = Response<Body>, Error = ApiError> + Send>;
pub enum ApiError {
MethodNotAllowed(String),
ServerError(String),
NotImplemented(String),
InvalidQueryParams(String),
NotFound(String),
ImATeapot(String), // Just in case.
}
pub type ApiResult = Result<Response<Body>, ApiError>; pub struct ApiService<T: BeaconChainTypes + 'static> {
log: slog::Logger,
impl Into<Response<Body>> for ApiError {
fn into(self) -> Response<Body> {
let status_code: (StatusCode, String) = match self {
ApiError::MethodNotAllowed(desc) => (StatusCode::METHOD_NOT_ALLOWED, desc),
ApiError::ServerError(desc) => (StatusCode::INTERNAL_SERVER_ERROR, desc),
ApiError::NotImplemented(desc) => (StatusCode::NOT_IMPLEMENTED, desc),
ApiError::InvalidQueryParams(desc) => (StatusCode::BAD_REQUEST, desc),
ApiError::NotFound(desc) => (StatusCode::NOT_FOUND, desc),
ApiError::ImATeapot(desc) => (StatusCode::IM_A_TEAPOT, desc),
};
Response::builder()
.status(status_code.0)
.header("content-type", "text/plain")
.body(Body::from(status_code.1))
.expect("Response should always be created.")
}
}
impl From<store::Error> for ApiError {
fn from(e: store::Error) -> ApiError {
ApiError::ServerError(format!("Database error: {:?}", e))
}
}
impl From<types::BeaconStateError> for ApiError {
fn from(e: types::BeaconStateError) -> ApiError {
ApiError::ServerError(format!("BeaconState error: {:?}", e))
}
}
impl From<state_processing::per_slot_processing::Error> for ApiError {
fn from(e: state_processing::per_slot_processing::Error) -> ApiError {
ApiError::ServerError(format!("PerSlotProcessing error: {:?}", e))
}
}
pub fn start_server<T: BeaconChainTypes>(
config: &ApiConfig,
executor: &TaskExecutor,
beacon_chain: Arc<BeaconChain<T>>, beacon_chain: Arc<BeaconChain<T>>,
db_path: DBPath,
network_service: Arc<NetworkService<T>>, network_service: Arc<NetworkService<T>>,
network_chan: mpsc::UnboundedSender<NetworkMessage>, network_channel: Arc<RwLock<mpsc::UnboundedSender<NetworkMessage>>>,
db_path: PathBuf, eth2_config: Arc<Eth2Config>,
eth2_config: Eth2Config, }
log: &slog::Logger,
) -> Result<exit_future::Signal, hyper::Error> {
let log = log.new(o!("Service" => "Api"));
// build a channel to kill the HTTP server impl<T: BeaconChainTypes> Service for ApiService<T> {
let (exit_signal, exit) = exit_future::signal(); type ReqBody = Body;
type ResBody = Body;
type Error = ApiError;
type Future = BoxFut;
let exit_log = log.clone(); fn call(&mut self, mut req: Request<Body>) -> Self::Future {
let server_exit = exit.and_then(move |_| {
info!(exit_log, "API service shutdown");
Ok(())
});
let db_path = DBPath(db_path);
// Get the address to bind to
let bind_addr = (config.listen_address, config.port).into();
// Clone our stateful objects, for use in service closure.
let server_log = log.clone();
let server_bc = beacon_chain.clone();
let eth2_config = Arc::new(eth2_config);
let service = move || {
let log = server_log.clone();
let beacon_chain = server_bc.clone();
let db_path = db_path.clone();
let network_service = network_service.clone();
let network_chan = network_chan.clone();
let eth2_config = eth2_config.clone();
// Create a simple handler for the router, inject our stateful objects into the request.
service_fn_ok(move |mut req| {
metrics::inc_counter(&metrics::REQUEST_COUNT); metrics::inc_counter(&metrics::REQUEST_COUNT);
let timer = metrics::start_timer(&metrics::REQUEST_RESPONSE_TIME); let timer = metrics::start_timer(&metrics::REQUEST_RESPONSE_TIME);
req.extensions_mut().insert::<slog::Logger>(log.clone());
req.extensions_mut() req.extensions_mut()
.insert::<Arc<BeaconChain<T>>>(beacon_chain.clone()); .insert::<slog::Logger>(self.log.clone());
req.extensions_mut().insert::<DBPath>(db_path.clone());
req.extensions_mut() req.extensions_mut()
.insert::<Arc<NetworkService<T>>>(network_service.clone()); .insert::<Arc<BeaconChain<T>>>(self.beacon_chain.clone());
req.extensions_mut().insert::<DBPath>(self.db_path.clone());
req.extensions_mut() req.extensions_mut()
.insert::<mpsc::UnboundedSender<NetworkMessage>>(network_chan.clone()); .insert::<Arc<NetworkService<T>>>(self.network_service.clone());
req.extensions_mut() req.extensions_mut()
.insert::<Arc<Eth2Config>>(eth2_config.clone()); .insert::<Arc<RwLock<mpsc::UnboundedSender<NetworkMessage>>>>(
self.network_channel.clone(),
);
req.extensions_mut()
.insert::<Arc<Eth2Config>>(self.eth2_config.clone());
let path = req.uri().path().to_string(); let path = req.uri().path().to_string();
@ -142,6 +77,7 @@ pub fn start_server<T: BeaconChainTypes>(
let result = match (req.method(), path.as_ref()) { let result = match (req.method(), path.as_ref()) {
// Methods for Client // Methods for Client
(&Method::GET, "/node/version") => node::get_version(req), (&Method::GET, "/node/version") => node::get_version(req),
/*
(&Method::GET, "/node/genesis_time") => node::get_genesis_time::<T>(req), (&Method::GET, "/node/genesis_time") => node::get_genesis_time::<T>(req),
(&Method::GET, "/node/syncing") => helpers::implementation_pending_response(req), (&Method::GET, "/node/syncing") => helpers::implementation_pending_response(req),
@ -211,6 +147,7 @@ pub fn start_server<T: BeaconChainTypes>(
(&Method::GET, "/metrics") => metrics::get_prometheus::<T>(req), (&Method::GET, "/metrics") => metrics::get_prometheus::<T>(req),
*/
_ => Err(ApiError::NotFound( _ => Err(ApiError::NotFound(
"Request path and/or method not found.".to_owned(), "Request path and/or method not found.".to_owned(),
)), )),
@ -220,20 +157,60 @@ pub fn start_server<T: BeaconChainTypes>(
// Return the `hyper::Response`. // Return the `hyper::Response`.
Ok(response) => { Ok(response) => {
metrics::inc_counter(&metrics::SUCCESS_COUNT); metrics::inc_counter(&metrics::SUCCESS_COUNT);
slog::debug!(log, "Request successful: {:?}", path); slog::debug!(self.log, "Request successful: {:?}", path);
response Box::new(response)
} }
// Map the `ApiError` into `hyper::Response`. // Map the `ApiError` into `hyper::Response`.
Err(e) => { Err(e) => {
slog::debug!(log, "Request failure: {:?}", path); slog::debug!(self.log, "Request failure: {:?}", path);
e.into() Box::new(e.into())
} }
}; };
metrics::stop_timer(timer); metrics::stop_timer(timer);
response Box::new(futures::future::ok(response))
}) }
}
pub fn start_server<T: BeaconChainTypes>(
config: &ApiConfig,
executor: &TaskExecutor,
beacon_chain: Arc<BeaconChain<T>>,
network_service: Arc<NetworkService<T>>,
network_chan: mpsc::UnboundedSender<NetworkMessage>,
db_path: PathBuf,
eth2_config: Eth2Config,
log: &slog::Logger,
) -> Result<exit_future::Signal, hyper::Error> {
let log = log.new(o!("Service" => "Api"));
// build a channel to kill the HTTP server
let (exit_signal, exit) = exit_future::signal();
let exit_log = log.clone();
let server_exit = exit.and_then(move |_| {
info!(exit_log, "API service shutdown");
Ok(())
});
let db_path = DBPath(db_path);
// Get the address to bind to
let bind_addr = (config.listen_address, config.port).into();
// Clone our stateful objects, for use in service closure.
let server_log = log.clone();
let server_bc = beacon_chain.clone();
let eth2_config = Arc::new(eth2_config);
let service = move || ApiService {
log: server_log.clone(),
beacon_chain: server_bc.clone(),
db_path: db_path.clone(),
network_service: network_service.clone(),
network_channel: Arc::new(RwLock::new(network_chan.clone())),
eth2_config: eth2_config.clone(),
}; };
let log_clone = log.clone(); let log_clone = log.clone();