From 9fc290a344ae79bbb3a2c565141fffc212a7f1a6 Mon Sep 17 00:00:00 2001 From: Age Manning Date: Sun, 28 Jun 2020 22:29:27 +1000 Subject: [PATCH] Add waker to attestation service (#1305) * Add waker to attestation service * Formatting --- .../network/src/attestation_service/mod.rs | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/beacon_node/network/src/attestation_service/mod.rs b/beacon_node/network/src/attestation_service/mod.rs index dfaf47198..bfd63e962 100644 --- a/beacon_node/network/src/attestation_service/mod.rs +++ b/beacon_node/network/src/attestation_service/mod.rs @@ -131,6 +131,9 @@ pub struct AttestationService { /// This is a set of validator indices. known_validators: HashSetDelay, + /// The waker for the current thread. + waker: Option, + /// The logger for the attestation service. log: slog::Logger, } @@ -171,6 +174,7 @@ impl AttestationService { unsubscriptions: HashSetDelay::new(default_timeout), aggregate_validators_on_subnet: HashSetDelay::new(default_timeout), known_validators: HashSetDelay::new(last_seen_val_timeout), + waker: None, log, } } @@ -256,6 +260,11 @@ impl AttestationService { } } } + + // pre-emptively wake the thread to check for new events + if let Some(waker) = &self.waker { + waker.wake_by_ref(); + } Ok(()) } @@ -713,6 +722,15 @@ impl Stream for AttestationService { type Item = AttServiceMessage; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + // update the waker if needed + if let Some(waker) = &self.waker { + if waker.will_wake(cx.waker()) { + self.waker = Some(cx.waker().clone()); + } + } else { + self.waker = Some(cx.waker().clone()); + } + // process any peer discovery events match self.discover_peers.poll_next_unpin(cx) { Poll::Ready(Some(Ok(exact_subnet))) => self.handle_discover_peers(exact_subnet),