diff --git a/beacon_node/beacon_chain/src/capella_readiness.rs b/beacon_node/beacon_chain/src/capella_readiness.rs new file mode 100644 index 000000000..b15632105 --- /dev/null +++ b/beacon_node/beacon_chain/src/capella_readiness.rs @@ -0,0 +1,135 @@ +//! Provides tools for checking if a node is ready for the Capella upgrade and following merge +//! transition. + +use crate::{BeaconChain, BeaconChainTypes}; +use execution_layer::http::{ + ENGINE_FORKCHOICE_UPDATED_V2, ENGINE_GET_PAYLOAD_V2, ENGINE_NEW_PAYLOAD_V2, +}; +use serde::{Deserialize, Serialize}; +use std::fmt; +use std::time::Duration; +use types::*; + +/// The time before the Capella fork when we will start issuing warnings about preparation. +use super::merge_readiness::SECONDS_IN_A_WEEK; +pub const CAPELLA_READINESS_PREPARATION_SECONDS: u64 = SECONDS_IN_A_WEEK * 2; +pub const ENGINE_CAPABILITIES_REFRESH_INTERVAL: u64 = 300; +
#[derive(Debug, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +#[serde(tag = "type")] +pub enum CapellaReadiness { + /// The execution engine is capella-enabled (as far as we can tell) + Ready, + /// The EL can be reached and has the correct configuration, however it's not yet synced. + NotSynced, + /// We are connected to an execution engine which doesn't support the V2 engine api methods + V2MethodsNotSupported { error: String }, + /// The capabilities exchange with the EL failed, there might be a problem with + /// connectivity, authentication or a difference in configuration. 
+ ExchangeCapabilitiesFailed { error: String }, + /// The user has not configured an execution endpoint + NoExecutionEndpoint, +} + +impl fmt::Display for CapellaReadiness { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + CapellaReadiness::Ready => { + write!(f, "This node appears ready for Capella.") + } + CapellaReadiness::ExchangeCapabilitiesFailed { error } => write!( + f, + "Could not exchange capabilities with the \ + execution endpoint: {}", + error + ), + CapellaReadiness::NotSynced => write!( + f, + "The execution endpoint is connected and configured, \ + however it is not yet synced" + ), + CapellaReadiness::NoExecutionEndpoint => write!( + f, + "The --execution-endpoint flag is not specified, this is a \ + requirement post-merge" + ), + CapellaReadiness::V2MethodsNotSupported { error } => write!( + f, + "The execution endpoint does not appear to support \ + the required engine api methods for Capella: {}", + error + ), + } + } +} + +impl BeaconChain { + /// Returns `true` if capella epoch is set and Capella fork has occurred or will + /// occur within `CAPELLA_READINESS_PREPARATION_SECONDS` + pub fn is_time_to_prepare_for_capella(&self, current_slot: Slot) -> bool { + if let Some(capella_epoch) = self.spec.capella_fork_epoch { + let capella_slot = capella_epoch.start_slot(T::EthSpec::slots_per_epoch()); + let capella_readiness_preparation_slots = + CAPELLA_READINESS_PREPARATION_SECONDS / self.spec.seconds_per_slot; + // Return `true` if Capella has happened or is within the preparation time. + current_slot + capella_readiness_preparation_slots > capella_slot + } else { + // The Capella fork epoch has not been defined yet, no need to prepare. + false + } + } + + /// Attempts to connect to the EL and confirm that it is ready for capella. 
+ pub async fn check_capella_readiness(&self) -> CapellaReadiness { + if let Some(el) = self.execution_layer.as_ref() { + match el + .get_engine_capabilities(Some(Duration::from_secs( + ENGINE_CAPABILITIES_REFRESH_INTERVAL, + ))) + .await + { + Err(e) => { + // The EL was either unreachable or responded with an error + CapellaReadiness::ExchangeCapabilitiesFailed { + error: format!("{:?}", e), + } + } + Ok(capabilities) => { + let mut missing_methods = String::from("Required Methods Unsupported:"); + let mut all_good = true; + if !capabilities.get_payload_v2 { + missing_methods.push(' '); + missing_methods.push_str(ENGINE_GET_PAYLOAD_V2); + all_good = false; + } + if !capabilities.forkchoice_updated_v2 { + missing_methods.push(' '); + missing_methods.push_str(ENGINE_FORKCHOICE_UPDATED_V2); + all_good = false; + } + if !capabilities.new_payload_v2 { + missing_methods.push(' '); + missing_methods.push_str(ENGINE_NEW_PAYLOAD_V2); + all_good = false; + } + + if all_good { + if !el.is_synced_for_notifier().await { + // The EL is not synced. 
+ CapellaReadiness::NotSynced + } else { + CapellaReadiness::Ready + } + } else { + CapellaReadiness::V2MethodsNotSupported { + error: missing_methods, + } + } + } + } + } else { + CapellaReadiness::NoExecutionEndpoint + } + } +} diff --git a/beacon_node/beacon_chain/src/lib.rs b/beacon_node/beacon_chain/src/lib.rs index c17b48517..2444c144f 100644 --- a/beacon_node/beacon_chain/src/lib.rs +++ b/beacon_node/beacon_chain/src/lib.rs @@ -11,6 +11,7 @@ mod block_times_cache; mod block_verification; pub mod builder; pub mod canonical_head; +pub mod capella_readiness; pub mod chain_config; mod early_attester_cache; mod errors; diff --git a/beacon_node/beacon_chain/src/merge_readiness.rs b/beacon_node/beacon_chain/src/merge_readiness.rs index 4ef2102fd..c66df39ee 100644 --- a/beacon_node/beacon_chain/src/merge_readiness.rs +++ b/beacon_node/beacon_chain/src/merge_readiness.rs @@ -8,7 +8,7 @@ use std::fmt::Write; use types::*; /// The time before the Bellatrix fork when we will start issuing warnings about preparation. 
-const SECONDS_IN_A_WEEK: u64 = 604800; +pub const SECONDS_IN_A_WEEK: u64 = 604800; pub const MERGE_READINESS_PREPARATION_SECONDS: u64 = SECONDS_IN_A_WEEK * 2; #[derive(Default, Debug, Serialize, Deserialize)] diff --git a/beacon_node/beacon_chain/src/test_utils.rs b/beacon_node/beacon_chain/src/test_utils.rs index c5da51899..875ff845a 100644 --- a/beacon_node/beacon_chain/src/test_utils.rs +++ b/beacon_node/beacon_chain/src/test_utils.rs @@ -375,7 +375,6 @@ where .collect::>() .unwrap(); - let spec = MainnetEthSpec::default_spec(); let config = execution_layer::Config { execution_endpoints: urls, secret_files: vec![], @@ -386,7 +385,6 @@ where config, self.runtime.task_executor.clone(), self.log.clone(), - &spec, ) .unwrap(); diff --git a/beacon_node/client/src/builder.rs b/beacon_node/client/src/builder.rs index b19b636c7..3b016ebda 100644 --- a/beacon_node/client/src/builder.rs +++ b/beacon_node/client/src/builder.rs @@ -154,7 +154,6 @@ where config, context.executor.clone(), context.log().clone(), - &spec, ) .map_err(|e| format!("unable to start execution layer endpoints: {:?}", e))?; Some(execution_layer) diff --git a/beacon_node/client/src/notifier.rs b/beacon_node/client/src/notifier.rs index 1da7a7970..c1d830bc0 100644 --- a/beacon_node/client/src/notifier.rs +++ b/beacon_node/client/src/notifier.rs @@ -1,5 +1,6 @@ use crate::metrics; use beacon_chain::{ + capella_readiness::CapellaReadiness, merge_readiness::{MergeConfig, MergeReadiness}, BeaconChain, BeaconChainTypes, ExecutionStatus, }; @@ -313,6 +314,7 @@ pub fn spawn_notifier( eth1_logging(&beacon_chain, &log); merge_readiness_logging(current_slot, &beacon_chain, &log).await; + capella_readiness_logging(current_slot, &beacon_chain, &log).await; } }; @@ -350,12 +352,15 @@ async fn merge_readiness_logging( } if merge_completed && !has_execution_layer { - error!( - log, - "Execution endpoint required"; - "info" => "you need an execution engine to validate blocks, see: \ - 
https://lighthouse-book.sigmaprime.io/merge-migration.html" - ); + if !beacon_chain.is_time_to_prepare_for_capella(current_slot) { + // logging of the EE being offline is handled in `capella_readiness_logging()` + error!( + log, + "Execution endpoint required"; + "info" => "you need an execution engine to validate blocks, see: \ + https://lighthouse-book.sigmaprime.io/merge-migration.html" + ); + } return; } @@ -419,6 +424,60 @@ async fn merge_readiness_logging( } } +/// Provides some helpful logging to users to indicate if their node is ready for Capella +async fn capella_readiness_logging( + current_slot: Slot, + beacon_chain: &BeaconChain, + log: &Logger, +) { + let capella_completed = beacon_chain + .canonical_head + .cached_head() + .snapshot + .beacon_block + .message() + .body() + .execution_payload() + .map_or(false, |payload| payload.withdrawals_root().is_ok()); + + let has_execution_layer = beacon_chain.execution_layer.is_some(); + + if capella_completed && has_execution_layer + || !beacon_chain.is_time_to_prepare_for_capella(current_slot) + { + return; + } + + if capella_completed && !has_execution_layer { + error!( + log, + "Execution endpoint required"; + "info" => "you need a Capella enabled execution engine to validate blocks, see: \ + https://lighthouse-book.sigmaprime.io/merge-migration.html" + ); + return; + } + + match beacon_chain.check_capella_readiness().await { + CapellaReadiness::Ready => { + info!(log, "Ready for Capella") + } + readiness @ CapellaReadiness::ExchangeCapabilitiesFailed { error: _ } => { + error!( + log, + "Not ready for Capella"; + "info" => %readiness, + "hint" => "try updating Lighthouse and/or the execution layer", + ) + } + readiness => warn!( + log, + "Not ready for Capella"; + "info" => %readiness, + ), + } +} + fn eth1_logging(beacon_chain: &BeaconChain, log: &Logger) { let current_slot_opt = beacon_chain.slot().ok(); diff --git a/beacon_node/eth1/src/inner.rs b/beacon_node/eth1/src/inner.rs index a44b31050..0468a02d2 
100644 --- a/beacon_node/eth1/src/inner.rs +++ b/beacon_node/eth1/src/inner.rs @@ -122,7 +122,7 @@ impl SszEth1Cache { cache: self.deposit_cache.to_deposit_cache()?, last_processed_block: self.last_processed_block, }), - endpoint: endpoint_from_config(&config, &spec) + endpoint: endpoint_from_config(&config) .map_err(|e| format!("Failed to create endpoint: {:?}", e))?, to_finalize: RwLock::new(None), // Set the remote head_block zero when creating a new instance. We only care about diff --git a/beacon_node/eth1/src/service.rs b/beacon_node/eth1/src/service.rs index 56c2411ba..31082394b 100644 --- a/beacon_node/eth1/src/service.rs +++ b/beacon_node/eth1/src/service.rs @@ -363,7 +363,7 @@ impl Default for Config { } } -pub fn endpoint_from_config(config: &Config, spec: &ChainSpec) -> Result { +pub fn endpoint_from_config(config: &Config) -> Result { match config.endpoint.clone() { Eth1Endpoint::Auth { endpoint, @@ -373,16 +373,11 @@ pub fn endpoint_from_config(config: &Config, spec: &ChainSpec) -> Result { let auth = Auth::new_with_path(jwt_path, jwt_id, jwt_version) .map_err(|e| format!("Failed to initialize jwt auth: {:?}", e))?; - HttpJsonRpc::new_with_auth( - endpoint, - auth, - Some(config.execution_timeout_multiplier), - spec, - ) - .map_err(|e| format!("Failed to create eth1 json rpc client: {:?}", e)) + HttpJsonRpc::new_with_auth(endpoint, auth, Some(config.execution_timeout_multiplier)) + .map_err(|e| format!("Failed to create eth1 json rpc client: {:?}", e)) } Eth1Endpoint::NoAuth(endpoint) => { - HttpJsonRpc::new(endpoint, Some(config.execution_timeout_multiplier), spec) + HttpJsonRpc::new(endpoint, Some(config.execution_timeout_multiplier)) .map_err(|e| format!("Failed to create eth1 json rpc client: {:?}", e)) } } @@ -409,7 +404,7 @@ impl Service { deposit_cache: RwLock::new(DepositUpdater::new( config.deposit_contract_deploy_block, )), - endpoint: endpoint_from_config(&config, &spec)?, + endpoint: endpoint_from_config(&config)?, to_finalize: 
RwLock::new(None), remote_head_block: RwLock::new(None), config: RwLock::new(config), @@ -438,7 +433,7 @@ impl Service { inner: Arc::new(Inner { block_cache: <_>::default(), deposit_cache: RwLock::new(deposit_cache), - endpoint: endpoint_from_config(&config, &spec) + endpoint: endpoint_from_config(&config) .map_err(Error::FailedToInitializeFromSnapshot)?, to_finalize: RwLock::new(None), remote_head_block: RwLock::new(None), diff --git a/beacon_node/eth1/tests/test.rs b/beacon_node/eth1/tests/test.rs index eb0d2371c..cd680478c 100644 --- a/beacon_node/eth1/tests/test.rs +++ b/beacon_node/eth1/tests/test.rs @@ -494,8 +494,7 @@ mod deposit_tree { let mut deposit_counts = vec![]; let client = - HttpJsonRpc::new(SensitiveUrl::parse(ð1.endpoint()).unwrap(), None, spec) - .unwrap(); + HttpJsonRpc::new(SensitiveUrl::parse(ð1.endpoint()).unwrap(), None).unwrap(); // Perform deposits to the smart contract, recording it's state along the way. for deposit in &deposits { @@ -599,12 +598,8 @@ mod http { .expect("should start eth1 environment"); let deposit_contract = ð1.deposit_contract; let web3 = eth1.web3(); - let client = HttpJsonRpc::new( - SensitiveUrl::parse(ð1.endpoint()).unwrap(), - None, - &MainnetEthSpec::default_spec(), - ) - .unwrap(); + let client = + HttpJsonRpc::new(SensitiveUrl::parse(ð1.endpoint()).unwrap(), None).unwrap(); let block_number = get_block_number(&web3).await; let logs = blocking_deposit_logs(&client, ð1, 0..block_number).await; @@ -720,8 +715,7 @@ mod fast { ) .unwrap(); let client = - HttpJsonRpc::new(SensitiveUrl::parse(ð1.endpoint()).unwrap(), None, &spec) - .unwrap(); + HttpJsonRpc::new(SensitiveUrl::parse(ð1.endpoint()).unwrap(), None).unwrap(); let n = 10; let deposits: Vec<_> = (0..n).map(|_| random_deposit_data()).collect(); for deposit in &deposits { diff --git a/beacon_node/execution_layer/src/engine_api.rs b/beacon_node/execution_layer/src/engine_api.rs index afc5cffe2..da5e991b0 100644 --- 
a/beacon_node/execution_layer/src/engine_api.rs +++ b/beacon_node/execution_layer/src/engine_api.rs @@ -1,4 +1,9 @@ use crate::engines::ForkchoiceState; +use crate::http::{ + ENGINE_EXCHANGE_TRANSITION_CONFIGURATION_V1, ENGINE_FORKCHOICE_UPDATED_V1, + ENGINE_FORKCHOICE_UPDATED_V2, ENGINE_GET_PAYLOAD_V1, ENGINE_GET_PAYLOAD_V2, + ENGINE_NEW_PAYLOAD_V1, ENGINE_NEW_PAYLOAD_V2, +}; pub use ethers_core::types::Transaction; use ethers_core::utils::rlp::{self, Decodable, Rlp}; use http::deposit_methods::RpcError; @@ -347,11 +352,8 @@ impl GetPayloadResponse { } } -// This name is work in progress, it could -// change when this method is actually proposed -// but I'm writing this as it has been described #[derive(Clone, Copy, Debug)] -pub struct SupportedApis { +pub struct EngineCapabilities { pub new_payload_v1: bool, pub new_payload_v2: bool, pub forkchoice_updated_v1: bool, @@ -360,3 +362,32 @@ pub struct SupportedApis { pub get_payload_v2: bool, pub exchange_transition_configuration_v1: bool, } + +impl EngineCapabilities { + pub fn to_response(&self) -> Vec<&str> { + let mut response = Vec::new(); + if self.new_payload_v1 { + response.push(ENGINE_NEW_PAYLOAD_V1); + } + if self.new_payload_v2 { + response.push(ENGINE_NEW_PAYLOAD_V2); + } + if self.forkchoice_updated_v1 { + response.push(ENGINE_FORKCHOICE_UPDATED_V1); + } + if self.forkchoice_updated_v2 { + response.push(ENGINE_FORKCHOICE_UPDATED_V2); + } + if self.get_payload_v1 { + response.push(ENGINE_GET_PAYLOAD_V1); + } + if self.get_payload_v2 { + response.push(ENGINE_GET_PAYLOAD_V2); + } + if self.exchange_transition_configuration_v1 { + response.push(ENGINE_EXCHANGE_TRANSITION_CONFIGURATION_V1); + } + + response + } +} diff --git a/beacon_node/execution_layer/src/engine_api/http.rs b/beacon_node/execution_layer/src/engine_api/http.rs index 60725192b..d1faab42c 100644 --- a/beacon_node/execution_layer/src/engine_api/http.rs +++ b/beacon_node/execution_layer/src/engine_api/http.rs @@ -7,10 +7,11 @@ use 
reqwest::header::CONTENT_TYPE; use sensitive_url::SensitiveUrl; use serde::de::DeserializeOwned; use serde_json::json; -use tokio::sync::RwLock; +use std::collections::HashSet; +use tokio::sync::Mutex; -use std::time::Duration; -use types::{ChainSpec, EthSpec}; +use std::time::{Duration, SystemTime}; +use types::EthSpec; pub use deposit_log::{DepositLog, Log}; pub use reqwest::Client; @@ -48,8 +49,37 @@ pub const ENGINE_EXCHANGE_TRANSITION_CONFIGURATION_V1: &str = "engine_exchangeTransitionConfigurationV1"; pub const ENGINE_EXCHANGE_TRANSITION_CONFIGURATION_V1_TIMEOUT: Duration = Duration::from_secs(1); +pub const ENGINE_EXCHANGE_CAPABILITIES: &str = "engine_exchangeCapabilities"; +pub const ENGINE_EXCHANGE_CAPABILITIES_TIMEOUT: Duration = Duration::from_secs(1); + /// This error is returned during a `chainId` call by Geth. pub const EIP155_ERROR_STR: &str = "chain not synced beyond EIP-155 replay-protection fork block"; +/// This code is returned by all clients when a method is not supported +/// (verified geth, nethermind, erigon, besu) +pub const METHOD_NOT_FOUND_CODE: i64 = -32601; + +pub static LIGHTHOUSE_CAPABILITIES: &[&str] = &[ + ENGINE_NEW_PAYLOAD_V1, + ENGINE_NEW_PAYLOAD_V2, + ENGINE_GET_PAYLOAD_V1, + ENGINE_GET_PAYLOAD_V2, + ENGINE_FORKCHOICE_UPDATED_V1, + ENGINE_FORKCHOICE_UPDATED_V2, + ENGINE_EXCHANGE_TRANSITION_CONFIGURATION_V1, +]; + +/// This is necessary because a user might run a capella-enabled version of +/// lighthouse before they update to a capella-enabled execution engine. +// TODO (mark): rip this out once we are post-capella on mainnet +pub static PRE_CAPELLA_ENGINE_CAPABILITIES: EngineCapabilities = EngineCapabilities { + new_payload_v1: true, + new_payload_v2: false, + forkchoice_updated_v1: true, + forkchoice_updated_v2: false, + get_payload_v1: true, + get_payload_v2: false, + exchange_transition_configuration_v1: true, +}; /// Contains methods to convert arbitrary bytes to an ETH2 deposit contract object. 
pub mod deposit_log { @@ -526,11 +556,47 @@ pub mod deposit_methods { } } +#[derive(Clone, Debug)] +pub struct CapabilitiesCacheEntry { + engine_capabilities: EngineCapabilities, + fetch_time: SystemTime, +} + +impl CapabilitiesCacheEntry { + pub fn new(engine_capabilities: EngineCapabilities) -> Self { + Self { + engine_capabilities, + fetch_time: SystemTime::now(), + } + } + + pub fn engine_capabilities(&self) -> &EngineCapabilities { + &self.engine_capabilities + } + + pub fn age(&self) -> Duration { + // duration_since() may fail because measurements taken earlier + // are not guaranteed to always be before later measurements + // due to anomalies such as the system clock being adjusted + // either forwards or backwards + // + // In such cases, we'll just say the age is zero + SystemTime::now() + .duration_since(self.fetch_time) + .unwrap_or(Duration::ZERO) + } + + /// returns `true` if the entry's age is >= age_limit + pub fn older_than(&self, age_limit: Option) -> bool { + age_limit.map_or(false, |limit| self.age() >= limit) + } +} + pub struct HttpJsonRpc { pub client: Client, pub url: SensitiveUrl, pub execution_timeout_multiplier: u32, - pub cached_supported_apis: RwLock>, + pub engine_capabilities_cache: Mutex>, auth: Option, } @@ -538,27 +604,12 @@ impl HttpJsonRpc { pub fn new( url: SensitiveUrl, execution_timeout_multiplier: Option, - spec: &ChainSpec, ) -> Result { - // FIXME: remove this `cached_supported_apis` spec hack once the `engine_getCapabilities` - // method is implemented in all execution clients: - // https://github.com/ethereum/execution-apis/issues/321 - let cached_supported_apis = RwLock::new(Some(SupportedApis { - new_payload_v1: true, - new_payload_v2: spec.capella_fork_epoch.is_some() || spec.eip4844_fork_epoch.is_some(), - forkchoice_updated_v1: true, - forkchoice_updated_v2: spec.capella_fork_epoch.is_some() - || spec.eip4844_fork_epoch.is_some(), - get_payload_v1: true, - get_payload_v2: spec.capella_fork_epoch.is_some() || 
spec.eip4844_fork_epoch.is_some(), - exchange_transition_configuration_v1: true, - })); - Ok(Self { client: Client::builder().build()?, url, execution_timeout_multiplier: execution_timeout_multiplier.unwrap_or(1), - cached_supported_apis, + engine_capabilities_cache: Mutex::new(None), auth: None, }) } @@ -567,27 +618,12 @@ impl HttpJsonRpc { url: SensitiveUrl, auth: Auth, execution_timeout_multiplier: Option, - spec: &ChainSpec, ) -> Result { - // FIXME: remove this `cached_supported_apis` spec hack once the `engine_getCapabilities` - // method is implemented in all execution clients: - // https://github.com/ethereum/execution-apis/issues/321 - let cached_supported_apis = RwLock::new(Some(SupportedApis { - new_payload_v1: true, - new_payload_v2: spec.capella_fork_epoch.is_some() || spec.eip4844_fork_epoch.is_some(), - forkchoice_updated_v1: true, - forkchoice_updated_v2: spec.capella_fork_epoch.is_some() - || spec.eip4844_fork_epoch.is_some(), - get_payload_v1: true, - get_payload_v2: spec.capella_fork_epoch.is_some() || spec.eip4844_fork_epoch.is_some(), - exchange_transition_configuration_v1: true, - })); - Ok(Self { client: Client::builder().build()?, url, execution_timeout_multiplier: execution_timeout_multiplier.unwrap_or(1), - cached_supported_apis, + engine_capabilities_cache: Mutex::new(None), auth: Some(auth), }) } @@ -893,35 +929,67 @@ impl HttpJsonRpc { Ok(response) } - // TODO: This is currently a stub for the `engine_getCapabilities` - // method. 
This stub is unused because we set cached_supported_apis - // in the constructor based on the `spec` - // Implement this once the execution clients support it - // https://github.com/ethereum/execution-apis/issues/321 - pub async fn get_capabilities(&self) -> Result { - Ok(SupportedApis { - new_payload_v1: true, - new_payload_v2: true, - forkchoice_updated_v1: true, - forkchoice_updated_v2: true, - get_payload_v1: true, - get_payload_v2: true, - exchange_transition_configuration_v1: true, - }) + pub async fn exchange_capabilities(&self) -> Result { + let params = json!([LIGHTHOUSE_CAPABILITIES]); + + let response: Result, _> = self + .rpc_request( + ENGINE_EXCHANGE_CAPABILITIES, + params, + ENGINE_EXCHANGE_CAPABILITIES_TIMEOUT * self.execution_timeout_multiplier, + ) + .await; + + match response { + // TODO (mark): rip this out once we are post capella on mainnet + Err(error) => match error { + Error::ServerMessage { code, message: _ } if code == METHOD_NOT_FOUND_CODE => { + Ok(PRE_CAPELLA_ENGINE_CAPABILITIES) + } + _ => Err(error), + }, + Ok(capabilities) => Ok(EngineCapabilities { + new_payload_v1: capabilities.contains(ENGINE_NEW_PAYLOAD_V1), + new_payload_v2: capabilities.contains(ENGINE_NEW_PAYLOAD_V2), + forkchoice_updated_v1: capabilities.contains(ENGINE_FORKCHOICE_UPDATED_V1), + forkchoice_updated_v2: capabilities.contains(ENGINE_FORKCHOICE_UPDATED_V2), + get_payload_v1: capabilities.contains(ENGINE_GET_PAYLOAD_V1), + get_payload_v2: capabilities.contains(ENGINE_GET_PAYLOAD_V2), + exchange_transition_configuration_v1: capabilities + .contains(ENGINE_EXCHANGE_TRANSITION_CONFIGURATION_V1), + }), + } } - pub async fn set_cached_supported_apis(&self, supported_apis: Option) { - *self.cached_supported_apis.write().await = supported_apis; + pub async fn clear_exchange_capabilties_cache(&self) { + *self.engine_capabilities_cache.lock().await = None; } - pub async fn get_cached_supported_apis(&self) -> Result { - let cached_opt = 
*self.cached_supported_apis.read().await; - if let Some(supported_apis) = cached_opt { - Ok(supported_apis) + /// Returns the execution engine capabilities resulting from a call to + /// engine_exchangeCapabilities. If the capabilities cache is not populated, + /// or if it is populated with a cached result of age >= `age_limit`, this + /// method will fetch the result from the execution engine and populate the + /// cache before returning it. Otherwise it will return a cached result from + /// a previous call. + /// + /// Set `age_limit` to `None` to always return the cached result + /// Set `age_limit` to `Some(Duration::ZERO)` to force fetching from EE + pub async fn get_engine_capabilities( + &self, + age_limit: Option, + ) -> Result { + let mut lock = self.engine_capabilities_cache.lock().await; + + if lock + .as_ref() + .map_or(true, |entry| entry.older_than(age_limit)) + { + let engine_capabilities = self.exchange_capabilities().await?; + *lock = Some(CapabilitiesCacheEntry::new(engine_capabilities)); + Ok(engine_capabilities) } else { - let supported_apis = self.get_capabilities().await?; - self.set_cached_supported_apis(Some(supported_apis)).await; - Ok(supported_apis) + // here entry is guaranteed to exist so unwrap() is safe + Ok(*lock.as_ref().unwrap().engine_capabilities()) } } @@ -931,10 +999,10 @@ impl HttpJsonRpc { &self, execution_payload: ExecutionPayload, ) -> Result { - let supported_apis = self.get_cached_supported_apis().await?; - if supported_apis.new_payload_v2 { + let engine_capabilities = self.get_engine_capabilities(None).await?; + if engine_capabilities.new_payload_v2 { self.new_payload_v2(execution_payload).await - } else if supported_apis.new_payload_v1 { + } else if engine_capabilities.new_payload_v1 { self.new_payload_v1(execution_payload).await } else { Err(Error::RequiredMethodUnsupported("engine_newPayload")) @@ -948,8 +1016,8 @@ impl HttpJsonRpc { fork_name: ForkName, payload_id: PayloadId, ) -> Result, Error> { - let 
supported_apis = self.get_cached_supported_apis().await?; - if supported_apis.get_payload_v2 { + let engine_capabilities = self.get_engine_capabilities(None).await?; + if engine_capabilities.get_payload_v2 { // TODO: modify this method to return GetPayloadResponse instead // of throwing away the `block_value` and returning only the // ExecutionPayload @@ -957,7 +1025,7 @@ impl HttpJsonRpc { .get_payload_v2(fork_name, payload_id) .await? .execution_payload()) - } else if supported_apis.new_payload_v1 { + } else if engine_capabilities.new_payload_v1 { self.get_payload_v1(payload_id).await } else { Err(Error::RequiredMethodUnsupported("engine_getPayload")) @@ -971,11 +1039,11 @@ impl HttpJsonRpc { forkchoice_state: ForkchoiceState, payload_attributes: Option, ) -> Result { - let supported_apis = self.get_cached_supported_apis().await?; - if supported_apis.forkchoice_updated_v2 { + let engine_capabilities = self.get_engine_capabilities(None).await?; + if engine_capabilities.forkchoice_updated_v2 { self.forkchoice_updated_v2(forkchoice_state, payload_attributes) .await - } else if supported_apis.forkchoice_updated_v1 { + } else if engine_capabilities.forkchoice_updated_v1 { self.forkchoice_updated_v1(forkchoice_state, payload_attributes) .await } else { @@ -1003,7 +1071,6 @@ mod test { impl Tester { pub fn new(with_auth: bool) -> Self { let server = MockServer::unit_testing(); - let spec = MainnetEthSpec::default_spec(); let rpc_url = SensitiveUrl::parse(&server.url()).unwrap(); let echo_url = SensitiveUrl::parse(&format!("{}/echo", server.url())).unwrap(); @@ -1014,13 +1081,13 @@ mod test { let echo_auth = Auth::new(JwtKey::from_slice(&DEFAULT_JWT_SECRET).unwrap(), None, None); ( - Arc::new(HttpJsonRpc::new_with_auth(rpc_url, rpc_auth, None, &spec).unwrap()), - Arc::new(HttpJsonRpc::new_with_auth(echo_url, echo_auth, None, &spec).unwrap()), + Arc::new(HttpJsonRpc::new_with_auth(rpc_url, rpc_auth, None).unwrap()), + Arc::new(HttpJsonRpc::new_with_auth(echo_url, 
echo_auth, None).unwrap()), ) } else { ( - Arc::new(HttpJsonRpc::new(rpc_url, None, &spec).unwrap()), - Arc::new(HttpJsonRpc::new(echo_url, None, &spec).unwrap()), + Arc::new(HttpJsonRpc::new(rpc_url, None).unwrap()), + Arc::new(HttpJsonRpc::new(echo_url, None).unwrap()), ) }; diff --git a/beacon_node/execution_layer/src/engines.rs b/beacon_node/execution_layer/src/engines.rs index 271cca26c..5532fbb34 100644 --- a/beacon_node/execution_layer/src/engines.rs +++ b/beacon_node/execution_layer/src/engines.rs @@ -1,13 +1,15 @@ //! Provides generic behaviour for multiple execution engines, specifically fallback behaviour. use crate::engine_api::{ - Error as EngineApiError, ForkchoiceUpdatedResponse, PayloadAttributes, PayloadId, + EngineCapabilities, Error as EngineApiError, ForkchoiceUpdatedResponse, PayloadAttributes, + PayloadId, }; use crate::HttpJsonRpc; use lru::LruCache; -use slog::{debug, error, info, Logger}; +use slog::{debug, error, info, warn, Logger}; use std::future::Future; use std::sync::Arc; +use std::time::Duration; use task_executor::TaskExecutor; use tokio::sync::{watch, Mutex, RwLock}; use tokio_stream::wrappers::WatchStream; @@ -18,6 +20,7 @@ use types::ExecutionBlockHash; /// Since the size of each value is small (~100 bytes) a large number is used for safety. /// FIXME: check this assumption now that the key includes entire payload attributes which now includes withdrawals const PAYLOAD_ID_LRU_CACHE_SIZE: usize = 512; +const CACHED_ENGINE_CAPABILITIES_AGE_LIMIT: Duration = Duration::from_secs(900); // 15 minutes /// Stores the remembered state of a engine. #[derive(Copy, Clone, PartialEq, Debug, Eq, Default)] @@ -29,6 +32,14 @@ enum EngineStateInternal { AuthFailed, } +#[derive(Copy, Clone, Debug, Default, Eq, PartialEq)] +enum CapabilitiesCacheAction { + #[default] + None, + Update, + Clear, +} + /// A subset of the engine state to inform other services if the engine is online or offline. 
#[derive(Debug, Clone, PartialEq, Eq, Copy)] pub enum EngineState { @@ -231,7 +242,7 @@ impl Engine { /// Run the `EngineApi::upcheck` function if the node's last known state is not synced. This /// might be used to recover the node if offline. pub async fn upcheck(&self) { - let state: EngineStateInternal = match self.api.upcheck().await { + let (state, cache_action) = match self.api.upcheck().await { Ok(()) => { let mut state = self.state.write().await; if **state != EngineStateInternal::Synced { @@ -249,12 +260,12 @@ impl Engine { ); } state.update(EngineStateInternal::Synced); - **state + (**state, CapabilitiesCacheAction::Update) } Err(EngineApiError::IsSyncing) => { let mut state = self.state.write().await; state.update(EngineStateInternal::Syncing); - **state + (**state, CapabilitiesCacheAction::Update) } Err(EngineApiError::Auth(err)) => { error!( @@ -265,7 +276,7 @@ impl Engine { let mut state = self.state.write().await; state.update(EngineStateInternal::AuthFailed); - **state + (**state, CapabilitiesCacheAction::None) } Err(e) => { error!( @@ -276,10 +287,30 @@ impl Engine { let mut state = self.state.write().await; state.update(EngineStateInternal::Offline); - **state + // need to clear the engine capabilities cache if we detect the + // execution engine is offline as it is likely the engine is being + // updated to a newer version with new capabilities + (**state, CapabilitiesCacheAction::Clear) } }; + // do this after dropping state lock guard to avoid holding two locks at once + match cache_action { + CapabilitiesCacheAction::None => {} + CapabilitiesCacheAction::Update => { + if let Err(e) = self + .get_engine_capabilities(Some(CACHED_ENGINE_CAPABILITIES_AGE_LIMIT)) + .await + { + warn!(self.log, + "Error during exchange capabilities"; + "error" => ?e, + ) + } + } + CapabilitiesCacheAction::Clear => self.api.clear_exchange_capabilties_cache().await, + } + debug!( self.log, "Execution engine upcheck complete"; @@ -287,6 +318,22 @@ impl Engine { ); } + 
/// Returns the execution engine capabilities resulting from a call to + /// engine_exchangeCapabilities. If the capabilities cache is not populated, + /// or if it is populated with a cached result of age >= `age_limit`, this + /// method will fetch the result from the execution engine and populate the + /// cache before returning it. Otherwise it will return a cached result from + /// a previous call. + /// + /// Set `age_limit` to `None` to always return the cached result + /// Set `age_limit` to `Some(Duration::ZERO)` to force fetching from EE + pub async fn get_engine_capabilities( + &self, + age_limit: Option<Duration>, + ) -> Result<EngineCapabilities, EngineApiError> { + self.api.get_engine_capabilities(age_limit).await + } + /// Run `func` on the node regardless of the node's current state. /// /// ## Note diff --git a/beacon_node/execution_layer/src/lib.rs b/beacon_node/execution_layer/src/lib.rs index 0a1a1eef3..ad72453f1 100644 --- a/beacon_node/execution_layer/src/lib.rs +++ b/beacon_node/execution_layer/src/lib.rs @@ -7,6 +7,7 @@ use crate::payload_cache::PayloadCache; use auth::{strip_prefix, Auth, JwtKey}; use builder_client::BuilderHttpClient; +pub use engine_api::EngineCapabilities; use engine_api::Error as ApiError; pub use engine_api::*; pub use engine_api::{http, http::deposit_methods, http::HttpJsonRpc}; @@ -265,12 +266,7 @@ pub struct ExecutionLayer<T: EthSpec> { impl<T: EthSpec> ExecutionLayer<T> { /// Instantiate `Self` with an Execution engine specified in `Config`, using JSON-RPC via HTTP. 
- pub fn from_config( - config: Config, - executor: TaskExecutor, - log: Logger, - spec: &ChainSpec, - ) -> Result<Self, Error> { + pub fn from_config(config: Config, executor: TaskExecutor, log: Logger) -> Result<Self, Error> { let Config { execution_endpoints: urls, builder_url, @@ -325,9 +321,8 @@ impl<T: EthSpec> ExecutionLayer<T> { let engine: Engine = { let auth = Auth::new(jwt_key, jwt_id, jwt_version); debug!(log, "Loaded execution endpoint"; "endpoint" => %execution_url, "jwt_path" => ?secret_file.as_path()); - let api = - HttpJsonRpc::new_with_auth(execution_url, auth, execution_timeout_multiplier, spec) - .map_err(Error::ApiError)?; + let api = HttpJsonRpc::new_with_auth(execution_url, auth, execution_timeout_multiplier) + .map_err(Error::ApiError)?; Engine::new(api, executor.clone(), &log) }; @@ -1367,6 +1362,26 @@ impl<T: EthSpec> ExecutionLayer<T> { } } + /// Returns the execution engine capabilities resulting from a call to + /// engine_exchangeCapabilities. If the capabilities cache is not populated, + /// or if it is populated with a cached result of age >= `age_limit`, this + /// method will fetch the result from the execution engine and populate the + /// cache before returning it. Otherwise it will return a cached result from + /// a previous call. + /// + /// Set `age_limit` to `None` to always return the cached result + /// Set `age_limit` to `Some(Duration::ZERO)` to force fetching from EE + pub async fn get_engine_capabilities( + &self, + age_limit: Option<Duration>, + ) -> Result<EngineCapabilities, Error> { + self.engine() + .request(|engine| engine.get_engine_capabilities(age_limit)) + .await + .map_err(Box::new) + .map_err(Error::EngineError) + } + /// Used during block production to determine if the merge has been triggered. 
/// /// ## Specification diff --git a/beacon_node/execution_layer/src/test_utils/handle_rpc.rs b/beacon_node/execution_layer/src/test_utils/handle_rpc.rs index 1e0963649..31a8a5da1 100644 --- a/beacon_node/execution_layer/src/test_utils/handle_rpc.rs +++ b/beacon_node/execution_layer/src/test_utils/handle_rpc.rs @@ -6,20 +6,27 @@ use serde_json::Value as JsonValue; use std::sync::Arc; use types::{EthSpec, ForkName}; +pub const GENERIC_ERROR_CODE: i64 = -1234; +pub const BAD_PARAMS_ERROR_CODE: i64 = -32602; +pub const UNKNOWN_PAYLOAD_ERROR_CODE: i64 = -38001; +pub const FORK_REQUEST_MISMATCH_ERROR_CODE: i64 = -32000; + pub async fn handle_rpc( body: JsonValue, ctx: Arc>, -) -> Result { +) -> Result { *ctx.previous_request.lock() = Some(body.clone()); let method = body .get("method") .and_then(JsonValue::as_str) - .ok_or_else(|| "missing/invalid method field".to_string())?; + .ok_or_else(|| "missing/invalid method field".to_string()) + .map_err(|s| (s, GENERIC_ERROR_CODE))?; let params = body .get("params") - .ok_or_else(|| "missing/invalid params field".to_string())?; + .ok_or_else(|| "missing/invalid params field".to_string()) + .map_err(|s| (s, GENERIC_ERROR_CODE))?; match method { ETH_SYNCING => Ok(JsonValue::Bool(false)), @@ -27,7 +34,8 @@ pub async fn handle_rpc( let tag = params .get(0) .and_then(JsonValue::as_str) - .ok_or_else(|| "missing/invalid params[0] value".to_string())?; + .ok_or_else(|| "missing/invalid params[0] value".to_string()) + .map_err(|s| (s, BAD_PARAMS_ERROR_CODE))?; match tag { "latest" => Ok(serde_json::to_value( @@ -36,7 +44,10 @@ pub async fn handle_rpc( .latest_execution_block(), ) .unwrap()), - other => Err(format!("The tag {} is not supported", other)), + other => Err(( + format!("The tag {} is not supported", other), + BAD_PARAMS_ERROR_CODE, + )), } } ETH_GET_BLOCK_BY_HASH => { @@ -47,7 +58,8 @@ pub async fn handle_rpc( .and_then(|s| { s.parse() .map_err(|e| format!("unable to parse hash: {:?}", e)) - })?; + }) + .map_err(|s| (s, 
BAD_PARAMS_ERROR_CODE))?; // If we have a static response set, just return that. if let Some(response) = *ctx.static_get_block_by_hash_response.lock() { @@ -57,7 +69,8 @@ pub async fn handle_rpc( let full_tx = params .get(1) .and_then(JsonValue::as_bool) - .ok_or_else(|| "missing/invalid params[1] value".to_string())?; + .ok_or_else(|| "missing/invalid params[1] value".to_string()) + .map_err(|s| (s, BAD_PARAMS_ERROR_CODE))?; if full_tx { Ok(serde_json::to_value( ctx.execution_block_generator @@ -76,15 +89,17 @@ pub async fn handle_rpc( } ENGINE_NEW_PAYLOAD_V1 | ENGINE_NEW_PAYLOAD_V2 => { let request = match method { - ENGINE_NEW_PAYLOAD_V1 => { - JsonExecutionPayload::V1(get_param::>(params, 0)?) - } + ENGINE_NEW_PAYLOAD_V1 => JsonExecutionPayload::V1( + get_param::>(params, 0) + .map_err(|s| (s, BAD_PARAMS_ERROR_CODE))?, + ), ENGINE_NEW_PAYLOAD_V2 => get_param::>(params, 0) .map(|jep| JsonExecutionPayload::V2(jep)) .or_else(|_| { get_param::>(params, 0) .map(|jep| JsonExecutionPayload::V1(jep)) - })?, + }) + .map_err(|s| (s, BAD_PARAMS_ERROR_CODE))?, // TODO(4844) add that here.. 
_ => unreachable!(), }; @@ -97,20 +112,29 @@ pub async fn handle_rpc( match fork { ForkName::Merge => { if matches!(request, JsonExecutionPayload::V2(_)) { - return Err(format!( - "{} called with `ExecutionPayloadV2` before capella fork!", - method + return Err(( + format!( + "{} called with `ExecutionPayloadV2` before Capella fork!", + method + ), + GENERIC_ERROR_CODE, )); } } ForkName::Capella => { if method == ENGINE_NEW_PAYLOAD_V1 { - return Err(format!("{} called after capella fork!", method)); + return Err(( + format!("{} called after Capella fork!", method), + GENERIC_ERROR_CODE, + )); } if matches!(request, JsonExecutionPayload::V1(_)) { - return Err(format!( - "{} called with `ExecutionPayloadV1` after capella fork!", - method + return Err(( + format!( + "{} called with `ExecutionPayloadV1` after Capella fork!", + method + ), + GENERIC_ERROR_CODE, )); } } @@ -149,14 +173,20 @@ pub async fn handle_rpc( Ok(serde_json::to_value(JsonPayloadStatusV1::from(response)).unwrap()) } ENGINE_GET_PAYLOAD_V1 | ENGINE_GET_PAYLOAD_V2 => { - let request: JsonPayloadIdRequest = get_param(params, 0)?; + let request: JsonPayloadIdRequest = + get_param(params, 0).map_err(|s| (s, BAD_PARAMS_ERROR_CODE))?; let id = request.into(); let response = ctx .execution_block_generator .write() .get_payload(&id) - .ok_or_else(|| format!("no payload for id {:?}", id))?; + .ok_or_else(|| { + ( + format!("no payload for id {:?}", id), + UNKNOWN_PAYLOAD_ERROR_CODE, + ) + })?; // validate method called correctly according to shanghai fork time if ctx @@ -166,7 +196,10 @@ pub async fn handle_rpc( == ForkName::Capella && method == ENGINE_GET_PAYLOAD_V1 { - return Err(format!("{} called after capella fork!", method)); + return Err(( + format!("{} called after Capella fork!", method), + FORK_REQUEST_MISMATCH_ERROR_CODE, + )); } // TODO(4844) add 4844 error checking here @@ -195,38 +228,42 @@ pub async fn handle_rpc( } } ENGINE_FORKCHOICE_UPDATED_V1 | ENGINE_FORKCHOICE_UPDATED_V2 => { - let 
forkchoice_state: JsonForkchoiceStateV1 = get_param(params, 0)?; + let forkchoice_state: JsonForkchoiceStateV1 = + get_param(params, 0).map_err(|s| (s, BAD_PARAMS_ERROR_CODE))?; let payload_attributes = match method { ENGINE_FORKCHOICE_UPDATED_V1 => { - let jpa1: Option = get_param(params, 1)?; + let jpa1: Option = + get_param(params, 1).map_err(|s| (s, BAD_PARAMS_ERROR_CODE))?; jpa1.map(JsonPayloadAttributes::V1) } ENGINE_FORKCHOICE_UPDATED_V2 => { // we can't use `deny_unknown_fields` without breaking compatibility with some // clients that haven't updated to the latest engine_api spec. So instead we'll // need to deserialize based on timestamp - get_param::>(params, 1).and_then(|pa| { - pa.and_then(|pa| { - match ctx - .execution_block_generator - .read() - .get_fork_at_timestamp(*pa.timestamp()) - { - ForkName::Merge => { - get_param::>(params, 1) - .map(|opt| opt.map(JsonPayloadAttributes::V1)) - .transpose() + get_param::>(params, 1) + .and_then(|pa| { + pa.and_then(|pa| { + match ctx + .execution_block_generator + .read() + .get_fork_at_timestamp(*pa.timestamp()) + { + ForkName::Merge => { + get_param::>(params, 1) + .map(|opt| opt.map(JsonPayloadAttributes::V1)) + .transpose() + } + ForkName::Capella => { + get_param::>(params, 1) + .map(|opt| opt.map(JsonPayloadAttributes::V2)) + .transpose() + } + _ => unreachable!(), } - ForkName::Capella => { - get_param::>(params, 1) - .map(|opt| opt.map(JsonPayloadAttributes::V2)) - .transpose() - } - _ => unreachable!(), - } + }) + .transpose() }) - .transpose() - })? + .map_err(|s| (s, BAD_PARAMS_ERROR_CODE))? 
} _ => unreachable!(), }; @@ -240,20 +277,29 @@ pub async fn handle_rpc( { ForkName::Merge => { if matches!(pa, JsonPayloadAttributes::V2(_)) { - return Err(format!( - "{} called with `JsonPayloadAttributesV2` before capella fork!", - method + return Err(( + format!( + "{} called with `JsonPayloadAttributesV2` before Capella fork!", + method + ), + GENERIC_ERROR_CODE, )); } } ForkName::Capella => { if method == ENGINE_FORKCHOICE_UPDATED_V1 { - return Err(format!("{} called after capella fork!", method)); + return Err(( + format!("{} called after Capella fork!", method), + FORK_REQUEST_MISMATCH_ERROR_CODE, + )); } if matches!(pa, JsonPayloadAttributes::V1(_)) { - return Err(format!( - "{} called with `JsonPayloadAttributesV1` after capella fork!", - method + return Err(( + format!( + "{} called with `JsonPayloadAttributesV1` after Capella fork!", + method + ), + FORK_REQUEST_MISMATCH_ERROR_CODE, )); } } @@ -281,10 +327,14 @@ pub async fn handle_rpc( return Ok(serde_json::to_value(response).unwrap()); } - let mut response = ctx.execution_block_generator.write().forkchoice_updated( - forkchoice_state.into(), - payload_attributes.map(|json| json.into()), - )?; + let mut response = ctx + .execution_block_generator + .write() + .forkchoice_updated( + forkchoice_state.into(), + payload_attributes.map(|json| json.into()), + ) + .map_err(|s| (s, GENERIC_ERROR_CODE))?; if let Some(mut status) = ctx.static_forkchoice_updated_response.lock().clone() { if status.status == PayloadStatusV1Status::Valid { @@ -305,9 +355,13 @@ pub async fn handle_rpc( }; Ok(serde_json::to_value(transition_config).unwrap()) } - other => Err(format!( - "The method {} does not exist/is not available", - other + ENGINE_EXCHANGE_CAPABILITIES => { + let engine_capabilities = ctx.engine_capabilities.read(); + Ok(serde_json::to_value(engine_capabilities.to_response()).unwrap()) + } + other => Err(( + format!("The method {} does not exist/is not available", other), + METHOD_NOT_FOUND_CODE, )), } } diff 
--git a/beacon_node/execution_layer/src/test_utils/mock_builder.rs b/beacon_node/execution_layer/src/test_utils/mock_builder.rs index 8ce4a6556..06b5e81eb 100644 --- a/beacon_node/execution_layer/src/test_utils/mock_builder.rs +++ b/beacon_node/execution_layer/src/test_utils/mock_builder.rs @@ -84,8 +84,7 @@ impl TestingBuilder { }; let el = - ExecutionLayer::from_config(config, executor.clone(), executor.log().clone(), &spec) - .unwrap(); + ExecutionLayer::from_config(config, executor.clone(), executor.log().clone()).unwrap(); // This should probably be done for all fields, we only update ones we are testing with so far. let mut context = Context::for_mainnet(); diff --git a/beacon_node/execution_layer/src/test_utils/mock_execution_layer.rs b/beacon_node/execution_layer/src/test_utils/mock_execution_layer.rs index d061f13a6..ad73b2b4e 100644 --- a/beacon_node/execution_layer/src/test_utils/mock_execution_layer.rs +++ b/beacon_node/execution_layer/src/test_utils/mock_execution_layer.rs @@ -73,8 +73,7 @@ impl MockExecutionLayer { ..Default::default() }; let el = - ExecutionLayer::from_config(config, executor.clone(), executor.log().clone(), &spec) - .unwrap(); + ExecutionLayer::from_config(config, executor.clone(), executor.log().clone()).unwrap(); Self { server, @@ -106,7 +105,7 @@ impl MockExecutionLayer { prev_randao, Address::repeat_byte(42), // FIXME: think about how to handle different forks / withdrawals here.. - Some(vec![]), + None, ); // Insert a proposer to ensure the fork choice updated command works. 
diff --git a/beacon_node/execution_layer/src/test_utils/mod.rs b/beacon_node/execution_layer/src/test_utils/mod.rs index bad02e369..adf9358f0 100644 --- a/beacon_node/execution_layer/src/test_utils/mod.rs +++ b/beacon_node/execution_layer/src/test_utils/mod.rs @@ -22,6 +22,7 @@ use tokio::{runtime, sync::oneshot}; use types::{EthSpec, ExecutionBlockHash, Uint256}; use warp::{http::StatusCode, Filter, Rejection}; +use crate::EngineCapabilities; pub use execution_block_generator::{generate_pow_block, Block, ExecutionBlockGenerator}; pub use hook::Hook; pub use mock_builder::{Context as MockBuilderContext, MockBuilder, Operation, TestingBuilder}; @@ -31,6 +32,15 @@ pub const DEFAULT_TERMINAL_DIFFICULTY: u64 = 6400; pub const DEFAULT_TERMINAL_BLOCK: u64 = 64; pub const DEFAULT_JWT_SECRET: [u8; 32] = [42; 32]; pub const DEFAULT_BUILDER_THRESHOLD_WEI: u128 = 1_000_000_000_000_000_000; +pub const DEFAULT_ENGINE_CAPABILITIES: EngineCapabilities = EngineCapabilities { + new_payload_v1: true, + new_payload_v2: true, + forkchoice_updated_v1: true, + forkchoice_updated_v2: true, + get_payload_v1: true, + get_payload_v2: true, + exchange_transition_configuration_v1: true, +}; mod execution_block_generator; mod handle_rpc; @@ -117,6 +127,7 @@ impl MockServer { hook: <_>::default(), new_payload_statuses: <_>::default(), fcu_payload_statuses: <_>::default(), + engine_capabilities: Arc::new(RwLock::new(DEFAULT_ENGINE_CAPABILITIES)), _phantom: PhantomData, }); @@ -147,6 +158,10 @@ impl MockServer { } } + pub fn set_engine_capabilities(&self, engine_capabilities: EngineCapabilities) { + *self.ctx.engine_capabilities.write() = engine_capabilities; + } + pub fn new( handle: &runtime::Handle, jwt_key: JwtKey, @@ -469,6 +484,7 @@ pub struct Context { pub new_payload_statuses: Arc>>, pub fcu_payload_statuses: Arc>>, + pub engine_capabilities: Arc>, pub _phantom: PhantomData, } @@ -620,11 +636,11 @@ pub fn serve( "jsonrpc": JSONRPC_VERSION, "result": result }), - Err(message) => json!({ + 
Err((message, code)) => json!({ "id": id, "jsonrpc": JSONRPC_VERSION, "error": { - "code": -1234, // Junk error code. + "code": code, "message": message } }), diff --git a/testing/execution_engine_integration/src/test_rig.rs b/testing/execution_engine_integration/src/test_rig.rs index 2daacb0ad..fe7e51e92 100644 --- a/testing/execution_engine_integration/src/test_rig.rs +++ b/testing/execution_engine_integration/src/test_rig.rs @@ -127,7 +127,7 @@ impl TestRig { ..Default::default() }; let execution_layer = - ExecutionLayer::from_config(config, executor.clone(), log.clone(), &spec).unwrap(); + ExecutionLayer::from_config(config, executor.clone(), log.clone()).unwrap(); ExecutionPair { execution_engine, execution_layer, @@ -146,7 +146,7 @@ impl TestRig { ..Default::default() }; let execution_layer = - ExecutionLayer::from_config(config, executor, log.clone(), &spec).unwrap(); + ExecutionLayer::from_config(config, executor, log.clone()).unwrap(); ExecutionPair { execution_engine, execution_layer,