From 44c9058477792d1fc15e6e0d68b44464d4a55d81 Mon Sep 17 00:00:00 2001 From: Age Manning Date: Tue, 25 Jun 2019 14:51:45 +1000 Subject: [PATCH] Integrate discv5 into lighthouse --- beacon_node/client/Cargo.toml | 2 +- beacon_node/client/src/client_config.rs | 71 ------- beacon_node/client/src/lib.rs | 4 +- beacon_node/eth2-libp2p/Cargo.toml | 3 +- beacon_node/eth2-libp2p/src/behaviour.rs | 116 ++++------- beacon_node/eth2-libp2p/src/config.rs | 143 ++++++------- beacon_node/eth2-libp2p/src/discovery.rs | 246 ++++++++++++++++------- beacon_node/eth2-libp2p/src/lib.rs | 4 +- beacon_node/eth2-libp2p/src/service.rs | 46 +---- beacon_node/network/src/lib.rs | 2 +- beacon_node/src/main.rs | 24 ++- beacon_node/src/run.rs | 2 +- 12 files changed, 312 insertions(+), 351 deletions(-) delete mode 100644 beacon_node/client/src/client_config.rs diff --git a/beacon_node/client/Cargo.toml b/beacon_node/client/Cargo.toml index 7c5a67b89..94a529ea7 100644 --- a/beacon_node/client/Cargo.toml +++ b/beacon_node/client/Cargo.toml @@ -16,7 +16,7 @@ types = { path = "../../eth2/types" } tree_hash = { path = "../../eth2/utils/tree_hash" } eth2_config = { path = "../../eth2/utils/eth2_config" } slot_clock = { path = "../../eth2/utils/slot_clock" } -serde = "1.0" +serde = "1.0.93" serde_derive = "1.0" error-chain = "0.12.0" eth2_ssz = { path = "../../eth2/utils/ssz" } diff --git a/beacon_node/client/src/client_config.rs b/beacon_node/client/src/client_config.rs deleted file mode 100644 index f2f356daf..000000000 --- a/beacon_node/client/src/client_config.rs +++ /dev/null @@ -1,71 +0,0 @@ -use clap::ArgMatches; -use eth2_libp2p::multiaddr::Protocol; -use eth2_libp2p::Multiaddr; -use fork_choice::ForkChoiceAlgorithm; -use http_server::HttpServerConfig; -use network::{ChainType, NetworkConfig}; -use serde_derive::{Deserialize, Serialize}; -use slog::{error, o, Drain, Level}; -use std::fs; -use std::path::PathBuf; - -/// The core configuration of a Lighthouse beacon node. -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct ClientConfig { - pub data_dir: PathBuf, - pub db_type: String, - db_name: String, - pub network: network::NetworkConfig, - pub rpc: rpc::RPCConfig, - pub http: HttpServerConfig, -} - -impl Default for ClientConfig { - fn default() -> Self { - Self { - data_dir: PathBuf::from(".lighthouse"), - db_type: "disk".to_string(), - db_name: "chain_db".to_string(), - // Note: there are no default bootnodes specified. - // Once bootnodes are established, add them here. - network: NetworkConfig::new(), - rpc: rpc::RPCConfig::default(), - http: HttpServerConfig::default(), - } - } -} - -impl ClientConfig { - /// Returns the path to which the client may initialize an on-disk database. - pub fn db_path(&self) -> Option { - self.data_dir() - .and_then(|path| Some(path.join(&self.db_name))) - } - - /// Returns the core path for the client. - pub fn data_dir(&self) -> Option { - let path = dirs::home_dir()?.join(&self.data_dir); - fs::create_dir_all(&path).ok()?; - Some(path) - } - - /// Apply the following arguments to `self`, replacing values if they are specified in `args`. - /// - /// Returns an error if arguments are obviously invalid. May succeed even if some values are - /// invalid. - pub fn apply_cli_args(&mut self, args: &ArgMatches) -> Result<(), &'static str> { - if let Some(dir) = args.value_of("datadir") { - self.data_dir = PathBuf::from(dir); - }; - - if let Some(dir) = args.value_of("db") { - self.db_type = dir.to_string(); - } - - self.network.apply_cli_args(args)?; - self.rpc.apply_cli_args(args)?; - self.http.apply_cli_args(args)?; - - Ok(()) - } -} diff --git a/beacon_node/client/src/lib.rs b/beacon_node/client/src/lib.rs index 18ddef7bb..7eee8ac0a 100644 --- a/beacon_node/client/src/lib.rs +++ b/beacon_node/client/src/lib.rs @@ -1,7 +1,7 @@ extern crate slog; mod beacon_chain_types; -mod client_config; +mod config; pub mod error; pub mod notifier; @@ -21,7 +21,7 @@ use tokio::timer::Interval; pub use beacon_chain::BeaconChainTypes; pub use beacon_chain_types::ClientType; pub use beacon_chain_types::InitialiseBeaconChain; -pub use client_config::ClientConfig; +pub use config::Config as ClientConfig; pub use eth2_config::Eth2Config; /// Main beacon node client service. This provides the connection and initialisation of the clients diff --git a/beacon_node/eth2-libp2p/Cargo.toml b/beacon_node/eth2-libp2p/Cargo.toml index 6fd141028..fd7162767 100644 --- a/beacon_node/eth2-libp2p/Cargo.toml +++ b/beacon_node/eth2-libp2p/Cargo.toml @@ -8,7 +8,8 @@ edition = "2018" beacon_chain = { path = "../beacon_chain" } clap = "2.32.0" # SigP repository -libp2p = { git = "https://github.com/SigP/rust-libp2p", rev = "71744d4090ebd93a993d1b390787919add4098fd" } +libp2p = { git = "https://github.com/SigP/rust-libp2p", rev = "f018f5c443ed5a93de890048dbc6755393373e72" } +enr = { git = "https://github.com/SigP/rust-libp2p/", rev = "f018f5c443ed5a93de890048dbc6755393373e72", features = ["serde"] } types = { path = "../../eth2/types" } serde = "1.0" serde_derive = "1.0" diff --git a/beacon_node/eth2-libp2p/src/behaviour.rs b/beacon_node/eth2-libp2p/src/behaviour.rs index c711a2134..4e4cf24f3 100644 --- a/beacon_node/eth2-libp2p/src/behaviour.rs +++ b/beacon_node/eth2-libp2p/src/behaviour.rs @@ -1,26 +1,23 @@ use crate::discovery::Discovery; use crate::rpc::{RPCEvent, RPCMessage, Rpc}; -use crate::NetworkConfig; +use crate::{error, NetworkConfig}; use crate::{Topic, TopicHash}; use futures::prelude::*; use libp2p::{ core::{ + identity::Keypair, swarm::{NetworkBehaviourAction, NetworkBehaviourEventProcess}, - PublicKey, }, + discv5::Discv5Event, gossipsub::{Gossipsub, GossipsubEvent}, - identify::{protocol::IdentifyInfo, Identify, IdentifyEvent}, - kad::KademliaOut, ping::{Ping, PingConfig, PingEvent}, tokio_io::{AsyncRead, AsyncWrite}, NetworkBehaviour, PeerId, }; -use slog::{debug, o, trace, warn}; +use slog::{o, trace, warn}; use ssz::{ssz_encode, Decode, DecodeError, Encode}; -use std::collections::HashMap; use std::num::NonZeroU32; -use std::time::{Duration, Instant}; -use tokio_timer::Delay; +use std::time::Duration; use types::{Attestation, BeaconBlock}; /// Builds the network behaviour that manages the core protocols of eth2. @@ -33,8 +30,6 @@ pub struct Behaviour { gossipsub: Gossipsub, /// The serenity RPC specified in the wire-0 protocol. serenity_rpc: Rpc, - /// Allows discovery of IP addresses for peers on the network. - identify: Identify, /// Keep regular connection to peers and disconnect if absent. ping: Ping, /// Kademlia for peer discovery. @@ -47,6 +42,31 @@ pub struct Behaviour { log: slog::Logger, } +impl Behaviour { + pub fn new( + local_key: &Keypair, + net_conf: &NetworkConfig, + log: &slog::Logger, + ) -> error::Result { + let local_peer_id = local_key.public().clone().into_peer_id(); + let behaviour_log = log.new(o!()); + let ping_config = PingConfig::new() + .with_timeout(Duration::from_secs(30)) + .with_interval(Duration::from_secs(20)) + .with_max_failures(NonZeroU32::new(2).expect("2 != 0")) + .with_keep_alive(false); + + Ok(Behaviour { + serenity_rpc: Rpc::new(log), + gossipsub: Gossipsub::new(local_peer_id.clone(), net_conf.gs_config.clone()), + discovery: Discovery::new(local_key, net_conf, log)?, + ping: Ping::new(ping_config), + events: Vec::new(), + log: behaviour_log, + }) + } +} + // Implement the NetworkBehaviourEventProcess trait so that we can derive NetworkBehaviour for Behaviour impl NetworkBehaviourEventProcess for Behaviour @@ -96,38 +116,6 @@ impl NetworkBehaviourEventProcess NetworkBehaviourEventProcess - for Behaviour -{ - fn inject_event(&mut self, event: IdentifyEvent) { - match event { - IdentifyEvent::Identified { - peer_id, mut info, .. - } => { - if info.listen_addrs.len() > 20 { - debug!( - self.log, - "More than 20 peers have been identified, truncating" - ); - info.listen_addrs.truncate(20); - } - trace!(self.log, "Found addresses"; "Peer Id" => format!("{:?}", peer_id), "Addresses" => format!("{:?}", info.listen_addrs)); - // inject the found addresses into our discovery behaviour - - for address in &info.listen_addrs { - self.discovery - .add_connected_address(&peer_id, address.clone()); - } - - self.events - .push(BehaviourEvent::Identified(peer_id, Box::new(info))); - } - IdentifyEvent::Error { .. } => {} - IdentifyEvent::SendBack { .. } => {} - } - } -} - impl NetworkBehaviourEventProcess for Behaviour { @@ -136,41 +124,7 @@ impl NetworkBehaviourEventProcess } } -// implement the discovery behaviour (currently kademlia) -impl NetworkBehaviourEventProcess - for Behaviour -{ - fn inject_event(&mut self, _out: KademliaOut) { - // not interested in kademlia results at the moment - } -} - impl Behaviour { - pub fn new(local_public_key: PublicKey, net_conf: &NetworkConfig, log: &slog::Logger) -> Self { - let local_peer_id = local_public_key.clone().into_peer_id(); - let behaviour_log = log.new(o!()); - let identify_config = net_conf.identify_config.clone(); - let ping_config = PingConfig::new() - .with_timeout(Duration::from_secs(30)) - .with_interval(Duration::from_secs(20)) - .with_max_failures(NonZeroU32::new(2).expect("2 != 0")) - .with_keep_alive(false); - - Behaviour { - serenity_rpc: Rpc::new(log), - gossipsub: Gossipsub::new(local_peer_id.clone(), net_conf.gs_config.clone()), - discovery: Discovery::new(local_peer_id, log), - identify: Identify::new( - identify_config.version, - identify_config.user_agent, - local_public_key, - ), - ping: Ping::new(ping_config), - events: Vec::new(), - log: behaviour_log, - } - } - /// Consumes the events list when polled. fn poll( &mut self, @@ -183,6 +137,14 @@ impl Behaviour { } } +impl NetworkBehaviourEventProcess + for Behaviour +{ + fn inject_event(&mut self, _event: Discv5Event) { + // discv5 has no events to inject + } +} + /// Implements the combined behaviour for the libp2p service. impl Behaviour { /* Pubsub behaviour functions */ @@ -212,8 +174,6 @@ impl Behaviour { pub enum BehaviourEvent { RPC(PeerId, RPCEvent), PeerDialed(PeerId), - Identified(PeerId, Box), - // TODO: This is a stub at the moment GossipMessage { source: PeerId, topics: Vec, diff --git a/beacon_node/eth2-libp2p/src/config.rs b/beacon_node/eth2-libp2p/src/config.rs index baa1bf47f..00a8ed51e 100644 --- a/beacon_node/eth2-libp2p/src/config.rs +++ b/beacon_node/eth2-libp2p/src/config.rs @@ -1,29 +1,44 @@ use clap::ArgMatches; -use libp2p::gossipsub::{GossipsubConfig, GossipsubConfigBuilder}; -use libp2p::multiaddr::{Error as MultiaddrError, Multiaddr}; +use enr::Enr; +use libp2p::{ + gossipsub::{GossipsubConfig, GossipsubConfigBuilder}, + multiaddr::Multiaddr, +}; use serde_derive::{Deserialize, Serialize}; use std::time::Duration; /// The beacon node topic string to subscribe to. -pub const BEACON_PUBSUB_TOPIC: &str = "beacon_node"; -pub const SHARD_TOPIC_PREFIX: &str = "attestations"; // single topic for all attestation for the moment. +pub const BEACON_PUBSUB_TOPIC: &str = "beacon_block"; +pub const BEACON_ATTESTATION_TOPIC: &str = "beacon_attestation"; +//TODO: Implement shard subnets +pub const SHARD_TOPIC_PREFIX: &str = "shard"; #[derive(Clone, Debug, Serialize, Deserialize)] #[serde(default)] /// Network configuration for lighthouse. pub struct Config { /// IP address to listen on. - listen_addresses: Vec, + pub listen_addresses: Vec, + + /// Specifies the IP address that the discovery protocol will listen on. + pub discovery_address: std::net::IpAddr, + + /// UDP port that discovery listens on. + pub discovery_port: u16, + + /// Target number of connected peers. + pub max_peers: usize, + /// Gossipsub configuration parameters. #[serde(skip)] pub gs_config: GossipsubConfig, - /// Configuration parameters for node identification protocol. - #[serde(skip)] - pub identify_config: IdentifyConfig, + /// List of nodes to initially connect to. - boot_nodes: Vec, + pub boot_nodes: Vec, + /// Client version pub client_version: String, + /// List of extra topics to initially subscribe to as strings. pub topics: Vec, } @@ -32,13 +47,16 @@ impl Default for Config { /// Generate a default network configuration. fn default() -> Self { Config { - listen_addresses: vec!["/ip4/127.0.0.1/tcp/9000".to_string()], + listen_addresses: vec!["/ip4/127.0.0.1/tcp/9000".parse().expect("vaild multiaddr")], + discovery_address: "0.0.0.0".parse().expect("valid ip address"), + discovery_port: 9000, + max_peers: 10, + //TODO: Set realistic values for production gs_config: GossipsubConfigBuilder::new() .max_gossip_size(4_000_000) .inactivity_timeout(Duration::from_secs(90)) .heartbeat_interval(Duration::from_secs(20)) .build(), - identify_config: IdentifyConfig::default(), boot_nodes: vec![], client_version: version::version(), topics: Vec::new(), @@ -52,83 +70,42 @@ impl Config { Config::default() } - pub fn listen_addresses(&self) -> Result, MultiaddrError> { - self.listen_addresses.iter().map(|s| s.parse()).collect() - } - - pub fn boot_nodes(&self) -> Result, MultiaddrError> { - self.boot_nodes.iter().map(|s| s.parse()).collect() - } - - pub fn apply_cli_args(&mut self, args: &ArgMatches) -> Result<(), &'static str> { + pub fn apply_cli_args(&mut self, args: &ArgMatches) -> Result<(), String> { if let Some(listen_address_str) = args.value_of("listen-address") { - let listen_addresses = listen_address_str.split(',').map(Into::into).collect(); - self.listen_addresses = listen_addresses; + self.listen_addresses = listen_address_str + .split(',') + .map(|a| { + a.parse::() + .map_err(|_| format!("Invalid Listen address: {:?}", a)) + }) + .collect::, _>>()?; } - if let Some(boot_addresses_str) = args.value_of("boot-nodes") { - let boot_addresses = boot_addresses_str.split(',').map(Into::into).collect(); - self.boot_nodes = boot_addresses; + if let Some(max_peers_str) = args.value_of("maxpeers") { + self.max_peers = max_peers_str + .parse::() + .map_err(|_| format!("Invalid number of max peers: {}", max_peers_str))?; + } + + if let Some(discovery_address_str) = args.value_of("disc-listen-address") { + self.discovery_address = discovery_address_str + .parse::() + .map_err(|_| format!("Invalid discovery address: {:?}", discovery_address_str))?; + } + + if let Some(boot_enr_str) = args.value_of("boot-nodes") { + self.boot_nodes = boot_enr_str + .split(',') + .map(|enr| enr.parse().map_err(|_| format!("Invalid ENR: {}", enr))) + .collect::, _>>()?; + } + + if let Some(disc_port_str) = args.value_of("disc-port") { + self.discovery_port = disc_port_str + .parse::() + .map_err(|_| format!("Invalid discovery port: {}", disc_port_str))?; } Ok(()) } } - -/// The configuration parameters for the Identify protocol -#[derive(Debug, Clone)] -pub struct IdentifyConfig { - /// The protocol version to listen on. - pub version: String, - /// The client's name and version for identification. - pub user_agent: String, -} - -impl Default for IdentifyConfig { - fn default() -> Self { - Self { - version: "/eth/serenity/1.0".to_string(), - user_agent: version::version(), - } - } -} - -/// Creates a standard network config from a chain_id. -/// -/// This creates specified network parameters for each chain type. -impl From for Config { - fn from(chain_type: ChainType) -> Self { - match chain_type { - ChainType::Foundation => Config::default(), - - ChainType::LighthouseTestnet => { - let boot_nodes = vec!["/ip4/127.0.0.1/tcp/9000" - .parse() - .expect("correct multiaddr")]; - Self { - boot_nodes, - ..Config::default() - } - } - - ChainType::Other => Config::default(), - } - } -} - -pub enum ChainType { - Foundation, - LighthouseTestnet, - Other, -} - -/// Maps a chain id to a ChainType. -impl From for ChainType { - fn from(chain_id: u8) -> Self { - match chain_id { - 1 => ChainType::Foundation, - 2 => ChainType::LighthouseTestnet, - _ => ChainType::Other, - } - } -} diff --git a/beacon_node/eth2-libp2p/src/discovery.rs b/beacon_node/eth2-libp2p/src/discovery.rs index c3a02b16f..9a1e75691 100644 --- a/beacon_node/eth2-libp2p/src/discovery.rs +++ b/beacon_node/eth2-libp2p/src/discovery.rs @@ -1,3 +1,4 @@ +use crate::{error, NetworkConfig}; /// This manages the discovery and management of peers. /// /// Currently using Kademlia for peer discovery. @@ -6,66 +7,154 @@ use futures::prelude::*; use libp2p::core::swarm::{ ConnectedPoint, NetworkBehaviour, NetworkBehaviourAction, PollParameters, }; -use libp2p::core::{Multiaddr, PeerId, ProtocolsHandler}; -use libp2p::kad::{Kademlia, KademliaOut}; -use slog::{debug, o, warn}; +use libp2p::core::{identity::Keypair, Multiaddr, PeerId, ProtocolsHandler}; +use libp2p::discv5::{Discv5, Discv5Event}; +use libp2p::enr::{Enr, EnrBuilder, NodeId}; +use libp2p::multiaddr::Protocol; +use slog::{debug, error, info, o, warn}; +use std::collections::HashSet; use std::time::{Duration, Instant}; use tokio::io::{AsyncRead, AsyncWrite}; use tokio_timer::Delay; -//TODO: Make this dynamic -const TIME_BETWEEN_KAD_REQUESTS: Duration = Duration::from_secs(30); +/// Maximum seconds before searching for extra peers. +const MAX_TIME_BETWEEN_PEER_SEARCHES: u64 = 60; -/// Maintains a list of discovered peers and implements the discovery protocol to discover new -/// peers. +/// Lighthouse discovery behaviour. This provides peer management and discovery using the Discv5 +/// libp2p protocol. pub struct Discovery { - /// Queue of events to processed. - // TODO: Re-implement as discovery protocol grows - // events: Vec>, - /// The discovery behaviour used to discover new peers. - discovery: Kademlia, + /// The peers currently connected to libp2p streams. + connected_peers: HashSet, + + /// The target number of connected peers on the libp2p interface. + max_peers: usize, + /// The delay between peer discovery searches. peer_discovery_delay: Delay, + + /// Tracks the last discovery delay. The delay is doubled each round until the max + /// time is reached. + past_discovery_delay: u64, + + /// The TCP port for libp2p. Used to convert an updated IP address to a multiaddr. Note: This + /// assumes that the external TCP port is the same as the internal TCP port if behind a NAT. + //TODO: Improve NAT handling limit the above restriction + tcp_port: u16, + + /// The discovery behaviour used to discover new peers. + discovery: Discv5, + /// Logger for the discovery behaviour. log: slog::Logger, } impl Discovery { - pub fn new(local_peer_id: PeerId, log: &slog::Logger) -> Self { + pub fn new( + local_key: &Keypair, + net_conf: &NetworkConfig, + log: &slog::Logger, + ) -> error::Result { let log = log.new(o!("Service" => "Libp2p-Discovery")); - Self { - discovery: Kademlia::new(local_peer_id), - peer_discovery_delay: Delay::new(Instant::now()), - log, + + // Build the local ENR. + // The first TCP listening address is used for the ENR record. This will inform our peers to + // connect to this TCP port and establish libp2p streams. + // Note: Discovery should update the ENR record's IP to the external IP as seen by the + // majority of our peers. + let tcp_multiaddr = net_conf + .listen_addresses + .iter() + .filter(|a| { + if let Some(Protocol::Tcp(_)) = a.iter().last() { + true + } else { + false + } + }) + .next() + .ok_or_else(|| "No valid TCP addresses")?; + + let ip: std::net::IpAddr = match tcp_multiaddr.iter().next() { + Some(Protocol::Ip4(ip)) => ip.into(), + Some(Protocol::Ip6(ip)) => ip.into(), + _ => { + error!(log, "Multiaddr has an invalid IP address"); + return Err(format!("Invalid IP Address: {}", tcp_multiaddr).into()); + } + }; + + let tcp_port = match tcp_multiaddr.iter().last() { + Some(Protocol::Tcp(tcp)) => tcp, + _ => unreachable!(), + }; + + let local_enr = EnrBuilder::new() + .ip(ip.into()) + .tcp(tcp_port) + .udp(net_conf.discovery_port) + .build(&local_key) + .map_err(|e| format!("Could not build Local ENR: {:?}", e))?; + info!(log, "Local ENR: {}", local_enr.to_base64()); + + let mut discovery = Discv5::new(local_enr, local_key.clone(), net_conf.discovery_address) + .map_err(|e| format!("Discv5 service failed: {:?}", e))?; + + // Add bootnodes to routing table + for bootnode_enr in net_conf.boot_nodes.clone() { + discovery.add_enr(bootnode_enr); } + + Ok(Self { + connected_peers: HashSet::new(), + max_peers: net_conf.max_peers, + peer_discovery_delay: Delay::new(Instant::now()), + past_discovery_delay: 1, + tcp_port, + discovery, + log, + }) } - /// Uses discovery to search for new peers. - pub fn find_peers(&mut self) { - // pick a random PeerId - let random_peer = PeerId::random(); + /// Manually search for peers. This restarts the discovery round, sparking multiple rapid + /// queries. + pub fn discover_peers(&mut self) { + self.past_discovery_delay = 1; + self.find_peers(); + } + + /// Add an Enr to the routing table of the discovery mechanism. + pub fn add_enr(&mut self, enr: Enr) { + self.discovery.add_enr(enr); + } + + /// Search for new peers using the underlying discovery mechanism. + fn find_peers(&mut self) { + // pick a random NodeId + let random_node = NodeId::random(); debug!(self.log, "Searching for peers..."); - self.discovery.find_node(random_peer); + self.discovery.find_node(random_node); - // update the kademlia timeout + // update the time until next discovery + let delay = { + if self.past_discovery_delay < MAX_TIME_BETWEEN_PEER_SEARCHES { + self.past_discovery_delay *= 2; + self.past_discovery_delay + } else { + MAX_TIME_BETWEEN_PEER_SEARCHES + } + }; self.peer_discovery_delay - .reset(Instant::now() + TIME_BETWEEN_KAD_REQUESTS); - } - - /// We have discovered an address for a peer, add it to known peers. - pub fn add_connected_address(&mut self, peer_id: &PeerId, address: Multiaddr) { - // pass the address on to kademlia - self.discovery.add_address(peer_id, address); + .reset(Instant::now() + Duration::from_secs(delay)); } } -// Redirect all behaviour event to underlying discovery behaviour. +// Redirect all behaviour events to underlying discovery behaviour. impl NetworkBehaviour for Discovery where TSubstream: AsyncRead + AsyncWrite, { - type ProtocolsHandler = as NetworkBehaviour>::ProtocolsHandler; - type OutEvent = as NetworkBehaviour>::OutEvent; + type ProtocolsHandler = as NetworkBehaviour>::ProtocolsHandler; + type OutEvent = as NetworkBehaviour>::OutEvent; fn new_handler(&mut self) -> Self::ProtocolsHandler { NetworkBehaviour::new_handler(&mut self.discovery) @@ -76,25 +165,29 @@ where self.discovery.addresses_of_peer(peer_id) } - fn inject_connected(&mut self, peer_id: PeerId, endpoint: ConnectedPoint) { - NetworkBehaviour::inject_connected(&mut self.discovery, peer_id, endpoint) + fn inject_connected(&mut self, peer_id: PeerId, _endpoint: ConnectedPoint) { + self.connected_peers.insert(peer_id); } - fn inject_disconnected(&mut self, peer_id: &PeerId, endpoint: ConnectedPoint) { - NetworkBehaviour::inject_disconnected(&mut self.discovery, peer_id, endpoint) + fn inject_disconnected(&mut self, peer_id: &PeerId, _endpoint: ConnectedPoint) { + self.connected_peers.remove(peer_id); } - fn inject_replaced(&mut self, peer_id: PeerId, closed: ConnectedPoint, opened: ConnectedPoint) { - NetworkBehaviour::inject_replaced(&mut self.discovery, peer_id, closed, opened) + fn inject_replaced( + &mut self, + _peer_id: PeerId, + _closed: ConnectedPoint, + _opened: ConnectedPoint, + ) { + // discv5 doesn't implement } fn inject_node_event( &mut self, - peer_id: PeerId, - event: ::OutEvent, + _peer_id: PeerId, + _event: ::OutEvent, ) { - // TODO: Upgrade to discv5 - NetworkBehaviour::inject_node_event(&mut self.discovery, peer_id, event) + // discv5 doesn't implement } fn poll( @@ -106,7 +199,7 @@ where Self::OutEvent, >, > { - // check to see if it's time to search for peers + // search of peers if it is time loop { match self.peer_discovery_delay.poll() { Ok(Async::Ready(_)) => { @@ -114,46 +207,49 @@ where } Ok(Async::NotReady) => break, Err(e) => { - warn!( - self.log, - "Error getting peers from discovery behaviour. Err: {:?}", e - ); + warn!(self.log, "Discovery peer search failed: {:?}", e); } } } - // Poll discovery - match self.discovery.poll(params) { - Async::Ready(action) => { - match &action { - NetworkBehaviourAction::GenerateEvent(disc_output) => match disc_output { - KademliaOut::Discovered { - peer_id, addresses, .. - } => { - debug!(self.log, "Kademlia peer discovered"; "Peer"=> format!("{:?}", peer_id), "Addresses" => format!("{:?}", addresses)); - } - KademliaOut::FindNodeResult { closer_peers, .. } => { - debug!( - self.log, - "Kademlia query found {} peers", - closer_peers.len() - ); - debug!(self.log, "Kademlia peers discovered"; "Peer"=> format!("{:?}", closer_peers)); - if closer_peers.is_empty() { - debug!(self.log, "Kademlia random query yielded empty results"); + // Poll discovery + loop { + match self.discovery.poll(params) { + Async::Ready(NetworkBehaviourAction::GenerateEvent(event)) => { + match event { + Discv5Event::Discovered(enr) => { + debug!(self.log, "Discv5: Peer discovered"; "Peer"=> format!("{:?}", enr.peer_id()), "Addresses" => format!("{:?}", enr.multiaddr())); + + let peer_id = enr.peer_id(); + // if we need more peers, attempt a connection + if self.connected_peers.len() < self.max_peers + && self.connected_peers.get(&peer_id).is_none() + { + return Async::Ready(NetworkBehaviourAction::DialPeer { peer_id }); + } + } + Discv5Event::SocketUpdated(socket) => { + info!(self.log, "Address updated"; "IP" => format!("{}",socket.ip())); + let mut address = Multiaddr::from(socket.ip()); + address.push(Protocol::Tcp(self.tcp_port)); + return Async::Ready(NetworkBehaviourAction::ReportObservedAddr { + address, + }); + } + Discv5Event::FindNodeResult { closer_peers, .. } => { + debug!(self.log, "Discv5 query found {} peers", closer_peers.len()); + if closer_peers.is_empty() { + debug!(self.log, "Discv5 random query yielded empty results"); } - return Async::Ready(action); } _ => {} - }, - _ => {} - }; - // propagate result upwards - return Async::Ready(action); + } + } + // discv5 does not output any other NetworkBehaviourAction + Async::Ready(_) => {} + Async::NotReady => break, } - Async::NotReady => (), } - Async::NotReady } } diff --git a/beacon_node/eth2-libp2p/src/lib.rs b/beacon_node/eth2-libp2p/src/lib.rs index 197c074df..7a3b2e632 100644 --- a/beacon_node/eth2-libp2p/src/lib.rs +++ b/beacon_node/eth2-libp2p/src/lib.rs @@ -10,7 +10,9 @@ pub mod rpc; mod service; pub use behaviour::PubsubMessage; -pub use config::{ChainType, Config as NetworkConfig, BEACON_PUBSUB_TOPIC, SHARD_TOPIC_PREFIX}; +pub use config::{ + Config as NetworkConfig, BEACON_ATTESTATION_TOPIC, BEACON_PUBSUB_TOPIC, SHARD_TOPIC_PREFIX, +}; pub use libp2p::floodsub::{Topic, TopicBuilder, TopicHash}; pub use libp2p::multiaddr; pub use libp2p::Multiaddr; diff --git a/beacon_node/eth2-libp2p/src/service.rs b/beacon_node/eth2-libp2p/src/service.rs index 68ca72620..780b1453f 100644 --- a/beacon_node/eth2-libp2p/src/service.rs +++ b/beacon_node/eth2-libp2p/src/service.rs @@ -4,7 +4,7 @@ use crate::multiaddr::Protocol; use crate::rpc::RPCEvent; use crate::NetworkConfig; use crate::{TopicBuilder, TopicHash}; -use crate::{BEACON_PUBSUB_TOPIC, SHARD_TOPIC_PREFIX}; +use crate::{BEACON_ATTESTATION_TOPIC, BEACON_PUBSUB_TOPIC}; use futures::prelude::*; use futures::Stream; use libp2p::core::{ @@ -36,32 +36,24 @@ pub struct Service { impl Service { pub fn new(config: NetworkConfig, log: slog::Logger) -> error::Result { - debug!(log, "Libp2p Service starting"); + debug!(log, "Network-libp2p Service starting"); - // TODO: Currently using secp256k1 key pairs. Wire protocol specifies RSA. Waiting for this - // PR to be merged to generate RSA keys: https://github.com/briansmith/ring/pull/733 // TODO: Save and recover node key from disk + // TODO: Currently using secp256k1 keypairs - currently required for discv5 let local_private_key = identity::Keypair::generate_secp256k1(); - - let local_public_key = local_private_key.public(); let local_peer_id = PeerId::from(local_private_key.public()); info!(log, "Local peer id: {:?}", local_peer_id); let mut swarm = { - // Set up the transport - let transport = build_transport(local_private_key); - // Set up gossipsub routing - let behaviour = Behaviour::new(local_public_key.clone(), &config, &log); - // Set up Topology - let topology = local_peer_id.clone(); - Swarm::new(transport, behaviour, topology) + // Set up the transport - tcp/ws with secio and mplex/yamux + let transport = build_transport(local_private_key.clone()); + // Lighthouse network behaviour + let behaviour = Behaviour::new(&local_private_key, &config, &log)?; + Swarm::new(transport, behaviour, local_peer_id.clone()) }; // listen on all addresses - for address in config - .listen_addresses() - .map_err(|e| format!("Invalid listen multiaddr: {}", e))? - { + for address in config.listen_addresses { match Swarm::listen_on(&mut swarm, address.clone()) { Ok(_) => { let mut log_address = address.clone(); @@ -71,28 +63,13 @@ impl Service { Err(err) => warn!(log, "Cannot listen on: {} because: {:?}", address, err), }; } - // connect to boot nodes - these are currently stored as multiaddrs - // Once we have discovery, can set to peerId - for bootnode in config - .boot_nodes() - .map_err(|e| format!("Invalid boot node multiaddr: {:?}", e))? - { - match Swarm::dial_addr(&mut swarm, bootnode.clone()) { - Ok(()) => debug!(log, "Dialing bootnode: {}", bootnode), - Err(err) => debug!( - log, - "Could not connect to bootnode: {} error: {:?}", bootnode, err - ), - }; - } // subscribe to default gossipsub topics let mut topics = vec![]; //TODO: Handle multiple shard attestations. For now we simply use a separate topic for //attestations - topics.push(SHARD_TOPIC_PREFIX.to_string()); + topics.push(BEACON_ATTESTATION_TOPIC.to_string()); topics.push(BEACON_PUBSUB_TOPIC.to_string()); - topics.append(&mut config.topics.clone()); let mut subscribed_topics = vec![]; @@ -145,9 +122,6 @@ impl Stream for Service { BehaviourEvent::PeerDialed(peer_id) => { return Ok(Async::Ready(Some(Libp2pEvent::PeerDialed(peer_id)))); } - BehaviourEvent::Identified(peer_id, info) => { - return Ok(Async::Ready(Some(Libp2pEvent::Identified(peer_id, info)))); - } }, Ok(Async::Ready(None)) => unreachable!("Swarm stream shouldn't end"), Ok(Async::NotReady) => break, diff --git a/beacon_node/network/src/lib.rs b/beacon_node/network/src/lib.rs index d00c16292..b805c1d75 100644 --- a/beacon_node/network/src/lib.rs +++ b/beacon_node/network/src/lib.rs @@ -4,6 +4,6 @@ pub mod message_handler; pub mod service; pub mod sync; -pub use eth2_libp2p::{ChainType, NetworkConfig}; +pub use eth2_libp2p::NetworkConfig; pub use service::NetworkMessage; pub use service::Service; diff --git a/beacon_node/src/main.rs b/beacon_node/src/main.rs index 07215119f..51d3a58f9 100644 --- a/beacon_node/src/main.rs +++ b/beacon_node/src/main.rs @@ -37,11 +37,33 @@ fn main() { .help("One or more comma-delimited multi-addresses to listen for p2p connections.") .takes_value(true), ) + .arg( + Arg::with_name("maxpeers") + .long("maxpeers") + .value_name("Max Peers") + .help("The maximum number of peers (default 10)") + .takes_value(true), + ) .arg( Arg::with_name("boot-nodes") .long("boot-nodes") + .allow_hyphen_values(true) .value_name("BOOTNODES") - .help("One or more comma-delimited multi-addresses to bootstrap the p2p network.") + .help("One or more comma-delimited base64-encoded ENR's to bootstrap the p2p network.") + .takes_value(true), + ) + .arg( + Arg::with_name("disc-listen-address") + .long("disc-listen_address") + .value_name("DISCPORT") + .help("The IP address that the discovery protocol will listen on. Defaults to 0.0.0.0") + .takes_value(true), + ) + .arg( + Arg::with_name("discovery-port") + .long("disc-port") + .value_name("DISCPORT") + .help("Listen UDP port for the discovery process") .takes_value(true), ) // rpc related arguments diff --git a/beacon_node/src/run.rs b/beacon_node/src/run.rs index 834f9a428..15883d974 100644 --- a/beacon_node/src/run.rs +++ b/beacon_node/src/run.rs @@ -84,7 +84,7 @@ pub fn run_beacon_node( info!( log, "Started beacon node"; - "p2p_listen_addresses" => format!("{:?}", &other_client_config.network.listen_addresses()), + "p2p_listen_addresses" => format!("{:?}", &other_client_config.network.listen_addresses), "data_dir" => format!("{:?}", other_client_config.data_dir()), "spec_constants" => &spec_constants, "db_type" => &other_client_config.db_type,