diff --git a/beacon_node/execution_layer/src/engines.rs b/beacon_node/execution_layer/src/engines.rs index 88c94162f..34eef8a3f 100644 --- a/beacon_node/execution_layer/src/engines.rs +++ b/beacon_node/execution_layer/src/engines.rs @@ -57,7 +57,6 @@ struct PayloadIdCacheKey { /// An execution engine. pub struct Engine { - pub id: String, pub api: HttpJsonRpc, payload_id_cache: Mutex>, state: RwLock, @@ -65,9 +64,8 @@ pub struct Engine { impl Engine { /// Creates a new, offline engine. - pub fn new(id: String, api: HttpJsonRpc) -> Self { + pub fn new(api: HttpJsonRpc) -> Self { Self { - id, api, payload_id_cache: Mutex::new(LruCache::new(PAYLOAD_ID_LRU_CACHE_SIZE)), state: RwLock::new(EngineState::Offline), @@ -135,10 +133,10 @@ pub struct Engines { #[derive(Debug)] pub enum EngineError { - Offline { id: String }, - Api { id: String, error: EngineApiError }, + Offline, + Api { error: EngineApiError }, BuilderApi { error: EngineApiError }, - Auth { id: String }, + Auth, } impl Engines { @@ -159,7 +157,6 @@ impl Engines { self.log, "No need to call forkchoiceUpdated"; "msg" => "head does not have execution enabled", - "id" => &self.engine.id, ); return; } @@ -168,7 +165,6 @@ impl Engines { self.log, "Issuing forkchoiceUpdated"; "forkchoice_state" => ?forkchoice_state, - "id" => &self.engine.id, ); // For simplicity, payload attributes are never included in this call. It may be @@ -183,14 +179,12 @@ impl Engines { self.log, "Failed to issue latest head to engine"; "error" => ?e, - "id" => &self.engine.id, ); } } else { debug!( self.log, "No head, not sending to engine"; - "id" => &self.engine.id, ); } } @@ -261,45 +255,36 @@ impl Engines { } } - /// Run `func` on all engines, in the order in which they are defined, returning the first - /// successful result that is found. + /// Run `func` on the node. /// - /// This function might try to run `func` twice. If all nodes return an error on the first time - /// it runs, it will try to upcheck all offline nodes and then run the function again. - pub async fn first_success<'a, F, G, H>(&'a self, func: F) -> Result> + /// This function might try to run `func` twice. If the node returns an error it will try to + /// upcheck it and then run the function again. + pub async fn first_success<'a, F, G, H>(&'a self, func: F) -> Result where F: Fn(&'a Engine) -> G + Copy, G: Future>, { match self.first_success_without_retry(func).await { Ok(result) => Ok(result), - Err(mut first_errors) => { - // Try to recover some nodes. + Err(e) => { + debug!(self.log, "First engine call failed. Retrying"; "err" => ?e); + // Try to recover the node. self.upcheck_not_synced(Logging::Enabled).await; - // Retry the call on all nodes. - match self.first_success_without_retry(func).await { - Ok(result) => Ok(result), - Err(second_errors) => { - first_errors.extend(second_errors); - Err(first_errors) - } - } + // Try again. + self.first_success_without_retry(func).await } } } - /// Run `func` on all engines, in the order in which they are defined, returning the first - /// successful result that is found. + /// Run `func` on the node. pub async fn first_success_without_retry<'a, F, G, H>( &'a self, func: F, - ) -> Result> + ) -> Result where F: Fn(&'a Engine) -> G, G: Future>, { - let mut errors = vec![]; - let (engine_synced, engine_auth_failed) = { let state = self.engine.state.read().await; ( @@ -309,32 +294,22 @@ impl Engines { }; if engine_synced { match func(&self.engine).await { - Ok(result) => return Ok(result), + Ok(result) => Ok(result), Err(error) => { debug!( self.log, "Execution engine call failed"; "error" => ?error, - "id" => &&self.engine.id ); *self.engine.state.write().await = EngineState::Offline; - errors.push(EngineError::Api { - id: self.engine.id.clone(), - error, - }) + Err(EngineError::Api { error }) } } } else if engine_auth_failed { - errors.push(EngineError::Auth { - id: self.engine.id.clone(), - }) + Err(EngineError::Auth) } else { - errors.push(EngineError::Offline { - id: self.engine.id.clone(), - }) + Err(EngineError::Offline) } - - Err(errors) } /// Runs `func` on the node. @@ -363,9 +338,7 @@ impl Engines { { let func = &func; if *self.engine.state.read().await == EngineState::Offline { - Err(EngineError::Offline { - id: self.engine.id.clone(), - }) + Err(EngineError::Offline) } else { match func(&self.engine).await { Ok(res) => Ok(res), @@ -376,10 +349,7 @@ impl Engines { "error" => ?error, ); *self.engine.state.write().await = EngineState::Offline; - Err(EngineError::Api { - id: self.engine.id.clone(), - error, - }) + Err(EngineError::Api { error }) } } } diff --git a/beacon_node/execution_layer/src/lib.rs b/beacon_node/execution_layer/src/lib.rs index 61f1c569d..8897f8f67 100644 --- a/beacon_node/execution_layer/src/lib.rs +++ b/beacon_node/execution_layer/src/lib.rs @@ -12,7 +12,7 @@ pub use engine_api::{http, http::deposit_methods, http::HttpJsonRpc}; pub use engines::ForkChoiceState; use engines::{Engine, EngineError, Engines, Logging}; use lru::LruCache; -use payload_status::process_multiple_payload_statuses; +use payload_status::process_payload_status; pub use payload_status::PayloadStatus; use sensitive_url::SensitiveUrl; use serde::{Deserialize, Serialize}; @@ -68,11 +68,10 @@ pub enum Error { NoPayloadBuilder, ApiError(ApiError), Builder(builder_client::Error), - EngineErrors(Vec), + EngineError(Box), NotSynced, ShuttingDown, FeeRecipientUnspecified, - ConsensusFailure, MissingLatestValidHash, InvalidJWTSecret(String), } @@ -200,12 +199,11 @@ impl ExecutionLayer { }?; let engine: Engine = { - let id = execution_url.to_string(); let auth = Auth::new(jwt_key, jwt_id, jwt_version); - debug!(log, "Loaded execution endpoint"; "endpoint" => %id, "jwt_path" => ?secret_file.as_path()); + debug!(log, "Loaded execution endpoint"; "endpoint" => %execution_url, "jwt_path" => ?secret_file.as_path()); let api = HttpJsonRpc::::new_with_auth(execution_url, auth) .map_err(Error::ApiError)?; - Engine::::new(id, api) + Engine::::new(api) }; let builder = builder_url @@ -709,7 +707,8 @@ impl ExecutionLayer { }) }) .await - .map_err(Error::EngineErrors) + .map_err(Box::new) + .map_err(Error::EngineError) } /// Maps to the `engine_newPayload` JSON-RPC call. @@ -742,16 +741,14 @@ impl ExecutionLayer { "block_number" => execution_payload.block_number, ); - let broadcast_results = self + let broadcast_result = self .engines() .broadcast(|engine| engine.api.new_payload_v1(execution_payload.clone())) .await; - process_multiple_payload_statuses( - execution_payload.block_hash, - Some(broadcast_results).into_iter(), - self.log(), - ) + process_payload_status(execution_payload.block_hash, broadcast_result, self.log()) + .map_err(Box::new) + .map_err(Error::EngineError) } /// Register that the given `validator_index` is going to produce a block at `slot`. @@ -879,7 +876,7 @@ impl ExecutionLayer { .set_latest_forkchoice_state(forkchoice_state) .await; - let broadcast_results = self + let broadcast_result = self .engines() .broadcast(|engine| async move { engine @@ -888,13 +885,13 @@ impl ExecutionLayer { }) .await; - process_multiple_payload_statuses( + process_payload_status( head_block_hash, - Some(broadcast_results) - .into_iter() - .map(|result| result.map(|response| response.payload_status)), + broadcast_result.map(|response| response.payload_status), self.log(), ) + .map_err(Box::new) + .map_err(Error::EngineError) } pub async fn exchange_transition_configuration(&self, spec: &ChainSpec) -> Result<(), Error> { @@ -909,9 +906,6 @@ impl ExecutionLayer { .broadcast(|engine| engine.api.exchange_transition_configuration_v1(local)) .await; - let mut errors = vec![]; - // Having no fallbacks, the id of the used node is 0 - let i = 0usize; match broadcast_result { Ok(remote) => { if local.terminal_total_difficulty != remote.terminal_total_difficulty @@ -922,20 +916,18 @@ impl ExecutionLayer { "Execution client config mismatch"; "msg" => "ensure lighthouse and the execution client are up-to-date and \ configured consistently", - "execution_endpoint" => i, "remote" => ?remote, "local" => ?local, ); - errors.push(EngineError::Api { - id: i.to_string(), + Err(Error::EngineError(Box::new(EngineError::Api { error: ApiError::TransitionConfigurationMismatch, - }); + }))) } else { debug!( self.log(), "Execution client config is OK"; - "execution_endpoint" => i ); + Ok(()) } } Err(e) => { @@ -943,17 +935,10 @@ impl ExecutionLayer { self.log(), "Unable to get transition config"; "error" => ?e, - "execution_endpoint" => i, ); - errors.push(e); + Err(Error::EngineError(Box::new(e))) } } - - if errors.is_empty() { - Ok(()) - } else { - Err(Error::EngineErrors(errors)) - } } /// Used during block production to determine if the merge has been triggered. @@ -992,7 +977,8 @@ impl ExecutionLayer { .await }) .await - .map_err(Error::EngineErrors)?; + .map_err(Box::new) + .map_err(Error::EngineError)?; if let Some(hash) = &hash_opt { info!( @@ -1102,7 +1088,8 @@ impl ExecutionLayer { Ok(None) }) .await - .map_err(|e| Error::EngineErrors(vec![e])) + .map_err(Box::new) + .map_err(Error::EngineError) } /// This function should remain internal. @@ -1160,7 +1147,8 @@ impl ExecutionLayer { .await }) .await - .map_err(Error::EngineErrors) + .map_err(Box::new) + .map_err(Error::EngineError) } async fn get_payload_by_block_hash_from_engine( diff --git a/beacon_node/execution_layer/src/payload_status.rs b/beacon_node/execution_layer/src/payload_status.rs index e0b1a01b4..46917a0aa 100644 --- a/beacon_node/execution_layer/src/payload_status.rs +++ b/beacon_node/execution_layer/src/payload_status.rs @@ -1,7 +1,6 @@ use crate::engine_api::{Error as ApiError, PayloadStatusV1, PayloadStatusV1Status}; use crate::engines::EngineError; -use crate::Error; -use slog::{crit, warn, Logger}; +use slog::{warn, Logger}; use types::ExecutionBlockHash; /// Provides a simpler, easier to parse version of `PayloadStatusV1` for upstream users. @@ -24,168 +23,117 @@ pub enum PayloadStatus { }, } -/// Processes the responses from multiple execution engines, finding the "best" status and returning -/// it (if any). -/// -/// This function has the following basic goals: -/// -/// - Detect a consensus failure between nodes. -/// - Find the most-synced node by preferring a definite response (valid/invalid) over a -/// syncing/accepted response or error. -/// -/// # Details -/// -/// - If there are conflicting valid/invalid responses, always return an error. -/// - If there are syncing/accepted responses but valid/invalid responses exist, return the -/// valid/invalid responses since they're definite. -/// - If there are multiple valid responses, return the first one processed. -/// - If there are multiple invalid responses, return the first one processed. -/// - Syncing/accepted responses are grouped, if there are multiple of them, return the first one -/// processed. -/// - If there are no responses (only errors or nothing), return an error. -pub fn process_multiple_payload_statuses( +/// Processes the response from the execution engine. +pub fn process_payload_status( head_block_hash: ExecutionBlockHash, - statuses: impl Iterator>, + status: Result, log: &Logger, -) -> Result { - let mut errors = vec![]; - let mut valid_statuses = vec![]; - let mut invalid_statuses = vec![]; - let mut other_statuses = vec![]; - - for status in statuses { - match status { - Err(e) => errors.push(e), - Ok(response) => match &response.status { - PayloadStatusV1Status::Valid => { - if response - .latest_valid_hash - .map_or(false, |h| h == head_block_hash) - { - // The response is only valid if `latest_valid_hash` is not `null` and - // equal to the provided `block_hash`. - valid_statuses.push(PayloadStatus::Valid) - } else { - errors.push(EngineError::Api { - id: "unknown".to_string(), - error: ApiError::BadResponse( - format!( - "new_payload: response.status = VALID but invalid latest_valid_hash. Expected({:?}) Found({:?})", - head_block_hash, - response.latest_valid_hash, - ) - ), - }); - } - } - PayloadStatusV1Status::Invalid => { - if let Some(latest_valid_hash) = response.latest_valid_hash { - // The response is only valid if `latest_valid_hash` is not `null`. - invalid_statuses.push(PayloadStatus::Invalid { - latest_valid_hash, - validation_error: response.validation_error.clone(), - }) - } else { - errors.push(EngineError::Api { - id: "unknown".to_string(), - error: ApiError::BadResponse( - "new_payload: response.status = INVALID but null latest_valid_hash" - .to_string(), - ), - }); - } - } - PayloadStatusV1Status::InvalidBlockHash => { - // In the interests of being liberal with what we accept, only raise a - // warning here. - if response.latest_valid_hash.is_some() { - warn!( - log, - "Malformed response from execution engine"; - "msg" => "expected a null latest_valid_hash", - "status" => ?response.status - ) - } - - invalid_statuses.push(PayloadStatus::InvalidBlockHash { - validation_error: response.validation_error.clone(), - }); - } - PayloadStatusV1Status::InvalidTerminalBlock => { - // In the interests of being liberal with what we accept, only raise a - // warning here. - if response.latest_valid_hash.is_some() { - warn!( - log, - "Malformed response from execution engine"; - "msg" => "expected a null latest_valid_hash", - "status" => ?response.status - ) - } - - invalid_statuses.push(PayloadStatus::InvalidTerminalBlock { - validation_error: response.validation_error.clone(), - }); - } - PayloadStatusV1Status::Syncing => { - // In the interests of being liberal with what we accept, only raise a - // warning here. - if response.latest_valid_hash.is_some() { - warn!( - log, - "Malformed response from execution engine"; - "msg" => "expected a null latest_valid_hash", - "status" => ?response.status - ) - } - - other_statuses.push(PayloadStatus::Syncing) - } - PayloadStatusV1Status::Accepted => { - // In the interests of being liberal with what we accept, only raise a - // warning here. - if response.latest_valid_hash.is_some() { - warn!( - log, - "Malformed response from execution engine"; - "msg" => "expected a null latest_valid_hash", - "status" => ?response.status - ) - } - - other_statuses.push(PayloadStatus::Accepted) - } - }, - } - } - - if !valid_statuses.is_empty() && !invalid_statuses.is_empty() { - crit!( - log, - "Consensus failure between execution nodes"; - "invalid_statuses" => ?invalid_statuses, - "valid_statuses" => ?valid_statuses, - ); - - // Choose to exit and ignore the valid response. This preferences correctness over - // liveness. - return Err(Error::ConsensusFailure); - } - - // Log any errors to assist with troubleshooting. - for error in &errors { - warn!( +) -> Result { + match status { + Err(error) => { + warn!( log, "Error whilst processing payload status"; "error" => ?error, - ); - } + ); + Err(error) + } + Ok(response) => match &response.status { + PayloadStatusV1Status::Valid => { + if response + .latest_valid_hash + .map_or(false, |h| h == head_block_hash) + { + // The response is only valid if `latest_valid_hash` is not `null` and + // equal to the provided `block_hash`. + Ok(PayloadStatus::Valid) + } else { + let error = format!( + "new_payload: response.status = VALID but invalid latest_valid_hash. Expected({:?}) Found({:?})", + head_block_hash, + response.latest_valid_hash + ); + Err(EngineError::Api { + error: ApiError::BadResponse(error), + }) + } + } + PayloadStatusV1Status::Invalid => { + if let Some(latest_valid_hash) = response.latest_valid_hash { + // The response is only valid if `latest_valid_hash` is not `null`. + Ok(PayloadStatus::Invalid { + latest_valid_hash, + validation_error: response.validation_error.clone(), + }) + } else { + Err(EngineError::Api { + error: ApiError::BadResponse( + "new_payload: response.status = INVALID but null latest_valid_hash" + .to_string(), + ), + }) + } + } + PayloadStatusV1Status::InvalidBlockHash => { + // In the interests of being liberal with what we accept, only raise a + // warning here. + if response.latest_valid_hash.is_some() { + warn!( + log, + "Malformed response from execution engine"; + "msg" => "expected a null latest_valid_hash", + "status" => ?response.status + ) + } - valid_statuses - .first() - .or_else(|| invalid_statuses.first()) - .or_else(|| other_statuses.first()) - .cloned() - .map(Result::Ok) - .unwrap_or_else(|| Err(Error::EngineErrors(errors))) + Ok(PayloadStatus::InvalidBlockHash { + validation_error: response.validation_error.clone(), + }) + } + PayloadStatusV1Status::InvalidTerminalBlock => { + // In the interests of being liberal with what we accept, only raise a + // warning here. + if response.latest_valid_hash.is_some() { + warn!( + log, + "Malformed response from execution engine"; + "msg" => "expected a null latest_valid_hash", + "status" => ?response.status + ) + } + + Ok(PayloadStatus::InvalidTerminalBlock { + validation_error: response.validation_error.clone(), + }) + } + PayloadStatusV1Status::Syncing => { + // In the interests of being liberal with what we accept, only raise a + // warning here. + if response.latest_valid_hash.is_some() { + warn!( + log, + "Malformed response from execution engine"; + "msg" => "expected a null latest_valid_hash", + "status" => ?response.status + ) + } + + Ok(PayloadStatus::Syncing) + } + PayloadStatusV1Status::Accepted => { + // In the interests of being liberal with what we accept, only raise a + // warning here. + if response.latest_valid_hash.is_some() { + warn!( + log, + "Malformed response from execution engine"; + "msg" => "expected a null latest_valid_hash", + "status" => ?response.status + ) + } + + Ok(PayloadStatus::Accepted) + } + }, + } }