diff --git a/Cargo.lock b/Cargo.lock index 4ca2739d1..9cd7ff2ff 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -387,7 +387,6 @@ version = "0.2.0" dependencies = [ "bitvec 0.20.4", "bls", - "crossbeam-channel", "derivative", "environment", "eth1", @@ -410,6 +409,7 @@ dependencies = [ "lru", "maplit", "merkle_proof", + "oneshot_broadcast", "operation_pool", "parking_lot 0.12.1", "proto_array", @@ -4343,6 +4343,13 @@ version = "1.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "18a6dbe30758c9f83eb00cbea4ac95966305f5a7772f3f42ebfc7fc7eddbd8e1" +[[package]] +name = "oneshot_broadcast" +version = "0.1.0" +dependencies = [ + "parking_lot 0.12.1", +] + [[package]] name = "oorandom" version = "11.1.3" diff --git a/Cargo.toml b/Cargo.toml index a71a97a95..415c721d9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -34,6 +34,7 @@ members = [ "common/logging", "common/lru_cache", "common/malloc_utils", + "common/oneshot_broadcast", "common/sensitive_url", "common/slot_clock", "common/task_executor", diff --git a/beacon_node/beacon_chain/Cargo.toml b/beacon_node/beacon_chain/Cargo.toml index 43cbdf134..dd185ac75 100644 --- a/beacon_node/beacon_chain/Cargo.toml +++ b/beacon_node/beacon_chain/Cargo.toml @@ -63,7 +63,7 @@ superstruct = "0.5.0" hex = "0.4.2" exit-future = "0.2.0" unused_port = {path = "../../common/unused_port"} -crossbeam-channel = "0.5.6" +oneshot_broadcast = { path = "../../common/oneshot_broadcast" } [[test]] name = "beacon_chain_tests" diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index a0377f5fc..96439f490 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -4609,13 +4609,7 @@ impl BeaconChain { metrics::stop_timer(committee_building_timer); - if let Err(e) = sender.send(committee_cache.clone()) { - debug!( - self.log, - "Did not fulfil committee promise"; - "error" => %e - ) - } + sender.send(committee_cache.clone()); map_fn(&committee_cache, shuffling_decision_block) } diff --git a/beacon_node/beacon_chain/src/errors.rs b/beacon_node/beacon_chain/src/errors.rs index 8b547acf0..704cba489 100644 --- a/beacon_node/beacon_chain/src/errors.rs +++ b/beacon_node/beacon_chain/src/errors.rs @@ -202,7 +202,7 @@ pub enum BeaconChainError { }, AttestationHeadNotInForkChoice(Hash256), MissingPersistedForkChoice, - CommitteeCacheWait(crossbeam_channel::RecvError), + CommitteePromiseFailed(oneshot_broadcast::Error), MaxCommitteePromises(usize), } diff --git a/beacon_node/beacon_chain/src/shuffling_cache.rs b/beacon_node/beacon_chain/src/shuffling_cache.rs index 3fc5bebdf..a01847a0e 100644 --- a/beacon_node/beacon_chain/src/shuffling_cache.rs +++ b/beacon_node/beacon_chain/src/shuffling_cache.rs @@ -1,6 +1,6 @@ use crate::{metrics, BeaconChainError}; -use crossbeam_channel::{bounded, Receiver, Sender, TryRecvError}; use lru::LruCache; +use oneshot_broadcast::{oneshot, Receiver, Sender}; use std::sync::Arc; use types::{beacon_state::CommitteeCache, AttestationShufflingId, Epoch, Hash256}; @@ -40,7 +40,7 @@ impl CacheItem { CacheItem::Committee(cache) => Ok(cache), CacheItem::Promise(receiver) => receiver .recv() - .map_err(BeaconChainError::CommitteeCacheWait), + .map_err(BeaconChainError::CommitteePromiseFailed), } } } @@ -72,7 +72,7 @@ impl ShufflingCache { item @ Some(CacheItem::Promise(receiver)) => match receiver.try_recv() { // The promise has already been resolved. Replace the entry in the cache with a // `Committee` entry and then return the committee. - Ok(committee) => { + Ok(Some(committee)) => { metrics::inc_counter(&metrics::SHUFFLING_CACHE_PROMISE_HITS); metrics::inc_counter(&metrics::SHUFFLING_CACHE_HITS); let ready = CacheItem::Committee(committee); @@ -81,7 +81,7 @@ impl ShufflingCache { } // The promise has not yet been resolved. Return the promise so the caller can await // it. - Err(TryRecvError::Empty) => { + Ok(None) => { metrics::inc_counter(&metrics::SHUFFLING_CACHE_PROMISE_HITS); metrics::inc_counter(&metrics::SHUFFLING_CACHE_HITS); item.cloned() @@ -96,7 +96,7 @@ impl ShufflingCache { // memory and the nature of the LRU cache means that future, relevant entries will // still be added to the cache. We expect that *all* promises should be resolved, // unless there is a programming or database error. - Err(TryRecvError::Disconnected) => { + Err(oneshot_broadcast::Error::SenderDropped) => { metrics::inc_counter(&metrics::SHUFFLING_CACHE_PROMISE_FAILS); metrics::inc_counter(&metrics::SHUFFLING_CACHE_MISSES); self.cache.pop(key); @@ -147,7 +147,7 @@ impl ShufflingCache { return Err(BeaconChainError::MaxCommitteePromises(num_active_promises)); } - let (sender, receiver) = bounded(1); + let (sender, receiver) = oneshot(); self.cache.put(key, CacheItem::Promise(receiver)); Ok(sender) } @@ -262,7 +262,7 @@ mod test { ); // Resolve the promise. - sender.send(committee_a.clone()).unwrap(); + sender.send(committee_a.clone()); // Ensure the promise has been resolved. let item = cache.get(&id_a).unwrap(); @@ -324,7 +324,7 @@ mod test { ); // Resolve promise A. - sender_a.send(committee_a.clone()).unwrap(); + sender_a.send(committee_a.clone()); // Ensure promise A has been resolved. let item = cache.get(&id_a).unwrap(); assert!( @@ -333,7 +333,7 @@ mod test { ); // Resolve promise B. - sender_b.send(committee_b.clone()).unwrap(); + sender_b.send(committee_b.clone()); // Ensure promise B has been resolved. let item = cache.get(&id_b).unwrap(); assert!( diff --git a/common/oneshot_broadcast/Cargo.toml b/common/oneshot_broadcast/Cargo.toml new file mode 100644 index 000000000..baefe1066 --- /dev/null +++ b/common/oneshot_broadcast/Cargo.toml @@ -0,0 +1,9 @@ +[package] +name = "oneshot_broadcast" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +parking_lot = "0.12.0" diff --git a/common/oneshot_broadcast/src/lib.rs b/common/oneshot_broadcast/src/lib.rs new file mode 100644 index 000000000..237fc03cc --- /dev/null +++ b/common/oneshot_broadcast/src/lib.rs @@ -0,0 +1,188 @@ +//! Provides a single-sender, multiple receiver one-shot channel where any message sent will be +//! received by all senders. +//! +//! This implementation may not be blazingly fast but it should be simple enough to be reliable. +use parking_lot::{Condvar, Mutex}; +use std::sync::{Arc, Weak}; + +#[derive(Copy, Clone, Debug, PartialEq)] +pub enum Error { + SenderDropped, +} + +enum Future { + /// The future is ready and the item may be consumed. + Ready(T), + /// Future is not ready. The contained `Weak` is a reference to the `Sender` that may be used to + /// detect when the channel is disconnected. + NotReady(Weak<()>), +} + +struct MutexCondvar { + mutex: Mutex>, + condvar: Condvar, +} + +/// The sending pair of the `oneshot` channel. +pub struct Sender(Arc>, Option>); + +impl Sender { + /// Send a message, consuming `self` and delivering the message to *all* receivers. + pub fn send(self, item: T) { + *self.0.mutex.lock() = Future::Ready(item); + // Condvar notification will be handled by the `Drop` implementation. + } +} + +impl Drop for Sender { + /// Drop the `Arc` and notify all receivers so they can't upgrade their `Weak`s and know that + /// the sender has been dropped. + fn drop(&mut self) { + self.1 = None; + self.0.condvar.notify_all(); + } +} + +/// The receiving pair of the `oneshot` channel. Always receives the message sent by the `Sender` +/// (if any). +#[derive(Clone)] +pub struct Receiver(Arc>); + +impl Receiver { + /// Check to see if there is a message to be read *without* blocking/waiting. + /// + /// ## Note + /// + /// This method will technically perform *some* blocking to access a `Mutex`. It is non-blocking + /// in the sense that it won't block until a message is received (i.e., it may return `Ok(None)` + /// if no message has been sent yet). + pub fn try_recv(&self) -> Result, Error> { + match &*self.0.mutex.lock() { + Future::Ready(item) => Ok(Some(item.clone())), + Future::NotReady(weak) if weak.upgrade().is_some() => Ok(None), + Future::NotReady(_) => Err(Error::SenderDropped), + } + } + + /// Check to see if there is a message to be read whilst blocking/waiting until a message is + /// sent or the `Sender` is dropped. + pub fn recv(self) -> Result { + let mut lock = self.0.mutex.lock(); + loop { + match &*lock { + Future::Ready(item) => return Ok(item.clone()), + Future::NotReady(weak) if weak.upgrade().is_some() => { + self.0.condvar.wait(&mut lock) + } + Future::NotReady(_) => return Err(Error::SenderDropped), + } + } + } +} + +/// A single-sender, multiple-receiver broadcast channel. +/// +/// The sender may send *only one* message which will be received by *all* receivers. +pub fn oneshot() -> (Sender, Receiver) { + let sender_ref = Arc::new(()); + let mutex_condvar = Arc::new(MutexCondvar { + mutex: Mutex::new(Future::NotReady(Arc::downgrade(&sender_ref))), + condvar: Condvar::new(), + }); + let receiver = Receiver(mutex_condvar.clone()); + let sender = Sender(mutex_condvar, Some(sender_ref)); + (sender, receiver) +} + +#[cfg(test)] +mod tests { + use super::*; + use std::thread; + use std::time::Duration; + + #[test] + fn single_thread_try_recv() { + let (sender, receiver) = oneshot(); + assert_eq!(receiver.try_recv(), Ok(None)); + sender.send(42); + assert_eq!(receiver.try_recv(), Ok(Some(42))); + } + + #[test] + fn single_thread_try_recv_no_message() { + let (sender, receiver) = oneshot::(); + assert_eq!(receiver.try_recv(), Ok(None)); + drop(sender); + assert_eq!(receiver.try_recv(), Err(Error::SenderDropped)); + } + + #[test] + fn single_thread_recv() { + let (sender, receiver) = oneshot(); + assert_eq!(receiver.try_recv(), Ok(None)); + sender.send(42); + assert_eq!(receiver.recv(), Ok(42)); + } + + #[test] + fn single_thread_recv_no_message() { + let (sender, receiver) = oneshot::(); + assert_eq!(receiver.try_recv(), Ok(None)); + drop(sender); + assert_eq!(receiver.recv(), Err(Error::SenderDropped)); + } + + #[test] + fn two_threads_message_sent() { + let (sender, receiver) = oneshot(); + + let handle = thread::spawn(|| receiver.recv().unwrap()); + + sender.send(42); + assert_eq!(handle.join().unwrap(), 42); + } + + #[test] + fn three_threads_message_set() { + let (sender, receiver) = oneshot(); + + let receiver_a = receiver.clone(); + let handle_a = thread::spawn(|| receiver_a.recv().unwrap()); + let handle_b = thread::spawn(|| receiver.recv().unwrap()); + + sender.send(42); + assert_eq!(handle_a.join().unwrap(), 42); + assert_eq!(handle_b.join().unwrap(), 42); + } + + #[test] + fn three_threads_sender_dropped() { + let (sender, receiver) = oneshot::(); + + let receiver_a = receiver.clone(); + let handle_a = thread::spawn(|| receiver_a.recv()); + let handle_b = thread::spawn(|| receiver.recv()); + + drop(sender); + assert_eq!(handle_a.join().unwrap(), Err(Error::SenderDropped)); + assert_eq!(handle_b.join().unwrap(), Err(Error::SenderDropped)); + } + + #[test] + fn sender_dropped_after_recv() { + let (sender_a, receiver_a) = oneshot(); + let (sender_b, receiver_b) = oneshot::(); + + let handle_0 = thread::spawn(|| { + sender_a.send(1); + receiver_b.recv() + }); + + assert_eq!(receiver_a.recv(), Ok(1)); + // This is a slightly hacky sleep that assumes that the thread has had enough time after + // sending down `sender_a` to start listening to `receiver_b`. + thread::sleep(Duration::from_secs(1)); + drop(sender_b); + assert_eq!(handle_0.join().unwrap(), Err(Error::SenderDropped)) + } +}