Improve aggregate validator logic (#1020)

* track whether we have aggregate validator subscriptions to exact subnets, so we know whether or not to drop incoming attestations

* fix is aggregator check

* fix CI

Co-authored-by: Age Manning <Age@AgeManning.com>
This commit is contained in:
realbigsean 2020-04-30 01:39:10 -04:00 committed by GitHub
parent 78a08ec1e6
commit dea01be00e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -10,7 +10,7 @@ use rand::seq::SliceRandom;
use rest_types::ValidatorSubscription; use rest_types::ValidatorSubscription;
use slog::{crit, debug, error, o, warn}; use slog::{crit, debug, error, o, warn};
use slot_clock::SlotClock; use slot_clock::SlotClock;
use std::collections::{HashMap, VecDeque}; use std::collections::VecDeque;
use std::sync::Arc; use std::sync::Arc;
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use types::{Attestation, EthSpec, Slot, SubnetId}; use types::{Attestation, EthSpec, Slot, SubnetId};
@ -77,8 +77,8 @@ pub struct AttestationService<T: BeaconChainTypes> {
/// A collection of timeouts for when to unsubscribe from a shard subnet. /// A collection of timeouts for when to unsubscribe from a shard subnet.
unsubscriptions: HashSetDelay<ExactSubnet>, unsubscriptions: HashSetDelay<ExactSubnet>,
/// A mapping indicating the number of known aggregate validators for a given `ExactSubnet`. /// A collection timeouts to track the existence of aggregate validator subscriptions at an `ExactSubnet`.
_aggregate_validators_on_subnet: HashMap<ExactSubnet, usize>, aggregate_validators_on_subnet: HashSetDelay<ExactSubnet>,
/// A collection of seen validators. These dictate how many random subnets we should be /// A collection of seen validators. These dictate how many random subnets we should be
/// subscribed to. As these time out, we unsubscribe for the required random subnets and update /// subscribed to. As these time out, we unsubscribe for the required random subnets and update
@ -124,7 +124,7 @@ impl<T: BeaconChainTypes> AttestationService<T> {
discover_peers: HashSetDelay::new(default_timeout), discover_peers: HashSetDelay::new(default_timeout),
subscriptions: HashSetDelay::new(default_timeout), subscriptions: HashSetDelay::new(default_timeout),
unsubscriptions: HashSetDelay::new(default_timeout), unsubscriptions: HashSetDelay::new(default_timeout),
_aggregate_validators_on_subnet: HashMap::new(), aggregate_validators_on_subnet: HashSetDelay::new(default_timeout),
known_validators: HashSetDelay::new(last_seen_val_timeout), known_validators: HashSetDelay::new(last_seen_val_timeout),
log, log,
} }
@ -176,10 +176,12 @@ impl<T: BeaconChainTypes> AttestationService<T> {
// sophisticated logic should be added using known future forks. // sophisticated logic should be added using known future forks.
// TODO: Implement // TODO: Implement
// set the subscription timer to subscribe to the next subnet if required if subscription.is_aggregator {
if let Err(e) = self.subscribe_to_subnet(exact_subnet) { // set the subscription timer to subscribe to the next subnet if required
warn!(self.log, "Subscription to subnet error"; "error" => e); if let Err(e) = self.subscribe_to_subnet(exact_subnet) {
return Err(()); warn!(self.log, "Subscription to subnet error"; "error" => e);
return Err(());
}
} }
} }
Ok(()) Ok(())
@ -208,8 +210,11 @@ impl<T: BeaconChainTypes> AttestationService<T> {
return false; return false;
} }
// TODO: Correctly handle validation aggregator checks let exact_subnet = ExactSubnet {
true subnet_id: subnet.clone(),
slot: attestation.data.slot,
};
self.aggregate_validators_on_subnet.contains(&exact_subnet)
} }
/* Internal private functions */ /* Internal private functions */
@ -335,6 +340,12 @@ impl<T: BeaconChainTypes> AttestationService<T> {
} }
}; };
// Regardless of whether or not we have already subscribed to a subnet, track the expiration
// of aggregate validator subscriptions to exact subnets so we know whether or not to drop
// attestations for a given subnet + slot
self.aggregate_validators_on_subnet
.insert_at(exact_subnet.clone(), expected_end_subscription_duration);
// Checks on current subscriptions // Checks on current subscriptions
// Note: We may be connected to a long-lived random subnet. In this case we still add the // Note: We may be connected to a long-lived random subnet. In this case we still add the
// subscription timeout and check this case when the timeout fires. This is because a // subscription timeout and check this case when the timeout fires. This is because a
@ -641,6 +652,8 @@ impl<T: BeaconChainTypes> Stream for AttestationService<T> {
{ {
let _ = self.handle_known_validator_expiry(); let _ = self.handle_known_validator_expiry();
} }
// poll to remove entries on expiration, no need to act on expiration events
let _ = self.aggregate_validators_on_subnet.poll().map_err(|e| { error!(self.log, "Failed to check for aggregate validator on subnet expirations"; "error"=> format!("{}", e)); });
// process any generated events // process any generated events
if let Some(event) = self.events.pop_front() { if let Some(event) = self.events.pop_front() {