diff --git a/beacon_node/eth2-libp2p/src/rpc/handler.rs b/beacon_node/eth2-libp2p/src/rpc/handler.rs index bb6d3a8ef..2646c49a6 100644 --- a/beacon_node/eth2-libp2p/src/rpc/handler.rs +++ b/beacon_node/eth2-libp2p/src/rpc/handler.rs @@ -253,11 +253,6 @@ where } } - /// Returns the number of pending requests. - pub fn pending_requests(&self) -> u32 { - self.dial_negotiated + self.dial_queue.len() as u32 - } - /// Returns a reference to the listen protocol configuration. /// /// > **Note**: If you modify the protocol, modifications will only applies to future inbound @@ -268,17 +263,36 @@ where /// Returns a mutable reference to the listen protocol configuration. /// - /// > **Note**: If you modify the protocol, modifications will only applies to future inbound + /// > **Note**: If you modify the protocol, modifications will only apply to future inbound /// > substreams, not the ones already being negotiated. pub fn listen_protocol_mut(&mut self) -> &mut SubstreamProtocol> { &mut self.listen_protocol } /// Opens an outbound substream with a request. - pub fn send_request(&mut self, id: RequestId, req: RPCRequest) { - self.keep_alive = KeepAlive::Yes; - + fn send_request(&mut self, id: RequestId, req: RPCRequest) { self.dial_queue.push((id, req)); + self.update_keep_alive(); + } + + /// Updates the `KeepAlive` returned by `connection_keep_alive`. + /// + /// The handler stays alive as long as there are inbound/outbound substreams established and no + /// items dialing/to be dialed. Otherwise it is given a grace period of inactivity of + /// `self.inactive_timeout`. + fn update_keep_alive(&mut self) { + // Check that we don't have outbound items pending for dialing, nor dialing, nor + // established. Also check that there are no established inbound substreams. + let should_shutdown = self.dial_queue.is_empty() + && self.dial_negotiated == 0 + && self.outbound_substreams.is_empty() + && self.inbound_substreams.is_empty(); + + if should_shutdown { + self.keep_alive = KeepAlive::Until(Instant::now() + self.inactive_timeout) + } else { + self.keep_alive = KeepAlive::Yes + } } } @@ -301,11 +315,6 @@ where &mut self, substream: >::Output, ) { - // update the keep alive timeout if there are no more remaining outbound streams - if let KeepAlive::Until(_) = self.keep_alive { - self.keep_alive = KeepAlive::Until(Instant::now() + self.inactive_timeout); - } - let (req, substream) = substream; // drop the stream and return a 0 id for goodbye "requests" if let r @ RPCRequest::Goodbye(_) = req { @@ -336,15 +345,6 @@ where ) { self.dial_negotiated -= 1; - if self.dial_negotiated == 0 - && self.dial_queue.is_empty() - && self.outbound_substreams.is_empty() - { - self.keep_alive = KeepAlive::Until(Instant::now() + self.inactive_timeout); - } else { - self.keep_alive = KeepAlive::Yes; - } - // add the stream to substreams if we expect a response, otherwise drop the stream. let (mut id, request) = request_info; if request.expect_response() { @@ -384,6 +384,8 @@ where crit!(self.log, "Duplicate outbound substream id"; "id" => format!("{:?}", id)); } } + + self.update_keep_alive(); } // NOTE: If the substream has closed due to inactivity, or the substream is in the @@ -607,6 +609,8 @@ where if let Some((_id, _stream, protocol, _)) = self.outbound_substreams.remove(stream_id.get_ref()) { + self.update_keep_alive(); + // notify the user return Poll::Ready(ProtocolsHandlerEvent::Custom(RPCEvent::Error( *stream_id.get_ref(), @@ -683,6 +687,7 @@ where Poll::Ready(Err(e)) => { error!(self.log, "Outbound substream error while sending RPC message: {:?}", e); entry.remove(); + self.update_keep_alive(); return Poll::Ready(ProtocolsHandlerEvent::Close(e)); } Poll::Pending => { @@ -730,13 +735,7 @@ where self.queued_outbound_items.remove(&request_id); entry.remove(); - if self.outbound_substreams.is_empty() - && self.inbound_substreams.is_empty() - { - self.keep_alive = KeepAlive::Until( - Instant::now() + self.inactive_timeout, - ); - } + self.update_keep_alive(); } Poll::Pending => { entry.get_mut().0 = @@ -763,13 +762,7 @@ where self.queued_outbound_items.remove(&request_id); entry.remove(); - if self.outbound_substreams.is_empty() - && self.inbound_substreams.is_empty() - { - self.keep_alive = KeepAlive::Until( - Instant::now() + self.inactive_timeout, - ); - } + self.update_keep_alive(); } // drop the stream Poll::Ready(Err(e)) => { error!(self.log, "Error closing inbound stream"; "error" => e.to_string()); @@ -781,13 +774,7 @@ where self.queued_outbound_items.remove(&request_id); entry.remove(); - if self.outbound_substreams.is_empty() - && self.inbound_substreams.is_empty() - { - self.keep_alive = KeepAlive::Until( - Instant::now() + self.inactive_timeout, - ); - } + self.update_keep_alive(); } Poll::Pending => { entry.get_mut().0 = @@ -864,6 +851,8 @@ where let delay_key = &entry.get().1; self.outbound_substreams_delay.remove(delay_key); entry.remove_entry(); + + self.update_keep_alive(); // notify the application error if request.multiple_responses() { // return an end of stream result @@ -896,6 +885,7 @@ where self.outbound_substreams_delay.remove(delay_key); let protocol = entry.get().2; entry.remove_entry(); + self.update_keep_alive(); return Poll::Ready(ProtocolsHandlerEvent::Custom( RPCEvent::Error(request_id, protocol, e), )); @@ -909,15 +899,7 @@ where let protocol = entry.get().2; self.outbound_substreams_delay.remove(delay_key); entry.remove_entry(); - - // adjust the RPC keep-alive - if self.outbound_substreams.is_empty() - && self.inbound_substreams.is_empty() - { - self.keep_alive = KeepAlive::Until( - Instant::now() + self.inactive_timeout, - ); - } + self.update_keep_alive(); // report the stream termination to the user // @@ -969,6 +951,7 @@ where self.dial_negotiated += 1; let (id, req) = self.dial_queue.remove(0); self.dial_queue.shrink_to_fit(); + self.update_keep_alive(); return Poll::Ready(ProtocolsHandlerEvent::OutboundSubstreamRequest { protocol: SubstreamProtocol::new(req.clone()), info: (id, req), diff --git a/beacon_node/eth2-libp2p/src/rpc/mod.rs b/beacon_node/eth2-libp2p/src/rpc/mod.rs index efda27cc4..e276bf6b3 100644 --- a/beacon_node/eth2-libp2p/src/rpc/mod.rs +++ b/beacon_node/eth2-libp2p/src/rpc/mod.rs @@ -115,7 +115,7 @@ where SubstreamProtocol::new(RPCProtocol { phantom: PhantomData, }), - Duration::from_secs(30), + Duration::from_secs(5), &self.log, ) }