# Fix: PeerManager doesn't remove "outbound only" peers which should be pruned (#3236)
## Issue Addressed

This is one step toward addressing https://github.com/sigp/lighthouse/issues/3092 before introducing `quickcheck`.

I noticed an issue while reading the pruning implementation, `PeerManager::prune_excess_peers()`. If a peer meets both of the following conditions, **the `outbound_peers_pruned` counter increases but the peer is not pushed to `peers_to_prune`**:

- [outbound only](1e4ac8a4b9/beacon_node/lighthouse_network/src/peer_manager/mod.rs (L1018))
- [min_subnet_count <= MIN_SYNC_COMMITTEE_PEERS](1e4ac8a4b9/beacon_node/lighthouse_network/src/peer_manager/mod.rs (L1047))

As a result, the PeerManager doesn't remove "outbound only" peers that should be pruned; the sketch below illustrates the failure mode.

Note: [`subnet_to_peer`](e0d673ea86/beacon_node/lighthouse_network/src/peer_manager/mod.rs (L999)) is a `HashMap`, which doesn't guarantee a particular order of iteration, so whether the test fails depends on the iteration order.
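To make the failure mode concrete, here is a minimal sketch of the buggy control flow. This is not the actual `PeerManager` code: the `Peer` struct, the function signature, and the collapsing of the per-subnet sync-committee check into a simple per-peer count are simplifications for illustration.

```rust
/// Hypothetical, simplified stand-in for the real peer info.
struct Peer {
    outbound_only: bool,
    sync_committee_peers: usize,
}

const MIN_SYNC_COMMITTEE_PEERS: usize = 2;

/// Sketch of the buggy selection loop: returns indices of peers to prune.
fn select_peers_to_prune(
    peers: &[Peer],
    target_outbound: usize,
    connected_outbound: usize,
) -> Vec<usize> {
    let mut peers_to_prune = Vec::new();
    let mut outbound_peers_pruned = 0;

    for (index, peer) in peers.iter().enumerate() {
        if peer.outbound_only {
            if target_outbound < connected_outbound.saturating_sub(outbound_peers_pruned) {
                // BUG: the counter is bumped before we know this peer will
                // actually be pruned ...
                outbound_peers_pruned += 1;
            } else {
                continue;
            }
        }
        if peer.sync_committee_peers <= MIN_SYNC_COMMITTEE_PEERS {
            // ... but this check can skip the peer, so the counter ends up
            // larger than the number of outbound peers actually selected,
            // and later outbound candidates are wrongly spared.
            continue;
        }
        peers_to_prune.push(index);
    }
    peers_to_prune
}
```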
parent: `3d51f24717`
commit: `a6d2ed6119`
Changed file: `beacon_node/lighthouse_network/src/peer_manager/mod.rs`

```diff
@@ -1015,20 +1015,17 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
         let mut removed_peer_index = None;
         for (index, (candidate_peer, info)) in peers_on_subnet.iter().enumerate() {
             // Ensure we don't remove too many outbound peers
-            if info.is_outbound_only() {
-                if self.target_outbound_peers()
-                    < connected_outbound_peer_count
+            if info.is_outbound_only()
+                && self.target_outbound_peers()
+                    >= connected_outbound_peer_count
                         .saturating_sub(outbound_peers_pruned)
-                {
-                    outbound_peers_pruned += 1;
-                } else {
-                    // Restart the main loop with the outbound peer removed from
-                    // the list. This will lower the peers per subnet count and
-                    // potentially a new subnet may be chosen to remove peers. This
-                    // can occur recursively until we have no peers left to choose
-                    // from.
-                    continue;
-                }
+            {
+                // Restart the main loop with the outbound peer removed from
+                // the list. This will lower the peers per subnet count and
+                // potentially a new subnet may be chosen to remove peers. This
+                // can occur recursively until we have no peers left to choose
+                // from.
+                continue;
             }
 
             // Check the sync committee
@@ -1051,6 +1048,9 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
                 }
             }
 
+            if info.is_outbound_only() {
+                outbound_peers_pruned += 1;
+            }
             // This peer is suitable to be pruned
            removed_peer_index = Some(index);
            break;
```
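In short, the fix folds the two nested conditions into one: a candidate is skipped (`continue`) only when it is outbound only *and* pruning it would take us to or below the outbound target, and `outbound_peers_pruned` is now incremented only once the peer has passed every check and is actually recorded for removal.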
```diff
@@ -1885,4 +1885,170 @@ mod tests {
         assert!(!connected_peers.contains(&peers[1]));
         assert!(!connected_peers.contains(&peers[2]));
     }
+
+    /// This test is for reproducing the issue:
+    /// https://github.com/sigp/lighthouse/pull/3236#issue-1256432659
+    ///
+    /// Whether the issue happens depends on `subnet_to_peer` (HashMap), since HashMap doesn't
+    /// guarantee a particular order of iteration. So we repeat the test case to try to reproduce
+    /// the issue.
+    #[tokio::test]
+    async fn test_peer_manager_prune_based_on_subnet_count_repeat() {
+        for _ in 0..100 {
+            test_peer_manager_prune_based_on_subnet_count().await;
+        }
+    }
+
+    /// Test the pruning logic to prioritize peers with the most subnets. This test specifies
+    /// the connection direction for the peers.
+    /// Either Peer 4 or 5 is expected to be removed in this test case.
+    ///
+    /// Create 8 peers.
+    /// Peer0 (out) : Subnet 1, Sync-committee-1
+    /// Peer1 (out) : Subnet 1, Sync-committee-1
+    /// Peer2 (out) : Subnet 2, Sync-committee-2
+    /// Peer3 (out) : Subnet 2, Sync-committee-2
+    /// Peer4 (out) : Subnet 3
+    /// Peer5 (out) : Subnet 3
+    /// Peer6 (in) : Subnet 4
+    /// Peer7 (in) : Subnet 5
+    async fn test_peer_manager_prune_based_on_subnet_count() {
+        let target = 7;
+        let mut peer_manager = build_peer_manager(target).await;
+
+        // Create 8 peers to connect to.
+        let mut peers = Vec::new();
+        for x in 0..8 {
+            let peer = PeerId::random();
+
+            // Have some of the peers be on a long-lived subnet
+            let mut attnets = crate::types::EnrAttestationBitfield::<E>::new();
+            let mut syncnets = crate::types::EnrSyncCommitteeBitfield::<E>::new();
+
+            match x {
+                0 => {
+                    peer_manager.inject_connect_outgoing(
+                        &peer,
+                        "/ip4/0.0.0.0".parse().unwrap(),
+                        None,
+                    );
+                    attnets.set(1, true).unwrap();
+                    syncnets.set(1, true).unwrap();
+                }
+                1 => {
+                    peer_manager.inject_connect_outgoing(
+                        &peer,
+                        "/ip4/0.0.0.0".parse().unwrap(),
+                        None,
+                    );
+                    attnets.set(1, true).unwrap();
+                    syncnets.set(1, true).unwrap();
+                }
+                2 => {
+                    peer_manager.inject_connect_outgoing(
+                        &peer,
+                        "/ip4/0.0.0.0".parse().unwrap(),
+                        None,
+                    );
+                    attnets.set(2, true).unwrap();
+                    syncnets.set(2, true).unwrap();
+                }
+                3 => {
+                    peer_manager.inject_connect_outgoing(
+                        &peer,
+                        "/ip4/0.0.0.0".parse().unwrap(),
+                        None,
+                    );
+                    attnets.set(2, true).unwrap();
+                    syncnets.set(2, true).unwrap();
+                }
+                4 => {
+                    peer_manager.inject_connect_outgoing(
+                        &peer,
+                        "/ip4/0.0.0.0".parse().unwrap(),
+                        None,
+                    );
+                    attnets.set(3, true).unwrap();
+                }
+                5 => {
+                    peer_manager.inject_connect_outgoing(
+                        &peer,
+                        "/ip4/0.0.0.0".parse().unwrap(),
+                        None,
+                    );
+                    attnets.set(3, true).unwrap();
+                }
+                6 => {
+                    peer_manager.inject_connect_ingoing(
+                        &peer,
+                        "/ip4/0.0.0.0".parse().unwrap(),
+                        None,
+                    );
+                    attnets.set(4, true).unwrap();
+                }
+                7 => {
+                    peer_manager.inject_connect_ingoing(
+                        &peer,
+                        "/ip4/0.0.0.0".parse().unwrap(),
+                        None,
+                    );
+                    attnets.set(5, true).unwrap();
+                }
+                _ => unreachable!(),
+            }
+
+            let metadata = crate::rpc::MetaDataV2 {
+                seq_number: 0,
+                attnets,
+                syncnets,
+            };
+            peer_manager
+                .network_globals
+                .peers
+                .write()
+                .peer_info_mut(&peer)
+                .unwrap()
+                .set_meta_data(MetaData::V2(metadata));
+            let long_lived_subnets = peer_manager
+                .network_globals
+                .peers
+                .read()
+                .peer_info(&peer)
+                .unwrap()
+                .long_lived_subnets();
+            println!("{},{}", x, peer);
+            for subnet in long_lived_subnets {
+                println!("Subnet: {:?}", subnet);
+                peer_manager
+                    .network_globals
+                    .peers
+                    .write()
+                    .add_subscription(&peer, subnet);
+            }
+            peers.push(peer);
+        }
+
+        // Perform the heartbeat.
+        peer_manager.heartbeat();
+
+        // Tests that when we are over the target peer limit, after disconnecting an unhealthy peer,
+        // the number of connected peers updates and we will not remove too many peers.
+        assert_eq!(
+            peer_manager.network_globals.connected_or_dialing_peers(),
+            target
+        );
+
+        let connected_peers: std::collections::HashSet<_> = peer_manager
+            .network_globals
+            .peers
+            .read()
+            .connected_or_dialing_peers()
+            .cloned()
+            .collect();
+
+        // Either peer 4 or 5 should be removed.
+        // Check that we keep 6 and 7 peers, which we have few on a particular subnet.
+        assert!(connected_peers.contains(&peers[6]));
+        assert!(connected_peers.contains(&peers[7]));
+    }
 }
```
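The 100-iteration wrapper exists because the bug only manifests for some iteration orders of `subnet_to_peer`. As a standalone illustration (independent of the Lighthouse code), `HashMap` iteration order is unspecified and typically varies between runs, since the default hasher is randomly seeded:

```rust
use std::collections::HashMap;

fn main() {
    // A toy stand-in for a subnet-to-peer mapping.
    let subnet_to_peer: HashMap<u32, &str> =
        (1..=5).map(|subnet| (subnet, "some peer")).collect();

    // The order of keys printed here is not guaranteed and can change from
    // one run of the program to the next, which is why a single test run may
    // or may not hit the order that triggers the pruning bug.
    for (subnet, peer) in &subnet_to_peer {
        println!("subnet {subnet}: {peer}");
    }
}
```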