diff --git a/beacon_node/lighthouse_network/src/rpc/handler.rs b/beacon_node/lighthouse_network/src/rpc/handler.rs index 95d2c2948..b07d1d4fd 100644 --- a/beacon_node/lighthouse_network/src/rpc/handler.rs +++ b/beacon_node/lighthouse_network/src/rpc/handler.rs @@ -132,6 +132,9 @@ where /// Fork specific info. fork_context: Arc, + /// Waker, to be sure the handler gets polled when needed. + waker: Option, + /// Logger for handling RPC streams log: slog::Logger, } @@ -227,6 +230,7 @@ where max_dial_negotiated: 8, outbound_io_error_retries: 0, fork_context, + waker: None, log: log.clone(), } } @@ -431,6 +435,10 @@ where RPCSend::Response(inbound_id, response) => self.send_response(inbound_id, response), RPCSend::Shutdown(reason) => self.shutdown(Some(reason)), } + // In any case, we need the handler to process the event. + if let Some(waker) = &self.waker { + waker.wake_by_ref(); + } } fn inject_dial_upgrade_error( @@ -518,6 +526,13 @@ where Self::Error, >, > { + if let Some(waker) = &self.waker { + if waker.will_wake(cx.waker()) { + self.waker = Some(cx.waker().clone()); + } + } else { + self.waker = Some(cx.waker().clone()); + } // return any events that need to be reported if !self.events_out.is_empty() { return Poll::Ready(ProtocolsHandlerEvent::Custom(self.events_out.remove(0)));