diff --git a/beacon_node/eth2_libp2p/src/behaviour/mod.rs b/beacon_node/eth2_libp2p/src/behaviour/mod.rs index aaae683d3..593b7279b 100644 --- a/beacon_node/eth2_libp2p/src/behaviour/mod.rs +++ b/beacon_node/eth2_libp2p/src/behaviour/mod.rs @@ -851,44 +851,10 @@ impl NetworkBehaviour for Behaviour { self.peer_manager.addresses_of_peer(peer_id) } - // This gets called every time a connection is closed. - fn inject_connection_closed( - &mut self, - peer_id: &PeerId, - conn_id: &ConnectionId, - endpoint: &ConnectedPoint, - ) { - // If the peer manager (and therefore the behaviour's) believe this peer connected, inform - // about the disconnection. - if self.network_globals.peers.read().is_connected(&peer_id) { - delegate_to_behaviours!(self, inject_connection_closed, peer_id, conn_id, endpoint); - } - } - - // This gets called once there are no more active connections. - fn inject_disconnected(&mut self, peer_id: &PeerId) { - // If the application/behaviour layers thinks this peer has connected inform it of the disconnect. - if self.network_globals.peers.read().is_connected(&peer_id) { - // Inform the application. - self.add_event(BehaviourEvent::PeerDisconnected(peer_id.clone())); - // Inform the behaviour. - delegate_to_behaviours!(self, inject_disconnected, peer_id); - } - // Inform the peer manager. - // NOTE: It may be the case that a rejected node, due to too many peers is disconnected - // here and the peer manager has no knowledge of its connection. We insert it here for - // reference so that peer manager can track this peer. - self.peer_manager.notify_disconnect(&peer_id); - - // Update the prometheus metrics - metrics::inc_counter(&metrics::PEER_DISCONNECT_EVENT_COUNT); - metrics::set_gauge( - &metrics::PEERS_CONNECTED, - self.network_globals.connected_peers() as i64, - ); - } - // This gets called every time a connection is established. + // NOTE: The current logic implies that we would reject extra connections for already connected + // peers if we have reached our peer limit. This is fine for the time being as we currently + // only allow a single connection per peer. fn inject_connection_established( &mut self, peer_id: &PeerId, @@ -897,6 +863,9 @@ impl NetworkBehaviour for Behaviour { ) { let goodbye_reason: Option = if self.peer_manager.is_banned(peer_id) { // If the peer is banned, send goodbye with reason banned. + // A peer that has recently transitioned to the banned state should be in the + // disconnecting state, but the `is_banned()` function is dependent on score so should + // be true here in this case. Some(GoodbyeReason::Banned) } else if self.peer_manager.peer_limit_reached() && self @@ -913,14 +882,17 @@ impl NetworkBehaviour for Behaviour { None }; - if goodbye_reason.is_some() { - debug!(self.log, "Disconnecting newly connected peer"; "peer_id" => peer_id.to_string(), "reason" => goodbye_reason.as_ref().expect("Is some").to_string()); + if let Some(goodbye_reason) = goodbye_reason { + debug!(self.log, "Disconnecting newly connected peer"; "peer_id" => peer_id.to_string(), "reason" => goodbye_reason.to_string()); self.peers_to_dc - .push_back((peer_id.clone(), goodbye_reason)); + .push_back((peer_id.clone(), Some(goodbye_reason))); + // NOTE: We don't inform the peer manager that this peer is disconnecting. It is simply + // rejected with a goodbye. return; } - // notify the peer manager of a successful connection + // All peers at this point will be registered as being connected. + // Notify the peer manager of a successful connection match endpoint { ConnectedPoint::Listener { send_back_addr, .. } => { self.peer_manager @@ -946,6 +918,8 @@ impl NetworkBehaviour for Behaviour { } // This gets called on the initial connection establishment. + // NOTE: This gets called after inject_connection_established. Therefore the logic in that + // function dictates the logic here. fn inject_connected(&mut self, peer_id: &PeerId) { // If the PeerManager has connected this peer, inform the behaviours if !self.network_globals.peers.read().is_connected(&peer_id) { @@ -962,6 +936,79 @@ impl NetworkBehaviour for Behaviour { delegate_to_behaviours!(self, inject_connected, peer_id); } + // This gets called every time a connection is closed. + // NOTE: The peer manager state can be modified in the lifetime of the peer. Due to the scoring + // mechanism. Peers can become banned. In this case, we still want to inform the behaviours. + fn inject_connection_closed( + &mut self, + peer_id: &PeerId, + conn_id: &ConnectionId, + endpoint: &ConnectedPoint, + ) { + // If the peer manager (and therefore the behaviour's) believe this peer connected, inform + // about the disconnection. + // It could be the peer was in the process of being disconnected. In this case the + // sub-behaviours are expecting this peer to be connected and we inform them. + if self + .network_globals + .peers + .read() + .is_connected_or_disconnecting(peer_id) + { + // We are disconnecting the peer or the peer has already been connected. + // Both these cases, the peer has been previously registered in the sub protocols. + delegate_to_behaviours!(self, inject_connection_closed, peer_id, conn_id, endpoint); + } + } + + // This gets called once there are no more active connections. + fn inject_disconnected(&mut self, peer_id: &PeerId) { + // If the application/behaviour layers thinks this peer has connected inform it of the disconnect. + + if self + .network_globals + .peers + .read() + .is_connected_or_disconnecting(peer_id) + { + // We are disconnecting the peer or the peer has already been connected. + // Both these cases, the peer has been previously registered in the sub protocols and + // potentially the application layer. + // Inform the application. + self.add_event(BehaviourEvent::PeerDisconnected(peer_id.clone())); + // Inform the behaviour. + delegate_to_behaviours!(self, inject_disconnected, peer_id); + + // Decrement the PEERS_PER_CLIENT metric + if let Some(kind) = self + .network_globals + .peers + .read() + .peer_info(peer_id) + .map(|info| info.client.kind.clone()) + { + if let Some(v) = + metrics::get_int_gauge(&metrics::PEERS_PER_CLIENT, &[&kind.to_string()]) + { + v.dec() + }; + } + } + + // Inform the peer manager. + // NOTE: It may be the case that a rejected node, due to too many peers is disconnected + // here and the peer manager has no knowledge of its connection. We insert it here for + // reference so that peer manager can track this peer. + self.peer_manager.notify_disconnect(&peer_id); + + // Update the prometheus metrics + metrics::inc_counter(&metrics::PEER_DISCONNECT_EVENT_COUNT); + metrics::set_gauge( + &metrics::PEERS_CONNECTED, + self.network_globals.connected_peers() as i64, + ); + } + fn inject_addr_reach_failure( &mut self, peer_id: Option<&PeerId>, diff --git a/beacon_node/eth2_libp2p/src/peer_manager/mod.rs b/beacon_node/eth2_libp2p/src/peer_manager/mod.rs index 753dc3984..760a22b29 100644 --- a/beacon_node/eth2_libp2p/src/peer_manager/mod.rs +++ b/beacon_node/eth2_libp2p/src/peer_manager/mod.rs @@ -136,10 +136,18 @@ impl PeerManager { debug!(self.log, "Sending goodbye to peer"; "peer_id" => peer_id.to_string(), "reason" => reason.to_string(), "score" => info.score().to_string()); // Goodbye's are fatal info.apply_peer_action_to_score(PeerAction::Fatal); - if info.connection_status.is_connected_or_dialing() { - self.events - .push(PeerManagerEvent::DisconnectPeer(peer_id.clone(), reason)); - } + } + + // Update the peerdb and peer state accordingly + if self + .network_globals + .peers + .write() + .disconnect_and_ban(peer_id) + { + // update the state of the peer. + self.events + .push(PeerManagerEvent::DisconnectPeer(peer_id.clone(), reason)); } } @@ -147,54 +155,52 @@ impl PeerManager { /// /// If the peer doesn't exist, log a warning and insert defaults. pub fn report_peer(&mut self, peer_id: &PeerId, action: PeerAction) { - // NOTE: This is duplicated in the update_peer_scores() and could be improved. - - // Variables to update the PeerDb if required. + // Helper function to avoid any potential deadlocks. let mut ban_peer = None; let mut unban_peer = None; - if let Some(info) = self.network_globals.peers.write().peer_info_mut(peer_id) { - let previous_state = info.score_state(); - info.apply_peer_action_to_score(action); - if previous_state != info.score_state() { - match info.score_state() { - ScoreState::Banned => { - debug!(self.log, "Peer has been banned"; "peer_id" => peer_id.to_string(), "score" => info.score().to_string()); - ban_peer = Some(peer_id.clone()); - if info.connection_status.is_connected_or_dialing() { - self.events.push(PeerManagerEvent::DisconnectPeer( - peer_id.clone(), - GoodbyeReason::BadScore, - )); + { + let mut peer_db = self.network_globals.peers.write(); + if let Some(info) = peer_db.peer_info_mut(peer_id) { + let previous_state = info.score_state(); + info.apply_peer_action_to_score(action); + if previous_state != info.score_state() { + match info.score_state() { + ScoreState::Banned => { + debug!(self.log, "Peer has been banned"; "peer_id" => peer_id.to_string(), "score" => info.score().to_string()); + ban_peer = Some(peer_id); + } + ScoreState::Disconnected => { + debug!(self.log, "Peer transitioned to disconnect state"; "peer_id" => peer_id.to_string(), "score" => info.score().to_string(), "past_state" => previous_state.to_string()); + // disconnect the peer if it's currently connected or dialing + if info.is_connected_or_dialing() { + self.events.push(PeerManagerEvent::DisconnectPeer( + peer_id.clone(), + GoodbyeReason::BadScore, + )); + peer_db.notify_disconnecting(peer_id); + } + unban_peer = Some(peer_id); + } + ScoreState::Healthy => { + debug!(self.log, "Peer transitioned to healthy state"; "peer_id" => peer_id.to_string(), "score" => info.score().to_string(), "past_state" => previous_state.to_string()); + // unban the peer if it was previously banned. + unban_peer = Some(peer_id); } } - ScoreState::Disconnected => { - debug!(self.log, "Peer transitioned to disconnect state"; "peer_id" => peer_id.to_string(), "score" => info.score().to_string(), "past_state" => previous_state.to_string()); - // disconnect the peer if it's currently connected or dialing - unban_peer = Some(peer_id.clone()); - if info.connection_status.is_connected_or_dialing() { - self.events.push(PeerManagerEvent::DisconnectPeer( - peer_id.clone(), - GoodbyeReason::BadScore, - )); - } - } - ScoreState::Healthy => { - debug!(self.log, "Peer transitioned to healthy state"; "peer_id" => peer_id.to_string(), "score" => info.score().to_string(), "past_state" => previous_state.to_string()); - // unban the peer if it was previously banned. - unban_peer = Some(peer_id.clone()); - } + } else { + debug!(self.log, "Peer score adjusted"; "peer_id" => peer_id.to_string(), "score" => info.score().to_string()); } - } else { - debug!(self.log, "Peer score adjusted"; "peer_id" => peer_id.to_string(), "score" => info.score().to_string()); } - } + } // end write lock - // Update the PeerDB state. - if let Some(peer_id) = ban_peer.take() { - self.ban_peer(&peer_id); - } else if let Some(peer_id) = unban_peer.take() { - self.unban_peer(&peer_id); + if let Some(peer_id) = ban_peer { + self.ban_peer(peer_id); + } + if let Some(peer_id) = unban_peer { + if let Err(e) = self.unban_peer(peer_id) { + error!(self.log, "{}", e; "peer_id" => %peer_id); + } } } @@ -266,37 +272,14 @@ impl PeerManager { /// /// This is also called when dialing a peer fails. pub fn notify_disconnect(&mut self, peer_id: &PeerId) { - // Decrement the PEERS_PER_CLIENT metric - if let Some(kind) = self - .network_globals + self.network_globals .peers - .read() - .peer_info(peer_id) - .and_then(|peer_info| { - if let Connected { .. } = peer_info.connection_status { - Some(peer_info.client.kind.clone()) - } else { - None - } - }) - { - if let Some(v) = - metrics::get_int_gauge(&metrics::PEERS_PER_CLIENT, &[&kind.to_string()]) - { - v.dec() - }; - } - - self.network_globals.peers.write().disconnect(peer_id); + .write() + .notify_disconnect(peer_id); // remove the ping and status timer for the peer self.ping_peers.remove(peer_id); self.status_peers.remove(peer_id); - metrics::inc_counter(&metrics::PEER_DISCONNECT_EVENT_COUNT); - metrics::set_gauge( - &metrics::PEERS_CONNECTED, - self.network_globals.connected_peers() as i64, - ); } /// A dial attempt has failed. @@ -618,7 +601,7 @@ impl PeerManager { fn connect_peer(&mut self, peer_id: &PeerId, connection: ConnectingType) -> bool { { let mut peerdb = self.network_globals.peers.write(); - if peerdb.connection_status(peer_id).map(|c| c.is_banned()) == Some(true) { + if peerdb.is_banned(&peer_id) { // don't connect if the peer is banned slog::crit!(self.log, "Connection has been allowed to a banned peer"; "peer_id" => peer_id.to_string()); } @@ -688,18 +671,14 @@ impl PeerManager { ScoreState::Banned => { debug!(self.log, "Peer has been banned"; "peer_id" => peer_id.to_string(), "score" => info.score().to_string()); to_ban_peers.push(peer_id.clone()); - if info.connection_status.is_connected_or_dialing() { - self.events.push(PeerManagerEvent::DisconnectPeer( - peer_id.clone(), - GoodbyeReason::BadScore, - )); - } } ScoreState::Disconnected => { debug!(self.log, "Peer transitioned to disconnect state"; "peer_id" => peer_id.to_string(), "score" => info.score().to_string(), "past_state" => previous_state.to_string()); // disconnect the peer if it's currently connected or dialing to_unban_peers.push(peer_id.clone()); - if info.connection_status.is_connected_or_dialing() { + if info.is_connected_or_dialing() { + // Change the state to inform that we are disconnecting the peer. + info.disconnecting(false); self.events.push(PeerManagerEvent::DisconnectPeer( peer_id.clone(), GoodbyeReason::BadScore, @@ -720,7 +699,9 @@ impl PeerManager { } // process unbanning peers for peer_id in to_unban_peers { - self.unban_peer(&peer_id); + if let Err(e) = self.unban_peer(&peer_id) { + error!(self.log, "{}", e; "peer_id" => %peer_id); + } } } @@ -729,12 +710,26 @@ impl PeerManager { /// Records updates the peers connection status and updates the peer db as well as blocks the /// peer from participating in discovery and removes them from the routing table. fn ban_peer(&mut self, peer_id: &PeerId) { - let mut peer_db = self.network_globals.peers.write(); - peer_db.ban(peer_id); + { + // write lock scope + let mut peer_db = self.network_globals.peers.write(); + + if peer_db.disconnect_and_ban(peer_id) { + // The peer was currently connected, so we start a disconnection. + self.events.push(PeerManagerEvent::DisconnectPeer( + peer_id.clone(), + GoodbyeReason::BadScore, + )); + } + } // end write lock + + // take a read lock + let peer_db = self.network_globals.peers.read(); + let banned_ip_addresses = peer_db .peer_info(peer_id) .map(|info| { - info.seen_addresses + info.seen_addresses() .iter() .filter(|ip| peer_db.is_ip_banned(ip)) .cloned() @@ -749,16 +744,17 @@ impl PeerManager { /// /// Records updates the peers connection status and updates the peer db as well as removes /// previous bans from discovery. - fn unban_peer(&mut self, peer_id: &PeerId) { + fn unban_peer(&mut self, peer_id: &PeerId) -> Result<(), &'static str> { let mut peer_db = self.network_globals.peers.write(); - peer_db.unban(&peer_id); + peer_db.unban(&peer_id)?; let seen_ip_addresses = peer_db .peer_info(peer_id) - .map(|info| info.seen_addresses.iter().cloned().collect::>()) + .map(|info| info.seen_addresses().iter().cloned().collect::>()) .unwrap_or_default(); self.discovery.unban_peer(&peer_id, seen_ip_addresses); + Ok(()) } /// The Peer manager's heartbeat maintains the peer count and maintains peer reputations. @@ -778,6 +774,9 @@ impl PeerManager { // Updates peer's scores. self.update_peer_scores(); + // Keep a list of peers we are disconnecting + let mut disconnecting_peers = Vec::new(); + let connected_peer_count = self.network_globals.connected_peers(); if connected_peer_count > self.target_peers { //remove excess peers with the worst scores, but keep subnet peers @@ -793,12 +792,18 @@ impl PeerManager { //disconnected in update_peer_scores .filter(|(_, info)| info.score_state() == ScoreState::Healthy) { - self.events.push(PeerManagerEvent::DisconnectPeer( - (*peer_id).clone(), - GoodbyeReason::TooManyPeers, - )); + disconnecting_peers.push((*peer_id).clone()); } } + + let mut peer_db = self.network_globals.peers.write(); + for peer_id in disconnecting_peers { + peer_db.notify_disconnecting(&peer_id); + self.events.push(PeerManagerEvent::DisconnectPeer( + peer_id, + GoodbyeReason::TooManyPeers, + )); + } } } diff --git a/beacon_node/eth2_libp2p/src/peer_manager/peer_info.rs b/beacon_node/eth2_libp2p/src/peer_manager/peer_info.rs index c97ec8ec6..6d3601b2d 100644 --- a/beacon_node/eth2_libp2p/src/peer_manager/peer_info.rs +++ b/beacon_node/eth2_libp2p/src/peer_manager/peer_info.rs @@ -25,13 +25,13 @@ pub struct PeerInfo { /// Client managing this peer pub client: Client, /// Connection status of this peer - pub connection_status: PeerConnectionStatus, + connection_status: PeerConnectionStatus, /// The known listening addresses of this peer. This is given by identify and can be arbitrary /// (including local IPs). pub listening_addresses: Vec, /// This is addresses we have physically seen and this is what we use for banning/un-banning /// peers. - pub seen_addresses: HashSet, + seen_addresses: HashSet, /// The current syncing state of the peer. The state may be determined after it's initial /// connection. pub sync_status: PeerSyncStatus, @@ -91,6 +91,16 @@ impl PeerInfo { false } + /// Returns the seen addresses of the peer. + pub fn seen_addresses(&self) -> &HashSet { + &self.seen_addresses + } + + /// Returns the connection status of the peer. + pub fn connection_status(&self) -> &PeerConnectionStatus { + &self.connection_status + } + /// Reports if this peer has some future validator duty in which case it is valuable to keep it. pub fn has_future_duty(&self) -> bool { self.min_ttl.map_or(false, |i| i >= Instant::now()) @@ -120,35 +130,161 @@ impl PeerInfo { } } + #[cfg(test)] + /// Resets the peers score. + pub fn reset_score(&mut self) { + self.score.test_reset(); + } + + /* Peer connection status API */ + + /// Checks if the status is connected. + pub fn is_connected(&self) -> bool { + matches!(self.connection_status, PeerConnectionStatus::Connected { .. }) + } + + /// Checks if the status is connected. + pub fn is_dialing(&self) -> bool { + matches!(self.connection_status, PeerConnectionStatus::Dialing { .. }) + } + + /// The peer is either connected or in the process of being dialed. + pub fn is_connected_or_dialing(&self) -> bool { + self.is_connected() || self.is_dialing() + } + + /// Checks if the status is banned. + pub fn is_banned(&self) -> bool { + matches!(self.connection_status, PeerConnectionStatus::Banned { .. }) + } + + /// Checks if the status is disconnected. + pub fn is_disconnected(&self) -> bool { + matches!(self.connection_status, Disconnected { .. }) + } + + /// Returns the number of connections with this peer. + pub fn connections(&self) -> (u8, u8) { + match self.connection_status { + Connected { n_in, n_out } => (n_in, n_out), + _ => (0, 0), + } + } + + // Setters + + /// Modifies the status to Disconnected and sets the last seen instant to now. Returns None if + /// no changes were made. Returns Some(bool) where the bool represents if peer became banned or + /// simply just disconnected. + pub fn notify_disconnect(&mut self) -> Option { + match self.connection_status { + Banned { .. } | Disconnected { .. } => None, + Disconnecting { to_ban } => { + // If we are disconnecting this peer in the process of banning, we now ban the + // peer. + if to_ban { + self.connection_status = Banned { + since: Instant::now(), + }; + Some(true) + } else { + self.connection_status = Disconnected { + since: Instant::now(), + }; + Some(false) + } + } + Connected { .. } | Dialing { .. } | Unknown => { + self.connection_status = Disconnected { + since: Instant::now(), + }; + Some(false) + } + } + } + + /// Notify the we are currently disconnecting this peer, after which the peer will be + /// considered banned. + // This intermediate state is required to inform the network behaviours that the sub-protocols + // are aware this peer exists and it is in the process of being banned. Compared to nodes that + // try to connect to us and are already banned (sub protocols do not know of these peers). + pub fn disconnecting(&mut self, to_ban: bool) { + self.connection_status = Disconnecting { to_ban } + } + + /// Modifies the status to Banned + pub fn ban(&mut self) { + self.connection_status = Banned { + since: Instant::now(), + }; + } + + /// The score system has unbanned the peer. Update the connection status + pub fn unban(&mut self) { + if let PeerConnectionStatus::Banned { since, .. } = self.connection_status { + self.connection_status = PeerConnectionStatus::Disconnected { since }; + } + } + + /// Modifies the status to Dialing + /// Returns an error if the current state is unexpected. + pub(crate) fn dialing_peer(&mut self) -> Result<(), &'static str> { + match &mut self.connection_status { + Connected { .. } => return Err("Dialing connected peer"), + Dialing { .. } => return Err("Dialing an already dialing peer"), + Disconnecting { .. } => return Err("Dialing a disconnecting peer"), + Disconnected { .. } | Banned { .. } | Unknown => {} + } + self.connection_status = Dialing { + since: Instant::now(), + }; + Ok(()) + } + /// Modifies the status to Connected and increases the number of ingoing /// connections by one - pub(crate) fn connect_ingoing(&mut self) { + pub(crate) fn connect_ingoing(&mut self, seen_address: Option) { match &mut self.connection_status { Connected { n_in, .. } => *n_in += 1, - Disconnected { .. } | Banned { .. } | Dialing { .. } | Unknown => { + Disconnected { .. } + | Banned { .. } + | Dialing { .. } + | Disconnecting { .. } + | Unknown => { self.connection_status = Connected { n_in: 1, n_out: 0 }; self.connection_direction = Some(ConnectionDirection::Incoming); } } + + if let Some(ip_addr) = seen_address { + self.seen_addresses.insert(ip_addr); + } } /// Modifies the status to Connected and increases the number of outgoing /// connections by one - pub(crate) fn connect_outgoing(&mut self) { + pub(crate) fn connect_outgoing(&mut self, seen_address: Option) { match &mut self.connection_status { Connected { n_out, .. } => *n_out += 1, - Disconnected { .. } | Banned { .. } | Dialing { .. } | Unknown => { + Disconnected { .. } + | Banned { .. } + | Dialing { .. } + | Disconnecting { .. } + | Unknown => { self.connection_status = Connected { n_in: 0, n_out: 1 }; self.connection_direction = Some(ConnectionDirection::Outgoing); } } + if let Some(ip_addr) = seen_address { + self.seen_addresses.insert(ip_addr); + } } #[cfg(test)] /// Add an f64 to a non-trusted peer's score abiding by the limits. pub fn add_to_score(&mut self, score: f64) { if !self.is_trusted { - self.score.add(score) + self.score.test_add(score) } } } @@ -185,17 +321,21 @@ pub enum PeerConnectionStatus { /// number of outgoing connections. n_out: u8, }, + /// The peer is being disconnected. + Disconnecting { + // After the disconnection the peer will be considered banned. + to_ban: bool, + }, /// The peer has disconnected. Disconnected { /// last time the peer was connected or discovered. since: Instant, }, + /// The peer has been banned and is disconnected. Banned { /// moment when the peer was banned. since: Instant, - /// ip addresses this peer had a the moment of the ban - ip_addresses: Vec, }, /// We are currently dialing this peer. Dialing { @@ -209,14 +349,20 @@ pub enum PeerConnectionStatus { /// Serialization for http requests. impl Serialize for PeerConnectionStatus { fn serialize(&self, serializer: S) -> Result { - let mut s = serializer.serialize_struct("connection_status", 5)?; + let mut s = serializer.serialize_struct("connection_status", 6)?; match self { Connected { n_in, n_out } => { s.serialize_field("status", "connected")?; s.serialize_field("connections_in", n_in)?; s.serialize_field("connections_out", n_out)?; s.serialize_field("last_seen", &0)?; - s.serialize_field("banned_ips", &Vec::::new())?; + s.end() + } + Disconnecting { .. } => { + s.serialize_field("status", "disconnecting")?; + s.serialize_field("connections_in", &0)?; + s.serialize_field("connections_out", &0)?; + s.serialize_field("last_seen", &0)?; s.end() } Disconnected { since } => { @@ -227,15 +373,11 @@ impl Serialize for PeerConnectionStatus { s.serialize_field("banned_ips", &Vec::::new())?; s.end() } - Banned { - since, - ip_addresses, - } => { + Banned { since } => { s.serialize_field("status", "banned")?; s.serialize_field("connections_in", &0)?; s.serialize_field("connections_out", &0)?; s.serialize_field("last_seen", &since.elapsed().as_secs())?; - s.serialize_field("banned_ips", &ip_addresses)?; s.end() } Dialing { since } => { @@ -243,7 +385,6 @@ impl Serialize for PeerConnectionStatus { s.serialize_field("connections_in", &0)?; s.serialize_field("connections_out", &0)?; s.serialize_field("last_seen", &since.elapsed().as_secs())?; - s.serialize_field("banned_ips", &Vec::::new())?; s.end() } Unknown => { @@ -251,7 +392,6 @@ impl Serialize for PeerConnectionStatus { s.serialize_field("connections_in", &0)?; s.serialize_field("connections_out", &0)?; s.serialize_field("last_seen", &0)?; - s.serialize_field("banned_ips", &Vec::::new())?; s.end() } } @@ -263,59 +403,3 @@ impl Default for PeerConnectionStatus { PeerConnectionStatus::Unknown } } - -impl PeerConnectionStatus { - /// Checks if the status is connected. - pub fn is_connected(&self) -> bool { - matches!(self, PeerConnectionStatus::Connected { .. }) - } - - /// Checks if the status is connected. - pub fn is_dialing(&self) -> bool { - matches!(self, PeerConnectionStatus::Dialing { .. }) - } - - /// The peer is either connected or in the process of being dialed. - pub fn is_connected_or_dialing(&self) -> bool { - self.is_connected() || self.is_dialing() - } - - /// Checks if the status is banned. - pub fn is_banned(&self) -> bool { - matches!(self, PeerConnectionStatus::Banned { .. }) - } - - /// Checks if the status is disconnected. - pub fn is_disconnected(&self) -> bool { - matches!(self, Disconnected { .. }) - } - - /// Modifies the status to Disconnected and sets the last seen instant to now - pub fn disconnect(&mut self) { - *self = Disconnected { - since: Instant::now(), - }; - } - - /// Modifies the status to Banned - pub fn ban(&mut self, ip_addresses: Vec) { - *self = Banned { - since: Instant::now(), - ip_addresses, - }; - } - - /// The score system has unbanned the peer. Update the connection status - pub fn unban(&mut self) { - if let PeerConnectionStatus::Banned { since, .. } = self { - *self = PeerConnectionStatus::Disconnected { since: *since } - } - } - - pub fn connections(&self) -> (u8, u8) { - match self { - Connected { n_in, n_out } => (*n_in, *n_out), - _ => (0, 0), - } - } -} diff --git a/beacon_node/eth2_libp2p/src/peer_manager/peerdb.rs b/beacon_node/eth2_libp2p/src/peer_manager/peerdb.rs index e3d8d2a0e..7b805b0a1 100644 --- a/beacon_node/eth2_libp2p/src/peer_manager/peerdb.rs +++ b/beacon_node/eth2_libp2p/src/peer_manager/peerdb.rs @@ -6,8 +6,8 @@ use crate::rpc::methods::MetaData; use crate::Enr; use crate::PeerId; use rand::seq::SliceRandom; -use slog::{crit, debug, trace, warn}; -use std::collections::HashMap; +use slog::{crit, debug, error, trace, warn}; +use std::collections::{HashMap, HashSet}; use std::net::IpAddr; use std::time::Instant; use types::{EthSpec, SubnetId}; @@ -42,27 +42,19 @@ pub struct BannedPeersCount { impl BannedPeersCount { /// Removes the peer from the counts if it is banned. Returns true if the peer was banned and /// false otherwise. - pub fn remove_banned_peer(&mut self, connection_status: &PeerConnectionStatus) -> bool { - match connection_status { - PeerConnectionStatus::Banned { ip_addresses, .. } => { - self.banned_peers = self.banned_peers.saturating_sub(1); - for address in ip_addresses { - if let Some(count) = self.banned_peers_per_ip.get_mut(address) { - *count = count.saturating_sub(1); - } - } - true + pub fn remove_banned_peer(&mut self, ip_addresses: &HashSet) { + self.banned_peers = self.banned_peers.saturating_sub(1); + for address in ip_addresses { + if let Some(count) = self.banned_peers_per_ip.get_mut(address) { + *count = count.saturating_sub(1); } - _ => false, //if not banned do nothing } } - pub fn add_banned_peer(&mut self, connection_status: &PeerConnectionStatus) { - if let PeerConnectionStatus::Banned { ip_addresses, .. } = connection_status { - self.banned_peers += 1; - for address in ip_addresses { - *self.banned_peers_per_ip.entry(*address).or_insert(0) += 1; - } + pub fn add_banned_peer(&mut self, ip_addresses: &HashSet) { + self.banned_peers = self.banned_peers.saturating_add(1); + for address in ip_addresses { + *self.banned_peers_per_ip.entry(*address).or_insert(0) += 1; } } @@ -148,6 +140,13 @@ impl PeerDB { matches!(self.connection_status(peer_id), Some(PeerConnectionStatus::Connected { .. }) | Some(PeerConnectionStatus::Dialing { .. })) } + + /// If we are connected or in the process of disconnecting + pub fn is_connected_or_disconnecting(&self, peer_id: &PeerId) -> bool { + matches!(self.connection_status(peer_id), Some(PeerConnectionStatus::Connected { .. }) + | Some(PeerConnectionStatus::Disconnecting { .. })) + } + /// Returns true if the peer is synced at least to our current head. pub fn is_synced(&self, peer_id: &PeerId) -> bool { match self.peers.get(peer_id).map(|info| &info.sync_status) { @@ -157,7 +156,11 @@ impl PeerDB { } } - /// Returns true if the Peer is banned. + /// Returns true if the Peer is banned. This doesn't check the connection state, rather the + /// underlying score of the peer. A peer may be banned but still in the connected state + /// temporarily. + /// + /// This is used to determine if we should accept incoming connections or not. pub fn is_banned(&self, peer_id: &PeerId) -> bool { if let Some(peer) = self.peers.get(peer_id) { match peer.score().state() { @@ -170,7 +173,7 @@ impl PeerDB { } fn ip_is_banned(&self, peer: &PeerInfo) -> bool { - peer.seen_addresses + peer.seen_addresses() .iter() .any(|addr| self.banned_peers_count.ip_is_banned(addr)) } @@ -194,16 +197,14 @@ impl PeerDB { /// Gives the ids and info of all known connected peers. pub fn connected_peers(&self) -> impl Iterator)> { - self.peers - .iter() - .filter(|(_, info)| info.connection_status.is_connected()) + self.peers.iter().filter(|(_, info)| info.is_connected()) } /// Gives the ids of all known connected peers. pub fn connected_peer_ids(&self) -> impl Iterator { self.peers .iter() - .filter(|(_, info)| info.connection_status.is_connected()) + .filter(|(_, info)| info.is_connected()) .map(|(peer_id, _)| peer_id) } @@ -211,9 +212,7 @@ impl PeerDB { pub fn connected_or_dialing_peers(&self) -> impl Iterator { self.peers .iter() - .filter(|(_, info)| { - info.connection_status.is_connected() || info.connection_status.is_dialing() - }) + .filter(|(_, info)| info.is_connected() || info.is_dialing()) .map(|(peer_id, _)| peer_id) } @@ -223,7 +222,7 @@ impl PeerDB { .iter() .filter(|(_, info)| { if info.sync_status.is_synced() || info.sync_status.is_advanced() { - return info.connection_status.is_connected(); + return info.is_connected(); } false }) @@ -236,7 +235,7 @@ impl PeerDB { .iter() .filter(|(_, info)| { if info.sync_status.is_advanced() { - return info.connection_status.is_connected(); + return info.is_connected(); } false }) @@ -247,9 +246,7 @@ impl PeerDB { pub fn peers_on_subnet(&self, subnet_id: SubnetId) -> impl Iterator { self.peers .iter() - .filter(move |(_, info)| { - info.connection_status.is_connected() && info.on_subnet(subnet_id) - }) + .filter(move |(_, info)| info.is_connected() && info.on_subnet(subnet_id)) .map(|(peer_id, _)| peer_id) } @@ -257,7 +254,7 @@ impl PeerDB { pub fn disconnected_peers(&self) -> impl Iterator { self.peers .iter() - .filter(|(_, info)| info.connection_status.is_disconnected()) + .filter(|(_, info)| info.is_disconnected()) .map(|(peer_id, _)| peer_id) } @@ -265,7 +262,7 @@ impl PeerDB { pub fn banned_peers(&self) -> impl Iterator { self.peers .iter() - .filter(|(_, info)| info.connection_status.is_banned()) + .filter(|(_, info)| info.is_banned()) .map(|(peer_id, _)| peer_id) } @@ -275,7 +272,7 @@ impl PeerDB { let mut connected = self .peers .iter() - .filter(|(_, info)| info.connection_status.is_connected()) + .filter(|(_, info)| info.is_connected()) .collect::>(); connected.shuffle(&mut rand::thread_rng()); @@ -287,12 +284,12 @@ impl PeerDB { /// score from highest to lowest, and filtered using `is_status` pub fn best_peers_by_status(&self, is_status: F) -> Vec<(&PeerId, &PeerInfo)> where - F: Fn(&PeerConnectionStatus) -> bool, + F: Fn(&PeerInfo) -> bool, { let mut by_status = self .peers .iter() - .filter(|(_, info)| is_status(&info.connection_status)) + .filter(|(_, info)| is_status(&info)) .collect::>(); by_status.sort_by_key(|(_, info)| info.score()); by_status.into_iter().rev().collect() @@ -301,11 +298,11 @@ impl PeerDB { /// Returns the peer with highest reputation that satisfies `is_status` pub fn best_by_status(&self, is_status: F) -> Option<&PeerId> where - F: Fn(&PeerConnectionStatus) -> bool, + F: Fn(&PeerInfo) -> bool, { self.peers .iter() - .filter(|(_, info)| is_status(&info.connection_status)) + .filter(|(_, info)| is_status(&info)) .max_by_key(|(_, info)| info.score()) .map(|(id, _)| id) } @@ -313,7 +310,7 @@ impl PeerDB { /// Returns the peer's connection status. Returns unknown if the peer is not in the DB. pub fn connection_status(&self, peer_id: &PeerId) -> Option { self.peer_info(peer_id) - .map(|info| info.connection_status.clone()) + .map(|info| info.connection_status().clone()) } /* Setters */ @@ -323,16 +320,18 @@ impl PeerDB { let info = self.peers.entry(peer_id.clone()).or_default(); info.enr = enr; - if info.connection_status.is_disconnected() { + if info.is_disconnected() { self.disconnected_peers = self.disconnected_peers.saturating_sub(1); } - self.banned_peers_count - .remove_banned_peer(&info.connection_status); + if info.is_banned() { + self.banned_peers_count + .remove_banned_peer(info.seen_addresses()); + } - info.connection_status = PeerConnectionStatus::Dialing { - since: Instant::now(), - }; + if let Err(e) = info.dialing_peer() { + error!(self.log, "{}", e); + } } /// Update min ttl of a peer. @@ -357,7 +356,7 @@ impl PeerDB { let log = &self.log; self.peers.iter_mut() .filter(move |(_, info)| { - info.connection_status.is_connected() && info.on_subnet(subnet_id) + info.is_connected() && info.on_subnet(subnet_id) }) .for_each(|(peer_id,info)| { if info.min_ttl.is_none() || Some(min_ttl) > info.min_ttl { @@ -367,7 +366,7 @@ impl PeerDB { .checked_duration_since(Instant::now()) .map(|duration| duration.as_secs()) .unwrap_or_else(|| 0); - trace!(log, "Updating minimum duration a peer is required for"; "peer_id" => peer_id.to_string(), "min_ttl" => min_ttl_secs); + trace!(log, "Updating minimum duration a peer is required for"; "peer_id" => %peer_id, "min_ttl" => min_ttl_secs); }); } @@ -376,21 +375,24 @@ impl PeerDB { let info = self.peers.entry(peer_id.clone()).or_default(); info.enr = enr; - if info.connection_status.is_disconnected() { + if info.is_disconnected() { self.disconnected_peers = self.disconnected_peers.saturating_sub(1); } - self.banned_peers_count - .remove_banned_peer(&info.connection_status); - info.connect_ingoing(); + + if info.is_banned() { + error!(self.log, "Accepted a connection from a banned peer"; "peer_id" => %peer_id); + self.banned_peers_count + .remove_banned_peer(info.seen_addresses()); + } // Add the seen ip address to the peer's info - if let Some(ip_addr) = multiaddr.iter().find_map(|p| match p { + let ip_addr = multiaddr.iter().find_map(|p| match p { Protocol::Ip4(ip) => Some(ip.into()), Protocol::Ip6(ip) => Some(ip.into()), _ => None, - }) { - info.seen_addresses.insert(ip_addr); - } + }); + + info.connect_ingoing(ip_addr); } /// Sets a peer as connected with an outgoing connection. @@ -398,38 +400,56 @@ impl PeerDB { let info = self.peers.entry(peer_id.clone()).or_default(); info.enr = enr; - if info.connection_status.is_disconnected() { + if info.is_disconnected() { self.disconnected_peers = self.disconnected_peers.saturating_sub(1); } - self.banned_peers_count - .remove_banned_peer(&info.connection_status); - info.connect_outgoing(); + + if info.is_banned() { + error!(self.log, "Connected to a banned peer"; "peer_id" => %peer_id); + self.banned_peers_count + .remove_banned_peer(info.seen_addresses()); + } // Add the seen ip address to the peer's info - if let Some(ip_addr) = multiaddr.iter().find_map(|p| match p { + let ip_addr = multiaddr.iter().find_map(|p| match p { Protocol::Ip4(ip) => Some(ip.into()), Protocol::Ip6(ip) => Some(ip.into()), _ => None, - }) { - info.seen_addresses.insert(ip_addr); - } + }); + + info.connect_outgoing(ip_addr); } /// Sets the peer as disconnected. A banned peer remains banned - pub fn disconnect(&mut self, peer_id: &PeerId) { + pub fn notify_disconnect(&mut self, peer_id: &PeerId) { // Note that it could be the case we prevent new nodes from joining. In this instance, // we don't bother tracking the new node. if let Some(info) = self.peers.get_mut(peer_id) { - if !info.connection_status.is_disconnected() && !info.connection_status.is_banned() { - info.connection_status.disconnect(); - self.disconnected_peers += 1; + if let Some(became_banned) = info.notify_disconnect() { + if became_banned { + self.banned_peers_count + .add_banned_peer(info.seen_addresses()); + } else { + self.disconnected_peers += 1; + } } self.shrink_to_fit(); } } - /// Marks a peer as banned. - pub fn ban(&mut self, peer_id: &PeerId) { + /// Notifies the peer manager that the peer is undergoing a normal disconnect (without banning + /// afterwards. + pub fn notify_disconnecting(&mut self, peer_id: &PeerId) { + if let Some(info) = self.peers.get_mut(peer_id) { + info.disconnecting(false); + } + } + + /// Marks a peer to be disconnected and then banned. + /// Returns true if the peer is currently connected and false otherwise. + // NOTE: If the peer's score is not already low enough to be banned, this will decrease the + // peer's score to be a banned state. + pub fn disconnect_and_ban(&mut self, peer_id: &PeerId) -> bool { let log_ref = &self.log; let info = self.peers.entry(peer_id.clone()).or_insert_with(|| { warn!(log_ref, "Banning unknown peer"; @@ -437,20 +457,56 @@ impl PeerDB { PeerInfo::default() }); - if info.connection_status.is_disconnected() { - self.disconnected_peers = self.disconnected_peers.saturating_sub(1); + // Ban the peer if the score is not already low enough. + match info.score().state() { + ScoreState::Banned => {} + _ => { + // If score isn't low enough to ban, this function has been called incorrectly. + error!(self.log, "Banning a peer with a good score"; "peer_id" => %peer_id); + info.apply_peer_action_to_score(super::score::PeerAction::Fatal); + } } - if !info.connection_status.is_banned() { - info.connection_status - .ban(info.seen_addresses.iter().cloned().collect()); - self.banned_peers_count - .add_banned_peer(&info.connection_status); + + // Check and verify all the connection states + match info.connection_status() { + PeerConnectionStatus::Disconnected { .. } => { + // It should not be possible to ban a peer that is already disconnected. + error!(log_ref, "Banning a disconnected peer"; "peer_id" => %peer_id); + self.disconnected_peers = self.disconnected_peers.saturating_sub(1); + info.ban(); + self.banned_peers_count + .add_banned_peer(info.seen_addresses()); + false + } + PeerConnectionStatus::Disconnecting { .. } => { + warn!(log_ref, "Banning peer that is currently disconnecting"; "peer_id" => %peer_id); + info.disconnecting(true); + false + } + PeerConnectionStatus::Banned { .. } => { + error!(log_ref, "Banning already banned peer"; "peer_id" => %peer_id); + false + } + PeerConnectionStatus::Connected { .. } | PeerConnectionStatus::Dialing { .. } => { + // update the state + info.disconnecting(true); + true + } + PeerConnectionStatus::Unknown => { + // shift the peer straight to banned + warn!(log_ref, "Banning a peer of unknown connection state"; "peer_id" => %peer_id); + self.banned_peers_count + .add_banned_peer(info.seen_addresses()); + info.ban(); + false + } } - self.shrink_to_fit(); } /// Unbans a peer. - pub fn unban(&mut self, peer_id: &PeerId) { + /// This should only be called once a peer's score is no longer banned. + /// If this is called for a banned peer, it will error. + pub fn unban(&mut self, peer_id: &PeerId) -> Result<(), &'static str> { let log_ref = &self.log; let info = self.peers.entry(peer_id.clone()).or_insert_with(|| { warn!(log_ref, "UnBanning unknown peer"; @@ -458,12 +514,21 @@ impl PeerDB { PeerInfo::default() }); - if info.connection_status.is_banned() { - self.banned_peers_count - .remove_banned_peer(&info.connection_status); - info.connection_status.unban(); + if !info.is_banned() { + return Err("Unbanning peer that is not banned"); } + + if let ScoreState::Banned = info.score().state() { + return Err("Attempted to unban (connection status) a banned peer"); + } + + self.banned_peers_count + .remove_banned_peer(info.seen_addresses()); + info.unban(); + // This transitions a banned peer to a disconnected peer + self.disconnected_peers = self.disconnected_peers.saturating_add(1); self.shrink_to_fit(); + Ok(()) } /// Removes banned and disconnected peers from the DB if we have reached any of our limits. @@ -475,7 +540,7 @@ impl PeerDB { if let Some(to_drop) = if let Some((id, info)) = self .peers .iter() - .filter(|(_, info)| info.connection_status.is_banned()) + .filter(|(_, info)| info.is_banned()) .min_by(|(_, info_a), (_, info_b)| { info_a .score() @@ -483,7 +548,7 @@ impl PeerDB { .unwrap_or(std::cmp::Ordering::Equal) }) { self.banned_peers_count - .remove_banned_peer(&info.connection_status); + .remove_banned_peer(info.seen_addresses()); Some(id.clone()) } else { // If there is no minimum, this is a coding error. @@ -505,7 +570,7 @@ impl PeerDB { if let Some(to_drop) = self .peers .iter() - .filter(|(_, info)| info.connection_status.is_disconnected()) + .filter(|(_, info)| info.is_disconnected()) .min_by(|(_, info_a), (_, info_b)| { info_a .score() @@ -570,6 +635,12 @@ mod tests { } } + fn reset_score(db: &mut PeerDB, peer_id: &PeerId) { + if let Some(info) = db.peer_info_mut(peer_id) { + info.reset_score(); + } + } + fn get_db() -> PeerDB { let log = build_log(slog::Level::Debug, false); PeerDB::new(vec![], &log) @@ -597,11 +668,8 @@ mod tests { assert_eq!(pdb.score(&random_peer).score(), Score::default().score()); // it should be connected, and therefore not counted as disconnected assert_eq!(pdb.disconnected_peers, 0); - assert!(peer_info.unwrap().connection_status.is_connected()); - assert_eq!( - peer_info.unwrap().connection_status.connections(), - (n_in, n_out) - ); + assert!(peer_info.unwrap().is_connected()); + assert_eq!(peer_info.unwrap().connections(), (n_in, n_out)); } #[test] @@ -615,7 +683,7 @@ mod tests { assert_eq!(pdb.disconnected_peers, 0); for p in pdb.connected_peer_ids().cloned().collect::>() { - pdb.disconnect(&p); + pdb.notify_disconnect(&p); } assert_eq!(pdb.disconnected_peers, MAX_DC_PEERS); @@ -632,7 +700,8 @@ mod tests { assert_eq!(pdb.banned_peers_count.banned_peers(), 0); for p in pdb.connected_peer_ids().cloned().collect::>() { - pdb.ban(&p); + pdb.disconnect_and_ban(&p); + pdb.notify_disconnect(&p); } assert_eq!(pdb.banned_peers_count.banned_peers(), MAX_BANNED_PEERS); @@ -653,7 +722,7 @@ mod tests { add_score(&mut pdb, &p2, 50.0); let best_peers: Vec<&PeerId> = pdb - .best_peers_by_status(PeerConnectionStatus::is_connected) + .best_peers_by_status(PeerInfo::is_connected) .iter() .map(|p| p.0) .collect(); @@ -674,10 +743,10 @@ mod tests { add_score(&mut pdb, &p1, 100.0); add_score(&mut pdb, &p2, 50.0); - let the_best = pdb.best_by_status(PeerConnectionStatus::is_connected); + let the_best = pdb.best_by_status(PeerInfo::is_connected); assert!(the_best.is_some()); // Consistency check - let best_peers = pdb.best_peers_by_status(PeerConnectionStatus::is_connected); + let best_peers = pdb.best_peers_by_status(PeerInfo::is_connected); assert_eq!(the_best, best_peers.iter().next().map(|p| p.0)); } @@ -689,35 +758,27 @@ mod tests { pdb.connect_ingoing(&random_peer, "/ip4/0.0.0.0".parse().unwrap(), None); assert_eq!(pdb.disconnected_peers, pdb.disconnected_peers().count()); - dbg!("1"); pdb.connect_ingoing(&random_peer, "/ip4/0.0.0.0".parse().unwrap(), None); assert_eq!(pdb.disconnected_peers, pdb.disconnected_peers().count()); - dbg!("1"); - pdb.disconnect(&random_peer); + pdb.notify_disconnect(&random_peer); assert_eq!(pdb.disconnected_peers, pdb.disconnected_peers().count()); - dbg!("1"); pdb.connect_outgoing(&random_peer, "/ip4/0.0.0.0".parse().unwrap(), None); assert_eq!(pdb.disconnected_peers, pdb.disconnected_peers().count()); - dbg!("1"); - pdb.disconnect(&random_peer); + pdb.notify_disconnect(&random_peer); assert_eq!(pdb.disconnected_peers, pdb.disconnected_peers().count()); - dbg!("1"); - pdb.ban(&random_peer); + pdb.disconnect_and_ban(&random_peer); + pdb.notify_disconnect(&random_peer); assert_eq!(pdb.disconnected_peers, pdb.disconnected_peers().count()); - dbg!("1"); - pdb.disconnect(&random_peer); + pdb.notify_disconnect(&random_peer); assert_eq!(pdb.disconnected_peers, pdb.disconnected_peers().count()); - dbg!("1"); - pdb.disconnect(&random_peer); + pdb.notify_disconnect(&random_peer); assert_eq!(pdb.disconnected_peers, pdb.disconnected_peers().count()); - dbg!("1"); - pdb.disconnect(&random_peer); + pdb.notify_disconnect(&random_peer); assert_eq!(pdb.disconnected_peers, pdb.disconnected_peers().count()); - dbg!("1"); } #[test] @@ -740,15 +801,17 @@ mod tests { ); pdb.connect_ingoing(&random_peer, "/ip4/0.0.0.0".parse().unwrap(), None); - pdb.disconnect(&random_peer1); - pdb.ban(&random_peer2); + pdb.notify_disconnect(&random_peer1); + pdb.disconnect_and_ban(&random_peer2); + pdb.notify_disconnect(&random_peer2); pdb.connect_ingoing(&random_peer3, "/ip4/0.0.0.0".parse().unwrap(), None); assert_eq!(pdb.disconnected_peers, pdb.disconnected_peers().count()); assert_eq!( pdb.banned_peers_count.banned_peers(), pdb.banned_peers().count() ); - pdb.ban(&random_peer1); + pdb.disconnect_and_ban(&random_peer1); + pdb.notify_disconnect(&random_peer1); assert_eq!(pdb.disconnected_peers, pdb.disconnected_peers().count()); assert_eq!( pdb.banned_peers_count.banned_peers(), @@ -761,37 +824,41 @@ mod tests { pdb.banned_peers_count.banned_peers(), pdb.banned_peers().count() ); - pdb.ban(&random_peer3); + pdb.disconnect_and_ban(&random_peer3); + pdb.notify_disconnect(&random_peer3); assert_eq!(pdb.disconnected_peers, pdb.disconnected_peers().count()); assert_eq!( pdb.banned_peers_count.banned_peers(), pdb.banned_peers().count() ); - pdb.ban(&random_peer3); + pdb.disconnect_and_ban(&random_peer3); + pdb.notify_disconnect(&random_peer3); pdb.connect_ingoing(&random_peer1, "/ip4/0.0.0.0".parse().unwrap(), None); - pdb.disconnect(&random_peer2); - pdb.ban(&random_peer3); + pdb.notify_disconnect(&random_peer2); + pdb.disconnect_and_ban(&random_peer3); + pdb.notify_disconnect(&random_peer3); pdb.connect_ingoing(&random_peer, "/ip4/0.0.0.0".parse().unwrap(), None); assert_eq!(pdb.disconnected_peers, pdb.disconnected_peers().count()); assert_eq!( pdb.banned_peers_count.banned_peers(), pdb.banned_peers().count() ); - pdb.disconnect(&random_peer); + pdb.notify_disconnect(&random_peer); assert_eq!(pdb.disconnected_peers, pdb.disconnected_peers().count()); assert_eq!( pdb.banned_peers_count.banned_peers(), pdb.banned_peers().count() ); - pdb.disconnect(&random_peer); + pdb.notify_disconnect(&random_peer); assert_eq!(pdb.disconnected_peers, pdb.disconnected_peers().count()); assert_eq!( pdb.banned_peers_count.banned_peers(), pdb.banned_peers().count() ); - pdb.ban(&random_peer); + pdb.disconnect_and_ban(&random_peer); + pdb.notify_disconnect(&random_peer); assert_eq!(pdb.disconnected_peers, pdb.disconnected_peers().count()); } @@ -835,7 +902,8 @@ mod tests { let p5 = connect_peer_with_ips(&mut pdb, vec![ip5]); for p in &peers[..BANNED_PEERS_PER_IP_THRESHOLD + 1] { - pdb.ban(p); + pdb.disconnect_and_ban(p); + pdb.notify_disconnect(p); } //check that ip1 and ip2 are banned but ip3-5 not @@ -846,7 +914,8 @@ mod tests { assert!(!pdb.is_banned(&p5)); //ban also the last peer in peers - pdb.ban(&peers[BANNED_PEERS_PER_IP_THRESHOLD + 1]); + pdb.disconnect_and_ban(&peers[BANNED_PEERS_PER_IP_THRESHOLD + 1]); + pdb.notify_disconnect(&peers[BANNED_PEERS_PER_IP_THRESHOLD + 1]); //check that ip1-ip4 are banned but ip5 not assert!(pdb.is_banned(&p1)); @@ -856,7 +925,8 @@ mod tests { assert!(!pdb.is_banned(&p5)); //peers[0] gets unbanned - pdb.unban(&peers[0]); + reset_score(&mut pdb, &peers[0]); + pdb.unban(&peers[0]).unwrap(); //nothing changed assert!(pdb.is_banned(&p1)); @@ -866,7 +936,8 @@ mod tests { assert!(!pdb.is_banned(&p5)); //peers[1] gets unbanned - pdb.unban(&peers[1]); + reset_score(&mut pdb, &peers[1]); + pdb.unban(&peers[1]).unwrap(); //all ips are unbanned assert!(!pdb.is_banned(&p1)); @@ -893,52 +964,43 @@ mod tests { // ban all peers for p in &peers { - pdb.ban(p); + pdb.disconnect_and_ban(p); + pdb.notify_disconnect(p); } // check ip is banned assert!(pdb.is_banned(&p1)); assert!(!pdb.is_banned(&p2)); - // change addresses of banned peers - for p in &peers { - let seen_addresses = &mut pdb.peers.get_mut(p).unwrap().seen_addresses; - seen_addresses.clear(); - seen_addresses.insert(ip2); - } - - // check still the same ip is banned - assert!(pdb.is_banned(&p1)); - assert!(!pdb.is_banned(&p2)); - // unban a peer - pdb.unban(&peers[0]); + reset_score(&mut pdb, &peers[0]); + pdb.unban(&peers[0]).unwrap(); // check not banned anymore assert!(!pdb.is_banned(&p1)); assert!(!pdb.is_banned(&p2)); - // unban and reban all peers + // add ip2 to all peers and ban them. for p in &peers { - pdb.unban(p); - pdb.ban(p); + pdb.connect_ingoing(&p, ip2.into(), None); + pdb.disconnect_and_ban(p); + pdb.notify_disconnect(p); } - // ip2 is now banned - assert!(!pdb.is_banned(&p1)); + // both IP's are now banned + assert!(pdb.is_banned(&p1)); assert!(pdb.is_banned(&p2)); - // change ips back again + // unban all peers for p in &peers { - let seen_addresses = &mut pdb.peers.get_mut(p).unwrap().seen_addresses; - seen_addresses.clear(); - seen_addresses.insert(ip1); + reset_score(&mut pdb, &p); + pdb.unban(p).unwrap(); } // reban every peer except one for p in &peers[1..] { - pdb.unban(p); - pdb.ban(p); + pdb.disconnect_and_ban(p); + pdb.notify_disconnect(p); } // nothing is banned @@ -946,12 +1008,12 @@ mod tests { assert!(!pdb.is_banned(&p2)); //reban last peer - pdb.unban(&peers[0]); - pdb.ban(&peers[0]); + pdb.disconnect_and_ban(&peers[0]); + pdb.notify_disconnect(&peers[0]); - //ip1 is banned + //Ip's are banned again assert!(pdb.is_banned(&p1)); - assert!(!pdb.is_banned(&p2)); + assert!(pdb.is_banned(&p2)); } #[test] diff --git a/beacon_node/eth2_libp2p/src/peer_manager/score.rs b/beacon_node/eth2_libp2p/src/peer_manager/score.rs index d2d584b52..bc889eb24 100644 --- a/beacon_node/eth2_libp2p/src/peer_manager/score.rs +++ b/beacon_node/eth2_libp2p/src/peer_manager/score.rs @@ -48,7 +48,7 @@ pub enum PeerAction { /// An error occurred with this peer but it is not necessarily malicious. /// We have high tolerance for this actions: several occurrences are needed for a peer to get /// kicked. - /// NOTE: ~15 occurrences will get the peer banned + /// NOTE: ~50 occurrences will get the peer banned HighToleranceError, /// Received an expected message. _ValidMessage, @@ -178,7 +178,7 @@ impl Score { } /// Add an f64 to the score abiding by the limits. - pub fn add(&mut self, score: f64) { + fn add(&mut self, score: f64) { let mut new_score = self.score + score; if new_score > MAX_SCORE { new_score = MAX_SCORE; @@ -190,6 +190,26 @@ impl Score { self.score = new_score; } + /// Add an f64 to the score abiding by the limits. + #[cfg(test)] + pub fn test_add(&mut self, score: f64) { + let mut new_score = self.score + score; + if new_score > MAX_SCORE { + new_score = MAX_SCORE; + } + if new_score < MIN_SCORE { + new_score = MIN_SCORE; + } + + self.score = new_score; + } + + #[cfg(test)] + // reset the score + pub fn test_reset(&mut self) { + self.score = 0f64; + } + /// Applies time-based logic such as decay rates to the score. /// This function should be called periodically. pub fn update(&mut self) { diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index fd6adf02c..3aad79f3f 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -1259,7 +1259,7 @@ pub fn serve( &dir, ), state: api_types::PeerState::from_peer_connection_status( - &peer_info.connection_status, + &peer_info.connection_status(), ), })); } @@ -1301,7 +1301,7 @@ pub fn serve( &dir, ), state: api_types::PeerState::from_peer_connection_status( - &peer_info.connection_status, + &peer_info.connection_status(), ), }); } diff --git a/common/eth2/src/types.rs b/common/eth2/src/types.rs index 03ff1b892..02fdc5ca4 100644 --- a/common/eth2/src/types.rs +++ b/common/eth2/src/types.rs @@ -444,6 +444,7 @@ impl PeerState { match status { PeerConnectionStatus::Connected { .. } => PeerState::Connected, PeerConnectionStatus::Dialing { .. } => PeerState::Connecting, + PeerConnectionStatus::Disconnecting { .. } => PeerState::Disconnecting, PeerConnectionStatus::Disconnected { .. } | PeerConnectionStatus::Banned { .. } | PeerConnectionStatus::Unknown => PeerState::Disconnected,