Shift subnet backbone structure (attnets revamp) (#4304)

This PR address the following spec change: https://github.com/ethereum/consensus-specs/pull/3312

Instead of subscribing to a long-lived subnet for every attached validator to a beacon node, all beacon nodes will subscribe to `SUBNETS_PER_NODE` long-lived subnets. This is currently set to 2 for mainnet. 

This PR does not include any scoring or advanced discovery mechanisms. A future PR will improve discovery and we can implement scoring after the next hard fork when we expect all client teams and all implementations to respect this spec change.

This will be a significant change in the subnet network structure for consensus clients and we will likely have to monitor and tweak our peer management logic.
This commit is contained in:
Age Manning 2023-05-30 06:15:56 +00:00
parent d150ccbee5
commit fdea8f2b27
11 changed files with 277 additions and 428 deletions

View File

@ -46,8 +46,4 @@ derivative = "2.2.0"
delay_map = "0.3.0" delay_map = "0.3.0"
ethereum-types = { version = "0.14.1", optional = true } ethereum-types = { version = "0.14.1", optional = true }
operation_pool = { path = "../operation_pool" } operation_pool = { path = "../operation_pool" }
execution_layer = { path = "../execution_layer" } execution_layer = { path = "../execution_layer" }
[features]
deterministic_long_lived_attnets = [ "ethereum-types" ]
# default = ["deterministic_long_lived_attnets"]

View File

@ -317,8 +317,7 @@ impl<T: BeaconChainTypes> NetworkService<T> {
// attestation subnet service // attestation subnet service
let attestation_service = AttestationService::new( let attestation_service = AttestationService::new(
beacon_chain.clone(), beacon_chain.clone(),
#[cfg(feature = "deterministic_long_lived_attnets")] network_globals.local_enr().node_id(),
network_globals.local_enr().node_id().raw().into(),
config, config,
&network_log, &network_log,
); );

View File

@ -3,7 +3,6 @@
//! determines whether attestations should be aggregated and/or passed to the beacon node. //! determines whether attestations should be aggregated and/or passed to the beacon node.
use super::SubnetServiceMessage; use super::SubnetServiceMessage;
#[cfg(any(test, feature = "deterministic_long_lived_attnets"))]
use std::collections::HashSet; use std::collections::HashSet;
use std::collections::{HashMap, VecDeque}; use std::collections::{HashMap, VecDeque};
use std::pin::Pin; use std::pin::Pin;
@ -14,10 +13,8 @@ use std::time::Duration;
use beacon_chain::{BeaconChain, BeaconChainTypes}; use beacon_chain::{BeaconChain, BeaconChainTypes};
use delay_map::{HashMapDelay, HashSetDelay}; use delay_map::{HashMapDelay, HashSetDelay};
use futures::prelude::*; use futures::prelude::*;
use lighthouse_network::{NetworkConfig, Subnet, SubnetDiscovery}; use lighthouse_network::{discv5::enr::NodeId, NetworkConfig, Subnet, SubnetDiscovery};
#[cfg(not(feature = "deterministic_long_lived_attnets"))] use slog::{debug, error, info, o, trace, warn};
use rand::seq::SliceRandom;
use slog::{debug, error, o, trace, warn};
use slot_clock::SlotClock; use slot_clock::SlotClock;
use types::{Attestation, EthSpec, Slot, SubnetId, ValidatorSubscription}; use types::{Attestation, EthSpec, Slot, SubnetId, ValidatorSubscription};
@ -27,10 +24,6 @@ use crate::metrics;
/// slot is less than this number, skip the peer discovery process. /// slot is less than this number, skip the peer discovery process.
/// Subnet discovery query takes at most 30 secs, 2 slots take 24s. /// Subnet discovery query takes at most 30 secs, 2 slots take 24s.
pub(crate) const MIN_PEER_DISCOVERY_SLOT_LOOK_AHEAD: u64 = 2; pub(crate) const MIN_PEER_DISCOVERY_SLOT_LOOK_AHEAD: u64 = 2;
/// The time (in slots) before a last seen validator is considered absent and we unsubscribe from
/// the random gossip topics that we subscribed to due to the validator connection.
#[cfg(not(feature = "deterministic_long_lived_attnets"))]
const LAST_SEEN_VALIDATOR_TIMEOUT_SLOTS: u32 = 150;
/// The fraction of a slot that we subscribe to a subnet before the required slot. /// The fraction of a slot that we subscribe to a subnet before the required slot.
/// ///
/// Currently a whole slot ahead. /// Currently a whole slot ahead.
@ -67,30 +60,23 @@ pub struct AttestationService<T: BeaconChainTypes> {
/// Subnets we are currently subscribed to as short lived subscriptions. /// Subnets we are currently subscribed to as short lived subscriptions.
/// ///
/// Once they expire, we unsubscribe from these. /// Once they expire, we unsubscribe from these.
/// We subscribe to subnets when we are an aggregator for an exact subnet.
short_lived_subscriptions: HashMapDelay<SubnetId, Slot>, short_lived_subscriptions: HashMapDelay<SubnetId, Slot>,
/// Subnets we are currently subscribed to as long lived subscriptions. /// Subnets we are currently subscribed to as long lived subscriptions.
/// ///
/// We advertise these in our ENR. When these expire, the subnet is removed from our ENR. /// We advertise these in our ENR. When these expire, the subnet is removed from our ENR.
#[cfg(feature = "deterministic_long_lived_attnets")] /// These are required of all beacon nodes. The exact number is determined by the chain
/// specification.
long_lived_subscriptions: HashSet<SubnetId>, long_lived_subscriptions: HashSet<SubnetId>,
#[cfg(not(feature = "deterministic_long_lived_attnets"))]
long_lived_subscriptions: HashMapDelay<SubnetId, Slot>,
/// Short lived subscriptions that need to be done in the future. /// Short lived subscriptions that need to be executed in the future.
scheduled_short_lived_subscriptions: HashSetDelay<ExactSubnet>, scheduled_short_lived_subscriptions: HashSetDelay<ExactSubnet>,
/// A collection timeouts to track the existence of aggregate validator subscriptions at an /// A collection timeouts to track the existence of aggregate validator subscriptions at an
/// `ExactSubnet`. /// `ExactSubnet`.
aggregate_validators_on_subnet: Option<HashSetDelay<ExactSubnet>>, aggregate_validators_on_subnet: Option<HashSetDelay<ExactSubnet>>,
/// A collection of seen validators. These dictate how many random subnets we should be
/// subscribed to. As these time out, we unsubscribe for the required random subnets and update
/// our ENR.
/// This is a set of validator indices.
#[cfg(not(feature = "deterministic_long_lived_attnets"))]
known_validators: HashSetDelay<u64>,
/// The waker for the current thread. /// The waker for the current thread.
waker: Option<std::task::Waker>, waker: Option<std::task::Waker>,
@ -100,16 +86,10 @@ pub struct AttestationService<T: BeaconChainTypes> {
/// We are always subscribed to all subnets. /// We are always subscribed to all subnets.
subscribe_all_subnets: bool, subscribe_all_subnets: bool,
/// For how many slots we subscribe to long lived subnets.
#[cfg(not(feature = "deterministic_long_lived_attnets"))]
long_lived_subnet_subscription_slots: u64,
/// Our Discv5 node_id. /// Our Discv5 node_id.
#[cfg(feature = "deterministic_long_lived_attnets")] node_id: NodeId,
node_id: ethereum_types::U256,
/// Future used to manage subscribing and unsubscribing from long lived subnets. /// Future used to manage subscribing and unsubscribing from long lived subnets.
#[cfg(feature = "deterministic_long_lived_attnets")]
next_long_lived_subscription_event: Pin<Box<tokio::time::Sleep>>, next_long_lived_subscription_event: Pin<Box<tokio::time::Sleep>>,
/// Whether this node is a block proposer-only node. /// Whether this node is a block proposer-only node.
@ -122,62 +102,22 @@ pub struct AttestationService<T: BeaconChainTypes> {
impl<T: BeaconChainTypes> AttestationService<T> { impl<T: BeaconChainTypes> AttestationService<T> {
/* Public functions */ /* Public functions */
#[cfg(not(feature = "deterministic_long_lived_attnets"))] /// Establish the service based on the passed configuration.
pub fn new( pub fn new(
beacon_chain: Arc<BeaconChain<T>>, beacon_chain: Arc<BeaconChain<T>>,
node_id: NodeId,
config: &NetworkConfig, config: &NetworkConfig,
log: &slog::Logger, log: &slog::Logger,
) -> Self { ) -> Self {
let log = log.new(o!("service" => "attestation_service")); let log = log.new(o!("service" => "attestation_service"));
// Calculate the random subnet duration from the spec constants.
let spec = &beacon_chain.spec;
let slot_duration = beacon_chain.slot_clock.slot_duration(); let slot_duration = beacon_chain.slot_clock.slot_duration();
let long_lived_subnet_subscription_slots = spec
.epochs_per_random_subnet_subscription
.saturating_mul(T::EthSpec::slots_per_epoch());
let long_lived_subscription_duration = Duration::from_millis(
slot_duration.as_millis() as u64 * long_lived_subnet_subscription_slots,
);
// Panics on overflow. Ensure LAST_SEEN_VALIDATOR_TIMEOUT_SLOTS is not too large. if config.subscribe_all_subnets {
let last_seen_val_timeout = slot_duration slog::info!(log, "Subscribing to all subnets");
.checked_mul(LAST_SEEN_VALIDATOR_TIMEOUT_SLOTS) } else {
.expect("LAST_SEEN_VALIDATOR_TIMEOUT must not be ridiculously large"); slog::info!(log, "Deterministic long lived subnets enabled"; "subnets_per_node" => beacon_chain.spec.subnets_per_node, "subscription_duration_in_epochs" => beacon_chain.spec.epochs_per_subnet_subscription);
let track_validators = !config.import_all_attestations;
let aggregate_validators_on_subnet =
track_validators.then(|| HashSetDelay::new(slot_duration));
AttestationService {
events: VecDeque::with_capacity(10),
beacon_chain,
short_lived_subscriptions: HashMapDelay::new(slot_duration),
long_lived_subscriptions: HashMapDelay::new(long_lived_subscription_duration),
scheduled_short_lived_subscriptions: HashSetDelay::default(),
aggregate_validators_on_subnet,
known_validators: HashSetDelay::new(last_seen_val_timeout),
waker: None,
discovery_disabled: config.disable_discovery,
proposer_only: config.proposer_only,
subscribe_all_subnets: config.subscribe_all_subnets,
long_lived_subnet_subscription_slots,
log,
} }
}
#[cfg(feature = "deterministic_long_lived_attnets")]
pub fn new(
beacon_chain: Arc<BeaconChain<T>>,
node_id: ethereum_types::U256,
config: &NetworkConfig,
log: &slog::Logger,
) -> Self {
let log = log.new(o!("service" => "attestation_service"));
// Calculate the random subnet duration from the spec constants.
let slot_duration = beacon_chain.slot_clock.slot_duration();
slog::info!(log, "Deterministic long lived subnets enabled"; "subnets_per_node" => beacon_chain.spec.subnets_per_node);
let track_validators = !config.import_all_attestations; let track_validators = !config.import_all_attestations;
let aggregate_validators_on_subnet = let aggregate_validators_on_subnet =
@ -198,9 +138,15 @@ impl<T: BeaconChainTypes> AttestationService<T> {
// value with a smarter timing // value with a smarter timing
Box::pin(tokio::time::sleep(Duration::from_secs(1))) Box::pin(tokio::time::sleep(Duration::from_secs(1)))
}, },
proposer_only: config.proposer_only,
log, log,
}; };
service.recompute_long_lived_subnets();
// If we are not subscribed to all subnets, handle the deterministic set of subnets
if !config.subscribe_all_subnets {
service.recompute_long_lived_subnets();
}
service service
} }
@ -210,20 +156,12 @@ impl<T: BeaconChainTypes> AttestationService<T> {
if self.subscribe_all_subnets { if self.subscribe_all_subnets {
self.beacon_chain.spec.attestation_subnet_count as usize self.beacon_chain.spec.attestation_subnet_count as usize
} else { } else {
#[cfg(feature = "deterministic_long_lived_attnets")]
let count = self let count = self
.short_lived_subscriptions .short_lived_subscriptions
.keys() .keys()
.chain(self.long_lived_subscriptions.iter()) .chain(self.long_lived_subscriptions.iter())
.collect::<HashSet<_>>() .collect::<HashSet<_>>()
.len(); .len();
#[cfg(not(feature = "deterministic_long_lived_attnets"))]
let count = self
.short_lived_subscriptions
.keys()
.chain(self.long_lived_subscriptions.keys())
.collect::<HashSet<_>>()
.len();
count count
} }
} }
@ -236,20 +174,20 @@ impl<T: BeaconChainTypes> AttestationService<T> {
subscription_kind: SubscriptionKind, subscription_kind: SubscriptionKind,
) -> bool { ) -> bool {
match subscription_kind { match subscription_kind {
#[cfg(feature = "deterministic_long_lived_attnets")]
SubscriptionKind::LongLived => self.long_lived_subscriptions.contains(subnet_id), SubscriptionKind::LongLived => self.long_lived_subscriptions.contains(subnet_id),
#[cfg(not(feature = "deterministic_long_lived_attnets"))]
SubscriptionKind::LongLived => self.long_lived_subscriptions.contains_key(subnet_id),
SubscriptionKind::ShortLived => self.short_lived_subscriptions.contains_key(subnet_id), SubscriptionKind::ShortLived => self.short_lived_subscriptions.contains_key(subnet_id),
} }
} }
#[cfg(test)]
pub(crate) fn long_lived_subscriptions(&self) -> &HashSet<SubnetId> {
&self.long_lived_subscriptions
}
/// Processes a list of validator subscriptions. /// Processes a list of validator subscriptions.
/// ///
/// This will: /// This will:
/// - Register new validators as being known. /// - Register new validators as being known.
/// - Subscribe to the required number of random subnets.
/// - Update the local ENR for new random subnets due to seeing new validators.
/// - Search for peers for required subnets. /// - Search for peers for required subnets.
/// - Request subscriptions for subnets on specific slots when required. /// - Request subscriptions for subnets on specific slots when required.
/// - Build the timeouts for each of these events. /// - Build the timeouts for each of these events.
@ -267,18 +205,17 @@ impl<T: BeaconChainTypes> AttestationService<T> {
// Maps each subnet_id subscription to it's highest slot // Maps each subnet_id subscription to it's highest slot
let mut subnets_to_discover: HashMap<SubnetId, Slot> = HashMap::new(); let mut subnets_to_discover: HashMap<SubnetId, Slot> = HashMap::new();
// Registers the validator with the attestation service.
for subscription in subscriptions { for subscription in subscriptions {
metrics::inc_counter(&metrics::SUBNET_SUBSCRIPTION_REQUESTS); metrics::inc_counter(&metrics::SUBNET_SUBSCRIPTION_REQUESTS);
// Registers the validator with the attestation service.
// This will subscribe to long-lived random subnets if required.
trace!(self.log, trace!(self.log,
"Validator subscription"; "Validator subscription";
"subscription" => ?subscription, "subscription" => ?subscription,
); );
#[cfg(not(feature = "deterministic_long_lived_attnets"))]
self.add_known_validator(subscription.validator_index);
// Compute the subnet that is associated with this subscription
let subnet_id = match SubnetId::compute_subnet::<T::EthSpec>( let subnet_id = match SubnetId::compute_subnet::<T::EthSpec>(
subscription.slot, subscription.slot,
subscription.attestation_committee_index, subscription.attestation_committee_index,
@ -316,7 +253,7 @@ impl<T: BeaconChainTypes> AttestationService<T> {
if subscription.is_aggregator { if subscription.is_aggregator {
metrics::inc_counter(&metrics::SUBNET_SUBSCRIPTION_AGGREGATOR_REQUESTS); metrics::inc_counter(&metrics::SUBNET_SUBSCRIPTION_AGGREGATOR_REQUESTS);
if let Err(e) = self.subscribe_to_subnet(exact_subnet) { if let Err(e) = self.subscribe_to_short_lived_subnet(exact_subnet) {
warn!(self.log, warn!(self.log,
"Subscription to subnet error"; "Subscription to subnet error";
"error" => e, "error" => e,
@ -347,14 +284,13 @@ impl<T: BeaconChainTypes> AttestationService<T> {
Ok(()) Ok(())
} }
#[cfg(feature = "deterministic_long_lived_attnets")]
fn recompute_long_lived_subnets(&mut self) { fn recompute_long_lived_subnets(&mut self) {
// Ensure the next computation is scheduled even if assigning subnets fails. // Ensure the next computation is scheduled even if assigning subnets fails.
let next_subscription_event = self let next_subscription_event = self
.recompute_long_lived_subnets_inner() .recompute_long_lived_subnets_inner()
.unwrap_or_else(|_| self.beacon_chain.slot_clock.slot_duration()); .unwrap_or_else(|_| self.beacon_chain.slot_clock.slot_duration());
debug!(self.log, "Recomputing deterministic long lived attnets"); debug!(self.log, "Recomputing deterministic long lived subnets");
self.next_long_lived_subscription_event = self.next_long_lived_subscription_event =
Box::pin(tokio::time::sleep(next_subscription_event)); Box::pin(tokio::time::sleep(next_subscription_event));
@ -365,14 +301,13 @@ impl<T: BeaconChainTypes> AttestationService<T> {
/// Gets the long lived subnets the node should be subscribed to during the current epoch and /// Gets the long lived subnets the node should be subscribed to during the current epoch and
/// the remaining duration for which they remain valid. /// the remaining duration for which they remain valid.
#[cfg(feature = "deterministic_long_lived_attnets")]
fn recompute_long_lived_subnets_inner(&mut self) -> Result<Duration, ()> { fn recompute_long_lived_subnets_inner(&mut self) -> Result<Duration, ()> {
let current_epoch = self.beacon_chain.epoch().map_err( let current_epoch = self.beacon_chain.epoch().map_err(
|e| error!(self.log, "Failed to get the current epoch from clock"; "err" => ?e), |e| error!(self.log, "Failed to get the current epoch from clock"; "err" => ?e),
)?; )?;
let (subnets, next_subscription_epoch) = SubnetId::compute_subnets_for_epoch::<T::EthSpec>( let (subnets, next_subscription_epoch) = SubnetId::compute_subnets_for_epoch::<T::EthSpec>(
self.node_id, self.node_id.raw().into(),
current_epoch, current_epoch,
&self.beacon_chain.spec, &self.beacon_chain.spec,
) )
@ -396,17 +331,12 @@ impl<T: BeaconChainTypes> AttestationService<T> {
Ok(next_subscription_event) Ok(next_subscription_event)
} }
#[cfg(all(test, feature = "deterministic_long_lived_attnets"))]
pub fn update_long_lived_subnets_testing(&mut self, subnets: HashSet<SubnetId>) {
self.update_long_lived_subnets(subnets)
}
/// Updates the long lived subnets. /// Updates the long lived subnets.
/// ///
/// New subnets are registered as subscribed, removed subnets as unsubscribed and the Enr /// New subnets are registered as subscribed, removed subnets as unsubscribed and the Enr
/// updated accordingly. /// updated accordingly.
#[cfg(feature = "deterministic_long_lived_attnets")]
fn update_long_lived_subnets(&mut self, mut subnets: HashSet<SubnetId>) { fn update_long_lived_subnets(&mut self, mut subnets: HashSet<SubnetId>) {
info!(self.log, "Subscribing to long-lived subnets"; "subnets" => ?subnets.iter().collect::<Vec<_>>());
for subnet in &subnets { for subnet in &subnets {
// Add the events for those subnets that are new as long lived subscriptions. // Add the events for those subnets that are new as long lived subscriptions.
if !self.long_lived_subscriptions.contains(subnet) { if !self.long_lived_subscriptions.contains(subnet) {
@ -430,28 +360,15 @@ impl<T: BeaconChainTypes> AttestationService<T> {
} }
} }
// Check for subnets that are being removed // Update the long_lived_subnets set and check for subnets that are being removed
std::mem::swap(&mut self.long_lived_subscriptions, &mut subnets); std::mem::swap(&mut self.long_lived_subscriptions, &mut subnets);
for subnet in subnets { for subnet in subnets {
if !self.long_lived_subscriptions.contains(&subnet) { if !self.long_lived_subscriptions.contains(&subnet) {
if !self.short_lived_subscriptions.contains_key(&subnet) { self.handle_removed_subnet(subnet, SubscriptionKind::LongLived);
debug!(self.log, "Unsubscribing from subnet"; "subnet" => ?subnet, "subscription_kind" => ?SubscriptionKind::LongLived);
self.queue_event(SubnetServiceMessage::Unsubscribe(Subnet::Attestation(
subnet,
)));
}
self.queue_event(SubnetServiceMessage::EnrRemove(Subnet::Attestation(subnet)));
} }
} }
} }
/// Overwrites the long lived subscriptions for testing.
#[cfg(all(test, feature = "deterministic_long_lived_attnets"))]
pub fn set_long_lived_subscriptions(&mut self, subnets: HashSet<SubnetId>) {
self.long_lived_subscriptions = subnets
}
/// Checks if we have subscribed aggregate validators for the subnet. If not, checks the gossip /// Checks if we have subscribed aggregate validators for the subnet. If not, checks the gossip
/// verification, re-propagates and returns false. /// verification, re-propagates and returns false.
pub fn should_process_attestation( pub fn should_process_attestation(
@ -535,7 +452,7 @@ impl<T: BeaconChainTypes> AttestationService<T> {
} }
// Subscribes to the subnet if it should be done immediately, or schedules it if required. // Subscribes to the subnet if it should be done immediately, or schedules it if required.
fn subscribe_to_subnet( fn subscribe_to_short_lived_subnet(
&mut self, &mut self,
ExactSubnet { subnet_id, slot }: ExactSubnet, ExactSubnet { subnet_id, slot }: ExactSubnet,
) -> Result<(), &'static str> { ) -> Result<(), &'static str> {
@ -564,12 +481,7 @@ impl<T: BeaconChainTypes> AttestationService<T> {
// immediately. // immediately.
if time_to_subscription_start.is_zero() { if time_to_subscription_start.is_zero() {
// This is a current or past slot, we subscribe immediately. // This is a current or past slot, we subscribe immediately.
self.subscribe_to_subnet_immediately( self.subscribe_to_short_lived_subnet_immediately(subnet_id, slot + 1)?;
subnet_id,
#[cfg(not(feature = "deterministic_long_lived_attnets"))]
SubscriptionKind::ShortLived,
slot + 1,
)?;
} else { } else {
// This is a future slot, schedule subscribing. // This is a future slot, schedule subscribing.
trace!(self.log, "Scheduling subnet subscription"; "subnet" => ?subnet_id, "time_to_subscription_start" => ?time_to_subscription_start); trace!(self.log, "Scheduling subnet subscription"; "subnet" => ?subnet_id, "time_to_subscription_start" => ?time_to_subscription_start);
@ -580,79 +492,6 @@ impl<T: BeaconChainTypes> AttestationService<T> {
Ok(()) Ok(())
} }
/// Updates the `known_validators` mapping and subscribes to long lived subnets if required.
#[cfg(not(feature = "deterministic_long_lived_attnets"))]
fn add_known_validator(&mut self, validator_index: u64) {
let previously_known = self.known_validators.contains_key(&validator_index);
// Add the new validator or update the current timeout for a known validator.
self.known_validators.insert(validator_index);
if !previously_known {
// New validator has subscribed.
// Subscribe to random topics and update the ENR if needed.
self.subscribe_to_random_subnets();
}
}
/// Subscribe to long-lived random subnets and update the local ENR bitfield.
/// The number of subnets to subscribe depends on the number of active validators and number of
/// current subscriptions.
#[cfg(not(feature = "deterministic_long_lived_attnets"))]
fn subscribe_to_random_subnets(&mut self) {
if self.subscribe_all_subnets {
// This case is not handled by this service.
return;
}
let max_subnets = self.beacon_chain.spec.attestation_subnet_count;
// Calculate how many subnets we need,
let required_long_lived_subnets = {
let subnets_for_validators = self
.known_validators
.len()
.saturating_mul(self.beacon_chain.spec.random_subnets_per_validator as usize);
subnets_for_validators // How many subnets we need
.min(max_subnets as usize) // Capped by the max
.saturating_sub(self.long_lived_subscriptions.len()) // Minus those we have
};
if required_long_lived_subnets == 0 {
// Nothing to do.
return;
}
// Build a list of the subnets that we are not currently advertising.
let available_subnets = (0..max_subnets)
.map(SubnetId::new)
.filter(|subnet_id| !self.long_lived_subscriptions.contains_key(subnet_id))
.collect::<Vec<_>>();
let subnets_to_subscribe: Vec<_> = available_subnets
.choose_multiple(&mut rand::thread_rng(), required_long_lived_subnets)
.cloned()
.collect();
// Calculate in which slot does this subscription end.
let end_slot = match self.beacon_chain.slot_clock.now() {
Some(slot) => slot + self.long_lived_subnet_subscription_slots,
None => {
return debug!(
self.log,
"Failed to calculate end slot of long lived subnet subscriptions."
)
}
};
for subnet_id in &subnets_to_subscribe {
if let Err(e) = self.subscribe_to_subnet_immediately(
*subnet_id,
SubscriptionKind::LongLived,
end_slot,
) {
debug!(self.log, "Failed to subscribe to long lived subnet"; "subnet" => ?subnet_id, "err" => e);
}
}
}
/* A collection of functions that handle the various timeouts */ /* A collection of functions that handle the various timeouts */
/// Registers a subnet as subscribed. /// Registers a subnet as subscribed.
@ -662,11 +501,9 @@ impl<T: BeaconChainTypes> AttestationService<T> {
/// out the appropriate events. /// out the appropriate events.
/// ///
/// On determinist long lived subnets, this is only used for short lived subscriptions. /// On determinist long lived subnets, this is only used for short lived subscriptions.
fn subscribe_to_subnet_immediately( fn subscribe_to_short_lived_subnet_immediately(
&mut self, &mut self,
subnet_id: SubnetId, subnet_id: SubnetId,
#[cfg(not(feature = "deterministic_long_lived_attnets"))]
subscription_kind: SubscriptionKind,
end_slot: Slot, end_slot: Slot,
) -> Result<(), &'static str> { ) -> Result<(), &'static str> {
if self.subscribe_all_subnets { if self.subscribe_all_subnets {
@ -685,25 +522,12 @@ impl<T: BeaconChainTypes> AttestationService<T> {
return Err("Time when subscription would end has already passed."); return Err("Time when subscription would end has already passed.");
} }
#[cfg(feature = "deterministic_long_lived_attnets")]
let subscription_kind = SubscriptionKind::ShortLived; let subscription_kind = SubscriptionKind::ShortLived;
// We need to check and add a subscription for the right kind, regardless of the presence // We need to check and add a subscription for the right kind, regardless of the presence
// of the subnet as a subscription of the other kind. This is mainly since long lived // of the subnet as a subscription of the other kind. This is mainly since long lived
// subscriptions can be removed at any time when a validator goes offline. // subscriptions can be removed at any time when a validator goes offline.
#[cfg(not(feature = "deterministic_long_lived_attnets"))]
let (subscriptions, already_subscribed_as_other_kind) = match subscription_kind {
SubscriptionKind::ShortLived => (
&mut self.short_lived_subscriptions,
self.long_lived_subscriptions.contains_key(&subnet_id),
),
SubscriptionKind::LongLived => (
&mut self.long_lived_subscriptions,
self.short_lived_subscriptions.contains_key(&subnet_id),
),
};
#[cfg(feature = "deterministic_long_lived_attnets")]
let (subscriptions, already_subscribed_as_other_kind) = ( let (subscriptions, already_subscribed_as_other_kind) = (
&mut self.short_lived_subscriptions, &mut self.short_lived_subscriptions,
self.long_lived_subscriptions.contains(&subnet_id), self.long_lived_subscriptions.contains(&subnet_id),
@ -738,57 +562,19 @@ impl<T: BeaconChainTypes> AttestationService<T> {
subnet_id, subnet_id,
))); )));
} }
// If this is a new long lived subscription, send out the appropriate events.
#[cfg(not(feature = "deterministic_long_lived_attnets"))]
if SubscriptionKind::LongLived == subscription_kind {
let subnet = Subnet::Attestation(subnet_id);
// Advertise this subnet in our ENR.
self.long_lived_subscriptions.insert_at(
subnet_id,
end_slot,
time_to_subscription_end,
);
self.queue_event(SubnetServiceMessage::EnrAdd(subnet));
if !self.discovery_disabled {
self.queue_event(SubnetServiceMessage::DiscoverPeers(vec![
SubnetDiscovery {
subnet,
min_ttl: None,
},
]))
}
}
} }
} }
Ok(()) Ok(())
} }
/// A random subnet has expired.
///
/// This function selects a new subnet to join, or extends the expiry if there are no more
/// available subnets to choose from.
#[cfg(not(feature = "deterministic_long_lived_attnets"))]
fn handle_random_subnet_expiry(&mut self, subnet_id: SubnetId) {
self.handle_removed_subnet(subnet_id, SubscriptionKind::LongLived);
// Remove the ENR bitfield bit and choose a new random on from the available subnets
// Subscribe to a new random subnet.
self.subscribe_to_random_subnets();
}
// Unsubscribes from a subnet that was removed if it does not continue to exist as a // Unsubscribes from a subnet that was removed if it does not continue to exist as a
// subscription of the other kind. For long lived subscriptions, it also removes the // subscription of the other kind. For long lived subscriptions, it also removes the
// advertisement from our ENR. // advertisement from our ENR.
fn handle_removed_subnet(&mut self, subnet_id: SubnetId, subscription_kind: SubscriptionKind) { fn handle_removed_subnet(&mut self, subnet_id: SubnetId, subscription_kind: SubscriptionKind) {
let exists_in_other_subscriptions = match subscription_kind { let exists_in_other_subscriptions = match subscription_kind {
SubscriptionKind::LongLived => self.short_lived_subscriptions.contains_key(&subnet_id), SubscriptionKind::LongLived => self.short_lived_subscriptions.contains_key(&subnet_id),
#[cfg(feature = "deterministic_long_lived_attnets")]
SubscriptionKind::ShortLived => self.long_lived_subscriptions.contains(&subnet_id), SubscriptionKind::ShortLived => self.long_lived_subscriptions.contains(&subnet_id),
#[cfg(not(feature = "deterministic_long_lived_attnets"))]
SubscriptionKind::ShortLived => self.long_lived_subscriptions.contains_key(&subnet_id),
}; };
if !exists_in_other_subscriptions { if !exists_in_other_subscriptions {
@ -806,48 +592,6 @@ impl<T: BeaconChainTypes> AttestationService<T> {
))); )));
} }
} }
/// A known validator has not sent a subscription in a while. They are considered offline and the
/// beacon node no longer needs to be subscribed to the allocated random subnets.
///
/// We don't keep track of a specific validator to random subnet, rather the ratio of active
/// validators to random subnets. So when a validator goes offline, we can simply remove the
/// allocated amount of random subnets.
#[cfg(not(feature = "deterministic_long_lived_attnets"))]
fn handle_known_validator_expiry(&mut self) {
// Calculate how many subnets should we remove.
let extra_subnet_count = {
let max_subnets = self.beacon_chain.spec.attestation_subnet_count;
let subnets_for_validators = self
.known_validators
.len()
.saturating_mul(self.beacon_chain.spec.random_subnets_per_validator as usize)
.min(max_subnets as usize);
self.long_lived_subscriptions
.len()
.saturating_sub(subnets_for_validators)
};
if extra_subnet_count == 0 {
// Nothing to do
return;
}
let advertised_subnets = self
.long_lived_subscriptions
.keys()
.cloned()
.collect::<Vec<_>>();
let to_remove_subnets = advertised_subnets
.choose_multiple(&mut rand::thread_rng(), extra_subnet_count)
.cloned();
for subnet_id in to_remove_subnets {
self.long_lived_subscriptions.remove(&subnet_id);
self.handle_removed_subnet(subnet_id, SubscriptionKind::LongLived);
}
}
} }
impl<T: BeaconChainTypes> Stream for AttestationService<T> { impl<T: BeaconChainTypes> Stream for AttestationService<T> {
@ -868,37 +612,34 @@ impl<T: BeaconChainTypes> Stream for AttestationService<T> {
return Poll::Ready(Some(event)); return Poll::Ready(Some(event));
} }
// Process first any known validator expiries, since these affect how many long lived // If we aren't subscribed to all subnets, handle the deterministic long-lived subnets
// subnets we need. if !self.subscribe_all_subnets {
#[cfg(not(feature = "deterministic_long_lived_attnets"))] match self.next_long_lived_subscription_event.as_mut().poll(cx) {
match self.known_validators.poll_next_unpin(cx) { Poll::Ready(_) => {
Poll::Ready(Some(Ok(_validator_index))) => { self.recompute_long_lived_subnets();
self.handle_known_validator_expiry(); // We re-wake the task as there could be other subscriptions to process
self.waker
.as_ref()
.expect("Waker has been set")
.wake_by_ref();
}
Poll::Pending => {}
} }
Poll::Ready(Some(Err(e))) => {
error!(self.log, "Failed to check for random subnet cycles"; "error"=> e);
}
Poll::Ready(None) | Poll::Pending => {}
}
#[cfg(feature = "deterministic_long_lived_attnets")]
match self.next_long_lived_subscription_event.as_mut().poll(cx) {
Poll::Ready(_) => self.recompute_long_lived_subnets(),
Poll::Pending => {}
} }
// Process scheduled subscriptions that might be ready, since those can extend a soon to // Process scheduled subscriptions that might be ready, since those can extend a soon to
// expire subscription. // expire subscription.
match self.scheduled_short_lived_subscriptions.poll_next_unpin(cx) { match self.scheduled_short_lived_subscriptions.poll_next_unpin(cx) {
Poll::Ready(Some(Ok(ExactSubnet { subnet_id, slot }))) => { Poll::Ready(Some(Ok(ExactSubnet { subnet_id, slot }))) => {
if let Err(e) = self.subscribe_to_subnet_immediately( if let Err(e) =
subnet_id, self.subscribe_to_short_lived_subnet_immediately(subnet_id, slot + 1)
#[cfg(not(feature = "deterministic_long_lived_attnets"))] {
SubscriptionKind::ShortLived,
slot + 1,
) {
debug!(self.log, "Failed to subscribe to short lived subnet"; "subnet" => ?subnet_id, "err" => e); debug!(self.log, "Failed to subscribe to short lived subnet"; "subnet" => ?subnet_id, "err" => e);
} }
self.waker
.as_ref()
.expect("Waker has been set")
.wake_by_ref();
} }
Poll::Ready(Some(Err(e))) => { Poll::Ready(Some(Err(e))) => {
error!(self.log, "Failed to check for scheduled subnet subscriptions"; "error"=> e); error!(self.log, "Failed to check for scheduled subnet subscriptions"; "error"=> e);
@ -910,6 +651,11 @@ impl<T: BeaconChainTypes> Stream for AttestationService<T> {
match self.short_lived_subscriptions.poll_next_unpin(cx) { match self.short_lived_subscriptions.poll_next_unpin(cx) {
Poll::Ready(Some(Ok((subnet_id, _end_slot)))) => { Poll::Ready(Some(Ok((subnet_id, _end_slot)))) => {
self.handle_removed_subnet(subnet_id, SubscriptionKind::ShortLived); self.handle_removed_subnet(subnet_id, SubscriptionKind::ShortLived);
// We re-wake the task as there could be other subscriptions to process
self.waker
.as_ref()
.expect("Waker has been set")
.wake_by_ref();
} }
Poll::Ready(Some(Err(e))) => { Poll::Ready(Some(Err(e))) => {
error!(self.log, "Failed to check for subnet unsubscription times"; "error"=> e); error!(self.log, "Failed to check for subnet unsubscription times"; "error"=> e);
@ -917,18 +663,6 @@ impl<T: BeaconChainTypes> Stream for AttestationService<T> {
Poll::Ready(None) | Poll::Pending => {} Poll::Ready(None) | Poll::Pending => {}
} }
// Process any random subnet expiries.
#[cfg(not(feature = "deterministic_long_lived_attnets"))]
match self.long_lived_subscriptions.poll_next_unpin(cx) {
Poll::Ready(Some(Ok((subnet_id, _end_slot)))) => {
self.handle_random_subnet_expiry(subnet_id)
}
Poll::Ready(Some(Err(e))) => {
error!(self.log, "Failed to check for random subnet cycles"; "error"=> e);
}
Poll::Ready(None) | Poll::Pending => {}
}
// Poll to remove entries on expiration, no need to act on expiration events. // Poll to remove entries on expiration, no need to act on expiration events.
if let Some(tracked_vals) = self.aggregate_validators_on_subnet.as_mut() { if let Some(tracked_vals) = self.aggregate_validators_on_subnet.as_mut() {
if let Poll::Ready(Some(Err(e))) = tracked_vals.poll_next_unpin(cx) { if let Poll::Ready(Some(Err(e))) = tracked_vals.poll_next_unpin(cx) {

View File

@ -126,10 +126,7 @@ fn get_attestation_service(
AttestationService::new( AttestationService::new(
beacon_chain, beacon_chain,
#[cfg(feature = "deterministic_long_lived_attnets")] lighthouse_network::discv5::enr::NodeId::random(),
lighthouse_network::discv5::enr::NodeId::random()
.raw()
.into(),
&config, &config,
&log, &log,
) )
@ -179,9 +176,6 @@ async fn get_events<S: Stream<Item = SubnetServiceMessage> + Unpin>(
mod attestation_service { mod attestation_service {
#[cfg(feature = "deterministic_long_lived_attnets")]
use std::collections::HashSet;
#[cfg(not(windows))] #[cfg(not(windows))]
use crate::subnet_service::attestation_subnets::MIN_PEER_DISCOVERY_SLOT_LOOK_AHEAD; use crate::subnet_service::attestation_subnets::MIN_PEER_DISCOVERY_SLOT_LOOK_AHEAD;
@ -192,8 +186,8 @@ mod attestation_service {
attestation_committee_index: CommitteeIndex, attestation_committee_index: CommitteeIndex,
slot: Slot, slot: Slot,
committee_count_at_slot: u64, committee_count_at_slot: u64,
is_aggregator: bool,
) -> ValidatorSubscription { ) -> ValidatorSubscription {
let is_aggregator = true;
ValidatorSubscription { ValidatorSubscription {
validator_index, validator_index,
attestation_committee_index, attestation_committee_index,
@ -203,11 +197,11 @@ mod attestation_service {
} }
} }
#[cfg(not(feature = "deterministic_long_lived_attnets"))]
fn get_subscriptions( fn get_subscriptions(
validator_count: u64, validator_count: u64,
slot: Slot, slot: Slot,
committee_count_at_slot: u64, committee_count_at_slot: u64,
is_aggregator: bool,
) -> Vec<ValidatorSubscription> { ) -> Vec<ValidatorSubscription> {
(0..validator_count) (0..validator_count)
.map(|validator_index| { .map(|validator_index| {
@ -216,6 +210,7 @@ mod attestation_service {
validator_index, validator_index,
slot, slot,
committee_count_at_slot, committee_count_at_slot,
is_aggregator,
) )
}) })
.collect() .collect()
@ -229,6 +224,7 @@ mod attestation_service {
// Keep a low subscription slot so that there are no additional subnet discovery events. // Keep a low subscription slot so that there are no additional subnet discovery events.
let subscription_slot = 0; let subscription_slot = 0;
let committee_count = 1; let committee_count = 1;
let subnets_per_node = MainnetEthSpec::default_spec().subnets_per_node as usize;
// create the attestation service and subscriptions // create the attestation service and subscriptions
let mut attestation_service = get_attestation_service(None); let mut attestation_service = get_attestation_service(None);
@ -243,6 +239,7 @@ mod attestation_service {
committee_index, committee_index,
current_slot + Slot::new(subscription_slot), current_slot + Slot::new(subscription_slot),
committee_count, committee_count,
true,
)]; )];
// submit the subscriptions // submit the subscriptions
@ -266,16 +263,19 @@ mod attestation_service {
// Wait for 1 slot duration to get the unsubscription event // Wait for 1 slot duration to get the unsubscription event
let events = get_events( let events = get_events(
&mut attestation_service, &mut attestation_service,
Some(5), Some(subnets_per_node * 3 + 2),
(MainnetEthSpec::slots_per_epoch() * 3) as u32, (MainnetEthSpec::slots_per_epoch() * 3) as u32,
) )
.await; .await;
matches::assert_matches!( matches::assert_matches!(
events[..3], events[..6],
[ [
SubnetServiceMessage::Subscribe(_any1), SubnetServiceMessage::Subscribe(_any1),
SubnetServiceMessage::EnrAdd(_any3), SubnetServiceMessage::EnrAdd(_any3),
SubnetServiceMessage::DiscoverPeers(_), SubnetServiceMessage::DiscoverPeers(_),
SubnetServiceMessage::Subscribe(_),
SubnetServiceMessage::EnrAdd(_),
SubnetServiceMessage::DiscoverPeers(_),
] ]
); );
@ -284,10 +284,10 @@ mod attestation_service {
if !attestation_service if !attestation_service
.is_subscribed(&subnet_id, attestation_subnets::SubscriptionKind::LongLived) .is_subscribed(&subnet_id, attestation_subnets::SubscriptionKind::LongLived)
{ {
assert_eq!(expected[..], events[3..]); assert_eq!(expected[..], events[subnets_per_node * 3..]);
} }
// Should be subscribed to only 1 long lived subnet after unsubscription. // Should be subscribed to only subnets_per_node long lived subnet after unsubscription.
assert_eq!(attestation_service.subscription_count(), 1); assert_eq!(attestation_service.subscription_count(), subnets_per_node);
} }
/// Test to verify that we are not unsubscribing to a subnet before a required subscription. /// Test to verify that we are not unsubscribing to a subnet before a required subscription.
@ -297,6 +297,7 @@ mod attestation_service {
// subscription config // subscription config
let validator_index = 1; let validator_index = 1;
let committee_count = 1; let committee_count = 1;
let subnets_per_node = MainnetEthSpec::default_spec().subnets_per_node as usize;
// Makes 2 validator subscriptions to the same subnet but at different slots. // Makes 2 validator subscriptions to the same subnet but at different slots.
// There should be just 1 unsubscription event for the later slot subscription (subscription_slot2). // There should be just 1 unsubscription event for the later slot subscription (subscription_slot2).
@ -318,6 +319,7 @@ mod attestation_service {
com1, com1,
current_slot + Slot::new(subscription_slot1), current_slot + Slot::new(subscription_slot1),
committee_count, committee_count,
true,
); );
let sub2 = get_subscription( let sub2 = get_subscription(
@ -325,6 +327,7 @@ mod attestation_service {
com2, com2,
current_slot + Slot::new(subscription_slot2), current_slot + Slot::new(subscription_slot2),
committee_count, committee_count,
true,
); );
let subnet_id1 = SubnetId::compute_subnet::<MainnetEthSpec>( let subnet_id1 = SubnetId::compute_subnet::<MainnetEthSpec>(
@ -366,16 +369,22 @@ mod attestation_service {
let expected = SubnetServiceMessage::Subscribe(Subnet::Attestation(subnet_id1)); let expected = SubnetServiceMessage::Subscribe(Subnet::Attestation(subnet_id1));
// Should be still subscribed to 1 long lived and 1 short lived subnet if both are // Should be still subscribed to 2 long lived and up to 1 short lived subnet if both are
// different. // different.
if !attestation_service.is_subscribed( if !attestation_service.is_subscribed(
&subnet_id1, &subnet_id1,
attestation_subnets::SubscriptionKind::LongLived, attestation_subnets::SubscriptionKind::LongLived,
) { ) {
assert_eq!(expected, events[3]); // The index is 3*subnets_per_node (because we subscribe + discover + enr per long lived
assert_eq!(attestation_service.subscription_count(), 2); // subnet) + 1
let index = 3 * subnets_per_node;
assert_eq!(expected, events[index]);
assert_eq!(
attestation_service.subscription_count(),
subnets_per_node + 1
);
} else { } else {
assert_eq!(attestation_service.subscription_count(), 1); assert!(attestation_service.subscription_count() == subnets_per_node);
} }
// Get event for 1 more slot duration, we should get the unsubscribe event now. // Get event for 1 more slot duration, we should get the unsubscribe event now.
@ -395,17 +404,17 @@ mod attestation_service {
); );
} }
// Should be subscribed to only 1 long lived subnet after unsubscription. // Should be subscribed 2 long lived subnet after unsubscription.
assert_eq!(attestation_service.subscription_count(), 1); assert_eq!(attestation_service.subscription_count(), subnets_per_node);
} }
#[cfg(not(feature = "deterministic_long_lived_attnets"))]
#[tokio::test] #[tokio::test]
async fn subscribe_all_random_subnets() { async fn subscribe_all_subnets() {
let attestation_subnet_count = MainnetEthSpec::default_spec().attestation_subnet_count; let attestation_subnet_count = MainnetEthSpec::default_spec().attestation_subnet_count;
let subscription_slot = 10; let subscription_slot = 3;
let subscription_count = attestation_subnet_count; let subscription_count = attestation_subnet_count;
let committee_count = 1; let committee_count = 1;
let subnets_per_node = MainnetEthSpec::default_spec().subnets_per_node as usize;
// create the attestation service and subscriptions // create the attestation service and subscriptions
let mut attestation_service = get_attestation_service(None); let mut attestation_service = get_attestation_service(None);
@ -419,6 +428,7 @@ mod attestation_service {
subscription_count, subscription_count,
current_slot + subscription_slot, current_slot + subscription_slot,
committee_count, committee_count,
true,
); );
// submit the subscriptions // submit the subscriptions
@ -426,42 +436,52 @@ mod attestation_service {
.validator_subscriptions(subscriptions) .validator_subscriptions(subscriptions)
.unwrap(); .unwrap();
let events = get_events(&mut attestation_service, None, 3).await; let events = get_events(&mut attestation_service, Some(131), 10).await;
let mut discover_peer_count = 0; let mut discover_peer_count = 0;
let mut enr_add_count = 0; let mut enr_add_count = 0;
let mut unexpected_msg_count = 0; let mut unexpected_msg_count = 0;
let mut unsubscribe_event_count = 0;
for event in &events { for event in &events {
match event { match event {
SubnetServiceMessage::DiscoverPeers(_) => discover_peer_count += 1, SubnetServiceMessage::DiscoverPeers(_) => discover_peer_count += 1,
SubnetServiceMessage::Subscribe(_any_subnet) => {} SubnetServiceMessage::Subscribe(_any_subnet) => {}
SubnetServiceMessage::EnrAdd(_any_subnet) => enr_add_count += 1, SubnetServiceMessage::EnrAdd(_any_subnet) => enr_add_count += 1,
SubnetServiceMessage::Unsubscribe(_) => unsubscribe_event_count += 1,
_ => unexpected_msg_count += 1, _ => unexpected_msg_count += 1,
} }
} }
// There should be a Subscribe Event, and Enr Add event and a DiscoverPeers event for each
// long-lived subnet initially. The next event should be a bulk discovery event.
let bulk_discovery_index = 3 * subnets_per_node;
// The bulk discovery request length should be equal to validator_count // The bulk discovery request length should be equal to validator_count
let bulk_discovery_event = events.last().unwrap(); let bulk_discovery_event = &events[bulk_discovery_index];
if let SubnetServiceMessage::DiscoverPeers(d) = bulk_discovery_event { if let SubnetServiceMessage::DiscoverPeers(d) = bulk_discovery_event {
assert_eq!(d.len(), attestation_subnet_count as usize); assert_eq!(d.len(), attestation_subnet_count as usize);
} else { } else {
panic!("Unexpected event {:?}", bulk_discovery_event); panic!("Unexpected event {:?}", bulk_discovery_event);
} }
// 64 `DiscoverPeer` requests of length 1 corresponding to random subnets // 64 `DiscoverPeer` requests of length 1 corresponding to deterministic subnets
// and 1 `DiscoverPeer` request corresponding to bulk subnet discovery. // and 1 `DiscoverPeer` request corresponding to bulk subnet discovery.
assert_eq!(discover_peer_count, subscription_count + 1); assert_eq!(discover_peer_count, subnets_per_node + 1);
assert_eq!(attestation_service.subscription_count(), 64); assert_eq!(attestation_service.subscription_count(), subnets_per_node);
assert_eq!(enr_add_count, 64); assert_eq!(enr_add_count, subnets_per_node);
assert_eq!(
unsubscribe_event_count,
attestation_subnet_count - subnets_per_node as u64
);
assert_eq!(unexpected_msg_count, 0); assert_eq!(unexpected_msg_count, 0);
// test completed successfully // test completed successfully
} }
#[cfg(not(feature = "deterministic_long_lived_attnets"))]
#[tokio::test] #[tokio::test]
async fn subscribe_all_random_subnets_plus_one() { async fn subscribe_correct_number_of_subnets() {
let attestation_subnet_count = MainnetEthSpec::default_spec().attestation_subnet_count; let attestation_subnet_count = MainnetEthSpec::default_spec().attestation_subnet_count;
let subscription_slot = 10; let subscription_slot = 10;
let subnets_per_node = MainnetEthSpec::default_spec().subnets_per_node as usize;
// the 65th subscription should result in no more messages than the previous scenario // the 65th subscription should result in no more messages than the previous scenario
let subscription_count = attestation_subnet_count + 1; let subscription_count = attestation_subnet_count + 1;
let committee_count = 1; let committee_count = 1;
@ -478,6 +498,7 @@ mod attestation_service {
subscription_count, subscription_count,
current_slot + subscription_slot, current_slot + subscription_slot,
committee_count, committee_count,
true,
); );
// submit the subscriptions // submit the subscriptions
@ -506,12 +527,12 @@ mod attestation_service {
} else { } else {
panic!("Unexpected event {:?}", bulk_discovery_event); panic!("Unexpected event {:?}", bulk_discovery_event);
} }
// 64 `DiscoverPeer` requests of length 1 corresponding to random subnets // subnets_per_node `DiscoverPeer` requests of length 1 corresponding to long-lived subnets
// and 1 `DiscoverPeer` request corresponding to the bulk subnet discovery. // and 1 `DiscoverPeer` request corresponding to the bulk subnet discovery.
// For the 65th subscription, the call to `subscribe_to_random_subnets` is not made because we are at capacity.
assert_eq!(discover_peer_count, 64 + 1); assert_eq!(discover_peer_count, subnets_per_node + 1);
assert_eq!(attestation_service.subscription_count(), 64); assert_eq!(attestation_service.subscription_count(), subnets_per_node);
assert_eq!(enr_add_count, 64); assert_eq!(enr_add_count, subnets_per_node);
assert_eq!(unexpected_msg_count, 0); assert_eq!(unexpected_msg_count, 0);
} }
@ -521,6 +542,7 @@ mod attestation_service {
// subscription config // subscription config
let validator_index = 1; let validator_index = 1;
let committee_count = 1; let committee_count = 1;
let subnets_per_node = MainnetEthSpec::default_spec().subnets_per_node as usize;
// Makes 2 validator subscriptions to the same subnet but at different slots. // Makes 2 validator subscriptions to the same subnet but at different slots.
// There should be just 1 unsubscription event for the later slot subscription (subscription_slot2). // There should be just 1 unsubscription event for the later slot subscription (subscription_slot2).
@ -542,6 +564,7 @@ mod attestation_service {
com1, com1,
current_slot + Slot::new(subscription_slot1), current_slot + Slot::new(subscription_slot1),
committee_count, committee_count,
true,
); );
let sub2 = get_subscription( let sub2 = get_subscription(
@ -549,6 +572,7 @@ mod attestation_service {
com2, com2,
current_slot + Slot::new(subscription_slot2), current_slot + Slot::new(subscription_slot2),
committee_count, committee_count,
true,
); );
let subnet_id1 = SubnetId::compute_subnet::<MainnetEthSpec>( let subnet_id1 = SubnetId::compute_subnet::<MainnetEthSpec>(
@ -596,11 +620,10 @@ mod attestation_service {
&subnet_id1, &subnet_id1,
attestation_subnets::SubscriptionKind::LongLived, attestation_subnets::SubscriptionKind::LongLived,
) { ) {
assert_eq!(expected_subscription, events[3]); assert_eq!(expected_subscription, events[subnets_per_node * 3]);
// fourth is a discovery event assert_eq!(expected_unsubscription, events[subnets_per_node * 3 + 2]);
assert_eq!(expected_unsubscription, events[5]);
} }
assert_eq!(attestation_service.subscription_count(), 1); assert_eq!(attestation_service.subscription_count(), 2);
println!("{events:?}"); println!("{events:?}");
let subscription_slot = current_slot + subscription_slot2 - 1; // one less do to the let subscription_slot = current_slot + subscription_slot2 - 1; // one less do to the
@ -633,40 +656,44 @@ mod attestation_service {
} }
#[tokio::test] #[tokio::test]
#[cfg(feature = "deterministic_long_lived_attnets")]
async fn test_update_deterministic_long_lived_subnets() { async fn test_update_deterministic_long_lived_subnets() {
let mut attestation_service = get_attestation_service(None); let mut attestation_service = get_attestation_service(None);
let new_subnet = SubnetId::new(1); let subnets_per_node = MainnetEthSpec::default_spec().subnets_per_node as usize;
let maintained_subnet = SubnetId::new(2);
let removed_subnet = SubnetId::new(3);
let current_slot = attestation_service
.beacon_chain
.slot_clock
.now()
.expect("Could not get current slot");
let subscriptions = get_subscriptions(20, current_slot, 30, false);
// submit the subscriptions
attestation_service attestation_service
.set_long_lived_subscriptions(HashSet::from([removed_subnet, maintained_subnet])); .validator_subscriptions(subscriptions)
// clear initial events .unwrap();
let _events = get_events(&mut attestation_service, None, 1).await;
attestation_service // There should only be the same subscriptions as there are in the specification,
.update_long_lived_subnets_testing(HashSet::from([maintained_subnet, new_subnet])); // regardless of subscriptions
let events = get_events(&mut attestation_service, None, 1).await;
let new_subnet = Subnet::Attestation(new_subnet);
let removed_subnet = Subnet::Attestation(removed_subnet);
assert_eq!( assert_eq!(
events, attestation_service.long_lived_subscriptions().len(),
subnets_per_node
);
let events = get_events(&mut attestation_service, None, 4).await;
// Check that we attempt to subscribe and register ENRs
matches::assert_matches!(
events[..6],
[ [
// events for the new subnet SubnetServiceMessage::Subscribe(_),
SubnetServiceMessage::Subscribe(new_subnet), SubnetServiceMessage::EnrAdd(_),
SubnetServiceMessage::EnrAdd(new_subnet), SubnetServiceMessage::DiscoverPeers(_),
SubnetServiceMessage::DiscoverPeers(vec![SubnetDiscovery { SubnetServiceMessage::Subscribe(_),
subnet: new_subnet, SubnetServiceMessage::EnrAdd(_),
min_ttl: None SubnetServiceMessage::DiscoverPeers(_),
}]),
// events for the removed subnet
SubnetServiceMessage::Unsubscribe(removed_subnet),
SubnetServiceMessage::EnrRemove(removed_subnet),
] ]
); );
println!("{events:?}")
} }
} }

View File

@ -86,3 +86,7 @@ PROPOSER_SCORE_BOOST: 40
DEPOSIT_CHAIN_ID: 100 DEPOSIT_CHAIN_ID: 100
DEPOSIT_NETWORK_ID: 100 DEPOSIT_NETWORK_ID: 100
DEPOSIT_CONTRACT_ADDRESS: 0x0B98057eA310F4d31F2a452B414647007d1645d9 DEPOSIT_CONTRACT_ADDRESS: 0x0B98057eA310F4d31F2a452B414647007d1645d9
# Network
# ---------------------------------------------------------------
SUBNETS_PER_NODE: 4

View File

@ -86,3 +86,7 @@ PROPOSER_SCORE_BOOST: 40
DEPOSIT_CHAIN_ID: 1 DEPOSIT_CHAIN_ID: 1
DEPOSIT_NETWORK_ID: 1 DEPOSIT_NETWORK_ID: 1
DEPOSIT_CONTRACT_ADDRESS: 0x00000000219ab540356cBB839Cbe05303d7705Fa DEPOSIT_CONTRACT_ADDRESS: 0x00000000219ab540356cBB839Cbe05303d7705Fa
# Network
# ---------------------------------------------------------------
SUBNETS_PER_NODE: 2

View File

@ -86,3 +86,7 @@ DEPOSIT_CHAIN_ID: 5
DEPOSIT_NETWORK_ID: 5 DEPOSIT_NETWORK_ID: 5
# Prater test deposit contract on Goerli Testnet # Prater test deposit contract on Goerli Testnet
DEPOSIT_CONTRACT_ADDRESS: 0xff50ed3d0ec03aC01D4C79aAd74928BFF48a7b2b DEPOSIT_CONTRACT_ADDRESS: 0xff50ed3d0ec03aC01D4C79aAd74928BFF48a7b2b
# Network
# ---------------------------------------------------------------
SUBNETS_PER_NODE: 2

View File

@ -74,3 +74,7 @@ PROPOSER_SCORE_BOOST: 40
DEPOSIT_CHAIN_ID: 11155111 DEPOSIT_CHAIN_ID: 11155111
DEPOSIT_NETWORK_ID: 11155111 DEPOSIT_NETWORK_ID: 11155111
DEPOSIT_CONTRACT_ADDRESS: 0x7f02C3E3c98b133055B8B348B2Ac625669Ed295D DEPOSIT_CONTRACT_ADDRESS: 0x7f02C3E3c98b133055B8B348B2Ac625669Ed295D
# Network
# ---------------------------------------------------------------
SUBNETS_PER_NODE: 2

View File

@ -168,11 +168,9 @@ pub struct ChainSpec {
pub maximum_gossip_clock_disparity_millis: u64, pub maximum_gossip_clock_disparity_millis: u64,
pub target_aggregators_per_committee: u64, pub target_aggregators_per_committee: u64,
pub attestation_subnet_count: u64, pub attestation_subnet_count: u64,
pub random_subnets_per_validator: u64,
pub epochs_per_random_subnet_subscription: u64,
pub subnets_per_node: u8, pub subnets_per_node: u8,
pub epochs_per_subnet_subscription: u64, pub epochs_per_subnet_subscription: u64,
attestation_subnet_extra_bits: u8, pub attestation_subnet_extra_bits: u8,
/* /*
* Application params * Application params
@ -455,17 +453,7 @@ impl ChainSpec {
#[allow(clippy::integer_arithmetic)] #[allow(clippy::integer_arithmetic)]
pub const fn attestation_subnet_prefix_bits(&self) -> u32 { pub const fn attestation_subnet_prefix_bits(&self) -> u32 {
// maybe use log2 when stable https://github.com/rust-lang/rust/issues/70887 let attestation_subnet_count_bits = self.attestation_subnet_count.ilog2();
// NOTE: this line is here simply to guarantee that if self.attestation_subnet_count type
// is changed, a compiler warning will be raised. This code depends on the type being u64.
let attestation_subnet_count: u64 = self.attestation_subnet_count;
let attestation_subnet_count_bits = if attestation_subnet_count == 0 {
0
} else {
63 - attestation_subnet_count.leading_zeros()
};
self.attestation_subnet_extra_bits as u32 + attestation_subnet_count_bits self.attestation_subnet_extra_bits as u32 + attestation_subnet_count_bits
} }
@ -625,13 +613,11 @@ impl ChainSpec {
network_id: 1, // mainnet network id network_id: 1, // mainnet network id
attestation_propagation_slot_range: 32, attestation_propagation_slot_range: 32,
attestation_subnet_count: 64, attestation_subnet_count: 64,
random_subnets_per_validator: 1, subnets_per_node: 2,
subnets_per_node: 1,
maximum_gossip_clock_disparity_millis: 500, maximum_gossip_clock_disparity_millis: 500,
target_aggregators_per_committee: 16, target_aggregators_per_committee: 16,
epochs_per_random_subnet_subscription: 256,
epochs_per_subnet_subscription: 256, epochs_per_subnet_subscription: 256,
attestation_subnet_extra_bits: 6, attestation_subnet_extra_bits: 0,
/* /*
* Application specific * Application specific
@ -852,13 +838,11 @@ impl ChainSpec {
network_id: 100, // Gnosis Chain network id network_id: 100, // Gnosis Chain network id
attestation_propagation_slot_range: 32, attestation_propagation_slot_range: 32,
attestation_subnet_count: 64, attestation_subnet_count: 64,
random_subnets_per_validator: 1, subnets_per_node: 4, // Make this larger than usual to avoid network damage
subnets_per_node: 1,
maximum_gossip_clock_disparity_millis: 500, maximum_gossip_clock_disparity_millis: 500,
target_aggregators_per_committee: 16, target_aggregators_per_committee: 16,
epochs_per_random_subnet_subscription: 256,
epochs_per_subnet_subscription: 256, epochs_per_subnet_subscription: 256,
attestation_subnet_extra_bits: 6, attestation_subnet_extra_bits: 0,
/* /*
* Application specific * Application specific
@ -946,6 +930,9 @@ pub struct Config {
shard_committee_period: u64, shard_committee_period: u64,
#[serde(with = "serde_utils::quoted_u64")] #[serde(with = "serde_utils::quoted_u64")]
eth1_follow_distance: u64, eth1_follow_distance: u64,
#[serde(default = "default_subnets_per_node")]
#[serde(with = "serde_utils::quoted_u8")]
subnets_per_node: u8,
#[serde(with = "serde_utils::quoted_u64")] #[serde(with = "serde_utils::quoted_u64")]
inactivity_score_bias: u64, inactivity_score_bias: u64,
@ -1002,6 +989,10 @@ fn default_safe_slots_to_import_optimistically() -> u64 {
128u64 128u64
} }
fn default_subnets_per_node() -> u8 {
2u8
}
impl Default for Config { impl Default for Config {
fn default() -> Self { fn default() -> Self {
let chain_spec = MainnetEthSpec::default_spec(); let chain_spec = MainnetEthSpec::default_spec();
@ -1084,6 +1075,7 @@ impl Config {
min_validator_withdrawability_delay: spec.min_validator_withdrawability_delay, min_validator_withdrawability_delay: spec.min_validator_withdrawability_delay,
shard_committee_period: spec.shard_committee_period, shard_committee_period: spec.shard_committee_period,
eth1_follow_distance: spec.eth1_follow_distance, eth1_follow_distance: spec.eth1_follow_distance,
subnets_per_node: spec.subnets_per_node,
inactivity_score_bias: spec.inactivity_score_bias, inactivity_score_bias: spec.inactivity_score_bias,
inactivity_score_recovery_rate: spec.inactivity_score_recovery_rate, inactivity_score_recovery_rate: spec.inactivity_score_recovery_rate,
@ -1130,6 +1122,7 @@ impl Config {
min_validator_withdrawability_delay, min_validator_withdrawability_delay,
shard_committee_period, shard_committee_period,
eth1_follow_distance, eth1_follow_distance,
subnets_per_node,
inactivity_score_bias, inactivity_score_bias,
inactivity_score_recovery_rate, inactivity_score_recovery_rate,
ejection_balance, ejection_balance,
@ -1162,6 +1155,7 @@ impl Config {
min_validator_withdrawability_delay, min_validator_withdrawability_delay,
shard_committee_period, shard_committee_period,
eth1_follow_distance, eth1_follow_distance,
subnets_per_node,
inactivity_score_bias, inactivity_score_bias,
inactivity_score_recovery_rate, inactivity_score_recovery_rate,
ejection_balance, ejection_balance,

View File

@ -86,10 +86,6 @@ pub fn get_extra_fields(spec: &ChainSpec) -> HashMap<String, Value> {
"domain_application_mask".to_uppercase()=> u32_hex(spec.domain_application_mask), "domain_application_mask".to_uppercase()=> u32_hex(spec.domain_application_mask),
"target_aggregators_per_committee".to_uppercase() => "target_aggregators_per_committee".to_uppercase() =>
spec.target_aggregators_per_committee.to_string().into(), spec.target_aggregators_per_committee.to_string().into(),
"random_subnets_per_validator".to_uppercase() =>
spec.random_subnets_per_validator.to_string().into(),
"epochs_per_random_subnet_subscription".to_uppercase() =>
spec.epochs_per_random_subnet_subscription.to_string().into(),
"domain_contribution_and_proof".to_uppercase() => "domain_contribution_and_proof".to_uppercase() =>
u32_hex(spec.domain_contribution_and_proof), u32_hex(spec.domain_contribution_and_proof),
"domain_sync_committee".to_uppercase() => u32_hex(spec.domain_sync_committee), "domain_sync_committee".to_uppercase() => u32_hex(spec.domain_sync_committee),

View File

@ -80,15 +80,26 @@ impl SubnetId {
epoch: Epoch, epoch: Epoch,
spec: &ChainSpec, spec: &ChainSpec,
) -> Result<(impl Iterator<Item = SubnetId>, Epoch), &'static str> { ) -> Result<(impl Iterator<Item = SubnetId>, Epoch), &'static str> {
// Simplify the variable name
let subscription_duration = spec.epochs_per_subnet_subscription;
let node_id_prefix = let node_id_prefix =
(node_id >> (256 - spec.attestation_subnet_prefix_bits() as usize)).as_usize(); (node_id >> (256 - spec.attestation_subnet_prefix_bits() as usize)).as_usize();
let subscription_event_idx = epoch.as_u64() / spec.epochs_per_subnet_subscription; // NOTE: The as_u64() panics if the number is larger than u64::max_value(). This cannot be
// true as spec.epochs_per_subnet_subscription is a u64.
let node_offset = (node_id % ethereum_types::U256::from(subscription_duration)).as_u64();
// Calculate at which epoch this node needs to re-evaluate
let valid_until_epoch = epoch.as_u64()
+ subscription_duration
.saturating_sub((epoch.as_u64() + node_offset) % subscription_duration);
let subscription_event_idx = (epoch.as_u64() + node_offset) / subscription_duration;
let permutation_seed = let permutation_seed =
ethereum_hashing::hash(&int_to_bytes::int_to_bytes8(subscription_event_idx)); ethereum_hashing::hash(&int_to_bytes::int_to_bytes8(subscription_event_idx));
let num_subnets = 1 << spec.attestation_subnet_prefix_bits(); let num_subnets = 1 << spec.attestation_subnet_prefix_bits();
let permutated_prefix = compute_shuffled_index( let permutated_prefix = compute_shuffled_index(
node_id_prefix, node_id_prefix,
num_subnets, num_subnets,
@ -107,7 +118,6 @@ impl SubnetId {
let subnet_set_generator = (0..subnets_per_node).map(move |idx| { let subnet_set_generator = (0..subnets_per_node).map(move |idx| {
SubnetId::new((permutated_prefix + idx as u64) % attestation_subnet_count) SubnetId::new((permutated_prefix + idx as u64) % attestation_subnet_count)
}); });
let valid_until_epoch = (subscription_event_idx + 1) * spec.epochs_per_subnet_subscription;
Ok((subnet_set_generator, valid_until_epoch.into())) Ok((subnet_set_generator, valid_until_epoch.into()))
} }
} }
@ -149,3 +159,80 @@ impl AsRef<str> for SubnetId {
subnet_id_to_string(self.0) subnet_id_to_string(self.0)
} }
} }
#[cfg(test)]
mod tests {
use super::*;
/// A set of tests compared to the python specification
#[test]
fn compute_subnets_for_epoch_unit_test() {
// Randomized variables used generated with the python specification
let node_ids = [
"0",
"88752428858350697756262172400162263450541348766581994718383409852729519486397",
"18732750322395381632951253735273868184515463718109267674920115648614659369468",
"27726842142488109545414954493849224833670205008410190955613662332153332462900",
"39755236029158558527862903296867805548949739810920318269566095185775868999998",
"31899136003441886988955119620035330314647133604576220223892254902004850516297",
"58579998103852084482416614330746509727562027284701078483890722833654510444626",
"28248042035542126088870192155378394518950310811868093527036637864276176517397",
"60930578857433095740782970114409273483106482059893286066493409689627770333527",
"103822458477361691467064888613019442068586830412598673713899771287914656699997",
]
.into_iter()
.map(|v| ethereum_types::U256::from_dec_str(v).unwrap())
.collect::<Vec<_>>();
let epochs = [
54321u64, 1017090249, 1827566880, 846255942, 766597383, 1204990115, 1616209495,
1774367616, 1484598751, 3525502229,
]
.into_iter()
.map(Epoch::from)
.collect::<Vec<_>>();
// Test mainnet
let spec = ChainSpec::mainnet();
// Calculated by hand
let expected_valid_time: Vec<u64> = [
54528, 1017090371, 1827567108, 846256076, 766597570, 1204990135, 1616209582,
1774367723, 1484598953, 3525502371,
]
.into();
// Calculated from pyspec
let expected_subnets = vec![
vec![4u64, 5u64],
vec![61, 62],
vec![23, 24],
vec![38, 39],
vec![53, 54],
vec![39, 40],
vec![48, 49],
vec![39, 40],
vec![34, 35],
vec![37, 38],
];
for x in 0..node_ids.len() {
println!("Test: {}", x);
println!(
"NodeId: {}\n Epoch: {}\n, expected_update_time: {}\n, expected_subnets: {:?}",
node_ids[x], epochs[x], expected_valid_time[x], expected_subnets[x]
);
let (computed_subnets, valid_time) = SubnetId::compute_subnets_for_epoch::<
crate::MainnetEthSpec,
>(node_ids[x], epochs[x], &spec)
.unwrap();
assert_eq!(Epoch::from(expected_valid_time[x]), valid_time);
assert_eq!(
expected_subnets[x],
computed_subnets.map(SubnetId::into).collect::<Vec<u64>>()
);
}
}
}