From fbafe416d1dc3179a485197f9dee2c5bb0e81bca Mon Sep 17 00:00:00 2001 From: Divma Date: Mon, 8 Nov 2021 00:01:10 +0000 Subject: [PATCH] Move the peer manager to be a behaviour (#2773) This simply moves some functions that were "swarm notifications" to a network behaviour implementation. Notes ------ - We could disconnect from the peer manager but we would lose the rpc shutdown message - We still notify from the swarm since this is the most reliable way to get some events. Ugly but best for now - Events need to be pushed with "add event" to wake the waker Co-authored-by: Divma <26765164+divagant-martian@users.noreply.github.com> --- beacon_node/http_api/tests/common.rs | 5 +- .../lighthouse_network/src/behaviour/mod.rs | 121 ++++---- beacon_node/lighthouse_network/src/lib.rs | 1 + .../src/peer_manager/mod.rs | 213 +------------- .../src/peer_manager/network_behaviour.rs | 270 ++++++++++++++++++ beacon_node/lighthouse_network/src/service.rs | 40 +-- 6 files changed, 336 insertions(+), 314 deletions(-) create mode 100644 beacon_node/lighthouse_network/src/peer_manager/network_behaviour.rs diff --git a/beacon_node/http_api/tests/common.rs b/beacon_node/http_api/tests/common.rs index 9300e47e9..dd2a40efa 100644 --- a/beacon_node/http_api/tests/common.rs +++ b/beacon_node/http_api/tests/common.rs @@ -6,6 +6,7 @@ use eth2::{BeaconNodeHttpClient, Timeouts}; use http_api::{Config, Context}; use lighthouse_network::{ discv5::enr::{CombinedKey, EnrBuilder}, + libp2p::{core::connection::ConnectionId, swarm::NetworkBehaviour}, rpc::methods::{MetaData, MetaDataV2}, types::{EnrAttestationBitfield, EnrSyncCommitteeBitfield, SyncState}, ConnectedPoint, Enr, NetworkGlobals, PeerId, PeerManager, @@ -118,8 +119,8 @@ pub async fn create_api_server( local_addr: EXTERNAL_ADDR.parse().unwrap(), send_back_addr: EXTERNAL_ADDR.parse().unwrap(), }; - let num_established = std::num::NonZeroU32::new(1).unwrap(); - pm.inject_connection_established(peer_id, connected_point, num_established, None); + let con_id = ConnectionId::new(1); + pm.inject_connection_established(&peer_id, &con_id, &connected_point, None); *network_globals.sync_state.write() = SyncState::Synced; let eth1_service = eth1::Service::new(eth1::Config::default(), log.clone(), chain.spec.clone()); diff --git a/beacon_node/lighthouse_network/src/behaviour/mod.rs b/beacon_node/lighthouse_network/src/behaviour/mod.rs index f97a13dcd..86ca6f253 100644 --- a/beacon_node/lighthouse_network/src/behaviour/mod.rs +++ b/beacon_node/lighthouse_network/src/behaviour/mod.rs @@ -15,7 +15,6 @@ use crate::types::{ }; use crate::Eth2Enr; use crate::{error, metrics, Enr, NetworkConfig, NetworkGlobals, PubsubMessage, TopicHash}; -use futures::prelude::*; use libp2p::{ core::{ connection::ConnectionId, identity::Keypair, multiaddr::Protocol as MProtocol, Multiaddr, @@ -139,11 +138,10 @@ pub struct Behaviour { // NOTE: The id protocol is used for initial interop. This will be removed by mainnet. /// Provides IP addresses and peer information. identify: Identify, + /// The peer manager that keeps track of peer's reputation and status. + peer_manager: PeerManager, /* Auxiliary Fields */ - /// The peer manager that keeps track of peer's reputation and status. - #[behaviour(ignore)] - peer_manager: PeerManager, /// The output events generated by this behaviour to be consumed in the swarm poll. #[behaviour(ignore)] events: VecDeque>, @@ -1088,71 +1086,6 @@ impl Behaviour { } } - // check the peer manager for events - loop { - match self.peer_manager.poll_next_unpin(cx) { - Poll::Ready(Some(event)) => match event { - PeerManagerEvent::PeerConnectedIncoming(peer_id) => { - return Poll::Ready(NBAction::GenerateEvent( - BehaviourEvent::PeerConnectedIncoming(peer_id), - )); - } - PeerManagerEvent::PeerConnectedOutgoing(peer_id) => { - return Poll::Ready(NBAction::GenerateEvent( - BehaviourEvent::PeerConnectedOutgoing(peer_id), - )); - } - PeerManagerEvent::PeerDisconnected(peer_id) => { - return Poll::Ready(NBAction::GenerateEvent( - BehaviourEvent::PeerDisconnected(peer_id), - )); - } - PeerManagerEvent::Banned(peer_id, associated_ips) => { - self.discovery.ban_peer(&peer_id, associated_ips); - return Poll::Ready(NBAction::GenerateEvent(BehaviourEvent::PeerBanned( - peer_id, - ))); - } - PeerManagerEvent::UnBanned(peer_id, associated_ips) => { - self.discovery.unban_peer(&peer_id, associated_ips); - return Poll::Ready(NBAction::GenerateEvent(BehaviourEvent::PeerUnbanned( - peer_id, - ))); - } - PeerManagerEvent::Status(peer_id) => { - // it's time to status. We don't keep a beacon chain reference here, so we inform - // the network to send a status to this peer - return Poll::Ready(NBAction::GenerateEvent(BehaviourEvent::StatusPeer( - peer_id, - ))); - } - PeerManagerEvent::DiscoverPeers => { - // Peer manager has requested a discovery query for more peers. - self.discovery.discover_peers(); - } - PeerManagerEvent::DiscoverSubnetPeers(subnets_to_discover) => { - // Peer manager has requested a subnet discovery query for more peers. - self.discover_subnet_peers(subnets_to_discover); - } - PeerManagerEvent::Ping(peer_id) => { - // send a ping request to this peer - self.ping(RequestId::Behaviour, peer_id); - } - PeerManagerEvent::MetaData(peer_id) => { - self.send_meta_data_request(peer_id); - } - PeerManagerEvent::DisconnectPeer(peer_id, reason) => { - debug!(self.log, "Peer Manager disconnecting peer"; - "peer_id" => %peer_id, "reason" => %reason); - // send one goodbye - self.eth2_rpc.shutdown(peer_id, reason); - } - }, - Poll::Pending => break, - Poll::Ready(None) => break, // peer manager ended - } - } - if let Some(event) = self.events.pop_front() { return Poll::Ready(NBAction::GenerateEvent(event)); } @@ -1166,6 +1099,56 @@ impl Behaviour { } } +impl NetworkBehaviourEventProcess for Behaviour { + fn inject_event(&mut self, event: PeerManagerEvent) { + match event { + PeerManagerEvent::PeerConnectedIncoming(peer_id) => { + self.add_event(BehaviourEvent::PeerConnectedIncoming(peer_id)); + } + PeerManagerEvent::PeerConnectedOutgoing(peer_id) => { + self.add_event(BehaviourEvent::PeerConnectedOutgoing(peer_id)); + } + PeerManagerEvent::PeerDisconnected(peer_id) => { + self.add_event(BehaviourEvent::PeerDisconnected(peer_id)); + } + PeerManagerEvent::Banned(peer_id, associated_ips) => { + self.discovery.ban_peer(&peer_id, associated_ips); + self.add_event(BehaviourEvent::PeerBanned(peer_id)); + } + PeerManagerEvent::UnBanned(peer_id, associated_ips) => { + self.discovery.unban_peer(&peer_id, associated_ips); + self.add_event(BehaviourEvent::PeerUnbanned(peer_id)); + } + PeerManagerEvent::Status(peer_id) => { + // it's time to status. We don't keep a beacon chain reference here, so we inform + // the network to send a status to this peer + self.add_event(BehaviourEvent::StatusPeer(peer_id)); + } + PeerManagerEvent::DiscoverPeers => { + // Peer manager has requested a discovery query for more peers. + self.discovery.discover_peers(); + } + PeerManagerEvent::DiscoverSubnetPeers(subnets_to_discover) => { + // Peer manager has requested a subnet discovery query for more peers. + self.discover_subnet_peers(subnets_to_discover); + } + PeerManagerEvent::Ping(peer_id) => { + // send a ping request to this peer + self.ping(RequestId::Behaviour, peer_id); + } + PeerManagerEvent::MetaData(peer_id) => { + self.send_meta_data_request(peer_id); + } + PeerManagerEvent::DisconnectPeer(peer_id, reason) => { + debug!(self.log, "Peer Manager disconnecting peer"; + "peer_id" => %peer_id, "reason" => %reason); + // send one goodbye + self.eth2_rpc.shutdown(peer_id, reason); + } + } + } +} + /* Public API types */ /// The type of RPC requests the Behaviour informs it has received and allows for sending. diff --git a/beacon_node/lighthouse_network/src/lib.rs b/beacon_node/lighthouse_network/src/lib.rs index 5c66818d9..733dc72ab 100644 --- a/beacon_node/lighthouse_network/src/lib.rs +++ b/beacon_node/lighthouse_network/src/lib.rs @@ -68,6 +68,7 @@ pub use behaviour::{BehaviourEvent, Gossipsub, PeerRequestId, Request, Response} pub use config::Config as NetworkConfig; pub use discovery::{CombinedKeyExt, EnrExt, Eth2Enr}; pub use discv5; +pub use libp2p; pub use libp2p::bandwidth::BandwidthSinks; pub use libp2p::gossipsub::{MessageAcceptance, MessageId, Topic, TopicHash}; pub use libp2p::{core::ConnectedPoint, PeerId, Swarm}; diff --git a/beacon_node/lighthouse_network/src/peer_manager/mod.rs b/beacon_node/lighthouse_network/src/peer_manager/mod.rs index db042e676..b8ca40bae 100644 --- a/beacon_node/lighthouse_network/src/peer_manager/mod.rs +++ b/beacon_node/lighthouse_network/src/peer_manager/mod.rs @@ -2,23 +2,17 @@ use crate::discovery::TARGET_SUBNET_PEERS; use crate::rpc::{GoodbyeReason, MetaData, Protocol, RPCError, RPCResponseErrorCode}; -use crate::types::SyncState; use crate::{error, metrics, Gossipsub}; use crate::{NetworkGlobals, PeerId}; use crate::{Subnet, SubnetDiscovery}; use discv5::Enr; -use futures::prelude::*; -use futures::Stream; use hashset_delay::HashSetDelay; -use libp2p::core::ConnectedPoint; use libp2p::identify::IdentifyInfo; use peerdb::{BanOperation, BanResult, ScoreUpdateResult}; use slog::{debug, error, warn}; use smallvec::SmallVec; use std::{ - pin::Pin, sync::Arc, - task::{Context, Poll}, time::{Duration, Instant}, }; use types::{EthSpec, SyncSubnetId}; @@ -36,6 +30,7 @@ pub use peerdb::sync_status::{SyncInfo, SyncStatus}; use std::collections::{hash_map::Entry, HashMap}; use std::net::IpAddr; pub mod config; +mod network_behaviour; /// The heartbeat performs regular updates such as updating reputations and performing discovery /// requests. This defines the interval in seconds. @@ -81,6 +76,7 @@ pub struct PeerManager { } /// The events that the `PeerManager` outputs (requests). +#[derive(Debug)] pub enum PeerManagerEvent { /// A peer has dialed us. PeerConnectedIncoming(PeerId), @@ -341,147 +337,6 @@ impl PeerManager { self.inject_peer_connection(peer_id, ConnectingType::Dialing, enr); } - pub fn inject_connection_established( - &mut self, - peer_id: PeerId, - endpoint: ConnectedPoint, - num_established: std::num::NonZeroU32, - enr: Option, - ) { - // Log the connection - match &endpoint { - ConnectedPoint::Listener { .. } => { - debug!(self.log, "Connection established"; "peer_id" => %peer_id, "connection" => "Incoming", "connections" => %num_established); - } - ConnectedPoint::Dialer { .. } => { - debug!(self.log, "Connection established"; "peer_id" => %peer_id, "connection" => "Outgoing", "connections" => %num_established); - } - } - - // Check to make sure the peer is not supposed to be banned - match self.ban_status(&peer_id) { - BanResult::BadScore => { - // This is a faulty state - error!(self.log, "Connected to a banned peer, re-banning"; "peer_id" => %peer_id); - // Reban the peer - self.goodbye_peer(&peer_id, GoodbyeReason::Banned, ReportSource::PeerManager); - return; - } - BanResult::BannedIp(ip_addr) => { - // A good peer has connected to us via a banned IP address. We ban the peer and - // prevent future connections. - debug!(self.log, "Peer connected via banned IP. Banning"; "peer_id" => %peer_id, "banned_ip" => %ip_addr); - self.goodbye_peer(&peer_id, GoodbyeReason::BannedIP, ReportSource::PeerManager); - return; - } - BanResult::NotBanned => {} - } - - // Check the connection limits - if self.peer_limit_reached() - && self - .network_globals - .peers - .read() - .peer_info(&peer_id) - .map_or(true, |peer| !peer.has_future_duty()) - { - // Gracefully disconnect the peer. - self.disconnect_peer(peer_id, GoodbyeReason::TooManyPeers); - return; - } - - // Register the newly connected peer (regardless if we are about to disconnect them). - // NOTE: We don't register peers that we are disconnecting immediately. The network service - // does not need to know about these peers. - match endpoint { - ConnectedPoint::Listener { send_back_addr, .. } => { - self.inject_connect_ingoing(&peer_id, send_back_addr, enr); - if num_established == std::num::NonZeroU32::new(1).expect("valid") { - self.events - .push(PeerManagerEvent::PeerConnectedIncoming(peer_id)); - } - } - ConnectedPoint::Dialer { address } => { - self.inject_connect_outgoing(&peer_id, address, enr); - if num_established == std::num::NonZeroU32::new(1).expect("valid") { - self.events - .push(PeerManagerEvent::PeerConnectedOutgoing(peer_id)); - } - } - } - - let connected_peers = self.network_globals.connected_peers() as i64; - - // increment prometheus metrics - metrics::inc_counter(&metrics::PEER_CONNECT_EVENT_COUNT); - metrics::set_gauge(&metrics::PEERS_CONNECTED, connected_peers); - metrics::set_gauge(&metrics::PEERS_CONNECTED_INTEROP, connected_peers); - } - - pub fn inject_connection_closed( - &mut self, - peer_id: PeerId, - _endpoint: ConnectedPoint, - num_established: u32, - ) { - if num_established == 0 { - // There are no more connections - if self - .network_globals - .peers - .read() - .is_connected_or_disconnecting(&peer_id) - { - // We are disconnecting the peer or the peer has already been connected. - // Both these cases, the peer has been previously registered by the peer manager and - // potentially the application layer. - // Inform the application. - self.events - .push(PeerManagerEvent::PeerDisconnected(peer_id)); - debug!(self.log, "Peer disconnected"; "peer_id" => %peer_id); - - // Decrement the PEERS_PER_CLIENT metric - if let Some(kind) = self - .network_globals - .peers - .read() - .peer_info(&peer_id) - .map(|info| info.client().kind.clone()) - { - if let Some(v) = - metrics::get_int_gauge(&metrics::PEERS_PER_CLIENT, &[&kind.to_string()]) - { - v.dec() - }; - } - } - - // NOTE: It may be the case that a rejected node, due to too many peers is disconnected - // here and the peer manager has no knowledge of its connection. We insert it here for - // reference so that peer manager can track this peer. - self.inject_disconnect(&peer_id); - - let connected_peers = self.network_globals.connected_peers() as i64; - - // Update the prometheus metrics - metrics::inc_counter(&metrics::PEER_DISCONNECT_EVENT_COUNT); - metrics::set_gauge(&metrics::PEERS_CONNECTED, connected_peers); - metrics::set_gauge(&metrics::PEERS_CONNECTED_INTEROP, connected_peers); - } - } - - /// A dial attempt has failed. - /// - /// NOTE: It can be the case that we are dialing a peer and during the dialing process the peer - /// connects and the dial attempt later fails. To handle this, we only update the peer_db if - /// the peer is not already connected. - pub fn inject_dial_failure(&mut self, peer_id: &PeerId) { - if !self.network_globals.peers.read().is_connected(peer_id) { - self.inject_disconnect(peer_id); - } - } - /// Reports if a peer is banned or not. /// /// This is used to determine if we should accept incoming connections. @@ -973,70 +828,6 @@ impl PeerManager { } } -impl Stream for PeerManager { - type Item = PeerManagerEvent; - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - // perform the heartbeat when necessary - while self.heartbeat.poll_tick(cx).is_ready() { - self.heartbeat(); - } - - // poll the timeouts for pings and status' - loop { - match self.inbound_ping_peers.poll_next_unpin(cx) { - Poll::Ready(Some(Ok(peer_id))) => { - self.inbound_ping_peers.insert(peer_id); - self.events.push(PeerManagerEvent::Ping(peer_id)); - } - Poll::Ready(Some(Err(e))) => { - error!(self.log, "Failed to check for inbound peers to ping"; "error" => e.to_string()) - } - Poll::Ready(None) | Poll::Pending => break, - } - } - - loop { - match self.outbound_ping_peers.poll_next_unpin(cx) { - Poll::Ready(Some(Ok(peer_id))) => { - self.outbound_ping_peers.insert(peer_id); - self.events.push(PeerManagerEvent::Ping(peer_id)); - } - Poll::Ready(Some(Err(e))) => { - error!(self.log, "Failed to check for outbound peers to ping"; "error" => e.to_string()) - } - Poll::Ready(None) | Poll::Pending => break, - } - } - - if !matches!( - self.network_globals.sync_state(), - SyncState::SyncingFinalized { .. } | SyncState::SyncingHead { .. } - ) { - loop { - match self.status_peers.poll_next_unpin(cx) { - Poll::Ready(Some(Ok(peer_id))) => { - self.status_peers.insert(peer_id); - self.events.push(PeerManagerEvent::Status(peer_id)) - } - Poll::Ready(Some(Err(e))) => { - error!(self.log, "Failed to check for peers to ping"; "error" => e.to_string()) - } - Poll::Ready(None) | Poll::Pending => break, - } - } - } - - if !self.events.is_empty() { - return Poll::Ready(Some(self.events.remove(0))); - } else { - self.events.shrink_to_fit(); - } - - Poll::Pending - } -} - enum ConnectingType { /// We are in the process of dialing this peer. Dialing, diff --git a/beacon_node/lighthouse_network/src/peer_manager/network_behaviour.rs b/beacon_node/lighthouse_network/src/peer_manager/network_behaviour.rs new file mode 100644 index 000000000..c8b062da4 --- /dev/null +++ b/beacon_node/lighthouse_network/src/peer_manager/network_behaviour.rs @@ -0,0 +1,270 @@ +use std::task::{Context, Poll}; + +use futures::StreamExt; +use libp2p::core::connection::ConnectionId; +use libp2p::core::ConnectedPoint; +use libp2p::swarm::protocols_handler::DummyProtocolsHandler; +use libp2p::swarm::{ + DialError, NetworkBehaviour, NetworkBehaviourAction, PollParameters, ProtocolsHandler, +}; +use libp2p::{Multiaddr, PeerId}; +use slog::{debug, error}; +use types::EthSpec; + +use crate::metrics; +use crate::rpc::GoodbyeReason; +use crate::types::SyncState; + +use super::peerdb::BanResult; +use super::{PeerManager, PeerManagerEvent, ReportSource}; + +impl NetworkBehaviour for PeerManager { + type ProtocolsHandler = DummyProtocolsHandler; + + type OutEvent = PeerManagerEvent; + + /* Required trait members */ + + fn new_handler(&mut self) -> Self::ProtocolsHandler { + DummyProtocolsHandler::default() + } + + fn inject_event( + &mut self, + _: PeerId, + _: ConnectionId, + _: ::OutEvent, + ) { + unreachable!("Dummy handler does not emit events") + } + + fn poll( + &mut self, + cx: &mut Context<'_>, + _params: &mut impl PollParameters, + ) -> Poll> { + // perform the heartbeat when necessary + while self.heartbeat.poll_tick(cx).is_ready() { + self.heartbeat(); + } + + // poll the timeouts for pings and status' + loop { + match self.inbound_ping_peers.poll_next_unpin(cx) { + Poll::Ready(Some(Ok(peer_id))) => { + self.inbound_ping_peers.insert(peer_id); + self.events.push(PeerManagerEvent::Ping(peer_id)); + } + Poll::Ready(Some(Err(e))) => { + error!(self.log, "Failed to check for inbound peers to ping"; "error" => e.to_string()) + } + Poll::Ready(None) | Poll::Pending => break, + } + } + + loop { + match self.outbound_ping_peers.poll_next_unpin(cx) { + Poll::Ready(Some(Ok(peer_id))) => { + self.outbound_ping_peers.insert(peer_id); + self.events.push(PeerManagerEvent::Ping(peer_id)); + } + Poll::Ready(Some(Err(e))) => { + error!(self.log, "Failed to check for outbound peers to ping"; "error" => e.to_string()) + } + Poll::Ready(None) | Poll::Pending => break, + } + } + + if !matches!( + self.network_globals.sync_state(), + SyncState::SyncingFinalized { .. } | SyncState::SyncingHead { .. } + ) { + loop { + match self.status_peers.poll_next_unpin(cx) { + Poll::Ready(Some(Ok(peer_id))) => { + self.status_peers.insert(peer_id); + self.events.push(PeerManagerEvent::Status(peer_id)) + } + Poll::Ready(Some(Err(e))) => { + error!(self.log, "Failed to check for peers to ping"; "error" => e.to_string()) + } + Poll::Ready(None) | Poll::Pending => break, + } + } + } + + if !self.events.is_empty() { + return Poll::Ready(NetworkBehaviourAction::GenerateEvent(self.events.remove(0))); + } else { + self.events.shrink_to_fit(); + } + + Poll::Pending + } + + /* Overwritten trait members */ + + fn inject_connection_established( + &mut self, + peer_id: &PeerId, + _connection_id: &ConnectionId, + endpoint: &ConnectedPoint, + _failed_addresses: Option<&Vec>, + ) { + // Log the connection + match &endpoint { + ConnectedPoint::Listener { .. } => { + debug!(self.log, "Connection established"; "peer_id" => %peer_id, "connection" => "Incoming"); + } + ConnectedPoint::Dialer { .. } => { + debug!(self.log, "Connection established"; "peer_id" => %peer_id, "connection" => "Outgoing"); + // TODO: Ensure we have that address registered. + } + } + + // Check to make sure the peer is not supposed to be banned + match self.ban_status(peer_id) { + // TODO: directly emit the ban event? + BanResult::BadScore => { + // This is a faulty state + error!(self.log, "Connecteded to a banned peer, re-banning"; "peer_id" => %peer_id); + // Reban the peer + self.goodbye_peer(peer_id, GoodbyeReason::Banned, ReportSource::PeerManager); + return; + } + BanResult::BannedIp(ip_addr) => { + // A good peer has connected to us via a banned IP address. We ban the peer and + // prevent future connections. + debug!(self.log, "Peer connected via banned IP. Banning"; "peer_id" => %peer_id, "banned_ip" => %ip_addr); + self.goodbye_peer(peer_id, GoodbyeReason::BannedIP, ReportSource::PeerManager); + return; + } + BanResult::NotBanned => {} + } + + // Check the connection limits + if self.peer_limit_reached() + && self + .network_globals + .peers + .read() + .peer_info(peer_id) + .map_or(true, |peer| !peer.has_future_duty()) + { + // Gracefully disconnect the peer. + self.disconnect_peer(*peer_id, GoodbyeReason::TooManyPeers); + return; + } + + // Register the newly connected peer (regardless if we are about to disconnect them). + // NOTE: We don't register peers that we are disconnecting immediately. The network service + // does not need to know about these peers. + // let enr + match endpoint { + ConnectedPoint::Listener { send_back_addr, .. } => { + self.inject_connect_ingoing(peer_id, send_back_addr.clone(), None); + self.events + .push(PeerManagerEvent::PeerConnectedIncoming(*peer_id)); + } + ConnectedPoint::Dialer { address } => { + self.inject_connect_outgoing(peer_id, address.clone(), None); + self.events + .push(PeerManagerEvent::PeerConnectedOutgoing(*peer_id)); + } + } + + let connected_peers = self.network_globals.connected_peers() as i64; + + // increment prometheus metrics + metrics::inc_counter(&metrics::PEER_CONNECT_EVENT_COUNT); + metrics::set_gauge(&metrics::PEERS_CONNECTED, connected_peers); + metrics::set_gauge(&metrics::PEERS_CONNECTED_INTEROP, connected_peers); + } + + fn inject_disconnected(&mut self, peer_id: &PeerId) { + // There are no more connections + if self + .network_globals + .peers + .read() + .is_connected_or_disconnecting(peer_id) + { + // We are disconnecting the peer or the peer has already been connected. + // Both these cases, the peer has been previously registered by the peer manager and + // potentially the application layer. + // Inform the application. + self.events + .push(PeerManagerEvent::PeerDisconnected(*peer_id)); + debug!(self.log, "Peer disconnected"; "peer_id" => %peer_id); + + // Decrement the PEERS_PER_CLIENT metric + if let Some(kind) = self + .network_globals + .peers + .read() + .peer_info(peer_id) + .map(|info| info.client().kind.clone()) + { + if let Some(v) = + metrics::get_int_gauge(&metrics::PEERS_PER_CLIENT, &[&kind.to_string()]) + { + v.dec() + }; + } + } + + // NOTE: It may be the case that a rejected node, due to too many peers is disconnected + // here and the peer manager has no knowledge of its connection. We insert it here for + // reference so that peer manager can track this peer. + self.inject_disconnect(peer_id); + + let connected_peers = self.network_globals.connected_peers() as i64; + + // Update the prometheus metrics + metrics::inc_counter(&metrics::PEER_DISCONNECT_EVENT_COUNT); + metrics::set_gauge(&metrics::PEERS_CONNECTED, connected_peers); + metrics::set_gauge(&metrics::PEERS_CONNECTED_INTEROP, connected_peers); + } + + fn inject_address_change( + &mut self, + _peer_id: &PeerId, + _connection_id: &ConnectionId, + old: &ConnectedPoint, + new: &ConnectedPoint, + ) { + debug_assert!( + matches!( + (old, new), + ( + // inbound remains inbound + ConnectedPoint::Listener { .. }, + ConnectedPoint::Listener { .. } + ) | ( + // outbound remains outbound + ConnectedPoint::Dialer { .. }, + ConnectedPoint::Dialer { .. } + ) + ), + "A peer has changed between inbound and outbound" + ) + } + + /// A dial attempt has failed. + /// + /// NOTE: It can be the case that we are dialing a peer and during the dialing process the peer + /// connects and the dial attempt later fails. To handle this, we only update the peer_db if + /// the peer is not already connected. + fn inject_dial_failure( + &mut self, + peer_id: Option, + _handler: DummyProtocolsHandler, + _error: &DialError, + ) { + if let Some(peer_id) = peer_id { + if !self.network_globals.peers.read().is_connected(&peer_id) { + self.inject_disconnect(&peer_id); + } + } + } +} diff --git a/beacon_node/lighthouse_network/src/service.rs b/beacon_node/lighthouse_network/src/service.rs index c88eb4585..a42fe5381 100644 --- a/beacon_node/lighthouse_network/src/service.rs +++ b/beacon_node/lighthouse_network/src/service.rs @@ -317,35 +317,17 @@ impl Service { return Libp2pEvent::Behaviour(behaviour); } SwarmEvent::ConnectionEstablished { - peer_id, - endpoint, - num_established, + peer_id: _, + endpoint: _, + num_established: _, concurrent_dial_errors: _, - } => { - // Inform the peer manager. - // We require the ENR to inject into the peer db, if it exists. - let enr = self - .swarm - .behaviour_mut() - .discovery_mut() - .enr_of_peer(&peer_id); - self.swarm - .behaviour_mut() - .peer_manager_mut() - .inject_connection_established(peer_id, endpoint, num_established, enr); - } + } => {} SwarmEvent::ConnectionClosed { - peer_id, + peer_id: _, cause: _, - endpoint, - num_established, - } => { - // Inform the peer manager. - self.swarm - .behaviour_mut() - .peer_manager_mut() - .inject_connection_closed(peer_id, endpoint, num_established); - } + endpoint: _, + num_established: _, + } => {} SwarmEvent::NewListenAddr { address, .. } => { return Libp2pEvent::NewListenAddr(address) } @@ -367,12 +349,6 @@ impl Service { } SwarmEvent::OutgoingConnectionError { peer_id, error } => { debug!(self.log, "Failed to dial address"; "peer_id" => ?peer_id, "error" => %error); - if let Some(peer_id) = peer_id { - self.swarm - .behaviour_mut() - .peer_manager_mut() - .inject_dial_failure(&peer_id); - } } SwarmEvent::ExpiredListenAddr { address, .. } => { debug!(self.log, "Listen address expired"; "address" => %address)