diff --git a/beacon_node/client/src/notifier.rs b/beacon_node/client/src/notifier.rs index 987f064a4..ff6c1b230 100644 --- a/beacon_node/client/src/notifier.rs +++ b/beacon_node/client/src/notifier.rs @@ -8,7 +8,7 @@ use tokio::runtime::TaskExecutor; use tokio::timer::Interval; /// The interval between heartbeat events. -pub const HEARTBEAT_INTERVAL_SECONDS: u64 = 5; +pub const HEARTBEAT_INTERVAL_SECONDS: u64 = 15; /// Spawns a thread that can be used to run code periodically, on `HEARTBEAT_INTERVAL_SECONDS` /// durations. @@ -25,19 +25,22 @@ pub fn run( Duration::from_secs(HEARTBEAT_INTERVAL_SECONDS), ); - let _log = client.log.new(o!("Service" => "Notifier")); + let log = client.log.new(o!("Service" => "Notifier")); + + let libp2p = client.network.libp2p_service(); + + let heartbeat = move |_| { + // Notify the number of connected nodes + // Panic if libp2p is poisoned + debug!(log, ""; "Connected Peers" => libp2p.lock().swarm.connected_peers()); - let heartbeat = |_| { - // There is not presently any heartbeat logic. - // - // We leave this function empty for future use. Ok(()) }; // map error and spawn - let log = client.log.clone(); + let err_log = client.log.clone(); let heartbeat_interval = interval - .map_err(move |e| debug!(log, "Timer error {}", e)) + .map_err(move |e| debug!(err_log, "Timer error {}", e)) .for_each(heartbeat); executor.spawn(exit.until(heartbeat_interval).map(|_| ())); diff --git a/beacon_node/eth2-libp2p/Cargo.toml b/beacon_node/eth2-libp2p/Cargo.toml index 2fbedf780..cd894797d 100644 --- a/beacon_node/eth2-libp2p/Cargo.toml +++ b/beacon_node/eth2-libp2p/Cargo.toml @@ -22,3 +22,8 @@ futures = "0.1.25" error-chain = "0.12.0" tokio-timer = "0.2.10" dirs = "2.0.1" +tokio-io = "0.1.12" +smallvec = "0.6.10" +fnv = "1.0.6" +unsigned-varint = "0.2.2" +bytes = "0.4.12" diff --git a/beacon_node/eth2-libp2p/src/behaviour.rs b/beacon_node/eth2-libp2p/src/behaviour.rs index 4e4cf24f3..9a30a60b9 100644 --- a/beacon_node/eth2-libp2p/src/behaviour.rs +++ b/beacon_node/eth2-libp2p/src/behaviour.rs @@ -1,5 +1,5 @@ use crate::discovery::Discovery; -use crate::rpc::{RPCEvent, RPCMessage, Rpc}; +use crate::rpc::{RPCEvent, RPCMessage, RPC}; use crate::{error, NetworkConfig}; use crate::{Topic, TopicHash}; use futures::prelude::*; @@ -29,7 +29,7 @@ pub struct Behaviour { /// The routing pub-sub mechanism for eth2. gossipsub: Gossipsub, /// The serenity RPC specified in the wire-0 protocol. - serenity_rpc: Rpc, + serenity_rpc: RPC, /// Keep regular connection to peers and disconnect if absent. ping: Ping, /// Kademlia for peer discovery. @@ -57,7 +57,7 @@ impl Behaviour { .with_keep_alive(false); Ok(Behaviour { - serenity_rpc: Rpc::new(log), + serenity_rpc: RPC::new(log), gossipsub: Gossipsub::new(local_peer_id.clone(), net_conf.gs_config.clone()), discovery: Discovery::new(local_key, net_conf, log)?, ping: Ping::new(ping_config), @@ -109,6 +109,9 @@ impl NetworkBehaviourEventProcess { self.events.push(BehaviourEvent::PeerDialed(peer_id)) } + RPCMessage::PeerDisconnected(peer_id) => { + self.events.push(BehaviourEvent::PeerDisconnected(peer_id)) + } RPCMessage::RPC(peer_id, rpc_event) => { self.events.push(BehaviourEvent::RPC(peer_id, rpc_event)) } @@ -168,12 +171,18 @@ impl Behaviour { pub fn send_rpc(&mut self, peer_id: PeerId, rpc_event: RPCEvent) { self.serenity_rpc.send_rpc(peer_id, rpc_event); } + + /* Discovery / Peer management functions */ + pub fn connected_peers(&self) -> usize { + self.discovery.connected_peers() + } } /// The types of events than can be obtained from polling the behaviour. pub enum BehaviourEvent { RPC(PeerId, RPCEvent), PeerDialed(PeerId), + PeerDisconnected(PeerId), GossipMessage { source: PeerId, topics: Vec, diff --git a/beacon_node/eth2-libp2p/src/discovery.rs b/beacon_node/eth2-libp2p/src/discovery.rs index 239815f0c..8523d694a 100644 --- a/beacon_node/eth2-libp2p/src/discovery.rs +++ b/beacon_node/eth2-libp2p/src/discovery.rs @@ -77,6 +77,7 @@ impl Discovery { info!(log, "Local ENR: {}", local_enr.to_base64()); debug!(log, "Local Node Id: {}", local_enr.node_id()); + debug!(log, "Local ENR seq: {}", local_enr.seq()); let mut discovery = Discv5::new(local_enr, local_key.clone(), config.listen_address) .map_err(|e| format!("Discv5 service failed: {:?}", e))?; @@ -115,6 +116,11 @@ impl Discovery { self.discovery.add_enr(enr); } + /// The current number of connected libp2p peers. + pub fn connected_peers(&self) -> usize { + self.connected_peers.len() + } + /// Search for new peers using the underlying discovery mechanism. fn find_peers(&mut self) { // pick a random NodeId diff --git a/beacon_node/eth2-libp2p/src/rpc/codec/base.rs b/beacon_node/eth2-libp2p/src/rpc/codec/base.rs new file mode 100644 index 000000000..639a8a730 --- /dev/null +++ b/beacon_node/eth2-libp2p/src/rpc/codec/base.rs @@ -0,0 +1,135 @@ +//! This handles the various supported encoding mechanism for the Eth 2.0 RPC. + +use crate::rpc::{ErrorMessage, RPCErrorResponse, RPCRequest, RPCResponse}; +use bytes::BufMut; +use bytes::BytesMut; +use tokio::codec::{Decoder, Encoder}; + +pub trait OutboundCodec: Encoder + Decoder { + type ErrorType; + + fn decode_error( + &mut self, + src: &mut BytesMut, + ) -> Result, ::Error>; +} + +pub struct BaseInboundCodec +where + TCodec: Encoder + Decoder, +{ + /// Inner codec for handling various encodings + inner: TCodec, +} + +impl BaseInboundCodec +where + TCodec: Encoder + Decoder, +{ + pub fn new(codec: TCodec) -> Self { + BaseInboundCodec { inner: codec } + } +} + +pub struct BaseOutboundCodec +where + TOutboundCodec: OutboundCodec, +{ + /// Inner codec for handling various encodings + inner: TOutboundCodec, + /// Optimisation for decoding. True if the response code has been read and we are awaiting a + /// response. + response_code: Option, +} + +impl BaseOutboundCodec +where + TOutboundCodec: OutboundCodec, +{ + pub fn new(codec: TOutboundCodec) -> Self { + BaseOutboundCodec { + inner: codec, + response_code: None, + } + } +} + +impl Encoder for BaseInboundCodec +where + TCodec: Decoder + Encoder, +{ + type Item = RPCErrorResponse; + type Error = ::Error; + + fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> { + dst.clear(); + dst.reserve(1); + dst.put_u8(item.as_u8()); + return self.inner.encode(item, dst); + } +} + +impl Decoder for BaseInboundCodec +where + TCodec: Encoder + Decoder, +{ + type Item = RPCRequest; + type Error = ::Error; + + fn decode(&mut self, src: &mut BytesMut) -> Result, Self::Error> { + self.inner.decode(src) + } +} + +impl Encoder for BaseOutboundCodec +where + TCodec: OutboundCodec + Encoder, +{ + type Item = RPCRequest; + type Error = ::Error; + + fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> { + self.inner.encode(item, dst) + } +} + +impl Decoder for BaseOutboundCodec +where + TCodec: OutboundCodec + Decoder, +{ + type Item = RPCErrorResponse; + type Error = ::Error; + + fn decode(&mut self, src: &mut BytesMut) -> Result, Self::Error> { + let response_code = { + if let Some(resp_code) = self.response_code { + resp_code + } else { + // buffer should not be empty + debug_assert!(!src.is_empty()); + + let resp_byte = src.split_to(1); + let mut resp_code_byte = [0; 1]; + resp_code_byte.copy_from_slice(&resp_byte); + + let resp_code = u8::from_be_bytes(resp_code_byte); + self.response_code = Some(resp_code); + resp_code + } + }; + + if RPCErrorResponse::is_response(response_code) { + // decode an actual response + return self + .inner + .decode(src) + .map(|r| r.map(|resp| RPCErrorResponse::Success(resp))); + } else { + // decode an error + return self + .inner + .decode_error(src) + .map(|r| r.map(|resp| RPCErrorResponse::from_error(response_code, resp))); + } + } +} diff --git a/beacon_node/eth2-libp2p/src/rpc/codec/mod.rs b/beacon_node/eth2-libp2p/src/rpc/codec/mod.rs new file mode 100644 index 000000000..d0d1b650b --- /dev/null +++ b/beacon_node/eth2-libp2p/src/rpc/codec/mod.rs @@ -0,0 +1,62 @@ +pub(crate) mod base; +pub(crate) mod ssz; + +use self::base::{BaseInboundCodec, BaseOutboundCodec}; +use self::ssz::{SSZInboundCodec, SSZOutboundCodec}; +use crate::rpc::protocol::RPCError; +use crate::rpc::{RPCErrorResponse, RPCRequest}; +use bytes::BytesMut; +use tokio::codec::{Decoder, Encoder}; + +// Known types of codecs +pub enum InboundCodec { + SSZ(BaseInboundCodec), +} + +pub enum OutboundCodec { + SSZ(BaseOutboundCodec), +} + +impl Encoder for InboundCodec { + type Item = RPCErrorResponse; + type Error = RPCError; + + fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> { + match self { + InboundCodec::SSZ(codec) => codec.encode(item, dst), + } + } +} + +impl Decoder for InboundCodec { + type Item = RPCRequest; + type Error = RPCError; + + fn decode(&mut self, src: &mut BytesMut) -> Result, Self::Error> { + match self { + InboundCodec::SSZ(codec) => codec.decode(src), + } + } +} + +impl Encoder for OutboundCodec { + type Item = RPCRequest; + type Error = RPCError; + + fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> { + match self { + OutboundCodec::SSZ(codec) => codec.encode(item, dst), + } + } +} + +impl Decoder for OutboundCodec { + type Item = RPCErrorResponse; + type Error = RPCError; + + fn decode(&mut self, src: &mut BytesMut) -> Result, Self::Error> { + match self { + OutboundCodec::SSZ(codec) => codec.decode(src), + } + } +} diff --git a/beacon_node/eth2-libp2p/src/rpc/codec/ssz.rs b/beacon_node/eth2-libp2p/src/rpc/codec/ssz.rs new file mode 100644 index 000000000..8e2bdaa64 --- /dev/null +++ b/beacon_node/eth2-libp2p/src/rpc/codec/ssz.rs @@ -0,0 +1,242 @@ +use crate::rpc::methods::*; +use crate::rpc::{ + codec::base::OutboundCodec, + protocol::{ProtocolId, RPCError}, +}; +use crate::rpc::{ErrorMessage, RPCErrorResponse, RPCRequest, RPCResponse}; +use bytes::{Bytes, BytesMut}; +use ssz::{Decode, Encode}; +use tokio::codec::{Decoder, Encoder}; +use unsigned_varint::codec::UviBytes; + +/* Inbound Codec */ + +pub struct SSZInboundCodec { + inner: UviBytes, + protocol: ProtocolId, +} + +impl SSZInboundCodec { + pub fn new(protocol: ProtocolId, max_packet_size: usize) -> Self { + let mut uvi_codec = UviBytes::default(); + uvi_codec.set_max_len(max_packet_size); + + // this encoding only applies to ssz. + debug_assert!(protocol.encoding.as_str() == "ssz"); + + SSZInboundCodec { + inner: uvi_codec, + protocol, + } + } +} + +// Encoder for inbound +impl Encoder for SSZInboundCodec { + type Item = RPCErrorResponse; + type Error = RPCError; + + fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> { + let bytes = match item { + RPCErrorResponse::Success(resp) => { + match resp { + RPCResponse::Hello(res) => res.as_ssz_bytes(), + RPCResponse::BeaconBlockRoots(res) => res.as_ssz_bytes(), + RPCResponse::BeaconBlockHeaders(res) => res.headers, // already raw bytes + RPCResponse::BeaconBlockBodies(res) => res.block_bodies, // already raw bytes + RPCResponse::BeaconChainState(res) => res.as_ssz_bytes(), + } + } + RPCErrorResponse::InvalidRequest(err) => err.as_ssz_bytes(), + RPCErrorResponse::ServerError(err) => err.as_ssz_bytes(), + RPCErrorResponse::Unknown(err) => err.as_ssz_bytes(), + }; + + if !bytes.is_empty() { + // length-prefix and return + return self + .inner + .encode(Bytes::from(bytes), dst) + .map_err(RPCError::from); + } + Ok(()) + } +} + +// Decoder for inbound +impl Decoder for SSZInboundCodec { + type Item = RPCRequest; + type Error = RPCError; + + fn decode(&mut self, src: &mut BytesMut) -> Result, Self::Error> { + match self.inner.decode(src).map_err(RPCError::from) { + Ok(Some(packet)) => match self.protocol.message_name.as_str() { + "hello" => match self.protocol.version.as_str() { + "1.0.0" => Ok(Some(RPCRequest::Hello(HelloMessage::from_ssz_bytes( + &packet, + )?))), + _ => Err(RPCError::InvalidProtocol("Unknown HELLO version")), + }, + "goodbye" => match self.protocol.version.as_str() { + "1.0.0" => Ok(Some(RPCRequest::Goodbye(GoodbyeReason::from_ssz_bytes( + &packet, + )?))), + _ => Err(RPCError::InvalidProtocol( + "Unknown GOODBYE version.as_str()", + )), + }, + "beacon_block_roots" => match self.protocol.version.as_str() { + "1.0.0" => Ok(Some(RPCRequest::BeaconBlockRoots( + BeaconBlockRootsRequest::from_ssz_bytes(&packet)?, + ))), + _ => Err(RPCError::InvalidProtocol( + "Unknown BEACON_BLOCK_ROOTS version.", + )), + }, + "beacon_block_headers" => match self.protocol.version.as_str() { + "1.0.0" => Ok(Some(RPCRequest::BeaconBlockHeaders( + BeaconBlockHeadersRequest::from_ssz_bytes(&packet)?, + ))), + _ => Err(RPCError::InvalidProtocol( + "Unknown BEACON_BLOCK_HEADERS version.", + )), + }, + "beacon_block_bodies" => match self.protocol.version.as_str() { + "1.0.0" => Ok(Some(RPCRequest::BeaconBlockBodies( + BeaconBlockBodiesRequest::from_ssz_bytes(&packet)?, + ))), + _ => Err(RPCError::InvalidProtocol( + "Unknown BEACON_BLOCK_BODIES version.", + )), + }, + "beacon_chain_state" => match self.protocol.version.as_str() { + "1.0.0" => Ok(Some(RPCRequest::BeaconChainState( + BeaconChainStateRequest::from_ssz_bytes(&packet)?, + ))), + _ => Err(RPCError::InvalidProtocol( + "Unknown BEACON_CHAIN_STATE version.", + )), + }, + _ => Err(RPCError::InvalidProtocol("Unknown message name.")), + }, + Ok(None) => Ok(None), + Err(e) => Err(e), + } + } +} + +/* Outbound Codec */ + +pub struct SSZOutboundCodec { + inner: UviBytes, + protocol: ProtocolId, +} + +impl SSZOutboundCodec { + pub fn new(protocol: ProtocolId, max_packet_size: usize) -> Self { + let mut uvi_codec = UviBytes::default(); + uvi_codec.set_max_len(max_packet_size); + + // this encoding only applies to ssz. + debug_assert!(protocol.encoding.as_str() == "ssz"); + + SSZOutboundCodec { + inner: uvi_codec, + protocol, + } + } +} + +// Encoder for outbound +impl Encoder for SSZOutboundCodec { + type Item = RPCRequest; + type Error = RPCError; + + fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> { + let bytes = match item { + RPCRequest::Hello(req) => req.as_ssz_bytes(), + RPCRequest::Goodbye(req) => req.as_ssz_bytes(), + RPCRequest::BeaconBlockRoots(req) => req.as_ssz_bytes(), + RPCRequest::BeaconBlockHeaders(req) => req.as_ssz_bytes(), + RPCRequest::BeaconBlockBodies(req) => req.as_ssz_bytes(), + RPCRequest::BeaconChainState(req) => req.as_ssz_bytes(), + }; + // length-prefix + self.inner + .encode(bytes::Bytes::from(bytes), dst) + .map_err(RPCError::from) + } +} + +// Decoder for outbound +impl Decoder for SSZOutboundCodec { + type Item = RPCResponse; + type Error = RPCError; + + fn decode(&mut self, src: &mut BytesMut) -> Result, Self::Error> { + match self.inner.decode(src).map_err(RPCError::from) { + Ok(Some(packet)) => match self.protocol.message_name.as_str() { + "hello" => match self.protocol.version.as_str() { + "1.0.0" => Ok(Some(RPCResponse::Hello(HelloMessage::from_ssz_bytes( + &packet, + )?))), + _ => Err(RPCError::InvalidProtocol("Unknown HELLO version.")), + }, + "goodbye" => Err(RPCError::InvalidProtocol("GOODBYE doesn't have a response")), + "beacon_block_roots" => match self.protocol.version.as_str() { + "1.0.0" => Ok(Some(RPCResponse::BeaconBlockRoots( + BeaconBlockRootsResponse::from_ssz_bytes(&packet)?, + ))), + _ => Err(RPCError::InvalidProtocol( + "Unknown BEACON_BLOCK_ROOTS version.", + )), + }, + "beacon_block_headers" => match self.protocol.version.as_str() { + "1.0.0" => Ok(Some(RPCResponse::BeaconBlockHeaders( + BeaconBlockHeadersResponse { + headers: packet.to_vec(), + }, + ))), + _ => Err(RPCError::InvalidProtocol( + "Unknown BEACON_BLOCK_HEADERS version.", + )), + }, + "beacon_block_bodies" => match self.protocol.version.as_str() { + "1.0.0" => Ok(Some(RPCResponse::BeaconBlockBodies( + BeaconBlockBodiesResponse { + block_bodies: packet.to_vec(), + // this gets filled in the protocol handler + block_roots: None, + }, + ))), + _ => Err(RPCError::InvalidProtocol( + "Unknown BEACON_BLOCK_BODIES version.", + )), + }, + "beacon_chain_state" => match self.protocol.version.as_str() { + "1.0.0" => Ok(Some(RPCResponse::BeaconChainState( + BeaconChainStateResponse::from_ssz_bytes(&packet)?, + ))), + _ => Err(RPCError::InvalidProtocol( + "Unknown BEACON_CHAIN_STATE version.", + )), + }, + _ => Err(RPCError::InvalidProtocol("Unknown method")), + }, + Ok(None) => Ok(None), + Err(e) => Err(e), + } + } +} + +impl OutboundCodec for SSZOutboundCodec { + type ErrorType = ErrorMessage; + + fn decode_error(&mut self, src: &mut BytesMut) -> Result, RPCError> { + match self.inner.decode(src).map_err(RPCError::from) { + Ok(Some(packet)) => Ok(Some(ErrorMessage::from_ssz_bytes(&packet)?)), + Ok(None) => Ok(None), + Err(e) => Err(e), + } + } +} diff --git a/beacon_node/eth2-libp2p/src/rpc/handler.rs b/beacon_node/eth2-libp2p/src/rpc/handler.rs new file mode 100644 index 000000000..df8769122 --- /dev/null +++ b/beacon_node/eth2-libp2p/src/rpc/handler.rs @@ -0,0 +1,391 @@ +use super::methods::{RPCErrorResponse, RPCResponse, RequestId}; +use super::protocol::{RPCError, RPCProtocol, RPCRequest}; +use super::RPCEvent; +use crate::rpc::protocol::{InboundFramed, OutboundFramed}; +use fnv::FnvHashMap; +use futures::prelude::*; +use libp2p::core::protocols_handler::{ + KeepAlive, ProtocolsHandler, ProtocolsHandlerEvent, ProtocolsHandlerUpgrErr, SubstreamProtocol, +}; +use libp2p::core::upgrade::{InboundUpgrade, OutboundUpgrade}; +use smallvec::SmallVec; +use std::time::{Duration, Instant}; +use tokio_io::{AsyncRead, AsyncWrite}; + +/// The time (in seconds) before a substream that is awaiting a response times out. +pub const RESPONSE_TIMEOUT: u64 = 9; + +/// Implementation of `ProtocolsHandler` for the RPC protocol. +pub struct RPCHandler +where + TSubstream: AsyncRead + AsyncWrite, +{ + /// The upgrade for inbound substreams. + listen_protocol: SubstreamProtocol, + + /// If `Some`, something bad happened and we should shut down the handler with an error. + pending_error: Option>, + + /// Queue of events to produce in `poll()`. + events_out: SmallVec<[RPCEvent; 4]>, + + /// Queue of outbound substreams to open. + dial_queue: SmallVec<[RPCEvent; 4]>, + + /// Current number of concurrent outbound substreams being opened. + dial_negotiated: u32, + + /// Map of current substreams awaiting a response to an RPC request. + waiting_substreams: FnvHashMap>, + + /// List of outbound substreams that need to be driven to completion. + substreams: Vec>, + + /// Sequential Id for waiting substreams. + current_substream_id: RequestId, + + /// Maximum number of concurrent outbound substreams being opened. Value is never modified. + max_dial_negotiated: u32, + + /// Value to return from `connection_keep_alive`. + keep_alive: KeepAlive, + + /// After the given duration has elapsed, an inactive connection will shutdown. + inactive_timeout: Duration, +} + +/// An outbound substream is waiting a response from the user. +struct WaitingResponse { + /// The framed negotiated substream. + substream: InboundFramed, + /// The time when the substream is closed. + timeout: Instant, +} + +/// State of an outbound substream. Either waiting for a response, or in the process of sending. +pub enum SubstreamState +where + TSubstream: AsyncRead + AsyncWrite, +{ + /// A response has been sent, pending writing and flush. + ResponsePendingSend { + substream: futures::sink::Send>, + }, + /// A request has been sent, and we are awaiting a response. This future is driven in the + /// handler because GOODBYE requests can be handled and responses dropped instantly. + RequestPendingResponse { + /// The framed negotiated substream. + substream: OutboundFramed, + /// Keeps track of the request id and the request to permit forming advanced responses which require + /// data from the request. + rpc_event: RPCEvent, + /// The time when the substream is closed. + timeout: Instant, + }, +} + +impl RPCHandler +where + TSubstream: AsyncRead + AsyncWrite, +{ + pub fn new( + listen_protocol: SubstreamProtocol, + inactive_timeout: Duration, + ) -> Self { + RPCHandler { + listen_protocol, + pending_error: None, + events_out: SmallVec::new(), + dial_queue: SmallVec::new(), + dial_negotiated: 0, + waiting_substreams: FnvHashMap::default(), + substreams: Vec::new(), + current_substream_id: 1, + max_dial_negotiated: 8, + keep_alive: KeepAlive::Yes, + inactive_timeout, + } + } + + /// Returns the number of pending requests. + pub fn pending_requests(&self) -> u32 { + self.dial_negotiated + self.dial_queue.len() as u32 + } + + /// Returns a reference to the listen protocol configuration. + /// + /// > **Note**: If you modify the protocol, modifications will only applies to future inbound + /// > substreams, not the ones already being negotiated. + pub fn listen_protocol_ref(&self) -> &SubstreamProtocol { + &self.listen_protocol + } + + /// Returns a mutable reference to the listen protocol configuration. + /// + /// > **Note**: If you modify the protocol, modifications will only applies to future inbound + /// > substreams, not the ones already being negotiated. + pub fn listen_protocol_mut(&mut self) -> &mut SubstreamProtocol { + &mut self.listen_protocol + } + + /// Opens an outbound substream with a request. + #[inline] + pub fn send_request(&mut self, rpc_event: RPCEvent) { + self.keep_alive = KeepAlive::Yes; + + self.dial_queue.push(rpc_event); + } +} + +impl Default for RPCHandler +where + TSubstream: AsyncRead + AsyncWrite, +{ + fn default() -> Self { + RPCHandler::new(SubstreamProtocol::new(RPCProtocol), Duration::from_secs(30)) + } +} + +impl ProtocolsHandler for RPCHandler +where + TSubstream: AsyncRead + AsyncWrite, +{ + type InEvent = RPCEvent; + type OutEvent = RPCEvent; + type Error = ProtocolsHandlerUpgrErr; + type Substream = TSubstream; + type InboundProtocol = RPCProtocol; + type OutboundProtocol = RPCRequest; + type OutboundOpenInfo = RPCEvent; // Keep track of the id and the request + + #[inline] + fn listen_protocol(&self) -> SubstreamProtocol { + self.listen_protocol.clone() + } + + #[inline] + fn inject_fully_negotiated_inbound( + &mut self, + out: >::Output, + ) { + let (req, substream) = out; + // drop the stream and return a 0 id for goodbye "requests" + if let r @ RPCRequest::Goodbye(_) = req { + self.events_out.push(RPCEvent::Request(0, r)); + return; + } + + // New inbound request. Store the stream and tag the output. + let awaiting_stream = WaitingResponse { + substream, + timeout: Instant::now() + Duration::from_secs(RESPONSE_TIMEOUT), + }; + self.waiting_substreams + .insert(self.current_substream_id, awaiting_stream); + + self.events_out + .push(RPCEvent::Request(self.current_substream_id, req)); + self.current_substream_id += 1; + } + + #[inline] + fn inject_fully_negotiated_outbound( + &mut self, + out: >::Output, + rpc_event: Self::OutboundOpenInfo, + ) { + self.dial_negotiated -= 1; + + if self.dial_negotiated == 0 + && self.dial_queue.is_empty() + && self.waiting_substreams.is_empty() + { + self.keep_alive = KeepAlive::Until(Instant::now() + self.inactive_timeout); + } else { + self.keep_alive = KeepAlive::Yes; + } + + // add the stream to substreams if we expect a response, otherwise drop the stream. + if let RPCEvent::Request(id, req) = rpc_event { + if req.expect_response() { + let awaiting_stream = SubstreamState::RequestPendingResponse { + substream: out, + rpc_event: RPCEvent::Request(id, req), + timeout: Instant::now() + Duration::from_secs(RESPONSE_TIMEOUT), + }; + + self.substreams.push(awaiting_stream); + } + } + } + + // Note: If the substream has closed due to inactivity, or the substream is in the + // wrong state a response will fail silently. + #[inline] + fn inject_event(&mut self, rpc_event: Self::InEvent) { + match rpc_event { + RPCEvent::Request(_, _) => self.send_request(rpc_event), + RPCEvent::Response(rpc_id, res) => { + // check if the stream matching the response still exists + if let Some(waiting_stream) = self.waiting_substreams.remove(&rpc_id) { + // only send one response per stream. This must be in the waiting state. + self.substreams.push(SubstreamState::ResponsePendingSend { + substream: waiting_stream.substream.send(res), + }); + } + } + RPCEvent::Error(_, _) => {} + } + } + + #[inline] + fn inject_dial_upgrade_error( + &mut self, + _: Self::OutboundOpenInfo, + error: ProtocolsHandlerUpgrErr< + >::Error, + >, + ) { + if self.pending_error.is_none() { + self.pending_error = Some(error); + } + } + + #[inline] + fn connection_keep_alive(&self) -> KeepAlive { + self.keep_alive + } + + fn poll( + &mut self, + ) -> Poll< + ProtocolsHandlerEvent, + Self::Error, + > { + if let Some(err) = self.pending_error.take() { + return Err(err); + } + + // return any events that need to be reported + if !self.events_out.is_empty() { + return Ok(Async::Ready(ProtocolsHandlerEvent::Custom( + self.events_out.remove(0), + ))); + } else { + self.events_out.shrink_to_fit(); + } + + // remove any streams that have expired + self.waiting_substreams.retain(|_k, waiting_stream| { + if Instant::now() > waiting_stream.timeout { + false + } else { + true + } + }); + + // drive streams that need to be processed + for n in (0..self.substreams.len()).rev() { + let stream = self.substreams.swap_remove(n); + match stream { + SubstreamState::ResponsePendingSend { mut substream } => { + match substream.poll() { + Ok(Async::Ready(_substream)) => {} // sent and flushed + Ok(Async::NotReady) => { + self.substreams + .push(SubstreamState::ResponsePendingSend { substream }); + } + Err(e) => { + return Ok(Async::Ready(ProtocolsHandlerEvent::Custom( + RPCEvent::Error(0, e), + ))) + } + } + } + SubstreamState::RequestPendingResponse { + mut substream, + rpc_event, + timeout, + } => match substream.poll() { + Ok(Async::Ready(response)) => { + if let Some(response) = response { + return Ok(Async::Ready(ProtocolsHandlerEvent::Custom( + build_response(rpc_event, response), + ))); + } else { + // stream closed early + return Ok(Async::Ready(ProtocolsHandlerEvent::Custom( + RPCEvent::Error( + rpc_event.id(), + RPCError::Custom("Stream Closed Early".into()), + ), + ))); + } + } + Ok(Async::NotReady) => { + if Instant::now() < timeout { + self.substreams + .push(SubstreamState::RequestPendingResponse { + substream, + rpc_event, + timeout, + }); + } + } + Err(e) => { + return Ok(Async::Ready(ProtocolsHandlerEvent::Custom( + RPCEvent::Error(rpc_event.id(), e.into()), + ))) + } + }, + } + } + + // establish outbound substreams + if !self.dial_queue.is_empty() { + if self.dial_negotiated < self.max_dial_negotiated { + self.dial_negotiated += 1; + let rpc_event = self.dial_queue.remove(0); + if let RPCEvent::Request(id, req) = rpc_event { + return Ok(Async::Ready( + ProtocolsHandlerEvent::OutboundSubstreamRequest { + protocol: SubstreamProtocol::new(req.clone()), + info: RPCEvent::Request(id, req), + }, + )); + } + } + } else { + self.dial_queue.shrink_to_fit(); + } + Ok(Async::NotReady) + } +} + +/// Given a response back from a peer and the request that sent it, construct a response to send +/// back to the user. This allows for some data manipulation of responses given requests. +fn build_response(rpc_event: RPCEvent, rpc_response: RPCErrorResponse) -> RPCEvent { + let id = rpc_event.id(); + + // handle the types of responses + match rpc_response { + RPCErrorResponse::Success(response) => { + match response { + // if the response is block roots, tag on the extra request data + RPCResponse::BeaconBlockBodies(mut resp) => { + if let RPCEvent::Request(_id, RPCRequest::BeaconBlockBodies(bodies_req)) = + rpc_event + { + resp.block_roots = Some(bodies_req.block_roots); + } + RPCEvent::Response( + id, + RPCErrorResponse::Success(RPCResponse::BeaconBlockBodies(resp)), + ) + } + _ => RPCEvent::Response(id, RPCErrorResponse::Success(response)), + } + } + _ => RPCEvent::Response(id, rpc_response), + } +} diff --git a/beacon_node/eth2-libp2p/src/rpc/methods.rs b/beacon_node/eth2-libp2p/src/rpc/methods.rs index b752b74cb..0d6311d9d 100644 --- a/beacon_node/eth2-libp2p/src/rpc/methods.rs +++ b/beacon_node/eth2-libp2p/src/rpc/methods.rs @@ -2,130 +2,54 @@ use ssz::{impl_decode_via_from, impl_encode_via_from}; use ssz_derive::{Decode, Encode}; -use types::{BeaconBlockBody, BeaconBlockHeader, Epoch, Hash256, Slot}; - -#[derive(Debug)] -/// Available Serenity Libp2p RPC methods -pub enum RPCMethod { - /// Initialise handshake between connecting peers. - Hello, - /// Terminate a connection providing a reason. - Goodbye, - /// Requests a number of beacon block roots. - BeaconBlockRoots, - /// Requests a number of beacon block headers. - BeaconBlockHeaders, - /// Requests a number of beacon block bodies. - BeaconBlockBodies, - /// Requests values for a merkle proof for the current blocks state root. - BeaconChainState, // Note: experimental, not complete. - /// Unknown method received. - Unknown, -} - -impl From for RPCMethod { - fn from(method_id: u16) -> Self { - match method_id { - 0 => RPCMethod::Hello, - 1 => RPCMethod::Goodbye, - 10 => RPCMethod::BeaconBlockRoots, - 11 => RPCMethod::BeaconBlockHeaders, - 12 => RPCMethod::BeaconBlockBodies, - 13 => RPCMethod::BeaconChainState, - - _ => RPCMethod::Unknown, - } - } -} - -impl Into for RPCMethod { - fn into(self) -> u16 { - match self { - RPCMethod::Hello => 0, - RPCMethod::Goodbye => 1, - RPCMethod::BeaconBlockRoots => 10, - RPCMethod::BeaconBlockHeaders => 11, - RPCMethod::BeaconBlockBodies => 12, - RPCMethod::BeaconChainState => 13, - _ => 0, - } - } -} - -#[derive(Debug, Clone)] -pub enum RPCRequest { - Hello(HelloMessage), - Goodbye(GoodbyeReason), - BeaconBlockRoots(BeaconBlockRootsRequest), - BeaconBlockHeaders(BeaconBlockHeadersRequest), - BeaconBlockBodies(BeaconBlockBodiesRequest), - BeaconChainState(BeaconChainStateRequest), -} - -impl RPCRequest { - pub fn method_id(&self) -> u16 { - let method = match self { - RPCRequest::Hello(_) => RPCMethod::Hello, - RPCRequest::Goodbye(_) => RPCMethod::Goodbye, - RPCRequest::BeaconBlockRoots(_) => RPCMethod::BeaconBlockRoots, - RPCRequest::BeaconBlockHeaders(_) => RPCMethod::BeaconBlockHeaders, - RPCRequest::BeaconBlockBodies(_) => RPCMethod::BeaconBlockBodies, - RPCRequest::BeaconChainState(_) => RPCMethod::BeaconChainState, - }; - method.into() - } -} - -#[derive(Debug, Clone)] -pub enum RPCResponse { - Hello(HelloMessage), - BeaconBlockRoots(BeaconBlockRootsResponse), - BeaconBlockHeaders(BeaconBlockHeadersResponse), - BeaconBlockBodies(BeaconBlockBodiesResponse), - BeaconChainState(BeaconChainStateResponse), -} - -impl RPCResponse { - pub fn method_id(&self) -> u16 { - let method = match self { - RPCResponse::Hello(_) => RPCMethod::Hello, - RPCResponse::BeaconBlockRoots(_) => RPCMethod::BeaconBlockRoots, - RPCResponse::BeaconBlockHeaders(_) => RPCMethod::BeaconBlockHeaders, - RPCResponse::BeaconBlockBodies(_) => RPCMethod::BeaconBlockBodies, - RPCResponse::BeaconChainState(_) => RPCMethod::BeaconChainState, - }; - method.into() - } -} +use types::{BeaconBlockBody, Epoch, Hash256, Slot}; /* Request/Response data structures for RPC methods */ +/* Requests */ + +pub type RequestId = usize; + /// The HELLO request/response handshake message. #[derive(Encode, Decode, Clone, Debug)] pub struct HelloMessage { /// The network ID of the peer. pub network_id: u8, + + /// The chain id for the HELLO request. + pub chain_id: u64, + /// The peers last finalized root. pub latest_finalized_root: Hash256, + /// The peers last finalized epoch. pub latest_finalized_epoch: Epoch, + /// The peers last block root. pub best_root: Hash256, + /// The peers last slot. pub best_slot: Slot, } /// The reason given for a `Goodbye` message. /// -/// Note: any unknown `u64::into(n)` will resolve to `GoodbyeReason::Unknown` for any unknown `n`, +/// Note: any unknown `u64::into(n)` will resolve to `Goodbye::Unknown` for any unknown `n`, /// however `GoodbyeReason::Unknown.into()` will go into `0_u64`. Therefore de-serializing then /// re-serializing may not return the same bytes. #[derive(Debug, Clone)] pub enum GoodbyeReason { - ClientShutdown, - IrreleventNetwork, - Fault, - Unknown, + /// This node has shutdown. + ClientShutdown = 1, + + /// Incompatible networks. + IrreleventNetwork = 2, + + /// Error/fault in the RPC. + Fault = 3, + + /// Unknown reason. + Unknown = 0, } impl From for GoodbyeReason { @@ -141,12 +65,7 @@ impl From for GoodbyeReason { impl Into for GoodbyeReason { fn into(self) -> u64 { - match self { - GoodbyeReason::Unknown => 0, - GoodbyeReason::ClientShutdown => 1, - GoodbyeReason::IrreleventNetwork => 2, - GoodbyeReason::Fault => 3, - } + self as u64 } } @@ -158,6 +77,7 @@ impl_decode_via_from!(GoodbyeReason, u64); pub struct BeaconBlockRootsRequest { /// The starting slot of the requested blocks. pub start_slot: Slot, + /// The number of blocks from the start slot. pub count: u64, // this must be less than 32768. //TODO: Enforce this in the lower layers } @@ -169,8 +89,19 @@ pub struct BeaconBlockRootsResponse { pub roots: Vec, } +/// Contains a block root and associated slot. +#[derive(Encode, Decode, Clone, Debug, PartialEq)] +pub struct BlockRootSlot { + /// The block root. + pub block_root: Hash256, + + /// The block slot. + pub slot: Slot, +} + +/// The response of a beacon block roots request. impl BeaconBlockRootsResponse { - /// Returns `true` if each `self.roots.slot[i]` is higher than the preceeding `i`. + /// Returns `true` if each `self.roots.slot[i]` is higher than the preceding `i`. pub fn slots_are_ascending(&self) -> bool { for window in self.roots.windows(2) { if window[0].slot >= window[1].slot { @@ -182,33 +113,27 @@ impl BeaconBlockRootsResponse { } } -/// Contains a block root and associated slot. -#[derive(Encode, Decode, Clone, Debug, PartialEq)] -pub struct BlockRootSlot { - /// The block root. - pub block_root: Hash256, - /// The block slot. - pub slot: Slot, -} - /// Request a number of beacon block headers from a peer. #[derive(Encode, Decode, Clone, Debug, PartialEq)] pub struct BeaconBlockHeadersRequest { /// The starting header hash of the requested headers. pub start_root: Hash256, + /// The starting slot of the requested headers. pub start_slot: Slot, + /// The maximum number of headers than can be returned. pub max_headers: u64, + /// The maximum number of slots to skip between blocks. pub skip_slots: u64, } /// Response containing requested block headers. -#[derive(Encode, Decode, Clone, Debug, PartialEq)] +#[derive(Clone, Debug, PartialEq)] pub struct BeaconBlockHeadersResponse { - /// The list of requested beacon block headers. - pub headers: Vec, + /// The list of ssz-encoded requested beacon block headers. + pub headers: Vec, } /// Request a number of beacon block bodies from a peer. @@ -219,9 +144,20 @@ pub struct BeaconBlockBodiesRequest { } /// Response containing the list of requested beacon block bodies. -#[derive(Encode, Decode, Clone, Debug, PartialEq)] +#[derive(Clone, Debug, PartialEq)] pub struct BeaconBlockBodiesResponse { - /// The list of beacon block bodies being requested. + /// The list of hashes that were sent in the request and match these roots response. None when + /// sending outbound. + pub block_roots: Option>, + /// The list of ssz-encoded beacon block bodies being requested. + pub block_bodies: Vec, +} + +/// The decoded version of `BeaconBlockBodiesResponse` which is expected in `SimpleSync`. +pub struct DecodedBeaconBlockBodiesResponse { + /// The list of hashes sent in the request to get this response. + pub block_roots: Vec, + /// The valid decoded block bodies. pub block_bodies: Vec, } @@ -237,5 +173,71 @@ pub struct BeaconChainStateRequest { #[derive(Encode, Decode, Clone, Debug, PartialEq)] pub struct BeaconChainStateResponse { /// The values corresponding the to the requested tree hashes. - pub values: bool, //TBD - stubbed with encodeable bool + pub values: bool, //TBD - stubbed with encodable bool +} + +/* RPC Handling and Grouping */ +// Collection of enums and structs used by the Codecs to encode/decode RPC messages + +#[derive(Debug, Clone)] +pub enum RPCResponse { + /// A HELLO message. + Hello(HelloMessage), + /// A response to a get BEACON_BLOCK_ROOTS request. + BeaconBlockRoots(BeaconBlockRootsResponse), + /// A response to a get BEACON_BLOCK_HEADERS request. + BeaconBlockHeaders(BeaconBlockHeadersResponse), + /// A response to a get BEACON_BLOCK_BODIES request. + BeaconBlockBodies(BeaconBlockBodiesResponse), + /// A response to a get BEACON_CHAIN_STATE request. + BeaconChainState(BeaconChainStateResponse), +} + +#[derive(Debug)] +pub enum RPCErrorResponse { + Success(RPCResponse), + InvalidRequest(ErrorMessage), + ServerError(ErrorMessage), + Unknown(ErrorMessage), +} + +impl RPCErrorResponse { + /// Used to encode the response. + pub fn as_u8(&self) -> u8 { + match self { + RPCErrorResponse::Success(_) => 0, + RPCErrorResponse::InvalidRequest(_) => 2, + RPCErrorResponse::ServerError(_) => 3, + RPCErrorResponse::Unknown(_) => 255, + } + } + + /// Tells the codec whether to decode as an RPCResponse or an error. + pub fn is_response(response_code: u8) -> bool { + match response_code { + 0 => true, + _ => false, + } + } + + /// Builds an RPCErrorResponse from a response code and an ErrorMessage + pub fn from_error(response_code: u8, err: ErrorMessage) -> Self { + match response_code { + 2 => RPCErrorResponse::InvalidRequest(err), + 3 => RPCErrorResponse::ServerError(err), + _ => RPCErrorResponse::Unknown(err), + } + } +} + +#[derive(Encode, Decode, Debug)] +pub struct ErrorMessage { + /// The UTF-8 encoded Error message string. + pub error_message: Vec, +} + +impl ErrorMessage { + pub fn as_string(&self) -> String { + String::from_utf8(self.error_message.clone()).unwrap_or_else(|_| "".into()) + } } diff --git a/beacon_node/eth2-libp2p/src/rpc/mod.rs b/beacon_node/eth2-libp2p/src/rpc/mod.rs index 2d303469c..f1f341908 100644 --- a/beacon_node/eth2-libp2p/src/rpc/mod.rs +++ b/beacon_node/eth2-libp2p/src/rpc/mod.rs @@ -1,26 +1,55 @@ -/// RPC Protocol over libp2p. -/// -/// This is purpose built for Ethereum 2.0 serenity and the protocol listens on -/// `/eth/serenity/rpc/1.0.0` -pub mod methods; -mod protocol; +//! The Ethereum 2.0 Wire Protocol +//! +//! This protocol is a purpose built Ethereum 2.0 libp2p protocol. It's role is to facilitate +//! direct peer-to-peer communication primarily for sending/receiving chain information for +//! syncing. use futures::prelude::*; -use libp2p::core::protocols_handler::{OneShotHandler, ProtocolsHandler}; +use handler::RPCHandler; +use libp2p::core::protocols_handler::ProtocolsHandler; use libp2p::core::swarm::{ ConnectedPoint, NetworkBehaviour, NetworkBehaviourAction, PollParameters, }; use libp2p::{Multiaddr, PeerId}; -pub use methods::{HelloMessage, RPCMethod, RPCRequest, RPCResponse}; -pub use protocol::{RPCEvent, RPCProtocol, RequestId}; +pub use methods::{ErrorMessage, HelloMessage, RPCErrorResponse, RPCResponse, RequestId}; +pub use protocol::{RPCError, RPCProtocol, RPCRequest}; use slog::o; use std::marker::PhantomData; use tokio::io::{AsyncRead, AsyncWrite}; -/// The network behaviour handles RPC requests/responses as specified in the Eth 2.0 phase 0 -/// specification. +pub(crate) mod codec; +mod handler; +pub mod methods; +mod protocol; +// mod request_response; -pub struct Rpc { +/// The return type used in the behaviour and the resultant event from the protocols handler. +#[derive(Debug)] +pub enum RPCEvent { + /// A request that was received from the RPC protocol. The first parameter is a sequential + /// id which tracks an awaiting substream for the response. + Request(RequestId, RPCRequest), + + /// A response that has been received from the RPC protocol. The first parameter returns + /// that which was sent with the corresponding request. + Response(RequestId, RPCErrorResponse), + /// An Error occurred. + Error(RequestId, RPCError), +} + +impl RPCEvent { + pub fn id(&self) -> usize { + match *self { + RPCEvent::Request(id, _) => id, + RPCEvent::Response(id, _) => id, + RPCEvent::Error(id, _) => id, + } + } +} + +/// Implements the libp2p `NetworkBehaviour` trait and therefore manages network-level +/// logic. +pub struct RPC { /// Queue of events to processed. events: Vec>, /// Pins the generic substream. @@ -29,17 +58,19 @@ pub struct Rpc { _log: slog::Logger, } -impl Rpc { +impl RPC { pub fn new(log: &slog::Logger) -> Self { let log = log.new(o!("Service" => "Libp2p-RPC")); - Rpc { + RPC { events: Vec::new(), marker: PhantomData, _log: log, } } - /// Submits and RPC request. + /// Submits an RPC request. + /// + /// The peer must be connected for this to succeed. pub fn send_rpc(&mut self, peer_id: PeerId, rpc_event: RPCEvent) { self.events.push(NetworkBehaviourAction::SendEvent { peer_id, @@ -48,17 +79,18 @@ impl Rpc { } } -impl NetworkBehaviour for Rpc +impl NetworkBehaviour for RPC where TSubstream: AsyncRead + AsyncWrite, { - type ProtocolsHandler = OneShotHandler; + type ProtocolsHandler = RPCHandler; type OutEvent = RPCMessage; fn new_handler(&mut self) -> Self::ProtocolsHandler { Default::default() } + // handled by discovery fn addresses_of_peer(&mut self, _peer_id: &PeerId) -> Vec { Vec::new() } @@ -72,19 +104,18 @@ where } } - fn inject_disconnected(&mut self, _: &PeerId, _: ConnectedPoint) {} + fn inject_disconnected(&mut self, peer_id: &PeerId, _: ConnectedPoint) { + // inform the rpc handler that the peer has disconnected + self.events.push(NetworkBehaviourAction::GenerateEvent( + RPCMessage::PeerDisconnected(peer_id.clone()), + )); + } fn inject_node_event( &mut self, source: PeerId, event: ::OutEvent, ) { - // ignore successful send events - let event = match event { - OneShotEvent::Rx(event) => event, - OneShotEvent::Sent => return, - }; - // send the event to the user self.events .push(NetworkBehaviourAction::GenerateEvent(RPCMessage::RPC( @@ -112,27 +143,5 @@ where pub enum RPCMessage { RPC(PeerId, RPCEvent), PeerDialed(PeerId), -} - -/// Transmission between the `OneShotHandler` and the `RPCEvent`. -#[derive(Debug)] -pub enum OneShotEvent { - /// We received an RPC from a remote. - Rx(RPCEvent), - /// We successfully sent an RPC request. - Sent, -} - -impl From for OneShotEvent { - #[inline] - fn from(rpc: RPCEvent) -> OneShotEvent { - OneShotEvent::Rx(rpc) - } -} - -impl From<()> for OneShotEvent { - #[inline] - fn from(_: ()) -> OneShotEvent { - OneShotEvent::Sent - } + PeerDisconnected(PeerId), } diff --git a/beacon_node/eth2-libp2p/src/rpc/protocol.rs b/beacon_node/eth2-libp2p/src/rpc/protocol.rs index 7afded3ac..8729de3a7 100644 --- a/beacon_node/eth2-libp2p/src/rpc/protocol.rs +++ b/beacon_node/eth2-libp2p/src/rpc/protocol.rs @@ -1,269 +1,317 @@ use super::methods::*; +use crate::rpc::codec::{ + base::{BaseInboundCodec, BaseOutboundCodec}, + ssz::{SSZInboundCodec, SSZOutboundCodec}, + InboundCodec, OutboundCodec, +}; +use futures::{ + future::{self, FutureResult}, + sink, stream, Sink, Stream, +}; use libp2p::core::{upgrade, InboundUpgrade, OutboundUpgrade, UpgradeInfo}; -use ssz::{impl_decode_via_from, impl_encode_via_from, ssz_encode, Decode, Encode}; -use ssz_derive::{Decode, Encode}; -use std::hash::{Hash, Hasher}; use std::io; -use std::iter; +use std::time::Duration; +use tokio::codec::Framed; use tokio::io::{AsyncRead, AsyncWrite}; +use tokio::prelude::*; +use tokio::timer::timeout; +use tokio::util::FutureExt; /// The maximum bytes that can be sent across the RPC. -const MAX_READ_SIZE: usize = 4_194_304; // 4M +const MAX_RPC_SIZE: usize = 4_194_304; // 4M +/// The protocol prefix the RPC protocol id. +const PROTOCOL_PREFIX: &str = "/eth2/beacon_node/rpc"; +/// The number of seconds to wait for a request once a protocol has been established before the stream is terminated. +const REQUEST_TIMEOUT: u64 = 3; -/// Implementation of the `ConnectionUpgrade` for the rpc protocol. #[derive(Debug, Clone)] pub struct RPCProtocol; impl UpgradeInfo for RPCProtocol { - type Info = &'static [u8]; - type InfoIter = iter::Once; + type Info = RawProtocolId; + type InfoIter = Vec; - #[inline] fn protocol_info(&self) -> Self::InfoIter { - iter::once(b"/eth/serenity/rpc/1.0.0") + vec![ + ProtocolId::new("hello", "1.0.0", "ssz").into(), + ProtocolId::new("goodbye", "1.0.0", "ssz").into(), + ProtocolId::new("beacon_block_roots", "1.0.0", "ssz").into(), + ProtocolId::new("beacon_block_headers", "1.0.0", "ssz").into(), + ProtocolId::new("beacon_block_bodies", "1.0.0", "ssz").into(), + ] } } -impl Default for RPCProtocol { - fn default() -> Self { - RPCProtocol +/// The raw protocol id sent over the wire. +type RawProtocolId = Vec; + +/// Tracks the types in a protocol id. +pub struct ProtocolId { + /// The rpc message type/name. + pub message_name: String, + + /// The version of the RPC. + pub version: String, + + /// The encoding of the RPC. + pub encoding: String, +} + +/// An RPC protocol ID. +impl ProtocolId { + pub fn new(message_name: &str, version: &str, encoding: &str) -> Self { + ProtocolId { + message_name: message_name.into(), + version: version.into(), + encoding: encoding.into(), + } + } + + /// Converts a raw RPC protocol id string into an `RPCProtocolId` + pub fn from_bytes(bytes: &[u8]) -> Result { + let protocol_string = String::from_utf8(bytes.to_vec()) + .map_err(|_| RPCError::InvalidProtocol("Invalid protocol Id"))?; + let protocol_list: Vec<&str> = protocol_string.as_str().split('/').take(7).collect(); + + if protocol_list.len() != 7 { + return Err(RPCError::InvalidProtocol("Not enough '/'")); + } + + Ok(ProtocolId { + message_name: protocol_list[4].into(), + version: protocol_list[5].into(), + encoding: protocol_list[6].into(), + }) } } -/// A monotonic counter for ordering `RPCRequest`s. -#[derive(Debug, Clone, Copy, Default)] -pub struct RequestId(u64); - -impl RequestId { - /// Increment the request id. - pub fn increment(&mut self) { - self.0 += 1 - } - - /// Return the previous id. - pub fn previous(self) -> Self { - Self(self.0 - 1) +impl Into for ProtocolId { + fn into(self) -> RawProtocolId { + format!( + "{}/{}/{}/{}", + PROTOCOL_PREFIX, self.message_name, self.version, self.encoding + ) + .as_bytes() + .to_vec() } } -impl Eq for RequestId {} +/* Inbound upgrade */ -impl PartialEq for RequestId { - fn eq(&self, other: &RequestId) -> bool { - self.0 == other.0 - } -} +// The inbound protocol reads the request, decodes it and returns the stream to the protocol +// handler to respond to once ready. -impl Hash for RequestId { - fn hash(&self, state: &mut H) { - self.0.hash(state); - } -} - -impl From for RequestId { - fn from(x: u64) -> RequestId { - RequestId(x) - } -} - -impl Into for RequestId { - fn into(self) -> u64 { - self.0 - } -} - -impl_encode_via_from!(RequestId, u64); -impl_decode_via_from!(RequestId, u64); - -/// The RPC types which are sent/received in this protocol. -#[derive(Debug, Clone)] -pub enum RPCEvent { - Request { - id: RequestId, - method_id: u16, - body: RPCRequest, - }, - Response { - id: RequestId, - method_id: u16, //TODO: Remove and process decoding upstream - result: RPCResponse, - }, -} - -impl UpgradeInfo for RPCEvent { - type Info = &'static [u8]; - type InfoIter = iter::Once; - - #[inline] - fn protocol_info(&self) -> Self::InfoIter { - iter::once(b"/eth/serenity/rpc/1.0.0") - } -} - -type FnDecodeRPCEvent = fn(Vec, ()) -> Result; +pub type InboundOutput = (RPCRequest, InboundFramed); +pub type InboundFramed = Framed, InboundCodec>; +type FnAndThen = fn( + (Option, InboundFramed), +) -> FutureResult, RPCError>; +type FnMapErr = fn(timeout::Error<(RPCError, InboundFramed)>) -> RPCError; impl InboundUpgrade for RPCProtocol where TSocket: AsyncRead + AsyncWrite, { - type Output = RPCEvent; - type Error = DecodeError; - type Future = upgrade::ReadOneThen, (), FnDecodeRPCEvent>; + type Output = InboundOutput; + type Error = RPCError; - fn upgrade_inbound(self, socket: upgrade::Negotiated, _: Self::Info) -> Self::Future { - upgrade::read_one_then(socket, MAX_READ_SIZE, (), |packet, ()| Ok(decode(packet)?)) + type Future = future::AndThen< + future::MapErr< + timeout::Timeout>>, + FnMapErr, + >, + FutureResult, RPCError>, + FnAndThen, + >; + + fn upgrade_inbound( + self, + socket: upgrade::Negotiated, + protocol: RawProtocolId, + ) -> Self::Future { + // TODO: Verify this + let protocol_id = + ProtocolId::from_bytes(&protocol).expect("Can decode all supported protocols"); + + match protocol_id.encoding.as_str() { + "ssz" | _ => { + let ssz_codec = + BaseInboundCodec::new(SSZInboundCodec::new(protocol_id, MAX_RPC_SIZE)); + let codec = InboundCodec::SSZ(ssz_codec); + Framed::new(socket, codec) + .into_future() + .timeout(Duration::from_secs(REQUEST_TIMEOUT)) + .map_err(RPCError::from as FnMapErr) + .and_then({ + |(req, stream)| match req { + Some(req) => futures::future::ok((req, stream)), + None => futures::future::err(RPCError::Custom( + "Stream terminated early".into(), + )), + } + } as FnAndThen) + } + } } } -/// A helper structed used to obtain SSZ serialization for RPC messages. -#[derive(Encode, Decode, Default)] -struct SszContainer { - /// Note: the `is_request` field is not included in the spec. - /// - /// We are unable to determine a request from a response unless we add some flag to the - /// packet. Here we have added a bool (encoded as 1 byte) which is set to `1` if the - /// message is a request. - is_request: bool, - id: u64, - other: u16, - bytes: Vec, +/* Outbound request */ + +// Combines all the RPC requests into a single enum to implement `UpgradeInfo` and +// `OutboundUpgrade` + +#[derive(Debug, Clone)] +pub enum RPCRequest { + Hello(HelloMessage), + Goodbye(GoodbyeReason), + BeaconBlockRoots(BeaconBlockRootsRequest), + BeaconBlockHeaders(BeaconBlockHeadersRequest), + BeaconBlockBodies(BeaconBlockBodiesRequest), + BeaconChainState(BeaconChainStateRequest), } -fn decode(packet: Vec) -> Result { - let msg = SszContainer::from_ssz_bytes(&packet)?; +impl UpgradeInfo for RPCRequest { + type Info = RawProtocolId; + type InfoIter = Vec; - if msg.is_request { - let body = match RPCMethod::from(msg.other) { - RPCMethod::Hello => RPCRequest::Hello(HelloMessage::from_ssz_bytes(&msg.bytes)?), - RPCMethod::Goodbye => RPCRequest::Goodbye(GoodbyeReason::from_ssz_bytes(&msg.bytes)?), - RPCMethod::BeaconBlockRoots => { - RPCRequest::BeaconBlockRoots(BeaconBlockRootsRequest::from_ssz_bytes(&msg.bytes)?) - } - RPCMethod::BeaconBlockHeaders => RPCRequest::BeaconBlockHeaders( - BeaconBlockHeadersRequest::from_ssz_bytes(&msg.bytes)?, - ), - RPCMethod::BeaconBlockBodies => { - RPCRequest::BeaconBlockBodies(BeaconBlockBodiesRequest::from_ssz_bytes(&msg.bytes)?) - } - RPCMethod::BeaconChainState => { - RPCRequest::BeaconChainState(BeaconChainStateRequest::from_ssz_bytes(&msg.bytes)?) - } - RPCMethod::Unknown => return Err(DecodeError::UnknownRPCMethod), - }; - - Ok(RPCEvent::Request { - id: RequestId::from(msg.id), - method_id: msg.other, - body, - }) - } - // we have received a response - else { - let result = match RPCMethod::from(msg.other) { - RPCMethod::Hello => RPCResponse::Hello(HelloMessage::from_ssz_bytes(&msg.bytes)?), - RPCMethod::BeaconBlockRoots => { - RPCResponse::BeaconBlockRoots(BeaconBlockRootsResponse::from_ssz_bytes(&msg.bytes)?) - } - RPCMethod::BeaconBlockHeaders => RPCResponse::BeaconBlockHeaders( - BeaconBlockHeadersResponse::from_ssz_bytes(&msg.bytes)?, - ), - RPCMethod::BeaconBlockBodies => RPCResponse::BeaconBlockBodies( - BeaconBlockBodiesResponse::from_ssz_bytes(&msg.bytes)?, - ), - RPCMethod::BeaconChainState => { - RPCResponse::BeaconChainState(BeaconChainStateResponse::from_ssz_bytes(&msg.bytes)?) - } - // We should never receive a goodbye response; it is invalid. - RPCMethod::Goodbye => return Err(DecodeError::UnknownRPCMethod), - RPCMethod::Unknown => return Err(DecodeError::UnknownRPCMethod), - }; - - Ok(RPCEvent::Response { - id: RequestId::from(msg.id), - method_id: msg.other, - result, - }) + // add further protocols as we support more encodings/versions + fn protocol_info(&self) -> Self::InfoIter { + self.supported_protocols() } } -impl OutboundUpgrade for RPCEvent +/// Implements the encoding per supported protocol for RPCRequest. +impl RPCRequest { + pub fn supported_protocols(&self) -> Vec { + match self { + // add more protocols when versions/encodings are supported + RPCRequest::Hello(_) => vec![ProtocolId::new("hello", "1.0.0", "ssz").into()], + RPCRequest::Goodbye(_) => vec![ProtocolId::new("goodbye", "1.0.0", "ssz").into()], + RPCRequest::BeaconBlockRoots(_) => { + vec![ProtocolId::new("beacon_block_roots", "1.0.0", "ssz").into()] + } + RPCRequest::BeaconBlockHeaders(_) => { + vec![ProtocolId::new("beacon_block_headers", "1.0.0", "ssz").into()] + } + RPCRequest::BeaconBlockBodies(_) => { + vec![ProtocolId::new("beacon_block_bodies", "1.0.0", "ssz").into()] + } + RPCRequest::BeaconChainState(_) => { + vec![ProtocolId::new("beacon_block_state", "1.0.0", "ssz").into()] + } + } + } + + /// This specifies whether a stream should remain open and await a response, given a request. + /// A GOODBYE request has no response. + pub fn expect_response(&self) -> bool { + match self { + RPCRequest::Goodbye(_) => false, + _ => true, + } + } +} + +/* RPC Response type - used for outbound upgrades */ + +/* Outbound upgrades */ + +pub type OutboundFramed = Framed, OutboundCodec>; + +impl OutboundUpgrade for RPCRequest where - TSocket: AsyncWrite, + TSocket: AsyncRead + AsyncWrite, { - type Output = (); - type Error = io::Error; - type Future = upgrade::WriteOne>; + type Output = OutboundFramed; + type Error = RPCError; + type Future = sink::Send>; + fn upgrade_outbound( + self, + socket: upgrade::Negotiated, + protocol: Self::Info, + ) -> Self::Future { + let protocol_id = + ProtocolId::from_bytes(&protocol).expect("Can decode all supported protocols"); - #[inline] - fn upgrade_outbound(self, socket: upgrade::Negotiated, _: Self::Info) -> Self::Future { - let bytes = ssz_encode(&self); - upgrade::write_one(socket, bytes) - } -} - -impl Encode for RPCEvent { - fn is_ssz_fixed_len() -> bool { - false - } - - fn ssz_append(&self, buf: &mut Vec) { - let container = match self { - RPCEvent::Request { - id, - method_id, - body, - } => SszContainer { - is_request: true, - id: (*id).into(), - other: *method_id, - bytes: match body { - RPCRequest::Hello(body) => body.as_ssz_bytes(), - RPCRequest::Goodbye(body) => body.as_ssz_bytes(), - RPCRequest::BeaconBlockRoots(body) => body.as_ssz_bytes(), - RPCRequest::BeaconBlockHeaders(body) => body.as_ssz_bytes(), - RPCRequest::BeaconBlockBodies(body) => body.as_ssz_bytes(), - RPCRequest::BeaconChainState(body) => body.as_ssz_bytes(), - }, - }, - RPCEvent::Response { - id, - method_id, - result, - } => SszContainer { - is_request: false, - id: (*id).into(), - other: *method_id, - bytes: match result { - RPCResponse::Hello(response) => response.as_ssz_bytes(), - RPCResponse::BeaconBlockRoots(response) => response.as_ssz_bytes(), - RPCResponse::BeaconBlockHeaders(response) => response.as_ssz_bytes(), - RPCResponse::BeaconBlockBodies(response) => response.as_ssz_bytes(), - RPCResponse::BeaconChainState(response) => response.as_ssz_bytes(), - }, - }, - }; - - container.ssz_append(buf) + match protocol_id.encoding.as_str() { + "ssz" | _ => { + let ssz_codec = BaseOutboundCodec::new(SSZOutboundCodec::new(protocol_id, 4096)); + let codec = OutboundCodec::SSZ(ssz_codec); + Framed::new(socket, codec).send(self) + } + } } } +/// Error in RPC Encoding/Decoding. #[derive(Debug)] -pub enum DecodeError { +pub enum RPCError { + /// Error when reading the packet from the socket. ReadError(upgrade::ReadOneError), + /// Error when decoding the raw buffer from ssz. SSZDecodeError(ssz::DecodeError), - UnknownRPCMethod, + /// Invalid Protocol ID. + InvalidProtocol(&'static str), + /// IO Error. + IoError(io::Error), + /// Waiting for a request/response timed out, or timer error'd. + StreamTimeout, + /// Custom message. + Custom(String), } -impl From for DecodeError { +impl From for RPCError { #[inline] fn from(err: upgrade::ReadOneError) -> Self { - DecodeError::ReadError(err) + RPCError::ReadError(err) } } -impl From for DecodeError { +impl From for RPCError { #[inline] fn from(err: ssz::DecodeError) -> Self { - DecodeError::SSZDecodeError(err) + RPCError::SSZDecodeError(err) + } +} +impl From> for RPCError { + fn from(err: tokio::timer::timeout::Error) -> Self { + if err.is_elapsed() { + RPCError::StreamTimeout + } else { + RPCError::Custom("Stream timer failed".into()) + } + } +} + +impl From for RPCError { + fn from(err: io::Error) -> Self { + RPCError::IoError(err) + } +} + +// Error trait is required for `ProtocolsHandler` +impl std::fmt::Display for RPCError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match *self { + RPCError::ReadError(ref err) => write!(f, "Error while reading from socket: {}", err), + RPCError::SSZDecodeError(ref err) => write!(f, "Error while decoding ssz: {:?}", err), + RPCError::InvalidProtocol(ref err) => write!(f, "Invalid Protocol: {}", err), + RPCError::IoError(ref err) => write!(f, "IO Error: {}", err), + RPCError::StreamTimeout => write!(f, "Stream Timeout"), + RPCError::Custom(ref err) => write!(f, "{}", err), + } + } +} + +impl std::error::Error for RPCError { + fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { + match *self { + RPCError::ReadError(ref err) => Some(err), + RPCError::SSZDecodeError(_) => None, + RPCError::InvalidProtocol(_) => None, + RPCError::IoError(ref err) => Some(err), + RPCError::StreamTimeout => None, + RPCError::Custom(_) => None, + } } } diff --git a/beacon_node/eth2-libp2p/src/service.rs b/beacon_node/eth2-libp2p/src/service.rs index 2eecfac97..a8c70a3da 100644 --- a/beacon_node/eth2-libp2p/src/service.rs +++ b/beacon_node/eth2-libp2p/src/service.rs @@ -131,6 +131,9 @@ impl Stream for Service { BehaviourEvent::PeerDialed(peer_id) => { return Ok(Async::Ready(Some(Libp2pEvent::PeerDialed(peer_id)))); } + BehaviourEvent::PeerDisconnected(peer_id) => { + return Ok(Async::Ready(Some(Libp2pEvent::PeerDisconnected(peer_id)))); + } }, Ok(Async::Ready(None)) => unreachable!("Swarm stream shouldn't end"), Ok(Async::NotReady) => break, @@ -180,6 +183,8 @@ pub enum Libp2pEvent { RPC(PeerId, RPCEvent), /// Initiated the connection to a new peer. PeerDialed(PeerId), + /// A peer has disconnected. + PeerDisconnected(PeerId), /// Received pubsub message. PubsubMessage { source: PeerId, diff --git a/beacon_node/network/Cargo.toml b/beacon_node/network/Cargo.toml index 1499ac580..9eadede76 100644 --- a/beacon_node/network/Cargo.toml +++ b/beacon_node/network/Cargo.toml @@ -19,3 +19,4 @@ tree_hash = { path = "../../eth2/utils/tree_hash" } futures = "0.1.25" error-chain = "0.12.0" tokio = "0.1.16" +parking_lot = "0.9.0" diff --git a/beacon_node/network/src/message_handler.rs b/beacon_node/network/src/message_handler.rs index 40538798a..b1d88415c 100644 --- a/beacon_node/network/src/message_handler.rs +++ b/beacon_node/network/src/message_handler.rs @@ -2,23 +2,19 @@ use crate::error; use crate::service::{NetworkMessage, OutgoingMessage}; use crate::sync::SimpleSync; use beacon_chain::{BeaconChain, BeaconChainTypes}; +use eth2_libp2p::rpc::methods::*; use eth2_libp2p::{ behaviour::PubsubMessage, - rpc::{methods::GoodbyeReason, RPCRequest, RPCResponse, RequestId}, + rpc::{RPCError, RPCErrorResponse, RPCRequest, RPCResponse, RequestId}, PeerId, RPCEvent, }; use futures::future::Future; use futures::stream::Stream; use slog::{debug, warn}; -use std::collections::HashMap; +use ssz::{Decode, DecodeError}; use std::sync::Arc; -use std::time::Instant; use tokio::sync::mpsc; - -/// Timeout for RPC requests. -// const REQUEST_TIMEOUT: Duration = Duration::from_secs(30); -/// Timeout before banning a peer for non-identification. -// const HELLO_TIMEOUT: Duration = Duration::from_secs(30); +use types::BeaconBlockHeader; /// Handles messages received from the network and client and organises syncing. pub struct MessageHandler { @@ -33,7 +29,7 @@ pub struct MessageHandler { } /// Types of messages the handler can receive. -#[derive(Debug, Clone)] +#[derive(Debug)] pub enum HandlerMessage { /// We have initiated a connection to a new peer. PeerDialed(PeerId), @@ -88,6 +84,10 @@ impl MessageHandler { HandlerMessage::PeerDialed(peer_id) => { self.sync.on_connect(peer_id, &mut self.network_context); } + // A peer has disconnected + HandlerMessage::PeerDisconnected(peer_id) => { + self.sync.on_disconnect(peer_id); + } // we have received an RPC message request/response HandlerMessage::RPC(peer_id, rpc_event) => { self.handle_rpc_message(peer_id, rpc_event); @@ -96,8 +96,6 @@ impl MessageHandler { HandlerMessage::PubsubMessage(peer_id, gossip) => { self.handle_gossip(peer_id, *gossip); } - //TODO: Handle all messages - _ => {} } } @@ -106,15 +104,14 @@ impl MessageHandler { /// Handle RPC messages fn handle_rpc_message(&mut self, peer_id: PeerId, rpc_message: RPCEvent) { match rpc_message { - RPCEvent::Request { id, body, .. // TODO: Clean up RPC Message types, have a cleaner type by this point. - } => self.handle_rpc_request(peer_id, id, body), - RPCEvent::Response { id, result, .. } => self.handle_rpc_response(peer_id, id, result), + RPCEvent::Request(id, req) => self.handle_rpc_request(peer_id, id, req), + RPCEvent::Response(_id, resp) => self.handle_rpc_response(peer_id, resp), + RPCEvent::Error(id, error) => self.handle_rpc_error(peer_id, id, error), } } /// A new RPC request has been received from the network. fn handle_rpc_request(&mut self, peer_id: PeerId, request_id: RequestId, request: RPCRequest) { - // TODO: process the `id`. match request { RPCRequest::Hello(hello_message) => self.sync.on_hello_request( peer_id, @@ -151,58 +148,104 @@ impl MessageHandler { /// An RPC response has been received from the network. // we match on id and ignore responses past the timeout. - fn handle_rpc_response(&mut self, peer_id: PeerId, id: RequestId, response: RPCResponse) { - // if response id is not related to a request, ignore (likely RPC timeout) - if self - .network_context - .outstanding_outgoing_request_ids - .remove(&(peer_id.clone(), id)) - .is_none() - { - warn!( - self.log, - "Unknown ResponseId for incoming RPCRequest"; - "peer" => format!("{:?}", peer_id), - "request_id" => format!("{:?}", id) - ); - return; + fn handle_rpc_response(&mut self, peer_id: PeerId, error_response: RPCErrorResponse) { + // an error could have occurred. + // TODO: Handle Error gracefully + match error_response { + RPCErrorResponse::InvalidRequest(error) => { + warn!(self.log, "";"peer" => format!("{:?}", peer_id), "Invalid Request" => error.as_string()) + } + RPCErrorResponse::ServerError(error) => { + warn!(self.log, "";"peer" => format!("{:?}", peer_id), "Server Error" => error.as_string()) + } + RPCErrorResponse::Unknown(error) => { + warn!(self.log, "";"peer" => format!("{:?}", peer_id), "Unknown Error" => error.as_string()) + } + RPCErrorResponse::Success(response) => { + match response { + RPCResponse::Hello(hello_message) => { + self.sync.on_hello_response( + peer_id, + hello_message, + &mut self.network_context, + ); + } + RPCResponse::BeaconBlockRoots(response) => { + self.sync.on_beacon_block_roots_response( + peer_id, + response, + &mut self.network_context, + ); + } + RPCResponse::BeaconBlockHeaders(response) => { + match self.decode_block_headers(response) { + Ok(decoded_block_headers) => { + self.sync.on_beacon_block_headers_response( + peer_id, + decoded_block_headers, + &mut self.network_context, + ); + } + Err(_e) => { + warn!(self.log, "Peer sent invalid block headers";"peer" => format!("{:?}", peer_id)) + } + } + } + RPCResponse::BeaconBlockBodies(response) => { + match self.decode_block_bodies(response) { + Ok(decoded_block_bodies) => { + self.sync.on_beacon_block_bodies_response( + peer_id, + decoded_block_bodies, + &mut self.network_context, + ); + } + Err(_e) => { + warn!(self.log, "Peer sent invalid block bodies";"peer" => format!("{:?}", peer_id)) + } + } + } + RPCResponse::BeaconChainState(_) => { + // We do not implement this endpoint, it is not required and will only likely be + // useful for light-client support in later phases. + // + // Theoretically, we shouldn't reach this code because we should never send a + // beacon state RPC request. + warn!(self.log, "BeaconChainState RPC call is not supported."); + } + } + } } + } - match response { - RPCResponse::Hello(hello_message) => { - self.sync - .on_hello_response(peer_id, hello_message, &mut self.network_context); - } - RPCResponse::BeaconBlockRoots(response) => { - self.sync.on_beacon_block_roots_response( - peer_id, - response, - &mut self.network_context, - ); - } - RPCResponse::BeaconBlockHeaders(response) => { - self.sync.on_beacon_block_headers_response( - peer_id, - response, - &mut self.network_context, - ); - } - RPCResponse::BeaconBlockBodies(response) => { - self.sync.on_beacon_block_bodies_response( - peer_id, - response, - &mut self.network_context, - ); - } - RPCResponse::BeaconChainState(_) => { - // We do not implement this endpoint, it is not required and will only likely be - // useful for light-client support in later phases. - // - // Theoretically, we shouldn't reach this code because we should never send a - // beacon state RPC request. - warn!(self.log, "BeaconChainState RPC call is not supported."); - } - }; + /// Verifies and decodes the ssz-encoded block bodies received from peers. + fn decode_block_bodies( + &self, + bodies_response: BeaconBlockBodiesResponse, + ) -> Result { + //TODO: Implement faster block verification before decoding entirely + let block_bodies = Vec::from_ssz_bytes(&bodies_response.block_bodies)?; + Ok(DecodedBeaconBlockBodiesResponse { + block_roots: bodies_response + .block_roots + .expect("Responses must have associated roots"), + block_bodies, + }) + } + + /// Verifies and decodes the ssz-encoded block headers received from peers. + fn decode_block_headers( + &self, + headers_response: BeaconBlockHeadersResponse, + ) -> Result, DecodeError> { + //TODO: Implement faster header verification before decoding entirely + Vec::from_ssz_bytes(&headers_response.headers) + } + + /// Handle various RPC errors + fn handle_rpc_error(&mut self, peer_id: PeerId, request_id: RequestId, error: RPCError) { + //TODO: Handle error correctly + warn!(self.log, "RPC Error"; "Peer" => format!("{:?}", peer_id), "Request Id" => format!("{}", request_id), "Error" => format!("{:?}", error)); } /// Handle RPC messages @@ -221,25 +264,17 @@ impl MessageHandler { } } +// TODO: RPC Rewrite makes this struct fairly pointless pub struct NetworkContext { /// The network channel to relay messages to the Network service. network_send: mpsc::UnboundedSender, - /// A mapping of peers and the RPC id we have sent an RPC request to. - outstanding_outgoing_request_ids: HashMap<(PeerId, RequestId), Instant>, - /// Stores the next `RequestId` we should include on an outgoing `RPCRequest` to a `PeerId`. - outgoing_request_ids: HashMap, /// The `MessageHandler` logger. log: slog::Logger, } impl NetworkContext { pub fn new(network_send: mpsc::UnboundedSender, log: slog::Logger) -> Self { - Self { - network_send, - outstanding_outgoing_request_ids: HashMap::new(), - outgoing_request_ids: HashMap::new(), - log, - } + Self { network_send, log } } pub fn disconnect(&mut self, peer_id: PeerId, reason: GoodbyeReason) { @@ -248,21 +283,12 @@ impl NetworkContext { } pub fn send_rpc_request(&mut self, peer_id: PeerId, rpc_request: RPCRequest) { - let id = self.generate_request_id(&peer_id); - - self.outstanding_outgoing_request_ids - .insert((peer_id.clone(), id), Instant::now()); - - self.send_rpc_event( - peer_id, - RPCEvent::Request { - id, - method_id: rpc_request.method_id(), - body: rpc_request, - }, - ); + // Note: There is currently no use of keeping track of requests. However the functionality + // is left here for future revisions. + self.send_rpc_event(peer_id, RPCEvent::Request(0, rpc_request)); } + //TODO: Handle Error responses pub fn send_rpc_response( &mut self, peer_id: PeerId, @@ -271,11 +297,7 @@ impl NetworkContext { ) { self.send_rpc_event( peer_id, - RPCEvent::Response { - id: request_id, - method_id: rpc_response.method_id(), - result: rpc_response, - }, + RPCEvent::Response(request_id, RPCErrorResponse::Success(rpc_response)), ); } @@ -292,17 +314,5 @@ impl NetworkContext { "Could not send RPC message to the network service" ) }); - // - } - - /// Returns the next `RequestId` for sending an `RPCRequest` to the `peer_id`. - fn generate_request_id(&mut self, peer_id: &PeerId) -> RequestId { - let next_id = self - .outgoing_request_ids - .entry(peer_id.clone()) - .and_modify(RequestId::increment) - .or_insert_with(|| RequestId::from(1)); - - next_id.previous() } } diff --git a/beacon_node/network/src/service.rs b/beacon_node/network/src/service.rs index a2265bb8e..a771f8add 100644 --- a/beacon_node/network/src/service.rs +++ b/beacon_node/network/src/service.rs @@ -8,6 +8,7 @@ use eth2_libp2p::{Libp2pEvent, PeerId}; use eth2_libp2p::{PubsubMessage, RPCEvent}; use futures::prelude::*; use futures::Stream; +use parking_lot::Mutex; use slog::{debug, info, o, trace}; use std::marker::PhantomData; use std::sync::Arc; @@ -16,9 +17,9 @@ use tokio::sync::{mpsc, oneshot}; /// Service that handles communication between internal services and the eth2_libp2p network service. pub struct Service { - //libp2p_service: Arc>, + libp2p_service: Arc>, _libp2p_exit: oneshot::Sender<()>, - network_send: mpsc::UnboundedSender, + _network_send: mpsc::UnboundedSender, _phantom: PhantomData, //message_handler: MessageHandler, //message_handler_send: Sender } @@ -43,38 +44,33 @@ impl Service { // launch libp2p service let libp2p_log = log.new(o!("Service" => "Libp2p")); - let libp2p_service = LibP2PService::new(config.clone(), libp2p_log)?; + let libp2p_service = Arc::new(Mutex::new(LibP2PService::new(config.clone(), libp2p_log)?)); // TODO: Spawn thread to handle libp2p messages and pass to message handler thread. let libp2p_exit = spawn_service( - libp2p_service, + libp2p_service.clone(), network_recv, message_handler_send, executor, log, )?; let network_service = Service { + libp2p_service, _libp2p_exit: libp2p_exit, - network_send: network_send.clone(), + _network_send: network_send.clone(), _phantom: PhantomData, }; Ok((Arc::new(network_service), network_send)) } - // TODO: Testing only - pub fn send_message(&mut self) { - self.network_send - .try_send(NetworkMessage::Send( - PeerId::random(), - OutgoingMessage::NotifierTest, - )) - .unwrap(); + pub fn libp2p_service(&self) -> Arc> { + self.libp2p_service.clone() } } fn spawn_service( - libp2p_service: LibP2PService, + libp2p_service: Arc>, network_recv: mpsc::UnboundedReceiver, message_handler_send: mpsc::UnboundedSender, executor: &TaskExecutor, @@ -103,7 +99,7 @@ fn spawn_service( //TODO: Potentially handle channel errors fn network_service( - mut libp2p_service: LibP2PService, + libp2p_service: Arc>, mut network_recv: mpsc::UnboundedReceiver, mut message_handler_send: mpsc::UnboundedSender, log: slog::Logger, @@ -115,28 +111,18 @@ fn network_service( not_ready_count = 0; // poll the network channel match network_recv.poll() { - Ok(Async::Ready(Some(message))) => { - match message { - // TODO: Testing message - remove - NetworkMessage::Send(peer_id, outgoing_message) => { - match outgoing_message { - OutgoingMessage::RPC(rpc_event) => { - trace!(log, "Sending RPC Event: {:?}", rpc_event); - //TODO: Make swarm private - //TODO: Implement correct peer id topic message handling - libp2p_service.swarm.send_rpc(peer_id, rpc_event); - } - OutgoingMessage::NotifierTest => { - // debug!(log, "Received message from notifier"); - } - }; - } - NetworkMessage::Publish { topics, message } => { - debug!(log, "Sending pubsub message"; "topics" => format!("{:?}",topics)); - libp2p_service.swarm.publish(topics, *message); + Ok(Async::Ready(Some(message))) => match message { + NetworkMessage::Send(peer_id, outgoing_message) => match outgoing_message { + OutgoingMessage::RPC(rpc_event) => { + trace!(log, "Sending RPC Event: {:?}", rpc_event); + libp2p_service.lock().swarm.send_rpc(peer_id, rpc_event); } + }, + NetworkMessage::Publish { topics, message } => { + debug!(log, "Sending pubsub message"; "topics" => format!("{:?}",topics)); + libp2p_service.lock().swarm.publish(topics, *message); } - } + }, Ok(Async::NotReady) => not_ready_count += 1, Ok(Async::Ready(None)) => { return Err(eth2_libp2p::error::Error::from("Network channel closed")); @@ -147,19 +133,25 @@ fn network_service( } // poll the swarm - match libp2p_service.poll() { + match libp2p_service.lock().poll() { Ok(Async::Ready(Some(event))) => match event { Libp2pEvent::RPC(peer_id, rpc_event) => { trace!(log, "RPC Event: RPC message received: {:?}", rpc_event); message_handler_send .try_send(HandlerMessage::RPC(peer_id, rpc_event)) - .map_err(|_| "failed to send rpc to handler")?; + .map_err(|_| "Failed to send RPC to handler")?; } Libp2pEvent::PeerDialed(peer_id) => { debug!(log, "Peer Dialed: {:?}", peer_id); message_handler_send .try_send(HandlerMessage::PeerDialed(peer_id)) - .map_err(|_| "failed to send rpc to handler")?; + .map_err(|_| "Failed to send PeerDialed to handler")?; + } + Libp2pEvent::PeerDisconnected(peer_id) => { + debug!(log, "Peer Disconnected: {:?}", peer_id); + message_handler_send + .try_send(HandlerMessage::PeerDisconnected(peer_id)) + .map_err(|_| "Failed to send PeerDisconnected to handler")?; } Libp2pEvent::PubsubMessage { source, message, .. @@ -176,12 +168,13 @@ fn network_service( Err(_) => not_ready_count += 1, } } + Ok(Async::NotReady) }) } /// Types of messages that the network service can receive. -#[derive(Debug, Clone)] +#[derive(Debug)] pub enum NetworkMessage { /// Send a message to libp2p service. //TODO: Define typing for messages across the wire @@ -194,10 +187,8 @@ pub enum NetworkMessage { } /// Type of outgoing messages that can be sent through the network service. -#[derive(Debug, Clone)] +#[derive(Debug)] pub enum OutgoingMessage { /// Send an RPC request/response. RPC(RPCEvent), - //TODO: Remove - NotifierTest, } diff --git a/beacon_node/network/src/sync/simple_sync.rs b/beacon_node/network/src/sync/simple_sync.rs index 5ce921057..91594b999 100644 --- a/beacon_node/network/src/sync/simple_sync.rs +++ b/beacon_node/network/src/sync/simple_sync.rs @@ -5,6 +5,7 @@ use eth2_libp2p::rpc::methods::*; use eth2_libp2p::rpc::{RPCRequest, RPCResponse, RequestId}; use eth2_libp2p::PeerId; use slog::{debug, error, info, o, trace, warn}; +use ssz::Encode; use std::collections::HashMap; use std::sync::Arc; use std::time::Duration; @@ -30,6 +31,7 @@ const SHOULD_NOT_FORWARD_GOSSIP_BLOCK: bool = false; #[derive(Clone, Copy, Debug)] pub struct PeerSyncInfo { network_id: u8, + chain_id: u64, latest_finalized_root: Hash256, latest_finalized_epoch: Epoch, best_root: Hash256, @@ -40,6 +42,7 @@ impl From for PeerSyncInfo { fn from(hello: HelloMessage) -> PeerSyncInfo { PeerSyncInfo { network_id: hello.network_id, + chain_id: hello.chain_id, latest_finalized_root: hello.latest_finalized_root, latest_finalized_epoch: hello.latest_finalized_epoch, best_root: hello.best_root, @@ -106,6 +109,17 @@ impl SimpleSync { self.known_peers.remove(&peer_id); } + /// Handle a peer disconnect. + /// + /// Removes the peer from `known_peers`. + pub fn on_disconnect(&mut self, peer_id: PeerId) { + info!( + self.log, "Peer Disconnected"; + "peer" => format!("{:?}", peer_id), + ); + self.known_peers.remove(&peer_id); + } + /// Handle the connection of a new peer. /// /// Sends a `Hello` message to the peer. @@ -407,6 +421,9 @@ impl SimpleSync { }) .collect(); + // ssz-encode the headers + let headers = headers.as_ssz_bytes(); + network.send_rpc_response( peer_id, request_id, @@ -418,17 +435,17 @@ impl SimpleSync { pub fn on_beacon_block_headers_response( &mut self, peer_id: PeerId, - res: BeaconBlockHeadersResponse, + headers: Vec, network: &mut NetworkContext, ) { debug!( self.log, "BlockHeadersResponse"; "peer" => format!("{:?}", peer_id), - "count" => res.headers.len(), + "count" => headers.len(), ); - if res.headers.is_empty() { + if headers.is_empty() { warn!( self.log, "Peer returned empty block headers response. PeerId: {:?}", peer_id @@ -438,9 +455,7 @@ impl SimpleSync { // Enqueue the headers, obtaining a list of the roots of the headers which were newly added // to the queue. - let block_roots = self - .import_queue - .enqueue_headers(res.headers, peer_id.clone()); + let block_roots = self.import_queue.enqueue_headers(headers, peer_id.clone()); if !block_roots.is_empty() { self.request_block_bodies(peer_id, BeaconBlockBodiesRequest { block_roots }, network); @@ -482,10 +497,15 @@ impl SimpleSync { "returned" => block_bodies.len(), ); + let bytes = block_bodies.as_ssz_bytes(); + network.send_rpc_response( peer_id, request_id, - RPCResponse::BeaconBlockBodies(BeaconBlockBodiesResponse { block_bodies }), + RPCResponse::BeaconBlockBodies(BeaconBlockBodiesResponse { + block_bodies: bytes, + block_roots: None, + }), ) } @@ -493,7 +513,7 @@ impl SimpleSync { pub fn on_beacon_block_bodies_response( &mut self, peer_id: PeerId, - res: BeaconBlockBodiesResponse, + res: DecodedBeaconBlockBodiesResponse, network: &mut NetworkContext, ) { debug!( @@ -574,6 +594,7 @@ impl SimpleSync { SHOULD_FORWARD_GOSSIP_BLOCK } + BlockProcessingOutcome::FutureSlot { present_slot, block_slot, @@ -890,7 +911,9 @@ fn hello_message(beacon_chain: &BeaconChain) -> HelloMes let state = &beacon_chain.head().beacon_state; HelloMessage { + //TODO: Correctly define the chain/network id network_id: spec.chain_id, + chain_id: spec.chain_id as u64, latest_finalized_root: state.finalized_root, latest_finalized_epoch: state.finalized_epoch, best_root: beacon_chain.head().beacon_block_root,