diff --git a/Cargo.lock b/Cargo.lock index af9d0a0ad..21fea637f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8437,6 +8437,7 @@ dependencies = [ "slashing_protection", "slog", "slot_clock", + "strum", "sysinfo", "system_health", "task_executor", diff --git a/lighthouse/tests/validator_client.rs b/lighthouse/tests/validator_client.rs index a3b878ad2..32866cc4f 100644 --- a/lighthouse/tests/validator_client.rs +++ b/lighthouse/tests/validator_client.rs @@ -1,4 +1,4 @@ -use validator_client::Config; +use validator_client::{ApiTopic, Config}; use crate::exec::CommandLineTestExec; use bls::{Keypair, PublicKeyBytes}; @@ -493,20 +493,78 @@ fn monitoring_endpoint() { assert_eq!(api_conf.update_period_secs, Some(30)); }); } -#[test] -fn disable_run_on_all_default() { - CommandLineTest::new().run().with_config(|config| { - assert!(!config.disable_run_on_all); - }); -} #[test] -fn disable_run_on_all() { +fn disable_run_on_all_flag() { CommandLineTest::new() .flag("disable-run-on-all", None) .run() .with_config(|config| { - assert!(config.disable_run_on_all); + assert_eq!(config.broadcast_topics, vec![]); + }); + // --broadcast flag takes precedence + CommandLineTest::new() + .flag("disable-run-on-all", None) + .flag("broadcast", Some("attestations")) + .run() + .with_config(|config| { + assert_eq!(config.broadcast_topics, vec![ApiTopic::Attestations]); + }); +} + +#[test] +fn no_broadcast_flag() { + CommandLineTest::new().run().with_config(|config| { + assert_eq!(config.broadcast_topics, vec![ApiTopic::Subscriptions]); + }); +} + +#[test] +fn broadcast_flag() { + // "none" variant + CommandLineTest::new() + .flag("broadcast", Some("none")) + .run() + .with_config(|config| { + assert_eq!(config.broadcast_topics, vec![]); + }); + // "none" with other values is ignored + CommandLineTest::new() + .flag("broadcast", Some("none,sync-committee")) + .run() + .with_config(|config| { + assert_eq!(config.broadcast_topics, vec![ApiTopic::SyncCommittee]); + }); + // Other valid variants + CommandLineTest::new() + .flag("broadcast", Some("blocks, subscriptions")) + .run() + .with_config(|config| { + assert_eq!( + config.broadcast_topics, + vec![ApiTopic::Blocks, ApiTopic::Subscriptions], + ); + }); + // Omitted "subscription" overrides default + CommandLineTest::new() + .flag("broadcast", Some("attestations")) + .run() + .with_config(|config| { + assert_eq!(config.broadcast_topics, vec![ApiTopic::Attestations]); + }); +} + +#[test] +#[should_panic(expected = "Unknown API topic")] +fn wrong_broadcast_flag() { + CommandLineTest::new() + .flag("broadcast", Some("foo, subscriptions")) + .run() + .with_config(|config| { + assert_eq!( + config.broadcast_topics, + vec![ApiTopic::Blocks, ApiTopic::Subscriptions], + ); }); } diff --git a/testing/node_test_rig/Cargo.toml b/testing/node_test_rig/Cargo.toml index 5fe820d15..4696d8d2f 100644 --- a/testing/node_test_rig/Cargo.toml +++ b/testing/node_test_rig/Cargo.toml @@ -11,7 +11,7 @@ types = { workspace = true } tempfile = { workspace = true } eth2 = { workspace = true } validator_client = { workspace = true } -validator_dir = { workspace = true } +validator_dir = { workspace = true, features = ["insecure_keys"] } sensitive_url = { workspace = true } execution_layer = { workspace = true } tokio = { workspace = true } diff --git a/testing/node_test_rig/src/lib.rs b/testing/node_test_rig/src/lib.rs index 3f08c8371..6c9af707f 100644 --- a/testing/node_test_rig/src/lib.rs +++ b/testing/node_test_rig/src/lib.rs @@ -21,7 +21,7 @@ pub use eth2; pub use execution_layer::test_utils::{ Config as MockServerConfig, MockExecutionConfig, MockServer, }; -pub use validator_client::Config as ValidatorConfig; +pub use validator_client::{ApiTopic, Config as ValidatorConfig}; /// The global timeout for HTTP requests to the beacon node. const HTTP_TIMEOUT: Duration = Duration::from_secs(8); diff --git a/testing/simulator/src/eth1_sim.rs b/testing/simulator/src/eth1_sim.rs index 57c944cf1..953dcf582 100644 --- a/testing/simulator/src/eth1_sim.rs +++ b/testing/simulator/src/eth1_sim.rs @@ -10,7 +10,8 @@ use futures::prelude::*; use node_test_rig::environment::RuntimeContext; use node_test_rig::{ environment::{EnvironmentBuilder, LoggerConfig}, - testing_client_config, testing_validator_config, ClientConfig, ClientGenesis, ValidatorFiles, + testing_client_config, testing_validator_config, ApiTopic, ClientConfig, ClientGenesis, + ValidatorFiles, }; use rayon::prelude::*; use sensitive_url::SensitiveUrl; @@ -159,10 +160,25 @@ pub fn run_eth1_sim(matches: &ArgMatches) -> Result<(), String> { validator_config.fee_recipient = Some(SUGGESTED_FEE_RECIPIENT.into()); } println!("Adding validator client {}", i); - network_1 - .add_validator_client(validator_config, i, files, i % 2 == 0) - .await - .expect("should add validator"); + + // Enable broadcast on every 4th node. + if i % 4 == 0 { + validator_config.broadcast_topics = ApiTopic::all(); + let beacon_nodes = vec![i, (i + 1) % node_count]; + network_1 + .add_validator_client_with_fallbacks( + validator_config, + i, + beacon_nodes, + files, + ) + .await + } else { + network_1 + .add_validator_client(validator_config, i, files, i % 2 == 0) + .await + } + .expect("should add validator"); }, "vc", ); diff --git a/testing/simulator/src/local_network.rs b/testing/simulator/src/local_network.rs index 1024c46e4..dc8bf0d27 100644 --- a/testing/simulator/src/local_network.rs +++ b/testing/simulator/src/local_network.rs @@ -270,6 +270,48 @@ impl LocalNetwork { Ok(()) } + pub async fn add_validator_client_with_fallbacks( + &self, + mut validator_config: ValidatorConfig, + validator_index: usize, + beacon_nodes: Vec, + validator_files: ValidatorFiles, + ) -> Result<(), String> { + let context = self + .context + .service_context(format!("validator_{}", validator_index)); + let self_1 = self.clone(); + let mut beacon_node_urls = vec![]; + for beacon_node in beacon_nodes { + let socket_addr = { + let read_lock = self.beacon_nodes.read(); + let beacon_node = read_lock + .get(beacon_node) + .ok_or_else(|| format!("No beacon node for index {}", beacon_node))?; + beacon_node + .client + .http_api_listen_addr() + .expect("Must have http started") + }; + let beacon_node = SensitiveUrl::parse( + format!("http://{}:{}", socket_addr.ip(), socket_addr.port()).as_str(), + ) + .unwrap(); + beacon_node_urls.push(beacon_node); + } + + validator_config.beacon_nodes = beacon_node_urls; + + let validator_client = LocalValidatorClient::production_with_insecure_keypairs( + context, + validator_config, + validator_files, + ) + .await?; + self_1.validator_clients.write().push(validator_client); + Ok(()) + } + /// For all beacon nodes in `Self`, return a HTTP client to access each nodes HTTP API. pub fn remote_nodes(&self) -> Result, String> { let beacon_nodes = self.beacon_nodes.read(); diff --git a/validator_client/Cargo.toml b/validator_client/Cargo.toml index 0b648a815..90a82b7e3 100644 --- a/validator_client/Cargo.toml +++ b/validator_client/Cargo.toml @@ -61,3 +61,4 @@ malloc_utils = { workspace = true } sysinfo = { workspace = true } system_health = { path = "../common/system_health" } logging = { workspace = true } +strum = { workspace = true } diff --git a/validator_client/src/attestation_service.rs b/validator_client/src/attestation_service.rs index b5bb6702a..43b9d60e2 100644 --- a/validator_client/src/attestation_service.rs +++ b/validator_client/src/attestation_service.rs @@ -1,4 +1,4 @@ -use crate::beacon_node_fallback::{BeaconNodeFallback, RequireSynced}; +use crate::beacon_node_fallback::{ApiTopic, BeaconNodeFallback, RequireSynced}; use crate::{ duties_service::{DutiesService, DutyAndProof}, http_metrics::metrics, @@ -433,9 +433,10 @@ impl AttestationService { // Post the attestations to the BN. match self .beacon_nodes - .first_success( + .request( RequireSynced::No, OfflineOnFailure::Yes, + ApiTopic::Attestations, |beacon_node| async move { let _timer = metrics::start_timer_vec( &metrics::ATTESTATION_SERVICE_TIMES, diff --git a/validator_client/src/beacon_node_fallback.rs b/validator_client/src/beacon_node_fallback.rs index 3fce61a55..23458d327 100644 --- a/validator_client/src/beacon_node_fallback.rs +++ b/validator_client/src/beacon_node_fallback.rs @@ -7,6 +7,7 @@ use crate::http_metrics::metrics::{inc_counter_vec, ENDPOINT_ERRORS, ENDPOINT_RE use environment::RuntimeContext; use eth2::BeaconNodeHttpClient; use futures::future; +use serde::{Deserialize, Serialize}; use slog::{debug, error, info, warn, Logger}; use slot_clock::SlotClock; use std::fmt; @@ -15,6 +16,7 @@ use std::future::Future; use std::marker::PhantomData; use std::sync::Arc; use std::time::{Duration, Instant}; +use strum::{EnumString, EnumVariantNames}; use tokio::{sync::RwLock, time::sleep}; use types::{ChainSpec, Config, EthSpec}; @@ -330,7 +332,7 @@ impl CandidateBeaconNode { pub struct BeaconNodeFallback { candidates: Vec>, slot_clock: Option, - disable_run_on_all: bool, + broadcast_topics: Vec, spec: ChainSpec, log: Logger, } @@ -338,14 +340,14 @@ pub struct BeaconNodeFallback { impl BeaconNodeFallback { pub fn new( candidates: Vec>, - disable_run_on_all: bool, + broadcast_topics: Vec, spec: ChainSpec, log: Logger, ) -> Self { Self { candidates, slot_clock: None, - disable_run_on_all, + broadcast_topics, spec, log, } @@ -579,7 +581,7 @@ impl BeaconNodeFallback { /// It returns a list of errors along with the beacon node id that failed for `func`. /// Since this ignores the actual result of `func`, this function should only be used for beacon /// node calls whose results we do not care about, only that they completed successfully. - pub async fn run_on_all<'a, F, O, Err, R>( + pub async fn broadcast<'a, F, O, Err, R>( &'a self, require_synced: RequireSynced, offline_on_failure: OfflineOnFailure, @@ -687,11 +689,12 @@ impl BeaconNodeFallback { } /// Call `func` on first beacon node that returns success or on all beacon nodes - /// depending on the value of `disable_run_on_all`. - pub async fn run<'a, F, Err, R>( + /// depending on the `topic` and configuration. + pub async fn request<'a, F, Err, R>( &'a self, require_synced: RequireSynced, offline_on_failure: OfflineOnFailure, + topic: ApiTopic, func: F, ) -> Result<(), Errors> where @@ -699,13 +702,47 @@ impl BeaconNodeFallback { R: Future>, Err: Debug, { - if self.disable_run_on_all { + if self.broadcast_topics.contains(&topic) { + self.broadcast(require_synced, offline_on_failure, func) + .await + } else { self.first_success(require_synced, offline_on_failure, func) .await?; Ok(()) - } else { - self.run_on_all(require_synced, offline_on_failure, func) - .await } } } + +/// Serves as a cue for `BeaconNodeFallback` to tell which requests need to be broadcasted. +#[derive(Clone, Copy, Debug, PartialEq, Deserialize, Serialize, EnumString, EnumVariantNames)] +#[strum(serialize_all = "kebab-case")] +pub enum ApiTopic { + Attestations, + Blocks, + Subscriptions, + SyncCommittee, +} + +impl ApiTopic { + pub fn all() -> Vec { + use ApiTopic::*; + vec![Attestations, Blocks, Subscriptions, SyncCommittee] + } +} + +#[cfg(test)] +mod test { + use super::*; + use std::str::FromStr; + use strum::VariantNames; + + #[test] + fn api_topic_all() { + let all = ApiTopic::all(); + assert_eq!(all.len(), ApiTopic::VARIANTS.len()); + assert!(ApiTopic::VARIANTS + .iter() + .map(|topic| ApiTopic::from_str(topic).unwrap()) + .eq(all.into_iter())); + } +} diff --git a/validator_client/src/block_service.rs b/validator_client/src/block_service.rs index 3518bded9..fbdfa1d68 100644 --- a/validator_client/src/block_service.rs +++ b/validator_client/src/block_service.rs @@ -1,6 +1,6 @@ use crate::beacon_node_fallback::{Error as FallbackError, Errors}; use crate::{ - beacon_node_fallback::{BeaconNodeFallback, RequireSynced}, + beacon_node_fallback::{ApiTopic, BeaconNodeFallback, RequireSynced}, determine_graffiti, graffiti_file::GraffitiFile, OfflineOnFailure, @@ -147,35 +147,41 @@ pub struct ProposerFallback { impl ProposerFallback { // Try `func` on `self.proposer_nodes` first. If that doesn't work, try `self.beacon_nodes`. - pub async fn first_success_try_proposers_first<'a, F, O, Err, R>( + pub async fn request_proposers_first<'a, F, Err, R>( &'a self, require_synced: RequireSynced, offline_on_failure: OfflineOnFailure, func: F, - ) -> Result> + ) -> Result<(), Errors> where F: Fn(&'a BeaconNodeHttpClient) -> R + Clone, - R: Future>, + R: Future>, Err: Debug, { // If there are proposer nodes, try calling `func` on them and return early if they are successful. if let Some(proposer_nodes) = &self.proposer_nodes { - if let Ok(result) = proposer_nodes - .first_success(require_synced, offline_on_failure, func.clone()) + if proposer_nodes + .request( + require_synced, + offline_on_failure, + ApiTopic::Blocks, + func.clone(), + ) .await + .is_ok() { - return Ok(result); + return Ok(()); } } // If the proposer nodes failed, try on the non-proposer nodes. self.beacon_nodes - .first_success(require_synced, offline_on_failure, func) + .request(require_synced, offline_on_failure, ApiTopic::Blocks, func) .await } // Try `func` on `self.beacon_nodes` first. If that doesn't work, try `self.proposer_nodes`. - pub async fn first_success_try_proposers_last<'a, F, O, Err, R>( + pub async fn request_proposers_last<'a, F, O, Err, R>( &'a self, require_synced: RequireSynced, offline_on_failure: OfflineOnFailure, @@ -476,7 +482,7 @@ impl BlockService { // Try the proposer nodes last, since it's likely that they don't have a // great view of attestations on the network. let block_contents = proposer_fallback - .first_success_try_proposers_last( + .request_proposers_last( RequireSynced::No, OfflineOnFailure::Yes, move |beacon_node| { @@ -569,7 +575,7 @@ impl BlockService { // protect them from DoS attacks and they're most likely to successfully // publish a block. proposer_fallback - .first_success_try_proposers_first( + .request_proposers_first( RequireSynced::No, OfflineOnFailure::Yes, |beacon_node| async { diff --git a/validator_client/src/cli.rs b/validator_client/src/cli.rs index 4ce403517..69c97dea6 100644 --- a/validator_client/src/cli.rs +++ b/validator_client/src/cli.rs @@ -26,15 +26,28 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> { ) .takes_value(true), ) + // TODO remove this flag in a future release .arg( Arg::with_name("disable-run-on-all") .long("disable-run-on-all") .value_name("DISABLE_RUN_ON_ALL") - .help("By default, Lighthouse publishes attestation, sync committee subscriptions \ + .help("DEPRECATED. Use --broadcast. \ + By default, Lighthouse publishes attestation, sync committee subscriptions \ and proposer preparation messages to all beacon nodes provided in the \ `--beacon-nodes flag`. This option changes that behaviour such that these \ api calls only go out to the first available and synced beacon node") - .takes_value(false) + .takes_value(false), + ) + .arg( + Arg::with_name("broadcast") + .long("broadcast") + .value_name("API_TOPICS") + .help("Comma-separated list of beacon API topics to broadcast to all beacon nodes. \ + Possible values are: none, attestations, blocks, subscriptions, \ + sync-committee. Default (when flag is omitted) is to broadcast \ + subscriptions only." + ) + .takes_value(true), ) .arg( Arg::with_name("validators-dir") diff --git a/validator_client/src/config.rs b/validator_client/src/config.rs index c93fe6c14..26c2bccd3 100644 --- a/validator_client/src/config.rs +++ b/validator_client/src/config.rs @@ -1,3 +1,4 @@ +use crate::beacon_node_fallback::ApiTopic; use crate::graffiti_file::GraffitiFile; use crate::{http_api, http_metrics}; use clap::ArgMatches; @@ -9,7 +10,7 @@ use directory::{ use eth2::types::Graffiti; use sensitive_url::SensitiveUrl; use serde::{Deserialize, Serialize}; -use slog::{info, Logger}; +use slog::{info, warn, Logger}; use std::fs; use std::net::IpAddr; use std::path::PathBuf; @@ -73,8 +74,8 @@ pub struct Config { /// /// This is *not* recommended in prod and should only be used for testing. pub block_delay: Option, - /// Disables publishing http api requests to all beacon nodes for select api calls. - pub disable_run_on_all: bool, + /// Enables broadcasting of various requests (by topic) to all beacon nodes. + pub broadcast_topics: Vec, /// Enables a service which attempts to measure latency between the VC and BNs. pub enable_latency_measurement_service: bool, /// Defines the number of validators per `validator/register_validator` request sent to the BN. @@ -117,7 +118,7 @@ impl Default for Config { builder_proposals: false, builder_registration_timestamp_override: None, gas_limit: None, - disable_run_on_all: false, + broadcast_topics: vec![ApiTopic::Subscriptions], enable_latency_measurement_service: true, validator_registration_batch_size: 500, } @@ -179,7 +180,6 @@ impl Config { .map_err(|e| format!("Unable to parse proposer node URL: {:?}", e))?; } - config.disable_run_on_all = cli_args.is_present("disable-run-on-all"); config.disable_auto_discover = cli_args.is_present("disable-auto-discover"); config.init_slashing_protection = cli_args.is_present("init-slashing-protection"); config.use_long_timeouts = cli_args.is_present("use-long-timeouts"); @@ -222,6 +222,26 @@ impl Config { config.beacon_nodes_tls_certs = Some(tls_certs.split(',').map(PathBuf::from).collect()); } + if cli_args.is_present("disable-run-on-all") { + warn!( + log, + "The --disable-run-on-all flag is deprecated"; + "msg" => "please use --broadcast instead" + ); + config.broadcast_topics = vec![]; + } + if let Some(broadcast_topics) = cli_args.value_of("broadcast") { + config.broadcast_topics = broadcast_topics + .split(',') + .filter(|t| *t != "none") + .map(|t| { + t.trim() + .parse::() + .map_err(|_| format!("Unknown API topic to broadcast: {t}")) + }) + .collect::>()?; + } + /* * Http API server */ diff --git a/validator_client/src/duties_service.rs b/validator_client/src/duties_service.rs index 9b9105a62..26747f811 100644 --- a/validator_client/src/duties_service.rs +++ b/validator_client/src/duties_service.rs @@ -8,7 +8,7 @@ mod sync; -use crate::beacon_node_fallback::{BeaconNodeFallback, OfflineOnFailure, RequireSynced}; +use crate::beacon_node_fallback::{ApiTopic, BeaconNodeFallback, OfflineOnFailure, RequireSynced}; use crate::http_metrics::metrics::{get_int_gauge, set_int_gauge, ATTESTATION_DUTY}; use crate::{ block_service::BlockServiceNotification, @@ -700,9 +700,10 @@ async fn poll_beacon_attesters( let subscriptions_ref = &subscriptions; if let Err(e) = duties_service .beacon_nodes - .run( + .request( RequireSynced::No, OfflineOnFailure::Yes, + ApiTopic::Subscriptions, |beacon_node| async move { let _timer = metrics::start_timer_vec( &metrics::DUTIES_SERVICE_TIMES, diff --git a/validator_client/src/lib.rs b/validator_client/src/lib.rs index 6925e285f..c020c1415 100644 --- a/validator_client/src/lib.rs +++ b/validator_client/src/lib.rs @@ -19,6 +19,7 @@ pub mod http_api; pub mod initialized_validators; pub mod validator_store; +pub use beacon_node_fallback::ApiTopic; pub use cli::cli_app; pub use config::Config; use initialized_validators::InitializedValidators; @@ -369,14 +370,14 @@ impl ProductionValidatorClient { let mut beacon_nodes: BeaconNodeFallback<_, T> = BeaconNodeFallback::new( candidates, - config.disable_run_on_all, + config.broadcast_topics.clone(), context.eth2_config.spec.clone(), log.clone(), ); let mut proposer_nodes: BeaconNodeFallback<_, T> = BeaconNodeFallback::new( proposer_candidates, - config.disable_run_on_all, + config.broadcast_topics.clone(), context.eth2_config.spec.clone(), log.clone(), ); diff --git a/validator_client/src/preparation_service.rs b/validator_client/src/preparation_service.rs index 2d2221680..7aabc7d5a 100644 --- a/validator_client/src/preparation_service.rs +++ b/validator_client/src/preparation_service.rs @@ -1,4 +1,4 @@ -use crate::beacon_node_fallback::{BeaconNodeFallback, RequireSynced}; +use crate::beacon_node_fallback::{ApiTopic, BeaconNodeFallback, RequireSynced}; use crate::validator_store::{DoppelgangerStatus, Error as ValidatorStoreError, ValidatorStore}; use crate::OfflineOnFailure; use bls::PublicKeyBytes; @@ -342,9 +342,10 @@ impl PreparationService { let preparation_entries = preparation_data.as_slice(); match self .beacon_nodes - .run( + .request( RequireSynced::No, OfflineOnFailure::Yes, + ApiTopic::Subscriptions, |beacon_node| async move { beacon_node .post_validator_prepare_beacon_proposer(preparation_entries) diff --git a/validator_client/src/sync_committee_service.rs b/validator_client/src/sync_committee_service.rs index 8e904e5dd..90b62cd3b 100644 --- a/validator_client/src/sync_committee_service.rs +++ b/validator_client/src/sync_committee_service.rs @@ -1,4 +1,4 @@ -use crate::beacon_node_fallback::{BeaconNodeFallback, RequireSynced}; +use crate::beacon_node_fallback::{ApiTopic, BeaconNodeFallback, RequireSynced}; use crate::{ duties_service::DutiesService, validator_store::{Error as ValidatorStoreError, ValidatorStore}, @@ -299,9 +299,10 @@ impl SyncCommitteeService { .collect::>(); self.beacon_nodes - .first_success( + .request( RequireSynced::No, OfflineOnFailure::Yes, + ApiTopic::SyncCommittee, |beacon_node| async move { beacon_node .post_beacon_pool_sync_committee_signatures(committee_signatures) @@ -594,9 +595,10 @@ impl SyncCommitteeService { if let Err(e) = self .beacon_nodes - .run( + .request( RequireSynced::No, OfflineOnFailure::Yes, + ApiTopic::Subscriptions, |beacon_node| async move { beacon_node .post_validator_sync_committee_subscriptions(subscriptions_slice)