diff --git a/beacon_node/network/src/attestation_service/mod.rs b/beacon_node/network/src/attestation_service/mod.rs index 5c21d45a8..c44ec204a 100644 --- a/beacon_node/network/src/attestation_service/mod.rs +++ b/beacon_node/network/src/attestation_service/mod.rs @@ -10,7 +10,7 @@ use rand::seq::SliceRandom; use rest_types::ValidatorSubscription; use slog::{crit, debug, error, o, warn}; use slot_clock::SlotClock; -use std::collections::{HashMap, VecDeque}; +use std::collections::VecDeque; use std::sync::Arc; use std::time::{Duration, Instant}; use types::{Attestation, EthSpec, Slot, SubnetId}; @@ -77,8 +77,8 @@ pub struct AttestationService { /// A collection of timeouts for when to unsubscribe from a shard subnet. unsubscriptions: HashSetDelay, - /// A mapping indicating the number of known aggregate validators for a given `ExactSubnet`. - _aggregate_validators_on_subnet: HashMap, + /// A collection timeouts to track the existence of aggregate validator subscriptions at an `ExactSubnet`. + aggregate_validators_on_subnet: HashSetDelay, /// A collection of seen validators. These dictate how many random subnets we should be /// subscribed to. As these time out, we unsubscribe for the required random subnets and update @@ -124,7 +124,7 @@ impl AttestationService { discover_peers: HashSetDelay::new(default_timeout), subscriptions: HashSetDelay::new(default_timeout), unsubscriptions: HashSetDelay::new(default_timeout), - _aggregate_validators_on_subnet: HashMap::new(), + aggregate_validators_on_subnet: HashSetDelay::new(default_timeout), known_validators: HashSetDelay::new(last_seen_val_timeout), log, } @@ -176,10 +176,12 @@ impl AttestationService { // sophisticated logic should be added using known future forks. // TODO: Implement - // set the subscription timer to subscribe to the next subnet if required - if let Err(e) = self.subscribe_to_subnet(exact_subnet) { - warn!(self.log, "Subscription to subnet error"; "error" => e); - return Err(()); + if subscription.is_aggregator { + // set the subscription timer to subscribe to the next subnet if required + if let Err(e) = self.subscribe_to_subnet(exact_subnet) { + warn!(self.log, "Subscription to subnet error"; "error" => e); + return Err(()); + } } } Ok(()) @@ -208,8 +210,11 @@ impl AttestationService { return false; } - // TODO: Correctly handle validation aggregator checks - true + let exact_subnet = ExactSubnet { + subnet_id: subnet.clone(), + slot: attestation.data.slot, + }; + self.aggregate_validators_on_subnet.contains(&exact_subnet) } /* Internal private functions */ @@ -335,6 +340,12 @@ impl AttestationService { } }; + // Regardless of whether or not we have already subscribed to a subnet, track the expiration + // of aggregate validator subscriptions to exact subnets so we know whether or not to drop + // attestations for a given subnet + slot + self.aggregate_validators_on_subnet + .insert_at(exact_subnet.clone(), expected_end_subscription_duration); + // Checks on current subscriptions // Note: We may be connected to a long-lived random subnet. In this case we still add the // subscription timeout and check this case when the timeout fires. This is because a @@ -641,6 +652,8 @@ impl Stream for AttestationService { { let _ = self.handle_known_validator_expiry(); } + // poll to remove entries on expiration, no need to act on expiration events + let _ = self.aggregate_validators_on_subnet.poll().map_err(|e| { error!(self.log, "Failed to check for aggregate validator on subnet expirations"; "error"=> format!("{}", e)); }); // process any generated events if let Some(event) = self.events.pop_front() {