Drop gossipsub stale messages when polling ConnectionHandler. (#5175)

* drop gossipsub stale messages

* convert async-channel::Receiver to Peekable,

to be able to Peek next message without dropping it
This commit is contained in:
João Oliveira 2024-02-15 00:41:52 +00:00 committed by GitHub
parent e7ef2a3a54
commit 256d9042d3
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 549 additions and 438 deletions

View File

@ -176,12 +176,6 @@ impl Handler {
}
impl EnabledHandler {
#[cfg(test)]
/// For testing purposed obtain the RPCReceiver
pub fn receiver(&mut self) -> RpcReceiver {
self.send_queue.clone()
}
fn on_fully_negotiated_inbound(
&mut self,
(substream, peer_kind): (Framed<Stream, GossipsubCodec>, PeerKind),
@ -237,7 +231,7 @@ impl EnabledHandler {
}
// determine if we need to create the outbound stream
if !self.send_queue.is_empty()
if !self.send_queue.poll_is_empty(cx)
&& self.outbound_substream.is_none()
&& !self.outbound_substream_establishing
{
@ -247,10 +241,6 @@ impl EnabledHandler {
});
}
// We may need to inform the behviour if we have a dropped a message. This gets set if that
// is the case.
let mut dropped_message = None;
// process outbound stream
loop {
match std::mem::replace(
@ -271,10 +261,11 @@ impl EnabledHandler {
} => {
if Pin::new(timeout).poll(cx).is_ready() {
// Inform the behaviour and end the poll.
dropped_message = Some(HandlerEvent::MessageDropped(message));
self.outbound_substream =
Some(OutboundSubstreamState::WaitingOutput(substream));
break;
return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
HandlerEvent::MessageDropped(message),
));
}
}
_ => {} // All other messages are not time-bound.
@ -348,13 +339,7 @@ impl EnabledHandler {
}
}
// If there was a timeout in sending a message, inform the behaviour before restarting the
// poll
if let Some(handler_event) = dropped_message {
return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(handler_event));
}
// Handle inbound messages
// Handle inbound messages.
loop {
match std::mem::replace(
&mut self.inbound_substream,
@ -419,6 +404,13 @@ impl EnabledHandler {
}
}
// Drop the next message in queue if it's stale.
if let Poll::Ready(Some(rpc)) = self.send_queue.poll_stale(cx) {
return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
HandlerEvent::MessageDropped(rpc),
));
}
Poll::Pending
}
}

View File

@ -22,7 +22,8 @@
use crate::gossipsub::metrics::Metrics;
use crate::gossipsub::TopicHash;
use async_channel::{Receiver, Sender};
use futures::Stream;
use futures::stream::Peekable;
use futures::{Future, Stream, StreamExt};
use futures_timer::Delay;
use instant::Duration;
use libp2p::identity::PeerId;
@ -33,7 +34,7 @@ use std::collections::BTreeSet;
use std::fmt::Debug;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::Poll;
use std::task::{Context, Poll};
use std::{fmt, pin::Pin};
use crate::gossipsub::rpc_proto::proto;
@ -591,9 +592,10 @@ impl fmt::Display for PeerKind {
pub(crate) struct RpcSender {
cap: usize,
len: Arc<AtomicUsize>,
priority: Sender<RpcOut>,
non_priority: Sender<RpcOut>,
receiver: RpcReceiver,
pub(crate) priority_sender: Sender<RpcOut>,
pub(crate) non_priority_sender: Sender<RpcOut>,
priority_receiver: Receiver<RpcOut>,
non_priority_receiver: Receiver<RpcOut>,
}
impl RpcSender {
@ -602,29 +604,29 @@ impl RpcSender {
let (priority_sender, priority_receiver) = async_channel::unbounded();
let (non_priority_sender, non_priority_receiver) = async_channel::bounded(cap / 2);
let len = Arc::new(AtomicUsize::new(0));
let receiver = RpcReceiver {
priority_len: len.clone(),
priority: priority_receiver,
non_priority: non_priority_receiver,
};
RpcSender {
cap: cap / 2,
len,
priority: priority_sender,
non_priority: non_priority_sender,
receiver: receiver.clone(),
priority_sender,
non_priority_sender,
priority_receiver,
non_priority_receiver,
}
}
/// Create a new Receiver to the sender.
pub(crate) fn new_receiver(&self) -> RpcReceiver {
self.receiver.clone()
RpcReceiver {
priority_len: self.len.clone(),
priority: self.priority_receiver.clone().peekable(),
non_priority: self.non_priority_receiver.clone().peekable(),
}
}
/// Send a `RpcOut::Graft` message to the `RpcReceiver`
/// this is high priority.
pub(crate) fn graft(&mut self, graft: Graft) {
self.priority
self.priority_sender
.try_send(RpcOut::Graft(graft))
.expect("Channel is unbounded and should always be open");
}
@ -632,7 +634,7 @@ impl RpcSender {
/// Send a `RpcOut::Prune` message to the `RpcReceiver`
/// this is high priority.
pub(crate) fn prune(&mut self, prune: Prune) {
self.priority
self.priority_sender
.try_send(RpcOut::Prune(prune))
.expect("Channel is unbounded and should always be open");
}
@ -641,7 +643,7 @@ impl RpcSender {
/// this is low priority, if the queue is full an Err is returned.
#[allow(clippy::result_large_err)]
pub(crate) fn ihave(&mut self, ihave: IHave) -> Result<(), RpcOut> {
self.non_priority
self.non_priority_sender
.try_send(RpcOut::IHave(ihave))
.map_err(|err| err.into_inner())
}
@ -650,7 +652,7 @@ impl RpcSender {
/// this is low priority, if the queue is full an Err is returned.
#[allow(clippy::result_large_err)]
pub(crate) fn iwant(&mut self, iwant: IWant) -> Result<(), RpcOut> {
self.non_priority
self.non_priority_sender
.try_send(RpcOut::IWant(iwant))
.map_err(|err| err.into_inner())
}
@ -658,7 +660,7 @@ impl RpcSender {
/// Send a `RpcOut::Subscribe` message to the `RpcReceiver`
/// this is high priority.
pub(crate) fn subscribe(&mut self, topic: TopicHash) {
self.priority
self.priority_sender
.try_send(RpcOut::Subscribe(topic))
.expect("Channel is unbounded and should always be open");
}
@ -666,7 +668,7 @@ impl RpcSender {
/// Send a `RpcOut::Unsubscribe` message to the `RpcReceiver`
/// this is high priority.
pub(crate) fn unsubscribe(&mut self, topic: TopicHash) {
self.priority
self.priority_sender
.try_send(RpcOut::Unsubscribe(topic))
.expect("Channel is unbounded and should always be open");
}
@ -682,7 +684,7 @@ impl RpcSender {
if self.len.load(Ordering::Relaxed) >= self.cap {
return Err(());
}
self.priority
self.priority_sender
.try_send(RpcOut::Publish {
message: message.clone(),
timeout: Delay::new(timeout),
@ -705,7 +707,7 @@ impl RpcSender {
timeout: Duration,
metrics: Option<&mut Metrics>,
) -> Result<(), ()> {
self.non_priority
self.non_priority_sender
.try_send(RpcOut::Forward {
message: message.clone(),
timeout: Delay::new(timeout),
@ -726,25 +728,73 @@ impl RpcSender {
/// Returns the current size of the non-priority queue.
pub(crate) fn non_priority_len(&self) -> usize {
self.non_priority.len()
self.non_priority_sender.len()
}
}
/// `RpcOut` sender that is priority aware.
#[derive(Debug, Clone)]
#[derive(Debug)]
pub struct RpcReceiver {
/// The maximum length of the priority queue.
priority_len: Arc<AtomicUsize>,
pub(crate) priority_len: Arc<AtomicUsize>,
/// The priority queue receiver.
pub(crate) priority: Receiver<RpcOut>,
pub(crate) priority: Peekable<Receiver<RpcOut>>,
/// The non priority queue receiver.
pub(crate) non_priority: Receiver<RpcOut>,
pub(crate) non_priority: Peekable<Receiver<RpcOut>>,
}
impl RpcReceiver {
/// Check if both queues are empty.
pub(crate) fn is_empty(&self) -> bool {
self.priority.is_empty() && self.non_priority.is_empty()
// Peek the next message in the queues and return it if its timeout has elapsed.
// Returns `None` if there aren't any more messages on the stream or none is stale.
pub(crate) fn poll_stale(&mut self, cx: &mut Context<'_>) -> Poll<Option<RpcOut>> {
// Peek priority queue.
let priority = match Pin::new(&mut self.priority).poll_peek_mut(cx) {
Poll::Ready(Some(RpcOut::Publish {
message: _,
ref mut timeout,
})) => {
if Pin::new(timeout).poll(cx).is_ready() {
// Return the message.
let dropped = futures::ready!(self.priority.poll_next_unpin(cx))
.expect("There should be a message");
return Poll::Ready(Some(dropped));
}
Poll::Ready(None)
}
poll => poll,
};
let non_priority = match Pin::new(&mut self.non_priority).poll_peek_mut(cx) {
Poll::Ready(Some(RpcOut::Forward {
message: _,
ref mut timeout,
})) => {
if Pin::new(timeout).poll(cx).is_ready() {
// Return the message.
let dropped = futures::ready!(self.non_priority.poll_next_unpin(cx))
.expect("There should be a message");
return Poll::Ready(Some(dropped));
}
Poll::Ready(None)
}
poll => poll,
};
match (priority, non_priority) {
(Poll::Ready(None), Poll::Ready(None)) => Poll::Ready(None),
_ => Poll::Pending,
}
}
/// Poll queues and return true if both are empty.
pub(crate) fn poll_is_empty(&mut self, cx: &mut Context<'_>) -> bool {
matches!(
(
Pin::new(&mut self.priority).poll_peek(cx),
Pin::new(&mut self.non_priority).poll_peek(cx),
),
(Poll::Ready(None), Poll::Ready(None))
)
}
}