diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index 64d9b9e84..25f051ac1 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -1646,7 +1646,7 @@ pub fn serve( warp_utils::reject::custom_bad_request("invalid peer id.".to_string()) })?; - if let Some(peer_info) = network_globals.peers().peer_info(&peer_id) { + if let Some(peer_info) = network_globals.peers.read().peer_info(&peer_id) { let address = if let Some(socket_addr) = peer_info.seen_addresses().next() { let mut addr = lighthouse_network::Multiaddr::from(socket_addr.ip()); addr.push(lighthouse_network::multiaddr::Protocol::Tcp( @@ -1691,7 +1691,8 @@ pub fn serve( blocking_json_task(move || { let mut peers: Vec = Vec::new(); network_globals - .peers() + .peers + .read() .peers() .for_each(|(peer_id, peer_info)| { let address = @@ -1758,17 +1759,21 @@ pub fn serve( let mut disconnected: u64 = 0; let mut disconnecting: u64 = 0; - network_globals.peers().peers().for_each(|(_, peer_info)| { - let state = api_types::PeerState::from_peer_connection_status( - peer_info.connection_status(), - ); - match state { - api_types::PeerState::Connected => connected += 1, - api_types::PeerState::Connecting => connecting += 1, - api_types::PeerState::Disconnected => disconnected += 1, - api_types::PeerState::Disconnecting => disconnecting += 1, - } - }); + network_globals + .peers + .read() + .peers() + .for_each(|(_, peer_info)| { + let state = api_types::PeerState::from_peer_connection_status( + peer_info.connection_status(), + ); + match state { + api_types::PeerState::Connected => connected += 1, + api_types::PeerState::Connecting => connecting += 1, + api_types::PeerState::Disconnected => disconnected += 1, + api_types::PeerState::Disconnecting => disconnecting += 1, + } + }); Ok(api_types::GenericResponse::from(api_types::PeerCount { connected, @@ -2238,7 +2243,8 @@ pub fn serve( .and_then(|network_globals: Arc>| { blocking_json_task(move || { Ok(network_globals - .peers() + .peers + .read() .peers() .map(|(peer_id, peer_info)| eth2::lighthouse::Peer { peer_id: peer_id.to_string(), @@ -2257,7 +2263,8 @@ pub fn serve( .and_then(|network_globals: Arc>| { blocking_json_task(move || { Ok(network_globals - .peers() + .peers + .read() .connected_peers() .map(|(peer_id, peer_info)| eth2::lighthouse::Peer { peer_id: peer_id.to_string(), diff --git a/beacon_node/lighthouse_network/src/behaviour/mod.rs b/beacon_node/lighthouse_network/src/behaviour/mod.rs index 1276db5e7..51699d236 100644 --- a/beacon_node/lighthouse_network/src/behaviour/mod.rs +++ b/beacon_node/lighthouse_network/src/behaviour/mod.rs @@ -14,9 +14,7 @@ use crate::types::{ SubnetDiscovery, }; use crate::Eth2Enr; -use crate::{ - error, metrics, Enr, NetworkConfig, NetworkGlobals, PubsubMessage, SyncStatus, TopicHash, -}; +use crate::{error, metrics, Enr, NetworkConfig, NetworkGlobals, PubsubMessage, TopicHash}; use libp2p::{ core::{ connection::ConnectionId, identity::Keypair, multiaddr::Protocol as MProtocol, Multiaddr, @@ -34,7 +32,7 @@ use libp2p::{ }, NetworkBehaviour, PeerId, }; -use slog::{crit, debug, error, o, trace, warn}; +use slog::{crit, debug, o, trace, warn}; use ssz::Encode; use std::collections::HashSet; use std::fs::File; @@ -457,7 +455,8 @@ impl Behaviour { } { if let Some(client) = self .network_globals - .peers() + .peers + .read() .peer_info(propagation_source) .map(|info| info.client().kind.as_ref()) { @@ -569,25 +568,6 @@ impl Behaviour { self.discovery.add_enr(enr); } - pub fn update_peers_sync_status(&mut self, peer_id: &PeerId, sync_status: SyncStatus) { - let status_repr = sync_status.as_str(); - match self - .network_globals - .peers_mut() - .update_sync_status(peer_id, sync_status) - { - Some(true) => { - trace!(self.log, "Peer sync status updated"; "peer_id" => %peer_id, "sync_status" => status_repr); - } - Some(false) => { - // Sync status is the same for known peer - } - None => { - error!(self.log, "Sync status update notification for unknown peer"; "peer_id" => %peer_id, "sync_status" => status_repr); - } - } - } - /// Updates a subnet value to the ENR attnets/syncnets bitfield. /// /// The `value` is `true` if a subnet is being added and false otherwise. @@ -613,7 +593,8 @@ impl Behaviour { // Extend min_ttl of connected peers on required subnets if let Some(min_ttl) = s.min_ttl { self.network_globals - .peers_mut() + .peers + .write() .extend_peers_on_subnet(&s.subnet, min_ttl); if let Subnet::SyncCommittee(sync_subnet) = s.subnet { self.peer_manager_mut() @@ -623,7 +604,8 @@ impl Behaviour { // Already have target number of peers, no need for subnet discovery let peers_on_subnet = self .network_globals - .peers() + .peers + .read() .good_peers_on_subnet(s.subnet) .count(); if peers_on_subnet >= TARGET_SUBNET_PEERS { @@ -773,7 +755,7 @@ impl Behaviour { .discovery .cached_enrs() .filter_map(|(peer_id, enr)| { - let peers = self.network_globals.peers(); + let peers = self.network_globals.peers.read(); if predicate(enr) && peers.should_dial(peer_id) { Some(*peer_id) } else { @@ -866,14 +848,16 @@ impl NetworkBehaviourEventProcess for Behaviour< GossipsubEvent::Subscribed { peer_id, topic } => { if let Some(subnet_id) = subnet_from_topic_hash(&topic) { self.network_globals - .peers_mut() + .peers + .write() .add_subscription(&peer_id, subnet_id); } } GossipsubEvent::Unsubscribed { peer_id, topic } => { if let Some(subnet_id) = subnet_from_topic_hash(&topic) { self.network_globals - .peers_mut() + .peers + .write() .remove_subscription(&peer_id, &subnet_id); } } diff --git a/beacon_node/lighthouse_network/src/discovery/mod.rs b/beacon_node/lighthouse_network/src/discovery/mod.rs index eeff19942..68e085683 100644 --- a/beacon_node/lighthouse_network/src/discovery/mod.rs +++ b/beacon_node/lighthouse_network/src/discovery/mod.rs @@ -679,7 +679,8 @@ impl Discovery { // Determine if we have sufficient peers, which may make this discovery unnecessary. let peers_on_subnet = self .network_globals - .peers() + .peers + .read() .good_peers_on_subnet(subnet_query.subnet) .count(); diff --git a/beacon_node/lighthouse_network/src/peer_manager/mod.rs b/beacon_node/lighthouse_network/src/peer_manager/mod.rs index cfad40aa8..decc1ccd1 100644 --- a/beacon_node/lighthouse_network/src/peer_manager/mod.rs +++ b/beacon_node/lighthouse_network/src/peer_manager/mod.rs @@ -143,7 +143,7 @@ impl PeerManager { /// This will send a goodbye and disconnect the peer if it is connected or dialing. pub fn goodbye_peer(&mut self, peer_id: &PeerId, reason: GoodbyeReason, source: ReportSource) { // Update the sync status if required - if let Some(info) = self.network_globals.peers_mut().peer_info_mut(peer_id) { + if let Some(info) = self.network_globals.peers.write().peer_info_mut(peer_id) { debug!(self.log, "Sending goodbye to peer"; "peer_id" => %peer_id, "reason" => %reason, "score" => %info.score()); if matches!(reason, GoodbyeReason::IrrelevantNetwork) { info.update_sync_status(SyncStatus::IrrelevantPeer); @@ -165,7 +165,8 @@ impl PeerManager { ) { let action = self .network_globals - .peers_mut() + .peers + .write() .report_peer(peer_id, action, source); self.handle_score_action(peer_id, action, reason); } @@ -263,13 +264,14 @@ impl PeerManager { if (min_ttl.is_some() && connected_or_dialing + to_dial_peers.len() < self.max_priority_peers() || connected_or_dialing + to_dial_peers.len() < self.max_peers()) - && self.network_globals.peers().should_dial(&peer_id) + && self.network_globals.peers.read().should_dial(&peer_id) { // This should be updated with the peer dialing. In fact created once the peer is // dialed if let Some(min_ttl) = min_ttl { self.network_globals - .peers_mut() + .peers + .write() .update_min_ttl(&peer_id, min_ttl); } to_dial_peers.push(peer_id); @@ -339,11 +341,11 @@ impl PeerManager { /// /// This is used to determine if we should accept incoming connections. pub fn ban_status(&self, peer_id: &PeerId) -> BanResult { - self.network_globals.peers().ban_status(peer_id) + self.network_globals.peers.read().ban_status(peer_id) } pub fn is_connected(&self, peer_id: &PeerId) -> bool { - self.network_globals.peers().is_connected(peer_id) + self.network_globals.peers.read().is_connected(peer_id) } /// Reports whether the peer limit is reached in which case we stop allowing new incoming @@ -354,7 +356,7 @@ impl PeerManager { /// Updates `PeerInfo` with `identify` information. pub fn identify(&mut self, peer_id: &PeerId, info: &IdentifyInfo) { - if let Some(peer_info) = self.network_globals.peers_mut().peer_info_mut(peer_id) { + if let Some(peer_info) = self.network_globals.peers.write().peer_info_mut(peer_id) { let previous_kind = peer_info.client().kind.clone(); let previous_listening_addresses = peer_info.set_listening_addresses(info.listen_addrs.clone()); @@ -401,7 +403,7 @@ impl PeerManager { direction: ConnectionDirection, ) { let client = self.network_globals.client(peer_id); - let score = self.network_globals.peers().score(peer_id); + let score = self.network_globals.peers.read().score(peer_id); debug!(self.log, "RPC Error"; "protocol" => %protocol, "err" => %err, "client" => %client, "peer_id" => %peer_id, "score" => %score, "direction" => ?direction); metrics::inc_counter_vec( @@ -503,7 +505,7 @@ impl PeerManager { /// A ping request has been received. // NOTE: The behaviour responds with a PONG automatically pub fn ping_request(&mut self, peer_id: &PeerId, seq: u64) { - if let Some(peer_info) = self.network_globals.peers().peer_info(peer_id) { + if let Some(peer_info) = self.network_globals.peers.read().peer_info(peer_id) { // received a ping // reset the to-ping timer for this peer debug!(self.log, "Received a ping request"; "peer_id" => %peer_id, "seq_no" => seq); @@ -540,7 +542,7 @@ impl PeerManager { /// A PONG has been returned from a peer. pub fn pong_response(&mut self, peer_id: &PeerId, seq: u64) { - if let Some(peer_info) = self.network_globals.peers().peer_info(peer_id) { + if let Some(peer_info) = self.network_globals.peers.read().peer_info(peer_id) { // received a pong // if the sequence number is unknown send update the meta data of the peer. @@ -563,7 +565,7 @@ impl PeerManager { /// Received a metadata response from a peer. pub fn meta_data_response(&mut self, peer_id: &PeerId, meta_data: MetaData) { - if let Some(peer_info) = self.network_globals.peers_mut().peer_info_mut(peer_id) { + if let Some(peer_info) = self.network_globals.peers.write().peer_info_mut(peer_id) { if let Some(known_meta_data) = &peer_info.meta_data() { if *known_meta_data.seq_number() < *meta_data.seq_number() { debug!(self.log, "Updating peer's metadata"; @@ -590,7 +592,8 @@ impl PeerManager { pub(crate) fn update_gossipsub_scores(&mut self, gossipsub: &Gossipsub) { let actions = self .network_globals - .peers_mut() + .peers + .write() .update_gossipsub_scores(self.target_peers, gossipsub); for (peer_id, score_action) in actions { @@ -630,7 +633,11 @@ impl PeerManager { /// /// This is also called when dialing a peer fails. fn inject_disconnect(&mut self, peer_id: &PeerId) { - let ban_operation = self.network_globals.peers_mut().inject_disconnect(peer_id); + let ban_operation = self + .network_globals + .peers + .write() + .inject_disconnect(peer_id); if let Some(ban_operation) = ban_operation { // The peer was awaiting a ban, continue to ban the peer. @@ -656,7 +663,7 @@ impl PeerManager { enr: Option, ) -> bool { { - let mut peerdb = self.network_globals.peers_mut(); + let mut peerdb = self.network_globals.peers.write(); if !matches!(peerdb.ban_status(peer_id), BanResult::NotBanned) { // don't connect if the peer is banned error!(self.log, "Connection has been allowed to a banned peer"; "peer_id" => %peer_id); @@ -693,7 +700,8 @@ impl PeerManager { // Increment the PEERS_PER_CLIENT metric if let Some(kind) = self .network_globals - .peers() + .peers + .read() .peer_info(peer_id) .map(|peer_info| peer_info.client().kind.clone()) { @@ -712,7 +720,8 @@ impl PeerManager { self.events .push(PeerManagerEvent::DisconnectPeer(peer_id, reason)); self.network_globals - .peers_mut() + .peers + .write() .notify_disconnecting(&peer_id, false); } @@ -728,7 +737,8 @@ impl PeerManager { .filter_map(|(k, v)| { if self .network_globals - .peers() + .peers + .read() .good_peers_on_subnet(Subnet::SyncCommittee(*k)) .count() < TARGET_SUBNET_PEERS @@ -777,7 +787,7 @@ impl PeerManager { } // Updates peer's scores and unban any peers if required. - let actions = self.network_globals.peers_mut().update_scores(); + let actions = self.network_globals.peers.write().update_scores(); for (peer_id, action) in actions { self.handle_score_action(&peer_id, action, None); } @@ -796,7 +806,8 @@ impl PeerManager { let mut n_outbound_removed = 0; for (peer_id, info) in self .network_globals - .peers() + .peers + .read() .worst_connected_peers() .iter() .filter(|(_, info)| !info.has_future_duty()) @@ -915,14 +926,16 @@ mod tests { // Set the outbound-only peers to have the lowest score. peer_manager .network_globals - .peers_mut() + .peers + .write() .peer_info_mut(&outbound_only_peer1) .unwrap() .add_to_score(-1.0); peer_manager .network_globals - .peers_mut() + .peers + .write() .peer_info_mut(&outbound_only_peer2) .unwrap() .add_to_score(-2.0); @@ -938,11 +951,13 @@ mod tests { assert_eq!(peer_manager.network_globals.connected_or_dialing_peers(), 3); assert!(peer_manager .network_globals - .peers() + .peers + .read() .is_connected(&outbound_only_peer1)); assert!(!peer_manager .network_globals - .peers() + .peers + .read() .is_connected(&outbound_only_peer2)); peer_manager.heartbeat(); @@ -971,7 +986,8 @@ mod tests { ); peer_manager .network_globals - .peers_mut() + .peers + .write() .peer_info_mut(&(outbound_only_peer)) .unwrap() .add_to_score(-1.0); @@ -1011,25 +1027,29 @@ mod tests { ); peer_manager .network_globals - .peers_mut() + .peers + .write() .peer_info_mut(&(inbound_only_peer1)) .unwrap() .add_to_score(-19.8); peer_manager .network_globals - .peers_mut() + .peers + .write() .peer_info_mut(&(outbound_only_peer1)) .unwrap() .add_to_score(-19.8); peer_manager .network_globals - .peers_mut() + .peers + .write() .peer_info_mut(&(inbound_only_peer1)) .unwrap() .set_gossipsub_score(-85.0); peer_manager .network_globals - .peers_mut() + .peers + .write() .peer_info_mut(&(outbound_only_peer1)) .unwrap() .set_gossipsub_score(-85.0); @@ -1067,13 +1087,15 @@ mod tests { ); peer_manager .network_globals - .peers_mut() + .peers + .write() .peer_info_mut(&(inbound_only_peer1)) .unwrap() .add_to_score(-19.9); peer_manager .network_globals - .peers_mut() + .peers + .write() .peer_info_mut(&(inbound_only_peer1)) .unwrap() .set_gossipsub_score(-85.0); diff --git a/beacon_node/lighthouse_network/src/peer_manager/network_behaviour.rs b/beacon_node/lighthouse_network/src/peer_manager/network_behaviour.rs index 840d6bc58..c8b062da4 100644 --- a/beacon_node/lighthouse_network/src/peer_manager/network_behaviour.rs +++ b/beacon_node/lighthouse_network/src/peer_manager/network_behaviour.rs @@ -146,7 +146,8 @@ impl NetworkBehaviour for PeerManager { if self.peer_limit_reached() && self .network_globals - .peers() + .peers + .read() .peer_info(peer_id) .map_or(true, |peer| !peer.has_future_duty()) { @@ -184,7 +185,8 @@ impl NetworkBehaviour for PeerManager { // There are no more connections if self .network_globals - .peers() + .peers + .read() .is_connected_or_disconnecting(peer_id) { // We are disconnecting the peer or the peer has already been connected. @@ -198,7 +200,8 @@ impl NetworkBehaviour for PeerManager { // Decrement the PEERS_PER_CLIENT metric if let Some(kind) = self .network_globals - .peers() + .peers + .read() .peer_info(peer_id) .map(|info| info.client().kind.clone()) { @@ -259,7 +262,7 @@ impl NetworkBehaviour for PeerManager { _error: &DialError, ) { if let Some(peer_id) = peer_id { - if !self.network_globals.peers().is_connected(&peer_id) { + if !self.network_globals.peers.read().is_connected(&peer_id) { self.inject_disconnect(&peer_id); } } diff --git a/beacon_node/lighthouse_network/src/peer_manager/peerdb.rs b/beacon_node/lighthouse_network/src/peer_manager/peerdb.rs index cb2816197..74d01c323 100644 --- a/beacon_node/lighthouse_network/src/peer_manager/peerdb.rs +++ b/beacon_node/lighthouse_network/src/peer_manager/peerdb.rs @@ -314,7 +314,7 @@ impl PeerDB { .map(|(id, _)| id) } - /// Returns the peer's connection status. Returns None if the peer is not in the DB. + /// Returns the peer's connection status. Returns unknown if the peer is not in the DB. pub fn connection_status(&self, peer_id: &PeerId) -> Option { self.peer_info(peer_id) .map(|info| info.connection_status().clone()) diff --git a/beacon_node/lighthouse_network/src/peer_manager/peerdb/peer_info.rs b/beacon_node/lighthouse_network/src/peer_manager/peerdb/peer_info.rs index 59f4571d8..82aaefc63 100644 --- a/beacon_node/lighthouse_network/src/peer_manager/peerdb/peer_info.rs +++ b/beacon_node/lighthouse_network/src/peer_manager/peerdb/peer_info.rs @@ -236,6 +236,7 @@ impl PeerInfo { /* Mutable Functions */ /// Updates the sync status. Returns true if the status was changed. + // VISIBILITY: Both the peer manager the network sync is able to update the sync state of a peer pub fn update_sync_status(&mut self, sync_status: SyncStatus) -> bool { self.sync_status.update(sync_status) } diff --git a/beacon_node/lighthouse_network/src/peer_manager/peerdb/sync_status.rs b/beacon_node/lighthouse_network/src/peer_manager/peerdb/sync_status.rs index 4c9adeb6e..bab8aa9ae 100644 --- a/beacon_node/lighthouse_network/src/peer_manager/peerdb/sync_status.rs +++ b/beacon_node/lighthouse_network/src/peer_manager/peerdb/sync_status.rs @@ -27,6 +27,19 @@ pub struct SyncInfo { pub finalized_root: Hash256, } +impl std::cmp::PartialEq for SyncStatus { + fn eq(&self, other: &Self) -> bool { + matches!( + (self, other), + (SyncStatus::Synced { .. }, SyncStatus::Synced { .. }) + | (SyncStatus::Advanced { .. }, SyncStatus::Advanced { .. }) + | (SyncStatus::Behind { .. }, SyncStatus::Behind { .. }) + | (SyncStatus::IrrelevantPeer, SyncStatus::IrrelevantPeer) + | (SyncStatus::Unknown, SyncStatus::Unknown) + ) + } +} + impl SyncStatus { /// Returns true if the peer has advanced knowledge of the chain. pub fn is_advanced(&self) -> bool { @@ -48,7 +61,7 @@ impl SyncStatus { /// E.g. returns `true` if the state changed from `Synced` to `Advanced`, but not if /// the status remained `Synced` with different `SyncInfo` within. pub fn update(&mut self, new_state: SyncStatus) -> bool { - let changed_status = !(self.is_same_kind(&new_state)); + let changed_status = *self != new_state; *self = new_state; changed_status } @@ -62,17 +75,6 @@ impl SyncStatus { SyncStatus::IrrelevantPeer => "Irrelevant", } } - - pub fn is_same_kind(&self, other: &Self) -> bool { - matches!( - (self, other), - (SyncStatus::Synced { .. }, SyncStatus::Synced { .. }) - | (SyncStatus::Advanced { .. }, SyncStatus::Advanced { .. }) - | (SyncStatus::Behind { .. }, SyncStatus::Behind { .. }) - | (SyncStatus::IrrelevantPeer, SyncStatus::IrrelevantPeer) - | (SyncStatus::Unknown, SyncStatus::Unknown) - ) - } } impl std::fmt::Display for SyncStatus { diff --git a/beacon_node/lighthouse_network/src/service.rs b/beacon_node/lighthouse_network/src/service.rs index 3ecd32f3d..60252385d 100644 --- a/beacon_node/lighthouse_network/src/service.rs +++ b/beacon_node/lighthouse_network/src/service.rs @@ -215,7 +215,8 @@ impl Service { } if !network_globals - .peers() + .peers + .read() .is_connected_or_dialing(&bootnode_enr.peer_id()) { dial(multiaddr.clone()); diff --git a/beacon_node/lighthouse_network/src/types/globals.rs b/beacon_node/lighthouse_network/src/types/globals.rs index db00cf3c0..638270c2b 100644 --- a/beacon_node/lighthouse_network/src/types/globals.rs +++ b/beacon_node/lighthouse_network/src/types/globals.rs @@ -22,7 +22,7 @@ pub struct NetworkGlobals { /// The UDP port that the discovery service is listening on pub listen_port_udp: AtomicU16, /// The collection of known peers. - peers: RwLock>, + pub peers: RwLock>, // The local meta data of our node. pub local_metadata: RwLock>, /// The current gossipsub topic subscriptions. @@ -121,14 +121,6 @@ impl NetworkGlobals { .unwrap_or_default() } - pub fn peers(&self) -> impl std::ops::Deref> + '_ { - self.peers.read() - } - - pub(crate) fn peers_mut(&self) -> impl std::ops::DerefMut> + '_ { - self.peers.write() - } - /// Updates the syncing state of the node. /// /// The old state is returned diff --git a/beacon_node/network/src/metrics.rs b/beacon_node/network/src/metrics.rs index 0dfc65716..35c5b4dce 100644 --- a/beacon_node/network/src/metrics.rs +++ b/beacon_node/network/src/metrics.rs @@ -786,7 +786,7 @@ pub fn update_gossip_metrics( let mut peer_to_client = HashMap::new(); let mut scores_per_client: HashMap<&'static str, Vec> = HashMap::new(); { - let peers = network_globals.peers(); + let peers = network_globals.peers.read(); for (peer_id, _) in gossipsub.all_peers() { let client = peers .peer_info(peer_id) @@ -916,7 +916,8 @@ pub fn update_sync_metrics(network_globals: &Arc>) // count per sync status, the number of connected peers let mut peers_per_sync_type = FnvHashMap::default(); for sync_type in network_globals - .peers() + .peers + .read() .connected_peers() .map(|(_peer_id, info)| info.sync_status().as_str()) { diff --git a/beacon_node/network/src/router/mod.rs b/beacon_node/network/src/router/mod.rs index 0ab4c742d..8d639c5ee 100644 --- a/beacon_node/network/src/router/mod.rs +++ b/beacon_node/network/src/router/mod.rs @@ -154,7 +154,7 @@ impl Router { /// A new RPC request has been received from the network. fn handle_rpc_request(&mut self, peer_id: PeerId, id: PeerRequestId, request: Request) { - if !self.network_globals.peers().is_connected(&peer_id) { + if !self.network_globals.peers.read().is_connected(&peer_id) { debug!(self.log, "Dropping request of disconnected peer"; "peer_id" => %peer_id, "request" => ?request); return; } diff --git a/beacon_node/network/src/service.rs b/beacon_node/network/src/service.rs index d9adcd28c..ce8aca472 100644 --- a/beacon_node/network/src/service.rs +++ b/beacon_node/network/src/service.rs @@ -17,7 +17,7 @@ use lighthouse_network::{ types::{GossipEncoding, GossipTopic}, BehaviourEvent, MessageId, NetworkGlobals, PeerId, }; -use lighthouse_network::{MessageAcceptance, Service as LibP2PService, SyncStatus}; +use lighthouse_network::{MessageAcceptance, Service as LibP2PService}; use slog::{crit, debug, error, info, o, trace, warn}; use std::{net::SocketAddr, pin::Pin, sync::Arc, time::Duration}; use store::HotColdDB; @@ -100,10 +100,6 @@ pub enum NetworkMessage { reason: GoodbyeReason, source: ReportSource, }, - UpdatePeerSyncStatus { - peer_id: PeerId, - sync_status: SyncStatus, - }, } /// Service that handles communication between internal services and the `lighthouse_network` network service. @@ -531,9 +527,6 @@ fn spawn_service( ); } } - NetworkMessage::UpdatePeerSyncStatus{peer_id, sync_status} => { - service.libp2p.swarm.behaviour_mut().update_peers_sync_status(&peer_id, sync_status); - } } } // process any attestation service events diff --git a/beacon_node/network/src/sync/backfill_sync/mod.rs b/beacon_node/network/src/sync/backfill_sync/mod.rs index b734773a3..b9016b9fd 100644 --- a/beacon_node/network/src/sync/backfill_sync/mod.rs +++ b/beacon_node/network/src/sync/backfill_sync/mod.rs @@ -213,7 +213,14 @@ impl BackFillSync { match self.state() { BackFillState::Syncing => {} // already syncing ignore. BackFillState::Paused => { - if self.network_globals.peers().synced_peers().next().is_some() { + if self + .network_globals + .peers + .read() + .synced_peers() + .next() + .is_some() + { // If there are peers to resume with, begin the resume. debug!(self.log, "Resuming backfill sync"; "start_epoch" => self.current_start, "awaiting_batches" => self.batches.len(), "processing_target" => self.processing_target); self.set_state(BackFillState::Syncing); @@ -899,7 +906,8 @@ impl BackFillSync { let new_peer = { let mut priorized_peers = self .network_globals - .peers() + .peers + .read() .synced_peers() .map(|peer| { ( @@ -1018,7 +1026,8 @@ impl BackFillSync { let mut rng = rand::thread_rng(); let mut idle_peers = self .network_globals - .peers() + .peers + .read() .synced_peers() .filter(|peer_id| { self.active_requests diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index 4d353bd7f..f0726ca94 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -294,7 +294,7 @@ impl SyncManager { let sync_type = remote_sync_type(&local, &remote, &self.chain); // update the state of the peer. - let should_add = self.update_peer_sync_state(peer_id, &local, &remote, &sync_type); + let should_add = self.update_peer_sync_state(&peer_id, &local, &remote, &sync_type); if matches!(sync_type, PeerSyncType::Advanced) && should_add { self.range_sync @@ -646,7 +646,7 @@ impl SyncManager { /// connection status. fn update_peer_sync_state( &mut self, - peer_id: PeerId, + peer_id: &PeerId, local_sync_info: &SyncInfo, remote_sync_info: &SyncInfo, sync_type: &PeerSyncType, @@ -656,10 +656,15 @@ impl SyncManager { let new_state = sync_type.as_sync_status(remote_sync_info); let rpr = new_state.as_str(); - - if let Some(info) = self.network_globals.peers().peer_info(&peer_id) { - let is_connected = info.is_connected(); - if !info.sync_status().is_same_kind(&new_state) { + // Drop the write lock + let update_sync_status = self + .network_globals + .peers + .write() + .update_sync_status(peer_id, new_state.clone()); + if let Some(was_updated) = update_sync_status { + let is_connected = self.network_globals.peers.read().is_connected(peer_id); + if was_updated { debug!(self.log, "Peer transitioned sync state"; "peer_id" => %peer_id, "new_state" => rpr, "our_head_slot" => local_sync_info.head_slot, "out_finalized_epoch" => local_sync_info.finalized_epoch, "their_head_slot" => remote_sync_info.head_slot, "their_finalized_epoch" => remote_sync_info.finalized_epoch, @@ -670,8 +675,6 @@ impl SyncManager { if new_state.is_synced() { self.backfill_sync.fully_synced_peer_joined(); } - - self.network.update_peer_sync_status(peer_id, new_state); } is_connected } else { @@ -709,7 +712,7 @@ impl SyncManager { let head = self.chain.best_slot().unwrap_or_else(|_| Slot::new(0)); let current_slot = self.chain.slot().unwrap_or_else(|_| Slot::new(0)); - let peers = self.network_globals.peers(); + let peers = self.network_globals.peers.read(); if current_slot >= head && current_slot.sub(head) <= (SLOT_IMPORT_TOLERANCE as u64) && head > 0 diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index 895828f5d..e8b67ba92 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -10,9 +10,7 @@ use fnv::FnvHashMap; use lighthouse_network::rpc::{ BlocksByRangeRequest, BlocksByRootRequest, GoodbyeReason, RequestId, }; -use lighthouse_network::{ - Client, NetworkGlobals, PeerAction, PeerId, ReportSource, Request, SyncStatus, -}; +use lighthouse_network::{Client, NetworkGlobals, PeerAction, PeerId, ReportSource, Request}; use slog::{debug, trace, warn}; use std::sync::Arc; use tokio::sync::mpsc; @@ -54,7 +52,12 @@ impl SyncNetworkContext { /// Returns the Client type of the peer if known pub fn client_type(&self, peer_id: &PeerId) -> Client { - self.network_globals.client(peer_id) + self.network_globals + .peers + .read() + .peer_info(peer_id) + .map(|info| info.client().clone()) + .unwrap_or_default() } pub fn status_peers( @@ -205,17 +208,10 @@ impl SyncNetworkContext { }); } - pub fn update_peer_sync_status(&self, peer_id: PeerId, new_status: SyncStatus) { - let _ = self.send_network_msg(NetworkMessage::UpdatePeerSyncStatus { - peer_id, - sync_status: new_status, - }); - } - /// Sends an arbitrary network message. - fn send_network_msg(&self, msg: NetworkMessage) -> Result<(), &'static str> { - self.network_send.send(msg).map_err(|msg| { - warn!(self.log, "Could not send message to the network service"; "msg" => ?msg.0); + fn send_network_msg(&mut self, msg: NetworkMessage) -> Result<(), &'static str> { + self.network_send.send(msg).map_err(|_| { + debug!(self.log, "Could not send message to the network service"); "Network channel send Failed" }) }