Purge out-dated head chains on chain completion (#1538)

## Description

There can be many head chains queued up to complete. Currently we try to process all of these to completion before we consider the node synced.

In a chaotic network there can be many of these, and processing them all to completion can be very expensive and slow. This PR removes any non-syncing head chains from the queue and re-statuses their peers. If, after we have synced to head on one chain, there is still a valid head chain to download, it will be re-established once the peers' status has been returned.

This should help nodes sync faster on Medalla.
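
The core of the change is a purge that keeps only chains which are actively syncing, re-statusing the peers of everything it drops. The sketch below is only an illustration with stand-in types (`HeadChain`, `Network`); the actual change, on `ChainCollection` and `SyncNetworkContext`, is in the diff further down.

```rust
// Minimal, self-contained sketch of the purge idea (stand-in types, not the PR's code).
struct Network {
    status_requests: usize,
}

struct HeadChain {
    id: u64,
    syncing: bool,
}

impl HeadChain {
    fn is_syncing(&self) -> bool {
        self.syncing
    }

    // Re-status this chain's peers so that, if the head is still valid,
    // a chain for it can be re-established once the STATUS responses return.
    fn status_peers(&self, network: &mut Network) {
        network.status_requests += 1;
    }
}

// Drop every head chain that is not actively syncing, re-statusing its peers first.
fn clear_head_chains(head_chains: &mut Vec<HeadChain>, network: &mut Network) {
    head_chains.retain(|chain| {
        if !chain.is_syncing() {
            println!("removing stale head chain {}", chain.id);
            chain.status_peers(network);
            false
        } else {
            true
        }
    });
}

fn main() {
    let mut network = Network { status_requests: 0 };
    let mut chains = vec![
        HeadChain { id: 1, syncing: true },
        HeadChain { id: 2, syncing: false },
        HeadChain { id: 3, syncing: false },
    ];
    clear_head_chains(&mut chains, &mut network);
    assert_eq!(chains.len(), 1);
    assert_eq!(network.status_requests, 2);
}
```

Because the dropped chains' peers are re-statused, a head that is still valid simply re-appears as a fresh chain once those responses arrive, so nothing legitimate is lost by purging.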
Age Manning 2020-08-18 05:22:34 +00:00
parent 3bb30754d9
commit 8311074d68
3 changed files with 24 additions and 4 deletions


@@ -40,19 +40,19 @@ pub fn handle_chain_segment<T: BeaconChainTypes>(
             0
         };
-        debug!(log, "Processing batch"; "batch_epoch" => epoch, "blocks" => downloaded_blocks.len(), "start_slot" => start_slot, "end_slot" => end_slot);
+        debug!(log, "Processing batch"; "batch_epoch" => epoch, "blocks" => downloaded_blocks.len(), "first_block_slot" => start_slot, "last_block_slot" => end_slot, "service" => "sync");
         let result = match process_blocks(chain, downloaded_blocks.iter(), &log) {
             (_, Ok(_)) => {
-                debug!(log, "Batch processed"; "batch_epoch" => epoch , "start_slot" => start_slot, "end_slot" => end_slot);
+                debug!(log, "Batch processed"; "batch_epoch" => epoch , "first_block_slot" => start_slot, "last_block_slot" => end_slot, "service"=> "sync");
                 BatchProcessResult::Success
             }
             (imported_blocks, Err(e)) if imported_blocks > 0 => {
                 debug!(log, "Batch processing failed but imported some blocks";
-                    "batch_epoch" => epoch, "error" => e, "imported_blocks"=> imported_blocks);
+                    "batch_epoch" => epoch, "error" => e, "imported_blocks"=> imported_blocks, "service" => "sync");
                 BatchProcessResult::Partial
             }
             (_, Err(e)) => {
-                debug!(log, "Batch processing failed"; "batch_epoch" => epoch, "error" => e);
+                debug!(log, "Batch processing failed"; "batch_epoch" => epoch, "error" => e, "service" => "sync");
                 BatchProcessResult::Failed
             }
         };
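
For context, these log calls use slog's structured key-value syntax, and the added `"service" => "sync"` field makes sync-related logs easy to filter. A minimal, standalone illustration of the same logging pattern (assuming the `slog`, `slog-term` and `slog-async` crates; not code from this PR):

```rust
use slog::{debug, o, Drain};

fn main() {
    // Terminal logger wired up the usual slog way: decorator -> format -> async drain.
    let decorator = slog_term::TermDecorator::new().build();
    let drain = slog_term::FullFormat::new(decorator).build().fuse();
    let drain = slog_async::Async::new(drain).build().fuse();

    // Root logger carrying a `service` key, like the "service" => "sync" field above.
    let log = slog::Logger::root(drain, o!("service" => "sync"));

    // Structured key-value fields follow the `;`, as in the batch-processing logs.
    debug!(log, "Processing batch"; "batch_epoch" => 10_u64, "blocks" => 64_usize);
}
```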


@@ -359,6 +359,22 @@ impl<T: BeaconChainTypes> ChainCollection<T> {
         self.state = head_state;
     }
 
+    /// This is called once a head chain has completed syncing. It removes all non-syncing head
+    /// chains and re-statuses their peers.
+    pub fn clear_head_chains(&mut self, network: &mut SyncNetworkContext<T::EthSpec>) {
+        let log_ref = &self.log;
+        self.head_chains.retain(|chain| {
+            if !chain.is_syncing() {
+                debug!(log_ref, "Removing old head chain"; "start_epoch" => chain.start_epoch, "end_slot" => chain.target_head_slot);
+                chain.status_peers(network);
+                false
+            } else {
+                true
+            }
+        });
+    }
+
     /// Add a new finalized chain to the collection.
     pub fn new_finalized_chain(
         &mut self,


@@ -322,6 +322,10 @@ impl<T: BeaconChainTypes> RangeSync<T> {
             // the chain is complete, re-status it's peers and remove it
             chain.status_peers(network);
+            // Remove non-syncing head chains and re-status the peers
+            // This removes a build-up of potentially duplicate head chains. Any
+            // legitimate head chains will be re-established
+            self.chains.clear_head_chains(network);
             // update the state of the collection
             self.chains.update(network);
             // update the global state and log any change