Improved error handling. Switching to codecs for easier encoding support

This commit is contained in:
Age Manning 2019-07-11 15:11:31 +10:00
parent 4a84b2f7cc
commit 0292679f27
No known key found for this signature in database
GPG Key ID: 05EED64B79E06A93
3 changed files with 144 additions and 103 deletions

View File

@ -32,8 +32,8 @@ pub struct HelloMessage {
/// The reason given for a `Goodbye` message. /// The reason given for a `Goodbye` message.
/// ///
/// Note: any unknown `u64::into(n)` will resolve to `GoodbyeReason::Unknown` for any unknown `n`, /// Note: any unknown `u64::into(n)` will resolve to `Goodbye::Unknown` for any unknown `n`,
/// however `GoodbyeReason::Unknown.into()` will go into `0_u64`. Therefore de-serializing then /// however `Goodbye::Unknown.into()` will go into `0_u64`. Therefore de-serializing then
/// re-serializing may not return the same bytes. /// re-serializing may not return the same bytes.
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub enum Goodbye { pub enum Goodbye {
@ -61,6 +61,13 @@ impl From<u64> for Goodbye {
} }
} }
impl Into<u64> for Goodbye {
fn into(self) -> u64 {
self as u64
}
}
impl_encode_via_from!(Goodbye, u64);
impl_decode_via_from!(Goodbye, u64); impl_decode_via_from!(Goodbye, u64);
/// Request a number of beacon block roots from a peer. /// Request a number of beacon block roots from a peer.

View File

@ -3,6 +3,7 @@ use super::request_response::{rpc_request_response, RPCRequestResponse};
use futures::future::Future; use futures::future::Future;
use libp2p::core::{upgrade, InboundUpgrade, OutboundUpgrade, UpgradeInfo}; use libp2p::core::{upgrade, InboundUpgrade, OutboundUpgrade, UpgradeInfo};
use ssz::{Decode, Encode}; use ssz::{Decode, Encode};
use ssz_derive::{Decode, Encode};
use std::io; use std::io;
use std::time::Duration; use std::time::Duration;
use tokio::io::{AsyncRead, AsyncWrite}; use tokio::io::{AsyncRead, AsyncWrite};
@ -26,12 +27,12 @@ impl UpgradeInfo for RPCProtocol {
fn protocol_info(&self) -> Self::InfoIter { fn protocol_info(&self) -> Self::InfoIter {
vec![ vec![
b"/eth/serenity/rpc/hello/1/ssz", b"/eth/serenity/rpc/hello/1.0.0/ssz",
b"/eth/serenity/rpc/goodbye/1/ssz", b"/eth/serenity/rpc/goodbye/1.0.0/ssz",
b"/eth/serenity/rpc/beacon_block_roots/1/ssz", b"/eth/serenity/rpc/beacon_block_roots/1.0.0/ssz",
b"/eth/serenity/rpc/beacon_block_headers/1/ssz", b"/eth/serenity/rpc/beacon_block_headers/1.0.0/ssz",
b"/eth/serenity/rpc/beacon_block_bodies/1/ssz", b"/eth/serenity/rpc/beacon_block_bodies/1.0.0/ssz",
b"/eth/serenity/rpc/beacon_chain_state/1/ssz", b"/eth/serenity/rpc/beacon_chain_state/1.0.0/ssz",
] ]
} }
} }
@ -45,7 +46,7 @@ pub struct ProtocolId {
pub message_name: String, pub message_name: String,
/// The version of the RPC. /// The version of the RPC.
pub version: usize, pub version: String,
/// The encoding of the RPC. /// The encoding of the RPC.
pub encoding: String, pub encoding: String,
@ -53,10 +54,10 @@ pub struct ProtocolId {
/// An RPC protocol ID. /// An RPC protocol ID.
impl ProtocolId { impl ProtocolId {
pub fn new(message_name: &str, version: usize, encoding: &str) -> Self { pub fn new(message_name: &str, version: &str, encoding: &str) -> Self {
ProtocolId { ProtocolId {
message_name: message_name.into(), message_name: message_name.into(),
version, version: version.into(),
encoding: encoding.into(), encoding: encoding.into(),
} }
} }
@ -73,9 +74,7 @@ impl ProtocolId {
Ok(ProtocolId { Ok(ProtocolId {
message_name: protocol_list[3].into(), message_name: protocol_list[3].into(),
version: protocol_list[4] version: protocol_list[4].into(),
.parse()
.map_err(|_| RPCError::InvalidProtocol("Invalid version"))?,
encoding: protocol_list[5].into(), encoding: protocol_list[5].into(),
}) })
} }
@ -172,19 +171,19 @@ impl RPCRequest {
pub fn supported_protocols(&self) -> Vec<RawProtocolId> { pub fn supported_protocols(&self) -> Vec<RawProtocolId> {
match self { match self {
// add more protocols when versions/encodings are supported // add more protocols when versions/encodings are supported
RPCRequest::Hello(_) => vec![ProtocolId::new("hello", 1, "ssz").into()], RPCRequest::Hello(_) => vec![ProtocolId::new("hello", "1.0.0", "ssz").into()],
RPCRequest::Goodbye(_) => vec![ProtocolId::new("goodbye", 1, "ssz").into()], RPCRequest::Goodbye(_) => vec![ProtocolId::new("goodbye", "1.0.0", "ssz").into()],
RPCRequest::BeaconBlockRoots(_) => { RPCRequest::BeaconBlockRoots(_) => {
vec![ProtocolId::new("beacon_block_roots", 1, "ssz").into()] vec![ProtocolId::new("beacon_block_roots", "1.0.0", "ssz").into()]
} }
RPCRequest::BeaconBlockHeaders(_) => { RPCRequest::BeaconBlockHeaders(_) => {
vec![ProtocolId::new("beacon_block_headers", 1, "ssz").into()] vec![ProtocolId::new("beacon_block_headers", "1.0.0", "ssz").into()]
} }
RPCRequest::BeaconBlockBodies(_) => { RPCRequest::BeaconBlockBodies(_) => {
vec![ProtocolId::new("beacon_block_bodies", 1, "ssz").into()] vec![ProtocolId::new("beacon_block_bodies", "1.0.0", "ssz").into()]
} }
RPCRequest::BeaconChainState(_) => { RPCRequest::BeaconChainState(_) => {
vec![ProtocolId::new("beacon_block_state", 1, "ssz").into()] vec![ProtocolId::new("beacon_block_state", "1.0.0", "ssz").into()]
} }
} }
} }
@ -216,30 +215,26 @@ impl RPCRequest {
// This function can be extended to provide further logic for supporting various protocol versions/encoding // This function can be extended to provide further logic for supporting various protocol versions/encoding
/// Decodes a request received from our peer. /// Decodes a request received from our peer.
pub fn decode(packet: Vec<u8>, protocol: ProtocolId, response_code: ResponseCode) -> Result<Self, RPCError> { pub fn decode(packet: Vec<u8>, protocol: ProtocolId) -> Result<Self, RPCError> {
match response_code {
ResponseCode::
match protocol.message_name.as_str() { match protocol.message_name.as_str() {
"hello" => match protocol.version { "hello" => match protocol.version.as_str() {
1 => match protocol.encoding.as_str() { "1.0.0" => match protocol.encoding.as_str() {
"ssz" => Ok(RPCRequest::Hello(HelloMessage::from_ssz_bytes(&packet)?)), "ssz" => Ok(RPCRequest::Hello(HelloMessage::from_ssz_bytes(&packet)?)),
_ => Err(RPCError::InvalidProtocol("Unknown HELLO encoding")), _ => Err(RPCError::InvalidProtocol("Unknown HELLO encoding")),
}, },
_ => Err(RPCError::InvalidProtocol("Unknown HELLO version")), _ => Err(RPCError::InvalidProtocol("Unknown HELLO version")),
}, },
"goodbye" => match protocol.version { "goodbye" => match protocol.version.as_str() {
1 => match protocol.encoding.as_str() { "1.0.0" => match protocol.encoding.as_str() {
"ssz" => Ok(RPCRequest::Goodbye(Goodbye::from_ssz_bytes(&packet)?)), "ssz" => Ok(RPCRequest::Goodbye(Goodbye::from_ssz_bytes(&packet)?)),
_ => Err(RPCError::InvalidProtocol("Unknown GOODBYE encoding")), _ => Err(RPCError::InvalidProtocol("Unknown GOODBYE encoding")),
}, },
_ => Err(RPCError::InvalidProtocol("Unknown GOODBYE version")), _ => Err(RPCError::InvalidProtocol(
"Unknown GOODBYE version.as_str()",
)),
}, },
"beacon_block_roots" => match protocol.version { "beacon_block_roots" => match protocol.version.as_str() {
1 => match protocol.encoding.as_str() { "1.0.0" => match protocol.encoding.as_str() {
"ssz" => Ok(RPCRequest::BeaconBlockRoots( "ssz" => Ok(RPCRequest::BeaconBlockRoots(
BeaconBlockRootsRequest::from_ssz_bytes(&packet)?, BeaconBlockRootsRequest::from_ssz_bytes(&packet)?,
)), )),
@ -248,11 +243,11 @@ impl RPCRequest {
)), )),
}, },
_ => Err(RPCError::InvalidProtocol( _ => Err(RPCError::InvalidProtocol(
"Unknown BEACON_BLOCK_ROOTS version", "Unknown BEACON_BLOCK_ROOTS version.",
)), )),
}, },
"beacon_block_headers" => match protocol.version { "beacon_block_headers" => match protocol.version.as_str() {
1 => match protocol.encoding.as_str() { "1.0.0" => match protocol.encoding.as_str() {
"ssz" => Ok(RPCRequest::BeaconBlockHeaders( "ssz" => Ok(RPCRequest::BeaconBlockHeaders(
BeaconBlockHeadersRequest::from_ssz_bytes(&packet)?, BeaconBlockHeadersRequest::from_ssz_bytes(&packet)?,
)), )),
@ -261,11 +256,11 @@ impl RPCRequest {
)), )),
}, },
_ => Err(RPCError::InvalidProtocol( _ => Err(RPCError::InvalidProtocol(
"Unknown BEACON_BLOCK_HEADERS version", "Unknown BEACON_BLOCK_HEADERS version.",
)), )),
}, },
"beacon_block_bodies" => match protocol.version { "beacon_block_bodies" => match protocol.version.as_str() {
1 => match protocol.encoding.as_str() { "1.0.0" => match protocol.encoding.as_str() {
"ssz" => Ok(RPCRequest::BeaconBlockBodies( "ssz" => Ok(RPCRequest::BeaconBlockBodies(
BeaconBlockBodiesRequest::from_ssz_bytes(&packet)?, BeaconBlockBodiesRequest::from_ssz_bytes(&packet)?,
)), )),
@ -274,11 +269,11 @@ impl RPCRequest {
)), )),
}, },
_ => Err(RPCError::InvalidProtocol( _ => Err(RPCError::InvalidProtocol(
"Unknown BEACON_BLOCK_BODIES version", "Unknown BEACON_BLOCK_BODIES version.",
)), )),
}, },
"beacon_chain_state" => match protocol.version { "beacon_chain_state" => match protocol.version.as_str() {
1 => match protocol.encoding.as_str() { "1.0.0" => match protocol.encoding.as_str() {
"ssz" => Ok(RPCRequest::BeaconChainState( "ssz" => Ok(RPCRequest::BeaconChainState(
BeaconChainStateRequest::from_ssz_bytes(&packet)?, BeaconChainStateRequest::from_ssz_bytes(&packet)?,
)), )),
@ -287,7 +282,7 @@ impl RPCRequest {
)), )),
}, },
_ => Err(RPCError::InvalidProtocol( _ => Err(RPCError::InvalidProtocol(
"Unknown BEACON_CHAIN_STATE version", "Unknown BEACON_CHAIN_STATE version.",
)), )),
}, },
} }
@ -319,12 +314,11 @@ pub enum ResponseCode {
EncodingError = 1, EncodingError = 1,
InvalidRequest = 2, InvalidRequest = 2,
ServerError = 3, ServerError = 3,
Unknown, Unknown = 255,
} }
impl From<u8> for ResponseCode {
impl From<u64> for ResponseCode { fn from(val: u8) -> ResponseCode {
fn from(val: u64) -> ResponseCode {
match val { match val {
0 => ResponseCode::Success, 0 => ResponseCode::Success,
1 => ResponseCode::EncodingError, 1 => ResponseCode::EncodingError,
@ -335,72 +329,110 @@ impl From<u64> for ResponseCode {
} }
} }
impl Into<u8> for ResponseCode {
fn into(self) -> u8 {
self as u8
}
}
#[derive(Encode, Decode)]
struct ErrorResponse {
error_message: String,
}
impl RPCResponse { impl RPCResponse {
/// Decodes a response that was received on the same stream as a request. The response type should /// Decodes a response that was received on the same stream as a request. The response type should
/// therefore match the request protocol type. /// therefore match the request protocol type.
fn decode(packet: Vec<u8>, protocol: ProtocolId) -> Result<Self, RPCError> { pub fn decode(
match protocol.message_name.as_str() { packet: Vec<u8>,
"hello" => match protocol.version { protocol: ProtocolId,
1 => match protocol.encoding.as_str() { response_code: ResponseCode,
"ssz" => Ok(RPCResponse::Hello(HelloMessage::from_ssz_bytes(&packet)?)), ) -> Result<Self, RPCError> {
_ => Err(RPCError::InvalidProtocol("Unknown HELLO encoding")), match response_code {
ResponseCode::EncodingError => Ok(RPCResponse::Error("Encoding error".into())),
ResponseCode::InvalidRequest => {
let response = match protocol.encoding.as_str() {
"ssz" => ErrorResponse::from_ssz_bytes(&packet)?,
_ => return Err(RPCError::InvalidProtocol("Unknown Encoding")),
};
Ok(RPCResponse::Error(format!(
"Invalid Request: {}",
response.error_message
)))
}
ResponseCode::ServerError => {
let response = match protocol.encoding.as_str() {
"ssz" => ErrorResponse::from_ssz_bytes(&packet)?,
_ => return Err(RPCError::InvalidProtocol("Unknown Encoding")),
};
Ok(RPCResponse::Error(format!(
"Remote Server Error: {}",
response.error_message
)))
}
ResponseCode::Success => match protocol.message_name.as_str() {
"hello" => match protocol.version.as_str() {
"1.0.0" => match protocol.encoding.as_str() {
"ssz" => Ok(RPCResponse::Hello(HelloMessage::from_ssz_bytes(&packet)?)),
_ => Err(RPCError::InvalidProtocol("Unknown HELLO encoding")),
},
_ => Err(RPCError::InvalidProtocol("Unknown HELLO version.")),
}, },
_ => Err(RPCError::InvalidProtocol("Unknown HELLO version")), "goodbye" => Err(RPCError::Custom(
}, "GOODBYE should not have a response".into(),
"goodbye" => Err(RPCError::Custom(
"GOODBYE should not have a response".into(),
)),
"beacon_block_roots" => match protocol.version {
1 => match protocol.encoding.as_str() {
"ssz" => Ok(RPCResponse::BeaconBlockRoots(
BeaconBlockRootsResponse::from_ssz_bytes(&packet)?,
)),
_ => Err(RPCError::InvalidProtocol(
"Unknown BEACON_BLOCK_ROOTS encoding",
)),
},
_ => Err(RPCError::InvalidProtocol(
"Unknown BEACON_BLOCK_ROOTS version",
)), )),
}, "beacon_block_roots" => match protocol.version.as_str() {
"beacon_block_headers" => match protocol.version { "1.0.0" => match protocol.encoding.as_str() {
1 => match protocol.encoding.as_str() { "ssz" => Ok(RPCResponse::BeaconBlockRoots(
"ssz" => Ok(RPCResponse::BeaconBlockHeaders( BeaconBlockRootsResponse::from_ssz_bytes(&packet)?,
BeaconBlockHeadersResponse { headers: packet }, )),
)), _ => Err(RPCError::InvalidProtocol(
"Unknown BEACON_BLOCK_ROOTS encoding",
)),
},
_ => Err(RPCError::InvalidProtocol( _ => Err(RPCError::InvalidProtocol(
"Unknown BEACON_BLOCK_HEADERS encoding", "Unknown BEACON_BLOCK_ROOTS version.",
)), )),
}, },
_ => Err(RPCError::InvalidProtocol( "beacon_block_headers" => match protocol.version.as_str() {
"Unknown BEACON_BLOCK_HEADERS version", "1.0.0" => match protocol.encoding.as_str() {
)), "ssz" => Ok(RPCResponse::BeaconBlockHeaders(
}, BeaconBlockHeadersResponse { headers: packet },
"beacon_block_bodies" => match protocol.version { )),
1 => match protocol.encoding.as_str() { _ => Err(RPCError::InvalidProtocol(
"ssz" => Ok(RPCResponse::BeaconBlockBodies(BeaconBlockBodiesResponse { "Unknown BEACON_BLOCK_HEADERS encoding",
block_bodies: packet, )),
})), },
_ => Err(RPCError::InvalidProtocol( _ => Err(RPCError::InvalidProtocol(
"Unknown BEACON_BLOCK_BODIES encoding", "Unknown BEACON_BLOCK_HEADERS version.",
)), )),
}, },
_ => Err(RPCError::InvalidProtocol( "beacon_block_bodies" => match protocol.version.as_str() {
"Unknown BEACON_BLOCK_BODIES version", "1.0.0" => match protocol.encoding.as_str() {
)), "ssz" => Ok(RPCResponse::BeaconBlockBodies(BeaconBlockBodiesResponse {
}, block_bodies: packet,
"beacon_chain_state" => match protocol.version { })),
1 => match protocol.encoding.as_str() { _ => Err(RPCError::InvalidProtocol(
"ssz" => Ok(RPCResponse::BeaconChainState( "Unknown BEACON_BLOCK_BODIES encoding",
BeaconChainStateResponse::from_ssz_bytes(&packet)?, )),
)), },
_ => Err(RPCError::InvalidProtocol( _ => Err(RPCError::InvalidProtocol(
"Unknown BEACON_CHAIN_STATE encoding", "Unknown BEACON_BLOCK_BODIES version.",
)),
},
"beacon_chain_state" => match protocol.version.as_str() {
"1.0.0" => match protocol.encoding.as_str() {
"ssz" => Ok(RPCResponse::BeaconChainState(
BeaconChainStateResponse::from_ssz_bytes(&packet)?,
)),
_ => Err(RPCError::InvalidProtocol(
"Unknown BEACON_CHAIN_STATE encoding",
)),
},
_ => Err(RPCError::InvalidProtocol(
"Unknown BEACON_CHAIN_STATE version.",
)), )),
}, },
_ => Err(RPCError::InvalidProtocol(
"Unknown BEACON_CHAIN_STATE version",
)),
}, },
} }
} }
@ -503,7 +535,6 @@ impl From<ssz::DecodeError> for RPCError {
RPCError::SSZDecodeError(err) RPCError::SSZDecodeError(err)
} }
} }
impl<T> From<tokio::timer::timeout::Error<T>> for RPCError { impl<T> From<tokio::timer::timeout::Error<T>> for RPCError {
fn from(err: tokio::timer::timeout::Error<T>) -> Self { fn from(err: tokio::timer::timeout::Error<T>) -> Self {
if err.is_elapsed() { if err.is_elapsed() {

View File

@ -91,8 +91,11 @@ where
RPCRequestResponseInner::ReadResponseCode(mut inner, max_size) => { RPCRequestResponseInner::ReadResponseCode(mut inner, max_size) => {
match inner.poll()? { match inner.poll()? {
Async::Ready((socket, data)) => { Async::Ready((socket, data)) => {
let resp_code_byte = [0; 1];
// data must be only 1-byte - this cannot panic
resp_code_byte.copy_from_slice(&data.into_inner());
let response_code = let response_code =
ResponseCode::from(u64::from_be_bytes(data.into_inner())); ResponseCode::from(u8::from_be_bytes(resp_code_byte));
// known response codes // known response codes
match response_code { match response_code {
ResponseCode::Success ResponseCode::Success
@ -113,7 +116,7 @@ where
// unknown response code // unknown response code
let response = RPCResponse::Error(format!( let response = RPCResponse::Error(format!(
"Unknown response code: {}", "Unknown response code: {}",
response_code (response_code as u8)
)); ));
return Ok(Async::Ready(response)); return Ok(Async::Ready(response));
} }