Limit parallelism of head chain sync (#1527)
## Description Currently lighthouse load-balances a single finalized chain across peers. The chain is selected via the most peers. Once synced to the latest finalized epoch, Lighthouse creates chains amongst its peers (grouped by their current head block) and syncs them all in parallel. This is typically fast and relatively efficient under normal operations. However, if the chain has not finalized in a long time, the head chains can grow quite long. Peers' head chains will update every slot as new blocks are added to the head. Syncing all head chains in parallel is a bottleneck and highly inefficient; block duplication leads to RPC timeouts when attempting to handle all new head chains at once. This PR limits the parallelism of head syncing chains to 2. We now sync at most two head chains at a time. This allows for the possibility of sync progressing alongside a peer being slow and holding up one chain via RPC timeouts.
This commit is contained in:
parent
46dbf027af
commit
cc44a64d15
@ -686,6 +686,14 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns true if this chain is currently syncing.
|
||||
pub fn is_syncing(&self) -> bool {
|
||||
match self.state {
|
||||
ChainSyncingState::Syncing => true,
|
||||
ChainSyncingState::Stopped => false,
|
||||
}
|
||||
}
|
||||
|
||||
/// Attempts to request the next required batches from the peer pool if the chain is syncing. It will exhaust the peer
|
||||
/// pool and left over batches until the batch buffer is reached or all peers are exhausted.
|
||||
fn request_batches(&mut self, network: &mut SyncNetworkContext<T::EthSpec>) {
|
||||
|
@ -15,6 +15,9 @@ use tokio::sync::mpsc;
|
||||
use types::EthSpec;
|
||||
use types::{Epoch, Hash256, Slot};
|
||||
|
||||
/// The number of head syncing chains to sync at a time.
|
||||
const PARALLEL_HEAD_CHAINS: usize = 2;
|
||||
|
||||
/// The state of the long range/batch sync.
|
||||
#[derive(Clone)]
|
||||
pub enum RangeSyncState {
|
||||
@ -205,8 +208,9 @@ impl<T: BeaconChainTypes> ChainCollection<T> {
|
||||
/// Updates the state of the chain collection.
|
||||
///
|
||||
/// This removes any out-dated chains, swaps to any higher priority finalized chains and
|
||||
/// updates the state of the collection.
|
||||
pub fn update_finalized(&mut self, network: &mut SyncNetworkContext<T::EthSpec>) {
|
||||
/// updates the state of the collection. This starts head chains syncing if any are required to
|
||||
/// do so.
|
||||
pub fn update(&mut self, network: &mut SyncNetworkContext<T::EthSpec>) {
|
||||
let local_epoch = {
|
||||
let local = match PeerSyncInfo::from_chain(&self.beacon_chain) {
|
||||
Some(local) => local,
|
||||
@ -222,9 +226,25 @@ impl<T: BeaconChainTypes> ChainCollection<T> {
|
||||
local.finalized_epoch
|
||||
};
|
||||
|
||||
// Remove any outdated finalized chains
|
||||
// Remove any outdated finalized/head chains
|
||||
self.purge_outdated_chains(network);
|
||||
|
||||
// Choose the best finalized chain if one needs to be selected.
|
||||
self.update_finalized_chains(network, local_epoch);
|
||||
|
||||
if self.finalized_syncing_index().is_none() {
|
||||
// Handle head syncing chains if there are no finalized chains left.
|
||||
self.update_head_chains(network, local_epoch);
|
||||
}
|
||||
}
|
||||
|
||||
/// This looks at all current finalized chains and decides if a new chain should be prioritised
|
||||
/// or not.
|
||||
fn update_finalized_chains(
|
||||
&mut self,
|
||||
network: &mut SyncNetworkContext<T::EthSpec>,
|
||||
local_epoch: Epoch,
|
||||
) {
|
||||
// Check if any chains become the new syncing chain
|
||||
if let Some(index) = self.finalized_syncing_index() {
|
||||
// There is a current finalized chain syncing
|
||||
@ -269,32 +289,76 @@ impl<T: BeaconChainTypes> ChainCollection<T> {
|
||||
head_root: chain.target_head_root,
|
||||
};
|
||||
self.state = state;
|
||||
} else {
|
||||
// There are no finalized chains, update the state.
|
||||
if self.head_chains.is_empty() {
|
||||
self.state = RangeSyncState::Idle;
|
||||
} else {
|
||||
// for the syncing API, we find the minimal start_slot and the maximum
|
||||
// target_slot of all head chains to report back.
|
||||
|
||||
let (min_epoch, max_slot) = self.head_chains.iter().fold(
|
||||
(Epoch::from(0u64), Slot::from(0u64)),
|
||||
|(min, max), chain| {
|
||||
(
|
||||
std::cmp::min(min, chain.start_epoch),
|
||||
std::cmp::max(max, chain.target_head_slot),
|
||||
)
|
||||
},
|
||||
);
|
||||
let head_state = RangeSyncState::Head {
|
||||
start_slot: min_epoch.start_slot(T::EthSpec::slots_per_epoch()),
|
||||
head_slot: max_slot,
|
||||
};
|
||||
self.state = head_state;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Start syncing any head chains if required.
|
||||
fn update_head_chains(
|
||||
&mut self,
|
||||
network: &mut SyncNetworkContext<T::EthSpec>,
|
||||
local_epoch: Epoch,
|
||||
) {
|
||||
// There are no finalized chains, update the state.
|
||||
if self.head_chains.is_empty() {
|
||||
self.state = RangeSyncState::Idle;
|
||||
return;
|
||||
}
|
||||
|
||||
let mut currently_syncing = self
|
||||
.head_chains
|
||||
.iter()
|
||||
.filter(|chain| chain.is_syncing())
|
||||
.count();
|
||||
let mut not_syncing = self.head_chains.len() - currently_syncing;
|
||||
|
||||
// Find all head chains that are not currently syncing ordered by peer count.
|
||||
while currently_syncing <= PARALLEL_HEAD_CHAINS && not_syncing > 0 {
|
||||
// Find the chain with the most peers and start syncing
|
||||
if let Some((_index, chain)) = self
|
||||
.head_chains
|
||||
.iter_mut()
|
||||
.filter(|chain| !chain.is_syncing())
|
||||
.enumerate()
|
||||
.max_by_key(|(_index, chain)| chain.peer_pool.len())
|
||||
{
|
||||
// start syncing this chain
|
||||
debug!(self.log, "New head chain started syncing"; "new_target_root" => format!("{}", chain.target_head_root), "new_end_slot" => chain.target_head_slot, "new_start_epoch"=> chain.start_epoch);
|
||||
chain.start_syncing(network, local_epoch);
|
||||
}
|
||||
|
||||
// update variables
|
||||
currently_syncing = self
|
||||
.head_chains
|
||||
.iter()
|
||||
.filter(|chain| chain.is_syncing())
|
||||
.count();
|
||||
not_syncing = self.head_chains.len() - currently_syncing;
|
||||
}
|
||||
|
||||
// Start
|
||||
// for the syncing API, we find the minimal start_slot and the maximum
|
||||
// target_slot of all head chains to report back.
|
||||
|
||||
let (min_epoch, max_slot) = self
|
||||
.head_chains
|
||||
.iter()
|
||||
.filter(|chain| chain.is_syncing())
|
||||
.fold(
|
||||
(Epoch::from(0u64), Slot::from(0u64)),
|
||||
|(min, max), chain| {
|
||||
(
|
||||
std::cmp::min(min, chain.start_epoch),
|
||||
std::cmp::max(max, chain.target_head_slot),
|
||||
)
|
||||
},
|
||||
);
|
||||
let head_state = RangeSyncState::Head {
|
||||
start_slot: min_epoch.start_slot(T::EthSpec::slots_per_epoch()),
|
||||
head_slot: max_slot,
|
||||
};
|
||||
self.state = head_state;
|
||||
}
|
||||
|
||||
/// Add a new finalized chain to the collection.
|
||||
pub fn new_finalized_chain(
|
||||
&mut self,
|
||||
@ -321,7 +385,6 @@ impl<T: BeaconChainTypes> ChainCollection<T> {
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub fn new_head_chain(
|
||||
&mut self,
|
||||
network: &mut SyncNetworkContext<T::EthSpec>,
|
||||
remote_finalized_epoch: Epoch,
|
||||
target_head: Hash256,
|
||||
target_slot: Slot,
|
||||
@ -336,7 +399,7 @@ impl<T: BeaconChainTypes> ChainCollection<T> {
|
||||
self.head_chains.retain(|chain| !chain.peer_pool.is_empty());
|
||||
|
||||
let chain_id = rand::random();
|
||||
let mut new_head_chain = SyncingChain::new(
|
||||
let new_head_chain = SyncingChain::new(
|
||||
chain_id,
|
||||
remote_finalized_epoch,
|
||||
target_slot,
|
||||
@ -346,8 +409,6 @@ impl<T: BeaconChainTypes> ChainCollection<T> {
|
||||
self.beacon_chain.clone(),
|
||||
self.log.clone(),
|
||||
);
|
||||
// All head chains can sync simultaneously
|
||||
new_head_chain.start_syncing(network, remote_finalized_epoch);
|
||||
self.head_chains.push(new_head_chain);
|
||||
}
|
||||
|
||||
@ -511,7 +572,7 @@ impl<T: BeaconChainTypes> ChainCollection<T> {
|
||||
debug!(self.log, "Chain was removed"; "start_epoch" => chain.start_epoch, "end_slot" => chain.target_head_slot);
|
||||
|
||||
// update the state
|
||||
self.update_finalized(network);
|
||||
self.update(network);
|
||||
}
|
||||
|
||||
/// Returns the index of finalized chain that is currently syncing. Returns `None` if no
|
||||
|
@ -166,7 +166,7 @@ impl<T: BeaconChainTypes> RangeSync<T> {
|
||||
chain.add_peer(network, peer_id);
|
||||
|
||||
// check if the new peer's addition will favour a new syncing chain.
|
||||
self.chains.update_finalized(network);
|
||||
self.chains.update(network);
|
||||
// update the global sync state if necessary
|
||||
self.chains.update_sync_state();
|
||||
} else {
|
||||
@ -181,7 +181,7 @@ impl<T: BeaconChainTypes> RangeSync<T> {
|
||||
peer_id,
|
||||
self.beacon_processor_send.clone(),
|
||||
);
|
||||
self.chains.update_finalized(network);
|
||||
self.chains.update(network);
|
||||
// update the global sync state
|
||||
self.chains.update_sync_state();
|
||||
}
|
||||
@ -221,7 +221,6 @@ impl<T: BeaconChainTypes> RangeSync<T> {
|
||||
debug!(self.log, "Creating a new syncing head chain"; "head_root" => format!("{}",remote_info.head_root), "start_epoch" => start_epoch, "head_slot" => remote_info.head_slot, "peer_id" => format!("{:?}", peer_id));
|
||||
|
||||
self.chains.new_head_chain(
|
||||
network,
|
||||
start_epoch,
|
||||
remote_info.head_root,
|
||||
remote_info.head_slot,
|
||||
@ -229,7 +228,7 @@ impl<T: BeaconChainTypes> RangeSync<T> {
|
||||
self.beacon_processor_send.clone(),
|
||||
);
|
||||
}
|
||||
self.chains.update_finalized(network);
|
||||
self.chains.update(network);
|
||||
self.chains.update_sync_state();
|
||||
}
|
||||
}
|
||||
@ -284,7 +283,7 @@ impl<T: BeaconChainTypes> RangeSync<T> {
|
||||
let chain = self.chains.remove_finalized_chain(index);
|
||||
debug!(self.log, "Finalized chain removed"; "start_epoch" => chain.start_epoch, "end_slot" => chain.target_head_slot);
|
||||
// update the state of the collection
|
||||
self.chains.update_finalized(network);
|
||||
self.chains.update(network);
|
||||
|
||||
// the chain is complete, re-status it's peers
|
||||
chain.status_peers(network);
|
||||
@ -324,7 +323,7 @@ impl<T: BeaconChainTypes> RangeSync<T> {
|
||||
chain.status_peers(network);
|
||||
|
||||
// update the state of the collection
|
||||
self.chains.update_finalized(network);
|
||||
self.chains.update(network);
|
||||
// update the global state and log any change
|
||||
self.chains.update_sync_state();
|
||||
}
|
||||
@ -353,7 +352,7 @@ impl<T: BeaconChainTypes> RangeSync<T> {
|
||||
self.remove_peer(network, peer_id);
|
||||
|
||||
// update the state of the collection
|
||||
self.chains.update_finalized(network);
|
||||
self.chains.update(network);
|
||||
// update the global state and inform the user
|
||||
self.chains.update_sync_state();
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user