diff --git a/Cargo.lock b/Cargo.lock index 6159778b7..d568ade04 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1433,9 +1433,9 @@ dependencies = [ [[package]] name = "cxx" -version = "1.0.89" +version = "1.0.90" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bc831ee6a32dd495436e317595e639a587aa9907bef96fe6e6abc290ab6204e9" +checksum = "90d59d9acd2a682b4e40605a242f6670eaa58c5957471cbf85e8aa6a0b97a5e8" dependencies = [ "cc", "cxxbridge-flags", @@ -1445,9 +1445,9 @@ dependencies = [ [[package]] name = "cxx-build" -version = "1.0.89" +version = "1.0.90" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "94331d54f1b1a8895cd81049f7eaaaef9d05a7dcb4d1fd08bf3ff0806246789d" +checksum = "ebfa40bda659dd5c864e65f4c9a2b0aff19bea56b017b9b77c73d3766a453a38" dependencies = [ "cc", "codespan-reporting", @@ -1460,15 +1460,15 @@ dependencies = [ [[package]] name = "cxxbridge-flags" -version = "1.0.89" +version = "1.0.90" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "48dcd35ba14ca9b40d6e4b4b39961f23d835dbb8eed74565ded361d93e1feb8a" +checksum = "457ce6757c5c70dc6ecdbda6925b958aae7f959bda7d8fb9bde889e34a09dc03" [[package]] name = "cxxbridge-macro" -version = "1.0.89" +version = "1.0.90" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "81bbeb29798b407ccd82a3324ade1a7286e0d29851475990b612670f6f5124d2" +checksum = "ebf883b7aacd7b2aeb2a7b338648ee19f57c140d4ee8e52c68979c6b2f7f2263" dependencies = [ "proc-macro2", "quote", @@ -2982,7 +2982,7 @@ dependencies = [ "indexmap", "slab", "tokio", - "tokio-util 0.7.4", + "tokio-util 0.7.7", "tracing", ] @@ -3463,7 +3463,7 @@ version = "0.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ba6a270039626615617f3f36d15fc827041df3b78c439da2cadfa47455a77f2f" dependencies = [ - "parity-scale-codec 3.3.0", + "parity-scale-codec 3.4.0", ] [[package]] @@ -4215,7 +4215,7 @@ dependencies = [ "thiserror", "tinytemplate", "tokio", - "tokio-util 0.7.4", + "tokio-util 0.7.7", "webrtc", ] @@ -4391,6 +4391,7 @@ dependencies = [ "lighthouse_metrics", "lighthouse_version", "lru 0.7.8", + "lru_cache", "parking_lot 0.12.1", "prometheus-client", "quickcheck", @@ -5402,9 +5403,9 @@ dependencies = [ [[package]] name = "parity-scale-codec" -version = "3.3.0" +version = "3.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c3840933452adf7b3b9145e27086a5a3376c619dca1a21b1e5a5af0d54979bed" +checksum = "637935964ff85a605d114591d4d2c13c5d1ba2806dae97cea6bf180238a749ac" dependencies = [ "arrayvec", "bitvec 1.0.1", @@ -6281,7 +6282,7 @@ dependencies = [ "tokio", "tokio-native-tls", "tokio-rustls 0.23.4", - "tokio-util 0.7.4", + "tokio-util 0.7.7", "tower-service", "url", "wasm-bindgen", @@ -6567,7 +6568,7 @@ checksum = "001cf62ece89779fd16105b5f515ad0e5cedcd5440d3dd806bb067978e7c3608" dependencies = [ "cfg-if", "derive_more", - "parity-scale-codec 3.3.0", + "parity-scale-codec 3.4.0", "scale-info-derive", ] @@ -6813,9 +6814,9 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.92" +version = "1.0.93" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7434af0dc1cbd59268aa98b4c22c131c0584d2232f6fb166efb993e2832e896a" +checksum = "cad406b69c91885b5107daf2c29572f6c8cdb3c66826821e286c533490c0bc76" dependencies = [ "itoa 1.0.5", "ryu", @@ -6977,9 +6978,9 @@ checksum = "43b2853a4d09f215c24cc5489c992ce46052d359b5109343cbafbf26bc62f8a3" [[package]] name = "signal-hook-registry" -version = "1.4.0" +version = "1.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e51e73328dc4ac0c7ccbda3a494dfa03df1de2f46018127f60c693f2648455b0" +checksum = "d8229b473baa5980ac72ef434c4415e70c4b5e71b423043adb4ba059f89c99a1" dependencies = [ "libc", ] @@ -7665,10 +7666,11 @@ dependencies = [ [[package]] name = "thread_local" -version = "1.1.4" +version = "1.1.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5516c27b78311c50bf42c071425c560ac799b11c30b31f87e3081965fe5e0180" +checksum = "3fdd6f064ccff2d6567adcb3873ca630700f00b5ad3f060c25b5dcfd9a4ce152" dependencies = [ + "cfg-if", "once_cell", ] @@ -7867,7 +7869,7 @@ dependencies = [ "futures-core", "pin-project-lite 0.2.9", "tokio", - "tokio-util 0.7.4", + "tokio-util 0.7.7", ] [[package]] @@ -7917,9 +7919,9 @@ dependencies = [ [[package]] name = "tokio-util" -version = "0.7.4" +version = "0.7.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0bb2e075f03b3d66d8d8785356224ba688d2906a371015e225beeb65ca92c740" +checksum = "5427d89453009325de0d8f342c9490009f76e999cb7672d77e46267448f7e6b2" dependencies = [ "bytes", "futures-core", @@ -8964,9 +8966,9 @@ dependencies = [ [[package]] name = "webrtc-ice" -version = "0.9.0" +version = "0.9.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "494483fbb2f5492620871fdc78b084aed8807377f6e3fe88b2e49f0a9c9c41d7" +checksum = "465a03cc11e9a7d7b4f9f99870558fe37a102b65b93f8045392fef7c67b39e80" dependencies = [ "arc-swap", "async-trait", diff --git a/beacon_node/lighthouse_network/Cargo.toml b/beacon_node/lighthouse_network/Cargo.toml index 474ebebb5..9b00c39d2 100644 --- a/beacon_node/lighthouse_network/Cargo.toml +++ b/beacon_node/lighthouse_network/Cargo.toml @@ -25,6 +25,7 @@ lighthouse_metrics = { path = "../../common/lighthouse_metrics" } smallvec = "1.6.1" tokio-io-timeout = "1.1.1" lru = "0.7.1" +lru_cache = { path = "../../common/lru_cache" } parking_lot = "0.12.0" sha2 = "0.10" snap = "1.0.1" diff --git a/beacon_node/lighthouse_network/src/peer_manager/mod.rs b/beacon_node/lighthouse_network/src/peer_manager/mod.rs index 89670a2eb..03f6a746a 100644 --- a/beacon_node/lighthouse_network/src/peer_manager/mod.rs +++ b/beacon_node/lighthouse_network/src/peer_manager/mod.rs @@ -8,6 +8,7 @@ use crate::{Subnet, SubnetDiscovery}; use delay_map::HashSetDelay; use discv5::Enr; use libp2p::identify::Info as IdentifyInfo; +use lru_cache::LRUTimeCache; use peerdb::{client::ClientKind, BanOperation, BanResult, ScoreUpdateResult}; use rand::seq::SliceRandom; use slog::{debug, error, trace, warn}; @@ -39,6 +40,9 @@ mod network_behaviour; /// requests. This defines the interval in seconds. const HEARTBEAT_INTERVAL: u64 = 30; +/// The minimum amount of time we allow peers to reconnect to us after a disconnect when we are +/// saturated with peers. This effectively looks like a swarm BAN for this amount of time. +pub const PEER_RECONNECTION_TIMEOUT: Duration = Duration::from_secs(600); /// This is used in the pruning logic. We avoid pruning peers on sync-committees if doing so would /// lower our peer count below this number. Instead we favour a non-uniform distribution of subnet /// peers. @@ -74,6 +78,20 @@ pub struct PeerManager { target_peers: usize, /// Peers queued to be dialed. peers_to_dial: VecDeque<(PeerId, Option)>, + /// The number of temporarily banned peers. This is used to prevent instantaneous + /// reconnection. + // NOTE: This just prevents re-connections. The state of the peer is otherwise unaffected. A + // peer can be in a disconnected state and new connections will be refused and logged as if the + // peer is banned without it being reflected in the peer's state. + // Also the banned state can out-last the peer's reference in the peer db. So peers that are + // unknown to us can still be temporarily banned. This is fundamentally a relationship with + // the swarm. Regardless of our knowledge of the peer in the db, it will be temporarily banned + // at the swarm layer. + // NOTE: An LRUTimeCache is used compared to a structure that needs to be polled to avoid very + // frequent polling to unban peers. Instead, this cache piggy-backs the PeerManager heartbeat + // to update and clear the cache. Therefore the PEER_RECONNECTION_TIMEOUT only has a resolution + // of the HEARTBEAT_INTERVAL. + temporary_banned_peers: LRUTimeCache, /// A collection of sync committee subnets that we need to stay subscribed to. /// Sync committee subnets are longer term (256 epochs). Hence, we need to re-run /// discovery queries for subnet peers if we disconnect from existing sync @@ -143,6 +161,7 @@ impl PeerManager { outbound_ping_peers: HashSetDelay::new(Duration::from_secs(ping_interval_outbound)), status_peers: HashSetDelay::new(Duration::from_secs(status_interval)), target_peers: target_peer_count, + temporary_banned_peers: LRUTimeCache::new(PEER_RECONNECTION_TIMEOUT), sync_committee_subnets: Default::default(), heartbeat, discovery_enabled, @@ -243,6 +262,15 @@ impl PeerManager { reason: Option, ) { match ban_operation { + BanOperation::TemporaryBan => { + // The peer could be temporarily banned. We only do this in the case that + // we have currently reached our peer target limit. + if self.network_globals.connected_peers() >= self.target_peers { + // We have enough peers, prevent this reconnection. + self.temporary_banned_peers.raw_insert(*peer_id); + self.events.push(PeerManagerEvent::Banned(*peer_id, vec![])); + } + } BanOperation::DisconnectThePeer => { // The peer was currently connected, so we start a disconnection. // Once the peer has disconnected, its connection state will transition to a @@ -259,6 +287,11 @@ impl PeerManager { BanOperation::ReadyToBan(banned_ips) => { // The peer is not currently connected, we can safely ban it at the swarm // level. + + // If a peer is being banned, this trumps any temporary ban the peer might be + // under. We no longer track it in the temporary ban list. + self.temporary_banned_peers.raw_remove(peer_id); + // Inform the Swarm to ban the peer self.events .push(PeerManagerEvent::Banned(*peer_id, banned_ips)); @@ -1109,6 +1142,14 @@ impl PeerManager { } } + /// Unbans any temporarily banned peers that have served their timeout. + fn unban_temporary_banned_peers(&mut self) { + for peer_id in self.temporary_banned_peers.remove_expired() { + self.events + .push(PeerManagerEvent::UnBanned(peer_id, Vec::new())); + } + } + /// The Peer manager's heartbeat maintains the peer count and maintains peer reputations. /// /// It will request discovery queries if the peer count has not reached the desired number of @@ -1141,6 +1182,9 @@ impl PeerManager { // Prune any excess peers back to our target in such a way that incentivises good scores and // a uniform distribution of subnets. self.prune_excess_peers(); + + // Unban any peers that have served their temporary ban timeout + self.unban_temporary_banned_peers(); } // Update metrics related to peer scoring. diff --git a/beacon_node/lighthouse_network/src/peer_manager/network_behaviour.rs b/beacon_node/lighthouse_network/src/peer_manager/network_behaviour.rs index 42eb270c4..21288473e 100644 --- a/beacon_node/lighthouse_network/src/peer_manager/network_behaviour.rs +++ b/beacon_node/lighthouse_network/src/peer_manager/network_behaviour.rs @@ -170,7 +170,7 @@ impl PeerManager { BanResult::NotBanned => {} } - // Count dialing peers in the limit if the peer dialied us. + // Count dialing peers in the limit if the peer dialed us. let count_dialing = endpoint.is_listener(); // Check the connection limits if self.peer_limit_reached(count_dialing) diff --git a/beacon_node/lighthouse_network/src/peer_manager/peerdb.rs b/beacon_node/lighthouse_network/src/peer_manager/peerdb.rs index 1f44488a5..61cf8de1c 100644 --- a/beacon_node/lighthouse_network/src/peer_manager/peerdb.rs +++ b/beacon_node/lighthouse_network/src/peer_manager/peerdb.rs @@ -844,8 +844,12 @@ impl PeerDB { .collect::>(); return Some(BanOperation::ReadyToBan(banned_ips)); } - PeerConnectionStatus::Disconnecting { .. } - | PeerConnectionStatus::Unknown + PeerConnectionStatus::Disconnecting { .. } => { + // The peer has been disconnected but not banned. Inform the peer manager + // that this peer could be eligible for a temporary ban. + return Some(BanOperation::TemporaryBan); + } + PeerConnectionStatus::Unknown | PeerConnectionStatus::Connected { .. } | PeerConnectionStatus::Dialing { .. } => { self.disconnected_peers += 1; @@ -1177,6 +1181,9 @@ impl From> for ScoreUpdateResult { /// When attempting to ban a peer provides the peer manager with the operation that must be taken. pub enum BanOperation { + /// Optionally temporarily ban this peer to prevent instantaneous reconnection. + /// The peer manager will decide if temporary banning is required. + TemporaryBan, // The peer is currently connected. Perform a graceful disconnect before banning at the swarm // level. DisconnectThePeer, diff --git a/common/lru_cache/src/time.rs b/common/lru_cache/src/time.rs index 5c0e4c1ca..1253ef1ec 100644 --- a/common/lru_cache/src/time.rs +++ b/common/lru_cache/src/time.rs @@ -31,6 +31,77 @@ where } } + /// Inserts a key without removal of potentially expired elements. + /// Returns true if the key does not already exist. + pub fn raw_insert(&mut self, key: Key) -> bool { + // check the cache before removing elements + let is_new = self.map.insert(key.clone()); + + // add the new key to the list, if it doesn't already exist. + if is_new { + self.list.push_back(Element { + key, + inserted: Instant::now(), + }); + } else { + let position = self + .list + .iter() + .position(|e| e.key == key) + .expect("Key is not new"); + let mut element = self + .list + .remove(position) + .expect("Position is not occupied"); + element.inserted = Instant::now(); + self.list.push_back(element); + } + #[cfg(test)] + self.check_invariant(); + is_new + } + + /// Removes a key from the cache without purging expired elements. Returns true if the key + /// existed. + pub fn raw_remove(&mut self, key: &Key) -> bool { + if self.map.remove(key) { + let position = self + .list + .iter() + .position(|e| &e.key == key) + .expect("Key must exist"); + self.list + .remove(position) + .expect("Position is not occupied"); + true + } else { + false + } + } + + /// Removes all expired elements and returns them + pub fn remove_expired(&mut self) -> Vec { + if self.list.is_empty() { + return Vec::new(); + } + + let mut removed_elements = Vec::new(); + let now = Instant::now(); + // remove any expired results + while let Some(element) = self.list.pop_front() { + if element.inserted + self.ttl > now { + self.list.push_front(element); + break; + } + self.map.remove(&element.key); + removed_elements.push(element.key); + } + #[cfg(test)] + self.check_invariant(); + + removed_elements + } + // Inserts a new key. It first purges expired elements to do so. // // If the key was not present this returns `true`. If the value was already present this