diff --git a/beacon_node/eth2-libp2p/src/discovery.rs b/beacon_node/eth2-libp2p/src/discovery.rs index 44b4e655b..49e5dbeb5 100644 --- a/beacon_node/eth2-libp2p/src/discovery.rs +++ b/beacon_node/eth2-libp2p/src/discovery.rs @@ -68,6 +68,7 @@ impl Discovery { info!(log, "Local ENR: {}", local_enr.to_base64()); debug!(log, "Local Node Id: {}", local_enr.node_id()); + debug!(log, "Local ENR seq: {}", local_enr.seq()); let mut discovery = Discv5::new(local_enr, local_key.clone(), config.listen_address) .map_err(|e| format!("Discv5 service failed: {:?}", e))?; diff --git a/beacon_node/eth2-libp2p/src/rpc/codec/base.rs b/beacon_node/eth2-libp2p/src/rpc/codec/base.rs index f2f4c085b..639a8a730 100644 --- a/beacon_node/eth2-libp2p/src/rpc/codec/base.rs +++ b/beacon_node/eth2-libp2p/src/rpc/codec/base.rs @@ -113,11 +113,6 @@ where resp_code_byte.copy_from_slice(&resp_byte); let resp_code = u8::from_be_bytes(resp_code_byte); - - if let Some(response) = RPCErrorResponse::internal_data(resp_code) { - self.response_code = None; - return Ok(Some(response)); - } self.response_code = Some(resp_code); resp_code } diff --git a/beacon_node/eth2-libp2p/src/rpc/methods.rs b/beacon_node/eth2-libp2p/src/rpc/methods.rs index 8a62b87c5..c73435a9f 100644 --- a/beacon_node/eth2-libp2p/src/rpc/methods.rs +++ b/beacon_node/eth2-libp2p/src/rpc/methods.rs @@ -2,7 +2,7 @@ use ssz::{impl_decode_via_from, impl_encode_via_from}; use ssz_derive::{Decode, Encode}; -use types::{BeaconBlockBody, BeaconBlockHeader, Epoch, Hash256, Slot}; +use types::{BeaconBlockBody, Epoch, Hash256, Slot}; /* Request/Response data structures for RPC methods */ @@ -136,11 +136,6 @@ pub struct BeaconBlockHeadersResponse { pub headers: Vec, } -#[derive(Encode, Decode, Debug)] -pub struct EncodeableBeaconBlockHeadersResponse { - pub headers: Vec, -} - /// Request a number of beacon block bodies from a peer. #[derive(Encode, Decode, Clone, Debug, PartialEq)] pub struct BeaconBlockBodiesRequest { @@ -208,16 +203,6 @@ pub enum RPCErrorResponse { } impl RPCErrorResponse { - /// If a response has no payload, returns the variant corresponding to the code. - pub fn internal_data(response_code: u8) -> Option { - match response_code { - // EncodingError - 1 => Some(RPCErrorResponse::EncodingError), - // All others require further data - _ => None, - } - } - /// Used to encode the response. pub fn as_u8(&self) -> u8 { match self { diff --git a/beacon_node/eth2-libp2p/src/rpc/protocol.rs b/beacon_node/eth2-libp2p/src/rpc/protocol.rs index 6664b1d5c..bae9618bd 100644 --- a/beacon_node/eth2-libp2p/src/rpc/protocol.rs +++ b/beacon_node/eth2-libp2p/src/rpc/protocol.rs @@ -20,7 +20,7 @@ use tokio::util::FutureExt; /// The maximum bytes that can be sent across the RPC. const MAX_RPC_SIZE: usize = 4_194_304; // 4M /// The protocol prefix the RPC protocol id. -const PROTOCOL_PREFIX: &str = "/eth/serenity/rpc/"; +const PROTOCOL_PREFIX: &str = "/eth2/beacon_node/rpc"; /// The number of seconds to wait for a request once a protocol has been established before the stream is terminated. const REQUEST_TIMEOUT: u64 = 3; @@ -72,16 +72,16 @@ impl ProtocolId { pub fn from_bytes(bytes: &[u8]) -> Result { let protocol_string = String::from_utf8(bytes.to_vec()) .map_err(|_| RPCError::InvalidProtocol("Invalid protocol Id"))?; - let protocol_list: Vec<&str> = protocol_string.as_str().split('/').take(5).collect(); + let protocol_list: Vec<&str> = protocol_string.as_str().split('/').take(7).collect(); - if protocol_list.len() != 5 { + if protocol_list.len() != 7 { return Err(RPCError::InvalidProtocol("Not enough '/'")); } Ok(ProtocolId { - message_name: protocol_list[3].into(), - version: protocol_list[4].into(), - encoding: protocol_list[5].into(), + message_name: protocol_list[4].into(), + version: protocol_list[5].into(), + encoding: protocol_list[6].into(), }) } } diff --git a/beacon_node/network/src/message_handler.rs b/beacon_node/network/src/message_handler.rs index 0f67f63a9..239547078 100644 --- a/beacon_node/network/src/message_handler.rs +++ b/beacon_node/network/src/message_handler.rs @@ -10,12 +10,11 @@ use eth2_libp2p::{ }; use futures::future::Future; use futures::stream::Stream; -use slog::{debug, error, warn}; -use ssz::Decode; -use std::collections::HashMap; +use slog::{debug, warn}; +use ssz::{Decode, DecodeError}; use std::sync::Arc; -use std::time::Instant; use tokio::sync::mpsc; +use types::BeaconBlockHeader; /// Handles messages received from the network and client and organises syncing. pub struct MessageHandler { @@ -97,8 +96,6 @@ impl MessageHandler { HandlerMessage::PubsubMessage(peer_id, gossip) => { self.handle_gossip(peer_id, *gossip); } - //TODO: Handle all messages - _ => {} } } @@ -115,7 +112,6 @@ impl MessageHandler { /// A new RPC request has been received from the network. fn handle_rpc_request(&mut self, peer_id: PeerId, request_id: RequestId, request: RPCRequest) { - // TODO: process the `id`. match request { RPCRequest::Hello(hello_message) => self.sync.on_hello_request( peer_id, @@ -158,30 +154,6 @@ impl MessageHandler { id: RequestId, error_response: RPCErrorResponse, ) { - //TODO: Potentially do not need to keep track of this at all. This has all been shifted - //into libp2p stack. Tracking Id's will only be necessary if a response is important - //relative to a specific request. Note: BeaconBlockBodies already returns with the data - //associated with its request. - // Currently leave this here for testing, to ensure it is redundant. - if self - .network_context - .outstanding_outgoing_request_ids - .remove(&(peer_id.clone(), id)) - .is_none() - { - // This should never happen. The RPC layer handles all timeouts and ensures a response - // matches a request. - debug_assert!(false); - - error!( - self.log, - "Unknown ResponseId for incoming RPCRequest"; - "peer" => format!("{:?}", peer_id), - "request_id" => format!("{}", id) - ); - return; - } - // an error could have occurred. // TODO: Handle Error gracefully match error_response { @@ -214,25 +186,31 @@ impl MessageHandler { ); } RPCResponse::BeaconBlockHeaders(response) => { - if let Some(decoded_block_headers) = self.decode_block_headers(response) { - self.sync.on_beacon_block_headers_response( - peer_id, - decoded_block_headers, - &mut self.network_context, - ); - } else { - warn!(self.log, "Peer sent invalid block headers";"peer" => format!("{:?}", peer_id)) + match self.decode_block_headers(response) { + Ok(decoded_block_headers) => { + self.sync.on_beacon_block_headers_response( + peer_id, + decoded_block_headers, + &mut self.network_context, + ); + } + Err(_e) => { + warn!(self.log, "Peer sent invalid block headers";"peer" => format!("{:?}", peer_id)) + } } } RPCResponse::BeaconBlockBodies(response) => { - if let Some(decoded_block_bodies) = self.decode_block_bodies(response) { - self.sync.on_beacon_block_bodies_response( - peer_id, - decoded_block_bodies, - &mut self.network_context, - ); - } else { - warn!(self.log, "Peer sent invalid block bodies";"peer" => format!("{:?}", peer_id)) + match self.decode_block_bodies(response) { + Ok(decoded_block_bodies) => { + self.sync.on_beacon_block_bodies_response( + peer_id, + decoded_block_bodies, + &mut self.network_context, + ); + } + Err(_e) => { + warn!(self.log, "Peer sent invalid block bodies";"peer" => format!("{:?}", peer_id)) + } } } RPCResponse::BeaconChainState(_) => { @@ -252,27 +230,24 @@ impl MessageHandler { fn decode_block_bodies( &self, bodies_response: BeaconBlockBodiesResponse, - ) -> Option { + ) -> Result { //TODO: Implement faster block verification before decoding entirely - if let Ok(simple_decoded_bodies) = simple_decoded_bodies { - Some(DecodedBeaconBlockBodiesResponse { - block_roots: bodies_response - .block_roots - .expect("Responses must have associated roots"), - block_bodies: Vec::from_ssz_bytes(&bodies_response.block_bodies).unwrap(), - }) - } else { - None - } + let block_bodies = Vec::from_ssz_bytes(&bodies_response.block_bodies)?; + Ok(DecodedBeaconBlockBodiesResponse { + block_roots: bodies_response + .block_roots + .expect("Responses must have associated roots"), + block_bodies, + }) } /// Verifies and decodes the ssz-encoded block headers received from peers. fn decode_block_headers( &self, headers_response: BeaconBlockHeadersResponse, - ) -> Option { + ) -> Result, DecodeError> { //TODO: Implement faster header verification before decoding entirely - EncodeableBeaconBlockHeadersResponse::from_ssz_bytes(&headers_response.headers).ok() + Vec::from_ssz_bytes(&headers_response.headers) } /// Handle various RPC errors @@ -297,25 +272,17 @@ impl MessageHandler { } } +// TODO: RPC Rewrite makes this struct fairly pointless pub struct NetworkContext { /// The network channel to relay messages to the Network service. network_send: mpsc::UnboundedSender, - /// A mapping of peers and the RPC id we have sent an RPC request to. - outstanding_outgoing_request_ids: HashMap<(PeerId, RequestId), Instant>, - /// Stores the next `RequestId` we should include on an outgoing `RPCRequest` to a `PeerId`. - outgoing_request_ids: HashMap, /// The `MessageHandler` logger. log: slog::Logger, } impl NetworkContext { pub fn new(network_send: mpsc::UnboundedSender, log: slog::Logger) -> Self { - Self { - network_send, - outstanding_outgoing_request_ids: HashMap::new(), - outgoing_request_ids: HashMap::new(), - log, - } + Self { network_send, log } } pub fn disconnect(&mut self, peer_id: PeerId, reason: GoodbyeReason) { @@ -324,12 +291,9 @@ impl NetworkContext { } pub fn send_rpc_request(&mut self, peer_id: PeerId, rpc_request: RPCRequest) { - let id = self.generate_request_id(&peer_id); - - self.outstanding_outgoing_request_ids - .insert((peer_id.clone(), id), Instant::now()); - - self.send_rpc_event(peer_id, RPCEvent::Request(id, rpc_request)); + // Note: There is currently no use of keeping track of requests. However the functionality + // is left here for future revisions. + self.send_rpc_event(peer_id, RPCEvent::Request(0, rpc_request)); } //TODO: Handle Error responses @@ -359,15 +323,4 @@ impl NetworkContext { ) }); } - - /// Returns the next `RequestId` for sending an `RPCRequest` to the `peer_id`. - fn generate_request_id(&mut self, peer_id: &PeerId) -> RequestId { - let next_id = self - .outgoing_request_ids - .entry(peer_id.clone()) - .and_modify(|id| *id += 1) - .or_insert_with(|| 0); - - *next_id - } } diff --git a/beacon_node/network/src/sync/simple_sync.rs b/beacon_node/network/src/sync/simple_sync.rs index e4115ff37..91594b999 100644 --- a/beacon_node/network/src/sync/simple_sync.rs +++ b/beacon_node/network/src/sync/simple_sync.rs @@ -422,11 +422,7 @@ impl SimpleSync { .collect(); // ssz-encode the headers - //TODO: Make this more elegant - let headers = { - let resp = EncodeableBeaconBlockHeadersResponse { headers }; - resp.as_ssz_bytes() - }; + let headers = headers.as_ssz_bytes(); network.send_rpc_response( peer_id, @@ -439,17 +435,17 @@ impl SimpleSync { pub fn on_beacon_block_headers_response( &mut self, peer_id: PeerId, - res: EncodeableBeaconBlockHeadersResponse, + headers: Vec, network: &mut NetworkContext, ) { debug!( self.log, "BlockHeadersResponse"; "peer" => format!("{:?}", peer_id), - "count" => res.headers.len(), + "count" => headers.len(), ); - if res.headers.is_empty() { + if headers.is_empty() { warn!( self.log, "Peer returned empty block headers response. PeerId: {:?}", peer_id @@ -459,9 +455,7 @@ impl SimpleSync { // Enqueue the headers, obtaining a list of the roots of the headers which were newly added // to the queue. - let block_roots = self - .import_queue - .enqueue_headers(res.headers, peer_id.clone()); + let block_roots = self.import_queue.enqueue_headers(headers, peer_id.clone()); if !block_roots.is_empty() { self.request_block_bodies(peer_id, BeaconBlockBodiesRequest { block_roots }, network); @@ -503,7 +497,7 @@ impl SimpleSync { "returned" => block_bodies.len(), ); - let bytes = block_bodes.as_ssz_bytes(); + let bytes = block_bodies.as_ssz_bytes(); network.send_rpc_response( peer_id,