diff --git a/beacon_node/network/src/message_handler.rs b/beacon_node/network/src/message_handler.rs index fd10c5aea..7a1a4ad31 100644 --- a/beacon_node/network/src/message_handler.rs +++ b/beacon_node/network/src/message_handler.rs @@ -22,8 +22,6 @@ pub struct MessageHandler { _chain: Arc>, /// The syncing framework. sync: SimpleSync, - /// The context required to send messages to, and process messages from peers. - network_context: NetworkContext, /// The `MessageHandler` logger. log: slog::Logger, } @@ -52,15 +50,13 @@ impl MessageHandler { trace!(log, "Service starting"); let (handler_send, handler_recv) = mpsc::unbounded_channel(); - // Initialise sync and begin processing in thread - let sync = SimpleSync::new(beacon_chain.clone(), &log); + let sync = SimpleSync::new(beacon_chain.clone(), network_send, &log); // generate the Message handler let mut handler = MessageHandler { _chain: beacon_chain.clone(), sync, - network_context: NetworkContext::new(network_send, log.clone()), log: log.clone(), }; @@ -81,7 +77,7 @@ impl MessageHandler { match message { // we have initiated a connection to a peer HandlerMessage::PeerDialed(peer_id) => { - self.sync.on_connect(peer_id, &mut self.network_context); + self.sync.on_connect(peer_id); } // A peer has disconnected HandlerMessage::PeerDisconnected(peer_id) => { @@ -112,32 +108,24 @@ impl MessageHandler { /// A new RPC request has been received from the network. fn handle_rpc_request(&mut self, peer_id: PeerId, request_id: RequestId, request: RPCRequest) { match request { - RPCRequest::Hello(hello_message) => self.sync.on_hello_request( - peer_id, - request_id, - hello_message, - &mut self.network_context, - ), + RPCRequest::Hello(hello_message) => { + self.sync + .on_hello_request(peer_id, request_id, hello_message) + } RPCRequest::Goodbye(goodbye_reason) => { debug!( self.log, "PeerGoodbye"; "peer" => format!("{:?}", peer_id), - "reason" => format!("{:?}", reason), + "reason" => format!("{:?}", goodbye_reason), ); - self.sync.on_disconnect(peer_id), - }, - RPCRequest::BeaconBlocks(request) => self.sync.on_beacon_blocks_request( - peer_id, - request_id, - request, - &mut self.network_context, - ), - RPCRequest::RecentBeaconBlocks(request) => self.sync.on_recent_beacon_blocks_request( - peer_id, - request_id, - request, - &mut self.network_context, - ), + self.sync.on_disconnect(peer_id); + } + RPCRequest::BeaconBlocks(request) => self + .sync + .on_beacon_blocks_request(peer_id, request_id, request), + RPCRequest::RecentBeaconBlocks(request) => self + .sync + .on_recent_beacon_blocks_request(peer_id, request_id, request), } } @@ -163,20 +151,15 @@ impl MessageHandler { RPCErrorResponse::Success(response) => { match response { RPCResponse::Hello(hello_message) => { - self.sync.on_hello_response( - peer_id, - hello_message, - &mut self.network_context, - ); + self.sync.on_hello_response(peer_id, hello_message); } RPCResponse::BeaconBlocks(response) => { - match self.decode_beacon_blocks(response) { + match self.decode_beacon_blocks(&response) { Ok(beacon_blocks) => { self.sync.on_beacon_blocks_response( peer_id, request_id, beacon_blocks, - &mut self.network_context, ); } Err(e) => { @@ -186,13 +169,12 @@ impl MessageHandler { } } RPCResponse::RecentBeaconBlocks(response) => { - match self.decode_beacon_blocks(response) { + match self.decode_beacon_blocks(&response) { Ok(beacon_blocks) => { self.sync.on_recent_beacon_blocks_response( - request_id, peer_id, + request_id, beacon_blocks, - &mut self.network_context, ); } Err(e) => { @@ -217,19 
+199,14 @@ impl MessageHandler { match gossip_message { PubsubMessage::Block(message) => match self.decode_gossip_block(message) { Ok(block) => { - let _should_forward_on = - self.sync - .on_block_gossip(peer_id, block, &mut self.network_context); + let _should_forward_on = self.sync.on_block_gossip(peer_id, block); } Err(e) => { debug!(self.log, "Invalid gossiped beacon block"; "peer_id" => format!("{}", peer_id), "Error" => format!("{:?}", e)); } }, PubsubMessage::Attestation(message) => match self.decode_gossip_attestation(message) { - Ok(attestation) => { - self.sync - .on_attestation_gossip(peer_id, attestation, &mut self.network_context) - } + Ok(attestation) => self.sync.on_attestation_gossip(peer_id, attestation), Err(e) => { debug!(self.log, "Invalid gossiped attestation"; "peer_id" => format!("{}", peer_id), "Error" => format!("{:?}", e)); } @@ -331,56 +308,3 @@ impl MessageHandler { Vec::from_ssz_bytes(&beacon_blocks) } } - -/// Wraps a Network Channel to employ various RPC/Sync related network functionality. -pub struct NetworkContext { - /// The network channel to relay messages to the Network service. - network_send: mpsc::UnboundedSender, - /// Logger for the `NetworkContext`. - log: slog::Logger, -} - -impl NetworkContext { - pub fn new(network_send: mpsc::UnboundedSender, log: slog::Logger) -> Self { - Self { network_send, log } - } - - pub fn disconnect(&mut self, peer_id: PeerId, reason: GoodbyeReason) { - self.send_rpc_request(peer_id, RPCRequest::Goodbye(reason)) - // TODO: disconnect peers. - } - - pub fn send_rpc_request(&mut self, peer_id: PeerId, rpc_request: RPCRequest) { - // Note: There is currently no use of keeping track of requests. However the functionality - // is left here for future revisions. - self.send_rpc_event(peer_id, RPCEvent::Request(0, rpc_request)); - } - - //TODO: Handle Error responses - pub fn send_rpc_response( - &mut self, - peer_id: PeerId, - request_id: RequestId, - rpc_response: RPCErrorResponse, - ) { - self.send_rpc_event( - peer_id, - RPCEvent::Response(request_id, RPCErrorResponse::Success(rpc_response)), - ); - } - - fn send_rpc_event(&mut self, peer_id: PeerId, rpc_event: RPCEvent) { - self.send(peer_id, OutgoingMessage::RPC(rpc_event)) - } - - fn send(&mut self, peer_id: PeerId, outgoing_message: OutgoingMessage) { - self.network_send - .try_send(NetworkMessage::Send(peer_id, outgoing_message)) - .unwrap_or_else(|_| { - warn!( - self.log, - "Could not send RPC message to the network service" - ) - }); - } -} diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index a4ce544ec..f5c669455 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -1,129 +1,164 @@ -const MAX_BLOCKS_PER_REQUEST: usize = 10; +use super::simple_sync::{PeerSyncInfo, FUTURE_SLOT_TOLERANCE}; +use beacon_chain::{BeaconChain, BeaconChainTypes, BlockProcessingOutcome}; +use eth2_libp2p::rpc::methods::*; +use eth2_libp2p::rpc::RequestId; +use eth2_libp2p::PeerId; +use slog::{debug, info, trace, warn, Logger}; +use std::collections::{HashMap, HashSet}; +use std::ops::{Add, Sub}; +use std::sync::Arc; +use types::{BeaconBlock, EthSpec, Hash256, Slot}; + +const MAX_BLOCKS_PER_REQUEST: u64 = 10; /// The number of slots that we can import blocks ahead of us, before going into full Sync mode. 
-const SLOT_IMPORT_TOLERANCE: u64 = 10; +const SLOT_IMPORT_TOLERANCE: usize = 10; const PARENT_FAIL_TOLERANCE: usize = 3; -const PARENT_DEPTH_TOLERANCE: usize = SLOT_IMPORT_TOLERANCE*2; +const PARENT_DEPTH_TOLERANCE: usize = SLOT_IMPORT_TOLERANCE * 2; +#[derive(PartialEq)] enum BlockRequestsState { QueuedForward, QueuedBackward, Pending(RequestId), Complete, + Failed, } -struct BlockRequests { - target_head_slot: Slot +struct BlockRequests { + target_head_slot: Slot, target_head_root: Hash256, - downloaded_blocks: Vec, - state: State, + downloaded_blocks: Vec>, + state: BlockRequestsState, } -struct ParentRequests { - downloaded_blocks: Vec, - attempts: usize, +struct ParentRequests { + downloaded_blocks: Vec>, + failed_attempts: usize, last_submitted_peer: PeerId, // to downvote the submitting peer. state: BlockRequestsState, } -impl BlockRequests { - +impl BlockRequests { // gets the start slot for next batch // last block slot downloaded plus 1 fn next_start_slot(&self) -> Option { if !self.downloaded_blocks.is_empty() { match self.state { BlockRequestsState::QueuedForward => { - let last_element_index = self.downloaded_blocks.len() -1; - Some(downloaded_blocks[last_element_index].slot.add(1)) + let last_element_index = self.downloaded_blocks.len() - 1; + Some(self.downloaded_blocks[last_element_index].slot.add(1)) } BlockRequestsState::QueuedBackward => { let earliest_known_slot = self.downloaded_blocks[0].slot; Some(earliest_known_slot.add(1).sub(MAX_BLOCKS_PER_REQUEST)) } + _ => { + // pending/complete/failed + None + } } - } - else { + } else { None } } } +#[derive(PartialEq, Debug, Clone)] enum ManagerState { Syncing, Regular, Stalled, } -enum ImportManagerOutcome { +pub(crate) enum ImportManagerOutcome { Idle, - RequestBlocks{ + RequestBlocks { peer_id: PeerId, request_id: RequestId, request: BeaconBlocksRequest, }, + /// Updates information with peer via requesting another HELLO handshake. + Hello(PeerId), RecentRequest(PeerId, RecentBeaconBlocksRequest), DownvotePeer(PeerId), } - -pub struct ImportManager { +pub struct ImportManager { /// A reference to the underlying beacon chain. chain: Arc>, - state: MangerState, - import_queue: HashMap, - parent_queue: Vec, - full_peers: Hashset, + state: ManagerState, + import_queue: HashMap>, + parent_queue: Vec>, + full_peers: HashSet, current_req_id: usize, log: Logger, } -impl ImportManager { +impl ImportManager { + pub fn new(beacon_chain: Arc>, log: &slog::Logger) -> Self { + ImportManager { + chain: beacon_chain.clone(), + state: ManagerState::Regular, + import_queue: HashMap::new(), + parent_queue: Vec::new(), + full_peers: HashSet::new(), + current_req_id: 0, + log: log.clone(), + } + } - pub fn add_peer(&mut self, peer_id, remote: PeerSyncInfo) { + pub fn add_peer(&mut self, peer_id: PeerId, remote: PeerSyncInfo) { // TODO: Improve comments. 
// initially try to download blocks from our current head // then backwards search all the way back to our finalized epoch until we match on a chain // has to be done sequentially to find next slot to start the batch from - + let local = PeerSyncInfo::from(&self.chain); // If a peer is within SLOT_IMPORT_TOLERANCE from out head slot, ignore a batch sync - if remote.head_slot.sub(local.head_slot) < SLOT_IMPORT_TOLERANCE { + if remote.head_slot.sub(local.head_slot).as_usize() < SLOT_IMPORT_TOLERANCE { trace!(self.log, "Ignoring full sync with peer"; - "peer" => peer_id, - "peer_head_slot" => remote.head_slot, - "local_head_slot" => local.head_slot, - ); + "peer" => format!("{:?}", peer_id), + "peer_head_slot" => remote.head_slot, + "local_head_slot" => local.head_slot, + ); // remove the peer from the queue if it exists - self.import_queue.remove(&peer_id); + self.import_queue.remove(&peer_id); return; } if let Some(block_requests) = self.import_queue.get_mut(&peer_id) { // update the target head slot - if remote.head_slot > requested_block.target_head_slot { + if remote.head_slot > block_requests.target_head_slot { block_requests.target_head_slot = remote.head_slot; } - } else { + } else { let block_requests = BlockRequests { target_head_slot: remote.head_slot, // this should be larger than the current head. It is checked in the SyncManager before add_peer is called target_head_root: remote.head_root, downloaded_blocks: Vec::new(), - state: RequestedBlockState::Queued - } + state: BlockRequestsState::QueuedForward, + }; self.import_queue.insert(peer_id, block_requests); } - } - pub fn beacon_blocks_response(peer_id: PeerId, request_id: RequestId, blocks: Vec) { - + pub fn beacon_blocks_response( + &mut self, + peer_id: PeerId, + request_id: RequestId, + mut blocks: Vec>, + ) { // find the request - let block_requests = match self.import_queue.get_mut(&peer_id) { - Some(req) if req.state = RequestedBlockState::Pending(request_id) => req, - None => { + let block_requests = match self + .import_queue + .get_mut(&peer_id) + .filter(|r| r.state == BlockRequestsState::Pending(request_id)) + { + Some(req) => req, + _ => { // No pending request, invalid request_id or coding error warn!(self.log, "BeaconBlocks response unknown"; "request_id" => request_id); return; @@ -142,100 +177,115 @@ impl ImportManager { if blocks.is_empty() { warn!(self.log, "BeaconBlocks response was empty"; "request_id" => request_id); - block_requests.state = RequestedBlockState::Failed; + block_requests.state = BlockRequestsState::Failed; return; } // Add the newly downloaded blocks to the current list of downloaded blocks. This also // determines if we are syncing forward or backward. 
let syncing_forwards = { - if block_requests.blocks.is_empty() { - block_requests.blocks.push(blocks); + if block_requests.downloaded_blocks.is_empty() { + block_requests.downloaded_blocks.append(&mut blocks); true - } - else if block_requests.blocks[0].slot < blocks[0].slot { // syncing forwards - // verify the peer hasn't sent overlapping blocks - ensuring the strictly - // increasing blocks in a batch will be verified during the processing - if block_requests.next_slot() > blocks[0].slot { - warn!(self.log, "BeaconBlocks response returned duplicate blocks", "request_id" => request_id, "response_initial_slot" => blocks[0].slot, "requested_initial_slot" => block_requests.next_slot()); - block_requests.state = RequestedBlockState::Failed; - return; - } - - block_requests.blocks.push(blocks); - true + } else if block_requests.downloaded_blocks[0].slot < blocks[0].slot { + // syncing forwards + // verify the peer hasn't sent overlapping blocks - ensuring the strictly + // increasing blocks in a batch will be verified during the processing + if block_requests.next_start_slot() > Some(blocks[0].slot) { + warn!(self.log, "BeaconBlocks response returned duplicate blocks"; "request_id" => request_id, "response_initial_slot" => blocks[0].slot, "requested_initial_slot" => block_requests.next_start_slot()); + block_requests.state = BlockRequestsState::Failed; + return; } - else { false } + + block_requests.downloaded_blocks.append(&mut blocks); + true + } else { + false + } }; - // Determine if more blocks need to be downloaded. There are a few cases: // - We have downloaded a batch from our head_slot, which has not reached the remotes head // (target head). Therefore we need to download another sequential batch. // - The latest batch includes blocks that greater than or equal to the target_head slot, - // which means we have caught up to their head. We then check to see if the first + // which means we have caught up to their head. We then check to see if the first // block downloaded matches our head. If so, we are on the same chain and can process // the blocks. If not we need to sync back further until we are on the same chain. So // request more blocks. // - We are syncing backwards (from our head slot) and need to check if we are on the same // chain. If so, process the blocks, if not, request more blocks all the way up to // our last finalized slot. - + if syncing_forwards { // does the batch contain the target_head_slot - let last_element_index = block_requests.blocks.len()-1; - if block_requests[last_element_index].slot >= block_requests.target_slot { + let last_element_index = block_requests.downloaded_blocks.len() - 1; + if block_requests.downloaded_blocks[last_element_index].slot + >= block_requests.target_head_slot + { // if the batch is on our chain, this is complete and we can then process. // Otherwise start backwards syncing until we reach a common chain. - let earliest_slot = block_requests_blocks[0].slot - if block_requests.blocks[0] == self.chain.get_block_by_slot(earliest_slot) { - block_requests.state = RequestedBlockState::Complete; + let earliest_slot = block_requests.downloaded_blocks[0].slot; + //TODO: Decide which is faster. Reading block from db and comparing or calculating + //the hash tree root and comparing. 
+                if Some(block_requests.downloaded_blocks[0].canonical_root())
+                    == root_at_slot(self.chain.clone(), earliest_slot)
+                {
+                    block_requests.state = BlockRequestsState::Complete;
                     return;
                 }
 
                 // not on the same chain, request blocks backwards
-                // binary search, request half the distance between the earliest block and our
-                // finalized slot
-                let state = &beacon_chain.head().beacon_state;
-                let local_finalized_slot = state.finalized_checkpoint.epoch; //TODO: Convert to slot
-                // check that the request hasn't failed by having no common chain
-                if local_finalized_slot >= block_requests.blocks[0] {
+                let state = &self.chain.head().beacon_state;
+                let local_finalized_slot = state
+                    .finalized_checkpoint
+                    .epoch
+                    .start_slot(T::EthSpec::slots_per_epoch());
+
+                // check that the request hasn't failed by having no common chain
+                if local_finalized_slot >= block_requests.downloaded_blocks[0].slot {
                     warn!(self.log, "Peer returned an unknown chain."; "request_id" => request_id);
-                    block_requests.state = RequestedBlockState::Failed;
+                    block_requests.state = BlockRequestsState::Failed;
                     return;
                 }
 
-                // Start a backwards sync by requesting earlier blocks
+                // Start a backwards sync by requesting earlier blocks.
                 // There can be duplication in downloaded blocks here if there are a large number
                 // of skip slots. In all cases we at least re-download the earliest known block.
                 // It is unlikely that a backwards sync is required, so we accept this duplication
                 // for now.
-                block_requests.state = RequestedBlockState::QueuedBackward;
+                block_requests.state = BlockRequestsState::QueuedBackward;
+            } else {
+                // batch doesn't contain the head slot, request the next batch
+                block_requests.state = BlockRequestsState::QueuedForward;
             }
-            else {
-                // batch doesn't contain the head slot, request the next batch
-                block_requests.state = RequestedBlockState::QueuedForward;
-            }
-        }
-        else {
+        } else {
             // syncing backwards
             // if the batch is on our chain, this is complete and we can then process.
             // Otherwise continue backwards
-            let earliest_slot = block_requests_blocks[0].slot
-            if block_requests.blocks[0] == self.chain.get_block_by_slot(earliest_slot) {
-                block_requests.state = RequestedBlockState::Complete;
+            let earliest_slot = block_requests.downloaded_blocks[0].slot;
+            if Some(block_requests.downloaded_blocks[0].canonical_root())
+                == root_at_slot(self.chain.clone(), earliest_slot)
+            {
+                block_requests.state = BlockRequestsState::Complete;
                 return;
             }
 
-            block_requests.state = RequestedBlockState::QueuedBackward;
-
+            block_requests.state = BlockRequestsState::QueuedBackward;
         }
     }
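// Editor's sketch (illustrative only, not part of the patch): the batch-direction
// decision that `beacon_blocks_response` above makes, reduced to plain u64 slots.
// `on_local_chain` stands in for the `root_at_slot` comparison of the earliest
// downloaded block against our canonical chain.
fn next_batch_action(batch_last_slot: u64, target_head_slot: u64, on_local_chain: bool) -> &'static str {
    if batch_last_slot >= target_head_slot {
        if on_local_chain {
            "complete: process the downloaded blocks"
        } else {
            "queue backward: walk back toward a common ancestor"
        }
    } else {
        "queue forward: request the next sequential batch"
    }
}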
 
-    pub fn recent_blocks_response(peer_id: PeerId, request_id: RequestId, blocks: Vec<BeaconBlock>) {
-
+    pub fn recent_blocks_response(
+        &mut self,
+        peer_id: PeerId,
+        request_id: RequestId,
+        mut blocks: Vec<BeaconBlock<T::EthSpec>>,
+    ) {
         // find the request
-        let parent_request = match self.parent_queue.get_mut(&peer_id) {
-            Some(req) if req.state = RequestedBlockState::Pending(request_id) => req,
+        let parent_request = match self
+            .parent_queue
+            .iter_mut()
+            .find(|request| request.state == BlockRequestsState::Pending(request_id))
+        {
+            Some(req) => req,
             None => {
                 // No pending request, invalid request_id or coding error
                 warn!(self.log, "RecentBeaconBlocks response unknown"; "request_id" => request_id);
@@ -245,8 +295,8 @@ impl ImportManager {
 
         // if an empty response is given, the peer didn't have the requested block, try again
         if blocks.is_empty() {
-            parent_request.attempts += 1;
-            parent_request.state = RequestedBlockState::QueuedForward;
+            parent_request.failed_attempts += 1;
+            parent_request.state = BlockRequestsState::QueuedForward;
             parent_request.last_submitted_peer = peer_id;
             return;
         }
@@ -256,29 +306,27 @@ impl ImportManager {
         if blocks.len() != 1 {
             //TODO: Potentially downvote the peer
             debug!(self.log, "Peer sent more than 1 parent. Ignoring";
-                "peer_id" => peer_id,
-                "no_parents" => blocks.len()
-            );
+                "peer_id" => format!("{:?}", peer_id),
+                "no_parents" => blocks.len()
+            );
             return;
         }
 
-        // queue for processing
-        parent_request.state = RequestedBlockState::Complete;
+        // add the block to the parent request, ready for processing
+        parent_request.downloaded_blocks.push(blocks.remove(0));
+        parent_request.state = BlockRequestsState::Complete;
     }
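// Editor's sketch (illustrative only, not part of the patch): the single-block
// guard used by `recent_blocks_response` above — an empty response means the
// peer lacked the parent (retry elsewhere), and more than one block is a
// protocol violation that is ignored.
fn single_parent<B>(mut blocks: Vec<B>) -> Option<B> {
    if blocks.len() == 1 {
        blocks.pop() // exactly the requested parent block
    } else {
        None // empty: try another peer; more than one: ignore the response
    }
}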
Ignoring"; - "peer_id" => peer_id, - "no_parents" => blocks.len() - ); + "peer_id" => format!("{:?}", peer_id), + "no_parents" => blocks.len() + ); return; } - // queue for processing - parent_request.state = RequestedBlockState::Complete; + parent_request.state = BlockRequestsState::Complete; } - pub fn inject_error(peer_id: PeerId, id: RequestId) { //TODO: Remove block state from pending } - pub fn peer_disconnect(peer_id: PeerId) { - self.import_queue.remove(&peer_id); - self.full_peers.remove(&peer_id); + pub fn peer_disconnect(&mut self, peer_id: &PeerId) { + self.import_queue.remove(peer_id); + self.full_peers.remove(peer_id); self.update_state(); } - pub fn add_full_peer(peer_id: PeerId) { + pub fn add_full_peer(&mut self, peer_id: PeerId) { debug!( self.log, "Fully synced peer added"; "peer" => format!("{:?}", peer_id), @@ -287,32 +335,36 @@ impl ImportManager { self.update_state(); } - pub fn add_unknown_block(&mut self,block: BeaconBlock) { + pub fn add_unknown_block(&mut self, block: BeaconBlock, peer_id: PeerId) { // if we are not in regular sync mode, ignore this block - if self.state == ManagerState::Regular { + if let ManagerState::Regular = self.state { return; } // make sure this block is not already being searched for // TODO: Potentially store a hashset of blocks for O(1) lookups for parent_req in self.parent_queue.iter() { - if let Some(_) = parent_req.downloaded_blocks.iter().find(|d_block| d_block == block) { + if let Some(_) = parent_req + .downloaded_blocks + .iter() + .find(|d_block| d_block == &&block) + { // we are already searching for this block, ignore it return; } } - let req = ParentRequests { + let req = ParentRequests { downloaded_blocks: vec![block], failed_attempts: 0, - state: RequestedBlockState::QueuedBackward - } + last_submitted_peer: peer_id, + state: BlockRequestsState::QueuedBackward, + }; self.parent_queue.push(req); } - pub fn poll() -> ImportManagerOutcome { - + pub fn poll(&mut self) -> ImportManagerOutcome { loop { // update the state of the manager self.update_state(); @@ -336,304 +388,340 @@ impl ImportManager { if let (re_run, outcome) = self.process_complete_parent_requests() { if let Some(outcome) = outcome { return outcome; - } - else if !re_run { + } else if !re_run { break; } } } - - return ImportManagerOutcome::Idle; + return ImportManagerOutcome::Idle; } - fn update_state(&mut self) { - let previous_state = self.state; + let previous_state = self.state.clone(); self.state = { if !self.import_queue.is_empty() { ManagerState::Syncing + } else if !self.full_peers.is_empty() { + ManagerState::Regular + } else { + ManagerState::Stalled } - else if !self.full_peers.is_empty() { - ManagerState::Regualar - } - else { - ManagerState::Stalled } }; if self.state != previous_state { - info!(self.log, "Syncing state updated", - "old_state" => format!("{:?}", previous_state) - "new_state" => format!("{:?}", self.state) - ); + info!(self.log, "Syncing state updated"; + "old_state" => format!("{:?}", previous_state), + "new_state" => format!("{:?}", self.state), + ); } } - - - fn process_potential_block_requests(&mut self) -> Option { + fn process_potential_block_requests(&mut self) -> Option { // check if an outbound request is required // Managing a fixed number of outbound requests is maintained at the RPC protocol libp2p // layer and not needed here. - // If any in queued state we submit a request. - + // If any in queued state we submit a request. 
 
         // remove any failed batches
         self.import_queue.retain(|peer_id, block_request| {
-            if block_request.state == RequestedBlockState::Failed {
-                debug!(self.log, "Block import from peer failed",
-                    "peer_id" => peer_id,
-                    "downloaded_blocks" => block_request.downloaded.blocks.len()
-                );
+            if let BlockRequestsState::Failed = block_request.state {
+                debug!(self.log, "Block import from peer failed";
+                    "peer_id" => format!("{:?}", peer_id),
+                    "downloaded_blocks" => block_request.downloaded_blocks.len()
+                );
                 false
+            } else {
+                true
             }
-            else { true }
         });
 
+        // process queued block requests
+        for (peer_id, block_requests) in self.import_queue.iter_mut().find(|(_peer_id, req)| {
+            req.state == BlockRequestsState::QueuedForward
+                || req.state == BlockRequestsState::QueuedBackward
+        }) {
+            let request_id = self.current_req_id;
+            block_requests.state = BlockRequestsState::Pending(request_id);
+            self.current_req_id += 1;
 
-        for (peer_id, block_requests) in self.import_queue.iter_mut() {
-            if let Some(request) = requests.iter().find(|req| req.state == RequestedBlockState::QueuedForward || req.state == RequestedBlockState::QueuedBackward) {
-
-                let request.state = RequestedBlockState::Pending(self.current_req_id);
-                self.current_req_id +=1;
-
-                let req = BeaconBlocksRequest {
-                    head_block_root: request.target_root,
-                    start_slot: request.next_start_slot().unwrap_or_else(|| self.chain.head().slot),
-                    count: MAX_BLOCKS_PER_REQUEST,
-                    step: 0
-                }
-                return Some(ImportManagerOutCome::RequestBlocks{ peer_id, req });
-            }
+            let request = BeaconBlocksRequest {
+                head_block_root: block_requests.target_head_root,
+                start_slot: block_requests
+                    .next_start_slot()
+                    .unwrap_or_else(|| self.chain.best_slot())
+                    .as_u64(),
+                count: MAX_BLOCKS_PER_REQUEST,
+                step: 0,
+            };
+            return Some(ImportManagerOutcome::RequestBlocks {
+                peer_id: peer_id.clone(),
+                request,
+                request_id,
+            });
         }
 
         None
     }
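// Editor's sketch (illustrative only, not part of the patch): the request-id
// scheme used by `process_potential_block_requests` above — a monotonically
// increasing counter whose value is stored in the batch's `Pending` state so a
// later response can be matched to the request that produced it.
struct RequestIds {
    current: usize,
}

impl RequestIds {
    fn next(&mut self) -> usize {
        let id = self.current;
        self.current += 1; // ids are never reused, so stale responses cannot match
        id
    }
}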
 
     fn process_complete_batches(&mut self) -> Option<ImportManagerOutcome> {
-        let completed_batches = self.import_queue.iter().filter(|_peer, block_requests| block_requests.state == RequestedState::Complete).map(|peer, _| peer).collect::<Vec<PeerId>>();
+        let completed_batches = self
+            .import_queue
+            .iter()
+            .filter(|(_peer, block_requests)| block_requests.state == BlockRequestsState::Complete)
+            .map(|(peer, _)| peer)
+            .cloned()
+            .collect::<Vec<PeerId>>();
         for peer_id in completed_batches {
-            let block_requests = self.import_queue.remove(&peer_id).unwrap("key exists");
-            match self.process_blocks(block_requests.downloaded_blocks) {
-                Ok(()) => {
-                    //TODO: Verify it's impossible to have empty downloaded_blocks
-                    last_element = block_requests.downloaded_blocks.len() -1
-                    debug!(self.log, "Blocks processed successfully";
-                        "peer" => peer_id,
-                        "start_slot" => block_requests.downloaded_blocks[0].slot,
-                        "end_slot" => block_requests.downloaded_blocks[last_element].slot,
-                        "no_blocks" => last_element + 1,
-                    );
-                    // Re-HELLO to ensure we are up to the latest head
-                    return Some(ImportManagerOutcome::Hello(peer_id));
-                }
-                Err(e) => {
-                    last_element = block_requests.downloaded_blocks.len() -1
-                    warn!(self.log, "Block processing failed";
-                        "peer" => peer_id,
-                        "start_slot" => block_requests.downloaded_blocks[0].slot,
-                        "end_slot" => block_requests.downloaded_blocks[last_element].slot,
-                        "no_blocks" => last_element + 1,
-                        "error" => format!("{:?}", e),
-                    );
-                    return Some(ImportManagerOutcome::DownvotePeer(peer_id));
-                }
+            let block_requests = self.import_queue.remove(&peer_id).expect("key exists");
+            match self.process_blocks(block_requests.downloaded_blocks.clone()) {
+                Ok(()) => {
+                    //TODO: Verify it's impossible to have empty downloaded_blocks
+                    let last_element = block_requests.downloaded_blocks.len() - 1;
+                    debug!(self.log, "Blocks processed successfully";
+                        "peer" => format!("{:?}", peer_id),
+                        "start_slot" => block_requests.downloaded_blocks[0].slot,
+                        "end_slot" => block_requests.downloaded_blocks[last_element].slot,
+                        "no_blocks" => last_element + 1,
+                    );
+                    // Re-HELLO to ensure we are up to the latest head
+                    return Some(ImportManagerOutcome::Hello(peer_id));
+                }
+                Err(e) => {
+                    let last_element = block_requests.downloaded_blocks.len() - 1;
+                    warn!(self.log, "Block processing failed";
+                        "peer" => format!("{:?}", peer_id),
+                        "start_slot" => block_requests.downloaded_blocks[0].slot,
+                        "end_slot" => block_requests.downloaded_blocks[last_element].slot,
+                        "no_blocks" => last_element + 1,
+                        "error" => format!("{:?}", e),
+                    );
                    return Some(ImportManagerOutcome::DownvotePeer(peer_id));
+                }
+            }
         }
         None
     }
 
     fn process_parent_requests(&mut self) -> Option<ImportManagerOutcome> {
-
         // remove any failed requests
         self.parent_queue.retain(|parent_request| {
-            if parent_request.state == RequestedBlockState::Failed {
-                debug!(self.log, "Parent import failed",
-                    "block" => parent_request.downloaded_blocks[0].hash,
-                    "siblings found" => parent_request.len()
-                );
+            if parent_request.state == BlockRequestsState::Failed {
+                debug!(self.log, "Parent import failed";
                    "block" => format!("{:?}", parent_request.downloaded_blocks[0].canonical_root()),
+                    "ancestors_found" => parent_request.downloaded_blocks.len()
+                );
                 false
+            } else {
+                true
             }
-            else { true }
         });
 
         // check to make sure there are peers to search for the parent from
         if self.full_peers.is_empty() {
-            return;
+            return None;
         }
 
         // check if parents need to be searched for
         for parent_request in self.parent_queue.iter_mut() {
             if parent_request.failed_attempts >= PARENT_FAIL_TOLERANCE {
-                parent_request.state == BlockRequestsState::Failed
-                continue;
-            }
-            else if parent_request.state == BlockRequestsState::QueuedForward {
+                parent_request.state = BlockRequestsState::Failed;
+                continue;
+            } else if parent_request.state == BlockRequestsState::QueuedForward {
                 parent_request.state = BlockRequestsState::Pending(self.current_req_id);
-                self.current_req_id +=1;
-                let parent_hash =
+                self.current_req_id += 1;
+                let last_element_index = parent_request.downloaded_blocks.len() - 1;
+                let parent_hash = parent_request.downloaded_blocks[last_element_index].parent_root;
                 let req = RecentBeaconBlocksRequest {
                     block_roots: vec![parent_hash],
                 };
 
                 // select a random fully synced peer to attempt to download the parent block
                 let peer_id = self.full_peers.iter().next().expect("List is not empty");
 
-                return Some(ImportManagerOutcome::RecentRequest(peer_id, req);
+                return Some(ImportManagerOutcome::RecentRequest(peer_id.clone(), req));
             }
         }
 
         None
     }
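// Editor's sketch (illustrative only, not part of the patch): the ordering
// invariant that `process_complete_parent_requests` below relies on.
// `downloaded_blocks` grows child-first, so every appended block must be the
// parent of the block appended before it. On plain (root, parent_root) pairs:
fn parent_chain_is_consistent(blocks: &[(u64, u64)]) -> bool {
    // each block's parent_root must equal the root of the block after it
    blocks.windows(2).all(|w| w[0].1 == w[1].0)
}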
invalid parent. Ignoring"; - "peer_id" => peer_id, - "received_block" => block_hash, - "expected_parent" => expected_hash, - ); - return; - } + let mut re_run = false; // Find any parent_requests ready to be processed - for completed_request in self.parent_queue.iter_mut().filter(|req| req.state == BlockRequestsState::Complete) { + for completed_request in self + .parent_queue + .iter_mut() + .filter(|req| req.state == BlockRequestsState::Complete) + { + // verify the last added block is the parent of the last requested block + let last_index = completed_request.downloaded_blocks.len() - 1; + let expected_hash = completed_request.downloaded_blocks[last_index].parent_root; + // Note: the length must be greater than 1 so this cannot panic. + let block_hash = completed_request.downloaded_blocks[last_index - 1].canonical_root(); + if block_hash != expected_hash { + // remove the head block + let _ = completed_request.downloaded_blocks.pop(); + completed_request.state = BlockRequestsState::QueuedForward; + //TODO: Potentially downvote the peer + let peer = completed_request.last_submitted_peer.clone(); + debug!(self.log, "Peer sent invalid parent. Ignoring"; + "peer_id" => format!("{:?}",peer), + "received_block" => format!("{}", block_hash), + "expected_parent" => format!("{}", expected_hash), + ); + return (true, Some(ImportManagerOutcome::DownvotePeer(peer))); + } + // try and process the list of blocks up to the requested block while !completed_request.downloaded_blocks.is_empty() { - let block = completed_request.downloaded_blocks.pop(); - match self.chain_process_block(block.clone()) { - Ok(BlockProcessingOutcome::ParentUnknown { parent } => { + let block = completed_request + .downloaded_blocks + .pop() + .expect("Block must exist exist"); + match self.chain.process_block(block.clone()) { + Ok(BlockProcessingOutcome::ParentUnknown { parent: _ }) => { // need to keep looking for parents completed_request.downloaded_blocks.push(block); completed_request.state == BlockRequestsState::QueuedForward; re_run = true; break; } - Ok(BlockProcessingOutcome::Processed { _ } => { } - Ok(outcome) => { // it's a future slot or an invalid block, remove it and try again - completed_request.failed_attempts +=1; + Ok(BlockProcessingOutcome::Processed { block_root: _ }) => {} + Ok(outcome) => { + // it's a future slot or an invalid block, remove it and try again + completed_request.failed_attempts += 1; trace!( self.log, "Invalid parent block"; - "outcome" => format!("{:?}", outcome); + "outcome" => format!("{:?}", outcome), "peer" => format!("{:?}", completed_request.last_submitted_peer), ); completed_request.state == BlockRequestsState::QueuedForward; re_run = true; - return (re_run, Some(ImportManagerOutcome::DownvotePeer(completed_request.last_submitted_peer))); + return ( + re_run, + Some(ImportManagerOutcome::DownvotePeer( + completed_request.last_submitted_peer.clone(), + )), + ); } - Err(e) => { - completed_request.failed_attempts +=1; + Err(e) => { + completed_request.failed_attempts += 1; warn!( self.log, "Parent processing error"; - "error" => format!("{:?}", e); + "error" => format!("{:?}", e) ); completed_request.state == BlockRequestsState::QueuedForward; re_run = true; - return (re_run, Some(ImportManagerOutcome::DownvotePeer(completed_request.last_submitted_peer))); - } + return ( + re_run, + Some(ImportManagerOutcome::DownvotePeer( + completed_request.last_submitted_peer.clone(), + )), + ); } + } } } // remove any full completed and processed parent chains - self.parent_queue.retain(|req| 
 
-    fn process_blocks(
-        &mut self,
-        blocks: Vec<BeaconBlock>,
-    ) -> Result<(), String> {
-
+    fn process_blocks(&mut self, blocks: Vec<BeaconBlock<T::EthSpec>>) -> Result<(), String> {
         for block in blocks {
-            let processing_result = self.chain.process_block(block.clone());
+            let processing_result = self.chain.process_block(block.clone());
 
-            if let Ok(outcome) = processing_result {
-                match outcome {
-                    BlockProcessingOutcome::Processed { block_root } => {
-                        // The block was valid and we processed it successfully.
-                        trace!(
-                            self.log, "Imported block from network";
-                            "source" => source,
-                            "slot" => block.slot,
-                            "block_root" => format!("{}", block_root),
-                            "peer" => format!("{:?}", peer_id),
-                        );
-                    }
-                    BlockProcessingOutcome::ParentUnknown { parent } => {
-                        // blocks should be sequential and all parents should exist
-                        trace!(
-                            self.log, "ParentBlockUnknown";
-                            "source" => source,
-                            "parent_root" => format!("{}", parent),
-                            "baby_block_slot" => block.slot,
-                        );
-                        return Err(format!("Block at slot {} has an unknown parent.", block.slot));
-                    }
-                    BlockProcessingOutcome::FutureSlot {
-                        present_slot,
-                        block_slot,
-                    } => {
-                        if present_slot + FUTURE_SLOT_TOLERANCE >= block_slot {
-                            // The block is too far in the future, drop it.
-                            trace!(
-                                self.log, "FutureBlock";
-                                "source" => source,
-                                "msg" => "block for future slot rejected, check your time",
-                                "present_slot" => present_slot,
-                                "block_slot" => block_slot,
-                                "FUTURE_SLOT_TOLERANCE" => FUTURE_SLOT_TOLERANCE,
-                                "peer" => format!("{:?}", peer_id),
-                            );
-                            return Err(format!("Block at slot {} is too far in the future", block.slot));
-                        } else {
-                            // The block is in the future, but not too far.
-                            trace!(
-                                self.log, "QueuedFutureBlock";
-                                "source" => source,
-                                "msg" => "queuing future block, check your time",
-                                "present_slot" => present_slot,
-                                "block_slot" => block_slot,
-                                "FUTURE_SLOT_TOLERANCE" => FUTURE_SLOT_TOLERANCE,
-                                "peer" => format!("{:?}", peer_id),
-                            );
-                        }
-                    }
+            if let Ok(outcome) = processing_result {
+                match outcome {
+                    BlockProcessingOutcome::Processed { block_root } => {
+                        // The block was valid and we processed it successfully.
+                        trace!(
+                            self.log, "Imported block from network";
+                            "slot" => block.slot,
+                            "block_root" => format!("{}", block_root),
+                        );
+                    }
+                    BlockProcessingOutcome::ParentUnknown { parent } => {
+                        // blocks should be sequential and all parents should exist
+                        trace!(
+                            self.log, "ParentBlockUnknown";
+                            "parent_root" => format!("{}", parent),
+                            "baby_block_slot" => block.slot,
+                        );
+                        return Err(format!(
+                            "Block at slot {} has an unknown parent.",
+                            block.slot
+                        ));
+                    }
+                    BlockProcessingOutcome::FutureSlot {
+                        present_slot,
+                        block_slot,
+                    } => {
+                        if present_slot + FUTURE_SLOT_TOLERANCE < block_slot {
+                            // The block is too far in the future, drop it.
+                            trace!(
+                                self.log, "FutureBlock";
+                                "msg" => "block for future slot rejected, check your time",
+                                "present_slot" => present_slot,
+                                "block_slot" => block_slot,
+                                "FUTURE_SLOT_TOLERANCE" => FUTURE_SLOT_TOLERANCE,
+                            );
+                            return Err(format!(
+                                "Block at slot {} is too far in the future",
+                                block.slot
+                            ));
+                        } else {
+                            // The block is in the future, but not too far.
+ trace!( + self.log, "QueuedFutureBlock"; + "msg" => "queuing future block, check your time", + "present_slot" => present_slot, + "block_slot" => block_slot, + "FUTURE_SLOT_TOLERANCE" => FUTURE_SLOT_TOLERANCE, + ); + } + } + _ => { + trace!( + self.log, "InvalidBlock"; + "msg" => "peer sent invalid block", + "outcome" => format!("{:?}", outcome), + ); + return Err(format!("Invalid block at slot {}", block.slot)); + } } - _ => { - trace!( - self.log, "InvalidBlock"; - "source" => source, - "msg" => "peer sent invalid block", - "outcome" => format!("{:?}", outcome), - "peer" => format!("{:?}", peer_id), - ); - return Err(format!("Invalid block at slot {}", block.slot)); - } + } else { + trace!( + self.log, "BlockProcessingFailure"; + "msg" => "unexpected condition in processing block.", + "outcome" => format!("{:?}", processing_result) + ); + return Err(format!( + "Unexpected block processing error: {:?}", + processing_result + )); } - Ok(()) - } else { - trace!( - self.log, "BlockProcessingFailure"; - "source" => source, - "msg" => "unexpected condition in processing block.", - "outcome" => format!("{:?}", processing_result) - ); - return Err(format!("Unexpected block processing error: {:?}", processing_result)); } - } + Ok(()) } } + +fn root_at_slot( + chain: Arc>, + target_slot: Slot, +) -> Option { + chain + .rev_iter_block_roots() + .find(|(_root, slot)| *slot == target_slot) + .map(|(root, _slot)| root) +} diff --git a/beacon_node/network/src/sync/mod.rs b/beacon_node/network/src/sync/mod.rs index fac1b46eb..b26d78c14 100644 --- a/beacon_node/network/src/sync/mod.rs +++ b/beacon_node/network/src/sync/mod.rs @@ -1,4 +1,4 @@ -mod import_queue; +mod manager; /// Syncing for lighthouse. /// /// Stores the various syncing methods for the beacon chain. diff --git a/beacon_node/network/src/sync/simple_sync.rs b/beacon_node/network/src/sync/simple_sync.rs index a7f5ced40..deadf214d 100644 --- a/beacon_node/network/src/sync/simple_sync.rs +++ b/beacon_node/network/src/sync/simple_sync.rs @@ -1,8 +1,9 @@ -use super::import_queue::{ImportQueue, PartialBeaconBlockCompletion}; -use crate::message_handler::NetworkContext; +use super::manager::{ImportManager, ImportManagerOutcome}; +use crate::service::{NetworkMessage, OutgoingMessage}; use beacon_chain::{BeaconChain, BeaconChainTypes, BlockProcessingOutcome}; use eth2_libp2p::rpc::methods::*; -use eth2_libp2p::rpc::{RPCRequest, RPCResponse, RequestId}; +use eth2_libp2p::rpc::methods::*; +use eth2_libp2p::rpc::{RPCEvent, RPCRequest, RPCResponse, RequestId}; use eth2_libp2p::PeerId; use slog::{debug, error, info, o, trace, warn}; use ssz::Encode; @@ -10,14 +11,14 @@ use std::collections::HashMap; use std::sync::Arc; use std::time::Duration; use store::Store; +use tokio::sync::mpsc; use types::{ Attestation, BeaconBlock, BeaconBlockBody, BeaconBlockHeader, Epoch, EthSpec, Hash256, Slot, }; - /// If a block is more than `FUTURE_SLOT_TOLERANCE` slots ahead of our slot clock, we drop it. /// Otherwise we queue it. -const FUTURE_SLOT_TOLERANCE: u64 = 1; +pub(crate) const FUTURE_SLOT_TOLERANCE: u64 = 1; const SHOULD_FORWARD_GOSSIP_BLOCK: bool = true; const SHOULD_NOT_FORWARD_GOSSIP_BLOCK: bool = false; @@ -25,16 +26,13 @@ const SHOULD_NOT_FORWARD_GOSSIP_BLOCK: bool = false; /// Keeps track of syncing information for known connected peers. 
#[derive(Clone, Copy, Debug)] pub struct PeerSyncInfo { - fork_version: [u8,4], - finalized_root: Hash256, - finalized_epoch: Epoch, - head_root: Hash256, - head_slot: Slot, + fork_version: [u8; 4], + pub finalized_root: Hash256, + pub finalized_epoch: Epoch, + pub head_root: Hash256, + pub head_slot: Slot, } - - - impl From for PeerSyncInfo { fn from(hello: HelloMessage) -> PeerSyncInfo { PeerSyncInfo { @@ -43,7 +41,6 @@ impl From for PeerSyncInfo { finalized_epoch: hello.finalized_epoch, head_root: hello.head_root, head_slot: hello.head_slot, - requested_slot_skip: None, } } } @@ -66,18 +63,24 @@ pub enum SyncState { pub struct SimpleSync { /// A reference to the underlying beacon chain. chain: Arc>, - manager: ImportManager, + manager: ImportManager, + network: NetworkContext, log: slog::Logger, } impl SimpleSync { /// Instantiate a `SimpleSync` instance, with no peers and an empty queue. - pub fn new(beacon_chain: Arc>, log: &slog::Logger) -> Self { + pub fn new( + beacon_chain: Arc>, + network_send: mpsc::UnboundedSender, + log: &slog::Logger, + ) -> Self { let sync_logger = log.new(o!("Service"=> "Sync")); SimpleSync { chain: beacon_chain.clone(), - manager: ImportManager::new(), + manager: ImportManager::new(beacon_chain, log), + network: NetworkContext::new(network_send, log.clone()), log: sync_logger, } } @@ -92,8 +95,9 @@ impl SimpleSync { /// Handle the connection of a new peer. /// /// Sends a `Hello` message to the peer. - pub fn on_connect(&self, peer_id: PeerId, network: &mut NetworkContext) { - network.send_rpc_request(peer_id, RPCRequest::Hello(hello_message(&self.chain))); + pub fn on_connect(&mut self, peer_id: PeerId) { + self.network + .send_rpc_request(peer_id, RPCRequest::Hello(hello_message(&self.chain))); } /// Handle a `Hello` request. @@ -104,42 +108,31 @@ impl SimpleSync { peer_id: PeerId, request_id: RequestId, hello: HelloMessage, - network: &mut NetworkContext, ) { trace!(self.log, "HelloRequest"; "peer" => format!("{:?}", peer_id)); // Say hello back. - network.send_rpc_response( + self.network.send_rpc_response( peer_id.clone(), request_id, RPCResponse::Hello(hello_message(&self.chain)), ); - self.process_hello(peer_id, hello, network); + self.process_hello(peer_id, hello); } /// Process a `Hello` response from a peer. - pub fn on_hello_response( - &mut self, - peer_id: PeerId, - hello: HelloMessage, - network: &mut NetworkContext, - ) { + pub fn on_hello_response(&mut self, peer_id: PeerId, hello: HelloMessage) { trace!(self.log, "HelloResponse"; "peer" => format!("{:?}", peer_id)); // Process the hello message, without sending back another hello. - self.process_hello(peer_id, hello, network); + self.process_hello(peer_id, hello); } /// Process a `Hello` message, requesting new blocks if appropriate. /// /// Disconnects the peer if required. 
- fn process_hello( - &mut self, - peer_id: PeerId, - hello: HelloMessage, - network: &mut NetworkContext, - ) { + fn process_hello(&mut self, peer_id: PeerId, hello: HelloMessage) { let remote = PeerSyncInfo::from(hello); let local = PeerSyncInfo::from(&self.chain); @@ -153,12 +146,13 @@ impl SimpleSync { "reason" => "network_id" ); - network.disconnect(peer_id.clone(), GoodbyeReason::IrrelevantNetwork); + self.network + .disconnect(peer_id.clone(), GoodbyeReason::IrrelevantNetwork); } else if remote.finalized_epoch <= local.finalized_epoch && remote.finalized_root != Hash256::zero() && local.finalized_root != Hash256::zero() - && (self.root_at_slot(start_slot(remote.latest_finalized_epoch)) - != Some(remote.latest_finalized_root)) + && (self.root_at_slot(start_slot(remote.finalized_epoch)) + != Some(remote.finalized_root)) { // The remotes finalized epoch is less than or greater than ours, but the block root is // different to the one in our chain. @@ -169,8 +163,9 @@ impl SimpleSync { "peer" => format!("{:?}", peer_id), "reason" => "different finalized chain" ); - network.disconnect(peer_id.clone(), GoodbyeReason::IrrelevantNetwork); - } else if remote.latest_finalized_epoch < local.latest_finalized_epoch { + self.network + .disconnect(peer_id.clone(), GoodbyeReason::IrrelevantNetwork); + } else if remote.finalized_epoch < local.finalized_epoch { // The node has a lower finalized epoch, their chain is not useful to us. There are two // cases where a node can have a lower finalized epoch: // @@ -193,12 +188,12 @@ impl SimpleSync { } else if self .chain .store - .exists::>(&remote.best_root) + .exists::>(&remote.head_root) .unwrap_or_else(|_| false) { // If the node's best-block is already known to us and they are close to our current // head, treat them as a fully sync'd peer. - self.import_manager.add_full_peer(peer_id); + self.manager.add_full_peer(peer_id); self.process_sync(); } else { // The remote node has an equal or great finalized epoch and we don't know it's head. 
@@ -208,29 +203,45 @@ impl SimpleSync { debug!( self.log, "UsefulPeer"; "peer" => format!("{:?}", peer_id), - "local_finalized_epoch" => local.latest_finalized_epoch, - "remote_latest_finalized_epoch" => remote.latest_finalized_epoch, + "local_finalized_epoch" => local.finalized_epoch, + "remote_latest_finalized_epoch" => remote.finalized_epoch, ); - self.import_manager.add_peer(peer_id, remote); + self.manager.add_peer(peer_id, remote); self.process_sync(); } } - self.proess_sync(&mut self) { + fn process_sync(&mut self) { loop { - match self.import_manager.poll() { - ImportManagerOutcome::RequestBlocks(peer_id, req) { + match self.manager.poll() { + ImportManagerOutcome::Hello(peer_id) => { + trace!( + self.log, + "RPC Request"; + "method" => "HELLO", + "peer" => format!("{:?}", peer_id) + ); + self.network + .send_rpc_request(peer_id, RPCRequest::Hello(hello_message(&self.chain))); + } + ImportManagerOutcome::RequestBlocks { + peer_id, + request_id, + request, + } => { trace!( self.log, "RPC Request"; "method" => "BeaconBlocks", - "count" => req.count, + "id" => request_id, + "count" => request.count, "peer" => format!("{:?}", peer_id) ); - network.send_rpc_request(peer_id.clone(), RPCRequest::BeaconBlocks(req)); - }, - ImportManagerOutcome::RecentRequest(peer_id, req) { + self.network + .send_rpc_request(peer_id.clone(), RPCRequest::BeaconBlocks(request)); + } + ImportManagerOutcome::RecentRequest(peer_id, req) => { trace!( self.log, "RPC Request"; @@ -238,18 +249,20 @@ impl SimpleSync { "count" => req.block_roots.len(), "peer" => format!("{:?}", peer_id) ); - network.send_rpc_request(peer_id.clone(), RPCRequest::RecentBeaconBlocks(req)); - }, - ImportManagerOutcome::DownvotePeer(peer_id) { + self.network + .send_rpc_request(peer_id.clone(), RPCRequest::RecentBeaconBlocks(req)); + } + ImportManagerOutcome::DownvotePeer(peer_id) => { trace!( self.log, "Peer downvoted"; "peer" => format!("{:?}", peer_id) ); // TODO: Implement reputation - network.disconnect(peer_id.clone(), GoodbyeReason::Fault); - }, - SyncManagerState::Idle { + self.network + .disconnect(peer_id.clone(), GoodbyeReason::Fault); + } + ImportManagerOutcome::Idle => { // nothing to do return; } @@ -257,37 +270,26 @@ impl SimpleSync { } } - - /* fn root_at_slot(&self, target_slot: Slot) -> Option { self.chain .rev_iter_block_roots() .find(|(_root, slot)| *slot == target_slot) .map(|(root, _slot)| root) } - */ - /// Handle a `BeaconBlocks` request from the peer. - pub fn on_beacon_blocks_request( + /// Handle a `RecentBeaconBlocks` request from the peer. 
+ pub fn on_recent_beacon_blocks_request( &mut self, peer_id: PeerId, request_id: RequestId, - req: BeaconBlocksRequest, - network: &mut NetworkContext, + request: RecentBeaconBlocksRequest, ) { - debug!( - self.log, - "BeaconBlocksRequest"; - "peer" => format!("{:?}", peer_id), - "count" => req.count, - "start_slot" => req.start_slot, - ); - - let blocks = Vec> = self - .chain.rev_iter_block_roots().filter(|(_root, slot) req.start_slot <= slot && req.start_slot + req.count >= slot).take_while(|(_root, slot) req.start_slot <= *slot) - .filter_map(|root, slot| { + let blocks: Vec> = request + .block_roots + .iter() + .filter_map(|root| { if let Ok(Some(block)) = self.chain.store.get::>(root) { - Some(block.body) + Some(block) } else { debug!( self.log, @@ -301,10 +303,63 @@ impl SimpleSync { }) .collect(); - roots.reverse(); - roots.dedup_by_key(|brs| brs.block_root); + debug!( + self.log, + "BlockBodiesRequest"; + "peer" => format!("{:?}", peer_id), + "requested" => request.block_roots.len(), + "returned" => blocks.len(), + ); - if roots.len() as u64 != req.count { + self.network.send_rpc_response( + peer_id, + request_id, + RPCResponse::BeaconBlocks(blocks.as_ssz_bytes()), + ) + } + + /// Handle a `BeaconBlocks` request from the peer. + pub fn on_beacon_blocks_request( + &mut self, + peer_id: PeerId, + request_id: RequestId, + req: BeaconBlocksRequest, + ) { + debug!( + self.log, + "BeaconBlocksRequest"; + "peer" => format!("{:?}", peer_id), + "count" => req.count, + "start_slot" => req.start_slot, + ); + + let mut blocks: Vec> = self + .chain + .rev_iter_block_roots() + .filter(|(_root, slot)| { + req.start_slot <= slot.as_u64() && req.start_slot + req.count >= slot.as_u64() + }) + .take_while(|(_root, slot)| req.start_slot <= slot.as_u64()) + .filter_map(|(root, _slot)| { + if let Ok(Some(block)) = self.chain.store.get::>(&root) { + Some(block) + } else { + debug!( + self.log, + "Peer requested unknown block"; + "peer" => format!("{:?}", peer_id), + "request_root" => format!("{:}", root), + ); + + None + } + }) + .collect(); + + blocks.reverse(); + blocks.dedup_by_key(|brs| brs.slot); + + if blocks.len() as u64 != req.count { debug!( self.log, "BeaconBlocksRequest"; @@ -313,33 +368,33 @@ impl SimpleSync { "start_slot" => req.start_slot, "current_slot" => self.chain.present_slot(), "requested" => req.count, - "returned" => roots.len(), + "returned" => blocks.len(), ); } - network.send_rpc_response( + self.network.send_rpc_response( peer_id, request_id, RPCResponse::BeaconBlocks(blocks.as_ssz_bytes()), ) } - /// Handle a `BeaconBlocks` response from the peer. 
pub fn on_beacon_blocks_response( &mut self, peer_id: PeerId, request_id: RequestId, - res: Vec>, + beacon_blocks: Vec>, ) { debug!( self.log, "BeaconBlocksResponse"; "peer" => format!("{:?}", peer_id), - "count" => res.block_bodies.len(), + "count" => beacon_blocks.len(), ); - self.import_manager.beacon_blocks_response(peer_id, request_id, blocks); + self.manager + .beacon_blocks_response(peer_id, request_id, beacon_blocks); self.process_sync(); } @@ -349,16 +404,17 @@ impl SimpleSync { &mut self, peer_id: PeerId, request_id: RequestId, - res: Vec>, + beacon_blocks: Vec>, ) { debug!( self.log, "BeaconBlocksResponse"; "peer" => format!("{:?}", peer_id), - "count" => res.block_bodies.len(), + "count" => beacon_blocks.len(), ); - self.import_manager.recent_blocks_response(peer_id, request_id, blocks); + self.manager + .recent_blocks_response(peer_id, request_id, beacon_blocks); self.process_sync(); } @@ -368,19 +424,13 @@ impl SimpleSync { /// Attempts to apply to block to the beacon chain. May queue the block for later processing. /// /// Returns a `bool` which, if `true`, indicates we should forward the block to our peers. - pub fn on_block_gossip( - &mut self, - peer_id: PeerId, - block: BeaconBlock, - ) -> bool { - if let Some(outcome) = - self.process_block(peer_id.clone(), block.clone(), network, &"gossip") - { + pub fn on_block_gossip(&mut self, peer_id: PeerId, block: BeaconBlock) -> bool { + if let Ok(outcome) = self.chain.process_block(block.clone()) { match outcome { BlockProcessingOutcome::Processed { .. } => SHOULD_FORWARD_GOSSIP_BLOCK, - BlockProcessingOutcome::ParentUnknown { parent } => { + BlockProcessingOutcome::ParentUnknown { parent: _ } => { // Inform the sync manager to find parents for this block - self.import_manager.add_unknown_block(block.clone()); + self.manager.add_unknown_block(block.clone(), peer_id); SHOULD_FORWARD_GOSSIP_BLOCK } BlockProcessingOutcome::FutureSlot { @@ -401,12 +451,7 @@ impl SimpleSync { /// Process a gossip message declaring a new attestation. /// /// Not currently implemented. - pub fn on_attestation_gossip( - &mut self, - _peer_id: PeerId, - msg: Attestation, - _network: &mut NetworkContext, - ) { + pub fn on_attestation_gossip(&mut self, _peer_id: PeerId, msg: Attestation) { match self.chain.process_attestation(msg) { Ok(outcome) => info!( self.log, @@ -420,39 +465,74 @@ impl SimpleSync { } } - -/* - /// Returns `true` if `self.chain` has not yet processed this block. - pub fn chain_has_seen_block(&self, block_root: &Hash256) -> bool { - !self - .chain - .is_new_block_root(&block_root) - .unwrap_or_else(|_| { - error!(self.log, "Unable to determine if block is new."); - false - }) - } - */ - /// Generates our current state in the form of a HELLO RPC message. pub fn generate_hello(&self) -> HelloMessage { hello_message(&self.chain) } - } /// Build a `HelloMessage` representing the state of the given `beacon_chain`. 
fn hello_message(beacon_chain: &BeaconChain) -> HelloMessage { - let spec = &beacon_chain.spec; let state = &beacon_chain.head().beacon_state; HelloMessage { - network_id: spec.network_id, - //TODO: Correctly define the chain id - chain_id: spec.network_id as u64, - latest_finalized_root: state.finalized_checkpoint.root, - latest_finalized_epoch: state.finalized_checkpoint.epoch, - best_root: beacon_chain.head().beacon_block_root, - best_slot: state.slot, + fork_version: state.fork.current_version, + finalized_root: state.finalized_checkpoint.root, + finalized_epoch: state.finalized_checkpoint.epoch, + head_root: beacon_chain.head().beacon_block_root, + head_slot: state.slot, + } +} + +/// Wraps a Network Channel to employ various RPC/Sync related network functionality. +pub struct NetworkContext { + /// The network channel to relay messages to the Network service. + network_send: mpsc::UnboundedSender, + /// Logger for the `NetworkContext`. + log: slog::Logger, +} + +impl NetworkContext { + pub fn new(network_send: mpsc::UnboundedSender, log: slog::Logger) -> Self { + Self { network_send, log } + } + + pub fn disconnect(&mut self, peer_id: PeerId, reason: GoodbyeReason) { + self.send_rpc_request(peer_id, RPCRequest::Goodbye(reason)) + // TODO: disconnect peers. + } + + pub fn send_rpc_request(&mut self, peer_id: PeerId, rpc_request: RPCRequest) { + // Note: There is currently no use of keeping track of requests. However the functionality + // is left here for future revisions. + self.send_rpc_event(peer_id, RPCEvent::Request(0, rpc_request)); + } + + //TODO: Handle Error responses + pub fn send_rpc_response( + &mut self, + peer_id: PeerId, + request_id: RequestId, + rpc_response: RPCResponse, + ) { + self.send_rpc_event( + peer_id, + RPCEvent::Response(request_id, RPCErrorResponse::Success(rpc_response)), + ); + } + + fn send_rpc_event(&mut self, peer_id: PeerId, rpc_event: RPCEvent) { + self.send(peer_id, OutgoingMessage::RPC(rpc_event)) + } + + fn send(&mut self, peer_id: PeerId, outgoing_message: OutgoingMessage) { + self.network_send + .try_send(NetworkMessage::Send(peer_id, outgoing_message)) + .unwrap_or_else(|_| { + warn!( + self.log, + "Could not send RPC message to the network service" + ) + }); } }
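// Editor's sketch (illustrative only, not part of the patch): the drive loop
// that `SimpleSync::process_sync` implements above — poll the `ImportManager`
// until it reports `Idle`, translating each outcome into a network action.
// Reduced to a minimal outcome type:
enum Outcome {
    Hello(String),
    DownvotePeer(String),
    Idle,
}

fn drive_sync(mut poll: impl FnMut() -> Outcome) {
    loop {
        match poll() {
            Outcome::Hello(peer) => println!("re-HELLO {}", peer),
            Outcome::DownvotePeer(peer) => println!("downvote/disconnect {}", peer),
            Outcome::Idle => return, // nothing left to drive
        }
    }
}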