Support duplicate keys in HTTP API query strings (#2908)

## Issues Addressed

Closes #2739
Closes #2812

## Proposed Changes

Support the deserialization of query strings containing duplicate keys into their corresponding types.
As `warp` does not support this feature natively (as discussed in #2739), it relies on the external library [`serde_array_query`](https://github.com/sigp/serde_array_query) (written by @michaelsproul)

This is backwards compatible meaning that both of the following requests will produce the same output:
```
curl "http://localhost:5052/eth/v1/events?topics=head,block"
```
```
curl "http://localhost:5052/eth/v1/events?topics=head&topics=block"
```

## Additional Info

Certain error messages have changed slightly.  This only affects endpoints which accept multiple values.
For example:
```
{"code":400,"message":"BAD_REQUEST: invalid query: Invalid query string","stacktraces":[]}
```
is now
```
{"code":400,"message":"BAD_REQUEST: unable to parse query","stacktraces":[]}
```


The serve order of the endpoints `get_beacon_state_validators` and `get_beacon_state_validators_id` have flipped:
```rust
.or(get_beacon_state_validators_id.boxed())
.or(get_beacon_state_validators.boxed())
``` 
This is to ensure proper error messages when filter fallback occurs due to the use of the `and_then` filter.

## Future Work
- Cleanup / remove filter fallback behaviour by substituting `and_then` with `then` where appropriate.
- Add regression tests for HTTP API error messages.

## Credits
- @mooori for doing the ground work of investigating possible solutions within the existing Rust ecosystem.
- @michaelsproul for writing [`serde_array_query`](https://github.com/sigp/serde_array_query) and for helping debug the behaviour of the `warp` filter fallback leading to incorrect error messages.
This commit is contained in:
Mac L 2022-01-20 09:14:19 +00:00
parent 79db2d4deb
commit d06f87486a
6 changed files with 130 additions and 33 deletions

11
Cargo.lock generated
View File

@ -5266,6 +5266,16 @@ dependencies = [
"serde_derive", "serde_derive",
] ]
[[package]]
name = "serde_array_query"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d89c6e82b1005b33d5b2bbc47096800e5ad6b67ef5636f9c13ad29a6935734a7"
dependencies = [
"serde",
"serde_urlencoded",
]
[[package]] [[package]]
name = "serde_cbor" name = "serde_cbor"
version = "0.11.2" version = "0.11.2"
@ -6823,6 +6833,7 @@ dependencies = [
"lighthouse_metrics", "lighthouse_metrics",
"safe_arith", "safe_arith",
"serde", "serde",
"serde_array_query",
"state_processing", "state_processing",
"tokio", "tokio",
"types", "types",

View File

@ -55,7 +55,10 @@ use warp::http::StatusCode;
use warp::sse::Event; use warp::sse::Event;
use warp::Reply; use warp::Reply;
use warp::{http::Response, Filter}; use warp::{http::Response, Filter};
use warp_utils::task::{blocking_json_task, blocking_task}; use warp_utils::{
query::multi_key_query,
task::{blocking_json_task, blocking_task},
};
const API_PREFIX: &str = "eth"; const API_PREFIX: &str = "eth";
@ -505,12 +508,13 @@ pub fn serve<T: BeaconChainTypes>(
.clone() .clone()
.and(warp::path("validator_balances")) .and(warp::path("validator_balances"))
.and(warp::path::end()) .and(warp::path::end())
.and(warp::query::<api_types::ValidatorBalancesQuery>()) .and(multi_key_query::<api_types::ValidatorBalancesQuery>())
.and_then( .and_then(
|state_id: StateId, |state_id: StateId,
chain: Arc<BeaconChain<T>>, chain: Arc<BeaconChain<T>>,
query: api_types::ValidatorBalancesQuery| { query_res: Result<api_types::ValidatorBalancesQuery, warp::Rejection>| {
blocking_json_task(move || { blocking_json_task(move || {
let query = query_res?;
state_id state_id
.map_state(&chain, |state| { .map_state(&chain, |state| {
Ok(state Ok(state
@ -521,7 +525,7 @@ pub fn serve<T: BeaconChainTypes>(
// filter by validator id(s) if provided // filter by validator id(s) if provided
.filter(|(index, (validator, _))| { .filter(|(index, (validator, _))| {
query.id.as_ref().map_or(true, |ids| { query.id.as_ref().map_or(true, |ids| {
ids.0.iter().any(|id| match id { ids.iter().any(|id| match id {
ValidatorId::PublicKey(pubkey) => { ValidatorId::PublicKey(pubkey) => {
&validator.pubkey == pubkey &validator.pubkey == pubkey
} }
@ -548,11 +552,14 @@ pub fn serve<T: BeaconChainTypes>(
let get_beacon_state_validators = beacon_states_path let get_beacon_state_validators = beacon_states_path
.clone() .clone()
.and(warp::path("validators")) .and(warp::path("validators"))
.and(warp::query::<api_types::ValidatorsQuery>())
.and(warp::path::end()) .and(warp::path::end())
.and(multi_key_query::<api_types::ValidatorsQuery>())
.and_then( .and_then(
|state_id: StateId, chain: Arc<BeaconChain<T>>, query: api_types::ValidatorsQuery| { |state_id: StateId,
chain: Arc<BeaconChain<T>>,
query_res: Result<api_types::ValidatorsQuery, warp::Rejection>| {
blocking_json_task(move || { blocking_json_task(move || {
let query = query_res?;
state_id state_id
.map_state(&chain, |state| { .map_state(&chain, |state| {
let epoch = state.current_epoch(); let epoch = state.current_epoch();
@ -566,7 +573,7 @@ pub fn serve<T: BeaconChainTypes>(
// filter by validator id(s) if provided // filter by validator id(s) if provided
.filter(|(index, (validator, _))| { .filter(|(index, (validator, _))| {
query.id.as_ref().map_or(true, |ids| { query.id.as_ref().map_or(true, |ids| {
ids.0.iter().any(|id| match id { ids.iter().any(|id| match id {
ValidatorId::PublicKey(pubkey) => { ValidatorId::PublicKey(pubkey) => {
&validator.pubkey == pubkey &validator.pubkey == pubkey
} }
@ -586,8 +593,8 @@ pub fn serve<T: BeaconChainTypes>(
let status_matches = let status_matches =
query.status.as_ref().map_or(true, |statuses| { query.status.as_ref().map_or(true, |statuses| {
statuses.0.contains(&status) statuses.contains(&status)
|| statuses.0.contains(&status.superstatus()) || statuses.contains(&status.superstatus())
}); });
if status_matches { if status_matches {
@ -1721,11 +1728,13 @@ pub fn serve<T: BeaconChainTypes>(
.and(warp::path("node")) .and(warp::path("node"))
.and(warp::path("peers")) .and(warp::path("peers"))
.and(warp::path::end()) .and(warp::path::end())
.and(warp::query::<api_types::PeersQuery>()) .and(multi_key_query::<api_types::PeersQuery>())
.and(network_globals.clone()) .and(network_globals.clone())
.and_then( .and_then(
|query: api_types::PeersQuery, network_globals: Arc<NetworkGlobals<T::EthSpec>>| { |query_res: Result<api_types::PeersQuery, warp::Rejection>,
network_globals: Arc<NetworkGlobals<T::EthSpec>>| {
blocking_json_task(move || { blocking_json_task(move || {
let query = query_res?;
let mut peers: Vec<api_types::PeerData> = Vec::new(); let mut peers: Vec<api_types::PeerData> = Vec::new();
network_globals network_globals
.peers .peers
@ -1755,11 +1764,11 @@ pub fn serve<T: BeaconChainTypes>(
); );
let state_matches = query.state.as_ref().map_or(true, |states| { let state_matches = query.state.as_ref().map_or(true, |states| {
states.0.iter().any(|state_param| *state_param == state) states.iter().any(|state_param| *state_param == state)
}); });
let direction_matches = let direction_matches =
query.direction.as_ref().map_or(true, |directions| { query.direction.as_ref().map_or(true, |directions| {
directions.0.iter().any(|dir_param| *dir_param == direction) directions.iter().any(|dir_param| *dir_param == direction)
}); });
if state_matches && direction_matches { if state_matches && direction_matches {
@ -2534,16 +2543,18 @@ pub fn serve<T: BeaconChainTypes>(
let get_events = eth1_v1 let get_events = eth1_v1
.and(warp::path("events")) .and(warp::path("events"))
.and(warp::path::end()) .and(warp::path::end())
.and(warp::query::<api_types::EventQuery>()) .and(multi_key_query::<api_types::EventQuery>())
.and(chain_filter) .and(chain_filter)
.and_then( .and_then(
|topics: api_types::EventQuery, chain: Arc<BeaconChain<T>>| { |topics_res: Result<api_types::EventQuery, warp::Rejection>,
chain: Arc<BeaconChain<T>>| {
blocking_task(move || { blocking_task(move || {
let topics = topics_res?;
// for each topic subscribed spawn a new subscription // for each topic subscribed spawn a new subscription
let mut receivers = Vec::with_capacity(topics.topics.0.len()); let mut receivers = Vec::with_capacity(topics.topics.len());
if let Some(event_handler) = chain.event_handler.as_ref() { if let Some(event_handler) = chain.event_handler.as_ref() {
for topic in topics.topics.0.clone() { for topic in topics.topics {
let receiver = match topic { let receiver = match topic {
api_types::EventTopic::Head => event_handler.subscribe_head(), api_types::EventTopic::Head => event_handler.subscribe_head(),
api_types::EventTopic::Block => event_handler.subscribe_block(), api_types::EventTopic::Block => event_handler.subscribe_block(),
@ -2606,8 +2617,8 @@ pub fn serve<T: BeaconChainTypes>(
.or(get_beacon_state_fork.boxed()) .or(get_beacon_state_fork.boxed())
.or(get_beacon_state_finality_checkpoints.boxed()) .or(get_beacon_state_finality_checkpoints.boxed())
.or(get_beacon_state_validator_balances.boxed()) .or(get_beacon_state_validator_balances.boxed())
.or(get_beacon_state_validators.boxed())
.or(get_beacon_state_validators_id.boxed()) .or(get_beacon_state_validators_id.boxed())
.or(get_beacon_state_validators.boxed())
.or(get_beacon_state_committees.boxed()) .or(get_beacon_state_committees.boxed())
.or(get_beacon_state_sync_committees.boxed()) .or(get_beacon_state_sync_committees.boxed())
.or(get_beacon_headers.boxed()) .or(get_beacon_headers.boxed())

View File

@ -428,10 +428,13 @@ pub struct AttestationPoolQuery {
pub committee_index: Option<u64>, pub committee_index: Option<u64>,
} }
#[derive(Deserialize)] #[derive(Debug, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct ValidatorsQuery { pub struct ValidatorsQuery {
pub id: Option<QueryVec<ValidatorId>>, #[serde(default, deserialize_with = "option_query_vec")]
pub status: Option<QueryVec<ValidatorStatus>>, pub id: Option<Vec<ValidatorId>>,
#[serde(default, deserialize_with = "option_query_vec")]
pub status: Option<Vec<ValidatorStatus>>,
} }
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
@ -520,27 +523,68 @@ pub struct SyncingData {
#[derive(Clone, PartialEq, Debug, Deserialize)] #[derive(Clone, PartialEq, Debug, Deserialize)]
#[serde(try_from = "String", bound = "T: FromStr")] #[serde(try_from = "String", bound = "T: FromStr")]
pub struct QueryVec<T: FromStr>(pub Vec<T>); pub struct QueryVec<T: FromStr> {
values: Vec<T>,
}
fn query_vec<'de, D, T>(deserializer: D) -> Result<Vec<T>, D::Error>
where
D: serde::Deserializer<'de>,
T: FromStr,
{
let vec: Vec<QueryVec<T>> = Deserialize::deserialize(deserializer)?;
Ok(Vec::from(QueryVec::from(vec)))
}
fn option_query_vec<'de, D, T>(deserializer: D) -> Result<Option<Vec<T>>, D::Error>
where
D: serde::Deserializer<'de>,
T: FromStr,
{
let vec: Vec<QueryVec<T>> = Deserialize::deserialize(deserializer)?;
if vec.is_empty() {
return Ok(None);
}
Ok(Some(Vec::from(QueryVec::from(vec))))
}
impl<T: FromStr> From<Vec<QueryVec<T>>> for QueryVec<T> {
fn from(vecs: Vec<QueryVec<T>>) -> Self {
Self {
values: vecs.into_iter().flat_map(|qv| qv.values).collect(),
}
}
}
impl<T: FromStr> TryFrom<String> for QueryVec<T> { impl<T: FromStr> TryFrom<String> for QueryVec<T> {
type Error = String; type Error = String;
fn try_from(string: String) -> Result<Self, Self::Error> { fn try_from(string: String) -> Result<Self, Self::Error> {
if string.is_empty() { if string.is_empty() {
return Ok(Self(vec![])); return Ok(Self { values: vec![] });
} }
string Ok(Self {
values: string
.split(',') .split(',')
.map(|s| s.parse().map_err(|_| "unable to parse".to_string())) .map(|s| s.parse().map_err(|_| "unable to parse query".to_string()))
.collect::<Result<Vec<T>, String>>() .collect::<Result<Vec<T>, String>>()?,
.map(Self) })
}
}
impl<T: FromStr> From<QueryVec<T>> for Vec<T> {
fn from(vec: QueryVec<T>) -> Vec<T> {
vec.values
} }
} }
#[derive(Clone, Deserialize)] #[derive(Clone, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct ValidatorBalancesQuery { pub struct ValidatorBalancesQuery {
pub id: Option<QueryVec<ValidatorId>>, #[serde(default, deserialize_with = "option_query_vec")]
pub id: Option<Vec<ValidatorId>>,
} }
#[derive(Clone, Serialize, Deserialize)] #[derive(Clone, Serialize, Deserialize)]
@ -602,9 +646,12 @@ pub struct BeaconCommitteeSubscription {
} }
#[derive(Deserialize)] #[derive(Deserialize)]
#[serde(deny_unknown_fields)]
pub struct PeersQuery { pub struct PeersQuery {
pub state: Option<QueryVec<PeerState>>, #[serde(default, deserialize_with = "option_query_vec")]
pub direction: Option<QueryVec<PeerDirection>>, pub state: Option<Vec<PeerState>>,
#[serde(default, deserialize_with = "option_query_vec")]
pub direction: Option<Vec<PeerDirection>>,
} }
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
@ -858,8 +905,10 @@ impl<T: EthSpec> EventKind<T> {
} }
#[derive(Clone, Deserialize)] #[derive(Clone, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct EventQuery { pub struct EventQuery {
pub topics: QueryVec<EventTopic>, #[serde(deserialize_with = "query_vec")]
pub topics: Vec<EventTopic>,
} }
#[derive(Debug, Clone, Copy, PartialEq, Deserialize)] #[derive(Debug, Clone, Copy, PartialEq, Deserialize)]
@ -961,7 +1010,9 @@ mod tests {
fn query_vec() { fn query_vec() {
assert_eq!( assert_eq!(
QueryVec::try_from("0,1,2".to_string()).unwrap(), QueryVec::try_from("0,1,2".to_string()).unwrap(),
QueryVec(vec![0_u64, 1, 2]) QueryVec {
values: vec![0_u64, 1, 2]
}
); );
} }
} }

View File

@ -18,3 +18,4 @@ tokio = { version = "1.14.0", features = ["sync"] }
headers = "0.3.2" headers = "0.3.2"
lighthouse_metrics = { path = "../lighthouse_metrics" } lighthouse_metrics = { path = "../lighthouse_metrics" }
lazy_static = "1.4.0" lazy_static = "1.4.0"
serde_array_query = "0.1.0"

View File

@ -3,5 +3,6 @@
pub mod cors; pub mod cors;
pub mod metrics; pub mod metrics;
pub mod query;
pub mod reject; pub mod reject;
pub mod task; pub mod task;

View File

@ -0,0 +1,22 @@
use crate::reject::custom_bad_request;
use serde::Deserialize;
use warp::Filter;
// Custom query filter using `serde_array_query`.
// This allows duplicate keys inside query strings.
pub fn multi_key_query<'de, T: Deserialize<'de>>(
) -> impl warp::Filter<Extract = (Result<T, warp::Rejection>,), Error = std::convert::Infallible> + Copy
{
raw_query().then(|query_str: String| async move {
serde_array_query::from_str(&query_str).map_err(|e| custom_bad_request(e.to_string()))
})
}
// This ensures that empty query strings are still accepted.
// This is because warp::filters::query::raw() does not allow empty query strings
// but warp::query::<T>() does.
fn raw_query() -> impl Filter<Extract = (String,), Error = std::convert::Infallible> + Copy {
warp::filters::query::raw()
.or(warp::any().map(String::default))
.unify()
}