diff --git a/beacon_node/network/src/sync/range_sync/chain.rs b/beacon_node/network/src/sync/range_sync/chain.rs index a01e8c3c5..d79dff469 100644 --- a/beacon_node/network/src/sync/range_sync/chain.rs +++ b/beacon_node/network/src/sync/range_sync/chain.rs @@ -686,6 +686,14 @@ impl SyncingChain { } } + /// Returns true if this chain is currently syncing. + pub fn is_syncing(&self) -> bool { + match self.state { + ChainSyncingState::Syncing => true, + ChainSyncingState::Stopped => false, + } + } + /// Attempts to request the next required batches from the peer pool if the chain is syncing. It will exhaust the peer /// pool and left over batches until the batch buffer is reached or all peers are exhausted. fn request_batches(&mut self, network: &mut SyncNetworkContext) { diff --git a/beacon_node/network/src/sync/range_sync/chain_collection.rs b/beacon_node/network/src/sync/range_sync/chain_collection.rs index ff194d27b..1543710cc 100644 --- a/beacon_node/network/src/sync/range_sync/chain_collection.rs +++ b/beacon_node/network/src/sync/range_sync/chain_collection.rs @@ -15,6 +15,9 @@ use tokio::sync::mpsc; use types::EthSpec; use types::{Epoch, Hash256, Slot}; +/// The number of head syncing chains to sync at a time. +const PARALLEL_HEAD_CHAINS: usize = 2; + /// The state of the long range/batch sync. #[derive(Clone)] pub enum RangeSyncState { @@ -205,8 +208,9 @@ impl ChainCollection { /// Updates the state of the chain collection. /// /// This removes any out-dated chains, swaps to any higher priority finalized chains and - /// updates the state of the collection. - pub fn update_finalized(&mut self, network: &mut SyncNetworkContext) { + /// updates the state of the collection. This starts head chains syncing if any are required to + /// do so. + pub fn update(&mut self, network: &mut SyncNetworkContext) { let local_epoch = { let local = match PeerSyncInfo::from_chain(&self.beacon_chain) { Some(local) => local, @@ -222,9 +226,25 @@ impl ChainCollection { local.finalized_epoch }; - // Remove any outdated finalized chains + // Remove any outdated finalized/head chains self.purge_outdated_chains(network); + // Choose the best finalized chain if one needs to be selected. + self.update_finalized_chains(network, local_epoch); + + if self.finalized_syncing_index().is_none() { + // Handle head syncing chains if there are no finalized chains left. + self.update_head_chains(network, local_epoch); + } + } + + /// This looks at all current finalized chains and decides if a new chain should be prioritised + /// or not. + fn update_finalized_chains( + &mut self, + network: &mut SyncNetworkContext, + local_epoch: Epoch, + ) { // Check if any chains become the new syncing chain if let Some(index) = self.finalized_syncing_index() { // There is a current finalized chain syncing @@ -269,32 +289,76 @@ impl ChainCollection { head_root: chain.target_head_root, }; self.state = state; - } else { - // There are no finalized chains, update the state. - if self.head_chains.is_empty() { - self.state = RangeSyncState::Idle; - } else { - // for the syncing API, we find the minimal start_slot and the maximum - // target_slot of all head chains to report back. - - let (min_epoch, max_slot) = self.head_chains.iter().fold( - (Epoch::from(0u64), Slot::from(0u64)), - |(min, max), chain| { - ( - std::cmp::min(min, chain.start_epoch), - std::cmp::max(max, chain.target_head_slot), - ) - }, - ); - let head_state = RangeSyncState::Head { - start_slot: min_epoch.start_slot(T::EthSpec::slots_per_epoch()), - head_slot: max_slot, - }; - self.state = head_state; - } } } + /// Start syncing any head chains if required. + fn update_head_chains( + &mut self, + network: &mut SyncNetworkContext, + local_epoch: Epoch, + ) { + // There are no finalized chains, update the state. + if self.head_chains.is_empty() { + self.state = RangeSyncState::Idle; + return; + } + + let mut currently_syncing = self + .head_chains + .iter() + .filter(|chain| chain.is_syncing()) + .count(); + let mut not_syncing = self.head_chains.len() - currently_syncing; + + // Find all head chains that are not currently syncing ordered by peer count. + while currently_syncing <= PARALLEL_HEAD_CHAINS && not_syncing > 0 { + // Find the chain with the most peers and start syncing + if let Some((_index, chain)) = self + .head_chains + .iter_mut() + .filter(|chain| !chain.is_syncing()) + .enumerate() + .max_by_key(|(_index, chain)| chain.peer_pool.len()) + { + // start syncing this chain + debug!(self.log, "New head chain started syncing"; "new_target_root" => format!("{}", chain.target_head_root), "new_end_slot" => chain.target_head_slot, "new_start_epoch"=> chain.start_epoch); + chain.start_syncing(network, local_epoch); + } + + // update variables + currently_syncing = self + .head_chains + .iter() + .filter(|chain| chain.is_syncing()) + .count(); + not_syncing = self.head_chains.len() - currently_syncing; + } + + // Start + // for the syncing API, we find the minimal start_slot and the maximum + // target_slot of all head chains to report back. + + let (min_epoch, max_slot) = self + .head_chains + .iter() + .filter(|chain| chain.is_syncing()) + .fold( + (Epoch::from(0u64), Slot::from(0u64)), + |(min, max), chain| { + ( + std::cmp::min(min, chain.start_epoch), + std::cmp::max(max, chain.target_head_slot), + ) + }, + ); + let head_state = RangeSyncState::Head { + start_slot: min_epoch.start_slot(T::EthSpec::slots_per_epoch()), + head_slot: max_slot, + }; + self.state = head_state; + } + /// Add a new finalized chain to the collection. pub fn new_finalized_chain( &mut self, @@ -321,7 +385,6 @@ impl ChainCollection { #[allow(clippy::too_many_arguments)] pub fn new_head_chain( &mut self, - network: &mut SyncNetworkContext, remote_finalized_epoch: Epoch, target_head: Hash256, target_slot: Slot, @@ -336,7 +399,7 @@ impl ChainCollection { self.head_chains.retain(|chain| !chain.peer_pool.is_empty()); let chain_id = rand::random(); - let mut new_head_chain = SyncingChain::new( + let new_head_chain = SyncingChain::new( chain_id, remote_finalized_epoch, target_slot, @@ -346,8 +409,6 @@ impl ChainCollection { self.beacon_chain.clone(), self.log.clone(), ); - // All head chains can sync simultaneously - new_head_chain.start_syncing(network, remote_finalized_epoch); self.head_chains.push(new_head_chain); } @@ -511,7 +572,7 @@ impl ChainCollection { debug!(self.log, "Chain was removed"; "start_epoch" => chain.start_epoch, "end_slot" => chain.target_head_slot); // update the state - self.update_finalized(network); + self.update(network); } /// Returns the index of finalized chain that is currently syncing. Returns `None` if no diff --git a/beacon_node/network/src/sync/range_sync/range.rs b/beacon_node/network/src/sync/range_sync/range.rs index 4d768b6fc..f0ee38e10 100644 --- a/beacon_node/network/src/sync/range_sync/range.rs +++ b/beacon_node/network/src/sync/range_sync/range.rs @@ -166,7 +166,7 @@ impl RangeSync { chain.add_peer(network, peer_id); // check if the new peer's addition will favour a new syncing chain. - self.chains.update_finalized(network); + self.chains.update(network); // update the global sync state if necessary self.chains.update_sync_state(); } else { @@ -181,7 +181,7 @@ impl RangeSync { peer_id, self.beacon_processor_send.clone(), ); - self.chains.update_finalized(network); + self.chains.update(network); // update the global sync state self.chains.update_sync_state(); } @@ -221,7 +221,6 @@ impl RangeSync { debug!(self.log, "Creating a new syncing head chain"; "head_root" => format!("{}",remote_info.head_root), "start_epoch" => start_epoch, "head_slot" => remote_info.head_slot, "peer_id" => format!("{:?}", peer_id)); self.chains.new_head_chain( - network, start_epoch, remote_info.head_root, remote_info.head_slot, @@ -229,7 +228,7 @@ impl RangeSync { self.beacon_processor_send.clone(), ); } - self.chains.update_finalized(network); + self.chains.update(network); self.chains.update_sync_state(); } } @@ -284,7 +283,7 @@ impl RangeSync { let chain = self.chains.remove_finalized_chain(index); debug!(self.log, "Finalized chain removed"; "start_epoch" => chain.start_epoch, "end_slot" => chain.target_head_slot); // update the state of the collection - self.chains.update_finalized(network); + self.chains.update(network); // the chain is complete, re-status it's peers chain.status_peers(network); @@ -324,7 +323,7 @@ impl RangeSync { chain.status_peers(network); // update the state of the collection - self.chains.update_finalized(network); + self.chains.update(network); // update the global state and log any change self.chains.update_sync_state(); } @@ -353,7 +352,7 @@ impl RangeSync { self.remove_peer(network, peer_id); // update the state of the collection - self.chains.update_finalized(network); + self.chains.update(network); // update the global state and inform the user self.chains.update_sync_state(); }