diff --git a/Cargo.lock b/Cargo.lock index 4b0da7bb8..db3771222 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -621,7 +621,7 @@ dependencies = [ [[package]] name = "beacon_node" -version = "3.5.0" +version = "3.5.1" dependencies = [ "beacon_chain", "clap", @@ -790,7 +790,7 @@ dependencies = [ [[package]] name = "boot_node" -version = "3.5.0" +version = "3.5.1" dependencies = [ "beacon_node", "clap", @@ -2048,6 +2048,27 @@ dependencies = [ "types", ] +[[package]] +name = "errno" +version = "0.2.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f639046355ee4f37944e44f60642c6f3a7efa3cf6b78c78a0d989a8ce6c396a1" +dependencies = [ + "errno-dragonfly", + "libc", + "winapi", +] + +[[package]] +name = "errno-dragonfly" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "aa68f1b12764fab894d2755d2518754e71b4fd80ecfb822714a1206c2aab39bf" +dependencies = [ + "cc", + "libc", +] + [[package]] name = "error-chain" version = "0.12.4" @@ -3584,6 +3605,16 @@ dependencies = [ "webrtc-util", ] +[[package]] +name = "io-lifetimes" +version = "1.0.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1abeb7a0dd0f8181267ff8adc397075586500b81b28a73e8a0208b00fc170fb3" +dependencies = [ + "libc", + "windows-sys 0.45.0", +] + [[package]] name = "ipconfig" version = "0.3.1" @@ -3752,7 +3783,7 @@ checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55" [[package]] name = "lcli" -version = "3.5.0" +version = "3.5.1" dependencies = [ "account_utils", "beacon_chain", @@ -4358,7 +4389,7 @@ dependencies = [ [[package]] name = "lighthouse" -version = "3.5.0" +version = "3.5.1" dependencies = [ "account_manager", "account_utils", @@ -4480,6 +4511,12 @@ version = "0.5.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0717cef1bc8b636c6e1c1bbdefc09e6322da8a9321966e8928ef80d20f7f770f" +[[package]] +name = "linux-raw-sys" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f051f77a7c8e6957c0696eac88f26b0117e54f52d3fc682ab19397a8812846a4" + [[package]] name = "lmdb-rkv" version = "0.14.0" @@ -5343,9 +5380,9 @@ checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf" [[package]] name = "openssl-src" -version = "111.25.0+1.1.1t" +version = "111.25.1+1.1.1t" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3173cd3626c43e3854b1b727422a276e568d9ec5fe8cec197822cf52cfb743d6" +checksum = "1ef9a9cc6ea7d9d5e7c4a913dc4b48d0e359eddf01af1dfec96ba7064b4aba10" dependencies = [ "cc", ] @@ -6279,15 +6316,6 @@ version = "0.6.28" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "456c603be3e8d448b072f410900c09faf164fbce2d480456f50eea6e25f9c848" -[[package]] -name = "remove_dir_all" -version = "0.5.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3acd125665422973a33ac9d3dd2df85edad0f4ae9b00dafb1a05e43a9f5ef8e7" -dependencies = [ - "winapi", -] - [[package]] name = "reqwest" version = "0.11.14" @@ -6513,6 +6541,20 @@ dependencies = [ "nom 7.1.3", ] +[[package]] +name = "rustix" +version = "0.36.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fd5c6ff11fecd55b40746d1995a02f2eb375bf8c00d192d521ee09f42bef37bc" +dependencies = [ + "bitflags", + "errno", + "io-lifetimes", + "libc", + "linux-raw-sys", + "windows-sys 0.45.0", +] + [[package]] name = "rustls" version = "0.19.1" @@ -7638,16 +7680,15 @@ dependencies = [ 
[[package]] name = "tempfile" -version = "3.3.0" +version = "3.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5cdb1ef4eaeeaddc8fbd371e5017057064af0911902ef36b39801f67cc6d79e4" +checksum = "af18f7ae1acd354b992402e9ec5864359d693cd8a79dcbef59f76891701c1e95" dependencies = [ "cfg-if", "fastrand", - "libc", "redox_syscall", - "remove_dir_all", - "winapi", + "rustix", + "windows-sys 0.42.0", ] [[package]] @@ -7840,9 +7881,9 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.25.0" +version = "1.26.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c8e00990ebabbe4c14c08aca901caed183ecd5c09562a12c824bb53d3c3fd3af" +checksum = "03201d01c3c27a29c8a5cee5b55a93ddae1ccf6f08f65365c2c918f8c1b76f64" dependencies = [ "autocfg 1.1.0", "bytes", @@ -7855,7 +7896,7 @@ dependencies = [ "signal-hook-registry", "socket2", "tokio-macros", - "windows-sys 0.42.0", + "windows-sys 0.45.0", ] [[package]] diff --git a/beacon_node/Cargo.toml b/beacon_node/Cargo.toml index 63f52b22d..c7eccf2e3 100644 --- a/beacon_node/Cargo.toml +++ b/beacon_node/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "beacon_node" -version = "3.5.0" +version = "3.5.1" authors = ["Paul Hauner ", "Age Manning BeaconChain { })) } + pub fn get_expected_withdrawals( + &self, + forkchoice_update_params: &ForkchoiceUpdateParameters, + proposal_slot: Slot, + ) -> Result, Error> { + let cached_head = self.canonical_head.cached_head(); + let head_state = &cached_head.snapshot.beacon_state; + + let parent_block_root = forkchoice_update_params.head_root; + + let (unadvanced_state, unadvanced_state_root) = + if cached_head.head_block_root() == parent_block_root { + (Cow::Borrowed(head_state), cached_head.head_state_root()) + } else if let Some(snapshot) = self + .snapshot_cache + .try_read_for(BLOCK_PROCESSING_CACHE_LOCK_TIMEOUT) + .ok_or(Error::SnapshotCacheLockTimeout)? + .get_cloned(parent_block_root, CloneConfig::none()) + { + debug!( + self.log, + "Hit snapshot cache during withdrawals calculation"; + "slot" => proposal_slot, + "parent_block_root" => ?parent_block_root, + ); + let state_root = snapshot.beacon_state_root(); + (Cow::Owned(snapshot.beacon_state), state_root) + } else { + info!( + self.log, + "Missed snapshot cache during withdrawals calculation"; + "slot" => proposal_slot, + "parent_block_root" => ?parent_block_root + ); + let block = self + .get_blinded_block(&parent_block_root)? + .ok_or(Error::MissingBeaconBlock(parent_block_root))?; + let state = self + .get_state(&block.state_root(), Some(block.slot()))? + .ok_or(Error::MissingBeaconState(block.state_root()))?; + (Cow::Owned(state), block.state_root()) + }; + + // Parent state epoch is the same as the proposal, we don't need to advance because the + // list of expected withdrawals can only change after an epoch advance or a + // block application. + let proposal_epoch = proposal_slot.epoch(T::EthSpec::slots_per_epoch()); + if head_state.current_epoch() == proposal_epoch { + return get_expected_withdrawals(&unadvanced_state, &self.spec) + .map_err(Error::PrepareProposerFailed); + } + + // Advance the state using the partial method. 
+ debug!( + self.log, + "Advancing state for withdrawals calculation"; + "proposal_slot" => proposal_slot, + "parent_block_root" => ?parent_block_root, + ); + let mut advanced_state = unadvanced_state.into_owned(); + partial_state_advance( + &mut advanced_state, + Some(unadvanced_state_root), + proposal_epoch.start_slot(T::EthSpec::slots_per_epoch()), + &self.spec, + )?; + get_expected_withdrawals(&advanced_state, &self.spec).map_err(Error::PrepareProposerFailed) + } + /// Determine whether a fork choice update to the execution layer should be overridden. /// /// This is *only* necessary when proposer re-orgs are enabled, because we have to prevent the @@ -4858,7 +4928,9 @@ impl BeaconChain { // Nothing to do if there are no proposers registered with the EL, exit early to avoid // wasting cycles. - if !execution_layer.has_any_proposer_preparation_data().await { + if !self.config.always_prepare_payload + && !execution_layer.has_any_proposer_preparation_data().await + { return Ok(()); } @@ -4915,64 +4987,60 @@ impl BeaconChain { // If the execution layer doesn't have any proposer data for this validator then we assume // it's not connected to this BN and no action is required. let proposer = pre_payload_attributes.proposer_index; - if !execution_layer - .has_proposer_preparation_data(proposer) - .await + if !self.config.always_prepare_payload + && !execution_layer + .has_proposer_preparation_data(proposer) + .await { return Ok(()); } - let withdrawals = match self.spec.fork_name_at_slot::(prepare_slot) { - ForkName::Base | ForkName::Altair | ForkName::Merge => None, - ForkName::Capella | ForkName::Eip4844 => { - // We must use the advanced state because balances can change at epoch boundaries - // and balances affect withdrawals. - // FIXME(mark) - // Might implement caching here in the future.. - let prepare_state = self - .state_at_slot(prepare_slot, StateSkipConfig::WithoutStateRoots) - .map_err(|e| { - error!(self.log, "State advance for withdrawals failed"; "error" => ?e); - e - })?; - Some(get_expected_withdrawals(&prepare_state, &self.spec)) - } - } - .transpose() - .map_err(|e| { - error!(self.log, "Error preparing beacon proposer"; "error" => ?e); - e - }) - .map(|withdrawals_opt| withdrawals_opt.map(|w| w.into())) - .map_err(Error::PrepareProposerFailed)?; - + // Fetch payload attributes from the execution layer's cache, or compute them from scratch + // if no matching entry is found. This saves recomputing the withdrawals, which can take + // considerable time to compute if a state load is required. let head_root = forkchoice_update_params.head_root; - let payload_attributes = PayloadAttributes::new( - self.slot_clock - .start_of(prepare_slot) - .ok_or(Error::InvalidSlot(prepare_slot))? - .as_secs(), - pre_payload_attributes.prev_randao, - execution_layer.get_suggested_fee_recipient(proposer).await, - withdrawals, - ); + let payload_attributes = if let Some(payload_attributes) = execution_layer + .payload_attributes(prepare_slot, head_root) + .await + { + payload_attributes + } else { + let withdrawals = match self.spec.fork_name_at_slot::(prepare_slot) { + ForkName::Base | ForkName::Altair | ForkName::Merge => None, + ForkName::Capella | ForkName::Eip4844 => { + let chain = self.clone(); + self.spawn_blocking_handle( + move || { + chain.get_expected_withdrawals(&forkchoice_update_params, prepare_slot) + }, + "prepare_beacon_proposer_withdrawals", + ) + .await? + .map(Some)?
+ } + }; - debug!( - self.log, - "Preparing beacon proposer"; - "payload_attributes" => ?payload_attributes, - "prepare_slot" => prepare_slot, - "validator" => proposer, - "parent_root" => ?head_root, - ); + let payload_attributes = PayloadAttributes::new( + self.slot_clock + .start_of(prepare_slot) + .ok_or(Error::InvalidSlot(prepare_slot))? + .as_secs(), + pre_payload_attributes.prev_randao, + execution_layer.get_suggested_fee_recipient(proposer).await, + withdrawals.map(Into::into), + ); - let already_known = execution_layer - .insert_proposer(prepare_slot, head_root, proposer, payload_attributes) - .await; + execution_layer + .insert_proposer( + prepare_slot, + head_root, + proposer, + payload_attributes.clone(), + ) + .await; - // Only push a log to the user if this is the first time we've seen this proposer for this - // slot. - if !already_known { + // Only push a log to the user if this is the first time we've seen this proposer for + // this slot. info!( self.log, "Prepared beacon proposer"; @@ -4980,6 +5048,23 @@ impl BeaconChain { "validator" => proposer, "parent_root" => ?head_root, ); + payload_attributes + }; + + // Push a server-sent event (probably to a block builder or relay). + if let Some(event_handler) = &self.event_handler { + if event_handler.has_payload_attributes_subscribers() { + event_handler.register(EventKind::PayloadAttributes(ForkVersionedResponse { + data: SseExtendedPayloadAttributes { + proposal_slot: prepare_slot, + proposer_index: proposer, + parent_block_root: head_root, + parent_block_hash: forkchoice_update_params.head_hash.unwrap_or_default(), + payload_attributes: payload_attributes.into(), + }, + version: Some(self.spec.fork_name_at_slot::(prepare_slot)), + })); + } } let till_prepare_slot = @@ -5002,7 +5087,9 @@ impl BeaconChain { // If we are close enough to the proposal slot, send an fcU, which will have payload // attributes filled in by the execution layer cache we just primed. - if till_prepare_slot <= self.config.prepare_payload_lookahead { + if self.config.always_prepare_payload + || till_prepare_slot <= self.config.prepare_payload_lookahead + { debug!( self.log, "Sending forkchoiceUpdate for proposer prep"; diff --git a/beacon_node/beacon_chain/src/chain_config.rs b/beacon_node/beacon_chain/src/chain_config.rs index 2051a6236..6e3538aed 100644 --- a/beacon_node/beacon_chain/src/chain_config.rs +++ b/beacon_node/beacon_chain/src/chain_config.rs @@ -67,6 +67,10 @@ pub struct ChainConfig { pub prepare_payload_lookahead: Duration, /// Use EL-free optimistic sync for the finalized part of the chain. pub optimistic_finalized_sync: bool, + /// Whether to send payload attributes every slot, regardless of connected proposers. + /// + /// This is useful for block builders and testing. + pub always_prepare_payload: bool, } impl Default for ChainConfig { @@ -93,6 +97,7 @@ impl Default for ChainConfig { prepare_payload_lookahead: Duration::from_secs(4), // This value isn't actually read except in tests. 
optimistic_finalized_sync: true, + always_prepare_payload: false, } } } diff --git a/beacon_node/beacon_chain/src/events.rs b/beacon_node/beacon_chain/src/events.rs index 6f4415ef4..b3fa6627f 100644 --- a/beacon_node/beacon_chain/src/events.rs +++ b/beacon_node/beacon_chain/src/events.rs @@ -14,6 +14,7 @@ pub struct ServerSentEventHandler { exit_tx: Sender>, chain_reorg_tx: Sender>, contribution_tx: Sender>, + payload_attributes_tx: Sender>, late_head: Sender>, block_reward_tx: Sender>, log: Logger, @@ -32,6 +33,7 @@ impl ServerSentEventHandler { let (exit_tx, _) = broadcast::channel(capacity); let (chain_reorg_tx, _) = broadcast::channel(capacity); let (contribution_tx, _) = broadcast::channel(capacity); + let (payload_attributes_tx, _) = broadcast::channel(capacity); let (late_head, _) = broadcast::channel(capacity); let (block_reward_tx, _) = broadcast::channel(capacity); @@ -43,6 +45,7 @@ impl ServerSentEventHandler { exit_tx, chain_reorg_tx, contribution_tx, + payload_attributes_tx, late_head, block_reward_tx, log, @@ -50,28 +53,55 @@ impl ServerSentEventHandler { } pub fn register(&self, kind: EventKind) { - let result = match kind { - EventKind::Attestation(attestation) => self + let log_count = |name, count| { + trace!( + self.log, + "Registering server-sent event"; + "kind" => name, + "receiver_count" => count + ); + }; + let result = match &kind { + EventKind::Attestation(_) => self .attestation_tx - .send(EventKind::Attestation(attestation)) - .map(|count| trace!(self.log, "Registering server-sent attestation event"; "receiver_count" => count)), - EventKind::Block(block) => self.block_tx.send(EventKind::Block(block)) - .map(|count| trace!(self.log, "Registering server-sent block event"; "receiver_count" => count)), - EventKind::FinalizedCheckpoint(checkpoint) => self.finalized_tx - .send(EventKind::FinalizedCheckpoint(checkpoint)) - .map(|count| trace!(self.log, "Registering server-sent finalized checkpoint event"; "receiver_count" => count)), - EventKind::Head(head) => self.head_tx.send(EventKind::Head(head)) - .map(|count| trace!(self.log, "Registering server-sent head event"; "receiver_count" => count)), - EventKind::VoluntaryExit(exit) => self.exit_tx.send(EventKind::VoluntaryExit(exit)) - .map(|count| trace!(self.log, "Registering server-sent voluntary exit event"; "receiver_count" => count)), - EventKind::ChainReorg(reorg) => self.chain_reorg_tx.send(EventKind::ChainReorg(reorg)) - .map(|count| trace!(self.log, "Registering server-sent chain reorg event"; "receiver_count" => count)), - EventKind::ContributionAndProof(contribution_and_proof) => self.contribution_tx.send(EventKind::ContributionAndProof(contribution_and_proof)) - .map(|count| trace!(self.log, "Registering server-sent contribution and proof event"; "receiver_count" => count)), - EventKind::LateHead(late_head) => self.late_head.send(EventKind::LateHead(late_head)) - .map(|count| trace!(self.log, "Registering server-sent late head event"; "receiver_count" => count)), - EventKind::BlockReward(block_reward) => self.block_reward_tx.send(EventKind::BlockReward(block_reward)) - .map(|count| trace!(self.log, "Registering server-sent contribution and proof event"; "receiver_count" => count)), + .send(kind) + .map(|count| log_count(count, "attestation")), + EventKind::Block(_) => self + .block_tx + .send(kind) + .map(|count| log_count(count, "block")), + EventKind::FinalizedCheckpoint(_) => self + .finalized_tx + .send(kind) + .map(|count| log_count(count, "finalized checkpoint")), + EventKind::Head(_) => self + 
.head_tx + .send(kind) + .map(|count| log_count(count, "head")), + EventKind::VoluntaryExit(_) => self + .exit_tx + .send(kind) + .map(|count| log_count(count, "exit")), + EventKind::ChainReorg(_) => self + .chain_reorg_tx + .send(kind) + .map(|count| log_count(count, "chain reorg")), + EventKind::ContributionAndProof(_) => self + .contribution_tx + .send(kind) + .map(|count| log_count(count, "contribution and proof")), + EventKind::PayloadAttributes(_) => self + .payload_attributes_tx + .send(kind) + .map(|count| log_count(count, "payload attributes")), + EventKind::LateHead(_) => self + .late_head + .send(kind) + .map(|count| log_count(count, "late head")), + EventKind::BlockReward(_) => self + .block_reward_tx + .send(kind) + .map(|count| log_count(count, "block reward")), }; if let Err(SendError(event)) = result { trace!(self.log, "No receivers registered to listen for event"; "event" => ?event); @@ -106,6 +136,10 @@ impl ServerSentEventHandler { self.contribution_tx.subscribe() } + pub fn subscribe_payload_attributes(&self) -> Receiver> { + self.payload_attributes_tx.subscribe() + } + pub fn subscribe_late_head(&self) -> Receiver> { self.late_head.subscribe() } @@ -142,6 +176,10 @@ impl ServerSentEventHandler { self.contribution_tx.receiver_count() > 0 } + pub fn has_payload_attributes_subscribers(&self) -> bool { + self.payload_attributes_tx.receiver_count() > 0 + } + pub fn has_late_head_subscribers(&self) -> bool { self.late_head.receiver_count() > 0 } diff --git a/beacon_node/beacon_chain/src/test_utils.rs b/beacon_node/beacon_chain/src/test_utils.rs index 8ede8672e..f8dee76eb 100644 --- a/beacon_node/beacon_chain/src/test_utils.rs +++ b/beacon_node/beacon_chain/src/test_utils.rs @@ -111,6 +111,14 @@ pub enum AttestationStrategy { SomeValidators(Vec), } +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum SyncCommitteeStrategy { + /// All sync committee validators sign. + AllValidators, + /// No validators sign. + NoValidators, +} + /// Indicates whether the `BeaconChainHarness` should use the `state.current_sync_committee` or /// `state.next_sync_committee` when creating sync messages or contributions. 
#[derive(Clone, Debug)] @@ -1772,15 +1780,64 @@ where self.process_attestations(attestations); } + pub fn sync_committee_sign_block( + &self, + state: &BeaconState, + block_hash: Hash256, + slot: Slot, + relative_sync_committee: RelativeSyncCommittee, + ) { + let sync_contributions = + self.make_sync_contributions(state, block_hash, slot, relative_sync_committee); + self.process_sync_contributions(sync_contributions).unwrap() + } + pub async fn add_attested_block_at_slot( &self, slot: Slot, state: BeaconState, state_root: Hash256, validators: &[usize], + ) -> Result<(SignedBeaconBlockHash, BeaconState), BlockError> { + self.add_attested_block_at_slot_with_sync( + slot, + state, + state_root, + validators, + SyncCommitteeStrategy::NoValidators, + ) + .await + } + + pub async fn add_attested_block_at_slot_with_sync( + &self, + slot: Slot, + state: BeaconState, + state_root: Hash256, + validators: &[usize], + sync_committee_strategy: SyncCommitteeStrategy, ) -> Result<(SignedBeaconBlockHash, BeaconState), BlockError> { let (block_hash, block, state) = self.add_block_at_slot(slot, state).await?; self.attest_block(&state, state_root, block_hash, &block, validators); + + if sync_committee_strategy == SyncCommitteeStrategy::AllValidators + && state.current_sync_committee().is_ok() + { + self.sync_committee_sign_block( + &state, + block_hash.into(), + slot, + if (slot + 1).epoch(E::slots_per_epoch()) + % self.spec.epochs_per_sync_committee_period + == 0 + { + RelativeSyncCommittee::Next + } else { + RelativeSyncCommittee::Current + }, + ); + } + Ok((block_hash, state)) } @@ -1790,10 +1847,35 @@ where state_root: Hash256, slots: &[Slot], validators: &[usize], + ) -> AddBlocksResult { + self.add_attested_blocks_at_slots_with_sync( + state, + state_root, + slots, + validators, + SyncCommitteeStrategy::NoValidators, + ) + .await + } + + pub async fn add_attested_blocks_at_slots_with_sync( + &self, + state: BeaconState, + state_root: Hash256, + slots: &[Slot], + validators: &[usize], + sync_committee_strategy: SyncCommitteeStrategy, ) -> AddBlocksResult { assert!(!slots.is_empty()); - self.add_attested_blocks_at_slots_given_lbh(state, state_root, slots, validators, None) - .await + self.add_attested_blocks_at_slots_given_lbh( + state, + state_root, + slots, + validators, + None, + sync_committee_strategy, + ) + .await } async fn add_attested_blocks_at_slots_given_lbh( @@ -1803,6 +1885,7 @@ where slots: &[Slot], validators: &[usize], mut latest_block_hash: Option, + sync_committee_strategy: SyncCommitteeStrategy, ) -> AddBlocksResult { assert!( slots.windows(2).all(|w| w[0] <= w[1]), @@ -1812,7 +1895,13 @@ where let mut state_hash_from_slot: HashMap = HashMap::new(); for slot in slots { let (block_hash, new_state) = self - .add_attested_block_at_slot(*slot, state, state_root, validators) + .add_attested_block_at_slot_with_sync( + *slot, + state, + state_root, + validators, + sync_committee_strategy, + ) .await .unwrap(); state = new_state; @@ -1894,6 +1983,7 @@ where &epoch_slots, &validators, Some(head_block), + SyncCommitteeStrategy::NoValidators, // for backwards compat ) .await; @@ -2011,6 +2101,22 @@ where num_blocks: usize, block_strategy: BlockStrategy, attestation_strategy: AttestationStrategy, + ) -> Hash256 { + self.extend_chain_with_sync( + num_blocks, + block_strategy, + attestation_strategy, + SyncCommitteeStrategy::NoValidators, + ) + .await + } + + pub async fn extend_chain_with_sync( + &self, + num_blocks: usize, + block_strategy: BlockStrategy, + attestation_strategy: 
AttestationStrategy, + sync_committee_strategy: SyncCommitteeStrategy, ) -> Hash256 { let (mut state, slots) = match block_strategy { BlockStrategy::OnCanonicalHead => { @@ -2042,7 +2148,13 @@ where }; let state_root = state.update_tree_hash_cache().unwrap(); let (_, _, last_produced_block_hash, _) = self - .add_attested_blocks_at_slots(state, state_root, &slots, &validators) + .add_attested_blocks_at_slots_with_sync( + state, + state_root, + &slots, + &validators, + sync_committee_strategy, + ) .await; last_produced_block_hash.into() } diff --git a/beacon_node/client/src/config.rs b/beacon_node/client/src/config.rs index 10eeb3a48..f9d26d271 100644 --- a/beacon_node/client/src/config.rs +++ b/beacon_node/client/src/config.rs @@ -88,6 +88,7 @@ pub struct Config { pub monitoring_api: Option, pub slasher: Option, pub logger_config: LoggerConfig, + pub always_prefer_builder_payload: bool, } impl Default for Config { @@ -116,6 +117,7 @@ impl Default for Config { validator_monitor_pubkeys: vec![], validator_monitor_individual_tracking_threshold: DEFAULT_INDIVIDUAL_TRACKING_THRESHOLD, logger_config: LoggerConfig::default(), + always_prefer_builder_payload: false, } } } diff --git a/beacon_node/execution_layer/src/engine_api.rs b/beacon_node/execution_layer/src/engine_api.rs index 4e9dc4291..4abd6d787 100644 --- a/beacon_node/execution_layer/src/engine_api.rs +++ b/beacon_node/execution_layer/src/engine_api.rs @@ -5,6 +5,7 @@ use crate::http::{ ENGINE_GET_PAYLOAD_V3, ENGINE_NEW_PAYLOAD_V1, ENGINE_NEW_PAYLOAD_V2, ENGINE_NEW_PAYLOAD_V3, }; use crate::BlobTxConversionError; +use eth2::types::{SsePayloadAttributes, SsePayloadAttributesV1, SsePayloadAttributesV2}; pub use ethers_core::types::Transaction; use ethers_core::utils::rlp::{self, Decodable, Rlp}; use http::deposit_methods::RpcError; @@ -312,6 +313,33 @@ impl PayloadAttributes { } } +impl From for SsePayloadAttributes { + fn from(pa: PayloadAttributes) -> Self { + match pa { + PayloadAttributes::V1(PayloadAttributesV1 { + timestamp, + prev_randao, + suggested_fee_recipient, + }) => Self::V1(SsePayloadAttributesV1 { + timestamp, + prev_randao, + suggested_fee_recipient, + }), + PayloadAttributes::V2(PayloadAttributesV2 { + timestamp, + prev_randao, + suggested_fee_recipient, + withdrawals, + }) => Self::V2(SsePayloadAttributesV2 { + timestamp, + prev_randao, + suggested_fee_recipient, + withdrawals, + }), + } + } +} + #[derive(Clone, Debug, PartialEq)] pub struct ForkchoiceUpdatedResponse { pub payload_status: PayloadStatusV1, diff --git a/beacon_node/execution_layer/src/lib.rs b/beacon_node/execution_layer/src/lib.rs index 86fe89d7b..9b6e3679a 100644 --- a/beacon_node/execution_layer/src/lib.rs +++ b/beacon_node/execution_layer/src/lib.rs @@ -268,6 +268,7 @@ struct Inner { payload_cache: PayloadCache, builder_profit_threshold: Uint256, log: Logger, + always_prefer_builder_payload: bool, } #[derive(Debug, Default, Clone, Serialize, Deserialize)] @@ -290,6 +291,7 @@ pub struct Config { /// The minimum value of an external payload for it to be considered in a proposal. 
pub builder_profit_threshold: u128, pub execution_timeout_multiplier: Option, + pub always_prefer_builder_payload: bool, } /// Provides access to one execution engine and provides a neat interface for consumption by the @@ -312,6 +314,7 @@ impl ExecutionLayer { default_datadir, builder_profit_threshold, execution_timeout_multiplier, + always_prefer_builder_payload, } = config; if urls.len() > 1 { @@ -384,6 +387,7 @@ impl ExecutionLayer { payload_cache: PayloadCache::default(), builder_profit_threshold: Uint256::from(builder_profit_threshold), log, + always_prefer_builder_payload, }; Ok(Self { @@ -845,7 +849,9 @@ impl ExecutionLayer { let relay_value = relay.data.message.value; let local_value = *local.block_value(); - if local_value >= relay_value { + if !self.inner.always_prefer_builder_payload + && local_value >= relay_value + { info!( self.log(), "Local block is more profitable than relay block"; diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index 5b5f0ff3e..a089b6e97 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -3551,6 +3551,9 @@ pub fn serve( api_types::EventTopic::ContributionAndProof => { event_handler.subscribe_contributions() } + api_types::EventTopic::PayloadAttributes => { + event_handler.subscribe_payload_attributes() + } api_types::EventTopic::LateHead => { event_handler.subscribe_late_head() } diff --git a/beacon_node/http_api/tests/interactive_tests.rs b/beacon_node/http_api/tests/interactive_tests.rs index 7096fac42..7db1b22d6 100644 --- a/beacon_node/http_api/tests/interactive_tests.rs +++ b/beacon_node/http_api/tests/interactive_tests.rs @@ -2,13 +2,15 @@ use crate::common::*; use beacon_chain::{ chain_config::ReOrgThreshold, - test_utils::{AttestationStrategy, BlockStrategy}, + test_utils::{AttestationStrategy, BlockStrategy, SyncCommitteeStrategy}, }; use eth2::types::DepositContractData; use execution_layer::{ForkchoiceState, PayloadAttributes}; use parking_lot::Mutex; use slot_clock::SlotClock; -use state_processing::state_advance::complete_state_advance; +use state_processing::{ + per_block_processing::get_expected_withdrawals, state_advance::complete_state_advance, +}; use std::collections::HashMap; use std::sync::Arc; use std::time::Duration; @@ -106,13 +108,15 @@ pub struct ReOrgTest { percent_head_votes: usize, should_re_org: bool, misprediction: bool, + /// Whether to expect withdrawals to change on epoch boundaries. + expect_withdrawals_change_on_epoch: bool, } impl Default for ReOrgTest { /// Default config represents a regular easy re-org. fn default() -> Self { Self { - head_slot: Slot::new(30), + head_slot: Slot::new(E::slots_per_epoch() - 2), parent_distance: 1, head_distance: 1, re_org_threshold: 20, @@ -122,6 +126,7 @@ impl Default for ReOrgTest { percent_head_votes: 0, should_re_org: true, misprediction: false, + expect_withdrawals_change_on_epoch: false, } } } @@ -136,13 +141,40 @@ pub async fn proposer_boost_re_org_zero_weight() { #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub async fn proposer_boost_re_org_epoch_boundary() { proposer_boost_re_org_test(ReOrgTest { - head_slot: Slot::new(31), + head_slot: Slot::new(E::slots_per_epoch() - 1), should_re_org: false, ..Default::default() }) .await; } +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +pub async fn proposer_boost_re_org_epoch_boundary_skip1() { + // Proposing a block on a boundary after a skip will change the set of expected withdrawals + // sent in the payload attributes. 
+ proposer_boost_re_org_test(ReOrgTest { + head_slot: Slot::new(2 * E::slots_per_epoch() - 2), + head_distance: 2, + should_re_org: false, + expect_withdrawals_change_on_epoch: true, + ..Default::default() + }) + .await; +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +pub async fn proposer_boost_re_org_epoch_boundary_skip32() { + // Propose a block at 64 after a whole epoch of skipped slots. + proposer_boost_re_org_test(ReOrgTest { + head_slot: Slot::new(E::slots_per_epoch() - 1), + head_distance: E::slots_per_epoch() + 1, + should_re_org: false, + expect_withdrawals_change_on_epoch: true, + ..Default::default() + }) + .await; +} + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub async fn proposer_boost_re_org_slot_after_epoch_boundary() { proposer_boost_re_org_test(ReOrgTest { @@ -187,7 +219,7 @@ pub async fn proposer_boost_re_org_finality() { #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub async fn proposer_boost_re_org_parent_distance() { proposer_boost_re_org_test(ReOrgTest { - head_slot: Slot::new(30), + head_slot: Slot::new(E::slots_per_epoch() - 2), parent_distance: 2, should_re_org: false, ..Default::default() @@ -198,7 +230,7 @@ pub async fn proposer_boost_re_org_parent_distance() { #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub async fn proposer_boost_re_org_head_distance() { proposer_boost_re_org_test(ReOrgTest { - head_slot: Slot::new(29), + head_slot: Slot::new(E::slots_per_epoch() - 3), head_distance: 2, should_re_org: false, ..Default::default() @@ -209,7 +241,7 @@ pub async fn proposer_boost_re_org_head_distance() { #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub async fn proposer_boost_re_org_very_unhealthy() { proposer_boost_re_org_test(ReOrgTest { - head_slot: Slot::new(31), + head_slot: Slot::new(E::slots_per_epoch() - 1), parent_distance: 2, head_distance: 2, percent_parent_votes: 10, @@ -225,7 +257,6 @@ pub async fn proposer_boost_re_org_very_unhealthy() { #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub async fn proposer_boost_re_org_weight_misprediction() { proposer_boost_re_org_test(ReOrgTest { - head_slot: Slot::new(30), percent_empty_votes: 70, percent_head_votes: 30, should_re_org: false, @@ -254,12 +285,13 @@ pub async fn proposer_boost_re_org_test( percent_head_votes, should_re_org, misprediction, + expect_withdrawals_change_on_epoch, }: ReOrgTest, ) { assert!(head_slot > 0); - // We require a network with execution enabled so we can check EL message timings. - let mut spec = ForkName::Merge.make_genesis_spec(E::default_spec()); + // Test using Capella so that we simulate conditions as similar to mainnet as possible. + let mut spec = ForkName::Capella.make_genesis_spec(E::default_spec()); spec.terminal_total_difficulty = 1.into(); // Ensure there are enough validators to have `attesters_per_slot`. @@ -323,13 +355,15 @@ pub async fn proposer_boost_re_org_test( ) .await; - // Create some chain depth. + // Create some chain depth. Sign sync committee signatures so validator balances don't dip + // below 32 ETH and become ineligible for withdrawals. 
harness.advance_slot(); harness - .extend_chain( + .extend_chain_with_sync( num_initial as usize, BlockStrategy::OnCanonicalHead, AttestationStrategy::AllValidators, + SyncCommitteeStrategy::AllValidators, ) .await; @@ -364,6 +398,16 @@ pub async fn proposer_boost_re_org_test( let slot_b = slot_a + parent_distance; let slot_c = slot_b + head_distance; + // We need to transition to at least epoch 2 in order to trigger + // `process_rewards_and_penalties`. This allows us to test withdrawals changes at epoch + // boundaries. + if expect_withdrawals_change_on_epoch { + assert!( + slot_c.epoch(E::slots_per_epoch()) >= 2, + "for withdrawals to change, test must end at an epoch >= 2" + ); + } + harness.advance_slot(); let (block_a_root, block_a, state_a) = harness .add_block_at_slot(slot_a, harness.get_current_state()) @@ -457,6 +501,10 @@ pub async fn proposer_boost_re_org_test( // Produce block C. // Advance state_b so we can get the proposer. + assert_eq!(state_b.slot(), slot_b); + let pre_advance_withdrawals = get_expected_withdrawals(&state_b, &harness.chain.spec) + .unwrap() + .to_vec(); complete_state_advance(&mut state_b, None, slot_c, &harness.chain.spec).unwrap(); let proposer_index = state_b @@ -514,6 +562,28 @@ pub async fn proposer_boost_re_org_test( .unwrap(); let payload_attribs = first_update.payload_attributes.as_ref().unwrap(); + // Check that withdrawals from the payload attributes match those computed from the parent's + // advanced state. + let expected_withdrawals = if should_re_org { + let mut state_a_advanced = state_a.clone(); + complete_state_advance(&mut state_a_advanced, None, slot_c, &harness.chain.spec).unwrap(); + get_expected_withdrawals(&state_a_advanced, &harness.chain.spec) + } else { + get_expected_withdrawals(&state_b, &harness.chain.spec) + } + .unwrap() + .to_vec(); + let payload_attribs_withdrawals = payload_attribs.withdrawals().unwrap(); + assert_eq!(expected_withdrawals, *payload_attribs_withdrawals); + assert!(!expected_withdrawals.is_empty()); + + if should_re_org + || expect_withdrawals_change_on_epoch + && slot_c.epoch(E::slots_per_epoch()) != slot_b.epoch(E::slots_per_epoch()) + { + assert_ne!(expected_withdrawals, pre_advance_withdrawals); + } + let lookahead = slot_clock .start_of(slot_c) .unwrap() diff --git a/beacon_node/src/cli.rs b/beacon_node/src/cli.rs index b0ada1828..cedb06f8c 100644 --- a/beacon_node/src/cli.rs +++ b/beacon_node/src/cli.rs @@ -861,6 +861,15 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> { for ensuring the EL is given ample notice. Default: 1/3 of a slot.") .takes_value(true) ) + .arg( + Arg::with_name("always-prepare-payload") + .long("always-prepare-payload") + .help("Send payload attributes with every fork choice update. This is intended for \ + use by block builders, relays and developers. 
You should set a fee \ + recipient on this BN and also consider adjusting the \ + --prepare-payload-lookahead flag.") + .takes_value(false) + ) .arg( Arg::with_name("fork-choice-before-proposal-timeout") .long("fork-choice-before-proposal-timeout") @@ -990,4 +999,13 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> { This is equivalent to --http and --validator-monitor-auto.") .takes_value(false) ) + .arg( + Arg::with_name("always-prefer-builder-payload") + .long("always-prefer-builder-payload") + .help("If set, the beacon node always uses the payload from the builder instead of the local payload.") + // The builder profit threshold flag is used to provide preference + // to local payloads, therefore it fundamentally conflicts with + // always using the builder. + .conflicts_with("builder-profit-threshold") + ) } diff --git a/beacon_node/src/config.rs b/beacon_node/src/config.rs index afa94a54e..aa1270487 100644 --- a/beacon_node/src/config.rs +++ b/beacon_node/src/config.rs @@ -748,6 +748,8 @@ pub fn get_config( / DEFAULT_PREPARE_PAYLOAD_LOOKAHEAD_FACTOR }); + client_config.chain.always_prepare_payload = cli_args.is_present("always-prepare-payload"); + if let Some(timeout) = clap_utils::parse_optional(cli_args, "fork-choice-before-proposal-timeout")? { @@ -788,6 +790,11 @@ pub fn get_config( client_config.chain.optimistic_finalized_sync = !cli_args.is_present("disable-optimistic-finalized-sync"); + // Payload selection configs + if cli_args.is_present("always-prefer-builder-payload") { + client_config.always_prefer_builder_payload = true; + } + Ok(client_config) } diff --git a/boot_node/Cargo.toml b/boot_node/Cargo.toml index 5d24d1203..266dcdfe6 100644 --- a/boot_node/Cargo.toml +++ b/boot_node/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "boot_node" -version = "3.5.0" +version = "3.5.1" authors = ["Sigma Prime "] edition = "2021" diff --git a/common/eth2/src/types.rs b/common/eth2/src/types.rs index 1a0b46e38..d6db3e15f 100644 --- a/common/eth2/src/types.rs +++ b/common/eth2/src/types.rs @@ -897,6 +897,76 @@ pub struct SseLateHead { pub execution_optimistic: bool, } +#[superstruct( + variants(V1, V2), + variant_attributes(derive(Clone, Debug, Eq, Hash, PartialEq, Serialize, Deserialize)) +)] +#[derive(Clone, Debug, Eq, Hash, PartialEq, Deserialize, Serialize)] +#[serde(untagged)] +pub struct SsePayloadAttributes { + #[superstruct(getter(copy))] + #[serde(with = "eth2_serde_utils::quoted_u64")] + pub timestamp: u64, + #[superstruct(getter(copy))] + pub prev_randao: Hash256, + #[superstruct(getter(copy))] + pub suggested_fee_recipient: Address, + #[superstruct(only(V2))] + pub withdrawals: Vec, +} + +#[derive(PartialEq, Debug, Deserialize, Serialize, Clone)] +pub struct SseExtendedPayloadAttributesGeneric { + pub proposal_slot: Slot, + #[serde(with = "eth2_serde_utils::quoted_u64")] + pub proposer_index: u64, + pub parent_block_root: Hash256, + pub parent_block_hash: ExecutionBlockHash, + pub payload_attributes: T, +} + +pub type SseExtendedPayloadAttributes = SseExtendedPayloadAttributesGeneric; +pub type VersionedSsePayloadAttributes = ForkVersionedResponse; + +impl ForkVersionDeserialize for SsePayloadAttributes { + fn deserialize_by_fork<'de, D: serde::Deserializer<'de>>( + value: serde_json::value::Value, + fork_name: ForkName, + ) -> Result { + match fork_name { + ForkName::Merge => serde_json::from_value(value) + .map(Self::V1) + .map_err(serde::de::Error::custom), + ForkName::Capella | ForkName::Eip4844 => serde_json::from_value(value) + .map(Self::V2) + 
.map_err(serde::de::Error::custom), + ForkName::Base | ForkName::Altair => Err(serde::de::Error::custom(format!( + "SsePayloadAttributes deserialization for {fork_name} not implemented" + ))), + } + } +} + +impl ForkVersionDeserialize for SseExtendedPayloadAttributes { + fn deserialize_by_fork<'de, D: serde::Deserializer<'de>>( + value: serde_json::value::Value, + fork_name: ForkName, + ) -> Result { + let helper: SseExtendedPayloadAttributesGeneric = + serde_json::from_value(value).map_err(serde::de::Error::custom)?; + Ok(Self { + proposal_slot: helper.proposal_slot, + proposer_index: helper.proposer_index, + parent_block_root: helper.parent_block_root, + parent_block_hash: helper.parent_block_hash, + payload_attributes: SsePayloadAttributes::deserialize_by_fork::( + helper.payload_attributes, + fork_name, + )?, + }) + } +} + #[derive(PartialEq, Debug, Serialize, Clone)] #[serde(bound = "T: EthSpec", untagged)] pub enum EventKind { @@ -910,6 +980,7 @@ pub enum EventKind { LateHead(SseLateHead), #[cfg(feature = "lighthouse")] BlockReward(BlockReward), + PayloadAttributes(VersionedSsePayloadAttributes), } impl EventKind { @@ -922,6 +993,7 @@ impl EventKind { EventKind::FinalizedCheckpoint(_) => "finalized_checkpoint", EventKind::ChainReorg(_) => "chain_reorg", EventKind::ContributionAndProof(_) => "contribution_and_proof", + EventKind::PayloadAttributes(_) => "payload_attributes", EventKind::LateHead(_) => "late_head", #[cfg(feature = "lighthouse")] EventKind::BlockReward(_) => "block_reward", @@ -977,6 +1049,11 @@ impl EventKind { ServerError::InvalidServerSentEvent(format!("Contribution and Proof: {:?}", e)) })?, ))), + "payload_attributes" => Ok(EventKind::PayloadAttributes( + serde_json::from_str(data).map_err(|e| { + ServerError::InvalidServerSentEvent(format!("Payload Attributes: {:?}", e)) + })?, + )), #[cfg(feature = "lighthouse")] "block_reward" => Ok(EventKind::BlockReward(serde_json::from_str(data).map_err( |e| ServerError::InvalidServerSentEvent(format!("Block Reward: {:?}", e)), @@ -1006,6 +1083,7 @@ pub enum EventTopic { ChainReorg, ContributionAndProof, LateHead, + PayloadAttributes, #[cfg(feature = "lighthouse")] BlockReward, } @@ -1022,6 +1100,7 @@ impl FromStr for EventTopic { "finalized_checkpoint" => Ok(EventTopic::FinalizedCheckpoint), "chain_reorg" => Ok(EventTopic::ChainReorg), "contribution_and_proof" => Ok(EventTopic::ContributionAndProof), + "payload_attributes" => Ok(EventTopic::PayloadAttributes), "late_head" => Ok(EventTopic::LateHead), #[cfg(feature = "lighthouse")] "block_reward" => Ok(EventTopic::BlockReward), @@ -1040,6 +1119,7 @@ impl fmt::Display for EventTopic { EventTopic::FinalizedCheckpoint => write!(f, "finalized_checkpoint"), EventTopic::ChainReorg => write!(f, "chain_reorg"), EventTopic::ContributionAndProof => write!(f, "contribution_and_proof"), + EventTopic::PayloadAttributes => write!(f, "payload_attributes"), EventTopic::LateHead => write!(f, "late_head"), #[cfg(feature = "lighthouse")] EventTopic::BlockReward => write!(f, "block_reward"), diff --git a/common/eth2_network_config/built_in_network_configs/prater/config.yaml b/common/eth2_network_config/built_in_network_configs/prater/config.yaml index d173be20d..69d65ca8f 100644 --- a/common/eth2_network_config/built_in_network_configs/prater/config.yaml +++ b/common/eth2_network_config/built_in_network_configs/prater/config.yaml @@ -35,8 +35,11 @@ ALTAIR_FORK_EPOCH: 36660 # Merge BELLATRIX_FORK_VERSION: 0x02001020 BELLATRIX_FORK_EPOCH: 112260 +# Capella +CAPELLA_FORK_VERSION: 0x03001020 
+CAPELLA_FORK_EPOCH: 162304 # Sharding -SHARDING_FORK_VERSION: 0x03001020 +SHARDING_FORK_VERSION: 0x04001020 SHARDING_FORK_EPOCH: 18446744073709551615 # TBD, 2**32 is a placeholder. Merge transition approach is in active R&D. diff --git a/common/lighthouse_version/src/lib.rs b/common/lighthouse_version/src/lib.rs index 8ad4aa86f..10d1a8c32 100644 --- a/common/lighthouse_version/src/lib.rs +++ b/common/lighthouse_version/src/lib.rs @@ -17,8 +17,8 @@ pub const VERSION: &str = git_version!( // NOTE: using --match instead of --exclude for compatibility with old Git "--match=thiswillnevermatchlol" ], - prefix = "Lighthouse/v3.5.0-", - fallback = "Lighthouse/v3.5.0" + prefix = "Lighthouse/v3.5.1-", + fallback = "Lighthouse/v3.5.1" ); /// Returns `VERSION`, but with platform information appended to the end. diff --git a/lcli/Cargo.toml b/lcli/Cargo.toml index e877198db..cd530109e 100644 --- a/lcli/Cargo.toml +++ b/lcli/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "lcli" description = "Lighthouse CLI (modeled after zcli)" -version = "3.5.0" +version = "3.5.1" authors = ["Paul Hauner "] edition = "2021" diff --git a/lighthouse/Cargo.toml b/lighthouse/Cargo.toml index eb46faa53..d0a414ad3 100644 --- a/lighthouse/Cargo.toml +++ b/lighthouse/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "lighthouse" -version = "3.5.0" +version = "3.5.1" authors = ["Sigma Prime "] edition = "2021" autotests = false diff --git a/lighthouse/tests/beacon_node.rs b/lighthouse/tests/beacon_node.rs index a677abf86..c6990ba3d 100644 --- a/lighthouse/tests/beacon_node.rs +++ b/lighthouse/tests/beacon_node.rs @@ -182,6 +182,21 @@ fn prepare_payload_lookahead_shorter() { }); } +#[test] +fn always_prepare_payload_default() { + CommandLineTest::new() + .run_with_zero_port() + .with_config(|config| assert!(!config.chain.always_prepare_payload)); +} + +#[test] +fn always_prepare_payload_override() { + CommandLineTest::new() + .flag("always-prepare-payload", None) + .run_with_zero_port() + .with_config(|config| assert!(config.chain.always_prepare_payload)); +} + #[test] fn paranoid_block_proposal_default() { CommandLineTest::new() @@ -325,6 +340,21 @@ fn trusted_peers_flag() { }); } +#[test] +fn always_prefer_builder_payload_flag() { + CommandLineTest::new() + .flag("always-prefer-builder-payload", None) + .run_with_zero_port() + .with_config(|config| assert!(config.always_prefer_builder_payload)); +} + +#[test] +fn no_flag_sets_always_prefer_builder_payload_to_false() { + CommandLineTest::new() + .run_with_zero_port() + .with_config(|config| assert!(!config.always_prefer_builder_payload)); +} + // Tests for Eth1 flags. 
#[test] fn dummy_eth1_flag() { diff --git a/lighthouse/tests/validator_client.rs b/lighthouse/tests/validator_client.rs index f0ed4f737..45cd989a4 100644 --- a/lighthouse/tests/validator_client.rs +++ b/lighthouse/tests/validator_client.rs @@ -476,3 +476,28 @@ fn disable_run_on_all() { assert!(config.disable_run_on_all); }); } + +#[test] +fn latency_measurement_service() { + CommandLineTest::new().run().with_config(|config| { + assert!(config.enable_latency_measurement_service); + }); + CommandLineTest::new() + .flag("latency-measurement-service", None) + .run() + .with_config(|config| { + assert!(config.enable_latency_measurement_service); + }); + CommandLineTest::new() + .flag("latency-measurement-service", Some("true")) + .run() + .with_config(|config| { + assert!(config.enable_latency_measurement_service); + }); + CommandLineTest::new() + .flag("latency-measurement-service", Some("false")) + .run() + .with_config(|config| { + assert!(!config.enable_latency_measurement_service); + }); +} diff --git a/validator_client/src/beacon_node_fallback.rs b/validator_client/src/beacon_node_fallback.rs index 06ddcbaf3..3e667429b 100644 --- a/validator_client/src/beacon_node_fallback.rs +++ b/validator_client/src/beacon_node_fallback.rs @@ -14,10 +14,13 @@ use std::fmt::Debug; use std::future::Future; use std::marker::PhantomData; use std::sync::Arc; -use std::time::Duration; +use std::time::{Duration, Instant}; use tokio::{sync::RwLock, time::sleep}; use types::{ChainSpec, Config, EthSpec}; +/// Message emitted when the VC detects the BN is using a different spec. +const UPDATE_REQUIRED_LOG_HINT: &str = "this VC or the remote BN may need updating"; + /// The number of seconds *prior* to slot start that we will try and update the state of fallback /// nodes. /// @@ -27,6 +30,14 @@ use types::{ChainSpec, Config, EthSpec}; /// having the correct nodes up and running prior to the start of the slot. const SLOT_LOOKAHEAD: Duration = Duration::from_secs(1); +/// Indicates a measurement of latency between the VC and a BN. +pub struct LatencyMeasurement { + /// An identifier for the beacon node (e.g. the URL). + pub beacon_node_id: String, + /// The round-trip latency, if the BN responded successfully. + pub latency: Option, +} + /// Starts a service that will routinely try and update the status of the provided `beacon_nodes`. /// /// See `SLOT_LOOKAHEAD` for information about when this should run. 
@@ -262,6 +273,7 @@ impl CandidateBeaconNode { "Beacon node has mismatched Altair fork epoch"; "endpoint" => %self.beacon_node, "endpoint_altair_fork_epoch" => ?beacon_node_spec.altair_fork_epoch, + "hint" => UPDATE_REQUIRED_LOG_HINT, ); } else if beacon_node_spec.bellatrix_fork_epoch != spec.bellatrix_fork_epoch { warn!( @@ -269,6 +281,15 @@ impl CandidateBeaconNode { "Beacon node has mismatched Bellatrix fork epoch"; "endpoint" => %self.beacon_node, "endpoint_bellatrix_fork_epoch" => ?beacon_node_spec.bellatrix_fork_epoch, + "hint" => UPDATE_REQUIRED_LOG_HINT, + ); + } else if beacon_node_spec.capella_fork_epoch != spec.capella_fork_epoch { + warn!( + log, + "Beacon node has mismatched Capella fork epoch"; + "endpoint" => %self.beacon_node, + "endpoint_capella_fork_epoch" => ?beacon_node_spec.capella_fork_epoch, + "hint" => UPDATE_REQUIRED_LOG_HINT, ); } @@ -394,6 +415,47 @@ impl BeaconNodeFallback { let _ = future::join_all(futures).await; } + /// Concurrently send a request to all candidates (regardless of + /// offline/online status) and attempt to collect a rough reading on the + /// latency between the VC and candidate. + pub async fn measure_latency(&self) -> Vec { + let futures: Vec<_> = self + .candidates + .iter() + .map(|candidate| async { + let beacon_node_id = candidate.beacon_node.to_string(); + // The `node/version` endpoint is used since I imagine it would + // require the least processing in the BN and therefore measure + // the connection more so than the BN's processing speed. + // + // I imagine all clients have the version string available as a + // pre-computed string. + let response_instant = candidate + .beacon_node + .get_node_version() + .await + .ok() + .map(|_| Instant::now()); + (beacon_node_id, response_instant) + }) + .collect(); + + let request_instant = Instant::now(); + + // Send the request to all BNs at the same time. This might involve some + // queueing on the sending host; however, I hope it will avoid bias + // caused by sending requests at different times. + future::join_all(futures) + .await + .into_iter() + .map(|(beacon_node_id, response_instant)| LatencyMeasurement { + beacon_node_id, + latency: response_instant + .and_then(|response| response.checked_duration_since(request_instant)), + }) + .collect() + } + /// Run `func` against each candidate in `self`, returning immediately if a result is found. /// Otherwise, return all the errors encountered along the way. /// diff --git a/validator_client/src/block_service.rs b/validator_client/src/block_service.rs index 6fd519eba..3b3749237 100644 --- a/validator_client/src/block_service.rs +++ b/validator_client/src/block_service.rs @@ -407,17 +407,22 @@ impl BlockService { ) .await?; + let signing_timer = metrics::start_timer(&metrics::BLOCK_SIGNING_TIMES); let signed_block = self_ref .validator_store .sign_block::(*validator_pubkey_ref, block, current_slot) .await .map_err(|e| BlockError::Recoverable(format!("Unable to sign block: {:?}", e)))?; + let signing_time_ms = + Duration::from_secs_f64(signing_timer.map_or(0.0, |t| t.stop_and_record())).as_millis(); info!( log, "Publishing signed block"; "slot" => slot.as_u64(), + "signing_time_ms" => signing_time_ms, ); + // Publish block with first available beacon node.
self.beacon_nodes .first_success( diff --git a/validator_client/src/cli.rs b/validator_client/src/cli.rs index 9142a0c7e..fd96aa1f5 100644 --- a/validator_client/src/cli.rs +++ b/validator_client/src/cli.rs @@ -318,6 +318,15 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> { set here moves too far from the previous block's gas limit. [default: 30,000,000]") .requires("builder-proposals"), ) + .arg( + Arg::with_name("latency-measurement-service") + .long("latency-measurement-service") + .value_name("BOOLEAN") + .help("Set to 'true' to enable a service that periodically attempts to measure latency to BNs. \ + Set to 'false' to disable.") + .default_value("true") + .takes_value(true), + ) /* * Experimental/development options. */ diff --git a/validator_client/src/config.rs b/validator_client/src/config.rs index 0f24e81d5..724d6c74f 100644 --- a/validator_client/src/config.rs +++ b/validator_client/src/config.rs @@ -73,6 +73,8 @@ pub struct Config { pub block_delay: Option, /// Disables publishing http api requests to all beacon nodes for select api calls. pub disable_run_on_all: bool, + /// Enables a service which attempts to measure latency between the VC and BNs. + pub enable_latency_measurement_service: bool, } impl Default for Config { @@ -111,6 +113,7 @@ impl Default for Config { builder_registration_timestamp_override: None, gas_limit: None, disable_run_on_all: false, + enable_latency_measurement_service: true, } } } @@ -357,6 +360,9 @@ impl Config { ); } + config.enable_latency_measurement_service = + parse_optional(cli_args, "latency-measurement-service")?.unwrap_or(true); + /* * Experimental */ diff --git a/validator_client/src/duties_service.rs b/validator_client/src/duties_service.rs index 6ba2a2d1f..c335c67ab 100644 --- a/validator_client/src/duties_service.rs +++ b/validator_client/src/duties_service.rs @@ -17,13 +17,14 @@ use crate::{ }; use environment::RuntimeContext; use eth2::types::{AttesterData, BeaconCommitteeSubscription, ProposerData, StateId, ValidatorId}; -use futures::future::join_all; +use futures::{stream, StreamExt}; use parking_lot::RwLock; use safe_arith::ArithError; use slog::{debug, error, info, warn, Logger}; use slot_clock::SlotClock; -use std::collections::{HashMap, HashSet}; +use std::collections::{hash_map, BTreeMap, HashMap, HashSet}; use std::sync::Arc; +use std::time::Duration; use sync::poll_sync_committee_duties; use sync::SyncDutiesMap; use tokio::{sync::mpsc::Sender, time::sleep}; @@ -40,6 +41,14 @@ const SUBSCRIPTION_BUFFER_SLOTS: u64 = 2; /// Only retain `HISTORICAL_DUTIES_EPOCHS` duties prior to the current epoch. const HISTORICAL_DUTIES_EPOCHS: u64 = 2; +/// Compute attestation selection proofs this many slots before they are required. +/// +/// At start-up selection proofs will be computed with less lookahead out of necessity. +const SELECTION_PROOF_SLOT_LOOKAHEAD: u64 = 8; + +/// Fraction of a slot at which selection proof signing should happen (2 means half way). +const SELECTION_PROOF_SCHEDULE_DENOM: u32 = 2; + /// Minimum number of validators for which we auto-enable per-validator metrics. /// For validators greater than this value, we need to manually set the `enable-per-validator-metrics` /// flag in the cli to enable collection of per validator metrics. @@ -71,7 +80,7 @@ pub struct DutyAndProof { impl DutyAndProof { /// Instantiate `Self`, computing the selection proof as well. 
- pub async fn new( + pub async fn new_with_selection_proof( duty: AttesterData, validator_store: &ValidatorStore, spec: &ChainSpec, @@ -99,6 +108,14 @@ impl DutyAndProof { selection_proof, }) } + + /// Create a new `DutyAndProof` with the selection proof waiting to be filled in. + pub fn new_without_selection_proof(duty: AttesterData) -> Self { + Self { + duty, + selection_proof: None, + } + } } /// To assist with readability, the dependent root for attester/proposer duties. @@ -471,7 +488,7 @@ async fn poll_validator_indices( /// 3. Push out any attestation subnet subscriptions to the BN. /// 4. Prune old entries from `duties_service.attesters`. async fn poll_beacon_attesters( - duties_service: &DutiesService, + duties_service: &Arc>, ) -> Result<(), Error> { let current_epoch_timer = metrics::start_timer_vec( &metrics::DUTIES_SERVICE_TIMES, @@ -634,7 +651,7 @@ async fn poll_beacon_attesters( /// For the given `local_indices` and `local_pubkeys`, download the duties for the given `epoch` and /// store them in `duties_service.attesters`. async fn poll_beacon_attesters_for_epoch( - duties_service: &DutiesService, + duties_service: &Arc>, epoch: Epoch, local_indices: &[u64], local_pubkeys: &HashSet, @@ -742,31 +759,16 @@ async fn poll_beacon_attesters_for_epoch( "num_new_duties" => new_duties.len(), ); - // Produce the `DutyAndProof` messages in parallel. - let duty_and_proof_results = join_all(new_duties.into_iter().map(|duty| { - DutyAndProof::new(duty, &duties_service.validator_store, &duties_service.spec) - })) - .await; - // Update the duties service with the new `DutyAndProof` messages. let mut attesters = duties_service.attesters.write(); let mut already_warned = Some(()); - for result in duty_and_proof_results { - let duty_and_proof = match result { - Ok(duty_and_proof) => duty_and_proof, - Err(e) => { - error!( - log, - "Failed to produce duty and proof"; - "error" => ?e, - "msg" => "may impair attestation duties" - ); - // Do not abort the entire batch for a single failure. - continue; - } - }; + for duty in &new_duties { + let attester_map = attesters.entry(duty.pubkey).or_default(); - let attester_map = attesters.entry(duty_and_proof.duty.pubkey).or_default(); + // Create initial entries in the map without selection proofs. We'll compute them in the + // background later to avoid creating a thundering herd of signing threads whenever new + // duties are computed. + let duty_and_proof = DutyAndProof::new_without_selection_proof(duty.clone()); if let Some((prior_dependent_root, _)) = attester_map.insert(epoch, (dependent_root, duty_and_proof)) @@ -785,9 +787,144 @@ async fn poll_beacon_attesters_for_epoch( } drop(attesters); + // Spawn the background task to compute selection proofs. + let subservice = duties_service.clone(); + duties_service.context.executor.spawn( + async move { + fill_in_selection_proofs(subservice, new_duties, dependent_root).await; + }, + "duties_service_selection_proofs_background", + ); + Ok(()) } +/// Compute the attestation selection proofs for the `duties` and add them to the `attesters` map. +/// +/// Duties are computed in batches each slot. If a re-org is detected then the process will +/// terminate early as it is assumed the selection proofs from `duties` are no longer relevant. +async fn fill_in_selection_proofs( + duties_service: Arc>, + duties: Vec, + dependent_root: Hash256, +) { + let log = duties_service.context.log(); + + // Sort duties by slot in a BTreeMap. 
+    let mut duties_by_slot: BTreeMap<Slot, Vec<_>> = BTreeMap::new();
+
+    for duty in duties {
+        duties_by_slot.entry(duty.slot).or_default().push(duty);
+    }
+
+    // At halfway through each slot when nothing else is likely to be getting signed, sign a batch
+    // of selection proofs and insert them into the duties service `attesters` map.
+    let slot_clock = &duties_service.slot_clock;
+    let slot_offset = duties_service.slot_clock.slot_duration() / SELECTION_PROOF_SCHEDULE_DENOM;
+
+    while !duties_by_slot.is_empty() {
+        if let Some(duration) = slot_clock.duration_to_next_slot() {
+            sleep(duration.saturating_sub(slot_offset)).await;
+
+            let Some(current_slot) = slot_clock.now() else {
+                continue;
+            };
+
+            let lookahead_slot = current_slot + SELECTION_PROOF_SLOT_LOOKAHEAD;
+
+            let mut relevant_duties = duties_by_slot.split_off(&lookahead_slot);
+            std::mem::swap(&mut relevant_duties, &mut duties_by_slot);
+
+            let batch_size = relevant_duties.values().map(Vec::len).sum::<usize>();
+
+            if batch_size == 0 {
+                continue;
+            }
+
+            let timer = metrics::start_timer_vec(
+                &metrics::DUTIES_SERVICE_TIMES,
+                &[metrics::ATTESTATION_SELECTION_PROOFS],
+            );
+
+            // Sign selection proofs (serially).
+            let duty_and_proof_results = stream::iter(relevant_duties.into_values().flatten())
+                .then(|duty| async {
+                    DutyAndProof::new_with_selection_proof(
+                        duty,
+                        &duties_service.validator_store,
+                        &duties_service.spec,
+                    )
+                    .await
+                })
+                .collect::<Vec<_>>()
+                .await;
+
+            // Add to attesters store.
+            let mut attesters = duties_service.attesters.write();
+            for result in duty_and_proof_results {
+                let duty_and_proof = match result {
+                    Ok(duty_and_proof) => duty_and_proof,
+                    Err(e) => {
+                        error!(
+                            log,
+                            "Failed to produce duty and proof";
+                            "error" => ?e,
+                            "msg" => "may impair attestation duties"
+                        );
+                        // Do not abort the entire batch for a single failure.
+                        continue;
+                    }
+                };
+
+                let attester_map = attesters.entry(duty_and_proof.duty.pubkey).or_default();
+                let epoch = duty_and_proof.duty.slot.epoch(E::slots_per_epoch());
+                match attester_map.entry(epoch) {
+                    hash_map::Entry::Occupied(mut entry) => {
+                        // No need to update duties for which no proof was computed.
+                        let Some(selection_proof) = duty_and_proof.selection_proof else {
+                            continue;
+                        };
+
+                        let (existing_dependent_root, existing_duty) = entry.get_mut();
+
+                        if *existing_dependent_root == dependent_root {
+                            // Replace existing proof.
+                            existing_duty.selection_proof = Some(selection_proof);
+                        } else {
+                            // Our selection proofs are no longer relevant due to a reorg, abandon
+                            // this entire background process.
+                            debug!(
+                                log,
+                                "Stopping selection proof background task";
+                                "reason" => "re-org"
+                            );
+                            return;
+                        }
+                    }
+                    hash_map::Entry::Vacant(entry) => {
+                        entry.insert((dependent_root, duty_and_proof));
+                    }
+                }
+            }
+            drop(attesters);
+
+            let time_taken_ms =
+                Duration::from_secs_f64(timer.map_or(0.0, |t| t.stop_and_record())).as_millis();
+            debug!(
+                log,
+                "Computed attestation selection proofs";
+                "batch_size" => batch_size,
+                "lookahead_slot" => lookahead_slot,
+                "time_taken_ms" => time_taken_ms
+            );
+        } else {
+            // Just sleep for one slot if we are unable to read the system clock, this gives
+            // us an opportunity for the clock to eventually come good.
+            sleep(duties_service.slot_clock.slot_duration()).await;
+        }
+    }
+}
+
 /// Download the proposer duties for the current epoch and store them in `duties_service.proposers`.
 /// If there are any proposer for this slot, send out a notification to the block proposers.
 ///
diff --git a/validator_client/src/http_metrics/metrics.rs b/validator_client/src/http_metrics/metrics.rs
index 2d5b9b1db..3beb5dff1 100644
--- a/validator_client/src/http_metrics/metrics.rs
+++ b/validator_client/src/http_metrics/metrics.rs
@@ -32,6 +32,7 @@ pub const PROPOSER_DUTIES_HTTP_GET: &str = "proposer_duties_http_get";
 pub const VALIDATOR_ID_HTTP_GET: &str = "validator_id_http_get";
 pub const SUBSCRIPTIONS_HTTP_POST: &str = "subscriptions_http_post";
 pub const UPDATE_PROPOSERS: &str = "update_proposers";
+pub const ATTESTATION_SELECTION_PROOFS: &str = "attestation_selection_proofs";
 pub const SUBSCRIPTIONS: &str = "subscriptions";
 pub const LOCAL_KEYSTORE: &str = "local_keystore";
 pub const WEB3SIGNER: &str = "web3signer";
@@ -177,12 +178,28 @@ lazy_static::lazy_static! {
         "Duration to obtain a signature",
         &["type"]
     );
+    pub static ref BLOCK_SIGNING_TIMES: Result<Histogram> = try_create_histogram(
+        "vc_block_signing_times_seconds",
+        "Duration to obtain a signature for a block",
+    );
     pub static ref ATTESTATION_DUTY: Result<IntGaugeVec> = try_create_int_gauge_vec(
         "vc_attestation_duty_slot",
         "Attestation duty slot for all managed validators",
         &["validator"]
     );
+    /*
+     * BN latency
+     */
+    pub static ref VC_BEACON_NODE_LATENCY: Result<HistogramVec> = try_create_histogram_vec(
+        "vc_beacon_node_latency",
+        "Round-trip latency for a simple API endpoint on each BN",
+        &["endpoint"]
+    );
+    pub static ref VC_BEACON_NODE_LATENCY_PRIMARY_ENDPOINT: Result<Histogram> = try_create_histogram(
+        "vc_beacon_node_latency_primary_endpoint",
+        "Round-trip latency for the primary BN endpoint",
+    );
 }
 
 pub fn gather_prometheus_metrics(
diff --git a/validator_client/src/latency.rs b/validator_client/src/latency.rs
new file mode 100644
index 000000000..7e752f292
--- /dev/null
+++ b/validator_client/src/latency.rs
@@ -0,0 +1,64 @@
+use crate::{http_metrics::metrics, BeaconNodeFallback};
+use environment::RuntimeContext;
+use slog::debug;
+use slot_clock::SlotClock;
+use std::sync::Arc;
+use tokio::time::sleep;
+use types::EthSpec;
+
+/// The latency service will run 11/12ths of the way through the slot.
+pub const SLOT_DELAY_MULTIPLIER: u32 = 11;
+pub const SLOT_DELAY_DENOMINATOR: u32 = 12;
+
+/// Starts a service that periodically checks the latency between the VC and the
+/// candidate BNs.
+pub fn start_latency_service<T: SlotClock + 'static, E: EthSpec>(
+    context: RuntimeContext<E>,
+    slot_clock: T,
+    beacon_nodes: Arc<BeaconNodeFallback<T, E>>,
+) {
+    let log = context.log().clone();
+
+    let future = async move {
+        loop {
+            let sleep_time = slot_clock
+                .duration_to_next_slot()
+                .map(|next_slot| {
+                    // This is 11/12ths through the next slot. On mainnet this
+                    // will happen in the 11th second of each slot, one second
+                    // before the next slot.
+                    next_slot
+                        + (slot_clock.slot_duration() / SLOT_DELAY_DENOMINATOR)
+                            * SLOT_DELAY_MULTIPLIER
+                })
+                // If we can't read the slot clock, just wait one slot. Running
+                // the measurement at a non-exact time is not a big issue.
+                .unwrap_or_else(|| slot_clock.slot_duration());
+
+            // Sleep until it's time to perform the measurement.
+            sleep(sleep_time).await;
+
+            for (i, measurement) in beacon_nodes.measure_latency().await.iter().enumerate() {
+                if let Some(latency) = measurement.latency {
+                    debug!(
+                        log,
+                        "Measured BN latency";
+                        "node" => &measurement.beacon_node_id,
+                        "latency" => latency.as_millis(),
+                    );
+                    metrics::observe_timer_vec(
+                        &metrics::VC_BEACON_NODE_LATENCY,
+                        &[&measurement.beacon_node_id],
+                        latency,
+                    );
+                    if i == 0 {
+                        metrics::observe_duration(
+                            &metrics::VC_BEACON_NODE_LATENCY_PRIMARY_ENDPOINT,
+                            latency,
+                        );
+                    }
+                }
+            }
+        }
+    };
+
+    context.executor.spawn(future, "latency");
+}
diff --git a/validator_client/src/lib.rs b/validator_client/src/lib.rs
index f2d647490..82cacccc6 100644
--- a/validator_client/src/lib.rs
+++ b/validator_client/src/lib.rs
@@ -8,6 +8,7 @@ mod duties_service;
 mod graffiti_file;
 mod http_metrics;
 mod key_cache;
+mod latency;
 mod notifier;
 mod preparation_service;
 mod signing_method;
@@ -563,6 +564,14 @@ impl<T: EthSpec> ProductionValidatorClient<T> {
             None
         };
 
+        if self.config.enable_latency_measurement_service {
+            latency::start_latency_service(
+                self.context.clone(),
+                self.duties_service.slot_clock.clone(),
+                self.duties_service.beacon_nodes.clone(),
+            );
+        }
+
         Ok(())
     }
}
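
For reference, the new `--latency-measurement-service` flag takes an explicit BOOLEAN value and is treated as `true` whenever the flag is omitted, so the measurement service is on by default and must be explicitly set to `false` to disable it. A minimal sketch of that parsing behaviour follows; the `latency_service_enabled` helper is purely illustrative and does not exist in the codebase, it only mirrors the `parse_optional(..)?.unwrap_or(true)` call above.

/// Illustrative stand-in for `parse_optional(cli_args, "latency-measurement-service")?.unwrap_or(true)`.
/// `raw` is the value supplied on the command line, or `None` when the flag is omitted.
fn latency_service_enabled(raw: Option<&str>) -> Result<bool, String> {
    match raw {
        // Flag omitted: the latency service stays enabled by default.
        None => Ok(true),
        Some(value) => value
            .parse::<bool>()
            .map_err(|_| format!("invalid BOOLEAN for --latency-measurement-service: {value}")),
    }
}

fn main() {
    assert_eq!(latency_service_enabled(None), Ok(true));
    assert_eq!(latency_service_enabled(Some("false")), Ok(false));
    assert!(latency_service_enabled(Some("yes")).is_err());
}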
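The duties service now signs selection proofs with `stream::iter(..).then(..)` instead of `futures::future::join_all`, which means one signing future runs to completion at a time rather than the whole batch starting at once. The sketch below illustrates the scheduling difference with a dummy `sign` function standing in for the real signing call; the function names and timings in the comments are illustrative only.

use futures::{stream, StreamExt};
use std::time::{Duration, Instant};
use tokio::time::sleep;

// Dummy signing operation standing in for the validator store's signing call.
async fn sign(duty: u64) -> u64 {
    sleep(Duration::from_millis(50)).await;
    duty
}

#[tokio::main]
async fn main() {
    // `join_all` style: every signing future is polled concurrently, so a large
    // batch of new duties starts all of its signing work at the same instant.
    let start = Instant::now();
    let _all: Vec<u64> = futures::future::join_all((0..10u64).map(sign)).await;
    println!("concurrent batch took {:?}", start.elapsed()); // roughly 50ms

    // `stream::iter(..).then(..)` style: each future is awaited to completion
    // before the next duty is signed, spreading the load across the batch.
    let start = Instant::now();
    let _serial: Vec<u64> = stream::iter(0..10u64).then(sign).collect().await;
    println!("serial batch took {:?}", start.elapsed()); // roughly 500ms
}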
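The lookahead batching in `fill_in_selection_proofs` relies on `BTreeMap::split_off(&k)` returning the entries with keys greater than or equal to `k` while leaving the smaller keys behind; the subsequent `mem::swap` is what puts the near-term duties into `relevant_duties` and keeps the far-future ones queued for later iterations. A small self-contained sketch of that partition, using plain integers in place of `Slot`:

use std::collections::BTreeMap;

fn main() {
    // Keys are slots, values are the duties scheduled for that slot.
    let mut duties_by_slot: BTreeMap<u64, Vec<&str>> = BTreeMap::new();
    for slot in [100u64, 101, 105, 110, 120] {
        duties_by_slot.entry(slot).or_default().push("duty");
    }

    let current_slot = 100u64;
    let lookahead_slot = current_slot + 8; // SELECTION_PROOF_SLOT_LOOKAHEAD

    // `split_off` keeps keys < lookahead_slot in `duties_by_slot` and returns keys >= lookahead_slot.
    let mut relevant_duties = duties_by_slot.split_off(&lookahead_slot);
    // After the swap, `relevant_duties` holds the near-term slots and `duties_by_slot`
    // retains the far-future ones for a later iteration of the loop.
    std::mem::swap(&mut relevant_duties, &mut duties_by_slot);

    assert_eq!(relevant_duties.keys().copied().collect::<Vec<_>>(), [100, 101, 105]);
    assert_eq!(duties_by_slot.keys().copied().collect::<Vec<_>>(), [110, 120]);
}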
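Finally, the latency service's sleep computation is meant to land 11/12ths of the way through each slot, i.e. one second before the slot boundary on mainnet's 12-second slots. A worked example of the arithmetic under that mainnet assumption (the 3-second wake-up offset is an arbitrary example value):

use std::time::Duration;

const SLOT_DELAY_MULTIPLIER: u32 = 11;
const SLOT_DELAY_DENOMINATOR: u32 = 12;

fn main() {
    let slot_duration = Duration::from_secs(12); // mainnet slot time
    // Suppose the loop wakes up 3 seconds before the next slot boundary.
    let duration_to_next_slot = Duration::from_secs(3);

    // Sleep past the upcoming boundary, then 11/12ths of a slot further,
    // landing one second before the slot boundary after that.
    let sleep_time = duration_to_next_slot
        + (slot_duration / SLOT_DELAY_DENOMINATOR) * SLOT_DELAY_MULTIPLIER;

    assert_eq!(sleep_time, Duration::from_secs(3 + 11));
}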