From af96dd08c89aa404cc715b21195a4610df4a219d Mon Sep 17 00:00:00 2001 From: Paul Hauner Date: Thu, 6 Jun 2019 00:32:09 -0400 Subject: [PATCH] Simplify `simple_sync` code --- beacon_node/network/src/sync/simple_sync.rs | 260 ++++++-------------- 1 file changed, 77 insertions(+), 183 deletions(-) diff --git a/beacon_node/network/src/sync/simple_sync.rs b/beacon_node/network/src/sync/simple_sync.rs index af6cbdfc0..043f0beda 100644 --- a/beacon_node/network/src/sync/simple_sync.rs +++ b/beacon_node/network/src/sync/simple_sync.rs @@ -1,8 +1,6 @@ use super::import_queue::ImportQueue; use crate::message_handler::NetworkContext; -use beacon_chain::{ - BeaconChain, BeaconChainError, BeaconChainTypes, BlockProcessingOutcome, InvalidBlock, -}; +use beacon_chain::{BeaconChain, BeaconChainTypes, BlockProcessingOutcome, InvalidBlock}; use eth2_libp2p::rpc::methods::*; use eth2_libp2p::rpc::{RPCRequest, RPCResponse, RequestId}; use eth2_libp2p::PeerId; @@ -33,51 +31,6 @@ pub struct PeerSyncInfo { best_slot: Slot, } -impl PeerSyncInfo { - /// Returns `true` if the has a different network ID to `other`. - fn has_different_network_id_to(&self, other: Self) -> bool { - self.network_id != other.network_id - } - - /// Returns `true` if the peer has a higher finalized epoch than `other`. - fn has_higher_finalized_epoch_than(&self, other: Self) -> bool { - self.latest_finalized_epoch > other.latest_finalized_epoch - } - - /// Returns `true` if the peer has a higher best slot than `other`. - fn has_higher_best_slot_than(&self, other: Self) -> bool { - self.best_slot > other.best_slot - } -} - -/// The status of a peers view on the chain, relative to some other view of the chain (presumably -/// our view). -#[derive(PartialEq, Clone, Copy, Debug)] -pub enum PeerStatus { - /// The peer is on a completely different chain. - DifferentNetworkId, - /// The peer lists a finalized epoch for which we have a different root. - FinalizedEpochNotInChain, - /// The peer has a higher finalized epoch. - HigherFinalizedEpoch, - /// The peer has a higher best slot. - HigherBestSlot, - /// The peer has the same or lesser view of the chain. We have nothing to request of them. - NotInteresting, -} - -impl PeerStatus { - pub fn should_handshake(self) -> bool { - match self { - PeerStatus::DifferentNetworkId => false, - PeerStatus::FinalizedEpochNotInChain => false, - PeerStatus::HigherFinalizedEpoch => true, - PeerStatus::HigherBestSlot => true, - PeerStatus::NotInteresting => true, - } - } -} - impl From for PeerSyncInfo { fn from(hello: HelloMessage) -> PeerSyncInfo { PeerSyncInfo { @@ -153,7 +106,7 @@ impl SimpleSync { /// /// Sends a `Hello` message to the peer. pub fn on_connect(&self, peer_id: PeerId, network: &mut NetworkContext) { - info!(self.log, "PeerConnect"; "peer" => format!("{:?}", peer_id)); + info!(self.log, "PeerConnected"; "peer" => format!("{:?}", peer_id)); network.send_rpc_request(peer_id, RPCRequest::Hello(hello_message(&self.chain))); } @@ -193,51 +146,6 @@ impl SimpleSync { self.process_hello(peer_id, hello, network); } - /// Returns a `PeerStatus` for some peer. - fn peer_status(&self, peer: PeerSyncInfo) -> PeerStatus { - let local = PeerSyncInfo::from(&self.chain); - - if peer.has_different_network_id_to(local) { - return PeerStatus::DifferentNetworkId; - } - - if local.has_higher_finalized_epoch_than(peer) { - let peer_finalized_slot = peer - .latest_finalized_epoch - .start_slot(T::EthSpec::spec().slots_per_epoch); - - let local_roots = self.chain.get_block_roots(peer_finalized_slot, 1, 0); - - if let Ok(local_roots) = local_roots { - if let Some(local_root) = local_roots.get(0) { - if *local_root != peer.latest_finalized_root { - return PeerStatus::FinalizedEpochNotInChain; - } - } else { - error!( - self.log, - "Cannot get root for peer finalized slot."; - "error" => "empty roots" - ); - } - } else { - error!( - self.log, - "Cannot get root for peer finalized slot."; - "error" => format!("{:?}", local_roots) - ); - } - } - - if peer.has_higher_finalized_epoch_than(local) { - PeerStatus::HigherFinalizedEpoch - } else if peer.has_higher_best_slot_than(local) { - PeerStatus::HigherBestSlot - } else { - PeerStatus::NotInteresting - } - } - /// Process a `Hello` message, requesting new blocks if appropriate. /// /// Disconnects the peer if required. @@ -251,52 +159,68 @@ impl SimpleSync { let remote = PeerSyncInfo::from(hello); let local = PeerSyncInfo::from(&self.chain); - let remote_status = self.peer_status(remote); - if remote_status.should_handshake() { - info!(self.log, "HandshakeSuccess"; "peer" => format!("{:?}", peer_id)); - self.known_peers.insert(peer_id.clone(), remote); - } else { + let network_id_mismatch = local.network_id != remote.network_id; + let on_different_finalized_chain = (local.latest_finalized_epoch + >= remote.latest_finalized_epoch) + && (!self + .chain + .rev_iter_block_roots(local.best_slot) + .any(|root| root == remote.latest_finalized_root)); + + if network_id_mismatch || on_different_finalized_chain { info!( self.log, "HandshakeFailure"; "peer" => format!("{:?}", peer_id), "reason" => "network_id" ); network.disconnect(peer_id.clone(), GoodbyeReason::IrreleventNetwork); + return; + } else { + info!(self.log, "HandshakeSuccess"; "peer" => format!("{:?}", peer_id)); + self.known_peers.insert(peer_id.clone(), remote); } - // If required, send additional requests. - match remote_status { - PeerStatus::HigherFinalizedEpoch => { - let start_slot = remote - .latest_finalized_epoch - .start_slot(spec.slots_per_epoch); - let required_slots = start_slot - local.best_slot; + // If we have equal or better finalized epochs and best slots, we require nothing else from + // this peer. + if (remote.latest_finalized_epoch <= local.latest_finalized_epoch) + && (remote.best_slot <= local.best_slot) + { + return; + } - self.request_block_roots( - peer_id, - BeaconBlockRootsRequest { - start_slot, - count: required_slots.into(), - }, - network, - ); - } - PeerStatus::HigherBestSlot => { - let required_slots = remote.best_slot - local.best_slot; + // If the remote has a higher finalized epoch, request all block roots from our finalized + // epoch through to its best slot. + if remote.latest_finalized_epoch > local.latest_finalized_epoch { + let start_slot = local + .latest_finalized_epoch + .start_slot(spec.slots_per_epoch); + let required_slots = start_slot - remote.best_slot; - self.request_block_roots( - peer_id, - BeaconBlockRootsRequest { - start_slot: local.best_slot + 1, - count: required_slots.into(), - }, - network, - ); - } - PeerStatus::FinalizedEpochNotInChain => {} - PeerStatus::DifferentNetworkId => {} - PeerStatus::NotInteresting => {} + self.request_block_roots( + peer_id, + BeaconBlockRootsRequest { + start_slot, + count: required_slots.into(), + }, + network, + ); + // If the remote has a greater best slot, request the roots between our best slot and their + // best slot. + } else if remote.best_root > local.best_root { + let start_slot = local + .latest_finalized_epoch + .start_slot(spec.slots_per_epoch); + let required_slots = start_slot - remote.best_slot; + + self.request_block_roots( + peer_id, + BeaconBlockRootsRequest { + start_slot, + count: required_slots.into(), + }, + network, + ); } } @@ -315,27 +239,12 @@ impl SimpleSync { "count" => req.count, ); - let roots = match self + let roots = self .chain - .get_block_roots(req.start_slot, req.count as usize, 0) - { - Ok(roots) => roots, - Err(e) => { - // TODO: return RPC error. - warn!( - self.log, - "RPCRequest"; "peer" => format!("{:?}", peer_id), - "req" => "BeaconBlockRoots", - "error" => format!("{:?}", e) - ); - return; - } - }; - - let roots = roots - .iter() + .rev_iter_block_roots(req.start_slot) + .take(req.count as usize) .enumerate() - .map(|(i, &block_root)| BlockRootSlot { + .map(|(i, block_root)| BlockRootSlot { slot: req.start_slot + Slot::from(i), block_root, }) @@ -426,24 +335,12 @@ impl SimpleSync { "count" => req.max_headers, ); - let headers = match get_block_headers( + let headers = get_block_headers( &self.chain, req.start_slot, req.max_headers as usize, req.skip_slots as usize, - ) { - Ok(headers) => headers, - Err(e) => { - // TODO: return RPC error. - warn!( - self.log, - "RPCRequest"; "peer" => format!("{:?}", peer_id), - "req" => "BeaconBlockHeaders", - "error" => format!("{:?}", e) - ); - return; - } - }; + ); network.send_rpc_response( peer_id, @@ -554,14 +451,15 @@ impl SimpleSync { ) -> bool { info!( self.log, - "NewGossipBlock"; + "GossipBlockReceived"; "peer" => format!("{:?}", peer_id), + "block_slot" => format!("{:?}", block.slot), ); // Ignore any block from a finalized slot. if self.slot_is_finalized(block.slot) { warn!( - self.log, "NewGossipBlock"; + self.log, "IgnoredGossipBlock"; "msg" => "new block slot is finalized.", "block_slot" => block.slot, ); @@ -572,22 +470,15 @@ impl SimpleSync { // Ignore any block that the chain already knows about. if self.chain_has_seen_block(&block_root) { - println!("this happened"); // TODO: Age confirm that we shouldn't forward a block if we already know of it. return false; } - debug!( - self.log, - "NewGossipBlock"; - "peer" => format!("{:?}", peer_id), - "msg" => "processing block", - ); match self.chain.process_block(block.clone()) { Ok(BlockProcessingOutcome::InvalidBlock(InvalidBlock::ParentUnknown)) => { // The block was valid and we processed it successfully. debug!( - self.log, "NewGossipBlock"; + self.log, "InvalidGossipBlock"; "msg" => "parent block unknown", "parent_root" => format!("{}", block.previous_block_root), "peer" => format!("{:?}", peer_id), @@ -614,7 +505,7 @@ impl SimpleSync { if block_slot - present_slot > FUTURE_SLOT_TOLERANCE { // The block is too far in the future, drop it. warn!( - self.log, "NewGossipBlock"; + self.log, "InvalidGossipBlock"; "msg" => "future block rejected", "present_slot" => present_slot, "block_slot" => block_slot, @@ -626,7 +517,7 @@ impl SimpleSync { } else { // The block is in the future, but not too far. warn!( - self.log, "NewGossipBlock"; + self.log, "FutureGossipBlock"; "msg" => "queuing future block", "present_slot" => present_slot, "block_slot" => block_slot, @@ -643,8 +534,8 @@ impl SimpleSync { if outcome.is_invalid() { // The peer has sent a block which is fundamentally invalid. warn!( - self.log, "NewGossipBlock"; - "msg" => "invalid block from peer", + self.log, "InvalidGossipBlock"; + "msg" => "peer sent objectively invalid block", "outcome" => format!("{:?}", outcome), "peer" => format!("{:?}", peer_id), ); @@ -655,8 +546,7 @@ impl SimpleSync { } else if outcome.sucessfully_processed() { // The block was valid and we processed it successfully. info!( - self.log, "NewGossipBlock"; - "msg" => "block import successful", + self.log, "ValidGossipBlock"; "peer" => format!("{:?}", peer_id), ); // Forward the block to peers @@ -665,7 +555,7 @@ impl SimpleSync { // The block wasn't necessarily invalid but we didn't process it successfully. // This condition shouldn't be reached. error!( - self.log, "NewGossipBlock"; + self.log, "InvalidGossipBlock"; "msg" => "unexpected condition in processing block.", "outcome" => format!("{:?}", outcome), ); @@ -679,7 +569,7 @@ impl SimpleSync { // Blocks should not be able to trigger errors, instead they should be flagged as // invalid. error!( - self.log, "NewGossipBlock"; + self.log, "InvalidGossipBlock"; "msg" => "internal error in processing block.", "error" => format!("{:?}", e), ); @@ -870,7 +760,11 @@ fn get_block_headers( start_slot: Slot, count: usize, skip: usize, -) -> Result, BeaconChainError> { - let roots = beacon_chain.get_block_roots(start_slot, count, skip)?; - beacon_chain.get_block_headers(&roots) +) -> Vec { + beacon_chain + .rev_iter_blocks(start_slot) + .step_by(skip + 1) + .take(count) + .map(|block| block.block_header()) + .collect() }