diff --git a/beacon_node/eth2_libp2p/src/config.rs b/beacon_node/eth2_libp2p/src/config.rs index e1fc69583..60e091801 100644 --- a/beacon_node/eth2_libp2p/src/config.rs +++ b/beacon_node/eth2_libp2p/src/config.rs @@ -81,6 +81,9 @@ pub struct Config { /// Attempt to construct external port mappings with UPnP. pub upnp_enabled: bool, + /// Subscribe to all subnets for the duration of the runtime. + pub subscribe_all_subnets: bool, + /// List of extra topics to initially subscribe to as strings. pub topics: Vec, } @@ -88,7 +91,7 @@ pub struct Config { impl Default for Config { /// Generate a default network configuration. fn default() -> Self { - // WARNING: this directory default should be always overrided with parameters + // WARNING: this directory default should be always overwritten with parameters // from cli for specific networks. let network_dir = dirs::home_dir() .unwrap_or_else(|| PathBuf::from(".")) @@ -181,6 +184,7 @@ impl Default for Config { client_version: lighthouse_version::version_with_platform(), disable_discovery: false, upnp_enabled: true, + subscribe_all_subnets: false, topics: Vec::new(), } } diff --git a/beacon_node/eth2_libp2p/src/peer_manager/mod.rs b/beacon_node/eth2_libp2p/src/peer_manager/mod.rs index 816ad4ea4..225b51fbe 100644 --- a/beacon_node/eth2_libp2p/src/peer_manager/mod.rs +++ b/beacon_node/eth2_libp2p/src/peer_manager/mod.rs @@ -11,7 +11,7 @@ use futures::Stream; use hashset_delay::HashSetDelay; use libp2p::core::multiaddr::Protocol as MProtocol; use libp2p::identify::IdentifyInfo; -use slog::{crit, debug, error}; +use slog::{crit, debug, error, warn}; use smallvec::SmallVec; use std::{ net::SocketAddr, @@ -40,7 +40,11 @@ use std::collections::HashMap; const STATUS_INTERVAL: u64 = 300; /// The time in seconds between PING events. We do not send a ping if the other peer has PING'd us /// within this time frame (Seconds) -const PING_INTERVAL: u64 = 30; +/// This is asymmetric to avoid simultaneous pings. +/// The interval for outbound connections. +const PING_INTERVAL_OUTBOUND: u64 = 30; +/// The interval for inbound connections. +const PING_INTERVAL_INBOUND: u64 = 35; /// The heartbeat performs regular updates such as updating reputations and performing discovery /// requests. This defines the interval in seconds. @@ -61,8 +65,10 @@ pub struct PeerManager { network_globals: Arc>, /// A queue of events that the `PeerManager` is waiting to produce. events: SmallVec<[PeerManagerEvent; 16]>, - /// A collection of peers awaiting to be Ping'd. - ping_peers: HashSetDelay, + /// A collection of inbound-connected peers awaiting to be Ping'd. + inbound_ping_peers: HashSetDelay, + /// A collection of outbound-connected peers awaiting to be Ping'd. + outbound_ping_peers: HashSetDelay, /// A collection of peers awaiting to be Status'd. status_peers: HashSetDelay, /// The target number of peers we would like to connect to. @@ -112,7 +118,8 @@ impl PeerManager { Ok(PeerManager { network_globals, events: SmallVec::new(), - ping_peers: HashSetDelay::new(Duration::from_secs(PING_INTERVAL)), + inbound_ping_peers: HashSetDelay::new(Duration::from_secs(PING_INTERVAL_INBOUND)), + outbound_ping_peers: HashSetDelay::new(Duration::from_secs(PING_INTERVAL_OUTBOUND)), status_peers: HashSetDelay::new(Duration::from_secs(STATUS_INTERVAL)), target_peers: config.target_peers, max_peers: (config.target_peers as f32 * (1.0 + PEER_EXCESS_FACTOR)).ceil() as usize, @@ -203,6 +210,11 @@ impl PeerManager { /// A request to find peers on a given subnet. pub fn discover_subnet_peers(&mut self, subnets_to_discover: Vec) { + // If discovery is not started or disabled, ignore the request + if !self.discovery.started { + return; + } + let filtered: Vec = subnets_to_discover .into_iter() .filter(|s| { @@ -263,7 +275,8 @@ impl PeerManager { .notify_disconnect(peer_id); // remove the ping and status timer for the peer - self.ping_peers.remove(peer_id); + self.inbound_ping_peers.remove(peer_id); + self.outbound_ping_peers.remove(peer_id); self.status_peers.remove(peer_id); } @@ -410,7 +423,17 @@ impl PeerManager { // received a ping // reset the to-ping timer for this peer debug!(self.log, "Received a ping request"; "peer_id" => peer_id.to_string(), "seq_no" => seq); - self.ping_peers.insert(peer_id.clone()); + match peer_info.connection_direction { + Some(ConnectionDirection::Incoming) => { + self.inbound_ping_peers.insert(peer_id.clone()); + } + Some(ConnectionDirection::Outgoing) => { + self.outbound_ping_peers.insert(peer_id.clone()); + } + None => { + warn!(self.log, "Received a ping from a peer with an unknown connection direction"; "peer_id" => %peer_id); + } + } // if the sequence number is unknown send an update the meta data of the peer. if let Some(meta_data) = &peer_info.meta_data { @@ -656,16 +679,19 @@ impl PeerManager { return true; } ConnectingType::IngoingConnected { multiaddr } => { - peerdb.connect_outgoing(peer_id, multiaddr, enr) + peerdb.connect_ingoing(peer_id, multiaddr, enr); + // start a timer to ping inbound peers. + self.inbound_ping_peers.insert(peer_id.clone()); } ConnectingType::OutgoingConnected { multiaddr } => { - peerdb.connect_ingoing(peer_id, multiaddr, enr) + peerdb.connect_outgoing(peer_id, multiaddr, enr); + // start a timer for to ping outbound peers. + self.outbound_ping_peers.insert(peer_id.clone()); } } } // start a ping and status timer for the peer - self.ping_peers.insert(peer_id.clone()); self.status_peers.insert(peer_id.clone()); // increment prometheus metrics @@ -833,8 +859,10 @@ impl PeerManager { let peer_count = self.network_globals.connected_or_dialing_peers(); if peer_count < self.target_peers { // If we need more peers, queue a discovery lookup. - debug!(self.log, "Starting a new peer discovery query"; "connected_peers" => peer_count, "target_peers" => self.target_peers); - self.discovery.discover_peers(); + if self.discovery.started { + debug!(self.log, "Starting a new peer discovery query"; "connected_peers" => peer_count, "target_peers" => self.target_peers); + self.discovery.discover_peers(); + } } // Updates peer's scores. @@ -892,13 +920,26 @@ impl Stream for PeerManager { // poll the timeouts for pings and status' loop { - match self.ping_peers.poll_next_unpin(cx) { + match self.inbound_ping_peers.poll_next_unpin(cx) { Poll::Ready(Some(Ok(peer_id))) => { - self.ping_peers.insert(peer_id.clone()); + self.inbound_ping_peers.insert(peer_id.clone()); self.events.push(PeerManagerEvent::Ping(peer_id)); } Poll::Ready(Some(Err(e))) => { - error!(self.log, "Failed to check for peers to ping"; "error" => e.to_string()) + error!(self.log, "Failed to check for inbound peers to ping"; "error" => e.to_string()) + } + Poll::Ready(None) | Poll::Pending => break, + } + } + + loop { + match self.outbound_ping_peers.poll_next_unpin(cx) { + Poll::Ready(Some(Ok(peer_id))) => { + self.outbound_ping_peers.insert(peer_id.clone()); + self.events.push(PeerManagerEvent::Ping(peer_id)); + } + Poll::Ready(Some(Err(e))) => { + error!(self.log, "Failed to check for outbound peers to ping"; "error" => e.to_string()) } Poll::Ready(None) | Poll::Pending => break, } diff --git a/beacon_node/eth2_libp2p/src/peer_manager/peerdb.rs b/beacon_node/eth2_libp2p/src/peer_manager/peerdb.rs index 5d2eece4d..577e20509 100644 --- a/beacon_node/eth2_libp2p/src/peer_manager/peerdb.rs +++ b/beacon_node/eth2_libp2p/src/peer_manager/peerdb.rs @@ -462,7 +462,7 @@ impl PeerDB { }); // Ban the peer if the score is not already low enough. - match info.score().state() { + match info.score_state() { ScoreState::Banned => {} _ => { // If score isn't low enough to ban, this function has been called incorrectly. @@ -522,7 +522,7 @@ impl PeerDB { return Err("Unbanning peer that is not banned"); } - if let ScoreState::Banned = info.score().state() { + if let ScoreState::Banned = info.score_state() { return Err("Attempted to unban (connection status) a banned peer"); } diff --git a/beacon_node/eth2_libp2p/src/rpc/handler.rs b/beacon_node/eth2_libp2p/src/rpc/handler.rs index e8eb68d47..b41426d41 100644 --- a/beacon_node/eth2_libp2p/src/rpc/handler.rs +++ b/beacon_node/eth2_libp2p/src/rpc/handler.rs @@ -283,8 +283,8 @@ where let inbound_info = if let Some(info) = self.inbound_substreams.get_mut(&inbound_id) { info } else { - warn!(self.log, "Stream has expired. Response not sent"; - "response" => response.to_string(), "id" => inbound_id); + warn!(self.log, "Inbound stream has expired, response not sent"; + "response" => response.to_string(), "id" => inbound_id, "msg" => "Likely too many resources, reduce peer count"); return; }; diff --git a/beacon_node/eth2_libp2p/src/rpc/mod.rs b/beacon_node/eth2_libp2p/src/rpc/mod.rs index 477f76d8e..4492571ca 100644 --- a/beacon_node/eth2_libp2p/src/rpc/mod.rs +++ b/beacon_node/eth2_libp2p/src/rpc/mod.rs @@ -119,7 +119,7 @@ impl RPC { Duration::from_secs(10), ) .build() - .unwrap(); + .expect("Configuration parameters are valid"); RPC { limiter, events: Vec::new(), diff --git a/beacon_node/eth2_libp2p/src/service.rs b/beacon_node/eth2_libp2p/src/service.rs index 2f638e0e3..26e844b1a 100644 --- a/beacon_node/eth2_libp2p/src/service.rs +++ b/beacon_node/eth2_libp2p/src/service.rs @@ -16,7 +16,7 @@ use libp2p::{ swarm::{SwarmBuilder, SwarmEvent}, PeerId, Swarm, Transport, }; -use slog::{crit, debug, info, o, trace, warn}; +use slog::{crit, debug, info, o, trace, warn, Logger}; use ssz::Decode; use std::fs::File; use std::io::prelude::*; @@ -53,7 +53,7 @@ pub struct Service { pub local_peer_id: PeerId, /// The libp2p logger handle. - pub log: slog::Logger, + pub log: Logger, } impl Service { @@ -61,7 +61,7 @@ impl Service { executor: task_executor::TaskExecutor, config: &NetworkConfig, enr_fork_id: EnrForkId, - log: &slog::Logger, + log: &Logger, chain_spec: &ChainSpec, ) -> error::Result<(Arc>, Self)> { let log = log.new(o!("service"=> "libp2p")); @@ -206,6 +206,7 @@ impl Service { } let mut subscribed_topics: Vec = vec![]; + for topic_kind in &config.topics { if swarm.subscribe_kind(topic_kind.clone()) { subscribed_topics.push(topic_kind.clone()); @@ -213,6 +214,7 @@ impl Service { warn!(log, "Could not subscribe to topic"; "topic" => format!("{}",topic_kind)); } } + if !subscribed_topics.is_empty() { info!(log, "Subscribed to topics"; "topics" => format!("{:?}", subscribed_topics)); } diff --git a/beacon_node/network/src/attestation_service/mod.rs b/beacon_node/network/src/attestation_service/mod.rs index af2d1aeeb..645732fa0 100644 --- a/beacon_node/network/src/attestation_service/mod.rs +++ b/beacon_node/network/src/attestation_service/mod.rs @@ -13,7 +13,7 @@ use rand::seq::SliceRandom; use slog::{debug, error, o, trace, warn}; use beacon_chain::{BeaconChain, BeaconChainTypes}; -use eth2_libp2p::SubnetDiscovery; +use eth2_libp2p::{NetworkConfig, SubnetDiscovery}; use hashset_delay::HashSetDelay; use slot_clock::SlotClock; use types::{Attestation, EthSpec, Slot, SubnetId, ValidatorSubscription}; @@ -89,6 +89,12 @@ pub struct AttestationService { /// The waker for the current thread. waker: Option, + /// The discovery mechanism of lighthouse is disabled. + discovery_disabled: bool, + + /// We are always subscribed to all subnets. + subscribe_all_subnets: bool, + /// The logger for the attestation service. log: slog::Logger, } @@ -96,7 +102,11 @@ pub struct AttestationService { impl AttestationService { /* Public functions */ - pub fn new(beacon_chain: Arc>, log: &slog::Logger) -> Self { + pub fn new( + beacon_chain: Arc>, + config: &NetworkConfig, + log: &slog::Logger, + ) -> Self { let log = log.new(o!("service" => "attestation_service")); // calculate the random subnet duration from the spec constants @@ -124,6 +134,8 @@ impl AttestationService { aggregate_validators_on_subnet: HashSetDelay::new(default_timeout), known_validators: HashSetDelay::new(last_seen_val_timeout), waker: None, + subscribe_all_subnets: config.subscribe_all_subnets, + discovery_disabled: config.disable_discovery, log, } } @@ -131,7 +143,11 @@ impl AttestationService { /// Return count of all currently subscribed subnets (long-lived **and** short-lived). #[cfg(test)] pub fn subscription_count(&self) -> usize { - self.subscriptions.len() + if self.subscribe_all_subnets { + self.beacon_chain.spec.attestation_subnet_count as usize + } else { + self.subscriptions.len() + } } /// Processes a list of validator subscriptions. @@ -186,7 +202,7 @@ impl AttestationService { if subscription.slot > *slot { subnets_to_discover.insert(subnet_id, subscription.slot); } - } else { + } else if !self.discovery_disabled { subnets_to_discover.insert(subnet_id, subscription.slot); } @@ -218,13 +234,17 @@ impl AttestationService { } } - if let Err(e) = self.discover_peers_request( - subnets_to_discover - .into_iter() - .map(|(subnet_id, slot)| ExactSubnet { subnet_id, slot }), - ) { - warn!(self.log, "Discovery lookup request error"; "error" => e); - }; + // If the discovery mechanism isn't disabled, attempt to set up a peer discovery for the + // required subnets. + if !self.discovery_disabled { + if let Err(e) = self.discover_peers_request( + subnets_to_discover + .into_iter() + .map(|(subnet_id, slot)| ExactSubnet { subnet_id, slot }), + ) { + warn!(self.log, "Discovery lookup request error"; "error" => e); + }; + } // pre-emptively wake the thread to check for new events if let Some(waker) = &self.waker { @@ -343,7 +363,7 @@ impl AttestationService { // in-active. This case is checked on the subscription event (see `handle_subscriptions`). // Return if we already have a subscription for this subnet_id and slot - if self.unsubscriptions.contains(&exact_subnet) { + if self.unsubscriptions.contains(&exact_subnet) || self.subscribe_all_subnets { return Ok(()); } @@ -366,7 +386,7 @@ impl AttestationService { /// /// This also updates the ENR to indicate our long-lived subscription to the subnet fn add_known_validator(&mut self, validator_index: u64) { - if self.known_validators.get(&validator_index).is_none() { + if self.known_validators.get(&validator_index).is_none() && !self.subscribe_all_subnets { // New validator has subscribed // Subscribe to random topics and update the ENR if needed. diff --git a/beacon_node/network/src/attestation_service/tests/mod.rs b/beacon_node/network/src/attestation_service/tests/mod.rs index 98eb6dc8d..469bbf747 100644 --- a/beacon_node/network/src/attestation_service/tests/mod.rs +++ b/beacon_node/network/src/attestation_service/tests/mod.rs @@ -92,10 +92,11 @@ mod tests { fn get_attestation_service() -> AttestationService { let log = get_logger(); + let config = NetworkConfig::default(); let beacon_chain = CHAIN.chain.clone(); - AttestationService::new(beacon_chain, &log) + AttestationService::new(beacon_chain, &config, &log) } fn get_subscription( diff --git a/beacon_node/network/src/service.rs b/beacon_node/network/src/service.rs index cd7ba2cbe..4973a1a7b 100644 --- a/beacon_node/network/src/service.rs +++ b/beacon_node/network/src/service.rs @@ -20,7 +20,7 @@ use std::{collections::HashMap, net::SocketAddr, sync::Arc, time::Duration}; use store::HotColdDB; use tokio::sync::mpsc; use tokio::time::Delay; -use types::{EthSpec, RelativeEpoch, ValidatorSubscription}; +use types::{EthSpec, RelativeEpoch, SubnetId, Unsigned, ValidatorSubscription}; mod tests; @@ -110,6 +110,8 @@ pub struct NetworkService { discovery_auto_update: bool, /// A delay that expires when a new fork takes place. next_fork_update: Option, + /// Subscribe to all the subnets once synced. + subscribe_all_subnets: bool, /// A timer for updating various network metrics. metrics_update: tokio::time::Interval, /// gossipsub_parameter_update timer @@ -186,7 +188,8 @@ impl NetworkService { )?; // attestation service - let attestation_service = AttestationService::new(beacon_chain.clone(), &network_log); + let attestation_service = + AttestationService::new(beacon_chain.clone(), &config, &network_log); // create a timer for updating network metrics let metrics_update = tokio::time::interval(Duration::from_secs(METRIC_UPDATE_INTERVAL)); @@ -207,6 +210,7 @@ impl NetworkService { upnp_mappings: (None, None), discovery_auto_update: config.discv5_config.enr_update, next_fork_update, + subscribe_all_subnets: config.subscribe_all_subnets, metrics_update, gossipsub_parameter_update, log: network_log, @@ -397,6 +401,22 @@ fn spawn_service( warn!(service.log, "Could not subscribe to topic"; "topic" => format!("{}",topic_kind)); } } + + // if we are to subscribe to all subnets we do it here + if service.subscribe_all_subnets { + for subnet_id in 0..<::EthSpec as EthSpec>::SubnetBitfieldLength::to_u64() { + let subnet_id = SubnetId::new(subnet_id); + let topic_kind = eth2_libp2p::types::GossipKind::Attestation(subnet_id); + if service.libp2p.swarm.subscribe_kind(topic_kind.clone()) { + // Update the ENR bitfield. + service.libp2p.swarm.update_enr_subnet(subnet_id, true); + subscribed_topics.push(topic_kind.clone()); + } else { + warn!(service.log, "Could not subscribe to topic"; "topic" => format!("{}",topic_kind)); + } + } + } + if !subscribed_topics.is_empty() { info!(service.log, "Subscribed to topics"; "topics" => format!("{:?}", subscribed_topics)); } diff --git a/beacon_node/src/cli.rs b/beacon_node/src/cli.rs index d62849a55..45210950e 100644 --- a/beacon_node/src/cli.rs +++ b/beacon_node/src/cli.rs @@ -30,6 +30,13 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> { /* * Network parameters. */ + .arg( + Arg::with_name("subscribe-all-subnets") + .long("subscribe-all-subnets") + .help("Subscribe to all subnets regardless of validator count. \ + This will also advertise the beacon node as being long-lived subscribed to all subnets.") + .takes_value(false), + ) .arg( Arg::with_name("zero-ports") .long("zero-ports") diff --git a/beacon_node/src/config.rs b/beacon_node/src/config.rs index 5711395e8..9d8d66874 100644 --- a/beacon_node/src/config.rs +++ b/beacon_node/src/config.rs @@ -354,6 +354,10 @@ pub fn set_network_config( config.network_dir = data_dir.join(DEFAULT_NETWORK_DIR); }; + if cli_args.is_present("subscribe-all-subnets") { + config.subscribe_all_subnets = true; + } + if let Some(listen_address_str) = cli_args.value_of("listen-address") { let listen_address = listen_address_str .parse()