Correctly encode/decode RPC errors (#1157)
This commit is contained in:
parent
2d8e2dd7f5
commit
a4b07a833c
@ -1,6 +1,6 @@
|
|||||||
//! This handles the various supported encoding mechanism for the Eth 2.0 RPC.
|
//! This handles the various supported encoding mechanism for the Eth 2.0 RPC.
|
||||||
|
|
||||||
use crate::rpc::{ErrorMessage, RPCCodedResponse, RPCRequest, RPCResponse};
|
use crate::rpc::{RPCCodedResponse, RPCRequest, RPCResponse};
|
||||||
use libp2p::bytes::BufMut;
|
use libp2p::bytes::BufMut;
|
||||||
use libp2p::bytes::BytesMut;
|
use libp2p::bytes::BytesMut;
|
||||||
use std::marker::PhantomData;
|
use std::marker::PhantomData;
|
||||||
@ -130,8 +130,8 @@ where
|
|||||||
impl<TCodec, TSpec> Decoder for BaseOutboundCodec<TCodec, TSpec>
|
impl<TCodec, TSpec> Decoder for BaseOutboundCodec<TCodec, TSpec>
|
||||||
where
|
where
|
||||||
TSpec: EthSpec,
|
TSpec: EthSpec,
|
||||||
TCodec: OutboundCodec<RPCRequest<TSpec>, ErrorType = ErrorMessage>
|
TCodec:
|
||||||
+ Decoder<Item = RPCResponse<TSpec>>,
|
OutboundCodec<RPCRequest<TSpec>, ErrorType = String> + Decoder<Item = RPCResponse<TSpec>>,
|
||||||
{
|
{
|
||||||
type Item = RPCCodedResponse<TSpec>;
|
type Item = RPCCodedResponse<TSpec>;
|
||||||
type Error = <TCodec as Decoder>::Error;
|
type Error = <TCodec as Decoder>::Error;
|
||||||
|
@ -3,7 +3,7 @@ use crate::rpc::{
|
|||||||
codec::base::OutboundCodec,
|
codec::base::OutboundCodec,
|
||||||
protocol::{Encoding, Protocol, ProtocolId, RPCError, Version},
|
protocol::{Encoding, Protocol, ProtocolId, RPCError, Version},
|
||||||
};
|
};
|
||||||
use crate::rpc::{ErrorMessage, RPCCodedResponse, RPCRequest, RPCResponse};
|
use crate::rpc::{RPCCodedResponse, RPCRequest, RPCResponse};
|
||||||
use libp2p::bytes::{BufMut, Bytes, BytesMut};
|
use libp2p::bytes::{BufMut, Bytes, BytesMut};
|
||||||
use ssz::{Decode, Encode};
|
use ssz::{Decode, Encode};
|
||||||
use std::marker::PhantomData;
|
use std::marker::PhantomData;
|
||||||
@ -52,9 +52,9 @@ impl<TSpec: EthSpec> Encoder<RPCCodedResponse<TSpec>> for SSZInboundCodec<TSpec>
|
|||||||
RPCResponse::Pong(res) => res.data.as_ssz_bytes(),
|
RPCResponse::Pong(res) => res.data.as_ssz_bytes(),
|
||||||
RPCResponse::MetaData(res) => res.as_ssz_bytes(),
|
RPCResponse::MetaData(res) => res.as_ssz_bytes(),
|
||||||
},
|
},
|
||||||
RPCCodedResponse::InvalidRequest(err) => err.as_ssz_bytes(),
|
RPCCodedResponse::InvalidRequest(err) => err.into_bytes().as_ssz_bytes(),
|
||||||
RPCCodedResponse::ServerError(err) => err.as_ssz_bytes(),
|
RPCCodedResponse::ServerError(err) => err.into_bytes().as_ssz_bytes(),
|
||||||
RPCCodedResponse::Unknown(err) => err.as_ssz_bytes(),
|
RPCCodedResponse::Unknown(err) => err.into_bytes().as_ssz_bytes(),
|
||||||
RPCCodedResponse::StreamTermination(_) => {
|
RPCCodedResponse::StreamTermination(_) => {
|
||||||
unreachable!("Code error - attempting to encode a stream termination")
|
unreachable!("Code error - attempting to encode a stream termination")
|
||||||
}
|
}
|
||||||
@ -242,11 +242,13 @@ impl<TSpec: EthSpec> Decoder for SSZOutboundCodec<TSpec> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl<TSpec: EthSpec> OutboundCodec<RPCRequest<TSpec>> for SSZOutboundCodec<TSpec> {
|
impl<TSpec: EthSpec> OutboundCodec<RPCRequest<TSpec>> for SSZOutboundCodec<TSpec> {
|
||||||
type ErrorType = ErrorMessage;
|
type ErrorType = String;
|
||||||
|
|
||||||
fn decode_error(&mut self, src: &mut BytesMut) -> Result<Option<Self::ErrorType>, RPCError> {
|
fn decode_error(&mut self, src: &mut BytesMut) -> Result<Option<Self::ErrorType>, RPCError> {
|
||||||
match self.inner.decode(src).map_err(RPCError::from) {
|
match self.inner.decode(src).map_err(RPCError::from) {
|
||||||
Ok(Some(packet)) => Ok(Some(ErrorMessage::from_ssz_bytes(&packet)?)),
|
Ok(Some(packet)) => Ok(Some(
|
||||||
|
String::from_utf8_lossy(&<Vec<u8>>::from_ssz_bytes(&packet)?).into(),
|
||||||
|
)),
|
||||||
Ok(None) => Ok(None),
|
Ok(None) => Ok(None),
|
||||||
Err(e) => Err(e),
|
Err(e) => Err(e),
|
||||||
}
|
}
|
||||||
|
@ -3,7 +3,7 @@ use crate::rpc::{
|
|||||||
codec::base::OutboundCodec,
|
codec::base::OutboundCodec,
|
||||||
protocol::{Encoding, Protocol, ProtocolId, RPCError, Version},
|
protocol::{Encoding, Protocol, ProtocolId, RPCError, Version},
|
||||||
};
|
};
|
||||||
use crate::rpc::{ErrorMessage, RPCCodedResponse, RPCRequest, RPCResponse};
|
use crate::rpc::{RPCCodedResponse, RPCRequest, RPCResponse};
|
||||||
use libp2p::bytes::BytesMut;
|
use libp2p::bytes::BytesMut;
|
||||||
use snap::read::FrameDecoder;
|
use snap::read::FrameDecoder;
|
||||||
use snap::write::FrameEncoder;
|
use snap::write::FrameEncoder;
|
||||||
@ -60,9 +60,9 @@ impl<TSpec: EthSpec> Encoder<RPCCodedResponse<TSpec>> for SSZSnappyInboundCodec<
|
|||||||
RPCResponse::Pong(res) => res.data.as_ssz_bytes(),
|
RPCResponse::Pong(res) => res.data.as_ssz_bytes(),
|
||||||
RPCResponse::MetaData(res) => res.as_ssz_bytes(),
|
RPCResponse::MetaData(res) => res.as_ssz_bytes(),
|
||||||
},
|
},
|
||||||
RPCCodedResponse::InvalidRequest(err) => err.as_ssz_bytes(),
|
RPCCodedResponse::InvalidRequest(err) => err.into_bytes().as_ssz_bytes(),
|
||||||
RPCCodedResponse::ServerError(err) => err.as_ssz_bytes(),
|
RPCCodedResponse::ServerError(err) => err.into_bytes().as_ssz_bytes(),
|
||||||
RPCCodedResponse::Unknown(err) => err.as_ssz_bytes(),
|
RPCCodedResponse::Unknown(err) => err.into_bytes().as_ssz_bytes(),
|
||||||
RPCCodedResponse::StreamTermination(_) => {
|
RPCCodedResponse::StreamTermination(_) => {
|
||||||
unreachable!("Code error - attempting to encode a stream termination")
|
unreachable!("Code error - attempting to encode a stream termination")
|
||||||
}
|
}
|
||||||
@ -310,7 +310,7 @@ impl<TSpec: EthSpec> Decoder for SSZSnappyOutboundCodec<TSpec> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl<TSpec: EthSpec> OutboundCodec<RPCRequest<TSpec>> for SSZSnappyOutboundCodec<TSpec> {
|
impl<TSpec: EthSpec> OutboundCodec<RPCRequest<TSpec>> for SSZSnappyOutboundCodec<TSpec> {
|
||||||
type ErrorType = ErrorMessage;
|
type ErrorType = String;
|
||||||
|
|
||||||
fn decode_error(&mut self, src: &mut BytesMut) -> Result<Option<Self::ErrorType>, RPCError> {
|
fn decode_error(&mut self, src: &mut BytesMut) -> Result<Option<Self::ErrorType>, RPCError> {
|
||||||
if self.len.is_none() {
|
if self.len.is_none() {
|
||||||
@ -337,7 +337,9 @@ impl<TSpec: EthSpec> OutboundCodec<RPCRequest<TSpec>> for SSZSnappyOutboundCodec
|
|||||||
let n = reader.get_ref().position();
|
let n = reader.get_ref().position();
|
||||||
self.len = None;
|
self.len = None;
|
||||||
let _read_bytes = src.split_to(n as usize);
|
let _read_bytes = src.split_to(n as usize);
|
||||||
Ok(Some(ErrorMessage::from_ssz_bytes(&decoded_buffer)?))
|
Ok(Some(
|
||||||
|
String::from_utf8_lossy(&<Vec<u8>>::from_ssz_bytes(&decoded_buffer)?).into(),
|
||||||
|
))
|
||||||
}
|
}
|
||||||
Err(e) => match e.kind() {
|
Err(e) => match e.kind() {
|
||||||
// Haven't received enough bytes to decode yet
|
// Haven't received enough bytes to decode yet
|
||||||
|
@ -1,7 +1,7 @@
|
|||||||
#![allow(clippy::type_complexity)]
|
#![allow(clippy::type_complexity)]
|
||||||
#![allow(clippy::cognitive_complexity)]
|
#![allow(clippy::cognitive_complexity)]
|
||||||
|
|
||||||
use super::methods::{ErrorMessage, RPCCodedResponse, RequestId, ResponseTermination};
|
use super::methods::{RPCCodedResponse, RequestId, ResponseTermination};
|
||||||
use super::protocol::{Protocol, RPCError, RPCProtocol, RPCRequest};
|
use super::protocol::{Protocol, RPCError, RPCProtocol, RPCRequest};
|
||||||
use super::RPCEvent;
|
use super::RPCEvent;
|
||||||
use crate::rpc::protocol::{InboundFramed, OutboundFramed};
|
use crate::rpc::protocol::{InboundFramed, OutboundFramed};
|
||||||
@ -163,9 +163,7 @@ where
|
|||||||
pub fn close(&mut self, outbound_queue: &mut Vec<RPCCodedResponse<TSpec>>) {
|
pub fn close(&mut self, outbound_queue: &mut Vec<RPCCodedResponse<TSpec>>) {
|
||||||
// When terminating a stream, report the stream termination to the requesting user via
|
// When terminating a stream, report the stream termination to the requesting user via
|
||||||
// an RPC error
|
// an RPC error
|
||||||
let error = RPCCodedResponse::ServerError(ErrorMessage {
|
let error = RPCCodedResponse::ServerError("Request timed out".into());
|
||||||
error_message: b"Request timed out".to_vec(),
|
|
||||||
});
|
|
||||||
|
|
||||||
// The stream termination type is irrelevant, this will terminate the
|
// The stream termination type is irrelevant, this will terminate the
|
||||||
// stream
|
// stream
|
||||||
|
@ -181,13 +181,13 @@ pub enum RPCCodedResponse<T: EthSpec> {
|
|||||||
Success(RPCResponse<T>),
|
Success(RPCResponse<T>),
|
||||||
|
|
||||||
/// The response was invalid.
|
/// The response was invalid.
|
||||||
InvalidRequest(ErrorMessage),
|
InvalidRequest(String),
|
||||||
|
|
||||||
/// The response indicates a server error.
|
/// The response indicates a server error.
|
||||||
ServerError(ErrorMessage),
|
ServerError(String),
|
||||||
|
|
||||||
/// There was an unknown response.
|
/// There was an unknown response.
|
||||||
Unknown(ErrorMessage),
|
Unknown(String),
|
||||||
|
|
||||||
/// Received a stream termination indicating which response is being terminated.
|
/// Received a stream termination indicating which response is being terminated.
|
||||||
StreamTermination(ResponseTermination),
|
StreamTermination(ResponseTermination),
|
||||||
@ -222,7 +222,7 @@ impl<T: EthSpec> RPCCodedResponse<T> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Builds an RPCCodedResponse from a response code and an ErrorMessage
|
/// Builds an RPCCodedResponse from a response code and an ErrorMessage
|
||||||
pub fn from_error(response_code: u8, err: ErrorMessage) -> Self {
|
pub fn from_error(response_code: u8, err: String) -> Self {
|
||||||
match response_code {
|
match response_code {
|
||||||
1 => RPCCodedResponse::InvalidRequest(err),
|
1 => RPCCodedResponse::InvalidRequest(err),
|
||||||
2 => RPCCodedResponse::ServerError(err),
|
2 => RPCCodedResponse::ServerError(err),
|
||||||
@ -268,18 +268,6 @@ impl<T: EthSpec> RPCCodedResponse<T> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Encode, Decode, Debug, Clone)]
|
|
||||||
pub struct ErrorMessage {
|
|
||||||
/// The UTF-8 encoded Error message string.
|
|
||||||
pub error_message: Vec<u8>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl std::string::ToString for ErrorMessage {
|
|
||||||
fn to_string(&self) -> String {
|
|
||||||
String::from_utf8(self.error_message.clone()).unwrap_or_else(|_| "".into())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl std::fmt::Display for RPCResponseErrorCode {
|
impl std::fmt::Display for RPCResponseErrorCode {
|
||||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||||
let repr = match self {
|
let repr = match self {
|
||||||
|
@ -12,8 +12,8 @@ use libp2p::swarm::{
|
|||||||
};
|
};
|
||||||
use libp2p::{Multiaddr, PeerId};
|
use libp2p::{Multiaddr, PeerId};
|
||||||
pub use methods::{
|
pub use methods::{
|
||||||
ErrorMessage, MetaData, RPCCodedResponse, RPCResponse, RPCResponseErrorCode, RequestId,
|
MetaData, RPCCodedResponse, RPCResponse, RPCResponseErrorCode, RequestId, ResponseTermination,
|
||||||
ResponseTermination, StatusMessage,
|
StatusMessage,
|
||||||
};
|
};
|
||||||
pub use protocol::{Protocol, RPCError, RPCProtocol, RPCRequest};
|
pub use protocol::{Protocol, RPCError, RPCProtocol, RPCRequest};
|
||||||
use slog::{debug, o};
|
use slog::{debug, o};
|
||||||
|
Loading…
Reference in New Issue
Block a user