add ssz support in request body for /beacon/blocks endpoints (v1 & v2) (#4479)

## Issue Addressed

[#4457](https://github.com/sigp/lighthouse/issues/4457)

## Proposed Changes

add ssz support in request body for  /beacon/blocks endpoints (v1 & v2)


## Additional Info
This commit is contained in:
Eitan Seri-Levi 2023-07-31 23:51:37 +00:00
parent 8654f20028
commit e8c411c288
6 changed files with 276 additions and 2 deletions

1
Cargo.lock generated
View File

@ -3524,6 +3524,7 @@ version = "0.1.0"
dependencies = [
"beacon_chain",
"bs58",
"bytes",
"directory",
"environment",
"eth1",

View File

@ -42,6 +42,7 @@ operation_pool = { path = "../operation_pool" }
sensitive_url = { path = "../../common/sensitive_url" }
unused_port = {path = "../../common/unused_port"}
store = { path = "../store" }
bytes = "1.1.0"
[dev-dependencies]
environment = { path = "../../lighthouse/environment" }

View File

@ -29,6 +29,7 @@ use beacon_chain::{
BeaconChainTypes, ProduceBlockVerification, WhenSlotSkipped,
};
pub use block_id::BlockId;
use bytes::Bytes;
use directory::DEFAULT_ROOT_DIR;
use eth2::types::{
self as api_types, BroadcastValidation, EndpointVersion, ForkChoice, ForkChoiceNode,
@ -1236,6 +1237,41 @@ pub fn serve<T: BeaconChainTypes>(
},
);
let post_beacon_blocks_ssz = eth_v1
.and(warp::path("beacon"))
.and(warp::path("blocks"))
.and(warp::path::end())
.and(warp::body::bytes())
.and(chain_filter.clone())
.and(network_tx_filter.clone())
.and(log_filter.clone())
.and_then(
|block_bytes: Bytes,
chain: Arc<BeaconChain<T>>,
network_tx: UnboundedSender<NetworkMessage<T::EthSpec>>,
log: Logger| async move {
let block = match SignedBeaconBlock::<T::EthSpec>::from_ssz_bytes(
&block_bytes,
&chain.spec,
) {
Ok(data) => data,
Err(e) => {
return Err(warp_utils::reject::custom_bad_request(format!("{:?}", e)))
}
};
publish_blocks::publish_block(
None,
ProvenancedBlock::local(Arc::new(block)),
chain,
&network_tx,
log,
BroadcastValidation::default(),
)
.await
.map(|()| warp::reply().into_response())
},
);
let post_beacon_blocks_v2 = eth_v2
.and(warp::path("beacon"))
.and(warp::path("blocks"))
@ -1274,6 +1310,57 @@ pub fn serve<T: BeaconChainTypes>(
},
);
let post_beacon_blocks_v2_ssz = eth_v2
.and(warp::path("beacon"))
.and(warp::path("blocks"))
.and(warp::query::<api_types::BroadcastValidationQuery>())
.and(warp::path::end())
.and(warp::body::bytes())
.and(chain_filter.clone())
.and(network_tx_filter.clone())
.and(log_filter.clone())
.then(
|validation_level: api_types::BroadcastValidationQuery,
block_bytes: Bytes,
chain: Arc<BeaconChain<T>>,
network_tx: UnboundedSender<NetworkMessage<T::EthSpec>>,
log: Logger| async move {
let block = match SignedBeaconBlock::<T::EthSpec>::from_ssz_bytes(
&block_bytes,
&chain.spec,
) {
Ok(data) => data,
Err(_) => {
return warp::reply::with_status(
StatusCode::BAD_REQUEST,
eth2::StatusCode::BAD_REQUEST,
)
.into_response();
}
};
match publish_blocks::publish_block(
None,
ProvenancedBlock::local(Arc::new(block)),
chain,
&network_tx,
log,
validation_level.broadcast_validation,
)
.await
{
Ok(()) => warp::reply().into_response(),
Err(e) => match warp_utils::reject::handle_rejection(e).await {
Ok(reply) => reply.into_response(),
Err(_) => warp::reply::with_status(
StatusCode::INTERNAL_SERVER_ERROR,
eth2::StatusCode::INTERNAL_SERVER_ERROR,
)
.into_response(),
},
}
},
);
/*
* beacon/blocks
*/
@ -3984,7 +4071,10 @@ pub fn serve<T: BeaconChainTypes>(
.boxed()
.uor(
warp::post().and(
post_beacon_blocks
warp::header::exact("Content-Type", "application/octet-stream")
// Routes which expect `application/octet-stream` go within this `and`.
.and(post_beacon_blocks_ssz.uor(post_beacon_blocks_v2_ssz))
.uor(post_beacon_blocks)
.uor(post_beacon_blinded_blocks)
.uor(post_beacon_blocks_v2)
.uor(post_beacon_blinded_blocks_v2)

View File

@ -175,6 +175,48 @@ pub async fn gossip_full_pass() {
.block_is_known_to_fork_choice(&block.canonical_root()));
}
// This test checks that a block that is valid from both a gossip and consensus perspective is accepted when using `broadcast_validation=gossip`.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
pub async fn gossip_full_pass_ssz() {
/* this test targets gossip-level validation */
let validation_level: Option<BroadcastValidation> = Some(BroadcastValidation::Gossip);
// Validator count needs to be at least 32 or proposer boost gets set to 0 when computing
// `validator_count // 32`.
let validator_count = 64;
let num_initial: u64 = 31;
let tester = InteractiveTester::<E>::new(None, validator_count).await;
// Create some chain depth.
tester.harness.advance_slot();
tester
.harness
.extend_chain(
num_initial as usize,
BlockStrategy::OnCanonicalHead,
AttestationStrategy::AllValidators,
)
.await;
tester.harness.advance_slot();
let slot_a = Slot::new(num_initial);
let slot_b = slot_a + 1;
let state_a = tester.harness.get_current_state();
let (block, _): (SignedBeaconBlock<E>, _) = tester.harness.make_block(state_a, slot_b).await;
let response: Result<(), eth2::Error> = tester
.client
.post_beacon_blocks_v2_ssz(&block, validation_level)
.await;
assert!(response.is_ok());
assert!(tester
.harness
.chain
.block_is_known_to_fork_choice(&block.canonical_root()));
}
/// This test checks that a block that is **invalid** from a gossip perspective gets rejected when using `broadcast_validation=consensus`.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
pub async fn consensus_invalid() {

View File

@ -1247,6 +1247,22 @@ impl ApiTester {
self
}
pub async fn test_post_beacon_blocks_ssz_valid(mut self) -> Self {
let next_block = &self.next_block;
self.client
.post_beacon_blocks_ssz(next_block)
.await
.unwrap();
assert!(
self.network_rx.network_recv.recv().await.is_some(),
"valid blocks should be sent to network"
);
self
}
pub async fn test_post_beacon_blocks_invalid(mut self) -> Self {
let block = self
.harness
@ -1270,6 +1286,29 @@ impl ApiTester {
self
}
pub async fn test_post_beacon_blocks_ssz_invalid(mut self) -> Self {
let block = self
.harness
.make_block_with_modifier(
self.harness.get_current_state(),
self.harness.get_current_slot(),
|b| {
*b.state_root_mut() = Hash256::zero();
},
)
.await
.0;
assert!(self.client.post_beacon_blocks_ssz(&block).await.is_err());
assert!(
self.network_rx.network_recv.recv().await.is_some(),
"gossip valid blocks should be sent to network"
);
self
}
pub async fn test_beacon_blocks(self) -> Self {
for block_id in self.interesting_block_ids() {
let expected = block_id
@ -4451,6 +4490,22 @@ async fn post_beacon_blocks_valid() {
ApiTester::new().await.test_post_beacon_blocks_valid().await;
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn post_beacon_blocks_ssz_valid() {
ApiTester::new()
.await
.test_post_beacon_blocks_ssz_valid()
.await;
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_post_beacon_blocks_ssz_invalid() {
ApiTester::new()
.await
.test_post_beacon_blocks_ssz_invalid()
.await;
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn post_beacon_blocks_invalid() {
ApiTester::new()

View File

@ -21,10 +21,14 @@ use futures_util::StreamExt;
use lighthouse_network::PeerId;
use pretty_reqwest_error::PrettyReqwestError;
pub use reqwest;
use reqwest::{IntoUrl, RequestBuilder, Response};
use reqwest::{
header::{HeaderMap, HeaderValue},
Body, IntoUrl, RequestBuilder, Response,
};
pub use reqwest::{StatusCode, Url};
pub use sensitive_url::{SensitiveError, SensitiveUrl};
use serde::{de::DeserializeOwned, Serialize};
use ssz::Encode;
use std::convert::TryFrom;
use std::fmt;
use std::iter::Iterator;
@ -322,6 +326,25 @@ impl BeaconNodeHttpClient {
ok_or_error(response).await
}
/// Generic POST function supporting arbitrary responses and timeouts.
async fn post_generic_with_ssz_body<T: Into<Body>, U: IntoUrl>(
&self,
url: U,
body: T,
timeout: Option<Duration>,
) -> Result<Response, Error> {
let mut builder = self.client.post(url);
if let Some(timeout) = timeout {
builder = builder.timeout(timeout);
}
let response = builder
.header("Content-Type", "application/octet-stream")
.body(body)
.send()
.await?;
ok_or_error(response).await
}
/// Generic POST function supporting arbitrary responses and timeouts.
async fn post_generic_with_consensus_version<T: Serialize, U: IntoUrl>(
&self,
@ -342,6 +365,31 @@ impl BeaconNodeHttpClient {
ok_or_error(response).await
}
/// Generic POST function supporting arbitrary responses and timeouts.
async fn post_generic_with_consensus_version_and_ssz_body<T: Into<Body>, U: IntoUrl>(
&self,
url: U,
body: T,
timeout: Option<Duration>,
fork: ForkName,
) -> Result<Response, Error> {
let mut builder = self.client.post(url);
if let Some(timeout) = timeout {
builder = builder.timeout(timeout);
}
let mut headers = HeaderMap::new();
headers.insert(
CONSENSUS_VERSION_HEADER,
HeaderValue::from_str(&fork.to_string()).expect("Failed to create header value"),
);
headers.insert(
"Content-Type",
HeaderValue::from_static("application/octet-stream"),
);
let response = builder.headers(headers).body(body).send().await?;
ok_or_error(response).await
}
/// `GET beacon/genesis`
///
/// ## Errors
@ -654,6 +702,26 @@ impl BeaconNodeHttpClient {
Ok(())
}
/// `POST beacon/blocks`
///
/// Returns `Ok(None)` on a 404 error.
pub async fn post_beacon_blocks_ssz<T: EthSpec, Payload: AbstractExecPayload<T>>(
&self,
block: &SignedBeaconBlock<T, Payload>,
) -> Result<(), Error> {
let mut path = self.eth_path(V1)?;
path.path_segments_mut()
.map_err(|()| Error::InvalidUrl(self.server.clone()))?
.push("beacon")
.push("blocks");
self.post_generic_with_ssz_body(path, block.as_ssz_bytes(), Some(self.timeouts.proposal))
.await?;
Ok(())
}
/// `POST beacon/blinded_blocks`
///
/// Returns `Ok(None)` on a 404 error.
@ -727,6 +795,23 @@ impl BeaconNodeHttpClient {
Ok(())
}
/// `POST v2/beacon/blocks`
pub async fn post_beacon_blocks_v2_ssz<T: EthSpec, Payload: AbstractExecPayload<T>>(
&self,
block: &SignedBeaconBlock<T, Payload>,
validation_level: Option<BroadcastValidation>,
) -> Result<(), Error> {
self.post_generic_with_consensus_version_and_ssz_body(
self.post_beacon_blocks_v2_path(validation_level)?,
block.as_ssz_bytes(),
Some(self.timeouts.proposal),
block.message().body().fork_name(),
)
.await?;
Ok(())
}
/// `POST v2/beacon/blinded_blocks`
pub async fn post_beacon_blinded_blocks_v2<T: EthSpec>(
&self,