diff --git a/beacon_node/eth2_libp2p/src/discovery/subnet_predicate.rs b/beacon_node/eth2_libp2p/src/discovery/subnet_predicate.rs index cc3835062..2b7e03b30 100644 --- a/beacon_node/eth2_libp2p/src/discovery/subnet_predicate.rs +++ b/beacon_node/eth2_libp2p/src/discovery/subnet_predicate.rs @@ -1,6 +1,6 @@ ///! The subnet predicate used for searching for a particular subnet. use super::*; -use slog::{debug, trace}; +use slog::trace; use std::ops::Deref; /// Returns the predicate for a given subnet. @@ -38,7 +38,7 @@ where ); return false; } else { - debug!( + trace!( log_clone, "Peer found on desired subnet(s)"; "peer_id" => format!("{}", enr.peer_id()), diff --git a/beacon_node/eth2_libp2p/src/peer_manager/mod.rs b/beacon_node/eth2_libp2p/src/peer_manager/mod.rs index d03480d21..7fce5f929 100644 --- a/beacon_node/eth2_libp2p/src/peer_manager/mod.rs +++ b/beacon_node/eth2_libp2p/src/peer_manager/mod.rs @@ -250,7 +250,9 @@ impl PeerManager { .collect(); // request the subnet query from discovery - self.discovery.discover_subnet_peers(filtered); + if !filtered.is_empty() { + self.discovery.discover_subnet_peers(filtered); + } } /// A STATUS message has been received from a peer. This resets the status timer. diff --git a/beacon_node/network/src/attestation_service/mod.rs b/beacon_node/network/src/attestation_service/mod.rs index cd8a9b5a1..af2d1aeeb 100644 --- a/beacon_node/network/src/attestation_service/mod.rs +++ b/beacon_node/network/src/attestation_service/mod.rs @@ -353,11 +353,9 @@ impl AttestationService { // if there is an unsubscription event for the slot prior, we remove it to prevent // unsubscriptions immediately after the subscription. We also want to minimize // subscription churn and maintain a consecutive subnet subscriptions. - let to_remove_subnet = ExactSubnet { - subnet_id: exact_subnet.subnet_id, - slot: exact_subnet.slot.saturating_sub(1u64), - }; - self.unsubscriptions.remove(&to_remove_subnet); + self.unsubscriptions.retain(|subnet| { + !(subnet.subnet_id == exact_subnet.subnet_id && subnet.slot <= exact_subnet.slot) + }); // add an unsubscription event to remove ourselves from the subnet once completed self.unsubscriptions .insert_at(exact_subnet, expected_end_subscription_duration); @@ -429,6 +427,7 @@ impl AttestationService { // if we are not already subscribed, then subscribe if !self.subscriptions.contains(&subnet_id) { self.subscriptions.insert(subnet_id); + debug!(self.log, "Subscribing to random subnet"; "subnet_id" => ?subnet_id); self.events .push_back(AttServiceMessage::Subscribe(subnet_id)); } @@ -504,17 +503,23 @@ impl AttestationService { self.random_subnets.insert(subnet_id); return; } + // If there are no unsubscription events for `subnet_id`, we unsubscribe immediately. + if self + .unsubscriptions + .keys() + .find(|s| s.subnet_id == subnet_id) + .is_none() + { + // we are not at capacity, unsubscribe from the current subnet. + debug!(self.log, "Unsubscribing from random subnet"; "subnet_id" => *subnet_id); + self.events + .push_back(AttServiceMessage::Unsubscribe(subnet_id)); + } - // we are not at capacity, unsubscribe from the current subnet, remove the ENR bitfield bit and choose a new random one - // from the available subnets - // Note: This should not occur during a required subnet as subscriptions update the timeout - // to last as long as they are needed. - - debug!(self.log, "Unsubscribing from random subnet"; "subnet_id" => *subnet_id); - self.events - .push_back(AttServiceMessage::Unsubscribe(subnet_id)); + // Remove the ENR bitfield bit and choose a new random on from the available subnets self.events .push_back(AttServiceMessage::EnrRemove(subnet_id)); + // Subscribe to a new random subnet self.subscribe_to_random_subnets(1); } diff --git a/beacon_node/network/src/attestation_service/tests/mod.rs b/beacon_node/network/src/attestation_service/tests/mod.rs index 9ae897304..46a4ed9ea 100644 --- a/beacon_node/network/src/attestation_service/tests/mod.rs +++ b/beacon_node/network/src/attestation_service/tests/mod.rs @@ -20,7 +20,7 @@ mod tests { use tempfile::tempdir; use types::{CommitteeIndex, EthSpec, MinimalEthSpec}; - const SLOT_DURATION_MILLIS: u64 = 200; + const SLOT_DURATION_MILLIS: u64 = 400; type TestBeaconChainType = Witness< NullMigrator, @@ -164,67 +164,13 @@ mod tests { } } - #[tokio::test] - async fn subscribe_current_slot() { - // subscription config - let validator_index = 1; - let committee_index = 1; - let subscription_slot = 0; - let num_events_expected = 4; - let committee_count = 1; - - // create the attestation service and subscriptions - let mut attestation_service = get_attestation_service(); - let current_slot = attestation_service - .beacon_chain - .slot_clock - .now() - .expect("Could not get current slot"); - - let subscriptions = vec![get_subscription( - validator_index, - committee_index, - current_slot + Slot::new(subscription_slot), - committee_count, - )]; - - // submit the subscriptions - attestation_service - .validator_subscriptions(subscriptions) - .unwrap(); - - // not enough time for peer discovery, just subscribe - let expected = vec![AttServiceMessage::Subscribe( - SubnetId::compute_subnet::( - current_slot + Slot::new(subscription_slot), - committee_index, - committee_count, - &attestation_service.beacon_chain.spec, - ) - .unwrap(), - )]; - - let events = get_events(&mut attestation_service, Some(num_events_expected), 1).await; - assert_matches!( - events[..3], - [AttServiceMessage::DiscoverPeers(_), AttServiceMessage::Subscribe(_any1), AttServiceMessage::EnrAdd(_any3)] - ); - - // Should be subscribed to 1 long lived and one short lived subnet. - assert_eq!(attestation_service.subscription_count(), 2); - // if there are fewer events than expected, there's been a collision - if events.len() == num_events_expected { - assert_eq!(expected[..], events[3..]); - } - } - #[tokio::test] async fn subscribe_current_slot_wait_for_unsubscribe() { // subscription config let validator_index = 1; let committee_index = 1; + // Keep a low subscription slot so that there are no additional subnet discovery events. let subscription_slot = 0; - let num_events_expected = 5; let committee_count = 1; // create the attestation service and subscriptions @@ -260,18 +206,114 @@ mod tests { AttServiceMessage::Unsubscribe(subnet_id), ]; - let events = get_events(&mut attestation_service, Some(num_events_expected), 2).await; + // Wait for 1 slot duration to get the unsubscription event + let events = get_events(&mut attestation_service, None, 1).await; assert_matches!( events[..3], [AttServiceMessage::DiscoverPeers(_), AttServiceMessage::Subscribe(_any1), AttServiceMessage::EnrAdd(_any3)] ); - // Should be subscribed to only 1 long lived subnet after unsubscription. - assert_eq!(attestation_service.subscription_count(), 1); - // if there are fewer events than expected, there's been a collision - if events.len() == num_events_expected { + // If the long lived and short lived subnets are the same, there should be no more events + // as we don't resubscribe already subscribed subnets. + if !attestation_service.random_subnets.contains(&subnet_id) { assert_eq!(expected[..], events[3..]); } + // Should be subscribed to only 1 long lived subnet after unsubscription. + assert_eq!(attestation_service.subscription_count(), 1); + } + + /// Test to verify that we are not unsubscribing to a subnet before a required subscription. + #[tokio::test] + async fn test_same_subnet_unsubscription() { + // subscription config + let validator_index = 1; + let committee_count = 1; + + // Makes 2 validator subscriptions to the same subnet but at different slots. + // There should be just 1 unsubscription event for the later slot subscription (subscription_slot2). + let subscription_slot1 = 0; + let subscription_slot2 = 1; + let com1 = 1; + let com2 = 0; + + // create the attestation service and subscriptions + let mut attestation_service = get_attestation_service(); + let current_slot = attestation_service + .beacon_chain + .slot_clock + .now() + .expect("Could not get current slot"); + + let sub1 = get_subscription( + validator_index, + com1, + current_slot + Slot::new(subscription_slot1), + committee_count, + ); + + let sub2 = get_subscription( + validator_index, + com2, + current_slot + Slot::new(subscription_slot2), + committee_count, + ); + + let subnet_id1 = SubnetId::compute_subnet::( + current_slot + Slot::new(subscription_slot1), + com1, + committee_count, + &attestation_service.beacon_chain.spec, + ) + .unwrap(); + + let subnet_id2 = SubnetId::compute_subnet::( + current_slot + Slot::new(subscription_slot2), + com2, + committee_count, + &attestation_service.beacon_chain.spec, + ) + .unwrap(); + + // Assert that subscriptions are different but their subnet is the same + assert_ne!(sub1, sub2); + assert_eq!(subnet_id1, subnet_id2); + + // submit the subscriptions + attestation_service + .validator_subscriptions(vec![sub1, sub2]) + .unwrap(); + + // Unsubscription event should happen at slot 2 (since subnet id's are the same, unsubscription event should be at higher slot + 1) + // Get all events for 1 slot duration (unsubscription event should happen after 2 slot durations). + let events = get_events(&mut attestation_service, None, 1).await; + assert_matches!( + events[..3], + [AttServiceMessage::DiscoverPeers(_), AttServiceMessage::Subscribe(_any1), AttServiceMessage::EnrAdd(_any3)] + ); + + let expected = AttServiceMessage::Subscribe(subnet_id1); + + // Should be still subscribed to 1 long lived and 1 short lived subnet if both are different. + if !attestation_service.random_subnets.contains(&subnet_id1) { + assert_eq!(expected, events[3]); + assert_eq!(attestation_service.subscription_count(), 2); + } else { + assert_eq!(attestation_service.subscription_count(), 1); + } + + // Get event for 1 more slot duration, we should get the unsubscribe event now. + let unsubscribe_event = get_events(&mut attestation_service, None, 1).await; + + // If the long lived and short lived subnets are different, we should get an unsubscription event. + if !attestation_service.random_subnets.contains(&subnet_id1) { + assert_eq!( + [AttServiceMessage::Unsubscribe(subnet_id1)], + unsubscribe_event[..] + ); + } + + // Should be subscribed to only 1 long lived subnet after unsubscription. + assert_eq!(attestation_service.subscription_count(), 1); } #[tokio::test] diff --git a/testing/simulator/src/local_network.rs b/testing/simulator/src/local_network.rs index ed17f69ee..3efccea41 100644 --- a/testing/simulator/src/local_network.rs +++ b/testing/simulator/src/local_network.rs @@ -93,6 +93,11 @@ impl LocalNetwork { .enr() .expect("bootnode must have a network"), ); + let count = self.beacon_node_count() as u16; + beacon_config.network.discovery_port = BOOTNODE_PORT + count; + beacon_config.network.libp2p_port = BOOTNODE_PORT + count; + beacon_config.network.enr_udp_port = Some(BOOTNODE_PORT + count); + beacon_config.network.enr_tcp_port = Some(BOOTNODE_PORT + count); } let index = self.beacon_nodes.read().len();