Clean up Protocol types

This commit is contained in:
Age Manning 2019-07-15 18:41:05 +10:00
parent 15c99b5f37
commit 704263e35f
No known key found for this signature in database
GPG Key ID: 05EED64B79E06A93
2 changed files with 99 additions and 83 deletions

View File

@ -1,2 +1,62 @@
pub(crate) mod base; pub(crate) mod base;
pub(crate) mod ssz; pub(crate) mod ssz;
use self::base::{BaseInboundCodec, BaseOutboundCodec};
use self::ssz::{SSZInboundCodec, SSZOutboundCodec};
use crate::rpc::protocol::RPCError;
use crate::rpc::{RPCErrorResponse, RPCRequest};
use bytes::BytesMut;
use tokio::codec::{Decoder, Encoder};
// Known types of codecs
pub enum InboundCodec {
SSZ(BaseInboundCodec<SSZInboundCodec>),
}
pub enum OutboundCodec {
SSZ(BaseOutboundCodec<SSZOutboundCodec>),
}
impl Encoder for InboundCodec {
type Item = RPCErrorResponse;
type Error = RPCError;
fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> {
match self {
InboundCodec::SSZ(codec) => codec.encode(item, dst),
}
}
}
impl Decoder for InboundCodec {
type Item = RPCRequest;
type Error = RPCError;
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
match self {
InboundCodec::SSZ(codec) => codec.decode(src),
}
}
}
impl Encoder for OutboundCodec {
type Item = RPCRequest;
type Error = RPCError;
fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> {
match self {
OutboundCodec::SSZ(codec) => codec.encode(item, dst),
}
}
}
impl Decoder for OutboundCodec {
type Item = RPCErrorResponse;
type Error = RPCError;
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
match self {
OutboundCodec::SSZ(codec) => codec.decode(src),
}
}
}

View File

@ -2,8 +2,12 @@ use super::methods::*;
use crate::rpc::codec::{ use crate::rpc::codec::{
base::{BaseInboundCodec, BaseOutboundCodec}, base::{BaseInboundCodec, BaseOutboundCodec},
ssz::{SSZInboundCodec, SSZOutboundCodec}, ssz::{SSZInboundCodec, SSZOutboundCodec},
InboundCodec, OutboundCodec,
};
use futures::{
future::{self, FutureResult},
sink, stream, Sink, Stream,
}; };
use futures::future::Future;
use libp2p::core::{upgrade, InboundUpgrade, OutboundUpgrade, UpgradeInfo}; use libp2p::core::{upgrade, InboundUpgrade, OutboundUpgrade, UpgradeInfo};
use ssz::Encode; use ssz::Encode;
use ssz_derive::{Decode, Encode}; use ssz_derive::{Decode, Encode};
@ -13,6 +17,7 @@ use tokio::codec::Framed;
use tokio::io::{AsyncRead, AsyncWrite}; use tokio::io::{AsyncRead, AsyncWrite};
use tokio::prelude::future::MapErr; use tokio::prelude::future::MapErr;
use tokio::prelude::*; use tokio::prelude::*;
use tokio::timer::timeout;
use tokio::util::FutureExt; use tokio::util::FutureExt;
/// The maximum bytes that can be sent across the RPC. /// The maximum bytes that can be sent across the RPC.
@ -101,16 +106,10 @@ impl Into<RawProtocolId> for ProtocolId {
// The inbound protocol reads the request, decodes it and returns the stream to the protocol // The inbound protocol reads the request, decodes it and returns the stream to the protocol
// handler to respond to once ready. // handler to respond to once ready.
enum InboundCodec { type InboundFramed<TSocket> = Framed<upgrade::Negotiated<TSocket>, InboundCodec>;
SSZ(BaseInboundCodec<SSZInboundCodec>), type FnAndThen<TSocket> =
} fn((Option<RPCRequest>, InboundFramed<TSocket>)) -> FutureResult<RPCRequest, RPCError>;
type FnMapErr<TSocket> = fn(timeout::Error<(RPCError, InboundFramed<TSocket>)>) -> RPCError;
type FnDecodeRPCEvent<TSocket> =
fn(
upgrade::Negotiated<TSocket>,
Vec<u8>,
&'static [u8], // protocol id
) -> Result<(upgrade::Negotiated<TSocket>, RPCRequest, ProtocolId), RPCError>;
impl<TSocket> InboundUpgrade<TSocket> for RPCProtocol impl<TSocket> InboundUpgrade<TSocket> for RPCProtocol
where where
@ -119,60 +118,40 @@ where
type Output = RPCRequest; type Output = RPCRequest;
type Error = RPCError; type Error = RPCError;
type Future = Box<dyn Future<Item = Self::Output, Error = Self::Error>>; type Future = future::AndThen<
/* future::MapErr<
type Future = MapErr< timeout::Timeout<stream::StreamFuture<InboundFramed<TSocket>>>,
tokio_timer::Timeout< FnMapErr<TSocket>,
upgrade::ReadRespond<
upgrade::Negotiated<TSocket>,
Self::Info,
FnDecodeRPCEvent<TSocket>,
>, >,
>, FutureResult<RPCRequest, RPCError>,
fn(tokio::timer::timeout::Error<RPCError>) -> RPCError, FnAndThen<TSocket>,
>; >;
*/
fn upgrade_inbound( fn upgrade_inbound(
self, self,
socket: upgrade::Negotiated<TSocket>, socket: upgrade::Negotiated<TSocket>,
protocol: &'static [u8], protocol: &'static [u8],
) -> Self::Future { ) -> Self::Future {
let protocol_id = match ProtocolId::from_bytes(protocol) { // TODO: Verify this
Ok(v) => v, let protocol_id =
Err(e) => return Box::new(futures::future::err(e)), ProtocolId::from_bytes(protocol).expect("Can decode all supported protocols");
};
match protocol_id.encoding.as_str() { match protocol_id.encoding.as_str() {
"ssz" | _ => { "ssz" | _ => {
let codec = BaseInboundCodec::new(SSZInboundCodec::new(protocol_id, 4096)); let ssz_codec = BaseInboundCodec::new(SSZInboundCodec::new(protocol_id, 4096));
Box::new( let codec = InboundCodec::SSZ(ssz_codec);
Framed::new(socket, codec) Framed::new(socket, codec)
.into_future() .into_future()
.timeout(Duration::from_secs(RESPONSE_TIMEOUT)) .timeout(Duration::from_secs(RESPONSE_TIMEOUT))
.map_err(RPCError::from) .map_err(RPCError::from as FnMapErr<TSocket>)
.and_then(|(madouby, _)| match madouby { .and_then({
|(madouby, _)| match madouby {
Some(x) => futures::future::ok(x), Some(x) => futures::future::ok(x),
None => futures::future::err(RPCError::Custom("Go home".into())), None => futures::future::err(RPCError::Custom("Go home".into())),
}), }
) } as FnAndThen<TSocket>)
} }
} }
/*
upgrade::read_respond(socket, MAX_RPC_SIZE, protocol, {
|socket, packet, protocol| {
let protocol_id = ProtocolId::from_bytes(protocol)?;
Ok((
socket,
RPCRequest::decode(packet, protocol_id)?,
protocol_id,
))
}
}
as FnDecodeRPCEvent<TSocket>)
}
*/
} }
} }
@ -253,53 +232,30 @@ impl RPCRequest {
/* Outbound upgrades */ /* Outbound upgrades */
type OutboundFramed<TSocket> = Framed<upgrade::Negotiated<TSocket>, OutboundCodec>;
impl<TSocket> OutboundUpgrade<TSocket> for RPCRequest impl<TSocket> OutboundUpgrade<TSocket> for RPCRequest
where where
TSocket: AsyncRead + AsyncWrite, TSocket: AsyncRead + AsyncWrite,
{ {
type Output = RPCResponse; type Output = OutboundFramed<TSocket>;
type Error = RPCError; type Error = RPCError;
type Future = Box<dyn Future<Item = Self::Output, Error = Self::Error>>; type Future = sink::Send<OutboundFramed<TSocket>>;
fn upgrade_outbound( fn upgrade_outbound(
self, self,
socket: upgrade::Negotiated<TSocket>, socket: upgrade::Negotiated<TSocket>,
protocol: Self::Info, protocol: Self::Info,
) -> Self::Future { ) -> Self::Future {
panic!() let protocol_id =
ProtocolId::from_bytes(&protocol).expect("Can decode all supported protocols");
/* match protocol_id.encoding.as_str() {
let protocol_id = match ProtocolId::from_bytes(&protocol) { "ssz" | _ => {
Ok(v) => v, let ssz_codec = BaseOutboundCodec::new(SSZOutboundCodec::new(protocol_id, 4096));
Err(e) => return futures::future::err(e), let codec = OutboundCodec::SSZ(ssz_codec);
};
// select which codec to use
let inbound_stream = match protocol_id.encoding.as_str() {
"ssz" => {
let codec = BaseInboundCodec::new(SSZCodec::new());
Framed::new(socket, codec).send(self) Framed::new(socket, codec).send(self)
} }
_ => futures::future::err(RPCError::InvalidProtocol("Unsupported encoding")),
};
// do not wait for a timeout if we send a GOODBYE request
match protocol_id.message_name.as_str() {
// goodbye messages do not have a response
"goodbye" => inbound_stream.and_then(|| {
RPCErrorResponse::Unknown(ErrorMessage {
error_message: String::from("goodbye response").as_bytes(),
})
}),
// get a response for all other requests
_ => inbound_stream.and_then(|stream| {
stream
.into_future()
.timeout(Duration::from_secs(RESPONSE_TIMEOUT))
.map(|resp, _| resp)
.map_err(RPCError::from)
}),
} }
*/
} }
} }