From 6c0c050fbb6c256dcd40dfa30b55ea37ea7b8ce0 Mon Sep 17 00:00:00 2001 From: divma Date: Sun, 1 Nov 2020 23:37:39 +0000 Subject: [PATCH] Tweak head syncing (#1845) ## Issue Addressed Fixes head syncing ## Proposed Changes - Go back to re-statusing peers after removing chain segments, and make the peer manager handle statusing according to the sync state, preventing an old known deadlock - Also fixes a bug where a chain would get removed if the optimistic batch succeeded but was empty ## Additional Info Tested on Medalla and looking good --- .../eth2_libp2p/src/peer_manager/mod.rs | 22 +++-- .../eth2_libp2p/src/types/sync_state.rs | 6 ++ beacon_node/http_api/src/lib.rs | 14 +-- beacon_node/network/src/sync/manager.rs | 5 +- .../network/src/sync/range_sync/chain.rs | 32 ++++++- .../src/sync/range_sync/chain_collection.rs | 25 ++++-- .../network/src/sync/range_sync/range.rs | 90 ++++++++++++------- 7 files changed, 134 insertions(+), 60 deletions(-) diff --git a/beacon_node/eth2_libp2p/src/peer_manager/mod.rs b/beacon_node/eth2_libp2p/src/peer_manager/mod.rs index 31104204b..182f7a204 100644 --- a/beacon_node/eth2_libp2p/src/peer_manager/mod.rs +++ b/beacon_node/eth2_libp2p/src/peer_manager/mod.rs @@ -3,6 +3,7 @@ pub use self::peerdb::*; use crate::discovery::{subnet_predicate, Discovery, DiscoveryEvent, TARGET_SUBNET_PEERS}; use crate::rpc::{GoodbyeReason, MetaData, Protocol, RPCError, RPCResponseErrorCode}; +use crate::types::SyncState; use crate::{error, metrics}; use crate::{EnrExt, NetworkConfig, NetworkGlobals, PeerId, SubnetDiscovery}; use futures::prelude::*; @@ -844,16 +845,19 @@ impl Stream for PeerManager { } } - loop { - match self.status_peers.poll_next_unpin(cx) { - Poll::Ready(Some(Ok(peer_id))) => { - self.status_peers.insert(peer_id.clone()); - self.events.push(PeerManagerEvent::Status(peer_id)) + if !matches!(self.network_globals.sync_state(), SyncState::SyncingFinalized{..}|SyncState::SyncingHead{..}) + { + loop { + match 
self.status_peers.poll_next_unpin(cx) { + Poll::Ready(Some(Ok(peer_id))) => { + self.status_peers.insert(peer_id.clone()); + self.events.push(PeerManagerEvent::Status(peer_id)) + } + Poll::Ready(Some(Err(e))) => { + error!(self.log, "Failed to check for peers to ping"; "error" => e.to_string()) + } + Poll::Ready(None) | Poll::Pending => break, } - Poll::Ready(Some(Err(e))) => { - error!(self.log, "Failed to check for peers to ping"; "error" => e.to_string()) - } - Poll::Ready(None) | Poll::Pending => break, } } diff --git a/beacon_node/eth2_libp2p/src/types/sync_state.rs b/beacon_node/eth2_libp2p/src/types/sync_state.rs index c217ca1ff..628058abd 100644 --- a/beacon_node/eth2_libp2p/src/types/sync_state.rs +++ b/beacon_node/eth2_libp2p/src/types/sync_state.rs @@ -10,6 +10,9 @@ pub enum SyncState { /// The node is performing a long-range (batch) sync over one or many head chains. /// In this state parent lookups are disabled. SyncingHead { start_slot: Slot, target_slot: Slot }, + /// The node has identified the need for sync operations and is transitioning to a syncing + /// state. + SyncTransition, /// The node is up to date with all known peers and is connected to at least one /// fully synced peer. In this state, parent lookups are enabled. Synced, @@ -25,6 +28,7 @@ impl PartialEq for SyncState { (SyncState::SyncingHead { .. }, SyncState::SyncingHead { .. }) => true, (SyncState::Synced, SyncState::Synced) => true, (SyncState::Stalled, SyncState::Stalled) => true, + (SyncState::SyncTransition, SyncState::SyncTransition) => true, _ => false, } } @@ -36,6 +40,7 @@ impl SyncState { match self { SyncState::SyncingFinalized { .. } => true, SyncState::SyncingHead { .. } => true, + SyncState::SyncTransition => true, SyncState::Synced => false, SyncState::Stalled => false, } @@ -54,6 +59,7 @@ impl std::fmt::Display for SyncState { SyncState::SyncingHead { .. } => write!(f, "Syncing Head Chain"), SyncState::Synced { .. } => write!(f, "Synced"), SyncState::Stalled { .. 
} => write!(f, "Stalled"), + SyncState::SyncTransition => write!(f, "Searching syncing peers"), } } } diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index 67f52a455..cb74e66f4 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -330,7 +330,7 @@ pub fn serve( ))) } } - SyncState::SyncingHead { .. } => Ok(()), + SyncState::SyncingHead { .. } | SyncState::SyncTransition => Ok(()), SyncState::Synced => Ok(()), SyncState::Stalled => Err(warp_utils::reject::not_synced( "sync is stalled".to_string(), @@ -1231,12 +1231,12 @@ pub fn serve( .and(network_globals.clone()) .and_then(|network_globals: Arc>| { blocking_task(move || match *network_globals.sync_state.read() { - SyncState::SyncingFinalized { .. } | SyncState::SyncingHead { .. } => { - Ok(warp::reply::with_status( - warp::reply(), - warp::http::StatusCode::PARTIAL_CONTENT, - )) - } + SyncState::SyncingFinalized { .. } + | SyncState::SyncingHead { .. } + | SyncState::SyncTransition => Ok(warp::reply::with_status( + warp::reply(), + warp::http::StatusCode::PARTIAL_CONTENT, + )), SyncState::Synced => Ok(warp::reply::with_status( warp::reply(), warp::http::StatusCode::OK, diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index 6b5fdffda..18d9f9aea 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -691,10 +691,7 @@ impl SyncManager { { SyncState::Synced } else if peers.advanced_peers().next().is_some() { - SyncState::SyncingHead { - start_slot: head, - target_slot: current_slot, - } + SyncState::SyncTransition } else if peers.synced_peers().next().is_none() { SyncState::Stalled } else { diff --git a/beacon_node/network/src/sync/range_sync/chain.rs b/beacon_node/network/src/sync/range_sync/chain.rs index 295991365..d850c3d1c 100644 --- a/beacon_node/network/src/sync/range_sync/chain.rs +++ b/beacon_node/network/src/sync/range_sync/chain.rs @@ -91,6 +91,9 @@ pub 
struct SyncingChain { /// The current processing batch, if any. current_processing_batch: Option, + /// Batches validated by this chain. + validated_batches: u8, + /// A multi-threaded, non-blocking processor for applying messages to the beacon chain. beacon_processor_send: Sender>, @@ -140,6 +143,7 @@ impl SyncingChain { attempted_optimistic_starts: HashSet::default(), state: ChainSyncingState::Stopped, current_processing_batch: None, + validated_batches: 0, beacon_processor_send, log: log.new(o!("chain" => id)), } @@ -155,6 +159,16 @@ impl SyncingChain { self.id } + /// Peers currently syncing this chain. + pub fn peers<'a>(&'a self) -> impl Iterator + 'a { + self.peers.keys().cloned() + } + + /// Progress in epochs made by the chain + pub fn validated_epochs(&self) -> u64 { + self.validated_batches as u64 * EPOCHS_PER_BATCH + } + /// Removes a peer from the chain. /// If the peer has active batches, those are considered failed and re-requested. pub fn remove_peer( @@ -447,6 +461,7 @@ impl SyncingChain { self.advance_chain(network, batch_id); // we register so that on chain switching we don't try it again self.attempted_optimistic_starts.insert(batch_id); + self.processing_target += EPOCHS_PER_BATCH; } else if let Some(epoch) = self.optimistic_start { // check if this batch corresponds to an optimistic batch. 
In this case, we // reject it as an optimistic candidate since the batch was empty @@ -456,11 +471,11 @@ impl SyncingChain { false, /* do not re-request */ "batch was empty", )?; + } else { + self.processing_target += EPOCHS_PER_BATCH; } } - self.processing_target += EPOCHS_PER_BATCH; - // check if the chain has completed syncing if self.current_processed_slot() >= self.target_head_slot { // chain is completed @@ -574,6 +589,7 @@ impl SyncingChain { let removed_batches = std::mem::replace(&mut self.batches, remaining_batches); for (id, batch) in removed_batches.into_iter() { + self.validated_batches = self.validated_batches.saturating_add(1); // only for batches awaiting validation can we be sure the last attempt is // right, and thus, that any different attempt is wrong match batch.state() { @@ -1024,6 +1040,7 @@ impl slog::KV for SyncingChain { )?; serializer.emit_usize("batches", self.batches.len())?; serializer.emit_usize("peers", self.peers.len())?; + serializer.emit_u8("validated_batches", self.validated_batches)?; slog::Result::Ok(()) } } @@ -1037,7 +1054,7 @@ impl From for RemoveChain { impl std::fmt::Debug for RemoveChain { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - // needed to avoid Debuggins Strings + // needed to avoid Debugging Strings match self { RemoveChain::ChainCompleted => f.write_str("ChainCompleted"), RemoveChain::EmptyPeerPool => f.write_str("EmptyPeerPool"), @@ -1047,3 +1064,12 @@ impl std::fmt::Debug for RemoveChain { } } } + +impl RemoveChain { + pub fn is_critical(&self) -> bool { + matches!( + self, + RemoveChain::WrongBatchState(..) | RemoveChain::WrongChainState(..) 
+ ) + } +} diff --git a/beacon_node/network/src/sync/range_sync/chain_collection.rs b/beacon_node/network/src/sync/range_sync/chain_collection.rs index c458cc847..22d5b23ee 100644 --- a/beacon_node/network/src/sync/range_sync/chain_collection.rs +++ b/beacon_node/network/src/sync/range_sync/chain_collection.rs @@ -11,7 +11,7 @@ use crate::sync::PeerSyncInfo; use beacon_chain::{BeaconChain, BeaconChainTypes}; use eth2_libp2p::PeerId; use fnv::FnvHashMap; -use slog::{debug, error}; +use slog::{crit, debug, error}; use smallvec::SmallVec; use std::collections::hash_map::Entry; use std::collections::HashMap; @@ -300,7 +300,7 @@ impl ChainCollection { match old_id { Some(Some(old_id)) => debug!(self.log, "Switching finalized chains"; "old_id" => old_id, &chain), - None => debug!(self.log, "Syncing new chain"; &chain), + None => debug!(self.log, "Syncing new finalized chain"; &chain), Some(None) => { // this is the same chain. We try to advance it. } @@ -311,8 +311,12 @@ impl ChainCollection { if let Err(remove_reason) = chain.start_syncing(network, local_epoch, local_head_epoch) { - // this happens only if sending a batch over the `network` fails a lot - error!(self.log, "Chain removed while switching chains"; "chain" => new_id, "reason" => ?remove_reason); + if remove_reason.is_critical() { + crit!(self.log, "Chain removed while switching chains"; "chain" => new_id, "reason" => ?remove_reason); + } else { + // this happens only if sending a batch over the `network` fails a lot + error!(self.log, "Chain removed while switching chains"; "chain" => new_id, "reason" => ?remove_reason); + } self.finalized_chains.remove(&new_id); self.on_chain_removed(&new_id, true); } @@ -330,6 +334,7 @@ impl ChainCollection { ) { // Include the awaiting head peers for (peer_id, peer_sync_info) in awaiting_head_peers.drain() { + debug!(self.log, "including head peer"); self.add_peer_or_create_chain( local_epoch, peer_sync_info.head_root, @@ -368,7 +373,11 @@ impl ChainCollection { 
chain.start_syncing(network, local_epoch, local_head_epoch) { self.head_chains.remove(&id); - error!(self.log, "Chain removed while switching head chains"; "chain" => id, "reason" => ?remove_reason); + if remove_reason.is_critical() { + crit!(self.log, "Chain removed while switching head chains"; "chain" => id, "reason" => ?remove_reason); + } else { + error!(self.log, "Chain removed while switching head chains"; "chain" => id, "reason" => ?remove_reason); + } } else { syncing_chains.push(id); } @@ -482,7 +491,11 @@ impl ChainCollection { debug_assert_eq!(chain.target_head_root, target_head_root); debug_assert_eq!(chain.target_head_slot, target_head_slot); if let Err(remove_reason) = chain.add_peer(network, peer) { - debug!(self.log, "Chain removed after adding peer"; "chain" => id, "reason" => ?remove_reason); + if remove_reason.is_critical() { + error!(self.log, "Chain removed after adding peer"; "chain" => id, "reason" => ?remove_reason); + } else { + error!(self.log, "Chain removed after adding peer"; "chain" => id, "reason" => ?remove_reason); + } let chain = entry.remove(); self.on_chain_removed(&id, chain.is_syncing()); } diff --git a/beacon_node/network/src/sync/range_sync/range.rs b/beacon_node/network/src/sync/range_sync/range.rs index d816cfa84..32228e250 100644 --- a/beacon_node/network/src/sync/range_sync/range.rs +++ b/beacon_node/network/src/sync/range_sync/range.rs @@ -19,7 +19,7 @@ //! need to be downloaded. //! //! A few interesting notes about finalized chain syncing: -//! - Only one finalized chain can sync at a time. +//! - Only one finalized chain can sync at a time //! - The finalized chain with the largest peer pool takes priority. //! - As one finalized chain completes, others are checked to see if we they can be continued, //! otherwise they are removed. @@ -39,7 +39,7 @@ //! Each chain is downloaded in batches of blocks. The batched blocks are processed sequentially //! 
and further batches are requested as current blocks are being processed. -use super::chain::ChainId; +use super::chain::{ChainId, RemoveChain, SyncingChain}; use super::chain_collection::ChainCollection; use super::sync_type::RangeSyncType; use crate::beacon_processor::WorkEvent as BeaconWorkEvent; @@ -49,7 +49,7 @@ use crate::sync::PeerSyncInfo; use crate::sync::RequestId; use beacon_chain::{BeaconChain, BeaconChainTypes}; use eth2_libp2p::PeerId; -use slog::{debug, error, trace}; +use slog::{crit, debug, error, trace}; use std::collections::HashMap; use std::sync::Arc; use tokio::sync::mpsc; @@ -121,7 +121,8 @@ impl RangeSync { .finalized_epoch .start_slot(T::EthSpec::slots_per_epoch()); - // NOTE: A peer that has been re-status'd may now exist in multiple finalized chains. + // NOTE: A peer that has been re-status'd may now exist in multiple finalized chains. This + // is OK since we sync only one finalized chain at a time. // determine which kind of sync to perform and set up the chains match RangeSyncType::new(&self.beacon_chain, &local_info, &remote_info) { @@ -208,22 +209,22 @@ impl RangeSync { chain.on_block_response(network, batch_id, &peer_id, request_id, beacon_block) }) { Ok((removed_chain, sync_type)) => { - if let Some(_removed_chain) = removed_chain { - debug!(self.log, "Chain removed after block response"; "sync_type" => ?sync_type, "chain_id" => chain_id); - // update the state of the collection - self.chains.update( + if let Some((removed_chain, remove_reason)) = removed_chain { + self.on_chain_removed( + removed_chain, + sync_type, + remove_reason, network, - &mut self.awaiting_head_peers, - &self.beacon_processor_send, + "block response", ); } } Err(_) => { - debug!(self.log, "BlocksByRange response for removed chain"; "chain" => chain_id) + trace!(self.log, "BlocksByRange response for removed chain"; "chain" => chain_id) } } } else { - debug!(self.log, "Response/Error for non registered request"; "request_id" => request_id) + trace!(self.log, 
"Response/Error for non registered request"; "request_id" => request_id) } } @@ -241,17 +242,18 @@ impl RangeSync { Ok((None, _sync_type)) => { // Chain was found and not removed } - Ok((Some(_removed_chain), sync_type)) => { - debug!(self.log, "Chain removed after processing result"; "chain" => chain_id, "sync_type" => ?sync_type); - self.chains.update( + Ok((Some((removed_chain, remove_reason)), sync_type)) => { + self.on_chain_removed( + removed_chain, + sync_type, + remove_reason, network, - &mut self.awaiting_head_peers, - &self.beacon_processor_send, + "batch processing result", ); } Err(_) => { - debug!(self.log, "BlocksByRange response for removed chain"; "chain" => chain_id) + trace!(self.log, "BlocksByRange response for removed chain"; "chain" => chain_id) } } } @@ -279,14 +281,16 @@ impl RangeSync { .chains .call_all(|chain| chain.remove_peer(peer_id, network)) { - debug!(self.log, "Chain removed after removing peer"; "sync_type" => ?sync_type, "chain" => removed_chain.get_id(), "reason" => ?remove_reason); + self.on_chain_removed( + removed_chain, + sync_type, + remove_reason, + network, + "peer removed", + ); + // update the state of the collection } - self.chains.update( - network, - &mut self.awaiting_head_peers, - &self.beacon_processor_send, - ); } /// An RPC error has occurred. 
@@ -307,21 +311,45 @@ impl RangeSync { }) { Ok((removed_chain, sync_type)) => { if let Some((removed_chain, remove_reason)) = removed_chain { - debug!(self.log, "Chain removed on rpc error"; "sync_type" => ?sync_type, "chain" => removed_chain.get_id(), "reason" => ?remove_reason); - // update the state of the collection - self.chains.update( + self.on_chain_removed( + removed_chain, + sync_type, + remove_reason, network, - &mut self.awaiting_head_peers, - &self.beacon_processor_send, + "RPC error", ); } } Err(_) => { - debug!(self.log, "BlocksByRange response for removed chain"; "chain" => chain_id) + trace!(self.log, "BlocksByRange response for removed chain"; "chain" => chain_id) } } } else { - debug!(self.log, "Response/Error for non registered request"; "request_id" => request_id) + trace!(self.log, "Response/Error for non registered request"; "request_id" => request_id) } } + + fn on_chain_removed( + &mut self, + chain: SyncingChain, + sync_type: RangeSyncType, + remove_reason: RemoveChain, + network: &mut SyncNetworkContext, + op: &'static str, + ) { + if remove_reason.is_critical() { + crit!(self.log, "Chain removed"; "sync_type" => ?sync_type, &chain, "reason" => ?remove_reason, "op" => op); + } else { + debug!(self.log, "Chain removed"; "sync_type" => ?sync_type, &chain, "reason" => ?remove_reason, "op" => op); + } + + network.status_peers(self.beacon_chain.clone(), chain.peers()); + + // update the state of the collection + self.chains.update( + network, + &mut self.awaiting_head_peers, + &self.beacon_processor_send, + ); + } }