diff --git a/beacon_node/execution_layer/src/engines.rs b/beacon_node/execution_layer/src/engines.rs index 719db74c5..d3c4d0e42 100644 --- a/beacon_node/execution_layer/src/engines.rs +++ b/beacon_node/execution_layer/src/engines.rs @@ -156,10 +156,11 @@ impl Builder for Engine { } } -/// Holds multiple execution engines and provides functionality for managing them in a fallback -/// manner. +// This structure used to hold multiple execution engines managed in a fallback manner. This +// functionality has been removed following https://github.com/sigp/lighthouse/issues/3118 and this +// struct will likely be removed in the future. pub struct Engines { - pub engines: Vec>, + pub engine: Engine, pub latest_forkchoice_state: RwLock>, pub log: Logger, } @@ -185,7 +186,7 @@ impl Engines { *self.latest_forkchoice_state.write().await = Some(state); } - async fn send_latest_forkchoice_state(&self, engine: &Engine) { + async fn send_latest_forkchoice_state(&self) { let latest_forkchoice_state = self.get_latest_forkchoice_state().await; if let Some(forkchoice_state) = latest_forkchoice_state { @@ -194,7 +195,7 @@ impl Engines { self.log, "No need to call forkchoiceUpdated"; "msg" => "head does not have execution enabled", - "id" => &engine.id, + "id" => &self.engine.id, ); return; } @@ -203,12 +204,13 @@ impl Engines { self.log, "Issuing forkchoiceUpdated"; "forkchoice_state" => ?forkchoice_state, - "id" => &engine.id, + "id" => &self.engine.id, ); // For simplicity, payload attributes are never included in this call. It may be // reasonable to include them in the future. - if let Err(e) = engine + if let Err(e) = self + .engine .api .forkchoice_updated_v1(forkchoice_state, None) .await @@ -217,98 +219,77 @@ impl Engines { self.log, "Failed to issue latest head to engine"; "error" => ?e, - "id" => &engine.id, + "id" => &self.engine.id, ); } } else { debug!( self.log, "No head, not sending to engine"; - "id" => &engine.id, + "id" => &self.engine.id, ); } } - /// Returns `true` if there is at least one engine with a "synced" status. - pub async fn any_synced(&self) -> bool { - for engine in &self.engines { - if *engine.state.read().await == EngineState::Synced { - return true; - } - } - false + /// Returns `true` if the engine has a "synced" status. + pub async fn is_synced(&self) -> bool { + *self.engine.state.read().await == EngineState::Synced } - - /// Run the `EngineApi::upcheck` function on all nodes which are currently offline. - /// - /// This can be used to try and recover any offline nodes. + /// Run the `EngineApi::upcheck` function if the node's last known state is not synced. This + /// might be used to recover the node if offline. pub async fn upcheck_not_synced(&self, logging: Logging) { - let upcheck_futures = self.engines.iter().map(|engine| async move { - let mut state_lock = engine.state.write().await; - if *state_lock != EngineState::Synced { - match engine.api.upcheck().await { - Ok(()) => { - if logging.is_enabled() { - info!( - self.log, - "Execution engine online"; - "id" => &engine.id - ); - } - - // Send the node our latest forkchoice_state. - self.send_latest_forkchoice_state(engine).await; - - *state_lock = EngineState::Synced + let mut state_lock = self.engine.state.write().await; + if *state_lock != EngineState::Synced { + match self.engine.api.upcheck().await { + Ok(()) => { + if logging.is_enabled() { + info!( + self.log, + "Execution engine online"; + ); } - Err(EngineApiError::IsSyncing) => { - if logging.is_enabled() { - warn!( - self.log, - "Execution engine syncing"; - "id" => &engine.id - ) - } + // Send the node our latest forkchoice_state. + self.send_latest_forkchoice_state().await; - // Send the node our latest forkchoice_state, it may assist with syncing. - self.send_latest_forkchoice_state(engine).await; - - *state_lock = EngineState::Syncing + *state_lock = EngineState::Synced + } + Err(EngineApiError::IsSyncing) => { + if logging.is_enabled() { + warn!( + self.log, + "Execution engine syncing"; + ) } - Err(EngineApiError::Auth(err)) => { - if logging.is_enabled() { - warn!( - self.log, - "Failed jwt authorization"; - "error" => ?err, - "id" => &engine.id - ); - } - *state_lock = EngineState::AuthFailed + // Send the node our latest forkchoice_state, it may assist with syncing. + self.send_latest_forkchoice_state().await; + + *state_lock = EngineState::Syncing + } + Err(EngineApiError::Auth(err)) => { + if logging.is_enabled() { + warn!( + self.log, + "Failed jwt authorization"; + "error" => ?err, + ); } - Err(e) => { - if logging.is_enabled() { - warn!( - self.log, - "Execution engine offline"; - "error" => ?e, - "id" => &engine.id - ) - } + + *state_lock = EngineState::AuthFailed + } + Err(e) => { + if logging.is_enabled() { + warn!( + self.log, + "Execution engine offline"; + "error" => ?e, + ) } } } - *state_lock - }); + } - let num_synced = join_all(upcheck_futures) - .await - .into_iter() - .filter(|state: &EngineState| *state == EngineState::Synced) - .count(); - - if num_synced == 0 && logging.is_enabled() { + if *state_lock != EngineState::Synced && logging.is_enabled() { crit!( self.log, "No synced execution engines"; @@ -355,111 +336,89 @@ impl Engines { { let mut errors = vec![]; - for engine in &self.engines { - let (engine_synced, engine_auth_failed) = { - let state = engine.state.read().await; - ( - *state == EngineState::Synced, - *state == EngineState::AuthFailed, - ) - }; - if engine_synced { - match func(engine).await { - Ok(result) => return Ok(result), - Err(error) => { - debug!( - self.log, - "Execution engine call failed"; - "error" => ?error, - "id" => &engine.id - ); - *engine.state.write().await = EngineState::Offline; - errors.push(EngineError::Api { - id: engine.id.clone(), - error, - }) - } + let (engine_synced, engine_auth_failed) = { + let state = self.engine.state.read().await; + ( + *state == EngineState::Synced, + *state == EngineState::AuthFailed, + ) + }; + if engine_synced { + match func(&self.engine).await { + Ok(result) => return Ok(result), + Err(error) => { + debug!( + self.log, + "Execution engine call failed"; + "error" => ?error, + "id" => &&self.engine.id + ); + *self.engine.state.write().await = EngineState::Offline; + errors.push(EngineError::Api { + id: self.engine.id.clone(), + error, + }) } - } else if engine_auth_failed { - errors.push(EngineError::Auth { - id: engine.id.clone(), - }) - } else { - errors.push(EngineError::Offline { - id: engine.id.clone(), - }) } + } else if engine_auth_failed { + errors.push(EngineError::Auth { + id: self.engine.id.clone(), + }) + } else { + errors.push(EngineError::Offline { + id: self.engine.id.clone(), + }) } Err(errors) } - /// Runs `func` on all nodes concurrently, returning all results. Any nodes that are offline - /// will be ignored, however all synced or unsynced nodes will receive the broadcast. + /// Runs `func` on the node. /// /// This function might try to run `func` twice. If all nodes return an error on the first time /// it runs, it will try to upcheck all offline nodes and then run the function again. - pub async fn broadcast<'a, F, G, H>(&'a self, func: F) -> Vec> + pub async fn broadcast<'a, F, G, H>(&'a self, func: F) -> Result where F: Fn(&'a Engine) -> G + Copy, G: Future>, { - let first_results = self.broadcast_without_retry(func).await; - - let mut any_offline = false; - for result in &first_results { - match result { - Ok(_) => return first_results, - Err(EngineError::Offline { .. }) => any_offline = true, - _ => (), + match self.broadcast_without_retry(func).await { + Err(EngineError::Offline { .. }) => { + self.upcheck_not_synced(Logging::Enabled).await; + self.broadcast_without_retry(func).await } - } - - if any_offline { - self.upcheck_not_synced(Logging::Enabled).await; - self.broadcast_without_retry(func).await - } else { - first_results + other => other, } } - /// Runs `func` on all nodes concurrently, returning all results. - pub async fn broadcast_without_retry<'a, F, G, H>( - &'a self, - func: F, - ) -> Vec> + /// Runs `func` on the node if it's last state is not offline. + pub async fn broadcast_without_retry<'a, F, G, H>(&'a self, func: F) -> Result where F: Fn(&'a Engine) -> G, G: Future>, { let func = &func; - let futures = self.engines.iter().map(|engine| async move { - let is_offline = *engine.state.read().await == EngineState::Offline; - if !is_offline { - match func(engine).await { - Ok(res) => Ok(res), - Err(error) => { - debug!( - self.log, - "Execution engine call failed"; - "error" => ?error, - "id" => &engine.id - ); - *engine.state.write().await = EngineState::Offline; - Err(EngineError::Api { - id: engine.id.clone(), - error, - }) - } + if *self.engine.state.read().await == EngineState::Offline { + Err(EngineError::Offline { + id: self.engine.id.clone(), + }) + } else { + match func(&self.engine).await { + Ok(res) => Ok(res), + Err(error) => { + debug!( + self.log, + "Execution engine call failed"; + "error" => ?error, + ); + *self.engine.state.write().await = EngineState::Offline; + Err(EngineError::Api { + id: self.engine.id.clone(), + error, + }) } - } else { - Err(EngineError::Offline { - id: engine.id.clone(), - }) } - }); - - join_all(futures).await + } } } diff --git a/beacon_node/execution_layer/src/lib.rs b/beacon_node/execution_layer/src/lib.rs index d6acd5fe5..cff219027 100644 --- a/beacon_node/execution_layer/src/lib.rs +++ b/beacon_node/execution_layer/src/lib.rs @@ -158,72 +158,60 @@ impl ExecutionLayer { let Config { execution_endpoints: urls, builder_endpoints: builder_urls, - mut secret_files, + secret_files, suggested_fee_recipient, jwt_id, jwt_version, default_datadir, } = config; - if urls.is_empty() { - return Err(Error::NoEngines); + if urls.len() > 1 { + warn!(log, "Only the first execution engine url will be used"); } + let execution_url = urls.into_iter().next().ok_or(Error::NoEngines)?; - // Extend the jwt secret files with the default jwt secret path if not provided via cli. - // This ensures that we have a jwt secret for every EL. - secret_files.extend(vec![ - default_datadir.join(DEFAULT_JWT_FILE); - urls.len().saturating_sub(secret_files.len()) - ]); - - let secrets: Vec<(JwtKey, PathBuf)> = secret_files - .iter() - .map(|p| { - // Read secret from file if it already exists - if p.exists() { - std::fs::read_to_string(p) - .map_err(|e| { - format!("Failed to read JWT secret file {:?}, error: {:?}", p, e) - }) - .and_then(|ref s| { - let secret = JwtKey::from_slice( - &hex::decode(strip_prefix(s.trim_end())) - .map_err(|e| format!("Invalid hex string: {:?}", e))?, - )?; - Ok((secret, p.to_path_buf())) - }) - } else { - // Create a new file and write a randomly generated secret to it if file does not exist - std::fs::File::options() - .write(true) - .create_new(true) - .open(p) - .map_err(|e| { - format!("Failed to open JWT secret file {:?}, error: {:?}", p, e) - }) - .and_then(|mut f| { - let secret = auth::JwtKey::random(); - f.write_all(secret.hex_string().as_bytes()).map_err(|e| { - format!("Failed to write to JWT secret file: {:?}", e) - })?; - Ok((secret, p.to_path_buf())) - }) - } - }) - .collect::>() - .map_err(Error::InvalidJWTSecret)?; - - let engines: Vec> = urls + // Use the default jwt secret path if not provided via cli. + let secret_file = secret_files .into_iter() - .zip(secrets.into_iter()) - .map(|(url, (secret, path))| { - let id = url.to_string(); - let auth = Auth::new(secret, jwt_id.clone(), jwt_version.clone()); - debug!(log, "Loaded execution endpoint"; "endpoint" => %id, "jwt_path" => ?path); - let api = HttpJsonRpc::::new_with_auth(url, auth)?; - Ok(Engine::::new(id, api)) - }) - .collect::>()?; + .next() + .unwrap_or_else(|| default_datadir.join(DEFAULT_JWT_FILE)); + + let jwt_key = if secret_file.exists() { + // Read secret from file if it already exists + std::fs::read_to_string(&secret_file) + .map_err(|e| format!("Failed to read JWT secret file. Error: {:?}", e)) + .and_then(|ref s| { + let secret = JwtKey::from_slice( + &hex::decode(strip_prefix(s.trim_end())) + .map_err(|e| format!("Invalid hex string: {:?}", e))?, + )?; + Ok(secret) + }) + .map_err(Error::InvalidJWTSecret) + } else { + // Create a new file and write a randomly generated secret to it if file does not exist + std::fs::File::options() + .write(true) + .create_new(true) + .open(&secret_file) + .map_err(|e| format!("Failed to open JWT secret file. Error: {:?}", e)) + .and_then(|mut f| { + let secret = auth::JwtKey::random(); + f.write_all(secret.hex_string().as_bytes()) + .map_err(|e| format!("Failed to write to JWT secret file: {:?}", e))?; + Ok(secret) + }) + .map_err(Error::InvalidJWTSecret) + }?; + + let engine: Engine = { + let id = execution_url.to_string(); + let auth = Auth::new(jwt_key, jwt_id, jwt_version); + debug!(log, "Loaded execution endpoint"; "endpoint" => %id, "jwt_path" => ?secret_file.as_path()); + let api = HttpJsonRpc::::new_with_auth(execution_url, auth) + .map_err(Error::ApiError)?; + Engine::::new(id, api) + }; let builders: Vec> = builder_urls .into_iter() @@ -236,7 +224,7 @@ impl ExecutionLayer { let inner = Inner { engines: Engines { - engines, + engine, latest_forkchoice_state: <_>::default(), log: log.clone(), }, @@ -455,7 +443,7 @@ impl ExecutionLayer { /// Returns `true` if there is at least one synced and reachable engine. pub async fn is_synced(&self) -> bool { - self.engines().any_synced().await + self.engines().is_synced().await } /// Updates the proposer preparation data provided by validators @@ -750,7 +738,7 @@ impl ExecutionLayer { process_multiple_payload_statuses( execution_payload.block_hash, - broadcast_results.into_iter(), + Some(broadcast_results).into_iter(), self.log(), ) } @@ -903,7 +891,7 @@ impl ExecutionLayer { }; process_multiple_payload_statuses( head_block_hash, - broadcast_results + Some(broadcast_results) .into_iter() .chain(builder_broadcast_results.into_iter()) .map(|result| result.map(|response| response.payload_status)), @@ -918,49 +906,49 @@ impl ExecutionLayer { terminal_block_number: 0, }; - let broadcast_results = self + let broadcast_result = self .engines() .broadcast(|engine| engine.api.exchange_transition_configuration_v1(local)) .await; let mut errors = vec![]; - for (i, result) in broadcast_results.into_iter().enumerate() { - match result { - Ok(remote) => { - if local.terminal_total_difficulty != remote.terminal_total_difficulty - || local.terminal_block_hash != remote.terminal_block_hash - { - error!( - self.log(), - "Execution client config mismatch"; - "msg" => "ensure lighthouse and the execution client are up-to-date and \ - configured consistently", - "execution_endpoint" => i, - "remote" => ?remote, - "local" => ?local, - ); - errors.push(EngineError::Api { - id: i.to_string(), - error: ApiError::TransitionConfigurationMismatch, - }); - } else { - debug!( - self.log(), - "Execution client config is OK"; - "execution_endpoint" => i - ); - } - } - Err(e) => { + // Having no fallbacks, the id of the used node is 0 + let i = 0usize; + match broadcast_result { + Ok(remote) => { + if local.terminal_total_difficulty != remote.terminal_total_difficulty + || local.terminal_block_hash != remote.terminal_block_hash + { error!( self.log(), - "Unable to get transition config"; - "error" => ?e, + "Execution client config mismatch"; + "msg" => "ensure lighthouse and the execution client are up-to-date and \ + configured consistently", "execution_endpoint" => i, + "remote" => ?remote, + "local" => ?local, + ); + errors.push(EngineError::Api { + id: i.to_string(), + error: ApiError::TransitionConfigurationMismatch, + }); + } else { + debug!( + self.log(), + "Execution client config is OK"; + "execution_endpoint" => i ); - errors.push(e); } } + Err(e) => { + error!( + self.log(), + "Unable to get transition config"; + "error" => ?e, + "execution_endpoint" => i, + ); + errors.push(e); + } } if errors.is_empty() { @@ -1102,8 +1090,7 @@ impl ExecutionLayer { &[metrics::IS_VALID_TERMINAL_POW_BLOCK_HASH], ); - let broadcast_results = self - .engines() + self.engines() .broadcast(|engine| async move { if let Some(pow_block) = self.get_pow_block(engine, block_hash).await? { if let Some(pow_parent) = @@ -1116,38 +1103,8 @@ impl ExecutionLayer { } Ok(None) }) - .await; - - let mut errors = vec![]; - let mut terminal = 0; - let mut not_terminal = 0; - let mut block_missing = 0; - for result in broadcast_results { - match result { - Ok(Some(true)) => terminal += 1, - Ok(Some(false)) => not_terminal += 1, - Ok(None) => block_missing += 1, - Err(e) => errors.push(e), - } - } - - if terminal > 0 && not_terminal > 0 { - crit!( - self.log(), - "Consensus failure between execution nodes"; - "method" => "is_valid_terminal_pow_block_hash" - ); - } - - if terminal > 0 { - Ok(Some(true)) - } else if not_terminal > 0 { - Ok(Some(false)) - } else if block_missing > 0 { - Ok(None) - } else { - Err(Error::EngineErrors(errors)) - } + .await + .map_err(|e| Error::EngineErrors(vec![e])) } /// This function should remain internal.