From 809b52715eeaf68fff66f250487f2dfc39b67847 Mon Sep 17 00:00:00 2001 From: realbigsean Date: Tue, 22 Feb 2022 08:29:29 -0500 Subject: [PATCH] some block building updates --- beacon_node/beacon_chain/src/beacon_chain.rs | 6 +- .../beacon_chain/src/execution_payload.rs | 125 ++++++++++++++++++ beacon_node/execution_layer/src/lib.rs | 65 +++++++++ consensus/types/src/execution_payload.rs | 5 + 4 files changed, 199 insertions(+), 2 deletions(-) diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 5373cee20..1dba7330e 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -17,8 +17,9 @@ use crate::early_attester_cache::EarlyAttesterCache; use crate::errors::{BeaconChainError as Error, BlockProductionError}; use crate::eth1_chain::{Eth1Chain, Eth1ChainBackend}; use crate::events::ServerSentEventHandler; -use crate::execution_payload::{get_execution_payload, PreparePayloadHandle}; +use crate::execution_payload::{ PreparePayloadHandle}; use crate::fork_choice_signal::{ForkChoiceSignalRx, ForkChoiceSignalTx, ForkChoiceWaitResult}; +use crate::execution_payload::{get_execution_payload, get_execution_payload_and_blobs}; use crate::head_tracker::HeadTracker; use crate::historical_blocks::HistoricalBlockError; use crate::migrate::BackgroundMigrator; @@ -3621,7 +3622,8 @@ impl BeaconChain { }), BeaconState::Shanghai(_) => { let sync_aggregate = get_sync_aggregate()?; - let execution_payload = get_execution_payload(self, &state, proposer_index)?; + let (execution_payload, blobs) = + get_execution_payload_and_blobs(self, &state, proposer_index)?; //FIXME(sean) get blobs BeaconBlock::Shanghai(BeaconBlockShanghai { slot, diff --git a/beacon_node/beacon_chain/src/execution_payload.rs b/beacon_node/beacon_chain/src/execution_payload.rs index 2221d1fc7..b0ea743b1 100644 --- a/beacon_node/beacon_chain/src/execution_payload.rs +++ b/beacon_node/beacon_chain/src/execution_payload.rs @@ -387,6 +387,33 @@ pub fn get_execution_payload< Ok(join_handle) } +/// Wraps the async `prepare_execution_payload` function as a blocking task. +pub fn prepare_execution_payload_and_blobs_blocking( + chain: &BeaconChain, + state: &BeaconState, + proposer_index: u64, +) -> Result< + Option<( + ExecutionPayload, + VariableList< + KZGCommitment, + <::EthSpec as EthSpec>::MaxObjectListSize, + >, + )>, + BlockProductionError, +> { + let execution_layer = chain + .execution_layer + .as_ref() + .ok_or(BlockProductionError::ExecutionLayerMissing)?; + + execution_layer + .block_on_generic(|_| async { + prepare_execution_payload_and_blobs(chain, state, proposer_index).await + }) + .map_err(BlockProductionError::BlockingFailed)? +} + /// Prepares an execution payload for inclusion in a block. /// /// Will return `Ok(None)` if the merge fork has occurred, but a terminal block has not been found. @@ -485,3 +512,101 @@ where Ok(execution_payload) } + +pub async fn prepare_execution_payload_and_blobs( + chain: &BeaconChain, + state: &BeaconState, + proposer_index: u64, +) -> Result< + Option<( + ExecutionPayload, + VariableList< + KZGCommitment, + <::EthSpec as EthSpec>::MaxObjectListSize, + >, + )>, + BlockProductionError, +> { + let spec = &chain.spec; + let execution_layer = chain + .execution_layer + .as_ref() + .ok_or(BlockProductionError::ExecutionLayerMissing)?; + + let parent_hash = if !is_merge_transition_complete(state) { + let is_terminal_block_hash_set = spec.terminal_block_hash != Hash256::zero(); + let is_activation_epoch_reached = + state.current_epoch() >= spec.terminal_block_hash_activation_epoch; + + if is_terminal_block_hash_set && !is_activation_epoch_reached { + return Ok(None); + } + + let terminal_pow_block_hash = execution_layer + .get_terminal_pow_block_hash(spec) + .await + .map_err(BlockProductionError::TerminalPoWBlockLookupFailed)?; + + if let Some(terminal_pow_block_hash) = terminal_pow_block_hash { + terminal_pow_block_hash + } else { + return Ok(None); + } + } else { + state.latest_execution_payload_header()?.block_hash + }; + + let timestamp = compute_timestamp_at_slot(state, spec).map_err(BeaconStateError::from)?; + let random = *state.get_randao_mix(state.current_epoch())?; + let finalized_root = state.finalized_checkpoint().root; + + // The finalized block hash is not included in the specification, however we provide this + // parameter so that the execution layer can produce a payload id if one is not already known + // (e.g., due to a recent reorg). + let finalized_block_hash = + if let Some(block) = chain.fork_choice.read().get_block(&finalized_root) { + block.execution_status.block_hash() + } else { + chain + .store + .get_block(&finalized_root) + .map_err(BlockProductionError::FailedToReadFinalizedBlock)? + .ok_or(BlockProductionError::MissingFinalizedBlock(finalized_root))? + .message() + .body() + .execution_payload() + .ok() + .map(|ep| ep.block_hash) + }; + + // Note: the suggested_fee_recipient is stored in the `execution_layer`, it will add this parameter. + let execution_payload = execution_layer + .get_payload( + parent_hash, + timestamp, + random, + finalized_block_hash.unwrap_or_else(Hash256::zero), + proposer_index, + ) + .await + .map_err(BlockProductionError::GetPayloadFailed)?; + + //FIXME(sean) + for tx in execution_payload.blob_txns_iter() { + let versioned_hash = Hash256::zero(); + // get versioned hash + let blob = execution_layer + .get_blob::( + parent_hash, + timestamp, + random, + finalized_root, + proposer_index, + versioned_hash, + ) + .await + .map_err(BlockProductionError::GetPayloadFailed)?; + } + + Ok(Some((execution_payload, VariableList::empty()))) +} diff --git a/beacon_node/execution_layer/src/lib.rs b/beacon_node/execution_layer/src/lib.rs index 68071ee9b..fd6886c69 100644 --- a/beacon_node/execution_layer/src/lib.rs +++ b/beacon_node/execution_layer/src/lib.rs @@ -888,6 +888,71 @@ impl ExecutionLayer { .map_err(Error::EngineError) } + pub async fn get_blob( + &self, + parent_hash: Hash256, + timestamp: u64, + random: Hash256, + finalized_block_hash: Hash256, + proposer_index: u64, + versioned_hash: Hash256, + ) -> Result { + let suggested_fee_recipient = self.get_suggested_fee_recipient(proposer_index).await; + + debug!( + self.log(), + "Issuing engine_getBlob"; + "suggested_fee_recipient" => ?suggested_fee_recipient, + "random" => ?random, + "timestamp" => timestamp, + "parent_hash" => ?parent_hash, + ); + self.engines() + .first_success(|engine| async move { + let payload_id = if let Some(id) = engine + .get_payload_id(parent_hash, timestamp, random, suggested_fee_recipient) + .await + { + // The payload id has been cached for this engine. + id + } else { + // The payload id has *not* been cached for this engine. Trigger an artificial + // fork choice update to retrieve a payload ID. + // + // TODO(merge): a better algorithm might try to favour a node that already had a + // cached payload id, since a payload that has had more time to produce is + // likely to be more profitable. + let fork_choice_state = ForkChoiceState { + head_block_hash: parent_hash, + safe_block_hash: parent_hash, + finalized_block_hash, + }; + let payload_attributes = PayloadAttributes { + timestamp, + random, + suggested_fee_recipient, + }; + + engine + .notify_forkchoice_updated( + fork_choice_state, + Some(payload_attributes), + self.log(), + ) + .await + .map(|response| response.payload_id)? + .ok_or(ApiError::PayloadIdUnavailable)? + }; + + engine + .api + .get_blob_v1::(payload_id, versioned_hash) + .await + }) + .await + .map_err(Error::EngineErrors) + } + /// Maps to the `engine_newPayload` JSON-RPC call. /// /// ## Fallback Behaviour diff --git a/consensus/types/src/execution_payload.rs b/consensus/types/src/execution_payload.rs index 412e5a8df..78a53a367 100644 --- a/consensus/types/src/execution_payload.rs +++ b/consensus/types/src/execution_payload.rs @@ -3,6 +3,7 @@ use derivative::Derivative; use serde_derive::{Deserialize, Serialize}; use ssz::Encode; use ssz_derive::{Decode, Encode}; +use std::slice::Iter; use test_random_derive::TestRandom; use tree_hash_derive::TreeHash; @@ -58,4 +59,8 @@ impl ExecutionPayload { // Max size of variable length `transactions` field + (T::max_transactions_per_payload() * (ssz::BYTES_PER_LENGTH_OFFSET + T::max_bytes_per_transaction())) } + + pub fn blob_txns_iter(&self) -> Iter<'_, Transaction> { + self.transactions.iter() + } }