Seen addresses store port (#1841)

## Issue Addressed
#1764
This commit is contained in:
divma 2020-11-09 04:01:03 +00:00
parent 63fe5542e7
commit b0e9e3dcef
6 changed files with 96 additions and 80 deletions

View File

@ -741,9 +741,7 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
.peer_info(peer_id) .peer_info(peer_id)
.map(|info| { .map(|info| {
info.seen_addresses() info.seen_addresses()
.iter()
.filter(|ip| peer_db.is_ip_banned(ip)) .filter(|ip| peer_db.is_ip_banned(ip))
.cloned()
.collect::<Vec<_>>() .collect::<Vec<_>>()
}) })
.unwrap_or_default(); .unwrap_or_default();
@ -761,7 +759,7 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
let seen_ip_addresses = peer_db let seen_ip_addresses = peer_db
.peer_info(peer_id) .peer_info(peer_id)
.map(|info| info.seen_addresses().iter().cloned().collect::<Vec<_>>()) .map(|info| info.seen_addresses().collect::<Vec<_>>())
.unwrap_or_default(); .unwrap_or_default();
self.discovery.unban_peer(&peer_id, seen_ip_addresses); self.discovery.unban_peer(&peer_id, seen_ip_addresses);

View File

@ -9,7 +9,7 @@ use serde::{
Serialize, Serialize,
}; };
use std::collections::HashSet; use std::collections::HashSet;
use std::net::IpAddr; use std::net::{IpAddr, SocketAddr};
use std::time::Instant; use std::time::Instant;
use types::{EthSpec, SubnetId}; use types::{EthSpec, SubnetId};
use PeerConnectionStatus::*; use PeerConnectionStatus::*;
@ -31,7 +31,7 @@ pub struct PeerInfo<T: EthSpec> {
pub listening_addresses: Vec<Multiaddr>, pub listening_addresses: Vec<Multiaddr>,
/// This is addresses we have physically seen and this is what we use for banning/un-banning /// This is addresses we have physically seen and this is what we use for banning/un-banning
/// peers. /// peers.
seen_addresses: HashSet<IpAddr>, pub seen_addresses: HashSet<SocketAddr>,
/// The current syncing state of the peer. The state may be determined after it's initial /// The current syncing state of the peer. The state may be determined after it's initial
/// connection. /// connection.
pub sync_status: PeerSyncStatus, pub sync_status: PeerSyncStatus,
@ -91,9 +91,11 @@ impl<T: EthSpec> PeerInfo<T> {
false false
} }
/// Returns the seen addresses of the peer. /// Returns the seen IP addresses of the peer.
pub fn seen_addresses(&self) -> &HashSet<IpAddr> { pub fn seen_addresses<'a>(&'a self) -> impl Iterator<Item = IpAddr> + 'a {
&self.seen_addresses self.seen_addresses
.iter()
.map(|socket_addr| socket_addr.ip())
} }
/// Returns the connection status of the peer. /// Returns the connection status of the peer.
@ -243,7 +245,7 @@ impl<T: EthSpec> PeerInfo<T> {
/// Modifies the status to Connected and increases the number of ingoing /// Modifies the status to Connected and increases the number of ingoing
/// connections by one /// connections by one
pub(crate) fn connect_ingoing(&mut self, seen_address: Option<IpAddr>) { pub(crate) fn connect_ingoing(&mut self, seen_address: Option<SocketAddr>) {
match &mut self.connection_status { match &mut self.connection_status {
Connected { n_in, .. } => *n_in += 1, Connected { n_in, .. } => *n_in += 1,
Disconnected { .. } Disconnected { .. }
@ -256,14 +258,14 @@ impl<T: EthSpec> PeerInfo<T> {
} }
} }
if let Some(ip_addr) = seen_address { if let Some(socket_addr) = seen_address {
self.seen_addresses.insert(ip_addr); self.seen_addresses.insert(socket_addr);
} }
} }
/// Modifies the status to Connected and increases the number of outgoing /// Modifies the status to Connected and increases the number of outgoing
/// connections by one /// connections by one
pub(crate) fn connect_outgoing(&mut self, seen_address: Option<IpAddr>) { pub(crate) fn connect_outgoing(&mut self, seen_address: Option<SocketAddr>) {
match &mut self.connection_status { match &mut self.connection_status {
Connected { n_out, .. } => *n_out += 1, Connected { n_out, .. } => *n_out += 1,
Disconnected { .. } Disconnected { .. }

View File

@ -1,4 +1,4 @@
use super::peer_info::{PeerConnectionStatus, PeerInfo}; use super::peer_info::{ConnectionDirection, PeerConnectionStatus, PeerInfo};
use super::peer_sync_status::PeerSyncStatus; use super::peer_sync_status::PeerSyncStatus;
use super::score::{Score, ScoreState}; use super::score::{Score, ScoreState};
use crate::multiaddr::{Multiaddr, Protocol}; use crate::multiaddr::{Multiaddr, Protocol};
@ -7,8 +7,8 @@ use crate::Enr;
use crate::PeerId; use crate::PeerId;
use rand::seq::SliceRandom; use rand::seq::SliceRandom;
use slog::{crit, debug, error, trace, warn}; use slog::{crit, debug, error, trace, warn};
use std::collections::{HashMap, HashSet}; use std::collections::HashMap;
use std::net::IpAddr; use std::net::{IpAddr, SocketAddr};
use std::time::Instant; use std::time::Instant;
use types::{EthSpec, SubnetId}; use types::{EthSpec, SubnetId};
@ -16,8 +16,7 @@ use types::{EthSpec, SubnetId};
const MAX_DC_PEERS: usize = 500; const MAX_DC_PEERS: usize = 500;
/// The maximum number of banned nodes to remember. /// The maximum number of banned nodes to remember.
const MAX_BANNED_PEERS: usize = 1000; const MAX_BANNED_PEERS: usize = 1000;
/// If there are more than `BANNED_PEERS_PER_IP_THRESHOLD` many banned peers with the same IP we ban /// We ban an IP if there are more than `BANNED_PEERS_PER_IP_THRESHOLD` banned peers with this IP.
/// the IP.
const BANNED_PEERS_PER_IP_THRESHOLD: usize = 5; const BANNED_PEERS_PER_IP_THRESHOLD: usize = 5;
/// Storage of known peers, their reputation and information /// Storage of known peers, their reputation and information
@ -42,19 +41,19 @@ pub struct BannedPeersCount {
impl BannedPeersCount { impl BannedPeersCount {
/// Removes the peer from the counts if it is banned. Returns true if the peer was banned and /// Removes the peer from the counts if it is banned. Returns true if the peer was banned and
/// false otherwise. /// false otherwise.
pub fn remove_banned_peer(&mut self, ip_addresses: &HashSet<IpAddr>) { pub fn remove_banned_peer(&mut self, ip_addresses: impl Iterator<Item = IpAddr>) {
self.banned_peers = self.banned_peers.saturating_sub(1); self.banned_peers = self.banned_peers.saturating_sub(1);
for address in ip_addresses { for address in ip_addresses {
if let Some(count) = self.banned_peers_per_ip.get_mut(address) { if let Some(count) = self.banned_peers_per_ip.get_mut(&address) {
*count = count.saturating_sub(1); *count = count.saturating_sub(1);
} }
} }
} }
pub fn add_banned_peer(&mut self, ip_addresses: &HashSet<IpAddr>) { pub fn add_banned_peer(&mut self, ip_addresses: impl Iterator<Item = IpAddr>) {
self.banned_peers = self.banned_peers.saturating_add(1); self.banned_peers = self.banned_peers.saturating_add(1);
for address in ip_addresses { for address in ip_addresses {
*self.banned_peers_per_ip.entry(*address).or_insert(0) += 1; *self.banned_peers_per_ip.entry(address).or_insert(0) += 1;
} }
} }
@ -174,8 +173,7 @@ impl<TSpec: EthSpec> PeerDB<TSpec> {
fn ip_is_banned(&self, peer: &PeerInfo<TSpec>) -> bool { fn ip_is_banned(&self, peer: &PeerInfo<TSpec>) -> bool {
peer.seen_addresses() peer.seen_addresses()
.iter() .any(|ip| self.banned_peers_count.ip_is_banned(&ip))
.any(|addr| self.banned_peers_count.ip_is_banned(addr))
} }
/// Returns true if the IP is banned. /// Returns true if the IP is banned.
@ -370,8 +368,13 @@ impl<TSpec: EthSpec> PeerDB<TSpec> {
}); });
} }
/// Sets a peer as connected with an ingoing connection. fn connect(
pub fn connect_ingoing(&mut self, peer_id: &PeerId, multiaddr: Multiaddr, enr: Option<Enr>) { &mut self,
peer_id: &PeerId,
multiaddr: Multiaddr,
enr: Option<Enr>,
direction: ConnectionDirection,
) {
let info = self.peers.entry(peer_id.clone()).or_default(); let info = self.peers.entry(peer_id.clone()).or_default();
info.enr = enr; info.enr = enr;
@ -385,39 +388,37 @@ impl<TSpec: EthSpec> PeerDB<TSpec> {
.remove_banned_peer(info.seen_addresses()); .remove_banned_peer(info.seen_addresses());
} }
// Add the seen ip address to the peer's info // Add the seen ip address and port to the peer's info
let ip_addr = multiaddr.iter().find_map(|p| match p { let socket_addr = match multiaddr.iter().fold(
Protocol::Ip4(ip) => Some(ip.into()), (None, None),
Protocol::Ip6(ip) => Some(ip.into()), |(found_ip, found_port), protocol| match protocol {
Protocol::Ip4(ip) => (Some(ip.into()), found_port),
Protocol::Ip6(ip) => (Some(ip.into()), found_port),
Protocol::Tcp(port) => (found_ip, Some(port)),
_ => (found_ip, found_port),
},
) {
(Some(ip), Some(port)) => Some(SocketAddr::new(ip, port)),
(Some(_ip), None) => {
crit!(self.log, "Connected peer has an IP but no TCP port"; "peer_id" => %peer_id);
None
}
_ => None, _ => None,
}); };
info.connect_ingoing(ip_addr); match direction {
ConnectionDirection::Incoming => info.connect_ingoing(socket_addr),
ConnectionDirection::Outgoing => info.connect_outgoing(socket_addr),
}
}
/// Sets a peer as connected with an ingoing connection.
pub fn connect_ingoing(&mut self, peer_id: &PeerId, multiaddr: Multiaddr, enr: Option<Enr>) {
self.connect(peer_id, multiaddr, enr, ConnectionDirection::Incoming)
} }
/// Sets a peer as connected with an outgoing connection. /// Sets a peer as connected with an outgoing connection.
pub fn connect_outgoing(&mut self, peer_id: &PeerId, multiaddr: Multiaddr, enr: Option<Enr>) { pub fn connect_outgoing(&mut self, peer_id: &PeerId, multiaddr: Multiaddr, enr: Option<Enr>) {
let info = self.peers.entry(peer_id.clone()).or_default(); self.connect(peer_id, multiaddr, enr, ConnectionDirection::Outgoing)
info.enr = enr;
if info.is_disconnected() {
self.disconnected_peers = self.disconnected_peers.saturating_sub(1);
}
if info.is_banned() {
error!(self.log, "Connected to a banned peer"; "peer_id" => %peer_id);
self.banned_peers_count
.remove_banned_peer(info.seen_addresses());
}
// Add the seen ip address to the peer's info
let ip_addr = multiaddr.iter().find_map(|p| match p {
Protocol::Ip4(ip) => Some(ip.into()),
Protocol::Ip6(ip) => Some(ip.into()),
_ => None,
});
info.connect_outgoing(ip_addr);
} }
/// Sets the peer as disconnected. A banned peer remains banned /// Sets the peer as disconnected. A banned peer remains banned
@ -782,27 +783,30 @@ mod tests {
#[test] #[test]
fn test_disconnected_ban_consistency() { fn test_disconnected_ban_consistency() {
let mut pdb = get_db(); let mut pdb = get_db();
let mut multiaddr = Multiaddr::empty();
multiaddr.push(Protocol::Tcp(9000));
multiaddr.push(Protocol::Ip4("0.0.0.0".parse().unwrap()));
let random_peer = PeerId::random(); let random_peer = PeerId::random();
let random_peer1 = PeerId::random(); let random_peer1 = PeerId::random();
let random_peer2 = PeerId::random(); let random_peer2 = PeerId::random();
let random_peer3 = PeerId::random(); let random_peer3 = PeerId::random();
pdb.connect_ingoing(&random_peer, "/ip4/0.0.0.0".parse().unwrap(), None); pdb.connect_ingoing(&random_peer, multiaddr.clone(), None);
pdb.connect_ingoing(&random_peer1, "/ip4/0.0.0.0".parse().unwrap(), None); pdb.connect_ingoing(&random_peer1, multiaddr.clone(), None);
pdb.connect_ingoing(&random_peer2, "/ip4/0.0.0.0".parse().unwrap(), None); pdb.connect_ingoing(&random_peer2, multiaddr.clone(), None);
pdb.connect_ingoing(&random_peer3, "/ip4/0.0.0.0".parse().unwrap(), None); pdb.connect_ingoing(&random_peer3, multiaddr.clone(), None);
assert_eq!(pdb.disconnected_peers, pdb.disconnected_peers().count()); assert_eq!(pdb.disconnected_peers, pdb.disconnected_peers().count());
assert_eq!( assert_eq!(
pdb.banned_peers_count.banned_peers(), pdb.banned_peers_count.banned_peers(),
pdb.banned_peers().count() pdb.banned_peers().count()
); );
pdb.connect_ingoing(&random_peer, "/ip4/0.0.0.0".parse().unwrap(), None); pdb.connect_ingoing(&random_peer, multiaddr.clone(), None);
pdb.notify_disconnect(&random_peer1); pdb.notify_disconnect(&random_peer1);
pdb.disconnect_and_ban(&random_peer2); pdb.disconnect_and_ban(&random_peer2);
pdb.notify_disconnect(&random_peer2); pdb.notify_disconnect(&random_peer2);
pdb.connect_ingoing(&random_peer3, "/ip4/0.0.0.0".parse().unwrap(), None); pdb.connect_ingoing(&random_peer3, multiaddr.clone(), None);
assert_eq!(pdb.disconnected_peers, pdb.disconnected_peers().count()); assert_eq!(pdb.disconnected_peers, pdb.disconnected_peers().count());
assert_eq!( assert_eq!(
pdb.banned_peers_count.banned_peers(), pdb.banned_peers_count.banned_peers(),
@ -816,7 +820,7 @@ mod tests {
pdb.banned_peers().count() pdb.banned_peers().count()
); );
pdb.connect_outgoing(&random_peer2, "/ip4/0.0.0.0".parse().unwrap(), None); pdb.connect_outgoing(&random_peer2, multiaddr.clone(), None);
assert_eq!(pdb.disconnected_peers, pdb.disconnected_peers().count()); assert_eq!(pdb.disconnected_peers, pdb.disconnected_peers().count());
assert_eq!( assert_eq!(
pdb.banned_peers_count.banned_peers(), pdb.banned_peers_count.banned_peers(),
@ -832,11 +836,11 @@ mod tests {
pdb.disconnect_and_ban(&random_peer3); pdb.disconnect_and_ban(&random_peer3);
pdb.notify_disconnect(&random_peer3); pdb.notify_disconnect(&random_peer3);
pdb.connect_ingoing(&random_peer1, "/ip4/0.0.0.0".parse().unwrap(), None); pdb.connect_ingoing(&random_peer1, multiaddr.clone(), None);
pdb.notify_disconnect(&random_peer2); pdb.notify_disconnect(&random_peer2);
pdb.disconnect_and_ban(&random_peer3); pdb.disconnect_and_ban(&random_peer3);
pdb.notify_disconnect(&random_peer3); pdb.notify_disconnect(&random_peer3);
pdb.connect_ingoing(&random_peer, "/ip4/0.0.0.0".parse().unwrap(), None); pdb.connect_ingoing(&random_peer, multiaddr.clone(), None);
assert_eq!(pdb.disconnected_peers, pdb.disconnected_peers().count()); assert_eq!(pdb.disconnected_peers, pdb.disconnected_peers().count());
assert_eq!( assert_eq!(
pdb.banned_peers_count.banned_peers(), pdb.banned_peers_count.banned_peers(),
@ -866,6 +870,7 @@ mod tests {
for ip in ips { for ip in ips {
let mut addr = Multiaddr::empty(); let mut addr = Multiaddr::empty();
addr.push(Protocol::from(ip)); addr.push(Protocol::from(ip));
addr.push(Protocol::Tcp(9000));
pdb.connect_ingoing(&p, addr, None); pdb.connect_ingoing(&p, addr, None);
} }
p p
@ -979,8 +984,10 @@ mod tests {
assert!(!pdb.is_banned(&p2)); assert!(!pdb.is_banned(&p2));
// add ip2 to all peers and ban them. // add ip2 to all peers and ban them.
let mut socker_addr = Multiaddr::from(ip2);
socker_addr.push(Protocol::Tcp(8080));
for p in &peers { for p in &peers {
pdb.connect_ingoing(&p, ip2.into(), None); pdb.connect_ingoing(&p, socker_addr.clone(), None);
pdb.disconnect_and_ban(p); pdb.disconnect_and_ban(p);
pdb.notify_disconnect(p); pdb.notify_disconnect(p);
} }

View File

@ -1286,10 +1286,16 @@ pub fn serve<T: BeaconChainTypes>(
})?; })?;
if let Some(peer_info) = network_globals.peers.read().peer_info(&peer_id) { if let Some(peer_info) = network_globals.peers.read().peer_info(&peer_id) {
//TODO: update this to seen_addresses once #1764 is resolved let address = if let Some(socket_addr) =
let address = match peer_info.listening_addresses.get(0) { peer_info.seen_addresses.iter().next()
Some(addr) => addr.to_string(), {
None => "".to_string(), // this field is non-nullable in the eth2 API spec let mut addr = eth2_libp2p::Multiaddr::from(socket_addr.ip());
addr.push(eth2_libp2p::multiaddr::Protocol::Tcp(socket_addr.port()));
addr.to_string()
} else if let Some(addr) = peer_info.listening_addresses.first() {
addr.to_string()
} else {
String::new()
}; };
// the eth2 API spec implies only peers we have been connected to at some point should be included. // the eth2 API spec implies only peers we have been connected to at some point should be included.
@ -1297,7 +1303,7 @@ pub fn serve<T: BeaconChainTypes>(
return Ok(api_types::GenericResponse::from(api_types::PeerData { return Ok(api_types::GenericResponse::from(api_types::PeerData {
peer_id: peer_id.to_string(), peer_id: peer_id.to_string(),
enr: peer_info.enr.as_ref().map(|enr| enr.to_base64()), enr: peer_info.enr.as_ref().map(|enr| enr.to_base64()),
last_seen_p2p_address: address, address,
direction: api_types::PeerDirection::from_connection_direction( direction: api_types::PeerDirection::from_connection_direction(
&dir, &dir,
), ),
@ -1330,16 +1336,23 @@ pub fn serve<T: BeaconChainTypes>(
// the eth2 API spec implies only peers we have been connected to at some point should be included. // the eth2 API spec implies only peers we have been connected to at some point should be included.
.filter(|(_, peer_info)| peer_info.connection_direction.is_some()) .filter(|(_, peer_info)| peer_info.connection_direction.is_some())
.for_each(|(peer_id, peer_info)| { .for_each(|(peer_id, peer_info)| {
//TODO: update this to seen_addresses once #1764 is resolved let address = if let Some(socket_addr) =
let address = match peer_info.listening_addresses.get(0) { peer_info.seen_addresses.iter().next()
Some(addr) => addr.to_string(), {
None => "".to_string(), // this field is non-nullable in the eth2 API spec let mut addr = eth2_libp2p::Multiaddr::from(socket_addr.ip());
addr.push(eth2_libp2p::multiaddr::Protocol::Tcp(socket_addr.port()));
addr.to_string()
} else if let Some(addr) = peer_info.listening_addresses.first() {
addr.to_string()
} else {
String::new()
}; };
if let Some(dir) = peer_info.connection_direction.as_ref() { if let Some(dir) = peer_info.connection_direction.as_ref() {
peers.push(api_types::PeerData { peers.push(api_types::PeerData {
peer_id: peer_id.to_string(), peer_id: peer_id.to_string(),
enr: peer_info.enr.as_ref().map(|enr| enr.to_base64()), enr: peer_info.enr.as_ref().map(|enr| enr.to_base64()),
last_seen_p2p_address: address, address,
direction: api_types::PeerDirection::from_connection_direction( direction: api_types::PeerDirection::from_connection_direction(
&dir, &dir,
), ),

View File

@ -37,7 +37,7 @@ const FINALIZED_EPOCH: u64 = 3;
const TCP_PORT: u16 = 42; const TCP_PORT: u16 = 42;
const UDP_PORT: u16 = 42; const UDP_PORT: u16 = 42;
const SEQ_NUMBER: u64 = 0; const SEQ_NUMBER: u64 = 0;
const EXTERNAL_ADDR: &str = "/ip4/0.0.0.0"; const EXTERNAL_ADDR: &str = "/ip4/0.0.0.0/tcp/9000";
/// Skipping the slots around the epoch boundary allows us to check that we're obtaining states /// Skipping the slots around the epoch boundary allows us to check that we're obtaining states
/// from skipped slots for the finalized and justified checkpoints (instead of the state from the /// from skipped slots for the finalized and justified checkpoints (instead of the state from the
@ -162,10 +162,6 @@ impl ApiTester {
EXTERNAL_ADDR.parse().unwrap(), EXTERNAL_ADDR.parse().unwrap(),
None, None,
); );
//TODO: have to update this once #1764 is resolved
if let Some(peer_info) = network_globals.peers.write().peer_info_mut(&peer_id) {
peer_info.listening_addresses = vec![EXTERNAL_ADDR.parse().unwrap()];
}
*network_globals.sync_state.write() = SyncState::Synced; *network_globals.sync_state.write() = SyncState::Synced;
@ -1115,7 +1111,7 @@ impl ApiTester {
let expected = PeerData { let expected = PeerData {
peer_id: self.external_peer_id.to_string(), peer_id: self.external_peer_id.to_string(),
enr: None, enr: None,
last_seen_p2p_address: EXTERNAL_ADDR.to_string(), address: EXTERNAL_ADDR.to_string(),
state: PeerState::Connected, state: PeerState::Connected,
direction: PeerDirection::Inbound, direction: PeerDirection::Inbound,
}; };
@ -1131,7 +1127,7 @@ impl ApiTester {
let expected = PeerData { let expected = PeerData {
peer_id: self.external_peer_id.to_string(), peer_id: self.external_peer_id.to_string(),
enr: None, enr: None,
last_seen_p2p_address: EXTERNAL_ADDR.to_string(), address: EXTERNAL_ADDR.to_string(),
state: PeerState::Connected, state: PeerState::Connected,
direction: PeerDirection::Inbound, direction: PeerDirection::Inbound,
}; };

View File

@ -475,7 +475,7 @@ pub struct BeaconCommitteeSubscription {
pub struct PeerData { pub struct PeerData {
pub peer_id: String, pub peer_id: String,
pub enr: Option<String>, pub enr: Option<String>,
pub last_seen_p2p_address: String, pub address: String,
pub state: PeerState, pub state: PeerState,
pub direction: PeerDirection, pub direction: PeerDirection,
} }