From 0aee7ec873bcc7206b9acf2741f46c209b510c57 Mon Sep 17 00:00:00 2001 From: Age Manning Date: Mon, 11 Oct 2021 02:45:06 +0000 Subject: [PATCH] Refactor Peerdb and PeerManager (#2660) ## Proposed Changes This is a refactor of the PeerDB and PeerManager. A number of bugs have been surfacing around the connection state of peers and their interaction with the score state. This refactor tightens the mutability properties of peers such that only specific modules are able to modify the state of peer information, preventing inadvertent state changes that can lead to our local peer manager db being out of sync with libp2p. Further, the logic around connection and scoring was quite convoluted and the distinction between the PeerManager and Peerdb was not well defined. Although these issues are not fully resolved, this PR is a step towards cleaning up this logic. The peerdb solely manages most mutability operations of peers, leaving higher-order logic to the peer manager. A single `update_connection_state()` function has been added to the peer-db, making it solely responsible for modifying the peer's connection state. The ways a peer's score can be modified have been reduced to three simple functions (`update_scores()`, `update_gossipsub_scores()` and `report_peer()`). This prevents any ad-hoc modifications of scores, and only natural processes of score modification are allowed, which simplifies the reasoning about score and state changes. 
--- beacon_node/eth2_libp2p/src/behaviour/mod.rs | 16 +- beacon_node/eth2_libp2p/src/lib.rs | 7 +- .../eth2_libp2p/src/peer_manager/mod.rs | 504 ++--- .../eth2_libp2p/src/peer_manager/peerdb.rs | 1777 +++++++++++------ .../src/peer_manager/{ => peerdb}/client.rs | 0 .../peer_manager/{ => peerdb}/peer_info.rs | 229 ++- .../src/peer_manager/{ => peerdb}/score.rs | 0 .../sync_status.rs} | 42 +- beacon_node/eth2_libp2p/src/service.rs | 2 +- beacon_node/eth2_libp2p/src/types/globals.rs | 4 +- beacon_node/http_api/src/lib.rs | 18 +- beacon_node/http_api/tests/common.rs | 37 +- beacon_node/http_api/tests/fork_tests.rs | 8 +- .../http_api/tests/interactive_tests.rs | 2 +- beacon_node/http_api/tests/tests.rs | 69 +- beacon_node/network/src/metrics.rs | 4 +- beacon_node/network/src/sync/manager.rs | 18 +- .../network/src/sync/network_context.rs | 2 +- .../network/src/sync/peer_sync_info.rs | 2 +- 19 files changed, 1587 insertions(+), 1154 deletions(-) rename beacon_node/eth2_libp2p/src/peer_manager/{ => peerdb}/client.rs (100%) rename beacon_node/eth2_libp2p/src/peer_manager/{ => peerdb}/peer_info.rs (70%) rename beacon_node/eth2_libp2p/src/peer_manager/{ => peerdb}/score.rs (100%) rename beacon_node/eth2_libp2p/src/peer_manager/{peer_sync_status.rs => peerdb/sync_status.rs} (61%) diff --git a/beacon_node/eth2_libp2p/src/behaviour/mod.rs b/beacon_node/eth2_libp2p/src/behaviour/mod.rs index 09d82e2e8..0ef194a82 100644 --- a/beacon_node/eth2_libp2p/src/behaviour/mod.rs +++ b/beacon_node/eth2_libp2p/src/behaviour/mod.rs @@ -4,7 +4,7 @@ use crate::behaviour::gossipsub_scoring_parameters::{ use crate::config::gossipsub_config; use crate::discovery::{subnet_predicate, Discovery, DiscoveryEvent, TARGET_SUBNET_PEERS}; use crate::peer_manager::{ - score::ReportSource, ConnectionDirection, PeerManager, PeerManagerEvent, + peerdb::score::ReportSource, ConnectionDirection, PeerManager, PeerManagerEvent, }; use crate::rpc::*; use crate::service::METADATA_FILENAME; @@ -255,7 +255,7 @@ 
impl Behaviour { let update_gossipsub_scores = tokio::time::interval(params.decay_interval); gossipsub - .with_peer_score(params.clone(), thresholds) + .with_peer_score(params, thresholds) .expect("Valid score params and thresholds"); Ok(Behaviour { @@ -445,7 +445,7 @@ impl Behaviour { .peers .read() .peer_info(propagation_source) - .map(|info| info.client.kind.as_ref()) + .map(|info| info.client().kind.as_ref()) { metrics::inc_counter_vec( &metrics::GOSSIP_UNACCEPTED_MESSAGES_PER_CLIENT, @@ -834,12 +834,18 @@ impl NetworkBehaviourEventProcess for Behaviour< } GossipsubEvent::Subscribed { peer_id, topic } => { if let Some(subnet_id) = subnet_from_topic_hash(&topic) { - self.peer_manager.add_subscription(&peer_id, subnet_id); + self.network_globals + .peers + .write() + .add_subscription(&peer_id, subnet_id); } } GossipsubEvent::Unsubscribed { peer_id, topic } => { if let Some(subnet_id) = subnet_from_topic_hash(&topic) { - self.peer_manager.remove_subscription(&peer_id, subnet_id); + self.network_globals + .peers + .write() + .remove_subscription(&peer_id, &subnet_id); } } } diff --git a/beacon_node/eth2_libp2p/src/lib.rs b/beacon_node/eth2_libp2p/src/lib.rs index c04c61616..3c5114d36 100644 --- a/beacon_node/eth2_libp2p/src/lib.rs +++ b/beacon_node/eth2_libp2p/src/lib.rs @@ -74,8 +74,9 @@ pub use libp2p::{core::ConnectedPoint, PeerId, Swarm}; pub use libp2p::{multiaddr, Multiaddr}; pub use metrics::scrape_discovery_metrics; pub use peer_manager::{ - client::Client, - score::{PeerAction, ReportSource}, - ConnectionDirection, PeerConnectionStatus, PeerDB, PeerInfo, PeerSyncStatus, SyncInfo, + peerdb::client::Client, + peerdb::score::{PeerAction, ReportSource}, + peerdb::PeerDB, + ConnectionDirection, PeerConnectionStatus, PeerInfo, PeerManager, SyncInfo, SyncStatus, }; pub use service::{load_private_key, Libp2pEvent, Service, NETWORK_KEY_FILENAME}; diff --git a/beacon_node/eth2_libp2p/src/peer_manager/mod.rs b/beacon_node/eth2_libp2p/src/peer_manager/mod.rs index 
95be059f4..9ed82c086 100644 --- a/beacon_node/eth2_libp2p/src/peer_manager/mod.rs +++ b/beacon_node/eth2_libp2p/src/peer_manager/mod.rs @@ -1,6 +1,5 @@ //! Implementation of Lighthouse's peer management system. -pub use self::peerdb::*; use crate::discovery::TARGET_SUBNET_PEERS; use crate::rpc::{GoodbyeReason, MetaData, Protocol, RPCError, RPCResponseErrorCode}; use crate::types::SyncState; @@ -13,6 +12,7 @@ use futures::Stream; use hashset_delay::HashSetDelay; use libp2p::core::ConnectedPoint; use libp2p::identify::IdentifyInfo; +use peerdb::{BanOperation, BanResult, ScoreUpdateResult}; use slog::{debug, error, warn}; use smallvec::SmallVec; use std::{ @@ -25,17 +25,14 @@ use types::{EthSpec, SyncSubnetId}; pub use libp2p::core::{identity::Keypair, Multiaddr}; -pub mod client; -mod peer_info; -mod peer_sync_status; #[allow(clippy::mutable_key_type)] // PeerId in hashmaps are no longer permitted by clippy -mod peerdb; -pub(crate) mod score; +pub mod peerdb; -pub use peer_info::{ConnectionDirection, PeerConnectionStatus, PeerConnectionStatus::*, PeerInfo}; -pub use peer_sync_status::{PeerSyncStatus, SyncInfo}; -use score::{PeerAction, ReportSource, ScoreState}; -use std::cmp::Ordering; +pub use peerdb::peer_info::{ + ConnectionDirection, PeerConnectionStatus, PeerConnectionStatus::*, PeerInfo, +}; +use peerdb::score::{PeerAction, ReportSource}; +pub use peerdb::sync_status::{SyncInfo, SyncStatus}; use std::collections::{hash_map::Entry, HashMap}; use std::net::IpAddr; @@ -65,10 +62,6 @@ pub const MIN_OUTBOUND_ONLY_FACTOR: f32 = 0.3; /// dialing priority peers we need for validator duties. pub const PRIORITY_PEER_EXCESS: f32 = 0.05; -/// Relative factor of peers that are allowed to have a negative gossipsub score without penalizing -/// them in lighthouse. -const ALLOWED_NEGATIVE_GOSSIPSUB_FACTOR: f32 = 0.1; - /// The main struct that handles peer's reputation and connection status. pub struct PeerManager { /// Storage of network globals to access the `PeerDB`. 
@@ -154,63 +147,102 @@ impl PeerManager { /// /// This will send a goodbye and disconnect the peer if it is connected or dialing. pub fn goodbye_peer(&mut self, peer_id: &PeerId, reason: GoodbyeReason, source: ReportSource) { - // get the peer info + // Update the sync status if required if let Some(info) = self.network_globals.peers.write().peer_info_mut(peer_id) { debug!(self.log, "Sending goodbye to peer"; "peer_id" => %peer_id, "reason" => %reason, "score" => %info.score()); if matches!(reason, GoodbyeReason::IrrelevantNetwork) { - info.sync_status.update(PeerSyncStatus::IrrelevantPeer); + info.update_sync_status(SyncStatus::IrrelevantPeer); } - - // Goodbye's are fatal - info.apply_peer_action_to_score(PeerAction::Fatal); - metrics::inc_counter_vec( - &metrics::PEER_ACTION_EVENTS_PER_CLIENT, - &[ - info.client.kind.as_ref(), - PeerAction::Fatal.as_ref(), - source.into(), - ], - ); } - // Update the peerdb and start the disconnection. - self.ban_peer(peer_id, reason); + self.report_peer(peer_id, PeerAction::Fatal, source, Some(reason)); } /// Reports a peer for some action. /// /// If the peer doesn't exist, log a warning and insert defaults. - pub fn report_peer(&mut self, peer_id: &PeerId, action: PeerAction, source: ReportSource) { - // Helper function to avoid any potential deadlocks. 
- let mut to_ban_peers = Vec::with_capacity(1); - let mut to_unban_peers = Vec::with_capacity(1); + pub fn report_peer( + &mut self, + peer_id: &PeerId, + action: PeerAction, + source: ReportSource, + reason: Option, + ) { + let action = self + .network_globals + .peers + .write() + .report_peer(peer_id, action, source); + self.handle_score_action(peer_id, action, reason); + } - { - let mut peer_db = self.network_globals.peers.write(); - if let Some(info) = peer_db.peer_info_mut(peer_id) { - let previous_state = info.score_state(); - info.apply_peer_action_to_score(action); - metrics::inc_counter_vec( - &metrics::PEER_ACTION_EVENTS_PER_CLIENT, - &[info.client.kind.as_ref(), action.as_ref(), source.into()], - ); - - Self::handle_score_transitions( - previous_state, - peer_id, - info, - &mut to_ban_peers, - &mut to_unban_peers, - &mut self.events, - &self.log, - ); - if previous_state == info.score_state() { - debug!(self.log, "Peer score adjusted"; "peer_id" => %peer_id, "score" => %info.score()); - } + /// Upon adjusting a Peer's score, there are times the peer manager must pass messages up to + /// libp2p. This function handles the conditional logic associated with each score update + /// result. + fn handle_score_action( + &mut self, + peer_id: &PeerId, + action: ScoreUpdateResult, + reason: Option, + ) { + match action { + ScoreUpdateResult::Ban(ban_operation) => { + // The peer has been banned and we need to handle the banning operation + // NOTE: When we ban a peer, its IP address can be banned. We do not recursively search + // through all our connected peers banning all other peers that are using this IP address. + // If these peers are behaving fine, we permit their current connections. However, if any new + // nodes or current nodes try to reconnect on a banned IP, they will be instantly banned + // and disconnected. 
+ self.handle_ban_operation(peer_id, ban_operation, reason); } - } // end write lock + ScoreUpdateResult::Disconnect => { + // The peer has transitioned to a disconnect state and has been marked as such in + // the peer db. We must inform libp2p to disconnect this peer. + self.events.push(PeerManagerEvent::DisconnectPeer( + *peer_id, + GoodbyeReason::BadScore, + )); + } + ScoreUpdateResult::NoAction => { + // The report had no effect on the peer and there is nothing to do. + } + ScoreUpdateResult::Unbanned(unbanned_ips) => { + // Inform the Swarm to unban the peer + self.events + .push(PeerManagerEvent::UnBanned(*peer_id, unbanned_ips)); + } + } + } - self.ban_and_unban_peers(to_ban_peers, to_unban_peers); + /// If a peer is being banned, this handles the banning operation. + fn handle_ban_operation( + &mut self, + peer_id: &PeerId, + ban_operation: BanOperation, + reason: Option, + ) { + match ban_operation { + BanOperation::DisconnectThePeer => { + // The peer was currently connected, so we start a disconnection. + // Once the peer has disconnected, its connection state will transition to a + // banned state. + self.events.push(PeerManagerEvent::DisconnectPeer( + *peer_id, + reason.unwrap_or(GoodbyeReason::BadScore), + )); + } + BanOperation::PeerDisconnecting => { + // The peer is currently being disconnected and will be banned once the + // disconnection completes. + } + BanOperation::ReadyToBan(banned_ips) => { + // The peer is not currently connected, we can safely ban it at the swarm + // level. + // Inform the Swarm to ban the peer + self.events + .push(PeerManagerEvent::Banned(*peer_id, banned_ips)); + } + } } /// Peers that have been returned by discovery requests that are suitable for dialing are @@ -274,27 +306,6 @@ impl PeerManager { self.status_peers.insert(*peer_id); } - /// Adds a gossipsub subscription to a peer in the peerdb. 
- pub fn add_subscription(&self, peer_id: &PeerId, subnet: Subnet) { - if let Some(info) = self.network_globals.peers.write().peer_info_mut(peer_id) { - info.subnets.insert(subnet); - } - } - - /// Removes a gossipsub subscription to a peer in the peerdb. - pub fn remove_subscription(&self, peer_id: &PeerId, subnet: Subnet) { - if let Some(info) = self.network_globals.peers.write().peer_info_mut(peer_id) { - info.subnets.remove(&subnet); - } - } - - /// Removes all gossipsub subscriptions to a peer in the peerdb. - pub fn remove_all_subscriptions(&self, peer_id: &PeerId) { - if let Some(info) = self.network_globals.peers.write().peer_info_mut(peer_id) { - info.subnets = Default::default(); - } - } - /// Insert the sync subnet into list of long lived sync committee subnets that we need to /// maintain adequate number of peers for. pub fn add_sync_subnet(&mut self, subnet_id: SyncSubnetId, min_ttl: Instant) { @@ -417,10 +428,6 @@ impl PeerManager { ) { if num_established == 0 { // There are no more connections - - // Remove all subnet subscriptions from the peer_db - self.remove_all_subscriptions(&peer_id); - if self .network_globals .peers @@ -441,7 +448,7 @@ impl PeerManager { .peers .read() .peer_info(&peer_id) - .map(|info| info.client.kind.clone()) + .map(|info| info.client().kind.clone()) { if let Some(v) = metrics::get_int_gauge(&metrics::PEERS_PER_CLIENT, &[&kind.to_string()]) @@ -496,15 +503,13 @@ impl PeerManager { /// Updates `PeerInfo` with `identify` information. 
pub fn identify(&mut self, peer_id: &PeerId, info: &IdentifyInfo) { if let Some(peer_info) = self.network_globals.peers.write().peer_info_mut(peer_id) { - let previous_kind = peer_info.client.kind.clone(); - let previous_listening_addresses = std::mem::replace( - &mut peer_info.listening_addresses, - info.listen_addrs.clone(), - ); - peer_info.client = client::Client::from_identify_info(info); + let previous_kind = peer_info.client().kind.clone(); + let previous_listening_addresses = + peer_info.set_listening_addresses(info.listen_addrs.clone()); + peer_info.set_client(peerdb::client::Client::from_identify_info(info)); - if previous_kind != peer_info.client.kind - || peer_info.listening_addresses != previous_listening_addresses + if previous_kind != peer_info.client().kind + || *peer_info.listening_addresses() != previous_listening_addresses { debug!(self.log, "Identified Peer"; "peer" => %peer_id, "protocol_version" => &info.protocol_version, @@ -517,7 +522,7 @@ impl PeerManager { // update the peer client kind metric if let Some(v) = metrics::get_int_gauge( &metrics::PEERS_PER_CLIENT, - &[&peer_info.client.kind.to_string()], + &[&peer_info.client().kind.to_string()], ) { v.inc() }; @@ -636,7 +641,7 @@ impl PeerManager { RPCError::Disconnected => return, // No penalty for a graceful disconnection }; - self.report_peer(peer_id, peer_action, ReportSource::RPC); + self.report_peer(peer_id, peer_action, ReportSource::RPC, None); } /// A ping request has been received. @@ -646,7 +651,7 @@ impl PeerManager { // received a ping // reset the to-ping timer for this peer debug!(self.log, "Received a ping request"; "peer_id" => %peer_id, "seq_no" => seq); - match peer_info.connection_direction { + match peer_info.connection_direction() { Some(ConnectionDirection::Incoming) => { self.inbound_ping_peers.insert(*peer_id); } @@ -659,7 +664,7 @@ impl PeerManager { } // if the sequence number is unknown send an update the meta data of the peer. 
- if let Some(meta_data) = &peer_info.meta_data { + if let Some(meta_data) = &peer_info.meta_data() { if *meta_data.seq_number() < seq { debug!(self.log, "Requesting new metadata from peer"; "peer_id" => %peer_id, "known_seq_no" => meta_data.seq_number(), "ping_seq_no" => seq); @@ -683,7 +688,7 @@ impl PeerManager { // received a pong // if the sequence number is unknown send update the meta data of the peer. - if let Some(meta_data) = &peer_info.meta_data { + if let Some(meta_data) = &peer_info.meta_data() { if *meta_data.seq_number() < seq { debug!(self.log, "Requesting new metadata from peer"; "peer_id" => %peer_id, "known_seq_no" => meta_data.seq_number(), "pong_seq_no" => seq); @@ -703,7 +708,7 @@ impl PeerManager { /// Received a metadata response from a peer. pub fn meta_data_response(&mut self, peer_id: &PeerId, meta_data: MetaData) { if let Some(peer_info) = self.network_globals.peers.write().peer_info_mut(peer_id) { - if let Some(known_meta_data) = &peer_info.meta_data { + if let Some(known_meta_data) = &peer_info.meta_data() { if *known_meta_data.seq_number() < *meta_data.seq_number() { debug!(self.log, "Updating peer's metadata"; "peer_id" => %peer_id, "known_seq_no" => known_meta_data.seq_number(), "new_seq_no" => meta_data.seq_number()); @@ -718,64 +723,24 @@ impl PeerManager { debug!(self.log, "Obtained peer's metadata"; "peer_id" => %peer_id, "new_seq_no" => meta_data.seq_number()); } - peer_info.meta_data = Some(meta_data); + peer_info.set_meta_data(meta_data); } else { error!(self.log, "Received METADATA from an unknown peer"; "peer_id" => %peer_id); } } + /// Updates the gossipsub scores for all known peers in gossipsub. 
pub(crate) fn update_gossipsub_scores(&mut self, gossipsub: &Gossipsub) { - let mut to_ban_peers = Vec::new(); - let mut to_unban_peers = Vec::new(); + let actions = self + .network_globals + .peers + .write() + .update_gossipsub_scores(self.target_peers, gossipsub); - { - // collect peers with scores - let mut guard = self.network_globals.peers.write(); - let mut peers: Vec<_> = guard - .peers_mut() - .filter_map(|(peer_id, info)| { - gossipsub - .peer_score(peer_id) - .map(|score| (peer_id, info, score)) - }) - .collect(); - - // sort descending by score - peers.sort_unstable_by(|(.., s1), (.., s2)| { - s2.partial_cmp(s1).unwrap_or(Ordering::Equal) - }); - - let mut to_ignore_negative_peers = - (self.target_peers as f32 * ALLOWED_NEGATIVE_GOSSIPSUB_FACTOR).ceil() as usize; - - for (peer_id, info, score) in peers { - let previous_state = info.score_state(); - info.update_gossipsub_score( - score, - if score < 0.0 && to_ignore_negative_peers > 0 { - to_ignore_negative_peers -= 1; - // We ignore the negative score for the best negative peers so that their - // gossipsub score can recover without getting disconnected. - true - } else { - false - }, - ); - - Self::handle_score_transitions( - previous_state, - peer_id, - info, - &mut to_ban_peers, - &mut to_unban_peers, - &mut self.events, - &self.log, - ); - } - } // end write lock - - self.ban_and_unban_peers(to_ban_peers, to_unban_peers); + for (peer_id, score_action) in actions { + self.handle_score_action(&peer_id, score_action, None); + } } /* Internal functions */ @@ -810,14 +775,15 @@ impl PeerManager { /// /// This is also called when dialing a peer fails. fn inject_disconnect(&mut self, peer_id: &PeerId) { - if self + let ban_operation = self .network_globals .peers .write() - .inject_disconnect(peer_id) - { + .inject_disconnect(peer_id); + + if let Some(ban_operation) = ban_operation { // The peer was awaiting a ban, continue to ban the peer. 
- self.ban_peer(peer_id, GoodbyeReason::BadScore); + self.handle_ban_operation(peer_id, ban_operation, None); } // Remove the ping and status timer for the peer @@ -879,7 +845,7 @@ impl PeerManager { .peers .read() .peer_info(peer_id) - .map(|peer_info| peer_info.client.kind.clone()) + .map(|peer_info| peer_info.client().kind.clone()) { if let Some(v) = metrics::get_int_gauge(&metrics::PEERS_PER_CLIENT, &[&kind.to_string()]) @@ -891,154 +857,6 @@ impl PeerManager { true } - /// This handles score transitions between states. It transitions peers states from - /// disconnected/banned/connected. - fn handle_score_transitions( - previous_state: ScoreState, - peer_id: &PeerId, - info: &mut PeerInfo, - to_ban_peers: &mut Vec, - to_unban_peers: &mut Vec, - events: &mut SmallVec<[PeerManagerEvent; 16]>, - log: &slog::Logger, - ) { - match (info.score_state(), previous_state) { - (ScoreState::Banned, ScoreState::Healthy | ScoreState::Disconnected) => { - debug!(log, "Peer has been banned"; "peer_id" => %peer_id, "score" => %info.score()); - to_ban_peers.push(*peer_id); - } - (ScoreState::Disconnected, ScoreState::Banned | ScoreState::Healthy) => { - debug!(log, "Peer transitioned to disconnect state"; "peer_id" => %peer_id, "score" => %info.score(), "past_state" => %previous_state); - // disconnect the peer if it's currently connected or dialing - if info.is_connected_or_dialing() { - // Change the state to inform that we are disconnecting the peer. 
- info.disconnecting(false); - events.push(PeerManagerEvent::DisconnectPeer( - *peer_id, - GoodbyeReason::BadScore, - )); - } else if previous_state == ScoreState::Banned { - to_unban_peers.push(*peer_id); - } - } - (ScoreState::Healthy, ScoreState::Disconnected) => { - debug!(log, "Peer transitioned to healthy state"; "peer_id" => %peer_id, "score" => %info.score(), "past_state" => %previous_state); - } - (ScoreState::Healthy, ScoreState::Banned) => { - debug!(log, "Peer transitioned to healthy state"; "peer_id" => %peer_id, "score" => %info.score(), "past_state" => %previous_state); - // unban the peer if it was previously banned. - to_unban_peers.push(*peer_id); - } - // Explicitly ignore states that haven't transitioned. - (ScoreState::Healthy, ScoreState::Healthy) => {} - (ScoreState::Disconnected, ScoreState::Disconnected) => {} - (ScoreState::Banned, ScoreState::Banned) => {} - } - } - - /// Updates the state of banned peers. - fn ban_and_unban_peers(&mut self, to_ban_peers: Vec, to_unban_peers: Vec) { - // process banning peers - for peer_id in to_ban_peers { - self.ban_peer(&peer_id, GoodbyeReason::BadScore); - } - // process unbanning peers - for peer_id in to_unban_peers { - if let Err(e) = self.unban_peer(&peer_id) { - error!(self.log, "{}", e; "peer_id" => %peer_id); - } - } - } - - /// Updates the scores of known peers according to their connection - /// status and the time that has passed. 
- /// NOTE: This is experimental and will likely be adjusted - fn update_peer_scores(&mut self) { - /* Check how long have peers been in this state and update their reputations if needed */ - let mut to_ban_peers = Vec::new(); - let mut to_unban_peers = Vec::new(); - - for (peer_id, info) in self.network_globals.peers.write().peers_mut() { - let previous_state = info.score_state(); - // Update scores - info.score_update(); - - Self::handle_score_transitions( - previous_state, - peer_id, - info, - &mut to_ban_peers, - &mut to_unban_peers, - &mut self.events, - &self.log, - ); - } - self.ban_and_unban_peers(to_ban_peers, to_unban_peers); - } - - /// Bans a peer. - /// - /// Records updates the peers connection status and updates the peer db as well as blocks the - /// peer from participating in discovery and removes them from the routing table. - fn ban_peer(&mut self, peer_id: &PeerId, reason: GoodbyeReason) { - // NOTE: When we ban a peer, its IP address can be banned. We do not recursively search - // through all our connected peers banning all other peers that are using this IP address. - // If these peers are behaving fine, we permit their current connections. However, if any new - // nodes or current nodes try to reconnect on a banned IP, they will be instantly banned - // and disconnected. - - let mut peer_db = self.network_globals.peers.write(); - - match peer_db.disconnect_and_ban(peer_id) { - BanOperation::DisconnectThePeer => { - // The peer was currently connected, so we start a disconnection. - // Once the peer has disconnected, this function will be called to again to ban - // at the swarm level. - self.events - .push(PeerManagerEvent::DisconnectPeer(*peer_id, reason)); - } - BanOperation::PeerDisconnecting => { - // The peer is currently being disconnected and will be banned once the - // disconnection completes. - } - BanOperation::ReadyToBan => { - // The peer is not currently connected, we can safely ban it at the swarm - // level. 
- let banned_ip_addresses = peer_db - .peer_info(peer_id) - .map(|info| { - info.seen_addresses() - .filter(|ip| peer_db.is_ip_banned(ip)) - .collect::>() - }) - .unwrap_or_default(); - - // Inform the Swarm to ban the peer - self.events - .push(PeerManagerEvent::Banned(*peer_id, banned_ip_addresses)); - } - } - } - - /// Unbans a peer. - /// - /// Updates the peer's connection status and updates the peer db as well as removes - /// previous bans from discovery. - fn unban_peer(&mut self, peer_id: &PeerId) -> Result<(), &'static str> { - let mut peer_db = self.network_globals.peers.write(); - peer_db.unban(peer_id)?; - - let seen_ip_addresses = peer_db - .peer_info(peer_id) - .map(|info| info.seen_addresses().collect::>()) - .unwrap_or_default(); - - // Inform the Swarm to unban the peer - self.events - .push(PeerManagerEvent::UnBanned(*peer_id, seen_ip_addresses)); - Ok(()) - } - // Gracefully disconnects a peer without banning them. fn disconnect_peer(&mut self, peer_id: PeerId, reason: GoodbyeReason) { self.events @@ -1046,7 +864,7 @@ impl PeerManager { self.network_globals .peers .write() - .notify_disconnecting(peer_id, false); + .notify_disconnecting(&peer_id, false); } /// Run discovery query for additional sync committee peers if we fall below `TARGET_PEERS`. @@ -1110,8 +928,11 @@ impl PeerManager { self.events.push(PeerManagerEvent::DiscoverPeers); } - // Updates peer's scores. - self.update_peer_scores(); + // Updates peer's scores and unban any peers if required. + let actions = self.network_globals.peers.write().update_scores(); + for (peer_id, action) in actions { + self.handle_score_action(&peer_id, action, None); + } // Maintain minimum count for sync committee peers. self.maintain_sync_committee_peers(); @@ -1394,67 +1215,6 @@ mod tests { ); } - #[tokio::test] - async fn test_peer_manager_removes_unhealthy_peers_during_heartbeat() { - let mut peer_manager = build_peer_manager(3).await; - - // Create 3 peers to connect to. 
- let peer0 = PeerId::random(); - let inbound_only_peer1 = PeerId::random(); - let outbound_only_peer1 = PeerId::random(); - - peer_manager.inject_connect_ingoing(&peer0, "/ip4/0.0.0.0".parse().unwrap(), None); - peer_manager.inject_connect_outgoing(&peer0, "/ip4/0.0.0.0".parse().unwrap(), None); - - // Connect to two peers that are on the threshold of being disconnected. - peer_manager.inject_connect_ingoing( - &inbound_only_peer1, - "/ip4/0.0.0.0".parse().unwrap(), - None, - ); - peer_manager.inject_connect_outgoing( - &outbound_only_peer1, - "/ip4/0.0.0.0".parse().unwrap(), - None, - ); - peer_manager - .network_globals - .peers - .write() - .peer_info_mut(&(inbound_only_peer1)) - .unwrap() - .add_to_score(-19.9); - peer_manager - .network_globals - .peers - .write() - .peer_info_mut(&(outbound_only_peer1)) - .unwrap() - .add_to_score(-19.9); - // Update the gossipsub scores to induce connection downgrade - // during the heartbeat, update_peer_scores will downgrade the score from -19.9 to at least -20, this will then trigger a disconnection. - // If we changed the peer scores to -20 before the heartbeat, update_peer_scores will mark the previous score status as disconnected, - // then handle_state_transitions will not change the connection status to disconnected because the score state has not changed. 
- peer_manager - .network_globals - .peers - .write() - .peer_info_mut(&(inbound_only_peer1)) - .unwrap() - .set_gossipsub_score(-85.0); - peer_manager - .network_globals - .peers - .write() - .peer_info_mut(&(outbound_only_peer1)) - .unwrap() - .set_gossipsub_score(-85.0); - - peer_manager.heartbeat(); - - assert_eq!(peer_manager.network_globals.connected_or_dialing_peers(), 1); - } - #[tokio::test] async fn test_peer_manager_remove_unhealthy_peers_brings_peers_below_target() { let mut peer_manager = build_peer_manager(3).await; @@ -1466,18 +1226,18 @@ mod tests { let inbound_only_peer1 = PeerId::random(); let outbound_only_peer1 = PeerId::random(); - peer_manager.inject_connect_ingoing(&peer0, "/ip4/0.0.0.0".parse().unwrap(), None); - peer_manager.inject_connect_ingoing(&peer1, "/ip4/0.0.0.0".parse().unwrap(), None); + peer_manager.inject_connect_ingoing(&peer0, "/ip4/0.0.0.0/tcp/8000".parse().unwrap(), None); + peer_manager.inject_connect_ingoing(&peer1, "/ip4/0.0.0.0/tcp/8000".parse().unwrap(), None); // Connect to two peers that are on the threshold of being disconnected. 
peer_manager.inject_connect_ingoing( &inbound_only_peer1, - "/ip4/0.0.0.0".parse().unwrap(), + "/ip4/0.0.0.0/tcp/8000".parse().unwrap(), None, ); peer_manager.inject_connect_outgoing( &outbound_only_peer1, - "/ip4/0.0.0.0".parse().unwrap(), + "/ip4/0.0.0.0/tcp/8000".parse().unwrap(), None, ); peer_manager @@ -1486,14 +1246,14 @@ mod tests { .write() .peer_info_mut(&(inbound_only_peer1)) .unwrap() - .add_to_score(-19.9); + .add_to_score(-19.8); peer_manager .network_globals .peers .write() .peer_info_mut(&(outbound_only_peer1)) .unwrap() - .add_to_score(-19.9); + .add_to_score(-19.8); peer_manager .network_globals .peers @@ -1509,9 +1269,9 @@ mod tests { .unwrap() .set_gossipsub_score(-85.0); peer_manager.heartbeat(); - // Tests that when we are over the target peer limit, after disconnecting two unhealthy peers, + // Tests that when we are over the target peer limit, after disconnecting one unhealthy peer, // the loop to check for disconnecting peers will stop because we have removed enough peers (only needed to remove 1 to reach target). 
- assert_eq!(peer_manager.network_globals.connected_or_dialing_peers(), 2); + assert_eq!(peer_manager.network_globals.connected_or_dialing_peers(), 3); } #[tokio::test] diff --git a/beacon_node/eth2_libp2p/src/peer_manager/peerdb.rs b/beacon_node/eth2_libp2p/src/peer_manager/peerdb.rs index 542ba07e7..c382f6e4f 100644 --- a/beacon_node/eth2_libp2p/src/peer_manager/peerdb.rs +++ b/beacon_node/eth2_libp2p/src/peer_manager/peerdb.rs @@ -1,26 +1,34 @@ -use super::peer_info::{ConnectionDirection, PeerConnectionStatus, PeerInfo}; -use super::peer_sync_status::PeerSyncStatus; -use super::score::{Score, ScoreState}; -use crate::rpc::methods::MetaData; -use crate::Enr; -use crate::PeerId; use crate::{ + metrics, multiaddr::{Multiaddr, Protocol}, types::Subnet, + Enr, Gossipsub, PeerId, }; +use peer_info::{ConnectionDirection, PeerConnectionStatus, PeerInfo}; use rand::seq::SliceRandom; +use score::{PeerAction, ReportSource, Score, ScoreState}; use slog::{crit, debug, error, trace, warn}; -use std::collections::HashMap; +use std::cmp::Ordering; +use std::collections::{HashMap, HashSet}; use std::net::{IpAddr, SocketAddr}; use std::time::Instant; +use sync_status::SyncStatus; use types::EthSpec; +pub mod client; +pub mod peer_info; +pub mod score; +pub mod sync_status; + /// Max number of disconnected nodes to remember. const MAX_DC_PEERS: usize = 500; /// The maximum number of banned nodes to remember. const MAX_BANNED_PEERS: usize = 1000; /// We ban an IP if there are more than `BANNED_PEERS_PER_IP_THRESHOLD` banned peers with this IP. const BANNED_PEERS_PER_IP_THRESHOLD: usize = 5; +/// Relative factor of peers that are allowed to have a negative gossipsub score without penalizing +/// them in lighthouse. 
+const ALLOWED_NEGATIVE_GOSSIPSUB_FACTOR: f32 = 0.1; /// Storage of known peers, their reputation and information pub struct PeerDB { @@ -34,13 +42,1029 @@ pub struct PeerDB { log: slog::Logger, } +impl PeerDB { + pub fn new(trusted_peers: Vec, log: &slog::Logger) -> Self { + // Initialize the peers hashmap with trusted peers + let peers = trusted_peers + .into_iter() + .map(|peer_id| (peer_id, PeerInfo::trusted_peer_info())) + .collect(); + Self { + log: log.clone(), + disconnected_peers: 0, + banned_peers_count: BannedPeersCount::new(), + peers, + } + } + + /* Getters */ + + /// Gives the score of a peer, or default score if it is unknown. + pub fn score(&self, peer_id: &PeerId) -> f64 { + self.peers + .get(peer_id) + .map_or(&Score::default(), |info| info.score()) + .score() + } + + /// Returns an iterator over all peers in the db. + pub fn peers(&self) -> impl Iterator)> { + self.peers.iter() + } + + /// Gives the ids of all known peers. + pub fn peer_ids(&self) -> impl Iterator { + self.peers.keys() + } + + /// Returns a peer's info, if known. + pub fn peer_info(&self, peer_id: &PeerId) -> Option<&PeerInfo> { + self.peers.get(peer_id) + } + + /// Returns a mutable reference to a peer's info if known. + // VISIBILITY: The peer manager is able to modify some elements of the peer info, such as sync + // status. + pub(super) fn peer_info_mut(&mut self, peer_id: &PeerId) -> Option<&mut PeerInfo> { + self.peers.get_mut(peer_id) + } + + /// Returns if the peer is already connected. + pub fn is_connected(&self, peer_id: &PeerId) -> bool { + matches!( + self.connection_status(peer_id), + Some(PeerConnectionStatus::Connected { .. }) + ) + } + + /// If we are connected or currently dialing the peer returns true. + pub fn is_connected_or_dialing(&self, peer_id: &PeerId) -> bool { + matches!( + self.connection_status(peer_id), + Some(PeerConnectionStatus::Connected { .. }) + | Some(PeerConnectionStatus::Dialing { .. 
}) + ) + } + + /// If we are connected or in the process of disconnecting + pub fn is_connected_or_disconnecting(&self, peer_id: &PeerId) -> bool { + matches!( + self.connection_status(peer_id), + Some(PeerConnectionStatus::Connected { .. }) + | Some(PeerConnectionStatus::Disconnecting { .. }) + ) + } + + /// Returns true if the peer should be dialed. This checks the connection state and the + /// score state and determines if the peer manager should dial this peer. + pub fn should_dial(&self, peer_id: &PeerId) -> bool { + matches!( + self.connection_status(peer_id), + Some(PeerConnectionStatus::Disconnected { .. }) + | Some(PeerConnectionStatus::Unknown { .. }) + | None + ) && !self.score_state_banned_or_disconnected(peer_id) + } + + /// Returns true if the peer is synced at least to our current head. + pub fn is_synced(&self, peer_id: &PeerId) -> bool { + match self.peers.get(peer_id).map(|info| info.sync_status()) { + Some(SyncStatus::Synced { .. }) => true, + Some(_) => false, + None => false, + } + } + + /// Returns the current [`BanResult`] of the peer. This doesn't check the connection state, rather the + /// underlying score of the peer. A peer may be banned but still in the connected state + /// temporarily. + /// + /// This is used to determine if we should accept incoming connections or not. + pub fn ban_status(&self, peer_id: &PeerId) -> BanResult { + if let Some(peer) = self.peers.get(peer_id) { + match peer.score_state() { + ScoreState::Banned => BanResult::BadScore, + _ => { + if let Some(ip) = self.ip_is_banned(peer) { + BanResult::BannedIp(ip) + } else { + BanResult::NotBanned + } + } + } + } else { + BanResult::NotBanned + } + } + + /// Checks if the peer's known addresses are currently banned. + fn ip_is_banned(&self, peer: &PeerInfo) -> Option { + peer.seen_ip_addresses() + .find(|ip| self.banned_peers_count.ip_is_banned(ip)) + } + + /// Returns true if the IP is banned. 
+ pub fn is_ip_banned(&self, ip: &IpAddr) -> bool { + self.banned_peers_count.ip_is_banned(ip) + } + + /// Returns true if the Peer is either banned or in the disconnected state. + fn score_state_banned_or_disconnected(&self, peer_id: &PeerId) -> bool { + if let Some(peer) = self.peers.get(peer_id) { + match peer.score_state() { + ScoreState::Banned | ScoreState::Disconnected => true, + _ => self.ip_is_banned(peer).is_some(), + } + } else { + false + } + } + + /// Gives the ids and info of all known connected peers. + pub fn connected_peers(&self) -> impl Iterator)> { + self.peers.iter().filter(|(_, info)| info.is_connected()) + } + + /// Gives the ids of all known connected peers. + pub fn connected_peer_ids(&self) -> impl Iterator { + self.peers + .iter() + .filter(|(_, info)| info.is_connected()) + .map(|(peer_id, _)| peer_id) + } + + /// Connected or dialing peers + pub fn connected_or_dialing_peers(&self) -> impl Iterator { + self.peers + .iter() + .filter(|(_, info)| info.is_connected() || info.is_dialing()) + .map(|(peer_id, _)| peer_id) + } + + /// Connected outbound-only peers + pub fn connected_outbound_only_peers(&self) -> impl Iterator { + self.peers + .iter() + .filter(|(_, info)| info.is_outbound_only()) + .map(|(peer_id, _)| peer_id) + } + + /// Gives the `peer_id` of all known connected and synced peers. + pub fn synced_peers(&self) -> impl Iterator { + self.peers + .iter() + .filter(|(_, info)| { + if info.sync_status().is_synced() || info.sync_status().is_advanced() { + return info.is_connected(); + } + false + }) + .map(|(peer_id, _)| peer_id) + } + + /// Gives the `peer_id` of all known connected and advanced peers. + pub fn advanced_peers(&self) -> impl Iterator { + self.peers + .iter() + .filter(|(_, info)| { + if info.sync_status().is_advanced() { + return info.is_connected(); + } + false + }) + .map(|(peer_id, _)| peer_id) + } + + /// Gives an iterator of all peers on a given subnet. 
+ pub fn good_peers_on_subnet(&self, subnet: Subnet) -> impl Iterator { + self.peers + .iter() + .filter(move |(_, info)| { + // We check both the metadata and gossipsub data as we only want to count long-lived subscribed peers + info.is_connected() + && info.on_subnet_metadata(&subnet) + && info.on_subnet_gossipsub(&subnet) + && info.is_good_gossipsub_peer() + }) + .map(|(peer_id, _)| peer_id) + } + + /// Gives the ids of all known disconnected peers. + pub fn disconnected_peers(&self) -> impl Iterator { + self.peers + .iter() + .filter(|(_, info)| info.is_disconnected()) + .map(|(peer_id, _)| peer_id) + } + + /// Returns the ids of all known banned peers. + pub fn banned_peers(&self) -> impl Iterator { + self.peers + .iter() + .filter(|(_, info)| info.is_banned()) + .map(|(peer_id, _)| peer_id) + } + + /// Gives the ids of all known banned peers. + pub fn banned_peers_by_score(&self) -> impl Iterator { + self.peers + .iter() + .filter(|(_, info)| info.score_is_banned()) + .map(|(peer_id, _)| peer_id) + } + + /// Returns a vector of all connected peers sorted by score beginning with the worst scores. + /// Ties get broken randomly. 
+ pub fn worst_connected_peers(&self) -> Vec<(&PeerId, &PeerInfo)> { + let mut connected = self + .peers + .iter() + .filter(|(_, info)| info.is_connected()) + .collect::>(); + + connected.shuffle(&mut rand::thread_rng()); + connected.sort_by_key(|(_, info)| info.score()); + connected + } + + /// Returns a vector containing peers (their ids and info), sorted by + /// score from highest to lowest, and filtered using `is_status` + pub fn best_peers_by_status(&self, is_status: F) -> Vec<(&PeerId, &PeerInfo)> + where + F: Fn(&PeerInfo) -> bool, + { + let mut by_status = self + .peers + .iter() + .filter(|(_, info)| is_status(info)) + .collect::>(); + by_status.sort_by_key(|(_, info)| info.score()); + by_status.into_iter().rev().collect() + } + + /// Returns the peer with highest reputation that satisfies `is_status` + pub fn best_by_status(&self, is_status: F) -> Option<&PeerId> + where + F: Fn(&PeerInfo) -> bool, + { + self.peers + .iter() + .filter(|(_, info)| is_status(info)) + .max_by_key(|(_, info)| info.score()) + .map(|(id, _)| id) + } + + /// Returns the peer's connection status. Returns unknown if the peer is not in the DB. + pub fn connection_status(&self, peer_id: &PeerId) -> Option { + self.peer_info(peer_id) + .map(|info| info.connection_status().clone()) + } + + /* Mutability */ + + /// Allows the sync module to update sync status' of peers. Returns None, if the peer doesn't + /// exist and returns Some(bool) representing if the sync state was modified. + pub fn update_sync_status( + &mut self, + peer_id: &PeerId, + sync_status: SyncStatus, + ) -> Option { + let info = self.peers.get_mut(peer_id)?; + Some(info.update_sync_status(sync_status)) + } + + /// Updates the scores of known peers according to their connection status and the time that + /// has passed. This function returns a list of peers that have been unbanned. + /// NOTE: Peer scores cannot be penalized during the update, they can only increase. 
Therefore + /// it not possible to ban peers when updating scores. + #[must_use = "The unbanned peers must be sent to libp2p"] + pub(super) fn update_scores(&mut self) -> Vec<(PeerId, ScoreUpdateResult)> { + // Peer can be unbanned in this process. + // We return the result, such that the peer manager can inform the swarm to lift the libp2p + // ban on these peers. + let mut peers_to_unban = Vec::new(); + let mut result = Vec::new(); + + for (peer_id, info) in self.peers.iter_mut() { + let previous_state = info.score_state(); + // Update scores + info.score_update(); + + match Self::handle_score_transition(previous_state, peer_id, info, &self.log) { + // A peer should not be able to be banned from a score update. + ScoreTransitionResult::Banned => { + error!(self.log, "Peer has been banned in an update"; "peer_id" => %peer_id) + } + // A peer should not be able to transition to a disconnected state from a healthy + // state in a score update. + ScoreTransitionResult::Disconnected => { + error!(self.log, "Peer has been disconnected in an update"; "peer_id" => %peer_id) + } + ScoreTransitionResult::Unbanned => { + peers_to_unban.push(*peer_id); + } + ScoreTransitionResult::NoAction => {} + } + } + + // Update the state in the peerdb + for unbanned_peer in peers_to_unban { + self.update_connection_state(&unbanned_peer, NewConnectionState::Unbanned); + let seen_ip_addresses = self + .peers + .get(&unbanned_peer) + .map(|info| { + info.seen_ip_addresses() + .filter(|ip| !self.is_ip_banned(ip)) + .collect::>() + }) + .unwrap_or_default(); + result.push(( + unbanned_peer, + ScoreUpdateResult::Unbanned(seen_ip_addresses), + )); + } + // Return the list so that the peer manager can update libp2p + result + } + + /// Updates gossipsub scores for all peers. 
+ #[must_use = "Score updates need to be reported to libp2p"] + pub(super) fn update_gossipsub_scores( + &mut self, + target_peers: usize, + gossipsub: &Gossipsub, + ) -> Vec<(PeerId, ScoreUpdateResult)> { + let mut actions = Vec::new(); + let mut results = Vec::new(); + + let mut peers: Vec<_> = self + .peers + .iter_mut() + .filter(|(_peer_id, info)| info.is_connected()) + .filter_map(|(peer_id, info)| { + gossipsub + .peer_score(peer_id) + .map(|score| (peer_id, info, score)) + }) + .collect(); + + // sort descending by score + peers.sort_unstable_by(|(.., s1), (.., s2)| s2.partial_cmp(s1).unwrap_or(Ordering::Equal)); + + let mut to_ignore_negative_peers = + (target_peers as f32 * ALLOWED_NEGATIVE_GOSSIPSUB_FACTOR).ceil() as usize; + + for (peer_id, info, score) in peers { + let previous_state = info.score_state(); + info.update_gossipsub_score( + score, + if score < 0.0 && to_ignore_negative_peers > 0 { + to_ignore_negative_peers -= 1; + // We ignore the negative score for the best negative peers so that their + // gossipsub score can recover without getting disconnected. + true + } else { + false + }, + ); + + actions.push(( + *peer_id, + Self::handle_score_transition(previous_state, peer_id, info, &self.log), + )); + } + + for (peer_id, action) in actions { + let result = match action { + ScoreTransitionResult::Banned => { + // The peer was banned as a result of this action. 
+ self.update_connection_state(&peer_id, NewConnectionState::Banned) + .into() + } + ScoreTransitionResult::Disconnected => { + // The peer needs to be disconnected + + // Update the state + self.update_connection_state( + &peer_id, + NewConnectionState::Disconnecting { to_ban: false }, + ); + ScoreUpdateResult::Disconnect + } + ScoreTransitionResult::NoAction => ScoreUpdateResult::NoAction, + ScoreTransitionResult::Unbanned => { + self.update_connection_state(&peer_id, NewConnectionState::Unbanned); + let seen_ip_addresses = self + .peers + .get(&peer_id) + .map(|info| { + info.seen_ip_addresses() + .filter(|ip| !self.is_ip_banned(ip)) + .collect::>() + }) + .unwrap_or_default(); + + ScoreUpdateResult::Unbanned(seen_ip_addresses) + } + }; + + // Actions to be handled by the peer manager for each peer id + if !matches!(result, ScoreUpdateResult::NoAction) { + results.push((peer_id, result)); + } + } + results + } + + /// Reports a peer for some action. + /// + /// The action can only cause a negative effect. This can lead to disconnecting or banning a + /// specific peer. Therefore the result of this function returns if the peer needs to be banned + /// or disconnected. + /// + /// If the peer doesn't exist, log a warning and insert defaults. 
+ #[must_use = "Banned and disconnected peers need to be handled in libp2p"] + pub(super) fn report_peer( + &mut self, + peer_id: &PeerId, + action: PeerAction, + source: ReportSource, + ) -> ScoreUpdateResult { + match self.peers.get_mut(peer_id) { + Some(info) => { + let previous_state = info.score_state(); + info.apply_peer_action_to_score(action); + metrics::inc_counter_vec( + &metrics::PEER_ACTION_EVENTS_PER_CLIENT, + &[info.client().kind.as_ref(), action.as_ref(), source.into()], + ); + let result = + Self::handle_score_transition(previous_state, peer_id, info, &self.log); + if previous_state == info.score_state() { + debug!(self.log, "Peer score adjusted"; "peer_id" => %peer_id, "score" => %info.score()); + } + match result { + ScoreTransitionResult::Banned => { + // The peer was banned as a result of this action. + self.update_connection_state(peer_id, NewConnectionState::Banned) + .into() + } + ScoreTransitionResult::Disconnected => { + // The peer needs to be disconnected + + // Update the state + self.update_connection_state( + peer_id, + NewConnectionState::Disconnecting { to_ban: false }, + ); + ScoreUpdateResult::Disconnect + } + ScoreTransitionResult::NoAction => ScoreUpdateResult::NoAction, + ScoreTransitionResult::Unbanned => { + error!(self.log, "Report peer action lead to an unbanning"; "peer_id" => %peer_id); + ScoreUpdateResult::NoAction + } + } + } + None => { + debug!(self.log, "Reporting a peer that doesn't exist"; "peer_id" =>%peer_id); + ScoreUpdateResult::NoAction + } + } + } + + // Connection Status + + /// A peer is being dialed. 
+ // VISIBILITY: Only the peer manager can adjust the connection state + pub(super) fn dialing_peer(&mut self, peer_id: &PeerId, enr: Option) { + let info = self.peers.entry(*peer_id).or_default(); + if let Some(enr) = enr { + info.set_enr(enr); + } + + if let Err(e) = info.dialing_peer() { + error!(self.log, "{}", e; "peer_id" => %peer_id); + } + + // If the peer was banned, remove the banned peer and addresses. + if info.is_banned() { + self.banned_peers_count + .remove_banned_peer(info.seen_ip_addresses()); + } + + // If the peer was disconnected, reduce the disconnected peer count. + if info.is_disconnected() { + self.disconnected_peers = self.disconnected_peers().count().saturating_sub(1); + } + } + + /// Update min ttl of a peer. + // VISIBILITY: Only the peer manager can update the min_ttl + pub(super) fn update_min_ttl(&mut self, peer_id: &PeerId, min_ttl: Instant) { + let info = self.peers.entry(*peer_id).or_default(); + + // only update if the ttl is longer + if info.min_ttl().is_none() || Some(&min_ttl) > info.min_ttl() { + info.set_min_ttl(min_ttl); + + let min_ttl_secs = min_ttl + .checked_duration_since(Instant::now()) + .map(|duration| duration.as_secs()) + .unwrap_or_else(|| 0); + debug!(self.log, "Updating the time a peer is required for"; "peer_id" => %peer_id, "future_min_ttl_secs" => min_ttl_secs); + } + } + + /// Adds a gossipsub subscription to a peer in the peerdb. + // VISIBILITY: The behaviour is able to adjust subscriptions. + pub(crate) fn add_subscription(&mut self, peer_id: &PeerId, subnet: Subnet) { + if let Some(info) = self.peers.get_mut(peer_id) { + info.insert_subnet(subnet); + } + } + + /// Removes a gossipsub subscription to a peer in the peerdb. + // VISIBILITY: The behaviour is able to adjust subscriptions. 
+ pub(crate) fn remove_subscription(&mut self, peer_id: &PeerId, subnet: &Subnet) { + if let Some(info) = self.peers.get_mut(peer_id) { + info.remove_subnet(subnet); + } + } + + /// Extends the ttl of all peers on the given subnet that have a shorter + /// min_ttl than what's given. + // VISIBILITY: The behaviour is able to adjust subscriptions. + pub(crate) fn extend_peers_on_subnet(&mut self, subnet: &Subnet, min_ttl: Instant) { + let log = &self.log; + self.peers.iter_mut() + .filter(move |(_, info)| { + info.is_connected() && info.on_subnet_metadata(subnet) && info.on_subnet_gossipsub(subnet) + }) + .for_each(|(peer_id,info)| { + if info.min_ttl().is_none() || Some(&min_ttl) > info.min_ttl() { + info.set_min_ttl(min_ttl); + } + let min_ttl_secs = min_ttl + .checked_duration_since(Instant::now()) + .map(|duration| duration.as_secs()) + .unwrap_or_else(|| 0); + trace!(log, "Updating minimum duration a peer is required for"; "peer_id" => %peer_id, "min_ttl" => min_ttl_secs); + }); + } + + /// Sets a peer as connected with an ingoing connection. + // VISIBILITY: Only the peer manager can adjust the connection state. + pub(super) fn connect_ingoing( + &mut self, + peer_id: &PeerId, + seen_address: Multiaddr, + enr: Option, + ) { + self.update_connection_state( + peer_id, + NewConnectionState::Connected { + enr, + seen_address, + direction: ConnectionDirection::Incoming, + }, + ); + } + + /// Sets a peer as connected with an outgoing connection. + // VISIBILITY: Only the peer manager can adjust the connection state. + pub(super) fn connect_outgoing( + &mut self, + peer_id: &PeerId, + seen_address: Multiaddr, + enr: Option, + ) { + self.update_connection_state( + peer_id, + NewConnectionState::Connected { + enr, + seen_address, + direction: ConnectionDirection::Outgoing, + }, + ); + } + + /// The connection state of the peer has been changed. Modify the peer in the db to ensure all + /// variables are in sync with libp2p. 
+ /// Updating the state can lead to a `BanOperation` which needs to be processed via the peer + /// manager and should be handled in the peer manager. + // NOTE: This function is vital in keeping the connection state, and thus the peerdb size in + // check and up to date with libp2p. + fn update_connection_state( + &mut self, + peer_id: &PeerId, + new_state: NewConnectionState, + ) -> Option { + let log_ref = &self.log; + let info = self.peers.entry(*peer_id).or_insert_with(|| { + // If we are not creating a new connection (or dropping a current inbound connection) log a warning indicating we are updating a + // connection state for an unknown peer. + if !matches!( + new_state, + NewConnectionState::Connected { .. } | NewConnectionState::Disconnecting { .. } + ) { + warn!(log_ref, "Updating state of unknown peer"; + "peer_id" => %peer_id, "new_state" => ?new_state); + } + PeerInfo::default() + }); + + // Ban the peer if the score is not already low enough. + if matches!(new_state, NewConnectionState::Banned) { + match info.score_state() { + ScoreState::Banned => {} + _ => { + // If score isn't low enough to ban, this function has been called incorrectly. + error!(self.log, "Banning a peer with a good score"; "peer_id" => %peer_id); + info.apply_peer_action_to_score(score::PeerAction::Fatal); + } + } + } + + // Handle all the possible state changes + match (info.connection_status().clone(), new_state) { + /* Handle the transition to a connected state */ + ( + current_state, + NewConnectionState::Connected { + enr, + direction, + seen_address, + }, + ) => { + // Update the ENR if one exists + if let Some(enr) = enr { + info.set_enr(enr); + } + + match current_state { + PeerConnectionStatus::Disconnected { .. } => { + self.disconnected_peers = self.disconnected_peers.saturating_sub(1); + } + PeerConnectionStatus::Banned { .. 
} => { + error!(self.log, "Accepted a connection from a banned peer"; "peer_id" => %peer_id); + self.banned_peers_count + .remove_banned_peer(info.seen_ip_addresses()); + } + PeerConnectionStatus::Disconnecting { .. } => { + warn!(self.log, "Connected to a disconnecting peer"; "peer_id" => %peer_id) + } + PeerConnectionStatus::Unknown + | PeerConnectionStatus::Connected { .. } + | PeerConnectionStatus::Dialing { .. } => {} + } + + // Add the seen ip address and port to the peer's info + let socket_addr = match seen_address.iter().fold( + (None, None), + |(found_ip, found_port), protocol| match protocol { + Protocol::Ip4(ip) => (Some(ip.into()), found_port), + Protocol::Ip6(ip) => (Some(ip.into()), found_port), + Protocol::Tcp(port) => (found_ip, Some(port)), + _ => (found_ip, found_port), + }, + ) { + (Some(ip), Some(port)) => Some(SocketAddr::new(ip, port)), + (Some(_ip), None) => { + crit!(self.log, "Connected peer has an IP but no TCP port"; "peer_id" => %peer_id); + None + } + _ => None, + }; + + // Update the connection state + match direction { + ConnectionDirection::Incoming => info.connect_ingoing(socket_addr), + ConnectionDirection::Outgoing => info.connect_outgoing(socket_addr), + } + } + + /* Handle the transition to the disconnected state */ + (old_state, NewConnectionState::Disconnected) => { + // Remove all subnets for disconnected peers. + info.clear_subnets(); + + match old_state { + PeerConnectionStatus::Banned { .. } => {} + PeerConnectionStatus::Disconnected { .. } => {} + PeerConnectionStatus::Disconnecting { to_ban } if to_ban => { + // Update the status. 
+ info.set_connection_status(PeerConnectionStatus::Banned { + since: Instant::now(), + }); + self.banned_peers_count + .add_banned_peer(info.seen_ip_addresses()); + let known_banned_ips = self.banned_peers_count.banned_ips(); + let banned_ips = info + .seen_ip_addresses() + .filter(|ip| known_banned_ips.contains(ip)) + .collect::>(); + self.shrink_to_fit(); + return Some(BanOperation::ReadyToBan(banned_ips)); + } + PeerConnectionStatus::Disconnecting { .. } + | PeerConnectionStatus::Unknown + | PeerConnectionStatus::Connected { .. } + | PeerConnectionStatus::Dialing { .. } => { + self.disconnected_peers += 1; + info.set_connection_status(PeerConnectionStatus::Disconnected { + since: Instant::now(), + }); + self.shrink_to_fit(); + } + } + } + + /* Handle the transition to the disconnecting state */ + ( + PeerConnectionStatus::Banned { .. } | PeerConnectionStatus::Disconnected { .. }, + NewConnectionState::Disconnecting { to_ban }, + ) => { + error!(self.log, "Disconnecting from an already disconnected peer"; "peer_id" => %peer_id); + info.set_connection_status(PeerConnectionStatus::Disconnecting { to_ban }); + } + (_, NewConnectionState::Disconnecting { to_ban }) => { + // We overwrite all states and set this peer to be disconnecting. + info.set_connection_status(PeerConnectionStatus::Disconnecting { to_ban }); + } + + /* Handle transitioning to the banned state */ + (PeerConnectionStatus::Disconnected { .. }, NewConnectionState::Banned) => { + // It is possible to ban a peer that is currently disconnected. This can occur when + // there are many events that score it poorly and are processed after it has disconnected. 
+ info.set_connection_status(PeerConnectionStatus::Banned { + since: Instant::now(), + }); + self.banned_peers_count + .add_banned_peer(info.seen_ip_addresses()); + self.disconnected_peers = self.disconnected_peers.saturating_sub(1); + let known_banned_ips = self.banned_peers_count.banned_ips(); + let banned_ips = info + .seen_ip_addresses() + .filter(|ip| known_banned_ips.contains(ip)) + .collect::>(); + self.shrink_to_fit(); + return Some(BanOperation::ReadyToBan(banned_ips)); + } + (PeerConnectionStatus::Disconnecting { .. }, NewConnectionState::Banned) => { + // NOTE: This can occur due a rapid downscore of a peer. It goes through the + // disconnection phase and straight into banning in a short time-frame. + debug!(log_ref, "Banning peer that is currently disconnecting"; "peer_id" => %peer_id); + // Ban the peer once the disconnection process completes. + info.set_connection_status(PeerConnectionStatus::Disconnecting { to_ban: true }); + return Some(BanOperation::PeerDisconnecting); + } + (PeerConnectionStatus::Banned { .. }, NewConnectionState::Banned) => { + error!(log_ref, "Banning already banned peer"; "peer_id" => %peer_id); + let known_banned_ips = self.banned_peers_count.banned_ips(); + let banned_ips = info + .seen_ip_addresses() + .filter(|ip| known_banned_ips.contains(ip)) + .collect::>(); + return Some(BanOperation::ReadyToBan(banned_ips)); + } + ( + PeerConnectionStatus::Connected { .. } | PeerConnectionStatus::Dialing { .. 
}, + NewConnectionState::Banned, + ) => { + // update the state + info.set_connection_status(PeerConnectionStatus::Disconnecting { to_ban: true }); + return Some(BanOperation::DisconnectThePeer); + } + (PeerConnectionStatus::Unknown, NewConnectionState::Banned) => { + // shift the peer straight to banned + warn!(log_ref, "Banning a peer of unknown connection state"; "peer_id" => %peer_id); + self.banned_peers_count + .add_banned_peer(info.seen_ip_addresses()); + info.set_connection_status(PeerConnectionStatus::Banned { + since: Instant::now(), + }); + let known_banned_ips = self.banned_peers_count.banned_ips(); + let banned_ips = info + .seen_ip_addresses() + .filter(|ip| known_banned_ips.contains(ip)) + .collect::>(); + self.shrink_to_fit(); + return Some(BanOperation::ReadyToBan(banned_ips)); + } + + /* Handle the connection state of unbanning a peer */ + (old_state, NewConnectionState::Unbanned) => { + if matches!(info.score_state(), ScoreState::Banned) { + error!(self.log, "Unbanning a banned peer"; "peer_id" => %peer_id); + } + match old_state { + PeerConnectionStatus::Unknown | PeerConnectionStatus::Connected { .. } => { + error!(self.log, "Unbanning a connected peer"; "peer_id" => %peer_id); + } + PeerConnectionStatus::Disconnected { .. } + | PeerConnectionStatus::Disconnecting { .. } => { + debug!(self.log, "Unbanning disconnected or disconnecting peer"; "peer_id" => %peer_id); + } // These are odd but fine. + PeerConnectionStatus::Dialing { .. } => {} // Also odd but acceptable + PeerConnectionStatus::Banned { since } => { + info.set_connection_status(PeerConnectionStatus::Disconnected { since }); + + // Increment the disconnected count and reduce the banned count + self.banned_peers_count + .remove_banned_peer(info.seen_ip_addresses()); + self.disconnected_peers = + self.disconnected_peers().count().saturating_add(1); + self.shrink_to_fit(); + } + } + } + } + None + } + + /// Sets the peer as disconnected. A banned peer remains banned. 
If the node has become banned, + /// this returns true, otherwise this is false. + // VISIBILITY: Only the peer manager can adjust the connection state. + pub(super) fn inject_disconnect(&mut self, peer_id: &PeerId) -> Option { + self.update_connection_state(peer_id, NewConnectionState::Disconnected) + } + + /// The peer manager has notified us that the peer is undergoing a normal disconnect. Optionally tag + /// the peer to be banned after the disconnect. + // VISIBILITY: Only the peer manager can adjust the connection state. + pub(super) fn notify_disconnecting(&mut self, peer_id: &PeerId, to_ban: bool) { + self.update_connection_state(peer_id, NewConnectionState::Disconnecting { to_ban }); + } + + /// Removes banned and disconnected peers from the DB if we have reached any of our limits. + /// Drops the peers with the lowest reputation so that the number of + /// disconnected peers is less than MAX_DC_PEERS + fn shrink_to_fit(&mut self) { + // Remove excess banned peers + while self.banned_peers_count.banned_peers() > MAX_BANNED_PEERS { + if let Some(to_drop) = if let Some((id, info, _)) = self + .peers + .iter() + .filter_map(|(id, info)| match info.connection_status() { + PeerConnectionStatus::Banned { since } => Some((id, info, since)), + _ => None, + }) + .min_by_key(|(_, _, since)| *since) + { + self.banned_peers_count + .remove_banned_peer(info.seen_ip_addresses()); + Some(*id) + } else { + // If there is no minimum, this is a coding error. + crit!( + self.log, + "banned_peers > MAX_BANNED_PEERS despite no banned peers in db!" 
+ ); + // reset banned_peers this will also exit the loop + self.banned_peers_count = BannedPeersCount::new(); + None + } { + debug!(self.log, "Removing old banned peer"; "peer_id" => %to_drop); + self.peers.remove(&to_drop); + } + } + + // Remove excess disconnected peers + while self.disconnected_peers > MAX_DC_PEERS { + if let Some(to_drop) = self + .peers + .iter() + .filter(|(_, info)| info.is_disconnected()) + .filter_map(|(id, info)| match info.connection_status() { + PeerConnectionStatus::Disconnected { since } => Some((id, since)), + _ => None, + }) + .min_by_key(|(_, age)| *age) + .map(|(id, _)| *id) + { + debug!(self.log, "Removing old disconnected peer"; "peer_id" => %to_drop, "disconnected_size" => self.disconnected_peers.saturating_sub(1)); + self.peers.remove(&to_drop); + } + // If there is no minimum, this is a coding error. For safety we decrease + // the count to avoid a potential infinite loop. + self.disconnected_peers = self.disconnected_peers.saturating_sub(1); + } + } + + /// This handles score transitions between states. It transitions peers states from + /// disconnected/banned/connected. 
+ fn handle_score_transition( + previous_state: ScoreState, + peer_id: &PeerId, + info: &PeerInfo, + log: &slog::Logger, + ) -> ScoreTransitionResult { + match (info.score_state(), previous_state) { + (ScoreState::Banned, ScoreState::Healthy | ScoreState::Disconnected) => { + debug!(log, "Peer has been banned"; "peer_id" => %peer_id, "score" => %info.score()); + ScoreTransitionResult::Banned + } + (ScoreState::Disconnected, ScoreState::Banned | ScoreState::Healthy) => { + debug!(log, "Peer transitioned to disconnect state"; "peer_id" => %peer_id, "score" => %info.score(), "past_state" => %previous_state); + // disconnect the peer if it's currently connected or dialing + if info.is_connected_or_dialing() { + ScoreTransitionResult::Disconnected + } else if previous_state == ScoreState::Banned { + ScoreTransitionResult::Unbanned + } else { + // The peer was healthy, but is already disconnected, so there is no action to + // take. + ScoreTransitionResult::NoAction + } + } + (ScoreState::Healthy, ScoreState::Disconnected) => { + debug!(log, "Peer transitioned to healthy state"; "peer_id" => %peer_id, "score" => %info.score(), "past_state" => %previous_state); + ScoreTransitionResult::NoAction + } + (ScoreState::Healthy, ScoreState::Banned) => { + debug!(log, "Peer transitioned to healthy state"; "peer_id" => %peer_id, "score" => %info.score(), "past_state" => %previous_state); + // unban the peer if it was previously banned. + ScoreTransitionResult::Unbanned + } + // Explicitly ignore states that haven't transitioned. + (ScoreState::Healthy, ScoreState::Healthy) => ScoreTransitionResult::NoAction, + (ScoreState::Disconnected, ScoreState::Disconnected) => ScoreTransitionResult::NoAction, + + (ScoreState::Banned, ScoreState::Banned) => ScoreTransitionResult::NoAction, + } + } +} + +/// Internal enum for managing connection state transitions. +#[derive(Debug)] +enum NewConnectionState { + /// A peer has connected to us. 
+ Connected { + /// An optional known ENR if the peer was dialed. + enr: Option, + /// The seen socket address associated with the connection. + seen_address: Multiaddr, + /// The direction, incoming/outgoing. + direction: ConnectionDirection, + }, + /// The peer is in the process of being disconnected. + Disconnecting { + /// Whether the peer should be banned after the disconnect occurs. + to_ban: bool, + }, + /// The peer has been disconnected from our local node. + Disconnected, + /// The peer has been banned and actions to shift the peer to the banned state should be + /// undertaken + Banned, + /// The peer has been unbanned and the connection state should be updated to reflect this. + Unbanned, +} + +/// The result of applying a score transition to a peer. +enum ScoreTransitionResult { + /// The peer has become disconnected. + Disconnected, + /// The peer has been banned. + Banned, + /// The peer has been unbanned. + Unbanned, + /// No state change occurred. + NoAction, +} + +/// The type of results that can happen from executing the `report_peer` function. +pub enum ScoreUpdateResult { + /// The reported peer must be banned. + Ban(BanOperation), + /// The reported peer transitioned to the disconnected state and must be disconnected. + Disconnect, + /// The peer has been unbanned and this needs to be propagated to libp2p. The list of unbanned + /// IP addresses are sent along with it. + Unbanned(Vec), + /// The report requires no further action. + NoAction, +} + +impl From> for ScoreUpdateResult { + fn from(ban_operation: Option) -> Self { + match ban_operation { + None => ScoreUpdateResult::NoAction, + Some(bo) => ScoreUpdateResult::Ban(bo), + } + } +} + /// When attempting to ban a peer provides the peer manager with the operation that must be taken. pub enum BanOperation { // The peer is currently connected. Perform a graceful disconnect before banning at the swarm // level. 
DisconnectThePeer, - // The peer is disconnected, it has now been banned and can be banned at the swarm level. - ReadyToBan, + // The peer is disconnected, it has now been banned and can be banned at the swarm level. It + // stores a collection of banned IP addresses to inform the swarm. + ReadyToBan(Vec), // The peer is currently being disconnected, nothing to do. PeerDisconnecting, } @@ -93,6 +1117,14 @@ impl BannedPeersCount { self.banned_peers } + pub fn banned_ips(&self) -> HashSet { + self.banned_peers_per_ip + .iter() + .filter(|(_ip, count)| **count > BANNED_PEERS_PER_IP_THRESHOLD) + .map(|(ip, _count)| *ip) + .collect() + } + /// An IP is considered banned if more than BANNED_PEERS_PER_IP_THRESHOLD banned peers /// exist with this IP pub fn ip_is_banned(&self, ip: &IpAddr) -> bool { @@ -109,581 +1141,6 @@ impl BannedPeersCount { } } -impl PeerDB { - pub fn new(trusted_peers: Vec, log: &slog::Logger) -> Self { - // Initialize the peers hashmap with trusted peers - let peers = trusted_peers - .into_iter() - .map(|peer_id| (peer_id, PeerInfo::trusted_peer_info())) - .collect(); - Self { - log: log.clone(), - disconnected_peers: 0, - banned_peers_count: BannedPeersCount::new(), - peers, - } - } - - /* Getters */ - - /// Gives the score of a peer, or default score if it is unknown. - pub fn score(&self, peer_id: &PeerId) -> f64 { - self.peers - .get(peer_id) - .map_or(&Score::default(), |info| info.score()) - .score() - } - - /// Returns an iterator over all peers in the db. - pub fn peers(&self) -> impl Iterator)> { - self.peers.iter() - } - - /// Returns an iterator over all peers in the db. - pub(super) fn peers_mut(&mut self) -> impl Iterator)> { - self.peers.iter_mut() - } - - /// Gives the ids of all known peers. - pub fn peer_ids(&self) -> impl Iterator { - self.peers.keys() - } - - /// Returns a peer's info, if known. 
- pub fn peer_info(&self, peer_id: &PeerId) -> Option<&PeerInfo> { - self.peers.get(peer_id) - } - - /// Returns a mutable reference to a peer's info if known. - pub fn peer_info_mut(&mut self, peer_id: &PeerId) -> Option<&mut PeerInfo> { - self.peers.get_mut(peer_id) - } - - /// Returns if the peer is already connected. - pub fn is_connected(&self, peer_id: &PeerId) -> bool { - matches!( - self.connection_status(peer_id), - Some(PeerConnectionStatus::Connected { .. }) - ) - } - - /// If we are connected or currently dialing the peer returns true. - pub fn is_connected_or_dialing(&self, peer_id: &PeerId) -> bool { - matches!( - self.connection_status(peer_id), - Some(PeerConnectionStatus::Connected { .. }) - | Some(PeerConnectionStatus::Dialing { .. }) - ) - } - - /// If we are connected or in the process of disconnecting - pub fn is_connected_or_disconnecting(&self, peer_id: &PeerId) -> bool { - matches!( - self.connection_status(peer_id), - Some(PeerConnectionStatus::Connected { .. }) - | Some(PeerConnectionStatus::Disconnecting { .. }) - ) - } - - /// Returns true if the peer should be dialed. This checks the connection state and the - /// score state and determines if the peer manager should dial this peer. - pub fn should_dial(&self, peer_id: &PeerId) -> bool { - matches!( - self.connection_status(peer_id), - Some(PeerConnectionStatus::Disconnected { .. }) - | Some(PeerConnectionStatus::Unknown { .. }) - | None - ) && !self.is_banned_or_disconnected(peer_id) - } - - /// Returns true if the peer is synced at least to our current head. - pub fn is_synced(&self, peer_id: &PeerId) -> bool { - match self.peers.get(peer_id).map(|info| &info.sync_status) { - Some(PeerSyncStatus::Synced { .. }) => true, - Some(_) => false, - None => false, - } - } - - /// Returns the current [`BanResult`] of the peer. This doesn't check the connection state, rather the - /// underlying score of the peer. A peer may be banned but still in the connected state - /// temporarily. 
- /// - /// This is used to determine if we should accept incoming connections or not. - pub fn ban_status(&self, peer_id: &PeerId) -> BanResult { - if let Some(peer) = self.peers.get(peer_id) { - match peer.score_state() { - ScoreState::Banned => BanResult::BadScore, - _ => { - if let Some(ip) = self.ip_is_banned(peer) { - BanResult::BannedIp(ip) - } else { - BanResult::NotBanned - } - } - } - } else { - BanResult::NotBanned - } - } - - /// Checks if the peer's known addresses are currently banned. - fn ip_is_banned(&self, peer: &PeerInfo) -> Option { - peer.seen_addresses() - .find(|ip| self.banned_peers_count.ip_is_banned(ip)) - } - - /// Returns true if the IP is banned. - pub fn is_ip_banned(&self, ip: &IpAddr) -> bool { - self.banned_peers_count.ip_is_banned(ip) - } - - /// Returns true if the Peer is either banned or in the disconnected state. - fn is_banned_or_disconnected(&self, peer_id: &PeerId) -> bool { - if let Some(peer) = self.peers.get(peer_id) { - match peer.score_state() { - ScoreState::Banned | ScoreState::Disconnected => true, - _ => self.ip_is_banned(peer).is_some(), - } - } else { - false - } - } - - /// Gives the ids and info of all known connected peers. - pub fn connected_peers(&self) -> impl Iterator)> { - self.peers.iter().filter(|(_, info)| info.is_connected()) - } - - /// Gives the ids of all known connected peers. 
- pub fn connected_peer_ids(&self) -> impl Iterator { - self.peers - .iter() - .filter(|(_, info)| info.is_connected()) - .map(|(peer_id, _)| peer_id) - } - - /// Connected or dialing peers - pub fn connected_or_dialing_peers(&self) -> impl Iterator { - self.peers - .iter() - .filter(|(_, info)| info.is_connected() || info.is_dialing()) - .map(|(peer_id, _)| peer_id) - } - - /// Connected outbound-only peers - pub fn connected_outbound_only_peers(&self) -> impl Iterator { - self.peers - .iter() - .filter(|(_, info)| info.is_outbound_only()) - .map(|(peer_id, _)| peer_id) - } - - /// Gives the `peer_id` of all known connected and synced peers. - pub fn synced_peers(&self) -> impl Iterator { - self.peers - .iter() - .filter(|(_, info)| { - if info.sync_status.is_synced() || info.sync_status.is_advanced() { - return info.is_connected(); - } - false - }) - .map(|(peer_id, _)| peer_id) - } - - /// Gives the `peer_id` of all known connected and advanced peers. - pub fn advanced_peers(&self) -> impl Iterator { - self.peers - .iter() - .filter(|(_, info)| { - if info.sync_status.is_advanced() { - return info.is_connected(); - } - false - }) - .map(|(peer_id, _)| peer_id) - } - - /// Gives an iterator of all peers on a given subnet. - pub fn good_peers_on_subnet(&self, subnet: Subnet) -> impl Iterator { - self.peers - .iter() - .filter(move |(_, info)| { - // We check both the metadata and gossipsub data as we only want to count long-lived subscribed peers - info.is_connected() - && info.on_subnet_metadata(&subnet) - && info.on_subnet_gossipsub(&subnet) - && info.is_good_gossipsub_peer() - }) - .map(|(peer_id, _)| peer_id) - } - - /// Gives the ids of all known disconnected peers. - pub fn disconnected_peers(&self) -> impl Iterator { - self.peers - .iter() - .filter(|(_, info)| info.is_disconnected()) - .map(|(peer_id, _)| peer_id) - } - - /// Gives the ids of all known banned peers. 
- pub fn banned_peers(&self) -> impl Iterator { - self.peers - .iter() - .filter(|(_, info)| info.is_banned()) - .map(|(peer_id, _)| peer_id) - } - - /// Returns a vector of all connected peers sorted by score beginning with the worst scores. - /// Ties get broken randomly. - pub fn worst_connected_peers(&self) -> Vec<(&PeerId, &PeerInfo)> { - let mut connected = self - .peers - .iter() - .filter(|(_, info)| info.is_connected()) - .collect::>(); - - connected.shuffle(&mut rand::thread_rng()); - connected.sort_by_key(|(_, info)| info.score()); - connected - } - - /// Returns a vector containing peers (their ids and info), sorted by - /// score from highest to lowest, and filtered using `is_status` - pub fn best_peers_by_status(&self, is_status: F) -> Vec<(&PeerId, &PeerInfo)> - where - F: Fn(&PeerInfo) -> bool, - { - let mut by_status = self - .peers - .iter() - .filter(|(_, info)| is_status(info)) - .collect::>(); - by_status.sort_by_key(|(_, info)| info.score()); - by_status.into_iter().rev().collect() - } - - /// Returns the peer with highest reputation that satisfies `is_status` - pub fn best_by_status(&self, is_status: F) -> Option<&PeerId> - where - F: Fn(&PeerInfo) -> bool, - { - self.peers - .iter() - .filter(|(_, info)| is_status(info)) - .max_by_key(|(_, info)| info.score()) - .map(|(id, _)| id) - } - - /// Returns the peer's connection status. Returns unknown if the peer is not in the DB. - pub fn connection_status(&self, peer_id: &PeerId) -> Option { - self.peer_info(peer_id) - .map(|info| info.connection_status().clone()) - } - - /* Setters */ - - /// A peer is being dialed. 
- pub fn dialing_peer(&mut self, peer_id: &PeerId, enr: Option) { - let info = self.peers.entry(*peer_id).or_default(); - info.enr = enr; - - if info.is_disconnected() { - self.disconnected_peers = self.disconnected_peers.saturating_sub(1); - } - - if info.is_banned() { - self.banned_peers_count - .remove_banned_peer(info.seen_addresses()); - } - - if let Err(e) = info.dialing_peer() { - error!(self.log, "{}", e; "peer_id" => %peer_id); - } - } - - /// Update min ttl of a peer. - pub fn update_min_ttl(&mut self, peer_id: &PeerId, min_ttl: Instant) { - let info = self.peers.entry(*peer_id).or_default(); - - // only update if the ttl is longer - if info.min_ttl.is_none() || Some(min_ttl) > info.min_ttl { - info.min_ttl = Some(min_ttl); - - let min_ttl_secs = min_ttl - .checked_duration_since(Instant::now()) - .map(|duration| duration.as_secs()) - .unwrap_or_else(|| 0); - debug!(self.log, "Updating the time a peer is required for"; "peer_id" => %peer_id, "future_min_ttl_secs" => min_ttl_secs); - } - } - - /// Extends the ttl of all peers on the given subnet that have a shorter - /// min_ttl than what's given. 
- pub fn extend_peers_on_subnet(&mut self, subnet: &Subnet, min_ttl: Instant) { - let log = &self.log; - self.peers.iter_mut() - .filter(move |(_, info)| { - info.is_connected() && info.on_subnet_metadata(subnet) && info.on_subnet_gossipsub(subnet) - }) - .for_each(|(peer_id,info)| { - if info.min_ttl.is_none() || Some(min_ttl) > info.min_ttl { - info.min_ttl = Some(min_ttl); - } - let min_ttl_secs = min_ttl - .checked_duration_since(Instant::now()) - .map(|duration| duration.as_secs()) - .unwrap_or_else(|| 0); - trace!(log, "Updating minimum duration a peer is required for"; "peer_id" => %peer_id, "min_ttl" => min_ttl_secs); - }); - } - - fn connect( - &mut self, - peer_id: &PeerId, - multiaddr: Multiaddr, - enr: Option, - direction: ConnectionDirection, - ) { - let info = self.peers.entry(*peer_id).or_default(); - info.enr = enr; - - if info.is_disconnected() { - self.disconnected_peers = self.disconnected_peers.saturating_sub(1); - } - - if info.is_banned() { - error!(self.log, "Accepted a connection from a banned peer"; "peer_id" => %peer_id); - self.banned_peers_count - .remove_banned_peer(info.seen_addresses()); - } - - // Add the seen ip address and port to the peer's info - let socket_addr = match multiaddr.iter().fold( - (None, None), - |(found_ip, found_port), protocol| match protocol { - Protocol::Ip4(ip) => (Some(ip.into()), found_port), - Protocol::Ip6(ip) => (Some(ip.into()), found_port), - Protocol::Tcp(port) => (found_ip, Some(port)), - _ => (found_ip, found_port), - }, - ) { - (Some(ip), Some(port)) => Some(SocketAddr::new(ip, port)), - (Some(_ip), None) => { - crit!(self.log, "Connected peer has an IP but no TCP port"; "peer_id" => %peer_id); - None - } - _ => None, - }; - - match direction { - ConnectionDirection::Incoming => info.connect_ingoing(socket_addr), - ConnectionDirection::Outgoing => info.connect_outgoing(socket_addr), - } - } - /// Sets a peer as connected with an ingoing connection. 
- pub fn connect_ingoing(&mut self, peer_id: &PeerId, multiaddr: Multiaddr, enr: Option) { - self.connect(peer_id, multiaddr, enr, ConnectionDirection::Incoming) - } - - /// Sets a peer as connected with an outgoing connection. - pub fn connect_outgoing(&mut self, peer_id: &PeerId, multiaddr: Multiaddr, enr: Option) { - self.connect(peer_id, multiaddr, enr, ConnectionDirection::Outgoing) - } - - /// Sets the peer as disconnected. A banned peer remains banned. If the node has become banned, - /// this returns true, otherwise this is false. - pub fn inject_disconnect(&mut self, peer_id: &PeerId) -> bool { - // Note that it could be the case we prevent new nodes from joining. In this instance, - // we don't bother tracking the new node. - if let Some(info) = self.peers.get_mut(peer_id) { - if !matches!( - info.connection_status(), - PeerConnectionStatus::Disconnected { .. } | PeerConnectionStatus::Banned { .. } - ) { - self.disconnected_peers += 1; - } - let result = info.notify_disconnect().unwrap_or(false); - self.shrink_to_fit(); - result - } else { - false - } - } - - /// Notifies the peer manager that the peer is undergoing a normal disconnect. Optionally tag - /// the peer to be banned after the disconnect. - pub fn notify_disconnecting(&mut self, peer_id: PeerId, to_ban_afterwards: bool) { - let peer_info = self.peers.entry(peer_id).or_default(); - - if matches!( - peer_info.connection_status(), - PeerConnectionStatus::Disconnected { .. } - ) { - self.disconnected_peers = self.disconnected_peers.saturating_sub(1); - } - peer_info.disconnecting(to_ban_afterwards); - } - - /// Marks a peer to be disconnected and then banned. - /// Returns true if the peer is currently connected and false otherwise. - // NOTE: If the peer's score is not already low enough to be banned, this will decrease the - // peer's score to be a banned state. 
- pub fn disconnect_and_ban(&mut self, peer_id: &PeerId) -> BanOperation { - let log_ref = &self.log; - let info = self.peers.entry(*peer_id).or_insert_with(|| { - warn!(log_ref, "Banning unknown peer"; - "peer_id" => %peer_id); - PeerInfo::default() - }); - - // Ban the peer if the score is not already low enough. - match info.score_state() { - ScoreState::Banned => {} - _ => { - // If score isn't low enough to ban, this function has been called incorrectly. - error!(self.log, "Banning a peer with a good score"; "peer_id" => %peer_id); - info.apply_peer_action_to_score(super::score::PeerAction::Fatal); - } - } - - // Check and verify all the connection states - match info.connection_status() { - PeerConnectionStatus::Disconnected { .. } => { - // It is possible to ban a peer that has a disconnected score, if there are many - // events that score it poorly and are processed after it has disconnected. - self.disconnected_peers = self.disconnected_peers.saturating_sub(1); - info.update_state(); - self.banned_peers_count - .add_banned_peer(info.seen_addresses()); - self.shrink_to_fit(); - BanOperation::ReadyToBan - } - PeerConnectionStatus::Disconnecting { .. } => { - // NOTE: This can occur due a rapid downscore of a peer. It goes through the - // disconnection phase and straight into banning in a short time-frame. - debug!(log_ref, "Banning peer that is currently disconnecting"; "peer_id" => %peer_id); - info.disconnecting(true); - BanOperation::PeerDisconnecting - } - PeerConnectionStatus::Banned { .. } => { - error!(log_ref, "Banning already banned peer"; "peer_id" => %peer_id); - BanOperation::ReadyToBan - } - PeerConnectionStatus::Connected { .. } | PeerConnectionStatus::Dialing { .. 
} => { - // update the state - info.disconnecting(true); - BanOperation::DisconnectThePeer - } - PeerConnectionStatus::Unknown => { - // shift the peer straight to banned - warn!(log_ref, "Banning a peer of unknown connection state"; "peer_id" => %peer_id); - self.banned_peers_count - .add_banned_peer(info.seen_addresses()); - info.update_state(); - self.shrink_to_fit(); - BanOperation::ReadyToBan - } - } - } - - /// Unbans a peer. - /// - /// This should only be called once a peer's score is no longer banned. - /// If this is called for a banned peer, it will error. - /// - /// This updates the connection state of the peer and updates the number of banned peers in the - /// peerdb. - pub fn unban(&mut self, peer_id: &PeerId) -> Result<(), &'static str> { - let log_ref = &self.log; - let info = self.peers.entry(*peer_id).or_insert_with(|| { - warn!(log_ref, "UnBanning unknown peer"; - "peer_id" => %peer_id); - PeerInfo::default() - }); - - if let ScoreState::Banned = info.score_state() { - return Err("Attempted to unban (connection status) a banned peer"); - } - - self.banned_peers_count - .remove_banned_peer(info.seen_addresses()); - - // Update the connection state - info.update_state(); - - // This transitions a banned peer to a disconnected peer - self.disconnected_peers = self.disconnected_peers().count().saturating_add(1); - self.shrink_to_fit(); - Ok(()) - } - - /// Removes banned and disconnected peers from the DB if we have reached any of our limits. 
- /// Drops the peers with the lowest reputation so that the number of - /// disconnected peers is less than MAX_DC_PEERS - pub fn shrink_to_fit(&mut self) { - // Remove excess banned peers - while self.banned_peers_count.banned_peers() > MAX_BANNED_PEERS { - if let Some(to_drop) = if let Some((id, info, _)) = self - .peers - .iter() - .filter_map(|(id, info)| match info.connection_status() { - PeerConnectionStatus::Banned { since } => Some((id, info, since)), - _ => None, - }) - .min_by_key(|(_, _, since)| *since) - { - self.banned_peers_count - .remove_banned_peer(info.seen_addresses()); - Some(*id) - } else { - // If there is no minimum, this is a coding error. - crit!( - self.log, - "banned_peers > MAX_BANNED_PEERS despite no banned peers in db!" - ); - // reset banned_peers this will also exit the loop - self.banned_peers_count = BannedPeersCount::new(); - None - } { - debug!(self.log, "Removing old banned peer"; "peer_id" => %to_drop); - self.peers.remove(&to_drop); - } - } - - // Remove excess disconnected peers - while self.disconnected_peers > MAX_DC_PEERS { - if let Some(to_drop) = self - .peers - .iter() - .filter(|(_, info)| info.is_disconnected()) - .filter_map(|(id, info)| match info.connection_status() { - PeerConnectionStatus::Disconnected { since } => Some((id, since)), - _ => None, - }) - .min_by_key(|(_, since)| *since) - .map(|(id, _)| *id) - { - debug!(self.log, "Removing old disconnected peer"; "peer_id" => %to_drop); - self.peers.remove(&to_drop); - } - // If there is no minimum, this is a coding error. For safety we decrease - // the count to avoid a potential infinite loop. - self.disconnected_peers = self.disconnected_peers.saturating_sub(1); - } - } - - /// Add the meta data of a peer. 
- pub fn add_metadata(&mut self, peer_id: &PeerId, meta_data: MetaData) { - if let Some(peer_info) = self.peers.get_mut(peer_id) { - peer_info.meta_data = Some(meta_data); - } else { - warn!(self.log, "Tried to add meta data for a non-existent peer"; "peer_id" => %peer_id); - } - } -} - #[cfg(test)] mod tests { use super::*; @@ -769,6 +1226,88 @@ mod tests { assert_eq!(pdb.connected_outbound_only_peers().count(), 1); } + #[test] + fn test_disconnected_removed_in_correct_order() { + let mut pdb = get_db(); + + use std::collections::BTreeMap; + let mut peer_list = BTreeMap::new(); + for id in 0..MAX_DC_PEERS + 1 { + let new_peer = PeerId::random(); + pdb.connect_ingoing(&new_peer, "/ip4/0.0.0.0".parse().unwrap(), None); + peer_list.insert(id, new_peer); + } + assert_eq!(pdb.disconnected_peers, 0); + + for (_, p) in peer_list.iter() { + pdb.inject_disconnect(p); + // Allow the timing to update correctly + } + assert_eq!(pdb.disconnected_peers, MAX_DC_PEERS); + assert_eq!(pdb.disconnected_peers, pdb.disconnected_peers().count()); + + // Only the oldest peer should have been removed + for (id, peer_id) in peer_list.iter().rev().take(MAX_DC_PEERS) { + println!("Testing id {}", id); + assert!( + pdb.peer_info(peer_id).is_some(), + "Latest peer should not be pruned" + ); + } + + assert!( + pdb.peer_info(peer_list.iter().next().unwrap().1).is_none(), + "First peer should be removed" + ); + assert_eq!(pdb.disconnected_peers, pdb.disconnected_peers().count()); + } + + #[test] + fn new_connection_should_remain() { + let mut pdb = get_db(); + + use std::collections::BTreeMap; + let mut peer_list = BTreeMap::new(); + for id in 0..MAX_DC_PEERS + 20 { + let new_peer = PeerId::random(); + pdb.connect_ingoing(&new_peer, "/ip4/0.0.0.0".parse().unwrap(), None); + peer_list.insert(id, new_peer); + } + assert_eq!(pdb.disconnected_peers, pdb.disconnected_peers().count()); + for (_, p) in peer_list.iter() { + pdb.inject_disconnect(p); + } + assert_eq!(pdb.disconnected_peers, 
pdb.disconnected_peers().count()); + println!("{}", pdb.disconnected_peers); + + peer_list.clear(); + for id in 0..MAX_DC_PEERS + 20 { + let new_peer = PeerId::random(); + pdb.connect_ingoing(&new_peer, "/ip4/0.0.0.0".parse().unwrap(), None); + peer_list.insert(id, new_peer); + } + + let new_peer = PeerId::random(); + // New peer gets its min_ttl updated because it exists on a subnet + let min_ttl = Instant::now() + std::time::Duration::from_secs(12); + + pdb.update_min_ttl(&new_peer, min_ttl); + // Peer then gets dialed + pdb.dialing_peer(&new_peer, None); + assert_eq!(pdb.disconnected_peers, pdb.disconnected_peers().count()); + // Dialing fails, remove the peer + pdb.inject_disconnect(&new_peer); + assert_eq!(pdb.disconnected_peers, pdb.disconnected_peers().count()); + + assert!( + pdb.peer_info(&new_peer).is_some(), + "Peer should exist as disconnected" + ); + + assert_eq!(pdb.disconnected_peers, pdb.disconnected_peers().count()); + println!("{}", pdb.disconnected_peers); + } + #[test] fn test_disconnected_are_bounded() { let mut pdb = get_db(); @@ -782,6 +1321,7 @@ mod tests { for p in pdb.connected_peer_ids().cloned().collect::>() { pdb.inject_disconnect(&p); } + assert_eq!(pdb.disconnected_peers, pdb.disconnected_peers().count()); assert_eq!(pdb.disconnected_peers, MAX_DC_PEERS); } @@ -797,9 +1337,8 @@ mod tests { assert_eq!(pdb.banned_peers_count.banned_peers(), 0); for p in pdb.connected_peer_ids().cloned().collect::>() { - pdb.disconnect_and_ban(&p); + let _ = pdb.report_peer(&p, PeerAction::Fatal, ReportSource::PeerManager); pdb.inject_disconnect(&p); - pdb.disconnect_and_ban(&p); } assert_eq!(pdb.banned_peers_count.banned_peers(), MAX_BANNED_PEERS); @@ -867,9 +1406,9 @@ mod tests { pdb.inject_disconnect(&random_peer); assert_eq!(pdb.disconnected_peers, pdb.disconnected_peers().count()); - pdb.disconnect_and_ban(&random_peer); + let _ = pdb.report_peer(&random_peer, PeerAction::Fatal, ReportSource::PeerManager); pdb.inject_disconnect(&random_peer); - 
pdb.disconnect_and_ban(&random_peer); + let _ = pdb.report_peer(&random_peer, PeerAction::Fatal, ReportSource::PeerManager); assert_eq!(pdb.disconnected_peers, pdb.disconnected_peers().count()); pdb.inject_disconnect(&random_peer); assert_eq!(pdb.disconnected_peers, pdb.disconnected_peers().count()); @@ -906,7 +1445,7 @@ mod tests { assert_eq!(pdb.disconnected_peers, pdb.disconnected_peers().count()); assert_eq!( pdb.banned_peers_count.banned_peers(), - pdb.banned_peers().count() + pdb.banned_peers_by_score().count() ); // Should be no disconnected peers @@ -922,7 +1461,7 @@ mod tests { pdb.disconnected_peers, pdb.banned_peers_count.banned_peers ); // Disconnect and ban peer 2 - pdb.disconnect_and_ban(&random_peer2); + let _ = pdb.report_peer(&random_peer2, PeerAction::Fatal, ReportSource::PeerManager); // Should be 1 disconnected peer and one peer in the process of being disconnected println!( "3:{},{}", @@ -936,7 +1475,7 @@ mod tests { pdb.disconnected_peers, pdb.banned_peers_count.banned_peers ); // Now that the peer is disconnected, register the ban. - pdb.disconnect_and_ban(&random_peer2); + let _ = pdb.report_peer(&random_peer2, PeerAction::Fatal, ReportSource::PeerManager); // There should be 1 disconnected peer and one banned peer. println!( "5:{},{}", @@ -950,7 +1489,7 @@ mod tests { pdb.banned_peers().count() ); // Now ban peer 1. - pdb.disconnect_and_ban(&random_peer1); + let _ = pdb.report_peer(&random_peer1, PeerAction::Fatal, ReportSource::PeerManager); // There should be no disconnected peers and 2 banned peers println!( "6:{},{}", @@ -964,7 +1503,7 @@ mod tests { pdb.disconnected_peers, pdb.banned_peers_count.banned_peers ); // Same thing here. 
- pdb.disconnect_and_ban(&random_peer1); + let _ = pdb.report_peer(&random_peer1, PeerAction::Fatal, ReportSource::PeerManager); println!( "8:{},{}", pdb.disconnected_peers, pdb.banned_peers_count.banned_peers @@ -1000,9 +1539,8 @@ mod tests { ); // Ban peer 3 - pdb.disconnect_and_ban(&random_peer3); + let _ = pdb.report_peer(&random_peer3, PeerAction::Fatal, ReportSource::PeerManager); pdb.inject_disconnect(&random_peer3); - pdb.disconnect_and_ban(&random_peer3); // This should add a new banned peer, there should be 0 disconnected and 2 banned // peers (peer1 and peer3) @@ -1018,9 +1556,8 @@ mod tests { ); // Ban peer 3 - pdb.disconnect_and_ban(&random_peer3); + let _ = pdb.report_peer(&random_peer3, PeerAction::Fatal, ReportSource::PeerManager); pdb.inject_disconnect(&random_peer3); - pdb.disconnect_and_ban(&random_peer3); // Should still have 2 banned peers println!( @@ -1049,9 +1586,8 @@ mod tests { ); // Ban peer 3 - pdb.disconnect_and_ban(&random_peer3); + let _ = pdb.report_peer(&random_peer3, PeerAction::Fatal, ReportSource::PeerManager); pdb.inject_disconnect(&random_peer3); - pdb.disconnect_and_ban(&random_peer3); // Should have 1 disconnect (peer 2) and one banned (peer 3) println!( @@ -1101,9 +1637,8 @@ mod tests { ); // Ban peer 0 - pdb.disconnect_and_ban(&random_peer); + let _ = pdb.report_peer(&random_peer, PeerAction::Fatal, ReportSource::PeerManager); pdb.inject_disconnect(&random_peer); - pdb.disconnect_and_ban(&random_peer); // Should have 1 disconnect ( peer 2) and two banned (peer0, peer 3) println!( @@ -1154,9 +1689,8 @@ mod tests { let p5 = connect_peer_with_ips(&mut pdb, vec![ip5]); for p in &peers[..BANNED_PEERS_PER_IP_THRESHOLD + 1] { - pdb.disconnect_and_ban(p); + let _ = pdb.report_peer(p, PeerAction::Fatal, ReportSource::PeerManager); pdb.inject_disconnect(p); - pdb.disconnect_and_ban(p); } //check that ip1 and ip2 are banned but ip3-5 not @@ -1167,9 +1701,12 @@ mod tests { assert!(!pdb.ban_status(&p5).is_banned()); //ban also the last 
peer in peers - pdb.disconnect_and_ban(&peers[BANNED_PEERS_PER_IP_THRESHOLD + 1]); + let _ = pdb.report_peer( + &peers[BANNED_PEERS_PER_IP_THRESHOLD + 1], + PeerAction::Fatal, + ReportSource::PeerManager, + ); pdb.inject_disconnect(&peers[BANNED_PEERS_PER_IP_THRESHOLD + 1]); - pdb.disconnect_and_ban(&peers[BANNED_PEERS_PER_IP_THRESHOLD + 1]); //check that ip1-ip4 are banned but ip5 not assert!(pdb.ban_status(&p1).is_banned()); @@ -1180,7 +1717,7 @@ mod tests { //peers[0] gets unbanned reset_score(&mut pdb, &peers[0]); - pdb.unban(&peers[0]).unwrap(); + pdb.update_connection_state(&peers[0], NewConnectionState::Unbanned); //nothing changed assert!(pdb.ban_status(&p1).is_banned()); @@ -1191,7 +1728,7 @@ mod tests { //peers[1] gets unbanned reset_score(&mut pdb, &peers[1]); - pdb.unban(&peers[1]).unwrap(); + pdb.update_connection_state(&peers[1], NewConnectionState::Unbanned); //all ips are unbanned assert!(!pdb.ban_status(&p1).is_banned()); @@ -1218,9 +1755,8 @@ mod tests { // ban all peers for p in &peers { - pdb.disconnect_and_ban(p); + let _ = pdb.report_peer(p, PeerAction::Fatal, ReportSource::PeerManager); pdb.inject_disconnect(p); - pdb.disconnect_and_ban(p); } // check ip is banned @@ -1229,20 +1765,25 @@ mod tests { // unban a peer reset_score(&mut pdb, &peers[0]); - pdb.unban(&peers[0]).unwrap(); + pdb.update_connection_state(&peers[0], NewConnectionState::Unbanned); // check not banned anymore assert!(!pdb.ban_status(&p1).is_banned()); assert!(!pdb.ban_status(&p2).is_banned()); + // unban all peers + for p in &peers { + reset_score(&mut pdb, p); + pdb.update_connection_state(p, NewConnectionState::Unbanned); + } + // add ip2 to all peers and ban them. 
let mut socker_addr = Multiaddr::from(ip2); socker_addr.push(Protocol::Tcp(8080)); for p in &peers { pdb.connect_ingoing(p, socker_addr.clone(), None); - pdb.disconnect_and_ban(p); + let _ = pdb.report_peer(p, PeerAction::Fatal, ReportSource::PeerManager); pdb.inject_disconnect(p); - pdb.disconnect_and_ban(p); } // both IP's are now banned @@ -1252,24 +1793,22 @@ mod tests { // unban all peers for p in &peers { reset_score(&mut pdb, p); - pdb.unban(p).unwrap(); + pdb.update_connection_state(p, NewConnectionState::Unbanned); } // reban every peer except one for p in &peers[1..] { - pdb.disconnect_and_ban(p); + let _ = pdb.report_peer(p, PeerAction::Fatal, ReportSource::PeerManager); pdb.inject_disconnect(p); - pdb.disconnect_and_ban(p); } // nothing is banned assert!(!pdb.ban_status(&p1).is_banned()); assert!(!pdb.ban_status(&p2).is_banned()); - //reban last peer - pdb.disconnect_and_ban(&peers[0]); + // reban last peer + let _ = pdb.report_peer(&peers[0], PeerAction::Fatal, ReportSource::PeerManager); pdb.inject_disconnect(&peers[0]); - pdb.disconnect_and_ban(&peers[0]); //Ip's are banned again assert!(pdb.ban_status(&p1).is_banned()); @@ -1286,7 +1825,7 @@ mod tests { pdb.connect_ingoing(&trusted_peer, "/ip4/0.0.0.0".parse().unwrap(), None); // Check trusted status and score - assert!(pdb.peer_info(&trusted_peer).unwrap().is_trusted); + assert!(pdb.peer_info(&trusted_peer).unwrap().is_trusted()); assert_eq!( pdb.peer_info(&trusted_peer).unwrap().score().score(), Score::max_score().score() diff --git a/beacon_node/eth2_libp2p/src/peer_manager/client.rs b/beacon_node/eth2_libp2p/src/peer_manager/peerdb/client.rs similarity index 100% rename from beacon_node/eth2_libp2p/src/peer_manager/client.rs rename to beacon_node/eth2_libp2p/src/peer_manager/peerdb/client.rs diff --git a/beacon_node/eth2_libp2p/src/peer_manager/peer_info.rs b/beacon_node/eth2_libp2p/src/peer_manager/peerdb/peer_info.rs similarity index 70% rename from 
beacon_node/eth2_libp2p/src/peer_manager/peer_info.rs rename to beacon_node/eth2_libp2p/src/peer_manager/peerdb/peer_info.rs index 8d3912041..82aaefc63 100644 --- a/beacon_node/eth2_libp2p/src/peer_manager/peer_info.rs +++ b/beacon_node/eth2_libp2p/src/peer_manager/peerdb/peer_info.rs @@ -1,6 +1,6 @@ use super::client::Client; use super::score::{PeerAction, Score, ScoreState}; -use super::PeerSyncStatus; +use super::sync_status::SyncStatus; use crate::Multiaddr; use crate::{rpc::MetaData, types::Subnet}; use discv5::Enr; @@ -24,34 +24,34 @@ pub struct PeerInfo { /// The peers reputation score: Score, /// Client managing this peer - pub client: Client, + client: Client, /// Connection status of this peer connection_status: PeerConnectionStatus, /// The known listening addresses of this peer. This is given by identify and can be arbitrary /// (including local IPs). - pub listening_addresses: Vec, + listening_addresses: Vec, /// This is addresses we have physically seen and this is what we use for banning/un-banning /// peers. - pub seen_addresses: HashSet, + seen_addresses: HashSet, /// The current syncing state of the peer. The state may be determined after it's initial /// connection. - pub sync_status: PeerSyncStatus, + sync_status: SyncStatus, /// The ENR subnet bitfield of the peer. This may be determined after it's initial /// connection. - pub meta_data: Option>, + meta_data: Option>, /// Subnets the peer is connected to. - pub subnets: HashSet, + subnets: HashSet, /// The time we would like to retain this peer. After this time, the peer is no longer /// necessary. #[serde(skip)] - pub min_ttl: Option, + min_ttl: Option, /// Is the peer a trusted peer. - pub is_trusted: bool, + is_trusted: bool, /// Direction of the first connection of the last (or current) connected session with this peer. /// None if this peer was never connected. - pub connection_direction: Option, + connection_direction: Option, /// The enr of the peer, if known. 
- pub enr: Option, + enr: Option, } impl Default for PeerInfo { @@ -64,7 +64,7 @@ impl Default for PeerInfo { listening_addresses: Vec::new(), seen_addresses: HashSet::new(), subnets: HashSet::new(), - sync_status: PeerSyncStatus::Unknown, + sync_status: SyncStatus::Unknown, meta_data: None, min_ttl: None, is_trusted: false, @@ -101,13 +101,59 @@ impl PeerInfo { false } + /// Obtains the client of the peer. + pub fn client(&self) -> &Client { + &self.client + } + + /// Returns the listening addresses of the Peer. + pub fn listening_addresses(&self) -> &Vec { + &self.listening_addresses + } + + /// Returns the connection direction for the peer. + pub fn connection_direction(&self) -> Option<&ConnectionDirection> { + self.connection_direction.as_ref() + } + + /// Returns the sync status of the peer. + pub fn sync_status(&self) -> &SyncStatus { + &self.sync_status + } + + /// Returns the metadata for the peer if currently known. + pub fn meta_data(&self) -> Option<&MetaData> { + self.meta_data.as_ref() + } + + /// Returns whether the peer is a trusted peer or not. + pub fn is_trusted(&self) -> bool { + self.is_trusted + } + + /// The time a peer is expected to be useful until for an attached validator. If this is set to + /// None, the peer is not required for any upcoming duty. + pub fn min_ttl(&self) -> Option<&Instant> { + self.min_ttl.as_ref() + } + + /// The ENR of the peer if it is known. + pub fn enr(&self) -> Option<&Enr> { + self.enr.as_ref() + } + /// Returns if the peer is subscribed to a given `Subnet` from the gossipsub subscriptions. pub fn on_subnet_gossipsub(&self, subnet: &Subnet) -> bool { self.subnets.contains(subnet) } - /// Returns the seen IP addresses of the peer. - pub fn seen_addresses(&self) -> impl Iterator + '_ { + /// Returns the seen addresses of the peer. + pub fn seen_addresses(&self) -> impl Iterator + '_ { + self.seen_addresses.iter() + } + + /// Returns a list of seen IP addresses for the peer. 
+ pub fn seen_ip_addresses(&self) -> impl Iterator + '_ { self.seen_addresses .iter() .map(|socket_addr| socket_addr.ip()) @@ -133,34 +179,11 @@ impl PeerInfo { self.score.state() } - /// Applies decay rates to a non-trusted peer's score. - pub fn score_update(&mut self) { - if !self.is_trusted { - self.score.update() - } - } - - /// Apply peer action to a non-trusted peer's score. - pub fn apply_peer_action_to_score(&mut self, peer_action: PeerAction) { - if !self.is_trusted { - self.score.apply_peer_action(peer_action) - } - } - - pub(crate) fn update_gossipsub_score(&mut self, new_score: f64, ignore: bool) { - self.score.update_gossipsub_score(new_score, ignore); - } - + /// Returns true if the gossipsub score is sufficient. pub fn is_good_gossipsub_peer(&self) -> bool { self.score.is_good_gossipsub_peer() } - #[cfg(test)] - /// Resets the peers score. - pub fn reset_score(&mut self) { - self.score.test_reset(); - } - /* Peer connection status API */ /// Checks if the status is connected. @@ -181,8 +204,14 @@ impl PeerInfo { self.is_connected() || self.is_dialing() } - /// Checks if the status is banned. + /// Checks if the connection status is banned. This can lag behind the score state + /// temporarily. pub fn is_banned(&self) -> bool { + matches!(self.connection_status, PeerConnectionStatus::Banned { .. }) + } + + /// Checks if the peer's score is banned. + pub fn score_is_banned(&self) -> bool { matches!(self.score.state(), ScoreState::Banned) } @@ -204,53 +233,95 @@ impl PeerInfo { } } - // Setters + /* Mutable Functions */ - /// Modifies the status to Disconnected and sets the last seen instant to now. Returns None if - /// no changes were made. Returns Some(bool) where the bool represents if peer is to now be - /// banned. - pub fn notify_disconnect(&mut self) -> Option { - match self.connection_status { - Banned { .. } | Disconnected { .. 
} => None, - Disconnecting { to_ban } => { - self.connection_status = Disconnected { - since: Instant::now(), - }; - Some(to_ban) - } - Connected { .. } | Dialing { .. } | Unknown => { - self.connection_status = Disconnected { - since: Instant::now(), - }; - Some(false) - } + /// Updates the sync status. Returns true if the status was changed. + // VISIBILITY: Both the peer manager the network sync is able to update the sync state of a peer + pub fn update_sync_status(&mut self, sync_status: SyncStatus) -> bool { + self.sync_status.update(sync_status) + } + + /// Sets the client of the peer. + // VISIBILITY: The peer manager is able to set the client + pub(in crate::peer_manager) fn set_client(&mut self, client: Client) { + self.client = client + } + + /// Replaces the current listening addresses with those specified, returning the current + /// listening addresses. + // VISIBILITY: The peer manager is able to set the listening addresses + pub(in crate::peer_manager) fn set_listening_addresses( + &mut self, + listening_addresses: Vec, + ) -> Vec { + std::mem::replace(&mut self.listening_addresses, listening_addresses) + } + + /// Sets an explicit value for the meta data. + // VISIBILITY: The peer manager is able to adjust the meta_data + pub(in crate::peer_manager) fn set_meta_data(&mut self, meta_data: MetaData) { + self.meta_data = Some(meta_data) + } + + /// Sets the connection status of the peer. + pub(super) fn set_connection_status(&mut self, connection_status: PeerConnectionStatus) { + self.connection_status = connection_status + } + + /// Sets the ENR of the peer if one is known. + pub(super) fn set_enr(&mut self, enr: Enr) { + self.enr = Some(enr) + } + + /// Sets the time that the peer is expected to be needed until for an attached validator duty. + pub(super) fn set_min_ttl(&mut self, min_ttl: Instant) { + self.min_ttl = Some(min_ttl) + } + + /// Adds a known subnet for the peer. 
+ pub(super) fn insert_subnet(&mut self, subnet: Subnet) { + self.subnets.insert(subnet); + } + + /// Removes a subnet from the peer. + pub(super) fn remove_subnet(&mut self, subnet: &Subnet) { + self.subnets.remove(subnet); + } + + /// Removes all subnets from the peer. + pub(super) fn clear_subnets(&mut self) { + self.subnets.clear() + } + + /// Applies decay rates to a non-trusted peer's score. + pub(super) fn score_update(&mut self) { + if !self.is_trusted { + self.score.update() } } - /// Notify the we are currently disconnecting this peer. Optionally ban the peer after the - /// disconnect. - pub fn disconnecting(&mut self, to_ban: bool) { - self.connection_status = Disconnecting { to_ban } + /// Apply peer action to a non-trusted peer's score. + // VISIBILITY: The peer manager is able to modify the score of a peer. + pub(in crate::peer_manager) fn apply_peer_action_to_score(&mut self, peer_action: PeerAction) { + if !self.is_trusted { + self.score.apply_peer_action(peer_action) + } } - /// Modifies the status to banned or unbanned based on the underlying score. - pub fn update_state(&mut self) { - match (&self.connection_status, self.score.state()) { - (Disconnected { .. } | Unknown, ScoreState::Banned) => { - self.connection_status = Banned { - since: Instant::now(), - } - } - (Banned { since }, ScoreState::Healthy | ScoreState::Disconnected) => { - self.connection_status = Disconnected { since: *since } - } - (_, _) => {} - } + /// Updates the gossipsub score with a new score. Optionally ignore the gossipsub score. + pub(super) fn update_gossipsub_score(&mut self, new_score: f64, ignore: bool) { + self.score.update_gossipsub_score(new_score, ignore); + } + + #[cfg(test)] + /// Resets the peers score. + pub fn reset_score(&mut self) { + self.score.test_reset(); } /// Modifies the status to Dialing /// Returns an error if the current state is unexpected. 
- pub(crate) fn dialing_peer(&mut self) -> Result<(), &'static str> { + pub(super) fn dialing_peer(&mut self) -> Result<(), &'static str> { match &mut self.connection_status { Connected { .. } => return Err("Dialing connected peer"), Dialing { .. } => return Err("Dialing an already dialing peer"), @@ -265,7 +336,7 @@ impl PeerInfo { /// Modifies the status to Connected and increases the number of ingoing /// connections by one - pub(crate) fn connect_ingoing(&mut self, seen_address: Option) { + pub(super) fn connect_ingoing(&mut self, seen_address: Option) { match &mut self.connection_status { Connected { n_in, .. } => *n_in += 1, Disconnected { .. } @@ -285,7 +356,7 @@ impl PeerInfo { /// Modifies the status to Connected and increases the number of outgoing /// connections by one - pub(crate) fn connect_outgoing(&mut self, seen_address: Option) { + pub(super) fn connect_outgoing(&mut self, seen_address: Option) { match &mut self.connection_status { Connected { n_out, .. } => *n_out += 1, Disconnected { .. } @@ -335,7 +406,9 @@ impl Default for PeerStatus { #[derive(Debug, Clone, Serialize, AsRefStr)] #[strum(serialize_all = "snake_case")] pub enum ConnectionDirection { + /// The connection was established by a peer dialing us. Incoming, + /// The connection was established by us dialing a peer. 
Outgoing, } diff --git a/beacon_node/eth2_libp2p/src/peer_manager/score.rs b/beacon_node/eth2_libp2p/src/peer_manager/peerdb/score.rs similarity index 100% rename from beacon_node/eth2_libp2p/src/peer_manager/score.rs rename to beacon_node/eth2_libp2p/src/peer_manager/peerdb/score.rs diff --git a/beacon_node/eth2_libp2p/src/peer_manager/peer_sync_status.rs b/beacon_node/eth2_libp2p/src/peer_manager/peerdb/sync_status.rs similarity index 61% rename from beacon_node/eth2_libp2p/src/peer_manager/peer_sync_status.rs rename to beacon_node/eth2_libp2p/src/peer_manager/peerdb/sync_status.rs index 1aab570d3..bab8aa9ae 100644 --- a/beacon_node/eth2_libp2p/src/peer_manager/peer_sync_status.rs +++ b/beacon_node/eth2_libp2p/src/peer_manager/peerdb/sync_status.rs @@ -5,7 +5,7 @@ use types::{Epoch, Hash256, Slot}; #[derive(Clone, Debug, Serialize)] /// The current sync status of the peer. -pub enum PeerSyncStatus { +pub enum SyncStatus { /// At the current state as our node or ahead of us. Synced { info: SyncInfo }, /// The peer has greater knowledge about the canonical chain than we do. @@ -27,46 +27,40 @@ pub struct SyncInfo { pub finalized_root: Hash256, } -impl std::cmp::PartialEq for PeerSyncStatus { +impl std::cmp::PartialEq for SyncStatus { fn eq(&self, other: &Self) -> bool { matches!( (self, other), - (PeerSyncStatus::Synced { .. }, PeerSyncStatus::Synced { .. }) - | ( - PeerSyncStatus::Advanced { .. }, - PeerSyncStatus::Advanced { .. } - ) - | (PeerSyncStatus::Behind { .. }, PeerSyncStatus::Behind { .. }) - | ( - PeerSyncStatus::IrrelevantPeer, - PeerSyncStatus::IrrelevantPeer - ) - | (PeerSyncStatus::Unknown, PeerSyncStatus::Unknown) + (SyncStatus::Synced { .. }, SyncStatus::Synced { .. }) + | (SyncStatus::Advanced { .. }, SyncStatus::Advanced { .. }) + | (SyncStatus::Behind { .. }, SyncStatus::Behind { .. 
}) + | (SyncStatus::IrrelevantPeer, SyncStatus::IrrelevantPeer) + | (SyncStatus::Unknown, SyncStatus::Unknown) ) } } -impl PeerSyncStatus { +impl SyncStatus { /// Returns true if the peer has advanced knowledge of the chain. pub fn is_advanced(&self) -> bool { - matches!(self, PeerSyncStatus::Advanced { .. }) + matches!(self, SyncStatus::Advanced { .. }) } /// Returns true if the peer is up to date with the current chain. pub fn is_synced(&self) -> bool { - matches!(self, PeerSyncStatus::Synced { .. }) + matches!(self, SyncStatus::Synced { .. }) } /// Returns true if the peer is behind the current chain. pub fn is_behind(&self) -> bool { - matches!(self, PeerSyncStatus::Behind { .. }) + matches!(self, SyncStatus::Behind { .. }) } /// Updates the peer's sync status, returning whether the status transitioned. /// /// E.g. returns `true` if the state changed from `Synced` to `Advanced`, but not if /// the status remained `Synced` with different `SyncInfo` within. - pub fn update(&mut self, new_state: PeerSyncStatus) -> bool { + pub fn update(&mut self, new_state: SyncStatus) -> bool { let changed_status = *self != new_state; *self = new_state; changed_status @@ -74,16 +68,16 @@ impl PeerSyncStatus { pub fn as_str(&self) -> &'static str { match self { - PeerSyncStatus::Advanced { .. } => "Advanced", - PeerSyncStatus::Behind { .. } => "Behind", - PeerSyncStatus::Synced { .. } => "Synced", - PeerSyncStatus::Unknown => "Unknown", - PeerSyncStatus::IrrelevantPeer => "Irrelevant", + SyncStatus::Advanced { .. } => "Advanced", + SyncStatus::Behind { .. } => "Behind", + SyncStatus::Synced { .. 
} => "Synced", + SyncStatus::Unknown => "Unknown", + SyncStatus::IrrelevantPeer => "Irrelevant", } } } -impl std::fmt::Display for PeerSyncStatus { +impl std::fmt::Display for SyncStatus { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.write_str(self.as_str()) } diff --git a/beacon_node/eth2_libp2p/src/service.rs b/beacon_node/eth2_libp2p/src/service.rs index 66bec0db4..5e7add8ac 100644 --- a/beacon_node/eth2_libp2p/src/service.rs +++ b/beacon_node/eth2_libp2p/src/service.rs @@ -283,7 +283,7 @@ impl Service { self.swarm .behaviour_mut() .peer_manager_mut() - .report_peer(peer_id, action, source); + .report_peer(peer_id, action, source, None); } /// Disconnect and ban a peer, providing a reason. diff --git a/beacon_node/eth2_libp2p/src/types/globals.rs b/beacon_node/eth2_libp2p/src/types/globals.rs index 5c3b0690d..638270c2b 100644 --- a/beacon_node/eth2_libp2p/src/types/globals.rs +++ b/beacon_node/eth2_libp2p/src/types/globals.rs @@ -1,5 +1,5 @@ //! A collection of variables that are accessible outside of the network thread itself. 
-use crate::peer_manager::PeerDB; +use crate::peer_manager::peerdb::PeerDB; use crate::rpc::MetaData; use crate::types::{BackFillState, SyncState}; use crate::Client; @@ -117,7 +117,7 @@ impl NetworkGlobals { self.peers .read() .peer_info(peer_id) - .map(|info| info.client.clone()) + .map(|info| info.client().clone()) .unwrap_or_default() } diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index d2e40cde2..2412b1f54 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -1620,23 +1620,21 @@ pub fn serve( })?; if let Some(peer_info) = network_globals.peers.read().peer_info(&peer_id) { - let address = if let Some(socket_addr) = - peer_info.seen_addresses.iter().next() - { + let address = if let Some(socket_addr) = peer_info.seen_addresses().next() { let mut addr = eth2_libp2p::Multiaddr::from(socket_addr.ip()); addr.push(eth2_libp2p::multiaddr::Protocol::Tcp(socket_addr.port())); addr.to_string() - } else if let Some(addr) = peer_info.listening_addresses.first() { + } else if let Some(addr) = peer_info.listening_addresses().first() { addr.to_string() } else { String::new() }; // the eth2 API spec implies only peers we have been connected to at some point should be included. 
- if let Some(dir) = peer_info.connection_direction.as_ref() { + if let Some(dir) = peer_info.connection_direction().as_ref() { return Ok(api_types::GenericResponse::from(api_types::PeerData { peer_id: peer_id.to_string(), - enr: peer_info.enr.as_ref().map(|enr| enr.to_base64()), + enr: peer_info.enr().map(|enr| enr.to_base64()), last_seen_p2p_address: address, direction: api_types::PeerDirection::from_connection_direction(dir), state: api_types::PeerState::from_peer_connection_status( @@ -1669,20 +1667,20 @@ pub fn serve( .peers() .for_each(|(peer_id, peer_info)| { let address = - if let Some(socket_addr) = peer_info.seen_addresses.iter().next() { + if let Some(socket_addr) = peer_info.seen_addresses().next() { let mut addr = eth2_libp2p::Multiaddr::from(socket_addr.ip()); addr.push(eth2_libp2p::multiaddr::Protocol::Tcp( socket_addr.port(), )); addr.to_string() - } else if let Some(addr) = peer_info.listening_addresses.first() { + } else if let Some(addr) = peer_info.listening_addresses().first() { addr.to_string() } else { String::new() }; // the eth2 API spec implies only peers we have been connected to at some point should be included. 
- if let Some(dir) = peer_info.connection_direction.as_ref() { + if let Some(dir) = peer_info.connection_direction() { let direction = api_types::PeerDirection::from_connection_direction(dir); let state = api_types::PeerState::from_peer_connection_status( @@ -1700,7 +1698,7 @@ pub fn serve( if state_matches && direction_matches { peers.push(api_types::PeerData { peer_id: peer_id.to_string(), - enr: peer_info.enr.as_ref().map(|enr| enr.to_base64()), + enr: peer_info.enr().map(|enr| enr.to_base64()), last_seen_p2p_address: address, direction, state, diff --git a/beacon_node/http_api/tests/common.rs b/beacon_node/http_api/tests/common.rs index bca7bbd62..69d67424a 100644 --- a/beacon_node/http_api/tests/common.rs +++ b/beacon_node/http_api/tests/common.rs @@ -7,7 +7,7 @@ use eth2_libp2p::{ discv5::enr::{CombinedKey, EnrBuilder}, rpc::methods::{MetaData, MetaDataV2}, types::{EnrAttestationBitfield, EnrSyncCommitteeBitfield, SyncState}, - Enr, NetworkGlobals, PeerId, + ConnectedPoint, Enr, NetworkConfig, NetworkGlobals, PeerId, PeerManager, }; use http_api::{Config, Context}; use network::NetworkMessage; @@ -46,7 +46,7 @@ pub struct ApiServer> { } impl InteractiveTester { - pub fn new(spec: Option, validator_count: usize) -> Self { + pub async fn new(spec: Option, validator_count: usize) -> Self { let harness = BeaconChainHarness::new( E::default(), spec, @@ -59,7 +59,7 @@ impl InteractiveTester { shutdown_tx: _server_shutdown, network_rx, .. 
- } = create_api_server(harness.chain.clone(), harness.logger().clone()); + } = create_api_server(harness.chain.clone(), harness.logger().clone()).await; tokio::spawn(server); @@ -82,7 +82,7 @@ impl InteractiveTester { } } -pub fn create_api_server( +pub async fn create_api_server( chain: Arc>, log: Logger, ) -> ApiServer> { @@ -96,15 +96,30 @@ pub fn create_api_server( }); let enr_key = CombinedKey::generate_secp256k1(); let enr = EnrBuilder::new("v4").build(&enr_key).unwrap(); - let network_globals = - NetworkGlobals::new(enr.clone(), TCP_PORT, UDP_PORT, meta_data, vec![], &log); + let network_globals = Arc::new(NetworkGlobals::new( + enr.clone(), + TCP_PORT, + UDP_PORT, + meta_data, + vec![], + &log, + )); + // Only a peer manager can add peers, so we create a dummy manager. + let network_config = NetworkConfig::default(); + let mut pm = PeerManager::new(&network_config, network_globals.clone(), &log) + .await + .unwrap(); + + // add a peer let peer_id = PeerId::random(); - network_globals - .peers - .write() - .connect_ingoing(&peer_id, EXTERNAL_ADDR.parse().unwrap(), None); + let connected_point = ConnectedPoint::Listener { + local_addr: EXTERNAL_ADDR.parse().unwrap(), + send_back_addr: EXTERNAL_ADDR.parse().unwrap(), + }; + let num_established = std::num::NonZeroU32::new(1).unwrap(); + pm.inject_connection_established(peer_id, connected_point, num_established, None); *network_globals.sync_state.write() = SyncState::Synced; let eth1_service = eth1::Service::new(eth1::Config::default(), log.clone(), chain.spec.clone()); @@ -119,7 +134,7 @@ pub fn create_api_server( }, chain: Some(chain.clone()), network_tx: Some(network_tx), - network_globals: Some(Arc::new(network_globals)), + network_globals: Some(network_globals), eth1_service: Some(eth1_service), log, }); diff --git a/beacon_node/http_api/tests/fork_tests.rs b/beacon_node/http_api/tests/fork_tests.rs index 9286b60af..6b4f79fa5 100644 --- a/beacon_node/http_api/tests/fork_tests.rs +++ 
b/beacon_node/http_api/tests/fork_tests.rs @@ -17,7 +17,7 @@ async fn sync_committee_duties_across_fork() { let validator_count = E::sync_committee_size(); let fork_epoch = Epoch::new(8); let spec = altair_spec(fork_epoch); - let tester = InteractiveTester::::new(Some(spec.clone()), validator_count); + let tester = InteractiveTester::::new(Some(spec.clone()), validator_count).await; let harness = &tester.harness; let client = &tester.client; @@ -102,7 +102,7 @@ async fn attestations_across_fork_with_skip_slots() { let validator_count = E::sync_committee_size(); let fork_epoch = Epoch::new(8); let spec = altair_spec(fork_epoch); - let tester = InteractiveTester::::new(Some(spec.clone()), validator_count); + let tester = InteractiveTester::::new(Some(spec.clone()), validator_count).await; let harness = &tester.harness; let client = &tester.client; @@ -152,7 +152,7 @@ async fn sync_contributions_across_fork_with_skip_slots() { let validator_count = E::sync_committee_size(); let fork_epoch = Epoch::new(8); let spec = altair_spec(fork_epoch); - let tester = InteractiveTester::::new(Some(spec.clone()), validator_count); + let tester = InteractiveTester::::new(Some(spec.clone()), validator_count).await; let harness = &tester.harness; let client = &tester.client; @@ -199,7 +199,7 @@ async fn sync_committee_indices_across_fork() { let validator_count = E::sync_committee_size(); let fork_epoch = Epoch::new(8); let spec = altair_spec(fork_epoch); - let tester = InteractiveTester::::new(Some(spec.clone()), validator_count); + let tester = InteractiveTester::::new(Some(spec.clone()), validator_count).await; let harness = &tester.harness; let client = &tester.client; diff --git a/beacon_node/http_api/tests/interactive_tests.rs b/beacon_node/http_api/tests/interactive_tests.rs index dfc81afdc..64ce3b656 100644 --- a/beacon_node/http_api/tests/interactive_tests.rs +++ b/beacon_node/http_api/tests/interactive_tests.rs @@ -18,7 +18,7 @@ async fn deposit_contract_custom_network() { 
// Arbitrary contract address. spec.deposit_contract_address = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa".parse().unwrap(); - let tester = InteractiveTester::::new(Some(spec.clone()), validator_count); + let tester = InteractiveTester::::new(Some(spec.clone()), validator_count).await; let client = &tester.client; let result = client.get_config_deposit_contract().await.unwrap().data; diff --git a/beacon_node/http_api/tests/tests.rs b/beacon_node/http_api/tests/tests.rs index 20cb8084f..86762aeb4 100644 --- a/beacon_node/http_api/tests/tests.rs +++ b/beacon_node/http_api/tests/tests.rs @@ -63,14 +63,14 @@ struct ApiTester { } impl ApiTester { - pub fn new() -> Self { + pub async fn new() -> Self { // This allows for testing voluntary exits without building out a massive chain. let mut spec = E::default_spec(); spec.shard_committee_period = 2; - Self::new_from_spec(spec) + Self::new_from_spec(spec).await } - pub fn new_from_spec(spec: ChainSpec) -> Self { + pub async fn new_from_spec(spec: ChainSpec) -> Self { let harness = BeaconChainHarness::new( MainnetEthSpec, Some(spec.clone()), @@ -181,7 +181,7 @@ impl ApiTester { network_rx, local_enr, external_peer_id, - } = create_api_server(chain.clone(), log); + } = create_api_server(chain.clone(), log).await; tokio::spawn(server); @@ -213,7 +213,7 @@ impl ApiTester { } } - pub fn new_from_genesis() -> Self { + pub async fn new_from_genesis() -> Self { let harness = BeaconChainHarness::new( MainnetEthSpec, None, @@ -260,7 +260,7 @@ impl ApiTester { network_rx, local_enr, external_peer_id, - } = create_api_server(chain.clone(), log); + } = create_api_server(chain.clone(), log).await; tokio::spawn(server); @@ -2453,7 +2453,7 @@ async fn poll_events, eth2::Error>> + Unpin #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn get_events() { - ApiTester::new().test_get_events().await; + ApiTester::new().await.test_get_events().await; } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] @@ -2461,6 +2461,7 
@@ async fn get_events_altair() { let mut spec = E::default_spec(); spec.altair_fork_epoch = Some(Epoch::new(0)); ApiTester::new_from_spec(spec) + .await .test_get_events_altair() .await; } @@ -2468,6 +2469,7 @@ async fn get_events_altair() { #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn get_events_from_genesis() { ApiTester::new_from_genesis() + .await .test_get_events_from_genesis() .await; } @@ -2475,6 +2477,7 @@ async fn get_events_from_genesis() { #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn beacon_get() { ApiTester::new() + .await .test_beacon_genesis() .await .test_beacon_states_root() @@ -2515,17 +2518,21 @@ async fn beacon_get() { #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn post_beacon_blocks_valid() { - ApiTester::new().test_post_beacon_blocks_valid().await; + ApiTester::new().await.test_post_beacon_blocks_valid().await; } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn post_beacon_blocks_invalid() { - ApiTester::new().test_post_beacon_blocks_invalid().await; + ApiTester::new() + .await + .test_post_beacon_blocks_invalid() + .await; } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn beacon_pools_post_attestations_valid() { ApiTester::new() + .await .test_post_beacon_pool_attestations_valid() .await; } @@ -2533,6 +2540,7 @@ async fn beacon_pools_post_attestations_valid() { #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn beacon_pools_post_attestations_invalid() { ApiTester::new() + .await .test_post_beacon_pool_attestations_invalid() .await; } @@ -2540,6 +2548,7 @@ async fn beacon_pools_post_attestations_invalid() { #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn beacon_pools_post_attester_slashings_valid() { ApiTester::new() + .await .test_post_beacon_pool_attester_slashings_valid() .await; } @@ -2547,6 +2556,7 @@ async fn beacon_pools_post_attester_slashings_valid() { #[tokio::test(flavor = 
"multi_thread", worker_threads = 2)] async fn beacon_pools_post_attester_slashings_invalid() { ApiTester::new() + .await .test_post_beacon_pool_attester_slashings_invalid() .await; } @@ -2554,6 +2564,7 @@ async fn beacon_pools_post_attester_slashings_invalid() { #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn beacon_pools_post_proposer_slashings_valid() { ApiTester::new() + .await .test_post_beacon_pool_proposer_slashings_valid() .await; } @@ -2561,6 +2572,7 @@ async fn beacon_pools_post_proposer_slashings_valid() { #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn beacon_pools_post_proposer_slashings_invalid() { ApiTester::new() + .await .test_post_beacon_pool_proposer_slashings_invalid() .await; } @@ -2568,6 +2580,7 @@ async fn beacon_pools_post_proposer_slashings_invalid() { #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn beacon_pools_post_voluntary_exits_valid() { ApiTester::new() + .await .test_post_beacon_pool_voluntary_exits_valid() .await; } @@ -2575,6 +2588,7 @@ async fn beacon_pools_post_voluntary_exits_valid() { #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn beacon_pools_post_voluntary_exits_invalid() { ApiTester::new() + .await .test_post_beacon_pool_voluntary_exits_invalid() .await; } @@ -2582,6 +2596,7 @@ async fn beacon_pools_post_voluntary_exits_invalid() { #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn config_get() { ApiTester::new() + .await .test_get_config_fork_schedule() .await .test_get_config_spec() @@ -2593,6 +2608,7 @@ async fn config_get() { #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn debug_get() { ApiTester::new() + .await .test_get_debug_beacon_states() .await .test_get_debug_beacon_heads() @@ -2602,6 +2618,7 @@ async fn debug_get() { #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn node_get() { ApiTester::new() + .await .test_get_node_version() .await .test_get_node_syncing() @@ -2620,17 
+2637,24 @@ async fn node_get() { #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn get_validator_duties_early() { - ApiTester::new().test_get_validator_duties_early().await; + ApiTester::new() + .await + .test_get_validator_duties_early() + .await; } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn get_validator_duties_attester() { - ApiTester::new().test_get_validator_duties_attester().await; + ApiTester::new() + .await + .test_get_validator_duties_attester() + .await; } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn get_validator_duties_attester_with_skip_slots() { ApiTester::new() + .await .skip_slots(E::slots_per_epoch() * 2) .test_get_validator_duties_attester() .await; @@ -2638,12 +2662,16 @@ async fn get_validator_duties_attester_with_skip_slots() { #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn get_validator_duties_proposer() { - ApiTester::new().test_get_validator_duties_proposer().await; + ApiTester::new() + .await + .test_get_validator_duties_proposer() + .await; } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn get_validator_duties_proposer_with_skip_slots() { ApiTester::new() + .await .skip_slots(E::slots_per_epoch() * 2) .test_get_validator_duties_proposer() .await; @@ -2651,12 +2679,13 @@ async fn get_validator_duties_proposer_with_skip_slots() { #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn block_production() { - ApiTester::new().test_block_production().await; + ApiTester::new().await.test_block_production().await; } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn block_production_with_skip_slots() { ApiTester::new() + .await .skip_slots(E::slots_per_epoch() * 2) .test_block_production() .await; @@ -2664,12 +2693,16 @@ async fn block_production_with_skip_slots() { #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn get_validator_attestation_data() { - 
ApiTester::new().test_get_validator_attestation_data().await; + ApiTester::new() + .await + .test_get_validator_attestation_data() + .await; } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn get_validator_attestation_data_with_skip_slots() { ApiTester::new() + .await .skip_slots(E::slots_per_epoch() * 2) .test_get_validator_attestation_data() .await; @@ -2678,6 +2711,7 @@ async fn get_validator_attestation_data_with_skip_slots() { #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn get_validator_aggregate_attestation() { ApiTester::new() + .await .test_get_validator_aggregate_attestation() .await; } @@ -2685,6 +2719,7 @@ async fn get_validator_aggregate_attestation() { #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn get_validator_aggregate_attestation_with_skip_slots() { ApiTester::new() + .await .skip_slots(E::slots_per_epoch() * 2) .test_get_validator_aggregate_attestation() .await; @@ -2693,6 +2728,7 @@ async fn get_validator_aggregate_attestation_with_skip_slots() { #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn get_validator_aggregate_and_proofs_valid() { ApiTester::new() + .await .test_get_validator_aggregate_and_proofs_valid() .await; } @@ -2700,6 +2736,7 @@ async fn get_validator_aggregate_and_proofs_valid() { #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn get_validator_aggregate_and_proofs_valid_with_skip_slots() { ApiTester::new() + .await .skip_slots(E::slots_per_epoch() * 2) .test_get_validator_aggregate_and_proofs_valid() .await; @@ -2708,6 +2745,7 @@ async fn get_validator_aggregate_and_proofs_valid_with_skip_slots() { #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn get_validator_aggregate_and_proofs_invalid() { ApiTester::new() + .await .test_get_validator_aggregate_and_proofs_invalid() .await; } @@ -2715,6 +2753,7 @@ async fn get_validator_aggregate_and_proofs_invalid() { #[tokio::test(flavor = "multi_thread", worker_threads = 
2)] async fn get_validator_aggregate_and_proofs_invalid_with_skip_slots() { ApiTester::new() + .await .skip_slots(E::slots_per_epoch() * 2) .test_get_validator_aggregate_and_proofs_invalid() .await; @@ -2723,6 +2762,7 @@ async fn get_validator_aggregate_and_proofs_invalid_with_skip_slots() { #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn get_validator_beacon_committee_subscriptions() { ApiTester::new() + .await .test_get_validator_beacon_committee_subscriptions() .await; } @@ -2730,6 +2770,7 @@ async fn get_validator_beacon_committee_subscriptions() { #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn lighthouse_endpoints() { ApiTester::new() + .await .test_get_lighthouse_health() .await .test_get_lighthouse_syncing() diff --git a/beacon_node/network/src/metrics.rs b/beacon_node/network/src/metrics.rs index e57a88835..e8a024a81 100644 --- a/beacon_node/network/src/metrics.rs +++ b/beacon_node/network/src/metrics.rs @@ -790,7 +790,7 @@ pub fn update_gossip_metrics( for (peer_id, _) in gossipsub.all_peers() { let client = peers .peer_info(peer_id) - .map(|peer_info| peer_info.client.kind.as_static()) + .map(|peer_info| peer_info.client().kind.as_static()) .unwrap_or_else(|| "Unknown"); peer_to_client.insert(peer_id, client); @@ -919,7 +919,7 @@ pub fn update_sync_metrics(network_globals: &Arc>) .peers .read() .connected_peers() - .map(|(_peer_id, info)| info.sync_status.as_str()) + .map(|(_peer_id, info)| info.sync_status().as_str()) { *peers_per_sync_type.entry(sync_type).or_default() += 1; } diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index 3b04ec21f..ae19db32a 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -654,15 +654,21 @@ impl SyncManager { // NOTE: here we are gracefully handling two race conditions: Receiving the status message // of a peer that is 1) disconnected 2) not in the PeerDB. 
- if let Some(peer_info) = self.network_globals.peers.write().peer_info_mut(peer_id) { - let new_state = sync_type.as_sync_status(remote_sync_info); - let rpr = new_state.as_str(); - let was_updated = peer_info.sync_status.update(new_state.clone()); + let new_state = sync_type.as_sync_status(remote_sync_info); + let rpr = new_state.as_str(); + // Drop the write lock + let update_sync_status = self + .network_globals + .peers + .write() + .update_sync_status(peer_id, new_state.clone()); + if let Some(was_updated) = update_sync_status { + let is_connected = self.network_globals.peers.read().is_connected(peer_id); if was_updated { debug!(self.log, "Peer transitioned sync state"; "peer_id" => %peer_id, "new_state" => rpr, "our_head_slot" => local_sync_info.head_slot, "out_finalized_epoch" => local_sync_info.finalized_epoch, "their_head_slot" => remote_sync_info.head_slot, "their_finalized_epoch" => remote_sync_info.finalized_epoch, - "is_connected" => peer_info.is_connected()); + "is_connected" => is_connected); // A peer has transitioned its sync state. If the new state is "synced" we // inform the backfill sync that a new synced peer has joined us. 
@@ -670,7 +676,7 @@ impl SyncManager { self.backfill_sync.fully_synced_peer_joined(); } } - peer_info.is_connected() + is_connected } else { error!(self.log, "Status'd peer is unknown"; "peer_id" => %peer_id); false diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index 0647fc683..073b83a85 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -55,7 +55,7 @@ impl SyncNetworkContext { .peers .read() .peer_info(peer_id) - .map(|info| info.client.clone()) + .map(|info| info.client().clone()) .unwrap_or_default() } diff --git a/beacon_node/network/src/sync/peer_sync_info.rs b/beacon_node/network/src/sync/peer_sync_info.rs index 513b05a09..912c4d011 100644 --- a/beacon_node/network/src/sync/peer_sync_info.rs +++ b/beacon_node/network/src/sync/peer_sync_info.rs @@ -1,6 +1,6 @@ use super::manager::SLOT_IMPORT_TOLERANCE; use beacon_chain::{BeaconChain, BeaconChainTypes}; -use eth2_libp2p::{PeerSyncStatus, SyncInfo}; +use eth2_libp2p::{SyncInfo, SyncStatus as PeerSyncStatus}; use std::cmp::Ordering; /// The type of peer relative to our current state.