Subscribe to subnets only when needed (#3419)

## Issue Addressed

We currently subscribe to attestation subnets as soon as a subscription arrives, even though subscriptions can arrive one epoch in advance. This PR schedules subscriptions for future slots instead of performing them immediately.
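
For intuition, here is a condensed, self-contained sketch of the new scheduling decision (paraphrasing `subscribe_to_subnet` from the diff below; the free function and the `Option<Duration>` argument are stand-ins for the service and its slot clock):

```rust
use std::time::Duration;

/// Sketch: how long to wait before actually subscribing for a given slot.
/// `duration_to_slot` stands in for the slot clock and is `None` for past slots.
fn time_to_subscription_start(
    duration_to_slot: Option<Duration>,
    slot_duration: Duration,
) -> Duration {
    // Subscribe a whole slot ahead (ADVANCE_SUBSCRIBE_SLOT_FRACTION == 1) so
    // consecutive subscriptions to the same subnet overlap.
    let advance_subscription_duration = slot_duration / 1;
    // Past slots yield a zero duration.
    duration_to_slot
        .unwrap_or_default()
        .saturating_sub(advance_subscription_duration)
}

fn main() {
    let slot = Duration::from_secs(12);
    // A subscription for 3 slots ahead gets scheduled 2 slots from now.
    assert_eq!(time_to_subscription_start(Some(3 * slot), slot), 2 * slot);
    // A current or past slot yields zero, i.e. "act now".
    assert_eq!(time_to_subscription_start(None, slot), Duration::ZERO);
}
```

A zero duration means a current or past slot and is acted on immediately; anything else is inserted into a `HashSetDelay` and handled when the delay expires.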

## Proposed Changes

- Schedule subscriptions to subnets for future slots instead of subscribing immediately (see the sketch after this list).
- Finish removing `hashset_delay` in favor of [delay_map](https://github.com/AgeManning/delay_map). The subnet service was the only remaining user.
- Reject subscriptions for past slots. Previously we would subscribe for one slot.
- Add a new test for subscriptions that are not consecutive.
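
For reference, a minimal sketch of the `delay_map` primitives this relies on. This is illustrative, not code from the PR: it assumes `tokio` (with the `rt-multi-thread`, `macros`, and `time` features) and `futures` as dependencies, and uses a `u64` key where the service uses `SubnetId`/`ExactSubnet`:

```rust
use std::time::Duration;

use delay_map::HashSetDelay;
use futures::StreamExt;

#[tokio::main]
async fn main() {
    // Entries expire after their delay and are then yielded by the Stream
    // impl, which is what lets the service schedule work for future slots.
    let mut scheduled: HashSetDelay<u64> = HashSetDelay::default();

    // Schedule "subscribe to subnet 42" for 100ms from now.
    scheduled.insert_at(42, Duration::from_millis(100));

    // ~100ms later the entry pops out of the stream and we act on it.
    if let Some(Ok(subnet_id)) = scheduled.next().await {
        println!("time to subscribe to subnet {subnet_id}");
    }
}
```

The service keeps three such collections: scheduled short-lived subscriptions in a `HashSetDelay<ExactSubnet>`, plus the active short- and long-lived subscriptions in `HashMapDelay<SubnetId, Slot>` maps, whose expiries drive unsubscriptions and ENR updates in `poll_next`.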

## Additional Info

This is also an effort to make the code easier to understand.
Divma 2022-09-05 00:22:48 +00:00
parent aa022f4685
commit 473abc14ca
11 changed files with 587 additions and 567 deletions

Cargo.lock

@@ -1250,6 +1250,16 @@ version = "0.0.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b72465f46d518f6015d9cf07f7f3013a95dd6b9c2747c3d65ae0cce43929d14f"
[[package]]
name = "delay_map"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6716ce9729be9628979ae1ff63e8bc8b7ad53b5472a2633bf079607a55328d36"
dependencies = [
"futures",
"tokio-util 0.6.10",
]
[[package]]
name = "deposit_contract"
version = "0.2.0"
@@ -2554,15 +2564,6 @@ dependencies = [
"hashbrown 0.11.2",
]
[[package]]
name = "hashset_delay"
version = "0.2.0"
dependencies = [
"futures",
"tokio",
"tokio-util 0.6.10",
]
[[package]]
name = "headers"
version = "0.3.7"
@@ -3656,6 +3657,7 @@ dependencies = [
name = "lighthouse_network"
version = "0.2.0"
dependencies = [
"delay_map",
"directory",
"dirs",
"discv5",
@@ -3666,7 +3668,6 @@ dependencies = [
"exit-future",
"fnv",
"futures",
"hashset_delay",
"hex",
"lazy_static",
"libp2p",
@@ -4146,6 +4147,7 @@ name = "network"
version = "0.2.0"
dependencies = [
"beacon_chain",
"delay_map",
"derivative",
"environment",
"error-chain",
@@ -4155,7 +4157,6 @@ dependencies = [
"fnv",
"futures",
"genesis",
"hashset_delay",
"hex",
"if-addrs 0.6.7",
"igd",


@@ -28,7 +28,6 @@ members = [
"common/eth2_interop_keypairs",
"common/eth2_network_config",
"common/eth2_wallet_manager",
"common/hashset_delay",
"common/lighthouse_metrics",
"common/lighthouse_version",
"common/lockfile",


@@ -8,7 +8,6 @@ edition = "2021"
discv5 = { version = "0.1.0-beta.13", features = ["libp2p"] }
unsigned-varint = { version = "0.6.0", features = ["codec"] }
types = { path = "../../consensus/types" }
hashset_delay = { path = "../../common/hashset_delay" }
eth2_ssz_types = "0.2.2"
serde = { version = "1.0.116", features = ["derive"] }
serde_derive = "1.0.116"
@@ -40,6 +39,7 @@ strum = { version = "0.24.0", features = ["derive"] }
superstruct = "0.5.0"
prometheus-client = "0.16.0"
unused_port = { path = "../../common/unused_port" }
delay_map = "0.1.1"
[dependencies.libp2p]
version = "0.45.1"


@@ -5,8 +5,8 @@ use crate::rpc::{GoodbyeReason, MetaData, Protocol, RPCError, RPCResponseErrorCo
use crate::{error, metrics, Gossipsub};
use crate::{NetworkGlobals, PeerId};
use crate::{Subnet, SubnetDiscovery};
use delay_map::HashSetDelay;
use discv5::Enr;
use hashset_delay::HashSetDelay;
use libp2p::identify::IdentifyInfo;
use peerdb::{client::ClientKind, BanOperation, BanResult, ScoreUpdateResult};
use rand::seq::SliceRandom;


@@ -17,7 +17,6 @@ environment = { path = "../../lighthouse/environment" }
beacon_chain = { path = "../beacon_chain" }
store = { path = "../store" }
lighthouse_network = { path = "../lighthouse_network" }
hashset_delay = { path = "../../common/hashset_delay" }
types = { path = "../../consensus/types" }
slot_clock = { path = "../../common/slot_clock" }
slog = { version = "2.5.2", features = ["max_level_trace"] }
@@ -44,3 +43,4 @@ if-addrs = "0.6.4"
strum = "0.24.0"
tokio-util = { version = "0.6.3", features = ["time"] }
derivative = "2.2.0"
delay_map = "0.1.1"


@@ -3,19 +3,20 @@
//! determines whether attestations should be aggregated and/or passed to the beacon node.
use super::SubnetServiceMessage;
use std::collections::{HashMap, HashSet, VecDeque};
#[cfg(test)]
use std::collections::HashSet;
use std::collections::{HashMap, VecDeque};
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};
use futures::prelude::*;
use rand::seq::SliceRandom;
use slog::{debug, error, o, trace, warn};
use std::time::Duration;
use beacon_chain::{BeaconChain, BeaconChainTypes};
use hashset_delay::HashSetDelay;
use delay_map::{HashMapDelay, HashSetDelay};
use futures::prelude::*;
use lighthouse_network::{NetworkConfig, Subnet, SubnetDiscovery};
use rand::seq::SliceRandom;
use slog::{debug, error, o, trace, warn};
use slot_clock::SlotClock;
use types::{Attestation, EthSpec, Slot, SubnetId, ValidatorSubscription};
@@ -24,20 +25,29 @@ use crate::metrics;
/// The minimum number of slots ahead that we attempt to discover peers for a subscription. If the
/// slot is less than this number, skip the peer discovery process.
/// Subnet discovery query takes at most 30 secs, 2 slots take 24s.
const MIN_PEER_DISCOVERY_SLOT_LOOK_AHEAD: u64 = 2;
/// The time (in slots) before a last seen validator is considered absent and we unsubscribe from the random
/// gossip topics that we subscribed to due to the validator connection.
const LAST_SEEN_VALIDATOR_TIMEOUT: u32 = 150;
pub(crate) const MIN_PEER_DISCOVERY_SLOT_LOOK_AHEAD: u64 = 2;
/// The time (in slots) before a last seen validator is considered absent and we unsubscribe from
/// the random gossip topics that we subscribed to due to the validator connection.
const LAST_SEEN_VALIDATOR_TIMEOUT_SLOTS: u32 = 150;
/// The fraction of a slot that we subscribe to a subnet before the required slot.
///
/// Note: The time is calculated as `time = seconds_per_slot / ADVANCE_SUBSCRIPTION_TIME`.
const ADVANCE_SUBSCRIBE_TIME: u32 = 3;
/// The default number of slots before items in hash delay sets used by this class should expire.
/// 36s at 12s slot time
const DEFAULT_EXPIRATION_TIMEOUT: u32 = 3;
/// Currently a whole slot ahead.
const ADVANCE_SUBSCRIBE_SLOT_FRACTION: u32 = 1;
#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)]
pub(crate) enum SubscriptionKind {
/// Long lived subscriptions.
///
/// These have a longer duration and are advertised in our ENR.
LongLived,
/// Short lived subscriptions.
///
/// Subscribing to these subnets has a short duration and we don't advertise it in our ENR.
ShortLived,
}
/// A particular subnet at a given slot.
#[derive(PartialEq, Eq, Hash, Clone, Debug)]
#[derive(PartialEq, Eq, Hash, Clone, Debug, Copy)]
pub struct ExactSubnet {
/// The `SubnetId` associated with this subnet.
pub subnet_id: SubnetId,
@@ -52,17 +62,22 @@ pub struct AttestationService<T: BeaconChainTypes> {
/// A reference to the beacon chain to process received attestations.
pub(crate) beacon_chain: Arc<BeaconChain<T>>,
/// The collection of currently subscribed random subnets mapped to their expiry deadline.
pub(crate) random_subnets: HashSetDelay<SubnetId>,
/// Subnets we are currently subscribed to as short lived subscriptions.
///
/// Once they expire, we unsubscribe from these.
short_lived_subscriptions: HashMapDelay<SubnetId, Slot>,
/// The collection of all currently subscribed subnets (long-lived **and** short-lived).
subscriptions: HashSet<SubnetId>,
/// Subnets we are currently subscribed to as long lived subscriptions.
///
/// We advertise these in our ENR. When these expire, the subnet is removed from our ENR.
long_lived_subscriptions: HashMapDelay<SubnetId, Slot>,
/// A collection of timeouts for when to unsubscribe from a shard subnet.
unsubscriptions: HashSetDelay<ExactSubnet>,
/// Short lived subscriptions that need to be done in the future.
scheduled_short_lived_subscriptions: HashSetDelay<ExactSubnet>,
/// A collection timeouts to track the existence of aggregate validator subscriptions at an `ExactSubnet`.
aggregate_validators_on_subnet: HashSetDelay<ExactSubnet>,
/// A collection of timeouts to track the existence of aggregate validator subscriptions at an
/// `ExactSubnet`.
aggregate_validators_on_subnet: Option<HashSetDelay<ExactSubnet>>,
/// A collection of seen validators. These dictate how many random subnets we should be
/// subscribed to. As these time out, we unsubscribe for the required random subnets and update
@@ -79,8 +94,8 @@ pub struct AttestationService<T: BeaconChainTypes> {
/// We are always subscribed to all subnets.
subscribe_all_subnets: bool,
/// We process and aggregate all attestations on subscribed subnets.
import_all_attestations: bool,
/// For how many slots we subscribe to long lived subnets.
long_lived_subnet_subscription_slots: u64,
/// The logger for the attestation service.
log: slog::Logger,
@@ -96,34 +111,36 @@ impl<T: BeaconChainTypes> AttestationService<T> {
) -> Self {
let log = log.new(o!("service" => "attestation_service"));
// calculate the random subnet duration from the spec constants
// Calculate the random subnet duration from the spec constants.
let spec = &beacon_chain.spec;
let slot_duration = beacon_chain.slot_clock.slot_duration();
let random_subnet_duration_millis = spec
let long_lived_subnet_subscription_slots = spec
.epochs_per_random_subnet_subscription
.saturating_mul(T::EthSpec::slots_per_epoch())
.saturating_mul(slot_duration.as_millis() as u64);
.saturating_mul(T::EthSpec::slots_per_epoch());
let long_lived_subscription_duration = Duration::from_millis(
slot_duration.as_millis() as u64 * long_lived_subnet_subscription_slots,
);
// Panics on overflow. Ensure LAST_SEEN_VALIDATOR_TIMEOUT is not too large.
// Panics on overflow. Ensure LAST_SEEN_VALIDATOR_TIMEOUT_SLOTS is not too large.
let last_seen_val_timeout = slot_duration
.checked_mul(LAST_SEEN_VALIDATOR_TIMEOUT)
.checked_mul(LAST_SEEN_VALIDATOR_TIMEOUT_SLOTS)
.expect("LAST_SEEN_VALIDATOR_TIMEOUT must not be ridiculously large");
let default_timeout = slot_duration
.checked_mul(DEFAULT_EXPIRATION_TIMEOUT)
.expect("DEFAULT_EXPIRATION_TIMEOUT must not be ridiculoustly large");
let track_validators = !config.import_all_attestations;
let aggregate_validators_on_subnet =
track_validators.then(|| HashSetDelay::new(slot_duration));
AttestationService {
events: VecDeque::with_capacity(10),
beacon_chain,
random_subnets: HashSetDelay::new(Duration::from_millis(random_subnet_duration_millis)),
subscriptions: HashSet::new(),
unsubscriptions: HashSetDelay::new(default_timeout),
aggregate_validators_on_subnet: HashSetDelay::new(default_timeout),
short_lived_subscriptions: HashMapDelay::new(slot_duration),
long_lived_subscriptions: HashMapDelay::new(long_lived_subscription_duration),
scheduled_short_lived_subscriptions: HashSetDelay::default(),
aggregate_validators_on_subnet,
known_validators: HashSetDelay::new(last_seen_val_timeout),
waker: None,
subscribe_all_subnets: config.subscribe_all_subnets,
import_all_attestations: config.import_all_attestations,
discovery_disabled: config.disable_discovery,
subscribe_all_subnets: config.subscribe_all_subnets,
long_lived_subnet_subscription_slots,
log,
}
}
@@ -134,10 +151,25 @@ impl<T: BeaconChainTypes> AttestationService<T> {
if self.subscribe_all_subnets {
self.beacon_chain.spec.attestation_subnet_count as usize
} else {
self.subscriptions.len()
self.short_lived_subscriptions
.keys()
.chain(self.long_lived_subscriptions.keys())
.collect::<HashSet<_>>()
.len()
}
}
/// Give access to the current subscriptions for testing purposes.
#[cfg(test)]
pub(crate) fn subscriptions(
&self,
subscription_kind: SubscriptionKind,
) -> &HashMapDelay<SubnetId, Slot> {
match subscription_kind {
SubscriptionKind::LongLived => &self.long_lived_subscriptions,
SubscriptionKind::ShortLived => &self.short_lived_subscriptions,
}
}
/// Processes a list of validator subscriptions.
///
/// This will:
@@ -158,7 +190,6 @@ impl<T: BeaconChainTypes> AttestationService<T> {
let mut subnets_to_discover: HashMap<SubnetId, Slot> = HashMap::new();
for subscription in subscriptions {
metrics::inc_counter(&metrics::SUBNET_SUBSCRIPTION_REQUESTS);
//NOTE: We assume all subscriptions have been verified before reaching this service
// Registers the validator with the attestation service.
// This will subscribe to long-lived random subnets if required.
@@ -205,8 +236,7 @@ impl<T: BeaconChainTypes> AttestationService<T> {
if subscription.is_aggregator {
metrics::inc_counter(&metrics::SUBNET_SUBSCRIPTION_AGGREGATOR_REQUESTS);
// set the subscription timer to subscribe to the next subnet if required
if let Err(e) = self.subscribe_to_subnet(exact_subnet.clone()) {
if let Err(e) = self.subscribe_to_subnet(exact_subnet) {
warn!(self.log,
"Subscription to subnet error";
"error" => e,
@@ -234,10 +264,6 @@ impl<T: BeaconChainTypes> AttestationService<T> {
};
}
// pre-emptively wake the thread to check for new events
if let Some(waker) = &self.waker {
waker.wake_by_ref();
}
Ok(())
}
@@ -248,19 +274,27 @@ impl<T: BeaconChainTypes> AttestationService<T> {
subnet: SubnetId,
attestation: &Attestation<T::EthSpec>,
) -> bool {
if self.import_all_attestations {
return true;
}
let exact_subnet = ExactSubnet {
subnet_id: subnet,
slot: attestation.data.slot,
};
self.aggregate_validators_on_subnet.contains(&exact_subnet)
self.aggregate_validators_on_subnet
.as_ref()
.map(|tracked_vals| {
tracked_vals.contains_key(&ExactSubnet {
subnet_id: subnet,
slot: attestation.data.slot,
})
})
.unwrap_or(true)
}
/* Internal private functions */
/// Adds an event to the event queue and notifies that this service is ready to be polled
/// again.
fn queue_event(&mut self, ev: SubnetServiceMessage) {
self.events.push_back(ev);
if let Some(waker) = &self.waker {
waker.wake_by_ref()
}
}
/// Checks if there are currently queued discovery requests and the time required to make the
/// request.
///
@@ -277,12 +311,13 @@ impl<T: BeaconChainTypes> AttestationService<T> {
let discovery_subnets: Vec<SubnetDiscovery> = exact_subnets
.filter_map(|exact_subnet| {
// check if there is enough time to perform a discovery lookup
// Check if there is enough time to perform a discovery lookup.
if exact_subnet.slot
>= current_slot.saturating_add(MIN_PEER_DISCOVERY_SLOT_LOOK_AHEAD)
{
// if the slot is more than epoch away, add an event to start looking for peers
// add one slot to ensure we keep the peer for the subscription slot
// Send out an event to start looking for peers.
// Require the peer for an additional slot to ensure we keep the peer for the
// duration of the subscription.
let min_ttl = self
.beacon_chain
.slot_clock
@@ -305,244 +340,279 @@ impl<T: BeaconChainTypes> AttestationService<T> {
.collect();
if !discovery_subnets.is_empty() {
self.events
.push_back(SubnetServiceMessage::DiscoverPeers(discovery_subnets));
self.queue_event(SubnetServiceMessage::DiscoverPeers(discovery_subnets));
}
Ok(())
}
/// Checks the current random subnets and subscriptions to determine if a new subscription for this
/// subnet is required for the given slot.
///
/// If required, adds a subscription event and an associated unsubscription event.
fn subscribe_to_subnet(&mut self, exact_subnet: ExactSubnet) -> Result<(), &'static str> {
// initialise timing variables
let current_slot = self
.beacon_chain
.slot_clock
.now()
.ok_or("Could not get the current slot")?;
// Subscribes to the subnet if it should be done immediately, or schedules it if required.
fn subscribe_to_subnet(
&mut self,
ExactSubnet { subnet_id, slot }: ExactSubnet,
) -> Result<(), &'static str> {
let slot_duration = self.beacon_chain.slot_clock.slot_duration();
// Calculate the duration to the unsubscription event.
// There are two main cases. Attempting to subscribe to the current slot and all others.
let expected_end_subscription_duration = if current_slot >= exact_subnet.slot {
self.beacon_chain
// Calculate how long before we need to subscribe to the subnet.
let time_to_subscription_start = {
// The short time we schedule the subscription before it's actually required. This
// ensures we are subscribed on time, and allows consecutive subscriptions to the same
// subnet to overlap, reducing subnet churn.
let advance_subscription_duration = slot_duration / ADVANCE_SUBSCRIBE_SLOT_FRACTION;
// The time to the required slot.
let time_to_subscription_slot = self
.beacon_chain
.slot_clock
.duration_to_next_slot()
.ok_or("Unable to determine duration to next slot")?
} else {
let slot_duration = self.beacon_chain.slot_clock.slot_duration();
// the duration until we no longer need this subscription. We assume a single slot is
// sufficient.
self.beacon_chain
.slot_clock
.duration_to_slot(exact_subnet.slot)
.ok_or("Unable to determine duration to subscription slot")?
+ slot_duration
.duration_to_slot(slot)
.unwrap_or_default(); // If this is a past slot we will just get a 0 duration.
time_to_subscription_slot.saturating_sub(advance_subscription_duration)
};
// Regardless of whether or not we have already subscribed to a subnet, track the expiration
// of aggregate validator subscriptions to exact subnets so we know whether or not to drop
// attestations for a given subnet + slot
self.aggregate_validators_on_subnet
.insert_at(exact_subnet.clone(), expected_end_subscription_duration);
// Checks on current subscriptions
// Note: We may be connected to a long-lived random subnet. In this case we still add the
// subscription timeout and check this case when the timeout fires. This is because a
// long-lived random subnet can be unsubscribed at any time when a validator becomes
// in-active. This case is checked on the subscription event (see `handle_subscriptions`).
// Return if we already have a subscription for this subnet_id and slot
if self.unsubscriptions.contains(&exact_subnet) || self.subscribe_all_subnets {
return Ok(());
if let Some(tracked_vals) = self.aggregate_validators_on_subnet.as_mut() {
tracked_vals.insert(ExactSubnet { subnet_id, slot });
}
// We are not currently subscribed and have no waiting subscription, create one
self.handle_subscriptions(exact_subnet.clone());
// If the subscription should be done in the future, schedule it. Otherwise subscribe
// immediately.
if time_to_subscription_start.is_zero() {
// This is a current or past slot, we subscribe immediately.
self.subscribe_to_subnet_immediately(
subnet_id,
SubscriptionKind::ShortLived,
slot + 1,
)?;
} else {
// This is a future slot, schedule subscribing.
trace!(self.log, "Scheduling subnet subscription"; "subnet" => ?subnet_id, "time_to_subscription_start" => ?time_to_subscription_start);
self.scheduled_short_lived_subscriptions
.insert_at(ExactSubnet { subnet_id, slot }, time_to_subscription_start);
}
// if there is an unsubscription event for the slot prior, we remove it to prevent
// unsubscriptions immediately after the subscription. We also want to minimize
// subscription churn and maintain a consecutive subnet subscriptions.
self.unsubscriptions.retain(|subnet| {
!(subnet.subnet_id == exact_subnet.subnet_id && subnet.slot <= exact_subnet.slot)
});
// add an unsubscription event to remove ourselves from the subnet once completed
self.unsubscriptions
.insert_at(exact_subnet, expected_end_subscription_duration);
Ok(())
}
/// Updates the `known_validators` mapping and subscribes to a set of random subnets if required.
///
/// This also updates the ENR to indicate our long-lived subscription to the subnet
/// Updates the `known_validators` mapping and subscribes to long lived subnets if required.
fn add_known_validator(&mut self, validator_index: u64) {
if self.known_validators.get(&validator_index).is_none() && !self.subscribe_all_subnets {
// New validator has subscribed
// Subscribe to random topics and update the ENR if needed.
let spec = &self.beacon_chain.spec;
if self.random_subnets.len() < spec.attestation_subnet_count as usize {
// Still room for subscriptions
self.subscribe_to_random_subnets(
self.beacon_chain.spec.random_subnets_per_validator as usize,
);
}
}
// add the new validator or update the current timeout for a known validator
let previously_known = self.known_validators.contains_key(&validator_index);
// Add the new validator or update the current timeout for a known validator.
self.known_validators.insert(validator_index);
if !previously_known {
// New validator has subscribed.
// Subscribe to random topics and update the ENR if needed.
self.subscribe_to_random_subnets();
}
}
/// Subscribe to long-lived random subnets and update the local ENR bitfield.
fn subscribe_to_random_subnets(&mut self, no_subnets_to_subscribe: usize) {
let subnet_count = self.beacon_chain.spec.attestation_subnet_count;
/// The number of subnets to subscribe depends on the number of active validators and number of
/// current subscriptions.
fn subscribe_to_random_subnets(&mut self) {
if self.subscribe_all_subnets {
// This case is not handled by this service.
return;
}
// Build a list of random subnets that we are not currently subscribed to.
let available_subnets = (0..subnet_count)
let max_subnets = self.beacon_chain.spec.attestation_subnet_count;
// Calculate how many subnets we need.
let required_long_lived_subnets = {
let subnets_for_validators = self
.known_validators
.len()
.saturating_mul(self.beacon_chain.spec.random_subnets_per_validator as usize);
subnets_for_validators // How many subnets we need
.min(max_subnets as usize) // Capped by the max
.saturating_sub(self.long_lived_subscriptions.len()) // Minus those we have
};
if required_long_lived_subnets == 0 {
// Nothing to do.
return;
}
// Build a list of the subnets that we are not currently advertising.
let available_subnets = (0..max_subnets)
.map(SubnetId::new)
.filter(|subnet_id| self.random_subnets.get(subnet_id).is_none())
.filter(|subnet_id| !self.long_lived_subscriptions.contains_key(subnet_id))
.collect::<Vec<_>>();
let to_subscribe_subnets = {
if available_subnets.len() < no_subnets_to_subscribe {
debug!(self.log, "Reached maximum random subnet subscriptions");
available_subnets
} else {
// select a random sample of available subnets
available_subnets
.choose_multiple(&mut rand::thread_rng(), no_subnets_to_subscribe)
.cloned()
.collect::<Vec<_>>()
let subnets_to_subscribe: Vec<_> = available_subnets
.choose_multiple(&mut rand::thread_rng(), required_long_lived_subnets)
.cloned()
.collect();
// Calculate in which slot this subscription ends.
let end_slot = match self.beacon_chain.slot_clock.now() {
Some(slot) => slot + self.long_lived_subnet_subscription_slots,
None => {
return debug!(
self.log,
"Failed to calculate end slot of long lived subnet subscriptions."
)
}
};
for subnet_id in to_subscribe_subnets {
// remove this subnet from any immediate un-subscription events
self.unsubscriptions
.retain(|exact_subnet| exact_subnet.subnet_id != subnet_id);
// insert a new random subnet
self.random_subnets.insert(subnet_id);
// send discovery request
// Note: it's wasteful to send a DiscoverPeers request if we already have peers for this subnet.
// However, subscribing to random subnets ideally shouldn't happen very often (once in ~27 hours) and
// this makes it easier to deterministically test the attestations service.
self.events
.push_back(SubnetServiceMessage::DiscoverPeers(vec![SubnetDiscovery {
subnet: Subnet::Attestation(subnet_id),
min_ttl: None,
}]));
// if we are not already subscribed, then subscribe
if !self.subscriptions.contains(&subnet_id) {
self.subscriptions.insert(subnet_id);
debug!(self.log, "Subscribing to random subnet"; "subnet_id" => ?subnet_id);
self.events
.push_back(SubnetServiceMessage::Subscribe(Subnet::Attestation(
subnet_id,
)));
for subnet_id in &subnets_to_subscribe {
if let Err(e) = self.subscribe_to_subnet_immediately(
*subnet_id,
SubscriptionKind::LongLived,
end_slot,
) {
debug!(self.log, "Failed to subscribe to long lived subnet"; "subnet" => ?subnet_id, "err" => e);
}
// add the subnet to the ENR bitfield
self.events
.push_back(SubnetServiceMessage::EnrAdd(Subnet::Attestation(subnet_id)));
}
}
/* A collection of functions that handle the various timeouts */
/// A queued subscription is ready.
/// Registers a subnet as subscribed.
///
/// We add subscriptions events even if we are already subscribed to a random subnet (as these
/// can be unsubscribed at any time by inactive validators). If we are
/// still subscribed at the time the event fires, we don't re-subscribe.
fn handle_subscriptions(&mut self, exact_subnet: ExactSubnet) {
// Check if the subnet currently exists as a long-lasting random subnet
if let Some(expiry) = self.random_subnets.get(&exact_subnet.subnet_id) {
// we are subscribed via a random subnet, if this is to expire during the time we need
// to be subscribed, just extend the expiry
let slot_duration = self.beacon_chain.slot_clock.slot_duration();
let advance_subscription_duration = slot_duration
.checked_div(ADVANCE_SUBSCRIBE_TIME)
.expect("ADVANCE_SUBSCRIPTION_TIME cannot be too large");
// we require the subnet subscription for at least a slot on top of the initial
// subscription time
let expected_end_subscription_duration = slot_duration + advance_subscription_duration;
/// Checks that the time in which the subscription would end is not in the past. If we are
/// already subscribed, extends the timeout if necessary. If this is a new subscription, we send
/// out the appropriate events.
fn subscribe_to_subnet_immediately(
&mut self,
subnet_id: SubnetId,
subscription_kind: SubscriptionKind,
end_slot: Slot,
) -> Result<(), &'static str> {
if self.subscribe_all_subnets {
// Case not handled by this service.
return Ok(());
}
if expiry < &(Instant::now() + expected_end_subscription_duration) {
self.random_subnets
.update_timeout(&exact_subnet.subnet_id, expected_end_subscription_duration);
let time_to_subscription_end = self
.beacon_chain
.slot_clock
.duration_to_slot(end_slot)
.unwrap_or_default();
// First check this is worth doing.
if time_to_subscription_end.is_zero() {
return Err("Time when subscription would end has already passed.");
}
// We need to check and add a subscription for the right kind, regardless of the presence
// of the subnet as a subscription of the other kind. This is mainly since long lived
// subscriptions can be removed at any time when a validator goes offline.
let (subscriptions, already_subscribed_as_other_kind) = match subscription_kind {
SubscriptionKind::ShortLived => (
&mut self.short_lived_subscriptions,
self.long_lived_subscriptions.contains_key(&subnet_id),
),
SubscriptionKind::LongLived => (
&mut self.long_lived_subscriptions,
self.short_lived_subscriptions.contains_key(&subnet_id),
),
};
match subscriptions.get(&subnet_id) {
Some(current_end_slot) => {
// We are already subscribed. Check if we need to extend the subscription.
if &end_slot > current_end_slot {
trace!(self.log, "Extending subscription to subnet";
"subnet" => ?subnet_id,
"prev_end_slot" => current_end_slot,
"new_end_slot" => end_slot,
"subscription_kind" => ?subscription_kind,
);
subscriptions.insert_at(subnet_id, end_slot, time_to_subscription_end);
}
}
} else {
// we are also not un-subscribing from a subnet if the next slot requires us to be
// subscribed. Therefore there could be the case that we are already still subscribed
// to the required subnet. In which case we do not issue another subscription request.
if !self.subscriptions.contains(&exact_subnet.subnet_id) {
// we are not already subscribed
debug!(self.log, "Subscribing to subnet"; "subnet" => *exact_subnet.subnet_id, "target_slot" => exact_subnet.slot.as_u64());
self.subscriptions.insert(exact_subnet.subnet_id);
self.events
.push_back(SubnetServiceMessage::Subscribe(Subnet::Attestation(
exact_subnet.subnet_id,
None => {
// This is a new subscription. Add with the corresponding timeout and send the
// notification.
subscriptions.insert_at(subnet_id, end_slot, time_to_subscription_end);
// Inform of the subscription.
if !already_subscribed_as_other_kind {
debug!(self.log, "Subscribing to subnet";
"subnet" => ?subnet_id,
"end_slot" => end_slot,
"subscription_kind" => ?subscription_kind,
);
self.queue_event(SubnetServiceMessage::Subscribe(Subnet::Attestation(
subnet_id,
)));
}
// If this is a new long lived subscription, send out the appropriate events.
if SubscriptionKind::LongLived == subscription_kind {
let subnet = Subnet::Attestation(subnet_id);
// Advertise this subnet in our ENR.
self.long_lived_subscriptions.insert_at(
subnet_id,
end_slot,
time_to_subscription_end,
);
self.queue_event(SubnetServiceMessage::EnrAdd(subnet));
if !self.discovery_disabled {
self.queue_event(SubnetServiceMessage::DiscoverPeers(vec![
SubnetDiscovery {
subnet,
min_ttl: None,
},
]))
}
}
}
}
}
/// A queued unsubscription is ready.
///
/// Unsubscription events are added, even if we are subscribed to long-lived random subnets. If
/// a random subnet is present, we do not unsubscribe from it.
fn handle_unsubscriptions(&mut self, exact_subnet: ExactSubnet) {
// Check if the subnet currently exists as a long-lasting random subnet
if self.random_subnets.contains(&exact_subnet.subnet_id) {
return;
}
debug!(self.log, "Unsubscribing from subnet"; "subnet" => *exact_subnet.subnet_id, "processed_slot" => exact_subnet.slot.as_u64());
self.subscriptions.remove(&exact_subnet.subnet_id);
self.events
.push_back(SubnetServiceMessage::Unsubscribe(Subnet::Attestation(
exact_subnet.subnet_id,
)));
Ok(())
}
/// A random subnet has expired.
///
/// This function selects a new subnet to join, or extends the expiry if there are no more
/// available subnets to choose from.
fn handle_random_subnet_expiry(&mut self, subnet_id: SubnetId) {
fn handle_random_subnet_expiry(&mut self, subnet_id: SubnetId, end_slot: Slot) {
let subnet_count = self.beacon_chain.spec.attestation_subnet_count;
if self.random_subnets.len() == (subnet_count - 1) as usize {
// We are at capacity, simply increase the timeout of the current subnet
self.random_subnets.insert(subnet_id);
return;
}
// If there are no unsubscription events for `subnet_id`, we unsubscribe immediately.
if !self
.unsubscriptions
.keys()
.any(|s| s.subnet_id == subnet_id)
{
// we are not at capacity, unsubscribe from the current subnet.
debug!(self.log, "Unsubscribing from random subnet"; "subnet_id" => *subnet_id);
self.events
.push_back(SubnetServiceMessage::Unsubscribe(Subnet::Attestation(
if self.long_lived_subscriptions.len() == (subnet_count - 1) as usize {
let end_slot = end_slot + self.long_lived_subnet_subscription_slots;
// This is just an extra accuracy precaution, we could use the default timeout if
// needed.
if let Some(time_to_subscription_end) =
self.beacon_chain.slot_clock.duration_to_slot(end_slot)
{
// We are at capacity, simply increase the timeout of the current subnet.
self.long_lived_subscriptions.insert_at(
subnet_id,
)));
end_slot + 1,
time_to_subscription_end,
);
} else {
self.long_lived_subscriptions.insert(subnet_id, end_slot);
}
return;
}
// Remove the ENR bitfield bit and choose a new random on from the available subnets
self.events
.push_back(SubnetServiceMessage::EnrRemove(Subnet::Attestation(
// Subscribe to a new random subnet.
self.subscribe_to_random_subnets();
}
// Unsubscribes from a subnet that was removed if it does not continue to exist as a
// subscription of the other kind. For long lived subscriptions, it also removes the
// advertisement from our ENR.
fn handle_removed_subnet(&mut self, subnet_id: SubnetId, subscription_kind: SubscriptionKind) {
let other_subscriptions = match subscription_kind {
SubscriptionKind::LongLived => &self.short_lived_subscriptions,
SubscriptionKind::ShortLived => &self.long_lived_subscriptions,
};
if !other_subscriptions.contains_key(&subnet_id) {
// Subscription no longer exists as short lived or long lived.
debug!(self.log, "Unsubscribing from subnet"; "subnet" => ?subnet_id, "subscription_kind" => ?subscription_kind);
self.queue_event(SubnetServiceMessage::Unsubscribe(Subnet::Attestation(
subnet_id,
)));
// Subscribe to a new random subnet
self.subscribe_to_random_subnets(1);
}
if subscription_kind == SubscriptionKind::LongLived {
// Remove from our ENR even if we remain subscribed in other way.
self.queue_event(SubnetServiceMessage::EnrRemove(Subnet::Attestation(
subnet_id,
)));
}
}
/// A known validator has not sent a subscription in a while. They are considered offline and the
@@ -552,39 +622,37 @@ impl<T: BeaconChainTypes> AttestationService<T> {
/// validators to random subnets. So when a validator goes offline, we can simply remove the
/// allocated amount of random subnets.
fn handle_known_validator_expiry(&mut self) {
let spec = &self.beacon_chain.spec;
let subnet_count = spec.attestation_subnet_count;
let random_subnets_per_validator = spec.random_subnets_per_validator;
if self.known_validators.len() as u64 * random_subnets_per_validator >= subnet_count {
// have too many validators, ignore
// Calculate how many subnets we should remove.
let extra_subnet_count = {
let max_subnets = self.beacon_chain.spec.attestation_subnet_count;
let subnets_for_validators = self
.known_validators
.len()
.saturating_mul(self.beacon_chain.spec.random_subnets_per_validator as usize)
.min(max_subnets as usize);
self.long_lived_subscriptions
.len()
.saturating_sub(subnets_for_validators)
};
if extra_subnet_count == 0 {
// Nothing to do
return;
}
let subscribed_subnets = self.random_subnets.keys().cloned().collect::<Vec<_>>();
let to_remove_subnets = subscribed_subnets.choose_multiple(
&mut rand::thread_rng(),
random_subnets_per_validator as usize,
);
let advertised_subnets = self
.long_lived_subscriptions
.keys()
.cloned()
.collect::<Vec<_>>();
let to_remove_subnets = advertised_subnets
.choose_multiple(&mut rand::thread_rng(), extra_subnet_count)
.cloned();
for subnet_id in to_remove_subnets {
// If there are no unsubscription events for `subnet_id`, we unsubscribe immediately.
if !self
.unsubscriptions
.keys()
.any(|s| s.subnet_id == *subnet_id)
{
self.events
.push_back(SubnetServiceMessage::Unsubscribe(Subnet::Attestation(
*subnet_id,
)));
}
// as the long lasting subnet subscription is being removed, remove the subnet_id from
// the ENR bitfield
self.events
.push_back(SubnetServiceMessage::EnrRemove(Subnet::Attestation(
*subnet_id,
)));
self.random_subnets.remove(subnet_id);
self.long_lived_subscriptions.remove(&subnet_id);
self.handle_removed_subnet(subnet_id, SubscriptionKind::LongLived);
}
}
}
@@ -593,7 +661,7 @@ impl<T: BeaconChainTypes> Stream for AttestationService<T> {
type Item = SubnetServiceMessage;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
// update the waker if needed
// Update the waker if needed.
if let Some(waker) = &self.waker {
if waker.will_wake(cx.waker()) {
self.waker = Some(cx.waker().clone());
@@ -602,25 +670,13 @@ impl<T: BeaconChainTypes> Stream for AttestationService<T> {
self.waker = Some(cx.waker().clone());
}
// process any un-subscription events
match self.unsubscriptions.poll_next_unpin(cx) {
Poll::Ready(Some(Ok(exact_subnet))) => self.handle_unsubscriptions(exact_subnet),
Poll::Ready(Some(Err(e))) => {
error!(self.log, "Failed to check for subnet unsubscription times"; "error"=> e);
}
Poll::Ready(None) | Poll::Pending => {}
// Send out any generated events.
if let Some(event) = self.events.pop_front() {
return Poll::Ready(Some(event));
}
// process any random subnet expiries
match self.random_subnets.poll_next_unpin(cx) {
Poll::Ready(Some(Ok(subnet))) => self.handle_random_subnet_expiry(subnet),
Poll::Ready(Some(Err(e))) => {
error!(self.log, "Failed to check for random subnet cycles"; "error"=> e);
}
Poll::Ready(None) | Poll::Pending => {}
}
// process any known validator expiries
// Process first any known validator expiries, since these affect how many long lived
// subnets we need.
match self.known_validators.poll_next_unpin(cx) {
Poll::Ready(Some(Ok(_validator_index))) => {
self.handle_known_validator_expiry();
@@ -630,14 +686,52 @@ impl<T: BeaconChainTypes> Stream for AttestationService<T> {
}
Poll::Ready(None) | Poll::Pending => {}
}
// poll to remove entries on expiration, no need to act on expiration events
if let Poll::Ready(Some(Err(e))) = self.aggregate_validators_on_subnet.poll_next_unpin(cx) {
error!(self.log, "Failed to check for aggregate validator on subnet expirations"; "error"=> e);
// Process scheduled subscriptions that might be ready, since those can extend a soon to
// expire subscription.
match self.scheduled_short_lived_subscriptions.poll_next_unpin(cx) {
Poll::Ready(Some(Ok(ExactSubnet { subnet_id, slot }))) => {
if let Err(e) = self.subscribe_to_subnet_immediately(
subnet_id,
SubscriptionKind::ShortLived,
slot + 1,
) {
debug!(self.log, "Failed to subscribe to short lived subnet"; "subnet" => ?subnet_id, "err" => e);
}
}
Poll::Ready(Some(Err(e))) => {
error!(self.log, "Failed to check for scheduled subnet subscriptions"; "error"=> e);
}
Poll::Ready(None) | Poll::Pending => {}
}
// process any generated events
if let Some(event) = self.events.pop_front() {
return Poll::Ready(Some(event));
// Finally process any expired subscriptions.
match self.short_lived_subscriptions.poll_next_unpin(cx) {
Poll::Ready(Some(Ok((subnet_id, _end_slot)))) => {
self.handle_removed_subnet(subnet_id, SubscriptionKind::ShortLived);
}
Poll::Ready(Some(Err(e))) => {
error!(self.log, "Failed to check for subnet unsubscription times"; "error"=> e);
}
Poll::Ready(None) | Poll::Pending => {}
}
// Process any random subnet expiries.
match self.long_lived_subscriptions.poll_next_unpin(cx) {
Poll::Ready(Some(Ok((subnet_id, end_slot)))) => {
self.handle_random_subnet_expiry(subnet_id, end_slot)
}
Poll::Ready(Some(Err(e))) => {
error!(self.log, "Failed to check for random subnet cycles"; "error"=> e);
}
Poll::Ready(None) | Poll::Pending => {}
}
// Poll to remove entries on expiration, no need to act on expiration events.
if let Some(tracked_vals) = self.aggregate_validators_on_subnet.as_mut() {
if let Poll::Ready(Some(Err(e))) = tracked_vals.poll_next_unpin(cx) {
error!(self.log, "Failed to check for aggregate validator on subnet expirations"; "error"=> e);
}
}
Poll::Pending


@@ -12,7 +12,7 @@ use slog::{debug, error, o, trace, warn};
use super::SubnetServiceMessage;
use beacon_chain::{BeaconChain, BeaconChainTypes};
use hashset_delay::HashSetDelay;
use delay_map::HashSetDelay;
use lighthouse_network::{NetworkConfig, Subnet, SubnetDiscovery};
use slot_clock::SlotClock;
use types::{Epoch, EthSpec, SyncCommitteeSubscription, SyncSubnetId};


@@ -8,7 +8,7 @@ use futures::prelude::*;
use genesis::{generate_deterministic_keypairs, interop_genesis_state, DEFAULT_ETH1_BLOCK_HASH};
use lazy_static::lazy_static;
use lighthouse_network::NetworkConfig;
use slog::Logger;
use slog::{o, Drain, Logger};
use sloggers::{null::NullLoggerBuilder, Build};
use slot_clock::{SlotClock, SystemTimeSlotClock};
use std::sync::Arc;
@@ -42,7 +42,7 @@ impl TestBeaconChain {
let keypairs = generate_deterministic_keypairs(1);
let log = get_logger();
let log = get_logger(None);
let store =
HotColdDB::open_ephemeral(StoreConfig::default(), spec.clone(), log.clone()).unwrap();
@@ -93,16 +93,32 @@ pub fn recent_genesis_time() -> u64 {
.as_secs()
}
fn get_logger() -> Logger {
NullLoggerBuilder.build().expect("logger should build")
fn get_logger(log_level: Option<slog::Level>) -> Logger {
if let Some(level) = log_level {
let drain = {
let decorator = slog_term::TermDecorator::new().build();
let decorator =
logging::AlignedTermDecorator::new(decorator, logging::MAX_MESSAGE_WIDTH);
let drain = slog_term::FullFormat::new(decorator).build().fuse();
let drain = slog_async::Async::new(drain).chan_size(2048).build();
drain.filter_level(level)
};
Logger::root(drain.fuse(), o!())
} else {
let builder = NullLoggerBuilder;
builder.build().expect("should build logger")
}
}
lazy_static! {
static ref CHAIN: TestBeaconChain = TestBeaconChain::new_with_system_clock();
}
fn get_attestation_service() -> AttestationService<TestBeaconChainType> {
let log = get_logger();
fn get_attestation_service(
log_level: Option<slog::Level>,
) -> AttestationService<TestBeaconChainType> {
let log = get_logger(log_level);
let config = NetworkConfig::default();
let beacon_chain = CHAIN.chain.clone();
@@ -111,7 +127,7 @@ fn get_attestation_service() -> AttestationService<TestBeaconChainType> {
}
fn get_sync_committee_service() -> SyncCommitteeService<TestBeaconChainType> {
let log = get_logger();
let log = get_logger(None);
let config = NetworkConfig::default();
let beacon_chain = CHAIN.chain.clone();
@@ -128,28 +144,34 @@ async fn get_events<S: Stream<Item = SubnetServiceMessage> + Unpin>(
) -> Vec<SubnetServiceMessage> {
let mut events = Vec::new();
let collect_stream_fut = async {
loop {
if let Some(result) = stream.next().await {
events.push(result);
let timeout =
tokio::time::sleep(Duration::from_millis(SLOT_DURATION_MILLIS) * num_slots_before_timeout);
futures::pin_mut!(timeout);
loop {
tokio::select! {
Some(event) = stream.next() => {
events.push(event);
if let Some(num) = num_events {
if events.len() == num {
return;
break;
}
}
}
}
};
_ = timeout.as_mut() => {
break;
}
tokio::select! {
_ = collect_stream_fut => events,
_ = tokio::time::sleep(
Duration::from_millis(SLOT_DURATION_MILLIS) * num_slots_before_timeout,
) => events
}
}
events
}
mod attestation_service {
use crate::subnet_service::attestation_subnets::MIN_PEER_DISCOVERY_SLOT_LOOK_AHEAD;
use super::*;
fn get_subscription(
@@ -195,7 +217,7 @@ mod attestation_service {
let committee_count = 1;
// create the attestation service and subscriptions
let mut attestation_service = get_attestation_service();
let mut attestation_service = get_attestation_service(None);
let current_slot = attestation_service
.beacon_chain
.slot_clock
@@ -237,15 +259,18 @@ mod attestation_service {
matches::assert_matches!(
events[..3],
[
SubnetServiceMessage::DiscoverPeers(_),
SubnetServiceMessage::Subscribe(_any1),
SubnetServiceMessage::EnrAdd(_any3)
SubnetServiceMessage::EnrAdd(_any3),
SubnetServiceMessage::DiscoverPeers(_),
]
);
// If the long lived and short lived subnets are the same, there should be no more events
// as we don't resubscribe already subscribed subnets.
if !attestation_service.random_subnets.contains(&subnet_id) {
if !attestation_service
.subscriptions(attestation_subnets::SubscriptionKind::LongLived)
.contains_key(&subnet_id)
{
assert_eq!(expected[..], events[3..]);
}
// Should be subscribed to only 1 long lived subnet after unsubscription.
@@ -267,7 +292,7 @@
let com2 = 0;
// create the attestation service and subscriptions
let mut attestation_service = get_attestation_service();
let mut attestation_service = get_attestation_service(None);
let current_slot = attestation_service
.beacon_chain
.slot_clock
@@ -319,16 +344,19 @@
matches::assert_matches!(
events[..3],
[
SubnetServiceMessage::DiscoverPeers(_),
SubnetServiceMessage::Subscribe(_any1),
SubnetServiceMessage::EnrAdd(_any3)
SubnetServiceMessage::EnrAdd(_any3),
SubnetServiceMessage::DiscoverPeers(_),
]
);
let expected = SubnetServiceMessage::Subscribe(Subnet::Attestation(subnet_id1));
// Should be still subscribed to 1 long lived and 1 short lived subnet if both are different.
if !attestation_service.random_subnets.contains(&subnet_id1) {
if !attestation_service
.subscriptions(attestation_subnets::SubscriptionKind::LongLived)
.contains_key(&subnet_id1)
{
assert_eq!(expected, events[3]);
assert_eq!(attestation_service.subscription_count(), 2);
} else {
@@ -339,7 +367,10 @@
let unsubscribe_event = get_events(&mut attestation_service, None, 1).await;
// If the long lived and short lived subnets are different, we should get an unsubscription event.
if !attestation_service.random_subnets.contains(&subnet_id1) {
if !attestation_service
.subscriptions(attestation_subnets::SubscriptionKind::LongLived)
.contains_key(&subnet_id1)
{
assert_eq!(
[SubnetServiceMessage::Unsubscribe(Subnet::Attestation(
subnet_id1
@@ -360,7 +391,7 @@
let committee_count = 1;
// create the attestation service and subscriptions
let mut attestation_service = get_attestation_service();
let mut attestation_service = get_attestation_service(None);
let current_slot = attestation_service
.beacon_chain
.slot_clock
@@ -418,7 +449,7 @@
let committee_count = 1;
// create the attestation service and subscriptions
let mut attestation_service = get_attestation_service();
let mut attestation_service = get_attestation_service(None);
let current_slot = attestation_service
.beacon_chain
.slot_clock
@@ -465,6 +496,122 @@
assert_eq!(enr_add_count, 64);
assert_eq!(unexpected_msg_count, 0);
}
#[tokio::test]
async fn test_subscribe_same_subnet_several_slots_apart() {
// subscription config
let validator_index = 1;
let committee_count = 1;
// Makes 2 validator subscriptions to the same subnet but at different slots.
// There should be just 1 unsubscription event for the later slot subscription (subscription_slot2).
let subscription_slot1 = 0;
let subscription_slot2 = MIN_PEER_DISCOVERY_SLOT_LOOK_AHEAD + 4;
let com1 = MIN_PEER_DISCOVERY_SLOT_LOOK_AHEAD + 4;
let com2 = 0;
// create the attestation service and subscriptions
let mut attestation_service = get_attestation_service(None);
let current_slot = attestation_service
.beacon_chain
.slot_clock
.now()
.expect("Could not get current slot");
let sub1 = get_subscription(
validator_index,
com1,
current_slot + Slot::new(subscription_slot1),
committee_count,
);
let sub2 = get_subscription(
validator_index,
com2,
current_slot + Slot::new(subscription_slot2),
committee_count,
);
let subnet_id1 = SubnetId::compute_subnet::<MainnetEthSpec>(
current_slot + Slot::new(subscription_slot1),
com1,
committee_count,
&attestation_service.beacon_chain.spec,
)
.unwrap();
let subnet_id2 = SubnetId::compute_subnet::<MainnetEthSpec>(
current_slot + Slot::new(subscription_slot2),
com2,
committee_count,
&attestation_service.beacon_chain.spec,
)
.unwrap();
// Assert that subscriptions are different but their subnet is the same
assert_ne!(sub1, sub2);
assert_eq!(subnet_id1, subnet_id2);
// submit the subscriptions
attestation_service
.validator_subscriptions(vec![sub1, sub2])
.unwrap();
// Unsubscription event should happen at the end of the slot.
let events = get_events(&mut attestation_service, None, 1).await;
matches::assert_matches!(
events[..3],
[
SubnetServiceMessage::Subscribe(_any1),
SubnetServiceMessage::EnrAdd(_any3),
SubnetServiceMessage::DiscoverPeers(_),
]
);
let expected_subscription =
SubnetServiceMessage::Subscribe(Subnet::Attestation(subnet_id1));
let expected_unsubscription =
SubnetServiceMessage::Unsubscribe(Subnet::Attestation(subnet_id1));
if !attestation_service
.subscriptions(attestation_subnets::SubscriptionKind::LongLived)
.contains_key(&subnet_id1)
{
assert_eq!(expected_subscription, events[3]);
// fourth is a discovery event
assert_eq!(expected_unsubscription, events[5]);
}
assert_eq!(attestation_service.subscription_count(), 1);
println!("{events:?}");
let subscription_slot = current_slot + subscription_slot2 - 1; // one less due to the
// advance subscription time
let wait_slots = attestation_service
.beacon_chain
.slot_clock
.duration_to_slot(subscription_slot)
.unwrap()
.as_millis() as u64
/ SLOT_DURATION_MILLIS;
let no_events = dbg!(get_events(&mut attestation_service, None, wait_slots as u32).await);
assert_eq!(no_events, []);
let second_subscribe_event = get_events(&mut attestation_service, None, 2).await;
// If the long lived and short lived subnets are different, we should get a second subscription event.
if !attestation_service
.subscriptions(attestation_subnets::SubscriptionKind::LongLived)
.contains_key(&subnet_id1)
{
assert_eq!(
[SubnetServiceMessage::Subscribe(Subnet::Attestation(
subnet_id1
))],
second_subscribe_event[..]
);
}
}
}
mod sync_committee_service {


@@ -1,12 +0,0 @@
[package]
name = "hashset_delay"
version = "0.2.0"
authors = ["Sigma Prime <contact@sigmaprime.io>"]
edition = "2021"
[dependencies]
futures = "0.3.7"
tokio-util = { version = "0.6.2", features = ["time"] }
[dev-dependencies]
tokio = { version = "1.14.0", features = ["time", "rt-multi-thread", "macros"] }


@@ -1,197 +0,0 @@
//NOTE: This is just a specific case of a HashMapDelay.
// The code has been copied to make unique `insert` and `insert_at` functions.
/// The default delay for entries, in seconds. This is only used when `insert()` is used to add
/// entries.
const DEFAULT_DELAY: u64 = 30;
use futures::prelude::*;
use std::{
collections::HashMap,
pin::Pin,
task::{Context, Poll},
time::{Duration, Instant},
};
use tokio_util::time::delay_queue::{self, DelayQueue};
pub struct HashSetDelay<K>
where
K: std::cmp::Eq + std::hash::Hash + std::clone::Clone + Unpin,
{
/// The given entries.
entries: HashMap<K, MapEntry>,
/// A queue holding the timeouts of each entry.
expirations: DelayQueue<K>,
/// The default expiration timeout of an entry.
default_entry_timeout: Duration,
}
/// A wrapping around entries that adds the link to the entry's expiration, via a `delay_queue` key.
struct MapEntry {
/// The expiration key for the entry.
key: delay_queue::Key,
/// The actual entry.
value: Instant,
}
impl<K> Default for HashSetDelay<K>
where
K: std::cmp::Eq + std::hash::Hash + std::clone::Clone + Unpin,
{
fn default() -> Self {
HashSetDelay::new(Duration::from_secs(DEFAULT_DELAY))
}
}
impl<K> HashSetDelay<K>
where
K: std::cmp::Eq + std::hash::Hash + std::clone::Clone + Unpin,
{
/// Creates a new instance of `HashSetDelay`.
pub fn new(default_entry_timeout: Duration) -> Self {
HashSetDelay {
entries: HashMap::new(),
expirations: DelayQueue::new(),
default_entry_timeout,
}
}
/// Insert an entry into the mapping. Entries will expire after the `default_entry_timeout`.
pub fn insert(&mut self, key: K) {
self.insert_at(key, self.default_entry_timeout);
}
/// Inserts an entry that will expire at a given instant. If the entry already exists, the
/// timeout is updated.
pub fn insert_at(&mut self, key: K, entry_duration: Duration) {
if self.contains(&key) {
// update the timeout
self.update_timeout(&key, entry_duration);
} else {
let delay_key = self.expirations.insert(key.clone(), entry_duration);
let entry = MapEntry {
key: delay_key,
value: Instant::now() + entry_duration,
};
self.entries.insert(key, entry);
}
}
/// Gets a reference to an entry if it exists.
///
/// Returns None if the entry does not exist.
pub fn get(&self, key: &K) -> Option<&Instant> {
self.entries.get(key).map(|entry| &entry.value)
}
/// Returns true if the key exists, false otherwise.
pub fn contains(&self, key: &K) -> bool {
self.entries.contains_key(key)
}
/// Returns the length of the mapping.
pub fn len(&self) -> usize {
self.entries.len()
}
/// Checks if the mapping is empty.
pub fn is_empty(&self) -> bool {
self.entries.is_empty()
}
/// Updates the timeout for a given key. Returns true if the key existed, false otherwise.
///
/// Panics if the duration is too far in the future.
pub fn update_timeout(&mut self, key: &K, timeout: Duration) -> bool {
if let Some(entry) = self.entries.get(key) {
self.expirations.reset(&entry.key, timeout);
true
} else {
false
}
}
/// Removes a key from the map returning the value associated with the key that was in the map.
///
/// Return false if the key was not in the map.
pub fn remove(&mut self, key: &K) -> bool {
if let Some(entry) = self.entries.remove(key) {
self.expirations.remove(&entry.key);
return true;
}
false
}
/// Retains only the elements specified by the predicate.
///
/// In other words, remove all pairs `(k, v)` such that `f(&k,&mut v)` returns false.
pub fn retain<F: FnMut(&K) -> bool>(&mut self, mut f: F) {
let expiration = &mut self.expirations;
self.entries.retain(|key, entry| {
let result = f(key);
if !result {
expiration.remove(&entry.key);
}
result
})
}
/// Removes all entries from the map.
pub fn clear(&mut self) {
self.entries.clear();
self.expirations.clear();
}
/// Returns a vector of referencing all keys in the map.
pub fn keys(&self) -> impl Iterator<Item = &K> {
self.entries.keys()
}
}
impl<K> Stream for HashSetDelay<K>
where
K: std::cmp::Eq + std::hash::Hash + std::clone::Clone + Unpin,
{
type Item = Result<K, String>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
match self.expirations.poll_expired(cx) {
Poll::Ready(Some(Ok(key))) => match self.entries.remove(key.get_ref()) {
Some(_) => Poll::Ready(Some(Ok(key.into_inner()))),
None => Poll::Ready(Some(Err("Value no longer exists in expirations".into()))),
},
Poll::Ready(Some(Err(e))) => {
Poll::Ready(Some(Err(format!("delay queue error: {:?}", e))))
}
Poll::Ready(None) => Poll::Ready(None),
Poll::Pending => Poll::Pending,
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn should_not_panic() {
let key = 2u8;
let mut map = HashSetDelay::default();
map.insert(key);
map.update_timeout(&key, Duration::from_secs(100));
let fut = |cx: &mut Context| {
let _ = map.poll_next_unpin(cx);
let _ = map.poll_next_unpin(cx);
Poll::Ready(())
};
future::poll_fn(fut).await;
map.insert(key);
map.update_timeout(&key, Duration::from_secs(100));
}
}


@@ -1,12 +0,0 @@
//! This crate provides a single type (its counter-part HashMapDelay has been removed as it
//! currently is not in use in lighthouse):
//! - `HashSetDelay`
//!
//! # HashSetDelay
//!
//! This is similar to a `HashMapDelay` except the mapping maps to the expiry time. This
//! allows users to add objects and check their expiry deadlines before the `Stream`
//! consumes them.
mod hashset_delay;
pub use crate::hashset_delay::HashSetDelay;