Reduce bandwidth over the VC<>BN API using dependant roots (#4170)

## Issue Addressed

#4157 

## Proposed Changes

See description in #4157.

In diagram form:

![reduce-attestation-bandwidth](https://user-images.githubusercontent.com/742762/230277084-f97301c1-0c5d-4fb3-92f9-91f99e4dc7d4.png)


Co-authored-by: Jimmy Chen <jimmy@sigmaprime.io>
This commit is contained in:
Jimmy Chen 2023-05-15 02:10:40 +00:00
parent b7b4549545
commit cf239fed61

View File

@ -16,12 +16,15 @@ use crate::{
validator_store::{DoppelgangerStatus, Error as ValidatorStoreError, ValidatorStore}, validator_store::{DoppelgangerStatus, Error as ValidatorStoreError, ValidatorStore},
}; };
use environment::RuntimeContext; use environment::RuntimeContext;
use eth2::types::{AttesterData, BeaconCommitteeSubscription, ProposerData, StateId, ValidatorId}; use eth2::types::{
AttesterData, BeaconCommitteeSubscription, DutiesResponse, ProposerData, StateId, ValidatorId,
};
use futures::{stream, StreamExt}; use futures::{stream, StreamExt};
use parking_lot::RwLock; use parking_lot::RwLock;
use safe_arith::ArithError; use safe_arith::ArithError;
use slog::{debug, error, info, warn, Logger}; use slog::{debug, error, info, warn, Logger};
use slot_clock::SlotClock; use slot_clock::SlotClock;
use std::cmp::min;
use std::collections::{hash_map, BTreeMap, HashMap, HashSet}; use std::collections::{hash_map, BTreeMap, HashMap, HashSet};
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
@ -54,6 +57,11 @@ const SELECTION_PROOF_SCHEDULE_DENOM: u32 = 2;
/// flag in the cli to enable collection of per validator metrics. /// flag in the cli to enable collection of per validator metrics.
const VALIDATOR_METRICS_MIN_COUNT: usize = 64; const VALIDATOR_METRICS_MIN_COUNT: usize = 64;
/// The number of validators to request duty information for in the initial request.
/// The initial request is used to determine if further requests are required, so that it
/// reduces the amount of data that needs to be transferred.
const INITIAL_DUTIES_QUERY_SIZE: usize = 1;
#[derive(Debug)] #[derive(Debug)]
pub enum Error { pub enum Error {
UnableToReadSlotClock, UnableToReadSlotClock,
@ -531,7 +539,6 @@ async fn poll_beacon_attesters<T: SlotClock + 'static, E: EthSpec>(
current_epoch, current_epoch,
&local_indices, &local_indices,
&local_pubkeys, &local_pubkeys,
current_slot,
) )
.await .await
{ {
@ -544,6 +551,8 @@ async fn poll_beacon_attesters<T: SlotClock + 'static, E: EthSpec>(
) )
} }
update_per_validator_duty_metrics::<T, E>(duties_service, current_epoch, current_slot);
drop(current_epoch_timer); drop(current_epoch_timer);
let next_epoch_timer = metrics::start_timer_vec( let next_epoch_timer = metrics::start_timer_vec(
&metrics::DUTIES_SERVICE_TIMES, &metrics::DUTIES_SERVICE_TIMES,
@ -551,14 +560,9 @@ async fn poll_beacon_attesters<T: SlotClock + 'static, E: EthSpec>(
); );
// Download the duties and update the duties for the next epoch. // Download the duties and update the duties for the next epoch.
if let Err(e) = poll_beacon_attesters_for_epoch( if let Err(e) =
duties_service, poll_beacon_attesters_for_epoch(duties_service, next_epoch, &local_indices, &local_pubkeys)
next_epoch, .await
&local_indices,
&local_pubkeys,
current_slot,
)
.await
{ {
error!( error!(
log, log,
@ -569,6 +573,8 @@ async fn poll_beacon_attesters<T: SlotClock + 'static, E: EthSpec>(
) )
} }
update_per_validator_duty_metrics::<T, E>(duties_service, next_epoch, current_slot);
drop(next_epoch_timer); drop(next_epoch_timer);
let subscriptions_timer = let subscriptions_timer =
metrics::start_timer_vec(&metrics::DUTIES_SERVICE_TIMES, &[metrics::SUBSCRIPTIONS]); metrics::start_timer_vec(&metrics::DUTIES_SERVICE_TIMES, &[metrics::SUBSCRIPTIONS]);
@ -655,7 +661,6 @@ async fn poll_beacon_attesters_for_epoch<T: SlotClock + 'static, E: EthSpec>(
epoch: Epoch, epoch: Epoch,
local_indices: &[u64], local_indices: &[u64],
local_pubkeys: &HashSet<PublicKeyBytes>, local_pubkeys: &HashSet<PublicKeyBytes>,
current_slot: Slot,
) -> Result<(), Error> { ) -> Result<(), Error> {
let log = duties_service.context.log(); let log = duties_service.context.log();
@ -674,84 +679,69 @@ async fn poll_beacon_attesters_for_epoch<T: SlotClock + 'static, E: EthSpec>(
&[metrics::UPDATE_ATTESTERS_FETCH], &[metrics::UPDATE_ATTESTERS_FETCH],
); );
let response = duties_service // Request duties for all uninitialized validators. If there isn't any, we will just request for
.beacon_nodes // `INITIAL_DUTIES_QUERY_SIZE` validators. We use the `dependent_root` in the response to
.first_success( // determine whether validator duties need to be updated. This is to ensure that we don't
duties_service.require_synced, // request for extra data unless necessary in order to save on network bandwidth.
OfflineOnFailure::Yes, let uninitialized_validators =
|beacon_node| async move { get_uninitialized_validators(duties_service, &epoch, local_pubkeys);
let _timer = metrics::start_timer_vec( let indices_to_request = if !uninitialized_validators.is_empty() {
&metrics::DUTIES_SERVICE_TIMES, uninitialized_validators.as_slice()
&[metrics::ATTESTER_DUTIES_HTTP_POST], } else {
); &local_indices[0..min(INITIAL_DUTIES_QUERY_SIZE, local_indices.len())]
beacon_node };
.post_validator_duties_attester(epoch, local_indices)
.await let response =
}, post_validator_duties_attester(duties_service, epoch, indices_to_request).await?;
) let dependent_root = response.dependent_root;
.await
.map_err(|e| Error::FailedToDownloadAttesters(e.to_string()))?; // Find any validators which have conflicting (epoch, dependent_root) values or missing duties for the epoch.
let validators_to_update: Vec<_> = {
// Avoid holding the read-lock for any longer than required.
let attesters = duties_service.attesters.read();
local_pubkeys
.iter()
.filter(|pubkey| {
attesters.get(pubkey).map_or(true, |duties| {
duties
.get(&epoch)
.map_or(true, |(prior, _)| *prior != dependent_root)
})
})
.collect::<Vec<_>>()
};
if validators_to_update.is_empty() {
// No validators have conflicting (epoch, dependent_root) values or missing duties for the epoch.
return Ok(());
}
// Filter out validators which have already been requested.
let initial_duties = &response.data;
let indices_to_request = validators_to_update
.iter()
.filter(|&&&pubkey| !initial_duties.iter().any(|duty| duty.pubkey == pubkey))
.filter_map(|pubkey| duties_service.validator_store.validator_index(pubkey))
.collect::<Vec<_>>();
let new_duties = if !indices_to_request.is_empty() {
post_validator_duties_attester(duties_service, epoch, indices_to_request.as_slice())
.await?
.data
.into_iter()
.chain(response.data)
.collect::<Vec<_>>()
} else {
response.data
};
drop(fetch_timer); drop(fetch_timer);
let _store_timer = metrics::start_timer_vec( let _store_timer = metrics::start_timer_vec(
&metrics::DUTIES_SERVICE_TIMES, &metrics::DUTIES_SERVICE_TIMES,
&[metrics::UPDATE_ATTESTERS_STORE], &[metrics::UPDATE_ATTESTERS_STORE],
); );
let dependent_root = response.dependent_root;
// Filter any duties that are not relevant or already known.
let new_duties = {
// Avoid holding the read-lock for any longer than required.
let attesters = duties_service.attesters.read();
response
.data
.into_iter()
.filter(|duty| {
if duties_service.per_validator_metrics() {
let validator_index = duty.validator_index;
let duty_slot = duty.slot;
if let Some(existing_slot_gauge) =
get_int_gauge(&ATTESTATION_DUTY, &[&validator_index.to_string()])
{
let existing_slot = Slot::new(existing_slot_gauge.get() as u64);
let existing_epoch = existing_slot.epoch(E::slots_per_epoch());
// First condition ensures that we switch to the next epoch duty slot
// once the current epoch duty slot passes.
// Second condition is to ensure that next epoch duties don't override
// current epoch duties.
if existing_slot < current_slot
|| (duty_slot.epoch(E::slots_per_epoch()) <= existing_epoch
&& duty_slot > current_slot
&& duty_slot != existing_slot)
{
existing_slot_gauge.set(duty_slot.as_u64() as i64);
}
} else {
set_int_gauge(
&ATTESTATION_DUTY,
&[&validator_index.to_string()],
duty_slot.as_u64() as i64,
);
}
}
local_pubkeys.contains(&duty.pubkey) && {
// Only update the duties if either is true:
//
// - There were no known duties for this epoch.
// - The dependent root has changed, signalling a re-org.
attesters.get(&duty.pubkey).map_or(true, |duties| {
duties
.get(&epoch)
.map_or(true, |(prior, _)| *prior != dependent_root)
})
}
})
.collect::<Vec<_>>()
};
debug!( debug!(
log, log,
"Downloaded attester duties"; "Downloaded attester duties";
@ -799,6 +789,89 @@ async fn poll_beacon_attesters_for_epoch<T: SlotClock + 'static, E: EthSpec>(
Ok(()) Ok(())
} }
/// Get a filtered list of local validators for which we don't already know their duties for that epoch
fn get_uninitialized_validators<T: SlotClock + 'static, E: EthSpec>(
duties_service: &Arc<DutiesService<T, E>>,
epoch: &Epoch,
local_pubkeys: &HashSet<PublicKeyBytes>,
) -> Vec<u64> {
let attesters = duties_service.attesters.read();
local_pubkeys
.iter()
.filter(|pubkey| {
attesters
.get(pubkey)
.map_or(true, |duties| !duties.contains_key(epoch))
})
.filter_map(|pubkey| duties_service.validator_store.validator_index(pubkey))
.collect::<Vec<_>>()
}
fn update_per_validator_duty_metrics<T: SlotClock + 'static, E: EthSpec>(
duties_service: &Arc<DutiesService<T, E>>,
epoch: Epoch,
current_slot: Slot,
) {
if duties_service.per_validator_metrics() {
let attesters = duties_service.attesters.read();
attesters.values().for_each(|attester_duties_by_epoch| {
if let Some((_, duty_and_proof)) = attester_duties_by_epoch.get(&epoch) {
let duty = &duty_and_proof.duty;
let validator_index = duty.validator_index;
let duty_slot = duty.slot;
if let Some(existing_slot_gauge) =
get_int_gauge(&ATTESTATION_DUTY, &[&validator_index.to_string()])
{
let existing_slot = Slot::new(existing_slot_gauge.get() as u64);
let existing_epoch = existing_slot.epoch(E::slots_per_epoch());
// First condition ensures that we switch to the next epoch duty slot
// once the current epoch duty slot passes.
// Second condition is to ensure that next epoch duties don't override
// current epoch duties.
if existing_slot < current_slot
|| (duty_slot.epoch(E::slots_per_epoch()) <= existing_epoch
&& duty_slot > current_slot
&& duty_slot != existing_slot)
{
existing_slot_gauge.set(duty_slot.as_u64() as i64);
}
} else {
set_int_gauge(
&ATTESTATION_DUTY,
&[&validator_index.to_string()],
duty_slot.as_u64() as i64,
);
}
}
});
}
}
async fn post_validator_duties_attester<T: SlotClock + 'static, E: EthSpec>(
duties_service: &Arc<DutiesService<T, E>>,
epoch: Epoch,
validator_indices: &[u64],
) -> Result<DutiesResponse<Vec<AttesterData>>, Error> {
duties_service
.beacon_nodes
.first_success(
duties_service.require_synced,
OfflineOnFailure::Yes,
|beacon_node| async move {
let _timer = metrics::start_timer_vec(
&metrics::DUTIES_SERVICE_TIMES,
&[metrics::ATTESTER_DUTIES_HTTP_POST],
);
beacon_node
.post_validator_duties_attester(epoch, validator_indices)
.await
},
)
.await
.map_err(|e| Error::FailedToDownloadAttesters(e.to_string()))
}
/// Compute the attestation selection proofs for the `duties` and add them to the `attesters` map. /// Compute the attestation selection proofs for the `duties` and add them to the `attesters` map.
/// ///
/// Duties are computed in batches each slot. If a re-org is detected then the process will /// Duties are computed in batches each slot. If a re-org is detected then the process will