diff --git a/beacon_node/eth2-libp2p/src/rpc/mod.rs b/beacon_node/eth2-libp2p/src/rpc/mod.rs
index 47c19f3a2..bfd87b9dd 100644
--- a/beacon_node/eth2-libp2p/src/rpc/mod.rs
+++ b/beacon_node/eth2-libp2p/src/rpc/mod.rs
@@ -66,7 +66,7 @@ pub struct RPC {
     /// Queue of events to processed.
     events: Vec>,
     /// Pins the generic substream.
-    marker: PhantomData<(TSubstream)>,
+    marker: PhantomData<TSubstream>,
     /// Slog logger for RPC behaviour.
     log: slog::Logger,
 }
diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs
index 71c707250..f39d91da5 100644
--- a/beacon_node/network/src/sync/manager.rs
+++ b/beacon_node/network/src/sync/manager.rs
@@ -16,36 +16,11 @@
 //! further peers connect, this process is run in parallel with those peers, until our head is
 //! within `SLOT_IMPORT_TOLERANCE` of all connected peers.
 //!
-//! Batch Syncing
+//! ## Batch Syncing
 //!
-//! This syncing process start by requesting `BLOCKS_PER_REQUEST` blocks from a peer with an
-//! unknown chain (with a greater slot height) starting from our current head slot. If the earliest
-//! block returned is known to us, then the group of blocks returned form part of a known chain,
-//! and we process this batch of blocks, before requesting more batches forward and processing
-//! those in turn until we reach the peer's chain's head. If the first batch doesn't contain a
-//! block we know of, we must iteratively request blocks backwards (until our latest finalized head
-//! slot) until we find a common ancestor before we can start processing the blocks. If no common
-//! ancestor is found, the peer has a chain which is not part of our finalized head slot and we
-//! drop the peer and the downloaded blocks.
-//! Once we are fully synced with all known peers, the state of the manager becomes `Regular` which
-//! then allows for parent lookups of propagated blocks.
+//! See `RangeSync` for further details.
 //!
-//! A schematic version of this logic with two chain variations looks like the following.
-//!
-//! |----------------------|---------------------------------|
-//! ^finalized head        ^current local head               ^remotes head
-//!
-//!
-//! An example of the remotes chain diverging before our current head.
-//!                        |---------------------------|
-//! ^---------------------------------------------|
-//! ^chain diverges        |initial batch|        ^remotes head
-//!
-//! In this example, we cannot process the initial batch as it is not on a known chain. We must
-//! then backwards sync until we reach a common chain to begin forwarding batch syncing.
-//!
-//!
-//! Parent Lookup
+//! ## Parent Lookup
 //!
 //! When a block with an unknown parent is received and we are in `Regular` sync mode, the block is
 //! queued for lookup. A round-robin approach is used to request the parent from the known list of
diff --git a/beacon_node/network/src/sync/range_sync/chain.rs b/beacon_node/network/src/sync/range_sync/chain.rs
index 6bd725027..52b47f108 100644
--- a/beacon_node/network/src/sync/range_sync/chain.rs
+++ b/beacon_node/network/src/sync/range_sync/chain.rs
@@ -23,6 +23,7 @@
 const BLOCKS_PER_BATCH: u64 = 50;
 /// The number of times to retry a batch before the chain is considered failed and removed.
 const MAX_BATCH_RETRIES: u8 = 5;
+/// A collection of sequential blocks that are requested from peers in a single RPC request.
 #[derive(PartialEq)]
 pub struct Batch {
     /// The ID of the batch, batches are ID's sequentially.
@@ -55,6 +56,9 @@ impl PartialOrd for Batch {
     }
 }
 
+/// A return type for functions that act on a `Chain`, informing the caller whether the
+/// chain has completed and should be removed, or whether it should be kept for further
+/// processing.
 pub enum ProcessingResult {
     KeepChain,
     RemoveChain,
@@ -86,6 +90,9 @@ impl Batch {
     }
 }
 
+/// A chain of blocks that need to be downloaded. Peers who claim to have the target head
+/// root are grouped into the peer pool and queried for batches when downloading the
+/// chain.
 pub struct SyncingChain {
     /// The original start slot when this chain was initialised.
     pub start_slot: Slot,
@@ -127,7 +134,7 @@ pub enum ChainSyncingState {
     /// The chain is undergoing syncing.
     Syncing,
     /// The chain is temporarily paused whilst an error is rectified.
-    Paused,
+    _Paused,
 }
 
 impl SyncingChain {
@@ -154,6 +161,12 @@ impl SyncingChain {
         }
     }
 
+    /// A batch of blocks has been received. This function gets run on all chains and should
+    /// return `Some` if the request id matches a pending request on this chain, or `None` if
+    /// it does not.
+    ///
+    /// If the request corresponds to a pending batch, this function processes the completed
+    /// batch.
     pub fn on_block_response(
         &mut self,
         chain: &Weak>,
@@ -174,6 +187,9 @@ impl SyncingChain {
         }
     }
 
+    /// A completed batch has been received; this processes the batch.
+    /// This will return `ProcessingResult::KeepChain` if the chain has not completed or
+    /// failed, indicating that further batches are required.
     fn process_completed_batch(
         &mut self,
         chain: Weak>,
@@ -226,103 +242,109 @@ impl SyncingChain {
         //blocks here, manage the queue and process them in another thread as they become
         //available.
 
-        if self.state != ChainSyncingState::Paused {
+        if self.state == ChainSyncingState::Syncing {
             // pre-emptively request more blocks from peers whilst we process current blocks,
-            self.send_range_request(network, current_peer);
+            self.send_range_request(network, current_peer, log);
+        }
 
-            // Try and process batches sequentially in the ordered list.
-            let current_process_id = self.to_be_processed_id;
-            for batch in self
-                .completed_batches
-                .iter()
-                .filter(|batch| batch.id >= current_process_id)
-            {
-                if batch.id == self.to_be_processed_id {
-                    if batch.downloaded_blocks.is_empty() {
-                        // the batch was empty, progress to the next block
-                        self.to_be_processed_id += 1;
-                        continue;
-                    } else {
-                        let mut successes = 0;
-                        debug!(log, "Processing batch"; "batch_id" => batch.id);
-                        match process_batch(chain.clone(), batch, &mut successes, log) {
-                            Ok(_) => {
-                                // batch was successfully processed
-                                self.last_processed_id = self.to_be_processed_id;
-                                self.to_be_processed_id += 1;
+        // Try and process batches sequentially in the ordered list.
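+        // Batches may complete out of order; only the batch whose id matches
+        // `to_be_processed_id` can be imported next, so iteration stops at the first gap in
+        // the sequence.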
+        let current_process_id = self.to_be_processed_id;
+        // keep track of whether any batch was successfully processed, to decide whether to run fork choice
+        let mut successful_block_process = false;
 
-                                if let Some(chain) = chain.upgrade() {
-                                    match chain.fork_choice() {
-                                        Ok(()) => trace!(
-                                            log,
-                                            "Fork choice success";
-                                            "location" => "batch import success"
-                                        ),
-                                        Err(e) => error!(
-                                            log,
-                                            "Fork choice failed";
-                                            "error" => format!("{:?}", e),
-                                            "location" => "batch import success"
-                                        ),
-                                    }
-                                }
-                            }
-                            Err(e) => {
-                                warn!(log, "Block processing error"; "error"=> format!("{:?}", e));
+        for batch in self
+            .completed_batches
+            .iter()
+            .filter(|batch| batch.id >= current_process_id)
+        {
+            if batch.id != self.to_be_processed_id {
+                // there are no batches to be processed at the moment
+                break;
+            }
 
-                                if successes > 0 {
-                                    if let Some(chain) = chain.upgrade() {
-                                        match chain.fork_choice() {
-                                            Ok(()) => trace!(
-                                                log,
-                                                "Fork choice success";
-                                                "block_imports" => successes,
-                                                "location" => "batch import error"
-                                            ),
-                                            Err(e) => error!(
-                                                log,
-                                                "Fork choice failed";
-                                                "error" => format!("{:?}", e),
-                                                "location" => "batch import error"
-                                            ),
-                                        }
-                                    }
-                                }
+            if batch.downloaded_blocks.is_empty() {
+                // the batch was empty, progress to the next block
+                self.to_be_processed_id += 1;
+                continue;
+            }
 
-                                // batch processing failed
-                                // this could be because this batch is invalid, or a previous invalidated batch
-                                // is invalid. We need to find out which and downvote the peer that has sent us
-                                // an invalid batch.
+            // process the batch
+            // Keep track of successful batches. Run fork choice after all waiting batches have
+            // been processed.
+            debug!(log, "Processing batch"; "batch_id" => batch.id);
+            match process_batch(chain.clone(), batch, log) {
+                Ok(_) => {
+                    // batch was successfully processed
+                    self.last_processed_id = self.to_be_processed_id;
+                    self.to_be_processed_id += 1;
+                    successful_block_process = true;
+                }
+                Err(e) => {
+                    warn!(log, "Block processing error"; "error"=> format!("{:?}", e));
 
-                                // firstly remove any validated batches
-                                return self.handle_invalid_batch(chain, network);
+                    if successful_block_process {
+                        if let Some(chain) = chain.upgrade() {
+                            match chain.fork_choice() {
+                                Ok(()) => trace!(
+                                    log,
+                                    "Fork choice success";
+                                    "location" => "batch import error"
+                                ),
+                                Err(e) => error!(
+                                    log,
+                                    "Fork choice failed";
+                                    "error" => format!("{:?}", e),
+                                    "location" => "batch import error"
+                                ),
                             }
                         }
                     }
-                } else {
-                    // there are no more batches to be processed, end
-                    break;
+
+                    // batch processing failed
+                    // this could be because this batch is invalid, or a previous invalidated batch
+                    // is invalid. We need to find out which and downvote the peer that has sent us
+                    // an invalid batch.
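+                    // Note: fork choice was already run above (when `successful_block_process`
+                    // is set) so that blocks imported from earlier valid batches are retained
+                    // even though this chain is about to be removed.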
+
+                    // firstly remove any validated batches
+                    return self.handle_invalid_batch(chain, network);
                 }
             }
-        // remove any validated batches
-        let last_processed_id = self.last_processed_id;
-        self.completed_batches
-            .retain(|batch| batch.id >= last_processed_id);
-
-        // check if the chain has completed syncing, if not, request another batch from this peer
-        if self.start_slot + self.last_processed_id * BLOCKS_PER_BATCH >= self.target_head_slot
-        {
-            // chain is completed
-            ProcessingResult::RemoveChain
-        } else {
-            // chain is not completed
-            ProcessingResult::KeepChain
+        }
+        // If we have processed batches, run fork choice
+        if successful_block_process {
+            if let Some(chain) = chain.upgrade() {
+                match chain.fork_choice() {
+                    Ok(()) => trace!(
+                        log,
+                        "Fork choice success";
+                        "location" => "batch import success"
+                    ),
+                    Err(e) => error!(
+                        log,
+                        "Fork choice failed";
+                        "error" => format!("{:?}", e),
+                        "location" => "batch import success"
+                    ),
+                }
             }
+        }
+
+        // remove any validated batches
+        let last_processed_id = self.last_processed_id;
+        self.completed_batches
+            .retain(|batch| batch.id >= last_processed_id);
+
+        // check if the chain has completed syncing
+        if self.start_slot + self.last_processed_id * BLOCKS_PER_BATCH >= self.target_head_slot {
+            // chain is completed
+            ProcessingResult::RemoveChain
         } else {
+            // chain is not completed
             ProcessingResult::KeepChain
         }
     }
 
+    /// An invalid batch has been received that could not be processed.
     fn handle_invalid_batch(
         &mut self,
         _chain: Weak>,
@@ -340,9 +362,10 @@ impl SyncingChain {
         //
         //TODO: Implement this logic
 
-        // Currently just fail the chain, and drop all associated peers
-        for peer_id in self.peer_pool.iter() {
-            network.downvote_peer(peer_id.clone());
+        // Currently just fail the chain and drop all associated peers, removing them from the
+        // peer pool, to prevent them from being re-STATUS'd
+        for peer_id in self.peer_pool.drain() {
+            network.downvote_peer(peer_id);
         }
         ProcessingResult::RemoveChain
     }
@@ -352,6 +375,9 @@ impl SyncingChain {
     }
 
     // Either a new chain, or an old one with a peer list
+    /// This chain has been requested to start syncing.
+    ///
+    /// This could be a new chain, or an old chain that is being resumed.
     pub fn start_syncing(
         &mut self,
         network: &mut SyncNetworkContext,
@@ -415,14 +441,15 @@ impl SyncingChain {
 
         for peer_id in peers {
             // send a blocks by range request to the peer
-            self.send_range_request(network, peer_id);
+            self.send_range_request(network, peer_id, log);
         }
 
         self.state = ChainSyncingState::Syncing;
     }
 
-    // A peer has been added, start batch requests for this peer
-    // this should only be called for a syncing chain
+    /// A peer has been added.
+    ///
+    /// If the chain is active, this starts requesting batches from this peer.
     pub fn peer_added(
         &mut self,
         network: &mut SyncNetworkContext,
@@ -436,24 +463,32 @@ impl SyncingChain {
         }
 
         // find the next batch and request it from the peer
-        self.send_range_request(network, peer_id);
+        self.send_range_request(network, peer_id, log);
     }
 
-    // Re-STATUS all the peers in this chain
+    /// Sends a STATUS message to all peers in the peer pool.
     pub fn status_peers(&self, chain: Weak>, network: &mut SyncNetworkContext) {
         for peer_id in self.peer_pool.iter() {
             network.status_peer(chain.clone(), peer_id.clone());
         }
     }
 
-    fn send_range_request(&mut self, network: &mut SyncNetworkContext, peer_id: PeerId) {
+    /// Requests the next required batch from the provided peer.
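+    ///
+    /// If the chain has no further batches to download, this is a no-op.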
+    fn send_range_request(
+        &mut self,
+        network: &mut SyncNetworkContext,
+        peer_id: PeerId,
+        log: &slog::Logger,
+    ) {
         // find the next pending batch and request it from the peer
         if let Some(batch) = self.get_next_batch(peer_id) {
+            debug!(log, "Requesting batch"; "start_slot" => batch.start_slot, "end_slot" => batch.end_slot, "id" => batch.id, "peer" => format!("{:?}", batch.current_peer), "head_root"=> format!("{}", batch.head_root));
             // send the batch
             self.send_batch(network, batch);
         }
     }
 
+    /// Requests the provided batch from the provided peer.
     fn send_batch(&mut self, network: &mut SyncNetworkContext, batch: Batch) {
         let request = batch.to_blocks_by_range_request();
         if let Ok(request_id) = network.blocks_by_range_request(batch.current_peer.clone(), request)
@@ -463,6 +498,8 @@ impl SyncingChain {
         }
     }
 
+    /// Returns the next required batch from the chain if it exists. If there are no more
+    /// batches required, `None` is returned.
     fn get_next_batch(&mut self, peer_id: PeerId) -> Option> {
         let batch_start_slot =
             self.start_slot + self.to_be_downloaded_id.saturating_sub(1) * BLOCKS_PER_BATCH;
@@ -493,9 +530,12 @@ impl SyncingChain {
         ))
     }
 
-    // Checks if the request_id is associated with this chain. If so, attempts to re-request the
-    // batch. If the batch has exceeded the number of retries, returns Some(true), indicating
-    // the chain should be dropped.
+    /// An RPC error has occurred.
+    ///
+    /// Checks if the request_id is associated with this chain. If so, attempts to re-request
+    /// the batch. If the batch has exceeded the number of retries, returns
+    /// `Some(ProcessingResult::RemoveChain)`. Returns `None` if the request isn't related to
+    /// this chain.
     pub fn inject_error(
         &mut self,
         network: &mut SyncNetworkContext,
@@ -512,6 +552,11 @@ impl SyncingChain {
         }
     }
 
+    /// A batch has failed.
+    ///
+    /// Attempts to re-request from another peer in the peer pool (if possible) and returns
+    /// `ProcessingResult::RemoveChain` if the number of retries on the batch exceeds
+    /// `MAX_BATCH_RETRIES`.
     pub fn failed_batch(
         &mut self,
         network: &mut SyncNetworkContext,
@@ -542,7 +587,6 @@ impl SyncingChain {
 fn process_batch(
     chain: Weak>,
     batch: &Batch,
-    successes: &mut usize,
     log: &Logger,
 ) -> Result<(), String> {
     for block in &batch.downloaded_blocks {
@@ -558,8 +602,6 @@ fn process_batch(
                         "slot" => block.slot,
                         "block_root" => format!("{}", block_root),
                     );
-
-                    *successes += 1
                 }
                 BlockProcessingOutcome::ParentUnknown { parent } => {
                     // blocks should be sequential and all parents should exist
diff --git a/beacon_node/network/src/sync/range_sync/chain_collection.rs b/beacon_node/network/src/sync/range_sync/chain_collection.rs
index db5ff5811..c8704c773 100644
--- a/beacon_node/network/src/sync/range_sync/chain_collection.rs
+++ b/beacon_node/network/src/sync/range_sync/chain_collection.rs
@@ -1,3 +1,8 @@
+//! This provides the logic for the finalized and head chains.
+//!
+//! Each chain type is stored in its own vector. A variety of helper functions are given along
+//! with this struct to simplify the logic of the other layers of sync.
+
 use super::chain::{ChainSyncingState, ProcessingResult, SyncingChain};
 use crate::message_processor::PeerSyncInfo;
 use crate::sync::network_context::SyncNetworkContext;
@@ -8,32 +13,47 @@ use std::sync::Weak;
 use types::EthSpec;
 use types::{Hash256, Slot};
 
+/// The state of the long range/batch sync.
 pub enum SyncState {
+    /// A finalized chain is being synced.
     Finalized,
+    /// There are no finalized chains and we are syncing one or more head chains.
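+    /// Unlike the single syncing finalized chain, multiple head chains may be synced in
+    /// parallel.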
     Head,
+    /// There are no head or finalized chains and no long range sync is in progress.
     Idle,
 }
 
+/// A collection of finalized and head chains currently being processed.
 pub struct ChainCollection {
+    /// The beacon chain for processing.
+    beacon_chain: Weak>,
+    /// The set of finalized chains being synced.
     finalized_chains: Vec>,
+    /// The set of head chains being synced.
     head_chains: Vec>,
+    /// The current sync state of the process.
     sync_state: SyncState,
 }
 
 impl ChainCollection {
-    pub fn new() -> Self {
+    pub fn new(beacon_chain: Weak>) -> Self {
         ChainCollection {
             sync_state: SyncState::Idle,
             finalized_chains: Vec::new(),
             head_chains: Vec::new(),
+            beacon_chain,
         }
     }
 
+    /// The current syncing state.
     pub fn sync_state(&self) -> &SyncState {
         &self.sync_state
     }
 
-    // if a finalized chain just completed, we assume we waiting for head syncing, unless a fully
-    // sync peer joins.
+    /// A fully synced peer has joined.
+    ///
+    /// We could be awaiting a head sync. If we are in the head syncing state, without any head
+    /// chains, then update the state to idle.
     pub fn fully_synced_peer_found(&mut self) {
         if let SyncState::Head = self.sync_state {
             if self.head_chains.is_empty() {
@@ -42,47 +62,15 @@ impl ChainCollection {
         }
     }
 
-    // after a finalized chain completes, the state should be waiting for a head chain
+    /// After a finalized chain completes this function is called. It ensures the state is set
+    /// to `SyncState::Head`, indicating we are awaiting new peers to connect before we can
+    /// consider the state as idle.
     pub fn set_head_sync(&mut self) {
         if let SyncState::Idle = self.sync_state {
            self.sync_state = SyncState::Head;
         }
     }
 
-    fn finalized_syncing_index(&self) -> Option {
-        self.finalized_chains
-            .iter()
-            .enumerate()
-            .find_map(|(index, chain)| {
-                if chain.state == ChainSyncingState::Syncing {
-                    Some(index)
-                } else {
-                    None
-                }
-            })
-    }
-
-    pub fn purge_finalized(&mut self, local_finalized_slot: Slot) {
-        self.finalized_chains
-            .retain(|chain| chain.target_head_slot > local_finalized_slot);
-    }
-
-    pub fn purge_head(&mut self, head_slot: Slot) {
-        self.head_chains
-            .retain(|chain| chain.target_head_slot > head_slot);
-    }
-
-    fn get_chain<'a>(
-        chain: &'a mut [SyncingChain],
-        target_head_root: Hash256,
-        target_head_slot: Slot,
-    ) -> Option<&'a mut SyncingChain> {
-        chain.iter_mut().find(|iter_chain| {
-            iter_chain.target_head_root == target_head_root
-                && iter_chain.target_head_slot == target_head_slot
-        })
-    }
-
     /// Finds any finalized chain if it exists.
     pub fn get_finalized_mut(
         &mut self,
@@ -109,35 +97,23 @@ impl ChainCollection {
         )
     }
 
-    /// Checks if a new finalized state should become the syncing chain. Updates the state of the
-    /// collection.
-    pub fn update_finalized(
-        &mut self,
-        beacon_chain: Weak>,
-        network: &mut SyncNetworkContext,
-        log: &slog::Logger,
-    ) {
-        let local_info = match beacon_chain.upgrade() {
-            Some(chain) => PeerSyncInfo::from(&chain),
+    /// Updates the state of the chain collection.
+    ///
+    /// This removes any out-dated chains, swaps to any higher priority finalized chains and
+    /// updates the state of the collection.
+    pub fn update_finalized(&mut self, network: &mut SyncNetworkContext, log: &slog::Logger) {
+        let local_slot = match self.beacon_chain.upgrade() {
+            Some(chain) => PeerSyncInfo::from(&chain)
+                .finalized_epoch
+                .start_slot(T::EthSpec::slots_per_epoch()),
             None => {
                 warn!(log, "Beacon chain dropped. Chains not updated");
Chains not updated"); return; } }; - let local_slot = local_info - .finalized_epoch - .start_slot(T::EthSpec::slots_per_epoch()); - // Remove any outdated finalized chains - self.purge_finalized(local_slot); - self.finalized_chains - .retain(|chain| !chain.peer_pool.is_empty()); - - // Remove any outdated head chains - self.purge_head(local_info.head_slot); - self.finalized_chains - .retain(|chain| !chain.peer_pool.is_empty()); + self.purge_outdated_chains(network); // Check if any chains become the new syncing chain if let Some(index) = self.finalized_syncing_index() { @@ -172,7 +148,7 @@ impl ChainCollection { chain.start_syncing(network, local_slot, log); self.sync_state = SyncState::Finalized; } else { - // There are no finalized chains, update the state + // There are no finalized chains, update the state. if self.head_chains.is_empty() { self.sync_state = SyncState::Idle; } else { @@ -181,7 +157,7 @@ impl ChainCollection { } } - /// Add a new finalized chain to the collection + /// Add a new finalized chain to the collection. pub fn new_finalized_chain( &mut self, local_finalized_slot: Slot, @@ -197,7 +173,7 @@ impl ChainCollection { )); } - /// Add a new finalized chain to the collection + /// Add a new finalized chain to the collection and starts syncing it. pub fn new_head_chain( &mut self, network: &mut SyncNetworkContext, @@ -221,10 +197,14 @@ impl ChainCollection { self.head_chains.push(new_head_chain); } + /// Returns if `true` if any finalized chains exist, `false` otherwise. pub fn is_finalizing_sync(&self) -> bool { !self.finalized_chains.is_empty() } + /// Given a chain iterator, runs a given function on each chain until the function returns + /// `Some`. This allows the `RangeSync` struct to loop over chains and optionally remove the + /// chain from the collection if the function results in completing the chain. fn request_function<'a, F, I>(chain: I, mut func: F) -> Option<(usize, ProcessingResult)> where I: Iterator>, @@ -235,6 +215,7 @@ impl ChainCollection { .find_map(|(index, chain)| Some((index, func(chain)?))) } + /// Runs a function on all finalized chains. pub fn finalized_request(&mut self, func: F) -> Option<(usize, ProcessingResult)> where F: FnMut(&mut SyncingChain) -> Option, @@ -242,6 +223,7 @@ impl ChainCollection { ChainCollection::request_function(self.finalized_chains.iter_mut(), func) } + /// Runs a function on all head chains. pub fn head_request(&mut self, func: F) -> Option<(usize, ProcessingResult)> where F: FnMut(&mut SyncingChain) -> Option, @@ -249,7 +231,7 @@ impl ChainCollection { ChainCollection::request_function(self.head_chains.iter_mut(), func) } - #[allow(dead_code)] + /// Runs a function on all finalized and head chains. pub fn head_finalized_request(&mut self, func: F) -> Option<(usize, ProcessingResult)> where F: FnMut(&mut SyncingChain) -> Option, @@ -262,10 +244,53 @@ impl ChainCollection { ) } + /// Removes any outdated finalized or head chains. + /// + /// This removes chains with no peers, or chains whose start block slot is less than our current + /// finalized block slot. 
+    pub fn purge_outdated_chains(&mut self, network: &mut SyncNetworkContext) {
+        // Remove any chains that have no peers
+        self.finalized_chains
+            .retain(|chain| !chain.peer_pool.is_empty());
+        self.head_chains.retain(|chain| !chain.peer_pool.is_empty());
+
+        let local_info = match self.beacon_chain.upgrade() {
+            Some(chain) => PeerSyncInfo::from(&chain),
+            None => {
+                return;
+            }
+        };
+
+        let local_finalized_slot = local_info
+            .finalized_epoch
+            .start_slot(T::EthSpec::slots_per_epoch());
+
+        // Remove chains that are out-dated and re-status their peers
+        let beacon_chain_clone = self.beacon_chain.clone();
+        self.finalized_chains.retain(|chain| {
+            if chain.target_head_slot <= local_finalized_slot {
+                chain.status_peers(beacon_chain_clone.clone(), network);
+                false
+            } else {
+                true
+            }
+        });
+        self.head_chains.retain(|chain| {
+            if chain.target_head_slot <= local_finalized_slot {
+                chain.status_peers(beacon_chain_clone.clone(), network);
+                false
+            } else {
+                true
+            }
+        });
+    }
+
+    /// Removes and returns a finalized chain from the collection.
     pub fn remove_finalized_chain(&mut self, index: usize) -> SyncingChain {
         self.finalized_chains.swap_remove(index)
     }
 
+    /// Removes and returns a head chain from the collection.
     pub fn remove_head_chain(&mut self, index: usize) -> SyncingChain {
         self.head_chains.swap_remove(index)
     }
@@ -273,12 +298,55 @@ impl ChainCollection {
     /// Removes a chain from either finalized or head chains based on the index. Using a request
     /// iterates of finalized chains before head chains. Thus an index that is greater than the
     /// finalized chain length, indicates a head chain.
-    pub fn remove_chain(&mut self, index: usize) -> SyncingChain {
-        if index >= self.finalized_chains.len() {
+    ///
+    /// This will re-status the chain's peers on removal. The index must exist.
+    pub fn remove_chain(
+        &mut self,
+        network: &mut SyncNetworkContext,
+        index: usize,
+        log: &slog::Logger,
+    ) {
+        let chain = if index >= self.finalized_chains.len() {
             let index = index - self.finalized_chains.len();
-            self.head_chains.swap_remove(index)
+            let chain = self.head_chains.swap_remove(index);
+            chain.status_peers(self.beacon_chain.clone(), network);
+            chain
         } else {
-            self.finalized_chains.swap_remove(index)
-        }
+            let chain = self.finalized_chains.swap_remove(index);
+            chain.status_peers(self.beacon_chain.clone(), network);
+            chain
+        };
+
+        debug!(log, "Chain was removed"; "start_slot" => chain.start_slot.as_u64(), "end_slot" => chain.target_head_slot.as_u64());
+
+        // update the state
+        self.update_finalized(network, log);
+    }
+
+    /// Returns the index of the finalized chain that is currently syncing. Returns `None` if
+    /// no finalized chain is currently syncing.
+    fn finalized_syncing_index(&self) -> Option {
+        self.finalized_chains
+            .iter()
+            .enumerate()
+            .find_map(|(index, chain)| {
+                if chain.state == ChainSyncingState::Syncing {
+                    Some(index)
+                } else {
+                    None
+                }
+            })
+    }
+
+    /// Returns a chain given the target head root and slot.
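+    /// If no chain matches both the head root and the head slot, `None` is returned.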
+    fn get_chain<'a>(
+        chain: &'a mut [SyncingChain],
+        target_head_root: Hash256,
+        target_head_slot: Slot,
+    ) -> Option<&'a mut SyncingChain> {
+        chain.iter_mut().find(|iter_chain| {
+            iter_chain.target_head_root == target_head_root
+                && iter_chain.target_head_slot == target_head_slot
+        })
     }
 }
diff --git a/beacon_node/network/src/sync/range_sync/range.rs b/beacon_node/network/src/sync/range_sync/range.rs
index f5ed03c79..750eb5ef9 100644
--- a/beacon_node/network/src/sync/range_sync/range.rs
+++ b/beacon_node/network/src/sync/range_sync/range.rs
@@ -1,3 +1,44 @@
+//! This contains the logic for the long range (batch) sync strategy.
+//!
+//! The general premise is to group peers by their self-proclaimed finalized blocks and head
+//! blocks. Once grouped, the peers become sources to download a specific `Chain`. A `Chain` is a
+//! collection of blocks that terminates at the specified target head.
+//!
+//! This sync strategy can be separated into two distinct forms:
+//! - Finalized Chain Sync
+//! - Head Chain Sync
+//!
+//! ## Finalized chain sync
+//!
+//! This occurs when a peer connects that claims to have a finalized head slot that is greater
+//! than our own. In this case, we form a chain from our last finalized slot, to their claimed
+//! finalized slot. Any peer that also claims to have this last finalized slot is added to a pool
+//! of peers from which batches of blocks may be downloaded. Blocks are downloaded until
+//! the finalized slot of the chain is reached. Once reached, all peers within the pool are sent a
+//! STATUS message to potentially start a head chain sync, or check if further finalized chains
+//! need to be downloaded.
+//!
+//! A few interesting notes about finalized chain syncing:
+//! - Only one finalized chain can sync at a time.
+//! - The finalized chain with the largest peer pool takes priority.
+//! - As one finalized chain completes, others are checked to see if they can be continued,
+//!   otherwise they are removed.
+//!
+//! ## Head Chain Sync
+//!
+//! If a peer joins and there are no active finalized chains being synced, and its head is
+//! beyond our `SLOT_IMPORT_TOLERANCE`, a chain is formed starting from this peer's finalized
+//! slot (this has necessarily been downloaded by our node, otherwise we would start a finalized
+//! chain sync) to this peer's head slot. Any other peers that match this head slot and head
+//! root are added to this chain's peer pool, which is downloaded from in parallel.
+//!
+//! Unlike finalized chains, head chains can be synced in parallel.
+//!
+//! ## Batch Syncing
+//!
+//! Each chain is downloaded in batches of blocks. The batched blocks are processed sequentially
+//! and further batches are requested as current blocks are being processed.
+
 use super::chain::ProcessingResult;
 use super::chain_collection::{ChainCollection, SyncState};
 use crate::message_processor::PeerSyncInfo;
@@ -10,38 +51,50 @@ use std::collections::HashSet;
 use std::sync::Weak;
 use types::{BeaconBlock, EthSpec};
 
-//TODO: The code becomes cleaner if finalized_chains and head_chains were merged into a single
-// object. This will prevent code duplication. Rather than keeping the current syncing
-// finalized chain in index 0, it should be stored in this object under an option. Then lookups can
-// occur over the single object containing both finalized and head chains, which would then
-// behave similarly.
-
+/// The primary object dealing with long range/batch syncing. This contains all the active and
+/// non-active chains that need to be processed before the syncing is considered complete. This
+/// holds the current state of the long range sync.
 pub struct RangeSync {
-    /// The beacon chain for processing
+    /// The beacon chain for processing.
     beacon_chain: Weak>,
+    /// A collection of chains that need to be downloaded. This stores any head or finalized
+    /// chains that need to be downloaded.
     chains: ChainCollection,
-    /// Known peers to the RangeSync, that need to be re-status'd once finalized chains are
-    /// completed.
+    /// Peers that join whilst a finalized chain is being downloaded sit in this set. Once the
+    /// finalized chain(s) complete, these peers get STATUS'd to update their head slot before
+    /// the head chains are formed and downloaded.
     awaiting_head_peers: HashSet,
+    /// The syncing logger.
     log: slog::Logger,
 }
 
 impl RangeSync {
     pub fn new(beacon_chain: Weak>, log: slog::Logger) -> Self {
         RangeSync {
-            beacon_chain,
-            chains: ChainCollection::new(),
+            beacon_chain: beacon_chain.clone(),
+            chains: ChainCollection::new(beacon_chain),
             awaiting_head_peers: HashSet::new(),
             log,
         }
     }
 
-    // Notify the collection that a fully synced peer was found. This allows updating the state
-    // if we were awaiting a head state.
+    /// The `chains` collection stores the current state of syncing. Once a finalized chain
+    /// completes, its state is pre-emptively set to `SyncState::Head`. This ensures that
+    /// during the transition period of finalized to head, the sync manager doesn't start
+    /// requesting blocks from gossipsub.
+    ///
+    /// On re-status, a peer that has no head to download indicates that this state can be set
+    /// to idle as there are in fact no head chains to download. This function notifies the
+    /// chain collection that the state can safely be set to idle.
     pub fn fully_synced_peer_found(&mut self) {
         self.chains.fully_synced_peer_found()
     }
 
+    /// A useful peer has been added. The SyncManager has identified this peer as needing
+    /// either a finalized or head chain sync. This processes the peer and starts/resumes any
+    /// chain that may need to be synced as a result. A new peer may increase the peer pool of
+    /// a finalized chain; this may result in a different finalized chain syncing, as finalized
+    /// chains are prioritised by peer-pool size.
     pub fn add_peer(
         &mut self,
         network: &mut SyncNetworkContext,
@@ -70,13 +123,12 @@ impl RangeSync {
             .finalized_epoch
             .start_slot(T::EthSpec::slots_per_epoch());
 
-        // firstly, remove any out-of-date chains
-        self.chains.purge_finalized(local_finalized_slot);
-        self.chains.purge_head(local_info.head_slot);
-
         // remove peer from any chains
         self.remove_peer(network, &peer_id);
 
+        // remove any out-of-date chains
+        self.chains.purge_outdated_chains(network);
+
         if remote_finalized_slot > local_info.head_slot {
             debug!(self.log, "Finalization sync peer joined"; "peer_id" => format!("{:?}", peer_id));
             // Finalized chain search
@@ -97,8 +149,7 @@ impl RangeSync {
             chain.peer_added(network, peer_id, &self.log);
 
             // check if the new peer's addition will favour a new syncing chain.
-            self.chains
-                .update_finalized(self.beacon_chain.clone(), network, &self.log);
+            self.chains.update_finalized(network, &self.log);
         } else {
             // there is no finalized chain that matches this peer's last finalized target
             // create a new finalized chain
@@ -110,8 +161,7 @@ impl RangeSync {
                 remote_finalized_slot,
                 peer_id,
             );
-            self.chains
-                .update_finalized(self.beacon_chain.clone(), network, &self.log);
+            self.chains.update_finalized(network, &self.log);
         }
     } else {
         if self.chains.is_finalizing_sync() {
@@ -145,11 +195,14 @@ impl RangeSync {
                     &self.log,
                 );
             }
-            self.chains
-                .update_finalized(self.beacon_chain.clone(), network, &self.log);
+            self.chains.update_finalized(network, &self.log);
         }
     }
 
+    /// A `BlocksByRange` response has been received from the network.
+    ///
+    /// This function finds the chain that made this request. Once found, it processes the
+    /// result. This request could complete a chain or simply add to its progress.
     pub fn blocks_by_range_response(
         &mut self,
         network: &mut SyncNetworkContext,
@@ -162,7 +215,6 @@ impl RangeSync {
         // `connected_peers` number of head chains, which should be relatively small and this
         // lookup should not be very expensive. However, we could add an extra index that maps the
         // request id to index of the vector to avoid O(N) searches and O(N) hash lookups.
-        // Note to future sync-rewriter/profiler: Michael approves of these O(N) searches.
 
         let chain_ref = &self.beacon_chain;
         let log_ref = &self.log;
@@ -177,8 +229,7 @@ impl RangeSync {
                 chain.status_peers(self.beacon_chain.clone(), network);
 
                 // update the state of the collection
-                self.chains
-                    .update_finalized(self.beacon_chain.clone(), network, &self.log);
+                self.chains.update_finalized(network, &self.log);
 
                 // set the state to a head sync, to inform the manager that we are awaiting a
                 // head chain.
@@ -207,8 +258,7 @@ impl RangeSync {
                 chain.status_peers(self.beacon_chain.clone(), network);
 
                 // update the state of the collection
-                self.chains
-                    .update_finalized(self.beacon_chain.clone(), network, &self.log);
+                self.chains.update_finalized(network, &self.log);
             }
             Some(_) => {}
             None => {
@@ -221,6 +271,7 @@ impl RangeSync {
         }
     }
 
+    /// Public method to indicate the current state of the long range sync.
     pub fn is_syncing(&self) -> bool {
         match self.chains.sync_state() {
             SyncState::Finalized => true,
@@ -229,6 +280,8 @@ impl RangeSync {
         }
     }
 
+    /// A peer has disconnected. This removes the peer from any ongoing chains and mappings.
+    /// A disconnected peer could cause a chain to be removed if no other peers remain in it.
     pub fn peer_disconnect(&mut self, network: &mut SyncNetworkContext, peer_id: &PeerId) {
         // if the peer is in the awaiting head mapping, remove it
         self.awaiting_head_peers.remove(&peer_id);
@@ -237,8 +290,7 @@ impl RangeSync {
         self.remove_peer(network, peer_id);
 
         // update the state of the collection
-        self.chains
-            .update_finalized(self.beacon_chain.clone(), network, &self.log);
+        self.chains.update_finalized(network, &self.log);
     }
 
     /// When a peer gets removed, both the head and finalized chains need to be searched to
    /// check which pool the peer is in. The chain may also have a batch or batches awaiting
@@ -271,22 +323,17 @@ impl RangeSync {
         }) {
             Some((index, ProcessingResult::RemoveChain)) => {
                 // the chain needed to be removed
-                let chain = self.chains.remove_chain(index);
-                debug!(self.log, "Chain was removed due batch failing"; "start_slot" => chain.start_slot.as_u64(), "end_slot" => chain.target_head_slot.as_u64());
-                // the chain has been removed, re-status it's peers
-                chain.status_peers(self.beacon_chain.clone(), network);
-                // update the state of the collection
-                self.chains
-                    .update_finalized(self.beacon_chain.clone(), network, &self.log);
+                debug!(self.log, "Chain being removed due to failed batch");
+                self.chains.remove_chain(network, index, &self.log);
             }
             _ => {} // chain didn't need to be removed, ignore
         }
-
-        // remove any chains that no longer have any peers
     }
 
-    // An RPC Error occurred, if it's a pending batch, re-request it if possible, if there have
-    // been too many attempts, remove the chain
+    /// An RPC error has occurred.
+    ///
+    /// Check to see if the request corresponds to a pending batch. If so, re-request it if
+    /// possible; if there have been too many failed attempts for the batch, remove the chain.
     pub fn inject_error(
         &mut self,
         network: &mut SyncNetworkContext,
@@ -300,13 +347,8 @@ impl RangeSync {
         }) {
             Some((_, ProcessingResult::KeepChain)) => {} // error handled chain persists
             Some((index, ProcessingResult::RemoveChain)) => {
-                let chain = self.chains.remove_chain(index);
-                debug!(self.log, "Chain was removed due to error"; "start_slot" => chain.start_slot.as_u64(), "end_slot" => chain.target_head_slot.as_u64());
-                // the chain has failed, re-status it's peers
-                chain.status_peers(self.beacon_chain.clone(), network);
-                // update the state of the collection
-                self.chains
-                    .update_finalized(self.beacon_chain.clone(), network, &self.log);
+                debug!(self.log, "Chain being removed due to RPC error");
+                self.chains.remove_chain(network, index, &self.log)
             }
             None => {} // request wasn't in the finalized chains, check the head chains
         }