From 8a1654871529f593d5956cda5ddac4ff865acd15 Mon Sep 17 00:00:00 2001 From: divma Date: Fri, 13 Nov 2020 09:00:10 +0000 Subject: [PATCH] Misc Peer sync info adjustments (#1896) ## Issue Addressed #1856 ## Proposed Changes - For clarity, the router's processor now only decides if a peer is compatible, and either disconnects it or sends it to sync accordingly. No logic here regarding how useful the peer is. - Update peer_sync_info's rules - Add an `IrrelevantPeer` sync status to account for incompatible peers (maybe this should be "IncompatiblePeer" now that I think about it?); this state is updated upon receiving an internal goodbye in the peer manager - Misc code cleanups - Reduce the need to create `StatusMessage`s (and thus, `Arc` accesses) - Add missing calls to update the global sync state The overall effect should be: - More peers recognized as Behind, and fewer as Unknown - Peers identified as incompatible --- .../eth2_libp2p/src/peer_manager/mod.rs | 6 +- .../src/peer_manager/peer_sync_status.rs | 94 +++++------- beacon_node/eth2_libp2p/src/service.rs | 2 +- beacon_node/network/src/router/processor.rs | 143 +++++------------ beacon_node/network/src/sync/manager.rs | 127 +++++---------- beacon_node/network/src/sync/mod.rs | 1 - .../network/src/sync/network_context.rs | 2 +- .../network/src/sync/peer_sync_info.rs | 145 ++++++++---------- .../src/sync/range_sync/chain_collection.rs | 43 ++---- .../network/src/sync/range_sync/range.rs | 32 ++-- .../network/src/sync/range_sync/sync_type.rs | 6 +- 11 files changed, 222 insertions(+), 379 deletions(-) diff --git a/beacon_node/eth2_libp2p/src/peer_manager/mod.rs b/beacon_node/eth2_libp2p/src/peer_manager/mod.rs index 225b51fbe..3377c3d7b 100644 --- a/beacon_node/eth2_libp2p/src/peer_manager/mod.rs +++ b/beacon_node/eth2_libp2p/src/peer_manager/mod.rs @@ -147,7 +147,11 @@ impl PeerManager { pub fn goodbye_peer(&mut self, peer_id: &PeerId, reason: GoodbyeReason) { // get the peer info if let Some(info) = 
self.network_globals.peers.write().peer_info_mut(peer_id) { - debug!(self.log, "Sending goodbye to peer"; "peer_id" => peer_id.to_string(), "reason" => reason.to_string(), "score" => info.score().to_string()); + debug!(self.log, "Sending goodbye to peer"; "peer_id" => %peer_id, "reason" => %reason, "score" => %info.score()); + if matches!(reason, GoodbyeReason::IrrelevantNetwork) { + info.sync_status.update(PeerSyncStatus::IrrelevantPeer); + } + // Goodbye's are fatal info.apply_peer_action_to_score(PeerAction::Fatal); } diff --git a/beacon_node/eth2_libp2p/src/peer_manager/peer_sync_status.rs b/beacon_node/eth2_libp2p/src/peer_manager/peer_sync_status.rs index d044a9095..9cf38ceb3 100644 --- a/beacon_node/eth2_libp2p/src/peer_manager/peer_sync_status.rs +++ b/beacon_node/eth2_libp2p/src/peer_manager/peer_sync_status.rs @@ -12,18 +12,32 @@ pub enum PeerSyncStatus { Advanced { info: SyncInfo }, /// Is behind our current head and not useful for block downloads. Behind { info: SyncInfo }, + /// This peer is in an incompatible network. + IrrelevantPeer, /// Not currently known as a STATUS handshake has not occurred. Unknown, } -/// This is stored inside the PeerSyncStatus and is very similar to `PeerSyncInfo` in the -/// `Network` crate. +/// A relevant peer's sync information. #[derive(Clone, Debug, Serialize)] pub struct SyncInfo { - pub status_head_slot: Slot, - pub status_head_root: Hash256, - pub status_finalized_epoch: Epoch, - pub status_finalized_root: Hash256, + pub head_slot: Slot, + pub head_root: Hash256, + pub finalized_epoch: Epoch, + pub finalized_root: Hash256, +} + +impl std::cmp::PartialEq for PeerSyncStatus { + fn eq(&self, other: &Self) -> bool { + match (self, other) { + (PeerSyncStatus::Synced { .. }, PeerSyncStatus::Synced { .. }) => true, + (PeerSyncStatus::Advanced { .. }, PeerSyncStatus::Advanced { .. }) => true, + (PeerSyncStatus::Behind { .. }, PeerSyncStatus::Behind { .. 
}) => true, + (PeerSyncStatus::IrrelevantPeer, PeerSyncStatus::IrrelevantPeer) => true, + (PeerSyncStatus::Unknown, PeerSyncStatus::Unknown) => true, + _ => false, + } + } } impl PeerSyncStatus { @@ -42,54 +56,26 @@ impl PeerSyncStatus { matches!(self, PeerSyncStatus::Behind { .. }) } - /// Updates the sync state given a fully synced peer. - /// Returns true if the state has changed. - pub fn update_synced(&mut self, info: SyncInfo) -> bool { - let new_state = PeerSyncStatus::Synced { info }; - - match self { - PeerSyncStatus::Synced { .. } | PeerSyncStatus::Unknown => { - *self = new_state; - false // state was not updated - } - _ => { - *self = new_state; - true - } - } - } - - /// Updates the sync state given a peer that is further ahead in the chain than us. - /// Returns true if the state has changed. - pub fn update_advanced(&mut self, info: SyncInfo) -> bool { - let new_state = PeerSyncStatus::Advanced { info }; - - match self { - PeerSyncStatus::Advanced { .. } | PeerSyncStatus::Unknown => { - *self = new_state; - false // state was not updated - } - _ => { - *self = new_state; - true - } - } - } - - /// Updates the sync state given a peer that is behind us in the chain. - /// Returns true if the state has changed. - pub fn update_behind(&mut self, info: SyncInfo) -> bool { - let new_state = PeerSyncStatus::Behind { info }; - - match self { - PeerSyncStatus::Behind { .. } | PeerSyncStatus::Unknown => { - *self = new_state; - false // state was not updated - } - _ => { - *self = new_state; - true - } + pub fn update(&mut self, new_state: PeerSyncStatus) -> bool { + if *self == new_state { + *self = new_state; + false // state was not updated + } else { + *self = new_state; + true } } } + +impl std::fmt::Display for PeerSyncStatus { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let rpr = match self { + PeerSyncStatus::Behind { .. } => "Behind", + PeerSyncStatus::Advanced { .. } => "Advanced", + PeerSyncStatus::Synced { .. 
} => "Synced", + PeerSyncStatus::Unknown => "Unknown", + PeerSyncStatus::IrrelevantPeer => "IrrelevantPeer", + }; + f.write_str(rpr) + } +} diff --git a/beacon_node/eth2_libp2p/src/service.rs b/beacon_node/eth2_libp2p/src/service.rs index 26e844b1a..44e1e9319 100644 --- a/beacon_node/eth2_libp2p/src/service.rs +++ b/beacon_node/eth2_libp2p/src/service.rs @@ -249,7 +249,7 @@ impl Service { self.swarm.report_peer(peer_id, action); } - // Disconnect and ban a peer, providing a reason. + /// Disconnect and ban a peer, providing a reason. pub fn goodbye_peer(&mut self, peer_id: &PeerId, reason: GoodbyeReason) { self.swarm.goodbye_peer(peer_id, reason); } diff --git a/beacon_node/network/src/router/processor.rs b/beacon_node/network/src/router/processor.rs index 52c0d52e6..109edba19 100644 --- a/beacon_node/network/src/router/processor.rs +++ b/beacon_node/network/src/router/processor.rs @@ -2,11 +2,11 @@ use crate::beacon_processor::{ BeaconProcessor, WorkEvent as BeaconWorkEvent, MAX_WORK_EVENT_QUEUE_LEN, }; use crate::service::NetworkMessage; -use crate::sync::{PeerSyncInfo, SyncMessage}; +use crate::sync::SyncMessage; use beacon_chain::{BeaconChain, BeaconChainError, BeaconChainTypes}; use eth2_libp2p::rpc::*; use eth2_libp2p::{ - MessageId, NetworkGlobals, PeerAction, PeerId, PeerRequestId, Request, Response, + MessageId, NetworkGlobals, PeerAction, PeerId, PeerRequestId, Request, Response, SyncInfo, }; use itertools::process_results; use slog::{debug, error, o, trace, warn}; @@ -113,7 +113,7 @@ impl Processor { /// Called when we first connect to a peer, or when the PeerManager determines we need to /// re-status. 
pub fn send_status(&mut self, peer_id: PeerId) { - if let Some(status_message) = status_message(&self.chain) { + if let Ok(status_message) = status_message(&self.chain) { debug!( self.log, "Sending Status Request"; @@ -150,7 +150,7 @@ impl Processor { ); // ignore status responses if we are shutting down - if let Some(status_message) = status_message(&self.chain) { + if let Ok(status_message) = status_message(&self.chain) { // Say status back. self.network.send_response( peer_id.clone(), @@ -183,41 +183,23 @@ impl Processor { } } - /// Process a `Status` message, requesting new blocks if appropriate. - /// - /// Disconnects the peer if required. + /// Process a `Status` message to determine if a peer is relevant to us. Irrelevant peers are + /// disconnected; relevant peers are sent to the SyncManager fn process_status( &mut self, peer_id: PeerId, - status: StatusMessage, + remote: StatusMessage, ) -> Result<(), BeaconChainError> { - let remote = PeerSyncInfo::from(status); - let local = match PeerSyncInfo::from_chain(&self.chain) { - Some(local) => local, - None => { - error!( - self.log, - "Failed to get peer sync info"; - "msg" => "likely due to head lock contention" - ); - return Err(BeaconChainError::CannotAttestToFutureState); - } - }; - + let local = status_message(&self.chain)?; let start_slot = |epoch: Epoch| epoch.start_slot(T::EthSpec::slots_per_epoch()); - if local.fork_digest != remote.fork_digest { - // The node is on a different network/fork, disconnect them. 
- debug!( - self.log, "Handshake Failure"; - "peer_id" => peer_id.to_string(), - "reason" => "incompatible forks", - "our_fork" => hex::encode(local.fork_digest), - "their_fork" => hex::encode(remote.fork_digest) - ); - - self.network - .goodbye_peer(peer_id, GoodbyeReason::IrrelevantNetwork); + let irrelevant_reason = if local.fork_digest != remote.fork_digest { + // The node is on a different network/fork + Some(format!( + "Incompatible forks Ours:{} Theirs:{}", + hex::encode(local.fork_digest), + hex::encode(remote.fork_digest) + )) } else if remote.head_slot > self .chain @@ -225,19 +207,10 @@ impl Processor { .unwrap_or_else(|_| self.chain.slot_clock.genesis_slot()) + FUTURE_SLOT_TOLERANCE { - // Note: If the slot_clock cannot be read, this will not error. Other system - // components will deal with an invalid slot clock error. - - // The remotes head is on a slot that is significantly ahead of ours. This could be - // because they are using a different genesis time, or that theirs or our system - // clock is incorrect. - debug!( - self.log, "Handshake Failure"; - "peer" => peer_id.to_string(), - "reason" => "different system clocks or genesis time" - ); - self.network - .goodbye_peer(peer_id, GoodbyeReason::IrrelevantNetwork); + // The remote's head is on a slot that is significantly ahead of what we consider the + // current slot. This could be because they are using a different genesis time, or that + // their or our system's clock is incorrect. + Some("Different system clocks or genesis time".to_string()) } else if remote.finalized_epoch <= local.finalized_epoch && remote.finalized_root != Hash256::zero() && local.finalized_root != Hash256::zero() @@ -246,64 +219,26 @@ impl Processor { .root_at_slot(start_slot(remote.finalized_epoch)) .map(|root_opt| root_opt != Some(remote.finalized_root))? { - // The remotes finalized epoch is less than or greater than ours, but the block root is - // different to the one in our chain. 
- // - // Therefore, the node is on a different chain and we should not communicate with them. - debug!( - self.log, "Handshake Failure"; - "peer" => peer_id.to_string(), - "reason" => "different finalized chain" - ); + // The remote's finalized epoch is less than or equal to ours, but the block root is + // different to the one in our chain. Therefore, the node is on a different chain and we + // should not communicate with them. + Some("Different finalized chain".to_string()) + } else { + None + }; + + if let Some(irrelevant_reason) = irrelevant_reason { + debug!(self.log, "Handshake Failure"; "peer" => %peer_id, "reason" => irrelevant_reason); self.network .goodbye_peer(peer_id, GoodbyeReason::IrrelevantNetwork); - } else if remote.finalized_epoch < local.finalized_epoch { - // The node has a lower finalized epoch, their chain is not useful to us. There are two - // cases where a node can have a lower finalized epoch: - // - // ## The node is on the same chain - // - // If a node is on the same chain but has a lower finalized epoch, their head must be - // lower than ours. Therefore, we have nothing to request from them. - // - // ## The node is on a fork - // - // If a node is on a fork that has a lower finalized epoch, switching to that fork would - // cause us to revert a finalized block. This is not permitted, therefore we have no - // interest in their blocks. - debug!( - self.log, - "NaivePeer"; - "peer" => peer_id.to_string(), - "reason" => "lower finalized epoch" - ); - } else if self - .chain - .store - .item_exists::>(&remote.head_root)? - { - debug!( - self.log, "Peer with known chain found"; - "peer" => peer_id.to_string(), - "remote_head_slot" => remote.head_slot, - "remote_latest_finalized_epoch" => remote.finalized_epoch, - ); - - // If the node's best-block is already known to us and they are close to our current - // head, treat them as a fully sync'd peer. 
- self.send_to_sync(SyncMessage::AddPeer(peer_id, remote)); } else { - // The remote node has an equal or great finalized epoch and we don't know it's head. - // - // Therefore, there are some blocks between the local finalized epoch and the remote - // head that are worth downloading. - debug!( - self.log, "UsefulPeer"; - "peer" => peer_id.to_string(), - "local_finalized_epoch" => local.finalized_epoch, - "remote_latest_finalized_epoch" => remote.finalized_epoch, - ); - self.send_to_sync(SyncMessage::AddPeer(peer_id, remote)); + let info = SyncInfo { + head_slot: remote.head_slot, + head_root: remote.head_root, + finalized_epoch: remote.finalized_epoch, + finalized_root: remote.finalized_root, + }; + self.send_to_sync(SyncMessage::AddPeer(peer_id, info)); } Ok(()) @@ -679,14 +614,14 @@ impl Processor { /// Build a `StatusMessage` representing the state of the given `beacon_chain`. pub(crate) fn status_message( beacon_chain: &BeaconChain, -) -> Option { - let head_info = beacon_chain.head_info().ok()?; +) -> Result { + let head_info = beacon_chain.head_info()?; let genesis_validators_root = beacon_chain.genesis_validators_root; let fork_digest = ChainSpec::compute_fork_digest(head_info.fork.current_version, genesis_validators_root); - Some(StatusMessage { + Ok(StatusMessage { fork_digest, finalized_root: head_info.finalized_checkpoint.root, finalized_epoch: head_info.finalized_checkpoint.epoch, diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index 18d9f9aea..5c2a01e2b 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -34,14 +34,16 @@ //! search for the block and subsequently search for parents if needed. 
use super::network_context::SyncNetworkContext; -use super::peer_sync_info::{PeerSyncInfo, PeerSyncType}; +use super::peer_sync_info::{remote_sync_type, PeerSyncType}; use super::range_sync::{ChainId, RangeSync, RangeSyncType, EPOCHS_PER_BATCH}; use super::RequestId; use crate::beacon_processor::{ProcessId, WorkEvent as BeaconWorkEvent}; +use crate::router::processor::status_message; use crate::service::NetworkMessage; use beacon_chain::{BeaconChain, BeaconChainTypes, BlockError}; use eth2_libp2p::rpc::{methods::MAX_REQUEST_BLOCKS, BlocksByRootRequest, GoodbyeReason}; use eth2_libp2p::types::{NetworkGlobals, SyncState}; +use eth2_libp2p::SyncInfo; use eth2_libp2p::{PeerAction, PeerId}; use fnv::FnvHashMap; use lru_cache::LRUCache; @@ -73,7 +75,7 @@ const PARENT_DEPTH_TOLERANCE: usize = SLOT_IMPORT_TOLERANCE * 2; /// A message than can be sent to the sync manager thread. pub enum SyncMessage { /// A useful peer has been discovered. - AddPeer(PeerId, PeerSyncInfo), + AddPeer(PeerId, SyncInfo), /// A `BlocksByRange` response has been received. BlocksByRangeResponse { @@ -254,62 +256,32 @@ impl SyncManager { /// /// If the peer is within the `SLOT_IMPORT_TOLERANCE`, then it's head is sufficiently close to /// ours that we consider it fully sync'd with respect to our current chain. 
- fn add_peer(&mut self, peer_id: PeerId, remote: PeerSyncInfo) { + fn add_peer(&mut self, peer_id: PeerId, remote: SyncInfo) { // ensure the beacon chain still exists - let local_peer_info = match PeerSyncInfo::from_chain(&self.chain) { - Some(local) => local, - None => { - return error!( - self.log, - "Failed to get peer sync info"; - "msg" => "likely due to head lock contention" - ) + let local = match status_message(&self.chain) { + Ok(status) => SyncInfo { + head_slot: status.head_slot, + head_root: status.head_root, + finalized_epoch: status.finalized_epoch, + finalized_root: status.finalized_root, + }, + Err(e) => { + return error!(self.log, "Failed to get peer sync info"; + "msg" => "likely due to head lock contention", "err" => ?e) } }; - match local_peer_info.peer_sync_type(&remote) { - PeerSyncType::FullySynced => { - trace!(self.log, "Peer synced to our head found"; - "peer" => %peer_id, - "peer_head_slot" => remote.head_slot, - "local_head_slot" => local_peer_info.head_slot, - ); - self.synced_peer(&peer_id, remote); - } - PeerSyncType::Advanced => { - trace!(self.log, "Useful peer for sync found"; - "peer" => %peer_id, - "peer_head_slot" => remote.head_slot, - "local_head_slot" => local_peer_info.head_slot, - "peer_finalized_epoch" => remote.finalized_epoch, - "local_finalized_epoch" => local_peer_info.finalized_epoch, - ); + let sync_type = remote_sync_type(&local, &remote, &self.chain); - // There are few cases to handle here: - // - // - A peer could appear advanced if our fork choice has rejected their version of - // the chain. If we know of their head slot, we consider this peer fully synced. - // - A peer could have just advanced to the next epoch and have a new finalized - // epoch that is currently ahead of ours. If their finalized epoch is ahead of ours - // by one and their head_slot is within the slot tolerance, consider this peer - // fully synced. + // update the state of the peer. 
+ self.update_peer_sync_state(&peer_id, &local, &remote, &sync_type); - if (self.chain.fork_choice.read().contains_block(&remote.head_root)) || // the first case - (remote.finalized_epoch.sub(local_peer_info.finalized_epoch) == 1 && remote.head_slot.sub(local_peer_info.head_slot) < SLOT_IMPORT_TOLERANCE as u64) - // the second case - { - self.synced_peer(&peer_id, remote); - } else { - // Add the peer to our RangeSync - self.range_sync - .add_peer(&mut self.network, peer_id.clone(), remote); - self.advanced_peer(&peer_id, remote); - } - } - PeerSyncType::Behind => { - self.behind_peer(&peer_id, remote); - } + if matches!(sync_type, PeerSyncType::Advanced) { + self.range_sync + .add_peer(&mut self.network, local, peer_id, remote); } + + self.update_sync_state(); } /// The response to a `BlocksByRoot` request. @@ -616,6 +588,7 @@ impl SyncManager { // otherwise, this is a range sync issue, notify the range sync self.range_sync .inject_error(&mut self.network, peer_id, request_id); + self.update_sync_state(); } fn peer_disconnect(&mut self, peer_id: &PeerId) { @@ -623,47 +596,25 @@ impl SyncManager { self.update_sync_state(); } - // TODO: Group these functions into one for cleaner code. - /// Updates the syncing state of a peer to be synced. - fn synced_peer(&mut self, peer_id: &PeerId, sync_info: PeerSyncInfo) { + /// Updates the syncing state of a peer. 
+ fn update_peer_sync_state( + &mut self, + peer_id: &PeerId, + local_sync_info: &SyncInfo, + remote_sync_info: &SyncInfo, + sync_type: &PeerSyncType, + ) { if let Some(peer_info) = self.network_globals.peers.write().peer_info_mut(peer_id) { - let head_slot = sync_info.head_slot; - let finalized_epoch = sync_info.finalized_epoch; - if peer_info.sync_status.update_synced(sync_info.into()) { - debug!(self.log, "Peer transitioned sync state"; "new_state" => "synced", "peer_id" => %peer_id, "head_slot" => head_slot, "finalized_epoch" => finalized_epoch); + let new_state = sync_type.as_sync_status(remote_sync_info); + let rpr = new_state.to_string(); + if peer_info.sync_status.update(new_state) { + debug!(self.log, "Peer transitioned sync state"; "peer_id" => %peer_id, "new_state" => rpr, + "our_head_slot" => local_sync_info.head_slot, "our_finalized_epoch" => local_sync_info.finalized_epoch, + "their_head_slot" => remote_sync_info.head_slot, "their_finalized_epoch" => remote_sync_info.finalized_epoch); } } else { crit!(self.log, "Status'd peer is unknown"; "peer_id" => %peer_id); } - self.update_sync_state(); - } - - /// Updates the syncing state of a peer to be advanced. - fn advanced_peer(&mut self, peer_id: &PeerId, sync_info: PeerSyncInfo) { - if let Some(peer_info) = self.network_globals.peers.write().peer_info_mut(peer_id) { - let head_slot = sync_info.head_slot; - let finalized_epoch = sync_info.finalized_epoch; - if peer_info.sync_status.update_advanced(sync_info.into()) { - debug!(self.log, "Peer transitioned sync state"; "new_state" => "advanced", "peer_id" => %peer_id, "head_slot" => head_slot, "finalized_epoch" => finalized_epoch); - } - } else { - crit!(self.log, "Status'd peer is unknown"; "peer_id" => %peer_id); - } - self.update_sync_state(); - } - - /// Updates the syncing state of a peer to be behind. 
- fn behind_peer(&mut self, peer_id: &PeerId, sync_info: PeerSyncInfo) { - if let Some(peer_info) = self.network_globals.peers.write().peer_info_mut(peer_id) { - let head_slot = sync_info.head_slot; - let finalized_epoch = sync_info.finalized_epoch; - if peer_info.sync_status.update_behind(sync_info.into()) { - debug!(self.log, "Peer transitioned sync state"; "new_state" => "behind", "peer_id" => %peer_id, "head_slot" => head_slot, "finalized_epoch" => finalized_epoch); - } - } else { - crit!(self.log, "Status'd peer is unknown"; "peer_id" => %peer_id); - } - self.update_sync_state(); } /// Updates the global sync state and logs any changes. @@ -921,6 +872,7 @@ impl SyncManager { request_id, beacon_block.map(|b| *b), ); + self.update_sync_state(); } SyncMessage::BlocksByRootResponse { peer_id, @@ -953,6 +905,7 @@ impl SyncManager { epoch, result, ); + self.update_sync_state(); } SyncMessage::ParentLookupFailed { chain_head, diff --git a/beacon_node/network/src/sync/mod.rs b/beacon_node/network/src/sync/mod.rs index 1d6e4f543..9377f6552 100644 --- a/beacon_node/network/src/sync/mod.rs +++ b/beacon_node/network/src/sync/mod.rs @@ -7,7 +7,6 @@ mod peer_sync_info; mod range_sync; pub use manager::{BatchProcessResult, SyncMessage}; -pub use peer_sync_info::PeerSyncInfo; pub use range_sync::ChainId; /// Type of id of rpc requests sent by sync diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index c9b0ee058..aa000939d 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -63,7 +63,7 @@ impl SyncNetworkContext { chain: Arc>, peers: impl Iterator, ) { - if let Some(status_message) = status_message(&chain) { + if let Ok(status_message) = status_message(&chain) { for peer_id in peers { debug!( self.log, diff --git a/beacon_node/network/src/sync/peer_sync_info.rs b/beacon_node/network/src/sync/peer_sync_info.rs index 0fad735cb..513b05a09 100644 --- 
a/beacon_node/network/src/sync/peer_sync_info.rs +++ b/beacon_node/network/src/sync/peer_sync_info.rs @@ -1,21 +1,7 @@ use super::manager::SLOT_IMPORT_TOLERANCE; -use crate::router::processor::status_message; use beacon_chain::{BeaconChain, BeaconChainTypes}; -use eth2_libp2p::rpc::*; -use eth2_libp2p::SyncInfo; -use std::ops::Sub; -use std::sync::Arc; -use types::{Epoch, Hash256, Slot}; - -/// Keeps track of syncing information for known connected peers. -#[derive(Clone, Copy, Debug)] -pub struct PeerSyncInfo { - pub fork_digest: [u8; 4], - pub finalized_root: Hash256, - pub finalized_epoch: Epoch, - pub head_root: Hash256, - pub head_slot: Slot, -} +use eth2_libp2p::{PeerSyncStatus, SyncInfo}; +use std::cmp::Ordering; /// The type of peer relative to our current state. pub enum PeerSyncType { @@ -27,82 +13,75 @@ pub enum PeerSyncType { Behind, } -impl From for PeerSyncInfo { - fn from(status: StatusMessage) -> PeerSyncInfo { - PeerSyncInfo { - fork_digest: status.fork_digest, - finalized_root: status.finalized_root, - finalized_epoch: status.finalized_epoch, - head_root: status.head_root, - head_slot: status.head_slot, +impl PeerSyncType { + pub fn as_sync_status(&self, info: &SyncInfo) -> PeerSyncStatus { + match self { + PeerSyncType::FullySynced => PeerSyncStatus::Synced { info: info.clone() }, + PeerSyncType::Behind => PeerSyncStatus::Behind { info: info.clone() }, + PeerSyncType::Advanced => PeerSyncStatus::Advanced { info: info.clone() }, } } } -impl Into for PeerSyncInfo { - fn into(self) -> SyncInfo { - SyncInfo { - status_head_slot: self.head_slot, - status_head_root: self.head_root, - status_finalized_epoch: self.finalized_epoch, - status_finalized_root: self.finalized_root, - } - } -} +pub fn remote_sync_type( + local: &SyncInfo, + remote: &SyncInfo, + chain: &BeaconChain, +) -> PeerSyncType { + // auxiliary variables for clarity: Inclusive boundaries of the range in which we consider a peer's + // head "near" ours. 
+ let near_range_start = local.head_slot - SLOT_IMPORT_TOLERANCE as u64; + let near_range_end = local.head_slot + SLOT_IMPORT_TOLERANCE as u64; -impl PeerSyncInfo { - /// Derives the peer sync information from a beacon chain. - pub fn from_chain(chain: &Arc>) -> Option { - Some(Self::from(status_message(chain)?)) - } - - /// Given another peer's `PeerSyncInfo` this will determine how useful that peer is to us in - /// regards to syncing. This returns the peer sync type that can then be handled by the - /// `SyncManager`. - pub fn peer_sync_type(&self, remote_peer_sync_info: &PeerSyncInfo) -> PeerSyncType { - // check if the peer is fully synced with our current chain - if self.is_fully_synced_peer(remote_peer_sync_info) { - PeerSyncType::FullySynced - } - // if not, check if the peer is ahead of our chain - else if self.is_advanced_peer(remote_peer_sync_info) { - PeerSyncType::Advanced - } else { - // the peer must be behind and not useful + match remote.finalized_epoch.cmp(&local.finalized_epoch) { + Ordering::Less => { + // The node has a lower finalized epoch, their chain is not useful to us. There are two + // cases where a node can have a lower finalized epoch: + // + // ## The node is on the same chain + // + // If a node is on the same chain but has a lower finalized epoch, their head must be + // lower than ours. Therefore, we have nothing to request from them. + // + // ## The node is on a fork + // + // If a node is on a fork that has a lower finalized epoch, switching to that fork would + // cause us to revert a finalized block. This is not permitted, therefore we have no + // interest in their blocks. + // + // We keep these peers to allow them to sync from us. PeerSyncType::Behind } - } - - /// Determines if another peer is fully synced with the current peer. - /// - /// A fully synced peer is a peer whose finalized epoch and hash match our own and their - /// head is within SLOT_IMPORT_TOLERANCE of our own. 
- /// In this case we ignore any batch/range syncing. - fn is_fully_synced_peer(&self, remote: &PeerSyncInfo) -> bool { - // ensure we are on the same chain, with minor differing heads - if remote.finalized_epoch == self.finalized_epoch - && remote.finalized_root == self.finalized_root - { - // that we are within SLOT_IMPORT_TOLERANCE of our two heads - if (self.head_slot >= remote.head_slot - && self.head_slot.sub(remote.head_slot).as_usize() <= SLOT_IMPORT_TOLERANCE) - || (self.head_slot < remote.head_slot - && remote.head_slot.sub(self.head_slot).as_usize() <= SLOT_IMPORT_TOLERANCE) + Ordering::Equal => { + // NOTE: if a peer has our same `finalized_epoch` with a different `finalized_root` + // they are not considered relevant and won't be propagated to sync. + // Check if the peer is inside the tolerance range to be considered synced. + if remote.head_slot < near_range_start { + PeerSyncType::Behind + } else if remote.head_slot > near_range_end + && !chain.fork_choice.read().contains_block(&remote.head_root) { - return true; + // This peer has a head far enough ahead of ours and we have no knowledge of their best + // block. + PeerSyncType::Advanced + } else { + // This peer is either in the tolerance range, or ahead of us with a head block that + // is already in our fork choice. + PeerSyncType::FullySynced + } + } + Ordering::Greater => { + if (local.finalized_epoch + 1 == remote.finalized_epoch + && near_range_start <= remote.head_slot + && remote.head_slot <= near_range_end) + || chain.fork_choice.read().contains_block(&remote.head_root) + { + // This peer is near enough to us to be considered synced, or + // we have already synced up to this peer's head + PeerSyncType::FullySynced + } else { + PeerSyncType::Advanced } } - false - } - - /// Determines if a peer has more knowledge about the current chain than we do. - /// - /// There are two conditions here. - /// 1) The peer could have a head slot that is greater - /// than SLOT_IMPORT_TOLERANCE of our current head. 
- /// 2) The peer has a greater finalized slot/epoch than our own. - fn is_advanced_peer(&self, remote: &PeerSyncInfo) -> bool { - remote.head_slot.sub(self.head_slot).as_usize() > SLOT_IMPORT_TOLERANCE - || self.finalized_epoch < remote.finalized_epoch } } diff --git a/beacon_node/network/src/sync/range_sync/chain_collection.rs b/beacon_node/network/src/sync/range_sync/chain_collection.rs index 22d5b23ee..fade0a4e7 100644 --- a/beacon_node/network/src/sync/range_sync/chain_collection.rs +++ b/beacon_node/network/src/sync/range_sync/chain_collection.rs @@ -7,9 +7,9 @@ use super::chain::{ChainId, ProcessingResult, RemoveChain, SyncingChain}; use super::sync_type::RangeSyncType; use crate::beacon_processor::WorkEvent as BeaconWorkEvent; use crate::sync::network_context::SyncNetworkContext; -use crate::sync::PeerSyncInfo; use beacon_chain::{BeaconChain, BeaconChainTypes}; use eth2_libp2p::PeerId; +use eth2_libp2p::SyncInfo; use fnv::FnvHashMap; use slog::{crit, debug, error}; use smallvec::SmallVec; @@ -185,35 +185,22 @@ impl ChainCollection { pub fn update( &mut self, network: &mut SyncNetworkContext, - awaiting_head_peers: &mut HashMap, + local: &SyncInfo, + awaiting_head_peers: &mut HashMap, beacon_processor_send: &mpsc::Sender>, ) { - let (local_finalized_epoch, local_head_epoch) = - match PeerSyncInfo::from_chain(&self.beacon_chain) { - None => { - return error!( - self.log, - "Failed to get peer sync info"; - "msg" => "likely due to head lock contention" - ) - } - Some(local) => ( - local.finalized_epoch, - local.head_slot.epoch(T::EthSpec::slots_per_epoch()), - ), - }; - // Remove any outdated finalized/head chains - self.purge_outdated_chains(awaiting_head_peers); + self.purge_outdated_chains(local, awaiting_head_peers); + let local_head_epoch = local.head_slot.epoch(T::EthSpec::slots_per_epoch()); // Choose the best finalized chain if one needs to be selected. 
- self.update_finalized_chains(network, local_finalized_epoch, local_head_epoch); + self.update_finalized_chains(network, local.finalized_epoch, local_head_epoch); if !matches!(self.state, RangeSyncState::Finalized(_)) { // Handle head syncing chains if there are no finalized chains left. self.update_head_chains( network, - local_finalized_epoch, + local.finalized_epoch, local_head_epoch, awaiting_head_peers, beacon_processor_send, @@ -329,7 +316,7 @@ impl ChainCollection { network: &mut SyncNetworkContext, local_epoch: Epoch, local_head_epoch: Epoch, - awaiting_head_peers: &mut HashMap, + awaiting_head_peers: &mut HashMap, beacon_processor_send: &mpsc::Sender>, ) { // Include the awaiting head peers @@ -404,19 +391,9 @@ impl ChainCollection { /// finalized block slot. Peers that would create outdated chains are removed too. pub fn purge_outdated_chains( &mut self, - awaiting_head_peers: &mut HashMap, + local_info: &SyncInfo, + awaiting_head_peers: &mut HashMap, ) { - let local_info = match PeerSyncInfo::from_chain(&self.beacon_chain) { - Some(local) => local, - None => { - return error!( - self.log, - "Failed to get peer sync info"; - "msg" => "likely due to head lock contention" - ) - } - }; - let local_finalized_slot = local_info .finalized_epoch .start_slot(T::EthSpec::slots_per_epoch()); diff --git a/beacon_node/network/src/sync/range_sync/range.rs b/beacon_node/network/src/sync/range_sync/range.rs index 32228e250..9075804f8 100644 --- a/beacon_node/network/src/sync/range_sync/range.rs +++ b/beacon_node/network/src/sync/range_sync/range.rs @@ -43,12 +43,13 @@ use super::chain::{ChainId, RemoveChain, SyncingChain}; use super::chain_collection::ChainCollection; use super::sync_type::RangeSyncType; use crate::beacon_processor::WorkEvent as BeaconWorkEvent; +use crate::router::processor::status_message; use crate::sync::network_context::SyncNetworkContext; use crate::sync::BatchProcessResult; -use crate::sync::PeerSyncInfo; use crate::sync::RequestId; use 
beacon_chain::{BeaconChain, BeaconChainTypes}; use eth2_libp2p::PeerId; +use eth2_libp2p::SyncInfo; use slog::{crit, debug, error, trace}; use std::collections::HashMap; use std::sync::Arc; @@ -63,7 +64,7 @@ pub struct RangeSync { beacon_chain: Arc>, /// Last known sync info of our useful connected peers. We use this information to create Head /// chains after all finalized chains have ended. - awaiting_head_peers: HashMap, + awaiting_head_peers: HashMap, /// A collection of chains that need to be downloaded. This stores any head or finalized chains /// that need to be downloaded. chains: ChainCollection, @@ -100,22 +101,15 @@ impl RangeSync { pub fn add_peer( &mut self, network: &mut SyncNetworkContext, + local_info: SyncInfo, peer_id: PeerId, - remote_info: PeerSyncInfo, + remote_info: SyncInfo, ) { // evaluate which chain to sync from // determine if we need to run a sync to the nearest finalized state or simply sync to // its current head - let local_info = match PeerSyncInfo::from_chain(&self.beacon_chain) { - Some(local) => local, - None => { - return error!(self.log, "Failed to get peer sync info"; - "msg" => "likely due to head lock contention") - } - }; - // convenience variable let remote_finalized_slot = remote_info .finalized_epoch @@ -146,6 +140,7 @@ impl RangeSync { self.chains.update( network, + &local_info, &mut self.awaiting_head_peers, &self.beacon_processor_send, ); @@ -182,6 +177,7 @@ impl RangeSync { ); self.chains.update( network, + &local_info, &mut self.awaiting_head_peers, &self.beacon_processor_send, ); @@ -345,9 +341,23 @@ impl RangeSync { network.status_peers(self.beacon_chain.clone(), chain.peers()); + let local = match status_message(&self.beacon_chain) { + Ok(status) => SyncInfo { + head_slot: status.head_slot, + head_root: status.head_root, + finalized_epoch: status.finalized_epoch, + finalized_root: status.finalized_root, + }, + Err(e) => { + return error!(self.log, "Failed to get peer sync info"; + "msg" => "likely due to head lock 
contention", "err" => ?e) + } + }; + // update the state of the collection self.chains.update( network, + &local, &mut self.awaiting_head_peers, &self.beacon_processor_send, ); diff --git a/beacon_node/network/src/sync/range_sync/sync_type.rs b/beacon_node/network/src/sync/range_sync/sync_type.rs index d9f1d3f17..c5e847bd5 100644 --- a/beacon_node/network/src/sync/range_sync/sync_type.rs +++ b/beacon_node/network/src/sync/range_sync/sync_type.rs @@ -1,8 +1,8 @@ //! Contains logic about identifying which Sync to perform given PeerSyncInfo of ourselves and //! of a remote. -use crate::sync::PeerSyncInfo; use beacon_chain::{BeaconChain, BeaconChainTypes}; +use eth2_libp2p::SyncInfo; use std::sync::Arc; /// The type of Range sync that should be done relative to our current state. @@ -19,8 +19,8 @@ impl RangeSyncType { /// `PeerSyncInfo`. pub fn new( chain: &Arc>, - local_info: &PeerSyncInfo, - remote_info: &PeerSyncInfo, + local_info: &SyncInfo, + remote_info: &SyncInfo, ) -> RangeSyncType { // Check for finalized chain sync //