diff --git a/beacon_node/eth2-libp2p/src/rpc/codec/ssz.rs b/beacon_node/eth2-libp2p/src/rpc/codec/ssz.rs index e5fbf419b..04ab425ac 100644 --- a/beacon_node/eth2-libp2p/src/rpc/codec/ssz.rs +++ b/beacon_node/eth2-libp2p/src/rpc/codec/ssz.rs @@ -1,7 +1,10 @@ use crate::rpc::methods::*; use crate::rpc::{ codec::base::OutboundCodec, - protocol::{Encoding, Protocol, ProtocolId, RPCError, Version}, + protocol::{ + Encoding, Protocol, ProtocolId, RPCError, Version, BLOCKS_BY_ROOT_REQUEST_MAX, + BLOCKS_BY_ROOT_REQUEST_MIN, SIGNED_BEACON_BLOCK_MAX, SIGNED_BEACON_BLOCK_MIN, + }, }; use crate::rpc::{RPCCodedResponse, RPCRequest, RPCResponse}; use libp2p::bytes::{BufMut, Bytes, BytesMut}; @@ -84,29 +87,61 @@ impl Decoder for SSZInboundCodec { match self.inner.decode(src).map_err(RPCError::from) { Ok(Some(packet)) => match self.protocol.message_name { Protocol::Status => match self.protocol.version { - Version::V1 => Ok(Some(RPCRequest::Status(StatusMessage::from_ssz_bytes( - &packet, - )?))), + Version::V1 => { + if packet.len() == ::ssz_fixed_len() { + Ok(Some(RPCRequest::Status(StatusMessage::from_ssz_bytes( + &packet, + )?))) + } else { + Err(RPCError::InvalidData) + } + } }, Protocol::Goodbye => match self.protocol.version { - Version::V1 => Ok(Some(RPCRequest::Goodbye(GoodbyeReason::from_ssz_bytes( - &packet, - )?))), + Version::V1 => { + if packet.len() == ::ssz_fixed_len() { + Ok(Some(RPCRequest::Goodbye(GoodbyeReason::from_ssz_bytes( + &packet, + )?))) + } else { + Err(RPCError::InvalidData) + } + } }, Protocol::BlocksByRange => match self.protocol.version { - Version::V1 => Ok(Some(RPCRequest::BlocksByRange( - BlocksByRangeRequest::from_ssz_bytes(&packet)?, - ))), + Version::V1 => { + if packet.len() == ::ssz_fixed_len() { + Ok(Some(RPCRequest::BlocksByRange( + BlocksByRangeRequest::from_ssz_bytes(&packet)?, + ))) + } else { + Err(RPCError::InvalidData) + } + } }, Protocol::BlocksByRoot => match self.protocol.version { - Version::V1 => Ok(Some(RPCRequest::BlocksByRoot(BlocksByRootRequest { - block_roots: VariableList::from_ssz_bytes(&packet)?, - }))), + Version::V1 => { + if packet.len() >= *BLOCKS_BY_ROOT_REQUEST_MIN + && packet.len() <= *BLOCKS_BY_ROOT_REQUEST_MAX + { + Ok(Some(RPCRequest::BlocksByRoot(BlocksByRootRequest { + block_roots: VariableList::from_ssz_bytes(&packet)?, + }))) + } else { + Err(RPCError::InvalidData) + } + } }, Protocol::Ping => match self.protocol.version { - Version::V1 => Ok(Some(RPCRequest::Ping(Ping { - data: u64::from_ssz_bytes(&packet)?, - }))), + Version::V1 => { + if packet.len() == ::ssz_fixed_len() { + Ok(Some(RPCRequest::Ping(Ping { + data: u64::from_ssz_bytes(&packet)?, + }))) + } else { + Err(RPCError::InvalidData) + } + } }, Protocol::MetaData => match self.protocol.version { Version::V1 => { @@ -208,30 +243,64 @@ impl Decoder for SSZOutboundCodec { match self.protocol.message_name { Protocol::Status => match self.protocol.version { - Version::V1 => Ok(Some(RPCResponse::Status( - StatusMessage::from_ssz_bytes(&raw_bytes)?, - ))), + Version::V1 => { + if raw_bytes.len() == ::ssz_fixed_len() { + Ok(Some(RPCResponse::Status(StatusMessage::from_ssz_bytes( + &raw_bytes, + )?))) + } else { + Err(RPCError::InvalidData) + } + } }, Protocol::Goodbye => Err(RPCError::InvalidData), Protocol::BlocksByRange => match self.protocol.version { - Version::V1 => Ok(Some(RPCResponse::BlocksByRange(Box::new( - SignedBeaconBlock::from_ssz_bytes(&raw_bytes)?, - )))), + Version::V1 => { + if raw_bytes.len() >= *SIGNED_BEACON_BLOCK_MIN + && raw_bytes.len() <= *SIGNED_BEACON_BLOCK_MAX + { + Ok(Some(RPCResponse::BlocksByRange(Box::new( + SignedBeaconBlock::from_ssz_bytes(&raw_bytes)?, + )))) + } else { + Err(RPCError::InvalidData) + } + } }, Protocol::BlocksByRoot => match self.protocol.version { - Version::V1 => Ok(Some(RPCResponse::BlocksByRoot(Box::new( - SignedBeaconBlock::from_ssz_bytes(&raw_bytes)?, - )))), + Version::V1 => { + if raw_bytes.len() >= *SIGNED_BEACON_BLOCK_MIN + && raw_bytes.len() <= *SIGNED_BEACON_BLOCK_MAX + { + Ok(Some(RPCResponse::BlocksByRoot(Box::new( + SignedBeaconBlock::from_ssz_bytes(&raw_bytes)?, + )))) + } else { + Err(RPCError::InvalidData) + } + } }, Protocol::Ping => match self.protocol.version { - Version::V1 => Ok(Some(RPCResponse::Pong(Ping { - data: u64::from_ssz_bytes(&raw_bytes)?, - }))), + Version::V1 => { + if raw_bytes.len() == ::ssz_fixed_len() { + Ok(Some(RPCResponse::Pong(Ping { + data: u64::from_ssz_bytes(&raw_bytes)?, + }))) + } else { + Err(RPCError::InvalidData) + } + } }, Protocol::MetaData => match self.protocol.version { - Version::V1 => Ok(Some(RPCResponse::MetaData( - MetaData::from_ssz_bytes(&raw_bytes)?, - ))), + Version::V1 => { + if raw_bytes.len() == as Encode>::ssz_fixed_len() { + Ok(Some(RPCResponse::MetaData(MetaData::from_ssz_bytes( + &raw_bytes, + )?))) + } else { + Err(RPCError::InvalidData) + } + } }, } } diff --git a/beacon_node/eth2-libp2p/src/rpc/codec/ssz_snappy.rs b/beacon_node/eth2-libp2p/src/rpc/codec/ssz_snappy.rs index 5056cbc6b..b33cfdc32 100644 --- a/beacon_node/eth2-libp2p/src/rpc/codec/ssz_snappy.rs +++ b/beacon_node/eth2-libp2p/src/rpc/codec/ssz_snappy.rs @@ -1,7 +1,10 @@ use crate::rpc::methods::*; use crate::rpc::{ codec::base::OutboundCodec, - protocol::{Encoding, Protocol, ProtocolId, RPCError, Version}, + protocol::{ + Encoding, Protocol, ProtocolId, RPCError, Version, BLOCKS_BY_ROOT_REQUEST_MAX, + BLOCKS_BY_ROOT_REQUEST_MIN, SIGNED_BEACON_BLOCK_MAX, SIGNED_BEACON_BLOCK_MIN, + }, }; use crate::rpc::{RPCCodedResponse, RPCRequest, RPCResponse}; use libp2p::bytes::BytesMut; @@ -123,29 +126,63 @@ impl Decoder for SSZSnappyInboundCodec { let _read_bytes = src.split_to(n as usize); match self.protocol.message_name { Protocol::Status => match self.protocol.version { - Version::V1 => Ok(Some(RPCRequest::Status(StatusMessage::from_ssz_bytes( - &decoded_buffer, - )?))), + Version::V1 => { + if decoded_buffer.len() == ::ssz_fixed_len() { + Ok(Some(RPCRequest::Status(StatusMessage::from_ssz_bytes( + &decoded_buffer, + )?))) + } else { + Err(RPCError::InvalidData) + } + } }, Protocol::Goodbye => match self.protocol.version { - Version::V1 => Ok(Some(RPCRequest::Goodbye( - GoodbyeReason::from_ssz_bytes(&decoded_buffer)?, - ))), + Version::V1 => { + if decoded_buffer.len() == ::ssz_fixed_len() { + Ok(Some(RPCRequest::Goodbye(GoodbyeReason::from_ssz_bytes( + &decoded_buffer, + )?))) + } else { + Err(RPCError::InvalidData) + } + } }, Protocol::BlocksByRange => match self.protocol.version { - Version::V1 => Ok(Some(RPCRequest::BlocksByRange( - BlocksByRangeRequest::from_ssz_bytes(&decoded_buffer)?, - ))), + Version::V1 => { + if decoded_buffer.len() + == ::ssz_fixed_len() + { + Ok(Some(RPCRequest::BlocksByRange( + BlocksByRangeRequest::from_ssz_bytes(&decoded_buffer)?, + ))) + } else { + Err(RPCError::InvalidData) + } + } }, Protocol::BlocksByRoot => match self.protocol.version { - Version::V1 => Ok(Some(RPCRequest::BlocksByRoot(BlocksByRootRequest { - block_roots: VariableList::from_ssz_bytes(&decoded_buffer)?, - }))), + Version::V1 => { + if decoded_buffer.len() >= *BLOCKS_BY_ROOT_REQUEST_MIN + && decoded_buffer.len() <= *BLOCKS_BY_ROOT_REQUEST_MAX + { + Ok(Some(RPCRequest::BlocksByRoot(BlocksByRootRequest { + block_roots: VariableList::from_ssz_bytes(&decoded_buffer)?, + }))) + } else { + Err(RPCError::InvalidData) + } + } }, Protocol::Ping => match self.protocol.version { - Version::V1 => Ok(Some(RPCRequest::Ping(Ping::from_ssz_bytes( - &decoded_buffer, - )?))), + Version::V1 => { + if decoded_buffer.len() == ::ssz_fixed_len() { + Ok(Some(RPCRequest::Ping(Ping { + data: u64::from_ssz_bytes(&decoded_buffer)?, + }))) + } else { + Err(RPCError::InvalidData) + } + } }, Protocol::MetaData => match self.protocol.version { Version::V1 => { @@ -268,33 +305,65 @@ impl Decoder for SSZSnappyOutboundCodec { let _read_byts = src.split_to(n as usize); match self.protocol.message_name { Protocol::Status => match self.protocol.version { - Version::V1 => Ok(Some(RPCResponse::Status( - StatusMessage::from_ssz_bytes(&decoded_buffer)?, - ))), + Version::V1 => { + if decoded_buffer.len() == ::ssz_fixed_len() { + Ok(Some(RPCResponse::Status(StatusMessage::from_ssz_bytes( + &decoded_buffer, + )?))) + } else { + Err(RPCError::InvalidData) + } + } }, - Protocol::Goodbye => { - // Goodbye does not have a response - Err(RPCError::InvalidData) - } + Protocol::Goodbye => Err(RPCError::InvalidData), Protocol::BlocksByRange => match self.protocol.version { - Version::V1 => Ok(Some(RPCResponse::BlocksByRange(Box::new( - SignedBeaconBlock::from_ssz_bytes(&decoded_buffer)?, - )))), + Version::V1 => { + if decoded_buffer.len() >= *SIGNED_BEACON_BLOCK_MIN + && decoded_buffer.len() <= *SIGNED_BEACON_BLOCK_MAX + { + Ok(Some(RPCResponse::BlocksByRange(Box::new( + SignedBeaconBlock::from_ssz_bytes(&decoded_buffer)?, + )))) + } else { + Err(RPCError::InvalidData) + } + } }, Protocol::BlocksByRoot => match self.protocol.version { - Version::V1 => Ok(Some(RPCResponse::BlocksByRoot(Box::new( - SignedBeaconBlock::from_ssz_bytes(&decoded_buffer)?, - )))), + Version::V1 => { + if decoded_buffer.len() >= *SIGNED_BEACON_BLOCK_MIN + && decoded_buffer.len() <= *SIGNED_BEACON_BLOCK_MAX + { + Ok(Some(RPCResponse::BlocksByRoot(Box::new( + SignedBeaconBlock::from_ssz_bytes(&decoded_buffer)?, + )))) + } else { + Err(RPCError::InvalidData) + } + } }, Protocol::Ping => match self.protocol.version { - Version::V1 => Ok(Some(RPCResponse::Pong(Ping { - data: u64::from_ssz_bytes(&decoded_buffer)?, - }))), + Version::V1 => { + if decoded_buffer.len() == ::ssz_fixed_len() { + Ok(Some(RPCResponse::Pong(Ping { + data: u64::from_ssz_bytes(&decoded_buffer)?, + }))) + } else { + Err(RPCError::InvalidData) + } + } }, Protocol::MetaData => match self.protocol.version { - Version::V1 => Ok(Some(RPCResponse::MetaData(MetaData::from_ssz_bytes( - &decoded_buffer, - )?))), + Version::V1 => { + if decoded_buffer.len() == as Encode>::ssz_fixed_len() + { + Ok(Some(RPCResponse::MetaData(MetaData::from_ssz_bytes( + &decoded_buffer, + )?))) + } else { + Err(RPCError::InvalidData) + } + } }, } } diff --git a/beacon_node/eth2-libp2p/src/rpc/methods.rs b/beacon_node/eth2-libp2p/src/rpc/methods.rs index 7a30d6fc4..cae9b1526 100644 --- a/beacon_node/eth2-libp2p/src/rpc/methods.rs +++ b/beacon_node/eth2-libp2p/src/rpc/methods.rs @@ -11,7 +11,7 @@ use std::ops::Deref; use types::{Epoch, EthSpec, Hash256, SignedBeaconBlock, Slot}; /// Maximum number of blocks in a single request. -type MaxRequestBlocks = U1024; +pub type MaxRequestBlocks = U1024; pub const MAX_REQUEST_BLOCKS: u64 = 1024; /// Maximum length of error message. @@ -188,7 +188,7 @@ pub struct BlocksByRangeRequest { } /// Request a number of beacon block bodies from a peer. -#[derive(Clone, Debug, PartialEq)] +#[derive(Encode, Decode, Clone, Debug, PartialEq)] pub struct BlocksByRootRequest { /// The list of beacon block bodies being requested. pub block_roots: VariableList, diff --git a/beacon_node/eth2-libp2p/src/rpc/mod.rs b/beacon_node/eth2-libp2p/src/rpc/mod.rs index 56ef47f5a..7fffe694a 100644 --- a/beacon_node/eth2-libp2p/src/rpc/mod.rs +++ b/beacon_node/eth2-libp2p/src/rpc/mod.rs @@ -23,8 +23,8 @@ pub(crate) use protocol::{RPCProtocol, RPCRequest}; pub use handler::SubstreamId; pub use methods::{ - BlocksByRangeRequest, BlocksByRootRequest, GoodbyeReason, RPCResponseErrorCode, RequestId, - ResponseTermination, StatusMessage, MAX_REQUEST_BLOCKS, + BlocksByRangeRequest, BlocksByRootRequest, GoodbyeReason, MaxRequestBlocks, + RPCResponseErrorCode, RequestId, ResponseTermination, StatusMessage, MAX_REQUEST_BLOCKS, }; pub use protocol::{Protocol, RPCError}; diff --git a/beacon_node/eth2-libp2p/src/rpc/protocol.rs b/beacon_node/eth2-libp2p/src/rpc/protocol.rs index 6f5beb749..b8a417981 100644 --- a/beacon_node/eth2-libp2p/src/rpc/protocol.rs +++ b/beacon_node/eth2-libp2p/src/rpc/protocol.rs @@ -9,11 +9,14 @@ use crate::rpc::{ InboundCodec, OutboundCodec, }, methods::ResponseTermination, + MaxRequestBlocks, MAX_REQUEST_BLOCKS, }; use futures::future::Ready; use futures::prelude::*; use futures::prelude::{AsyncRead, AsyncWrite}; use libp2p::core::{InboundUpgrade, OutboundUpgrade, ProtocolName, UpgradeInfo}; +use ssz::Encode; +use ssz_types::VariableList; use std::io; use std::marker::PhantomData; use std::pin::Pin; @@ -23,7 +26,38 @@ use tokio_util::{ codec::Framed, compat::{Compat, FuturesAsyncReadCompatExt}, }; -use types::EthSpec; +use types::{BeaconBlock, EthSpec, Hash256, MainnetEthSpec, Signature, SignedBeaconBlock}; + +lazy_static! { + // Note: Hardcoding the `EthSpec` type for `SignedBeaconBlock` as min/max values is + // same across different `EthSpec` implementations. + pub static ref SIGNED_BEACON_BLOCK_MIN: usize = SignedBeaconBlock:: { + message: BeaconBlock::empty(&MainnetEthSpec::default_spec()), + signature: Signature::empty_signature(), + } + .as_ssz_bytes() + .len(); + pub static ref SIGNED_BEACON_BLOCK_MAX: usize = SignedBeaconBlock:: { + message: BeaconBlock::full(&MainnetEthSpec::default_spec()), + signature: Signature::empty_signature(), + } + .as_ssz_bytes() + .len(); + pub static ref BLOCKS_BY_ROOT_REQUEST_MIN: usize = BlocksByRootRequest { + block_roots: VariableList::::from(Vec::::new()) + } + .as_ssz_bytes() + .len(); + pub static ref BLOCKS_BY_ROOT_REQUEST_MAX: usize = BlocksByRootRequest { + block_roots: VariableList::::from(vec![ + Hash256::zero(); + MAX_REQUEST_BLOCKS + as usize + ]) + } + .as_ssz_bytes() + .len(); +} /// The maximum bytes that can be sent across the RPC. const MAX_RPC_SIZE: usize = 1_048_576; // 1M