Assume Content-Type is json for endpoints that require json (#4575)

* added default content type filter

* Merge branch 'unstable' of https://github.com/sigp/lighthouse into unstable

* create custom warp json filter that ignores content type header

* cargo fmt and linting

* updated test

* updated test

* merge unstable

* merge conflicts

* workspace=true

* use Bytes instead of Buf

* resolve merge conflict

* resolve merge conflicts

* add extra error message context

* merge conflicts

* lint
This commit is contained in:
Eitan Seri-Levi 2024-01-31 03:11:54 +02:00 committed by GitHub
parent b8db3e4f08
commit 1d87edb03d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 88 additions and 28 deletions

2
Cargo.lock generated
View File

@ -8739,6 +8739,7 @@ name = "warp_utils"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"beacon_chain", "beacon_chain",
"bytes",
"eth2", "eth2",
"headers", "headers",
"lazy_static", "lazy_static",
@ -8746,6 +8747,7 @@ dependencies = [
"safe_arith", "safe_arith",
"serde", "serde",
"serde_array_query", "serde_array_query",
"serde_json",
"state_processing", "state_processing",
"tokio", "tokio",
"types", "types",

View File

@ -682,7 +682,7 @@ pub fn serve<T: BeaconChainTypes>(
.clone() .clone()
.and(warp::path("validator_balances")) .and(warp::path("validator_balances"))
.and(warp::path::end()) .and(warp::path::end())
.and(warp::body::json()) .and(warp_utils::json::json())
.then( .then(
|state_id: StateId, |state_id: StateId,
task_spawner: TaskSpawner<T::EthSpec>, task_spawner: TaskSpawner<T::EthSpec>,
@ -726,7 +726,7 @@ pub fn serve<T: BeaconChainTypes>(
.clone() .clone()
.and(warp::path("validators")) .and(warp::path("validators"))
.and(warp::path::end()) .and(warp::path::end())
.and(warp::body::json()) .and(warp_utils::json::json())
.then( .then(
|state_id: StateId, |state_id: StateId,
task_spawner: TaskSpawner<T::EthSpec>, task_spawner: TaskSpawner<T::EthSpec>,
@ -1257,7 +1257,7 @@ pub fn serve<T: BeaconChainTypes>(
.and(warp::path("beacon")) .and(warp::path("beacon"))
.and(warp::path("blocks")) .and(warp::path("blocks"))
.and(warp::path::end()) .and(warp::path::end())
.and(warp::body::json()) .and(warp_utils::json::json())
.and(task_spawner_filter.clone()) .and(task_spawner_filter.clone())
.and(chain_filter.clone()) .and(chain_filter.clone())
.and(network_tx_filter.clone()) .and(network_tx_filter.clone())
@ -1327,7 +1327,7 @@ pub fn serve<T: BeaconChainTypes>(
.and(warp::path("blocks")) .and(warp::path("blocks"))
.and(warp::query::<api_types::BroadcastValidationQuery>()) .and(warp::query::<api_types::BroadcastValidationQuery>())
.and(warp::path::end()) .and(warp::path::end())
.and(warp::body::json()) .and(warp_utils::json::json())
.and(task_spawner_filter.clone()) .and(task_spawner_filter.clone())
.and(chain_filter.clone()) .and(chain_filter.clone())
.and(network_tx_filter.clone()) .and(network_tx_filter.clone())
@ -1404,7 +1404,7 @@ pub fn serve<T: BeaconChainTypes>(
.and(warp::path("beacon")) .and(warp::path("beacon"))
.and(warp::path("blinded_blocks")) .and(warp::path("blinded_blocks"))
.and(warp::path::end()) .and(warp::path::end())
.and(warp::body::json()) .and(warp_utils::json::json())
.and(task_spawner_filter.clone()) .and(task_spawner_filter.clone())
.and(chain_filter.clone()) .and(chain_filter.clone())
.and(network_tx_filter.clone()) .and(network_tx_filter.clone())
@ -1472,7 +1472,7 @@ pub fn serve<T: BeaconChainTypes>(
.and(warp::path("blinded_blocks")) .and(warp::path("blinded_blocks"))
.and(warp::query::<api_types::BroadcastValidationQuery>()) .and(warp::query::<api_types::BroadcastValidationQuery>())
.and(warp::path::end()) .and(warp::path::end())
.and(warp::body::json()) .and(warp_utils::json::json())
.and(task_spawner_filter.clone()) .and(task_spawner_filter.clone())
.and(chain_filter.clone()) .and(chain_filter.clone())
.and(network_tx_filter.clone()) .and(network_tx_filter.clone())
@ -1754,7 +1754,7 @@ pub fn serve<T: BeaconChainTypes>(
.clone() .clone()
.and(warp::path("attestations")) .and(warp::path("attestations"))
.and(warp::path::end()) .and(warp::path::end())
.and(warp::body::json()) .and(warp_utils::json::json())
.and(network_tx_filter.clone()) .and(network_tx_filter.clone())
.and(log_filter.clone()) .and(log_filter.clone())
.then( .then(
@ -1930,7 +1930,7 @@ pub fn serve<T: BeaconChainTypes>(
.clone() .clone()
.and(warp::path("attester_slashings")) .and(warp::path("attester_slashings"))
.and(warp::path::end()) .and(warp::path::end())
.and(warp::body::json()) .and(warp_utils::json::json())
.and(network_tx_filter.clone()) .and(network_tx_filter.clone())
.then( .then(
|task_spawner: TaskSpawner<T::EthSpec>, |task_spawner: TaskSpawner<T::EthSpec>,
@ -1988,7 +1988,7 @@ pub fn serve<T: BeaconChainTypes>(
.clone() .clone()
.and(warp::path("proposer_slashings")) .and(warp::path("proposer_slashings"))
.and(warp::path::end()) .and(warp::path::end())
.and(warp::body::json()) .and(warp_utils::json::json())
.and(network_tx_filter.clone()) .and(network_tx_filter.clone())
.then( .then(
|task_spawner: TaskSpawner<T::EthSpec>, |task_spawner: TaskSpawner<T::EthSpec>,
@ -2046,7 +2046,7 @@ pub fn serve<T: BeaconChainTypes>(
.clone() .clone()
.and(warp::path("voluntary_exits")) .and(warp::path("voluntary_exits"))
.and(warp::path::end()) .and(warp::path::end())
.and(warp::body::json()) .and(warp_utils::json::json())
.and(network_tx_filter.clone()) .and(network_tx_filter.clone())
.then( .then(
|task_spawner: TaskSpawner<T::EthSpec>, |task_spawner: TaskSpawner<T::EthSpec>,
@ -2102,7 +2102,7 @@ pub fn serve<T: BeaconChainTypes>(
.clone() .clone()
.and(warp::path("sync_committees")) .and(warp::path("sync_committees"))
.and(warp::path::end()) .and(warp::path::end())
.and(warp::body::json()) .and(warp_utils::json::json())
.and(network_tx_filter.clone()) .and(network_tx_filter.clone())
.and(log_filter.clone()) .and(log_filter.clone())
.then( .then(
@ -2139,7 +2139,7 @@ pub fn serve<T: BeaconChainTypes>(
.clone() .clone()
.and(warp::path("bls_to_execution_changes")) .and(warp::path("bls_to_execution_changes"))
.and(warp::path::end()) .and(warp::path::end())
.and(warp::body::json()) .and(warp_utils::json::json())
.and(network_tx_filter.clone()) .and(network_tx_filter.clone())
.and(log_filter.clone()) .and(log_filter.clone())
.then( .then(
@ -2533,7 +2533,7 @@ pub fn serve<T: BeaconChainTypes>(
.and(warp::path("attestations")) .and(warp::path("attestations"))
.and(warp::path::param::<Epoch>()) .and(warp::path::param::<Epoch>())
.and(warp::path::end()) .and(warp::path::end())
.and(warp::body::json()) .and(warp_utils::json::json())
.then( .then(
|task_spawner: TaskSpawner<T::EthSpec>, |task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>, chain: Arc<BeaconChain<T>>,
@ -2583,7 +2583,7 @@ pub fn serve<T: BeaconChainTypes>(
.and(warp::path("sync_committee")) .and(warp::path("sync_committee"))
.and(block_id_or_err) .and(block_id_or_err)
.and(warp::path::end()) .and(warp::path::end())
.and(warp::body::json()) .and(warp_utils::json::json())
.and(log_filter.clone()) .and(log_filter.clone())
.then( .then(
|task_spawner: TaskSpawner<T::EthSpec>, |task_spawner: TaskSpawner<T::EthSpec>,
@ -3326,7 +3326,7 @@ pub fn serve<T: BeaconChainTypes>(
})) }))
.and(warp::path::end()) .and(warp::path::end())
.and(not_while_syncing_filter.clone()) .and(not_while_syncing_filter.clone())
.and(warp::body::json()) .and(warp_utils::json::json())
.and(task_spawner_filter.clone()) .and(task_spawner_filter.clone())
.and(chain_filter.clone()) .and(chain_filter.clone())
.then( .then(
@ -3352,7 +3352,7 @@ pub fn serve<T: BeaconChainTypes>(
})) }))
.and(warp::path::end()) .and(warp::path::end())
.and(not_while_syncing_filter.clone()) .and(not_while_syncing_filter.clone())
.and(warp::body::json()) .and(warp_utils::json::json())
.and(task_spawner_filter.clone()) .and(task_spawner_filter.clone())
.and(chain_filter.clone()) .and(chain_filter.clone())
.then( .then(
@ -3406,7 +3406,7 @@ pub fn serve<T: BeaconChainTypes>(
.and(not_while_syncing_filter.clone()) .and(not_while_syncing_filter.clone())
.and(task_spawner_filter.clone()) .and(task_spawner_filter.clone())
.and(chain_filter.clone()) .and(chain_filter.clone())
.and(warp::body::json()) .and(warp_utils::json::json())
.and(network_tx_filter.clone()) .and(network_tx_filter.clone())
.and(log_filter.clone()) .and(log_filter.clone())
.then( .then(
@ -3519,7 +3519,7 @@ pub fn serve<T: BeaconChainTypes>(
.and(not_while_syncing_filter.clone()) .and(not_while_syncing_filter.clone())
.and(task_spawner_filter.clone()) .and(task_spawner_filter.clone())
.and(chain_filter.clone()) .and(chain_filter.clone())
.and(warp::body::json()) .and(warp_utils::json::json())
.and(network_tx_filter) .and(network_tx_filter)
.and(log_filter.clone()) .and(log_filter.clone())
.then( .then(
@ -3545,7 +3545,7 @@ pub fn serve<T: BeaconChainTypes>(
.and(warp::path("validator")) .and(warp::path("validator"))
.and(warp::path("beacon_committee_subscriptions")) .and(warp::path("beacon_committee_subscriptions"))
.and(warp::path::end()) .and(warp::path::end())
.and(warp::body::json()) .and(warp_utils::json::json())
.and(validator_subscription_tx_filter.clone()) .and(validator_subscription_tx_filter.clone())
.and(task_spawner_filter.clone()) .and(task_spawner_filter.clone())
.and(chain_filter.clone()) .and(chain_filter.clone())
@ -3601,7 +3601,7 @@ pub fn serve<T: BeaconChainTypes>(
.and(task_spawner_filter.clone()) .and(task_spawner_filter.clone())
.and(chain_filter.clone()) .and(chain_filter.clone())
.and(log_filter.clone()) .and(log_filter.clone())
.and(warp::body::json()) .and(warp_utils::json::json())
.then( .then(
|task_spawner: TaskSpawner<T::EthSpec>, |task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>, chain: Arc<BeaconChain<T>>,
@ -3652,7 +3652,7 @@ pub fn serve<T: BeaconChainTypes>(
.and(task_spawner_filter.clone()) .and(task_spawner_filter.clone())
.and(chain_filter.clone()) .and(chain_filter.clone())
.and(log_filter.clone()) .and(log_filter.clone())
.and(warp::body::json()) .and(warp_utils::json::json())
.then( .then(
|task_spawner: TaskSpawner<T::EthSpec>, |task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>, chain: Arc<BeaconChain<T>>,
@ -3826,7 +3826,7 @@ pub fn serve<T: BeaconChainTypes>(
.and(warp::path("validator")) .and(warp::path("validator"))
.and(warp::path("sync_committee_subscriptions")) .and(warp::path("sync_committee_subscriptions"))
.and(warp::path::end()) .and(warp::path::end())
.and(warp::body::json()) .and(warp_utils::json::json())
.and(validator_subscription_tx_filter) .and(validator_subscription_tx_filter)
.and(task_spawner_filter.clone()) .and(task_spawner_filter.clone())
.and(chain_filter.clone()) .and(chain_filter.clone())
@ -3872,7 +3872,7 @@ pub fn serve<T: BeaconChainTypes>(
.and(warp::path("liveness")) .and(warp::path("liveness"))
.and(warp::path::param::<Epoch>()) .and(warp::path::param::<Epoch>())
.and(warp::path::end()) .and(warp::path::end())
.and(warp::body::json()) .and(warp_utils::json::json())
.and(task_spawner_filter.clone()) .and(task_spawner_filter.clone())
.and(chain_filter.clone()) .and(chain_filter.clone())
.then( .then(
@ -3913,7 +3913,7 @@ pub fn serve<T: BeaconChainTypes>(
let post_lighthouse_liveness = warp::path("lighthouse") let post_lighthouse_liveness = warp::path("lighthouse")
.and(warp::path("liveness")) .and(warp::path("liveness"))
.and(warp::path::end()) .and(warp::path::end())
.and(warp::body::json()) .and(warp_utils::json::json())
.and(task_spawner_filter.clone()) .and(task_spawner_filter.clone())
.and(chain_filter.clone()) .and(chain_filter.clone())
.then( .then(
@ -4016,7 +4016,7 @@ pub fn serve<T: BeaconChainTypes>(
.and(warp::path("ui")) .and(warp::path("ui"))
.and(warp::path("validator_metrics")) .and(warp::path("validator_metrics"))
.and(warp::path::end()) .and(warp::path::end())
.and(warp::body::json()) .and(warp_utils::json::json())
.and(task_spawner_filter.clone()) .and(task_spawner_filter.clone())
.and(chain_filter.clone()) .and(chain_filter.clone())
.then( .then(
@ -4035,7 +4035,7 @@ pub fn serve<T: BeaconChainTypes>(
.and(warp::path("ui")) .and(warp::path("ui"))
.and(warp::path("validator_info")) .and(warp::path("validator_info"))
.and(warp::path::end()) .and(warp::path::end())
.and(warp::body::json()) .and(warp_utils::json::json())
.and(task_spawner_filter.clone()) .and(task_spawner_filter.clone())
.and(chain_filter.clone()) .and(chain_filter.clone())
.then( .then(
@ -4338,7 +4338,7 @@ pub fn serve<T: BeaconChainTypes>(
let post_lighthouse_block_rewards = warp::path("lighthouse") let post_lighthouse_block_rewards = warp::path("lighthouse")
.and(warp::path("analysis")) .and(warp::path("analysis"))
.and(warp::path("block_rewards")) .and(warp::path("block_rewards"))
.and(warp::body::json()) .and(warp_utils::json::json())
.and(warp::path::end()) .and(warp::path::end())
.and(task_spawner_filter.clone()) .and(task_spawner_filter.clone())
.and(chain_filter.clone()) .and(chain_filter.clone())

View File

@ -373,10 +373,30 @@ impl BeaconNodeHttpClient {
if let Some(timeout) = timeout { if let Some(timeout) = timeout {
builder = builder.timeout(timeout); builder = builder.timeout(timeout);
} }
let response = builder.json(body).send().await?; let response = builder.json(body).send().await?;
ok_or_error(response).await ok_or_error(response).await
} }
/// Generic POST function supporting arbitrary responses and timeouts.
/// Does not include Content-Type application/json in the request header.
async fn post_generic_json_without_content_type_header<T: Serialize, U: IntoUrl>(
&self,
url: U,
body: &T,
timeout: Option<Duration>,
) -> Result<Response, Error> {
let mut builder = self.client.post(url);
if let Some(timeout) = timeout {
builder = builder.timeout(timeout);
}
let serialized_body = serde_json::to_vec(body).map_err(Error::InvalidJson)?;
let response = builder.body(serialized_body).send().await?;
ok_or_error(response).await
}
/// Generic POST function supporting arbitrary responses and timeouts. /// Generic POST function supporting arbitrary responses and timeouts.
async fn post_generic_with_consensus_version<T: Serialize, U: IntoUrl>( async fn post_generic_with_consensus_version<T: Serialize, U: IntoUrl>(
&self, &self,
@ -1250,7 +1270,8 @@ impl BeaconNodeHttpClient {
.push("pool") .push("pool")
.push("attester_slashings"); .push("attester_slashings");
self.post(path, slashing).await?; self.post_generic_json_without_content_type_header(path, slashing, None)
.await?;
Ok(()) Ok(())
} }

View File

@ -14,8 +14,10 @@ beacon_chain = { workspace = true }
state_processing = { workspace = true } state_processing = { workspace = true }
safe_arith = { workspace = true } safe_arith = { workspace = true }
serde = { workspace = true } serde = { workspace = true }
serde_json = { workspace = true }
tokio = { workspace = true } tokio = { workspace = true }
headers = "0.3.2" headers = "0.3.2"
lighthouse_metrics = { workspace = true } lighthouse_metrics = { workspace = true }
lazy_static = { workspace = true } lazy_static = { workspace = true }
serde_array_query = "0.1.0" serde_array_query = "0.1.0"
bytes = { workspace = true }

View File

@ -0,0 +1,22 @@
use bytes::Bytes;
use serde::de::DeserializeOwned;
use std::error::Error as StdError;
use warp::{Filter, Rejection};
use crate::reject;
struct Json;
type BoxError = Box<dyn StdError + Send + Sync>;
impl Json {
fn decode<T: DeserializeOwned>(bytes: Bytes) -> Result<T, BoxError> {
serde_json::from_slice(&bytes).map_err(Into::into)
}
}
pub fn json<T: DeserializeOwned + Send>() -> impl Filter<Extract = (T,), Error = Rejection> + Copy {
warp::body::bytes().and_then(|bytes: Bytes| async move {
Json::decode(bytes).map_err(|err| reject::custom_deserialize_error(format!("{:?}", err)))
})
}

View File

@ -2,6 +2,7 @@
//! Lighthouse project. E.g., the `http_api` and `http_metrics` crates. //! Lighthouse project. E.g., the `http_api` and `http_metrics` crates.
pub mod cors; pub mod cors;
pub mod json;
pub mod metrics; pub mod metrics;
pub mod query; pub mod query;
pub mod reject; pub mod reject;

View File

@ -82,6 +82,15 @@ pub fn custom_bad_request(msg: String) -> warp::reject::Rejection {
warp::reject::custom(CustomBadRequest(msg)) warp::reject::custom(CustomBadRequest(msg))
} }
#[derive(Debug)]
pub struct CustomDeserializeError(pub String);
impl Reject for CustomDeserializeError {}
pub fn custom_deserialize_error(msg: String) -> warp::reject::Rejection {
warp::reject::custom(CustomDeserializeError(msg))
}
#[derive(Debug)] #[derive(Debug)]
pub struct CustomServerError(pub String); pub struct CustomServerError(pub String);
@ -161,6 +170,9 @@ pub async fn handle_rejection(err: warp::Rejection) -> Result<impl warp::Reply,
if err.is_not_found() { if err.is_not_found() {
code = StatusCode::NOT_FOUND; code = StatusCode::NOT_FOUND;
message = "NOT_FOUND".to_string(); message = "NOT_FOUND".to_string();
} else if let Some(e) = err.find::<crate::reject::CustomDeserializeError>() {
message = format!("BAD_REQUEST: body deserialize error: {}", e.0);
code = StatusCode::BAD_REQUEST;
} else if let Some(e) = err.find::<warp::filters::body::BodyDeserializeError>() { } else if let Some(e) = err.find::<warp::filters::body::BodyDeserializeError>() {
message = format!("BAD_REQUEST: body deserialize error: {}", e); message = format!("BAD_REQUEST: body deserialize error: {}", e);
code = StatusCode::BAD_REQUEST; code = StatusCode::BAD_REQUEST;