diff --git a/beacon_node/network/src/message_handler.rs b/beacon_node/network/src/message_handler.rs
index cade65d63..be8fa21f8 100644
--- a/beacon_node/network/src/message_handler.rs
+++ b/beacon_node/network/src/message_handler.rs
@@ -1,6 +1,6 @@
 use crate::error;
 use crate::service::NetworkMessage;
-use crate::sync::SimpleSync;
+use crate::sync::MessageProcessor;
 use beacon_chain::{BeaconChain, BeaconChainTypes};
 use eth2_libp2p::{
     behaviour::PubsubMessage,
@@ -15,12 +15,16 @@ use std::sync::Arc;
 use tokio::sync::mpsc;
 use types::{Attestation, AttesterSlashing, BeaconBlock, ProposerSlashing, VoluntaryExit};

-/// Handles messages received from the network and client and organises syncing.
+/// Handles messages received from the network and client and organises syncing. The
+/// function of this struct is to validate and decode messages from the network before
+/// passing them to the internal message processor. The message processor spawns a syncing thread
+/// which manages which blocks need to be requested and processed.
 pub struct MessageHandler<T: BeaconChainTypes> {
-    /// The syncing framework.
-    sync: SimpleSync<T>,
     /// A channel to the network service to allow for gossip propagation.
     network_send: mpsc::UnboundedSender<NetworkMessage>,
+    /// Processes validated and decoded messages from the network. Has direct access to the
+    /// sync manager.
+    message_processor: MessageProcessor<T>,
     /// The `MessageHandler` logger.
     log: slog::Logger,
 }
@@ -50,13 +54,15 @@ impl<T: BeaconChainTypes> MessageHandler<T> {
         trace!(log, "Service starting");

         let (handler_send, handler_recv) = mpsc::unbounded_channel();

-        // Initialise sync and begin processing in thread
-        let sync = SimpleSync::new(Arc::downgrade(&beacon_chain), network_send.clone(), &log);
+
+        // Initialise the message processor, which itself spawns the syncing thread.
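+        // The message processor holds the strong `Arc` reference to the beacon chain; the
+        // syncing thread it spawns keeps only a `Weak` reference (via `Arc::downgrade`), so
+        // dropping the chain gracefully terminates the sync thread.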
+        let message_processor =
+            MessageProcessor::new(executor, beacon_chain, network_send.clone(), &log);

         // generate the Message handler
         let mut handler = MessageHandler {
             network_send,
-            sync,
+            message_processor,
             log: log.clone(),
         };

@@ -77,11 +87,11 @@ impl<T: BeaconChainTypes> MessageHandler<T> {
         match message {
             // we have initiated a connection to a peer
             HandlerMessage::PeerDialed(peer_id) => {
-                self.sync.on_connect(peer_id);
+                self.message_processor.on_connect(peer_id);
             }
             // A peer has disconnected
             HandlerMessage::PeerDisconnected(peer_id) => {
-                self.sync.on_disconnect(peer_id);
+                self.message_processor.on_disconnect(peer_id);
             }
             // An RPC message request/response has been received
             HandlerMessage::RPC(peer_id, rpc_event) => {
@@ -109,7 +119,7 @@ impl<T: BeaconChainTypes> MessageHandler<T> {
     fn handle_rpc_request(&mut self, peer_id: PeerId, request_id: RequestId, request: RPCRequest) {
         match request {
             RPCRequest::Hello(hello_message) => {
-                self.sync
+                self.message_processor
                     .on_hello_request(peer_id, request_id, hello_message)
             }
             RPCRequest::Goodbye(goodbye_reason) => {
                 debug!(
                     self.log,
                     "peer" => format!("{:?}", peer_id),
                     "reason" => format!("{:?}", goodbye_reason),
                 );
-                self.sync.on_disconnect(peer_id);
+                self.message_processor.on_disconnect(peer_id);
             }
             RPCRequest::BeaconBlocks(request) => self
-                .sync
+                .message_processor
                 .on_beacon_blocks_request(peer_id, request_id, request),
             RPCRequest::RecentBeaconBlocks(request) => self
-                .sync
+                .message_processor
                 .on_recent_beacon_blocks_request(peer_id, request_id, request),
         }
     }
@@ -151,12 +161,13 @@ impl<T: BeaconChainTypes> MessageHandler<T> {
             RPCErrorResponse::Success(response) => {
                 match response {
                     RPCResponse::Hello(hello_message) => {
-                        self.sync.on_hello_response(peer_id, hello_message);
+                        self.message_processor
+                            .on_hello_response(peer_id, hello_message);
                     }
                     RPCResponse::BeaconBlocks(response) => {
                         match self.decode_beacon_blocks(&response) {
                             Ok(beacon_blocks) => {
-                                self.sync.on_beacon_blocks_response(
+                                self.message_processor.on_beacon_blocks_response(
                                     peer_id,
                                     request_id,
                                     beacon_blocks,
@@ -171,7 +182,7 @@ impl<T: BeaconChainTypes> MessageHandler<T> {
                     RPCResponse::RecentBeaconBlocks(response) => {
                         match self.decode_beacon_blocks(&response) {
                             Ok(beacon_blocks) => {
-                                self.sync.on_recent_beacon_blocks_response(
+                                self.message_processor.on_recent_beacon_blocks_response(
                                     peer_id,
                                     request_id,
                                     beacon_blocks,
@@ -199,7 +210,9 @@ impl<T: BeaconChainTypes> MessageHandler<T> {
         match gossip_message {
             PubsubMessage::Block(message) => match self.decode_gossip_block(message) {
                 Ok(block) => {
-                    let should_forward_on = self.sync.on_block_gossip(peer_id.clone(), block);
+                    let should_forward_on = self
+                        .message_processor
+                        .on_block_gossip(peer_id.clone(), block);
                     // TODO: Apply more sophisticated validation and decoding logic
                     if should_forward_on {
                         self.propagate_message(id, peer_id.clone());
@@ -213,7 +226,8 @@ impl<T: BeaconChainTypes> MessageHandler<T> {
                 Ok(attestation) => {
                     // TODO: Apply more sophisticated validation and decoding logic
                     self.propagate_message(id, peer_id.clone());
-                    self.sync.on_attestation_gossip(peer_id, attestation);
+                    self.message_processor
+                        .on_attestation_gossip(peer_id, attestation);
                 }
                 Err(e) => {
                     debug!(self.log, "Invalid gossiped attestation"; "peer_id" => format!("{}", peer_id), "Error" => format!("{:?}", e));
diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs
index fa1315c39..12bef95fa 100644
--- a/beacon_node/network/src/sync/manager.rs
+++ b/beacon_node/network/src/sync/manager.rs
@@ -1,4 +1,4 @@
-//! The `ImportManager` facilities the block syncing logic of lighthouse. The current networking
+//! The `SyncManager` facilitates the block syncing logic of lighthouse. The current networking
 //! specification provides two methods from which to obtain blocks from peers. The `BeaconBlocks`
 //! request and the `RecentBeaconBlocks` request. The former is used to obtain a large number of
 //! blocks and the latter allows for searching for blocks given a block-hash.
@@ -7,7 +7,7 @@
-//! - Long range (batch) sync, when a client is out of date and needs to the latest head.
+//! - Long range (batch) sync, when a client is out of date and needs to sync to the latest head.
 //! - Parent lookup - when a peer provides us a block whose parent is unknown to us.
 //!
-//! Both of these syncing strategies are built into the `ImportManager`.
+//! Both of these syncing strategies are built into the `SyncManager`.
 //!
 //!
 //! Currently the long-range (batch) syncing method functions by opportunistically downloading
@@ -53,16 +53,18 @@
 //! fully sync'd peers. If `PARENT_FAIL_TOLERANCE` attempts at requesting the block fails, we
 //! drop the propagated block and downvote the peer that sent it to us.

-use super::simple_sync::{PeerSyncInfo, FUTURE_SLOT_TOLERANCE};
+use super::simple_sync::{hello_message, NetworkContext, PeerSyncInfo, FUTURE_SLOT_TOLERANCE};
 use beacon_chain::{BeaconChain, BeaconChainTypes, BlockProcessingOutcome};
 use eth2_libp2p::rpc::methods::*;
-use eth2_libp2p::rpc::RequestId;
+use eth2_libp2p::rpc::{RPCRequest, RequestId};
 use eth2_libp2p::PeerId;
+use futures::prelude::*;
 use slog::{debug, info, trace, warn, Logger};
 use smallvec::SmallVec;
 use std::collections::{HashMap, HashSet};
 use std::ops::{Add, Sub};
 use std::sync::Weak;
+use tokio::sync::{mpsc, oneshot};
 use types::{BeaconBlock, EthSpec, Hash256, Slot};

 /// Blocks are downloaded in batches from peers. This constant specifies how many blocks per batch
@@ -84,6 +86,31 @@ const PARENT_DEPTH_TOLERANCE: usize = SLOT_IMPORT_TOLERANCE * 2;
 /// requests to peers who never return blocks.
 const EMPTY_BATCH_TOLERANCE: usize = 100;

+#[derive(Debug)]
+/// A message that can be sent to the sync manager thread.
+pub enum SyncMessage<T: EthSpec> {
+    /// A useful peer has been discovered.
+    AddPeer(PeerId, PeerSyncInfo),
+    /// A `BeaconBlocks` response has been received.
+    BeaconBlocksResponse {
+        peer_id: PeerId,
+        request_id: RequestId,
+        beacon_blocks: Vec<BeaconBlock<T>>,
+    },
+    /// A `RecentBeaconBlocks` response has been received.
+    RecentBeaconBlocksResponse {
+        peer_id: PeerId,
+        request_id: RequestId,
+        beacon_blocks: Vec<BeaconBlock<T>>,
+    },
+    /// A block with an unknown parent has been received.
+    UnknownBlock(PeerId, BeaconBlock<T>),
+    /// A peer has disconnected.
+    Disconnect(PeerId),
+    /// An RPC Error has occurred on a request.
+    _RPCError(RequestId),
+}
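+
+// Illustrative only: the `MessageProcessor` (see `simple_sync.rs`) forwards decoded network
+// events to the manager thread over the `mpsc` channel returned by `spawn`, e.g.:
+//
+//     sync_send.try_send(SyncMessage::Disconnect(peer_id)).ok();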

 #[derive(PartialEq)]
 /// The current state of a block or batches lookup.
 enum BlockRequestsState {
@@ -176,39 +203,19 @@ enum ManagerState {
     Stalled,
 }

-/// The output states that can occur from driving (polling) the manager state machine.
-pub(crate) enum ImportManagerOutcome {
-    /// There is no further work to complete. The manager is waiting for further input.
-    Idle,
-    /// A `BeaconBlocks` request is required.
-    RequestBlocks {
-        peer_id: PeerId,
-        request_id: RequestId,
-        request: BeaconBlocksRequest,
-    },
-    /// A `RecentBeaconBlocks` request is required.
-    RecentRequest {
-        peer_id: PeerId,
-        request_id: RequestId,
-        request: RecentBeaconBlocksRequest,
-    },
-    /// Updates information with peer via requesting another HELLO handshake.
-    Hello(PeerId),
-    /// A peer has caused a punishable error and should be downvoted.
-    DownvotePeer(PeerId),
-}
-
 /// The primary object for handling and driving all the current syncing logic. It maintains the
 /// current state of the syncing process, the number of useful peers, downloaded blocks and
 /// controls the logic behind both the long-range (batch) sync and the on-going potential parent
 /// look-up of blocks.
-pub struct ImportManager<T: BeaconChainTypes> {
-    /// List of events to be processed externally.
-    event_queue: SmallVec<[ImportManagerOutcome; 20]>,
+pub struct SyncManager<T: BeaconChainTypes> {
     /// A weak reference to the underlying beacon chain.
     chain: Weak<BeaconChain<T>>,
     /// The current state of the import manager.
     state: ManagerState,
+    /// A receiving channel sent by the message processor thread.
+    input_channel: mpsc::UnboundedReceiver<SyncMessage<T::EthSpec>>,
+    /// A network context to contact the network service.
+    network: NetworkContext,
     /// A collection of `BlockRequest` per peer that is currently being downloaded. Used in the
     /// long-range (batch) sync process.
     import_queue: HashMap<PeerId, BlockRequests<T::EthSpec>>,
@@ -224,22 +231,51 @@ pub struct SyncManager<T: BeaconChainTypes> {
     log: Logger,
 }

-impl<T: BeaconChainTypes> ImportManager<T> {
-    /// Generates a new `ImportManager` given a logger and an Arc reference to a beacon chain. The
-    /// import manager keeps a weak reference to the beacon chain, which allows the chain to be
-    /// dropped during the syncing process. The syncing handles this termination gracefully.
-    pub fn new(beacon_chain: Weak<BeaconChain<T>>, log: &slog::Logger) -> Self {
-        ImportManager {
-            event_queue: SmallVec::new(),
-            chain: beacon_chain,
-            state: ManagerState::Regular,
-            import_queue: HashMap::new(),
-            parent_queue: SmallVec::new(),
-            full_peers: HashSet::new(),
-            current_req_id: 0,
-            log: log.clone(),
-        }
-    }
+/// Spawns a new `SyncManager` thread which has a weak reference to the underlying beacon
+/// chain. This allows the chain to be dropped during the syncing process, which will
+/// gracefully end the `SyncManager`.
+pub fn spawn<T: BeaconChainTypes>(
+    executor: &tokio::runtime::TaskExecutor,
+    beacon_chain: Weak<BeaconChain<T>>,
+    network: NetworkContext,
+    log: slog::Logger,
+) -> (
+    mpsc::UnboundedSender<SyncMessage<T::EthSpec>>,
+    oneshot::Sender<()>,
+) {
+    // generate the exit channel
+    let (sync_exit, exit_rx) = tokio::sync::oneshot::channel();
+    // generate the message channel
+    let (sync_send, sync_recv) = mpsc::unbounded_channel::<SyncMessage<T::EthSpec>>();
+
+    // create an instance of the SyncManager
+    let sync_manager = SyncManager {
+        chain: beacon_chain,
+        state: ManagerState::Regular,
+        input_channel: sync_recv,
+        network,
+        import_queue: HashMap::new(),
+        parent_queue: SmallVec::new(),
+        full_peers: HashSet::new(),
+        current_req_id: 0,
+        log: log.clone(),
+    };
+
+    // spawn the sync manager thread
+    debug!(log, "Sync Manager started");
+    executor.spawn(
+        sync_manager
+            .select(exit_rx.then(|_| Ok(())))
+            .then(move |_| {
+                info!(log.clone(), "Sync Manager shutdown");
+                Ok(())
+            }),
+    );
+    (sync_send, sync_exit)
+}
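+
+// Example wiring (a sketch; assumes a tokio `TaskExecutor` and an established `NetworkContext`,
+// mirroring what `MessageProcessor::new` does in `simple_sync.rs`):
+//
+//     let (sync_send, sync_exit) = spawn(&executor, Arc::downgrade(&beacon_chain), network, log);
+//     // `sync_send` delivers `SyncMessage`s to the manager; dropping `sync_exit` stops the thread.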
+
+impl<T: BeaconChainTypes> SyncManager<T> {
+    /* Input Handling Functions */

     /// A peer has connected which has blocks that are unknown to us.
     ///
@@ -281,7 +317,7 @@
             return;
         }

-        // Check if the peer is significantly is behind us. If within `SLOT_IMPORT_TOLERANCE`
+        // Check if the peer is significantly behind us. If within `SLOT_IMPORT_TOLERANCE`
         // treat them as a fully synced peer. If not, ignore them in the sync process
         if local.head_slot.sub(remote.head_slot).as_usize() < SLOT_IMPORT_TOLERANCE {
             self.add_full_peer(peer_id.clone());
@@ -328,8 +364,7 @@
         let chain = match self.chain.upgrade() {
             Some(chain) => chain,
             None => {
-                debug!(self.log, "Chain dropped. Sync terminating");
-                self.event_queue.clear();
+                trace!(self.log, "Chain dropped. Sync terminating");
                 return;
             }
         };
@@ -390,8 +425,7 @@
                     "request_id" => request_id,
                     "response_initial_slot" => blocks[0].slot,
                     "requested_initial_slot" => block_requests.current_start_slot);
-                self.event_queue
-                    .push(ImportManagerOutcome::DownvotePeer(peer_id));
+                downvote_peer(&mut self.network, &self.log, peer_id);
                 // consider this sync failed
                 block_requests.state = BlockRequestsState::Failed;
                 return;
@@ -515,26 +549,7 @@
         parent_request.state = BlockRequestsState::ReadyToProcess;
     }

-    pub fn _inject_error(_peer_id: PeerId, _id: RequestId) {
-        //TODO: Remove block state from pending
-    }
-
-    pub fn peer_disconnect(&mut self, peer_id: &PeerId) {
-        self.import_queue.remove(peer_id);
-        self.full_peers.remove(peer_id);
-        self.update_state();
-    }
-
-    pub fn add_full_peer(&mut self, peer_id: PeerId) {
-        debug!(
-            self.log, "Fully synced peer added";
-            "peer" => format!("{:?}", peer_id),
-        );
-        self.full_peers.insert(peer_id);
-        self.update_state();
-    }
-
-    pub fn add_unknown_block(&mut self, block: BeaconBlock<T::EthSpec>, peer_id: PeerId) {
+    fn add_unknown_block(&mut self, peer_id: PeerId, block: BeaconBlock<T::EthSpec>) {
         // if we are not in regular sync mode, ignore this block
         if self.state != ManagerState::Regular {
             return;
@@ -563,55 +578,29 @@
         self.parent_queue.push(req);
     }

-    pub(crate) fn poll(&mut self) -> ImportManagerOutcome {
-        loop {
-            //TODO: Optimize the lookups. Potentially keep state of whether each of these functions
-            //need to be called.
-
-            // only break once everything has been processed
-            let mut re_run = false;
-
-            // only process batch requests if there are any
-            if !self.import_queue.is_empty() {
-                // process potential block requests
-                re_run = re_run || self.process_potential_block_requests();
-
-                // process any complete long-range batches
-                re_run = re_run || self.process_complete_batches();
-            }
-
-            // only process parent objects if we are in regular sync
-            if !self.parent_queue.is_empty() {
-                // process any parent block lookup-requests
-                re_run = re_run || self.process_parent_requests();
-
-                // process any complete parent lookups
-                re_run = re_run || self.process_complete_parent_requests();
-            }
-
-            // exit early if the beacon chain is dropped
-            if let None = self.chain.upgrade() {
-                return ImportManagerOutcome::Idle;
-            }
-
-            // return any queued events
-            if !self.event_queue.is_empty() {
-                let event = self.event_queue.remove(0);
-                self.event_queue.shrink_to_fit();
-                return event;
-            }
-
-            // update the state of the manager
-            self.update_state();
-
-            if !re_run {
-                break;
-            }
-        }
-
-        return ImportManagerOutcome::Idle;
+    fn inject_error(&mut self, _id: RequestId) {
+        //TODO: Remove block state from pending
     }

+    fn peer_disconnect(&mut self, peer_id: &PeerId) {
+        self.import_queue.remove(peer_id);
+        self.full_peers.remove(peer_id);
+        self.update_state();
+    }
+
+    fn add_full_peer(&mut self, peer_id: PeerId) {
+        debug!(
+            self.log, "Fully synced peer added";
+            "peer" => format!("{:?}", peer_id),
+        );
+        self.full_peers.insert(peer_id);
+        self.update_state();
+    }
+
+    /* Processing State Functions */
+    // These functions are called in the main poll function to transition the state of the
+    // sync manager.
+
     fn update_state(&mut self) {
         let previous_state = self.state.clone();
         self.state = {
@@ -631,13 +620,12 @@
         }
     }

-    fn process_potential_block_requests(&mut self) -> bool {
+    fn process_potential_block_requests(&mut self) {
         // check if an outbound request is required
         // Managing a fixed number of outbound requests is maintained at the RPC protocol libp2p
         // layer and not needed here. Therefore we create many outbound requests and let the RPC
         // handle the number of simultaneous requests. Request all queued objects.
-        let mut re_run = false;
         // remove any failed batches
         let debug_log = &self.log;
         let full_peer_ref = &mut self.full_peers;
         self.import_queue.retain(|peer_id, block_requests| {
@@ -655,40 +643,40 @@
         });

         // process queued block requests
-        for (peer_id, block_requests) in self
-            .import_queue
-            .iter_mut()
-            .find(|(_peer_id, req)| req.state == BlockRequestsState::Queued)
-        {
-            let request_id = self.current_req_id;
-            block_requests.state = BlockRequestsState::Pending(request_id);
-            self.current_req_id += 1;
+        for (peer_id, block_requests) in self.import_queue.iter_mut() {
+            if block_requests.state == BlockRequestsState::Queued {
+                let request_id = self.current_req_id;
+                block_requests.state = BlockRequestsState::Pending(request_id);
+                self.current_req_id += 1;

-            let request = BeaconBlocksRequest {
-                head_block_root: block_requests.target_head_root,
-                start_slot: block_requests.current_start_slot.as_u64(),
-                count: MAX_BLOCKS_PER_REQUEST,
-                step: 0,
-            };
-            self.event_queue.push(ImportManagerOutcome::RequestBlocks {
-                peer_id: peer_id.clone(),
-                request,
-                request_id,
-            });
-            re_run = true;
+                let request = BeaconBlocksRequest {
+                    head_block_root: block_requests.target_head_root,
+                    start_slot: block_requests.current_start_slot.as_u64(),
+                    count: MAX_BLOCKS_PER_REQUEST,
+                    step: 0,
+                };
+                request_blocks(
+                    &mut self.network,
+                    &self.log,
+                    peer_id.clone(),
+                    request_id,
+                    request,
+                );
+            }
         }
-
-        re_run
     }

     fn process_complete_batches(&mut self) -> bool {
-        // flag to indicate if the manager can be switched to idle or not
-        let mut re_run = false;
+        // This function can queue extra blocks and the main poll loop will need to be re-executed
+        // to process these. This flag indicates that the main poll loop has to continue.
+        let mut re_run_poll = false;
         // create reference variables to be moved into subsequent closure
         let chain_ref = self.chain.clone();
         let log_ref = &self.log;
-        let event_queue_ref = &mut self.event_queue;
+        let network_ref = &mut self.network;

         self.import_queue.retain(|peer_id, block_requests| {
             if block_requests.state == BlockRequestsState::ReadyToProcess {
@@ -712,13 +700,13 @@
                         // target head
                         if end_slot >= block_requests.target_head_slot {
                             // Completed, re-hello the peer to ensure we are up to the latest head
-                            event_queue_ref.push(ImportManagerOutcome::Hello(peer_id.clone()));
+                            hello_peer(network_ref, log_ref, chain_ref.clone(), peer_id.clone());
                             // remove the request
                             false
                         } else {
                             // have not reached the end, queue another batch
                             block_requests.update_start_slot();
-                            re_run = true;
+                            re_run_poll = true;
                             // keep the batch
                             true
                         }
@@ -731,7 +719,7 @@
                             "no_blocks" => last_element + 1,
                             "error" => format!("{:?}", e),
                         );
-                        event_queue_ref.push(ImportManagerOutcome::DownvotePeer(peer_id.clone()));
+                        downvote_peer(network_ref, log_ref, peer_id.clone());
                         false
                     }
                 }
@@ -741,17 +729,15 @@
             }
         });

-        re_run
+        re_run_poll
     }

-    fn process_parent_requests(&mut self) -> bool {
+    fn process_parent_requests(&mut self) {
         // check to make sure there are peers to search for the parent from
         if self.full_peers.is_empty() {
-            return false;
+            return;
         }

-        let mut re_run = false;
-
         // remove any failed requests
         let debug_log = &self.log;
         self.parent_queue.retain(|parent_request| {
@@ -790,20 +776,20 @@

             // select a random fully synced peer to attempt to download the parent block
             let peer_id = self.full_peers.iter().next().expect("List is not empty");

-            self.event_queue.push(ImportManagerOutcome::RecentRequest {
-                peer_id: peer_id.clone(),
+            recent_blocks_request(
+                &mut self.network,
+                &self.log,
+                peer_id.clone(),
                 request_id,
                 request,
-            });
-            re_run = true;
+            );
             }
         }
-        re_run
     }

     fn process_complete_parent_requests(&mut self) -> bool {
         // returned value indicating whether the manager can be switched to idle or not
-        let mut re_run = false;
+        let mut re_run_poll = false;

         // Find any parent_requests ready to be processed
         for completed_request in self
@@ -827,9 +813,8 @@
                     "received_block" => format!("{}", block_hash),
                     "expected_parent" => format!("{}", expected_hash),
                 );
-                re_run = true;
-                self.event_queue
-                    .push(ImportManagerOutcome::DownvotePeer(peer));
+                re_run_poll = true;
+                downvote_peer(&mut self.network, &self.log, peer);
             }

             // try and process the list of blocks up to the requested block
@@ -846,7 +831,7 @@
                         // need to keep looking for parents
                         completed_request.downloaded_blocks.push(block);
                         completed_request.state = BlockRequestsState::Queued;
-                        re_run = true;
+                        re_run_poll = true;
                         break;
                     }
                     Ok(BlockProcessingOutcome::Processed { block_root: _ }) => {}
@@ -859,11 +844,13 @@
                             "peer" => format!("{:?}", completed_request.last_submitted_peer),
                         );
                         completed_request.state = BlockRequestsState::Queued;
-                        re_run = true;
-                        self.event_queue.push(ImportManagerOutcome::DownvotePeer(
+                        re_run_poll = true;
+                        downvote_peer(
+                            &mut self.network,
+                            &self.log,
                             completed_request.last_submitted_peer.clone(),
-                        ));
-                        return re_run;
+                        );
+                        return re_run_poll;
                     }
                     Err(e) => {
                         completed_request.failed_attempts += 1;
@@ -872,16 +859,17 @@
                             "error" => format!("{:?}", e)
                         );
                         completed_request.state = BlockRequestsState::Queued;
-                        re_run = true;
-                        self.event_queue.push(ImportManagerOutcome::DownvotePeer(
+                        re_run_poll = true;
+                        downvote_peer(
+                            &mut self.network,
+                            &self.log,
                             completed_request.last_submitted_peer.clone(),
-                        ));
-                        return re_run;
+                        );
+                        return re_run_poll;
                     }
                 }
             } else {
-                // chain doesn't exist - clear the event queue and return
-                self.event_queue.clear();
+                // chain doesn't exist - return
                 return false;
             }
         }
@@ -895,11 +883,83 @@
                 true
             }
         });
-        re_run
+        re_run_poll
     }
 }

-// Helper function to process blocks
+/* Network Context Helper Functions */
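+
+// These helpers are free functions rather than `SyncManager` methods so they can be called from
+// inside `import_queue.retain(...)` closures (e.g. in `process_complete_batches`), where `self`
+// is already mutably borrowed and only `network_ref`/`log_ref` can be passed in.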
+
+fn hello_peer<T: BeaconChainTypes>(
+    network: &mut NetworkContext,
+    log: &slog::Logger,
+    chain: Weak<BeaconChain<T>>,
+    peer_id: PeerId,
+) {
+    trace!(
+        log,
+        "RPC Request";
+        "method" => "HELLO",
+        "peer" => format!("{:?}", peer_id)
+    );
+    if let Some(chain) = chain.upgrade() {
+        network.send_rpc_request(None, peer_id, RPCRequest::Hello(hello_message(&chain)));
+    }
+}
+
+fn request_blocks(
+    network: &mut NetworkContext,
+    log: &slog::Logger,
+    peer_id: PeerId,
+    request_id: RequestId,
+    request: BeaconBlocksRequest,
+) {
+    trace!(
+        log,
+        "RPC Request";
+        "method" => "BeaconBlocks",
+        "id" => request_id,
+        "count" => request.count,
+        "peer" => format!("{:?}", peer_id)
+    );
+    network.send_rpc_request(
+        Some(request_id),
+        peer_id.clone(),
+        RPCRequest::BeaconBlocks(request),
+    );
+}
+
+fn recent_blocks_request(
+    network: &mut NetworkContext,
+    log: &slog::Logger,
+    peer_id: PeerId,
+    request_id: RequestId,
+    request: RecentBeaconBlocksRequest,
+) {
+    trace!(
+        log,
+        "RPC Request";
+        "method" => "RecentBeaconBlocks",
+        "count" => request.block_roots.len(),
+        "peer" => format!("{:?}", peer_id)
+    );
+    network.send_rpc_request(
+        Some(request_id),
+        peer_id.clone(),
+        RPCRequest::RecentBeaconBlocks(request),
+    );
+}
+
+fn downvote_peer(network: &mut NetworkContext, log: &slog::Logger, peer_id: PeerId) {
+    trace!(
+        log,
+        "Peer downvoted";
+        "peer" => format!("{:?}", peer_id)
+    );
+    // TODO: Implement reputation
+    network.disconnect(peer_id.clone(), GoodbyeReason::Fault);
+}
+
-// Helper function to process blocks
+// Helper function to process blocks which consumes only the chain and the blocks to process
 fn process_blocks<T: BeaconChainTypes>(
     weak_chain: Weak<BeaconChain<T>>,
     blocks: Vec<BeaconBlock<T::EthSpec>>,
@@ -1005,3 +1065,99 @@
     Ok(())
 }
+
+impl<T: BeaconChainTypes> Future for SyncManager<T> {
+    type Item = ();
+    type Error = String;
+
+    fn poll(&mut self) -> Result<Async<Self::Item>, Self::Error> {
+        // process any inbound messages
+        loop {
+            match self.input_channel.poll() {
+                Ok(Async::Ready(Some(message))) => match message {
+                    SyncMessage::AddPeer(peer_id, info) => {
+                        self.add_peer(peer_id, info);
+                    }
+                    SyncMessage::BeaconBlocksResponse {
+                        peer_id,
+                        request_id,
+                        beacon_blocks,
+                    } => {
+                        self.beacon_blocks_response(peer_id, request_id, beacon_blocks);
+                    }
+                    SyncMessage::RecentBeaconBlocksResponse {
+                        peer_id,
+                        request_id,
+                        beacon_blocks,
+                    } => {
+                        self.recent_blocks_response(peer_id, request_id, beacon_blocks);
+                    }
+                    SyncMessage::UnknownBlock(peer_id, block) => {
+                        self.add_unknown_block(peer_id, block);
+                    }
+                    SyncMessage::Disconnect(peer_id) => {
+                        self.peer_disconnect(&peer_id);
+                    }
+                    SyncMessage::_RPCError(request_id) => {
+                        self.inject_error(request_id);
+                    }
+                },
+                Ok(Async::NotReady) => break,
+                Ok(Async::Ready(None)) => {
+                    return Err("Sync manager channel closed".into());
+                }
+                Err(e) => {
+                    return Err(format!("Sync Manager channel error: {:?}", e));
+                }
+            }
+        }
+
+        loop {
+            //TODO: Optimize the lookups. Potentially keep state of whether each of these functions
+            //need to be called.
+            let mut re_run = false;
+
+            // only process batch requests if there are any
+            if !self.import_queue.is_empty() {
+                // process potential block requests
+                self.process_potential_block_requests();
+
+                // process any complete long-range batches
+                re_run = re_run || self.process_complete_batches();
+            }
+
+            // only process parent objects if we are in regular sync
+            if !self.parent_queue.is_empty() {
+                // process any parent block lookup-requests
+                self.process_parent_requests();
+
+                // process any complete parent lookups
+                re_run = re_run || self.process_complete_parent_requests();
+            }
+
+            // Shut down the thread if the chain has terminated
+            if let None = self.chain.upgrade() {
+                return Ok(Async::Ready(()));
+            }
+
+            if !re_run {
+                break;
+            }
+        }
+
+        // update the state of the manager
+        self.update_state();
+
+        return Ok(Async::NotReady);
+    }
+}
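+
+// Poll flow summary: the first loop drains `input_channel` until `NotReady`, the second loop
+// re-runs request/batch processing until no step reports further work (`re_run == false`), and
+// a failed `chain.upgrade()` resolves the future, ending the sync thread.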
diff --git a/beacon_node/network/src/sync/mod.rs b/beacon_node/network/src/sync/mod.rs
index b26d78c14..58ec386aa 100644
--- a/beacon_node/network/src/sync/mod.rs
+++ b/beacon_node/network/src/sync/mod.rs
@@ -4,7 +4,7 @@ mod manager;
 /// Stores the various syncing methods for the beacon chain.
 mod simple_sync;

-pub use simple_sync::SimpleSync;
+pub use simple_sync::MessageProcessor;

 /// Currently implemented sync methods.
 pub enum SyncMethod {
diff --git a/beacon_node/network/src/sync/simple_sync.rs b/beacon_node/network/src/sync/simple_sync.rs
index 4a853f05d..d8b5f2dbf 100644
--- a/beacon_node/network/src/sync/simple_sync.rs
+++ b/beacon_node/network/src/sync/simple_sync.rs
@@ -1,4 +1,4 @@
-use super::manager::{ImportManager, ImportManagerOutcome};
+use super::manager::SyncMessage;
 use crate::service::NetworkMessage;
 use beacon_chain::{BeaconChain, BeaconChainTypes, BlockProcessingOutcome};
 use eth2_libp2p::rpc::methods::*;
@@ -6,11 +6,14 @@ use eth2_libp2p::rpc::{RPCEvent, RPCRequest, RPCResponse, RequestId};
 use eth2_libp2p::PeerId;
 use slog::{debug, info, o, trace, warn};
 use ssz::Encode;
-use std::sync::{Arc, Weak};
+use std::sync::Arc;
 use store::Store;
-use tokio::sync::mpsc;
+use tokio::sync::{mpsc, oneshot};
 use types::{Attestation, BeaconBlock, Epoch, EthSpec, Hash256, Slot};

+//TODO: Put a maximum limit on the number of blocks that can be requested.
+//TODO: Rate limit requests.
+
 /// If a block is more than `FUTURE_SLOT_TOLERANCE` slots ahead of our slot clock, we drop it.
 /// Otherwise we queue it.
 pub(crate) const FUTURE_SLOT_TOLERANCE: u64 = 1;
@@ -46,55 +49,71 @@ impl<T: BeaconChainTypes> From<&Arc<BeaconChain<T>>> for PeerSyncInfo {
     }
 }

-/// The current syncing state.
-#[derive(PartialEq)]
-pub enum SyncState {
-    _Idle,
-    _Downloading,
-    _Stopped,
-}
-
-/// Simple Syncing protocol.
-pub struct SimpleSync<T: BeaconChainTypes> {
+/// Processes validated messages from the network. It relays necessary data to the syncing thread
+/// and processes blocks from the pubsub network.
+pub struct MessageProcessor<T: BeaconChainTypes> {
     /// A reference to the underlying beacon chain.
-    chain: Weak<BeaconChain<T>>,
-    manager: ImportManager<T>,
+    chain: Arc<BeaconChain<T>>,
+    /// A channel to the syncing thread.
+    sync_send: mpsc::UnboundedSender<SyncMessage<T::EthSpec>>,
+    /// A oneshot channel for destroying the sync thread.
+    _sync_exit: oneshot::Sender<()>,
+    /// A network context to return and handle RPC requests.
     network: NetworkContext,
+    /// The `MessageProcessor` logger.
     log: slog::Logger,
 }

-impl<T: BeaconChainTypes> SimpleSync<T> {
-    /// Instantiate a `SimpleSync` instance, with no peers and an empty queue.
+impl<T: BeaconChainTypes> MessageProcessor<T> {
+    /// Instantiate a `MessageProcessor` instance.
     pub fn new(
-        beacon_chain: Weak<BeaconChain<T>>,
+        executor: &tokio::runtime::TaskExecutor,
+        beacon_chain: Arc<BeaconChain<T>>,
         network_send: mpsc::UnboundedSender<NetworkMessage>,
         log: &slog::Logger,
     ) -> Self {
         let sync_logger = log.new(o!("Service"=> "Sync"));
+        let sync_network_context = NetworkContext::new(network_send.clone(), sync_logger.clone());

-        SimpleSync {
-            chain: beacon_chain.clone(),
-            manager: ImportManager::new(beacon_chain, log),
+        // spawn the sync thread
+        let (sync_send, _sync_exit) = super::manager::spawn(
+            executor,
+            Arc::downgrade(&beacon_chain),
+            sync_network_context,
+            sync_logger,
+        );
+
+        MessageProcessor {
+            chain: beacon_chain,
+            sync_send,
+            _sync_exit,
             network: NetworkContext::new(network_send, log.clone()),
-            log: sync_logger,
+            log: log.clone(),
         }
     }

+    fn send_to_sync(&mut self, message: SyncMessage<T::EthSpec>) {
+        self.sync_send.try_send(message).unwrap_or_else(|_| {
+            warn!(
+                self.log,
+                "Could not send message to the sync service";
+            )
+        });
+    }
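+
+    // Illustrative construction (a sketch; `executor` and `network_send` are provided by the
+    // network service, as in `MessageHandler::spawn` above):
+    //
+    //     let mut processor = MessageProcessor::new(executor, beacon_chain, network_send, &log);
+    //     processor.on_connect(peer_id); // says hello; replies may reach the sync thread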

     /// Handle a peer disconnect.
     ///
     /// Removes the peer from the manager.
     pub fn on_disconnect(&mut self, peer_id: PeerId) {
-        self.manager.peer_disconnect(&peer_id);
+        self.send_to_sync(SyncMessage::Disconnect(peer_id));
     }

     /// Handle the connection of a new peer.
     ///
     /// Sends a `Hello` message to the peer.
     pub fn on_connect(&mut self, peer_id: PeerId) {
-        if let Some(chain) = self.chain.upgrade() {
-            self.network
-                .send_rpc_request(None, peer_id, RPCRequest::Hello(hello_message(&chain)));
-        }
+        self.network
+            .send_rpc_request(None, peer_id, RPCRequest::Hello(hello_message(&self.chain)));
     }

     /// Handle a `Hello` request.
@@ -107,18 +126,16 @@
         hello: HelloMessage,
     ) {
-        // ignore hello responses if we are shutting down
-        if let Some(chain) = self.chain.upgrade() {
-            trace!(self.log, "HelloRequest"; "peer" => format!("{:?}", peer_id));
+        trace!(self.log, "HelloRequest"; "peer" => format!("{:?}", peer_id));

-            // Say hello back.
-            self.network.send_rpc_response(
-                peer_id.clone(),
-                request_id,
-                RPCResponse::Hello(hello_message(&chain)),
-            );
+        // Say hello back.
+        self.network.send_rpc_response(
+            peer_id.clone(),
+            request_id,
+            RPCResponse::Hello(hello_message(&self.chain)),
+        );

-            self.process_hello(peer_id, hello);
-        }
+        self.process_hello(peer_id, hello);
     }

     /// Process a `Hello` response from a peer.
@@ -133,183 +150,86 @@
     ///
     /// Disconnects the peer if required.
     fn process_hello(&mut self, peer_id: PeerId, hello: HelloMessage) {
-        // If we update the manager we may need to drive the sync. This flag lies out of scope of
-        // the beacon chain so that the process sync command has no long-lived beacon chain
-        // references.
-        let mut process_sync = false;
+        let remote = PeerSyncInfo::from(hello);
+        let local = PeerSyncInfo::from(&self.chain);
+
+        let start_slot = |epoch: Epoch| epoch.start_slot(T::EthSpec::slots_per_epoch());
+
+        if local.fork_version != remote.fork_version {
+            // The node is on a different network/fork, disconnect them.
+            debug!(
+                self.log, "HandshakeFailure";
+                "peer" => format!("{:?}", peer_id),
+                "reason" => "network_id"
+            );
+
+            self.network
+                .disconnect(peer_id.clone(), GoodbyeReason::IrrelevantNetwork);
+        } else if remote.finalized_epoch <= local.finalized_epoch
+            && remote.finalized_root != Hash256::zero()
+            && local.finalized_root != Hash256::zero()
+            && (self.chain.root_at_slot(start_slot(remote.finalized_epoch))
+                != Some(remote.finalized_root))
         {
+            // The remote's finalized epoch is less than or equal to ours, but the block root is
+            // different to the one in our chain.
+            //
+            // Therefore, the node is on a different chain and we should not communicate with them.
+            debug!(
+                self.log, "HandshakeFailure";
+                "peer" => format!("{:?}", peer_id),
+                "reason" => "different finalized chain"
+            );
+            self.network
+                .disconnect(peer_id.clone(), GoodbyeReason::IrrelevantNetwork);
+        } else if remote.finalized_epoch < local.finalized_epoch {
+            // The node has a lower finalized epoch, their chain is not useful to us. There are two
+            // cases where a node can have a lower finalized epoch:
+            //
+            // ## The node is on the same chain
+            //
+            // If a node is on the same chain but has a lower finalized epoch, their head must be
+            // lower than ours. Therefore, we have nothing to request from them.
+            //
+            // ## The node is on a fork
+            //
+            // If a node is on a fork that has a lower finalized epoch, switching to that fork would
+            // cause us to revert a finalized block. This is not permitted, therefore we have no
+            // interest in their blocks.
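+            // (e.g. local.finalized_epoch = 10, remote.finalized_epoch = 8: nothing to request)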
+            debug!(
+                self.log,
+                "NaivePeer";
+                "peer" => format!("{:?}", peer_id),
+                "reason" => "lower finalized epoch"
+            );
+        } else if self
+            .chain
+            .store
+            .exists::<BeaconBlock<T::EthSpec>>(&remote.head_root)
+            .unwrap_or_else(|_| false)
+        {
+            trace!(
+                self.log, "Peer with known chain found";
+                "peer" => format!("{:?}", peer_id),
+                "remote_head_slot" => remote.head_slot,
+                "remote_latest_finalized_epoch" => remote.finalized_epoch,
+            );

-            let remote = PeerSyncInfo::from(hello);
-            let local = PeerSyncInfo::from(&chain);
-
-            let start_slot = |epoch: Epoch| epoch.start_slot(T::EthSpec::slots_per_epoch());
-
-            if local.fork_version != remote.fork_version {
-                // The node is on a different network/fork, disconnect them.
-                debug!(
-                    self.log, "HandshakeFailure";
-                    "peer" => format!("{:?}", peer_id),
-                    "reason" => "network_id"
-                );
-
-                self.network
-                    .disconnect(peer_id.clone(), GoodbyeReason::IrrelevantNetwork);
-            } else if remote.finalized_epoch <= local.finalized_epoch
-                && remote.finalized_root != Hash256::zero()
-                && local.finalized_root != Hash256::zero()
-                && (chain.root_at_slot(start_slot(remote.finalized_epoch))
-                    != Some(remote.finalized_root))
-            {
-                // The remotes finalized epoch is less than or greater than ours, but the block root is
-                // different to the one in our chain.
-                //
-                // Therefore, the node is on a different chain and we should not communicate with them.
-                debug!(
-                    self.log, "HandshakeFailure";
-                    "peer" => format!("{:?}", peer_id),
-                    "reason" => "different finalized chain"
-                );
-                self.network
-                    .disconnect(peer_id.clone(), GoodbyeReason::IrrelevantNetwork);
-            } else if remote.finalized_epoch < local.finalized_epoch {
-                // The node has a lower finalized epoch, their chain is not useful to us. There are two
-                // cases where a node can have a lower finalized epoch:
-                //
-                // ## The node is on the same chain
-                //
-                // If a node is on the same chain but has a lower finalized epoch, their head must be
-                // lower than ours. Therefore, we have nothing to request from them.
-                //
-                // ## The node is on a fork
-                //
-                // If a node is on a fork that has a lower finalized epoch, switching to that fork would
-                // cause us to revert a finalized block. This is not permitted, therefore we have no
-                // interest in their blocks.
-                debug!(
-                    self.log,
-                    "NaivePeer";
-                    "peer" => format!("{:?}", peer_id),
-                    "reason" => "lower finalized epoch"
-                );
-            } else if chain
-                .store
-                .exists::<BeaconBlock<T::EthSpec>>(&remote.head_root)
-                .unwrap_or_else(|_| false)
-            {
-                trace!(
-                    self.log, "Peer with known chain found";
-                    "peer" => format!("{:?}", peer_id),
-                    "remote_head_slot" => remote.head_slot,
-                    "remote_latest_finalized_epoch" => remote.finalized_epoch,
-                );
-
-                // If the node's best-block is already known to us and they are close to our current
-                // head, treat them as a fully sync'd peer.
-                self.manager.add_peer(peer_id, remote);
-                process_sync = true;
-            } else {
-                // The remote node has an equal or great finalized epoch and we don't know it's head.
-                //
-                // Therefore, there are some blocks between the local finalized epoch and the remote
-                // head that are worth downloading.
-                debug!(
-                    self.log, "UsefulPeer";
-                    "peer" => format!("{:?}", peer_id),
-                    "local_finalized_epoch" => local.finalized_epoch,
-                    "remote_latest_finalized_epoch" => remote.finalized_epoch,
-                );
-
-                self.manager.add_peer(peer_id, remote);
-                process_sync = true
-            }
-        } // end beacon chain reference scope
-
-        if process_sync {
-            self.process_sync();
-        }
-    }
-
-    /// This function drives the `ImportManager` state machine. The outcomes it provides are
-    /// actioned until the `ImportManager` is idle.
-    fn process_sync(&mut self) {
-        loop {
-            match self.manager.poll() {
-                ImportManagerOutcome::Hello(peer_id) => {
-                    trace!(
-                        self.log,
-                        "RPC Request";
-                        "method" => "HELLO",
-                        "peer" => format!("{:?}", peer_id)
-                    );
-                    if let Some(chain) = self.chain.upgrade() {
-                        self.network.send_rpc_request(
-                            None,
-                            peer_id,
-                            RPCRequest::Hello(hello_message(&chain)),
-                        );
-                    }
-                }
-                ImportManagerOutcome::RequestBlocks {
-                    peer_id,
-                    request_id,
-                    request,
-                } => {
-                    trace!(
-                        self.log,
-                        "RPC Request";
-                        "method" => "BeaconBlocks",
-                        "id" => request_id,
-                        "count" => request.count,
-                        "peer" => format!("{:?}", peer_id)
-                    );
-                    self.network.send_rpc_request(
-                        Some(request_id),
-                        peer_id.clone(),
-                        RPCRequest::BeaconBlocks(request),
-                    );
-                }
-                ImportManagerOutcome::RecentRequest {
-                    peer_id,
-                    request_id,
-                    request,
-                } => {
-                    trace!(
-                        self.log,
-                        "RPC Request";
-                        "method" => "RecentBeaconBlocks",
-                        "count" => request.block_roots.len(),
-                        "peer" => format!("{:?}", peer_id)
-                    );
-                    self.network.send_rpc_request(
-                        Some(request_id),
-                        peer_id.clone(),
-                        RPCRequest::RecentBeaconBlocks(request),
-                    );
-                }
-                ImportManagerOutcome::DownvotePeer(peer_id) => {
-                    trace!(
-                        self.log,
-                        "Peer downvoted";
-                        "peer" => format!("{:?}", peer_id)
-                    );
-                    // TODO: Implement reputation
-                    self.network
-                        .disconnect(peer_id.clone(), GoodbyeReason::Fault);
-                }
-                ImportManagerOutcome::Idle => {
-                    // nothing to do
-                    return;
-                }
-            }
+            // If the node's best-block is already known to us and they are close to our current
+            // head, treat them as a fully sync'd peer.
+            self.send_to_sync(SyncMessage::AddPeer(peer_id, remote));
+        } else {
+            // The remote node has an equal or greater finalized epoch and we don't know its head.
+            //
+            // Therefore, there are some blocks between the local finalized epoch and the remote
+            // head that are worth downloading.
+            debug!(
+                self.log, "UsefulPeer";
+                "peer" => format!("{:?}", peer_id),
+                "local_finalized_epoch" => local.finalized_epoch,
+                "remote_latest_finalized_epoch" => remote.finalized_epoch,
+            );
+            self.send_to_sync(SyncMessage::AddPeer(peer_id, remote));
         }
     }

@@ -320,20 +240,11 @@
         request_id: RequestId,
         request: RecentBeaconBlocksRequest,
     ) {
-        let chain = match self.chain.upgrade() {
-            Some(chain) => chain,
-            None => {
-                info!(self.log, "Sync shutting down";
-                      "reason" => "Beacon chain dropped");
-                return;
-            }
-        };
-
         let blocks: Vec<BeaconBlock<T::EthSpec>> = request
             .block_roots
             .iter()
             .filter_map(|root| {
-                if let Ok(Some(block)) = chain.store.get::<BeaconBlock<T::EthSpec>>(root) {
+                if let Ok(Some(block)) = self.chain.store.get::<BeaconBlock<T::EthSpec>>(root) {
                     Some(block)
                 } else {
                     debug!(
@@ -370,15 +281,6 @@
         request_id: RequestId,
         req: BeaconBlocksRequest,
     ) {
-        let chain = match self.chain.upgrade() {
-            Some(chain) => chain,
-            None => {
-                info!(self.log, "Sync shutting down";
-                      "reason" => "Beacon chain dropped");
-                return;
-            }
-        };
-
         debug!(
             self.log,
             "BeaconBlocksRequest";
@@ -392,14 +294,15 @@

         // In the current implementation we read from the db then filter out out-of-range blocks.
         // Improving the db schema to prevent this would be ideal.
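+        // For example, a request with `start_slot = 100` and `count = 10` selects blocks at
+        // slots 100..110 (per the `filter` predicate below).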
-        let mut blocks: Vec<BeaconBlock<T::EthSpec>> = chain
+        let mut blocks: Vec<BeaconBlock<T::EthSpec>> = self
+            .chain
             .rev_iter_block_roots()
             .filter(|(_root, slot)| {
                 req.start_slot <= slot.as_u64() && req.start_slot + req.count > slot.as_u64()
             })
             .take_while(|(_root, slot)| req.start_slot <= slot.as_u64())
             .filter_map(|(root, _slot)| {
-                if let Ok(Some(block)) = chain.store.get::<BeaconBlock<T::EthSpec>>(&root) {
+                if let Ok(Some(block)) = self.chain.store.get::<BeaconBlock<T::EthSpec>>(&root) {
                     Some(block)
                 } else {
                     warn!(
@@ -423,7 +326,7 @@
                 "peer" => format!("{:?}", peer_id),
                 "msg" => "Failed to return all requested hashes",
                 "start_slot" => req.start_slot,
-                "current_slot" => chain.slot().unwrap_or_else(|_| Slot::from(0_u64)).as_u64(),
+                "current_slot" => self.chain.slot().unwrap_or_else(|_| Slot::from(0_u64)).as_u64(),
                 "requested" => req.count,
                 "returned" => blocks.len(),
             );
@@ -449,10 +352,11 @@
             "count" => beacon_blocks.len(),
         );

-        self.manager
-            .beacon_blocks_response(peer_id, request_id, beacon_blocks);
-
-        self.process_sync();
+        self.send_to_sync(SyncMessage::BeaconBlocksResponse {
+            peer_id,
+            request_id,
+            beacon_blocks,
+        });
     }

     /// Handle a `RecentBeaconBlocks` response from the peer.
@@ -469,10 +373,11 @@
             "count" => beacon_blocks.len(),
         );

-        self.manager
-            .recent_blocks_response(peer_id, request_id, beacon_blocks);
-
-        self.process_sync();
+        self.send_to_sync(SyncMessage::RecentBeaconBlocksResponse {
+            peer_id,
+            request_id,
+            beacon_blocks,
+        });
     }

     /// Process a gossip message declaring a new block.
     ///
     /// Returns a `bool` which, if `true`, indicates we should forward the block to our peers.
     pub fn on_block_gossip(&mut self, peer_id: PeerId, block: BeaconBlock<T::EthSpec>) -> bool {
-        let chain = match self.chain.upgrade() {
-            Some(chain) => chain,
-            None => {
-                info!(self.log, "Sync shutting down";
-                      "reason" => "Beacon chain dropped");
-                return false;
-            }
-        };
-
-        if let Ok(outcome) = chain.process_block(block.clone()) {
+        if let Ok(outcome) = self.chain.process_block(block.clone()) {
             match outcome {
                 BlockProcessingOutcome::Processed { .. } => {
                     trace!(self.log, "Gossipsub block processed";
@@ -501,7 +397,7 @@
                     // Inform the sync manager to find parents for this block
                     trace!(self.log, "Block with unknown parent received";
                            "peer_id" => format!("{:?}",peer_id));
-                    self.manager.add_unknown_block(block.clone(), peer_id);
+                    self.send_to_sync(SyncMessage::UnknownBlock(peer_id, block.clone()));
                     SHOULD_FORWARD_GOSSIP_BLOCK
                 }
                 BlockProcessingOutcome::FutureSlot {
@@ -523,16 +419,7 @@
     ///
     /// Not currently implemented.
     pub fn on_attestation_gossip(&mut self, _peer_id: PeerId, msg: Attestation<T::EthSpec>) {
-        let chain = match self.chain.upgrade() {
-            Some(chain) => chain,
-            None => {
-                info!(self.log, "Sync shutting down";
-                      "reason" => "Beacon chain dropped");
-                return;
-            }
-        };
-
-        match chain.process_attestation(msg) {
+        match self.chain.process_attestation(msg) {
             Ok(outcome) => info!(
                 self.log,
                 "Processed attestation";
@@ -547,7 +434,7 @@
 }

 /// Build a `HelloMessage` representing the state of the given `beacon_chain`.
-fn hello_message<T: BeaconChainTypes>(beacon_chain: &BeaconChain<T>) -> HelloMessage {
+pub(crate) fn hello_message<T: BeaconChainTypes>(beacon_chain: &BeaconChain<T>) -> HelloMessage {
     let state = &beacon_chain.head().beacon_state;

     HelloMessage {