From 704263e35f500cd08655150951fd017b7c8a9f8e Mon Sep 17 00:00:00 2001 From: Age Manning Date: Mon, 15 Jul 2019 18:41:05 +1000 Subject: [PATCH] Clean up Protocol types --- beacon_node/eth2-libp2p/src/rpc/codec/mod.rs | 60 +++++++++ beacon_node/eth2-libp2p/src/rpc/protocol.rs | 122 ++++++------------- 2 files changed, 99 insertions(+), 83 deletions(-) diff --git a/beacon_node/eth2-libp2p/src/rpc/codec/mod.rs b/beacon_node/eth2-libp2p/src/rpc/codec/mod.rs index 68dcd4650..d0d1b650b 100644 --- a/beacon_node/eth2-libp2p/src/rpc/codec/mod.rs +++ b/beacon_node/eth2-libp2p/src/rpc/codec/mod.rs @@ -1,2 +1,62 @@ pub(crate) mod base; pub(crate) mod ssz; + +use self::base::{BaseInboundCodec, BaseOutboundCodec}; +use self::ssz::{SSZInboundCodec, SSZOutboundCodec}; +use crate::rpc::protocol::RPCError; +use crate::rpc::{RPCErrorResponse, RPCRequest}; +use bytes::BytesMut; +use tokio::codec::{Decoder, Encoder}; + +// Known types of codecs +pub enum InboundCodec { + SSZ(BaseInboundCodec), +} + +pub enum OutboundCodec { + SSZ(BaseOutboundCodec), +} + +impl Encoder for InboundCodec { + type Item = RPCErrorResponse; + type Error = RPCError; + + fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> { + match self { + InboundCodec::SSZ(codec) => codec.encode(item, dst), + } + } +} + +impl Decoder for InboundCodec { + type Item = RPCRequest; + type Error = RPCError; + + fn decode(&mut self, src: &mut BytesMut) -> Result, Self::Error> { + match self { + InboundCodec::SSZ(codec) => codec.decode(src), + } + } +} + +impl Encoder for OutboundCodec { + type Item = RPCRequest; + type Error = RPCError; + + fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> { + match self { + OutboundCodec::SSZ(codec) => codec.encode(item, dst), + } + } +} + +impl Decoder for OutboundCodec { + type Item = RPCErrorResponse; + type Error = RPCError; + + fn decode(&mut self, src: &mut BytesMut) -> Result, Self::Error> { + match self { + OutboundCodec::SSZ(codec) => codec.decode(src), + } + } +} diff --git a/beacon_node/eth2-libp2p/src/rpc/protocol.rs b/beacon_node/eth2-libp2p/src/rpc/protocol.rs index af94e476d..7d24105f5 100644 --- a/beacon_node/eth2-libp2p/src/rpc/protocol.rs +++ b/beacon_node/eth2-libp2p/src/rpc/protocol.rs @@ -2,8 +2,12 @@ use super::methods::*; use crate::rpc::codec::{ base::{BaseInboundCodec, BaseOutboundCodec}, ssz::{SSZInboundCodec, SSZOutboundCodec}, + InboundCodec, OutboundCodec, +}; +use futures::{ + future::{self, FutureResult}, + sink, stream, Sink, Stream, }; -use futures::future::Future; use libp2p::core::{upgrade, InboundUpgrade, OutboundUpgrade, UpgradeInfo}; use ssz::Encode; use ssz_derive::{Decode, Encode}; @@ -13,6 +17,7 @@ use tokio::codec::Framed; use tokio::io::{AsyncRead, AsyncWrite}; use tokio::prelude::future::MapErr; use tokio::prelude::*; +use tokio::timer::timeout; use tokio::util::FutureExt; /// The maximum bytes that can be sent across the RPC. @@ -101,16 +106,10 @@ impl Into for ProtocolId { // The inbound protocol reads the request, decodes it and returns the stream to the protocol // handler to respond to once ready. -enum InboundCodec { - SSZ(BaseInboundCodec), -} - -type FnDecodeRPCEvent = - fn( - upgrade::Negotiated, - Vec, - &'static [u8], // protocol id - ) -> Result<(upgrade::Negotiated, RPCRequest, ProtocolId), RPCError>; +type InboundFramed = Framed, InboundCodec>; +type FnAndThen = + fn((Option, InboundFramed)) -> FutureResult; +type FnMapErr = fn(timeout::Error<(RPCError, InboundFramed)>) -> RPCError; impl InboundUpgrade for RPCProtocol where @@ -119,60 +118,40 @@ where type Output = RPCRequest; type Error = RPCError; - type Future = Box>; - /* - type Future = MapErr< - tokio_timer::Timeout< - upgrade::ReadRespond< - upgrade::Negotiated, - Self::Info, - FnDecodeRPCEvent, - >, + type Future = future::AndThen< + future::MapErr< + timeout::Timeout>>, + FnMapErr, >, - fn(tokio::timer::timeout::Error) -> RPCError, + FutureResult, + FnAndThen, >; - */ fn upgrade_inbound( self, socket: upgrade::Negotiated, protocol: &'static [u8], ) -> Self::Future { - let protocol_id = match ProtocolId::from_bytes(protocol) { - Ok(v) => v, - Err(e) => return Box::new(futures::future::err(e)), - }; + // TODO: Verify this + let protocol_id = + ProtocolId::from_bytes(protocol).expect("Can decode all supported protocols"); match protocol_id.encoding.as_str() { "ssz" | _ => { - let codec = BaseInboundCodec::new(SSZInboundCodec::new(protocol_id, 4096)); - Box::new( - Framed::new(socket, codec) - .into_future() - .timeout(Duration::from_secs(RESPONSE_TIMEOUT)) - .map_err(RPCError::from) - .and_then(|(madouby, _)| match madouby { + let ssz_codec = BaseInboundCodec::new(SSZInboundCodec::new(protocol_id, 4096)); + let codec = InboundCodec::SSZ(ssz_codec); + Framed::new(socket, codec) + .into_future() + .timeout(Duration::from_secs(RESPONSE_TIMEOUT)) + .map_err(RPCError::from as FnMapErr) + .and_then({ + |(madouby, _)| match madouby { Some(x) => futures::future::ok(x), None => futures::future::err(RPCError::Custom("Go home".into())), - }), - ) + } + } as FnAndThen) } } - - /* - upgrade::read_respond(socket, MAX_RPC_SIZE, protocol, { - |socket, packet, protocol| { - let protocol_id = ProtocolId::from_bytes(protocol)?; - Ok(( - socket, - RPCRequest::decode(packet, protocol_id)?, - protocol_id, - )) - } - } - as FnDecodeRPCEvent) - } - */ } } @@ -253,53 +232,30 @@ impl RPCRequest { /* Outbound upgrades */ +type OutboundFramed = Framed, OutboundCodec>; + impl OutboundUpgrade for RPCRequest where TSocket: AsyncRead + AsyncWrite, { - type Output = RPCResponse; + type Output = OutboundFramed; type Error = RPCError; - type Future = Box>; + type Future = sink::Send>; fn upgrade_outbound( self, socket: upgrade::Negotiated, protocol: Self::Info, ) -> Self::Future { - panic!() + let protocol_id = + ProtocolId::from_bytes(&protocol).expect("Can decode all supported protocols"); - /* - let protocol_id = match ProtocolId::from_bytes(&protocol) { - Ok(v) => v, - Err(e) => return futures::future::err(e), - }; - - // select which codec to use - let inbound_stream = match protocol_id.encoding.as_str() { - "ssz" => { - let codec = BaseInboundCodec::new(SSZCodec::new()); + match protocol_id.encoding.as_str() { + "ssz" | _ => { + let ssz_codec = BaseOutboundCodec::new(SSZOutboundCodec::new(protocol_id, 4096)); + let codec = OutboundCodec::SSZ(ssz_codec); Framed::new(socket, codec).send(self) } - _ => futures::future::err(RPCError::InvalidProtocol("Unsupported encoding")), - }; - - // do not wait for a timeout if we send a GOODBYE request - match protocol_id.message_name.as_str() { - // goodbye messages do not have a response - "goodbye" => inbound_stream.and_then(|| { - RPCErrorResponse::Unknown(ErrorMessage { - error_message: String::from("goodbye response").as_bytes(), - }) - }), - // get a response for all other requests - _ => inbound_stream.and_then(|stream| { - stream - .into_future() - .timeout(Duration::from_secs(RESPONSE_TIMEOUT)) - .map(|resp, _| resp) - .map_err(RPCError::from) - }), } - */ } }