diff --git a/account_manager/Cargo.toml b/account_manager/Cargo.toml index 7caf1da06..ad91ad4ef 100644 --- a/account_manager/Cargo.toml +++ b/account_manager/Cargo.toml @@ -20,7 +20,7 @@ deposit_contract = { path = "../eth2/utils/deposit_contract" } libc = "0.2.65" eth2_ssz = { path = "../eth2/utils/ssz" } eth2_ssz_derive = { path = "../eth2/utils/ssz_derive" } -hex = "0.4" +hex = "0.3" validator_client = { path = "../validator_client" } rayon = "1.2.0" eth2_testnet_config = { path = "../eth2/utils/eth2_testnet_config" } diff --git a/beacon_node/Cargo.toml b/beacon_node/Cargo.toml index ae99c8ce6..83d77a3c9 100644 --- a/beacon_node/Cargo.toml +++ b/beacon_node/Cargo.toml @@ -26,7 +26,7 @@ slog-term = "2.4.2" slog-async = "2.3.0" ctrlc = { version = "3.1.3", features = ["termination"] } tokio = "0.1.22" -tokio-timer = "0.2.11" +tokio-timer = "0.2.12" exit-future = "0.1.4" env_logger = "0.7.1" dirs = "2.0.2" diff --git a/beacon_node/eth1/Cargo.toml b/beacon_node/eth1/Cargo.toml index bdd9ded4d..ea3684046 100644 --- a/beacon_node/eth1/Cargo.toml +++ b/beacon_node/eth1/Cargo.toml @@ -15,7 +15,7 @@ reqwest = "0.9" futures = "0.1.25" serde_json = "1.0" serde = { version = "1.0", features = ["derive"] } -hex = "0.4" +hex = "0.3" types = { path = "../../eth2/types"} merkle_proof = { path = "../../eth2/utils/merkle_proof"} eth2_ssz = { path = "../../eth2/utils/ssz"} @@ -23,7 +23,7 @@ tree_hash = { path = "../../eth2/utils/tree_hash"} eth2_hashing = { path = "../../eth2/utils/eth2_hashing"} parking_lot = "0.7" slog = "^2.2.3" -tokio = "0.1.17" +tokio = "0.1.22" state_processing = { path = "../../eth2/state_processing" } exit-future = "0.1.4" libflate = "0.1" diff --git a/beacon_node/eth2-libp2p/Cargo.toml b/beacon_node/eth2-libp2p/Cargo.toml index 11ece8cd0..8883b5acf 100644 --- a/beacon_node/eth2-libp2p/Cargo.toml +++ b/beacon_node/eth2-libp2p/Cargo.toml @@ -21,12 +21,12 @@ tokio = "0.1.22" futures = "0.1.29" error-chain = "0.12.1" dirs = "2.0.2" -smallvec = "0.6.11" fnv = "1.0.6" unsigned-varint = "0.2.3" lazy_static = "1.4.0" lighthouse_metrics = { path = "../../eth2/utils/lighthouse_metrics" } tokio-io-timeout = "0.3.1" +smallvec = "1.0.0" [dev-dependencies] slog-stdlog = "4.0.0" diff --git a/beacon_node/eth2-libp2p/src/behaviour.rs b/beacon_node/eth2-libp2p/src/behaviour.rs index 63464bc72..0eb60387b 100644 --- a/beacon_node/eth2-libp2p/src/behaviour.rs +++ b/beacon_node/eth2-libp2p/src/behaviour.rs @@ -241,6 +241,11 @@ impl Behaviour { self.discovery.peer_banned(peer_id); } + /// Notify discovery that the peer has been unbanned. + pub fn peer_unbanned(&mut self, peer_id: &PeerId) { + self.discovery.peer_unbanned(peer_id); + } + /// Informs the discovery behaviour if a new IP/Port is set at the application layer pub fn update_local_enr_socket(&mut self, socket: std::net::SocketAddr, is_tcp: bool) { self.discovery.update_local_enr(socket, is_tcp); diff --git a/beacon_node/eth2-libp2p/src/discovery.rs b/beacon_node/eth2-libp2p/src/discovery.rs index 860bf6064..fdbbca2cc 100644 --- a/beacon_node/eth2-libp2p/src/discovery.rs +++ b/beacon_node/eth2-libp2p/src/discovery.rs @@ -39,7 +39,7 @@ pub struct Discovery { /// The target number of connected peers on the libp2p interface. max_peers: usize, - /// directory to save ENR to + /// The directory where the ENR is stored. enr_dir: String, /// The delay between peer discovery searches. @@ -158,11 +158,14 @@ impl Discovery { /// The peer has been banned. Add this peer to the banned list to prevent any future /// re-connections. // TODO: Remove the peer from the DHT if present - // TODO: Implement a timeout, after which we unban the peer pub fn peer_banned(&mut self, peer_id: PeerId) { self.banned_peers.insert(peer_id); } + pub fn peer_unbanned(&mut self, peer_id: &PeerId) { + self.banned_peers.remove(peer_id); + } + /// Search for new peers using the underlying discovery mechanism. fn find_peers(&mut self) { // pick a random NodeId diff --git a/beacon_node/eth2-libp2p/src/service.rs b/beacon_node/eth2-libp2p/src/service.rs index a9c519e2e..0fb19a4ab 100644 --- a/beacon_node/eth2-libp2p/src/service.rs +++ b/beacon_node/eth2-libp2p/src/service.rs @@ -12,13 +12,12 @@ use libp2p::core::{ transport::boxed::Boxed, ConnectedPoint, }; use libp2p::{core, secio, swarm::NetworkBehaviour, PeerId, Swarm, Transport}; -use slog::{crit, debug, info, trace, warn}; -use smallvec::SmallVec; +use slog::{crit, debug, error, info, trace, warn}; use std::fs::File; use std::io::prelude::*; use std::io::{Error, ErrorKind}; use std::time::Duration; -use std::time::Instant; +use tokio::timer::DelayQueue; type Libp2pStream = Boxed<(PeerId, StreamMuxerBox), Error>; type Libp2pBehaviour = Behaviour>; @@ -26,7 +25,7 @@ type Libp2pBehaviour = Behaviour>; const NETWORK_KEY_FILENAME: &str = "key"; /// The time in milliseconds to wait before banning a peer. This allows for any Goodbye messages to be /// flushed and protocols to be negotiated. -const BAN_PEER_TIMEOUT: u64 = 200; +const BAN_PEER_WAIT_TIMEOUT: u64 = 200; /// The configuration and state of the libp2p components for the beacon node. pub struct Service { @@ -38,7 +37,10 @@ pub struct Service { pub local_peer_id: PeerId, /// A current list of peers to ban after a given timeout. - peers_to_ban: SmallVec<[(PeerId, Instant); 4]>, + peers_to_ban: DelayQueue, + + /// A list of timeouts after which peers become unbanned. + peer_ban_timeout: DelayQueue, /// Indicates if the listening address have been verified and compared to the expected ENR. verified_listen_address: bool, @@ -157,18 +159,21 @@ impl Service { Ok(Service { local_peer_id, swarm, - peers_to_ban: SmallVec::new(), + peers_to_ban: DelayQueue::new(), + peer_ban_timeout: DelayQueue::new(), verified_listen_address: false, log, }) } - /// Adds a peer to be banned after a timeout period. - pub fn disconnect_and_ban_peer(&mut self, peer_id: PeerId) { - self.peers_to_ban.push(( - peer_id, - Instant::now() + Duration::from_millis(BAN_PEER_TIMEOUT), - )); + /// Adds a peer to be banned for a period of time, specified by a timeout. + pub fn disconnect_and_ban_peer(&mut self, peer_id: PeerId, timeout: Duration) { + error!(self.log, "Disconnecting and banning peer"; "peer_id" => format!("{:?}", peer_id), "timeout" => format!("{:?}", timeout)); + self.peers_to_ban.insert( + peer_id.clone(), + Duration::from_millis(BAN_PEER_WAIT_TIMEOUT), + ); + self.peer_ban_timeout.insert(peer_id, timeout); } } @@ -214,6 +219,48 @@ impl Stream for Service { _ => break, } } + + // check if peers need to be banned + loop { + match self.peers_to_ban.poll() { + Ok(Async::Ready(Some(peer_id))) => { + let peer_id = peer_id.into_inner(); + Swarm::ban_peer_id(&mut self.swarm, peer_id.clone()); + // TODO: Correctly notify protocols of the disconnect + // TODO: Also remove peer from the DHT: https://github.com/sigp/lighthouse/issues/629 + let dummy_connected_point = ConnectedPoint::Dialer { + address: "/ip4/0.0.0.0" + .parse::() + .expect("valid multiaddr"), + }; + self.swarm + .inject_disconnected(&peer_id, dummy_connected_point); + // inform the behaviour that the peer has been banned + self.swarm.peer_banned(peer_id); + } + Ok(Async::NotReady) | Ok(Async::Ready(None)) => break, + Err(e) => { + warn!(self.log, "Peer banning queue failed"; "error" => format!("{:?}", e)); + } + } + } + + // un-ban peer if it's timeout has expired + loop { + match self.peer_ban_timeout.poll() { + Ok(Async::Ready(Some(peer_id))) => { + let peer_id = peer_id.into_inner(); + debug!(self.log, "Peer has been unbanned"; "peer" => format!("{:?}", peer_id)); + self.swarm.peer_unbanned(&peer_id); + Swarm::unban_peer_id(&mut self.swarm, peer_id); + } + Ok(Async::NotReady) | Ok(Async::Ready(None)) => break, + Err(e) => { + warn!(self.log, "Peer banning timeout queue failed"; "error" => format!("{:?}", e)); + } + } + } + // swarm is not ready // check to see if the address is different to the config. If so, update our ENR if !self.verified_listen_address { @@ -226,28 +273,6 @@ impl Stream for Service { } } - // check if there are peers to ban - while !self.peers_to_ban.is_empty() { - if self.peers_to_ban[0].1 < Instant::now() { - let (peer_id, _) = self.peers_to_ban.remove(0); - warn!(self.log, "Disconnecting and banning peer"; "peer_id" => format!("{:?}", peer_id)); - Swarm::ban_peer_id(&mut self.swarm, peer_id.clone()); - // TODO: Correctly notify protocols of the disconnect - // TODO: Also remove peer from the DHT: https://github.com/sigp/lighthouse/issues/629 - let dummy_connected_point = ConnectedPoint::Dialer { - address: "/ip4/0.0.0.0" - .parse::() - .expect("valid multiaddr"), - }; - self.swarm - .inject_disconnected(&peer_id, dummy_connected_point); - // inform the behaviour that the peer has been banned - self.swarm.peer_banned(peer_id); - } else { - break; - } - } - Ok(Async::NotReady) } } diff --git a/beacon_node/genesis/Cargo.toml b/beacon_node/genesis/Cargo.toml index 60d7f3f4b..7ab56a689 100644 --- a/beacon_node/genesis/Cargo.toml +++ b/beacon_node/genesis/Cargo.toml @@ -19,7 +19,7 @@ merkle_proof = { path = "../../eth2/utils/merkle_proof" } eth2_ssz = "0.1" eth2_hashing = { path = "../../eth2/utils/eth2_hashing" } tree_hash = "0.1" -tokio = "0.1.17" +tokio = "0.1.22" parking_lot = "0.7" slog = "^2.2.3" exit-future = "0.1.4" diff --git a/beacon_node/network/Cargo.toml b/beacon_node/network/Cargo.toml index 1117a2c89..906c4f938 100644 --- a/beacon_node/network/Cargo.toml +++ b/beacon_node/network/Cargo.toml @@ -20,7 +20,7 @@ futures = "0.1.29" error-chain = "0.12.1" tokio = "0.1.22" parking_lot = "0.9.0" -smallvec = "0.6.11" +smallvec = "1.0.0" # TODO: Remove rand crate for mainnet rand = "0.7.2" fnv = "1.0.6" diff --git a/beacon_node/network/src/service.rs b/beacon_node/network/src/service.rs index 94b00bb7f..ac7961254 100644 --- a/beacon_node/network/src/service.rs +++ b/beacon_node/network/src/service.rs @@ -14,6 +14,9 @@ use std::sync::Arc; use tokio::runtime::TaskExecutor; use tokio::sync::{mpsc, oneshot}; +/// The time in seconds that a peer will be banned and prevented from reconnecting. +const BAN_PEER_TIMEOUT: u64 = 30; + /// Service that handles communication between internal services and the eth2_libp2p network service. pub struct Service { libp2p_service: Arc>, @@ -212,7 +215,10 @@ fn network_service( } } NetworkMessage::Disconnect { peer_id } => { - libp2p_service.lock().disconnect_and_ban_peer(peer_id); + libp2p_service.lock().disconnect_and_ban_peer( + peer_id, + std::time::Duration::from_secs(BAN_PEER_TIMEOUT), + ); } }, Ok(Async::NotReady) => break, @@ -235,7 +241,10 @@ fn network_service( // if we received a Goodbye message, drop and ban the peer if let RPCEvent::Request(_, RPCRequest::Goodbye(_)) = rpc_event { - locked_service.disconnect_and_ban_peer(peer_id.clone()); + locked_service.disconnect_and_ban_peer( + peer_id.clone(), + std::time::Duration::from_secs(BAN_PEER_TIMEOUT), + ); }; message_handler_send .try_send(HandlerMessage::RPC(peer_id, rpc_event)) diff --git a/lighthouse/Cargo.toml b/lighthouse/Cargo.toml index ff6cc054b..dd1ab7ce5 100644 --- a/lighthouse/Cargo.toml +++ b/lighthouse/Cargo.toml @@ -6,7 +6,7 @@ edition = "2018" [dependencies] beacon_node = { "path" = "../beacon_node" } -tokio = "0.1.15" +tokio = "0.1.22" slog = { version = "^2.2.3" , features = ["max_level_trace"] } sloggers = "0.3.4" types = { "path" = "../eth2/types" } diff --git a/lighthouse/environment/Cargo.toml b/lighthouse/environment/Cargo.toml index 01053ae9a..6e693d12a 100644 --- a/lighthouse/environment/Cargo.toml +++ b/lighthouse/environment/Cargo.toml @@ -5,7 +5,7 @@ authors = ["Paul Hauner "] edition = "2018" [dependencies] -tokio = "0.1.15" +tokio = "0.1.22" slog = { version = "^2.2.3" , features = ["max_level_trace"] } sloggers = "0.3.4" types = { "path" = "../../eth2/types" } diff --git a/tests/eth1_test_rig/Cargo.toml b/tests/eth1_test_rig/Cargo.toml index 2f4914e40..8038fd51d 100644 --- a/tests/eth1_test_rig/Cargo.toml +++ b/tests/eth1_test_rig/Cargo.toml @@ -6,7 +6,7 @@ edition = "2018" [dependencies] web3 = "0.8.0" -tokio = "0.1.17" +tokio = "0.1.22" futures = "0.1.25" types = { path = "../../eth2/types"} serde_json = "1.0" diff --git a/validator_client/Cargo.toml b/validator_client/Cargo.toml index 02eea884b..b38bfff39 100644 --- a/validator_client/Cargo.toml +++ b/validator_client/Cargo.toml @@ -24,7 +24,7 @@ slog = { version = "2.5.2", features = ["max_level_trace", "release_max_level_tr slog-async = "2.3.0" slog-term = "2.4.2" tokio = "0.1.22" -tokio-timer = "0.2.11" +tokio-timer = "0.2.12" error-chain = "0.12.1" bincode = "1.2.0" futures = "0.1.29" @@ -35,7 +35,7 @@ parking_lot = "0.7" exit-future = "0.1.4" libc = "0.2.65" eth2_ssz_derive = { path = "../eth2/utils/ssz_derive" } -hex = "0.4" +hex = "0.3" deposit_contract = { path = "../eth2/utils/deposit_contract" } bls = { path = "../eth2/utils/bls" } remote_beacon_node = { path = "../eth2/utils/remote_beacon_node" }