Revert peer DB changes from #2724 (#2828)

## Proposed Changes

This reverts commit 53562010ec from PR #2724

Hopefully this will restore the reliability of the sync simulator.
This commit is contained in:
Michael Sproul 2021-11-25 03:45:52 +00:00
parent 3fb8162dcc
commit 2c07a72980
16 changed files with 154 additions and 139 deletions

View File

@ -1646,7 +1646,7 @@ pub fn serve<T: BeaconChainTypes>(
warp_utils::reject::custom_bad_request("invalid peer id.".to_string()) warp_utils::reject::custom_bad_request("invalid peer id.".to_string())
})?; })?;
if let Some(peer_info) = network_globals.peers().peer_info(&peer_id) { if let Some(peer_info) = network_globals.peers.read().peer_info(&peer_id) {
let address = if let Some(socket_addr) = peer_info.seen_addresses().next() { let address = if let Some(socket_addr) = peer_info.seen_addresses().next() {
let mut addr = lighthouse_network::Multiaddr::from(socket_addr.ip()); let mut addr = lighthouse_network::Multiaddr::from(socket_addr.ip());
addr.push(lighthouse_network::multiaddr::Protocol::Tcp( addr.push(lighthouse_network::multiaddr::Protocol::Tcp(
@ -1691,7 +1691,8 @@ pub fn serve<T: BeaconChainTypes>(
blocking_json_task(move || { blocking_json_task(move || {
let mut peers: Vec<api_types::PeerData> = Vec::new(); let mut peers: Vec<api_types::PeerData> = Vec::new();
network_globals network_globals
.peers() .peers
.read()
.peers() .peers()
.for_each(|(peer_id, peer_info)| { .for_each(|(peer_id, peer_info)| {
let address = let address =
@ -1758,17 +1759,21 @@ pub fn serve<T: BeaconChainTypes>(
let mut disconnected: u64 = 0; let mut disconnected: u64 = 0;
let mut disconnecting: u64 = 0; let mut disconnecting: u64 = 0;
network_globals.peers().peers().for_each(|(_, peer_info)| { network_globals
let state = api_types::PeerState::from_peer_connection_status( .peers
peer_info.connection_status(), .read()
); .peers()
match state { .for_each(|(_, peer_info)| {
api_types::PeerState::Connected => connected += 1, let state = api_types::PeerState::from_peer_connection_status(
api_types::PeerState::Connecting => connecting += 1, peer_info.connection_status(),
api_types::PeerState::Disconnected => disconnected += 1, );
api_types::PeerState::Disconnecting => disconnecting += 1, match state {
} api_types::PeerState::Connected => connected += 1,
}); api_types::PeerState::Connecting => connecting += 1,
api_types::PeerState::Disconnected => disconnected += 1,
api_types::PeerState::Disconnecting => disconnecting += 1,
}
});
Ok(api_types::GenericResponse::from(api_types::PeerCount { Ok(api_types::GenericResponse::from(api_types::PeerCount {
connected, connected,
@ -2238,7 +2243,8 @@ pub fn serve<T: BeaconChainTypes>(
.and_then(|network_globals: Arc<NetworkGlobals<T::EthSpec>>| { .and_then(|network_globals: Arc<NetworkGlobals<T::EthSpec>>| {
blocking_json_task(move || { blocking_json_task(move || {
Ok(network_globals Ok(network_globals
.peers() .peers
.read()
.peers() .peers()
.map(|(peer_id, peer_info)| eth2::lighthouse::Peer { .map(|(peer_id, peer_info)| eth2::lighthouse::Peer {
peer_id: peer_id.to_string(), peer_id: peer_id.to_string(),
@ -2257,7 +2263,8 @@ pub fn serve<T: BeaconChainTypes>(
.and_then(|network_globals: Arc<NetworkGlobals<T::EthSpec>>| { .and_then(|network_globals: Arc<NetworkGlobals<T::EthSpec>>| {
blocking_json_task(move || { blocking_json_task(move || {
Ok(network_globals Ok(network_globals
.peers() .peers
.read()
.connected_peers() .connected_peers()
.map(|(peer_id, peer_info)| eth2::lighthouse::Peer { .map(|(peer_id, peer_info)| eth2::lighthouse::Peer {
peer_id: peer_id.to_string(), peer_id: peer_id.to_string(),

View File

@ -14,9 +14,7 @@ use crate::types::{
SubnetDiscovery, SubnetDiscovery,
}; };
use crate::Eth2Enr; use crate::Eth2Enr;
use crate::{ use crate::{error, metrics, Enr, NetworkConfig, NetworkGlobals, PubsubMessage, TopicHash};
error, metrics, Enr, NetworkConfig, NetworkGlobals, PubsubMessage, SyncStatus, TopicHash,
};
use libp2p::{ use libp2p::{
core::{ core::{
connection::ConnectionId, identity::Keypair, multiaddr::Protocol as MProtocol, Multiaddr, connection::ConnectionId, identity::Keypair, multiaddr::Protocol as MProtocol, Multiaddr,
@ -34,7 +32,7 @@ use libp2p::{
}, },
NetworkBehaviour, PeerId, NetworkBehaviour, PeerId,
}; };
use slog::{crit, debug, error, o, trace, warn}; use slog::{crit, debug, o, trace, warn};
use ssz::Encode; use ssz::Encode;
use std::collections::HashSet; use std::collections::HashSet;
use std::fs::File; use std::fs::File;
@ -457,7 +455,8 @@ impl<TSpec: EthSpec> Behaviour<TSpec> {
} { } {
if let Some(client) = self if let Some(client) = self
.network_globals .network_globals
.peers() .peers
.read()
.peer_info(propagation_source) .peer_info(propagation_source)
.map(|info| info.client().kind.as_ref()) .map(|info| info.client().kind.as_ref())
{ {
@ -569,25 +568,6 @@ impl<TSpec: EthSpec> Behaviour<TSpec> {
self.discovery.add_enr(enr); self.discovery.add_enr(enr);
} }
pub fn update_peers_sync_status(&mut self, peer_id: &PeerId, sync_status: SyncStatus) {
let status_repr = sync_status.as_str();
match self
.network_globals
.peers_mut()
.update_sync_status(peer_id, sync_status)
{
Some(true) => {
trace!(self.log, "Peer sync status updated"; "peer_id" => %peer_id, "sync_status" => status_repr);
}
Some(false) => {
// Sync status is the same for known peer
}
None => {
error!(self.log, "Sync status update notification for unknown peer"; "peer_id" => %peer_id, "sync_status" => status_repr);
}
}
}
/// Updates a subnet value to the ENR attnets/syncnets bitfield. /// Updates a subnet value to the ENR attnets/syncnets bitfield.
/// ///
/// The `value` is `true` if a subnet is being added and false otherwise. /// The `value` is `true` if a subnet is being added and false otherwise.
@ -613,7 +593,8 @@ impl<TSpec: EthSpec> Behaviour<TSpec> {
// Extend min_ttl of connected peers on required subnets // Extend min_ttl of connected peers on required subnets
if let Some(min_ttl) = s.min_ttl { if let Some(min_ttl) = s.min_ttl {
self.network_globals self.network_globals
.peers_mut() .peers
.write()
.extend_peers_on_subnet(&s.subnet, min_ttl); .extend_peers_on_subnet(&s.subnet, min_ttl);
if let Subnet::SyncCommittee(sync_subnet) = s.subnet { if let Subnet::SyncCommittee(sync_subnet) = s.subnet {
self.peer_manager_mut() self.peer_manager_mut()
@ -623,7 +604,8 @@ impl<TSpec: EthSpec> Behaviour<TSpec> {
// Already have target number of peers, no need for subnet discovery // Already have target number of peers, no need for subnet discovery
let peers_on_subnet = self let peers_on_subnet = self
.network_globals .network_globals
.peers() .peers
.read()
.good_peers_on_subnet(s.subnet) .good_peers_on_subnet(s.subnet)
.count(); .count();
if peers_on_subnet >= TARGET_SUBNET_PEERS { if peers_on_subnet >= TARGET_SUBNET_PEERS {
@ -773,7 +755,7 @@ impl<TSpec: EthSpec> Behaviour<TSpec> {
.discovery .discovery
.cached_enrs() .cached_enrs()
.filter_map(|(peer_id, enr)| { .filter_map(|(peer_id, enr)| {
let peers = self.network_globals.peers(); let peers = self.network_globals.peers.read();
if predicate(enr) && peers.should_dial(peer_id) { if predicate(enr) && peers.should_dial(peer_id) {
Some(*peer_id) Some(*peer_id)
} else { } else {
@ -866,14 +848,16 @@ impl<TSpec: EthSpec> NetworkBehaviourEventProcess<GossipsubEvent> for Behaviour<
GossipsubEvent::Subscribed { peer_id, topic } => { GossipsubEvent::Subscribed { peer_id, topic } => {
if let Some(subnet_id) = subnet_from_topic_hash(&topic) { if let Some(subnet_id) = subnet_from_topic_hash(&topic) {
self.network_globals self.network_globals
.peers_mut() .peers
.write()
.add_subscription(&peer_id, subnet_id); .add_subscription(&peer_id, subnet_id);
} }
} }
GossipsubEvent::Unsubscribed { peer_id, topic } => { GossipsubEvent::Unsubscribed { peer_id, topic } => {
if let Some(subnet_id) = subnet_from_topic_hash(&topic) { if let Some(subnet_id) = subnet_from_topic_hash(&topic) {
self.network_globals self.network_globals
.peers_mut() .peers
.write()
.remove_subscription(&peer_id, &subnet_id); .remove_subscription(&peer_id, &subnet_id);
} }
} }

View File

@ -679,7 +679,8 @@ impl<TSpec: EthSpec> Discovery<TSpec> {
// Determine if we have sufficient peers, which may make this discovery unnecessary. // Determine if we have sufficient peers, which may make this discovery unnecessary.
let peers_on_subnet = self let peers_on_subnet = self
.network_globals .network_globals
.peers() .peers
.read()
.good_peers_on_subnet(subnet_query.subnet) .good_peers_on_subnet(subnet_query.subnet)
.count(); .count();

View File

@ -143,7 +143,7 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
/// This will send a goodbye and disconnect the peer if it is connected or dialing. /// This will send a goodbye and disconnect the peer if it is connected or dialing.
pub fn goodbye_peer(&mut self, peer_id: &PeerId, reason: GoodbyeReason, source: ReportSource) { pub fn goodbye_peer(&mut self, peer_id: &PeerId, reason: GoodbyeReason, source: ReportSource) {
// Update the sync status if required // Update the sync status if required
if let Some(info) = self.network_globals.peers_mut().peer_info_mut(peer_id) { if let Some(info) = self.network_globals.peers.write().peer_info_mut(peer_id) {
debug!(self.log, "Sending goodbye to peer"; "peer_id" => %peer_id, "reason" => %reason, "score" => %info.score()); debug!(self.log, "Sending goodbye to peer"; "peer_id" => %peer_id, "reason" => %reason, "score" => %info.score());
if matches!(reason, GoodbyeReason::IrrelevantNetwork) { if matches!(reason, GoodbyeReason::IrrelevantNetwork) {
info.update_sync_status(SyncStatus::IrrelevantPeer); info.update_sync_status(SyncStatus::IrrelevantPeer);
@ -165,7 +165,8 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
) { ) {
let action = self let action = self
.network_globals .network_globals
.peers_mut() .peers
.write()
.report_peer(peer_id, action, source); .report_peer(peer_id, action, source);
self.handle_score_action(peer_id, action, reason); self.handle_score_action(peer_id, action, reason);
} }
@ -263,13 +264,14 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
if (min_ttl.is_some() if (min_ttl.is_some()
&& connected_or_dialing + to_dial_peers.len() < self.max_priority_peers() && connected_or_dialing + to_dial_peers.len() < self.max_priority_peers()
|| connected_or_dialing + to_dial_peers.len() < self.max_peers()) || connected_or_dialing + to_dial_peers.len() < self.max_peers())
&& self.network_globals.peers().should_dial(&peer_id) && self.network_globals.peers.read().should_dial(&peer_id)
{ {
// This should be updated with the peer dialing. In fact created once the peer is // This should be updated with the peer dialing. In fact created once the peer is
// dialed // dialed
if let Some(min_ttl) = min_ttl { if let Some(min_ttl) = min_ttl {
self.network_globals self.network_globals
.peers_mut() .peers
.write()
.update_min_ttl(&peer_id, min_ttl); .update_min_ttl(&peer_id, min_ttl);
} }
to_dial_peers.push(peer_id); to_dial_peers.push(peer_id);
@ -339,11 +341,11 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
/// ///
/// This is used to determine if we should accept incoming connections. /// This is used to determine if we should accept incoming connections.
pub fn ban_status(&self, peer_id: &PeerId) -> BanResult { pub fn ban_status(&self, peer_id: &PeerId) -> BanResult {
self.network_globals.peers().ban_status(peer_id) self.network_globals.peers.read().ban_status(peer_id)
} }
pub fn is_connected(&self, peer_id: &PeerId) -> bool { pub fn is_connected(&self, peer_id: &PeerId) -> bool {
self.network_globals.peers().is_connected(peer_id) self.network_globals.peers.read().is_connected(peer_id)
} }
/// Reports whether the peer limit is reached in which case we stop allowing new incoming /// Reports whether the peer limit is reached in which case we stop allowing new incoming
@ -354,7 +356,7 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
/// Updates `PeerInfo` with `identify` information. /// Updates `PeerInfo` with `identify` information.
pub fn identify(&mut self, peer_id: &PeerId, info: &IdentifyInfo) { pub fn identify(&mut self, peer_id: &PeerId, info: &IdentifyInfo) {
if let Some(peer_info) = self.network_globals.peers_mut().peer_info_mut(peer_id) { if let Some(peer_info) = self.network_globals.peers.write().peer_info_mut(peer_id) {
let previous_kind = peer_info.client().kind.clone(); let previous_kind = peer_info.client().kind.clone();
let previous_listening_addresses = let previous_listening_addresses =
peer_info.set_listening_addresses(info.listen_addrs.clone()); peer_info.set_listening_addresses(info.listen_addrs.clone());
@ -401,7 +403,7 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
direction: ConnectionDirection, direction: ConnectionDirection,
) { ) {
let client = self.network_globals.client(peer_id); let client = self.network_globals.client(peer_id);
let score = self.network_globals.peers().score(peer_id); let score = self.network_globals.peers.read().score(peer_id);
debug!(self.log, "RPC Error"; "protocol" => %protocol, "err" => %err, "client" => %client, debug!(self.log, "RPC Error"; "protocol" => %protocol, "err" => %err, "client" => %client,
"peer_id" => %peer_id, "score" => %score, "direction" => ?direction); "peer_id" => %peer_id, "score" => %score, "direction" => ?direction);
metrics::inc_counter_vec( metrics::inc_counter_vec(
@ -503,7 +505,7 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
/// A ping request has been received. /// A ping request has been received.
// NOTE: The behaviour responds with a PONG automatically // NOTE: The behaviour responds with a PONG automatically
pub fn ping_request(&mut self, peer_id: &PeerId, seq: u64) { pub fn ping_request(&mut self, peer_id: &PeerId, seq: u64) {
if let Some(peer_info) = self.network_globals.peers().peer_info(peer_id) { if let Some(peer_info) = self.network_globals.peers.read().peer_info(peer_id) {
// received a ping // received a ping
// reset the to-ping timer for this peer // reset the to-ping timer for this peer
debug!(self.log, "Received a ping request"; "peer_id" => %peer_id, "seq_no" => seq); debug!(self.log, "Received a ping request"; "peer_id" => %peer_id, "seq_no" => seq);
@ -540,7 +542,7 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
/// A PONG has been returned from a peer. /// A PONG has been returned from a peer.
pub fn pong_response(&mut self, peer_id: &PeerId, seq: u64) { pub fn pong_response(&mut self, peer_id: &PeerId, seq: u64) {
if let Some(peer_info) = self.network_globals.peers().peer_info(peer_id) { if let Some(peer_info) = self.network_globals.peers.read().peer_info(peer_id) {
// received a pong // received a pong
// if the sequence number is unknown send update the meta data of the peer. // if the sequence number is unknown send update the meta data of the peer.
@ -563,7 +565,7 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
/// Received a metadata response from a peer. /// Received a metadata response from a peer.
pub fn meta_data_response(&mut self, peer_id: &PeerId, meta_data: MetaData<TSpec>) { pub fn meta_data_response(&mut self, peer_id: &PeerId, meta_data: MetaData<TSpec>) {
if let Some(peer_info) = self.network_globals.peers_mut().peer_info_mut(peer_id) { if let Some(peer_info) = self.network_globals.peers.write().peer_info_mut(peer_id) {
if let Some(known_meta_data) = &peer_info.meta_data() { if let Some(known_meta_data) = &peer_info.meta_data() {
if *known_meta_data.seq_number() < *meta_data.seq_number() { if *known_meta_data.seq_number() < *meta_data.seq_number() {
debug!(self.log, "Updating peer's metadata"; debug!(self.log, "Updating peer's metadata";
@ -590,7 +592,8 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
pub(crate) fn update_gossipsub_scores(&mut self, gossipsub: &Gossipsub) { pub(crate) fn update_gossipsub_scores(&mut self, gossipsub: &Gossipsub) {
let actions = self let actions = self
.network_globals .network_globals
.peers_mut() .peers
.write()
.update_gossipsub_scores(self.target_peers, gossipsub); .update_gossipsub_scores(self.target_peers, gossipsub);
for (peer_id, score_action) in actions { for (peer_id, score_action) in actions {
@ -630,7 +633,11 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
/// ///
/// This is also called when dialing a peer fails. /// This is also called when dialing a peer fails.
fn inject_disconnect(&mut self, peer_id: &PeerId) { fn inject_disconnect(&mut self, peer_id: &PeerId) {
let ban_operation = self.network_globals.peers_mut().inject_disconnect(peer_id); let ban_operation = self
.network_globals
.peers
.write()
.inject_disconnect(peer_id);
if let Some(ban_operation) = ban_operation { if let Some(ban_operation) = ban_operation {
// The peer was awaiting a ban, continue to ban the peer. // The peer was awaiting a ban, continue to ban the peer.
@ -656,7 +663,7 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
enr: Option<Enr>, enr: Option<Enr>,
) -> bool { ) -> bool {
{ {
let mut peerdb = self.network_globals.peers_mut(); let mut peerdb = self.network_globals.peers.write();
if !matches!(peerdb.ban_status(peer_id), BanResult::NotBanned) { if !matches!(peerdb.ban_status(peer_id), BanResult::NotBanned) {
// don't connect if the peer is banned // don't connect if the peer is banned
error!(self.log, "Connection has been allowed to a banned peer"; "peer_id" => %peer_id); error!(self.log, "Connection has been allowed to a banned peer"; "peer_id" => %peer_id);
@ -693,7 +700,8 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
// Increment the PEERS_PER_CLIENT metric // Increment the PEERS_PER_CLIENT metric
if let Some(kind) = self if let Some(kind) = self
.network_globals .network_globals
.peers() .peers
.read()
.peer_info(peer_id) .peer_info(peer_id)
.map(|peer_info| peer_info.client().kind.clone()) .map(|peer_info| peer_info.client().kind.clone())
{ {
@ -712,7 +720,8 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
self.events self.events
.push(PeerManagerEvent::DisconnectPeer(peer_id, reason)); .push(PeerManagerEvent::DisconnectPeer(peer_id, reason));
self.network_globals self.network_globals
.peers_mut() .peers
.write()
.notify_disconnecting(&peer_id, false); .notify_disconnecting(&peer_id, false);
} }
@ -728,7 +737,8 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
.filter_map(|(k, v)| { .filter_map(|(k, v)| {
if self if self
.network_globals .network_globals
.peers() .peers
.read()
.good_peers_on_subnet(Subnet::SyncCommittee(*k)) .good_peers_on_subnet(Subnet::SyncCommittee(*k))
.count() .count()
< TARGET_SUBNET_PEERS < TARGET_SUBNET_PEERS
@ -777,7 +787,7 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
} }
// Updates peer's scores and unban any peers if required. // Updates peer's scores and unban any peers if required.
let actions = self.network_globals.peers_mut().update_scores(); let actions = self.network_globals.peers.write().update_scores();
for (peer_id, action) in actions { for (peer_id, action) in actions {
self.handle_score_action(&peer_id, action, None); self.handle_score_action(&peer_id, action, None);
} }
@ -796,7 +806,8 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
let mut n_outbound_removed = 0; let mut n_outbound_removed = 0;
for (peer_id, info) in self for (peer_id, info) in self
.network_globals .network_globals
.peers() .peers
.read()
.worst_connected_peers() .worst_connected_peers()
.iter() .iter()
.filter(|(_, info)| !info.has_future_duty()) .filter(|(_, info)| !info.has_future_duty())
@ -915,14 +926,16 @@ mod tests {
// Set the outbound-only peers to have the lowest score. // Set the outbound-only peers to have the lowest score.
peer_manager peer_manager
.network_globals .network_globals
.peers_mut() .peers
.write()
.peer_info_mut(&outbound_only_peer1) .peer_info_mut(&outbound_only_peer1)
.unwrap() .unwrap()
.add_to_score(-1.0); .add_to_score(-1.0);
peer_manager peer_manager
.network_globals .network_globals
.peers_mut() .peers
.write()
.peer_info_mut(&outbound_only_peer2) .peer_info_mut(&outbound_only_peer2)
.unwrap() .unwrap()
.add_to_score(-2.0); .add_to_score(-2.0);
@ -938,11 +951,13 @@ mod tests {
assert_eq!(peer_manager.network_globals.connected_or_dialing_peers(), 3); assert_eq!(peer_manager.network_globals.connected_or_dialing_peers(), 3);
assert!(peer_manager assert!(peer_manager
.network_globals .network_globals
.peers() .peers
.read()
.is_connected(&outbound_only_peer1)); .is_connected(&outbound_only_peer1));
assert!(!peer_manager assert!(!peer_manager
.network_globals .network_globals
.peers() .peers
.read()
.is_connected(&outbound_only_peer2)); .is_connected(&outbound_only_peer2));
peer_manager.heartbeat(); peer_manager.heartbeat();
@ -971,7 +986,8 @@ mod tests {
); );
peer_manager peer_manager
.network_globals .network_globals
.peers_mut() .peers
.write()
.peer_info_mut(&(outbound_only_peer)) .peer_info_mut(&(outbound_only_peer))
.unwrap() .unwrap()
.add_to_score(-1.0); .add_to_score(-1.0);
@ -1011,25 +1027,29 @@ mod tests {
); );
peer_manager peer_manager
.network_globals .network_globals
.peers_mut() .peers
.write()
.peer_info_mut(&(inbound_only_peer1)) .peer_info_mut(&(inbound_only_peer1))
.unwrap() .unwrap()
.add_to_score(-19.8); .add_to_score(-19.8);
peer_manager peer_manager
.network_globals .network_globals
.peers_mut() .peers
.write()
.peer_info_mut(&(outbound_only_peer1)) .peer_info_mut(&(outbound_only_peer1))
.unwrap() .unwrap()
.add_to_score(-19.8); .add_to_score(-19.8);
peer_manager peer_manager
.network_globals .network_globals
.peers_mut() .peers
.write()
.peer_info_mut(&(inbound_only_peer1)) .peer_info_mut(&(inbound_only_peer1))
.unwrap() .unwrap()
.set_gossipsub_score(-85.0); .set_gossipsub_score(-85.0);
peer_manager peer_manager
.network_globals .network_globals
.peers_mut() .peers
.write()
.peer_info_mut(&(outbound_only_peer1)) .peer_info_mut(&(outbound_only_peer1))
.unwrap() .unwrap()
.set_gossipsub_score(-85.0); .set_gossipsub_score(-85.0);
@ -1067,13 +1087,15 @@ mod tests {
); );
peer_manager peer_manager
.network_globals .network_globals
.peers_mut() .peers
.write()
.peer_info_mut(&(inbound_only_peer1)) .peer_info_mut(&(inbound_only_peer1))
.unwrap() .unwrap()
.add_to_score(-19.9); .add_to_score(-19.9);
peer_manager peer_manager
.network_globals .network_globals
.peers_mut() .peers
.write()
.peer_info_mut(&(inbound_only_peer1)) .peer_info_mut(&(inbound_only_peer1))
.unwrap() .unwrap()
.set_gossipsub_score(-85.0); .set_gossipsub_score(-85.0);

View File

@ -146,7 +146,8 @@ impl<TSpec: EthSpec> NetworkBehaviour for PeerManager<TSpec> {
if self.peer_limit_reached() if self.peer_limit_reached()
&& self && self
.network_globals .network_globals
.peers() .peers
.read()
.peer_info(peer_id) .peer_info(peer_id)
.map_or(true, |peer| !peer.has_future_duty()) .map_or(true, |peer| !peer.has_future_duty())
{ {
@ -184,7 +185,8 @@ impl<TSpec: EthSpec> NetworkBehaviour for PeerManager<TSpec> {
// There are no more connections // There are no more connections
if self if self
.network_globals .network_globals
.peers() .peers
.read()
.is_connected_or_disconnecting(peer_id) .is_connected_or_disconnecting(peer_id)
{ {
// We are disconnecting the peer or the peer has already been connected. // We are disconnecting the peer or the peer has already been connected.
@ -198,7 +200,8 @@ impl<TSpec: EthSpec> NetworkBehaviour for PeerManager<TSpec> {
// Decrement the PEERS_PER_CLIENT metric // Decrement the PEERS_PER_CLIENT metric
if let Some(kind) = self if let Some(kind) = self
.network_globals .network_globals
.peers() .peers
.read()
.peer_info(peer_id) .peer_info(peer_id)
.map(|info| info.client().kind.clone()) .map(|info| info.client().kind.clone())
{ {
@ -259,7 +262,7 @@ impl<TSpec: EthSpec> NetworkBehaviour for PeerManager<TSpec> {
_error: &DialError, _error: &DialError,
) { ) {
if let Some(peer_id) = peer_id { if let Some(peer_id) = peer_id {
if !self.network_globals.peers().is_connected(&peer_id) { if !self.network_globals.peers.read().is_connected(&peer_id) {
self.inject_disconnect(&peer_id); self.inject_disconnect(&peer_id);
} }
} }

View File

@ -314,7 +314,7 @@ impl<TSpec: EthSpec> PeerDB<TSpec> {
.map(|(id, _)| id) .map(|(id, _)| id)
} }
/// Returns the peer's connection status. Returns None if the peer is not in the DB. /// Returns the peer's connection status. Returns unknown if the peer is not in the DB.
pub fn connection_status(&self, peer_id: &PeerId) -> Option<PeerConnectionStatus> { pub fn connection_status(&self, peer_id: &PeerId) -> Option<PeerConnectionStatus> {
self.peer_info(peer_id) self.peer_info(peer_id)
.map(|info| info.connection_status().clone()) .map(|info| info.connection_status().clone())

View File

@ -236,6 +236,7 @@ impl<T: EthSpec> PeerInfo<T> {
/* Mutable Functions */ /* Mutable Functions */
/// Updates the sync status. Returns true if the status was changed. /// Updates the sync status. Returns true if the status was changed.
// VISIBILITY: Both the peer manager the network sync is able to update the sync state of a peer
pub fn update_sync_status(&mut self, sync_status: SyncStatus) -> bool { pub fn update_sync_status(&mut self, sync_status: SyncStatus) -> bool {
self.sync_status.update(sync_status) self.sync_status.update(sync_status)
} }

View File

@ -27,6 +27,19 @@ pub struct SyncInfo {
pub finalized_root: Hash256, pub finalized_root: Hash256,
} }
impl std::cmp::PartialEq for SyncStatus {
fn eq(&self, other: &Self) -> bool {
matches!(
(self, other),
(SyncStatus::Synced { .. }, SyncStatus::Synced { .. })
| (SyncStatus::Advanced { .. }, SyncStatus::Advanced { .. })
| (SyncStatus::Behind { .. }, SyncStatus::Behind { .. })
| (SyncStatus::IrrelevantPeer, SyncStatus::IrrelevantPeer)
| (SyncStatus::Unknown, SyncStatus::Unknown)
)
}
}
impl SyncStatus { impl SyncStatus {
/// Returns true if the peer has advanced knowledge of the chain. /// Returns true if the peer has advanced knowledge of the chain.
pub fn is_advanced(&self) -> bool { pub fn is_advanced(&self) -> bool {
@ -48,7 +61,7 @@ impl SyncStatus {
/// E.g. returns `true` if the state changed from `Synced` to `Advanced`, but not if /// E.g. returns `true` if the state changed from `Synced` to `Advanced`, but not if
/// the status remained `Synced` with different `SyncInfo` within. /// the status remained `Synced` with different `SyncInfo` within.
pub fn update(&mut self, new_state: SyncStatus) -> bool { pub fn update(&mut self, new_state: SyncStatus) -> bool {
let changed_status = !(self.is_same_kind(&new_state)); let changed_status = *self != new_state;
*self = new_state; *self = new_state;
changed_status changed_status
} }
@ -62,17 +75,6 @@ impl SyncStatus {
SyncStatus::IrrelevantPeer => "Irrelevant", SyncStatus::IrrelevantPeer => "Irrelevant",
} }
} }
pub fn is_same_kind(&self, other: &Self) -> bool {
matches!(
(self, other),
(SyncStatus::Synced { .. }, SyncStatus::Synced { .. })
| (SyncStatus::Advanced { .. }, SyncStatus::Advanced { .. })
| (SyncStatus::Behind { .. }, SyncStatus::Behind { .. })
| (SyncStatus::IrrelevantPeer, SyncStatus::IrrelevantPeer)
| (SyncStatus::Unknown, SyncStatus::Unknown)
)
}
} }
impl std::fmt::Display for SyncStatus { impl std::fmt::Display for SyncStatus {

View File

@ -215,7 +215,8 @@ impl<TSpec: EthSpec> Service<TSpec> {
} }
if !network_globals if !network_globals
.peers() .peers
.read()
.is_connected_or_dialing(&bootnode_enr.peer_id()) .is_connected_or_dialing(&bootnode_enr.peer_id())
{ {
dial(multiaddr.clone()); dial(multiaddr.clone());

View File

@ -22,7 +22,7 @@ pub struct NetworkGlobals<TSpec: EthSpec> {
/// The UDP port that the discovery service is listening on /// The UDP port that the discovery service is listening on
pub listen_port_udp: AtomicU16, pub listen_port_udp: AtomicU16,
/// The collection of known peers. /// The collection of known peers.
peers: RwLock<PeerDB<TSpec>>, pub peers: RwLock<PeerDB<TSpec>>,
// The local meta data of our node. // The local meta data of our node.
pub local_metadata: RwLock<MetaData<TSpec>>, pub local_metadata: RwLock<MetaData<TSpec>>,
/// The current gossipsub topic subscriptions. /// The current gossipsub topic subscriptions.
@ -121,14 +121,6 @@ impl<TSpec: EthSpec> NetworkGlobals<TSpec> {
.unwrap_or_default() .unwrap_or_default()
} }
pub fn peers(&self) -> impl std::ops::Deref<Target = PeerDB<TSpec>> + '_ {
self.peers.read()
}
pub(crate) fn peers_mut(&self) -> impl std::ops::DerefMut<Target = PeerDB<TSpec>> + '_ {
self.peers.write()
}
/// Updates the syncing state of the node. /// Updates the syncing state of the node.
/// ///
/// The old state is returned /// The old state is returned

View File

@ -786,7 +786,7 @@ pub fn update_gossip_metrics<T: EthSpec>(
let mut peer_to_client = HashMap::new(); let mut peer_to_client = HashMap::new();
let mut scores_per_client: HashMap<&'static str, Vec<f64>> = HashMap::new(); let mut scores_per_client: HashMap<&'static str, Vec<f64>> = HashMap::new();
{ {
let peers = network_globals.peers(); let peers = network_globals.peers.read();
for (peer_id, _) in gossipsub.all_peers() { for (peer_id, _) in gossipsub.all_peers() {
let client = peers let client = peers
.peer_info(peer_id) .peer_info(peer_id)
@ -916,7 +916,8 @@ pub fn update_sync_metrics<T: EthSpec>(network_globals: &Arc<NetworkGlobals<T>>)
// count per sync status, the number of connected peers // count per sync status, the number of connected peers
let mut peers_per_sync_type = FnvHashMap::default(); let mut peers_per_sync_type = FnvHashMap::default();
for sync_type in network_globals for sync_type in network_globals
.peers() .peers
.read()
.connected_peers() .connected_peers()
.map(|(_peer_id, info)| info.sync_status().as_str()) .map(|(_peer_id, info)| info.sync_status().as_str())
{ {

View File

@ -154,7 +154,7 @@ impl<T: BeaconChainTypes> Router<T> {
/// A new RPC request has been received from the network. /// A new RPC request has been received from the network.
fn handle_rpc_request(&mut self, peer_id: PeerId, id: PeerRequestId, request: Request) { fn handle_rpc_request(&mut self, peer_id: PeerId, id: PeerRequestId, request: Request) {
if !self.network_globals.peers().is_connected(&peer_id) { if !self.network_globals.peers.read().is_connected(&peer_id) {
debug!(self.log, "Dropping request of disconnected peer"; "peer_id" => %peer_id, "request" => ?request); debug!(self.log, "Dropping request of disconnected peer"; "peer_id" => %peer_id, "request" => ?request);
return; return;
} }

View File

@ -17,7 +17,7 @@ use lighthouse_network::{
types::{GossipEncoding, GossipTopic}, types::{GossipEncoding, GossipTopic},
BehaviourEvent, MessageId, NetworkGlobals, PeerId, BehaviourEvent, MessageId, NetworkGlobals, PeerId,
}; };
use lighthouse_network::{MessageAcceptance, Service as LibP2PService, SyncStatus}; use lighthouse_network::{MessageAcceptance, Service as LibP2PService};
use slog::{crit, debug, error, info, o, trace, warn}; use slog::{crit, debug, error, info, o, trace, warn};
use std::{net::SocketAddr, pin::Pin, sync::Arc, time::Duration}; use std::{net::SocketAddr, pin::Pin, sync::Arc, time::Duration};
use store::HotColdDB; use store::HotColdDB;
@ -100,10 +100,6 @@ pub enum NetworkMessage<T: EthSpec> {
reason: GoodbyeReason, reason: GoodbyeReason,
source: ReportSource, source: ReportSource,
}, },
UpdatePeerSyncStatus {
peer_id: PeerId,
sync_status: SyncStatus,
},
} }
/// Service that handles communication between internal services and the `lighthouse_network` network service. /// Service that handles communication between internal services and the `lighthouse_network` network service.
@ -531,9 +527,6 @@ fn spawn_service<T: BeaconChainTypes>(
); );
} }
} }
NetworkMessage::UpdatePeerSyncStatus{peer_id, sync_status} => {
service.libp2p.swarm.behaviour_mut().update_peers_sync_status(&peer_id, sync_status);
}
} }
} }
// process any attestation service events // process any attestation service events

View File

@ -213,7 +213,14 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
match self.state() { match self.state() {
BackFillState::Syncing => {} // already syncing ignore. BackFillState::Syncing => {} // already syncing ignore.
BackFillState::Paused => { BackFillState::Paused => {
if self.network_globals.peers().synced_peers().next().is_some() { if self
.network_globals
.peers
.read()
.synced_peers()
.next()
.is_some()
{
// If there are peers to resume with, begin the resume. // If there are peers to resume with, begin the resume.
debug!(self.log, "Resuming backfill sync"; "start_epoch" => self.current_start, "awaiting_batches" => self.batches.len(), "processing_target" => self.processing_target); debug!(self.log, "Resuming backfill sync"; "start_epoch" => self.current_start, "awaiting_batches" => self.batches.len(), "processing_target" => self.processing_target);
self.set_state(BackFillState::Syncing); self.set_state(BackFillState::Syncing);
@ -899,7 +906,8 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
let new_peer = { let new_peer = {
let mut priorized_peers = self let mut priorized_peers = self
.network_globals .network_globals
.peers() .peers
.read()
.synced_peers() .synced_peers()
.map(|peer| { .map(|peer| {
( (
@ -1018,7 +1026,8 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
let mut rng = rand::thread_rng(); let mut rng = rand::thread_rng();
let mut idle_peers = self let mut idle_peers = self
.network_globals .network_globals
.peers() .peers
.read()
.synced_peers() .synced_peers()
.filter(|peer_id| { .filter(|peer_id| {
self.active_requests self.active_requests

View File

@ -294,7 +294,7 @@ impl<T: BeaconChainTypes> SyncManager<T> {
let sync_type = remote_sync_type(&local, &remote, &self.chain); let sync_type = remote_sync_type(&local, &remote, &self.chain);
// update the state of the peer. // update the state of the peer.
let should_add = self.update_peer_sync_state(peer_id, &local, &remote, &sync_type); let should_add = self.update_peer_sync_state(&peer_id, &local, &remote, &sync_type);
if matches!(sync_type, PeerSyncType::Advanced) && should_add { if matches!(sync_type, PeerSyncType::Advanced) && should_add {
self.range_sync self.range_sync
@ -646,7 +646,7 @@ impl<T: BeaconChainTypes> SyncManager<T> {
/// connection status. /// connection status.
fn update_peer_sync_state( fn update_peer_sync_state(
&mut self, &mut self,
peer_id: PeerId, peer_id: &PeerId,
local_sync_info: &SyncInfo, local_sync_info: &SyncInfo,
remote_sync_info: &SyncInfo, remote_sync_info: &SyncInfo,
sync_type: &PeerSyncType, sync_type: &PeerSyncType,
@ -656,10 +656,15 @@ impl<T: BeaconChainTypes> SyncManager<T> {
let new_state = sync_type.as_sync_status(remote_sync_info); let new_state = sync_type.as_sync_status(remote_sync_info);
let rpr = new_state.as_str(); let rpr = new_state.as_str();
// Drop the write lock
if let Some(info) = self.network_globals.peers().peer_info(&peer_id) { let update_sync_status = self
let is_connected = info.is_connected(); .network_globals
if !info.sync_status().is_same_kind(&new_state) { .peers
.write()
.update_sync_status(peer_id, new_state.clone());
if let Some(was_updated) = update_sync_status {
let is_connected = self.network_globals.peers.read().is_connected(peer_id);
if was_updated {
debug!(self.log, "Peer transitioned sync state"; "peer_id" => %peer_id, "new_state" => rpr, debug!(self.log, "Peer transitioned sync state"; "peer_id" => %peer_id, "new_state" => rpr,
"our_head_slot" => local_sync_info.head_slot, "out_finalized_epoch" => local_sync_info.finalized_epoch, "our_head_slot" => local_sync_info.head_slot, "out_finalized_epoch" => local_sync_info.finalized_epoch,
"their_head_slot" => remote_sync_info.head_slot, "their_finalized_epoch" => remote_sync_info.finalized_epoch, "their_head_slot" => remote_sync_info.head_slot, "their_finalized_epoch" => remote_sync_info.finalized_epoch,
@ -670,8 +675,6 @@ impl<T: BeaconChainTypes> SyncManager<T> {
if new_state.is_synced() { if new_state.is_synced() {
self.backfill_sync.fully_synced_peer_joined(); self.backfill_sync.fully_synced_peer_joined();
} }
self.network.update_peer_sync_status(peer_id, new_state);
} }
is_connected is_connected
} else { } else {
@ -709,7 +712,7 @@ impl<T: BeaconChainTypes> SyncManager<T> {
let head = self.chain.best_slot().unwrap_or_else(|_| Slot::new(0)); let head = self.chain.best_slot().unwrap_or_else(|_| Slot::new(0));
let current_slot = self.chain.slot().unwrap_or_else(|_| Slot::new(0)); let current_slot = self.chain.slot().unwrap_or_else(|_| Slot::new(0));
let peers = self.network_globals.peers(); let peers = self.network_globals.peers.read();
if current_slot >= head if current_slot >= head
&& current_slot.sub(head) <= (SLOT_IMPORT_TOLERANCE as u64) && current_slot.sub(head) <= (SLOT_IMPORT_TOLERANCE as u64)
&& head > 0 && head > 0

View File

@ -10,9 +10,7 @@ use fnv::FnvHashMap;
use lighthouse_network::rpc::{ use lighthouse_network::rpc::{
BlocksByRangeRequest, BlocksByRootRequest, GoodbyeReason, RequestId, BlocksByRangeRequest, BlocksByRootRequest, GoodbyeReason, RequestId,
}; };
use lighthouse_network::{ use lighthouse_network::{Client, NetworkGlobals, PeerAction, PeerId, ReportSource, Request};
Client, NetworkGlobals, PeerAction, PeerId, ReportSource, Request, SyncStatus,
};
use slog::{debug, trace, warn}; use slog::{debug, trace, warn};
use std::sync::Arc; use std::sync::Arc;
use tokio::sync::mpsc; use tokio::sync::mpsc;
@ -54,7 +52,12 @@ impl<T: EthSpec> SyncNetworkContext<T> {
/// Returns the Client type of the peer if known /// Returns the Client type of the peer if known
pub fn client_type(&self, peer_id: &PeerId) -> Client { pub fn client_type(&self, peer_id: &PeerId) -> Client {
self.network_globals.client(peer_id) self.network_globals
.peers
.read()
.peer_info(peer_id)
.map(|info| info.client().clone())
.unwrap_or_default()
} }
pub fn status_peers<C: ToStatusMessage>( pub fn status_peers<C: ToStatusMessage>(
@ -205,17 +208,10 @@ impl<T: EthSpec> SyncNetworkContext<T> {
}); });
} }
pub fn update_peer_sync_status(&self, peer_id: PeerId, new_status: SyncStatus) {
let _ = self.send_network_msg(NetworkMessage::UpdatePeerSyncStatus {
peer_id,
sync_status: new_status,
});
}
/// Sends an arbitrary network message. /// Sends an arbitrary network message.
fn send_network_msg(&self, msg: NetworkMessage<T>) -> Result<(), &'static str> { fn send_network_msg(&mut self, msg: NetworkMessage<T>) -> Result<(), &'static str> {
self.network_send.send(msg).map_err(|msg| { self.network_send.send(msg).map_err(|_| {
warn!(self.log, "Could not send message to the network service"; "msg" => ?msg.0); debug!(self.log, "Could not send message to the network service");
"Network channel send Failed" "Network channel send Failed"
}) })
} }