Refactor Peerdb and PeerManager (#2660)

## Proposed Changes

This is a refactor of the PeerDB and PeerManager. A number of bugs have been surfacing around the connection state of peers and their interaction with the score state. 

This refactor tightens the mutability properties of peers such that only specific modules are able to modify the state of peer information preventing inadvertant state changes that can lead to our local peer manager db being out of sync with libp2p. 

Further, the logic around connection and scoring was quite convoluted and the distinction between the PeerManager and Peerdb was not well defined. Although these issues are not fully resolved, this PR is step to cleaning up this logic. The peerdb solely manages most mutability operations of peers leaving high-order logic to the peer manager. 

A single `update_connection_state()` function has been added to the peer-db making it solely responsible for modifying the peer's connection state. The way the peer's scores can be modified have been reduced to three simple functions (`update_scores()`, `update_gossipsub_scores()` and `report_peer()`). This prevents any add-hoc modifications of scores and only natural processes of score modification is allowed which simplifies the reasoning of score and state changes.
This commit is contained in:
Age Manning 2021-10-11 02:45:06 +00:00
parent 708557a473
commit 0aee7ec873
19 changed files with 1587 additions and 1154 deletions

View File

@ -4,7 +4,7 @@ use crate::behaviour::gossipsub_scoring_parameters::{
use crate::config::gossipsub_config;
use crate::discovery::{subnet_predicate, Discovery, DiscoveryEvent, TARGET_SUBNET_PEERS};
use crate::peer_manager::{
score::ReportSource, ConnectionDirection, PeerManager, PeerManagerEvent,
peerdb::score::ReportSource, ConnectionDirection, PeerManager, PeerManagerEvent,
};
use crate::rpc::*;
use crate::service::METADATA_FILENAME;
@ -255,7 +255,7 @@ impl<TSpec: EthSpec> Behaviour<TSpec> {
let update_gossipsub_scores = tokio::time::interval(params.decay_interval);
gossipsub
.with_peer_score(params.clone(), thresholds)
.with_peer_score(params, thresholds)
.expect("Valid score params and thresholds");
Ok(Behaviour {
@ -445,7 +445,7 @@ impl<TSpec: EthSpec> Behaviour<TSpec> {
.peers
.read()
.peer_info(propagation_source)
.map(|info| info.client.kind.as_ref())
.map(|info| info.client().kind.as_ref())
{
metrics::inc_counter_vec(
&metrics::GOSSIP_UNACCEPTED_MESSAGES_PER_CLIENT,
@ -834,12 +834,18 @@ impl<TSpec: EthSpec> NetworkBehaviourEventProcess<GossipsubEvent> for Behaviour<
}
GossipsubEvent::Subscribed { peer_id, topic } => {
if let Some(subnet_id) = subnet_from_topic_hash(&topic) {
self.peer_manager.add_subscription(&peer_id, subnet_id);
self.network_globals
.peers
.write()
.add_subscription(&peer_id, subnet_id);
}
}
GossipsubEvent::Unsubscribed { peer_id, topic } => {
if let Some(subnet_id) = subnet_from_topic_hash(&topic) {
self.peer_manager.remove_subscription(&peer_id, subnet_id);
self.network_globals
.peers
.write()
.remove_subscription(&peer_id, &subnet_id);
}
}
}

View File

@ -74,8 +74,9 @@ pub use libp2p::{core::ConnectedPoint, PeerId, Swarm};
pub use libp2p::{multiaddr, Multiaddr};
pub use metrics::scrape_discovery_metrics;
pub use peer_manager::{
client::Client,
score::{PeerAction, ReportSource},
ConnectionDirection, PeerConnectionStatus, PeerDB, PeerInfo, PeerSyncStatus, SyncInfo,
peerdb::client::Client,
peerdb::score::{PeerAction, ReportSource},
peerdb::PeerDB,
ConnectionDirection, PeerConnectionStatus, PeerInfo, PeerManager, SyncInfo, SyncStatus,
};
pub use service::{load_private_key, Libp2pEvent, Service, NETWORK_KEY_FILENAME};

View File

@ -1,6 +1,5 @@
//! Implementation of Lighthouse's peer management system.
pub use self::peerdb::*;
use crate::discovery::TARGET_SUBNET_PEERS;
use crate::rpc::{GoodbyeReason, MetaData, Protocol, RPCError, RPCResponseErrorCode};
use crate::types::SyncState;
@ -13,6 +12,7 @@ use futures::Stream;
use hashset_delay::HashSetDelay;
use libp2p::core::ConnectedPoint;
use libp2p::identify::IdentifyInfo;
use peerdb::{BanOperation, BanResult, ScoreUpdateResult};
use slog::{debug, error, warn};
use smallvec::SmallVec;
use std::{
@ -25,17 +25,14 @@ use types::{EthSpec, SyncSubnetId};
pub use libp2p::core::{identity::Keypair, Multiaddr};
pub mod client;
mod peer_info;
mod peer_sync_status;
#[allow(clippy::mutable_key_type)] // PeerId in hashmaps are no longer permitted by clippy
mod peerdb;
pub(crate) mod score;
pub mod peerdb;
pub use peer_info::{ConnectionDirection, PeerConnectionStatus, PeerConnectionStatus::*, PeerInfo};
pub use peer_sync_status::{PeerSyncStatus, SyncInfo};
use score::{PeerAction, ReportSource, ScoreState};
use std::cmp::Ordering;
pub use peerdb::peer_info::{
ConnectionDirection, PeerConnectionStatus, PeerConnectionStatus::*, PeerInfo,
};
use peerdb::score::{PeerAction, ReportSource};
pub use peerdb::sync_status::{SyncInfo, SyncStatus};
use std::collections::{hash_map::Entry, HashMap};
use std::net::IpAddr;
@ -65,10 +62,6 @@ pub const MIN_OUTBOUND_ONLY_FACTOR: f32 = 0.3;
/// dialing priority peers we need for validator duties.
pub const PRIORITY_PEER_EXCESS: f32 = 0.05;
/// Relative factor of peers that are allowed to have a negative gossipsub score without penalizing
/// them in lighthouse.
const ALLOWED_NEGATIVE_GOSSIPSUB_FACTOR: f32 = 0.1;
/// The main struct that handles peer's reputation and connection status.
pub struct PeerManager<TSpec: EthSpec> {
/// Storage of network globals to access the `PeerDB`.
@ -154,63 +147,102 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
///
/// This will send a goodbye and disconnect the peer if it is connected or dialing.
pub fn goodbye_peer(&mut self, peer_id: &PeerId, reason: GoodbyeReason, source: ReportSource) {
// get the peer info
// Update the sync status if required
if let Some(info) = self.network_globals.peers.write().peer_info_mut(peer_id) {
debug!(self.log, "Sending goodbye to peer"; "peer_id" => %peer_id, "reason" => %reason, "score" => %info.score());
if matches!(reason, GoodbyeReason::IrrelevantNetwork) {
info.sync_status.update(PeerSyncStatus::IrrelevantPeer);
info.update_sync_status(SyncStatus::IrrelevantPeer);
}
// Goodbye's are fatal
info.apply_peer_action_to_score(PeerAction::Fatal);
metrics::inc_counter_vec(
&metrics::PEER_ACTION_EVENTS_PER_CLIENT,
&[
info.client.kind.as_ref(),
PeerAction::Fatal.as_ref(),
source.into(),
],
);
}
// Update the peerdb and start the disconnection.
self.ban_peer(peer_id, reason);
self.report_peer(peer_id, PeerAction::Fatal, source, Some(reason));
}
/// Reports a peer for some action.
///
/// If the peer doesn't exist, log a warning and insert defaults.
pub fn report_peer(&mut self, peer_id: &PeerId, action: PeerAction, source: ReportSource) {
// Helper function to avoid any potential deadlocks.
let mut to_ban_peers = Vec::with_capacity(1);
let mut to_unban_peers = Vec::with_capacity(1);
pub fn report_peer(
&mut self,
peer_id: &PeerId,
action: PeerAction,
source: ReportSource,
reason: Option<GoodbyeReason>,
) {
let action = self
.network_globals
.peers
.write()
.report_peer(peer_id, action, source);
self.handle_score_action(peer_id, action, reason);
}
{
let mut peer_db = self.network_globals.peers.write();
if let Some(info) = peer_db.peer_info_mut(peer_id) {
let previous_state = info.score_state();
info.apply_peer_action_to_score(action);
metrics::inc_counter_vec(
&metrics::PEER_ACTION_EVENTS_PER_CLIENT,
&[info.client.kind.as_ref(), action.as_ref(), source.into()],
);
Self::handle_score_transitions(
previous_state,
peer_id,
info,
&mut to_ban_peers,
&mut to_unban_peers,
&mut self.events,
&self.log,
);
if previous_state == info.score_state() {
debug!(self.log, "Peer score adjusted"; "peer_id" => %peer_id, "score" => %info.score());
}
/// Upon adjusting a Peer's score, there are times the peer manager must pass messages up to
/// libp2p. This function handles the conditional logic associated with each score update
/// result.
fn handle_score_action(
&mut self,
peer_id: &PeerId,
action: ScoreUpdateResult,
reason: Option<GoodbyeReason>,
) {
match action {
ScoreUpdateResult::Ban(ban_operation) => {
// The peer has been banned and we need to handle the banning operation
// NOTE: When we ban a peer, its IP address can be banned. We do not recursively search
// through all our connected peers banning all other peers that are using this IP address.
// If these peers are behaving fine, we permit their current connections. However, if any new
// nodes or current nodes try to reconnect on a banned IP, they will be instantly banned
// and disconnected.
self.handle_ban_operation(peer_id, ban_operation, reason);
}
} // end write lock
ScoreUpdateResult::Disconnect => {
// The peer has transitioned to a disconnect state and has been marked as such in
// the peer db. We must inform libp2p to disconnect this peer.
self.events.push(PeerManagerEvent::DisconnectPeer(
*peer_id,
GoodbyeReason::BadScore,
));
}
ScoreUpdateResult::NoAction => {
// The report had no effect on the peer and there is nothing to do.
}
ScoreUpdateResult::Unbanned(unbanned_ips) => {
// Inform the Swarm to unban the peer
self.events
.push(PeerManagerEvent::UnBanned(*peer_id, unbanned_ips));
}
}
}
self.ban_and_unban_peers(to_ban_peers, to_unban_peers);
/// If a peer is being banned, this handles the banning operation.
fn handle_ban_operation(
&mut self,
peer_id: &PeerId,
ban_operation: BanOperation,
reason: Option<GoodbyeReason>,
) {
match ban_operation {
BanOperation::DisconnectThePeer => {
// The peer was currently connected, so we start a disconnection.
// Once the peer has disconnected, its connection state will transition to a
// banned state.
self.events.push(PeerManagerEvent::DisconnectPeer(
*peer_id,
reason.unwrap_or(GoodbyeReason::BadScore),
));
}
BanOperation::PeerDisconnecting => {
// The peer is currently being disconnected and will be banned once the
// disconnection completes.
}
BanOperation::ReadyToBan(banned_ips) => {
// The peer is not currently connected, we can safely ban it at the swarm
// level.
// Inform the Swarm to ban the peer
self.events
.push(PeerManagerEvent::Banned(*peer_id, banned_ips));
}
}
}
/// Peers that have been returned by discovery requests that are suitable for dialing are
@ -274,27 +306,6 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
self.status_peers.insert(*peer_id);
}
/// Adds a gossipsub subscription to a peer in the peerdb.
pub fn add_subscription(&self, peer_id: &PeerId, subnet: Subnet) {
if let Some(info) = self.network_globals.peers.write().peer_info_mut(peer_id) {
info.subnets.insert(subnet);
}
}
/// Removes a gossipsub subscription to a peer in the peerdb.
pub fn remove_subscription(&self, peer_id: &PeerId, subnet: Subnet) {
if let Some(info) = self.network_globals.peers.write().peer_info_mut(peer_id) {
info.subnets.remove(&subnet);
}
}
/// Removes all gossipsub subscriptions to a peer in the peerdb.
pub fn remove_all_subscriptions(&self, peer_id: &PeerId) {
if let Some(info) = self.network_globals.peers.write().peer_info_mut(peer_id) {
info.subnets = Default::default();
}
}
/// Insert the sync subnet into list of long lived sync committee subnets that we need to
/// maintain adequate number of peers for.
pub fn add_sync_subnet(&mut self, subnet_id: SyncSubnetId, min_ttl: Instant) {
@ -417,10 +428,6 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
) {
if num_established == 0 {
// There are no more connections
// Remove all subnet subscriptions from the peer_db
self.remove_all_subscriptions(&peer_id);
if self
.network_globals
.peers
@ -441,7 +448,7 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
.peers
.read()
.peer_info(&peer_id)
.map(|info| info.client.kind.clone())
.map(|info| info.client().kind.clone())
{
if let Some(v) =
metrics::get_int_gauge(&metrics::PEERS_PER_CLIENT, &[&kind.to_string()])
@ -496,15 +503,13 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
/// Updates `PeerInfo` with `identify` information.
pub fn identify(&mut self, peer_id: &PeerId, info: &IdentifyInfo) {
if let Some(peer_info) = self.network_globals.peers.write().peer_info_mut(peer_id) {
let previous_kind = peer_info.client.kind.clone();
let previous_listening_addresses = std::mem::replace(
&mut peer_info.listening_addresses,
info.listen_addrs.clone(),
);
peer_info.client = client::Client::from_identify_info(info);
let previous_kind = peer_info.client().kind.clone();
let previous_listening_addresses =
peer_info.set_listening_addresses(info.listen_addrs.clone());
peer_info.set_client(peerdb::client::Client::from_identify_info(info));
if previous_kind != peer_info.client.kind
|| peer_info.listening_addresses != previous_listening_addresses
if previous_kind != peer_info.client().kind
|| *peer_info.listening_addresses() != previous_listening_addresses
{
debug!(self.log, "Identified Peer"; "peer" => %peer_id,
"protocol_version" => &info.protocol_version,
@ -517,7 +522,7 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
// update the peer client kind metric
if let Some(v) = metrics::get_int_gauge(
&metrics::PEERS_PER_CLIENT,
&[&peer_info.client.kind.to_string()],
&[&peer_info.client().kind.to_string()],
) {
v.inc()
};
@ -636,7 +641,7 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
RPCError::Disconnected => return, // No penalty for a graceful disconnection
};
self.report_peer(peer_id, peer_action, ReportSource::RPC);
self.report_peer(peer_id, peer_action, ReportSource::RPC, None);
}
/// A ping request has been received.
@ -646,7 +651,7 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
// received a ping
// reset the to-ping timer for this peer
debug!(self.log, "Received a ping request"; "peer_id" => %peer_id, "seq_no" => seq);
match peer_info.connection_direction {
match peer_info.connection_direction() {
Some(ConnectionDirection::Incoming) => {
self.inbound_ping_peers.insert(*peer_id);
}
@ -659,7 +664,7 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
}
// if the sequence number is unknown send an update the meta data of the peer.
if let Some(meta_data) = &peer_info.meta_data {
if let Some(meta_data) = &peer_info.meta_data() {
if *meta_data.seq_number() < seq {
debug!(self.log, "Requesting new metadata from peer";
"peer_id" => %peer_id, "known_seq_no" => meta_data.seq_number(), "ping_seq_no" => seq);
@ -683,7 +688,7 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
// received a pong
// if the sequence number is unknown send update the meta data of the peer.
if let Some(meta_data) = &peer_info.meta_data {
if let Some(meta_data) = &peer_info.meta_data() {
if *meta_data.seq_number() < seq {
debug!(self.log, "Requesting new metadata from peer";
"peer_id" => %peer_id, "known_seq_no" => meta_data.seq_number(), "pong_seq_no" => seq);
@ -703,7 +708,7 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
/// Received a metadata response from a peer.
pub fn meta_data_response(&mut self, peer_id: &PeerId, meta_data: MetaData<TSpec>) {
if let Some(peer_info) = self.network_globals.peers.write().peer_info_mut(peer_id) {
if let Some(known_meta_data) = &peer_info.meta_data {
if let Some(known_meta_data) = &peer_info.meta_data() {
if *known_meta_data.seq_number() < *meta_data.seq_number() {
debug!(self.log, "Updating peer's metadata";
"peer_id" => %peer_id, "known_seq_no" => known_meta_data.seq_number(), "new_seq_no" => meta_data.seq_number());
@ -718,64 +723,24 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
debug!(self.log, "Obtained peer's metadata";
"peer_id" => %peer_id, "new_seq_no" => meta_data.seq_number());
}
peer_info.meta_data = Some(meta_data);
peer_info.set_meta_data(meta_data);
} else {
error!(self.log, "Received METADATA from an unknown peer";
"peer_id" => %peer_id);
}
}
/// Updates the gossipsub scores for all known peers in gossipsub.
pub(crate) fn update_gossipsub_scores(&mut self, gossipsub: &Gossipsub) {
let mut to_ban_peers = Vec::new();
let mut to_unban_peers = Vec::new();
let actions = self
.network_globals
.peers
.write()
.update_gossipsub_scores(self.target_peers, gossipsub);
{
// collect peers with scores
let mut guard = self.network_globals.peers.write();
let mut peers: Vec<_> = guard
.peers_mut()
.filter_map(|(peer_id, info)| {
gossipsub
.peer_score(peer_id)
.map(|score| (peer_id, info, score))
})
.collect();
// sort descending by score
peers.sort_unstable_by(|(.., s1), (.., s2)| {
s2.partial_cmp(s1).unwrap_or(Ordering::Equal)
});
let mut to_ignore_negative_peers =
(self.target_peers as f32 * ALLOWED_NEGATIVE_GOSSIPSUB_FACTOR).ceil() as usize;
for (peer_id, info, score) in peers {
let previous_state = info.score_state();
info.update_gossipsub_score(
score,
if score < 0.0 && to_ignore_negative_peers > 0 {
to_ignore_negative_peers -= 1;
// We ignore the negative score for the best negative peers so that their
// gossipsub score can recover without getting disconnected.
true
} else {
false
},
);
Self::handle_score_transitions(
previous_state,
peer_id,
info,
&mut to_ban_peers,
&mut to_unban_peers,
&mut self.events,
&self.log,
);
}
} // end write lock
self.ban_and_unban_peers(to_ban_peers, to_unban_peers);
for (peer_id, score_action) in actions {
self.handle_score_action(&peer_id, score_action, None);
}
}
/* Internal functions */
@ -810,14 +775,15 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
///
/// This is also called when dialing a peer fails.
fn inject_disconnect(&mut self, peer_id: &PeerId) {
if self
let ban_operation = self
.network_globals
.peers
.write()
.inject_disconnect(peer_id)
{
.inject_disconnect(peer_id);
if let Some(ban_operation) = ban_operation {
// The peer was awaiting a ban, continue to ban the peer.
self.ban_peer(peer_id, GoodbyeReason::BadScore);
self.handle_ban_operation(peer_id, ban_operation, None);
}
// Remove the ping and status timer for the peer
@ -879,7 +845,7 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
.peers
.read()
.peer_info(peer_id)
.map(|peer_info| peer_info.client.kind.clone())
.map(|peer_info| peer_info.client().kind.clone())
{
if let Some(v) =
metrics::get_int_gauge(&metrics::PEERS_PER_CLIENT, &[&kind.to_string()])
@ -891,154 +857,6 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
true
}
/// This handles score transitions between states. It transitions peers states from
/// disconnected/banned/connected.
fn handle_score_transitions(
previous_state: ScoreState,
peer_id: &PeerId,
info: &mut PeerInfo<TSpec>,
to_ban_peers: &mut Vec<PeerId>,
to_unban_peers: &mut Vec<PeerId>,
events: &mut SmallVec<[PeerManagerEvent; 16]>,
log: &slog::Logger,
) {
match (info.score_state(), previous_state) {
(ScoreState::Banned, ScoreState::Healthy | ScoreState::Disconnected) => {
debug!(log, "Peer has been banned"; "peer_id" => %peer_id, "score" => %info.score());
to_ban_peers.push(*peer_id);
}
(ScoreState::Disconnected, ScoreState::Banned | ScoreState::Healthy) => {
debug!(log, "Peer transitioned to disconnect state"; "peer_id" => %peer_id, "score" => %info.score(), "past_state" => %previous_state);
// disconnect the peer if it's currently connected or dialing
if info.is_connected_or_dialing() {
// Change the state to inform that we are disconnecting the peer.
info.disconnecting(false);
events.push(PeerManagerEvent::DisconnectPeer(
*peer_id,
GoodbyeReason::BadScore,
));
} else if previous_state == ScoreState::Banned {
to_unban_peers.push(*peer_id);
}
}
(ScoreState::Healthy, ScoreState::Disconnected) => {
debug!(log, "Peer transitioned to healthy state"; "peer_id" => %peer_id, "score" => %info.score(), "past_state" => %previous_state);
}
(ScoreState::Healthy, ScoreState::Banned) => {
debug!(log, "Peer transitioned to healthy state"; "peer_id" => %peer_id, "score" => %info.score(), "past_state" => %previous_state);
// unban the peer if it was previously banned.
to_unban_peers.push(*peer_id);
}
// Explicitly ignore states that haven't transitioned.
(ScoreState::Healthy, ScoreState::Healthy) => {}
(ScoreState::Disconnected, ScoreState::Disconnected) => {}
(ScoreState::Banned, ScoreState::Banned) => {}
}
}
/// Updates the state of banned peers.
fn ban_and_unban_peers(&mut self, to_ban_peers: Vec<PeerId>, to_unban_peers: Vec<PeerId>) {
// process banning peers
for peer_id in to_ban_peers {
self.ban_peer(&peer_id, GoodbyeReason::BadScore);
}
// process unbanning peers
for peer_id in to_unban_peers {
if let Err(e) = self.unban_peer(&peer_id) {
error!(self.log, "{}", e; "peer_id" => %peer_id);
}
}
}
/// Updates the scores of known peers according to their connection
/// status and the time that has passed.
/// NOTE: This is experimental and will likely be adjusted
fn update_peer_scores(&mut self) {
/* Check how long have peers been in this state and update their reputations if needed */
let mut to_ban_peers = Vec::new();
let mut to_unban_peers = Vec::new();
for (peer_id, info) in self.network_globals.peers.write().peers_mut() {
let previous_state = info.score_state();
// Update scores
info.score_update();
Self::handle_score_transitions(
previous_state,
peer_id,
info,
&mut to_ban_peers,
&mut to_unban_peers,
&mut self.events,
&self.log,
);
}
self.ban_and_unban_peers(to_ban_peers, to_unban_peers);
}
/// Bans a peer.
///
/// Records updates the peers connection status and updates the peer db as well as blocks the
/// peer from participating in discovery and removes them from the routing table.
fn ban_peer(&mut self, peer_id: &PeerId, reason: GoodbyeReason) {
// NOTE: When we ban a peer, its IP address can be banned. We do not recursively search
// through all our connected peers banning all other peers that are using this IP address.
// If these peers are behaving fine, we permit their current connections. However, if any new
// nodes or current nodes try to reconnect on a banned IP, they will be instantly banned
// and disconnected.
let mut peer_db = self.network_globals.peers.write();
match peer_db.disconnect_and_ban(peer_id) {
BanOperation::DisconnectThePeer => {
// The peer was currently connected, so we start a disconnection.
// Once the peer has disconnected, this function will be called to again to ban
// at the swarm level.
self.events
.push(PeerManagerEvent::DisconnectPeer(*peer_id, reason));
}
BanOperation::PeerDisconnecting => {
// The peer is currently being disconnected and will be banned once the
// disconnection completes.
}
BanOperation::ReadyToBan => {
// The peer is not currently connected, we can safely ban it at the swarm
// level.
let banned_ip_addresses = peer_db
.peer_info(peer_id)
.map(|info| {
info.seen_addresses()
.filter(|ip| peer_db.is_ip_banned(ip))
.collect::<Vec<_>>()
})
.unwrap_or_default();
// Inform the Swarm to ban the peer
self.events
.push(PeerManagerEvent::Banned(*peer_id, banned_ip_addresses));
}
}
}
/// Unbans a peer.
///
/// Updates the peer's connection status and updates the peer db as well as removes
/// previous bans from discovery.
fn unban_peer(&mut self, peer_id: &PeerId) -> Result<(), &'static str> {
let mut peer_db = self.network_globals.peers.write();
peer_db.unban(peer_id)?;
let seen_ip_addresses = peer_db
.peer_info(peer_id)
.map(|info| info.seen_addresses().collect::<Vec<_>>())
.unwrap_or_default();
// Inform the Swarm to unban the peer
self.events
.push(PeerManagerEvent::UnBanned(*peer_id, seen_ip_addresses));
Ok(())
}
// Gracefully disconnects a peer without banning them.
fn disconnect_peer(&mut self, peer_id: PeerId, reason: GoodbyeReason) {
self.events
@ -1046,7 +864,7 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
self.network_globals
.peers
.write()
.notify_disconnecting(peer_id, false);
.notify_disconnecting(&peer_id, false);
}
/// Run discovery query for additional sync committee peers if we fall below `TARGET_PEERS`.
@ -1110,8 +928,11 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
self.events.push(PeerManagerEvent::DiscoverPeers);
}
// Updates peer's scores.
self.update_peer_scores();
// Updates peer's scores and unban any peers if required.
let actions = self.network_globals.peers.write().update_scores();
for (peer_id, action) in actions {
self.handle_score_action(&peer_id, action, None);
}
// Maintain minimum count for sync committee peers.
self.maintain_sync_committee_peers();
@ -1394,67 +1215,6 @@ mod tests {
);
}
#[tokio::test]
async fn test_peer_manager_removes_unhealthy_peers_during_heartbeat() {
let mut peer_manager = build_peer_manager(3).await;
// Create 3 peers to connect to.
let peer0 = PeerId::random();
let inbound_only_peer1 = PeerId::random();
let outbound_only_peer1 = PeerId::random();
peer_manager.inject_connect_ingoing(&peer0, "/ip4/0.0.0.0".parse().unwrap(), None);
peer_manager.inject_connect_outgoing(&peer0, "/ip4/0.0.0.0".parse().unwrap(), None);
// Connect to two peers that are on the threshold of being disconnected.
peer_manager.inject_connect_ingoing(
&inbound_only_peer1,
"/ip4/0.0.0.0".parse().unwrap(),
None,
);
peer_manager.inject_connect_outgoing(
&outbound_only_peer1,
"/ip4/0.0.0.0".parse().unwrap(),
None,
);
peer_manager
.network_globals
.peers
.write()
.peer_info_mut(&(inbound_only_peer1))
.unwrap()
.add_to_score(-19.9);
peer_manager
.network_globals
.peers
.write()
.peer_info_mut(&(outbound_only_peer1))
.unwrap()
.add_to_score(-19.9);
// Update the gossipsub scores to induce connection downgrade
// during the heartbeat, update_peer_scores will downgrade the score from -19.9 to at least -20, this will then trigger a disconnection.
// If we changed the peer scores to -20 before the heartbeat, update_peer_scores will mark the previous score status as disconnected,
// then handle_state_transitions will not change the connection status to disconnected because the score state has not changed.
peer_manager
.network_globals
.peers
.write()
.peer_info_mut(&(inbound_only_peer1))
.unwrap()
.set_gossipsub_score(-85.0);
peer_manager
.network_globals
.peers
.write()
.peer_info_mut(&(outbound_only_peer1))
.unwrap()
.set_gossipsub_score(-85.0);
peer_manager.heartbeat();
assert_eq!(peer_manager.network_globals.connected_or_dialing_peers(), 1);
}
#[tokio::test]
async fn test_peer_manager_remove_unhealthy_peers_brings_peers_below_target() {
let mut peer_manager = build_peer_manager(3).await;
@ -1466,18 +1226,18 @@ mod tests {
let inbound_only_peer1 = PeerId::random();
let outbound_only_peer1 = PeerId::random();
peer_manager.inject_connect_ingoing(&peer0, "/ip4/0.0.0.0".parse().unwrap(), None);
peer_manager.inject_connect_ingoing(&peer1, "/ip4/0.0.0.0".parse().unwrap(), None);
peer_manager.inject_connect_ingoing(&peer0, "/ip4/0.0.0.0/tcp/8000".parse().unwrap(), None);
peer_manager.inject_connect_ingoing(&peer1, "/ip4/0.0.0.0/tcp/8000".parse().unwrap(), None);
// Connect to two peers that are on the threshold of being disconnected.
peer_manager.inject_connect_ingoing(
&inbound_only_peer1,
"/ip4/0.0.0.0".parse().unwrap(),
"/ip4/0.0.0.0/tcp/8000".parse().unwrap(),
None,
);
peer_manager.inject_connect_outgoing(
&outbound_only_peer1,
"/ip4/0.0.0.0".parse().unwrap(),
"/ip4/0.0.0.0/tcp/8000".parse().unwrap(),
None,
);
peer_manager
@ -1486,14 +1246,14 @@ mod tests {
.write()
.peer_info_mut(&(inbound_only_peer1))
.unwrap()
.add_to_score(-19.9);
.add_to_score(-19.8);
peer_manager
.network_globals
.peers
.write()
.peer_info_mut(&(outbound_only_peer1))
.unwrap()
.add_to_score(-19.9);
.add_to_score(-19.8);
peer_manager
.network_globals
.peers
@ -1509,9 +1269,9 @@ mod tests {
.unwrap()
.set_gossipsub_score(-85.0);
peer_manager.heartbeat();
// Tests that when we are over the target peer limit, after disconnecting two unhealthy peers,
// Tests that when we are over the target peer limit, after disconnecting one unhealthy peer,
// the loop to check for disconnecting peers will stop because we have removed enough peers (only needed to remove 1 to reach target).
assert_eq!(peer_manager.network_globals.connected_or_dialing_peers(), 2);
assert_eq!(peer_manager.network_globals.connected_or_dialing_peers(), 3);
}
#[tokio::test]

File diff suppressed because it is too large Load Diff

View File

@ -1,6 +1,6 @@
use super::client::Client;
use super::score::{PeerAction, Score, ScoreState};
use super::PeerSyncStatus;
use super::sync_status::SyncStatus;
use crate::Multiaddr;
use crate::{rpc::MetaData, types::Subnet};
use discv5::Enr;
@ -24,34 +24,34 @@ pub struct PeerInfo<T: EthSpec> {
/// The peers reputation
score: Score,
/// Client managing this peer
pub client: Client,
client: Client,
/// Connection status of this peer
connection_status: PeerConnectionStatus,
/// The known listening addresses of this peer. This is given by identify and can be arbitrary
/// (including local IPs).
pub listening_addresses: Vec<Multiaddr>,
listening_addresses: Vec<Multiaddr>,
/// This is addresses we have physically seen and this is what we use for banning/un-banning
/// peers.
pub seen_addresses: HashSet<SocketAddr>,
seen_addresses: HashSet<SocketAddr>,
/// The current syncing state of the peer. The state may be determined after it's initial
/// connection.
pub sync_status: PeerSyncStatus,
sync_status: SyncStatus,
/// The ENR subnet bitfield of the peer. This may be determined after it's initial
/// connection.
pub meta_data: Option<MetaData<T>>,
meta_data: Option<MetaData<T>>,
/// Subnets the peer is connected to.
pub subnets: HashSet<Subnet>,
subnets: HashSet<Subnet>,
/// The time we would like to retain this peer. After this time, the peer is no longer
/// necessary.
#[serde(skip)]
pub min_ttl: Option<Instant>,
min_ttl: Option<Instant>,
/// Is the peer a trusted peer.
pub is_trusted: bool,
is_trusted: bool,
/// Direction of the first connection of the last (or current) connected session with this peer.
/// None if this peer was never connected.
pub connection_direction: Option<ConnectionDirection>,
connection_direction: Option<ConnectionDirection>,
/// The enr of the peer, if known.
pub enr: Option<Enr>,
enr: Option<Enr>,
}
impl<TSpec: EthSpec> Default for PeerInfo<TSpec> {
@ -64,7 +64,7 @@ impl<TSpec: EthSpec> Default for PeerInfo<TSpec> {
listening_addresses: Vec::new(),
seen_addresses: HashSet::new(),
subnets: HashSet::new(),
sync_status: PeerSyncStatus::Unknown,
sync_status: SyncStatus::Unknown,
meta_data: None,
min_ttl: None,
is_trusted: false,
@ -101,13 +101,59 @@ impl<T: EthSpec> PeerInfo<T> {
false
}
/// Obtains the client of the peer.
pub fn client(&self) -> &Client {
&self.client
}
/// Returns the listening addresses of the Peer.
pub fn listening_addresses(&self) -> &Vec<Multiaddr> {
&self.listening_addresses
}
/// Returns the connection direction for the peer.
pub fn connection_direction(&self) -> Option<&ConnectionDirection> {
self.connection_direction.as_ref()
}
/// Returns the sync status of the peer.
pub fn sync_status(&self) -> &SyncStatus {
&self.sync_status
}
/// Returns the metadata for the peer if currently known.
pub fn meta_data(&self) -> Option<&MetaData<T>> {
self.meta_data.as_ref()
}
/// Returns whether the peer is a trusted peer or not.
pub fn is_trusted(&self) -> bool {
self.is_trusted
}
/// The time a peer is expected to be useful until for an attached validator. If this is set to
/// None, the peer is not required for any upcoming duty.
pub fn min_ttl(&self) -> Option<&Instant> {
self.min_ttl.as_ref()
}
/// The ENR of the peer if it is known.
pub fn enr(&self) -> Option<&Enr> {
self.enr.as_ref()
}
/// Returns if the peer is subscribed to a given `Subnet` from the gossipsub subscriptions.
pub fn on_subnet_gossipsub(&self, subnet: &Subnet) -> bool {
self.subnets.contains(subnet)
}
/// Returns the seen IP addresses of the peer.
pub fn seen_addresses(&self) -> impl Iterator<Item = IpAddr> + '_ {
/// Returns the seen addresses of the peer.
pub fn seen_addresses(&self) -> impl Iterator<Item = &SocketAddr> + '_ {
self.seen_addresses.iter()
}
/// Returns a list of seen IP addresses for the peer.
pub fn seen_ip_addresses(&self) -> impl Iterator<Item = IpAddr> + '_ {
self.seen_addresses
.iter()
.map(|socket_addr| socket_addr.ip())
@ -133,34 +179,11 @@ impl<T: EthSpec> PeerInfo<T> {
self.score.state()
}
/// Applies decay rates to a non-trusted peer's score.
pub fn score_update(&mut self) {
if !self.is_trusted {
self.score.update()
}
}
/// Apply peer action to a non-trusted peer's score.
pub fn apply_peer_action_to_score(&mut self, peer_action: PeerAction) {
if !self.is_trusted {
self.score.apply_peer_action(peer_action)
}
}
pub(crate) fn update_gossipsub_score(&mut self, new_score: f64, ignore: bool) {
self.score.update_gossipsub_score(new_score, ignore);
}
/// Returns true if the gossipsub score is sufficient.
pub fn is_good_gossipsub_peer(&self) -> bool {
self.score.is_good_gossipsub_peer()
}
#[cfg(test)]
/// Resets the peers score.
pub fn reset_score(&mut self) {
self.score.test_reset();
}
/* Peer connection status API */
/// Checks if the status is connected.
@ -181,8 +204,14 @@ impl<T: EthSpec> PeerInfo<T> {
self.is_connected() || self.is_dialing()
}
/// Checks if the status is banned.
/// Checks if the connection status is banned. This can lag behind the score state
/// temporarily.
pub fn is_banned(&self) -> bool {
matches!(self.connection_status, PeerConnectionStatus::Banned { .. })
}
/// Checks if the peer's score is banned.
pub fn score_is_banned(&self) -> bool {
matches!(self.score.state(), ScoreState::Banned)
}
@ -204,53 +233,95 @@ impl<T: EthSpec> PeerInfo<T> {
}
}
// Setters
/* Mutable Functions */
/// Modifies the status to Disconnected and sets the last seen instant to now. Returns None if
/// no changes were made. Returns Some(bool) where the bool represents if peer is to now be
/// banned.
pub fn notify_disconnect(&mut self) -> Option<bool> {
match self.connection_status {
Banned { .. } | Disconnected { .. } => None,
Disconnecting { to_ban } => {
self.connection_status = Disconnected {
since: Instant::now(),
};
Some(to_ban)
}
Connected { .. } | Dialing { .. } | Unknown => {
self.connection_status = Disconnected {
since: Instant::now(),
};
Some(false)
}
/// Updates the sync status. Returns true if the status was changed.
// VISIBILITY: Both the peer manager the network sync is able to update the sync state of a peer
pub fn update_sync_status(&mut self, sync_status: SyncStatus) -> bool {
self.sync_status.update(sync_status)
}
/// Sets the client of the peer.
// VISIBILITY: The peer manager is able to set the client
pub(in crate::peer_manager) fn set_client(&mut self, client: Client) {
self.client = client
}
/// Replaces the current listening addresses with those specified, returning the current
/// listening addresses.
// VISIBILITY: The peer manager is able to set the listening addresses
pub(in crate::peer_manager) fn set_listening_addresses(
&mut self,
listening_addresses: Vec<Multiaddr>,
) -> Vec<Multiaddr> {
std::mem::replace(&mut self.listening_addresses, listening_addresses)
}
/// Sets an explicit value for the meta data.
// VISIBILITY: The peer manager is able to adjust the meta_data
pub(in crate::peer_manager) fn set_meta_data(&mut self, meta_data: MetaData<T>) {
self.meta_data = Some(meta_data)
}
/// Sets the connection status of the peer.
pub(super) fn set_connection_status(&mut self, connection_status: PeerConnectionStatus) {
self.connection_status = connection_status
}
/// Sets the ENR of the peer if one is known.
pub(super) fn set_enr(&mut self, enr: Enr) {
self.enr = Some(enr)
}
/// Sets the time that the peer is expected to be needed until for an attached validator duty.
pub(super) fn set_min_ttl(&mut self, min_ttl: Instant) {
self.min_ttl = Some(min_ttl)
}
/// Adds a known subnet for the peer.
pub(super) fn insert_subnet(&mut self, subnet: Subnet) {
self.subnets.insert(subnet);
}
/// Removes a subnet from the peer.
pub(super) fn remove_subnet(&mut self, subnet: &Subnet) {
self.subnets.remove(subnet);
}
/// Removes all subnets from the peer.
pub(super) fn clear_subnets(&mut self) {
self.subnets.clear()
}
/// Applies decay rates to a non-trusted peer's score.
pub(super) fn score_update(&mut self) {
if !self.is_trusted {
self.score.update()
}
}
/// Notify the we are currently disconnecting this peer. Optionally ban the peer after the
/// disconnect.
pub fn disconnecting(&mut self, to_ban: bool) {
self.connection_status = Disconnecting { to_ban }
/// Apply peer action to a non-trusted peer's score.
// VISIBILITY: The peer manager is able to modify the score of a peer.
pub(in crate::peer_manager) fn apply_peer_action_to_score(&mut self, peer_action: PeerAction) {
if !self.is_trusted {
self.score.apply_peer_action(peer_action)
}
}
/// Modifies the status to banned or unbanned based on the underlying score.
pub fn update_state(&mut self) {
match (&self.connection_status, self.score.state()) {
(Disconnected { .. } | Unknown, ScoreState::Banned) => {
self.connection_status = Banned {
since: Instant::now(),
}
}
(Banned { since }, ScoreState::Healthy | ScoreState::Disconnected) => {
self.connection_status = Disconnected { since: *since }
}
(_, _) => {}
}
/// Updates the gossipsub score with a new score. Optionally ignore the gossipsub score.
pub(super) fn update_gossipsub_score(&mut self, new_score: f64, ignore: bool) {
self.score.update_gossipsub_score(new_score, ignore);
}
#[cfg(test)]
/// Resets the peers score.
pub fn reset_score(&mut self) {
self.score.test_reset();
}
/// Modifies the status to Dialing
/// Returns an error if the current state is unexpected.
pub(crate) fn dialing_peer(&mut self) -> Result<(), &'static str> {
pub(super) fn dialing_peer(&mut self) -> Result<(), &'static str> {
match &mut self.connection_status {
Connected { .. } => return Err("Dialing connected peer"),
Dialing { .. } => return Err("Dialing an already dialing peer"),
@ -265,7 +336,7 @@ impl<T: EthSpec> PeerInfo<T> {
/// Modifies the status to Connected and increases the number of ingoing
/// connections by one
pub(crate) fn connect_ingoing(&mut self, seen_address: Option<SocketAddr>) {
pub(super) fn connect_ingoing(&mut self, seen_address: Option<SocketAddr>) {
match &mut self.connection_status {
Connected { n_in, .. } => *n_in += 1,
Disconnected { .. }
@ -285,7 +356,7 @@ impl<T: EthSpec> PeerInfo<T> {
/// Modifies the status to Connected and increases the number of outgoing
/// connections by one
pub(crate) fn connect_outgoing(&mut self, seen_address: Option<SocketAddr>) {
pub(super) fn connect_outgoing(&mut self, seen_address: Option<SocketAddr>) {
match &mut self.connection_status {
Connected { n_out, .. } => *n_out += 1,
Disconnected { .. }
@ -335,7 +406,9 @@ impl Default for PeerStatus {
#[derive(Debug, Clone, Serialize, AsRefStr)]
#[strum(serialize_all = "snake_case")]
pub enum ConnectionDirection {
/// The connection was established by a peer dialing us.
Incoming,
/// The connection was established by us dialing a peer.
Outgoing,
}

View File

@ -5,7 +5,7 @@ use types::{Epoch, Hash256, Slot};
#[derive(Clone, Debug, Serialize)]
/// The current sync status of the peer.
pub enum PeerSyncStatus {
pub enum SyncStatus {
/// At the current state as our node or ahead of us.
Synced { info: SyncInfo },
/// The peer has greater knowledge about the canonical chain than we do.
@ -27,46 +27,40 @@ pub struct SyncInfo {
pub finalized_root: Hash256,
}
impl std::cmp::PartialEq for PeerSyncStatus {
impl std::cmp::PartialEq for SyncStatus {
fn eq(&self, other: &Self) -> bool {
matches!(
(self, other),
(PeerSyncStatus::Synced { .. }, PeerSyncStatus::Synced { .. })
| (
PeerSyncStatus::Advanced { .. },
PeerSyncStatus::Advanced { .. }
)
| (PeerSyncStatus::Behind { .. }, PeerSyncStatus::Behind { .. })
| (
PeerSyncStatus::IrrelevantPeer,
PeerSyncStatus::IrrelevantPeer
)
| (PeerSyncStatus::Unknown, PeerSyncStatus::Unknown)
(SyncStatus::Synced { .. }, SyncStatus::Synced { .. })
| (SyncStatus::Advanced { .. }, SyncStatus::Advanced { .. })
| (SyncStatus::Behind { .. }, SyncStatus::Behind { .. })
| (SyncStatus::IrrelevantPeer, SyncStatus::IrrelevantPeer)
| (SyncStatus::Unknown, SyncStatus::Unknown)
)
}
}
impl PeerSyncStatus {
impl SyncStatus {
/// Returns true if the peer has advanced knowledge of the chain.
pub fn is_advanced(&self) -> bool {
matches!(self, PeerSyncStatus::Advanced { .. })
matches!(self, SyncStatus::Advanced { .. })
}
/// Returns true if the peer is up to date with the current chain.
pub fn is_synced(&self) -> bool {
matches!(self, PeerSyncStatus::Synced { .. })
matches!(self, SyncStatus::Synced { .. })
}
/// Returns true if the peer is behind the current chain.
pub fn is_behind(&self) -> bool {
matches!(self, PeerSyncStatus::Behind { .. })
matches!(self, SyncStatus::Behind { .. })
}
/// Updates the peer's sync status, returning whether the status transitioned.
///
/// E.g. returns `true` if the state changed from `Synced` to `Advanced`, but not if
/// the status remained `Synced` with different `SyncInfo` within.
pub fn update(&mut self, new_state: PeerSyncStatus) -> bool {
pub fn update(&mut self, new_state: SyncStatus) -> bool {
let changed_status = *self != new_state;
*self = new_state;
changed_status
@ -74,16 +68,16 @@ impl PeerSyncStatus {
pub fn as_str(&self) -> &'static str {
match self {
PeerSyncStatus::Advanced { .. } => "Advanced",
PeerSyncStatus::Behind { .. } => "Behind",
PeerSyncStatus::Synced { .. } => "Synced",
PeerSyncStatus::Unknown => "Unknown",
PeerSyncStatus::IrrelevantPeer => "Irrelevant",
SyncStatus::Advanced { .. } => "Advanced",
SyncStatus::Behind { .. } => "Behind",
SyncStatus::Synced { .. } => "Synced",
SyncStatus::Unknown => "Unknown",
SyncStatus::IrrelevantPeer => "Irrelevant",
}
}
}
impl std::fmt::Display for PeerSyncStatus {
impl std::fmt::Display for SyncStatus {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str(self.as_str())
}

View File

@ -283,7 +283,7 @@ impl<TSpec: EthSpec> Service<TSpec> {
self.swarm
.behaviour_mut()
.peer_manager_mut()
.report_peer(peer_id, action, source);
.report_peer(peer_id, action, source, None);
}
/// Disconnect and ban a peer, providing a reason.

View File

@ -1,5 +1,5 @@
//! A collection of variables that are accessible outside of the network thread itself.
use crate::peer_manager::PeerDB;
use crate::peer_manager::peerdb::PeerDB;
use crate::rpc::MetaData;
use crate::types::{BackFillState, SyncState};
use crate::Client;
@ -117,7 +117,7 @@ impl<TSpec: EthSpec> NetworkGlobals<TSpec> {
self.peers
.read()
.peer_info(peer_id)
.map(|info| info.client.clone())
.map(|info| info.client().clone())
.unwrap_or_default()
}

View File

@ -1620,23 +1620,21 @@ pub fn serve<T: BeaconChainTypes>(
})?;
if let Some(peer_info) = network_globals.peers.read().peer_info(&peer_id) {
let address = if let Some(socket_addr) =
peer_info.seen_addresses.iter().next()
{
let address = if let Some(socket_addr) = peer_info.seen_addresses().next() {
let mut addr = eth2_libp2p::Multiaddr::from(socket_addr.ip());
addr.push(eth2_libp2p::multiaddr::Protocol::Tcp(socket_addr.port()));
addr.to_string()
} else if let Some(addr) = peer_info.listening_addresses.first() {
} else if let Some(addr) = peer_info.listening_addresses().first() {
addr.to_string()
} else {
String::new()
};
// the eth2 API spec implies only peers we have been connected to at some point should be included.
if let Some(dir) = peer_info.connection_direction.as_ref() {
if let Some(dir) = peer_info.connection_direction().as_ref() {
return Ok(api_types::GenericResponse::from(api_types::PeerData {
peer_id: peer_id.to_string(),
enr: peer_info.enr.as_ref().map(|enr| enr.to_base64()),
enr: peer_info.enr().map(|enr| enr.to_base64()),
last_seen_p2p_address: address,
direction: api_types::PeerDirection::from_connection_direction(dir),
state: api_types::PeerState::from_peer_connection_status(
@ -1669,20 +1667,20 @@ pub fn serve<T: BeaconChainTypes>(
.peers()
.for_each(|(peer_id, peer_info)| {
let address =
if let Some(socket_addr) = peer_info.seen_addresses.iter().next() {
if let Some(socket_addr) = peer_info.seen_addresses().next() {
let mut addr = eth2_libp2p::Multiaddr::from(socket_addr.ip());
addr.push(eth2_libp2p::multiaddr::Protocol::Tcp(
socket_addr.port(),
));
addr.to_string()
} else if let Some(addr) = peer_info.listening_addresses.first() {
} else if let Some(addr) = peer_info.listening_addresses().first() {
addr.to_string()
} else {
String::new()
};
// the eth2 API spec implies only peers we have been connected to at some point should be included.
if let Some(dir) = peer_info.connection_direction.as_ref() {
if let Some(dir) = peer_info.connection_direction() {
let direction =
api_types::PeerDirection::from_connection_direction(dir);
let state = api_types::PeerState::from_peer_connection_status(
@ -1700,7 +1698,7 @@ pub fn serve<T: BeaconChainTypes>(
if state_matches && direction_matches {
peers.push(api_types::PeerData {
peer_id: peer_id.to_string(),
enr: peer_info.enr.as_ref().map(|enr| enr.to_base64()),
enr: peer_info.enr().map(|enr| enr.to_base64()),
last_seen_p2p_address: address,
direction,
state,

View File

@ -7,7 +7,7 @@ use eth2_libp2p::{
discv5::enr::{CombinedKey, EnrBuilder},
rpc::methods::{MetaData, MetaDataV2},
types::{EnrAttestationBitfield, EnrSyncCommitteeBitfield, SyncState},
Enr, NetworkGlobals, PeerId,
ConnectedPoint, Enr, NetworkConfig, NetworkGlobals, PeerId, PeerManager,
};
use http_api::{Config, Context};
use network::NetworkMessage;
@ -46,7 +46,7 @@ pub struct ApiServer<E: EthSpec, SFut: Future<Output = ()>> {
}
impl<E: EthSpec> InteractiveTester<E> {
pub fn new(spec: Option<ChainSpec>, validator_count: usize) -> Self {
pub async fn new(spec: Option<ChainSpec>, validator_count: usize) -> Self {
let harness = BeaconChainHarness::new(
E::default(),
spec,
@ -59,7 +59,7 @@ impl<E: EthSpec> InteractiveTester<E> {
shutdown_tx: _server_shutdown,
network_rx,
..
} = create_api_server(harness.chain.clone(), harness.logger().clone());
} = create_api_server(harness.chain.clone(), harness.logger().clone()).await;
tokio::spawn(server);
@ -82,7 +82,7 @@ impl<E: EthSpec> InteractiveTester<E> {
}
}
pub fn create_api_server<T: BeaconChainTypes>(
pub async fn create_api_server<T: BeaconChainTypes>(
chain: Arc<BeaconChain<T>>,
log: Logger,
) -> ApiServer<T::EthSpec, impl Future<Output = ()>> {
@ -96,15 +96,30 @@ pub fn create_api_server<T: BeaconChainTypes>(
});
let enr_key = CombinedKey::generate_secp256k1();
let enr = EnrBuilder::new("v4").build(&enr_key).unwrap();
let network_globals =
NetworkGlobals::new(enr.clone(), TCP_PORT, UDP_PORT, meta_data, vec![], &log);
let network_globals = Arc::new(NetworkGlobals::new(
enr.clone(),
TCP_PORT,
UDP_PORT,
meta_data,
vec![],
&log,
));
// Only a peer manager can add peers, so we create a dummy manager.
let network_config = NetworkConfig::default();
let mut pm = PeerManager::new(&network_config, network_globals.clone(), &log)
.await
.unwrap();
// add a peer
let peer_id = PeerId::random();
network_globals
.peers
.write()
.connect_ingoing(&peer_id, EXTERNAL_ADDR.parse().unwrap(), None);
let connected_point = ConnectedPoint::Listener {
local_addr: EXTERNAL_ADDR.parse().unwrap(),
send_back_addr: EXTERNAL_ADDR.parse().unwrap(),
};
let num_established = std::num::NonZeroU32::new(1).unwrap();
pm.inject_connection_established(peer_id, connected_point, num_established, None);
*network_globals.sync_state.write() = SyncState::Synced;
let eth1_service = eth1::Service::new(eth1::Config::default(), log.clone(), chain.spec.clone());
@ -119,7 +134,7 @@ pub fn create_api_server<T: BeaconChainTypes>(
},
chain: Some(chain.clone()),
network_tx: Some(network_tx),
network_globals: Some(Arc::new(network_globals)),
network_globals: Some(network_globals),
eth1_service: Some(eth1_service),
log,
});

View File

@ -17,7 +17,7 @@ async fn sync_committee_duties_across_fork() {
let validator_count = E::sync_committee_size();
let fork_epoch = Epoch::new(8);
let spec = altair_spec(fork_epoch);
let tester = InteractiveTester::<E>::new(Some(spec.clone()), validator_count);
let tester = InteractiveTester::<E>::new(Some(spec.clone()), validator_count).await;
let harness = &tester.harness;
let client = &tester.client;
@ -102,7 +102,7 @@ async fn attestations_across_fork_with_skip_slots() {
let validator_count = E::sync_committee_size();
let fork_epoch = Epoch::new(8);
let spec = altair_spec(fork_epoch);
let tester = InteractiveTester::<E>::new(Some(spec.clone()), validator_count);
let tester = InteractiveTester::<E>::new(Some(spec.clone()), validator_count).await;
let harness = &tester.harness;
let client = &tester.client;
@ -152,7 +152,7 @@ async fn sync_contributions_across_fork_with_skip_slots() {
let validator_count = E::sync_committee_size();
let fork_epoch = Epoch::new(8);
let spec = altair_spec(fork_epoch);
let tester = InteractiveTester::<E>::new(Some(spec.clone()), validator_count);
let tester = InteractiveTester::<E>::new(Some(spec.clone()), validator_count).await;
let harness = &tester.harness;
let client = &tester.client;
@ -199,7 +199,7 @@ async fn sync_committee_indices_across_fork() {
let validator_count = E::sync_committee_size();
let fork_epoch = Epoch::new(8);
let spec = altair_spec(fork_epoch);
let tester = InteractiveTester::<E>::new(Some(spec.clone()), validator_count);
let tester = InteractiveTester::<E>::new(Some(spec.clone()), validator_count).await;
let harness = &tester.harness;
let client = &tester.client;

View File

@ -18,7 +18,7 @@ async fn deposit_contract_custom_network() {
// Arbitrary contract address.
spec.deposit_contract_address = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa".parse().unwrap();
let tester = InteractiveTester::<E>::new(Some(spec.clone()), validator_count);
let tester = InteractiveTester::<E>::new(Some(spec.clone()), validator_count).await;
let client = &tester.client;
let result = client.get_config_deposit_contract().await.unwrap().data;

View File

@ -63,14 +63,14 @@ struct ApiTester {
}
impl ApiTester {
pub fn new() -> Self {
pub async fn new() -> Self {
// This allows for testing voluntary exits without building out a massive chain.
let mut spec = E::default_spec();
spec.shard_committee_period = 2;
Self::new_from_spec(spec)
Self::new_from_spec(spec).await
}
pub fn new_from_spec(spec: ChainSpec) -> Self {
pub async fn new_from_spec(spec: ChainSpec) -> Self {
let harness = BeaconChainHarness::new(
MainnetEthSpec,
Some(spec.clone()),
@ -181,7 +181,7 @@ impl ApiTester {
network_rx,
local_enr,
external_peer_id,
} = create_api_server(chain.clone(), log);
} = create_api_server(chain.clone(), log).await;
tokio::spawn(server);
@ -213,7 +213,7 @@ impl ApiTester {
}
}
pub fn new_from_genesis() -> Self {
pub async fn new_from_genesis() -> Self {
let harness = BeaconChainHarness::new(
MainnetEthSpec,
None,
@ -260,7 +260,7 @@ impl ApiTester {
network_rx,
local_enr,
external_peer_id,
} = create_api_server(chain.clone(), log);
} = create_api_server(chain.clone(), log).await;
tokio::spawn(server);
@ -2453,7 +2453,7 @@ async fn poll_events<S: Stream<Item = Result<EventKind<T>, eth2::Error>> + Unpin
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn get_events() {
ApiTester::new().test_get_events().await;
ApiTester::new().await.test_get_events().await;
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
@ -2461,6 +2461,7 @@ async fn get_events_altair() {
let mut spec = E::default_spec();
spec.altair_fork_epoch = Some(Epoch::new(0));
ApiTester::new_from_spec(spec)
.await
.test_get_events_altair()
.await;
}
@ -2468,6 +2469,7 @@ async fn get_events_altair() {
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn get_events_from_genesis() {
ApiTester::new_from_genesis()
.await
.test_get_events_from_genesis()
.await;
}
@ -2475,6 +2477,7 @@ async fn get_events_from_genesis() {
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn beacon_get() {
ApiTester::new()
.await
.test_beacon_genesis()
.await
.test_beacon_states_root()
@ -2515,17 +2518,21 @@ async fn beacon_get() {
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn post_beacon_blocks_valid() {
ApiTester::new().test_post_beacon_blocks_valid().await;
ApiTester::new().await.test_post_beacon_blocks_valid().await;
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn post_beacon_blocks_invalid() {
ApiTester::new().test_post_beacon_blocks_invalid().await;
ApiTester::new()
.await
.test_post_beacon_blocks_invalid()
.await;
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn beacon_pools_post_attestations_valid() {
ApiTester::new()
.await
.test_post_beacon_pool_attestations_valid()
.await;
}
@ -2533,6 +2540,7 @@ async fn beacon_pools_post_attestations_valid() {
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn beacon_pools_post_attestations_invalid() {
ApiTester::new()
.await
.test_post_beacon_pool_attestations_invalid()
.await;
}
@ -2540,6 +2548,7 @@ async fn beacon_pools_post_attestations_invalid() {
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn beacon_pools_post_attester_slashings_valid() {
ApiTester::new()
.await
.test_post_beacon_pool_attester_slashings_valid()
.await;
}
@ -2547,6 +2556,7 @@ async fn beacon_pools_post_attester_slashings_valid() {
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn beacon_pools_post_attester_slashings_invalid() {
ApiTester::new()
.await
.test_post_beacon_pool_attester_slashings_invalid()
.await;
}
@ -2554,6 +2564,7 @@ async fn beacon_pools_post_attester_slashings_invalid() {
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn beacon_pools_post_proposer_slashings_valid() {
ApiTester::new()
.await
.test_post_beacon_pool_proposer_slashings_valid()
.await;
}
@ -2561,6 +2572,7 @@ async fn beacon_pools_post_proposer_slashings_valid() {
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn beacon_pools_post_proposer_slashings_invalid() {
ApiTester::new()
.await
.test_post_beacon_pool_proposer_slashings_invalid()
.await;
}
@ -2568,6 +2580,7 @@ async fn beacon_pools_post_proposer_slashings_invalid() {
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn beacon_pools_post_voluntary_exits_valid() {
ApiTester::new()
.await
.test_post_beacon_pool_voluntary_exits_valid()
.await;
}
@ -2575,6 +2588,7 @@ async fn beacon_pools_post_voluntary_exits_valid() {
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn beacon_pools_post_voluntary_exits_invalid() {
ApiTester::new()
.await
.test_post_beacon_pool_voluntary_exits_invalid()
.await;
}
@ -2582,6 +2596,7 @@ async fn beacon_pools_post_voluntary_exits_invalid() {
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn config_get() {
ApiTester::new()
.await
.test_get_config_fork_schedule()
.await
.test_get_config_spec()
@ -2593,6 +2608,7 @@ async fn config_get() {
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn debug_get() {
ApiTester::new()
.await
.test_get_debug_beacon_states()
.await
.test_get_debug_beacon_heads()
@ -2602,6 +2618,7 @@ async fn debug_get() {
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn node_get() {
ApiTester::new()
.await
.test_get_node_version()
.await
.test_get_node_syncing()
@ -2620,17 +2637,24 @@ async fn node_get() {
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn get_validator_duties_early() {
ApiTester::new().test_get_validator_duties_early().await;
ApiTester::new()
.await
.test_get_validator_duties_early()
.await;
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn get_validator_duties_attester() {
ApiTester::new().test_get_validator_duties_attester().await;
ApiTester::new()
.await
.test_get_validator_duties_attester()
.await;
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn get_validator_duties_attester_with_skip_slots() {
ApiTester::new()
.await
.skip_slots(E::slots_per_epoch() * 2)
.test_get_validator_duties_attester()
.await;
@ -2638,12 +2662,16 @@ async fn get_validator_duties_attester_with_skip_slots() {
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn get_validator_duties_proposer() {
ApiTester::new().test_get_validator_duties_proposer().await;
ApiTester::new()
.await
.test_get_validator_duties_proposer()
.await;
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn get_validator_duties_proposer_with_skip_slots() {
ApiTester::new()
.await
.skip_slots(E::slots_per_epoch() * 2)
.test_get_validator_duties_proposer()
.await;
@ -2651,12 +2679,13 @@ async fn get_validator_duties_proposer_with_skip_slots() {
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn block_production() {
ApiTester::new().test_block_production().await;
ApiTester::new().await.test_block_production().await;
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn block_production_with_skip_slots() {
ApiTester::new()
.await
.skip_slots(E::slots_per_epoch() * 2)
.test_block_production()
.await;
@ -2664,12 +2693,16 @@ async fn block_production_with_skip_slots() {
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn get_validator_attestation_data() {
ApiTester::new().test_get_validator_attestation_data().await;
ApiTester::new()
.await
.test_get_validator_attestation_data()
.await;
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn get_validator_attestation_data_with_skip_slots() {
ApiTester::new()
.await
.skip_slots(E::slots_per_epoch() * 2)
.test_get_validator_attestation_data()
.await;
@ -2678,6 +2711,7 @@ async fn get_validator_attestation_data_with_skip_slots() {
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn get_validator_aggregate_attestation() {
ApiTester::new()
.await
.test_get_validator_aggregate_attestation()
.await;
}
@ -2685,6 +2719,7 @@ async fn get_validator_aggregate_attestation() {
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn get_validator_aggregate_attestation_with_skip_slots() {
ApiTester::new()
.await
.skip_slots(E::slots_per_epoch() * 2)
.test_get_validator_aggregate_attestation()
.await;
@ -2693,6 +2728,7 @@ async fn get_validator_aggregate_attestation_with_skip_slots() {
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn get_validator_aggregate_and_proofs_valid() {
ApiTester::new()
.await
.test_get_validator_aggregate_and_proofs_valid()
.await;
}
@ -2700,6 +2736,7 @@ async fn get_validator_aggregate_and_proofs_valid() {
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn get_validator_aggregate_and_proofs_valid_with_skip_slots() {
ApiTester::new()
.await
.skip_slots(E::slots_per_epoch() * 2)
.test_get_validator_aggregate_and_proofs_valid()
.await;
@ -2708,6 +2745,7 @@ async fn get_validator_aggregate_and_proofs_valid_with_skip_slots() {
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn get_validator_aggregate_and_proofs_invalid() {
ApiTester::new()
.await
.test_get_validator_aggregate_and_proofs_invalid()
.await;
}
@ -2715,6 +2753,7 @@ async fn get_validator_aggregate_and_proofs_invalid() {
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn get_validator_aggregate_and_proofs_invalid_with_skip_slots() {
ApiTester::new()
.await
.skip_slots(E::slots_per_epoch() * 2)
.test_get_validator_aggregate_and_proofs_invalid()
.await;
@ -2723,6 +2762,7 @@ async fn get_validator_aggregate_and_proofs_invalid_with_skip_slots() {
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn get_validator_beacon_committee_subscriptions() {
ApiTester::new()
.await
.test_get_validator_beacon_committee_subscriptions()
.await;
}
@ -2730,6 +2770,7 @@ async fn get_validator_beacon_committee_subscriptions() {
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn lighthouse_endpoints() {
ApiTester::new()
.await
.test_get_lighthouse_health()
.await
.test_get_lighthouse_syncing()

View File

@ -790,7 +790,7 @@ pub fn update_gossip_metrics<T: EthSpec>(
for (peer_id, _) in gossipsub.all_peers() {
let client = peers
.peer_info(peer_id)
.map(|peer_info| peer_info.client.kind.as_static())
.map(|peer_info| peer_info.client().kind.as_static())
.unwrap_or_else(|| "Unknown");
peer_to_client.insert(peer_id, client);
@ -919,7 +919,7 @@ pub fn update_sync_metrics<T: EthSpec>(network_globals: &Arc<NetworkGlobals<T>>)
.peers
.read()
.connected_peers()
.map(|(_peer_id, info)| info.sync_status.as_str())
.map(|(_peer_id, info)| info.sync_status().as_str())
{
*peers_per_sync_type.entry(sync_type).or_default() += 1;
}

View File

@ -654,15 +654,21 @@ impl<T: BeaconChainTypes> SyncManager<T> {
// NOTE: here we are gracefully handling two race conditions: Receiving the status message
// of a peer that is 1) disconnected 2) not in the PeerDB.
if let Some(peer_info) = self.network_globals.peers.write().peer_info_mut(peer_id) {
let new_state = sync_type.as_sync_status(remote_sync_info);
let rpr = new_state.as_str();
let was_updated = peer_info.sync_status.update(new_state.clone());
let new_state = sync_type.as_sync_status(remote_sync_info);
let rpr = new_state.as_str();
// Drop the write lock
let update_sync_status = self
.network_globals
.peers
.write()
.update_sync_status(peer_id, new_state.clone());
if let Some(was_updated) = update_sync_status {
let is_connected = self.network_globals.peers.read().is_connected(peer_id);
if was_updated {
debug!(self.log, "Peer transitioned sync state"; "peer_id" => %peer_id, "new_state" => rpr,
"our_head_slot" => local_sync_info.head_slot, "out_finalized_epoch" => local_sync_info.finalized_epoch,
"their_head_slot" => remote_sync_info.head_slot, "their_finalized_epoch" => remote_sync_info.finalized_epoch,
"is_connected" => peer_info.is_connected());
"is_connected" => is_connected);
// A peer has transitioned its sync state. If the new state is "synced" we
// inform the backfill sync that a new synced peer has joined us.
@ -670,7 +676,7 @@ impl<T: BeaconChainTypes> SyncManager<T> {
self.backfill_sync.fully_synced_peer_joined();
}
}
peer_info.is_connected()
is_connected
} else {
error!(self.log, "Status'd peer is unknown"; "peer_id" => %peer_id);
false

View File

@ -55,7 +55,7 @@ impl<T: EthSpec> SyncNetworkContext<T> {
.peers
.read()
.peer_info(peer_id)
.map(|info| info.client.clone())
.map(|info| info.client().clone())
.unwrap_or_default()
}

View File

@ -1,6 +1,6 @@
use super::manager::SLOT_IMPORT_TOLERANCE;
use beacon_chain::{BeaconChain, BeaconChainTypes};
use eth2_libp2p::{PeerSyncStatus, SyncInfo};
use eth2_libp2p::{SyncInfo, SyncStatus as PeerSyncStatus};
use std::cmp::Ordering;
/// The type of peer relative to our current state.