diff --git a/beacon_node/libp2p/src/rpc/methods.rs b/beacon_node/libp2p/src/rpc/methods.rs index f05ade7ff..381fc8b01 100644 --- a/beacon_node/libp2p/src/rpc/methods.rs +++ b/beacon_node/libp2p/src/rpc/methods.rs @@ -1,4 +1,3 @@ -use beacon_chain::parking_lot::RwLockReadGuard; /// Available RPC methods types and ids. use ssz_derive::{Decode, Encode}; use types::{BeaconBlockBody, BeaconBlockHeader, Epoch, Hash256, Slot}; @@ -115,75 +114,75 @@ pub struct HelloMessage { } /// Request a number of beacon block roots from a peer. -#[derive(Encode, Decode, Clone, Debug)] +#[derive(Encode, Decode, Clone, Debug, PartialEq)] pub struct BeaconBlockRootsRequest { /// The starting slot of the requested blocks. - start_slot: Slot, + pub start_slot: Slot, /// The number of blocks from the start slot. - count: u64, // this must be less than 32768. //TODO: Enforce this in the lower layers + pub count: u64, // this must be less than 32768. //TODO: Enforce this in the lower layers } /// Response containing a number of beacon block roots from a peer. -#[derive(Encode, Decode, Clone, Debug)] +#[derive(Encode, Decode, Clone, Debug, PartialEq)] pub struct BeaconBlockRootsResponse { /// List of requested blocks and associated slots. - roots: Vec, + pub roots: Vec, } /// Contains a block root and associated slot. -#[derive(Encode, Decode, Clone, Debug)] +#[derive(Encode, Decode, Clone, Debug, PartialEq)] pub struct BlockRootSlot { /// The block root. - block_root: Hash256, + pub block_root: Hash256, /// The block slot. - slot: Slot, + pub slot: Slot, } /// Request a number of beacon block headers from a peer. -#[derive(Encode, Decode, Clone, Debug)] +#[derive(Encode, Decode, Clone, Debug, PartialEq)] pub struct BeaconBlockHeadersRequest { /// The starting header hash of the requested headers. - start_root: Hash256, + pub start_root: Hash256, /// The starting slot of the requested headers. - start_slot: Slot, + pub start_slot: Slot, /// The maximum number of headers than can be returned. - max_headers: u64, + pub max_headers: u64, /// The maximum number of slots to skip between blocks. - skip_slots: u64, + pub skip_slots: u64, } /// Response containing requested block headers. -#[derive(Encode, Decode, Clone, Debug)] +#[derive(Encode, Decode, Clone, Debug, PartialEq)] pub struct BeaconBlockHeadersResponse { /// The list of requested beacon block headers. - headers: Vec, + pub headers: Vec, } /// Request a number of beacon block bodies from a peer. -#[derive(Encode, Decode, Clone, Debug)] +#[derive(Encode, Decode, Clone, Debug, PartialEq)] pub struct BeaconBlockBodiesRequest { /// The list of beacon block bodies being requested. - block_roots: Hash256, + pub block_roots: Hash256, } /// Response containing the list of requested beacon block bodies. -#[derive(Encode, Decode, Clone, Debug)] +#[derive(Encode, Decode, Clone, Debug, PartialEq)] pub struct BeaconBlockBodiesResponse { /// The list of beacon block bodies being requested. - block_bodies: Vec, + pub block_bodies: Vec, } /// Request values for tree hashes which yield a blocks `state_root`. -#[derive(Encode, Decode, Clone, Debug)] +#[derive(Encode, Decode, Clone, Debug, PartialEq)] pub struct BeaconChainStateRequest { /// The tree hashes that a value is requested for. - hashes: Vec, + pub hashes: Vec, } /// Request values for tree hashes which yield a blocks `state_root`. // Note: TBD -#[derive(Encode, Decode, Clone, Debug)] +#[derive(Encode, Decode, Clone, Debug, PartialEq)] pub struct BeaconChainStateResponse { /// The values corresponding the to the requested tree hashes. - values: bool, //TBD - stubbed with encodeable bool + pub values: bool, //TBD - stubbed with encodeable bool } diff --git a/beacon_node/libp2p/src/rpc/mod.rs b/beacon_node/libp2p/src/rpc/mod.rs index a1cfadafe..a1573ec93 100644 --- a/beacon_node/libp2p/src/rpc/mod.rs +++ b/beacon_node/libp2p/src/rpc/mod.rs @@ -2,7 +2,7 @@ /// /// This is purpose built for Ethereum 2.0 serenity and the protocol listens on /// `/eth/serenity/rpc/1.0.0` -mod methods; +pub mod methods; mod protocol; use futures::prelude::*; diff --git a/beacon_node/network/src/message_handler.rs b/beacon_node/network/src/message_handler.rs index 8e390b4af..afd407abe 100644 --- a/beacon_node/network/src/message_handler.rs +++ b/beacon_node/network/src/message_handler.rs @@ -6,7 +6,7 @@ use crossbeam_channel::{unbounded as channel, Sender}; use futures::future; use libp2p::{ rpc::{RPCRequest, RPCResponse}, - HelloMessage, PeerId, RPCEvent, + PeerId, RPCEvent, }; use slog::debug; use slog::warn; @@ -85,7 +85,7 @@ impl MessageHandler { match message { // we have initiated a connection to a peer HandlerMessage::PeerDialed(peer_id) => { - self.sync.on_connect(&peer_id, &mut self.network_context); + self.sync.on_connect(peer_id, &mut self.network_context); } // we have received an RPC message request/response HandlerMessage::RPC(peer_id, rpc_event) => { @@ -113,7 +113,7 @@ impl MessageHandler { match request { RPCRequest::Hello(hello_message) => { self.sync - .on_hello(&peer_id, hello_message, &mut self.network_context) + .on_hello(peer_id, hello_message, &mut self.network_context) } // TODO: Handle all requests _ => {} @@ -136,26 +136,13 @@ impl MessageHandler { match response { RPCResponse::Hello(hello_message) => { debug!(self.log, "Hello response received from peer: {:?}", peer_id); - self.validate_hello(peer_id, hello_message); + self.sync + .on_hello(peer_id, hello_message, &mut self.network_context); } // TODO: Handle all responses _ => {} } } - - /// Validate a HELLO RPC message. - fn validate_hello(&mut self, peer_id: PeerId, message: HelloMessage) { - self.sync - .on_hello(&peer_id, message.clone(), &mut self.network_context); - // validate the peer - if !self.sync.validate_peer(peer_id.clone(), message) { - debug!( - self.log, - "Peer dropped due to mismatching HELLO messages: {:?}", peer_id - ); - //TODO: block/ban the peer - } - } } pub struct NetworkContext { @@ -179,6 +166,10 @@ impl NetworkContext { } } + pub fn disconnect(&self, _peer_id: PeerId) { + // TODO: disconnect peers. + } + pub fn send_rpc_request(&mut self, peer_id: PeerId, rpc_request: RPCRequest) { let id = self.generate_request_id(&peer_id); self.send_rpc_event( diff --git a/beacon_node/network/src/service.rs b/beacon_node/network/src/service.rs index d01188f5d..c3045d280 100644 --- a/beacon_node/network/src/service.rs +++ b/beacon_node/network/src/service.rs @@ -6,7 +6,6 @@ use crossbeam_channel::{unbounded as channel, Sender, TryRecvError}; use futures::prelude::*; use futures::sync::oneshot; use futures::Stream; -use libp2p::rpc::RPCResponse; use libp2p::RPCEvent; use libp2p::Service as LibP2PService; use libp2p::{Libp2pEvent, PeerId}; diff --git a/beacon_node/network/src/sync/simple_sync.rs b/beacon_node/network/src/sync/simple_sync.rs index ae6a9e7a1..0d75f3739 100644 --- a/beacon_node/network/src/sync/simple_sync.rs +++ b/beacon_node/network/src/sync/simple_sync.rs @@ -1,8 +1,9 @@ use crate::beacon_chain::BeaconChain; -use crate::message_handler::{MessageHandler, NetworkContext}; +use crate::message_handler::NetworkContext; use crate::service::NetworkMessage; use crossbeam_channel::Sender; -use libp2p::rpc::{HelloMessage, RPCMethod, RPCRequest, RPCResponse}; +use libp2p::rpc::methods::*; +use libp2p::rpc::{RPCRequest, RPCResponse}; use libp2p::PeerId; use slog::{debug, o}; use std::collections::HashMap; @@ -15,6 +16,7 @@ type NetworkSender = Sender; const SLOT_IMPORT_TOLERANCE: u64 = 100; /// Keeps track of syncing information for known connected peers. +#[derive(Clone, Copy)] pub struct PeerSyncInfo { latest_finalized_root: Hash256, latest_finalized_epoch: Epoch, @@ -23,18 +25,37 @@ pub struct PeerSyncInfo { } impl PeerSyncInfo { - pub fn is_on_chain(&self, chain: &Arc) -> bool { + fn is_on_chain(&self, chain: &Arc) -> bool { // TODO: make useful. true } - pub fn has_higher_finalized_epoch(&self, chain: &Arc) -> bool { + fn has_higher_finalized_epoch(&self, chain: &Arc) -> bool { self.latest_finalized_epoch > chain.get_state().finalized_epoch } - pub fn has_higher_best_slot(&self, chain: &Arc) -> bool { + fn has_higher_best_slot(&self, chain: &Arc) -> bool { self.latest_finalized_epoch > chain.get_state().finalized_epoch } + + pub fn status(&self, chain: &Arc) -> PeerStatus { + if self.has_higher_finalized_epoch(chain) { + PeerStatus::HigherFinalizedEpoch + } else if !self.is_on_chain(chain) { + PeerStatus::HigherFinalizedEpoch + } else if self.has_higher_best_slot(chain) { + PeerStatus::HigherBestSlot + } else { + PeerStatus::NotInteresting + } + } +} + +pub enum PeerStatus { + OnDifferentChain, + HigherFinalizedEpoch, + HigherBestSlot, + NotInteresting, } impl From for PeerSyncInfo { @@ -91,16 +112,13 @@ impl SimpleSync { } } - pub fn on_connect(&self, peer_id: &PeerId, network: &mut NetworkContext) { - network.send_rpc_request( - peer_id.clone(), - RPCRequest::Hello(self.chain.hello_message()), - ); + pub fn on_connect(&self, peer_id: PeerId, network: &mut NetworkContext) { + network.send_rpc_request(peer_id, RPCRequest::Hello(self.chain.hello_message())); } pub fn on_hello_request( - &self, - peer_id: &PeerId, + &mut self, + peer_id: PeerId, hello: HelloMessage, network: &mut NetworkContext, ) { @@ -111,97 +129,63 @@ impl SimpleSync { self.on_hello(peer_id, hello, network); } - pub fn on_hello(&self, peer_id: &PeerId, hello: HelloMessage, network: &mut NetworkContext) { + pub fn on_hello(&mut self, peer_id: PeerId, hello: HelloMessage, network: &mut NetworkContext) { + let spec = self.chain.get_spec(); + // network id must match if hello.network_id != self.network_id { debug!(self.log, "Bad network id. Peer: {:?}", peer_id); + network.disconnect(peer_id); return; } let peer = PeerSyncInfo::from(hello); - - /* - if peer.has_higher_finalized_epoch(&self.chain) { - // we need blocks - let peer_slot = peer.latest_finalized_epoch.start_slot(spec.slots_per_epoch); - let our_slot = self.chain.finalized_epoch(); - let required_slots = peer_slot - our_slot; - } else { - if !peer.is_on_chain(&self.chain) { - return (true, responses); - } - // - } - */ - - /* - // compare latest epoch and finalized root to see if they exist in our chain - if peer_info.latest_finalized_epoch <= self.latest_finalized_epoch { - // ensure their finalized root is in our chain - // TODO: Get the finalized root at hello_message.latest_epoch and ensure they match - //if (hello_message.latest_finalized_root == self.chain.get_state() { - // return false; - // } - } - - // the client is valid, add it to our list of known_peers and request sync if required - // update peer list if peer already exists - let peer_info = PeerSyncInfo::from(hello); - debug!(self.log, "Handshake successful. Peer: {:?}", peer_id); - self.known_peers.insert(peer_id, peer_info); + self.known_peers.insert(peer_id.clone(), peer); - // set state to sync - if self.state == SyncState::Idle - && hello_message.best_slot > self.latest_slot + SLOT_IMPORT_TOLERANCE - { + match peer.status(&self.chain) { + PeerStatus::OnDifferentChain => { + debug!(self.log, "Peer is on different chain. Peer: {:?}", peer_id); + + network.disconnect(peer_id); + } + PeerStatus::HigherFinalizedEpoch => { + let start_slot = peer.latest_finalized_epoch.start_slot(spec.slots_per_epoch); + let required_slots = start_slot - self.chain.slot(); + + self.request_block_roots(peer_id, start_slot, required_slots.as_u64(), network); + } + PeerStatus::HigherBestSlot => { + let start_slot = peer.best_slot; + let required_slots = start_slot - self.chain.slot(); + + self.request_block_roots(peer_id, start_slot, required_slots.as_u64(), network); + } + PeerStatus::NotInteresting => {} + } + } + + fn request_block_roots( + &mut self, + peer_id: PeerId, + start_slot: Slot, + count: u64, + network: &mut NetworkContext, + ) { + // Potentially set state to sync. + if self.state == SyncState::Idle && count > SLOT_IMPORT_TOLERANCE { self.state = SyncState::Downloading; - //TODO: Start requesting blocks from known peers. Ideally in batches } - true - */ + // TODO: handle count > max count. + network.send_rpc_request( + peer_id.clone(), + RPCRequest::BeaconBlockRoots(BeaconBlockRootsRequest { start_slot, count }), + ); } /// Generates our current state in the form of a HELLO RPC message. pub fn generate_hello(&self) -> HelloMessage { self.chain.hello_message() } - - pub fn validate_peer(&mut self, peer_id: PeerId, hello_message: HelloMessage) -> bool { - // network id must match - if hello_message.network_id != self.network_id { - return false; - } - // compare latest epoch and finalized root to see if they exist in our chain - if hello_message.latest_finalized_epoch <= self.latest_finalized_epoch { - // ensure their finalized root is in our chain - // TODO: Get the finalized root at hello_message.latest_epoch and ensure they match - //if (hello_message.latest_finalized_root == self.chain.get_state() { - // return false; - // } - } - - // the client is valid, add it to our list of known_peers and request sync if required - // update peer list if peer already exists - let peer_info = PeerSyncInfo { - latest_finalized_root: hello_message.latest_finalized_root, - latest_finalized_epoch: hello_message.latest_finalized_epoch, - best_root: hello_message.best_root, - best_slot: hello_message.best_slot, - }; - - debug!(self.log, "Handshake successful. Peer: {:?}", peer_id); - self.known_peers.insert(peer_id, peer_info); - - // set state to sync - if self.state == SyncState::Idle - && hello_message.best_slot > self.latest_slot + SLOT_IMPORT_TOLERANCE - { - self.state = SyncState::Downloading; - //TODO: Start requesting blocks from known peers. Ideally in batches - } - - true - } } diff --git a/beacon_node/network/tests/tests.rs b/beacon_node/network/tests/tests.rs index dc0832fed..b5635cf8c 100644 --- a/beacon_node/network/tests/tests.rs +++ b/beacon_node/network/tests/tests.rs @@ -1,5 +1,6 @@ use beacon_chain::test_utils::TestingBeaconChainBuilder; use crossbeam_channel::{unbounded, Receiver, Sender}; +use libp2p::rpc::methods::*; use libp2p::rpc::{HelloMessage, RPCMethod, RPCRequest, RPCResponse}; use libp2p::{PeerId, RPCEvent}; use network::beacon_chain::BeaconChain as NetworkBeaconChain; @@ -9,6 +10,7 @@ use sloggers::terminal::{Destination, TerminalLoggerBuilder}; use sloggers::types::Severity; use sloggers::Build; use std::sync::Arc; +use std::time::Duration; use test_harness::BeaconChainHarness; use tokio::runtime::TaskExecutor; use types::{test_utils::TestingBeaconStateBuilder, *}; @@ -42,7 +44,9 @@ impl SyncNode { } fn recv(&self) -> NetworkMessage { - self.receiver.recv().unwrap() + self.receiver + .recv_timeout(Duration::from_millis(500)) + .unwrap() } fn recv_rpc_response(&self) -> RPCResponse { @@ -106,6 +110,12 @@ impl SyncMaster { } } + pub fn build_blocks(&mut self, blocks: usize) { + for _ in 0..blocks { + self.harness.advance_chain_with_block(); + } + } + pub fn response_id(&mut self, node: &SyncNode) -> u64 { let id = self.response_ids[node.id]; self.response_ids[node.id] += 1; @@ -140,6 +150,17 @@ impl SyncMaster { } } +fn assert_sent_block_root_request(node: &SyncNode, expected: BeaconBlockRootsRequest) { + let request = node.recv_rpc_request(); + + match request { + RPCRequest::BeaconBlockRoots(response) => { + assert_eq!(expected, response, "Bad block roots response"); + } + _ => assert!(false, "Did not get block root request"), + } +} + fn test_setup( state_builder: TestingBeaconStateBuilder, node_count: usize, @@ -178,7 +199,17 @@ fn first_test() { let (runtime, mut master, nodes) = test_setup(state_builder, node_count, &spec, logger.clone()); + master.build_blocks(10); + master.do_hello_with(&nodes[0]); + assert_sent_block_root_request( + &nodes[0], + BeaconBlockRootsRequest { + start_slot: Slot::new(1), + count: 10, + }, + ); + runtime.shutdown_now(); }