From 267d8babc8cfe20f5b27327f16823324ed0b45a9 Mon Sep 17 00:00:00 2001 From: Paul Hauner Date: Wed, 9 Mar 2022 00:42:05 +0000 Subject: [PATCH] Prepare proposer (#3043) ## Issue Addressed Resolves #2936 ## Proposed Changes Adds functionality for calling [`validator/prepare_beacon_proposer`](https://ethereum.github.io/beacon-APIs/?urls.primaryName=dev#/Validator/prepareBeaconProposer) in advance. There is a `BeaconChain::prepare_beacon_proposer` method which, which called, computes the proposer for the next slot. If that proposer has been registered via the `validator/prepare_beacon_proposer` API method, then the `beacon_chain.execution_layer` will be provided the `PayloadAttributes` for us in all future forkchoiceUpdated calls. An artificial forkchoiceUpdated call will be created 4s before each slot, when the head updates and when a validator updates their information. Additionally, I added strict ordering for calls from the `BeaconChain` to the `ExecutionLayer`. I'm not certain the `ExecutionLayer` will always maintain this ordering, but it's a good start to have consistency from the `BeaconChain`. There are some deadlock opportunities introduced, they are documented in the code. ## Additional Info - ~~Blocked on #2837~~ Co-authored-by: realbigsean --- Cargo.lock | 3 + beacon_node/beacon_chain/Cargo.toml | 1 + beacon_node/beacon_chain/src/beacon_chain.rs | 371 ++++++++++++++++-- .../beacon_chain/src/beacon_proposer_cache.rs | 60 ++- beacon_node/beacon_chain/src/errors.rs | 2 + beacon_node/beacon_chain/src/lib.rs | 3 +- .../beacon_chain/src/proposer_prep_service.rs | 74 ++++ .../tests/payload_invalidation.rs | 73 ++++ beacon_node/client/src/builder.rs | 35 +- beacon_node/execution_layer/Cargo.toml | 2 + beacon_node/execution_layer/src/engine_api.rs | 2 +- beacon_node/execution_layer/src/engines.rs | 10 + beacon_node/execution_layer/src/lib.rs | 194 ++++++++- beacon_node/execution_layer/src/metrics.rs | 34 ++ .../src/test_utils/handle_rpc.rs | 2 + .../src/test_utils/mock_execution_layer.rs | 34 +- .../execution_layer/src/test_utils/mod.rs | 6 + beacon_node/http_api/src/lib.rs | 7 + beacon_node/http_api/src/proposer_duties.rs | 76 +--- beacon_node/network/src/sync/manager.rs | 38 +- .../src/test_rig.rs | 37 +- 21 files changed, 883 insertions(+), 181 deletions(-) create mode 100644 beacon_node/beacon_chain/src/proposer_prep_service.rs create mode 100644 beacon_node/execution_layer/src/metrics.rs diff --git a/Cargo.lock b/Cargo.lock index fb75387bd..905705352 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -316,6 +316,7 @@ dependencies = [ "sensitive_url", "serde", "serde_derive", + "serde_json", "slasher", "slog", "sloggers", @@ -1844,6 +1845,8 @@ dependencies = [ "futures", "hex", "jsonwebtoken", + "lazy_static", + "lighthouse_metrics", "lru", "parking_lot 0.11.2", "rand 0.7.3", diff --git a/beacon_node/beacon_chain/Cargo.toml b/beacon_node/beacon_chain/Cargo.toml index eeef54589..2628d1436 100644 --- a/beacon_node/beacon_chain/Cargo.toml +++ b/beacon_node/beacon_chain/Cargo.toml @@ -14,6 +14,7 @@ fork_from_env = [] # Initialise the harness chain spec from the FORK_NAME env va [dev-dependencies] maplit = "1.0.2" environment = { path = "../../lighthouse/environment" } +serde_json = "1.0.58" [dependencies] merkle_proof = { path = "../../consensus/merkle_proof" } diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 7655faa0f..77900d82c 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -4,6 +4,7 @@ use crate::attestation_verification::{ VerifiedUnaggregatedAttestation, }; use crate::attester_cache::{AttesterCache, AttesterCacheKey}; +use crate::beacon_proposer_cache::compute_proposer_duties_from_head; use crate::beacon_proposer_cache::BeaconProposerCache; use crate::block_times_cache::BlockTimesCache; use crate::block_verification::{ @@ -35,6 +36,7 @@ use crate::observed_operations::{ObservationOutcome, ObservedOperations}; use crate::persisted_beacon_chain::{PersistedBeaconChain, DUMMY_CANONICAL_HEAD_BLOCK_ROOT}; use crate::persisted_fork_choice::PersistedForkChoice; use crate::pre_finalization_cache::PreFinalizationBlockCache; +use crate::proposer_prep_service::PAYLOAD_PREPARATION_LOOKAHEAD_FACTOR; use crate::shuffling_cache::{BlockShufflingIds, ShufflingCache}; use crate::snapshot_cache::SnapshotCache; use crate::sync_committee_verification::{ @@ -52,7 +54,7 @@ use crate::{metrics, BeaconChainError}; use eth2::types::{ EventKind, SseBlock, SseChainReorg, SseFinalizedCheckpoint, SseHead, SseLateHead, SyncDuty, }; -use execution_layer::{ExecutionLayer, PayloadStatus}; +use execution_layer::{ExecutionLayer, PayloadAttributes, PayloadStatus}; use fork_choice::{AttestationFromBlock, ForkChoice, InvalidationOperation}; use futures::channel::mpsc::Sender; use itertools::process_results; @@ -112,6 +114,11 @@ pub const FORK_CHOICE_DB_KEY: Hash256 = Hash256::zero(); /// Defines how old a block can be before it's no longer a candidate for the early attester cache. const EARLY_ATTESTER_CACHE_HISTORIC_SLOTS: u64 = 4; +/// Defines a distance between the head block slot and the current slot. +/// +/// If the head block is older than this value, don't bother preparing beacon proposers. +const PREPARE_PROPOSER_HISTORIC_EPOCHS: u64 = 4; + /// Reported to the user when the justified block has an invalid execution payload. pub const INVALID_JUSTIFIED_PAYLOAD_SHUTDOWN_REASON: &str = "Justified block has an invalid execution payload."; @@ -206,6 +213,7 @@ pub struct HeadInfo { pub proposer_shuffling_decision_root: Hash256, pub is_merge_transition_complete: bool, pub execution_payload_block_hash: Option, + pub random: Hash256, } pub trait BeaconChainTypes: Send + Sync + 'static { @@ -1116,6 +1124,10 @@ impl BeaconChain { .beacon_state .proposer_shuffling_decision_root(head.beacon_block_root)?; + // The `random` value is used whilst producing an `ExecutionPayload` atop the head. + let current_epoch = head.beacon_state.current_epoch(); + let random = *head.beacon_state.get_randao_mix(current_epoch)?; + Ok(HeadInfo { slot: head.beacon_block.slot(), block_root: head.beacon_block_root, @@ -1134,6 +1146,7 @@ impl BeaconChain { .execution_payload() .ok() .map(|ep| ep.block_hash), + random, }) }) } @@ -3437,13 +3450,6 @@ impl BeaconChain { .attester_shuffling_decision_root(self.genesis_block_root, RelativeEpoch::Current); // Used later for the execution engine. - let new_head_execution_block_hash_opt = new_head - .beacon_block - .message() - .body() - .execution_payload() - .ok() - .map(|ep| ep.block_hash); let is_merge_transition_complete = is_merge_transition_complete(&new_head.beacon_state); drop(lag_timer); @@ -3653,24 +3659,260 @@ impl BeaconChain { } // If this is a post-merge block, update the execution layer. - if let Some(new_head_execution_block_hash) = new_head_execution_block_hash_opt { - if is_merge_transition_complete { - let finalized_execution_block_hash = finalized_block - .execution_status - .block_hash() - .unwrap_or_else(ExecutionBlockHash::zero); - if let Err(e) = self.update_execution_engine_forkchoice_blocking( - finalized_execution_block_hash, - beacon_block_root, - new_head_execution_block_hash, - ) { - crit!( - self.log, - "Failed to update execution head"; - "error" => ?e - ); - } + if is_merge_transition_complete { + let current_slot = self.slot()?; + + if let Err(e) = self.update_execution_engine_forkchoice_blocking(current_slot) { + crit!( + self.log, + "Failed to update execution head"; + "error" => ?e + ); } + + // Performing this call immediately after + // `update_execution_engine_forkchoice_blocking` might result in two calls to fork + // choice updated, one *without* payload attributes and then a second *with* + // payload attributes. + // + // This seems OK. It's not a significant waste of EL<>CL bandwidth or resources, as + // far as I know. + if let Err(e) = self.prepare_beacon_proposer_blocking() { + crit!( + self.log, + "Failed to prepare proposers after fork choice"; + "error" => ?e + ); + } + } + + Ok(()) + } + + pub fn prepare_beacon_proposer_blocking(&self) -> Result<(), Error> { + let execution_layer = self + .execution_layer + .as_ref() + .ok_or(Error::ExecutionLayerMissing)?; + + execution_layer + .block_on_generic(|_| self.prepare_beacon_proposer_async()) + .map_err(Error::PrepareProposerBlockingFailed)? + } + + /// Determines the beacon proposer for the next slot. If that proposer is registered in the + /// `execution_layer`, provide the `execution_layer` with the necessary information to produce + /// `PayloadAttributes` for future calls to fork choice. + /// + /// The `PayloadAttributes` are used by the EL to give it a look-ahead for preparing an optimal + /// set of transactions for a new `ExecutionPayload`. + /// + /// This function will result in a call to `forkchoiceUpdated` on the EL if: + /// + /// 1. We're in the tail-end of the slot (as defined by PAYLOAD_PREPARATION_LOOKAHEAD_FACTOR) + /// 2. The head block is one slot (or less) behind the prepare slot (e.g., we're preparing for + /// the next slot and the block at the current slot is already known). + pub async fn prepare_beacon_proposer_async(&self) -> Result<(), Error> { + let execution_layer = self + .execution_layer + .clone() + .ok_or(Error::ExecutionLayerMissing)?; + + // Nothing to do if there are no proposers registered with the EL, exit early to avoid + // wasting cycles. + if !execution_layer.has_any_proposer_preparation_data().await { + return Ok(()); + } + + let head = self.head_info()?; + let current_slot = self.slot()?; + + // Don't bother with proposer prep if the head is more than + // `PREPARE_PROPOSER_HISTORIC_EPOCHS` prior to the current slot. + // + // This prevents the routine from running during sync. + if head.slot + T::EthSpec::slots_per_epoch() * PREPARE_PROPOSER_HISTORIC_EPOCHS + < current_slot + { + debug!( + self.log, + "Head too old for proposer prep"; + "head_slot" => head.slot, + "current_slot" => current_slot, + ); + return Ok(()); + } + + // We only start to push preparation data for some chain *after* the transition block + // has been imported. + // + // There is no payload preparation for the transition block (i.e., the first block with + // execution enabled in some chain). + if head.execution_payload_block_hash.is_none() { + return Ok(()); + }; + + let head_epoch = head.slot.epoch(T::EthSpec::slots_per_epoch()); + let prepare_slot = current_slot + 1; + let prepare_epoch = prepare_slot.epoch(T::EthSpec::slots_per_epoch()); + + // Ensure that the shuffling decision root is correct relative to the epoch we wish to + // query. + let shuffling_decision_root = if head_epoch == prepare_epoch { + head.proposer_shuffling_decision_root + } else { + head.block_root + }; + + // Read the proposer from the proposer cache. + let cached_proposer = self + .beacon_proposer_cache + .lock() + .get_slot::(shuffling_decision_root, prepare_slot); + let proposer = if let Some(proposer) = cached_proposer { + proposer.index + } else { + if head_epoch + 2 < prepare_epoch { + warn!( + self.log, + "Skipping proposer preparation"; + "msg" => "this is a non-critical issue that can happen on unhealthy nodes or \ + networks.", + "prepare_epoch" => prepare_epoch, + "head_epoch" => head_epoch, + ); + + // Don't skip the head forward more than two epochs. This avoids burdening an + // unhealthy node. + // + // Although this node might miss out on preparing for a proposal, they should still + // be able to propose. This will prioritise beacon chain health over efficient + // packing of execution blocks. + return Ok(()); + } + + let (proposers, decision_root, fork) = + compute_proposer_duties_from_head(prepare_epoch, self)?; + + let proposer_index = prepare_slot.as_usize() % (T::EthSpec::slots_per_epoch() as usize); + let proposer = *proposers + .get(proposer_index) + .ok_or(BeaconChainError::NoProposerForSlot(prepare_slot))?; + + self.beacon_proposer_cache.lock().insert( + prepare_epoch, + decision_root, + proposers, + fork, + )?; + + // It's possible that the head changes whilst computing these duties. If so, abandon + // this routine since the change of head would have also spawned another instance of + // this routine. + // + // Exit now, after updating the cache. + if decision_root != shuffling_decision_root { + warn!( + self.log, + "Head changed during proposer preparation"; + ); + return Ok(()); + } + + proposer + }; + + // If the execution layer doesn't have any proposer data for this validator then we assume + // it's not connected to this BN and no action is required. + if !execution_layer + .has_proposer_preparation_data(proposer as u64) + .await + { + return Ok(()); + } + + let payload_attributes = PayloadAttributes { + timestamp: self + .slot_clock + .start_of(prepare_slot) + .ok_or(Error::InvalidSlot(prepare_slot))? + .as_secs(), + prev_randao: head.random, + suggested_fee_recipient: execution_layer + .get_suggested_fee_recipient(proposer as u64) + .await, + }; + + debug!( + self.log, + "Preparing beacon proposer"; + "payload_attributes" => ?payload_attributes, + "head_root" => ?head.block_root, + "prepare_slot" => prepare_slot, + "validator" => proposer, + ); + + let already_known = execution_layer + .insert_proposer( + prepare_slot, + head.block_root, + proposer as u64, + payload_attributes, + ) + .await; + // Only push a log to the user if this is the first time we've seen this proposer for this + // slot. + if !already_known { + info!( + self.log, + "Prepared beacon proposer"; + "already_known" => already_known, + "prepare_slot" => prepare_slot, + "validator" => proposer, + ); + } + + let till_prepare_slot = + if let Some(duration) = self.slot_clock.duration_to_slot(prepare_slot) { + duration + } else { + // `SlotClock::duration_to_slot` will return `None` when we are past the start + // of `prepare_slot`. Don't bother sending a `forkchoiceUpdated` in that case, + // it's too late. + // + // This scenario might occur on an overloaded/under-resourced node. + warn!( + self.log, + "Delayed proposer preparation"; + "prepare_slot" => prepare_slot, + "validator" => proposer, + ); + return Ok(()); + }; + + // If either of the following are true, send a fork-choice update message to the + // EL: + // + // 1. We're in the tail-end of the slot (as defined by + // PAYLOAD_PREPARATION_LOOKAHEAD_FACTOR) + // 2. The head block is one slot (or less) behind the prepare slot (e.g., we're + // preparing for the next slot and the block at the current slot is already + // known). + if till_prepare_slot + <= self.slot_clock.slot_duration() / PAYLOAD_PREPARATION_LOOKAHEAD_FACTOR + || head.slot + 1 >= prepare_slot + { + debug!( + self.log, + "Pushing update to prepare proposer"; + "till_prepare_slot" => ?till_prepare_slot, + "prepare_slot" => prepare_slot + ); + + // Use the blocking method here so that we don't form a queue of these functions when + // routinely calling them. + self.update_execution_engine_forkchoice_async(current_slot) + .await?; } Ok(()) @@ -3678,9 +3920,7 @@ impl BeaconChain { pub fn update_execution_engine_forkchoice_blocking( &self, - finalized_execution_block_hash: ExecutionBlockHash, - head_block_root: Hash256, - head_execution_block_hash: ExecutionBlockHash, + current_slot: Slot, ) -> Result<(), Error> { let execution_layer = self .execution_layer @@ -3688,22 +3928,70 @@ impl BeaconChain { .ok_or(Error::ExecutionLayerMissing)?; execution_layer - .block_on_generic(|_| { - self.update_execution_engine_forkchoice_async( - finalized_execution_block_hash, - head_block_root, - head_execution_block_hash, - ) - }) + .block_on_generic(|_| self.update_execution_engine_forkchoice_async(current_slot)) .map_err(Error::ForkchoiceUpdate)? } pub async fn update_execution_engine_forkchoice_async( &self, - finalized_execution_block_hash: ExecutionBlockHash, - head_block_root: Hash256, - head_execution_block_hash: ExecutionBlockHash, + current_slot: Slot, ) -> Result<(), Error> { + let execution_layer = self + .execution_layer + .as_ref() + .ok_or(Error::ExecutionLayerMissing)?; + + // Take the global lock for updating the execution engine fork choice. + // + // Whilst holding this lock we must: + // + // 1. Read the canonical head. + // 2. Issue a forkchoiceUpdated call to the execution engine. + // + // This will allow us to ensure that we provide the execution layer with an *ordered* view + // of the head. I.e., we will never communicate a past head after communicating a later + // one. + // + // There are two "deadlock warnings" in this function. The downside of this nice ordering + // is the potential for deadlock. I would advise against any other use of + // `execution_engine_forkchoice_lock` apart from the one here. + let forkchoice_lock = execution_layer.execution_engine_forkchoice_lock().await; + + // Deadlock warning: + // + // We are taking the `self.canonical_head` lock whilst holding the `forkchoice_lock`. This + // is intentional, since it allows us to ensure a consistent ordering of messages to the + // execution layer. + let head = self.head_info()?; + let head_block_root = head.block_root; + let head_execution_block_hash = if let Some(hash) = head + .execution_payload_block_hash + .filter(|h| *h != ExecutionBlockHash::zero()) + { + hash + } else { + // Don't send fork choice updates to the execution layer before the transition block. + return Ok(()); + }; + + let finalized_root = if head.finalized_checkpoint.root == Hash256::zero() { + // De-alias `0x00..00` to the genesis block root. + self.genesis_block_root + } else { + head.finalized_checkpoint.root + }; + // Deadlock warning: + // + // The same as above, but the lock on `self.fork_choice`. + let finalized_execution_block_hash = self + .fork_choice + .read() + .get_block(&finalized_root) + .ok_or(Error::FinalizedBlockMissingFromForkChoice(finalized_root))? + .execution_status + .block_hash() + .unwrap_or_else(ExecutionBlockHash::zero); + let forkchoice_updated_response = self .execution_layer .as_ref() @@ -3711,11 +3999,16 @@ impl BeaconChain { .notify_forkchoice_updated( head_execution_block_hash, finalized_execution_block_hash, - None, + current_slot, + head_block_root, ) .await .map_err(Error::ExecutionForkChoiceUpdateFailed); + // The head has been read and the execution layer has been updated. It is now valid to send + // another fork choice update. + drop(forkchoice_lock); + match forkchoice_updated_response { Ok(status) => match &status { PayloadStatus::Valid | PayloadStatus::Syncing => Ok(()), diff --git a/beacon_node/beacon_chain/src/beacon_proposer_cache.rs b/beacon_node/beacon_chain/src/beacon_proposer_cache.rs index 646884b60..d645201a5 100644 --- a/beacon_node/beacon_chain/src/beacon_proposer_cache.rs +++ b/beacon_node/beacon_chain/src/beacon_proposer_cache.rs @@ -8,9 +8,14 @@ //! very simple to reason about, but it might store values that are useless due to finalization. The //! values it stores are very small, so this should not be an issue. +use crate::{BeaconChain, BeaconChainError, BeaconChainTypes}; use lru::LruCache; use smallvec::SmallVec; -use types::{BeaconStateError, Epoch, EthSpec, Fork, Hash256, Slot, Unsigned}; +use state_processing::state_advance::partial_state_advance; +use std::cmp::Ordering; +use types::{ + BeaconState, BeaconStateError, ChainSpec, Epoch, EthSpec, Fork, Hash256, Slot, Unsigned, +}; /// The number of sets of proposer indices that should be cached. const CACHE_SIZE: usize = 16; @@ -125,3 +130,56 @@ impl BeaconProposerCache { Ok(()) } } + +/// Compute the proposer duties using the head state without cache. +pub fn compute_proposer_duties_from_head( + current_epoch: Epoch, + chain: &BeaconChain, +) -> Result<(Vec, Hash256, Fork), BeaconChainError> { + // Take a copy of the head of the chain. + let head = chain.head()?; + let mut state = head.beacon_state; + let head_state_root = head.beacon_block.state_root(); + + // Advance the state into the requested epoch. + ensure_state_is_in_epoch(&mut state, head_state_root, current_epoch, &chain.spec)?; + + let indices = state + .get_beacon_proposer_indices(&chain.spec) + .map_err(BeaconChainError::from)?; + + let dependent_root = state + // The only block which decides its own shuffling is the genesis block. + .proposer_shuffling_decision_root(chain.genesis_block_root) + .map_err(BeaconChainError::from)?; + + Ok((indices, dependent_root, state.fork())) +} + +/// If required, advance `state` to `target_epoch`. +/// +/// ## Details +/// +/// - Returns an error if `state.current_epoch() > target_epoch`. +/// - No-op if `state.current_epoch() == target_epoch`. +/// - It must be the case that `state.canonical_root() == state_root`, but this function will not +/// check that. +pub fn ensure_state_is_in_epoch( + state: &mut BeaconState, + state_root: Hash256, + target_epoch: Epoch, + spec: &ChainSpec, +) -> Result<(), BeaconChainError> { + match state.current_epoch().cmp(&target_epoch) { + // Protects against an inconsistent slot clock. + Ordering::Greater => Err(BeaconStateError::SlotOutOfBounds.into()), + // The state needs to be advanced. + Ordering::Less => { + let target_slot = target_epoch.start_slot(E::slots_per_epoch()); + partial_state_advance(state, Some(state_root), target_slot, spec) + .map_err(BeaconChainError::from) + } + // The state is suitable, nothing to do. + Ordering::Equal => Ok(()), + } +} diff --git a/beacon_node/beacon_chain/src/errors.rs b/beacon_node/beacon_chain/src/errors.rs index e8cc157ce..79f7346ca 100644 --- a/beacon_node/beacon_chain/src/errors.rs +++ b/beacon_node/beacon_chain/src/errors.rs @@ -138,6 +138,7 @@ pub enum BeaconChainError { AltairForkDisabled, ExecutionLayerMissing, ExecutionForkChoiceUpdateFailed(execution_layer::Error), + PrepareProposerBlockingFailed(execution_layer::Error), ExecutionForkChoiceUpdateInvalid { status: PayloadStatus, }, @@ -160,6 +161,7 @@ pub enum BeaconChainError { head_state: Checkpoint, fork_choice: Hash256, }, + InvalidSlot(Slot), } easy_from_to!(SlotProcessingError, BeaconChainError); diff --git a/beacon_node/beacon_chain/src/lib.rs b/beacon_node/beacon_chain/src/lib.rs index 74649bdee..89a341210 100644 --- a/beacon_node/beacon_chain/src/lib.rs +++ b/beacon_node/beacon_chain/src/lib.rs @@ -3,7 +3,7 @@ pub mod attestation_verification; mod attester_cache; mod beacon_chain; mod beacon_fork_choice_store; -mod beacon_proposer_cache; +pub mod beacon_proposer_cache; mod beacon_snapshot; pub mod block_reward; mod block_times_cache; @@ -28,6 +28,7 @@ pub mod observed_operations; mod persisted_beacon_chain; mod persisted_fork_choice; mod pre_finalization_cache; +pub mod proposer_prep_service; pub mod schema_change; mod shuffling_cache; mod snapshot_cache; diff --git a/beacon_node/beacon_chain/src/proposer_prep_service.rs b/beacon_node/beacon_chain/src/proposer_prep_service.rs new file mode 100644 index 000000000..59977f02c --- /dev/null +++ b/beacon_node/beacon_chain/src/proposer_prep_service.rs @@ -0,0 +1,74 @@ +use crate::{BeaconChain, BeaconChainTypes}; +use slog::{debug, error}; +use slot_clock::SlotClock; +use std::sync::Arc; +use task_executor::TaskExecutor; +use tokio::time::sleep; + +/// At 12s slot times, the means that the payload preparation routine will run 4s before the start +/// of each slot (`12 / 3 = 4`). +pub const PAYLOAD_PREPARATION_LOOKAHEAD_FACTOR: u32 = 3; + +/// Spawns a routine which ensures the EL is provided advance notice of any block producers. +/// +/// This routine will run once per slot, at `slot_duration / PAYLOAD_PREPARATION_LOOKAHEAD_FACTOR` +/// before the start of each slot. +/// +/// The service will not be started if there is no `execution_layer` on the `chain`. +pub fn start_proposer_prep_service( + executor: TaskExecutor, + chain: Arc>, +) { + // Avoid spawning the service if there's no EL, it'll just error anyway. + if chain.execution_layer.is_some() { + executor.clone().spawn( + async move { proposer_prep_service(executor, chain).await }, + "proposer_prep_service", + ); + } +} + +/// Loop indefinitely, calling `BeaconChain::prepare_beacon_proposer_async` at an interval. +async fn proposer_prep_service( + executor: TaskExecutor, + chain: Arc>, +) { + let slot_duration = chain.slot_clock.slot_duration(); + + loop { + match chain.slot_clock.duration_to_next_slot() { + Some(duration) => { + let additional_delay = slot_duration + - chain.slot_clock.slot_duration() / PAYLOAD_PREPARATION_LOOKAHEAD_FACTOR; + sleep(duration + additional_delay).await; + + debug!( + chain.log, + "Proposer prepare routine firing"; + ); + + let inner_chain = chain.clone(); + executor.spawn( + async move { + if let Err(e) = inner_chain.prepare_beacon_proposer_async().await { + error!( + inner_chain.log, + "Proposer prepare routine failed"; + "error" => ?e + ); + } + }, + "proposer_prep_update", + ); + + continue; + } + None => { + error!(chain.log, "Failed to read slot clock"); + // If we can't read the slot clock, just wait another slot. + sleep(slot_duration).await; + continue; + } + }; + } +} diff --git a/beacon_node/beacon_chain/tests/payload_invalidation.rs b/beacon_node/beacon_chain/tests/payload_invalidation.rs index 6d3ffff19..4d2dfccac 100644 --- a/beacon_node/beacon_chain/tests/payload_invalidation.rs +++ b/beacon_node/beacon_chain/tests/payload_invalidation.rs @@ -5,7 +5,11 @@ use beacon_chain::{ BeaconChainError, BlockError, ExecutionPayloadError, HeadInfo, StateSkipConfig, WhenSlotSkipped, INVALID_JUSTIFIED_PAYLOAD_SHUTDOWN_REASON, }; +use execution_layer::{ + json_structures::JsonPayloadAttributesV1, ExecutionLayer, PayloadAttributes, +}; use proto_array::ExecutionStatus; +use slot_clock::SlotClock; use task_executor::ShutdownReason; use types::*; @@ -54,6 +58,10 @@ impl InvalidPayloadRig { self } + fn execution_layer(&self) -> ExecutionLayer { + self.harness.chain.execution_layer.clone().unwrap() + } + fn block_hash(&self, block_root: Hash256) -> ExecutionBlockHash { self.harness .chain @@ -85,6 +93,19 @@ impl InvalidPayloadRig { self.harness.chain.head_info().unwrap() } + fn previous_payload_attributes(&self) -> PayloadAttributes { + let mock_execution_layer = self.harness.mock_execution_layer.as_ref().unwrap(); + let json = mock_execution_layer + .server + .take_previous_request() + .expect("no previous request"); + let params = json.get("params").expect("no params"); + let payload_param_json = params.get(1).expect("no payload param"); + let attributes: JsonPayloadAttributesV1 = + serde_json::from_value(payload_param_json.clone()).unwrap(); + attributes.into() + } + fn move_to_terminal_block(&self) { let mock_execution_layer = self.harness.mock_execution_layer.as_ref().unwrap(); mock_execution_layer @@ -620,3 +641,55 @@ fn invalid_after_optimistic_sync() { let head = rig.harness.chain.head_info().unwrap(); assert_eq!(head.block_root, roots[1]); } + +#[test] +fn payload_preparation() { + let mut rig = InvalidPayloadRig::new(); + rig.move_to_terminal_block(); + rig.import_block(Payload::Valid); + + let el = rig.execution_layer(); + let head = rig.harness.chain.head().unwrap(); + let current_slot = rig.harness.chain.slot().unwrap(); + assert_eq!(head.beacon_state.slot(), 1); + assert_eq!(current_slot, 1); + + let next_slot = current_slot + 1; + let proposer = head + .beacon_state + .get_beacon_proposer_index(next_slot, &rig.harness.chain.spec) + .unwrap(); + + let fee_recipient = Address::repeat_byte(99); + + // Provide preparation data to the EL for `proposer`. + el.update_proposer_preparation_blocking( + Epoch::new(1), + &[ProposerPreparationData { + validator_index: proposer as u64, + fee_recipient, + }], + ) + .unwrap(); + + rig.harness + .chain + .prepare_beacon_proposer_blocking() + .unwrap(); + + let payload_attributes = PayloadAttributes { + timestamp: rig + .harness + .chain + .slot_clock + .start_of(next_slot) + .unwrap() + .as_secs(), + prev_randao: *head + .beacon_state + .get_randao_mix(head.beacon_state.current_epoch()) + .unwrap(), + suggested_fee_recipient: fee_recipient, + }; + assert_eq!(rig.previous_payload_attributes(), payload_attributes); +} diff --git a/beacon_node/client/src/builder.rs b/beacon_node/client/src/builder.rs index 0f1b17ffd..19022f6cc 100644 --- a/beacon_node/client/src/builder.rs +++ b/beacon_node/client/src/builder.rs @@ -1,6 +1,7 @@ use crate::config::{ClientGenesis, Config as ClientConfig}; use crate::notifier::spawn_notifier; use crate::Client; +use beacon_chain::proposer_prep_service::start_proposer_prep_service; use beacon_chain::schema_change::migrate_schema; use beacon_chain::{ builder::{BeaconChainBuilder, Witness}, @@ -665,26 +666,16 @@ where .head_info() .map_err(|e| format!("Unable to read beacon chain head: {:?}", e))?; + let current_slot = beacon_chain + .slot() + .map_err(|e| format!("Unable to read slot: {:?}", e))?; + // Issue the head to the execution engine on startup. This ensures it can start // syncing. - if let Some(block_hash) = head.execution_payload_block_hash { - let finalized_root = head.finalized_checkpoint.root; - let finalized_block = beacon_chain - .store - .get_block(&finalized_root) - .map_err(|e| format!("Failed to read finalized block from DB: {:?}", e))? - .ok_or(format!( - "Finalized block missing from store: {:?}", - finalized_root - ))?; - let finalized_execution_block_hash = finalized_block - .message() - .body() - .execution_payload() - .ok() - .map(|ep| ep.block_hash) - .unwrap_or_else(ExecutionBlockHash::zero); - + if head + .execution_payload_block_hash + .map_or(false, |h| h != ExecutionBlockHash::zero()) + { // Spawn a new task using the "async" fork choice update method, rather than // using the "blocking" method. // @@ -694,11 +685,7 @@ where runtime_context.executor.spawn( async move { let result = inner_chain - .update_execution_engine_forkchoice_async( - finalized_execution_block_hash, - head.block_root, - block_hash, - ) + .update_execution_engine_forkchoice_async(current_slot) .await; // No need to exit early if setting the head fails. It will be set again if/when the @@ -726,6 +713,8 @@ where // Spawns a routine that polls the `exchange_transition_configuration` endpoint. execution_layer.spawn_transition_configuration_poll(beacon_chain.spec.clone()); } + + start_proposer_prep_service(runtime_context.executor.clone(), beacon_chain.clone()); } Ok(Client { diff --git a/beacon_node/execution_layer/Cargo.toml b/beacon_node/execution_layer/Cargo.toml index a7d369ac0..b12d30ea2 100644 --- a/beacon_node/execution_layer/Cargo.toml +++ b/beacon_node/execution_layer/Cargo.toml @@ -33,3 +33,5 @@ slot_clock = { path = "../../common/slot_clock" } tempfile = "3.1.0" rand = "0.7.3" zeroize = { version = "1.4.2", features = ["zeroize_derive"] } +lighthouse_metrics = { path = "../../common/lighthouse_metrics" } +lazy_static = "1.4.0" \ No newline at end of file diff --git a/beacon_node/execution_layer/src/engine_api.rs b/beacon_node/execution_layer/src/engine_api.rs index 0a509c09b..51c689ac6 100644 --- a/beacon_node/execution_layer/src/engine_api.rs +++ b/beacon_node/execution_layer/src/engine_api.rs @@ -130,7 +130,7 @@ pub struct ExecutionBlock { pub total_difficulty: Uint256, } -#[derive(Clone, Copy, Debug)] +#[derive(Clone, Copy, Debug, PartialEq)] pub struct PayloadAttributes { pub timestamp: u64, pub prev_randao: Hash256, diff --git a/beacon_node/execution_layer/src/engines.rs b/beacon_node/execution_layer/src/engines.rs index 6dec9983c..a2c40ceb3 100644 --- a/beacon_node/execution_layer/src/engines.rs +++ b/beacon_node/execution_layer/src/engines.rs @@ -152,6 +152,16 @@ impl Engines { let latest_forkchoice_state = self.get_latest_forkchoice_state().await; if let Some(forkchoice_state) = latest_forkchoice_state { + if forkchoice_state.head_block_hash == ExecutionBlockHash::zero() { + debug!( + self.log, + "No need to call forkchoiceUpdated"; + "msg" => "head does not have execution enabled", + "id" => &engine.id, + ); + return; + } + info!( self.log, "Issuing forkchoiceUpdated"; diff --git a/beacon_node/execution_layer/src/lib.rs b/beacon_node/execution_layer/src/lib.rs index 5e61e7196..c46549e4e 100644 --- a/beacon_node/execution_layer/src/lib.rs +++ b/beacon_node/execution_layer/src/lib.rs @@ -18,19 +18,22 @@ use std::future::Future; use std::io::Write; use std::path::PathBuf; use std::sync::Arc; -use std::time::Duration; +use std::time::{Duration, SystemTime, UNIX_EPOCH}; use task_executor::TaskExecutor; use tokio::{ - sync::{Mutex, MutexGuard}, + sync::{Mutex, MutexGuard, RwLock}, time::{sleep, sleep_until, Instant}, }; -use types::{ChainSpec, Epoch, ExecutionBlockHash, ProposerPreparationData}; +use types::{ChainSpec, Epoch, ExecutionBlockHash, ProposerPreparationData, Slot}; -pub use engine_api::{http::HttpJsonRpc, PayloadAttributes, PayloadStatusV1Status}; +pub use engine_api::{ + http::HttpJsonRpc, json_structures, PayloadAttributes, PayloadStatusV1Status, +}; pub use payload_status::PayloadStatus; mod engine_api; mod engines; +mod metrics; mod payload_status; pub mod test_utils; @@ -72,17 +75,31 @@ impl From for Error { } } -#[derive(Clone)] +#[derive(Clone, PartialEq)] pub struct ProposerPreparationDataEntry { update_epoch: Epoch, preparation_data: ProposerPreparationData, } +#[derive(Hash, PartialEq, Eq)] +pub struct ProposerKey { + slot: Slot, + head_block_root: Hash256, +} + +#[derive(PartialEq, Clone)] +pub struct Proposer { + validator_index: u64, + payload_attributes: PayloadAttributes, +} + struct Inner { engines: Engines, + execution_engine_forkchoice_lock: Mutex<()>, suggested_fee_recipient: Option
, proposer_preparation_data: Mutex>, execution_blocks: Mutex>, + proposers: RwLock>, executor: TaskExecutor, log: Logger, } @@ -204,8 +221,10 @@ impl ExecutionLayer { latest_forkchoice_state: <_>::default(), log: log.clone(), }, + execution_engine_forkchoice_lock: <_>::default(), suggested_fee_recipient, proposer_preparation_data: Mutex::new(HashMap::new()), + proposers: RwLock::new(HashMap::new()), execution_blocks: Mutex::new(LruCache::new(EXECUTION_BLOCKS_LRU_CACHE_SIZE)), executor, log, @@ -240,10 +259,18 @@ impl ExecutionLayer { self.inner.proposer_preparation_data.lock().await } + fn proposers(&self) -> &RwLock> { + &self.inner.proposers + } + fn log(&self) -> &Logger { &self.inner.log } + pub async fn execution_engine_forkchoice_lock(&self) -> MutexGuard<'_, ()> { + self.inner.execution_engine_forkchoice_lock.lock().await + } + /// Convenience function to allow calling async functions in a non-async context. pub fn block_on<'a, T, U, V>(&'a self, generate_future: T) -> Result where @@ -421,7 +448,7 @@ impl ExecutionLayer { self.block_on_generic(|_| async move { self.update_proposer_preparation(update_epoch, preparation_data) .await - })? + }) } /// Updates the proposer preparation data provided by validators @@ -429,19 +456,21 @@ impl ExecutionLayer { &self, update_epoch: Epoch, preparation_data: &[ProposerPreparationData], - ) -> Result<(), Error> { + ) { let mut proposer_preparation_data = self.proposer_preparation_data().await; for preparation_entry in preparation_data { - proposer_preparation_data.insert( - preparation_entry.validator_index, - ProposerPreparationDataEntry { - update_epoch, - preparation_data: preparation_entry.clone(), - }, - ); - } + let new = ProposerPreparationDataEntry { + update_epoch, + preparation_data: preparation_entry.clone(), + }; - Ok(()) + let existing = + proposer_preparation_data.insert(preparation_entry.validator_index, new.clone()); + + if existing != Some(new) { + metrics::inc_counter(&metrics::EXECUTION_LAYER_PROPOSER_DATA_UPDATED); + } + } } /// Removes expired entries from cached proposer preparations @@ -457,8 +486,22 @@ impl ExecutionLayer { Ok(()) } + /// Returns `true` if there have been any validators registered via + /// `Self::update_proposer_preparation`. + pub async fn has_any_proposer_preparation_data(&self) -> bool { + !self.proposer_preparation_data().await.is_empty() + } + + /// Returns `true` if the `proposer_index` has registered as a local validator via + /// `Self::update_proposer_preparation`. + pub async fn has_proposer_preparation_data(&self, proposer_index: u64) -> bool { + self.proposer_preparation_data() + .await + .contains_key(&proposer_index) + } + /// Returns the fee-recipient address that should be used to build a block - async fn get_suggested_fee_recipient(&self, proposer_index: u64) -> Address { + pub async fn get_suggested_fee_recipient(&self, proposer_index: u64) -> Address { if let Some(preparation_data_entry) = self.proposer_preparation_data().await.get(&proposer_index) { @@ -500,6 +543,11 @@ impl ExecutionLayer { finalized_block_hash: ExecutionBlockHash, proposer_index: u64, ) -> Result, Error> { + let _timer = metrics::start_timer_vec( + &metrics::EXECUTION_LAYER_REQUEST_TIMES, + &[metrics::GET_PAYLOAD], + ); + let suggested_fee_recipient = self.get_suggested_fee_recipient(proposer_index).await; debug!( @@ -517,6 +565,10 @@ impl ExecutionLayer { .await { // The payload id has been cached for this engine. + metrics::inc_counter_vec( + &metrics::EXECUTION_LAYER_PRE_PREPARED_PAYLOAD_ID, + &[metrics::HIT], + ); id } else { // The payload id has *not* been cached for this engine. Trigger an artificial @@ -525,6 +577,10 @@ impl ExecutionLayer { // TODO(merge): a better algorithm might try to favour a node that already had a // cached payload id, since a payload that has had more time to produce is // likely to be more profitable. + metrics::inc_counter_vec( + &metrics::EXECUTION_LAYER_PRE_PREPARED_PAYLOAD_ID, + &[metrics::MISS], + ); let fork_choice_state = ForkChoiceState { head_block_hash: parent_hash, safe_block_hash: parent_hash, @@ -579,6 +635,11 @@ impl ExecutionLayer { &self, execution_payload: &ExecutionPayload, ) -> Result { + let _timer = metrics::start_timer_vec( + &metrics::EXECUTION_LAYER_REQUEST_TIMES, + &[metrics::NEW_PAYLOAD], + ); + trace!( self.log(), "Issuing engine_newPayload"; @@ -599,6 +660,64 @@ impl ExecutionLayer { ) } + /// Register that the given `validator_index` is going to produce a block at `slot`. + /// + /// The block will be built atop `head_block_root` and the EL will need to prepare an + /// `ExecutionPayload` as defined by the given `payload_attributes`. + pub async fn insert_proposer( + &self, + slot: Slot, + head_block_root: Hash256, + validator_index: u64, + payload_attributes: PayloadAttributes, + ) -> bool { + let proposers_key = ProposerKey { + slot, + head_block_root, + }; + + let existing = self.proposers().write().await.insert( + proposers_key, + Proposer { + validator_index, + payload_attributes, + }, + ); + + if existing.is_none() { + metrics::inc_counter(&metrics::EXECUTION_LAYER_PROPOSER_INSERTED); + } + + existing.is_some() + } + + /// If there has been a proposer registered via `Self::insert_proposer` with a matching `slot` + /// `head_block_root`, then return the appropriate `PayloadAttributes` for inclusion in + /// `forkchoiceUpdated` calls. + pub async fn payload_attributes( + &self, + current_slot: Slot, + head_block_root: Hash256, + ) -> Option { + let proposers_key = ProposerKey { + slot: current_slot, + head_block_root, + }; + + let proposer = self.proposers().read().await.get(&proposers_key).cloned()?; + + debug!( + self.log(), + "Beacon proposer found"; + "payload_attributes" => ?proposer.payload_attributes, + "head_block_root" => ?head_block_root, + "slot" => current_slot, + "validator_index" => proposer.validator_index, + ); + + Some(proposer.payload_attributes) + } + /// Maps to the `engine_consensusValidated` JSON-RPC call. /// /// ## Fallback Behaviour @@ -616,8 +735,14 @@ impl ExecutionLayer { &self, head_block_hash: ExecutionBlockHash, finalized_block_hash: ExecutionBlockHash, - payload_attributes: Option, + current_slot: Slot, + head_block_root: Hash256, ) -> Result { + let _timer = metrics::start_timer_vec( + &metrics::EXECUTION_LAYER_REQUEST_TIMES, + &[metrics::FORKCHOICE_UPDATED], + ); + trace!( self.log(), "Issuing engine_forkchoiceUpdated"; @@ -625,6 +750,29 @@ impl ExecutionLayer { "head_block_hash" => ?head_block_hash, ); + let next_slot = current_slot + 1; + let payload_attributes = self.payload_attributes(next_slot, head_block_root).await; + + // Compute the "lookahead", the time between when the payload will be produced and now. + if let Some(payload_attributes) = payload_attributes { + if let Ok(now) = SystemTime::now().duration_since(UNIX_EPOCH) { + let timestamp = Duration::from_secs(payload_attributes.timestamp); + if let Some(lookahead) = timestamp.checked_sub(now) { + metrics::observe_duration( + &metrics::EXECUTION_LAYER_PAYLOAD_ATTRIBUTES_LOOKAHEAD, + lookahead, + ); + } else { + debug!( + self.log(), + "Late payload attributes"; + "timestamp" => ?timestamp, + "now" => ?now, + ) + } + } + } + // see https://hackmd.io/@n0ble/kintsugi-spec#Engine-API // for now, we must set safe_block_hash = head_block_hash let forkchoice_state = ForkChoiceState { @@ -725,6 +873,11 @@ impl ExecutionLayer { &self, spec: &ChainSpec, ) -> Result, Error> { + let _timer = metrics::start_timer_vec( + &metrics::EXECUTION_LAYER_REQUEST_TIMES, + &[metrics::GET_TERMINAL_POW_BLOCK_HASH], + ); + let hash_opt = self .engines() .first_success(|engine| async move { @@ -836,6 +989,11 @@ impl ExecutionLayer { block_hash: ExecutionBlockHash, spec: &ChainSpec, ) -> Result, Error> { + let _timer = metrics::start_timer_vec( + &metrics::EXECUTION_LAYER_REQUEST_TIMES, + &[metrics::IS_VALID_TERMINAL_POW_BLOCK_HASH], + ); + let broadcast_results = self .engines() .broadcast(|engine| async move { diff --git a/beacon_node/execution_layer/src/metrics.rs b/beacon_node/execution_layer/src/metrics.rs new file mode 100644 index 000000000..4a761c8e4 --- /dev/null +++ b/beacon_node/execution_layer/src/metrics.rs @@ -0,0 +1,34 @@ +pub use lighthouse_metrics::*; + +pub const HIT: &str = "hit"; +pub const MISS: &str = "miss"; +pub const GET_PAYLOAD: &str = "get_payload"; +pub const NEW_PAYLOAD: &str = "new_payload"; +pub const FORKCHOICE_UPDATED: &str = "forkchoice_updated"; +pub const GET_TERMINAL_POW_BLOCK_HASH: &str = "get_terminal_pow_block_hash"; +pub const IS_VALID_TERMINAL_POW_BLOCK_HASH: &str = "is_valid_terminal_pow_block_hash"; + +lazy_static::lazy_static! { + pub static ref EXECUTION_LAYER_PROPOSER_INSERTED: Result = try_create_int_counter( + "execution_layer_proposer_inserted", + "Count of times a new proposer is known", + ); + pub static ref EXECUTION_LAYER_PROPOSER_DATA_UPDATED: Result = try_create_int_counter( + "execution_layer_proposer_data_updated", + "Count of times new proposer data is supplied", + ); + pub static ref EXECUTION_LAYER_REQUEST_TIMES: Result = try_create_histogram_vec( + "execution_layer_request_times", + "Duration of calls to ELs", + &["method"] + ); + pub static ref EXECUTION_LAYER_PAYLOAD_ATTRIBUTES_LOOKAHEAD: Result = try_create_histogram( + "execution_layer_payload_attributes_lookahead", + "Duration between an fcU call with PayloadAttributes and when the block should be produced", + ); + pub static ref EXECUTION_LAYER_PRE_PREPARED_PAYLOAD_ID: Result = try_create_int_counter_vec( + "execution_layer_pre_prepared_payload_id", + "Indicates hits or misses for already having prepared a payload id before payload production", + &["event"] + ); +} diff --git a/beacon_node/execution_layer/src/test_utils/handle_rpc.rs b/beacon_node/execution_layer/src/test_utils/handle_rpc.rs index 1ee29ce7a..61038f40a 100644 --- a/beacon_node/execution_layer/src/test_utils/handle_rpc.rs +++ b/beacon_node/execution_layer/src/test_utils/handle_rpc.rs @@ -10,6 +10,8 @@ pub async fn handle_rpc( body: JsonValue, ctx: Arc>, ) -> Result { + *ctx.previous_request.lock() = Some(body.clone()); + let method = body .get("method") .and_then(JsonValue::as_str) diff --git a/beacon_node/execution_layer/src/test_utils/mock_execution_layer.rs b/beacon_node/execution_layer/src/test_utils/mock_execution_layer.rs index a72f34b1a..cf8c8516f 100644 --- a/beacon_node/execution_layer/src/test_utils/mock_execution_layer.rs +++ b/beacon_node/execution_layer/src/test_utils/mock_execution_layer.rs @@ -124,15 +124,29 @@ impl MockExecutionLayer { let prev_randao = Hash256::from_low_u64_be(block_number); let finalized_block_hash = parent_hash; + // Insert a proposer to ensure the fork choice updated command works. + let slot = Slot::new(0); + let head_block_root = Hash256::repeat_byte(42); + let validator_index = 0; + self.el + .insert_proposer( + slot, + head_block_root, + validator_index, + PayloadAttributes { + timestamp, + prev_randao, + suggested_fee_recipient: Address::repeat_byte(42), + }, + ) + .await; + self.el .notify_forkchoice_updated( parent_hash, ExecutionBlockHash::zero(), - Some(PayloadAttributes { - timestamp, - prev_randao, - suggested_fee_recipient: Address::repeat_byte(42), - }), + slot, + head_block_root, ) .await .unwrap(); @@ -158,8 +172,16 @@ impl MockExecutionLayer { let status = self.el.notify_new_payload(&payload).await.unwrap(); assert_eq!(status, PayloadStatus::Valid); + // Use junk values for slot/head-root to ensure there is no payload supplied. + let slot = Slot::new(0); + let head_block_root = Hash256::repeat_byte(13); self.el - .notify_forkchoice_updated(block_hash, ExecutionBlockHash::zero(), None) + .notify_forkchoice_updated( + block_hash, + ExecutionBlockHash::zero(), + slot, + head_block_root, + ) .await .unwrap(); diff --git a/beacon_node/execution_layer/src/test_utils/mod.rs b/beacon_node/execution_layer/src/test_utils/mod.rs index 0172d8595..99adfa655 100644 --- a/beacon_node/execution_layer/src/test_utils/mod.rs +++ b/beacon_node/execution_layer/src/test_utils/mod.rs @@ -65,6 +65,7 @@ impl MockServer { log: null_logger().unwrap(), last_echo_request: last_echo_request.clone(), execution_block_generator: RwLock::new(execution_block_generator), + previous_request: <_>::default(), preloaded_responses, static_new_payload_response: <_>::default(), _phantom: PhantomData, @@ -120,6 +121,10 @@ impl MockServer { self.ctx.preloaded_responses.lock().push(response) } + pub fn take_previous_request(&self) -> Option { + self.ctx.previous_request.lock().take() + } + pub fn all_payloads_valid(&self) { let response = StaticNewPayloadResponse { status: PayloadStatusV1 { @@ -241,6 +246,7 @@ pub struct Context { pub last_echo_request: Arc>>, pub execution_block_generator: RwLock>, pub preloaded_responses: Arc>>, + pub previous_request: Arc>>, pub static_new_payload_response: Arc>>, pub _phantom: PhantomData, } diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index dcc6528a9..44c2c36bd 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -2229,6 +2229,13 @@ pub fn serve( ) })?; + chain.prepare_beacon_proposer_blocking().map_err(|e| { + warp_utils::reject::custom_bad_request(format!( + "error updating proposer preparations: {:?}", + e + )) + })?; + Ok(()) }) }, diff --git a/beacon_node/http_api/src/proposer_duties.rs b/beacon_node/http_api/src/proposer_duties.rs index d817c9f65..b040eec77 100644 --- a/beacon_node/http_api/src/proposer_duties.rs +++ b/beacon_node/http_api/src/proposer_duties.rs @@ -2,15 +2,15 @@ use crate::state_id::StateId; use beacon_chain::{ + beacon_proposer_cache::{compute_proposer_duties_from_head, ensure_state_is_in_epoch}, BeaconChain, BeaconChainError, BeaconChainTypes, MAXIMUM_GOSSIP_CLOCK_DISPARITY, }; use eth2::types::{self as api_types}; use safe_arith::SafeArith; use slog::{debug, Logger}; use slot_clock::SlotClock; -use state_processing::state_advance::partial_state_advance; use std::cmp::Ordering; -use types::{BeaconState, ChainSpec, CloneConfig, Epoch, EthSpec, Fork, Hash256, Slot}; +use types::{CloneConfig, Epoch, EthSpec, Hash256, Slot}; /// The struct that is returned to the requesting HTTP client. type ApiDuties = api_types::DutiesResponse>; @@ -55,7 +55,9 @@ pub fn proposer_duties( .safe_add(1) .map_err(warp_utils::reject::arith_error)? { - let (proposers, dependent_root, _) = compute_proposer_duties(request_epoch, chain)?; + let (proposers, dependent_root, _) = + compute_proposer_duties_from_head(request_epoch, chain) + .map_err(warp_utils::reject::beacon_chain_error)?; convert_to_api_response(chain, request_epoch, dependent_root, proposers) } else if request_epoch > current_epoch @@ -130,7 +132,8 @@ fn compute_and_cache_proposer_duties( current_epoch: Epoch, chain: &BeaconChain, ) -> Result { - let (indices, dependent_root, fork) = compute_proposer_duties(current_epoch, chain)?; + let (indices, dependent_root, fork) = compute_proposer_duties_from_head(current_epoch, chain) + .map_err(warp_utils::reject::beacon_chain_error)?; // Prime the proposer shuffling cache with the newly-learned value. chain @@ -143,35 +146,6 @@ fn compute_and_cache_proposer_duties( convert_to_api_response(chain, current_epoch, dependent_root, indices) } -/// Compute the proposer duties using the head state without cache. -fn compute_proposer_duties( - current_epoch: Epoch, - chain: &BeaconChain, -) -> Result<(Vec, Hash256, Fork), warp::reject::Rejection> { - // Take a copy of the head of the chain. - let head = chain - .head() - .map_err(warp_utils::reject::beacon_chain_error)?; - let mut state = head.beacon_state; - let head_state_root = head.beacon_block.state_root(); - - // Advance the state into the requested epoch. - ensure_state_is_in_epoch(&mut state, head_state_root, current_epoch, &chain.spec)?; - - let indices = state - .get_beacon_proposer_indices(&chain.spec) - .map_err(BeaconChainError::from) - .map_err(warp_utils::reject::beacon_chain_error)?; - - let dependent_root = state - // The only block which decides its own shuffling is the genesis block. - .proposer_shuffling_decision_root(chain.genesis_block_root) - .map_err(BeaconChainError::from) - .map_err(warp_utils::reject::beacon_chain_error)?; - - Ok((indices, dependent_root, state.fork())) -} - /// Compute some proposer duties by reading a `BeaconState` from disk, completely ignoring the /// `beacon_proposer_cache`. fn compute_historic_proposer_duties( @@ -198,7 +172,8 @@ fn compute_historic_proposer_duties( let state = if let Some((state_root, mut state)) = state_opt { // If we've loaded the head state it might be from a previous epoch, ensure it's in a // suitable epoch. - ensure_state_is_in_epoch(&mut state, state_root, epoch, &chain.spec)?; + ensure_state_is_in_epoch(&mut state, state_root, epoch, &chain.spec) + .map_err(warp_utils::reject::beacon_chain_error)?; state } else { StateId::slot(epoch.start_slot(T::EthSpec::slots_per_epoch())).state(chain)? @@ -228,39 +203,6 @@ fn compute_historic_proposer_duties( convert_to_api_response(chain, epoch, dependent_root, indices) } -/// If required, advance `state` to `target_epoch`. -/// -/// ## Details -/// -/// - Returns an error if `state.current_epoch() > target_epoch`. -/// - No-op if `state.current_epoch() == target_epoch`. -/// - It must be the case that `state.canonical_root() == state_root`, but this function will not -/// check that. -fn ensure_state_is_in_epoch( - state: &mut BeaconState, - state_root: Hash256, - target_epoch: Epoch, - spec: &ChainSpec, -) -> Result<(), warp::reject::Rejection> { - match state.current_epoch().cmp(&target_epoch) { - // Protects against an inconsistent slot clock. - Ordering::Greater => Err(warp_utils::reject::custom_server_error(format!( - "state epoch {} is later than target epoch {}", - state.current_epoch(), - target_epoch - ))), - // The state needs to be advanced. - Ordering::Less => { - let target_slot = target_epoch.start_slot(E::slots_per_epoch()); - partial_state_advance(state, Some(state_root), target_slot, spec) - .map_err(BeaconChainError::from) - .map_err(warp_utils::reject::beacon_chain_error) - } - // The state is suitable, nothing to do. - Ordering::Equal => Ok(()), - } -} - /// Converts the internal representation of proposer duties into one that is compatible with the /// standard API. fn convert_to_api_response( diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index 0c84087f9..021a12c18 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -209,6 +209,9 @@ pub struct SyncManager { /// A multi-threaded, non-blocking processor for applying messages to the beacon chain. beacon_processor_send: mpsc::Sender>, + /// Used for spawning tasks. + executor: task_executor::TaskExecutor, + /// The logger for the import manager. log: Logger, } @@ -269,6 +272,7 @@ pub fn spawn( failed_chains: LRUCache::new(500), single_block_lookups: FnvHashMap::default(), beacon_processor_send, + executor: executor.clone(), log: log.clone(), }; @@ -475,19 +479,27 @@ impl SyncManager { ); info!(self.log, "Processed block"; "block" => %block_root); - match self.chain.fork_choice() { - Ok(()) => trace!( - self.log, - "Fork choice success"; - "location" => "single block" - ), - Err(e) => error!( - self.log, - "Fork choice failed"; - "error" => ?e, - "location" => "single block" - ), - } + // Spawn `BeaconChain::fork_choice` in a blocking task. It's + // potentially long-running and it might panic if run from an async + // context. + let chain = self.chain.clone(); + let log = self.log.clone(); + self.executor.spawn_blocking( + move || match chain.fork_choice() { + Ok(()) => trace!( + log, + "Fork choice success"; + "location" => "single block" + ), + Err(e) => error!( + log, + "Fork choice failed"; + "error" => ?e, + "location" => "single block" + ), + }, + "sync_manager_fork_choice", + ); } Err(BlockError::ParentUnknown { .. }) => { // We don't know of the blocks parent, begin a parent lookup search diff --git a/testing/execution_engine_integration/src/test_rig.rs b/testing/execution_engine_integration/src/test_rig.rs index a59586555..b788a7565 100644 --- a/testing/execution_engine_integration/src/test_rig.rs +++ b/testing/execution_engine_integration/src/test_rig.rs @@ -4,7 +4,9 @@ use std::sync::Arc; use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; use task_executor::TaskExecutor; use tokio::time::sleep; -use types::{Address, ChainSpec, EthSpec, ExecutionBlockHash, Hash256, MainnetEthSpec, Uint256}; +use types::{ + Address, ChainSpec, EthSpec, ExecutionBlockHash, Hash256, MainnetEthSpec, Slot, Uint256, +}; const EXECUTION_ENGINE_START_TIMEOUT: Duration = Duration::from_secs(10); @@ -187,11 +189,12 @@ impl TestRig { */ let head_block_hash = valid_payload.block_hash; let finalized_block_hash = ExecutionBlockHash::zero(); - let payload_attributes = None; + let slot = Slot::new(42); + let head_block_root = Hash256::repeat_byte(42); let status = self .ee_a .execution_layer - .notify_forkchoice_updated(head_block_hash, finalized_block_hash, payload_attributes) + .notify_forkchoice_updated(head_block_hash, finalized_block_hash, slot, head_block_root) .await .unwrap(); assert_eq!(status, PayloadStatus::Syncing); @@ -219,11 +222,12 @@ impl TestRig { */ let head_block_hash = valid_payload.block_hash; let finalized_block_hash = ExecutionBlockHash::zero(); - let payload_attributes = None; + let slot = Slot::new(42); + let head_block_root = Hash256::repeat_byte(42); let status = self .ee_a .execution_layer - .notify_forkchoice_updated(head_block_hash, finalized_block_hash, payload_attributes) + .notify_forkchoice_updated(head_block_hash, finalized_block_hash, slot, head_block_root) .await .unwrap(); assert_eq!(status, PayloadStatus::Valid); @@ -289,15 +293,22 @@ impl TestRig { */ let head_block_hash = valid_payload.block_hash; let finalized_block_hash = ExecutionBlockHash::zero(); - let payload_attributes = Some(PayloadAttributes { + let payload_attributes = PayloadAttributes { timestamp: second_payload.timestamp + 1, prev_randao: Hash256::zero(), suggested_fee_recipient: Address::zero(), - }); + }; + let slot = Slot::new(42); + let head_block_root = Hash256::repeat_byte(100); + let validator_index = 0; + self.ee_a + .execution_layer + .insert_proposer(slot, head_block_root, validator_index, payload_attributes) + .await; let status = self .ee_a .execution_layer - .notify_forkchoice_updated(head_block_hash, finalized_block_hash, payload_attributes) + .notify_forkchoice_updated(head_block_hash, finalized_block_hash, slot, head_block_root) .await .unwrap(); assert_eq!(status, PayloadStatus::Valid); @@ -322,11 +333,12 @@ impl TestRig { */ let head_block_hash = second_payload.block_hash; let finalized_block_hash = ExecutionBlockHash::zero(); - let payload_attributes = None; + let slot = Slot::new(42); + let head_block_root = Hash256::repeat_byte(42); let status = self .ee_b .execution_layer - .notify_forkchoice_updated(head_block_hash, finalized_block_hash, payload_attributes) + .notify_forkchoice_updated(head_block_hash, finalized_block_hash, slot, head_block_root) .await .unwrap(); assert_eq!(status, PayloadStatus::Syncing); @@ -365,11 +377,12 @@ impl TestRig { */ let head_block_hash = second_payload.block_hash; let finalized_block_hash = ExecutionBlockHash::zero(); - let payload_attributes = None; + let slot = Slot::new(42); + let head_block_root = Hash256::repeat_byte(42); let status = self .ee_b .execution_layer - .notify_forkchoice_updated(head_block_hash, finalized_block_hash, payload_attributes) + .notify_forkchoice_updated(head_block_hash, finalized_block_hash, slot, head_block_root) .await .unwrap(); assert_eq!(status, PayloadStatus::Valid);