diff --git a/beacon_node/eth2-libp2p/Cargo.toml b/beacon_node/eth2-libp2p/Cargo.toml index c1059d415..220948cd5 100644 --- a/beacon_node/eth2-libp2p/Cargo.toml +++ b/beacon_node/eth2-libp2p/Cargo.toml @@ -26,3 +26,4 @@ tokio-io = "0.1.12" smallvec = "0.6.10" fnv = "1.0.6" unsigned-varint = "0.2.2" +bytes = "0.4.12" diff --git a/beacon_node/eth2-libp2p/src/rpc/codecs/base.rs b/beacon_node/eth2-libp2p/src/rpc/codec/base.rs similarity index 58% rename from beacon_node/eth2-libp2p/src/rpc/codecs/base.rs rename to beacon_node/eth2-libp2p/src/rpc/codec/base.rs index 6957ad180..466fc5c3d 100644 --- a/beacon_node/eth2-libp2p/src/rpc/codecs/base.rs +++ b/beacon_node/eth2-libp2p/src/rpc/codec/base.rs @@ -1,53 +1,77 @@ -///! This handles the various supported encoding mechanism for the Eth 2.0 RPC. +//! This handles the various supported encoding mechanism for the Eth 2.0 RPC. -pub trait InnerCodec: Encoder + Decoder { - type Error; +use crate::rpc::{ErrorMessage, RPCErrorResponse, RPCRequest, RPCResponse}; +use bytes::BufMut; +use bytes::BytesMut; +use tokio::codec::{Decoder, Encoder}; + +pub(crate) trait OutboundCodec: Encoder + Decoder { + type ErrorType; fn decode_error( &mut self, - &mut BytesMut, - ) -> Result, ::Error>; + src: &mut BytesMut, + ) -> Result, ::Error>; } -pub struct BaseInboundCodec { - /// Inner codec for handling various encodings - inner: TCodec, -} - -pub struct BaseOutboundCodec +pub(crate) struct BaseInboundCodec where - TCodec: InnerCodec, - ::Item = RPCResponse, - ::ErrorItem = ErrorMessage, + TCodec: Encoder + Decoder, { /// Inner codec for handling various encodings inner: TCodec, +} + +impl BaseInboundCodec +where + TCodec: Encoder + Decoder, +{ + pub fn new(codec: TCodec) -> Self { + BaseInboundCodec { inner: codec } + } +} + +pub(crate) struct BaseOutboundCodec +where + TOutboundCodec: OutboundCodec, +{ + /// Inner codec for handling various encodings + inner: TOutboundCodec, /// Optimisation for decoding. True if the response code has been read and we are awaiting a /// response. response_code: Option, } +impl BaseOutboundCodec +where + TOutboundCodec: OutboundCodec, +{ + pub fn new(codec: TOutboundCodec) -> Self { + BaseOutboundCodec { + inner: codec, + response_code: None, + } + } +} + impl Encoder for BaseInboundCodec where - TCodec: Encoder, - ::Item = RPCResponse, + TCodec: Decoder + Encoder, { - type Item = RPCResponse; + type Item = RPCErrorResponse; type Error = ::Error; fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> { dst.clear(); dst.reserve(1); - dst.put_u8(item.as_u8); - return self.inner.encode(); + dst.put_u8(item.as_u8()); + return self.inner.encode(item, dst); } } impl Decoder for BaseInboundCodec where - TCodec: Decoder, - ::Item: RPCrequest, - ::Error: From, + TCodec: Encoder + Decoder, { type Item = RPCRequest; type Error = ::Error; @@ -59,7 +83,7 @@ where impl Encoder for BaseOutboundCodec where - TCodec: Encoder, + TCodec: OutboundCodec + Encoder, { type Item = RPCRequest; type Error = ::Error; @@ -71,23 +95,19 @@ where impl Decoder for BaseOutboundCodec where - TCodec: InnerCodec, - ::Error: From, + TCodec: OutboundCodec + Decoder, { - type Item = RPCResponse; + type Item = RPCErrorResponse; type Error = ::Error; fn decode(&mut self, src: &mut BytesMut) -> Result, Self::Error> { let response_code = { if let Some(resp_code) = self.response_code { - resp_code; + resp_code } else { - if src.is_empty() { - return Err(io::Error::new( - io::ErrorKind::InvalidData, - "no bytes received", - )); - } + // buffer should not be empty + debug_assert!(!src.is_empty()); + let resp_byte = src.split_to(1); let resp_code_byte = [0; 1]; resp_code_byte.copy_from_slice(&resp_byte); @@ -96,8 +116,9 @@ where if let Some(response) = RPCErrorResponse::internal_data(resp_code) { self.response_code = None; - return response; + return Ok(Some(response)); } + self.response_code = Some(resp_code); resp_code } }; diff --git a/beacon_node/eth2-libp2p/src/rpc/codec/mod.rs b/beacon_node/eth2-libp2p/src/rpc/codec/mod.rs new file mode 100644 index 000000000..68dcd4650 --- /dev/null +++ b/beacon_node/eth2-libp2p/src/rpc/codec/mod.rs @@ -0,0 +1,2 @@ +pub(crate) mod base; +pub(crate) mod ssz; diff --git a/beacon_node/eth2-libp2p/src/rpc/codec/ssz.rs b/beacon_node/eth2-libp2p/src/rpc/codec/ssz.rs new file mode 100644 index 000000000..e339007d7 --- /dev/null +++ b/beacon_node/eth2-libp2p/src/rpc/codec/ssz.rs @@ -0,0 +1,239 @@ +use crate::rpc::methods::*; +use crate::rpc::{ + codec::base::OutboundCodec, + protocol::{ProtocolId, RPCError}, +}; +use crate::rpc::{ErrorMessage, RPCErrorResponse, RPCRequest, RPCResponse}; +use bytes::{Bytes, BytesMut}; +use ssz::{Decode, Encode}; +use tokio::codec::{Decoder, Encoder}; +use unsigned_varint::codec::UviBytes; + +/* Inbound Codec */ + +pub struct SSZInboundCodec { + inner: UviBytes, + protocol: ProtocolId, +} + +impl SSZInboundCodec { + pub fn new(protocol: ProtocolId, max_packet_size: usize) -> Self { + let uvi_codec = UviBytes::default(); + uvi_codec.set_max_len(max_packet_size); + + // this encoding only applies to ssz. + debug_assert!(protocol.encoding.as_str() == "ssz"); + + SSZInboundCodec { + inner: uvi_codec, + protocol, + } + } +} + +// Encoder for inbound +impl Encoder for SSZInboundCodec { + type Item = RPCErrorResponse; + type Error = RPCError; + + fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> { + let bytes = match item { + RPCErrorResponse::Success(resp) => { + match resp { + RPCResponse::Hello(res) => res.as_ssz_bytes(), + RPCResponse::Goodbye => unreachable!(), + RPCResponse::BeaconBlockRoots(res) => res.as_ssz_bytes(), + RPCResponse::BeaconBlockHeaders(res) => res.headers, // already raw bytes + RPCResponse::BeaconBlockBodies(res) => res.block_bodies, // already raw bytes + RPCResponse::BeaconChainState(res) => res.as_ssz_bytes(), + } + } + RPCErrorResponse::EncodingError => vec![], + RPCErrorResponse::InvalidRequest(err) => err.as_ssz_bytes(), + RPCErrorResponse::ServerError(err) => err.as_ssz_bytes(), + RPCErrorResponse::Unknown(err) => err.as_ssz_bytes(), + }; + + if !bytes.is_empty() { + // length-prefix and return + return self + .inner + .encode(Bytes::from(bytes), dst) + .map_err(RPCError::from); + } + Ok(()) + } +} + +// Decoder for inbound +impl Decoder for SSZInboundCodec { + type Item = RPCRequest; + type Error = RPCError; + + fn decode(&mut self, src: &mut BytesMut) -> Result, Self::Error> { + match self.inner.decode(src).map_err(RPCError::from) { + Ok(Some(packet)) => match self.protocol.message_name.as_str() { + "hello" => match self.protocol.version.as_str() { + "1.0.0" => Ok(Some(RPCRequest::Hello(HelloMessage::from_ssz_bytes( + &packet, + )?))), + _ => Err(RPCError::InvalidProtocol("Unknown HELLO version")), + }, + "goodbye" => match self.protocol.version.as_str() { + "1.0.0" => Ok(Some(RPCRequest::Goodbye(Goodbye::from_ssz_bytes(&packet)?))), + _ => Err(RPCError::InvalidProtocol( + "Unknown GOODBYE version.as_str()", + )), + }, + "beacon_block_roots" => match self.protocol.version.as_str() { + "1.0.0" => Ok(Some(RPCRequest::BeaconBlockRoots( + BeaconBlockRootsRequest::from_ssz_bytes(&packet)?, + ))), + _ => Err(RPCError::InvalidProtocol( + "Unknown BEACON_BLOCK_ROOTS version.", + )), + }, + "beacon_block_headers" => match self.protocol.version.as_str() { + "1.0.0" => Ok(Some(RPCRequest::BeaconBlockHeaders( + BeaconBlockHeadersRequest::from_ssz_bytes(&packet)?, + ))), + _ => Err(RPCError::InvalidProtocol( + "Unknown BEACON_BLOCK_HEADERS version.", + )), + }, + "beacon_block_bodies" => match self.protocol.version.as_str() { + "1.0.0" => Ok(Some(RPCRequest::BeaconBlockBodies( + BeaconBlockBodiesRequest::from_ssz_bytes(&packet)?, + ))), + _ => Err(RPCError::InvalidProtocol( + "Unknown BEACON_BLOCK_BODIES version.", + )), + }, + "beacon_chain_state" => match self.protocol.version.as_str() { + "1.0.0" => Ok(Some(RPCRequest::BeaconChainState( + BeaconChainStateRequest::from_ssz_bytes(&packet)?, + ))), + _ => Err(RPCError::InvalidProtocol( + "Unknown BEACON_CHAIN_STATE version.", + )), + }, + }, + Ok(None) => Ok(None), + Err(e) => Err(e), + } + } +} + +/* Outbound Codec */ + +pub struct SSZOutboundCodec { + inner: UviBytes, + protocol: ProtocolId, +} + +impl SSZOutboundCodec { + pub fn new(protocol: ProtocolId, max_packet_size: usize) -> Self { + let uvi_codec = UviBytes::default(); + uvi_codec.set_max_len(max_packet_size); + + // this encoding only applies to ssz. + debug_assert!(protocol.encoding.as_str() == "ssz"); + + SSZOutboundCodec { + inner: uvi_codec, + protocol, + } + } +} + +// Encoder for outbound +impl Encoder for SSZOutboundCodec { + type Item = RPCRequest; + type Error = RPCError; + + fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> { + let bytes = match item { + RPCRequest::Hello(req) => req.as_ssz_bytes(), + RPCRequest::Goodbye(req) => req.as_ssz_bytes(), + RPCRequest::BeaconBlockRoots(req) => req.as_ssz_bytes(), + RPCRequest::BeaconBlockHeaders(req) => req.as_ssz_bytes(), + RPCRequest::BeaconBlockBodies(req) => req.as_ssz_bytes(), + RPCRequest::BeaconChainState(req) => req.as_ssz_bytes(), + }; + // length-prefix + self.inner + .encode(bytes::Bytes::from(bytes), dst) + .map_err(RPCError::from) + } +} + +// Decoder for outbound +impl Decoder for SSZOutboundCodec { + type Item = RPCResponse; + type Error = RPCError; + + fn decode(&mut self, src: &mut BytesMut) -> Result, Self::Error> { + match self.inner.decode(src).map_err(RPCError::from) { + Ok(Some(packet)) => match self.protocol.message_name.as_str() { + "hello" => match self.protocol.version.as_str() { + "1.0.0" => Ok(Some(RPCResponse::Hello(HelloMessage::from_ssz_bytes( + &packet, + )?))), + _ => Err(RPCError::InvalidProtocol("Unknown HELLO version.")), + }, + "goodbye" => Err(RPCError::InvalidProtocol("GOODBYE doesn't have a response")), + "beacon_block_roots" => match self.protocol.version.as_str() { + "1.0.0" => Ok(Some(RPCResponse::BeaconBlockRoots( + BeaconBlockRootsResponse::from_ssz_bytes(&packet)?, + ))), + _ => Err(RPCError::InvalidProtocol( + "Unknown BEACON_BLOCK_ROOTS version.", + )), + }, + "beacon_block_headers" => match self.protocol.version.as_str() { + "1.0.0" => Ok(Some(RPCResponse::BeaconBlockHeaders( + BeaconBlockHeadersResponse { + headers: packet.to_vec(), + }, + ))), + _ => Err(RPCError::InvalidProtocol( + "Unknown BEACON_BLOCK_HEADERS version.", + )), + }, + "beacon_block_bodies" => match self.protocol.version.as_str() { + "1.0.0" => Ok(Some(RPCResponse::BeaconBlockBodies( + BeaconBlockBodiesResponse { + block_bodies: packet.to_vec(), + }, + ))), + _ => Err(RPCError::InvalidProtocol( + "Unknown BEACON_BLOCK_BODIES version.", + )), + }, + "beacon_chain_state" => match self.protocol.version.as_str() { + "1.0.0" => Ok(Some(RPCResponse::BeaconChainState( + BeaconChainStateResponse::from_ssz_bytes(&packet)?, + ))), + _ => Err(RPCError::InvalidProtocol( + "Unknown BEACON_CHAIN_STATE version.", + )), + }, + _ => Err(RPCError::InvalidProtocol("Unknown method")), + }, + Ok(None) => Ok(None), + Err(e) => Err(e), + } + } +} + +impl OutboundCodec for SSZOutboundCodec { + type ErrorType = ErrorMessage; + + fn decode_error(&mut self, src: &mut BytesMut) -> Result, RPCError> { + match self.inner.decode(src).map_err(RPCError::from) { + Ok(Some(packet)) => Ok(Some(ErrorMessage::from_ssz_bytes(&packet)?)), + Ok(None) => Ok(None), + Err(e) => Err(e), + } + } +} diff --git a/beacon_node/eth2-libp2p/src/rpc/codecs/mod.rs b/beacon_node/eth2-libp2p/src/rpc/codecs/mod.rs deleted file mode 100644 index 77ed8456d..000000000 --- a/beacon_node/eth2-libp2p/src/rpc/codecs/mod.rs +++ /dev/null @@ -1 +0,0 @@ -mod base; diff --git a/beacon_node/eth2-libp2p/src/rpc/codecs/ssz.rs b/beacon_node/eth2-libp2p/src/rpc/codecs/ssz.rs deleted file mode 100644 index fd20ae57e..000000000 --- a/beacon_node/eth2-libp2p/src/rpc/codecs/ssz.rs +++ /dev/null @@ -1,323 +0,0 @@ - -/// SSZ Input stream -pub struct SSZInboundSink { - inner: - protocol: ProtocolId - -impl for SSZInputStream -where - TSocket: AsyncRead + AsyncWrite -{ - - /// Set up the initial input stream object. - pub fn new(incomming: TSocket, protocol: ProtocolId, max_size: usize) -> Self { - - // this type of stream should only apply to ssz protocols - debug_assert!(protocol.encoding.as_str() == "ssz"); - - let mut uvi_codec = UviBytes::default(); - uvi_codec.set_max_len(max_size); - - let inner = Framed::new(incomming, uvi_codec).from_err() - .with(|response| { - self.encode(response) - }) - .and_then(|bytes| { - self.decode(request) - }).into_future(); - - //TODO: add timeout - - SSZInputStream { - inner, - protocol - } - } - - /// Decodes an SSZ-encoded RPCRequest. - fn decode(&self, request: RPCRequest) { - - match self.protocol.message_name.as_str() { - "hello" => match protocol.version.as_str() { - "1.0.0" => Ok(RPCRequest::Hello(HelloMessage::from_ssz_bytes(&packet)?)), - _ => Err(RPCError::InvalidProtocol("Unknown HELLO version")), - }, - "goodbye" => match protocol.version.as_str() { - "1.0.0" => Ok(RPCRequest::Goodbye(Goodbye::from_ssz_bytes(&packet)?)), - _ => Err(RPCError::InvalidProtocol( - "Unknown GOODBYE version.as_str()", - )), - }, - "beacon_block_roots" => match protocol.version.as_str() { - "1.0.0" => Ok(RPCRequest::BeaconBlockRoots( - BeaconBlockRootsRequest::from_ssz_bytes(&packet)?, - )), - _ => Err(RPCError::InvalidProtocol( - "Unknown BEACON_BLOCK_ROOTS version.", - )), - }, - "beacon_block_headers" => match protocol.version.as_str() { - "1.0.0" => Ok(RPCRequest::BeaconBlockHeaders( - BeaconBlockHeadersRequest::from_ssz_bytes(&packet)?, - )), - _ => Err(RPCError::InvalidProtocol( - "Unknown BEACON_BLOCK_HEADERS version.", - )), - }, - "beacon_block_bodies" => match protocol.version.as_str() { - "1.0.0" => Ok(RPCRequest::BeaconBlockBodies( - BeaconBlockBodiesRequest::from_ssz_bytes(&packet)?, - )), - _ => Err(RPCError::InvalidProtocol( - "Unknown BEACON_BLOCK_BODIES version.", - )), - }, - "beacon_chain_state" => match protocol.version.as_str() { - "1.0.0" => Ok(RPCRequest::BeaconChainState( - BeaconChainStateRequest::from_ssz_bytes(&packet)?, - )), - _ => Err(RPCError::InvalidProtocol( - "Unknown BEACON_CHAIN_STATE version.", - )), - }, - } - } - - fn encode(&self, response: RPCResponse) { - - // TODO: Add error code - - match response { - RPCResponse::Hello(res) => res.as_ssz_bytes(), - RPCResponse::Goodbye => unreachable!(), - RPCResponse::BeaconBlockRoots(res) => res.as_ssz_bytes(), - RPCResponse::BeaconBlockHeaders(res) => res.headers, // already raw bytes - RPCResponse::BeaconBlockBodies(res) => res.block_bodies, // already raw bytes - RPCResponse::BeaconChainState(res) => res.as_ssz_bytes(), - } - } - -} - -type SSZInboundOutput = stream::AndThen>>, RPCError>, - RPCResponse, - fn(RPCResponse) -> Result, RPCError>, - Result, RPCError>, - >, - fn(BytesMut) -> Result, - Result - >; - -impl Sink for SSZInputStreamSink { - - type SinkItem = RPCResponse; - type SinkError = RPCError; - - fn start_send( - &mut self, - item: Self::SinkItem -) -> Result, Self::SinkError> { - self.inner.start_send(item) - } - - fn poll_complete(&mut self) -> Result, Self::SinkError> { - self.inner.poll_complete() - } -} - -/* Outbound specific stream */ - -// Implement our own decoder to handle the response byte - -struct SSZOutboundCodec - - - -pub struct SSZOutboundStreamSink { - inner: - protocol: ProtocolId - -impl for SSZOutboundStreamSink -where - TSocket: AsyncRead + AsyncWrite -{ - - /// Set up the initial outbound stream object. - pub fn new(socket: TSocket, protocol: ProtocolId, max_size: usize) -> Self { - - // this type of stream should only apply to ssz protocols - debug_assert!(protocol.encoding.as_str() == "ssz"); - - let mut uvi_codec = UviBytes::default(); - uvi_codec.set_max_len(max_size); - - let inner = Framed::new(socket, uvi_codec).from_err() - .with(|request| { - self.encode(request) - }) - .and_then(|bytes| { - self.decode(response) - }); - - SSZOutboundStream { - inner, - protocol - } - } - - - - - - /// Decodes a response that was received on the same stream as a request. The response type should - /// therefore match the request protocol type. - pub fn decode(&self, response: Vec, - protocol: ProtocolId, - response_code: ResponseCode, - ) -> Result { - match response_code { - ResponseCode::EncodingError => Ok(RPCResponse::Error("Encoding error".into())), - ResponseCode::InvalidRequest => { - let response = match protocol.encoding.as_str() { - "ssz" => ErrorResponse::from_ssz_bytes(&packet)?, - _ => return Err(RPCError::InvalidProtocol("Unknown Encoding")), - }; - Ok(RPCResponse::Error(format!( - "Invalid Request: {}", - response.error_message - ))) - } - ResponseCode::ServerError => { - let response = match protocol.encoding.as_str() { - "ssz" => ErrorResponse::from_ssz_bytes(&packet)?, - _ => return Err(RPCError::InvalidProtocol("Unknown Encoding")), - }; - Ok(RPCResponse::Error(format!( - "Remote Server Error: {}", - response.error_message - ))) - } - ResponseCode::Success => match protocol.message_name.as_str() { - "hello" => match protocol.version.as_str() { - "1.0.0" => match protocol.encoding.as_str() { - "ssz" => Ok(RPCResponse::Hello(HelloMessage::from_ssz_bytes(&packet)?)), - _ => Err(RPCError::InvalidProtocol("Unknown HELLO encoding")), - }, - _ => Err(RPCError::InvalidProtocol("Unknown HELLO version.")), - }, - "goodbye" => Err(RPCError::Custom( - "GOODBYE should not have a response".into(), - )), - "beacon_block_roots" => match protocol.version.as_str() { - "1.0.0" => match protocol.encoding.as_str() { - "ssz" => Ok(RPCResponse::BeaconBlockRoots( - BeaconBlockRootsResponse::from_ssz_bytes(&packet)?, - )), - _ => Err(RPCError::InvalidProtocol( - "Unknown BEACON_BLOCK_ROOTS encoding", - )), - }, - _ => Err(RPCError::InvalidProtocol( - "Unknown BEACON_BLOCK_ROOTS version.", - )), - }, - "beacon_block_headers" => match protocol.version.as_str() { - "1.0.0" => match protocol.encoding.as_str() { - "ssz" => Ok(RPCResponse::BeaconBlockHeaders( - BeaconBlockHeadersResponse { headers: packet }, - )), - _ => Err(RPCError::InvalidProtocol( - "Unknown BEACON_BLOCK_HEADERS encoding", - )), - }, - _ => Err(RPCError::InvalidProtocol( - "Unknown BEACON_BLOCK_HEADERS version.", - )), - }, - "beacon_block_bodies" => match protocol.version.as_str() { - "1.0.0" => match protocol.encoding.as_str() { - "ssz" => Ok(RPCResponse::BeaconBlockBodies(BeaconBlockBodiesResponse { - block_bodies: packet, - })), - _ => Err(RPCError::InvalidProtocol( - "Unknown BEACON_BLOCK_BODIES encoding", - )), - }, - _ => Err(RPCError::InvalidProtocol( - "Unknown BEACON_BLOCK_BODIES version.", - )), - }, - "beacon_chain_state" => match protocol.version.as_str() { - "1.0.0" => match protocol.encoding.as_str() { - "ssz" => Ok(RPCResponse::BeaconChainState( - BeaconChainStateResponse::from_ssz_bytes(&packet)?, - )), - _ => Err(RPCError::InvalidProtocol( - "Unknown BEACON_CHAIN_STATE encoding", - )), - }, - _ => Err(RPCError::InvalidProtocol( - "Unknown BEACON_CHAIN_STATE version.", - )), - }, - }, - } - } - - fn encode(&self, response: RPCResponse) { - - match response { - RPCResponse::Hello(res) => res.as_ssz_bytes(), - RPCResponse::Goodbye => unreachable!(), - RPCResponse::BeaconBlockRoots(res) => res.as_ssz_bytes(), - RPCResponse::BeaconBlockHeaders(res) => res.headers, // already raw bytes - RPCResponse::BeaconBlockBodies(res) => res.block_bodies, // already raw bytes - RPCResponse::BeaconChainState(res) => res.as_ssz_bytes(), - } - } - -} - -type SSZOutboundStream = stream::AndThen>>, RPCError>, - RPCResponse, - fn(RPCResponse) -> Result, RPCError>, - Result, RPCError>, - >, - fn(BytesMut) -> Result, - Result - >; - - -impl Stream for SSZInputStreamSink { - - type Item = SSZInboundOutput; - type Error = RPCError; - - fn poll(&mut self) -> Result>, Self::Error> { - self.inner.poll() - } -} - -impl Sink for SSZInputStreamSink { - - type SinkItem = RPCResponse; - type SinkError = RPCError; - - fn start_send( - &mut self, - item: Self::SinkItem -) -> Result, Self::SinkError> { - self.inner.start_send(item) - } - - fn poll_complete(&mut self) -> Result, Self::SinkError> { - self.inner.poll_complete() - } -} - - - - - - - diff --git a/beacon_node/eth2-libp2p/src/rpc/methods.rs b/beacon_node/eth2-libp2p/src/rpc/methods.rs index 066da1351..9e9087f9e 100644 --- a/beacon_node/eth2-libp2p/src/rpc/methods.rs +++ b/beacon_node/eth2-libp2p/src/rpc/methods.rs @@ -162,3 +162,76 @@ pub struct BeaconChainStateResponse { /// The values corresponding the to the requested tree hashes. pub values: bool, //TBD - stubbed with encodable bool } + +/* RPC Handling and Grouping */ +// Collection of enums and structs used by the Codecs to encode/decode RPC messages + +#[derive(Debug, Clone)] +pub enum RPCResponse { + /// A HELLO message. + Hello(HelloMessage), + /// An empty field returned from sending a GOODBYE request. + Goodbye, // empty value - required for protocol handler + /// A response to a get BEACON_BLOCK_ROOTS request. + BeaconBlockRoots(BeaconBlockRootsResponse), + /// A response to a get BEACON_BLOCK_HEADERS request. + BeaconBlockHeaders(BeaconBlockHeadersResponse), + /// A response to a get BEACON_BLOCK_BODIES request. + BeaconBlockBodies(BeaconBlockBodiesResponse), + /// A response to a get BEACON_CHAIN_STATE request. + BeaconChainState(BeaconChainStateResponse), +} + +pub enum RPCErrorResponse { + Success(RPCResponse), + EncodingError, + InvalidRequest(ErrorMessage), + ServerError(ErrorMessage), + Unknown(ErrorMessage), +} + +impl RPCErrorResponse { + /// If a response has no payload, returns the variant corresponding to the code. + pub fn internal_data(response_code: u8) -> Option { + match response_code { + // EncodingError + 1 => Some(RPCErrorResponse::EncodingError), + // All others require further data + _ => None, + } + } + + /// Used to encode the response. + pub fn as_u8(&self) -> u8 { + match self { + RPCErrorResponse::Success(_) => 0, + RPCErrorResponse::EncodingError => 1, + RPCErrorResponse::InvalidRequest(_) => 2, + RPCErrorResponse::ServerError(_) => 3, + RPCErrorResponse::Unknown(_) => 255, + } + } + + /// Tells the codec whether to decode as an RPCResponse or an error. + pub fn is_response(response_code: u8) -> bool { + match response_code { + 0 => true, + _ => false, + } + } + + /// Builds an RPCErrorResponse from a response code and an ErrorMessage + pub fn from_error(response_code: u8, err: ErrorMessage) -> Self { + match response_code { + 2 => RPCErrorResponse::InvalidRequest(err), + 3 => RPCErrorResponse::ServerError(err), + _ => RPCErrorResponse::Unknown(err), + } + } +} + +#[derive(Encode, Decode)] +pub struct ErrorMessage { + /// The UTF-8 encoded Error message string. + error_message: Vec, +} diff --git a/beacon_node/eth2-libp2p/src/rpc/mod.rs b/beacon_node/eth2-libp2p/src/rpc/mod.rs index fc7560eff..ab47b4362 100644 --- a/beacon_node/eth2-libp2p/src/rpc/mod.rs +++ b/beacon_node/eth2-libp2p/src/rpc/mod.rs @@ -1,8 +1,9 @@ -///! The Ethereum 2.0 Wire Protocol -///! -///! This protocol is a purpose built Ethereum 2.0 libp2p protocol. It's role is to facilitate -///! direct peer-to-peer communication primarily for sending/receiving chain information for -///! syncing. +//! The Ethereum 2.0 Wire Protocol +//! +//! This protocol is a purpose built Ethereum 2.0 libp2p protocol. It's role is to facilitate +//! direct peer-to-peer communication primarily for sending/receiving chain information for +//! syncing. + use futures::prelude::*; use handler::RPCHandler; use libp2p::core::protocols_handler::ProtocolsHandler; @@ -10,17 +11,17 @@ use libp2p::core::swarm::{ ConnectedPoint, NetworkBehaviour, NetworkBehaviourAction, PollParameters, }; use libp2p::{Multiaddr, PeerId}; -pub use methods::HelloMessage; -pub use protocol::{RPCProtocol, RPCRequest, RPCResponse}; +pub use methods::{ErrorMessage, HelloMessage, RPCErrorResponse, RPCResponse}; +pub use protocol::{RPCProtocol, RPCRequest}; use slog::o; use std::marker::PhantomData; use tokio::io::{AsyncRead, AsyncWrite}; -mod codecs; +pub(crate) mod codec; mod handler; pub mod methods; mod protocol; -mod request_response; +// mod request_response; /// The return type used in the behaviour and the resultant event from the protocols handler. #[derive(Debug, Clone)] diff --git a/beacon_node/eth2-libp2p/src/rpc/protocol.rs b/beacon_node/eth2-libp2p/src/rpc/protocol.rs index bb593f3c1..af94e476d 100644 --- a/beacon_node/eth2-libp2p/src/rpc/protocol.rs +++ b/beacon_node/eth2-libp2p/src/rpc/protocol.rs @@ -1,13 +1,18 @@ use super::methods::*; -use super::request_response::{rpc_request_response, RPCRequestResponse}; +use crate::rpc::codec::{ + base::{BaseInboundCodec, BaseOutboundCodec}, + ssz::{SSZInboundCodec, SSZOutboundCodec}, +}; use futures::future::Future; use libp2p::core::{upgrade, InboundUpgrade, OutboundUpgrade, UpgradeInfo}; -use ssz::{Decode, Encode}; +use ssz::Encode; use ssz_derive::{Decode, Encode}; use std::io; use std::time::Duration; +use tokio::codec::Framed; use tokio::io::{AsyncRead, AsyncWrite}; use tokio::prelude::future::MapErr; +use tokio::prelude::*; use tokio::util::FutureExt; /// The maximum bytes that can be sent across the RPC. @@ -17,7 +22,7 @@ const PROTOCOL_PREFIX: &str = "/eth/serenity/rpc/"; /// The number of seconds to wait for a response before the stream is terminated. const RESPONSE_TIMEOUT: u64 = 10; -/// Implementation of the `ConnectionUpgrade` for the rpc protocol. +/// Implementation of the `ConnectionUpgrade` for the RPC protocol. #[derive(Debug, Clone)] pub struct RPCProtocol; @@ -96,6 +101,10 @@ impl Into for ProtocolId { // The inbound protocol reads the request, decodes it and returns the stream to the protocol // handler to respond to once ready. +enum InboundCodec { + SSZ(BaseInboundCodec), +} + type FnDecodeRPCEvent = fn( upgrade::Negotiated, @@ -107,8 +116,11 @@ impl InboundUpgrade for RPCProtocol where TSocket: AsyncRead + AsyncWrite, { - type Output = (upgrade::Negotiated, RPCRequest, ProtocolId); + type Output = RPCRequest; type Error = RPCError; + + type Future = Box>; + /* type Future = MapErr< tokio_timer::Timeout< upgrade::ReadRespond< @@ -119,25 +131,48 @@ where >, fn(tokio::timer::timeout::Error) -> RPCError, >; + */ fn upgrade_inbound( self, socket: upgrade::Negotiated, protocol: &'static [u8], ) -> Self::Future { - upgrade::read_respond(socket, MAX_RPC_SIZE, protocol, { - |socket, packet, protocol| { - let protocol_id = ProtocolId::from_bytes(protocol)?; - Ok(( - socket, - RPCRequest::decode(packet, protocol_id)?, - protocol_id, - )) + let protocol_id = match ProtocolId::from_bytes(protocol) { + Ok(v) => v, + Err(e) => return Box::new(futures::future::err(e)), + }; + + match protocol_id.encoding.as_str() { + "ssz" | _ => { + let codec = BaseInboundCodec::new(SSZInboundCodec::new(protocol_id, 4096)); + Box::new( + Framed::new(socket, codec) + .into_future() + .timeout(Duration::from_secs(RESPONSE_TIMEOUT)) + .map_err(RPCError::from) + .and_then(|(madouby, _)| match madouby { + Some(x) => futures::future::ok(x), + None => futures::future::err(RPCError::Custom("Go home".into())), + }), + ) } } - as FnDecodeRPCEvent) - .timeout(Duration::from_secs(RESPONSE_TIMEOUT)) - .map_err(RPCError::from) + + /* + upgrade::read_respond(socket, MAX_RPC_SIZE, protocol, { + |socket, packet, protocol| { + let protocol_id = ProtocolId::from_bytes(protocol)?; + Ok(( + socket, + RPCRequest::decode(packet, protocol_id)?, + protocol_id, + )) + } + } + as FnDecodeRPCEvent) + } + */ } } @@ -214,78 +249,7 @@ impl RPCRequest { } } -/* Response Type */ - -#[derive(Debug, Clone)] -pub enum RPCResponse { - /// A HELLO message. - Hello(HelloMessage), - /// An empty field returned from sending a GOODBYE request. - Goodbye, // empty value - required for protocol handler - /// A response to a get BEACON_BLOCK_ROOTS request. - BeaconBlockRoots(BeaconBlockRootsResponse), - /// A response to a get BEACON_BLOCK_HEADERS request. - BeaconBlockHeaders(BeaconBlockHeadersResponse), - /// A response to a get BEACON_BLOCK_BODIES request. - BeaconBlockBodies(BeaconBlockBodiesResponse), - /// A response to a get BEACON_CHAIN_STATE request. - BeaconChainState(BeaconChainStateResponse), -} - -pub enum RPCErrorResponse { - Success(RPCResponse), - EncodingError, - InvalidRequest(ErrorMessage), - ServerError(ErrorMessage), - Unknown(ErrorMessage), -} - -impl RPCErrorResponse { - /// If a response has no payload, returns the variant corresponding to the code. - pub fn internal_data(response_code: u8) -> Option { - match response_code { - // EncodingError - 1 => Some(RPCErrorResponse::EncodingError), - // All others require further data - _ => None, - } - } - - /// Used to encode the response. - pub fn as_u8(&self) -> u8 { - match self { - RPCErrorResponse::Success(_) => 0, - RPCErrorResponse::EncodingError => 1, - RPCErrorResponse::InvalidRequest(_) => 2, - RPCErrorResponse::ServerError(_) => 3, - RPCErrorResponse::Unknown(_) => 255, - } - } - - /// Tells the codec whether to decode as an RPCResponse or an error. - pub fn is_response(response_code:u8) -> bool { - match response_code { - 0 => true, - _ => false, - } - - /// Builds an RPCErrorResponse from a response code and an ErrorMessage - pub fn from_error(response_code:u8, err: ErrorMessage) -> Self { - match response_code { - 2 => RPCErrorResponse::InvalidRequest(err), - 3 => RPCErrorResponse::ServerError(err), - _ => RPCErrorResponse::Unknown(err), - } -} - -#[derive(Encode, Decode)] -struct ErrorMessage { - /// The UTF-8 encoded Error message string. - error_message: Vec, -} - -// todo: SSZ-Encode -impl RPCResponse {} +/* RPC Response type - used for outbound upgrades */ /* Outbound upgrades */ @@ -295,37 +259,47 @@ where { type Output = RPCResponse; type Error = RPCError; - type Future = MapErr< - tokio_timer::Timeout, Vec>>, - fn(tokio::timer::timeout::Error) -> RPCError, - >; - + type Future = Box>; fn upgrade_outbound( self, socket: upgrade::Negotiated, protocol: Self::Info, ) -> Self::Future { - let protocol_id = ProtocolId::from_bytes(&protocol) - .expect("Protocol ID must be valid for outbound requests"); + panic!() - let request_bytes = self - .encode(protocol_id) - .expect("Should be able to encode a supported protocol"); - // if sending a goodbye, drop the stream and return an empty GOODBYE response - let short_circuit_return = if let RPCRequest::Goodbye(_) = self { - Some(RPCResponse::Goodbye) - } else { - None + /* + let protocol_id = match ProtocolId::from_bytes(&protocol) { + Ok(v) => v, + Err(e) => return futures::future::err(e), }; - rpc_request_response( - socket, - request_bytes, - MAX_RPC_SIZE, - short_circuit_return, - protocol_id, - ) - .timeout(Duration::from_secs(RESPONSE_TIMEOUT)) - .map_err(RPCError::from) + + // select which codec to use + let inbound_stream = match protocol_id.encoding.as_str() { + "ssz" => { + let codec = BaseInboundCodec::new(SSZCodec::new()); + Framed::new(socket, codec).send(self) + } + _ => futures::future::err(RPCError::InvalidProtocol("Unsupported encoding")), + }; + + // do not wait for a timeout if we send a GOODBYE request + match protocol_id.message_name.as_str() { + // goodbye messages do not have a response + "goodbye" => inbound_stream.and_then(|| { + RPCErrorResponse::Unknown(ErrorMessage { + error_message: String::from("goodbye response").as_bytes(), + }) + }), + // get a response for all other requests + _ => inbound_stream.and_then(|stream| { + stream + .into_future() + .timeout(Duration::from_secs(RESPONSE_TIMEOUT)) + .map(|resp, _| resp) + .map_err(RPCError::from) + }), + } + */ } }