From 8dd92491772fccf2b99210a9cb7a14217739a7a9 Mon Sep 17 00:00:00 2001 From: Age Manning Date: Tue, 14 Feb 2023 03:25:42 +0000 Subject: [PATCH] Enforce a timeout on peer disconnect (#3757) On heavily crowded networks, we are seeing many attempted connections to our node every second. Often these connections come from peers that have just been disconnected. This can be for a number of reasons including: - We have deemed them to be not as useful as other peers - They have performed poorly - They have dropped the connection with us - The connection was spontaneously lost - They were randomly removed because we have too many peers In all of these cases, if we have reached or exceeded our target peer limit, there is no desire to accept new connections immediately after the disconnect from these peers. In fact, it often costs us resources to handle the established connections and defeats some of the logic of dropping them in the first place. This PR adds a timeout, that prevents recently disconnected peers from reconnecting to us. Technically we implement a ban at the swarm layer to prevent immediate re connections for at least 10 minutes. I decided to keep this light, and use a time-based LRUCache which only gets updated during the peer manager heartbeat to prevent added stress of polling a delay map for what could be a large number of peers. This cache is bounded in time. An extra space bound could be added should people consider this a risk. Co-authored-by: Diva M --- Cargo.lock | 54 +++++++------- beacon_node/lighthouse_network/Cargo.toml | 1 + .../src/peer_manager/mod.rs | 44 ++++++++++++ .../src/peer_manager/network_behaviour.rs | 2 +- .../src/peer_manager/peerdb.rs | 11 ++- common/lru_cache/src/time.rs | 71 +++++++++++++++++++ 6 files changed, 154 insertions(+), 29 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 6159778b7..d568ade04 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1433,9 +1433,9 @@ dependencies = [ [[package]] name = "cxx" -version = "1.0.89" +version = "1.0.90" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bc831ee6a32dd495436e317595e639a587aa9907bef96fe6e6abc290ab6204e9" +checksum = "90d59d9acd2a682b4e40605a242f6670eaa58c5957471cbf85e8aa6a0b97a5e8" dependencies = [ "cc", "cxxbridge-flags", @@ -1445,9 +1445,9 @@ dependencies = [ [[package]] name = "cxx-build" -version = "1.0.89" +version = "1.0.90" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "94331d54f1b1a8895cd81049f7eaaaef9d05a7dcb4d1fd08bf3ff0806246789d" +checksum = "ebfa40bda659dd5c864e65f4c9a2b0aff19bea56b017b9b77c73d3766a453a38" dependencies = [ "cc", "codespan-reporting", @@ -1460,15 +1460,15 @@ dependencies = [ [[package]] name = "cxxbridge-flags" -version = "1.0.89" +version = "1.0.90" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "48dcd35ba14ca9b40d6e4b4b39961f23d835dbb8eed74565ded361d93e1feb8a" +checksum = "457ce6757c5c70dc6ecdbda6925b958aae7f959bda7d8fb9bde889e34a09dc03" [[package]] name = "cxxbridge-macro" -version = "1.0.89" +version = "1.0.90" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "81bbeb29798b407ccd82a3324ade1a7286e0d29851475990b612670f6f5124d2" +checksum = "ebf883b7aacd7b2aeb2a7b338648ee19f57c140d4ee8e52c68979c6b2f7f2263" dependencies = [ "proc-macro2", "quote", @@ -2982,7 +2982,7 @@ dependencies = [ "indexmap", "slab", "tokio", - "tokio-util 0.7.4", + "tokio-util 0.7.7", "tracing", ] @@ -3463,7 +3463,7 @@ version = "0.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ba6a270039626615617f3f36d15fc827041df3b78c439da2cadfa47455a77f2f" dependencies = [ - "parity-scale-codec 3.3.0", + "parity-scale-codec 3.4.0", ] [[package]] @@ -4215,7 +4215,7 @@ dependencies = [ "thiserror", "tinytemplate", "tokio", - "tokio-util 0.7.4", + "tokio-util 0.7.7", "webrtc", ] @@ -4391,6 +4391,7 @@ dependencies = [ "lighthouse_metrics", "lighthouse_version", "lru 0.7.8", + "lru_cache", "parking_lot 0.12.1", "prometheus-client", "quickcheck", @@ -5402,9 +5403,9 @@ dependencies = [ [[package]] name = "parity-scale-codec" -version = "3.3.0" +version = "3.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c3840933452adf7b3b9145e27086a5a3376c619dca1a21b1e5a5af0d54979bed" +checksum = "637935964ff85a605d114591d4d2c13c5d1ba2806dae97cea6bf180238a749ac" dependencies = [ "arrayvec", "bitvec 1.0.1", @@ -6281,7 +6282,7 @@ dependencies = [ "tokio", "tokio-native-tls", "tokio-rustls 0.23.4", - "tokio-util 0.7.4", + "tokio-util 0.7.7", "tower-service", "url", "wasm-bindgen", @@ -6567,7 +6568,7 @@ checksum = "001cf62ece89779fd16105b5f515ad0e5cedcd5440d3dd806bb067978e7c3608" dependencies = [ "cfg-if", "derive_more", - "parity-scale-codec 3.3.0", + "parity-scale-codec 3.4.0", "scale-info-derive", ] @@ -6813,9 +6814,9 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.92" +version = "1.0.93" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7434af0dc1cbd59268aa98b4c22c131c0584d2232f6fb166efb993e2832e896a" +checksum = "cad406b69c91885b5107daf2c29572f6c8cdb3c66826821e286c533490c0bc76" dependencies = [ "itoa 1.0.5", "ryu", @@ -6977,9 +6978,9 @@ checksum = "43b2853a4d09f215c24cc5489c992ce46052d359b5109343cbafbf26bc62f8a3" [[package]] name = "signal-hook-registry" -version = "1.4.0" +version = "1.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e51e73328dc4ac0c7ccbda3a494dfa03df1de2f46018127f60c693f2648455b0" +checksum = "d8229b473baa5980ac72ef434c4415e70c4b5e71b423043adb4ba059f89c99a1" dependencies = [ "libc", ] @@ -7665,10 +7666,11 @@ dependencies = [ [[package]] name = "thread_local" -version = "1.1.4" +version = "1.1.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5516c27b78311c50bf42c071425c560ac799b11c30b31f87e3081965fe5e0180" +checksum = "3fdd6f064ccff2d6567adcb3873ca630700f00b5ad3f060c25b5dcfd9a4ce152" dependencies = [ + "cfg-if", "once_cell", ] @@ -7867,7 +7869,7 @@ dependencies = [ "futures-core", "pin-project-lite 0.2.9", "tokio", - "tokio-util 0.7.4", + "tokio-util 0.7.7", ] [[package]] @@ -7917,9 +7919,9 @@ dependencies = [ [[package]] name = "tokio-util" -version = "0.7.4" +version = "0.7.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0bb2e075f03b3d66d8d8785356224ba688d2906a371015e225beeb65ca92c740" +checksum = "5427d89453009325de0d8f342c9490009f76e999cb7672d77e46267448f7e6b2" dependencies = [ "bytes", "futures-core", @@ -8964,9 +8966,9 @@ dependencies = [ [[package]] name = "webrtc-ice" -version = "0.9.0" +version = "0.9.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "494483fbb2f5492620871fdc78b084aed8807377f6e3fe88b2e49f0a9c9c41d7" +checksum = "465a03cc11e9a7d7b4f9f99870558fe37a102b65b93f8045392fef7c67b39e80" dependencies = [ "arc-swap", "async-trait", diff --git a/beacon_node/lighthouse_network/Cargo.toml b/beacon_node/lighthouse_network/Cargo.toml index 474ebebb5..9b00c39d2 100644 --- a/beacon_node/lighthouse_network/Cargo.toml +++ b/beacon_node/lighthouse_network/Cargo.toml @@ -25,6 +25,7 @@ lighthouse_metrics = { path = "../../common/lighthouse_metrics" } smallvec = "1.6.1" tokio-io-timeout = "1.1.1" lru = "0.7.1" +lru_cache = { path = "../../common/lru_cache" } parking_lot = "0.12.0" sha2 = "0.10" snap = "1.0.1" diff --git a/beacon_node/lighthouse_network/src/peer_manager/mod.rs b/beacon_node/lighthouse_network/src/peer_manager/mod.rs index 89670a2eb..03f6a746a 100644 --- a/beacon_node/lighthouse_network/src/peer_manager/mod.rs +++ b/beacon_node/lighthouse_network/src/peer_manager/mod.rs @@ -8,6 +8,7 @@ use crate::{Subnet, SubnetDiscovery}; use delay_map::HashSetDelay; use discv5::Enr; use libp2p::identify::Info as IdentifyInfo; +use lru_cache::LRUTimeCache; use peerdb::{client::ClientKind, BanOperation, BanResult, ScoreUpdateResult}; use rand::seq::SliceRandom; use slog::{debug, error, trace, warn}; @@ -39,6 +40,9 @@ mod network_behaviour; /// requests. This defines the interval in seconds. const HEARTBEAT_INTERVAL: u64 = 30; +/// The minimum amount of time we allow peers to reconnect to us after a disconnect when we are +/// saturated with peers. This effectively looks like a swarm BAN for this amount of time. +pub const PEER_RECONNECTION_TIMEOUT: Duration = Duration::from_secs(600); /// This is used in the pruning logic. We avoid pruning peers on sync-committees if doing so would /// lower our peer count below this number. Instead we favour a non-uniform distribution of subnet /// peers. @@ -74,6 +78,20 @@ pub struct PeerManager { target_peers: usize, /// Peers queued to be dialed. peers_to_dial: VecDeque<(PeerId, Option)>, + /// The number of temporarily banned peers. This is used to prevent instantaneous + /// reconnection. + // NOTE: This just prevents re-connections. The state of the peer is otherwise unaffected. A + // peer can be in a disconnected state and new connections will be refused and logged as if the + // peer is banned without it being reflected in the peer's state. + // Also the banned state can out-last the peer's reference in the peer db. So peers that are + // unknown to us can still be temporarily banned. This is fundamentally a relationship with + // the swarm. Regardless of our knowledge of the peer in the db, it will be temporarily banned + // at the swarm layer. + // NOTE: An LRUTimeCache is used compared to a structure that needs to be polled to avoid very + // frequent polling to unban peers. Instead, this cache piggy-backs the PeerManager heartbeat + // to update and clear the cache. Therefore the PEER_RECONNECTION_TIMEOUT only has a resolution + // of the HEARTBEAT_INTERVAL. + temporary_banned_peers: LRUTimeCache, /// A collection of sync committee subnets that we need to stay subscribed to. /// Sync committee subnets are longer term (256 epochs). Hence, we need to re-run /// discovery queries for subnet peers if we disconnect from existing sync @@ -143,6 +161,7 @@ impl PeerManager { outbound_ping_peers: HashSetDelay::new(Duration::from_secs(ping_interval_outbound)), status_peers: HashSetDelay::new(Duration::from_secs(status_interval)), target_peers: target_peer_count, + temporary_banned_peers: LRUTimeCache::new(PEER_RECONNECTION_TIMEOUT), sync_committee_subnets: Default::default(), heartbeat, discovery_enabled, @@ -243,6 +262,15 @@ impl PeerManager { reason: Option, ) { match ban_operation { + BanOperation::TemporaryBan => { + // The peer could be temporarily banned. We only do this in the case that + // we have currently reached our peer target limit. + if self.network_globals.connected_peers() >= self.target_peers { + // We have enough peers, prevent this reconnection. + self.temporary_banned_peers.raw_insert(*peer_id); + self.events.push(PeerManagerEvent::Banned(*peer_id, vec![])); + } + } BanOperation::DisconnectThePeer => { // The peer was currently connected, so we start a disconnection. // Once the peer has disconnected, its connection state will transition to a @@ -259,6 +287,11 @@ impl PeerManager { BanOperation::ReadyToBan(banned_ips) => { // The peer is not currently connected, we can safely ban it at the swarm // level. + + // If a peer is being banned, this trumps any temporary ban the peer might be + // under. We no longer track it in the temporary ban list. + self.temporary_banned_peers.raw_remove(peer_id); + // Inform the Swarm to ban the peer self.events .push(PeerManagerEvent::Banned(*peer_id, banned_ips)); @@ -1109,6 +1142,14 @@ impl PeerManager { } } + /// Unbans any temporarily banned peers that have served their timeout. + fn unban_temporary_banned_peers(&mut self) { + for peer_id in self.temporary_banned_peers.remove_expired() { + self.events + .push(PeerManagerEvent::UnBanned(peer_id, Vec::new())); + } + } + /// The Peer manager's heartbeat maintains the peer count and maintains peer reputations. /// /// It will request discovery queries if the peer count has not reached the desired number of @@ -1141,6 +1182,9 @@ impl PeerManager { // Prune any excess peers back to our target in such a way that incentivises good scores and // a uniform distribution of subnets. self.prune_excess_peers(); + + // Unban any peers that have served their temporary ban timeout + self.unban_temporary_banned_peers(); } // Update metrics related to peer scoring. diff --git a/beacon_node/lighthouse_network/src/peer_manager/network_behaviour.rs b/beacon_node/lighthouse_network/src/peer_manager/network_behaviour.rs index 42eb270c4..21288473e 100644 --- a/beacon_node/lighthouse_network/src/peer_manager/network_behaviour.rs +++ b/beacon_node/lighthouse_network/src/peer_manager/network_behaviour.rs @@ -170,7 +170,7 @@ impl PeerManager { BanResult::NotBanned => {} } - // Count dialing peers in the limit if the peer dialied us. + // Count dialing peers in the limit if the peer dialed us. let count_dialing = endpoint.is_listener(); // Check the connection limits if self.peer_limit_reached(count_dialing) diff --git a/beacon_node/lighthouse_network/src/peer_manager/peerdb.rs b/beacon_node/lighthouse_network/src/peer_manager/peerdb.rs index 1f44488a5..61cf8de1c 100644 --- a/beacon_node/lighthouse_network/src/peer_manager/peerdb.rs +++ b/beacon_node/lighthouse_network/src/peer_manager/peerdb.rs @@ -844,8 +844,12 @@ impl PeerDB { .collect::>(); return Some(BanOperation::ReadyToBan(banned_ips)); } - PeerConnectionStatus::Disconnecting { .. } - | PeerConnectionStatus::Unknown + PeerConnectionStatus::Disconnecting { .. } => { + // The peer has been disconnected but not banned. Inform the peer manager + // that this peer could be eligible for a temporary ban. + return Some(BanOperation::TemporaryBan); + } + PeerConnectionStatus::Unknown | PeerConnectionStatus::Connected { .. } | PeerConnectionStatus::Dialing { .. } => { self.disconnected_peers += 1; @@ -1177,6 +1181,9 @@ impl From> for ScoreUpdateResult { /// When attempting to ban a peer provides the peer manager with the operation that must be taken. pub enum BanOperation { + /// Optionally temporarily ban this peer to prevent instantaneous reconnection. + /// The peer manager will decide if temporary banning is required. + TemporaryBan, // The peer is currently connected. Perform a graceful disconnect before banning at the swarm // level. DisconnectThePeer, diff --git a/common/lru_cache/src/time.rs b/common/lru_cache/src/time.rs index 5c0e4c1ca..1253ef1ec 100644 --- a/common/lru_cache/src/time.rs +++ b/common/lru_cache/src/time.rs @@ -31,6 +31,77 @@ where } } + /// Inserts a key without removal of potentially expired elements. + /// Returns true if the key does not already exist. + pub fn raw_insert(&mut self, key: Key) -> bool { + // check the cache before removing elements + let is_new = self.map.insert(key.clone()); + + // add the new key to the list, if it doesn't already exist. + if is_new { + self.list.push_back(Element { + key, + inserted: Instant::now(), + }); + } else { + let position = self + .list + .iter() + .position(|e| e.key == key) + .expect("Key is not new"); + let mut element = self + .list + .remove(position) + .expect("Position is not occupied"); + element.inserted = Instant::now(); + self.list.push_back(element); + } + #[cfg(test)] + self.check_invariant(); + is_new + } + + /// Removes a key from the cache without purging expired elements. Returns true if the key + /// existed. + pub fn raw_remove(&mut self, key: &Key) -> bool { + if self.map.remove(key) { + let position = self + .list + .iter() + .position(|e| &e.key == key) + .expect("Key must exist"); + self.list + .remove(position) + .expect("Position is not occupied"); + true + } else { + false + } + } + + /// Removes all expired elements and returns them + pub fn remove_expired(&mut self) -> Vec { + if self.list.is_empty() { + return Vec::new(); + } + + let mut removed_elements = Vec::new(); + let now = Instant::now(); + // remove any expired results + while let Some(element) = self.list.pop_front() { + if element.inserted + self.ttl > now { + self.list.push_front(element); + break; + } + self.map.remove(&element.key); + removed_elements.push(element.key); + } + #[cfg(test)] + self.check_invariant(); + + removed_elements + } + // Inserts a new key. It first purges expired elements to do so. // // If the key was not present this returns `true`. If the value was already present this