8e95b69a1a
## Issue Addressed Closes #4473 (take 3) ## Proposed Changes - Send a 202 status code by default for duplicate blocks, instead of 400. This conveys to the caller that the block was published, but makes no guarantees about its validity. Block relays can count this as a success or a failure as they wish. - For users wanting finer-grained control over which status is returned for duplicates, a flag `--http-duplicate-block-status` can be used to adjust the behaviour. A 400 status can be supplied to restore the old (spec-compliant) behaviour, or a 200 status can be used to silence VCs that warn loudly for non-200 codes (e.g. Lighthouse prior to v4.4.0). - Update the Lighthouse VC to gracefully handle success codes other than 200. The info message isn't the nicest thing to read, but it covers all bases and isn't a nasty `ERRO`/`CRIT` that will wake anyone up. ## Additional Info I'm planning to raise a PR to `beacon-APIs` to specify that clients may return 202 for duplicate blocks. Really it would be nice to use some 2xx code that _isn't_ the same as the code for "published but invalid". I think unfortunately there aren't any suitable codes, and maybe the best fit is `409 CONFLICT`. Given that we need to fix this promptly for our release, I think using the 202 code temporarily with configuration strikes a nice compromise.
193 lines
7.5 KiB
Rust
193 lines
7.5 KiB
Rust
use beacon_processor::{BeaconProcessorSend, BlockingOrAsync, Work, WorkEvent};
|
|
use serde::Serialize;
|
|
use std::future::Future;
|
|
use tokio::sync::{mpsc::error::TrySendError, oneshot};
|
|
use types::EthSpec;
|
|
use warp::reply::{Reply, Response};
|
|
|
|
/// Maps a request to a queue in the `BeaconProcessor`.
|
|
#[derive(Clone, Copy)]
|
|
pub enum Priority {
|
|
/// The highest priority.
|
|
P0,
|
|
/// The lowest priority.
|
|
P1,
|
|
}
|
|
|
|
impl Priority {
|
|
/// Wrap `self` in a `WorkEvent` with an appropriate priority.
|
|
fn work_event<E: EthSpec>(&self, process_fn: BlockingOrAsync) -> WorkEvent<E> {
|
|
let work = match self {
|
|
Priority::P0 => Work::ApiRequestP0(process_fn),
|
|
Priority::P1 => Work::ApiRequestP1(process_fn),
|
|
};
|
|
WorkEvent {
|
|
drop_during_sync: false,
|
|
work,
|
|
}
|
|
}
|
|
}
|
|
|
|
/// Spawns tasks on the `BeaconProcessor` or directly on the tokio executor.
|
|
pub struct TaskSpawner<E: EthSpec> {
|
|
/// Used to send tasks to the `BeaconProcessor`. The tokio executor will be
|
|
/// used if this is `None`.
|
|
beacon_processor_send: Option<BeaconProcessorSend<E>>,
|
|
}
|
|
|
|
/// Convert a warp `Rejection` into a `Response`.
|
|
///
|
|
/// This function should *always* be used to convert rejections into responses. This prevents warp
|
|
/// from trying to backtrack in strange ways. See: https://github.com/sigp/lighthouse/issues/3404
|
|
pub async fn convert_rejection<T: Reply>(res: Result<T, warp::Rejection>) -> Response {
|
|
match res {
|
|
Ok(response) => response.into_response(),
|
|
Err(e) => match warp_utils::reject::handle_rejection(e).await {
|
|
Ok(reply) => reply.into_response(),
|
|
Err(_) => warp::reply::with_status(
|
|
warp::reply::json(&"unhandled error"),
|
|
eth2::StatusCode::INTERNAL_SERVER_ERROR,
|
|
)
|
|
.into_response(),
|
|
},
|
|
}
|
|
}
|
|
|
|
impl<E: EthSpec> TaskSpawner<E> {
|
|
pub fn new(beacon_processor_send: Option<BeaconProcessorSend<E>>) -> Self {
|
|
Self {
|
|
beacon_processor_send,
|
|
}
|
|
}
|
|
|
|
/// Executes a "blocking" (non-async) task which returns a `Response`.
|
|
pub async fn blocking_response_task<F, T>(self, priority: Priority, func: F) -> Response
|
|
where
|
|
F: FnOnce() -> Result<T, warp::Rejection> + Send + Sync + 'static,
|
|
T: Reply + Send + 'static,
|
|
{
|
|
if let Some(beacon_processor_send) = &self.beacon_processor_send {
|
|
// Create a closure that will execute `func` and send the result to
|
|
// a channel held by this thread.
|
|
let (tx, rx) = oneshot::channel();
|
|
let process_fn = move || {
|
|
// Execute the function, collect the return value.
|
|
let func_result = func();
|
|
// Send the result down the channel. Ignore any failures; the
|
|
// send can only fail if the receiver is dropped.
|
|
let _ = tx.send(func_result);
|
|
};
|
|
|
|
// Send the function to the beacon processor for execution at some arbitrary time.
|
|
let result = send_to_beacon_processor(
|
|
beacon_processor_send,
|
|
priority,
|
|
BlockingOrAsync::Blocking(Box::new(process_fn)),
|
|
rx,
|
|
)
|
|
.await
|
|
.and_then(|x| x);
|
|
convert_rejection(result).await
|
|
} else {
|
|
// There is no beacon processor so spawn a task directly on the
|
|
// tokio executor.
|
|
convert_rejection(warp_utils::task::blocking_response_task(func).await).await
|
|
}
|
|
}
|
|
|
|
/// Executes a "blocking" (non-async) task which returns a JSON-serializable
|
|
/// object.
|
|
pub async fn blocking_json_task<F, T>(self, priority: Priority, func: F) -> Response
|
|
where
|
|
F: FnOnce() -> Result<T, warp::Rejection> + Send + Sync + 'static,
|
|
T: Serialize + Send + 'static,
|
|
{
|
|
let func = || func().map(|t| warp::reply::json(&t).into_response());
|
|
self.blocking_response_task(priority, func).await
|
|
}
|
|
|
|
/// Executes an async task which may return a `Rejection`, which will be converted to a response.
|
|
pub async fn spawn_async_with_rejection(
|
|
self,
|
|
priority: Priority,
|
|
func: impl Future<Output = Result<Response, warp::Rejection>> + Send + Sync + 'static,
|
|
) -> Response {
|
|
let result = self
|
|
.spawn_async_with_rejection_no_conversion(priority, func)
|
|
.await;
|
|
convert_rejection(result).await
|
|
}
|
|
|
|
/// Same as `spawn_async_with_rejection` but returning a result with the unhandled rejection.
|
|
///
|
|
/// If you call this function you MUST convert the rejection to a response and not let it
|
|
/// propagate into Warp's filters. See `convert_rejection`.
|
|
pub async fn spawn_async_with_rejection_no_conversion(
|
|
self,
|
|
priority: Priority,
|
|
func: impl Future<Output = Result<Response, warp::Rejection>> + Send + Sync + 'static,
|
|
) -> Result<Response, warp::Rejection> {
|
|
if let Some(beacon_processor_send) = &self.beacon_processor_send {
|
|
// Create a wrapper future that will execute `func` and send the
|
|
// result to a channel held by this thread.
|
|
let (tx, rx) = oneshot::channel();
|
|
let process_fn = async move {
|
|
// Await the future, collect the return value.
|
|
let func_result = func.await;
|
|
// Send the result down the channel. Ignore any failures; the
|
|
// send can only fail if the receiver is dropped.
|
|
let _ = tx.send(func_result);
|
|
};
|
|
|
|
// Send the function to the beacon processor for execution at some arbitrary time.
|
|
send_to_beacon_processor(
|
|
beacon_processor_send,
|
|
priority,
|
|
BlockingOrAsync::Async(Box::pin(process_fn)),
|
|
rx,
|
|
)
|
|
.await
|
|
.and_then(|x| x)
|
|
} else {
|
|
// There is no beacon processor so spawn a task directly on the
|
|
// tokio executor.
|
|
tokio::task::spawn(func)
|
|
.await
|
|
.map_err(|_| {
|
|
warp_utils::reject::custom_server_error("Tokio failed to spawn task".into())
|
|
})
|
|
.and_then(|x| x)
|
|
}
|
|
}
|
|
}
|
|
|
|
/// Send a task to the beacon processor and await execution.
|
|
///
|
|
/// If the task is not executed, return an `Err` with an error message
|
|
/// for the API consumer.
|
|
async fn send_to_beacon_processor<E: EthSpec, T>(
|
|
beacon_processor_send: &BeaconProcessorSend<E>,
|
|
priority: Priority,
|
|
process_fn: BlockingOrAsync,
|
|
rx: oneshot::Receiver<T>,
|
|
) -> Result<T, warp::Rejection> {
|
|
let error_message = match beacon_processor_send.try_send(priority.work_event(process_fn)) {
|
|
Ok(()) => {
|
|
match rx.await {
|
|
// The beacon processor executed the task and sent a result.
|
|
Ok(func_result) => return Ok(func_result),
|
|
// The beacon processor dropped the channel without sending a
|
|
// result. The beacon processor dropped this task because its
|
|
// queues are full or it's shutting down.
|
|
Err(_) => "The task did not execute. The server is overloaded or shutting down.",
|
|
}
|
|
}
|
|
Err(TrySendError::Full(_)) => "The task was dropped. The server is overloaded.",
|
|
Err(TrySendError::Closed(_)) => "The task was dropped. The server is shutting down.",
|
|
};
|
|
|
|
Err(warp_utils::reject::custom_server_error(
|
|
error_message.to_string(),
|
|
))
|
|
}
|