Simulator and attestation service fixes (#1747)

## Issue Addressed

#1729 #1730 

Which issue # does this PR address?

## Proposed Changes

1. Fixes a bug in the simulator where nodes can't find each other due to 0 udp ports in their enr.
2. Fixes bugs in attestation service where we are unsubscribing from a subnet prematurely.

More testing is needed for attestation service fixes.
This commit is contained in:
Pawan Dhananjay 2020-10-15 07:11:31 +00:00
parent aadbab47cc
commit 97be2ca295
5 changed files with 131 additions and 77 deletions

View File

@ -1,6 +1,6 @@
///! The subnet predicate used for searching for a particular subnet.
use super::*;
use slog::{debug, trace};
use slog::trace;
use std::ops::Deref;
/// Returns the predicate for a given subnet.
@ -38,7 +38,7 @@ where
);
return false;
} else {
debug!(
trace!(
log_clone,
"Peer found on desired subnet(s)";
"peer_id" => format!("{}", enr.peer_id()),

View File

@ -250,7 +250,9 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
.collect();
// request the subnet query from discovery
self.discovery.discover_subnet_peers(filtered);
if !filtered.is_empty() {
self.discovery.discover_subnet_peers(filtered);
}
}
/// A STATUS message has been received from a peer. This resets the status timer.

View File

@ -353,11 +353,9 @@ impl<T: BeaconChainTypes> AttestationService<T> {
// if there is an unsubscription event for the slot prior, we remove it to prevent
// unsubscriptions immediately after the subscription. We also want to minimize
// subscription churn and maintain a consecutive subnet subscriptions.
let to_remove_subnet = ExactSubnet {
subnet_id: exact_subnet.subnet_id,
slot: exact_subnet.slot.saturating_sub(1u64),
};
self.unsubscriptions.remove(&to_remove_subnet);
self.unsubscriptions.retain(|subnet| {
!(subnet.subnet_id == exact_subnet.subnet_id && subnet.slot <= exact_subnet.slot)
});
// add an unsubscription event to remove ourselves from the subnet once completed
self.unsubscriptions
.insert_at(exact_subnet, expected_end_subscription_duration);
@ -429,6 +427,7 @@ impl<T: BeaconChainTypes> AttestationService<T> {
// if we are not already subscribed, then subscribe
if !self.subscriptions.contains(&subnet_id) {
self.subscriptions.insert(subnet_id);
debug!(self.log, "Subscribing to random subnet"; "subnet_id" => ?subnet_id);
self.events
.push_back(AttServiceMessage::Subscribe(subnet_id));
}
@ -504,17 +503,23 @@ impl<T: BeaconChainTypes> AttestationService<T> {
self.random_subnets.insert(subnet_id);
return;
}
// If there are no unsubscription events for `subnet_id`, we unsubscribe immediately.
if self
.unsubscriptions
.keys()
.find(|s| s.subnet_id == subnet_id)
.is_none()
{
// we are not at capacity, unsubscribe from the current subnet.
debug!(self.log, "Unsubscribing from random subnet"; "subnet_id" => *subnet_id);
self.events
.push_back(AttServiceMessage::Unsubscribe(subnet_id));
}
// we are not at capacity, unsubscribe from the current subnet, remove the ENR bitfield bit and choose a new random one
// from the available subnets
// Note: This should not occur during a required subnet as subscriptions update the timeout
// to last as long as they are needed.
debug!(self.log, "Unsubscribing from random subnet"; "subnet_id" => *subnet_id);
self.events
.push_back(AttServiceMessage::Unsubscribe(subnet_id));
// Remove the ENR bitfield bit and choose a new random on from the available subnets
self.events
.push_back(AttServiceMessage::EnrRemove(subnet_id));
// Subscribe to a new random subnet
self.subscribe_to_random_subnets(1);
}

View File

@ -20,7 +20,7 @@ mod tests {
use tempfile::tempdir;
use types::{CommitteeIndex, EthSpec, MinimalEthSpec};
const SLOT_DURATION_MILLIS: u64 = 200;
const SLOT_DURATION_MILLIS: u64 = 400;
type TestBeaconChainType = Witness<
NullMigrator,
@ -164,67 +164,13 @@ mod tests {
}
}
#[tokio::test]
async fn subscribe_current_slot() {
// subscription config
let validator_index = 1;
let committee_index = 1;
let subscription_slot = 0;
let num_events_expected = 4;
let committee_count = 1;
// create the attestation service and subscriptions
let mut attestation_service = get_attestation_service();
let current_slot = attestation_service
.beacon_chain
.slot_clock
.now()
.expect("Could not get current slot");
let subscriptions = vec![get_subscription(
validator_index,
committee_index,
current_slot + Slot::new(subscription_slot),
committee_count,
)];
// submit the subscriptions
attestation_service
.validator_subscriptions(subscriptions)
.unwrap();
// not enough time for peer discovery, just subscribe
let expected = vec![AttServiceMessage::Subscribe(
SubnetId::compute_subnet::<MinimalEthSpec>(
current_slot + Slot::new(subscription_slot),
committee_index,
committee_count,
&attestation_service.beacon_chain.spec,
)
.unwrap(),
)];
let events = get_events(&mut attestation_service, Some(num_events_expected), 1).await;
assert_matches!(
events[..3],
[AttServiceMessage::DiscoverPeers(_), AttServiceMessage::Subscribe(_any1), AttServiceMessage::EnrAdd(_any3)]
);
// Should be subscribed to 1 long lived and one short lived subnet.
assert_eq!(attestation_service.subscription_count(), 2);
// if there are fewer events than expected, there's been a collision
if events.len() == num_events_expected {
assert_eq!(expected[..], events[3..]);
}
}
#[tokio::test]
async fn subscribe_current_slot_wait_for_unsubscribe() {
// subscription config
let validator_index = 1;
let committee_index = 1;
// Keep a low subscription slot so that there are no additional subnet discovery events.
let subscription_slot = 0;
let num_events_expected = 5;
let committee_count = 1;
// create the attestation service and subscriptions
@ -260,18 +206,114 @@ mod tests {
AttServiceMessage::Unsubscribe(subnet_id),
];
let events = get_events(&mut attestation_service, Some(num_events_expected), 2).await;
// Wait for 1 slot duration to get the unsubscription event
let events = get_events(&mut attestation_service, None, 1).await;
assert_matches!(
events[..3],
[AttServiceMessage::DiscoverPeers(_), AttServiceMessage::Subscribe(_any1), AttServiceMessage::EnrAdd(_any3)]
);
// Should be subscribed to only 1 long lived subnet after unsubscription.
assert_eq!(attestation_service.subscription_count(), 1);
// if there are fewer events than expected, there's been a collision
if events.len() == num_events_expected {
// If the long lived and short lived subnets are the same, there should be no more events
// as we don't resubscribe already subscribed subnets.
if !attestation_service.random_subnets.contains(&subnet_id) {
assert_eq!(expected[..], events[3..]);
}
// Should be subscribed to only 1 long lived subnet after unsubscription.
assert_eq!(attestation_service.subscription_count(), 1);
}
/// Test to verify that we are not unsubscribing to a subnet before a required subscription.
#[tokio::test]
async fn test_same_subnet_unsubscription() {
// subscription config
let validator_index = 1;
let committee_count = 1;
// Makes 2 validator subscriptions to the same subnet but at different slots.
// There should be just 1 unsubscription event for the later slot subscription (subscription_slot2).
let subscription_slot1 = 0;
let subscription_slot2 = 1;
let com1 = 1;
let com2 = 0;
// create the attestation service and subscriptions
let mut attestation_service = get_attestation_service();
let current_slot = attestation_service
.beacon_chain
.slot_clock
.now()
.expect("Could not get current slot");
let sub1 = get_subscription(
validator_index,
com1,
current_slot + Slot::new(subscription_slot1),
committee_count,
);
let sub2 = get_subscription(
validator_index,
com2,
current_slot + Slot::new(subscription_slot2),
committee_count,
);
let subnet_id1 = SubnetId::compute_subnet::<MinimalEthSpec>(
current_slot + Slot::new(subscription_slot1),
com1,
committee_count,
&attestation_service.beacon_chain.spec,
)
.unwrap();
let subnet_id2 = SubnetId::compute_subnet::<MinimalEthSpec>(
current_slot + Slot::new(subscription_slot2),
com2,
committee_count,
&attestation_service.beacon_chain.spec,
)
.unwrap();
// Assert that subscriptions are different but their subnet is the same
assert_ne!(sub1, sub2);
assert_eq!(subnet_id1, subnet_id2);
// submit the subscriptions
attestation_service
.validator_subscriptions(vec![sub1, sub2])
.unwrap();
// Unsubscription event should happen at slot 2 (since subnet id's are the same, unsubscription event should be at higher slot + 1)
// Get all events for 1 slot duration (unsubscription event should happen after 2 slot durations).
let events = get_events(&mut attestation_service, None, 1).await;
assert_matches!(
events[..3],
[AttServiceMessage::DiscoverPeers(_), AttServiceMessage::Subscribe(_any1), AttServiceMessage::EnrAdd(_any3)]
);
let expected = AttServiceMessage::Subscribe(subnet_id1);
// Should be still subscribed to 1 long lived and 1 short lived subnet if both are different.
if !attestation_service.random_subnets.contains(&subnet_id1) {
assert_eq!(expected, events[3]);
assert_eq!(attestation_service.subscription_count(), 2);
} else {
assert_eq!(attestation_service.subscription_count(), 1);
}
// Get event for 1 more slot duration, we should get the unsubscribe event now.
let unsubscribe_event = get_events(&mut attestation_service, None, 1).await;
// If the long lived and short lived subnets are different, we should get an unsubscription event.
if !attestation_service.random_subnets.contains(&subnet_id1) {
assert_eq!(
[AttServiceMessage::Unsubscribe(subnet_id1)],
unsubscribe_event[..]
);
}
// Should be subscribed to only 1 long lived subnet after unsubscription.
assert_eq!(attestation_service.subscription_count(), 1);
}
#[tokio::test]

View File

@ -93,6 +93,11 @@ impl<E: EthSpec> LocalNetwork<E> {
.enr()
.expect("bootnode must have a network"),
);
let count = self.beacon_node_count() as u16;
beacon_config.network.discovery_port = BOOTNODE_PORT + count;
beacon_config.network.libp2p_port = BOOTNODE_PORT + count;
beacon_config.network.enr_udp_port = Some(BOOTNODE_PORT + count);
beacon_config.network.enr_tcp_port = Some(BOOTNODE_PORT + count);
}
let index = self.beacon_nodes.read().len();