RPC Update. WIP

This commit is contained in:
Age Manning 2019-07-15 17:07:23 +10:00
parent 15cdd2afb9
commit 15c99b5f37
No known key found for this signature in database
GPG Key ID: 05EED64B79E06A93
9 changed files with 465 additions and 478 deletions

View File

@ -26,3 +26,4 @@ tokio-io = "0.1.12"
smallvec = "0.6.10" smallvec = "0.6.10"
fnv = "1.0.6" fnv = "1.0.6"
unsigned-varint = "0.2.2" unsigned-varint = "0.2.2"
bytes = "0.4.12"

View File

@ -1,53 +1,77 @@
///! This handles the various supported encoding mechanism for the Eth 2.0 RPC. //! This handles the various supported encoding mechanism for the Eth 2.0 RPC.
pub trait InnerCodec: Encoder + Decoder { use crate::rpc::{ErrorMessage, RPCErrorResponse, RPCRequest, RPCResponse};
type Error; use bytes::BufMut;
use bytes::BytesMut;
use tokio::codec::{Decoder, Encoder};
pub(crate) trait OutboundCodec: Encoder + Decoder {
type ErrorType;
fn decode_error( fn decode_error(
&mut self, &mut self,
&mut BytesMut, src: &mut BytesMut,
) -> Result<Option<Self::Error>, <Self as Decoder>::Error>; ) -> Result<Option<Self::ErrorType>, <Self as Decoder>::Error>;
} }
pub struct BaseInboundCodec<TCodec: InnerCodec> { pub(crate) struct BaseInboundCodec<TCodec>
/// Inner codec for handling various encodings
inner: TCodec,
}
pub struct BaseOutboundCodec<TCodec>
where where
TCodec: InnerCodec, TCodec: Encoder + Decoder,
<TCodec as Decoder>::Item = RPCResponse,
<TCodec as InnerCodec>::ErrorItem = ErrorMessage,
{ {
/// Inner codec for handling various encodings /// Inner codec for handling various encodings
inner: TCodec, inner: TCodec,
}
impl<TCodec> BaseInboundCodec<TCodec>
where
TCodec: Encoder + Decoder,
{
pub fn new(codec: TCodec) -> Self {
BaseInboundCodec { inner: codec }
}
}
pub(crate) struct BaseOutboundCodec<TOutboundCodec>
where
TOutboundCodec: OutboundCodec,
{
/// Inner codec for handling various encodings
inner: TOutboundCodec,
/// Optimisation for decoding. True if the response code has been read and we are awaiting a /// Optimisation for decoding. True if the response code has been read and we are awaiting a
/// response. /// response.
response_code: Option<u8>, response_code: Option<u8>,
} }
impl<TOutboundCodec> BaseOutboundCodec<TOutboundCodec>
where
TOutboundCodec: OutboundCodec,
{
pub fn new(codec: TOutboundCodec) -> Self {
BaseOutboundCodec {
inner: codec,
response_code: None,
}
}
}
impl<TCodec> Encoder for BaseInboundCodec<TCodec> impl<TCodec> Encoder for BaseInboundCodec<TCodec>
where where
TCodec: Encoder, TCodec: Decoder + Encoder<Item = RPCErrorResponse>,
<TCodec as Encoder>::Item = RPCResponse,
{ {
type Item = RPCResponse; type Item = RPCErrorResponse;
type Error = <TCodec as Encoder>::Error; type Error = <TCodec as Encoder>::Error;
fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> { fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> {
dst.clear(); dst.clear();
dst.reserve(1); dst.reserve(1);
dst.put_u8(item.as_u8); dst.put_u8(item.as_u8());
return self.inner.encode(); return self.inner.encode(item, dst);
} }
} }
impl<TCodec> Decoder for BaseInboundCodec<TCodec> impl<TCodec> Decoder for BaseInboundCodec<TCodec>
where where
TCodec: Decoder, TCodec: Encoder + Decoder<Item = RPCRequest>,
<TCodec as Decoder>::Item: RPCrequest,
<TCodec as Decoder>::Error: From<RPCError>,
{ {
type Item = RPCRequest; type Item = RPCRequest;
type Error = <TCodec as Decoder>::Error; type Error = <TCodec as Decoder>::Error;
@ -59,7 +83,7 @@ where
impl<TCodec> Encoder for BaseOutboundCodec<TCodec> impl<TCodec> Encoder for BaseOutboundCodec<TCodec>
where where
TCodec: Encoder, TCodec: OutboundCodec + Encoder<Item = RPCRequest>,
{ {
type Item = RPCRequest; type Item = RPCRequest;
type Error = <TCodec as Encoder>::Error; type Error = <TCodec as Encoder>::Error;
@ -71,23 +95,19 @@ where
impl<TCodec> Decoder for BaseOutboundCodec<TCodec> impl<TCodec> Decoder for BaseOutboundCodec<TCodec>
where where
TCodec: InnerCodec, TCodec: OutboundCodec<ErrorType = ErrorMessage> + Decoder<Item = RPCResponse>,
<TCodec as Decoder>::Error: From<RPCError>,
{ {
type Item = RPCResponse; type Item = RPCErrorResponse;
type Error = <TCodec as Decoder>::Error; type Error = <TCodec as Decoder>::Error;
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> { fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
let response_code = { let response_code = {
if let Some(resp_code) = self.response_code { if let Some(resp_code) = self.response_code {
resp_code; resp_code
} else { } else {
if src.is_empty() { // buffer should not be empty
return Err(io::Error::new( debug_assert!(!src.is_empty());
io::ErrorKind::InvalidData,
"no bytes received",
));
}
let resp_byte = src.split_to(1); let resp_byte = src.split_to(1);
let resp_code_byte = [0; 1]; let resp_code_byte = [0; 1];
resp_code_byte.copy_from_slice(&resp_byte); resp_code_byte.copy_from_slice(&resp_byte);
@ -96,8 +116,9 @@ where
if let Some(response) = RPCErrorResponse::internal_data(resp_code) { if let Some(response) = RPCErrorResponse::internal_data(resp_code) {
self.response_code = None; self.response_code = None;
return response; return Ok(Some(response));
} }
self.response_code = Some(resp_code);
resp_code resp_code
} }
}; };

View File

@ -0,0 +1,2 @@
pub(crate) mod base;
pub(crate) mod ssz;

View File

@ -0,0 +1,239 @@
use crate::rpc::methods::*;
use crate::rpc::{
codec::base::OutboundCodec,
protocol::{ProtocolId, RPCError},
};
use crate::rpc::{ErrorMessage, RPCErrorResponse, RPCRequest, RPCResponse};
use bytes::{Bytes, BytesMut};
use ssz::{Decode, Encode};
use tokio::codec::{Decoder, Encoder};
use unsigned_varint::codec::UviBytes;
/* Inbound Codec */
pub struct SSZInboundCodec {
inner: UviBytes,
protocol: ProtocolId,
}
impl SSZInboundCodec {
pub fn new(protocol: ProtocolId, max_packet_size: usize) -> Self {
let uvi_codec = UviBytes::default();
uvi_codec.set_max_len(max_packet_size);
// this encoding only applies to ssz.
debug_assert!(protocol.encoding.as_str() == "ssz");
SSZInboundCodec {
inner: uvi_codec,
protocol,
}
}
}
// Encoder for inbound
impl Encoder for SSZInboundCodec {
type Item = RPCErrorResponse;
type Error = RPCError;
fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> {
let bytes = match item {
RPCErrorResponse::Success(resp) => {
match resp {
RPCResponse::Hello(res) => res.as_ssz_bytes(),
RPCResponse::Goodbye => unreachable!(),
RPCResponse::BeaconBlockRoots(res) => res.as_ssz_bytes(),
RPCResponse::BeaconBlockHeaders(res) => res.headers, // already raw bytes
RPCResponse::BeaconBlockBodies(res) => res.block_bodies, // already raw bytes
RPCResponse::BeaconChainState(res) => res.as_ssz_bytes(),
}
}
RPCErrorResponse::EncodingError => vec![],
RPCErrorResponse::InvalidRequest(err) => err.as_ssz_bytes(),
RPCErrorResponse::ServerError(err) => err.as_ssz_bytes(),
RPCErrorResponse::Unknown(err) => err.as_ssz_bytes(),
};
if !bytes.is_empty() {
// length-prefix and return
return self
.inner
.encode(Bytes::from(bytes), dst)
.map_err(RPCError::from);
}
Ok(())
}
}
// Decoder for inbound
impl Decoder for SSZInboundCodec {
type Item = RPCRequest;
type Error = RPCError;
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
match self.inner.decode(src).map_err(RPCError::from) {
Ok(Some(packet)) => match self.protocol.message_name.as_str() {
"hello" => match self.protocol.version.as_str() {
"1.0.0" => Ok(Some(RPCRequest::Hello(HelloMessage::from_ssz_bytes(
&packet,
)?))),
_ => Err(RPCError::InvalidProtocol("Unknown HELLO version")),
},
"goodbye" => match self.protocol.version.as_str() {
"1.0.0" => Ok(Some(RPCRequest::Goodbye(Goodbye::from_ssz_bytes(&packet)?))),
_ => Err(RPCError::InvalidProtocol(
"Unknown GOODBYE version.as_str()",
)),
},
"beacon_block_roots" => match self.protocol.version.as_str() {
"1.0.0" => Ok(Some(RPCRequest::BeaconBlockRoots(
BeaconBlockRootsRequest::from_ssz_bytes(&packet)?,
))),
_ => Err(RPCError::InvalidProtocol(
"Unknown BEACON_BLOCK_ROOTS version.",
)),
},
"beacon_block_headers" => match self.protocol.version.as_str() {
"1.0.0" => Ok(Some(RPCRequest::BeaconBlockHeaders(
BeaconBlockHeadersRequest::from_ssz_bytes(&packet)?,
))),
_ => Err(RPCError::InvalidProtocol(
"Unknown BEACON_BLOCK_HEADERS version.",
)),
},
"beacon_block_bodies" => match self.protocol.version.as_str() {
"1.0.0" => Ok(Some(RPCRequest::BeaconBlockBodies(
BeaconBlockBodiesRequest::from_ssz_bytes(&packet)?,
))),
_ => Err(RPCError::InvalidProtocol(
"Unknown BEACON_BLOCK_BODIES version.",
)),
},
"beacon_chain_state" => match self.protocol.version.as_str() {
"1.0.0" => Ok(Some(RPCRequest::BeaconChainState(
BeaconChainStateRequest::from_ssz_bytes(&packet)?,
))),
_ => Err(RPCError::InvalidProtocol(
"Unknown BEACON_CHAIN_STATE version.",
)),
},
},
Ok(None) => Ok(None),
Err(e) => Err(e),
}
}
}
/* Outbound Codec */
pub struct SSZOutboundCodec {
inner: UviBytes,
protocol: ProtocolId,
}
impl SSZOutboundCodec {
pub fn new(protocol: ProtocolId, max_packet_size: usize) -> Self {
let uvi_codec = UviBytes::default();
uvi_codec.set_max_len(max_packet_size);
// this encoding only applies to ssz.
debug_assert!(protocol.encoding.as_str() == "ssz");
SSZOutboundCodec {
inner: uvi_codec,
protocol,
}
}
}
// Encoder for outbound
impl Encoder for SSZOutboundCodec {
type Item = RPCRequest;
type Error = RPCError;
fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> {
let bytes = match item {
RPCRequest::Hello(req) => req.as_ssz_bytes(),
RPCRequest::Goodbye(req) => req.as_ssz_bytes(),
RPCRequest::BeaconBlockRoots(req) => req.as_ssz_bytes(),
RPCRequest::BeaconBlockHeaders(req) => req.as_ssz_bytes(),
RPCRequest::BeaconBlockBodies(req) => req.as_ssz_bytes(),
RPCRequest::BeaconChainState(req) => req.as_ssz_bytes(),
};
// length-prefix
self.inner
.encode(bytes::Bytes::from(bytes), dst)
.map_err(RPCError::from)
}
}
// Decoder for outbound
impl Decoder for SSZOutboundCodec {
type Item = RPCResponse;
type Error = RPCError;
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
match self.inner.decode(src).map_err(RPCError::from) {
Ok(Some(packet)) => match self.protocol.message_name.as_str() {
"hello" => match self.protocol.version.as_str() {
"1.0.0" => Ok(Some(RPCResponse::Hello(HelloMessage::from_ssz_bytes(
&packet,
)?))),
_ => Err(RPCError::InvalidProtocol("Unknown HELLO version.")),
},
"goodbye" => Err(RPCError::InvalidProtocol("GOODBYE doesn't have a response")),
"beacon_block_roots" => match self.protocol.version.as_str() {
"1.0.0" => Ok(Some(RPCResponse::BeaconBlockRoots(
BeaconBlockRootsResponse::from_ssz_bytes(&packet)?,
))),
_ => Err(RPCError::InvalidProtocol(
"Unknown BEACON_BLOCK_ROOTS version.",
)),
},
"beacon_block_headers" => match self.protocol.version.as_str() {
"1.0.0" => Ok(Some(RPCResponse::BeaconBlockHeaders(
BeaconBlockHeadersResponse {
headers: packet.to_vec(),
},
))),
_ => Err(RPCError::InvalidProtocol(
"Unknown BEACON_BLOCK_HEADERS version.",
)),
},
"beacon_block_bodies" => match self.protocol.version.as_str() {
"1.0.0" => Ok(Some(RPCResponse::BeaconBlockBodies(
BeaconBlockBodiesResponse {
block_bodies: packet.to_vec(),
},
))),
_ => Err(RPCError::InvalidProtocol(
"Unknown BEACON_BLOCK_BODIES version.",
)),
},
"beacon_chain_state" => match self.protocol.version.as_str() {
"1.0.0" => Ok(Some(RPCResponse::BeaconChainState(
BeaconChainStateResponse::from_ssz_bytes(&packet)?,
))),
_ => Err(RPCError::InvalidProtocol(
"Unknown BEACON_CHAIN_STATE version.",
)),
},
_ => Err(RPCError::InvalidProtocol("Unknown method")),
},
Ok(None) => Ok(None),
Err(e) => Err(e),
}
}
}
impl OutboundCodec for SSZOutboundCodec {
type ErrorType = ErrorMessage;
fn decode_error(&mut self, src: &mut BytesMut) -> Result<Option<Self::ErrorType>, RPCError> {
match self.inner.decode(src).map_err(RPCError::from) {
Ok(Some(packet)) => Ok(Some(ErrorMessage::from_ssz_bytes(&packet)?)),
Ok(None) => Ok(None),
Err(e) => Err(e),
}
}
}

View File

@ -1 +0,0 @@
mod base;

View File

@ -1,323 +0,0 @@
/// SSZ Input stream
pub struct SSZInboundSink<TSocket> {
inner:
protocol: ProtocolId
impl<TSocket> for SSZInputStream<TSocket>
where
TSocket: AsyncRead + AsyncWrite
{
/// Set up the initial input stream object.
pub fn new(incomming: TSocket, protocol: ProtocolId, max_size: usize) -> Self {
// this type of stream should only apply to ssz protocols
debug_assert!(protocol.encoding.as_str() == "ssz");
let mut uvi_codec = UviBytes::default();
uvi_codec.set_max_len(max_size);
let inner = Framed::new(incomming, uvi_codec).from_err()
.with(|response| {
self.encode(response)
})
.and_then(|bytes| {
self.decode(request)
}).into_future();
//TODO: add timeout
SSZInputStream {
inner,
protocol
}
}
/// Decodes an SSZ-encoded RPCRequest.
fn decode(&self, request: RPCRequest) {
match self.protocol.message_name.as_str() {
"hello" => match protocol.version.as_str() {
"1.0.0" => Ok(RPCRequest::Hello(HelloMessage::from_ssz_bytes(&packet)?)),
_ => Err(RPCError::InvalidProtocol("Unknown HELLO version")),
},
"goodbye" => match protocol.version.as_str() {
"1.0.0" => Ok(RPCRequest::Goodbye(Goodbye::from_ssz_bytes(&packet)?)),
_ => Err(RPCError::InvalidProtocol(
"Unknown GOODBYE version.as_str()",
)),
},
"beacon_block_roots" => match protocol.version.as_str() {
"1.0.0" => Ok(RPCRequest::BeaconBlockRoots(
BeaconBlockRootsRequest::from_ssz_bytes(&packet)?,
)),
_ => Err(RPCError::InvalidProtocol(
"Unknown BEACON_BLOCK_ROOTS version.",
)),
},
"beacon_block_headers" => match protocol.version.as_str() {
"1.0.0" => Ok(RPCRequest::BeaconBlockHeaders(
BeaconBlockHeadersRequest::from_ssz_bytes(&packet)?,
)),
_ => Err(RPCError::InvalidProtocol(
"Unknown BEACON_BLOCK_HEADERS version.",
)),
},
"beacon_block_bodies" => match protocol.version.as_str() {
"1.0.0" => Ok(RPCRequest::BeaconBlockBodies(
BeaconBlockBodiesRequest::from_ssz_bytes(&packet)?,
)),
_ => Err(RPCError::InvalidProtocol(
"Unknown BEACON_BLOCK_BODIES version.",
)),
},
"beacon_chain_state" => match protocol.version.as_str() {
"1.0.0" => Ok(RPCRequest::BeaconChainState(
BeaconChainStateRequest::from_ssz_bytes(&packet)?,
)),
_ => Err(RPCError::InvalidProtocol(
"Unknown BEACON_CHAIN_STATE version.",
)),
},
}
}
fn encode(&self, response: RPCResponse) {
// TODO: Add error code
match response {
RPCResponse::Hello(res) => res.as_ssz_bytes(),
RPCResponse::Goodbye => unreachable!(),
RPCResponse::BeaconBlockRoots(res) => res.as_ssz_bytes(),
RPCResponse::BeaconBlockHeaders(res) => res.headers, // already raw bytes
RPCResponse::BeaconBlockBodies(res) => res.block_bodies, // already raw bytes
RPCResponse::BeaconChainState(res) => res.as_ssz_bytes(),
}
}
}
type SSZInboundOutput = stream::AndThen<sink::With<stream::FromErr<Framed<TSocket, UviBytes<Vec<u8>>>, RPCError>,
RPCResponse,
fn(RPCResponse) -> Result<Vec<u8>, RPCError>,
Result<Vec<u8>, RPCError>,
>,
fn(BytesMut) -> Result<RPCRequest, RPCError>,
Result<RPCRequest, RPCError>
>;
impl<TSocket> Sink for SSZInputStreamSink<TSocket> {
type SinkItem = RPCResponse;
type SinkError = RPCError;
fn start_send(
&mut self,
item: Self::SinkItem
) -> Result<AsyncSink<Self::SinkItem>, Self::SinkError> {
self.inner.start_send(item)
}
fn poll_complete(&mut self) -> Result<Async<()>, Self::SinkError> {
self.inner.poll_complete()
}
}
/* Outbound specific stream */
// Implement our own decoder to handle the response byte
struct SSZOutboundCodec
pub struct SSZOutboundStreamSink<TSocket> {
inner:
protocol: ProtocolId
impl<TSocket> for SSZOutboundStreamSink<TSocket>
where
TSocket: AsyncRead + AsyncWrite
{
/// Set up the initial outbound stream object.
pub fn new(socket: TSocket, protocol: ProtocolId, max_size: usize) -> Self {
// this type of stream should only apply to ssz protocols
debug_assert!(protocol.encoding.as_str() == "ssz");
let mut uvi_codec = UviBytes::default();
uvi_codec.set_max_len(max_size);
let inner = Framed::new(socket, uvi_codec).from_err()
.with(|request| {
self.encode(request)
})
.and_then(|bytes| {
self.decode(response)
});
SSZOutboundStream {
inner,
protocol
}
}
/// Decodes a response that was received on the same stream as a request. The response type should
/// therefore match the request protocol type.
pub fn decode(&self, response: Vec<u8>,
protocol: ProtocolId,
response_code: ResponseCode,
) -> Result<Self, RPCError> {
match response_code {
ResponseCode::EncodingError => Ok(RPCResponse::Error("Encoding error".into())),
ResponseCode::InvalidRequest => {
let response = match protocol.encoding.as_str() {
"ssz" => ErrorResponse::from_ssz_bytes(&packet)?,
_ => return Err(RPCError::InvalidProtocol("Unknown Encoding")),
};
Ok(RPCResponse::Error(format!(
"Invalid Request: {}",
response.error_message
)))
}
ResponseCode::ServerError => {
let response = match protocol.encoding.as_str() {
"ssz" => ErrorResponse::from_ssz_bytes(&packet)?,
_ => return Err(RPCError::InvalidProtocol("Unknown Encoding")),
};
Ok(RPCResponse::Error(format!(
"Remote Server Error: {}",
response.error_message
)))
}
ResponseCode::Success => match protocol.message_name.as_str() {
"hello" => match protocol.version.as_str() {
"1.0.0" => match protocol.encoding.as_str() {
"ssz" => Ok(RPCResponse::Hello(HelloMessage::from_ssz_bytes(&packet)?)),
_ => Err(RPCError::InvalidProtocol("Unknown HELLO encoding")),
},
_ => Err(RPCError::InvalidProtocol("Unknown HELLO version.")),
},
"goodbye" => Err(RPCError::Custom(
"GOODBYE should not have a response".into(),
)),
"beacon_block_roots" => match protocol.version.as_str() {
"1.0.0" => match protocol.encoding.as_str() {
"ssz" => Ok(RPCResponse::BeaconBlockRoots(
BeaconBlockRootsResponse::from_ssz_bytes(&packet)?,
)),
_ => Err(RPCError::InvalidProtocol(
"Unknown BEACON_BLOCK_ROOTS encoding",
)),
},
_ => Err(RPCError::InvalidProtocol(
"Unknown BEACON_BLOCK_ROOTS version.",
)),
},
"beacon_block_headers" => match protocol.version.as_str() {
"1.0.0" => match protocol.encoding.as_str() {
"ssz" => Ok(RPCResponse::BeaconBlockHeaders(
BeaconBlockHeadersResponse { headers: packet },
)),
_ => Err(RPCError::InvalidProtocol(
"Unknown BEACON_BLOCK_HEADERS encoding",
)),
},
_ => Err(RPCError::InvalidProtocol(
"Unknown BEACON_BLOCK_HEADERS version.",
)),
},
"beacon_block_bodies" => match protocol.version.as_str() {
"1.0.0" => match protocol.encoding.as_str() {
"ssz" => Ok(RPCResponse::BeaconBlockBodies(BeaconBlockBodiesResponse {
block_bodies: packet,
})),
_ => Err(RPCError::InvalidProtocol(
"Unknown BEACON_BLOCK_BODIES encoding",
)),
},
_ => Err(RPCError::InvalidProtocol(
"Unknown BEACON_BLOCK_BODIES version.",
)),
},
"beacon_chain_state" => match protocol.version.as_str() {
"1.0.0" => match protocol.encoding.as_str() {
"ssz" => Ok(RPCResponse::BeaconChainState(
BeaconChainStateResponse::from_ssz_bytes(&packet)?,
)),
_ => Err(RPCError::InvalidProtocol(
"Unknown BEACON_CHAIN_STATE encoding",
)),
},
_ => Err(RPCError::InvalidProtocol(
"Unknown BEACON_CHAIN_STATE version.",
)),
},
},
}
}
fn encode(&self, response: RPCResponse) {
match response {
RPCResponse::Hello(res) => res.as_ssz_bytes(),
RPCResponse::Goodbye => unreachable!(),
RPCResponse::BeaconBlockRoots(res) => res.as_ssz_bytes(),
RPCResponse::BeaconBlockHeaders(res) => res.headers, // already raw bytes
RPCResponse::BeaconBlockBodies(res) => res.block_bodies, // already raw bytes
RPCResponse::BeaconChainState(res) => res.as_ssz_bytes(),
}
}
}
type SSZOutboundStream = stream::AndThen<sink::With<stream::FromErr<Framed<TSocket, UviBytes<Vec<u8>>>, RPCError>,
RPCResponse,
fn(RPCResponse) -> Result<Vec<u8>, RPCError>,
Result<Vec<u8>, RPCError>,
>,
fn(BytesMut) -> Result<RPCRequest, RPCError>,
Result<RPCRequest, RPCError>
>;
impl<TSocket> Stream for SSZInputStreamSink<TSocket> {
type Item = SSZInboundOutput;
type Error = RPCError;
fn poll(&mut self) -> Result<Async<Option<Self::Item>>, Self::Error> {
self.inner.poll()
}
}
impl<TSocket> Sink for SSZInputStreamSink<TSocket> {
type SinkItem = RPCResponse;
type SinkError = RPCError;
fn start_send(
&mut self,
item: Self::SinkItem
) -> Result<AsyncSink<Self::SinkItem>, Self::SinkError> {
self.inner.start_send(item)
}
fn poll_complete(&mut self) -> Result<Async<()>, Self::SinkError> {
self.inner.poll_complete()
}
}

View File

@ -162,3 +162,76 @@ pub struct BeaconChainStateResponse {
/// The values corresponding the to the requested tree hashes. /// The values corresponding the to the requested tree hashes.
pub values: bool, //TBD - stubbed with encodable bool pub values: bool, //TBD - stubbed with encodable bool
} }
/* RPC Handling and Grouping */
// Collection of enums and structs used by the Codecs to encode/decode RPC messages
#[derive(Debug, Clone)]
pub enum RPCResponse {
/// A HELLO message.
Hello(HelloMessage),
/// An empty field returned from sending a GOODBYE request.
Goodbye, // empty value - required for protocol handler
/// A response to a get BEACON_BLOCK_ROOTS request.
BeaconBlockRoots(BeaconBlockRootsResponse),
/// A response to a get BEACON_BLOCK_HEADERS request.
BeaconBlockHeaders(BeaconBlockHeadersResponse),
/// A response to a get BEACON_BLOCK_BODIES request.
BeaconBlockBodies(BeaconBlockBodiesResponse),
/// A response to a get BEACON_CHAIN_STATE request.
BeaconChainState(BeaconChainStateResponse),
}
pub enum RPCErrorResponse {
Success(RPCResponse),
EncodingError,
InvalidRequest(ErrorMessage),
ServerError(ErrorMessage),
Unknown(ErrorMessage),
}
impl RPCErrorResponse {
/// If a response has no payload, returns the variant corresponding to the code.
pub fn internal_data(response_code: u8) -> Option<RPCErrorResponse> {
match response_code {
// EncodingError
1 => Some(RPCErrorResponse::EncodingError),
// All others require further data
_ => None,
}
}
/// Used to encode the response.
pub fn as_u8(&self) -> u8 {
match self {
RPCErrorResponse::Success(_) => 0,
RPCErrorResponse::EncodingError => 1,
RPCErrorResponse::InvalidRequest(_) => 2,
RPCErrorResponse::ServerError(_) => 3,
RPCErrorResponse::Unknown(_) => 255,
}
}
/// Tells the codec whether to decode as an RPCResponse or an error.
pub fn is_response(response_code: u8) -> bool {
match response_code {
0 => true,
_ => false,
}
}
/// Builds an RPCErrorResponse from a response code and an ErrorMessage
pub fn from_error(response_code: u8, err: ErrorMessage) -> Self {
match response_code {
2 => RPCErrorResponse::InvalidRequest(err),
3 => RPCErrorResponse::ServerError(err),
_ => RPCErrorResponse::Unknown(err),
}
}
}
#[derive(Encode, Decode)]
pub struct ErrorMessage {
/// The UTF-8 encoded Error message string.
error_message: Vec<u8>,
}

View File

@ -1,8 +1,9 @@
///! The Ethereum 2.0 Wire Protocol //! The Ethereum 2.0 Wire Protocol
///! //!
///! This protocol is a purpose built Ethereum 2.0 libp2p protocol. It's role is to facilitate //! This protocol is a purpose built Ethereum 2.0 libp2p protocol. It's role is to facilitate
///! direct peer-to-peer communication primarily for sending/receiving chain information for //! direct peer-to-peer communication primarily for sending/receiving chain information for
///! syncing. //! syncing.
use futures::prelude::*; use futures::prelude::*;
use handler::RPCHandler; use handler::RPCHandler;
use libp2p::core::protocols_handler::ProtocolsHandler; use libp2p::core::protocols_handler::ProtocolsHandler;
@ -10,17 +11,17 @@ use libp2p::core::swarm::{
ConnectedPoint, NetworkBehaviour, NetworkBehaviourAction, PollParameters, ConnectedPoint, NetworkBehaviour, NetworkBehaviourAction, PollParameters,
}; };
use libp2p::{Multiaddr, PeerId}; use libp2p::{Multiaddr, PeerId};
pub use methods::HelloMessage; pub use methods::{ErrorMessage, HelloMessage, RPCErrorResponse, RPCResponse};
pub use protocol::{RPCProtocol, RPCRequest, RPCResponse}; pub use protocol::{RPCProtocol, RPCRequest};
use slog::o; use slog::o;
use std::marker::PhantomData; use std::marker::PhantomData;
use tokio::io::{AsyncRead, AsyncWrite}; use tokio::io::{AsyncRead, AsyncWrite};
mod codecs; pub(crate) mod codec;
mod handler; mod handler;
pub mod methods; pub mod methods;
mod protocol; mod protocol;
mod request_response; // mod request_response;
/// The return type used in the behaviour and the resultant event from the protocols handler. /// The return type used in the behaviour and the resultant event from the protocols handler.
#[derive(Debug, Clone)] #[derive(Debug, Clone)]

View File

@ -1,13 +1,18 @@
use super::methods::*; use super::methods::*;
use super::request_response::{rpc_request_response, RPCRequestResponse}; use crate::rpc::codec::{
base::{BaseInboundCodec, BaseOutboundCodec},
ssz::{SSZInboundCodec, SSZOutboundCodec},
};
use futures::future::Future; use futures::future::Future;
use libp2p::core::{upgrade, InboundUpgrade, OutboundUpgrade, UpgradeInfo}; use libp2p::core::{upgrade, InboundUpgrade, OutboundUpgrade, UpgradeInfo};
use ssz::{Decode, Encode}; use ssz::Encode;
use ssz_derive::{Decode, Encode}; use ssz_derive::{Decode, Encode};
use std::io; use std::io;
use std::time::Duration; use std::time::Duration;
use tokio::codec::Framed;
use tokio::io::{AsyncRead, AsyncWrite}; use tokio::io::{AsyncRead, AsyncWrite};
use tokio::prelude::future::MapErr; use tokio::prelude::future::MapErr;
use tokio::prelude::*;
use tokio::util::FutureExt; use tokio::util::FutureExt;
/// The maximum bytes that can be sent across the RPC. /// The maximum bytes that can be sent across the RPC.
@ -17,7 +22,7 @@ const PROTOCOL_PREFIX: &str = "/eth/serenity/rpc/";
/// The number of seconds to wait for a response before the stream is terminated. /// The number of seconds to wait for a response before the stream is terminated.
const RESPONSE_TIMEOUT: u64 = 10; const RESPONSE_TIMEOUT: u64 = 10;
/// Implementation of the `ConnectionUpgrade` for the rpc protocol. /// Implementation of the `ConnectionUpgrade` for the RPC protocol.
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct RPCProtocol; pub struct RPCProtocol;
@ -96,6 +101,10 @@ impl Into<RawProtocolId> for ProtocolId {
// The inbound protocol reads the request, decodes it and returns the stream to the protocol // The inbound protocol reads the request, decodes it and returns the stream to the protocol
// handler to respond to once ready. // handler to respond to once ready.
enum InboundCodec {
SSZ(BaseInboundCodec<SSZInboundCodec>),
}
type FnDecodeRPCEvent<TSocket> = type FnDecodeRPCEvent<TSocket> =
fn( fn(
upgrade::Negotiated<TSocket>, upgrade::Negotiated<TSocket>,
@ -107,8 +116,11 @@ impl<TSocket> InboundUpgrade<TSocket> for RPCProtocol
where where
TSocket: AsyncRead + AsyncWrite, TSocket: AsyncRead + AsyncWrite,
{ {
type Output = (upgrade::Negotiated<TSocket>, RPCRequest, ProtocolId); type Output = RPCRequest;
type Error = RPCError; type Error = RPCError;
type Future = Box<dyn Future<Item = Self::Output, Error = Self::Error>>;
/*
type Future = MapErr< type Future = MapErr<
tokio_timer::Timeout< tokio_timer::Timeout<
upgrade::ReadRespond< upgrade::ReadRespond<
@ -119,12 +131,35 @@ where
>, >,
fn(tokio::timer::timeout::Error<RPCError>) -> RPCError, fn(tokio::timer::timeout::Error<RPCError>) -> RPCError,
>; >;
*/
fn upgrade_inbound( fn upgrade_inbound(
self, self,
socket: upgrade::Negotiated<TSocket>, socket: upgrade::Negotiated<TSocket>,
protocol: &'static [u8], protocol: &'static [u8],
) -> Self::Future { ) -> Self::Future {
let protocol_id = match ProtocolId::from_bytes(protocol) {
Ok(v) => v,
Err(e) => return Box::new(futures::future::err(e)),
};
match protocol_id.encoding.as_str() {
"ssz" | _ => {
let codec = BaseInboundCodec::new(SSZInboundCodec::new(protocol_id, 4096));
Box::new(
Framed::new(socket, codec)
.into_future()
.timeout(Duration::from_secs(RESPONSE_TIMEOUT))
.map_err(RPCError::from)
.and_then(|(madouby, _)| match madouby {
Some(x) => futures::future::ok(x),
None => futures::future::err(RPCError::Custom("Go home".into())),
}),
)
}
}
/*
upgrade::read_respond(socket, MAX_RPC_SIZE, protocol, { upgrade::read_respond(socket, MAX_RPC_SIZE, protocol, {
|socket, packet, protocol| { |socket, packet, protocol| {
let protocol_id = ProtocolId::from_bytes(protocol)?; let protocol_id = ProtocolId::from_bytes(protocol)?;
@ -136,8 +171,8 @@ where
} }
} }
as FnDecodeRPCEvent<TSocket>) as FnDecodeRPCEvent<TSocket>)
.timeout(Duration::from_secs(RESPONSE_TIMEOUT)) }
.map_err(RPCError::from) */
} }
} }
@ -214,78 +249,7 @@ impl RPCRequest {
} }
} }
/* Response Type */ /* RPC Response type - used for outbound upgrades */
#[derive(Debug, Clone)]
pub enum RPCResponse {
/// A HELLO message.
Hello(HelloMessage),
/// An empty field returned from sending a GOODBYE request.
Goodbye, // empty value - required for protocol handler
/// A response to a get BEACON_BLOCK_ROOTS request.
BeaconBlockRoots(BeaconBlockRootsResponse),
/// A response to a get BEACON_BLOCK_HEADERS request.
BeaconBlockHeaders(BeaconBlockHeadersResponse),
/// A response to a get BEACON_BLOCK_BODIES request.
BeaconBlockBodies(BeaconBlockBodiesResponse),
/// A response to a get BEACON_CHAIN_STATE request.
BeaconChainState(BeaconChainStateResponse),
}
pub enum RPCErrorResponse {
Success(RPCResponse),
EncodingError,
InvalidRequest(ErrorMessage),
ServerError(ErrorMessage),
Unknown(ErrorMessage),
}
impl RPCErrorResponse {
/// If a response has no payload, returns the variant corresponding to the code.
pub fn internal_data(response_code: u8) -> Option<RPCErrorResponse> {
match response_code {
// EncodingError
1 => Some(RPCErrorResponse::EncodingError),
// All others require further data
_ => None,
}
}
/// Used to encode the response.
pub fn as_u8(&self) -> u8 {
match self {
RPCErrorResponse::Success(_) => 0,
RPCErrorResponse::EncodingError => 1,
RPCErrorResponse::InvalidRequest(_) => 2,
RPCErrorResponse::ServerError(_) => 3,
RPCErrorResponse::Unknown(_) => 255,
}
}
/// Tells the codec whether to decode as an RPCResponse or an error.
pub fn is_response(response_code:u8) -> bool {
match response_code {
0 => true,
_ => false,
}
/// Builds an RPCErrorResponse from a response code and an ErrorMessage
pub fn from_error(response_code:u8, err: ErrorMessage) -> Self {
match response_code {
2 => RPCErrorResponse::InvalidRequest(err),
3 => RPCErrorResponse::ServerError(err),
_ => RPCErrorResponse::Unknown(err),
}
}
#[derive(Encode, Decode)]
struct ErrorMessage {
/// The UTF-8 encoded Error message string.
error_message: Vec<u8>,
}
// todo: SSZ-Encode
impl RPCResponse {}
/* Outbound upgrades */ /* Outbound upgrades */
@ -295,37 +259,47 @@ where
{ {
type Output = RPCResponse; type Output = RPCResponse;
type Error = RPCError; type Error = RPCError;
type Future = MapErr< type Future = Box<dyn Future<Item = Self::Output, Error = Self::Error>>;
tokio_timer::Timeout<RPCRequestResponse<upgrade::Negotiated<TSocket>, Vec<u8>>>,
fn(tokio::timer::timeout::Error<RPCError>) -> RPCError,
>;
fn upgrade_outbound( fn upgrade_outbound(
self, self,
socket: upgrade::Negotiated<TSocket>, socket: upgrade::Negotiated<TSocket>,
protocol: Self::Info, protocol: Self::Info,
) -> Self::Future { ) -> Self::Future {
let protocol_id = ProtocolId::from_bytes(&protocol) panic!()
.expect("Protocol ID must be valid for outbound requests");
let request_bytes = self /*
.encode(protocol_id) let protocol_id = match ProtocolId::from_bytes(&protocol) {
.expect("Should be able to encode a supported protocol"); Ok(v) => v,
// if sending a goodbye, drop the stream and return an empty GOODBYE response Err(e) => return futures::future::err(e),
let short_circuit_return = if let RPCRequest::Goodbye(_) = self {
Some(RPCResponse::Goodbye)
} else {
None
}; };
rpc_request_response(
socket, // select which codec to use
request_bytes, let inbound_stream = match protocol_id.encoding.as_str() {
MAX_RPC_SIZE, "ssz" => {
short_circuit_return, let codec = BaseInboundCodec::new(SSZCodec::new());
protocol_id, Framed::new(socket, codec).send(self)
) }
_ => futures::future::err(RPCError::InvalidProtocol("Unsupported encoding")),
};
// do not wait for a timeout if we send a GOODBYE request
match protocol_id.message_name.as_str() {
// goodbye messages do not have a response
"goodbye" => inbound_stream.and_then(|| {
RPCErrorResponse::Unknown(ErrorMessage {
error_message: String::from("goodbye response").as_bytes(),
})
}),
// get a response for all other requests
_ => inbound_stream.and_then(|stream| {
stream
.into_future()
.timeout(Duration::from_secs(RESPONSE_TIMEOUT)) .timeout(Duration::from_secs(RESPONSE_TIMEOUT))
.map(|resp, _| resp)
.map_err(RPCError::from) .map_err(RPCError::from)
}),
}
*/
} }
} }