diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index 25f051ac1..64d9b9e84 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -1646,7 +1646,7 @@ pub fn serve( warp_utils::reject::custom_bad_request("invalid peer id.".to_string()) })?; - if let Some(peer_info) = network_globals.peers.read().peer_info(&peer_id) { + if let Some(peer_info) = network_globals.peers().peer_info(&peer_id) { let address = if let Some(socket_addr) = peer_info.seen_addresses().next() { let mut addr = lighthouse_network::Multiaddr::from(socket_addr.ip()); addr.push(lighthouse_network::multiaddr::Protocol::Tcp( @@ -1691,8 +1691,7 @@ pub fn serve( blocking_json_task(move || { let mut peers: Vec = Vec::new(); network_globals - .peers - .read() + .peers() .peers() .for_each(|(peer_id, peer_info)| { let address = @@ -1759,21 +1758,17 @@ pub fn serve( let mut disconnected: u64 = 0; let mut disconnecting: u64 = 0; - network_globals - .peers - .read() - .peers() - .for_each(|(_, peer_info)| { - let state = api_types::PeerState::from_peer_connection_status( - peer_info.connection_status(), - ); - match state { - api_types::PeerState::Connected => connected += 1, - api_types::PeerState::Connecting => connecting += 1, - api_types::PeerState::Disconnected => disconnected += 1, - api_types::PeerState::Disconnecting => disconnecting += 1, - } - }); + network_globals.peers().peers().for_each(|(_, peer_info)| { + let state = api_types::PeerState::from_peer_connection_status( + peer_info.connection_status(), + ); + match state { + api_types::PeerState::Connected => connected += 1, + api_types::PeerState::Connecting => connecting += 1, + api_types::PeerState::Disconnected => disconnected += 1, + api_types::PeerState::Disconnecting => disconnecting += 1, + } + }); Ok(api_types::GenericResponse::from(api_types::PeerCount { connected, @@ -2243,8 +2238,7 @@ pub fn serve( .and_then(|network_globals: Arc>| { blocking_json_task(move || { Ok(network_globals - .peers - .read() + .peers() .peers() .map(|(peer_id, peer_info)| eth2::lighthouse::Peer { peer_id: peer_id.to_string(), @@ -2263,8 +2257,7 @@ pub fn serve( .and_then(|network_globals: Arc>| { blocking_json_task(move || { Ok(network_globals - .peers - .read() + .peers() .connected_peers() .map(|(peer_id, peer_info)| eth2::lighthouse::Peer { peer_id: peer_id.to_string(), diff --git a/beacon_node/lighthouse_network/src/behaviour/mod.rs b/beacon_node/lighthouse_network/src/behaviour/mod.rs index 2f6bc6309..29f519761 100644 --- a/beacon_node/lighthouse_network/src/behaviour/mod.rs +++ b/beacon_node/lighthouse_network/src/behaviour/mod.rs @@ -14,7 +14,9 @@ use crate::types::{ SubnetDiscovery, }; use crate::Eth2Enr; -use crate::{error, metrics, Enr, NetworkConfig, NetworkGlobals, PubsubMessage, TopicHash}; +use crate::{ + error, metrics, Enr, NetworkConfig, NetworkGlobals, PubsubMessage, SyncStatus, TopicHash, +}; use libp2p::{ core::{ connection::ConnectionId, identity::Keypair, multiaddr::Protocol as MProtocol, Multiaddr, @@ -32,7 +34,7 @@ use libp2p::{ }, NetworkBehaviour, PeerId, }; -use slog::{crit, debug, o, trace, warn}; +use slog::{crit, debug, error, o, trace, warn}; use ssz::Encode; use std::collections::HashSet; use std::fs::File; @@ -455,8 +457,7 @@ impl Behaviour { } { if let Some(client) = self .network_globals - .peers - .read() + .peers() .peer_info(propagation_source) .map(|info| info.client().kind.as_ref()) { @@ -568,6 +569,25 @@ impl Behaviour { self.discovery.add_enr(enr); } + pub fn update_peers_sync_status(&mut self, peer_id: &PeerId, sync_status: SyncStatus) { + let status_repr = sync_status.as_str(); + match self + .network_globals + .peers_mut() + .update_sync_status(peer_id, sync_status) + { + Some(true) => { + trace!(self.log, "Peer sync status updated"; "peer_id" => %peer_id, "sync_status" => status_repr); + } + Some(false) => { + // Sync status is the same for known peer + } + None => { + error!(self.log, "Sync status update notification for unknown peer"; "peer_id" => %peer_id, "sync_status" => status_repr); + } + } + } + /// Updates a subnet value to the ENR attnets/syncnets bitfield. /// /// The `value` is `true` if a subnet is being added and false otherwise. @@ -593,8 +613,7 @@ impl Behaviour { // Extend min_ttl of connected peers on required subnets if let Some(min_ttl) = s.min_ttl { self.network_globals - .peers - .write() + .peers_mut() .extend_peers_on_subnet(&s.subnet, min_ttl); if let Subnet::SyncCommittee(sync_subnet) = s.subnet { self.peer_manager_mut() @@ -604,8 +623,7 @@ impl Behaviour { // Already have target number of peers, no need for subnet discovery let peers_on_subnet = self .network_globals - .peers - .read() + .peers() .good_peers_on_subnet(s.subnet) .count(); if peers_on_subnet >= TARGET_SUBNET_PEERS { @@ -755,7 +773,7 @@ impl Behaviour { .discovery .cached_enrs() .filter_map(|(peer_id, enr)| { - let peers = self.network_globals.peers.read(); + let peers = self.network_globals.peers(); if predicate(enr) && peers.should_dial(peer_id) { Some(*peer_id) } else { @@ -848,16 +866,14 @@ impl NetworkBehaviourEventProcess for Behaviour< GossipsubEvent::Subscribed { peer_id, topic } => { if let Some(subnet_id) = subnet_from_topic_hash(&topic) { self.network_globals - .peers - .write() + .peers_mut() .add_subscription(&peer_id, subnet_id); } } GossipsubEvent::Unsubscribed { peer_id, topic } => { if let Some(subnet_id) = subnet_from_topic_hash(&topic) { self.network_globals - .peers - .write() + .peers_mut() .remove_subscription(&peer_id, &subnet_id); } } diff --git a/beacon_node/lighthouse_network/src/discovery/mod.rs b/beacon_node/lighthouse_network/src/discovery/mod.rs index 68e085683..eeff19942 100644 --- a/beacon_node/lighthouse_network/src/discovery/mod.rs +++ b/beacon_node/lighthouse_network/src/discovery/mod.rs @@ -679,8 +679,7 @@ impl Discovery { // Determine if we have sufficient peers, which may make this discovery unnecessary. let peers_on_subnet = self .network_globals - .peers - .read() + .peers() .good_peers_on_subnet(subnet_query.subnet) .count(); diff --git a/beacon_node/lighthouse_network/src/peer_manager/mod.rs b/beacon_node/lighthouse_network/src/peer_manager/mod.rs index decc1ccd1..cfad40aa8 100644 --- a/beacon_node/lighthouse_network/src/peer_manager/mod.rs +++ b/beacon_node/lighthouse_network/src/peer_manager/mod.rs @@ -143,7 +143,7 @@ impl PeerManager { /// This will send a goodbye and disconnect the peer if it is connected or dialing. pub fn goodbye_peer(&mut self, peer_id: &PeerId, reason: GoodbyeReason, source: ReportSource) { // Update the sync status if required - if let Some(info) = self.network_globals.peers.write().peer_info_mut(peer_id) { + if let Some(info) = self.network_globals.peers_mut().peer_info_mut(peer_id) { debug!(self.log, "Sending goodbye to peer"; "peer_id" => %peer_id, "reason" => %reason, "score" => %info.score()); if matches!(reason, GoodbyeReason::IrrelevantNetwork) { info.update_sync_status(SyncStatus::IrrelevantPeer); @@ -165,8 +165,7 @@ impl PeerManager { ) { let action = self .network_globals - .peers - .write() + .peers_mut() .report_peer(peer_id, action, source); self.handle_score_action(peer_id, action, reason); } @@ -264,14 +263,13 @@ impl PeerManager { if (min_ttl.is_some() && connected_or_dialing + to_dial_peers.len() < self.max_priority_peers() || connected_or_dialing + to_dial_peers.len() < self.max_peers()) - && self.network_globals.peers.read().should_dial(&peer_id) + && self.network_globals.peers().should_dial(&peer_id) { // This should be updated with the peer dialing. In fact created once the peer is // dialed if let Some(min_ttl) = min_ttl { self.network_globals - .peers - .write() + .peers_mut() .update_min_ttl(&peer_id, min_ttl); } to_dial_peers.push(peer_id); @@ -341,11 +339,11 @@ impl PeerManager { /// /// This is used to determine if we should accept incoming connections. pub fn ban_status(&self, peer_id: &PeerId) -> BanResult { - self.network_globals.peers.read().ban_status(peer_id) + self.network_globals.peers().ban_status(peer_id) } pub fn is_connected(&self, peer_id: &PeerId) -> bool { - self.network_globals.peers.read().is_connected(peer_id) + self.network_globals.peers().is_connected(peer_id) } /// Reports whether the peer limit is reached in which case we stop allowing new incoming @@ -356,7 +354,7 @@ impl PeerManager { /// Updates `PeerInfo` with `identify` information. pub fn identify(&mut self, peer_id: &PeerId, info: &IdentifyInfo) { - if let Some(peer_info) = self.network_globals.peers.write().peer_info_mut(peer_id) { + if let Some(peer_info) = self.network_globals.peers_mut().peer_info_mut(peer_id) { let previous_kind = peer_info.client().kind.clone(); let previous_listening_addresses = peer_info.set_listening_addresses(info.listen_addrs.clone()); @@ -403,7 +401,7 @@ impl PeerManager { direction: ConnectionDirection, ) { let client = self.network_globals.client(peer_id); - let score = self.network_globals.peers.read().score(peer_id); + let score = self.network_globals.peers().score(peer_id); debug!(self.log, "RPC Error"; "protocol" => %protocol, "err" => %err, "client" => %client, "peer_id" => %peer_id, "score" => %score, "direction" => ?direction); metrics::inc_counter_vec( @@ -505,7 +503,7 @@ impl PeerManager { /// A ping request has been received. // NOTE: The behaviour responds with a PONG automatically pub fn ping_request(&mut self, peer_id: &PeerId, seq: u64) { - if let Some(peer_info) = self.network_globals.peers.read().peer_info(peer_id) { + if let Some(peer_info) = self.network_globals.peers().peer_info(peer_id) { // received a ping // reset the to-ping timer for this peer debug!(self.log, "Received a ping request"; "peer_id" => %peer_id, "seq_no" => seq); @@ -542,7 +540,7 @@ impl PeerManager { /// A PONG has been returned from a peer. pub fn pong_response(&mut self, peer_id: &PeerId, seq: u64) { - if let Some(peer_info) = self.network_globals.peers.read().peer_info(peer_id) { + if let Some(peer_info) = self.network_globals.peers().peer_info(peer_id) { // received a pong // if the sequence number is unknown send update the meta data of the peer. @@ -565,7 +563,7 @@ impl PeerManager { /// Received a metadata response from a peer. pub fn meta_data_response(&mut self, peer_id: &PeerId, meta_data: MetaData) { - if let Some(peer_info) = self.network_globals.peers.write().peer_info_mut(peer_id) { + if let Some(peer_info) = self.network_globals.peers_mut().peer_info_mut(peer_id) { if let Some(known_meta_data) = &peer_info.meta_data() { if *known_meta_data.seq_number() < *meta_data.seq_number() { debug!(self.log, "Updating peer's metadata"; @@ -592,8 +590,7 @@ impl PeerManager { pub(crate) fn update_gossipsub_scores(&mut self, gossipsub: &Gossipsub) { let actions = self .network_globals - .peers - .write() + .peers_mut() .update_gossipsub_scores(self.target_peers, gossipsub); for (peer_id, score_action) in actions { @@ -633,11 +630,7 @@ impl PeerManager { /// /// This is also called when dialing a peer fails. fn inject_disconnect(&mut self, peer_id: &PeerId) { - let ban_operation = self - .network_globals - .peers - .write() - .inject_disconnect(peer_id); + let ban_operation = self.network_globals.peers_mut().inject_disconnect(peer_id); if let Some(ban_operation) = ban_operation { // The peer was awaiting a ban, continue to ban the peer. @@ -663,7 +656,7 @@ impl PeerManager { enr: Option, ) -> bool { { - let mut peerdb = self.network_globals.peers.write(); + let mut peerdb = self.network_globals.peers_mut(); if !matches!(peerdb.ban_status(peer_id), BanResult::NotBanned) { // don't connect if the peer is banned error!(self.log, "Connection has been allowed to a banned peer"; "peer_id" => %peer_id); @@ -700,8 +693,7 @@ impl PeerManager { // Increment the PEERS_PER_CLIENT metric if let Some(kind) = self .network_globals - .peers - .read() + .peers() .peer_info(peer_id) .map(|peer_info| peer_info.client().kind.clone()) { @@ -720,8 +712,7 @@ impl PeerManager { self.events .push(PeerManagerEvent::DisconnectPeer(peer_id, reason)); self.network_globals - .peers - .write() + .peers_mut() .notify_disconnecting(&peer_id, false); } @@ -737,8 +728,7 @@ impl PeerManager { .filter_map(|(k, v)| { if self .network_globals - .peers - .read() + .peers() .good_peers_on_subnet(Subnet::SyncCommittee(*k)) .count() < TARGET_SUBNET_PEERS @@ -787,7 +777,7 @@ impl PeerManager { } // Updates peer's scores and unban any peers if required. - let actions = self.network_globals.peers.write().update_scores(); + let actions = self.network_globals.peers_mut().update_scores(); for (peer_id, action) in actions { self.handle_score_action(&peer_id, action, None); } @@ -806,8 +796,7 @@ impl PeerManager { let mut n_outbound_removed = 0; for (peer_id, info) in self .network_globals - .peers - .read() + .peers() .worst_connected_peers() .iter() .filter(|(_, info)| !info.has_future_duty()) @@ -926,16 +915,14 @@ mod tests { // Set the outbound-only peers to have the lowest score. peer_manager .network_globals - .peers - .write() + .peers_mut() .peer_info_mut(&outbound_only_peer1) .unwrap() .add_to_score(-1.0); peer_manager .network_globals - .peers - .write() + .peers_mut() .peer_info_mut(&outbound_only_peer2) .unwrap() .add_to_score(-2.0); @@ -951,13 +938,11 @@ mod tests { assert_eq!(peer_manager.network_globals.connected_or_dialing_peers(), 3); assert!(peer_manager .network_globals - .peers - .read() + .peers() .is_connected(&outbound_only_peer1)); assert!(!peer_manager .network_globals - .peers - .read() + .peers() .is_connected(&outbound_only_peer2)); peer_manager.heartbeat(); @@ -986,8 +971,7 @@ mod tests { ); peer_manager .network_globals - .peers - .write() + .peers_mut() .peer_info_mut(&(outbound_only_peer)) .unwrap() .add_to_score(-1.0); @@ -1027,29 +1011,25 @@ mod tests { ); peer_manager .network_globals - .peers - .write() + .peers_mut() .peer_info_mut(&(inbound_only_peer1)) .unwrap() .add_to_score(-19.8); peer_manager .network_globals - .peers - .write() + .peers_mut() .peer_info_mut(&(outbound_only_peer1)) .unwrap() .add_to_score(-19.8); peer_manager .network_globals - .peers - .write() + .peers_mut() .peer_info_mut(&(inbound_only_peer1)) .unwrap() .set_gossipsub_score(-85.0); peer_manager .network_globals - .peers - .write() + .peers_mut() .peer_info_mut(&(outbound_only_peer1)) .unwrap() .set_gossipsub_score(-85.0); @@ -1087,15 +1067,13 @@ mod tests { ); peer_manager .network_globals - .peers - .write() + .peers_mut() .peer_info_mut(&(inbound_only_peer1)) .unwrap() .add_to_score(-19.9); peer_manager .network_globals - .peers - .write() + .peers_mut() .peer_info_mut(&(inbound_only_peer1)) .unwrap() .set_gossipsub_score(-85.0); diff --git a/beacon_node/lighthouse_network/src/peer_manager/network_behaviour.rs b/beacon_node/lighthouse_network/src/peer_manager/network_behaviour.rs index c8b062da4..840d6bc58 100644 --- a/beacon_node/lighthouse_network/src/peer_manager/network_behaviour.rs +++ b/beacon_node/lighthouse_network/src/peer_manager/network_behaviour.rs @@ -146,8 +146,7 @@ impl NetworkBehaviour for PeerManager { if self.peer_limit_reached() && self .network_globals - .peers - .read() + .peers() .peer_info(peer_id) .map_or(true, |peer| !peer.has_future_duty()) { @@ -185,8 +184,7 @@ impl NetworkBehaviour for PeerManager { // There are no more connections if self .network_globals - .peers - .read() + .peers() .is_connected_or_disconnecting(peer_id) { // We are disconnecting the peer or the peer has already been connected. @@ -200,8 +198,7 @@ impl NetworkBehaviour for PeerManager { // Decrement the PEERS_PER_CLIENT metric if let Some(kind) = self .network_globals - .peers - .read() + .peers() .peer_info(peer_id) .map(|info| info.client().kind.clone()) { @@ -262,7 +259,7 @@ impl NetworkBehaviour for PeerManager { _error: &DialError, ) { if let Some(peer_id) = peer_id { - if !self.network_globals.peers.read().is_connected(&peer_id) { + if !self.network_globals.peers().is_connected(&peer_id) { self.inject_disconnect(&peer_id); } } diff --git a/beacon_node/lighthouse_network/src/peer_manager/peerdb.rs b/beacon_node/lighthouse_network/src/peer_manager/peerdb.rs index 74d01c323..cb2816197 100644 --- a/beacon_node/lighthouse_network/src/peer_manager/peerdb.rs +++ b/beacon_node/lighthouse_network/src/peer_manager/peerdb.rs @@ -314,7 +314,7 @@ impl PeerDB { .map(|(id, _)| id) } - /// Returns the peer's connection status. Returns unknown if the peer is not in the DB. + /// Returns the peer's connection status. Returns None if the peer is not in the DB. pub fn connection_status(&self, peer_id: &PeerId) -> Option { self.peer_info(peer_id) .map(|info| info.connection_status().clone()) diff --git a/beacon_node/lighthouse_network/src/peer_manager/peerdb/peer_info.rs b/beacon_node/lighthouse_network/src/peer_manager/peerdb/peer_info.rs index 82aaefc63..59f4571d8 100644 --- a/beacon_node/lighthouse_network/src/peer_manager/peerdb/peer_info.rs +++ b/beacon_node/lighthouse_network/src/peer_manager/peerdb/peer_info.rs @@ -236,7 +236,6 @@ impl PeerInfo { /* Mutable Functions */ /// Updates the sync status. Returns true if the status was changed. - // VISIBILITY: Both the peer manager the network sync is able to update the sync state of a peer pub fn update_sync_status(&mut self, sync_status: SyncStatus) -> bool { self.sync_status.update(sync_status) } diff --git a/beacon_node/lighthouse_network/src/peer_manager/peerdb/sync_status.rs b/beacon_node/lighthouse_network/src/peer_manager/peerdb/sync_status.rs index bab8aa9ae..4c9adeb6e 100644 --- a/beacon_node/lighthouse_network/src/peer_manager/peerdb/sync_status.rs +++ b/beacon_node/lighthouse_network/src/peer_manager/peerdb/sync_status.rs @@ -27,19 +27,6 @@ pub struct SyncInfo { pub finalized_root: Hash256, } -impl std::cmp::PartialEq for SyncStatus { - fn eq(&self, other: &Self) -> bool { - matches!( - (self, other), - (SyncStatus::Synced { .. }, SyncStatus::Synced { .. }) - | (SyncStatus::Advanced { .. }, SyncStatus::Advanced { .. }) - | (SyncStatus::Behind { .. }, SyncStatus::Behind { .. }) - | (SyncStatus::IrrelevantPeer, SyncStatus::IrrelevantPeer) - | (SyncStatus::Unknown, SyncStatus::Unknown) - ) - } -} - impl SyncStatus { /// Returns true if the peer has advanced knowledge of the chain. pub fn is_advanced(&self) -> bool { @@ -61,7 +48,7 @@ impl SyncStatus { /// E.g. returns `true` if the state changed from `Synced` to `Advanced`, but not if /// the status remained `Synced` with different `SyncInfo` within. pub fn update(&mut self, new_state: SyncStatus) -> bool { - let changed_status = *self != new_state; + let changed_status = !(self.is_same_kind(&new_state)); *self = new_state; changed_status } @@ -75,6 +62,17 @@ impl SyncStatus { SyncStatus::IrrelevantPeer => "Irrelevant", } } + + pub fn is_same_kind(&self, other: &Self) -> bool { + matches!( + (self, other), + (SyncStatus::Synced { .. }, SyncStatus::Synced { .. }) + | (SyncStatus::Advanced { .. }, SyncStatus::Advanced { .. }) + | (SyncStatus::Behind { .. }, SyncStatus::Behind { .. }) + | (SyncStatus::IrrelevantPeer, SyncStatus::IrrelevantPeer) + | (SyncStatus::Unknown, SyncStatus::Unknown) + ) + } } impl std::fmt::Display for SyncStatus { diff --git a/beacon_node/lighthouse_network/src/service.rs b/beacon_node/lighthouse_network/src/service.rs index 24da3e355..03732e8e6 100644 --- a/beacon_node/lighthouse_network/src/service.rs +++ b/beacon_node/lighthouse_network/src/service.rs @@ -215,8 +215,7 @@ impl Service { } if !network_globals - .peers - .read() + .peers() .is_connected_or_dialing(&bootnode_enr.peer_id()) { dial(multiaddr.clone()); diff --git a/beacon_node/lighthouse_network/src/types/globals.rs b/beacon_node/lighthouse_network/src/types/globals.rs index 638270c2b..db00cf3c0 100644 --- a/beacon_node/lighthouse_network/src/types/globals.rs +++ b/beacon_node/lighthouse_network/src/types/globals.rs @@ -22,7 +22,7 @@ pub struct NetworkGlobals { /// The UDP port that the discovery service is listening on pub listen_port_udp: AtomicU16, /// The collection of known peers. - pub peers: RwLock>, + peers: RwLock>, // The local meta data of our node. pub local_metadata: RwLock>, /// The current gossipsub topic subscriptions. @@ -121,6 +121,14 @@ impl NetworkGlobals { .unwrap_or_default() } + pub fn peers(&self) -> impl std::ops::Deref> + '_ { + self.peers.read() + } + + pub(crate) fn peers_mut(&self) -> impl std::ops::DerefMut> + '_ { + self.peers.write() + } + /// Updates the syncing state of the node. /// /// The old state is returned diff --git a/beacon_node/network/src/metrics.rs b/beacon_node/network/src/metrics.rs index 35c5b4dce..0dfc65716 100644 --- a/beacon_node/network/src/metrics.rs +++ b/beacon_node/network/src/metrics.rs @@ -786,7 +786,7 @@ pub fn update_gossip_metrics( let mut peer_to_client = HashMap::new(); let mut scores_per_client: HashMap<&'static str, Vec> = HashMap::new(); { - let peers = network_globals.peers.read(); + let peers = network_globals.peers(); for (peer_id, _) in gossipsub.all_peers() { let client = peers .peer_info(peer_id) @@ -916,8 +916,7 @@ pub fn update_sync_metrics(network_globals: &Arc>) // count per sync status, the number of connected peers let mut peers_per_sync_type = FnvHashMap::default(); for sync_type in network_globals - .peers - .read() + .peers() .connected_peers() .map(|(_peer_id, info)| info.sync_status().as_str()) { diff --git a/beacon_node/network/src/router/mod.rs b/beacon_node/network/src/router/mod.rs index 8d639c5ee..0ab4c742d 100644 --- a/beacon_node/network/src/router/mod.rs +++ b/beacon_node/network/src/router/mod.rs @@ -154,7 +154,7 @@ impl Router { /// A new RPC request has been received from the network. fn handle_rpc_request(&mut self, peer_id: PeerId, id: PeerRequestId, request: Request) { - if !self.network_globals.peers.read().is_connected(&peer_id) { + if !self.network_globals.peers().is_connected(&peer_id) { debug!(self.log, "Dropping request of disconnected peer"; "peer_id" => %peer_id, "request" => ?request); return; } diff --git a/beacon_node/network/src/service.rs b/beacon_node/network/src/service.rs index ce8aca472..d9adcd28c 100644 --- a/beacon_node/network/src/service.rs +++ b/beacon_node/network/src/service.rs @@ -17,7 +17,7 @@ use lighthouse_network::{ types::{GossipEncoding, GossipTopic}, BehaviourEvent, MessageId, NetworkGlobals, PeerId, }; -use lighthouse_network::{MessageAcceptance, Service as LibP2PService}; +use lighthouse_network::{MessageAcceptance, Service as LibP2PService, SyncStatus}; use slog::{crit, debug, error, info, o, trace, warn}; use std::{net::SocketAddr, pin::Pin, sync::Arc, time::Duration}; use store::HotColdDB; @@ -100,6 +100,10 @@ pub enum NetworkMessage { reason: GoodbyeReason, source: ReportSource, }, + UpdatePeerSyncStatus { + peer_id: PeerId, + sync_status: SyncStatus, + }, } /// Service that handles communication between internal services and the `lighthouse_network` network service. @@ -527,6 +531,9 @@ fn spawn_service( ); } } + NetworkMessage::UpdatePeerSyncStatus{peer_id, sync_status} => { + service.libp2p.swarm.behaviour_mut().update_peers_sync_status(&peer_id, sync_status); + } } } // process any attestation service events diff --git a/beacon_node/network/src/sync/backfill_sync/mod.rs b/beacon_node/network/src/sync/backfill_sync/mod.rs index b9016b9fd..b734773a3 100644 --- a/beacon_node/network/src/sync/backfill_sync/mod.rs +++ b/beacon_node/network/src/sync/backfill_sync/mod.rs @@ -213,14 +213,7 @@ impl BackFillSync { match self.state() { BackFillState::Syncing => {} // already syncing ignore. BackFillState::Paused => { - if self - .network_globals - .peers - .read() - .synced_peers() - .next() - .is_some() - { + if self.network_globals.peers().synced_peers().next().is_some() { // If there are peers to resume with, begin the resume. debug!(self.log, "Resuming backfill sync"; "start_epoch" => self.current_start, "awaiting_batches" => self.batches.len(), "processing_target" => self.processing_target); self.set_state(BackFillState::Syncing); @@ -906,8 +899,7 @@ impl BackFillSync { let new_peer = { let mut priorized_peers = self .network_globals - .peers - .read() + .peers() .synced_peers() .map(|peer| { ( @@ -1026,8 +1018,7 @@ impl BackFillSync { let mut rng = rand::thread_rng(); let mut idle_peers = self .network_globals - .peers - .read() + .peers() .synced_peers() .filter(|peer_id| { self.active_requests diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index f0726ca94..4d353bd7f 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -294,7 +294,7 @@ impl SyncManager { let sync_type = remote_sync_type(&local, &remote, &self.chain); // update the state of the peer. - let should_add = self.update_peer_sync_state(&peer_id, &local, &remote, &sync_type); + let should_add = self.update_peer_sync_state(peer_id, &local, &remote, &sync_type); if matches!(sync_type, PeerSyncType::Advanced) && should_add { self.range_sync @@ -646,7 +646,7 @@ impl SyncManager { /// connection status. fn update_peer_sync_state( &mut self, - peer_id: &PeerId, + peer_id: PeerId, local_sync_info: &SyncInfo, remote_sync_info: &SyncInfo, sync_type: &PeerSyncType, @@ -656,15 +656,10 @@ impl SyncManager { let new_state = sync_type.as_sync_status(remote_sync_info); let rpr = new_state.as_str(); - // Drop the write lock - let update_sync_status = self - .network_globals - .peers - .write() - .update_sync_status(peer_id, new_state.clone()); - if let Some(was_updated) = update_sync_status { - let is_connected = self.network_globals.peers.read().is_connected(peer_id); - if was_updated { + + if let Some(info) = self.network_globals.peers().peer_info(&peer_id) { + let is_connected = info.is_connected(); + if !info.sync_status().is_same_kind(&new_state) { debug!(self.log, "Peer transitioned sync state"; "peer_id" => %peer_id, "new_state" => rpr, "our_head_slot" => local_sync_info.head_slot, "out_finalized_epoch" => local_sync_info.finalized_epoch, "their_head_slot" => remote_sync_info.head_slot, "their_finalized_epoch" => remote_sync_info.finalized_epoch, @@ -675,6 +670,8 @@ impl SyncManager { if new_state.is_synced() { self.backfill_sync.fully_synced_peer_joined(); } + + self.network.update_peer_sync_status(peer_id, new_state); } is_connected } else { @@ -712,7 +709,7 @@ impl SyncManager { let head = self.chain.best_slot().unwrap_or_else(|_| Slot::new(0)); let current_slot = self.chain.slot().unwrap_or_else(|_| Slot::new(0)); - let peers = self.network_globals.peers.read(); + let peers = self.network_globals.peers(); if current_slot >= head && current_slot.sub(head) <= (SLOT_IMPORT_TOLERANCE as u64) && head > 0 diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index e8b67ba92..895828f5d 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -10,7 +10,9 @@ use fnv::FnvHashMap; use lighthouse_network::rpc::{ BlocksByRangeRequest, BlocksByRootRequest, GoodbyeReason, RequestId, }; -use lighthouse_network::{Client, NetworkGlobals, PeerAction, PeerId, ReportSource, Request}; +use lighthouse_network::{ + Client, NetworkGlobals, PeerAction, PeerId, ReportSource, Request, SyncStatus, +}; use slog::{debug, trace, warn}; use std::sync::Arc; use tokio::sync::mpsc; @@ -52,12 +54,7 @@ impl SyncNetworkContext { /// Returns the Client type of the peer if known pub fn client_type(&self, peer_id: &PeerId) -> Client { - self.network_globals - .peers - .read() - .peer_info(peer_id) - .map(|info| info.client().clone()) - .unwrap_or_default() + self.network_globals.client(peer_id) } pub fn status_peers( @@ -208,10 +205,17 @@ impl SyncNetworkContext { }); } + pub fn update_peer_sync_status(&self, peer_id: PeerId, new_status: SyncStatus) { + let _ = self.send_network_msg(NetworkMessage::UpdatePeerSyncStatus { + peer_id, + sync_status: new_status, + }); + } + /// Sends an arbitrary network message. - fn send_network_msg(&mut self, msg: NetworkMessage) -> Result<(), &'static str> { - self.network_send.send(msg).map_err(|_| { - debug!(self.log, "Could not send message to the network service"); + fn send_network_msg(&self, msg: NetworkMessage) -> Result<(), &'static str> { + self.network_send.send(msg).map_err(|msg| { + warn!(self.log, "Could not send message to the network service"; "msg" => ?msg.0); "Network channel send Failed" }) }