diff --git a/beacon_node/eth2_libp2p/src/behaviour/mod.rs b/beacon_node/eth2_libp2p/src/behaviour/mod.rs index 09d82e2e8..0ef194a82 100644 --- a/beacon_node/eth2_libp2p/src/behaviour/mod.rs +++ b/beacon_node/eth2_libp2p/src/behaviour/mod.rs @@ -4,7 +4,7 @@ use crate::behaviour::gossipsub_scoring_parameters::{ use crate::config::gossipsub_config; use crate::discovery::{subnet_predicate, Discovery, DiscoveryEvent, TARGET_SUBNET_PEERS}; use crate::peer_manager::{ - score::ReportSource, ConnectionDirection, PeerManager, PeerManagerEvent, + peerdb::score::ReportSource, ConnectionDirection, PeerManager, PeerManagerEvent, }; use crate::rpc::*; use crate::service::METADATA_FILENAME; @@ -255,7 +255,7 @@ impl Behaviour { let update_gossipsub_scores = tokio::time::interval(params.decay_interval); gossipsub - .with_peer_score(params.clone(), thresholds) + .with_peer_score(params, thresholds) .expect("Valid score params and thresholds"); Ok(Behaviour { @@ -445,7 +445,7 @@ impl Behaviour { .peers .read() .peer_info(propagation_source) - .map(|info| info.client.kind.as_ref()) + .map(|info| info.client().kind.as_ref()) { metrics::inc_counter_vec( &metrics::GOSSIP_UNACCEPTED_MESSAGES_PER_CLIENT, @@ -834,12 +834,18 @@ impl NetworkBehaviourEventProcess for Behaviour< } GossipsubEvent::Subscribed { peer_id, topic } => { if let Some(subnet_id) = subnet_from_topic_hash(&topic) { - self.peer_manager.add_subscription(&peer_id, subnet_id); + self.network_globals + .peers + .write() + .add_subscription(&peer_id, subnet_id); } } GossipsubEvent::Unsubscribed { peer_id, topic } => { if let Some(subnet_id) = subnet_from_topic_hash(&topic) { - self.peer_manager.remove_subscription(&peer_id, subnet_id); + self.network_globals + .peers + .write() + .remove_subscription(&peer_id, &subnet_id); } } } diff --git a/beacon_node/eth2_libp2p/src/lib.rs b/beacon_node/eth2_libp2p/src/lib.rs index c04c61616..3c5114d36 100644 --- a/beacon_node/eth2_libp2p/src/lib.rs +++ b/beacon_node/eth2_libp2p/src/lib.rs @@ -74,8 +74,9 @@ pub use libp2p::{core::ConnectedPoint, PeerId, Swarm}; pub use libp2p::{multiaddr, Multiaddr}; pub use metrics::scrape_discovery_metrics; pub use peer_manager::{ - client::Client, - score::{PeerAction, ReportSource}, - ConnectionDirection, PeerConnectionStatus, PeerDB, PeerInfo, PeerSyncStatus, SyncInfo, + peerdb::client::Client, + peerdb::score::{PeerAction, ReportSource}, + peerdb::PeerDB, + ConnectionDirection, PeerConnectionStatus, PeerInfo, PeerManager, SyncInfo, SyncStatus, }; pub use service::{load_private_key, Libp2pEvent, Service, NETWORK_KEY_FILENAME}; diff --git a/beacon_node/eth2_libp2p/src/peer_manager/mod.rs b/beacon_node/eth2_libp2p/src/peer_manager/mod.rs index 95be059f4..9ed82c086 100644 --- a/beacon_node/eth2_libp2p/src/peer_manager/mod.rs +++ b/beacon_node/eth2_libp2p/src/peer_manager/mod.rs @@ -1,6 +1,5 @@ //! Implementation of Lighthouse's peer management system. -pub use self::peerdb::*; use crate::discovery::TARGET_SUBNET_PEERS; use crate::rpc::{GoodbyeReason, MetaData, Protocol, RPCError, RPCResponseErrorCode}; use crate::types::SyncState; @@ -13,6 +12,7 @@ use futures::Stream; use hashset_delay::HashSetDelay; use libp2p::core::ConnectedPoint; use libp2p::identify::IdentifyInfo; +use peerdb::{BanOperation, BanResult, ScoreUpdateResult}; use slog::{debug, error, warn}; use smallvec::SmallVec; use std::{ @@ -25,17 +25,14 @@ use types::{EthSpec, SyncSubnetId}; pub use libp2p::core::{identity::Keypair, Multiaddr}; -pub mod client; -mod peer_info; -mod peer_sync_status; #[allow(clippy::mutable_key_type)] // PeerId in hashmaps are no longer permitted by clippy -mod peerdb; -pub(crate) mod score; +pub mod peerdb; -pub use peer_info::{ConnectionDirection, PeerConnectionStatus, PeerConnectionStatus::*, PeerInfo}; -pub use peer_sync_status::{PeerSyncStatus, SyncInfo}; -use score::{PeerAction, ReportSource, ScoreState}; -use std::cmp::Ordering; +pub use peerdb::peer_info::{ + ConnectionDirection, PeerConnectionStatus, PeerConnectionStatus::*, PeerInfo, +}; +use peerdb::score::{PeerAction, ReportSource}; +pub use peerdb::sync_status::{SyncInfo, SyncStatus}; use std::collections::{hash_map::Entry, HashMap}; use std::net::IpAddr; @@ -65,10 +62,6 @@ pub const MIN_OUTBOUND_ONLY_FACTOR: f32 = 0.3; /// dialing priority peers we need for validator duties. pub const PRIORITY_PEER_EXCESS: f32 = 0.05; -/// Relative factor of peers that are allowed to have a negative gossipsub score without penalizing -/// them in lighthouse. -const ALLOWED_NEGATIVE_GOSSIPSUB_FACTOR: f32 = 0.1; - /// The main struct that handles peer's reputation and connection status. pub struct PeerManager { /// Storage of network globals to access the `PeerDB`. @@ -154,63 +147,102 @@ impl PeerManager { /// /// This will send a goodbye and disconnect the peer if it is connected or dialing. pub fn goodbye_peer(&mut self, peer_id: &PeerId, reason: GoodbyeReason, source: ReportSource) { - // get the peer info + // Update the sync status if required if let Some(info) = self.network_globals.peers.write().peer_info_mut(peer_id) { debug!(self.log, "Sending goodbye to peer"; "peer_id" => %peer_id, "reason" => %reason, "score" => %info.score()); if matches!(reason, GoodbyeReason::IrrelevantNetwork) { - info.sync_status.update(PeerSyncStatus::IrrelevantPeer); + info.update_sync_status(SyncStatus::IrrelevantPeer); } - - // Goodbye's are fatal - info.apply_peer_action_to_score(PeerAction::Fatal); - metrics::inc_counter_vec( - &metrics::PEER_ACTION_EVENTS_PER_CLIENT, - &[ - info.client.kind.as_ref(), - PeerAction::Fatal.as_ref(), - source.into(), - ], - ); } - // Update the peerdb and start the disconnection. - self.ban_peer(peer_id, reason); + self.report_peer(peer_id, PeerAction::Fatal, source, Some(reason)); } /// Reports a peer for some action. /// /// If the peer doesn't exist, log a warning and insert defaults. - pub fn report_peer(&mut self, peer_id: &PeerId, action: PeerAction, source: ReportSource) { - // Helper function to avoid any potential deadlocks. - let mut to_ban_peers = Vec::with_capacity(1); - let mut to_unban_peers = Vec::with_capacity(1); + pub fn report_peer( + &mut self, + peer_id: &PeerId, + action: PeerAction, + source: ReportSource, + reason: Option, + ) { + let action = self + .network_globals + .peers + .write() + .report_peer(peer_id, action, source); + self.handle_score_action(peer_id, action, reason); + } - { - let mut peer_db = self.network_globals.peers.write(); - if let Some(info) = peer_db.peer_info_mut(peer_id) { - let previous_state = info.score_state(); - info.apply_peer_action_to_score(action); - metrics::inc_counter_vec( - &metrics::PEER_ACTION_EVENTS_PER_CLIENT, - &[info.client.kind.as_ref(), action.as_ref(), source.into()], - ); - - Self::handle_score_transitions( - previous_state, - peer_id, - info, - &mut to_ban_peers, - &mut to_unban_peers, - &mut self.events, - &self.log, - ); - if previous_state == info.score_state() { - debug!(self.log, "Peer score adjusted"; "peer_id" => %peer_id, "score" => %info.score()); - } + /// Upon adjusting a Peer's score, there are times the peer manager must pass messages up to + /// libp2p. This function handles the conditional logic associated with each score update + /// result. + fn handle_score_action( + &mut self, + peer_id: &PeerId, + action: ScoreUpdateResult, + reason: Option, + ) { + match action { + ScoreUpdateResult::Ban(ban_operation) => { + // The peer has been banned and we need to handle the banning operation + // NOTE: When we ban a peer, its IP address can be banned. We do not recursively search + // through all our connected peers banning all other peers that are using this IP address. + // If these peers are behaving fine, we permit their current connections. However, if any new + // nodes or current nodes try to reconnect on a banned IP, they will be instantly banned + // and disconnected. + self.handle_ban_operation(peer_id, ban_operation, reason); } - } // end write lock + ScoreUpdateResult::Disconnect => { + // The peer has transitioned to a disconnect state and has been marked as such in + // the peer db. We must inform libp2p to disconnect this peer. + self.events.push(PeerManagerEvent::DisconnectPeer( + *peer_id, + GoodbyeReason::BadScore, + )); + } + ScoreUpdateResult::NoAction => { + // The report had no effect on the peer and there is nothing to do. + } + ScoreUpdateResult::Unbanned(unbanned_ips) => { + // Inform the Swarm to unban the peer + self.events + .push(PeerManagerEvent::UnBanned(*peer_id, unbanned_ips)); + } + } + } - self.ban_and_unban_peers(to_ban_peers, to_unban_peers); + /// If a peer is being banned, this handles the banning operation. + fn handle_ban_operation( + &mut self, + peer_id: &PeerId, + ban_operation: BanOperation, + reason: Option, + ) { + match ban_operation { + BanOperation::DisconnectThePeer => { + // The peer was currently connected, so we start a disconnection. + // Once the peer has disconnected, its connection state will transition to a + // banned state. + self.events.push(PeerManagerEvent::DisconnectPeer( + *peer_id, + reason.unwrap_or(GoodbyeReason::BadScore), + )); + } + BanOperation::PeerDisconnecting => { + // The peer is currently being disconnected and will be banned once the + // disconnection completes. + } + BanOperation::ReadyToBan(banned_ips) => { + // The peer is not currently connected, we can safely ban it at the swarm + // level. + // Inform the Swarm to ban the peer + self.events + .push(PeerManagerEvent::Banned(*peer_id, banned_ips)); + } + } } /// Peers that have been returned by discovery requests that are suitable for dialing are @@ -274,27 +306,6 @@ impl PeerManager { self.status_peers.insert(*peer_id); } - /// Adds a gossipsub subscription to a peer in the peerdb. - pub fn add_subscription(&self, peer_id: &PeerId, subnet: Subnet) { - if let Some(info) = self.network_globals.peers.write().peer_info_mut(peer_id) { - info.subnets.insert(subnet); - } - } - - /// Removes a gossipsub subscription to a peer in the peerdb. - pub fn remove_subscription(&self, peer_id: &PeerId, subnet: Subnet) { - if let Some(info) = self.network_globals.peers.write().peer_info_mut(peer_id) { - info.subnets.remove(&subnet); - } - } - - /// Removes all gossipsub subscriptions to a peer in the peerdb. - pub fn remove_all_subscriptions(&self, peer_id: &PeerId) { - if let Some(info) = self.network_globals.peers.write().peer_info_mut(peer_id) { - info.subnets = Default::default(); - } - } - /// Insert the sync subnet into list of long lived sync committee subnets that we need to /// maintain adequate number of peers for. pub fn add_sync_subnet(&mut self, subnet_id: SyncSubnetId, min_ttl: Instant) { @@ -417,10 +428,6 @@ impl PeerManager { ) { if num_established == 0 { // There are no more connections - - // Remove all subnet subscriptions from the peer_db - self.remove_all_subscriptions(&peer_id); - if self .network_globals .peers @@ -441,7 +448,7 @@ impl PeerManager { .peers .read() .peer_info(&peer_id) - .map(|info| info.client.kind.clone()) + .map(|info| info.client().kind.clone()) { if let Some(v) = metrics::get_int_gauge(&metrics::PEERS_PER_CLIENT, &[&kind.to_string()]) @@ -496,15 +503,13 @@ impl PeerManager { /// Updates `PeerInfo` with `identify` information. pub fn identify(&mut self, peer_id: &PeerId, info: &IdentifyInfo) { if let Some(peer_info) = self.network_globals.peers.write().peer_info_mut(peer_id) { - let previous_kind = peer_info.client.kind.clone(); - let previous_listening_addresses = std::mem::replace( - &mut peer_info.listening_addresses, - info.listen_addrs.clone(), - ); - peer_info.client = client::Client::from_identify_info(info); + let previous_kind = peer_info.client().kind.clone(); + let previous_listening_addresses = + peer_info.set_listening_addresses(info.listen_addrs.clone()); + peer_info.set_client(peerdb::client::Client::from_identify_info(info)); - if previous_kind != peer_info.client.kind - || peer_info.listening_addresses != previous_listening_addresses + if previous_kind != peer_info.client().kind + || *peer_info.listening_addresses() != previous_listening_addresses { debug!(self.log, "Identified Peer"; "peer" => %peer_id, "protocol_version" => &info.protocol_version, @@ -517,7 +522,7 @@ impl PeerManager { // update the peer client kind metric if let Some(v) = metrics::get_int_gauge( &metrics::PEERS_PER_CLIENT, - &[&peer_info.client.kind.to_string()], + &[&peer_info.client().kind.to_string()], ) { v.inc() }; @@ -636,7 +641,7 @@ impl PeerManager { RPCError::Disconnected => return, // No penalty for a graceful disconnection }; - self.report_peer(peer_id, peer_action, ReportSource::RPC); + self.report_peer(peer_id, peer_action, ReportSource::RPC, None); } /// A ping request has been received. @@ -646,7 +651,7 @@ impl PeerManager { // received a ping // reset the to-ping timer for this peer debug!(self.log, "Received a ping request"; "peer_id" => %peer_id, "seq_no" => seq); - match peer_info.connection_direction { + match peer_info.connection_direction() { Some(ConnectionDirection::Incoming) => { self.inbound_ping_peers.insert(*peer_id); } @@ -659,7 +664,7 @@ impl PeerManager { } // if the sequence number is unknown send an update the meta data of the peer. - if let Some(meta_data) = &peer_info.meta_data { + if let Some(meta_data) = &peer_info.meta_data() { if *meta_data.seq_number() < seq { debug!(self.log, "Requesting new metadata from peer"; "peer_id" => %peer_id, "known_seq_no" => meta_data.seq_number(), "ping_seq_no" => seq); @@ -683,7 +688,7 @@ impl PeerManager { // received a pong // if the sequence number is unknown send update the meta data of the peer. - if let Some(meta_data) = &peer_info.meta_data { + if let Some(meta_data) = &peer_info.meta_data() { if *meta_data.seq_number() < seq { debug!(self.log, "Requesting new metadata from peer"; "peer_id" => %peer_id, "known_seq_no" => meta_data.seq_number(), "pong_seq_no" => seq); @@ -703,7 +708,7 @@ impl PeerManager { /// Received a metadata response from a peer. pub fn meta_data_response(&mut self, peer_id: &PeerId, meta_data: MetaData) { if let Some(peer_info) = self.network_globals.peers.write().peer_info_mut(peer_id) { - if let Some(known_meta_data) = &peer_info.meta_data { + if let Some(known_meta_data) = &peer_info.meta_data() { if *known_meta_data.seq_number() < *meta_data.seq_number() { debug!(self.log, "Updating peer's metadata"; "peer_id" => %peer_id, "known_seq_no" => known_meta_data.seq_number(), "new_seq_no" => meta_data.seq_number()); @@ -718,64 +723,24 @@ impl PeerManager { debug!(self.log, "Obtained peer's metadata"; "peer_id" => %peer_id, "new_seq_no" => meta_data.seq_number()); } - peer_info.meta_data = Some(meta_data); + peer_info.set_meta_data(meta_data); } else { error!(self.log, "Received METADATA from an unknown peer"; "peer_id" => %peer_id); } } + /// Updates the gossipsub scores for all known peers in gossipsub. pub(crate) fn update_gossipsub_scores(&mut self, gossipsub: &Gossipsub) { - let mut to_ban_peers = Vec::new(); - let mut to_unban_peers = Vec::new(); + let actions = self + .network_globals + .peers + .write() + .update_gossipsub_scores(self.target_peers, gossipsub); - { - // collect peers with scores - let mut guard = self.network_globals.peers.write(); - let mut peers: Vec<_> = guard - .peers_mut() - .filter_map(|(peer_id, info)| { - gossipsub - .peer_score(peer_id) - .map(|score| (peer_id, info, score)) - }) - .collect(); - - // sort descending by score - peers.sort_unstable_by(|(.., s1), (.., s2)| { - s2.partial_cmp(s1).unwrap_or(Ordering::Equal) - }); - - let mut to_ignore_negative_peers = - (self.target_peers as f32 * ALLOWED_NEGATIVE_GOSSIPSUB_FACTOR).ceil() as usize; - - for (peer_id, info, score) in peers { - let previous_state = info.score_state(); - info.update_gossipsub_score( - score, - if score < 0.0 && to_ignore_negative_peers > 0 { - to_ignore_negative_peers -= 1; - // We ignore the negative score for the best negative peers so that their - // gossipsub score can recover without getting disconnected. - true - } else { - false - }, - ); - - Self::handle_score_transitions( - previous_state, - peer_id, - info, - &mut to_ban_peers, - &mut to_unban_peers, - &mut self.events, - &self.log, - ); - } - } // end write lock - - self.ban_and_unban_peers(to_ban_peers, to_unban_peers); + for (peer_id, score_action) in actions { + self.handle_score_action(&peer_id, score_action, None); + } } /* Internal functions */ @@ -810,14 +775,15 @@ impl PeerManager { /// /// This is also called when dialing a peer fails. fn inject_disconnect(&mut self, peer_id: &PeerId) { - if self + let ban_operation = self .network_globals .peers .write() - .inject_disconnect(peer_id) - { + .inject_disconnect(peer_id); + + if let Some(ban_operation) = ban_operation { // The peer was awaiting a ban, continue to ban the peer. - self.ban_peer(peer_id, GoodbyeReason::BadScore); + self.handle_ban_operation(peer_id, ban_operation, None); } // Remove the ping and status timer for the peer @@ -879,7 +845,7 @@ impl PeerManager { .peers .read() .peer_info(peer_id) - .map(|peer_info| peer_info.client.kind.clone()) + .map(|peer_info| peer_info.client().kind.clone()) { if let Some(v) = metrics::get_int_gauge(&metrics::PEERS_PER_CLIENT, &[&kind.to_string()]) @@ -891,154 +857,6 @@ impl PeerManager { true } - /// This handles score transitions between states. It transitions peers states from - /// disconnected/banned/connected. - fn handle_score_transitions( - previous_state: ScoreState, - peer_id: &PeerId, - info: &mut PeerInfo, - to_ban_peers: &mut Vec, - to_unban_peers: &mut Vec, - events: &mut SmallVec<[PeerManagerEvent; 16]>, - log: &slog::Logger, - ) { - match (info.score_state(), previous_state) { - (ScoreState::Banned, ScoreState::Healthy | ScoreState::Disconnected) => { - debug!(log, "Peer has been banned"; "peer_id" => %peer_id, "score" => %info.score()); - to_ban_peers.push(*peer_id); - } - (ScoreState::Disconnected, ScoreState::Banned | ScoreState::Healthy) => { - debug!(log, "Peer transitioned to disconnect state"; "peer_id" => %peer_id, "score" => %info.score(), "past_state" => %previous_state); - // disconnect the peer if it's currently connected or dialing - if info.is_connected_or_dialing() { - // Change the state to inform that we are disconnecting the peer. - info.disconnecting(false); - events.push(PeerManagerEvent::DisconnectPeer( - *peer_id, - GoodbyeReason::BadScore, - )); - } else if previous_state == ScoreState::Banned { - to_unban_peers.push(*peer_id); - } - } - (ScoreState::Healthy, ScoreState::Disconnected) => { - debug!(log, "Peer transitioned to healthy state"; "peer_id" => %peer_id, "score" => %info.score(), "past_state" => %previous_state); - } - (ScoreState::Healthy, ScoreState::Banned) => { - debug!(log, "Peer transitioned to healthy state"; "peer_id" => %peer_id, "score" => %info.score(), "past_state" => %previous_state); - // unban the peer if it was previously banned. - to_unban_peers.push(*peer_id); - } - // Explicitly ignore states that haven't transitioned. - (ScoreState::Healthy, ScoreState::Healthy) => {} - (ScoreState::Disconnected, ScoreState::Disconnected) => {} - (ScoreState::Banned, ScoreState::Banned) => {} - } - } - - /// Updates the state of banned peers. - fn ban_and_unban_peers(&mut self, to_ban_peers: Vec, to_unban_peers: Vec) { - // process banning peers - for peer_id in to_ban_peers { - self.ban_peer(&peer_id, GoodbyeReason::BadScore); - } - // process unbanning peers - for peer_id in to_unban_peers { - if let Err(e) = self.unban_peer(&peer_id) { - error!(self.log, "{}", e; "peer_id" => %peer_id); - } - } - } - - /// Updates the scores of known peers according to their connection - /// status and the time that has passed. - /// NOTE: This is experimental and will likely be adjusted - fn update_peer_scores(&mut self) { - /* Check how long have peers been in this state and update their reputations if needed */ - let mut to_ban_peers = Vec::new(); - let mut to_unban_peers = Vec::new(); - - for (peer_id, info) in self.network_globals.peers.write().peers_mut() { - let previous_state = info.score_state(); - // Update scores - info.score_update(); - - Self::handle_score_transitions( - previous_state, - peer_id, - info, - &mut to_ban_peers, - &mut to_unban_peers, - &mut self.events, - &self.log, - ); - } - self.ban_and_unban_peers(to_ban_peers, to_unban_peers); - } - - /// Bans a peer. - /// - /// Records updates the peers connection status and updates the peer db as well as blocks the - /// peer from participating in discovery and removes them from the routing table. - fn ban_peer(&mut self, peer_id: &PeerId, reason: GoodbyeReason) { - // NOTE: When we ban a peer, its IP address can be banned. We do not recursively search - // through all our connected peers banning all other peers that are using this IP address. - // If these peers are behaving fine, we permit their current connections. However, if any new - // nodes or current nodes try to reconnect on a banned IP, they will be instantly banned - // and disconnected. - - let mut peer_db = self.network_globals.peers.write(); - - match peer_db.disconnect_and_ban(peer_id) { - BanOperation::DisconnectThePeer => { - // The peer was currently connected, so we start a disconnection. - // Once the peer has disconnected, this function will be called to again to ban - // at the swarm level. - self.events - .push(PeerManagerEvent::DisconnectPeer(*peer_id, reason)); - } - BanOperation::PeerDisconnecting => { - // The peer is currently being disconnected and will be banned once the - // disconnection completes. - } - BanOperation::ReadyToBan => { - // The peer is not currently connected, we can safely ban it at the swarm - // level. - let banned_ip_addresses = peer_db - .peer_info(peer_id) - .map(|info| { - info.seen_addresses() - .filter(|ip| peer_db.is_ip_banned(ip)) - .collect::>() - }) - .unwrap_or_default(); - - // Inform the Swarm to ban the peer - self.events - .push(PeerManagerEvent::Banned(*peer_id, banned_ip_addresses)); - } - } - } - - /// Unbans a peer. - /// - /// Updates the peer's connection status and updates the peer db as well as removes - /// previous bans from discovery. - fn unban_peer(&mut self, peer_id: &PeerId) -> Result<(), &'static str> { - let mut peer_db = self.network_globals.peers.write(); - peer_db.unban(peer_id)?; - - let seen_ip_addresses = peer_db - .peer_info(peer_id) - .map(|info| info.seen_addresses().collect::>()) - .unwrap_or_default(); - - // Inform the Swarm to unban the peer - self.events - .push(PeerManagerEvent::UnBanned(*peer_id, seen_ip_addresses)); - Ok(()) - } - // Gracefully disconnects a peer without banning them. fn disconnect_peer(&mut self, peer_id: PeerId, reason: GoodbyeReason) { self.events @@ -1046,7 +864,7 @@ impl PeerManager { self.network_globals .peers .write() - .notify_disconnecting(peer_id, false); + .notify_disconnecting(&peer_id, false); } /// Run discovery query for additional sync committee peers if we fall below `TARGET_PEERS`. @@ -1110,8 +928,11 @@ impl PeerManager { self.events.push(PeerManagerEvent::DiscoverPeers); } - // Updates peer's scores. - self.update_peer_scores(); + // Updates peer's scores and unban any peers if required. + let actions = self.network_globals.peers.write().update_scores(); + for (peer_id, action) in actions { + self.handle_score_action(&peer_id, action, None); + } // Maintain minimum count for sync committee peers. self.maintain_sync_committee_peers(); @@ -1394,67 +1215,6 @@ mod tests { ); } - #[tokio::test] - async fn test_peer_manager_removes_unhealthy_peers_during_heartbeat() { - let mut peer_manager = build_peer_manager(3).await; - - // Create 3 peers to connect to. - let peer0 = PeerId::random(); - let inbound_only_peer1 = PeerId::random(); - let outbound_only_peer1 = PeerId::random(); - - peer_manager.inject_connect_ingoing(&peer0, "/ip4/0.0.0.0".parse().unwrap(), None); - peer_manager.inject_connect_outgoing(&peer0, "/ip4/0.0.0.0".parse().unwrap(), None); - - // Connect to two peers that are on the threshold of being disconnected. - peer_manager.inject_connect_ingoing( - &inbound_only_peer1, - "/ip4/0.0.0.0".parse().unwrap(), - None, - ); - peer_manager.inject_connect_outgoing( - &outbound_only_peer1, - "/ip4/0.0.0.0".parse().unwrap(), - None, - ); - peer_manager - .network_globals - .peers - .write() - .peer_info_mut(&(inbound_only_peer1)) - .unwrap() - .add_to_score(-19.9); - peer_manager - .network_globals - .peers - .write() - .peer_info_mut(&(outbound_only_peer1)) - .unwrap() - .add_to_score(-19.9); - // Update the gossipsub scores to induce connection downgrade - // during the heartbeat, update_peer_scores will downgrade the score from -19.9 to at least -20, this will then trigger a disconnection. - // If we changed the peer scores to -20 before the heartbeat, update_peer_scores will mark the previous score status as disconnected, - // then handle_state_transitions will not change the connection status to disconnected because the score state has not changed. - peer_manager - .network_globals - .peers - .write() - .peer_info_mut(&(inbound_only_peer1)) - .unwrap() - .set_gossipsub_score(-85.0); - peer_manager - .network_globals - .peers - .write() - .peer_info_mut(&(outbound_only_peer1)) - .unwrap() - .set_gossipsub_score(-85.0); - - peer_manager.heartbeat(); - - assert_eq!(peer_manager.network_globals.connected_or_dialing_peers(), 1); - } - #[tokio::test] async fn test_peer_manager_remove_unhealthy_peers_brings_peers_below_target() { let mut peer_manager = build_peer_manager(3).await; @@ -1466,18 +1226,18 @@ mod tests { let inbound_only_peer1 = PeerId::random(); let outbound_only_peer1 = PeerId::random(); - peer_manager.inject_connect_ingoing(&peer0, "/ip4/0.0.0.0".parse().unwrap(), None); - peer_manager.inject_connect_ingoing(&peer1, "/ip4/0.0.0.0".parse().unwrap(), None); + peer_manager.inject_connect_ingoing(&peer0, "/ip4/0.0.0.0/tcp/8000".parse().unwrap(), None); + peer_manager.inject_connect_ingoing(&peer1, "/ip4/0.0.0.0/tcp/8000".parse().unwrap(), None); // Connect to two peers that are on the threshold of being disconnected. peer_manager.inject_connect_ingoing( &inbound_only_peer1, - "/ip4/0.0.0.0".parse().unwrap(), + "/ip4/0.0.0.0/tcp/8000".parse().unwrap(), None, ); peer_manager.inject_connect_outgoing( &outbound_only_peer1, - "/ip4/0.0.0.0".parse().unwrap(), + "/ip4/0.0.0.0/tcp/8000".parse().unwrap(), None, ); peer_manager @@ -1486,14 +1246,14 @@ mod tests { .write() .peer_info_mut(&(inbound_only_peer1)) .unwrap() - .add_to_score(-19.9); + .add_to_score(-19.8); peer_manager .network_globals .peers .write() .peer_info_mut(&(outbound_only_peer1)) .unwrap() - .add_to_score(-19.9); + .add_to_score(-19.8); peer_manager .network_globals .peers @@ -1509,9 +1269,9 @@ mod tests { .unwrap() .set_gossipsub_score(-85.0); peer_manager.heartbeat(); - // Tests that when we are over the target peer limit, after disconnecting two unhealthy peers, + // Tests that when we are over the target peer limit, after disconnecting one unhealthy peer, // the loop to check for disconnecting peers will stop because we have removed enough peers (only needed to remove 1 to reach target). - assert_eq!(peer_manager.network_globals.connected_or_dialing_peers(), 2); + assert_eq!(peer_manager.network_globals.connected_or_dialing_peers(), 3); } #[tokio::test] diff --git a/beacon_node/eth2_libp2p/src/peer_manager/peerdb.rs b/beacon_node/eth2_libp2p/src/peer_manager/peerdb.rs index 542ba07e7..c382f6e4f 100644 --- a/beacon_node/eth2_libp2p/src/peer_manager/peerdb.rs +++ b/beacon_node/eth2_libp2p/src/peer_manager/peerdb.rs @@ -1,26 +1,34 @@ -use super::peer_info::{ConnectionDirection, PeerConnectionStatus, PeerInfo}; -use super::peer_sync_status::PeerSyncStatus; -use super::score::{Score, ScoreState}; -use crate::rpc::methods::MetaData; -use crate::Enr; -use crate::PeerId; use crate::{ + metrics, multiaddr::{Multiaddr, Protocol}, types::Subnet, + Enr, Gossipsub, PeerId, }; +use peer_info::{ConnectionDirection, PeerConnectionStatus, PeerInfo}; use rand::seq::SliceRandom; +use score::{PeerAction, ReportSource, Score, ScoreState}; use slog::{crit, debug, error, trace, warn}; -use std::collections::HashMap; +use std::cmp::Ordering; +use std::collections::{HashMap, HashSet}; use std::net::{IpAddr, SocketAddr}; use std::time::Instant; +use sync_status::SyncStatus; use types::EthSpec; +pub mod client; +pub mod peer_info; +pub mod score; +pub mod sync_status; + /// Max number of disconnected nodes to remember. const MAX_DC_PEERS: usize = 500; /// The maximum number of banned nodes to remember. const MAX_BANNED_PEERS: usize = 1000; /// We ban an IP if there are more than `BANNED_PEERS_PER_IP_THRESHOLD` banned peers with this IP. const BANNED_PEERS_PER_IP_THRESHOLD: usize = 5; +/// Relative factor of peers that are allowed to have a negative gossipsub score without penalizing +/// them in lighthouse. +const ALLOWED_NEGATIVE_GOSSIPSUB_FACTOR: f32 = 0.1; /// Storage of known peers, their reputation and information pub struct PeerDB { @@ -34,13 +42,1029 @@ pub struct PeerDB { log: slog::Logger, } +impl PeerDB { + pub fn new(trusted_peers: Vec, log: &slog::Logger) -> Self { + // Initialize the peers hashmap with trusted peers + let peers = trusted_peers + .into_iter() + .map(|peer_id| (peer_id, PeerInfo::trusted_peer_info())) + .collect(); + Self { + log: log.clone(), + disconnected_peers: 0, + banned_peers_count: BannedPeersCount::new(), + peers, + } + } + + /* Getters */ + + /// Gives the score of a peer, or default score if it is unknown. + pub fn score(&self, peer_id: &PeerId) -> f64 { + self.peers + .get(peer_id) + .map_or(&Score::default(), |info| info.score()) + .score() + } + + /// Returns an iterator over all peers in the db. + pub fn peers(&self) -> impl Iterator)> { + self.peers.iter() + } + + /// Gives the ids of all known peers. + pub fn peer_ids(&self) -> impl Iterator { + self.peers.keys() + } + + /// Returns a peer's info, if known. + pub fn peer_info(&self, peer_id: &PeerId) -> Option<&PeerInfo> { + self.peers.get(peer_id) + } + + /// Returns a mutable reference to a peer's info if known. + // VISIBILITY: The peer manager is able to modify some elements of the peer info, such as sync + // status. + pub(super) fn peer_info_mut(&mut self, peer_id: &PeerId) -> Option<&mut PeerInfo> { + self.peers.get_mut(peer_id) + } + + /// Returns if the peer is already connected. + pub fn is_connected(&self, peer_id: &PeerId) -> bool { + matches!( + self.connection_status(peer_id), + Some(PeerConnectionStatus::Connected { .. }) + ) + } + + /// If we are connected or currently dialing the peer returns true. + pub fn is_connected_or_dialing(&self, peer_id: &PeerId) -> bool { + matches!( + self.connection_status(peer_id), + Some(PeerConnectionStatus::Connected { .. }) + | Some(PeerConnectionStatus::Dialing { .. }) + ) + } + + /// If we are connected or in the process of disconnecting + pub fn is_connected_or_disconnecting(&self, peer_id: &PeerId) -> bool { + matches!( + self.connection_status(peer_id), + Some(PeerConnectionStatus::Connected { .. }) + | Some(PeerConnectionStatus::Disconnecting { .. }) + ) + } + + /// Returns true if the peer should be dialed. This checks the connection state and the + /// score state and determines if the peer manager should dial this peer. + pub fn should_dial(&self, peer_id: &PeerId) -> bool { + matches!( + self.connection_status(peer_id), + Some(PeerConnectionStatus::Disconnected { .. }) + | Some(PeerConnectionStatus::Unknown { .. }) + | None + ) && !self.score_state_banned_or_disconnected(peer_id) + } + + /// Returns true if the peer is synced at least to our current head. + pub fn is_synced(&self, peer_id: &PeerId) -> bool { + match self.peers.get(peer_id).map(|info| info.sync_status()) { + Some(SyncStatus::Synced { .. }) => true, + Some(_) => false, + None => false, + } + } + + /// Returns the current [`BanResult`] of the peer. This doesn't check the connection state, rather the + /// underlying score of the peer. A peer may be banned but still in the connected state + /// temporarily. + /// + /// This is used to determine if we should accept incoming connections or not. + pub fn ban_status(&self, peer_id: &PeerId) -> BanResult { + if let Some(peer) = self.peers.get(peer_id) { + match peer.score_state() { + ScoreState::Banned => BanResult::BadScore, + _ => { + if let Some(ip) = self.ip_is_banned(peer) { + BanResult::BannedIp(ip) + } else { + BanResult::NotBanned + } + } + } + } else { + BanResult::NotBanned + } + } + + /// Checks if the peer's known addresses are currently banned. + fn ip_is_banned(&self, peer: &PeerInfo) -> Option { + peer.seen_ip_addresses() + .find(|ip| self.banned_peers_count.ip_is_banned(ip)) + } + + /// Returns true if the IP is banned. + pub fn is_ip_banned(&self, ip: &IpAddr) -> bool { + self.banned_peers_count.ip_is_banned(ip) + } + + /// Returns true if the Peer is either banned or in the disconnected state. + fn score_state_banned_or_disconnected(&self, peer_id: &PeerId) -> bool { + if let Some(peer) = self.peers.get(peer_id) { + match peer.score_state() { + ScoreState::Banned | ScoreState::Disconnected => true, + _ => self.ip_is_banned(peer).is_some(), + } + } else { + false + } + } + + /// Gives the ids and info of all known connected peers. + pub fn connected_peers(&self) -> impl Iterator)> { + self.peers.iter().filter(|(_, info)| info.is_connected()) + } + + /// Gives the ids of all known connected peers. + pub fn connected_peer_ids(&self) -> impl Iterator { + self.peers + .iter() + .filter(|(_, info)| info.is_connected()) + .map(|(peer_id, _)| peer_id) + } + + /// Connected or dialing peers + pub fn connected_or_dialing_peers(&self) -> impl Iterator { + self.peers + .iter() + .filter(|(_, info)| info.is_connected() || info.is_dialing()) + .map(|(peer_id, _)| peer_id) + } + + /// Connected outbound-only peers + pub fn connected_outbound_only_peers(&self) -> impl Iterator { + self.peers + .iter() + .filter(|(_, info)| info.is_outbound_only()) + .map(|(peer_id, _)| peer_id) + } + + /// Gives the `peer_id` of all known connected and synced peers. + pub fn synced_peers(&self) -> impl Iterator { + self.peers + .iter() + .filter(|(_, info)| { + if info.sync_status().is_synced() || info.sync_status().is_advanced() { + return info.is_connected(); + } + false + }) + .map(|(peer_id, _)| peer_id) + } + + /// Gives the `peer_id` of all known connected and advanced peers. + pub fn advanced_peers(&self) -> impl Iterator { + self.peers + .iter() + .filter(|(_, info)| { + if info.sync_status().is_advanced() { + return info.is_connected(); + } + false + }) + .map(|(peer_id, _)| peer_id) + } + + /// Gives an iterator of all peers on a given subnet. + pub fn good_peers_on_subnet(&self, subnet: Subnet) -> impl Iterator { + self.peers + .iter() + .filter(move |(_, info)| { + // We check both the metadata and gossipsub data as we only want to count long-lived subscribed peers + info.is_connected() + && info.on_subnet_metadata(&subnet) + && info.on_subnet_gossipsub(&subnet) + && info.is_good_gossipsub_peer() + }) + .map(|(peer_id, _)| peer_id) + } + + /// Gives the ids of all known disconnected peers. + pub fn disconnected_peers(&self) -> impl Iterator { + self.peers + .iter() + .filter(|(_, info)| info.is_disconnected()) + .map(|(peer_id, _)| peer_id) + } + + /// Returns the ids of all known banned peers. + pub fn banned_peers(&self) -> impl Iterator { + self.peers + .iter() + .filter(|(_, info)| info.is_banned()) + .map(|(peer_id, _)| peer_id) + } + + /// Gives the ids of all known banned peers. + pub fn banned_peers_by_score(&self) -> impl Iterator { + self.peers + .iter() + .filter(|(_, info)| info.score_is_banned()) + .map(|(peer_id, _)| peer_id) + } + + /// Returns a vector of all connected peers sorted by score beginning with the worst scores. + /// Ties get broken randomly. + pub fn worst_connected_peers(&self) -> Vec<(&PeerId, &PeerInfo)> { + let mut connected = self + .peers + .iter() + .filter(|(_, info)| info.is_connected()) + .collect::>(); + + connected.shuffle(&mut rand::thread_rng()); + connected.sort_by_key(|(_, info)| info.score()); + connected + } + + /// Returns a vector containing peers (their ids and info), sorted by + /// score from highest to lowest, and filtered using `is_status` + pub fn best_peers_by_status(&self, is_status: F) -> Vec<(&PeerId, &PeerInfo)> + where + F: Fn(&PeerInfo) -> bool, + { + let mut by_status = self + .peers + .iter() + .filter(|(_, info)| is_status(info)) + .collect::>(); + by_status.sort_by_key(|(_, info)| info.score()); + by_status.into_iter().rev().collect() + } + + /// Returns the peer with highest reputation that satisfies `is_status` + pub fn best_by_status(&self, is_status: F) -> Option<&PeerId> + where + F: Fn(&PeerInfo) -> bool, + { + self.peers + .iter() + .filter(|(_, info)| is_status(info)) + .max_by_key(|(_, info)| info.score()) + .map(|(id, _)| id) + } + + /// Returns the peer's connection status. Returns unknown if the peer is not in the DB. + pub fn connection_status(&self, peer_id: &PeerId) -> Option { + self.peer_info(peer_id) + .map(|info| info.connection_status().clone()) + } + + /* Mutability */ + + /// Allows the sync module to update sync status' of peers. Returns None, if the peer doesn't + /// exist and returns Some(bool) representing if the sync state was modified. + pub fn update_sync_status( + &mut self, + peer_id: &PeerId, + sync_status: SyncStatus, + ) -> Option { + let info = self.peers.get_mut(peer_id)?; + Some(info.update_sync_status(sync_status)) + } + + /// Updates the scores of known peers according to their connection status and the time that + /// has passed. This function returns a list of peers that have been unbanned. + /// NOTE: Peer scores cannot be penalized during the update, they can only increase. Therefore + /// it not possible to ban peers when updating scores. + #[must_use = "The unbanned peers must be sent to libp2p"] + pub(super) fn update_scores(&mut self) -> Vec<(PeerId, ScoreUpdateResult)> { + // Peer can be unbanned in this process. + // We return the result, such that the peer manager can inform the swarm to lift the libp2p + // ban on these peers. + let mut peers_to_unban = Vec::new(); + let mut result = Vec::new(); + + for (peer_id, info) in self.peers.iter_mut() { + let previous_state = info.score_state(); + // Update scores + info.score_update(); + + match Self::handle_score_transition(previous_state, peer_id, info, &self.log) { + // A peer should not be able to be banned from a score update. + ScoreTransitionResult::Banned => { + error!(self.log, "Peer has been banned in an update"; "peer_id" => %peer_id) + } + // A peer should not be able to transition to a disconnected state from a healthy + // state in a score update. + ScoreTransitionResult::Disconnected => { + error!(self.log, "Peer has been disconnected in an update"; "peer_id" => %peer_id) + } + ScoreTransitionResult::Unbanned => { + peers_to_unban.push(*peer_id); + } + ScoreTransitionResult::NoAction => {} + } + } + + // Update the state in the peerdb + for unbanned_peer in peers_to_unban { + self.update_connection_state(&unbanned_peer, NewConnectionState::Unbanned); + let seen_ip_addresses = self + .peers + .get(&unbanned_peer) + .map(|info| { + info.seen_ip_addresses() + .filter(|ip| !self.is_ip_banned(ip)) + .collect::>() + }) + .unwrap_or_default(); + result.push(( + unbanned_peer, + ScoreUpdateResult::Unbanned(seen_ip_addresses), + )); + } + // Return the list so that the peer manager can update libp2p + result + } + + /// Updates gossipsub scores for all peers. + #[must_use = "Score updates need to be reported to libp2p"] + pub(super) fn update_gossipsub_scores( + &mut self, + target_peers: usize, + gossipsub: &Gossipsub, + ) -> Vec<(PeerId, ScoreUpdateResult)> { + let mut actions = Vec::new(); + let mut results = Vec::new(); + + let mut peers: Vec<_> = self + .peers + .iter_mut() + .filter(|(_peer_id, info)| info.is_connected()) + .filter_map(|(peer_id, info)| { + gossipsub + .peer_score(peer_id) + .map(|score| (peer_id, info, score)) + }) + .collect(); + + // sort descending by score + peers.sort_unstable_by(|(.., s1), (.., s2)| s2.partial_cmp(s1).unwrap_or(Ordering::Equal)); + + let mut to_ignore_negative_peers = + (target_peers as f32 * ALLOWED_NEGATIVE_GOSSIPSUB_FACTOR).ceil() as usize; + + for (peer_id, info, score) in peers { + let previous_state = info.score_state(); + info.update_gossipsub_score( + score, + if score < 0.0 && to_ignore_negative_peers > 0 { + to_ignore_negative_peers -= 1; + // We ignore the negative score for the best negative peers so that their + // gossipsub score can recover without getting disconnected. + true + } else { + false + }, + ); + + actions.push(( + *peer_id, + Self::handle_score_transition(previous_state, peer_id, info, &self.log), + )); + } + + for (peer_id, action) in actions { + let result = match action { + ScoreTransitionResult::Banned => { + // The peer was banned as a result of this action. + self.update_connection_state(&peer_id, NewConnectionState::Banned) + .into() + } + ScoreTransitionResult::Disconnected => { + // The peer needs to be disconnected + + // Update the state + self.update_connection_state( + &peer_id, + NewConnectionState::Disconnecting { to_ban: false }, + ); + ScoreUpdateResult::Disconnect + } + ScoreTransitionResult::NoAction => ScoreUpdateResult::NoAction, + ScoreTransitionResult::Unbanned => { + self.update_connection_state(&peer_id, NewConnectionState::Unbanned); + let seen_ip_addresses = self + .peers + .get(&peer_id) + .map(|info| { + info.seen_ip_addresses() + .filter(|ip| !self.is_ip_banned(ip)) + .collect::>() + }) + .unwrap_or_default(); + + ScoreUpdateResult::Unbanned(seen_ip_addresses) + } + }; + + // Actions to be handled by the peer manager for each peer id + if !matches!(result, ScoreUpdateResult::NoAction) { + results.push((peer_id, result)); + } + } + results + } + + /// Reports a peer for some action. + /// + /// The action can only cause a negative effect. This can lead to disconnecting or banning a + /// specific peer. Therefore the result of this function returns if the peer needs to be banned + /// or disconnected. + /// + /// If the peer doesn't exist, log a warning and insert defaults. + #[must_use = "Banned and disconnected peers need to be handled in libp2p"] + pub(super) fn report_peer( + &mut self, + peer_id: &PeerId, + action: PeerAction, + source: ReportSource, + ) -> ScoreUpdateResult { + match self.peers.get_mut(peer_id) { + Some(info) => { + let previous_state = info.score_state(); + info.apply_peer_action_to_score(action); + metrics::inc_counter_vec( + &metrics::PEER_ACTION_EVENTS_PER_CLIENT, + &[info.client().kind.as_ref(), action.as_ref(), source.into()], + ); + let result = + Self::handle_score_transition(previous_state, peer_id, info, &self.log); + if previous_state == info.score_state() { + debug!(self.log, "Peer score adjusted"; "peer_id" => %peer_id, "score" => %info.score()); + } + match result { + ScoreTransitionResult::Banned => { + // The peer was banned as a result of this action. + self.update_connection_state(peer_id, NewConnectionState::Banned) + .into() + } + ScoreTransitionResult::Disconnected => { + // The peer needs to be disconnected + + // Update the state + self.update_connection_state( + peer_id, + NewConnectionState::Disconnecting { to_ban: false }, + ); + ScoreUpdateResult::Disconnect + } + ScoreTransitionResult::NoAction => ScoreUpdateResult::NoAction, + ScoreTransitionResult::Unbanned => { + error!(self.log, "Report peer action lead to an unbanning"; "peer_id" => %peer_id); + ScoreUpdateResult::NoAction + } + } + } + None => { + debug!(self.log, "Reporting a peer that doesn't exist"; "peer_id" =>%peer_id); + ScoreUpdateResult::NoAction + } + } + } + + // Connection Status + + /// A peer is being dialed. + // VISIBILITY: Only the peer manager can adjust the connection state + pub(super) fn dialing_peer(&mut self, peer_id: &PeerId, enr: Option) { + let info = self.peers.entry(*peer_id).or_default(); + if let Some(enr) = enr { + info.set_enr(enr); + } + + if let Err(e) = info.dialing_peer() { + error!(self.log, "{}", e; "peer_id" => %peer_id); + } + + // If the peer was banned, remove the banned peer and addresses. + if info.is_banned() { + self.banned_peers_count + .remove_banned_peer(info.seen_ip_addresses()); + } + + // If the peer was disconnected, reduce the disconnected peer count. + if info.is_disconnected() { + self.disconnected_peers = self.disconnected_peers().count().saturating_sub(1); + } + } + + /// Update min ttl of a peer. + // VISIBILITY: Only the peer manager can update the min_ttl + pub(super) fn update_min_ttl(&mut self, peer_id: &PeerId, min_ttl: Instant) { + let info = self.peers.entry(*peer_id).or_default(); + + // only update if the ttl is longer + if info.min_ttl().is_none() || Some(&min_ttl) > info.min_ttl() { + info.set_min_ttl(min_ttl); + + let min_ttl_secs = min_ttl + .checked_duration_since(Instant::now()) + .map(|duration| duration.as_secs()) + .unwrap_or_else(|| 0); + debug!(self.log, "Updating the time a peer is required for"; "peer_id" => %peer_id, "future_min_ttl_secs" => min_ttl_secs); + } + } + + /// Adds a gossipsub subscription to a peer in the peerdb. + // VISIBILITY: The behaviour is able to adjust subscriptions. + pub(crate) fn add_subscription(&mut self, peer_id: &PeerId, subnet: Subnet) { + if let Some(info) = self.peers.get_mut(peer_id) { + info.insert_subnet(subnet); + } + } + + /// Removes a gossipsub subscription to a peer in the peerdb. + // VISIBILITY: The behaviour is able to adjust subscriptions. + pub(crate) fn remove_subscription(&mut self, peer_id: &PeerId, subnet: &Subnet) { + if let Some(info) = self.peers.get_mut(peer_id) { + info.remove_subnet(subnet); + } + } + + /// Extends the ttl of all peers on the given subnet that have a shorter + /// min_ttl than what's given. + // VISIBILITY: The behaviour is able to adjust subscriptions. + pub(crate) fn extend_peers_on_subnet(&mut self, subnet: &Subnet, min_ttl: Instant) { + let log = &self.log; + self.peers.iter_mut() + .filter(move |(_, info)| { + info.is_connected() && info.on_subnet_metadata(subnet) && info.on_subnet_gossipsub(subnet) + }) + .for_each(|(peer_id,info)| { + if info.min_ttl().is_none() || Some(&min_ttl) > info.min_ttl() { + info.set_min_ttl(min_ttl); + } + let min_ttl_secs = min_ttl + .checked_duration_since(Instant::now()) + .map(|duration| duration.as_secs()) + .unwrap_or_else(|| 0); + trace!(log, "Updating minimum duration a peer is required for"; "peer_id" => %peer_id, "min_ttl" => min_ttl_secs); + }); + } + + /// Sets a peer as connected with an ingoing connection. + // VISIBILITY: Only the peer manager can adjust the connection state. + pub(super) fn connect_ingoing( + &mut self, + peer_id: &PeerId, + seen_address: Multiaddr, + enr: Option, + ) { + self.update_connection_state( + peer_id, + NewConnectionState::Connected { + enr, + seen_address, + direction: ConnectionDirection::Incoming, + }, + ); + } + + /// Sets a peer as connected with an outgoing connection. + // VISIBILITY: Only the peer manager can adjust the connection state. + pub(super) fn connect_outgoing( + &mut self, + peer_id: &PeerId, + seen_address: Multiaddr, + enr: Option, + ) { + self.update_connection_state( + peer_id, + NewConnectionState::Connected { + enr, + seen_address, + direction: ConnectionDirection::Outgoing, + }, + ); + } + + /// The connection state of the peer has been changed. Modify the peer in the db to ensure all + /// variables are in sync with libp2p. + /// Updating the state can lead to a `BanOperation` which needs to be processed via the peer + /// manager and should be handled in the peer manager. + // NOTE: This function is vital in keeping the connection state, and thus the peerdb size in + // check and up to date with libp2p. + fn update_connection_state( + &mut self, + peer_id: &PeerId, + new_state: NewConnectionState, + ) -> Option { + let log_ref = &self.log; + let info = self.peers.entry(*peer_id).or_insert_with(|| { + // If we are not creating a new connection (or dropping a current inbound connection) log a warning indicating we are updating a + // connection state for an unknown peer. + if !matches!( + new_state, + NewConnectionState::Connected { .. } | NewConnectionState::Disconnecting { .. } + ) { + warn!(log_ref, "Updating state of unknown peer"; + "peer_id" => %peer_id, "new_state" => ?new_state); + } + PeerInfo::default() + }); + + // Ban the peer if the score is not already low enough. + if matches!(new_state, NewConnectionState::Banned) { + match info.score_state() { + ScoreState::Banned => {} + _ => { + // If score isn't low enough to ban, this function has been called incorrectly. + error!(self.log, "Banning a peer with a good score"; "peer_id" => %peer_id); + info.apply_peer_action_to_score(score::PeerAction::Fatal); + } + } + } + + // Handle all the possible state changes + match (info.connection_status().clone(), new_state) { + /* Handle the transition to a connected state */ + ( + current_state, + NewConnectionState::Connected { + enr, + direction, + seen_address, + }, + ) => { + // Update the ENR if one exists + if let Some(enr) = enr { + info.set_enr(enr); + } + + match current_state { + PeerConnectionStatus::Disconnected { .. } => { + self.disconnected_peers = self.disconnected_peers.saturating_sub(1); + } + PeerConnectionStatus::Banned { .. } => { + error!(self.log, "Accepted a connection from a banned peer"; "peer_id" => %peer_id); + self.banned_peers_count + .remove_banned_peer(info.seen_ip_addresses()); + } + PeerConnectionStatus::Disconnecting { .. } => { + warn!(self.log, "Connected to a disconnecting peer"; "peer_id" => %peer_id) + } + PeerConnectionStatus::Unknown + | PeerConnectionStatus::Connected { .. } + | PeerConnectionStatus::Dialing { .. } => {} + } + + // Add the seen ip address and port to the peer's info + let socket_addr = match seen_address.iter().fold( + (None, None), + |(found_ip, found_port), protocol| match protocol { + Protocol::Ip4(ip) => (Some(ip.into()), found_port), + Protocol::Ip6(ip) => (Some(ip.into()), found_port), + Protocol::Tcp(port) => (found_ip, Some(port)), + _ => (found_ip, found_port), + }, + ) { + (Some(ip), Some(port)) => Some(SocketAddr::new(ip, port)), + (Some(_ip), None) => { + crit!(self.log, "Connected peer has an IP but no TCP port"; "peer_id" => %peer_id); + None + } + _ => None, + }; + + // Update the connection state + match direction { + ConnectionDirection::Incoming => info.connect_ingoing(socket_addr), + ConnectionDirection::Outgoing => info.connect_outgoing(socket_addr), + } + } + + /* Handle the transition to the disconnected state */ + (old_state, NewConnectionState::Disconnected) => { + // Remove all subnets for disconnected peers. + info.clear_subnets(); + + match old_state { + PeerConnectionStatus::Banned { .. } => {} + PeerConnectionStatus::Disconnected { .. } => {} + PeerConnectionStatus::Disconnecting { to_ban } if to_ban => { + // Update the status. + info.set_connection_status(PeerConnectionStatus::Banned { + since: Instant::now(), + }); + self.banned_peers_count + .add_banned_peer(info.seen_ip_addresses()); + let known_banned_ips = self.banned_peers_count.banned_ips(); + let banned_ips = info + .seen_ip_addresses() + .filter(|ip| known_banned_ips.contains(ip)) + .collect::>(); + self.shrink_to_fit(); + return Some(BanOperation::ReadyToBan(banned_ips)); + } + PeerConnectionStatus::Disconnecting { .. } + | PeerConnectionStatus::Unknown + | PeerConnectionStatus::Connected { .. } + | PeerConnectionStatus::Dialing { .. } => { + self.disconnected_peers += 1; + info.set_connection_status(PeerConnectionStatus::Disconnected { + since: Instant::now(), + }); + self.shrink_to_fit(); + } + } + } + + /* Handle the transition to the disconnecting state */ + ( + PeerConnectionStatus::Banned { .. } | PeerConnectionStatus::Disconnected { .. }, + NewConnectionState::Disconnecting { to_ban }, + ) => { + error!(self.log, "Disconnecting from an already disconnected peer"; "peer_id" => %peer_id); + info.set_connection_status(PeerConnectionStatus::Disconnecting { to_ban }); + } + (_, NewConnectionState::Disconnecting { to_ban }) => { + // We overwrite all states and set this peer to be disconnecting. + info.set_connection_status(PeerConnectionStatus::Disconnecting { to_ban }); + } + + /* Handle transitioning to the banned state */ + (PeerConnectionStatus::Disconnected { .. }, NewConnectionState::Banned) => { + // It is possible to ban a peer that is currently disconnected. This can occur when + // there are many events that score it poorly and are processed after it has disconnected. + info.set_connection_status(PeerConnectionStatus::Banned { + since: Instant::now(), + }); + self.banned_peers_count + .add_banned_peer(info.seen_ip_addresses()); + self.disconnected_peers = self.disconnected_peers.saturating_sub(1); + let known_banned_ips = self.banned_peers_count.banned_ips(); + let banned_ips = info + .seen_ip_addresses() + .filter(|ip| known_banned_ips.contains(ip)) + .collect::>(); + self.shrink_to_fit(); + return Some(BanOperation::ReadyToBan(banned_ips)); + } + (PeerConnectionStatus::Disconnecting { .. }, NewConnectionState::Banned) => { + // NOTE: This can occur due a rapid downscore of a peer. It goes through the + // disconnection phase and straight into banning in a short time-frame. + debug!(log_ref, "Banning peer that is currently disconnecting"; "peer_id" => %peer_id); + // Ban the peer once the disconnection process completes. + info.set_connection_status(PeerConnectionStatus::Disconnecting { to_ban: true }); + return Some(BanOperation::PeerDisconnecting); + } + (PeerConnectionStatus::Banned { .. }, NewConnectionState::Banned) => { + error!(log_ref, "Banning already banned peer"; "peer_id" => %peer_id); + let known_banned_ips = self.banned_peers_count.banned_ips(); + let banned_ips = info + .seen_ip_addresses() + .filter(|ip| known_banned_ips.contains(ip)) + .collect::>(); + return Some(BanOperation::ReadyToBan(banned_ips)); + } + ( + PeerConnectionStatus::Connected { .. } | PeerConnectionStatus::Dialing { .. }, + NewConnectionState::Banned, + ) => { + // update the state + info.set_connection_status(PeerConnectionStatus::Disconnecting { to_ban: true }); + return Some(BanOperation::DisconnectThePeer); + } + (PeerConnectionStatus::Unknown, NewConnectionState::Banned) => { + // shift the peer straight to banned + warn!(log_ref, "Banning a peer of unknown connection state"; "peer_id" => %peer_id); + self.banned_peers_count + .add_banned_peer(info.seen_ip_addresses()); + info.set_connection_status(PeerConnectionStatus::Banned { + since: Instant::now(), + }); + let known_banned_ips = self.banned_peers_count.banned_ips(); + let banned_ips = info + .seen_ip_addresses() + .filter(|ip| known_banned_ips.contains(ip)) + .collect::>(); + self.shrink_to_fit(); + return Some(BanOperation::ReadyToBan(banned_ips)); + } + + /* Handle the connection state of unbanning a peer */ + (old_state, NewConnectionState::Unbanned) => { + if matches!(info.score_state(), ScoreState::Banned) { + error!(self.log, "Unbanning a banned peer"; "peer_id" => %peer_id); + } + match old_state { + PeerConnectionStatus::Unknown | PeerConnectionStatus::Connected { .. } => { + error!(self.log, "Unbanning a connected peer"; "peer_id" => %peer_id); + } + PeerConnectionStatus::Disconnected { .. } + | PeerConnectionStatus::Disconnecting { .. } => { + debug!(self.log, "Unbanning disconnected or disconnecting peer"; "peer_id" => %peer_id); + } // These are odd but fine. + PeerConnectionStatus::Dialing { .. } => {} // Also odd but acceptable + PeerConnectionStatus::Banned { since } => { + info.set_connection_status(PeerConnectionStatus::Disconnected { since }); + + // Increment the disconnected count and reduce the banned count + self.banned_peers_count + .remove_banned_peer(info.seen_ip_addresses()); + self.disconnected_peers = + self.disconnected_peers().count().saturating_add(1); + self.shrink_to_fit(); + } + } + } + } + None + } + + /// Sets the peer as disconnected. A banned peer remains banned. If the node has become banned, + /// this returns true, otherwise this is false. + // VISIBILITY: Only the peer manager can adjust the connection state. + pub(super) fn inject_disconnect(&mut self, peer_id: &PeerId) -> Option { + self.update_connection_state(peer_id, NewConnectionState::Disconnected) + } + + /// The peer manager has notified us that the peer is undergoing a normal disconnect. Optionally tag + /// the peer to be banned after the disconnect. + // VISIBILITY: Only the peer manager can adjust the connection state. + pub(super) fn notify_disconnecting(&mut self, peer_id: &PeerId, to_ban: bool) { + self.update_connection_state(peer_id, NewConnectionState::Disconnecting { to_ban }); + } + + /// Removes banned and disconnected peers from the DB if we have reached any of our limits. + /// Drops the peers with the lowest reputation so that the number of + /// disconnected peers is less than MAX_DC_PEERS + fn shrink_to_fit(&mut self) { + // Remove excess banned peers + while self.banned_peers_count.banned_peers() > MAX_BANNED_PEERS { + if let Some(to_drop) = if let Some((id, info, _)) = self + .peers + .iter() + .filter_map(|(id, info)| match info.connection_status() { + PeerConnectionStatus::Banned { since } => Some((id, info, since)), + _ => None, + }) + .min_by_key(|(_, _, since)| *since) + { + self.banned_peers_count + .remove_banned_peer(info.seen_ip_addresses()); + Some(*id) + } else { + // If there is no minimum, this is a coding error. + crit!( + self.log, + "banned_peers > MAX_BANNED_PEERS despite no banned peers in db!" + ); + // reset banned_peers this will also exit the loop + self.banned_peers_count = BannedPeersCount::new(); + None + } { + debug!(self.log, "Removing old banned peer"; "peer_id" => %to_drop); + self.peers.remove(&to_drop); + } + } + + // Remove excess disconnected peers + while self.disconnected_peers > MAX_DC_PEERS { + if let Some(to_drop) = self + .peers + .iter() + .filter(|(_, info)| info.is_disconnected()) + .filter_map(|(id, info)| match info.connection_status() { + PeerConnectionStatus::Disconnected { since } => Some((id, since)), + _ => None, + }) + .min_by_key(|(_, age)| *age) + .map(|(id, _)| *id) + { + debug!(self.log, "Removing old disconnected peer"; "peer_id" => %to_drop, "disconnected_size" => self.disconnected_peers.saturating_sub(1)); + self.peers.remove(&to_drop); + } + // If there is no minimum, this is a coding error. For safety we decrease + // the count to avoid a potential infinite loop. + self.disconnected_peers = self.disconnected_peers.saturating_sub(1); + } + } + + /// This handles score transitions between states. It transitions peers states from + /// disconnected/banned/connected. + fn handle_score_transition( + previous_state: ScoreState, + peer_id: &PeerId, + info: &PeerInfo, + log: &slog::Logger, + ) -> ScoreTransitionResult { + match (info.score_state(), previous_state) { + (ScoreState::Banned, ScoreState::Healthy | ScoreState::Disconnected) => { + debug!(log, "Peer has been banned"; "peer_id" => %peer_id, "score" => %info.score()); + ScoreTransitionResult::Banned + } + (ScoreState::Disconnected, ScoreState::Banned | ScoreState::Healthy) => { + debug!(log, "Peer transitioned to disconnect state"; "peer_id" => %peer_id, "score" => %info.score(), "past_state" => %previous_state); + // disconnect the peer if it's currently connected or dialing + if info.is_connected_or_dialing() { + ScoreTransitionResult::Disconnected + } else if previous_state == ScoreState::Banned { + ScoreTransitionResult::Unbanned + } else { + // The peer was healthy, but is already disconnected, so there is no action to + // take. + ScoreTransitionResult::NoAction + } + } + (ScoreState::Healthy, ScoreState::Disconnected) => { + debug!(log, "Peer transitioned to healthy state"; "peer_id" => %peer_id, "score" => %info.score(), "past_state" => %previous_state); + ScoreTransitionResult::NoAction + } + (ScoreState::Healthy, ScoreState::Banned) => { + debug!(log, "Peer transitioned to healthy state"; "peer_id" => %peer_id, "score" => %info.score(), "past_state" => %previous_state); + // unban the peer if it was previously banned. + ScoreTransitionResult::Unbanned + } + // Explicitly ignore states that haven't transitioned. + (ScoreState::Healthy, ScoreState::Healthy) => ScoreTransitionResult::NoAction, + (ScoreState::Disconnected, ScoreState::Disconnected) => ScoreTransitionResult::NoAction, + + (ScoreState::Banned, ScoreState::Banned) => ScoreTransitionResult::NoAction, + } + } +} + +/// Internal enum for managing connection state transitions. +#[derive(Debug)] +enum NewConnectionState { + /// A peer has connected to us. + Connected { + /// An optional known ENR if the peer was dialed. + enr: Option, + /// The seen socket address associated with the connection. + seen_address: Multiaddr, + /// The direction, incoming/outgoing. + direction: ConnectionDirection, + }, + /// The peer is in the process of being disconnected. + Disconnecting { + /// Whether the peer should be banned after the disconnect occurs. + to_ban: bool, + }, + /// The peer has been disconnected from our local node. + Disconnected, + /// The peer has been banned and actions to shift the peer to the banned state should be + /// undertaken + Banned, + /// The peer has been unbanned and the connection state should be updated to reflect this. + Unbanned, +} + +/// The result of applying a score transition to a peer. +enum ScoreTransitionResult { + /// The peer has become disconnected. + Disconnected, + /// The peer has been banned. + Banned, + /// The peer has been unbanned. + Unbanned, + /// No state change occurred. + NoAction, +} + +/// The type of results that can happen from executing the `report_peer` function. +pub enum ScoreUpdateResult { + /// The reported peer must be banned. + Ban(BanOperation), + /// The reported peer transitioned to the disconnected state and must be disconnected. + Disconnect, + /// The peer has been unbanned and this needs to be propagated to libp2p. The list of unbanned + /// IP addresses are sent along with it. + Unbanned(Vec), + /// The report requires no further action. + NoAction, +} + +impl From> for ScoreUpdateResult { + fn from(ban_operation: Option) -> Self { + match ban_operation { + None => ScoreUpdateResult::NoAction, + Some(bo) => ScoreUpdateResult::Ban(bo), + } + } +} + /// When attempting to ban a peer provides the peer manager with the operation that must be taken. pub enum BanOperation { // The peer is currently connected. Perform a graceful disconnect before banning at the swarm // level. DisconnectThePeer, - // The peer is disconnected, it has now been banned and can be banned at the swarm level. - ReadyToBan, + // The peer is disconnected, it has now been banned and can be banned at the swarm level. It + // stores a collection of banned IP addresses to inform the swarm. + ReadyToBan(Vec), // The peer is currently being disconnected, nothing to do. PeerDisconnecting, } @@ -93,6 +1117,14 @@ impl BannedPeersCount { self.banned_peers } + pub fn banned_ips(&self) -> HashSet { + self.banned_peers_per_ip + .iter() + .filter(|(_ip, count)| **count > BANNED_PEERS_PER_IP_THRESHOLD) + .map(|(ip, _count)| *ip) + .collect() + } + /// An IP is considered banned if more than BANNED_PEERS_PER_IP_THRESHOLD banned peers /// exist with this IP pub fn ip_is_banned(&self, ip: &IpAddr) -> bool { @@ -109,581 +1141,6 @@ impl BannedPeersCount { } } -impl PeerDB { - pub fn new(trusted_peers: Vec, log: &slog::Logger) -> Self { - // Initialize the peers hashmap with trusted peers - let peers = trusted_peers - .into_iter() - .map(|peer_id| (peer_id, PeerInfo::trusted_peer_info())) - .collect(); - Self { - log: log.clone(), - disconnected_peers: 0, - banned_peers_count: BannedPeersCount::new(), - peers, - } - } - - /* Getters */ - - /// Gives the score of a peer, or default score if it is unknown. - pub fn score(&self, peer_id: &PeerId) -> f64 { - self.peers - .get(peer_id) - .map_or(&Score::default(), |info| info.score()) - .score() - } - - /// Returns an iterator over all peers in the db. - pub fn peers(&self) -> impl Iterator)> { - self.peers.iter() - } - - /// Returns an iterator over all peers in the db. - pub(super) fn peers_mut(&mut self) -> impl Iterator)> { - self.peers.iter_mut() - } - - /// Gives the ids of all known peers. - pub fn peer_ids(&self) -> impl Iterator { - self.peers.keys() - } - - /// Returns a peer's info, if known. - pub fn peer_info(&self, peer_id: &PeerId) -> Option<&PeerInfo> { - self.peers.get(peer_id) - } - - /// Returns a mutable reference to a peer's info if known. - pub fn peer_info_mut(&mut self, peer_id: &PeerId) -> Option<&mut PeerInfo> { - self.peers.get_mut(peer_id) - } - - /// Returns if the peer is already connected. - pub fn is_connected(&self, peer_id: &PeerId) -> bool { - matches!( - self.connection_status(peer_id), - Some(PeerConnectionStatus::Connected { .. }) - ) - } - - /// If we are connected or currently dialing the peer returns true. - pub fn is_connected_or_dialing(&self, peer_id: &PeerId) -> bool { - matches!( - self.connection_status(peer_id), - Some(PeerConnectionStatus::Connected { .. }) - | Some(PeerConnectionStatus::Dialing { .. }) - ) - } - - /// If we are connected or in the process of disconnecting - pub fn is_connected_or_disconnecting(&self, peer_id: &PeerId) -> bool { - matches!( - self.connection_status(peer_id), - Some(PeerConnectionStatus::Connected { .. }) - | Some(PeerConnectionStatus::Disconnecting { .. }) - ) - } - - /// Returns true if the peer should be dialed. This checks the connection state and the - /// score state and determines if the peer manager should dial this peer. - pub fn should_dial(&self, peer_id: &PeerId) -> bool { - matches!( - self.connection_status(peer_id), - Some(PeerConnectionStatus::Disconnected { .. }) - | Some(PeerConnectionStatus::Unknown { .. }) - | None - ) && !self.is_banned_or_disconnected(peer_id) - } - - /// Returns true if the peer is synced at least to our current head. - pub fn is_synced(&self, peer_id: &PeerId) -> bool { - match self.peers.get(peer_id).map(|info| &info.sync_status) { - Some(PeerSyncStatus::Synced { .. }) => true, - Some(_) => false, - None => false, - } - } - - /// Returns the current [`BanResult`] of the peer. This doesn't check the connection state, rather the - /// underlying score of the peer. A peer may be banned but still in the connected state - /// temporarily. - /// - /// This is used to determine if we should accept incoming connections or not. - pub fn ban_status(&self, peer_id: &PeerId) -> BanResult { - if let Some(peer) = self.peers.get(peer_id) { - match peer.score_state() { - ScoreState::Banned => BanResult::BadScore, - _ => { - if let Some(ip) = self.ip_is_banned(peer) { - BanResult::BannedIp(ip) - } else { - BanResult::NotBanned - } - } - } - } else { - BanResult::NotBanned - } - } - - /// Checks if the peer's known addresses are currently banned. - fn ip_is_banned(&self, peer: &PeerInfo) -> Option { - peer.seen_addresses() - .find(|ip| self.banned_peers_count.ip_is_banned(ip)) - } - - /// Returns true if the IP is banned. - pub fn is_ip_banned(&self, ip: &IpAddr) -> bool { - self.banned_peers_count.ip_is_banned(ip) - } - - /// Returns true if the Peer is either banned or in the disconnected state. - fn is_banned_or_disconnected(&self, peer_id: &PeerId) -> bool { - if let Some(peer) = self.peers.get(peer_id) { - match peer.score_state() { - ScoreState::Banned | ScoreState::Disconnected => true, - _ => self.ip_is_banned(peer).is_some(), - } - } else { - false - } - } - - /// Gives the ids and info of all known connected peers. - pub fn connected_peers(&self) -> impl Iterator)> { - self.peers.iter().filter(|(_, info)| info.is_connected()) - } - - /// Gives the ids of all known connected peers. - pub fn connected_peer_ids(&self) -> impl Iterator { - self.peers - .iter() - .filter(|(_, info)| info.is_connected()) - .map(|(peer_id, _)| peer_id) - } - - /// Connected or dialing peers - pub fn connected_or_dialing_peers(&self) -> impl Iterator { - self.peers - .iter() - .filter(|(_, info)| info.is_connected() || info.is_dialing()) - .map(|(peer_id, _)| peer_id) - } - - /// Connected outbound-only peers - pub fn connected_outbound_only_peers(&self) -> impl Iterator { - self.peers - .iter() - .filter(|(_, info)| info.is_outbound_only()) - .map(|(peer_id, _)| peer_id) - } - - /// Gives the `peer_id` of all known connected and synced peers. - pub fn synced_peers(&self) -> impl Iterator { - self.peers - .iter() - .filter(|(_, info)| { - if info.sync_status.is_synced() || info.sync_status.is_advanced() { - return info.is_connected(); - } - false - }) - .map(|(peer_id, _)| peer_id) - } - - /// Gives the `peer_id` of all known connected and advanced peers. - pub fn advanced_peers(&self) -> impl Iterator { - self.peers - .iter() - .filter(|(_, info)| { - if info.sync_status.is_advanced() { - return info.is_connected(); - } - false - }) - .map(|(peer_id, _)| peer_id) - } - - /// Gives an iterator of all peers on a given subnet. - pub fn good_peers_on_subnet(&self, subnet: Subnet) -> impl Iterator { - self.peers - .iter() - .filter(move |(_, info)| { - // We check both the metadata and gossipsub data as we only want to count long-lived subscribed peers - info.is_connected() - && info.on_subnet_metadata(&subnet) - && info.on_subnet_gossipsub(&subnet) - && info.is_good_gossipsub_peer() - }) - .map(|(peer_id, _)| peer_id) - } - - /// Gives the ids of all known disconnected peers. - pub fn disconnected_peers(&self) -> impl Iterator { - self.peers - .iter() - .filter(|(_, info)| info.is_disconnected()) - .map(|(peer_id, _)| peer_id) - } - - /// Gives the ids of all known banned peers. - pub fn banned_peers(&self) -> impl Iterator { - self.peers - .iter() - .filter(|(_, info)| info.is_banned()) - .map(|(peer_id, _)| peer_id) - } - - /// Returns a vector of all connected peers sorted by score beginning with the worst scores. - /// Ties get broken randomly. - pub fn worst_connected_peers(&self) -> Vec<(&PeerId, &PeerInfo)> { - let mut connected = self - .peers - .iter() - .filter(|(_, info)| info.is_connected()) - .collect::>(); - - connected.shuffle(&mut rand::thread_rng()); - connected.sort_by_key(|(_, info)| info.score()); - connected - } - - /// Returns a vector containing peers (their ids and info), sorted by - /// score from highest to lowest, and filtered using `is_status` - pub fn best_peers_by_status(&self, is_status: F) -> Vec<(&PeerId, &PeerInfo)> - where - F: Fn(&PeerInfo) -> bool, - { - let mut by_status = self - .peers - .iter() - .filter(|(_, info)| is_status(info)) - .collect::>(); - by_status.sort_by_key(|(_, info)| info.score()); - by_status.into_iter().rev().collect() - } - - /// Returns the peer with highest reputation that satisfies `is_status` - pub fn best_by_status(&self, is_status: F) -> Option<&PeerId> - where - F: Fn(&PeerInfo) -> bool, - { - self.peers - .iter() - .filter(|(_, info)| is_status(info)) - .max_by_key(|(_, info)| info.score()) - .map(|(id, _)| id) - } - - /// Returns the peer's connection status. Returns unknown if the peer is not in the DB. - pub fn connection_status(&self, peer_id: &PeerId) -> Option { - self.peer_info(peer_id) - .map(|info| info.connection_status().clone()) - } - - /* Setters */ - - /// A peer is being dialed. - pub fn dialing_peer(&mut self, peer_id: &PeerId, enr: Option) { - let info = self.peers.entry(*peer_id).or_default(); - info.enr = enr; - - if info.is_disconnected() { - self.disconnected_peers = self.disconnected_peers.saturating_sub(1); - } - - if info.is_banned() { - self.banned_peers_count - .remove_banned_peer(info.seen_addresses()); - } - - if let Err(e) = info.dialing_peer() { - error!(self.log, "{}", e; "peer_id" => %peer_id); - } - } - - /// Update min ttl of a peer. - pub fn update_min_ttl(&mut self, peer_id: &PeerId, min_ttl: Instant) { - let info = self.peers.entry(*peer_id).or_default(); - - // only update if the ttl is longer - if info.min_ttl.is_none() || Some(min_ttl) > info.min_ttl { - info.min_ttl = Some(min_ttl); - - let min_ttl_secs = min_ttl - .checked_duration_since(Instant::now()) - .map(|duration| duration.as_secs()) - .unwrap_or_else(|| 0); - debug!(self.log, "Updating the time a peer is required for"; "peer_id" => %peer_id, "future_min_ttl_secs" => min_ttl_secs); - } - } - - /// Extends the ttl of all peers on the given subnet that have a shorter - /// min_ttl than what's given. - pub fn extend_peers_on_subnet(&mut self, subnet: &Subnet, min_ttl: Instant) { - let log = &self.log; - self.peers.iter_mut() - .filter(move |(_, info)| { - info.is_connected() && info.on_subnet_metadata(subnet) && info.on_subnet_gossipsub(subnet) - }) - .for_each(|(peer_id,info)| { - if info.min_ttl.is_none() || Some(min_ttl) > info.min_ttl { - info.min_ttl = Some(min_ttl); - } - let min_ttl_secs = min_ttl - .checked_duration_since(Instant::now()) - .map(|duration| duration.as_secs()) - .unwrap_or_else(|| 0); - trace!(log, "Updating minimum duration a peer is required for"; "peer_id" => %peer_id, "min_ttl" => min_ttl_secs); - }); - } - - fn connect( - &mut self, - peer_id: &PeerId, - multiaddr: Multiaddr, - enr: Option, - direction: ConnectionDirection, - ) { - let info = self.peers.entry(*peer_id).or_default(); - info.enr = enr; - - if info.is_disconnected() { - self.disconnected_peers = self.disconnected_peers.saturating_sub(1); - } - - if info.is_banned() { - error!(self.log, "Accepted a connection from a banned peer"; "peer_id" => %peer_id); - self.banned_peers_count - .remove_banned_peer(info.seen_addresses()); - } - - // Add the seen ip address and port to the peer's info - let socket_addr = match multiaddr.iter().fold( - (None, None), - |(found_ip, found_port), protocol| match protocol { - Protocol::Ip4(ip) => (Some(ip.into()), found_port), - Protocol::Ip6(ip) => (Some(ip.into()), found_port), - Protocol::Tcp(port) => (found_ip, Some(port)), - _ => (found_ip, found_port), - }, - ) { - (Some(ip), Some(port)) => Some(SocketAddr::new(ip, port)), - (Some(_ip), None) => { - crit!(self.log, "Connected peer has an IP but no TCP port"; "peer_id" => %peer_id); - None - } - _ => None, - }; - - match direction { - ConnectionDirection::Incoming => info.connect_ingoing(socket_addr), - ConnectionDirection::Outgoing => info.connect_outgoing(socket_addr), - } - } - /// Sets a peer as connected with an ingoing connection. - pub fn connect_ingoing(&mut self, peer_id: &PeerId, multiaddr: Multiaddr, enr: Option) { - self.connect(peer_id, multiaddr, enr, ConnectionDirection::Incoming) - } - - /// Sets a peer as connected with an outgoing connection. - pub fn connect_outgoing(&mut self, peer_id: &PeerId, multiaddr: Multiaddr, enr: Option) { - self.connect(peer_id, multiaddr, enr, ConnectionDirection::Outgoing) - } - - /// Sets the peer as disconnected. A banned peer remains banned. If the node has become banned, - /// this returns true, otherwise this is false. - pub fn inject_disconnect(&mut self, peer_id: &PeerId) -> bool { - // Note that it could be the case we prevent new nodes from joining. In this instance, - // we don't bother tracking the new node. - if let Some(info) = self.peers.get_mut(peer_id) { - if !matches!( - info.connection_status(), - PeerConnectionStatus::Disconnected { .. } | PeerConnectionStatus::Banned { .. } - ) { - self.disconnected_peers += 1; - } - let result = info.notify_disconnect().unwrap_or(false); - self.shrink_to_fit(); - result - } else { - false - } - } - - /// Notifies the peer manager that the peer is undergoing a normal disconnect. Optionally tag - /// the peer to be banned after the disconnect. - pub fn notify_disconnecting(&mut self, peer_id: PeerId, to_ban_afterwards: bool) { - let peer_info = self.peers.entry(peer_id).or_default(); - - if matches!( - peer_info.connection_status(), - PeerConnectionStatus::Disconnected { .. } - ) { - self.disconnected_peers = self.disconnected_peers.saturating_sub(1); - } - peer_info.disconnecting(to_ban_afterwards); - } - - /// Marks a peer to be disconnected and then banned. - /// Returns true if the peer is currently connected and false otherwise. - // NOTE: If the peer's score is not already low enough to be banned, this will decrease the - // peer's score to be a banned state. - pub fn disconnect_and_ban(&mut self, peer_id: &PeerId) -> BanOperation { - let log_ref = &self.log; - let info = self.peers.entry(*peer_id).or_insert_with(|| { - warn!(log_ref, "Banning unknown peer"; - "peer_id" => %peer_id); - PeerInfo::default() - }); - - // Ban the peer if the score is not already low enough. - match info.score_state() { - ScoreState::Banned => {} - _ => { - // If score isn't low enough to ban, this function has been called incorrectly. - error!(self.log, "Banning a peer with a good score"; "peer_id" => %peer_id); - info.apply_peer_action_to_score(super::score::PeerAction::Fatal); - } - } - - // Check and verify all the connection states - match info.connection_status() { - PeerConnectionStatus::Disconnected { .. } => { - // It is possible to ban a peer that has a disconnected score, if there are many - // events that score it poorly and are processed after it has disconnected. - self.disconnected_peers = self.disconnected_peers.saturating_sub(1); - info.update_state(); - self.banned_peers_count - .add_banned_peer(info.seen_addresses()); - self.shrink_to_fit(); - BanOperation::ReadyToBan - } - PeerConnectionStatus::Disconnecting { .. } => { - // NOTE: This can occur due a rapid downscore of a peer. It goes through the - // disconnection phase and straight into banning in a short time-frame. - debug!(log_ref, "Banning peer that is currently disconnecting"; "peer_id" => %peer_id); - info.disconnecting(true); - BanOperation::PeerDisconnecting - } - PeerConnectionStatus::Banned { .. } => { - error!(log_ref, "Banning already banned peer"; "peer_id" => %peer_id); - BanOperation::ReadyToBan - } - PeerConnectionStatus::Connected { .. } | PeerConnectionStatus::Dialing { .. } => { - // update the state - info.disconnecting(true); - BanOperation::DisconnectThePeer - } - PeerConnectionStatus::Unknown => { - // shift the peer straight to banned - warn!(log_ref, "Banning a peer of unknown connection state"; "peer_id" => %peer_id); - self.banned_peers_count - .add_banned_peer(info.seen_addresses()); - info.update_state(); - self.shrink_to_fit(); - BanOperation::ReadyToBan - } - } - } - - /// Unbans a peer. - /// - /// This should only be called once a peer's score is no longer banned. - /// If this is called for a banned peer, it will error. - /// - /// This updates the connection state of the peer and updates the number of banned peers in the - /// peerdb. - pub fn unban(&mut self, peer_id: &PeerId) -> Result<(), &'static str> { - let log_ref = &self.log; - let info = self.peers.entry(*peer_id).or_insert_with(|| { - warn!(log_ref, "UnBanning unknown peer"; - "peer_id" => %peer_id); - PeerInfo::default() - }); - - if let ScoreState::Banned = info.score_state() { - return Err("Attempted to unban (connection status) a banned peer"); - } - - self.banned_peers_count - .remove_banned_peer(info.seen_addresses()); - - // Update the connection state - info.update_state(); - - // This transitions a banned peer to a disconnected peer - self.disconnected_peers = self.disconnected_peers().count().saturating_add(1); - self.shrink_to_fit(); - Ok(()) - } - - /// Removes banned and disconnected peers from the DB if we have reached any of our limits. - /// Drops the peers with the lowest reputation so that the number of - /// disconnected peers is less than MAX_DC_PEERS - pub fn shrink_to_fit(&mut self) { - // Remove excess banned peers - while self.banned_peers_count.banned_peers() > MAX_BANNED_PEERS { - if let Some(to_drop) = if let Some((id, info, _)) = self - .peers - .iter() - .filter_map(|(id, info)| match info.connection_status() { - PeerConnectionStatus::Banned { since } => Some((id, info, since)), - _ => None, - }) - .min_by_key(|(_, _, since)| *since) - { - self.banned_peers_count - .remove_banned_peer(info.seen_addresses()); - Some(*id) - } else { - // If there is no minimum, this is a coding error. - crit!( - self.log, - "banned_peers > MAX_BANNED_PEERS despite no banned peers in db!" - ); - // reset banned_peers this will also exit the loop - self.banned_peers_count = BannedPeersCount::new(); - None - } { - debug!(self.log, "Removing old banned peer"; "peer_id" => %to_drop); - self.peers.remove(&to_drop); - } - } - - // Remove excess disconnected peers - while self.disconnected_peers > MAX_DC_PEERS { - if let Some(to_drop) = self - .peers - .iter() - .filter(|(_, info)| info.is_disconnected()) - .filter_map(|(id, info)| match info.connection_status() { - PeerConnectionStatus::Disconnected { since } => Some((id, since)), - _ => None, - }) - .min_by_key(|(_, since)| *since) - .map(|(id, _)| *id) - { - debug!(self.log, "Removing old disconnected peer"; "peer_id" => %to_drop); - self.peers.remove(&to_drop); - } - // If there is no minimum, this is a coding error. For safety we decrease - // the count to avoid a potential infinite loop. - self.disconnected_peers = self.disconnected_peers.saturating_sub(1); - } - } - - /// Add the meta data of a peer. - pub fn add_metadata(&mut self, peer_id: &PeerId, meta_data: MetaData) { - if let Some(peer_info) = self.peers.get_mut(peer_id) { - peer_info.meta_data = Some(meta_data); - } else { - warn!(self.log, "Tried to add meta data for a non-existent peer"; "peer_id" => %peer_id); - } - } -} - #[cfg(test)] mod tests { use super::*; @@ -769,6 +1226,88 @@ mod tests { assert_eq!(pdb.connected_outbound_only_peers().count(), 1); } + #[test] + fn test_disconnected_removed_in_correct_order() { + let mut pdb = get_db(); + + use std::collections::BTreeMap; + let mut peer_list = BTreeMap::new(); + for id in 0..MAX_DC_PEERS + 1 { + let new_peer = PeerId::random(); + pdb.connect_ingoing(&new_peer, "/ip4/0.0.0.0".parse().unwrap(), None); + peer_list.insert(id, new_peer); + } + assert_eq!(pdb.disconnected_peers, 0); + + for (_, p) in peer_list.iter() { + pdb.inject_disconnect(p); + // Allow the timing to update correctly + } + assert_eq!(pdb.disconnected_peers, MAX_DC_PEERS); + assert_eq!(pdb.disconnected_peers, pdb.disconnected_peers().count()); + + // Only the oldest peer should have been removed + for (id, peer_id) in peer_list.iter().rev().take(MAX_DC_PEERS) { + println!("Testing id {}", id); + assert!( + pdb.peer_info(peer_id).is_some(), + "Latest peer should not be pruned" + ); + } + + assert!( + pdb.peer_info(peer_list.iter().next().unwrap().1).is_none(), + "First peer should be removed" + ); + assert_eq!(pdb.disconnected_peers, pdb.disconnected_peers().count()); + } + + #[test] + fn new_connection_should_remain() { + let mut pdb = get_db(); + + use std::collections::BTreeMap; + let mut peer_list = BTreeMap::new(); + for id in 0..MAX_DC_PEERS + 20 { + let new_peer = PeerId::random(); + pdb.connect_ingoing(&new_peer, "/ip4/0.0.0.0".parse().unwrap(), None); + peer_list.insert(id, new_peer); + } + assert_eq!(pdb.disconnected_peers, pdb.disconnected_peers().count()); + for (_, p) in peer_list.iter() { + pdb.inject_disconnect(p); + } + assert_eq!(pdb.disconnected_peers, pdb.disconnected_peers().count()); + println!("{}", pdb.disconnected_peers); + + peer_list.clear(); + for id in 0..MAX_DC_PEERS + 20 { + let new_peer = PeerId::random(); + pdb.connect_ingoing(&new_peer, "/ip4/0.0.0.0".parse().unwrap(), None); + peer_list.insert(id, new_peer); + } + + let new_peer = PeerId::random(); + // New peer gets its min_ttl updated because it exists on a subnet + let min_ttl = Instant::now() + std::time::Duration::from_secs(12); + + pdb.update_min_ttl(&new_peer, min_ttl); + // Peer then gets dialed + pdb.dialing_peer(&new_peer, None); + assert_eq!(pdb.disconnected_peers, pdb.disconnected_peers().count()); + // Dialing fails, remove the peer + pdb.inject_disconnect(&new_peer); + assert_eq!(pdb.disconnected_peers, pdb.disconnected_peers().count()); + + assert!( + pdb.peer_info(&new_peer).is_some(), + "Peer should exist as disconnected" + ); + + assert_eq!(pdb.disconnected_peers, pdb.disconnected_peers().count()); + println!("{}", pdb.disconnected_peers); + } + #[test] fn test_disconnected_are_bounded() { let mut pdb = get_db(); @@ -782,6 +1321,7 @@ mod tests { for p in pdb.connected_peer_ids().cloned().collect::>() { pdb.inject_disconnect(&p); } + assert_eq!(pdb.disconnected_peers, pdb.disconnected_peers().count()); assert_eq!(pdb.disconnected_peers, MAX_DC_PEERS); } @@ -797,9 +1337,8 @@ mod tests { assert_eq!(pdb.banned_peers_count.banned_peers(), 0); for p in pdb.connected_peer_ids().cloned().collect::>() { - pdb.disconnect_and_ban(&p); + let _ = pdb.report_peer(&p, PeerAction::Fatal, ReportSource::PeerManager); pdb.inject_disconnect(&p); - pdb.disconnect_and_ban(&p); } assert_eq!(pdb.banned_peers_count.banned_peers(), MAX_BANNED_PEERS); @@ -867,9 +1406,9 @@ mod tests { pdb.inject_disconnect(&random_peer); assert_eq!(pdb.disconnected_peers, pdb.disconnected_peers().count()); - pdb.disconnect_and_ban(&random_peer); + let _ = pdb.report_peer(&random_peer, PeerAction::Fatal, ReportSource::PeerManager); pdb.inject_disconnect(&random_peer); - pdb.disconnect_and_ban(&random_peer); + let _ = pdb.report_peer(&random_peer, PeerAction::Fatal, ReportSource::PeerManager); assert_eq!(pdb.disconnected_peers, pdb.disconnected_peers().count()); pdb.inject_disconnect(&random_peer); assert_eq!(pdb.disconnected_peers, pdb.disconnected_peers().count()); @@ -906,7 +1445,7 @@ mod tests { assert_eq!(pdb.disconnected_peers, pdb.disconnected_peers().count()); assert_eq!( pdb.banned_peers_count.banned_peers(), - pdb.banned_peers().count() + pdb.banned_peers_by_score().count() ); // Should be no disconnected peers @@ -922,7 +1461,7 @@ mod tests { pdb.disconnected_peers, pdb.banned_peers_count.banned_peers ); // Disconnect and ban peer 2 - pdb.disconnect_and_ban(&random_peer2); + let _ = pdb.report_peer(&random_peer2, PeerAction::Fatal, ReportSource::PeerManager); // Should be 1 disconnected peer and one peer in the process of being disconnected println!( "3:{},{}", @@ -936,7 +1475,7 @@ mod tests { pdb.disconnected_peers, pdb.banned_peers_count.banned_peers ); // Now that the peer is disconnected, register the ban. - pdb.disconnect_and_ban(&random_peer2); + let _ = pdb.report_peer(&random_peer2, PeerAction::Fatal, ReportSource::PeerManager); // There should be 1 disconnected peer and one banned peer. println!( "5:{},{}", @@ -950,7 +1489,7 @@ mod tests { pdb.banned_peers().count() ); // Now ban peer 1. - pdb.disconnect_and_ban(&random_peer1); + let _ = pdb.report_peer(&random_peer1, PeerAction::Fatal, ReportSource::PeerManager); // There should be no disconnected peers and 2 banned peers println!( "6:{},{}", @@ -964,7 +1503,7 @@ mod tests { pdb.disconnected_peers, pdb.banned_peers_count.banned_peers ); // Same thing here. - pdb.disconnect_and_ban(&random_peer1); + let _ = pdb.report_peer(&random_peer1, PeerAction::Fatal, ReportSource::PeerManager); println!( "8:{},{}", pdb.disconnected_peers, pdb.banned_peers_count.banned_peers @@ -1000,9 +1539,8 @@ mod tests { ); // Ban peer 3 - pdb.disconnect_and_ban(&random_peer3); + let _ = pdb.report_peer(&random_peer3, PeerAction::Fatal, ReportSource::PeerManager); pdb.inject_disconnect(&random_peer3); - pdb.disconnect_and_ban(&random_peer3); // This should add a new banned peer, there should be 0 disconnected and 2 banned // peers (peer1 and peer3) @@ -1018,9 +1556,8 @@ mod tests { ); // Ban peer 3 - pdb.disconnect_and_ban(&random_peer3); + let _ = pdb.report_peer(&random_peer3, PeerAction::Fatal, ReportSource::PeerManager); pdb.inject_disconnect(&random_peer3); - pdb.disconnect_and_ban(&random_peer3); // Should still have 2 banned peers println!( @@ -1049,9 +1586,8 @@ mod tests { ); // Ban peer 3 - pdb.disconnect_and_ban(&random_peer3); + let _ = pdb.report_peer(&random_peer3, PeerAction::Fatal, ReportSource::PeerManager); pdb.inject_disconnect(&random_peer3); - pdb.disconnect_and_ban(&random_peer3); // Should have 1 disconnect (peer 2) and one banned (peer 3) println!( @@ -1101,9 +1637,8 @@ mod tests { ); // Ban peer 0 - pdb.disconnect_and_ban(&random_peer); + let _ = pdb.report_peer(&random_peer, PeerAction::Fatal, ReportSource::PeerManager); pdb.inject_disconnect(&random_peer); - pdb.disconnect_and_ban(&random_peer); // Should have 1 disconnect ( peer 2) and two banned (peer0, peer 3) println!( @@ -1154,9 +1689,8 @@ mod tests { let p5 = connect_peer_with_ips(&mut pdb, vec![ip5]); for p in &peers[..BANNED_PEERS_PER_IP_THRESHOLD + 1] { - pdb.disconnect_and_ban(p); + let _ = pdb.report_peer(p, PeerAction::Fatal, ReportSource::PeerManager); pdb.inject_disconnect(p); - pdb.disconnect_and_ban(p); } //check that ip1 and ip2 are banned but ip3-5 not @@ -1167,9 +1701,12 @@ mod tests { assert!(!pdb.ban_status(&p5).is_banned()); //ban also the last peer in peers - pdb.disconnect_and_ban(&peers[BANNED_PEERS_PER_IP_THRESHOLD + 1]); + let _ = pdb.report_peer( + &peers[BANNED_PEERS_PER_IP_THRESHOLD + 1], + PeerAction::Fatal, + ReportSource::PeerManager, + ); pdb.inject_disconnect(&peers[BANNED_PEERS_PER_IP_THRESHOLD + 1]); - pdb.disconnect_and_ban(&peers[BANNED_PEERS_PER_IP_THRESHOLD + 1]); //check that ip1-ip4 are banned but ip5 not assert!(pdb.ban_status(&p1).is_banned()); @@ -1180,7 +1717,7 @@ mod tests { //peers[0] gets unbanned reset_score(&mut pdb, &peers[0]); - pdb.unban(&peers[0]).unwrap(); + pdb.update_connection_state(&peers[0], NewConnectionState::Unbanned); //nothing changed assert!(pdb.ban_status(&p1).is_banned()); @@ -1191,7 +1728,7 @@ mod tests { //peers[1] gets unbanned reset_score(&mut pdb, &peers[1]); - pdb.unban(&peers[1]).unwrap(); + pdb.update_connection_state(&peers[1], NewConnectionState::Unbanned); //all ips are unbanned assert!(!pdb.ban_status(&p1).is_banned()); @@ -1218,9 +1755,8 @@ mod tests { // ban all peers for p in &peers { - pdb.disconnect_and_ban(p); + let _ = pdb.report_peer(p, PeerAction::Fatal, ReportSource::PeerManager); pdb.inject_disconnect(p); - pdb.disconnect_and_ban(p); } // check ip is banned @@ -1229,20 +1765,25 @@ mod tests { // unban a peer reset_score(&mut pdb, &peers[0]); - pdb.unban(&peers[0]).unwrap(); + pdb.update_connection_state(&peers[0], NewConnectionState::Unbanned); // check not banned anymore assert!(!pdb.ban_status(&p1).is_banned()); assert!(!pdb.ban_status(&p2).is_banned()); + // unban all peers + for p in &peers { + reset_score(&mut pdb, p); + pdb.update_connection_state(p, NewConnectionState::Unbanned); + } + // add ip2 to all peers and ban them. let mut socker_addr = Multiaddr::from(ip2); socker_addr.push(Protocol::Tcp(8080)); for p in &peers { pdb.connect_ingoing(p, socker_addr.clone(), None); - pdb.disconnect_and_ban(p); + let _ = pdb.report_peer(p, PeerAction::Fatal, ReportSource::PeerManager); pdb.inject_disconnect(p); - pdb.disconnect_and_ban(p); } // both IP's are now banned @@ -1252,24 +1793,22 @@ mod tests { // unban all peers for p in &peers { reset_score(&mut pdb, p); - pdb.unban(p).unwrap(); + pdb.update_connection_state(p, NewConnectionState::Unbanned); } // reban every peer except one for p in &peers[1..] { - pdb.disconnect_and_ban(p); + let _ = pdb.report_peer(p, PeerAction::Fatal, ReportSource::PeerManager); pdb.inject_disconnect(p); - pdb.disconnect_and_ban(p); } // nothing is banned assert!(!pdb.ban_status(&p1).is_banned()); assert!(!pdb.ban_status(&p2).is_banned()); - //reban last peer - pdb.disconnect_and_ban(&peers[0]); + // reban last peer + let _ = pdb.report_peer(&peers[0], PeerAction::Fatal, ReportSource::PeerManager); pdb.inject_disconnect(&peers[0]); - pdb.disconnect_and_ban(&peers[0]); //Ip's are banned again assert!(pdb.ban_status(&p1).is_banned()); @@ -1286,7 +1825,7 @@ mod tests { pdb.connect_ingoing(&trusted_peer, "/ip4/0.0.0.0".parse().unwrap(), None); // Check trusted status and score - assert!(pdb.peer_info(&trusted_peer).unwrap().is_trusted); + assert!(pdb.peer_info(&trusted_peer).unwrap().is_trusted()); assert_eq!( pdb.peer_info(&trusted_peer).unwrap().score().score(), Score::max_score().score() diff --git a/beacon_node/eth2_libp2p/src/peer_manager/client.rs b/beacon_node/eth2_libp2p/src/peer_manager/peerdb/client.rs similarity index 100% rename from beacon_node/eth2_libp2p/src/peer_manager/client.rs rename to beacon_node/eth2_libp2p/src/peer_manager/peerdb/client.rs diff --git a/beacon_node/eth2_libp2p/src/peer_manager/peer_info.rs b/beacon_node/eth2_libp2p/src/peer_manager/peerdb/peer_info.rs similarity index 70% rename from beacon_node/eth2_libp2p/src/peer_manager/peer_info.rs rename to beacon_node/eth2_libp2p/src/peer_manager/peerdb/peer_info.rs index 8d3912041..82aaefc63 100644 --- a/beacon_node/eth2_libp2p/src/peer_manager/peer_info.rs +++ b/beacon_node/eth2_libp2p/src/peer_manager/peerdb/peer_info.rs @@ -1,6 +1,6 @@ use super::client::Client; use super::score::{PeerAction, Score, ScoreState}; -use super::PeerSyncStatus; +use super::sync_status::SyncStatus; use crate::Multiaddr; use crate::{rpc::MetaData, types::Subnet}; use discv5::Enr; @@ -24,34 +24,34 @@ pub struct PeerInfo { /// The peers reputation score: Score, /// Client managing this peer - pub client: Client, + client: Client, /// Connection status of this peer connection_status: PeerConnectionStatus, /// The known listening addresses of this peer. This is given by identify and can be arbitrary /// (including local IPs). - pub listening_addresses: Vec, + listening_addresses: Vec, /// This is addresses we have physically seen and this is what we use for banning/un-banning /// peers. - pub seen_addresses: HashSet, + seen_addresses: HashSet, /// The current syncing state of the peer. The state may be determined after it's initial /// connection. - pub sync_status: PeerSyncStatus, + sync_status: SyncStatus, /// The ENR subnet bitfield of the peer. This may be determined after it's initial /// connection. - pub meta_data: Option>, + meta_data: Option>, /// Subnets the peer is connected to. - pub subnets: HashSet, + subnets: HashSet, /// The time we would like to retain this peer. After this time, the peer is no longer /// necessary. #[serde(skip)] - pub min_ttl: Option, + min_ttl: Option, /// Is the peer a trusted peer. - pub is_trusted: bool, + is_trusted: bool, /// Direction of the first connection of the last (or current) connected session with this peer. /// None if this peer was never connected. - pub connection_direction: Option, + connection_direction: Option, /// The enr of the peer, if known. - pub enr: Option, + enr: Option, } impl Default for PeerInfo { @@ -64,7 +64,7 @@ impl Default for PeerInfo { listening_addresses: Vec::new(), seen_addresses: HashSet::new(), subnets: HashSet::new(), - sync_status: PeerSyncStatus::Unknown, + sync_status: SyncStatus::Unknown, meta_data: None, min_ttl: None, is_trusted: false, @@ -101,13 +101,59 @@ impl PeerInfo { false } + /// Obtains the client of the peer. + pub fn client(&self) -> &Client { + &self.client + } + + /// Returns the listening addresses of the Peer. + pub fn listening_addresses(&self) -> &Vec { + &self.listening_addresses + } + + /// Returns the connection direction for the peer. + pub fn connection_direction(&self) -> Option<&ConnectionDirection> { + self.connection_direction.as_ref() + } + + /// Returns the sync status of the peer. + pub fn sync_status(&self) -> &SyncStatus { + &self.sync_status + } + + /// Returns the metadata for the peer if currently known. + pub fn meta_data(&self) -> Option<&MetaData> { + self.meta_data.as_ref() + } + + /// Returns whether the peer is a trusted peer or not. + pub fn is_trusted(&self) -> bool { + self.is_trusted + } + + /// The time a peer is expected to be useful until for an attached validator. If this is set to + /// None, the peer is not required for any upcoming duty. + pub fn min_ttl(&self) -> Option<&Instant> { + self.min_ttl.as_ref() + } + + /// The ENR of the peer if it is known. + pub fn enr(&self) -> Option<&Enr> { + self.enr.as_ref() + } + /// Returns if the peer is subscribed to a given `Subnet` from the gossipsub subscriptions. pub fn on_subnet_gossipsub(&self, subnet: &Subnet) -> bool { self.subnets.contains(subnet) } - /// Returns the seen IP addresses of the peer. - pub fn seen_addresses(&self) -> impl Iterator + '_ { + /// Returns the seen addresses of the peer. + pub fn seen_addresses(&self) -> impl Iterator + '_ { + self.seen_addresses.iter() + } + + /// Returns a list of seen IP addresses for the peer. + pub fn seen_ip_addresses(&self) -> impl Iterator + '_ { self.seen_addresses .iter() .map(|socket_addr| socket_addr.ip()) @@ -133,34 +179,11 @@ impl PeerInfo { self.score.state() } - /// Applies decay rates to a non-trusted peer's score. - pub fn score_update(&mut self) { - if !self.is_trusted { - self.score.update() - } - } - - /// Apply peer action to a non-trusted peer's score. - pub fn apply_peer_action_to_score(&mut self, peer_action: PeerAction) { - if !self.is_trusted { - self.score.apply_peer_action(peer_action) - } - } - - pub(crate) fn update_gossipsub_score(&mut self, new_score: f64, ignore: bool) { - self.score.update_gossipsub_score(new_score, ignore); - } - + /// Returns true if the gossipsub score is sufficient. pub fn is_good_gossipsub_peer(&self) -> bool { self.score.is_good_gossipsub_peer() } - #[cfg(test)] - /// Resets the peers score. - pub fn reset_score(&mut self) { - self.score.test_reset(); - } - /* Peer connection status API */ /// Checks if the status is connected. @@ -181,8 +204,14 @@ impl PeerInfo { self.is_connected() || self.is_dialing() } - /// Checks if the status is banned. + /// Checks if the connection status is banned. This can lag behind the score state + /// temporarily. pub fn is_banned(&self) -> bool { + matches!(self.connection_status, PeerConnectionStatus::Banned { .. }) + } + + /// Checks if the peer's score is banned. + pub fn score_is_banned(&self) -> bool { matches!(self.score.state(), ScoreState::Banned) } @@ -204,53 +233,95 @@ impl PeerInfo { } } - // Setters + /* Mutable Functions */ - /// Modifies the status to Disconnected and sets the last seen instant to now. Returns None if - /// no changes were made. Returns Some(bool) where the bool represents if peer is to now be - /// banned. - pub fn notify_disconnect(&mut self) -> Option { - match self.connection_status { - Banned { .. } | Disconnected { .. } => None, - Disconnecting { to_ban } => { - self.connection_status = Disconnected { - since: Instant::now(), - }; - Some(to_ban) - } - Connected { .. } | Dialing { .. } | Unknown => { - self.connection_status = Disconnected { - since: Instant::now(), - }; - Some(false) - } + /// Updates the sync status. Returns true if the status was changed. + // VISIBILITY: Both the peer manager the network sync is able to update the sync state of a peer + pub fn update_sync_status(&mut self, sync_status: SyncStatus) -> bool { + self.sync_status.update(sync_status) + } + + /// Sets the client of the peer. + // VISIBILITY: The peer manager is able to set the client + pub(in crate::peer_manager) fn set_client(&mut self, client: Client) { + self.client = client + } + + /// Replaces the current listening addresses with those specified, returning the current + /// listening addresses. + // VISIBILITY: The peer manager is able to set the listening addresses + pub(in crate::peer_manager) fn set_listening_addresses( + &mut self, + listening_addresses: Vec, + ) -> Vec { + std::mem::replace(&mut self.listening_addresses, listening_addresses) + } + + /// Sets an explicit value for the meta data. + // VISIBILITY: The peer manager is able to adjust the meta_data + pub(in crate::peer_manager) fn set_meta_data(&mut self, meta_data: MetaData) { + self.meta_data = Some(meta_data) + } + + /// Sets the connection status of the peer. + pub(super) fn set_connection_status(&mut self, connection_status: PeerConnectionStatus) { + self.connection_status = connection_status + } + + /// Sets the ENR of the peer if one is known. + pub(super) fn set_enr(&mut self, enr: Enr) { + self.enr = Some(enr) + } + + /// Sets the time that the peer is expected to be needed until for an attached validator duty. + pub(super) fn set_min_ttl(&mut self, min_ttl: Instant) { + self.min_ttl = Some(min_ttl) + } + + /// Adds a known subnet for the peer. + pub(super) fn insert_subnet(&mut self, subnet: Subnet) { + self.subnets.insert(subnet); + } + + /// Removes a subnet from the peer. + pub(super) fn remove_subnet(&mut self, subnet: &Subnet) { + self.subnets.remove(subnet); + } + + /// Removes all subnets from the peer. + pub(super) fn clear_subnets(&mut self) { + self.subnets.clear() + } + + /// Applies decay rates to a non-trusted peer's score. + pub(super) fn score_update(&mut self) { + if !self.is_trusted { + self.score.update() } } - /// Notify the we are currently disconnecting this peer. Optionally ban the peer after the - /// disconnect. - pub fn disconnecting(&mut self, to_ban: bool) { - self.connection_status = Disconnecting { to_ban } + /// Apply peer action to a non-trusted peer's score. + // VISIBILITY: The peer manager is able to modify the score of a peer. + pub(in crate::peer_manager) fn apply_peer_action_to_score(&mut self, peer_action: PeerAction) { + if !self.is_trusted { + self.score.apply_peer_action(peer_action) + } } - /// Modifies the status to banned or unbanned based on the underlying score. - pub fn update_state(&mut self) { - match (&self.connection_status, self.score.state()) { - (Disconnected { .. } | Unknown, ScoreState::Banned) => { - self.connection_status = Banned { - since: Instant::now(), - } - } - (Banned { since }, ScoreState::Healthy | ScoreState::Disconnected) => { - self.connection_status = Disconnected { since: *since } - } - (_, _) => {} - } + /// Updates the gossipsub score with a new score. Optionally ignore the gossipsub score. + pub(super) fn update_gossipsub_score(&mut self, new_score: f64, ignore: bool) { + self.score.update_gossipsub_score(new_score, ignore); + } + + #[cfg(test)] + /// Resets the peers score. + pub fn reset_score(&mut self) { + self.score.test_reset(); } /// Modifies the status to Dialing /// Returns an error if the current state is unexpected. - pub(crate) fn dialing_peer(&mut self) -> Result<(), &'static str> { + pub(super) fn dialing_peer(&mut self) -> Result<(), &'static str> { match &mut self.connection_status { Connected { .. } => return Err("Dialing connected peer"), Dialing { .. } => return Err("Dialing an already dialing peer"), @@ -265,7 +336,7 @@ impl PeerInfo { /// Modifies the status to Connected and increases the number of ingoing /// connections by one - pub(crate) fn connect_ingoing(&mut self, seen_address: Option) { + pub(super) fn connect_ingoing(&mut self, seen_address: Option) { match &mut self.connection_status { Connected { n_in, .. } => *n_in += 1, Disconnected { .. } @@ -285,7 +356,7 @@ impl PeerInfo { /// Modifies the status to Connected and increases the number of outgoing /// connections by one - pub(crate) fn connect_outgoing(&mut self, seen_address: Option) { + pub(super) fn connect_outgoing(&mut self, seen_address: Option) { match &mut self.connection_status { Connected { n_out, .. } => *n_out += 1, Disconnected { .. } @@ -335,7 +406,9 @@ impl Default for PeerStatus { #[derive(Debug, Clone, Serialize, AsRefStr)] #[strum(serialize_all = "snake_case")] pub enum ConnectionDirection { + /// The connection was established by a peer dialing us. Incoming, + /// The connection was established by us dialing a peer. Outgoing, } diff --git a/beacon_node/eth2_libp2p/src/peer_manager/score.rs b/beacon_node/eth2_libp2p/src/peer_manager/peerdb/score.rs similarity index 100% rename from beacon_node/eth2_libp2p/src/peer_manager/score.rs rename to beacon_node/eth2_libp2p/src/peer_manager/peerdb/score.rs diff --git a/beacon_node/eth2_libp2p/src/peer_manager/peer_sync_status.rs b/beacon_node/eth2_libp2p/src/peer_manager/peerdb/sync_status.rs similarity index 61% rename from beacon_node/eth2_libp2p/src/peer_manager/peer_sync_status.rs rename to beacon_node/eth2_libp2p/src/peer_manager/peerdb/sync_status.rs index 1aab570d3..bab8aa9ae 100644 --- a/beacon_node/eth2_libp2p/src/peer_manager/peer_sync_status.rs +++ b/beacon_node/eth2_libp2p/src/peer_manager/peerdb/sync_status.rs @@ -5,7 +5,7 @@ use types::{Epoch, Hash256, Slot}; #[derive(Clone, Debug, Serialize)] /// The current sync status of the peer. -pub enum PeerSyncStatus { +pub enum SyncStatus { /// At the current state as our node or ahead of us. Synced { info: SyncInfo }, /// The peer has greater knowledge about the canonical chain than we do. @@ -27,46 +27,40 @@ pub struct SyncInfo { pub finalized_root: Hash256, } -impl std::cmp::PartialEq for PeerSyncStatus { +impl std::cmp::PartialEq for SyncStatus { fn eq(&self, other: &Self) -> bool { matches!( (self, other), - (PeerSyncStatus::Synced { .. }, PeerSyncStatus::Synced { .. }) - | ( - PeerSyncStatus::Advanced { .. }, - PeerSyncStatus::Advanced { .. } - ) - | (PeerSyncStatus::Behind { .. }, PeerSyncStatus::Behind { .. }) - | ( - PeerSyncStatus::IrrelevantPeer, - PeerSyncStatus::IrrelevantPeer - ) - | (PeerSyncStatus::Unknown, PeerSyncStatus::Unknown) + (SyncStatus::Synced { .. }, SyncStatus::Synced { .. }) + | (SyncStatus::Advanced { .. }, SyncStatus::Advanced { .. }) + | (SyncStatus::Behind { .. }, SyncStatus::Behind { .. }) + | (SyncStatus::IrrelevantPeer, SyncStatus::IrrelevantPeer) + | (SyncStatus::Unknown, SyncStatus::Unknown) ) } } -impl PeerSyncStatus { +impl SyncStatus { /// Returns true if the peer has advanced knowledge of the chain. pub fn is_advanced(&self) -> bool { - matches!(self, PeerSyncStatus::Advanced { .. }) + matches!(self, SyncStatus::Advanced { .. }) } /// Returns true if the peer is up to date with the current chain. pub fn is_synced(&self) -> bool { - matches!(self, PeerSyncStatus::Synced { .. }) + matches!(self, SyncStatus::Synced { .. }) } /// Returns true if the peer is behind the current chain. pub fn is_behind(&self) -> bool { - matches!(self, PeerSyncStatus::Behind { .. }) + matches!(self, SyncStatus::Behind { .. }) } /// Updates the peer's sync status, returning whether the status transitioned. /// /// E.g. returns `true` if the state changed from `Synced` to `Advanced`, but not if /// the status remained `Synced` with different `SyncInfo` within. - pub fn update(&mut self, new_state: PeerSyncStatus) -> bool { + pub fn update(&mut self, new_state: SyncStatus) -> bool { let changed_status = *self != new_state; *self = new_state; changed_status @@ -74,16 +68,16 @@ impl PeerSyncStatus { pub fn as_str(&self) -> &'static str { match self { - PeerSyncStatus::Advanced { .. } => "Advanced", - PeerSyncStatus::Behind { .. } => "Behind", - PeerSyncStatus::Synced { .. } => "Synced", - PeerSyncStatus::Unknown => "Unknown", - PeerSyncStatus::IrrelevantPeer => "Irrelevant", + SyncStatus::Advanced { .. } => "Advanced", + SyncStatus::Behind { .. } => "Behind", + SyncStatus::Synced { .. } => "Synced", + SyncStatus::Unknown => "Unknown", + SyncStatus::IrrelevantPeer => "Irrelevant", } } } -impl std::fmt::Display for PeerSyncStatus { +impl std::fmt::Display for SyncStatus { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.write_str(self.as_str()) } diff --git a/beacon_node/eth2_libp2p/src/service.rs b/beacon_node/eth2_libp2p/src/service.rs index 66bec0db4..5e7add8ac 100644 --- a/beacon_node/eth2_libp2p/src/service.rs +++ b/beacon_node/eth2_libp2p/src/service.rs @@ -283,7 +283,7 @@ impl Service { self.swarm .behaviour_mut() .peer_manager_mut() - .report_peer(peer_id, action, source); + .report_peer(peer_id, action, source, None); } /// Disconnect and ban a peer, providing a reason. diff --git a/beacon_node/eth2_libp2p/src/types/globals.rs b/beacon_node/eth2_libp2p/src/types/globals.rs index 5c3b0690d..638270c2b 100644 --- a/beacon_node/eth2_libp2p/src/types/globals.rs +++ b/beacon_node/eth2_libp2p/src/types/globals.rs @@ -1,5 +1,5 @@ //! A collection of variables that are accessible outside of the network thread itself. -use crate::peer_manager::PeerDB; +use crate::peer_manager::peerdb::PeerDB; use crate::rpc::MetaData; use crate::types::{BackFillState, SyncState}; use crate::Client; @@ -117,7 +117,7 @@ impl NetworkGlobals { self.peers .read() .peer_info(peer_id) - .map(|info| info.client.clone()) + .map(|info| info.client().clone()) .unwrap_or_default() } diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index d2e40cde2..2412b1f54 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -1620,23 +1620,21 @@ pub fn serve( })?; if let Some(peer_info) = network_globals.peers.read().peer_info(&peer_id) { - let address = if let Some(socket_addr) = - peer_info.seen_addresses.iter().next() - { + let address = if let Some(socket_addr) = peer_info.seen_addresses().next() { let mut addr = eth2_libp2p::Multiaddr::from(socket_addr.ip()); addr.push(eth2_libp2p::multiaddr::Protocol::Tcp(socket_addr.port())); addr.to_string() - } else if let Some(addr) = peer_info.listening_addresses.first() { + } else if let Some(addr) = peer_info.listening_addresses().first() { addr.to_string() } else { String::new() }; // the eth2 API spec implies only peers we have been connected to at some point should be included. - if let Some(dir) = peer_info.connection_direction.as_ref() { + if let Some(dir) = peer_info.connection_direction().as_ref() { return Ok(api_types::GenericResponse::from(api_types::PeerData { peer_id: peer_id.to_string(), - enr: peer_info.enr.as_ref().map(|enr| enr.to_base64()), + enr: peer_info.enr().map(|enr| enr.to_base64()), last_seen_p2p_address: address, direction: api_types::PeerDirection::from_connection_direction(dir), state: api_types::PeerState::from_peer_connection_status( @@ -1669,20 +1667,20 @@ pub fn serve( .peers() .for_each(|(peer_id, peer_info)| { let address = - if let Some(socket_addr) = peer_info.seen_addresses.iter().next() { + if let Some(socket_addr) = peer_info.seen_addresses().next() { let mut addr = eth2_libp2p::Multiaddr::from(socket_addr.ip()); addr.push(eth2_libp2p::multiaddr::Protocol::Tcp( socket_addr.port(), )); addr.to_string() - } else if let Some(addr) = peer_info.listening_addresses.first() { + } else if let Some(addr) = peer_info.listening_addresses().first() { addr.to_string() } else { String::new() }; // the eth2 API spec implies only peers we have been connected to at some point should be included. - if let Some(dir) = peer_info.connection_direction.as_ref() { + if let Some(dir) = peer_info.connection_direction() { let direction = api_types::PeerDirection::from_connection_direction(dir); let state = api_types::PeerState::from_peer_connection_status( @@ -1700,7 +1698,7 @@ pub fn serve( if state_matches && direction_matches { peers.push(api_types::PeerData { peer_id: peer_id.to_string(), - enr: peer_info.enr.as_ref().map(|enr| enr.to_base64()), + enr: peer_info.enr().map(|enr| enr.to_base64()), last_seen_p2p_address: address, direction, state, diff --git a/beacon_node/http_api/tests/common.rs b/beacon_node/http_api/tests/common.rs index bca7bbd62..69d67424a 100644 --- a/beacon_node/http_api/tests/common.rs +++ b/beacon_node/http_api/tests/common.rs @@ -7,7 +7,7 @@ use eth2_libp2p::{ discv5::enr::{CombinedKey, EnrBuilder}, rpc::methods::{MetaData, MetaDataV2}, types::{EnrAttestationBitfield, EnrSyncCommitteeBitfield, SyncState}, - Enr, NetworkGlobals, PeerId, + ConnectedPoint, Enr, NetworkConfig, NetworkGlobals, PeerId, PeerManager, }; use http_api::{Config, Context}; use network::NetworkMessage; @@ -46,7 +46,7 @@ pub struct ApiServer> { } impl InteractiveTester { - pub fn new(spec: Option, validator_count: usize) -> Self { + pub async fn new(spec: Option, validator_count: usize) -> Self { let harness = BeaconChainHarness::new( E::default(), spec, @@ -59,7 +59,7 @@ impl InteractiveTester { shutdown_tx: _server_shutdown, network_rx, .. - } = create_api_server(harness.chain.clone(), harness.logger().clone()); + } = create_api_server(harness.chain.clone(), harness.logger().clone()).await; tokio::spawn(server); @@ -82,7 +82,7 @@ impl InteractiveTester { } } -pub fn create_api_server( +pub async fn create_api_server( chain: Arc>, log: Logger, ) -> ApiServer> { @@ -96,15 +96,30 @@ pub fn create_api_server( }); let enr_key = CombinedKey::generate_secp256k1(); let enr = EnrBuilder::new("v4").build(&enr_key).unwrap(); - let network_globals = - NetworkGlobals::new(enr.clone(), TCP_PORT, UDP_PORT, meta_data, vec![], &log); + let network_globals = Arc::new(NetworkGlobals::new( + enr.clone(), + TCP_PORT, + UDP_PORT, + meta_data, + vec![], + &log, + )); + // Only a peer manager can add peers, so we create a dummy manager. + let network_config = NetworkConfig::default(); + let mut pm = PeerManager::new(&network_config, network_globals.clone(), &log) + .await + .unwrap(); + + // add a peer let peer_id = PeerId::random(); - network_globals - .peers - .write() - .connect_ingoing(&peer_id, EXTERNAL_ADDR.parse().unwrap(), None); + let connected_point = ConnectedPoint::Listener { + local_addr: EXTERNAL_ADDR.parse().unwrap(), + send_back_addr: EXTERNAL_ADDR.parse().unwrap(), + }; + let num_established = std::num::NonZeroU32::new(1).unwrap(); + pm.inject_connection_established(peer_id, connected_point, num_established, None); *network_globals.sync_state.write() = SyncState::Synced; let eth1_service = eth1::Service::new(eth1::Config::default(), log.clone(), chain.spec.clone()); @@ -119,7 +134,7 @@ pub fn create_api_server( }, chain: Some(chain.clone()), network_tx: Some(network_tx), - network_globals: Some(Arc::new(network_globals)), + network_globals: Some(network_globals), eth1_service: Some(eth1_service), log, }); diff --git a/beacon_node/http_api/tests/fork_tests.rs b/beacon_node/http_api/tests/fork_tests.rs index 9286b60af..6b4f79fa5 100644 --- a/beacon_node/http_api/tests/fork_tests.rs +++ b/beacon_node/http_api/tests/fork_tests.rs @@ -17,7 +17,7 @@ async fn sync_committee_duties_across_fork() { let validator_count = E::sync_committee_size(); let fork_epoch = Epoch::new(8); let spec = altair_spec(fork_epoch); - let tester = InteractiveTester::::new(Some(spec.clone()), validator_count); + let tester = InteractiveTester::::new(Some(spec.clone()), validator_count).await; let harness = &tester.harness; let client = &tester.client; @@ -102,7 +102,7 @@ async fn attestations_across_fork_with_skip_slots() { let validator_count = E::sync_committee_size(); let fork_epoch = Epoch::new(8); let spec = altair_spec(fork_epoch); - let tester = InteractiveTester::::new(Some(spec.clone()), validator_count); + let tester = InteractiveTester::::new(Some(spec.clone()), validator_count).await; let harness = &tester.harness; let client = &tester.client; @@ -152,7 +152,7 @@ async fn sync_contributions_across_fork_with_skip_slots() { let validator_count = E::sync_committee_size(); let fork_epoch = Epoch::new(8); let spec = altair_spec(fork_epoch); - let tester = InteractiveTester::::new(Some(spec.clone()), validator_count); + let tester = InteractiveTester::::new(Some(spec.clone()), validator_count).await; let harness = &tester.harness; let client = &tester.client; @@ -199,7 +199,7 @@ async fn sync_committee_indices_across_fork() { let validator_count = E::sync_committee_size(); let fork_epoch = Epoch::new(8); let spec = altair_spec(fork_epoch); - let tester = InteractiveTester::::new(Some(spec.clone()), validator_count); + let tester = InteractiveTester::::new(Some(spec.clone()), validator_count).await; let harness = &tester.harness; let client = &tester.client; diff --git a/beacon_node/http_api/tests/interactive_tests.rs b/beacon_node/http_api/tests/interactive_tests.rs index dfc81afdc..64ce3b656 100644 --- a/beacon_node/http_api/tests/interactive_tests.rs +++ b/beacon_node/http_api/tests/interactive_tests.rs @@ -18,7 +18,7 @@ async fn deposit_contract_custom_network() { // Arbitrary contract address. spec.deposit_contract_address = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa".parse().unwrap(); - let tester = InteractiveTester::::new(Some(spec.clone()), validator_count); + let tester = InteractiveTester::::new(Some(spec.clone()), validator_count).await; let client = &tester.client; let result = client.get_config_deposit_contract().await.unwrap().data; diff --git a/beacon_node/http_api/tests/tests.rs b/beacon_node/http_api/tests/tests.rs index 20cb8084f..86762aeb4 100644 --- a/beacon_node/http_api/tests/tests.rs +++ b/beacon_node/http_api/tests/tests.rs @@ -63,14 +63,14 @@ struct ApiTester { } impl ApiTester { - pub fn new() -> Self { + pub async fn new() -> Self { // This allows for testing voluntary exits without building out a massive chain. let mut spec = E::default_spec(); spec.shard_committee_period = 2; - Self::new_from_spec(spec) + Self::new_from_spec(spec).await } - pub fn new_from_spec(spec: ChainSpec) -> Self { + pub async fn new_from_spec(spec: ChainSpec) -> Self { let harness = BeaconChainHarness::new( MainnetEthSpec, Some(spec.clone()), @@ -181,7 +181,7 @@ impl ApiTester { network_rx, local_enr, external_peer_id, - } = create_api_server(chain.clone(), log); + } = create_api_server(chain.clone(), log).await; tokio::spawn(server); @@ -213,7 +213,7 @@ impl ApiTester { } } - pub fn new_from_genesis() -> Self { + pub async fn new_from_genesis() -> Self { let harness = BeaconChainHarness::new( MainnetEthSpec, None, @@ -260,7 +260,7 @@ impl ApiTester { network_rx, local_enr, external_peer_id, - } = create_api_server(chain.clone(), log); + } = create_api_server(chain.clone(), log).await; tokio::spawn(server); @@ -2453,7 +2453,7 @@ async fn poll_events, eth2::Error>> + Unpin #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn get_events() { - ApiTester::new().test_get_events().await; + ApiTester::new().await.test_get_events().await; } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] @@ -2461,6 +2461,7 @@ async fn get_events_altair() { let mut spec = E::default_spec(); spec.altair_fork_epoch = Some(Epoch::new(0)); ApiTester::new_from_spec(spec) + .await .test_get_events_altair() .await; } @@ -2468,6 +2469,7 @@ async fn get_events_altair() { #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn get_events_from_genesis() { ApiTester::new_from_genesis() + .await .test_get_events_from_genesis() .await; } @@ -2475,6 +2477,7 @@ async fn get_events_from_genesis() { #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn beacon_get() { ApiTester::new() + .await .test_beacon_genesis() .await .test_beacon_states_root() @@ -2515,17 +2518,21 @@ async fn beacon_get() { #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn post_beacon_blocks_valid() { - ApiTester::new().test_post_beacon_blocks_valid().await; + ApiTester::new().await.test_post_beacon_blocks_valid().await; } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn post_beacon_blocks_invalid() { - ApiTester::new().test_post_beacon_blocks_invalid().await; + ApiTester::new() + .await + .test_post_beacon_blocks_invalid() + .await; } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn beacon_pools_post_attestations_valid() { ApiTester::new() + .await .test_post_beacon_pool_attestations_valid() .await; } @@ -2533,6 +2540,7 @@ async fn beacon_pools_post_attestations_valid() { #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn beacon_pools_post_attestations_invalid() { ApiTester::new() + .await .test_post_beacon_pool_attestations_invalid() .await; } @@ -2540,6 +2548,7 @@ async fn beacon_pools_post_attestations_invalid() { #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn beacon_pools_post_attester_slashings_valid() { ApiTester::new() + .await .test_post_beacon_pool_attester_slashings_valid() .await; } @@ -2547,6 +2556,7 @@ async fn beacon_pools_post_attester_slashings_valid() { #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn beacon_pools_post_attester_slashings_invalid() { ApiTester::new() + .await .test_post_beacon_pool_attester_slashings_invalid() .await; } @@ -2554,6 +2564,7 @@ async fn beacon_pools_post_attester_slashings_invalid() { #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn beacon_pools_post_proposer_slashings_valid() { ApiTester::new() + .await .test_post_beacon_pool_proposer_slashings_valid() .await; } @@ -2561,6 +2572,7 @@ async fn beacon_pools_post_proposer_slashings_valid() { #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn beacon_pools_post_proposer_slashings_invalid() { ApiTester::new() + .await .test_post_beacon_pool_proposer_slashings_invalid() .await; } @@ -2568,6 +2580,7 @@ async fn beacon_pools_post_proposer_slashings_invalid() { #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn beacon_pools_post_voluntary_exits_valid() { ApiTester::new() + .await .test_post_beacon_pool_voluntary_exits_valid() .await; } @@ -2575,6 +2588,7 @@ async fn beacon_pools_post_voluntary_exits_valid() { #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn beacon_pools_post_voluntary_exits_invalid() { ApiTester::new() + .await .test_post_beacon_pool_voluntary_exits_invalid() .await; } @@ -2582,6 +2596,7 @@ async fn beacon_pools_post_voluntary_exits_invalid() { #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn config_get() { ApiTester::new() + .await .test_get_config_fork_schedule() .await .test_get_config_spec() @@ -2593,6 +2608,7 @@ async fn config_get() { #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn debug_get() { ApiTester::new() + .await .test_get_debug_beacon_states() .await .test_get_debug_beacon_heads() @@ -2602,6 +2618,7 @@ async fn debug_get() { #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn node_get() { ApiTester::new() + .await .test_get_node_version() .await .test_get_node_syncing() @@ -2620,17 +2637,24 @@ async fn node_get() { #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn get_validator_duties_early() { - ApiTester::new().test_get_validator_duties_early().await; + ApiTester::new() + .await + .test_get_validator_duties_early() + .await; } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn get_validator_duties_attester() { - ApiTester::new().test_get_validator_duties_attester().await; + ApiTester::new() + .await + .test_get_validator_duties_attester() + .await; } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn get_validator_duties_attester_with_skip_slots() { ApiTester::new() + .await .skip_slots(E::slots_per_epoch() * 2) .test_get_validator_duties_attester() .await; @@ -2638,12 +2662,16 @@ async fn get_validator_duties_attester_with_skip_slots() { #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn get_validator_duties_proposer() { - ApiTester::new().test_get_validator_duties_proposer().await; + ApiTester::new() + .await + .test_get_validator_duties_proposer() + .await; } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn get_validator_duties_proposer_with_skip_slots() { ApiTester::new() + .await .skip_slots(E::slots_per_epoch() * 2) .test_get_validator_duties_proposer() .await; @@ -2651,12 +2679,13 @@ async fn get_validator_duties_proposer_with_skip_slots() { #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn block_production() { - ApiTester::new().test_block_production().await; + ApiTester::new().await.test_block_production().await; } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn block_production_with_skip_slots() { ApiTester::new() + .await .skip_slots(E::slots_per_epoch() * 2) .test_block_production() .await; @@ -2664,12 +2693,16 @@ async fn block_production_with_skip_slots() { #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn get_validator_attestation_data() { - ApiTester::new().test_get_validator_attestation_data().await; + ApiTester::new() + .await + .test_get_validator_attestation_data() + .await; } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn get_validator_attestation_data_with_skip_slots() { ApiTester::new() + .await .skip_slots(E::slots_per_epoch() * 2) .test_get_validator_attestation_data() .await; @@ -2678,6 +2711,7 @@ async fn get_validator_attestation_data_with_skip_slots() { #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn get_validator_aggregate_attestation() { ApiTester::new() + .await .test_get_validator_aggregate_attestation() .await; } @@ -2685,6 +2719,7 @@ async fn get_validator_aggregate_attestation() { #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn get_validator_aggregate_attestation_with_skip_slots() { ApiTester::new() + .await .skip_slots(E::slots_per_epoch() * 2) .test_get_validator_aggregate_attestation() .await; @@ -2693,6 +2728,7 @@ async fn get_validator_aggregate_attestation_with_skip_slots() { #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn get_validator_aggregate_and_proofs_valid() { ApiTester::new() + .await .test_get_validator_aggregate_and_proofs_valid() .await; } @@ -2700,6 +2736,7 @@ async fn get_validator_aggregate_and_proofs_valid() { #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn get_validator_aggregate_and_proofs_valid_with_skip_slots() { ApiTester::new() + .await .skip_slots(E::slots_per_epoch() * 2) .test_get_validator_aggregate_and_proofs_valid() .await; @@ -2708,6 +2745,7 @@ async fn get_validator_aggregate_and_proofs_valid_with_skip_slots() { #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn get_validator_aggregate_and_proofs_invalid() { ApiTester::new() + .await .test_get_validator_aggregate_and_proofs_invalid() .await; } @@ -2715,6 +2753,7 @@ async fn get_validator_aggregate_and_proofs_invalid() { #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn get_validator_aggregate_and_proofs_invalid_with_skip_slots() { ApiTester::new() + .await .skip_slots(E::slots_per_epoch() * 2) .test_get_validator_aggregate_and_proofs_invalid() .await; @@ -2723,6 +2762,7 @@ async fn get_validator_aggregate_and_proofs_invalid_with_skip_slots() { #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn get_validator_beacon_committee_subscriptions() { ApiTester::new() + .await .test_get_validator_beacon_committee_subscriptions() .await; } @@ -2730,6 +2770,7 @@ async fn get_validator_beacon_committee_subscriptions() { #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn lighthouse_endpoints() { ApiTester::new() + .await .test_get_lighthouse_health() .await .test_get_lighthouse_syncing() diff --git a/beacon_node/network/src/metrics.rs b/beacon_node/network/src/metrics.rs index e57a88835..e8a024a81 100644 --- a/beacon_node/network/src/metrics.rs +++ b/beacon_node/network/src/metrics.rs @@ -790,7 +790,7 @@ pub fn update_gossip_metrics( for (peer_id, _) in gossipsub.all_peers() { let client = peers .peer_info(peer_id) - .map(|peer_info| peer_info.client.kind.as_static()) + .map(|peer_info| peer_info.client().kind.as_static()) .unwrap_or_else(|| "Unknown"); peer_to_client.insert(peer_id, client); @@ -919,7 +919,7 @@ pub fn update_sync_metrics(network_globals: &Arc>) .peers .read() .connected_peers() - .map(|(_peer_id, info)| info.sync_status.as_str()) + .map(|(_peer_id, info)| info.sync_status().as_str()) { *peers_per_sync_type.entry(sync_type).or_default() += 1; } diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index 3b04ec21f..ae19db32a 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -654,15 +654,21 @@ impl SyncManager { // NOTE: here we are gracefully handling two race conditions: Receiving the status message // of a peer that is 1) disconnected 2) not in the PeerDB. - if let Some(peer_info) = self.network_globals.peers.write().peer_info_mut(peer_id) { - let new_state = sync_type.as_sync_status(remote_sync_info); - let rpr = new_state.as_str(); - let was_updated = peer_info.sync_status.update(new_state.clone()); + let new_state = sync_type.as_sync_status(remote_sync_info); + let rpr = new_state.as_str(); + // Drop the write lock + let update_sync_status = self + .network_globals + .peers + .write() + .update_sync_status(peer_id, new_state.clone()); + if let Some(was_updated) = update_sync_status { + let is_connected = self.network_globals.peers.read().is_connected(peer_id); if was_updated { debug!(self.log, "Peer transitioned sync state"; "peer_id" => %peer_id, "new_state" => rpr, "our_head_slot" => local_sync_info.head_slot, "out_finalized_epoch" => local_sync_info.finalized_epoch, "their_head_slot" => remote_sync_info.head_slot, "their_finalized_epoch" => remote_sync_info.finalized_epoch, - "is_connected" => peer_info.is_connected()); + "is_connected" => is_connected); // A peer has transitioned its sync state. If the new state is "synced" we // inform the backfill sync that a new synced peer has joined us. @@ -670,7 +676,7 @@ impl SyncManager { self.backfill_sync.fully_synced_peer_joined(); } } - peer_info.is_connected() + is_connected } else { error!(self.log, "Status'd peer is unknown"; "peer_id" => %peer_id); false diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index 0647fc683..073b83a85 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -55,7 +55,7 @@ impl SyncNetworkContext { .peers .read() .peer_info(peer_id) - .map(|info| info.client.clone()) + .map(|info| info.client().clone()) .unwrap_or_default() } diff --git a/beacon_node/network/src/sync/peer_sync_info.rs b/beacon_node/network/src/sync/peer_sync_info.rs index 513b05a09..912c4d011 100644 --- a/beacon_node/network/src/sync/peer_sync_info.rs +++ b/beacon_node/network/src/sync/peer_sync_info.rs @@ -1,6 +1,6 @@ use super::manager::SLOT_IMPORT_TOLERANCE; use beacon_chain::{BeaconChain, BeaconChainTypes}; -use eth2_libp2p::{PeerSyncStatus, SyncInfo}; +use eth2_libp2p::{SyncInfo, SyncStatus as PeerSyncStatus}; use std::cmp::Ordering; /// The type of peer relative to our current state.