Small network adjustments (#1884)
## Issue Addressed

- Asymmetric pings - Currently, with symmetric ping intervals, Lighthouse nodes race each other to ping, often ending in simultaneous ping connections. This shifts the ping interval to be asymmetric based on inbound/outbound connections (see the sketch below).
- Correct inbound/outbound peer-db registering - It appears we were accounting inbound connections as outbound and vice versa in the peer-db; this has been corrected.
- Improved logging.

There is likely more to come - I'll leave this open as we investigate further testnets.
parent 8772c02fa0
commit c00e6c2c6f
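To illustrate why the new ping intervals are offset rather than equal (30 s for outbound connections vs 35 s for inbound, per the constants added below), here is a small standalone Rust sketch. It is not part of the commit; the helper names `ping_times` and `collisions` are illustrative only. It counts how often two peers' ping schedules land on the same second over an hour.

```rust
/// Times (in seconds) at which a node would fire its ping timer, assuming it
/// is never reset, up to `horizon_secs`.
fn ping_times(interval_secs: u64, horizon_secs: u64) -> Vec<u64> {
    (1..)
        .map(|i| i * interval_secs)
        .take_while(|t| *t <= horizon_secs)
        .collect()
}

/// Number of ticks that land on exactly the same second in both schedules.
fn collisions(a: &[u64], b: &[u64]) -> usize {
    a.iter().filter(|t| b.contains(*t)).count()
}

fn main() {
    let horizon = 60 * 60; // one hour
    let ours = ping_times(30, horizon);

    // Symmetric intervals: every tick collides, so both sides race to ping.
    let symmetric_peer = ping_times(30, horizon);
    println!("symmetric collisions:  {}", collisions(&ours, &symmetric_peer));

    // Asymmetric intervals: ticks only line up every lcm(30, 35) = 210 s.
    let asymmetric_peer = ping_times(35, horizon);
    println!("asymmetric collisions: {}", collisions(&ours, &asymmetric_peer));
}
```

In practice the effect is stronger than this sketch suggests: as the doc comment on the constants notes, a node does not ping a peer that has pinged it within the interval, so each received ping resets the local timer and simultaneous pings become rare once the schedules drift apart.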
@@ -81,6 +81,9 @@ pub struct Config {
     /// Attempt to construct external port mappings with UPnP.
     pub upnp_enabled: bool,

+    /// Subscribe to all subnets for the duration of the runtime.
+    pub subscribe_all_subnets: bool,
+
     /// List of extra topics to initially subscribe to as strings.
     pub topics: Vec<GossipKind>,
 }
@@ -88,7 +91,7 @@ pub struct Config {
 impl Default for Config {
     /// Generate a default network configuration.
     fn default() -> Self {
-        // WARNING: this directory default should be always overrided with parameters
+        // WARNING: this directory default should be always overwritten with parameters
         // from cli for specific networks.
         let network_dir = dirs::home_dir()
             .unwrap_or_else(|| PathBuf::from("."))
@@ -181,6 +184,7 @@ impl Default for Config {
             client_version: lighthouse_version::version_with_platform(),
             disable_discovery: false,
             upnp_enabled: true,
+            subscribe_all_subnets: false,
             topics: Vec::new(),
         }
     }
@@ -11,7 +11,7 @@ use futures::Stream;
 use hashset_delay::HashSetDelay;
 use libp2p::core::multiaddr::Protocol as MProtocol;
 use libp2p::identify::IdentifyInfo;
-use slog::{crit, debug, error};
+use slog::{crit, debug, error, warn};
 use smallvec::SmallVec;
 use std::{
     net::SocketAddr,
@@ -40,7 +40,11 @@ use std::collections::HashMap;
 const STATUS_INTERVAL: u64 = 300;
 /// The time in seconds between PING events. We do not send a ping if the other peer has PING'd us
 /// within this time frame (Seconds)
-const PING_INTERVAL: u64 = 30;
+/// This is asymmetric to avoid simultaneous pings.
+/// The interval for outbound connections.
+const PING_INTERVAL_OUTBOUND: u64 = 30;
+/// The interval for inbound connections.
+const PING_INTERVAL_INBOUND: u64 = 35;

 /// The heartbeat performs regular updates such as updating reputations and performing discovery
 /// requests. This defines the interval in seconds.
@@ -61,8 +65,10 @@ pub struct PeerManager<TSpec: EthSpec> {
     network_globals: Arc<NetworkGlobals<TSpec>>,
     /// A queue of events that the `PeerManager` is waiting to produce.
     events: SmallVec<[PeerManagerEvent; 16]>,
-    /// A collection of peers awaiting to be Ping'd.
-    ping_peers: HashSetDelay<PeerId>,
+    /// A collection of inbound-connected peers awaiting to be Ping'd.
+    inbound_ping_peers: HashSetDelay<PeerId>,
+    /// A collection of outbound-connected peers awaiting to be Ping'd.
+    outbound_ping_peers: HashSetDelay<PeerId>,
     /// A collection of peers awaiting to be Status'd.
     status_peers: HashSetDelay<PeerId>,
     /// The target number of peers we would like to connect to.
@@ -112,7 +118,8 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
         Ok(PeerManager {
             network_globals,
             events: SmallVec::new(),
-            ping_peers: HashSetDelay::new(Duration::from_secs(PING_INTERVAL)),
+            inbound_ping_peers: HashSetDelay::new(Duration::from_secs(PING_INTERVAL_INBOUND)),
+            outbound_ping_peers: HashSetDelay::new(Duration::from_secs(PING_INTERVAL_OUTBOUND)),
             status_peers: HashSetDelay::new(Duration::from_secs(STATUS_INTERVAL)),
             target_peers: config.target_peers,
             max_peers: (config.target_peers as f32 * (1.0 + PEER_EXCESS_FACTOR)).ceil() as usize,
@@ -203,6 +210,11 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {

     /// A request to find peers on a given subnet.
     pub fn discover_subnet_peers(&mut self, subnets_to_discover: Vec<SubnetDiscovery>) {
+        // If discovery is not started or disabled, ignore the request
+        if !self.discovery.started {
+            return;
+        }
+
         let filtered: Vec<SubnetDiscovery> = subnets_to_discover
             .into_iter()
             .filter(|s| {
@@ -263,7 +275,8 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
             .notify_disconnect(peer_id);

         // remove the ping and status timer for the peer
-        self.ping_peers.remove(peer_id);
+        self.inbound_ping_peers.remove(peer_id);
+        self.outbound_ping_peers.remove(peer_id);
         self.status_peers.remove(peer_id);
     }

@@ -410,7 +423,17 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
             // received a ping
             // reset the to-ping timer for this peer
             debug!(self.log, "Received a ping request"; "peer_id" => peer_id.to_string(), "seq_no" => seq);
-            self.ping_peers.insert(peer_id.clone());
+            match peer_info.connection_direction {
+                Some(ConnectionDirection::Incoming) => {
+                    self.inbound_ping_peers.insert(peer_id.clone());
+                }
+                Some(ConnectionDirection::Outgoing) => {
+                    self.outbound_ping_peers.insert(peer_id.clone());
+                }
+                None => {
+                    warn!(self.log, "Received a ping from a peer with an unknown connection direction"; "peer_id" => %peer_id);
+                }
+            }

             // if the sequence number is unknown send an update the meta data of the peer.
             if let Some(meta_data) = &peer_info.meta_data {
@@ -656,16 +679,19 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
                     return true;
                 }
                 ConnectingType::IngoingConnected { multiaddr } => {
-                    peerdb.connect_outgoing(peer_id, multiaddr, enr)
+                    peerdb.connect_ingoing(peer_id, multiaddr, enr);
+                    // start a timer to ping inbound peers.
+                    self.inbound_ping_peers.insert(peer_id.clone());
                 }
                 ConnectingType::OutgoingConnected { multiaddr } => {
-                    peerdb.connect_ingoing(peer_id, multiaddr, enr)
+                    peerdb.connect_outgoing(peer_id, multiaddr, enr);
+                    // start a timer for to ping outbound peers.
+                    self.outbound_ping_peers.insert(peer_id.clone());
                 }
             }
         }

         // start a ping and status timer for the peer
-        self.ping_peers.insert(peer_id.clone());
         self.status_peers.insert(peer_id.clone());

         // increment prometheus metrics
@@ -833,9 +859,11 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
         let peer_count = self.network_globals.connected_or_dialing_peers();
         if peer_count < self.target_peers {
             // If we need more peers, queue a discovery lookup.
+            if self.discovery.started {
                 debug!(self.log, "Starting a new peer discovery query"; "connected_peers" => peer_count, "target_peers" => self.target_peers);
                 self.discovery.discover_peers();
+            }
         }

         // Updates peer's scores.
         self.update_peer_scores();
@@ -892,13 +920,26 @@ impl<TSpec: EthSpec> Stream for PeerManager<TSpec> {

         // poll the timeouts for pings and status'
         loop {
-            match self.ping_peers.poll_next_unpin(cx) {
+            match self.inbound_ping_peers.poll_next_unpin(cx) {
                 Poll::Ready(Some(Ok(peer_id))) => {
-                    self.ping_peers.insert(peer_id.clone());
+                    self.inbound_ping_peers.insert(peer_id.clone());
                     self.events.push(PeerManagerEvent::Ping(peer_id));
                 }
                 Poll::Ready(Some(Err(e))) => {
-                    error!(self.log, "Failed to check for peers to ping"; "error" => e.to_string())
+                    error!(self.log, "Failed to check for inbound peers to ping"; "error" => e.to_string())
+                }
+                Poll::Ready(None) | Poll::Pending => break,
+            }
+        }
+
+        loop {
+            match self.outbound_ping_peers.poll_next_unpin(cx) {
+                Poll::Ready(Some(Ok(peer_id))) => {
+                    self.outbound_ping_peers.insert(peer_id.clone());
+                    self.events.push(PeerManagerEvent::Ping(peer_id));
+                }
+                Poll::Ready(Some(Err(e))) => {
+                    error!(self.log, "Failed to check for outbound peers to ping"; "error" => e.to_string())
                 }
                 Poll::Ready(None) | Poll::Pending => break,
             }
@@ -462,7 +462,7 @@ impl<TSpec: EthSpec> PeerDB<TSpec> {
         });

         // Ban the peer if the score is not already low enough.
-        match info.score().state() {
+        match info.score_state() {
             ScoreState::Banned => {}
             _ => {
                 // If score isn't low enough to ban, this function has been called incorrectly.
@@ -522,7 +522,7 @@ impl<TSpec: EthSpec> PeerDB<TSpec> {
             return Err("Unbanning peer that is not banned");
         }

-        if let ScoreState::Banned = info.score().state() {
+        if let ScoreState::Banned = info.score_state() {
             return Err("Attempted to unban (connection status) a banned peer");
         }

@@ -283,8 +283,8 @@ where
         let inbound_info = if let Some(info) = self.inbound_substreams.get_mut(&inbound_id) {
             info
         } else {
-            warn!(self.log, "Stream has expired. Response not sent";
-                "response" => response.to_string(), "id" => inbound_id);
+            warn!(self.log, "Inbound stream has expired, response not sent";
+                "response" => response.to_string(), "id" => inbound_id, "msg" => "Likely too many resources, reduce peer count");
             return;
         };

@@ -119,7 +119,7 @@ impl<TSpec: EthSpec> RPC<TSpec> {
             Duration::from_secs(10),
         )
         .build()
-        .unwrap();
+        .expect("Configuration parameters are valid");
         RPC {
             limiter,
             events: Vec::new(),
@@ -16,7 +16,7 @@ use libp2p::{
     swarm::{SwarmBuilder, SwarmEvent},
     PeerId, Swarm, Transport,
 };
-use slog::{crit, debug, info, o, trace, warn};
+use slog::{crit, debug, info, o, trace, warn, Logger};
 use ssz::Decode;
 use std::fs::File;
 use std::io::prelude::*;
@@ -53,7 +53,7 @@ pub struct Service<TSpec: EthSpec> {
     pub local_peer_id: PeerId,

     /// The libp2p logger handle.
-    pub log: slog::Logger,
+    pub log: Logger,
 }

 impl<TSpec: EthSpec> Service<TSpec> {
@@ -61,7 +61,7 @@ impl<TSpec: EthSpec> Service<TSpec> {
         executor: task_executor::TaskExecutor,
         config: &NetworkConfig,
         enr_fork_id: EnrForkId,
-        log: &slog::Logger,
+        log: &Logger,
         chain_spec: &ChainSpec,
     ) -> error::Result<(Arc<NetworkGlobals<TSpec>>, Self)> {
         let log = log.new(o!("service"=> "libp2p"));
@@ -206,6 +206,7 @@ impl<TSpec: EthSpec> Service<TSpec> {
         }

         let mut subscribed_topics: Vec<GossipKind> = vec![];
+
         for topic_kind in &config.topics {
             if swarm.subscribe_kind(topic_kind.clone()) {
                 subscribed_topics.push(topic_kind.clone());
@@ -213,6 +214,7 @@ impl<TSpec: EthSpec> Service<TSpec> {
                 warn!(log, "Could not subscribe to topic"; "topic" => format!("{}",topic_kind));
             }
         }
+
         if !subscribed_topics.is_empty() {
             info!(log, "Subscribed to topics"; "topics" => format!("{:?}", subscribed_topics));
         }
@@ -13,7 +13,7 @@ use rand::seq::SliceRandom;
 use slog::{debug, error, o, trace, warn};

 use beacon_chain::{BeaconChain, BeaconChainTypes};
-use eth2_libp2p::SubnetDiscovery;
+use eth2_libp2p::{NetworkConfig, SubnetDiscovery};
 use hashset_delay::HashSetDelay;
 use slot_clock::SlotClock;
 use types::{Attestation, EthSpec, Slot, SubnetId, ValidatorSubscription};
@@ -89,6 +89,12 @@ pub struct AttestationService<T: BeaconChainTypes> {
     /// The waker for the current thread.
     waker: Option<std::task::Waker>,

+    /// The discovery mechanism of lighthouse is disabled.
+    discovery_disabled: bool,
+
+    /// We are always subscribed to all subnets.
+    subscribe_all_subnets: bool,
+
     /// The logger for the attestation service.
     log: slog::Logger,
 }
@@ -96,7 +102,11 @@ pub struct AttestationService<T: BeaconChainTypes> {
 impl<T: BeaconChainTypes> AttestationService<T> {
     /* Public functions */

-    pub fn new(beacon_chain: Arc<BeaconChain<T>>, log: &slog::Logger) -> Self {
+    pub fn new(
+        beacon_chain: Arc<BeaconChain<T>>,
+        config: &NetworkConfig,
+        log: &slog::Logger,
+    ) -> Self {
         let log = log.new(o!("service" => "attestation_service"));

         // calculate the random subnet duration from the spec constants
@@ -124,6 +134,8 @@ impl<T: BeaconChainTypes> AttestationService<T> {
             aggregate_validators_on_subnet: HashSetDelay::new(default_timeout),
             known_validators: HashSetDelay::new(last_seen_val_timeout),
             waker: None,
+            subscribe_all_subnets: config.subscribe_all_subnets,
+            discovery_disabled: config.disable_discovery,
             log,
         }
     }
@@ -131,8 +143,12 @@ impl<T: BeaconChainTypes> AttestationService<T> {
     /// Return count of all currently subscribed subnets (long-lived **and** short-lived).
     #[cfg(test)]
     pub fn subscription_count(&self) -> usize {
+        if self.subscribe_all_subnets {
+            self.beacon_chain.spec.attestation_subnet_count as usize
+        } else {
             self.subscriptions.len()
         }
+    }

     /// Processes a list of validator subscriptions.
     ///
@@ -186,7 +202,7 @@ impl<T: BeaconChainTypes> AttestationService<T> {
                 if subscription.slot > *slot {
                     subnets_to_discover.insert(subnet_id, subscription.slot);
                 }
-            } else {
+            } else if !self.discovery_disabled {
                 subnets_to_discover.insert(subnet_id, subscription.slot);
             }

@@ -218,6 +234,9 @@ impl<T: BeaconChainTypes> AttestationService<T> {
             }
         }

+        // If the discovery mechanism isn't disabled, attempt to set up a peer discovery for the
+        // required subnets.
+        if !self.discovery_disabled {
             if let Err(e) = self.discover_peers_request(
                 subnets_to_discover
                     .into_iter()
@@ -225,6 +244,7 @@ impl<T: BeaconChainTypes> AttestationService<T> {
             ) {
                 warn!(self.log, "Discovery lookup request error"; "error" => e);
             };
+        }

         // pre-emptively wake the thread to check for new events
         if let Some(waker) = &self.waker {
@@ -343,7 +363,7 @@ impl<T: BeaconChainTypes> AttestationService<T> {
         // in-active. This case is checked on the subscription event (see `handle_subscriptions`).

         // Return if we already have a subscription for this subnet_id and slot
-        if self.unsubscriptions.contains(&exact_subnet) {
+        if self.unsubscriptions.contains(&exact_subnet) || self.subscribe_all_subnets {
             return Ok(());
         }

@@ -366,7 +386,7 @@ impl<T: BeaconChainTypes> AttestationService<T> {
     ///
     /// This also updates the ENR to indicate our long-lived subscription to the subnet
     fn add_known_validator(&mut self, validator_index: u64) {
-        if self.known_validators.get(&validator_index).is_none() {
+        if self.known_validators.get(&validator_index).is_none() && !self.subscribe_all_subnets {
             // New validator has subscribed
             // Subscribe to random topics and update the ENR if needed.

@@ -92,10 +92,11 @@ mod tests {

     fn get_attestation_service() -> AttestationService<TestBeaconChainType> {
         let log = get_logger();
+        let config = NetworkConfig::default();

         let beacon_chain = CHAIN.chain.clone();

-        AttestationService::new(beacon_chain, &log)
+        AttestationService::new(beacon_chain, &config, &log)
     }

     fn get_subscription(
@@ -20,7 +20,7 @@ use std::{collections::HashMap, net::SocketAddr, sync::Arc, time::Duration};
 use store::HotColdDB;
 use tokio::sync::mpsc;
 use tokio::time::Delay;
-use types::{EthSpec, RelativeEpoch, ValidatorSubscription};
+use types::{EthSpec, RelativeEpoch, SubnetId, Unsigned, ValidatorSubscription};

 mod tests;

@@ -110,6 +110,8 @@ pub struct NetworkService<T: BeaconChainTypes> {
     discovery_auto_update: bool,
     /// A delay that expires when a new fork takes place.
     next_fork_update: Option<Delay>,
+    /// Subscribe to all the subnets once synced.
+    subscribe_all_subnets: bool,
     /// A timer for updating various network metrics.
     metrics_update: tokio::time::Interval,
     /// gossipsub_parameter_update timer
@@ -186,7 +188,8 @@ impl<T: BeaconChainTypes> NetworkService<T> {
         )?;

         // attestation service
-        let attestation_service = AttestationService::new(beacon_chain.clone(), &network_log);
+        let attestation_service =
+            AttestationService::new(beacon_chain.clone(), &config, &network_log);

         // create a timer for updating network metrics
         let metrics_update = tokio::time::interval(Duration::from_secs(METRIC_UPDATE_INTERVAL));
@@ -207,6 +210,7 @@ impl<T: BeaconChainTypes> NetworkService<T> {
             upnp_mappings: (None, None),
             discovery_auto_update: config.discv5_config.enr_update,
             next_fork_update,
+            subscribe_all_subnets: config.subscribe_all_subnets,
             metrics_update,
             gossipsub_parameter_update,
             log: network_log,
@@ -397,6 +401,22 @@ fn spawn_service<T: BeaconChainTypes>(
                     warn!(service.log, "Could not subscribe to topic"; "topic" => format!("{}",topic_kind));
                 }
             }

+            // if we are to subscribe to all subnets we do it here
+            if service.subscribe_all_subnets {
+                for subnet_id in 0..<<T as BeaconChainTypes>::EthSpec as EthSpec>::SubnetBitfieldLength::to_u64() {
+                    let subnet_id = SubnetId::new(subnet_id);
+                    let topic_kind = eth2_libp2p::types::GossipKind::Attestation(subnet_id);
+                    if service.libp2p.swarm.subscribe_kind(topic_kind.clone()) {
+                        // Update the ENR bitfield.
+                        service.libp2p.swarm.update_enr_subnet(subnet_id, true);
+                        subscribed_topics.push(topic_kind.clone());
+                    } else {
+                        warn!(service.log, "Could not subscribe to topic"; "topic" => format!("{}",topic_kind));
+                    }
+                }
+            }
+
             if !subscribed_topics.is_empty() {
                 info!(service.log, "Subscribed to topics"; "topics" => format!("{:?}", subscribed_topics));
             }
@@ -30,6 +30,13 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> {
        /*
         * Network parameters.
         */
+        .arg(
+            Arg::with_name("subscribe-all-subnets")
+                .long("subscribe-all-subnets")
+                .help("Subscribe to all subnets regardless of validator count. \
+                       This will also advertise the beacon node as being long-lived subscribed to all subnets.")
+                .takes_value(false),
+        )
        .arg(
            Arg::with_name("zero-ports")
                .long("zero-ports")
@@ -354,6 +354,10 @@ pub fn set_network_config(
         config.network_dir = data_dir.join(DEFAULT_NETWORK_DIR);
     };

+    if cli_args.is_present("subscribe-all-subnets") {
+        config.subscribe_all_subnets = true;
+    }
+
     if let Some(listen_address_str) = cli_args.value_of("listen-address") {
         let listen_address = listen_address_str
             .parse()