Impl oneshot_broadcast
for committee promises (#3595)
## Issue Addressed NA ## Proposed Changes Fixes an issue introduced in #3574 where I erroneously assumed that a `crossbeam_channel` multiple receiver queue was a *broadcast* queue. This is incorrect, each message will be received by *only one* receiver. The effect of this mistake is these logs: ``` Sep 20 06:56:17.001 INFO Synced slot: 4736079, block: 0xaa8a…180d, epoch: 148002, finalized_epoch: 148000, finalized_root: 0x2775…47f2, exec_hash: 0x2ca5…ffde (verified), peers: 6, service: slot_notifier Sep 20 06:56:23.237 ERRO Unable to validate attestation error: CommitteeCacheWait(RecvError), peer_id: 16Uiu2HAm2Jnnj8868tb7hCta1rmkXUf5YjqUH1YPj35DCwNyeEzs, type: "aggregated", slot: Slot(4736047), beacon_block_root: 0x88d318534b1010e0ebd79aed60b6b6da1d70357d72b271c01adf55c2b46206c1 ``` ## Additional Info NA
This commit is contained in:
parent
a95bcba2ab
commit
96692b8e43
9
Cargo.lock
generated
9
Cargo.lock
generated
@ -387,7 +387,6 @@ version = "0.2.0"
|
||||
dependencies = [
|
||||
"bitvec 0.20.4",
|
||||
"bls",
|
||||
"crossbeam-channel",
|
||||
"derivative",
|
||||
"environment",
|
||||
"eth1",
|
||||
@ -410,6 +409,7 @@ dependencies = [
|
||||
"lru",
|
||||
"maplit",
|
||||
"merkle_proof",
|
||||
"oneshot_broadcast",
|
||||
"operation_pool",
|
||||
"parking_lot 0.12.1",
|
||||
"proto_array",
|
||||
@ -4343,6 +4343,13 @@ version = "1.13.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "18a6dbe30758c9f83eb00cbea4ac95966305f5a7772f3f42ebfc7fc7eddbd8e1"
|
||||
|
||||
[[package]]
|
||||
name = "oneshot_broadcast"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"parking_lot 0.12.1",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "oorandom"
|
||||
version = "11.1.3"
|
||||
|
@ -34,6 +34,7 @@ members = [
|
||||
"common/logging",
|
||||
"common/lru_cache",
|
||||
"common/malloc_utils",
|
||||
"common/oneshot_broadcast",
|
||||
"common/sensitive_url",
|
||||
"common/slot_clock",
|
||||
"common/task_executor",
|
||||
|
@ -63,7 +63,7 @@ superstruct = "0.5.0"
|
||||
hex = "0.4.2"
|
||||
exit-future = "0.2.0"
|
||||
unused_port = {path = "../../common/unused_port"}
|
||||
crossbeam-channel = "0.5.6"
|
||||
oneshot_broadcast = { path = "../../common/oneshot_broadcast" }
|
||||
|
||||
[[test]]
|
||||
name = "beacon_chain_tests"
|
||||
|
@ -4609,13 +4609,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
|
||||
metrics::stop_timer(committee_building_timer);
|
||||
|
||||
if let Err(e) = sender.send(committee_cache.clone()) {
|
||||
debug!(
|
||||
self.log,
|
||||
"Did not fulfil committee promise";
|
||||
"error" => %e
|
||||
)
|
||||
}
|
||||
sender.send(committee_cache.clone());
|
||||
|
||||
map_fn(&committee_cache, shuffling_decision_block)
|
||||
}
|
||||
|
@ -202,7 +202,7 @@ pub enum BeaconChainError {
|
||||
},
|
||||
AttestationHeadNotInForkChoice(Hash256),
|
||||
MissingPersistedForkChoice,
|
||||
CommitteeCacheWait(crossbeam_channel::RecvError),
|
||||
CommitteePromiseFailed(oneshot_broadcast::Error),
|
||||
MaxCommitteePromises(usize),
|
||||
}
|
||||
|
||||
|
@ -1,6 +1,6 @@
|
||||
use crate::{metrics, BeaconChainError};
|
||||
use crossbeam_channel::{bounded, Receiver, Sender, TryRecvError};
|
||||
use lru::LruCache;
|
||||
use oneshot_broadcast::{oneshot, Receiver, Sender};
|
||||
use std::sync::Arc;
|
||||
use types::{beacon_state::CommitteeCache, AttestationShufflingId, Epoch, Hash256};
|
||||
|
||||
@ -40,7 +40,7 @@ impl CacheItem {
|
||||
CacheItem::Committee(cache) => Ok(cache),
|
||||
CacheItem::Promise(receiver) => receiver
|
||||
.recv()
|
||||
.map_err(BeaconChainError::CommitteeCacheWait),
|
||||
.map_err(BeaconChainError::CommitteePromiseFailed),
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -72,7 +72,7 @@ impl ShufflingCache {
|
||||
item @ Some(CacheItem::Promise(receiver)) => match receiver.try_recv() {
|
||||
// The promise has already been resolved. Replace the entry in the cache with a
|
||||
// `Committee` entry and then return the committee.
|
||||
Ok(committee) => {
|
||||
Ok(Some(committee)) => {
|
||||
metrics::inc_counter(&metrics::SHUFFLING_CACHE_PROMISE_HITS);
|
||||
metrics::inc_counter(&metrics::SHUFFLING_CACHE_HITS);
|
||||
let ready = CacheItem::Committee(committee);
|
||||
@ -81,7 +81,7 @@ impl ShufflingCache {
|
||||
}
|
||||
// The promise has not yet been resolved. Return the promise so the caller can await
|
||||
// it.
|
||||
Err(TryRecvError::Empty) => {
|
||||
Ok(None) => {
|
||||
metrics::inc_counter(&metrics::SHUFFLING_CACHE_PROMISE_HITS);
|
||||
metrics::inc_counter(&metrics::SHUFFLING_CACHE_HITS);
|
||||
item.cloned()
|
||||
@ -96,7 +96,7 @@ impl ShufflingCache {
|
||||
// memory and the nature of the LRU cache means that future, relevant entries will
|
||||
// still be added to the cache. We expect that *all* promises should be resolved,
|
||||
// unless there is a programming or database error.
|
||||
Err(TryRecvError::Disconnected) => {
|
||||
Err(oneshot_broadcast::Error::SenderDropped) => {
|
||||
metrics::inc_counter(&metrics::SHUFFLING_CACHE_PROMISE_FAILS);
|
||||
metrics::inc_counter(&metrics::SHUFFLING_CACHE_MISSES);
|
||||
self.cache.pop(key);
|
||||
@ -147,7 +147,7 @@ impl ShufflingCache {
|
||||
return Err(BeaconChainError::MaxCommitteePromises(num_active_promises));
|
||||
}
|
||||
|
||||
let (sender, receiver) = bounded(1);
|
||||
let (sender, receiver) = oneshot();
|
||||
self.cache.put(key, CacheItem::Promise(receiver));
|
||||
Ok(sender)
|
||||
}
|
||||
@ -262,7 +262,7 @@ mod test {
|
||||
);
|
||||
|
||||
// Resolve the promise.
|
||||
sender.send(committee_a.clone()).unwrap();
|
||||
sender.send(committee_a.clone());
|
||||
|
||||
// Ensure the promise has been resolved.
|
||||
let item = cache.get(&id_a).unwrap();
|
||||
@ -324,7 +324,7 @@ mod test {
|
||||
);
|
||||
|
||||
// Resolve promise A.
|
||||
sender_a.send(committee_a.clone()).unwrap();
|
||||
sender_a.send(committee_a.clone());
|
||||
// Ensure promise A has been resolved.
|
||||
let item = cache.get(&id_a).unwrap();
|
||||
assert!(
|
||||
@ -333,7 +333,7 @@ mod test {
|
||||
);
|
||||
|
||||
// Resolve promise B.
|
||||
sender_b.send(committee_b.clone()).unwrap();
|
||||
sender_b.send(committee_b.clone());
|
||||
// Ensure promise B has been resolved.
|
||||
let item = cache.get(&id_b).unwrap();
|
||||
assert!(
|
||||
|
9
common/oneshot_broadcast/Cargo.toml
Normal file
9
common/oneshot_broadcast/Cargo.toml
Normal file
@ -0,0 +1,9 @@
|
||||
[package]
|
||||
name = "oneshot_broadcast"
|
||||
version = "0.1.0"
|
||||
edition = "2021"
|
||||
|
||||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
||||
|
||||
[dependencies]
|
||||
parking_lot = "0.12.0"
|
188
common/oneshot_broadcast/src/lib.rs
Normal file
188
common/oneshot_broadcast/src/lib.rs
Normal file
@ -0,0 +1,188 @@
|
||||
//! Provides a single-sender, multiple receiver one-shot channel where any message sent will be
|
||||
//! received by all senders.
|
||||
//!
|
||||
//! This implementation may not be blazingly fast but it should be simple enough to be reliable.
|
||||
use parking_lot::{Condvar, Mutex};
|
||||
use std::sync::{Arc, Weak};
|
||||
|
||||
#[derive(Copy, Clone, Debug, PartialEq)]
|
||||
pub enum Error {
|
||||
SenderDropped,
|
||||
}
|
||||
|
||||
enum Future<T> {
|
||||
/// The future is ready and the item may be consumed.
|
||||
Ready(T),
|
||||
/// Future is not ready. The contained `Weak` is a reference to the `Sender` that may be used to
|
||||
/// detect when the channel is disconnected.
|
||||
NotReady(Weak<()>),
|
||||
}
|
||||
|
||||
struct MutexCondvar<T> {
|
||||
mutex: Mutex<Future<T>>,
|
||||
condvar: Condvar,
|
||||
}
|
||||
|
||||
/// The sending pair of the `oneshot` channel.
|
||||
pub struct Sender<T>(Arc<MutexCondvar<T>>, Option<Arc<()>>);
|
||||
|
||||
impl<T> Sender<T> {
|
||||
/// Send a message, consuming `self` and delivering the message to *all* receivers.
|
||||
pub fn send(self, item: T) {
|
||||
*self.0.mutex.lock() = Future::Ready(item);
|
||||
// Condvar notification will be handled by the `Drop` implementation.
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> Drop for Sender<T> {
|
||||
/// Drop the `Arc` and notify all receivers so they can't upgrade their `Weak`s and know that
|
||||
/// the sender has been dropped.
|
||||
fn drop(&mut self) {
|
||||
self.1 = None;
|
||||
self.0.condvar.notify_all();
|
||||
}
|
||||
}
|
||||
|
||||
/// The receiving pair of the `oneshot` channel. Always receives the message sent by the `Sender`
|
||||
/// (if any).
|
||||
#[derive(Clone)]
|
||||
pub struct Receiver<T: Clone>(Arc<MutexCondvar<T>>);
|
||||
|
||||
impl<T: Clone> Receiver<T> {
|
||||
/// Check to see if there is a message to be read *without* blocking/waiting.
|
||||
///
|
||||
/// ## Note
|
||||
///
|
||||
/// This method will technically perform *some* blocking to access a `Mutex`. It is non-blocking
|
||||
/// in the sense that it won't block until a message is received (i.e., it may return `Ok(None)`
|
||||
/// if no message has been sent yet).
|
||||
pub fn try_recv(&self) -> Result<Option<T>, Error> {
|
||||
match &*self.0.mutex.lock() {
|
||||
Future::Ready(item) => Ok(Some(item.clone())),
|
||||
Future::NotReady(weak) if weak.upgrade().is_some() => Ok(None),
|
||||
Future::NotReady(_) => Err(Error::SenderDropped),
|
||||
}
|
||||
}
|
||||
|
||||
/// Check to see if there is a message to be read whilst blocking/waiting until a message is
|
||||
/// sent or the `Sender` is dropped.
|
||||
pub fn recv(self) -> Result<T, Error> {
|
||||
let mut lock = self.0.mutex.lock();
|
||||
loop {
|
||||
match &*lock {
|
||||
Future::Ready(item) => return Ok(item.clone()),
|
||||
Future::NotReady(weak) if weak.upgrade().is_some() => {
|
||||
self.0.condvar.wait(&mut lock)
|
||||
}
|
||||
Future::NotReady(_) => return Err(Error::SenderDropped),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// A single-sender, multiple-receiver broadcast channel.
|
||||
///
|
||||
/// The sender may send *only one* message which will be received by *all* receivers.
|
||||
pub fn oneshot<T: Clone>() -> (Sender<T>, Receiver<T>) {
|
||||
let sender_ref = Arc::new(());
|
||||
let mutex_condvar = Arc::new(MutexCondvar {
|
||||
mutex: Mutex::new(Future::NotReady(Arc::downgrade(&sender_ref))),
|
||||
condvar: Condvar::new(),
|
||||
});
|
||||
let receiver = Receiver(mutex_condvar.clone());
|
||||
let sender = Sender(mutex_condvar, Some(sender_ref));
|
||||
(sender, receiver)
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use std::thread;
|
||||
use std::time::Duration;
|
||||
|
||||
#[test]
|
||||
fn single_thread_try_recv() {
|
||||
let (sender, receiver) = oneshot();
|
||||
assert_eq!(receiver.try_recv(), Ok(None));
|
||||
sender.send(42);
|
||||
assert_eq!(receiver.try_recv(), Ok(Some(42)));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn single_thread_try_recv_no_message() {
|
||||
let (sender, receiver) = oneshot::<u8>();
|
||||
assert_eq!(receiver.try_recv(), Ok(None));
|
||||
drop(sender);
|
||||
assert_eq!(receiver.try_recv(), Err(Error::SenderDropped));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn single_thread_recv() {
|
||||
let (sender, receiver) = oneshot();
|
||||
assert_eq!(receiver.try_recv(), Ok(None));
|
||||
sender.send(42);
|
||||
assert_eq!(receiver.recv(), Ok(42));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn single_thread_recv_no_message() {
|
||||
let (sender, receiver) = oneshot::<u8>();
|
||||
assert_eq!(receiver.try_recv(), Ok(None));
|
||||
drop(sender);
|
||||
assert_eq!(receiver.recv(), Err(Error::SenderDropped));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn two_threads_message_sent() {
|
||||
let (sender, receiver) = oneshot();
|
||||
|
||||
let handle = thread::spawn(|| receiver.recv().unwrap());
|
||||
|
||||
sender.send(42);
|
||||
assert_eq!(handle.join().unwrap(), 42);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn three_threads_message_set() {
|
||||
let (sender, receiver) = oneshot();
|
||||
|
||||
let receiver_a = receiver.clone();
|
||||
let handle_a = thread::spawn(|| receiver_a.recv().unwrap());
|
||||
let handle_b = thread::spawn(|| receiver.recv().unwrap());
|
||||
|
||||
sender.send(42);
|
||||
assert_eq!(handle_a.join().unwrap(), 42);
|
||||
assert_eq!(handle_b.join().unwrap(), 42);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn three_threads_sender_dropped() {
|
||||
let (sender, receiver) = oneshot::<u8>();
|
||||
|
||||
let receiver_a = receiver.clone();
|
||||
let handle_a = thread::spawn(|| receiver_a.recv());
|
||||
let handle_b = thread::spawn(|| receiver.recv());
|
||||
|
||||
drop(sender);
|
||||
assert_eq!(handle_a.join().unwrap(), Err(Error::SenderDropped));
|
||||
assert_eq!(handle_b.join().unwrap(), Err(Error::SenderDropped));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn sender_dropped_after_recv() {
|
||||
let (sender_a, receiver_a) = oneshot();
|
||||
let (sender_b, receiver_b) = oneshot::<u8>();
|
||||
|
||||
let handle_0 = thread::spawn(|| {
|
||||
sender_a.send(1);
|
||||
receiver_b.recv()
|
||||
});
|
||||
|
||||
assert_eq!(receiver_a.recv(), Ok(1));
|
||||
// This is a slightly hacky sleep that assumes that the thread has had enough time after
|
||||
// sending down `sender_a` to start listening to `receiver_b`.
|
||||
thread::sleep(Duration::from_secs(1));
|
||||
drop(sender_b);
|
||||
assert_eq!(handle_0.join().unwrap(), Err(Error::SenderDropped))
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user