Broadcast various requests as per #4684 (#4920)

* multiple broadcast flags

* rewrite with single --broadcast option

* satisfy cargo fmt

* shorten sync-committee-messages

* fix a doc comment and a test

* use strum

* Add broadcast test to simulator

* bring --disable-run-on-all flag back with deprecation notice
This commit is contained in:
Alexander Uvizhev 2023-11-27 07:39:37 +03:00 committed by GitHub
parent e856a904ef
commit b4556a3d62
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 255 additions and 55 deletions

1
Cargo.lock generated
View File

@ -8437,6 +8437,7 @@ dependencies = [
"slashing_protection",
"slog",
"slot_clock",
"strum",
"sysinfo",
"system_health",
"task_executor",

View File

@ -1,4 +1,4 @@
use validator_client::Config;
use validator_client::{ApiTopic, Config};
use crate::exec::CommandLineTestExec;
use bls::{Keypair, PublicKeyBytes};
@ -493,20 +493,78 @@ fn monitoring_endpoint() {
assert_eq!(api_conf.update_period_secs, Some(30));
});
}
#[test]
fn disable_run_on_all_default() {
CommandLineTest::new().run().with_config(|config| {
assert!(!config.disable_run_on_all);
});
}
#[test]
fn disable_run_on_all() {
fn disable_run_on_all_flag() {
CommandLineTest::new()
.flag("disable-run-on-all", None)
.run()
.with_config(|config| {
assert!(config.disable_run_on_all);
assert_eq!(config.broadcast_topics, vec![]);
});
// --broadcast flag takes precedence
CommandLineTest::new()
.flag("disable-run-on-all", None)
.flag("broadcast", Some("attestations"))
.run()
.with_config(|config| {
assert_eq!(config.broadcast_topics, vec![ApiTopic::Attestations]);
});
}
#[test]
fn no_broadcast_flag() {
CommandLineTest::new().run().with_config(|config| {
assert_eq!(config.broadcast_topics, vec![ApiTopic::Subscriptions]);
});
}
#[test]
fn broadcast_flag() {
// "none" variant
CommandLineTest::new()
.flag("broadcast", Some("none"))
.run()
.with_config(|config| {
assert_eq!(config.broadcast_topics, vec![]);
});
// "none" with other values is ignored
CommandLineTest::new()
.flag("broadcast", Some("none,sync-committee"))
.run()
.with_config(|config| {
assert_eq!(config.broadcast_topics, vec![ApiTopic::SyncCommittee]);
});
// Other valid variants
CommandLineTest::new()
.flag("broadcast", Some("blocks, subscriptions"))
.run()
.with_config(|config| {
assert_eq!(
config.broadcast_topics,
vec![ApiTopic::Blocks, ApiTopic::Subscriptions],
);
});
// Omitted "subscription" overrides default
CommandLineTest::new()
.flag("broadcast", Some("attestations"))
.run()
.with_config(|config| {
assert_eq!(config.broadcast_topics, vec![ApiTopic::Attestations]);
});
}
#[test]
#[should_panic(expected = "Unknown API topic")]
fn wrong_broadcast_flag() {
CommandLineTest::new()
.flag("broadcast", Some("foo, subscriptions"))
.run()
.with_config(|config| {
assert_eq!(
config.broadcast_topics,
vec![ApiTopic::Blocks, ApiTopic::Subscriptions],
);
});
}

View File

@ -11,7 +11,7 @@ types = { workspace = true }
tempfile = { workspace = true }
eth2 = { workspace = true }
validator_client = { workspace = true }
validator_dir = { workspace = true }
validator_dir = { workspace = true, features = ["insecure_keys"] }
sensitive_url = { workspace = true }
execution_layer = { workspace = true }
tokio = { workspace = true }

View File

@ -21,7 +21,7 @@ pub use eth2;
pub use execution_layer::test_utils::{
Config as MockServerConfig, MockExecutionConfig, MockServer,
};
pub use validator_client::Config as ValidatorConfig;
pub use validator_client::{ApiTopic, Config as ValidatorConfig};
/// The global timeout for HTTP requests to the beacon node.
const HTTP_TIMEOUT: Duration = Duration::from_secs(8);

View File

@ -10,7 +10,8 @@ use futures::prelude::*;
use node_test_rig::environment::RuntimeContext;
use node_test_rig::{
environment::{EnvironmentBuilder, LoggerConfig},
testing_client_config, testing_validator_config, ClientConfig, ClientGenesis, ValidatorFiles,
testing_client_config, testing_validator_config, ApiTopic, ClientConfig, ClientGenesis,
ValidatorFiles,
};
use rayon::prelude::*;
use sensitive_url::SensitiveUrl;
@ -159,9 +160,24 @@ pub fn run_eth1_sim(matches: &ArgMatches) -> Result<(), String> {
validator_config.fee_recipient = Some(SUGGESTED_FEE_RECIPIENT.into());
}
println!("Adding validator client {}", i);
// Enable broadcast on every 4th node.
if i % 4 == 0 {
validator_config.broadcast_topics = ApiTopic::all();
let beacon_nodes = vec![i, (i + 1) % node_count];
network_1
.add_validator_client_with_fallbacks(
validator_config,
i,
beacon_nodes,
files,
)
.await
} else {
network_1
.add_validator_client(validator_config, i, files, i % 2 == 0)
.await
}
.expect("should add validator");
},
"vc",

View File

@ -270,6 +270,48 @@ impl<E: EthSpec> LocalNetwork<E> {
Ok(())
}
pub async fn add_validator_client_with_fallbacks(
&self,
mut validator_config: ValidatorConfig,
validator_index: usize,
beacon_nodes: Vec<usize>,
validator_files: ValidatorFiles,
) -> Result<(), String> {
let context = self
.context
.service_context(format!("validator_{}", validator_index));
let self_1 = self.clone();
let mut beacon_node_urls = vec![];
for beacon_node in beacon_nodes {
let socket_addr = {
let read_lock = self.beacon_nodes.read();
let beacon_node = read_lock
.get(beacon_node)
.ok_or_else(|| format!("No beacon node for index {}", beacon_node))?;
beacon_node
.client
.http_api_listen_addr()
.expect("Must have http started")
};
let beacon_node = SensitiveUrl::parse(
format!("http://{}:{}", socket_addr.ip(), socket_addr.port()).as_str(),
)
.unwrap();
beacon_node_urls.push(beacon_node);
}
validator_config.beacon_nodes = beacon_node_urls;
let validator_client = LocalValidatorClient::production_with_insecure_keypairs(
context,
validator_config,
validator_files,
)
.await?;
self_1.validator_clients.write().push(validator_client);
Ok(())
}
/// For all beacon nodes in `Self`, return a HTTP client to access each nodes HTTP API.
pub fn remote_nodes(&self) -> Result<Vec<BeaconNodeHttpClient>, String> {
let beacon_nodes = self.beacon_nodes.read();

View File

@ -61,3 +61,4 @@ malloc_utils = { workspace = true }
sysinfo = { workspace = true }
system_health = { path = "../common/system_health" }
logging = { workspace = true }
strum = { workspace = true }

View File

@ -1,4 +1,4 @@
use crate::beacon_node_fallback::{BeaconNodeFallback, RequireSynced};
use crate::beacon_node_fallback::{ApiTopic, BeaconNodeFallback, RequireSynced};
use crate::{
duties_service::{DutiesService, DutyAndProof},
http_metrics::metrics,
@ -433,9 +433,10 @@ impl<T: SlotClock + 'static, E: EthSpec> AttestationService<T, E> {
// Post the attestations to the BN.
match self
.beacon_nodes
.first_success(
.request(
RequireSynced::No,
OfflineOnFailure::Yes,
ApiTopic::Attestations,
|beacon_node| async move {
let _timer = metrics::start_timer_vec(
&metrics::ATTESTATION_SERVICE_TIMES,

View File

@ -7,6 +7,7 @@ use crate::http_metrics::metrics::{inc_counter_vec, ENDPOINT_ERRORS, ENDPOINT_RE
use environment::RuntimeContext;
use eth2::BeaconNodeHttpClient;
use futures::future;
use serde::{Deserialize, Serialize};
use slog::{debug, error, info, warn, Logger};
use slot_clock::SlotClock;
use std::fmt;
@ -15,6 +16,7 @@ use std::future::Future;
use std::marker::PhantomData;
use std::sync::Arc;
use std::time::{Duration, Instant};
use strum::{EnumString, EnumVariantNames};
use tokio::{sync::RwLock, time::sleep};
use types::{ChainSpec, Config, EthSpec};
@ -330,7 +332,7 @@ impl<E: EthSpec> CandidateBeaconNode<E> {
pub struct BeaconNodeFallback<T, E> {
candidates: Vec<CandidateBeaconNode<E>>,
slot_clock: Option<T>,
disable_run_on_all: bool,
broadcast_topics: Vec<ApiTopic>,
spec: ChainSpec,
log: Logger,
}
@ -338,14 +340,14 @@ pub struct BeaconNodeFallback<T, E> {
impl<T: SlotClock, E: EthSpec> BeaconNodeFallback<T, E> {
pub fn new(
candidates: Vec<CandidateBeaconNode<E>>,
disable_run_on_all: bool,
broadcast_topics: Vec<ApiTopic>,
spec: ChainSpec,
log: Logger,
) -> Self {
Self {
candidates,
slot_clock: None,
disable_run_on_all,
broadcast_topics,
spec,
log,
}
@ -579,7 +581,7 @@ impl<T: SlotClock, E: EthSpec> BeaconNodeFallback<T, E> {
/// It returns a list of errors along with the beacon node id that failed for `func`.
/// Since this ignores the actual result of `func`, this function should only be used for beacon
/// node calls whose results we do not care about, only that they completed successfully.
pub async fn run_on_all<'a, F, O, Err, R>(
pub async fn broadcast<'a, F, O, Err, R>(
&'a self,
require_synced: RequireSynced,
offline_on_failure: OfflineOnFailure,
@ -687,11 +689,12 @@ impl<T: SlotClock, E: EthSpec> BeaconNodeFallback<T, E> {
}
/// Call `func` on first beacon node that returns success or on all beacon nodes
/// depending on the value of `disable_run_on_all`.
pub async fn run<'a, F, Err, R>(
/// depending on the `topic` and configuration.
pub async fn request<'a, F, Err, R>(
&'a self,
require_synced: RequireSynced,
offline_on_failure: OfflineOnFailure,
topic: ApiTopic,
func: F,
) -> Result<(), Errors<Err>>
where
@ -699,13 +702,47 @@ impl<T: SlotClock, E: EthSpec> BeaconNodeFallback<T, E> {
R: Future<Output = Result<(), Err>>,
Err: Debug,
{
if self.disable_run_on_all {
if self.broadcast_topics.contains(&topic) {
self.broadcast(require_synced, offline_on_failure, func)
.await
} else {
self.first_success(require_synced, offline_on_failure, func)
.await?;
Ok(())
} else {
self.run_on_all(require_synced, offline_on_failure, func)
.await
}
}
}
/// Serves as a cue for `BeaconNodeFallback` to tell which requests need to be broadcasted.
#[derive(Clone, Copy, Debug, PartialEq, Deserialize, Serialize, EnumString, EnumVariantNames)]
#[strum(serialize_all = "kebab-case")]
pub enum ApiTopic {
Attestations,
Blocks,
Subscriptions,
SyncCommittee,
}
impl ApiTopic {
pub fn all() -> Vec<ApiTopic> {
use ApiTopic::*;
vec![Attestations, Blocks, Subscriptions, SyncCommittee]
}
}
#[cfg(test)]
mod test {
use super::*;
use std::str::FromStr;
use strum::VariantNames;
#[test]
fn api_topic_all() {
let all = ApiTopic::all();
assert_eq!(all.len(), ApiTopic::VARIANTS.len());
assert!(ApiTopic::VARIANTS
.iter()
.map(|topic| ApiTopic::from_str(topic).unwrap())
.eq(all.into_iter()));
}
}

View File

@ -1,6 +1,6 @@
use crate::beacon_node_fallback::{Error as FallbackError, Errors};
use crate::{
beacon_node_fallback::{BeaconNodeFallback, RequireSynced},
beacon_node_fallback::{ApiTopic, BeaconNodeFallback, RequireSynced},
determine_graffiti,
graffiti_file::GraffitiFile,
OfflineOnFailure,
@ -147,35 +147,41 @@ pub struct ProposerFallback<T, E: EthSpec> {
impl<T: SlotClock, E: EthSpec> ProposerFallback<T, E> {
// Try `func` on `self.proposer_nodes` first. If that doesn't work, try `self.beacon_nodes`.
pub async fn first_success_try_proposers_first<'a, F, O, Err, R>(
pub async fn request_proposers_first<'a, F, Err, R>(
&'a self,
require_synced: RequireSynced,
offline_on_failure: OfflineOnFailure,
func: F,
) -> Result<O, Errors<Err>>
) -> Result<(), Errors<Err>>
where
F: Fn(&'a BeaconNodeHttpClient) -> R + Clone,
R: Future<Output = Result<O, Err>>,
R: Future<Output = Result<(), Err>>,
Err: Debug,
{
// If there are proposer nodes, try calling `func` on them and return early if they are successful.
if let Some(proposer_nodes) = &self.proposer_nodes {
if let Ok(result) = proposer_nodes
.first_success(require_synced, offline_on_failure, func.clone())
if proposer_nodes
.request(
require_synced,
offline_on_failure,
ApiTopic::Blocks,
func.clone(),
)
.await
.is_ok()
{
return Ok(result);
return Ok(());
}
}
// If the proposer nodes failed, try on the non-proposer nodes.
self.beacon_nodes
.first_success(require_synced, offline_on_failure, func)
.request(require_synced, offline_on_failure, ApiTopic::Blocks, func)
.await
}
// Try `func` on `self.beacon_nodes` first. If that doesn't work, try `self.proposer_nodes`.
pub async fn first_success_try_proposers_last<'a, F, O, Err, R>(
pub async fn request_proposers_last<'a, F, O, Err, R>(
&'a self,
require_synced: RequireSynced,
offline_on_failure: OfflineOnFailure,
@ -476,7 +482,7 @@ impl<T: SlotClock + 'static, E: EthSpec> BlockService<T, E> {
// Try the proposer nodes last, since it's likely that they don't have a
// great view of attestations on the network.
let block_contents = proposer_fallback
.first_success_try_proposers_last(
.request_proposers_last(
RequireSynced::No,
OfflineOnFailure::Yes,
move |beacon_node| {
@ -569,7 +575,7 @@ impl<T: SlotClock + 'static, E: EthSpec> BlockService<T, E> {
// protect them from DoS attacks and they're most likely to successfully
// publish a block.
proposer_fallback
.first_success_try_proposers_first(
.request_proposers_first(
RequireSynced::No,
OfflineOnFailure::Yes,
|beacon_node| async {

View File

@ -26,15 +26,28 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> {
)
.takes_value(true),
)
// TODO remove this flag in a future release
.arg(
Arg::with_name("disable-run-on-all")
.long("disable-run-on-all")
.value_name("DISABLE_RUN_ON_ALL")
.help("By default, Lighthouse publishes attestation, sync committee subscriptions \
.help("DEPRECATED. Use --broadcast. \
By default, Lighthouse publishes attestation, sync committee subscriptions \
and proposer preparation messages to all beacon nodes provided in the \
`--beacon-nodes flag`. This option changes that behaviour such that these \
api calls only go out to the first available and synced beacon node")
.takes_value(false)
.takes_value(false),
)
.arg(
Arg::with_name("broadcast")
.long("broadcast")
.value_name("API_TOPICS")
.help("Comma-separated list of beacon API topics to broadcast to all beacon nodes. \
Possible values are: none, attestations, blocks, subscriptions, \
sync-committee. Default (when flag is omitted) is to broadcast \
subscriptions only."
)
.takes_value(true),
)
.arg(
Arg::with_name("validators-dir")

View File

@ -1,3 +1,4 @@
use crate::beacon_node_fallback::ApiTopic;
use crate::graffiti_file::GraffitiFile;
use crate::{http_api, http_metrics};
use clap::ArgMatches;
@ -9,7 +10,7 @@ use directory::{
use eth2::types::Graffiti;
use sensitive_url::SensitiveUrl;
use serde::{Deserialize, Serialize};
use slog::{info, Logger};
use slog::{info, warn, Logger};
use std::fs;
use std::net::IpAddr;
use std::path::PathBuf;
@ -73,8 +74,8 @@ pub struct Config {
///
/// This is *not* recommended in prod and should only be used for testing.
pub block_delay: Option<Duration>,
/// Disables publishing http api requests to all beacon nodes for select api calls.
pub disable_run_on_all: bool,
/// Enables broadcasting of various requests (by topic) to all beacon nodes.
pub broadcast_topics: Vec<ApiTopic>,
/// Enables a service which attempts to measure latency between the VC and BNs.
pub enable_latency_measurement_service: bool,
/// Defines the number of validators per `validator/register_validator` request sent to the BN.
@ -117,7 +118,7 @@ impl Default for Config {
builder_proposals: false,
builder_registration_timestamp_override: None,
gas_limit: None,
disable_run_on_all: false,
broadcast_topics: vec![ApiTopic::Subscriptions],
enable_latency_measurement_service: true,
validator_registration_batch_size: 500,
}
@ -179,7 +180,6 @@ impl Config {
.map_err(|e| format!("Unable to parse proposer node URL: {:?}", e))?;
}
config.disable_run_on_all = cli_args.is_present("disable-run-on-all");
config.disable_auto_discover = cli_args.is_present("disable-auto-discover");
config.init_slashing_protection = cli_args.is_present("init-slashing-protection");
config.use_long_timeouts = cli_args.is_present("use-long-timeouts");
@ -222,6 +222,26 @@ impl Config {
config.beacon_nodes_tls_certs = Some(tls_certs.split(',').map(PathBuf::from).collect());
}
if cli_args.is_present("disable-run-on-all") {
warn!(
log,
"The --disable-run-on-all flag is deprecated";
"msg" => "please use --broadcast instead"
);
config.broadcast_topics = vec![];
}
if let Some(broadcast_topics) = cli_args.value_of("broadcast") {
config.broadcast_topics = broadcast_topics
.split(',')
.filter(|t| *t != "none")
.map(|t| {
t.trim()
.parse::<ApiTopic>()
.map_err(|_| format!("Unknown API topic to broadcast: {t}"))
})
.collect::<Result<_, _>>()?;
}
/*
* Http API server
*/

View File

@ -8,7 +8,7 @@
mod sync;
use crate::beacon_node_fallback::{BeaconNodeFallback, OfflineOnFailure, RequireSynced};
use crate::beacon_node_fallback::{ApiTopic, BeaconNodeFallback, OfflineOnFailure, RequireSynced};
use crate::http_metrics::metrics::{get_int_gauge, set_int_gauge, ATTESTATION_DUTY};
use crate::{
block_service::BlockServiceNotification,
@ -700,9 +700,10 @@ async fn poll_beacon_attesters<T: SlotClock + 'static, E: EthSpec>(
let subscriptions_ref = &subscriptions;
if let Err(e) = duties_service
.beacon_nodes
.run(
.request(
RequireSynced::No,
OfflineOnFailure::Yes,
ApiTopic::Subscriptions,
|beacon_node| async move {
let _timer = metrics::start_timer_vec(
&metrics::DUTIES_SERVICE_TIMES,

View File

@ -19,6 +19,7 @@ pub mod http_api;
pub mod initialized_validators;
pub mod validator_store;
pub use beacon_node_fallback::ApiTopic;
pub use cli::cli_app;
pub use config::Config;
use initialized_validators::InitializedValidators;
@ -369,14 +370,14 @@ impl<T: EthSpec> ProductionValidatorClient<T> {
let mut beacon_nodes: BeaconNodeFallback<_, T> = BeaconNodeFallback::new(
candidates,
config.disable_run_on_all,
config.broadcast_topics.clone(),
context.eth2_config.spec.clone(),
log.clone(),
);
let mut proposer_nodes: BeaconNodeFallback<_, T> = BeaconNodeFallback::new(
proposer_candidates,
config.disable_run_on_all,
config.broadcast_topics.clone(),
context.eth2_config.spec.clone(),
log.clone(),
);

View File

@ -1,4 +1,4 @@
use crate::beacon_node_fallback::{BeaconNodeFallback, RequireSynced};
use crate::beacon_node_fallback::{ApiTopic, BeaconNodeFallback, RequireSynced};
use crate::validator_store::{DoppelgangerStatus, Error as ValidatorStoreError, ValidatorStore};
use crate::OfflineOnFailure;
use bls::PublicKeyBytes;
@ -342,9 +342,10 @@ impl<T: SlotClock + 'static, E: EthSpec> PreparationService<T, E> {
let preparation_entries = preparation_data.as_slice();
match self
.beacon_nodes
.run(
.request(
RequireSynced::No,
OfflineOnFailure::Yes,
ApiTopic::Subscriptions,
|beacon_node| async move {
beacon_node
.post_validator_prepare_beacon_proposer(preparation_entries)

View File

@ -1,4 +1,4 @@
use crate::beacon_node_fallback::{BeaconNodeFallback, RequireSynced};
use crate::beacon_node_fallback::{ApiTopic, BeaconNodeFallback, RequireSynced};
use crate::{
duties_service::DutiesService,
validator_store::{Error as ValidatorStoreError, ValidatorStore},
@ -299,9 +299,10 @@ impl<T: SlotClock + 'static, E: EthSpec> SyncCommitteeService<T, E> {
.collect::<Vec<_>>();
self.beacon_nodes
.first_success(
.request(
RequireSynced::No,
OfflineOnFailure::Yes,
ApiTopic::SyncCommittee,
|beacon_node| async move {
beacon_node
.post_beacon_pool_sync_committee_signatures(committee_signatures)
@ -594,9 +595,10 @@ impl<T: SlotClock + 'static, E: EthSpec> SyncCommitteeService<T, E> {
if let Err(e) = self
.beacon_nodes
.run(
.request(
RequireSynced::No,
OfflineOnFailure::Yes,
ApiTopic::Subscriptions,
|beacon_node| async move {
beacon_node
.post_validator_sync_committee_subscriptions(subscriptions_slice)