From a43a2448b7c5240a727981538a459c5bfb9c25b7 Mon Sep 17 00:00:00 2001
From: Age Manning
Date: Tue, 16 Nov 2021 03:42:25 +0000
Subject: [PATCH] Investigate and correct RPC Response Timeouts (#2804)

RPC responses are, for some reason, not removing their timeouts when they complete. As an example:

```
Nov 09 01:18:20.256 DEBG Received BlocksByRange Request step: 1, start_slot: 728465, count: 64, peer_id: 16Uiu2HAmEmBURejquBUMgKAqxViNoPnSptTWLA2CfgSPnnKENBNw
Nov 09 01:18:20.263 DEBG Received BlocksByRange Request step: 1, start_slot: 728593, count: 64, peer_id: 16Uiu2HAmEmBURejquBUMgKAqxViNoPnSptTWLA2CfgSPnnKENBNw
Nov 09 01:18:20.483 DEBG BlocksByRange Response sent returned: 63, requested: 64, current_slot: 2466389, start_slot: 728465, msg: Failed to return all requested blocks, peer: 16Uiu2HAmEmBURejquBUMgKAqxViNoPnSptTWLA2CfgSPnnKENBNw
Nov 09 01:18:20.500 DEBG BlocksByRange Response sent returned: 64, requested: 64, current_slot: 2466389, start_slot: 728593, peer: 16Uiu2HAmEmBURejquBUMgKAqxViNoPnSptTWLA2CfgSPnnKENBNw
Nov 09 01:18:21.068 DEBG Received BlocksByRange Request step: 1, start_slot: 728529, count: 64, peer_id: 16Uiu2HAmEmBURejquBUMgKAqxViNoPnSptTWLA2CfgSPnnKENBNw
Nov 09 01:18:21.272 DEBG BlocksByRange Response sent returned: 63, requested: 64, current_slot: 2466389, start_slot: 728529, msg: Failed to return all requested blocks, peer: 16Uiu2HAmEmBURejquBUMgKAqxViNoPnSptTWLA2CfgSPnnKENBNw
Nov 09 01:18:23.434 DEBG Received BlocksByRange Request step: 1, start_slot: 728657, count: 64, peer_id: 16Uiu2HAmEmBURejquBUMgKAqxViNoPnSptTWLA2CfgSPnnKENBNw
Nov 09 01:18:23.665 DEBG BlocksByRange Response sent returned: 64, requested: 64, current_slot: 2466390, start_slot: 728657, peer: 16Uiu2HAmEmBURejquBUMgKAqxViNoPnSptTWLA2CfgSPnnKENBNw
Nov 09 01:18:25.851 DEBG Received BlocksByRange Request step: 1, start_slot: 728337, count: 64, peer_id: 16Uiu2HAmEmBURejquBUMgKAqxViNoPnSptTWLA2CfgSPnnKENBNw
Nov 09 01:18:25.851 DEBG Received BlocksByRange Request step: 1, start_slot: 728401, count: 64, peer_id: 16Uiu2HAmEmBURejquBUMgKAqxViNoPnSptTWLA2CfgSPnnKENBNw
Nov 09 01:18:26.094 DEBG BlocksByRange Response sent returned: 62, requested: 64, current_slot: 2466390, start_slot: 728401, msg: Failed to return all requested blocks, peer: 16Uiu2HAmEmBURejquBUMgKAqxViNoPnSptTWLA2CfgSPnnKENBNw
Nov 09 01:18:26.100 DEBG BlocksByRange Response sent returned: 63, requested: 64, current_slot: 2466390, start_slot: 728337, msg: Failed to return all requested blocks, peer: 16Uiu2HAmEmBURejquBUMgKAqxViNoPnSptTWLA2CfgSPnnKENBNw
Nov 09 01:18:31.070 DEBG RPC Error direction: Incoming, score: 0, peer_id: 16Uiu2HAmEmBURejquBUMgKAqxViNoPnSptTWLA2CfgSPnnKENBNw, client: Prysm: version: a80b1c252a9b4773493b41999769bf3134ac373f, os_version: unknown, err: Stream Timeout, protocol: beacon_blocks_by_range, service: libp2p
Nov 09 01:18:31.070 WARN Timed out to a peer's request. Likely insufficient resources, reduce peer count, service: libp2p
Nov 09 01:18:31.085 DEBG RPC Error direction: Incoming, score: 0, peer_id: 16Uiu2HAmEmBURejquBUMgKAqxViNoPnSptTWLA2CfgSPnnKENBNw, client: Prysm: version: a80b1c252a9b4773493b41999769bf3134ac373f, os_version: unknown, err: Stream Timeout, protocol: beacon_blocks_by_range, service: libp2p
Nov 09 01:18:31.085 WARN Timed out to a peer's request. Likely insufficient resources, reduce peer count, service: libp2p
Nov 09 01:18:31.459 DEBG RPC Error direction: Incoming, score: 0, peer_id: 16Uiu2HAmEmBURejquBUMgKAqxViNoPnSptTWLA2CfgSPnnKENBNw, client: Prysm: version: a80b1c252a9b4773493b41999769bf3134ac373f, os_version: unknown, err: Stream Timeout, protocol: beacon_blocks_by_range, service: libp2p
Nov 09 01:18:31.459 WARN Timed out to a peer's request. Likely insufficient resources, reduce peer count, service: libp2p
Nov 09 01:18:34.129 DEBG RPC Error direction: Incoming, score: 0, peer_id: 16Uiu2HAmEmBURejquBUMgKAqxViNoPnSptTWLA2CfgSPnnKENBNw, client: Prysm: version: a80b1c252a9b4773493b41999769bf3134ac373f, os_version: unknown, err: Stream Timeout, protocol: beacon_blocks_by_range, service: libp2p
Nov 09 01:18:34.130 WARN Timed out to a peer's request. Likely insufficient resources, reduce peer count, service: libp2p
Nov 09 01:18:35.686 DEBG Peer Manager disconnecting peer reason: Too many peers, peer_id: 16Uiu2HAmEmBURejquBUMgKAqxViNoPnSptTWLA2CfgSPnnKENBNw, service: libp2p
```

This PR investigates and corrects the issue.

~~My current thoughts are that for some reason we are not closing the streams correctly, or fast enough, or the executor is not registering the closes and waking up.~~ Pretty sure this is not the case; see the message below for a more accurate reason.

~~I've currently added a timeout to stream closures in an attempt to force streams to close and the future to always complete.~~ I removed this.
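The gist of the fix: an inbound substream's timeout entry must be reset after every chunk that is flushed, and removed once the substream closes. Below is a standalone sketch of that lifecycle, not Lighthouse code; it assumes `tokio` (rt + macros), `futures`, and `tokio-util` with the `time` feature:

```rust
// Standalone sketch, not Lighthouse code: the lifecycle of a per-substream
// timeout entry in a DelayQueue (the same queue type the RPC handler uses).
use futures::StreamExt;
use std::time::Duration;
use tokio_util::time::DelayQueue;

#[tokio::main]
async fn main() {
    // One entry per inbound substream; the value here is a substream id.
    let mut timeouts: DelayQueue<u64> = DelayQueue::new();
    let key = timeouts.insert(0u64, Duration::from_secs(10));

    // After each chunk is flushed, the next chunk gets a fresh window,
    // rather than the original deadline continuing to run.
    timeouts.reset(&key, Duration::from_secs(10));

    // Once the substream closes, the entry must be removed. If this step is
    // skipped, the entry later expires and surfaces as a "Stream Timeout"
    // even though the response completed.
    timeouts.remove(&key);

    // Nothing is left to expire.
    assert!(timeouts.next().await.is_none());
}
```

If the `remove` step is skipped, the entry eventually expires and surfaces as the spurious `Stream Timeout` errors seen in the log above.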
---
 .../src/peer_manager/mod.rs                   |  10 +-
 .../lighthouse_network/src/rpc/handler.rs     | 238 ++++++++++--------
 .../beacon_processor/worker/rpc_methods.rs    |   4 +-
 3 files changed, 149 insertions(+), 103 deletions(-)

diff --git a/beacon_node/lighthouse_network/src/peer_manager/mod.rs b/beacon_node/lighthouse_network/src/peer_manager/mod.rs
index b8ca40bae..decc1ccd1 100644
--- a/beacon_node/lighthouse_network/src/peer_manager/mod.rs
+++ b/beacon_node/lighthouse_network/src/peer_manager/mod.rs
@@ -421,7 +421,11 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
                 // They closed early, this could mean poor connection
                 PeerAction::MidToleranceError
             }
-            RPCError::InternalError(_) | RPCError::HandlerRejected => {
+            RPCError::InternalError(e) => {
+                debug!(self.log, "Internal RPC Error"; "error" => %e, "peer_id" => %peer_id);
+                return;
+            }
+            RPCError::HandlerRejected => {
                 // Our fault. Do nothing
                 return;
             }
@@ -478,8 +482,8 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
             }
             RPCError::StreamTimeout => match direction {
                 ConnectionDirection::Incoming => {
-                    // we timed out
-                    warn!(self.log, "Timed out to a peer's request. Likely insufficient resources, reduce peer count");
+                    // There was a timeout responding to a peer.
+                    debug!(self.log, "Timed out responding to RPC Request"; "peer_id" => %peer_id);
                     return;
                 }
                 ConnectionDirection::Outgoing => match protocol {
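An editorial note on the handler changes that follow: the poll loop moves the substream state machine by value out from behind a `&mut` reference using `std::mem::replace` with a `Poisoned` placeholder. A minimal sketch of that pattern, with illustrative names rather than the Lighthouse types:

```rust
// Illustrative names only, not the Lighthouse types: the take-and-poison
// pattern the handler's poll loop uses to move a state machine by value
// from behind a mutable reference.
enum State {
    // Owns the substream by value in the real handler.
    Idle(String),
    // Placeholder left behind while the owned state is moved out.
    Poisoned,
}

fn advance(state: &mut State) {
    // Take ownership of the current state, leaving `Poisoned` in its place.
    match std::mem::replace(state, State::Poisoned) {
        State::Idle(substream) => {
            // ... consume `substream` by value (e.g. build a send future) ...
            // and always write a valid state back before moving on.
            *state = State::Idle(substream);
        }
        // Never observed as long as every arm restores a valid state.
        State::Poisoned => unreachable!("a valid state is always restored"),
    }
}

fn main() {
    let mut state = State::Idle("substream".to_string());
    advance(&mut state);
}
```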
diff --git a/beacon_node/lighthouse_network/src/rpc/handler.rs b/beacon_node/lighthouse_network/src/rpc/handler.rs
index b07d1d4fd..1a12c2600 100644
--- a/beacon_node/lighthouse_network/src/rpc/handler.rs
+++ b/beacon_node/lighthouse_network/src/rpc/handler.rs
@@ -22,11 +22,11 @@ use libp2p::swarm::NegotiatedSubstream;
 use slog::{crit, debug, trace, warn};
 use smallvec::SmallVec;
 use std::{
-    collections::hash_map::Entry,
+    collections::{hash_map::Entry, VecDeque},
     pin::Pin,
     sync::Arc,
     task::{Context, Poll},
-    time::Duration,
+    time::{Duration, Instant},
 };
 use tokio::time::{sleep_until, Instant as TInstant, Sleep};
 use tokio_util::time::{delay_queue, DelayQueue};
@@ -47,14 +47,6 @@ pub struct SubstreamId(usize);

 type InboundSubstream<TSpec> = InboundFramed<NegotiatedSubstream, TSpec>;

-/// Output of the future handling the send of responses to a peer's request.
-type InboundProcessingOutput<TSpec> = (
-    InboundSubstream<TSpec>, /* substream */
-    Vec<RPCError>,           /* Errors sending messages if any */
-    bool,                    /* whether to remove the stream afterwards */
-    u64,                     /* Chunks remaining to be sent after this processing finishes */
-);
-
 /// Events the handler emits to the behaviour.
 type HandlerEvent<T> = Result<RPCReceived<T>, HandlerErr>;

@@ -157,11 +149,14 @@ struct InboundInfo<TSpec: EthSpec> {
     /// State of the substream.
     state: InboundState<TSpec>,
     /// Responses queued for sending.
-    pending_items: Vec<RPCCodedResponse<TSpec>>,
+    pending_items: VecDeque<RPCCodedResponse<TSpec>>,
     /// Protocol of the original request we received from the peer.
     protocol: Protocol,
     /// Responses that the peer is still expecting from us.
     remaining_chunks: u64,
+    /// Useful for timing how long each request takes to process. Currently only used by
+    /// BlocksByRange.
+    request_start_time: Instant,
     /// Key to keep track of the substream's timeout via `self.inbound_substreams_delay`.
     delay_key: Option<delay_queue::Key>,
 }
@@ -185,7 +180,9 @@ enum InboundState<TSpec: EthSpec> {
     /// The underlying substream is not being used.
     Idle(InboundSubstream<TSpec>),
     /// The underlying substream is processing responses.
-    Busy(Pin<Box<dyn Future<Output = InboundProcessingOutput<TSpec>> + Send>>),
+    /// The return value of the future is (substream, stream_was_closed). The stream_was_closed
+    /// boolean indicates whether the stream was closed, either due to an error or after
+    /// successfully completing a response.
+    Busy(Pin<Box<dyn Future<Output = Result<(InboundSubstream<TSpec>, bool), RPCError>> + Send>>),
     /// Temporary state during processing
     Poisoned,
 }
@@ -308,7 +305,7 @@ where
                     "response" => %response, "id" => inbound_id);
                 return;
             }
-            inbound_info.pending_items.push(response);
+            inbound_info.pending_items.push_back(response);
         }
     }
@@ -353,9 +350,10 @@ where
             self.current_inbound_substream_id,
             InboundInfo {
                 state: awaiting_stream,
-                pending_items: vec![],
+                pending_items: VecDeque::with_capacity(expected_responses as usize),
                 delay_key: Some(delay_key),
                 protocol: req.protocol(),
+                request_start_time: Instant::now(),
                 remaining_chunks: expected_responses,
             },
         );
@@ -565,9 +563,9 @@ where
                     id: *inbound_id.get_ref(),
                 }));

-                if info.pending_items.last().map(|l| l.close_after()) == Some(false) {
+                if info.pending_items.back().map(|l| l.close_after()) == Some(false) {
                     // if the last chunk does not close the stream, append an error
-                    info.pending_items.push(RPCCodedResponse::Error(
+                    info.pending_items.push_back(RPCCodedResponse::Error(
                         RPCResponseErrorCode::ServerError,
                         "Request timed out".into(),
                     ));
@@ -621,32 +619,43 @@ where
         for (id, info) in self.inbound_substreams.iter_mut() {
             loop {
                 match std::mem::replace(&mut info.state, InboundState::Poisoned) {
+                    // This state indicates that we are not currently sending any messages to the
+                    // peer. We need to check if there are messages to send, and if so, start the
+                    // sending process.
                     InboundState::Idle(substream) if !deactivated => {
-                        if !info.pending_items.is_empty() {
-                            let to_send = std::mem::take(&mut info.pending_items);
-                            let fut = process_inbound_substream(
-                                substream,
-                                info.remaining_chunks,
-                                to_send,
-                            )
-                            .boxed();
+                        // Process one more message if one exists.
+                        if let Some(message) = info.pending_items.pop_front() {
+                            // If this is the last chunk, terminate the stream.
+                            let last_chunk = info.remaining_chunks <= 1;
+                            let fut =
+                                send_message_to_inbound_substream(substream, message, last_chunk)
+                                    .boxed();
+                            // Update the state and try to process this further.
                             info.state = InboundState::Busy(Box::pin(fut));
                         } else {
+                            // There is nothing left to process. Set the stream to idle and
+                            // move on to the next one.
                             info.state = InboundState::Idle(substream);
                             break;
                         }
                     }
+                    // This state indicates we are not sending at the moment, and the handler is in
+                    // the process of closing the connection to the peer.
                     InboundState::Idle(mut substream) => {
-                        // handler is deactivated, close the stream and mark it for removal
+                        // Handler is deactivated, close the stream and mark it for removal
                         match substream.close().poll_unpin(cx) {
-                            // if we can't close right now, put the substream back and try again later
+                            // if we can't close right now, put the substream back and try again
+                            // immediately; continue to do this until we close the substream.
                             Poll::Pending => info.state = InboundState::Idle(substream),
                             Poll::Ready(res) => {
-                                // The substream closed, we remove it
+                                // The substream closed, so we remove it from the mapping and remove
+                                // the timeout.
                                 substreams_to_remove.push(*id);
                                 if let Some(ref delay_key) = info.delay_key {
                                     self.inbound_substreams_delay.remove(delay_key);
                                 }
+                                // If there was an error in shutting down the substream, report the
+                                // error.
                                 if let Err(error) = res {
                                     self.events_out.push(Err(HandlerErr::Inbound {
                                         error,
@@ -654,7 +663,10 @@ where
                                         id: *id,
                                     }));
                                 }
-                                if info.pending_items.last().map(|l| l.close_after()) == Some(false)
+                                // If there are still requests to send, report that we are in the
+                                // process of closing a connection to the peer and that we are not
+                                // processing these excess requests.
+                                if info.pending_items.back().map(|l| l.close_after()) == Some(false)
                                 {
                                     // if the request was still active, report back to cancel it
                                     self.events_out.push(Err(HandlerErr::Inbound {
@@ -667,53 +679,91 @@ where
                                 }
                                 break;
                             }
+                            // This state indicates that there are messages to send back to the
+                            // peer. The future here is built by the
+                            // `send_message_to_inbound_substream` function. Its output is the
+                            // substream together with whether it was closed in this operation.
                             InboundState::Busy(mut fut) => {
-                                // first check if sending finished
+                                // Check if the future has completed (i.e., whether we have
+                                // completed sending all of our pending items).
                                 match fut.poll_unpin(cx) {
-                                    Poll::Ready((substream, errors, remove, new_remaining_chunks)) => {
-                                        info.remaining_chunks = new_remaining_chunks;
-                                        // report any error
-                                        for error in errors {
-                                            self.events_out.push(Err(HandlerErr::Inbound {
-                                                error,
-                                                proto: info.protocol,
-                                                id: *id,
-                                            }))
-                                        }
-                                        if remove {
-                                            substreams_to_remove.push(*id);
-                                            if let Some(ref delay_key) = info.delay_key {
-                                                self.inbound_substreams_delay.remove(delay_key);
-                                            }
-                                            break;
-                                        } else {
-                                            // If we are not removing this substream, we reset the timer.
-                                            // Each chunk is allowed RESPONSE_TIMEOUT to be sent.
-                                            if let Some(ref delay_key) = info.delay_key {
-                                                self.inbound_substreams_delay.reset(
-                                                    delay_key,
-                                                    Duration::from_secs(RESPONSE_TIMEOUT),
-                                                );
-                                            }
+                                    // The pending messages have been sent successfully
+                                    Poll::Ready(Ok((substream, substream_was_closed)))
+                                        if !substream_was_closed =>
+                                    {
+                                        // The substream is still active; decrement the remaining
+                                        // chunks expected.
+                                        info.remaining_chunks = info.remaining_chunks.saturating_sub(1);
+
+                                        // If this substream has not ended, we reset the timer.
+                                        // Each chunk is allowed RESPONSE_TIMEOUT to be sent.
+                                        if let Some(ref delay_key) = info.delay_key {
+                                            self.inbound_substreams_delay
+                                                .reset(delay_key, Duration::from_secs(RESPONSE_TIMEOUT));
                                         }

                                         // The stream may be currently idle. Attempt to process more
                                         // elements
                                         if !deactivated && !info.pending_items.is_empty() {
-                                            let to_send = std::mem::take(&mut info.pending_items);
-                                            let fut = process_inbound_substream(
-                                                substream,
-                                                info.remaining_chunks,
-                                                to_send,
-                                            )
-                                            .boxed();
-                                            info.state = InboundState::Busy(Box::pin(fut));
+                                            // Process one more message if one exists.
+                                            if let Some(message) = info.pending_items.pop_front() {
+                                                // If this is the last chunk, terminate the stream.
+                                                let last_chunk = info.remaining_chunks <= 1;
+                                                let fut = send_message_to_inbound_substream(
+                                                    substream, message, last_chunk,
+                                                )
+                                                .boxed();
+                                                // Update the state and try to process this further.
+                                                info.state = InboundState::Busy(Box::pin(fut));
+                                            }
                                         } else {
+                                            // There is nothing left to process. Set the stream to
+                                            // idle and move on to the next one.
                                             info.state = InboundState::Idle(substream);
                                             break;
                                         }
                                     }
+                                    // The pending messages have been sent successfully and the
+                                    // stream has terminated
+                                    Poll::Ready(Ok((_substream, _substream_was_closed))) => {
+                                        // The substream has closed. Remove the timeout related to
+                                        // the substream.
+                                        substreams_to_remove.push(*id);
+                                        if let Some(ref delay_key) = info.delay_key {
+                                            self.inbound_substreams_delay.remove(delay_key);
+                                        }
+
+                                        // BlocksByRange is the one that typically consumes the most
+                                        // time. It's useful to log when the request was completed.
+                                        if matches!(info.protocol, Protocol::BlocksByRange) {
+                                            debug!(self.log, "BlocksByRange Response sent"; "duration" => Instant::now().duration_since(info.request_start_time).as_secs());
+                                        }
+
+                                        // There is nothing more to process on this substream as it
+                                        // has been closed. Move on to the next one.
+                                        break;
+                                    }
+                                    // An error occurred when trying to send a response.
+                                    // This means we terminate the substream.
+                                    Poll::Ready(Err(error)) => {
+                                        // Remove the stream timeout from the mapping
+                                        substreams_to_remove.push(*id);
+                                        if let Some(ref delay_key) = info.delay_key {
+                                            self.inbound_substreams_delay.remove(delay_key);
+                                        }
+                                        // Report the error that occurred during the send process
+                                        self.events_out.push(Err(HandlerErr::Inbound {
+                                            error,
+                                            proto: info.protocol,
+                                            id: *id,
+                                        }));
+
+                                        if matches!(info.protocol, Protocol::BlocksByRange) {
+                                            debug!(self.log, "BlocksByRange Response failed"; "duration" => info.request_start_time.elapsed().as_secs());
+                                        }
+                                        break;
+                                    }
+                                    // The sending future has not completed. Leave the state as busy
+                                    // and try to progress later.
                                     Poll::Pending => {
                                         info.state = InboundState::Busy(fut);
                                         break;
@@ -725,7 +775,7 @@ where
             }
         }

-        // remove closed substreams
+        // Remove closed substreams
         for inbound_id in substreams_to_remove {
             self.inbound_substreams.remove(&inbound_id);
         }
@@ -935,44 +985,36 @@ impl slog::Value for SubstreamId {
     }
 }

-/// Sends the queued items to the peer.
-async fn process_inbound_substream<TSpec: EthSpec>(
+/// Creates a future that can be polled and that will send any queued message to the peer.
+///
+/// This function returns the given substream, along with whether it has been closed or not. Any
+/// error that occurred while sending a message is also reported.
+async fn send_message_to_inbound_substream<TSpec: EthSpec>(
     mut substream: InboundSubstream<TSpec>,
-    mut remaining_chunks: u64,
-    pending_items: Vec<RPCCodedResponse<TSpec>>,
-) -> InboundProcessingOutput<TSpec> {
-    let mut errors = Vec::new();
-    let mut substream_closed = false;
+    message: RPCCodedResponse<TSpec>,
+    last_chunk: bool,
+) -> Result<(InboundSubstream<TSpec>, bool), RPCError> {
+    if matches!(message, RPCCodedResponse::StreamTermination(_)) {
+        substream.close().await.map(|_| (substream, true))
+    } else {
+        // chunks that are not stream terminations get sent, and the stream is closed if
+        // the response is an error
+        let is_error = matches!(message, RPCCodedResponse::Error(..));
-
-    for item in pending_items {
-        if !substream_closed {
-            if matches!(item, RPCCodedResponse::StreamTermination(_)) {
-                substream.close().await.unwrap_or_else(|e| errors.push(e));
-                substream_closed = true;
+
+        let send_result = substream.send(message).await;
+
+        // If we need to close the substream, do so and return the result.
+        if last_chunk || is_error || send_result.is_err() {
+            let close_result = substream.close().await.map(|_| (substream, true));
+            // If there was an error in sending, return this error; otherwise, return the
+            // result of closing the substream.
+            if let Err(e) = send_result {
+                return Err(e);
+            } else {
+                return close_result;
+            }
+        }
-                remaining_chunks = remaining_chunks.saturating_sub(1);
-                // chunks that are not stream terminations get sent, and the stream is closed if
-                // the response is an error
-                let is_error = matches!(item, RPCCodedResponse::Error(..));
-
-                substream
-                    .send(item)
-                    .await
-                    .unwrap_or_else(|e| errors.push(e));
-
-                if remaining_chunks == 0 || is_error {
-                    substream.close().await.unwrap_or_else(|e| errors.push(e));
-                    substream_closed = true;
-                }
-            }
-        } else if matches!(item, RPCCodedResponse::StreamTermination(_)) {
-            // The sender closed the stream before us, ignore this.
-        } else {
-            // we have more items after a closed substream, report those as errors
-            errors.push(RPCError::InternalError(
-                "Sending responses to closed inbound substream",
-            ));
-        }
+        // Everything worked as expected; return the result.
+        send_result.map(|_| (substream, false))
     }
-    (substream, errors, substream_closed, remaining_chunks)
 }
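For clarity, the contract the new per-chunk send future gives the poll loop above can be modeled in isolation: send exactly one chunk, then resolve to the substream plus a flag saying whether this call closed it, so the caller knows whether to reset the timeout or remove it. A simplified generic sketch follows; the names and trait bounds are assumptions for illustration, not the actual Lighthouse signature:

```rust
// Simplified, generic model of the per-chunk send contract; `send_one`,
// `S`, and `M` are illustrative stand-ins for the real framed substream
// and response types.
use futures::{Sink, SinkExt};

/// Sends exactly one message, returning the sink plus `true` if this call
/// closed it. The caller resets the timeout on `false` and removes it on
/// `true`.
async fn send_one<S, M>(mut sink: S, msg: M, last_chunk: bool) -> Result<(S, bool), S::Error>
where
    S: Sink<M> + Unpin,
{
    sink.send(msg).await?;
    if last_chunk {
        // Closing here lets the caller drop the timeout entry entirely.
        sink.close().await?;
        return Ok((sink, true));
    }
    // Still open: the caller should give the next chunk a fresh timeout.
    Ok((sink, false))
}

fn main() {
    // Exercise the contract against an in-memory channel acting as the sink.
    let (tx, rx) = futures::channel::mpsc::channel::<u32>(8);
    let res = futures::executor::block_on(send_one(tx, 1u32, true));
    assert!(res.map(|(_, closed)| closed).unwrap_or(false));
    drop(rx);
}
```

Driving one chunk per future is what allows the handler to reset the `DelayQueue` entry between chunks, instead of holding a single deadline across an entire multi-chunk response.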
diff --git a/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs b/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs
index 7a5d483d9..f3d49c2b4 100644
--- a/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs
+++ b/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs
@@ -255,7 +255,7 @@ impl<T: BeaconChainTypes> Worker<T> {
             .unwrap_or_else(|_| self.chain.slot_clock.genesis_slot());

         if blocks_sent < (req.count as usize) {
-            debug!(self.log, "BlocksByRange Response sent";
+            debug!(self.log, "BlocksByRange Response processed";
                 "peer" => %peer_id,
                 "msg" => "Failed to return all requested blocks",
                 "start_slot" => req.start_slot,
@@ -263,7 +263,7 @@
                 "requested" => req.count,
                 "returned" => blocks_sent);
         } else {
-            debug!(self.log, "BlocksByRange Response sent";
+            debug!(self.log, "BlocksByRange Response processed";
                 "peer" => %peer_id,
                 "start_slot" => req.start_slot,
                 "current_slot" => current_slot,