Reduce attestation subscription spam from VC (#4806)

## Proposed Changes

Instead of sending every attestation subscription every slot to every BN:

- Send subscriptions 32, 16, 8, 7, 6, 5, 4, 3 slots before they occur.
- Track whether each subscription is sent successfully and retry it in subsequent slots if necessary.

## Additional Info

- [x] Add unit tests for `SubscriptionSlots`.
- [x] Test on Holesky.
- [x] Based on #4774 for testing.
This commit is contained in:
Michael Sproul 2023-10-06 06:26:18 +00:00
parent accb56e4fb
commit c3321dddb7

View File

@ -21,11 +21,12 @@ use eth2::types::{
}; };
use futures::{stream, StreamExt}; use futures::{stream, StreamExt};
use parking_lot::RwLock; use parking_lot::RwLock;
use safe_arith::ArithError; use safe_arith::{ArithError, SafeArith};
use slog::{debug, error, info, warn, Logger}; use slog::{debug, error, info, warn, Logger};
use slot_clock::SlotClock; use slot_clock::SlotClock;
use std::cmp::min; use std::cmp::min;
use std::collections::{hash_map, BTreeMap, HashMap, HashSet}; use std::collections::{hash_map, BTreeMap, HashMap, HashSet};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
use sync::poll_sync_committee_duties; use sync::poll_sync_committee_duties;
@ -33,14 +34,6 @@ use sync::SyncDutiesMap;
use tokio::{sync::mpsc::Sender, time::sleep}; use tokio::{sync::mpsc::Sender, time::sleep};
use types::{ChainSpec, Epoch, EthSpec, Hash256, PublicKeyBytes, SelectionProof, Slot}; use types::{ChainSpec, Epoch, EthSpec, Hash256, PublicKeyBytes, SelectionProof, Slot};
/// Since the BN does not like it when we subscribe to slots that are close to the current time, we
/// will only subscribe to slots which are further than `SUBSCRIPTION_BUFFER_SLOTS` away.
///
/// This number is based upon `MIN_PEER_DISCOVERY_SLOT_LOOK_AHEAD` value in the
/// `beacon_node::network::attestation_service` crate. It is not imported directly to avoid
/// bringing in the entire crate.
const SUBSCRIPTION_BUFFER_SLOTS: u64 = 2;
/// Only retain `HISTORICAL_DUTIES_EPOCHS` duties prior to the current epoch. /// Only retain `HISTORICAL_DUTIES_EPOCHS` duties prior to the current epoch.
const HISTORICAL_DUTIES_EPOCHS: u64 = 2; const HISTORICAL_DUTIES_EPOCHS: u64 = 2;
@ -62,6 +55,36 @@ const VALIDATOR_METRICS_MIN_COUNT: usize = 64;
/// reduces the amount of data that needs to be transferred. /// reduces the amount of data that needs to be transferred.
const INITIAL_DUTIES_QUERY_SIZE: usize = 1; const INITIAL_DUTIES_QUERY_SIZE: usize = 1;
/// Offsets from the attestation duty slot at which a subscription should be sent.
const ATTESTATION_SUBSCRIPTION_OFFSETS: [u64; 8] = [3, 4, 5, 6, 7, 8, 16, 32];
/// Check that `ATTESTATION_SUBSCRIPTION_OFFSETS` is sorted ascendingly.
const _: () = assert!({
let mut i = 0;
loop {
let prev = if i > 0 {
ATTESTATION_SUBSCRIPTION_OFFSETS[i - 1]
} else {
0
};
let curr = ATTESTATION_SUBSCRIPTION_OFFSETS[i];
if curr < prev {
break false;
}
i += 1;
if i == ATTESTATION_SUBSCRIPTION_OFFSETS.len() {
break true;
}
}
});
/// Since the BN does not like it when we subscribe to slots that are close to the current time, we
/// will only subscribe to slots which are further than 2 slots away.
///
/// This number is based upon `MIN_PEER_DISCOVERY_SLOT_LOOK_AHEAD` value in the
/// `beacon_node::network::attestation_service` crate. It is not imported directly to avoid
/// bringing in the entire crate.
const _: () = assert!(ATTESTATION_SUBSCRIPTION_OFFSETS[0] > 2);
#[derive(Debug)] #[derive(Debug)]
pub enum Error { pub enum Error {
UnableToReadSlotClock, UnableToReadSlotClock,
@ -84,6 +107,16 @@ pub struct DutyAndProof {
pub duty: AttesterData, pub duty: AttesterData,
/// This value is only set to `Some` if the proof indicates that the validator is an aggregator. /// This value is only set to `Some` if the proof indicates that the validator is an aggregator.
pub selection_proof: Option<SelectionProof>, pub selection_proof: Option<SelectionProof>,
/// Track which slots we should send subscriptions at for this duty.
///
/// This value is updated after each subscription is successfully sent.
pub subscription_slots: Arc<SubscriptionSlots>,
}
/// Tracker containing the slots at which an attestation subscription should be sent.
pub struct SubscriptionSlots {
/// Pairs of `(slot, already_sent)` in slot-descending order.
slots: Vec<(Slot, AtomicBool)>,
} }
impl DutyAndProof { impl DutyAndProof {
@ -111,17 +144,55 @@ impl DutyAndProof {
} }
})?; })?;
let subscription_slots = SubscriptionSlots::new(duty.slot);
Ok(Self { Ok(Self {
duty, duty,
selection_proof, selection_proof,
subscription_slots,
}) })
} }
/// Create a new `DutyAndProof` with the selection proof waiting to be filled in. /// Create a new `DutyAndProof` with the selection proof waiting to be filled in.
pub fn new_without_selection_proof(duty: AttesterData) -> Self { pub fn new_without_selection_proof(duty: AttesterData) -> Self {
let subscription_slots = SubscriptionSlots::new(duty.slot);
Self { Self {
duty, duty,
selection_proof: None, selection_proof: None,
subscription_slots,
}
}
}
impl SubscriptionSlots {
fn new(duty_slot: Slot) -> Arc<Self> {
let slots = ATTESTATION_SUBSCRIPTION_OFFSETS
.into_iter()
.filter_map(|offset| duty_slot.safe_sub(offset).ok())
.map(|scheduled_slot| (scheduled_slot, AtomicBool::new(false)))
.collect();
Arc::new(Self { slots })
}
/// Return `true` if we should send a subscription at `slot`.
fn should_send_subscription_at(&self, slot: Slot) -> bool {
// Iterate slots from smallest to largest looking for one that hasn't been completed yet.
self.slots
.iter()
.rev()
.any(|(scheduled_slot, already_sent)| {
slot >= *scheduled_slot && !already_sent.load(Ordering::Relaxed)
})
}
/// Update our record of subscribed slots to account for successful subscription at `slot`.
fn record_successful_subscription_at(&self, slot: Slot) {
for (scheduled_slot, already_sent) in self.slots.iter().rev() {
if slot >= *scheduled_slot {
already_sent.store(true, Ordering::Relaxed);
} else {
break;
}
} }
} }
} }
@ -574,8 +645,24 @@ async fn poll_beacon_attesters<T: SlotClock + 'static, E: EthSpec>(
let subscriptions_timer = let subscriptions_timer =
metrics::start_timer_vec(&metrics::DUTIES_SERVICE_TIMES, &[metrics::SUBSCRIPTIONS]); metrics::start_timer_vec(&metrics::DUTIES_SERVICE_TIMES, &[metrics::SUBSCRIPTIONS]);
// This vector is likely to be a little oversized, but it won't reallocate. // This vector is intentionally oversized by 10% so that it won't reallocate.
let mut subscriptions = Vec::with_capacity(local_pubkeys.len() * 2); // Each validator has 2 attestation duties occuring in the current and next epoch, for which
// they must send `ATTESTATION_SUBSCRIPTION_OFFSETS.len()` subscriptions. These subscription
// slots are approximately evenly distributed over the two epochs, usually with a slight lag
// that balances out (some subscriptions for the current epoch were sent in the previous, and
// some subscriptions for the next next epoch will be sent in the next epoch but aren't included
// in our calculation). We cancel the factor of 2 from the formula for simplicity.
let overallocation_numerator = 110;
let overallocation_denominator = 100;
let num_expected_subscriptions = overallocation_numerator
* std::cmp::max(
1,
local_pubkeys.len() * ATTESTATION_SUBSCRIPTION_OFFSETS.len()
/ E::slots_per_epoch() as usize,
)
/ overallocation_denominator;
let mut subscriptions = Vec::with_capacity(num_expected_subscriptions);
let mut subscription_slots_to_confirm = Vec::with_capacity(num_expected_subscriptions);
// For this epoch and the next epoch, produce any beacon committee subscriptions. // For this epoch and the next epoch, produce any beacon committee subscriptions.
// //
@ -588,10 +675,10 @@ async fn poll_beacon_attesters<T: SlotClock + 'static, E: EthSpec>(
.read() .read()
.iter() .iter()
.filter_map(|(_, map)| map.get(epoch)) .filter_map(|(_, map)| map.get(epoch))
// The BN logs a warning if we try and subscribe to current or near-by slots. Give it a
// buffer.
.filter(|(_, duty_and_proof)| { .filter(|(_, duty_and_proof)| {
current_slot + SUBSCRIPTION_BUFFER_SLOTS < duty_and_proof.duty.slot duty_and_proof
.subscription_slots
.should_send_subscription_at(current_slot)
}) })
.for_each(|(_, duty_and_proof)| { .for_each(|(_, duty_and_proof)| {
let duty = &duty_and_proof.duty; let duty = &duty_and_proof.duty;
@ -603,7 +690,8 @@ async fn poll_beacon_attesters<T: SlotClock + 'static, E: EthSpec>(
committees_at_slot: duty.committees_at_slot, committees_at_slot: duty.committees_at_slot,
slot: duty.slot, slot: duty.slot,
is_aggregator, is_aggregator,
}) });
subscription_slots_to_confirm.push(duty_and_proof.subscription_slots.clone());
}); });
} }
@ -632,6 +720,16 @@ async fn poll_beacon_attesters<T: SlotClock + 'static, E: EthSpec>(
"Failed to subscribe validators"; "Failed to subscribe validators";
"error" => %e "error" => %e
) )
} else {
// Record that subscriptions were successfully sent.
debug!(
log,
"Broadcast attestation subscriptions";
"count" => subscriptions.len(),
);
for subscription_slots in subscription_slots_to_confirm {
subscription_slots.record_successful_subscription_at(current_slot);
}
} }
} }
@ -1200,3 +1298,67 @@ async fn notify_block_production_service<T: SlotClock + 'static, E: EthSpec>(
}; };
} }
} }
#[cfg(test)]
mod test {
use super::*;
#[test]
fn subscription_slots_exact() {
for duty_slot in [
Slot::new(32),
Slot::new(47),
Slot::new(99),
Slot::new(1002003),
] {
let subscription_slots = SubscriptionSlots::new(duty_slot);
// Run twice to check idempotence (subscription slots shouldn't be marked as done until
// we mark them manually).
for _ in 0..2 {
for offset in ATTESTATION_SUBSCRIPTION_OFFSETS {
assert!(subscription_slots.should_send_subscription_at(duty_slot - offset));
}
}
// Mark each slot as complete and check that all prior slots are still marked
// incomplete.
for (i, offset) in ATTESTATION_SUBSCRIPTION_OFFSETS
.into_iter()
.rev()
.enumerate()
{
subscription_slots.record_successful_subscription_at(duty_slot - offset);
for lower_offset in ATTESTATION_SUBSCRIPTION_OFFSETS
.into_iter()
.rev()
.skip(i + 1)
{
assert!(lower_offset < offset);
assert!(
subscription_slots.should_send_subscription_at(duty_slot - lower_offset)
);
}
}
}
}
#[test]
fn subscription_slots_mark_multiple() {
for (i, offset) in ATTESTATION_SUBSCRIPTION_OFFSETS.into_iter().enumerate() {
let duty_slot = Slot::new(64);
let subscription_slots = SubscriptionSlots::new(duty_slot);
subscription_slots.record_successful_subscription_at(duty_slot - offset);
// All past offsets (earlier slots) should be marked as complete.
for (j, other_offset) in ATTESTATION_SUBSCRIPTION_OFFSETS.into_iter().enumerate() {
let past = j >= i;
assert_eq!(other_offset >= offset, past);
assert_eq!(
subscription_slots.should_send_subscription_at(duty_slot - other_offset),
!past
);
}
}
}
}