diff --git a/beacon_node/execution_layer/src/engine_api.rs b/beacon_node/execution_layer/src/engine_api.rs index a1e769e3e..7e04a3fac 100644 --- a/beacon_node/execution_layer/src/engine_api.rs +++ b/beacon_node/execution_layer/src/engine_api.rs @@ -71,8 +71,6 @@ impl From for Error { } } -pub struct EngineApi; - #[derive(Clone, Copy, Debug, PartialEq)] pub enum PayloadStatusV1Status { Valid, diff --git a/beacon_node/execution_layer/src/engine_api/http.rs b/beacon_node/execution_layer/src/engine_api/http.rs index 832771460..c4811e04c 100644 --- a/beacon_node/execution_layer/src/engine_api/http.rs +++ b/beacon_node/execution_layer/src/engine_api/http.rs @@ -7,7 +7,6 @@ use reqwest::header::CONTENT_TYPE; use sensitive_url::SensitiveUrl; use serde::de::DeserializeOwned; use serde_json::json; -use std::marker::PhantomData; use std::time::Duration; use types::EthSpec; @@ -169,7 +168,7 @@ pub mod deposit_log { /// state of the deposit contract. pub mod deposit_methods { use super::Log; - use crate::{EngineApi, HttpJsonRpc}; + use crate::HttpJsonRpc; use serde::{Deserialize, Serialize}; use serde_json::{json, Value}; use std::fmt; @@ -298,7 +297,7 @@ pub mod deposit_methods { } } - impl HttpJsonRpc { + impl HttpJsonRpc { /// Get the eth1 chain id of the given endpoint. pub async fn get_chain_id(&self, timeout: Duration) -> Result { let chain_id: String = self @@ -517,20 +516,18 @@ pub mod deposit_methods { } } -pub struct HttpJsonRpc { +pub struct HttpJsonRpc { pub client: Client, pub url: SensitiveUrl, auth: Option, - _phantom: PhantomData, } -impl HttpJsonRpc { +impl HttpJsonRpc { pub fn new(url: SensitiveUrl) -> Result { Ok(Self { client: Client::builder().build()?, url, auth: None, - _phantom: PhantomData, }) } @@ -539,7 +536,6 @@ impl HttpJsonRpc { client: Client::builder().build()?, url, auth: Some(auth), - _phantom: PhantomData, }) } @@ -592,7 +588,7 @@ impl std::fmt::Display for HttpJsonRpc { } } -impl HttpJsonRpc { +impl HttpJsonRpc { pub async fn upcheck(&self) -> Result<(), Error> { let result: serde_json::Value = self .rpc_request(ETH_SYNCING, json!([]), ETH_SYNCING_TIMEOUT) diff --git a/beacon_node/execution_layer/src/engines.rs b/beacon_node/execution_layer/src/engines.rs index 34eef8a3f..d44d81c67 100644 --- a/beacon_node/execution_layer/src/engines.rs +++ b/beacon_node/execution_layer/src/engines.rs @@ -1,7 +1,7 @@ //! Provides generic behaviour for multiple execution engines, specifically fallback behaviour. use crate::engine_api::{ - EngineApi, Error as EngineApiError, ForkchoiceUpdatedResponse, PayloadAttributes, PayloadId, + Error as EngineApiError, ForkchoiceUpdatedResponse, PayloadAttributes, PayloadId, }; use crate::HttpJsonRpc; use lru::LruCache; @@ -55,20 +55,32 @@ struct PayloadIdCacheKey { pub suggested_fee_recipient: Address, } -/// An execution engine. -pub struct Engine { - pub api: HttpJsonRpc, - payload_id_cache: Mutex>, - state: RwLock, +#[derive(Debug)] +pub enum EngineError { + Offline, + Api { error: EngineApiError }, + BuilderApi { error: EngineApiError }, + Auth, } -impl Engine { +/// An execution engine. +pub struct Engine { + pub api: HttpJsonRpc, + payload_id_cache: Mutex>, + state: RwLock, + pub latest_forkchoice_state: RwLock>, + pub log: Logger, +} + +impl Engine { /// Creates a new, offline engine. - pub fn new(api: HttpJsonRpc) -> Self { + pub fn new(api: HttpJsonRpc, log: &Logger) -> Self { Self { api, payload_id_cache: Mutex::new(LruCache::new(PAYLOAD_ID_LRU_CACHE_SIZE)), state: RwLock::new(EngineState::Offline), + latest_forkchoice_state: Default::default(), + log: log.clone(), } } @@ -90,9 +102,7 @@ impl Engine { }) .cloned() } -} -impl Engine { pub async fn notify_forkchoice_updated( &self, forkchoice_state: ForkChoiceState, @@ -120,26 +130,7 @@ impl Engine { Ok(response) } -} -// This structure used to hold multiple execution engines managed in a fallback manner. This -// functionality has been removed following https://github.com/sigp/lighthouse/issues/3118 and this -// struct will likely be removed in the future. -pub struct Engines { - pub engine: Engine, - pub latest_forkchoice_state: RwLock>, - pub log: Logger, -} - -#[derive(Debug)] -pub enum EngineError { - Offline, - Api { error: EngineApiError }, - BuilderApi { error: EngineApiError }, - Auth, -} - -impl Engines { async fn get_latest_forkchoice_state(&self) -> Option { *self.latest_forkchoice_state.read().await } @@ -169,12 +160,7 @@ impl Engines { // For simplicity, payload attributes are never included in this call. It may be // reasonable to include them in the future. - if let Err(e) = self - .engine - .api - .forkchoice_updated_v1(forkchoice_state, None) - .await - { + if let Err(e) = self.api.forkchoice_updated_v1(forkchoice_state, None).await { debug!( self.log, "Failed to issue latest head to engine"; @@ -191,14 +177,14 @@ impl Engines { /// Returns `true` if the engine has a "synced" status. pub async fn is_synced(&self) -> bool { - *self.engine.state.read().await == EngineState::Synced + *self.state.read().await == EngineState::Synced } /// Run the `EngineApi::upcheck` function if the node's last known state is not synced. This /// might be used to recover the node if offline. pub async fn upcheck_not_synced(&self, logging: Logging) { - let mut state_lock = self.engine.state.write().await; + let mut state_lock = self.state.write().await; if *state_lock != EngineState::Synced { - match self.engine.api.upcheck().await { + match self.api.upcheck().await { Ok(()) => { if logging.is_enabled() { info!( @@ -261,7 +247,7 @@ impl Engines { /// upcheck it and then run the function again. pub async fn first_success<'a, F, G, H>(&'a self, func: F) -> Result where - F: Fn(&'a Engine) -> G + Copy, + F: Fn(&'a Engine) -> G + Copy, G: Future>, { match self.first_success_without_retry(func).await { @@ -282,18 +268,18 @@ impl Engines { func: F, ) -> Result where - F: Fn(&'a Engine) -> G, + F: Fn(&'a Engine) -> G, G: Future>, { let (engine_synced, engine_auth_failed) = { - let state = self.engine.state.read().await; + let state = self.state.read().await; ( *state == EngineState::Synced, *state == EngineState::AuthFailed, ) }; if engine_synced { - match func(&self.engine).await { + match func(self).await { Ok(result) => Ok(result), Err(error) => { debug!( @@ -301,7 +287,7 @@ impl Engines { "Execution engine call failed"; "error" => ?error, ); - *self.engine.state.write().await = EngineState::Offline; + *self.state.write().await = EngineState::Offline; Err(EngineError::Api { error }) } } @@ -318,7 +304,7 @@ impl Engines { /// it runs, it will try to upcheck all offline nodes and then run the function again. pub async fn broadcast<'a, F, G, H>(&'a self, func: F) -> Result where - F: Fn(&'a Engine) -> G + Copy, + F: Fn(&'a Engine) -> G + Copy, G: Future>, { match self.broadcast_without_retry(func).await { @@ -333,14 +319,14 @@ impl Engines { /// Runs `func` on the node if it's last state is not offline. pub async fn broadcast_without_retry<'a, F, G, H>(&'a self, func: F) -> Result where - F: Fn(&'a Engine) -> G, + F: Fn(&'a Engine) -> G, G: Future>, { let func = &func; - if *self.engine.state.read().await == EngineState::Offline { + if *self.state.read().await == EngineState::Offline { Err(EngineError::Offline) } else { - match func(&self.engine).await { + match func(self).await { Ok(res) => Ok(res), Err(error) => { debug!( @@ -348,7 +334,7 @@ impl Engines { "Execution engine call failed"; "error" => ?error, ); - *self.engine.state.write().await = EngineState::Offline; + *self.state.write().await = EngineState::Offline; Err(EngineError::Api { error }) } } diff --git a/beacon_node/execution_layer/src/lib.rs b/beacon_node/execution_layer/src/lib.rs index 8897f8f67..47424ca0f 100644 --- a/beacon_node/execution_layer/src/lib.rs +++ b/beacon_node/execution_layer/src/lib.rs @@ -10,7 +10,7 @@ use engine_api::Error as ApiError; pub use engine_api::*; pub use engine_api::{http, http::deposit_methods, http::HttpJsonRpc}; pub use engines::ForkChoiceState; -use engines::{Engine, EngineError, Engines, Logging}; +use engines::{Engine, EngineError, Logging}; use lru::LruCache; use payload_status::process_payload_status; pub use payload_status::PayloadStatus; @@ -64,7 +64,7 @@ const CONFIG_POLL_INTERVAL: Duration = Duration::from_secs(60); #[derive(Debug)] pub enum Error { - NoEngines, + NoEngine, NoPayloadBuilder, ApiError(ApiError), Builder(builder_client::Error), @@ -101,7 +101,7 @@ pub struct Proposer { } struct Inner { - engines: Engines, + engine: Engine, builder: Option, execution_engine_forkchoice_lock: Mutex<()>, suggested_fee_recipient: Option
, @@ -162,7 +162,7 @@ impl ExecutionLayer { if urls.len() > 1 { warn!(log, "Only the first execution engine url will be used"); } - let execution_url = urls.into_iter().next().ok_or(Error::NoEngines)?; + let execution_url = urls.into_iter().next().ok_or(Error::NoEngine)?; // Use the default jwt secret path if not provided via cli. let secret_file = secret_files @@ -198,12 +198,11 @@ impl ExecutionLayer { .map_err(Error::InvalidJWTSecret) }?; - let engine: Engine = { + let engine: Engine = { let auth = Auth::new(jwt_key, jwt_id, jwt_version); debug!(log, "Loaded execution endpoint"; "endpoint" => %execution_url, "jwt_path" => ?secret_file.as_path()); - let api = HttpJsonRpc::::new_with_auth(execution_url, auth) - .map_err(Error::ApiError)?; - Engine::::new(api) + let api = HttpJsonRpc::new_with_auth(execution_url, auth).map_err(Error::ApiError)?; + Engine::new(api, &log) }; let builder = builder_url @@ -211,11 +210,7 @@ impl ExecutionLayer { .transpose()?; let inner = Inner { - engines: Engines { - engine, - latest_forkchoice_state: <_>::default(), - log: log.clone(), - }, + engine, builder, execution_engine_forkchoice_lock: <_>::default(), suggested_fee_recipient, @@ -234,8 +229,8 @@ impl ExecutionLayer { } impl ExecutionLayer { - fn engines(&self) -> &Engines { - &self.inner.engines + fn engines(&self) -> &Engine { + &self.inner.engine } pub fn builder(&self) -> &Option { @@ -1004,7 +999,7 @@ impl ExecutionLayer { /// https://github.com/ethereum/consensus-specs/blob/v1.1.5/specs/merge/validator.md async fn get_pow_block_hash_at_total_difficulty( &self, - engine: &Engine, + engine: &Engine, spec: &ChainSpec, ) -> Result, ApiError> { let mut block = engine @@ -1118,7 +1113,7 @@ impl ExecutionLayer { /// https://github.com/ethereum/consensus-specs/issues/2636 async fn get_pow_block( &self, - engine: &Engine, + engine: &Engine, hash: ExecutionBlockHash, ) -> Result, ApiError> { if let Some(cached) = self.execution_blocks().await.get(&hash).copied() { @@ -1153,7 +1148,7 @@ impl ExecutionLayer { async fn get_payload_by_block_hash_from_engine( &self, - engine: &Engine, + engine: &Engine, hash: ExecutionBlockHash, ) -> Result>, ApiError> { let _timer = metrics::start_timer(&metrics::EXECUTION_LAYER_GET_PAYLOAD_BY_BLOCK_HASH);