diff --git a/beacon_node/eth2_libp2p/src/peer_manager/mod.rs b/beacon_node/eth2_libp2p/src/peer_manager/mod.rs index 5a8fb92d7..657851793 100644 --- a/beacon_node/eth2_libp2p/src/peer_manager/mod.rs +++ b/beacon_node/eth2_libp2p/src/peer_manager/mod.rs @@ -741,9 +741,7 @@ impl PeerManager { .peer_info(peer_id) .map(|info| { info.seen_addresses() - .iter() .filter(|ip| peer_db.is_ip_banned(ip)) - .cloned() .collect::>() }) .unwrap_or_default(); @@ -761,7 +759,7 @@ impl PeerManager { let seen_ip_addresses = peer_db .peer_info(peer_id) - .map(|info| info.seen_addresses().iter().cloned().collect::>()) + .map(|info| info.seen_addresses().collect::>()) .unwrap_or_default(); self.discovery.unban_peer(&peer_id, seen_ip_addresses); diff --git a/beacon_node/eth2_libp2p/src/peer_manager/peer_info.rs b/beacon_node/eth2_libp2p/src/peer_manager/peer_info.rs index 6d3601b2d..3da4cab13 100644 --- a/beacon_node/eth2_libp2p/src/peer_manager/peer_info.rs +++ b/beacon_node/eth2_libp2p/src/peer_manager/peer_info.rs @@ -9,7 +9,7 @@ use serde::{ Serialize, }; use std::collections::HashSet; -use std::net::IpAddr; +use std::net::{IpAddr, SocketAddr}; use std::time::Instant; use types::{EthSpec, SubnetId}; use PeerConnectionStatus::*; @@ -31,7 +31,7 @@ pub struct PeerInfo { pub listening_addresses: Vec, /// This is addresses we have physically seen and this is what we use for banning/un-banning /// peers. - seen_addresses: HashSet, + pub seen_addresses: HashSet, /// The current syncing state of the peer. The state may be determined after it's initial /// connection. pub sync_status: PeerSyncStatus, @@ -91,9 +91,11 @@ impl PeerInfo { false } - /// Returns the seen addresses of the peer. - pub fn seen_addresses(&self) -> &HashSet { - &self.seen_addresses + /// Returns the seen IP addresses of the peer. + pub fn seen_addresses<'a>(&'a self) -> impl Iterator + 'a { + self.seen_addresses + .iter() + .map(|socket_addr| socket_addr.ip()) } /// Returns the connection status of the peer. @@ -243,7 +245,7 @@ impl PeerInfo { /// Modifies the status to Connected and increases the number of ingoing /// connections by one - pub(crate) fn connect_ingoing(&mut self, seen_address: Option) { + pub(crate) fn connect_ingoing(&mut self, seen_address: Option) { match &mut self.connection_status { Connected { n_in, .. } => *n_in += 1, Disconnected { .. } @@ -256,14 +258,14 @@ impl PeerInfo { } } - if let Some(ip_addr) = seen_address { - self.seen_addresses.insert(ip_addr); + if let Some(socket_addr) = seen_address { + self.seen_addresses.insert(socket_addr); } } /// Modifies the status to Connected and increases the number of outgoing /// connections by one - pub(crate) fn connect_outgoing(&mut self, seen_address: Option) { + pub(crate) fn connect_outgoing(&mut self, seen_address: Option) { match &mut self.connection_status { Connected { n_out, .. } => *n_out += 1, Disconnected { .. } diff --git a/beacon_node/eth2_libp2p/src/peer_manager/peerdb.rs b/beacon_node/eth2_libp2p/src/peer_manager/peerdb.rs index 97dd6a43e..1351e7d85 100644 --- a/beacon_node/eth2_libp2p/src/peer_manager/peerdb.rs +++ b/beacon_node/eth2_libp2p/src/peer_manager/peerdb.rs @@ -1,4 +1,4 @@ -use super::peer_info::{PeerConnectionStatus, PeerInfo}; +use super::peer_info::{ConnectionDirection, PeerConnectionStatus, PeerInfo}; use super::peer_sync_status::PeerSyncStatus; use super::score::{Score, ScoreState}; use crate::multiaddr::{Multiaddr, Protocol}; @@ -7,8 +7,8 @@ use crate::Enr; use crate::PeerId; use rand::seq::SliceRandom; use slog::{crit, debug, error, trace, warn}; -use std::collections::{HashMap, HashSet}; -use std::net::IpAddr; +use std::collections::HashMap; +use std::net::{IpAddr, SocketAddr}; use std::time::Instant; use types::{EthSpec, SubnetId}; @@ -16,8 +16,7 @@ use types::{EthSpec, SubnetId}; const MAX_DC_PEERS: usize = 500; /// The maximum number of banned nodes to remember. const MAX_BANNED_PEERS: usize = 1000; -/// If there are more than `BANNED_PEERS_PER_IP_THRESHOLD` many banned peers with the same IP we ban -/// the IP. +/// We ban an IP if there are more than `BANNED_PEERS_PER_IP_THRESHOLD` banned peers with this IP. const BANNED_PEERS_PER_IP_THRESHOLD: usize = 5; /// Storage of known peers, their reputation and information @@ -42,19 +41,19 @@ pub struct BannedPeersCount { impl BannedPeersCount { /// Removes the peer from the counts if it is banned. Returns true if the peer was banned and /// false otherwise. - pub fn remove_banned_peer(&mut self, ip_addresses: &HashSet) { + pub fn remove_banned_peer(&mut self, ip_addresses: impl Iterator) { self.banned_peers = self.banned_peers.saturating_sub(1); for address in ip_addresses { - if let Some(count) = self.banned_peers_per_ip.get_mut(address) { + if let Some(count) = self.banned_peers_per_ip.get_mut(&address) { *count = count.saturating_sub(1); } } } - pub fn add_banned_peer(&mut self, ip_addresses: &HashSet) { + pub fn add_banned_peer(&mut self, ip_addresses: impl Iterator) { self.banned_peers = self.banned_peers.saturating_add(1); for address in ip_addresses { - *self.banned_peers_per_ip.entry(*address).or_insert(0) += 1; + *self.banned_peers_per_ip.entry(address).or_insert(0) += 1; } } @@ -174,8 +173,7 @@ impl PeerDB { fn ip_is_banned(&self, peer: &PeerInfo) -> bool { peer.seen_addresses() - .iter() - .any(|addr| self.banned_peers_count.ip_is_banned(addr)) + .any(|ip| self.banned_peers_count.ip_is_banned(&ip)) } /// Returns true if the IP is banned. @@ -370,8 +368,13 @@ impl PeerDB { }); } - /// Sets a peer as connected with an ingoing connection. - pub fn connect_ingoing(&mut self, peer_id: &PeerId, multiaddr: Multiaddr, enr: Option) { + fn connect( + &mut self, + peer_id: &PeerId, + multiaddr: Multiaddr, + enr: Option, + direction: ConnectionDirection, + ) { let info = self.peers.entry(peer_id.clone()).or_default(); info.enr = enr; @@ -385,39 +388,37 @@ impl PeerDB { .remove_banned_peer(info.seen_addresses()); } - // Add the seen ip address to the peer's info - let ip_addr = multiaddr.iter().find_map(|p| match p { - Protocol::Ip4(ip) => Some(ip.into()), - Protocol::Ip6(ip) => Some(ip.into()), + // Add the seen ip address and port to the peer's info + let socket_addr = match multiaddr.iter().fold( + (None, None), + |(found_ip, found_port), protocol| match protocol { + Protocol::Ip4(ip) => (Some(ip.into()), found_port), + Protocol::Ip6(ip) => (Some(ip.into()), found_port), + Protocol::Tcp(port) => (found_ip, Some(port)), + _ => (found_ip, found_port), + }, + ) { + (Some(ip), Some(port)) => Some(SocketAddr::new(ip, port)), + (Some(_ip), None) => { + crit!(self.log, "Connected peer has an IP but no TCP port"; "peer_id" => %peer_id); + None + } _ => None, - }); + }; - info.connect_ingoing(ip_addr); + match direction { + ConnectionDirection::Incoming => info.connect_ingoing(socket_addr), + ConnectionDirection::Outgoing => info.connect_outgoing(socket_addr), + } + } + /// Sets a peer as connected with an ingoing connection. + pub fn connect_ingoing(&mut self, peer_id: &PeerId, multiaddr: Multiaddr, enr: Option) { + self.connect(peer_id, multiaddr, enr, ConnectionDirection::Incoming) } /// Sets a peer as connected with an outgoing connection. pub fn connect_outgoing(&mut self, peer_id: &PeerId, multiaddr: Multiaddr, enr: Option) { - let info = self.peers.entry(peer_id.clone()).or_default(); - info.enr = enr; - - if info.is_disconnected() { - self.disconnected_peers = self.disconnected_peers.saturating_sub(1); - } - - if info.is_banned() { - error!(self.log, "Connected to a banned peer"; "peer_id" => %peer_id); - self.banned_peers_count - .remove_banned_peer(info.seen_addresses()); - } - - // Add the seen ip address to the peer's info - let ip_addr = multiaddr.iter().find_map(|p| match p { - Protocol::Ip4(ip) => Some(ip.into()), - Protocol::Ip6(ip) => Some(ip.into()), - _ => None, - }); - - info.connect_outgoing(ip_addr); + self.connect(peer_id, multiaddr, enr, ConnectionDirection::Outgoing) } /// Sets the peer as disconnected. A banned peer remains banned @@ -782,27 +783,30 @@ mod tests { #[test] fn test_disconnected_ban_consistency() { let mut pdb = get_db(); + let mut multiaddr = Multiaddr::empty(); + multiaddr.push(Protocol::Tcp(9000)); + multiaddr.push(Protocol::Ip4("0.0.0.0".parse().unwrap())); let random_peer = PeerId::random(); let random_peer1 = PeerId::random(); let random_peer2 = PeerId::random(); let random_peer3 = PeerId::random(); - pdb.connect_ingoing(&random_peer, "/ip4/0.0.0.0".parse().unwrap(), None); - pdb.connect_ingoing(&random_peer1, "/ip4/0.0.0.0".parse().unwrap(), None); - pdb.connect_ingoing(&random_peer2, "/ip4/0.0.0.0".parse().unwrap(), None); - pdb.connect_ingoing(&random_peer3, "/ip4/0.0.0.0".parse().unwrap(), None); + pdb.connect_ingoing(&random_peer, multiaddr.clone(), None); + pdb.connect_ingoing(&random_peer1, multiaddr.clone(), None); + pdb.connect_ingoing(&random_peer2, multiaddr.clone(), None); + pdb.connect_ingoing(&random_peer3, multiaddr.clone(), None); assert_eq!(pdb.disconnected_peers, pdb.disconnected_peers().count()); assert_eq!( pdb.banned_peers_count.banned_peers(), pdb.banned_peers().count() ); - pdb.connect_ingoing(&random_peer, "/ip4/0.0.0.0".parse().unwrap(), None); + pdb.connect_ingoing(&random_peer, multiaddr.clone(), None); pdb.notify_disconnect(&random_peer1); pdb.disconnect_and_ban(&random_peer2); pdb.notify_disconnect(&random_peer2); - pdb.connect_ingoing(&random_peer3, "/ip4/0.0.0.0".parse().unwrap(), None); + pdb.connect_ingoing(&random_peer3, multiaddr.clone(), None); assert_eq!(pdb.disconnected_peers, pdb.disconnected_peers().count()); assert_eq!( pdb.banned_peers_count.banned_peers(), @@ -816,7 +820,7 @@ mod tests { pdb.banned_peers().count() ); - pdb.connect_outgoing(&random_peer2, "/ip4/0.0.0.0".parse().unwrap(), None); + pdb.connect_outgoing(&random_peer2, multiaddr.clone(), None); assert_eq!(pdb.disconnected_peers, pdb.disconnected_peers().count()); assert_eq!( pdb.banned_peers_count.banned_peers(), @@ -832,11 +836,11 @@ mod tests { pdb.disconnect_and_ban(&random_peer3); pdb.notify_disconnect(&random_peer3); - pdb.connect_ingoing(&random_peer1, "/ip4/0.0.0.0".parse().unwrap(), None); + pdb.connect_ingoing(&random_peer1, multiaddr.clone(), None); pdb.notify_disconnect(&random_peer2); pdb.disconnect_and_ban(&random_peer3); pdb.notify_disconnect(&random_peer3); - pdb.connect_ingoing(&random_peer, "/ip4/0.0.0.0".parse().unwrap(), None); + pdb.connect_ingoing(&random_peer, multiaddr.clone(), None); assert_eq!(pdb.disconnected_peers, pdb.disconnected_peers().count()); assert_eq!( pdb.banned_peers_count.banned_peers(), @@ -866,6 +870,7 @@ mod tests { for ip in ips { let mut addr = Multiaddr::empty(); addr.push(Protocol::from(ip)); + addr.push(Protocol::Tcp(9000)); pdb.connect_ingoing(&p, addr, None); } p @@ -979,8 +984,10 @@ mod tests { assert!(!pdb.is_banned(&p2)); // add ip2 to all peers and ban them. + let mut socker_addr = Multiaddr::from(ip2); + socker_addr.push(Protocol::Tcp(8080)); for p in &peers { - pdb.connect_ingoing(&p, ip2.into(), None); + pdb.connect_ingoing(&p, socker_addr.clone(), None); pdb.disconnect_and_ban(p); pdb.notify_disconnect(p); } diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index 52f6685cf..deecae879 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -1286,10 +1286,16 @@ pub fn serve( })?; if let Some(peer_info) = network_globals.peers.read().peer_info(&peer_id) { - //TODO: update this to seen_addresses once #1764 is resolved - let address = match peer_info.listening_addresses.get(0) { - Some(addr) => addr.to_string(), - None => "".to_string(), // this field is non-nullable in the eth2 API spec + let address = if let Some(socket_addr) = + peer_info.seen_addresses.iter().next() + { + let mut addr = eth2_libp2p::Multiaddr::from(socket_addr.ip()); + addr.push(eth2_libp2p::multiaddr::Protocol::Tcp(socket_addr.port())); + addr.to_string() + } else if let Some(addr) = peer_info.listening_addresses.first() { + addr.to_string() + } else { + String::new() }; // the eth2 API spec implies only peers we have been connected to at some point should be included. @@ -1297,7 +1303,7 @@ pub fn serve( return Ok(api_types::GenericResponse::from(api_types::PeerData { peer_id: peer_id.to_string(), enr: peer_info.enr.as_ref().map(|enr| enr.to_base64()), - last_seen_p2p_address: address, + address, direction: api_types::PeerDirection::from_connection_direction( &dir, ), @@ -1330,16 +1336,23 @@ pub fn serve( // the eth2 API spec implies only peers we have been connected to at some point should be included. .filter(|(_, peer_info)| peer_info.connection_direction.is_some()) .for_each(|(peer_id, peer_info)| { - //TODO: update this to seen_addresses once #1764 is resolved - let address = match peer_info.listening_addresses.get(0) { - Some(addr) => addr.to_string(), - None => "".to_string(), // this field is non-nullable in the eth2 API spec + let address = if let Some(socket_addr) = + peer_info.seen_addresses.iter().next() + { + let mut addr = eth2_libp2p::Multiaddr::from(socket_addr.ip()); + addr.push(eth2_libp2p::multiaddr::Protocol::Tcp(socket_addr.port())); + addr.to_string() + } else if let Some(addr) = peer_info.listening_addresses.first() { + addr.to_string() + } else { + String::new() }; + if let Some(dir) = peer_info.connection_direction.as_ref() { peers.push(api_types::PeerData { peer_id: peer_id.to_string(), enr: peer_info.enr.as_ref().map(|enr| enr.to_base64()), - last_seen_p2p_address: address, + address, direction: api_types::PeerDirection::from_connection_direction( &dir, ), diff --git a/beacon_node/http_api/tests/tests.rs b/beacon_node/http_api/tests/tests.rs index 532348a2c..295807ded 100644 --- a/beacon_node/http_api/tests/tests.rs +++ b/beacon_node/http_api/tests/tests.rs @@ -37,7 +37,7 @@ const FINALIZED_EPOCH: u64 = 3; const TCP_PORT: u16 = 42; const UDP_PORT: u16 = 42; const SEQ_NUMBER: u64 = 0; -const EXTERNAL_ADDR: &str = "/ip4/0.0.0.0"; +const EXTERNAL_ADDR: &str = "/ip4/0.0.0.0/tcp/9000"; /// Skipping the slots around the epoch boundary allows us to check that we're obtaining states /// from skipped slots for the finalized and justified checkpoints (instead of the state from the @@ -162,10 +162,6 @@ impl ApiTester { EXTERNAL_ADDR.parse().unwrap(), None, ); - //TODO: have to update this once #1764 is resolved - if let Some(peer_info) = network_globals.peers.write().peer_info_mut(&peer_id) { - peer_info.listening_addresses = vec![EXTERNAL_ADDR.parse().unwrap()]; - } *network_globals.sync_state.write() = SyncState::Synced; @@ -1115,7 +1111,7 @@ impl ApiTester { let expected = PeerData { peer_id: self.external_peer_id.to_string(), enr: None, - last_seen_p2p_address: EXTERNAL_ADDR.to_string(), + address: EXTERNAL_ADDR.to_string(), state: PeerState::Connected, direction: PeerDirection::Inbound, }; @@ -1131,7 +1127,7 @@ impl ApiTester { let expected = PeerData { peer_id: self.external_peer_id.to_string(), enr: None, - last_seen_p2p_address: EXTERNAL_ADDR.to_string(), + address: EXTERNAL_ADDR.to_string(), state: PeerState::Connected, direction: PeerDirection::Inbound, }; diff --git a/common/eth2/src/types.rs b/common/eth2/src/types.rs index a300b875e..6ba16b190 100644 --- a/common/eth2/src/types.rs +++ b/common/eth2/src/types.rs @@ -475,7 +475,7 @@ pub struct BeaconCommitteeSubscription { pub struct PeerData { pub peer_id: String, pub enr: Option, - pub last_seen_p2p_address: String, + pub address: String, pub state: PeerState, pub direction: PeerDirection, }