Improve substream management (#3261)

## Issue Addressed

Which issue # does this PR address?

## Proposed Changes

Please list or describe the changes introduced by this PR.

## Additional Info

Please provide any additional information. For example, future considerations
or information useful for reviewers.
This commit is contained in:
Divma 2022-06-10 06:58:50 +00:00
parent 11d80a6a38
commit 3dd50bda11
3 changed files with 35 additions and 29 deletions

View File

@ -1006,9 +1006,6 @@ where
proto,
error,
} => {
if matches!(error, RPCError::HandlerRejected) {
// this peer's request got canceled
}
// Inform the peer manager of the error.
// An inbound error here means we sent an error to the peer, or the stream
// timed out.

View File

@ -457,10 +457,7 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
debug!(self.log, "Internal RPC Error"; "error" => %e, "peer_id" => %peer_id);
return;
}
RPCError::HandlerRejected => {
// Our fault. Do nothing
return;
}
RPCError::HandlerRejected => PeerAction::Fatal,
RPCError::InvalidData(_) => {
// Peer is not complying with the protocol. This is considered a malicious action
PeerAction::Fatal

View File

@ -40,6 +40,9 @@ const IO_ERROR_RETRIES: u8 = 3;
/// Maximum time given to the handler to perform shutdown operations.
const SHUTDOWN_TIMEOUT_SECS: u8 = 15;
/// Maximum number of simultaneous inbound substreams we keep for this peer.
const MAX_INBOUND_SUBSTREAMS: usize = 32;
/// Identifier of inbound and outbound substreams from the handler's perspective.
#[derive(Debug, Clone, Copy, Hash, Eq, PartialEq)]
pub struct SubstreamId(usize);
@ -241,7 +244,7 @@ where
// We now drive to completion communications already dialed/established
while let Some((id, req)) = self.dial_queue.pop() {
self.events_out.push(Err(HandlerErr::Outbound {
error: RPCError::HandlerRejected,
error: RPCError::Disconnected,
proto: req.protocol(),
id,
}));
@ -265,7 +268,7 @@ where
self.dial_queue.push((id, req));
}
_ => self.events_out.push(Err(HandlerErr::Outbound {
error: RPCError::HandlerRejected,
error: RPCError::Disconnected,
proto: req.protocol(),
id,
})),
@ -339,23 +342,32 @@ where
// store requests that expect responses
if expected_responses > 0 {
// Store the stream and tag the output.
let delay_key = self.inbound_substreams_delay.insert(
self.current_inbound_substream_id,
Duration::from_secs(RESPONSE_TIMEOUT),
);
let awaiting_stream = InboundState::Idle(substream);
self.inbound_substreams.insert(
self.current_inbound_substream_id,
InboundInfo {
state: awaiting_stream,
pending_items: VecDeque::with_capacity(expected_responses as usize),
delay_key: Some(delay_key),
protocol: req.protocol(),
request_start_time: Instant::now(),
remaining_chunks: expected_responses,
},
);
if self.inbound_substreams.len() < MAX_INBOUND_SUBSTREAMS {
// Store the stream and tag the output.
let delay_key = self.inbound_substreams_delay.insert(
self.current_inbound_substream_id,
Duration::from_secs(RESPONSE_TIMEOUT),
);
let awaiting_stream = InboundState::Idle(substream);
self.inbound_substreams.insert(
self.current_inbound_substream_id,
InboundInfo {
state: awaiting_stream,
pending_items: VecDeque::with_capacity(expected_responses as usize),
delay_key: Some(delay_key),
protocol: req.protocol(),
request_start_time: Instant::now(),
remaining_chunks: expected_responses,
},
);
} else {
self.events_out.push(Err(HandlerErr::Inbound {
id: self.current_inbound_substream_id,
proto: req.protocol(),
error: RPCError::HandlerRejected,
}));
return self.shutdown(None);
}
}
// If we received a goodbye, shutdown the connection.
@ -382,7 +394,7 @@ where
// accept outbound connections only if the handler is not deactivated
if matches!(self.state, HandlerState::Deactivated) {
self.events_out.push(Err(HandlerErr::Outbound {
error: RPCError::HandlerRejected,
error: RPCError::Disconnected,
proto,
id,
}));
@ -671,7 +683,7 @@ where
{
// if the request was still active, report back to cancel it
self.events_out.push(Err(HandlerErr::Inbound {
error: RPCError::HandlerRejected,
error: RPCError::Disconnected,
proto: info.protocol,
id: *id,
}));
@ -803,7 +815,7 @@ where
// the handler is deactivated. Close the stream
entry.get_mut().state = OutboundSubstreamState::Closing(substream);
self.events_out.push(Err(HandlerErr::Outbound {
error: RPCError::HandlerRejected,
error: RPCError::Disconnected,
proto: entry.get().proto,
id: entry.get().req_id,
}))