From 56d596ee420bb90dff4acb479f434a8ef8457a9a Mon Sep 17 00:00:00 2001 From: Divma Date: Mon, 20 Dec 2021 23:45:21 +0000 Subject: [PATCH] Unban peers at the swarm level when purged (#2855) ## Issue Addressed #2840 --- Cargo.lock | 19 + beacon_node/lighthouse_network/Cargo.toml | 2 + .../src/peer_manager/mod.rs | 28 +- .../src/peer_manager/peerdb.rs | 48 ++- .../lighthouse_network/src/types/globals.rs | 23 +- .../tests/common/behaviour.rs | 349 ++++++++++++++++++ .../lighthouse_network/tests/common/mod.rs | 8 + .../lighthouse_network/tests/common/swarm.rs | 99 +++++ .../lighthouse_network/tests/pm_tests.rs | 204 ++++++++++ .../network/src/sync/range_sync/range.rs | 26 +- 10 files changed, 747 insertions(+), 59 deletions(-) create mode 100644 beacon_node/lighthouse_network/tests/common/behaviour.rs create mode 100644 beacon_node/lighthouse_network/tests/common/swarm.rs create mode 100644 beacon_node/lighthouse_network/tests/pm_tests.rs diff --git a/Cargo.lock b/Cargo.lock index 21e722ec8..a4dbfc92e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2774,6 +2774,7 @@ dependencies = [ "libp2p-metrics", "libp2p-mplex", "libp2p-noise", + "libp2p-plaintext", "libp2p-swarm", "libp2p-swarm-derive", "libp2p-tcp", @@ -2966,6 +2967,23 @@ dependencies = [ "zeroize", ] +[[package]] +name = "libp2p-plaintext" +version = "0.30.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5fba1a6ff33e4a274c89a3b1d78b9f34f32af13265cc5c46c16938262d4e945a" +dependencies = [ + "asynchronous-codec", + "bytes", + "futures", + "libp2p-core 0.30.0", + "log", + "prost 0.9.0", + "prost-build 0.9.0", + "unsigned-varint 0.7.1", + "void", +] + [[package]] name = "libp2p-swarm" version = "0.32.0" @@ -3262,6 +3280,7 @@ dependencies = [ "tokio-util", "types", "unsigned-varint 0.6.0", + "void", ] [[package]] diff --git a/beacon_node/lighthouse_network/Cargo.toml b/beacon_node/lighthouse_network/Cargo.toml index 4945dbfdf..7dcccd8ca 100644 --- a/beacon_node/lighthouse_network/Cargo.toml +++ b/beacon_node/lighthouse_network/Cargo.toml @@ -49,6 +49,8 @@ slog-term = "2.6.0" slog-async = "2.5.0" tempfile = "3.1.0" exit-future = "0.2.0" +libp2p = { version = "0.41.0", default-features = false, features = ["plaintext"] } +void = "1" [features] libp2p-websocket = [] diff --git a/beacon_node/lighthouse_network/src/peer_manager/mod.rs b/beacon_node/lighthouse_network/src/peer_manager/mod.rs index d8de221dd..8695d1496 100644 --- a/beacon_node/lighthouse_network/src/peer_manager/mod.rs +++ b/beacon_node/lighthouse_network/src/peer_manager/mod.rs @@ -638,7 +638,7 @@ impl PeerManager { /// /// This is also called when dialing a peer fails. fn inject_disconnect(&mut self, peer_id: &PeerId) { - let ban_operation = self + let (ban_operation, purged_peers) = self .network_globals .peers .write() @@ -653,6 +653,11 @@ impl PeerManager { self.inbound_ping_peers.remove(peer_id); self.outbound_ping_peers.remove(peer_id); self.status_peers.remove(peer_id); + self.events.extend( + purged_peers + .into_iter() + .map(|(peer_id, unbanned_ips)| PeerManagerEvent::UnBanned(peer_id, unbanned_ips)), + ); } /// Registers a peer as connected. The `ingoing` parameter determines if the peer is being @@ -855,9 +860,6 @@ enum ConnectingType { #[cfg(test)] mod tests { use super::*; - use crate::discovery::enr_ext::CombinedKeyExt; - use crate::rpc::methods::{MetaData, MetaDataV2}; - use discv5::enr::CombinedKey; use slog::{o, Drain}; use types::MinimalEthSpec as E; @@ -880,23 +882,7 @@ mod tests { ..Default::default() }; let log = build_log(slog::Level::Debug, false); - let globals = { - let keypair = libp2p::identity::Keypair::generate_secp256k1(); - let enr_key: CombinedKey = CombinedKey::from_libp2p(&keypair).unwrap(); - let enr = discv5::enr::EnrBuilder::new("v4").build(&enr_key).unwrap(); - NetworkGlobals::new( - enr, - 9000, - 9000, - MetaData::V2(MetaDataV2 { - seq_number: 0, - attnets: Default::default(), - syncnets: Default::default(), - }), - vec![], - &log, - ) - }; + let globals = NetworkGlobals::new_test_globals(&log); PeerManager::new(config, Arc::new(globals), &log) .await .unwrap() diff --git a/beacon_node/lighthouse_network/src/peer_manager/peerdb.rs b/beacon_node/lighthouse_network/src/peer_manager/peerdb.rs index 4d69dc286..81c03eaf7 100644 --- a/beacon_node/lighthouse_network/src/peer_manager/peerdb.rs +++ b/beacon_node/lighthouse_network/src/peer_manager/peerdb.rs @@ -23,7 +23,7 @@ pub mod sync_status; /// Max number of disconnected nodes to remember. const MAX_DC_PEERS: usize = 500; /// The maximum number of banned nodes to remember. -const MAX_BANNED_PEERS: usize = 1000; +pub const MAX_BANNED_PEERS: usize = 1000; /// We ban an IP if there are more than `BANNED_PEERS_PER_IP_THRESHOLD` banned peers with this IP. const BANNED_PEERS_PER_IP_THRESHOLD: usize = 5; /// Relative factor of peers that are allowed to have a negative gossipsub score without penalizing @@ -709,6 +709,7 @@ impl PeerDB { } PeerConnectionStatus::Banned { .. } => { error!(self.log, "Accepted a connection from a banned peer"; "peer_id" => %peer_id); + // TODO: check if this happens and report the unban back self.banned_peers_count .remove_banned_peer(info.seen_ip_addresses()); } @@ -765,7 +766,6 @@ impl PeerDB { .seen_ip_addresses() .filter(|ip| known_banned_ips.contains(ip)) .collect::>(); - self.shrink_to_fit(); return Some(BanOperation::ReadyToBan(banned_ips)); } PeerConnectionStatus::Disconnecting { .. } @@ -776,7 +776,6 @@ impl PeerDB { info.set_connection_status(PeerConnectionStatus::Disconnected { since: Instant::now(), }); - self.shrink_to_fit(); } } } @@ -818,7 +817,6 @@ impl PeerDB { .seen_ip_addresses() .filter(|ip| known_banned_ips.contains(ip)) .collect::>(); - self.shrink_to_fit(); return Some(BanOperation::ReadyToBan(banned_ips)); } (PeerConnectionStatus::Disconnecting { .. }, NewConnectionState::Banned) => { @@ -859,7 +857,6 @@ impl PeerDB { .seen_ip_addresses() .filter(|ip| known_banned_ips.contains(ip)) .collect::>(); - self.shrink_to_fit(); return Some(BanOperation::ReadyToBan(banned_ips)); } @@ -885,7 +882,6 @@ impl PeerDB { .remove_banned_peer(info.seen_ip_addresses()); self.disconnected_peers = self.disconnected_peers().count().saturating_add(1); - self.shrink_to_fit(); } } } @@ -896,8 +892,14 @@ impl PeerDB { /// Sets the peer as disconnected. A banned peer remains banned. If the node has become banned, /// this returns true, otherwise this is false. // VISIBILITY: Only the peer manager can adjust the connection state. - pub(super) fn inject_disconnect(&mut self, peer_id: &PeerId) -> Option { - self.update_connection_state(peer_id, NewConnectionState::Disconnected) + pub(super) fn inject_disconnect( + &mut self, + peer_id: &PeerId, + ) -> (Option, Vec<(PeerId, Vec)>) { + // A peer can be banned for disconnecting. Thus another peer could be purged + let maybe_ban_op = self.update_connection_state(peer_id, NewConnectionState::Disconnected); + let purged_peers = self.shrink_to_fit(); + (maybe_ban_op, purged_peers) } /// The peer manager has notified us that the peer is undergoing a normal disconnect. Optionally tag @@ -908,12 +910,19 @@ impl PeerDB { } /// Removes banned and disconnected peers from the DB if we have reached any of our limits. - /// Drops the peers with the lowest reputation so that the number of - /// disconnected peers is less than MAX_DC_PEERS - fn shrink_to_fit(&mut self) { + /// Drops the peers with the lowest reputation so that the number of disconnected peers is less + /// than MAX_DC_PEERS + #[must_use = "Unbanned peers need to be reported to libp2p."] + fn shrink_to_fit(&mut self) -> Vec<(PeerId, Vec)> { + let excess_peers = self + .banned_peers_count + .banned_peers() + .saturating_sub(MAX_BANNED_PEERS); + let mut unbanned_peers = Vec::with_capacity(excess_peers); + // Remove excess banned peers while self.banned_peers_count.banned_peers() > MAX_BANNED_PEERS { - if let Some(to_drop) = if let Some((id, info, _)) = self + if let Some((to_drop, unbanned_ips)) = if let Some((id, info, _)) = self .peers .iter() .filter_map(|(id, info)| match info.connection_status() { @@ -924,7 +933,12 @@ impl PeerDB { { self.banned_peers_count .remove_banned_peer(info.seen_ip_addresses()); - Some(*id) + let unbanned_ips = info + .seen_ip_addresses() + .filter(|ip| !self.is_ip_banned(ip)) + .collect::>(); + + Some((*id, unbanned_ips)) } else { // If there is no minimum, this is a coding error. crit!( @@ -937,6 +951,7 @@ impl PeerDB { } { debug!(self.log, "Removing old banned peer"; "peer_id" => %to_drop); self.peers.remove(&to_drop); + unbanned_peers.push((to_drop, unbanned_ips)) } } @@ -960,6 +975,8 @@ impl PeerDB { // the count to avoid a potential infinite loop. self.disconnected_peers = self.disconnected_peers.saturating_sub(1); } + + unbanned_peers } /// This handles score transitions between states. It transitions peers states from @@ -1721,6 +1738,7 @@ mod tests { //peers[0] gets unbanned reset_score(&mut pdb, &peers[0]); pdb.update_connection_state(&peers[0], NewConnectionState::Unbanned); + let _ = pdb.shrink_to_fit(); //nothing changed assert!(pdb.ban_status(&p1).is_banned()); @@ -1732,6 +1750,7 @@ mod tests { //peers[1] gets unbanned reset_score(&mut pdb, &peers[1]); pdb.update_connection_state(&peers[1], NewConnectionState::Unbanned); + let _ = pdb.shrink_to_fit(); //all ips are unbanned assert!(!pdb.ban_status(&p1).is_banned()); @@ -1769,6 +1788,7 @@ mod tests { // unban a peer reset_score(&mut pdb, &peers[0]); pdb.update_connection_state(&peers[0], NewConnectionState::Unbanned); + let _ = pdb.shrink_to_fit(); // check not banned anymore assert!(!pdb.ban_status(&p1).is_banned()); @@ -1778,6 +1798,7 @@ mod tests { for p in &peers { reset_score(&mut pdb, p); pdb.update_connection_state(p, NewConnectionState::Unbanned); + let _ = pdb.shrink_to_fit(); } // add ip2 to all peers and ban them. @@ -1797,6 +1818,7 @@ mod tests { for p in &peers { reset_score(&mut pdb, p); pdb.update_connection_state(p, NewConnectionState::Unbanned); + let _ = pdb.shrink_to_fit(); } // reban every peer except one diff --git a/beacon_node/lighthouse_network/src/types/globals.rs b/beacon_node/lighthouse_network/src/types/globals.rs index 638270c2b..aadd13a23 100644 --- a/beacon_node/lighthouse_network/src/types/globals.rs +++ b/beacon_node/lighthouse_network/src/types/globals.rs @@ -1,6 +1,6 @@ //! A collection of variables that are accessible outside of the network thread itself. use crate::peer_manager::peerdb::PeerDB; -use crate::rpc::MetaData; +use crate::rpc::{MetaData, MetaDataV2}; use crate::types::{BackFillState, SyncState}; use crate::Client; use crate::EnrExt; @@ -127,4 +127,25 @@ impl NetworkGlobals { pub fn set_sync_state(&self, new_state: SyncState) -> SyncState { std::mem::replace(&mut *self.sync_state.write(), new_state) } + + /// TESTING ONLY. Build a dummy NetworkGlobals instance. + pub fn new_test_globals(log: &slog::Logger) -> NetworkGlobals { + use crate::CombinedKeyExt; + let keypair = libp2p::identity::Keypair::generate_secp256k1(); + let enr_key: discv5::enr::CombinedKey = + discv5::enr::CombinedKey::from_libp2p(&keypair).unwrap(); + let enr = discv5::enr::EnrBuilder::new("v4").build(&enr_key).unwrap(); + NetworkGlobals::new( + enr, + 9000, + 9000, + MetaData::V2(MetaDataV2 { + seq_number: 0, + attnets: Default::default(), + syncnets: Default::default(), + }), + vec![], + log, + ) + } } diff --git a/beacon_node/lighthouse_network/tests/common/behaviour.rs b/beacon_node/lighthouse_network/tests/common/behaviour.rs new file mode 100644 index 000000000..ab4ae901f --- /dev/null +++ b/beacon_node/lighthouse_network/tests/common/behaviour.rs @@ -0,0 +1,349 @@ +// NOTE: Taken from libp2p's swarm's testing utils. +// +// Copyright 2020 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +use std::collections::HashMap; +use std::task::{Context, Poll}; + +use libp2p::core::connection::{ConnectedPoint, ConnectionId, ListenerId}; +use libp2p::swarm::protocols_handler::{ + DummyProtocolsHandler, IntoProtocolsHandler, ProtocolsHandler, +}; +use libp2p::swarm::{DialError, NetworkBehaviour, NetworkBehaviourAction, PollParameters}; +use libp2p::{Multiaddr, PeerId}; + +/// A `MockBehaviour` is a `NetworkBehaviour` that allows for +/// the instrumentation of return values, without keeping +/// any further state. +pub struct MockBehaviour< + THandler = DummyProtocolsHandler, + TOutEvent = ::OutEvent, +> where + THandler: ProtocolsHandler, +{ + /// The prototype protocols handler that is cloned for every + /// invocation of `new_handler`. + pub handler_proto: THandler, + /// The addresses to return from `addresses_of_peer`. + pub addresses: HashMap>, + /// The next action to return from `poll`. + /// + /// An action is only returned once. + pub next_action: Option>, +} + +impl MockBehaviour +where + THandler: ProtocolsHandler, +{ + pub fn new(handler_proto: THandler) -> Self { + MockBehaviour { + handler_proto, + addresses: HashMap::new(), + next_action: None, + } + } +} + +impl NetworkBehaviour for MockBehaviour +where + THandler: ProtocolsHandler + Clone, + THandler::OutEvent: Clone, + TOutEvent: Send + 'static, +{ + type ProtocolsHandler = THandler; + type OutEvent = TOutEvent; + + fn new_handler(&mut self) -> Self::ProtocolsHandler { + self.handler_proto.clone() + } + + fn addresses_of_peer(&mut self, p: &PeerId) -> Vec { + self.addresses.get(p).map_or(Vec::new(), |v| v.clone()) + } + + fn inject_event(&mut self, _: PeerId, _: ConnectionId, _: THandler::OutEvent) {} + + fn poll( + &mut self, + _: &mut Context, + _: &mut impl PollParameters, + ) -> Poll> { + Option::take(&mut self.next_action).map_or(Poll::Pending, Poll::Ready) + } +} + +/// A `CallTraceBehaviour` is a `NetworkBehaviour` that tracks invocations of callback methods and +/// their arguments, wrapping around an inner behaviour. It ensures certain invariants are met. +pub struct CallTraceBehaviour +where + TInner: NetworkBehaviour, +{ + inner: TInner, + + pub addresses_of_peer: Vec, + pub inject_connected: Vec, + pub inject_disconnected: Vec, + pub inject_connection_established: Vec<(PeerId, ConnectionId, ConnectedPoint)>, + pub inject_connection_closed: Vec<(PeerId, ConnectionId, ConnectedPoint)>, + pub inject_event: Vec<( + PeerId, + ConnectionId, + <::Handler as ProtocolsHandler>::OutEvent, + )>, + pub inject_dial_failure: Vec>, + pub inject_new_listener: Vec, + pub inject_new_listen_addr: Vec<(ListenerId, Multiaddr)>, + pub inject_new_external_addr: Vec, + pub inject_expired_listen_addr: Vec<(ListenerId, Multiaddr)>, + pub inject_expired_external_addr: Vec, + pub inject_listener_error: Vec, + pub inject_listener_closed: Vec<(ListenerId, bool)>, + pub poll: usize, +} + +impl CallTraceBehaviour +where + TInner: NetworkBehaviour, +{ + pub fn new(inner: TInner) -> Self { + Self { + inner, + addresses_of_peer: Vec::new(), + inject_connected: Vec::new(), + inject_disconnected: Vec::new(), + inject_connection_established: Vec::new(), + inject_connection_closed: Vec::new(), + inject_event: Vec::new(), + inject_dial_failure: Vec::new(), + inject_new_listener: Vec::new(), + inject_new_listen_addr: Vec::new(), + inject_new_external_addr: Vec::new(), + inject_expired_listen_addr: Vec::new(), + inject_expired_external_addr: Vec::new(), + inject_listener_error: Vec::new(), + inject_listener_closed: Vec::new(), + poll: 0, + } + } + + #[allow(dead_code)] + pub fn reset(&mut self) { + self.addresses_of_peer = Vec::new(); + self.inject_connected = Vec::new(); + self.inject_disconnected = Vec::new(); + self.inject_connection_established = Vec::new(); + self.inject_connection_closed = Vec::new(); + self.inject_event = Vec::new(); + self.inject_dial_failure = Vec::new(); + self.inject_new_listen_addr = Vec::new(); + self.inject_new_external_addr = Vec::new(); + self.inject_expired_listen_addr = Vec::new(); + self.inject_listener_error = Vec::new(); + self.inject_listener_closed = Vec::new(); + self.poll = 0; + } + + pub fn inner(&mut self) -> &mut TInner { + &mut self.inner + } + + /// Checks that when the expected number of closed connection notifications are received, a + /// given number of expected disconnections have been received as well. + /// + /// Returns if the first condition is met. + pub fn assert_disconnected( + &self, + expected_closed_connections: usize, + expected_disconnections: usize, + ) -> bool { + if self.inject_connection_closed.len() == expected_closed_connections { + assert_eq!(self.inject_disconnected.len(), expected_disconnections); + return true; + } + + false + } + + /// Checks that when the expected number of established connection notifications are received, + /// a given number of expected connections have been received as well. + /// + /// Returns if the first condition is met. + pub fn assert_connected( + &self, + expected_established_connections: usize, + expected_connections: usize, + ) -> bool { + if self.inject_connection_established.len() == expected_established_connections { + assert_eq!(self.inject_connected.len(), expected_connections); + return true; + } + + false + } +} + +impl NetworkBehaviour for CallTraceBehaviour +where + TInner: NetworkBehaviour, + <::Handler as ProtocolsHandler>::OutEvent: + Clone, +{ + type ProtocolsHandler = TInner::ProtocolsHandler; + type OutEvent = TInner::OutEvent; + + fn new_handler(&mut self) -> Self::ProtocolsHandler { + self.inner.new_handler() + } + + fn addresses_of_peer(&mut self, p: &PeerId) -> Vec { + self.addresses_of_peer.push(*p); + self.inner.addresses_of_peer(p) + } + + fn inject_connected(&mut self, peer: &PeerId) { + assert!( + self.inject_connection_established + .iter() + .any(|(peer_id, _, _)| peer_id == peer), + "`inject_connected` is called after at least one `inject_connection_established`." + ); + self.inject_connected.push(*peer); + self.inner.inject_connected(peer); + } + + fn inject_connection_established( + &mut self, + p: &PeerId, + c: &ConnectionId, + e: &ConnectedPoint, + errors: Option<&Vec>, + ) { + self.inject_connection_established.push((*p, *c, e.clone())); + self.inner.inject_connection_established(p, c, e, errors); + } + + fn inject_disconnected(&mut self, peer: &PeerId) { + assert!( + self.inject_connection_closed + .iter() + .any(|(peer_id, _, _)| peer_id == peer), + "`inject_disconnected` is called after at least one `inject_connection_closed`." + ); + self.inject_disconnected.push(*peer); + self.inner.inject_disconnected(peer); + } + + fn inject_connection_closed( + &mut self, + p: &PeerId, + c: &ConnectionId, + e: &ConnectedPoint, + handler: ::Handler, + ) { + let connection = (*p, *c, e.clone()); + assert!( + self.inject_connection_established.contains(&connection), + "`inject_connection_closed` is called only for connections for \ + which `inject_connection_established` was called first." + ); + self.inject_connection_closed.push(connection); + self.inner.inject_connection_closed(p, c, e, handler); + } + + fn inject_event( + &mut self, + p: PeerId, + c: ConnectionId, + e: <::Handler as ProtocolsHandler>::OutEvent, + ) { + assert!( + self.inject_connection_established + .iter() + .any(|(peer_id, conn_id, _)| *peer_id == p && c == *conn_id), + "`inject_event` is called for reported connections." + ); + assert!( + !self + .inject_connection_closed + .iter() + .any(|(peer_id, conn_id, _)| *peer_id == p && c == *conn_id), + "`inject_event` is never called for closed connections." + ); + + self.inject_event.push((p, c, e.clone())); + self.inner.inject_event(p, c, e); + } + + fn inject_dial_failure( + &mut self, + p: Option, + handler: Self::ProtocolsHandler, + error: &DialError, + ) { + self.inject_dial_failure.push(p); + self.inner.inject_dial_failure(p, handler, error); + } + + fn inject_new_listener(&mut self, id: ListenerId) { + self.inject_new_listener.push(id); + self.inner.inject_new_listener(id); + } + + fn inject_new_listen_addr(&mut self, id: ListenerId, a: &Multiaddr) { + self.inject_new_listen_addr.push((id, a.clone())); + self.inner.inject_new_listen_addr(id, a); + } + + fn inject_expired_listen_addr(&mut self, id: ListenerId, a: &Multiaddr) { + self.inject_expired_listen_addr.push((id, a.clone())); + self.inner.inject_expired_listen_addr(id, a); + } + + fn inject_new_external_addr(&mut self, a: &Multiaddr) { + self.inject_new_external_addr.push(a.clone()); + self.inner.inject_new_external_addr(a); + } + + fn inject_expired_external_addr(&mut self, a: &Multiaddr) { + self.inject_expired_external_addr.push(a.clone()); + self.inner.inject_expired_external_addr(a); + } + + fn inject_listener_error(&mut self, l: ListenerId, e: &(dyn std::error::Error + 'static)) { + self.inject_listener_error.push(l); + self.inner.inject_listener_error(l, e); + } + + fn inject_listener_closed(&mut self, l: ListenerId, r: Result<(), &std::io::Error>) { + self.inject_listener_closed.push((l, r.is_ok())); + self.inner.inject_listener_closed(l, r); + } + + fn poll( + &mut self, + cx: &mut Context, + args: &mut impl PollParameters, + ) -> Poll> { + self.poll += 1; + self.inner.poll(cx, args) + } +} diff --git a/beacon_node/lighthouse_network/tests/common/mod.rs b/beacon_node/lighthouse_network/tests/common/mod.rs index 865946a22..520921e87 100644 --- a/beacon_node/lighthouse_network/tests/common/mod.rs +++ b/beacon_node/lighthouse_network/tests/common/mod.rs @@ -13,6 +13,13 @@ use std::time::Duration; use tokio::runtime::Runtime; use types::{ChainSpec, EnrForkId, EthSpec, ForkContext, Hash256, MinimalEthSpec}; +#[allow(clippy::type_complexity)] +#[allow(unused)] +pub mod behaviour; +#[allow(clippy::type_complexity)] +#[allow(unused)] +pub mod swarm; + type E = MinimalEthSpec; use tempfile::Builder as TempBuilder; @@ -41,6 +48,7 @@ impl std::ops::DerefMut for Libp2pInstance { } } +#[allow(unused)] pub fn build_log(level: slog::Level, enabled: bool) -> slog::Logger { let decorator = slog_term::TermDecorator::new().build(); let drain = slog_term::FullFormat::new(decorator).build().fuse(); diff --git a/beacon_node/lighthouse_network/tests/common/swarm.rs b/beacon_node/lighthouse_network/tests/common/swarm.rs new file mode 100644 index 000000000..2930e2e4d --- /dev/null +++ b/beacon_node/lighthouse_network/tests/common/swarm.rs @@ -0,0 +1,99 @@ +use std::collections::HashMap; +use std::pin::Pin; + +use super::behaviour::{CallTraceBehaviour, MockBehaviour}; + +use futures::stream::Stream; +use futures::task::{Context, Poll}; +use libp2p::swarm::protocols_handler::ProtocolsHandler; +use libp2p::swarm::{IntoProtocolsHandler, NetworkBehaviour, Swarm, SwarmBuilder, SwarmEvent}; +use libp2p::{PeerId, Transport}; + +use futures::StreamExt; + +pub fn new_test_swarm(behaviour: B) -> Swarm +where + B: NetworkBehaviour, +{ + let id_keys = libp2p::identity::Keypair::generate_ed25519(); + let local_public_key = id_keys.public(); + let transport = libp2p::core::transport::MemoryTransport::default() + .upgrade(libp2p::core::upgrade::Version::V1) + .authenticate(libp2p::plaintext::PlainText2Config { + local_public_key: local_public_key.clone(), + }) + .multiplex(libp2p::yamux::YamuxConfig::default()) + .boxed(); + SwarmBuilder::new(transport, behaviour, local_public_key.into()).build() +} + +pub fn random_multiaddr() -> libp2p::multiaddr::Multiaddr { + libp2p::multiaddr::Protocol::Memory(rand::random::()).into() +} + +/// Bind a memory multiaddr to a compatible swarm. +pub async fn bind_listener( + swarm: &mut Swarm, +) -> libp2p::multiaddr::Multiaddr { + swarm.listen_on(random_multiaddr()).unwrap(); + match swarm.select_next_some().await { + SwarmEvent::NewListenAddr { + listener_id: _, + address, + } => address, + _ => panic!("Testing swarm's first event should be a new listener"), + } +} + +#[derive(Default)] +pub struct SwarmPool { + swarms: HashMap>, +} + +impl SwarmPool { + pub fn with_capacity(capacity: usize) -> Self { + Self { + swarms: HashMap::with_capacity(capacity), + } + } + pub fn insert(&mut self, swarm: Swarm) -> PeerId { + let peer_id = *swarm.local_peer_id(); + self.swarms.insert(peer_id, swarm); + peer_id + } + + pub fn remove(&mut self, peer_id: &PeerId) { + self.swarms.remove(peer_id); + } + + pub fn get_mut(&mut self, peer_id: &PeerId) -> Option<&mut Swarm> { + self.swarms.get_mut(peer_id) + } + + pub fn swarms(&self) -> &HashMap> { + &self.swarms + } + + pub fn swarms_mut(&mut self) -> &mut HashMap> { + &mut self.swarms + } +} + +impl Stream for SwarmPool +where + B: NetworkBehaviour, + ::ProtocolsHandler: ProtocolsHandler, +{ + type Item = (PeerId, + SwarmEvent<::OutEvent, <<::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::Error>); + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let mut polls = self + .get_mut() + .swarms + .iter_mut() + .map(|(&peer_id, swarm)| swarm.map(move |ev| (peer_id, ev))) + .collect::>(); + polls.poll_next_unpin(cx) + } +} diff --git a/beacon_node/lighthouse_network/tests/pm_tests.rs b/beacon_node/lighthouse_network/tests/pm_tests.rs new file mode 100644 index 000000000..96f91797a --- /dev/null +++ b/beacon_node/lighthouse_network/tests/pm_tests.rs @@ -0,0 +1,204 @@ +#![cfg(not(debug_assertions))] + +mod common; +use std::{ + collections::{HashMap, HashSet}, + sync::Arc, +}; + +use common::{ + behaviour::{CallTraceBehaviour, MockBehaviour}, + swarm, +}; +use lighthouse_network::{ + peer_manager::{config::Config, PeerManagerEvent}, + NetworkGlobals, PeerAction, PeerInfo, PeerManager, ReportSource, +}; +use types::MinimalEthSpec as E; + +use futures::StreamExt; +use libp2p::{ + core::either::EitherError, + swarm::SwarmEvent, + swarm::{protocols_handler::DummyProtocolsHandler, DummyBehaviour, KeepAlive, Swarm}, + NetworkBehaviour, +}; + +use slog::debug; + +/// Struct that mimics the lighthouse_network::Service with respect to handling peer manager +/// events. +// TODO: make this a real struct for more accurate testing. +struct Service { + swarm: Swarm, +} + +impl Service { + async fn select_next_some(&mut self) -> SwarmEvent> { + let ev = self.swarm.select_next_some().await; + match &ev { + SwarmEvent::Behaviour(Ev(PeerManagerEvent::Banned(peer_id, _addr_vec))) => { + self.swarm.ban_peer_id(*peer_id); + } + SwarmEvent::Behaviour(Ev(PeerManagerEvent::UnBanned(peer_id, _addr_vec))) => { + self.swarm.unban_peer_id(*peer_id); + } + SwarmEvent::Behaviour(Ev(PeerManagerEvent::DisconnectPeer(peer_id, _reason))) => { + // directly disconnect here. + let _ = self.swarm.disconnect_peer_id(*peer_id); + } + _ => {} + } + ev + } +} + +#[derive(Debug)] +struct Ev(PeerManagerEvent); +impl From for Ev { + fn from(_: void::Void) -> Self { + unreachable!("No events are emmited") + } +} +impl From for Ev { + fn from(ev: PeerManagerEvent) -> Self { + Ev(ev) + } +} + +#[derive(NetworkBehaviour)] +#[behaviour(out_event = "Ev")] +struct Behaviour { + pm_call_trace: CallTraceBehaviour>, + sibling: MockBehaviour, +} + +impl Behaviour { + fn new(pm: PeerManager) -> Self { + Behaviour { + pm_call_trace: CallTraceBehaviour::new(pm), + sibling: MockBehaviour::new(DummyProtocolsHandler { + // The peer manager votes No, so we make sure the combined handler stays alive this + // way. + keep_alive: KeepAlive::Yes, + }), + } + } +} + +#[tokio::test] +async fn banned_peers_consistency() { + let log = common::build_log(slog::Level::Debug, false); + let pm_log = log.new(slog::o!("who" => "[PM]")); + let globals: Arc> = Arc::new(NetworkGlobals::new_test_globals(&log)); + + // Build the peer manager. + let (mut pm_service, pm_addr) = { + let pm_config = Config { + discovery_enabled: false, + ..Default::default() + }; + let pm = PeerManager::new(pm_config, globals.clone(), &pm_log) + .await + .unwrap(); + let mut pm_swarm = swarm::new_test_swarm(Behaviour::new(pm)); + let pm_addr = swarm::bind_listener(&mut pm_swarm).await; + let service = Service { swarm: pm_swarm }; + (service, pm_addr) + }; + + let excess_banned_peers = 15; + let peers_to_ban = + lighthouse_network::peer_manager::peerdb::MAX_BANNED_PEERS + excess_banned_peers; + + // Build all the dummy peers needed. + let (mut swarm_pool, peers) = { + let mut pool = swarm::SwarmPool::with_capacity(peers_to_ban); + let mut peers = HashSet::with_capacity(peers_to_ban); + for _ in 0..peers_to_ban { + let mut peer_swarm = + swarm::new_test_swarm(DummyBehaviour::with_keep_alive(KeepAlive::Yes)); + let _peer_addr = swarm::bind_listener(&mut peer_swarm).await; + // It is ok to dial all at the same time since the swarm handles an event at a time. + peer_swarm.dial(pm_addr.clone()).unwrap(); + let peer_id = pool.insert(peer_swarm); + peers.insert(peer_id); + } + (pool, peers) + }; + + // we track banned peers at the swarm level here since there is no access to that info. + let mut swarm_banned_peers = HashMap::with_capacity(peers_to_ban); + let mut peers_unbanned = 0; + let timeout = tokio::time::sleep(tokio::time::Duration::from_secs(30)); + futures::pin_mut!(timeout); + + loop { + // poll the pm and dummy swarms. + tokio::select! { + pm_event = pm_service.select_next_some() => { + debug!(log, "[PM] {:?}", pm_event); + match pm_event { + SwarmEvent::Behaviour(Ev(ev)) => match ev { + PeerManagerEvent::Banned(peer_id, _) => { + let has_been_unbanned = false; + swarm_banned_peers.insert(peer_id, has_been_unbanned); + } + PeerManagerEvent::UnBanned(peer_id, _) => { + *swarm_banned_peers.get_mut(&peer_id).expect("Unbanned peer must be banned first") = true; + peers_unbanned += 1; + } + _ => {} + } + SwarmEvent::ConnectionEstablished { + peer_id, + endpoint: _, + num_established: _, + concurrent_dial_errors: _, + } => { + assert!(peers.contains(&peer_id)); + // now we report the peer as banned. + pm_service + .swarm + .behaviour_mut() + .pm_call_trace + .inner() + .report_peer( + &peer_id, + PeerAction::Fatal, + ReportSource::Processor, + None + ); + }, + _ => {} + } + } + Some((_peer_id, _peer_ev)) = swarm_pool.next() => { + // we need to poll the swarms to keep the peers going + } + _ = timeout.as_mut() => { + panic!("Test timeout.") + } + } + + if peers_unbanned == excess_banned_peers { + let pdb = globals.peers.read(); + let inconsistencies = swarm_banned_peers + .into_iter() + .map(|(peer_id, was_unbanned)| { + was_unbanned + != pdb.peer_info(&peer_id).map_or( + false, /* We forgot about a banned peer */ + PeerInfo::is_banned, + ) + }); + assert_eq!( + inconsistencies + .filter(|is_consistent| *is_consistent) + .count(), + peers_to_ban + ); + return; + } + } +} diff --git a/beacon_node/network/src/sync/range_sync/range.rs b/beacon_node/network/src/sync/range_sync/range.rs index f6cf4199b..ffe74ea98 100644 --- a/beacon_node/network/src/sync/range_sync/range.rs +++ b/beacon_node/network/src/sync/range_sync/range.rs @@ -371,7 +371,7 @@ mod tests { use beacon_chain::eth1_chain::CachingEth1Backend; use beacon_chain::parking_lot::RwLock; use lighthouse_network::rpc::BlocksByRangeRequest; - use lighthouse_network::{libp2p, Request}; + use lighthouse_network::Request; use lighthouse_network::{rpc::StatusMessage, NetworkGlobals}; use slog::{o, Drain}; @@ -568,29 +568,7 @@ mod tests { log.new(o!("component" => "range")), ); let (network_tx, network_rx) = mpsc::unbounded_channel(); - let globals = { - use lighthouse_network::discovery::enr_ext::CombinedKeyExt; - use lighthouse_network::discv5::enr::CombinedKey; - use lighthouse_network::discv5::enr::EnrBuilder; - use lighthouse_network::rpc::methods::{MetaData, MetaDataV2}; - - let keypair = libp2p::identity::Keypair::generate_secp256k1(); - let enr_key: CombinedKey = CombinedKey::from_libp2p(&keypair).unwrap(); - let enr = EnrBuilder::new("v4").build(&enr_key).unwrap(); - let globals = NetworkGlobals::new( - enr, - 9000, - 9000, - MetaData::V2(MetaDataV2 { - seq_number: 0, - attnets: Default::default(), - syncnets: Default::default(), - }), - vec![], - &log, - ); - Arc::new(globals) - }; + let globals = Arc::new(NetworkGlobals::new_test_globals(&log)); let cx = SyncNetworkContext::new( network_tx, globals.clone(),