From 6e15533b54861a7450c4b2015ecb4f52c626364a Mon Sep 17 00:00:00 2001
From: Paul Hauner
Date: Sun, 5 Mar 2023 23:43:29 +0000
Subject: [PATCH 1/9] Add latency measurement service to VC (#4024)

## Issue Addressed

NA

## Proposed Changes

Adds a service which periodically polls (11s into each mainnet slot) the `node/version` endpoint on each BN and roughly measures the round-trip latency. The latency is exposed as a `DEBG` log and a Prometheus metric.

The `--latency-measurement-service` flag has been added to the VC, with the following options:

- `--latency-measurement-service true`: enable the service (default).
- `--latency-measurement-service` (without a value): has the same effect.
- `--latency-measurement-service false`: disable the service.

## Additional Info

Whilst looking at our staking setup, I think the BN+VC latency is contributing to late blocks. Now that we have to wait for the builders to respond, it's nice to try and do everything we can to reduce that latency. Having visibility is the first step.
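To make the measurement idea concrete, here's a minimal sketch (not the code in this PR) of the approach: every request is timed against one shared `Instant` taken just before the futures are polled, and `ping` is a hypothetical stand-in for the cheap `node/version` request.

```rust
use std::time::{Duration, Instant};

// Hypothetical stand-in for a cheap HTTP GET such as `node/version`.
async fn ping(_url: &str) -> Result<(), ()> {
    Ok(())
}

// Measure round-trip time to several endpoints concurrently, timing all
// requests from the same starting instant to avoid bias from staggered sends.
async fn measure_all(urls: &[&str]) -> Vec<(String, Option<Duration>)> {
    let futures: Vec<_> = urls
        .iter()
        .map(|url| async move {
            // Record the completion instant only if the request succeeded.
            let response_instant = ping(url).await.ok().map(|_| Instant::now());
            (url.to_string(), response_instant)
        })
        .collect();

    // Futures are lazy: the requests are only polled after this instant is
    // taken, so it's a fair common baseline for all of them.
    let start = Instant::now();

    futures::future::join_all(futures)
        .await
        .into_iter()
        .map(|(url, end)| (url, end.and_then(|e| e.checked_duration_since(start))))
        .collect()
}
```

Using `checked_duration_since` means a clock oddity yields `None` rather than a panic, which is also how the service reports an unreachable BN.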
---
 lighthouse/tests/validator_client.rs         | 25 +++++++++
 validator_client/src/beacon_node_fallback.rs | 51 ++++++++++++++++-
 validator_client/src/cli.rs                  |  9 +++
 validator_client/src/config.rs               |  6 ++
 validator_client/src/http_metrics/metrics.rs |  8 +++
 validator_client/src/latency.rs              | 58 ++++++++++++++++++++
 validator_client/src/lib.rs                  |  9 +++
 7 files changed, 165 insertions(+), 1 deletion(-)
 create mode 100644 validator_client/src/latency.rs

diff --git a/lighthouse/tests/validator_client.rs b/lighthouse/tests/validator_client.rs
index f0ed4f737..45cd989a4 100644
--- a/lighthouse/tests/validator_client.rs
+++ b/lighthouse/tests/validator_client.rs
@@ -476,3 +476,28 @@ fn disable_run_on_all() {
         assert!(config.disable_run_on_all);
     });
 }
+
+#[test]
+fn latency_measurement_service() {
+    CommandLineTest::new().run().with_config(|config| {
+        assert!(config.enable_latency_measurement_service);
+    });
+    CommandLineTest::new()
+        .flag("latency-measurement-service", None)
+        .run()
+        .with_config(|config| {
+            assert!(config.enable_latency_measurement_service);
+        });
+    CommandLineTest::new()
+        .flag("latency-measurement-service", Some("true"))
+        .run()
+        .with_config(|config| {
+            assert!(config.enable_latency_measurement_service);
+        });
+    CommandLineTest::new()
+        .flag("latency-measurement-service", Some("false"))
+        .run()
+        .with_config(|config| {
+            assert!(!config.enable_latency_measurement_service);
+        });
+}
diff --git a/validator_client/src/beacon_node_fallback.rs b/validator_client/src/beacon_node_fallback.rs
index 06ddcbaf3..668e1bcf0 100644
--- a/validator_client/src/beacon_node_fallback.rs
+++ b/validator_client/src/beacon_node_fallback.rs
@@ -14,7 +14,7 @@ use std::fmt::Debug;
 use std::future::Future;
 use std::marker::PhantomData;
 use std::sync::Arc;
-use std::time::Duration;
+use std::time::{Duration, Instant};
 use tokio::{sync::RwLock, time::sleep};
 use types::{ChainSpec, Config, EthSpec};
@@ -27,6 +27,14 @@ use types::{ChainSpec, Config, EthSpec};
 /// having the correct nodes up and running prior to the start of the slot.
 const SLOT_LOOKAHEAD: Duration = Duration::from_secs(1);
 
+/// Indicates a measurement of latency between the VC and a BN.
+pub struct LatencyMeasurement {
+    /// An identifier for the beacon node (e.g. the URL).
+    pub beacon_node_id: String,
+    /// The round-trip latency, if the BN responded successfully.
+    pub latency: Option<Duration>,
+}
+
 /// Starts a service that will routinely try and update the status of the provided `beacon_nodes`.
 ///
 /// See `SLOT_LOOKAHEAD` for information about when this should run.
@@ -394,6 +402,47 @@ impl<T: SlotClock, E: EthSpec> BeaconNodeFallback<T, E> {
         let _ = future::join_all(futures).await;
     }
 
+    /// Concurrently send a request to all candidates (regardless of their
+    /// offline/online status) and attempt to collect a rough reading on the
+    /// latency between the VC and each candidate.
+    pub async fn measure_latency(&self) -> Vec<LatencyMeasurement> {
+        let futures: Vec<_> = self
+            .candidates
+            .iter()
+            .map(|candidate| async {
+                let beacon_node_id = candidate.beacon_node.to_string();
+                // The `node/version` endpoint is used since I imagine it would
+                // require the least processing in the BN and therefore measure
+                // the connection more so than the BN's processing speed.
+                //
+                // I imagine all clients have the version string available as a
+                // pre-computed string.
+                let response_instant = candidate
+                    .beacon_node
+                    .get_node_version()
+                    .await
+                    .ok()
+                    .map(|_| Instant::now());
+                (beacon_node_id, response_instant)
+            })
+            .collect();
+
+        let request_instant = Instant::now();
+
+        // Send the request to all BNs at the same time. This might involve some
+        // queueing on the sending host, however I hope it will avoid bias
+        // caused by sending requests at different times.
+        future::join_all(futures)
+            .await
+            .into_iter()
+            .map(|(beacon_node_id, response_instant)| LatencyMeasurement {
+                beacon_node_id,
+                latency: response_instant
+                    .and_then(|response| response.checked_duration_since(request_instant)),
+            })
+            .collect()
+    }
+
     /// Run `func` against each candidate in `self`, returning immediately if a result is found.
     /// Otherwise, return all the errors encountered along the way.
     ///
diff --git a/validator_client/src/cli.rs b/validator_client/src/cli.rs
index 9142a0c7e..fd96aa1f5 100644
--- a/validator_client/src/cli.rs
+++ b/validator_client/src/cli.rs
@@ -318,6 +318,15 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> {
                 set here moves too far from the previous block's gas limit. [default: 30,000,000]")
                 .requires("builder-proposals"),
         )
+        .arg(
+            Arg::with_name("latency-measurement-service")
+                .long("latency-measurement-service")
+                .value_name("BOOLEAN")
+                .help("Set to 'true' to enable a service that periodically attempts to measure latency to BNs. \
+                    Set to 'false' to disable.")
+                .default_value("true")
+                .takes_value(true),
+        )
         /*
          * Experimental/development options.
          */
diff --git a/validator_client/src/config.rs b/validator_client/src/config.rs
index 0f24e81d5..724d6c74f 100644
--- a/validator_client/src/config.rs
+++ b/validator_client/src/config.rs
@@ -73,6 +73,8 @@ pub struct Config {
     pub block_delay: Option<Duration>,
     /// Disables publishing http api requests to all beacon nodes for select api calls.
     pub disable_run_on_all: bool,
+    /// Enables a service which attempts to measure latency between the VC and BNs.
+    pub enable_latency_measurement_service: bool,
 }
 
 impl Default for Config {
@@ -111,6 +113,7 @@ impl Default for Config {
             builder_registration_timestamp_override: None,
             gas_limit: None,
             disable_run_on_all: false,
+            enable_latency_measurement_service: true,
         }
     }
 }
@@ -357,6 +360,9 @@ impl Config {
             );
         }
 
+        config.enable_latency_measurement_service =
+            parse_optional(cli_args, "latency-measurement-service")?.unwrap_or(true);
+
         /*
          * Experimental
          */
diff --git a/validator_client/src/http_metrics/metrics.rs b/validator_client/src/http_metrics/metrics.rs
index 0cb3417fc..9b60d0ede 100644
--- a/validator_client/src/http_metrics/metrics.rs
+++ b/validator_client/src/http_metrics/metrics.rs
@@ -178,6 +178,14 @@ lazy_static::lazy_static! {
         "Attestation duty slot for all managed validators",
         &["validator"]
     );
+    /*
+     * BN latency
+     */
+    pub static ref VC_BEACON_NODE_LATENCY: Result<HistogramVec> = try_create_histogram_vec(
+        "vc_beacon_node_latency",
+        "Round-trip latency for a simple API endpoint on each BN",
+        &["endpoint"]
+    );
 }
 
 pub fn gather_prometheus_metrics(
diff --git a/validator_client/src/latency.rs b/validator_client/src/latency.rs
new file mode 100644
index 000000000..9ab9b630b
--- /dev/null
+++ b/validator_client/src/latency.rs
@@ -0,0 +1,58 @@
+use crate::{http_metrics::metrics, BeaconNodeFallback};
+use environment::RuntimeContext;
+use slog::debug;
+use slot_clock::SlotClock;
+use std::sync::Arc;
+use tokio::time::sleep;
+use types::EthSpec;
+
+/// The latency service will run 11/12ths of the way through the slot.
+pub const SLOT_DELAY_MULTIPLIER: u32 = 11;
+pub const SLOT_DELAY_DENOMINATOR: u32 = 12;
+
+/// Starts a service that periodically checks the latency between the VC and the
+/// candidate BNs.
+pub fn start_latency_service<T: SlotClock + 'static, E: EthSpec>(
+    context: RuntimeContext<E>,
+    slot_clock: T,
+    beacon_nodes: Arc<BeaconNodeFallback<T, E>>,
+) {
+    let log = context.log().clone();
+
+    let future = async move {
+        loop {
+            let sleep_time = slot_clock
+                .duration_to_next_slot()
+                .map(|next_slot| {
+                    // This is 11/12ths through the next slot. On mainnet this
+                    // will happen in the 11th second of each slot, one second
+                    // before the next slot.
+                    next_slot
+                        + (slot_clock.slot_duration() / SLOT_DELAY_DENOMINATOR)
+                            * SLOT_DELAY_MULTIPLIER
+                })
+                // If we can't read the slot clock, just wait one slot. Running
+                // the measurement at a non-exact time is not a big issue.
+                .unwrap_or_else(|| slot_clock.slot_duration());
+
+            // Sleep until it's time to perform the measurement.
+            sleep(sleep_time).await;
+
+            for measurement in beacon_nodes.measure_latency().await {
+                if let Some(latency) = measurement.latency {
+                    debug!(
+                        log,
+                        "Measured BN latency";
+                        "node" => &measurement.beacon_node_id,
+                        "latency" => latency.as_millis(),
+                    );
+                    metrics::observe_timer_vec(
+                        &metrics::VC_BEACON_NODE_LATENCY,
+                        &[&measurement.beacon_node_id],
+                        latency,
+                    )
+                }
+            }
+        }
+    };
+
+    context.executor.spawn(future, "latency");
+}
diff --git a/validator_client/src/lib.rs b/validator_client/src/lib.rs
index f2d647490..82cacccc6 100644
--- a/validator_client/src/lib.rs
+++ b/validator_client/src/lib.rs
@@ -8,6 +8,7 @@ mod duties_service;
 mod graffiti_file;
 mod http_metrics;
 mod key_cache;
+mod latency;
 mod notifier;
 mod preparation_service;
 mod signing_method;
@@ -563,6 +564,14 @@ impl<T: EthSpec> ProductionValidatorClient<T> {
             None
         };
 
+        if self.config.enable_latency_measurement_service {
+            latency::start_latency_service(
+                self.context.clone(),
+                self.duties_service.slot_clock.clone(),
+                self.duties_service.beacon_nodes.clone(),
+            );
+        }
+
         Ok(())
     }
 }

From 01556f6f01655d77e755340566a64616e4a0d9a8 Mon Sep 17 00:00:00 2001
From: Michael Sproul
Date: Sun, 5 Mar 2023 23:43:30 +0000
Subject: [PATCH 2/9] Optimise payload attributes calculation and add SSE
 (#4027)

## Issue Addressed

Closes #3896
Closes #3998
Closes #3700

## Proposed Changes

- Optimise the calculation of withdrawals for payload attributes by avoiding state clones, avoiding unnecessary state advances and reading from the snapshot cache if possible.
- Use the execution layer's payload attributes cache to avoid re-calculating payload attributes. I actually implemented a new LRU cache just for withdrawals, but it had the exact same key and most of the same data as the existing payload attributes cache, so I deleted it.
- Add a new SSE event that fires when payload attributes are calculated. This is useful for block builders, a la https://github.com/ethereum/beacon-APIs/issues/244 (a consumer sketch follows below).
- Add a new CLI flag `--always-prepare-payload` which forces payload attributes to be sent with every fcU regardless of connected proposers. This is intended for use by builders/relays.

For maximum effect, the flags I've been using to run Lighthouse in "payload builder mode" are:

```
--always-prepare-payload \
--prepare-payload-lookahead 12000 \
--suggested-fee-recipient 0x0000000000000000000000000000000000000000
```

The fee recipient is required so Lighthouse has something to pack in the payload attributes (it can be ignored by the builder). The lookahead causes fcU to be sent at the start of every slot rather than at 8s. As usual, fcU will also be sent after each change of head block. I think this combination is sufficient for builders to build on all viable heads. Often there will be two fcU (and two payload attributes) sent for the same slot: one sent at the start of the slot with the head from slot `n - 1` as the parent, and one sent after the block arrives with `n` as the parent.

Example usage of the new event stream:

```bash
curl -N "http://localhost:5052/eth/v1/events?topics=payload_attributes"
```

## Additional Info

- [x] Tests added by updating the proposer re-org tests. This has the benefit of testing the proposer re-org code paths with withdrawals too, confirming that the new changes don't interact poorly.
- [ ] Benchmarking with `blockdreamer` on devnet-7 showed promising results, but I'm yet to do a comparison to `unstable`.

Co-authored-by: Michael Sproul
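Beyond `curl`, the stream can also be consumed from Rust. A rough sketch, assuming the `eth2` client crate's `get_events` helper and the types added in this PR (error handling compressed to strings for brevity):

```rust
use eth2::types::{EventKind, EventTopic};
use eth2::{BeaconNodeHttpClient, Timeouts};
use futures::StreamExt;
use sensitive_url::SensitiveUrl;
use std::time::Duration;
use types::MainnetEthSpec;

// Subscribe to the new `payload_attributes` topic and print each event.
async fn watch_payload_attributes() -> Result<(), String> {
    let url = SensitiveUrl::parse("http://localhost:5052").map_err(|e| format!("{e:?}"))?;
    let client = BeaconNodeHttpClient::new(url, Timeouts::set_all(Duration::from_secs(12)));

    let events = client
        .get_events::<MainnetEthSpec>(&[EventTopic::PayloadAttributes])
        .await
        .map_err(|e| format!("{e:?}"))?;
    futures::pin_mut!(events);

    while let Some(event) = events.next().await {
        if let Ok(EventKind::PayloadAttributes(attrs)) = event {
            println!("payload attributes for proposal slot {}", attrs.data.proposal_slot);
        }
    }
    Ok(())
}
```

A builder would typically key its work on `(proposal_slot, parent_block_hash)` from the event, since the same slot can legitimately appear twice with different parents, as described above.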
---
 beacon_node/beacon_chain/src/beacon_chain.rs  | 193 +++++++++++++-----
 beacon_node/beacon_chain/src/chain_config.rs  |   5 +
 beacon_node/beacon_chain/src/events.rs        |  80 ++++++--
 beacon_node/beacon_chain/src/test_utils.rs    | 120 ++++++++++-
 beacon_node/execution_layer/src/engine_api.rs |  28 +++
 beacon_node/http_api/src/lib.rs               |   3 +
 .../http_api/tests/interactive_tests.rs       |  94 +++++++--
 beacon_node/src/cli.rs                        |   9 +
 beacon_node/src/config.rs                     |   2 +
 common/eth2/src/types.rs                      |  80 ++++++++
 lighthouse/tests/beacon_node.rs               |  15 ++
 11 files changed, 539 insertions(+), 90 deletions(-)

diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs
index 6d9d290bd..9ba8f24cb 100644
--- a/beacon_node/beacon_chain/src/beacon_chain.rs
+++ b/beacon_node/beacon_chain/src/beacon_chain.rs
@@ -57,7 +57,7 @@ use crate::validator_monitor::{
 };
 use crate::validator_pubkey_cache::ValidatorPubkeyCache;
 use crate::{metrics, BeaconChainError, BeaconForkChoiceStore, BeaconSnapshot, CachedHead};
-use eth2::types::{EventKind, SseBlock, SyncDuty};
+use eth2::types::{EventKind, SseBlock, SseExtendedPayloadAttributes, SyncDuty};
 use execution_layer::{
     BlockProposalContents, BuilderParams, ChainHealth, ExecutionLayer, FailedCondition,
     PayloadAttributes, PayloadStatus,
@@ -89,6 +89,7 @@ use state_processing::{
     state_advance::{complete_state_advance, partial_state_advance},
     BlockSignatureStrategy, ConsensusContext, SigVerifiedOp, VerifyBlockRoot, VerifyOperation,
 };
+use std::borrow::Cow;
 use std::cmp::Ordering;
 use std::collections::HashMap;
 use std::collections::HashSet;
@@ -3878,6 +3879,75 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
         }))
     }
 
+    pub fn get_expected_withdrawals(
+        &self,
+        forkchoice_update_params: &ForkchoiceUpdateParameters,
+        proposal_slot: Slot,
+    ) -> Result<Withdrawals<T::EthSpec>, Error> {
+        let cached_head = self.canonical_head.cached_head();
+        let head_state = &cached_head.snapshot.beacon_state;
+
+        let parent_block_root = forkchoice_update_params.head_root;
+
+        let (unadvanced_state, unadvanced_state_root) =
+            if cached_head.head_block_root() == parent_block_root {
+                (Cow::Borrowed(head_state), cached_head.head_state_root())
+            } else if let Some(snapshot) = self
+                .snapshot_cache
+                .try_read_for(BLOCK_PROCESSING_CACHE_LOCK_TIMEOUT)
+                .ok_or(Error::SnapshotCacheLockTimeout)?
+                .get_cloned(parent_block_root, CloneConfig::none())
+            {
+                debug!(
+                    self.log,
+                    "Hit snapshot cache during withdrawals calculation";
+                    "slot" => proposal_slot,
+                    "parent_block_root" => ?parent_block_root,
+                );
+                let state_root = snapshot.beacon_state_root();
+                (Cow::Owned(snapshot.beacon_state), state_root)
+            } else {
+                info!(
+                    self.log,
+                    "Missed snapshot cache during withdrawals calculation";
+                    "slot" => proposal_slot,
+                    "parent_block_root" => ?parent_block_root
+                );
+                let block = self
+                    .get_blinded_block(&parent_block_root)?
+                    .ok_or(Error::MissingBeaconBlock(parent_block_root))?;
+                let state = self
+                    .get_state(&block.state_root(), Some(block.slot()))?
+                    .ok_or(Error::MissingBeaconState(block.state_root()))?;
+                (Cow::Owned(state), block.state_root())
+            };
+
+        // If the parent state's epoch is the same as the proposal's, we don't need to advance,
+        // because the list of expected withdrawals can only change after an epoch advance or a
+        // block application.
+        let proposal_epoch = proposal_slot.epoch(T::EthSpec::slots_per_epoch());
+        if head_state.current_epoch() == proposal_epoch {
+            return get_expected_withdrawals(&unadvanced_state, &self.spec)
+                .map_err(Error::PrepareProposerFailed);
+        }
+
+        // Advance the state using the partial method.
+        debug!(
+            self.log,
+            "Advancing state for withdrawals calculation";
+            "proposal_slot" => proposal_slot,
+            "parent_block_root" => ?parent_block_root,
+        );
+        let mut advanced_state = unadvanced_state.into_owned();
+        partial_state_advance(
+            &mut advanced_state,
+            Some(unadvanced_state_root),
+            proposal_epoch.start_slot(T::EthSpec::slots_per_epoch()),
+            &self.spec,
+        )?;
+        get_expected_withdrawals(&advanced_state, &self.spec).map_err(Error::PrepareProposerFailed)
+    }
+
     /// Determine whether a fork choice update to the execution layer should be overridden.
     ///
     /// This is *only* necessary when proposer re-orgs are enabled, because we have to prevent the
@@ -4664,7 +4734,9 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
 
         // Nothing to do if there are no proposers registered with the EL, exit early to avoid
         // wasting cycles.
-        if !execution_layer.has_any_proposer_preparation_data().await {
+        if !self.config.always_prepare_payload
+            && !execution_layer.has_any_proposer_preparation_data().await
+        {
             return Ok(());
         }
@@ -4721,64 +4793,60 @@
         // If the execution layer doesn't have any proposer data for this validator then we assume
         // it's not connected to this BN and no action is required.
         let proposer = pre_payload_attributes.proposer_index;
-        if !execution_layer
-            .has_proposer_preparation_data(proposer)
-            .await
+        if !self.config.always_prepare_payload
+            && !execution_layer
+                .has_proposer_preparation_data(proposer)
+                .await
         {
             return Ok(());
         }
 
-        let withdrawals = match self.spec.fork_name_at_slot::<T::EthSpec>(prepare_slot) {
-            ForkName::Base | ForkName::Altair | ForkName::Merge => None,
-            ForkName::Capella => {
-                // We must use the advanced state because balances can change at epoch boundaries
-                // and balances affect withdrawals.
-                // FIXME(mark)
-                // Might implement caching here in the future..
-                let prepare_state = self
-                    .state_at_slot(prepare_slot, StateSkipConfig::WithoutStateRoots)
-                    .map_err(|e| {
-                        error!(self.log, "State advance for withdrawals failed"; "error" => ?e);
-                        e
-                    })?;
-                Some(get_expected_withdrawals(&prepare_state, &self.spec))
-            }
-        }
-        .transpose()
-        .map_err(|e| {
-            error!(self.log, "Error preparing beacon proposer"; "error" => ?e);
-            e
-        })
-        .map(|withdrawals_opt| withdrawals_opt.map(|w| w.into()))
-        .map_err(Error::PrepareProposerFailed)?;
-
+        // Fetch payload attributes from the execution layer's cache, or compute them from scratch
+        // if no matching entry is found. This saves recomputing the withdrawals which can take
+        // considerable time to compute if a state load is required.
         let head_root = forkchoice_update_params.head_root;
-        let payload_attributes = PayloadAttributes::new(
-            self.slot_clock
-                .start_of(prepare_slot)
-                .ok_or(Error::InvalidSlot(prepare_slot))?
-                .as_secs(),
-            pre_payload_attributes.prev_randao,
-            execution_layer.get_suggested_fee_recipient(proposer).await,
-            withdrawals,
-        );
+        let payload_attributes = if let Some(payload_attributes) = execution_layer
+            .payload_attributes(prepare_slot, head_root)
+            .await
+        {
+            payload_attributes
+        } else {
+            let withdrawals = match self.spec.fork_name_at_slot::<T::EthSpec>(prepare_slot) {
+                ForkName::Base | ForkName::Altair | ForkName::Merge => None,
+                ForkName::Capella => {
+                    let chain = self.clone();
+                    self.spawn_blocking_handle(
+                        move || {
+                            chain.get_expected_withdrawals(&forkchoice_update_params, prepare_slot)
+                        },
+                        "prepare_beacon_proposer_withdrawals",
+                    )
+                    .await?
+                    .map(Some)?
+                }
+            };
 
-        debug!(
-            self.log,
-            "Preparing beacon proposer";
-            "payload_attributes" => ?payload_attributes,
-            "prepare_slot" => prepare_slot,
-            "validator" => proposer,
-            "parent_root" => ?head_root,
-        );
+            let payload_attributes = PayloadAttributes::new(
+                self.slot_clock
+                    .start_of(prepare_slot)
+                    .ok_or(Error::InvalidSlot(prepare_slot))?
+                    .as_secs(),
+                pre_payload_attributes.prev_randao,
+                execution_layer.get_suggested_fee_recipient(proposer).await,
+                withdrawals.map(Into::into),
+            );
 
-        let already_known = execution_layer
-            .insert_proposer(prepare_slot, head_root, proposer, payload_attributes)
-            .await;
+            execution_layer
+                .insert_proposer(
+                    prepare_slot,
+                    head_root,
+                    proposer,
+                    payload_attributes.clone(),
+                )
+                .await;
 
-        // Only push a log to the user if this is the first time we've seen this proposer for this
-        // slot.
-        if !already_known {
+            // Only push a log to the user if this is the first time we've seen this proposer for
+            // this slot.
             info!(
                 self.log,
                 "Prepared beacon proposer";
@@ -4786,6 +4854,23 @@
                 "validator" => proposer,
                 "parent_root" => ?head_root,
             );
+            payload_attributes
+        };
+
+        // Push a server-sent event (probably to a block builder or relay).
+        if let Some(event_handler) = &self.event_handler {
+            if event_handler.has_payload_attributes_subscribers() {
+                event_handler.register(EventKind::PayloadAttributes(ForkVersionedResponse {
+                    data: SseExtendedPayloadAttributes {
+                        proposal_slot: prepare_slot,
+                        proposer_index: proposer,
+                        parent_block_root: head_root,
+                        parent_block_hash: forkchoice_update_params.head_hash.unwrap_or_default(),
+                        payload_attributes: payload_attributes.into(),
+                    },
+                    version: Some(self.spec.fork_name_at_slot::<T::EthSpec>(prepare_slot)),
+                }));
+            }
+        }
 
         let till_prepare_slot =
@@ -4808,7 +4893,9 @@
         // If we are close enough to the proposal slot, send an fcU, which will have payload
         // attributes filled in by the execution layer cache we just primed.
-        if till_prepare_slot <= self.config.prepare_payload_lookahead {
+        if self.config.always_prepare_payload
+            || till_prepare_slot <= self.config.prepare_payload_lookahead
+        {
             debug!(
                 self.log,
                 "Sending forkchoiceUpdate for proposer prep";
diff --git a/beacon_node/beacon_chain/src/chain_config.rs b/beacon_node/beacon_chain/src/chain_config.rs
index 2051a6236..6e3538aed 100644
--- a/beacon_node/beacon_chain/src/chain_config.rs
+++ b/beacon_node/beacon_chain/src/chain_config.rs
@@ -67,6 +67,10 @@ pub struct ChainConfig {
     pub prepare_payload_lookahead: Duration,
     /// Use EL-free optimistic sync for the finalized part of the chain.
     pub optimistic_finalized_sync: bool,
+    /// Whether to send payload attributes every slot, regardless of connected proposers.
+    ///
+    /// This is useful for block builders and testing.
+    pub always_prepare_payload: bool,
 }
 
 impl Default for ChainConfig {
@@ -93,6 +97,7 @@ impl Default for ChainConfig {
             prepare_payload_lookahead: Duration::from_secs(4),
             // This value isn't actually read except in tests.
             optimistic_finalized_sync: true,
+            always_prepare_payload: false,
         }
     }
 }
diff --git a/beacon_node/beacon_chain/src/events.rs b/beacon_node/beacon_chain/src/events.rs
index 6f4415ef4..b3fa6627f 100644
--- a/beacon_node/beacon_chain/src/events.rs
+++ b/beacon_node/beacon_chain/src/events.rs
@@ -14,6 +14,7 @@ pub struct ServerSentEventHandler<T: EthSpec> {
     exit_tx: Sender<EventKind<T>>,
     chain_reorg_tx: Sender<EventKind<T>>,
     contribution_tx: Sender<EventKind<T>>,
+    payload_attributes_tx: Sender<EventKind<T>>,
     late_head: Sender<EventKind<T>>,
     block_reward_tx: Sender<EventKind<T>>,
     log: Logger,
@@ -32,6 +33,7 @@ impl<T: EthSpec> ServerSentEventHandler<T> {
         let (exit_tx, _) = broadcast::channel(capacity);
         let (chain_reorg_tx, _) = broadcast::channel(capacity);
         let (contribution_tx, _) = broadcast::channel(capacity);
+        let (payload_attributes_tx, _) = broadcast::channel(capacity);
         let (late_head, _) = broadcast::channel(capacity);
         let (block_reward_tx, _) = broadcast::channel(capacity);
 
@@ -43,6 +45,7 @@ impl<T: EthSpec> ServerSentEventHandler<T> {
             exit_tx,
             chain_reorg_tx,
             contribution_tx,
+            payload_attributes_tx,
             late_head,
             block_reward_tx,
             log,
@@ -50,28 +53,55 @@ impl<T: EthSpec> ServerSentEventHandler<T> {
     }
 
     pub fn register(&self, kind: EventKind<T>) {
-        let result = match kind {
-            EventKind::Attestation(attestation) => self
+        let log_count = |name, count| {
+            trace!(
+                self.log,
+                "Registering server-sent event";
+                "kind" => name,
+                "receiver_count" => count
+            );
+        };
+        let result = match &kind {
+            EventKind::Attestation(_) => self
                 .attestation_tx
-                .send(EventKind::Attestation(attestation))
-                .map(|count| trace!(self.log, "Registering server-sent attestation event"; "receiver_count" => count)),
-            EventKind::Block(block) => self.block_tx.send(EventKind::Block(block))
-                .map(|count| trace!(self.log, "Registering server-sent block event"; "receiver_count" => count)),
-            EventKind::FinalizedCheckpoint(checkpoint) => self.finalized_tx
-                .send(EventKind::FinalizedCheckpoint(checkpoint))
-                .map(|count| trace!(self.log, "Registering server-sent finalized checkpoint event"; "receiver_count" => count)),
-            EventKind::Head(head) => self.head_tx.send(EventKind::Head(head))
-                .map(|count| trace!(self.log, "Registering server-sent head event"; "receiver_count" => count)),
-            EventKind::VoluntaryExit(exit) => self.exit_tx.send(EventKind::VoluntaryExit(exit))
-                .map(|count| trace!(self.log, "Registering server-sent voluntary exit event"; "receiver_count" => count)),
-            EventKind::ChainReorg(reorg) => self.chain_reorg_tx.send(EventKind::ChainReorg(reorg))
-                .map(|count| trace!(self.log, "Registering server-sent chain reorg event"; "receiver_count" => count)),
-            EventKind::ContributionAndProof(contribution_and_proof) => self.contribution_tx.send(EventKind::ContributionAndProof(contribution_and_proof))
-                .map(|count| trace!(self.log, "Registering server-sent contribution and proof event"; "receiver_count" => count)),
-            EventKind::LateHead(late_head) => self.late_head.send(EventKind::LateHead(late_head))
-                .map(|count| trace!(self.log, "Registering server-sent late head event"; "receiver_count" => count)),
-            EventKind::BlockReward(block_reward) => self.block_reward_tx.send(EventKind::BlockReward(block_reward))
-                .map(|count| trace!(self.log, "Registering server-sent contribution and proof event"; "receiver_count" => count)),
+                .send(kind)
+                .map(|count| log_count("attestation", count)),
+            EventKind::Block(_) => self
+                .block_tx
+                .send(kind)
+                .map(|count| log_count("block", count)),
+            EventKind::FinalizedCheckpoint(_) => self
+                .finalized_tx
+                .send(kind)
+                .map(|count| log_count("finalized checkpoint", count)),
+            EventKind::Head(_) => self
+                .head_tx
+                .send(kind)
+                .map(|count| log_count("head", count)),
+            EventKind::VoluntaryExit(_) => self
+                .exit_tx
+                .send(kind)
+                .map(|count| log_count("exit", count)),
+            EventKind::ChainReorg(_) => self
+                .chain_reorg_tx
+                .send(kind)
+                .map(|count| log_count("chain reorg", count)),
+            EventKind::ContributionAndProof(_) => self
+                .contribution_tx
+                .send(kind)
+                .map(|count| log_count("contribution and proof", count)),
+            EventKind::PayloadAttributes(_) => self
+                .payload_attributes_tx
+                .send(kind)
+                .map(|count| log_count("payload attributes", count)),
+            EventKind::LateHead(_) => self
+                .late_head
+                .send(kind)
+                .map(|count| log_count("late head", count)),
+            EventKind::BlockReward(_) => self
+                .block_reward_tx
+                .send(kind)
+                .map(|count| log_count("block reward", count)),
         };
         if let Err(SendError(event)) = result {
             trace!(self.log, "No receivers registered to listen for event"; "event" => ?event);
@@ -106,6 +136,10 @@
         self.contribution_tx.subscribe()
     }
 
+    pub fn subscribe_payload_attributes(&self) -> Receiver<EventKind<T>> {
+        self.payload_attributes_tx.subscribe()
+    }
+
     pub fn subscribe_late_head(&self) -> Receiver<EventKind<T>> {
         self.late_head.subscribe()
     }
@@ -142,6 +176,10 @@
         self.contribution_tx.receiver_count() > 0
     }
 
+    pub fn has_payload_attributes_subscribers(&self) -> bool {
+        self.payload_attributes_tx.receiver_count() > 0
+    }
+
     pub fn has_late_head_subscribers(&self) -> bool {
         self.late_head.receiver_count() > 0
     }
diff --git a/beacon_node/beacon_chain/src/test_utils.rs b/beacon_node/beacon_chain/src/test_utils.rs
index d060bcb77..afb31bba7 100644
--- a/beacon_node/beacon_chain/src/test_utils.rs
+++ b/beacon_node/beacon_chain/src/test_utils.rs
@@ -108,6 +108,14 @@ pub enum AttestationStrategy {
     SomeValidators(Vec<usize>),
 }
 
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub enum SyncCommitteeStrategy {
+    /// All sync committee validators sign.
+    AllValidators,
+    /// No validators sign.
+    NoValidators,
+}
+
 /// Indicates whether the `BeaconChainHarness` should use the `state.current_sync_committee` or
 /// `state.next_sync_committee` when creating sync messages or contributions.
#[derive(Clone, Debug)] @@ -1752,15 +1760,64 @@ where self.process_attestations(attestations); } + pub fn sync_committee_sign_block( + &self, + state: &BeaconState, + block_hash: Hash256, + slot: Slot, + relative_sync_committee: RelativeSyncCommittee, + ) { + let sync_contributions = + self.make_sync_contributions(state, block_hash, slot, relative_sync_committee); + self.process_sync_contributions(sync_contributions).unwrap() + } + pub async fn add_attested_block_at_slot( &self, slot: Slot, state: BeaconState, state_root: Hash256, validators: &[usize], + ) -> Result<(SignedBeaconBlockHash, BeaconState), BlockError> { + self.add_attested_block_at_slot_with_sync( + slot, + state, + state_root, + validators, + SyncCommitteeStrategy::NoValidators, + ) + .await + } + + pub async fn add_attested_block_at_slot_with_sync( + &self, + slot: Slot, + state: BeaconState, + state_root: Hash256, + validators: &[usize], + sync_committee_strategy: SyncCommitteeStrategy, ) -> Result<(SignedBeaconBlockHash, BeaconState), BlockError> { let (block_hash, block, state) = self.add_block_at_slot(slot, state).await?; self.attest_block(&state, state_root, block_hash, &block, validators); + + if sync_committee_strategy == SyncCommitteeStrategy::AllValidators + && state.current_sync_committee().is_ok() + { + self.sync_committee_sign_block( + &state, + block_hash.into(), + slot, + if (slot + 1).epoch(E::slots_per_epoch()) + % self.spec.epochs_per_sync_committee_period + == 0 + { + RelativeSyncCommittee::Next + } else { + RelativeSyncCommittee::Current + }, + ); + } + Ok((block_hash, state)) } @@ -1770,10 +1827,35 @@ where state_root: Hash256, slots: &[Slot], validators: &[usize], + ) -> AddBlocksResult { + self.add_attested_blocks_at_slots_with_sync( + state, + state_root, + slots, + validators, + SyncCommitteeStrategy::NoValidators, + ) + .await + } + + pub async fn add_attested_blocks_at_slots_with_sync( + &self, + state: BeaconState, + state_root: Hash256, + slots: &[Slot], + validators: &[usize], + sync_committee_strategy: SyncCommitteeStrategy, ) -> AddBlocksResult { assert!(!slots.is_empty()); - self.add_attested_blocks_at_slots_given_lbh(state, state_root, slots, validators, None) - .await + self.add_attested_blocks_at_slots_given_lbh( + state, + state_root, + slots, + validators, + None, + sync_committee_strategy, + ) + .await } async fn add_attested_blocks_at_slots_given_lbh( @@ -1783,6 +1865,7 @@ where slots: &[Slot], validators: &[usize], mut latest_block_hash: Option, + sync_committee_strategy: SyncCommitteeStrategy, ) -> AddBlocksResult { assert!( slots.windows(2).all(|w| w[0] <= w[1]), @@ -1792,7 +1875,13 @@ where let mut state_hash_from_slot: HashMap = HashMap::new(); for slot in slots { let (block_hash, new_state) = self - .add_attested_block_at_slot(*slot, state, state_root, validators) + .add_attested_block_at_slot_with_sync( + *slot, + state, + state_root, + validators, + sync_committee_strategy, + ) .await .unwrap(); state = new_state; @@ -1874,6 +1963,7 @@ where &epoch_slots, &validators, Some(head_block), + SyncCommitteeStrategy::NoValidators, // for backwards compat ) .await; @@ -1990,6 +2080,22 @@ where num_blocks: usize, block_strategy: BlockStrategy, attestation_strategy: AttestationStrategy, + ) -> Hash256 { + self.extend_chain_with_sync( + num_blocks, + block_strategy, + attestation_strategy, + SyncCommitteeStrategy::NoValidators, + ) + .await + } + + pub async fn extend_chain_with_sync( + &self, + num_blocks: usize, + block_strategy: BlockStrategy, + attestation_strategy: 
AttestationStrategy, + sync_committee_strategy: SyncCommitteeStrategy, ) -> Hash256 { let (mut state, slots) = match block_strategy { BlockStrategy::OnCanonicalHead => { @@ -2021,7 +2127,13 @@ where }; let state_root = state.update_tree_hash_cache().unwrap(); let (_, _, last_produced_block_hash, _) = self - .add_attested_blocks_at_slots(state, state_root, &slots, &validators) + .add_attested_blocks_at_slots_with_sync( + state, + state_root, + &slots, + &validators, + sync_committee_strategy, + ) .await; last_produced_block_hash.into() } diff --git a/beacon_node/execution_layer/src/engine_api.rs b/beacon_node/execution_layer/src/engine_api.rs index ec581ea65..38311b823 100644 --- a/beacon_node/execution_layer/src/engine_api.rs +++ b/beacon_node/execution_layer/src/engine_api.rs @@ -4,6 +4,7 @@ use crate::http::{ ENGINE_FORKCHOICE_UPDATED_V2, ENGINE_GET_PAYLOAD_V1, ENGINE_GET_PAYLOAD_V2, ENGINE_NEW_PAYLOAD_V1, ENGINE_NEW_PAYLOAD_V2, }; +use eth2::types::{SsePayloadAttributes, SsePayloadAttributesV1, SsePayloadAttributesV2}; pub use ethers_core::types::Transaction; use ethers_core::utils::rlp::{self, Decodable, Rlp}; use http::deposit_methods::RpcError; @@ -269,6 +270,33 @@ impl PayloadAttributes { } } +impl From for SsePayloadAttributes { + fn from(pa: PayloadAttributes) -> Self { + match pa { + PayloadAttributes::V1(PayloadAttributesV1 { + timestamp, + prev_randao, + suggested_fee_recipient, + }) => Self::V1(SsePayloadAttributesV1 { + timestamp, + prev_randao, + suggested_fee_recipient, + }), + PayloadAttributes::V2(PayloadAttributesV2 { + timestamp, + prev_randao, + suggested_fee_recipient, + withdrawals, + }) => Self::V2(SsePayloadAttributesV2 { + timestamp, + prev_randao, + suggested_fee_recipient, + withdrawals, + }), + } + } +} + #[derive(Clone, Debug, PartialEq)] pub struct ForkchoiceUpdatedResponse { pub payload_status: PayloadStatusV1, diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index bd00bfad1..6cca673b8 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -3520,6 +3520,9 @@ pub fn serve( api_types::EventTopic::ContributionAndProof => { event_handler.subscribe_contributions() } + api_types::EventTopic::PayloadAttributes => { + event_handler.subscribe_payload_attributes() + } api_types::EventTopic::LateHead => { event_handler.subscribe_late_head() } diff --git a/beacon_node/http_api/tests/interactive_tests.rs b/beacon_node/http_api/tests/interactive_tests.rs index 7096fac42..7db1b22d6 100644 --- a/beacon_node/http_api/tests/interactive_tests.rs +++ b/beacon_node/http_api/tests/interactive_tests.rs @@ -2,13 +2,15 @@ use crate::common::*; use beacon_chain::{ chain_config::ReOrgThreshold, - test_utils::{AttestationStrategy, BlockStrategy}, + test_utils::{AttestationStrategy, BlockStrategy, SyncCommitteeStrategy}, }; use eth2::types::DepositContractData; use execution_layer::{ForkchoiceState, PayloadAttributes}; use parking_lot::Mutex; use slot_clock::SlotClock; -use state_processing::state_advance::complete_state_advance; +use state_processing::{ + per_block_processing::get_expected_withdrawals, state_advance::complete_state_advance, +}; use std::collections::HashMap; use std::sync::Arc; use std::time::Duration; @@ -106,13 +108,15 @@ pub struct ReOrgTest { percent_head_votes: usize, should_re_org: bool, misprediction: bool, + /// Whether to expect withdrawals to change on epoch boundaries. + expect_withdrawals_change_on_epoch: bool, } impl Default for ReOrgTest { /// Default config represents a regular easy re-org. 
fn default() -> Self { Self { - head_slot: Slot::new(30), + head_slot: Slot::new(E::slots_per_epoch() - 2), parent_distance: 1, head_distance: 1, re_org_threshold: 20, @@ -122,6 +126,7 @@ impl Default for ReOrgTest { percent_head_votes: 0, should_re_org: true, misprediction: false, + expect_withdrawals_change_on_epoch: false, } } } @@ -136,13 +141,40 @@ pub async fn proposer_boost_re_org_zero_weight() { #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub async fn proposer_boost_re_org_epoch_boundary() { proposer_boost_re_org_test(ReOrgTest { - head_slot: Slot::new(31), + head_slot: Slot::new(E::slots_per_epoch() - 1), should_re_org: false, ..Default::default() }) .await; } +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +pub async fn proposer_boost_re_org_epoch_boundary_skip1() { + // Proposing a block on a boundary after a skip will change the set of expected withdrawals + // sent in the payload attributes. + proposer_boost_re_org_test(ReOrgTest { + head_slot: Slot::new(2 * E::slots_per_epoch() - 2), + head_distance: 2, + should_re_org: false, + expect_withdrawals_change_on_epoch: true, + ..Default::default() + }) + .await; +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +pub async fn proposer_boost_re_org_epoch_boundary_skip32() { + // Propose a block at 64 after a whole epoch of skipped slots. + proposer_boost_re_org_test(ReOrgTest { + head_slot: Slot::new(E::slots_per_epoch() - 1), + head_distance: E::slots_per_epoch() + 1, + should_re_org: false, + expect_withdrawals_change_on_epoch: true, + ..Default::default() + }) + .await; +} + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub async fn proposer_boost_re_org_slot_after_epoch_boundary() { proposer_boost_re_org_test(ReOrgTest { @@ -187,7 +219,7 @@ pub async fn proposer_boost_re_org_finality() { #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub async fn proposer_boost_re_org_parent_distance() { proposer_boost_re_org_test(ReOrgTest { - head_slot: Slot::new(30), + head_slot: Slot::new(E::slots_per_epoch() - 2), parent_distance: 2, should_re_org: false, ..Default::default() @@ -198,7 +230,7 @@ pub async fn proposer_boost_re_org_parent_distance() { #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub async fn proposer_boost_re_org_head_distance() { proposer_boost_re_org_test(ReOrgTest { - head_slot: Slot::new(29), + head_slot: Slot::new(E::slots_per_epoch() - 3), head_distance: 2, should_re_org: false, ..Default::default() @@ -209,7 +241,7 @@ pub async fn proposer_boost_re_org_head_distance() { #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub async fn proposer_boost_re_org_very_unhealthy() { proposer_boost_re_org_test(ReOrgTest { - head_slot: Slot::new(31), + head_slot: Slot::new(E::slots_per_epoch() - 1), parent_distance: 2, head_distance: 2, percent_parent_votes: 10, @@ -225,7 +257,6 @@ pub async fn proposer_boost_re_org_very_unhealthy() { #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub async fn proposer_boost_re_org_weight_misprediction() { proposer_boost_re_org_test(ReOrgTest { - head_slot: Slot::new(30), percent_empty_votes: 70, percent_head_votes: 30, should_re_org: false, @@ -254,12 +285,13 @@ pub async fn proposer_boost_re_org_test( percent_head_votes, should_re_org, misprediction, + expect_withdrawals_change_on_epoch, }: ReOrgTest, ) { assert!(head_slot > 0); - // We require a network with execution enabled so we can check EL message timings. 
- let mut spec = ForkName::Merge.make_genesis_spec(E::default_spec()); + // Test using Capella so that we simulate conditions as similar to mainnet as possible. + let mut spec = ForkName::Capella.make_genesis_spec(E::default_spec()); spec.terminal_total_difficulty = 1.into(); // Ensure there are enough validators to have `attesters_per_slot`. @@ -323,13 +355,15 @@ pub async fn proposer_boost_re_org_test( ) .await; - // Create some chain depth. + // Create some chain depth. Sign sync committee signatures so validator balances don't dip + // below 32 ETH and become ineligible for withdrawals. harness.advance_slot(); harness - .extend_chain( + .extend_chain_with_sync( num_initial as usize, BlockStrategy::OnCanonicalHead, AttestationStrategy::AllValidators, + SyncCommitteeStrategy::AllValidators, ) .await; @@ -364,6 +398,16 @@ pub async fn proposer_boost_re_org_test( let slot_b = slot_a + parent_distance; let slot_c = slot_b + head_distance; + // We need to transition to at least epoch 2 in order to trigger + // `process_rewards_and_penalties`. This allows us to test withdrawals changes at epoch + // boundaries. + if expect_withdrawals_change_on_epoch { + assert!( + slot_c.epoch(E::slots_per_epoch()) >= 2, + "for withdrawals to change, test must end at an epoch >= 2" + ); + } + harness.advance_slot(); let (block_a_root, block_a, state_a) = harness .add_block_at_slot(slot_a, harness.get_current_state()) @@ -457,6 +501,10 @@ pub async fn proposer_boost_re_org_test( // Produce block C. // Advance state_b so we can get the proposer. + assert_eq!(state_b.slot(), slot_b); + let pre_advance_withdrawals = get_expected_withdrawals(&state_b, &harness.chain.spec) + .unwrap() + .to_vec(); complete_state_advance(&mut state_b, None, slot_c, &harness.chain.spec).unwrap(); let proposer_index = state_b @@ -514,6 +562,28 @@ pub async fn proposer_boost_re_org_test( .unwrap(); let payload_attribs = first_update.payload_attributes.as_ref().unwrap(); + // Check that withdrawals from the payload attributes match those computed from the parent's + // advanced state. + let expected_withdrawals = if should_re_org { + let mut state_a_advanced = state_a.clone(); + complete_state_advance(&mut state_a_advanced, None, slot_c, &harness.chain.spec).unwrap(); + get_expected_withdrawals(&state_a_advanced, &harness.chain.spec) + } else { + get_expected_withdrawals(&state_b, &harness.chain.spec) + } + .unwrap() + .to_vec(); + let payload_attribs_withdrawals = payload_attribs.withdrawals().unwrap(); + assert_eq!(expected_withdrawals, *payload_attribs_withdrawals); + assert!(!expected_withdrawals.is_empty()); + + if should_re_org + || expect_withdrawals_change_on_epoch + && slot_c.epoch(E::slots_per_epoch()) != slot_b.epoch(E::slots_per_epoch()) + { + assert_ne!(expected_withdrawals, pre_advance_withdrawals); + } + let lookahead = slot_clock .start_of(slot_c) .unwrap() diff --git a/beacon_node/src/cli.rs b/beacon_node/src/cli.rs index b4da83315..bc2e705cb 100644 --- a/beacon_node/src/cli.rs +++ b/beacon_node/src/cli.rs @@ -819,6 +819,15 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> { for ensuring the EL is given ample notice. Default: 1/3 of a slot.") .takes_value(true) ) + .arg( + Arg::with_name("always-prepare-payload") + .long("always-prepare-payload") + .help("Send payload attributes with every fork choice update. This is intended for \ + use by block builders, relays and developers. 
You should set a fee \ + recipient on this BN and also consider adjusting the \ + --prepare-payload-lookahead flag.") + .takes_value(false) + ) .arg( Arg::with_name("fork-choice-before-proposal-timeout") .long("fork-choice-before-proposal-timeout") diff --git a/beacon_node/src/config.rs b/beacon_node/src/config.rs index 726f8368e..fa0344e95 100644 --- a/beacon_node/src/config.rs +++ b/beacon_node/src/config.rs @@ -711,6 +711,8 @@ pub fn get_config( / DEFAULT_PREPARE_PAYLOAD_LOOKAHEAD_FACTOR }); + client_config.chain.always_prepare_payload = cli_args.is_present("always-prepare-payload"); + if let Some(timeout) = clap_utils::parse_optional(cli_args, "fork-choice-before-proposal-timeout")? { diff --git a/common/eth2/src/types.rs b/common/eth2/src/types.rs index 620276f3d..b4218c361 100644 --- a/common/eth2/src/types.rs +++ b/common/eth2/src/types.rs @@ -897,6 +897,76 @@ pub struct SseLateHead { pub execution_optimistic: bool, } +#[superstruct( + variants(V1, V2), + variant_attributes(derive(Clone, Debug, Eq, Hash, PartialEq, Serialize, Deserialize)) +)] +#[derive(Clone, Debug, Eq, Hash, PartialEq, Deserialize, Serialize)] +#[serde(untagged)] +pub struct SsePayloadAttributes { + #[superstruct(getter(copy))] + #[serde(with = "eth2_serde_utils::quoted_u64")] + pub timestamp: u64, + #[superstruct(getter(copy))] + pub prev_randao: Hash256, + #[superstruct(getter(copy))] + pub suggested_fee_recipient: Address, + #[superstruct(only(V2))] + pub withdrawals: Vec, +} + +#[derive(PartialEq, Debug, Deserialize, Serialize, Clone)] +pub struct SseExtendedPayloadAttributesGeneric { + pub proposal_slot: Slot, + #[serde(with = "eth2_serde_utils::quoted_u64")] + pub proposer_index: u64, + pub parent_block_root: Hash256, + pub parent_block_hash: ExecutionBlockHash, + pub payload_attributes: T, +} + +pub type SseExtendedPayloadAttributes = SseExtendedPayloadAttributesGeneric; +pub type VersionedSsePayloadAttributes = ForkVersionedResponse; + +impl ForkVersionDeserialize for SsePayloadAttributes { + fn deserialize_by_fork<'de, D: serde::Deserializer<'de>>( + value: serde_json::value::Value, + fork_name: ForkName, + ) -> Result { + match fork_name { + ForkName::Merge => serde_json::from_value(value) + .map(Self::V1) + .map_err(serde::de::Error::custom), + ForkName::Capella => serde_json::from_value(value) + .map(Self::V2) + .map_err(serde::de::Error::custom), + ForkName::Base | ForkName::Altair => Err(serde::de::Error::custom(format!( + "SsePayloadAttributes deserialization for {fork_name} not implemented" + ))), + } + } +} + +impl ForkVersionDeserialize for SseExtendedPayloadAttributes { + fn deserialize_by_fork<'de, D: serde::Deserializer<'de>>( + value: serde_json::value::Value, + fork_name: ForkName, + ) -> Result { + let helper: SseExtendedPayloadAttributesGeneric = + serde_json::from_value(value).map_err(serde::de::Error::custom)?; + Ok(Self { + proposal_slot: helper.proposal_slot, + proposer_index: helper.proposer_index, + parent_block_root: helper.parent_block_root, + parent_block_hash: helper.parent_block_hash, + payload_attributes: SsePayloadAttributes::deserialize_by_fork::( + helper.payload_attributes, + fork_name, + )?, + }) + } +} + #[derive(PartialEq, Debug, Serialize, Clone)] #[serde(bound = "T: EthSpec", untagged)] pub enum EventKind { @@ -910,6 +980,7 @@ pub enum EventKind { LateHead(SseLateHead), #[cfg(feature = "lighthouse")] BlockReward(BlockReward), + PayloadAttributes(VersionedSsePayloadAttributes), } impl EventKind { @@ -922,6 +993,7 @@ impl EventKind { EventKind::FinalizedCheckpoint(_) => 
"finalized_checkpoint", EventKind::ChainReorg(_) => "chain_reorg", EventKind::ContributionAndProof(_) => "contribution_and_proof", + EventKind::PayloadAttributes(_) => "payload_attributes", EventKind::LateHead(_) => "late_head", #[cfg(feature = "lighthouse")] EventKind::BlockReward(_) => "block_reward", @@ -977,6 +1049,11 @@ impl EventKind { ServerError::InvalidServerSentEvent(format!("Contribution and Proof: {:?}", e)) })?, ))), + "payload_attributes" => Ok(EventKind::PayloadAttributes( + serde_json::from_str(data).map_err(|e| { + ServerError::InvalidServerSentEvent(format!("Payload Attributes: {:?}", e)) + })?, + )), #[cfg(feature = "lighthouse")] "block_reward" => Ok(EventKind::BlockReward(serde_json::from_str(data).map_err( |e| ServerError::InvalidServerSentEvent(format!("Block Reward: {:?}", e)), @@ -1006,6 +1083,7 @@ pub enum EventTopic { ChainReorg, ContributionAndProof, LateHead, + PayloadAttributes, #[cfg(feature = "lighthouse")] BlockReward, } @@ -1022,6 +1100,7 @@ impl FromStr for EventTopic { "finalized_checkpoint" => Ok(EventTopic::FinalizedCheckpoint), "chain_reorg" => Ok(EventTopic::ChainReorg), "contribution_and_proof" => Ok(EventTopic::ContributionAndProof), + "payload_attributes" => Ok(EventTopic::PayloadAttributes), "late_head" => Ok(EventTopic::LateHead), #[cfg(feature = "lighthouse")] "block_reward" => Ok(EventTopic::BlockReward), @@ -1040,6 +1119,7 @@ impl fmt::Display for EventTopic { EventTopic::FinalizedCheckpoint => write!(f, "finalized_checkpoint"), EventTopic::ChainReorg => write!(f, "chain_reorg"), EventTopic::ContributionAndProof => write!(f, "contribution_and_proof"), + EventTopic::PayloadAttributes => write!(f, "payload_attributes"), EventTopic::LateHead => write!(f, "late_head"), #[cfg(feature = "lighthouse")] EventTopic::BlockReward => write!(f, "block_reward"), diff --git a/lighthouse/tests/beacon_node.rs b/lighthouse/tests/beacon_node.rs index a07502c58..bdaec9948 100644 --- a/lighthouse/tests/beacon_node.rs +++ b/lighthouse/tests/beacon_node.rs @@ -182,6 +182,21 @@ fn prepare_payload_lookahead_shorter() { }); } +#[test] +fn always_prepare_payload_default() { + CommandLineTest::new() + .run_with_zero_port() + .with_config(|config| assert!(!config.chain.always_prepare_payload)); +} + +#[test] +fn always_prepare_payload_override() { + CommandLineTest::new() + .flag("always-prepare-payload", None) + .run_with_zero_port() + .with_config(|config| assert!(config.chain.always_prepare_payload)); +} + #[test] fn paranoid_block_proposal_default() { CommandLineTest::new() From 57dfcfd83ab9f4695eecc7e65164c6f4a97d45fd Mon Sep 17 00:00:00 2001 From: Michael Sproul Date: Sun, 5 Mar 2023 23:43:31 +0000 Subject: [PATCH 3/9] Optimise attestation selection proof signing (#4033) ## Issue Addressed Closes #3963 (hopefully) ## Proposed Changes Compute attestation selection proofs gradually each slot rather than in a single `join_all` at the start of each epoch. On a machine with 5k validators this replaces 5k tasks signing 5k proofs with 1 task that signs 5k/32 ~= 160 proofs each slot. Based on testing with Goerli validators this seems to reduce the average time to produce a signature by preventing Tokio and the OS from falling over each other trying to run hundreds of threads. My testing so far has been with local keystores, which run on a dynamic pool of up to 512 OS threads because they use [`spawn_blocking`](https://docs.rs/tokio/1.11.0/tokio/task/fn.spawn_blocking.html) (and we haven't changed the default). 
An earlier version of this PR hyper-optimised the time-per-signature metric to the detriment of the entire system's performance (see the reverted commits). The current PR is conservative in that it avoids touching the attestation service at all. I think there's more optimising to do here, but we can come back for that in a future PR rather than expanding the scope of this one. The new algorithm for attestation selection proofs is: - We sign a small batch of selection proofs each slot, for slots up to 8 slots in the future. On average we'll sign one slot's worth of proofs per slot, with an 8 slot lookahead. - The batch is signed halfway through the slot when there is unlikely to be contention for signature production (blocks are <4s, attestations are ~4-6 seconds, aggregates are 8s+). ## Performance Data _See first comment for updated graphs_. Graph of median signing times before this PR: ![signing_times_median](https://user-images.githubusercontent.com/4452260/221495627-3ab3c105-319f-406e-b99d-b5913e0ded9c.png) Graph of update attesters metric (includes selection proof signing) before this PR: ![update_attesters_store](https://user-images.githubusercontent.com/4452260/221497057-01ba40e4-8148-45f6-9e21-36a9567a631a.png) Median signing time after this PR (prototype from 12:00, updated version from 13:30): ![signing_times_median_updated](https://user-images.githubusercontent.com/4452260/221771578-47a040cc-b832-482f-9a1a-d1bd9854e00e.png) 99th percentile on signing times (bounded attestation signing from 16:55, now removed): ![signing_times_99pc](https://user-images.githubusercontent.com/4452260/221772055-e64081a8-2220-45ba-ba6d-9d7e344a5bde.png) Attester map update timing after this PR: ![update_attesters_store_updated](https://user-images.githubusercontent.com/4452260/221771757-c8558a48-7f4e-4bb5-9929-dee177a66c1e.png) Selection proof signings per second change: ![signing_attempts](https://user-images.githubusercontent.com/4452260/221771855-64f5da22-1655-478d-926b-810be8a3650c.png) ## Link to late blocks I believe this is related to the slow block signings because logs from Stakely in #3963 show these two logs almost 5 seconds apart: > Feb 23 18:56:23.978 INFO Received unsigned block, slot: 5862880, service: block, module: validator_client::block_service:393 > Feb 23 18:56:28.552 INFO Publishing signed block, slot: 5862880, service: block, module: validator_client::block_service:416 The only thing that happens between those two logs is the signing of the block: https://github.com/sigp/lighthouse/blob/0fb58a680d6f0c9f0dc8beecf142186debff9a8d/validator_client/src/block_service.rs#L410-L414 Helpfully, Stakely noticed this issue without any Lighthouse BNs in the mix, which pointed to a clear issue in the VC. ## TODO - [x] Further testing on testnet infrastructure. - [x] Make the attestation signing parallelism configurable. 
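To illustrate the batching schedule in isolation, here's a small sketch (simplified types; not the PR's code) of the per-slot `BTreeMap::split_off` pattern used to drain only the duties that fall within the lookahead window:

```rust
use std::collections::BTreeMap;

const SELECTION_PROOF_SLOT_LOOKAHEAD: u64 = 8;

/// Remove and return every pending duty scheduled within the lookahead
/// window, leaving later duties in the map for a future slot.
fn next_batch(pending: &mut BTreeMap<u64, Vec<String>>, current_slot: u64) -> Vec<String> {
    // `split_off` keeps keys >= the boundary in the returned map; swapping
    // leaves far-future duties in `pending` and the near-term ones in `batch`.
    let mut batch = pending.split_off(&(current_slot + SELECTION_PROOF_SLOT_LOOKAHEAD));
    std::mem::swap(&mut batch, pending);
    batch.into_values().flatten().collect()
}

fn main() {
    let mut pending = BTreeMap::from([
        (5, vec!["proof for slot 5".to_string()]),
        (20, vec!["proof for slot 20".to_string()]),
    ]);
    // At slot 4 with an 8-slot lookahead, only the slot-5 duty is drained.
    assert_eq!(next_batch(&mut pending, 4).len(), 1);
    assert_eq!(pending.len(), 1);
}
```

Signing each batch serially (rather than `join_all`-ing it) is what caps the number of simultaneous signing threads.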
--- validator_client/src/block_service.rs | 5 + validator_client/src/duties_service.rs | 189 ++++++++++++++++--- validator_client/src/http_metrics/metrics.rs | 5 + 3 files changed, 173 insertions(+), 26 deletions(-) diff --git a/validator_client/src/block_service.rs b/validator_client/src/block_service.rs index 6fd519eba..3b3749237 100644 --- a/validator_client/src/block_service.rs +++ b/validator_client/src/block_service.rs @@ -407,17 +407,22 @@ impl BlockService { ) .await?; + let signing_timer = metrics::start_timer(&metrics::BLOCK_SIGNING_TIMES); let signed_block = self_ref .validator_store .sign_block::(*validator_pubkey_ref, block, current_slot) .await .map_err(|e| BlockError::Recoverable(format!("Unable to sign block: {:?}", e)))?; + let signing_time_ms = + Duration::from_secs_f64(signing_timer.map_or(0.0, |t| t.stop_and_record())).as_millis(); info!( log, "Publishing signed block"; "slot" => slot.as_u64(), + "signing_time_ms" => signing_time_ms, ); + // Publish block with first available beacon node. self.beacon_nodes .first_success( diff --git a/validator_client/src/duties_service.rs b/validator_client/src/duties_service.rs index 6ba2a2d1f..c335c67ab 100644 --- a/validator_client/src/duties_service.rs +++ b/validator_client/src/duties_service.rs @@ -17,13 +17,14 @@ use crate::{ }; use environment::RuntimeContext; use eth2::types::{AttesterData, BeaconCommitteeSubscription, ProposerData, StateId, ValidatorId}; -use futures::future::join_all; +use futures::{stream, StreamExt}; use parking_lot::RwLock; use safe_arith::ArithError; use slog::{debug, error, info, warn, Logger}; use slot_clock::SlotClock; -use std::collections::{HashMap, HashSet}; +use std::collections::{hash_map, BTreeMap, HashMap, HashSet}; use std::sync::Arc; +use std::time::Duration; use sync::poll_sync_committee_duties; use sync::SyncDutiesMap; use tokio::{sync::mpsc::Sender, time::sleep}; @@ -40,6 +41,14 @@ const SUBSCRIPTION_BUFFER_SLOTS: u64 = 2; /// Only retain `HISTORICAL_DUTIES_EPOCHS` duties prior to the current epoch. const HISTORICAL_DUTIES_EPOCHS: u64 = 2; +/// Compute attestation selection proofs this many slots before they are required. +/// +/// At start-up selection proofs will be computed with less lookahead out of necessity. +const SELECTION_PROOF_SLOT_LOOKAHEAD: u64 = 8; + +/// Fraction of a slot at which selection proof signing should happen (2 means half way). +const SELECTION_PROOF_SCHEDULE_DENOM: u32 = 2; + /// Minimum number of validators for which we auto-enable per-validator metrics. /// For validators greater than this value, we need to manually set the `enable-per-validator-metrics` /// flag in the cli to enable collection of per validator metrics. @@ -71,7 +80,7 @@ pub struct DutyAndProof { impl DutyAndProof { /// Instantiate `Self`, computing the selection proof as well. - pub async fn new( + pub async fn new_with_selection_proof( duty: AttesterData, validator_store: &ValidatorStore, spec: &ChainSpec, @@ -99,6 +108,14 @@ impl DutyAndProof { selection_proof, }) } + + /// Create a new `DutyAndProof` with the selection proof waiting to be filled in. + pub fn new_without_selection_proof(duty: AttesterData) -> Self { + Self { + duty, + selection_proof: None, + } + } } /// To assist with readability, the dependent root for attester/proposer duties. @@ -471,7 +488,7 @@ async fn poll_validator_indices( /// 3. Push out any attestation subnet subscriptions to the BN. /// 4. Prune old entries from `duties_service.attesters`. 
async fn poll_beacon_attesters( - duties_service: &DutiesService, + duties_service: &Arc>, ) -> Result<(), Error> { let current_epoch_timer = metrics::start_timer_vec( &metrics::DUTIES_SERVICE_TIMES, @@ -634,7 +651,7 @@ async fn poll_beacon_attesters( /// For the given `local_indices` and `local_pubkeys`, download the duties for the given `epoch` and /// store them in `duties_service.attesters`. async fn poll_beacon_attesters_for_epoch( - duties_service: &DutiesService, + duties_service: &Arc>, epoch: Epoch, local_indices: &[u64], local_pubkeys: &HashSet, @@ -742,31 +759,16 @@ async fn poll_beacon_attesters_for_epoch( "num_new_duties" => new_duties.len(), ); - // Produce the `DutyAndProof` messages in parallel. - let duty_and_proof_results = join_all(new_duties.into_iter().map(|duty| { - DutyAndProof::new(duty, &duties_service.validator_store, &duties_service.spec) - })) - .await; - // Update the duties service with the new `DutyAndProof` messages. let mut attesters = duties_service.attesters.write(); let mut already_warned = Some(()); - for result in duty_and_proof_results { - let duty_and_proof = match result { - Ok(duty_and_proof) => duty_and_proof, - Err(e) => { - error!( - log, - "Failed to produce duty and proof"; - "error" => ?e, - "msg" => "may impair attestation duties" - ); - // Do not abort the entire batch for a single failure. - continue; - } - }; + for duty in &new_duties { + let attester_map = attesters.entry(duty.pubkey).or_default(); - let attester_map = attesters.entry(duty_and_proof.duty.pubkey).or_default(); + // Create initial entries in the map without selection proofs. We'll compute them in the + // background later to avoid creating a thundering herd of signing threads whenever new + // duties are computed. + let duty_and_proof = DutyAndProof::new_without_selection_proof(duty.clone()); if let Some((prior_dependent_root, _)) = attester_map.insert(epoch, (dependent_root, duty_and_proof)) @@ -785,9 +787,144 @@ async fn poll_beacon_attesters_for_epoch( } drop(attesters); + // Spawn the background task to compute selection proofs. + let subservice = duties_service.clone(); + duties_service.context.executor.spawn( + async move { + fill_in_selection_proofs(subservice, new_duties, dependent_root).await; + }, + "duties_service_selection_proofs_background", + ); + Ok(()) } +/// Compute the attestation selection proofs for the `duties` and add them to the `attesters` map. +/// +/// Duties are computed in batches each slot. If a re-org is detected then the process will +/// terminate early as it is assumed the selection proofs from `duties` are no longer relevant. +async fn fill_in_selection_proofs( + duties_service: Arc>, + duties: Vec, + dependent_root: Hash256, +) { + let log = duties_service.context.log(); + + // Sort duties by slot in a BTreeMap. + let mut duties_by_slot: BTreeMap> = BTreeMap::new(); + + for duty in duties { + duties_by_slot.entry(duty.slot).or_default().push(duty); + } + + // At halfway through each slot when nothing else is likely to be getting signed, sign a batch + // of selection proofs and insert them into the duties service `attesters` map. 
+/// Compute the attestation selection proofs for the `duties` and add them to the `attesters` map.
+///
+/// Duties are computed in batches each slot. If a re-org is detected then the process will
+/// terminate early as it is assumed the selection proofs from `duties` are no longer relevant.
+async fn fill_in_selection_proofs<T: SlotClock + 'static, E: EthSpec>(
+    duties_service: Arc<DutiesService<T, E>>,
+    duties: Vec<AttesterData>,
+    dependent_root: Hash256,
+) {
+    let log = duties_service.context.log();
+
+    // Sort duties by slot in a BTreeMap.
+    let mut duties_by_slot: BTreeMap<Slot, Vec<AttesterData>> = BTreeMap::new();
+
+    for duty in duties {
+        duties_by_slot.entry(duty.slot).or_default().push(duty);
+    }
+
+    // At halfway through each slot when nothing else is likely to be getting signed, sign a batch
+    // of selection proofs and insert them into the duties service `attesters` map.
+    let slot_clock = &duties_service.slot_clock;
+    let slot_offset = duties_service.slot_clock.slot_duration() / SELECTION_PROOF_SCHEDULE_DENOM;
+
+    while !duties_by_slot.is_empty() {
+        if let Some(duration) = slot_clock.duration_to_next_slot() {
+            sleep(duration.saturating_sub(slot_offset)).await;
+
+            let Some(current_slot) = slot_clock.now() else {
+                continue;
+            };
+
+            let lookahead_slot = current_slot + SELECTION_PROOF_SLOT_LOOKAHEAD;
+
+            let mut relevant_duties = duties_by_slot.split_off(&lookahead_slot);
+            std::mem::swap(&mut relevant_duties, &mut duties_by_slot);
+
+            let batch_size = relevant_duties.values().map(Vec::len).sum::<usize>();
+
+            if batch_size == 0 {
+                continue;
+            }
+
+            let timer = metrics::start_timer_vec(
+                &metrics::DUTIES_SERVICE_TIMES,
+                &[metrics::ATTESTATION_SELECTION_PROOFS],
+            );
+
+            // Sign selection proofs (serially).
+            let duty_and_proof_results = stream::iter(relevant_duties.into_values().flatten())
+                .then(|duty| async {
+                    DutyAndProof::new_with_selection_proof(
+                        duty,
+                        &duties_service.validator_store,
+                        &duties_service.spec,
+                    )
+                    .await
+                })
+                .collect::<Vec<_>>()
+                .await;
+
+            // Add to attesters store.
+            let mut attesters = duties_service.attesters.write();
+            for result in duty_and_proof_results {
+                let duty_and_proof = match result {
+                    Ok(duty_and_proof) => duty_and_proof,
+                    Err(e) => {
+                        error!(
+                            log,
+                            "Failed to produce duty and proof";
+                            "error" => ?e,
+                            "msg" => "may impair attestation duties"
+                        );
+                        // Do not abort the entire batch for a single failure.
+                        continue;
+                    }
+                };
+
+                let attester_map = attesters.entry(duty_and_proof.duty.pubkey).or_default();
+                let epoch = duty_and_proof.duty.slot.epoch(E::slots_per_epoch());
+                match attester_map.entry(epoch) {
+                    hash_map::Entry::Occupied(mut entry) => {
+                        // No need to update duties for which no proof was computed.
+                        let Some(selection_proof) = duty_and_proof.selection_proof else {
+                            continue;
+                        };
+
+                        let (existing_dependent_root, existing_duty) = entry.get_mut();
+
+                        if *existing_dependent_root == dependent_root {
+                            // Replace existing proof.
+                            existing_duty.selection_proof = Some(selection_proof);
+                        } else {
+                            // Our selection proofs are no longer relevant due to a re-org; abandon
+                            // this entire background process.
+                            debug!(
+                                log,
+                                "Stopping selection proof background task";
+                                "reason" => "re-org"
+                            );
+                            return;
+                        }
+                    }
+                    hash_map::Entry::Vacant(entry) => {
+                        entry.insert((dependent_root, duty_and_proof));
+                    }
+                }
+            }
+            drop(attesters);
+
+            let time_taken_ms =
+                Duration::from_secs_f64(timer.map_or(0.0, |t| t.stop_and_record())).as_millis();
+            debug!(
+                log,
+                "Computed attestation selection proofs";
+                "batch_size" => batch_size,
+                "lookahead_slot" => lookahead_slot,
+                "time_taken_ms" => time_taken_ms
+            );
+        } else {
+            // Just sleep for one slot if we are unable to read the system clock; this gives
+            // us an opportunity for the clock to eventually come good.
+            sleep(duties_service.slot_clock.slot_duration()).await;
+        }
+    }
+}
+
 /// Download the proposer duties for the current epoch and store them in `duties_service.proposers`.
 /// If there are any proposers for this slot, send out a notification to the block proposers.
 ///
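The loop above schedules each signing batch for the middle of a slot (`duration_to_next_slot` minus `slot_duration / SELECTION_PROOF_SCHEDULE_DENOM`), keeping proof signing away from the start-of-slot block and attestation work. A small sketch of just that timing arithmetic (the real code reads these values from Lighthouse's `SlotClock`):

```rust
use std::time::Duration;

const SELECTION_PROOF_SCHEDULE_DENOM: u32 = 2;

/// How long to sleep so that signing lands halfway through a slot.
fn sleep_time(duration_to_next_slot: Duration, slot_duration: Duration) -> Duration {
    let slot_offset = slot_duration / SELECTION_PROOF_SCHEDULE_DENOM;
    // `saturating_sub` means "sign immediately" if the halfway point has passed.
    duration_to_next_slot.saturating_sub(slot_offset)
}

fn main() {
    let slot = Duration::from_secs(12);
    // 2s into the slot (10s remain): sleep 4s, waking 6s in, i.e. halfway.
    assert_eq!(sleep_time(Duration::from_secs(10), slot), Duration::from_secs(4));
    // 8s into the slot (4s remain): halfway already passed, wake immediately.
    assert_eq!(sleep_time(Duration::from_secs(4), slot), Duration::ZERO);
}
```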
diff --git a/validator_client/src/http_metrics/metrics.rs b/validator_client/src/http_metrics/metrics.rs
index 9b60d0ede..c9ad31feb 100644
--- a/validator_client/src/http_metrics/metrics.rs
+++ b/validator_client/src/http_metrics/metrics.rs
@@ -32,6 +32,7 @@
 pub const PROPOSER_DUTIES_HTTP_GET: &str = "proposer_duties_http_get";
 pub const VALIDATOR_ID_HTTP_GET: &str = "validator_id_http_get";
 pub const SUBSCRIPTIONS_HTTP_POST: &str = "subscriptions_http_post";
 pub const UPDATE_PROPOSERS: &str = "update_proposers";
+pub const ATTESTATION_SELECTION_PROOFS: &str = "attestation_selection_proofs";
 pub const SUBSCRIPTIONS: &str = "subscriptions";
 pub const LOCAL_KEYSTORE: &str = "local_keystore";
 pub const WEB3SIGNER: &str = "web3signer";
@@ -172,6 +173,10 @@ lazy_static::lazy_static! {
         "Duration to obtain a signature",
         &["type"]
     );
+    pub static ref BLOCK_SIGNING_TIMES: Result<Histogram> = try_create_histogram(
+        "vc_block_signing_times_seconds",
+        "Duration to obtain a signature for a block",
+    );
     pub static ref ATTESTATION_DUTY: Result<IntGaugeVec> = try_create_int_gauge_vec(
         "vc_attestation_duty_slot",

From 4dc0c4c5b7335fedae7ed6a8dfe82767d34337cc Mon Sep 17 00:00:00 2001
From: Michael Sproul
Date: Sun, 5 Mar 2023 23:43:32 +0000
Subject: [PATCH 4/9] Update dependencies incl tempfile (#4048)

## Proposed Changes

Fix the `cargo audit` failure caused by [RUSTSEC-2023-0018](https://rustsec.org/advisories/RUSTSEC-2023-0018) which we were exposed to via `tempfile`.

## Additional Info

I've held back the libp2p crate for now because it seemed to introduce another duplicate dependency on libp2p-core, for a total of 3 copies. Maybe that's fine, but we can sort it out later.
---
 Cargo.lock | 79 +++++++++++++++++++++++++++++++++++++++++-------------
 1 file changed, 60 insertions(+), 19 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 14e4b75cd..70dc4227a 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2034,6 +2034,27 @@ dependencies = [
  "types",
 ]
 
+[[package]]
+name = "errno"
+version = "0.2.8"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f639046355ee4f37944e44f60642c6f3a7efa3cf6b78c78a0d989a8ce6c396a1"
+dependencies = [
+ "errno-dragonfly",
+ "libc",
+ "winapi",
+]
+
+[[package]]
+name = "errno-dragonfly"
+version = "0.1.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "aa68f1b12764fab894d2755d2518754e71b4fd80ecfb822714a1206c2aab39bf"
+dependencies = [
+ "cc",
+ "libc",
+]
+
 [[package]]
 name = "error-chain"
 version = "0.12.4"
@@ -3568,6 +3589,16 @@ dependencies = [
  "webrtc-util",
 ]
 
+[[package]]
+name = "io-lifetimes"
+version = "1.0.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "1abeb7a0dd0f8181267ff8adc397075586500b81b28a73e8a0208b00fc170fb3"
+dependencies = [
+ "libc",
+ "windows-sys 0.45.0",
+]
+
 [[package]]
 name = "ipconfig"
 version = "0.3.1"
@@ -4445,6 +4476,12 @@
 version = "0.5.6"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "0717cef1bc8b636c6e1c1bbdefc09e6322da8a9321966e8928ef80d20f7f770f"
 
+[[package]]
+name = "linux-raw-sys"
+version = "0.1.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f051f77a7c8e6957c0696eac88f26b0117e54f52d3fc682ab19397a8812846a4"
+
 [[package]]
 name = "lmdb-rkv"
 version = "0.14.0"
@@ -5308,9 +5345,9 @@
 checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf"
 
 [[package]]
 name = "openssl-src"
-version = "111.25.0+1.1.1t"
+version = "111.25.1+1.1.1t"
"registry+https://github.com/rust-lang/crates.io-index" -checksum = "3173cd3626c43e3854b1b727422a276e568d9ec5fe8cec197822cf52cfb743d6" +checksum = "1ef9a9cc6ea7d9d5e7c4a913dc4b48d0e359eddf01af1dfec96ba7064b4aba10" dependencies = [ "cc", ] @@ -6244,15 +6281,6 @@ version = "0.6.28" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "456c603be3e8d448b072f410900c09faf164fbce2d480456f50eea6e25f9c848" -[[package]] -name = "remove_dir_all" -version = "0.5.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3acd125665422973a33ac9d3dd2df85edad0f4ae9b00dafb1a05e43a9f5ef8e7" -dependencies = [ - "winapi", -] - [[package]] name = "reqwest" version = "0.11.14" @@ -6478,6 +6506,20 @@ dependencies = [ "nom 7.1.3", ] +[[package]] +name = "rustix" +version = "0.36.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fd5c6ff11fecd55b40746d1995a02f2eb375bf8c00d192d521ee09f42bef37bc" +dependencies = [ + "bitflags", + "errno", + "io-lifetimes", + "libc", + "linux-raw-sys", + "windows-sys 0.45.0", +] + [[package]] name = "rustls" version = "0.19.1" @@ -7603,16 +7645,15 @@ dependencies = [ [[package]] name = "tempfile" -version = "3.3.0" +version = "3.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5cdb1ef4eaeeaddc8fbd371e5017057064af0911902ef36b39801f67cc6d79e4" +checksum = "af18f7ae1acd354b992402e9ec5864359d693cd8a79dcbef59f76891701c1e95" dependencies = [ "cfg-if", "fastrand", - "libc", "redox_syscall", - "remove_dir_all", - "winapi", + "rustix", + "windows-sys 0.42.0", ] [[package]] @@ -7805,9 +7846,9 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.25.0" +version = "1.26.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c8e00990ebabbe4c14c08aca901caed183ecd5c09562a12c824bb53d3c3fd3af" +checksum = "03201d01c3c27a29c8a5cee5b55a93ddae1ccf6f08f65365c2c918f8c1b76f64" dependencies = [ "autocfg 1.1.0", "bytes", @@ -7820,7 +7861,7 @@ dependencies = [ "signal-hook-registry", "socket2", "tokio-macros", - "windows-sys 0.42.0", + "windows-sys 0.45.0", ] [[package]] From f775404c107efdb91fcaa7db07018b594a2f4377 Mon Sep 17 00:00:00 2001 From: Paul Hauner Date: Mon, 6 Mar 2023 04:08:48 +0000 Subject: [PATCH 5/9] Log a `WARN` in the VC for a mismatched Capella fork epoch (#4050) ## Issue Addressed NA ## Proposed Changes - Adds a `WARN` statement for Capella, just like the previous forks. - Adds a hint message to all those WARNs to suggest the user update the BN or VC. ## Additional Info NA --- validator_client/src/beacon_node_fallback.rs | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/validator_client/src/beacon_node_fallback.rs b/validator_client/src/beacon_node_fallback.rs index 668e1bcf0..3e667429b 100644 --- a/validator_client/src/beacon_node_fallback.rs +++ b/validator_client/src/beacon_node_fallback.rs @@ -18,6 +18,9 @@ use std::time::{Duration, Instant}; use tokio::{sync::RwLock, time::sleep}; use types::{ChainSpec, Config, EthSpec}; +/// Message emitted when the VC detects the BN is using a different spec. +const UPDATE_REQUIRED_LOG_HINT: &str = "this VC or the remote BN may need updating"; + /// The number of seconds *prior* to slot start that we will try and update the state of fallback /// nodes. 
From 3ad77fb52c9b27d1b03969f2c8d5f62b86bac8cd Mon Sep 17 00:00:00 2001
From: Paul Hauner
Date: Mon, 6 Mar 2023 04:08:49 +0000
Subject: [PATCH 6/9] Add VC metric for primary BN latency (#4051)

## Issue Addressed

NA

## Proposed Changes

In #4024 we added metrics to expose the latency measurements from a VC to each BN. Whilst playing with these new metrics on our infra, I realised it would be great to have a single metric to make sure that the primary BN for each VC has a reasonable latency. With the current "metrics for all BNs" it's hard to tell which one is the primary.

## Additional Info

NA
---
 validator_client/src/http_metrics/metrics.rs |  4 ++++
 validator_client/src/latency.rs              | 10 ++++++++--
 2 files changed, 12 insertions(+), 2 deletions(-)

diff --git a/validator_client/src/http_metrics/metrics.rs b/validator_client/src/http_metrics/metrics.rs
index c9ad31feb..b4e400c3e 100644
--- a/validator_client/src/http_metrics/metrics.rs
+++ b/validator_client/src/http_metrics/metrics.rs
@@ -191,6 +191,10 @@ lazy_static::lazy_static! {
         "Round-trip latency for a simple API endpoint on each BN",
         &["endpoint"]
     );
+    pub static ref VC_BEACON_NODE_LATENCY_PRIMARY_ENDPOINT: Result<Histogram> = try_create_histogram(
+        "vc_beacon_node_latency_primary_endpoint",
+        "Round-trip latency for the primary BN endpoint",
+    );
 }
 
 pub fn gather_prometheus_metrics(
diff --git a/validator_client/src/latency.rs b/validator_client/src/latency.rs
index 9ab9b630b..7e752f292 100644
--- a/validator_client/src/latency.rs
+++ b/validator_client/src/latency.rs
@@ -36,7 +36,7 @@ pub fn start_latency_service<T: SlotClock + 'static, E: EthSpec>(
             // Sleep until it's time to perform the measurement.
             sleep(sleep_time).await;
 
-            for measurement in beacon_nodes.measure_latency().await {
+            for (i, measurement) in beacon_nodes.measure_latency().await.iter().enumerate() {
                 if let Some(latency) = measurement.latency {
                     debug!(
                         log,
@@ -48,7 +48,13 @@
                         &metrics::VC_BEACON_NODE_LATENCY,
                         &[&measurement.beacon_node_id],
                         latency,
-                    )
+                    );
+                    if i == 0 {
+                        metrics::observe_duration(
+                            &metrics::VC_BEACON_NODE_LATENCY_PRIMARY_ENDPOINT,
+                            latency,
+                        );
+                    }
                 }
             }
         }
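The patch pairs the existing per-endpoint `HistogramVec` with a single plain `Histogram` for the primary node, on the assumption that the first measurement in the list corresponds to the first-configured (primary) BN. A sketch of that two-metric pattern using the `prometheus` crate directly (Lighthouse's `try_create_histogram`/`observe_duration` helpers wrap these same types):

```rust
use prometheus::{Histogram, HistogramOpts, HistogramVec};
use std::time::Duration;

fn main() {
    let per_endpoint = HistogramVec::new(
        HistogramOpts::new("vc_beacon_node_latency", "Round-trip latency per BN"),
        &["endpoint"],
    )
    .unwrap();
    let primary = Histogram::with_opts(HistogramOpts::new(
        "vc_beacon_node_latency_primary_endpoint",
        "Round-trip latency for the primary BN",
    ))
    .unwrap();

    let measurements = [
        ("http://bn-primary:5052", Duration::from_millis(12)),
        ("http://bn-fallback:5052", Duration::from_millis(40)),
    ];
    for (i, (endpoint, latency)) in measurements.iter().enumerate() {
        per_endpoint
            .with_label_values(&[endpoint])
            .observe(latency.as_secs_f64());
        // Index 0 is assumed to be the primary (first-configured) endpoint.
        if i == 0 {
            primary.observe(latency.as_secs_f64());
        }
    }
}
```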
From f775404c107efdb91fcaa7db07018b594a2f4377 Mon Sep 17 00:00:00 2001
From: Paul Hauner
Date: Tue, 7 Mar 2023 03:06:52 +0000
Subject: [PATCH 7/9] Set Capella fork epoch for Goerli (#4044)

## Issue Addressed

NA

## Proposed Changes

Sets the Capella fork epoch as per https://github.com/eth-clients/goerli/pull/160.

The fork will occur at:

- Epoch: 162304
- Slot: 5193728
- UTC: 14/03/2023, 10:25:36 pm

## Additional Info

- [x] Blocked on https://github.com/eth-clients/goerli/pull/160 being merged
---
 .../built_in_network_configs/prater/config.yaml | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/common/eth2_network_config/built_in_network_configs/prater/config.yaml b/common/eth2_network_config/built_in_network_configs/prater/config.yaml
index d173be20d..69d65ca8f 100644
--- a/common/eth2_network_config/built_in_network_configs/prater/config.yaml
+++ b/common/eth2_network_config/built_in_network_configs/prater/config.yaml
@@ -35,8 +35,11 @@ ALTAIR_FORK_EPOCH: 36660
 # Merge
 BELLATRIX_FORK_VERSION: 0x02001020
 BELLATRIX_FORK_EPOCH: 112260
+# Capella
+CAPELLA_FORK_VERSION: 0x03001020
+CAPELLA_FORK_EPOCH: 162304
 # Sharding
-SHARDING_FORK_VERSION: 0x03001020
+SHARDING_FORK_VERSION: 0x04001020
 SHARDING_FORK_EPOCH: 18446744073709551615  # TBD, 2**32 is a placeholder. Merge transition approach is in active R&D.
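The three numbers in the commit message are mutually consistent under the mainnet preset (32 slots per epoch, 12-second slots) and the well-known Prater/Goerli genesis time; the genesis constant below is this sketch's assumption, not something taken from the diff:

```rust
fn main() {
    const SLOTS_PER_EPOCH: u64 = 32;
    const SECONDS_PER_SLOT: u64 = 12;
    // Prater/Goerli beacon chain genesis: 2021-03-23 14:00:00 UTC (assumed here).
    const GENESIS_TIME: u64 = 1_616_508_000;

    let capella_fork_epoch: u64 = 162_304;
    let fork_slot = capella_fork_epoch * SLOTS_PER_EPOCH;
    let fork_unix_time = GENESIS_TIME + fork_slot * SECONDS_PER_SLOT;

    assert_eq!(fork_slot, 5_193_728);
    // 1678832736 == 2023-03-14 22:25:36 UTC, matching the quoted fork time.
    assert_eq!(fork_unix_time, 1_678_832_736);
    println!("Capella on Goerli: slot {fork_slot}, unix time {fork_unix_time}");
}
```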
From 4c109115cabed2492aff1d1822d60129ce5fd5d7 Mon Sep 17 00:00:00 2001
From: Daniel Ramirez Chiquillo
Date: Tue, 7 Mar 2023 05:37:28 +0000
Subject: [PATCH 8/9] Add a flag to always use payloads from builders (#4052)

## Issue Addressed

#4040

## Proposed Changes

- Add the `always_prefer_builder_payload` field to `Config` in `beacon_node/client/src/config.rs`.
- Add that same field to `Inner` in `beacon_node/execution_layer/src/lib.rs`.
- Modify the logic for picking the payload in `beacon_node/execution_layer/src/lib.rs`.
- Add the `always-prefer-builder-payload` flag to the beacon node CLI.
- Test the new flags in `lighthouse/tests/beacon_node.rs`.

Co-authored-by: Paul Hauner
---
 beacon_node/client/src/config.rs       |  2 ++
 beacon_node/execution_layer/src/lib.rs |  8 +++++++-
 beacon_node/src/cli.rs                 |  9 +++++++++
 beacon_node/src/config.rs              |  5 +++++
 beacon_node/tests/test.rs              |  2 +-
 lighthouse/tests/beacon_node.rs        | 15 +++++++++++++++
 6 files changed, 39 insertions(+), 2 deletions(-)

diff --git a/beacon_node/client/src/config.rs b/beacon_node/client/src/config.rs
index 22b868256..95a00b374 100644
--- a/beacon_node/client/src/config.rs
+++ b/beacon_node/client/src/config.rs
@@ -79,6 +79,7 @@ pub struct Config {
     pub monitoring_api: Option<monitoring_api::Config>,
     pub slasher: Option<slasher::Config>,
     pub logger_config: LoggerConfig,
+    pub always_prefer_builder_payload: bool,
 }
 
 impl Default for Config {
@@ -105,6 +106,7 @@
             validator_monitor_pubkeys: vec![],
             validator_monitor_individual_tracking_threshold: DEFAULT_INDIVIDUAL_TRACKING_THRESHOLD,
             logger_config: LoggerConfig::default(),
+            always_prefer_builder_payload: false,
         }
    }
 }
diff --git a/beacon_node/execution_layer/src/lib.rs b/beacon_node/execution_layer/src/lib.rs
index 46da4a67d..d12f9996d 100644
--- a/beacon_node/execution_layer/src/lib.rs
+++ b/beacon_node/execution_layer/src/lib.rs
@@ -219,6 +219,7 @@ struct Inner<E: EthSpec> {
     payload_cache: PayloadCache<E>,
     builder_profit_threshold: Uint256,
     log: Logger,
+    always_prefer_builder_payload: bool,
 }
 
 #[derive(Debug, Default, Clone, Serialize, Deserialize)]
@@ -241,6 +242,7 @@ pub struct Config {
     /// The minimum value of an external payload for it to be considered in a proposal.
     pub builder_profit_threshold: u128,
     pub execution_timeout_multiplier: Option<u32>,
+    pub always_prefer_builder_payload: bool,
 }
 
 /// Provides access to one execution engine and provides a neat interface for consumption by the
@@ -263,6 +265,7 @@ impl<T: EthSpec> ExecutionLayer<T> {
             default_datadir,
             builder_profit_threshold,
             execution_timeout_multiplier,
+            always_prefer_builder_payload,
         } = config;
 
         if urls.len() > 1 {
@@ -335,6 +338,7 @@ impl<T: EthSpec> ExecutionLayer<T> {
             payload_cache: PayloadCache::default(),
             builder_profit_threshold: Uint256::from(builder_profit_threshold),
             log,
+            always_prefer_builder_payload,
         };
 
         Ok(Self {
@@ -796,7 +800,9 @@
                         let relay_value = relay.data.message.value;
                         let local_value = *local.block_value();
-                        if local_value >= relay_value {
+                        if !self.inner.always_prefer_builder_payload
+                            && local_value >= relay_value
+                        {
                             info!(
                                 self.log(),
                                 "Local block is more profitable than relay block";
diff --git a/beacon_node/src/cli.rs b/beacon_node/src/cli.rs
index bc2e705cb..792d62534 100644
--- a/beacon_node/src/cli.rs
+++ b/beacon_node/src/cli.rs
@@ -957,4 +957,13 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> {
                 This is equivalent to --http and --validator-monitor-auto.")
             .takes_value(false)
         )
+        .arg(
+            Arg::with_name("always-prefer-builder-payload")
+                .long("always-prefer-builder-payload")
+                .help("If set, the beacon node always uses the payload from the builder instead of the local payload.")
+                // The builder profit threshold flag is used to provide preference
+                // to local payloads; therefore it fundamentally conflicts with
+                // always using the builder.
+                .conflicts_with("builder-profit-threshold")
+        )
 }
diff --git a/beacon_node/src/config.rs b/beacon_node/src/config.rs
index fa0344e95..55335081c 100644
--- a/beacon_node/src/config.rs
+++ b/beacon_node/src/config.rs
@@ -753,6 +753,11 @@ pub fn get_config<E: EthSpec>(
     client_config.chain.optimistic_finalized_sync =
         !cli_args.is_present("disable-optimistic-finalized-sync");
 
+    // Payload selection configs
+    if cli_args.is_present("always-prefer-builder-payload") {
+        client_config.always_prefer_builder_payload = true;
+    }
+
     Ok(client_config)
 }
diff --git a/beacon_node/tests/test.rs b/beacon_node/tests/test.rs
index 1c11a8349..8ccb260d2 100644
--- a/beacon_node/tests/test.rs
+++ b/beacon_node/tests/test.rs
@@ -1,5 +1,5 @@
 #![cfg(test)]
-#![recursion_limit = "256"]
+#![recursion_limit = "512"]
 
 use beacon_chain::StateSkipConfig;
 use node_test_rig::{
diff --git a/lighthouse/tests/beacon_node.rs b/lighthouse/tests/beacon_node.rs
index bdaec9948..7f957b626 100644
--- a/lighthouse/tests/beacon_node.rs
+++ b/lighthouse/tests/beacon_node.rs
@@ -340,6 +340,21 @@ fn trusted_peers_flag() {
     });
 }
 
+#[test]
+fn always_prefer_builder_payload_flag() {
+    CommandLineTest::new()
+        .flag("always-prefer-builder-payload", None)
+        .run_with_zero_port()
+        .with_config(|config| assert!(config.always_prefer_builder_payload));
+}
+
+#[test]
+fn no_flag_sets_always_prefer_builder_payload_to_false() {
+    CommandLineTest::new()
+        .run_with_zero_port()
+        .with_config(|config| assert!(!config.always_prefer_builder_payload));
+}
+
 // Tests for Eth1 flags.
 #[test]
 fn dummy_eth1_flag() {
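The behavioural core of the change above is a single short-circuit in the local-vs-relay comparison. A reduced sketch of that predicate (the real code compares `Uint256` block values inside `ExecutionLayer`; the free function here is illustrative only):

```rust
/// Returns true if the locally-built payload should be used.
fn use_local_payload(always_prefer_builder: bool, local_value: u128, relay_value: u128) -> bool {
    // Pre-patch this was just `local_value >= relay_value`; the new flag
    // short-circuits the comparison so the builder payload always wins.
    !always_prefer_builder && local_value >= relay_value
}

fn main() {
    assert!(use_local_payload(false, 10, 10)); // ties go to the local payload
    assert!(!use_local_payload(false, 9, 10)); // relay pays strictly more
    assert!(!use_local_payload(true, 10, 1)); // flag forces the builder payload
}
```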
From 319cc61afeb1dbf3692e280dfa18e7b455542b16 Mon Sep 17 00:00:00 2001
From: Paul Hauner
Date: Tue, 7 Mar 2023 07:57:39 +0000
Subject: [PATCH 9/9] Release v3.5.1 (#4049)

## Issue Addressed

NA

## Proposed Changes

Bumps versions to v3.5.1.

## Additional Info

- [x] Requires further testing
---
 Cargo.lock                           | 8 ++++----
 beacon_node/Cargo.toml               | 2 +-
 boot_node/Cargo.toml                 | 2 +-
 common/lighthouse_version/src/lib.rs | 4 ++--
 lcli/Cargo.toml                      | 2 +-
 lighthouse/Cargo.toml                | 2 +-
 6 files changed, 10 insertions(+), 10 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 70dc4227a..5951b49c7 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -617,7 +617,7 @@ dependencies = [
 
 [[package]]
 name = "beacon_node"
-version = "3.5.0"
+version = "3.5.1"
 dependencies = [
  "beacon_chain",
  "clap",
@@ -785,7 +785,7 @@ dependencies = [
 
 [[package]]
 name = "boot_node"
-version = "3.5.0"
+version = "3.5.1"
 dependencies = [
  "beacon_node",
  "clap",
@@ -3750,7 +3750,7 @@ checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55"
 
 [[package]]
 name = "lcli"
-version = "3.5.0"
+version = "3.5.1"
 dependencies = [
  "account_utils",
  "beacon_chain",
@@ -4354,7 +4354,7 @@ dependencies = [
 
 [[package]]
 name = "lighthouse"
-version = "3.5.0"
+version = "3.5.1"
 dependencies = [
  "account_manager",
  "account_utils",
diff --git a/beacon_node/Cargo.toml b/beacon_node/Cargo.toml
index 3c37f41de..521e2b89c 100644
--- a/beacon_node/Cargo.toml
+++ b/beacon_node/Cargo.toml
@@ -1,6 +1,6 @@
 [package]
 name = "beacon_node"
-version = "3.5.0"
+version = "3.5.1"
 authors = ["Paul Hauner", "Age Manning"]
 edition = "2021"
diff --git a/common/lighthouse_version/src/lib.rs b/common/lighthouse_version/src/lib.rs
index 8ad4aa86f..10d1a8c32 100644
--- a/common/lighthouse_version/src/lib.rs
+++ b/common/lighthouse_version/src/lib.rs
@@ -17,8 +17,8 @@ pub const VERSION: &str = git_version!(
         // NOTE: using --match instead of --exclude for compatibility with old Git
         "--match=thiswillnevermatchlol"
     ],
-    prefix = "Lighthouse/v3.5.0-",
-    fallback = "Lighthouse/v3.5.0"
+    prefix = "Lighthouse/v3.5.1-",
+    fallback = "Lighthouse/v3.5.1"
 );
 
 /// Returns `VERSION`, but with platform information appended to the end.
diff --git a/lcli/Cargo.toml b/lcli/Cargo.toml
index 93fe17506..caceb9977 100644
--- a/lcli/Cargo.toml
+++ b/lcli/Cargo.toml
@@ -1,7 +1,7 @@
 [package]
 name = "lcli"
 description = "Lighthouse CLI (modeled after zcli)"
-version = "3.5.0"
+version = "3.5.1"
 authors = ["Paul Hauner"]
 edition = "2021"
diff --git a/lighthouse/Cargo.toml b/lighthouse/Cargo.toml
index ecac53fb1..9360c9600 100644
--- a/lighthouse/Cargo.toml
+++ b/lighthouse/Cargo.toml
@@ -1,6 +1,6 @@
 [package]
 name = "lighthouse"
-version = "3.5.0"
+version = "3.5.1"
 authors = ["Sigma Prime"]
 edition = "2021"
 autotests = false