diff --git a/beacon_node/eth2-libp2p/src/rpc/codecs/base.rs b/beacon_node/eth2-libp2p/src/rpc/codecs/base.rs new file mode 100644 index 000000000..5fe1aad8a --- /dev/null +++ b/beacon_node/eth2-libp2p/src/rpc/codecs/base.rs @@ -0,0 +1,442 @@ + +///! This handles the various supported encoding mechanism for the Eth 2.0 RPC. +///! +///! Currently supported encodings are: +///! - ssz - Varint length-prefixed SSZ-encoding. + + +pub struct BaseCodec { + /// Inner codec for handling various encodings + inner: TCodec, + /// Optimisation for decoding. True if the response code has been read and we are awaiting a + /// response. + read_response_code: bool, +} + + +impl Encoder for BaseOutboundCodec +where + TCodec: Encoder +{ + type Item = RPCRequest; + type Error = ::Error; + + fn encode( + &mut self, + item: Self::Item, + dst: &mut BytesMut + ) -> Result<(), Self::Error> { + self.inner.encode(item, dst) + } +} + + +impl Encoder for BaseInboundCodec +where + TCodec: Encoder +{ + type Item = RPCResponse; + type Error = ::Error; + + fn encode( + &mut self, + item: Self::Item, + dst: &mut BytesMut + ) -> Result<(), Self::Error> { + + match item { + RPCResponse::Error(response) => { + match response = { + ErrorResponse::EncodingError => { + dst.clear(); + dst.reserve(1); + dst.put(response as u8); + return; + } + ErrorResponse:: + self.inner.encode(item, dst) + } +} + + +impl Decoder for BaseCodec +where + TCodec: Decoder, + ::Error: From, +{ + + type Item = RPCResponse; + type Error = ::Error; + + fn decode( + &mut self, + src: &mut BytesMut + ) -> Result, Self::Error> { + + if !self.read_response_code { + if src.len() < 1 { + return Err(io::Error::new(io::ErrorKind::InvalidData, "no bytes received")); + } + + let resp_code_byte = [0; 1]; + // data must be only 1-byte - this cannot panic + resp_code_byte.copy_from_slice(&src); + let response_code = + ResponseCode::from(u8::from_be_bytes(resp_code_byte)); + match response_code { + ResponseCode::EncodingError => { + // invalid encoding + let response = RPCResponse::Error("Invalid Encoding".into()); + return Ok(Async::Ready(response)); + } + ResponseCode::Success + | ResponseCode::InvalidRequest + | ResponseCode::ServerError => { + // need to read another packet + self.inner = RPCRequestResponseInner::Read( + read_one(socket, max_size), + response_code, + ) + } + ResponseCode::Unknown => { + // unknown response code + let response = RPCResponse::Error(format!( + "Unknown response code: {}", + (response_code as u8) + )); + return Ok(Async::Ready(response)); + + + + + + } + +} + + + + + +/// SSZ Input stream +pub struct SSZInboundSink { + inner: + protocol: ProtocolId + +impl for SSZInputStream +where + TSocket: AsyncRead + AsyncWrite +{ + + /// Set up the initial input stream object. + pub fn new(incomming: TSocket, protocol: ProtocolId, max_size: usize) -> Self { + + // this type of stream should only apply to ssz protocols + debug_assert!(protocol.encoding.as_str() == "ssz"); + + let mut uvi_codec = UviBytes::default(); + uvi_codec.set_max_len(max_size); + + let inner = Framed::new(incomming, uvi_codec).from_err() + .with(|response| { + self.encode(response) + }) + .and_then(|bytes| { + self.decode(request) + }).into_future(); + + //TODO: add timeout + + SSZInputStream { + inner, + protocol + } + } + + /// Decodes an SSZ-encoded RPCRequest. + fn decode(&self, request: RPCRequest) { + + match self.protocol.message_name.as_str() { + "hello" => match protocol.version.as_str() { + "1.0.0" => Ok(RPCRequest::Hello(HelloMessage::from_ssz_bytes(&packet)?)), + _ => Err(RPCError::InvalidProtocol("Unknown HELLO version")), + }, + "goodbye" => match protocol.version.as_str() { + "1.0.0" => Ok(RPCRequest::Goodbye(Goodbye::from_ssz_bytes(&packet)?)), + _ => Err(RPCError::InvalidProtocol( + "Unknown GOODBYE version.as_str()", + )), + }, + "beacon_block_roots" => match protocol.version.as_str() { + "1.0.0" => Ok(RPCRequest::BeaconBlockRoots( + BeaconBlockRootsRequest::from_ssz_bytes(&packet)?, + )), + _ => Err(RPCError::InvalidProtocol( + "Unknown BEACON_BLOCK_ROOTS version.", + )), + }, + "beacon_block_headers" => match protocol.version.as_str() { + "1.0.0" => Ok(RPCRequest::BeaconBlockHeaders( + BeaconBlockHeadersRequest::from_ssz_bytes(&packet)?, + )), + _ => Err(RPCError::InvalidProtocol( + "Unknown BEACON_BLOCK_HEADERS version.", + )), + }, + "beacon_block_bodies" => match protocol.version.as_str() { + "1.0.0" => Ok(RPCRequest::BeaconBlockBodies( + BeaconBlockBodiesRequest::from_ssz_bytes(&packet)?, + )), + _ => Err(RPCError::InvalidProtocol( + "Unknown BEACON_BLOCK_BODIES version.", + )), + }, + "beacon_chain_state" => match protocol.version.as_str() { + "1.0.0" => Ok(RPCRequest::BeaconChainState( + BeaconChainStateRequest::from_ssz_bytes(&packet)?, + )), + _ => Err(RPCError::InvalidProtocol( + "Unknown BEACON_CHAIN_STATE version.", + )), + }, + } + } + + fn encode(&self, response: RPCResponse) { + + // TODO: Add error code + + match response { + RPCResponse::Hello(res) => res.as_ssz_bytes(), + RPCResponse::Goodbye => unreachable!(), + RPCResponse::BeaconBlockRoots(res) => res.as_ssz_bytes(), + RPCResponse::BeaconBlockHeaders(res) => res.headers, // already raw bytes + RPCResponse::BeaconBlockBodies(res) => res.block_bodies, // already raw bytes + RPCResponse::BeaconChainState(res) => res.as_ssz_bytes(), + } + } + +} + +type SSZInboundOutput = stream::AndThen>>, RPCError>, + RPCResponse, + fn(RPCResponse) -> Result, RPCError>, + Result, RPCError>, + >, + fn(BytesMut) -> Result, + Result + >; + +impl Sink for SSZInputStreamSink { + + type SinkItem = RPCResponse; + type SinkError = RPCError; + + fn start_send( + &mut self, + item: Self::SinkItem +) -> Result, Self::SinkError> { + self.inner.start_send(item) + } + + fn poll_complete(&mut self) -> Result, Self::SinkError> { + self.inner.poll_complete() + } +} + +/* Outbound specific stream */ + +// Implement our own decoder to handle the response byte + +struct SSZOutboundCodec + + + +pub struct SSZOutboundStreamSink { + inner: + protocol: ProtocolId + +impl for SSZOutboundStreamSink +where + TSocket: AsyncRead + AsyncWrite +{ + + /// Set up the initial outbound stream object. + pub fn new(socket: TSocket, protocol: ProtocolId, max_size: usize) -> Self { + + // this type of stream should only apply to ssz protocols + debug_assert!(protocol.encoding.as_str() == "ssz"); + + let mut uvi_codec = UviBytes::default(); + uvi_codec.set_max_len(max_size); + + let inner = Framed::new(socket, uvi_codec).from_err() + .with(|request| { + self.encode(request) + }) + .and_then(|bytes| { + self.decode(response) + }); + + SSZOutboundStream { + inner, + protocol + } + } + + + + + + /// Decodes a response that was received on the same stream as a request. The response type should + /// therefore match the request protocol type. + pub fn decode(&self, response: Vec, + protocol: ProtocolId, + response_code: ResponseCode, + ) -> Result { + match response_code { + ResponseCode::EncodingError => Ok(RPCResponse::Error("Encoding error".into())), + ResponseCode::InvalidRequest => { + let response = match protocol.encoding.as_str() { + "ssz" => ErrorResponse::from_ssz_bytes(&packet)?, + _ => return Err(RPCError::InvalidProtocol("Unknown Encoding")), + }; + Ok(RPCResponse::Error(format!( + "Invalid Request: {}", + response.error_message + ))) + } + ResponseCode::ServerError => { + let response = match protocol.encoding.as_str() { + "ssz" => ErrorResponse::from_ssz_bytes(&packet)?, + _ => return Err(RPCError::InvalidProtocol("Unknown Encoding")), + }; + Ok(RPCResponse::Error(format!( + "Remote Server Error: {}", + response.error_message + ))) + } + ResponseCode::Success => match protocol.message_name.as_str() { + "hello" => match protocol.version.as_str() { + "1.0.0" => match protocol.encoding.as_str() { + "ssz" => Ok(RPCResponse::Hello(HelloMessage::from_ssz_bytes(&packet)?)), + _ => Err(RPCError::InvalidProtocol("Unknown HELLO encoding")), + }, + _ => Err(RPCError::InvalidProtocol("Unknown HELLO version.")), + }, + "goodbye" => Err(RPCError::Custom( + "GOODBYE should not have a response".into(), + )), + "beacon_block_roots" => match protocol.version.as_str() { + "1.0.0" => match protocol.encoding.as_str() { + "ssz" => Ok(RPCResponse::BeaconBlockRoots( + BeaconBlockRootsResponse::from_ssz_bytes(&packet)?, + )), + _ => Err(RPCError::InvalidProtocol( + "Unknown BEACON_BLOCK_ROOTS encoding", + )), + }, + _ => Err(RPCError::InvalidProtocol( + "Unknown BEACON_BLOCK_ROOTS version.", + )), + }, + "beacon_block_headers" => match protocol.version.as_str() { + "1.0.0" => match protocol.encoding.as_str() { + "ssz" => Ok(RPCResponse::BeaconBlockHeaders( + BeaconBlockHeadersResponse { headers: packet }, + )), + _ => Err(RPCError::InvalidProtocol( + "Unknown BEACON_BLOCK_HEADERS encoding", + )), + }, + _ => Err(RPCError::InvalidProtocol( + "Unknown BEACON_BLOCK_HEADERS version.", + )), + }, + "beacon_block_bodies" => match protocol.version.as_str() { + "1.0.0" => match protocol.encoding.as_str() { + "ssz" => Ok(RPCResponse::BeaconBlockBodies(BeaconBlockBodiesResponse { + block_bodies: packet, + })), + _ => Err(RPCError::InvalidProtocol( + "Unknown BEACON_BLOCK_BODIES encoding", + )), + }, + _ => Err(RPCError::InvalidProtocol( + "Unknown BEACON_BLOCK_BODIES version.", + )), + }, + "beacon_chain_state" => match protocol.version.as_str() { + "1.0.0" => match protocol.encoding.as_str() { + "ssz" => Ok(RPCResponse::BeaconChainState( + BeaconChainStateResponse::from_ssz_bytes(&packet)?, + )), + _ => Err(RPCError::InvalidProtocol( + "Unknown BEACON_CHAIN_STATE encoding", + )), + }, + _ => Err(RPCError::InvalidProtocol( + "Unknown BEACON_CHAIN_STATE version.", + )), + }, + }, + } + } + + fn encode(&self, response: RPCResponse) { + + match response { + RPCResponse::Hello(res) => res.as_ssz_bytes(), + RPCResponse::Goodbye => unreachable!(), + RPCResponse::BeaconBlockRoots(res) => res.as_ssz_bytes(), + RPCResponse::BeaconBlockHeaders(res) => res.headers, // already raw bytes + RPCResponse::BeaconBlockBodies(res) => res.block_bodies, // already raw bytes + RPCResponse::BeaconChainState(res) => res.as_ssz_bytes(), + } + } + +} + +type SSZOutboundStream = stream::AndThen>>, RPCError>, + RPCResponse, + fn(RPCResponse) -> Result, RPCError>, + Result, RPCError>, + >, + fn(BytesMut) -> Result, + Result + >; + + +impl Stream for SSZInputStreamSink { + + type Item = SSZInboundOutput; + type Error = RPCError; + + fn poll(&mut self) -> Result>, Self::Error> { + self.inner.poll() + } +} + +impl Sink for SSZInputStreamSink { + + type SinkItem = RPCResponse; + type SinkError = RPCError; + + fn start_send( + &mut self, + item: Self::SinkItem +) -> Result, Self::SinkError> { + self.inner.start_send(item) + } + + fn poll_complete(&mut self) -> Result, Self::SinkError> { + self.inner.poll_complete() + } +} + + + + + + + diff --git a/beacon_node/eth2-libp2p/src/rpc/codecs/ssz.rs b/beacon_node/eth2-libp2p/src/rpc/codecs/ssz.rs new file mode 100644 index 000000000..e69de29bb diff --git a/beacon_node/eth2-libp2p/src/rpc/protocol.rs b/beacon_node/eth2-libp2p/src/rpc/protocol.rs index 4d1f0b021..ce58c5607 100644 --- a/beacon_node/eth2-libp2p/src/rpc/protocol.rs +++ b/beacon_node/eth2-libp2p/src/rpc/protocol.rs @@ -212,81 +212,6 @@ impl RPCRequest { RPCRequest::BeaconChainState(req) => req.as_ssz_bytes(), } } - - // This function can be extended to provide further logic for supporting various protocol versions/encoding - /// Decodes a request received from our peer. - pub fn decode(packet: Vec, protocol: ProtocolId) -> Result { - match protocol.message_name.as_str() { - "hello" => match protocol.version.as_str() { - "1.0.0" => match protocol.encoding.as_str() { - "ssz" => Ok(RPCRequest::Hello(HelloMessage::from_ssz_bytes(&packet)?)), - _ => Err(RPCError::InvalidProtocol("Unknown HELLO encoding")), - }, - _ => Err(RPCError::InvalidProtocol("Unknown HELLO version")), - }, - "goodbye" => match protocol.version.as_str() { - "1.0.0" => match protocol.encoding.as_str() { - "ssz" => Ok(RPCRequest::Goodbye(Goodbye::from_ssz_bytes(&packet)?)), - _ => Err(RPCError::InvalidProtocol("Unknown GOODBYE encoding")), - }, - _ => Err(RPCError::InvalidProtocol( - "Unknown GOODBYE version.as_str()", - )), - }, - "beacon_block_roots" => match protocol.version.as_str() { - "1.0.0" => match protocol.encoding.as_str() { - "ssz" => Ok(RPCRequest::BeaconBlockRoots( - BeaconBlockRootsRequest::from_ssz_bytes(&packet)?, - )), - _ => Err(RPCError::InvalidProtocol( - "Unknown BEACON_BLOCK_ROOTS encoding", - )), - }, - _ => Err(RPCError::InvalidProtocol( - "Unknown BEACON_BLOCK_ROOTS version.", - )), - }, - "beacon_block_headers" => match protocol.version.as_str() { - "1.0.0" => match protocol.encoding.as_str() { - "ssz" => Ok(RPCRequest::BeaconBlockHeaders( - BeaconBlockHeadersRequest::from_ssz_bytes(&packet)?, - )), - _ => Err(RPCError::InvalidProtocol( - "Unknown BEACON_BLOCK_HEADERS encoding", - )), - }, - _ => Err(RPCError::InvalidProtocol( - "Unknown BEACON_BLOCK_HEADERS version.", - )), - }, - "beacon_block_bodies" => match protocol.version.as_str() { - "1.0.0" => match protocol.encoding.as_str() { - "ssz" => Ok(RPCRequest::BeaconBlockBodies( - BeaconBlockBodiesRequest::from_ssz_bytes(&packet)?, - )), - _ => Err(RPCError::InvalidProtocol( - "Unknown BEACON_BLOCK_BODIES encoding", - )), - }, - _ => Err(RPCError::InvalidProtocol( - "Unknown BEACON_BLOCK_BODIES version.", - )), - }, - "beacon_chain_state" => match protocol.version.as_str() { - "1.0.0" => match protocol.encoding.as_str() { - "ssz" => Ok(RPCRequest::BeaconChainState( - BeaconChainStateRequest::from_ssz_bytes(&packet)?, - )), - _ => Err(RPCError::InvalidProtocol( - "Unknown BEACON_CHAIN_STATE encoding", - )), - }, - _ => Err(RPCError::InvalidProtocol( - "Unknown BEACON_CHAIN_STATE version.", - )), - }, - } - } } /* Response Type */ @@ -305,163 +230,61 @@ pub enum RPCResponse { BeaconBlockBodies(BeaconBlockBodiesResponse), /// A response to a get BEACON_CHAIN_STATE request. BeaconChainState(BeaconChainStateResponse), - /// The Error returned from the peer during a request. - Error(String), } -pub enum ResponseCode { - Success = 0, - EncodingError = 1, - InvalidRequest = 2, - ServerError = 3, - Unknown = 255, +pub enum RPCErrorResponse { + Success(RPCResponse), + EncodingError, + InvalidRequest(ErrorMessage), + ServerError(ErrorMessage), + Unknown(ErrorMessage), } -impl From for ResponseCode { - fn from(val: u8) -> ResponseCode { - match val { - 0 => ResponseCode::Success, - 1 => ResponseCode::EncodingError, - 2 => ResponseCode::InvalidRequest, - 3 => ResponseCode::ServerError, - _ => ResponseCode::Unknown, +impl RPCErrorResponse { + /// If a response has no payload, returns the variant corresponding to the code. + pub fn internal_data(response_code: u8) -> Option { + match response_code { + // EncodingError + 1 => Some(RPCErrorResponse::EncodingError), + // All others require further data + _ => None, } } -} -impl Into for ResponseCode { - fn into(self) -> u8 { - self as u8 + pub fn as_u8(&self) -> u8 { + match self { + RPCErrorResponse::Success(_) => 0, + RPCErrorResponse::EncodingError => 1, + RPCErrorResponse::InvalidRequest(_) => 2, + RPCErrorResponse::ServerError(_) => 3, + RPCErrorResponse::Unknown(_) => 255, + } } + + /// Tells the codec whether to decode as an RPCResponse or an error. + pub fn is_response(response_code:u8) -> bool { + match response_code { + 0 => true, + _ => false, + } + + /// Builds an RPCErrorResponse from a response code and an ErrorMessage + pub fn from_error(response_code:u8, err: ErrorMessage) -> Self { + match response_code { + 2 => RPCErrorResponse::InvalidRequest(err), + 3 => RPCErrorResponse::ServerError(err), + _ => RPCErrorResponse::Unknown(err), + } } #[derive(Encode, Decode)] -struct ErrorResponse { - error_message: String, +struct ErrorMessage { + /// The UTF-8 encoded Error message string. + error_message: Vec, } -impl RPCResponse { - /// Decodes a response that was received on the same stream as a request. The response type should - /// therefore match the request protocol type. - pub fn decode( - packet: Vec, - protocol: ProtocolId, - response_code: ResponseCode, - ) -> Result { - match response_code { - ResponseCode::EncodingError => Ok(RPCResponse::Error("Encoding error".into())), - ResponseCode::InvalidRequest => { - let response = match protocol.encoding.as_str() { - "ssz" => ErrorResponse::from_ssz_bytes(&packet)?, - _ => return Err(RPCError::InvalidProtocol("Unknown Encoding")), - }; - Ok(RPCResponse::Error(format!( - "Invalid Request: {}", - response.error_message - ))) - } - ResponseCode::ServerError => { - let response = match protocol.encoding.as_str() { - "ssz" => ErrorResponse::from_ssz_bytes(&packet)?, - _ => return Err(RPCError::InvalidProtocol("Unknown Encoding")), - }; - Ok(RPCResponse::Error(format!( - "Remote Server Error: {}", - response.error_message - ))) - } - ResponseCode::Success => match protocol.message_name.as_str() { - "hello" => match protocol.version.as_str() { - "1.0.0" => match protocol.encoding.as_str() { - "ssz" => Ok(RPCResponse::Hello(HelloMessage::from_ssz_bytes(&packet)?)), - _ => Err(RPCError::InvalidProtocol("Unknown HELLO encoding")), - }, - _ => Err(RPCError::InvalidProtocol("Unknown HELLO version.")), - }, - "goodbye" => Err(RPCError::Custom( - "GOODBYE should not have a response".into(), - )), - "beacon_block_roots" => match protocol.version.as_str() { - "1.0.0" => match protocol.encoding.as_str() { - "ssz" => Ok(RPCResponse::BeaconBlockRoots( - BeaconBlockRootsResponse::from_ssz_bytes(&packet)?, - )), - _ => Err(RPCError::InvalidProtocol( - "Unknown BEACON_BLOCK_ROOTS encoding", - )), - }, - _ => Err(RPCError::InvalidProtocol( - "Unknown BEACON_BLOCK_ROOTS version.", - )), - }, - "beacon_block_headers" => match protocol.version.as_str() { - "1.0.0" => match protocol.encoding.as_str() { - "ssz" => Ok(RPCResponse::BeaconBlockHeaders( - BeaconBlockHeadersResponse { headers: packet }, - )), - _ => Err(RPCError::InvalidProtocol( - "Unknown BEACON_BLOCK_HEADERS encoding", - )), - }, - _ => Err(RPCError::InvalidProtocol( - "Unknown BEACON_BLOCK_HEADERS version.", - )), - }, - "beacon_block_bodies" => match protocol.version.as_str() { - "1.0.0" => match protocol.encoding.as_str() { - "ssz" => Ok(RPCResponse::BeaconBlockBodies(BeaconBlockBodiesResponse { - block_bodies: packet, - })), - _ => Err(RPCError::InvalidProtocol( - "Unknown BEACON_BLOCK_BODIES encoding", - )), - }, - _ => Err(RPCError::InvalidProtocol( - "Unknown BEACON_BLOCK_BODIES version.", - )), - }, - "beacon_chain_state" => match protocol.version.as_str() { - "1.0.0" => match protocol.encoding.as_str() { - "ssz" => Ok(RPCResponse::BeaconChainState( - BeaconChainStateResponse::from_ssz_bytes(&packet)?, - )), - _ => Err(RPCError::InvalidProtocol( - "Unknown BEACON_CHAIN_STATE encoding", - )), - }, - _ => Err(RPCError::InvalidProtocol( - "Unknown BEACON_CHAIN_STATE version.", - )), - }, - }, - } - } - - /// Encodes the Response object based on the negotiated protocol. - pub fn encode(&self, protocol: ProtocolId) -> Result, RPCError> { - // Match on the encoding and in the future, the version - match protocol.encoding.as_str() { - "ssz" => Ok(self.ssz_encode()), - _ => { - return Err(RPCError::Custom(format!( - "Unknown Encoding: {}", - protocol.encoding - ))) - } - } - } - - fn ssz_encode(&self) -> Vec { - match self { - RPCResponse::Hello(res) => res.as_ssz_bytes(), - RPCResponse::Goodbye => unreachable!(), - RPCResponse::BeaconBlockRoots(res) => res.as_ssz_bytes(), - RPCResponse::BeaconBlockHeaders(res) => res.headers, // already raw bytes - RPCResponse::BeaconBlockBodies(res) => res.block_bodies, // already raw bytes - RPCResponse::BeaconChainState(res) => res.as_ssz_bytes(), - } - } -} +// todo: SSZ-Encode +impl RPCResponse {} /* Outbound upgrades */ diff --git a/codec.rs b/codec.rs new file mode 100644 index 000000000..a4e81b5f0 --- /dev/null +++ b/codec.rs @@ -0,0 +1,461 @@ + +///! This handles the various supported encoding mechanism for the Eth 2.0 RPC. +///! +///! Currently supported encodings are: +///! - ssz - Varint length-prefixed SSZ-encoding. + + +pub trait InnerCodec: Encoder + Decoder { + type Error; + + pub fn decode_error(&mut self, &mut BytesMut) -> Result, ::Error>; +} + +pub struct BaseInboundCodec { + /// Inner codec for handling various encodings + inner: TCodec, +} + +impl Encoder for BaseInboundCodec +where + TCodec: Encoder, + ::Item = RPCResponse, +{ + type Item = RPCResponse; + type Error = ::Error; + + fn encode( + &mut self, + item: Self::Item, + dst: &mut BytesMut + ) -> Result<(), Self::Error> { + dst.clear(); + dst.reserve(1); + dst.put_u8(item.as_u8); + return self.inner.encode(); + } +} + + +impl Decoder for BaseInboundCodec +where + TCodec: Decoder, + ::Item: RPCrequest, + ::Error: From, +{ + + type Item = RPCRequest; + type Error = ::Error; + + fn decode( + &mut self, + src: &mut BytesMut + ) -> Result, Self::Error> { + self.inner.decode(src) + } +} + +pub struct BaseOutboundCodec +where + TCodec: InnerCodec, + ::Item = RPCResponse, + ::ErrorItem = ErrorMessage, + { + /// Inner codec for handling various encodings + inner: TCodec, + /// Optimisation for decoding. True if the response code has been read and we are awaiting a + /// response. + response_code: Option, +} + +impl Encoder for BaseOutboundCodec +where + TCodec: Encoder +{ + type Item = RPCRequest; + type Error = ::Error; + + fn encode( + &mut self, + item: Self::Item, + dst: &mut BytesMut + ) -> Result<(), Self::Error> { + self.inner.encode(item, dst) + } +} + + +impl Decoder for BaseOutboundCodec +where + TCodec: InnerCodec, + ::Error: From, +{ + + type Item = RPCResponse; + type Error = ::Error; + + fn decode( + &mut self, + src: &mut BytesMut + ) -> Result, Self::Error> { + + + let response_code = { + if let Some(resp_code) = self.response_code { + resp_code; + } + else { + if src.is_empty() { + return Err(io::Error::new(io::ErrorKind::InvalidData, "no bytes received")); + } + let resp_byte = src.split_to(1); + let resp_code_byte = [0; 1]; + resp_code_byte.copy_from_slice(&resp_byte); + + let resp_code = u8::from_be_bytes(resp_code_byte); + + if let Some(response) = RPCErrorResponse::internal_data(resp_code) { + self.response_code = None; + return response; + } + resp_code + } + }; + + if RPCErrorResponse::is_response(response_code) { + // decode an actual response + return self.inner.decode(src).map(|r| r.map(|resp| RPCErrorResponse::Success(resp))); + } + else { + // decode an error + return self.inner.decode_error(src).map(|r| r.map(|resp| RPCErrorResponse::from_error(response_code, resp))); + } + + } + +} + + + +/// SSZ Input stream +pub struct SSZInboundSink { + inner: + protocol: ProtocolId + +impl for SSZInputStream +where + TSocket: AsyncRead + AsyncWrite +{ + + /// Set up the initial input stream object. + pub fn new(incomming: TSocket, protocol: ProtocolId, max_size: usize) -> Self { + + // this type of stream should only apply to ssz protocols + debug_assert!(protocol.encoding.as_str() == "ssz"); + + let mut uvi_codec = UviBytes::default(); + uvi_codec.set_max_len(max_size); + + let inner = Framed::new(incomming, uvi_codec).from_err() + .with(|response| { + self.encode(response) + }) + .and_then(|bytes| { + self.decode(request) + }).into_future(); + + //TODO: add timeout + + SSZInputStream { + inner, + protocol + } + } + + /// Decodes an SSZ-encoded RPCRequest. + fn decode(&self, request: RPCRequest) { + + match self.protocol.message_name.as_str() { + "hello" => match protocol.version.as_str() { + "1.0.0" => Ok(RPCRequest::Hello(HelloMessage::from_ssz_bytes(&packet)?)), + _ => Err(RPCError::InvalidProtocol("Unknown HELLO version")), + }, + "goodbye" => match protocol.version.as_str() { + "1.0.0" => Ok(RPCRequest::Goodbye(Goodbye::from_ssz_bytes(&packet)?)), + _ => Err(RPCError::InvalidProtocol( + "Unknown GOODBYE version.as_str()", + )), + }, + "beacon_block_roots" => match protocol.version.as_str() { + "1.0.0" => Ok(RPCRequest::BeaconBlockRoots( + BeaconBlockRootsRequest::from_ssz_bytes(&packet)?, + )), + _ => Err(RPCError::InvalidProtocol( + "Unknown BEACON_BLOCK_ROOTS version.", + )), + }, + "beacon_block_headers" => match protocol.version.as_str() { + "1.0.0" => Ok(RPCRequest::BeaconBlockHeaders( + BeaconBlockHeadersRequest::from_ssz_bytes(&packet)?, + )), + _ => Err(RPCError::InvalidProtocol( + "Unknown BEACON_BLOCK_HEADERS version.", + )), + }, + "beacon_block_bodies" => match protocol.version.as_str() { + "1.0.0" => Ok(RPCRequest::BeaconBlockBodies( + BeaconBlockBodiesRequest::from_ssz_bytes(&packet)?, + )), + _ => Err(RPCError::InvalidProtocol( + "Unknown BEACON_BLOCK_BODIES version.", + )), + }, + "beacon_chain_state" => match protocol.version.as_str() { + "1.0.0" => Ok(RPCRequest::BeaconChainState( + BeaconChainStateRequest::from_ssz_bytes(&packet)?, + )), + _ => Err(RPCError::InvalidProtocol( + "Unknown BEACON_CHAIN_STATE version.", + )), + }, + } + } + + fn encode(&self, response: RPCResponse) { + + // TODO: Add error code + + match response { + RPCResponse::Hello(res) => res.as_ssz_bytes(), + RPCResponse::Goodbye => unreachable!(), + RPCResponse::BeaconBlockRoots(res) => res.as_ssz_bytes(), + RPCResponse::BeaconBlockHeaders(res) => res.headers, // already raw bytes + RPCResponse::BeaconBlockBodies(res) => res.block_bodies, // already raw bytes + RPCResponse::BeaconChainState(res) => res.as_ssz_bytes(), + } + } + +} + +type SSZInboundOutput = stream::AndThen>>, RPCError>, + RPCResponse, + fn(RPCResponse) -> Result, RPCError>, + Result, RPCError>, + >, + fn(BytesMut) -> Result, + Result + >; + +impl Sink for SSZInputStreamSink { + + type SinkItem = RPCResponse; + type SinkError = RPCError; + + fn start_send( + &mut self, + item: Self::SinkItem +) -> Result, Self::SinkError> { + self.inner.start_send(item) + } + + fn poll_complete(&mut self) -> Result, Self::SinkError> { + self.inner.poll_complete() + } +} + +/* Outbound specific stream */ + +// Implement our own decoder to handle the response byte + +struct SSZOutboundCodec + + + +pub struct SSZOutboundStreamSink { + inner: + protocol: ProtocolId + +impl for SSZOutboundStreamSink +where + TSocket: AsyncRead + AsyncWrite +{ + + /// Set up the initial outbound stream object. + pub fn new(socket: TSocket, protocol: ProtocolId, max_size: usize) -> Self { + + // this type of stream should only apply to ssz protocols + debug_assert!(protocol.encoding.as_str() == "ssz"); + + let mut uvi_codec = UviBytes::default(); + uvi_codec.set_max_len(max_size); + + let inner = Framed::new(socket, uvi_codec).from_err() + .with(|request| { + self.encode(request) + }) + .and_then(|bytes| { + self.decode(response) + }); + + SSZOutboundStream { + inner, + protocol + } + } + + + + + + /// Decodes a response that was received on the same stream as a request. The response type should + /// therefore match the request protocol type. + pub fn decode(&self, response: Vec, + protocol: ProtocolId, + response_code: ResponseCode, + ) -> Result { + match response_code { + ResponseCode::EncodingError => Ok(RPCResponse::Error("Encoding error".into())), + ResponseCode::InvalidRequest => { + let response = match protocol.encoding.as_str() { + "ssz" => ErrorResponse::from_ssz_bytes(&packet)?, + _ => return Err(RPCError::InvalidProtocol("Unknown Encoding")), + }; + Ok(RPCResponse::Error(format!( + "Invalid Request: {}", + response.error_message + ))) + } + ResponseCode::ServerError => { + let response = match protocol.encoding.as_str() { + "ssz" => ErrorResponse::from_ssz_bytes(&packet)?, + _ => return Err(RPCError::InvalidProtocol("Unknown Encoding")), + }; + Ok(RPCResponse::Error(format!( + "Remote Server Error: {}", + response.error_message + ))) + } + ResponseCode::Success => match protocol.message_name.as_str() { + "hello" => match protocol.version.as_str() { + "1.0.0" => match protocol.encoding.as_str() { + "ssz" => Ok(RPCResponse::Hello(HelloMessage::from_ssz_bytes(&packet)?)), + _ => Err(RPCError::InvalidProtocol("Unknown HELLO encoding")), + }, + _ => Err(RPCError::InvalidProtocol("Unknown HELLO version.")), + }, + "goodbye" => Err(RPCError::Custom( + "GOODBYE should not have a response".into(), + )), + "beacon_block_roots" => match protocol.version.as_str() { + "1.0.0" => match protocol.encoding.as_str() { + "ssz" => Ok(RPCResponse::BeaconBlockRoots( + BeaconBlockRootsResponse::from_ssz_bytes(&packet)?, + )), + _ => Err(RPCError::InvalidProtocol( + "Unknown BEACON_BLOCK_ROOTS encoding", + )), + }, + _ => Err(RPCError::InvalidProtocol( + "Unknown BEACON_BLOCK_ROOTS version.", + )), + }, + "beacon_block_headers" => match protocol.version.as_str() { + "1.0.0" => match protocol.encoding.as_str() { + "ssz" => Ok(RPCResponse::BeaconBlockHeaders( + BeaconBlockHeadersResponse { headers: packet }, + )), + _ => Err(RPCError::InvalidProtocol( + "Unknown BEACON_BLOCK_HEADERS encoding", + )), + }, + _ => Err(RPCError::InvalidProtocol( + "Unknown BEACON_BLOCK_HEADERS version.", + )), + }, + "beacon_block_bodies" => match protocol.version.as_str() { + "1.0.0" => match protocol.encoding.as_str() { + "ssz" => Ok(RPCResponse::BeaconBlockBodies(BeaconBlockBodiesResponse { + block_bodies: packet, + })), + _ => Err(RPCError::InvalidProtocol( + "Unknown BEACON_BLOCK_BODIES encoding", + )), + }, + _ => Err(RPCError::InvalidProtocol( + "Unknown BEACON_BLOCK_BODIES version.", + )), + }, + "beacon_chain_state" => match protocol.version.as_str() { + "1.0.0" => match protocol.encoding.as_str() { + "ssz" => Ok(RPCResponse::BeaconChainState( + BeaconChainStateResponse::from_ssz_bytes(&packet)?, + )), + _ => Err(RPCError::InvalidProtocol( + "Unknown BEACON_CHAIN_STATE encoding", + )), + }, + _ => Err(RPCError::InvalidProtocol( + "Unknown BEACON_CHAIN_STATE version.", + )), + }, + }, + } + } + + fn encode(&self, response: RPCResponse) { + + match response { + RPCResponse::Hello(res) => res.as_ssz_bytes(), + RPCResponse::Goodbye => unreachable!(), + RPCResponse::BeaconBlockRoots(res) => res.as_ssz_bytes(), + RPCResponse::BeaconBlockHeaders(res) => res.headers, // already raw bytes + RPCResponse::BeaconBlockBodies(res) => res.block_bodies, // already raw bytes + RPCResponse::BeaconChainState(res) => res.as_ssz_bytes(), + } + } + +} + +type SSZOutboundStream = stream::AndThen>>, RPCError>, + RPCResponse, + fn(RPCResponse) -> Result, RPCError>, + Result, RPCError>, + >, + fn(BytesMut) -> Result, + Result + >; + + +impl Stream for SSZInputStreamSink { + + type Item = SSZInboundOutput; + type Error = RPCError; + + fn poll(&mut self) -> Result>, Self::Error> { + self.inner.poll() + } +} + +impl Sink for SSZInputStreamSink { + + type SinkItem = RPCResponse; + type SinkError = RPCError; + + fn start_send( + &mut self, + item: Self::SinkItem +) -> Result, Self::SinkError> { + self.inner.start_send(item) + } + + fn poll_complete(&mut self) -> Result, Self::SinkError> { + self.inner.poll_complete() + } +} + + + + + + +