Refactor inbound substream logic with async (#1325)

## Issue Addressed
#1112 

The logic is slightly different but still valid wrt to error handling.
- Inbound state is either Busy with a future that return the subtream (and info about the processing)
- The state machine works as follows:
  - `Idle` with pending responses => `Busy`
  - `Busy` => finished ? if so and there are new pending responses then `Busy`, if not then `Idle`
               => not finished remains `Busy`
- Add an `InboundInfo` for readability
- Other stuff:
  - Close inbound substreams when all expected responses are sent
  - Remove the error variants from `RPCCodedResponse` and use the codes instead
  - Fix various spelling mistakes because I got sloppy last time

Sorry for the delay

Co-authored-by: Age Manning <Age@AgeManning.com>
This commit is contained in:
divma 2020-07-23 12:30:43 +00:00
parent 3c4daec9af
commit ba10c80633
7 changed files with 232 additions and 520 deletions

8
Cargo.lock generated
View File

@ -885,9 +885,9 @@ checksum = "b3a71ab494c0b5b860bdc8407ae08978052417070c2ced38573a9157ad75b8ac"
[[package]]
name = "cpuid-bool"
version = "0.1.1"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ec6763c20301ab0dc67051d1b6f4cc9132ad9e6eddcb1f10c6c53ea6d6ae2183"
checksum = "8aebca1129a03dc6dc2b127edd729435bbc4a37e1d5f4d7513165089ceb02634"
[[package]]
name = "crc32fast"
@ -5134,9 +5134,9 @@ dependencies = [
[[package]]
name = "syn"
version = "1.0.34"
version = "1.0.35"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "936cae2873c940d92e697597c5eee105fb570cd5689c695806f672883653349b"
checksum = "fb7f4c519df8c117855e19dd8cc851e89eb746fe7a73f0157e0d95fdec5369b0"
dependencies = [
"proc-macro2",
"quote",

View File

@ -256,11 +256,8 @@ impl<TSpec: EthSpec> Behaviour<TSpec> {
error: RPCResponseErrorCode,
reason: String,
) {
self.eth2_rpc.send_response(
peer_id,
id,
RPCCodedResponse::from_error_code(error, reason),
)
self.eth2_rpc
.send_response(peer_id, id, RPCCodedResponse::Error(error, reason.into()))
}
/* Peer management functions */

View File

@ -56,9 +56,7 @@ impl<TSpec: EthSpec> Encoder<RPCCodedResponse<TSpec>> for SSZInboundCodec<TSpec>
RPCResponse::Pong(res) => res.data.as_ssz_bytes(),
RPCResponse::MetaData(res) => res.as_ssz_bytes(),
},
RPCCodedResponse::InvalidRequest(err) => err.as_ssz_bytes(),
RPCCodedResponse::ServerError(err) => err.as_ssz_bytes(),
RPCCodedResponse::Unknown(err) => err.as_ssz_bytes(),
RPCCodedResponse::Error(_, err) => err.as_ssz_bytes(),
RPCCodedResponse::StreamTermination(_) => {
unreachable!("Code error - attempting to encode a stream termination")
}

View File

@ -64,9 +64,7 @@ impl<TSpec: EthSpec> Encoder<RPCCodedResponse<TSpec>> for SSZSnappyInboundCodec<
RPCResponse::Pong(res) => res.data.as_ssz_bytes(),
RPCResponse::MetaData(res) => res.as_ssz_bytes(),
},
RPCCodedResponse::InvalidRequest(err) => err.as_ssz_bytes(),
RPCCodedResponse::ServerError(err) => err.as_ssz_bytes(),
RPCCodedResponse::Unknown(err) => err.as_ssz_bytes(),
RPCCodedResponse::Error(_, err) => err.as_ssz_bytes(),
RPCCodedResponse::StreamTermination(_) => {
unreachable!("Code error - attempting to encode a stream termination")
}

View File

@ -1,7 +1,7 @@
#![allow(clippy::type_complexity)]
#![allow(clippy::cognitive_complexity)]
use super::methods::{RPCCodedResponse, RequestId, ResponseTermination};
use super::methods::{RPCCodedResponse, RPCResponseErrorCode, RequestId, ResponseTermination};
use super::protocol::{Protocol, RPCError, RPCProtocol, RPCRequest};
use super::{RPCReceived, RPCSend};
use crate::rpc::protocol::{InboundFramed, OutboundFramed};
@ -14,7 +14,7 @@ use libp2p::swarm::protocols_handler::{
KeepAlive, ProtocolsHandler, ProtocolsHandlerEvent, ProtocolsHandlerUpgrErr, SubstreamProtocol,
};
use libp2p::swarm::NegotiatedSubstream;
use slog::{crit, debug, error, trace, warn};
use slog::{crit, debug, warn};
use smallvec::SmallVec;
use std::{
collections::hash_map::Entry,
@ -40,6 +40,16 @@ const SHUTDOWN_TIMEOUT_SECS: u8 = 15;
#[derive(Debug, Clone, Copy, Hash, Eq, PartialEq)]
pub struct SubstreamId(usize);
type InboundSubstream<TSpec> = InboundFramed<NegotiatedSubstream, TSpec>;
/// Output of the future handling the send of responses to a peer's request.
type InboundProcessingOutput<TSpec> = (
InboundSubstream<TSpec>, /* substream */
Vec<RPCError>, /* Errors sending messages if any */
bool, /* whether to remove the stream afterwards */
u64, /* Chunks remaining to be sent after this processing finishes */
);
/// An error encountered by the handler.
pub enum HandlerErr {
/// An error occurred for this peer's request. This can occur during protocol negotiation,
@ -73,7 +83,7 @@ where
/// The upgrade for inbound substreams.
listen_protocol: SubstreamProtocol<RPCProtocol<TSpec>>,
/// Errors ocurring on outbound and inbound connections queued for reporting back.
/// Errors occurring on outbound and inbound connections queued for reporting back.
pending_errors: Vec<HandlerErr>,
/// Queue of events to produce in `poll()`.
@ -86,14 +96,7 @@ where
dial_negotiated: u32,
/// Current inbound substreams awaiting processing.
inbound_substreams: FnvHashMap<
SubstreamId,
(
InboundSubstreamState<TSpec>,
Option<delay_queue::Key>,
Protocol,
),
>,
inbound_substreams: FnvHashMap<SubstreamId, InboundInfo<TSpec>>,
/// Inbound substream `DelayQueue` which keeps track of when an inbound substream will timeout.
inbound_substreams_delay: DelayQueue<SubstreamId>,
@ -104,9 +107,6 @@ where
/// Inbound substream `DelayQueue` which keeps track of when an inbound substream will timeout.
outbound_substreams_delay: DelayQueue<SubstreamId>,
/// Map of outbound items that are queued as the stream processes them.
queued_outbound_items: FnvHashMap<SubstreamId, Vec<RPCCodedResponse<TSpec>>>,
/// Sequential ID for waiting substreams. For inbound substreams, this is also the inbound request ID.
current_inbound_substream_id: SubstreamId,
@ -143,6 +143,20 @@ enum HandlerState {
Deactivated,
}
/// Contains the information the handler keeps on established inbound substreams.
struct InboundInfo<TSpec: EthSpec> {
/// State of the substream.
state: InboundState<TSpec>,
/// Responses queued for sending.
pending_items: Vec<RPCCodedResponse<TSpec>>,
/// Protocol of the original request we received from the peer.
protocol: Protocol,
/// Responses that the peer is still expecting from us.
remaining_chunks: u64,
/// Key to keep track of the substream's timeout via `self.inbound_substreams_delay`.
delay_key: Option<delay_queue::Key>,
}
/// Contains the information the handler keeps on established outbound substreams.
struct OutboundInfo<TSpec: EthSpec> {
/// State of the substream.
@ -154,45 +168,46 @@ struct OutboundInfo<TSpec: EthSpec> {
/// Number of chunks to be seen from the peer's response.
// TODO: removing the option could allow clossing the streams after the number of
// expected responses is met for all protocols.
// TODO: the type of this is wrong
remaining_chunks: Option<usize>,
/// RequestId as given by the application that sent the request.
remaining_chunks: Option<u64>,
/// `RequestId` as given by the application that sent the request.
req_id: RequestId,
}
pub enum InboundSubstreamState<TSpec>
where
TSpec: EthSpec,
{
/// A response has been sent, pending writing.
ResponsePendingSend {
/// The substream used to send the response
substream: Box<InboundFramed<NegotiatedSubstream, TSpec>>,
/// The message that is attempting to be sent.
message: RPCCodedResponse<TSpec>,
/// Whether a stream termination is requested. If true the stream will be closed after
/// this send. Otherwise it will transition to an idle state until a stream termination is
/// requested or a timeout is reached.
closing: bool,
},
/// A response has been sent, pending flush.
ResponsePendingFlush {
/// The substream used to send the response
substream: Box<InboundFramed<NegotiatedSubstream, TSpec>>,
/// Whether a stream termination is requested. If true the stream will be closed after
/// this send. Otherwise it will transition to an idle state until a stream termination is
/// requested or a timeout is reached.
closing: bool,
},
/// The response stream is idle and awaiting input from the application to send more chunked
/// responses.
ResponseIdle(Box<InboundFramed<NegotiatedSubstream, TSpec>>),
/// The substream is attempting to shutdown.
Closing(Box<InboundFramed<NegotiatedSubstream, TSpec>>),
/// State of an inbound substream connection.
enum InboundState<TSpec: EthSpec> {
/// The underlying substream is not being used.
Idle(InboundSubstream<TSpec>),
/// The underlying substream is processing responses.
Busy(Pin<Box<dyn Future<Output = InboundProcessingOutput<TSpec>> + Send>>),
/// Temporary state during processing
Poisoned,
}
impl<TSpec: EthSpec> InboundState<TSpec> {
/// Sends the given items over the underlying substream, if the state allows it, and returns the
/// final state.
fn send_items(
self,
pending_items: &mut Vec<RPCCodedResponse<TSpec>>,
remaining_chunks: u64,
) -> Self {
if let InboundState::Idle(substream) = self {
// only send on Idle
if !pending_items.is_empty() {
// take the items that we need to send
let to_send = std::mem::replace(pending_items, vec![]);
let fut = process_inbound_substream(substream, remaining_chunks, to_send).boxed();
InboundState::Busy(Box::pin(fut))
} else {
// nothing to do, keep waiting for responses
InboundState::Idle(substream)
}
} else {
self
}
}
}
/// State of an outbound substream. Either waiting for a response, or in the process of sending.
pub enum OutboundSubstreamState<TSpec: EthSpec> {
/// A request has been sent, and we are awaiting a response. This future is driven in the
@ -209,69 +224,6 @@ pub enum OutboundSubstreamState<TSpec: EthSpec> {
Poisoned,
}
impl<TSpec> InboundSubstreamState<TSpec>
where
TSpec: EthSpec,
{
/// Moves the substream state to closing and informs the connected peer. The
/// `queued_outbound_items` must be given as a parameter to add stream termination messages to
/// the outbound queue.
pub fn close(&mut self, outbound_queue: &mut Vec<RPCCodedResponse<TSpec>>) {
// When terminating a stream, report the stream termination to the requesting user via
// an RPC error
let error = RPCCodedResponse::ServerError("Request timed out".into());
// The stream termination type is irrelevant, this will terminate the
// stream
let stream_termination =
RPCCodedResponse::StreamTermination(ResponseTermination::BlocksByRange);
match std::mem::replace(self, InboundSubstreamState::Poisoned) {
// if we are busy awaiting a send/flush add the termination to the queue
InboundSubstreamState::ResponsePendingSend {
substream,
message,
closing,
} => {
if !closing {
outbound_queue.push(error);
outbound_queue.push(stream_termination);
}
// if the stream is closing after the send, allow it to finish
*self = InboundSubstreamState::ResponsePendingSend {
substream,
message,
closing,
}
}
// if we are busy awaiting a send/flush add the termination to the queue
InboundSubstreamState::ResponsePendingFlush { substream, closing } => {
if !closing {
outbound_queue.push(error);
outbound_queue.push(stream_termination);
}
// if the stream is closing after the send, allow it to finish
*self = InboundSubstreamState::ResponsePendingFlush { substream, closing }
}
InboundSubstreamState::ResponseIdle(substream) => {
*self = InboundSubstreamState::ResponsePendingSend {
substream,
message: error,
closing: true,
};
}
InboundSubstreamState::Closing(substream) => {
// let the stream close
*self = InboundSubstreamState::Closing(substream);
}
InboundSubstreamState::Poisoned => {
unreachable!("Coding error: Timeout poisoned substream")
}
};
}
}
impl<TSpec> RPCHandler<TSpec>
where
TSpec: EthSpec,
@ -283,7 +235,6 @@ where
events_out: SmallVec::new(),
dial_queue: SmallVec::new(),
dial_negotiated: 0,
queued_outbound_items: FnvHashMap::default(),
inbound_substreams: FnvHashMap::default(),
outbound_substreams: FnvHashMap::default(),
inbound_substreams_delay: DelayQueue::new(),
@ -360,32 +311,21 @@ where
// NOTE: If the substream has closed due to inactivity, or the substream is in the
// wrong state a response will fail silently.
fn send_response(&mut self, inbound_id: SubstreamId, response: RPCCodedResponse<TSpec>) {
// Variables indicating if the response is an error response or a multi-part
// response
let res_is_error = response.is_error();
let res_is_multiple = response.multiple_responses();
// check if the stream matching the response still exists
let (substream_state, protocol) = match self.inbound_substreams.get_mut(&inbound_id) {
Some((substream_state, _, protocol)) => (substream_state, protocol),
None => {
warn!(self.log, "Stream has expired. Response not sent";
"response" => response.to_string(), "id" => inbound_id);
return;
}
let inbound_info = if let Some(info) = self.inbound_substreams.get_mut(&inbound_id) {
info
} else {
warn!(self.log, "Stream has expired. Response not sent";
"response" => response.to_string(), "id" => inbound_id);
return;
};
// If the response we are sending is an error, report back for handling
match response {
RPCCodedResponse::InvalidRequest(ref reason)
| RPCCodedResponse::ServerError(ref reason)
| RPCCodedResponse::Unknown(ref reason) => {
let code = &response
.error_code()
.expect("Error response should map to an error code");
RPCCodedResponse::Error(ref code, ref reason) => {
let err = HandlerErr::Inbound {
id: inbound_id,
proto: *protocol,
proto: inbound_info.protocol,
error: RPCError::ErrorResponse(*code, reason.to_string()),
};
self.pending_errors.push(err);
@ -399,85 +339,7 @@ where
"response" => response.to_string(), "id" => inbound_id);
return;
}
match std::mem::replace(substream_state, InboundSubstreamState::Poisoned) {
InboundSubstreamState::ResponseIdle(substream) => {
// close the stream if there is no response
if let RPCCodedResponse::StreamTermination(_) = response {
*substream_state = InboundSubstreamState::Closing(substream);
} else {
// send the response
// if it's a single rpc request or an error close the stream after.
*substream_state = InboundSubstreamState::ResponsePendingSend {
substream,
message: response,
closing: !res_is_multiple | res_is_error,
}
}
}
InboundSubstreamState::ResponsePendingSend {
substream,
message,
closing,
} if res_is_multiple => {
// the stream is in use, add the request to a pending queue if active
self.queued_outbound_items
.entry(inbound_id)
.or_insert_with(Vec::new)
.push(response);
// return the state
*substream_state = InboundSubstreamState::ResponsePendingSend {
substream,
message,
closing,
};
}
InboundSubstreamState::ResponsePendingFlush { substream, closing }
if res_is_multiple =>
{
// the stream is in use, add the request to a pending queue
self.queued_outbound_items
.entry(inbound_id)
.or_insert_with(Vec::new)
.push(response);
// return the state
*substream_state =
InboundSubstreamState::ResponsePendingFlush { substream, closing };
}
InboundSubstreamState::Closing(substream) => {
*substream_state = InboundSubstreamState::Closing(substream);
debug!(self.log, "Response not sent. Stream is closing"; "response" => response.to_string());
}
InboundSubstreamState::ResponsePendingSend {
substream, message, ..
} => {
*substream_state = InboundSubstreamState::ResponsePendingSend {
substream,
message,
closing: true,
};
error!(
self.log,
"Attempted sending multiple responses to a single response request"
);
}
InboundSubstreamState::ResponsePendingFlush { substream, .. } => {
*substream_state = InboundSubstreamState::ResponsePendingFlush {
substream,
closing: true,
};
error!(
self.log,
"Attempted sending multiple responses to a single response request"
);
}
InboundSubstreamState::Poisoned => {
crit!(self.log, "Poisoned inbound substream");
unreachable!("Coding error: Poisoned substream");
}
}
inbound_info.pending_items.push(response);
}
/// Updates the `KeepAlive` returned by `connection_keep_alive`.
@ -538,18 +400,25 @@ where
}
let (req, substream) = substream;
let expected_responses = req.expected_responses();
// store requests that expect responses
if req.expected_responses() > 0 {
if expected_responses > 0 {
// Store the stream and tag the output.
let delay_key = self.inbound_substreams_delay.insert(
self.current_inbound_substream_id,
Duration::from_secs(RESPONSE_TIMEOUT),
);
let awaiting_stream = InboundSubstreamState::ResponseIdle(Box::new(substream));
let awaiting_stream = InboundState::Idle(substream);
self.inbound_substreams.insert(
self.current_inbound_substream_id,
(awaiting_stream, Some(delay_key), req.protocol()),
InboundInfo {
state: awaiting_stream,
pending_items: vec![],
delay_key: Some(delay_key),
protocol: req.protocol(),
remaining_chunks: expected_responses,
},
);
}
@ -709,13 +578,6 @@ where
if delay.is_elapsed() {
self.state = HandlerState::Deactivated;
debug!(self.log, "Handler deactivated");
// Drain queued responses
for (inbound_id, queued_responses) in self.queued_outbound_items.drain() {
for response in queued_responses {
debug!(self.log, "Response not sent. Deactivated handler";
"response" => response.to_string(), "id" => inbound_id);
}
}
}
}
@ -724,23 +586,22 @@ where
match self.inbound_substreams_delay.poll_next_unpin(cx) {
Poll::Ready(Some(Ok(inbound_id))) => {
// handle a stream timeout for various states
if let Some((substream_state, delay_key, protocol)) =
self.inbound_substreams.get_mut(inbound_id.get_ref())
{
if let Some(info) = self.inbound_substreams.get_mut(inbound_id.get_ref()) {
// the delay has been removed
*delay_key = None;
info.delay_key = None;
self.pending_errors.push(HandlerErr::Inbound {
id: *inbound_id.get_ref(),
proto: *protocol,
proto: info.protocol,
error: RPCError::StreamTimeout,
});
let outbound_queue = self
.queued_outbound_items
.entry(inbound_id.into_inner())
.or_insert_with(Vec::new);
substream_state.close(outbound_queue);
if info.pending_items.last().map(|l| l.close_after()) == Some(false) {
// if the last chunk does not close the stream, append an error
info.pending_items.push(RPCCodedResponse::Error(
RPCResponseErrorCode::ServerError,
"Request timed out".into(),
));
}
}
}
Poll::Ready(Some(Err(e))) => {
@ -788,204 +649,80 @@ where
let deactivated = matches!(self.state, HandlerState::Deactivated);
// drive inbound streams that need to be processed
for request_id in self.inbound_substreams.keys().copied().collect::<Vec<_>>() {
// Drain all queued items until all messages have been processed for this stream
// TODO Improve this code logic
let mut drive_stream_further = true;
while drive_stream_further {
drive_stream_further = false;
match self.inbound_substreams.entry(request_id) {
Entry::Occupied(mut entry) => {
match std::mem::replace(
&mut entry.get_mut().0,
InboundSubstreamState::Poisoned,
) {
InboundSubstreamState::ResponsePendingSend {
mut substream,
message,
closing,
} => {
if deactivated {
if !closing {
// inform back to cancel this request's processing
self.pending_errors.push(HandlerErr::Inbound {
id: request_id,
proto: entry.get().2,
error: RPCError::HandlerRejected,
});
}
entry.get_mut().0 = InboundSubstreamState::Closing(substream);
drive_stream_further = true;
} else {
match Sink::poll_ready(Pin::new(&mut substream), cx) {
Poll::Ready(Ok(())) => {
// stream is ready to send data
match Sink::start_send(
Pin::new(&mut substream),
message,
) {
Ok(()) => {
// await flush
entry.get_mut().0 =
InboundSubstreamState::ResponsePendingFlush {
substream,
closing,
};
drive_stream_further = true;
}
Err(e) => {
// error with sending in the codec
warn!(self.log, "Error sending RPC message"; "error" => e.to_string());
// keep connection with the peer and return the
// stream to awaiting response if this message
// wasn't closing the stream
if closing {
entry.get_mut().0 =
InboundSubstreamState::Closing(
substream,
);
drive_stream_further = true;
} else {
// check for queued chunks and update the stream
entry.get_mut().0 = apply_queued_responses(
*substream,
&mut self
.queued_outbound_items
.get_mut(&request_id),
&mut drive_stream_further,
);
}
}
}
}
Poll::Ready(Err(e)) => {
error!(
self.log,
"Outbound substream error while sending RPC message: {:?}",
e
);
entry.remove();
self.update_keep_alive();
return Poll::Ready(ProtocolsHandlerEvent::Close(e));
}
Poll::Pending => {
// the stream is not yet ready, continue waiting
entry.get_mut().0 =
InboundSubstreamState::ResponsePendingSend {
substream,
message,
closing,
};
}
}
}
let mut substreams_to_remove = Vec::new(); // Closed substreams that need to be removed
for (id, info) in self.inbound_substreams.iter_mut() {
match std::mem::replace(&mut info.state, InboundState::Poisoned) {
state @ InboundState::Idle(..) if !deactivated => {
info.state = state.send_items(&mut info.pending_items, info.remaining_chunks);
}
InboundState::Idle(mut substream) => {
// handler is deactivated, close the stream and mark it for removal
match substream.close().poll_unpin(cx) {
// if we can't close right now, put the substream back and try again later
Poll::Pending => info.state = InboundState::Idle(substream),
Poll::Ready(res) => {
substreams_to_remove.push(id.clone());
if let Some(ref delay_key) = info.delay_key {
self.inbound_substreams_delay.remove(delay_key);
}
InboundSubstreamState::ResponsePendingFlush {
mut substream,
closing,
} => {
match Sink::poll_flush(Pin::new(&mut substream), cx) {
Poll::Ready(Ok(())) => {
// finished flushing
// TODO: Duplicate code
if closing | deactivated {
if !closing {
// inform back to cancel this request's processing
self.pending_errors.push(HandlerErr::Inbound {
id: request_id,
proto: entry.get().2,
error: RPCError::HandlerRejected,
});
}
entry.get_mut().0 =
InboundSubstreamState::Closing(substream);
drive_stream_further = true;
} else {
// check for queued chunks and update the stream
entry.get_mut().0 = apply_queued_responses(
*substream,
&mut self
.queued_outbound_items
.get_mut(&request_id),
&mut drive_stream_further,
);
}
}
Poll::Ready(Err(e)) => {
// error during flush
trace!(self.log, "Error sending flushing RPC message"; "error" => e.to_string());
// we drop the stream on error and inform the user, remove
// any pending requests
// TODO: Duplicate code
if let Some(delay_key) = &entry.get().1 {
self.inbound_substreams_delay.remove(delay_key);
}
self.queued_outbound_items.remove(&request_id);
entry.remove();
self.update_keep_alive();
}
Poll::Pending => {
entry.get_mut().0 =
InboundSubstreamState::ResponsePendingFlush {
substream,
closing,
};
}
}
if let Err(error) = res {
self.pending_errors.push(HandlerErr::Inbound {
id: id.clone(),
error,
proto: info.protocol,
});
}
InboundSubstreamState::ResponseIdle(substream) => {
if !deactivated {
entry.get_mut().0 = apply_queued_responses(
*substream,
&mut self.queued_outbound_items.get_mut(&request_id),
&mut drive_stream_further,
);
} else {
entry.get_mut().0 = InboundSubstreamState::Closing(substream);
drive_stream_further = true;
}
}
InboundSubstreamState::Closing(mut substream) => {
match Sink::poll_close(Pin::new(&mut substream), cx) {
Poll::Ready(Ok(())) => {
if let Some(delay_key) = &entry.get().1 {
self.inbound_substreams_delay.remove(delay_key);
}
self.queued_outbound_items.remove(&request_id);
entry.remove();
self.update_keep_alive();
} // drop the stream
Poll::Ready(Err(e)) => {
error!(self.log, "Error closing inbound stream"; "error" => e.to_string());
// drop the stream anyway
// TODO: Duplicate code
if let Some(delay_key) = &entry.get().1 {
self.inbound_substreams_delay.remove(delay_key);
}
self.queued_outbound_items.remove(&request_id);
entry.remove();
self.update_keep_alive();
}
Poll::Pending => {
entry.get_mut().0 =
InboundSubstreamState::Closing(substream);
}
}
}
InboundSubstreamState::Poisoned => {
crit!(self.log, "Poisoned outbound substream");
unreachable!("Coding Error: Inbound Substream is poisoned");
if info.pending_items.last().map(|l| l.close_after()) == Some(false) {
// if the request was still active, report back to cancel it
self.pending_errors.push(HandlerErr::Inbound {
id: *id,
proto: info.protocol,
error: RPCError::HandlerRejected,
});
}
}
}
Entry::Vacant(_) => unreachable!(),
}
InboundState::Busy(mut fut) => {
// first check if sending finished
let state = match fut.poll_unpin(cx) {
Poll::Ready((substream, errors, remove, new_remaining_chunks)) => {
info.remaining_chunks = new_remaining_chunks;
// report any error
for error in errors {
self.pending_errors.push(HandlerErr::Inbound {
id: *id,
error,
proto: info.protocol,
})
}
if remove {
substreams_to_remove.push(id.clone());
if let Some(ref delay_key) = info.delay_key {
self.inbound_substreams_delay.remove(delay_key);
}
}
InboundState::Idle(substream)
}
Poll::Pending => InboundState::Busy(fut),
};
info.state = if !deactivated {
// if the last batch finished, send more.
state.send_items(&mut info.pending_items, info.remaining_chunks)
} else {
state
};
}
InboundState::Poisoned => unreachable!("Poisoned inbound substream"),
}
}
// remove closed substreams
for inbound_id in substreams_to_remove {
self.inbound_substreams.remove(&inbound_id);
}
self.update_keep_alive();
// drive outbound streams that need to be processed
for outbound_id in self.outbound_substreams.keys().copied().collect::<Vec<_>>() {
// get the state and mark it as poisoned
@ -1018,7 +755,7 @@ where
request,
} => match substream.poll_next_unpin(cx) {
Poll::Ready(Some(Ok(response))) => {
if request.expected_responses() > 1 && !response.is_error() {
if request.expected_responses() > 1 && !response.close_after() {
let substream_entry = entry.get_mut();
let delay_key = &substream_entry.delay_key;
// chunks left after this one
@ -1041,8 +778,8 @@ where
.reset(delay_key, Duration::from_secs(RESPONSE_TIMEOUT));
}
} else {
// either this is a single response request or we received an
// error only expect a single response, close the stream
// either this is a single response request or this response closes the
// stream
entry.get_mut().state = OutboundSubstreamState::Closing(substream);
}
@ -1055,18 +792,11 @@ where
Ok(RPCReceived::EndOfStream(id, t))
}
RPCCodedResponse::Success(resp) => Ok(RPCReceived::Response(id, resp)),
RPCCodedResponse::InvalidRequest(ref r)
| RPCCodedResponse::ServerError(ref r)
| RPCCodedResponse::Unknown(ref r) => {
let code = response.error_code().expect(
"Response indicating and error should map to an error code",
);
Err(HandlerErr::Outbound {
id,
proto,
error: RPCError::ErrorResponse(code, r.to_string()),
})
}
RPCCodedResponse::Error(ref code, ref r) => Err(HandlerErr::Outbound {
id,
proto,
error: RPCError::ErrorResponse(*code, r.to_string()),
}),
};
return Poll::Ready(ProtocolsHandlerEvent::Custom(received));
@ -1172,35 +902,6 @@ where
}
}
// Check for new items to send to the peer and update the underlying stream
fn apply_queued_responses<TSpec: EthSpec>(
substream: InboundFramed<NegotiatedSubstream, TSpec>,
queued_outbound_items: &mut Option<&mut Vec<RPCCodedResponse<TSpec>>>,
new_items_to_send: &mut bool,
) -> InboundSubstreamState<TSpec> {
match queued_outbound_items {
Some(ref mut queue) if !queue.is_empty() => {
*new_items_to_send = true;
// we have queued items
match queue.remove(0) {
RPCCodedResponse::StreamTermination(_) => {
// close the stream if this is a stream termination
InboundSubstreamState::Closing(Box::new(substream))
}
chunk => InboundSubstreamState::ResponsePendingSend {
substream: Box::new(substream),
message: chunk,
closing: false,
},
}
}
_ => {
// no items queued set to idle
InboundSubstreamState::ResponseIdle(Box::new(substream))
}
}
}
impl slog::Value for SubstreamId {
fn serialize(
&self,
@ -1211,3 +912,43 @@ impl slog::Value for SubstreamId {
slog::Value::serialize(&self.0, record, key, serializer)
}
}
/// Sends the queued items to the peer.
async fn process_inbound_substream<TSpec: EthSpec>(
mut substream: InboundSubstream<TSpec>,
mut remaining_chunks: u64,
pending_items: Vec<RPCCodedResponse<TSpec>>,
) -> InboundProcessingOutput<TSpec> {
let mut errors = Vec::new();
let mut substream_closed = false;
for item in pending_items {
if !substream_closed {
if matches!(item, RPCCodedResponse::StreamTermination(_)) {
substream.close().await.unwrap_or_else(|e| errors.push(e));
substream_closed = true;
} else {
remaining_chunks = remaining_chunks.saturating_sub(1);
// chunks that are not stream terminations get sent, and the stream is closed if
// the response is an error
let is_error = matches!(item, RPCCodedResponse::Error(..));
substream
.send(item)
.await
.unwrap_or_else(|e| errors.push(e));
if remaining_chunks == 0 || is_error {
substream.close().await.unwrap_or_else(|e| errors.push(e));
substream_closed = true;
}
}
} else {
// we have more items after a closed substream, report those as errors
errors.push(RPCError::InternalError(
"Sending responses to closed inbound substream",
));
}
}
(substream, errors, substream_closed, remaining_chunks)
}

View File

@ -249,14 +249,7 @@ pub enum RPCCodedResponse<T: EthSpec> {
/// The response is a successful.
Success(RPCResponse<T>),
/// The response was invalid.
InvalidRequest(ErrorType),
/// The response indicates a server error.
ServerError(ErrorType),
/// There was an unknown response.
Unknown(ErrorType),
Error(RPCResponseErrorCode, ErrorType),
/// Received a stream termination indicating which response is being terminated.
StreamTermination(ResponseTermination),
@ -275,9 +268,7 @@ impl<T: EthSpec> RPCCodedResponse<T> {
pub fn as_u8(&self) -> Option<u8> {
match self {
RPCCodedResponse::Success(_) => Some(0),
RPCCodedResponse::InvalidRequest(_) => Some(1),
RPCCodedResponse::ServerError(_) => Some(2),
RPCCodedResponse::Unknown(_) => Some(255),
RPCCodedResponse::Error(code, _) => Some(code.as_u8()),
RPCCodedResponse::StreamTermination(_) => None,
}
}
@ -292,20 +283,12 @@ impl<T: EthSpec> RPCCodedResponse<T> {
/// Builds an RPCCodedResponse from a response code and an ErrorMessage
pub fn from_error(response_code: u8, err: String) -> Self {
match response_code {
1 => RPCCodedResponse::InvalidRequest(err.into()),
2 => RPCCodedResponse::ServerError(err.into()),
_ => RPCCodedResponse::Unknown(err.into()),
}
}
/// Builds an RPCCodedResponse from a response code and an ErrorMessage
pub fn from_error_code(response_code: RPCResponseErrorCode, err: String) -> Self {
match response_code {
RPCResponseErrorCode::InvalidRequest => RPCCodedResponse::InvalidRequest(err.into()),
RPCResponseErrorCode::ServerError => RPCCodedResponse::ServerError(err.into()),
RPCResponseErrorCode::Unknown => RPCCodedResponse::Unknown(err.into()),
}
let code = match response_code {
1 => RPCResponseErrorCode::InvalidRequest,
2 => RPCResponseErrorCode::ServerError,
_ => RPCResponseErrorCode::Unknown,
};
RPCCodedResponse::Error(code, err.into())
}
/// Specifies which response allows for multiple chunks for the stream handler.
@ -318,30 +301,27 @@ impl<T: EthSpec> RPCCodedResponse<T> {
RPCResponse::Pong(_) => false,
RPCResponse::MetaData(_) => false,
},
RPCCodedResponse::InvalidRequest(_) => true,
RPCCodedResponse::ServerError(_) => true,
RPCCodedResponse::Unknown(_) => true,
RPCCodedResponse::Error(_, _) => true,
// Stream terminations are part of responses that have chunks
RPCCodedResponse::StreamTermination(_) => true,
}
}
/// Returns true if this response is an error. Used to terminate the stream after an error is
/// sent.
pub fn is_error(&self) -> bool {
/// Returns true if this response always terminates the stream.
pub fn close_after(&self) -> bool {
match self {
RPCCodedResponse::Success(_) => false,
_ => true,
}
}
}
pub fn error_code(&self) -> Option<RPCResponseErrorCode> {
impl RPCResponseErrorCode {
fn as_u8(&self) -> u8 {
match self {
RPCCodedResponse::Success(_) => None,
RPCCodedResponse::StreamTermination(_) => None,
RPCCodedResponse::InvalidRequest(_) => Some(RPCResponseErrorCode::InvalidRequest),
RPCCodedResponse::ServerError(_) => Some(RPCResponseErrorCode::ServerError),
RPCCodedResponse::Unknown(_) => Some(RPCResponseErrorCode::Unknown),
RPCResponseErrorCode::InvalidRequest => 1,
RPCResponseErrorCode::ServerError => 2,
RPCResponseErrorCode::Unknown => 255,
}
}
}
@ -383,9 +363,7 @@ impl<T: EthSpec> std::fmt::Display for RPCCodedResponse<T> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
RPCCodedResponse::Success(res) => write!(f, "{}", res),
RPCCodedResponse::InvalidRequest(err) => write!(f, "Invalid Request: {:?}", err),
RPCCodedResponse::ServerError(err) => write!(f, "Server Error: {:?}", err),
RPCCodedResponse::Unknown(err) => write!(f, "Unknown Error: {:?}", err),
RPCCodedResponse::Error(code, err) => write!(f, "{}: {:?}", code, err),
RPCCodedResponse::StreamTermination(_) => write!(f, "Stream Termination"),
}
}

View File

@ -323,12 +323,12 @@ impl<TSpec: EthSpec> RPCRequest<TSpec> {
/* These functions are used in the handler for stream management */
/// Number of responses expected for this request.
pub fn expected_responses(&self) -> usize {
pub fn expected_responses(&self) -> u64 {
match self {
RPCRequest::Status(_) => 1,
RPCRequest::Goodbye(_) => 0,
RPCRequest::BlocksByRange(req) => req.count as usize,
RPCRequest::BlocksByRoot(req) => req.block_roots.len(),
RPCRequest::BlocksByRange(req) => req.count,
RPCRequest::BlocksByRoot(req) => req.block_roots.len() as u64,
RPCRequest::Ping(_) => 1,
RPCRequest::MetaData(_) => 1,
}