diff --git a/beacon_node/eth2_libp2p/src/behaviour/mod.rs b/beacon_node/eth2_libp2p/src/behaviour/mod.rs index 8d8df58f9..a76c0aa9d 100644 --- a/beacon_node/eth2_libp2p/src/behaviour/mod.rs +++ b/beacon_node/eth2_libp2p/src/behaviour/mod.rs @@ -743,8 +743,8 @@ impl NetworkBehaviour for Behaviour { .peer_info(peer_id) .map_or(true, |i| !i.has_future_duty()) { - //If we are at our peer limit and we don't need the peer for a future validator - //duty, send goodbye with reason TooManyPeers + // If we are at our peer limit and we don't need the peer for a future validator + // duty, send goodbye with reason TooManyPeers Some(GoodbyeReason::TooManyPeers) } else { None diff --git a/beacon_node/eth2_libp2p/src/rpc/handler.rs b/beacon_node/eth2_libp2p/src/rpc/handler.rs index 9492942d7..d06ccb7e5 100644 --- a/beacon_node/eth2_libp2p/src/rpc/handler.rs +++ b/beacon_node/eth2_libp2p/src/rpc/handler.rs @@ -116,9 +116,6 @@ where /// Maximum number of concurrent outbound substreams being opened. Value is never modified. max_dial_negotiated: u32, - /// Value to return from `connection_keep_alive`. - keep_alive: KeepAlive, - /// State of the handler. state: HandlerState, @@ -243,7 +240,6 @@ where current_outbound_substream_id: SubstreamId(0), state: HandlerState::Active, max_dial_negotiated: 8, - keep_alive: KeepAlive::Yes, outbound_io_error_retries: 0, log: log.clone(), } @@ -287,7 +283,6 @@ where TInstant::now() + Duration::from_secs(SHUTDOWN_TIMEOUT_SECS as u64), )); } - self.update_keep_alive(); } /// Opens an outbound substream with a request. @@ -295,7 +290,6 @@ where match self.state { HandlerState::Active => { self.dial_queue.push((id, req)); - self.update_keep_alive(); } _ => { self.pending_errors.push(HandlerErr::Outbound { @@ -338,43 +332,6 @@ where } inbound_info.pending_items.push(response); } - - /// Updates the `KeepAlive` returned by `connection_keep_alive`. - /// - /// The handler stays alive as long as there are inbound/outbound substreams established and no - /// items dialing/to be dialed. Otherwise it is given a grace period of inactivity of - /// `self.inactive_timeout`. - fn update_keep_alive(&mut self) { - // Check that we don't have outbound items pending for dialing, nor dialing, nor - // established. Also check that there are no established inbound substreams. - // Errors and events need to be reported back, so check those too. - let should_shutdown = match self.state { - HandlerState::ShuttingDown(_) => { - self.dial_queue.is_empty() - && self.outbound_substreams.is_empty() - && self.inbound_substreams.is_empty() - && self.pending_errors.is_empty() - && self.events_out.is_empty() - && self.dial_negotiated == 0 - } - HandlerState::Deactivated => { - // Regardless of events, the timeout has expired. Force the disconnect. - true - } - _ => false, - }; - - match self.keep_alive { - KeepAlive::Yes if should_shutdown => self.keep_alive = KeepAlive::No, - KeepAlive::Yes => {} // We continue being active - KeepAlive::Until(_) if should_shutdown => self.keep_alive = KeepAlive::No, // Already deemed inactive - KeepAlive::Until(_) => { - // No longer idle - self.keep_alive = KeepAlive::Yes; - } - KeepAlive::No => {} // currently not used - } - } } impl ProtocolsHandler for RPCHandler @@ -427,8 +384,6 @@ where self.events_out .push(RPCReceived::Request(self.current_inbound_substream_id, req)); self.current_inbound_substream_id.0 += 1; - - self.update_keep_alive(); } fn inject_fully_negotiated_outbound( @@ -486,8 +441,6 @@ where } self.current_outbound_substream_id.0 += 1; } - - self.update_keep_alive(); } fn inject_event(&mut self, rpc_event: Self::InEvent) { @@ -515,7 +468,6 @@ where // This dialing is now considered failed self.dial_negotiated -= 1; - self.update_keep_alive(); self.outbound_io_error_retries = 0; // map the error @@ -548,7 +500,29 @@ where } fn connection_keep_alive(&self) -> KeepAlive { - self.keep_alive + // Check that we don't have outbound items pending for dialing, nor dialing, nor + // established. Also check that there are no established inbound substreams. + // Errors and events need to be reported back, so check those too. + let should_shutdown = match self.state { + HandlerState::ShuttingDown(_) => { + self.dial_queue.is_empty() + && self.outbound_substreams.is_empty() + && self.inbound_substreams.is_empty() + && self.pending_errors.is_empty() + && self.events_out.is_empty() + && self.dial_negotiated == 0 + } + HandlerState::Deactivated => { + // Regardless of events, the timeout has expired. Force the disconnect. + true + } + _ => false, + }; + if should_shutdown { + KeepAlive::No + } else { + KeepAlive::Yes + } } fn poll( @@ -624,8 +598,6 @@ where if let Some(OutboundInfo { proto, req_id, .. }) = self.outbound_substreams.remove(outbound_id.get_ref()) { - self.update_keep_alive(); - let outbound_err = HandlerErr::Outbound { id: req_id, proto, @@ -724,7 +696,6 @@ where self.inbound_substreams.remove(&inbound_id); } - self.update_keep_alive(); // drive outbound streams that need to be processed for outbound_id in self.outbound_substreams.keys().copied().collect::>() { // get the state and mark it as poisoned @@ -813,7 +784,6 @@ where let request_id = entry.get().req_id; self.outbound_substreams_delay.remove(delay_key); entry.remove_entry(); - self.update_keep_alive(); // notify the application error if request.expected_responses() > 1 { // return an end of stream result @@ -844,7 +814,6 @@ where error: e, }; entry.remove_entry(); - self.update_keep_alive(); return Poll::Ready(ProtocolsHandlerEvent::Custom(Err(outbound_err))); } }, @@ -857,7 +826,6 @@ where let request_id = entry.get().req_id; self.outbound_substreams_delay.remove(delay_key); entry.remove_entry(); - self.update_keep_alive(); // report the stream termination to the user // @@ -894,7 +862,6 @@ where self.dial_negotiated += 1; let (id, req) = self.dial_queue.remove(0); self.dial_queue.shrink_to_fit(); - self.update_keep_alive(); return Poll::Ready(ProtocolsHandlerEvent::OutboundSubstreamRequest { protocol: SubstreamProtocol::new(req.clone()), info: (id, req), diff --git a/beacon_node/eth2_libp2p/src/service.rs b/beacon_node/eth2_libp2p/src/service.rs index 44c6a95d4..cee0f45d5 100644 --- a/beacon_node/eth2_libp2p/src/service.rs +++ b/beacon_node/eth2_libp2p/src/service.rs @@ -102,6 +102,7 @@ impl Service { .notify_handler_buffer_size(std::num::NonZeroUsize::new(32).expect("Not zero")) .connection_event_buffer_size(64) .incoming_connection_limit(10) + .outgoing_connection_limit(config.target_peers * 2) .peer_connection_limit(MAX_CONNECTIONS_PER_PEER) .executor(Box::new(Executor(executor))) .build()