From 3dd50bda11cefb3c17d851cbb8811610385c20aa Mon Sep 17 00:00:00 2001 From: Divma Date: Fri, 10 Jun 2022 06:58:50 +0000 Subject: [PATCH] Improve substream management (#3261) ## Issue Addressed No linked issue is referenced by this PR. ## Proposed Changes Limit the number of simultaneous inbound substreams the RPC handler keeps for a peer to `MAX_INBOUND_SUBSTREAMS` (32). When a request that expects responses arrives while the limit is reached, the handler reports an inbound `RPCError::HandlerRejected` and shuts down the connection. The peer manager now maps `RPCError::HandlerRejected` to `PeerAction::Fatal` instead of ignoring it. Requests cancelled because the handler is shutting down or deactivated are now reported with `RPCError::Disconnected` rather than `RPCError::HandlerRejected`. The empty `HandlerRejected` check in the behaviour is removed. ## Additional Info Within the hunks of this patch, `RPCError::HandlerRejected` is emitted only on the inbound substream limit path; all other previous uses are replaced by `RPCError::Disconnected`. --- .../lighthouse_network/src/behaviour/mod.rs | 3 - .../src/peer_manager/mod.rs | 5 +- .../lighthouse_network/src/rpc/handler.rs | 56 +++++++++++-------- 3 files changed, 35 insertions(+), 29 deletions(-) diff --git a/beacon_node/lighthouse_network/src/behaviour/mod.rs b/beacon_node/lighthouse_network/src/behaviour/mod.rs index e67bb29de..81de3f015 100644 --- a/beacon_node/lighthouse_network/src/behaviour/mod.rs +++ b/beacon_node/lighthouse_network/src/behaviour/mod.rs @@ -1006,9 +1006,6 @@ where proto, error, } => { - if matches!(error, RPCError::HandlerRejected) { - // this peer's request got canceled - } // Inform the peer manager of the error. // An inbound error here means we sent an error to the peer, or the stream // timed out. diff --git a/beacon_node/lighthouse_network/src/peer_manager/mod.rs b/beacon_node/lighthouse_network/src/peer_manager/mod.rs index 9c8d41194..3575d9d34 100644 --- a/beacon_node/lighthouse_network/src/peer_manager/mod.rs +++ b/beacon_node/lighthouse_network/src/peer_manager/mod.rs @@ -457,10 +457,7 @@ impl PeerManager { debug!(self.log, "Internal RPC Error"; "error" => %e, "peer_id" => %peer_id); return; } - RPCError::HandlerRejected => { - // Our fault. Do nothing - return; - } + RPCError::HandlerRejected => PeerAction::Fatal, RPCError::InvalidData(_) => { // Peer is not complying with the protocol. 
This is considered a malicious action PeerAction::Fatal diff --git a/beacon_node/lighthouse_network/src/rpc/handler.rs b/beacon_node/lighthouse_network/src/rpc/handler.rs index ac39e0cec..9ac062adc 100644 --- a/beacon_node/lighthouse_network/src/rpc/handler.rs +++ b/beacon_node/lighthouse_network/src/rpc/handler.rs @@ -40,6 +40,9 @@ const IO_ERROR_RETRIES: u8 = 3; /// Maximum time given to the handler to perform shutdown operations. const SHUTDOWN_TIMEOUT_SECS: u8 = 15; +/// Maximum number of simultaneous inbound substreams we keep for this peer. +const MAX_INBOUND_SUBSTREAMS: usize = 32; + /// Identifier of inbound and outbound substreams from the handler's perspective. #[derive(Debug, Clone, Copy, Hash, Eq, PartialEq)] pub struct SubstreamId(usize); @@ -241,7 +244,7 @@ where // We now drive to completion communications already dialed/established while let Some((id, req)) = self.dial_queue.pop() { self.events_out.push(Err(HandlerErr::Outbound { - error: RPCError::HandlerRejected, + error: RPCError::Disconnected, proto: req.protocol(), id, })); @@ -265,7 +268,7 @@ where self.dial_queue.push((id, req)); } _ => self.events_out.push(Err(HandlerErr::Outbound { - error: RPCError::HandlerRejected, + error: RPCError::Disconnected, proto: req.protocol(), id, })), @@ -339,23 +342,32 @@ where // store requests that expect responses if expected_responses > 0 { - // Store the stream and tag the output. 
- let delay_key = self.inbound_substreams_delay.insert( - self.current_inbound_substream_id, - Duration::from_secs(RESPONSE_TIMEOUT), - ); - let awaiting_stream = InboundState::Idle(substream); - self.inbound_substreams.insert( - self.current_inbound_substream_id, - InboundInfo { - state: awaiting_stream, - pending_items: VecDeque::with_capacity(expected_responses as usize), - delay_key: Some(delay_key), - protocol: req.protocol(), - request_start_time: Instant::now(), - remaining_chunks: expected_responses, - }, - ); + if self.inbound_substreams.len() < MAX_INBOUND_SUBSTREAMS { + // Store the stream and tag the output. + let delay_key = self.inbound_substreams_delay.insert( + self.current_inbound_substream_id, + Duration::from_secs(RESPONSE_TIMEOUT), + ); + let awaiting_stream = InboundState::Idle(substream); + self.inbound_substreams.insert( + self.current_inbound_substream_id, + InboundInfo { + state: awaiting_stream, + pending_items: VecDeque::with_capacity(expected_responses as usize), + delay_key: Some(delay_key), + protocol: req.protocol(), + request_start_time: Instant::now(), + remaining_chunks: expected_responses, + }, + ); + } else { + self.events_out.push(Err(HandlerErr::Inbound { + id: self.current_inbound_substream_id, + proto: req.protocol(), + error: RPCError::HandlerRejected, + })); + return self.shutdown(None); + } } // If we received a goodbye, shutdown the connection. @@ -382,7 +394,7 @@ where // accept outbound connections only if the handler is not deactivated if matches!(self.state, HandlerState::Deactivated) { self.events_out.push(Err(HandlerErr::Outbound { - error: RPCError::HandlerRejected, + error: RPCError::Disconnected, proto, id, })); @@ -671,7 +683,7 @@ where { // if the request was still active, report back to cancel it self.events_out.push(Err(HandlerErr::Inbound { - error: RPCError::HandlerRejected, + error: RPCError::Disconnected, proto: info.protocol, id: *id, })); @@ -803,7 +815,7 @@ where // the handler is deactivated. 
Close the stream entry.get_mut().state = OutboundSubstreamState::Closing(substream); self.events_out.push(Err(HandlerErr::Outbound { - error: RPCError::HandlerRejected, + error: RPCError::Disconnected, proto: entry.get().proto, id: entry.get().req_id, }))