diff --git a/Cargo.lock b/Cargo.lock index 9e17e2e2a..5a2c4312b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2087,6 +2087,7 @@ dependencies = [ "task_executor", "tempfile", "tokio", + "tokio-stream", "tree_hash", "tree_hash_derive", "types", diff --git a/beacon_node/beacon_chain/src/lib.rs b/beacon_node/beacon_chain/src/lib.rs index 481b1ae73..fc24a34bb 100644 --- a/beacon_node/beacon_chain/src/lib.rs +++ b/beacon_node/beacon_chain/src/lib.rs @@ -59,6 +59,7 @@ pub use block_verification::{BlockError, ExecutionPayloadError, GossipVerifiedBl pub use canonical_head::{CachedHead, CanonicalHead, CanonicalHeadRwLock}; pub use eth1_chain::{Eth1Chain, Eth1ChainBackend}; pub use events::ServerSentEventHandler; +pub use execution_layer::EngineState; pub use fork_choice::{ExecutionStatus, ForkchoiceUpdateParameters}; pub use metrics::scrape_for_metrics; pub use parking_lot; diff --git a/beacon_node/execution_layer/Cargo.toml b/beacon_node/execution_layer/Cargo.toml index 3b401d459..5c0e66ea4 100644 --- a/beacon_node/execution_layer/Cargo.toml +++ b/beacon_node/execution_layer/Cargo.toml @@ -43,4 +43,5 @@ fork_choice = { path = "../../consensus/fork_choice" } mev-build-rs = {git = "https://github.com/ralexstokes/mev-rs", tag = "v0.2.0"} ethereum-consensus = {git = "https://github.com/ralexstokes/ethereum-consensus"} ssz-rs = {git = "https://github.com/ralexstokes/ssz-rs"} +tokio-stream = { version = "0.1.9", features = [ "sync" ] } strum = "0.24.0" diff --git a/beacon_node/execution_layer/src/engines.rs b/beacon_node/execution_layer/src/engines.rs index eb188c61f..339006c1b 100644 --- a/beacon_node/execution_layer/src/engines.rs +++ b/beacon_node/execution_layer/src/engines.rs @@ -9,7 +9,8 @@ use slog::{debug, error, info, Logger}; use std::future::Future; use std::sync::Arc; use task_executor::TaskExecutor; -use tokio::sync::{Mutex, RwLock}; +use tokio::sync::{watch, Mutex, RwLock}; +use tokio_stream::wrappers::WatchStream; use types::{Address, ExecutionBlockHash, Hash256}; /// The number of payload IDs that will be stored for each `Engine`. @@ -18,14 +19,74 @@ use types::{Address, ExecutionBlockHash, Hash256}; const PAYLOAD_ID_LRU_CACHE_SIZE: usize = 512; /// Stores the remembered state of a engine. -#[derive(Copy, Clone, PartialEq, Debug)] -enum EngineState { +#[derive(Copy, Clone, PartialEq, Debug, Eq, Default)] +enum EngineStateInternal { Synced, + #[default] Offline, Syncing, AuthFailed, } +/// A subset of the engine state to inform other services if the engine is online or offline. +#[derive(Debug, Clone, PartialEq, Eq, Copy)] +pub enum EngineState { + Online, + Offline, +} + +impl From for EngineState { + fn from(state: EngineStateInternal) -> Self { + match state { + EngineStateInternal::Synced | EngineStateInternal::Syncing => EngineState::Online, + EngineStateInternal::Offline | EngineStateInternal::AuthFailed => EngineState::Offline, + } + } +} + +/// Wrapper structure that ensures changes to the engine state are correctly reported to watchers. +struct State { + /// The actual engine state. + state: EngineStateInternal, + /// Notifier to watch the engine state. + notifier: watch::Sender, +} + +impl std::ops::Deref for State { + type Target = EngineStateInternal; + + fn deref(&self) -> &Self::Target { + &self.state + } +} + +impl Default for State { + fn default() -> Self { + let state = EngineStateInternal::default(); + let (notifier, _receiver) = watch::channel(state.into()); + State { state, notifier } + } +} + +impl State { + // Updates the state and notifies all watchers if the state has changed. + pub fn update(&mut self, new_state: EngineStateInternal) { + self.state = new_state; + self.notifier.send_if_modified(|last_state| { + let changed = *last_state != new_state.into(); // notify conditionally + *last_state = new_state.into(); // update the state unconditionally + changed + }); + } + + /// Gives access to a channel containing whether the last state is online. + /// + /// This can be called several times. + pub fn watch(&self) -> WatchStream { + self.notifier.subscribe().into() + } +} + #[derive(Copy, Clone, PartialEq, Debug)] pub struct ForkChoiceState { pub head_block_hash: ExecutionBlockHash, @@ -53,10 +114,10 @@ pub enum EngineError { pub struct Engine { pub api: HttpJsonRpc, payload_id_cache: Mutex>, - state: RwLock, - pub latest_forkchoice_state: RwLock>, - pub executor: TaskExecutor, - pub log: Logger, + state: RwLock, + latest_forkchoice_state: RwLock>, + executor: TaskExecutor, + log: Logger, } impl Engine { @@ -65,13 +126,20 @@ impl Engine { Self { api, payload_id_cache: Mutex::new(LruCache::new(PAYLOAD_ID_LRU_CACHE_SIZE)), - state: RwLock::new(EngineState::Offline), + state: Default::default(), latest_forkchoice_state: Default::default(), executor, log: log.clone(), } } + /// Gives access to a channel containing the last engine state. + /// + /// This can be called several times. + pub async fn watch_state(&self) -> WatchStream { + self.state.read().await.watch() + } + pub async fn get_payload_id( &self, head_block_hash: ExecutionBlockHash, @@ -165,17 +233,16 @@ impl Engine { /// Returns `true` if the engine has a "synced" status. pub async fn is_synced(&self) -> bool { - *self.state.read().await == EngineState::Synced + **self.state.read().await == EngineStateInternal::Synced } /// Run the `EngineApi::upcheck` function if the node's last known state is not synced. This /// might be used to recover the node if offline. pub async fn upcheck(&self) { - let state: EngineState = match self.api.upcheck().await { + let state: EngineStateInternal = match self.api.upcheck().await { Ok(()) => { let mut state = self.state.write().await; - - if *state != EngineState::Synced { + if **state != EngineStateInternal::Synced { info!( self.log, "Execution engine online"; @@ -189,14 +256,13 @@ impl Engine { "Execution engine online"; ); } - - *state = EngineState::Synced; - *state + state.update(EngineStateInternal::Synced); + **state } Err(EngineApiError::IsSyncing) => { let mut state = self.state.write().await; - *state = EngineState::Syncing; - *state + state.update(EngineStateInternal::Syncing); + **state } Err(EngineApiError::Auth(err)) => { error!( @@ -206,8 +272,8 @@ impl Engine { ); let mut state = self.state.write().await; - *state = EngineState::AuthFailed; - *state + state.update(EngineStateInternal::AuthFailed); + **state } Err(e) => { error!( @@ -217,8 +283,8 @@ impl Engine { ); let mut state = self.state.write().await; - *state = EngineState::Offline; - *state + state.update(EngineStateInternal::Offline); + **state } }; @@ -244,12 +310,10 @@ impl Engine { Ok(result) => { // Take a clone *without* holding the read-lock since the `upcheck` function will // take a write-lock. - let state: EngineState = *self.state.read().await; + let state: EngineStateInternal = **self.state.read().await; - // If this request just returned successfully but we don't think this node is - // synced, check to see if it just became synced. This helps to ensure that the - // networking stack can get fast feedback about a synced engine. - if state != EngineState::Synced { + // Keep an up to date engine state. + if state != EngineStateInternal::Synced { // Spawn the upcheck in another task to avoid slowing down this request. let inner_self = self.clone(); self.executor.spawn( @@ -293,3 +357,22 @@ impl PayloadIdCacheKey { } } } + +#[cfg(test)] +mod tests { + use super::*; + use tokio_stream::StreamExt; + + #[tokio::test] + async fn test_state_notifier() { + let mut state = State::default(); + let initial_state: EngineState = state.state.into(); + assert_eq!(initial_state, EngineState::Offline); + state.update(EngineStateInternal::Synced); + + // a watcher that arrives after the first update. + let mut watcher = state.watch(); + let new_state = watcher.next().await.expect("Last state is always present"); + assert_eq!(new_state, EngineState::Online); + } +} diff --git a/beacon_node/execution_layer/src/lib.rs b/beacon_node/execution_layer/src/lib.rs index 778b2247b..3bdca82ad 100644 --- a/beacon_node/execution_layer/src/lib.rs +++ b/beacon_node/execution_layer/src/lib.rs @@ -10,8 +10,8 @@ use builder_client::BuilderHttpClient; use engine_api::Error as ApiError; pub use engine_api::*; pub use engine_api::{http, http::deposit_methods, http::HttpJsonRpc}; -pub use engines::ForkChoiceState; use engines::{Engine, EngineError}; +pub use engines::{EngineState, ForkChoiceState}; use fork_choice::ForkchoiceUpdateParameters; use lru::LruCache; use payload_status::process_payload_status; @@ -31,6 +31,7 @@ use tokio::{ sync::{Mutex, MutexGuard, RwLock}, time::sleep, }; +use tokio_stream::wrappers::WatchStream; use types::{ BlindedPayload, BlockType, ChainSpec, Epoch, ExecPayload, ExecutionBlockHash, ForkName, ProposerPreparationData, PublicKeyBytes, SignedBeaconBlock, Slot, @@ -286,6 +287,13 @@ impl ExecutionLayer { self.inner.execution_blocks.lock().await } + /// Gives access to a channel containing if the last engine state is online or not. + /// + /// This can be called several times. + pub async fn get_responsiveness_watch(&self) -> WatchStream { + self.engine().watch_state().await + } + /// Note: this function returns a mutex guard, be careful to avoid deadlocks. async fn proposer_preparation_data( &self, diff --git a/beacon_node/network/src/sync/backfill_sync/mod.rs b/beacon_node/network/src/sync/backfill_sync/mod.rs index 6767350ce..d36bbbc79 100644 --- a/beacon_node/network/src/sync/backfill_sync/mod.rs +++ b/beacon_node/network/src/sync/backfill_sync/mod.rs @@ -24,7 +24,6 @@ use std::collections::{ HashMap, HashSet, }; use std::sync::Arc; -use tokio::sync::mpsc; use types::{Epoch, EthSpec, SignedBeaconBlock}; /// Blocks are downloaded in batches from peers. This constant specifies how many epochs worth of @@ -144,9 +143,6 @@ pub struct BackFillSync { /// (i.e synced peers). network_globals: Arc>, - /// A multi-threaded, non-blocking processor for processing batches in the beacon chain. - beacon_processor_send: mpsc::Sender>, - /// A logger for backfill sync. log: slog::Logger, } @@ -155,7 +151,6 @@ impl BackFillSync { pub fn new( beacon_chain: Arc>, network_globals: Arc>, - beacon_processor_send: mpsc::Sender>, log: slog::Logger, ) -> Self { // Determine if backfill is enabled or not. @@ -193,7 +188,6 @@ impl BackFillSync { participating_peers: HashSet::new(), restart_failed_sync: false, beacon_chain, - beacon_processor_send, log, }; @@ -216,7 +210,7 @@ impl BackFillSync { #[must_use = "A failure here indicates the backfill sync has failed and the global sync state should be updated"] pub fn start( &mut self, - network: &mut SyncNetworkContext, + network: &mut SyncNetworkContext, ) -> Result { match self.state() { BackFillState::Syncing => {} // already syncing ignore. @@ -312,7 +306,7 @@ impl BackFillSync { pub fn peer_disconnected( &mut self, peer_id: &PeerId, - network: &mut SyncNetworkContext, + network: &mut SyncNetworkContext, ) -> Result<(), BackFillError> { if matches!( self.state(), @@ -355,7 +349,7 @@ impl BackFillSync { #[must_use = "A failure here indicates the backfill sync has failed and the global sync state should be updated"] pub fn inject_error( &mut self, - network: &mut SyncNetworkContext, + network: &mut SyncNetworkContext, batch_id: BatchId, peer_id: &PeerId, request_id: Id, @@ -392,7 +386,7 @@ impl BackFillSync { #[must_use = "A failure here indicates the backfill sync has failed and the global sync state should be updated"] pub fn on_block_response( &mut self, - network: &mut SyncNetworkContext, + network: &mut SyncNetworkContext, batch_id: BatchId, peer_id: &PeerId, request_id: Id, @@ -505,7 +499,7 @@ impl BackFillSync { /// The batch must exist and be ready for processing fn process_batch( &mut self, - network: &mut SyncNetworkContext, + network: &mut SyncNetworkContext, batch_id: BatchId, ) -> Result { // Only process batches if this chain is Syncing, and only one at a time @@ -541,8 +535,8 @@ impl BackFillSync { let process_id = ChainSegmentProcessId::BackSyncBatchId(batch_id); self.current_processing_batch = Some(batch_id); - if let Err(e) = self - .beacon_processor_send + if let Err(e) = network + .processor_channel() .try_send(BeaconWorkEvent::chain_segment(process_id, blocks)) { crit!(self.log, "Failed to send backfill segment to processor."; "msg" => "process_batch", @@ -563,7 +557,7 @@ impl BackFillSync { #[must_use = "A failure here indicates the backfill sync has failed and the global sync state should be updated"] pub fn on_batch_process_result( &mut self, - network: &mut SyncNetworkContext, + network: &mut SyncNetworkContext, batch_id: BatchId, result: &BatchProcessResult, ) -> Result { @@ -704,7 +698,7 @@ impl BackFillSync { /// Processes the next ready batch. fn process_completed_batches( &mut self, - network: &mut SyncNetworkContext, + network: &mut SyncNetworkContext, ) -> Result { // Only process batches if backfill is syncing and only process one batch at a time if self.state() != BackFillState::Syncing || self.current_processing_batch.is_some() { @@ -764,11 +758,7 @@ impl BackFillSync { /// /// If a previous batch has been validated and it had been re-processed, penalize the original /// peer. - fn advance_chain( - &mut self, - network: &mut SyncNetworkContext, - validating_epoch: Epoch, - ) { + fn advance_chain(&mut self, network: &mut SyncNetworkContext, validating_epoch: Epoch) { // make sure this epoch produces an advancement if validating_epoch >= self.current_start { return; @@ -863,7 +853,7 @@ impl BackFillSync { /// intended and can result in downvoting a peer. fn handle_invalid_batch( &mut self, - network: &mut SyncNetworkContext, + network: &mut SyncNetworkContext, batch_id: BatchId, ) -> Result<(), BackFillError> { // The current batch could not be processed, indicating either the current or previous @@ -914,7 +904,7 @@ impl BackFillSync { /// Sends and registers the request of a batch awaiting download. fn retry_batch_download( &mut self, - network: &mut SyncNetworkContext, + network: &mut SyncNetworkContext, batch_id: BatchId, ) -> Result<(), BackFillError> { let batch = match self.batches.get_mut(&batch_id) { @@ -958,7 +948,7 @@ impl BackFillSync { /// Requests the batch assigned to the given id from a given peer. fn send_batch( &mut self, - network: &mut SyncNetworkContext, + network: &mut SyncNetworkContext, batch_id: BatchId, peer: PeerId, ) -> Result<(), BackFillError> { @@ -1011,10 +1001,7 @@ impl BackFillSync { /// When resuming a chain, this function searches for batches that need to be re-downloaded and /// transitions their state to redownload the batch. - fn resume_batches( - &mut self, - network: &mut SyncNetworkContext, - ) -> Result<(), BackFillError> { + fn resume_batches(&mut self, network: &mut SyncNetworkContext) -> Result<(), BackFillError> { let batch_ids_to_retry = self .batches .iter() @@ -1040,7 +1027,7 @@ impl BackFillSync { /// pool and left over batches until the batch buffer is reached or all peers are exhausted. fn request_batches( &mut self, - network: &mut SyncNetworkContext, + network: &mut SyncNetworkContext, ) -> Result<(), BackFillError> { if !matches!(self.state(), BackFillState::Syncing) { return Ok(()); diff --git a/beacon_node/network/src/sync/block_lookups/mod.rs b/beacon_node/network/src/sync/block_lookups/mod.rs index 9f2a5fdce..22d815121 100644 --- a/beacon_node/network/src/sync/block_lookups/mod.rs +++ b/beacon_node/network/src/sync/block_lookups/mod.rs @@ -5,11 +5,10 @@ use beacon_chain::{BeaconChainTypes, BlockError}; use fnv::FnvHashMap; use lighthouse_network::{PeerAction, PeerId}; use lru_cache::LRUTimeCache; -use slog::{crit, debug, error, trace, warn, Logger}; +use slog::{debug, error, trace, warn, Logger}; use smallvec::SmallVec; use std::sync::Arc; use store::{Hash256, SignedBeaconBlock}; -use tokio::sync::mpsc; use crate::beacon_processor::{ChainSegmentProcessId, WorkEvent}; use crate::metrics; @@ -36,7 +35,7 @@ const SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS: u8 = 3; pub(crate) struct BlockLookups { /// A collection of parent block lookups. - parent_queue: SmallVec<[ParentLookup; 3]>, + parent_queue: SmallVec<[ParentLookup; 3]>, /// A cache of failed chain lookups to prevent duplicate searches. failed_chains: LRUTimeCache, @@ -47,22 +46,18 @@ pub(crate) struct BlockLookups { /// The flag allows us to determine if the peer returned data or sent us nothing. single_block_lookups: FnvHashMap>, - /// A multi-threaded, non-blocking processor for applying messages to the beacon chain. - beacon_processor_send: mpsc::Sender>, - /// The logger for the import manager. log: Logger, } impl BlockLookups { - pub fn new(beacon_processor_send: mpsc::Sender>, log: Logger) -> Self { + pub fn new(log: Logger) -> Self { Self { parent_queue: Default::default(), failed_chains: LRUTimeCache::new(Duration::from_secs( FAILED_CHAINS_CACHE_EXPIRY_SECONDS, )), single_block_lookups: Default::default(), - beacon_processor_send, log, } } @@ -71,12 +66,7 @@ impl BlockLookups { /// Searches for a single block hash. If the blocks parent is unknown, a chain of blocks is /// constructed. - pub fn search_block( - &mut self, - hash: Hash256, - peer_id: PeerId, - cx: &mut SyncNetworkContext, - ) { + pub fn search_block(&mut self, hash: Hash256, peer_id: PeerId, cx: &mut SyncNetworkContext) { // Do not re-request a block that is already being requested if self .single_block_lookups @@ -113,7 +103,7 @@ impl BlockLookups { &mut self, block: Arc>, peer_id: PeerId, - cx: &mut SyncNetworkContext, + cx: &mut SyncNetworkContext, ) { let block_root = block.canonical_root(); let parent_root = block.parent_root(); @@ -147,18 +137,16 @@ impl BlockLookups { peer_id: PeerId, block: Option>>, seen_timestamp: Duration, - cx: &mut SyncNetworkContext, + cx: &mut SyncNetworkContext, ) { let mut request = match self.single_block_lookups.entry(id) { Entry::Occupied(req) => req, Entry::Vacant(_) => { if block.is_some() { - crit!( + debug!( self.log, "Block returned for single block lookup not present" ); - #[cfg(debug_assertions)] - panic!("block returned for single block lookup not present"); } return; } @@ -172,6 +160,7 @@ impl BlockLookups { block, seen_timestamp, BlockProcessType::SingleBlock { id }, + cx, ) .is_err() { @@ -212,7 +201,7 @@ impl BlockLookups { peer_id: PeerId, block: Option>>, seen_timestamp: Duration, - cx: &mut SyncNetworkContext, + cx: &mut SyncNetworkContext, ) { let mut parent_lookup = if let Some(pos) = self .parent_queue @@ -236,6 +225,7 @@ impl BlockLookups { block, seen_timestamp, BlockProcessType::ParentLookup { chain_hash }, + cx, ) .is_ok() { @@ -289,7 +279,7 @@ impl BlockLookups { /* Error responses */ #[allow(clippy::needless_collect)] // false positive - pub fn peer_disconnected(&mut self, peer_id: &PeerId, cx: &mut SyncNetworkContext) { + pub fn peer_disconnected(&mut self, peer_id: &PeerId, cx: &mut SyncNetworkContext) { /* Check disconnection for single block lookups */ // better written after https://github.com/rust-lang/rust/issues/59618 let remove_retry_ids: Vec = self @@ -345,7 +335,7 @@ impl BlockLookups { &mut self, id: Id, peer_id: PeerId, - cx: &mut SyncNetworkContext, + cx: &mut SyncNetworkContext, ) { if let Some(pos) = self .parent_queue @@ -365,7 +355,7 @@ impl BlockLookups { ); } - pub fn single_block_lookup_failed(&mut self, id: Id, cx: &mut SyncNetworkContext) { + pub fn single_block_lookup_failed(&mut self, id: Id, cx: &mut SyncNetworkContext) { if let Some(mut request) = self.single_block_lookups.remove(&id) { request.register_failure_downloading(); trace!(self.log, "Single block lookup failed"; "block" => %request.hash); @@ -388,15 +378,12 @@ impl BlockLookups { &mut self, id: Id, result: BlockProcessResult, - cx: &mut SyncNetworkContext, + cx: &mut SyncNetworkContext, ) { let mut req = match self.single_block_lookups.remove(&id) { Some(req) => req, None => { - #[cfg(debug_assertions)] - panic!("block processed for single block lookup not present"); - #[cfg(not(debug_assertions))] - return crit!( + return debug!( self.log, "Block processed for single block lookup not present" ); @@ -476,7 +463,7 @@ impl BlockLookups { &mut self, chain_hash: Hash256, result: BlockProcessResult, - cx: &mut SyncNetworkContext, + cx: &mut SyncNetworkContext, ) { let (mut parent_lookup, peer_id) = if let Some((pos, peer)) = self .parent_queue @@ -489,13 +476,7 @@ impl BlockLookups { }) { (self.parent_queue.remove(pos), peer) } else { - #[cfg(debug_assertions)] - panic!( - "Process response for a parent lookup request that was not found. Chain_hash: {}", - chain_hash - ); - #[cfg(not(debug_assertions))] - return crit!(self.log, "Process response for a parent lookup request that was not found"; "chain_hash" => %chain_hash); + return debug!(self.log, "Process response for a parent lookup request that was not found"; "chain_hash" => %chain_hash); }; match &result { @@ -524,14 +505,22 @@ impl BlockLookups { } BlockProcessResult::Ok | BlockProcessResult::Err(BlockError::BlockIsAlreadyKnown { .. }) => { + // Check if the beacon processor is available + let beacon_processor_send = match cx.processor_channel_if_enabled() { + Some(channel) => channel, + None => { + return trace!( + self.log, + "Dropping parent chain segment that was ready for processing."; + parent_lookup + ); + } + }; let chain_hash = parent_lookup.chain_hash(); let blocks = parent_lookup.chain_blocks(); let process_id = ChainSegmentProcessId::ParentLookup(chain_hash); - match self - .beacon_processor_send - .try_send(WorkEvent::chain_segment(process_id, blocks)) - { + match beacon_processor_send.try_send(WorkEvent::chain_segment(process_id, blocks)) { Ok(_) => { self.parent_queue.push(parent_lookup); } @@ -595,7 +584,7 @@ impl BlockLookups { &mut self, chain_hash: Hash256, result: BatchProcessResult, - cx: &mut SyncNetworkContext, + cx: &mut SyncNetworkContext, ) { let parent_lookup = if let Some(pos) = self .parent_queue @@ -604,12 +593,6 @@ impl BlockLookups { { self.parent_queue.remove(pos) } else { - #[cfg(debug_assertions)] - panic!( - "Chain process response for a parent lookup request that was not found. Chain_hash: {}", - chain_hash - ); - #[cfg(not(debug_assertions))] return debug!(self.log, "Chain process response for a parent lookup request that was not found"; "chain_hash" => %chain_hash); }; @@ -645,25 +628,34 @@ impl BlockLookups { block: Arc>, duration: Duration, process_type: BlockProcessType, + cx: &mut SyncNetworkContext, ) -> Result<(), ()> { - trace!(self.log, "Sending block for processing"; "block" => %block.canonical_root(), "process" => ?process_type); - let event = WorkEvent::rpc_beacon_block(block, duration, process_type); - if let Err(e) = self.beacon_processor_send.try_send(event) { - error!( - self.log, - "Failed to send sync block to processor"; - "error" => ?e - ); - return Err(()); + match cx.processor_channel_if_enabled() { + Some(beacon_processor_send) => { + trace!(self.log, "Sending block for processing"; "block" => %block.canonical_root(), "process" => ?process_type); + let event = WorkEvent::rpc_beacon_block(block, duration, process_type); + if let Err(e) = beacon_processor_send.try_send(event) { + error!( + self.log, + "Failed to send sync block to processor"; + "error" => ?e + ); + Err(()) + } else { + Ok(()) + } + } + None => { + trace!(self.log, "Dropping block ready for processing. Beacon processor not available"; "block" => %block.canonical_root()); + Err(()) + } } - - Ok(()) } fn request_parent( &mut self, - mut parent_lookup: ParentLookup, - cx: &mut SyncNetworkContext, + mut parent_lookup: ParentLookup, + cx: &mut SyncNetworkContext, ) { match parent_lookup.request_parent(cx) { Err(e) => { @@ -710,4 +702,14 @@ impl BlockLookups { self.parent_queue.len() as i64, ); } + + /// Drops all the single block requests and returns how many requests were dropped. + pub fn drop_single_block_requests(&mut self) -> usize { + self.single_block_lookups.drain().len() + } + + /// Drops all the parent chain requests and returns how many requests were dropped. + pub fn drop_parent_chain_requests(&mut self) -> usize { + self.parent_queue.drain(..).len() + } } diff --git a/beacon_node/network/src/sync/block_lookups/parent_lookup.rs b/beacon_node/network/src/sync/block_lookups/parent_lookup.rs index bf5a1b259..295d9cc94 100644 --- a/beacon_node/network/src/sync/block_lookups/parent_lookup.rs +++ b/beacon_node/network/src/sync/block_lookups/parent_lookup.rs @@ -1,6 +1,7 @@ +use beacon_chain::BeaconChainTypes; use lighthouse_network::PeerId; use std::sync::Arc; -use store::{EthSpec, Hash256, SignedBeaconBlock}; +use store::{Hash256, SignedBeaconBlock}; use strum::IntoStaticStr; use crate::sync::{ @@ -18,11 +19,11 @@ pub(crate) const PARENT_FAIL_TOLERANCE: u8 = 5; pub(crate) const PARENT_DEPTH_TOLERANCE: usize = SLOT_IMPORT_TOLERANCE * 2; /// Maintains a sequential list of parents to lookup and the lookup's current state. -pub(crate) struct ParentLookup { +pub(crate) struct ParentLookup { /// The root of the block triggering this parent request. chain_hash: Hash256, /// The blocks that have currently been downloaded. - downloaded_blocks: Vec>>, + downloaded_blocks: Vec>>, /// Request of the last parent. current_parent_request: SingleBlockRequest, /// Id of the last parent request. @@ -50,14 +51,14 @@ pub enum RequestError { NoPeers, } -impl ParentLookup { - pub fn contains_block(&self, block: &SignedBeaconBlock) -> bool { +impl ParentLookup { + pub fn contains_block(&self, block: &SignedBeaconBlock) -> bool { self.downloaded_blocks .iter() .any(|d_block| d_block.as_ref() == block) } - pub fn new(block: Arc>, peer_id: PeerId) -> Self { + pub fn new(block: Arc>, peer_id: PeerId) -> Self { let current_parent_request = SingleBlockRequest::new(block.parent_root(), peer_id); Self { @@ -92,7 +93,7 @@ impl ParentLookup { self.current_parent_request.check_peer_disconnected(peer_id) } - pub fn add_block(&mut self, block: Arc>) { + pub fn add_block(&mut self, block: Arc>) { let next_parent = block.parent_root(); self.downloaded_blocks.push(block); self.current_parent_request.hash = next_parent; @@ -119,7 +120,7 @@ impl ParentLookup { self.current_parent_request_id = None; } - pub fn chain_blocks(&mut self) -> Vec>> { + pub fn chain_blocks(&mut self) -> Vec>> { std::mem::take(&mut self.downloaded_blocks) } @@ -127,9 +128,9 @@ impl ParentLookup { /// the processing result of the block. pub fn verify_block( &mut self, - block: Option>>, + block: Option>>, failed_chains: &mut lru_cache::LRUTimeCache, - ) -> Result>>, VerifyError> { + ) -> Result>>, VerifyError> { let block = self.current_parent_request.verify_block(block)?; // check if the parent of this block isn't in the failed cache. If it is, this chain should @@ -189,7 +190,7 @@ impl From for RequestError { } } -impl slog::KV for ParentLookup { +impl slog::KV for ParentLookup { fn serialize( &self, record: &slog::Record, diff --git a/beacon_node/network/src/sync/block_lookups/tests.rs b/beacon_node/network/src/sync/block_lookups/tests.rs index 2f2720fd1..ead15e23a 100644 --- a/beacon_node/network/src/sync/block_lookups/tests.rs +++ b/beacon_node/network/src/sync/block_lookups/tests.rs @@ -12,6 +12,7 @@ use lighthouse_network::{NetworkGlobals, Request}; use slog::{Drain, Level}; use slot_clock::SystemTimeSlotClock; use store::MemoryStore; +use tokio::sync::mpsc; use types::test_utils::{SeedableRng, TestRandom, XorShiftRng}; use types::MinimalEthSpec as E; @@ -26,7 +27,7 @@ struct TestRig { const D: Duration = Duration::new(0, 0); impl TestRig { - fn test_setup(log_level: Option) -> (BlockLookups, SyncNetworkContext, Self) { + fn test_setup(log_level: Option) -> (BlockLookups, SyncNetworkContext, Self) { let log = { let decorator = slog_term::TermDecorator::new().build(); let drain = slog_term::FullFormat::new(decorator).build().fuse(); @@ -47,15 +48,13 @@ impl TestRig { network_rx, rng, }; - let bl = BlockLookups::new( - beacon_processor_tx, - log.new(slog::o!("component" => "block_lookups")), - ); + let bl = BlockLookups::new(log.new(slog::o!("component" => "block_lookups"))); let cx = { let globals = Arc::new(NetworkGlobals::new_test_globals(&log)); SyncNetworkContext::new( network_tx, globals, + beacon_processor_tx, log.new(slog::o!("component" => "network_context")), ) }; diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index 64755300c..623034797 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -41,7 +41,8 @@ use super::range_sync::{RangeSync, RangeSyncType, EPOCHS_PER_BATCH}; use crate::beacon_processor::{ChainSegmentProcessId, WorkEvent as BeaconWorkEvent}; use crate::service::NetworkMessage; use crate::status::ToStatusMessage; -use beacon_chain::{BeaconChain, BeaconChainTypes, BlockError}; +use beacon_chain::{BeaconChain, BeaconChainTypes, BlockError, EngineState}; +use futures::StreamExt; use lighthouse_network::rpc::methods::MAX_REQUEST_BLOCKS; use lighthouse_network::types::{NetworkGlobals, SyncState}; use lighthouse_network::SyncInfo; @@ -165,7 +166,7 @@ pub struct SyncManager { input_channel: mpsc::UnboundedReceiver>, /// A network context to contact the network service. - network: SyncNetworkContext, + network: SyncNetworkContext, /// The object handling long-range batch load-balanced syncing. range_sync: RangeSync, @@ -202,19 +203,15 @@ pub fn spawn( chain: beacon_chain.clone(), network_globals: network_globals.clone(), input_channel: sync_recv, - network: SyncNetworkContext::new(network_send, network_globals.clone(), log.clone()), - range_sync: RangeSync::new( - beacon_chain.clone(), - beacon_processor_send.clone(), + network: SyncNetworkContext::new( + network_send, + network_globals.clone(), + beacon_processor_send, log.clone(), ), - backfill_sync: BackFillSync::new( - beacon_chain, - network_globals, - beacon_processor_send.clone(), - log.clone(), - ), - block_lookups: BlockLookups::new(beacon_processor_send, log.clone()), + range_sync: RangeSync::new(beacon_chain.clone(), log.clone()), + backfill_sync: BackFillSync::new(beacon_chain, network_globals, log.clone()), + block_lookups: BlockLookups::new(log.clone()), log: log.clone(), }; @@ -468,100 +465,178 @@ impl SyncManager { /// The main driving future for the sync manager. async fn main(&mut self) { + let check_ee = self.chain.execution_layer.is_some(); + let mut check_ee_stream = { + // some magic to have an instance implementing stream even if there is no execution layer + let ee_responsiveness_watch: futures::future::OptionFuture<_> = self + .chain + .execution_layer + .as_ref() + .map(|el| el.get_responsiveness_watch()) + .into(); + futures::stream::iter(ee_responsiveness_watch.await).flatten() + }; + // process any inbound messages loop { - if let Some(sync_message) = self.input_channel.recv().await { - match sync_message { - SyncMessage::AddPeer(peer_id, info) => { - self.add_peer(peer_id, info); - } - SyncMessage::RpcBlock { - request_id, - peer_id, - beacon_block, - seen_timestamp, - } => { - self.rpc_block_received(request_id, peer_id, beacon_block, seen_timestamp); - } - SyncMessage::UnknownBlock(peer_id, block) => { - // If we are not synced or within SLOT_IMPORT_TOLERANCE of the block, ignore - if !self.network_globals.sync_state.read().is_synced() { - let head_slot = self.chain.canonical_head.cached_head().head_slot(); - let unknown_block_slot = block.slot(); + tokio::select! { + Some(sync_message) = self.input_channel.recv() => { + self.handle_message(sync_message); + }, + Some(engine_state) = check_ee_stream.next(), if check_ee => { + self.handle_new_execution_engine_state(engine_state); + } + } + } + } - // if the block is far in the future, ignore it. If its within the slot tolerance of - // our current head, regardless of the syncing state, fetch it. - if (head_slot >= unknown_block_slot - && head_slot.sub(unknown_block_slot).as_usize() - > SLOT_IMPORT_TOLERANCE) - || (head_slot < unknown_block_slot - && unknown_block_slot.sub(head_slot).as_usize() - > SLOT_IMPORT_TOLERANCE) - { - continue; - } - } - if self.network_globals.peers.read().is_connected(&peer_id) { - self.block_lookups - .search_parent(block, peer_id, &mut self.network); - } + fn handle_message(&mut self, sync_message: SyncMessage) { + match sync_message { + SyncMessage::AddPeer(peer_id, info) => { + self.add_peer(peer_id, info); + } + SyncMessage::RpcBlock { + request_id, + peer_id, + beacon_block, + seen_timestamp, + } => { + self.rpc_block_received(request_id, peer_id, beacon_block, seen_timestamp); + } + SyncMessage::UnknownBlock(peer_id, block) => { + // If we are not synced or within SLOT_IMPORT_TOLERANCE of the block, ignore + if !self.network_globals.sync_state.read().is_synced() { + let head_slot = self.chain.canonical_head.cached_head().head_slot(); + let unknown_block_slot = block.slot(); + + // if the block is far in the future, ignore it. If its within the slot tolerance of + // our current head, regardless of the syncing state, fetch it. + if (head_slot >= unknown_block_slot + && head_slot.sub(unknown_block_slot).as_usize() > SLOT_IMPORT_TOLERANCE) + || (head_slot < unknown_block_slot + && unknown_block_slot.sub(head_slot).as_usize() > SLOT_IMPORT_TOLERANCE) + { + return; } - SyncMessage::UnknownBlockHash(peer_id, block_hash) => { - // If we are not synced, ignore this block. - if self.network_globals.sync_state.read().is_synced() - && self.network_globals.peers.read().is_connected(&peer_id) - { - self.block_lookups - .search_block(block_hash, peer_id, &mut self.network); - } - } - SyncMessage::Disconnect(peer_id) => { - self.peer_disconnect(&peer_id); - } - SyncMessage::RpcError { - peer_id, - request_id, - } => self.inject_error(peer_id, request_id), - SyncMessage::BlockProcessed { - process_type, + } + if self.network_globals.peers.read().is_connected(&peer_id) + && self.network.is_execution_engine_online() + { + self.block_lookups + .search_parent(block, peer_id, &mut self.network); + } + } + SyncMessage::UnknownBlockHash(peer_id, block_hash) => { + // If we are not synced, ignore this block. + if self.network_globals.sync_state.read().is_synced() + && self.network_globals.peers.read().is_connected(&peer_id) + && self.network.is_execution_engine_online() + { + self.block_lookups + .search_block(block_hash, peer_id, &mut self.network); + } + } + SyncMessage::Disconnect(peer_id) => { + self.peer_disconnect(&peer_id); + } + SyncMessage::RpcError { + peer_id, + request_id, + } => self.inject_error(peer_id, request_id), + SyncMessage::BlockProcessed { + process_type, + result, + } => match process_type { + BlockProcessType::SingleBlock { id } => { + self.block_lookups + .single_block_processed(id, result, &mut self.network) + } + BlockProcessType::ParentLookup { chain_hash } => self + .block_lookups + .parent_block_processed(chain_hash, result, &mut self.network), + }, + SyncMessage::BatchProcessed { sync_type, result } => match sync_type { + ChainSegmentProcessId::RangeBatchId(chain_id, epoch, _) => { + self.range_sync.handle_block_process_result( + &mut self.network, + chain_id, + epoch, result, - } => match process_type { - BlockProcessType::SingleBlock { id } => self - .block_lookups - .single_block_processed(id, result, &mut self.network), - BlockProcessType::ParentLookup { chain_hash } => self - .block_lookups - .parent_block_processed(chain_hash, result, &mut self.network), - }, - SyncMessage::BatchProcessed { sync_type, result } => match sync_type { - ChainSegmentProcessId::RangeBatchId(chain_id, epoch, _) => { - self.range_sync.handle_block_process_result( - &mut self.network, - chain_id, - epoch, - result, - ); + ); + self.update_sync_state(); + } + ChainSegmentProcessId::BackSyncBatchId(epoch) => { + match self.backfill_sync.on_batch_process_result( + &mut self.network, + epoch, + &result, + ) { + Ok(ProcessResult::Successful) => {} + Ok(ProcessResult::SyncCompleted) => self.update_sync_state(), + Err(error) => { + error!(self.log, "Backfill sync failed"; "error" => ?error); + // Update the global status self.update_sync_state(); } - ChainSegmentProcessId::BackSyncBatchId(epoch) => { - match self.backfill_sync.on_batch_process_result( - &mut self.network, - epoch, - &result, - ) { - Ok(ProcessResult::Successful) => {} - Ok(ProcessResult::SyncCompleted) => self.update_sync_state(), - Err(error) => { - error!(self.log, "Backfill sync failed"; "error" => ?error); - // Update the global status - self.update_sync_state(); - } - } - } - ChainSegmentProcessId::ParentLookup(chain_hash) => self - .block_lookups - .parent_chain_processed(chain_hash, result, &mut self.network), - }, + } + } + ChainSegmentProcessId::ParentLookup(chain_hash) => self + .block_lookups + .parent_chain_processed(chain_hash, result, &mut self.network), + }, + } + } + + fn handle_new_execution_engine_state(&mut self, engine_state: EngineState) { + self.network.update_execution_engine_state(engine_state); + + match engine_state { + EngineState::Online => { + // Resume sync components. + + // - Block lookups: + // We start searching for blocks again. This is done by updating the stored ee online + // state. No further action required. + + // - Parent lookups: + // We start searching for parents again. This is done by updating the stored ee + // online state. No further action required. + + // - Range: + // Actively resume. + self.range_sync.resume(&mut self.network); + + // - Backfill: + // Not affected by ee states, nothing to do. + } + + EngineState::Offline => { + // Pause sync components. + + // - Block lookups: + // Disabled while in this state. We drop current requests and don't search for new + // blocks. + let dropped_single_blocks_requests = + self.block_lookups.drop_single_block_requests(); + + // - Parent lookups: + // Disabled while in this state. We drop current requests and don't search for new + // blocks. + let dropped_parent_chain_requests = self.block_lookups.drop_parent_chain_requests(); + + // - Range: + // We still send found peers to range so that it can keep track of potential chains + // with respect to our current peers. Range will stop processing batches in the + // meantime. No further action from the manager is required for this. + + // - Backfill: Not affected by ee states, nothing to do. + + // Some logs. + if dropped_single_blocks_requests > 0 || dropped_parent_chain_requests > 0 { + debug!(self.log, "Execution engine not online, dropping active requests."; + "dropped_single_blocks_requests" => dropped_single_blocks_requests, + "dropped_parent_chain_requests" => dropped_parent_chain_requests, + ); } } } diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index ffbd1a64d..45ade7034 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -3,24 +3,25 @@ use super::manager::{Id, RequestId as SyncRequestId}; use super::range_sync::{BatchId, ChainId}; +use crate::beacon_processor::WorkEvent; use crate::service::{NetworkMessage, RequestId}; use crate::status::ToStatusMessage; +use beacon_chain::{BeaconChainTypes, EngineState}; use fnv::FnvHashMap; use lighthouse_network::rpc::{BlocksByRangeRequest, BlocksByRootRequest, GoodbyeReason}; use lighthouse_network::{Client, NetworkGlobals, PeerAction, PeerId, ReportSource, Request}; use slog::{debug, trace, warn}; use std::sync::Arc; use tokio::sync::mpsc; -use types::EthSpec; /// Wraps a Network channel to employ various RPC related network functionality for the Sync manager. This includes management of a global RPC request Id. -pub struct SyncNetworkContext { +pub struct SyncNetworkContext { /// The network channel to relay messages to the Network service. - network_send: mpsc::UnboundedSender>, + network_send: mpsc::UnboundedSender>, /// Access to the network global vars. - network_globals: Arc>, + network_globals: Arc>, /// A sequential ID for all RPC requests. request_id: Id, @@ -28,24 +29,35 @@ pub struct SyncNetworkContext { /// BlocksByRange requests made by the range syncing algorithm. range_requests: FnvHashMap, + /// BlocksByRange requests made by backfill syncing. backfill_requests: FnvHashMap, + /// Whether the ee is online. If it's not, we don't allow access to the + /// `beacon_processor_send`. + execution_engine_state: EngineState, + + /// Channel to send work to the beacon processor. + beacon_processor_send: mpsc::Sender>, + /// Logger for the `SyncNetworkContext`. log: slog::Logger, } -impl SyncNetworkContext { +impl SyncNetworkContext { pub fn new( - network_send: mpsc::UnboundedSender>, - network_globals: Arc>, + network_send: mpsc::UnboundedSender>, + network_globals: Arc>, + beacon_processor_send: mpsc::Sender>, log: slog::Logger, ) -> Self { Self { network_send, + execution_engine_state: EngineState::Online, // always assume `Online` at the start network_globals, request_id: 1, range_requests: FnvHashMap::default(), backfill_requests: FnvHashMap::default(), + beacon_processor_send, log, } } @@ -211,6 +223,16 @@ impl SyncNetworkContext { Ok(id) } + pub fn is_execution_engine_online(&self) -> bool { + self.execution_engine_state == EngineState::Online + } + + pub fn update_execution_engine_state(&mut self, engine_state: EngineState) { + debug!(self.log, "Sync's view on execution engine state updated"; + "past_state" => ?self.execution_engine_state, "new_state" => ?engine_state); + self.execution_engine_state = engine_state; + } + /// Terminates the connection with the peer and bans them. pub fn goodbye_peer(&mut self, peer_id: PeerId, reason: GoodbyeReason) { self.network_send @@ -249,13 +271,22 @@ impl SyncNetworkContext { } /// Sends an arbitrary network message. - fn send_network_msg(&mut self, msg: NetworkMessage) -> Result<(), &'static str> { + fn send_network_msg(&mut self, msg: NetworkMessage) -> Result<(), &'static str> { self.network_send.send(msg).map_err(|_| { debug!(self.log, "Could not send message to the network service"); "Network channel send Failed" }) } + pub fn processor_channel_if_enabled(&self) -> Option<&mpsc::Sender>> { + self.is_execution_engine_online() + .then_some(&self.beacon_processor_send) + } + + pub fn processor_channel(&self) -> &mpsc::Sender> { + &self.beacon_processor_send + } + fn next_id(&mut self) -> Id { let id = self.request_id; self.request_id += 1; diff --git a/beacon_node/network/src/sync/range_sync/chain.rs b/beacon_node/network/src/sync/range_sync/chain.rs index a54105f5c..4226b600f 100644 --- a/beacon_node/network/src/sync/range_sync/chain.rs +++ b/beacon_node/network/src/sync/range_sync/chain.rs @@ -11,7 +11,6 @@ use slog::{crit, debug, o, warn}; use std::collections::{btree_map::Entry, BTreeMap, HashSet}; use std::hash::{Hash, Hasher}; use std::sync::Arc; -use tokio::sync::mpsc::Sender; use types::{Epoch, EthSpec, Hash256, SignedBeaconBlock, Slot}; /// Blocks are downloaded in batches from peers. This constant specifies how many epochs worth of @@ -102,9 +101,6 @@ pub struct SyncingChain { /// Batches validated by this chain. validated_batches: u64, - /// A multi-threaded, non-blocking processor for applying messages to the beacon chain. - beacon_processor_send: Sender>, - is_finalized_segment: bool, /// The chain's log. @@ -132,7 +128,6 @@ impl SyncingChain { target_head_slot: Slot, target_head_root: Hash256, peer_id: PeerId, - beacon_processor_send: Sender>, is_finalized_segment: bool, log: &slog::Logger, ) -> Self { @@ -155,7 +150,6 @@ impl SyncingChain { state: ChainSyncingState::Stopped, current_processing_batch: None, validated_batches: 0, - beacon_processor_send, is_finalized_segment, log: log.new(o!("chain" => id)), } @@ -186,7 +180,7 @@ impl SyncingChain { pub fn remove_peer( &mut self, peer_id: &PeerId, - network: &mut SyncNetworkContext, + network: &mut SyncNetworkContext, ) -> ProcessingResult { if let Some(batch_ids) = self.peers.remove(peer_id) { // fail the batches @@ -227,7 +221,7 @@ impl SyncingChain { /// If the block correctly completes the batch it will be processed if possible. pub fn on_block_response( &mut self, - network: &mut SyncNetworkContext, + network: &mut SyncNetworkContext, batch_id: BatchId, peer_id: &PeerId, request_id: Id, @@ -296,7 +290,7 @@ impl SyncingChain { /// The batch must exist and be ready for processing fn process_batch( &mut self, - network: &mut SyncNetworkContext, + network: &mut SyncNetworkContext, batch_id: BatchId, ) -> ProcessingResult { // Only process batches if this chain is Syncing, and only one at a time @@ -304,6 +298,11 @@ impl SyncingChain { return Ok(KeepChain); } + let beacon_processor_send = match network.processor_channel_if_enabled() { + Some(channel) => channel, + None => return Ok(KeepChain), + }; + let batch = match self.batches.get_mut(&batch_id) { Some(batch) => batch, None => { @@ -327,9 +326,8 @@ impl SyncingChain { let process_id = ChainSegmentProcessId::RangeBatchId(self.id, batch_id, count_unrealized); self.current_processing_batch = Some(batch_id); - if let Err(e) = self - .beacon_processor_send - .try_send(BeaconWorkEvent::chain_segment(process_id, blocks)) + if let Err(e) = + beacon_processor_send.try_send(BeaconWorkEvent::chain_segment(process_id, blocks)) { crit!(self.log, "Failed to send chain segment to processor."; "msg" => "process_batch", "error" => %e, "batch" => self.processing_target); @@ -346,7 +344,7 @@ impl SyncingChain { /// Processes the next ready batch, prioritizing optimistic batches over the processing target. fn process_completed_batches( &mut self, - network: &mut SyncNetworkContext, + network: &mut SyncNetworkContext, ) -> ProcessingResult { // Only process batches if this chain is Syncing and only process one batch at a time if self.state != ChainSyncingState::Syncing || self.current_processing_batch.is_some() { @@ -447,7 +445,7 @@ impl SyncingChain { /// of the batch processor. pub fn on_batch_process_result( &mut self, - network: &mut SyncNetworkContext, + network: &mut SyncNetworkContext, batch_id: BatchId, result: &BatchProcessResult, ) -> ProcessingResult { @@ -580,7 +578,7 @@ impl SyncingChain { fn reject_optimistic_batch( &mut self, - network: &mut SyncNetworkContext, + network: &mut SyncNetworkContext, redownload: bool, reason: &str, ) -> ProcessingResult { @@ -611,11 +609,7 @@ impl SyncingChain { /// /// If a previous batch has been validated and it had been re-processed, penalize the original /// peer. - fn advance_chain( - &mut self, - network: &mut SyncNetworkContext, - validating_epoch: Epoch, - ) { + fn advance_chain(&mut self, network: &mut SyncNetworkContext, validating_epoch: Epoch) { // make sure this epoch produces an advancement if validating_epoch <= self.start_epoch { return; @@ -719,7 +713,7 @@ impl SyncingChain { /// intended and can result in downvoting a peer. fn handle_invalid_batch( &mut self, - network: &mut SyncNetworkContext, + network: &mut SyncNetworkContext, batch_id: BatchId, ) -> ProcessingResult { // The current batch could not be processed, indicating either the current or previous @@ -778,7 +772,7 @@ impl SyncingChain { /// This could be new chain, or an old chain that is being resumed. pub fn start_syncing( &mut self, - network: &mut SyncNetworkContext, + network: &mut SyncNetworkContext, local_finalized_epoch: Epoch, optimistic_start_epoch: Epoch, ) -> ProcessingResult { @@ -816,7 +810,7 @@ impl SyncingChain { /// If the chain is active, this starts requesting batches from this peer. pub fn add_peer( &mut self, - network: &mut SyncNetworkContext, + network: &mut SyncNetworkContext, peer_id: PeerId, ) -> ProcessingResult { // add the peer without overwriting its active requests @@ -833,7 +827,7 @@ impl SyncingChain { /// If the batch exists it is re-requested. pub fn inject_error( &mut self, - network: &mut SyncNetworkContext, + network: &mut SyncNetworkContext, batch_id: BatchId, peer_id: &PeerId, request_id: Id, @@ -865,7 +859,7 @@ impl SyncingChain { /// Sends and registers the request of a batch awaiting download. pub fn retry_batch_download( &mut self, - network: &mut SyncNetworkContext, + network: &mut SyncNetworkContext, batch_id: BatchId, ) -> ProcessingResult { let batch = match self.batches.get_mut(&batch_id) { @@ -898,7 +892,7 @@ impl SyncingChain { /// Requests the batch assigned to the given id from a given peer. pub fn send_batch( &mut self, - network: &mut SyncNetworkContext, + network: &mut SyncNetworkContext, batch_id: BatchId, peer: PeerId, ) -> ProcessingResult { @@ -967,12 +961,21 @@ impl SyncingChain { } } + /// Kickstarts the chain by sending for processing batches that are ready and requesting more + /// batches if needed. + pub fn resume( + &mut self, + network: &mut SyncNetworkContext, + ) -> Result { + // Request more batches if needed. + self.request_batches(network)?; + // If there is any batch ready for processing, send it. + self.process_completed_batches(network) + } + /// Attempts to request the next required batches from the peer pool if the chain is syncing. It will exhaust the peer /// pool and left over batches until the batch buffer is reached or all peers are exhausted. - fn request_batches( - &mut self, - network: &mut SyncNetworkContext, - ) -> ProcessingResult { + fn request_batches(&mut self, network: &mut SyncNetworkContext) -> ProcessingResult { if !matches!(self.state, ChainSyncingState::Syncing) { return Ok(KeepChain); } diff --git a/beacon_node/network/src/sync/range_sync/chain_collection.rs b/beacon_node/network/src/sync/range_sync/chain_collection.rs index e76adff3a..37a3f13e7 100644 --- a/beacon_node/network/src/sync/range_sync/chain_collection.rs +++ b/beacon_node/network/src/sync/range_sync/chain_collection.rs @@ -6,7 +6,6 @@ use super::block_storage::BlockStorage; use super::chain::{ChainId, ProcessingResult, RemoveChain, SyncingChain}; use super::sync_type::RangeSyncType; -use crate::beacon_processor::WorkEvent as BeaconWorkEvent; use crate::metrics; use crate::sync::network_context::SyncNetworkContext; use beacon_chain::BeaconChainTypes; @@ -18,7 +17,6 @@ use smallvec::SmallVec; use std::collections::hash_map::Entry; use std::collections::HashMap; use std::sync::Arc; -use tokio::sync::mpsc; use types::EthSpec; use types::{Epoch, Hash256, Slot}; @@ -193,10 +191,9 @@ impl ChainCollection { /// do so. pub fn update( &mut self, - network: &mut SyncNetworkContext, + network: &mut SyncNetworkContext, local: &SyncInfo, awaiting_head_peers: &mut HashMap, - beacon_processor_send: &mpsc::Sender>, ) { // Remove any outdated finalized/head chains self.purge_outdated_chains(local, awaiting_head_peers); @@ -212,7 +209,6 @@ impl ChainCollection { local.finalized_epoch, local_head_epoch, awaiting_head_peers, - beacon_processor_send, ); } } @@ -257,7 +253,7 @@ impl ChainCollection { /// or not. fn update_finalized_chains( &mut self, - network: &mut SyncNetworkContext, + network: &mut SyncNetworkContext, local_epoch: Epoch, local_head_epoch: Epoch, ) { @@ -326,11 +322,10 @@ impl ChainCollection { /// Start syncing any head chains if required. fn update_head_chains( &mut self, - network: &mut SyncNetworkContext, + network: &mut SyncNetworkContext, local_epoch: Epoch, local_head_epoch: Epoch, awaiting_head_peers: &mut HashMap, - beacon_processor_send: &mpsc::Sender>, ) { // Include the awaiting head peers for (peer_id, peer_sync_info) in awaiting_head_peers.drain() { @@ -341,7 +336,6 @@ impl ChainCollection { peer_sync_info.head_slot, peer_id, RangeSyncType::Head, - beacon_processor_send, network, ); } @@ -468,8 +462,7 @@ impl ChainCollection { target_head_slot: Slot, peer: PeerId, sync_type: RangeSyncType, - beacon_processor_send: &mpsc::Sender>, - network: &mut SyncNetworkContext, + network: &mut SyncNetworkContext, ) { let id = SyncingChain::::id(&target_head_root, &target_head_slot); let (collection, is_finalized) = if let RangeSyncType::Finalized = sync_type { @@ -500,7 +493,6 @@ impl ChainCollection { target_head_slot, target_head_root, peer, - beacon_processor_send.clone(), is_finalized, &self.log, ); diff --git a/beacon_node/network/src/sync/range_sync/range.rs b/beacon_node/network/src/sync/range_sync/range.rs index 4b29d3129..253145438 100644 --- a/beacon_node/network/src/sync/range_sync/range.rs +++ b/beacon_node/network/src/sync/range_sync/range.rs @@ -43,7 +43,6 @@ use super::block_storage::BlockStorage; use super::chain::{BatchId, ChainId, RemoveChain, SyncingChain}; use super::chain_collection::ChainCollection; use super::sync_type::RangeSyncType; -use crate::beacon_processor::WorkEvent as BeaconWorkEvent; use crate::status::ToStatusMessage; use crate::sync::manager::Id; use crate::sync::network_context::SyncNetworkContext; @@ -56,7 +55,6 @@ use lru_cache::LRUTimeCache; use slog::{crit, debug, trace, warn}; use std::collections::HashMap; use std::sync::Arc; -use tokio::sync::mpsc; use types::{Epoch, EthSpec, Hash256, SignedBeaconBlock, Slot}; /// For how long we store failed finalized chains to prevent retries. @@ -76,8 +74,6 @@ pub struct RangeSync> { chains: ChainCollection, /// Chains that have failed and are stored to prevent being retried. failed_chains: LRUTimeCache, - /// A multi-threaded, non-blocking processor for applying messages to the beacon chain. - beacon_processor_send: mpsc::Sender>, /// The syncing logger. log: slog::Logger, } @@ -87,11 +83,7 @@ where C: BlockStorage + ToStatusMessage, T: BeaconChainTypes, { - pub fn new( - beacon_chain: Arc, - beacon_processor_send: mpsc::Sender>, - log: slog::Logger, - ) -> Self { + pub fn new(beacon_chain: Arc, log: slog::Logger) -> Self { RangeSync { beacon_chain: beacon_chain.clone(), chains: ChainCollection::new(beacon_chain, log.clone()), @@ -99,7 +91,6 @@ where FAILED_CHAINS_EXPIRY_SECONDS, )), awaiting_head_peers: HashMap::new(), - beacon_processor_send, log, } } @@ -117,7 +108,7 @@ where /// prioritised by peer-pool size. pub fn add_peer( &mut self, - network: &mut SyncNetworkContext, + network: &mut SyncNetworkContext, local_info: SyncInfo, peer_id: PeerId, remote_info: SyncInfo, @@ -159,16 +150,11 @@ where remote_finalized_slot, peer_id, RangeSyncType::Finalized, - &self.beacon_processor_send, network, ); - self.chains.update( - network, - &local_info, - &mut self.awaiting_head_peers, - &self.beacon_processor_send, - ); + self.chains + .update(network, &local_info, &mut self.awaiting_head_peers); } RangeSyncType::Head => { // This peer requires a head chain sync @@ -197,15 +183,10 @@ where remote_info.head_slot, peer_id, RangeSyncType::Head, - &self.beacon_processor_send, network, ); - self.chains.update( - network, - &local_info, - &mut self.awaiting_head_peers, - &self.beacon_processor_send, - ); + self.chains + .update(network, &local_info, &mut self.awaiting_head_peers); } } } @@ -216,7 +197,7 @@ where /// This request could complete a chain or simply add to its progress. pub fn blocks_by_range_response( &mut self, - network: &mut SyncNetworkContext, + network: &mut SyncNetworkContext, peer_id: PeerId, chain_id: ChainId, batch_id: BatchId, @@ -246,7 +227,7 @@ where pub fn handle_block_process_result( &mut self, - network: &mut SyncNetworkContext, + network: &mut SyncNetworkContext, chain_id: ChainId, batch_id: Epoch, result: BatchProcessResult, @@ -276,11 +257,7 @@ where /// A peer has disconnected. This removes the peer from any ongoing chains and mappings. A /// disconnected peer could remove a chain - pub fn peer_disconnect( - &mut self, - network: &mut SyncNetworkContext, - peer_id: &PeerId, - ) { + pub fn peer_disconnect(&mut self, network: &mut SyncNetworkContext, peer_id: &PeerId) { // if the peer is in the awaiting head mapping, remove it self.awaiting_head_peers.remove(peer_id); @@ -292,7 +269,7 @@ where /// which pool the peer is in. The chain may also have a batch or batches awaiting /// for this peer. If so we mark the batch as failed. The batch may then hit it's maximum /// retries. In this case, we need to remove the chain. - fn remove_peer(&mut self, network: &mut SyncNetworkContext, peer_id: &PeerId) { + fn remove_peer(&mut self, network: &mut SyncNetworkContext, peer_id: &PeerId) { for (removed_chain, sync_type, remove_reason) in self .chains .call_all(|chain| chain.remove_peer(peer_id, network)) @@ -304,8 +281,6 @@ where network, "peer removed", ); - - // update the state of the collection } } @@ -315,7 +290,7 @@ where /// been too many failed attempts for the batch, remove the chain. pub fn inject_error( &mut self, - network: &mut SyncNetworkContext, + network: &mut SyncNetworkContext, peer_id: PeerId, batch_id: BatchId, chain_id: ChainId, @@ -347,7 +322,7 @@ where chain: SyncingChain, sync_type: RangeSyncType, remove_reason: RemoveChain, - network: &mut SyncNetworkContext, + network: &mut SyncNetworkContext, op: &'static str, ) { if remove_reason.is_critical() { @@ -374,12 +349,23 @@ where }; // update the state of the collection - self.chains.update( - network, - &local, - &mut self.awaiting_head_peers, - &self.beacon_processor_send, - ); + self.chains + .update(network, &local, &mut self.awaiting_head_peers); + } + + /// Kickstarts sync. + pub fn resume(&mut self, network: &mut SyncNetworkContext) { + for (removed_chain, sync_type, remove_reason) in + self.chains.call_all(|chain| chain.resume(network)) + { + self.on_chain_removed( + removed_chain, + sync_type, + remove_reason, + network, + "chain resumed", + ); + } } } @@ -389,13 +375,16 @@ mod tests { use crate::NetworkMessage; use super::*; + use crate::beacon_processor::WorkEvent as BeaconWorkEvent; use beacon_chain::builder::Witness; use beacon_chain::eth1_chain::CachingEth1Backend; use beacon_chain::parking_lot::RwLock; + use beacon_chain::EngineState; use lighthouse_network::rpc::BlocksByRangeRequest; use lighthouse_network::Request; use lighthouse_network::{rpc::StatusMessage, NetworkGlobals}; use slog::{o, Drain}; + use tokio::sync::mpsc; use slot_clock::SystemTimeSlotClock; use std::collections::HashSet; @@ -470,7 +459,7 @@ mod tests { /// To set up different scenarios where sync is told about known/unkown blocks. chain: Arc, /// Needed by range to handle communication with the network. - cx: SyncNetworkContext, + cx: SyncNetworkContext, /// To check what the network receives from Range. network_rx: mpsc::UnboundedReceiver>, /// To modify what the network declares about various global variables, in particular about @@ -516,12 +505,13 @@ mod tests { } /// Reads an BlocksByRange request to a given peer from the network receiver channel. + #[track_caller] fn grab_request(&mut self, expected_peer: &PeerId) -> (RequestId, BlocksByRangeRequest) { - if let Some(NetworkMessage::SendRequest { + if let Ok(NetworkMessage::SendRequest { peer_id, request: Request::BlocksByRange(request), request_id, - }) = self.network_rx.blocking_recv() + }) = self.network_rx.try_recv() { assert_eq!(&peer_id, expected_peer); (request_id, request) @@ -575,6 +565,29 @@ mod tests { let peer_id = PeerId::random(); (peer_id, local_info, remote_info) } + + #[track_caller] + fn expect_empty_processor(&mut self) { + match self.beacon_processor_rx.try_recv() { + Ok(work) => { + panic!("Expected empty processor. Instead got {}", work.work_type()); + } + Err(e) => match e { + mpsc::error::TryRecvError::Empty => {} + mpsc::error::TryRecvError::Disconnected => unreachable!("bad coded test?"), + }, + } + } + + #[track_caller] + fn expect_chain_segment(&mut self) { + match self.beacon_processor_rx.try_recv() { + Ok(work) => { + assert_eq!(work.work_type(), crate::beacon_processor::CHAIN_SEGMENT); + } + other => panic!("Expected chain segment process, found {:?}", other), + } + } } fn range(log_enabled: bool) -> (TestRig, RangeSync) { @@ -583,7 +596,6 @@ mod tests { let (beacon_processor_tx, beacon_processor_rx) = mpsc::channel(10); let range_sync = RangeSync::::new( chain.clone(), - beacon_processor_tx, log.new(o!("component" => "range")), ); let (network_tx, network_rx) = mpsc::unbounded_channel(); @@ -591,6 +603,7 @@ mod tests { let cx = SyncNetworkContext::new( network_tx, globals.clone(), + beacon_processor_tx, log.new(o!("component" => "network_context")), ); let test_rig = TestRig { @@ -661,4 +674,53 @@ mod tests { let (finalized_peer, local_info, remote_info) = rig.finalized_peer(); range.add_peer(&mut rig.cx, local_info, finalized_peer, remote_info); } + + #[test] + fn pause_and_resume_on_ee_offline() { + let (mut rig, mut range) = range(true); + + // add some peers + let (peer1, local_info, head_info) = rig.head_peer(); + range.add_peer(&mut rig.cx, local_info, peer1, head_info); + let ((chain1, batch1), id1) = match rig.grab_request(&peer1).0 { + RequestId::Sync(crate::sync::manager::RequestId::RangeSync { id }) => { + (rig.cx.range_sync_response(id, true).unwrap(), id) + } + other => panic!("unexpected request {:?}", other), + }; + + // make the ee offline + rig.cx.update_execution_engine_state(EngineState::Offline); + + // send the response to the request + range.blocks_by_range_response(&mut rig.cx, peer1, chain1, batch1, id1, None); + + // the beacon processor shouldn't have received any work + rig.expect_empty_processor(); + + // while the ee is offline, more peers might arrive. Add a new finalized peer. + let (peer2, local_info, finalized_info) = rig.finalized_peer(); + range.add_peer(&mut rig.cx, local_info, peer2, finalized_info); + let ((chain2, batch2), id2) = match rig.grab_request(&peer2).0 { + RequestId::Sync(crate::sync::manager::RequestId::RangeSync { id }) => { + (rig.cx.range_sync_response(id, true).unwrap(), id) + } + other => panic!("unexpected request {:?}", other), + }; + + // send the response to the request + range.blocks_by_range_response(&mut rig.cx, peer2, chain2, batch2, id2, None); + + // the beacon processor shouldn't have received any work + rig.expect_empty_processor(); + + // make the beacon processor available again. + rig.cx.update_execution_engine_state(EngineState::Online); + + // now resume range, we should have two processing requests in the beacon processor. + range.resume(&mut rig.cx); + + rig.expect_chain_segment(); + rig.expect_chain_segment(); + } }