diff --git a/beacon_node/beacon_chain/tests/payload_invalidation.rs b/beacon_node/beacon_chain/tests/payload_invalidation.rs
index e37ed286b..5cd0a04c3 100644
--- a/beacon_node/beacon_chain/tests/payload_invalidation.rs
+++ b/beacon_node/beacon_chain/tests/payload_invalidation.rs
@@ -1003,6 +1003,11 @@ async fn payload_preparation_before_transition_block() {
     let rig = InvalidPayloadRig::new();
     let el = rig.execution_layer();
 
+    // Run the watchdog routine so that the status of the execution engine is set. This ensures
+    // that we don't end up with `eth_syncing` requests later in this function that will impede
+    // testing.
+    el.watchdog_task().await;
+
     let head = rig.harness.chain.head_snapshot();
     assert_eq!(
         head.beacon_block
diff --git a/beacon_node/execution_layer/src/engines.rs b/beacon_node/execution_layer/src/engines.rs
index d44d81c67..eb188c61f 100644
--- a/beacon_node/execution_layer/src/engines.rs
+++ b/beacon_node/execution_layer/src/engines.rs
@@ -5,8 +5,10 @@ use crate::engine_api::{
 };
 use crate::HttpJsonRpc;
 use lru::LruCache;
-use slog::{crit, debug, info, warn, Logger};
+use slog::{debug, error, info, Logger};
 use std::future::Future;
+use std::sync::Arc;
+use task_executor::TaskExecutor;
 use tokio::sync::{Mutex, RwLock};
 use types::{Address, ExecutionBlockHash, Hash256};
 
@@ -16,7 +18,7 @@ use types::{Address, ExecutionBlockHash, Hash256};
 const PAYLOAD_ID_LRU_CACHE_SIZE: usize = 512;
 
 /// Stores the remembered state of a engine.
-#[derive(Copy, Clone, PartialEq)]
+#[derive(Copy, Clone, PartialEq, Debug)]
 enum EngineState {
     Synced,
     Offline,
@@ -31,22 +33,6 @@ pub struct ForkChoiceState {
     pub finalized_block_hash: ExecutionBlockHash,
 }
 
-/// Used to enable/disable logging on some tasks.
-#[derive(Copy, Clone, PartialEq)]
-pub enum Logging {
-    Enabled,
-    Disabled,
-}
-
-impl Logging {
-    pub fn is_enabled(&self) -> bool {
-        match self {
-            Logging::Enabled => true,
-            Logging::Disabled => false,
-        }
-    }
-}
-
 #[derive(Hash, PartialEq, std::cmp::Eq)]
 struct PayloadIdCacheKey {
     pub head_block_hash: ExecutionBlockHash,
@@ -69,17 +55,19 @@ pub struct Engine {
     payload_id_cache: Mutex<LruCache<PayloadIdCacheKey, PayloadId>>,
     state: RwLock<EngineState>,
     pub latest_forkchoice_state: RwLock<Option<ForkChoiceState>>,
+    pub executor: TaskExecutor,
     pub log: Logger,
 }
 
 impl Engine {
     /// Creates a new, offline engine.
-    pub fn new(api: HttpJsonRpc, log: &Logger) -> Self {
+    pub fn new(api: HttpJsonRpc, executor: TaskExecutor, log: &Logger) -> Self {
         Self {
             api,
             payload_id_cache: Mutex::new(LruCache::new(PAYLOAD_ID_LRU_CACHE_SIZE)),
             state: RwLock::new(EngineState::Offline),
             latest_forkchoice_state: Default::default(),
+            executor,
             log: log.clone(),
         }
     }
@@ -179,164 +167,117 @@ impl Engine {
     pub async fn is_synced(&self) -> bool {
         *self.state.read().await == EngineState::Synced
     }
+
     /// Run the `EngineApi::upcheck` function if the node's last known state is not synced. This
     /// might be used to recover the node if offline.
-    pub async fn upcheck_not_synced(&self, logging: Logging) {
-        let mut state_lock = self.state.write().await;
-        if *state_lock != EngineState::Synced {
-            match self.api.upcheck().await {
-                Ok(()) => {
-                    if logging.is_enabled() {
-                        info!(
-                            self.log,
-                            "Execution engine online";
-                        );
-                    }
+    pub async fn upcheck(&self) {
+        let state: EngineState = match self.api.upcheck().await {
+            Ok(()) => {
+                let mut state = self.state.write().await;
+
+                if *state != EngineState::Synced {
+                    info!(
+                        self.log,
+                        "Execution engine online";
+                    );
+                    // Send the node our latest forkchoice_state.
                    self.send_latest_forkchoice_state().await;
-
-                    *state_lock = EngineState::Synced
+                } else {
+                    debug!(
+                        self.log,
+                        "Execution engine online";
+                    );
                 }
-                Err(EngineApiError::IsSyncing) => {
-                    if logging.is_enabled() {
-                        warn!(
-                            self.log,
-                            "Execution engine syncing";
-                        )
-                    }
-                    // Send the node our latest forkchoice_state, it may assist with syncing.
-                    self.send_latest_forkchoice_state().await;
-
-                    *state_lock = EngineState::Syncing
-                }
-                Err(EngineApiError::Auth(err)) => {
-                    if logging.is_enabled() {
-                        warn!(
-                            self.log,
-                            "Failed jwt authorization";
-                            "error" => ?err,
-                        );
-                    }
-
-                    *state_lock = EngineState::AuthFailed
-                }
-                Err(e) => {
-                    if logging.is_enabled() {
-                        warn!(
-                            self.log,
-                            "Execution engine offline";
-                            "error" => ?e,
-                        )
-                    }
-                }
+
+                *state = EngineState::Synced;
+                *state
             }
-        }
+            Err(EngineApiError::IsSyncing) => {
+                let mut state = self.state.write().await;
+                *state = EngineState::Syncing;
+                *state
+            }
+            Err(EngineApiError::Auth(err)) => {
+                error!(
+                    self.log,
+                    "Failed jwt authorization";
+                    "error" => ?err,
+                );
-        if *state_lock != EngineState::Synced && logging.is_enabled() {
-            crit!(
-                self.log,
-                "No synced execution engines";
-            )
-        }
-    }
-
-    /// Run `func` on the node.
-    ///
-    /// This function might try to run `func` twice. If the node returns an error it will try to
-    /// upcheck it and then run the function again.
-    pub async fn first_success<'a, F, G, H>(&'a self, func: F) -> Result<H, EngineError>
-    where
-        F: Fn(&'a Engine) -> G + Copy,
-        G: Future<Output = Result<H, EngineApiError>>,
-    {
-        match self.first_success_without_retry(func).await {
-            Ok(result) => Ok(result),
+                let mut state = self.state.write().await;
+                *state = EngineState::AuthFailed;
+                *state
+            }
             Err(e) => {
-                debug!(self.log, "First engine call failed. Retrying"; "err" => ?e);
-                // Try to recover the node.
-                self.upcheck_not_synced(Logging::Enabled).await;
-                // Try again.
-                self.first_success_without_retry(func).await
-            }
-        }
-    }
+                error!(
+                    self.log,
+                    "Error during execution engine upcheck";
+                    "error" => ?e,
+                );
 
-    /// Run `func` on the node.
-    pub async fn first_success_without_retry<'a, F, G, H>(
-        &'a self,
-        func: F,
-    ) -> Result<H, EngineError>
-    where
-        F: Fn(&'a Engine) -> G,
-        G: Future<Output = Result<H, EngineApiError>>,
-    {
-        let (engine_synced, engine_auth_failed) = {
-            let state = self.state.read().await;
-            (
-                *state == EngineState::Synced,
-                *state == EngineState::AuthFailed,
-            )
+                let mut state = self.state.write().await;
+                *state = EngineState::Offline;
+                *state
+            }
         };
-        if engine_synced {
-            match func(self).await {
-                Ok(result) => Ok(result),
-                Err(error) => {
-                    debug!(
-                        self.log,
-                        "Execution engine call failed";
-                        "error" => ?error,
-                    );
-                    *self.state.write().await = EngineState::Offline;
-                    Err(EngineError::Api { error })
-                }
-            }
-        } else if engine_auth_failed {
-            Err(EngineError::Auth)
-        } else {
-            Err(EngineError::Offline)
-        }
+
+        debug!(
+            self.log,
+            "Execution engine upcheck complete";
+            "state" => ?state,
+        );
     }
 
-    /// Runs `func` on the node.
+    /// Run `func` on the node regardless of the node's current state.
     ///
-    /// This function might try to run `func` twice. If all nodes return an error on the first time
-    /// it runs, it will try to upcheck all offline nodes and then run the function again.
-    pub async fn broadcast<'a, F, G, H>(&'a self, func: F) -> Result<H, EngineError>
-    where
-        F: Fn(&'a Engine) -> G + Copy,
-        G: Future<Output = Result<H, EngineApiError>>,
-    {
-        match self.broadcast_without_retry(func).await {
-            Err(EngineError::Offline { .. }) => {
-                self.upcheck_not_synced(Logging::Enabled).await;
-                self.broadcast_without_retry(func).await
-            }
-            other => other,
-        }
-    }
-
-    /// Runs `func` on the node if it's last state is not offline.
-    pub async fn broadcast_without_retry<'a, F, G, H>(&'a self, func: F) -> Result<H, EngineError>
+    /// ## Note
+    ///
+    /// This function takes locks on `self.state`; holding a conflicting lock might cause a
+    /// deadlock.
+    pub async fn request<'a, F, G, H>(self: &'a Arc<Self>, func: F) -> Result<H, EngineError>
     where
         F: Fn(&'a Engine) -> G,
         G: Future<Output = Result<H, EngineApiError>>,
     {
-        let func = &func;
-        if *self.state.read().await == EngineState::Offline {
-            Err(EngineError::Offline)
-        } else {
-            match func(self).await {
-                Ok(res) => Ok(res),
-                Err(error) => {
-                    debug!(
-                        self.log,
-                        "Execution engine call failed";
-                        "error" => ?error,
+        match func(self).await {
+            Ok(result) => {
+                // Take a clone *without* holding the read-lock since the `upcheck` function will
+                // take a write-lock.
+                let state: EngineState = *self.state.read().await;
+
+                // If this request just returned successfully but we don't think this node is
+                // synced, check to see if it just became synced. This helps to ensure that the
+                // networking stack can get fast feedback about a synced engine.
+                if state != EngineState::Synced {
+                    // Spawn the upcheck in another task to avoid slowing down this request.
+                    let inner_self = self.clone();
+                    self.executor.spawn(
+                        async move { inner_self.upcheck().await },
+                        "upcheck_after_success",
                     );
-                    *self.state.write().await = EngineState::Offline;
-                    Err(EngineError::Api { error })
                 }
+
+                Ok(result)
+            }
+            Err(error) => {
+                error!(
+                    self.log,
+                    "Execution engine call failed";
+                    "error" => ?error,
+                );
+
+                // The node just returned an error, run an upcheck so we can update the endpoint
+                // state.
+                //
+                // Spawn the upcheck in another task to avoid slowing down this request.
+                let inner_self = self.clone();
+                self.executor.spawn(
+                    async move { inner_self.upcheck().await },
+                    "upcheck_after_error",
+                );
+
+                Err(EngineError::Api { error })
             }
         }
     }
diff --git a/beacon_node/execution_layer/src/lib.rs b/beacon_node/execution_layer/src/lib.rs
index 47424ca0f..9bb4ead35 100644
--- a/beacon_node/execution_layer/src/lib.rs
+++ b/beacon_node/execution_layer/src/lib.rs
@@ -10,7 +10,7 @@ use engine_api::Error as ApiError;
 pub use engine_api::*;
 pub use engine_api::{http, http::deposit_methods, http::HttpJsonRpc};
 pub use engines::ForkChoiceState;
-use engines::{Engine, EngineError, Logging};
+use engines::{Engine, EngineError};
 use lru::LruCache;
 use payload_status::process_payload_status;
 pub use payload_status::PayloadStatus;
@@ -27,7 +27,7 @@ use std::time::{Duration, SystemTime, UNIX_EPOCH};
 use task_executor::TaskExecutor;
 use tokio::{
     sync::{Mutex, MutexGuard, RwLock},
-    time::{sleep, sleep_until, Instant},
+    time::sleep,
 };
 use types::{
     BlindedPayload, BlockType, ChainSpec, Epoch, ExecPayload, ExecutionBlockHash,
@@ -101,7 +101,7 @@ pub struct Proposer {
 }
 
 struct Inner {
-    engine: Engine,
+    engine: Arc<Engine>,
     builder: Option<HttpJsonRpc>,
     execution_engine_forkchoice_lock: Mutex<()>,
     suggested_fee_recipient: Option<Address>,
@@ -132,22 +132,15 @@ pub struct Config {
     pub default_datadir: PathBuf,
 }
 
-/// Provides access to one or more execution engines and provides a neat interface for consumption
-/// by the `BeaconChain`.
-///
-/// When there is more than one execution node specified, the others will be used in a "fallback"
-/// fashion. Some requests may be broadcast to all nodes and others might only be sent to the first
-/// node that returns a valid response. Ultimately, the purpose of fallback nodes is to provide
-/// redundancy in the case where one node is offline.
-///
-/// The fallback nodes have an ordering. The first supplied will be the first contacted, and so on.
+/// Provides access to one execution engine and provides a neat interface for consumption by the
+/// `BeaconChain`.
 #[derive(Clone)]
 pub struct ExecutionLayer {
     inner: Arc<Inner>,
 }
 
 impl ExecutionLayer {
-    /// Instantiate `Self` with Execution engines specified using `Config`, all using the JSON-RPC via HTTP.
+    /// Instantiate `Self` with an Execution engine specified in `Config`, using JSON-RPC via HTTP.
     pub fn from_config(config: Config, executor: TaskExecutor, log: Logger) -> Result<Self, Error> {
         let Config {
             execution_endpoints: urls,
@@ -202,7 +195,7 @@ impl ExecutionLayer {
             let auth = Auth::new(jwt_key, jwt_id, jwt_version);
             debug!(log, "Loaded execution endpoint"; "endpoint" => %execution_url, "jwt_path" => ?secret_file.as_path());
             let api = HttpJsonRpc::new_with_auth(execution_url, auth).map_err(Error::ApiError)?;
-            Engine::new(api, &log)
+            Engine::new(api, executor.clone(), &log)
         };
 
         let builder = builder_url
             .transpose()?;
 
         let inner = Inner {
-            engine,
+            engine: Arc::new(engine),
             builder,
             execution_engine_forkchoice_lock: <_>::default(),
             suggested_fee_recipient,
@@ -229,7 +222,7 @@ impl ExecutionLayer {
 }
 
 impl ExecutionLayer {
-    fn engines(&self) -> &Engine {
+    fn engine(&self) -> &Arc<Engine> {
         &self.inner.engine
     }
 
@@ -276,54 +269,18 @@ impl ExecutionLayer {
         self.executor().spawn(generate_future(self.clone()), name);
     }
 
-    /// Spawns a routine which attempts to keep the execution engines online.
+    /// Spawns a routine which attempts to keep the execution engine online.
     pub fn spawn_watchdog_routine<S: SlotClock + 'static>(&self, slot_clock: S) {
         let watchdog = |el: ExecutionLayer| async move {
             // Run one task immediately.
             el.watchdog_task().await;
 
-            let recurring_task =
-                |el: ExecutionLayer, now: Instant, duration_to_next_slot: Duration| async move {
-                    // We run the task three times per slot.
-                    //
-                    // The interval between each task is 1/3rd of the slot duration. This matches nicely
-                    // with the attestation production times (unagg. at 1/3rd, agg at 2/3rd).
-                    //
-                    // Each task is offset by 3/4ths of the interval.
-                    //
-                    // On mainnet, this means we will run tasks at:
-                    //
-                    // - 3s after slot start: 1s before publishing unaggregated attestations.
-                    // - 7s after slot start: 1s before publishing aggregated attestations.
-                    // - 11s after slot start: 1s before the next slot starts.
-                    let interval = duration_to_next_slot / 3;
-                    let offset = (interval / 4) * 3;
-
-                    let first_execution = duration_to_next_slot + offset;
-                    let second_execution = first_execution + interval;
-                    let third_execution = second_execution + interval;
-
-                    sleep_until(now + first_execution).await;
-                    el.engines().upcheck_not_synced(Logging::Disabled).await;
-
-                    sleep_until(now + second_execution).await;
-                    el.engines().upcheck_not_synced(Logging::Disabled).await;
-
-                    sleep_until(now + third_execution).await;
-                    el.engines().upcheck_not_synced(Logging::Disabled).await;
-                };
-
             // Start the loop to periodically update.
             loop {
-                if let Some(duration) = slot_clock.duration_to_next_slot() {
-                    let now = Instant::now();
-
-                    // Spawn a new task rather than waiting for this to finish. This ensure that a
-                    // slow run doesn't prevent the next run from starting.
-                    el.spawn(|el| recurring_task(el, now, duration), "exec_watchdog_task");
-                } else {
-                    error!(el.log(), "Failed to spawn watchdog task");
-                }
+                el.spawn(
+                    |el| async move { el.watchdog_task().await },
+                    "exec_watchdog_task",
+                );
                 sleep(slot_clock.slot_duration()).await;
             }
         };
@@ -333,8 +290,7 @@ impl ExecutionLayer {
 
     /// Performs a single execution of the watchdog routine.
     pub async fn watchdog_task(&self) {
-        // Disable logging since this runs frequently and may get annoying.
-        self.engines().upcheck_not_synced(Logging::Disabled).await;
+        self.engine().upcheck().await;
     }
 
     /// Spawns a routine which cleans the cached proposer data periodically.
@@ -394,9 +350,9 @@ impl ExecutionLayer {
         self.spawn(routine, "exec_config_poll");
    }
 
-    /// Returns `true` if there is at least one synced and reachable engine.
+    /// Returns `true` if the execution engine is synced and reachable.
     pub async fn is_synced(&self) -> bool {
-        self.engines().is_synced().await
+        self.engine().is_synced().await
     }
 
     /// Updates the proposer preparation data provided by validators
@@ -632,8 +588,8 @@ impl ExecutionLayer {
             "timestamp" => timestamp,
             "parent_hash" => ?parent_hash,
         );
-        self.engines()
-            .first_success(|engine| async move {
+        self.engine()
+            .request(|engine| async move {
                 let payload_id = if let Some(id) = engine
                     .get_payload_id(parent_hash, timestamp, prev_randao, suggested_fee_recipient)
                     .await
@@ -736,12 +692,12 @@ impl ExecutionLayer {
             "block_number" => execution_payload.block_number,
         );
 
-        let broadcast_result = self
-            .engines()
-            .broadcast(|engine| engine.api.new_payload_v1(execution_payload.clone()))
+        let result = self
+            .engine()
+            .request(|engine| engine.api.new_payload_v1(execution_payload.clone()))
             .await;
 
-        process_payload_status(execution_payload.block_hash, broadcast_result, self.log())
+        process_payload_status(execution_payload.block_hash, result, self.log())
             .map_err(Box::new)
             .map_err(Error::EngineError)
     }
@@ -867,13 +823,13 @@ impl ExecutionLayer {
             finalized_block_hash,
         };
 
-        self.engines()
+        self.engine()
             .set_latest_forkchoice_state(forkchoice_state)
             .await;
 
-        let broadcast_result = self
-            .engines()
-            .broadcast(|engine| async move {
+        let result = self
+            .engine()
+            .request(|engine| async move {
                 engine
                     .notify_forkchoice_updated(forkchoice_state, payload_attributes, self.log())
                     .await
@@ -882,7 +838,7 @@ impl ExecutionLayer {
 
         process_payload_status(
             head_block_hash,
-            broadcast_result.map(|response| response.payload_status),
+            result.map(|response| response.payload_status),
             self.log(),
         )
         .map_err(Box::new)
@@ -896,12 +852,12 @@ impl ExecutionLayer {
             terminal_block_number: 0,
         };
 
-        let broadcast_result = self
-            .engines()
-            .broadcast(|engine| engine.api.exchange_transition_configuration_v1(local))
+        let result = self
+            .engine()
+            .request(|engine| engine.api.exchange_transition_configuration_v1(local))
             .await;
 
-        match broadcast_result {
+        match result {
             Ok(remote) => {
                 if local.terminal_total_difficulty != remote.terminal_total_difficulty
                     || local.terminal_block_hash != remote.terminal_block_hash
@@ -953,8 +909,8 @@ impl ExecutionLayer {
         );
 
         let hash_opt = self
-            .engines()
-            .first_success(|engine| async move {
+            .engine()
+            .request(|engine| async move {
                 let terminal_block_hash = spec.terminal_block_hash;
                 if terminal_block_hash != ExecutionBlockHash::zero() {
                     if self
@@ -1040,8 +996,8 @@ impl ExecutionLayer {
     /// - `Some(true)` if the given `block_hash` is the terminal proof-of-work block.
     /// - `Some(false)` if the given `block_hash` is certainly *not* the terminal proof-of-work
     ///   block.
-    /// - `None` if the `block_hash` or its parent were not present on the execution engines.
-    /// - `Err(_)` if there was an error connecting to the execution engines.
+    /// - `None` if the `block_hash` or its parent were not present on the execution engine.
+    /// - `Err(_)` if there was an error connecting to the execution engine.
     ///
     /// ## Fallback Behaviour
     ///
@@ -1069,8 +1025,8 @@ impl ExecutionLayer {
             &[metrics::IS_VALID_TERMINAL_POW_BLOCK_HASH],
         );
 
-        self.engines()
-            .broadcast(|engine| async move {
+        self.engine()
+            .request(|engine| async move {
                 if let Some(pow_block) = self.get_pow_block(engine, block_hash).await? {
                     if let Some(pow_parent) =
                         self.get_pow_block(engine, pow_block.parent_hash).await?
@@ -1136,8 +1092,8 @@ impl ExecutionLayer {
         &self,
         hash: ExecutionBlockHash,
     ) -> Result<Option<ExecutionPayload<T>>, Error> {
-        self.engines()
-            .first_success(|engine| async move {
+        self.engine()
+            .request(|engine| async move {
                 self.get_payload_by_block_hash_from_engine(engine, hash)
                     .await
             })
@@ -1240,7 +1196,7 @@ mod test {
         MockExecutionLayer::default_params(runtime.task_executor.clone())
             .move_to_block_prior_to_terminal_block()
             .with_terminal_block(|spec, el, _| async move {
-                el.engines().upcheck_not_synced(Logging::Disabled).await;
+                el.engine().upcheck().await;
                 assert_eq!(el.get_terminal_pow_block_hash(&spec).await.unwrap(), None)
             })
            .await
@@ -1260,7 +1216,7 @@ mod test {
         MockExecutionLayer::default_params(runtime.task_executor.clone())
             .move_to_terminal_block()
             .with_terminal_block(|spec, el, terminal_block| async move {
-                el.engines().upcheck_not_synced(Logging::Disabled).await;
+                el.engine().upcheck().await;
                 assert_eq!(
                     el.is_valid_terminal_pow_block_hash(terminal_block.unwrap().block_hash, &spec)
                         .await
@@ -1277,7 +1233,7 @@ mod test {
         MockExecutionLayer::default_params(runtime.task_executor.clone())
            .move_to_terminal_block()
            .with_terminal_block(|spec, el, terminal_block| async move {
-                el.engines().upcheck_not_synced(Logging::Disabled).await;
+                el.engine().upcheck().await;
                 let invalid_terminal_block = terminal_block.unwrap().parent_hash;
 
                 assert_eq!(
@@ -1296,7 +1252,7 @@ mod test {
         MockExecutionLayer::default_params(runtime.task_executor.clone())
            .move_to_terminal_block()
            .with_terminal_block(|spec, el, _| async move {
-                el.engines().upcheck_not_synced(Logging::Disabled).await;
+                el.engine().upcheck().await;
                 let missing_terminal_block = ExecutionBlockHash::repeat_byte(42);
 
                assert_eq!(
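
For illustration, the standalone sketch below (separate from the diff, using the hypothetical type `DemoEngine`, a boolean `probe_ok` in place of the real `eth_syncing` upcheck probe, a plain sync closure instead of the async closures above, and `tokio::spawn` instead of Lighthouse's `TaskExecutor`) shows the pattern this diff moves to: `request` always attempts the call with no retry, and a failed call, or a success while the cached state is stale, spawns a background upcheck to refresh the cached `EngineState` off the hot path.

    // Hypothetical sketch of the request/upcheck pattern; assumes the `tokio`
    // crate with the "full" feature set. Not the Lighthouse implementation.
    use std::sync::Arc;
    use std::time::Duration;
    use tokio::sync::RwLock;

    #[derive(Copy, Clone, PartialEq, Debug)]
    enum EngineState {
        Synced,
        Offline,
    }

    struct DemoEngine {
        state: RwLock<EngineState>,
    }

    impl DemoEngine {
        // Probe the engine and record the result; always spawned so it never
        // blocks a request. `probe_ok` stands in for a real upcheck call.
        async fn upcheck(self: Arc<Self>, probe_ok: bool) {
            let mut state = self.state.write().await;
            *state = if probe_ok {
                EngineState::Synced
            } else {
                EngineState::Offline
            };
        }

        // Always attempt `func`, regardless of the cached state; no retry.
        async fn request<F, T, E>(self: &Arc<Self>, func: F, probe_ok: bool) -> Result<T, E>
        where
            F: FnOnce() -> Result<T, E>,
        {
            let result = func();
            if result.is_err() || *self.state.read().await != EngineState::Synced {
                // Spawn the upcheck in another task to avoid slowing down this request.
                tokio::spawn(Arc::clone(self).upcheck(probe_ok));
            }
            result
        }
    }

    #[tokio::main]
    async fn main() {
        let engine = Arc::new(DemoEngine {
            state: RwLock::new(EngineState::Offline),
        });
        // The call succeeds immediately even though the engine is marked `Offline`.
        let reply: Result<u64, ()> = engine.request(|| Ok(42), true).await;
        assert_eq!(reply, Ok(42));
        // The spawned upcheck flips the cached state to `Synced` shortly afterwards.
        tokio::time::sleep(Duration::from_millis(50)).await;
        assert_eq!(*engine.state.read().await, EngineState::Synced);
    }

The design trade-off mirrors the diff: callers get fast feedback (no inline retry or second round-trip), while the cached state converges in the background via the spawned upcheck.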