Add a waker to the RPC handler (#2721)

## Issue Addressed

Attempts to fix #2701 but I doubt this is the reason behind that.

## Proposed Changes

maintain a waker in the rpc handler and call it if an event is received
This commit is contained in:
Divma 2021-10-21 06:14:36 +00:00
parent de34001e78
commit d4819bfd42

View File

@ -132,6 +132,9 @@ where
/// Fork specific info. /// Fork specific info.
fork_context: Arc<ForkContext>, fork_context: Arc<ForkContext>,
/// Waker, to be sure the handler gets polled when needed.
waker: Option<std::task::Waker>,
/// Logger for handling RPC streams /// Logger for handling RPC streams
log: slog::Logger, log: slog::Logger,
} }
@ -227,6 +230,7 @@ where
max_dial_negotiated: 8, max_dial_negotiated: 8,
outbound_io_error_retries: 0, outbound_io_error_retries: 0,
fork_context, fork_context,
waker: None,
log: log.clone(), log: log.clone(),
} }
} }
@ -431,6 +435,10 @@ where
RPCSend::Response(inbound_id, response) => self.send_response(inbound_id, response), RPCSend::Response(inbound_id, response) => self.send_response(inbound_id, response),
RPCSend::Shutdown(reason) => self.shutdown(Some(reason)), RPCSend::Shutdown(reason) => self.shutdown(Some(reason)),
} }
// In any case, we need the handler to process the event.
if let Some(waker) = &self.waker {
waker.wake_by_ref();
}
} }
fn inject_dial_upgrade_error( fn inject_dial_upgrade_error(
@ -518,6 +526,13 @@ where
Self::Error, Self::Error,
>, >,
> { > {
if let Some(waker) = &self.waker {
if waker.will_wake(cx.waker()) {
self.waker = Some(cx.waker().clone());
}
} else {
self.waker = Some(cx.waker().clone());
}
// return any events that need to be reported // return any events that need to be reported
if !self.events_out.is_empty() { if !self.events_out.is_empty() {
return Poll::Ready(ProtocolsHandlerEvent::Custom(self.events_out.remove(0))); return Poll::Ready(ProtocolsHandlerEvent::Custom(self.events_out.remove(0)));