Fix duties override bug in VC (#5305)

* Fix duties override bug in VC

* Use initial request efficiently

* Prevent expired subscriptions by construction

* Clean up selection proof logic

* Add test
This commit is contained in:
Michael Sproul 2024-03-05 10:15:05 +11:00 committed by GitHub
parent f919f82e4f
commit cff6258bb1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -122,43 +122,37 @@ pub struct SubscriptionSlots {
slots: Vec<(Slot, AtomicBool)>, slots: Vec<(Slot, AtomicBool)>,
} }
impl DutyAndProof { /// Create a selection proof for `duty`.
/// Instantiate `Self`, computing the selection proof as well. ///
pub async fn new_with_selection_proof<T: SlotClock + 'static, E: EthSpec>( /// Return `Ok(None)` if the attesting validator is not an aggregator.
duty: AttesterData, async fn make_selection_proof<T: SlotClock + 'static, E: EthSpec>(
validator_store: &ValidatorStore<T, E>, duty: &AttesterData,
spec: &ChainSpec, validator_store: &ValidatorStore<T, E>,
) -> Result<Self, Error> { spec: &ChainSpec,
let selection_proof = validator_store ) -> Result<Option<SelectionProof>, Error> {
.produce_selection_proof(duty.pubkey, duty.slot) let selection_proof = validator_store
.await .produce_selection_proof(duty.pubkey, duty.slot)
.map_err(Error::FailedToProduceSelectionProof)?; .await
.map_err(Error::FailedToProduceSelectionProof)?;
let selection_proof = selection_proof selection_proof
.is_aggregator(duty.committee_length as usize, spec) .is_aggregator(duty.committee_length as usize, spec)
.map_err(Error::InvalidModulo) .map_err(Error::InvalidModulo)
.map(|is_aggregator| { .map(|is_aggregator| {
if is_aggregator { if is_aggregator {
Some(selection_proof) Some(selection_proof)
} else { } else {
// Don't bother storing the selection proof if the validator isn't an // Don't bother storing the selection proof if the validator isn't an
// aggregator, we won't need it. // aggregator, we won't need it.
None None
} }
})?;
let subscription_slots = SubscriptionSlots::new(duty.slot);
Ok(Self {
duty,
selection_proof,
subscription_slots,
}) })
} }
impl DutyAndProof {
/// Create a new `DutyAndProof` with the selection proof waiting to be filled in. /// Create a new `DutyAndProof` with the selection proof waiting to be filled in.
pub fn new_without_selection_proof(duty: AttesterData) -> Self { pub fn new_without_selection_proof(duty: AttesterData, current_slot: Slot) -> Self {
let subscription_slots = SubscriptionSlots::new(duty.slot); let subscription_slots = SubscriptionSlots::new(duty.slot, current_slot);
Self { Self {
duty, duty,
selection_proof: None, selection_proof: None,
@ -168,10 +162,13 @@ impl DutyAndProof {
} }
impl SubscriptionSlots { impl SubscriptionSlots {
fn new(duty_slot: Slot) -> Arc<Self> { fn new(duty_slot: Slot, current_slot: Slot) -> Arc<Self> {
let slots = ATTESTATION_SUBSCRIPTION_OFFSETS let slots = ATTESTATION_SUBSCRIPTION_OFFSETS
.into_iter() .into_iter()
.filter_map(|offset| duty_slot.safe_sub(offset).ok()) .filter_map(|offset| duty_slot.safe_sub(offset).ok())
// Keep only scheduled slots that haven't happened yet. This avoids sending expired
// subscriptions.
.filter(|scheduled_slot| *scheduled_slot > current_slot)
.map(|scheduled_slot| (scheduled_slot, AtomicBool::new(false))) .map(|scheduled_slot| (scheduled_slot, AtomicBool::new(false)))
.collect(); .collect();
Arc::new(Self { slots }) Arc::new(Self { slots })
@ -787,14 +784,14 @@ async fn poll_beacon_attesters_for_epoch<T: SlotClock + 'static, E: EthSpec>(
// request for extra data unless necessary in order to save on network bandwidth. // request for extra data unless necessary in order to save on network bandwidth.
let uninitialized_validators = let uninitialized_validators =
get_uninitialized_validators(duties_service, &epoch, local_pubkeys); get_uninitialized_validators(duties_service, &epoch, local_pubkeys);
let indices_to_request = if !uninitialized_validators.is_empty() { let initial_indices_to_request = if !uninitialized_validators.is_empty() {
uninitialized_validators.as_slice() uninitialized_validators.as_slice()
} else { } else {
&local_indices[0..min(INITIAL_DUTIES_QUERY_SIZE, local_indices.len())] &local_indices[0..min(INITIAL_DUTIES_QUERY_SIZE, local_indices.len())]
}; };
let response = let response =
post_validator_duties_attester(duties_service, epoch, indices_to_request).await?; post_validator_duties_attester(duties_service, epoch, initial_indices_to_request).await?;
let dependent_root = response.dependent_root; let dependent_root = response.dependent_root;
// Find any validators which have conflicting (epoch, dependent_root) values or missing duties for the epoch. // Find any validators which have conflicting (epoch, dependent_root) values or missing duties for the epoch.
@ -818,24 +815,29 @@ async fn poll_beacon_attesters_for_epoch<T: SlotClock + 'static, E: EthSpec>(
return Ok(()); return Ok(());
} }
// Filter out validators which have already been requested. // Make a request for all indices that require updating which we have not already made a request
let initial_duties = &response.data; // for.
let indices_to_request = validators_to_update let indices_to_request = validators_to_update
.iter() .iter()
.filter(|&&&pubkey| !initial_duties.iter().any(|duty| duty.pubkey == pubkey))
.filter_map(|pubkey| duties_service.validator_store.validator_index(pubkey)) .filter_map(|pubkey| duties_service.validator_store.validator_index(pubkey))
.filter(|validator_index| !initial_indices_to_request.contains(validator_index))
.collect::<Vec<_>>(); .collect::<Vec<_>>();
let new_duties = if !indices_to_request.is_empty() { // Filter the initial duties by their relevance so that we don't hit the warning below about
// overwriting duties. There was previously a bug here.
let new_initial_duties = response
.data
.into_iter()
.filter(|duty| validators_to_update.contains(&&duty.pubkey));
let mut new_duties = if !indices_to_request.is_empty() {
post_validator_duties_attester(duties_service, epoch, indices_to_request.as_slice()) post_validator_duties_attester(duties_service, epoch, indices_to_request.as_slice())
.await? .await?
.data .data
.into_iter()
.chain(response.data)
.collect::<Vec<_>>()
} else { } else {
response.data vec![]
}; };
new_duties.extend(new_initial_duties);
drop(fetch_timer); drop(fetch_timer);
@ -854,26 +856,53 @@ async fn poll_beacon_attesters_for_epoch<T: SlotClock + 'static, E: EthSpec>(
// Update the duties service with the new `DutyAndProof` messages. // Update the duties service with the new `DutyAndProof` messages.
let mut attesters = duties_service.attesters.write(); let mut attesters = duties_service.attesters.write();
let mut already_warned = Some(()); let mut already_warned = Some(());
let current_slot = duties_service
.slot_clock
.now_or_genesis()
.unwrap_or_default();
for duty in &new_duties { for duty in &new_duties {
let attester_map = attesters.entry(duty.pubkey).or_default(); let attester_map = attesters.entry(duty.pubkey).or_default();
// Create initial entries in the map without selection proofs. We'll compute them in the // Create initial entries in the map without selection proofs. We'll compute them in the
// background later to avoid creating a thundering herd of signing threads whenever new // background later to avoid creating a thundering herd of signing threads whenever new
// duties are computed. // duties are computed.
let duty_and_proof = DutyAndProof::new_without_selection_proof(duty.clone()); let duty_and_proof = DutyAndProof::new_without_selection_proof(duty.clone(), current_slot);
if let Some((prior_dependent_root, _)) = match attester_map.entry(epoch) {
attester_map.insert(epoch, (dependent_root, duty_and_proof)) hash_map::Entry::Occupied(mut occupied) => {
{ let mut_value = occupied.get_mut();
// Using `already_warned` avoids excessive logs. let (prior_dependent_root, prior_duty_and_proof) = &mut_value;
if dependent_root != prior_dependent_root && already_warned.take().is_some() {
warn!( // Guard against overwriting an existing value for the same duty. If we did
log, // overwrite we could lose a selection proof or information from
"Attester duties re-org"; // `subscription_slots`. Hitting this branch should be prevented by our logic for
"prior_dependent_root" => %prior_dependent_root, // fetching duties only for unknown indices.
"dependent_root" => %dependent_root, if dependent_root == *prior_dependent_root
"msg" => "this may happen from time to time" && prior_duty_and_proof.duty == duty_and_proof.duty
) {
warn!(
log,
"Redundant attester duty update";
"dependent_root" => %dependent_root,
"validator_index" => duty.validator_index,
);
continue;
}
// Using `already_warned` avoids excessive logs.
if dependent_root != *prior_dependent_root && already_warned.take().is_some() {
warn!(
log,
"Attester duties re-org";
"prior_dependent_root" => %prior_dependent_root,
"dependent_root" => %dependent_root,
"msg" => "this may happen from time to time"
)
}
*mut_value = (dependent_root, duty_and_proof);
}
hash_map::Entry::Vacant(vacant) => {
vacant.insert((dependent_root, duty_and_proof));
} }
} }
} }
@ -1030,12 +1059,13 @@ async fn fill_in_selection_proofs<T: SlotClock + 'static, E: EthSpec>(
// Sign selection proofs (serially). // Sign selection proofs (serially).
let duty_and_proof_results = stream::iter(relevant_duties.into_values().flatten()) let duty_and_proof_results = stream::iter(relevant_duties.into_values().flatten())
.then(|duty| async { .then(|duty| async {
DutyAndProof::new_with_selection_proof( let opt_selection_proof = make_selection_proof(
duty, &duty,
&duties_service.validator_store, &duties_service.validator_store,
&duties_service.spec, &duties_service.spec,
) )
.await .await?;
Ok((duty, opt_selection_proof))
}) })
.collect::<Vec<_>>() .collect::<Vec<_>>()
.await; .await;
@ -1043,7 +1073,7 @@ async fn fill_in_selection_proofs<T: SlotClock + 'static, E: EthSpec>(
// Add to attesters store. // Add to attesters store.
let mut attesters = duties_service.attesters.write(); let mut attesters = duties_service.attesters.write();
for result in duty_and_proof_results { for result in duty_and_proof_results {
let duty_and_proof = match result { let (duty, selection_proof) = match result {
Ok(duty_and_proof) => duty_and_proof, Ok(duty_and_proof) => duty_and_proof,
Err(Error::FailedToProduceSelectionProof( Err(Error::FailedToProduceSelectionProof(
ValidatorStoreError::UnknownPubkey(pubkey), ValidatorStoreError::UnknownPubkey(pubkey),
@ -1071,12 +1101,12 @@ async fn fill_in_selection_proofs<T: SlotClock + 'static, E: EthSpec>(
} }
}; };
let attester_map = attesters.entry(duty_and_proof.duty.pubkey).or_default(); let attester_map = attesters.entry(duty.pubkey).or_default();
let epoch = duty_and_proof.duty.slot.epoch(E::slots_per_epoch()); let epoch = duty.slot.epoch(E::slots_per_epoch());
match attester_map.entry(epoch) { match attester_map.entry(epoch) {
hash_map::Entry::Occupied(mut entry) => { hash_map::Entry::Occupied(mut entry) => {
// No need to update duties for which no proof was computed. // No need to update duties for which no proof was computed.
let Some(selection_proof) = duty_and_proof.selection_proof else { let Some(selection_proof) = selection_proof else {
continue; continue;
}; };
@ -1097,6 +1127,14 @@ async fn fill_in_selection_proofs<T: SlotClock + 'static, E: EthSpec>(
} }
} }
hash_map::Entry::Vacant(entry) => { hash_map::Entry::Vacant(entry) => {
// This probably shouldn't happen, but we have enough info to fill in the
// entry so we may as well.
let subscription_slots = SubscriptionSlots::new(duty.slot, current_slot);
let duty_and_proof = DutyAndProof {
duty,
selection_proof,
subscription_slots,
};
entry.insert((dependent_root, duty_and_proof)); entry.insert((dependent_root, duty_and_proof));
} }
} }
@ -1320,13 +1358,15 @@ mod test {
#[test] #[test]
fn subscription_slots_exact() { fn subscription_slots_exact() {
// Set current slot in the past so no duties are considered expired.
let current_slot = Slot::new(0);
for duty_slot in [ for duty_slot in [
Slot::new(32), Slot::new(33),
Slot::new(47), Slot::new(47),
Slot::new(99), Slot::new(99),
Slot::new(1002003), Slot::new(1002003),
] { ] {
let subscription_slots = SubscriptionSlots::new(duty_slot); let subscription_slots = SubscriptionSlots::new(duty_slot, current_slot);
// Run twice to check idempotence (subscription slots shouldn't be marked as done until // Run twice to check idempotence (subscription slots shouldn't be marked as done until
// we mark them manually). // we mark them manually).
@ -1360,8 +1400,9 @@ mod test {
#[test] #[test]
fn subscription_slots_mark_multiple() { fn subscription_slots_mark_multiple() {
for (i, offset) in ATTESTATION_SUBSCRIPTION_OFFSETS.into_iter().enumerate() { for (i, offset) in ATTESTATION_SUBSCRIPTION_OFFSETS.into_iter().enumerate() {
let current_slot = Slot::new(0);
let duty_slot = Slot::new(64); let duty_slot = Slot::new(64);
let subscription_slots = SubscriptionSlots::new(duty_slot); let subscription_slots = SubscriptionSlots::new(duty_slot, current_slot);
subscription_slots.record_successful_subscription_at(duty_slot - offset); subscription_slots.record_successful_subscription_at(duty_slot - offset);
@ -1376,4 +1417,22 @@ mod test {
} }
} }
} }
/// Test the boundary condition where all subscription slots are *just* expired.
#[test]
fn subscription_slots_expired() {
let current_slot = Slot::new(100);
let duty_slot = current_slot + ATTESTATION_SUBSCRIPTION_OFFSETS[0];
let subscription_slots = SubscriptionSlots::new(duty_slot, current_slot);
for offset in ATTESTATION_SUBSCRIPTION_OFFSETS.into_iter() {
let slot = duty_slot - offset;
assert!(!subscription_slots.should_send_subscription_at(slot));
}
assert!(subscription_slots.slots.is_empty());
// If the duty slot is 1 later, we get a non-empty set of duties.
let subscription_slots = SubscriptionSlots::new(duty_slot + 1, current_slot);
assert_eq!(subscription_slots.slots.len(), 1);
assert!(subscription_slots.should_send_subscription_at(current_slot + 1),);
}
} }