diff --git a/Cargo.lock b/Cargo.lock index ec8ec6f5f..a7e831123 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3524,6 +3524,7 @@ version = "0.1.0" dependencies = [ "beacon_chain", "bs58", + "bytes", "directory", "environment", "eth1", diff --git a/beacon_node/http_api/Cargo.toml b/beacon_node/http_api/Cargo.toml index 2b117b26c..4b4a28b51 100644 --- a/beacon_node/http_api/Cargo.toml +++ b/beacon_node/http_api/Cargo.toml @@ -42,6 +42,7 @@ operation_pool = { path = "../operation_pool" } sensitive_url = { path = "../../common/sensitive_url" } unused_port = {path = "../../common/unused_port"} store = { path = "../store" } +bytes = "1.1.0" [dev-dependencies] environment = { path = "../../lighthouse/environment" } diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index b45c4285a..7d1475809 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -29,6 +29,7 @@ use beacon_chain::{ BeaconChainTypes, ProduceBlockVerification, WhenSlotSkipped, }; pub use block_id::BlockId; +use bytes::Bytes; use directory::DEFAULT_ROOT_DIR; use eth2::types::{ self as api_types, BroadcastValidation, EndpointVersion, ForkChoice, ForkChoiceNode, @@ -1236,6 +1237,41 @@ pub fn serve( }, ); + let post_beacon_blocks_ssz = eth_v1 + .and(warp::path("beacon")) + .and(warp::path("blocks")) + .and(warp::path::end()) + .and(warp::body::bytes()) + .and(chain_filter.clone()) + .and(network_tx_filter.clone()) + .and(log_filter.clone()) + .and_then( + |block_bytes: Bytes, + chain: Arc>, + network_tx: UnboundedSender>, + log: Logger| async move { + let block = match SignedBeaconBlock::::from_ssz_bytes( + &block_bytes, + &chain.spec, + ) { + Ok(data) => data, + Err(e) => { + return Err(warp_utils::reject::custom_bad_request(format!("{:?}", e))) + } + }; + publish_blocks::publish_block( + None, + ProvenancedBlock::local(Arc::new(block)), + chain, + &network_tx, + log, + BroadcastValidation::default(), + ) + .await + .map(|()| warp::reply().into_response()) + }, + ); + let post_beacon_blocks_v2 = eth_v2 .and(warp::path("beacon")) .and(warp::path("blocks")) @@ -1274,6 +1310,57 @@ pub fn serve( }, ); + let post_beacon_blocks_v2_ssz = eth_v2 + .and(warp::path("beacon")) + .and(warp::path("blocks")) + .and(warp::query::()) + .and(warp::path::end()) + .and(warp::body::bytes()) + .and(chain_filter.clone()) + .and(network_tx_filter.clone()) + .and(log_filter.clone()) + .then( + |validation_level: api_types::BroadcastValidationQuery, + block_bytes: Bytes, + chain: Arc>, + network_tx: UnboundedSender>, + log: Logger| async move { + let block = match SignedBeaconBlock::::from_ssz_bytes( + &block_bytes, + &chain.spec, + ) { + Ok(data) => data, + Err(_) => { + return warp::reply::with_status( + StatusCode::BAD_REQUEST, + eth2::StatusCode::BAD_REQUEST, + ) + .into_response(); + } + }; + match publish_blocks::publish_block( + None, + ProvenancedBlock::local(Arc::new(block)), + chain, + &network_tx, + log, + validation_level.broadcast_validation, + ) + .await + { + Ok(()) => warp::reply().into_response(), + Err(e) => match warp_utils::reject::handle_rejection(e).await { + Ok(reply) => reply.into_response(), + Err(_) => warp::reply::with_status( + StatusCode::INTERNAL_SERVER_ERROR, + eth2::StatusCode::INTERNAL_SERVER_ERROR, + ) + .into_response(), + }, + } + }, + ); + /* * beacon/blocks */ @@ -3984,7 +4071,10 @@ pub fn serve( .boxed() .uor( warp::post().and( - post_beacon_blocks + warp::header::exact("Content-Type", "application/octet-stream") + // Routes which expect `application/octet-stream` go within this `and`. + .and(post_beacon_blocks_ssz.uor(post_beacon_blocks_v2_ssz)) + .uor(post_beacon_blocks) .uor(post_beacon_blinded_blocks) .uor(post_beacon_blocks_v2) .uor(post_beacon_blinded_blocks_v2) diff --git a/beacon_node/http_api/tests/broadcast_validation_tests.rs b/beacon_node/http_api/tests/broadcast_validation_tests.rs index 4819dd99e..457276d70 100644 --- a/beacon_node/http_api/tests/broadcast_validation_tests.rs +++ b/beacon_node/http_api/tests/broadcast_validation_tests.rs @@ -175,6 +175,48 @@ pub async fn gossip_full_pass() { .block_is_known_to_fork_choice(&block.canonical_root())); } +// This test checks that a block that is valid from both a gossip and consensus perspective is accepted when using `broadcast_validation=gossip`. +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +pub async fn gossip_full_pass_ssz() { + /* this test targets gossip-level validation */ + let validation_level: Option = Some(BroadcastValidation::Gossip); + + // Validator count needs to be at least 32 or proposer boost gets set to 0 when computing + // `validator_count // 32`. + let validator_count = 64; + let num_initial: u64 = 31; + let tester = InteractiveTester::::new(None, validator_count).await; + + // Create some chain depth. + tester.harness.advance_slot(); + tester + .harness + .extend_chain( + num_initial as usize, + BlockStrategy::OnCanonicalHead, + AttestationStrategy::AllValidators, + ) + .await; + tester.harness.advance_slot(); + + let slot_a = Slot::new(num_initial); + let slot_b = slot_a + 1; + + let state_a = tester.harness.get_current_state(); + let (block, _): (SignedBeaconBlock, _) = tester.harness.make_block(state_a, slot_b).await; + + let response: Result<(), eth2::Error> = tester + .client + .post_beacon_blocks_v2_ssz(&block, validation_level) + .await; + + assert!(response.is_ok()); + assert!(tester + .harness + .chain + .block_is_known_to_fork_choice(&block.canonical_root())); +} + /// This test checks that a block that is **invalid** from a gossip perspective gets rejected when using `broadcast_validation=consensus`. #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub async fn consensus_invalid() { diff --git a/beacon_node/http_api/tests/tests.rs b/beacon_node/http_api/tests/tests.rs index dc8ca49d2..7c3872925 100644 --- a/beacon_node/http_api/tests/tests.rs +++ b/beacon_node/http_api/tests/tests.rs @@ -1247,6 +1247,22 @@ impl ApiTester { self } + pub async fn test_post_beacon_blocks_ssz_valid(mut self) -> Self { + let next_block = &self.next_block; + + self.client + .post_beacon_blocks_ssz(next_block) + .await + .unwrap(); + + assert!( + self.network_rx.network_recv.recv().await.is_some(), + "valid blocks should be sent to network" + ); + + self + } + pub async fn test_post_beacon_blocks_invalid(mut self) -> Self { let block = self .harness @@ -1270,6 +1286,29 @@ impl ApiTester { self } + pub async fn test_post_beacon_blocks_ssz_invalid(mut self) -> Self { + let block = self + .harness + .make_block_with_modifier( + self.harness.get_current_state(), + self.harness.get_current_slot(), + |b| { + *b.state_root_mut() = Hash256::zero(); + }, + ) + .await + .0; + + assert!(self.client.post_beacon_blocks_ssz(&block).await.is_err()); + + assert!( + self.network_rx.network_recv.recv().await.is_some(), + "gossip valid blocks should be sent to network" + ); + + self + } + pub async fn test_beacon_blocks(self) -> Self { for block_id in self.interesting_block_ids() { let expected = block_id @@ -4451,6 +4490,22 @@ async fn post_beacon_blocks_valid() { ApiTester::new().await.test_post_beacon_blocks_valid().await; } +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn post_beacon_blocks_ssz_valid() { + ApiTester::new() + .await + .test_post_beacon_blocks_ssz_valid() + .await; +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn test_post_beacon_blocks_ssz_invalid() { + ApiTester::new() + .await + .test_post_beacon_blocks_ssz_invalid() + .await; +} + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn post_beacon_blocks_invalid() { ApiTester::new() diff --git a/common/eth2/src/lib.rs b/common/eth2/src/lib.rs index 5fcddbc46..661f9a09e 100644 --- a/common/eth2/src/lib.rs +++ b/common/eth2/src/lib.rs @@ -21,10 +21,14 @@ use futures_util::StreamExt; use lighthouse_network::PeerId; use pretty_reqwest_error::PrettyReqwestError; pub use reqwest; -use reqwest::{IntoUrl, RequestBuilder, Response}; +use reqwest::{ + header::{HeaderMap, HeaderValue}, + Body, IntoUrl, RequestBuilder, Response, +}; pub use reqwest::{StatusCode, Url}; pub use sensitive_url::{SensitiveError, SensitiveUrl}; use serde::{de::DeserializeOwned, Serialize}; +use ssz::Encode; use std::convert::TryFrom; use std::fmt; use std::iter::Iterator; @@ -322,6 +326,25 @@ impl BeaconNodeHttpClient { ok_or_error(response).await } + /// Generic POST function supporting arbitrary responses and timeouts. + async fn post_generic_with_ssz_body, U: IntoUrl>( + &self, + url: U, + body: T, + timeout: Option, + ) -> Result { + let mut builder = self.client.post(url); + if let Some(timeout) = timeout { + builder = builder.timeout(timeout); + } + let response = builder + .header("Content-Type", "application/octet-stream") + .body(body) + .send() + .await?; + ok_or_error(response).await + } + /// Generic POST function supporting arbitrary responses and timeouts. async fn post_generic_with_consensus_version( &self, @@ -342,6 +365,31 @@ impl BeaconNodeHttpClient { ok_or_error(response).await } + /// Generic POST function supporting arbitrary responses and timeouts. + async fn post_generic_with_consensus_version_and_ssz_body, U: IntoUrl>( + &self, + url: U, + body: T, + timeout: Option, + fork: ForkName, + ) -> Result { + let mut builder = self.client.post(url); + if let Some(timeout) = timeout { + builder = builder.timeout(timeout); + } + let mut headers = HeaderMap::new(); + headers.insert( + CONSENSUS_VERSION_HEADER, + HeaderValue::from_str(&fork.to_string()).expect("Failed to create header value"), + ); + headers.insert( + "Content-Type", + HeaderValue::from_static("application/octet-stream"), + ); + let response = builder.headers(headers).body(body).send().await?; + ok_or_error(response).await + } + /// `GET beacon/genesis` /// /// ## Errors @@ -654,6 +702,26 @@ impl BeaconNodeHttpClient { Ok(()) } + /// `POST beacon/blocks` + /// + /// Returns `Ok(None)` on a 404 error. + pub async fn post_beacon_blocks_ssz>( + &self, + block: &SignedBeaconBlock, + ) -> Result<(), Error> { + let mut path = self.eth_path(V1)?; + + path.path_segments_mut() + .map_err(|()| Error::InvalidUrl(self.server.clone()))? + .push("beacon") + .push("blocks"); + + self.post_generic_with_ssz_body(path, block.as_ssz_bytes(), Some(self.timeouts.proposal)) + .await?; + + Ok(()) + } + /// `POST beacon/blinded_blocks` /// /// Returns `Ok(None)` on a 404 error. @@ -727,6 +795,23 @@ impl BeaconNodeHttpClient { Ok(()) } + /// `POST v2/beacon/blocks` + pub async fn post_beacon_blocks_v2_ssz>( + &self, + block: &SignedBeaconBlock, + validation_level: Option, + ) -> Result<(), Error> { + self.post_generic_with_consensus_version_and_ssz_body( + self.post_beacon_blocks_v2_path(validation_level)?, + block.as_ssz_bytes(), + Some(self.timeouts.proposal), + block.message().body().fork_name(), + ) + .await?; + + Ok(()) + } + /// `POST v2/beacon/blinded_blocks` pub async fn post_beacon_blinded_blocks_v2( &self,