Add waker to attestation service (#1305)

* Add waker to attestation service

* Formatting
This commit is contained in:
Age Manning 2020-06-28 22:29:27 +10:00 committed by GitHub
parent 95320f8ab0
commit 9fc290a344
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -131,6 +131,9 @@ pub struct AttestationService<T: BeaconChainTypes> {
/// This is a set of validator indices. /// This is a set of validator indices.
known_validators: HashSetDelay<u64>, known_validators: HashSetDelay<u64>,
/// The waker for the current thread.
waker: Option<std::task::Waker>,
/// The logger for the attestation service. /// The logger for the attestation service.
log: slog::Logger, log: slog::Logger,
} }
@ -171,6 +174,7 @@ impl<T: BeaconChainTypes> AttestationService<T> {
unsubscriptions: HashSetDelay::new(default_timeout), unsubscriptions: HashSetDelay::new(default_timeout),
aggregate_validators_on_subnet: HashSetDelay::new(default_timeout), aggregate_validators_on_subnet: HashSetDelay::new(default_timeout),
known_validators: HashSetDelay::new(last_seen_val_timeout), known_validators: HashSetDelay::new(last_seen_val_timeout),
waker: None,
log, log,
} }
} }
@ -256,6 +260,11 @@ impl<T: BeaconChainTypes> AttestationService<T> {
} }
} }
} }
// pre-emptively wake the thread to check for new events
if let Some(waker) = &self.waker {
waker.wake_by_ref();
}
Ok(()) Ok(())
} }
@ -713,6 +722,15 @@ impl<T: BeaconChainTypes> Stream for AttestationService<T> {
type Item = AttServiceMessage; type Item = AttServiceMessage;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
// update the waker if needed
if let Some(waker) = &self.waker {
if waker.will_wake(cx.waker()) {
self.waker = Some(cx.waker().clone());
}
} else {
self.waker = Some(cx.waker().clone());
}
// process any peer discovery events // process any peer discovery events
match self.discover_peers.poll_next_unpin(cx) { match self.discover_peers.poll_next_unpin(cx) {
Poll::Ready(Some(Ok(exact_subnet))) => self.handle_discover_peers(exact_subnet), Poll::Ready(Some(Ok(exact_subnet))) => self.handle_discover_peers(exact_subnet),