From 113b40f32144db94f1a4e4a480775dbf627dea26 Mon Sep 17 00:00:00 2001 From: divma Date: Mon, 17 Aug 2020 02:13:26 +0000 Subject: [PATCH] Add multiaddr support in bootnodes (#1481) ## Issue Addressed #1384 Only catch, as currently implemented, when dialing the multiaddr nodes, there is no way to ask the peer manager if they are already connected or dialing --- beacon_node/client/src/builder.rs | 5 +- beacon_node/eth2_libp2p/src/behaviour/mod.rs | 7 +- beacon_node/eth2_libp2p/src/config.rs | 8 ++- beacon_node/eth2_libp2p/src/discovery/mod.rs | 69 ++++++++++++++++--- .../eth2_libp2p/src/peer_manager/mod.rs | 12 ++-- beacon_node/eth2_libp2p/src/service.rs | 19 +++-- beacon_node/eth2_libp2p/tests/common/mod.rs | 28 ++++---- beacon_node/network/src/service.rs | 8 +-- beacon_node/network/src/service/tests.rs | 6 +- beacon_node/src/cli.rs | 4 +- beacon_node/src/config.rs | 30 ++++++-- beacon_node/src/lib.rs | 3 +- testing/simulator/src/local_network.rs | 2 +- 13 files changed, 145 insertions(+), 56 deletions(-) diff --git a/beacon_node/client/src/builder.rs b/beacon_node/client/src/builder.rs index 12645b8a5..94d67b7c3 100644 --- a/beacon_node/client/src/builder.rs +++ b/beacon_node/client/src/builder.rs @@ -232,8 +232,8 @@ where Ok(self) } - /// Immediately starts the networking stack. - pub fn network(mut self, config: &NetworkConfig) -> Result { + /// Starts the networking stack. + pub async fn network(mut self, config: &NetworkConfig) -> Result { let beacon_chain = self .beacon_chain .clone() @@ -246,6 +246,7 @@ where let (network_globals, network_send) = NetworkService::start(beacon_chain, config, context.executor) + .await .map_err(|e| format!("Failed to start network: {:?}", e))?; self.network_globals = Some(network_globals); diff --git a/beacon_node/eth2_libp2p/src/behaviour/mod.rs b/beacon_node/eth2_libp2p/src/behaviour/mod.rs index a76c0aa9d..2cf8188e9 100644 --- a/beacon_node/eth2_libp2p/src/behaviour/mod.rs +++ b/beacon_node/eth2_libp2p/src/behaviour/mod.rs @@ -67,7 +67,7 @@ pub struct Behaviour { /// Implements the combined behaviour for the libp2p service. impl Behaviour { - pub fn new( + pub async fn new( local_key: &Keypair, net_conf: &NetworkConfig, network_globals: Arc>, @@ -106,7 +106,8 @@ impl Behaviour { net_conf.gs_config.clone(), ), identify, - peer_manager: PeerManager::new(local_key, net_conf, network_globals.clone(), log)?, + peer_manager: PeerManager::new(local_key, net_conf, network_globals.clone(), log) + .await?, events: VecDeque::new(), peers_to_dc: VecDeque::new(), meta_data, @@ -123,7 +124,7 @@ impl Behaviour { /// /// All external dials, dial a multiaddr. This is currently unused but kept here in case any /// part of lighthouse needs to connect to a peer_id in the future. - pub fn _dial(&mut self, peer_id: &PeerId) { + pub fn dial(&mut self, peer_id: &PeerId) { self.peer_manager.dial_peer(peer_id); } diff --git a/beacon_node/eth2_libp2p/src/config.rs b/beacon_node/eth2_libp2p/src/config.rs index fa4ec9792..408a1c5c6 100644 --- a/beacon_node/eth2_libp2p/src/config.rs +++ b/beacon_node/eth2_libp2p/src/config.rs @@ -50,7 +50,10 @@ pub struct Config { pub discv5_config: Discv5Config, /// List of nodes to initially connect to. - pub boot_nodes: Vec, + pub boot_nodes_enr: Vec, + + /// List of nodes to initially connect to, on Multiaddr format. + pub boot_nodes_multiaddr: Vec, /// List of libp2p nodes to initially connect to. pub libp2p_nodes: Vec, @@ -136,7 +139,8 @@ impl Default for Config { target_peers: 50, gs_config, discv5_config, - boot_nodes: vec![], + boot_nodes_enr: vec![], + boot_nodes_multiaddr: vec![], libp2p_nodes: vec![], client_version: lighthouse_version::version_with_platform(), disable_discovery: false, diff --git a/beacon_node/eth2_libp2p/src/discovery/mod.rs b/beacon_node/eth2_libp2p/src/discovery/mod.rs index ccc18382a..7982e9351 100644 --- a/beacon_node/eth2_libp2p/src/discovery/mod.rs +++ b/beacon_node/eth2_libp2p/src/discovery/mod.rs @@ -15,7 +15,7 @@ use futures::prelude::*; use futures::stream::FuturesUnordered; use libp2p::core::PeerId; use lru::LruCache; -use slog::{crit, debug, info, warn}; +use slog::{crit, debug, error, info, warn}; use ssz::{Decode, Encode}; use ssz_types::BitVector; use std::{ @@ -163,7 +163,7 @@ pub struct Discovery { impl Discovery { /// NOTE: Creating discovery requires running within a tokio execution environment. - pub fn new( + pub async fn new( local_key: &Keypair, config: &NetworkConfig, network_globals: Arc>, @@ -189,21 +189,23 @@ impl Discovery { .map_err(|e| format!("Discv5 service failed. Error: {:?}", e))?; // Add bootnodes to routing table - for bootnode_enr in config.boot_nodes.clone() { + for bootnode_enr in config.boot_nodes_enr.clone() { debug!( log, "Adding node to routing table"; - "node_id" => format!("{}", bootnode_enr.node_id()), - "peer_id" => format!("{}", bootnode_enr.peer_id()), + "node_id" => bootnode_enr.node_id().to_string(), + "peer_id" => bootnode_enr.peer_id().to_string(), "ip" => format!("{:?}", bootnode_enr.ip()), "udp" => format!("{:?}", bootnode_enr.udp()), "tcp" => format!("{:?}", bootnode_enr.tcp()) ); + let repr = bootnode_enr.to_string(); let _ = discv5.add_enr(bootnode_enr).map_err(|e| { - debug!( + error!( log, "Could not add peer to the local routing table"; - "error" => e.to_string() + "addr" => repr, + "error" => e.to_string(), ) }); } @@ -217,7 +219,54 @@ impl Discovery { EventStream::InActive }; - // Obtain the event stream + if !config.boot_nodes_multiaddr.is_empty() { + info!(log, "Contacting Multiaddr boot-nodes for their ENR"); + } + + // get futures for requesting the Enrs associated to these multiaddr and wait for their + // completion + let mut fut_coll = config + .boot_nodes_multiaddr + .iter() + .map(|addr| addr.to_string()) + // request the ENR for this multiaddr and keep the original for logging + .map(|addr| { + futures::future::join( + discv5.request_enr(addr.clone()), + futures::future::ready(addr), + ) + }) + .collect::>(); + + while let Some((result, original_addr)) = fut_coll.next().await { + match result { + Ok(Some(enr)) => { + debug!( + log, + "Adding node to routing table"; + "node_id" => enr.node_id().to_string(), + "peer_id" => enr.peer_id().to_string(), + "ip" => format!("{:?}", enr.ip()), + "udp" => format!("{:?}", enr.udp()), + "tcp" => format!("{:?}", enr.tcp()) + ); + let _ = discv5.add_enr(enr).map_err(|e| { + error!( + log, + "Could not add peer to the local routing table"; + "addr" => original_addr.to_string(), + "error" => e.to_string(), + ) + }); + } + Ok(None) => { + error!(log, "No ENR found for MultiAddr"; "addr" => original_addr.to_string()) + } + Err(e) => { + error!(log, "Error getting mapping to ENR"; "multiaddr" => original_addr.to_string(), "error" => e.to_string()) + } + } + } Ok(Self { cached_enrs: LruCache::new(50), @@ -733,8 +782,8 @@ impl Discovery { if enr.eth2() == self.local_enr().eth2() { trace!(self.log, "Peer found in process of query"; "peer_id" => format!("{}", enr.peer_id()), "tcp_socket" => enr.tcp_socket()); } else { - // this is temporary warning for debugging the DHT - warn!(self.log, "Found peer during discovery not on correct fork"; "peer_id" => format!("{}", enr.peer_id()), "tcp_socket" => enr.tcp_socket()); + // this is temporary warning for debugging the DHT + warn!(self.log, "Found peer during discovery not on correct fork"; "peer_id" => format!("{}", enr.peer_id()), "tcp_socket" => enr.tcp_socket()); } */ } diff --git a/beacon_node/eth2_libp2p/src/peer_manager/mod.rs b/beacon_node/eth2_libp2p/src/peer_manager/mod.rs index d8cdfb4b2..a23994a25 100644 --- a/beacon_node/eth2_libp2p/src/peer_manager/mod.rs +++ b/beacon_node/eth2_libp2p/src/peer_manager/mod.rs @@ -88,14 +88,14 @@ pub enum PeerManagerEvent { impl PeerManager { // NOTE: Must be run inside a tokio executor. - pub fn new( + pub async fn new( local_key: &Keypair, config: &NetworkConfig, network_globals: Arc>, log: &slog::Logger, ) -> error::Result { // start the discovery service - let mut discovery = Discovery::new(local_key, config, network_globals.clone(), log)?; + let mut discovery = Discovery::new(local_key, config, network_globals.clone(), log).await?; // start searching for peers discovery.discover_peers(); @@ -539,11 +539,8 @@ impl PeerManager { /// /// This is called by `connect_ingoing` and `connect_outgoing`. /// - /// This informs if the peer was accepted in to the db or not. + /// Informs if the peer was accepted in to the db or not. fn connect_peer(&mut self, peer_id: &PeerId, connection: ConnectingType) -> bool { - // TODO: remove after timed updates - //self.update_reputations(); - { let mut peerdb = self.network_globals.peers.write(); if peerdb.connection_status(peer_id).map(|c| c.is_banned()) == Some(true) { @@ -690,7 +687,8 @@ impl PeerManager { /// NOTE: Discovery will only add a new query if one isn't already queued. fn heartbeat(&mut self) { // TODO: Provide a back-off time for discovery queries. I.e Queue many initially, then only - // perform discoveries over a larger fixed interval. Perhaps one every 6 heartbeats + // perform discoveries over a larger fixed interval. Perhaps one every 6 heartbeats. This + // is achievable with a leaky bucket let peer_count = self.network_globals.connected_or_dialing_peers(); if peer_count < self.target_peers { // If we need more peers, queue a discovery lookup. diff --git a/beacon_node/eth2_libp2p/src/service.rs b/beacon_node/eth2_libp2p/src/service.rs index cee0f45d5..32081a88d 100644 --- a/beacon_node/eth2_libp2p/src/service.rs +++ b/beacon_node/eth2_libp2p/src/service.rs @@ -51,7 +51,7 @@ pub struct Service { } impl Service { - pub fn new( + pub async fn new( executor: environment::TaskExecutor, config: &NetworkConfig, enr_fork_id: EnrForkId, @@ -76,7 +76,7 @@ impl Service { &log, )); - info!(log, "Libp2p Service"; "peer_id" => format!("{:?}", enr.peer_id())); + info!(log, "Libp2p Service"; "peer_id" => enr.peer_id().to_string()); let discovery_string = if config.disable_discovery { "None".into() } else { @@ -89,7 +89,8 @@ impl Service { let transport = build_transport(local_keypair.clone()) .map_err(|e| format!("Failed to build transport: {:?}", e))?; // Lighthouse network behaviour - let behaviour = Behaviour::new(&local_keypair, config, network_globals.clone(), &log)?; + let behaviour = + Behaviour::new(&local_keypair, config, network_globals.clone(), &log).await?; // use the executor for libp2p struct Executor(environment::TaskExecutor); @@ -151,7 +152,7 @@ impl Service { } // attempt to connect to any specified boot-nodes - let mut boot_nodes = config.boot_nodes.clone(); + let mut boot_nodes = config.boot_nodes_enr.clone(); boot_nodes.dedup(); for bootnode_enr in boot_nodes { @@ -172,6 +173,16 @@ impl Service { } } + for multiaddr in &config.boot_nodes_multiaddr { + // check TCP support for dialing + if multiaddr + .iter() + .any(|proto| matches!(proto, Protocol::Tcp(_))) + { + dial_addr(multiaddr.clone()); + } + } + let mut subscribed_topics: Vec = vec![]; for topic_kind in &config.topics { if swarm.subscribe_kind(topic_kind.clone()) { diff --git a/beacon_node/eth2_libp2p/tests/common/mod.rs b/beacon_node/eth2_libp2p/tests/common/mod.rs index 50ff284c2..d76c0aa92 100644 --- a/beacon_node/eth2_libp2p/tests/common/mod.rs +++ b/beacon_node/eth2_libp2p/tests/common/mod.rs @@ -80,7 +80,7 @@ pub fn build_config(port: u16, mut boot_nodes: Vec) -> NetworkConfig { config.enr_tcp_port = Some(port); config.enr_udp_port = Some(port); config.enr_address = Some("127.0.0.1".parse().unwrap()); - config.boot_nodes.append(&mut boot_nodes); + config.boot_nodes_enr.append(&mut boot_nodes); config.network_dir = path.into_path(); // Reduce gossipsub heartbeat parameters config.gs_config.heartbeat_initial_delay = Duration::from_millis(500); @@ -88,7 +88,7 @@ pub fn build_config(port: u16, mut boot_nodes: Vec) -> NetworkConfig { config } -pub fn build_libp2p_instance(boot_nodes: Vec, log: slog::Logger) -> Libp2pInstance { +pub async fn build_libp2p_instance(boot_nodes: Vec, log: slog::Logger) -> Libp2pInstance { let port = unused_port("tcp").unwrap(); let config = build_config(port, boot_nodes); // launch libp2p service @@ -98,6 +98,7 @@ pub fn build_libp2p_instance(boot_nodes: Vec, log: slog::Logger) -> Libp2pI environment::TaskExecutor::new(tokio::runtime::Handle::current(), exit, log.clone()); Libp2pInstance( LibP2PService::new(executor, &config, EnrForkId::default(), &log) + .await .expect("should build libp2p instance") .1, signal, @@ -112,10 +113,11 @@ pub fn get_enr(node: &LibP2PService) -> Enr { // Returns `n` libp2p peers in fully connected topology. #[allow(dead_code)] -pub fn build_full_mesh(log: slog::Logger, n: usize) -> Vec { - let mut nodes: Vec<_> = (0..n) - .map(|_| build_libp2p_instance(vec![], log.clone())) - .collect(); +pub async fn build_full_mesh(log: slog::Logger, n: usize) -> Vec { + let mut nodes = Vec::with_capacity(n); + for _ in 0..n { + nodes.push(build_libp2p_instance(vec![], log.clone()).await); + } let multiaddrs: Vec = nodes .iter() .map(|x| get_enr(&x).multiaddr()[1].clone()) @@ -141,8 +143,8 @@ pub async fn build_node_pair(log: &slog::Logger) -> (Libp2pInstance, Libp2pInsta let sender_log = log.new(o!("who" => "sender")); let receiver_log = log.new(o!("who" => "receiver")); - let mut sender = build_libp2p_instance(vec![], sender_log); - let mut receiver = build_libp2p_instance(vec![], receiver_log); + let mut sender = build_libp2p_instance(vec![], sender_log).await; + let mut receiver = build_libp2p_instance(vec![], receiver_log).await; let receiver_multiaddr = receiver.swarm.local_enr().multiaddr()[1].clone(); @@ -181,10 +183,12 @@ pub async fn build_node_pair(log: &slog::Logger) -> (Libp2pInstance, Libp2pInsta // Returns `n` peers in a linear topology #[allow(dead_code)] -pub fn build_linear(log: slog::Logger, n: usize) -> Vec { - let mut nodes: Vec<_> = (0..n) - .map(|_| build_libp2p_instance(vec![], log.clone())) - .collect(); +pub async fn build_linear(log: slog::Logger, n: usize) -> Vec { + let mut nodes = Vec::with_capacity(n); + for _ in 0..n { + nodes.push(build_libp2p_instance(vec![], log.clone()).await); + } + let multiaddrs: Vec = nodes .iter() .map(|x| get_enr(&x).multiaddr()[1].clone()) diff --git a/beacon_node/network/src/service.rs b/beacon_node/network/src/service.rs index 7409a89de..f85953e1b 100644 --- a/beacon_node/network/src/service.rs +++ b/beacon_node/network/src/service.rs @@ -95,7 +95,7 @@ pub struct NetworkService { impl NetworkService { #[allow(clippy::type_complexity)] - pub fn start( + pub async fn start( beacon_chain: Arc>, config: &NetworkConfig, executor: environment::TaskExecutor, @@ -117,7 +117,7 @@ impl NetworkService { // launch libp2p service let (network_globals, mut libp2p) = - LibP2PService::new(executor.clone(), config, enr_fork_id, &network_log)?; + LibP2PService::new(executor.clone(), config, enr_fork_id, &network_log).await?; // Repopulate the DHT with stored ENR's. let enrs_to_load = load_dht::(store.clone()); @@ -126,7 +126,7 @@ impl NetworkService { "Loading peers into the routing table"; "peers" => enrs_to_load.len() ); for enr in enrs_to_load { - libp2p.swarm.add_enr(enr.clone()); + libp2p.swarm.add_enr(enr.clone()); //TODO change? } // launch derived network services @@ -145,7 +145,7 @@ impl NetworkService { AttestationService::new(beacon_chain.clone(), network_globals.clone(), &network_log); // create the network service and spawn the task - let network_log = network_log.new(o!("service"=> "network")); + let network_log = network_log.new(o!("service" => "network")); let network_service = NetworkService { beacon_chain, libp2p, diff --git a/beacon_node/network/src/service/tests.rs b/beacon_node/network/src/service/tests.rs index b27933924..af8e1ddde 100644 --- a/beacon_node/network/src/service/tests.rs +++ b/beacon_node/network/src/service/tests.rs @@ -45,12 +45,14 @@ mod tests { let mut config = NetworkConfig::default(); config.libp2p_port = 21212; config.discovery_port = 21212; - config.boot_nodes = enrs.clone(); + config.boot_nodes_enr = enrs.clone(); runtime.spawn(async move { // Create a new network service which implicitly gets dropped at the // end of the block. - let _ = NetworkService::start(beacon_chain.clone(), &config, executor).unwrap(); + let _ = NetworkService::start(beacon_chain.clone(), &config, executor) + .await + .unwrap(); drop(signal); }); runtime.shutdown_timeout(tokio::time::Duration::from_millis(300)); diff --git a/beacon_node/src/cli.rs b/beacon_node/src/cli.rs index 0bd1e4da2..152f84c44 100644 --- a/beacon_node/src/cli.rs +++ b/beacon_node/src/cli.rs @@ -71,8 +71,8 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> { Arg::with_name("boot-nodes") .long("boot-nodes") .allow_hyphen_values(true) - .value_name("ENR-LIST") - .help("One or more comma-delimited base64-encoded ENR's to bootstrap the p2p network.") + .value_name("ENR/MULTIADDR LIST") + .help("One or more comma-delimited base64-encoded ENR's to bootstrap the p2p network. Multiaddr is also supported.") .takes_value(true), ) .arg( diff --git a/beacon_node/src/config.rs b/beacon_node/src/config.rs index c917d572f..bef56e36e 100644 --- a/beacon_node/src/config.rs +++ b/beacon_node/src/config.rs @@ -2,7 +2,7 @@ use beacon_chain::builder::PUBKEY_CACHE_FILENAME; use clap::ArgMatches; use clap_utils::BAD_TESTNET_DIR_MESSAGE; use client::{config::DEFAULT_DATADIR, ClientConfig, ClientGenesis}; -use eth2_libp2p::{Enr, Multiaddr}; +use eth2_libp2p::{multiaddr::Protocol, Enr, Multiaddr}; use eth2_testnet_config::Eth2TestnetConfig; use slog::{crit, info, Logger}; use ssz::Encode; @@ -111,10 +111,28 @@ pub fn get_config( } if let Some(boot_enr_str) = cli_args.value_of("boot-nodes") { - client_config.network.boot_nodes = boot_enr_str - .split(',') - .map(|enr| enr.parse().map_err(|_| format!("Invalid ENR: {}", enr))) - .collect::, _>>()?; + let mut enrs: Vec = vec![]; + let mut multiaddrs: Vec = vec![]; + for addr in boot_enr_str.split(',') { + match addr.parse() { + Ok(enr) => enrs.push(enr), + Err(_) => { + // parsing as ENR failed, try as Multiaddr + let multi: Multiaddr = addr + .parse() + .map_err(|_| format!("Not valid as ENR nor Multiaddr: {}", addr))?; + if !multi.iter().any(|proto| matches!(proto, Protocol::Udp(_))) { + slog::error!(log, "Missing UDP in Multiaddr {}", multi.to_string()); + } + if !multi.iter().any(|proto| matches!(proto, Protocol::P2p(_))) { + slog::error!(log, "Missing P2P in Multiaddr {}", multi.to_string()); + } + multiaddrs.push(multi); + } + } + } + client_config.network.boot_nodes_enr = enrs; + client_config.network.boot_nodes_multiaddr = multiaddrs; } if let Some(libp2p_addresses_str) = cli_args.value_of("libp2p-addresses") { @@ -337,7 +355,7 @@ pub fn get_config( client_config.eth1.follow_distance = spec.eth1_follow_distance; if let Some(mut boot_nodes) = eth2_testnet_config.boot_enr { - client_config.network.boot_nodes.append(&mut boot_nodes) + client_config.network.boot_nodes_enr.append(&mut boot_nodes) } if let Some(genesis_state) = eth2_testnet_config.genesis_state { diff --git a/beacon_node/src/lib.rs b/beacon_node/src/lib.rs index 02108c13a..c3bce447a 100644 --- a/beacon_node/src/lib.rs +++ b/beacon_node/src/lib.rs @@ -126,7 +126,8 @@ impl ProductionBeaconNode { let builder = builder .build_beacon_chain()? - .network(&client_config.network)? + .network(&client_config.network) + .await? .notifier()?; let builder = if client_config.rest_api.enabled { diff --git a/testing/simulator/src/local_network.rs b/testing/simulator/src/local_network.rs index b2421bcef..37ce3ab56 100644 --- a/testing/simulator/src/local_network.rs +++ b/testing/simulator/src/local_network.rs @@ -86,7 +86,7 @@ impl LocalNetwork { let boot_node = read_lock.first().expect("should have at least one node"); - beacon_config.network.boot_nodes.push( + beacon_config.network.boot_nodes_enr.push( boot_node .client .enr()