Maintain trusted peers ()

## Issue Addressed
 

## Proposed Changes

Maintain trusted peers in the pruning logic. ~~In principle the changes here are not necessary as a trusted peer has a max score (100) and all other peers can have at most 0 (because we don't implement positive scores). This means that we should never prune trusted peers unless we have more trusted peers than the target peer count.~~

This change shifts this logic to explicitly never prune trusted peers which I expect is the intuitive behaviour. 

~~I suspect the issue in  arises when a trusted peer disconnects from us for one reason or another and then we remove that peer from our peerdb as it becomes stale. When it re-connects at some large time later, it is no longer a trusted peer.~~

Currently we do disconnect trusted peers, and this PR corrects this to maintain trusted peers in the pruning logic.

As suggested in  we maintain trusted peers in the db and thus we remember them even if they disconnect from us.
This commit is contained in:
Age Manning 2023-05-03 04:12:10 +00:00
parent 826e748629
commit 616bee6757
6 changed files with 113 additions and 22 deletions
beacon_node
lighthouse_network/src
peer_manager
types
network/src/sync
block_lookups
range_sync
src

View File

@ -940,6 +940,10 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
/// MIN_SYNC_COMMITTEE_PEERS
/// number should be set low as an absolute lower bound to maintain peers on the sync
/// committees.
/// - Do not prune trusted peers. NOTE: This means if a user has more trusted peers than the
/// excess peer limit, all of the following logic is subverted as we will not prune any peers.
/// Also, the more trusted peers a user has, the less room Lighthouse has to efficiently manage
/// its peers across the subnets.
///
/// Prune peers in the following order:
/// 1. Remove worst scoring peers
@ -970,7 +974,9 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
.read()
.worst_connected_peers()
.iter()
.filter(|(_, info)| !info.has_future_duty() && $filter(*info))
.filter(|(_, info)| {
!info.has_future_duty() && !info.is_trusted() && $filter(*info)
})
{
if peers_to_prune.len()
>= connected_peer_count.saturating_sub(self.target_peers)
@ -1020,8 +1026,8 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
> = HashMap::new();
for (peer_id, info) in self.network_globals.peers.read().connected_peers() {
// Ignore peers we are already pruning
if peers_to_prune.contains(peer_id) {
// Ignore peers we trust or that we are already pruning
if info.is_trusted() || peers_to_prune.contains(peer_id) {
continue;
}
@ -1318,25 +1324,47 @@ mod tests {
..Default::default()
};
let log = build_log(slog::Level::Debug, false);
let globals = NetworkGlobals::new_test_globals(&log);
let globals = NetworkGlobals::new_test_globals(vec![], &log);
PeerManager::new(config, Arc::new(globals), &log).unwrap()
}
async fn build_peer_manager_with_trusted_peers(
trusted_peers: Vec<PeerId>,
target_peer_count: usize,
) -> PeerManager<E> {
let config = config::Config {
target_peer_count,
discovery_enabled: false,
..Default::default()
};
let log = build_log(slog::Level::Debug, false);
let globals = NetworkGlobals::new_test_globals(trusted_peers, &log);
PeerManager::new(config, Arc::new(globals), &log).unwrap()
}
#[tokio::test]
async fn test_peer_manager_disconnects_correctly_during_heartbeat() {
let mut peer_manager = build_peer_manager(3).await;
// Create 5 peers to connect to.
// Create 6 peers to connect to with a target of 3.
// 2 will be outbound-only, and have the lowest score.
// 1 will be a trusted peer.
// The other 3 will be ingoing peers.
// We expect this test to disconnect from 3 peers. 1 from the outbound peer (the other must
// remain due to the outbound peer limit) and 2 from the ingoing peers (the trusted peer
// should remain connected).
let peer0 = PeerId::random();
let peer1 = PeerId::random();
let peer2 = PeerId::random();
let outbound_only_peer1 = PeerId::random();
let outbound_only_peer2 = PeerId::random();
let trusted_peer = PeerId::random();
let mut peer_manager = build_peer_manager_with_trusted_peers(vec![trusted_peer], 3).await;
peer_manager.inject_connect_ingoing(&peer0, "/ip4/0.0.0.0".parse().unwrap(), None);
peer_manager.inject_connect_ingoing(&peer1, "/ip4/0.0.0.0".parse().unwrap(), None);
peer_manager.inject_connect_ingoing(&peer2, "/ip4/0.0.0.0".parse().unwrap(), None);
peer_manager.inject_connect_ingoing(&trusted_peer, "/ip4/0.0.0.0".parse().unwrap(), None);
peer_manager.inject_connect_outgoing(
&outbound_only_peer1,
"/ip4/0.0.0.0".parse().unwrap(),
@ -1366,7 +1394,7 @@ mod tests {
.add_to_score(-2.0);
// Check initial connected peers.
assert_eq!(peer_manager.network_globals.connected_or_dialing_peers(), 5);
assert_eq!(peer_manager.network_globals.connected_or_dialing_peers(), 6);
peer_manager.heartbeat();
@ -1385,8 +1413,22 @@ mod tests {
.read()
.is_connected(&outbound_only_peer2));
// The trusted peer remains connected
assert!(peer_manager
.network_globals
.peers
.read()
.is_connected(&trusted_peer));
peer_manager.heartbeat();
// The trusted peer remains connected, even after subsequent heartbeats.
assert!(peer_manager
.network_globals
.peers
.read()
.is_connected(&trusted_peer));
// Check that if we are at target number of peers, we do not disconnect any.
assert_eq!(peer_manager.network_globals.connected_or_dialing_peers(), 3);
}
@ -2131,7 +2173,7 @@ mod tests {
#[cfg(test)]
mod property_based_tests {
use crate::peer_manager::config::DEFAULT_TARGET_PEERS;
use crate::peer_manager::tests::build_peer_manager;
use crate::peer_manager::tests::build_peer_manager_with_trusted_peers;
use crate::rpc::MetaData;
use libp2p::PeerId;
use quickcheck::{Arbitrary, Gen, TestResult};
@ -2142,10 +2184,12 @@ mod tests {
#[derive(Clone, Debug)]
struct PeerCondition {
peer_id: PeerId,
outgoing: bool,
attestation_net_bitfield: Vec<bool>,
sync_committee_net_bitfield: Vec<bool>,
score: f64,
trusted: bool,
gossipsub_score: f64,
}
@ -2170,10 +2214,12 @@ mod tests {
};
PeerCondition {
peer_id: PeerId::random(),
outgoing: bool::arbitrary(g),
attestation_net_bitfield,
sync_committee_net_bitfield,
score: f64::arbitrary(g),
trusted: bool::arbitrary(g),
gossipsub_score: f64::arbitrary(g),
}
}
@ -2185,26 +2231,36 @@ mod tests {
if peer_conditions.len() < target_peer_count {
return TestResult::discard();
}
let trusted_peers: Vec<_> = peer_conditions
.iter()
.filter_map(|p| if p.trusted { Some(p.peer_id) } else { None })
.collect();
// If we have a high percentage of trusted peers, it is very difficult to reason about
// the expected results of the pruning.
if trusted_peers.len() > peer_conditions.len() / 3_usize {
return TestResult::discard();
}
let rt = Runtime::new().unwrap();
rt.block_on(async move {
let mut peer_manager = build_peer_manager(target_peer_count).await;
// Collect all the trusted peers
let mut peer_manager =
build_peer_manager_with_trusted_peers(trusted_peers, target_peer_count).await;
// Create peers based on the randomly generated conditions.
for condition in &peer_conditions {
let peer = PeerId::random();
let mut attnets = crate::types::EnrAttestationBitfield::<E>::new();
let mut syncnets = crate::types::EnrSyncCommitteeBitfield::<E>::new();
if condition.outgoing {
peer_manager.inject_connect_outgoing(
&peer,
&condition.peer_id,
"/ip4/0.0.0.0".parse().unwrap(),
None,
);
} else {
peer_manager.inject_connect_ingoing(
&peer,
&condition.peer_id,
"/ip4/0.0.0.0".parse().unwrap(),
None,
);
@ -2225,22 +2281,51 @@ mod tests {
};
let mut peer_db = peer_manager.network_globals.peers.write();
let peer_info = peer_db.peer_info_mut(&peer).unwrap();
let peer_info = peer_db.peer_info_mut(&condition.peer_id).unwrap();
peer_info.set_meta_data(MetaData::V2(metadata));
peer_info.set_gossipsub_score(condition.gossipsub_score);
peer_info.add_to_score(condition.score);
for subnet in peer_info.long_lived_subnets() {
peer_db.add_subscription(&peer, subnet);
peer_db.add_subscription(&condition.peer_id, subnet);
}
}
// Perform the heartbeat.
peer_manager.heartbeat();
TestResult::from_bool(
// The minimum number of connected peers cannot be less than the target peer count
// or submitted peers.
let expected_peer_count = target_peer_count.min(peer_conditions.len());
// Trusted peers could make this larger however.
let no_of_trusted_peers = peer_conditions
.iter()
.filter(|condition| condition.trusted)
.count();
let expected_peer_count = expected_peer_count.max(no_of_trusted_peers);
let target_peer_condition =
peer_manager.network_globals.connected_or_dialing_peers()
== target_peer_count.min(peer_conditions.len()),
== expected_peer_count;
// It could be that we reach our target outbound limit and are unable to prune any
// extra, which violates the target_peer_condition.
let outbound_peers = peer_manager.network_globals.connected_outbound_only_peers();
let hit_outbound_limit = outbound_peers == peer_manager.target_outbound_peers();
// No trusted peers should be disconnected
let trusted_peer_disconnected = peer_conditions.iter().any(|condition| {
condition.trusted
&& !peer_manager
.network_globals
.peers
.read()
.is_connected(&condition.peer_id)
});
TestResult::from_bool(
(target_peer_condition || hit_outbound_limit) && !trusted_peer_disconnected,
)
})
}

View File

@ -1062,7 +1062,7 @@ impl<TSpec: EthSpec> PeerDB<TSpec> {
if let Some(to_drop) = self
.peers
.iter()
.filter(|(_, info)| info.is_disconnected())
.filter(|(_, info)| info.is_disconnected() && !info.is_trusted())
.filter_map(|(id, info)| match info.connection_status() {
PeerConnectionStatus::Disconnected { since } => Some((id, since)),
_ => None,

View File

@ -129,7 +129,10 @@ impl<TSpec: EthSpec> NetworkGlobals<TSpec> {
}
/// TESTING ONLY. Build a dummy NetworkGlobals instance.
pub fn new_test_globals(log: &slog::Logger) -> NetworkGlobals<TSpec> {
pub fn new_test_globals(
trusted_peers: Vec<PeerId>,
log: &slog::Logger,
) -> NetworkGlobals<TSpec> {
use crate::CombinedKeyExt;
let keypair = libp2p::identity::Keypair::generate_secp256k1();
let enr_key: discv5::enr::CombinedKey =
@ -144,7 +147,7 @@ impl<TSpec: EthSpec> NetworkGlobals<TSpec> {
attnets: Default::default(),
syncnets: Default::default(),
}),
vec![],
trusted_peers,
false,
log,
)

View File

@ -50,7 +50,7 @@ impl TestRig {
};
let bl = BlockLookups::new(log.new(slog::o!("component" => "block_lookups")));
let cx = {
let globals = Arc::new(NetworkGlobals::new_test_globals(&log));
let globals = Arc::new(NetworkGlobals::new_test_globals(Vec::new(), &log));
SyncNetworkContext::new(
network_tx,
globals,

View File

@ -599,7 +599,7 @@ mod tests {
log.new(o!("component" => "range")),
);
let (network_tx, network_rx) = mpsc::unbounded_channel();
let globals = Arc::new(NetworkGlobals::new_test_globals(&log));
let globals = Arc::new(NetworkGlobals::new_test_globals(Vec::new(), &log));
let cx = SyncNetworkContext::new(
network_tx,
globals.clone(),

View File

@ -1044,6 +1044,9 @@ pub fn set_network_config(
.map_err(|_| format!("Invalid trusted peer id: {}", peer_id))
})
.collect::<Result<Vec<PeerIdSerialized>, _>>()?;
if config.trusted_peers.len() >= config.target_peers {
slog::warn!(log, "More trusted peers than the target peer limit. This will prevent efficient peer selection criteria."; "target_peers" => config.target_peers, "trusted_peers" => config.trusted_peers.len());
}
}
if let Some(enr_udp_port_str) = cli_args.value_of("enr-udp-port") {