From dadbd69eec195c42265bedd7d661077784b541c3 Mon Sep 17 00:00:00 2001 From: Paul Hauner Date: Wed, 21 Sep 2022 10:52:14 +0000 Subject: [PATCH] Fix concurrency issue with `oneshot_broadcast` (#3596) ## Issue Addressed NA ## Proposed Changes Fixes an issue found during testing with #3595. ## Additional Info NA --- common/oneshot_broadcast/src/lib.rs | 36 +++++++++++++++-------------- 1 file changed, 19 insertions(+), 17 deletions(-) diff --git a/common/oneshot_broadcast/src/lib.rs b/common/oneshot_broadcast/src/lib.rs index 237fc03cc..2c616b3bb 100644 --- a/common/oneshot_broadcast/src/lib.rs +++ b/common/oneshot_broadcast/src/lib.rs @@ -3,7 +3,7 @@ //! //! This implementation may not be blazingly fast but it should be simple enough to be reliable. use parking_lot::{Condvar, Mutex}; -use std::sync::{Arc, Weak}; +use std::sync::Arc; #[derive(Copy, Clone, Debug, PartialEq)] pub enum Error { @@ -13,9 +13,10 @@ pub enum Error { enum Future { /// The future is ready and the item may be consumed. Ready(T), - /// Future is not ready. The contained `Weak` is a reference to the `Sender` that may be used to - /// detect when the channel is disconnected. - NotReady(Weak<()>), + /// Future is not ready. + NotReady, + /// The sender has been dropped without sending a message. + SenderDropped, } struct MutexCondvar { @@ -24,7 +25,7 @@ struct MutexCondvar { } /// The sending pair of the `oneshot` channel. -pub struct Sender(Arc>, Option>); +pub struct Sender(Arc>); impl Sender { /// Send a message, consuming `self` and delivering the message to *all* receivers. @@ -35,11 +36,15 @@ impl Sender { } impl Drop for Sender { - /// Drop the `Arc` and notify all receivers so they can't upgrade their `Weak`s and know that - /// the sender has been dropped. + /// Flag the sender as dropped and notify all receivers. fn drop(&mut self) { - self.1 = None; + let mut lock = self.0.mutex.lock(); + if !matches!(*lock, Future::Ready(_)) { + *lock = Future::SenderDropped + } self.0.condvar.notify_all(); + // The lock must be held whilst the condvar is notified. + drop(lock); } } @@ -59,8 +64,8 @@ impl Receiver { pub fn try_recv(&self) -> Result, Error> { match &*self.0.mutex.lock() { Future::Ready(item) => Ok(Some(item.clone())), - Future::NotReady(weak) if weak.upgrade().is_some() => Ok(None), - Future::NotReady(_) => Err(Error::SenderDropped), + Future::NotReady => Ok(None), + Future::SenderDropped => Err(Error::SenderDropped), } } @@ -71,10 +76,8 @@ impl Receiver { loop { match &*lock { Future::Ready(item) => return Ok(item.clone()), - Future::NotReady(weak) if weak.upgrade().is_some() => { - self.0.condvar.wait(&mut lock) - } - Future::NotReady(_) => return Err(Error::SenderDropped), + Future::NotReady => self.0.condvar.wait(&mut lock), + Future::SenderDropped => return Err(Error::SenderDropped), } } } @@ -84,13 +87,12 @@ impl Receiver { /// /// The sender may send *only one* message which will be received by *all* receivers. pub fn oneshot() -> (Sender, Receiver) { - let sender_ref = Arc::new(()); let mutex_condvar = Arc::new(MutexCondvar { - mutex: Mutex::new(Future::NotReady(Arc::downgrade(&sender_ref))), + mutex: Mutex::new(Future::NotReady), condvar: Condvar::new(), }); let receiver = Receiver(mutex_condvar.clone()); - let sender = Sender(mutex_condvar, Some(sender_ref)); + let sender = Sender(mutex_condvar); (sender, receiver) }