diff --git a/beacon_node/eth2-libp2p/src/behaviour.rs b/beacon_node/eth2-libp2p/src/behaviour.rs index 542060014..1bff58ecd 100644 --- a/beacon_node/eth2-libp2p/src/behaviour.rs +++ b/beacon_node/eth2-libp2p/src/behaviour.rs @@ -109,6 +109,9 @@ impl NetworkBehaviourEventProcess { self.events.push(BehaviourEvent::PeerDialed(peer_id)) } + RPCMessage::PeerDisconnected(peer_id) => { + self.events.push(BehaviourEvent::PeerDisconnected(peer_id)) + } RPCMessage::RPC(peer_id, rpc_event) => { self.events.push(BehaviourEvent::RPC(peer_id, rpc_event)) } @@ -174,6 +177,7 @@ impl Behaviour { pub enum BehaviourEvent { RPC(PeerId, RPCEvent), PeerDialed(PeerId), + PeerDisconnected(PeerId), GossipMessage { source: PeerId, topics: Vec, diff --git a/beacon_node/eth2-libp2p/src/rpc/codec/base.rs b/beacon_node/eth2-libp2p/src/rpc/codec/base.rs index 466fc5c3d..f2f4c085b 100644 --- a/beacon_node/eth2-libp2p/src/rpc/codec/base.rs +++ b/beacon_node/eth2-libp2p/src/rpc/codec/base.rs @@ -5,7 +5,7 @@ use bytes::BufMut; use bytes::BytesMut; use tokio::codec::{Decoder, Encoder}; -pub(crate) trait OutboundCodec: Encoder + Decoder { +pub trait OutboundCodec: Encoder + Decoder { type ErrorType; fn decode_error( @@ -14,7 +14,7 @@ pub(crate) trait OutboundCodec: Encoder + Decoder { ) -> Result, ::Error>; } -pub(crate) struct BaseInboundCodec +pub struct BaseInboundCodec where TCodec: Encoder + Decoder, { @@ -31,7 +31,7 @@ where } } -pub(crate) struct BaseOutboundCodec +pub struct BaseOutboundCodec where TOutboundCodec: OutboundCodec, { @@ -109,7 +109,7 @@ where debug_assert!(!src.is_empty()); let resp_byte = src.split_to(1); - let resp_code_byte = [0; 1]; + let mut resp_code_byte = [0; 1]; resp_code_byte.copy_from_slice(&resp_byte); let resp_code = u8::from_be_bytes(resp_code_byte); diff --git a/beacon_node/eth2-libp2p/src/rpc/codec/ssz.rs b/beacon_node/eth2-libp2p/src/rpc/codec/ssz.rs index e339007d7..a78ac8f7f 100644 --- a/beacon_node/eth2-libp2p/src/rpc/codec/ssz.rs +++ b/beacon_node/eth2-libp2p/src/rpc/codec/ssz.rs @@ -18,7 +18,7 @@ pub struct SSZInboundCodec { impl SSZInboundCodec { pub fn new(protocol: ProtocolId, max_packet_size: usize) -> Self { - let uvi_codec = UviBytes::default(); + let mut uvi_codec = UviBytes::default(); uvi_codec.set_max_len(max_packet_size); // this encoding only applies to ssz. @@ -41,7 +41,6 @@ impl Encoder for SSZInboundCodec { RPCErrorResponse::Success(resp) => { match resp { RPCResponse::Hello(res) => res.as_ssz_bytes(), - RPCResponse::Goodbye => unreachable!(), RPCResponse::BeaconBlockRoots(res) => res.as_ssz_bytes(), RPCResponse::BeaconBlockHeaders(res) => res.headers, // already raw bytes RPCResponse::BeaconBlockBodies(res) => res.block_bodies, // already raw bytes @@ -80,7 +79,9 @@ impl Decoder for SSZInboundCodec { _ => Err(RPCError::InvalidProtocol("Unknown HELLO version")), }, "goodbye" => match self.protocol.version.as_str() { - "1.0.0" => Ok(Some(RPCRequest::Goodbye(Goodbye::from_ssz_bytes(&packet)?))), + "1.0.0" => Ok(Some(RPCRequest::Goodbye(GoodbyeReason::from_ssz_bytes( + &packet, + )?))), _ => Err(RPCError::InvalidProtocol( "Unknown GOODBYE version.as_str()", )), @@ -117,6 +118,7 @@ impl Decoder for SSZInboundCodec { "Unknown BEACON_CHAIN_STATE version.", )), }, + _ => Err(RPCError::InvalidProtocol("Unknown message name.")), }, Ok(None) => Ok(None), Err(e) => Err(e), @@ -133,7 +135,7 @@ pub struct SSZOutboundCodec { impl SSZOutboundCodec { pub fn new(protocol: ProtocolId, max_packet_size: usize) -> Self { - let uvi_codec = UviBytes::default(); + let mut uvi_codec = UviBytes::default(); uvi_codec.set_max_len(max_packet_size); // this encoding only applies to ssz. @@ -204,6 +206,8 @@ impl Decoder for SSZOutboundCodec { "1.0.0" => Ok(Some(RPCResponse::BeaconBlockBodies( BeaconBlockBodiesResponse { block_bodies: packet.to_vec(), + // this gets filled in the protocol handler + block_roots: None, }, ))), _ => Err(RPCError::InvalidProtocol( diff --git a/beacon_node/eth2-libp2p/src/rpc/handler.rs b/beacon_node/eth2-libp2p/src/rpc/handler.rs index 02b82adea..df8769122 100644 --- a/beacon_node/eth2-libp2p/src/rpc/handler.rs +++ b/beacon_node/eth2-libp2p/src/rpc/handler.rs @@ -1,11 +1,13 @@ -use super::protocol::{ProtocolId, RPCError, RPCProtocol, RPCRequest}; +use super::methods::{RPCErrorResponse, RPCResponse, RequestId}; +use super::protocol::{RPCError, RPCProtocol, RPCRequest}; use super::RPCEvent; +use crate::rpc::protocol::{InboundFramed, OutboundFramed}; use fnv::FnvHashMap; use futures::prelude::*; use libp2p::core::protocols_handler::{ KeepAlive, ProtocolsHandler, ProtocolsHandlerEvent, ProtocolsHandlerUpgrErr, SubstreamProtocol, }; -use libp2p::core::upgrade::{self, InboundUpgrade, OutboundUpgrade, WriteOne}; +use libp2p::core::upgrade::{InboundUpgrade, OutboundUpgrade}; use smallvec::SmallVec; use std::time::{Duration, Instant}; use tokio_io::{AsyncRead, AsyncWrite}; @@ -14,7 +16,10 @@ use tokio_io::{AsyncRead, AsyncWrite}; pub const RESPONSE_TIMEOUT: u64 = 9; /// Implementation of `ProtocolsHandler` for the RPC protocol. -pub struct RPCHandler { +pub struct RPCHandler +where + TSubstream: AsyncRead + AsyncWrite, +{ /// The upgrade for inbound substreams. listen_protocol: SubstreamProtocol, @@ -25,16 +30,19 @@ pub struct RPCHandler { events_out: SmallVec<[RPCEvent; 4]>, /// Queue of outbound substreams to open. - dial_queue: SmallVec<[(usize, RPCRequest); 4]>, + dial_queue: SmallVec<[RPCEvent; 4]>, /// Current number of concurrent outbound substreams being opened. dial_negotiated: u32, /// Map of current substreams awaiting a response to an RPC request. - waiting_substreams: FnvHashMap>, + waiting_substreams: FnvHashMap>, + + /// List of outbound substreams that need to be driven to completion. + substreams: Vec>, /// Sequential Id for waiting substreams. - current_substream_id: usize, + current_substream_id: RequestId, /// Maximum number of concurrent outbound substreams being opened. Value is never modified. max_dial_negotiated: u32, @@ -46,22 +54,40 @@ pub struct RPCHandler { inactive_timeout: Duration, } -/// State of an outbound substream. Either waiting for a response, or in the process of sending. -pub enum SubstreamState { - /// An outbound substream is waiting a response from the user. - WaitingResponse { - /// The negotiated substream. - substream: upgrade::Negotiated, - /// The protocol that was negotiated. - negotiated_protocol: ProtocolId, - /// The time until we close the substream. - timeout: Instant, - }, - /// A response has been sent and we are waiting for the stream to close. - PendingWrite(WriteOne, Vec>), +/// An outbound substream is waiting a response from the user. +struct WaitingResponse { + /// The framed negotiated substream. + substream: InboundFramed, + /// The time when the substream is closed. + timeout: Instant, } -impl RPCHandler { +/// State of an outbound substream. Either waiting for a response, or in the process of sending. +pub enum SubstreamState +where + TSubstream: AsyncRead + AsyncWrite, +{ + /// A response has been sent, pending writing and flush. + ResponsePendingSend { + substream: futures::sink::Send>, + }, + /// A request has been sent, and we are awaiting a response. This future is driven in the + /// handler because GOODBYE requests can be handled and responses dropped instantly. + RequestPendingResponse { + /// The framed negotiated substream. + substream: OutboundFramed, + /// Keeps track of the request id and the request to permit forming advanced responses which require + /// data from the request. + rpc_event: RPCEvent, + /// The time when the substream is closed. + timeout: Instant, + }, +} + +impl RPCHandler +where + TSubstream: AsyncRead + AsyncWrite, +{ pub fn new( listen_protocol: SubstreamProtocol, inactive_timeout: Duration, @@ -73,7 +99,8 @@ impl RPCHandler { dial_queue: SmallVec::new(), dial_negotiated: 0, waiting_substreams: FnvHashMap::default(), - current_substream_id: 0, + substreams: Vec::new(), + current_substream_id: 1, max_dial_negotiated: 8, keep_alive: KeepAlive::Yes, inactive_timeout, @@ -101,15 +128,19 @@ impl RPCHandler { &mut self.listen_protocol } - /// Opens an outbound substream with `upgrade`. + /// Opens an outbound substream with a request. #[inline] - pub fn send_request(&mut self, request_id: usize, upgrade: RPCRequest) { + pub fn send_request(&mut self, rpc_event: RPCEvent) { self.keep_alive = KeepAlive::Yes; - self.dial_queue.push((request_id, upgrade)); + + self.dial_queue.push(rpc_event); } } -impl Default for RPCHandler { +impl Default for RPCHandler +where + TSubstream: AsyncRead + AsyncWrite, +{ fn default() -> Self { RPCHandler::new(SubstreamProtocol::new(RPCProtocol), Duration::from_secs(30)) } @@ -125,7 +156,7 @@ where type Substream = TSubstream; type InboundProtocol = RPCProtocol; type OutboundProtocol = RPCRequest; - type OutboundOpenInfo = usize; // request_id + type OutboundOpenInfo = RPCEvent; // Keep track of the id and the request #[inline] fn listen_protocol(&self) -> SubstreamProtocol { @@ -137,7 +168,7 @@ where &mut self, out: >::Output, ) { - let (substream, req, negotiated_protocol) = out; + let (req, substream) = out; // drop the stream and return a 0 id for goodbye "requests" if let r @ RPCRequest::Goodbye(_) = req { self.events_out.push(RPCEvent::Request(0, r)); @@ -145,9 +176,8 @@ where } // New inbound request. Store the stream and tag the output. - let awaiting_stream = SubstreamState::WaitingResponse { + let awaiting_stream = WaitingResponse { substream, - negotiated_protocol, timeout: Instant::now() + Duration::from_secs(RESPONSE_TIMEOUT), }; self.waiting_substreams @@ -162,7 +192,7 @@ where fn inject_fully_negotiated_outbound( &mut self, out: >::Output, - request_id: Self::OutboundOpenInfo, + rpc_event: Self::OutboundOpenInfo, ) { self.dial_negotiated -= 1; @@ -175,7 +205,18 @@ where self.keep_alive = KeepAlive::Yes; } - self.events_out.push(RPCEvent::Response(request_id, out)); + // add the stream to substreams if we expect a response, otherwise drop the stream. + if let RPCEvent::Request(id, req) = rpc_event { + if req.expect_response() { + let awaiting_stream = SubstreamState::RequestPendingResponse { + substream: out, + rpc_event: RPCEvent::Request(id, req), + timeout: Instant::now() + Duration::from_secs(RESPONSE_TIMEOUT), + }; + + self.substreams.push(awaiting_stream); + } + } } // Note: If the substream has closed due to inactivity, or the substream is in the @@ -183,25 +224,17 @@ where #[inline] fn inject_event(&mut self, rpc_event: Self::InEvent) { match rpc_event { - RPCEvent::Request(rpc_id, req) => self.send_request(rpc_id, req), + RPCEvent::Request(_, _) => self.send_request(rpc_event), RPCEvent::Response(rpc_id, res) => { // check if the stream matching the response still exists - if let Some(waiting_stream) = self.waiting_substreams.get_mut(&rpc_id) { + if let Some(waiting_stream) = self.waiting_substreams.remove(&rpc_id) { // only send one response per stream. This must be in the waiting state. - if let SubstreamState::WaitingResponse { - substream, - negotiated_protocol, - .. - } = *waiting_stream - { - *waiting_stream = SubstreamState::PendingWrite(upgrade::write_one( - substream, - res.encode(negotiated_protocol) - .expect("Response should always be encodeable"), - )); - } + self.substreams.push(SubstreamState::ResponsePendingSend { + substream: waiting_stream.substream.send(res), + }); } } + RPCEvent::Error(_, _) => {} } } @@ -213,7 +246,6 @@ where >::Error, >, ) { - dbg!(error); if self.pending_error.is_none() { self.pending_error = Some(error); } @@ -234,30 +266,7 @@ where return Err(err); } - // prioritise sending responses for waiting substreams - self.waiting_substreams.retain(|_k, mut waiting_stream| { - match waiting_stream { - SubstreamState::PendingWrite(write_one) => { - match write_one.poll() { - Ok(Async::Ready(_socket)) => false, - Ok(Async::NotReady) => true, - Err(_e) => { - //TODO: Add logging - // throw away streams that error - false - } - } - } - SubstreamState::WaitingResponse { timeout, .. } => { - if Instant::now() > *timeout { - false - } else { - true - } - } - } - }); - + // return any events that need to be reported if !self.events_out.is_empty() { return Ok(Async::Ready(ProtocolsHandlerEvent::Custom( self.events_out.remove(0), @@ -266,17 +275,85 @@ where self.events_out.shrink_to_fit(); } + // remove any streams that have expired + self.waiting_substreams.retain(|_k, waiting_stream| { + if Instant::now() > waiting_stream.timeout { + false + } else { + true + } + }); + + // drive streams that need to be processed + for n in (0..self.substreams.len()).rev() { + let stream = self.substreams.swap_remove(n); + match stream { + SubstreamState::ResponsePendingSend { mut substream } => { + match substream.poll() { + Ok(Async::Ready(_substream)) => {} // sent and flushed + Ok(Async::NotReady) => { + self.substreams + .push(SubstreamState::ResponsePendingSend { substream }); + } + Err(e) => { + return Ok(Async::Ready(ProtocolsHandlerEvent::Custom( + RPCEvent::Error(0, e), + ))) + } + } + } + SubstreamState::RequestPendingResponse { + mut substream, + rpc_event, + timeout, + } => match substream.poll() { + Ok(Async::Ready(response)) => { + if let Some(response) = response { + return Ok(Async::Ready(ProtocolsHandlerEvent::Custom( + build_response(rpc_event, response), + ))); + } else { + // stream closed early + return Ok(Async::Ready(ProtocolsHandlerEvent::Custom( + RPCEvent::Error( + rpc_event.id(), + RPCError::Custom("Stream Closed Early".into()), + ), + ))); + } + } + Ok(Async::NotReady) => { + if Instant::now() < timeout { + self.substreams + .push(SubstreamState::RequestPendingResponse { + substream, + rpc_event, + timeout, + }); + } + } + Err(e) => { + return Ok(Async::Ready(ProtocolsHandlerEvent::Custom( + RPCEvent::Error(rpc_event.id(), e.into()), + ))) + } + }, + } + } + // establish outbound substreams if !self.dial_queue.is_empty() { if self.dial_negotiated < self.max_dial_negotiated { self.dial_negotiated += 1; - let (request_id, req) = self.dial_queue.remove(0); - return Ok(Async::Ready( - ProtocolsHandlerEvent::OutboundSubstreamRequest { - protocol: SubstreamProtocol::new(req), - info: request_id, - }, - )); + let rpc_event = self.dial_queue.remove(0); + if let RPCEvent::Request(id, req) = rpc_event { + return Ok(Async::Ready( + ProtocolsHandlerEvent::OutboundSubstreamRequest { + protocol: SubstreamProtocol::new(req.clone()), + info: RPCEvent::Request(id, req), + }, + )); + } } } else { self.dial_queue.shrink_to_fit(); @@ -284,3 +361,31 @@ where Ok(Async::NotReady) } } + +/// Given a response back from a peer and the request that sent it, construct a response to send +/// back to the user. This allows for some data manipulation of responses given requests. +fn build_response(rpc_event: RPCEvent, rpc_response: RPCErrorResponse) -> RPCEvent { + let id = rpc_event.id(); + + // handle the types of responses + match rpc_response { + RPCErrorResponse::Success(response) => { + match response { + // if the response is block roots, tag on the extra request data + RPCResponse::BeaconBlockBodies(mut resp) => { + if let RPCEvent::Request(_id, RPCRequest::BeaconBlockBodies(bodies_req)) = + rpc_event + { + resp.block_roots = Some(bodies_req.block_roots); + } + RPCEvent::Response( + id, + RPCErrorResponse::Success(RPCResponse::BeaconBlockBodies(resp)), + ) + } + _ => RPCEvent::Response(id, RPCErrorResponse::Success(response)), + } + } + _ => RPCEvent::Response(id, rpc_response), + } +} diff --git a/beacon_node/eth2-libp2p/src/rpc/methods.rs b/beacon_node/eth2-libp2p/src/rpc/methods.rs index 9e9087f9e..02dec8025 100644 --- a/beacon_node/eth2-libp2p/src/rpc/methods.rs +++ b/beacon_node/eth2-libp2p/src/rpc/methods.rs @@ -2,12 +2,14 @@ use ssz::{impl_decode_via_from, impl_encode_via_from}; use ssz_derive::{Decode, Encode}; -use types::{Epoch, Hash256, Slot}; +use types::{BeaconBlockBody, BeaconBlockHeader, Epoch, Hash256, Slot}; /* Request/Response data structures for RPC methods */ /* Requests */ +pub type RequestId = usize; + /// The HELLO request/response handshake message. #[derive(Encode, Decode, Clone, Debug)] pub struct HelloMessage { @@ -33,10 +35,10 @@ pub struct HelloMessage { /// The reason given for a `Goodbye` message. /// /// Note: any unknown `u64::into(n)` will resolve to `Goodbye::Unknown` for any unknown `n`, -/// however `Goodbye::Unknown.into()` will go into `0_u64`. Therefore de-serializing then +/// however `GoodbyeReason::Unknown.into()` will go into `0_u64`. Therefore de-serializing then /// re-serializing may not return the same bytes. #[derive(Debug, Clone)] -pub enum Goodbye { +pub enum GoodbyeReason { /// This node has shutdown. ClientShutdown = 1, @@ -50,25 +52,25 @@ pub enum Goodbye { Unknown = 0, } -impl From for Goodbye { - fn from(id: u64) -> Goodbye { +impl From for GoodbyeReason { + fn from(id: u64) -> GoodbyeReason { match id { - 1 => Goodbye::ClientShutdown, - 2 => Goodbye::IrreleventNetwork, - 3 => Goodbye::Fault, - _ => Goodbye::Unknown, + 1 => GoodbyeReason::ClientShutdown, + 2 => GoodbyeReason::IrreleventNetwork, + 3 => GoodbyeReason::Fault, + _ => GoodbyeReason::Unknown, } } } -impl Into for Goodbye { +impl Into for GoodbyeReason { fn into(self) -> u64 { self as u64 } } -impl_encode_via_from!(Goodbye, u64); -impl_decode_via_from!(Goodbye, u64); +impl_encode_via_from!(GoodbyeReason, u64); +impl_decode_via_from!(GoodbyeReason, u64); /// Request a number of beacon block roots from a peer. #[derive(Encode, Decode, Clone, Debug, PartialEq)] @@ -134,6 +136,11 @@ pub struct BeaconBlockHeadersResponse { pub headers: Vec, } +#[derive(Encode, Decode, Debug)] +pub struct EncodeableBeaconBlockHeadersResponse { + pub headers: Vec, +} + /// Request a number of beacon block bodies from a peer. #[derive(Encode, Decode, Clone, Debug, PartialEq)] pub struct BeaconBlockBodiesRequest { @@ -144,10 +151,28 @@ pub struct BeaconBlockBodiesRequest { /// Response containing the list of requested beacon block bodies. #[derive(Clone, Debug, PartialEq)] pub struct BeaconBlockBodiesResponse { + /// The list of hashes that were sent in the request and match these roots response. None when + /// sending outbound. + pub block_roots: Option>, /// The list of ssz-encoded beacon block bodies being requested. pub block_bodies: Vec, } +/// The decoded version of `BeaconBlockBodiesResponse` which is expected in `SimpleSync`. +pub struct DecodedBeaconBlockBodiesResponse { + /// The list of hashes sent in the request to get this response. + pub block_roots: Vec, + /// The valid decoded block bodies. + pub block_bodies: Vec, +} + +//TODO: Build a cleaner API for this encoding/decoding +/// This only exists to encode/decode beacon block bodies according to the wire protocol. +#[derive(Encode, Decode)] +pub struct EncodeableBeaconBlockBodiesResponse { + pub block_bodies: Vec, +} + /// Request values for tree hashes which yield a blocks `state_root`. #[derive(Encode, Decode, Clone, Debug, PartialEq)] pub struct BeaconChainStateRequest { @@ -170,8 +195,6 @@ pub struct BeaconChainStateResponse { pub enum RPCResponse { /// A HELLO message. Hello(HelloMessage), - /// An empty field returned from sending a GOODBYE request. - Goodbye, // empty value - required for protocol handler /// A response to a get BEACON_BLOCK_ROOTS request. BeaconBlockRoots(BeaconBlockRootsResponse), /// A response to a get BEACON_BLOCK_HEADERS request. @@ -182,6 +205,7 @@ pub enum RPCResponse { BeaconChainState(BeaconChainStateResponse), } +#[derive(Debug)] pub enum RPCErrorResponse { Success(RPCResponse), EncodingError, @@ -230,8 +254,14 @@ impl RPCErrorResponse { } } -#[derive(Encode, Decode)] +#[derive(Encode, Decode, Debug)] pub struct ErrorMessage { /// The UTF-8 encoded Error message string. - error_message: Vec, + pub error_message: Vec, +} + +impl ErrorMessage { + pub fn as_string(&self) -> String { + String::from_utf8(self.error_message.clone()).unwrap_or_else(|_| "".into()) + } } diff --git a/beacon_node/eth2-libp2p/src/rpc/mod.rs b/beacon_node/eth2-libp2p/src/rpc/mod.rs index ab47b4362..f1f341908 100644 --- a/beacon_node/eth2-libp2p/src/rpc/mod.rs +++ b/beacon_node/eth2-libp2p/src/rpc/mod.rs @@ -11,8 +11,8 @@ use libp2p::core::swarm::{ ConnectedPoint, NetworkBehaviour, NetworkBehaviourAction, PollParameters, }; use libp2p::{Multiaddr, PeerId}; -pub use methods::{ErrorMessage, HelloMessage, RPCErrorResponse, RPCResponse}; -pub use protocol::{RPCProtocol, RPCRequest}; +pub use methods::{ErrorMessage, HelloMessage, RPCErrorResponse, RPCResponse, RequestId}; +pub use protocol::{RPCError, RPCProtocol, RPCRequest}; use slog::o; use std::marker::PhantomData; use tokio::io::{AsyncRead, AsyncWrite}; @@ -24,15 +24,27 @@ mod protocol; // mod request_response; /// The return type used in the behaviour and the resultant event from the protocols handler. -#[derive(Debug, Clone)] +#[derive(Debug)] pub enum RPCEvent { /// A request that was received from the RPC protocol. The first parameter is a sequential /// id which tracks an awaiting substream for the response. - Request(usize, RPCRequest), + Request(RequestId, RPCRequest), /// A response that has been received from the RPC protocol. The first parameter returns /// that which was sent with the corresponding request. - Response(usize, RPCResponse), + Response(RequestId, RPCErrorResponse), + /// An Error occurred. + Error(RequestId, RPCError), +} + +impl RPCEvent { + pub fn id(&self) -> usize { + match *self { + RPCEvent::Request(id, _) => id, + RPCEvent::Response(id, _) => id, + RPCEvent::Error(id, _) => id, + } + } } /// Implements the libp2p `NetworkBehaviour` trait and therefore manages network-level @@ -92,7 +104,12 @@ where } } - fn inject_disconnected(&mut self, _: &PeerId, _: ConnectedPoint) {} + fn inject_disconnected(&mut self, peer_id: &PeerId, _: ConnectedPoint) { + // inform the rpc handler that the peer has disconnected + self.events.push(NetworkBehaviourAction::GenerateEvent( + RPCMessage::PeerDisconnected(peer_id.clone()), + )); + } fn inject_node_event( &mut self, @@ -126,4 +143,5 @@ where pub enum RPCMessage { RPC(PeerId, RPCEvent), PeerDialed(PeerId), + PeerDisconnected(PeerId), } diff --git a/beacon_node/eth2-libp2p/src/rpc/protocol.rs b/beacon_node/eth2-libp2p/src/rpc/protocol.rs index 7d24105f5..5ef2cd3c6 100644 --- a/beacon_node/eth2-libp2p/src/rpc/protocol.rs +++ b/beacon_node/eth2-libp2p/src/rpc/protocol.rs @@ -9,13 +9,10 @@ use futures::{ sink, stream, Sink, Stream, }; use libp2p::core::{upgrade, InboundUpgrade, OutboundUpgrade, UpgradeInfo}; -use ssz::Encode; -use ssz_derive::{Decode, Encode}; use std::io; use std::time::Duration; use tokio::codec::Framed; use tokio::io::{AsyncRead, AsyncWrite}; -use tokio::prelude::future::MapErr; use tokio::prelude::*; use tokio::timer::timeout; use tokio::util::FutureExt; @@ -24,25 +21,25 @@ use tokio::util::FutureExt; const MAX_RPC_SIZE: usize = 4_194_304; // 4M /// The protocol prefix the RPC protocol id. const PROTOCOL_PREFIX: &str = "/eth/serenity/rpc/"; -/// The number of seconds to wait for a response before the stream is terminated. -const RESPONSE_TIMEOUT: u64 = 10; +/// The number of seconds to wait for a request once a protocol has been established before the stream is terminated. +const REQUEST_TIMEOUT: u64 = 3; /// Implementation of the `ConnectionUpgrade` for the RPC protocol. #[derive(Debug, Clone)] pub struct RPCProtocol; impl UpgradeInfo for RPCProtocol { - type Info = &'static [u8]; + type Info = RawProtocolId; type InfoIter = Vec; fn protocol_info(&self) -> Self::InfoIter { vec![ - b"/eth/serenity/rpc/hello/1.0.0/ssz", - b"/eth/serenity/rpc/goodbye/1.0.0/ssz", - b"/eth/serenity/rpc/beacon_block_roots/1.0.0/ssz", - b"/eth/serenity/rpc/beacon_block_headers/1.0.0/ssz", - b"/eth/serenity/rpc/beacon_block_bodies/1.0.0/ssz", - b"/eth/serenity/rpc/beacon_chain_state/1.0.0/ssz", + ProtocolId::new("hello", "1.0.0", "ssz").into(), + ProtocolId::new("goodbye", "1.0.0", "ssz").into(), + ProtocolId::new("beacon_block_roots", "1.0.0", "ssz").into(), + ProtocolId::new("beacon_block_headers", "1.0.0", "ssz").into(), + ProtocolId::new("beacon_block_bodies", "1.0.0", "ssz").into(), + ProtocolId::new("beacon_chain_state", "1.0.0", "ssz").into(), ] } } @@ -106,16 +103,18 @@ impl Into for ProtocolId { // The inbound protocol reads the request, decodes it and returns the stream to the protocol // handler to respond to once ready. -type InboundFramed = Framed, InboundCodec>; -type FnAndThen = - fn((Option, InboundFramed)) -> FutureResult; +pub type InboundOutput = (RPCRequest, InboundFramed); +pub type InboundFramed = Framed, InboundCodec>; +type FnAndThen = fn( + (Option, InboundFramed), +) -> FutureResult, RPCError>; type FnMapErr = fn(timeout::Error<(RPCError, InboundFramed)>) -> RPCError; impl InboundUpgrade for RPCProtocol where TSocket: AsyncRead + AsyncWrite, { - type Output = RPCRequest; + type Output = InboundOutput; type Error = RPCError; type Future = future::AndThen< @@ -123,31 +122,34 @@ where timeout::Timeout>>, FnMapErr, >, - FutureResult, + FutureResult, RPCError>, FnAndThen, >; fn upgrade_inbound( self, socket: upgrade::Negotiated, - protocol: &'static [u8], + protocol: RawProtocolId, ) -> Self::Future { // TODO: Verify this let protocol_id = - ProtocolId::from_bytes(protocol).expect("Can decode all supported protocols"); + ProtocolId::from_bytes(&protocol).expect("Can decode all supported protocols"); match protocol_id.encoding.as_str() { "ssz" | _ => { - let ssz_codec = BaseInboundCodec::new(SSZInboundCodec::new(protocol_id, 4096)); + let ssz_codec = + BaseInboundCodec::new(SSZInboundCodec::new(protocol_id, MAX_RPC_SIZE)); let codec = InboundCodec::SSZ(ssz_codec); Framed::new(socket, codec) .into_future() - .timeout(Duration::from_secs(RESPONSE_TIMEOUT)) + .timeout(Duration::from_secs(REQUEST_TIMEOUT)) .map_err(RPCError::from as FnMapErr) .and_then({ - |(madouby, _)| match madouby { - Some(x) => futures::future::ok(x), - None => futures::future::err(RPCError::Custom("Go home".into())), + |(req, stream)| match req { + Some(req) => futures::future::ok((req, stream)), + None => futures::future::err(RPCError::Custom( + "Stream terminated early".into(), + )), } } as FnAndThen) } @@ -163,7 +165,7 @@ where #[derive(Debug, Clone)] pub enum RPCRequest { Hello(HelloMessage), - Goodbye(Goodbye), + Goodbye(GoodbyeReason), BeaconBlockRoots(BeaconBlockRootsRequest), BeaconBlockHeaders(BeaconBlockHeadersRequest), BeaconBlockBodies(BeaconBlockBodiesRequest), @@ -202,28 +204,12 @@ impl RPCRequest { } } - /// Encodes the Request object based on the negotiated protocol. - pub fn encode(&self, protocol: ProtocolId) -> Result, RPCError> { - // Match on the encoding and in the future, the version - match protocol.encoding.as_str() { - "ssz" => Ok(self.ssz_encode()), - _ => { - return Err(RPCError::Custom(format!( - "Unknown Encoding: {}", - protocol.encoding - ))) - } - } - } - - fn ssz_encode(&self) -> Vec { + /// This specifies whether a stream should remain open and await a response, given a request. + /// A GOODBYE request has no response. + pub fn expect_response(&self) -> bool { match self { - RPCRequest::Hello(req) => req.as_ssz_bytes(), - RPCRequest::Goodbye(req) => req.as_ssz_bytes(), - RPCRequest::BeaconBlockRoots(req) => req.as_ssz_bytes(), - RPCRequest::BeaconBlockHeaders(req) => req.as_ssz_bytes(), - RPCRequest::BeaconBlockBodies(req) => req.as_ssz_bytes(), - RPCRequest::BeaconChainState(req) => req.as_ssz_bytes(), + RPCRequest::Goodbye(_) => false, + _ => true, } } } @@ -232,7 +218,7 @@ impl RPCRequest { /* Outbound upgrades */ -type OutboundFramed = Framed, OutboundCodec>; +pub type OutboundFramed = Framed, OutboundCodec>; impl OutboundUpgrade for RPCRequest where @@ -323,11 +309,11 @@ impl std::error::Error for RPCError { fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { match *self { RPCError::ReadError(ref err) => Some(err), - RPCError::SSZDecodeError(ref err) => None, - RPCError::InvalidProtocol(ref err) => None, + RPCError::SSZDecodeError(_) => None, + RPCError::InvalidProtocol(_) => None, RPCError::IoError(ref err) => Some(err), RPCError::StreamTimeout => None, - RPCError::Custom(ref err) => None, + RPCError::Custom(_) => None, } } } diff --git a/beacon_node/eth2-libp2p/src/service.rs b/beacon_node/eth2-libp2p/src/service.rs index 69f8a1ca5..79f92c36a 100644 --- a/beacon_node/eth2-libp2p/src/service.rs +++ b/beacon_node/eth2-libp2p/src/service.rs @@ -133,6 +133,9 @@ impl Stream for Service { BehaviourEvent::PeerDialed(peer_id) => { return Ok(Async::Ready(Some(Libp2pEvent::PeerDialed(peer_id)))); } + BehaviourEvent::PeerDisconnected(peer_id) => { + return Ok(Async::Ready(Some(Libp2pEvent::PeerDisconnected(peer_id)))); + } }, Ok(Async::Ready(None)) => unreachable!("Swarm stream shouldn't end"), Ok(Async::NotReady) => break, @@ -182,6 +185,8 @@ pub enum Libp2pEvent { RPC(PeerId, RPCEvent), /// Initiated the connection to a new peer. PeerDialed(PeerId), + /// A peer has disconnected. + PeerDisconnected(PeerId), /// Received pubsub message. PubsubMessage { source: PeerId, diff --git a/beacon_node/network/src/message_handler.rs b/beacon_node/network/src/message_handler.rs index 40a396c3b..e2ea4b0e5 100644 --- a/beacon_node/network/src/message_handler.rs +++ b/beacon_node/network/src/message_handler.rs @@ -3,22 +3,19 @@ use crate::service::{NetworkMessage, OutgoingMessage}; use crate::sync::SimpleSync; use beacon_chain::{BeaconChain, BeaconChainTypes}; use crossbeam_channel::{unbounded as channel, Sender}; +use eth2_libp2p::rpc::methods::*; use eth2_libp2p::{ behaviour::PubsubMessage, - rpc::{methods::GoodbyeReason, RPCRequest, RPCResponse, RequestId}, + rpc::{RPCError, RPCErrorResponse, RPCRequest, RPCResponse, RequestId}, PeerId, RPCEvent, }; use futures::future; -use slog::{debug, warn}; +use slog::{debug, error, warn}; +use ssz::Decode; use std::collections::HashMap; use std::sync::Arc; use std::time::Instant; -/// Timeout for RPC requests. -// const REQUEST_TIMEOUT: Duration = Duration::from_secs(30); -/// Timeout before banning a peer for non-identification. -// const HELLO_TIMEOUT: Duration = Duration::from_secs(30); - /// Handles messages received from the network and client and organises syncing. pub struct MessageHandler { /// Currently loaded and initialised beacon chain. @@ -32,7 +29,7 @@ pub struct MessageHandler { } /// Types of messages the handler can receive. -#[derive(Debug, Clone)] +#[derive(Debug)] pub enum HandlerMessage { /// We have initiated a connection to a new peer. PeerDialed(PeerId), @@ -87,6 +84,10 @@ impl MessageHandler { HandlerMessage::PeerDialed(peer_id) => { self.sync.on_connect(peer_id, &mut self.network_context); } + // A peer has disconnected + HandlerMessage::PeerDisconnected(peer_id) => { + self.sync.on_disconnect(peer_id); + } // we have received an RPC message request/response HandlerMessage::RPC(peer_id, rpc_event) => { self.handle_rpc_message(peer_id, rpc_event); @@ -105,9 +106,9 @@ impl MessageHandler { /// Handle RPC messages fn handle_rpc_message(&mut self, peer_id: PeerId, rpc_message: RPCEvent) { match rpc_message { - RPCEvent::Request { id, body, .. // TODO: Clean up RPC Message types, have a cleaner type by this point. - } => self.handle_rpc_request(peer_id, id, body), - RPCEvent::Response { id, result, .. } => self.handle_rpc_response(peer_id, id, result), + RPCEvent::Request(id, req) => self.handle_rpc_request(peer_id, id, req), + RPCEvent::Response(id, resp) => self.handle_rpc_response(peer_id, id, resp), + RPCEvent::Error(id, error) => self.handle_rpc_error(peer_id, id, error), } } @@ -150,58 +151,137 @@ impl MessageHandler { /// An RPC response has been received from the network. // we match on id and ignore responses past the timeout. - fn handle_rpc_response(&mut self, peer_id: PeerId, id: RequestId, response: RPCResponse) { - // if response id is not related to a request, ignore (likely RPC timeout) + fn handle_rpc_response( + &mut self, + peer_id: PeerId, + id: RequestId, + error_response: RPCErrorResponse, + ) { + //TODO: Potentially do not need to keep track of this at all. This has all been shifted + //into libp2p stack. Tracking Id's will only be necessary if a response is important + //relative to a specific request. Note: BeaconBlockBodies already returns with the data + //associated with its request. + // Currently leave this here for testing, to ensure it is redundant. if self .network_context .outstanding_outgoing_request_ids .remove(&(peer_id.clone(), id)) .is_none() { - warn!( + // This should never happen. The RPC layer handles all timeouts and ensures a response + // matches a request. + debug_assert!(false); + + error!( self.log, "Unknown ResponseId for incoming RPCRequest"; "peer" => format!("{:?}", peer_id), - "request_id" => format!("{:?}", id) + "request_id" => format!("{}", id) ); return; } - match response { - RPCResponse::Hello(hello_message) => { - self.sync - .on_hello_response(peer_id, hello_message, &mut self.network_context); + // an error could have occurred. + // TODO: Handle Error gracefully + match error_response { + RPCErrorResponse::EncodingError => { + warn!(self.log, "Encoding Error"; "peer" => format!("{:?}", peer_id), "request_id" => format!("{}",id)) } - RPCResponse::BeaconBlockRoots(response) => { - self.sync.on_beacon_block_roots_response( - peer_id, - response, - &mut self.network_context, - ); + RPCErrorResponse::InvalidRequest(error) => { + warn!(self.log, "";"peer" => format!("{:?}", peer_id), "Invalid Request" => error.as_string()) } - RPCResponse::BeaconBlockHeaders(response) => { - self.sync.on_beacon_block_headers_response( - peer_id, - response, - &mut self.network_context, - ); + RPCErrorResponse::ServerError(error) => { + warn!(self.log, "";"peer" => format!("{:?}", peer_id), "Server Error" => error.as_string()) } - RPCResponse::BeaconBlockBodies(response) => { - self.sync.on_beacon_block_bodies_response( - peer_id, - response, - &mut self.network_context, - ); + RPCErrorResponse::Unknown(error) => { + warn!(self.log, "";"peer" => format!("{:?}", peer_id), "Unknown Error" => error.as_string()) } - RPCResponse::BeaconChainState(_) => { - // We do not implement this endpoint, it is not required and will only likely be - // useful for light-client support in later phases. - // - // Theoretically, we shouldn't reach this code because we should never send a - // beacon state RPC request. - warn!(self.log, "BeaconChainState RPC call is not supported."); + RPCErrorResponse::Success(response) => { + match response { + RPCResponse::Hello(hello_message) => { + self.sync.on_hello_response( + peer_id, + hello_message, + &mut self.network_context, + ); + } + RPCResponse::BeaconBlockRoots(response) => { + self.sync.on_beacon_block_roots_response( + peer_id, + response, + &mut self.network_context, + ); + } + RPCResponse::BeaconBlockHeaders(response) => { + if let Some(decoded_block_headers) = self.decode_block_headers(response) { + self.sync.on_beacon_block_headers_response( + peer_id, + decoded_block_headers, + &mut self.network_context, + ); + } else { + warn!(self.log, "Peer sent invalid block headers";"peer" => format!("{:?}", peer_id)) + } + } + RPCResponse::BeaconBlockBodies(response) => { + if let Some(decoded_block_bodies) = self.decode_block_bodies(response) { + self.sync.on_beacon_block_bodies_response( + peer_id, + decoded_block_bodies, + &mut self.network_context, + ); + } else { + warn!(self.log, "Peer sent invalid block bodies";"peer" => format!("{:?}", peer_id)) + } + } + RPCResponse::BeaconChainState(_) => { + // We do not implement this endpoint, it is not required and will only likely be + // useful for light-client support in later phases. + // + // Theoretically, we shouldn't reach this code because we should never send a + // beacon state RPC request. + warn!(self.log, "BeaconChainState RPC call is not supported."); + } + } } - }; + } + } + + /// Verifies and decodes the ssz-encoded block bodies received from peers. + fn decode_block_bodies( + &self, + bodies_response: BeaconBlockBodiesResponse, + ) -> Option { + //TODO: Implement faster block verification before decoding entirely + let simple_decoded_bodies = + EncodeableBeaconBlockBodiesResponse::from_ssz_bytes(&bodies_response.block_bodies); + + //TODO: Potentially improve the types used here for SSZ encoding/decoding + if let Ok(simple_decoded_bodies) = simple_decoded_bodies { + Some(DecodedBeaconBlockBodiesResponse { + block_roots: bodies_response + .block_roots + .expect("Responses must have associated roots"), + block_bodies: simple_decoded_bodies.block_bodies, + }) + } else { + None + } + } + + /// Verifies and decodes the ssz-encoded block headers received from peers. + fn decode_block_headers( + &self, + headers_response: BeaconBlockHeadersResponse, + ) -> Option { + //TODO: Implement faster header verification before decoding entirely + EncodeableBeaconBlockHeadersResponse::from_ssz_bytes(&headers_response.headers).ok() + } + + /// Handle various RPC errors + fn handle_rpc_error(&mut self, peer_id: PeerId, request_id: RequestId, error: RPCError) { + //TODO: Handle error correctly + warn!(self.log, "RPC Error"; "Peer" => format!("{:?}", peer_id), "Request Id" => format!("{}", request_id), "Error" => format!("{:?}", error)); } /// Handle RPC messages @@ -252,16 +332,10 @@ impl NetworkContext { self.outstanding_outgoing_request_ids .insert((peer_id.clone(), id), Instant::now()); - self.send_rpc_event( - peer_id, - RPCEvent::Request { - id, - method_id: rpc_request.method_id(), - body: rpc_request, - }, - ); + self.send_rpc_event(peer_id, RPCEvent::Request(id, rpc_request)); } + //TODO: Handle Error responses pub fn send_rpc_response( &mut self, peer_id: PeerId, @@ -270,11 +344,7 @@ impl NetworkContext { ) { self.send_rpc_event( peer_id, - RPCEvent::Response { - id: request_id, - method_id: rpc_response.method_id(), - result: rpc_response, - }, + RPCEvent::Response(request_id, RPCErrorResponse::Success(rpc_response)), ); } @@ -291,7 +361,6 @@ impl NetworkContext { "Could not send RPC message to the network service" ) }); - // } /// Returns the next `RequestId` for sending an `RPCRequest` to the `peer_id`. @@ -299,9 +368,9 @@ impl NetworkContext { let next_id = self .outgoing_request_ids .entry(peer_id.clone()) - .and_modify(RequestId::increment) - .or_insert_with(|| RequestId::from(1)); + .and_modify(|id| *id += 1) + .or_insert_with(|| 0); - next_id.previous() + *next_id } } diff --git a/beacon_node/network/src/service.rs b/beacon_node/network/src/service.rs index b2ecc1a0b..0b8b70651 100644 --- a/beacon_node/network/src/service.rs +++ b/beacon_node/network/src/service.rs @@ -118,13 +118,19 @@ fn network_service( trace!(log, "RPC Event: RPC message received: {:?}", rpc_event); message_handler_send .send(HandlerMessage::RPC(peer_id, rpc_event)) - .map_err(|_| "failed to send rpc to handler")?; + .map_err(|_| "Failed to send rpc to handler")?; } Libp2pEvent::PeerDialed(peer_id) => { debug!(log, "Peer Dialed: {:?}", peer_id); message_handler_send .send(HandlerMessage::PeerDialed(peer_id)) - .map_err(|_| "failed to send rpc to handler")?; + .map_err(|_| "Failed to send PeerDialed to handler")?; + } + Libp2pEvent::PeerDisconnected(peer_id) => { + debug!(log, "Peer Disconnected: {:?}", peer_id); + message_handler_send + .send(HandlerMessage::PeerDisconnected(peer_id)) + .map_err(|_| "Failed to send PeerDisconnected to handler")?; } Libp2pEvent::PubsubMessage { source, message, .. @@ -176,7 +182,7 @@ fn network_service( } /// Types of messages that the network service can receive. -#[derive(Debug, Clone)] +#[derive(Debug)] pub enum NetworkMessage { /// Send a message to libp2p service. //TODO: Define typing for messages across the wire @@ -189,7 +195,7 @@ pub enum NetworkMessage { } /// Type of outgoing messages that can be sent through the network service. -#[derive(Debug, Clone)] +#[derive(Debug)] pub enum OutgoingMessage { /// Send an RPC request/response. RPC(RPCEvent), diff --git a/beacon_node/network/src/sync/simple_sync.rs b/beacon_node/network/src/sync/simple_sync.rs index 925221673..fd79c10f0 100644 --- a/beacon_node/network/src/sync/simple_sync.rs +++ b/beacon_node/network/src/sync/simple_sync.rs @@ -5,6 +5,7 @@ use eth2_libp2p::rpc::methods::*; use eth2_libp2p::rpc::{RPCRequest, RPCResponse, RequestId}; use eth2_libp2p::PeerId; use slog::{debug, error, info, o, trace, warn}; +use ssz::Encode; use std::collections::HashMap; use std::sync::Arc; use std::time::Duration; @@ -30,6 +31,7 @@ const SHOULD_NOT_FORWARD_GOSSIP_BLOCK: bool = false; #[derive(Clone, Copy, Debug)] pub struct PeerSyncInfo { network_id: u8, + chain_id: u64, latest_finalized_root: Hash256, latest_finalized_epoch: Epoch, best_root: Hash256, @@ -40,6 +42,7 @@ impl From for PeerSyncInfo { fn from(hello: HelloMessage) -> PeerSyncInfo { PeerSyncInfo { network_id: hello.network_id, + chain_id: hello.chain_id, latest_finalized_root: hello.latest_finalized_root, latest_finalized_epoch: hello.latest_finalized_epoch, best_root: hello.best_root, @@ -107,6 +110,17 @@ impl SimpleSync { self.known_peers.remove(&peer_id); } + /// Handle a peer disconnect. + /// + /// Removes the peer from `known_peers`. + pub fn on_disconnect(&mut self, peer_id: PeerId) { + info!( + self.log, "Peer Disconnected"; + "peer" => format!("{:?}", peer_id), + ); + self.known_peers.remove(&peer_id); + } + /// Handle the connection of a new peer. /// /// Sends a `Hello` message to the peer. @@ -200,7 +214,7 @@ impl SimpleSync { // If we have equal or better finalized epochs and best slots, we require nothing else from // this peer. // - // We make an exception when our best slot is 0. Best slot does not indicate wether or + // We make an exception when our best slot is 0. Best slot does not indicate whether or // not there is a block at slot zero. if (remote.latest_finalized_epoch <= local.latest_finalized_epoch) && (remote.best_slot <= local.best_slot) @@ -398,6 +412,13 @@ impl SimpleSync { }) .collect(); + // ssz-encode the headers + //TODO: Make this more elegant + let headers = { + let resp = EncodeableBeaconBlockHeadersResponse { headers }; + resp.as_ssz_bytes() + }; + network.send_rpc_response( peer_id, request_id, @@ -409,7 +430,7 @@ impl SimpleSync { pub fn on_beacon_block_headers_response( &mut self, peer_id: PeerId, - res: BeaconBlockHeadersResponse, + res: EncodeableBeaconBlockHeadersResponse, network: &mut NetworkContext, ) { debug!( @@ -471,10 +492,19 @@ impl SimpleSync { "returned" => block_bodies.len(), ); + //TODO: Elegant ssz encoding. Either here or in the message handler + let bytes = { + let resp = EncodeableBeaconBlockBodiesResponse { block_bodies }; + resp.as_ssz_bytes() + }; + network.send_rpc_response( peer_id, request_id, - RPCResponse::BeaconBlockBodies(BeaconBlockBodiesResponse { block_bodies }), + RPCResponse::BeaconBlockBodies(BeaconBlockBodiesResponse { + block_bodies: bytes, + block_roots: None, + }), ) } @@ -482,7 +512,7 @@ impl SimpleSync { pub fn on_beacon_block_bodies_response( &mut self, peer_id: PeerId, - res: BeaconBlockBodiesResponse, + res: DecodedBeaconBlockBodiesResponse, network: &mut NetworkContext, ) { debug!( @@ -802,7 +832,9 @@ fn hello_message(beacon_chain: &BeaconChain) -> HelloMes let state = &beacon_chain.head().beacon_state; HelloMessage { + //TODO: Correctly define the chain/network id network_id: spec.chain_id, + chain_id: spec.chain_id as u64, latest_finalized_root: state.finalized_root, latest_finalized_epoch: state.finalized_epoch, best_root: beacon_chain.head().beacon_block_root,