Replace the max_peers CLI argument with target_peers and allow excess peers above target_peers, capped by a new constant PEER_EXCESS_FACTOR (relative to target_peers) (#1383)

blacktemplar 2020-07-23 05:55:36 +02:00 committed by GitHub
parent 3a888d6ef3
commit 3c4daec9af
9 changed files with 113 additions and 21 deletions
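
In effect, the node now tolerates a bounded number of peers above the configured target: the hard cap is derived from target_peers and PEER_EXCESS_FACTOR in the PeerManager constructor, as the diff below shows. The following standalone sketch only illustrates that calculation; the helper function and the main() wrapper are not part of the commit:

    /// Fraction of `target_peers` tolerated as excess, mirroring the new constant in the peer manager.
    const PEER_EXCESS_FACTOR: f32 = 0.1;

    /// Illustrative helper: derives the hard peer cap from the configured target,
    /// using the same expression as the constructor change in this commit.
    fn max_peers(target_peers: usize) -> usize {
        (target_peers as f32 * (1.0 + PEER_EXCESS_FACTOR)).ceil() as usize
    }

    fn main() {
        // With the default --target-peers of 50, up to 55 connected peers are tolerated;
        // incoming peers beyond that are sent Goodbye(TooManyPeers) unless they have a future duty.
        assert_eq!(max_peers(50), 55);
        println!("target = 50 -> max = {}", max_peers(50));
    }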

Cargo.lock (generated)

@@ -1522,6 +1522,7 @@ dependencies = [
  "lighthouse_metrics",
  "lru 0.5.3",
  "parking_lot 0.11.0",
+ "rand 0.7.3",
  "serde",
  "serde_derive",
  "sha2 0.9.1",


@@ -35,6 +35,8 @@ tokio-util = { version = "0.3.1", features = ["codec", "compat"] }
 discv5 = { version = "0.1.0-alpha.7", features = ["libp2p"] }
 tiny-keccak = "2.0.2"
 environment = { path = "../../lighthouse/environment" }
+# TODO: Remove rand crate for mainnet
+rand = "0.7.3"

 [dependencies.libp2p]
 #version = "0.19.1"


@@ -610,7 +610,7 @@ impl<TSpec: EthSpec> Behaviour<TSpec> {
                 }
                 PeerManagerEvent::DisconnectPeer(peer_id, reason) => {
                     debug!(self.log, "PeerManager requested to disconnect a peer";
-                        "peer_id" => peer_id.to_string());
+                        "peer_id" => peer_id.to_string(), "reason" => reason.to_string());
                     // queue for disabling
                     self.peers_to_dc.push_back(peer_id.clone());
                     // send one goodbye
@@ -731,8 +731,25 @@ impl<TSpec: EthSpec> NetworkBehaviour for Behaviour<TSpec> {
         conn_id: &ConnectionId,
         endpoint: &ConnectedPoint,
     ) {
-        // If the peer is banned, send a goodbye and disconnect.
-        if self.peer_manager.is_banned(peer_id) {
+        let goodbye_reason: Option<GoodbyeReason> = if self.peer_manager.is_banned(peer_id) {
+            // If the peer is banned, send goodbye with reason banned.
+            Some(GoodbyeReason::Banned)
+        } else if self.peer_manager.peer_limit_reached()
+            && self
+                .network_globals
+                .peers
+                .read()
+                .peer_info(peer_id)
+                .map_or(true, |i| !i.has_future_duty())
+        {
+            //If we are at our peer limit and we don't need the peer for a future validator
+            //duty, send goodbye with reason TooManyPeers
+            Some(GoodbyeReason::TooManyPeers)
+        } else {
+            None
+        };
+
+        if let Some(reason) = goodbye_reason {
             self.peers_to_dc.push_back(peer_id.clone());
             // send a goodbye on all possible handlers for this peer
             self.handler_events.push_back(NBAction::NotifyHandler {
@@ -740,7 +757,7 @@ impl<TSpec: EthSpec> NetworkBehaviour for Behaviour<TSpec> {
                 handler: NotifyHandler::All,
                 event: BehaviourHandlerIn::Shutdown(Some((
                     RequestId::Behaviour,
-                    RPCRequest::Goodbye(GoodbyeReason::Banned),
+                    RPCRequest::Goodbye(reason),
                 ))),
             });
             return;
@@ -773,7 +790,16 @@ impl<TSpec: EthSpec> NetworkBehaviour for Behaviour<TSpec> {
     fn inject_connected(&mut self, peer_id: &PeerId) {
         // Drop any connection from a banned peer. The goodbye and disconnects are handled in
         // `inject_connection_established()`, which gets called first.
-        if self.peer_manager.is_banned(peer_id) {
+        // The same holds if we reached the peer limit and the connected peer has no future duty.
+        if self.peer_manager.is_banned(peer_id)
+            || (self.peer_manager.peer_limit_reached()
+                && self
+                    .network_globals
+                    .peers
+                    .read()
+                    .peer_info(peer_id)
+                    .map_or(true, |i| !i.has_future_duty()))
+        {
             return;
         }
@@ -828,7 +854,16 @@ impl<TSpec: EthSpec> NetworkBehaviour for Behaviour<TSpec> {
         event: <Self::ProtocolsHandler as ProtocolsHandler>::OutEvent,
     ) {
         // All events from banned peers are rejected
-        if self.peer_manager.is_banned(&peer_id) {
+        // The same holds if we reached the peer limit and the connected peer has no future duty.
+        if self.peer_manager.is_banned(&peer_id)
+            || (self.peer_manager.peer_limit_reached()
+                && self
+                    .network_globals
+                    .peers
+                    .read()
+                    .peer_info(&peer_id)
+                    .map_or(true, |i| !i.has_future_duty()))
+        {
             return;
         }


@@ -37,7 +37,7 @@ pub struct Config {
     pub enr_tcp_port: Option<u16>,

     /// Target number of connected peers.
-    pub max_peers: usize,
+    pub target_peers: usize,

     /// Gossipsub configuration parameters.
     #[serde(skip)]
@@ -122,7 +122,7 @@ impl Default for Config {
             enr_address: None,
             enr_udp_port: None,
             enr_tcp_port: None,
-            max_peers: 50,
+            target_peers: 50,
             gs_config,
             discv5_config,
             boot_nodes: vec![],


@@ -42,6 +42,11 @@ const PING_INTERVAL: u64 = 30;
 /// requests. This defines the interval in seconds.
 const HEARTBEAT_INTERVAL: u64 = 30;

+/// A fraction of `PeerManager::target_peers` that we allow to connect to us in excess of
+/// `PeerManager::target_peers`. For clarity, if `PeerManager::target_peers` is 50 and
+/// PEER_EXCESS_FACTOR = 0.1 we allow 10% more nodes, i.e 55.
+const PEER_EXCESS_FACTOR: f32 = 0.1;
+
 /// The main struct that handles peer's reputation and connection status.
 pub struct PeerManager<TSpec: EthSpec> {
     /// Storage of network globals to access the `PeerDB`.
@@ -54,6 +59,8 @@ pub struct PeerManager<TSpec: EthSpec> {
     status_peers: HashSetDelay<PeerId>,
     /// The target number of peers we would like to connect to.
     target_peers: usize,
+    /// The maximum number of peers we allow (exceptions for subnet peers)
+    max_peers: usize,
     /// The discovery service.
     discovery: Discovery<TSpec>,
     /// The heartbeat interval to perform routine maintenance.
@@ -99,7 +106,8 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
             events: SmallVec::new(),
             ping_peers: HashSetDelay::new(Duration::from_secs(PING_INTERVAL)),
             status_peers: HashSetDelay::new(Duration::from_secs(STATUS_INTERVAL)),
-            target_peers: config.max_peers, //TODO: Add support for target peers and max peers
+            target_peers: config.target_peers,
+            max_peers: (config.target_peers as f32 * (1.0 + PEER_EXCESS_FACTOR)).ceil() as usize,
             discovery,
             heartbeat,
             log: log.clone(),
@@ -278,6 +286,12 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
         self.network_globals.peers.read().is_banned(peer_id)
     }

+    /// Reports whether the peer limit is reached in which case we stop allowing new incoming
+    /// connections.
+    pub fn peer_limit_reached(&self) -> bool {
+        self.network_globals.connected_or_dialing_peers() >= self.max_peers
+    }
+
     /// Updates `PeerInfo` with `identify` information.
     pub fn identify(&mut self, peer_id: &PeerId, info: &IdentifyInfo) {
         if let Some(peer_info) = self.network_globals.peers.write().peer_info_mut(peer_id) {
@@ -478,11 +492,13 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
     fn peers_discovered(&mut self, peers: &[Enr], min_ttl: Option<Instant>) {
         let mut to_dial_peers = Vec::new();

+        let connected_or_dialing = self.network_globals.connected_or_dialing_peers();
         for enr in peers {
             let peer_id = enr.peer_id();

-            // if we need more peers, attempt a connection
-            if self.network_globals.connected_or_dialing_peers() < self.target_peers
+            // we attempt a connection if this peer is a subnet peer or if the max peer count
+            // is not yet filled (including dialling peers)
+            if (min_ttl.is_some() || connected_or_dialing + to_dial_peers.len() < self.max_peers)
                 && !self
                     .network_globals
                     .peers
@@ -514,7 +530,6 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
     /// This is called by `connect_ingoing` and `connect_outgoing`.
     ///
     /// This informs if the peer was accepted in to the db or not.
-    // TODO: Drop peers if over max_peer limit
     fn connect_peer(&mut self, peer_id: &PeerId, connection: ConnectingType) -> bool {
         // TODO: remove after timed updates
         //self.update_reputations();
@@ -673,11 +688,30 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
             self.discovery.discover_peers();
         }

-        // TODO: If we have too many peers, remove peers that are not required for subnet
-        // validation.
-
         // Updates peer's scores.
         self.update_peer_scores();
+
+        let connected_peer_count = self.network_globals.connected_peers();
+        if connected_peer_count > self.target_peers {
+            //remove excess peers with the worst scores, but keep subnet peers
+            for (peer_id, _) in self
+                .network_globals
+                .peers
+                .read()
+                .worst_connected_peers()
+                .iter()
+                .filter(|(_, info)| !info.has_future_duty())
+                .take(connected_peer_count - self.target_peers)
+                //we only need to disconnect peers with healthy scores, since the others got already
+                //disconnected in update_peer_scores
+                .filter(|(_, info)| info.score.state() == ScoreState::Healthy)
+            {
+                self.events.push(PeerManagerEvent::DisconnectPeer(
+                    (*peer_id).clone(),
+                    GoodbyeReason::TooManyPeers,
+                ));
+            }
+        }
     }
 }
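
A worked illustration of the heartbeat pruning above. This is hedged: plain tuples and a made-up helper stand in for the real PeerDB types, but the filter/take ordering mirrors the chain in the diff:

    // Illustrative only: (name, score, has_future_duty, is_healthy) stands in for PeerInfo.
    fn peers_to_prune(
        mut connected: Vec<(&'static str, i32, bool, bool)>,
        target_peers: usize,
    ) -> Vec<&'static str> {
        // worst scores first, as `worst_connected_peers()` returns them
        connected.sort_by_key(|&(_, score, _, _)| score);
        let excess = connected.len().saturating_sub(target_peers);
        connected
            .iter()
            .filter(|&&(_, _, has_future_duty, _)| !has_future_duty) // keep subnet peers
            .take(excess) // never prune below the target
            .filter(|&&(_, _, _, healthy)| healthy) // unhealthy peers are already handled by scoring
            .map(|&(name, _, _, _)| name)
            .collect()
    }

    fn main() {
        let peers = vec![
            ("a", -5, false, false), // worst score, but unhealthy: scoring disconnects it separately
            ("b", -1, true, true),   // poor score, but has a future duty: kept
            ("c", 0, false, true),
            ("d", 3, false, true),
            ("e", 7, false, true),
        ];
        // 5 connected peers against a target of 3: at most 2 are considered, only "c" is pruned.
        assert_eq!(peers_to_prune(peers, 3), vec!["c"]);
    }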


@@ -63,6 +63,11 @@ impl<T: EthSpec> PeerInfo<T> {
         }
         false
     }
+
+    /// Reports if this peer has some future validator duty in which case it is valuable to keep it.
+    pub fn has_future_duty(&self) -> bool {
+        self.min_ttl.map_or(false, |i| i >= Instant::now())
+    }
 }

 #[derive(Clone, Debug, Serialize)]


@@ -3,6 +3,7 @@ use super::peer_sync_status::PeerSyncStatus;
 use super::score::Score;
 use crate::rpc::methods::MetaData;
 use crate::PeerId;
+use rand::seq::SliceRandom;
 use slog::{crit, debug, trace, warn};
 use std::collections::HashMap;
 use std::time::Instant;
@@ -168,6 +169,20 @@ impl<TSpec: EthSpec> PeerDB<TSpec> {
             .map(|(peer_id, _)| peer_id)
     }

+    /// Returns a vector of all connected peers sorted by score beginning with the worst scores.
+    /// Ties get broken randomly.
+    pub fn worst_connected_peers(&self) -> Vec<(&PeerId, &PeerInfo<TSpec>)> {
+        let mut connected = self
+            .peers
+            .iter()
+            .filter(|(_, info)| info.connection_status.is_connected())
+            .collect::<Vec<_>>();
+
+        connected.shuffle(&mut rand::thread_rng());
+        connected.sort_by_key(|(_, info)| info.score);
+        connected
+    }
+
     /// Returns a vector containing peers (their ids and info), sorted by
     /// score from highest to lowest, and filtered using `is_status`
     pub fn best_peers_by_status<F>(&self, is_status: F) -> Vec<(&PeerId, &PeerInfo<TSpec>)>
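
The tie-breaking in worst_connected_peers relies on sort_by_key being a stable sort: shuffling first randomizes the relative order of equally scored peers, so ties break randomly. A minimal standalone sketch of that trick, using plain tuples instead of PeerInfo and assuming the rand 0.7 dependency added above:

    use rand::seq::SliceRandom;

    fn main() {
        // With a stable sort alone, equally scored peers would keep insertion order,
        // so the same peer would always be pruned first.
        let mut peers = vec![("a", 10), ("b", 5), ("c", 5), ("d", 5)];

        // Shuffle, then stable-sort by score: "b", "c" and "d" come first in a random
        // relative order, while "a" (the best score) is always last.
        peers.shuffle(&mut rand::thread_rng());
        peers.sort_by_key(|&(_, score)| score);

        println!("{:?}", peers);
        assert_eq!(peers[3].0, "a");
    }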


@@ -67,9 +67,9 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> {
                 .takes_value(true),
         )
         .arg(
-            Arg::with_name("max-peers")
-                .long("max-peers")
-                .help("The maximum number of peers.")
+            Arg::with_name("target-peers")
+                .long("target-peers")
+                .help("The target number of peers.")
                 .default_value("50")
                 .takes_value(true),
         )


@@ -90,10 +90,10 @@ pub fn get_config<E: EthSpec>(
         client_config.network.listen_address = listen_address;
     }

-    if let Some(max_peers_str) = cli_args.value_of("max-peers") {
-        client_config.network.max_peers = max_peers_str
+    if let Some(target_peers_str) = cli_args.value_of("target-peers") {
+        client_config.network.target_peers = target_peers_str
             .parse::<usize>()
-            .map_err(|_| format!("Invalid number of max peers: {}", max_peers_str))?;
+            .map_err(|_| format!("Invalid number of target peers: {}", target_peers_str))?;
     }

     if let Some(port_str) = cli_args.value_of("port") {