From 01556f6f01655d77e755340566a64616e4a0d9a8 Mon Sep 17 00:00:00 2001 From: Michael Sproul Date: Sun, 5 Mar 2023 23:43:30 +0000 Subject: [PATCH] Optimise payload attributes calculation and add SSE (#4027) ## Issue Addressed Closes #3896 Closes #3998 Closes #3700 ## Proposed Changes - Optimise the calculation of withdrawals for payload attributes by avoiding state clones, avoiding unnecessary state advances and reading from the snapshot cache if possible. - Use the execution layer's payload attributes cache to avoid re-calculating payload attributes. I actually implemented a new LRU cache just for withdrawals but it had the exact same key and most of the same data as the existing payload attributes cache, so I deleted it. - Add a new SSE event that fires when payloadAttributes are calculated. This is useful for block builders, a la https://github.com/ethereum/beacon-APIs/issues/244. - Add a new CLI flag `--always-prepare-payload` which forces payload attributes to be sent with every fcU regardless of connected proposers. This is intended for use by builders/relays. For maximum effect, the flags I've been using to run Lighthouse in "payload builder mode" are: ``` --always-prepare-payload \ --prepare-payload-lookahead 12000 \ --suggested-fee-recipient 0x0000000000000000000000000000000000000000 ``` The fee recipient is required so Lighthouse has something to pack in the payload attributes (it can be ignored by the builder). The lookahead causes fcU to be sent at the start of every slot rather than at 8s. As usual, fcU will also be sent after each change of head block. I think this combination is sufficient for builders to build on all viable heads. Often there will be two fcU (and two payload attributes) sent for the same slot: one sent at the start of the slot with the head from `n - 1` as the parent, and one sent after the block arrives with `n` as the parent. Example usage of the new event stream: ```bash curl -N "http://localhost:5052/eth/v1/events?topics=payload_attributes" ``` ## Additional Info - [x] Tests added by updating the proposer re-org tests. This has the benefit of testing the proposer re-org code paths with withdrawals too, confirming that the new changes don't interact poorly. - [ ] Benchmarking with `blockdreamer` on devnet-7 showed promising results but I'm yet to do a comparison to `unstable`. Co-authored-by: Michael Sproul --- beacon_node/beacon_chain/src/beacon_chain.rs | 193 +++++++++++++----- beacon_node/beacon_chain/src/chain_config.rs | 5 + beacon_node/beacon_chain/src/events.rs | 80 ++++++-- beacon_node/beacon_chain/src/test_utils.rs | 120 ++++++++++- beacon_node/execution_layer/src/engine_api.rs | 28 +++ beacon_node/http_api/src/lib.rs | 3 + .../http_api/tests/interactive_tests.rs | 94 +++++++-- beacon_node/src/cli.rs | 9 + beacon_node/src/config.rs | 2 + common/eth2/src/types.rs | 80 ++++++++ lighthouse/tests/beacon_node.rs | 15 ++ 11 files changed, 539 insertions(+), 90 deletions(-) diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 6d9d290bd..9ba8f24cb 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -57,7 +57,7 @@ use crate::validator_monitor::{ }; use crate::validator_pubkey_cache::ValidatorPubkeyCache; use crate::{metrics, BeaconChainError, BeaconForkChoiceStore, BeaconSnapshot, CachedHead}; -use eth2::types::{EventKind, SseBlock, SyncDuty}; +use eth2::types::{EventKind, SseBlock, SseExtendedPayloadAttributes, SyncDuty}; use execution_layer::{ BlockProposalContents, BuilderParams, ChainHealth, ExecutionLayer, FailedCondition, PayloadAttributes, PayloadStatus, @@ -89,6 +89,7 @@ use state_processing::{ state_advance::{complete_state_advance, partial_state_advance}, BlockSignatureStrategy, ConsensusContext, SigVerifiedOp, VerifyBlockRoot, VerifyOperation, }; +use std::borrow::Cow; use std::cmp::Ordering; use std::collections::HashMap; use std::collections::HashSet; @@ -3878,6 +3879,75 @@ impl BeaconChain { })) } + pub fn get_expected_withdrawals( + &self, + forkchoice_update_params: &ForkchoiceUpdateParameters, + proposal_slot: Slot, + ) -> Result, Error> { + let cached_head = self.canonical_head.cached_head(); + let head_state = &cached_head.snapshot.beacon_state; + + let parent_block_root = forkchoice_update_params.head_root; + + let (unadvanced_state, unadvanced_state_root) = + if cached_head.head_block_root() == parent_block_root { + (Cow::Borrowed(head_state), cached_head.head_state_root()) + } else if let Some(snapshot) = self + .snapshot_cache + .try_read_for(BLOCK_PROCESSING_CACHE_LOCK_TIMEOUT) + .ok_or(Error::SnapshotCacheLockTimeout)? + .get_cloned(parent_block_root, CloneConfig::none()) + { + debug!( + self.log, + "Hit snapshot cache during withdrawals calculation"; + "slot" => proposal_slot, + "parent_block_root" => ?parent_block_root, + ); + let state_root = snapshot.beacon_state_root(); + (Cow::Owned(snapshot.beacon_state), state_root) + } else { + info!( + self.log, + "Missed snapshot cache during withdrawals calculation"; + "slot" => proposal_slot, + "parent_block_root" => ?parent_block_root + ); + let block = self + .get_blinded_block(&parent_block_root)? + .ok_or(Error::MissingBeaconBlock(parent_block_root))?; + let state = self + .get_state(&block.state_root(), Some(block.slot()))? + .ok_or(Error::MissingBeaconState(block.state_root()))?; + (Cow::Owned(state), block.state_root()) + }; + + // Parent state epoch is the same as the proposal, we don't need to advance because the + // list of expected withdrawals can only change after an epoch advance or a + // block application. + let proposal_epoch = proposal_slot.epoch(T::EthSpec::slots_per_epoch()); + if head_state.current_epoch() == proposal_epoch { + return get_expected_withdrawals(&unadvanced_state, &self.spec) + .map_err(Error::PrepareProposerFailed); + } + + // Advance the state using the partial method. + debug!( + self.log, + "Advancing state for withdrawals calculation"; + "proposal_slot" => proposal_slot, + "parent_block_root" => ?parent_block_root, + ); + let mut advanced_state = unadvanced_state.into_owned(); + partial_state_advance( + &mut advanced_state, + Some(unadvanced_state_root), + proposal_epoch.start_slot(T::EthSpec::slots_per_epoch()), + &self.spec, + )?; + get_expected_withdrawals(&advanced_state, &self.spec).map_err(Error::PrepareProposerFailed) + } + /// Determine whether a fork choice update to the execution layer should be overridden. /// /// This is *only* necessary when proposer re-orgs are enabled, because we have to prevent the @@ -4664,7 +4734,9 @@ impl BeaconChain { // Nothing to do if there are no proposers registered with the EL, exit early to avoid // wasting cycles. - if !execution_layer.has_any_proposer_preparation_data().await { + if !self.config.always_prepare_payload + && !execution_layer.has_any_proposer_preparation_data().await + { return Ok(()); } @@ -4721,64 +4793,60 @@ impl BeaconChain { // If the execution layer doesn't have any proposer data for this validator then we assume // it's not connected to this BN and no action is required. let proposer = pre_payload_attributes.proposer_index; - if !execution_layer - .has_proposer_preparation_data(proposer) - .await + if !self.config.always_prepare_payload + && !execution_layer + .has_proposer_preparation_data(proposer) + .await { return Ok(()); } - let withdrawals = match self.spec.fork_name_at_slot::(prepare_slot) { - ForkName::Base | ForkName::Altair | ForkName::Merge => None, - ForkName::Capella => { - // We must use the advanced state because balances can change at epoch boundaries - // and balances affect withdrawals. - // FIXME(mark) - // Might implement caching here in the future.. - let prepare_state = self - .state_at_slot(prepare_slot, StateSkipConfig::WithoutStateRoots) - .map_err(|e| { - error!(self.log, "State advance for withdrawals failed"; "error" => ?e); - e - })?; - Some(get_expected_withdrawals(&prepare_state, &self.spec)) - } - } - .transpose() - .map_err(|e| { - error!(self.log, "Error preparing beacon proposer"; "error" => ?e); - e - }) - .map(|withdrawals_opt| withdrawals_opt.map(|w| w.into())) - .map_err(Error::PrepareProposerFailed)?; - + // Fetch payoad attributes from the execution layer's cache, or compute them from scratch + // if no matching entry is found. This saves recomputing the withdrawals which can take + // considerable time to compute if a state load is required. let head_root = forkchoice_update_params.head_root; - let payload_attributes = PayloadAttributes::new( - self.slot_clock - .start_of(prepare_slot) - .ok_or(Error::InvalidSlot(prepare_slot))? - .as_secs(), - pre_payload_attributes.prev_randao, - execution_layer.get_suggested_fee_recipient(proposer).await, - withdrawals, - ); + let payload_attributes = if let Some(payload_attributes) = execution_layer + .payload_attributes(prepare_slot, head_root) + .await + { + payload_attributes + } else { + let withdrawals = match self.spec.fork_name_at_slot::(prepare_slot) { + ForkName::Base | ForkName::Altair | ForkName::Merge => None, + ForkName::Capella => { + let chain = self.clone(); + self.spawn_blocking_handle( + move || { + chain.get_expected_withdrawals(&forkchoice_update_params, prepare_slot) + }, + "prepare_beacon_proposer_withdrawals", + ) + .await? + .map(Some)? + } + }; - debug!( - self.log, - "Preparing beacon proposer"; - "payload_attributes" => ?payload_attributes, - "prepare_slot" => prepare_slot, - "validator" => proposer, - "parent_root" => ?head_root, - ); + let payload_attributes = PayloadAttributes::new( + self.slot_clock + .start_of(prepare_slot) + .ok_or(Error::InvalidSlot(prepare_slot))? + .as_secs(), + pre_payload_attributes.prev_randao, + execution_layer.get_suggested_fee_recipient(proposer).await, + withdrawals.map(Into::into), + ); - let already_known = execution_layer - .insert_proposer(prepare_slot, head_root, proposer, payload_attributes) - .await; + execution_layer + .insert_proposer( + prepare_slot, + head_root, + proposer, + payload_attributes.clone(), + ) + .await; - // Only push a log to the user if this is the first time we've seen this proposer for this - // slot. - if !already_known { + // Only push a log to the user if this is the first time we've seen this proposer for + // this slot. info!( self.log, "Prepared beacon proposer"; @@ -4786,6 +4854,23 @@ impl BeaconChain { "validator" => proposer, "parent_root" => ?head_root, ); + payload_attributes + }; + + // Push a server-sent event (probably to a block builder or relay). + if let Some(event_handler) = &self.event_handler { + if event_handler.has_payload_attributes_subscribers() { + event_handler.register(EventKind::PayloadAttributes(ForkVersionedResponse { + data: SseExtendedPayloadAttributes { + proposal_slot: prepare_slot, + proposer_index: proposer, + parent_block_root: head_root, + parent_block_hash: forkchoice_update_params.head_hash.unwrap_or_default(), + payload_attributes: payload_attributes.into(), + }, + version: Some(self.spec.fork_name_at_slot::(prepare_slot)), + })); + } } let till_prepare_slot = @@ -4808,7 +4893,9 @@ impl BeaconChain { // If we are close enough to the proposal slot, send an fcU, which will have payload // attributes filled in by the execution layer cache we just primed. - if till_prepare_slot <= self.config.prepare_payload_lookahead { + if self.config.always_prepare_payload + || till_prepare_slot <= self.config.prepare_payload_lookahead + { debug!( self.log, "Sending forkchoiceUpdate for proposer prep"; diff --git a/beacon_node/beacon_chain/src/chain_config.rs b/beacon_node/beacon_chain/src/chain_config.rs index 2051a6236..6e3538aed 100644 --- a/beacon_node/beacon_chain/src/chain_config.rs +++ b/beacon_node/beacon_chain/src/chain_config.rs @@ -67,6 +67,10 @@ pub struct ChainConfig { pub prepare_payload_lookahead: Duration, /// Use EL-free optimistic sync for the finalized part of the chain. pub optimistic_finalized_sync: bool, + /// Whether to send payload attributes every slot, regardless of connected proposers. + /// + /// This is useful for block builders and testing. + pub always_prepare_payload: bool, } impl Default for ChainConfig { @@ -93,6 +97,7 @@ impl Default for ChainConfig { prepare_payload_lookahead: Duration::from_secs(4), // This value isn't actually read except in tests. optimistic_finalized_sync: true, + always_prepare_payload: false, } } } diff --git a/beacon_node/beacon_chain/src/events.rs b/beacon_node/beacon_chain/src/events.rs index 6f4415ef4..b3fa6627f 100644 --- a/beacon_node/beacon_chain/src/events.rs +++ b/beacon_node/beacon_chain/src/events.rs @@ -14,6 +14,7 @@ pub struct ServerSentEventHandler { exit_tx: Sender>, chain_reorg_tx: Sender>, contribution_tx: Sender>, + payload_attributes_tx: Sender>, late_head: Sender>, block_reward_tx: Sender>, log: Logger, @@ -32,6 +33,7 @@ impl ServerSentEventHandler { let (exit_tx, _) = broadcast::channel(capacity); let (chain_reorg_tx, _) = broadcast::channel(capacity); let (contribution_tx, _) = broadcast::channel(capacity); + let (payload_attributes_tx, _) = broadcast::channel(capacity); let (late_head, _) = broadcast::channel(capacity); let (block_reward_tx, _) = broadcast::channel(capacity); @@ -43,6 +45,7 @@ impl ServerSentEventHandler { exit_tx, chain_reorg_tx, contribution_tx, + payload_attributes_tx, late_head, block_reward_tx, log, @@ -50,28 +53,55 @@ impl ServerSentEventHandler { } pub fn register(&self, kind: EventKind) { - let result = match kind { - EventKind::Attestation(attestation) => self + let log_count = |name, count| { + trace!( + self.log, + "Registering server-sent event"; + "kind" => name, + "receiver_count" => count + ); + }; + let result = match &kind { + EventKind::Attestation(_) => self .attestation_tx - .send(EventKind::Attestation(attestation)) - .map(|count| trace!(self.log, "Registering server-sent attestation event"; "receiver_count" => count)), - EventKind::Block(block) => self.block_tx.send(EventKind::Block(block)) - .map(|count| trace!(self.log, "Registering server-sent block event"; "receiver_count" => count)), - EventKind::FinalizedCheckpoint(checkpoint) => self.finalized_tx - .send(EventKind::FinalizedCheckpoint(checkpoint)) - .map(|count| trace!(self.log, "Registering server-sent finalized checkpoint event"; "receiver_count" => count)), - EventKind::Head(head) => self.head_tx.send(EventKind::Head(head)) - .map(|count| trace!(self.log, "Registering server-sent head event"; "receiver_count" => count)), - EventKind::VoluntaryExit(exit) => self.exit_tx.send(EventKind::VoluntaryExit(exit)) - .map(|count| trace!(self.log, "Registering server-sent voluntary exit event"; "receiver_count" => count)), - EventKind::ChainReorg(reorg) => self.chain_reorg_tx.send(EventKind::ChainReorg(reorg)) - .map(|count| trace!(self.log, "Registering server-sent chain reorg event"; "receiver_count" => count)), - EventKind::ContributionAndProof(contribution_and_proof) => self.contribution_tx.send(EventKind::ContributionAndProof(contribution_and_proof)) - .map(|count| trace!(self.log, "Registering server-sent contribution and proof event"; "receiver_count" => count)), - EventKind::LateHead(late_head) => self.late_head.send(EventKind::LateHead(late_head)) - .map(|count| trace!(self.log, "Registering server-sent late head event"; "receiver_count" => count)), - EventKind::BlockReward(block_reward) => self.block_reward_tx.send(EventKind::BlockReward(block_reward)) - .map(|count| trace!(self.log, "Registering server-sent contribution and proof event"; "receiver_count" => count)), + .send(kind) + .map(|count| log_count(count, "attestation")), + EventKind::Block(_) => self + .block_tx + .send(kind) + .map(|count| log_count(count, "block")), + EventKind::FinalizedCheckpoint(_) => self + .finalized_tx + .send(kind) + .map(|count| log_count(count, "finalized checkpoint")), + EventKind::Head(_) => self + .head_tx + .send(kind) + .map(|count| log_count(count, "head")), + EventKind::VoluntaryExit(_) => self + .exit_tx + .send(kind) + .map(|count| log_count(count, "exit")), + EventKind::ChainReorg(_) => self + .chain_reorg_tx + .send(kind) + .map(|count| log_count(count, "chain reorg")), + EventKind::ContributionAndProof(_) => self + .contribution_tx + .send(kind) + .map(|count| log_count(count, "contribution and proof")), + EventKind::PayloadAttributes(_) => self + .payload_attributes_tx + .send(kind) + .map(|count| log_count(count, "payload attributes")), + EventKind::LateHead(_) => self + .late_head + .send(kind) + .map(|count| log_count(count, "late head")), + EventKind::BlockReward(_) => self + .block_reward_tx + .send(kind) + .map(|count| log_count(count, "block reward")), }; if let Err(SendError(event)) = result { trace!(self.log, "No receivers registered to listen for event"; "event" => ?event); @@ -106,6 +136,10 @@ impl ServerSentEventHandler { self.contribution_tx.subscribe() } + pub fn subscribe_payload_attributes(&self) -> Receiver> { + self.payload_attributes_tx.subscribe() + } + pub fn subscribe_late_head(&self) -> Receiver> { self.late_head.subscribe() } @@ -142,6 +176,10 @@ impl ServerSentEventHandler { self.contribution_tx.receiver_count() > 0 } + pub fn has_payload_attributes_subscribers(&self) -> bool { + self.payload_attributes_tx.receiver_count() > 0 + } + pub fn has_late_head_subscribers(&self) -> bool { self.late_head.receiver_count() > 0 } diff --git a/beacon_node/beacon_chain/src/test_utils.rs b/beacon_node/beacon_chain/src/test_utils.rs index d060bcb77..afb31bba7 100644 --- a/beacon_node/beacon_chain/src/test_utils.rs +++ b/beacon_node/beacon_chain/src/test_utils.rs @@ -108,6 +108,14 @@ pub enum AttestationStrategy { SomeValidators(Vec), } +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum SyncCommitteeStrategy { + /// All sync committee validators sign. + AllValidators, + /// No validators sign. + NoValidators, +} + /// Indicates whether the `BeaconChainHarness` should use the `state.current_sync_committee` or /// `state.next_sync_committee` when creating sync messages or contributions. #[derive(Clone, Debug)] @@ -1752,15 +1760,64 @@ where self.process_attestations(attestations); } + pub fn sync_committee_sign_block( + &self, + state: &BeaconState, + block_hash: Hash256, + slot: Slot, + relative_sync_committee: RelativeSyncCommittee, + ) { + let sync_contributions = + self.make_sync_contributions(state, block_hash, slot, relative_sync_committee); + self.process_sync_contributions(sync_contributions).unwrap() + } + pub async fn add_attested_block_at_slot( &self, slot: Slot, state: BeaconState, state_root: Hash256, validators: &[usize], + ) -> Result<(SignedBeaconBlockHash, BeaconState), BlockError> { + self.add_attested_block_at_slot_with_sync( + slot, + state, + state_root, + validators, + SyncCommitteeStrategy::NoValidators, + ) + .await + } + + pub async fn add_attested_block_at_slot_with_sync( + &self, + slot: Slot, + state: BeaconState, + state_root: Hash256, + validators: &[usize], + sync_committee_strategy: SyncCommitteeStrategy, ) -> Result<(SignedBeaconBlockHash, BeaconState), BlockError> { let (block_hash, block, state) = self.add_block_at_slot(slot, state).await?; self.attest_block(&state, state_root, block_hash, &block, validators); + + if sync_committee_strategy == SyncCommitteeStrategy::AllValidators + && state.current_sync_committee().is_ok() + { + self.sync_committee_sign_block( + &state, + block_hash.into(), + slot, + if (slot + 1).epoch(E::slots_per_epoch()) + % self.spec.epochs_per_sync_committee_period + == 0 + { + RelativeSyncCommittee::Next + } else { + RelativeSyncCommittee::Current + }, + ); + } + Ok((block_hash, state)) } @@ -1770,10 +1827,35 @@ where state_root: Hash256, slots: &[Slot], validators: &[usize], + ) -> AddBlocksResult { + self.add_attested_blocks_at_slots_with_sync( + state, + state_root, + slots, + validators, + SyncCommitteeStrategy::NoValidators, + ) + .await + } + + pub async fn add_attested_blocks_at_slots_with_sync( + &self, + state: BeaconState, + state_root: Hash256, + slots: &[Slot], + validators: &[usize], + sync_committee_strategy: SyncCommitteeStrategy, ) -> AddBlocksResult { assert!(!slots.is_empty()); - self.add_attested_blocks_at_slots_given_lbh(state, state_root, slots, validators, None) - .await + self.add_attested_blocks_at_slots_given_lbh( + state, + state_root, + slots, + validators, + None, + sync_committee_strategy, + ) + .await } async fn add_attested_blocks_at_slots_given_lbh( @@ -1783,6 +1865,7 @@ where slots: &[Slot], validators: &[usize], mut latest_block_hash: Option, + sync_committee_strategy: SyncCommitteeStrategy, ) -> AddBlocksResult { assert!( slots.windows(2).all(|w| w[0] <= w[1]), @@ -1792,7 +1875,13 @@ where let mut state_hash_from_slot: HashMap = HashMap::new(); for slot in slots { let (block_hash, new_state) = self - .add_attested_block_at_slot(*slot, state, state_root, validators) + .add_attested_block_at_slot_with_sync( + *slot, + state, + state_root, + validators, + sync_committee_strategy, + ) .await .unwrap(); state = new_state; @@ -1874,6 +1963,7 @@ where &epoch_slots, &validators, Some(head_block), + SyncCommitteeStrategy::NoValidators, // for backwards compat ) .await; @@ -1990,6 +2080,22 @@ where num_blocks: usize, block_strategy: BlockStrategy, attestation_strategy: AttestationStrategy, + ) -> Hash256 { + self.extend_chain_with_sync( + num_blocks, + block_strategy, + attestation_strategy, + SyncCommitteeStrategy::NoValidators, + ) + .await + } + + pub async fn extend_chain_with_sync( + &self, + num_blocks: usize, + block_strategy: BlockStrategy, + attestation_strategy: AttestationStrategy, + sync_committee_strategy: SyncCommitteeStrategy, ) -> Hash256 { let (mut state, slots) = match block_strategy { BlockStrategy::OnCanonicalHead => { @@ -2021,7 +2127,13 @@ where }; let state_root = state.update_tree_hash_cache().unwrap(); let (_, _, last_produced_block_hash, _) = self - .add_attested_blocks_at_slots(state, state_root, &slots, &validators) + .add_attested_blocks_at_slots_with_sync( + state, + state_root, + &slots, + &validators, + sync_committee_strategy, + ) .await; last_produced_block_hash.into() } diff --git a/beacon_node/execution_layer/src/engine_api.rs b/beacon_node/execution_layer/src/engine_api.rs index ec581ea65..38311b823 100644 --- a/beacon_node/execution_layer/src/engine_api.rs +++ b/beacon_node/execution_layer/src/engine_api.rs @@ -4,6 +4,7 @@ use crate::http::{ ENGINE_FORKCHOICE_UPDATED_V2, ENGINE_GET_PAYLOAD_V1, ENGINE_GET_PAYLOAD_V2, ENGINE_NEW_PAYLOAD_V1, ENGINE_NEW_PAYLOAD_V2, }; +use eth2::types::{SsePayloadAttributes, SsePayloadAttributesV1, SsePayloadAttributesV2}; pub use ethers_core::types::Transaction; use ethers_core::utils::rlp::{self, Decodable, Rlp}; use http::deposit_methods::RpcError; @@ -269,6 +270,33 @@ impl PayloadAttributes { } } +impl From for SsePayloadAttributes { + fn from(pa: PayloadAttributes) -> Self { + match pa { + PayloadAttributes::V1(PayloadAttributesV1 { + timestamp, + prev_randao, + suggested_fee_recipient, + }) => Self::V1(SsePayloadAttributesV1 { + timestamp, + prev_randao, + suggested_fee_recipient, + }), + PayloadAttributes::V2(PayloadAttributesV2 { + timestamp, + prev_randao, + suggested_fee_recipient, + withdrawals, + }) => Self::V2(SsePayloadAttributesV2 { + timestamp, + prev_randao, + suggested_fee_recipient, + withdrawals, + }), + } + } +} + #[derive(Clone, Debug, PartialEq)] pub struct ForkchoiceUpdatedResponse { pub payload_status: PayloadStatusV1, diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index bd00bfad1..6cca673b8 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -3520,6 +3520,9 @@ pub fn serve( api_types::EventTopic::ContributionAndProof => { event_handler.subscribe_contributions() } + api_types::EventTopic::PayloadAttributes => { + event_handler.subscribe_payload_attributes() + } api_types::EventTopic::LateHead => { event_handler.subscribe_late_head() } diff --git a/beacon_node/http_api/tests/interactive_tests.rs b/beacon_node/http_api/tests/interactive_tests.rs index 7096fac42..7db1b22d6 100644 --- a/beacon_node/http_api/tests/interactive_tests.rs +++ b/beacon_node/http_api/tests/interactive_tests.rs @@ -2,13 +2,15 @@ use crate::common::*; use beacon_chain::{ chain_config::ReOrgThreshold, - test_utils::{AttestationStrategy, BlockStrategy}, + test_utils::{AttestationStrategy, BlockStrategy, SyncCommitteeStrategy}, }; use eth2::types::DepositContractData; use execution_layer::{ForkchoiceState, PayloadAttributes}; use parking_lot::Mutex; use slot_clock::SlotClock; -use state_processing::state_advance::complete_state_advance; +use state_processing::{ + per_block_processing::get_expected_withdrawals, state_advance::complete_state_advance, +}; use std::collections::HashMap; use std::sync::Arc; use std::time::Duration; @@ -106,13 +108,15 @@ pub struct ReOrgTest { percent_head_votes: usize, should_re_org: bool, misprediction: bool, + /// Whether to expect withdrawals to change on epoch boundaries. + expect_withdrawals_change_on_epoch: bool, } impl Default for ReOrgTest { /// Default config represents a regular easy re-org. fn default() -> Self { Self { - head_slot: Slot::new(30), + head_slot: Slot::new(E::slots_per_epoch() - 2), parent_distance: 1, head_distance: 1, re_org_threshold: 20, @@ -122,6 +126,7 @@ impl Default for ReOrgTest { percent_head_votes: 0, should_re_org: true, misprediction: false, + expect_withdrawals_change_on_epoch: false, } } } @@ -136,13 +141,40 @@ pub async fn proposer_boost_re_org_zero_weight() { #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub async fn proposer_boost_re_org_epoch_boundary() { proposer_boost_re_org_test(ReOrgTest { - head_slot: Slot::new(31), + head_slot: Slot::new(E::slots_per_epoch() - 1), should_re_org: false, ..Default::default() }) .await; } +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +pub async fn proposer_boost_re_org_epoch_boundary_skip1() { + // Proposing a block on a boundary after a skip will change the set of expected withdrawals + // sent in the payload attributes. + proposer_boost_re_org_test(ReOrgTest { + head_slot: Slot::new(2 * E::slots_per_epoch() - 2), + head_distance: 2, + should_re_org: false, + expect_withdrawals_change_on_epoch: true, + ..Default::default() + }) + .await; +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +pub async fn proposer_boost_re_org_epoch_boundary_skip32() { + // Propose a block at 64 after a whole epoch of skipped slots. + proposer_boost_re_org_test(ReOrgTest { + head_slot: Slot::new(E::slots_per_epoch() - 1), + head_distance: E::slots_per_epoch() + 1, + should_re_org: false, + expect_withdrawals_change_on_epoch: true, + ..Default::default() + }) + .await; +} + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub async fn proposer_boost_re_org_slot_after_epoch_boundary() { proposer_boost_re_org_test(ReOrgTest { @@ -187,7 +219,7 @@ pub async fn proposer_boost_re_org_finality() { #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub async fn proposer_boost_re_org_parent_distance() { proposer_boost_re_org_test(ReOrgTest { - head_slot: Slot::new(30), + head_slot: Slot::new(E::slots_per_epoch() - 2), parent_distance: 2, should_re_org: false, ..Default::default() @@ -198,7 +230,7 @@ pub async fn proposer_boost_re_org_parent_distance() { #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub async fn proposer_boost_re_org_head_distance() { proposer_boost_re_org_test(ReOrgTest { - head_slot: Slot::new(29), + head_slot: Slot::new(E::slots_per_epoch() - 3), head_distance: 2, should_re_org: false, ..Default::default() @@ -209,7 +241,7 @@ pub async fn proposer_boost_re_org_head_distance() { #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub async fn proposer_boost_re_org_very_unhealthy() { proposer_boost_re_org_test(ReOrgTest { - head_slot: Slot::new(31), + head_slot: Slot::new(E::slots_per_epoch() - 1), parent_distance: 2, head_distance: 2, percent_parent_votes: 10, @@ -225,7 +257,6 @@ pub async fn proposer_boost_re_org_very_unhealthy() { #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub async fn proposer_boost_re_org_weight_misprediction() { proposer_boost_re_org_test(ReOrgTest { - head_slot: Slot::new(30), percent_empty_votes: 70, percent_head_votes: 30, should_re_org: false, @@ -254,12 +285,13 @@ pub async fn proposer_boost_re_org_test( percent_head_votes, should_re_org, misprediction, + expect_withdrawals_change_on_epoch, }: ReOrgTest, ) { assert!(head_slot > 0); - // We require a network with execution enabled so we can check EL message timings. - let mut spec = ForkName::Merge.make_genesis_spec(E::default_spec()); + // Test using Capella so that we simulate conditions as similar to mainnet as possible. + let mut spec = ForkName::Capella.make_genesis_spec(E::default_spec()); spec.terminal_total_difficulty = 1.into(); // Ensure there are enough validators to have `attesters_per_slot`. @@ -323,13 +355,15 @@ pub async fn proposer_boost_re_org_test( ) .await; - // Create some chain depth. + // Create some chain depth. Sign sync committee signatures so validator balances don't dip + // below 32 ETH and become ineligible for withdrawals. harness.advance_slot(); harness - .extend_chain( + .extend_chain_with_sync( num_initial as usize, BlockStrategy::OnCanonicalHead, AttestationStrategy::AllValidators, + SyncCommitteeStrategy::AllValidators, ) .await; @@ -364,6 +398,16 @@ pub async fn proposer_boost_re_org_test( let slot_b = slot_a + parent_distance; let slot_c = slot_b + head_distance; + // We need to transition to at least epoch 2 in order to trigger + // `process_rewards_and_penalties`. This allows us to test withdrawals changes at epoch + // boundaries. + if expect_withdrawals_change_on_epoch { + assert!( + slot_c.epoch(E::slots_per_epoch()) >= 2, + "for withdrawals to change, test must end at an epoch >= 2" + ); + } + harness.advance_slot(); let (block_a_root, block_a, state_a) = harness .add_block_at_slot(slot_a, harness.get_current_state()) @@ -457,6 +501,10 @@ pub async fn proposer_boost_re_org_test( // Produce block C. // Advance state_b so we can get the proposer. + assert_eq!(state_b.slot(), slot_b); + let pre_advance_withdrawals = get_expected_withdrawals(&state_b, &harness.chain.spec) + .unwrap() + .to_vec(); complete_state_advance(&mut state_b, None, slot_c, &harness.chain.spec).unwrap(); let proposer_index = state_b @@ -514,6 +562,28 @@ pub async fn proposer_boost_re_org_test( .unwrap(); let payload_attribs = first_update.payload_attributes.as_ref().unwrap(); + // Check that withdrawals from the payload attributes match those computed from the parent's + // advanced state. + let expected_withdrawals = if should_re_org { + let mut state_a_advanced = state_a.clone(); + complete_state_advance(&mut state_a_advanced, None, slot_c, &harness.chain.spec).unwrap(); + get_expected_withdrawals(&state_a_advanced, &harness.chain.spec) + } else { + get_expected_withdrawals(&state_b, &harness.chain.spec) + } + .unwrap() + .to_vec(); + let payload_attribs_withdrawals = payload_attribs.withdrawals().unwrap(); + assert_eq!(expected_withdrawals, *payload_attribs_withdrawals); + assert!(!expected_withdrawals.is_empty()); + + if should_re_org + || expect_withdrawals_change_on_epoch + && slot_c.epoch(E::slots_per_epoch()) != slot_b.epoch(E::slots_per_epoch()) + { + assert_ne!(expected_withdrawals, pre_advance_withdrawals); + } + let lookahead = slot_clock .start_of(slot_c) .unwrap() diff --git a/beacon_node/src/cli.rs b/beacon_node/src/cli.rs index b4da83315..bc2e705cb 100644 --- a/beacon_node/src/cli.rs +++ b/beacon_node/src/cli.rs @@ -819,6 +819,15 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> { for ensuring the EL is given ample notice. Default: 1/3 of a slot.") .takes_value(true) ) + .arg( + Arg::with_name("always-prepare-payload") + .long("always-prepare-payload") + .help("Send payload attributes with every fork choice update. This is intended for \ + use by block builders, relays and developers. You should set a fee \ + recipient on this BN and also consider adjusting the \ + --prepare-payload-lookahead flag.") + .takes_value(false) + ) .arg( Arg::with_name("fork-choice-before-proposal-timeout") .long("fork-choice-before-proposal-timeout") diff --git a/beacon_node/src/config.rs b/beacon_node/src/config.rs index 726f8368e..fa0344e95 100644 --- a/beacon_node/src/config.rs +++ b/beacon_node/src/config.rs @@ -711,6 +711,8 @@ pub fn get_config( / DEFAULT_PREPARE_PAYLOAD_LOOKAHEAD_FACTOR }); + client_config.chain.always_prepare_payload = cli_args.is_present("always-prepare-payload"); + if let Some(timeout) = clap_utils::parse_optional(cli_args, "fork-choice-before-proposal-timeout")? { diff --git a/common/eth2/src/types.rs b/common/eth2/src/types.rs index 620276f3d..b4218c361 100644 --- a/common/eth2/src/types.rs +++ b/common/eth2/src/types.rs @@ -897,6 +897,76 @@ pub struct SseLateHead { pub execution_optimistic: bool, } +#[superstruct( + variants(V1, V2), + variant_attributes(derive(Clone, Debug, Eq, Hash, PartialEq, Serialize, Deserialize)) +)] +#[derive(Clone, Debug, Eq, Hash, PartialEq, Deserialize, Serialize)] +#[serde(untagged)] +pub struct SsePayloadAttributes { + #[superstruct(getter(copy))] + #[serde(with = "eth2_serde_utils::quoted_u64")] + pub timestamp: u64, + #[superstruct(getter(copy))] + pub prev_randao: Hash256, + #[superstruct(getter(copy))] + pub suggested_fee_recipient: Address, + #[superstruct(only(V2))] + pub withdrawals: Vec, +} + +#[derive(PartialEq, Debug, Deserialize, Serialize, Clone)] +pub struct SseExtendedPayloadAttributesGeneric { + pub proposal_slot: Slot, + #[serde(with = "eth2_serde_utils::quoted_u64")] + pub proposer_index: u64, + pub parent_block_root: Hash256, + pub parent_block_hash: ExecutionBlockHash, + pub payload_attributes: T, +} + +pub type SseExtendedPayloadAttributes = SseExtendedPayloadAttributesGeneric; +pub type VersionedSsePayloadAttributes = ForkVersionedResponse; + +impl ForkVersionDeserialize for SsePayloadAttributes { + fn deserialize_by_fork<'de, D: serde::Deserializer<'de>>( + value: serde_json::value::Value, + fork_name: ForkName, + ) -> Result { + match fork_name { + ForkName::Merge => serde_json::from_value(value) + .map(Self::V1) + .map_err(serde::de::Error::custom), + ForkName::Capella => serde_json::from_value(value) + .map(Self::V2) + .map_err(serde::de::Error::custom), + ForkName::Base | ForkName::Altair => Err(serde::de::Error::custom(format!( + "SsePayloadAttributes deserialization for {fork_name} not implemented" + ))), + } + } +} + +impl ForkVersionDeserialize for SseExtendedPayloadAttributes { + fn deserialize_by_fork<'de, D: serde::Deserializer<'de>>( + value: serde_json::value::Value, + fork_name: ForkName, + ) -> Result { + let helper: SseExtendedPayloadAttributesGeneric = + serde_json::from_value(value).map_err(serde::de::Error::custom)?; + Ok(Self { + proposal_slot: helper.proposal_slot, + proposer_index: helper.proposer_index, + parent_block_root: helper.parent_block_root, + parent_block_hash: helper.parent_block_hash, + payload_attributes: SsePayloadAttributes::deserialize_by_fork::( + helper.payload_attributes, + fork_name, + )?, + }) + } +} + #[derive(PartialEq, Debug, Serialize, Clone)] #[serde(bound = "T: EthSpec", untagged)] pub enum EventKind { @@ -910,6 +980,7 @@ pub enum EventKind { LateHead(SseLateHead), #[cfg(feature = "lighthouse")] BlockReward(BlockReward), + PayloadAttributes(VersionedSsePayloadAttributes), } impl EventKind { @@ -922,6 +993,7 @@ impl EventKind { EventKind::FinalizedCheckpoint(_) => "finalized_checkpoint", EventKind::ChainReorg(_) => "chain_reorg", EventKind::ContributionAndProof(_) => "contribution_and_proof", + EventKind::PayloadAttributes(_) => "payload_attributes", EventKind::LateHead(_) => "late_head", #[cfg(feature = "lighthouse")] EventKind::BlockReward(_) => "block_reward", @@ -977,6 +1049,11 @@ impl EventKind { ServerError::InvalidServerSentEvent(format!("Contribution and Proof: {:?}", e)) })?, ))), + "payload_attributes" => Ok(EventKind::PayloadAttributes( + serde_json::from_str(data).map_err(|e| { + ServerError::InvalidServerSentEvent(format!("Payload Attributes: {:?}", e)) + })?, + )), #[cfg(feature = "lighthouse")] "block_reward" => Ok(EventKind::BlockReward(serde_json::from_str(data).map_err( |e| ServerError::InvalidServerSentEvent(format!("Block Reward: {:?}", e)), @@ -1006,6 +1083,7 @@ pub enum EventTopic { ChainReorg, ContributionAndProof, LateHead, + PayloadAttributes, #[cfg(feature = "lighthouse")] BlockReward, } @@ -1022,6 +1100,7 @@ impl FromStr for EventTopic { "finalized_checkpoint" => Ok(EventTopic::FinalizedCheckpoint), "chain_reorg" => Ok(EventTopic::ChainReorg), "contribution_and_proof" => Ok(EventTopic::ContributionAndProof), + "payload_attributes" => Ok(EventTopic::PayloadAttributes), "late_head" => Ok(EventTopic::LateHead), #[cfg(feature = "lighthouse")] "block_reward" => Ok(EventTopic::BlockReward), @@ -1040,6 +1119,7 @@ impl fmt::Display for EventTopic { EventTopic::FinalizedCheckpoint => write!(f, "finalized_checkpoint"), EventTopic::ChainReorg => write!(f, "chain_reorg"), EventTopic::ContributionAndProof => write!(f, "contribution_and_proof"), + EventTopic::PayloadAttributes => write!(f, "payload_attributes"), EventTopic::LateHead => write!(f, "late_head"), #[cfg(feature = "lighthouse")] EventTopic::BlockReward => write!(f, "block_reward"), diff --git a/lighthouse/tests/beacon_node.rs b/lighthouse/tests/beacon_node.rs index a07502c58..bdaec9948 100644 --- a/lighthouse/tests/beacon_node.rs +++ b/lighthouse/tests/beacon_node.rs @@ -182,6 +182,21 @@ fn prepare_payload_lookahead_shorter() { }); } +#[test] +fn always_prepare_payload_default() { + CommandLineTest::new() + .run_with_zero_port() + .with_config(|config| assert!(!config.chain.always_prepare_payload)); +} + +#[test] +fn always_prepare_payload_override() { + CommandLineTest::new() + .flag("always-prepare-payload", None) + .run_with_zero_port() + .with_config(|config| assert!(config.chain.always_prepare_payload)); +} + #[test] fn paranoid_block_proposal_default() { CommandLineTest::new()