diff --git a/Cargo.lock b/Cargo.lock index d86be7513..713d8d067 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -289,8 +289,8 @@ dependencies = [ "ctrlc", "dirs", "environment", - "eth2-libp2p", "eth2_config", + "eth2_libp2p", "eth2_ssz", "eth2_testnet_config", "exit-future", @@ -402,6 +402,25 @@ dependencies = [ "zeroize", ] +[[package]] +name = "boot_node" +version = "0.1.0" +dependencies = [ + "clap", + "discv5", + "eth2_libp2p", + "futures 0.3.5", + "log 0.4.8", + "logging", + "slog", + "slog-async", + "slog-scope", + "slog-stdlog", + "slog-term", + "sloggers", + "tokio 0.2.21", +] + [[package]] name = "bs58" version = "0.3.1" @@ -581,8 +600,8 @@ dependencies = [ "environment", "error-chain", "eth1", - "eth2-libp2p", "eth2_config", + "eth2_libp2p", "eth2_ssz", "futures 0.3.5", "genesis", @@ -999,8 +1018,9 @@ checksum = "212d0f5754cb6769937f4501cc0e67f4f4483c8d2c3e1e922ee9edbe4ab4c7c0" [[package]] name = "discv5" -version = "0.1.0-alpha.2" -source = "git+https://github.com/sigp/discv5?rev=7b3bd40591b62b8c002ffdb85de008aa9f82e2e5#7b3bd40591b62b8c002ffdb85de008aa9f82e2e5" +version = "0.1.0-alpha.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "66319abef3e2f4dc434bf0c9bcb5dee5907d7fece3327dfd7da82db905d02441" dependencies = [ "arrayvec 0.5.1", "digest", @@ -1009,10 +1029,15 @@ dependencies = [ "futures 0.3.5", "hex 0.4.2", "hkdf", + "lazy_static", + "libp2p-core", "libsecp256k1", "log 0.4.8", + "lru_time_cache", + "multihash", "net2", "openssl", + "parking_lot 0.10.2", "rand 0.7.3", "rlp", "sha2", @@ -1121,6 +1146,7 @@ name = "environment" version = "0.1.2" dependencies = [ "ctrlc", + "discv5", "eth2_config", "eth2_testnet_config", "exit-future", @@ -1189,49 +1215,6 @@ dependencies = [ "web3", ] -[[package]] -name = "eth2-libp2p" -version = "0.1.2" -dependencies = [ - "base64 0.12.1", - "dirs", - "discv5", - "environment", - "error-chain", - "eth2_ssz", - "eth2_ssz_derive", - "eth2_ssz_types", - "exit-future", - "fnv", - "futures 0.3.5", - "hashset_delay", - "hex 0.4.2", - "lazy_static", - "libp2p", - "libp2p-tcp", - "lighthouse_metrics", - "lru 0.5.1", - "parking_lot 0.10.2", - "serde", - "serde_derive", - "sha2", - "slog", - "slog-async", - "slog-stdlog", - "slog-term", - "smallvec 1.4.0", - "snap", - "tempdir", - "tiny-keccak 2.0.2", - "tokio 0.2.21", - "tokio-io-timeout", - "tokio-util", - "types", - "unsigned-varint 0.3.3 (git+https://github.com/sigp/unsigned-varint?branch=latest-codecs)", - "version", - "void", -] - [[package]] name = "eth2_config" version = "0.2.0" @@ -1298,6 +1281,49 @@ dependencies = [ "zeroize", ] +[[package]] +name = "eth2_libp2p" +version = "0.1.2" +dependencies = [ + "base64 0.12.1", + "dirs", + "discv5", + "environment", + "error-chain", + "eth2_ssz", + "eth2_ssz_derive", + "eth2_ssz_types", + "exit-future", + "fnv", + "futures 0.3.5", + "hashset_delay", + "hex 0.4.2", + "lazy_static", + "libp2p", + "libp2p-tcp", + "lighthouse_metrics", + "lru 0.5.1", + "parking_lot 0.10.2", + "serde", + "serde_derive", + "sha2", + "slog", + "slog-async", + "slog-stdlog", + "slog-term", + "smallvec 1.4.0", + "snap", + "tempdir", + "tiny-keccak 2.0.2", + "tokio 0.2.21", + "tokio-io-timeout", + "tokio-util", + "types", + "unsigned-varint 0.3.3 (git+https://github.com/sigp/unsigned-varint?branch=latest-codecs)", + "version", + "void", +] + [[package]] name = "eth2_ssz" version = "0.1.2" @@ -2216,8 +2242,8 @@ dependencies = [ "deposit_contract", "dirs", "environment", - "eth2-libp2p", "eth2_keystore", + "eth2_libp2p", "eth2_ssz", 
"eth2_testnet_config", "futures 0.3.5", @@ -2559,6 +2585,7 @@ version = "0.1.2" dependencies = [ "account_manager", "beacon_node", + "boot_node", "clap", "clap_utils", "env_logger", @@ -2655,6 +2682,12 @@ dependencies = [ "linked-hash-map", ] +[[package]] +name = "lru_time_cache" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "adb241df5c4caeb888755363fc95f8a896618dc0d435e9e775f7930cb099beab" + [[package]] name = "mach" version = "0.3.2" @@ -2896,7 +2929,7 @@ dependencies = [ "beacon_chain", "environment", "error-chain", - "eth2-libp2p", + "eth2_libp2p", "eth2_ssz", "eth2_ssz_types", "exit-future", @@ -3768,8 +3801,8 @@ dependencies = [ "bls", "bus", "environment", - "eth2-libp2p", "eth2_config", + "eth2_libp2p", "eth2_ssz", "eth2_ssz_derive", "futures 0.3.5", @@ -5683,7 +5716,8 @@ dependencies = [ [[package]] name = "web3" version = "0.11.0" -source = "git+https://github.com/tomusdrw/rust-web3#69d5746f124033dee922d7d36acef9321c1df0b0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a681e8d15deced7c510db88c59133d2eafa7b6298b6e91b545e2a3fed93b3fe" dependencies = [ "arrayvec 0.5.1", "base64 0.12.1", diff --git a/Cargo.toml b/Cargo.toml index 43c7398c4..f425720d0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,11 +1,12 @@ [workspace] members = [ "account_manager", + "beacon_node", "beacon_node/beacon_chain", "beacon_node/client", "beacon_node/eth1", - "beacon_node/eth2-libp2p", + "beacon_node/eth2_libp2p", "beacon_node/network", "beacon_node/rest_api", "beacon_node/store", @@ -13,6 +14,8 @@ members = [ "beacon_node/version", "beacon_node/websocket_server", + "boot_node", + "common/clap_utils", "common/compare_fields", "common/compare_fields_derive", @@ -73,4 +76,3 @@ eth2_ssz = { path = "consensus/ssz" } eth2_ssz_derive = { path = "consensus/ssz_derive" } eth2_ssz_types = { path = "consensus/ssz_types" } eth2_hashing = { path = "crypto/eth2_hashing" } -web3 = { git = "https://github.com/tomusdrw/rust-web3" } diff --git a/beacon_node/Cargo.toml b/beacon_node/Cargo.toml index c8d3cf616..4d70b84f3 100644 --- a/beacon_node/Cargo.toml +++ b/beacon_node/Cargo.toml @@ -35,7 +35,7 @@ futures = "0.3.5" environment = { path = "../lighthouse/environment" } genesis = { path = "genesis" } eth2_testnet_config = { path = "../common/eth2_testnet_config" } -eth2-libp2p = { path = "./eth2-libp2p" } +eth2_libp2p = { path = "./eth2_libp2p" } eth2_ssz = "0.1.2" toml = "0.5.6" serde = "1.0.110" diff --git a/beacon_node/client/Cargo.toml b/beacon_node/client/Cargo.toml index 1b9ef0ab8..aad15cd9e 100644 --- a/beacon_node/client/Cargo.toml +++ b/beacon_node/client/Cargo.toml @@ -13,7 +13,7 @@ beacon_chain = { path = "../beacon_chain" } store = { path = "../store" } network = { path = "../network" } timer = { path = "../timer" } -eth2-libp2p = { path = "../eth2-libp2p" } +eth2_libp2p = { path = "../eth2_libp2p" } rest_api = { path = "../rest_api" } parking_lot = "0.10.2" websocket_server = { path = "../websocket_server" } diff --git a/beacon_node/eth2-libp2p/src/discovery/mod.rs b/beacon_node/eth2-libp2p/src/discovery/mod.rs deleted file mode 100644 index 136b00a5d..000000000 --- a/beacon_node/eth2-libp2p/src/discovery/mod.rs +++ /dev/null @@ -1,628 +0,0 @@ -///! This manages the discovery and management of peers. 
-pub(crate) mod enr; -pub mod enr_ext; - -// Allow external use of the lighthouse ENR builder -pub use enr::{build_enr, CombinedKey, Keypair}; -pub use enr_ext::{CombinedKeyExt, EnrExt}; - -use crate::metrics; -use crate::{error, Enr, NetworkConfig, NetworkGlobals}; -use discv5::{enr::NodeId, Discv5, Discv5Event, QueryId}; -use enr::{Eth2Enr, BITFIELD_ENR_KEY, ETH2_ENR_KEY}; -use futures::prelude::*; -use libp2p::core::{connection::ConnectionId, Multiaddr, PeerId}; -use libp2p::multiaddr::Protocol; -use libp2p::swarm::{ - protocols_handler::DummyProtocolsHandler, DialPeerCondition, NetworkBehaviour, - NetworkBehaviourAction, PollParameters, ProtocolsHandler, -}; -use lru::LruCache; -use slog::{crit, debug, info, trace, warn}; -use ssz::{Decode, Encode}; -use ssz_types::BitVector; -use std::{ - collections::{HashMap, HashSet, VecDeque}, - net::SocketAddr, - path::Path, - sync::Arc, - task::{Context, Poll}, - time::{Duration, Instant}, -}; -use tokio::time::{delay_until, Delay}; -use types::{EnrForkId, EthSpec, SubnetId}; - -mod subnet_predicate; - -use subnet_predicate::subnet_predicate; - -/// Maximum seconds before searching for extra peers. -const MAX_TIME_BETWEEN_PEER_SEARCHES: u64 = 120; -/// Initial delay between peer searches. -const INITIAL_SEARCH_DELAY: u64 = 5; -/// The number of peers we must be connected to before increasing the discovery delay. -const MINIMUM_PEERS_BEFORE_DELAY_INCREASE: usize = 5; -/// Local ENR storage filename. -pub const ENR_FILENAME: &str = "enr.dat"; -/// Number of peers we'd like to have connected to a given long-lived subnet. -const TARGET_SUBNET_PEERS: usize = 3; -/// Number of times to attempt a discovery request -const MAX_DISCOVERY_RETRY: u64 = 3; - -/// A struct representing the information associated with a single discovery request, -/// which can be retried with multiple queries -#[derive(Clone, Debug)] -pub struct Request { - pub query_id: Option, - pub min_ttl: Option, - pub retries: u64, -} - -/// Lighthouse discovery behaviour. This provides peer management and discovery using the Discv5 -/// libp2p protocol. -pub struct Discovery { - /// Events to be processed by the behaviour. - events: VecDeque>, - - /// A collection of seen live ENRs for quick lookup and to map peer-id's to ENRs. - cached_enrs: LruCache, - - /// The currently banned peers. - banned_peers: HashSet, - - /// The target number of connected peers on the libp2p interface. - max_peers: usize, - - /// The directory where the ENR is stored. - enr_dir: String, - - /// The delay between peer discovery searches. - peer_discovery_delay: Delay, - - /// Tracks the last discovery delay. The delay is doubled each round until the max - /// time is reached. - past_discovery_delay: u64, - - /// The TCP port for libp2p. Used to convert an updated IP address to a multiaddr. Note: This - /// assumes that the external TCP port is the same as the internal TCP port if behind a NAT. - //TODO: Improve NAT handling limit the above restriction - tcp_port: u16, - - /// The discovery behaviour used to discover new peers. - discovery: Discv5, - - /// A collection of network constants that can be read from other threads. - network_globals: Arc>, - - /// A mapping of SubnetId that we are currently searching for to all information associated with each request. - subnet_queries: HashMap, - - /// Logger for the discovery behaviour. 
- log: slog::Logger, -} - -impl Discovery { - pub fn new( - local_key: &Keypair, - config: &NetworkConfig, - network_globals: Arc>, - log: &slog::Logger, - ) -> error::Result { - let log = log.clone(); - - let enr_dir = match config.network_dir.to_str() { - Some(path) => String::from(path), - None => String::from(""), - }; - - let local_enr = network_globals.local_enr.read().clone(); - - info!(log, "ENR Initialised"; "enr" => local_enr.to_base64(), "seq" => local_enr.seq(), "id"=> format!("{}",local_enr.node_id()), "ip" => format!("{:?}", local_enr.ip()), "udp"=> format!("{:?}", local_enr.udp()), "tcp" => format!("{:?}", local_enr.tcp())); - - let listen_socket = SocketAddr::new(config.listen_address, config.discovery_port); - - // convert the keypair into an ENR key - let enr_key: CombinedKey = CombinedKey::from_libp2p(&local_key)?; - - let mut discovery = Discv5::new( - local_enr, - enr_key, - config.discv5_config.clone(), - listen_socket, - ) - .map_err(|e| format!("Discv5 service failed. Error: {:?}", e))?; - - // Add bootnodes to routing table - for bootnode_enr in config.boot_nodes.clone() { - debug!( - log, - "Adding node to routing table"; - "node_id" => format!("{}", bootnode_enr.node_id()), - "peer_id" => format!("{}", bootnode_enr.peer_id()), - "ip" => format!("{:?}", bootnode_enr.ip()), - "udp" => format!("{:?}", bootnode_enr.udp()), - "tcp" => format!("{:?}", bootnode_enr.tcp()) - ); - let _ = discovery.add_enr(bootnode_enr).map_err(|e| { - warn!( - log, - "Could not add peer to the local routing table"; - "error" => format!("{}", e) - ) - }); - } - - Ok(Self { - events: VecDeque::with_capacity(16), - cached_enrs: LruCache::new(50), - banned_peers: HashSet::new(), - max_peers: config.max_peers, - peer_discovery_delay: delay_until(tokio::time::Instant::now()), - past_discovery_delay: INITIAL_SEARCH_DELAY, - tcp_port: config.libp2p_port, - discovery, - network_globals, - subnet_queries: HashMap::new(), - log, - enr_dir, - }) - } - - /// Return the nodes local ENR. - pub fn local_enr(&self) -> &Enr { - self.discovery.local_enr() - } - - /// Manually search for peers. This restarts the discovery round, sparking multiple rapid - /// queries. - pub fn discover_peers(&mut self) { - self.past_discovery_delay = INITIAL_SEARCH_DELAY; - self.find_peers(); - } - - /// Add an ENR to the routing table of the discovery mechanism. - pub fn add_enr(&mut self, enr: Enr) { - // add the enr to seen caches - self.cached_enrs.put(enr.peer_id(), enr.clone()); - - let _ = self.discovery.add_enr(enr).map_err(|e| { - warn!( - self.log, - "Could not add peer to the local routing table"; - "error" => format!("{}", e) - ) - }); - } - - /// The peer has been banned. Add this peer to the banned list to prevent any future - /// re-connections. - // TODO: Remove the peer from the DHT if present - pub fn peer_banned(&mut self, peer_id: PeerId) { - self.banned_peers.insert(peer_id); - } - - pub fn peer_unbanned(&mut self, peer_id: &PeerId) { - self.banned_peers.remove(peer_id); - } - - /// Returns an iterator over all enr entries in the DHT. - pub fn enr_entries(&mut self) -> impl Iterator { - self.discovery.enr_entries() - } - - /// Returns the ENR of a known peer if it exists. 
- pub fn enr_of_peer(&mut self, peer_id: &PeerId) -> Option { - // first search the local cache - if let Some(enr) = self.cached_enrs.get(peer_id) { - return Some(enr.clone()); - } - // not in the local cache, look in the routing table - if let Ok(_node_id) = enr_ext::peer_id_to_node_id(peer_id) { - // TODO: Need to update discv5 - // self.discovery.find_enr(&node_id) - return None; - } else { - return None; - } - } - - /// Adds/Removes a subnet from the ENR Bitfield - pub fn update_enr_bitfield(&mut self, subnet_id: SubnetId, value: bool) -> Result<(), String> { - let id = *subnet_id as usize; - - let local_enr = self.discovery.local_enr(); - let mut current_bitfield = local_enr.bitfield::()?; - - if id >= current_bitfield.len() { - return Err(format!( - "Subnet id: {} is outside the ENR bitfield length: {}", - id, - current_bitfield.len() - )); - } - - if current_bitfield - .get(id) - .map_err(|_| String::from("Subnet ID out of bounds"))? - == value - { - return Err(format!( - "Subnet id: {} already in the local ENR already has value: {}", - id, value - )); - } - - // set the subnet bitfield in the ENR - current_bitfield - .set(id, value) - .map_err(|_| String::from("Subnet ID out of bounds, could not set subnet ID"))?; - - // insert the bitfield into the ENR record - let _ = self - .discovery - .enr_insert(BITFIELD_ENR_KEY, current_bitfield.as_ssz_bytes()); - - // replace the global version - *self.network_globals.local_enr.write() = self.discovery.local_enr().clone(); - Ok(()) - } - - /// Updates the `eth2` field of our local ENR. - pub fn update_eth2_enr(&mut self, enr_fork_id: EnrForkId) { - // to avoid having a reference to the spec constant, for the logging we assume - // FAR_FUTURE_EPOCH is u64::max_value() - let next_fork_epoch_log = if enr_fork_id.next_fork_epoch == u64::max_value() { - String::from("No other fork") - } else { - format!("{:?}", enr_fork_id.next_fork_epoch) - }; - - info!(self.log, "Updating the ENR fork version"; - "fork_digest" => format!("{:?}", enr_fork_id.fork_digest), - "next_fork_version" => format!("{:?}", enr_fork_id.next_fork_version), - "next_fork_epoch" => next_fork_epoch_log, - ); - - let _ = self - .discovery - .enr_insert(ETH2_ENR_KEY.into(), enr_fork_id.as_ssz_bytes()) - .map_err(|e| { - warn!( - self.log, - "Could not update eth2 ENR field"; - "error" => format!("{:?}", e) - ) - }); - - // replace the global version with discovery version - *self.network_globals.local_enr.write() = self.discovery.local_enr().clone(); - } - - /// A request to find peers on a given subnet. - pub fn discover_subnet_peers(&mut self, subnet_id: SubnetId, min_ttl: Option) { - // TODO: Extend this to an event once discovery becomes a thread managed by the peer - // manager - if let Some(min_ttl) = min_ttl { - self.network_globals - .peers - .write() - .extend_peers_on_subnet(subnet_id, min_ttl); - } - - // If there is already a discovery request in process for this subnet, ignore this request, - // but update the min_ttl. - if let Some(request) = self.subnet_queries.get_mut(&subnet_id) { - // update the min_ttl if required - if let Some(min_ttl) = min_ttl { - if request.min_ttl < Some(min_ttl) { - request.min_ttl = Some(min_ttl); - } - } - return; - } - - // Insert a request and start a query for the subnet - self.subnet_queries.insert( - subnet_id.clone(), - Request { - query_id: None, - min_ttl, - retries: 0, - }, - ); - self.run_subnet_query(subnet_id); - } - - /// Runs a discovery request for a given subnet_id if one already exists. 
- fn run_subnet_query(&mut self, subnet_id: SubnetId) { - let mut request = match self.subnet_queries.remove(&subnet_id) { - Some(v) => v, - None => return, // request doesn't exist - }; - - // increment the retry count - request.retries += 1; - - let peers_on_subnet = self - .network_globals - .peers - .read() - .peers_on_subnet(subnet_id) - .count(); - - if peers_on_subnet > TARGET_SUBNET_PEERS { - trace!(self.log, "Discovery ignored"; - "reason" => "Already connected to desired peers", - "connected_peers_on_subnet" => peers_on_subnet, - "target_subnet_peers" => TARGET_SUBNET_PEERS, - ); - return; - } - - // remove the entry and complete the query if greater than the maximum search count - if request.retries >= MAX_DISCOVERY_RETRY { - debug!( - self.log, - "Subnet peer discovery did not find sufficient peers. Reached max retry limit" - ); - return; - } - - let target_peers = TARGET_SUBNET_PEERS - peers_on_subnet; - debug!(self.log, "Searching for peers for subnet"; - "subnet_id" => *subnet_id, - "connected_peers_on_subnet" => peers_on_subnet, - "target_subnet_peers" => TARGET_SUBNET_PEERS, - "peers_to_find" => target_peers, - "attempt" => request.retries, - ); - - // start the query, and update the queries map if necessary - let subnet_predicate = subnet_predicate::(subnet_id, &self.log); - if let Some(query_id) = self.start_query(subnet_predicate, target_peers) { - request.query_id = Some(query_id); - } else { - // ENR is not present remove the query - return; - } - self.subnet_queries.insert(subnet_id, request); - } - - /* Internal Functions */ - - /// Run a standard query to search for more peers. - /// - /// This searches for the standard kademlia bucket size (16) peers. - fn find_peers(&mut self) { - debug!(self.log, "Searching for peers"); - self.start_query(|_| true, 16); - } - - /// Search for a specified number of new peers using the underlying discovery mechanism. - /// - /// This can optionally search for peers for a given predicate. Regardless of the predicate - /// given, this will only search for peers on the same enr_fork_id as specified in the local - /// ENR. - fn start_query(&mut self, enr_predicate: F, num_nodes: usize) -> Option - where - F: Fn(&Enr) -> bool + Send + 'static + Clone, - { - // pick a random NodeId - let random_node = NodeId::random(); - - let enr_fork_id = match self.local_enr().eth2() { - Ok(v) => v, - Err(e) => { - crit!(self.log, "Local ENR has no fork id"; "error" => e); - return None; - } - }; - // predicate for finding nodes with a matching fork - let eth2_fork_predicate = move |enr: &Enr| enr.eth2() == Ok(enr_fork_id.clone()); - let predicate = move |enr: &Enr| eth2_fork_predicate(enr) && enr_predicate(enr); - - // general predicate - Some( - self.discovery - .find_enr_predicate(random_node, predicate, num_nodes), - ) - } - - /// Peers that are found during discovery are optionally dialed. - // TODO: Shift to peer manager. As its own service, discovery should spit out discovered nodes - // and the peer manager should decide about who to connect to. 
- fn dial_discovered_peers(&mut self, peers: Vec, min_ttl: Option) { - for enr in peers { - // cache known peers - let peer_id = enr.peer_id(); - self.cached_enrs.put(enr.peer_id(), enr); - - // if we need more peers, attempt a connection - if self.network_globals.connected_or_dialing_peers() < self.max_peers - && !self - .network_globals - .peers - .read() - .is_connected_or_dialing(&peer_id) - && !self.banned_peers.contains(&peer_id) - { - debug!(self.log, "Connecting to discovered peer"; "peer_id"=> peer_id.to_string()); - // TODO: Update output - // This should be updated with the peer dialing. In fact created once the peer is - // dialed - if let Some(min_ttl) = min_ttl { - self.network_globals - .peers - .write() - .update_min_ttl(&peer_id, min_ttl); - } - self.events.push_back(NetworkBehaviourAction::DialPeer { - peer_id, - condition: DialPeerCondition::Disconnected, - }); - } - } - } -} - -// Build a dummy Network behaviour around the discv5 server -impl NetworkBehaviour for Discovery { - type ProtocolsHandler = DummyProtocolsHandler; - type OutEvent = Discv5Event; - - fn new_handler(&mut self) -> Self::ProtocolsHandler { - DummyProtocolsHandler::default() - } - - fn addresses_of_peer(&mut self, peer_id: &PeerId) -> Vec { - if let Some(enr) = self.enr_of_peer(peer_id) { - // ENR's may have multiple Multiaddrs. The multi-addr associated with the UDP - // port is removed, which is assumed to be associated with the discv5 protocol (and - // therefore irrelevant for other libp2p components). - let mut out_list = enr.multiaddr(); - out_list.retain(|addr| { - addr.iter() - .find(|v| match v { - Protocol::Udp(_) => true, - _ => false, - }) - .is_none() - }); - - out_list - } else { - // PeerId is not known - Vec::new() - } - } - - // ignore libp2p connections/streams - fn inject_connected(&mut self, _: &PeerId) {} - - // ignore libp2p connections/streams - fn inject_disconnected(&mut self, _: &PeerId) {} - - // no libp2p discv5 events - event originate from the session_service. - fn inject_event( - &mut self, - _: PeerId, - _: ConnectionId, - _event: ::OutEvent, - ) { - void::unreachable(_event) - } - - fn poll( - &mut self, - cx: &mut Context, - _: &mut impl PollParameters, - ) -> Poll< - NetworkBehaviourAction< - ::InEvent, - Self::OutEvent, - >, - > { - // search for peers if it is time - loop { - match self.peer_discovery_delay.poll_unpin(cx) { - Poll::Ready(_) => { - if self.network_globals.connected_peers() < self.max_peers { - self.find_peers(); - } - // Set to maximum, and update to earlier, once we get our results back. - self.peer_discovery_delay.reset( - tokio::time::Instant::now() - + Duration::from_secs(MAX_TIME_BETWEEN_PEER_SEARCHES), - ); - } - Poll::Pending => break, - } - } - - // Poll discovery - loop { - match self.discovery.poll_next_unpin(cx) { - Poll::Ready(Some(event)) => { - match event { - Discv5Event::Discovered(_enr) => { - // peers that get discovered during a query but are not contactable or - // don't match a predicate can end up here. 
For debugging purposes we - // log these to see if we are unnecessarily dropping discovered peers - /* - if enr.eth2() == self.local_enr().eth2() { - trace!(self.log, "Peer found in process of query"; "peer_id" => format!("{}", enr.peer_id()), "tcp_socket" => enr.tcp_socket()); - } else { - // this is temporary warning for debugging the DHT - warn!(self.log, "Found peer during discovery not on correct fork"; "peer_id" => format!("{}", enr.peer_id()), "tcp_socket" => enr.tcp_socket()); - } - */ - } - Discv5Event::SocketUpdated(socket) => { - info!(self.log, "Address updated"; "ip" => format!("{}",socket.ip()), "udp_port" => format!("{}", socket.port())); - metrics::inc_counter(&metrics::ADDRESS_UPDATE_COUNT); - let mut address = Multiaddr::from(socket.ip()); - address.push(Protocol::Tcp(self.tcp_port)); - let enr = self.discovery.local_enr(); - enr::save_enr_to_disk(Path::new(&self.enr_dir), enr, &self.log); - - return Poll::Ready(NetworkBehaviourAction::ReportObservedAddr { - address, - }); - } - Discv5Event::FindNodeResult { - closer_peers, - query_id, - .. - } => { - debug!(self.log, "Discovery query completed"; "peers_found" => closer_peers.len()); - // update the time to the next query - if self.past_discovery_delay < MAX_TIME_BETWEEN_PEER_SEARCHES - && self.network_globals.connected_or_dialing_peers() - > MINIMUM_PEERS_BEFORE_DELAY_INCREASE - { - self.past_discovery_delay *= 2; - } - let delay = std::cmp::max( - self.past_discovery_delay, - MAX_TIME_BETWEEN_PEER_SEARCHES, - ); - self.peer_discovery_delay - .reset(tokio::time::Instant::now() + Duration::from_secs(delay)); - - // if this is a subnet query, run it to completion - if let Some((subnet_id, min_ttl)) = self - .subnet_queries - .iter() - .find(|(_, request)| request.query_id == Some(query_id)) - .map(|(subnet_id, request)| { - (subnet_id.clone(), request.min_ttl.clone()) - }) - { - debug!(self.log, "Peer subnet discovery request completed"; "peers_found" => closer_peers.len(), "subnet_id" => *subnet_id); - self.dial_discovered_peers(closer_peers, min_ttl); - self.run_subnet_query(subnet_id); - } else { - if closer_peers.is_empty() { - debug!(self.log, "Peer Discovery request yielded no results."); - } else { - self.dial_discovered_peers(closer_peers, None); - } - } - } - _ => {} - } - } - // discv5 does not output any other NetworkBehaviourAction - Poll::Ready(_) => {} - Poll::Pending => break, - } - } - - // process any queued events - if let Some(event) = self.events.pop_front() { - return Poll::Ready(event); - } - - Poll::Pending - } -} diff --git a/beacon_node/eth2-libp2p/src/metrics.rs b/beacon_node/eth2-libp2p/src/metrics.rs deleted file mode 100644 index b678ef6b4..000000000 --- a/beacon_node/eth2-libp2p/src/metrics.rs +++ /dev/null @@ -1,20 +0,0 @@ -pub use lighthouse_metrics::*; - -lazy_static! 
{ - pub static ref ADDRESS_UPDATE_COUNT: Result = try_create_int_counter( - "libp2p_address_update_total", - "Count of libp2p socked updated events (when our view of our IP address has changed)" - ); - pub static ref PEERS_CONNECTED: Result = try_create_int_gauge( - "libp2p_peer_connected_peers_total", - "Count of libp2p peers currently connected" - ); - pub static ref PEER_CONNECT_EVENT_COUNT: Result = try_create_int_counter( - "libp2p_peer_connect_event_total", - "Count of libp2p peer connect events (not the current number of connected peers)" - ); - pub static ref PEER_DISCONNECT_EVENT_COUNT: Result = try_create_int_counter( - "libp2p_peer_disconnect_event_total", - "Count of libp2p peer disconnect events" - ); -} diff --git a/beacon_node/eth2-libp2p/Cargo.toml b/beacon_node/eth2_libp2p/Cargo.toml similarity index 91% rename from beacon_node/eth2-libp2p/Cargo.toml rename to beacon_node/eth2_libp2p/Cargo.toml index 42c41e74e..2b8949528 100644 --- a/beacon_node/eth2-libp2p/Cargo.toml +++ b/beacon_node/eth2_libp2p/Cargo.toml @@ -1,5 +1,5 @@ [package] -name = "eth2-libp2p" +name = "eth2_libp2p" version = "0.1.2" authors = ["Age Manning "] edition = "2018" @@ -32,8 +32,7 @@ snap = "1.0.0" void = "1.0.2" tokio-io-timeout = "0.4.0" tokio-util = { version = "0.3.1", features = ["codec", "compat"] } -# Patched for quick updates -discv5 = { git = "https://github.com/sigp/discv5", rev = "7b3bd40591b62b8c002ffdb85de008aa9f82e2e5" } +discv5 = { version = "0.1.0-alpha.5", features = ["libp2p"] } tiny-keccak = "2.0.2" environment = { path = "../../lighthouse/environment" } libp2p-tcp = { version = "0.19.1", default-features = false, features = ["tokio"] } diff --git a/beacon_node/eth2-libp2p/src/behaviour/handler/delegate.rs b/beacon_node/eth2_libp2p/src/behaviour/handler/delegate.rs similarity index 72% rename from beacon_node/eth2-libp2p/src/behaviour/handler/delegate.rs rename to beacon_node/eth2_libp2p/src/behaviour/handler/delegate.rs index 8fb31feef..b20fe5170 100644 --- a/beacon_node/eth2-libp2p/src/behaviour/handler/delegate.rs +++ b/beacon_node/eth2_libp2p/src/behaviour/handler/delegate.rs @@ -1,4 +1,3 @@ -use crate::discovery::Discovery; use crate::rpc::*; use libp2p::{ core::either::{EitherError, EitherOutput}, @@ -19,7 +18,6 @@ use types::EthSpec; type GossipHandler = ::ProtocolsHandler; type RPCHandler = as NetworkBehaviour>::ProtocolsHandler; type IdentifyHandler = ::ProtocolsHandler; -type DiscoveryHandler = as NetworkBehaviour>::ProtocolsHandler; /// Handler that combines Lighthouse's Behaviours' handlers in a delegating manner. pub(super) struct DelegatingHandler { @@ -29,22 +27,14 @@ pub(super) struct DelegatingHandler { rpc_handler: RPCHandler, /// Handler for the Identify protocol. identify_handler: IdentifyHandler, - /// Handler for the Discovery protocol. - discovery_handler: DiscoveryHandler, } impl DelegatingHandler { - pub fn new( - gossipsub: &mut Gossipsub, - rpc: &mut RPC, - identify: &mut Identify, - discovery: &mut Discovery, - ) -> Self { + pub fn new(gossipsub: &mut Gossipsub, rpc: &mut RPC, identify: &mut Identify) -> Self { DelegatingHandler { gossip_handler: gossipsub.new_handler(), rpc_handler: rpc.new_handler(), identify_handler: identify.new_handler(), - discovery_handler: discovery.new_handler(), } } @@ -73,7 +63,6 @@ pub enum DelegateIn { Gossipsub(::InEvent), RPC( as ProtocolsHandler>::InEvent), Identify(::InEvent), - Discovery( as ProtocolsHandler>::InEvent), } /// Wrapper around the `ProtocolsHandler::OutEvent` types of the handlers. 
@@ -82,7 +71,6 @@ pub enum DelegateOut { Gossipsub(::OutEvent), RPC( as ProtocolsHandler>::OutEvent), Identify(::OutEvent), - Discovery( as ProtocolsHandler>::OutEvent), } /// Wrapper around the `ProtocolsHandler::Error` types of the handlers. @@ -92,7 +80,6 @@ pub enum DelegateError { Gossipsub(::Error), RPC( as ProtocolsHandler>::Error), Identify(::Error), - Discovery( as ProtocolsHandler>::Error), } impl std::error::Error for DelegateError {} @@ -106,7 +93,6 @@ impl std::fmt::Display for DelegateError { DelegateError::Gossipsub(err) => err.fmt(formater), DelegateError::RPC(err) => err.fmt(formater), DelegateError::Identify(err) => err.fmt(formater), - DelegateError::Discovery(err) => err.fmt(formater), } } } @@ -115,10 +101,7 @@ pub type DelegateInProto = SelectUpgrade< ::InboundProtocol, SelectUpgrade< as ProtocolsHandler>::InboundProtocol, - SelectUpgrade< - ::InboundProtocol, - as ProtocolsHandler>::InboundProtocol, - >, + ::InboundProtocol, >, >; @@ -126,10 +109,7 @@ pub type DelegateOutProto = EitherUpgrade< ::OutboundProtocol, EitherUpgrade< as ProtocolsHandler>::OutboundProtocol, - EitherUpgrade< - ::OutboundProtocol, - as ProtocolsHandler>::OutboundProtocol, - >, + ::OutboundProtocol, >, >; @@ -138,10 +118,7 @@ pub type DelegateOutInfo = EitherOutput< ::OutboundOpenInfo, EitherOutput< as ProtocolsHandler>::OutboundOpenInfo, - EitherOutput< - ::OutboundOpenInfo, - as ProtocolsHandler>::OutboundOpenInfo, - >, + ::OutboundOpenInfo, >, >; @@ -157,24 +134,16 @@ impl ProtocolsHandler for DelegatingHandler { let gossip_proto = self.gossip_handler.listen_protocol(); let rpc_proto = self.rpc_handler.listen_protocol(); let identify_proto = self.identify_handler.listen_protocol(); - let discovery_proto = self.discovery_handler.listen_protocol(); let timeout = gossip_proto .timeout() .max(rpc_proto.timeout()) .max(identify_proto.timeout()) - .max(discovery_proto.timeout()) .clone(); let select = SelectUpgrade::new( gossip_proto.into_upgrade().1, - SelectUpgrade::new( - rpc_proto.into_upgrade().1, - SelectUpgrade::new( - identify_proto.into_upgrade().1, - discovery_proto.into_upgrade().1, - ), - ), + SelectUpgrade::new(rpc_proto.into_upgrade().1, identify_proto.into_upgrade().1), ); SubstreamProtocol::new(select).with_timeout(timeout) @@ -192,13 +161,9 @@ impl ProtocolsHandler for DelegatingHandler { self.rpc_handler.inject_fully_negotiated_inbound(out) } // Identify - EitherOutput::Second(EitherOutput::Second(EitherOutput::First(out))) => { + EitherOutput::Second(EitherOutput::Second(out)) => { self.identify_handler.inject_fully_negotiated_inbound(out) } - // Discovery - EitherOutput::Second(EitherOutput::Second(EitherOutput::Second(out))) => { - self.discovery_handler.inject_fully_negotiated_inbound(out) - } } } @@ -221,18 +186,11 @@ impl ProtocolsHandler for DelegatingHandler { .inject_fully_negotiated_outbound(protocol, info), // Identify ( - EitherOutput::Second(EitherOutput::Second(EitherOutput::First(protocol))), - EitherOutput::Second(EitherOutput::Second(EitherOutput::First(info))), + EitherOutput::Second(EitherOutput::Second(protocol)), + EitherOutput::Second(EitherOutput::Second(info)), ) => self .identify_handler .inject_fully_negotiated_outbound(protocol, info), - // Discovery - ( - EitherOutput::Second(EitherOutput::Second(EitherOutput::Second(protocol))), - EitherOutput::Second(EitherOutput::Second(EitherOutput::Second(info))), - ) => self - .discovery_handler - .inject_fully_negotiated_outbound(protocol, info), // Reaching here means we got a protocol and info for different 
behaviours _ => unreachable!("output and protocol don't match"), } @@ -243,7 +201,6 @@ impl ProtocolsHandler for DelegatingHandler { DelegateIn::Gossipsub(ev) => self.gossip_handler.inject_event(ev), DelegateIn::RPC(ev) => self.rpc_handler.inject_event(ev), DelegateIn::Identify(ev) => self.identify_handler.inject_event(ev), - DelegateIn::Discovery(ev) => self.discovery_handler.inject_event(ev), } } @@ -305,7 +262,7 @@ impl ProtocolsHandler for DelegatingHandler { } }, // Identify - EitherOutput::Second(EitherOutput::Second(EitherOutput::First(info))) => match error { + EitherOutput::Second(EitherOutput::Second(info)) => match error { ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Select(err)) => { self.identify_handler.inject_dial_upgrade_error( info, @@ -319,7 +276,7 @@ impl ProtocolsHandler for DelegatingHandler { .identify_handler .inject_dial_upgrade_error(info, ProtocolsHandlerUpgrErr::Timeout), ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Apply(EitherError::B( - EitherError::B(EitherError::A(err)), + EitherError::B(err), ))) => self.identify_handler.inject_dial_upgrade_error( info, ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Apply(err)), @@ -328,30 +285,6 @@ impl ProtocolsHandler for DelegatingHandler { unreachable!("info and error don't match") } }, - // Discovery - EitherOutput::Second(EitherOutput::Second(EitherOutput::Second(info))) => match error { - ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Select(err)) => { - self.discovery_handler.inject_dial_upgrade_error( - info, - ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Select(err)), - ) - } - ProtocolsHandlerUpgrErr::Timer => self - .discovery_handler - .inject_dial_upgrade_error(info, ProtocolsHandlerUpgrErr::Timer), - ProtocolsHandlerUpgrErr::Timeout => self - .discovery_handler - .inject_dial_upgrade_error(info, ProtocolsHandlerUpgrErr::Timeout), - ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Apply(EitherError::B( - EitherError::B(EitherError::B(err)), - ))) => self.discovery_handler.inject_dial_upgrade_error( - info, - ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Apply(err)), - ), - ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Apply(_)) => { - unreachable!("info and error don't match") - } - }, } } @@ -360,7 +293,6 @@ impl ProtocolsHandler for DelegatingHandler { .connection_keep_alive() .max(self.rpc_handler.connection_keep_alive()) .max(self.identify_handler.connection_keep_alive()) - .max(self.discovery_handler.connection_keep_alive()) } fn poll( @@ -417,28 +349,8 @@ impl ProtocolsHandler for DelegatingHandler { } Poll::Ready(ProtocolsHandlerEvent::OutboundSubstreamRequest { protocol, info }) => { return Poll::Ready(ProtocolsHandlerEvent::OutboundSubstreamRequest { - protocol: protocol - .map_upgrade(|u| EitherUpgrade::B(EitherUpgrade::B(EitherUpgrade::A(u)))), - info: EitherOutput::Second(EitherOutput::Second(EitherOutput::First(info))), - }); - } - Poll::Pending => (), - }; - - match self.discovery_handler.poll(cx) { - Poll::Ready(ProtocolsHandlerEvent::Custom(event)) => { - return Poll::Ready(ProtocolsHandlerEvent::Custom(DelegateOut::Discovery(event))); - } - Poll::Ready(ProtocolsHandlerEvent::Close(event)) => { - return Poll::Ready(ProtocolsHandlerEvent::Close(DelegateError::Discovery( - event, - ))); - } - Poll::Ready(ProtocolsHandlerEvent::OutboundSubstreamRequest { protocol, info }) => { - return Poll::Ready(ProtocolsHandlerEvent::OutboundSubstreamRequest { - protocol: protocol - .map_upgrade(|u| EitherUpgrade::B(EitherUpgrade::B(EitherUpgrade::B(u)))), - info: 
EitherOutput::Second(EitherOutput::Second(EitherOutput::Second(info))), + protocol: protocol.map_upgrade(|u| EitherUpgrade::B(EitherUpgrade::B(u))), + info: EitherOutput::Second(EitherOutput::Second(info)), }); } Poll::Pending => (), diff --git a/beacon_node/eth2-libp2p/src/behaviour/handler/mod.rs b/beacon_node/eth2_libp2p/src/behaviour/handler/mod.rs similarity index 95% rename from beacon_node/eth2-libp2p/src/behaviour/handler/mod.rs rename to beacon_node/eth2_libp2p/src/behaviour/handler/mod.rs index c88813121..64d0bcaca 100644 --- a/beacon_node/eth2-libp2p/src/behaviour/handler/mod.rs +++ b/beacon_node/eth2_libp2p/src/behaviour/handler/mod.rs @@ -1,4 +1,3 @@ -use crate::discovery::Discovery; use crate::rpc::*; use delegate::DelegatingHandler; pub(super) use delegate::{ @@ -27,14 +26,9 @@ pub struct BehaviourHandler { } impl BehaviourHandler { - pub fn new( - gossipsub: &mut Gossipsub, - rpc: &mut RPC, - identify: &mut Identify, - discovery: &mut Discovery, - ) -> Self { + pub fn new(gossipsub: &mut Gossipsub, rpc: &mut RPC, identify: &mut Identify) -> Self { BehaviourHandler { - delegate: DelegatingHandler::new(gossipsub, rpc, identify, discovery), + delegate: DelegatingHandler::new(gossipsub, rpc, identify), shutting_down: false, } } diff --git a/beacon_node/eth2-libp2p/src/behaviour/mod.rs b/beacon_node/eth2_libp2p/src/behaviour/mod.rs similarity index 94% rename from beacon_node/eth2-libp2p/src/behaviour/mod.rs rename to beacon_node/eth2_libp2p/src/behaviour/mod.rs index d9534e2cc..1aca8a5dd 100644 --- a/beacon_node/eth2-libp2p/src/behaviour/mod.rs +++ b/beacon_node/eth2_libp2p/src/behaviour/mod.rs @@ -1,9 +1,8 @@ -use crate::discovery::{enr::Eth2Enr, Discovery}; use crate::peer_manager::{PeerManager, PeerManagerEvent}; use crate::rpc::*; use crate::types::{GossipEncoding, GossipKind, GossipTopic}; +use crate::Eth2Enr; use crate::{error, Enr, NetworkConfig, NetworkGlobals, PubsubMessage, TopicHash}; -use discv5::Discv5Event; use futures::prelude::*; use handler::{BehaviourHandler, BehaviourHandlerIn, BehaviourHandlerOut, DelegateIn, DelegateOut}; use libp2p::{ @@ -46,8 +45,6 @@ pub struct Behaviour { // TODO: Using id for initial interop. This will be removed by mainnet. /// Provides IP addresses and peer information. identify: Identify, - /// Discovery behaviour. - discovery: Discovery, /// The peer manager that keeps track of peer's reputation and status. peer_manager: PeerManager, /// The events generated by this behaviour to be consumed in the swarm poll. @@ -76,7 +73,6 @@ macro_rules! 
delegate_to_behaviours { $self.gossipsub.$fn($($arg),*); $self.eth2_rpc.$fn($($arg),*); $self.identify.$fn($($arg),*); - $self.discovery.$fn($($arg),*); }; } @@ -85,21 +81,11 @@ impl NetworkBehaviour for Behaviour { type OutEvent = BehaviourEvent; fn new_handler(&mut self) -> Self::ProtocolsHandler { - BehaviourHandler::new( - &mut self.gossipsub, - &mut self.eth2_rpc, - &mut self.identify, - &mut self.discovery, - ) + BehaviourHandler::new(&mut self.gossipsub, &mut self.eth2_rpc, &mut self.identify) } fn addresses_of_peer(&mut self, peer_id: &PeerId) -> Vec { - let mut out = Vec::new(); - out.extend(self.gossipsub.addresses_of_peer(peer_id)); - out.extend(self.eth2_rpc.addresses_of_peer(peer_id)); - out.extend(self.identify.addresses_of_peer(peer_id)); - out.extend(self.discovery.addresses_of_peer(peer_id)); - out + self.peer_manager.addresses_of_peer(peer_id) } fn inject_connected(&mut self, peer_id: &PeerId) { @@ -178,7 +164,6 @@ impl NetworkBehaviour for Behaviour { DelegateOut::Gossipsub(ev) => self.gossipsub.inject_event(peer_id, conn_id, ev), DelegateOut::RPC(ev) => self.eth2_rpc.inject_event(peer_id, conn_id, ev), DelegateOut::Identify(ev) => self.identify.inject_event(peer_id, conn_id, ev), - DelegateOut::Discovery(ev) => self.discovery.inject_event(peer_id, conn_id, ev), }, /* Custom events sent BY the handler */ BehaviourHandlerOut::Custom => { @@ -240,7 +225,6 @@ impl NetworkBehaviour for Behaviour { poll_behaviour!(gossipsub, on_gossip_event, DelegateIn::Gossipsub); poll_behaviour!(eth2_rpc, on_rpc_event, DelegateIn::RPC); poll_behaviour!(identify, on_identify_event, DelegateIn::Identify); - poll_behaviour!(discovery, on_discovery_event, DelegateIn::Discovery); self.custom_poll(cx) } @@ -264,14 +248,12 @@ impl Behaviour { ); let enr_fork_id = network_globals - .local_enr - .read() + .local_enr() .eth2() .expect("Local ENR must have a fork id"); let attnets = network_globals - .local_enr - .read() + .local_enr() .bitfield::() .expect("Local ENR must have subnet bitfield"); @@ -283,9 +265,8 @@ impl Behaviour { Ok(Behaviour { eth2_rpc: RPC::new(log.clone()), gossipsub: Gossipsub::new(local_peer_id, net_conf.gs_config.clone()), - discovery: Discovery::new(local_key, net_conf, network_globals.clone(), log)?, identify, - peer_manager: PeerManager::new(network_globals.clone(), log), + peer_manager: PeerManager::new(local_key, net_conf, network_globals.clone(), log)?, events: Vec::new(), peers_to_dc: Vec::new(), seen_gossip_messages: LruCache::new(100_000), @@ -296,9 +277,9 @@ impl Behaviour { }) } - /// Obtain a reference to the discovery protocol. - pub fn discovery(&self) -> &Discovery { - &self.discovery + /// Returns the local ENR of the node. + pub fn local_enr(&self) -> Enr { + self.network_globals.local_enr() } /// Obtain a reference to the gossipsub protocol. @@ -428,33 +409,35 @@ impl Behaviour { ) } - /* Discovery / Peer management functions */ + /* Peer management functions */ /// Notify discovery that the peer has been banned. - pub fn peer_banned(&mut self, peer_id: PeerId) { - self.discovery.peer_banned(peer_id); - } + // TODO: Remove this and integrate all disconnection/banning logic inside the peer manager. + pub fn peer_banned(&mut self, _peer_id: PeerId) {} /// Notify discovery that the peer has been unbanned. - pub fn peer_unbanned(&mut self, peer_id: &PeerId) { - self.discovery.peer_unbanned(peer_id); - } + // TODO: Remove this and integrate all disconnection/banning logic inside the peer manager. 
+ pub fn peer_unbanned(&mut self, _peer_id: &PeerId) {} /// Returns an iterator over all enr entries in the DHT. - pub fn enr_entries(&mut self) -> impl Iterator { - self.discovery.enr_entries() + pub fn enr_entries(&mut self) -> Vec { + self.peer_manager.discovery_mut().table_entries_enr() } /// Add an ENR to the routing table of the discovery mechanism. pub fn add_enr(&mut self, enr: Enr) { - self.discovery.add_enr(enr); + self.peer_manager.discovery_mut().add_enr(enr); } /// Updates a subnet value to the ENR bitfield. /// /// The `value` is `true` if a subnet is being added and false otherwise. pub fn update_enr_subnet(&mut self, subnet_id: SubnetId, value: bool) { - if let Err(e) = self.discovery.update_enr_bitfield(subnet_id, value) { + if let Err(e) = self + .peer_manager + .discovery_mut() + .update_enr_bitfield(subnet_id, value) + { crit!(self.log, "Could not update ENR bitfield"; "error" => e); } // update the local meta data which informs our peers of the update during PINGS @@ -464,12 +447,14 @@ impl Behaviour { /// Attempts to discover new peers for a given subnet. The `min_ttl` gives the time at which we /// would like to retain the peers for. pub fn discover_subnet_peers(&mut self, subnet_id: SubnetId, min_ttl: Option) { - self.discovery.discover_subnet_peers(subnet_id, min_ttl) + self.peer_manager.discover_subnet_peers(subnet_id, min_ttl) } /// Updates the local ENR's "eth2" field with the latest EnrForkId. pub fn update_fork_version(&mut self, enr_fork_id: EnrForkId) { - self.discovery.update_eth2_enr(enr_fork_id.clone()); + self.peer_manager + .discovery_mut() + .update_eth2_enr(enr_fork_id.clone()); // unsubscribe from all gossip topics and re-subscribe to their new fork counterparts let subscribed_topics = self @@ -497,11 +482,12 @@ impl Behaviour { /* Private internal functions */ - /// Updates the current meta data of the node. + /// Updates the current meta data of the node to match the local ENR. fn update_metadata(&mut self) { self.meta_data.seq_number += 1; self.meta_data.attnets = self - .discovery + .peer_manager + .discovery() .local_enr() .bitfield::() .expect("Local discovery must have bitfield"); @@ -764,6 +750,15 @@ impl Behaviour { loop { match self.peer_manager.poll_next_unpin(cx) { Poll::Ready(Some(event)) => match event { + PeerManagerEvent::Dial(peer_id) => { + return Poll::Ready(NBAction::DialPeer { + peer_id, + condition: libp2p::swarm::DialPeerCondition::Disconnected, + }); + } + PeerManagerEvent::SocketUpdated(address) => { + return Poll::Ready(NBAction::ReportObservedAddr { address }); + } PeerManagerEvent::Status(peer_id) => { // it's time to status. We don't keep a beacon chain reference here, so we inform // the network to send a status to this peer @@ -835,10 +830,6 @@ impl Behaviour { IdentifyEvent::Error { .. } => {} } } - - fn on_discovery_event(&mut self, _event: Discv5Event) { - // discv5 has no events to inject - } } /* Public API types */ diff --git a/beacon_node/eth2-libp2p/src/config.rs b/beacon_node/eth2_libp2p/src/config.rs similarity index 86% rename from beacon_node/eth2-libp2p/src/config.rs rename to beacon_node/eth2_libp2p/src/config.rs index 369d0477e..6facb7750 100644 --- a/beacon_node/eth2-libp2p/src/config.rs +++ b/beacon_node/eth2_libp2p/src/config.rs @@ -39,12 +39,6 @@ pub struct Config { /// Target number of connected peers. pub max_peers: usize, - /// A secp256k1 secret key, as bytes in ASCII-encoded hex. - /// - /// With or without `0x` prefix. 
- #[serde(skip)] - pub secret_key_hex: Option, - /// Gossipsub configuration parameters. #[serde(skip)] pub gs_config: GossipsubConfig, @@ -64,11 +58,6 @@ pub struct Config { /// List of extra topics to initially subscribe to as strings. pub topics: Vec, - - /// Introduces randomization in network propagation of messages. This should only be set for - /// testing purposes and will likely be removed in future versions. - // TODO: Remove this functionality for mainnet - pub propagation_percentage: Option, } impl Default for Config { @@ -109,14 +98,15 @@ impl Default for Config { // discv5 configuration let discv5_config = Discv5ConfigBuilder::new() + .enable_packet_filter() + .session_cache_capacity(1000) .request_timeout(Duration::from_secs(4)) .request_retries(2) - .enr_update(true) // update IP based on PONG responses .enr_peer_update_min(2) // prevents NAT's should be raised for mainnet .query_parallelism(5) .query_timeout(Duration::from_secs(60)) .query_peer_timeout(Duration::from_secs(2)) - .ip_limit(false) // limits /24 IP's in buckets. Enable for mainnet + .ip_limit() // limits /24 IP's in buckets. .ping_interval(Duration::from_secs(300)) .build(); @@ -130,14 +120,12 @@ impl Default for Config { enr_udp_port: None, enr_tcp_port: None, max_peers: 50, - secret_key_hex: None, gs_config, discv5_config, boot_nodes: vec![], libp2p_nodes: vec![], client_version: version::version(), topics, - propagation_percentage: None, } } } diff --git a/beacon_node/eth2-libp2p/src/discovery/enr.rs b/beacon_node/eth2_libp2p/src/discovery/enr.rs similarity index 98% rename from beacon_node/eth2-libp2p/src/discovery/enr.rs rename to beacon_node/eth2_libp2p/src/discovery/enr.rs index 3f3cbe12f..798fbee48 100644 --- a/beacon_node/eth2-libp2p/src/discovery/enr.rs +++ b/beacon_node/eth2_libp2p/src/discovery/enr.rs @@ -1,12 +1,12 @@ //! Helper functions and an extension trait for Ethereum 2 ENRs. pub use discv5::enr::{self, CombinedKey, EnrBuilder}; -pub use libp2p::core::identity::Keypair; +use super::enr_ext::CombinedKeyExt; use super::ENR_FILENAME; use crate::types::{Enr, EnrBitfield}; -use crate::CombinedKeyExt; use crate::NetworkConfig; +use libp2p::core::identity::Keypair; use slog::{debug, warn}; use ssz::{Decode, Encode}; use ssz_types::BitVector; diff --git a/beacon_node/eth2-libp2p/src/discovery/enr_ext.rs b/beacon_node/eth2_libp2p/src/discovery/enr_ext.rs similarity index 73% rename from beacon_node/eth2-libp2p/src/discovery/enr_ext.rs rename to beacon_node/eth2_libp2p/src/discovery/enr_ext.rs index ba0af4d1e..3c6f5c781 100644 --- a/beacon_node/eth2-libp2p/src/discovery/enr_ext.rs +++ b/beacon_node/eth2_libp2p/src/discovery/enr_ext.rs @@ -12,6 +12,12 @@ pub trait EnrExt { /// Returns a list of multiaddrs if the ENR has an `ip` and either a `tcp` or `udp` key **or** an `ip6` and either a `tcp6` or `udp6`. /// The vector remains empty if these fields are not defined. fn multiaddr(&self) -> Vec; + + /// Returns the multiaddr with the `PeerId` prepended. + fn multiaddr_p2p(&self) -> Vec; + + /// Returns any multiaddrs that contain the TCP protocol. + fn multiaddr_tcp(&self) -> Vec; } /// Extend ENR CombinedPublicKey for libp2p types. @@ -34,8 +40,6 @@ impl EnrExt for Enr { /// Returns a list of multiaddrs if the ENR has an `ip` and either a `tcp` or `udp` key **or** an `ip6` and either a `tcp6` or `udp6`. /// The vector remains empty if these fields are not defined. - /// - /// Note: Only available with the `libp2p` feature flag. 
fn multiaddr(&self) -> Vec { let mut multiaddrs: Vec = Vec::new(); if let Some(ip) = self.ip() { @@ -66,6 +70,67 @@ impl EnrExt for Enr { } multiaddrs } + + /// Returns a list of multiaddrs if the ENR has an `ip` and either a `tcp` or `udp` key **or** an `ip6` and either a `tcp6` or `udp6`. + /// The vector remains empty if these fields are not defined. + /// + /// This also prepends the `PeerId` into each multiaddr with the `P2p` protocol. + fn multiaddr_p2p(&self) -> Vec { + let peer_id = self.peer_id(); + let mut multiaddrs: Vec = Vec::new(); + if let Some(ip) = self.ip() { + if let Some(udp) = self.udp() { + let mut multiaddr: Multiaddr = ip.into(); + multiaddr.push(Protocol::Udp(udp)); + multiaddr.push(Protocol::P2p(peer_id.clone().into())); + multiaddrs.push(multiaddr); + } + + if let Some(tcp) = self.tcp() { + let mut multiaddr: Multiaddr = ip.into(); + multiaddr.push(Protocol::Tcp(tcp)); + multiaddr.push(Protocol::P2p(peer_id.clone().into())); + multiaddrs.push(multiaddr); + } + } + if let Some(ip6) = self.ip6() { + if let Some(udp6) = self.udp6() { + let mut multiaddr: Multiaddr = ip6.into(); + multiaddr.push(Protocol::Udp(udp6)); + multiaddr.push(Protocol::P2p(peer_id.clone().into())); + multiaddrs.push(multiaddr); + } + + if let Some(tcp6) = self.tcp6() { + let mut multiaddr: Multiaddr = ip6.into(); + multiaddr.push(Protocol::Tcp(tcp6)); + multiaddr.push(Protocol::P2p(peer_id.into())); + multiaddrs.push(multiaddr); + } + } + multiaddrs + } + + /// Returns a list of multiaddrs if the ENR has an `ip` and either a `tcp` or `udp` key **or** an `ip6` and either a `tcp6` or `udp6`. + /// The vector remains empty if these fields are not defined. + fn multiaddr_tcp(&self) -> Vec { + let mut multiaddrs: Vec = Vec::new(); + if let Some(ip) = self.ip() { + if let Some(tcp) = self.tcp() { + let mut multiaddr: Multiaddr = ip.into(); + multiaddr.push(Protocol::Tcp(tcp)); + multiaddrs.push(multiaddr); + } + } + if let Some(ip6) = self.ip6() { + if let Some(tcp6) = self.tcp6() { + let mut multiaddr: Multiaddr = ip6.into(); + multiaddr.push(Protocol::Tcp(tcp6)); + multiaddrs.push(multiaddr); + } + } + multiaddrs + } } impl CombinedKeyPublicExt for CombinedPublicKey { diff --git a/beacon_node/eth2_libp2p/src/discovery/mod.rs b/beacon_node/eth2_libp2p/src/discovery/mod.rs new file mode 100644 index 000000000..ca6cd9fe8 --- /dev/null +++ b/beacon_node/eth2_libp2p/src/discovery/mod.rs @@ -0,0 +1,637 @@ +///! This manages the discovery and management of peers. +pub(crate) mod enr; +pub mod enr_ext; + +// Allow external use of the lighthouse ENR builder +pub use enr::{build_enr, CombinedKey, Eth2Enr}; +pub use enr_ext::{CombinedKeyExt, EnrExt}; +pub use libp2p::core::identity::Keypair; + +use crate::metrics; +use crate::{error, Enr, NetworkConfig, NetworkGlobals}; +use discv5::{enr::NodeId, Discv5, Discv5Event}; +use enr::{BITFIELD_ENR_KEY, ETH2_ENR_KEY}; +use futures::prelude::*; +use futures::stream::FuturesUnordered; +use libp2p::core::PeerId; +use lru::LruCache; +use slog::{crit, debug, info, trace, warn}; +use ssz::{Decode, Encode}; +use ssz_types::BitVector; +use std::{ + collections::VecDeque, + net::SocketAddr, + path::Path, + pin::Pin, + sync::Arc, + task::{Context, Poll}, + time::Instant, +}; +use tokio::sync::mpsc; +use types::{EnrForkId, EthSpec, SubnetId}; + +mod subnet_predicate; +use subnet_predicate::subnet_predicate; + +/// Local ENR storage filename. +pub const ENR_FILENAME: &str = "enr.dat"; +/// Target number of peers we'd like to have connected to a given long-lived subnet. 
+const TARGET_SUBNET_PEERS: usize = 3; +/// Number of times to attempt a discovery request +const MAX_DISCOVERY_RETRY: usize = 3; +/// The maximum number of concurrent discovery queries. +const MAX_CONCURRENT_QUERIES: usize = 1; +/// The number of closest peers to search for when doing a regular peer search. +/// +/// We could reduce this constant to speed up queries, however at the cost of security. It will +/// make it easier for peers to eclipse this node. Kademlia suggests a value of 16. +const FIND_NODE_QUERY_CLOSEST_PEERS: usize = 16; + +/// The events emitted by polling discovery. +pub enum DiscoveryEvent { + /// A query has completed. The first parameter is the `min_ttl` of the peers if it is specified + /// and the second parameter is the discovered peers. + QueryResult(Option, Box>), + /// This indicates that our local UDP socketaddr has been updated and we should inform libp2p. + SocketUpdated(SocketAddr), +} + +#[derive(Debug, Clone, PartialEq)] +enum QueryType { + /// We are searching for subnet peers. + Subnet { + subnet_id: SubnetId, + min_ttl: Option, + retries: usize, + }, + /// We are searching for more peers without ENR or time constraints. + FindPeers, +} + +impl QueryType { + /// Returns true if this query has expired. + pub fn expired(&self) -> bool { + match self { + Self::FindPeers => false, + Self::Subnet { min_ttl, .. } => { + if let Some(ttl) = min_ttl { + ttl > &Instant::now() + } else { + true + } + } + } + } + + /// Returns the min_ttl of the query if one exists + /// + /// This is required for returning to the peer manager. The peer manager will update newly + /// connected peers with this `min_ttl` + pub fn min_ttl(&self) -> Option { + match self { + Self::FindPeers => None, + Self::Subnet { min_ttl, .. } => min_ttl.clone(), + } + } +} + +/// The result of a query. +struct QueryResult(QueryType, Result, discv5::QueryError>); + +// Awaiting the event stream future +enum EventStream { + /// Awaiting an event stream to be generated. This is required due to the poll nature of + /// `Discovery` + Awaiting( + Pin< + Box< + dyn Future, discv5::Discv5Error>> + + Send, + >, + >, + ), + /// The future has completed. + Present(mpsc::Receiver), + // The future has failed, there are no events from discv5. + Failed, +} + +pub struct Discovery { + /// A collection of seen live ENRs for quick lookup and to map peer-id's to ENRs. + cached_enrs: LruCache, + + /// The directory where the ENR is stored. + enr_dir: String, + + /// The handle for the underlying discv5 Server. + /// + /// This is behind a Reference counter to allow for futures to be spawned and polled with a + /// static lifetime. + discv5: Discv5, + + /// A collection of network constants that can be read from other threads. + network_globals: Arc>, + + /// Indicates if we are actively searching for peers. We only allow a single FindPeers query at + /// a time, regardless of the query concurrency. + find_peer_active: bool, + + /// A queue of discovery queries to be processed. + queued_queries: VecDeque, + + /// Active discovery queries. + active_queries: FuturesUnordered + Send>>>, + + /// The discv5 event stream. + event_stream: EventStream, + + /// Logger for the discovery behaviour. + log: slog::Logger, +} + +impl Discovery { + /// NOTE: Creating discovery requires running within a tokio execution environment.
+ pub fn new( + local_key: &Keypair, + config: &NetworkConfig, + network_globals: Arc>, + log: &slog::Logger, + ) -> error::Result { + let log = log.clone(); + + let enr_dir = match config.network_dir.to_str() { + Some(path) => String::from(path), + None => String::from(""), + }; + + let local_enr = network_globals.local_enr.read().clone(); + + info!(log, "ENR Initialised"; "enr" => local_enr.to_base64(), "seq" => local_enr.seq(), "id"=> format!("{}",local_enr.node_id()), "ip" => format!("{:?}", local_enr.ip()), "udp"=> format!("{:?}", local_enr.udp()), "tcp" => format!("{:?}", local_enr.tcp())); + + let listen_socket = SocketAddr::new(config.listen_address, config.discovery_port); + + // convert the keypair into an ENR key + let enr_key: CombinedKey = CombinedKey::from_libp2p(&local_key)?; + + let mut discv5 = Discv5::new(local_enr, enr_key, config.discv5_config.clone()) + .map_err(|e| format!("Discv5 service failed. Error: {:?}", e))?; + + // Add bootnodes to routing table + for bootnode_enr in config.boot_nodes.clone() { + debug!( + log, + "Adding node to routing table"; + "node_id" => format!("{}", bootnode_enr.node_id()), + "peer_id" => format!("{}", bootnode_enr.peer_id()), + "ip" => format!("{:?}", bootnode_enr.ip()), + "udp" => format!("{:?}", bootnode_enr.udp()), + "tcp" => format!("{:?}", bootnode_enr.tcp()) + ); + let _ = discv5.add_enr(bootnode_enr).map_err(|e| { + debug!( + log, + "Could not add peer to the local routing table"; + "error" => format!("{}", e) + ) + }); + } + + // Start the discv5 service. + discv5.start(listen_socket); + debug!(log, "Discovery service started"); + + // Obtain the event stream + let event_stream = EventStream::Awaiting(Box::pin(discv5.event_stream())); + + Ok(Self { + cached_enrs: LruCache::new(50), + network_globals, + find_peer_active: false, + queued_queries: VecDeque::with_capacity(10), + active_queries: FuturesUnordered::new(), + discv5, + event_stream, + log, + enr_dir, + }) + } + + /// Return the nodes local ENR. + pub fn local_enr(&self) -> Enr { + self.discv5.local_enr() + } + + /// This adds a new `FindPeers` query to the queue if one doesn't already exist. + pub fn discover_peers(&mut self) { + // If we are in the process of a query, don't bother queuing a new one. + if self.find_peer_active { + return; + } + // If there is not already a find peer's query queued, add one + let query = QueryType::FindPeers; + if !self.queued_queries.contains(&query) { + trace!(self.log, "Queuing a peer discovery request"); + self.queued_queries.push_back(query); + // update the metrics + metrics::set_gauge(&metrics::DISCOVERY_QUEUE, self.queued_queries.len() as i64); + } + } + + /// Processes a request to search for more peers on a subnet. + pub fn discover_subnet_peers(&mut self, subnet_id: SubnetId, min_ttl: Option) { + self.add_subnet_query(subnet_id, min_ttl, 0); + } + + /// Adds a subnet query if one doesn't exist. If a subnet query already exists, this + /// updates the min_ttl field. + fn add_subnet_query(&mut self, subnet_id: SubnetId, min_ttl: Option, retries: usize) { + // remove the entry and complete the query if greater than the maximum search count + if retries >= MAX_DISCOVERY_RETRY { + debug!( + self.log, + "Subnet peer discovery did not find sufficient peers. 
Reached max retry limit" + ); + return; + } + + // Search through any queued requests and update the timeout if a query for this subnet + // already exists + let mut found = false; + for query in self.queued_queries.iter_mut() { + if let QueryType::Subnet { + subnet_id: ref mut q_subnet_id, + min_ttl: ref mut q_min_ttl, + retries: ref mut q_retries, + } = query + { + if *q_subnet_id == subnet_id { + if *q_min_ttl < min_ttl { + *q_min_ttl = min_ttl; + } + // update the number of retries + *q_retries = retries; + // mimic an `Iter::Find()` and short-circuit the loop + found = true; + break; + } + } + } + if !found { + // Set up the query and add it to the queue + let query = QueryType::Subnet { + subnet_id, + min_ttl, + retries, + }; + // update the metrics and insert into the queue. + metrics::set_gauge(&metrics::DISCOVERY_QUEUE, self.queued_queries.len() as i64); + self.queued_queries.push_back(query); + } + } + + /// Add an ENR to the routing table of the discovery mechanism. + pub fn add_enr(&mut self, enr: Enr) { + // add the enr to seen caches + self.cached_enrs.put(enr.peer_id(), enr.clone()); + + if let Err(e) = self.discv5.add_enr(enr) { + warn!( + self.log, + "Could not add peer to the local routing table"; + "error" => format!("{}", e) + ) + } + } + + /// Returns an iterator over all enr entries in the DHT. + pub fn table_entries_enr(&mut self) -> Vec { + self.discv5.table_entries_enr() + } + + /// Returns the ENR of a known peer if it exists. + pub fn enr_of_peer(&mut self, peer_id: &PeerId) -> Option { + // first search the local cache + if let Some(enr) = self.cached_enrs.get(peer_id) { + return Some(enr.clone()); + } + // not in the local cache, look in the routing table + if let Ok(node_id) = enr_ext::peer_id_to_node_id(peer_id) { + self.discv5.find_enr(&node_id) + } else { + None + } + } + + /// Adds/Removes a subnet from the ENR Bitfield + pub fn update_enr_bitfield(&mut self, subnet_id: SubnetId, value: bool) -> Result<(), String> { + let id = *subnet_id as usize; + + let local_enr = self.discv5.local_enr(); + let mut current_bitfield = local_enr.bitfield::()?; + + if id >= current_bitfield.len() { + return Err(format!( + "Subnet id: {} is outside the ENR bitfield length: {}", + id, + current_bitfield.len() + )); + } + + if current_bitfield + .get(id) + .map_err(|_| String::from("Subnet ID out of bounds"))? + == value + { + return Err(format!( + "Subnet id: {} already in the local ENR already has value: {}", + id, value + )); + } + + // set the subnet bitfield in the ENR + current_bitfield + .set(id, value) + .map_err(|_| String::from("Subnet ID out of bounds, could not set subnet ID"))?; + + // insert the bitfield into the ENR record + let _ = self + .discv5 + .enr_insert(BITFIELD_ENR_KEY, current_bitfield.as_ssz_bytes()); + + // replace the global version + *self.network_globals.local_enr.write() = self.discv5.local_enr().clone(); + Ok(()) + } + + /// Updates the `eth2` field of our local ENR. 
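// A minimal, self-contained illustration of the bitfield round-trip performed above,
// assuming the `ssz_types`/`eth2_ssz` APIs used by this crate and the 64-subnet mainnet
// configuration (the encoded bytes are what is stored under `BITFIELD_ENR_KEY`, i.e. the
// advertised attestation-subnet bitfield):
fn _bitfield_roundtrip_sketch() {
    use ssz::{Decode, Encode};
    use ssz_types::{typenum::U64, BitVector};

    let mut bits: BitVector<U64> = BitVector::new();
    // Advertise subscription to subnet 3.
    bits.set(3, true).expect("subnet 3 is within the bitfield length");
    let encoded = bits.as_ssz_bytes();
    // A peer reading our ENR decodes the same bytes before applying the subnet predicate.
    let decoded = BitVector::<U64>::from_ssz_bytes(&encoded).expect("valid SSZ bitfield");
    assert!(decoded.get(3).expect("index in bounds"));
}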
+ pub fn update_eth2_enr(&mut self, enr_fork_id: EnrForkId) { + // to avoid having a reference to the spec constant, for the logging we assume + // FAR_FUTURE_EPOCH is u64::max_value() + let next_fork_epoch_log = if enr_fork_id.next_fork_epoch == u64::max_value() { + String::from("No other fork") + } else { + format!("{:?}", enr_fork_id.next_fork_epoch) + }; + + info!(self.log, "Updating the ENR fork version"; + "fork_digest" => format!("{:?}", enr_fork_id.fork_digest), + "next_fork_version" => format!("{:?}", enr_fork_id.next_fork_version), + "next_fork_epoch" => next_fork_epoch_log, + ); + + let _ = self + .discv5 + .enr_insert(ETH2_ENR_KEY.into(), enr_fork_id.as_ssz_bytes()) + .map_err(|e| { + warn!( + self.log, + "Could not update eth2 ENR field"; + "error" => format!("{:?}", e) + ) + }); + + // replace the global version with discovery version + *self.network_globals.local_enr.write() = self.discv5.local_enr().clone(); + } + + /* Internal Functions */ + + /// Consume the discovery queue and initiate queries when applicable. + /// + /// This also sanitizes the queue removing out-dated queries. + fn process_queue(&mut self) { + // Sanitize the queue, removing any out-dated subnet queries + self.queued_queries.retain(|query| !query.expired()); + + // Check that we are within our query concurrency limit + while !self.at_capacity() && !self.queued_queries.is_empty() { + // consume and process the query queue + match self.queued_queries.pop_front() { + Some(QueryType::FindPeers) => { + // Only permit one FindPeers query at a time + if self.find_peer_active { + self.queued_queries.push_back(QueryType::FindPeers); + continue; + } + // This is a regular request to find additional peers + debug!(self.log, "Searching for new peers"); + self.find_peer_active = true; + self.start_query(QueryType::FindPeers, FIND_NODE_QUERY_CLOSEST_PEERS); + } + Some(QueryType::Subnet { + subnet_id, + min_ttl, + retries, + }) => { + // This query is for searching for peers of a particular subnet + self.start_subnet_query(subnet_id, min_ttl, retries); + } + None => {} // Queue is empty + } + } + // Update the queue metric + metrics::set_gauge(&metrics::DISCOVERY_QUEUE, self.queued_queries.len() as i64); + } + + // Returns a boolean indicating if we are currently processing the maximum number of + // concurrent queries or not. + fn at_capacity(&self) -> bool { + if self.active_queries.len() >= MAX_CONCURRENT_QUERIES { + true + } else { + false + } + } + + /// Runs a discovery request for a given subnet_id if one already exists. + fn start_subnet_query( + &mut self, + subnet_id: SubnetId, + min_ttl: Option, + retries: usize, + ) { + // Determine if we have sufficient peers, which may make this discovery unnecessary. 
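        // Worked example: with TARGET_SUBNET_PEERS = 3 and a single connected peer already on
        // this subnet, the guard below does not trigger and the query asks discovery for two
        // more peers matching the subnet predicate.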
+ let peers_on_subnet = self + .network_globals + .peers + .read() + .peers_on_subnet(subnet_id) + .count(); + + if peers_on_subnet > TARGET_SUBNET_PEERS { + trace!(self.log, "Discovery ignored"; + "reason" => "Already connected to desired peers", + "connected_peers_on_subnet" => peers_on_subnet, + "target_subnet_peers" => TARGET_SUBNET_PEERS, + ); + return; + } + + let target_peers = TARGET_SUBNET_PEERS - peers_on_subnet; + debug!(self.log, "Searching for peers for subnet"; + "subnet_id" => *subnet_id, + "connected_peers_on_subnet" => peers_on_subnet, + "target_subnet_peers" => TARGET_SUBNET_PEERS, + "peers_to_find" => target_peers, + "attempt" => retries, + ); + + // start the query, and update the queries map if necessary + let query = QueryType::Subnet { + subnet_id, + min_ttl, + retries, + }; + self.start_query(query, target_peers); + } + + /// Search for a specified number of new peers using the underlying discovery mechanism. + /// + /// This can optionally search for peers for a given predicate. Regardless of the predicate + /// given, this will only search for peers on the same enr_fork_id as specified in the local + /// ENR. + fn start_query(&mut self, query: QueryType, target_peers: usize) { + // Generate a random target node id. + let random_node = NodeId::random(); + + let enr_fork_id = match self.local_enr().eth2() { + Ok(v) => v, + Err(e) => { + crit!(self.log, "Local ENR has no fork id"; "error" => e); + return; + } + }; + // predicate for finding nodes with a matching fork + let eth2_fork_predicate = move |enr: &Enr| enr.eth2() == Ok(enr_fork_id.clone()); + + // General predicate + let predicate: Box bool + Send> = match &query { + QueryType::FindPeers => Box::new(eth2_fork_predicate), + QueryType::Subnet { subnet_id, .. } => { + // build the subnet predicate as a combination of the eth2_fork_predicate and the + // subnet predicate + let subnet_predicate = subnet_predicate::(subnet_id.clone(), &self.log); + Box::new(move |enr: &Enr| eth2_fork_predicate(enr) && subnet_predicate(enr)) + } + }; + + // Build the future + let query_future = self + .discv5 + .find_node_predicate(random_node, predicate, target_peers) + .map(|v| QueryResult(query, v)); + + // Add the future to active queries, to be executed. + self.active_queries.push(Box::pin(query_future)); + } + + /// Drives the queries returning any results from completed queries. + fn poll_queries(&mut self, cx: &mut Context) -> Option<(Option, Vec)> { + while let Poll::Ready(Some(query_future)) = self.active_queries.poll_next_unpin(cx) { + match query_future.0 { + QueryType::FindPeers => { + self.find_peer_active = false; + match query_future.1 { + Ok(r) if r.is_empty() => { + debug!(self.log, "Discovery query yielded no results."); + } + Ok(r) => { + debug!(self.log, "Discovery query completed"; "peers_found" => r.len()); + return Some((None, r)); + } + Err(e) => { + warn!(self.log, "Discovery query failed"; "error" => e.to_string()); + } + } + } + QueryType::Subnet { + subnet_id, + min_ttl, + retries, + } => { + match query_future.1 { + Ok(r) if r.is_empty() => { + debug!(self.log, "Subnet discovery query yielded no results."; "subnet_id" => *subnet_id, "retries" => retries); + } + Ok(r) => { + debug!(self.log, "Peer subnet discovery request completed"; "peers_found" => r.len(), "subnet_id" => *subnet_id); + // A subnet query has completed. Add back to the queue, incrementing retries. + self.add_subnet_query(subnet_id, min_ttl, retries + 1); + // Report the results back to the peer manager. 
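                        // Note that the retry has already been re-queued above: if dialing these
                        // peers still leaves the subnet short of TARGET_SUBNET_PEERS, the query
                        // (with retries + 1) runs again on a later `process_queue` pass, up to
                        // MAX_DISCOVERY_RETRY attempts.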
+ return Some((query_future.0.min_ttl(), r)); + } + Err(e) => { + warn!(self.log,"Subnet Discovery query failed"; "subnet_id" => *subnet_id, "error" => e.to_string()); + } + } + } + } + } + None + } + + // Main execution loop to be driven by the peer manager. + pub fn poll(&mut self, cx: &mut Context) -> Poll { + // Process the query queue + self.process_queue(); + + // Drive the queries and return any results from completed queries + if let Some((min_ttl, result)) = self.poll_queries(cx) { + // cache the found ENR's + for enr in result.iter().cloned() { + self.cached_enrs.put(enr.peer_id(), enr); + } + // return the result to the peer manager + return Poll::Ready(DiscoveryEvent::QueryResult(min_ttl, Box::new(result))); + } + + // Process the server event stream + match self.event_stream { + EventStream::Awaiting(ref mut fut) => { + // Still awaiting the event stream, poll it + if let Poll::Ready(event_stream) = fut.poll_unpin(cx) { + match event_stream { + Ok(stream) => self.event_stream = EventStream::Present(stream), + Err(e) => { + slog::crit!(self.log, "Discv5 event stream failed"; "error" => e.to_string()); + self.event_stream = EventStream::Failed; + } + } + } + } + EventStream::Failed => {} // ignore checking the stream + EventStream::Present(ref mut stream) => { + while let Ok(event) = stream.try_recv() { + match event { + // We filter out unwanted discv5 events here and only propagate useful results to + // the peer manager. + Discv5Event::Discovered(_enr) => { + // Peers that get discovered during a query but are not contactable or + // don't match a predicate can end up here. For debugging purposes we + // log these to see if we are unnecessarily dropping discovered peers + /* + if enr.eth2() == self.local_enr().eth2() { + trace!(self.log, "Peer found in process of query"; "peer_id" => format!("{}", enr.peer_id()), "tcp_socket" => enr.tcp_socket()); + } else { + // this is temporary warning for debugging the DHT + warn!(self.log, "Found peer during discovery not on correct fork"; "peer_id" => format!("{}", enr.peer_id()), "tcp_socket" => enr.tcp_socket()); + } + */ + } + Discv5Event::SocketUpdated(socket) => { + info!(self.log, "Address updated"; "ip" => format!("{}",socket.ip()), "udp_port" => format!("{}", socket.port())); + metrics::inc_counter(&metrics::ADDRESS_UPDATE_COUNT); + // Discv5 will have updated our local ENR. We save the updated version + // to disk. 
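                        // (Persisting the updated record lets a restarted node reload it and keep
                        // its ENR sequence number increasing; the file is written into `enr_dir`,
                        // the network directory captured in `new()`.)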
+ let enr = self.discv5.local_enr(); + enr::save_enr_to_disk(Path::new(&self.enr_dir), &enr, &self.log); + return Poll::Ready(DiscoveryEvent::SocketUpdated(socket)); + } + _ => {} // Ignore all other discv5 server events + } + } + } + } + Poll::Pending + } +} diff --git a/beacon_node/eth2-libp2p/src/discovery/subnet_predicate.rs b/beacon_node/eth2_libp2p/src/discovery/subnet_predicate.rs similarity index 95% rename from beacon_node/eth2-libp2p/src/discovery/subnet_predicate.rs rename to beacon_node/eth2_libp2p/src/discovery/subnet_predicate.rs index 89451d7f6..2e74ff32e 100644 --- a/beacon_node/eth2-libp2p/src/discovery/subnet_predicate.rs +++ b/beacon_node/eth2_libp2p/src/discovery/subnet_predicate.rs @@ -5,7 +5,7 @@ use super::*; pub fn subnet_predicate( subnet_id: SubnetId, log: &slog::Logger, -) -> impl Fn(&Enr) -> bool + Send + 'static + Clone +) -> impl Fn(&Enr) -> bool + Send where TSpec: EthSpec, { diff --git a/beacon_node/eth2-libp2p/src/lib.rs b/beacon_node/eth2_libp2p/src/lib.rs similarity index 88% rename from beacon_node/eth2-libp2p/src/lib.rs rename to beacon_node/eth2_libp2p/src/lib.rs index 9bdf6ef08..f416e0f06 100644 --- a/beacon_node/eth2-libp2p/src/lib.rs +++ b/beacon_node/eth2_libp2p/src/lib.rs @@ -17,9 +17,11 @@ pub mod types; pub use crate::types::{error, Enr, GossipTopic, NetworkGlobals, PubsubMessage}; pub use behaviour::{BehaviourEvent, PeerRequestId, Request, Response}; pub use config::Config as NetworkConfig; -pub use discovery::enr_ext::{CombinedKeyExt, EnrExt}; +pub use discovery::{CombinedKeyExt, EnrExt, Eth2Enr}; +pub use discv5; pub use libp2p::gossipsub::{MessageId, Topic, TopicHash}; pub use libp2p::{core::ConnectedPoint, PeerId, Swarm}; pub use libp2p::{multiaddr, Multiaddr}; +pub use metrics::scrape_discovery_metrics; pub use peer_manager::{client::Client, PeerDB, PeerInfo, PeerSyncStatus, SyncInfo}; pub use service::{Libp2pEvent, Service, NETWORK_KEY_FILENAME}; diff --git a/beacon_node/eth2_libp2p/src/metrics.rs b/beacon_node/eth2_libp2p/src/metrics.rs new file mode 100644 index 000000000..d1e19b653 --- /dev/null +++ b/beacon_node/eth2_libp2p/src/metrics.rs @@ -0,0 +1,59 @@ +pub use lighthouse_metrics::*; + +lazy_static! 
{ + pub static ref ADDRESS_UPDATE_COUNT: Result = try_create_int_counter( + "libp2p_address_update_total", + "Count of libp2p socked updated events (when our view of our IP address has changed)" + ); + pub static ref PEERS_CONNECTED: Result = try_create_int_gauge( + "libp2p_peer_connected_peers_total", + "Count of libp2p peers currently connected" + ); + pub static ref PEER_CONNECT_EVENT_COUNT: Result = try_create_int_counter( + "libp2p_peer_connect_event_total", + "Count of libp2p peer connect events (not the current number of connected peers)" + ); + pub static ref PEER_DISCONNECT_EVENT_COUNT: Result = try_create_int_counter( + "libp2p_peer_disconnect_event_total", + "Count of libp2p peer disconnect events" + ); + pub static ref DISCOVERY_QUEUE: Result = try_create_int_gauge( + "discovery_queue_size", + "The number of discovery queries awaiting execution" + ); + pub static ref DISCOVERY_REQS: Result = try_create_float_gauge( + "discovery_requests", + "The number of unsolicited discovery requests per second" + ); + pub static ref DISCOVERY_SESSIONS: Result = try_create_int_gauge( + "discovery_sessions", + "The number of active discovery sessions with peers" + ); + pub static ref DISCOVERY_REQS_IP: Result = try_create_float_gauge_vec( + "discovery_reqs_per_ip", + "Unsolicited discovery requests per ip per second", + &["Addresses"] + ); +} + +pub fn scrape_discovery_metrics() { + let metrics = discv5::metrics::Metrics::from(discv5::Discv5::raw_metrics()); + + set_float_gauge(&DISCOVERY_REQS, metrics.unsolicited_requests_per_second); + + set_gauge(&DISCOVERY_SESSIONS, metrics.active_sessions as i64); + + let process_gauge_vec = |gauge: &Result, metrics: discv5::metrics::Metrics| { + if let Ok(gauge_vec) = gauge { + gauge_vec.reset(); + for (ip, value) in metrics.requests_per_ip_per_second.iter() { + if let Ok(metric) = gauge_vec.get_metric_with_label_values(&[&format!("{:?}", ip)]) + { + metric.set(*value); + } + } + } + }; + + process_gauge_vec(&DISCOVERY_REQS_IP, metrics); +} diff --git a/beacon_node/eth2-libp2p/src/peer_manager/client.rs b/beacon_node/eth2_libp2p/src/peer_manager/client.rs similarity index 100% rename from beacon_node/eth2-libp2p/src/peer_manager/client.rs rename to beacon_node/eth2_libp2p/src/peer_manager/client.rs diff --git a/beacon_node/eth2-libp2p/src/peer_manager/mod.rs b/beacon_node/eth2_libp2p/src/peer_manager/mod.rs similarity index 74% rename from beacon_node/eth2-libp2p/src/peer_manager/mod.rs rename to beacon_node/eth2_libp2p/src/peer_manager/mod.rs index a051f9b77..5d8720289 100644 --- a/beacon_node/eth2-libp2p/src/peer_manager/mod.rs +++ b/beacon_node/eth2_libp2p/src/peer_manager/mod.rs @@ -1,21 +1,27 @@ //! Implementation of a Lighthouse's peer management system. 
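// The peer manager now owns the discovery service directly: a periodic heartbeat requests
// more peers whenever the connected/dialing count drops below `target_peers`, discovery
// results arrive as `DiscoveryEvent::QueryResult` and are turned into
// `PeerManagerEvent::Dial` requests, and discv5 socket updates are forwarded to libp2p as
// `PeerManagerEvent::SocketUpdated` multiaddrs.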
pub use self::peerdb::*; -use crate::metrics; +use crate::discovery::{Discovery, DiscoveryEvent}; use crate::rpc::{MetaData, Protocol, RPCError, RPCResponseErrorCode}; -use crate::{NetworkGlobals, PeerId}; +use crate::{error, metrics}; +use crate::{Enr, EnrExt, NetworkConfig, NetworkGlobals, PeerId}; use futures::prelude::*; use futures::Stream; use hashset_delay::HashSetDelay; +use libp2p::core::multiaddr::Protocol as MProtocol; use libp2p::identify::IdentifyInfo; -use slog::{crit, debug, error, warn}; +use slog::{crit, debug, error}; use smallvec::SmallVec; -use std::convert::TryInto; -use std::pin::Pin; -use std::sync::Arc; -use std::task::{Context, Poll}; -use std::time::{Duration, Instant}; -use types::EthSpec; +use std::{ + net::SocketAddr, + pin::Pin, + sync::Arc, + task::{Context, Poll}, + time::{Duration, Instant}, +}; +use types::{EthSpec, SubnetId}; + +pub use libp2p::core::{identity::Keypair, Multiaddr}; pub mod client; mod peer_info; @@ -33,18 +39,26 @@ const STATUS_INTERVAL: u64 = 300; /// this time frame (Seconds) const PING_INTERVAL: u64 = 30; +/// The heartbeat performs regular updates such as updating reputations and performing discovery +/// requests. This defines the interval in seconds. +const HEARTBEAT_INTERVAL: u64 = 30; + /// The main struct that handles peer's reputation and connection status. pub struct PeerManager { /// Storage of network globals to access the `PeerDB`. network_globals: Arc>, /// A queue of events that the `PeerManager` is waiting to produce. - events: SmallVec<[PeerManagerEvent; 5]>, + events: SmallVec<[PeerManagerEvent; 16]>, /// A collection of peers awaiting to be Ping'd. ping_peers: HashSetDelay, /// A collection of peers awaiting to be Status'd. status_peers: HashSetDelay, - /// Last updated moment. - _last_updated: Instant, + /// The target number of peers we would like to connect to. + target_peers: usize, + /// The discovery service. + discovery: Discovery, + /// The heartbeat interval to perform routine maintenance. + heartbeat: tokio::time::Interval, /// The logger associated with the `PeerManager`. log: slog::Logger, } @@ -89,6 +103,10 @@ impl PeerAction { /// The events that the `PeerManager` outputs (requests). pub enum PeerManagerEvent { + /// Dial a PeerId. + Dial(PeerId), + /// Inform libp2p that our external socket addr has been updated. + SocketUpdated(Multiaddr), /// Sends a STATUS to a peer. Status(PeerId), /// Sends a PING to a peer. @@ -100,99 +118,59 @@ pub enum PeerManagerEvent { } impl PeerManager { - pub fn new(network_globals: Arc>, log: &slog::Logger) -> Self { - PeerManager { + // NOTE: Must be run inside a tokio executor. + pub fn new( + local_key: &Keypair, + config: &NetworkConfig, + network_globals: Arc>, + log: &slog::Logger, + ) -> error::Result { + // start the discovery service + let mut discovery = Discovery::new(local_key, config, network_globals.clone(), log)?; + + // start searching for peers + discovery.discover_peers(); + + let heartbeat = tokio::time::interval(tokio::time::Duration::from_secs(HEARTBEAT_INTERVAL)); + + Ok(PeerManager { network_globals, events: SmallVec::new(), - _last_updated: Instant::now(), ping_peers: HashSetDelay::new(Duration::from_secs(PING_INTERVAL)), status_peers: HashSetDelay::new(Duration::from_secs(STATUS_INTERVAL)), + target_peers: config.max_peers, //TODO: Add support for target peers and max peers + discovery, + heartbeat, log: log.clone(), - } + }) } /* Public accessible functions */ - /// A ping request has been received. 
- // NOTE: The behaviour responds with a PONG automatically - // TODO: Update last seen - pub fn ping_request(&mut self, peer_id: &PeerId, seq: u64) { - if let Some(peer_info) = self.network_globals.peers.read().peer_info(peer_id) { - // received a ping - // reset the to-ping timer for this peer - debug!(self.log, "Received a ping request"; "peer_id" => peer_id.to_string(), "seq_no" => seq); - self.ping_peers.insert(peer_id.clone()); + /* Discovery Requests */ - // if the sequence number is unknown send an update the meta data of the peer. - if let Some(meta_data) = &peer_info.meta_data { - if meta_data.seq_number < seq { - debug!(self.log, "Requesting new metadata from peer"; - "peer_id" => peer_id.to_string(), "known_seq_no" => meta_data.seq_number, "ping_seq_no" => seq); - self.events - .push(PeerManagerEvent::MetaData(peer_id.clone())); - } - } else { - // if we don't know the meta-data, request it - debug!(self.log, "Requesting first metadata from peer"; - "peer_id" => peer_id.to_string()); - self.events - .push(PeerManagerEvent::MetaData(peer_id.clone())); - } - } else { - crit!(self.log, "Received a PING from an unknown peer"; - "peer_id" => peer_id.to_string()); - } + /// Provides a reference to the underlying discovery service. + pub fn discovery(&self) -> &Discovery { + &self.discovery } - /// A PONG has been returned from a peer. - // TODO: Update last seen - pub fn pong_response(&mut self, peer_id: &PeerId, seq: u64) { - if let Some(peer_info) = self.network_globals.peers.read().peer_info(peer_id) { - // received a pong - - // if the sequence number is unknown send update the meta data of the peer. - if let Some(meta_data) = &peer_info.meta_data { - if meta_data.seq_number < seq { - debug!(self.log, "Requesting new metadata from peer"; - "peer_id" => peer_id.to_string(), "known_seq_no" => meta_data.seq_number, "pong_seq_no" => seq); - self.events - .push(PeerManagerEvent::MetaData(peer_id.clone())); - } - } else { - // if we don't know the meta-data, request it - debug!(self.log, "Requesting first metadata from peer"; - "peer_id" => peer_id.to_string()); - self.events - .push(PeerManagerEvent::MetaData(peer_id.clone())); - } - } else { - crit!(self.log, "Received a PONG from an unknown peer"; "peer_id" => peer_id.to_string()); - } + /// Provides a mutable reference to the underlying discovery service. + pub fn discovery_mut(&mut self) -> &mut Discovery { + &mut self.discovery } - /// Received a metadata response from a peer. 
- // TODO: Update last seen - pub fn meta_data_response(&mut self, peer_id: &PeerId, meta_data: MetaData) { - if let Some(peer_info) = self.network_globals.peers.write().peer_info_mut(peer_id) { - if let Some(known_meta_data) = &peer_info.meta_data { - if known_meta_data.seq_number < meta_data.seq_number { - debug!(self.log, "Updating peer's metadata"; - "peer_id" => peer_id.to_string(), "known_seq_no" => known_meta_data.seq_number, "new_seq_no" => meta_data.seq_number); - peer_info.meta_data = Some(meta_data); - } else { - debug!(self.log, "Received old metadata"; - "peer_id" => peer_id.to_string(), "known_seq_no" => known_meta_data.seq_number, "new_seq_no" => meta_data.seq_number); - } - } else { - // we have no meta-data for this peer, update - debug!(self.log, "Obtained peer's metadata"; - "peer_id" => peer_id.to_string(), "new_seq_no" => meta_data.seq_number); - peer_info.meta_data = Some(meta_data); - } - } else { - crit!(self.log, "Received METADATA from an unknown peer"; - "peer_id" => peer_id.to_string()); + /// A request to find peers on a given subnet. + pub fn discover_subnet_peers(&mut self, subnet_id: SubnetId, min_ttl: Option) { + // Extend the time to maintain peers if required. + if let Some(min_ttl) = min_ttl { + self.network_globals + .peers + .write() + .extend_peers_on_subnet(subnet_id, min_ttl); } + + // request the subnet query from discovery + self.discovery.discover_subnet_peers(subnet_id, min_ttl); } /// A STATUS message has been received from a peer. This resets the status timer. @@ -320,8 +298,158 @@ impl PeerManager { self.report_peer(peer_id, peer_action); } + /// A ping request has been received. + // NOTE: The behaviour responds with a PONG automatically + // TODO: Update last seen + pub fn ping_request(&mut self, peer_id: &PeerId, seq: u64) { + if let Some(peer_info) = self.network_globals.peers.read().peer_info(peer_id) { + // received a ping + // reset the to-ping timer for this peer + debug!(self.log, "Received a ping request"; "peer_id" => peer_id.to_string(), "seq_no" => seq); + self.ping_peers.insert(peer_id.clone()); + + // if the sequence number is unknown send an update the meta data of the peer. + if let Some(meta_data) = &peer_info.meta_data { + if meta_data.seq_number < seq { + debug!(self.log, "Requesting new metadata from peer"; + "peer_id" => peer_id.to_string(), "known_seq_no" => meta_data.seq_number, "ping_seq_no" => seq); + self.events + .push(PeerManagerEvent::MetaData(peer_id.clone())); + } + } else { + // if we don't know the meta-data, request it + debug!(self.log, "Requesting first metadata from peer"; + "peer_id" => peer_id.to_string()); + self.events + .push(PeerManagerEvent::MetaData(peer_id.clone())); + } + } else { + crit!(self.log, "Received a PING from an unknown peer"; + "peer_id" => peer_id.to_string()); + } + } + + /// A PONG has been returned from a peer. + // TODO: Update last seen + pub fn pong_response(&mut self, peer_id: &PeerId, seq: u64) { + if let Some(peer_info) = self.network_globals.peers.read().peer_info(peer_id) { + // received a pong + + // if the sequence number is unknown send update the meta data of the peer. 
+ if let Some(meta_data) = &peer_info.meta_data { + if meta_data.seq_number < seq { + debug!(self.log, "Requesting new metadata from peer"; + "peer_id" => peer_id.to_string(), "known_seq_no" => meta_data.seq_number, "pong_seq_no" => seq); + self.events + .push(PeerManagerEvent::MetaData(peer_id.clone())); + } + } else { + // if we don't know the meta-data, request it + debug!(self.log, "Requesting first metadata from peer"; + "peer_id" => peer_id.to_string()); + self.events + .push(PeerManagerEvent::MetaData(peer_id.clone())); + } + } else { + crit!(self.log, "Received a PONG from an unknown peer"; "peer_id" => peer_id.to_string()); + } + } + + /// Received a metadata response from a peer. + // TODO: Update last seen + pub fn meta_data_response(&mut self, peer_id: &PeerId, meta_data: MetaData) { + if let Some(peer_info) = self.network_globals.peers.write().peer_info_mut(peer_id) { + if let Some(known_meta_data) = &peer_info.meta_data { + if known_meta_data.seq_number < meta_data.seq_number { + debug!(self.log, "Updating peer's metadata"; + "peer_id" => peer_id.to_string(), "known_seq_no" => known_meta_data.seq_number, "new_seq_no" => meta_data.seq_number); + peer_info.meta_data = Some(meta_data); + } else { + debug!(self.log, "Received old metadata"; + "peer_id" => peer_id.to_string(), "known_seq_no" => known_meta_data.seq_number, "new_seq_no" => meta_data.seq_number); + } + } else { + // we have no meta-data for this peer, update + debug!(self.log, "Obtained peer's metadata"; + "peer_id" => peer_id.to_string(), "new_seq_no" => meta_data.seq_number); + peer_info.meta_data = Some(meta_data); + } + } else { + crit!(self.log, "Received METADATA from an unknown peer"; + "peer_id" => peer_id.to_string()); + } + } + + // Handles the libp2p request to obtain multiaddrs for peer_id's in order to dial them. + pub fn addresses_of_peer(&mut self, peer_id: &PeerId) -> Vec { + if let Some(enr) = self.discovery.enr_of_peer(peer_id) { + // ENR's may have multiple Multiaddrs. The multi-addr associated with the UDP + // port is removed, which is assumed to be associated with the discv5 protocol (and + // therefore irrelevant for other libp2p components). + let mut out_list = enr.multiaddr(); + out_list.retain(|addr| { + addr.iter() + .find(|v| match v { + MProtocol::Udp(_) => true, + _ => false, + }) + .is_none() + }); + + out_list + } else { + // PeerId is not known + Vec::new() + } + } + /* Internal functions */ + // The underlying discovery server has updated our external IP address. We send this up to + // notify libp2p. + fn socket_updated(&mut self, socket: SocketAddr) { + // Build a multiaddr to report to libp2p + let mut multiaddr = Multiaddr::from(socket.ip()); + // NOTE: This doesn't actually track the external TCP port. More sophisticated NAT handling + // should handle this. + multiaddr.push(MProtocol::Tcp(self.network_globals.listen_port_tcp())); + self.events.push(PeerManagerEvent::SocketUpdated(multiaddr)); + } + + /// Peers that have been returned by discovery requests are dialed here if they are suitable. + /// + /// NOTE: By dialing `PeerId`s and not multiaddrs, libp2p requests the multiaddr associated + /// with a new `PeerId` which involves a discovery routing table lookup. We could dial the + /// multiaddr here, however this could relate to duplicate PeerId's etc. If the lookup + /// proves resource constraining, we should switch to multiaddr dialling here. 
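    // For reference, an ENR advertising ip 192.0.2.1 with udp 9000 and tcp 9001 expands via
    // `EnrExt::multiaddr()` into roughly:
    //     /ip4/192.0.2.1/udp/9000   (discv5 only; filtered out by `addresses_of_peer`)
    //     /ip4/192.0.2.1/tcp/9001   (kept, and what libp2p ultimately dials)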
+ fn peers_discovered(&mut self, peers: Vec, min_ttl: Option) { + for enr in peers { + let peer_id = enr.peer_id(); + + // if we need more peers, attempt a connection + if self.network_globals.connected_or_dialing_peers() < self.target_peers + && !self + .network_globals + .peers + .read() + .is_connected_or_dialing(&peer_id) + && !self.network_globals.peers.read().peer_banned(&peer_id) + { + debug!(self.log, "Dialing discovered peer"; "peer_id"=> peer_id.to_string()); + // TODO: Update output + // This should be updated with the peer dialing. In fact created once the peer is + // dialed + if let Some(min_ttl) = min_ttl { + self.network_globals + .peers + .write() + .update_min_ttl(&peer_id, min_ttl); + } + self.events.push(PeerManagerEvent::Dial(peer_id)); + } + } + } + /// Registers a peer as connected. The `ingoing` parameter determines if the peer is being /// dialed or connecting to us. /// @@ -376,12 +504,11 @@ impl PeerManager { /// /// A banned(disconnected) peer that gets its rep above(below) MIN_REP_BEFORE_BAN is /// now considered a disconnected(banned) peer. + // TODO: Implement when reputation is added. fn _update_reputations(&mut self) { + /* // avoid locking the peerdb too often // TODO: call this on a timer - if self._last_updated.elapsed().as_secs() < 30 { - return; - } let now = Instant::now(); @@ -457,6 +584,28 @@ impl PeerManager { } self._last_updated = Instant::now(); + */ + } + + /// The Peer manager's heartbeat maintains the peer count and maintains peer reputations. + /// + /// It will request discovery queries if the peer count has not reached the desired number of + /// peers. + /// + /// NOTE: Discovery will only add a new query if one isn't already queued. + fn heartbeat(&mut self) { + // TODO: Provide a back-off time for discovery queries. I.e Queue many initially, then only + // perform discoveries over a larger fixed interval. Perhaps one every 6 heartbeats + let peer_count = self.network_globals.connected_or_dialing_peers(); + if peer_count < self.target_peers { + // If we need more peers, queue a discovery lookup. + self.discovery.discover_peers(); + } + + // TODO: If we have too many peers, remove peers that are not required for subnet + // validation. 
+ + // TODO: Perform peer reputation maintenance here } } @@ -464,6 +613,21 @@ impl Stream for PeerManager { type Item = PeerManagerEvent; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + // perform the heartbeat when necessary + while let Poll::Ready(Some(_)) = self.heartbeat.poll_next_unpin(cx) { + self.heartbeat(); + } + + // handle any discovery events + while let Poll::Ready(event) = self.discovery.poll(cx) { + match event { + DiscoveryEvent::SocketUpdated(socket_addr) => self.socket_updated(socket_addr), + DiscoveryEvent::QueryResult(min_ttl, peers) => { + self.peers_discovered(*peers, min_ttl) + } + } + } + // poll the timeouts for pings and status' loop { match self.ping_peers.poll_next_unpin(cx) { diff --git a/beacon_node/eth2-libp2p/src/peer_manager/peer_info.rs b/beacon_node/eth2_libp2p/src/peer_manager/peer_info.rs similarity index 100% rename from beacon_node/eth2-libp2p/src/peer_manager/peer_info.rs rename to beacon_node/eth2_libp2p/src/peer_manager/peer_info.rs diff --git a/beacon_node/eth2-libp2p/src/peer_manager/peer_sync_status.rs b/beacon_node/eth2_libp2p/src/peer_manager/peer_sync_status.rs similarity index 100% rename from beacon_node/eth2-libp2p/src/peer_manager/peer_sync_status.rs rename to beacon_node/eth2_libp2p/src/peer_manager/peer_sync_status.rs diff --git a/beacon_node/eth2-libp2p/src/peer_manager/peerdb.rs b/beacon_node/eth2_libp2p/src/peer_manager/peerdb.rs similarity index 98% rename from beacon_node/eth2-libp2p/src/peer_manager/peerdb.rs rename to beacon_node/eth2_libp2p/src/peer_manager/peerdb.rs index b5aa2cb3d..063d36db9 100644 --- a/beacon_node/eth2-libp2p/src/peer_manager/peerdb.rs +++ b/beacon_node/eth2_libp2p/src/peer_manager/peerdb.rs @@ -106,6 +106,14 @@ impl PeerDB { } } + /// Returns true if the Peer is banned. + pub fn peer_banned(&self, peer_id: &PeerId) -> bool { + match self.peers.get(peer_id).map(|info| &info.connection_status) { + Some(status) => status.is_banned(), + None => false, + } + } + /// Gives the ids of all known connected peers. 
pub fn connected_peers(&self) -> impl Iterator)> { self.peers diff --git a/beacon_node/eth2-libp2p/src/rpc/codec/base.rs b/beacon_node/eth2_libp2p/src/rpc/codec/base.rs similarity index 100% rename from beacon_node/eth2-libp2p/src/rpc/codec/base.rs rename to beacon_node/eth2_libp2p/src/rpc/codec/base.rs diff --git a/beacon_node/eth2-libp2p/src/rpc/codec/mod.rs b/beacon_node/eth2_libp2p/src/rpc/codec/mod.rs similarity index 100% rename from beacon_node/eth2-libp2p/src/rpc/codec/mod.rs rename to beacon_node/eth2_libp2p/src/rpc/codec/mod.rs diff --git a/beacon_node/eth2-libp2p/src/rpc/codec/ssz.rs b/beacon_node/eth2_libp2p/src/rpc/codec/ssz.rs similarity index 100% rename from beacon_node/eth2-libp2p/src/rpc/codec/ssz.rs rename to beacon_node/eth2_libp2p/src/rpc/codec/ssz.rs diff --git a/beacon_node/eth2-libp2p/src/rpc/codec/ssz_snappy.rs b/beacon_node/eth2_libp2p/src/rpc/codec/ssz_snappy.rs similarity index 100% rename from beacon_node/eth2-libp2p/src/rpc/codec/ssz_snappy.rs rename to beacon_node/eth2_libp2p/src/rpc/codec/ssz_snappy.rs diff --git a/beacon_node/eth2-libp2p/src/rpc/handler.rs b/beacon_node/eth2_libp2p/src/rpc/handler.rs similarity index 100% rename from beacon_node/eth2-libp2p/src/rpc/handler.rs rename to beacon_node/eth2_libp2p/src/rpc/handler.rs diff --git a/beacon_node/eth2-libp2p/src/rpc/methods.rs b/beacon_node/eth2_libp2p/src/rpc/methods.rs similarity index 100% rename from beacon_node/eth2-libp2p/src/rpc/methods.rs rename to beacon_node/eth2_libp2p/src/rpc/methods.rs diff --git a/beacon_node/eth2-libp2p/src/rpc/mod.rs b/beacon_node/eth2_libp2p/src/rpc/mod.rs similarity index 100% rename from beacon_node/eth2-libp2p/src/rpc/mod.rs rename to beacon_node/eth2_libp2p/src/rpc/mod.rs diff --git a/beacon_node/eth2-libp2p/src/rpc/protocol.rs b/beacon_node/eth2_libp2p/src/rpc/protocol.rs similarity index 100% rename from beacon_node/eth2-libp2p/src/rpc/protocol.rs rename to beacon_node/eth2_libp2p/src/rpc/protocol.rs diff --git a/beacon_node/eth2-libp2p/src/service.rs b/beacon_node/eth2_libp2p/src/service.rs similarity index 98% rename from beacon_node/eth2-libp2p/src/service.rs rename to beacon_node/eth2_libp2p/src/service.rs index b2c5efcf7..130c495a3 100644 --- a/beacon_node/eth2-libp2p/src/service.rs +++ b/beacon_node/eth2_libp2p/src/service.rs @@ -90,11 +90,7 @@ impl Service { trace!(log, "Libp2p Service starting"); // initialise the node's ID - let local_keypair = if let Some(hex_bytes) = &config.secret_key_hex { - keypair_from_hex(hex_bytes)? - } else { - load_private_key(config, &log) - }; + let local_keypair = load_private_key(config, &log); // Create an ENR or load from disk if appropriate let enr = @@ -340,7 +336,6 @@ impl Service { debug!(self.log, "Listener error"; "error" => format!("{:?}", error.to_string())) } SwarmEvent::Dialing(peer_id) => { - debug!(self.log, "Dialing peer"; "peer" => peer_id.to_string()); self.swarm.peer_manager().dialing_peer(&peer_id); } } @@ -392,6 +387,8 @@ fn build_transport( .boxed()) } +// Useful helper functions for debugging. Currently not used in the client. 
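// (They are retained even though the `--p2p-priv-key` flag and the `secret_key_hex` config
// option are removed later in this diff; the service now always loads or generates its key
// on disk via `load_private_key` above.)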
+#[allow(dead_code)] fn keypair_from_hex(hex_bytes: &str) -> error::Result { let hex_bytes = if hex_bytes.starts_with("0x") { hex_bytes[2..].to_string() @@ -404,6 +401,7 @@ fn keypair_from_hex(hex_bytes: &str) -> error::Result { .and_then(keypair_from_bytes) } +#[allow(dead_code)] fn keypair_from_bytes(mut bytes: Vec) -> error::Result { libp2p::core::identity::secp256k1::SecretKey::from_bytes(&mut bytes) .map(|secret| { diff --git a/beacon_node/eth2-libp2p/src/types/error.rs b/beacon_node/eth2_libp2p/src/types/error.rs similarity index 100% rename from beacon_node/eth2-libp2p/src/types/error.rs rename to beacon_node/eth2_libp2p/src/types/error.rs diff --git a/beacon_node/eth2-libp2p/src/types/globals.rs b/beacon_node/eth2_libp2p/src/types/globals.rs similarity index 98% rename from beacon_node/eth2-libp2p/src/types/globals.rs rename to beacon_node/eth2_libp2p/src/types/globals.rs index d765d4240..84e183b6b 100644 --- a/beacon_node/eth2-libp2p/src/types/globals.rs +++ b/beacon_node/eth2_libp2p/src/types/globals.rs @@ -4,7 +4,7 @@ use crate::rpc::methods::MetaData; use crate::types::SyncState; use crate::Client; use crate::EnrExt; -use crate::{discovery::enr::Eth2Enr, Enr, GossipTopic, Multiaddr, PeerId}; +use crate::{Enr, Eth2Enr, GossipTopic, Multiaddr, PeerId}; use parking_lot::RwLock; use std::collections::HashSet; use std::sync::atomic::{AtomicU16, Ordering}; diff --git a/beacon_node/eth2-libp2p/src/types/mod.rs b/beacon_node/eth2_libp2p/src/types/mod.rs similarity index 100% rename from beacon_node/eth2-libp2p/src/types/mod.rs rename to beacon_node/eth2_libp2p/src/types/mod.rs diff --git a/beacon_node/eth2-libp2p/src/types/pubsub.rs b/beacon_node/eth2_libp2p/src/types/pubsub.rs similarity index 100% rename from beacon_node/eth2-libp2p/src/types/pubsub.rs rename to beacon_node/eth2_libp2p/src/types/pubsub.rs diff --git a/beacon_node/eth2-libp2p/src/types/sync_state.rs b/beacon_node/eth2_libp2p/src/types/sync_state.rs similarity index 100% rename from beacon_node/eth2-libp2p/src/types/sync_state.rs rename to beacon_node/eth2_libp2p/src/types/sync_state.rs diff --git a/beacon_node/eth2-libp2p/src/types/topics.rs b/beacon_node/eth2_libp2p/src/types/topics.rs similarity index 100% rename from beacon_node/eth2-libp2p/src/types/topics.rs rename to beacon_node/eth2_libp2p/src/types/topics.rs diff --git a/beacon_node/eth2-libp2p/tests/common/mod.rs b/beacon_node/eth2_libp2p/tests/common/mod.rs similarity index 88% rename from beacon_node/eth2-libp2p/tests/common/mod.rs rename to beacon_node/eth2_libp2p/tests/common/mod.rs index e26381f51..50ff284c2 100644 --- a/beacon_node/eth2-libp2p/tests/common/mod.rs +++ b/beacon_node/eth2_libp2p/tests/common/mod.rs @@ -71,11 +71,7 @@ pub fn unused_port(transport: &str) -> Result { Ok(local_addr.port()) } -pub fn build_config( - port: u16, - mut boot_nodes: Vec, - secret_key: Option, -) -> NetworkConfig { +pub fn build_config(port: u16, mut boot_nodes: Vec) -> NetworkConfig { let mut config = NetworkConfig::default(); let path = TempDir::new(&format!("libp2p_test{}", port)).unwrap(); @@ -85,7 +81,6 @@ pub fn build_config( config.enr_udp_port = Some(port); config.enr_address = Some("127.0.0.1".parse().unwrap()); config.boot_nodes.append(&mut boot_nodes); - config.secret_key_hex = secret_key; config.network_dir = path.into_path(); // Reduce gossipsub heartbeat parameters config.gs_config.heartbeat_initial_delay = Duration::from_millis(500); @@ -93,13 +88,9 @@ pub fn build_config( config } -pub fn build_libp2p_instance( - boot_nodes: Vec, - secret_key: Option, 
- log: slog::Logger, -) -> Libp2pInstance { +pub fn build_libp2p_instance(boot_nodes: Vec, log: slog::Logger) -> Libp2pInstance { let port = unused_port("tcp").unwrap(); - let config = build_config(port, boot_nodes, secret_key); + let config = build_config(port, boot_nodes); // launch libp2p service let (signal, exit) = exit_future::signal(); @@ -115,7 +106,7 @@ pub fn build_libp2p_instance( #[allow(dead_code)] pub fn get_enr(node: &LibP2PService) -> Enr { - let enr = node.swarm.discovery().local_enr().clone(); + let enr = node.swarm.local_enr().clone(); enr } @@ -123,7 +114,7 @@ pub fn get_enr(node: &LibP2PService) -> Enr { #[allow(dead_code)] pub fn build_full_mesh(log: slog::Logger, n: usize) -> Vec { let mut nodes: Vec<_> = (0..n) - .map(|_| build_libp2p_instance(vec![], None, log.clone())) + .map(|_| build_libp2p_instance(vec![], log.clone())) .collect(); let multiaddrs: Vec = nodes .iter() @@ -150,10 +141,10 @@ pub async fn build_node_pair(log: &slog::Logger) -> (Libp2pInstance, Libp2pInsta let sender_log = log.new(o!("who" => "sender")); let receiver_log = log.new(o!("who" => "receiver")); - let mut sender = build_libp2p_instance(vec![], None, sender_log); - let mut receiver = build_libp2p_instance(vec![], None, receiver_log); + let mut sender = build_libp2p_instance(vec![], sender_log); + let mut receiver = build_libp2p_instance(vec![], receiver_log); - let receiver_multiaddr = receiver.swarm.discovery().local_enr().clone().multiaddr()[1].clone(); + let receiver_multiaddr = receiver.swarm.local_enr().multiaddr()[1].clone(); // let the two nodes set up listeners let sender_fut = async { @@ -192,7 +183,7 @@ pub async fn build_node_pair(log: &slog::Logger) -> (Libp2pInstance, Libp2pInsta #[allow(dead_code)] pub fn build_linear(log: slog::Logger, n: usize) -> Vec { let mut nodes: Vec<_> = (0..n) - .map(|_| build_libp2p_instance(vec![], None, log.clone())) + .map(|_| build_libp2p_instance(vec![], log.clone())) .collect(); let multiaddrs: Vec = nodes .iter() diff --git a/beacon_node/eth2-libp2p/tests/gossipsub_tests.rs b/beacon_node/eth2_libp2p/tests/gossipsub_tests.rs similarity index 100% rename from beacon_node/eth2-libp2p/tests/gossipsub_tests.rs rename to beacon_node/eth2_libp2p/tests/gossipsub_tests.rs diff --git a/beacon_node/eth2_libp2p/tests/noise.rs b/beacon_node/eth2_libp2p/tests/noise.rs new file mode 100644 index 000000000..2427622a2 --- /dev/null +++ b/beacon_node/eth2_libp2p/tests/noise.rs @@ -0,0 +1,184 @@ +#![cfg(test)] +use crate::behaviour::Behaviour; +use crate::multiaddr::Protocol; +use ::types::{EnrForkId, MinimalEthSpec}; +use eth2_libp2p::discovery::{build_enr, CombinedKey, CombinedKeyExt}; +use eth2_libp2p::*; +use futures::prelude::*; +use libp2p::core::identity::Keypair; +use libp2p::{ + core, + core::{muxing::StreamMuxerBox, transport::boxed::Boxed}, + secio, + swarm::{SwarmBuilder, SwarmEvent}, + PeerId, Swarm, Transport, +}; +use slog::{crit, debug, info, Level}; +use std::io::{Error, ErrorKind}; +use std::pin::Pin; +use std::sync::Arc; +use std::time::Duration; + +type TSpec = MinimalEthSpec; + +mod common; + +type Libp2pBehaviour = Behaviour; + +/// Build and return a eth2_libp2p Swarm with only secio support. 
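// A node built through `Service::new` is expected to negotiate noise when both sides support
// it; this helper deliberately builds a secio-only swarm so the test below can confirm that
// such a peer can still connect to a default node, i.e. that the fallback actually works.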
+fn build_secio_swarm( + config: &NetworkConfig, + log: slog::Logger, +) -> error::Result> { + let local_keypair = Keypair::generate_secp256k1(); + let local_peer_id = PeerId::from(local_keypair.public()); + let enr_key = CombinedKey::from_libp2p(&local_keypair).unwrap(); + + let enr = build_enr::(&enr_key, config, EnrForkId::default()).unwrap(); + let network_globals = Arc::new(NetworkGlobals::new( + enr, + config.libp2p_port, + config.discovery_port, + &log, + )); + + let mut swarm = { + // Set up the transport - tcp/ws with secio and mplex/yamux + let transport = build_secio_transport(local_keypair.clone()); + // Lighthouse network behaviour + let behaviour = Behaviour::new(&local_keypair, config, network_globals.clone(), &log)?; + // requires a tokio runtime + struct Executor(tokio::runtime::Handle); + impl libp2p::core::Executor for Executor { + fn exec(&self, f: Pin + Send>>) { + self.0.spawn(f); + } + } + SwarmBuilder::new(transport, behaviour, local_peer_id.clone()) + .executor(Box::new(Executor(tokio::runtime::Handle::current()))) + .build() + }; + + // listen on the specified address + let listen_multiaddr = { + let mut m = Multiaddr::from(config.listen_address); + m.push(Protocol::Tcp(config.libp2p_port)); + m + }; + + match Swarm::listen_on(&mut swarm, listen_multiaddr.clone()) { + Ok(_) => { + let mut log_address = listen_multiaddr; + log_address.push(Protocol::P2p(local_peer_id.clone().into())); + info!(log, "Listening established"; "address" => format!("{}", log_address)); + } + Err(err) => { + crit!( + log, + "Unable to listen on libp2p address"; + "error" => format!("{:?}", err), + "listen_multiaddr" => format!("{}", listen_multiaddr), + ); + return Err("Libp2p was unable to listen on the given listen address.".into()); + } + }; + + // helper closure for dialing peers + let mut dial_addr = |multiaddr: &Multiaddr| { + match Swarm::dial_addr(&mut swarm, multiaddr.clone()) { + Ok(()) => debug!(log, "Dialing libp2p peer"; "address" => format!("{}", multiaddr)), + Err(err) => debug!( + log, + "Could not connect to peer"; "address" => format!("{}", multiaddr), "error" => format!("{:?}", err) + ), + }; + }; + + // attempt to connect to any specified boot-nodes + for bootnode_enr in &config.boot_nodes { + for multiaddr in &bootnode_enr.multiaddr() { + // ignore udp multiaddr if it exists + let components = multiaddr.iter().collect::>(); + if let Protocol::Udp(_) = components[1] { + continue; + } + dial_addr(multiaddr); + } + } + Ok(swarm) +} + +/// Build a simple TCP transport with secio, mplex/yamux. +fn build_secio_transport(local_private_key: Keypair) -> Boxed<(PeerId, StreamMuxerBox), Error> { + let transport = libp2p_tcp::TokioTcpConfig::new().nodelay(true); + transport + .upgrade(core::upgrade::Version::V1) + .authenticate(secio::SecioConfig::new(local_private_key)) + .multiplex(core::upgrade::SelectUpgrade::new( + libp2p::yamux::Config::default(), + libp2p::mplex::MplexConfig::new(), + )) + .map(|(peer, muxer), _| (peer, core::muxing::StreamMuxerBox::new(muxer))) + .timeout(Duration::from_secs(20)) + .timeout(Duration::from_secs(20)) + .map_err(|err| Error::new(ErrorKind::Other, err)) + .boxed() +} + +/// Test if the encryption falls back to secio if noise isn't available +#[tokio::test] +async fn test_secio_noise_fallback() { + // set up the logging. 
The level and enabled logging or not + let log_level = Level::Trace; + let enable_logging = false; + + let log = common::build_log(log_level, enable_logging); + + let port = common::unused_port("tcp").unwrap(); + let noisy_config = common::build_config(port, vec![]); + let (_signal, exit) = exit_future::signal(); + let executor = + environment::TaskExecutor::new(tokio::runtime::Handle::current(), exit, log.clone()); + let mut noisy_node = Service::new(executor, &noisy_config, EnrForkId::default(), &log) + .expect("should build a libp2p instance") + .1; + + let port = common::unused_port("tcp").unwrap(); + let secio_config = common::build_config(port, vec![common::get_enr(&noisy_node)]); + + // Building a custom Libp2pService from outside the crate isn't possible because of + // private fields in the Libp2pService struct. A swarm is good enough for testing + // compatibility with secio. + let mut secio_swarm = + build_secio_swarm(&secio_config, log.clone()).expect("should build a secio swarm"); + + let secio_log = log.clone(); + + let noisy_future = async { + loop { + noisy_node.next_event().await; + } + }; + + let secio_future = async { + loop { + match secio_swarm.next_event().await { + SwarmEvent::ConnectionEstablished { peer_id, .. } => { + // secio node negotiated a secio transport with + // the noise compatible node + info!(secio_log, "Connected to peer {}", peer_id); + return; + } + _ => {} // Ignore all other events + } + } + }; + + tokio::select! { + _ = noisy_future => {} + _ = secio_future => {} + _ = tokio::time::delay_for(Duration::from_millis(800)) => { + panic!("Future timed out"); + } + } +} diff --git a/beacon_node/eth2-libp2p/tests/rpc_tests.rs b/beacon_node/eth2_libp2p/tests/rpc_tests.rs similarity index 100% rename from beacon_node/eth2-libp2p/tests/rpc_tests.rs rename to beacon_node/eth2_libp2p/tests/rpc_tests.rs diff --git a/beacon_node/network/Cargo.toml b/beacon_node/network/Cargo.toml index 36d0a5e84..e548fa7a9 100644 --- a/beacon_node/network/Cargo.toml +++ b/beacon_node/network/Cargo.toml @@ -15,7 +15,7 @@ exit-future = "0.2.0" [dependencies] beacon_chain = { path = "../beacon_chain" } store = { path = "../store" } -eth2-libp2p = { path = "../eth2-libp2p" } +eth2_libp2p = { path = "../eth2_libp2p" } hashset_delay = { path = "../../common/hashset_delay" } rest_types = { path = "../../common/rest_types" } types = { path = "../../consensus/types" } diff --git a/beacon_node/network/src/attestation_service/mod.rs b/beacon_node/network/src/attestation_service/mod.rs index 9bdf795c8..7ed904df4 100644 --- a/beacon_node/network/src/attestation_service/mod.rs +++ b/beacon_node/network/src/attestation_service/mod.rs @@ -71,17 +71,20 @@ impl PartialEq for AttServiceMessage { subnet_id: other_subnet_id, min_ttl: other_min_ttl, }, - ) => match (min_ttl, other_min_ttl) { - (Some(min_ttl_instant), Some(other_min_ttl_instant)) => { - min_ttl_instant.saturating_duration_since(other_min_ttl_instant) - < DURATION_DIFFERENCE - && other_min_ttl_instant.saturating_duration_since(min_ttl_instant) - < DURATION_DIFFERENCE - && subnet_id == other_subnet_id - } - (None, None) => subnet_id == other_subnet_id, - _ => false, - }, + ) => { + subnet_id == other_subnet_id + && match (min_ttl, other_min_ttl) { + (Some(min_ttl_instant), Some(other_min_ttl_instant)) => { + min_ttl_instant.saturating_duration_since(other_min_ttl_instant) + < DURATION_DIFFERENCE + && other_min_ttl_instant.saturating_duration_since(min_ttl_instant) + < DURATION_DIFFERENCE + } + (None, None) => true, + (None, Some(_)) => 
true, + (Some(_), None) => true, + } + } _ => false, } } @@ -362,12 +365,12 @@ impl AttestationService { *other_min_ttl = min_ttl; } } - (None, Some(_)) => { - // Update the min_ttl to None, because the new message is longer-lived. - *other_min_ttl = None; + (None, Some(_)) => {} // Keep the current one as it has an actual min_ttl + (Some(min_ttl), None) => { + // Update the request to include a min_ttl. + *other_min_ttl = Some(min_ttl); } - (Some(_), None) => {} // Don't replace this because the existing message is for a longer-lived peer. - (None, None) => {} // Duplicate message, do nothing. + (None, None) => {} // Duplicate message, do nothing. } is_duplicate = true; return; diff --git a/beacon_node/network/src/service.rs b/beacon_node/network/src/service.rs index a421379f9..383a4d276 100644 --- a/beacon_node/network/src/service.rs +++ b/beacon_node/network/src/service.rs @@ -11,7 +11,7 @@ use eth2_libp2p::{ rpc::{RPCResponseErrorCode, RequestId}, Libp2pEvent, PeerRequestId, PubsubMessage, Request, Response, }; -use eth2_libp2p::{BehaviourEvent, Enr, MessageId, NetworkGlobals, PeerId}; +use eth2_libp2p::{BehaviourEvent, MessageId, NetworkGlobals, PeerId}; use futures::prelude::*; use rest_types::ValidatorSubscription; use slog::{debug, error, info, o, trace, warn}; @@ -48,8 +48,6 @@ pub struct NetworkService { next_fork_update: Option, /// The logger for the network service. log: slog::Logger, - /// A probability of propagation. - propagation_percentage: Option, } impl NetworkService { @@ -67,8 +65,6 @@ impl NetworkService { // get a reference to the beacon chain store let store = beacon_chain.store.clone(); - let propagation_percentage = config.propagation_percentage; - // build the current enr_fork_id for adding to our local ENR let enr_fork_id = beacon_chain.enr_fork_id(); @@ -79,8 +75,14 @@ impl NetworkService { let (network_globals, mut libp2p) = LibP2PService::new(executor.clone(), config, enr_fork_id, &network_log)?; - for enr in load_dht::(store.clone()) { - libp2p.swarm.add_enr(enr); + // Repopulate the DHT with stored ENR's. 
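        // (These are the records saved by the "Persisting DHT to store" branch of the exit
        // handler further down, so a restarted node can rejoin the DHT without relying solely
        // on its configured boot nodes.)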
+ let enrs_to_load = load_dht::(store.clone()); + debug!( + network_log, + "Loading peers into the routing table"; "peers" => enrs_to_load.len() + ); + for enr in enrs_to_load { + libp2p.swarm.add_enr(enr.clone()); } // launch derived network services @@ -110,7 +112,6 @@ impl NetworkService { network_globals: network_globals.clone(), next_fork_update, log: network_log, - propagation_percentage, }; spawn_service(executor, network_service)?; @@ -136,7 +137,7 @@ fn spawn_service( // handle network shutdown _ = (&mut exit_rx) => { // network thread is terminating - let enrs: Vec = service.libp2p.swarm.enr_entries().cloned().collect(); + let enrs = service.libp2p.swarm.enr_entries(); debug!( service.log, "Persisting DHT to store"; @@ -174,20 +175,6 @@ fn spawn_service( propagation_source, message_id, } => { - // TODO: Remove this for mainnet - // randomly prevents propagation - let mut should_send = true; - if let Some(percentage) = service.propagation_percentage { - // not exact percentage but close enough - let rand = rand::random::() % 100; - if rand > percentage { - // don't propagate - should_send = false; - } - } - if !should_send { - info!(service.log, "Random filter did not propagate message"); - } else { trace!(service.log, "Propagating gossipsub message"; "propagation_peer" => format!("{:?}", propagation_source), "message_id" => message_id.to_string(), @@ -196,23 +183,8 @@ fn spawn_service( .libp2p .swarm .propagate_message(&propagation_source, message_id); - } } NetworkMessage::Publish { messages } => { - // TODO: Remove this for mainnet - // randomly prevents propagation - let mut should_send = true; - if let Some(percentage) = service.propagation_percentage { - // not exact percentage but close enough - let rand = rand::random::() % 100; - if rand > percentage { - // don't propagate - should_send = false; - } - } - if !should_send { - info!(service.log, "Random filter did not publish messages"); - } else { let mut topic_kinds = Vec::new(); for message in &messages { if !topic_kinds.contains(&message.kind()) { @@ -227,7 +199,6 @@ fn spawn_service( ); expose_publish_metrics(&messages); service.libp2p.swarm.publish(messages); - } } NetworkMessage::Disconnect { peer_id } => { service.libp2p.disconnect_and_ban_peer( diff --git a/beacon_node/rest_api/Cargo.toml b/beacon_node/rest_api/Cargo.toml index e226df87f..bd9302f62 100644 --- a/beacon_node/rest_api/Cargo.toml +++ b/beacon_node/rest_api/Cargo.toml @@ -10,7 +10,7 @@ bls = { path = "../../crypto/bls" } rest_types = { path = "../../common/rest_types" } beacon_chain = { path = "../beacon_chain" } network = { path = "../network" } -eth2-libp2p = { path = "../eth2-libp2p" } +eth2_libp2p = { path = "../eth2_libp2p" } store = { path = "../store" } version = { path = "../version" } serde = { version = "1.0.110", features = ["derive"] } diff --git a/beacon_node/rest_api/src/metrics.rs b/beacon_node/rest_api/src/metrics.rs index ac6775623..87b4f6285 100644 --- a/beacon_node/rest_api/src/metrics.rs +++ b/beacon_node/rest_api/src/metrics.rs @@ -104,6 +104,7 @@ pub fn get_prometheus( slot_clock::scrape_for_metrics::(&beacon_chain.slot_clock); store::scrape_for_metrics(&db_path, &freezer_db_path); beacon_chain::scrape_for_metrics(&beacon_chain); + eth2_libp2p::scrape_discovery_metrics(); // This will silently fail if we are unable to observe the health. This is desired behaviour // since we don't support `Health` for all platforms. 
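The discovery gauges declared in `eth2_libp2p/src/metrics.rs` above are only populated at scrape time, which is why the REST API's Prometheus handler now calls `scrape_discovery_metrics()` before encoding. A minimal, hypothetical sketch of performing such a scrape outside the REST API, assuming `lighthouse_metrics` registers its metrics into the prometheus crate's default registry:

    use prometheus::{Encoder, TextEncoder};

    fn render_metrics_sketch() -> String {
        // Copy discv5's point-in-time counters into the registered discovery gauges first.
        eth2_libp2p::scrape_discovery_metrics();
        let mut buffer = Vec::new();
        TextEncoder::new()
            .encode(&prometheus::gather(), &mut buffer)
            .expect("encoding gathered metric families should not fail");
        String::from_utf8(buffer).expect("prometheus text output is valid UTF-8")
    }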
diff --git a/beacon_node/src/cli.rs b/beacon_node/src/cli.rs index 4747ec27a..50f635d0d 100644 --- a/beacon_node/src/cli.rs +++ b/beacon_node/src/cli.rs @@ -61,8 +61,8 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> { .takes_value(true), ) .arg( - Arg::with_name("maxpeers") - .long("maxpeers") + Arg::with_name("max_peers") + .long("max-peers") .help("The maximum number of peers.") .default_value("50") .takes_value(true), @@ -125,24 +125,6 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> { without an ENR.") .takes_value(true), ) - .arg( - Arg::with_name("p2p-priv-key") - .long("p2p-priv-key") - .value_name("HEX") - .help("A secp256k1 secret key, represented as ASCII-encoded hex bytes (with or \ - without 0x prefix). Default is either loaded from disk or generated \ - automatically.") - .takes_value(true), - ) - .arg( - Arg::with_name("random-propagation") - .long("random-propagation") - .value_name("INTEGER") - .takes_value(true) - .help("Specifies (as a percentage) the likelihood of propagating blocks and \ - attestations. This should only be used for testing networking elements. The \ - value must like in the range 1-100. Default is 100.") - ) /* REST API related arguments */ .arg( Arg::with_name("http") diff --git a/beacon_node/src/config.rs b/beacon_node/src/config.rs index 5c129196b..f794f8886 100644 --- a/beacon_node/src/config.rs +++ b/beacon_node/src/config.rs @@ -102,7 +102,7 @@ pub fn get_config( client_config.network.listen_address = listen_address; } - if let Some(max_peers_str) = cli_args.value_of("maxpeers") { + if let Some(max_peers_str) = cli_args.value_of("max-peers") { client_config.network.max_peers = max_peers_str .parse::() .map_err(|_| format!("Invalid number of max peers: {}", max_peers_str))?; @@ -208,24 +208,6 @@ pub fn get_config( client_config.network.discv5_config.enr_update = false; } - if let Some(p2p_priv_key) = cli_args.value_of("p2p-priv-key") { - client_config.network.secret_key_hex = Some(p2p_priv_key.to_string()); - } - - // Define a percentage of messages that should be propogated, useful for simulating bad network - // conditions. - // - // WARNING: setting this to anything less than 100 will cause bad behaviour. - if let Some(propagation_percentage_string) = cli_args.value_of("random-propagation") { - let percentage = propagation_percentage_string - .parse::() - .map_err(|_| "Unable to parse the propagation percentage".to_string())?; - if percentage > 100 { - return Err("Propagation percentage greater than 100".to_string()); - } - client_config.network.propagation_percentage = Some(percentage); - } - /* * Http server */ diff --git a/beacon_node/src/lib.rs b/beacon_node/src/lib.rs index d8459b3c3..fdf8855a4 100644 --- a/beacon_node/src/lib.rs +++ b/beacon_node/src/lib.rs @@ -82,6 +82,8 @@ impl ProductionBeaconNode { let db_path = client_config.create_db_path()?; let freezer_db_path_res = client_config.create_freezer_db_path(); + let executor = context.executor.clone(); + let builder = ClientBuilder::new(context.eth_spec_instance.clone()) .runtime_context(context) .chain_spec(spec) @@ -119,6 +121,9 @@ impl ProductionBeaconNode { .system_time_slot_clock()? .tee_event_handler(client_config.websocket_server.clone())?; + // Inject the executor into the discv5 network config. + client_config.network.discv5_config.executor = Some(Box::new(executor)); + let builder = builder .build_beacon_chain()? .network(&mut client_config.network)? 
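The files that follow add the standalone `boot_node` crate: a process that runs only the discv5 service so operators can provide a bootstrap point without the attack surface of a full beacon node. A rough, hypothetical sketch of the server loop it implies, reusing only discv5 calls that already appear in this diff (`Discv5::new`, `add_enr`, `start`) together with the `BootNodeConfig` defined below:

    use discv5::{Discv5, Discv5ConfigBuilder};
    use std::time::Duration;

    async fn run_boot_node(config: crate::config::BootNodeConfig) -> Result<(), String> {
        // Default discv5 parameters, purely for illustration; the real server can expose
        // DOS-protection knobs via the CLI (see the TODO in cli.rs below).
        let discv5_config = Discv5ConfigBuilder::new().build();
        let mut discv5 = Discv5::new(config.local_enr, config.local_key, discv5_config)
            .map_err(|e| format!("Discv5 service failed: {:?}", e))?;
        // Seed the routing table with any ENRs supplied on the command line.
        for enr in config.boot_nodes {
            let _ = discv5.add_enr(enr);
        }
        // Bind the UDP socket; discv5 answers FINDNODE queries from joining nodes internally.
        discv5.start(config.listen_socket);
        // Nothing else to do: keep the process alive while discv5 serves its routing table.
        loop {
            tokio::time::delay_for(Duration::from_secs(60)).await;
        }
    }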
diff --git a/boot_node/Cargo.toml b/boot_node/Cargo.toml new file mode 100644 index 000000000..90f9f3840 --- /dev/null +++ b/boot_node/Cargo.toml @@ -0,0 +1,20 @@ +[package] +name = "boot_node" +version = "0.1.0" +authors = ["Age Manning "] +edition = "2018" + +[dependencies] +clap = "2.33.0" +eth2_libp2p = { path = "../beacon_node/eth2_libp2p" } +slog = "2.5.2" +sloggers = "1.0.1" +tokio = "0.2.21" +log = "0.4.8" +slog-term = "2.6.0" +logging = { path = "../common/logging" } +slog-async = "2.5.0" +slog-scope = "4.3.0" +slog-stdlog = "4.0.0" +futures = "0.3.5" +discv5 = "0.1.0-alpha.5" diff --git a/boot_node/src/cli.rs b/boot_node/src/cli.rs new file mode 100644 index 000000000..ac700b5f1 --- /dev/null +++ b/boot_node/src/cli.rs @@ -0,0 +1,60 @@ +//! Simple logic for spawning a Lighthouse BootNode. + +use clap::{App, Arg}; + +// TODO: Add DOS prevention CLI params +pub fn cli_app<'a, 'b>() -> App<'a, 'b> { + App::new("boot_node") + .about("Start a special Lighthouse process that only serves as a discv5 boot-node. This + process will *not* import blocks or perform most typical beacon node functions. Instead, it + will simply run the discv5 service and assist nodes on the network to discover each other. + This is the recommended way to provide a network boot-node since it has a reduced attack + surface compared to a full beacon node.") + .settings(&[clap::AppSettings::ColoredHelp]) + .arg( + Arg::with_name("boot-node-enr-address") + .value_name("IP-ADDRESS") + .help("The external IP address/ DNS address to broadcast to other peers on how to reach this node. \ + If a DNS address is provided, the enr-address is set to the IP address it resolves to and \ + does not auto-update based on PONG responses in discovery.") + .required(true) + .takes_value(true), + ) + .arg( + Arg::with_name("port") + .value_name("PORT") + .help("The UDP port to listen on.") + .default_value("9000") + .takes_value(true), + ) + .arg( + Arg::with_name("listen-address") + .long("listen-address") + .value_name("ADDRESS") + .help("The address the bootnode will listen for UDP connections.") + .default_value("0.0.0.0") + .takes_value(true) + ) + .arg( + Arg::with_name("boot-nodes") + .long("boot-nodes") + .allow_hyphen_values(true) + .value_name("ENR-LIST/Multiaddr") + .help("One or more comma-delimited base64-encoded ENR's or multiaddr strings of peers to initially add to the local routing table") + .takes_value(true), + ) + .arg( + Arg::with_name("enr-port") + .long("enr-port") + .value_name("PORT") + .help("The UDP port of the boot node's ENR. This is the port that external peers will dial to reach this boot node. Set this only if the external port differs from the listening port.") + .takes_value(true), + ) + .arg( + Arg::with_name("enable-enr-auto-update") + .short("x") + .long("enable-enr-auto-update") + .help("Discovery can automatically update the node's local ENR with an external IP address and port as seen by other peers on the network. \ + This enables this feature.") + ) +} diff --git a/boot_node/src/config.rs b/boot_node/src/config.rs new file mode 100644 index 000000000..89bb44d4a --- /dev/null +++ b/boot_node/src/config.rs @@ -0,0 +1,102 @@ +use clap::ArgMatches; +use discv5::{enr::CombinedKey, Enr}; +use std::convert::TryFrom; +use std::net::{IpAddr, SocketAddr, ToSocketAddrs}; + +/// A set of configuration parameters for the bootnode, established from CLI arguments. 
+pub struct BootNodeConfig { + pub listen_socket: SocketAddr, + // TODO: Generalise to multiaddr + pub boot_nodes: Vec, + pub local_enr: Enr, + pub local_key: CombinedKey, + pub auto_update: bool, +} + +impl TryFrom<&ArgMatches<'_>> for BootNodeConfig { + type Error = String; + + fn try_from(matches: &ArgMatches<'_>) -> Result { + let listen_address = matches + .value_of("listen-address") + .expect("required parameter") + .parse::() + .map_err(|_| format!("Invalid listening address"))?; + + let listen_port = matches + .value_of("port") + .expect("required parameter") + .parse::() + .map_err(|_| format!("Invalid listening port"))?; + + let boot_nodes = { + if let Some(boot_nodes) = matches.value_of("boot-nodes") { + boot_nodes + .split(',') + .map(|enr| enr.parse().map_err(|_| format!("Invalid ENR: {}", enr))) + .collect::, _>>()? + } else { + Vec::new() + } + }; + + let enr_port = { + if let Some(port) = matches.value_of("boot-node-enr-port") { + port.parse::() + .map_err(|_| format!("Invalid ENR port"))? + } else { + listen_port + } + }; + + let enr_address = { + let address_string = matches + .value_of("boot-node-enr-address") + .expect("required parameter"); + resolve_address(address_string.into(), enr_port)? + }; + + let auto_update = matches.is_present("enable-enr_auto_update"); + + // the address to listen on + let listen_socket = SocketAddr::new(listen_address.into(), enr_port); + + // Generate a new key and build a new ENR + let local_key = CombinedKey::generate_secp256k1(); + let local_enr = discv5::enr::EnrBuilder::new("v4") + .ip(enr_address) + .udp(enr_port) + .build(&local_key) + .map_err(|e| format!("Failed to build ENR: {:?}", e))?; + + Ok(BootNodeConfig { + listen_socket, + boot_nodes, + local_enr, + local_key, + auto_update, + }) + } +} + +/// Resolves an IP/DNS string to an IpAddr. +fn resolve_address(address_string: String, port: u16) -> Result { + match address_string.parse::() { + Ok(addr) => Ok(addr), // valid IpAddr + Err(_) => { + let mut addr = address_string.clone(); + // Appending enr-port to the dns hostname to appease `to_socket_addrs()` parsing. + addr.push_str(&format!(":{}", port.to_string())); + // `to_socket_addr()` does the dns resolution + // Note: `to_socket_addrs()` is a blocking call + addr.to_socket_addrs() + .map(|mut resolved_addrs| + // Pick the first ip from the list of resolved addresses + resolved_addrs + .next() + .map(|a| a.ip()) + .ok_or_else(|| format!("Resolved dns addr contains no entries"))) + .map_err(|_| format!("Failed to parse enr-address: {}", address_string))? + } + } +} diff --git a/boot_node/src/lib.rs b/boot_node/src/lib.rs new file mode 100644 index 000000000..41d1baa80 --- /dev/null +++ b/boot_node/src/lib.rs @@ -0,0 +1,65 @@ +//! Creates a simple DISCV5 server which can be used to bootstrap an Eth2 network. +use clap::ArgMatches; +use slog; +use slog::{o, Drain, Level, Logger}; + +use std::convert::TryFrom; +mod cli; +mod config; +mod server; +pub use cli::cli_app; +use config::BootNodeConfig; + +/// Run the bootnode given the CLI configuration. +pub fn run(matches: &ArgMatches<'_>, debug_level: String) { + let debug_level = match debug_level.as_str() { + "trace" => log::Level::Trace, + "debug" => log::Level::Debug, + "info" => log::Level::Info, + "warn" => log::Level::Warn, + "error" => log::Level::Error, + "crit" => log::Level::Error, + _ => unreachable!(), + }; + + // Setting up the initial logger format and building it. 
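Editor's note on `resolve_address` above, before the rest of `boot_node/src/lib.rs` continues below: it tries to parse the CLI value as a literal IP first and only falls back to blocking DNS resolution via `to_socket_addrs`. A self-contained sketch of that fallback using only the standard library; the hostname and port below are examples.

```rust
use std::net::{IpAddr, ToSocketAddrs};

/// IP-literal-or-DNS resolution, as in `resolve_address`.
fn resolve(address: &str, port: u16) -> Result<IpAddr, String> {
    // A literal IP needs no lookup.
    if let Ok(ip) = address.parse::<IpAddr>() {
        return Ok(ip);
    }
    // Otherwise append the port so `to_socket_addrs` accepts the string, then
    // let it perform a (blocking) DNS query and take the first result.
    let target = format!("{}:{}", address, port);
    target
        .as_str()
        .to_socket_addrs()
        .map_err(|e| format!("Failed to resolve {}: {}", address, e))?
        .next()
        .map(|socket_addr| socket_addr.ip())
        .ok_or_else(|| format!("{} resolved to no addresses", address))
}

fn main() {
    // A literal IP parses directly; a hostname goes through DNS.
    println!("{:?}", resolve("192.0.2.1", 9000));
    println!("{:?}", resolve("localhost", 9000));
}
```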
+ let drain = { + let decorator = slog_term::TermDecorator::new().build(); + let decorator = logging::AlignedTermDecorator::new(decorator, logging::MAX_MESSAGE_WIDTH); + let drain = slog_term::FullFormat::new(decorator).build().fuse(); + slog_async::Async::new(drain).build() + }; + + let drain = match debug_level { + log::Level::Info => drain.filter_level(Level::Info), + log::Level::Debug => drain.filter_level(Level::Debug), + log::Level::Trace => drain.filter_level(Level::Trace), + log::Level::Warn => drain.filter_level(Level::Warning), + log::Level::Error => drain.filter_level(Level::Error), + }; + + let logger = Logger::root(drain.fuse(), o!()); + let _scope_guard = slog_scope::set_global_logger(logger); + let _log_guard = slog_stdlog::init_with_level(debug_level).unwrap(); + + // Run the main function emitting any errors + if let Err(e) = main(matches, slog_scope::logger()) { + slog::crit!(slog_scope::logger(), "{}", e); + } +} + +fn main(matches: &ArgMatches<'_>, log: slog::Logger) -> Result<(), String> { + // Builds a custom executor for the bootnode + let mut runtime = tokio::runtime::Builder::new() + .threaded_scheduler() + .enable_all() + .build() + .map_err(|e| format!("Failed to build runtime: {}", e))?; + + // parse the CLI args into a useable config + let config = BootNodeConfig::try_from(matches)?; + + // Run the boot node + runtime.block_on(server::run(config, log)); + Ok(()) +} diff --git a/boot_node/src/server.rs b/boot_node/src/server.rs new file mode 100644 index 000000000..162b30888 --- /dev/null +++ b/boot_node/src/server.rs @@ -0,0 +1,89 @@ +//! The main bootnode server execution. + +use super::BootNodeConfig; +use discv5::{Discv5, Discv5ConfigBuilder, Discv5Event}; +use eth2_libp2p::EnrExt; +use futures::prelude::*; +use slog::info; + +pub async fn run(config: BootNodeConfig, log: slog::Logger) { + // Print out useful information about the generated ENR + + let enr_socket = config.local_enr.udp_socket().expect("Enr has a UDP socket"); + info!(log, "Configuration parameters"; "listening_address" => format!("{}:{}", config.listen_socket.ip(), config.listen_socket.port()), "broadcast_address" => format!("{}:{}",enr_socket.ip(), enr_socket.port())); + + info!(log, "Identity established"; "peer_id" => config.local_enr.peer_id().to_string(), "node_id" => config.local_enr.node_id().to_string()); + + // build the contactable multiaddr list, adding the p2p protocol + info!(log, "Contact information"; "enr" => config.local_enr.to_base64()); + info!(log, "Contact information"; "multiaddrs" => format!("{:?}", config.local_enr.multiaddr_p2p())); + + // Build the discv5 server + + // default configuration with packet filtering + + let discv5_config = { + let mut builder = Discv5ConfigBuilder::new(); + builder.enable_packet_filter(); + if !config.auto_update { + builder.disable_enr_update(); + } + builder.build() + }; + + // construct the discv5 server + let mut discv5 = Discv5::new(config.local_enr, config.local_key, discv5_config).unwrap(); + + // If there are any bootnodes add them to the routing table + for enr in config.boot_nodes { + info!(log, "Adding bootnode"; "address" => format!("{:?}", enr.udp_socket()), "peer_id" => enr.peer_id().to_string(), "node_id" => enr.node_id().to_string()); + if let Err(e) = discv5.add_enr(enr) { + slog::warn!(log, "Failed adding ENR"; "error" => e.to_string()); + } + } + + // start the server + discv5.start(config.listen_socket); + + // if there are peers in the local routing table, establish a session by running a query + if 
!discv5.table_entries_id().is_empty() { + info!(log, "Executing bootstrap query..."); + let _ = discv5.find_node(discv5::enr::NodeId::random()).await; + } + + // respond with metrics every 10 seconds + let mut metric_interval = tokio::time::interval(tokio::time::Duration::from_secs(10)); + + // get an event stream + let mut event_stream = match discv5.event_stream().await { + Ok(stream) => stream, + Err(e) => { + slog::crit!(log, "Failed to obtain event stream"; "error" => e.to_string()); + return; + } + }; + + // listen for events + loop { + tokio::select! { + _ = metric_interval.next() => { + // display server metrics + let metrics = discv5.metrics(); + info!(log, "Server metrics"; "connected_peers" => discv5.connected_peers(), "active_sessions" => metrics.active_sessions, "requests/s" => format!("{:.2}", metrics.unsolicited_requests_per_second)); + } + Some(event) = event_stream.recv() => { + match event { + Discv5Event::Discovered(_enr) => { + // An ENR has bee obtained by the server + // Ignore these events here + } + Discv5Event::EnrAdded { .. } => {} // Ignore + Discv5Event::NodeInserted { .. } => {} // Ignore + Discv5Event::SocketUpdated(socket_addr) => { + info!(log, "External socket address updated"; "socket_addr" => format!("{:?}", socket_addr)); + } + } + } + } + } +} diff --git a/common/lighthouse_metrics/src/lib.rs b/common/lighthouse_metrics/src/lib.rs index c6314bdb4..a0f59c54b 100644 --- a/common/lighthouse_metrics/src/lib.rs +++ b/common/lighthouse_metrics/src/lib.rs @@ -57,7 +57,8 @@ use prometheus::{HistogramOpts, HistogramTimer, Opts}; pub use prometheus::{ - Encoder, Gauge, Histogram, HistogramVec, IntCounter, IntGauge, IntGaugeVec, Result, TextEncoder, + Encoder, Gauge, GaugeVec, Histogram, HistogramVec, IntCounter, IntGauge, IntGaugeVec, Result, + TextEncoder, }; /// Collect all the metrics for reporting. @@ -127,6 +128,19 @@ pub fn try_create_int_gauge_vec( Ok(counter_vec) } +/// Attempts to crate a `GaugeVec`, returning `Err` if the registry does not accept the gauge +/// (potentially due to naming conflict). +pub fn try_create_float_gauge_vec( + name: &str, + help: &str, + label_names: &[&str], +) -> Result { + let opts = Opts::new(name, help); + let counter_vec = GaugeVec::new(opts, label_names)?; + prometheus::register(Box::new(counter_vec.clone()))?; + Ok(counter_vec) +} + pub fn get_int_gauge(int_gauge_vec: &Result, name: &[&str]) -> Option { if let Ok(int_gauge_vec) = int_gauge_vec { Some(int_gauge_vec.get_metric_with_label_values(name).ok()?) 
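Editor's note: the new `try_create_float_gauge_vec` helper and the boot node's server loop above fit together, since the loop periodically reads `discv5.metrics()` and reports them. The sketch below shows that combined shape only: the metric name is made up, a plain mpsc channel stands in for the discv5 event stream, and it assumes the `prometheus` crate plus tokio with the macros, sync, and time features.

```rust
use prometheus::{GaugeVec, Opts};
use tokio::sync::mpsc;
use tokio::time::{interval, Duration};

#[tokio::main]
async fn main() -> prometheus::Result<()> {
    // Register a float gauge vector with the default registry, as the new
    // helper does.
    let gauge = GaugeVec::new(
        Opts::new("example_requests_per_second", "Illustrative float gauge"),
        &["service"],
    )?;
    prometheus::register(Box::new(gauge.clone()))?;

    // A plain channel standing in for the discv5 event stream.
    let (tx, mut events) = mpsc::channel::<String>(16);
    tokio::spawn(async move {
        let mut tx = tx;
        let _ = tx.send("SocketUpdated(192.0.2.1:9000)".to_string()).await;
    });

    let mut metric_interval = interval(Duration::from_millis(200));
    for tick in 0..3u32 {
        tokio::select! {
            _ = metric_interval.tick() => {
                // The real loop reads discv5.metrics() here; this sets a dummy value.
                gauge.with_label_values(&["boot_node"]).set(f64::from(tick));
            }
            Some(event) = events.recv() => {
                println!("handle event: {}", event);
            }
        }
    }
    Ok(())
}
```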
@@ -177,6 +191,12 @@ pub fn set_gauge(gauge: &Result, value: i64) { } } +pub fn set_float_gauge(gauge: &Result, value: f64) { + if let Ok(gauge) = gauge { + gauge.set(value); + } +} + pub fn inc_gauge(gauge: &Result) { if let Ok(gauge) = gauge { gauge.inc(); @@ -195,12 +215,6 @@ pub fn maybe_set_gauge(gauge: &Result, value_opt: Option) { } } -pub fn set_float_gauge(gauge: &Result, value: f64) { - if let Ok(gauge) = gauge { - gauge.set(value); - } -} - pub fn maybe_set_float_gauge(gauge: &Result, value_opt: Option) { if let Some(value) = value_opt { set_float_gauge(gauge, value) diff --git a/lcli/Cargo.toml b/lcli/Cargo.toml index a87cf88e7..8e64c279c 100644 --- a/lcli/Cargo.toml +++ b/lcli/Cargo.toml @@ -28,7 +28,7 @@ deposit_contract = { path = "../common/deposit_contract" } tree_hash = "0.1.0" tokio = { version = "0.2.21", features = ["full"] } clap_utils = { path = "../common/clap_utils" } -eth2-libp2p = { path = "../beacon_node/eth2-libp2p" } +eth2_libp2p = { path = "../beacon_node/eth2_libp2p" } validator_dir = { path = "../common/validator_dir", features = ["insecure_keys"] } rand = "0.7.2" eth2_keystore = { path = "../crypto/eth2_keystore" } diff --git a/lighthouse/Cargo.toml b/lighthouse/Cargo.toml index f4abcddb6..a3100d94b 100644 --- a/lighthouse/Cargo.toml +++ b/lighthouse/Cargo.toml @@ -19,6 +19,7 @@ logging = { path = "../common/logging" } slog-term = "2.5.0" slog-async = "2.5.0" environment = { path = "./environment" } +boot_node = { path = "../boot_node" } futures = "0.3.5" validator_client = { "path" = "../validator_client" } account_manager = { "path" = "../account_manager" } diff --git a/lighthouse/environment/Cargo.toml b/lighthouse/environment/Cargo.toml index 7d36f7f98..4510b9bba 100644 --- a/lighthouse/environment/Cargo.toml +++ b/lighthouse/environment/Cargo.toml @@ -21,3 +21,4 @@ slog-json = "2.3.0" exit-future = "0.2.0" lazy_static = "1.4.0" lighthouse_metrics = { path = "../../common/lighthouse_metrics" } +discv5 = "0.1.0-alpha.5" diff --git a/lighthouse/environment/src/executor.rs b/lighthouse/environment/src/executor.rs index feb95eeb1..78c9de303 100644 --- a/lighthouse/environment/src/executor.rs +++ b/lighthouse/environment/src/executor.rs @@ -126,3 +126,9 @@ impl TaskExecutor { &self.log } } + +impl discv5::Executor for TaskExecutor { + fn spawn(&self, future: std::pin::Pin + Send>>) { + self.spawn(future, "discv5") + } +} diff --git a/lighthouse/src/main.rs b/lighthouse/src/main.rs index 5d30ae957..ac7b32ffd 100644 --- a/lighthouse/src/main.rs +++ b/lighthouse/src/main.rs @@ -18,13 +18,11 @@ pub const CLIENT_CONFIG_FILENAME: &str = "beacon-node.toml"; pub const ETH2_CONFIG_FILENAME: &str = "eth2-spec.toml"; fn main() { - // Debugging output for libp2p and external crates. - Builder::from_env(Env::default()).init(); - // Parse the CLI parameters. let matches = App::new("Lighthouse") .version(crate_version!()) .author("Sigma Prime ") + .setting(clap::AppSettings::ColoredHelp) .about( "Ethereum 2.0 client by Sigma Prime. 
Provides a full-featured beacon \ node, a validator client and utilities for managing validator accounts.", @@ -40,6 +38,13 @@ fn main() { .global(true) .default_value("mainnet"), ) + .arg( + Arg::with_name("env_log") + .short("l") + .help("Enables environment logging giving access to sub-protocol logs such as discv5 and libp2p", + ) + .takes_value(false), + ) .arg( Arg::with_name("logfile") .long("logfile") @@ -64,6 +69,7 @@ fn main() { .help("The verbosity level for emitting logs.") .takes_value(true) .possible_values(&["info", "debug", "trace", "warn", "error", "crit"]) + .global(true) .default_value("info"), ) .arg( @@ -89,10 +95,27 @@ fn main() { .global(true), ) .subcommand(beacon_node::cli_app()) + .subcommand(boot_node::cli_app()) .subcommand(validator_client::cli_app()) .subcommand(account_manager::cli_app()) .get_matches(); + // boot node subcommand circumvents the environment + if let Some(bootnode_matches) = matches.subcommand_matches("boot_node") { + // The bootnode uses the main debug-level flag + let debug_info = matches + .value_of("debug-level") + .expect("Debug-level must be present") + .into(); + boot_node::run(bootnode_matches, debug_info); + return; + } + + // Debugging output for libp2p and external crates. + if matches.is_present("env_log") { + Builder::from_env(Env::default()).init(); + } + macro_rules! run_with_spec { ($env_builder: expr) => { run($env_builder, &matches)
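Editor's note: the `main.rs` hunk above short-circuits into the `boot_node` subcommand before the normal environment is built, and makes `debug-level` a global flag so the subcommand can reuse it. A minimal sketch of that clap 2.x dispatch shape with illustrative names only (not Lighthouse's real CLI tree):

```rust
use clap::{App, Arg, SubCommand};

fn main() {
    let matches = App::new("dispatch_sketch")
        .arg(
            Arg::with_name("debug-level")
                .long("debug-level")
                .takes_value(true)
                .default_value("info")
                // `global(true)` makes the flag readable regardless of subcommand.
                .global(true),
        )
        .subcommand(SubCommand::with_name("boot_node"))
        .get_matches_from(vec!["dispatch_sketch", "--debug-level", "debug", "boot_node"]);

    // Short-circuit for the boot node, mirroring the early return added to main().
    if matches.subcommand_matches("boot_node").is_some() {
        let debug_level = matches.value_of("debug-level").expect("has a default");
        println!("running only the discv5 boot node at '{}' verbosity", debug_level);
        return;
    }

    println!("running the full client");
}
```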