Clean RPC names versions and encodings (#1024)

* cleanup RPC protocol names

* add rpc encodings

* Add RPC Versions as an enum
This commit is contained in:
divma 2020-04-20 08:15:08 -05:00 committed by GitHub
parent 11209ae966
commit fa7147f7c5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 183 additions and 169 deletions

View File

@ -1,10 +1,7 @@
use crate::rpc::methods::*; use crate::rpc::methods::*;
use crate::rpc::{ use crate::rpc::{
codec::base::OutboundCodec, codec::base::OutboundCodec,
protocol::{ protocol::{Encoding, Protocol, ProtocolId, RPCError, Version},
ProtocolId, RPCError, RPC_BLOCKS_BY_RANGE, RPC_BLOCKS_BY_ROOT, RPC_GOODBYE, RPC_META_DATA,
RPC_PING, RPC_STATUS,
},
}; };
use crate::rpc::{ErrorMessage, RPCErrorResponse, RPCRequest, RPCResponse}; use crate::rpc::{ErrorMessage, RPCErrorResponse, RPCRequest, RPCResponse};
use libp2p::bytes::{BufMut, Bytes, BytesMut}; use libp2p::bytes::{BufMut, Bytes, BytesMut};
@ -28,7 +25,7 @@ impl<T: EthSpec> SSZInboundCodec<T> {
uvi_codec.set_max_len(max_packet_size); uvi_codec.set_max_len(max_packet_size);
// this encoding only applies to ssz. // this encoding only applies to ssz.
debug_assert!(protocol.encoding.as_str() == "ssz"); debug_assert_eq!(protocol.encoding, Encoding::SSZ);
SSZInboundCodec { SSZInboundCodec {
inner: uvi_codec, inner: uvi_codec,
@ -81,39 +78,34 @@ impl<TSpec: EthSpec> Decoder for SSZInboundCodec<TSpec> {
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> { fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
match self.inner.decode(src).map_err(RPCError::from) { match self.inner.decode(src).map_err(RPCError::from) {
Ok(Some(packet)) => match self.protocol.message_name.as_str() { Ok(Some(packet)) => match self.protocol.message_name {
RPC_STATUS => match self.protocol.version.as_str() { Protocol::Status => match self.protocol.version {
"1" => Ok(Some(RPCRequest::Status(StatusMessage::from_ssz_bytes( Version::V1 => Ok(Some(RPCRequest::Status(StatusMessage::from_ssz_bytes(
&packet, &packet,
)?))), )?))),
_ => unreachable!("Cannot negotiate an unknown version"),
}, },
RPC_GOODBYE => match self.protocol.version.as_str() { Protocol::Goodbye => match self.protocol.version {
"1" => Ok(Some(RPCRequest::Goodbye(GoodbyeReason::from_ssz_bytes( Version::V1 => Ok(Some(RPCRequest::Goodbye(GoodbyeReason::from_ssz_bytes(
&packet, &packet,
)?))), )?))),
_ => unreachable!("Cannot negotiate an unknown version"),
}, },
RPC_BLOCKS_BY_RANGE => match self.protocol.version.as_str() { Protocol::BlocksByRange => match self.protocol.version {
"1" => Ok(Some(RPCRequest::BlocksByRange( Version::V1 => Ok(Some(RPCRequest::BlocksByRange(
BlocksByRangeRequest::from_ssz_bytes(&packet)?, BlocksByRangeRequest::from_ssz_bytes(&packet)?,
))), ))),
_ => unreachable!("Cannot negotiate an unknown version"),
}, },
RPC_BLOCKS_BY_ROOT => match self.protocol.version.as_str() { Protocol::BlocksByRoot => match self.protocol.version {
"1" => Ok(Some(RPCRequest::BlocksByRoot(BlocksByRootRequest { Version::V1 => Ok(Some(RPCRequest::BlocksByRoot(BlocksByRootRequest {
block_roots: Vec::from_ssz_bytes(&packet)?, block_roots: Vec::from_ssz_bytes(&packet)?,
}))), }))),
_ => unreachable!("Cannot negotiate an unknown version"),
}, },
RPC_PING => match self.protocol.version.as_str() { Protocol::Ping => match self.protocol.version {
"1" => Ok(Some(RPCRequest::Ping(Ping { Version::V1 => Ok(Some(RPCRequest::Ping(Ping {
data: u64::from_ssz_bytes(&packet)?, data: u64::from_ssz_bytes(&packet)?,
}))), }))),
_ => unreachable!("Cannot negotiate an unknown version"),
}, },
RPC_META_DATA => match self.protocol.version.as_str() { Protocol::MetaData => match self.protocol.version {
"1" => { Version::V1 => {
if packet.len() > 0 { if packet.len() > 0 {
Err(RPCError::Custom( Err(RPCError::Custom(
"Get metadata request should be empty".into(), "Get metadata request should be empty".into(),
@ -122,9 +114,7 @@ impl<TSpec: EthSpec> Decoder for SSZInboundCodec<TSpec> {
Ok(Some(RPCRequest::MetaData(PhantomData))) Ok(Some(RPCRequest::MetaData(PhantomData)))
} }
} }
_ => unreachable!("Cannot negotiate an unknown version"),
}, },
_ => unreachable!("Cannot negotiate an unknown protocol"),
}, },
Ok(None) => Ok(None), Ok(None) => Ok(None),
Err(e) => Err(e), Err(e) => Err(e),
@ -146,7 +136,7 @@ impl<TSpec: EthSpec> SSZOutboundCodec<TSpec> {
uvi_codec.set_max_len(max_packet_size); uvi_codec.set_max_len(max_packet_size);
// this encoding only applies to ssz. // this encoding only applies to ssz.
debug_assert!(protocol.encoding.as_str() == "ssz"); debug_assert_eq!(protocol.encoding, Encoding::SSZ);
SSZOutboundCodec { SSZOutboundCodec {
inner: uvi_codec, inner: uvi_codec,
@ -191,39 +181,35 @@ impl<TSpec: EthSpec> Decoder for SSZOutboundCodec<TSpec> {
// the object is empty. We return the empty object if this is the case // the object is empty. We return the empty object if this is the case
// clear the buffer and return an empty object // clear the buffer and return an empty object
src.clear(); src.clear();
match self.protocol.message_name.as_str() { match self.protocol.message_name {
RPC_STATUS => match self.protocol.version.as_str() { Protocol::Status => match self.protocol.version {
"1" => Err(RPCError::Custom( Version::V1 => Err(RPCError::Custom(
"Status stream terminated unexpectedly".into(), "Status stream terminated unexpectedly".into(),
)), // cannot have an empty HELLO message. The stream has terminated unexpectedly )), // cannot have an empty HELLO message. The stream has terminated unexpectedly
_ => unreachable!("Cannot negotiate an unknown version"),
}, },
RPC_GOODBYE => Err(RPCError::InvalidProtocol("GOODBYE doesn't have a response")), Protocol::Goodbye => {
RPC_BLOCKS_BY_RANGE => match self.protocol.version.as_str() { Err(RPCError::InvalidProtocol("GOODBYE doesn't have a response"))
"1" => Err(RPCError::Custom( }
Protocol::BlocksByRange => match self.protocol.version {
Version::V1 => Err(RPCError::Custom(
"Status stream terminated unexpectedly, empty block".into(), "Status stream terminated unexpectedly, empty block".into(),
)), // cannot have an empty block message. )), // cannot have an empty block message.
_ => unreachable!("Cannot negotiate an unknown version"),
}, },
RPC_BLOCKS_BY_ROOT => match self.protocol.version.as_str() { Protocol::BlocksByRoot => match self.protocol.version {
"1" => Err(RPCError::Custom( Version::V1 => Err(RPCError::Custom(
"Status stream terminated unexpectedly, empty block".into(), "Status stream terminated unexpectedly, empty block".into(),
)), // cannot have an empty block message. )), // cannot have an empty block message.
_ => unreachable!("Cannot negotiate an unknown version"),
}, },
RPC_PING => match self.protocol.version.as_str() { Protocol::Ping => match self.protocol.version {
"1" => Err(RPCError::Custom( Version::V1 => Err(RPCError::Custom(
"PING stream terminated unexpectedly".into(), "PING stream terminated unexpectedly".into(),
)), // cannot have an empty block message. )), // cannot have an empty block message.
_ => unreachable!("Cannot negotiate an unknown version"),
}, },
RPC_META_DATA => match self.protocol.version.as_str() { Protocol::MetaData => match self.protocol.version {
"1" => Err(RPCError::Custom( Version::V1 => Err(RPCError::Custom(
"Metadata stream terminated unexpectedly".into(), "Metadata stream terminated unexpectedly".into(),
)), // cannot have an empty block message. )), // cannot have an empty block message.
_ => unreachable!("Cannot negotiate an unknown version"),
}, },
_ => unreachable!("Cannot negotiate an unknown protocol"),
} }
} else { } else {
match self.inner.decode(src).map_err(RPCError::from) { match self.inner.decode(src).map_err(RPCError::from) {
@ -231,41 +217,35 @@ impl<TSpec: EthSpec> Decoder for SSZOutboundCodec<TSpec> {
// take the bytes from the buffer // take the bytes from the buffer
let raw_bytes = packet.take(); let raw_bytes = packet.take();
match self.protocol.message_name.as_str() { match self.protocol.message_name {
RPC_STATUS => match self.protocol.version.as_str() { Protocol::Status => match self.protocol.version {
"1" => Ok(Some(RPCResponse::Status(StatusMessage::from_ssz_bytes( Version::V1 => Ok(Some(RPCResponse::Status(
&raw_bytes, StatusMessage::from_ssz_bytes(&raw_bytes)?,
)?))), ))),
_ => unreachable!("Cannot negotiate an unknown version"),
}, },
RPC_GOODBYE => { Protocol::Goodbye => {
Err(RPCError::InvalidProtocol("GOODBYE doesn't have a response")) Err(RPCError::InvalidProtocol("GOODBYE doesn't have a response"))
} }
RPC_BLOCKS_BY_RANGE => match self.protocol.version.as_str() { Protocol::BlocksByRange => match self.protocol.version {
"1" => Ok(Some(RPCResponse::BlocksByRange(Box::new( Version::V1 => Ok(Some(RPCResponse::BlocksByRange(Box::new(
SignedBeaconBlock::from_ssz_bytes(&raw_bytes)?, SignedBeaconBlock::from_ssz_bytes(&raw_bytes)?,
)))), )))),
_ => unreachable!("Cannot negotiate an unknown version"),
}, },
RPC_BLOCKS_BY_ROOT => match self.protocol.version.as_str() { Protocol::BlocksByRoot => match self.protocol.version {
"1" => Ok(Some(RPCResponse::BlocksByRoot(Box::new( Version::V1 => Ok(Some(RPCResponse::BlocksByRoot(Box::new(
SignedBeaconBlock::from_ssz_bytes(&raw_bytes)?, SignedBeaconBlock::from_ssz_bytes(&raw_bytes)?,
)))), )))),
_ => unreachable!("Cannot negotiate an unknown version"),
}, },
RPC_PING => match self.protocol.version.as_str() { Protocol::Ping => match self.protocol.version {
"1" => Ok(Some(RPCResponse::Pong(Ping { Version::V1 => Ok(Some(RPCResponse::Pong(Ping {
data: u64::from_ssz_bytes(&raw_bytes)?, data: u64::from_ssz_bytes(&raw_bytes)?,
}))), }))),
_ => unreachable!("Cannot negotiate an unknown version"),
}, },
RPC_META_DATA => match self.protocol.version.as_str() { Protocol::MetaData => match self.protocol.version {
"1" => Ok(Some(RPCResponse::MetaData(MetaData::from_ssz_bytes( Version::V1 => Ok(Some(RPCResponse::MetaData(
&raw_bytes, MetaData::from_ssz_bytes(&raw_bytes)?,
)?))), ))),
_ => unreachable!("Cannot negotiate an unknown version"),
}, },
_ => unreachable!("Cannot negotiate an unknown protocol"),
} }
} }
Ok(None) => Ok(None), // waiting for more bytes Ok(None) => Ok(None), // waiting for more bytes

View File

@ -1,10 +1,7 @@
use crate::rpc::methods::*; use crate::rpc::methods::*;
use crate::rpc::{ use crate::rpc::{
codec::base::OutboundCodec, codec::base::OutboundCodec,
protocol::{ protocol::{Encoding, Protocol, ProtocolId, RPCError, Version},
ProtocolId, RPCError, RPC_BLOCKS_BY_RANGE, RPC_BLOCKS_BY_ROOT, RPC_GOODBYE, RPC_META_DATA,
RPC_PING, RPC_STATUS,
},
}; };
use crate::rpc::{ErrorMessage, RPCErrorResponse, RPCRequest, RPCResponse}; use crate::rpc::{ErrorMessage, RPCErrorResponse, RPCRequest, RPCResponse};
use libp2p::bytes::BytesMut; use libp2p::bytes::BytesMut;
@ -34,7 +31,7 @@ impl<T: EthSpec> SSZSnappyInboundCodec<T> {
pub fn new(protocol: ProtocolId, max_packet_size: usize) -> Self { pub fn new(protocol: ProtocolId, max_packet_size: usize) -> Self {
let uvi_codec = Uvi::default(); let uvi_codec = Uvi::default();
// this encoding only applies to ssz_snappy. // this encoding only applies to ssz_snappy.
debug_assert!(protocol.encoding.as_str() == "ssz_snappy"); debug_assert_eq!(protocol.encoding, Encoding::SSZSnappy);
SSZSnappyInboundCodec { SSZSnappyInboundCodec {
inner: uvi_codec, inner: uvi_codec,
@ -122,39 +119,34 @@ impl<TSpec: EthSpec> Decoder for SSZSnappyInboundCodec<TSpec> {
let n = reader.get_ref().position(); let n = reader.get_ref().position();
self.len = None; self.len = None;
src.split_to(n as usize); src.split_to(n as usize);
match self.protocol.message_name.as_str() { match self.protocol.message_name {
RPC_STATUS => match self.protocol.version.as_str() { Protocol::Status => match self.protocol.version {
"1" => Ok(Some(RPCRequest::Status(StatusMessage::from_ssz_bytes( Version::V1 => Ok(Some(RPCRequest::Status(StatusMessage::from_ssz_bytes(
&decoded_buffer, &decoded_buffer,
)?))), )?))),
_ => unreachable!("Cannot negotiate an unknown version"),
}, },
RPC_GOODBYE => match self.protocol.version.as_str() { Protocol::Goodbye => match self.protocol.version {
"1" => Ok(Some(RPCRequest::Goodbye(GoodbyeReason::from_ssz_bytes( Version::V1 => Ok(Some(RPCRequest::Goodbye(
&decoded_buffer, GoodbyeReason::from_ssz_bytes(&decoded_buffer)?,
)?))), ))),
_ => unreachable!("Cannot negotiate an unknown version"),
}, },
RPC_BLOCKS_BY_RANGE => match self.protocol.version.as_str() { Protocol::BlocksByRange => match self.protocol.version {
"1" => Ok(Some(RPCRequest::BlocksByRange( Version::V1 => Ok(Some(RPCRequest::BlocksByRange(
BlocksByRangeRequest::from_ssz_bytes(&decoded_buffer)?, BlocksByRangeRequest::from_ssz_bytes(&decoded_buffer)?,
))), ))),
_ => unreachable!("Cannot negotiate an unknown version"),
}, },
RPC_BLOCKS_BY_ROOT => match self.protocol.version.as_str() { Protocol::BlocksByRoot => match self.protocol.version {
"1" => Ok(Some(RPCRequest::BlocksByRoot(BlocksByRootRequest { Version::V1 => Ok(Some(RPCRequest::BlocksByRoot(BlocksByRootRequest {
block_roots: Vec::from_ssz_bytes(&decoded_buffer)?, block_roots: Vec::from_ssz_bytes(&decoded_buffer)?,
}))), }))),
_ => unreachable!("Cannot negotiate an unknown version"),
}, },
RPC_PING => match self.protocol.version.as_str() { Protocol::Ping => match self.protocol.version {
"1" => Ok(Some(RPCRequest::Ping(Ping::from_ssz_bytes( Version::V1 => Ok(Some(RPCRequest::Ping(Ping::from_ssz_bytes(
&decoded_buffer, &decoded_buffer,
)?))), )?))),
_ => unreachable!("Cannot negotiate an unknown version"),
}, },
RPC_META_DATA => match self.protocol.version.as_str() { Protocol::MetaData => match self.protocol.version {
"1" => { Version::V1 => {
if decoded_buffer.len() > 0 { if decoded_buffer.len() > 0 {
Err(RPCError::Custom( Err(RPCError::Custom(
"Get metadata request should be empty".into(), "Get metadata request should be empty".into(),
@ -163,9 +155,7 @@ impl<TSpec: EthSpec> Decoder for SSZSnappyInboundCodec<TSpec> {
Ok(Some(RPCRequest::MetaData(PhantomData))) Ok(Some(RPCRequest::MetaData(PhantomData)))
} }
} }
_ => unreachable!("Cannot negotiate an unknown version"),
}, },
_ => unreachable!("Cannot negotiate an unknown protocol"),
} }
} }
Err(e) => match e.kind() { Err(e) => match e.kind() {
@ -194,7 +184,7 @@ impl<TSpec: EthSpec> SSZSnappyOutboundCodec<TSpec> {
pub fn new(protocol: ProtocolId, max_packet_size: usize) -> Self { pub fn new(protocol: ProtocolId, max_packet_size: usize) -> Self {
let uvi_codec = Uvi::default(); let uvi_codec = Uvi::default();
// this encoding only applies to ssz_snappy. // this encoding only applies to ssz_snappy.
debug_assert!(protocol.encoding.as_str() == "ssz_snappy"); debug_assert_eq!(protocol.encoding, Encoding::SSZSnappy);
SSZSnappyOutboundCodec { SSZSnappyOutboundCodec {
inner: uvi_codec, inner: uvi_codec,
@ -279,41 +269,35 @@ impl<TSpec: EthSpec> Decoder for SSZSnappyOutboundCodec<TSpec> {
let n = reader.get_ref().position(); let n = reader.get_ref().position();
self.len = None; self.len = None;
src.split_to(n as usize); src.split_to(n as usize);
match self.protocol.message_name.as_str() { match self.protocol.message_name {
RPC_STATUS => match self.protocol.version.as_str() { Protocol::Status => match self.protocol.version {
"1" => Ok(Some(RPCResponse::Status(StatusMessage::from_ssz_bytes( Version::V1 => Ok(Some(RPCResponse::Status(
&decoded_buffer, StatusMessage::from_ssz_bytes(&decoded_buffer)?,
)?))), ))),
_ => unreachable!("Cannot negotiate an unknown version"),
}, },
RPC_GOODBYE => { Protocol::Goodbye => {
Err(RPCError::InvalidProtocol("GOODBYE doesn't have a response")) Err(RPCError::InvalidProtocol("GOODBYE doesn't have a response"))
} }
RPC_BLOCKS_BY_RANGE => match self.protocol.version.as_str() { Protocol::BlocksByRange => match self.protocol.version {
"1" => Ok(Some(RPCResponse::BlocksByRange(Box::new( Version::V1 => Ok(Some(RPCResponse::BlocksByRange(Box::new(
SignedBeaconBlock::from_ssz_bytes(&decoded_buffer)?, SignedBeaconBlock::from_ssz_bytes(&decoded_buffer)?,
)))), )))),
_ => unreachable!("Cannot negotiate an unknown version"),
}, },
RPC_BLOCKS_BY_ROOT => match self.protocol.version.as_str() { Protocol::BlocksByRoot => match self.protocol.version {
"1" => Ok(Some(RPCResponse::BlocksByRoot(Box::new( Version::V1 => Ok(Some(RPCResponse::BlocksByRoot(Box::new(
SignedBeaconBlock::from_ssz_bytes(&decoded_buffer)?, SignedBeaconBlock::from_ssz_bytes(&decoded_buffer)?,
)))), )))),
_ => unreachable!("Cannot negotiate an unknown version"),
}, },
RPC_PING => match self.protocol.version.as_str() { Protocol::Ping => match self.protocol.version {
"1" => Ok(Some(RPCResponse::Pong(Ping { Version::V1 => Ok(Some(RPCResponse::Pong(Ping {
data: u64::from_ssz_bytes(&decoded_buffer)?, data: u64::from_ssz_bytes(&decoded_buffer)?,
}))), }))),
_ => unreachable!("Cannot negotiate an unknown version"),
}, },
RPC_META_DATA => match self.protocol.version.as_str() { Protocol::MetaData => match self.protocol.version {
"1" => Ok(Some(RPCResponse::MetaData(MetaData::from_ssz_bytes( Version::V1 => Ok(Some(RPCResponse::MetaData(MetaData::from_ssz_bytes(
&decoded_buffer, &decoded_buffer,
)?))), )?))),
_ => unreachable!("Cannot negotiate an unknown version"),
}, },
_ => unreachable!("Cannot negotiate an unknown protocol"),
} }
} }
Err(e) => match e.kind() { Err(e) => match e.kind() {

View File

@ -34,18 +34,68 @@ const TTFB_TIMEOUT: u64 = 5;
const REQUEST_TIMEOUT: u64 = 15; const REQUEST_TIMEOUT: u64 = 15;
/// Protocol names to be used. /// Protocol names to be used.
#[derive(Debug, Clone)]
pub enum Protocol {
/// The Status protocol name. /// The Status protocol name.
pub const RPC_STATUS: &str = "status"; Status,
/// The Goodbye protocol name. /// The Goodbye protocol name.
pub const RPC_GOODBYE: &str = "goodbye"; Goodbye,
/// The `BlocksByRange` protocol name. /// The `BlocksByRange` protocol name.
pub const RPC_BLOCKS_BY_RANGE: &str = "beacon_blocks_by_range"; BlocksByRange,
/// The `BlocksByRoot` protocol name. /// The `BlocksByRoot` protocol name.
pub const RPC_BLOCKS_BY_ROOT: &str = "beacon_blocks_by_root"; BlocksByRoot,
/// The `Ping` protocol name. /// The `Ping` protocol name.
pub const RPC_PING: &str = "ping"; Ping,
/// The `MetaData` protocol name. /// The `MetaData` protocol name.
pub const RPC_META_DATA: &str = "metadata"; MetaData,
}
/// RPC Versions
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Version {
/// Version 1 of RPC
V1,
}
/// RPC Encondings supported.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Encoding {
SSZ,
SSZSnappy,
}
impl std::fmt::Display for Protocol {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let repr = match self {
Protocol::Status => "status",
Protocol::Goodbye => "goodbye",
Protocol::BlocksByRange => "beacon_blocks_by_range",
Protocol::BlocksByRoot => "beacon_blocks_by_root",
Protocol::Ping => "ping",
Protocol::MetaData => "metadata",
};
f.write_str(repr)
}
}
impl std::fmt::Display for Encoding {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let repr = match self {
Encoding::SSZ => "ssz",
Encoding::SSZSnappy => "ssz_snappy",
};
f.write_str(repr)
}
}
impl std::fmt::Display for Version {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let repr = match self {
Version::V1 => "1",
};
f.write_str(repr)
}
}
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct RPCProtocol<TSpec: EthSpec> { pub struct RPCProtocol<TSpec: EthSpec> {
@ -59,18 +109,18 @@ impl<TSpec: EthSpec> UpgradeInfo for RPCProtocol<TSpec> {
/// The list of supported RPC protocols for Lighthouse. /// The list of supported RPC protocols for Lighthouse.
fn protocol_info(&self) -> Self::InfoIter { fn protocol_info(&self) -> Self::InfoIter {
vec![ vec![
ProtocolId::new(RPC_STATUS, "1", "ssz_snappy"), ProtocolId::new(Protocol::Status, Version::V1, Encoding::SSZSnappy),
ProtocolId::new(RPC_STATUS, "1", "ssz"), ProtocolId::new(Protocol::Status, Version::V1, Encoding::SSZ),
ProtocolId::new(RPC_GOODBYE, "1", "ssz_snappy"), ProtocolId::new(Protocol::Goodbye, Version::V1, Encoding::SSZSnappy),
ProtocolId::new(RPC_GOODBYE, "1", "ssz"), ProtocolId::new(Protocol::Goodbye, Version::V1, Encoding::SSZ),
ProtocolId::new(RPC_BLOCKS_BY_RANGE, "1", "ssz_snappy"), ProtocolId::new(Protocol::BlocksByRange, Version::V1, Encoding::SSZSnappy),
ProtocolId::new(RPC_BLOCKS_BY_RANGE, "1", "ssz"), ProtocolId::new(Protocol::BlocksByRange, Version::V1, Encoding::SSZ),
ProtocolId::new(RPC_BLOCKS_BY_ROOT, "1", "ssz_snappy"), ProtocolId::new(Protocol::BlocksByRoot, Version::V1, Encoding::SSZSnappy),
ProtocolId::new(RPC_BLOCKS_BY_ROOT, "1", "ssz"), ProtocolId::new(Protocol::BlocksByRoot, Version::V1, Encoding::SSZ),
ProtocolId::new(RPC_PING, "1", "ssz_snappy"), ProtocolId::new(Protocol::Ping, Version::V1, Encoding::SSZSnappy),
ProtocolId::new(RPC_PING, "1", "ssz"), ProtocolId::new(Protocol::Ping, Version::V1, Encoding::SSZ),
ProtocolId::new(RPC_META_DATA, "1", "ssz_snappy"), ProtocolId::new(Protocol::MetaData, Version::V1, Encoding::SSZSnappy),
ProtocolId::new(RPC_META_DATA, "1", "ssz"), ProtocolId::new(Protocol::MetaData, Version::V1, Encoding::SSZ),
] ]
} }
} }
@ -79,13 +129,13 @@ impl<TSpec: EthSpec> UpgradeInfo for RPCProtocol<TSpec> {
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub struct ProtocolId { pub struct ProtocolId {
/// The rpc message type/name. /// The rpc message type/name.
pub message_name: String, pub message_name: Protocol,
/// The version of the RPC. /// The version of the RPC.
pub version: String, pub version: Version,
/// The encoding of the RPC. /// The encoding of the RPC.
pub encoding: String, pub encoding: Encoding,
/// The protocol id that is formed from the above fields. /// The protocol id that is formed from the above fields.
protocol_id: String, protocol_id: String,
@ -93,16 +143,16 @@ pub struct ProtocolId {
/// An RPC protocol ID. /// An RPC protocol ID.
impl ProtocolId { impl ProtocolId {
pub fn new(message_name: &str, version: &str, encoding: &str) -> Self { pub fn new(message_name: Protocol, version: Version, encoding: Encoding) -> Self {
let protocol_id = format!( let protocol_id = format!(
"{}/{}/{}/{}", "{}/{}/{}/{}",
PROTOCOL_PREFIX, message_name, version, encoding PROTOCOL_PREFIX, message_name, version, encoding
); );
ProtocolId { ProtocolId {
message_name: message_name.into(), message_name,
version: version.into(), version: version,
encoding: encoding.into(), encoding,
protocol_id, protocol_id,
} }
} }
@ -154,13 +204,13 @@ where
protocol: ProtocolId, protocol: ProtocolId,
) -> Self::Future { ) -> Self::Future {
let protocol_name = protocol.message_name.clone(); let protocol_name = protocol.message_name.clone();
let codec = match protocol.encoding.as_str() { let codec = match protocol.encoding {
"ssz_snappy" => { Encoding::SSZSnappy => {
let ssz_snappy_codec = let ssz_snappy_codec =
BaseInboundCodec::new(SSZSnappyInboundCodec::new(protocol, MAX_RPC_SIZE)); BaseInboundCodec::new(SSZSnappyInboundCodec::new(protocol, MAX_RPC_SIZE));
InboundCodec::SSZSnappy(ssz_snappy_codec) InboundCodec::SSZSnappy(ssz_snappy_codec)
} }
"ssz" | _ => { Encoding::SSZ => {
let ssz_codec = BaseInboundCodec::new(SSZInboundCodec::new(protocol, MAX_RPC_SIZE)); let ssz_codec = BaseInboundCodec::new(SSZInboundCodec::new(protocol, MAX_RPC_SIZE));
InboundCodec::SSZ(ssz_codec) InboundCodec::SSZ(ssz_codec)
} }
@ -171,13 +221,13 @@ where
let socket = Framed::new(timed_socket, codec); let socket = Framed::new(timed_socket, codec);
// MetaData requests should be empty, return the stream // MetaData requests should be empty, return the stream
if protocol_name == RPC_META_DATA { match protocol_name {
futures::future::Either::A(futures::future::ok(( Protocol::MetaData => futures::future::Either::A(futures::future::ok((
RPCRequest::MetaData(PhantomData), RPCRequest::MetaData(PhantomData),
socket, socket,
))) ))),
} else {
futures::future::Either::B( _ => futures::future::Either::B(
socket socket
.into_future() .into_future()
.timeout(Duration::from_secs(REQUEST_TIMEOUT)) .timeout(Duration::from_secs(REQUEST_TIMEOUT))
@ -190,7 +240,7 @@ where
)), )),
} }
} as FnAndThen<TSocket, TSpec>), } as FnAndThen<TSocket, TSpec>),
) ),
} }
} }
} }
@ -226,28 +276,28 @@ impl<TSpec: EthSpec> RPCRequest<TSpec> {
match self { match self {
// add more protocols when versions/encodings are supported // add more protocols when versions/encodings are supported
RPCRequest::Status(_) => vec![ RPCRequest::Status(_) => vec![
ProtocolId::new(RPC_STATUS, "1", "ssz_snappy"), ProtocolId::new(Protocol::Status, Version::V1, Encoding::SSZSnappy),
ProtocolId::new(RPC_STATUS, "1", "ssz"), ProtocolId::new(Protocol::Status, Version::V1, Encoding::SSZ),
], ],
RPCRequest::Goodbye(_) => vec![ RPCRequest::Goodbye(_) => vec![
ProtocolId::new(RPC_GOODBYE, "1", "ssz_snappy"), ProtocolId::new(Protocol::Goodbye, Version::V1, Encoding::SSZSnappy),
ProtocolId::new(RPC_GOODBYE, "1", "ssz"), ProtocolId::new(Protocol::Goodbye, Version::V1, Encoding::SSZ),
], ],
RPCRequest::BlocksByRange(_) => vec![ RPCRequest::BlocksByRange(_) => vec![
ProtocolId::new(RPC_BLOCKS_BY_RANGE, "1", "ssz_snappy"), ProtocolId::new(Protocol::BlocksByRange, Version::V1, Encoding::SSZSnappy),
ProtocolId::new(RPC_BLOCKS_BY_RANGE, "1", "ssz"), ProtocolId::new(Protocol::BlocksByRange, Version::V1, Encoding::SSZ),
], ],
RPCRequest::BlocksByRoot(_) => vec![ RPCRequest::BlocksByRoot(_) => vec![
ProtocolId::new(RPC_BLOCKS_BY_ROOT, "1", "ssz_snappy"), ProtocolId::new(Protocol::BlocksByRoot, Version::V1, Encoding::SSZSnappy),
ProtocolId::new(RPC_BLOCKS_BY_ROOT, "1", "ssz"), ProtocolId::new(Protocol::BlocksByRoot, Version::V1, Encoding::SSZ),
], ],
RPCRequest::Ping(_) => vec![ RPCRequest::Ping(_) => vec![
ProtocolId::new(RPC_PING, "1", "ssz_snappy"), ProtocolId::new(Protocol::Ping, Version::V1, Encoding::SSZSnappy),
ProtocolId::new(RPC_PING, "1", "ssz"), ProtocolId::new(Protocol::Ping, Version::V1, Encoding::SSZ),
], ],
RPCRequest::MetaData(_) => vec![ RPCRequest::MetaData(_) => vec![
ProtocolId::new(RPC_META_DATA, "1", "ssz_snappy"), ProtocolId::new(Protocol::MetaData, Version::V1, Encoding::SSZSnappy),
ProtocolId::new(RPC_META_DATA, "1", "ssz"), ProtocolId::new(Protocol::MetaData, Version::V1, Encoding::SSZ),
], ],
} }
} }
@ -316,13 +366,13 @@ where
socket: upgrade::Negotiated<TSocket>, socket: upgrade::Negotiated<TSocket>,
protocol: Self::Info, protocol: Self::Info,
) -> Self::Future { ) -> Self::Future {
let codec = match protocol.encoding.as_str() { let codec = match protocol.encoding {
"ssz_snappy" => { Encoding::SSZSnappy => {
let ssz_snappy_codec = let ssz_snappy_codec =
BaseOutboundCodec::new(SSZSnappyOutboundCodec::new(protocol, MAX_RPC_SIZE)); BaseOutboundCodec::new(SSZSnappyOutboundCodec::new(protocol, MAX_RPC_SIZE));
OutboundCodec::SSZSnappy(ssz_snappy_codec) OutboundCodec::SSZSnappy(ssz_snappy_codec)
} }
"ssz" | _ => { Encoding::SSZ => {
let ssz_codec = let ssz_codec =
BaseOutboundCodec::new(SSZOutboundCodec::new(protocol, MAX_RPC_SIZE)); BaseOutboundCodec::new(SSZOutboundCodec::new(protocol, MAX_RPC_SIZE));
OutboundCodec::SSZ(ssz_codec) OutboundCodec::SSZ(ssz_codec)