diff --git a/beacon_node/eth2-libp2p/src/behaviour.rs b/beacon_node/eth2-libp2p/src/behaviour.rs index bd9a8d6c2..6da99f4bf 100644 --- a/beacon_node/eth2-libp2p/src/behaviour.rs +++ b/beacon_node/eth2-libp2p/src/behaviour.rs @@ -5,7 +5,7 @@ use crate::types::{GossipEncoding, GossipKind, GossipTopic}; use crate::{error, Enr, NetworkConfig, NetworkGlobals, PubsubMessage, TopicHash}; use futures::prelude::*; use libp2p::{ - core::identity::Keypair, + core::{identity::Keypair, ConnectedPoint}, discv5::Discv5Event, gossipsub::{Gossipsub, GossipsubEvent, MessageId}, identify::{Identify, IdentifyEvent}, @@ -366,6 +366,43 @@ impl { fn inject_event(&mut self, event: RPCMessage) { match event { + // TODO: These are temporary methods to give access to injected behaviour + // events to the + // peer manager. After a behaviour re-write remove these: + RPCMessage::PeerConnectedHack(peer_id, connected_point) => { + match connected_point { + ConnectedPoint::Dialer { .. } => self.peer_manager.connect_outgoing(&peer_id), + ConnectedPoint::Listener { .. } => self.peer_manager.connect_ingoing(&peer_id), + }; + + // Find ENR info about a peer if possible. + if let Some(enr) = self.discovery.enr_of_peer(&peer_id) { + let bitfield = match enr.bitfield::() { + Ok(v) => v, + Err(e) => { + warn!(self.log, "Peer has invalid ENR bitfield"; + "peer_id" => format!("{}", peer_id), + "error" => format!("{:?}", e)); + return; + } + }; + + // use this as a baseline, until we get the actual meta-data + let meta_data = MetaData { + seq_number: 0, + attnets: bitfield, + }; + // TODO: Shift to the peer manager + self.network_globals + .peers + .write() + .add_metadata(&peer_id, meta_data); + } + } + RPCMessage::PeerDisconnectedHack(peer_id, _connected_point) => { + self.peer_manager.notify_disconnect(&peer_id) + } + RPCMessage::PeerDialed(peer_id) => { self.events.push(BehaviourEvent::PeerDialed(peer_id)) } @@ -402,7 +439,6 @@ impl // propagate the STATUS message upwards self.events.push(BehaviourEvent::RPC(peer_id, rpc_event)); } - _ => { // propagate all other RPC messages upwards self.events.push(BehaviourEvent::RPC(peer_id, rpc_event)) diff --git a/beacon_node/eth2-libp2p/src/discovery/mod.rs b/beacon_node/eth2-libp2p/src/discovery/mod.rs index 5107565cb..a30f86092 100644 --- a/beacon_node/eth2-libp2p/src/discovery/mod.rs +++ b/beacon_node/eth2-libp2p/src/discovery/mod.rs @@ -5,7 +5,6 @@ pub(crate) mod enr; pub use enr::build_enr; use crate::metrics; -use crate::rpc::MetaData; use crate::{error, Enr, NetworkConfig, NetworkGlobals}; use enr::{Eth2Enr, BITFIELD_ENR_KEY, ETH2_ENR_KEY}; use futures::prelude::*; @@ -169,6 +168,11 @@ impl Discovery { self.discovery.enr_entries() } + /// Returns the ENR of a known peer if it exists. + pub fn enr_of_peer(&mut self, peer_id: &PeerId) -> Option { + self.discovery.enr_of_peer(peer_id) + } + /// Adds/Removes a subnet from the ENR Bitfield pub fn update_enr_bitfield(&mut self, subnet_id: SubnetId, value: bool) -> Result<(), String> { let id = *subnet_id as usize; @@ -351,62 +355,9 @@ where self.discovery.addresses_of_peer(peer_id) } - fn inject_connected(&mut self, peer_id: PeerId, endpoint: ConnectedPoint) { - // TODO: Replace with PeerManager with custom behvaviour - // Find ENR info about a peer if possible. + fn inject_connected(&mut self, _peer_id: PeerId, _endpoint: ConnectedPoint) {} - match endpoint { - ConnectedPoint::Dialer { .. } => { - self.network_globals - .peers - .write() - .connect_outgoing(&peer_id); - } - ConnectedPoint::Listener { .. } => { - self.network_globals.peers.write().connect_ingoing(&peer_id); - } - } - - if let Some(enr) = self.discovery.enr_of_peer(&peer_id) { - let bitfield = match enr.bitfield::() { - Ok(v) => v, - Err(e) => { - warn!(self.log, "Peer has invalid ENR bitfield"; - "peer_id" => format!("{}", peer_id), - "error" => format!("{:?}", e)); - return; - } - }; - - // use this as a baseline, until we get the actual meta-data - let meta_data = MetaData { - seq_number: 0, - attnets: bitfield, - }; - self.network_globals - .peers - .write() - .add_metadata(&peer_id, meta_data); - } - - // TODO: Drop peers if over max_peer limit - - metrics::inc_counter(&metrics::PEER_CONNECT_EVENT_COUNT); - metrics::set_gauge( - &metrics::PEERS_CONNECTED, - self.network_globals.connected_peers() as i64, - ); - } - - fn inject_disconnected(&mut self, peer_id: &PeerId, _endpoint: ConnectedPoint) { - self.network_globals.peers.write().disconnect(peer_id); - - metrics::inc_counter(&metrics::PEER_DISCONNECT_EVENT_COUNT); - metrics::set_gauge( - &metrics::PEERS_CONNECTED, - self.network_globals.connected_peers() as i64, - ); - } + fn inject_disconnected(&mut self, _peer_id: &PeerId, _endpoint: ConnectedPoint) {} fn inject_replaced( &mut self, diff --git a/beacon_node/eth2-libp2p/src/peer_manager/mod.rs b/beacon_node/eth2-libp2p/src/peer_manager/mod.rs index 16974dacf..af844d3cb 100644 --- a/beacon_node/eth2-libp2p/src/peer_manager/mod.rs +++ b/beacon_node/eth2-libp2p/src/peer_manager/mod.rs @@ -1,6 +1,7 @@ //! Implementation of a Lighthouse's peer management system. pub use self::peerdb::*; +use crate::metrics; use crate::rpc::MetaData; use crate::{NetworkGlobals, PeerId}; use futures::prelude::*; @@ -172,57 +173,41 @@ impl PeerManager { false } - /// Sets a peer as disconnected. If its reputation gets too low requests - /// the peer to be banned and to be disconnected otherwise - pub fn disconnect(&mut self, peer_id: &PeerId) { + /// Requests that a peer get disconnected. + pub fn disconnect_peer(&mut self, peer_id: &PeerId) { + self.events + .push(PeerManagerEvent::DisconnectPeer(peer_id.clone())); + } + + /// Updates the state of the peer as disconnected. + pub fn notify_disconnect(&mut self, peer_id: &PeerId) { self.update_reputations(); { let mut peerdb = self.network_globals.peers.write(); peerdb.disconnect(peer_id); peerdb.add_reputation(peer_id, PeerAction::Disconnected as Rep); } - if !self.gets_banned(peer_id) { - self.events - .push(PeerManagerEvent::DisconnectPeer(peer_id.clone())); - } // remove the ping and status timer for the peer self.ping_peers.remove(peer_id); self.status_peers.remove(peer_id); + metrics::inc_counter(&metrics::PEER_DISCONNECT_EVENT_COUNT); + metrics::set_gauge( + &metrics::PEERS_CONNECTED, + self.network_globals.connected_peers() as i64, + ); } /// Sets a peer as connected as long as their reputation allows it /// Informs if the peer was accepted pub fn connect_ingoing(&mut self, peer_id: &PeerId) -> bool { - self.update_reputations(); - let mut peerdb = self.network_globals.peers.write(); - if !peerdb.connection_status(peer_id).is_banned() { - peerdb.connect_ingoing(peer_id); - // start a ping and status timer for the peer - self.ping_peers.insert(peer_id.clone()); - self.status_peers.insert(peer_id.clone()); - - return true; - } - - false + self.connect_peer(peer_id, false) } /// Sets a peer as connected as long as their reputation allows it /// Informs if the peer was accepted pub fn connect_outgoing(&mut self, peer_id: &PeerId) -> bool { - self.update_reputations(); - let mut peerdb = self.network_globals.peers.write(); - if !peerdb.connection_status(peer_id).is_banned() { - peerdb.connect_outgoing(peer_id); - // start a ping and status timer for the peer - self.ping_peers.insert(peer_id.clone()); - self.status_peers.insert(peer_id.clone()); - - return true; - } - - false + self.connect_peer(peer_id, true) } /// Provides a given peer's reputation if it exists. @@ -256,6 +241,46 @@ impl PeerManager { .add_reputation(peer_id, action as Rep); self.update_reputations(); } + + /* Internal functions */ + + /// Registers a peer as connected. The `ingoing` parameter determines if the peer is being + /// dialed or connecting to us. + /// + /// This is called by `connect_ingoing` and `connect_outgoing`. + /// + /// This informs if the peer was accepted in to the db or not. + // TODO: Drop peers if over max_peer limit + fn connect_peer(&mut self, peer_id: &PeerId, outgoing: bool) -> bool { + // TODO: Call this on a timer + self.update_reputations(); + + { + let mut peerdb = self.network_globals.peers.write(); + if peerdb.connection_status(peer_id).is_banned() { + return false; + } + + if outgoing { + peerdb.connect_outgoing(peer_id); + } else { + peerdb.connect_outgoing(peer_id); + } + } + + // start a ping and status timer for the peer + self.ping_peers.insert(peer_id.clone()); + self.status_peers.insert(peer_id.clone()); + + // increment prometheus metrics + metrics::inc_counter(&metrics::PEER_CONNECT_EVENT_COUNT); + metrics::set_gauge( + &metrics::PEERS_CONNECTED, + self.network_globals.connected_peers() as i64, + ); + + true + } } impl Stream for PeerManager { diff --git a/beacon_node/eth2-libp2p/src/peer_manager/peerdb.rs b/beacon_node/eth2-libp2p/src/peer_manager/peerdb.rs index 7fd9aeae8..05af9e019 100644 --- a/beacon_node/eth2-libp2p/src/peer_manager/peerdb.rs +++ b/beacon_node/eth2-libp2p/src/peer_manager/peerdb.rs @@ -140,7 +140,7 @@ impl PeerDB { .map(|(id, _)| id) } - /// Gets the connection status of the peer. + /// Returns the peer's connection status. Returns unknown if the peer is not in the DB. pub fn connection_status(&self, peer_id: &PeerId) -> PeerConnectionStatus { self.peer_info(peer_id) .map_or(PeerConnectionStatus::default(), |info| { diff --git a/beacon_node/eth2-libp2p/src/rpc/mod.rs b/beacon_node/eth2-libp2p/src/rpc/mod.rs index bb9e2e711..e8e71d30b 100644 --- a/beacon_node/eth2-libp2p/src/rpc/mod.rs +++ b/beacon_node/eth2-libp2p/src/rpc/mod.rs @@ -175,4 +175,8 @@ pub enum RPCMessage { RPC(PeerId, RPCEvent), PeerDialed(PeerId), PeerDisconnected(PeerId), + // TODO: This is a hack to give access to connections to peer manager. Remove this once + // behaviour is re-written + PeerConnectedHack(PeerId, ConnectedPoint), + PeerDisconnectedHack(PeerId, ConnectedPoint), }