diff --git a/Cargo.lock b/Cargo.lock index 2503176fe..602bfc261 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1923,7 +1923,6 @@ dependencies = [ "eth2_ssz 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)", "eth2_ssz_derive 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", "proto_array", - "state_processing", "store", "types", ] diff --git a/Cargo.toml b/Cargo.toml index b005ce1c1..ff0b1f1c0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,6 +7,7 @@ members = [ "beacon_node/client", "beacon_node/eth1", "beacon_node/lighthouse_network", + "beacon_node/execution_layer", "beacon_node/http_api", "beacon_node/http_metrics", "beacon_node/network", diff --git a/beacon_node/beacon_chain/Cargo.toml b/beacon_node/beacon_chain/Cargo.toml index 0f68405db..069557550 100644 --- a/beacon_node/beacon_chain/Cargo.toml +++ b/beacon_node/beacon_chain/Cargo.toml @@ -55,3 +55,4 @@ slasher = { path = "../../slasher" } eth2 = { path = "../../common/eth2" } strum = { version = "0.21.0", features = ["derive"] } logging = { path = "../../common/logging" } +execution_layer = { path = "../execution_layer" } diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 7a253e4e8..5f3358754 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -49,6 +49,7 @@ use crate::{metrics, BeaconChainError}; use eth2::types::{ EventKind, SseBlock, SseChainReorg, SseFinalizedCheckpoint, SseHead, SseLateHead, SyncDuty, }; +use execution_layer::ExecutionLayer; use fork_choice::ForkChoice; use futures::channel::mpsc::Sender; use itertools::process_results; @@ -62,7 +63,9 @@ use slot_clock::SlotClock; use state_processing::{ common::get_indexed_attestation, per_block_processing, - per_block_processing::errors::AttestationValidationError, + per_block_processing::{ + compute_timestamp_at_slot, errors::AttestationValidationError, is_merge_complete, + }, per_slot_processing, state_advance::{complete_state_advance, partial_state_advance}, BlockSignatureStrategy, SigVerifiedOp, @@ -275,6 +278,8 @@ pub struct BeaconChain { Mutex, T::EthSpec>>, /// Provides information from the Ethereum 1 (PoW) chain. pub eth1_chain: Option>, + /// Interfaces with the execution client. + pub execution_layer: Option, /// Stores a "snapshot" of the chain at the time the head-of-the-chain block was received. pub(crate) canonical_head: TimeoutRwLock>, /// The root of the genesis block. @@ -2407,7 +2412,7 @@ impl BeaconChain { let _fork_choice_block_timer = metrics::start_timer(&metrics::FORK_CHOICE_PROCESS_BLOCK_TIMES); fork_choice - .on_block(current_slot, &block, block_root, &state, &self.spec) + .on_block(current_slot, &block, block_root, &state) .map_err(|e| BlockError::BeaconChainError(e.into()))?; } @@ -2839,12 +2844,42 @@ impl BeaconChain { })) }; // Closure to fetch a sync aggregate in cases where it is required. - let get_execution_payload = || -> Result, BlockProductionError> { - // TODO: actually get the payload from eth1 node.. - Ok(ExecutionPayload::default()) + let get_execution_payload = |latest_execution_payload_header: &ExecutionPayloadHeader< + T::EthSpec, + >| + -> Result, BlockProductionError> { + let execution_layer = self + .execution_layer + .as_ref() + .ok_or(BlockProductionError::ExecutionLayerMissing)?; + + let parent_hash; + if !is_merge_complete(&state) { + let terminal_pow_block_hash = execution_layer + .block_on(|execution_layer| execution_layer.get_terminal_pow_block_hash()) + .map_err(BlockProductionError::TerminalPoWBlockLookupFailed)?; + + if let Some(terminal_pow_block_hash) = terminal_pow_block_hash { + parent_hash = terminal_pow_block_hash; + } else { + return Ok(<_>::default()); + } + } else { + parent_hash = latest_execution_payload_header.block_hash; + } + + let timestamp = + compute_timestamp_at_slot(&state, &self.spec).map_err(BeaconStateError::from)?; + let random = *state.get_randao_mix(state.current_epoch())?; + + execution_layer + .block_on(|execution_layer| { + execution_layer.get_payload(parent_hash, timestamp, random) + }) + .map_err(BlockProductionError::GetPayloadFailed) }; - let inner_block = match state { + let inner_block = match &state { BeaconState::Base(_) => BeaconBlock::Base(BeaconBlockBase { slot, proposer_index, @@ -2881,9 +2916,10 @@ impl BeaconChain { }, }) } - BeaconState::Merge(_) => { + BeaconState::Merge(state) => { let sync_aggregate = get_sync_aggregate()?; - let execution_payload = get_execution_payload()?; + let execution_payload = + get_execution_payload(&state.latest_execution_payload_header)?; BeaconBlock::Merge(BeaconBlockMerge { slot, proposer_index, @@ -3094,6 +3130,14 @@ impl BeaconChain { .beacon_state .attester_shuffling_decision_root(self.genesis_block_root, RelativeEpoch::Current); + // Used later for the execution engine. + let new_head_execution_block_hash = new_head + .beacon_block + .message() + .body() + .execution_payload() + .map(|ep| ep.block_hash); + drop(lag_timer); // Update the snapshot that stores the head of the chain at the time it received the @@ -3297,9 +3341,67 @@ impl BeaconChain { } } + // If this is a post-merge block, update the execution layer. + if let Some(new_head_execution_block_hash) = new_head_execution_block_hash { + let execution_layer = self + .execution_layer + .clone() + .ok_or(Error::ExecutionLayerMissing)?; + let store = self.store.clone(); + let log = self.log.clone(); + + // Spawn the update task, without waiting for it to complete. + execution_layer.spawn( + move |execution_layer| async move { + if let Err(e) = Self::update_execution_engine_forkchoice( + execution_layer, + store, + new_finalized_checkpoint.root, + new_head_execution_block_hash, + ) + .await + { + error!( + log, + "Failed to update execution head"; + "error" => ?e + ); + } + }, + "update_execution_engine_forkchoice", + ) + } + Ok(()) } + pub async fn update_execution_engine_forkchoice( + execution_layer: ExecutionLayer, + store: BeaconStore, + finalized_beacon_block_root: Hash256, + head_execution_block_hash: Hash256, + ) -> Result<(), Error> { + // Loading the finalized block from the store is not ideal. Perhaps it would be better to + // store it on fork-choice so we can do a lookup without hitting the database. + // + // See: https://github.com/sigp/lighthouse/pull/2627#issuecomment-927537245 + let finalized_block = store + .get_block(&finalized_beacon_block_root)? + .ok_or(Error::MissingBeaconBlock(finalized_beacon_block_root))?; + + let finalized_execution_block_hash = finalized_block + .message() + .body() + .execution_payload() + .map(|ep| ep.block_hash) + .unwrap_or_else(Hash256::zero); + + execution_layer + .forkchoice_updated(head_execution_block_hash, finalized_execution_block_hash) + .await + .map_err(Error::ExecutionForkChoiceUpdateFailed) + } + /// This function takes a configured weak subjectivity `Checkpoint` and the latest finalized `Checkpoint`. /// If the weak subjectivity checkpoint and finalized checkpoint share the same epoch, we compare /// roots. If we the weak subjectivity checkpoint is from an older epoch, we iterate back through diff --git a/beacon_node/beacon_chain/src/block_verification.rs b/beacon_node/beacon_chain/src/block_verification.rs index 91ba04d8e..6c73fae7d 100644 --- a/beacon_node/beacon_chain/src/block_verification.rs +++ b/beacon_node/beacon_chain/src/block_verification.rs @@ -48,8 +48,9 @@ use crate::{ BLOCK_PROCESSING_CACHE_LOCK_TIMEOUT, MAXIMUM_GOSSIP_CLOCK_DISPARITY, VALIDATOR_PUBKEY_CACHE_LOCK_TIMEOUT, }, - eth1_chain, metrics, BeaconChain, BeaconChainError, BeaconChainTypes, + metrics, BeaconChain, BeaconChainError, BeaconChainTypes, }; +use execution_layer::ExecutePayloadResponse; use fork_choice::{ForkChoice, ForkChoiceStore}; use parking_lot::RwLockReadGuard; use proto_array::Block as ProtoBlock; @@ -57,7 +58,7 @@ use safe_arith::ArithError; use slog::{debug, error, Logger}; use slot_clock::SlotClock; use ssz::Encode; -use state_processing::per_block_processing::is_execution_enabled; +use state_processing::per_block_processing::{is_execution_enabled, is_merge_block}; use state_processing::{ block_signature_verifier::{BlockSignatureVerifier, Error as BlockSignatureVerifierError}, per_block_processing, per_slot_processing, @@ -242,19 +243,25 @@ pub enum ExecutionPayloadError { /// ## Peer scoring /// /// As this is our fault, do not penalize the peer - NoEth1Connection, + NoExecutionConnection, /// Error occurred during engine_executePayload /// /// ## Peer scoring /// /// Some issue with our configuration, do not penalize peer - Eth1VerificationError(eth1_chain::Error), + RequestFailed(execution_layer::Error), /// The execution engine returned INVALID for the payload /// /// ## Peer scoring /// /// The block is invalid and the peer is faulty RejectedByExecutionEngine, + /// The execution engine returned SYNCING for the payload + /// + /// ## Peer scoring + /// + /// It is not known if the block is valid or invalid. + ExecutionEngineIsSyncing, /// The execution payload timestamp does not match the slot /// /// ## Peer scoring @@ -279,6 +286,38 @@ pub enum ExecutionPayloadError { /// /// The block is invalid and the peer is faulty TransactionDataExceedsSizeLimit, + /// The execution payload references an execution block that cannot trigger the merge. + /// + /// ## Peer scoring + /// + /// The block is invalid and the peer sent us a block that passes gossip propagation conditions, + /// but is invalid upon further verification. + InvalidTerminalPoWBlock, + /// The execution payload references execution blocks that are unavailable on our execution + /// nodes. + /// + /// ## Peer scoring + /// + /// It's not clear if the peer is invalid or if it's on a different execution fork to us. + TerminalPoWBlockNotFound, +} + +impl From for ExecutionPayloadError { + fn from(e: execution_layer::Error) -> Self { + ExecutionPayloadError::RequestFailed(e) + } +} + +impl From for BlockError { + fn from(e: ExecutionPayloadError) -> Self { + BlockError::ExecutionPayloadError(e) + } +} + +impl From for BlockError { + fn from(e: InconsistentFork) -> Self { + BlockError::InconsistentFork(e) + } } impl std::fmt::Display for BlockError { @@ -1054,35 +1093,79 @@ impl<'a, T: BeaconChainTypes> FullyVerifiedBlock<'a, T> { } } - // This is the soonest we can run these checks as they must be called AFTER per_slot_processing - if is_execution_enabled(&state, block.message().body()) { - let eth1_chain = chain - .eth1_chain + // If this block triggers the merge, check to ensure that it references valid execution + // blocks. + // + // The specification defines this check inside `on_block` in the fork-choice specification, + // however we perform the check here for two reasons: + // + // - There's no point in importing a block that will fail fork choice, so it's best to fail + // early. + // - Doing the check here means we can keep our fork-choice implementation "pure". I.e., no + // calls to remote servers. + if is_merge_block(&state, block.message().body()) { + let execution_layer = chain + .execution_layer .as_ref() - .ok_or(BlockError::ExecutionPayloadError( - ExecutionPayloadError::NoEth1Connection, - ))?; - - let payload_valid = eth1_chain - .on_payload(block.message().body().execution_payload().ok_or_else(|| { - BlockError::InconsistentFork(InconsistentFork { + .ok_or(ExecutionPayloadError::NoExecutionConnection)?; + let execution_payload = + block + .message() + .body() + .execution_payload() + .ok_or_else(|| InconsistentFork { fork_at_slot: eth2::types::ForkName::Merge, object_fork: block.message().body().fork_name(), - }) - })?) - .map_err(|e| { - BlockError::ExecutionPayloadError(ExecutionPayloadError::Eth1VerificationError( - e, - )) - })?; + })?; - if !payload_valid { - return Err(BlockError::ExecutionPayloadError( - ExecutionPayloadError::RejectedByExecutionEngine, - )); - } + let is_valid_terminal_pow_block = execution_layer + .block_on(|execution_layer| { + execution_layer.is_valid_terminal_pow_block_hash(execution_payload.parent_hash) + }) + .map_err(ExecutionPayloadError::from)?; + + match is_valid_terminal_pow_block { + Some(true) => Ok(()), + Some(false) => Err(ExecutionPayloadError::InvalidTerminalPoWBlock), + None => Err(ExecutionPayloadError::TerminalPoWBlockNotFound), + }?; } + // This is the soonest we can run these checks as they must be called AFTER per_slot_processing + let execute_payload_handle = if is_execution_enabled(&state, block.message().body()) { + let execution_layer = chain + .execution_layer + .as_ref() + .ok_or(ExecutionPayloadError::NoExecutionConnection)?; + let execution_payload = + block + .message() + .body() + .execution_payload() + .ok_or_else(|| InconsistentFork { + fork_at_slot: eth2::types::ForkName::Merge, + object_fork: block.message().body().fork_name(), + })?; + + let (execute_payload_status, execute_payload_handle) = execution_layer + .block_on(|execution_layer| execution_layer.execute_payload(execution_payload)) + .map_err(ExecutionPayloadError::from)?; + + match execute_payload_status { + ExecutePayloadResponse::Valid => Ok(()), + ExecutePayloadResponse::Invalid => { + Err(ExecutionPayloadError::RejectedByExecutionEngine) + } + ExecutePayloadResponse::Syncing => { + Err(ExecutionPayloadError::ExecutionEngineIsSyncing) + } + }?; + + Some(execute_payload_handle) + } else { + None + }; + // If the block is sufficiently recent, notify the validator monitor. if let Some(slot) = chain.slot_clock.now() { let epoch = slot.epoch(T::EthSpec::slots_per_epoch()); @@ -1181,6 +1264,15 @@ impl<'a, T: BeaconChainTypes> FullyVerifiedBlock<'a, T> { }); } + // If this block required an `executePayload` call to the execution node, inform it that the + // block is indeed valid. + // + // If the handle is dropped without explicitly declaring validity, an invalid message will + // be sent to the execution engine. + if let Some(execute_payload_handle) = execute_payload_handle { + execute_payload_handle.publish_consensus_valid(); + } + Ok(Self { block, block_root, diff --git a/beacon_node/beacon_chain/src/builder.rs b/beacon_node/beacon_chain/src/builder.rs index d96ca7082..ab0cf50c3 100644 --- a/beacon_node/beacon_chain/src/builder.rs +++ b/beacon_node/beacon_chain/src/builder.rs @@ -15,6 +15,7 @@ use crate::{ Eth1ChainBackend, ServerSentEventHandler, }; use eth1::Config as Eth1Config; +use execution_layer::ExecutionLayer; use fork_choice::ForkChoice; use futures::channel::mpsc::Sender; use operation_pool::{OperationPool, PersistedOperationPool}; @@ -75,6 +76,7 @@ pub struct BeaconChainBuilder { >, op_pool: Option>, eth1_chain: Option>, + execution_layer: Option, event_handler: Option>, slot_clock: Option, shutdown_sender: Option>, @@ -115,6 +117,7 @@ where fork_choice: None, op_pool: None, eth1_chain: None, + execution_layer: None, event_handler: None, slot_clock: None, shutdown_sender: None, @@ -476,6 +479,12 @@ where self } + /// Sets the `BeaconChain` execution layer. + pub fn execution_layer(mut self, execution_layer: Option) -> Self { + self.execution_layer = execution_layer; + self + } + /// Sets the `BeaconChain` event handler backend. /// /// For example, provide `ServerSentEventHandler` as a `handler`. @@ -737,6 +746,7 @@ where observed_proposer_slashings: <_>::default(), observed_attester_slashings: <_>::default(), eth1_chain: self.eth1_chain, + execution_layer: self.execution_layer, genesis_validators_root: canonical_head.beacon_state.genesis_validators_root(), canonical_head: TimeoutRwLock::new(canonical_head.clone()), genesis_block_root, diff --git a/beacon_node/beacon_chain/src/errors.rs b/beacon_node/beacon_chain/src/errors.rs index 972e70181..6bb06e889 100644 --- a/beacon_node/beacon_chain/src/errors.rs +++ b/beacon_node/beacon_chain/src/errors.rs @@ -134,6 +134,8 @@ pub enum BeaconChainError { new_slot: Slot, }, AltairForkDisabled, + ExecutionLayerMissing, + ExecutionForkChoiceUpdateFailed(execution_layer::Error), } easy_from_to!(SlotProcessingError, BeaconChainError); @@ -175,6 +177,9 @@ pub enum BlockProductionError { produce_at_slot: Slot, state_slot: Slot, }, + ExecutionLayerMissing, + TerminalPoWBlockLookupFailed(execution_layer::Error), + GetPayloadFailed(execution_layer::Error), } easy_from_to!(BlockProcessingError, BlockProductionError); diff --git a/beacon_node/beacon_chain/src/fork_revert.rs b/beacon_node/beacon_chain/src/fork_revert.rs index 8d0545c58..31678580a 100644 --- a/beacon_node/beacon_chain/src/fork_revert.rs +++ b/beacon_node/beacon_chain/src/fork_revert.rs @@ -166,7 +166,7 @@ pub fn reset_fork_choice_to_finalization, Cold: It let (block, _) = block.deconstruct(); fork_choice - .on_block(block.slot(), &block, block.canonical_root(), &state, spec) + .on_block(block.slot(), &block, block.canonical_root(), &state) .map_err(|e| format!("Error applying replayed block to fork choice: {:?}", e))?; } diff --git a/beacon_node/client/Cargo.toml b/beacon_node/client/Cargo.toml index 165904a4c..d2e673f60 100644 --- a/beacon_node/client/Cargo.toml +++ b/beacon_node/client/Cargo.toml @@ -38,3 +38,4 @@ http_metrics = { path = "../http_metrics" } slasher = { path = "../../slasher" } slasher_service = { path = "../../slasher/service" } monitoring_api = {path = "../../common/monitoring_api"} +execution_layer = { path = "../execution_layer" } diff --git a/beacon_node/client/src/builder.rs b/beacon_node/client/src/builder.rs index 6661fa229..a535b4612 100644 --- a/beacon_node/client/src/builder.rs +++ b/beacon_node/client/src/builder.rs @@ -16,6 +16,7 @@ use eth2::{ types::{BlockId, StateId}, BeaconNodeHttpClient, Error as ApiError, Timeouts, }; +use execution_layer::ExecutionLayer; use genesis::{interop_genesis_state, Eth1GenesisService}; use lighthouse_network::NetworkGlobals; use monitoring_api::{MonitoringHttpClient, ProcessType}; @@ -146,6 +147,29 @@ where None }; + let terminal_total_difficulty = config + .terminal_total_difficulty_override + .unwrap_or(spec.terminal_total_difficulty); + let terminal_block_hash = config + .terminal_block_hash + .unwrap_or(spec.terminal_block_hash); + + let execution_layer = if let Some(execution_endpoints) = config.execution_endpoints { + let context = runtime_context.service_context("exec".into()); + let execution_layer = ExecutionLayer::from_urls( + execution_endpoints, + terminal_total_difficulty, + terminal_block_hash, + config.fee_recipient, + context.executor.clone(), + context.log().clone(), + ) + .map_err(|e| format!("unable to start execution layer endpoints: {:?}", e))?; + Some(execution_layer) + } else { + None + }; + let builder = BeaconChainBuilder::new(eth_spec_instance) .logger(context.log().clone()) .store(store) @@ -154,6 +178,7 @@ where .disabled_forks(disabled_forks) .graffiti(graffiti) .event_handler(event_handler) + .execution_layer(execution_layer) .monitor_validators( config.validator_monitor_auto, config.validator_monitor_pubkeys.clone(), diff --git a/beacon_node/client/src/config.rs b/beacon_node/client/src/config.rs index 40e13898b..d1fb4bd98 100644 --- a/beacon_node/client/src/config.rs +++ b/beacon_node/client/src/config.rs @@ -4,7 +4,7 @@ use sensitive_url::SensitiveUrl; use serde_derive::{Deserialize, Serialize}; use std::fs; use std::path::PathBuf; -use types::{Graffiti, PublicKeyBytes}; +use types::{Address, Graffiti, Hash256, PublicKeyBytes, Uint256}; /// Default directory name for the freezer database under the top-level data dir. const DEFAULT_FREEZER_DB_DIR: &str = "freezer_db"; @@ -74,6 +74,10 @@ pub struct Config { pub network: network::NetworkConfig, pub chain: beacon_chain::ChainConfig, pub eth1: eth1::Config, + pub execution_endpoints: Option>, + pub terminal_total_difficulty_override: Option, + pub terminal_block_hash: Option, + pub fee_recipient: Option
, pub http_api: http_api::Config, pub http_metrics: http_metrics::Config, pub monitoring_api: Option, @@ -94,6 +98,10 @@ impl Default for Config { dummy_eth1_backend: false, sync_eth1_chain: false, eth1: <_>::default(), + execution_endpoints: None, + terminal_total_difficulty_override: None, + terminal_block_hash: None, + fee_recipient: None, disabled_forks: Vec::new(), graffiti: Graffiti::default(), http_api: <_>::default(), diff --git a/beacon_node/eth1/src/http.rs b/beacon_node/eth1/src/http.rs index 489142377..e002b77f3 100644 --- a/beacon_node/eth1/src/http.rs +++ b/beacon_node/eth1/src/http.rs @@ -479,7 +479,7 @@ pub async fn send_rpc_request( } /// Accepts an entire HTTP body (as a string) and returns either the `result` field or the `error['message']` field, as a serde `Value`. -fn response_result_or_error(response: &str) -> Result { +pub fn response_result_or_error(response: &str) -> Result { let json = serde_json::from_str::(response) .map_err(|e| RpcError::InvalidJson(e.to_string()))?; @@ -501,7 +501,7 @@ fn response_result_or_error(response: &str) -> Result { /// Therefore, this function is only useful for numbers encoded by the JSON RPC. /// /// E.g., `0x01 == 1` -fn hex_to_u64_be(hex: &str) -> Result { +pub fn hex_to_u64_be(hex: &str) -> Result { u64::from_str_radix(strip_prefix(hex)?, 16) .map_err(|e| format!("Failed to parse hex as u64: {:?}", e)) } diff --git a/beacon_node/execution_layer/Cargo.toml b/beacon_node/execution_layer/Cargo.toml new file mode 100644 index 000000000..cf6a4c822 --- /dev/null +++ b/beacon_node/execution_layer/Cargo.toml @@ -0,0 +1,29 @@ +[package] +name = "execution_layer" +version = "0.1.0" +edition = "2018" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +types = { path = "../../consensus/types"} +tokio = { version = "1.10.0", features = ["full"] } +async-trait = "0.1.51" +slog = "2.5.2" +futures = "0.3.7" +sensitive_url = { path = "../../common/sensitive_url" } +reqwest = { version = "0.11.0", features = ["json","stream"] } +eth2_serde_utils = { path = "../../consensus/serde_utils" } +serde_json = "1.0.58" +serde = { version = "1.0.116", features = ["derive"] } +eth1 = { path = "../eth1" } +warp = { git = "https://github.com/paulhauner/warp ", branch = "cors-wildcard" } +environment = { path = "../../lighthouse/environment" } +bytes = "1.1.0" +task_executor = { path = "../../common/task_executor" } +hex = "0.4.2" +eth2_ssz_types = { path = "../../consensus/ssz_types"} +lru = "0.6.0" +exit-future = "0.2.0" +tree_hash = { path = "../../consensus/tree_hash"} +tree_hash_derive = { path = "../../consensus/tree_hash_derive"} diff --git a/beacon_node/execution_layer/src/engine_api.rs b/beacon_node/execution_layer/src/engine_api.rs new file mode 100644 index 000000000..e395cc44e --- /dev/null +++ b/beacon_node/execution_layer/src/engine_api.rs @@ -0,0 +1,114 @@ +use async_trait::async_trait; +use eth1::http::RpcError; +use serde::{Deserialize, Serialize}; + +pub const LATEST_TAG: &str = "latest"; + +pub use types::{Address, EthSpec, ExecutionPayload, Hash256, Uint256}; + +pub mod http; + +pub type PayloadId = u64; + +#[derive(Debug)] +pub enum Error { + Reqwest(reqwest::Error), + BadResponse(String), + RequestFailed(String), + JsonRpc(RpcError), + Json(serde_json::Error), + ServerMessage { code: i64, message: String }, + Eip155Failure, + IsSyncing, + ExecutionBlockNotFound(Hash256), + ExecutionHeadBlockNotFound, + ParentHashEqualsBlockHash(Hash256), +} + +impl From for Error { + fn from(e: reqwest::Error) -> Self { + Error::Reqwest(e) + } +} + +impl From for Error { + fn from(e: serde_json::Error) -> Self { + Error::Json(e) + } +} + +/// A generic interface for an execution engine API. +#[async_trait] +pub trait EngineApi { + async fn upcheck(&self) -> Result<(), Error>; + + async fn get_block_by_number<'a>( + &self, + block_by_number: BlockByNumberQuery<'a>, + ) -> Result, Error>; + + async fn get_block_by_hash<'a>( + &self, + block_hash: Hash256, + ) -> Result, Error>; + + async fn prepare_payload( + &self, + parent_hash: Hash256, + timestamp: u64, + random: Hash256, + fee_recipient: Address, + ) -> Result; + + async fn execute_payload( + &self, + execution_payload: ExecutionPayload, + ) -> Result; + + async fn get_payload( + &self, + payload_id: PayloadId, + ) -> Result, Error>; + + async fn consensus_validated( + &self, + block_hash: Hash256, + status: ConsensusStatus, + ) -> Result<(), Error>; + + async fn forkchoice_updated( + &self, + head_block_hash: Hash256, + finalized_block_hash: Hash256, + ) -> Result<(), Error>; +} + +#[derive(Clone, Copy, Debug, PartialEq, Serialize, Deserialize)] +#[serde(rename_all = "SCREAMING_SNAKE_CASE")] +pub enum ExecutePayloadResponse { + Valid, + Invalid, + Syncing, +} + +#[derive(Clone, Copy, Debug, PartialEq, Serialize, Deserialize)] +#[serde(rename_all = "SCREAMING_SNAKE_CASE")] +pub enum ConsensusStatus { + Valid, + Invalid, +} + +#[derive(Clone, Copy, Debug, PartialEq, Serialize)] +#[serde(untagged)] +pub enum BlockByNumberQuery<'a> { + Tag(&'a str), +} + +#[derive(Clone, Copy, Debug, PartialEq, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct ExecutionBlock { + pub block_hash: Hash256, + pub block_number: u64, + pub parent_hash: Hash256, + pub total_difficulty: Uint256, +} diff --git a/beacon_node/execution_layer/src/engine_api/http.rs b/beacon_node/execution_layer/src/engine_api/http.rs new file mode 100644 index 000000000..25a26e4ee --- /dev/null +++ b/beacon_node/execution_layer/src/engine_api/http.rs @@ -0,0 +1,637 @@ +//! Contains an implementation of `EngineAPI` using the JSON-RPC API via HTTP. + +use super::*; +use async_trait::async_trait; +use eth1::http::EIP155_ERROR_STR; +use reqwest::header::CONTENT_TYPE; +use sensitive_url::SensitiveUrl; +use serde::{de::DeserializeOwned, Deserialize, Serialize}; +use serde_json::json; +use std::time::Duration; +use types::{EthSpec, FixedVector, Transaction, Unsigned, VariableList}; + +pub use reqwest::Client; + +const STATIC_ID: u32 = 1; +pub const JSONRPC_VERSION: &str = "2.0"; + +pub const RETURN_FULL_TRANSACTION_OBJECTS: bool = false; + +pub const ETH_GET_BLOCK_BY_NUMBER: &str = "eth_getBlockByNumber"; +pub const ETH_GET_BLOCK_BY_NUMBER_TIMEOUT: Duration = Duration::from_secs(1); + +pub const ETH_GET_BLOCK_BY_HASH: &str = "eth_getBlockByHash"; +pub const ETH_GET_BLOCK_BY_HASH_TIMEOUT: Duration = Duration::from_secs(1); + +pub const ETH_SYNCING: &str = "eth_syncing"; +pub const ETH_SYNCING_TIMEOUT: Duration = Duration::from_millis(250); + +pub const ENGINE_PREPARE_PAYLOAD: &str = "engine_preparePayload"; +pub const ENGINE_PREPARE_PAYLOAD_TIMEOUT: Duration = Duration::from_millis(500); + +pub const ENGINE_EXECUTE_PAYLOAD: &str = "engine_executePayload"; +pub const ENGINE_EXECUTE_PAYLOAD_TIMEOUT: Duration = Duration::from_secs(2); + +pub const ENGINE_GET_PAYLOAD: &str = "engine_getPayload"; +pub const ENGINE_GET_PAYLOAD_TIMEOUT: Duration = Duration::from_secs(2); + +pub const ENGINE_CONSENSUS_VALIDATED: &str = "engine_consensusValidated"; +pub const ENGINE_CONSENSUS_VALIDATED_TIMEOUT: Duration = Duration::from_millis(500); + +pub const ENGINE_FORKCHOICE_UPDATED: &str = "engine_forkchoiceUpdated"; +pub const ENGINE_FORKCHOICE_UPDATED_TIMEOUT: Duration = Duration::from_millis(500); + +pub struct HttpJsonRpc { + pub client: Client, + pub url: SensitiveUrl, +} + +impl HttpJsonRpc { + pub fn new(url: SensitiveUrl) -> Result { + Ok(Self { + client: Client::builder().build()?, + url, + }) + } + + pub async fn rpc_request( + &self, + method: &str, + params: serde_json::Value, + timeout: Duration, + ) -> Result { + let body = JsonRequestBody { + jsonrpc: JSONRPC_VERSION, + method, + params, + id: STATIC_ID, + }; + + let body: JsonResponseBody = self + .client + .post(self.url.full.clone()) + .timeout(timeout) + .header(CONTENT_TYPE, "application/json") + .json(&body) + .send() + .await? + .error_for_status()? + .json() + .await?; + + match (body.result, body.error) { + (result, None) => serde_json::from_value(result).map_err(Into::into), + (_, Some(error)) => { + if error.message.contains(EIP155_ERROR_STR) { + Err(Error::Eip155Failure) + } else { + Err(Error::ServerMessage { + code: error.code, + message: error.message, + }) + } + } + } + } +} + +#[async_trait] +impl EngineApi for HttpJsonRpc { + async fn upcheck(&self) -> Result<(), Error> { + let result: serde_json::Value = self + .rpc_request(ETH_SYNCING, json!([]), ETH_SYNCING_TIMEOUT) + .await?; + + /* + * TODO + * + * Check the network and chain ids. We omit this to save time for the merge f2f and since it + * also seems like it might get annoying during development. + */ + match result.as_bool() { + Some(false) => Ok(()), + _ => Err(Error::IsSyncing), + } + } + + async fn get_block_by_number<'a>( + &self, + query: BlockByNumberQuery<'a>, + ) -> Result, Error> { + let params = json!([query, RETURN_FULL_TRANSACTION_OBJECTS]); + + self.rpc_request( + ETH_GET_BLOCK_BY_NUMBER, + params, + ETH_GET_BLOCK_BY_NUMBER_TIMEOUT, + ) + .await + } + + async fn get_block_by_hash<'a>( + &self, + block_hash: Hash256, + ) -> Result, Error> { + let params = json!([block_hash, RETURN_FULL_TRANSACTION_OBJECTS]); + + self.rpc_request(ETH_GET_BLOCK_BY_HASH, params, ETH_GET_BLOCK_BY_HASH_TIMEOUT) + .await + } + + async fn prepare_payload( + &self, + parent_hash: Hash256, + timestamp: u64, + random: Hash256, + fee_recipient: Address, + ) -> Result { + let params = json!([JsonPreparePayloadRequest { + parent_hash, + timestamp, + random, + fee_recipient + }]); + + let response: JsonPayloadId = self + .rpc_request( + ENGINE_PREPARE_PAYLOAD, + params, + ENGINE_PREPARE_PAYLOAD_TIMEOUT, + ) + .await?; + + Ok(response.payload_id) + } + + async fn execute_payload( + &self, + execution_payload: ExecutionPayload, + ) -> Result { + let params = json!([JsonExecutionPayload::from(execution_payload)]); + + self.rpc_request( + ENGINE_EXECUTE_PAYLOAD, + params, + ENGINE_EXECUTE_PAYLOAD_TIMEOUT, + ) + .await + } + + async fn get_payload( + &self, + payload_id: PayloadId, + ) -> Result, Error> { + let params = json!([JsonPayloadId { payload_id }]); + + let response: JsonExecutionPayload = self + .rpc_request(ENGINE_GET_PAYLOAD, params, ENGINE_GET_PAYLOAD_TIMEOUT) + .await?; + + Ok(ExecutionPayload::from(response)) + } + + async fn consensus_validated( + &self, + block_hash: Hash256, + status: ConsensusStatus, + ) -> Result<(), Error> { + let params = json!([JsonConsensusValidatedRequest { block_hash, status }]); + + self.rpc_request( + ENGINE_CONSENSUS_VALIDATED, + params, + ENGINE_CONSENSUS_VALIDATED_TIMEOUT, + ) + .await + } + + async fn forkchoice_updated( + &self, + head_block_hash: Hash256, + finalized_block_hash: Hash256, + ) -> Result<(), Error> { + let params = json!([JsonForkChoiceUpdatedRequest { + head_block_hash, + finalized_block_hash + }]); + + self.rpc_request( + ENGINE_FORKCHOICE_UPDATED, + params, + ENGINE_FORKCHOICE_UPDATED_TIMEOUT, + ) + .await + } +} + +#[derive(Debug, PartialEq, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +struct JsonRequestBody<'a> { + jsonrpc: &'a str, + method: &'a str, + params: serde_json::Value, + id: u32, +} + +#[derive(Debug, PartialEq, Serialize, Deserialize)] +struct JsonError { + code: i64, + message: String, +} + +#[derive(Debug, PartialEq, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +struct JsonResponseBody { + jsonrpc: String, + #[serde(default)] + error: Option, + #[serde(default)] + result: serde_json::Value, + id: u32, +} + +#[derive(Debug, PartialEq, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct JsonPreparePayloadRequest { + pub parent_hash: Hash256, + #[serde(with = "eth2_serde_utils::u64_hex_be")] + pub timestamp: u64, + pub random: Hash256, + pub fee_recipient: Address, +} + +#[derive(Debug, PartialEq, Serialize, Deserialize)] +#[serde(transparent, rename_all = "camelCase")] +pub struct JsonPayloadId { + #[serde(with = "eth2_serde_utils::u64_hex_be")] + pub payload_id: u64, +} + +#[derive(Debug, PartialEq, Serialize, Deserialize)] +#[serde(bound = "T: EthSpec", rename_all = "camelCase")] +pub struct JsonExecutionPayload { + pub parent_hash: Hash256, + pub coinbase: Address, + pub state_root: Hash256, + pub receipt_root: Hash256, + #[serde(with = "serde_logs_bloom")] + pub logs_bloom: FixedVector, + pub random: Hash256, + #[serde(with = "eth2_serde_utils::u64_hex_be")] + pub block_number: u64, + #[serde(with = "eth2_serde_utils::u64_hex_be")] + pub gas_limit: u64, + #[serde(with = "eth2_serde_utils::u64_hex_be")] + pub gas_used: u64, + #[serde(with = "eth2_serde_utils::u64_hex_be")] + pub timestamp: u64, + // FIXME(paul): check serialization + #[serde(with = "ssz_types::serde_utils::hex_var_list")] + pub extra_data: VariableList, + pub base_fee_per_gas: Uint256, + pub block_hash: Hash256, + // FIXME(paul): add transaction parsing. + #[serde(default, skip_deserializing)] + pub transactions: VariableList, T::MaxTransactionsPerPayload>, +} + +impl From> for JsonExecutionPayload { + fn from(e: ExecutionPayload) -> Self { + Self { + parent_hash: e.parent_hash, + coinbase: e.coinbase, + state_root: e.state_root, + receipt_root: e.receipt_root, + logs_bloom: e.logs_bloom, + random: e.random, + block_number: e.block_number, + gas_limit: e.gas_limit, + gas_used: e.gas_used, + timestamp: e.timestamp, + extra_data: e.extra_data, + base_fee_per_gas: Uint256::from_little_endian(e.base_fee_per_gas.as_bytes()), + block_hash: e.block_hash, + transactions: e.transactions, + } + } +} + +impl From> for ExecutionPayload { + fn from(e: JsonExecutionPayload) -> Self { + Self { + parent_hash: e.parent_hash, + coinbase: e.coinbase, + state_root: e.state_root, + receipt_root: e.receipt_root, + logs_bloom: e.logs_bloom, + random: e.random, + block_number: e.block_number, + gas_limit: e.gas_limit, + gas_used: e.gas_used, + timestamp: e.timestamp, + extra_data: e.extra_data, + base_fee_per_gas: uint256_to_hash256(e.base_fee_per_gas), + block_hash: e.block_hash, + transactions: e.transactions, + } + } +} + +fn uint256_to_hash256(u: Uint256) -> Hash256 { + let mut bytes = [0; 32]; + u.to_little_endian(&mut bytes); + Hash256::from_slice(&bytes) +} + +#[derive(Debug, PartialEq, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct JsonConsensusValidatedRequest { + pub block_hash: Hash256, + pub status: ConsensusStatus, +} + +#[derive(Debug, PartialEq, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct JsonForkChoiceUpdatedRequest { + pub head_block_hash: Hash256, + pub finalized_block_hash: Hash256, +} + +// Serializes the `logs_bloom` field. +pub mod serde_logs_bloom { + use super::*; + use eth2_serde_utils::hex::PrefixedHexVisitor; + use serde::{Deserializer, Serializer}; + + pub fn serialize(bytes: &FixedVector, serializer: S) -> Result + where + S: Serializer, + U: Unsigned, + { + let mut hex_string: String = "0x".to_string(); + hex_string.push_str(&hex::encode(&bytes[..])); + + serializer.serialize_str(&hex_string) + } + + pub fn deserialize<'de, D, U>(deserializer: D) -> Result, D::Error> + where + D: Deserializer<'de>, + U: Unsigned, + { + let vec = deserializer.deserialize_string(PrefixedHexVisitor)?; + + FixedVector::new(vec) + .map_err(|e| serde::de::Error::custom(format!("invalid logs bloom: {:?}", e))) + } +} + +#[cfg(test)] +mod test { + use super::*; + use crate::test_utils::MockServer; + use std::future::Future; + use std::sync::Arc; + use types::MainnetEthSpec; + + struct Tester { + server: MockServer, + echo_client: Arc, + } + + impl Tester { + pub fn new() -> Self { + let server = MockServer::unit_testing(); + let echo_url = SensitiveUrl::parse(&format!("{}/echo", server.url())).unwrap(); + let echo_client = Arc::new(HttpJsonRpc::new(echo_url).unwrap()); + + Self { + server, + echo_client, + } + } + + pub async fn assert_request_equals( + self, + request_func: R, + expected_json: serde_json::Value, + ) -> Self + where + R: Fn(Arc) -> F, + F: Future, + { + request_func(self.echo_client.clone()).await; + let request_bytes = self.server.last_echo_request().await; + let request_json: serde_json::Value = + serde_json::from_slice(&request_bytes).expect("request was not valid json"); + if request_json != expected_json { + panic!( + "json mismatch!\n\nobserved: {}\n\nexpected: {}\n\n", + request_json.to_string(), + expected_json.to_string() + ) + } + self + } + } + + const HASH_00: &str = "0x0000000000000000000000000000000000000000000000000000000000000000"; + const HASH_01: &str = "0x0101010101010101010101010101010101010101010101010101010101010101"; + + const ADDRESS_00: &str = "0x0000000000000000000000000000000000000000"; + const ADDRESS_01: &str = "0x0101010101010101010101010101010101010101"; + + const LOGS_BLOOM_01: &str = "0x01010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101"; + + #[tokio::test] + async fn get_block_by_number_request() { + Tester::new() + .assert_request_equals( + |client| async move { + let _ = client + .get_block_by_number(BlockByNumberQuery::Tag(LATEST_TAG)) + .await; + }, + json!({ + "id": STATIC_ID, + "jsonrpc": JSONRPC_VERSION, + "method": ETH_GET_BLOCK_BY_NUMBER, + "params": ["latest", false] + }), + ) + .await; + } + + #[tokio::test] + async fn get_block_by_hash_request() { + Tester::new() + .assert_request_equals( + |client| async move { + let _ = client.get_block_by_hash(Hash256::repeat_byte(1)).await; + }, + json!({ + "id": STATIC_ID, + "jsonrpc": JSONRPC_VERSION, + "method": ETH_GET_BLOCK_BY_HASH, + "params": [HASH_01, false] + }), + ) + .await; + } + + #[tokio::test] + async fn prepare_payload_request() { + Tester::new() + .assert_request_equals( + |client| async move { + let _ = client + .prepare_payload( + Hash256::repeat_byte(0), + 42, + Hash256::repeat_byte(1), + Address::repeat_byte(0), + ) + .await; + }, + json!({ + "id": STATIC_ID, + "jsonrpc": JSONRPC_VERSION, + "method": ENGINE_PREPARE_PAYLOAD, + "params": [{ + "parentHash": HASH_00, + "timestamp": "0x2a", + "random": HASH_01, + "feeRecipient": ADDRESS_00, + }] + }), + ) + .await; + } + + #[tokio::test] + async fn get_payload_request() { + Tester::new() + .assert_request_equals( + |client| async move { + let _ = client.get_payload::(42).await; + }, + json!({ + "id": STATIC_ID, + "jsonrpc": JSONRPC_VERSION, + "method": ENGINE_GET_PAYLOAD, + "params": ["0x2a"] + }), + ) + .await; + } + + #[tokio::test] + async fn execute_payload_request() { + Tester::new() + .assert_request_equals( + |client| async move { + let _ = client + .execute_payload::(ExecutionPayload { + parent_hash: Hash256::repeat_byte(0), + coinbase: Address::repeat_byte(1), + state_root: Hash256::repeat_byte(1), + receipt_root: Hash256::repeat_byte(0), + logs_bloom: vec![1; 256].into(), + random: Hash256::repeat_byte(1), + block_number: 0, + gas_limit: 1, + gas_used: 2, + timestamp: 42, + extra_data: vec![].into(), + base_fee_per_gas: uint256_to_hash256(Uint256::from(1)), + block_hash: Hash256::repeat_byte(1), + transactions: vec![].into(), + }) + .await; + }, + json!({ + "id": STATIC_ID, + "jsonrpc": JSONRPC_VERSION, + "method": ENGINE_EXECUTE_PAYLOAD, + "params": [{ + "parentHash": HASH_00, + "coinbase": ADDRESS_01, + "stateRoot": HASH_01, + "receiptRoot": HASH_00, + "logsBloom": LOGS_BLOOM_01, + "random": HASH_01, + "blockNumber": "0x0", + "gasLimit": "0x1", + "gasUsed": "0x2", + "timestamp": "0x2a", + "extraData": "0x", + "baseFeePerGas": "0x1", + "blockHash": HASH_01, + "transactions": [], + }] + }), + ) + .await; + } + + #[tokio::test] + async fn consensus_validated_request() { + Tester::new() + .assert_request_equals( + |client| async move { + let _ = client + .consensus_validated(Hash256::repeat_byte(0), ConsensusStatus::Valid) + .await; + }, + json!({ + "id": STATIC_ID, + "jsonrpc": JSONRPC_VERSION, + "method": ENGINE_CONSENSUS_VALIDATED, + "params": [{ + "blockHash": HASH_00, + "status": "VALID", + }] + }), + ) + .await + .assert_request_equals( + |client| async move { + let _ = client + .consensus_validated(Hash256::repeat_byte(1), ConsensusStatus::Invalid) + .await; + }, + json!({ + "id": STATIC_ID, + "jsonrpc": JSONRPC_VERSION, + "method": ENGINE_CONSENSUS_VALIDATED, + "params": [{ + "blockHash": HASH_01, + "status": "INVALID", + }] + }), + ) + .await; + } + + #[tokio::test] + async fn forkchoice_updated_request() { + Tester::new() + .assert_request_equals( + |client| async move { + let _ = client + .forkchoice_updated(Hash256::repeat_byte(0), Hash256::repeat_byte(1)) + .await; + }, + json!({ + "id": STATIC_ID, + "jsonrpc": JSONRPC_VERSION, + "method": ENGINE_FORKCHOICE_UPDATED, + "params": [{ + "headBlockHash": HASH_00, + "finalizedBlockHash": HASH_01, + }] + }), + ) + .await; + } +} diff --git a/beacon_node/execution_layer/src/engines.rs b/beacon_node/execution_layer/src/engines.rs new file mode 100644 index 000000000..25f2dd323 --- /dev/null +++ b/beacon_node/execution_layer/src/engines.rs @@ -0,0 +1,239 @@ +//! Provides generic behaviour for multiple execution engines, specifically fallback behaviour. + +use crate::engine_api::{EngineApi, Error as EngineApiError}; +use futures::future::join_all; +use slog::{crit, error, info, warn, Logger}; +use std::future::Future; +use tokio::sync::RwLock; + +/// Stores the remembered state of a engine. +#[derive(Copy, Clone, PartialEq)] +enum EngineState { + Online, + Offline, +} + +impl EngineState { + fn set_online(&mut self) { + *self = EngineState::Online + } + + fn set_offline(&mut self) { + *self = EngineState::Offline + } + + fn is_online(&self) -> bool { + *self == EngineState::Online + } + + fn is_offline(&self) -> bool { + *self == EngineState::Offline + } +} + +/// An execution engine. +pub struct Engine { + pub id: String, + pub api: T, + state: RwLock, +} + +impl Engine { + /// Creates a new, offline engine. + pub fn new(id: String, api: T) -> Self { + Self { + id, + api, + state: RwLock::new(EngineState::Offline), + } + } +} + +/// Holds multiple execution engines and provides functionality for managing them in a fallback +/// manner. +pub struct Engines { + pub engines: Vec>, + pub log: Logger, +} + +#[derive(Debug)] +pub enum EngineError { + Offline { id: String }, + Api { id: String, error: EngineApiError }, +} + +impl Engines { + /// Run the `EngineApi::upcheck` function on all nodes which are currently offline. + /// + /// This can be used to try and recover any offline nodes. + async fn upcheck_offline(&self) { + let upcheck_futures = self.engines.iter().map(|engine| async move { + let mut state = engine.state.write().await; + if state.is_offline() { + match engine.api.upcheck().await { + Ok(()) => { + info!( + self.log, + "Execution engine online"; + "id" => &engine.id + ); + state.set_online() + } + Err(e) => { + warn!( + self.log, + "Execution engine offline"; + "error" => ?e, + "id" => &engine.id + ) + } + } + } + *state + }); + + let num_online = join_all(upcheck_futures) + .await + .into_iter() + .filter(|state: &EngineState| state.is_online()) + .count(); + + if num_online == 0 { + crit!( + self.log, + "No execution engines online"; + ) + } + } + + /// Run `func` on all engines, in the order in which they are defined, returning the first + /// successful result that is found. + /// + /// This function might try to run `func` twice. If all nodes return an error on the first time + /// it runs, it will try to upcheck all offline nodes and then run the function again. + pub async fn first_success<'a, F, G, H>(&'a self, func: F) -> Result> + where + F: Fn(&'a Engine) -> G + Copy, + G: Future>, + { + match self.first_success_without_retry(func).await { + Ok(result) => Ok(result), + Err(mut first_errors) => { + // Try to recover some nodes. + self.upcheck_offline().await; + // Retry the call on all nodes. + match self.first_success_without_retry(func).await { + Ok(result) => Ok(result), + Err(second_errors) => { + first_errors.extend(second_errors); + Err(first_errors) + } + } + } + } + } + + /// Run `func` on all engines, in the order in which they are defined, returning the first + /// successful result that is found. + async fn first_success_without_retry<'a, F, G, H>( + &'a self, + func: F, + ) -> Result> + where + F: Fn(&'a Engine) -> G, + G: Future>, + { + let mut errors = vec![]; + + for engine in &self.engines { + let engine_online = engine.state.read().await.is_online(); + if engine_online { + match func(engine).await { + Ok(result) => return Ok(result), + Err(error) => { + error!( + self.log, + "Execution engine call failed"; + "error" => ?error, + "id" => &engine.id + ); + engine.state.write().await.set_offline(); + errors.push(EngineError::Api { + id: engine.id.clone(), + error, + }) + } + } + } else { + errors.push(EngineError::Offline { + id: engine.id.clone(), + }) + } + } + + Err(errors) + } + + /// Runs `func` on all nodes concurrently, returning all results. + /// + /// This function might try to run `func` twice. If all nodes return an error on the first time + /// it runs, it will try to upcheck all offline nodes and then run the function again. + pub async fn broadcast<'a, F, G, H>(&'a self, func: F) -> Vec> + where + F: Fn(&'a Engine) -> G + Copy, + G: Future>, + { + let first_results = self.broadcast_without_retry(func).await; + + let mut any_offline = false; + for result in &first_results { + match result { + Ok(_) => return first_results, + Err(EngineError::Offline { .. }) => any_offline = true, + _ => (), + } + } + + if any_offline { + self.upcheck_offline().await; + self.broadcast_without_retry(func).await + } else { + first_results + } + } + + /// Runs `func` on all nodes concurrently, returning all results. + pub async fn broadcast_without_retry<'a, F, G, H>( + &'a self, + func: F, + ) -> Vec> + where + F: Fn(&'a Engine) -> G, + G: Future>, + { + let func = &func; + let futures = self.engines.iter().map(|engine| async move { + let engine_online = engine.state.read().await.is_online(); + if engine_online { + func(engine).await.map_err(|error| { + error!( + self.log, + "Execution engine call failed"; + "error" => ?error, + "id" => &engine.id + ); + EngineError::Api { + id: engine.id.clone(), + error, + } + }) + } else { + Err(EngineError::Offline { + id: engine.id.clone(), + }) + } + }); + + join_all(futures).await + } +} diff --git a/beacon_node/execution_layer/src/execute_payload_handle.rs b/beacon_node/execution_layer/src/execute_payload_handle.rs new file mode 100644 index 000000000..fc8fd655b --- /dev/null +++ b/beacon_node/execution_layer/src/execute_payload_handle.rs @@ -0,0 +1,103 @@ +use crate::{ConsensusStatus, ExecutionLayer}; +use slog::{crit, error, Logger}; +use types::Hash256; + +/// Provides a "handle" which should be returned after an `engine_executePayload` call. +/// +/// This handle allows the holder to send a valid or invalid message to the execution nodes to +/// indicate the consensus verification status of `self.block_hash`. +/// +/// Most notably, this `handle` will send an "invalid" message when it is dropped unless it has +/// already sent a "valid" or "invalid" message. This is to help ensure that any accidental +/// dropping of this handle results in an "invalid" message. Such dropping would be expected when a +/// block verification returns early with an error. +pub struct ExecutePayloadHandle { + pub(crate) block_hash: Hash256, + pub(crate) execution_layer: Option, + pub(crate) log: Logger, +} + +impl ExecutePayloadHandle { + /// Publish a "valid" message to all nodes for `self.block_hash`. + pub fn publish_consensus_valid(mut self) { + self.publish_blocking(ConsensusStatus::Valid) + } + + /// Publish an "invalid" message to all nodes for `self.block_hash`. + pub fn publish_consensus_invalid(mut self) { + self.publish_blocking(ConsensusStatus::Invalid) + } + + /// Publish the `status` message to all nodes for `self.block_hash`. + pub async fn publish_async(&mut self, status: ConsensusStatus) { + if let Some(execution_layer) = self.execution_layer() { + publish(&execution_layer, self.block_hash, status, &self.log).await + } + } + + /// Publishes a message, suitable for running in a non-async context. + fn publish_blocking(&mut self, status: ConsensusStatus) { + if let Some(execution_layer) = self.execution_layer() { + let log = &self.log.clone(); + let block_hash = self.block_hash; + if let Err(e) = execution_layer.block_on(|execution_layer| async move { + publish(execution_layer, block_hash, status, log).await; + Ok(()) + }) { + error!( + self.log, + "Failed to spawn payload status task"; + "error" => ?e, + "block_hash" => ?block_hash, + "status" => ?status, + ); + } + } + } + + /// Takes `self.execution_layer`, it cannot be used to send another duplicate or conflicting + /// message. Creates a log message if such an attempt is made. + fn execution_layer(&mut self) -> Option { + let execution_layer = self.execution_layer.take(); + if execution_layer.is_none() { + crit!( + self.log, + "Double usage of ExecutePayloadHandle"; + "block_hash" => ?self.block_hash, + ); + } + execution_layer + } +} + +/// Publish a `status`, creating a log message if it fails. +async fn publish( + execution_layer: &ExecutionLayer, + block_hash: Hash256, + status: ConsensusStatus, + log: &Logger, +) { + if let Err(e) = execution_layer + .consensus_validated(block_hash, status) + .await + { + // TODO(paul): consider how to recover when we are temporarily unable to tell a node + // that the block was valid. + crit!( + log, + "Failed to update execution consensus status"; + "error" => ?e, + "block_hash" => ?block_hash, + "status" => ?status, + ); + } +} + +/// See the struct-level documentation for the reasoning for this `Drop` implementation. +impl Drop for ExecutePayloadHandle { + fn drop(&mut self) { + if self.execution_layer.is_some() { + self.publish_blocking(ConsensusStatus::Invalid) + } + } +} diff --git a/beacon_node/execution_layer/src/lib.rs b/beacon_node/execution_layer/src/lib.rs new file mode 100644 index 000000000..d2f7a29d0 --- /dev/null +++ b/beacon_node/execution_layer/src/lib.rs @@ -0,0 +1,799 @@ +//! This crate provides an abstraction over one or more *execution engines*. An execution engine +//! was formerly known as an "eth1 node", like Geth, Nethermind, Erigon, etc. +//! +//! This crate only provides useful functionality for "The Merge", it does not provide any of the +//! deposit-contract functionality that the `beacon_node/eth1` crate already provides. + +use engine_api::{Error as ApiError, *}; +use engines::{Engine, EngineError, Engines}; +use lru::LruCache; +use sensitive_url::SensitiveUrl; +use slog::{crit, Logger}; +use std::future::Future; +use std::sync::Arc; +use task_executor::TaskExecutor; +use tokio::sync::{Mutex, MutexGuard}; + +pub use engine_api::{http::HttpJsonRpc, ConsensusStatus, ExecutePayloadResponse}; +pub use execute_payload_handle::ExecutePayloadHandle; + +mod engine_api; +mod engines; +mod execute_payload_handle; +pub mod test_utils; + +/// Each time the `ExecutionLayer` retrieves a block from an execution node, it stores that block +/// in an LRU cache to avoid redundant lookups. This is the size of that cache. +const EXECUTION_BLOCKS_LRU_CACHE_SIZE: usize = 128; + +#[derive(Debug)] +pub enum Error { + NoEngines, + ApiError(ApiError), + EngineErrors(Vec), + NotSynced, + ShuttingDown, + FeeRecipientUnspecified, +} + +impl From for Error { + fn from(e: ApiError) -> Self { + Error::ApiError(e) + } +} + +struct Inner { + engines: Engines, + terminal_total_difficulty: Uint256, + terminal_block_hash: Hash256, + fee_recipient: Option
, + execution_blocks: Mutex>, + executor: TaskExecutor, + log: Logger, +} + +/// Provides access to one or more execution engines and provides a neat interface for consumption +/// by the `BeaconChain`. +/// +/// When there is more than one execution node specified, the others will be used in a "fallback" +/// fashion. Some requests may be broadcast to all nodes and others might only be sent to the first +/// node that returns a valid response. Ultimately, the purpose of fallback nodes is to provide +/// redundancy in the case where one node is offline. +/// +/// The fallback nodes have an ordering. The first supplied will be the first contacted, and so on. +#[derive(Clone)] +pub struct ExecutionLayer { + inner: Arc, +} + +impl ExecutionLayer { + /// Instantiate `Self` with `urls.len()` engines, all using the JSON-RPC via HTTP. + pub fn from_urls( + urls: Vec, + terminal_total_difficulty: Uint256, + terminal_block_hash: Hash256, + fee_recipient: Option
, + executor: TaskExecutor, + log: Logger, + ) -> Result { + if urls.is_empty() { + return Err(Error::NoEngines); + } + + let engines = urls + .into_iter() + .map(|url| { + let id = url.to_string(); + let api = HttpJsonRpc::new(url)?; + Ok(Engine::new(id, api)) + }) + .collect::>()?; + + let inner = Inner { + engines: Engines { + engines, + log: log.clone(), + }, + terminal_total_difficulty, + terminal_block_hash, + fee_recipient, + execution_blocks: Mutex::new(LruCache::new(EXECUTION_BLOCKS_LRU_CACHE_SIZE)), + executor, + log, + }; + + Ok(Self { + inner: Arc::new(inner), + }) + } +} + +impl ExecutionLayer { + fn engines(&self) -> &Engines { + &self.inner.engines + } + + fn executor(&self) -> &TaskExecutor { + &self.inner.executor + } + + fn terminal_total_difficulty(&self) -> Uint256 { + self.inner.terminal_total_difficulty + } + + fn terminal_block_hash(&self) -> Hash256 { + self.inner.terminal_block_hash + } + + fn fee_recipient(&self) -> Result { + self.inner + .fee_recipient + .ok_or(Error::FeeRecipientUnspecified) + } + + /// Note: this function returns a mutex guard, be careful to avoid deadlocks. + async fn execution_blocks(&self) -> MutexGuard<'_, LruCache> { + self.inner.execution_blocks.lock().await + } + + fn log(&self) -> &Logger { + &self.inner.log + } + + /// Convenience function to allow calling async functions in a non-async context. + pub fn block_on<'a, T, U, V>(&'a self, generate_future: T) -> Result + where + T: Fn(&'a Self) -> U, + U: Future>, + { + let runtime = self + .executor() + .runtime() + .upgrade() + .ok_or(Error::ShuttingDown)?; + // TODO(paul): respect the shutdown signal. + runtime.block_on(generate_future(self)) + } + + /// Convenience function to allow spawning a task without waiting for the result. + pub fn spawn(&self, generate_future: T, name: &'static str) + where + T: FnOnce(Self) -> U, + U: Future + Send + 'static, + { + self.executor().spawn(generate_future(self.clone()), name); + } + + /// Maps to the `engine_preparePayload` JSON-RPC function. + /// + /// ## Fallback Behavior + /// + /// The result will be returned from the first node that returns successfully. No more nodes + /// will be contacted. + pub async fn prepare_payload( + &self, + parent_hash: Hash256, + timestamp: u64, + random: Hash256, + ) -> Result { + let fee_recipient = self.fee_recipient()?; + self.engines() + .first_success(|engine| { + // TODO(merge): make a cache for these IDs, so we don't always have to perform this + // request. + engine + .api + .prepare_payload(parent_hash, timestamp, random, fee_recipient) + }) + .await + .map_err(Error::EngineErrors) + } + + /// Maps to the `engine_getPayload` JSON-RPC call. + /// + /// However, it will attempt to call `self.prepare_payload` if it cannot find an existing + /// payload id for the given parameters. + /// + /// ## Fallback Behavior + /// + /// The result will be returned from the first node that returns successfully. No more nodes + /// will be contacted. + pub async fn get_payload( + &self, + parent_hash: Hash256, + timestamp: u64, + random: Hash256, + ) -> Result, Error> { + let fee_recipient = self.fee_recipient()?; + self.engines() + .first_success(|engine| async move { + // TODO(merge): make a cache for these IDs, so we don't always have to perform this + // request. + let payload_id = engine + .api + .prepare_payload(parent_hash, timestamp, random, fee_recipient) + .await?; + + engine.api.get_payload(payload_id).await + }) + .await + .map_err(Error::EngineErrors) + } + + /// Maps to the `engine_executePayload` JSON-RPC call. + /// + /// ## Fallback Behaviour + /// + /// The request will be broadcast to all nodes, simultaneously. It will await a response (or + /// failure) from all nodes and then return based on the first of these conditions which + /// returns true: + /// + /// - Valid, if any nodes return valid. + /// - Invalid, if any nodes return invalid. + /// - Syncing, if any nodes return syncing. + /// - An error, if all nodes return an error. + pub async fn execute_payload( + &self, + execution_payload: &ExecutionPayload, + ) -> Result<(ExecutePayloadResponse, ExecutePayloadHandle), Error> { + let broadcast_results = self + .engines() + .broadcast(|engine| engine.api.execute_payload(execution_payload.clone())) + .await; + + let mut errors = vec![]; + let mut valid = 0; + let mut invalid = 0; + let mut syncing = 0; + for result in broadcast_results { + match result { + Ok(ExecutePayloadResponse::Valid) => valid += 1, + Ok(ExecutePayloadResponse::Invalid) => invalid += 1, + Ok(ExecutePayloadResponse::Syncing) => syncing += 1, + Err(e) => errors.push(e), + } + } + + if valid > 0 && invalid > 0 { + crit!( + self.log(), + "Consensus failure between execution nodes"; + "method" => "execute_payload" + ); + } + + let execute_payload_response = if valid > 0 { + ExecutePayloadResponse::Valid + } else if invalid > 0 { + ExecutePayloadResponse::Invalid + } else if syncing > 0 { + ExecutePayloadResponse::Syncing + } else { + return Err(Error::EngineErrors(errors)); + }; + + let execute_payload_handle = ExecutePayloadHandle { + block_hash: execution_payload.block_hash, + execution_layer: Some(self.clone()), + log: self.log().clone(), + }; + + Ok((execute_payload_response, execute_payload_handle)) + } + + /// Maps to the `engine_consensusValidated` JSON-RPC call. + /// + /// ## Fallback Behaviour + /// + /// The request will be broadcast to all nodes, simultaneously. It will await a response (or + /// failure) from all nodes and then return based on the first of these conditions which + /// returns true: + /// + /// - Ok, if any node returns successfully. + /// - An error, if all nodes return an error. + pub async fn consensus_validated( + &self, + block_hash: Hash256, + status: ConsensusStatus, + ) -> Result<(), Error> { + let broadcast_results = self + .engines() + .broadcast(|engine| engine.api.consensus_validated(block_hash, status)) + .await; + + if broadcast_results.iter().any(Result::is_ok) { + Ok(()) + } else { + Err(Error::EngineErrors( + broadcast_results + .into_iter() + .filter_map(Result::err) + .collect(), + )) + } + } + + /// Maps to the `engine_consensusValidated` JSON-RPC call. + /// + /// ## Fallback Behaviour + /// + /// The request will be broadcast to all nodes, simultaneously. It will await a response (or + /// failure) from all nodes and then return based on the first of these conditions which + /// returns true: + /// + /// - Ok, if any node returns successfully. + /// - An error, if all nodes return an error. + pub async fn forkchoice_updated( + &self, + head_block_hash: Hash256, + finalized_block_hash: Hash256, + ) -> Result<(), Error> { + let broadcast_results = self + .engines() + .broadcast(|engine| { + engine + .api + .forkchoice_updated(head_block_hash, finalized_block_hash) + }) + .await; + + if broadcast_results.iter().any(Result::is_ok) { + Ok(()) + } else { + Err(Error::EngineErrors( + broadcast_results + .into_iter() + .filter_map(Result::err) + .collect(), + )) + } + } + + /// Used during block production to determine if the merge has been triggered. + /// + /// ## Specification + /// + /// `get_terminal_pow_block_hash` + /// + /// https://github.com/ethereum/consensus-specs/blob/v1.1.0/specs/merge/validator.md + pub async fn get_terminal_pow_block_hash(&self) -> Result, Error> { + self.engines() + .first_success(|engine| async move { + if self.terminal_block_hash() != Hash256::zero() { + // Note: the specification is written such that if there are multiple blocks in + // the PoW chain with the terminal block hash, then to select 0'th one. + // + // Whilst it's not clear what the 0'th block is, we ignore this completely and + // make the assumption that there are no two blocks in the chain with the same + // hash. Such a scenario would be a devestating hash collision with external + // implications far outweighing those here. + Ok(self + .get_pow_block(engine, self.terminal_block_hash()) + .await? + .map(|block| block.block_hash)) + } else { + self.get_pow_block_hash_at_total_difficulty(engine).await + } + }) + .await + .map_err(Error::EngineErrors) + } + + /// This function should remain internal. External users should use + /// `self.get_terminal_pow_block` instead, since it checks against the terminal block hash + /// override. + /// + /// ## Specification + /// + /// `get_pow_block_at_terminal_total_difficulty` + /// + /// https://github.com/ethereum/consensus-specs/blob/v1.1.0/specs/merge/validator.md + async fn get_pow_block_hash_at_total_difficulty( + &self, + engine: &Engine, + ) -> Result, ApiError> { + let mut ttd_exceeding_block = None; + let mut block = engine + .api + .get_block_by_number(BlockByNumberQuery::Tag(LATEST_TAG)) + .await? + .ok_or(ApiError::ExecutionHeadBlockNotFound)?; + + self.execution_blocks().await.put(block.block_hash, block); + + // TODO(merge): This function can theoretically loop indefinitely, as per the + // specification. We should consider how to fix this. See discussion: + // + // https://github.com/ethereum/consensus-specs/issues/2636 + loop { + if block.total_difficulty >= self.terminal_total_difficulty() { + ttd_exceeding_block = Some(block.block_hash); + + // Try to prevent infinite loops. + if block.block_hash == block.parent_hash { + return Err(ApiError::ParentHashEqualsBlockHash(block.block_hash)); + } + + block = self + .get_pow_block(engine, block.parent_hash) + .await? + .ok_or(ApiError::ExecutionBlockNotFound(block.parent_hash))?; + } else { + return Ok(ttd_exceeding_block); + } + } + } + + /// Used during block verification to check that a block correctly triggers the merge. + /// + /// ## Returns + /// + /// - `Some(true)` if the given `block_hash` is the terminal proof-of-work block. + /// - `Some(false)` if the given `block_hash` is certainly *not* the terminal proof-of-work + /// block. + /// - `None` if the `block_hash` or its parent were not present on the execution engines. + /// - `Err(_)` if there was an error connecting to the execution engines. + /// + /// ## Fallback Behaviour + /// + /// The request will be broadcast to all nodes, simultaneously. It will await a response (or + /// failure) from all nodes and then return based on the first of these conditions which + /// returns true: + /// + /// - Terminal, if any node indicates it is terminal. + /// - Not terminal, if any node indicates it is non-terminal. + /// - Block not found, if any node cannot find the block. + /// - An error, if all nodes return an error. + /// + /// ## Specification + /// + /// `is_valid_terminal_pow_block` + /// + /// https://github.com/ethereum/consensus-specs/blob/v1.1.0/specs/merge/fork-choice.md + pub async fn is_valid_terminal_pow_block_hash( + &self, + block_hash: Hash256, + ) -> Result, Error> { + let broadcast_results = self + .engines() + .broadcast(|engine| async move { + if let Some(pow_block) = self.get_pow_block(engine, block_hash).await? { + if let Some(pow_parent) = + self.get_pow_block(engine, pow_block.parent_hash).await? + { + return Ok(Some( + self.is_valid_terminal_pow_block(pow_block, pow_parent), + )); + } + } + + Ok(None) + }) + .await; + + let mut errors = vec![]; + let mut terminal = 0; + let mut not_terminal = 0; + let mut block_missing = 0; + for result in broadcast_results { + match result { + Ok(Some(true)) => terminal += 1, + Ok(Some(false)) => not_terminal += 1, + Ok(None) => block_missing += 1, + Err(e) => errors.push(e), + } + } + + if terminal > 0 && not_terminal > 0 { + crit!( + self.log(), + "Consensus failure between execution nodes"; + "method" => "is_valid_terminal_pow_block_hash" + ); + } + + if terminal > 0 { + Ok(Some(true)) + } else if not_terminal > 0 { + Ok(Some(false)) + } else if block_missing > 0 { + Ok(None) + } else { + Err(Error::EngineErrors(errors)) + } + } + + /// This function should remain internal. + /// + /// External users should use `self.is_valid_terminal_pow_block_hash`. + fn is_valid_terminal_pow_block(&self, block: ExecutionBlock, parent: ExecutionBlock) -> bool { + if block.block_hash == self.terminal_block_hash() { + return true; + } + + let is_total_difficulty_reached = + block.total_difficulty >= self.terminal_total_difficulty(); + let is_parent_total_difficulty_valid = + parent.total_difficulty < self.terminal_total_difficulty(); + is_total_difficulty_reached && is_parent_total_difficulty_valid + } + + /// Maps to the `eth_getBlockByHash` JSON-RPC call. + /// + /// ## TODO(merge) + /// + /// This will return an execution block regardless of whether or not it was created by a PoW + /// miner (pre-merge) or a PoS validator (post-merge). It's not immediately clear if this is + /// correct or not, see the discussion here: + /// + /// https://github.com/ethereum/consensus-specs/issues/2636 + async fn get_pow_block( + &self, + engine: &Engine, + hash: Hash256, + ) -> Result, ApiError> { + if let Some(cached) = self.execution_blocks().await.get(&hash).copied() { + // The block was in the cache, no need to request it from the execution + // engine. + return Ok(Some(cached)); + } + + // The block was *not* in the cache, request it from the execution + // engine and cache it for future reference. + if let Some(block) = engine.api.get_block_by_hash(hash).await? { + self.execution_blocks().await.put(hash, block); + Ok(Some(block)) + } else { + Ok(None) + } + } +} + +#[cfg(test)] +mod test { + use super::*; + use crate::test_utils::{MockServer, DEFAULT_TERMINAL_DIFFICULTY}; + use environment::null_logger; + use types::MainnetEthSpec; + + struct SingleEngineTester { + server: MockServer, + el: ExecutionLayer, + runtime: Option>, + _runtime_shutdown: exit_future::Signal, + } + + impl SingleEngineTester { + pub fn new() -> Self { + let server = MockServer::unit_testing(); + + let url = SensitiveUrl::parse(&server.url()).unwrap(); + let log = null_logger().unwrap(); + + let runtime = Arc::new( + tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build() + .unwrap(), + ); + let (runtime_shutdown, exit) = exit_future::signal(); + let (shutdown_tx, _) = futures::channel::mpsc::channel(1); + let executor = + TaskExecutor::new(Arc::downgrade(&runtime), exit, log.clone(), shutdown_tx); + + let el = ExecutionLayer::from_urls( + vec![url], + DEFAULT_TERMINAL_DIFFICULTY.into(), + Hash256::zero(), + Some(Address::repeat_byte(42)), + executor, + log, + ) + .unwrap(); + + Self { + server, + el, + runtime: Some(runtime), + _runtime_shutdown: runtime_shutdown, + } + } + + pub async fn produce_valid_execution_payload_on_head(self) -> Self { + let latest_execution_block = { + let block_gen = self.server.execution_block_generator().await; + block_gen.latest_block().unwrap() + }; + + let parent_hash = latest_execution_block.block_hash(); + let block_number = latest_execution_block.block_number() + 1; + let timestamp = block_number; + let random = Hash256::from_low_u64_be(block_number); + + let _payload_id = self + .el + .prepare_payload(parent_hash, timestamp, random) + .await + .unwrap(); + + let payload = self + .el + .get_payload::(parent_hash, timestamp, random) + .await + .unwrap(); + let block_hash = payload.block_hash; + assert_eq!(payload.parent_hash, parent_hash); + assert_eq!(payload.block_number, block_number); + assert_eq!(payload.timestamp, timestamp); + assert_eq!(payload.random, random); + + let (payload_response, mut payload_handle) = + self.el.execute_payload(&payload).await.unwrap(); + assert_eq!(payload_response, ExecutePayloadResponse::Valid); + + payload_handle.publish_async(ConsensusStatus::Valid).await; + + self.el + .forkchoice_updated(block_hash, Hash256::zero()) + .await + .unwrap(); + + let head_execution_block = { + let block_gen = self.server.execution_block_generator().await; + block_gen.latest_block().unwrap() + }; + + assert_eq!(head_execution_block.block_number(), block_number); + assert_eq!(head_execution_block.block_hash(), block_hash); + assert_eq!(head_execution_block.parent_hash(), parent_hash); + + self + } + + pub async fn move_to_block_prior_to_terminal_block(self) -> Self { + let target_block = { + let block_gen = self.server.execution_block_generator().await; + block_gen.terminal_block_number.checked_sub(1).unwrap() + }; + self.move_to_pow_block(target_block).await + } + + pub async fn move_to_terminal_block(self) -> Self { + let target_block = { + let block_gen = self.server.execution_block_generator().await; + block_gen.terminal_block_number + }; + self.move_to_pow_block(target_block).await + } + + pub async fn move_to_pow_block(self, target_block: u64) -> Self { + { + let mut block_gen = self.server.execution_block_generator().await; + let next_block = block_gen.latest_block().unwrap().block_number() + 1; + assert!(target_block >= next_block); + + block_gen + .insert_pow_blocks(next_block..=target_block) + .unwrap(); + } + self + } + + pub async fn with_terminal_block<'a, T, U>(self, func: T) -> Self + where + T: Fn(ExecutionLayer, Option) -> U, + U: Future, + { + let terminal_block_number = self + .server + .execution_block_generator() + .await + .terminal_block_number; + let terminal_block = self + .server + .execution_block_generator() + .await + .execution_block_by_number(terminal_block_number); + + func(self.el.clone(), terminal_block).await; + self + } + + pub fn shutdown(&mut self) { + if let Some(runtime) = self.runtime.take() { + Arc::try_unwrap(runtime).unwrap().shutdown_background() + } + } + } + + impl Drop for SingleEngineTester { + fn drop(&mut self) { + self.shutdown() + } + } + + #[tokio::test] + async fn produce_three_valid_pos_execution_blocks() { + SingleEngineTester::new() + .move_to_terminal_block() + .await + .produce_valid_execution_payload_on_head() + .await + .produce_valid_execution_payload_on_head() + .await + .produce_valid_execution_payload_on_head() + .await; + } + + #[tokio::test] + async fn finds_valid_terminal_block_hash() { + SingleEngineTester::new() + .move_to_block_prior_to_terminal_block() + .await + .with_terminal_block(|el, _| async move { + assert_eq!(el.get_terminal_pow_block_hash().await.unwrap(), None) + }) + .await + .move_to_terminal_block() + .await + .with_terminal_block(|el, terminal_block| async move { + assert_eq!( + el.get_terminal_pow_block_hash().await.unwrap(), + Some(terminal_block.unwrap().block_hash) + ) + }) + .await; + } + + #[tokio::test] + async fn verifies_valid_terminal_block_hash() { + SingleEngineTester::new() + .move_to_terminal_block() + .await + .with_terminal_block(|el, terminal_block| async move { + assert_eq!( + el.is_valid_terminal_pow_block_hash(terminal_block.unwrap().block_hash) + .await + .unwrap(), + Some(true) + ) + }) + .await; + } + + #[tokio::test] + async fn rejects_invalid_terminal_block_hash() { + SingleEngineTester::new() + .move_to_terminal_block() + .await + .with_terminal_block(|el, terminal_block| async move { + let invalid_terminal_block = terminal_block.unwrap().parent_hash; + + assert_eq!( + el.is_valid_terminal_pow_block_hash(invalid_terminal_block) + .await + .unwrap(), + Some(false) + ) + }) + .await; + } + + #[tokio::test] + async fn rejects_unknown_terminal_block_hash() { + SingleEngineTester::new() + .move_to_terminal_block() + .await + .with_terminal_block(|el, _| async move { + let missing_terminal_block = Hash256::repeat_byte(42); + + assert_eq!( + el.is_valid_terminal_pow_block_hash(missing_terminal_block) + .await + .unwrap(), + None + ) + }) + .await; + } +} diff --git a/beacon_node/execution_layer/src/test_utils/execution_block_generator.rs b/beacon_node/execution_layer/src/test_utils/execution_block_generator.rs new file mode 100644 index 000000000..13ed71242 --- /dev/null +++ b/beacon_node/execution_layer/src/test_utils/execution_block_generator.rs @@ -0,0 +1,373 @@ +use crate::engine_api::{ + http::JsonPreparePayloadRequest, ConsensusStatus, ExecutePayloadResponse, ExecutionBlock, +}; +use serde::{Deserialize, Serialize}; +use std::collections::HashMap; +use tree_hash::TreeHash; +use tree_hash_derive::TreeHash; +use types::{EthSpec, ExecutionPayload, Hash256, Uint256}; + +#[derive(Clone, Debug, PartialEq)] +#[allow(clippy::large_enum_variant)] // This struct is only for testing. +pub enum Block { + PoW(PoWBlock), + PoS(ExecutionPayload), +} + +impl Block { + pub fn block_number(&self) -> u64 { + match self { + Block::PoW(block) => block.block_number, + Block::PoS(payload) => payload.block_number, + } + } + + pub fn parent_hash(&self) -> Hash256 { + match self { + Block::PoW(block) => block.parent_hash, + Block::PoS(payload) => payload.parent_hash, + } + } + + pub fn block_hash(&self) -> Hash256 { + match self { + Block::PoW(block) => block.block_hash, + Block::PoS(payload) => payload.block_hash, + } + } + + pub fn total_difficulty(&self) -> Option { + match self { + Block::PoW(block) => Some(block.total_difficulty), + Block::PoS(_) => None, + } + } + + pub fn as_execution_block(&self, total_difficulty: u64) -> ExecutionBlock { + match self { + Block::PoW(block) => ExecutionBlock { + block_hash: block.block_hash, + block_number: block.block_number, + parent_hash: block.parent_hash, + total_difficulty: block.total_difficulty, + }, + Block::PoS(payload) => ExecutionBlock { + block_hash: payload.block_hash, + block_number: payload.block_number, + parent_hash: payload.parent_hash, + total_difficulty: total_difficulty.into(), + }, + } + } +} + +#[derive(Clone, Copy, Debug, PartialEq, Serialize, Deserialize, TreeHash)] +#[serde(rename_all = "camelCase")] +pub struct PoWBlock { + pub block_number: u64, + pub block_hash: Hash256, + pub parent_hash: Hash256, + pub total_difficulty: Uint256, +} + +pub struct ExecutionBlockGenerator { + /* + * Common database + */ + blocks: HashMap>, + block_hashes: HashMap, + /* + * PoW block parameters + */ + pub terminal_total_difficulty: u64, + pub terminal_block_number: u64, + /* + * PoS block parameters + */ + pub pending_payloads: HashMap>, + pub next_payload_id: u64, + pub payload_ids: HashMap>, +} + +impl ExecutionBlockGenerator { + pub fn new(terminal_total_difficulty: u64, terminal_block_number: u64) -> Self { + let mut gen = Self { + blocks: <_>::default(), + block_hashes: <_>::default(), + terminal_total_difficulty, + terminal_block_number, + pending_payloads: <_>::default(), + next_payload_id: 0, + payload_ids: <_>::default(), + }; + + gen.insert_pow_block(0).unwrap(); + + gen + } + + pub fn latest_block(&self) -> Option> { + let hash = *self + .block_hashes + .iter() + .max_by_key(|(number, _)| *number) + .map(|(_, hash)| hash)?; + + self.block_by_hash(hash) + } + + pub fn latest_execution_block(&self) -> Option { + self.latest_block() + .map(|block| block.as_execution_block(self.terminal_total_difficulty)) + } + + pub fn block_by_number(&self, number: u64) -> Option> { + let hash = *self.block_hashes.get(&number)?; + self.block_by_hash(hash) + } + + pub fn execution_block_by_number(&self, number: u64) -> Option { + self.block_by_number(number) + .map(|block| block.as_execution_block(self.terminal_total_difficulty)) + } + + pub fn block_by_hash(&self, hash: Hash256) -> Option> { + self.blocks.get(&hash).cloned() + } + + pub fn execution_block_by_hash(&self, hash: Hash256) -> Option { + self.block_by_hash(hash) + .map(|block| block.as_execution_block(self.terminal_total_difficulty)) + } + + pub fn insert_pow_blocks( + &mut self, + block_numbers: impl Iterator, + ) -> Result<(), String> { + for i in block_numbers { + self.insert_pow_block(i)?; + } + + Ok(()) + } + + pub fn insert_pow_block(&mut self, block_number: u64) -> Result<(), String> { + if block_number > self.terminal_block_number { + return Err(format!( + "{} is beyond terminal pow block {}", + block_number, self.terminal_block_number + )); + } + + let parent_hash = if block_number == 0 { + Hash256::zero() + } else if let Some(hash) = self.block_hashes.get(&(block_number - 1)) { + *hash + } else { + return Err(format!( + "parent with block number {} not found", + block_number - 1 + )); + }; + + let increment = self + .terminal_total_difficulty + .checked_div(self.terminal_block_number) + .expect("terminal block number must be non-zero"); + let total_difficulty = increment + .checked_mul(block_number) + .expect("overflow computing total difficulty") + .into(); + + let mut block = PoWBlock { + block_number, + block_hash: Hash256::zero(), + parent_hash, + total_difficulty, + }; + + block.block_hash = block.tree_hash_root(); + + self.insert_block(Block::PoW(block)) + } + + pub fn insert_block(&mut self, block: Block) -> Result<(), String> { + if self.blocks.contains_key(&block.block_hash()) { + return Err(format!("{:?} is already known", block.block_hash())); + } else if self.block_hashes.contains_key(&block.block_number()) { + return Err(format!( + "block {} is already known, forking is not supported", + block.block_number() + )); + } else if block.parent_hash() != Hash256::zero() + && !self.blocks.contains_key(&block.parent_hash()) + { + return Err(format!("parent block {:?} is unknown", block.parent_hash())); + } + + self.block_hashes + .insert(block.block_number(), block.block_hash()); + self.blocks.insert(block.block_hash(), block); + + Ok(()) + } + + pub fn prepare_payload(&mut self, payload: JsonPreparePayloadRequest) -> Result { + if !self + .blocks + .iter() + .any(|(_, block)| block.block_number() == self.terminal_block_number) + { + return Err("refusing to create payload id before terminal block".to_string()); + } + + let parent = self + .blocks + .get(&payload.parent_hash) + .ok_or_else(|| format!("unknown parent block {:?}", payload.parent_hash))?; + + let id = self.next_payload_id; + self.next_payload_id += 1; + + let mut execution_payload = ExecutionPayload { + parent_hash: payload.parent_hash, + coinbase: payload.fee_recipient, + receipt_root: Hash256::repeat_byte(42), + state_root: Hash256::repeat_byte(43), + logs_bloom: vec![0; 256].into(), + random: payload.random, + block_number: parent.block_number() + 1, + gas_limit: 10, + gas_used: 9, + timestamp: payload.timestamp, + extra_data: "block gen was here".as_bytes().to_vec().into(), + base_fee_per_gas: Hash256::from_low_u64_le(1), + block_hash: Hash256::zero(), + transactions: vec![].into(), + }; + + execution_payload.block_hash = execution_payload.tree_hash_root(); + + self.payload_ids.insert(id, execution_payload); + + Ok(id) + } + + pub fn get_payload(&mut self, id: u64) -> Option> { + self.payload_ids.remove(&id) + } + + pub fn execute_payload(&mut self, payload: ExecutionPayload) -> ExecutePayloadResponse { + let parent = if let Some(parent) = self.blocks.get(&payload.parent_hash) { + parent + } else { + return ExecutePayloadResponse::Invalid; + }; + + if payload.block_number != parent.block_number() + 1 { + return ExecutePayloadResponse::Invalid; + } + + self.pending_payloads.insert(payload.block_hash, payload); + + ExecutePayloadResponse::Valid + } + + pub fn consensus_validated( + &mut self, + block_hash: Hash256, + status: ConsensusStatus, + ) -> Result<(), String> { + let payload = self + .pending_payloads + .remove(&block_hash) + .ok_or_else(|| format!("no pending payload for {:?}", block_hash))?; + + match status { + ConsensusStatus::Valid => self.insert_block(Block::PoS(payload)), + ConsensusStatus::Invalid => Ok(()), + } + } + + pub fn forkchoice_updated( + &mut self, + block_hash: Hash256, + finalized_block_hash: Hash256, + ) -> Result<(), String> { + if !self.blocks.contains_key(&block_hash) { + return Err(format!("block hash {:?} unknown", block_hash)); + } + + if finalized_block_hash != Hash256::zero() + && !self.blocks.contains_key(&finalized_block_hash) + { + return Err(format!( + "finalized block hash {:?} is unknown", + finalized_block_hash + )); + } + + Ok(()) + } +} + +#[cfg(test)] +mod test { + use super::*; + use types::MainnetEthSpec; + + #[test] + fn pow_chain_only() { + const TERMINAL_DIFFICULTY: u64 = 10; + const TERMINAL_BLOCK: u64 = 10; + const DIFFICULTY_INCREMENT: u64 = 1; + + let mut generator: ExecutionBlockGenerator = + ExecutionBlockGenerator::new(TERMINAL_DIFFICULTY, TERMINAL_BLOCK); + + for i in 0..=TERMINAL_BLOCK { + if i > 0 { + generator.insert_pow_block(i).unwrap(); + } + + /* + * Generate a block, inspect it. + */ + + let block = generator.latest_block().unwrap(); + assert_eq!(block.block_number(), i); + + let expected_parent = i + .checked_sub(1) + .map(|i| generator.block_by_number(i).unwrap().block_hash()) + .unwrap_or_else(Hash256::zero); + assert_eq!(block.parent_hash(), expected_parent); + + assert_eq!( + block.total_difficulty().unwrap(), + (i * DIFFICULTY_INCREMENT).into() + ); + + assert_eq!(generator.block_by_hash(block.block_hash()).unwrap(), block); + assert_eq!(generator.block_by_number(i).unwrap(), block); + + /* + * Check the parent is accessible. + */ + + if let Some(prev_i) = i.checked_sub(1) { + assert_eq!( + generator.block_by_number(prev_i).unwrap(), + generator.block_by_hash(block.parent_hash()).unwrap() + ); + } + + /* + * Check the next block is inaccessible. + */ + + let next_i = i + 1; + assert!(generator.block_by_number(next_i).is_none()); + } + } +} diff --git a/beacon_node/execution_layer/src/test_utils/handle_rpc.rs b/beacon_node/execution_layer/src/test_utils/handle_rpc.rs new file mode 100644 index 000000000..00fd8101e --- /dev/null +++ b/beacon_node/execution_layer/src/test_utils/handle_rpc.rs @@ -0,0 +1,125 @@ +use super::Context; +use crate::engine_api::http::*; +use serde::de::DeserializeOwned; +use serde_json::Value as JsonValue; +use std::sync::Arc; +use types::EthSpec; + +pub async fn handle_rpc( + body: JsonValue, + ctx: Arc>, +) -> Result { + let method = body + .get("method") + .and_then(JsonValue::as_str) + .ok_or_else(|| "missing/invalid method field".to_string())?; + + let params = body + .get("params") + .ok_or_else(|| "missing/invalid params field".to_string())?; + + match method { + ETH_SYNCING => Ok(JsonValue::Bool(false)), + ETH_GET_BLOCK_BY_NUMBER => { + let tag = params + .get(0) + .and_then(JsonValue::as_str) + .ok_or_else(|| "missing/invalid params[0] value".to_string())?; + + match tag { + "latest" => Ok(serde_json::to_value( + ctx.execution_block_generator + .read() + .await + .latest_execution_block(), + ) + .unwrap()), + other => Err(format!("The tag {} is not supported", other)), + } + } + ETH_GET_BLOCK_BY_HASH => { + let hash = params + .get(0) + .and_then(JsonValue::as_str) + .ok_or_else(|| "missing/invalid params[0] value".to_string()) + .and_then(|s| { + s.parse() + .map_err(|e| format!("unable to parse hash: {:?}", e)) + })?; + + Ok(serde_json::to_value( + ctx.execution_block_generator + .read() + .await + .execution_block_by_hash(hash), + ) + .unwrap()) + } + ENGINE_PREPARE_PAYLOAD => { + let request = get_param_0(params)?; + let payload_id = ctx + .execution_block_generator + .write() + .await + .prepare_payload(request)?; + + Ok(serde_json::to_value(JsonPayloadId { payload_id }).unwrap()) + } + ENGINE_EXECUTE_PAYLOAD => { + let request: JsonExecutionPayload = get_param_0(params)?; + let response = ctx + .execution_block_generator + .write() + .await + .execute_payload(request.into()); + + Ok(serde_json::to_value(response).unwrap()) + } + ENGINE_GET_PAYLOAD => { + let request: JsonPayloadId = get_param_0(params)?; + let id = request.payload_id; + + let response = ctx + .execution_block_generator + .write() + .await + .get_payload(id) + .ok_or_else(|| format!("no payload for id {}", id))?; + + Ok(serde_json::to_value(JsonExecutionPayload::from(response)).unwrap()) + } + + ENGINE_CONSENSUS_VALIDATED => { + let request: JsonConsensusValidatedRequest = get_param_0(params)?; + ctx.execution_block_generator + .write() + .await + .consensus_validated(request.block_hash, request.status)?; + + Ok(JsonValue::Null) + } + ENGINE_FORKCHOICE_UPDATED => { + let request: JsonForkChoiceUpdatedRequest = get_param_0(params)?; + ctx.execution_block_generator + .write() + .await + .forkchoice_updated(request.head_block_hash, request.finalized_block_hash)?; + + Ok(JsonValue::Null) + } + other => Err(format!( + "The method {} does not exist/is not available", + other + )), + } +} + +fn get_param_0(params: &JsonValue) -> Result { + params + .get(0) + .ok_or_else(|| "missing/invalid params[0] value".to_string()) + .and_then(|param| { + serde_json::from_value(param.clone()) + .map_err(|e| format!("failed to deserialize param[0]: {:?}", e)) + }) +} diff --git a/beacon_node/execution_layer/src/test_utils/mod.rs b/beacon_node/execution_layer/src/test_utils/mod.rs new file mode 100644 index 000000000..d5ec89f87 --- /dev/null +++ b/beacon_node/execution_layer/src/test_utils/mod.rs @@ -0,0 +1,230 @@ +//! Provides a mock execution engine HTTP JSON-RPC API for use in testing. + +use crate::engine_api::http::JSONRPC_VERSION; +use bytes::Bytes; +use environment::null_logger; +use execution_block_generator::ExecutionBlockGenerator; +use handle_rpc::handle_rpc; +use serde::{Deserialize, Serialize}; +use serde_json::json; +use slog::{info, Logger}; +use std::future::Future; +use std::marker::PhantomData; +use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4}; +use std::sync::Arc; +use tokio::sync::{oneshot, RwLock, RwLockWriteGuard}; +use types::EthSpec; +use warp::Filter; + +pub const DEFAULT_TERMINAL_DIFFICULTY: u64 = 6400; +pub const DEFAULT_TERMINAL_BLOCK: u64 = 64; + +mod execution_block_generator; +mod handle_rpc; + +pub struct MockServer { + _shutdown_tx: oneshot::Sender<()>, + listen_socket_addr: SocketAddr, + last_echo_request: Arc>>, + pub ctx: Arc>, +} + +impl MockServer { + pub fn unit_testing() -> Self { + let last_echo_request = Arc::new(RwLock::new(None)); + let execution_block_generator = + ExecutionBlockGenerator::new(DEFAULT_TERMINAL_DIFFICULTY, DEFAULT_TERMINAL_BLOCK); + + let ctx: Arc> = Arc::new(Context { + config: <_>::default(), + log: null_logger().unwrap(), + last_echo_request: last_echo_request.clone(), + execution_block_generator: RwLock::new(execution_block_generator), + _phantom: PhantomData, + }); + + let (shutdown_tx, shutdown_rx) = oneshot::channel(); + + let shutdown_future = async { + // Ignore the result from the channel, shut down regardless. + let _ = shutdown_rx.await; + }; + + let (listen_socket_addr, server_future) = serve(ctx.clone(), shutdown_future).unwrap(); + + tokio::spawn(server_future); + + Self { + _shutdown_tx: shutdown_tx, + listen_socket_addr, + last_echo_request, + ctx, + } + } + + pub async fn execution_block_generator( + &self, + ) -> RwLockWriteGuard<'_, ExecutionBlockGenerator> { + self.ctx.execution_block_generator.write().await + } + + pub fn url(&self) -> String { + format!( + "http://{}:{}", + self.listen_socket_addr.ip(), + self.listen_socket_addr.port() + ) + } + + pub async fn last_echo_request(&self) -> Bytes { + self.last_echo_request + .write() + .await + .take() + .expect("last echo request is none") + } +} + +#[derive(Debug)] +pub enum Error { + Warp(warp::Error), + Other(String), +} + +impl From for Error { + fn from(e: warp::Error) -> Self { + Error::Warp(e) + } +} + +impl From for Error { + fn from(e: String) -> Self { + Error::Other(e) + } +} + +#[derive(Debug)] +struct MissingIdField; + +impl warp::reject::Reject for MissingIdField {} + +/// A wrapper around all the items required to spawn the HTTP server. +/// +/// The server will gracefully handle the case where any fields are `None`. +pub struct Context { + pub config: Config, + pub log: Logger, + pub last_echo_request: Arc>>, + pub execution_block_generator: RwLock>, + pub _phantom: PhantomData, +} + +/// Configuration for the HTTP server. +#[derive(PartialEq, Debug, Clone, Serialize, Deserialize)] +pub struct Config { + pub listen_addr: Ipv4Addr, + pub listen_port: u16, +} + +impl Default for Config { + fn default() -> Self { + Self { + listen_addr: Ipv4Addr::new(127, 0, 0, 1), + listen_port: 0, + } + } +} + +/// Creates a server that will serve requests using information from `ctx`. +/// +/// The server will shut down gracefully when the `shutdown` future resolves. +/// +/// ## Returns +/// +/// This function will bind the server to the provided address and then return a tuple of: +/// +/// - `SocketAddr`: the address that the HTTP server will listen on. +/// - `Future`: the actual server future that will need to be awaited. +/// +/// ## Errors +/// +/// Returns an error if the server is unable to bind or there is another error during +/// configuration. +pub fn serve( + ctx: Arc>, + shutdown: impl Future + Send + Sync + 'static, +) -> Result<(SocketAddr, impl Future), Error> { + let config = &ctx.config; + let log = ctx.log.clone(); + + let inner_ctx = ctx.clone(); + let ctx_filter = warp::any().map(move || inner_ctx.clone()); + + // `/` + // + // Handles actual JSON-RPC requests. + let root = warp::path::end() + .and(warp::body::json()) + .and(ctx_filter.clone()) + .and_then(|body: serde_json::Value, ctx: Arc>| async move { + let id = body + .get("id") + .and_then(serde_json::Value::as_u64) + .ok_or_else(|| warp::reject::custom(MissingIdField))?; + + let response = match handle_rpc(body, ctx).await { + Ok(result) => json!({ + "id": id, + "jsonrpc": JSONRPC_VERSION, + "result": result + }), + Err(message) => json!({ + "id": id, + "jsonrpc": JSONRPC_VERSION, + "error": { + "code": -1234, // Junk error code. + "message": message + } + }), + }; + + Ok::<_, warp::reject::Rejection>( + warp::http::Response::builder() + .status(200) + .body(serde_json::to_string(&response).expect("response must be valid JSON")), + ) + }); + + // `/echo` + // + // Sends the body of the request to `ctx.last_echo_request` so we can inspect requests. + let echo = warp::path("echo") + .and(warp::body::bytes()) + .and(ctx_filter) + .and_then(|bytes: Bytes, ctx: Arc>| async move { + *ctx.last_echo_request.write().await = Some(bytes.clone()); + Ok::<_, warp::reject::Rejection>( + warp::http::Response::builder().status(200).body(bytes), + ) + }); + + let routes = warp::post() + .and(root.or(echo)) + // Add a `Server` header. + .map(|reply| warp::reply::with_header(reply, "Server", "lighthouse-mock-execution-client")); + + let (listening_socket, server) = warp::serve(routes).try_bind_with_graceful_shutdown( + SocketAddrV4::new(config.listen_addr, config.listen_port), + async { + shutdown.await; + }, + )?; + + info!( + log, + "Metrics HTTP server started"; + "listen_address" => listening_socket.to_string(), + ); + + Ok((listening_socket, server)) +} diff --git a/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs b/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs index dd42531cf..1d3983ffa 100644 --- a/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs +++ b/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs @@ -750,8 +750,8 @@ impl Worker { // TODO: check that this is what we're supposed to do when we don't want to // penalize a peer for our configuration issue // in the verification process BUT is this the proper way to handle it? - Err(e @BlockError::ExecutionPayloadError(ExecutionPayloadError::Eth1VerificationError(_))) - | Err(e @BlockError::ExecutionPayloadError(ExecutionPayloadError::NoEth1Connection)) => { + Err(e @BlockError::ExecutionPayloadError(ExecutionPayloadError::RequestFailed(_))) + | Err(e @BlockError::ExecutionPayloadError(ExecutionPayloadError::NoExecutionConnection)) => { debug!(self.log, "Could not verify block for gossip, ignoring the block"; "error" => %e); self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore); diff --git a/beacon_node/src/cli.rs b/beacon_node/src/cli.rs index c756b8a6c..8aadfbc11 100644 --- a/beacon_node/src/cli.rs +++ b/beacon_node/src/cli.rs @@ -371,6 +371,60 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> { .help("Specifies how many blocks the database should cache in memory [default: 5]") .takes_value(true) ) + /* + * Execution Layer Integration + */ + .arg( + Arg::with_name("merge") + .long("merge") + .help("Enable the features necessary to run merge testnets. This feature \ + is unstable and is for developers only.") + .takes_value(false), + ) + .arg( + Arg::with_name("execution-endpoints") + .long("execution-endpoints") + .value_name("EXECUTION-ENDPOINTS") + .help("One or more comma-delimited server endpoints for HTTP JSON-RPC connection. \ + If multiple endpoints are given the endpoints are used as fallback in the \ + given order. Also enables the --merge flag. \ + If this flag is omitted and the --eth1-endpoints is supplied, those values \ + will be used. Defaults to http://127.0.0.1:8545.") + .takes_value(true) + ) + .arg( + Arg::with_name("terminal-total-difficulty-override") + .long("terminal-total-difficulty-override") + .value_name("TERMINAL_TOTAL_DIFFICULTY") + .help("Used to coordinate manual overrides to the TERMINAL_TOTAL_DIFFICULTY parameter. \ + This flag should only be used if the user has a clear understanding that \ + the broad Ethereum community has elected to override the terminal difficulty. \ + Incorrect use of this flag will cause your node to experience a consensus + failure. Be extremely careful with this flag.") + .takes_value(true) + ) + .arg( + Arg::with_name("terminal-block-hash-override") + .long("terminal-block-hash-override") + .value_name("TERMINAL_BLOCK_HASH") + .help("Used to coordinate manual overrides to the TERMINAL_BLOCK_HASH parameter. \ + This flag should only be used if the user has a clear understanding that \ + the broad Ethereum community has elected to override the terminal PoW block. \ + Incorrect use of this flag will cause your node to experience a consensus + failure. Be extremely careful with this flag.") + .takes_value(true) + ) + .arg( + Arg::with_name("fee-recipient") + .long("fee-recipient") + .help("Once the merge has happened, this address will receive transaction fees \ + collected from any blocks produced by this node. Defaults to a junk \ + address whilst the merge is in development stages. THE DEFAULT VALUE \ + WILL BE REMOVED BEFORE THE MERGE ENTERS PRODUCTION") + // TODO: remove this default value. It's just there to make life easy during merge + // testnets. + .default_value("0x0000000000000000000000000000000000000001"), + ) /* * Database purging and compaction. diff --git a/beacon_node/src/config.rs b/beacon_node/src/config.rs index 52a093261..f613c5fb1 100644 --- a/beacon_node/src/config.rs +++ b/beacon_node/src/config.rs @@ -232,6 +232,35 @@ pub fn get_config( client_config.eth1.purge_cache = true; } + if let Some(endpoints) = cli_args.value_of("execution-endpoints") { + client_config.sync_eth1_chain = true; + client_config.execution_endpoints = endpoints + .split(',') + .map(|s| SensitiveUrl::parse(s)) + .collect::>() + .map(Some) + .map_err(|e| format!("execution-endpoints contains an invalid URL {:?}", e))?; + } else if cli_args.is_present("merge") { + client_config.execution_endpoints = Some(client_config.eth1.endpoints.clone()); + } + + if let Some(terminal_total_difficulty) = + clap_utils::parse_optional(cli_args, "total-terminal-difficulty-override")? + { + if client_config.execution_endpoints.is_none() { + return Err( + "The --merge flag must be provided when using --total-terminal-difficulty-override" + .into(), + ); + } + + client_config.terminal_total_difficulty_override = Some(terminal_total_difficulty); + } + + client_config.fee_recipient = clap_utils::parse_optional(cli_args, "fee-recipient")?; + client_config.terminal_block_hash = + clap_utils::parse_optional(cli_args, "terminal-block-hash")?; + if let Some(freezer_dir) = cli_args.value_of("freezer-dir") { client_config.freezer_db_path = Some(PathBuf::from(freezer_dir)); } diff --git a/common/task_executor/src/lib.rs b/common/task_executor/src/lib.rs index 0e15e16e0..6874966ab 100644 --- a/common/task_executor/src/lib.rs +++ b/common/task_executor/src/lib.rs @@ -125,7 +125,7 @@ impl TaskExecutor { /// Spawn a future on the tokio runtime. /// - /// The future is wrapped in an `exit_future::Exit`. The task is canceled when the corresponding + /// The future is wrapped in an `exit_future::Exit`. The task is cancelled when the corresponding /// exit_future `Signal` is fired/dropped. /// /// The future is monitored via another spawned future to ensure that it doesn't panic. In case diff --git a/consensus/fork_choice/Cargo.toml b/consensus/fork_choice/Cargo.toml index 2bfe3f137..f708045df 100644 --- a/consensus/fork_choice/Cargo.toml +++ b/consensus/fork_choice/Cargo.toml @@ -8,7 +8,6 @@ edition = "2018" [dependencies] types = { path = "../types" } -state_processing = { path = "../state_processing" } proto_array = { path = "../proto_array" } eth2_ssz = "0.4.0" eth2_ssz_derive = "0.3.0" diff --git a/consensus/fork_choice/src/fork_choice.rs b/consensus/fork_choice/src/fork_choice.rs index 6b09cdc9c..ae94fac83 100644 --- a/consensus/fork_choice/src/fork_choice.rs +++ b/consensus/fork_choice/src/fork_choice.rs @@ -2,11 +2,9 @@ use std::marker::PhantomData; use proto_array::{Block as ProtoBlock, ProtoArrayForkChoice}; use ssz_derive::{Decode, Encode}; -use state_processing::per_block_processing::is_merge_block; use types::{ - AttestationShufflingId, BeaconBlock, BeaconState, BeaconStateError, ChainSpec, Checkpoint, - Epoch, EthSpec, Hash256, IndexedAttestation, PowBlock, RelativeEpoch, SignedBeaconBlock, Slot, - Uint256, + AttestationShufflingId, BeaconBlock, BeaconState, BeaconStateError, Checkpoint, Epoch, EthSpec, + Hash256, IndexedAttestation, RelativeEpoch, SignedBeaconBlock, Slot, }; use crate::ForkChoiceStore; @@ -63,10 +61,6 @@ pub enum InvalidBlock { finalized_root: Hash256, block_ancestor: Option, }, - InvalidTerminalPowBlock { - block_total_difficulty: Uint256, - parent_total_difficulty: Uint256, - }, } #[derive(Debug)] @@ -238,14 +232,6 @@ where } } -/// https://github.com/ethereum/consensus-specs/blob/dev/specs/merge/fork-choice.md#is_valid_terminal_pow_block -fn is_valid_terminal_pow_block(block: &PowBlock, parent: &PowBlock, spec: &ChainSpec) -> bool { - let is_total_difficulty_reached = block.total_difficulty >= spec.terminal_total_difficulty; - let is_parent_total_difficulty_valid = parent.total_difficulty < spec.terminal_total_difficulty; - - is_total_difficulty_reached && is_parent_total_difficulty_valid -} - impl ForkChoice where T: ForkChoiceStore, @@ -460,7 +446,6 @@ where block: &BeaconBlock, block_root: Hash256, state: &BeaconState, - spec: &ChainSpec, ) -> Result<(), Error> { let current_slot = self.update_time(current_slot)?; @@ -511,19 +496,6 @@ where })); } - // https://github.com/ethereum/consensus-specs/blob/dev/specs/merge/fork-choice.md#on_block - if is_merge_block(state, block.body()) { - // TODO: get POW blocks from eth1 chain here as indicated in the merge spec link ^ - let pow_block = PowBlock::default(); - let pow_parent = PowBlock::default(); - if !is_valid_terminal_pow_block(&pow_block, &pow_parent, spec) { - return Err(Error::InvalidBlock(InvalidBlock::InvalidTerminalPowBlock { - block_total_difficulty: pow_block.total_difficulty, - parent_total_difficulty: pow_parent.total_difficulty, - })); - } - } - // Update justified checkpoint. if state.current_justified_checkpoint().epoch > self.fc_store.justified_checkpoint().epoch { if state.current_justified_checkpoint().epoch diff --git a/consensus/fork_choice/tests/tests.rs b/consensus/fork_choice/tests/tests.rs index 2c0d498e1..8adc9de82 100644 --- a/consensus/fork_choice/tests/tests.rs +++ b/consensus/fork_choice/tests/tests.rs @@ -268,13 +268,7 @@ impl ForkChoiceTest { .chain .fork_choice .write() - .on_block( - current_slot, - &block, - block.canonical_root(), - &state, - &self.harness.chain.spec, - ) + .on_block(current_slot, &block, block.canonical_root(), &state) .unwrap(); self } @@ -309,13 +303,7 @@ impl ForkChoiceTest { .chain .fork_choice .write() - .on_block( - current_slot, - &block, - block.canonical_root(), - &state, - &self.harness.chain.spec, - ) + .on_block(current_slot, &block, block.canonical_root(), &state) .err() .expect("on_block did not return an error"); comparison_func(err); diff --git a/consensus/serde_utils/src/hex.rs b/consensus/serde_utils/src/hex.rs index 647b0ecfb..1e6c02427 100644 --- a/consensus/serde_utils/src/hex.rs +++ b/consensus/serde_utils/src/hex.rs @@ -6,6 +6,7 @@ use std::fmt; /// Encode `data` as a 0x-prefixed hex string. pub fn encode>(data: T) -> String { let hex = hex::encode(data); + let mut s = "0x".to_string(); s.push_str(hex.as_str()); s @@ -33,12 +34,7 @@ impl<'de> Visitor<'de> for PrefixedHexVisitor { where E: de::Error, { - if let Some(stripped) = value.strip_prefix("0x") { - Ok(hex::decode(stripped) - .map_err(|e| de::Error::custom(format!("invalid hex ({:?})", e)))?) - } else { - Err(de::Error::custom("missing 0x prefix")) - } + decode(value).map_err(de::Error::custom) } } diff --git a/consensus/serde_utils/src/hex_vec.rs b/consensus/serde_utils/src/hex_vec.rs new file mode 100644 index 000000000..60d649443 --- /dev/null +++ b/consensus/serde_utils/src/hex_vec.rs @@ -0,0 +1,23 @@ +//! Formats `Vec` as a 0x-prefixed hex string. +//! +//! E.g., `vec![0, 1, 2, 3]` serializes as `"0x00010203"`. + +use crate::hex::PrefixedHexVisitor; +use serde::{Deserializer, Serializer}; + +pub fn serialize(bytes: &[u8], serializer: S) -> Result +where + S: Serializer, +{ + let mut hex_string: String = "0x".to_string(); + hex_string.push_str(&hex::encode(&bytes)); + + serializer.serialize_str(&hex_string) +} + +pub fn deserialize<'de, D>(deserializer: D) -> Result, D::Error> +where + D: Deserializer<'de>, +{ + deserializer.deserialize_str(PrefixedHexVisitor) +} diff --git a/consensus/serde_utils/src/lib.rs b/consensus/serde_utils/src/lib.rs index 0016e67a3..541a86d89 100644 --- a/consensus/serde_utils/src/lib.rs +++ b/consensus/serde_utils/src/lib.rs @@ -2,8 +2,11 @@ mod quoted_int; pub mod bytes_4_hex; pub mod hex; +pub mod hex_vec; +pub mod list_of_bytes_lists; pub mod quoted_u64_vec; pub mod u32_hex; +pub mod u64_hex_be; pub mod u8_hex; pub use quoted_int::{quoted_u32, quoted_u64, quoted_u8}; diff --git a/consensus/serde_utils/src/list_of_bytes_lists.rs b/consensus/serde_utils/src/list_of_bytes_lists.rs new file mode 100644 index 000000000..b93321aa0 --- /dev/null +++ b/consensus/serde_utils/src/list_of_bytes_lists.rs @@ -0,0 +1,49 @@ +//! Formats `Vec` using quotes. +//! +//! E.g., `vec![0, 1, 2]` serializes as `["0", "1", "2"]`. +//! +//! Quotes can be optional during decoding. + +use crate::hex; +use serde::ser::SerializeSeq; +use serde::{de, Deserializer, Serializer}; + +pub struct ListOfBytesListVisitor; +impl<'a> serde::de::Visitor<'a> for ListOfBytesListVisitor { + type Value = Vec>; + + fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result { + write!(formatter, "a list of 0x-prefixed byte lists") + } + + fn visit_seq(self, mut seq: A) -> Result + where + A: serde::de::SeqAccess<'a>, + { + let mut vec = vec![]; + + while let Some(val) = seq.next_element::()? { + vec.push(hex::decode(&val).map_err(de::Error::custom)?); + } + + Ok(vec) + } +} + +pub fn serialize(value: &[Vec], serializer: S) -> Result +where + S: Serializer, +{ + let mut seq = serializer.serialize_seq(Some(value.len()))?; + for val in value { + seq.serialize_element(&hex::encode(val))?; + } + seq.end() +} + +pub fn deserialize<'de, D>(deserializer: D) -> Result>, D::Error> +where + D: Deserializer<'de>, +{ + deserializer.deserialize_any(ListOfBytesListVisitor) +} diff --git a/consensus/serde_utils/src/quoted_int.rs b/consensus/serde_utils/src/quoted_int.rs index 5c3fa0f0a..24edf1ebe 100644 --- a/consensus/serde_utils/src/quoted_int.rs +++ b/consensus/serde_utils/src/quoted_int.rs @@ -70,17 +70,6 @@ macro_rules! define_mod { pub value: T, } - /// Compositional wrapper type that allows quotes or no quotes. - #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Deserialize, Serialize)] - #[serde(transparent)] - pub struct MaybeQuoted - where - T: From<$int> + Into<$int> + Copy + TryFrom, - { - #[serde(with = "self")] - pub value: T, - } - /// Serialize with quotes. pub fn serialize(value: &T, serializer: S) -> Result where diff --git a/consensus/serde_utils/src/u64_hex_be.rs b/consensus/serde_utils/src/u64_hex_be.rs new file mode 100644 index 000000000..145292f8c --- /dev/null +++ b/consensus/serde_utils/src/u64_hex_be.rs @@ -0,0 +1,134 @@ +//! Formats `u64` as a 0x-prefixed, big-endian hex string. +//! +//! E.g., `0` serializes as `"0x0000000000000000"`. + +use serde::de::{self, Error, Visitor}; +use serde::{Deserializer, Serializer}; +use std::fmt; + +const BYTES_LEN: usize = 8; + +pub struct QuantityVisitor; +impl<'de> Visitor<'de> for QuantityVisitor { + type Value = Vec; + + fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result { + formatter.write_str("a hex string") + } + + fn visit_str(self, value: &str) -> Result + where + E: de::Error, + { + if !value.starts_with("0x") { + return Err(de::Error::custom("must start with 0x")); + } + + let stripped = value.trim_start_matches("0x"); + + if stripped.is_empty() { + Err(de::Error::custom(format!( + "quantity cannot be {}", + stripped + ))) + } else if stripped == "0" { + Ok(vec![0]) + } else if stripped.starts_with('0') { + Err(de::Error::custom("cannot have leading zero")) + } else if stripped.len() % 2 != 0 { + hex::decode(&format!("0{}", stripped)) + .map_err(|e| de::Error::custom(format!("invalid hex ({:?})", e))) + } else { + hex::decode(&stripped).map_err(|e| de::Error::custom(format!("invalid hex ({:?})", e))) + } + } +} + +pub fn serialize(num: &u64, serializer: S) -> Result +where + S: Serializer, +{ + let raw = hex::encode(num.to_be_bytes()); + let trimmed = raw.trim_start_matches('0'); + + let hex = if trimmed.is_empty() { "0" } else { &trimmed }; + + serializer.serialize_str(&format!("0x{}", &hex)) +} + +pub fn deserialize<'de, D>(deserializer: D) -> Result +where + D: Deserializer<'de>, +{ + let decoded = deserializer.deserialize_str(QuantityVisitor)?; + + // TODO: this is not strict about byte length like other methods. + if decoded.len() > BYTES_LEN { + return Err(D::Error::custom(format!( + "expected max {} bytes for array, got {}", + BYTES_LEN, + decoded.len() + ))); + } + + let mut array = [0; BYTES_LEN]; + array[BYTES_LEN - decoded.len()..].copy_from_slice(&decoded); + Ok(u64::from_be_bytes(array)) +} + +#[cfg(test)] +mod test { + use serde::{Deserialize, Serialize}; + use serde_json; + + #[derive(Debug, PartialEq, Serialize, Deserialize)] + #[serde(transparent)] + struct Wrapper { + #[serde(with = "super")] + val: u64, + } + + #[test] + fn encoding() { + assert_eq!( + &serde_json::to_string(&Wrapper { val: 0 }).unwrap(), + "\"0x0\"" + ); + assert_eq!( + &serde_json::to_string(&Wrapper { val: 1 }).unwrap(), + "\"0x1\"" + ); + assert_eq!( + &serde_json::to_string(&Wrapper { val: 256 }).unwrap(), + "\"0x100\"" + ); + assert_eq!( + &serde_json::to_string(&Wrapper { val: 65 }).unwrap(), + "\"0x41\"" + ); + assert_eq!( + &serde_json::to_string(&Wrapper { val: 1024 }).unwrap(), + "\"0x400\"" + ); + } + + #[test] + fn decoding() { + assert_eq!( + serde_json::from_str::("\"0x0\"").unwrap(), + Wrapper { val: 0 }, + ); + assert_eq!( + serde_json::from_str::("\"0x41\"").unwrap(), + Wrapper { val: 65 }, + ); + assert_eq!( + serde_json::from_str::("\"0x400\"").unwrap(), + Wrapper { val: 1024 }, + ); + serde_json::from_str::("\"0x\"").unwrap_err(); + serde_json::from_str::("\"0x0400\"").unwrap_err(); + serde_json::from_str::("\"400\"").unwrap_err(); + serde_json::from_str::("\"ff\"").unwrap_err(); + } +} diff --git a/consensus/ssz_types/src/serde_utils/hex_fixed_vec.rs b/consensus/ssz_types/src/serde_utils/hex_fixed_vec.rs index 0b1b73f01..86077891b 100644 --- a/consensus/ssz_types/src/serde_utils/hex_fixed_vec.rs +++ b/consensus/ssz_types/src/serde_utils/hex_fixed_vec.rs @@ -8,10 +8,7 @@ where S: Serializer, U: Unsigned, { - let mut hex_string: String = "0x".to_string(); - hex_string.push_str(&hex::encode(&bytes[..])); - - serializer.serialize_str(&hex_string) + serializer.serialize_str(&hex::encode(&bytes[..])) } pub fn deserialize<'de, D, U>(deserializer: D) -> Result, D::Error> diff --git a/consensus/ssz_types/src/serde_utils/hex_var_list.rs b/consensus/ssz_types/src/serde_utils/hex_var_list.rs index 3fc52951b..e3a3a14e0 100644 --- a/consensus/ssz_types/src/serde_utils/hex_var_list.rs +++ b/consensus/ssz_types/src/serde_utils/hex_var_list.rs @@ -9,10 +9,7 @@ where S: Serializer, N: Unsigned, { - let mut hex_string: String = "0x".to_string(); - hex_string.push_str(&hex::encode(&**bytes)); - - serializer.serialize_str(&hex_string) + serializer.serialize_str(&hex::encode(&**bytes)) } pub fn deserialize<'de, D, N>(deserializer: D) -> Result, D::Error> diff --git a/consensus/types/src/chain_spec.rs b/consensus/types/src/chain_spec.rs index 69fd38b81..25c73018d 100644 --- a/consensus/types/src/chain_spec.rs +++ b/consensus/types/src/chain_spec.rs @@ -131,6 +131,7 @@ pub struct ChainSpec { /// The Merge fork epoch is optional, with `None` representing "Merge never happens". pub merge_fork_epoch: Option, pub terminal_total_difficulty: Uint256, + pub terminal_block_hash: Hash256, /* * Networking @@ -483,6 +484,7 @@ impl ChainSpec { terminal_total_difficulty: Uint256::MAX .checked_sub(Uint256::from(2u64.pow(10))) .expect("calculation does not overflow"), + terminal_block_hash: Hash256::zero(), /* * Network specific diff --git a/consensus/types/src/lib.rs b/consensus/types/src/lib.rs index 9ccd52f7b..24f77fca7 100644 --- a/consensus/types/src/lib.rs +++ b/consensus/types/src/lib.rs @@ -113,7 +113,7 @@ pub use crate::deposit_message::DepositMessage; pub use crate::enr_fork_id::EnrForkId; pub use crate::eth1_data::Eth1Data; pub use crate::eth_spec::EthSpecId; -pub use crate::execution_payload::ExecutionPayload; +pub use crate::execution_payload::{ExecutionPayload, Transaction}; pub use crate::execution_payload_header::ExecutionPayloadHeader; pub use crate::fork::Fork; pub use crate::fork_context::ForkContext;