diff --git a/Cargo.lock b/Cargo.lock index da07bf425..d0e3622e7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4150,6 +4150,7 @@ dependencies = [ "error-chain", "eth2_ssz", "eth2_ssz_types", + "ethereum-types 0.12.1", "exit-future", "fnv", "futures", diff --git a/beacon_node/network/Cargo.toml b/beacon_node/network/Cargo.toml index 87c7650fb..2e7b2227b 100644 --- a/beacon_node/network/Cargo.toml +++ b/beacon_node/network/Cargo.toml @@ -44,3 +44,8 @@ strum = "0.24.0" tokio-util = { version = "0.6.3", features = ["time"] } derivative = "2.2.0" delay_map = "0.1.1" +ethereum-types = { version = "0.12.1", optional = true } + +[features] +deterministic_long_lived_attnets = [ "ethereum-types" ] +# default = ["deterministic_long_lived_attnets"] diff --git a/beacon_node/network/src/service.rs b/beacon_node/network/src/service.rs index ec8573ea1..31c42b860 100644 --- a/beacon_node/network/src/service.rs +++ b/beacon_node/network/src/service.rs @@ -299,9 +299,13 @@ impl NetworkService { )?; // attestation subnet service - let attestation_service = - AttestationService::new(beacon_chain.clone(), config, &network_log); - + let attestation_service = AttestationService::new( + beacon_chain.clone(), + #[cfg(feature = "deterministic_long_lived_attnets")] + network_globals.local_enr().node_id().raw().into(), + config, + &network_log, + ); // sync committee subnet service let sync_committee_service = SyncCommitteeService::new(beacon_chain.clone(), config, &network_log); diff --git a/beacon_node/network/src/subnet_service/attestation_subnets.rs b/beacon_node/network/src/subnet_service/attestation_subnets.rs index ee8ba24fc..70ba1c817 100644 --- a/beacon_node/network/src/subnet_service/attestation_subnets.rs +++ b/beacon_node/network/src/subnet_service/attestation_subnets.rs @@ -3,7 +3,7 @@ //! determines whether attestations should be aggregated and/or passed to the beacon node. use super::SubnetServiceMessage; -#[cfg(test)] +#[cfg(any(test, feature = "deterministic_long_lived_attnets"))] use std::collections::HashSet; use std::collections::{HashMap, VecDeque}; use std::pin::Pin; @@ -15,6 +15,7 @@ use beacon_chain::{BeaconChain, BeaconChainTypes}; use delay_map::{HashMapDelay, HashSetDelay}; use futures::prelude::*; use lighthouse_network::{NetworkConfig, Subnet, SubnetDiscovery}; +#[cfg(not(feature = "deterministic_long_lived_attnets"))] use rand::seq::SliceRandom; use slog::{debug, error, o, trace, warn}; use slot_clock::SlotClock; @@ -28,6 +29,7 @@ use crate::metrics; pub(crate) const MIN_PEER_DISCOVERY_SLOT_LOOK_AHEAD: u64 = 2; /// The time (in slots) before a last seen validator is considered absent and we unsubscribe from /// the random gossip topics that we subscribed to due to the validator connection. +#[cfg(not(feature = "deterministic_long_lived_attnets"))] const LAST_SEEN_VALIDATOR_TIMEOUT_SLOTS: u32 = 150; /// The fraction of a slot that we subscribe to a subnet before the required slot. /// @@ -70,6 +72,9 @@ pub struct AttestationService { /// Subnets we are currently subscribed to as long lived subscriptions. /// /// We advertise these in our ENR. When these expire, the subnet is removed from our ENR. + #[cfg(feature = "deterministic_long_lived_attnets")] + long_lived_subscriptions: HashSet, + #[cfg(not(feature = "deterministic_long_lived_attnets"))] long_lived_subscriptions: HashMapDelay, /// Short lived subscriptions that need to be done in the future. @@ -83,6 +88,7 @@ pub struct AttestationService { /// subscribed to. As these time out, we unsubscribe for the required random subnets and update /// our ENR. /// This is a set of validator indices. + #[cfg(not(feature = "deterministic_long_lived_attnets"))] known_validators: HashSetDelay, /// The waker for the current thread. @@ -95,8 +101,17 @@ pub struct AttestationService { subscribe_all_subnets: bool, /// For how many slots we subscribe to long lived subnets. + #[cfg(not(feature = "deterministic_long_lived_attnets"))] long_lived_subnet_subscription_slots: u64, + /// Our Discv5 node_id. + #[cfg(feature = "deterministic_long_lived_attnets")] + node_id: ethereum_types::U256, + + /// Future used to manage subscribing and unsubscribing from long lived subnets. + #[cfg(feature = "deterministic_long_lived_attnets")] + next_long_lived_subscription_event: Pin>, + /// The logger for the attestation service. log: slog::Logger, } @@ -104,6 +119,7 @@ pub struct AttestationService { impl AttestationService { /* Public functions */ + #[cfg(not(feature = "deterministic_long_lived_attnets"))] pub fn new( beacon_chain: Arc>, config: &NetworkConfig, @@ -145,31 +161,85 @@ impl AttestationService { } } + #[cfg(feature = "deterministic_long_lived_attnets")] + pub fn new( + beacon_chain: Arc>, + node_id: ethereum_types::U256, + config: &NetworkConfig, + log: &slog::Logger, + ) -> Self { + let log = log.new(o!("service" => "attestation_service")); + + // Calculate the random subnet duration from the spec constants. + let slot_duration = beacon_chain.slot_clock.slot_duration(); + + slog::info!(log, "Deterministic long lived subnets enabled"; "subnets_per_node" => beacon_chain.spec.subnets_per_node); + + let track_validators = !config.import_all_attestations; + let aggregate_validators_on_subnet = + track_validators.then(|| HashSetDelay::new(slot_duration)); + let mut service = AttestationService { + events: VecDeque::with_capacity(10), + beacon_chain, + short_lived_subscriptions: HashMapDelay::new(slot_duration), + long_lived_subscriptions: HashSet::default(), + scheduled_short_lived_subscriptions: HashSetDelay::default(), + aggregate_validators_on_subnet, + waker: None, + discovery_disabled: config.disable_discovery, + subscribe_all_subnets: config.subscribe_all_subnets, + node_id, + next_long_lived_subscription_event: { + // Set a dummy sleep. Calculating the current subnet subscriptions will update this + // value with a smarter timing + Box::pin(tokio::time::sleep(Duration::from_secs(1))) + }, + log, + }; + service.recompute_long_lived_subnets(); + service + } + /// Return count of all currently subscribed subnets (long-lived **and** short-lived). #[cfg(test)] pub fn subscription_count(&self) -> usize { if self.subscribe_all_subnets { self.beacon_chain.spec.attestation_subnet_count as usize } else { - self.short_lived_subscriptions + #[cfg(feature = "deterministic_long_lived_attnets")] + let count = self + .short_lived_subscriptions + .keys() + .chain(self.long_lived_subscriptions.iter()) + .collect::>() + .len(); + #[cfg(not(feature = "deterministic_long_lived_attnets"))] + let count = self + .short_lived_subscriptions .keys() .chain(self.long_lived_subscriptions.keys()) .collect::>() - .len() + .len(); + count } } - /// Give access to the current subscriptions for testing purposes. + /// Returns whether we are subscribed to a subnet for testing purposes. #[cfg(test)] - pub(crate) fn subscriptions( + pub(crate) fn is_subscribed( &self, + subnet_id: &SubnetId, subscription_kind: SubscriptionKind, - ) -> &HashMapDelay { + ) -> bool { match subscription_kind { - SubscriptionKind::LongLived => &self.long_lived_subscriptions, - SubscriptionKind::ShortLived => &self.short_lived_subscriptions, + #[cfg(feature = "deterministic_long_lived_attnets")] + SubscriptionKind::LongLived => self.long_lived_subscriptions.contains(subnet_id), + #[cfg(not(feature = "deterministic_long_lived_attnets"))] + SubscriptionKind::LongLived => self.long_lived_subscriptions.contains_key(subnet_id), + SubscriptionKind::ShortLived => self.short_lived_subscriptions.contains_key(subnet_id), } } + /// Processes a list of validator subscriptions. /// /// This will: @@ -197,6 +267,7 @@ impl AttestationService { "Validator subscription"; "subscription" => ?subscription, ); + #[cfg(not(feature = "deterministic_long_lived_attnets"))] self.add_known_validator(subscription.validator_index); let subnet_id = match SubnetId::compute_subnet::( @@ -267,6 +338,111 @@ impl AttestationService { Ok(()) } + #[cfg(feature = "deterministic_long_lived_attnets")] + fn recompute_long_lived_subnets(&mut self) { + // Ensure the next computation is scheduled even if assigning subnets fails. + let next_subscription_event = self + .recompute_long_lived_subnets_inner() + .unwrap_or_else(|_| self.beacon_chain.slot_clock.slot_duration()); + + debug!(self.log, "Recomputing deterministic long lived attnets"); + self.next_long_lived_subscription_event = + Box::pin(tokio::time::sleep(next_subscription_event)); + + if let Some(waker) = self.waker.as_ref() { + waker.wake_by_ref(); + } + } + + /// Gets the long lived subnets the node should be subscribed to during the current epoch and + /// the remaining duration for which they remain valid. + #[cfg(feature = "deterministic_long_lived_attnets")] + fn recompute_long_lived_subnets_inner(&mut self) -> Result { + let current_epoch = self.beacon_chain.epoch().map_err( + |e| error!(self.log, "Failed to get the current epoch from clock"; "err" => ?e), + )?; + + let (subnets, next_subscription_epoch) = SubnetId::compute_subnets_for_epoch::( + self.node_id, + current_epoch, + &self.beacon_chain.spec, + ) + .map_err(|e| error!(self.log, "Could not compute subnets for current epoch"; "err" => e))?; + + let next_subscription_slot = + next_subscription_epoch.start_slot(T::EthSpec::slots_per_epoch()); + let next_subscription_event = self + .beacon_chain + .slot_clock + .duration_to_slot(next_subscription_slot) + .ok_or_else(|| { + error!( + self.log, + "Failed to compute duration to next to long lived subscription event" + ) + })?; + + self.update_long_lived_subnets(subnets.collect()); + + Ok(next_subscription_event) + } + + #[cfg(all(test, feature = "deterministic_long_lived_attnets"))] + pub fn update_long_lived_subnets_testing(&mut self, subnets: HashSet) { + self.update_long_lived_subnets(subnets) + } + + /// Updates the long lived subnets. + /// + /// New subnets are registered as subscribed, removed subnets as unsubscribed and the Enr + /// updated accordingly. + #[cfg(feature = "deterministic_long_lived_attnets")] + fn update_long_lived_subnets(&mut self, mut subnets: HashSet) { + for subnet in &subnets { + // Add the events for those subnets that are new as long lived subscriptions. + if !self.long_lived_subscriptions.contains(subnet) { + // Check if this subnet is new and send the subscription event if needed. + if !self.short_lived_subscriptions.contains_key(subnet) { + debug!(self.log, "Subscribing to subnet"; + "subnet" => ?subnet, + "subscription_kind" => ?SubscriptionKind::LongLived, + ); + self.queue_event(SubnetServiceMessage::Subscribe(Subnet::Attestation( + *subnet, + ))); + } + self.queue_event(SubnetServiceMessage::EnrAdd(Subnet::Attestation(*subnet))); + if !self.discovery_disabled { + self.queue_event(SubnetServiceMessage::DiscoverPeers(vec![SubnetDiscovery { + subnet: Subnet::Attestation(*subnet), + min_ttl: None, + }])) + } + } + } + + // Check for subnets that are being removed + std::mem::swap(&mut self.long_lived_subscriptions, &mut subnets); + for subnet in subnets { + if !self.long_lived_subscriptions.contains(&subnet) { + if !self.short_lived_subscriptions.contains_key(&subnet) { + debug!(self.log, "Unsubscribing from subnet"; "subnet" => ?subnet, "subscription_kind" => ?SubscriptionKind::LongLived); + self.queue_event(SubnetServiceMessage::Unsubscribe(Subnet::Attestation( + subnet, + ))); + } + + self.queue_event(SubnetServiceMessage::EnrRemove(Subnet::Attestation(subnet))); + } + } + } + + /// Overwrites the long lived subscriptions for testing. + #[cfg(all(test, feature = "deterministic_long_lived_attnets"))] + pub fn set_long_lived_subscriptions(&mut self, subnets: HashSet) { + self.long_lived_subscriptions = subnets + } + /// Checks if we have subscribed aggregate validators for the subnet. If not, checks the gossip /// verification, re-propagates and returns false. pub fn should_process_attestation( @@ -377,6 +553,7 @@ impl AttestationService { // This is a current or past slot, we subscribe immediately. self.subscribe_to_subnet_immediately( subnet_id, + #[cfg(not(feature = "deterministic_long_lived_attnets"))] SubscriptionKind::ShortLived, slot + 1, )?; @@ -391,6 +568,7 @@ impl AttestationService { } /// Updates the `known_validators` mapping and subscribes to long lived subnets if required. + #[cfg(not(feature = "deterministic_long_lived_attnets"))] fn add_known_validator(&mut self, validator_index: u64) { let previously_known = self.known_validators.contains_key(&validator_index); // Add the new validator or update the current timeout for a known validator. @@ -405,6 +583,7 @@ impl AttestationService { /// Subscribe to long-lived random subnets and update the local ENR bitfield. /// The number of subnets to subscribe depends on the number of active validators and number of /// current subscriptions. + #[cfg(not(feature = "deterministic_long_lived_attnets"))] fn subscribe_to_random_subnets(&mut self) { if self.subscribe_all_subnets { // This case is not handled by this service. @@ -468,9 +647,12 @@ impl AttestationService { /// Checks that the time in which the subscription would end is not in the past. If we are /// already subscribed, extends the timeout if necessary. If this is a new subscription, we send /// out the appropriate events. + /// + /// On determinist long lived subnets, this is only used for short lived subscriptions. fn subscribe_to_subnet_immediately( &mut self, subnet_id: SubnetId, + #[cfg(not(feature = "deterministic_long_lived_attnets"))] subscription_kind: SubscriptionKind, end_slot: Slot, ) -> Result<(), &'static str> { @@ -490,9 +672,13 @@ impl AttestationService { return Err("Time when subscription would end has already passed."); } + #[cfg(feature = "deterministic_long_lived_attnets")] + let subscription_kind = SubscriptionKind::ShortLived; + // We need to check and add a subscription for the right kind, regardless of the presence // of the subnet as a subscription of the other kind. This is mainly since long lived // subscriptions can be removed at any time when a validator goes offline. + #[cfg(not(feature = "deterministic_long_lived_attnets"))] let (subscriptions, already_subscribed_as_other_kind) = match subscription_kind { SubscriptionKind::ShortLived => ( &mut self.short_lived_subscriptions, @@ -504,6 +690,12 @@ impl AttestationService { ), }; + #[cfg(feature = "deterministic_long_lived_attnets")] + let (subscriptions, already_subscribed_as_other_kind) = ( + &mut self.short_lived_subscriptions, + self.long_lived_subscriptions.contains(&subnet_id), + ); + match subscriptions.get(&subnet_id) { Some(current_end_slot) => { // We are already subscribed. Check if we need to extend the subscription. @@ -535,6 +727,7 @@ impl AttestationService { } // If this is a new long lived subscription, send out the appropriate events. + #[cfg(not(feature = "deterministic_long_lived_attnets"))] if SubscriptionKind::LongLived == subscription_kind { let subnet = Subnet::Attestation(subnet_id); // Advertise this subnet in our ENR. @@ -564,6 +757,7 @@ impl AttestationService { /// /// This function selects a new subnet to join, or extends the expiry if there are no more /// available subnets to choose from. + #[cfg(not(feature = "deterministic_long_lived_attnets"))] fn handle_random_subnet_expiry(&mut self, subnet_id: SubnetId) { self.handle_removed_subnet(subnet_id, SubscriptionKind::LongLived); @@ -576,12 +770,15 @@ impl AttestationService { // subscription of the other kind. For long lived subscriptions, it also removes the // advertisement from our ENR. fn handle_removed_subnet(&mut self, subnet_id: SubnetId, subscription_kind: SubscriptionKind) { - let other_subscriptions = match subscription_kind { - SubscriptionKind::LongLived => &self.short_lived_subscriptions, - SubscriptionKind::ShortLived => &self.long_lived_subscriptions, + let exists_in_other_subscriptions = match subscription_kind { + SubscriptionKind::LongLived => self.short_lived_subscriptions.contains_key(&subnet_id), + #[cfg(feature = "deterministic_long_lived_attnets")] + SubscriptionKind::ShortLived => self.long_lived_subscriptions.contains(&subnet_id), + #[cfg(not(feature = "deterministic_long_lived_attnets"))] + SubscriptionKind::ShortLived => self.long_lived_subscriptions.contains_key(&subnet_id), }; - if !other_subscriptions.contains_key(&subnet_id) { + if !exists_in_other_subscriptions { // Subscription no longer exists as short lived or long lived. debug!(self.log, "Unsubscribing from subnet"; "subnet" => ?subnet_id, "subscription_kind" => ?subscription_kind); self.queue_event(SubnetServiceMessage::Unsubscribe(Subnet::Attestation( @@ -603,6 +800,7 @@ impl AttestationService { /// We don't keep track of a specific validator to random subnet, rather the ratio of active /// validators to random subnets. So when a validator goes offline, we can simply remove the /// allocated amount of random subnets. + #[cfg(not(feature = "deterministic_long_lived_attnets"))] fn handle_known_validator_expiry(&mut self) { // Calculate how many subnets should we remove. let extra_subnet_count = { @@ -659,6 +857,7 @@ impl Stream for AttestationService { // Process first any known validator expiries, since these affect how many long lived // subnets we need. + #[cfg(not(feature = "deterministic_long_lived_attnets"))] match self.known_validators.poll_next_unpin(cx) { Poll::Ready(Some(Ok(_validator_index))) => { self.handle_known_validator_expiry(); @@ -669,12 +868,19 @@ impl Stream for AttestationService { Poll::Ready(None) | Poll::Pending => {} } + #[cfg(feature = "deterministic_long_lived_attnets")] + match self.next_long_lived_subscription_event.as_mut().poll(cx) { + Poll::Ready(_) => self.recompute_long_lived_subnets(), + Poll::Pending => {} + } + // Process scheduled subscriptions that might be ready, since those can extend a soon to // expire subscription. match self.scheduled_short_lived_subscriptions.poll_next_unpin(cx) { Poll::Ready(Some(Ok(ExactSubnet { subnet_id, slot }))) => { if let Err(e) = self.subscribe_to_subnet_immediately( subnet_id, + #[cfg(not(feature = "deterministic_long_lived_attnets"))] SubscriptionKind::ShortLived, slot + 1, ) { @@ -699,6 +905,7 @@ impl Stream for AttestationService { } // Process any random subnet expiries. + #[cfg(not(feature = "deterministic_long_lived_attnets"))] match self.long_lived_subscriptions.poll_next_unpin(cx) { Poll::Ready(Some(Ok((subnet_id, _end_slot)))) => { self.handle_random_subnet_expiry(subnet_id) diff --git a/beacon_node/network/src/subnet_service/tests/mod.rs b/beacon_node/network/src/subnet_service/tests/mod.rs index 65ca9f219..30f030eba 100644 --- a/beacon_node/network/src/subnet_service/tests/mod.rs +++ b/beacon_node/network/src/subnet_service/tests/mod.rs @@ -123,7 +123,15 @@ fn get_attestation_service( let beacon_chain = CHAIN.chain.clone(); - AttestationService::new(beacon_chain, &config, &log) + AttestationService::new( + beacon_chain, + #[cfg(feature = "deterministic_long_lived_attnets")] + lighthouse_network::discv5::enr::NodeId::random() + .raw() + .into(), + &config, + &log, + ) } fn get_sync_committee_service() -> SyncCommitteeService { @@ -170,6 +178,9 @@ async fn get_events + Unpin>( mod attestation_service { + #[cfg(feature = "deterministic_long_lived_attnets")] + use std::collections::HashSet; + use crate::subnet_service::attestation_subnets::MIN_PEER_DISCOVERY_SLOT_LOOK_AHEAD; use super::*; @@ -190,6 +201,7 @@ mod attestation_service { } } + #[cfg(not(feature = "deterministic_long_lived_attnets"))] fn get_subscriptions( validator_count: u64, slot: Slot, @@ -268,8 +280,7 @@ mod attestation_service { // If the long lived and short lived subnets are the same, there should be no more events // as we don't resubscribe already subscribed subnets. if !attestation_service - .subscriptions(attestation_subnets::SubscriptionKind::LongLived) - .contains_key(&subnet_id) + .is_subscribed(&subnet_id, attestation_subnets::SubscriptionKind::LongLived) { assert_eq!(expected[..], events[3..]); } @@ -352,11 +363,12 @@ mod attestation_service { let expected = SubnetServiceMessage::Subscribe(Subnet::Attestation(subnet_id1)); - // Should be still subscribed to 1 long lived and 1 short lived subnet if both are different. - if !attestation_service - .subscriptions(attestation_subnets::SubscriptionKind::LongLived) - .contains_key(&subnet_id1) - { + // Should be still subscribed to 1 long lived and 1 short lived subnet if both are + // different. + if !attestation_service.is_subscribed( + &subnet_id1, + attestation_subnets::SubscriptionKind::LongLived, + ) { assert_eq!(expected, events[3]); assert_eq!(attestation_service.subscription_count(), 2); } else { @@ -366,11 +378,12 @@ mod attestation_service { // Get event for 1 more slot duration, we should get the unsubscribe event now. let unsubscribe_event = get_events(&mut attestation_service, None, 1).await; - // If the long lived and short lived subnets are different, we should get an unsubscription event. - if !attestation_service - .subscriptions(attestation_subnets::SubscriptionKind::LongLived) - .contains_key(&subnet_id1) - { + // If the long lived and short lived subnets are different, we should get an unsubscription + // event. + if !attestation_service.is_subscribed( + &subnet_id1, + attestation_subnets::SubscriptionKind::LongLived, + ) { assert_eq!( [SubnetServiceMessage::Unsubscribe(Subnet::Attestation( subnet_id1 @@ -383,6 +396,7 @@ mod attestation_service { assert_eq!(attestation_service.subscription_count(), 1); } + #[cfg(not(feature = "deterministic_long_lived_attnets"))] #[tokio::test] async fn subscribe_all_random_subnets() { let attestation_subnet_count = MainnetEthSpec::default_spec().attestation_subnet_count; @@ -440,6 +454,7 @@ mod attestation_service { // test completed successfully } + #[cfg(not(feature = "deterministic_long_lived_attnets"))] #[tokio::test] async fn subscribe_all_random_subnets_plus_one() { let attestation_subnet_count = MainnetEthSpec::default_spec().attestation_subnet_count; @@ -573,10 +588,10 @@ mod attestation_service { let expected_unsubscription = SubnetServiceMessage::Unsubscribe(Subnet::Attestation(subnet_id1)); - if !attestation_service - .subscriptions(attestation_subnets::SubscriptionKind::LongLived) - .contains_key(&subnet_id1) - { + if !attestation_service.is_subscribed( + &subnet_id1, + attestation_subnets::SubscriptionKind::LongLived, + ) { assert_eq!(expected_subscription, events[3]); // fourth is a discovery event assert_eq!(expected_unsubscription, events[5]); @@ -600,10 +615,10 @@ mod attestation_service { let second_subscribe_event = get_events(&mut attestation_service, None, 2).await; // If the long lived and short lived subnets are different, we should get an unsubscription event. - if !attestation_service - .subscriptions(attestation_subnets::SubscriptionKind::LongLived) - .contains_key(&subnet_id1) - { + if !attestation_service.is_subscribed( + &subnet_id1, + attestation_subnets::SubscriptionKind::LongLived, + ) { assert_eq!( [SubnetServiceMessage::Subscribe(Subnet::Attestation( subnet_id1 @@ -612,6 +627,43 @@ mod attestation_service { ); } } + + #[tokio::test] + #[cfg(feature = "deterministic_long_lived_attnets")] + async fn test_update_deterministic_long_lived_subnets() { + let mut attestation_service = get_attestation_service(None); + let new_subnet = SubnetId::new(1); + let maintained_subnet = SubnetId::new(2); + let removed_subnet = SubnetId::new(3); + + attestation_service + .set_long_lived_subscriptions(HashSet::from([removed_subnet, maintained_subnet])); + // clear initial events + let _events = get_events(&mut attestation_service, None, 1).await; + + attestation_service + .update_long_lived_subnets_testing(HashSet::from([maintained_subnet, new_subnet])); + + let events = get_events(&mut attestation_service, None, 1).await; + let new_subnet = Subnet::Attestation(new_subnet); + let removed_subnet = Subnet::Attestation(removed_subnet); + assert_eq!( + events, + [ + // events for the new subnet + SubnetServiceMessage::Subscribe(new_subnet), + SubnetServiceMessage::EnrAdd(new_subnet), + SubnetServiceMessage::DiscoverPeers(vec![SubnetDiscovery { + subnet: new_subnet, + min_ttl: None + }]), + // events for the removed subnet + SubnetServiceMessage::Unsubscribe(removed_subnet), + SubnetServiceMessage::EnrRemove(removed_subnet), + ] + ); + println!("{events:?}") + } } mod sync_committee_service { diff --git a/consensus/types/src/chain_spec.rs b/consensus/types/src/chain_spec.rs index b2ba24ac3..f68e65d7d 100644 --- a/consensus/types/src/chain_spec.rs +++ b/consensus/types/src/chain_spec.rs @@ -161,6 +161,9 @@ pub struct ChainSpec { pub attestation_subnet_count: u64, pub random_subnets_per_validator: u64, pub epochs_per_random_subnet_subscription: u64, + pub subnets_per_node: u8, + pub epochs_per_subnet_subscription: u64, + attestation_subnet_extra_bits: u8, /* * Application params @@ -427,6 +430,22 @@ impl ChainSpec { Hash256::from(domain) } + #[allow(clippy::integer_arithmetic)] + pub const fn attestation_subnet_prefix_bits(&self) -> u32 { + // maybe use log2 when stable https://github.com/rust-lang/rust/issues/70887 + + // NOTE: this line is here simply to guarantee that if self.attestation_subnet_count type + // is changed, a compiler warning will be raised. This code depends on the type being u64. + let attestation_subnet_count: u64 = self.attestation_subnet_count; + let attestation_subnet_count_bits = if attestation_subnet_count == 0 { + 0 + } else { + 63 - attestation_subnet_count.leading_zeros() + }; + + self.attestation_subnet_extra_bits as u32 + attestation_subnet_count_bits + } + /// Returns a `ChainSpec` compatible with the Ethereum Foundation specification. pub fn mainnet() -> Self { Self { @@ -576,9 +595,12 @@ impl ChainSpec { attestation_propagation_slot_range: 32, attestation_subnet_count: 64, random_subnets_per_validator: 1, + subnets_per_node: 1, maximum_gossip_clock_disparity_millis: 500, target_aggregators_per_committee: 16, epochs_per_random_subnet_subscription: 256, + epochs_per_subnet_subscription: 256, + attestation_subnet_extra_bits: 6, /* * Application specific @@ -786,9 +808,12 @@ impl ChainSpec { attestation_propagation_slot_range: 32, attestation_subnet_count: 64, random_subnets_per_validator: 1, + subnets_per_node: 1, maximum_gossip_clock_disparity_millis: 500, target_aggregators_per_committee: 16, epochs_per_random_subnet_subscription: 256, + epochs_per_subnet_subscription: 256, + attestation_subnet_extra_bits: 6, /* * Application specific diff --git a/consensus/types/src/subnet_id.rs b/consensus/types/src/subnet_id.rs index 046ed8f33..e1de27761 100644 --- a/consensus/types/src/subnet_id.rs +++ b/consensus/types/src/subnet_id.rs @@ -1,8 +1,9 @@ //! Identifies each shard by an integer identifier. -use crate::{AttestationData, ChainSpec, CommitteeIndex, EthSpec, Slot}; +use crate::{AttestationData, ChainSpec, CommitteeIndex, Epoch, EthSpec, Slot}; use safe_arith::{ArithError, SafeArith}; use serde_derive::{Deserialize, Serialize}; use std::ops::{Deref, DerefMut}; +use swap_or_not_shuffle::compute_shuffled_index; const MAX_SUBNET_ID: usize = 64; @@ -71,6 +72,45 @@ impl SubnetId { .safe_rem(spec.attestation_subnet_count)? .into()) } + + #[allow(clippy::integer_arithmetic)] + /// Computes the set of subnets the node should be subscribed to during the current epoch, + /// along with the first epoch in which these subscriptions are no longer valid. + pub fn compute_subnets_for_epoch( + node_id: ethereum_types::U256, + epoch: Epoch, + spec: &ChainSpec, + ) -> Result<(impl Iterator, Epoch), &'static str> { + let node_id_prefix = + (node_id >> (256 - spec.attestation_subnet_prefix_bits() as usize)).as_usize(); + + let subscription_event_idx = epoch.as_u64() / spec.epochs_per_subnet_subscription; + let permutation_seed = + eth2_hashing::hash(&int_to_bytes::int_to_bytes8(subscription_event_idx)); + + let num_subnets = 1 << spec.attestation_subnet_prefix_bits(); + + let permutated_prefix = compute_shuffled_index( + node_id_prefix, + num_subnets, + &permutation_seed, + spec.shuffle_round_count, + ) + .ok_or("Unable to shuffle")? as u64; + + // Get the constants we need to avoid holding a reference to the spec + let &ChainSpec { + subnets_per_node, + attestation_subnet_count, + .. + } = spec; + + let subnet_set_generator = (0..subnets_per_node).map(move |idx| { + SubnetId::new((permutated_prefix + idx as u64) % attestation_subnet_count) + }); + let valid_until_epoch = (subscription_event_idx + 1) * spec.epochs_per_subnet_subscription; + Ok((subnet_set_generator, valid_until_epoch.into())) + } } impl Deref for SubnetId {