#[macro_use] mod macros; #[macro_use] extern crate lazy_static; extern crate network as client_network; mod beacon; mod config; mod error; mod helpers; mod metrics; mod network; mod node; mod response_builder; mod spec; mod url_query; mod validator; use beacon_chain::{BeaconChain, BeaconChainTypes}; use client_network::NetworkMessage; use client_network::Service as NetworkService; use error::{ApiError, ApiResult}; use eth2_config::Eth2Config; use hyper::rt::Future; use hyper::server::conn::AddrStream; use hyper::service::{MakeService, Service}; use hyper::{Body, Method, Request, Response, Server, StatusCode}; use parking_lot::RwLock; use slog::{info, o, warn}; use std::ops::Deref; use std::path::PathBuf; use std::sync::Arc; use tokio::runtime::TaskExecutor; use tokio::sync::mpsc; use url_query::UrlQuery; pub use beacon::{BlockResponse, HeadResponse, StateResponse}; pub use config::Config as ApiConfig; use eth2_libp2p::rpc::RequestId; use serde::export::PhantomData; type BoxFut = Box, Error = ApiError> + Send>; pub struct ApiService { log: slog::Logger, beacon_chain: Arc>, db_path: DBPath, network_service: Arc>, network_channel: Arc>>, eth2_config: Arc, } impl Service for ApiService { type ReqBody = Body; type ResBody = Body; type Error = ApiError; type Future = BoxFut; fn call(&mut self, mut req: Request) -> Self::Future { metrics::inc_counter(&metrics::REQUEST_COUNT); let timer = metrics::start_timer(&metrics::REQUEST_RESPONSE_TIME); // Add all the useful bits into the request, so that we can pull them out in the individual // functions. req.extensions_mut() .insert::(self.log.clone()); req.extensions_mut() .insert::>>(self.beacon_chain.clone()); req.extensions_mut().insert::(self.db_path.clone()); req.extensions_mut() .insert::>>(self.network_service.clone()); req.extensions_mut() .insert::>>>( self.network_channel.clone(), ); req.extensions_mut() .insert::>(self.eth2_config.clone()); let path = req.uri().path().to_string(); // Route the request to the correct handler. let result = match (req.method(), path.as_ref()) { // Methods for Client (&Method::GET, "/node/version") => node::get_version(req), (&Method::GET, "/node/genesis_time") => node::get_genesis_time::(req), (&Method::GET, "/node/syncing") => helpers::implementation_pending_response(req), // Methods for Network (&Method::GET, "/network/enr") => network::get_enr::(req), (&Method::GET, "/network/peer_count") => network::get_peer_count::(req), (&Method::GET, "/network/peer_id") => network::get_peer_id::(req), (&Method::GET, "/network/peers") => network::get_peer_list::(req), (&Method::GET, "/network/listen_port") => network::get_listen_port::(req), (&Method::GET, "/network/listen_addresses") => network::get_listen_addresses::(req), // Methods for Beacon Node (&Method::GET, "/beacon/head") => beacon::get_head::(req), (&Method::GET, "/beacon/block") => beacon::get_block::(req), (&Method::GET, "/beacon/block_root") => beacon::get_block_root::(req), (&Method::GET, "/beacon/blocks") => helpers::implementation_pending_response(req), (&Method::GET, "/beacon/fork") => beacon::get_fork::(req), (&Method::GET, "/beacon/attestations") => helpers::implementation_pending_response(req), (&Method::GET, "/beacon/attestations/pending") => { helpers::implementation_pending_response(req) } (&Method::GET, "/beacon/validators") => beacon::get_validators::(req), (&Method::GET, "/beacon/validators/indicies") => { helpers::implementation_pending_response(req) } (&Method::GET, "/beacon/validators/pubkeys") => { helpers::implementation_pending_response(req) } /* // Methods for Validator (&Method::GET, "/beacon/validator/duties") => validator::get_validator_duties::(req), (&Method::GET, "/beacon/validator/block") => validator::get_new_beacon_block::(req), //(&Method::POST, "/beacon/validator/block") => validator::publish_beacon_block::(req), (&Method::GET, "/beacon/validator/attestation") => { validator::get_new_attestation::(req) } (&Method::POST, "/beacon/validator/attestation") => { helpers::implementation_pending_response(req) } (&Method::GET, "/beacon/state") => beacon::get_state::(req), (&Method::GET, "/beacon/state_root") => beacon::get_state_root::(req), (&Method::GET, "/beacon/state/current_finalized_checkpoint") => { beacon::get_current_finalized_checkpoint::(req) } (&Method::GET, "/beacon/state/genesis") => beacon::get_genesis_state::(req), //TODO: Add aggreggate/filtered state lookups here, e.g. /beacon/validators/balances // Methods for bootstrap and checking configuration (&Method::GET, "/spec") => spec::get_spec::(req), (&Method::GET, "/spec/slots_per_epoch") => spec::get_slots_per_epoch::(req), (&Method::GET, "/spec/deposit_contract") => { helpers::implementation_pending_response(req) } (&Method::GET, "/spec/eth2_config") => spec::get_eth2_config::(req), (&Method::GET, "/metrics") => metrics::get_prometheus::(req), */ _ => Box::new(futures::future::err(ApiError::NotFound( "Request path and/or method not found.".to_owned(), ))), }; let response = match result.wait() { // Return the `hyper::Response`. Ok(response) => { metrics::inc_counter(&metrics::SUCCESS_COUNT); slog::debug!(self.log, "Request successful: {:?}", path); response } // Map the `ApiError` into `hyper::Response`. Err(e) => { slog::debug!(self.log, "Request failure: {:?}", path); e.into() } }; metrics::stop_timer(timer); Box::new(futures::future::ok(response)) } } pub fn start_server( config: &ApiConfig, executor: &TaskExecutor, beacon_chain: Arc>, network_service: Arc>, network_chan: mpsc::UnboundedSender, db_path: PathBuf, eth2_config: Eth2Config, log: &slog::Logger, ) -> Result { let log = log.new(o!("Service" => "Api")); // build a channel to kill the HTTP server let (exit_signal, exit) = exit_future::signal(); let exit_log = log.clone(); let server_exit = exit.and_then(move |_| { info!(exit_log, "API service shutdown"); Ok(()) }); let db_path = DBPath(db_path); // Get the address to bind to let bind_addr = (config.listen_address, config.port).into(); // Clone our stateful objects, for use in service closure. let server_log = log.clone(); let server_bc = beacon_chain.clone(); let eth2_config = Arc::new(eth2_config); let service = move || -> futures::future::FutureResult, String> { futures::future::ok(ApiService { log: server_log.clone(), beacon_chain: server_bc.clone(), db_path: db_path.clone(), network_service: network_service.clone(), network_channel: Arc::new(RwLock::new(network_chan.clone())), eth2_config: eth2_config.clone(), }) }; let log_clone = log.clone(); let server = Server::bind(&bind_addr) .serve(service) .with_graceful_shutdown(server_exit) .map_err(move |e| { warn!( log_clone, "API failed to start, Unable to bind"; "address" => format!("{:?}", e) ) }); info!( log, "REST API started"; "address" => format!("{}", config.listen_address), "port" => config.port, ); executor.spawn(server); Ok(exit_signal) } #[derive(Clone)] pub struct DBPath(PathBuf); impl Deref for DBPath { type Target = PathBuf; fn deref(&self) -> &Self::Target { &self.0 } }