diff --git a/beacon_node/eth2_libp2p/src/config.rs b/beacon_node/eth2_libp2p/src/config.rs index 6c482cbdb..b2ee8558c 100644 --- a/beacon_node/eth2_libp2p/src/config.rs +++ b/beacon_node/eth2_libp2p/src/config.rs @@ -1,5 +1,5 @@ use crate::types::GossipKind; -use crate::Enr; +use crate::{Enr, PeerIdSerialized}; use discv5::{Discv5Config, Discv5ConfigBuilder}; use libp2p::gossipsub::{ GossipsubConfig, GossipsubConfigBuilder, GossipsubMessage, MessageId, ValidationMode, @@ -58,6 +58,9 @@ pub struct Config { /// List of libp2p nodes to initially connect to. pub libp2p_nodes: Vec, + /// List of trusted libp2p nodes which are not scored. + pub trusted_peers: Vec, + /// Client version pub client_version: String, @@ -139,6 +142,7 @@ impl Default for Config { boot_nodes_enr: vec![], boot_nodes_multiaddr: vec![], libp2p_nodes: vec![], + trusted_peers: vec![], client_version: lighthouse_version::version_with_platform(), disable_discovery: false, topics, diff --git a/beacon_node/eth2_libp2p/src/lib.rs b/beacon_node/eth2_libp2p/src/lib.rs index 6fdbb3d31..655e69d65 100644 --- a/beacon_node/eth2_libp2p/src/lib.rs +++ b/beacon_node/eth2_libp2p/src/lib.rs @@ -14,6 +14,50 @@ pub mod rpc; mod service; pub mod types; +use serde::{de, Deserialize, Deserializer, Serialize, Serializer}; +use std::str::FromStr; + +/// Wrapper over a libp2p `PeerId` which implements `Serialize` and `Deserialize` +#[derive(Clone, Debug)] +pub struct PeerIdSerialized(libp2p::PeerId); + +impl From for PeerId { + fn from(peer_id: PeerIdSerialized) -> Self { + peer_id.0 + } +} + +impl FromStr for PeerIdSerialized { + type Err = String; + + fn from_str(s: &str) -> Result { + Ok(Self( + PeerId::from_str(s).map_err(|e| format!("Invalid peer id: {}", e))?, + )) + } +} + +impl Serialize for PeerIdSerialized { + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + serializer.serialize_str(&self.0.to_string()) + } +} + +impl<'de> Deserialize<'de> for PeerIdSerialized { + fn deserialize(deserializer: D) -> Result + where + D: Deserializer<'de>, + { + let s: String = Deserialize::deserialize(deserializer)?; + Ok(Self(PeerId::from_str(&s).map_err(|e| { + de::Error::custom(format!("Failed to deserialise peer id: {:?}", e)) + })?)) + } +} + pub use crate::types::{error, Enr, GossipTopic, NetworkGlobals, PubsubMessage, SubnetDiscovery}; pub use behaviour::{BehaviourEvent, PeerRequestId, Request, Response}; pub use config::Config as NetworkConfig; diff --git a/beacon_node/eth2_libp2p/src/peer_manager/mod.rs b/beacon_node/eth2_libp2p/src/peer_manager/mod.rs index 211db2263..963df7868 100644 --- a/beacon_node/eth2_libp2p/src/peer_manager/mod.rs +++ b/beacon_node/eth2_libp2p/src/peer_manager/mod.rs @@ -133,9 +133,9 @@ impl PeerManager { pub fn goodbye_peer(&mut self, peer_id: &PeerId, reason: GoodbyeReason) { // get the peer info if let Some(info) = self.network_globals.peers.write().peer_info_mut(peer_id) { - debug!(self.log, "Sending goodbye to peer"; "peer_id" => peer_id.to_string(), "reason" => reason.to_string(), "score" => info.score.to_string()); + debug!(self.log, "Sending goodbye to peer"; "peer_id" => peer_id.to_string(), "reason" => reason.to_string(), "score" => info.score().to_string()); // Goodbye's are fatal - info.score.apply_peer_action(PeerAction::Fatal); + info.apply_peer_action_to_score(PeerAction::Fatal); if info.connection_status.is_connected_or_dialing() { self.events .push(PeerManagerEvent::DisconnectPeer(peer_id.clone(), reason)); @@ -155,12 +155,12 @@ impl PeerManager { let mut unban_peer = None; if let Some(info) = self.network_globals.peers.write().peer_info_mut(peer_id) { - let previous_state = info.score.state(); - info.score.apply_peer_action(action); - if previous_state != info.score.state() { - match info.score.state() { + let previous_state = info.score_state(); + info.apply_peer_action_to_score(action); + if previous_state != info.score_state() { + match info.score_state() { ScoreState::Banned => { - debug!(self.log, "Peer has been banned"; "peer_id" => peer_id.to_string(), "score" => info.score.to_string()); + debug!(self.log, "Peer has been banned"; "peer_id" => peer_id.to_string(), "score" => info.score().to_string()); ban_peer = Some(peer_id.clone()); if info.connection_status.is_connected_or_dialing() { self.events.push(PeerManagerEvent::DisconnectPeer( @@ -170,7 +170,7 @@ impl PeerManager { } } ScoreState::Disconnected => { - debug!(self.log, "Peer transitioned to disconnect state"; "peer_id" => peer_id.to_string(), "score" => info.score.to_string(), "past_state" => previous_state.to_string()); + debug!(self.log, "Peer transitioned to disconnect state"; "peer_id" => peer_id.to_string(), "score" => info.score().to_string(), "past_state" => previous_state.to_string()); // disconnect the peer if it's currently connected or dialing unban_peer = Some(peer_id.clone()); if info.connection_status.is_connected_or_dialing() { @@ -182,13 +182,13 @@ impl PeerManager { // TODO: Update the peer manager to inform that the peer is disconnecting. } ScoreState::Healthy => { - debug!(self.log, "Peer transitioned to healthy state"; "peer_id" => peer_id.to_string(), "score" => info.score.to_string(), "past_state" => previous_state.to_string()); + debug!(self.log, "Peer transitioned to healthy state"; "peer_id" => peer_id.to_string(), "score" => info.score().to_string(), "past_state" => previous_state.to_string()); // unban the peer if it was previously banned. unban_peer = Some(peer_id.clone()); } } } else { - debug!(self.log, "Peer score adjusted"; "peer_id" => peer_id.to_string(), "score" => info.score.to_string()); + debug!(self.log, "Peer score adjusted"; "peer_id" => peer_id.to_string(), "score" => info.score().to_string()); } } @@ -689,9 +689,9 @@ impl PeerManager { let mut to_unban_peers = Vec::new(); for (peer_id, info) in pdb.peers_mut() { - let previous_state = info.score.state(); + let previous_state = info.score_state(); // Update scores - info.score.update(); + info.score_update(); /* TODO: Implement logic about connection lifetimes match info.connection_status { @@ -746,10 +746,10 @@ impl PeerManager { */ // handle score transitions - if previous_state != info.score.state() { - match info.score.state() { + if previous_state != info.score_state() { + match info.score_state() { ScoreState::Banned => { - debug!(self.log, "Peer has been banned"; "peer_id" => peer_id.to_string(), "score" => info.score.to_string()); + debug!(self.log, "Peer has been banned"; "peer_id" => peer_id.to_string(), "score" => info.score().to_string()); to_ban_peers.push(peer_id.clone()); if info.connection_status.is_connected_or_dialing() { self.events.push(PeerManagerEvent::DisconnectPeer( @@ -759,7 +759,7 @@ impl PeerManager { } } ScoreState::Disconnected => { - debug!(self.log, "Peer transitioned to disconnect state"; "peer_id" => peer_id.to_string(), "score" => info.score.to_string(), "past_state" => previous_state.to_string()); + debug!(self.log, "Peer transitioned to disconnect state"; "peer_id" => peer_id.to_string(), "score" => info.score().to_string(), "past_state" => previous_state.to_string()); // disconnect the peer if it's currently connected or dialing to_unban_peers.push(peer_id.clone()); if info.connection_status.is_connected_or_dialing() { @@ -771,7 +771,7 @@ impl PeerManager { // TODO: Update peer manager to report that it's disconnecting. } ScoreState::Healthy => { - debug!(self.log, "Peer transitioned to healthy state"; "peer_id" => peer_id.to_string(), "score" => info.score.to_string(), "past_state" => previous_state.to_string()); + debug!(self.log, "Peer transitioned to healthy state"; "peer_id" => peer_id.to_string(), "score" => info.score().to_string(), "past_state" => previous_state.to_string()); // unban the peer if it was previously banned. to_unban_peers.push(peer_id.clone()); } @@ -821,7 +821,7 @@ impl PeerManager { .take(connected_peer_count - self.target_peers) //we only need to disconnect peers with healthy scores, since the others got already //disconnected in update_peer_scores - .filter(|(_, info)| info.score.state() == ScoreState::Healthy) + .filter(|(_, info)| info.score_state() == ScoreState::Healthy) { self.events.push(PeerManagerEvent::DisconnectPeer( (*peer_id).clone(), diff --git a/beacon_node/eth2_libp2p/src/peer_manager/peer_info.rs b/beacon_node/eth2_libp2p/src/peer_manager/peer_info.rs index 2448ee424..2933cb731 100644 --- a/beacon_node/eth2_libp2p/src/peer_manager/peer_info.rs +++ b/beacon_node/eth2_libp2p/src/peer_manager/peer_info.rs @@ -1,5 +1,5 @@ use super::client::Client; -use super::score::Score; +use super::score::{PeerAction, Score, ScoreState}; use super::PeerSyncStatus; use crate::rpc::MetaData; use crate::Multiaddr; @@ -19,7 +19,7 @@ pub struct PeerInfo { /// The connection status of the peer _status: PeerStatus, /// The peers reputation - pub score: Score, + score: Score, /// Client managing this peer pub client: Client, /// Connection status of this peer @@ -36,6 +36,8 @@ pub struct PeerInfo { /// necessary. #[serde(skip)] pub min_ttl: Option, + /// Is the peer a trusted peer. + pub is_trusted: bool, } impl Default for PeerInfo { @@ -49,11 +51,21 @@ impl Default for PeerInfo { sync_status: PeerSyncStatus::Unknown, meta_data: None, min_ttl: None, + is_trusted: false, } } } impl PeerInfo { + /// Return a PeerInfo struct for a trusted peer. + pub fn trusted_peer_info() -> Self { + PeerInfo { + score: Score::max_score(), + is_trusted: true, + ..Default::default() + } + } + /// Returns if the peer is subscribed to a given `SubnetId` pub fn on_subnet(&self, subnet_id: SubnetId) -> bool { if let Some(meta_data) = &self.meta_data { @@ -69,6 +81,38 @@ impl PeerInfo { pub fn has_future_duty(&self) -> bool { self.min_ttl.map_or(false, |i| i >= Instant::now()) } + + /// Returns score of the peer. + pub fn score(&self) -> Score { + self.score + } + + /// Returns the state of the peer based on the score. + pub(crate) fn score_state(&self) -> ScoreState { + self.score.state() + } + + /// Applies decay rates to a non-trusted peer's score. + pub fn score_update(&mut self) { + if !self.is_trusted { + self.score.update() + } + } + + /// Apply peer action to a non-trusted peer's score. + pub fn apply_peer_action_to_score(&mut self, peer_action: PeerAction) { + if !self.is_trusted { + self.score.apply_peer_action(peer_action) + } + } + + #[cfg(test)] + /// Add an f64 to a non-trusted peer's score abiding by the limits. + pub fn add_to_score(&mut self, score: f64) { + if !self.is_trusted { + self.score.add(score) + } + } } #[derive(Clone, Debug, Serialize)] diff --git a/beacon_node/eth2_libp2p/src/peer_manager/peerdb.rs b/beacon_node/eth2_libp2p/src/peer_manager/peerdb.rs index fc28cc55f..425cf4a37 100644 --- a/beacon_node/eth2_libp2p/src/peer_manager/peerdb.rs +++ b/beacon_node/eth2_libp2p/src/peer_manager/peerdb.rs @@ -86,12 +86,17 @@ impl BannedPeersCount { } impl PeerDB { - pub fn new(log: &slog::Logger) -> Self { + pub fn new(trusted_peers: Vec, log: &slog::Logger) -> Self { + // Initialize the peers hashmap with trusted peers + let peers = trusted_peers + .into_iter() + .map(|peer_id| (peer_id, PeerInfo::trusted_peer_info())) + .collect(); Self { log: log.clone(), disconnected_peers: 0, banned_peers_count: BannedPeersCount::new(), - peers: HashMap::new(), + peers, } } @@ -101,7 +106,7 @@ impl PeerDB { pub fn score(&self, peer_id: &PeerId) -> Score { self.peers .get(peer_id) - .map_or(Score::default(), |info| info.score) + .map_or(Score::default(), |info| info.score()) } /// Returns an iterator over all peers in the db. @@ -159,7 +164,7 @@ impl PeerDB { /// Returns true if the Peer is banned. pub fn is_banned(&self, peer_id: &PeerId) -> bool { if let Some(peer) = self.peers.get(peer_id) { - match peer.score.state() { + match peer.score().state() { ScoreState::Banned => true, _ => self.ip_is_banned(peer), } @@ -181,7 +186,7 @@ impl PeerDB { /// Returns true if the Peer is either banned or in the disconnected state. pub fn is_banned_or_disconnected(&self, peer_id: &PeerId) -> bool { if let Some(peer) = self.peers.get(peer_id) { - match peer.score.state() { + match peer.score().state() { ScoreState::Banned | ScoreState::Disconnected => true, _ => self.ip_is_banned(peer), } @@ -264,7 +269,7 @@ impl PeerDB { .collect::>(); connected.shuffle(&mut rand::thread_rng()); - connected.sort_by_key(|(_, info)| info.score); + connected.sort_by_key(|(_, info)| info.score()); connected } @@ -279,7 +284,7 @@ impl PeerDB { .iter() .filter(|(_, info)| is_status(&info.connection_status)) .collect::>(); - by_status.sort_by_key(|(_, info)| info.score); + by_status.sort_by_key(|(_, info)| info.score()); by_status.into_iter().rev().collect() } @@ -291,7 +296,7 @@ impl PeerDB { self.peers .iter() .filter(|(_, info)| is_status(&info.connection_status)) - .max_by_key(|(_, info)| info.score) + .max_by_key(|(_, info)| info.score()) .map(|(id, _)| id) } @@ -455,8 +460,8 @@ impl PeerDB { .filter(|(_, info)| info.connection_status.is_banned()) .min_by(|(_, info_a), (_, info_b)| { info_a - .score - .partial_cmp(&info_b.score) + .score() + .partial_cmp(&info_b.score()) .unwrap_or(std::cmp::Ordering::Equal) }) { self.banned_peers_count @@ -485,8 +490,8 @@ impl PeerDB { .filter(|(_, info)| info.connection_status.is_disconnected()) .min_by(|(_, info_a), (_, info_b)| { info_a - .score - .partial_cmp(&info_b.score) + .score() + .partial_cmp(&info_b.score()) .unwrap_or(std::cmp::Ordering::Equal) }) .map(|(id, _)| id.clone()) @@ -543,13 +548,13 @@ mod tests { fn add_score(db: &mut PeerDB, peer_id: &PeerId, score: f64) { if let Some(info) = db.peer_info_mut(peer_id) { - info.score.add(score); + info.add_to_score(score); } } fn get_db() -> PeerDB { let log = build_log(slog::Level::Debug, false); - PeerDB::new(&log) + PeerDB::new(vec![], &log) } #[test] @@ -938,4 +943,28 @@ mod tests { assert!(pdb.is_banned(&p1)); assert!(!pdb.is_banned(&p2)); } + + #[test] + fn test_trusted_peers_score() { + let trusted_peer = PeerId::random(); + let log = build_log(slog::Level::Debug, false); + let mut pdb: PeerDB = PeerDB::new(vec![trusted_peer.clone()], &log); + + pdb.connect_ingoing(&trusted_peer); + + // Check trusted status and score + assert!(pdb.peer_info(&trusted_peer).unwrap().is_trusted); + assert_eq!( + pdb.peer_info(&trusted_peer).unwrap().score().score(), + Score::max_score().score() + ); + + // Adding/Subtracting score should have no effect on a trusted peer + add_score(&mut pdb, &trusted_peer, -50.0); + + assert_eq!( + pdb.peer_info(&trusted_peer).unwrap().score().score(), + Score::max_score().score() + ); + } } diff --git a/beacon_node/eth2_libp2p/src/peer_manager/score.rs b/beacon_node/eth2_libp2p/src/peer_manager/score.rs index de106d886..d2d584b52 100644 --- a/beacon_node/eth2_libp2p/src/peer_manager/score.rs +++ b/beacon_node/eth2_libp2p/src/peer_manager/score.rs @@ -145,6 +145,13 @@ impl std::fmt::Display for ScoreState { } impl Score { + /// Return max possible score. + pub fn max_score() -> Self { + Score { + score: MAX_SCORE, + last_updated: Instant::now(), + } + } /// Access to the underlying score. pub fn score(&self) -> f64 { self.score diff --git a/beacon_node/eth2_libp2p/src/service.rs b/beacon_node/eth2_libp2p/src/service.rs index 50952eff4..1d594918d 100644 --- a/beacon_node/eth2_libp2p/src/service.rs +++ b/beacon_node/eth2_libp2p/src/service.rs @@ -84,6 +84,11 @@ impl Service { config.libp2p_port, config.discovery_port, meta_data, + config + .trusted_peers + .iter() + .map(|x| PeerId::from(x.clone())) + .collect(), &log, )); diff --git a/beacon_node/eth2_libp2p/src/types/globals.rs b/beacon_node/eth2_libp2p/src/types/globals.rs index bfed2fd9b..e2bf851de 100644 --- a/beacon_node/eth2_libp2p/src/types/globals.rs +++ b/beacon_node/eth2_libp2p/src/types/globals.rs @@ -37,6 +37,7 @@ impl NetworkGlobals { tcp_port: u16, udp_port: u16, local_metadata: MetaData, + trusted_peers: Vec, log: &slog::Logger, ) -> Self { NetworkGlobals { @@ -45,8 +46,8 @@ impl NetworkGlobals { listen_multiaddrs: RwLock::new(Vec::new()), listen_port_tcp: AtomicU16::new(tcp_port), listen_port_udp: AtomicU16::new(udp_port), - peers: RwLock::new(PeerDB::new(log)), local_metadata: RwLock::new(local_metadata), + peers: RwLock::new(PeerDB::new(trusted_peers, log)), gossipsub_subscriptions: RwLock::new(HashSet::new()), sync_state: RwLock::new(SyncState::Stalled), } diff --git a/beacon_node/network/src/attestation_service/tests/mod.rs b/beacon_node/network/src/attestation_service/tests/mod.rs index 46c11fd58..16a8f9b0a 100644 --- a/beacon_node/network/src/attestation_service/tests/mod.rs +++ b/beacon_node/network/src/attestation_service/tests/mod.rs @@ -111,7 +111,7 @@ mod tests { }; let network_globals: NetworkGlobals = - NetworkGlobals::new(enr, 0, 0, meta_data, &log); + NetworkGlobals::new(enr, 0, 0, meta_data, vec![], &log); AttestationService::new(beacon_chain, Arc::new(network_globals), &log) } diff --git a/beacon_node/src/cli.rs b/beacon_node/src/cli.rs index 6caa8acd9..ff64a600c 100644 --- a/beacon_node/src/cli.rs +++ b/beacon_node/src/cli.rs @@ -131,7 +131,13 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> { .help("Disables the discv5 discovery protocol. The node will not search for new peers or participate in the discovery protocol.") .takes_value(false), ) - + .arg( + Arg::with_name("trusted-peers") + .long("trusted-peers") + .value_name("TRUSTED_PEERS") + .help("One or more comma-delimited trusted peer ids which always have the highest score according to the peer scoring system.") + .takes_value(true), + ) /* REST API related arguments */ .arg( Arg::with_name("http") diff --git a/beacon_node/src/config.rs b/beacon_node/src/config.rs index f9abfca6a..b33a14001 100644 --- a/beacon_node/src/config.rs +++ b/beacon_node/src/config.rs @@ -2,7 +2,7 @@ use beacon_chain::builder::PUBKEY_CACHE_FILENAME; use clap::ArgMatches; use clap_utils::BAD_TESTNET_DIR_MESSAGE; use client::{config::DEFAULT_DATADIR, ClientConfig, ClientGenesis}; -use eth2_libp2p::{multiaddr::Protocol, Enr, Multiaddr, NetworkConfig}; +use eth2_libp2p::{multiaddr::Protocol, Enr, Multiaddr, NetworkConfig, PeerIdSerialized}; use eth2_testnet_config::Eth2TestnetConfig; use slog::{crit, info, Logger}; use ssz::Encode; @@ -343,6 +343,17 @@ pub fn set_network_config( .collect::, _>>()?; } + if let Some(trusted_peers_str) = cli_args.value_of("trusted-peers") { + config.trusted_peers = trusted_peers_str + .split(',') + .map(|peer_id| { + peer_id + .parse() + .map_err(|_| format!("Invalid trusted peer id: {}", peer_id)) + }) + .collect::, _>>()?; + } + if let Some(enr_udp_port_str) = cli_args.value_of("enr-udp-port") { config.enr_udp_port = Some( enr_udp_port_str