diff --git a/account_manager/Cargo.toml b/account_manager/Cargo.toml index 48504d89a..b3c687eef 100644 --- a/account_manager/Cargo.toml +++ b/account_manager/Cargo.toml @@ -13,3 +13,4 @@ slog-async = "^2.3.0" validator_client = { path = "../validator_client" } types = { path = "../eth2/types" } eth2_config = { path = "../eth2/utils/eth2_config" } +dirs = "2.0.1" diff --git a/account_manager/src/main.rs b/account_manager/src/main.rs index 1c8cc8819..ee0e86d60 100644 --- a/account_manager/src/main.rs +++ b/account_manager/src/main.rs @@ -1,7 +1,7 @@ use bls::Keypair; use clap::{App, Arg, SubCommand}; -use eth2_config::get_data_dir; use slog::{crit, debug, info, o, Drain}; +use std::fs; use std::path::PathBuf; use types::test_utils::generate_deterministic_keypair; use validator_client::Config as ValidatorClientConfig; @@ -61,14 +61,34 @@ fn main() { ) .get_matches(); - let data_dir = match get_data_dir(&matches, PathBuf::from(DEFAULT_DATA_DIR)) { - Ok(dir) => dir, - Err(e) => { - crit!(log, "Failed to initialize data dir"; "error" => format!("{:?}", e)); - return; + let data_dir = match matches + .value_of("datadir") + .and_then(|v| Some(PathBuf::from(v))) + { + Some(v) => v, + None => { + // use the default + let mut default_dir = match dirs::home_dir() { + Some(v) => v, + None => { + crit!(log, "Failed to find a home directory"); + return; + } + }; + default_dir.push(DEFAULT_DATA_DIR); + PathBuf::from(default_dir) } }; + // create the directory if needed + match fs::create_dir_all(&data_dir) { + Ok(_) => {} + Err(e) => { + crit!(log, "Failed to initialize data dir"; "error" => format!("{}", e)); + return; + } + } + let mut client_config = ValidatorClientConfig::default(); if let Err(e) = client_config.apply_cli_args(&matches) { diff --git a/beacon_node/Cargo.toml b/beacon_node/Cargo.toml index 309f162e5..9e96f8484 100644 --- a/beacon_node/Cargo.toml +++ b/beacon_node/Cargo.toml @@ -22,3 +22,5 @@ tokio-timer = "0.2.10" futures = "0.1.25" exit-future = "0.1.3" state_processing = { path = "../eth2/state_processing" } +env_logger = "0.6.1" +dirs = "2.0.1" diff --git a/beacon_node/client/Cargo.toml b/beacon_node/client/Cargo.toml index 7c8ee9c7c..94a529ea7 100644 --- a/beacon_node/client/Cargo.toml +++ b/beacon_node/client/Cargo.toml @@ -9,17 +9,20 @@ beacon_chain = { path = "../beacon_chain" } network = { path = "../network" } store = { path = "../store" } http_server = { path = "../http_server" } +eth2-libp2p = { path = "../eth2-libp2p" } rpc = { path = "../rpc" } prometheus = "^0.6" types = { path = "../../eth2/types" } tree_hash = { path = "../../eth2/utils/tree_hash" } eth2_config = { path = "../../eth2/utils/eth2_config" } slot_clock = { path = "../../eth2/utils/slot_clock" } -serde = "1.0" +serde = "1.0.93" serde_derive = "1.0" error-chain = "0.12.0" -slog = "^2.2.3" eth2_ssz = { path = "../../eth2/utils/ssz" } +slog = { version = "^2.2.3" , features = ["max_level_trace", "release_max_level_debug"] } +slog-term = "^2.4.0" +slog-async = "^2.3.0" tokio = "0.1.15" clap = "2.32.0" dirs = "1.0.3" diff --git a/beacon_node/client/src/client_config.rs b/beacon_node/client/src/config.rs similarity index 92% rename from beacon_node/client/src/client_config.rs rename to beacon_node/client/src/config.rs index 166725b61..415ef0ec9 100644 --- a/beacon_node/client/src/client_config.rs +++ b/beacon_node/client/src/config.rs @@ -7,7 +7,7 @@ use std::path::PathBuf; /// The core configuration of a Lighthouse beacon node. #[derive(Debug, Clone, Serialize, Deserialize)] -pub struct ClientConfig { +pub struct Config { pub data_dir: PathBuf, pub db_type: String, db_name: String, @@ -16,7 +16,7 @@ pub struct ClientConfig { pub http: HttpServerConfig, } -impl Default for ClientConfig { +impl Default for Config { fn default() -> Self { Self { data_dir: PathBuf::from(".lighthouse"), @@ -24,14 +24,14 @@ impl Default for ClientConfig { db_name: "chain_db".to_string(), // Note: there are no default bootnodes specified. // Once bootnodes are established, add them here. - network: NetworkConfig::new(vec![]), + network: NetworkConfig::new(), rpc: rpc::RPCConfig::default(), http: HttpServerConfig::default(), } } } -impl ClientConfig { +impl Config { /// Returns the path to which the client may initialize an on-disk database. pub fn db_path(&self) -> Option { self.data_dir() @@ -49,7 +49,7 @@ impl ClientConfig { /// /// Returns an error if arguments are obviously invalid. May succeed even if some values are /// invalid. - pub fn apply_cli_args(&mut self, args: &ArgMatches) -> Result<(), &'static str> { + pub fn apply_cli_args(&mut self, args: &ArgMatches) -> Result<(), String> { if let Some(dir) = args.value_of("datadir") { self.data_dir = PathBuf::from(dir); }; diff --git a/beacon_node/client/src/lib.rs b/beacon_node/client/src/lib.rs index 18ddef7bb..7eee8ac0a 100644 --- a/beacon_node/client/src/lib.rs +++ b/beacon_node/client/src/lib.rs @@ -1,7 +1,7 @@ extern crate slog; mod beacon_chain_types; -mod client_config; +mod config; pub mod error; pub mod notifier; @@ -21,7 +21,7 @@ use tokio::timer::Interval; pub use beacon_chain::BeaconChainTypes; pub use beacon_chain_types::ClientType; pub use beacon_chain_types::InitialiseBeaconChain; -pub use client_config::ClientConfig; +pub use config::Config as ClientConfig; pub use eth2_config::Eth2Config; /// Main beacon node client service. This provides the connection and initialisation of the clients diff --git a/beacon_node/eth2-libp2p/Cargo.toml b/beacon_node/eth2-libp2p/Cargo.toml index ada5faebb..1fbd30872 100644 --- a/beacon_node/eth2-libp2p/Cargo.toml +++ b/beacon_node/eth2-libp2p/Cargo.toml @@ -7,15 +7,18 @@ edition = "2018" [dependencies] beacon_chain = { path = "../beacon_chain" } clap = "2.32.0" -# SigP repository until PR is merged -libp2p = { git = "https://github.com/SigP/rust-libp2p", rev = "b3c32d9a821ae6cc89079499cc6e8a6bab0bffc3" } +#SigP repository +libp2p = { git = "https://github.com/SigP/rust-libp2p", rev = "be5710bbde69d8c5be732c13ba64239e2f370a7b" } +enr = { git = "https://github.com/SigP/rust-libp2p/", rev = "be5710bbde69d8c5be732c13ba64239e2f370a7b", features = ["serde"] } types = { path = "../../eth2/types" } serde = "1.0" serde_derive = "1.0" eth2_ssz = { path = "../../eth2/utils/ssz" } eth2_ssz_derive = { path = "../../eth2/utils/ssz_derive" } -slog = "2.4.1" +slog = { version = "^2.4.1" , features = ["max_level_trace", "release_max_level_trace"] } version = { path = "../version" } tokio = "0.1.16" futures = "0.1.25" error-chain = "0.12.0" +tokio-timer = "0.2.10" +dirs = "2.0.1" diff --git a/beacon_node/eth2-libp2p/src/behaviour.rs b/beacon_node/eth2-libp2p/src/behaviour.rs index 10b140c3b..4e4cf24f3 100644 --- a/beacon_node/eth2-libp2p/src/behaviour.rs +++ b/beacon_node/eth2-libp2p/src/behaviour.rs @@ -1,45 +1,72 @@ +use crate::discovery::Discovery; use crate::rpc::{RPCEvent, RPCMessage, Rpc}; -use crate::NetworkConfig; +use crate::{error, NetworkConfig}; +use crate::{Topic, TopicHash}; use futures::prelude::*; use libp2p::{ core::{ + identity::Keypair, swarm::{NetworkBehaviourAction, NetworkBehaviourEventProcess}, - PublicKey, }, + discv5::Discv5Event, gossipsub::{Gossipsub, GossipsubEvent}, - identify::{protocol::IdentifyInfo, Identify, IdentifyEvent}, - ping::{Ping, PingEvent}, + ping::{Ping, PingConfig, PingEvent}, tokio_io::{AsyncRead, AsyncWrite}, NetworkBehaviour, PeerId, }; -use slog::{debug, o, trace, warn}; +use slog::{o, trace, warn}; use ssz::{ssz_encode, Decode, DecodeError, Encode}; +use std::num::NonZeroU32; +use std::time::Duration; use types::{Attestation, BeaconBlock}; -use types::{Topic, TopicHash}; -/// Builds the network behaviour for the libp2p Swarm. -/// Implements gossipsub message routing. +/// Builds the network behaviour that manages the core protocols of eth2. +/// This core behaviour is managed by `Behaviour` which adds peer management to all core +/// behaviours. #[derive(NetworkBehaviour)] #[behaviour(out_event = "BehaviourEvent", poll_method = "poll")] pub struct Behaviour { /// The routing pub-sub mechanism for eth2. gossipsub: Gossipsub, - // TODO: Add Kademlia for peer discovery - /// The events generated by this behaviour to be consumed in the swarm poll. + /// The serenity RPC specified in the wire-0 protocol. serenity_rpc: Rpc, - /// Allows discovery of IP addresses for peers on the network. - identify: Identify, /// Keep regular connection to peers and disconnect if absent. - // TODO: Keepalive, likely remove this later. - // TODO: Make the ping time customizeable. ping: Ping, + /// Kademlia for peer discovery. + discovery: Discovery, #[behaviour(ignore)] + /// The events generated by this behaviour to be consumed in the swarm poll. events: Vec, /// Logger for behaviour actions. #[behaviour(ignore)] log: slog::Logger, } +impl Behaviour { + pub fn new( + local_key: &Keypair, + net_conf: &NetworkConfig, + log: &slog::Logger, + ) -> error::Result { + let local_peer_id = local_key.public().clone().into_peer_id(); + let behaviour_log = log.new(o!()); + let ping_config = PingConfig::new() + .with_timeout(Duration::from_secs(30)) + .with_interval(Duration::from_secs(20)) + .with_max_failures(NonZeroU32::new(2).expect("2 != 0")) + .with_keep_alive(false); + + Ok(Behaviour { + serenity_rpc: Rpc::new(log), + gossipsub: Gossipsub::new(local_peer_id.clone(), net_conf.gs_config.clone()), + discovery: Discovery::new(local_key, net_conf, log)?, + ping: Ping::new(ping_config), + events: Vec::new(), + log: behaviour_log, + }) + } +} + // Implement the NetworkBehaviourEventProcess trait so that we can derive NetworkBehaviour for Behaviour impl NetworkBehaviourEventProcess for Behaviour @@ -89,30 +116,6 @@ impl NetworkBehaviourEventProcess NetworkBehaviourEventProcess - for Behaviour -{ - fn inject_event(&mut self, event: IdentifyEvent) { - match event { - IdentifyEvent::Identified { - peer_id, mut info, .. - } => { - if info.listen_addrs.len() > 20 { - debug!( - self.log, - "More than 20 peers have been identified, truncating" - ); - info.listen_addrs.truncate(20); - } - self.events - .push(BehaviourEvent::Identified(peer_id, Box::new(info))); - } - IdentifyEvent::Error { .. } => {} - IdentifyEvent::SendBack { .. } => {} - } - } -} - impl NetworkBehaviourEventProcess for Behaviour { @@ -122,25 +125,6 @@ impl NetworkBehaviourEventProcess } impl Behaviour { - pub fn new(local_public_key: PublicKey, net_conf: &NetworkConfig, log: &slog::Logger) -> Self { - let local_peer_id = local_public_key.clone().into_peer_id(); - let identify_config = net_conf.identify_config.clone(); - let behaviour_log = log.new(o!()); - - Behaviour { - gossipsub: Gossipsub::new(local_peer_id, net_conf.gs_config.clone()), - serenity_rpc: Rpc::new(log), - identify: Identify::new( - identify_config.version, - identify_config.user_agent, - local_public_key, - ), - ping: Ping::new(), - events: Vec::new(), - log: behaviour_log, - } - } - /// Consumes the events list when polled. fn poll( &mut self, @@ -153,18 +137,23 @@ impl Behaviour { } } +impl NetworkBehaviourEventProcess + for Behaviour +{ + fn inject_event(&mut self, _event: Discv5Event) { + // discv5 has no events to inject + } +} + /// Implements the combined behaviour for the libp2p service. impl Behaviour { + /* Pubsub behaviour functions */ + /// Subscribes to a gossipsub topic. pub fn subscribe(&mut self, topic: Topic) -> bool { self.gossipsub.subscribe(topic) } - /// Sends an RPC Request/Response via the RPC protocol. - pub fn send_rpc(&mut self, peer_id: PeerId, rpc_event: RPCEvent) { - self.serenity_rpc.send_rpc(peer_id, rpc_event); - } - /// Publishes a message on the pubsub (gossipsub) behaviour. pub fn publish(&mut self, topics: Vec, message: PubsubMessage) { let message_bytes = ssz_encode(&message); @@ -172,14 +161,19 @@ impl Behaviour { self.gossipsub.publish(topic, message_bytes.clone()); } } + + /* Eth2 RPC behaviour functions */ + + /// Sends an RPC Request/Response via the RPC protocol. + pub fn send_rpc(&mut self, peer_id: PeerId, rpc_event: RPCEvent) { + self.serenity_rpc.send_rpc(peer_id, rpc_event); + } } /// The types of events than can be obtained from polling the behaviour. pub enum BehaviourEvent { RPC(PeerId, RPCEvent), PeerDialed(PeerId), - Identified(PeerId, Box), - // TODO: This is a stub at the moment GossipMessage { source: PeerId, topics: Vec, diff --git a/beacon_node/eth2-libp2p/src/config.rs b/beacon_node/eth2-libp2p/src/config.rs index ee2add75e..ea87075b7 100644 --- a/beacon_node/eth2-libp2p/src/config.rs +++ b/beacon_node/eth2-libp2p/src/config.rs @@ -1,89 +1,129 @@ use clap::ArgMatches; +use enr::Enr; use libp2p::gossipsub::{GossipsubConfig, GossipsubConfigBuilder}; use serde_derive::{Deserialize, Serialize}; -use types::multiaddr::{Error as MultiaddrError, Multiaddr}; +use std::path::PathBuf; +use std::time::Duration; + +/// The beacon node topic string to subscribe to. +pub const BEACON_PUBSUB_TOPIC: &str = "beacon_block"; +pub const BEACON_ATTESTATION_TOPIC: &str = "beacon_attestation"; +pub const SHARD_TOPIC_PREFIX: &str = "shard"; #[derive(Clone, Debug, Serialize, Deserialize)] #[serde(default)] /// Network configuration for lighthouse. pub struct Config { + /// Data directory where node's keyfile is stored + pub network_dir: PathBuf, + /// IP address to listen on. - listen_addresses: Vec, + pub listen_address: std::net::IpAddr, + + /// The TCP port that libp2p listens on. + pub libp2p_port: u16, + + /// The address to broadcast to peers about which address we are listening on. + pub discovery_address: std::net::IpAddr, + + /// UDP port that discovery listens on. + pub discovery_port: u16, + + /// Target number of connected peers. + pub max_peers: usize, + /// Gossipsub configuration parameters. #[serde(skip)] pub gs_config: GossipsubConfig, - /// Configuration parameters for node identification protocol. - #[serde(skip)] - pub identify_config: IdentifyConfig, + /// List of nodes to initially connect to. - boot_nodes: Vec, + pub boot_nodes: Vec, + /// Client version pub client_version: String, - /// List of topics to subscribe to as strings + + /// List of extra topics to initially subscribe to as strings. pub topics: Vec, } impl Default for Config { /// Generate a default network configuration. fn default() -> Self { + let mut network_dir = dirs::home_dir().unwrap_or_else(|| PathBuf::from(".")); + network_dir.push(".lighthouse"); + network_dir.push("network"); Config { - listen_addresses: vec!["/ip4/127.0.0.1/tcp/9000".to_string()], + network_dir, + listen_address: "127.0.0.1".parse().expect("vaild ip address"), + libp2p_port: 9000, + discovery_address: "127.0.0.1".parse().expect("valid ip address"), + discovery_port: 9000, + max_peers: 10, + //TODO: Set realistic values for production gs_config: GossipsubConfigBuilder::new() .max_gossip_size(4_000_000) + .inactivity_timeout(Duration::from_secs(90)) + .heartbeat_interval(Duration::from_secs(20)) .build(), - identify_config: IdentifyConfig::default(), boot_nodes: vec![], client_version: version::version(), - topics: vec![String::from("beacon_chain")], + topics: Vec::new(), } } } +/// Generates a default Config. impl Config { - pub fn new(boot_nodes: Vec) -> Self { - let mut conf = Config::default(); - conf.boot_nodes = boot_nodes; - - conf + pub fn new() -> Self { + Config::default() } - pub fn listen_addresses(&self) -> Result, MultiaddrError> { - self.listen_addresses.iter().map(|s| s.parse()).collect() - } + pub fn apply_cli_args(&mut self, args: &ArgMatches) -> Result<(), String> { + if let Some(dir) = args.value_of("datadir") { + self.network_dir = PathBuf::from(dir).join("network"); + }; - pub fn boot_nodes(&self) -> Result, MultiaddrError> { - self.boot_nodes.iter().map(|s| s.parse()).collect() - } - - pub fn apply_cli_args(&mut self, args: &ArgMatches) -> Result<(), &'static str> { if let Some(listen_address_str) = args.value_of("listen-address") { - let listen_addresses = listen_address_str.split(',').map(Into::into).collect(); - self.listen_addresses = listen_addresses; + let listen_address = listen_address_str + .parse() + .map_err(|_| format!("Invalid listen address: {:?}", listen_address_str))?; + self.listen_address = listen_address; + self.discovery_address = listen_address; } - if let Some(boot_addresses_str) = args.value_of("boot-nodes") { - let boot_addresses = boot_addresses_str.split(',').map(Into::into).collect(); - self.boot_nodes = boot_addresses; + if let Some(max_peers_str) = args.value_of("maxpeers") { + self.max_peers = max_peers_str + .parse::() + .map_err(|_| format!("Invalid number of max peers: {}", max_peers_str))?; + } + + if let Some(port_str) = args.value_of("port") { + let port = port_str + .parse::() + .map_err(|_| format!("Invalid port: {}", port_str))?; + self.libp2p_port = port; + self.discovery_port = port; + } + + if let Some(boot_enr_str) = args.value_of("boot-nodes") { + self.boot_nodes = boot_enr_str + .split(',') + .map(|enr| enr.parse().map_err(|_| format!("Invalid ENR: {}", enr))) + .collect::, _>>()?; + } + + if let Some(discovery_address_str) = args.value_of("discovery-address") { + self.discovery_address = discovery_address_str + .parse() + .map_err(|_| format!("Invalid discovery address: {:?}", discovery_address_str))? + } + + if let Some(disc_port_str) = args.value_of("disc-port") { + self.discovery_port = disc_port_str + .parse::() + .map_err(|_| format!("Invalid discovery port: {}", disc_port_str))?; } Ok(()) } } - -/// The configuration parameters for the Identify protocol -#[derive(Debug, Clone)] -pub struct IdentifyConfig { - /// The protocol version to listen on. - pub version: String, - /// The client's name and version for identification. - pub user_agent: String, -} - -impl Default for IdentifyConfig { - fn default() -> Self { - Self { - version: "/eth/serenity/1.0".to_string(), - user_agent: version::version(), - } - } -} diff --git a/beacon_node/eth2-libp2p/src/discovery.rs b/beacon_node/eth2-libp2p/src/discovery.rs new file mode 100644 index 000000000..44b4e655b --- /dev/null +++ b/beacon_node/eth2-libp2p/src/discovery.rs @@ -0,0 +1,313 @@ +use crate::{error, NetworkConfig}; +/// This manages the discovery and management of peers. +/// +/// Currently using discv5 for peer discovery. +/// +use futures::prelude::*; +use libp2p::core::swarm::{ + ConnectedPoint, NetworkBehaviour, NetworkBehaviourAction, PollParameters, +}; +use libp2p::core::{identity::Keypair, Multiaddr, PeerId, ProtocolsHandler}; +use libp2p::discv5::{Discv5, Discv5Event}; +use libp2p::enr::{Enr, EnrBuilder, NodeId}; +use libp2p::multiaddr::Protocol; +use slog::{debug, info, o, warn}; +use std::collections::HashSet; +use std::fs::File; +use std::io::prelude::*; +use std::str::FromStr; +use std::time::{Duration, Instant}; +use tokio::io::{AsyncRead, AsyncWrite}; +use tokio_timer::Delay; + +/// Maximum seconds before searching for extra peers. +const MAX_TIME_BETWEEN_PEER_SEARCHES: u64 = 60; +/// Initial delay between peer searches. +const INITIAL_SEARCH_DELAY: u64 = 5; +/// Local ENR storage filename. +const ENR_FILENAME: &str = "enr.dat"; + +/// Lighthouse discovery behaviour. This provides peer management and discovery using the Discv5 +/// libp2p protocol. +pub struct Discovery { + /// The peers currently connected to libp2p streams. + connected_peers: HashSet, + + /// The target number of connected peers on the libp2p interface. + max_peers: usize, + + /// The delay between peer discovery searches. + peer_discovery_delay: Delay, + + /// Tracks the last discovery delay. The delay is doubled each round until the max + /// time is reached. + past_discovery_delay: u64, + + /// The TCP port for libp2p. Used to convert an updated IP address to a multiaddr. Note: This + /// assumes that the external TCP port is the same as the internal TCP port if behind a NAT. + //TODO: Improve NAT handling limit the above restriction + tcp_port: u16, + + /// The discovery behaviour used to discover new peers. + discovery: Discv5, + + /// Logger for the discovery behaviour. + log: slog::Logger, +} + +impl Discovery { + pub fn new( + local_key: &Keypair, + config: &NetworkConfig, + log: &slog::Logger, + ) -> error::Result { + let log = log.new(o!("Service" => "Libp2p-Discovery")); + + // checks if current ENR matches that found on disk + let local_enr = load_enr(local_key, config, &log)?; + + info!(log, "Local ENR: {}", local_enr.to_base64()); + debug!(log, "Local Node Id: {}", local_enr.node_id()); + + let mut discovery = Discv5::new(local_enr, local_key.clone(), config.listen_address) + .map_err(|e| format!("Discv5 service failed: {:?}", e))?; + + // Add bootnodes to routing table + for bootnode_enr in config.boot_nodes.clone() { + debug!( + log, + "Adding node to routing table: {}", + bootnode_enr.node_id() + ); + discovery.add_enr(bootnode_enr); + } + + Ok(Self { + connected_peers: HashSet::new(), + max_peers: config.max_peers, + peer_discovery_delay: Delay::new(Instant::now()), + past_discovery_delay: INITIAL_SEARCH_DELAY, + tcp_port: config.libp2p_port, + discovery, + log, + }) + } + + /// Manually search for peers. This restarts the discovery round, sparking multiple rapid + /// queries. + pub fn discover_peers(&mut self) { + self.past_discovery_delay = INITIAL_SEARCH_DELAY; + self.find_peers(); + } + + /// Add an Enr to the routing table of the discovery mechanism. + pub fn add_enr(&mut self, enr: Enr) { + self.discovery.add_enr(enr); + } + + /// Search for new peers using the underlying discovery mechanism. + fn find_peers(&mut self) { + // pick a random NodeId + let random_node = NodeId::random(); + debug!(self.log, "Searching for peers..."); + self.discovery.find_node(random_node); + + // update the time until next discovery + let delay = { + if self.past_discovery_delay < MAX_TIME_BETWEEN_PEER_SEARCHES { + self.past_discovery_delay *= 2; + self.past_discovery_delay + } else { + MAX_TIME_BETWEEN_PEER_SEARCHES + } + }; + self.peer_discovery_delay + .reset(Instant::now() + Duration::from_secs(delay)); + } +} + +// Redirect all behaviour events to underlying discovery behaviour. +impl NetworkBehaviour for Discovery +where + TSubstream: AsyncRead + AsyncWrite, +{ + type ProtocolsHandler = as NetworkBehaviour>::ProtocolsHandler; + type OutEvent = as NetworkBehaviour>::OutEvent; + + fn new_handler(&mut self) -> Self::ProtocolsHandler { + NetworkBehaviour::new_handler(&mut self.discovery) + } + + fn addresses_of_peer(&mut self, peer_id: &PeerId) -> Vec { + // Let discovery track possible known peers. + self.discovery.addresses_of_peer(peer_id) + } + + fn inject_connected(&mut self, peer_id: PeerId, _endpoint: ConnectedPoint) { + self.connected_peers.insert(peer_id); + } + + fn inject_disconnected(&mut self, peer_id: &PeerId, _endpoint: ConnectedPoint) { + self.connected_peers.remove(peer_id); + } + + fn inject_replaced( + &mut self, + _peer_id: PeerId, + _closed: ConnectedPoint, + _opened: ConnectedPoint, + ) { + // discv5 doesn't implement + } + + fn inject_node_event( + &mut self, + _peer_id: PeerId, + _event: ::OutEvent, + ) { + // discv5 doesn't implement + } + + fn poll( + &mut self, + params: &mut impl PollParameters, + ) -> Async< + NetworkBehaviourAction< + ::InEvent, + Self::OutEvent, + >, + > { + // search for peers if it is time + loop { + match self.peer_discovery_delay.poll() { + Ok(Async::Ready(_)) => { + if self.connected_peers.len() < self.max_peers { + self.find_peers(); + } + } + Ok(Async::NotReady) => break, + Err(e) => { + warn!(self.log, "Discovery peer search failed: {:?}", e); + } + } + } + + // Poll discovery + loop { + match self.discovery.poll(params) { + Async::Ready(NetworkBehaviourAction::GenerateEvent(event)) => { + match event { + Discv5Event::Discovered(_enr) => { + // not concerned about FINDNODE results, rather the result of an entire + // query. + } + Discv5Event::SocketUpdated(socket) => { + info!(self.log, "Address updated"; "IP" => format!("{}",socket.ip())); + let mut address = Multiaddr::from(socket.ip()); + address.push(Protocol::Tcp(self.tcp_port)); + return Async::Ready(NetworkBehaviourAction::ReportObservedAddr { + address, + }); + } + Discv5Event::FindNodeResult { closer_peers, .. } => { + debug!(self.log, "Discv5 query found {} peers", closer_peers.len()); + if closer_peers.is_empty() { + debug!(self.log, "Discv5 random query yielded empty results"); + } + for peer_id in closer_peers { + // if we need more peers, attempt a connection + if self.connected_peers.len() < self.max_peers + && self.connected_peers.get(&peer_id).is_none() + { + debug!(self.log, "Discv5: Peer discovered"; "Peer"=> format!("{:?}", peer_id)); + return Async::Ready(NetworkBehaviourAction::DialPeer { + peer_id, + }); + } + } + } + _ => {} + } + } + // discv5 does not output any other NetworkBehaviourAction + Async::Ready(_) => {} + Async::NotReady => break, + } + } + Async::NotReady + } +} + +/// Loads an ENR from file if it exists and matches the current NodeId and sequence number. If none +/// exists, generates a new one. +/// +/// If an ENR exists, with the same NodeId and IP address, we use the disk-generated one as its +/// ENR sequence will be equal or higher than a newly generated one. +fn load_enr( + local_key: &Keypair, + config: &NetworkConfig, + log: &slog::Logger, +) -> Result { + // Build the local ENR. + // Note: Discovery should update the ENR record's IP to the external IP as seen by the + // majority of our peers. + let mut local_enr = EnrBuilder::new() + .ip(config.discovery_address.into()) + .tcp(config.libp2p_port) + .udp(config.discovery_port) + .build(&local_key) + .map_err(|e| format!("Could not build Local ENR: {:?}", e))?; + + let enr_f = config.network_dir.join(ENR_FILENAME); + if let Ok(mut enr_file) = File::open(enr_f.clone()) { + let mut enr_string = String::new(); + match enr_file.read_to_string(&mut enr_string) { + Err(_) => debug!(log, "Could not read ENR from file"), + Ok(_) => { + match Enr::from_str(&enr_string) { + Ok(enr) => { + debug!(log, "ENR found in file: {:?}", enr_f); + + if enr.node_id() == local_enr.node_id() { + if enr.ip() == config.discovery_address.into() + && enr.tcp() == Some(config.libp2p_port) + && enr.udp() == Some(config.discovery_port) + { + debug!(log, "ENR loaded from file"); + // the stored ENR has the same configuration, use it + return Ok(enr); + } + + // same node id, different configuration - update the sequence number + let new_seq_no = enr.seq().checked_add(1).ok_or_else(|| "ENR sequence number on file is too large. Remove it to generate a new NodeId")?; + local_enr.set_seq(new_seq_no, local_key).map_err(|e| { + format!("Could not update ENR sequence number: {:?}", e) + })?; + debug!(log, "ENR sequence number increased to: {}", new_seq_no); + } + } + Err(e) => { + warn!(log, "ENR from file could not be decoded: {:?}", e); + } + } + } + } + } + + // write ENR to disk + let _ = std::fs::create_dir_all(&config.network_dir); + match File::create(enr_f.clone()) + .and_then(|mut f| f.write_all(&local_enr.to_base64().as_bytes())) + { + Ok(_) => { + debug!(log, "ENR written to disk"); + } + Err(e) => { + warn!( + log, + "Could not write ENR to file: {:?}. Error: {}", enr_f, e + ); + } + } + Ok(local_enr) +} diff --git a/beacon_node/eth2-libp2p/src/lib.rs b/beacon_node/eth2-libp2p/src/lib.rs index 659d6b01c..7a3b2e632 100644 --- a/beacon_node/eth2-libp2p/src/lib.rs +++ b/beacon_node/eth2-libp2p/src/lib.rs @@ -4,12 +4,18 @@ /// This crate builds and manages the libp2p services required by the beacon node. pub mod behaviour; mod config; +mod discovery; pub mod error; pub mod rpc; mod service; pub use behaviour::PubsubMessage; -pub use config::Config as NetworkConfig; +pub use config::{ + Config as NetworkConfig, BEACON_ATTESTATION_TOPIC, BEACON_PUBSUB_TOPIC, SHARD_TOPIC_PREFIX, +}; +pub use libp2p::floodsub::{Topic, TopicBuilder, TopicHash}; +pub use libp2p::multiaddr; +pub use libp2p::Multiaddr; pub use libp2p::{ gossipsub::{GossipsubConfig, GossipsubConfigBuilder}, PeerId, @@ -17,5 +23,3 @@ pub use libp2p::{ pub use rpc::RPCEvent; pub use service::Libp2pEvent; pub use service::Service; -pub use types::multiaddr; -pub use types::Multiaddr; diff --git a/beacon_node/eth2-libp2p/src/rpc/mod.rs b/beacon_node/eth2-libp2p/src/rpc/mod.rs index 57d7dadbe..2d303469c 100644 --- a/beacon_node/eth2-libp2p/src/rpc/mod.rs +++ b/beacon_node/eth2-libp2p/src/rpc/mod.rs @@ -94,7 +94,7 @@ where fn poll( &mut self, - _: &mut PollParameters<'_>, + _: &mut impl PollParameters, ) -> Async< NetworkBehaviourAction< ::InEvent, diff --git a/beacon_node/eth2-libp2p/src/rpc/protocol.rs b/beacon_node/eth2-libp2p/src/rpc/protocol.rs index 2f461988a..7afded3ac 100644 --- a/beacon_node/eth2-libp2p/src/rpc/protocol.rs +++ b/beacon_node/eth2-libp2p/src/rpc/protocol.rs @@ -11,7 +11,6 @@ use tokio::io::{AsyncRead, AsyncWrite}; const MAX_READ_SIZE: usize = 4_194_304; // 4M /// Implementation of the `ConnectionUpgrade` for the rpc protocol. - #[derive(Debug, Clone)] pub struct RPCProtocol; diff --git a/beacon_node/eth2-libp2p/src/service.rs b/beacon_node/eth2-libp2p/src/service.rs index 18f7ca98c..69f8a1ca5 100644 --- a/beacon_node/eth2-libp2p/src/service.rs +++ b/beacon_node/eth2-libp2p/src/service.rs @@ -3,25 +3,30 @@ use crate::error; use crate::multiaddr::Protocol; use crate::rpc::RPCEvent; use crate::NetworkConfig; +use crate::{TopicBuilder, TopicHash}; +use crate::{BEACON_ATTESTATION_TOPIC, BEACON_PUBSUB_TOPIC}; use futures::prelude::*; use futures::Stream; use libp2p::core::{ - identity, + identity::Keypair, + multiaddr::Multiaddr, muxing::StreamMuxerBox, nodes::Substream, transport::boxed::Boxed, upgrade::{InboundUpgradeExt, OutboundUpgradeExt}, }; -use libp2p::identify::protocol::IdentifyInfo; use libp2p::{core, secio, PeerId, Swarm, Transport}; use slog::{debug, info, trace, warn}; +use std::fs::File; +use std::io::prelude::*; use std::io::{Error, ErrorKind}; use std::time::Duration; -use types::{TopicBuilder, TopicHash}; type Libp2pStream = Boxed<(PeerId, StreamMuxerBox), Error>; type Libp2pBehaviour = Behaviour>; +const NETWORK_KEY_FILENAME: &str = "key"; + /// The configuration and state of the libp2p components for the beacon node. pub struct Service { /// The libp2p Swarm handler. @@ -35,59 +40,52 @@ pub struct Service { impl Service { pub fn new(config: NetworkConfig, log: slog::Logger) -> error::Result { - debug!(log, "Libp2p Service starting"); + debug!(log, "Network-libp2p Service starting"); - // TODO: Currently using secp256k1 key pairs. Wire protocol specifies RSA. Waiting for this - // PR to be merged to generate RSA keys: https://github.com/briansmith/ring/pull/733 - // TODO: Save and recover node key from disk - let local_private_key = identity::Keypair::generate_secp256k1(); + // load the private key from CLI flag, disk or generate a new one + let local_private_key = load_private_key(&config, &log); - let local_public_key = local_private_key.public(); let local_peer_id = PeerId::from(local_private_key.public()); info!(log, "Local peer id: {:?}", local_peer_id); let mut swarm = { - // Set up the transport - let transport = build_transport(local_private_key); - // Set up gossipsub routing - let behaviour = Behaviour::new(local_public_key.clone(), &config, &log); - // Set up Topology - let topology = local_peer_id.clone(); - Swarm::new(transport, behaviour, topology) + // Set up the transport - tcp/ws with secio and mplex/yamux + let transport = build_transport(local_private_key.clone()); + // Lighthouse network behaviour + let behaviour = Behaviour::new(&local_private_key, &config, &log)?; + Swarm::new(transport, behaviour, local_peer_id.clone()) }; - // listen on all addresses - for address in config - .listen_addresses() - .map_err(|e| format!("Invalid listen multiaddr: {}", e))? - { - match Swarm::listen_on(&mut swarm, address.clone()) { - Ok(mut listen_addr) => { - listen_addr.append(Protocol::P2p(local_peer_id.clone().into())); - info!(log, "Listening on: {}", listen_addr); - } - Err(err) => warn!(log, "Cannot listen on: {} : {:?}", address, err), - }; - } - // connect to boot nodes - these are currently stored as multiaddrs - // Once we have discovery, can set to peerId - for bootnode in config - .boot_nodes() - .map_err(|e| format!("Invalid boot node multiaddr: {:?}", e))? - { - match Swarm::dial_addr(&mut swarm, bootnode.clone()) { - Ok(()) => debug!(log, "Dialing bootnode: {}", bootnode), - Err(err) => debug!( - log, - "Could not connect to bootnode: {} error: {:?}", bootnode, err - ), - }; - } + // listen on the specified address + let listen_multiaddr = { + let mut m = Multiaddr::from(config.listen_address); + m.push(Protocol::Tcp(config.libp2p_port)); + m + }; + + match Swarm::listen_on(&mut swarm, listen_multiaddr.clone()) { + Ok(_) => { + let mut log_address = listen_multiaddr; + log_address.push(Protocol::P2p(local_peer_id.clone().into())); + info!(log, "Listening on: {}", log_address); + } + Err(err) => warn!( + log, + "Cannot listen on: {} because: {:?}", listen_multiaddr, err + ), + }; // subscribe to default gossipsub topics + let mut topics = vec![]; + //TODO: Handle multiple shard attestations. For now we simply use a separate topic for + //attestations + topics.push(BEACON_ATTESTATION_TOPIC.to_string()); + topics.push(BEACON_PUBSUB_TOPIC.to_string()); + topics.append(&mut config.topics.clone()); + let mut subscribed_topics = vec![]; - for topic in config.topics { - let t = TopicBuilder::new(topic.to_string()).build(); + for topic in topics { + let t = TopicBuilder::new(topic.clone()).build(); if swarm.subscribe(t) { trace!(log, "Subscribed to topic: {:?}", topic); subscribed_topics.push(topic); @@ -135,9 +133,6 @@ impl Stream for Service { BehaviourEvent::PeerDialed(peer_id) => { return Ok(Async::Ready(Some(Libp2pEvent::PeerDialed(peer_id)))); } - BehaviourEvent::Identified(peer_id, info) => { - return Ok(Async::Ready(Some(Libp2pEvent::Identified(peer_id, info)))); - } }, Ok(Async::Ready(None)) => unreachable!("Swarm stream shouldn't end"), Ok(Async::NotReady) => break, @@ -150,7 +145,7 @@ impl Stream for Service { /// The implementation supports TCP/IP, WebSockets over TCP/IP, secio as the encryption layer, and /// mplex or yamux as the multiplexing layer. -fn build_transport(local_private_key: identity::Keypair) -> Boxed<(PeerId, StreamMuxerBox), Error> { +fn build_transport(local_private_key: Keypair) -> Boxed<(PeerId, StreamMuxerBox), Error> { // TODO: The Wire protocol currently doesn't specify encryption and this will need to be customised // in the future. let transport = libp2p::tcp::TcpConfig::new(); @@ -187,8 +182,6 @@ pub enum Libp2pEvent { RPC(PeerId, RPCEvent), /// Initiated the connection to a new peer. PeerDialed(PeerId), - /// Received information about a peer on the network. - Identified(PeerId, Box), /// Received pubsub message. PubsubMessage { source: PeerId, @@ -196,3 +189,51 @@ pub enum Libp2pEvent { message: Box, }, } + +/// Loads a private key from disk. If this fails, a new key is +/// generated and is then saved to disk. +/// +/// Currently only secp256k1 keys are allowed, as these are the only keys supported by discv5. +fn load_private_key(config: &NetworkConfig, log: &slog::Logger) -> Keypair { + // TODO: Currently using secp256k1 keypairs - currently required for discv5 + // check for key from disk + let network_key_f = config.network_dir.join(NETWORK_KEY_FILENAME); + if let Ok(mut network_key_file) = File::open(network_key_f.clone()) { + let mut key_bytes: Vec = Vec::with_capacity(36); + match network_key_file.read_to_end(&mut key_bytes) { + Err(_) => debug!(log, "Could not read network key file"), + Ok(_) => { + // only accept secp256k1 keys for now + if let Ok(secret_key) = + libp2p::core::identity::secp256k1::SecretKey::from_bytes(&mut key_bytes) + { + let kp: libp2p::core::identity::secp256k1::Keypair = secret_key.into(); + debug!(log, "Loaded network key from disk."); + return Keypair::Secp256k1(kp); + } else { + debug!(log, "Network key file is not a valid secp256k1 key"); + } + } + } + } + + // if a key could not be loaded from disk, generate a new one and save it + let local_private_key = Keypair::generate_secp256k1(); + if let Keypair::Secp256k1(key) = local_private_key.clone() { + let _ = std::fs::create_dir_all(&config.network_dir); + match File::create(network_key_f.clone()) + .and_then(|mut f| f.write_all(&key.secret().to_bytes())) + { + Ok(_) => { + debug!(log, "New network key generated and written to disk"); + } + Err(e) => { + warn!( + log, + "Could not write node key to file: {:?}. Error: {}", network_key_f, e + ); + } + } + } + local_private_key +} diff --git a/beacon_node/network/Cargo.toml b/beacon_node/network/Cargo.toml index 695adc9bd..23fbdd7d9 100644 --- a/beacon_node/network/Cargo.toml +++ b/beacon_node/network/Cargo.toml @@ -13,7 +13,7 @@ store = { path = "../store" } eth2-libp2p = { path = "../eth2-libp2p" } version = { path = "../version" } types = { path = "../../eth2/types" } -slog = { version = "^2.2.3" , features = ["max_level_trace", "release_max_level_debug"] } +slog = { version = "^2.2.3" } eth2_ssz = { path = "../../eth2/utils/ssz" } tree_hash = { path = "../../eth2/utils/tree_hash" } futures = "0.1.25" diff --git a/beacon_node/network/src/service.rs b/beacon_node/network/src/service.rs index 9c71a60f7..b2ecc1a0b 100644 --- a/beacon_node/network/src/service.rs +++ b/beacon_node/network/src/service.rs @@ -4,6 +4,7 @@ use crate::NetworkConfig; use beacon_chain::{BeaconChain, BeaconChainTypes}; use crossbeam_channel::{unbounded as channel, Sender, TryRecvError}; use eth2_libp2p::Service as LibP2PService; +use eth2_libp2p::Topic; use eth2_libp2p::{Libp2pEvent, PeerId}; use eth2_libp2p::{PubsubMessage, RPCEvent}; use futures::prelude::*; @@ -13,7 +14,6 @@ use slog::{debug, info, o, trace}; use std::marker::PhantomData; use std::sync::Arc; use tokio::runtime::TaskExecutor; -use types::Topic; /// Service that handles communication between internal services and the eth2_libp2p network service. pub struct Service { @@ -126,12 +126,6 @@ fn network_service( .send(HandlerMessage::PeerDialed(peer_id)) .map_err(|_| "failed to send rpc to handler")?; } - Libp2pEvent::Identified(peer_id, info) => { - debug!( - log, - "We have identified peer: {:?} with {:?}", peer_id, info - ); - } Libp2pEvent::PubsubMessage { source, message, .. } => { diff --git a/beacon_node/network/src/sync/simple_sync.rs b/beacon_node/network/src/sync/simple_sync.rs index 2382e47a4..5899e5aea 100644 --- a/beacon_node/network/src/sync/simple_sync.rs +++ b/beacon_node/network/src/sync/simple_sync.rs @@ -4,7 +4,7 @@ use beacon_chain::{BeaconChain, BeaconChainTypes, BlockProcessingOutcome}; use eth2_libp2p::rpc::methods::*; use eth2_libp2p::rpc::{RPCRequest, RPCResponse, RequestId}; use eth2_libp2p::PeerId; -use slog::{debug, error, info, o, warn}; +use slog::{debug, error, info, o, trace, warn}; use std::collections::HashMap; use std::sync::Arc; use std::time::Duration; @@ -532,6 +532,11 @@ impl SimpleSync { // Add this block to the queue self.import_queue .enqueue_full_blocks(vec![block], peer_id.clone()); + trace!( + self.log, + "NewGossipBlock"; + "peer" => format!("{:?}", peer_id), + ); // Unless the parent is in the queue, request the parent block from the peer. // diff --git a/beacon_node/rpc/src/attestation.rs b/beacon_node/rpc/src/attestation.rs index 0f585b7e7..86f4331f1 100644 --- a/beacon_node/rpc/src/attestation.rs +++ b/beacon_node/rpc/src/attestation.rs @@ -1,5 +1,7 @@ use beacon_chain::{BeaconChain, BeaconChainTypes}; use eth2_libp2p::PubsubMessage; +use eth2_libp2p::TopicBuilder; +use eth2_libp2p::SHARD_TOPIC_PREFIX; use futures::Future; use grpcio::{RpcContext, RpcStatus, RpcStatusCode, UnarySink}; use network::NetworkMessage; @@ -136,11 +138,10 @@ impl AttestationService for AttestationServiceInstance { "type" => "valid_attestation", ); - // TODO: Obtain topics from the network service properly. - let topic = types::TopicBuilder::new("beacon_chain".to_string()).build(); + // valid attestation, propagate to the network + let topic = TopicBuilder::new(SHARD_TOPIC_PREFIX).build(); let message = PubsubMessage::Attestation(attestation); - // Publish the attestation to the p2p network via gossipsub. self.network_chan .send(NetworkMessage::Publish { topics: vec![topic], @@ -150,7 +151,7 @@ impl AttestationService for AttestationServiceInstance { error!( self.log, "PublishAttestation"; - "type" => "failed to publish to gossipsub", + "type" => "failed to publish attestation to gossipsub", "error" => format!("{:?}", e) ); }); diff --git a/beacon_node/rpc/src/beacon_block.rs b/beacon_node/rpc/src/beacon_block.rs index 533fd285a..cdf46a1ab 100644 --- a/beacon_node/rpc/src/beacon_block.rs +++ b/beacon_node/rpc/src/beacon_block.rs @@ -1,6 +1,7 @@ use beacon_chain::{BeaconChain, BeaconChainTypes, BlockProcessingOutcome}; use crossbeam_channel; -use eth2_libp2p::PubsubMessage; +use eth2_libp2p::BEACON_PUBSUB_TOPIC; +use eth2_libp2p::{PubsubMessage, TopicBuilder}; use futures::Future; use grpcio::{RpcContext, RpcStatus, RpcStatusCode, UnarySink}; use network::NetworkMessage; @@ -104,9 +105,8 @@ impl BeaconBlockService for BeaconBlockServiceInstance { "block_root" => format!("{}", block_root), ); - // TODO: Obtain topics from the network service properly. - let topic = - types::TopicBuilder::new("beacon_chain".to_string()).build(); + // get the network topic to send on + let topic = TopicBuilder::new(BEACON_PUBSUB_TOPIC).build(); let message = PubsubMessage::Block(block); // Publish the block to the p2p network via gossipsub. diff --git a/beacon_node/rpc/src/lib.rs b/beacon_node/rpc/src/lib.rs index f2f1b2abf..3e6fd3e73 100644 --- a/beacon_node/rpc/src/lib.rs +++ b/beacon_node/rpc/src/lib.rs @@ -60,8 +60,8 @@ pub fn start_server( }; let attestation_service = { let instance = AttestationServiceInstance { - chain: beacon_chain.clone(), network_chan, + chain: beacon_chain.clone(), log: log.clone(), }; create_attestation_service(instance) diff --git a/beacon_node/src/main.rs b/beacon_node/src/main.rs index d6274befc..791feae54 100644 --- a/beacon_node/src/main.rs +++ b/beacon_node/src/main.rs @@ -1,11 +1,11 @@ -extern crate slog; - mod run; use clap::{App, Arg}; use client::{ClientConfig, Eth2Config}; -use eth2_config::{get_data_dir, read_from_file, write_to_file}; -use slog::{crit, o, Drain}; +use env_logger::{Builder, Env}; +use eth2_config::{read_from_file, write_to_file}; +use slog::{crit, o, Drain, Level}; +use std::fs; use std::path::PathBuf; pub const DEFAULT_DATA_DIR: &str = ".lighthouse"; @@ -14,10 +14,8 @@ pub const CLIENT_CONFIG_FILENAME: &str = "beacon-node.toml"; pub const ETH2_CONFIG_FILENAME: &str = "eth2-spec.toml"; fn main() { - let decorator = slog_term::TermDecorator::new().build(); - let drain = slog_term::CompactFormat::new(decorator).build().fuse(); - let drain = slog_async::Async::new(drain).build().fuse(); - let logger = slog::Logger::root(drain, o!()); + // debugging output for libp2p and external crates + Builder::from_env(Env::default()).init(); let matches = App::new("Lighthouse") .version(version::version().as_str()) @@ -30,21 +28,48 @@ fn main() { .value_name("DIR") .help("Data directory for keys and databases.") .takes_value(true) - .default_value(DEFAULT_DATA_DIR), ) // network related arguments .arg( Arg::with_name("listen-address") .long("listen-address") - .value_name("Listen Address") - .help("One or more comma-delimited multi-addresses to listen for p2p connections.") + .value_name("Address") + .help("The address lighthouse will listen for UDP and TCP connections. (default 127.0.0.1).") + .takes_value(true), + ) + .arg( + Arg::with_name("maxpeers") + .long("maxpeers") + .help("The maximum number of peers (default 10).") .takes_value(true), ) .arg( Arg::with_name("boot-nodes") .long("boot-nodes") + .allow_hyphen_values(true) .value_name("BOOTNODES") - .help("One or more comma-delimited multi-addresses to bootstrap the p2p network.") + .help("One or more comma-delimited base64-encoded ENR's to bootstrap the p2p network.") + .takes_value(true), + ) + .arg( + Arg::with_name("port") + .long("port") + .value_name("Lighthouse Port") + .help("The TCP/UDP port to listen on. The UDP port can be modified by the --discovery-port flag.") + .takes_value(true), + ) + .arg( + Arg::with_name("discovery-port") + .long("disc-port") + .value_name("DiscoveryPort") + .help("The discovery UDP port.") + .takes_value(true), + ) + .arg( + Arg::with_name("discovery-address") + .long("discovery-address") + .value_name("Address") + .help("The IP address to broadcast to other peers on how to reach this node.") .takes_value(true), ) // rpc related arguments @@ -58,14 +83,13 @@ fn main() { .arg( Arg::with_name("rpc-address") .long("rpc-address") - .value_name("RPCADDRESS") + .value_name("Address") .help("Listen address for RPC endpoint.") .takes_value(true), ) .arg( Arg::with_name("rpc-port") .long("rpc-port") - .value_name("RPCPORT") .help("Listen port for RPC endpoint.") .takes_value(true), ) @@ -73,21 +97,19 @@ fn main() { .arg( Arg::with_name("http") .long("http") - .value_name("HTTP") .help("Enable the HTTP server.") .takes_value(false), ) .arg( Arg::with_name("http-address") .long("http-address") - .value_name("HTTPADDRESS") + .value_name("Address") .help("Listen address for the HTTP server.") .takes_value(true), ) .arg( Arg::with_name("http-port") .long("http-port") - .value_name("HTTPPORT") .help("Listen port for the HTTP server.") .takes_value(true), ) @@ -116,19 +138,60 @@ fn main() { .short("r") .help("When present, genesis will be within 30 minutes prior. Only for testing"), ) + .arg( + Arg::with_name("verbosity") + .short("v") + .multiple(true) + .help("Sets the verbosity level") + .takes_value(true), + ) .get_matches(); - let data_dir = match get_data_dir(&matches, PathBuf::from(DEFAULT_DATA_DIR)) { - Ok(dir) => dir, - Err(e) => { - crit!(logger, "Failed to initialize data dir"; "error" => format!("{:?}", e)); - return; + // build the initial logger + let decorator = slog_term::TermDecorator::new().build(); + let drain = slog_term::CompactFormat::new(decorator).build().fuse(); + let drain = slog_async::Async::new(drain).build(); + + let drain = match matches.occurrences_of("verbosity") { + 0 => drain.filter_level(Level::Info), + 1 => drain.filter_level(Level::Debug), + 2 => drain.filter_level(Level::Trace), + _ => drain.filter_level(Level::Info), + }; + + let log = slog::Logger::root(drain.fuse(), o!()); + + let data_dir = match matches + .value_of("datadir") + .and_then(|v| Some(PathBuf::from(v))) + { + Some(v) => v, + None => { + // use the default + let mut default_dir = match dirs::home_dir() { + Some(v) => v, + None => { + crit!(log, "Failed to find a home directory"); + return; + } + }; + default_dir.push(DEFAULT_DATA_DIR); + PathBuf::from(default_dir) } }; + // create the directory if needed + match fs::create_dir_all(&data_dir) { + Ok(_) => {} + Err(e) => { + crit!(log, "Failed to initialize data dir"; "error" => format!("{}", e)); + return; + } + } + let client_config_path = data_dir.join(CLIENT_CONFIG_FILENAME); - // Attempt to lead the `ClientConfig` from disk. + // Attempt to load the `ClientConfig` from disk. // // If file doesn't exist, create a new, default one. let mut client_config = match read_from_file::(client_config_path.clone()) { @@ -136,13 +199,13 @@ fn main() { Ok(None) => { let default = ClientConfig::default(); if let Err(e) = write_to_file(client_config_path, &default) { - crit!(logger, "Failed to write default ClientConfig to file"; "error" => format!("{:?}", e)); + crit!(log, "Failed to write default ClientConfig to file"; "error" => format!("{:?}", e)); return; } default } Err(e) => { - crit!(logger, "Failed to load a ChainConfig file"; "error" => format!("{:?}", e)); + crit!(log, "Failed to load a ChainConfig file"; "error" => format!("{:?}", e)); return; } }; @@ -154,7 +217,7 @@ fn main() { match client_config.apply_cli_args(&matches) { Ok(()) => (), Err(s) => { - crit!(logger, "Failed to parse ClientConfig CLI arguments"; "error" => s); + crit!(log, "Failed to parse ClientConfig CLI arguments"; "error" => s); return; } }; @@ -173,13 +236,13 @@ fn main() { _ => unreachable!(), // Guarded by slog. }; if let Err(e) = write_to_file(eth2_config_path, &default) { - crit!(logger, "Failed to write default Eth2Config to file"; "error" => format!("{:?}", e)); + crit!(log, "Failed to write default Eth2Config to file"; "error" => format!("{:?}", e)); return; } default } Err(e) => { - crit!(logger, "Failed to load/generate an Eth2Config"; "error" => format!("{:?}", e)); + crit!(log, "Failed to load/generate an Eth2Config"; "error" => format!("{:?}", e)); return; } }; @@ -188,13 +251,13 @@ fn main() { match eth2_config.apply_cli_args(&matches) { Ok(()) => (), Err(s) => { - crit!(logger, "Failed to parse Eth2Config CLI arguments"; "error" => s); + crit!(log, "Failed to parse Eth2Config CLI arguments"; "error" => s); return; } }; - match run::run_beacon_node(client_config, eth2_config, &logger) { + match run::run_beacon_node(client_config, eth2_config, &log) { Ok(_) => {} - Err(e) => crit!(logger, "Beacon node failed to start"; "reason" => format!("{:}", e)), + Err(e) => crit!(log, "Beacon node failed to start"; "reason" => format!("{:}", e)), } } diff --git a/beacon_node/src/run.rs b/beacon_node/src/run.rs index 834f9a428..51fa16154 100644 --- a/beacon_node/src/run.rs +++ b/beacon_node/src/run.rs @@ -41,6 +41,15 @@ pub fn run_beacon_node( "This software is EXPERIMENTAL and provides no guarantees or warranties." ); + info!( + log, + "Starting beacon node"; + "p2p_listen_address" => format!("{:?}", &other_client_config.network.listen_address), + "data_dir" => format!("{:?}", other_client_config.data_dir()), + "spec_constants" => &spec_constants, + "db_type" => &other_client_config.db_type, + ); + let result = match (db_type.as_str(), spec_constants.as_str()) { ("disk", "minimal") => run::>( &db_path, @@ -80,17 +89,6 @@ pub fn run_beacon_node( } }; - if result.is_ok() { - info!( - log, - "Started beacon node"; - "p2p_listen_addresses" => format!("{:?}", &other_client_config.network.listen_addresses()), - "data_dir" => format!("{:?}", other_client_config.data_dir()), - "spec_constants" => &spec_constants, - "db_type" => &other_client_config.db_type, - ); - } - result } diff --git a/eth2/types/Cargo.toml b/eth2/types/Cargo.toml index ed6307684..fd6578340 100644 --- a/eth2/types/Cargo.toml +++ b/eth2/types/Cargo.toml @@ -32,7 +32,6 @@ swap_or_not_shuffle = { path = "../utils/swap_or_not_shuffle" } test_random_derive = { path = "../utils/test_random_derive" } tree_hash = { path = "../utils/tree_hash" } tree_hash_derive = { path = "../utils/tree_hash_derive" } -libp2p = { git = "https://github.com/SigP/rust-libp2p", rev = "b3c32d9a821ae6cc89079499cc6e8a6bab0bffc3" } [dev-dependencies] env_logger = "0.6.0" diff --git a/eth2/types/src/chain_spec.rs b/eth2/types/src/chain_spec.rs index 74ce40671..6073fb32e 100644 --- a/eth2/types/src/chain_spec.rs +++ b/eth2/types/src/chain_spec.rs @@ -104,11 +104,7 @@ pub struct ChainSpec { domain_voluntary_exit: u32, domain_transfer: u32, - /* - * Network specific parameters - * - */ - pub boot_nodes: Vec, + pub boot_nodes: Vec, pub chain_id: u8, } @@ -216,7 +212,7 @@ impl ChainSpec { domain_transfer: 5, /* - * Boot nodes + * Network specific */ boot_nodes: vec![], chain_id: 1, // mainnet chain id @@ -231,12 +227,8 @@ impl ChainSpec { pub fn minimal() -> Self { let genesis_slot = Slot::new(0); - // Note: these bootnodes are placeholders. - // - // Should be updated once static bootnodes exist. - let boot_nodes = vec!["/ip4/127.0.0.1/tcp/9000" - .parse() - .expect("correct multiaddr")]; + // Note: bootnodes to be updated when static nodes exist. + let boot_nodes = vec![]; Self { target_committee_size: 4, diff --git a/eth2/types/src/lib.rs b/eth2/types/src/lib.rs index 4d0ec5fae..2406c3a18 100644 --- a/eth2/types/src/lib.rs +++ b/eth2/types/src/lib.rs @@ -82,6 +82,3 @@ pub type ProposerMap = HashMap; pub use bls::{AggregatePublicKey, AggregateSignature, Keypair, PublicKey, SecretKey, Signature}; pub use fixed_len_vec::{typenum, typenum::Unsigned, FixedLenVec}; -pub use libp2p::floodsub::{Topic, TopicBuilder, TopicHash}; -pub use libp2p::multiaddr; -pub use libp2p::Multiaddr; diff --git a/eth2/utils/eth2_config/src/lib.rs b/eth2/utils/eth2_config/src/lib.rs index 9d50a95c1..f6ad54c21 100644 --- a/eth2/utils/eth2_config/src/lib.rs +++ b/eth2/utils/eth2_config/src/lib.rs @@ -1,6 +1,5 @@ use clap::ArgMatches; use serde_derive::{Deserialize, Serialize}; -use std::fs; use std::fs::File; use std::io::prelude::*; use std::path::PathBuf; @@ -105,15 +104,3 @@ where Ok(None) } } - -pub fn get_data_dir(args: &ArgMatches, default_data_dir: PathBuf) -> Result { - if let Some(data_dir) = args.value_of("data_dir") { - Ok(PathBuf::from(data_dir)) - } else { - let path = dirs::home_dir() - .ok_or_else(|| "Unable to locate home directory")? - .join(&default_data_dir); - fs::create_dir_all(&path).map_err(|_| "Unable to create data_dir")?; - Ok(path) - } -} diff --git a/validator_client/Cargo.toml b/validator_client/Cargo.toml index fdb4c33c0..1972f870c 100644 --- a/validator_client/Cargo.toml +++ b/validator_client/Cargo.toml @@ -34,3 +34,4 @@ toml = "^0.5" error-chain = "0.12.0" bincode = "^1.1.2" futures = "0.1.25" +dirs = "2.0.1" diff --git a/validator_client/src/main.rs b/validator_client/src/main.rs index f74915438..e37b6530e 100644 --- a/validator_client/src/main.rs +++ b/validator_client/src/main.rs @@ -9,9 +9,10 @@ mod signer; use crate::config::Config as ValidatorClientConfig; use crate::service::Service as ValidatorService; use clap::{App, Arg}; -use eth2_config::{get_data_dir, read_from_file, write_to_file, Eth2Config}; +use eth2_config::{read_from_file, write_to_file, Eth2Config}; use protos::services_grpc::ValidatorServiceClient; use slog::{crit, error, info, o, Drain}; +use std::fs; use std::path::PathBuf; use types::{Keypair, MainnetEthSpec, MinimalEthSpec}; @@ -66,14 +67,34 @@ fn main() { ) .get_matches(); - let data_dir = match get_data_dir(&matches, PathBuf::from(DEFAULT_DATA_DIR)) { - Ok(dir) => dir, - Err(e) => { - crit!(log, "Failed to initialize data dir"; "error" => format!("{:?}", e)); - return; + let data_dir = match matches + .value_of("datadir") + .and_then(|v| Some(PathBuf::from(v))) + { + Some(v) => v, + None => { + // use the default + let mut default_dir = match dirs::home_dir() { + Some(v) => v, + None => { + crit!(log, "Failed to find a home directory"); + return; + } + }; + default_dir.push(DEFAULT_DATA_DIR); + PathBuf::from(default_dir) } }; + // create the directory if needed + match fs::create_dir_all(&data_dir) { + Ok(_) => {} + Err(e) => { + crit!(log, "Failed to initialize data dir"; "error" => format!("{}", e)); + return; + } + } + let client_config_path = data_dir.join(CLIENT_CONFIG_FILENAME); // Attempt to lead the `ClientConfig` from disk.