Investigate and correct RPC Response Timeouts (#2804)

For some reason, RPC responses are not removing their timeouts when they complete.

As an example:

```
Nov 09 01:18:20.256 DEBG Received BlocksByRange Request          step: 1, start_slot: 728465, count: 64, peer_id: 16Uiu2HAmEmBURejquBUMgKAqxViNoPnSptTWLA2CfgSPnnKENBNw
Nov 09 01:18:20.263 DEBG Received BlocksByRange Request          step: 1, start_slot: 728593, count: 64, peer_id: 16Uiu2HAmEmBURejquBUMgKAqxViNoPnSptTWLA2CfgSPnnKENBNw
Nov 09 01:18:20.483 DEBG BlocksByRange Response sent             returned: 63, requested: 64, current_slot: 2466389, start_slot: 728465, msg: Failed to return all requested blocks, peer: 16Uiu2HAmEmBURejquBUMgKAqxViNoPnSptTWLA2CfgSPnnKENBNw
Nov 09 01:18:20.500 DEBG BlocksByRange Response sent             returned: 64, requested: 64, current_slot: 2466389, start_slot: 728593, peer: 16Uiu2HAmEmBURejquBUMgKAqxViNoPnSptTWLA2CfgSPnnKENBNw
Nov 09 01:18:21.068 DEBG Received BlocksByRange Request          step: 1, start_slot: 728529, count: 64, peer_id: 16Uiu2HAmEmBURejquBUMgKAqxViNoPnSptTWLA2CfgSPnnKENBNw
Nov 09 01:18:21.272 DEBG BlocksByRange Response sent             returned: 63, requested: 64, current_slot: 2466389, start_slot: 728529, msg: Failed to return all requested blocks, peer: 16Uiu2HAmEmBURejquBUMgKAqxViNoPnSptTWLA2CfgSPnnKENBNw
Nov 09 01:18:23.434 DEBG Received BlocksByRange Request          step: 1, start_slot: 728657, count: 64, peer_id: 16Uiu2HAmEmBURejquBUMgKAqxViNoPnSptTWLA2CfgSPnnKENBNw
Nov 09 01:18:23.665 DEBG BlocksByRange Response sent             returned: 64, requested: 64, current_slot: 2466390, start_slot: 728657, peer: 16Uiu2HAmEmBURejquBUMgKAqxViNoPnSptTWLA2CfgSPnnKENBNw
Nov 09 01:18:25.851 DEBG Received BlocksByRange Request          step: 1, start_slot: 728337, count: 64, peer_id: 16Uiu2HAmEmBURejquBUMgKAqxViNoPnSptTWLA2CfgSPnnKENBNw
Nov 09 01:18:25.851 DEBG Received BlocksByRange Request          step: 1, start_slot: 728401, count: 64, peer_id: 16Uiu2HAmEmBURejquBUMgKAqxViNoPnSptTWLA2CfgSPnnKENBNw
Nov 09 01:18:26.094 DEBG BlocksByRange Response sent             returned: 62, requested: 64, current_slot: 2466390, start_slot: 728401, msg: Failed to return all requested blocks, peer: 16Uiu2HAmEmBURejquBUMgKAqxViNoPnSptTWLA2CfgSPnnKENBNw
Nov 09 01:18:26.100 DEBG BlocksByRange Response sent             returned: 63, requested: 64, current_slot: 2466390, start_slot: 728337, msg: Failed to return all requested blocks, peer: 16Uiu2HAmEmBURejquBUMgKAqxViNoPnSptTWLA2CfgSPnnKENBNw
Nov 09 01:18:31.070 DEBG RPC Error                               direction: Incoming, score: 0, peer_id: 16Uiu2HAmEmBURejquBUMgKAqxViNoPnSptTWLA2CfgSPnnKENBNw, client: Prysm: version: a80b1c252a9b4773493b41999769bf3134ac373f, os_version: unknown, err: Stream Timeout, protocol: beacon_blocks_by_range, service: libp2p
Nov 09 01:18:31.070 WARN Timed out to a peer's request. Likely insufficient resources, reduce peer count, service: libp2p
Nov 09 01:18:31.085 DEBG RPC Error                               direction: Incoming, score: 0, peer_id: 16Uiu2HAmEmBURejquBUMgKAqxViNoPnSptTWLA2CfgSPnnKENBNw, client: Prysm: version: a80b1c252a9b4773493b41999769bf3134ac373f, os_version: unknown, err: Stream Timeout, protocol: beacon_blocks_by_range, service: libp2p
Nov 09 01:18:31.085 WARN Timed out to a peer's request. Likely insufficient resources, reduce peer count, service: libp2p
Nov 09 01:18:31.459 DEBG RPC Error                               direction: Incoming, score: 0, peer_id: 16Uiu2HAmEmBURejquBUMgKAqxViNoPnSptTWLA2CfgSPnnKENBNw, client: Prysm: version: a80b1c252a9b4773493b41999769bf3134ac373f, os_version: unknown, err: Stream Timeout, protocol: beacon_blocks_by_range, service: libp2p
Nov 09 01:18:31.459 WARN Timed out to a peer's request. Likely insufficient resources, reduce peer count, service: libp2p
Nov 09 01:18:34.129 DEBG RPC Error                               direction: Incoming, score: 0, peer_id: 16Uiu2HAmEmBURejquBUMgKAqxViNoPnSptTWLA2CfgSPnnKENBNw, client: Prysm: version: a80b1c252a9b4773493b41999769bf3134ac373f, os_version: unknown, err: Stream Timeout, protocol: beacon_blocks_by_range, service: libp2p
Nov 09 01:18:34.130 WARN Timed out to a peer's request. Likely insufficient resources, reduce peer count, service: libp2p
Nov 09 01:18:35.686 DEBG Peer Manager disconnecting peer         reason: Too many peers, peer_id: 16Uiu2HAmEmBURejquBUMgKAqxViNoPnSptTWLA2CfgSPnnKENBNw, service: libp2p
```

Note that in the logs above the responses are sent within a few hundred milliseconds, yet `Stream Timeout` errors are raised roughly ten seconds after the corresponding requests, i.e. the timeout fires even though the response has already completed. This PR investigates and corrects the issue.

~~My current thoughts are that for some reason we are not closing the streams correctly, or fast enough, or the executor is not registering the closes and waking up.~~ - I'm fairly sure this is not the case; see the message below for a more accurate explanation.

~~I've added a timeout to stream closures in an attempt to force streams to close and the future to always complete.~~ I have since removed this.
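
For context, the RPC handler keeps a per-substream timeout in a `DelayQueue` (the `inbound_substreams_delay` seen in the diff below): the timer should be reset after each response chunk is sent and removed once the substream closes. Below is a minimal sketch of that bookkeeping; the wrapper struct, method names and the `RESPONSE_TIMEOUT` value are assumed for illustration and are not the handler's actual API.

```rust
use std::time::Duration;
use tokio_util::time::{delay_queue, DelayQueue};

// Assumed per-chunk timeout for this sketch; the real constant lives in the RPC handler.
const RESPONSE_TIMEOUT: u64 = 10;

/// Hypothetical wrapper around the handler's `inbound_substreams_delay` queue.
struct InboundTimeouts {
    delays: DelayQueue<usize>, // each entry maps back to a substream id
}

impl InboundTimeouts {
    /// Register a new inbound substream and start its first chunk timeout.
    fn register(&mut self, substream_id: usize) -> delay_queue::Key {
        self.delays
            .insert(substream_id, Duration::from_secs(RESPONSE_TIMEOUT))
    }

    /// After a chunk is sent, the peer gets a fresh window for the next chunk.
    fn chunk_sent(&mut self, key: &delay_queue::Key) {
        self.delays.reset(key, Duration::from_secs(RESPONSE_TIMEOUT));
    }

    /// When the substream finishes or errors, the pending timeout must be
    /// removed; leaving it in the queue is what later surfaces as a spurious
    /// `Stream Timeout` error like the ones in the logs above.
    fn substream_closed(&mut self, key: &delay_queue::Key) {
        self.delays.remove(key);
    }
}
```

The diff below sends one queued response at a time, resetting the timer after each chunk, and removes the timeout on every path where the substream closes or errors.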
Age Manning 2021-11-16 03:42:25 +00:00
parent 58b04acf28
commit a43a2448b7
3 changed files with 149 additions and 103 deletions


@@ -421,7 +421,11 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
// They closed early, this could mean poor connection
PeerAction::MidToleranceError
}
RPCError::InternalError(_) | RPCError::HandlerRejected => {
RPCError::InternalError(e) => {
debug!(self.log, "Internal RPC Error"; "error" => %e, "peer_id" => %peer_id);
return;
}
RPCError::HandlerRejected => {
// Our fault. Do nothing
return;
}
@@ -478,8 +482,8 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
}
RPCError::StreamTimeout => match direction {
ConnectionDirection::Incoming => {
// we timed out
warn!(self.log, "Timed out to a peer's request. Likely insufficient resources, reduce peer count");
// There was a timeout responding to a peer.
debug!(self.log, "Timed out responding to RPC Request"; "peer_id" => %peer_id);
return;
}
ConnectionDirection::Outgoing => match protocol {


@@ -22,11 +22,11 @@ use libp2p::swarm::NegotiatedSubstream;
use slog::{crit, debug, trace, warn};
use smallvec::SmallVec;
use std::{
collections::hash_map::Entry,
collections::{hash_map::Entry, VecDeque},
pin::Pin,
sync::Arc,
task::{Context, Poll},
time::Duration,
time::{Duration, Instant},
};
use tokio::time::{sleep_until, Instant as TInstant, Sleep};
use tokio_util::time::{delay_queue, DelayQueue};
@@ -47,14 +47,6 @@ pub struct SubstreamId(usize);
type InboundSubstream<TSpec> = InboundFramed<NegotiatedSubstream, TSpec>;
/// Output of the future handling the send of responses to a peer's request.
type InboundProcessingOutput<TSpec> = (
InboundSubstream<TSpec>, /* substream */
Vec<RPCError>, /* Errors sending messages if any */
bool, /* whether to remove the stream afterwards */
u64, /* Chunks remaining to be sent after this processing finishes */
);
/// Events the handler emits to the behaviour.
type HandlerEvent<T> = Result<RPCReceived<T>, HandlerErr>;
@@ -157,11 +149,14 @@ struct InboundInfo<TSpec: EthSpec> {
/// State of the substream.
state: InboundState<TSpec>,
/// Responses queued for sending.
pending_items: Vec<RPCCodedResponse<TSpec>>,
pending_items: VecDeque<RPCCodedResponse<TSpec>>,
/// Protocol of the original request we received from the peer.
protocol: Protocol,
/// Responses that the peer is still expecting from us.
remaining_chunks: u64,
/// Useful to timing how long each request took to process. Currently only used by
/// BlocksByRange.
request_start_time: Instant,
/// Key to keep track of the substream's timeout via `self.inbound_substreams_delay`.
delay_key: Option<delay_queue::Key>,
}
@@ -185,7 +180,9 @@ enum InboundState<TSpec: EthSpec> {
/// The underlying substream is not being used.
Idle(InboundSubstream<TSpec>),
/// The underlying substream is processing responses.
Busy(Pin<Box<dyn Future<Output = InboundProcessingOutput<TSpec>> + Send>>),
/// The return value of the future is (substream, stream_was_closed). The stream_was_closed boolean
/// indicates if the stream was closed due to an error or successfully completing a response.
Busy(Pin<Box<dyn Future<Output = Result<(InboundSubstream<TSpec>, bool), RPCError>> + Send>>),
/// Temporary state during processing
Poisoned,
}
@@ -308,7 +305,7 @@ where
"response" => %response, "id" => inbound_id);
return;
}
inbound_info.pending_items.push(response);
inbound_info.pending_items.push_back(response);
}
}
@@ -353,9 +350,10 @@ where
self.current_inbound_substream_id,
InboundInfo {
state: awaiting_stream,
pending_items: vec![],
pending_items: VecDeque::with_capacity(expected_responses as usize),
delay_key: Some(delay_key),
protocol: req.protocol(),
request_start_time: Instant::now(),
remaining_chunks: expected_responses,
},
);
@@ -565,9 +563,9 @@ where
id: *inbound_id.get_ref(),
}));
if info.pending_items.last().map(|l| l.close_after()) == Some(false) {
if info.pending_items.back().map(|l| l.close_after()) == Some(false) {
// if the last chunk does not close the stream, append an error
info.pending_items.push(RPCCodedResponse::Error(
info.pending_items.push_back(RPCCodedResponse::Error(
RPCResponseErrorCode::ServerError,
"Request timed out".into(),
));
@@ -621,32 +619,43 @@ where
for (id, info) in self.inbound_substreams.iter_mut() {
loop {
match std::mem::replace(&mut info.state, InboundState::Poisoned) {
// This state indicates that we are not currently sending any messages to the
// peer. We need to check if there are messages to send, if so, start the
// sending process.
InboundState::Idle(substream) if !deactivated => {
if !info.pending_items.is_empty() {
let to_send = std::mem::take(&mut info.pending_items);
let fut = process_inbound_substream(
substream,
info.remaining_chunks,
to_send,
)
.boxed();
// Process one more message if one exists.
if let Some(message) = info.pending_items.pop_front() {
// If this is the last chunk, terminate the stream.
let last_chunk = info.remaining_chunks <= 1;
let fut =
send_message_to_inbound_substream(substream, message, last_chunk)
.boxed();
// Update the state and try to process this further.
info.state = InboundState::Busy(Box::pin(fut));
} else {
// There is nothing left to process. Set the stream to idle and
// move on to the next one.
info.state = InboundState::Idle(substream);
break;
}
}
// This state indicates we are not sending at the moment, and the handler is in
// the process of closing the connection to the peer.
InboundState::Idle(mut substream) => {
// handler is deactivated, close the stream and mark it for removal
// Handler is deactivated, close the stream and mark it for removal
match substream.close().poll_unpin(cx) {
// if we can't close right now, put the substream back and try again later
// if we can't close right now, put the substream back and try again
// immediately, continue to do this until we close the substream.
Poll::Pending => info.state = InboundState::Idle(substream),
Poll::Ready(res) => {
// The substream closed, we remove it
// The substream closed, we remove it from the mapping and remove
// the timeout
substreams_to_remove.push(*id);
if let Some(ref delay_key) = info.delay_key {
self.inbound_substreams_delay.remove(delay_key);
}
// If there was an error in shutting down the substream report the
// error
if let Err(error) = res {
self.events_out.push(Err(HandlerErr::Inbound {
error,
@@ -654,7 +663,10 @@ where
id: *id,
}));
}
if info.pending_items.last().map(|l| l.close_after()) == Some(false)
// If there are still requests to send, report that we are in the
// process of closing a connection to the peer and that we are not
// processing these excess requests.
if info.pending_items.back().map(|l| l.close_after()) == Some(false)
{
// if the request was still active, report back to cancel it
self.events_out.push(Err(HandlerErr::Inbound {
@@ -667,53 +679,91 @@ where
}
break;
}
// This state indicates that there are messages to send back to the peer.
// The future here is built by the `process_inbound_substream` function. The
// output returns a substream and whether it was closed in this operation.
InboundState::Busy(mut fut) => {
// first check if sending finished
// Check if the future has completed (i.e we have completed sending all our
// pending items)
match fut.poll_unpin(cx) {
Poll::Ready((substream, errors, remove, new_remaining_chunks)) => {
info.remaining_chunks = new_remaining_chunks;
// report any error
for error in errors {
self.events_out.push(Err(HandlerErr::Inbound {
error,
proto: info.protocol,
id: *id,
}))
}
if remove {
substreams_to_remove.push(*id);
if let Some(ref delay_key) = info.delay_key {
self.inbound_substreams_delay.remove(delay_key);
}
break;
} else {
// If we are not removing this substream, we reset the timer.
// Each chunk is allowed RESPONSE_TIMEOUT to be sent.
if let Some(ref delay_key) = info.delay_key {
self.inbound_substreams_delay.reset(
delay_key,
Duration::from_secs(RESPONSE_TIMEOUT),
);
}
// The pending messages have been sent successfully
Poll::Ready(Ok((substream, substream_was_closed)))
if !substream_was_closed =>
{
// The substream is still active, decrement the remaining
// chunks expected.
info.remaining_chunks = info.remaining_chunks.saturating_sub(1);
// If this substream has not ended, we reset the timer.
// Each chunk is allowed RESPONSE_TIMEOUT to be sent.
if let Some(ref delay_key) = info.delay_key {
self.inbound_substreams_delay
.reset(delay_key, Duration::from_secs(RESPONSE_TIMEOUT));
}
// The stream may be currently idle. Attempt to process more
// elements
if !deactivated && !info.pending_items.is_empty() {
let to_send = std::mem::take(&mut info.pending_items);
let fut = process_inbound_substream(
substream,
info.remaining_chunks,
to_send,
)
.boxed();
info.state = InboundState::Busy(Box::pin(fut));
// Process one more message if one exists.
if let Some(message) = info.pending_items.pop_front() {
// If this is the last chunk, terminate the stream.
let last_chunk = info.remaining_chunks <= 1;
let fut = send_message_to_inbound_substream(
substream, message, last_chunk,
)
.boxed();
// Update the state and try to process this further.
info.state = InboundState::Busy(Box::pin(fut));
}
} else {
// There is nothing left to process. Set the stream to idle and
// move on to the next one.
info.state = InboundState::Idle(substream);
break;
}
}
// The pending messages have been sent successfully and the stream has
// terminated
Poll::Ready(Ok((_substream, _substream_was_closed))) => {
// The substream has closed. Remove the timeout related to the
// substream.
substreams_to_remove.push(*id);
if let Some(ref delay_key) = info.delay_key {
self.inbound_substreams_delay.remove(delay_key);
}
// BlocksByRange is the one that typically consumes the most time.
// Its useful to log when the request was completed.
if matches!(info.protocol, Protocol::BlocksByRange) {
debug!(self.log, "BlocksByRange Response sent"; "duration" => Instant::now().duration_since(info.request_start_time).as_secs());
}
// There is nothing more to process on this substream as it has
// been closed. Move on to the next one.
break;
}
// An error occurred when trying to send a response.
// This means we terminate the substream.
Poll::Ready(Err(error)) => {
// Remove the stream timeout from the mapping
substreams_to_remove.push(*id);
if let Some(ref delay_key) = info.delay_key {
self.inbound_substreams_delay.remove(delay_key);
}
// Report the error that occurred during the send process
self.events_out.push(Err(HandlerErr::Inbound {
error,
proto: info.protocol,
id: *id,
}));
if matches!(info.protocol, Protocol::BlocksByRange) {
debug!(self.log, "BlocksByRange Response failed"; "duration" => info.request_start_time.elapsed().as_secs());
}
break;
}
// The sending future has not completed. Leave the state as busy and
// try to progress later.
Poll::Pending => {
info.state = InboundState::Busy(fut);
break;
@@ -725,7 +775,7 @@ where
}
}
// remove closed substreams
// Remove closed substreams
for inbound_id in substreams_to_remove {
self.inbound_substreams.remove(&inbound_id);
}
@@ -935,44 +985,36 @@ impl slog::Value for SubstreamId {
}
}
/// Sends the queued items to the peer.
async fn process_inbound_substream<TSpec: EthSpec>(
/// Creates a future that can be polled that will send any queued message to the peer.
///
/// This function returns the given substream, along with whether it has been closed or not. Any
/// error that occurred with sending a message is reported also.
async fn send_message_to_inbound_substream<TSpec: EthSpec>(
mut substream: InboundSubstream<TSpec>,
mut remaining_chunks: u64,
pending_items: Vec<RPCCodedResponse<TSpec>>,
) -> InboundProcessingOutput<TSpec> {
let mut errors = Vec::new();
let mut substream_closed = false;
message: RPCCodedResponse<TSpec>,
last_chunk: bool,
) -> Result<(InboundSubstream<TSpec>, bool), RPCError> {
if matches!(message, RPCCodedResponse::StreamTermination(_)) {
substream.close().await.map(|_| (substream, true))
} else {
// chunks that are not stream terminations get sent, and the stream is closed if
// the response is an error
let is_error = matches!(message, RPCCodedResponse::Error(..));
for item in pending_items {
if !substream_closed {
if matches!(item, RPCCodedResponse::StreamTermination(_)) {
substream.close().await.unwrap_or_else(|e| errors.push(e));
substream_closed = true;
let send_result = substream.send(message).await;
// If we need to close the substream, do so and return the result.
if last_chunk || is_error || send_result.is_err() {
let close_result = substream.close().await.map(|_| (substream, true));
// If there was an error in sending, return this error, otherwise, return the
// result of closing the substream.
if let Err(e) = send_result {
return Err(e);
} else {
remaining_chunks = remaining_chunks.saturating_sub(1);
// chunks that are not stream terminations get sent, and the stream is closed if
// the response is an error
let is_error = matches!(item, RPCCodedResponse::Error(..));
substream
.send(item)
.await
.unwrap_or_else(|e| errors.push(e));
if remaining_chunks == 0 || is_error {
substream.close().await.unwrap_or_else(|e| errors.push(e));
substream_closed = true;
}
return close_result;
}
} else if matches!(item, RPCCodedResponse::StreamTermination(_)) {
// The sender closed the stream before us, ignore this.
} else {
// we have more items after a closed substream, report those as errors
errors.push(RPCError::InternalError(
"Sending responses to closed inbound substream",
));
}
// Everything worked as expected return the result.
send_result.map(|_| (substream, false))
}
(substream, errors, substream_closed, remaining_chunks)
}


@@ -255,7 +255,7 @@ impl<T: BeaconChainTypes> Worker<T> {
.unwrap_or_else(|_| self.chain.slot_clock.genesis_slot());
if blocks_sent < (req.count as usize) {
debug!(self.log, "BlocksByRange Response sent";
debug!(self.log, "BlocksByRange Response processed";
"peer" => %peer_id,
"msg" => "Failed to return all requested blocks",
"start_slot" => req.start_slot,
@@ -263,7 +263,7 @@ impl<T: BeaconChainTypes> Worker<T> {
"requested" => req.count,
"returned" => blocks_sent);
} else {
debug!(self.log, "BlocksByRange Response sent";
debug!(self.log, "BlocksByRange Response processed";
"peer" => %peer_id,
"start_slot" => req.start_slot,
"current_slot" => current_slot,