Add distributed flag to VC to enable support for DVT (#4867)

* Initial flag building

* Update validator_client/src/cli.rs

Co-authored-by: Abhishek Kumar <43061995+xenowits@users.noreply.github.com>

* Merge latest unstable

* Per slot aggregates

* One slot lookahead for sync committee aggregates

* Update validator_client/src/duties_service.rs

Co-authored-by: Abhishek Kumar <43061995+xenowits@users.noreply.github.com>

* Rename selection_look_ahead

* Merge branch 'unstable' into vc-distributed

* Merge remote-tracking branch 'origin/unstable' into vc-distributed

* Update CLI text
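Context: in a distributed validator (DVT) cluster, a single logical validator is operated jointly by several nodes, with middleware combining partial signatures from each node. The shortened selection-proof lookaheads in this change give the cluster time to exchange and aggregate partial selection proofs close to each duty slot.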
Author: Age Manning, 2024-02-15 23:23:58 +11:00 (committed by GitHub)
Parent: 0e819fa785
Commit: 49536ff103
8 changed files with 172 additions and 126 deletions

File: book/src/help_vc.md

@@ -26,6 +26,9 @@ FLAGS:
but is only safe if slashing protection is enabled on the remote signer and is implemented correctly. DO NOT
ENABLE THIS FLAG UNLESS YOU ARE CERTAIN THAT SLASHING PROTECTION IS ENABLED ON THE REMOTE SIGNER. YOU WILL
GET SLASHED IF YOU USE THIS FLAG WITHOUT ENABLING WEB3SIGNER'S SLASHING PROTECTION.
--distributed
Enables functionality required for running the validator in a distributed validator cluster.
--enable-doppelganger-protection
If this flag is set, Lighthouse will delay startup for three epochs and monitor for messages on the network
by any of the validators managed by this client. This will result in three (possibly four) epochs worth of
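Usage note: the flag is a plain boolean switch on the validator client invocation, e.g. `lighthouse vc --distributed` (using the usual `vc` alias for the validator-client subcommand); it takes no value.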

File: validator_client/Cargo.toml

@@ -10,6 +10,7 @@ path = "src/lib.rs"
[dev-dependencies]
tokio = { workspace = true }
itertools = { workspace = true }
[dependencies]
tree_hash = { workspace = true }
@@ -51,7 +52,6 @@ ring = { workspace = true }
rand = { workspace = true, features = ["small_rng"] }
lighthouse_metrics = { workspace = true }
lazy_static = { workspace = true }
itertools = { workspace = true }
monitoring_api = { workspace = true }
sensitive_url = { workspace = true }
task_executor = { workspace = true }

File: validator_client/src/cli.rs

@@ -145,6 +145,12 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> {
future.")
.takes_value(false)
)
.arg(
Arg::with_name("distributed")
.long("distributed")
.help("Enables functionality required for running the validator in a distributed validator cluster.")
.takes_value(false)
)
/* REST API related arguments */
.arg(
Arg::with_name("http")

File: validator_client/src/config.rs

@@ -84,6 +84,8 @@ pub struct Config {
pub builder_boost_factor: Option<u64>,
/// If true, Lighthouse will prefer builder proposals, if available.
pub prefer_builder_proposals: bool,
/// Whether we are running with distributed network support.
pub distributed: bool,
pub web3_signer_keep_alive_timeout: Option<Duration>,
pub web3_signer_max_idle_connections: Option<usize>,
}
@@ -130,6 +132,7 @@ impl Default for Config {
produce_block_v3: false,
builder_boost_factor: None,
prefer_builder_proposals: false,
distributed: false,
web3_signer_keep_alive_timeout: Some(Duration::from_secs(90)),
web3_signer_max_idle_connections: None,
}
@@ -233,6 +236,10 @@ impl Config {
config.beacon_nodes_tls_certs = Some(tls_certs.split(',').map(PathBuf::from).collect());
}
if cli_args.is_present("distributed") {
config.distributed = true;
}
if cli_args.is_present("disable-run-on-all") {
warn!(
log,

File: validator_client/src/duties_service.rs

@@ -6,7 +6,7 @@
//! The `DutiesService` is also responsible for sending events to the `BlockService` which trigger
//! block production.
mod sync;
pub mod sync;
use crate::beacon_node_fallback::{ApiTopic, BeaconNodeFallback, OfflineOnFailure, RequireSynced};
use crate::http_metrics::metrics::{get_int_gauge, set_int_gauge, ATTESTATION_DUTY};
@@ -42,6 +42,9 @@ const HISTORICAL_DUTIES_EPOCHS: u64 = 2;
/// At start-up selection proofs will be computed with less lookahead out of necessity.
const SELECTION_PROOF_SLOT_LOOKAHEAD: u64 = 8;
/// The attestation selection proof lookahead for those running with the --distributed flag.
const SELECTION_PROOF_SLOT_LOOKAHEAD_DVT: u64 = 1;
/// Fraction of a slot at which selection proof signing should happen (2 means half way).
const SELECTION_PROOF_SCHEDULE_DENOM: u32 = 2;
@@ -211,16 +214,21 @@ pub struct DutiesService<T, E: EthSpec> {
/// proposals for any validators which are not registered locally.
pub proposers: RwLock<ProposerMap>,
/// Map from validator index to sync committee duties.
pub sync_duties: SyncDutiesMap,
pub sync_duties: SyncDutiesMap<E>,
/// Provides the canonical list of locally-managed validators.
pub validator_store: Arc<ValidatorStore<T, E>>,
/// Tracks the current slot.
pub slot_clock: T,
/// Provides HTTP access to remote beacon nodes.
pub beacon_nodes: Arc<BeaconNodeFallback<T, E>>,
pub enable_high_validator_count_metrics: bool,
/// The runtime for spawning tasks.
pub context: RuntimeContext<E>,
/// The current chain spec.
pub spec: ChainSpec,
/// Whether we permit large validator counts in the metrics.
pub enable_high_validator_count_metrics: bool,
/// If this validator is running in distributed mode.
pub distributed: bool,
}
impl<T: SlotClock + 'static, E: EthSpec> DutiesService<T, E> {
@@ -997,7 +1005,13 @@ async fn fill_in_selection_proofs<T: SlotClock + 'static, E: EthSpec>(
continue;
};
let lookahead_slot = current_slot + SELECTION_PROOF_SLOT_LOOKAHEAD;
let selection_lookahead = if duties_service.distributed {
SELECTION_PROOF_SLOT_LOOKAHEAD_DVT
} else {
SELECTION_PROOF_SLOT_LOOKAHEAD
};
let lookahead_slot = current_slot + selection_lookahead;
let mut relevant_duties = duties_by_slot.split_off(&lookahead_slot);
std::mem::swap(&mut relevant_duties, &mut duties_by_slot);
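The branch above is the heart of the attestation-side change: in distributed mode, selection proofs are signed only one slot ahead instead of eight, keeping the signing window close to the duty so DVT middleware can aggregate partial proofs in time. A self-contained sketch of the arithmetic, with the constants copied from this file:

// Constants copied from duties_service.rs.
const SELECTION_PROOF_SLOT_LOOKAHEAD: u64 = 8;
const SELECTION_PROOF_SLOT_LOOKAHEAD_DVT: u64 = 1;

fn selection_lookahead(distributed: bool) -> u64 {
    if distributed {
        SELECTION_PROOF_SLOT_LOOKAHEAD_DVT
    } else {
        SELECTION_PROOF_SLOT_LOOKAHEAD
    }
}

fn main() {
    let current_slot = 100u64;
    // Standalone VC: proofs prepared for duties up to slot 108.
    assert_eq!(current_slot + selection_lookahead(false), 108);
    // Distributed VC: only up to slot 101, one slot ahead.
    assert_eq!(current_slot + selection_lookahead(true), 101);
}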

File: validator_client/src/duties_service/sync.rs

@@ -7,18 +7,18 @@ use crate::{
};
use futures::future::join_all;
use itertools::Itertools;
use parking_lot::{MappedRwLockReadGuard, RwLock, RwLockReadGuard, RwLockWriteGuard};
use slog::{crit, debug, info, warn};
use slot_clock::SlotClock;
use std::collections::{HashMap, HashSet};
use std::marker::PhantomData;
use std::sync::Arc;
use types::{
ChainSpec, Epoch, EthSpec, PublicKeyBytes, Slot, SyncDuty, SyncSelectionProof, SyncSubnetId,
};
use types::{ChainSpec, EthSpec, PublicKeyBytes, Slot, SyncDuty, SyncSelectionProof, SyncSubnetId};
/// Number of epochs in advance to compute selection proofs.
/// Number of epochs in advance to compute selection proofs when not in `distributed` mode.
pub const AGGREGATION_PRE_COMPUTE_EPOCHS: u64 = 2;
/// Number of slots in advance to compute selection proofs when in `distributed` mode.
pub const AGGREGATION_PRE_COMPUTE_SLOTS_DISTRIBUTED: u64 = 1;
/// Top-level data-structure containing sync duty information.
///
@@ -32,9 +32,12 @@ pub const AGGREGATION_PRE_COMPUTE_EPOCHS: u64 = 2;
/// 2. One-at-a-time locking. For the innermost locks on the aggregator duties, all of the functions
/// in this file take care to only lock one validator at a time. We never hold a lock while
/// trying to obtain another one (hence no lock ordering issues).
pub struct SyncDutiesMap {
pub struct SyncDutiesMap<E: EthSpec> {
/// Map from sync committee period to duties for members of that sync committee.
committees: RwLock<HashMap<u64, CommitteeDuties>>,
/// Whether we are in `distributed` mode and using reduced lookahead for aggregate pre-compute.
distributed: bool,
_phantom: PhantomData<E>,
}
/// Duties for a single sync committee period.
@@ -59,8 +62,8 @@ pub struct ValidatorDuties {
/// Aggregator duties for a single validator.
pub struct AggregatorDuties {
/// The epoch up to which aggregation proofs have already been computed (inclusive).
pre_compute_epoch: RwLock<Option<Epoch>>,
/// The slot up to which aggregation proofs have already been computed (inclusive).
pre_compute_slot: RwLock<Option<Slot>>,
/// Map from slot & subnet ID to proof that this validator is an aggregator.
///
/// The slot is the slot at which the signed contribution and proof should be broadcast,
@@ -82,15 +85,15 @@ pub struct SlotDuties {
pub aggregators: HashMap<SyncSubnetId, Vec<(u64, PublicKeyBytes, SyncSelectionProof)>>,
}
impl Default for SyncDutiesMap {
fn default() -> Self {
impl<E: EthSpec> SyncDutiesMap<E> {
pub fn new(distributed: bool) -> Self {
Self {
committees: RwLock::new(HashMap::new()),
distributed,
_phantom: PhantomData,
}
}
}
impl SyncDutiesMap {
/// Check if duties are already known for all of the given validators for `committee_period`.
fn all_duties_known(&self, committee_period: u64, validator_indices: &[u64]) -> bool {
self.committees
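Making `SyncDutiesMap` generic over `E: EthSpec` (via a zero-sized `PhantomData<E>` field, since nothing actually stores an `E`) lets its methods call `E::slots_per_epoch()` themselves; callers such as `get_duties_for_slot` then drop their turbofish, as the later hunks show. A minimal sketch of the pattern, with hypothetical `Map`/`Spec` names standing in for the real types:

use std::marker::PhantomData;

// Stand-in for the EthSpec trait; only the one method matters here.
trait Spec {
    fn slots_per_epoch() -> u64;
}

struct Mainnet;
impl Spec for Mainnet {
    fn slots_per_epoch() -> u64 {
        32
    }
}

struct Map<E> {
    distributed: bool,
    // Zero-sized marker tying E to the struct without storing a value.
    _phantom: PhantomData<E>,
}

impl<E: Spec> Map<E> {
    fn new(distributed: bool) -> Self {
        Self {
            distributed,
            _phantom: PhantomData,
        }
    }

    // Methods can now use E without a turbofish at the call site.
    fn aggregation_pre_compute_slots(&self) -> u64 {
        if self.distributed {
            1
        } else {
            E::slots_per_epoch() * 2
        }
    }
}

fn main() {
    assert_eq!(Map::<Mainnet>::new(false).aggregation_pre_compute_slots(), 64);
    assert_eq!(Map::<Mainnet>::new(true).aggregation_pre_compute_slots(), 1);
}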
@@ -104,22 +107,34 @@ impl SyncDutiesMap {
})
}
/// Number of slots in advance to compute selection proofs
fn aggregation_pre_compute_slots(&self) -> u64 {
if self.distributed {
AGGREGATION_PRE_COMPUTE_SLOTS_DISTRIBUTED
} else {
E::slots_per_epoch() * AGGREGATION_PRE_COMPUTE_EPOCHS
}
}
/// Prepare for pre-computation of selection proofs for `committee_period`.
///
/// Return the epoch up to which proofs should be pre-computed, as well as a vec of
/// `(previous_pre_compute_epoch, sync_duty)` pairs for all validators which need to have proofs
/// Return the slot up to which proofs should be pre-computed, as well as a vec of
/// `(previous_pre_compute_slot, sync_duty)` pairs for all validators which need to have proofs
/// computed. See `fill_in_aggregation_proofs` for the actual calculation.
fn prepare_for_aggregator_pre_compute(
&self,
committee_period: u64,
current_epoch: Epoch,
current_slot: Slot,
spec: &ChainSpec,
) -> (Epoch, Vec<(Epoch, SyncDuty)>) {
let default_start_epoch =
std::cmp::max(current_epoch, first_epoch_of_period(committee_period, spec));
let pre_compute_epoch = std::cmp::min(
current_epoch + AGGREGATION_PRE_COMPUTE_EPOCHS,
last_epoch_of_period(committee_period, spec),
) -> (Slot, Vec<(Slot, SyncDuty)>) {
let default_start_slot = std::cmp::max(
current_slot,
first_slot_of_period::<E>(committee_period, spec),
);
let pre_compute_lookahead_slots = self.aggregation_pre_compute_slots();
let pre_compute_slot = std::cmp::min(
current_slot + pre_compute_lookahead_slots,
last_slot_of_period::<E>(committee_period, spec),
);
let pre_compute_duties = self.committees.read().get(&committee_period).map_or_else(
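Concretely, the pre-compute window computed above is `min(current_slot + lookahead, last_slot_of_period)`: roughly two epochs (64 slots on mainnet) in the normal case, a single slot in distributed mode, and never past the end of the sync committee period. A worked sketch assuming mainnet's 32 slots per epoch:

// Constants mirroring sync.rs; 32 slots per epoch assumed (mainnet).
const SLOTS_PER_EPOCH: u64 = 32;
const AGGREGATION_PRE_COMPUTE_EPOCHS: u64 = 2;
const AGGREGATION_PRE_COMPUTE_SLOTS_DISTRIBUTED: u64 = 1;

fn pre_compute_slot(current_slot: u64, last_slot_of_period: u64, distributed: bool) -> u64 {
    let lookahead = if distributed {
        AGGREGATION_PRE_COMPUTE_SLOTS_DISTRIBUTED
    } else {
        SLOTS_PER_EPOCH * AGGREGATION_PRE_COMPUTE_EPOCHS
    };
    // Never pre-compute past the end of the sync committee period.
    std::cmp::min(current_slot + lookahead, last_slot_of_period)
}

fn main() {
    // Mid-period: a 64-slot window normally, a single slot when distributed.
    assert_eq!(pre_compute_slot(1000, 8191, false), 1064);
    assert_eq!(pre_compute_slot(1000, 8191, true), 1001);
    // Near the boundary the window is clamped to the period's last slot.
    assert_eq!(pre_compute_slot(8190, 8191, false), 8191);
}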
@@ -130,18 +145,18 @@ impl SyncDutiesMap {
.values()
.filter_map(|maybe_duty| {
let duty = maybe_duty.as_ref()?;
let old_pre_compute_epoch = duty
let old_pre_compute_slot = duty
.aggregation_duties
.pre_compute_epoch
.pre_compute_slot
.write()
.replace(pre_compute_epoch);
.replace(pre_compute_slot);
match old_pre_compute_epoch {
match old_pre_compute_slot {
// No proofs pre-computed previously, compute all from the start of
// the period or the current epoch (whichever is later).
None => Some((default_start_epoch, duty.duty.clone())),
// the period or the current slot (whichever is later).
None => Some((default_start_slot, duty.duty.clone())),
// Proofs computed up to `prev`, start from the subsequent slot.
Some(prev) if prev < pre_compute_epoch => {
Some(prev) if prev < pre_compute_slot => {
Some((prev + 1, duty.duty.clone()))
}
// Proofs already known, no need to compute.
@@ -151,7 +166,7 @@ impl SyncDutiesMap {
.collect()
},
);
(pre_compute_epoch, pre_compute_duties)
(pre_compute_slot, pre_compute_duties)
}
fn get_or_create_committee_duties<'a, 'b>(
@@ -176,7 +191,7 @@ impl SyncDutiesMap {
/// Get duties for all validators for the given `wall_clock_slot`.
///
/// This is the entry-point for the sync committee service.
pub fn get_duties_for_slot<E: EthSpec>(
pub fn get_duties_for_slot(
&self,
wall_clock_slot: Slot,
spec: &ChainSpec,
@@ -253,7 +268,7 @@ impl ValidatorDuties {
Self {
duty,
aggregation_duties: AggregatorDuties {
pre_compute_epoch: RwLock::new(None),
pre_compute_slot: RwLock::new(None),
proofs: RwLock::new(HashMap::new()),
},
}
@@ -265,12 +280,12 @@ fn epoch_offset(spec: &ChainSpec) -> u64 {
spec.epochs_per_sync_committee_period.as_u64() / 2
}
fn first_epoch_of_period(sync_committee_period: u64, spec: &ChainSpec) -> Epoch {
spec.epochs_per_sync_committee_period * sync_committee_period
fn first_slot_of_period<E: EthSpec>(sync_committee_period: u64, spec: &ChainSpec) -> Slot {
(spec.epochs_per_sync_committee_period * sync_committee_period).start_slot(E::slots_per_epoch())
}
fn last_epoch_of_period(sync_committee_period: u64, spec: &ChainSpec) -> Epoch {
first_epoch_of_period(sync_committee_period + 1, spec) - 1
fn last_slot_of_period<E: EthSpec>(sync_committee_period: u64, spec: &ChainSpec) -> Slot {
first_slot_of_period::<E>(sync_committee_period + 1, spec) - 1
}
pub async fn poll_sync_committee_duties<T: SlotClock + 'static, E: EthSpec>(
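The period-boundary helpers switch from epochs to slots. Assuming mainnet parameters (256 epochs per sync committee period, 32 slots per epoch), each period spans 8192 slots; a worked sketch with those numbers hard-coded:

// Mainnet parameters assumed.
const EPOCHS_PER_SYNC_COMMITTEE_PERIOD: u64 = 256;
const SLOTS_PER_EPOCH: u64 = 32;

fn first_slot_of_period(period: u64) -> u64 {
    period * EPOCHS_PER_SYNC_COMMITTEE_PERIOD * SLOTS_PER_EPOCH
}

fn last_slot_of_period(period: u64) -> u64 {
    first_slot_of_period(period + 1) - 1
}

fn main() {
    // Period 1 covers slots 8192..=16383 on mainnet.
    assert_eq!(first_slot_of_period(1), 8192);
    assert_eq!(last_slot_of_period(1), 16383);
}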
@@ -278,11 +293,11 @@ pub async fn poll_sync_committee_duties<T: SlotClock + 'static, E: EthSpec>(
) -> Result<(), Error> {
let sync_duties = &duties_service.sync_duties;
let spec = &duties_service.spec;
let current_epoch = duties_service
let current_slot = duties_service
.slot_clock
.now()
.ok_or(Error::UnableToReadSlotClock)?
.epoch(E::slots_per_epoch());
.ok_or(Error::UnableToReadSlotClock)?;
let current_epoch = current_slot.epoch(E::slots_per_epoch());
// If the Altair fork is yet to be activated, do not attempt to poll for duties.
if spec
@@ -330,8 +345,8 @@ pub async fn poll_sync_committee_duties<T: SlotClock + 'static, E: EthSpec>(
}
// Pre-compute aggregator selection proofs for the current period.
let (current_pre_compute_epoch, new_pre_compute_duties) = sync_duties
.prepare_for_aggregator_pre_compute(current_sync_committee_period, current_epoch, spec);
let (current_pre_compute_slot, new_pre_compute_duties) = sync_duties
.prepare_for_aggregator_pre_compute(current_sync_committee_period, current_slot, spec);
if !new_pre_compute_duties.is_empty() {
let sub_duties_service = duties_service.clone();
@@ -341,8 +356,8 @@ pub async fn poll_sync_committee_duties<T: SlotClock + 'static, E: EthSpec>(
sub_duties_service,
&new_pre_compute_duties,
current_sync_committee_period,
current_epoch,
current_pre_compute_epoch,
current_slot,
current_pre_compute_slot,
)
.await
},
@@ -368,11 +383,14 @@ pub async fn poll_sync_committee_duties<T: SlotClock + 'static, E: EthSpec>(
}
// Pre-compute aggregator selection proofs for the next period.
if (current_epoch + AGGREGATION_PRE_COMPUTE_EPOCHS).sync_committee_period(spec)?
let aggregate_pre_compute_lookahead_slots = sync_duties.aggregation_pre_compute_slots();
if (current_slot + aggregate_pre_compute_lookahead_slots)
.epoch(E::slots_per_epoch())
.sync_committee_period(spec)?
== next_sync_committee_period
{
let (pre_compute_epoch, new_pre_compute_duties) = sync_duties
.prepare_for_aggregator_pre_compute(next_sync_committee_period, current_epoch, spec);
let (pre_compute_slot, new_pre_compute_duties) = sync_duties
.prepare_for_aggregator_pre_compute(next_sync_committee_period, current_slot, spec);
if !new_pre_compute_duties.is_empty() {
let sub_duties_service = duties_service.clone();
@@ -382,8 +400,8 @@ pub async fn poll_sync_committee_duties<T: SlotClock + 'static, E: EthSpec>(
sub_duties_service,
&new_pre_compute_duties,
next_sync_committee_period,
current_epoch,
pre_compute_epoch,
current_slot,
pre_compute_slot,
)
.await
},
@@ -495,10 +513,10 @@ pub async fn poll_sync_committee_duties_for_period<T: SlotClock + 'static, E: Et
pub async fn fill_in_aggregation_proofs<T: SlotClock + 'static, E: EthSpec>(
duties_service: Arc<DutiesService<T, E>>,
pre_compute_duties: &[(Epoch, SyncDuty)],
pre_compute_duties: &[(Slot, SyncDuty)],
sync_committee_period: u64,
current_epoch: Epoch,
pre_compute_epoch: Epoch,
current_slot: Slot,
pre_compute_slot: Slot,
) {
let log = duties_service.context.log();
@@ -506,16 +524,16 @@ pub async fn fill_in_aggregation_proofs<T: SlotClock + 'static, E: EthSpec>(
log,
"Calculating sync selection proofs";
"period" => sync_committee_period,
"current_epoch" => current_epoch,
"pre_compute_epoch" => pre_compute_epoch
"current_slot" => current_slot,
"pre_compute_slot" => pre_compute_slot
);
// Generate selection proofs for each validator at each slot, one epoch at a time.
for epoch in (current_epoch.as_u64()..=pre_compute_epoch.as_u64()).map(Epoch::new) {
// Generate selection proofs for each validator at each slot, one slot at a time.
for slot in (current_slot.as_u64()..=pre_compute_slot.as_u64()).map(Slot::new) {
let mut validator_proofs = vec![];
for (validator_start_epoch, duty) in pre_compute_duties {
// Proofs are already known at this epoch for this validator.
if epoch < *validator_start_epoch {
for (validator_start_slot, duty) in pre_compute_duties {
// Proofs are already known at this slot for this validator.
if slot < *validator_start_slot {
continue;
}
@@ -533,67 +551,64 @@ pub async fn fill_in_aggregation_proofs<T: SlotClock + 'static, E: EthSpec>(
// Create futures to produce proofs.
let duties_service_ref = &duties_service;
let futures = epoch
.slot_iter(E::slots_per_epoch())
.cartesian_product(&subnet_ids)
.map(|(duty_slot, subnet_id)| async move {
// Construct proof for prior slot.
let slot = duty_slot - 1;
let futures = subnet_ids.iter().map(|subnet_id| async move {
// Construct proof for prior slot.
let proof_slot = slot - 1;
let proof = match duties_service_ref
.validator_store
.produce_sync_selection_proof(&duty.pubkey, slot, *subnet_id)
.await
{
Ok(proof) => proof,
Err(ValidatorStoreError::UnknownPubkey(pubkey)) => {
// A pubkey can be missing when a validator was recently
// removed via the API.
debug!(
log,
"Missing pubkey for sync selection proof";
"pubkey" => ?pubkey,
"pubkey" => ?duty.pubkey,
"slot" => slot,
);
return None;
}
Err(e) => {
warn!(
log,
"Unable to sign selection proof";
"error" => ?e,
"pubkey" => ?duty.pubkey,
"slot" => slot,
);
return None;
}
};
match proof.is_aggregator::<E>() {
Ok(true) => {
debug!(
log,
"Validator is sync aggregator";
"validator_index" => duty.validator_index,
"slot" => slot,
"subnet_id" => %subnet_id,
);
Some(((slot, *subnet_id), proof))
}
Ok(false) => None,
Err(e) => {
warn!(
log,
"Error determining is_aggregator";
"pubkey" => ?duty.pubkey,
"slot" => slot,
"error" => ?e,
);
None
}
let proof = match duties_service_ref
.validator_store
.produce_sync_selection_proof(&duty.pubkey, proof_slot, *subnet_id)
.await
{
Ok(proof) => proof,
Err(ValidatorStoreError::UnknownPubkey(pubkey)) => {
// A pubkey can be missing when a validator was recently
// removed via the API.
debug!(
log,
"Missing pubkey for sync selection proof";
"pubkey" => ?pubkey,
"pubkey" => ?duty.pubkey,
"slot" => proof_slot,
);
return None;
}
});
Err(e) => {
warn!(
log,
"Unable to sign selection proof";
"error" => ?e,
"pubkey" => ?duty.pubkey,
"slot" => proof_slot,
);
return None;
}
};
match proof.is_aggregator::<E>() {
Ok(true) => {
debug!(
log,
"Validator is sync aggregator";
"validator_index" => duty.validator_index,
"slot" => proof_slot,
"subnet_id" => %subnet_id,
);
Some(((proof_slot, *subnet_id), proof))
}
Ok(false) => None,
Err(e) => {
warn!(
log,
"Error determining is_aggregator";
"pubkey" => ?duty.pubkey,
"slot" => proof_slot,
"error" => ?e,
);
None
}
}
});
// Execute all the futures in parallel, collecting any successful results.
let proofs = join_all(futures)
@@ -635,7 +650,7 @@ pub async fn fill_in_aggregation_proofs<T: SlotClock + 'static, E: EthSpec>(
debug!(
log,
"Finished computing sync selection proofs";
"epoch" => epoch,
"slot" => slot,
"updated_validators" => num_validators_updated,
);
}
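With the epoch loop replaced by a per-slot loop, the fan-out shrinks from the old slots-per-epoch x subnets cartesian product to one future per subnet, each computing a proof for the slot prior to the broadcast slot. A minimal sketch of the join_all pattern (placeholder proof value; the real code calls `produce_sync_selection_proof` and `is_aggregator`; `tokio` and `futures` crates assumed):

use futures::future::join_all;

#[tokio::main]
async fn main() {
    let slot = 100u64;
    let subnet_ids = vec![0u64, 1, 2];
    // One future per subnet for this one slot; the old code built
    // slots-per-epoch x subnets futures per call.
    let futures = subnet_ids.iter().map(|subnet_id| async move {
        // Proofs are constructed for the slot prior to broadcast.
        let proof_slot = slot - 1;
        // Placeholder for produce_sync_selection_proof + is_aggregator.
        Some(((proof_slot, *subnet_id), format!("proof-{proof_slot}-{subnet_id}")))
    });
    // Run them concurrently, keeping only the successful results.
    let proofs: Vec<_> = join_all(futures).await.into_iter().flatten().collect();
    assert_eq!(proofs.len(), 3);
}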

File: validator_client/src/lib.rs

@@ -39,7 +39,7 @@ use account_utils::validator_definitions::ValidatorDefinitions;
use attestation_service::{AttestationService, AttestationServiceBuilder};
use block_service::{BlockService, BlockServiceBuilder};
use clap::ArgMatches;
use duties_service::DutiesService;
use duties_service::{sync::SyncDutiesMap, DutiesService};
use environment::RuntimeContext;
use eth2::{reqwest::ClientBuilder, types::Graffiti, BeaconNodeHttpClient, StatusCode, Timeouts};
use http_api::ApiSecret;
@@ -451,13 +451,14 @@ impl<T: EthSpec> ProductionValidatorClient<T> {
let duties_service = Arc::new(DutiesService {
attesters: <_>::default(),
proposers: <_>::default(),
sync_duties: <_>::default(),
sync_duties: SyncDutiesMap::new(config.distributed),
slot_clock: slot_clock.clone(),
beacon_nodes: beacon_nodes.clone(),
validator_store: validator_store.clone(),
spec: context.eth2_config.spec.clone(),
context: duties_context,
enable_high_validator_count_metrics: config.enable_high_validator_count_metrics,
distributed: config.distributed,
});
// Update the metrics server.

File: validator_client/src/sync_committee_service.rs

@@ -161,7 +161,7 @@ impl<T: SlotClock + 'static, E: EthSpec> SyncCommitteeService<T, E> {
let Some(slot_duties) = self
.duties_service
.sync_duties
.get_duties_for_slot::<E>(slot, &self.duties_service.spec)
.get_duties_for_slot(slot, &self.duties_service.spec)
else {
debug!(log, "No duties known for slot {}", slot);
return Ok(());
@@ -548,7 +548,7 @@ impl<T: SlotClock + 'static, E: EthSpec> SyncCommitteeService<T, E> {
match self
.duties_service
.sync_duties
.get_duties_for_slot::<E>(duty_slot, spec)
.get_duties_for_slot(duty_slot, spec)
{
Some(duties) => subscriptions.extend(subscriptions_from_sync_duties(
duties.duties,