diff --git a/Cargo.lock b/Cargo.lock index 1b0657f24..ab3c5e5c8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6551,6 +6551,7 @@ dependencies = [ "futures-core", "pin-project-lite 0.2.4", "tokio 1.1.0", + "tokio-util 0.6.3", ] [[package]] diff --git a/beacon_node/http_api/Cargo.toml b/beacon_node/http_api/Cargo.toml index bf2449965..4d5d88d40 100644 --- a/beacon_node/http_api/Cargo.toml +++ b/beacon_node/http_api/Cargo.toml @@ -8,7 +8,7 @@ edition = "2018" warp = { git = "https://github.com/paulhauner/warp ", branch = "cors-wildcard" } serde = { version = "1.0.116", features = ["derive"] } tokio = { version = "1.1.0", features = ["macros","sync"] } -tokio-stream = "0.1.2" +tokio-stream = { version = "0.1.3", features = ["sync"] } tokio-util = "0.6.3" parking_lot = "0.11.0" types = { path = "../../consensus/types" } diff --git a/beacon_node/http_api/src/broadcast_stream.rs b/beacon_node/http_api/src/broadcast_stream.rs deleted file mode 100644 index 9e35a556a..000000000 --- a/beacon_node/http_api/src/broadcast_stream.rs +++ /dev/null @@ -1,66 +0,0 @@ -// TODO: this should be replaced with the tokio's `BroadcastStream` once it's added to -// tokio-stream (https://github.com/tokio-rs/tokio/pull/3384) - -use std::fmt; -use std::pin::Pin; -use std::task::{Context, Poll}; -use tokio::sync::broadcast::error::RecvError; -use tokio::sync::broadcast::Receiver; -use tokio_stream::Stream; -use tokio_util::sync::ReusableBoxFuture; - -/// A wrapper around [`tokio::sync::broadcast::Receiver`] that implements [`Stream`]. -/// -/// [`tokio::sync::broadcast::Receiver`]: struct@tokio::sync::broadcast::Receiver -/// [`Stream`]: trait@crate::Stream -pub struct BroadcastStream { - inner: ReusableBoxFuture<(Result, Receiver)>, -} - -/// An error returned from the inner stream of a [`BroadcastStream`]. -#[derive(Debug, PartialEq)] -pub enum BroadcastStreamRecvError { - /// The receiver lagged too far behind. Attempting to receive again will - /// return the oldest message still retained by the channel. - /// - /// Includes the number of skipped messages. - Lagged(u64), -} - -async fn make_future(mut rx: Receiver) -> (Result, Receiver) { - let result = rx.recv().await; - (result, rx) -} - -impl BroadcastStream { - /// Create a new `BroadcastStream`. - pub fn new(rx: Receiver) -> Self { - Self { - inner: ReusableBoxFuture::new(make_future(rx)), - } - } -} - -impl Stream for BroadcastStream { - type Item = Result; - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let (result, rx) = match self.inner.poll(cx) { - std::task::Poll::Ready(t) => t, - std::task::Poll::Pending => return std::task::Poll::Pending, - }; - self.inner.set(make_future(rx)); - match result { - Ok(item) => Poll::Ready(Some(Ok(item))), - Err(RecvError::Closed) => Poll::Ready(None), - Err(RecvError::Lagged(n)) => { - Poll::Ready(Some(Err(BroadcastStreamRecvError::Lagged(n)))) - } - } - } -} - -impl fmt::Debug for BroadcastStream { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("BroadcastStream").finish() - } -} diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index f3c890d35..23cf5db44 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -7,7 +7,6 @@ mod beacon_proposer_cache; mod block_id; -mod broadcast_stream; mod metrics; mod state_id; mod validator_inclusion; @@ -36,7 +35,7 @@ use std::future::Future; use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4}; use std::sync::Arc; use tokio::sync::mpsc::UnboundedSender; -use tokio_stream::StreamExt; +use tokio_stream::{wrappers::BroadcastStream, StreamExt}; use types::{ Attestation, AttestationDuty, AttesterSlashing, CloneConfig, CommitteeCache, Epoch, EthSpec, Hash256, ProposerSlashing, PublicKey, PublicKeyBytes, RelativeEpoch, SignedAggregateAndProof, @@ -2405,23 +2404,22 @@ pub fn serve( } }; - receivers.push(broadcast_stream::BroadcastStream::new(receiver).map( - |msg| { - match msg { - Ok(data) => Event::default() - .event(data.topic_name()) - .json_data(data) - .map_err(|e| { - warp_utils::reject::server_sent_event_error( - format!("{:?}", e), - ) - }), - Err(e) => Err(warp_utils::reject::server_sent_event_error( - format!("{:?}", e), - )), - } - }, - )); + receivers.push(BroadcastStream::new(receiver).map(|msg| { + match msg { + Ok(data) => Event::default() + .event(data.topic_name()) + .json_data(data) + .map_err(|e| { + warp_utils::reject::server_sent_event_error(format!( + "{:?}", + e + )) + }), + Err(e) => Err(warp_utils::reject::server_sent_event_error( + format!("{:?}", e), + )), + } + })); } } else { return Err(warp_utils::reject::custom_server_error( diff --git a/slasher/service/Cargo.toml b/slasher/service/Cargo.toml index ddc321196..0c43ed113 100644 --- a/slasher/service/Cargo.toml +++ b/slasher/service/Cargo.toml @@ -15,5 +15,5 @@ slot_clock = { path = "../../common/slot_clock" } state_processing = { path = "../../consensus/state_processing" } task_executor = { path = "../../common/task_executor" } tokio = { version = "1.1.0", features = ["full"] } -tokio-stream = "0.1.2" +tokio-stream = "0.1.3" types = { path = "../../consensus/types" }