[DEV FEATURE] Deterministic long lived subnets (#3453)

## Issue Addressed

#2847 

## Proposed Changes
Add under a feature flag the required changes to subscribe to long lived subnets in a deterministic way

## Additional Info

There is an additional required change that is actually searching for peers using the prefix, but I find that it's best to make this change in the future
This commit is contained in:
Divma 2022-10-04 10:37:48 +00:00
parent 6a92bf70e4
commit 4926e3967f
7 changed files with 371 additions and 37 deletions

1
Cargo.lock generated
View File

@ -4150,6 +4150,7 @@ dependencies = [
"error-chain", "error-chain",
"eth2_ssz", "eth2_ssz",
"eth2_ssz_types", "eth2_ssz_types",
"ethereum-types 0.12.1",
"exit-future", "exit-future",
"fnv", "fnv",
"futures", "futures",

View File

@ -44,3 +44,8 @@ strum = "0.24.0"
tokio-util = { version = "0.6.3", features = ["time"] } tokio-util = { version = "0.6.3", features = ["time"] }
derivative = "2.2.0" derivative = "2.2.0"
delay_map = "0.1.1" delay_map = "0.1.1"
ethereum-types = { version = "0.12.1", optional = true }
[features]
deterministic_long_lived_attnets = [ "ethereum-types" ]
# default = ["deterministic_long_lived_attnets"]

View File

@ -299,9 +299,13 @@ impl<T: BeaconChainTypes> NetworkService<T> {
)?; )?;
// attestation subnet service // attestation subnet service
let attestation_service = let attestation_service = AttestationService::new(
AttestationService::new(beacon_chain.clone(), config, &network_log); beacon_chain.clone(),
#[cfg(feature = "deterministic_long_lived_attnets")]
network_globals.local_enr().node_id().raw().into(),
config,
&network_log,
);
// sync committee subnet service // sync committee subnet service
let sync_committee_service = let sync_committee_service =
SyncCommitteeService::new(beacon_chain.clone(), config, &network_log); SyncCommitteeService::new(beacon_chain.clone(), config, &network_log);

View File

@ -3,7 +3,7 @@
//! determines whether attestations should be aggregated and/or passed to the beacon node. //! determines whether attestations should be aggregated and/or passed to the beacon node.
use super::SubnetServiceMessage; use super::SubnetServiceMessage;
#[cfg(test)] #[cfg(any(test, feature = "deterministic_long_lived_attnets"))]
use std::collections::HashSet; use std::collections::HashSet;
use std::collections::{HashMap, VecDeque}; use std::collections::{HashMap, VecDeque};
use std::pin::Pin; use std::pin::Pin;
@ -15,6 +15,7 @@ use beacon_chain::{BeaconChain, BeaconChainTypes};
use delay_map::{HashMapDelay, HashSetDelay}; use delay_map::{HashMapDelay, HashSetDelay};
use futures::prelude::*; use futures::prelude::*;
use lighthouse_network::{NetworkConfig, Subnet, SubnetDiscovery}; use lighthouse_network::{NetworkConfig, Subnet, SubnetDiscovery};
#[cfg(not(feature = "deterministic_long_lived_attnets"))]
use rand::seq::SliceRandom; use rand::seq::SliceRandom;
use slog::{debug, error, o, trace, warn}; use slog::{debug, error, o, trace, warn};
use slot_clock::SlotClock; use slot_clock::SlotClock;
@ -28,6 +29,7 @@ use crate::metrics;
pub(crate) const MIN_PEER_DISCOVERY_SLOT_LOOK_AHEAD: u64 = 2; pub(crate) const MIN_PEER_DISCOVERY_SLOT_LOOK_AHEAD: u64 = 2;
/// The time (in slots) before a last seen validator is considered absent and we unsubscribe from /// The time (in slots) before a last seen validator is considered absent and we unsubscribe from
/// the random gossip topics that we subscribed to due to the validator connection. /// the random gossip topics that we subscribed to due to the validator connection.
#[cfg(not(feature = "deterministic_long_lived_attnets"))]
const LAST_SEEN_VALIDATOR_TIMEOUT_SLOTS: u32 = 150; const LAST_SEEN_VALIDATOR_TIMEOUT_SLOTS: u32 = 150;
/// The fraction of a slot that we subscribe to a subnet before the required slot. /// The fraction of a slot that we subscribe to a subnet before the required slot.
/// ///
@ -70,6 +72,9 @@ pub struct AttestationService<T: BeaconChainTypes> {
/// Subnets we are currently subscribed to as long lived subscriptions. /// Subnets we are currently subscribed to as long lived subscriptions.
/// ///
/// We advertise these in our ENR. When these expire, the subnet is removed from our ENR. /// We advertise these in our ENR. When these expire, the subnet is removed from our ENR.
#[cfg(feature = "deterministic_long_lived_attnets")]
long_lived_subscriptions: HashSet<SubnetId>,
#[cfg(not(feature = "deterministic_long_lived_attnets"))]
long_lived_subscriptions: HashMapDelay<SubnetId, Slot>, long_lived_subscriptions: HashMapDelay<SubnetId, Slot>,
/// Short lived subscriptions that need to be done in the future. /// Short lived subscriptions that need to be done in the future.
@ -83,6 +88,7 @@ pub struct AttestationService<T: BeaconChainTypes> {
/// subscribed to. As these time out, we unsubscribe for the required random subnets and update /// subscribed to. As these time out, we unsubscribe for the required random subnets and update
/// our ENR. /// our ENR.
/// This is a set of validator indices. /// This is a set of validator indices.
#[cfg(not(feature = "deterministic_long_lived_attnets"))]
known_validators: HashSetDelay<u64>, known_validators: HashSetDelay<u64>,
/// The waker for the current thread. /// The waker for the current thread.
@ -95,8 +101,17 @@ pub struct AttestationService<T: BeaconChainTypes> {
subscribe_all_subnets: bool, subscribe_all_subnets: bool,
/// For how many slots we subscribe to long lived subnets. /// For how many slots we subscribe to long lived subnets.
#[cfg(not(feature = "deterministic_long_lived_attnets"))]
long_lived_subnet_subscription_slots: u64, long_lived_subnet_subscription_slots: u64,
/// Our Discv5 node_id.
#[cfg(feature = "deterministic_long_lived_attnets")]
node_id: ethereum_types::U256,
/// Future used to manage subscribing and unsubscribing from long lived subnets.
#[cfg(feature = "deterministic_long_lived_attnets")]
next_long_lived_subscription_event: Pin<Box<tokio::time::Sleep>>,
/// The logger for the attestation service. /// The logger for the attestation service.
log: slog::Logger, log: slog::Logger,
} }
@ -104,6 +119,7 @@ pub struct AttestationService<T: BeaconChainTypes> {
impl<T: BeaconChainTypes> AttestationService<T> { impl<T: BeaconChainTypes> AttestationService<T> {
/* Public functions */ /* Public functions */
#[cfg(not(feature = "deterministic_long_lived_attnets"))]
pub fn new( pub fn new(
beacon_chain: Arc<BeaconChain<T>>, beacon_chain: Arc<BeaconChain<T>>,
config: &NetworkConfig, config: &NetworkConfig,
@ -145,31 +161,85 @@ impl<T: BeaconChainTypes> AttestationService<T> {
} }
} }
#[cfg(feature = "deterministic_long_lived_attnets")]
pub fn new(
beacon_chain: Arc<BeaconChain<T>>,
node_id: ethereum_types::U256,
config: &NetworkConfig,
log: &slog::Logger,
) -> Self {
let log = log.new(o!("service" => "attestation_service"));
// Calculate the random subnet duration from the spec constants.
let slot_duration = beacon_chain.slot_clock.slot_duration();
slog::info!(log, "Deterministic long lived subnets enabled"; "subnets_per_node" => beacon_chain.spec.subnets_per_node);
let track_validators = !config.import_all_attestations;
let aggregate_validators_on_subnet =
track_validators.then(|| HashSetDelay::new(slot_duration));
let mut service = AttestationService {
events: VecDeque::with_capacity(10),
beacon_chain,
short_lived_subscriptions: HashMapDelay::new(slot_duration),
long_lived_subscriptions: HashSet::default(),
scheduled_short_lived_subscriptions: HashSetDelay::default(),
aggregate_validators_on_subnet,
waker: None,
discovery_disabled: config.disable_discovery,
subscribe_all_subnets: config.subscribe_all_subnets,
node_id,
next_long_lived_subscription_event: {
// Set a dummy sleep. Calculating the current subnet subscriptions will update this
// value with a smarter timing
Box::pin(tokio::time::sleep(Duration::from_secs(1)))
},
log,
};
service.recompute_long_lived_subnets();
service
}
/// Return count of all currently subscribed subnets (long-lived **and** short-lived). /// Return count of all currently subscribed subnets (long-lived **and** short-lived).
#[cfg(test)] #[cfg(test)]
pub fn subscription_count(&self) -> usize { pub fn subscription_count(&self) -> usize {
if self.subscribe_all_subnets { if self.subscribe_all_subnets {
self.beacon_chain.spec.attestation_subnet_count as usize self.beacon_chain.spec.attestation_subnet_count as usize
} else { } else {
self.short_lived_subscriptions #[cfg(feature = "deterministic_long_lived_attnets")]
let count = self
.short_lived_subscriptions
.keys()
.chain(self.long_lived_subscriptions.iter())
.collect::<HashSet<_>>()
.len();
#[cfg(not(feature = "deterministic_long_lived_attnets"))]
let count = self
.short_lived_subscriptions
.keys() .keys()
.chain(self.long_lived_subscriptions.keys()) .chain(self.long_lived_subscriptions.keys())
.collect::<HashSet<_>>() .collect::<HashSet<_>>()
.len() .len();
count
} }
} }
/// Give access to the current subscriptions for testing purposes. /// Returns whether we are subscribed to a subnet for testing purposes.
#[cfg(test)] #[cfg(test)]
pub(crate) fn subscriptions( pub(crate) fn is_subscribed(
&self, &self,
subnet_id: &SubnetId,
subscription_kind: SubscriptionKind, subscription_kind: SubscriptionKind,
) -> &HashMapDelay<SubnetId, Slot> { ) -> bool {
match subscription_kind { match subscription_kind {
SubscriptionKind::LongLived => &self.long_lived_subscriptions, #[cfg(feature = "deterministic_long_lived_attnets")]
SubscriptionKind::ShortLived => &self.short_lived_subscriptions, SubscriptionKind::LongLived => self.long_lived_subscriptions.contains(subnet_id),
#[cfg(not(feature = "deterministic_long_lived_attnets"))]
SubscriptionKind::LongLived => self.long_lived_subscriptions.contains_key(subnet_id),
SubscriptionKind::ShortLived => self.short_lived_subscriptions.contains_key(subnet_id),
} }
} }
/// Processes a list of validator subscriptions. /// Processes a list of validator subscriptions.
/// ///
/// This will: /// This will:
@ -197,6 +267,7 @@ impl<T: BeaconChainTypes> AttestationService<T> {
"Validator subscription"; "Validator subscription";
"subscription" => ?subscription, "subscription" => ?subscription,
); );
#[cfg(not(feature = "deterministic_long_lived_attnets"))]
self.add_known_validator(subscription.validator_index); self.add_known_validator(subscription.validator_index);
let subnet_id = match SubnetId::compute_subnet::<T::EthSpec>( let subnet_id = match SubnetId::compute_subnet::<T::EthSpec>(
@ -267,6 +338,111 @@ impl<T: BeaconChainTypes> AttestationService<T> {
Ok(()) Ok(())
} }
#[cfg(feature = "deterministic_long_lived_attnets")]
fn recompute_long_lived_subnets(&mut self) {
// Ensure the next computation is scheduled even if assigning subnets fails.
let next_subscription_event = self
.recompute_long_lived_subnets_inner()
.unwrap_or_else(|_| self.beacon_chain.slot_clock.slot_duration());
debug!(self.log, "Recomputing deterministic long lived attnets");
self.next_long_lived_subscription_event =
Box::pin(tokio::time::sleep(next_subscription_event));
if let Some(waker) = self.waker.as_ref() {
waker.wake_by_ref();
}
}
/// Gets the long lived subnets the node should be subscribed to during the current epoch and
/// the remaining duration for which they remain valid.
#[cfg(feature = "deterministic_long_lived_attnets")]
fn recompute_long_lived_subnets_inner(&mut self) -> Result<Duration, ()> {
let current_epoch = self.beacon_chain.epoch().map_err(
|e| error!(self.log, "Failed to get the current epoch from clock"; "err" => ?e),
)?;
let (subnets, next_subscription_epoch) = SubnetId::compute_subnets_for_epoch::<T::EthSpec>(
self.node_id,
current_epoch,
&self.beacon_chain.spec,
)
.map_err(|e| error!(self.log, "Could not compute subnets for current epoch"; "err" => e))?;
let next_subscription_slot =
next_subscription_epoch.start_slot(T::EthSpec::slots_per_epoch());
let next_subscription_event = self
.beacon_chain
.slot_clock
.duration_to_slot(next_subscription_slot)
.ok_or_else(|| {
error!(
self.log,
"Failed to compute duration to next to long lived subscription event"
)
})?;
self.update_long_lived_subnets(subnets.collect());
Ok(next_subscription_event)
}
#[cfg(all(test, feature = "deterministic_long_lived_attnets"))]
pub fn update_long_lived_subnets_testing(&mut self, subnets: HashSet<SubnetId>) {
self.update_long_lived_subnets(subnets)
}
/// Updates the long lived subnets.
///
/// New subnets are registered as subscribed, removed subnets as unsubscribed and the Enr
/// updated accordingly.
#[cfg(feature = "deterministic_long_lived_attnets")]
fn update_long_lived_subnets(&mut self, mut subnets: HashSet<SubnetId>) {
for subnet in &subnets {
// Add the events for those subnets that are new as long lived subscriptions.
if !self.long_lived_subscriptions.contains(subnet) {
// Check if this subnet is new and send the subscription event if needed.
if !self.short_lived_subscriptions.contains_key(subnet) {
debug!(self.log, "Subscribing to subnet";
"subnet" => ?subnet,
"subscription_kind" => ?SubscriptionKind::LongLived,
);
self.queue_event(SubnetServiceMessage::Subscribe(Subnet::Attestation(
*subnet,
)));
}
self.queue_event(SubnetServiceMessage::EnrAdd(Subnet::Attestation(*subnet)));
if !self.discovery_disabled {
self.queue_event(SubnetServiceMessage::DiscoverPeers(vec![SubnetDiscovery {
subnet: Subnet::Attestation(*subnet),
min_ttl: None,
}]))
}
}
}
// Check for subnets that are being removed
std::mem::swap(&mut self.long_lived_subscriptions, &mut subnets);
for subnet in subnets {
if !self.long_lived_subscriptions.contains(&subnet) {
if !self.short_lived_subscriptions.contains_key(&subnet) {
debug!(self.log, "Unsubscribing from subnet"; "subnet" => ?subnet, "subscription_kind" => ?SubscriptionKind::LongLived);
self.queue_event(SubnetServiceMessage::Unsubscribe(Subnet::Attestation(
subnet,
)));
}
self.queue_event(SubnetServiceMessage::EnrRemove(Subnet::Attestation(subnet)));
}
}
}
/// Overwrites the long lived subscriptions for testing.
#[cfg(all(test, feature = "deterministic_long_lived_attnets"))]
pub fn set_long_lived_subscriptions(&mut self, subnets: HashSet<SubnetId>) {
self.long_lived_subscriptions = subnets
}
/// Checks if we have subscribed aggregate validators for the subnet. If not, checks the gossip /// Checks if we have subscribed aggregate validators for the subnet. If not, checks the gossip
/// verification, re-propagates and returns false. /// verification, re-propagates and returns false.
pub fn should_process_attestation( pub fn should_process_attestation(
@ -377,6 +553,7 @@ impl<T: BeaconChainTypes> AttestationService<T> {
// This is a current or past slot, we subscribe immediately. // This is a current or past slot, we subscribe immediately.
self.subscribe_to_subnet_immediately( self.subscribe_to_subnet_immediately(
subnet_id, subnet_id,
#[cfg(not(feature = "deterministic_long_lived_attnets"))]
SubscriptionKind::ShortLived, SubscriptionKind::ShortLived,
slot + 1, slot + 1,
)?; )?;
@ -391,6 +568,7 @@ impl<T: BeaconChainTypes> AttestationService<T> {
} }
/// Updates the `known_validators` mapping and subscribes to long lived subnets if required. /// Updates the `known_validators` mapping and subscribes to long lived subnets if required.
#[cfg(not(feature = "deterministic_long_lived_attnets"))]
fn add_known_validator(&mut self, validator_index: u64) { fn add_known_validator(&mut self, validator_index: u64) {
let previously_known = self.known_validators.contains_key(&validator_index); let previously_known = self.known_validators.contains_key(&validator_index);
// Add the new validator or update the current timeout for a known validator. // Add the new validator or update the current timeout for a known validator.
@ -405,6 +583,7 @@ impl<T: BeaconChainTypes> AttestationService<T> {
/// Subscribe to long-lived random subnets and update the local ENR bitfield. /// Subscribe to long-lived random subnets and update the local ENR bitfield.
/// The number of subnets to subscribe depends on the number of active validators and number of /// The number of subnets to subscribe depends on the number of active validators and number of
/// current subscriptions. /// current subscriptions.
#[cfg(not(feature = "deterministic_long_lived_attnets"))]
fn subscribe_to_random_subnets(&mut self) { fn subscribe_to_random_subnets(&mut self) {
if self.subscribe_all_subnets { if self.subscribe_all_subnets {
// This case is not handled by this service. // This case is not handled by this service.
@ -468,9 +647,12 @@ impl<T: BeaconChainTypes> AttestationService<T> {
/// Checks that the time in which the subscription would end is not in the past. If we are /// Checks that the time in which the subscription would end is not in the past. If we are
/// already subscribed, extends the timeout if necessary. If this is a new subscription, we send /// already subscribed, extends the timeout if necessary. If this is a new subscription, we send
/// out the appropriate events. /// out the appropriate events.
///
/// On determinist long lived subnets, this is only used for short lived subscriptions.
fn subscribe_to_subnet_immediately( fn subscribe_to_subnet_immediately(
&mut self, &mut self,
subnet_id: SubnetId, subnet_id: SubnetId,
#[cfg(not(feature = "deterministic_long_lived_attnets"))]
subscription_kind: SubscriptionKind, subscription_kind: SubscriptionKind,
end_slot: Slot, end_slot: Slot,
) -> Result<(), &'static str> { ) -> Result<(), &'static str> {
@ -490,9 +672,13 @@ impl<T: BeaconChainTypes> AttestationService<T> {
return Err("Time when subscription would end has already passed."); return Err("Time when subscription would end has already passed.");
} }
#[cfg(feature = "deterministic_long_lived_attnets")]
let subscription_kind = SubscriptionKind::ShortLived;
// We need to check and add a subscription for the right kind, regardless of the presence // We need to check and add a subscription for the right kind, regardless of the presence
// of the subnet as a subscription of the other kind. This is mainly since long lived // of the subnet as a subscription of the other kind. This is mainly since long lived
// subscriptions can be removed at any time when a validator goes offline. // subscriptions can be removed at any time when a validator goes offline.
#[cfg(not(feature = "deterministic_long_lived_attnets"))]
let (subscriptions, already_subscribed_as_other_kind) = match subscription_kind { let (subscriptions, already_subscribed_as_other_kind) = match subscription_kind {
SubscriptionKind::ShortLived => ( SubscriptionKind::ShortLived => (
&mut self.short_lived_subscriptions, &mut self.short_lived_subscriptions,
@ -504,6 +690,12 @@ impl<T: BeaconChainTypes> AttestationService<T> {
), ),
}; };
#[cfg(feature = "deterministic_long_lived_attnets")]
let (subscriptions, already_subscribed_as_other_kind) = (
&mut self.short_lived_subscriptions,
self.long_lived_subscriptions.contains(&subnet_id),
);
match subscriptions.get(&subnet_id) { match subscriptions.get(&subnet_id) {
Some(current_end_slot) => { Some(current_end_slot) => {
// We are already subscribed. Check if we need to extend the subscription. // We are already subscribed. Check if we need to extend the subscription.
@ -535,6 +727,7 @@ impl<T: BeaconChainTypes> AttestationService<T> {
} }
// If this is a new long lived subscription, send out the appropriate events. // If this is a new long lived subscription, send out the appropriate events.
#[cfg(not(feature = "deterministic_long_lived_attnets"))]
if SubscriptionKind::LongLived == subscription_kind { if SubscriptionKind::LongLived == subscription_kind {
let subnet = Subnet::Attestation(subnet_id); let subnet = Subnet::Attestation(subnet_id);
// Advertise this subnet in our ENR. // Advertise this subnet in our ENR.
@ -564,6 +757,7 @@ impl<T: BeaconChainTypes> AttestationService<T> {
/// ///
/// This function selects a new subnet to join, or extends the expiry if there are no more /// This function selects a new subnet to join, or extends the expiry if there are no more
/// available subnets to choose from. /// available subnets to choose from.
#[cfg(not(feature = "deterministic_long_lived_attnets"))]
fn handle_random_subnet_expiry(&mut self, subnet_id: SubnetId) { fn handle_random_subnet_expiry(&mut self, subnet_id: SubnetId) {
self.handle_removed_subnet(subnet_id, SubscriptionKind::LongLived); self.handle_removed_subnet(subnet_id, SubscriptionKind::LongLived);
@ -576,12 +770,15 @@ impl<T: BeaconChainTypes> AttestationService<T> {
// subscription of the other kind. For long lived subscriptions, it also removes the // subscription of the other kind. For long lived subscriptions, it also removes the
// advertisement from our ENR. // advertisement from our ENR.
fn handle_removed_subnet(&mut self, subnet_id: SubnetId, subscription_kind: SubscriptionKind) { fn handle_removed_subnet(&mut self, subnet_id: SubnetId, subscription_kind: SubscriptionKind) {
let other_subscriptions = match subscription_kind { let exists_in_other_subscriptions = match subscription_kind {
SubscriptionKind::LongLived => &self.short_lived_subscriptions, SubscriptionKind::LongLived => self.short_lived_subscriptions.contains_key(&subnet_id),
SubscriptionKind::ShortLived => &self.long_lived_subscriptions, #[cfg(feature = "deterministic_long_lived_attnets")]
SubscriptionKind::ShortLived => self.long_lived_subscriptions.contains(&subnet_id),
#[cfg(not(feature = "deterministic_long_lived_attnets"))]
SubscriptionKind::ShortLived => self.long_lived_subscriptions.contains_key(&subnet_id),
}; };
if !other_subscriptions.contains_key(&subnet_id) { if !exists_in_other_subscriptions {
// Subscription no longer exists as short lived or long lived. // Subscription no longer exists as short lived or long lived.
debug!(self.log, "Unsubscribing from subnet"; "subnet" => ?subnet_id, "subscription_kind" => ?subscription_kind); debug!(self.log, "Unsubscribing from subnet"; "subnet" => ?subnet_id, "subscription_kind" => ?subscription_kind);
self.queue_event(SubnetServiceMessage::Unsubscribe(Subnet::Attestation( self.queue_event(SubnetServiceMessage::Unsubscribe(Subnet::Attestation(
@ -603,6 +800,7 @@ impl<T: BeaconChainTypes> AttestationService<T> {
/// We don't keep track of a specific validator to random subnet, rather the ratio of active /// We don't keep track of a specific validator to random subnet, rather the ratio of active
/// validators to random subnets. So when a validator goes offline, we can simply remove the /// validators to random subnets. So when a validator goes offline, we can simply remove the
/// allocated amount of random subnets. /// allocated amount of random subnets.
#[cfg(not(feature = "deterministic_long_lived_attnets"))]
fn handle_known_validator_expiry(&mut self) { fn handle_known_validator_expiry(&mut self) {
// Calculate how many subnets should we remove. // Calculate how many subnets should we remove.
let extra_subnet_count = { let extra_subnet_count = {
@ -659,6 +857,7 @@ impl<T: BeaconChainTypes> Stream for AttestationService<T> {
// Process first any known validator expiries, since these affect how many long lived // Process first any known validator expiries, since these affect how many long lived
// subnets we need. // subnets we need.
#[cfg(not(feature = "deterministic_long_lived_attnets"))]
match self.known_validators.poll_next_unpin(cx) { match self.known_validators.poll_next_unpin(cx) {
Poll::Ready(Some(Ok(_validator_index))) => { Poll::Ready(Some(Ok(_validator_index))) => {
self.handle_known_validator_expiry(); self.handle_known_validator_expiry();
@ -669,12 +868,19 @@ impl<T: BeaconChainTypes> Stream for AttestationService<T> {
Poll::Ready(None) | Poll::Pending => {} Poll::Ready(None) | Poll::Pending => {}
} }
#[cfg(feature = "deterministic_long_lived_attnets")]
match self.next_long_lived_subscription_event.as_mut().poll(cx) {
Poll::Ready(_) => self.recompute_long_lived_subnets(),
Poll::Pending => {}
}
// Process scheduled subscriptions that might be ready, since those can extend a soon to // Process scheduled subscriptions that might be ready, since those can extend a soon to
// expire subscription. // expire subscription.
match self.scheduled_short_lived_subscriptions.poll_next_unpin(cx) { match self.scheduled_short_lived_subscriptions.poll_next_unpin(cx) {
Poll::Ready(Some(Ok(ExactSubnet { subnet_id, slot }))) => { Poll::Ready(Some(Ok(ExactSubnet { subnet_id, slot }))) => {
if let Err(e) = self.subscribe_to_subnet_immediately( if let Err(e) = self.subscribe_to_subnet_immediately(
subnet_id, subnet_id,
#[cfg(not(feature = "deterministic_long_lived_attnets"))]
SubscriptionKind::ShortLived, SubscriptionKind::ShortLived,
slot + 1, slot + 1,
) { ) {
@ -699,6 +905,7 @@ impl<T: BeaconChainTypes> Stream for AttestationService<T> {
} }
// Process any random subnet expiries. // Process any random subnet expiries.
#[cfg(not(feature = "deterministic_long_lived_attnets"))]
match self.long_lived_subscriptions.poll_next_unpin(cx) { match self.long_lived_subscriptions.poll_next_unpin(cx) {
Poll::Ready(Some(Ok((subnet_id, _end_slot)))) => { Poll::Ready(Some(Ok((subnet_id, _end_slot)))) => {
self.handle_random_subnet_expiry(subnet_id) self.handle_random_subnet_expiry(subnet_id)

View File

@ -123,7 +123,15 @@ fn get_attestation_service(
let beacon_chain = CHAIN.chain.clone(); let beacon_chain = CHAIN.chain.clone();
AttestationService::new(beacon_chain, &config, &log) AttestationService::new(
beacon_chain,
#[cfg(feature = "deterministic_long_lived_attnets")]
lighthouse_network::discv5::enr::NodeId::random()
.raw()
.into(),
&config,
&log,
)
} }
fn get_sync_committee_service() -> SyncCommitteeService<TestBeaconChainType> { fn get_sync_committee_service() -> SyncCommitteeService<TestBeaconChainType> {
@ -170,6 +178,9 @@ async fn get_events<S: Stream<Item = SubnetServiceMessage> + Unpin>(
mod attestation_service { mod attestation_service {
#[cfg(feature = "deterministic_long_lived_attnets")]
use std::collections::HashSet;
use crate::subnet_service::attestation_subnets::MIN_PEER_DISCOVERY_SLOT_LOOK_AHEAD; use crate::subnet_service::attestation_subnets::MIN_PEER_DISCOVERY_SLOT_LOOK_AHEAD;
use super::*; use super::*;
@ -190,6 +201,7 @@ mod attestation_service {
} }
} }
#[cfg(not(feature = "deterministic_long_lived_attnets"))]
fn get_subscriptions( fn get_subscriptions(
validator_count: u64, validator_count: u64,
slot: Slot, slot: Slot,
@ -268,8 +280,7 @@ mod attestation_service {
// If the long lived and short lived subnets are the same, there should be no more events // If the long lived and short lived subnets are the same, there should be no more events
// as we don't resubscribe already subscribed subnets. // as we don't resubscribe already subscribed subnets.
if !attestation_service if !attestation_service
.subscriptions(attestation_subnets::SubscriptionKind::LongLived) .is_subscribed(&subnet_id, attestation_subnets::SubscriptionKind::LongLived)
.contains_key(&subnet_id)
{ {
assert_eq!(expected[..], events[3..]); assert_eq!(expected[..], events[3..]);
} }
@ -352,11 +363,12 @@ mod attestation_service {
let expected = SubnetServiceMessage::Subscribe(Subnet::Attestation(subnet_id1)); let expected = SubnetServiceMessage::Subscribe(Subnet::Attestation(subnet_id1));
// Should be still subscribed to 1 long lived and 1 short lived subnet if both are different. // Should be still subscribed to 1 long lived and 1 short lived subnet if both are
if !attestation_service // different.
.subscriptions(attestation_subnets::SubscriptionKind::LongLived) if !attestation_service.is_subscribed(
.contains_key(&subnet_id1) &subnet_id1,
{ attestation_subnets::SubscriptionKind::LongLived,
) {
assert_eq!(expected, events[3]); assert_eq!(expected, events[3]);
assert_eq!(attestation_service.subscription_count(), 2); assert_eq!(attestation_service.subscription_count(), 2);
} else { } else {
@ -366,11 +378,12 @@ mod attestation_service {
// Get event for 1 more slot duration, we should get the unsubscribe event now. // Get event for 1 more slot duration, we should get the unsubscribe event now.
let unsubscribe_event = get_events(&mut attestation_service, None, 1).await; let unsubscribe_event = get_events(&mut attestation_service, None, 1).await;
// If the long lived and short lived subnets are different, we should get an unsubscription event. // If the long lived and short lived subnets are different, we should get an unsubscription
if !attestation_service // event.
.subscriptions(attestation_subnets::SubscriptionKind::LongLived) if !attestation_service.is_subscribed(
.contains_key(&subnet_id1) &subnet_id1,
{ attestation_subnets::SubscriptionKind::LongLived,
) {
assert_eq!( assert_eq!(
[SubnetServiceMessage::Unsubscribe(Subnet::Attestation( [SubnetServiceMessage::Unsubscribe(Subnet::Attestation(
subnet_id1 subnet_id1
@ -383,6 +396,7 @@ mod attestation_service {
assert_eq!(attestation_service.subscription_count(), 1); assert_eq!(attestation_service.subscription_count(), 1);
} }
#[cfg(not(feature = "deterministic_long_lived_attnets"))]
#[tokio::test] #[tokio::test]
async fn subscribe_all_random_subnets() { async fn subscribe_all_random_subnets() {
let attestation_subnet_count = MainnetEthSpec::default_spec().attestation_subnet_count; let attestation_subnet_count = MainnetEthSpec::default_spec().attestation_subnet_count;
@ -440,6 +454,7 @@ mod attestation_service {
// test completed successfully // test completed successfully
} }
#[cfg(not(feature = "deterministic_long_lived_attnets"))]
#[tokio::test] #[tokio::test]
async fn subscribe_all_random_subnets_plus_one() { async fn subscribe_all_random_subnets_plus_one() {
let attestation_subnet_count = MainnetEthSpec::default_spec().attestation_subnet_count; let attestation_subnet_count = MainnetEthSpec::default_spec().attestation_subnet_count;
@ -573,10 +588,10 @@ mod attestation_service {
let expected_unsubscription = let expected_unsubscription =
SubnetServiceMessage::Unsubscribe(Subnet::Attestation(subnet_id1)); SubnetServiceMessage::Unsubscribe(Subnet::Attestation(subnet_id1));
if !attestation_service if !attestation_service.is_subscribed(
.subscriptions(attestation_subnets::SubscriptionKind::LongLived) &subnet_id1,
.contains_key(&subnet_id1) attestation_subnets::SubscriptionKind::LongLived,
{ ) {
assert_eq!(expected_subscription, events[3]); assert_eq!(expected_subscription, events[3]);
// fourth is a discovery event // fourth is a discovery event
assert_eq!(expected_unsubscription, events[5]); assert_eq!(expected_unsubscription, events[5]);
@ -600,10 +615,10 @@ mod attestation_service {
let second_subscribe_event = get_events(&mut attestation_service, None, 2).await; let second_subscribe_event = get_events(&mut attestation_service, None, 2).await;
// If the long lived and short lived subnets are different, we should get an unsubscription event. // If the long lived and short lived subnets are different, we should get an unsubscription event.
if !attestation_service if !attestation_service.is_subscribed(
.subscriptions(attestation_subnets::SubscriptionKind::LongLived) &subnet_id1,
.contains_key(&subnet_id1) attestation_subnets::SubscriptionKind::LongLived,
{ ) {
assert_eq!( assert_eq!(
[SubnetServiceMessage::Subscribe(Subnet::Attestation( [SubnetServiceMessage::Subscribe(Subnet::Attestation(
subnet_id1 subnet_id1
@ -612,6 +627,43 @@ mod attestation_service {
); );
} }
} }
#[tokio::test]
#[cfg(feature = "deterministic_long_lived_attnets")]
async fn test_update_deterministic_long_lived_subnets() {
let mut attestation_service = get_attestation_service(None);
let new_subnet = SubnetId::new(1);
let maintained_subnet = SubnetId::new(2);
let removed_subnet = SubnetId::new(3);
attestation_service
.set_long_lived_subscriptions(HashSet::from([removed_subnet, maintained_subnet]));
// clear initial events
let _events = get_events(&mut attestation_service, None, 1).await;
attestation_service
.update_long_lived_subnets_testing(HashSet::from([maintained_subnet, new_subnet]));
let events = get_events(&mut attestation_service, None, 1).await;
let new_subnet = Subnet::Attestation(new_subnet);
let removed_subnet = Subnet::Attestation(removed_subnet);
assert_eq!(
events,
[
// events for the new subnet
SubnetServiceMessage::Subscribe(new_subnet),
SubnetServiceMessage::EnrAdd(new_subnet),
SubnetServiceMessage::DiscoverPeers(vec![SubnetDiscovery {
subnet: new_subnet,
min_ttl: None
}]),
// events for the removed subnet
SubnetServiceMessage::Unsubscribe(removed_subnet),
SubnetServiceMessage::EnrRemove(removed_subnet),
]
);
println!("{events:?}")
}
} }
mod sync_committee_service { mod sync_committee_service {

View File

@ -161,6 +161,9 @@ pub struct ChainSpec {
pub attestation_subnet_count: u64, pub attestation_subnet_count: u64,
pub random_subnets_per_validator: u64, pub random_subnets_per_validator: u64,
pub epochs_per_random_subnet_subscription: u64, pub epochs_per_random_subnet_subscription: u64,
pub subnets_per_node: u8,
pub epochs_per_subnet_subscription: u64,
attestation_subnet_extra_bits: u8,
/* /*
* Application params * Application params
@ -427,6 +430,22 @@ impl ChainSpec {
Hash256::from(domain) Hash256::from(domain)
} }
#[allow(clippy::integer_arithmetic)]
pub const fn attestation_subnet_prefix_bits(&self) -> u32 {
// maybe use log2 when stable https://github.com/rust-lang/rust/issues/70887
// NOTE: this line is here simply to guarantee that if self.attestation_subnet_count type
// is changed, a compiler warning will be raised. This code depends on the type being u64.
let attestation_subnet_count: u64 = self.attestation_subnet_count;
let attestation_subnet_count_bits = if attestation_subnet_count == 0 {
0
} else {
63 - attestation_subnet_count.leading_zeros()
};
self.attestation_subnet_extra_bits as u32 + attestation_subnet_count_bits
}
/// Returns a `ChainSpec` compatible with the Ethereum Foundation specification. /// Returns a `ChainSpec` compatible with the Ethereum Foundation specification.
pub fn mainnet() -> Self { pub fn mainnet() -> Self {
Self { Self {
@ -576,9 +595,12 @@ impl ChainSpec {
attestation_propagation_slot_range: 32, attestation_propagation_slot_range: 32,
attestation_subnet_count: 64, attestation_subnet_count: 64,
random_subnets_per_validator: 1, random_subnets_per_validator: 1,
subnets_per_node: 1,
maximum_gossip_clock_disparity_millis: 500, maximum_gossip_clock_disparity_millis: 500,
target_aggregators_per_committee: 16, target_aggregators_per_committee: 16,
epochs_per_random_subnet_subscription: 256, epochs_per_random_subnet_subscription: 256,
epochs_per_subnet_subscription: 256,
attestation_subnet_extra_bits: 6,
/* /*
* Application specific * Application specific
@ -786,9 +808,12 @@ impl ChainSpec {
attestation_propagation_slot_range: 32, attestation_propagation_slot_range: 32,
attestation_subnet_count: 64, attestation_subnet_count: 64,
random_subnets_per_validator: 1, random_subnets_per_validator: 1,
subnets_per_node: 1,
maximum_gossip_clock_disparity_millis: 500, maximum_gossip_clock_disparity_millis: 500,
target_aggregators_per_committee: 16, target_aggregators_per_committee: 16,
epochs_per_random_subnet_subscription: 256, epochs_per_random_subnet_subscription: 256,
epochs_per_subnet_subscription: 256,
attestation_subnet_extra_bits: 6,
/* /*
* Application specific * Application specific

View File

@ -1,8 +1,9 @@
//! Identifies each shard by an integer identifier. //! Identifies each shard by an integer identifier.
use crate::{AttestationData, ChainSpec, CommitteeIndex, EthSpec, Slot}; use crate::{AttestationData, ChainSpec, CommitteeIndex, Epoch, EthSpec, Slot};
use safe_arith::{ArithError, SafeArith}; use safe_arith::{ArithError, SafeArith};
use serde_derive::{Deserialize, Serialize}; use serde_derive::{Deserialize, Serialize};
use std::ops::{Deref, DerefMut}; use std::ops::{Deref, DerefMut};
use swap_or_not_shuffle::compute_shuffled_index;
const MAX_SUBNET_ID: usize = 64; const MAX_SUBNET_ID: usize = 64;
@ -71,6 +72,45 @@ impl SubnetId {
.safe_rem(spec.attestation_subnet_count)? .safe_rem(spec.attestation_subnet_count)?
.into()) .into())
} }
#[allow(clippy::integer_arithmetic)]
/// Computes the set of subnets the node should be subscribed to during the current epoch,
/// along with the first epoch in which these subscriptions are no longer valid.
pub fn compute_subnets_for_epoch<T: EthSpec>(
node_id: ethereum_types::U256,
epoch: Epoch,
spec: &ChainSpec,
) -> Result<(impl Iterator<Item = SubnetId>, Epoch), &'static str> {
let node_id_prefix =
(node_id >> (256 - spec.attestation_subnet_prefix_bits() as usize)).as_usize();
let subscription_event_idx = epoch.as_u64() / spec.epochs_per_subnet_subscription;
let permutation_seed =
eth2_hashing::hash(&int_to_bytes::int_to_bytes8(subscription_event_idx));
let num_subnets = 1 << spec.attestation_subnet_prefix_bits();
let permutated_prefix = compute_shuffled_index(
node_id_prefix,
num_subnets,
&permutation_seed,
spec.shuffle_round_count,
)
.ok_or("Unable to shuffle")? as u64;
// Get the constants we need to avoid holding a reference to the spec
let &ChainSpec {
subnets_per_node,
attestation_subnet_count,
..
} = spec;
let subnet_set_generator = (0..subnets_per_node).map(move |idx| {
SubnetId::new((permutated_prefix + idx as u64) % attestation_subnet_count)
});
let valid_until_epoch = (subscription_event_idx + 1) * spec.epochs_per_subnet_subscription;
Ok((subnet_set_generator, valid_until_epoch.into()))
}
} }
impl Deref for SubnetId { impl Deref for SubnetId {