update tokio-stream to 0.1.3 and use BroadcastStream (#2212)

## Issue Addressed

Resolves #2189 

## Proposed Changes

use tokio's `BroadcastStream`

## Additional Info

N/A


Co-authored-by: realbigsean <seananderson33@gmail.com>
This commit is contained in:
realbigsean 2021-03-01 01:58:05 +00:00
parent baef1db40f
commit ed9b245de0
5 changed files with 20 additions and 87 deletions

1
Cargo.lock generated
View File

@ -6551,6 +6551,7 @@ dependencies = [
"futures-core", "futures-core",
"pin-project-lite 0.2.4", "pin-project-lite 0.2.4",
"tokio 1.1.0", "tokio 1.1.0",
"tokio-util 0.6.3",
] ]
[[package]] [[package]]

View File

@ -8,7 +8,7 @@ edition = "2018"
warp = { git = "https://github.com/paulhauner/warp ", branch = "cors-wildcard" } warp = { git = "https://github.com/paulhauner/warp ", branch = "cors-wildcard" }
serde = { version = "1.0.116", features = ["derive"] } serde = { version = "1.0.116", features = ["derive"] }
tokio = { version = "1.1.0", features = ["macros","sync"] } tokio = { version = "1.1.0", features = ["macros","sync"] }
tokio-stream = "0.1.2" tokio-stream = { version = "0.1.3", features = ["sync"] }
tokio-util = "0.6.3" tokio-util = "0.6.3"
parking_lot = "0.11.0" parking_lot = "0.11.0"
types = { path = "../../consensus/types" } types = { path = "../../consensus/types" }

View File

@ -1,66 +0,0 @@
// TODO: this should be replaced with the tokio's `BroadcastStream` once it's added to
// tokio-stream (https://github.com/tokio-rs/tokio/pull/3384)
use std::fmt;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::sync::broadcast::error::RecvError;
use tokio::sync::broadcast::Receiver;
use tokio_stream::Stream;
use tokio_util::sync::ReusableBoxFuture;
/// A wrapper around [`tokio::sync::broadcast::Receiver`] that implements [`Stream`].
///
/// [`tokio::sync::broadcast::Receiver`]: struct@tokio::sync::broadcast::Receiver
/// [`Stream`]: trait@crate::Stream
pub struct BroadcastStream<T> {
inner: ReusableBoxFuture<(Result<T, RecvError>, Receiver<T>)>,
}
/// An error returned from the inner stream of a [`BroadcastStream`].
#[derive(Debug, PartialEq)]
pub enum BroadcastStreamRecvError {
/// The receiver lagged too far behind. Attempting to receive again will
/// return the oldest message still retained by the channel.
///
/// Includes the number of skipped messages.
Lagged(u64),
}
async fn make_future<T: Clone>(mut rx: Receiver<T>) -> (Result<T, RecvError>, Receiver<T>) {
let result = rx.recv().await;
(result, rx)
}
impl<T: 'static + Clone + Send> BroadcastStream<T> {
/// Create a new `BroadcastStream`.
pub fn new(rx: Receiver<T>) -> Self {
Self {
inner: ReusableBoxFuture::new(make_future(rx)),
}
}
}
impl<T: 'static + Clone + Send> Stream for BroadcastStream<T> {
type Item = Result<T, BroadcastStreamRecvError>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let (result, rx) = match self.inner.poll(cx) {
std::task::Poll::Ready(t) => t,
std::task::Poll::Pending => return std::task::Poll::Pending,
};
self.inner.set(make_future(rx));
match result {
Ok(item) => Poll::Ready(Some(Ok(item))),
Err(RecvError::Closed) => Poll::Ready(None),
Err(RecvError::Lagged(n)) => {
Poll::Ready(Some(Err(BroadcastStreamRecvError::Lagged(n))))
}
}
}
}
impl<T> fmt::Debug for BroadcastStream<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("BroadcastStream").finish()
}
}

View File

@ -7,7 +7,6 @@
mod beacon_proposer_cache; mod beacon_proposer_cache;
mod block_id; mod block_id;
mod broadcast_stream;
mod metrics; mod metrics;
mod state_id; mod state_id;
mod validator_inclusion; mod validator_inclusion;
@ -36,7 +35,7 @@ use std::future::Future;
use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4}; use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};
use std::sync::Arc; use std::sync::Arc;
use tokio::sync::mpsc::UnboundedSender; use tokio::sync::mpsc::UnboundedSender;
use tokio_stream::StreamExt; use tokio_stream::{wrappers::BroadcastStream, StreamExt};
use types::{ use types::{
Attestation, AttestationDuty, AttesterSlashing, CloneConfig, CommitteeCache, Epoch, EthSpec, Attestation, AttestationDuty, AttesterSlashing, CloneConfig, CommitteeCache, Epoch, EthSpec,
Hash256, ProposerSlashing, PublicKey, PublicKeyBytes, RelativeEpoch, SignedAggregateAndProof, Hash256, ProposerSlashing, PublicKey, PublicKeyBytes, RelativeEpoch, SignedAggregateAndProof,
@ -2405,23 +2404,22 @@ pub fn serve<T: BeaconChainTypes>(
} }
}; };
receivers.push(broadcast_stream::BroadcastStream::new(receiver).map( receivers.push(BroadcastStream::new(receiver).map(|msg| {
|msg| { match msg {
match msg { Ok(data) => Event::default()
Ok(data) => Event::default() .event(data.topic_name())
.event(data.topic_name()) .json_data(data)
.json_data(data) .map_err(|e| {
.map_err(|e| { warp_utils::reject::server_sent_event_error(format!(
warp_utils::reject::server_sent_event_error( "{:?}",
format!("{:?}", e), e
) ))
}), }),
Err(e) => Err(warp_utils::reject::server_sent_event_error( Err(e) => Err(warp_utils::reject::server_sent_event_error(
format!("{:?}", e), format!("{:?}", e),
)), )),
} }
}, }));
));
} }
} else { } else {
return Err(warp_utils::reject::custom_server_error( return Err(warp_utils::reject::custom_server_error(

View File

@ -15,5 +15,5 @@ slot_clock = { path = "../../common/slot_clock" }
state_processing = { path = "../../consensus/state_processing" } state_processing = { path = "../../consensus/state_processing" }
task_executor = { path = "../../common/task_executor" } task_executor = { path = "../../common/task_executor" }
tokio = { version = "1.1.0", features = ["full"] } tokio = { version = "1.1.0", features = ["full"] }
tokio-stream = "0.1.2" tokio-stream = "0.1.3"
types = { path = "../../consensus/types" } types = { path = "../../consensus/types" }