From c895dc89711793e0d12734ae9476bc800923d8da Mon Sep 17 00:00:00 2001 From: Paul Hauner Date: Mon, 24 Aug 2020 03:06:10 +0000 Subject: [PATCH] Shift HTTP server heavy-lifting to blocking executor (#1518) ## Issue Addressed NA ## Proposed Changes Shift practically all HTTP endpoint handlers to the blocking executor (some very light tasks are left on the core executor). ## Additional Info This PR covers the `rest_api` which will soon be refactored to suit the standard API. As such, I've cut a few corners and left some existing issues open in this patch. What I have done here should leave the API in state that is not necessary *exactly* the same, but good enough for us to run validators with. Specifically, the number of blocking workers that can be spawned is unbounded and I have not implemented a queue; this will need to be fixed when we implement the standard API. --- Cargo.lock | 7 + beacon_node/rest_api/src/advanced.rs | 34 -- beacon_node/rest_api/src/beacon.rs | 279 ++++------ beacon_node/rest_api/src/consensus.rs | 41 +- beacon_node/rest_api/src/helpers.rs | 27 +- beacon_node/rest_api/src/lib.rs | 50 +- beacon_node/rest_api/src/lighthouse.rs | 38 +- beacon_node/rest_api/src/macros.rs | 11 - beacon_node/rest_api/src/metrics.rs | 68 ++- beacon_node/rest_api/src/network.rs | 72 --- beacon_node/rest_api/src/node.rs | 40 +- beacon_node/rest_api/src/response_builder.rs | 83 --- beacon_node/rest_api/src/router.rs | 477 ++++++++++-------- beacon_node/rest_api/src/spec.rs | 28 - beacon_node/rest_api/src/validator.rs | 208 +++----- beacon_node/rest_api/tests/test.rs | 2 +- common/rest_types/Cargo.toml | 7 + .../rest_types/src/api_error.rs | 0 common/rest_types/src/handler.rs | 247 +++++++++ common/rest_types/src/lib.rs | 11 +- consensus/proto_array/src/proto_array.rs | 2 +- lighthouse/environment/src/executor.rs | 2 +- 22 files changed, 828 insertions(+), 906 deletions(-) delete mode 100644 beacon_node/rest_api/src/advanced.rs delete mode 100644 beacon_node/rest_api/src/macros.rs delete mode 100644 beacon_node/rest_api/src/network.rs delete mode 100644 beacon_node/rest_api/src/response_builder.rs delete mode 100644 beacon_node/rest_api/src/spec.rs rename beacon_node/rest_api/src/error.rs => common/rest_types/src/api_error.rs (100%) create mode 100644 common/rest_types/src/handler.rs diff --git a/Cargo.lock b/Cargo.lock index 3da097362..12b8bbb0e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4308,15 +4308,22 @@ dependencies = [ name = "rest_types" version = "0.2.0" dependencies = [ + "beacon_chain", "bls", + "environment", "eth2_hashing", "eth2_ssz", "eth2_ssz_derive", + "hyper 0.13.7", "procinfo", "psutil", "rayon", "serde", + "serde_json", + "serde_yaml", "state_processing", + "store", + "tokio 0.2.22", "tree_hash", "types", ] diff --git a/beacon_node/rest_api/src/advanced.rs b/beacon_node/rest_api/src/advanced.rs deleted file mode 100644 index 6ee14891f..000000000 --- a/beacon_node/rest_api/src/advanced.rs +++ /dev/null @@ -1,34 +0,0 @@ -use crate::response_builder::ResponseBuilder; -use crate::ApiResult; -use beacon_chain::{BeaconChain, BeaconChainTypes}; -use hyper::{Body, Request}; -use operation_pool::PersistedOperationPool; -use std::sync::Arc; - -/// Returns the `proto_array` fork choice struct, encoded as JSON. -/// -/// Useful for debugging or advanced inspection of the chain. -pub fn get_fork_choice( - req: Request, - beacon_chain: Arc>, -) -> ApiResult { - ResponseBuilder::new(&req)?.body_no_ssz( - &*beacon_chain - .fork_choice - .read() - .proto_array() - .core_proto_array(), - ) -} - -/// Returns the `PersistedOperationPool` struct. -/// -/// Useful for debugging or advanced inspection of the stored operations. -pub fn get_operation_pool( - req: Request, - beacon_chain: Arc>, -) -> ApiResult { - ResponseBuilder::new(&req)?.body(&PersistedOperationPool::from_operation_pool( - &beacon_chain.op_pool, - )) -} diff --git a/beacon_node/rest_api/src/beacon.rs b/beacon_node/rest_api/src/beacon.rs index f7887c48f..0ac95f7ca 100644 --- a/beacon_node/rest_api/src/beacon.rs +++ b/beacon_node/rest_api/src/beacon.rs @@ -1,14 +1,13 @@ use crate::helpers::*; -use crate::response_builder::ResponseBuilder; use crate::validator::get_state_for_epoch; -use crate::{ApiError, ApiResult, UrlQuery}; +use crate::Context; +use crate::{ApiError, UrlQuery}; use beacon_chain::{ observed_operations::ObservationOutcome, BeaconChain, BeaconChainTypes, StateSkipConfig, }; -use bus::BusReader; use futures::executor::block_on; use hyper::body::Bytes; -use hyper::{Body, Request, Response}; +use hyper::{Body, Request}; use rest_types::{ BlockResponse, CanonicalHeadResponse, Committee, HeadBeaconBlock, StateResponse, ValidatorRequest, ValidatorResponse, @@ -16,20 +15,20 @@ use rest_types::{ use std::io::Write; use std::sync::Arc; -use slog::{error, Logger}; +use slog::error; use types::{ AttesterSlashing, BeaconState, EthSpec, Hash256, ProposerSlashing, PublicKeyBytes, RelativeEpoch, SignedBeaconBlockHash, Slot, }; -/// HTTP handler to return a `BeaconBlock` at a given `root` or `slot`. +/// Returns a summary of the head of the beacon chain. pub fn get_head( - req: Request, - beacon_chain: Arc>, -) -> ApiResult { + ctx: Arc>, +) -> Result { + let beacon_chain = &ctx.beacon_chain; let chain_head = beacon_chain.head()?; - let head = CanonicalHeadResponse { + Ok(CanonicalHeadResponse { slot: chain_head.beacon_state.slot, block_root: chain_head.beacon_block_root, state_root: chain_head.beacon_state_root, @@ -51,33 +50,27 @@ pub fn get_head( .epoch .start_slot(T::EthSpec::slots_per_epoch()), previous_justified_block_root: chain_head.beacon_state.previous_justified_checkpoint.root, - }; - - ResponseBuilder::new(&req)?.body(&head) + }) } -/// HTTP handler to return a list of head BeaconBlocks. -pub fn get_heads( - req: Request, - beacon_chain: Arc>, -) -> ApiResult { - let heads = beacon_chain +/// Return the list of heads of the beacon chain. +pub fn get_heads(ctx: Arc>) -> Vec { + ctx.beacon_chain .heads() .into_iter() .map(|(beacon_block_root, beacon_block_slot)| HeadBeaconBlock { beacon_block_root, beacon_block_slot, }) - .collect::>(); - - ResponseBuilder::new(&req)?.body(&heads) + .collect() } /// HTTP handler to return a `BeaconBlock` at a given `root` or `slot`. pub fn get_block( - req: Request, - beacon_chain: Arc>, -) -> ApiResult { + req: Request>, + ctx: Arc>, +) -> Result, ApiError> { + let beacon_chain = &ctx.beacon_chain; let query_params = ["root", "slot"]; let (key, value) = UrlQuery::from_request(&req)?.first_of(&query_params)?; @@ -85,7 +78,7 @@ pub fn get_block( ("slot", value) => { let target = parse_slot(&value)?; - block_root_at_slot(&beacon_chain, target)?.ok_or_else(|| { + block_root_at_slot(beacon_chain, target)?.ok_or_else(|| { ApiError::NotFound(format!( "Unable to find SignedBeaconBlock for slot {:?}", target @@ -103,30 +96,26 @@ pub fn get_block( )) })?; - let response = BlockResponse { + Ok(BlockResponse { root: block_root, beacon_block: block, - }; - - ResponseBuilder::new(&req)?.body(&response) + }) } /// HTTP handler to return a `SignedBeaconBlock` root at a given `slot`. pub fn get_block_root( - req: Request, - beacon_chain: Arc>, -) -> ApiResult { + req: Request>, + ctx: Arc>, +) -> Result { let slot_string = UrlQuery::from_request(&req)?.only_one("slot")?; let target = parse_slot(&slot_string)?; - let root = block_root_at_slot(&beacon_chain, target)?.ok_or_else(|| { + block_root_at_slot(&ctx.beacon_chain, target)?.ok_or_else(|| { ApiError::NotFound(format!( "Unable to find SignedBeaconBlock for slot {:?}", target )) - })?; - - ResponseBuilder::new(&req)?.body(&root) + }) } fn make_sse_response_chunk(new_head_hash: SignedBeaconBlockHash) -> std::io::Result { @@ -140,45 +129,27 @@ fn make_sse_response_chunk(new_head_hash: SignedBeaconBlockHash) -> std::io::Res Ok(bytes) } -pub fn stream_forks( - log: Logger, - mut events: BusReader, -) -> ApiResult { +pub fn stream_forks(ctx: Arc>) -> Result { + let mut events = ctx.events.lock().add_rx(); let (mut sender, body) = Body::channel(); std::thread::spawn(move || { while let Ok(new_head_hash) = events.recv() { let chunk = match make_sse_response_chunk(new_head_hash) { Ok(chunk) => chunk, Err(e) => { - error!(log, "Failed to make SSE chunk"; "error" => e.to_string()); + error!(ctx.log, "Failed to make SSE chunk"; "error" => e.to_string()); sender.abort(); break; } }; match block_on(sender.send_data(chunk)) { Err(e) if e.is_closed() => break, - Err(e) => error!(log, "Couldn't stream piece {:?}", e), + Err(e) => error!(ctx.log, "Couldn't stream piece {:?}", e), Ok(_) => (), } } }); - let response = Response::builder() - .status(200) - .header("Content-Type", "text/event-stream") - .header("Connection", "Keep-Alive") - .header("Cache-Control", "no-cache") - .header("Access-Control-Allow-Origin", "*") - .body(body) - .map_err(|e| ApiError::ServerError(format!("Failed to build response: {:?}", e)))?; - Ok(response) -} - -/// HTTP handler to return the `Fork` of the current head. -pub fn get_fork( - req: Request, - beacon_chain: Arc>, -) -> ApiResult { - ResponseBuilder::new(&req)?.body(&beacon_chain.head()?.beacon_state.fork) + Ok(body) } /// HTTP handler to which accepts a query string of a list of validator pubkeys and maps it to a @@ -187,9 +158,9 @@ pub fn get_fork( /// This method is limited to as many `pubkeys` that can fit in a URL. See `post_validators` for /// doing bulk requests. pub fn get_validators( - req: Request, - beacon_chain: Arc>, -) -> ApiResult { + req: Request>, + ctx: Arc>, +) -> Result, ApiError> { let query = UrlQuery::from_request(&req)?; let validator_pubkeys = query @@ -204,17 +175,14 @@ pub fn get_validators( None }; - let validators = - validator_responses_by_pubkey(beacon_chain, state_root_opt, validator_pubkeys)?; - - ResponseBuilder::new(&req)?.body(&validators) + validator_responses_by_pubkey(&ctx.beacon_chain, state_root_opt, validator_pubkeys) } /// HTTP handler to return all validators, each as a `ValidatorResponse`. pub fn get_all_validators( - req: Request, - beacon_chain: Arc>, -) -> ApiResult { + req: Request>, + ctx: Arc>, +) -> Result, ApiError> { let query = UrlQuery::from_request(&req)?; let state_root_opt = if let Some((_key, value)) = query.first_of_opt(&["state_root"]) { @@ -223,23 +191,21 @@ pub fn get_all_validators( None }; - let mut state = get_state_from_root_opt(&beacon_chain, state_root_opt)?; + let mut state = get_state_from_root_opt(&ctx.beacon_chain, state_root_opt)?; state.update_pubkey_cache()?; - let validators = state + state .validators .iter() .map(|validator| validator_response_by_pubkey(&state, validator.pubkey.clone())) - .collect::, _>>()?; - - ResponseBuilder::new(&req)?.body(&validators) + .collect::, _>>() } /// HTTP handler to return all active validators, each as a `ValidatorResponse`. pub fn get_active_validators( - req: Request, - beacon_chain: Arc>, -) -> ApiResult { + req: Request>, + ctx: Arc>, +) -> Result, ApiError> { let query = UrlQuery::from_request(&req)?; let state_root_opt = if let Some((_key, value)) = query.first_of_opt(&["state_root"]) { @@ -248,17 +214,15 @@ pub fn get_active_validators( None }; - let mut state = get_state_from_root_opt(&beacon_chain, state_root_opt)?; + let mut state = get_state_from_root_opt(&ctx.beacon_chain, state_root_opt)?; state.update_pubkey_cache()?; - let validators = state + state .validators .iter() .filter(|validator| validator.is_active_at(state.current_epoch())) .map(|validator| validator_response_by_pubkey(&state, validator.pubkey.clone())) - .collect::, _>>()?; - - ResponseBuilder::new(&req)?.body(&validators) + .collect::, _>>() } /// HTTP handler to which accepts a `ValidatorRequest` and returns a `ValidatorResponse` for @@ -266,17 +230,11 @@ pub fn get_active_validators( /// /// This method allows for a basically unbounded list of `pubkeys`, where as the `get_validators` /// request is limited by the max number of pubkeys you can fit in a URL. -pub async fn post_validators( - req: Request, - beacon_chain: Arc>, -) -> ApiResult { - let response_builder = ResponseBuilder::new(&req); - - let body = req.into_body(); - let chunks = hyper::body::to_bytes(body) - .await - .map_err(|e| ApiError::ServerError(format!("Unable to get request body: {:?}", e)))?; - serde_json::from_slice::(&chunks) +pub fn post_validators( + req: Request>, + ctx: Arc>, +) -> Result, ApiError> { + serde_json::from_slice::(&req.into_body()) .map_err(|e| { ApiError::BadRequest(format!( "Unable to parse JSON into ValidatorRequest: {:?}", @@ -285,12 +243,11 @@ pub async fn post_validators( }) .and_then(|bulk_request| { validator_responses_by_pubkey( - beacon_chain, + &ctx.beacon_chain, bulk_request.state_root, bulk_request.pubkeys, ) }) - .and_then(|validators| response_builder?.body(&validators)) } /// Returns either the state given by `state_root_opt`, or the canonical head state if it is @@ -317,11 +274,11 @@ fn get_state_from_root_opt( /// Maps a vec of `validator_pubkey` to a vec of `ValidatorResponse`, using the state at the given /// `state_root`. If `state_root.is_none()`, uses the canonial head state. fn validator_responses_by_pubkey( - beacon_chain: Arc>, + beacon_chain: &BeaconChain, state_root_opt: Option, validator_pubkeys: Vec, ) -> Result, ApiError> { - let mut state = get_state_from_root_opt(&beacon_chain, state_root_opt)?; + let mut state = get_state_from_root_opt(beacon_chain, state_root_opt)?; state.update_pubkey_cache()?; validator_pubkeys @@ -372,24 +329,25 @@ fn validator_response_by_pubkey( /// HTTP handler pub fn get_committees( - req: Request, - beacon_chain: Arc>, -) -> ApiResult { + req: Request>, + ctx: Arc>, +) -> Result, ApiError> { let query = UrlQuery::from_request(&req)?; let epoch = query.epoch()?; - let mut state = get_state_for_epoch(&beacon_chain, epoch, StateSkipConfig::WithoutStateRoots)?; + let mut state = + get_state_for_epoch(&ctx.beacon_chain, epoch, StateSkipConfig::WithoutStateRoots)?; let relative_epoch = RelativeEpoch::from_epoch(state.current_epoch(), epoch).map_err(|e| { ApiError::ServerError(format!("Failed to get state suitable for epoch: {:?}", e)) })?; state - .build_committee_cache(relative_epoch, &beacon_chain.spec) + .build_committee_cache(relative_epoch, &ctx.beacon_chain.spec) .map_err(|e| ApiError::ServerError(format!("Unable to build committee cache: {:?}", e)))?; - let committees = state + Ok(state .get_beacon_committees_at_epoch(relative_epoch) .map_err(|e| ApiError::ServerError(format!("Unable to get all committees: {:?}", e)))? .into_iter() @@ -398,9 +356,7 @@ pub fn get_committees( index: c.index, committee: c.committee.to_vec(), }) - .collect::>(); - - ResponseBuilder::new(&req)?.body(&committees) + .collect::>()) } /// HTTP handler to return a `BeaconState` at a given `root` or `slot`. @@ -408,10 +364,10 @@ pub fn get_committees( /// Will not return a state if the request slot is in the future. Will return states higher than /// the current head by skipping slots. pub fn get_state( - req: Request, - beacon_chain: Arc>, -) -> ApiResult { - let head_state = beacon_chain.head()?.beacon_state; + req: Request>, + ctx: Arc>, +) -> Result, ApiError> { + let head_state = ctx.beacon_chain.head()?.beacon_state; let (key, value) = match UrlQuery::from_request(&req) { Ok(query) => { @@ -429,11 +385,12 @@ pub fn get_state( }; let (root, state): (Hash256, BeaconState) = match (key.as_ref(), value) { - ("slot", value) => state_at_slot(&beacon_chain, parse_slot(&value)?)?, + ("slot", value) => state_at_slot(&ctx.beacon_chain, parse_slot(&value)?)?, ("root", value) => { let root = &parse_root(&value)?; - let state = beacon_chain + let state = ctx + .beacon_chain .store .get_state(root, None)? .ok_or_else(|| ApiError::NotFound(format!("No state for root: {:?}", root)))?; @@ -443,12 +400,10 @@ pub fn get_state( _ => return Err(ApiError::ServerError("Unexpected query parameter".into())), }; - let response = StateResponse { + Ok(StateResponse { root, beacon_state: state, - }; - - ResponseBuilder::new(&req)?.body(&response) + }) } /// HTTP handler to return a `BeaconState` root at a given `slot`. @@ -456,15 +411,13 @@ pub fn get_state( /// Will not return a state if the request slot is in the future. Will return states higher than /// the current head by skipping slots. pub fn get_state_root( - req: Request, - beacon_chain: Arc>, -) -> ApiResult { + req: Request>, + ctx: Arc>, +) -> Result { let slot_string = UrlQuery::from_request(&req)?.only_one("slot")?; let slot = parse_slot(&slot_string)?; - let root = state_root_at_slot(&beacon_chain, slot, StateSkipConfig::WithStateRoots)?; - - ResponseBuilder::new(&req)?.body(&root) + state_root_at_slot(&ctx.beacon_chain, slot, StateSkipConfig::WithStateRoots) } /// HTTP handler to return a `BeaconState` at the genesis block. @@ -472,50 +425,28 @@ pub fn get_state_root( /// This is an undocumented convenience method used during testing. For production, simply do a /// state request at slot 0. pub fn get_genesis_state( - req: Request, - beacon_chain: Arc>, -) -> ApiResult { - let (_root, state) = state_at_slot(&beacon_chain, Slot::new(0))?; - - ResponseBuilder::new(&req)?.body(&state) + ctx: Arc>, +) -> Result, ApiError> { + state_at_slot(&ctx.beacon_chain, Slot::new(0)).map(|(_root, state)| state) } -/// Read the genesis time from the current beacon chain state. -pub fn get_genesis_time( - req: Request, - beacon_chain: Arc>, -) -> ApiResult { - ResponseBuilder::new(&req)?.body(&beacon_chain.head_info()?.genesis_time) -} - -/// Read the `genesis_validators_root` from the current beacon chain state. -pub fn get_genesis_validators_root( - req: Request, - beacon_chain: Arc>, -) -> ApiResult { - ResponseBuilder::new(&req)?.body(&beacon_chain.head_info()?.genesis_validators_root) -} - -pub async fn proposer_slashing( - req: Request, - beacon_chain: Arc>, -) -> ApiResult { - let response_builder = ResponseBuilder::new(&req); - +pub fn proposer_slashing( + req: Request>, + ctx: Arc>, +) -> Result { let body = req.into_body(); - let chunks = hyper::body::to_bytes(body) - .await - .map_err(|e| ApiError::ServerError(format!("Unable to get request body: {:?}", e)))?; - serde_json::from_slice::(&chunks) + serde_json::from_slice::(&body) .map_err(|e| format!("Unable to parse JSON into ProposerSlashing: {:?}", e)) .and_then(move |proposer_slashing| { - if beacon_chain.eth1_chain.is_some() { - let obs_outcome = beacon_chain + if ctx.beacon_chain.eth1_chain.is_some() { + let obs_outcome = ctx + .beacon_chain .verify_proposer_slashing_for_gossip(proposer_slashing) .map_err(|e| format!("Error while verifying proposer slashing: {:?}", e))?; if let ObservationOutcome::New(verified_proposer_slashing) = obs_outcome { - beacon_chain.import_proposer_slashing(verified_proposer_slashing); + ctx.beacon_chain + .import_proposer_slashing(verified_proposer_slashing); Ok(()) } else { Err("Proposer slashing for that validator index already known".into()) @@ -524,22 +455,17 @@ pub async fn proposer_slashing( Err("Cannot insert proposer slashing on node without Eth1 connection.".to_string()) } }) - .map_err(ApiError::BadRequest) - .and_then(|_| response_builder?.body(&true)) + .map_err(ApiError::BadRequest)?; + + Ok(true) } -pub async fn attester_slashing( - req: Request, - beacon_chain: Arc>, -) -> ApiResult { - let response_builder = ResponseBuilder::new(&req); - +pub fn attester_slashing( + req: Request>, + ctx: Arc>, +) -> Result { let body = req.into_body(); - let chunks = hyper::body::to_bytes(body) - .await - .map_err(|e| ApiError::ServerError(format!("Unable to get request body: {:?}", e)))?; - - serde_json::from_slice::>(&chunks) + serde_json::from_slice::>(&body) .map_err(|e| { ApiError::BadRequest(format!( "Unable to parse JSON into AttesterSlashing: {:?}", @@ -547,13 +473,13 @@ pub async fn attester_slashing( )) }) .and_then(move |attester_slashing| { - if beacon_chain.eth1_chain.is_some() { - beacon_chain + if ctx.beacon_chain.eth1_chain.is_some() { + ctx.beacon_chain .verify_attester_slashing_for_gossip(attester_slashing) .map_err(|e| format!("Error while verifying attester slashing: {:?}", e)) .and_then(|outcome| { if let ObservationOutcome::New(verified_attester_slashing) = outcome { - beacon_chain + ctx.beacon_chain .import_attester_slashing(verified_attester_slashing) .map_err(|e| { format!("Error while importing attester slashing: {:?}", e) @@ -568,6 +494,7 @@ pub async fn attester_slashing( "Cannot insert attester slashing on node without Eth1 connection.".to_string(), )) } - }) - .and_then(|_| response_builder?.body(&true)) + })?; + + Ok(true) } diff --git a/beacon_node/rest_api/src/consensus.rs b/beacon_node/rest_api/src/consensus.rs index a006b379f..d82b05b7a 100644 --- a/beacon_node/rest_api/src/consensus.rs +++ b/beacon_node/rest_api/src/consensus.rs @@ -1,8 +1,7 @@ use crate::helpers::*; -use crate::response_builder::ResponseBuilder; -use crate::{ApiError, ApiResult, UrlQuery}; -use beacon_chain::{BeaconChain, BeaconChainTypes}; -use hyper::{Body, Request}; +use crate::{ApiError, Context, UrlQuery}; +use beacon_chain::BeaconChainTypes; +use hyper::Request; use rest_types::{IndividualVotesRequest, IndividualVotesResponse}; use serde::{Deserialize, Serialize}; use ssz_derive::{Decode, Encode}; @@ -50,38 +49,31 @@ impl Into for TotalBalances { /// HTTP handler return a `VoteCount` for some given `Epoch`. pub fn get_vote_count( - req: Request, - beacon_chain: Arc>, -) -> ApiResult { + req: Request>, + ctx: Arc>, +) -> Result { let query = UrlQuery::from_request(&req)?; let epoch = query.epoch()?; // This is the last slot of the given epoch (one prior to the first slot of the next epoch). let target_slot = (epoch + 1).start_slot(T::EthSpec::slots_per_epoch()) - 1; - let (_root, state) = state_at_slot(&beacon_chain, target_slot)?; - let spec = &beacon_chain.spec; + let (_root, state) = state_at_slot(&ctx.beacon_chain, target_slot)?; + let spec = &ctx.beacon_chain.spec; let mut validator_statuses = ValidatorStatuses::new(&state, spec)?; validator_statuses.process_attestations(&state, spec)?; - let report: VoteCount = validator_statuses.total_balances.into(); - - ResponseBuilder::new(&req)?.body(&report) + Ok(validator_statuses.total_balances.into()) } -pub async fn post_individual_votes( - req: Request, - beacon_chain: Arc>, -) -> ApiResult { - let response_builder = ResponseBuilder::new(&req); - +pub fn post_individual_votes( + req: Request>, + ctx: Arc>, +) -> Result, ApiError> { let body = req.into_body(); - let chunks = hyper::body::to_bytes(body) - .await - .map_err(|e| ApiError::ServerError(format!("Unable to get request body: {:?}", e)))?; - serde_json::from_slice::(&chunks) + serde_json::from_slice::(&body) .map_err(|e| { ApiError::BadRequest(format!( "Unable to parse JSON into ValidatorDutiesRequest: {:?}", @@ -94,8 +86,8 @@ pub async fn post_individual_votes( // This is the last slot of the given epoch (one prior to the first slot of the next epoch). let target_slot = (epoch + 1).start_slot(T::EthSpec::slots_per_epoch()) - 1; - let (_root, mut state) = state_at_slot(&beacon_chain, target_slot)?; - let spec = &beacon_chain.spec; + let (_root, mut state) = state_at_slot(&ctx.beacon_chain, target_slot)?; + let spec = &ctx.beacon_chain.spec; let mut validator_statuses = ValidatorStatuses::new(&state, spec)?; validator_statuses.process_attestations(&state, spec)?; @@ -135,5 +127,4 @@ pub async fn post_individual_votes( }) .collect::, _>>() }) - .and_then(|votes| response_builder?.body_no_ssz(&votes)) } diff --git a/beacon_node/rest_api/src/helpers.rs b/beacon_node/rest_api/src/helpers.rs index 51a12d1bf..66b5bd1a0 100644 --- a/beacon_node/rest_api/src/helpers.rs +++ b/beacon_node/rest_api/src/helpers.rs @@ -1,9 +1,7 @@ -use crate::{ApiError, ApiResult, NetworkChannel}; +use crate::{ApiError, NetworkChannel}; use beacon_chain::{BeaconChain, BeaconChainTypes, StateSkipConfig}; use bls::PublicKeyBytes; use eth2_libp2p::PubsubMessage; -use http::header; -use hyper::{Body, Request}; use itertools::process_results; use network::NetworkMessage; use ssz::Decode; @@ -41,21 +39,6 @@ pub fn parse_committee_index(string: &str) -> Result { .map_err(|e| ApiError::BadRequest(format!("Unable to parse committee index: {:?}", e))) } -/// Checks the provided request to ensure that the `content-type` header. -/// -/// The content-type header should either be omitted, in which case JSON is assumed, or it should -/// explicitly specify `application/json`. If anything else is provided, an error is returned. -pub fn check_content_type_for_json(req: &Request) -> Result<(), ApiError> { - match req.headers().get(header::CONTENT_TYPE) { - Some(h) if h == "application/json" => Ok(()), - Some(h) => Err(ApiError::BadRequest(format!( - "The provided content-type {:?} is not available, this endpoint only supports json.", - h - ))), - _ => Ok(()), - } -} - /// Parse an SSZ object from some hex-encoded bytes. /// /// E.g., A signature is `"0x0000000000000000000000000000000000000000000000000000000000000000"` @@ -228,14 +211,8 @@ pub fn state_root_at_slot( } } -pub fn implementation_pending_response(_req: Request) -> ApiResult { - Err(ApiError::NotImplemented( - "API endpoint has not yet been implemented, but is planned to be soon.".to_owned(), - )) -} - pub fn publish_beacon_block_to_network( - chan: NetworkChannel, + chan: &NetworkChannel, block: SignedBeaconBlock, ) -> Result<(), ApiError> { // send the block via SSZ encoding diff --git a/beacon_node/rest_api/src/lib.rs b/beacon_node/rest_api/src/lib.rs index 988a0910c..405e08e21 100644 --- a/beacon_node/rest_api/src/lib.rs +++ b/beacon_node/rest_api/src/lib.rs @@ -1,22 +1,15 @@ #[macro_use] -mod macros; -#[macro_use] extern crate lazy_static; +mod router; extern crate network as client_network; -mod advanced; mod beacon; pub mod config; mod consensus; -mod error; mod helpers; mod lighthouse; mod metrics; -mod network; mod node; -mod response_builder; -mod router; -mod spec; mod url_query; mod validator; @@ -24,7 +17,6 @@ use beacon_chain::{BeaconChain, BeaconChainTypes}; use bus::Bus; use client_network::NetworkMessage; pub use config::ApiEncodingFormat; -use error::{ApiError, ApiResult}; use eth2_config::Eth2Config; use eth2_libp2p::NetworkGlobals; use futures::future::TryFutureExt; @@ -32,6 +24,7 @@ use hyper::server::conn::AddrStream; use hyper::service::{make_service_fn, service_fn}; use hyper::{Body, Request, Server}; use parking_lot::Mutex; +use rest_types::ApiError; use slog::{info, warn}; use std::net::SocketAddr; use std::path::PathBuf; @@ -42,6 +35,7 @@ use url_query::UrlQuery; pub use crate::helpers::parse_pubkey_bytes; pub use config::Config; +pub use router::Context; pub type NetworkChannel = mpsc::UnboundedSender>; @@ -63,36 +57,28 @@ pub fn start_server( events: Arc>>, ) -> Result { let log = executor.log(); - let inner_log = log.clone(); - let rest_api_config = Arc::new(config.clone()); let eth2_config = Arc::new(eth2_config); + let context = Arc::new(Context { + executor: executor.clone(), + config: config.clone(), + beacon_chain, + network_globals: network_info.network_globals.clone(), + network_chan: network_info.network_chan, + eth2_config, + log: log.clone(), + db_path, + freezer_db_path, + events, + }); + // Define the function that will build the request handler. let make_service = make_service_fn(move |_socket: &AddrStream| { - let beacon_chain = beacon_chain.clone(); - let log = inner_log.clone(); - let rest_api_config = rest_api_config.clone(); - let eth2_config = eth2_config.clone(); - let network_globals = network_info.network_globals.clone(); - let network_channel = network_info.network_chan.clone(); - let db_path = db_path.clone(); - let freezer_db_path = freezer_db_path.clone(); - let events = events.clone(); + let ctx = context.clone(); async move { Ok::<_, hyper::Error>(service_fn(move |req: Request| { - router::route( - req, - beacon_chain.clone(), - network_globals.clone(), - network_channel.clone(), - rest_api_config.clone(), - eth2_config.clone(), - log.clone(), - db_path.clone(), - freezer_db_path.clone(), - events.clone(), - ) + router::on_http_request(req, ctx.clone()) })) } }); diff --git a/beacon_node/rest_api/src/lighthouse.rs b/beacon_node/rest_api/src/lighthouse.rs index 556046ab3..4d0fae926 100644 --- a/beacon_node/rest_api/src/lighthouse.rs +++ b/beacon_node/rest_api/src/lighthouse.rs @@ -1,24 +1,16 @@ //! This contains a collection of lighthouse specific HTTP endpoints. -use crate::response_builder::ResponseBuilder; -use crate::ApiResult; -use eth2_libp2p::{NetworkGlobals, PeerInfo}; -use hyper::{Body, Request}; +use crate::{ApiError, Context}; +use beacon_chain::BeaconChainTypes; +use eth2_libp2p::PeerInfo; use serde::Serialize; use std::sync::Arc; use types::EthSpec; -/// The syncing state of the beacon node. -pub fn syncing( - req: Request, - network_globals: Arc>, -) -> ApiResult { - ResponseBuilder::new(&req)?.body_no_ssz(&network_globals.sync_state()) -} - /// Returns all known peers and corresponding information -pub fn peers(req: Request, network_globals: Arc>) -> ApiResult { - let peers: Vec> = network_globals +pub fn peers(ctx: Arc>) -> Result>, ApiError> { + Ok(ctx + .network_globals .peers .read() .peers() @@ -26,16 +18,15 @@ pub fn peers(req: Request, network_globals: Arc( - req: Request, - network_globals: Arc>, -) -> ApiResult { - let peers: Vec> = network_globals +pub fn connected_peers( + ctx: Arc>, +) -> Result>, ApiError> { + Ok(ctx + .network_globals .peers .read() .connected_peers() @@ -43,14 +34,13 @@ pub fn connected_peers( peer_id: peer_id.to_string(), peer_info: peer_info.clone(), }) - .collect(); - ResponseBuilder::new(&req)?.body_no_ssz(&peers) + .collect()) } /// Information returned by `peers` and `connected_peers`. #[derive(Clone, Debug, Serialize)] #[serde(bound = "T: EthSpec")] -struct Peer { +pub struct Peer { /// The Peer's ID peer_id: String, /// The PeerInfo associated with the peer. diff --git a/beacon_node/rest_api/src/macros.rs b/beacon_node/rest_api/src/macros.rs deleted file mode 100644 index f43224e5d..000000000 --- a/beacon_node/rest_api/src/macros.rs +++ /dev/null @@ -1,11 +0,0 @@ -macro_rules! try_future { - ($expr:expr) => { - match $expr { - core::result::Result::Ok(val) => val, - core::result::Result::Err(err) => return Err(std::convert::From::from(err)), - } - }; - ($expr:expr,) => { - $crate::try_future!($expr) - }; -} diff --git a/beacon_node/rest_api/src/metrics.rs b/beacon_node/rest_api/src/metrics.rs index 87b4f6285..4b1ba737d 100644 --- a/beacon_node/rest_api/src/metrics.rs +++ b/beacon_node/rest_api/src/metrics.rs @@ -1,42 +1,38 @@ -use crate::response_builder::ResponseBuilder; -use crate::{ApiError, ApiResult}; -use beacon_chain::{BeaconChain, BeaconChainTypes}; -use hyper::{Body, Request}; +use crate::{ApiError, Context}; +use beacon_chain::BeaconChainTypes; use lighthouse_metrics::{Encoder, TextEncoder}; use rest_types::Health; -use std::path::PathBuf; use std::sync::Arc; pub use lighthouse_metrics::*; lazy_static! { + pub static ref BEACON_HTTP_API_REQUESTS_TOTAL: Result = + try_create_int_counter_vec( + "beacon_http_api_requests_total", + "Count of HTTP requests received", + &["endpoint"] + ); + pub static ref BEACON_HTTP_API_SUCCESS_TOTAL: Result = + try_create_int_counter_vec( + "beacon_http_api_success_total", + "Count of HTTP requests that returned 200 OK", + &["endpoint"] + ); + pub static ref BEACON_HTTP_API_ERROR_TOTAL: Result = try_create_int_counter_vec( + "beacon_http_api_error_total", + "Count of HTTP that did not return 200 OK", + &["endpoint"] + ); + pub static ref BEACON_HTTP_API_TIMES_TOTAL: Result = try_create_histogram_vec( + "beacon_http_api_times_total", + "Duration to process HTTP requests", + &["endpoint"] + ); pub static ref REQUEST_RESPONSE_TIME: Result = try_create_histogram( "http_server_request_duration_seconds", "Time taken to build a response to a HTTP request" ); - pub static ref REQUEST_COUNT: Result = try_create_int_counter( - "http_server_request_total", - "Total count of HTTP requests received" - ); - pub static ref SUCCESS_COUNT: Result = try_create_int_counter( - "http_server_success_total", - "Total count of HTTP 200 responses sent" - ); - pub static ref VALIDATOR_GET_BLOCK_REQUEST_RESPONSE_TIME: Result = - try_create_histogram( - "http_server_validator_block_get_request_duration_seconds", - "Time taken to respond to GET /validator/block" - ); - pub static ref VALIDATOR_GET_ATTESTATION_REQUEST_RESPONSE_TIME: Result = - try_create_histogram( - "http_server_validator_attestation_get_request_duration_seconds", - "Time taken to respond to GET /validator/attestation" - ); - pub static ref VALIDATOR_GET_DUTIES_REQUEST_RESPONSE_TIME: Result = - try_create_histogram( - "http_server_validator_duties_get_request_duration_seconds", - "Time taken to respond to GET /validator/duties" - ); pub static ref PROCESS_NUM_THREADS: Result = try_create_int_gauge( "process_num_threads", "Number of threads used by the current process" @@ -77,11 +73,8 @@ lazy_static! { /// /// This is a HTTP handler method. pub fn get_prometheus( - req: Request, - beacon_chain: Arc>, - db_path: PathBuf, - freezer_db_path: PathBuf, -) -> ApiResult { + ctx: Arc>, +) -> std::result::Result { let mut buffer = vec![]; let encoder = TextEncoder::new(); @@ -101,9 +94,9 @@ pub fn get_prometheus( // using `lighthouse_metrics::gather(..)` to collect the global `DEFAULT_REGISTRY` metrics into // a string that can be returned via HTTP. - slot_clock::scrape_for_metrics::(&beacon_chain.slot_clock); - store::scrape_for_metrics(&db_path, &freezer_db_path); - beacon_chain::scrape_for_metrics(&beacon_chain); + slot_clock::scrape_for_metrics::(&ctx.beacon_chain.slot_clock); + store::scrape_for_metrics(&ctx.db_path, &ctx.freezer_db_path); + beacon_chain::scrape_for_metrics(&ctx.beacon_chain); eth2_libp2p::scrape_discovery_metrics(); // This will silently fail if we are unable to observe the health. This is desired behaviour @@ -133,6 +126,5 @@ pub fn get_prometheus( .unwrap(); String::from_utf8(buffer) - .map(|string| ResponseBuilder::new(&req)?.body_text(string)) - .map_err(|e| ApiError::ServerError(format!("Failed to encode prometheus info: {:?}", e)))? + .map_err(|e| ApiError::ServerError(format!("Failed to encode prometheus info: {:?}", e))) } diff --git a/beacon_node/rest_api/src/network.rs b/beacon_node/rest_api/src/network.rs deleted file mode 100644 index ae2486d34..000000000 --- a/beacon_node/rest_api/src/network.rs +++ /dev/null @@ -1,72 +0,0 @@ -use crate::error::ApiResult; -use crate::response_builder::ResponseBuilder; -use crate::NetworkGlobals; -use beacon_chain::BeaconChainTypes; -use eth2_libp2p::{Multiaddr, PeerId}; -use hyper::{Body, Request}; -use std::sync::Arc; - -/// HTTP handler to return the list of libp2p multiaddr the client is listening on. -/// -/// Returns a list of `Multiaddr`, serialized according to their `serde` impl. -pub fn get_listen_addresses( - req: Request, - network: Arc>, -) -> ApiResult { - let multiaddresses: Vec = network.listen_multiaddrs(); - ResponseBuilder::new(&req)?.body_no_ssz(&multiaddresses) -} - -/// HTTP handler to return the network port the client is listening on. -/// -/// Returns the TCP port number in its plain form (which is also valid JSON serialization) -pub fn get_listen_port( - req: Request, - network: Arc>, -) -> ApiResult { - ResponseBuilder::new(&req)?.body(&network.listen_port_tcp()) -} - -/// HTTP handler to return the Discv5 ENR from the client's libp2p service. -/// -/// ENR is encoded as base64 string. -pub fn get_enr( - req: Request, - network: Arc>, -) -> ApiResult { - ResponseBuilder::new(&req)?.body_no_ssz(&network.local_enr().to_base64()) -} - -/// HTTP handler to return the `PeerId` from the client's libp2p service. -/// -/// PeerId is encoded as base58 string. -pub fn get_peer_id( - req: Request, - network: Arc>, -) -> ApiResult { - ResponseBuilder::new(&req)?.body_no_ssz(&network.local_peer_id().to_base58()) -} - -/// HTTP handler to return the number of peers connected in the client's libp2p service. -pub fn get_peer_count( - req: Request, - network: Arc>, -) -> ApiResult { - ResponseBuilder::new(&req)?.body(&network.connected_peers()) -} - -/// HTTP handler to return the list of peers connected to the client's libp2p service. -/// -/// Peers are presented as a list of `PeerId::to_string()`. -pub fn get_peer_list( - req: Request, - network: Arc>, -) -> ApiResult { - let connected_peers: Vec = network - .peers - .read() - .connected_peer_ids() - .map(PeerId::to_string) - .collect(); - ResponseBuilder::new(&req)?.body_no_ssz(&connected_peers) -} diff --git a/beacon_node/rest_api/src/node.rs b/beacon_node/rest_api/src/node.rs index e779bb090..bd5615de3 100644 --- a/beacon_node/rest_api/src/node.rs +++ b/beacon_node/rest_api/src/node.rs @@ -1,23 +1,19 @@ -use crate::response_builder::ResponseBuilder; -use crate::{ApiError, ApiResult}; -use eth2_libp2p::{types::SyncState, NetworkGlobals}; -use hyper::{Body, Request}; -use lighthouse_version::version_with_platform; -use rest_types::{Health, SyncingResponse, SyncingStatus}; +use crate::{ApiError, Context}; +use beacon_chain::BeaconChainTypes; +use eth2_libp2p::types::SyncState; +use rest_types::{SyncingResponse, SyncingStatus}; use std::sync::Arc; -use types::{EthSpec, Slot}; +use types::Slot; -/// Read the version string from the current Lighthouse build. -pub fn get_version(req: Request) -> ApiResult { - ResponseBuilder::new(&req)?.body_no_ssz(&version_with_platform()) -} +/// Returns a syncing status. +pub fn syncing(ctx: Arc>) -> Result { + let current_slot = ctx + .beacon_chain + .head_info() + .map_err(|e| ApiError::ServerError(format!("Unable to read head slot: {:?}", e)))? + .slot; -pub fn syncing( - req: Request, - network: Arc>, - current_slot: Slot, -) -> ApiResult { - let (starting_slot, highest_slot) = match network.sync_state() { + let (starting_slot, highest_slot) = match ctx.network_globals.sync_state() { SyncState::SyncingFinalized { start_slot, head_slot, @@ -36,14 +32,8 @@ pub fn syncing( highest_slot, }; - ResponseBuilder::new(&req)?.body(&SyncingResponse { - is_syncing: network.is_syncing(), + Ok(SyncingResponse { + is_syncing: ctx.network_globals.is_syncing(), sync_status, }) } - -pub fn get_health(req: Request) -> ApiResult { - let health = Health::observe().map_err(ApiError::ServerError)?; - - ResponseBuilder::new(&req)?.body_no_ssz(&health) -} diff --git a/beacon_node/rest_api/src/response_builder.rs b/beacon_node/rest_api/src/response_builder.rs deleted file mode 100644 index 237716791..000000000 --- a/beacon_node/rest_api/src/response_builder.rs +++ /dev/null @@ -1,83 +0,0 @@ -use super::{ApiError, ApiResult}; -use crate::config::ApiEncodingFormat; -use hyper::header; -use hyper::{Body, Request, Response, StatusCode}; -use serde::Serialize; -use ssz::Encode; - -pub struct ResponseBuilder { - encoding: ApiEncodingFormat, -} - -impl ResponseBuilder { - pub fn new(req: &Request) -> Result { - let accept_header: String = req - .headers() - .get(header::ACCEPT) - .map_or(Ok(""), |h| h.to_str()) - .map_err(|e| { - ApiError::BadRequest(format!( - "The Accept header contains invalid characters: {:?}", - e - )) - }) - .map(String::from)?; - - // JSON is our default encoding, unless something else is requested. - let encoding = ApiEncodingFormat::from(accept_header.as_str()); - Ok(Self { encoding }) - } - - pub fn body(self, item: &T) -> ApiResult { - match self.encoding { - ApiEncodingFormat::SSZ => Response::builder() - .status(StatusCode::OK) - .header("content-type", "application/ssz") - .body(Body::from(item.as_ssz_bytes())) - .map_err(|e| ApiError::ServerError(format!("Failed to build response: {:?}", e))), - _ => self.body_no_ssz(item), - } - } - - pub fn body_no_ssz(self, item: &T) -> ApiResult { - let (body, content_type) = match self.encoding { - ApiEncodingFormat::JSON => ( - Body::from(serde_json::to_string(&item).map_err(|e| { - ApiError::ServerError(format!( - "Unable to serialize response body as JSON: {:?}", - e - )) - })?), - "application/json", - ), - ApiEncodingFormat::SSZ => { - return Err(ApiError::UnsupportedType( - "Response cannot be encoded as SSZ.".into(), - )); - } - ApiEncodingFormat::YAML => ( - Body::from(serde_yaml::to_string(&item).map_err(|e| { - ApiError::ServerError(format!( - "Unable to serialize response body as YAML: {:?}", - e - )) - })?), - "application/yaml", - ), - }; - - Response::builder() - .status(StatusCode::OK) - .header("content-type", content_type) - .body(body) - .map_err(|e| ApiError::ServerError(format!("Failed to build response: {:?}", e))) - } - - pub fn body_text(self, text: String) -> ApiResult { - Response::builder() - .status(StatusCode::OK) - .header("content-type", "text/plain; charset=utf-8") - .body(Body::from(text)) - .map_err(|e| ApiError::ServerError(format!("Failed to build response: {:?}", e))) - } -} diff --git a/beacon_node/rest_api/src/router.rs b/beacon_node/rest_api/src/router.rs index a66094db1..bed7ba77a 100644 --- a/beacon_node/rest_api/src/router.rs +++ b/beacon_node/rest_api/src/router.rs @@ -1,241 +1,322 @@ use crate::{ - advanced, beacon, config::Config, consensus, error::ApiError, helpers, lighthouse, metrics, - network, node, spec, validator, NetworkChannel, + beacon, config::Config, consensus, lighthouse, metrics, node, validator, NetworkChannel, }; use beacon_chain::{BeaconChain, BeaconChainTypes}; use bus::Bus; +use environment::TaskExecutor; use eth2_config::Eth2Config; -use eth2_libp2p::NetworkGlobals; +use eth2_libp2p::{NetworkGlobals, PeerId}; use hyper::header::HeaderValue; use hyper::{Body, Method, Request, Response}; +use lighthouse_version::version_with_platform; +use operation_pool::PersistedOperationPool; use parking_lot::Mutex; +use rest_types::{ApiError, Handler, Health}; use slog::debug; use std::path::PathBuf; use std::sync::Arc; use std::time::Instant; -use types::{SignedBeaconBlockHash, Slot}; +use types::{EthSpec, SignedBeaconBlockHash}; -// Allowing more than 7 arguments. -#[allow(clippy::too_many_arguments)] -pub async fn route( +pub struct Context { + pub executor: TaskExecutor, + pub config: Config, + pub beacon_chain: Arc>, + pub network_globals: Arc>, + pub network_chan: NetworkChannel, + pub eth2_config: Arc, + pub log: slog::Logger, + pub db_path: PathBuf, + pub freezer_db_path: PathBuf, + pub events: Arc>>, +} + +pub async fn on_http_request( req: Request, - beacon_chain: Arc>, - network_globals: Arc>, - network_channel: NetworkChannel, - rest_api_config: Arc, - eth2_config: Arc, - local_log: slog::Logger, - db_path: PathBuf, - freezer_db_path: PathBuf, - events: Arc>>, + ctx: Arc>, ) -> Result, ApiError> { - metrics::inc_counter(&metrics::REQUEST_COUNT); - let received_instant = Instant::now(); - let path = req.uri().path().to_string(); - let log = local_log.clone(); - let result = { - let _timer = metrics::start_timer(&metrics::REQUEST_RESPONSE_TIME); + let _timer = metrics::start_timer_vec(&metrics::BEACON_HTTP_API_TIMES_TOTAL, &[&path]); + metrics::inc_counter_vec(&metrics::BEACON_HTTP_API_REQUESTS_TOTAL, &[&path]); - match (req.method(), path.as_ref()) { - // Methods for Client - (&Method::GET, "/node/health") => node::get_health(req), - (&Method::GET, "/node/version") => node::get_version(req), - (&Method::GET, "/node/syncing") => { - // inform the current slot, or set to 0 - let current_slot = beacon_chain - .head_info() - .map(|info| info.slot) - .unwrap_or_else(|_| Slot::from(0u64)); + let received_instant = Instant::now(); + let log = ctx.log.clone(); + let allow_origin = ctx.config.allow_origin.clone(); - node::syncing::(req, network_globals, current_slot) - } - - // Methods for Network - (&Method::GET, "/network/enr") => network::get_enr::(req, network_globals), - (&Method::GET, "/network/peer_count") => { - network::get_peer_count::(req, network_globals) - } - (&Method::GET, "/network/peer_id") => network::get_peer_id::(req, network_globals), - (&Method::GET, "/network/peers") => network::get_peer_list::(req, network_globals), - (&Method::GET, "/network/listen_port") => { - network::get_listen_port::(req, network_globals) - } - (&Method::GET, "/network/listen_addresses") => { - network::get_listen_addresses::(req, network_globals) - } - - // Methods for Beacon Node - (&Method::GET, "/beacon/head") => beacon::get_head::(req, beacon_chain), - (&Method::GET, "/beacon/heads") => beacon::get_heads::(req, beacon_chain), - (&Method::GET, "/beacon/block") => beacon::get_block::(req, beacon_chain), - (&Method::GET, "/beacon/block_root") => beacon::get_block_root::(req, beacon_chain), - (&Method::GET, "/beacon/fork") => beacon::get_fork::(req, beacon_chain), - (&Method::GET, "/beacon/fork/stream") => { - let reader = events.lock().add_rx(); - beacon::stream_forks::(log, reader) - } - (&Method::GET, "/beacon/genesis_time") => { - beacon::get_genesis_time::(req, beacon_chain) - } - (&Method::GET, "/beacon/genesis_validators_root") => { - beacon::get_genesis_validators_root::(req, beacon_chain) - } - (&Method::GET, "/beacon/validators") => beacon::get_validators::(req, beacon_chain), - (&Method::POST, "/beacon/validators") => { - beacon::post_validators::(req, beacon_chain).await - } - (&Method::GET, "/beacon/validators/all") => { - beacon::get_all_validators::(req, beacon_chain) - } - (&Method::GET, "/beacon/validators/active") => { - beacon::get_active_validators::(req, beacon_chain) - } - (&Method::GET, "/beacon/state") => beacon::get_state::(req, beacon_chain), - (&Method::GET, "/beacon/state_root") => beacon::get_state_root::(req, beacon_chain), - (&Method::GET, "/beacon/state/genesis") => { - beacon::get_genesis_state::(req, beacon_chain) - } - (&Method::GET, "/beacon/committees") => beacon::get_committees::(req, beacon_chain), - (&Method::POST, "/beacon/proposer_slashing") => { - beacon::proposer_slashing::(req, beacon_chain).await - } - (&Method::POST, "/beacon/attester_slashing") => { - beacon::attester_slashing::(req, beacon_chain).await - } - - // Methods for Validator - (&Method::POST, "/validator/duties") => { - let timer = - metrics::start_timer(&metrics::VALIDATOR_GET_DUTIES_REQUEST_RESPONSE_TIME); - let response = validator::post_validator_duties::(req, beacon_chain); - drop(timer); - response.await - } - (&Method::POST, "/validator/subscribe") => { - validator::post_validator_subscriptions::(req, network_channel).await - } - (&Method::GET, "/validator/duties/all") => { - validator::get_all_validator_duties::(req, beacon_chain) - } - (&Method::GET, "/validator/duties/active") => { - validator::get_active_validator_duties::(req, beacon_chain) - } - (&Method::GET, "/validator/block") => { - let timer = - metrics::start_timer(&metrics::VALIDATOR_GET_BLOCK_REQUEST_RESPONSE_TIME); - let response = validator::get_new_beacon_block::(req, beacon_chain, log); - drop(timer); - response - } - (&Method::POST, "/validator/block") => { - validator::publish_beacon_block::(req, beacon_chain, network_channel, log).await - } - (&Method::GET, "/validator/attestation") => { - let timer = - metrics::start_timer(&metrics::VALIDATOR_GET_ATTESTATION_REQUEST_RESPONSE_TIME); - let response = validator::get_new_attestation::(req, beacon_chain); - drop(timer); - response - } - (&Method::GET, "/validator/aggregate_attestation") => { - validator::get_aggregate_attestation::(req, beacon_chain) - } - (&Method::POST, "/validator/attestations") => { - validator::publish_attestations::(req, beacon_chain, network_channel, log).await - } - (&Method::POST, "/validator/aggregate_and_proofs") => { - validator::publish_aggregate_and_proofs::( - req, - beacon_chain, - network_channel, - log, - ) - .await - } - - // Methods for consensus - (&Method::GET, "/consensus/global_votes") => { - consensus::get_vote_count::(req, beacon_chain) - } - (&Method::POST, "/consensus/individual_votes") => { - consensus::post_individual_votes::(req, beacon_chain).await - } - - // Methods for bootstrap and checking configuration - (&Method::GET, "/spec") => spec::get_spec::(req, beacon_chain), - (&Method::GET, "/spec/slots_per_epoch") => spec::get_slots_per_epoch::(req), - (&Method::GET, "/spec/deposit_contract") => { - helpers::implementation_pending_response(req) - } - (&Method::GET, "/spec/eth2_config") => spec::get_eth2_config::(req, eth2_config), - - // Methods for advanced parameters - (&Method::GET, "/advanced/fork_choice") => { - advanced::get_fork_choice::(req, beacon_chain) - } - (&Method::GET, "/advanced/operation_pool") => { - advanced::get_operation_pool::(req, beacon_chain) - } - - (&Method::GET, "/metrics") => { - metrics::get_prometheus::(req, beacon_chain, db_path, freezer_db_path) - } - - // Lighthouse specific - (&Method::GET, "/lighthouse/syncing") => { - lighthouse::syncing::(req, network_globals) - } - - (&Method::GET, "/lighthouse/peers") => { - lighthouse::peers::(req, network_globals) - } - - (&Method::GET, "/lighthouse/connected_peers") => { - lighthouse::connected_peers::(req, network_globals) - } - _ => Err(ApiError::NotFound( - "Request path and/or method not found.".to_owned(), - )), - } - }; - - let request_processing_duration = Instant::now().duration_since(received_instant); - - // Map the Rust-friendly `Result` in to a http-friendly response. In effect, this ensures that - // any `Err` returned from our response handlers becomes a valid http response to the client - // (e.g., a response with a 404 or 500 status). - - match result { + match route(req, ctx).await { Ok(mut response) => { - if rest_api_config.allow_origin != "" { + metrics::inc_counter_vec(&metrics::BEACON_HTTP_API_SUCCESS_TOTAL, &[&path]); + + if allow_origin != "" { let headers = response.headers_mut(); headers.insert( hyper::header::ACCESS_CONTROL_ALLOW_ORIGIN, - HeaderValue::from_str(&rest_api_config.allow_origin)?, + HeaderValue::from_str(&allow_origin)?, ); headers.insert(hyper::header::VARY, HeaderValue::from_static("Origin")); } debug!( - local_log, + log, "HTTP API request successful"; "path" => path, - "duration_ms" => request_processing_duration.as_millis() + "duration_ms" => Instant::now().duration_since(received_instant).as_millis() ); - metrics::inc_counter(&metrics::SUCCESS_COUNT); Ok(response) } Err(error) => { + metrics::inc_counter_vec(&metrics::BEACON_HTTP_API_ERROR_TOTAL, &[&path]); + debug!( - local_log, + log, "HTTP API request failure"; "path" => path, - "duration_ms" => request_processing_duration.as_millis() + "duration_ms" => Instant::now().duration_since(received_instant).as_millis() ); Ok(error.into()) } } } + +async fn route( + req: Request, + ctx: Arc>, +) -> Result, ApiError> { + let path = req.uri().path().to_string(); + let ctx = ctx.clone(); + let method = req.method().clone(); + let executor = ctx.executor.clone(); + let handler = Handler::new(req, ctx, executor)?; + + match (method, path.as_ref()) { + (Method::GET, "/node/version") => handler + .static_value(version_with_platform()) + .await? + .serde_encodings(), + (Method::GET, "/node/health") => handler + .static_value(Health::observe().map_err(ApiError::ServerError)?) + .await? + .serde_encodings(), + (Method::GET, "/node/syncing") => handler + .allow_body() + .in_blocking_task(|_, ctx| node::syncing(ctx)) + .await? + .serde_encodings(), + (Method::GET, "/network/enr") => handler + .in_core_task(|_, ctx| Ok(ctx.network_globals.local_enr().to_base64())) + .await? + .serde_encodings(), + (Method::GET, "/network/peer_count") => handler + .in_core_task(|_, ctx| Ok(ctx.network_globals.connected_peers())) + .await? + .serde_encodings(), + (Method::GET, "/network/peer_id") => handler + .in_core_task(|_, ctx| Ok(ctx.network_globals.local_peer_id().to_base58())) + .await? + .serde_encodings(), + (Method::GET, "/network/peers") => handler + .in_blocking_task(|_, ctx| { + Ok(ctx + .network_globals + .peers + .read() + .connected_peer_ids() + .map(PeerId::to_string) + .collect::>()) + }) + .await? + .serde_encodings(), + (Method::GET, "/network/listen_port") => handler + .in_core_task(|_, ctx| Ok(ctx.network_globals.listen_port_tcp())) + .await? + .serde_encodings(), + (Method::GET, "/network/listen_addresses") => handler + .in_blocking_task(|_, ctx| Ok(ctx.network_globals.listen_multiaddrs())) + .await? + .serde_encodings(), + (Method::GET, "/beacon/head") => handler + .in_blocking_task(|_, ctx| beacon::get_head(ctx)) + .await? + .all_encodings(), + (Method::GET, "/beacon/heads") => handler + .in_blocking_task(|_, ctx| Ok(beacon::get_heads(ctx))) + .await? + .all_encodings(), + (Method::GET, "/beacon/block") => handler + .in_blocking_task(beacon::get_block) + .await? + .all_encodings(), + (Method::GET, "/beacon/block_root") => handler + .in_blocking_task(beacon::get_block_root) + .await? + .all_encodings(), + (Method::GET, "/beacon/fork") => handler + .in_blocking_task(|_, ctx| Ok(ctx.beacon_chain.head_info()?.fork)) + .await? + .all_encodings(), + (Method::GET, "/beacon/fork/stream") => { + handler.sse_stream(|_, ctx| beacon::stream_forks(ctx)).await + } + (Method::GET, "/beacon/genesis_time") => handler + .in_blocking_task(|_, ctx| Ok(ctx.beacon_chain.head_info()?.genesis_time)) + .await? + .all_encodings(), + (Method::GET, "/beacon/genesis_validators_root") => handler + .in_blocking_task(|_, ctx| Ok(ctx.beacon_chain.head_info()?.genesis_validators_root)) + .await? + .all_encodings(), + (Method::GET, "/beacon/validators") => handler + .in_blocking_task(beacon::get_validators) + .await? + .all_encodings(), + (Method::POST, "/beacon/validators") => handler + .allow_body() + .in_blocking_task(beacon::post_validators) + .await? + .all_encodings(), + (Method::GET, "/beacon/validators/all") => handler + .in_blocking_task(beacon::get_all_validators) + .await? + .all_encodings(), + (Method::GET, "/beacon/validators/active") => handler + .in_blocking_task(beacon::get_active_validators) + .await? + .all_encodings(), + (Method::GET, "/beacon/state") => handler + .in_blocking_task(beacon::get_state) + .await? + .all_encodings(), + (Method::GET, "/beacon/state_root") => handler + .in_blocking_task(beacon::get_state_root) + .await? + .all_encodings(), + (Method::GET, "/beacon/state/genesis") => handler + .in_blocking_task(|_, ctx| beacon::get_genesis_state(ctx)) + .await? + .all_encodings(), + (Method::GET, "/beacon/committees") => handler + .in_blocking_task(beacon::get_committees) + .await? + .all_encodings(), + (Method::POST, "/beacon/proposer_slashing") => handler + .allow_body() + .in_blocking_task(beacon::proposer_slashing) + .await? + .serde_encodings(), + (Method::POST, "/beacon/attester_slashing") => handler + .allow_body() + .in_blocking_task(beacon::attester_slashing) + .await? + .serde_encodings(), + (Method::POST, "/validator/duties") => handler + .allow_body() + .in_blocking_task(validator::post_validator_duties) + .await? + .serde_encodings(), + (Method::POST, "/validator/subscribe") => handler + .allow_body() + .in_blocking_task(validator::post_validator_subscriptions) + .await? + .serde_encodings(), + (Method::GET, "/validator/duties/all") => handler + .in_blocking_task(validator::get_all_validator_duties) + .await? + .serde_encodings(), + (Method::GET, "/validator/duties/active") => handler + .in_blocking_task(validator::get_active_validator_duties) + .await? + .serde_encodings(), + (Method::GET, "/validator/block") => handler + .in_blocking_task(validator::get_new_beacon_block) + .await? + .serde_encodings(), + (Method::POST, "/validator/block") => handler + .allow_body() + .in_blocking_task(validator::publish_beacon_block) + .await? + .serde_encodings(), + (Method::GET, "/validator/attestation") => handler + .in_blocking_task(validator::get_new_attestation) + .await? + .serde_encodings(), + (Method::GET, "/validator/aggregate_attestation") => handler + .in_blocking_task(validator::get_aggregate_attestation) + .await? + .serde_encodings(), + (Method::POST, "/validator/attestations") => handler + .allow_body() + .in_blocking_task(validator::publish_attestations) + .await? + .serde_encodings(), + (Method::POST, "/validator/aggregate_and_proofs") => handler + .allow_body() + .in_blocking_task(validator::publish_aggregate_and_proofs) + .await? + .serde_encodings(), + (Method::GET, "/consensus/global_votes") => handler + .allow_body() + .in_blocking_task(consensus::get_vote_count) + .await? + .serde_encodings(), + (Method::POST, "/consensus/individual_votes") => handler + .allow_body() + .in_blocking_task(consensus::post_individual_votes) + .await? + .serde_encodings(), + (Method::GET, "/spec") => handler + // TODO: this clone is not ideal. + .in_blocking_task(|_, ctx| Ok(ctx.beacon_chain.spec.clone())) + .await? + .serde_encodings(), + (Method::GET, "/spec/slots_per_epoch") => handler + .static_value(T::EthSpec::slots_per_epoch()) + .await? + .serde_encodings(), + (Method::GET, "/spec/eth2_config") => handler + // TODO: this clone is not ideal. + .in_blocking_task(|_, ctx| Ok(ctx.eth2_config.as_ref().clone())) + .await? + .serde_encodings(), + (Method::GET, "/advanced/fork_choice") => handler + .in_blocking_task(|_, ctx| { + Ok(ctx + .beacon_chain + .fork_choice + .read() + .proto_array() + .core_proto_array() + .clone()) + }) + .await? + .serde_encodings(), + (Method::GET, "/advanced/operation_pool") => handler + .in_blocking_task(|_, ctx| { + Ok(PersistedOperationPool::from_operation_pool( + &ctx.beacon_chain.op_pool, + )) + }) + .await? + .serde_encodings(), + (Method::GET, "/metrics") => handler + .in_blocking_task(|_, ctx| metrics::get_prometheus(ctx)) + .await? + .text_encoding(), + (Method::GET, "/lighthouse/syncing") => handler + .in_blocking_task(|_, ctx| Ok(ctx.network_globals.sync_state())) + .await? + .serde_encodings(), + (Method::GET, "/lighthouse/peers") => handler + .in_blocking_task(|_, ctx| lighthouse::peers(ctx)) + .await? + .serde_encodings(), + (Method::GET, "/lighthouse/connected_peers") => handler + .in_blocking_task(|_, ctx| lighthouse::connected_peers(ctx)) + .await? + .serde_encodings(), + _ => Err(ApiError::NotFound( + "Request path and/or method not found.".to_owned(), + )), + } +} diff --git a/beacon_node/rest_api/src/spec.rs b/beacon_node/rest_api/src/spec.rs deleted file mode 100644 index 6fd2575f5..000000000 --- a/beacon_node/rest_api/src/spec.rs +++ /dev/null @@ -1,28 +0,0 @@ -use super::ApiResult; -use crate::response_builder::ResponseBuilder; -use beacon_chain::{BeaconChain, BeaconChainTypes}; -use eth2_config::Eth2Config; -use hyper::{Body, Request}; -use std::sync::Arc; -use types::EthSpec; - -/// HTTP handler to return the full spec object. -pub fn get_spec( - req: Request, - beacon_chain: Arc>, -) -> ApiResult { - ResponseBuilder::new(&req)?.body_no_ssz(&beacon_chain.spec) -} - -/// HTTP handler to return the full Eth2Config object. -pub fn get_eth2_config( - req: Request, - eth2_config: Arc, -) -> ApiResult { - ResponseBuilder::new(&req)?.body_no_ssz(eth2_config.as_ref()) -} - -/// HTTP handler to return the full spec object. -pub fn get_slots_per_epoch(req: Request) -> ApiResult { - ResponseBuilder::new(&req)?.body(&T::EthSpec::slots_per_epoch()) -} diff --git a/beacon_node/rest_api/src/validator.rs b/beacon_node/rest_api/src/validator.rs index 43a87a3ad..724719c5d 100644 --- a/beacon_node/rest_api/src/validator.rs +++ b/beacon_node/rest_api/src/validator.rs @@ -1,15 +1,12 @@ -use crate::helpers::{ - check_content_type_for_json, parse_hex_ssz_bytes, publish_beacon_block_to_network, -}; -use crate::response_builder::ResponseBuilder; -use crate::{ApiError, ApiResult, NetworkChannel, UrlQuery}; +use crate::helpers::{parse_hex_ssz_bytes, publish_beacon_block_to_network}; +use crate::{ApiError, Context, NetworkChannel, UrlQuery}; use beacon_chain::{ attestation_verification::Error as AttnError, BeaconChain, BeaconChainError, BeaconChainTypes, BlockError, ForkChoiceError, StateSkipConfig, }; use bls::PublicKeyBytes; use eth2_libp2p::PubsubMessage; -use hyper::{Body, Request}; +use hyper::Request; use network::NetworkMessage; use rayon::prelude::*; use rest_types::{ValidatorDutiesRequest, ValidatorDutyBytes, ValidatorSubscription}; @@ -17,25 +14,20 @@ use slog::{error, info, trace, warn, Logger}; use std::sync::Arc; use types::beacon_state::EthSpec; use types::{ - Attestation, AttestationData, BeaconState, Epoch, RelativeEpoch, SelectionProof, + Attestation, AttestationData, BeaconBlock, BeaconState, Epoch, RelativeEpoch, SelectionProof, SignedAggregateAndProof, SignedBeaconBlock, SubnetId, }; /// HTTP Handler to retrieve the duties for a set of validators during a particular epoch. This /// method allows for collecting bulk sets of validator duties without risking exceeding the max /// URL length with query pairs. -pub async fn post_validator_duties( - req: Request, - beacon_chain: Arc>, -) -> ApiResult { - let response_builder = ResponseBuilder::new(&req); - +pub fn post_validator_duties( + req: Request>, + ctx: Arc>, +) -> Result, ApiError> { let body = req.into_body(); - let chunks = hyper::body::to_bytes(body) - .await - .map_err(|e| ApiError::ServerError(format!("Unable to get request body: {:?}", e)))?; - serde_json::from_slice::(&chunks) + serde_json::from_slice::(&body) .map_err(|e| { ApiError::BadRequest(format!( "Unable to parse JSON into ValidatorDutiesRequest: {:?}", @@ -44,29 +36,22 @@ pub async fn post_validator_duties( }) .and_then(|bulk_request| { return_validator_duties( - beacon_chain, + &ctx.beacon_chain.clone(), bulk_request.epoch, bulk_request.pubkeys.into_iter().map(Into::into).collect(), ) }) - .and_then(|duties| response_builder?.body_no_ssz(&duties)) } /// HTTP Handler to retrieve subscriptions for a set of validators. This allows the node to /// organise peer discovery and topic subscription for known validators. -pub async fn post_validator_subscriptions( - req: Request, - network_chan: NetworkChannel, -) -> ApiResult { - try_future!(check_content_type_for_json(&req)); - let response_builder = ResponseBuilder::new(&req); - +pub fn post_validator_subscriptions( + req: Request>, + ctx: Arc>, +) -> Result<(), ApiError> { let body = req.into_body(); - let chunks = hyper::body::to_bytes(body) - .await - .map_err(|e| ApiError::ServerError(format!("Unable to get request body: {:?}", e)))?; - serde_json::from_slice(&chunks) + serde_json::from_slice(&body) .map_err(|e| { ApiError::BadRequest(format!( "Unable to parse JSON into ValidatorSubscriptions: {:?}", @@ -74,7 +59,7 @@ pub async fn post_validator_subscriptions( )) }) .and_then(move |subscriptions: Vec| { - network_chan + ctx.network_chan .send(NetworkMessage::Subscribe { subscriptions }) .map_err(|e| { ApiError::ServerError(format!( @@ -84,19 +69,18 @@ pub async fn post_validator_subscriptions( })?; Ok(()) }) - .and_then(|_| response_builder?.body_no_ssz(&())) } /// HTTP Handler to retrieve all validator duties for the given epoch. pub fn get_all_validator_duties( - req: Request, - beacon_chain: Arc>, -) -> ApiResult { + req: Request>, + ctx: Arc>, +) -> Result, ApiError> { let query = UrlQuery::from_request(&req)?; let epoch = query.epoch()?; - let state = get_state_for_epoch(&beacon_chain, epoch, StateSkipConfig::WithoutStateRoots)?; + let state = get_state_for_epoch(&ctx.beacon_chain, epoch, StateSkipConfig::WithoutStateRoots)?; let validator_pubkeys = state .validators @@ -104,21 +88,19 @@ pub fn get_all_validator_duties( .map(|validator| validator.pubkey.clone()) .collect(); - let duties = return_validator_duties(beacon_chain, epoch, validator_pubkeys)?; - - ResponseBuilder::new(&req)?.body_no_ssz(&duties) + return_validator_duties(&ctx.beacon_chain, epoch, validator_pubkeys) } /// HTTP Handler to retrieve all active validator duties for the given epoch. pub fn get_active_validator_duties( - req: Request, - beacon_chain: Arc>, -) -> ApiResult { + req: Request>, + ctx: Arc>, +) -> Result, ApiError> { let query = UrlQuery::from_request(&req)?; let epoch = query.epoch()?; - let state = get_state_for_epoch(&beacon_chain, epoch, StateSkipConfig::WithoutStateRoots)?; + let state = get_state_for_epoch(&ctx.beacon_chain, epoch, StateSkipConfig::WithoutStateRoots)?; let validator_pubkeys = state .validators @@ -127,9 +109,7 @@ pub fn get_active_validator_duties( .map(|validator| validator.pubkey.clone()) .collect(); - let duties = return_validator_duties(beacon_chain, epoch, validator_pubkeys)?; - - ResponseBuilder::new(&req)?.body_no_ssz(&duties) + return_validator_duties(&ctx.beacon_chain, epoch, validator_pubkeys) } /// Helper function to return the state that can be used to determine the duties for some `epoch`. @@ -165,7 +145,7 @@ pub fn get_state_for_epoch( /// Helper function to get the duties for some `validator_pubkeys` in some `epoch`. fn return_validator_duties( - beacon_chain: Arc>, + beacon_chain: &BeaconChain, epoch: Epoch, validator_pubkeys: Vec, ) -> Result, ApiError> { @@ -281,10 +261,9 @@ fn return_validator_duties( /// HTTP Handler to produce a new BeaconBlock from the current state, ready to be signed by a validator. pub fn get_new_beacon_block( - req: Request, - beacon_chain: Arc>, - log: Logger, -) -> ApiResult { + req: Request>, + ctx: Arc>, +) -> Result, ApiError> { let query = UrlQuery::from_request(&req)?; let slot = query.slot()?; @@ -296,11 +275,12 @@ pub fn get_new_beacon_block( None }; - let (new_block, _state) = beacon_chain + let (new_block, _state) = ctx + .beacon_chain .produce_block(randao_reveal, slot, validator_graffiti) .map_err(|e| { error!( - log, + ctx.log, "Error whilst producing block"; "error" => format!("{:?}", e) ); @@ -311,48 +291,40 @@ pub fn get_new_beacon_block( )) })?; - ResponseBuilder::new(&req)?.body(&new_block) + Ok(new_block) } /// HTTP Handler to publish a SignedBeaconBlock, which has been signed by a validator. -pub async fn publish_beacon_block( - req: Request, - beacon_chain: Arc>, - network_chan: NetworkChannel, - log: Logger, -) -> ApiResult { - try_future!(check_content_type_for_json(&req)); - let response_builder = ResponseBuilder::new(&req); - +pub fn publish_beacon_block( + req: Request>, + ctx: Arc>, +) -> Result<(), ApiError> { let body = req.into_body(); - let chunks = hyper::body::to_bytes(body) - .await - .map_err(|e| ApiError::ServerError(format!("Unable to get request body: {:?}", e)))?; - serde_json::from_slice(&chunks).map_err(|e| { + serde_json::from_slice(&body).map_err(|e| { ApiError::BadRequest(format!("Unable to parse JSON into SignedBeaconBlock: {:?}", e)) }) .and_then(move |block: SignedBeaconBlock| { let slot = block.slot(); - match beacon_chain.process_block(block.clone()) { + match ctx.beacon_chain.process_block(block.clone()) { Ok(block_root) => { // Block was processed, publish via gossipsub info!( - log, + ctx.log, "Block from local validator"; "block_root" => format!("{}", block_root), "block_slot" => slot, ); - publish_beacon_block_to_network::(network_chan, block)?; + publish_beacon_block_to_network::(&ctx.network_chan, block)?; // Run the fork choice algorithm and enshrine a new canonical head, if // found. // // The new head may or may not be the block we just received. - if let Err(e) = beacon_chain.fork_choice() { + if let Err(e) = ctx.beacon_chain.fork_choice() { error!( - log, + ctx.log, "Failed to find beacon chain head"; "error" => format!("{:?}", e) ); @@ -366,9 +338,9 @@ pub async fn publish_beacon_block( // - Excessive time between block produce and publish. // - A validator is using another beacon node to produce blocks and // submitting them here. - if beacon_chain.head()?.beacon_block_root != block_root { + if ctx.beacon_chain.head()?.beacon_block_root != block_root { warn!( - log, + ctx.log, "Block from validator is not head"; "desc" => "potential re-org", ); @@ -380,7 +352,7 @@ pub async fn publish_beacon_block( } Err(BlockError::BeaconChainError(e)) => { error!( - log, + ctx.log, "Error whilst processing block"; "error" => format!("{:?}", e) ); @@ -392,7 +364,7 @@ pub async fn publish_beacon_block( } Err(other) => { warn!( - log, + ctx.log, "Invalid block from local validator"; "outcome" => format!("{:?}", other) ); @@ -404,41 +376,41 @@ pub async fn publish_beacon_block( } } }) - .and_then(|_| response_builder?.body_no_ssz(&())) } /// HTTP Handler to produce a new Attestation from the current state, ready to be signed by a validator. pub fn get_new_attestation( - req: Request, - beacon_chain: Arc>, -) -> ApiResult { + req: Request>, + ctx: Arc>, +) -> Result, ApiError> { let query = UrlQuery::from_request(&req)?; let slot = query.slot()?; let index = query.committee_index()?; - let attestation = beacon_chain + ctx.beacon_chain .produce_unaggregated_attestation(slot, index) - .map_err(|e| ApiError::BadRequest(format!("Unable to produce attestation: {:?}", e)))?; - - ResponseBuilder::new(&req)?.body(&attestation) + .map_err(|e| ApiError::BadRequest(format!("Unable to produce attestation: {:?}", e))) } /// HTTP Handler to retrieve the aggregate attestation for a slot pub fn get_aggregate_attestation( - req: Request, - beacon_chain: Arc>, -) -> ApiResult { + req: Request>, + ctx: Arc>, +) -> Result, ApiError> { let query = UrlQuery::from_request(&req)?; let attestation_data = query.attestation_data()?; - match beacon_chain.get_aggregated_attestation(&attestation_data) { - Ok(Some(attestation)) => ResponseBuilder::new(&req)?.body(&attestation), + match ctx + .beacon_chain + .get_aggregated_attestation(&attestation_data) + { + Ok(Some(attestation)) => Ok(attestation), Ok(None) => Err(ApiError::NotFound(format!( "No matching aggregate attestation for slot {:?} is known in slot {:?}", attestation_data.slot, - beacon_chain.slot() + ctx.beacon_chain.slot() ))), Err(e) => Err(ApiError::ServerError(format!( "Unable to obtain attestation: {:?}", @@ -448,22 +420,13 @@ pub fn get_aggregate_attestation( } /// HTTP Handler to publish a list of Attestations, which have been signed by a number of validators. -pub async fn publish_attestations( - req: Request, - beacon_chain: Arc>, - network_chan: NetworkChannel, - log: Logger, -) -> ApiResult { - try_future!(check_content_type_for_json(&req)); - let response_builder = ResponseBuilder::new(&req); +pub fn publish_attestations( + req: Request>, + ctx: Arc>, +) -> Result<(), ApiError> { + let bytes = req.into_body(); - let body = req.into_body(); - let chunk = hyper::body::to_bytes(body) - .await - .map_err(|e| ApiError::ServerError(format!("Unable to get request body: {:?}", e)))?; - - let chunks = chunk.iter().cloned().collect::>(); - serde_json::from_slice(&chunks.as_slice()) + serde_json::from_slice(&bytes) .map_err(|e| { ApiError::BadRequest(format!( "Unable to deserialize JSON into a list of attestations: {:?}", @@ -478,12 +441,12 @@ pub async fn publish_attestations( .enumerate() .map(|(i, (attestation, subnet_id))| { process_unaggregated_attestation( - &beacon_chain, - network_chan.clone(), + &ctx.beacon_chain, + ctx.network_chan.clone(), attestation, subnet_id, i, - &log, + &ctx.log, ) }) .collect::>>() @@ -493,7 +456,7 @@ pub async fn publish_attestations( // // Note: this will only provide info about the _first_ failure, not all failures. .and_then(|processing_results| processing_results.into_iter().try_for_each(|result| result)) - .and_then(|_| response_builder?.body_no_ssz(&())) + .map(|_| ()) } /// Processes an unaggregrated attestation that was included in a list of attestations with the @@ -566,21 +529,13 @@ fn process_unaggregated_attestation( } /// HTTP Handler to publish an Attestation, which has been signed by a validator. -#[allow(clippy::redundant_clone)] // false positives in this function. -pub async fn publish_aggregate_and_proofs( - req: Request, - beacon_chain: Arc>, - network_chan: NetworkChannel, - log: Logger, -) -> ApiResult { - try_future!(check_content_type_for_json(&req)); - let response_builder = ResponseBuilder::new(&req); +pub fn publish_aggregate_and_proofs( + req: Request>, + ctx: Arc>, +) -> Result<(), ApiError> { let body = req.into_body(); - let chunk = hyper::body::to_bytes(body) - .await - .map_err(|e| ApiError::ServerError(format!("Unable to get request body: {:?}", e)))?; - let chunks = chunk.iter().cloned().collect::>(); - serde_json::from_slice(&chunks.as_slice()) + + serde_json::from_slice(&body) .map_err(|e| { ApiError::BadRequest(format!( "Unable to deserialize JSON into a list of SignedAggregateAndProof: {:?}", @@ -595,11 +550,11 @@ pub async fn publish_aggregate_and_proofs( .enumerate() .map(|(i, signed_aggregate)| { process_aggregated_attestation( - &beacon_chain, - network_chan.clone(), + &ctx.beacon_chain, + ctx.network_chan.clone(), signed_aggregate, i, - &log, + &ctx.log, ) }) .collect::>>() @@ -609,7 +564,6 @@ pub async fn publish_aggregate_and_proofs( // // Note: this will only provide info about the _first_ failure, not all failures. .and_then(|processing_results| processing_results.into_iter().try_for_each(|result| result)) - .and_then(|_| response_builder?.body_no_ssz(&())) } /// Processes an aggregrated attestation that was included in a list of attestations with the index diff --git a/beacon_node/rest_api/tests/test.rs b/beacon_node/rest_api/tests/test.rs index 05abc1609..160ee667c 100644 --- a/beacon_node/rest_api/tests/test.rs +++ b/beacon_node/rest_api/tests/test.rs @@ -804,7 +804,7 @@ fn get_version() { let version = env .runtime() .block_on(remote_node.http.node().get_version()) - .expect("should fetch eth2 config from http api"); + .expect("should fetch version from http api"); assert_eq!( lighthouse_version::version_with_platform(), diff --git a/common/rest_types/Cargo.toml b/common/rest_types/Cargo.toml index 51e69dcca..d9e021fe1 100644 --- a/common/rest_types/Cargo.toml +++ b/common/rest_types/Cargo.toml @@ -14,6 +14,13 @@ state_processing = { path = "../../consensus/state_processing" } bls = { path = "../../crypto/bls" } serde = { version = "1.0.110", features = ["derive"] } rayon = "1.3.0" +hyper = "0.13.5" +tokio = { version = "0.2.21", features = ["sync"] } +environment = { path = "../../lighthouse/environment" } +store = { path = "../../beacon_node/store" } +beacon_chain = { path = "../../beacon_node/beacon_chain" } +serde_json = "1.0.52" +serde_yaml = "0.8.11" [target.'cfg(target_os = "linux")'.dependencies] psutil = "3.1.0" diff --git a/beacon_node/rest_api/src/error.rs b/common/rest_types/src/api_error.rs similarity index 100% rename from beacon_node/rest_api/src/error.rs rename to common/rest_types/src/api_error.rs diff --git a/common/rest_types/src/handler.rs b/common/rest_types/src/handler.rs new file mode 100644 index 000000000..cbbcd73b1 --- /dev/null +++ b/common/rest_types/src/handler.rs @@ -0,0 +1,247 @@ +use crate::{ApiError, ApiResult}; +use environment::TaskExecutor; +use hyper::header; +use hyper::{Body, Request, Response, StatusCode}; +use serde::Deserialize; +use serde::Serialize; +use ssz::Encode; + +/// Defines the encoding for the API. +#[derive(Clone, Serialize, Deserialize, Copy)] +pub enum ApiEncodingFormat { + JSON, + YAML, + SSZ, +} + +impl ApiEncodingFormat { + pub fn get_content_type(&self) -> &str { + match self { + ApiEncodingFormat::JSON => "application/json", + ApiEncodingFormat::YAML => "application/yaml", + ApiEncodingFormat::SSZ => "application/ssz", + } + } +} + +impl From<&str> for ApiEncodingFormat { + fn from(f: &str) -> ApiEncodingFormat { + match f { + "application/yaml" => ApiEncodingFormat::YAML, + "application/ssz" => ApiEncodingFormat::SSZ, + _ => ApiEncodingFormat::JSON, + } + } +} + +/// Provides a HTTP request handler with Lighthouse-specific functionality. +pub struct Handler { + executor: TaskExecutor, + req: Request<()>, + body: Body, + ctx: T, + encoding: ApiEncodingFormat, + allow_body: bool, +} + +impl Handler { + /// Start handling a new request. + pub fn new(req: Request, ctx: T, executor: TaskExecutor) -> Result { + let (req_parts, body) = req.into_parts(); + let req = Request::from_parts(req_parts, ()); + + let accept_header: String = req + .headers() + .get(header::ACCEPT) + .map_or(Ok(""), |h| h.to_str()) + .map_err(|e| { + ApiError::BadRequest(format!( + "The Accept header contains invalid characters: {:?}", + e + )) + }) + .map(String::from)?; + + Ok(Self { + executor, + req, + body, + ctx, + allow_body: false, + encoding: ApiEncodingFormat::from(accept_header.as_str()), + }) + } + + /// The default behaviour is to return an error if any body is supplied in the request. Calling + /// this function disables that error. + pub fn allow_body(mut self) -> Self { + self.allow_body = true; + self + } + + /// Return a simple static value. + /// + /// Does not use the blocking executor. + pub async fn static_value(self, value: V) -> Result, ApiError> { + // Always check and disallow a body for a static value. + let _ = Self::get_body(self.body, false).await?; + + Ok(HandledRequest { + value, + encoding: self.encoding, + }) + } + + /// Calls `func` in-line, on the core executor. + /// + /// This should only be used for very fast tasks. + pub async fn in_core_task(self, func: F) -> Result, ApiError> + where + V: Send + Sync + 'static, + F: Fn(Request>, T) -> Result + Send + Sync + 'static, + { + let body = Self::get_body(self.body, self.allow_body).await?; + let (req_parts, _) = self.req.into_parts(); + let req = Request::from_parts(req_parts, body); + + let value = func(req, self.ctx)?; + + Ok(HandledRequest { + value, + encoding: self.encoding, + }) + } + + /// Spawns `func` on the blocking executor. + /// + /// This method is suitable for handling long-running or intensive tasks. + pub async fn in_blocking_task(self, func: F) -> Result, ApiError> + where + V: Send + Sync + 'static, + F: Fn(Request>, T) -> Result + Send + Sync + 'static, + { + let ctx = self.ctx; + let body = Self::get_body(self.body, self.allow_body).await?; + let (req_parts, _) = self.req.into_parts(); + let req = Request::from_parts(req_parts, body); + + let value = self + .executor + .clone() + .handle + .spawn_blocking(move || func(req, ctx)) + .await + .map_err(|e| { + ApiError::ServerError(format!( + "Failed to get blocking join handle: {}", + e.to_string() + )) + })??; + + Ok(HandledRequest { + value, + encoding: self.encoding, + }) + } + + /// Call `func`, then return a response that is suitable for an SSE stream. + pub async fn sse_stream(self, func: F) -> ApiResult + where + F: Fn(Request<()>, T) -> Result, + { + let body = func(self.req, self.ctx)?; + + Response::builder() + .status(200) + .header("Content-Type", "text/event-stream") + .header("Connection", "Keep-Alive") + .header("Cache-Control", "no-cache") + .header("Access-Control-Allow-Origin", "*") + .body(body) + .map_err(|e| ApiError::ServerError(format!("Failed to build response: {:?}", e))) + } + + /// Downloads the bytes for `body`. + async fn get_body(body: Body, allow_body: bool) -> Result, ApiError> { + let bytes = hyper::body::to_bytes(body) + .await + .map_err(|e| ApiError::ServerError(format!("Unable to get request body: {:?}", e)))?; + + if !allow_body && !bytes[..].is_empty() { + Err(ApiError::BadRequest( + "The request body must be empty".to_string(), + )) + } else { + Ok(bytes.into_iter().collect()) + } + } +} + +/// A request that has been "handled" and now a result (`value`) needs to be serialize and +/// returned. +pub struct HandledRequest { + encoding: ApiEncodingFormat, + value: V, +} + +impl HandledRequest { + /// Simple encode a string as utf-8. + pub fn text_encoding(self) -> ApiResult { + Response::builder() + .status(StatusCode::OK) + .header("content-type", "text/plain; charset=utf-8") + .body(Body::from(self.value)) + .map_err(|e| ApiError::ServerError(format!("Failed to build response: {:?}", e))) + } +} + +impl HandledRequest { + /// Suitable for all items which implement `serde` and `ssz`. + pub fn all_encodings(self) -> ApiResult { + match self.encoding { + ApiEncodingFormat::SSZ => Response::builder() + .status(StatusCode::OK) + .header("content-type", "application/ssz") + .body(Body::from(self.value.as_ssz_bytes())) + .map_err(|e| ApiError::ServerError(format!("Failed to build response: {:?}", e))), + _ => self.serde_encodings(), + } + } +} + +impl HandledRequest { + /// Suitable for items which only implement `serde`. + pub fn serde_encodings(self) -> ApiResult { + let (body, content_type) = match self.encoding { + ApiEncodingFormat::JSON => ( + Body::from(serde_json::to_string(&self.value).map_err(|e| { + ApiError::ServerError(format!( + "Unable to serialize response body as JSON: {:?}", + e + )) + })?), + "application/json", + ), + ApiEncodingFormat::SSZ => { + return Err(ApiError::UnsupportedType( + "Response cannot be encoded as SSZ.".into(), + )); + } + ApiEncodingFormat::YAML => ( + Body::from(serde_yaml::to_string(&self.value).map_err(|e| { + ApiError::ServerError(format!( + "Unable to serialize response body as YAML: {:?}", + e + )) + })?), + "application/yaml", + ), + }; + + Response::builder() + .status(StatusCode::OK) + .header("content-type", content_type) + .body(body) + .map_err(|e| ApiError::ServerError(format!("Failed to build response: {:?}", e))) + } +} diff --git a/common/rest_types/src/lib.rs b/common/rest_types/src/lib.rs index 79a66e034..1bedd1cad 100644 --- a/common/rest_types/src/lib.rs +++ b/common/rest_types/src/lib.rs @@ -2,20 +2,21 @@ //! //! This is primarily used by the validator client and the beacon node rest API. +mod api_error; mod beacon; mod consensus; +mod handler; mod node; mod validator; +pub use api_error::{ApiError, ApiResult}; pub use beacon::{ BlockResponse, CanonicalHeadResponse, Committee, HeadBeaconBlock, StateResponse, ValidatorRequest, ValidatorResponse, }; - +pub use consensus::{IndividualVote, IndividualVotesRequest, IndividualVotesResponse}; +pub use handler::{ApiEncodingFormat, Handler}; +pub use node::{Health, SyncingResponse, SyncingStatus}; pub use validator::{ ValidatorDutiesRequest, ValidatorDuty, ValidatorDutyBytes, ValidatorSubscription, }; - -pub use consensus::{IndividualVote, IndividualVotesRequest, IndividualVotesResponse}; - -pub use node::{Health, SyncingResponse, SyncingStatus}; diff --git a/consensus/proto_array/src/proto_array.rs b/consensus/proto_array/src/proto_array.rs index 299c525a7..18db8d340 100644 --- a/consensus/proto_array/src/proto_array.rs +++ b/consensus/proto_array/src/proto_array.rs @@ -27,7 +27,7 @@ pub struct ProtoNode { best_descendant: Option, } -#[derive(PartialEq, Debug, Serialize, Deserialize)] +#[derive(PartialEq, Debug, Serialize, Deserialize, Clone)] pub struct ProtoArray { /// Do not attempt to prune the tree unless it has at least this many nodes. Small prunes /// simply waste time. diff --git a/lighthouse/environment/src/executor.rs b/lighthouse/environment/src/executor.rs index 00b1d4b15..b5f415187 100644 --- a/lighthouse/environment/src/executor.rs +++ b/lighthouse/environment/src/executor.rs @@ -8,7 +8,7 @@ use tokio::runtime::Handle; #[derive(Clone)] pub struct TaskExecutor { /// The handle to the runtime on which tasks are spawned - pub(crate) handle: Handle, + pub handle: Handle, /// The receiver exit future which on receiving shuts down the task pub(crate) exit: exit_future::Exit, /// Sender given to tasks, so that if they encounter a state in which execution cannot