Reduce load on validator subscription channels (#5311)

* Fix tests

* Merge branch 'unstable' into unclog-channels

* Avoid reallocations

* Reduce subscription load on beacon node
This commit is contained in:
Pawan Dhananjay 2024-03-07 18:02:27 +05:30 committed by GitHub
parent 8cd2b1ca87
commit 84a902a589
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 39 additions and 54 deletions

View File

@ -3448,34 +3448,34 @@ pub fn serve<T: BeaconChainTypes>(
chain: Arc<BeaconChain<T>>, chain: Arc<BeaconChain<T>>,
log: Logger| { log: Logger| {
task_spawner.blocking_json_task(Priority::P0, move || { task_spawner.blocking_json_task(Priority::P0, move || {
for subscription in &subscriptions { let subscriptions: std::collections::BTreeSet<_> = subscriptions
chain .iter()
.validator_monitor .map(|subscription| {
.write() chain
.auto_register_local_validator(subscription.validator_index); .validator_monitor
.write()
let validator_subscription = api_types::ValidatorSubscription { .auto_register_local_validator(subscription.validator_index);
validator_index: subscription.validator_index, api_types::ValidatorSubscription {
attestation_committee_index: subscription.committee_index, attestation_committee_index: subscription.committee_index,
slot: subscription.slot, slot: subscription.slot,
committee_count_at_slot: subscription.committees_at_slot, committee_count_at_slot: subscription.committees_at_slot,
is_aggregator: subscription.is_aggregator, is_aggregator: subscription.is_aggregator,
}; }
})
let message = ValidatorSubscriptionMessage::AttestationSubscribe { .collect();
subscriptions: vec![validator_subscription], let message =
}; ValidatorSubscriptionMessage::AttestationSubscribe { subscriptions };
if let Err(e) = validator_subscription_tx.try_send(message) { if let Err(e) = validator_subscription_tx.try_send(message) {
warn!( warn!(
log, log,
"Unable to process committee subscriptions"; "Unable to process committee subscriptions";
"info" => "the host may be overloaded or resource-constrained", "info" => "the host may be overloaded or resource-constrained",
"error" => ?e, "error" => ?e,
); );
return Err(warp_utils::reject::custom_server_error( return Err(warp_utils::reject::custom_server_error(
"unable to queue subscription, host may be overloaded or shutting down".to_string(), "unable to queue subscription, host may be overloaded or shutting down"
)); .to_string(),
} ));
} }
Ok(()) Ok(())

View File

@ -27,6 +27,7 @@ use lighthouse_network::{
MessageId, NetworkEvent, NetworkGlobals, PeerId, MessageId, NetworkEvent, NetworkGlobals, PeerId,
}; };
use slog::{crit, debug, error, info, o, trace, warn}; use slog::{crit, debug, error, info, o, trace, warn};
use std::collections::BTreeSet;
use std::{collections::HashSet, pin::Pin, sync::Arc, time::Duration}; use std::{collections::HashSet, pin::Pin, sync::Arc, time::Duration};
use store::HotColdDB; use store::HotColdDB;
use strum::IntoStaticStr; use strum::IntoStaticStr;
@ -119,7 +120,7 @@ pub enum NetworkMessage<T: EthSpec> {
pub enum ValidatorSubscriptionMessage { pub enum ValidatorSubscriptionMessage {
/// Subscribes a list of validators to specific slots for attestation duties. /// Subscribes a list of validators to specific slots for attestation duties.
AttestationSubscribe { AttestationSubscribe {
subscriptions: Vec<ValidatorSubscription>, subscriptions: BTreeSet<ValidatorSubscription>,
}, },
SyncCommitteeSubscribe { SyncCommitteeSubscribe {
subscriptions: Vec<SyncCommitteeSubscription>, subscriptions: Vec<SyncCommitteeSubscription>,
@ -783,7 +784,7 @@ impl<T: BeaconChainTypes> NetworkService<T> {
ValidatorSubscriptionMessage::AttestationSubscribe { subscriptions } => { ValidatorSubscriptionMessage::AttestationSubscribe { subscriptions } => {
if let Err(e) = self if let Err(e) = self
.attestation_service .attestation_service
.validator_subscriptions(subscriptions) .validator_subscriptions(subscriptions.into_iter())
{ {
warn!(self.log, "Attestation validator subscription failed"; "error" => e); warn!(self.log, "Attestation validator subscription failed"; "error" => e);
} }

View File

@ -196,7 +196,7 @@ impl<T: BeaconChainTypes> AttestationService<T> {
/// safely dropped. /// safely dropped.
pub fn validator_subscriptions( pub fn validator_subscriptions(
&mut self, &mut self,
subscriptions: Vec<ValidatorSubscription>, subscriptions: impl Iterator<Item = ValidatorSubscription>,
) -> Result<(), String> { ) -> Result<(), String> {
// If the node is in a proposer-only state, we ignore all subnet subscriptions. // If the node is in a proposer-only state, we ignore all subnet subscriptions.
if self.proposer_only { if self.proposer_only {
@ -227,7 +227,6 @@ impl<T: BeaconChainTypes> AttestationService<T> {
warn!(self.log, warn!(self.log,
"Failed to compute subnet id for validator subscription"; "Failed to compute subnet id for validator subscription";
"error" => ?e, "error" => ?e,
"validator_index" => subscription.validator_index
); );
continue; continue;
} }
@ -257,13 +256,11 @@ impl<T: BeaconChainTypes> AttestationService<T> {
warn!(self.log, warn!(self.log,
"Subscription to subnet error"; "Subscription to subnet error";
"error" => e, "error" => e,
"validator_index" => subscription.validator_index,
); );
} else { } else {
trace!(self.log, trace!(self.log,
"Subscribed to subnet for aggregator duties"; "Subscribed to subnet for aggregator duties";
"exact_subnet" => ?exact_subnet, "exact_subnet" => ?exact_subnet,
"validator_index" => subscription.validator_index
); );
} }
} }

View File

@ -180,14 +180,12 @@ mod attestation_service {
use super::*; use super::*;
fn get_subscription( fn get_subscription(
validator_index: u64,
attestation_committee_index: CommitteeIndex, attestation_committee_index: CommitteeIndex,
slot: Slot, slot: Slot,
committee_count_at_slot: u64, committee_count_at_slot: u64,
is_aggregator: bool, is_aggregator: bool,
) -> ValidatorSubscription { ) -> ValidatorSubscription {
ValidatorSubscription { ValidatorSubscription {
validator_index,
attestation_committee_index, attestation_committee_index,
slot, slot,
committee_count_at_slot, committee_count_at_slot,
@ -204,7 +202,6 @@ mod attestation_service {
(0..validator_count) (0..validator_count)
.map(|validator_index| { .map(|validator_index| {
get_subscription( get_subscription(
validator_index,
validator_index, validator_index,
slot, slot,
committee_count_at_slot, committee_count_at_slot,
@ -217,7 +214,6 @@ mod attestation_service {
#[tokio::test] #[tokio::test]
async fn subscribe_current_slot_wait_for_unsubscribe() { async fn subscribe_current_slot_wait_for_unsubscribe() {
// subscription config // subscription config
let validator_index = 1;
let committee_index = 1; let committee_index = 1;
// Keep a low subscription slot so that there are no additional subnet discovery events. // Keep a low subscription slot so that there are no additional subnet discovery events.
let subscription_slot = 0; let subscription_slot = 0;
@ -233,7 +229,6 @@ mod attestation_service {
.expect("Could not get current slot"); .expect("Could not get current slot");
let subscriptions = vec![get_subscription( let subscriptions = vec![get_subscription(
validator_index,
committee_index, committee_index,
current_slot + Slot::new(subscription_slot), current_slot + Slot::new(subscription_slot),
committee_count, committee_count,
@ -242,7 +237,7 @@ mod attestation_service {
// submit the subscriptions // submit the subscriptions
attestation_service attestation_service
.validator_subscriptions(subscriptions) .validator_subscriptions(subscriptions.into_iter())
.unwrap(); .unwrap();
// not enough time for peer discovery, just subscribe, unsubscribe // not enough time for peer discovery, just subscribe, unsubscribe
@ -293,7 +288,6 @@ mod attestation_service {
#[tokio::test] #[tokio::test]
async fn test_same_subnet_unsubscription() { async fn test_same_subnet_unsubscription() {
// subscription config // subscription config
let validator_index = 1;
let committee_count = 1; let committee_count = 1;
let subnets_per_node = MainnetEthSpec::default_spec().subnets_per_node as usize; let subnets_per_node = MainnetEthSpec::default_spec().subnets_per_node as usize;
@ -313,7 +307,6 @@ mod attestation_service {
.expect("Could not get current slot"); .expect("Could not get current slot");
let sub1 = get_subscription( let sub1 = get_subscription(
validator_index,
com1, com1,
current_slot + Slot::new(subscription_slot1), current_slot + Slot::new(subscription_slot1),
committee_count, committee_count,
@ -321,7 +314,6 @@ mod attestation_service {
); );
let sub2 = get_subscription( let sub2 = get_subscription(
validator_index,
com2, com2,
current_slot + Slot::new(subscription_slot2), current_slot + Slot::new(subscription_slot2),
committee_count, committee_count,
@ -350,7 +342,7 @@ mod attestation_service {
// submit the subscriptions // submit the subscriptions
attestation_service attestation_service
.validator_subscriptions(vec![sub1, sub2]) .validator_subscriptions(vec![sub1, sub2].into_iter())
.unwrap(); .unwrap();
// Unsubscription event should happen at slot 2 (since subnet id's are the same, unsubscription event should be at higher slot + 1) // Unsubscription event should happen at slot 2 (since subnet id's are the same, unsubscription event should be at higher slot + 1)
@ -431,7 +423,7 @@ mod attestation_service {
// submit the subscriptions // submit the subscriptions
attestation_service attestation_service
.validator_subscriptions(subscriptions) .validator_subscriptions(subscriptions.into_iter())
.unwrap(); .unwrap();
let events = get_events(&mut attestation_service, Some(131), 10).await; let events = get_events(&mut attestation_service, Some(131), 10).await;
@ -501,7 +493,7 @@ mod attestation_service {
// submit the subscriptions // submit the subscriptions
attestation_service attestation_service
.validator_subscriptions(subscriptions) .validator_subscriptions(subscriptions.into_iter())
.unwrap(); .unwrap();
let events = get_events(&mut attestation_service, None, 3).await; let events = get_events(&mut attestation_service, None, 3).await;
@ -538,7 +530,6 @@ mod attestation_service {
#[tokio::test] #[tokio::test]
async fn test_subscribe_same_subnet_several_slots_apart() { async fn test_subscribe_same_subnet_several_slots_apart() {
// subscription config // subscription config
let validator_index = 1;
let committee_count = 1; let committee_count = 1;
let subnets_per_node = MainnetEthSpec::default_spec().subnets_per_node as usize; let subnets_per_node = MainnetEthSpec::default_spec().subnets_per_node as usize;
@ -558,7 +549,6 @@ mod attestation_service {
.expect("Could not get current slot"); .expect("Could not get current slot");
let sub1 = get_subscription( let sub1 = get_subscription(
validator_index,
com1, com1,
current_slot + Slot::new(subscription_slot1), current_slot + Slot::new(subscription_slot1),
committee_count, committee_count,
@ -566,7 +556,6 @@ mod attestation_service {
); );
let sub2 = get_subscription( let sub2 = get_subscription(
validator_index,
com2, com2,
current_slot + Slot::new(subscription_slot2), current_slot + Slot::new(subscription_slot2),
committee_count, committee_count,
@ -595,7 +584,7 @@ mod attestation_service {
// submit the subscriptions // submit the subscriptions
attestation_service attestation_service
.validator_subscriptions(vec![sub1, sub2]) .validator_subscriptions(vec![sub1, sub2].into_iter())
.unwrap(); .unwrap();
// Unsubscription event should happen at the end of the slot. // Unsubscription event should happen at the end of the slot.
@ -668,7 +657,7 @@ mod attestation_service {
// submit the subscriptions // submit the subscriptions
attestation_service attestation_service
.validator_subscriptions(subscriptions) .validator_subscriptions(subscriptions.into_iter())
.unwrap(); .unwrap();
// There should only be the same subscriptions as there are in the specification, // There should only be the same subscriptions as there are in the specification,

View File

@ -4,10 +4,8 @@ use ssz_derive::{Decode, Encode};
/// A validator subscription, created when a validator subscribes to a slot to perform optional aggregation /// A validator subscription, created when a validator subscribes to a slot to perform optional aggregation
/// duties. /// duties.
#[derive(PartialEq, Debug, Serialize, Deserialize, Clone, Encode, Decode)] #[derive(PartialEq, Debug, Serialize, Deserialize, Clone, Encode, Decode, Eq, PartialOrd, Ord)]
pub struct ValidatorSubscription { pub struct ValidatorSubscription {
/// The validators index.
pub validator_index: u64,
/// The index of the committee within `slot` of which the validator is a member. Used by the /// The index of the committee within `slot` of which the validator is a member. Used by the
/// beacon node to quickly evaluate the associated `SubnetId`. /// beacon node to quickly evaluate the associated `SubnetId`.
pub attestation_committee_index: CommitteeIndex, pub attestation_committee_index: CommitteeIndex,