From cff6258bb14ddac9eb70bb8682e713b4597c7421 Mon Sep 17 00:00:00 2001 From: Michael Sproul Date: Tue, 5 Mar 2024 10:15:05 +1100 Subject: [PATCH] Fix duties override bug in VC (#5305) * Fix duties override bug in VC * Use initial request efficiently * Prevent expired subscriptions by construction * Clean up selection proof logic * Add test --- validator_client/src/duties_service.rs | 193 ++++++++++++++++--------- 1 file changed, 126 insertions(+), 67 deletions(-) diff --git a/validator_client/src/duties_service.rs b/validator_client/src/duties_service.rs index 290803e25..b5b56943c 100644 --- a/validator_client/src/duties_service.rs +++ b/validator_client/src/duties_service.rs @@ -122,43 +122,37 @@ pub struct SubscriptionSlots { slots: Vec<(Slot, AtomicBool)>, } -impl DutyAndProof { - /// Instantiate `Self`, computing the selection proof as well. - pub async fn new_with_selection_proof( - duty: AttesterData, - validator_store: &ValidatorStore, - spec: &ChainSpec, - ) -> Result { - let selection_proof = validator_store - .produce_selection_proof(duty.pubkey, duty.slot) - .await - .map_err(Error::FailedToProduceSelectionProof)?; +/// Create a selection proof for `duty`. +/// +/// Return `Ok(None)` if the attesting validator is not an aggregator. +async fn make_selection_proof( + duty: &AttesterData, + validator_store: &ValidatorStore, + spec: &ChainSpec, +) -> Result, Error> { + let selection_proof = validator_store + .produce_selection_proof(duty.pubkey, duty.slot) + .await + .map_err(Error::FailedToProduceSelectionProof)?; - let selection_proof = selection_proof - .is_aggregator(duty.committee_length as usize, spec) - .map_err(Error::InvalidModulo) - .map(|is_aggregator| { - if is_aggregator { - Some(selection_proof) - } else { - // Don't bother storing the selection proof if the validator isn't an - // aggregator, we won't need it. - None - } - })?; - - let subscription_slots = SubscriptionSlots::new(duty.slot); - - Ok(Self { - duty, - selection_proof, - subscription_slots, + selection_proof + .is_aggregator(duty.committee_length as usize, spec) + .map_err(Error::InvalidModulo) + .map(|is_aggregator| { + if is_aggregator { + Some(selection_proof) + } else { + // Don't bother storing the selection proof if the validator isn't an + // aggregator, we won't need it. + None + } }) - } +} +impl DutyAndProof { /// Create a new `DutyAndProof` with the selection proof waiting to be filled in. - pub fn new_without_selection_proof(duty: AttesterData) -> Self { - let subscription_slots = SubscriptionSlots::new(duty.slot); + pub fn new_without_selection_proof(duty: AttesterData, current_slot: Slot) -> Self { + let subscription_slots = SubscriptionSlots::new(duty.slot, current_slot); Self { duty, selection_proof: None, @@ -168,10 +162,13 @@ impl DutyAndProof { } impl SubscriptionSlots { - fn new(duty_slot: Slot) -> Arc { + fn new(duty_slot: Slot, current_slot: Slot) -> Arc { let slots = ATTESTATION_SUBSCRIPTION_OFFSETS .into_iter() .filter_map(|offset| duty_slot.safe_sub(offset).ok()) + // Keep only scheduled slots that haven't happened yet. This avoids sending expired + // subscriptions. + .filter(|scheduled_slot| *scheduled_slot > current_slot) .map(|scheduled_slot| (scheduled_slot, AtomicBool::new(false))) .collect(); Arc::new(Self { slots }) @@ -787,14 +784,14 @@ async fn poll_beacon_attesters_for_epoch( // request for extra data unless necessary in order to save on network bandwidth. let uninitialized_validators = get_uninitialized_validators(duties_service, &epoch, local_pubkeys); - let indices_to_request = if !uninitialized_validators.is_empty() { + let initial_indices_to_request = if !uninitialized_validators.is_empty() { uninitialized_validators.as_slice() } else { &local_indices[0..min(INITIAL_DUTIES_QUERY_SIZE, local_indices.len())] }; let response = - post_validator_duties_attester(duties_service, epoch, indices_to_request).await?; + post_validator_duties_attester(duties_service, epoch, initial_indices_to_request).await?; let dependent_root = response.dependent_root; // Find any validators which have conflicting (epoch, dependent_root) values or missing duties for the epoch. @@ -818,24 +815,29 @@ async fn poll_beacon_attesters_for_epoch( return Ok(()); } - // Filter out validators which have already been requested. - let initial_duties = &response.data; + // Make a request for all indices that require updating which we have not already made a request + // for. let indices_to_request = validators_to_update .iter() - .filter(|&&&pubkey| !initial_duties.iter().any(|duty| duty.pubkey == pubkey)) .filter_map(|pubkey| duties_service.validator_store.validator_index(pubkey)) + .filter(|validator_index| !initial_indices_to_request.contains(validator_index)) .collect::>(); - let new_duties = if !indices_to_request.is_empty() { + // Filter the initial duties by their relevance so that we don't hit the warning below about + // overwriting duties. There was previously a bug here. + let new_initial_duties = response + .data + .into_iter() + .filter(|duty| validators_to_update.contains(&&duty.pubkey)); + + let mut new_duties = if !indices_to_request.is_empty() { post_validator_duties_attester(duties_service, epoch, indices_to_request.as_slice()) .await? .data - .into_iter() - .chain(response.data) - .collect::>() } else { - response.data + vec![] }; + new_duties.extend(new_initial_duties); drop(fetch_timer); @@ -854,26 +856,53 @@ async fn poll_beacon_attesters_for_epoch( // Update the duties service with the new `DutyAndProof` messages. let mut attesters = duties_service.attesters.write(); let mut already_warned = Some(()); + let current_slot = duties_service + .slot_clock + .now_or_genesis() + .unwrap_or_default(); for duty in &new_duties { let attester_map = attesters.entry(duty.pubkey).or_default(); // Create initial entries in the map without selection proofs. We'll compute them in the // background later to avoid creating a thundering herd of signing threads whenever new // duties are computed. - let duty_and_proof = DutyAndProof::new_without_selection_proof(duty.clone()); + let duty_and_proof = DutyAndProof::new_without_selection_proof(duty.clone(), current_slot); - if let Some((prior_dependent_root, _)) = - attester_map.insert(epoch, (dependent_root, duty_and_proof)) - { - // Using `already_warned` avoids excessive logs. - if dependent_root != prior_dependent_root && already_warned.take().is_some() { - warn!( - log, - "Attester duties re-org"; - "prior_dependent_root" => %prior_dependent_root, - "dependent_root" => %dependent_root, - "msg" => "this may happen from time to time" - ) + match attester_map.entry(epoch) { + hash_map::Entry::Occupied(mut occupied) => { + let mut_value = occupied.get_mut(); + let (prior_dependent_root, prior_duty_and_proof) = &mut_value; + + // Guard against overwriting an existing value for the same duty. If we did + // overwrite we could lose a selection proof or information from + // `subscription_slots`. Hitting this branch should be prevented by our logic for + // fetching duties only for unknown indices. + if dependent_root == *prior_dependent_root + && prior_duty_and_proof.duty == duty_and_proof.duty + { + warn!( + log, + "Redundant attester duty update"; + "dependent_root" => %dependent_root, + "validator_index" => duty.validator_index, + ); + continue; + } + + // Using `already_warned` avoids excessive logs. + if dependent_root != *prior_dependent_root && already_warned.take().is_some() { + warn!( + log, + "Attester duties re-org"; + "prior_dependent_root" => %prior_dependent_root, + "dependent_root" => %dependent_root, + "msg" => "this may happen from time to time" + ) + } + *mut_value = (dependent_root, duty_and_proof); + } + hash_map::Entry::Vacant(vacant) => { + vacant.insert((dependent_root, duty_and_proof)); } } } @@ -1030,12 +1059,13 @@ async fn fill_in_selection_proofs( // Sign selection proofs (serially). let duty_and_proof_results = stream::iter(relevant_duties.into_values().flatten()) .then(|duty| async { - DutyAndProof::new_with_selection_proof( - duty, + let opt_selection_proof = make_selection_proof( + &duty, &duties_service.validator_store, &duties_service.spec, ) - .await + .await?; + Ok((duty, opt_selection_proof)) }) .collect::>() .await; @@ -1043,7 +1073,7 @@ async fn fill_in_selection_proofs( // Add to attesters store. let mut attesters = duties_service.attesters.write(); for result in duty_and_proof_results { - let duty_and_proof = match result { + let (duty, selection_proof) = match result { Ok(duty_and_proof) => duty_and_proof, Err(Error::FailedToProduceSelectionProof( ValidatorStoreError::UnknownPubkey(pubkey), @@ -1071,12 +1101,12 @@ async fn fill_in_selection_proofs( } }; - let attester_map = attesters.entry(duty_and_proof.duty.pubkey).or_default(); - let epoch = duty_and_proof.duty.slot.epoch(E::slots_per_epoch()); + let attester_map = attesters.entry(duty.pubkey).or_default(); + let epoch = duty.slot.epoch(E::slots_per_epoch()); match attester_map.entry(epoch) { hash_map::Entry::Occupied(mut entry) => { // No need to update duties for which no proof was computed. - let Some(selection_proof) = duty_and_proof.selection_proof else { + let Some(selection_proof) = selection_proof else { continue; }; @@ -1097,6 +1127,14 @@ async fn fill_in_selection_proofs( } } hash_map::Entry::Vacant(entry) => { + // This probably shouldn't happen, but we have enough info to fill in the + // entry so we may as well. + let subscription_slots = SubscriptionSlots::new(duty.slot, current_slot); + let duty_and_proof = DutyAndProof { + duty, + selection_proof, + subscription_slots, + }; entry.insert((dependent_root, duty_and_proof)); } } @@ -1320,13 +1358,15 @@ mod test { #[test] fn subscription_slots_exact() { + // Set current slot in the past so no duties are considered expired. + let current_slot = Slot::new(0); for duty_slot in [ - Slot::new(32), + Slot::new(33), Slot::new(47), Slot::new(99), Slot::new(1002003), ] { - let subscription_slots = SubscriptionSlots::new(duty_slot); + let subscription_slots = SubscriptionSlots::new(duty_slot, current_slot); // Run twice to check idempotence (subscription slots shouldn't be marked as done until // we mark them manually). @@ -1360,8 +1400,9 @@ mod test { #[test] fn subscription_slots_mark_multiple() { for (i, offset) in ATTESTATION_SUBSCRIPTION_OFFSETS.into_iter().enumerate() { + let current_slot = Slot::new(0); let duty_slot = Slot::new(64); - let subscription_slots = SubscriptionSlots::new(duty_slot); + let subscription_slots = SubscriptionSlots::new(duty_slot, current_slot); subscription_slots.record_successful_subscription_at(duty_slot - offset); @@ -1376,4 +1417,22 @@ mod test { } } } + + /// Test the boundary condition where all subscription slots are *just* expired. + #[test] + fn subscription_slots_expired() { + let current_slot = Slot::new(100); + let duty_slot = current_slot + ATTESTATION_SUBSCRIPTION_OFFSETS[0]; + let subscription_slots = SubscriptionSlots::new(duty_slot, current_slot); + for offset in ATTESTATION_SUBSCRIPTION_OFFSETS.into_iter() { + let slot = duty_slot - offset; + assert!(!subscription_slots.should_send_subscription_at(slot)); + } + assert!(subscription_slots.slots.is_empty()); + + // If the duty slot is 1 later, we get a non-empty set of duties. + let subscription_slots = SubscriptionSlots::new(duty_slot + 1, current_slot); + assert_eq!(subscription_slots.slots.len(), 1); + assert!(subscription_slots.should_send_subscription_at(current_slot + 1),); + } }