From cf239fed61732849efb315c91afd0e4f69650299 Mon Sep 17 00:00:00 2001 From: Jimmy Chen Date: Mon, 15 May 2023 02:10:40 +0000 Subject: [PATCH] Reduce bandwidth over the VC<>BN API using dependant roots (#4170) ## Issue Addressed #4157 ## Proposed Changes See description in #4157. In diagram form: ![reduce-attestation-bandwidth](https://user-images.githubusercontent.com/742762/230277084-f97301c1-0c5d-4fb3-92f9-91f99e4dc7d4.png) Co-authored-by: Jimmy Chen --- validator_client/src/duties_service.rs | 237 ++++++++++++++++--------- 1 file changed, 155 insertions(+), 82 deletions(-) diff --git a/validator_client/src/duties_service.rs b/validator_client/src/duties_service.rs index c335c67ab..3cab6e782 100644 --- a/validator_client/src/duties_service.rs +++ b/validator_client/src/duties_service.rs @@ -16,12 +16,15 @@ use crate::{ validator_store::{DoppelgangerStatus, Error as ValidatorStoreError, ValidatorStore}, }; use environment::RuntimeContext; -use eth2::types::{AttesterData, BeaconCommitteeSubscription, ProposerData, StateId, ValidatorId}; +use eth2::types::{ + AttesterData, BeaconCommitteeSubscription, DutiesResponse, ProposerData, StateId, ValidatorId, +}; use futures::{stream, StreamExt}; use parking_lot::RwLock; use safe_arith::ArithError; use slog::{debug, error, info, warn, Logger}; use slot_clock::SlotClock; +use std::cmp::min; use std::collections::{hash_map, BTreeMap, HashMap, HashSet}; use std::sync::Arc; use std::time::Duration; @@ -54,6 +57,11 @@ const SELECTION_PROOF_SCHEDULE_DENOM: u32 = 2; /// flag in the cli to enable collection of per validator metrics. const VALIDATOR_METRICS_MIN_COUNT: usize = 64; +/// The number of validators to request duty information for in the initial request. +/// The initial request is used to determine if further requests are required, so that it +/// reduces the amount of data that needs to be transferred. +const INITIAL_DUTIES_QUERY_SIZE: usize = 1; + #[derive(Debug)] pub enum Error { UnableToReadSlotClock, @@ -531,7 +539,6 @@ async fn poll_beacon_attesters( current_epoch, &local_indices, &local_pubkeys, - current_slot, ) .await { @@ -544,6 +551,8 @@ async fn poll_beacon_attesters( ) } + update_per_validator_duty_metrics::(duties_service, current_epoch, current_slot); + drop(current_epoch_timer); let next_epoch_timer = metrics::start_timer_vec( &metrics::DUTIES_SERVICE_TIMES, @@ -551,14 +560,9 @@ async fn poll_beacon_attesters( ); // Download the duties and update the duties for the next epoch. - if let Err(e) = poll_beacon_attesters_for_epoch( - duties_service, - next_epoch, - &local_indices, - &local_pubkeys, - current_slot, - ) - .await + if let Err(e) = + poll_beacon_attesters_for_epoch(duties_service, next_epoch, &local_indices, &local_pubkeys) + .await { error!( log, @@ -569,6 +573,8 @@ async fn poll_beacon_attesters( ) } + update_per_validator_duty_metrics::(duties_service, next_epoch, current_slot); + drop(next_epoch_timer); let subscriptions_timer = metrics::start_timer_vec(&metrics::DUTIES_SERVICE_TIMES, &[metrics::SUBSCRIPTIONS]); @@ -655,7 +661,6 @@ async fn poll_beacon_attesters_for_epoch( epoch: Epoch, local_indices: &[u64], local_pubkeys: &HashSet, - current_slot: Slot, ) -> Result<(), Error> { let log = duties_service.context.log(); @@ -674,84 +679,69 @@ async fn poll_beacon_attesters_for_epoch( &[metrics::UPDATE_ATTESTERS_FETCH], ); - let response = duties_service - .beacon_nodes - .first_success( - duties_service.require_synced, - OfflineOnFailure::Yes, - |beacon_node| async move { - let _timer = metrics::start_timer_vec( - &metrics::DUTIES_SERVICE_TIMES, - &[metrics::ATTESTER_DUTIES_HTTP_POST], - ); - beacon_node - .post_validator_duties_attester(epoch, local_indices) - .await - }, - ) - .await - .map_err(|e| Error::FailedToDownloadAttesters(e.to_string()))?; + // Request duties for all uninitialized validators. If there isn't any, we will just request for + // `INITIAL_DUTIES_QUERY_SIZE` validators. We use the `dependent_root` in the response to + // determine whether validator duties need to be updated. This is to ensure that we don't + // request for extra data unless necessary in order to save on network bandwidth. + let uninitialized_validators = + get_uninitialized_validators(duties_service, &epoch, local_pubkeys); + let indices_to_request = if !uninitialized_validators.is_empty() { + uninitialized_validators.as_slice() + } else { + &local_indices[0..min(INITIAL_DUTIES_QUERY_SIZE, local_indices.len())] + }; + + let response = + post_validator_duties_attester(duties_service, epoch, indices_to_request).await?; + let dependent_root = response.dependent_root; + + // Find any validators which have conflicting (epoch, dependent_root) values or missing duties for the epoch. + let validators_to_update: Vec<_> = { + // Avoid holding the read-lock for any longer than required. + let attesters = duties_service.attesters.read(); + local_pubkeys + .iter() + .filter(|pubkey| { + attesters.get(pubkey).map_or(true, |duties| { + duties + .get(&epoch) + .map_or(true, |(prior, _)| *prior != dependent_root) + }) + }) + .collect::>() + }; + + if validators_to_update.is_empty() { + // No validators have conflicting (epoch, dependent_root) values or missing duties for the epoch. + return Ok(()); + } + + // Filter out validators which have already been requested. + let initial_duties = &response.data; + let indices_to_request = validators_to_update + .iter() + .filter(|&&&pubkey| !initial_duties.iter().any(|duty| duty.pubkey == pubkey)) + .filter_map(|pubkey| duties_service.validator_store.validator_index(pubkey)) + .collect::>(); + + let new_duties = if !indices_to_request.is_empty() { + post_validator_duties_attester(duties_service, epoch, indices_to_request.as_slice()) + .await? + .data + .into_iter() + .chain(response.data) + .collect::>() + } else { + response.data + }; drop(fetch_timer); + let _store_timer = metrics::start_timer_vec( &metrics::DUTIES_SERVICE_TIMES, &[metrics::UPDATE_ATTESTERS_STORE], ); - let dependent_root = response.dependent_root; - - // Filter any duties that are not relevant or already known. - let new_duties = { - // Avoid holding the read-lock for any longer than required. - let attesters = duties_service.attesters.read(); - response - .data - .into_iter() - .filter(|duty| { - if duties_service.per_validator_metrics() { - let validator_index = duty.validator_index; - let duty_slot = duty.slot; - if let Some(existing_slot_gauge) = - get_int_gauge(&ATTESTATION_DUTY, &[&validator_index.to_string()]) - { - let existing_slot = Slot::new(existing_slot_gauge.get() as u64); - let existing_epoch = existing_slot.epoch(E::slots_per_epoch()); - - // First condition ensures that we switch to the next epoch duty slot - // once the current epoch duty slot passes. - // Second condition is to ensure that next epoch duties don't override - // current epoch duties. - if existing_slot < current_slot - || (duty_slot.epoch(E::slots_per_epoch()) <= existing_epoch - && duty_slot > current_slot - && duty_slot != existing_slot) - { - existing_slot_gauge.set(duty_slot.as_u64() as i64); - } - } else { - set_int_gauge( - &ATTESTATION_DUTY, - &[&validator_index.to_string()], - duty_slot.as_u64() as i64, - ); - } - } - - local_pubkeys.contains(&duty.pubkey) && { - // Only update the duties if either is true: - // - // - There were no known duties for this epoch. - // - The dependent root has changed, signalling a re-org. - attesters.get(&duty.pubkey).map_or(true, |duties| { - duties - .get(&epoch) - .map_or(true, |(prior, _)| *prior != dependent_root) - }) - } - }) - .collect::>() - }; - debug!( log, "Downloaded attester duties"; @@ -799,6 +789,89 @@ async fn poll_beacon_attesters_for_epoch( Ok(()) } +/// Get a filtered list of local validators for which we don't already know their duties for that epoch +fn get_uninitialized_validators( + duties_service: &Arc>, + epoch: &Epoch, + local_pubkeys: &HashSet, +) -> Vec { + let attesters = duties_service.attesters.read(); + local_pubkeys + .iter() + .filter(|pubkey| { + attesters + .get(pubkey) + .map_or(true, |duties| !duties.contains_key(epoch)) + }) + .filter_map(|pubkey| duties_service.validator_store.validator_index(pubkey)) + .collect::>() +} + +fn update_per_validator_duty_metrics( + duties_service: &Arc>, + epoch: Epoch, + current_slot: Slot, +) { + if duties_service.per_validator_metrics() { + let attesters = duties_service.attesters.read(); + attesters.values().for_each(|attester_duties_by_epoch| { + if let Some((_, duty_and_proof)) = attester_duties_by_epoch.get(&epoch) { + let duty = &duty_and_proof.duty; + let validator_index = duty.validator_index; + let duty_slot = duty.slot; + if let Some(existing_slot_gauge) = + get_int_gauge(&ATTESTATION_DUTY, &[&validator_index.to_string()]) + { + let existing_slot = Slot::new(existing_slot_gauge.get() as u64); + let existing_epoch = existing_slot.epoch(E::slots_per_epoch()); + + // First condition ensures that we switch to the next epoch duty slot + // once the current epoch duty slot passes. + // Second condition is to ensure that next epoch duties don't override + // current epoch duties. + if existing_slot < current_slot + || (duty_slot.epoch(E::slots_per_epoch()) <= existing_epoch + && duty_slot > current_slot + && duty_slot != existing_slot) + { + existing_slot_gauge.set(duty_slot.as_u64() as i64); + } + } else { + set_int_gauge( + &ATTESTATION_DUTY, + &[&validator_index.to_string()], + duty_slot.as_u64() as i64, + ); + } + } + }); + } +} + +async fn post_validator_duties_attester( + duties_service: &Arc>, + epoch: Epoch, + validator_indices: &[u64], +) -> Result>, Error> { + duties_service + .beacon_nodes + .first_success( + duties_service.require_synced, + OfflineOnFailure::Yes, + |beacon_node| async move { + let _timer = metrics::start_timer_vec( + &metrics::DUTIES_SERVICE_TIMES, + &[metrics::ATTESTER_DUTIES_HTTP_POST], + ); + beacon_node + .post_validator_duties_attester(epoch, validator_indices) + .await + }, + ) + .await + .map_err(|e| Error::FailedToDownloadAttesters(e.to_string())) +} + /// Compute the attestation selection proofs for the `duties` and add them to the `attesters` map. /// /// Duties are computed in batches each slot. If a re-org is detected then the process will