From 395d99ce03094006f8a79a0cc73a43dd93f1886c Mon Sep 17 00:00:00 2001 From: Age Manning Date: Wed, 29 Jul 2020 05:25:10 +0000 Subject: [PATCH] Sync update (#1412) ## Issue Addressed Recurring sync loop and invalid batch downloading ## Proposed Changes Shifts the batches to include the first slot of each epoch. This ensures the finalized is always downloaded once a chain has completed syncing. Also add in logic to prevent re-dialing disconnected peers. Non-performant peers get disconnected during sync, this prevents re-connection to these during sync. ## Additional Info N/A --- .../eth2_libp2p/src/peer_manager/mod.rs | 14 ++-- .../eth2_libp2p/src/peer_manager/peerdb.rs | 16 +++-- .../eth2_libp2p/src/peer_manager/score.rs | 12 ++-- .../network/src/sync/range_sync/batch.rs | 23 +++++-- .../network/src/sync/range_sync/chain.rs | 64 ++++++++++++------- 5 files changed, 84 insertions(+), 45 deletions(-) diff --git a/beacon_node/eth2_libp2p/src/peer_manager/mod.rs b/beacon_node/eth2_libp2p/src/peer_manager/mod.rs index e27afbe43..8b73653b7 100644 --- a/beacon_node/eth2_libp2p/src/peer_manager/mod.rs +++ b/beacon_node/eth2_libp2p/src/peer_manager/mod.rs @@ -159,7 +159,7 @@ impl PeerManager { info.score.apply_peer_action(action); if previous_state != info.score.state() { match info.score.state() { - ScoreState::Ban => { + ScoreState::Banned => { debug!(self.log, "Peer has been banned"; "peer_id" => peer_id.to_string(), "score" => info.score.to_string()); ban_peer = Some(peer_id.clone()); if info.connection_status.is_connected_or_dialing() { @@ -169,7 +169,7 @@ impl PeerManager { )); } } - ScoreState::Disconnect => { + ScoreState::Disconnected => { debug!(self.log, "Peer transitioned to disconnect state"; "peer_id" => peer_id.to_string(), "score" => info.score.to_string(), "past_state" => previous_state.to_string()); // disconnect the peer if it's currently connected or dialing unban_peer = Some(peer_id.clone()); @@ -501,7 +501,11 @@ impl PeerManager { .peers .read() 
.is_connected_or_dialing(&peer_id) - && !self.network_globals.peers.read().is_banned(&peer_id) + && !self + .network_globals + .peers + .read() + .is_banned_or_disconnected(&peer_id) { // TODO: Update output // This should be updated with the peer dialing. In fact created once the peer is @@ -629,7 +633,7 @@ impl PeerManager { // handle score transitions if previous_state != info.score.state() { match info.score.state() { - ScoreState::Ban => { + ScoreState::Banned => { debug!(self.log, "Peer has been banned"; "peer_id" => peer_id.to_string(), "score" => info.score.to_string()); to_ban_peers.push(peer_id.clone()); if info.connection_status.is_connected_or_dialing() { @@ -639,7 +643,7 @@ impl PeerManager { )); } } - ScoreState::Disconnect => { + ScoreState::Disconnected => { debug!(self.log, "Peer transitioned to disconnect state"; "peer_id" => peer_id.to_string(), "score" => info.score.to_string(), "past_state" => previous_state.to_string()); // disconnect the peer if it's currently connected or dialing to_unban_peers.push(peer_id.clone()); diff --git a/beacon_node/eth2_libp2p/src/peer_manager/peerdb.rs b/beacon_node/eth2_libp2p/src/peer_manager/peerdb.rs index fe7fffadb..4830a0be8 100644 --- a/beacon_node/eth2_libp2p/src/peer_manager/peerdb.rs +++ b/beacon_node/eth2_libp2p/src/peer_manager/peerdb.rs @@ -1,6 +1,6 @@ use super::peer_info::{PeerConnectionStatus, PeerInfo}; use super::peer_sync_status::PeerSyncStatus; -use super::score::Score; +use super::score::{Score, ScoreState}; use crate::rpc::methods::MetaData; use crate::PeerId; use rand::seq::SliceRandom; @@ -99,9 +99,17 @@ impl PeerDB { /// Returns true if the Peer is banned. 
pub fn is_banned(&self, peer_id: &PeerId) -> bool { - match self.peers.get(peer_id).map(|info| &info.connection_status) { - Some(status) => status.is_banned(), - None => false, + match self.peers.get(peer_id).map(|info| info.score.state()) { + Some(ScoreState::Banned) => true, + _ => false, + } + } + + /// Returns true if the Peer is either banned or in the disconnected state. + pub fn is_banned_or_disconnected(&self, peer_id: &PeerId) -> bool { + match self.peers.get(peer_id).map(|info| info.score.state()) { + Some(ScoreState::Banned) | Some(ScoreState::Disconnected) => true, + _ => false, } } diff --git a/beacon_node/eth2_libp2p/src/peer_manager/score.rs b/beacon_node/eth2_libp2p/src/peer_manager/score.rs index 719b6e990..7226db647 100644 --- a/beacon_node/eth2_libp2p/src/peer_manager/score.rs +++ b/beacon_node/eth2_libp2p/src/peer_manager/score.rs @@ -60,10 +60,10 @@ pub(crate) enum ScoreState { /// We are content with the peers performance. We permit connections and messages. Healthy, /// The peer should be disconnected. We allow re-connections if the peer is persistent. - Disconnect, + Disconnected, /// The peer is banned. We disallow new connections until it's score has decayed into a /// tolerable threshold. - Ban, + Banned, } /// A peer's score (perceived potential usefulness). @@ -138,8 +138,8 @@ impl std::fmt::Display for ScoreState { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { ScoreState::Healthy => write!(f, "Healthy"), - ScoreState::Ban => write!(f, "Ban"), - ScoreState::Disconnect => write!(f, "Disconnect"), + ScoreState::Banned => write!(f, "Banned"), + ScoreState::Disconnected => write!(f, "Disconnected"), } } } @@ -164,8 +164,8 @@ impl Score { /// Returns the expected state of the peer given it's score. 
 pub(crate) fn state(&self) -> ScoreState { match self.score { - x if x <= MIN_SCORE_BEFORE_BAN => ScoreState::Ban, - x if x <= MIN_SCORE_BEFORE_DISCONNECT => ScoreState::Disconnect, + x if x <= MIN_SCORE_BEFORE_BAN => ScoreState::Banned, + x if x <= MIN_SCORE_BEFORE_DISCONNECT => ScoreState::Disconnected, _ => ScoreState::Healthy, } } diff --git a/beacon_node/network/src/sync/range_sync/batch.rs b/beacon_node/network/src/sync/range_sync/batch.rs index 5b3e2b581..9d65f11f8 100644 --- a/beacon_node/network/src/sync/range_sync/batch.rs +++ b/beacon_node/network/src/sync/range_sync/batch.rs @@ -39,23 +39,33 @@ pub struct Batch { pub id: BatchId, /// The requested start slot of the batch, inclusive. pub start_slot: Slot, - /// The requested end slot of batch, exclusive. + /// The requested end slot of batch, exclusive. pub end_slot: Slot, - /// The peer that was originally assigned to the batch. - pub original_peer: PeerId, + /// The `Attempts` that have been made to send us this batch. + pub attempts: Vec, /// The peer that is currently assigned to the batch. pub current_peer: PeerId, /// The number of retries this batch has undergone due to a failed request. + /// This occurs when peers do not respond or we get an RPC error. pub retries: u8, /// The number of times this batch has attempted to be re-downloaded and re-processed. This /// occurs when a batch has been received but cannot be processed. pub reprocess_retries: u8, - /// Marks the batch as undergoing a re-process, with a hash of the original blocks it received. - pub original_hash: Option, /// The blocks that have been downloaded. pub downloaded_blocks: Vec>, } +/// Represents a peer's attempt at providing the result for this batch. +/// +/// Invalid attempts will downscore a peer. +#[derive(PartialEq, Debug)] +pub struct Attempt { + /// The peer that made the attempt. + pub peer_id: PeerId, + /// The hash of the blocks of the attempt. 
+ pub hash: u64, +} + impl Eq for Batch {} impl Batch { @@ -64,11 +74,10 @@ impl Batch { id, start_slot, end_slot, - original_peer: peer_id.clone(), + attempts: Vec::new(), current_peer: peer_id, retries: 0, reprocess_retries: 0, - original_hash: None, downloaded_blocks: Vec::new(), } } diff --git a/beacon_node/network/src/sync/range_sync/chain.rs b/beacon_node/network/src/sync/range_sync/chain.rs index 51afe45d2..be9765939 100644 --- a/beacon_node/network/src/sync/range_sync/chain.rs +++ b/beacon_node/network/src/sync/range_sync/chain.rs @@ -30,10 +30,10 @@ const BATCH_BUFFER_SIZE: u8 = 5; /// be reported negatively. const INVALID_BATCH_LOOKUP_ATTEMPTS: u8 = 3; -#[derive(PartialEq)] /// A return type for functions that act on a `Chain` which informs the caller whether the chain /// has been completed and should be removed or to be kept if further processing is /// required. +#[derive(PartialEq)] pub enum ProcessingResult { KeepChain, RemoveChain, @@ -325,18 +325,13 @@ impl SyncingChain { *self.to_be_processed_id += 1; // If the processed batch was not empty, we can validate previous invalidated - // blocks + // blocks including the current batch. if !batch.downloaded_blocks.is_empty() { self.mark_processed_batches_as_valid(network, &batch); } - // Add the current batch to processed batches to be verified in the future. We are - // only uncertain about this batch, if it has not returned all blocks. - if batch.downloaded_blocks.last().map(|block| block.slot()) - != Some(batch.end_slot.saturating_sub(1u64)) - { - self.processed_batches.push(batch); - } + // Add the current batch to processed batches to be verified in the future. 
+ self.processed_batches.push(batch); // check if the chain has completed syncing if self.current_processed_slot() >= self.target_head_slot { @@ -432,7 +427,7 @@ impl SyncingChain { last_batch: &Batch, ) { while !self.processed_batches.is_empty() { - let processed_batch = self.processed_batches.remove(0); + let mut processed_batch = self.processed_batches.remove(0); if *processed_batch.id >= *last_batch.id { crit!(self.log, "A processed batch had a greater id than the current process id"; "chain_id" => self.id, @@ -440,12 +435,14 @@ impl SyncingChain { "current_id" => *last_batch.id); } - if let Some(prev_hash) = processed_batch.original_hash { + // Go through passed attempts and downscore peers that returned invalid batches + while !processed_batch.attempts.is_empty() { + let attempt = processed_batch.attempts.remove(0); // The validated batch has been re-processed - if prev_hash != processed_batch.hash() { + if attempt.hash != processed_batch.hash() { // The re-downloaded version was different - if processed_batch.current_peer != processed_batch.original_peer { - // A new peer sent the correct batch, the previous peer did not + if processed_batch.current_peer != attempt.peer_id { + // A different peer sent the correct batch, the previous peer did not // We negatively score the original peer. let action = PeerAction::LowToleranceError; debug!( @@ -453,10 +450,10 @@ impl SyncingChain { "chain_id" => self.id, "batch_id" => *processed_batch.id, "score_adjustment" => action.to_string(), - "original_peer" => format!("{}",processed_batch.original_peer), + "original_peer" => format!("{}",attempt.peer_id), "new_peer" => format!("{}", processed_batch.current_peer) ); - network.report_peer(processed_batch.original_peer, action); + network.report_peer(attempt.peer_id, action); } else { // The same peer corrected it's previous mistake. There was an error, so we // negative score the original peer. 
@@ -466,10 +463,10 @@ impl SyncingChain { "chain_id" => self.id, "batch_id" => *processed_batch.id, "score_adjustment" => action.to_string(), - "original_peer" => format!("{}",processed_batch.original_peer), + "original_peer" => format!("{}",attempt.peer_id), "new_peer" => format!("{}", processed_batch.current_peer) ); - network.report_peer(processed_batch.original_peer, action); + network.report_peer(attempt.peer_id, action); } } } @@ -524,7 +521,13 @@ impl SyncingChain { mut batch: Batch, ) { // marks the batch as attempting to be reprocessed by hashing the downloaded blocks - batch.original_hash = Some(batch.hash()); + let attempt = super::batch::Attempt { + peer_id: batch.current_peer.clone(), + hash: batch.hash(), + }; + + // add this attempt to the batch + batch.attempts.push(attempt); // remove previously downloaded blocks batch.downloaded_blocks.clear(); @@ -546,7 +549,7 @@ impl SyncingChain { debug!(self.log, "Re-requesting batch"; "chain_id" => self.id, "start_slot" => batch.start_slot, - "end_slot" => batch.end_slot, + "end_slot" => batch.end_slot -1, // The -1 shows inclusive blocks "id" => *batch.id, "peer" => format!("{}", batch.current_peer), "retries" => batch.retries, @@ -682,7 +685,8 @@ impl SyncingChain { debug!(self.log, "Re-Requesting batch"; "chain_id" => self.id, "start_slot" => batch.start_slot, - "end_slot" => batch.end_slot, + "end_slot" => batch.end_slot -1, // The -1 shows inclusive blocks + "id" => *batch.id, "peer" => format!("{:?}", batch.current_peer)); self.send_batch(network, batch); @@ -707,7 +711,7 @@ impl SyncingChain { debug!(self.log, "Requesting batch"; "chain_id" => self.id, "start_slot" => batch.start_slot, - "end_slot" => batch.end_slot, + "end_slot" => batch.end_slot -1, // The -1 shows inclusive blocks "id" => *batch.id, "peer" => format!("{}", batch.current_peer)); // send the batch @@ -737,6 +741,16 @@ impl SyncingChain { /// Returns the next required batch from the chain if it exists. 
If there are no more batches /// required, `None` is returned. + /// + /// Batches are downloaded excluding the first block of the epoch assuming it has already been + /// downloaded. + /// + /// For example: + /// + /// + /// Epoch boundary | | + /// ... | 30 | 31 | 32 | 33 | 34 | ... | 61 | 62 | 63 | 64 | 65 | + /// Batch 1 | Batch 2 | Batch 3 fn get_next_batch(&mut self, peer_id: PeerId) -> Option> { let slots_per_epoch = T::EthSpec::slots_per_epoch(); let blocks_per_batch = slots_per_epoch * EPOCHS_PER_BATCH; @@ -751,7 +765,11 @@ impl SyncingChain { return None; } - let batch_start_slot = self.start_epoch.start_slot(slots_per_epoch) + // One is added to the start slot to begin one slot after the epoch boundary + let batch_start_slot = self + .start_epoch + .start_slot(slots_per_epoch) + .saturating_add(1u64) + self.to_be_downloaded_id.saturating_sub(1) * blocks_per_batch; // don't request batches beyond the target head slot