diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index d1beca9d4..25f051ac1 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -47,7 +47,10 @@ use types::{ SignedContributionAndProof, SignedVoluntaryExit, Slot, SyncCommitteeMessage, SyncContributionData, }; -use version::{fork_versioned_response, unsupported_version_rejection, V1}; +use version::{ + add_consensus_version_header, fork_versioned_response, inconsistent_fork_rejection, + unsupported_version_rejection, V1, +}; use warp::http::StatusCode; use warp::sse::Event; use warp::Reply; @@ -1003,6 +1006,9 @@ pub fn serve( accept_header: Option| { blocking_task(move || { let block = block_id.block(&chain)?; + let fork_name = block + .fork_name(&chain.spec) + .map_err(inconsistent_fork_rejection)?; match accept_header { Some(api_types::Accept::Ssz) => Response::builder() .status(200) @@ -1014,12 +1020,10 @@ pub fn serve( e )) }), - _ => { - let fork_name = block.fork_name(&chain.spec).ok(); - fork_versioned_response(endpoint_version, fork_name, block) - .map(|res| warp::reply::json(&res).into_response()) - } + _ => fork_versioned_response(endpoint_version, fork_name, block) + .map(|res| warp::reply::json(&res).into_response()), } + .map(|resp| add_consensus_version_header(resp, fork_name)) }) }, ); @@ -1459,10 +1463,14 @@ pub fn serve( blocking_task(move || match accept_header { Some(api_types::Accept::Ssz) => { let state = state_id.state(&chain)?; + let fork_name = state + .fork_name(&chain.spec) + .map_err(inconsistent_fork_rejection)?; Response::builder() .status(200) .header("Content-Type", "application/octet-stream") .body(state.as_ssz_bytes().into()) + .map(|resp| add_consensus_version_header(resp, fork_name)) .map_err(|e| { warp_utils::reject::custom_server_error(format!( "failed to create response: {}", @@ -1471,9 +1479,14 @@ pub fn serve( }) } _ => state_id.map_state(&chain, |state| { - let fork_name = state.fork_name(&chain.spec).ok(); + let fork_name = state + .fork_name(&chain.spec) + .map_err(inconsistent_fork_rejection)?; let res = fork_versioned_response(endpoint_version, fork_name, &state)?; - Ok(warp::reply::json(&res).into_response()) + Ok(add_consensus_version_header( + warp::reply::json(&res).into_response(), + fork_name, + )) }), }) }, @@ -1821,7 +1834,10 @@ pub fn serve( let (block, _) = chain .produce_block(randao_reveal, slot, query.graffiti.map(Into::into)) .map_err(warp_utils::reject::block_production_error)?; - let fork_name = block.to_ref().fork_name(&chain.spec).ok(); + let fork_name = block + .to_ref() + .fork_name(&chain.spec) + .map_err(inconsistent_fork_rejection)?; fork_versioned_response(endpoint_version, fork_name, block) }) }, diff --git a/beacon_node/http_api/src/version.rs b/beacon_node/http_api/src/version.rs index db891727e..854ef0c85 100644 --- a/beacon_node/http_api/src/version.rs +++ b/beacon_node/http_api/src/version.rs @@ -1,19 +1,21 @@ use crate::api_types::{EndpointVersion, ForkVersionedResponse}; +use eth2::CONSENSUS_VERSION_HEADER; use serde::Serialize; -use types::ForkName; +use types::{ForkName, InconsistentFork}; +use warp::reply::{self, Reply, WithHeader}; pub const V1: EndpointVersion = EndpointVersion(1); pub const V2: EndpointVersion = EndpointVersion(2); pub fn fork_versioned_response( endpoint_version: EndpointVersion, - fork_name: Option, + fork_name: ForkName, data: T, ) -> Result, warp::reject::Rejection> { let fork_name = if endpoint_version == V1 { None } else if endpoint_version == V2 { - fork_name + Some(fork_name) } else { return Err(unsupported_version_rejection(endpoint_version)); }; @@ -23,6 +25,15 @@ pub fn fork_versioned_response( }) } +/// Add the `Eth-Consensus-Version` header to a response. +pub fn add_consensus_version_header(reply: T, fork_name: ForkName) -> WithHeader { + reply::with_header(reply, CONSENSUS_VERSION_HEADER, fork_name.to_string()) +} + +pub fn inconsistent_fork_rejection(error: InconsistentFork) -> warp::reject::Rejection { + warp_utils::reject::custom_server_error(format!("wrong fork: {:?}", error)) +} + pub fn unsupported_version_rejection(version: EndpointVersion) -> warp::reject::Rejection { warp_utils::reject::custom_bad_request(format!("Unsupported endpoint version: {}", version)) } diff --git a/beacon_node/http_api/tests/tests.rs b/beacon_node/http_api/tests/tests.rs index d6def5d19..95f087130 100644 --- a/beacon_node/http_api/tests/tests.rs +++ b/beacon_node/http_api/tests/tests.rs @@ -5,9 +5,12 @@ use beacon_chain::{ BeaconChain, StateSkipConfig, WhenSlotSkipped, MAXIMUM_GOSSIP_CLOCK_DISPARITY, }; use environment::null_logger; -use eth2::Error; -use eth2::StatusCode; -use eth2::{types::*, BeaconNodeHttpClient, Timeouts}; +use eth2::{ + mixin::{RequestAccept, ResponseForkName, ResponseOptional}, + reqwest::RequestBuilder, + types::*, + BeaconNodeHttpClient, Error, StatusCode, Timeouts, +}; use futures::stream::{Stream, StreamExt}; use futures::FutureExt; use lighthouse_network::{Enr, EnrExt, PeerId}; @@ -952,6 +955,7 @@ impl ApiTester { } } + // Check the JSON endpoint. let json_result = self.client.get_beacon_blocks(block_id).await.unwrap(); if let (Some(json), Some(expected)) = (&json_result, &expected) { @@ -965,6 +969,7 @@ impl ApiTester { assert_eq!(expected, None); } + // Check the SSZ endpoint. let ssz_result = self .client .get_beacon_blocks_ssz(block_id, &self.chain.spec) @@ -981,6 +986,34 @@ impl ApiTester { assert_eq!(v1_result, None); assert_eq!(expected, None); } + + // Check that version headers are provided. + let url = self.client.get_beacon_blocks_path(block_id).unwrap(); + + let builders: Vec RequestBuilder> = vec![ + |b| b, + |b| b.accept(Accept::Ssz), + |b| b.accept(Accept::Json), + |b| b.accept(Accept::Any), + ]; + + for req_builder in builders { + let raw_res = self + .client + .get_response(url.clone(), req_builder) + .await + .optional() + .unwrap(); + if let (Some(raw_res), Some(expected)) = (&raw_res, &expected) { + assert_eq!( + raw_res.fork_name_from_header().unwrap(), + Some(expected.fork_name(&self.chain.spec).unwrap()) + ); + } else { + assert!(raw_res.is_none()); + assert_eq!(expected, None); + } + } } self @@ -1440,6 +1473,30 @@ impl ApiTester { assert_eq!(result_v1, None); assert_eq!(expected, None); } + + // Check that version headers are provided. + let url = self.client.get_debug_beacon_states_path(state_id).unwrap(); + + let builders: Vec RequestBuilder> = + vec![|b| b, |b| b.accept(Accept::Ssz)]; + + for req_builder in builders { + let raw_res = self + .client + .get_response(url.clone(), req_builder) + .await + .optional() + .unwrap(); + if let (Some(raw_res), Some(expected)) = (&raw_res, &expected) { + assert_eq!( + raw_res.fork_name_from_header().unwrap(), + Some(expected.fork_name(&self.chain.spec).unwrap()) + ); + } else { + assert!(raw_res.is_none()); + assert_eq!(expected, None); + } + } } self diff --git a/beacon_node/store/src/partial_beacon_state.rs b/beacon_node/store/src/partial_beacon_state.rs index c43332346..9c8fcc4b7 100644 --- a/beacon_node/store/src/partial_beacon_state.rs +++ b/beacon_node/store/src/partial_beacon_state.rs @@ -176,16 +176,13 @@ impl PartialBeaconState { )?; let slot = Slot::from_ssz_bytes(slot_bytes)?; - let epoch = slot.epoch(T::slots_per_epoch()); + let fork_at_slot = spec.fork_name_at_slot::(slot); - if spec - .altair_fork_epoch - .map_or(true, |altair_epoch| epoch < altair_epoch) - { - PartialBeaconStateBase::from_ssz_bytes(bytes).map(Self::Base) - } else { - PartialBeaconStateAltair::from_ssz_bytes(bytes).map(Self::Altair) - } + Ok(map_fork_name!( + fork_at_slot, + Self, + <_>::from_ssz_bytes(bytes)? + )) } /// Prepare the partial state for storage in the KV database. diff --git a/common/eth2/src/lib.rs b/common/eth2/src/lib.rs index 97bcdc1b3..bdad67286 100644 --- a/common/eth2/src/lib.rs +++ b/common/eth2/src/lib.rs @@ -10,14 +10,17 @@ #[cfg(feature = "lighthouse")] pub mod lighthouse; pub mod lighthouse_vc; +pub mod mixin; pub mod types; +use self::mixin::{RequestAccept, ResponseForkName, ResponseOptional}; use self::types::{Error as ResponseError, *}; +use ::types::map_fork_name_with; use futures::Stream; use futures_util::StreamExt; use lighthouse_network::PeerId; pub use reqwest; -use reqwest::{IntoUrl, Response}; +use reqwest::{IntoUrl, RequestBuilder, Response}; pub use reqwest::{StatusCode, Url}; use sensitive_url::SensitiveUrl; use serde::{de::DeserializeOwned, Serialize}; @@ -29,6 +32,8 @@ use std::time::Duration; pub const V1: EndpointVersion = EndpointVersion(1); pub const V2: EndpointVersion = EndpointVersion(2); +pub const CONSENSUS_VERSION_HEADER: &str = "Eth-Consensus-Version"; + #[derive(Debug)] pub enum Error { /// The `reqwest` client raised an error. @@ -55,6 +60,12 @@ pub enum Error { InvalidSsz(ssz::DecodeError), } +impl From for Error { + fn from(error: reqwest::Error) -> Self { + Error::Reqwest(error) + } +} + impl Error { /// If the error has a HTTP status code, return it. pub fn status(&self) -> Option { @@ -161,12 +172,18 @@ impl BeaconNodeHttpClient { /// Perform a HTTP GET request. async fn get(&self, url: U) -> Result { - let response = self.client.get(url).send().await.map_err(Error::Reqwest)?; - ok_or_error(response) - .await? - .json() - .await - .map_err(Error::Reqwest) + let response = self.get_response(url, |b| b).await?; + Ok(response.json().await?) + } + + /// Perform an HTTP GET request, returning the `Response` for processing. + pub async fn get_response( + &self, + url: U, + builder: impl FnOnce(RequestBuilder) -> RequestBuilder, + ) -> Result { + let response = builder(self.client.get(url)).send().await?; + ok_or_error(response).await } /// Perform a HTTP GET request with a custom timeout. @@ -176,31 +193,16 @@ impl BeaconNodeHttpClient { timeout: Duration, ) -> Result { let response = self - .client - .get(url) - .timeout(timeout) - .send() - .await - .map_err(Error::Reqwest)?; - ok_or_error(response) - .await? - .json() - .await - .map_err(Error::Reqwest) + .get_response(url, |builder| builder.timeout(timeout)) + .await?; + Ok(response.json().await?) } /// Perform a HTTP GET request, returning `None` on a 404 error. async fn get_opt(&self, url: U) -> Result, Error> { - let response = self.client.get(url).send().await.map_err(Error::Reqwest)?; - match ok_or_error(response).await { - Ok(resp) => resp.json().await.map(Option::Some).map_err(Error::Reqwest), - Err(err) => { - if err.status() == Some(StatusCode::NOT_FOUND) { - Ok(None) - } else { - Err(err) - } - } + match self.get_response(url, |b| b).await.optional()? { + Some(response) => Ok(Some(response.json().await?)), + None => Ok(None), } } @@ -210,22 +212,13 @@ impl BeaconNodeHttpClient { url: U, timeout: Duration, ) -> Result, Error> { - let response = self - .client - .get(url) - .timeout(timeout) - .send() + let opt_response = self + .get_response(url, |b| b.timeout(timeout)) .await - .map_err(Error::Reqwest)?; - match ok_or_error(response).await { - Ok(resp) => resp.json().await.map(Option::Some).map_err(Error::Reqwest), - Err(err) => { - if err.status() == Some(StatusCode::NOT_FOUND) { - Ok(None) - } else { - Err(err) - } - } + .optional()?; + match opt_response { + Some(response) => Ok(Some(response.json().await?)), + None => Ok(None), } } @@ -235,28 +228,13 @@ impl BeaconNodeHttpClient { url: U, accept_header: Accept, ) -> Result>, Error> { - let response = self - .client - .get(url) - .header(ACCEPT, accept_header.to_string()) - .send() + let opt_response = self + .get_response(url, |b| b.accept(accept_header)) .await - .map_err(Error::Reqwest)?; - match ok_or_error(response).await { - Ok(resp) => Ok(Some( - resp.bytes() - .await - .map_err(Error::Reqwest)? - .into_iter() - .collect::>(), - )), - Err(err) => { - if err.status() == Some(StatusCode::NOT_FOUND) { - Ok(None) - } else { - Err(err) - } - } + .optional()?; + match opt_response { + Some(resp) => Ok(Some(resp.bytes().await?.into_iter().collect::>())), + None => Ok(None), } } @@ -315,7 +293,7 @@ impl BeaconNodeHttpClient { if let Some(timeout) = timeout { builder = builder.timeout(timeout); } - let response = builder.json(body).send().await.map_err(Error::Reqwest)?; + let response = builder.json(body).send().await?; ok_or_error(response).await } @@ -607,6 +585,17 @@ impl BeaconNodeHttpClient { Ok(()) } + /// Path for `v2/beacon/blocks` + pub fn get_beacon_blocks_path(&self, block_id: BlockId) -> Result { + let mut path = self.eth_path(V2)?; + path.path_segments_mut() + .map_err(|()| Error::InvalidUrl(self.server.clone()))? + .push("beacon") + .push("blocks") + .push(&block_id.to_string()); + Ok(path) + } + /// `GET v2/beacon/blocks` /// /// Returns `Ok(None)` on a 404 error. @@ -614,15 +603,30 @@ impl BeaconNodeHttpClient { &self, block_id: BlockId, ) -> Result>>, Error> { - let mut path = self.eth_path(V2)?; + let path = self.get_beacon_blocks_path(block_id)?; + let response = match self.get_response(path, |b| b).await.optional()? { + Some(res) => res, + None => return Ok(None), + }; - path.path_segments_mut() - .map_err(|()| Error::InvalidUrl(self.server.clone()))? - .push("beacon") - .push("blocks") - .push(&block_id.to_string()); - - self.get_opt(path).await + // If present, use the fork provided in the headers to decode the block. Gracefully handle + // missing and malformed fork names by falling back to regular deserialisation. + let (block, version) = match response.fork_name_from_header() { + Ok(Some(fork_name)) => { + map_fork_name_with!(fork_name, SignedBeaconBlock, { + let ForkVersionedResponse { version, data } = response.json().await?; + (data, version) + }) + } + Ok(None) | Err(_) => { + let ForkVersionedResponse { version, data } = response.json().await?; + (data, version) + } + }; + Ok(Some(ForkVersionedResponse { + version, + data: block, + })) } /// `GET v1/beacon/blocks` (LEGACY) @@ -651,13 +655,7 @@ impl BeaconNodeHttpClient { block_id: BlockId, spec: &ChainSpec, ) -> Result>, Error> { - let mut path = self.eth_path(V2)?; - - path.path_segments_mut() - .map_err(|()| Error::InvalidUrl(self.server.clone()))? - .push("beacon") - .push("blocks") - .push(&block_id.to_string()); + let path = self.get_beacon_blocks_path(block_id)?; self.get_bytes_opt_accept_header(path, Accept::Ssz) .await? @@ -966,13 +964,7 @@ impl BeaconNodeHttpClient { .push("node") .push("health"); - let status = self - .client - .get(path) - .send() - .await - .map_err(Error::Reqwest)? - .status(); + let status = self.client.get(path).send().await?.status(); if status == StatusCode::OK || status == StatusCode::PARTIAL_CONTENT { Ok(status) } else { @@ -1042,11 +1034,8 @@ impl BeaconNodeHttpClient { self.get(path).await } - /// `GET v2/debug/beacon/states/{state_id}` - pub async fn get_debug_beacon_states( - &self, - state_id: StateId, - ) -> Result>>, Error> { + /// URL path for `v2/debug/beacon/states/{state_id}`. + pub fn get_debug_beacon_states_path(&self, state_id: StateId) -> Result { let mut path = self.eth_path(V2)?; path.path_segments_mut() @@ -1055,7 +1044,15 @@ impl BeaconNodeHttpClient { .push("beacon") .push("states") .push(&state_id.to_string()); + Ok(path) + } + /// `GET v2/debug/beacon/states/{state_id}` + pub async fn get_debug_beacon_states( + &self, + state_id: StateId, + ) -> Result>>, Error> { + let path = self.get_debug_beacon_states_path(state_id)?; self.get_opt(path).await } @@ -1083,14 +1080,7 @@ impl BeaconNodeHttpClient { state_id: StateId, spec: &ChainSpec, ) -> Result>, Error> { - let mut path = self.eth_path(V1)?; - - path.path_segments_mut() - .map_err(|()| Error::InvalidUrl(self.server.clone()))? - .push("debug") - .push("beacon") - .push("states") - .push(&state_id.to_string()); + let path = self.get_debug_beacon_states_path(state_id)?; self.get_bytes_opt_accept_header(path, Accept::Ssz) .await? @@ -1343,8 +1333,7 @@ impl BeaconNodeHttpClient { .client .get(path) .send() - .await - .map_err(Error::Reqwest)? + .await? .bytes_stream() .map(|next| match next { Ok(bytes) => EventKind::from_sse_bytes(bytes.as_ref()), diff --git a/common/eth2/src/mixin.rs b/common/eth2/src/mixin.rs new file mode 100644 index 000000000..1de26961e --- /dev/null +++ b/common/eth2/src/mixin.rs @@ -0,0 +1,50 @@ +use crate::{types::Accept, Error, CONSENSUS_VERSION_HEADER}; +use reqwest::{header::ACCEPT, RequestBuilder, Response, StatusCode}; +use std::str::FromStr; +use types::ForkName; + +/// Trait for converting a 404 error into an `Option`. +pub trait ResponseOptional { + fn optional(self) -> Result, Error>; +} + +impl ResponseOptional for Result { + fn optional(self) -> Result, Error> { + match self { + Ok(x) => Ok(Some(x)), + Err(e) if e.status() == Some(StatusCode::NOT_FOUND) => Ok(None), + Err(e) => Err(e), + } + } +} + +/// Trait for extracting the fork name from the headers of a response. +pub trait ResponseForkName { + #[allow(clippy::result_unit_err)] + fn fork_name_from_header(&self) -> Result, ()>; +} + +impl ResponseForkName for Response { + fn fork_name_from_header(&self) -> Result, ()> { + self.headers() + .get(CONSENSUS_VERSION_HEADER) + .map(|fork_name| { + fork_name + .to_str() + .map_err(|_| ()) + .and_then(ForkName::from_str) + }) + .transpose() + } +} + +/// Trait for adding an "accept" header to a request builder. +pub trait RequestAccept { + fn accept(self, accept: Accept) -> RequestBuilder; +} + +impl RequestAccept for RequestBuilder { + fn accept(self, accept: Accept) -> RequestBuilder { + self.header(ACCEPT, accept.to_string()) + } +} diff --git a/common/eth2/src/types.rs b/common/eth2/src/types.rs index 3f9950870..42131b49c 100644 --- a/common/eth2/src/types.rs +++ b/common/eth2/src/types.rs @@ -3,7 +3,6 @@ use crate::Error as ServerError; use lighthouse_network::{ConnectionDirection, Enr, Multiaddr, PeerConnectionStatus}; -pub use reqwest::header::ACCEPT; use serde::{Deserialize, Serialize}; use std::convert::TryFrom; use std::fmt; @@ -213,7 +212,6 @@ impl<'a, T: Serialize> From<&'a T> for GenericResponseRef<'a, T> { } #[derive(Debug, PartialEq, Clone, Serialize, Deserialize)] -// #[serde(bound = "T: Serialize + serde::de::DeserializeOwned")] pub struct ForkVersionedResponse { #[serde(skip_serializing_if = "Option::is_none")] pub version: Option, diff --git a/consensus/types/src/beacon_block.rs b/consensus/types/src/beacon_block.rs index 613e39adc..f11b92148 100644 --- a/consensus/types/src/beacon_block.rs +++ b/consensus/types/src/beacon_block.rs @@ -81,16 +81,13 @@ impl BeaconBlock { })?; let slot = Slot::from_ssz_bytes(slot_bytes)?; - let epoch = slot.epoch(T::slots_per_epoch()); + let fork_at_slot = spec.fork_name_at_slot::(slot); - if spec - .altair_fork_epoch - .map_or(true, |altair_epoch| epoch < altair_epoch) - { - BeaconBlockBase::from_ssz_bytes(bytes).map(Self::Base) - } else { - BeaconBlockAltair::from_ssz_bytes(bytes).map(Self::Altair) - } + Ok(map_fork_name!( + fork_at_slot, + Self, + <_>::from_ssz_bytes(bytes)? + )) } /// Try decoding each beacon block variant in sequence. diff --git a/consensus/types/src/beacon_state.rs b/consensus/types/src/beacon_state.rs index d4396f494..a12f35143 100644 --- a/consensus/types/src/beacon_state.rs +++ b/consensus/types/src/beacon_state.rs @@ -411,16 +411,13 @@ impl BeaconState { })?; let slot = Slot::from_ssz_bytes(slot_bytes)?; - let epoch = slot.epoch(T::slots_per_epoch()); + let fork_at_slot = spec.fork_name_at_slot::(slot); - if spec - .altair_fork_epoch - .map_or(true, |altair_epoch| epoch < altair_epoch) - { - BeaconStateBase::from_ssz_bytes(bytes).map(Self::Base) - } else { - BeaconStateAltair::from_ssz_bytes(bytes).map(Self::Altair) - } + Ok(map_fork_name!( + fork_at_slot, + Self, + <_>::from_ssz_bytes(bytes)? + )) } /// Returns the `tree_hash_root` of the state. diff --git a/consensus/types/src/fork_name.rs b/consensus/types/src/fork_name.rs index 15d5eab6a..85ba35e39 100644 --- a/consensus/types/src/fork_name.rs +++ b/consensus/types/src/fork_name.rs @@ -54,6 +54,43 @@ impl ForkName { } } +/// Map a fork name into a fork-versioned superstruct type like `BeaconBlock`. +/// +/// The `$body` expression is where the magic happens. The macro allows us to achieve polymorphism +/// in the return type, which is not usually possible in Rust without trait objects. +/// +/// E.g. you could call `map_fork_name!(fork, BeaconBlock, serde_json::from_str(s))` to decode +/// different `BeaconBlock` variants depending on the value of `fork`. Note how the type of the body +/// will change between `BeaconBlockBase` and `BeaconBlockAltair` depending on which branch is +/// taken, the important thing is that they are re-unified by injecting them back into the +/// `BeaconBlock` parent enum. +/// +/// If you would also like to extract additional data alongside the superstruct type, use +/// the more flexible `map_fork_name_with` macro. +#[macro_export] +macro_rules! map_fork_name { + ($fork_name:expr, $t:tt, $body:expr) => { + map_fork_name_with!($fork_name, $t, { ($body, ()) }).0 + }; +} + +/// Map a fork name into a tuple of `(t, extra)` where `t` is a superstruct type. +#[macro_export] +macro_rules! map_fork_name_with { + ($fork_name:expr, $t:tt, $body:block) => { + match $fork_name { + ForkName::Base => { + let (value, extra_data) = $body; + ($t::Base(value), extra_data) + } + ForkName::Altair => { + let (value, extra_data) = $body; + ($t::Altair(value), extra_data) + } + } + }; +} + impl FromStr for ForkName { type Err = ();