Update the RPC handler's keep alive logic (#1220)

This commit is contained in:
divma 2020-05-28 21:03:13 -05:00 committed by GitHub
parent 812809913d
commit 91a28e7438
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 37 additions and 54 deletions

View File

@ -253,11 +253,6 @@ where
}
}
/// Returns the number of pending requests.
pub fn pending_requests(&self) -> u32 {
self.dial_negotiated + self.dial_queue.len() as u32
}
/// Returns a reference to the listen protocol configuration.
///
/// > **Note**: If you modify the protocol, modifications will only applies to future inbound
@ -268,17 +263,36 @@ where
/// Returns a mutable reference to the listen protocol configuration.
///
/// > **Note**: If you modify the protocol, modifications will only applies to future inbound
/// > **Note**: If you modify the protocol, modifications will only apply to future inbound
/// > substreams, not the ones already being negotiated.
pub fn listen_protocol_mut(&mut self) -> &mut SubstreamProtocol<RPCProtocol<TSpec>> {
&mut self.listen_protocol
}
/// Opens an outbound substream with a request.
pub fn send_request(&mut self, id: RequestId, req: RPCRequest<TSpec>) {
self.keep_alive = KeepAlive::Yes;
fn send_request(&mut self, id: RequestId, req: RPCRequest<TSpec>) {
self.dial_queue.push((id, req));
self.update_keep_alive();
}
/// Updates the `KeepAlive` returned by `connection_keep_alive`.
///
/// The handler stays alive as long as there are inbound/outbound substreams established and no
/// items dialing/to be dialed. Otherwise it is given a grace period of inactivity of
/// `self.inactive_timeout`.
fn update_keep_alive(&mut self) {
// Check that we don't have outbound items pending for dialing, nor dialing, nor
// established. Also check that there are no established inbound substreams.
let should_shutdown = self.dial_queue.is_empty()
&& self.dial_negotiated == 0
&& self.outbound_substreams.is_empty()
&& self.inbound_substreams.is_empty();
if should_shutdown {
self.keep_alive = KeepAlive::Until(Instant::now() + self.inactive_timeout)
} else {
self.keep_alive = KeepAlive::Yes
}
}
}
@ -301,11 +315,6 @@ where
&mut self,
substream: <Self::InboundProtocol as InboundUpgrade<NegotiatedSubstream>>::Output,
) {
// update the keep alive timeout if there are no more remaining outbound streams
if let KeepAlive::Until(_) = self.keep_alive {
self.keep_alive = KeepAlive::Until(Instant::now() + self.inactive_timeout);
}
let (req, substream) = substream;
// drop the stream and return a 0 id for goodbye "requests"
if let r @ RPCRequest::Goodbye(_) = req {
@ -336,15 +345,6 @@ where
) {
self.dial_negotiated -= 1;
if self.dial_negotiated == 0
&& self.dial_queue.is_empty()
&& self.outbound_substreams.is_empty()
{
self.keep_alive = KeepAlive::Until(Instant::now() + self.inactive_timeout);
} else {
self.keep_alive = KeepAlive::Yes;
}
// add the stream to substreams if we expect a response, otherwise drop the stream.
let (mut id, request) = request_info;
if request.expect_response() {
@ -384,6 +384,8 @@ where
crit!(self.log, "Duplicate outbound substream id"; "id" => format!("{:?}", id));
}
}
self.update_keep_alive();
}
// NOTE: If the substream has closed due to inactivity, or the substream is in the
@ -607,6 +609,8 @@ where
if let Some((_id, _stream, protocol, _)) =
self.outbound_substreams.remove(stream_id.get_ref())
{
self.update_keep_alive();
// notify the user
return Poll::Ready(ProtocolsHandlerEvent::Custom(RPCEvent::Error(
*stream_id.get_ref(),
@ -683,6 +687,7 @@ where
Poll::Ready(Err(e)) => {
error!(self.log, "Outbound substream error while sending RPC message: {:?}", e);
entry.remove();
self.update_keep_alive();
return Poll::Ready(ProtocolsHandlerEvent::Close(e));
}
Poll::Pending => {
@ -730,13 +735,7 @@ where
self.queued_outbound_items.remove(&request_id);
entry.remove();
if self.outbound_substreams.is_empty()
&& self.inbound_substreams.is_empty()
{
self.keep_alive = KeepAlive::Until(
Instant::now() + self.inactive_timeout,
);
}
self.update_keep_alive();
}
Poll::Pending => {
entry.get_mut().0 =
@ -763,13 +762,7 @@ where
self.queued_outbound_items.remove(&request_id);
entry.remove();
if self.outbound_substreams.is_empty()
&& self.inbound_substreams.is_empty()
{
self.keep_alive = KeepAlive::Until(
Instant::now() + self.inactive_timeout,
);
}
self.update_keep_alive();
} // drop the stream
Poll::Ready(Err(e)) => {
error!(self.log, "Error closing inbound stream"; "error" => e.to_string());
@ -781,13 +774,7 @@ where
self.queued_outbound_items.remove(&request_id);
entry.remove();
if self.outbound_substreams.is_empty()
&& self.inbound_substreams.is_empty()
{
self.keep_alive = KeepAlive::Until(
Instant::now() + self.inactive_timeout,
);
}
self.update_keep_alive();
}
Poll::Pending => {
entry.get_mut().0 =
@ -864,6 +851,8 @@ where
let delay_key = &entry.get().1;
self.outbound_substreams_delay.remove(delay_key);
entry.remove_entry();
self.update_keep_alive();
// notify the application error
if request.multiple_responses() {
// return an end of stream result
@ -896,6 +885,7 @@ where
self.outbound_substreams_delay.remove(delay_key);
let protocol = entry.get().2;
entry.remove_entry();
self.update_keep_alive();
return Poll::Ready(ProtocolsHandlerEvent::Custom(
RPCEvent::Error(request_id, protocol, e),
));
@ -909,15 +899,7 @@ where
let protocol = entry.get().2;
self.outbound_substreams_delay.remove(delay_key);
entry.remove_entry();
// adjust the RPC keep-alive
if self.outbound_substreams.is_empty()
&& self.inbound_substreams.is_empty()
{
self.keep_alive = KeepAlive::Until(
Instant::now() + self.inactive_timeout,
);
}
self.update_keep_alive();
// report the stream termination to the user
//
@ -969,6 +951,7 @@ where
self.dial_negotiated += 1;
let (id, req) = self.dial_queue.remove(0);
self.dial_queue.shrink_to_fit();
self.update_keep_alive();
return Poll::Ready(ProtocolsHandlerEvent::OutboundSubstreamRequest {
protocol: SubstreamProtocol::new(req.clone()),
info: (id, req),

View File

@ -115,7 +115,7 @@ where
SubstreamProtocol::new(RPCProtocol {
phantom: PhantomData,
}),
Duration::from_secs(30),
Duration::from_secs(5),
&self.log,
)
}