From a3877b6135272a29a0d0e43f5a36f4c43d73a5ab Mon Sep 17 00:00:00 2001 From: Age Manning Date: Thu, 5 Sep 2019 08:07:57 +1000 Subject: [PATCH] Updates syncing stability, fixes large RPC message codec, corrects beacon chain referencing --- beacon_node/client/src/notifier.rs | 4 +- beacon_node/eth2-libp2p/src/rpc/codec/ssz.rs | 36 ++- beacon_node/eth2-libp2p/src/rpc/methods.rs | 6 +- beacon_node/eth2-libp2p/src/service.rs | 5 + beacon_node/network/src/message_handler.rs | 7 +- beacon_node/network/src/sync/manager.rs | 211 +++++++------ beacon_node/network/src/sync/simple_sync.rs | 304 +++++++++++-------- 7 files changed, 312 insertions(+), 261 deletions(-) diff --git a/beacon_node/client/src/notifier.rs b/beacon_node/client/src/notifier.rs index d705637cb..343918d4d 100644 --- a/beacon_node/client/src/notifier.rs +++ b/beacon_node/client/src/notifier.rs @@ -34,10 +34,10 @@ pub fn run(client: &Client, executor: TaskExecutor, exit // Panics if libp2p is poisoned. let connected_peer_count = libp2p.lock().swarm.connected_peers(); - debug!(log, "Libp2p connected peer status"; "peer_count" => connected_peer_count); + debug!(log, "Connected peer status"; "peer_count" => connected_peer_count); if connected_peer_count <= WARN_PEER_COUNT { - warn!(log, "Low libp2p peer count"; "peer_count" => connected_peer_count); + warn!(log, "Low peer count"; "peer_count" => connected_peer_count); } Ok(()) diff --git a/beacon_node/eth2-libp2p/src/rpc/codec/ssz.rs b/beacon_node/eth2-libp2p/src/rpc/codec/ssz.rs index 260a00346..1966bab62 100644 --- a/beacon_node/eth2-libp2p/src/rpc/codec/ssz.rs +++ b/beacon_node/eth2-libp2p/src/rpc/codec/ssz.rs @@ -152,45 +152,49 @@ impl Decoder for SSZOutboundCodec { type Error = RPCError; fn decode(&mut self, src: &mut BytesMut) -> Result, Self::Error> { - match self.inner.decode(src).map_err(RPCError::from) { - Ok(Some(packet)) => match self.protocol.message_name.as_str() { + if src.is_empty() { + // the object sent could be empty. We return the empty object if this is the case + match self.protocol.message_name.as_str() { "hello" => match self.protocol.version.as_str() { - "1" => Ok(Some(RPCResponse::Hello(HelloMessage::from_ssz_bytes( - &packet, - )?))), + "1" => Err(RPCError::Custom( + "Hello stream terminated unexpectedly".into(), + )), // cannot have an empty HELLO message. The stream has terminated unexpectedly _ => unreachable!("Cannot negotiate an unknown version"), }, "goodbye" => Err(RPCError::InvalidProtocol("GOODBYE doesn't have a response")), "beacon_blocks" => match self.protocol.version.as_str() { - "1" => Ok(Some(RPCResponse::BeaconBlocks(packet.to_vec()))), + "1" => Ok(Some(RPCResponse::BeaconBlocks(Vec::new()))), _ => unreachable!("Cannot negotiate an unknown version"), }, "recent_beacon_blocks" => match self.protocol.version.as_str() { - "1" => Ok(Some(RPCResponse::RecentBeaconBlocks(packet.to_vec()))), + "1" => Ok(Some(RPCResponse::RecentBeaconBlocks(Vec::new()))), _ => unreachable!("Cannot negotiate an unknown version"), }, _ => unreachable!("Cannot negotiate an unknown protocol"), - }, - Ok(None) => { - // the object sent could be a empty. We return the empty object if this is the case - match self.protocol.message_name.as_str() { + } + } else { + match self.inner.decode(src).map_err(RPCError::from) { + Ok(Some(packet)) => match self.protocol.message_name.as_str() { "hello" => match self.protocol.version.as_str() { - "1" => Ok(None), // cannot have an empty HELLO message. The stream has terminated unexpectedly + "1" => Ok(Some(RPCResponse::Hello(HelloMessage::from_ssz_bytes( + &packet, + )?))), _ => unreachable!("Cannot negotiate an unknown version"), }, "goodbye" => Err(RPCError::InvalidProtocol("GOODBYE doesn't have a response")), "beacon_blocks" => match self.protocol.version.as_str() { - "1" => Ok(Some(RPCResponse::BeaconBlocks(Vec::new()))), + "1" => Ok(Some(RPCResponse::BeaconBlocks(packet.to_vec()))), _ => unreachable!("Cannot negotiate an unknown version"), }, "recent_beacon_blocks" => match self.protocol.version.as_str() { - "1" => Ok(Some(RPCResponse::RecentBeaconBlocks(Vec::new()))), + "1" => Ok(Some(RPCResponse::RecentBeaconBlocks(packet.to_vec()))), _ => unreachable!("Cannot negotiate an unknown version"), }, _ => unreachable!("Cannot negotiate an unknown protocol"), - } + }, + Ok(None) => Ok(None), // waiting for more bytes + Err(e) => Err(e), } - Err(e) => Err(e), } } } diff --git a/beacon_node/eth2-libp2p/src/rpc/methods.rs b/beacon_node/eth2-libp2p/src/rpc/methods.rs index c9610b000..49813abe9 100644 --- a/beacon_node/eth2-libp2p/src/rpc/methods.rs +++ b/beacon_node/eth2-libp2p/src/rpc/methods.rs @@ -168,8 +168,10 @@ impl std::fmt::Display for RPCResponse { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { RPCResponse::Hello(hello) => write!(f, "{}", hello), - RPCResponse::BeaconBlocks(_) => write!(f, ""), - RPCResponse::RecentBeaconBlocks(_) => write!(f, ""), + RPCResponse::BeaconBlocks(data) => write!(f, ", len: {}", data.len()), + RPCResponse::RecentBeaconBlocks(data) => { + write!(f, ", len: {}", data.len()) + } } } } diff --git a/beacon_node/eth2-libp2p/src/service.rs b/beacon_node/eth2-libp2p/src/service.rs index 9f08b1eda..dac011752 100644 --- a/beacon_node/eth2-libp2p/src/service.rs +++ b/beacon_node/eth2-libp2p/src/service.rs @@ -98,6 +98,11 @@ impl Service { // attempt to connect to any specified boot-nodes for bootnode_enr in config.boot_nodes { for multiaddr in bootnode_enr.multiaddr() { + // ignore udp multiaddr if it exists + let components = multiaddr.iter().collect::>(); + if let Protocol::Udp(_) = components[1] { + continue; + } dial_addr(multiaddr); } } diff --git a/beacon_node/network/src/message_handler.rs b/beacon_node/network/src/message_handler.rs index d6e9f8be8..cade65d63 100644 --- a/beacon_node/network/src/message_handler.rs +++ b/beacon_node/network/src/message_handler.rs @@ -17,8 +17,6 @@ use types::{Attestation, AttesterSlashing, BeaconBlock, ProposerSlashing, Volunt /// Handles messages received from the network and client and organises syncing. pub struct MessageHandler { - /// Currently loaded and initialised beacon chain. - _chain: Arc>, /// The syncing framework. sync: SimpleSync, /// A channel to the network service to allow for gossip propagation. @@ -53,13 +51,12 @@ impl MessageHandler { let (handler_send, handler_recv) = mpsc::unbounded_channel(); // Initialise sync and begin processing in thread - let sync = SimpleSync::new(beacon_chain.clone(), network_send.clone(), &log); + let sync = SimpleSync::new(Arc::downgrade(&beacon_chain), network_send.clone(), &log); // generate the Message handler let mut handler = MessageHandler { - _chain: beacon_chain.clone(), - sync, network_send, + sync, log: log.clone(), }; diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index 1eec51843..2b2ed9dca 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -62,13 +62,13 @@ use slog::{debug, info, trace, warn, Logger}; use smallvec::SmallVec; use std::collections::{HashMap, HashSet}; use std::ops::{Add, Sub}; -use std::sync::{Arc, Weak}; +use std::sync::Weak; use types::{BeaconBlock, EthSpec, Hash256, Slot}; /// Blocks are downloaded in batches from peers. This constant specifies how many blocks per batch /// is requested. Currently the value is small for testing. This will be incremented for /// production. -const MAX_BLOCKS_PER_REQUEST: u64 = 100; +const MAX_BLOCKS_PER_REQUEST: u64 = 50; /// The number of slots ahead of us that is allowed before requesting a long-range (batch) Sync /// from a peer. If a peer is within this tolerance (forwards or backwards), it is treated as a @@ -224,10 +224,10 @@ impl ImportManager { /// Generates a new `ImportManager` given a logger and an Arc reference to a beacon chain. The /// import manager keeps a weak reference to the beacon chain, which allows the chain to be /// dropped during the syncing process. The syncing handles this termination gracefully. - pub fn new(beacon_chain: Arc>, log: &slog::Logger) -> Self { + pub fn new(beacon_chain: Weak>, log: &slog::Logger) -> Self { ImportManager { event_queue: SmallVec::new(), - chain: Arc::downgrade(&beacon_chain), + chain: beacon_chain, state: ManagerState::Regular, import_queue: HashMap::new(), parent_queue: SmallVec::new(), @@ -359,7 +359,9 @@ impl ImportManager { warn!(self.log, "Peer returned too many empty block batches"; "peer" => format!("{:?}", peer_id)); block_requests.state = BlockRequestsState::Failed; - } else if block_requests.current_start_slot >= block_requests.target_head_slot { + } else if block_requests.current_start_slot + MAX_BLOCKS_PER_REQUEST + >= block_requests.target_head_slot + { warn!(self.log, "Peer did not return blocks it claimed to possess"; "peer" => format!("{:?}", peer_id)); block_requests.state = BlockRequestsState::Failed; @@ -583,6 +585,11 @@ impl ImportManager { re_run = re_run || self.process_complete_parent_requests(); } + // exit early if the beacon chain is dropped + if let None = self.chain.upgrade() { + return ImportManagerOutcome::Idle; + } + // return any queued events if !self.event_queue.is_empty() { let event = self.event_queue.remove(0); @@ -681,56 +688,48 @@ impl ImportManager { self.import_queue.retain(|peer_id, block_requests| { if block_requests.state == BlockRequestsState::ReadyToProcess { - // check that the chain still exists - if let Some(chain) = chain_ref.upgrade() { - let downloaded_blocks = - std::mem::replace(&mut block_requests.downloaded_blocks, Vec::new()); - let last_element = downloaded_blocks.len() - 1; - let start_slot = downloaded_blocks[0].slot; - let end_slot = downloaded_blocks[last_element].slot; + let downloaded_blocks = + std::mem::replace(&mut block_requests.downloaded_blocks, Vec::new()); + let last_element = downloaded_blocks.len() - 1; + let start_slot = downloaded_blocks[0].slot; + let end_slot = downloaded_blocks[last_element].slot; - match process_blocks(chain, downloaded_blocks, log_ref) { - Ok(()) => { - debug!(log_ref, "Blocks processed successfully"; + match process_blocks(chain_ref.clone(), downloaded_blocks, log_ref) { + Ok(()) => { + debug!(log_ref, "Blocks processed successfully"; + "peer" => format!("{:?}", peer_id), + "start_slot" => start_slot, + "end_slot" => end_slot, + "no_blocks" => last_element + 1, + ); + block_requests.blocks_processed += last_element + 1; + + // check if the batch is complete, by verifying if we have reached the + // target head + if end_slot >= block_requests.target_head_slot { + // Completed, re-hello the peer to ensure we are up to the latest head + event_queue_ref.push(ImportManagerOutcome::Hello(peer_id.clone())); + // remove the request + false + } else { + // have not reached the end, queue another batch + block_requests.update_start_slot(); + re_run = true; + // keep the batch + true + } + } + Err(e) => { + warn!(log_ref, "Block processing failed"; "peer" => format!("{:?}", peer_id), "start_slot" => start_slot, "end_slot" => end_slot, "no_blocks" => last_element + 1, - ); - block_requests.blocks_processed += last_element + 1; - - // check if the batch is complete, by verifying if we have reached the - // target head - if end_slot >= block_requests.target_head_slot { - // Completed, re-hello the peer to ensure we are up to the latest head - event_queue_ref.push(ImportManagerOutcome::Hello(peer_id.clone())); - // remove the request - false - } else { - // have not reached the end, queue another batch - block_requests.update_start_slot(); - re_run = true; - // keep the batch - true - } - } - Err(e) => { - warn!(log_ref, "Block processing failed"; - "peer" => format!("{:?}", peer_id), - "start_slot" => start_slot, - "end_slot" => end_slot, - "no_blocks" => last_element + 1, - "error" => format!("{:?}", e), - ); - event_queue_ref - .push(ImportManagerOutcome::DownvotePeer(peer_id.clone())); - false - } + "error" => format!("{:?}", e), + ); + event_queue_ref.push(ImportManagerOutcome::DownvotePeer(peer_id.clone())); + false } - } else { - // chain no longer exists, empty the queue and return - event_queue_ref.clear(); - return false; } } else { // not ready to process @@ -894,42 +893,43 @@ impl ImportManager { // Helper function to process blocks fn process_blocks( - chain: Arc>, + weak_chain: Weak>, blocks: Vec>, log: &Logger, ) -> Result<(), String> { for block in blocks { - let processing_result = chain.process_block(block.clone()); + if let Some(chain) = weak_chain.upgrade() { + let processing_result = chain.process_block(block.clone()); - if let Ok(outcome) = processing_result { - match outcome { - BlockProcessingOutcome::Processed { block_root } => { - // The block was valid and we processed it successfully. - trace!( - log, "Imported block from network"; - "slot" => block.slot, - "block_root" => format!("{}", block_root), - ); - } - BlockProcessingOutcome::ParentUnknown { parent } => { - // blocks should be sequential and all parents should exist - trace!( - log, "Parent block is unknown"; - "parent_root" => format!("{}", parent), - "baby_block_slot" => block.slot, - ); - return Err(format!( - "Block at slot {} has an unknown parent.", - block.slot - )); - } - BlockProcessingOutcome::BlockIsAlreadyKnown => { - // this block is already known to us, move to the next - debug!( - log, "Imported a block that is already known"; - "parent_root" => format!("{}", parent), - "baby_block_slot" => block.slot, - ); + if let Ok(outcome) = processing_result { + match outcome { + BlockProcessingOutcome::Processed { block_root } => { + // The block was valid and we processed it successfully. + trace!( + log, "Imported block from network"; + "slot" => block.slot, + "block_root" => format!("{}", block_root), + ); + } + BlockProcessingOutcome::ParentUnknown { parent } => { + // blocks should be sequential and all parents should exist + trace!( + log, "Parent block is unknown"; + "parent_root" => format!("{}", parent), + "baby_block_slot" => block.slot, + ); + return Err(format!( + "Block at slot {} has an unknown parent.", + block.slot + )); + } + BlockProcessingOutcome::BlockIsAlreadyKnown => { + // this block is already known to us, move to the next + debug!( + log, "Imported a block that is already known"; + "block_slot" => block.slot, + ); + } BlockProcessingOutcome::FutureSlot { present_slot, block_slot, @@ -937,7 +937,7 @@ fn process_blocks( if present_slot + FUTURE_SLOT_TOLERANCE >= block_slot { // The block is too far in the future, drop it. trace!( - self.log, "Block is ahead of our slot clock"; + log, "Block is ahead of our slot clock"; "msg" => "block for future slot rejected, check your time", "present_slot" => present_slot, "block_slot" => block_slot, @@ -950,7 +950,7 @@ fn process_blocks( } else { // The block is in the future, but not too far. trace!( - self.log, "Block is slightly ahead of our slot clock, ignoring."; + log, "Block is slightly ahead of our slot clock, ignoring."; "present_slot" => present_slot, "block_slot" => block_slot, "FUTURE_SLOT_TOLERANCE" => FUTURE_SLOT_TOLERANCE, @@ -959,44 +959,41 @@ fn process_blocks( } BlockProcessingOutcome::WouldRevertFinalizedSlot { .. } => { trace!( - self.log, "Finalized or earlier block processed"; + log, "Finalized or earlier block processed"; "outcome" => format!("{:?}", outcome), ); // block reached our finalized slot or was earlier, move to the next block } BlockProcessingOutcome::GenesisBlock => { trace!( - self.log, "Genesis block was processed"; + log, "Genesis block was processed"; "outcome" => format!("{:?}", outcome), ); } - BlockProcessingOutcome::FinalizedSlot => { - trace!( - log, "Finalized or earlier block processed"; - "outcome" => format!("{:?}", outcome), - ); - // block reached our finalized slot or was earlier, move to the next block - } - _ => { - warn!( - log, "Invalid block received"; - "msg" => "peer sent invalid block", - "outcome" => format!("{:?}", outcome), - ); - return Err(format!("Invalid block at slot {}", block.slot)); + _ => { + warn!( + log, "Invalid block received"; + "msg" => "peer sent invalid block", + "outcome" => format!("{:?}", outcome), + ); + return Err(format!("Invalid block at slot {}", block.slot)); + } } + } else { + warn!( + log, "BlockProcessingFailure"; + "msg" => "unexpected condition in processing block.", + "outcome" => format!("{:?}", processing_result) + ); + return Err(format!( + "Unexpected block processing error: {:?}", + processing_result + )); } } else { - warn!( - log, "BlockProcessingFailure"; - "msg" => "unexpected condition in processing block.", - "outcome" => format!("{:?}", processing_result) - ); - return Err(format!( - "Unexpected block processing error: {:?}", - processing_result - )); + return Ok(()); // terminate early due to dropped beacon chain } } + Ok(()) } diff --git a/beacon_node/network/src/sync/simple_sync.rs b/beacon_node/network/src/sync/simple_sync.rs index 9d05b312b..a8b271700 100644 --- a/beacon_node/network/src/sync/simple_sync.rs +++ b/beacon_node/network/src/sync/simple_sync.rs @@ -6,7 +6,7 @@ use eth2_libp2p::rpc::{RPCEvent, RPCRequest, RPCResponse, RequestId}; use eth2_libp2p::PeerId; use slog::{debug, info, o, trace, warn}; use ssz::Encode; -use std::sync::Arc; +use std::sync::{Arc, Weak}; use store::Store; use tokio::sync::mpsc; use types::{Attestation, BeaconBlock, Epoch, EthSpec, Hash256, Slot}; @@ -57,7 +57,7 @@ pub enum SyncState { /// Simple Syncing protocol. pub struct SimpleSync { /// A reference to the underlying beacon chain. - chain: Arc>, + chain: Weak>, manager: ImportManager, network: NetworkContext, log: slog::Logger, @@ -66,7 +66,7 @@ pub struct SimpleSync { impl SimpleSync { /// Instantiate a `SimpleSync` instance, with no peers and an empty queue. pub fn new( - beacon_chain: Arc>, + beacon_chain: Weak>, network_send: mpsc::UnboundedSender, log: &slog::Logger, ) -> Self { @@ -91,8 +91,10 @@ impl SimpleSync { /// /// Sends a `Hello` message to the peer. pub fn on_connect(&mut self, peer_id: PeerId) { - self.network - .send_rpc_request(None, peer_id, RPCRequest::Hello(hello_message(&self.chain))); + if let Some(chain) = self.chain.upgrade() { + self.network + .send_rpc_request(None, peer_id, RPCRequest::Hello(hello_message(&chain))); + } } /// Handle a `Hello` request. @@ -104,16 +106,19 @@ impl SimpleSync { request_id: RequestId, hello: HelloMessage, ) { - trace!(self.log, "HelloRequest"; "peer" => format!("{:?}", peer_id)); + // ignore hello responses if we are shutting down + if let Some(chain) = self.chain.upgrade() { + trace!(self.log, "HelloRequest"; "peer" => format!("{:?}", peer_id)); - // Say hello back. - self.network.send_rpc_response( - peer_id.clone(), - request_id, - RPCResponse::Hello(hello_message(&self.chain)), - ); + // Say hello back. + self.network.send_rpc_response( + peer_id.clone(), + request_id, + RPCResponse::Hello(hello_message(&chain)), + ); - self.process_hello(peer_id, hello); + self.process_hello(peer_id, hello); + } } /// Process a `Hello` response from a peer. @@ -128,88 +133,107 @@ impl SimpleSync { /// /// Disconnects the peer if required. fn process_hello(&mut self, peer_id: PeerId, hello: HelloMessage) { - let remote = PeerSyncInfo::from(hello); - let local = PeerSyncInfo::from(&self.chain); - - let start_slot = |epoch: Epoch| epoch.start_slot(T::EthSpec::slots_per_epoch()); - - if local.fork_version != remote.fork_version { - // The node is on a different network/fork, disconnect them. - debug!( - self.log, "HandshakeFailure"; - "peer" => format!("{:?}", peer_id), - "reason" => "network_id" - ); - - self.network - .disconnect(peer_id.clone(), GoodbyeReason::IrrelevantNetwork); - } else if remote.finalized_epoch <= local.finalized_epoch - && remote.finalized_root != Hash256::zero() - && local.finalized_root != Hash256::zero() - && (self.root_at_slot(start_slot(remote.finalized_epoch)) - != Some(remote.finalized_root)) + // If we update the manager we may need to drive the sync. This flag lies out of scope of + // the beacon chain so that the process sync command has no long-lived beacon chain + // references. + let mut process_sync = false; { - // The remotes finalized epoch is less than or greater than ours, but the block root is - // different to the one in our chain. - // - // Therefore, the node is on a different chain and we should not communicate with them. - debug!( - self.log, "HandshakeFailure"; - "peer" => format!("{:?}", peer_id), - "reason" => "different finalized chain" - ); - self.network - .disconnect(peer_id.clone(), GoodbyeReason::IrrelevantNetwork); - } else if remote.finalized_epoch < local.finalized_epoch { - // The node has a lower finalized epoch, their chain is not useful to us. There are two - // cases where a node can have a lower finalized epoch: - // - // ## The node is on the same chain - // - // If a node is on the same chain but has a lower finalized epoch, their head must be - // lower than ours. Therefore, we have nothing to request from them. - // - // ## The node is on a fork - // - // If a node is on a fork that has a lower finalized epoch, switching to that fork would - // cause us to revert a finalized block. This is not permitted, therefore we have no - // interest in their blocks. - debug!( - self.log, - "NaivePeer"; - "peer" => format!("{:?}", peer_id), - "reason" => "lower finalized epoch" - ); - } else if self - .chain - .store - .exists::>(&remote.head_root) - .unwrap_or_else(|_| false) - { - trace!( - self.log, "Out of date or potentially sync'd peer found"; - "peer" => format!("{:?}", peer_id), - "remote_head_slot" => remote.head_slot, - "remote_latest_finalized_epoch" => remote.finalized_epoch, - ); + // scope of beacon chain reference + let chain = match self.chain.upgrade() { + Some(chain) => chain, + None => { + info!(self.log, "Sync shutting down"; + "reason" => "Beacon chain dropped"); + return; + } + }; - // If the node's best-block is already known to us and they are close to our current - // head, treat them as a fully sync'd peer. - self.manager.add_peer(peer_id, remote); - self.process_sync(); - } else { - // The remote node has an equal or great finalized epoch and we don't know it's head. - // - // Therefore, there are some blocks between the local finalized epoch and the remote - // head that are worth downloading. - debug!( - self.log, "UsefulPeer"; - "peer" => format!("{:?}", peer_id), - "local_finalized_epoch" => local.finalized_epoch, - "remote_latest_finalized_epoch" => remote.finalized_epoch, - ); + let remote = PeerSyncInfo::from(hello); + let local = PeerSyncInfo::from(&chain); - self.manager.add_peer(peer_id, remote); + let start_slot = |epoch: Epoch| epoch.start_slot(T::EthSpec::slots_per_epoch()); + + if local.fork_version != remote.fork_version { + // The node is on a different network/fork, disconnect them. + debug!( + self.log, "HandshakeFailure"; + "peer" => format!("{:?}", peer_id), + "reason" => "network_id" + ); + + self.network + .disconnect(peer_id.clone(), GoodbyeReason::IrrelevantNetwork); + } else if remote.finalized_epoch <= local.finalized_epoch + && remote.finalized_root != Hash256::zero() + && local.finalized_root != Hash256::zero() + && (chain.root_at_slot(start_slot(remote.finalized_epoch)) + != Some(remote.finalized_root)) + { + // The remotes finalized epoch is less than or greater than ours, but the block root is + // different to the one in our chain. + // + // Therefore, the node is on a different chain and we should not communicate with them. + debug!( + self.log, "HandshakeFailure"; + "peer" => format!("{:?}", peer_id), + "reason" => "different finalized chain" + ); + self.network + .disconnect(peer_id.clone(), GoodbyeReason::IrrelevantNetwork); + } else if remote.finalized_epoch < local.finalized_epoch { + // The node has a lower finalized epoch, their chain is not useful to us. There are two + // cases where a node can have a lower finalized epoch: + // + // ## The node is on the same chain + // + // If a node is on the same chain but has a lower finalized epoch, their head must be + // lower than ours. Therefore, we have nothing to request from them. + // + // ## The node is on a fork + // + // If a node is on a fork that has a lower finalized epoch, switching to that fork would + // cause us to revert a finalized block. This is not permitted, therefore we have no + // interest in their blocks. + debug!( + self.log, + "NaivePeer"; + "peer" => format!("{:?}", peer_id), + "reason" => "lower finalized epoch" + ); + } else if chain + .store + .exists::>(&remote.head_root) + .unwrap_or_else(|_| false) + { + trace!( + self.log, "Peer with known chain found"; + "peer" => format!("{:?}", peer_id), + "remote_head_slot" => remote.head_slot, + "remote_latest_finalized_epoch" => remote.finalized_epoch, + ); + + // If the node's best-block is already known to us and they are close to our current + // head, treat them as a fully sync'd peer. + self.manager.add_peer(peer_id, remote); + process_sync = true; + } else { + // The remote node has an equal or great finalized epoch and we don't know it's head. + // + // Therefore, there are some blocks between the local finalized epoch and the remote + // head that are worth downloading. + debug!( + self.log, "UsefulPeer"; + "peer" => format!("{:?}", peer_id), + "local_finalized_epoch" => local.finalized_epoch, + "remote_latest_finalized_epoch" => remote.finalized_epoch, + ); + + self.manager.add_peer(peer_id, remote); + process_sync = true + } + } // end beacon chain reference scope + + if process_sync { self.process_sync(); } } @@ -226,11 +250,13 @@ impl SimpleSync { "method" => "HELLO", "peer" => format!("{:?}", peer_id) ); - self.network.send_rpc_request( - None, - peer_id, - RPCRequest::Hello(hello_message(&self.chain)), - ); + if let Some(chain) = self.chain.upgrade() { + self.network.send_rpc_request( + None, + peer_id, + RPCRequest::Hello(hello_message(&chain)), + ); + } } ImportManagerOutcome::RequestBlocks { peer_id, @@ -283,14 +309,6 @@ impl SimpleSync { } } - //TODO: Move to beacon chain - fn root_at_slot(&self, target_slot: Slot) -> Option { - self.chain - .rev_iter_block_roots() - .find(|(_root, slot)| *slot == target_slot) - .map(|(root, _slot)| root) - } - /// Handle a `RecentBeaconBlocks` request from the peer. pub fn on_recent_beacon_blocks_request( &mut self, @@ -298,11 +316,20 @@ impl SimpleSync { request_id: RequestId, request: RecentBeaconBlocksRequest, ) { + let chain = match self.chain.upgrade() { + Some(chain) => chain, + None => { + info!(self.log, "Sync shutting down"; + "reason" => "Beacon chain dropped"); + return; + } + }; + let blocks: Vec> = request .block_roots .iter() .filter_map(|root| { - if let Ok(Some(block)) = self.chain.store.get::>(root) { + if let Ok(Some(block)) = chain.store.get::>(root) { Some(block) } else { debug!( @@ -319,7 +346,7 @@ impl SimpleSync { debug!( self.log, - "BlockBodiesRequest"; + "RecentBeaconBlocksRequest"; "peer" => format!("{:?}", peer_id), "requested" => request.block_roots.len(), "returned" => blocks.len(), @@ -339,6 +366,15 @@ impl SimpleSync { request_id: RequestId, req: BeaconBlocksRequest, ) { + let chain = match self.chain.upgrade() { + Some(chain) => chain, + None => { + info!(self.log, "Sync shutting down"; + "reason" => "Beacon chain dropped"); + return; + } + }; + debug!( self.log, "BeaconBlocksRequest"; @@ -352,15 +388,14 @@ impl SimpleSync { // In the current implementation we read from the db then filter out out-of-range blocks. // Improving the db schema to prevent this would be ideal. - let mut blocks: Vec> = self - .chain + let mut blocks: Vec> = chain .rev_iter_block_roots() .filter(|(_root, slot)| { req.start_slot <= slot.as_u64() && req.start_slot + req.count > slot.as_u64() }) .take_while(|(_root, slot)| req.start_slot <= slot.as_u64()) .filter_map(|(root, _slot)| { - if let Ok(Some(block)) = self.chain.store.get::>(&root) { + if let Ok(Some(block)) = chain.store.get::>(&root) { Some(block) } else { warn!( @@ -378,18 +413,16 @@ impl SimpleSync { blocks.reverse(); blocks.dedup_by_key(|brs| brs.slot); - if blocks.len() as u64 != req.count { - debug!( - self.log, - "BeaconBlocksRequest response"; - "peer" => format!("{:?}", peer_id), - "msg" => "Failed to return all requested hashes", - "start_slot" => req.start_slot, - "current_slot" => format!("{:?}", self.chain.slot()), - "requested" => req.count, - "returned" => blocks.len(), - ); - } + debug!( + self.log, + "BeaconBlocksRequest response"; + "peer" => format!("{:?}", peer_id), + "msg" => "Failed to return all requested hashes", + "start_slot" => req.start_slot, + "current_slot" => chain.slot().unwrap_or_else(|_| Slot::from(0_u64)).as_u64(), + "requested" => req.count, + "returned" => blocks.len(), + ); self.network.send_rpc_response( peer_id, @@ -444,7 +477,16 @@ impl SimpleSync { /// /// Returns a `bool` which, if `true`, indicates we should forward the block to our peers. pub fn on_block_gossip(&mut self, peer_id: PeerId, block: BeaconBlock) -> bool { - if let Ok(outcome) = self.chain.process_block(block.clone()) { + let chain = match self.chain.upgrade() { + Some(chain) => chain, + None => { + info!(self.log, "Sync shutting down"; + "reason" => "Beacon chain dropped"); + return false; + } + }; + + if let Ok(outcome) = chain.process_block(block.clone()) { match outcome { BlockProcessingOutcome::Processed { .. } => { trace!(self.log, "Gossipsub block processed"; @@ -477,7 +519,16 @@ impl SimpleSync { /// /// Not currently implemented. pub fn on_attestation_gossip(&mut self, _peer_id: PeerId, msg: Attestation) { - match self.chain.process_attestation(msg) { + let chain = match self.chain.upgrade() { + Some(chain) => chain, + None => { + info!(self.log, "Sync shutting down"; + "reason" => "Beacon chain dropped"); + return; + } + }; + + match chain.process_attestation(msg) { Ok(outcome) => info!( self.log, "Processed attestation"; @@ -489,11 +540,6 @@ impl SimpleSync { } } } - - /// Generates our current state in the form of a HELLO RPC message. - pub fn generate_hello(&self) -> HelloMessage { - hello_message(&self.chain) - } } /// Build a `HelloMessage` representing the state of the given `beacon_chain`.