From 99a02fd2ab01d7af5783b8e39d3262e78df58fab Mon Sep 17 00:00:00 2001 From: Pawan Dhananjay Date: Sun, 11 Oct 2020 22:45:33 +0000 Subject: [PATCH] Limit snappy input stream (#1738) ## Issue Addressed N/A ## Proposed Changes This PR limits the length of the stream received by the snappy decoder to be the maximum allowed size for the received rpc message type. Also adds further checks to ensure that the length specified in the rpc [encoding-dependent header](https://github.com/ethereum/eth2.0-specs/blob/dev/specs/phase0/p2p-interface.md#encoding-strategies) is within the bounds for the rpc message type being decoded. --- beacon_node/eth2_libp2p/src/rpc/codec/base.rs | 63 +++- .../eth2_libp2p/src/rpc/codec/ssz_snappy.rs | 289 ++++++------------ beacon_node/eth2_libp2p/src/rpc/methods.rs | 5 +- beacon_node/eth2_libp2p/src/rpc/protocol.rs | 91 +++++- 4 files changed, 252 insertions(+), 196 deletions(-) diff --git a/beacon_node/eth2_libp2p/src/rpc/codec/base.rs b/beacon_node/eth2_libp2p/src/rpc/codec/base.rs index d7a64b58d..1613b59d5 100644 --- a/beacon_node/eth2_libp2p/src/rpc/codec/base.rs +++ b/beacon_node/eth2_libp2p/src/rpc/codec/base.rs @@ -177,7 +177,15 @@ where mod tests { use super::super::ssz_snappy::*; use super::*; + use crate::rpc::methods::StatusMessage; use crate::rpc::protocol::*; + use snap::write::FrameEncoder; + use ssz::Encode; + use std::io::Write; + use types::{Epoch, Hash256, Slot}; + use unsigned_varint::codec::Uvi; + + type Spec = types::MainnetEthSpec; #[test] fn test_decode_status_message() { @@ -185,8 +193,6 @@ mod tests { let mut buf = BytesMut::new(); buf.extend_from_slice(&message); - type Spec = types::MainnetEthSpec; - let snappy_protocol_id = ProtocolId::new(Protocol::Status, Version::V1, Encoding::SSZSnappy); @@ -207,4 +213,57 @@ mod tests { let _ = dbg!(snappy_decoded_message); let _ = dbg!(snappy_decoded_chunk); } + + #[test] + fn test_decode_malicious_status_message() { + // Snappy stream identifier + let stream_identifier: &'static [u8] = b"\xFF\x06\x00\x00sNaPpY"; + + // byte 0(0xFE) is padding chunk type identifier for snappy messages + // byte 1,2,3 are chunk length (little endian) + let malicious_padding: &'static [u8] = b"\xFE\x00\x00\x00"; + + // Status message is 84 bytes uncompressed. `max_compressed_len` is 130. + let status_message_bytes = StatusMessage { + fork_digest: [0; 4], + finalized_root: Hash256::from_low_u64_be(0), + finalized_epoch: Epoch::new(1), + head_root: Hash256::from_low_u64_be(0), + head_slot: Slot::new(1), + } + .as_ssz_bytes(); + + let mut uvi_codec: Uvi = Uvi::default(); + let mut dst = BytesMut::with_capacity(1024); + + // Insert length-prefix + uvi_codec + .encode(status_message_bytes.len(), &mut dst) + .unwrap(); + + // Insert snappy stream identifier + dst.extend_from_slice(stream_identifier); + + // Insert malicious padding of 80 bytes. + for _ in 0..20 { + dst.extend_from_slice(malicious_padding); + } + + // Insert payload (42 bytes compressed) + let mut writer = FrameEncoder::new(Vec::new()); + writer.write_all(&status_message_bytes).unwrap(); + writer.flush().unwrap(); + dst.extend_from_slice(writer.get_ref()); + + // 42 + 80 = 132 > max_compressed_len. Hence, decoding should fail with `InvalidData`. + + let snappy_protocol_id = + ProtocolId::new(Protocol::Status, Version::V1, Encoding::SSZSnappy); + + let mut snappy_outbound_codec = + SSZSnappyOutboundCodec::::new(snappy_protocol_id, 1_048_576); + + let snappy_decoded_message = snappy_outbound_codec.decode(&mut dst.clone()).unwrap_err(); + assert_eq!(snappy_decoded_message, RPCError::InvalidData); + } } diff --git a/beacon_node/eth2_libp2p/src/rpc/codec/ssz_snappy.rs b/beacon_node/eth2_libp2p/src/rpc/codec/ssz_snappy.rs index 8b35a4c68..8fe12adca 100644 --- a/beacon_node/eth2_libp2p/src/rpc/codec/ssz_snappy.rs +++ b/beacon_node/eth2_libp2p/src/rpc/codec/ssz_snappy.rs @@ -1,10 +1,7 @@ use crate::rpc::methods::*; use crate::rpc::{ codec::base::OutboundCodec, - protocol::{ - Encoding, Protocol, ProtocolId, RPCError, Version, BLOCKS_BY_ROOT_REQUEST_MAX, - BLOCKS_BY_ROOT_REQUEST_MIN, SIGNED_BEACON_BLOCK_MAX, SIGNED_BEACON_BLOCK_MIN, - }, + protocol::{Encoding, Protocol, ProtocolId, RPCError, Version, ERROR_TYPE_MAX, ERROR_TYPE_MIN}, }; use crate::rpc::{RPCCodedResponse, RPCRequest, RPCResponse}; use libp2p::bytes::BytesMut; @@ -110,79 +107,56 @@ impl Decoder for SSZSnappyInboundCodec { let length = self.len.expect("length should be Some"); - // Should not attempt to decode rpc chunks with length > max_packet_size - if length > self.max_packet_size { + // Should not attempt to decode rpc chunks with `length > max_packet_size` or not within bounds of + // packet size for ssz container corresponding to `self.protocol`. + let ssz_limits = self.protocol.rpc_request_limits(); + if length > self.max_packet_size || ssz_limits.is_out_of_bounds(length) { return Err(RPCError::InvalidData); } - let mut reader = FrameDecoder::new(Cursor::new(&src)); + // Calculate worst case compression length for given uncompressed length + let max_compressed_len = snap::raw::max_compress_len(length) as u64; + + // Create a limit reader as a wrapper that reads only upto `max_compressed_len` from `src`. + let limit_reader = Cursor::new(src.as_ref()).take(max_compressed_len); + let mut reader = FrameDecoder::new(limit_reader); let mut decoded_buffer = vec![0; length]; - match read_exact(&mut reader, &mut decoded_buffer, length) { + match reader.read_exact(&mut decoded_buffer) { Ok(()) => { // `n` is how many bytes the reader read in the compressed stream - let n = reader.get_ref().position(); + let n = reader.get_ref().get_ref().position(); self.len = None; let _read_bytes = src.split_to(n as usize); + + // We need not check that decoded_buffer.len() is within bounds here + // since we have already checked `length` above. match self.protocol.message_name { Protocol::Status => match self.protocol.version { - Version::V1 => { - if decoded_buffer.len() == ::ssz_fixed_len() { - Ok(Some(RPCRequest::Status(StatusMessage::from_ssz_bytes( - &decoded_buffer, - )?))) - } else { - Err(RPCError::InvalidData) - } - } + Version::V1 => Ok(Some(RPCRequest::Status(StatusMessage::from_ssz_bytes( + &decoded_buffer, + )?))), }, Protocol::Goodbye => match self.protocol.version { - Version::V1 => { - if decoded_buffer.len() == ::ssz_fixed_len() { - Ok(Some(RPCRequest::Goodbye(GoodbyeReason::from_ssz_bytes( - &decoded_buffer, - )?))) - } else { - Err(RPCError::InvalidData) - } - } + Version::V1 => Ok(Some(RPCRequest::Goodbye( + GoodbyeReason::from_ssz_bytes(&decoded_buffer)?, + ))), }, Protocol::BlocksByRange => match self.protocol.version { - Version::V1 => { - if decoded_buffer.len() - == ::ssz_fixed_len() - { - Ok(Some(RPCRequest::BlocksByRange( - BlocksByRangeRequest::from_ssz_bytes(&decoded_buffer)?, - ))) - } else { - Err(RPCError::InvalidData) - } - } + Version::V1 => Ok(Some(RPCRequest::BlocksByRange( + BlocksByRangeRequest::from_ssz_bytes(&decoded_buffer)?, + ))), }, Protocol::BlocksByRoot => match self.protocol.version { - Version::V1 => { - if decoded_buffer.len() >= *BLOCKS_BY_ROOT_REQUEST_MIN - && decoded_buffer.len() <= *BLOCKS_BY_ROOT_REQUEST_MAX - { - Ok(Some(RPCRequest::BlocksByRoot(BlocksByRootRequest { - block_roots: VariableList::from_ssz_bytes(&decoded_buffer)?, - }))) - } else { - Err(RPCError::InvalidData) - } - } + Version::V1 => Ok(Some(RPCRequest::BlocksByRoot(BlocksByRootRequest { + block_roots: VariableList::from_ssz_bytes(&decoded_buffer)?, + }))), }, Protocol::Ping => match self.protocol.version { - Version::V1 => { - if decoded_buffer.len() == ::ssz_fixed_len() { - Ok(Some(RPCRequest::Ping(Ping { - data: u64::from_ssz_bytes(&decoded_buffer)?, - }))) - } else { - Err(RPCError::InvalidData) - } - } + Version::V1 => Ok(Some(RPCRequest::Ping(Ping { + data: u64::from_ssz_bytes(&decoded_buffer)?, + }))), }, + // This case should be unreachable as `MetaData` requests are handled separately in the `InboundUpgrade` Protocol::MetaData => match self.protocol.version { Version::V1 => { if !decoded_buffer.is_empty() { @@ -194,11 +168,7 @@ impl Decoder for SSZSnappyInboundCodec { }, } } - Err(e) => match e.kind() { - // Haven't received enough bytes to decode yet, wait for more - ErrorKind::UnexpectedEof => Ok(None), - _ => Err(e).map_err(RPCError::from), - }, + Err(e) => handle_error(e, reader.get_ref().get_ref().position(), max_compressed_len), } } } @@ -288,87 +258,60 @@ impl Decoder for SSZSnappyOutboundCodec { let length = self.len.expect("length should be Some"); - // Should not attempt to decode rpc chunks with length > max_packet_size - if length > self.max_packet_size { + // Should not attempt to decode rpc chunks with `length > max_packet_size` or not within bounds of + // packet size for ssz container corresponding to `self.protocol`. + let ssz_limits = self.protocol.rpc_response_limits::(); + if length > self.max_packet_size || ssz_limits.is_out_of_bounds(length) { return Err(RPCError::InvalidData); } - let mut reader = FrameDecoder::new(Cursor::new(&src)); + // Calculate worst case compression length for given uncompressed length + let max_compressed_len = snap::raw::max_compress_len(length) as u64; + // Create a limit reader as a wrapper that reads only upto `max_compressed_len` from `src`. + let limit_reader = Cursor::new(src.as_ref()).take(max_compressed_len); + let mut reader = FrameDecoder::new(limit_reader); + let mut decoded_buffer = vec![0; length]; - match read_exact(&mut reader, &mut decoded_buffer, length) { + + match reader.read_exact(&mut decoded_buffer) { Ok(()) => { // `n` is how many bytes the reader read in the compressed stream - let n = reader.get_ref().position(); + let n = reader.get_ref().get_ref().position(); self.len = None; - let _read_byts = src.split_to(n as usize); + let _read_bytes = src.split_to(n as usize); + + // We need not check that decoded_buffer.len() is within bounds here + // since we have already checked `length` above. match self.protocol.message_name { Protocol::Status => match self.protocol.version { - Version::V1 => { - if decoded_buffer.len() == ::ssz_fixed_len() { - Ok(Some(RPCResponse::Status(StatusMessage::from_ssz_bytes( - &decoded_buffer, - )?))) - } else { - Err(RPCError::InvalidData) - } - } + Version::V1 => Ok(Some(RPCResponse::Status( + StatusMessage::from_ssz_bytes(&decoded_buffer)?, + ))), }, + // This case should be unreachable as `Goodbye` has no response. Protocol::Goodbye => Err(RPCError::InvalidData), Protocol::BlocksByRange => match self.protocol.version { - Version::V1 => { - if decoded_buffer.len() >= *SIGNED_BEACON_BLOCK_MIN - && decoded_buffer.len() <= *SIGNED_BEACON_BLOCK_MAX - { - Ok(Some(RPCResponse::BlocksByRange(Box::new( - SignedBeaconBlock::from_ssz_bytes(&decoded_buffer)?, - )))) - } else { - Err(RPCError::InvalidData) - } - } + Version::V1 => Ok(Some(RPCResponse::BlocksByRange(Box::new( + SignedBeaconBlock::from_ssz_bytes(&decoded_buffer)?, + )))), }, Protocol::BlocksByRoot => match self.protocol.version { - Version::V1 => { - if decoded_buffer.len() >= *SIGNED_BEACON_BLOCK_MIN - && decoded_buffer.len() <= *SIGNED_BEACON_BLOCK_MAX - { - Ok(Some(RPCResponse::BlocksByRoot(Box::new( - SignedBeaconBlock::from_ssz_bytes(&decoded_buffer)?, - )))) - } else { - Err(RPCError::InvalidData) - } - } + Version::V1 => Ok(Some(RPCResponse::BlocksByRoot(Box::new( + SignedBeaconBlock::from_ssz_bytes(&decoded_buffer)?, + )))), }, Protocol::Ping => match self.protocol.version { - Version::V1 => { - if decoded_buffer.len() == ::ssz_fixed_len() { - Ok(Some(RPCResponse::Pong(Ping { - data: u64::from_ssz_bytes(&decoded_buffer)?, - }))) - } else { - Err(RPCError::InvalidData) - } - } + Version::V1 => Ok(Some(RPCResponse::Pong(Ping { + data: u64::from_ssz_bytes(&decoded_buffer)?, + }))), }, Protocol::MetaData => match self.protocol.version { - Version::V1 => { - if decoded_buffer.len() == as Encode>::ssz_fixed_len() - { - Ok(Some(RPCResponse::MetaData(MetaData::from_ssz_bytes( - &decoded_buffer, - )?))) - } else { - Err(RPCError::InvalidData) - } - } + Version::V1 => Ok(Some(RPCResponse::MetaData(MetaData::from_ssz_bytes( + &decoded_buffer, + )?))), }, } } - Err(e) => match e.kind() { - // Haven't received enough bytes to decode yet, wait for more - ErrorKind::UnexpectedEof => Ok(None), - _ => Err(e).map_err(RPCError::from), - }, + Err(e) => handle_error(e, reader.get_ref().get_ref().position(), max_compressed_len), } } } @@ -392,84 +335,52 @@ impl OutboundCodec> for SSZSnappyOutboundCodec let length = self.len.expect("length should be Some"); - // Should not attempt to decode rpc chunks with length > max_packet_size - if length > self.max_packet_size { + // Should not attempt to decode rpc chunks with `length > max_packet_size` or not within bounds of + // packet size for ssz container corresponding to `ErrorType`. + if length > self.max_packet_size || length > *ERROR_TYPE_MAX || length < *ERROR_TYPE_MIN { return Err(RPCError::InvalidData); } - let mut reader = FrameDecoder::new(Cursor::new(&src)); + + // Calculate worst case compression length for given uncompressed length + let max_compressed_len = snap::raw::max_compress_len(length) as u64; + // // Create a limit reader as a wrapper that reads only upto `max_compressed_len` from `src`. + let limit_reader = Cursor::new(src.as_ref()).take(max_compressed_len); + let mut reader = FrameDecoder::new(limit_reader); let mut decoded_buffer = vec![0; length]; - match read_exact(&mut reader, &mut decoded_buffer, length) { + match reader.read_exact(&mut decoded_buffer) { Ok(()) => { // `n` is how many bytes the reader read in the compressed stream - let n = reader.get_ref().position(); + let n = reader.get_ref().get_ref().position(); self.len = None; let _read_bytes = src.split_to(n as usize); Ok(Some(ErrorType(VariableList::from_ssz_bytes( &decoded_buffer, )?))) } - Err(e) => match e.kind() { + Err(e) => handle_error(e, reader.get_ref().get_ref().position(), max_compressed_len), + } + } +} + +/// Handle errors that we get from decoding an RPC message from the stream. +/// `num_bytes_read` is the number of bytes the snappy decoder has read from the underlying stream. +/// `max_compressed_len` is the maximum compressed size for a given uncompressed size. +fn handle_error( + err: std::io::Error, + num_bytes: u64, + max_compressed_len: u64, +) -> Result, RPCError> { + match err.kind() { + ErrorKind::UnexpectedEof => { + // If snappy has read `max_compressed_len` from underlying stream and still can't fill buffer, we have a malicious message. + // Report as `InvalidData` so that malicious peer gets banned. + if num_bytes >= max_compressed_len { + Err(RPCError::InvalidData) + } else { // Haven't received enough bytes to decode yet, wait for more - ErrorKind::UnexpectedEof => Ok(None), - _ => Err(e).map_err(RPCError::from), - }, - } - } -} - -/// Wrapper over `read` implementation of `FrameDecoder`. -/// -/// Works like the standard `read_exact` implementation, except that it returns an error if length of -// compressed bytes read from the underlying reader is greater than worst case compression length for snappy. -fn read_exact>( - reader: &mut FrameDecoder>, - mut buf: &mut [u8], - uncompressed_length: usize, -) -> Result<(), std::io::Error> { - // Calculate worst case compression length for given uncompressed length - let max_compressed_len = snap::raw::max_compress_len(uncompressed_length) as u64; - - // Initialize the position of the reader - let mut pos = reader.get_ref().position(); - let mut count = 0; - while !buf.is_empty() { - match reader.read(buf) { - Ok(0) => break, - Ok(n) => { - let tmp = buf; - buf = &mut tmp[n..]; + Ok(None) } - Err(ref e) if e.kind() == ErrorKind::Interrupted => {} - Err(e) => return Err(e), } - // Get current position of reader - let curr_pos = reader.get_ref().position(); - // Note: reader should always advance forward. However, this behaviour - // depends on the implementation of `snap::FrameDecoder`, so it is better - // to check to avoid underflow panic. - if curr_pos > pos { - count += reader.get_ref().position() - pos; - pos = curr_pos; - } else { - return Err(std::io::Error::new( - ErrorKind::InvalidData, - "snappy: reader is not advanced forward while reading", - )); - } - - if count > max_compressed_len { - return Err(std::io::Error::new( - ErrorKind::InvalidData, - "snappy: compressed data is > max_compressed_len", - )); - } - } - if !buf.is_empty() { - Err(std::io::Error::new( - ErrorKind::UnexpectedEof, - "failed to fill whole buffer", - )) - } else { - Ok(()) + _ => Err(err).map_err(RPCError::from), } } diff --git a/beacon_node/eth2_libp2p/src/rpc/methods.rs b/beacon_node/eth2_libp2p/src/rpc/methods.rs index 30c9b13fb..f3a239c8d 100644 --- a/beacon_node/eth2_libp2p/src/rpc/methods.rs +++ b/beacon_node/eth2_libp2p/src/rpc/methods.rs @@ -16,7 +16,8 @@ pub type MaxRequestBlocks = U1024; pub const MAX_REQUEST_BLOCKS: u64 = 1024; /// Maximum length of error message. -type MaxErrorLen = U256; +pub type MaxErrorLen = U256; +pub const MAX_ERROR_LEN: u64 = 256; /// Wrapper over SSZ List to represent error message in rpc responses. #[derive(Debug, Clone)] @@ -256,7 +257,7 @@ pub enum RPCCodedResponse { } /// The code assigned to an erroneous `RPCResponse`. -#[derive(Debug, Clone, Copy)] +#[derive(Debug, Clone, Copy, PartialEq)] pub enum RPCResponseErrorCode { RateLimited, InvalidRequest, diff --git a/beacon_node/eth2_libp2p/src/rpc/protocol.rs b/beacon_node/eth2_libp2p/src/rpc/protocol.rs index fbd5326a9..9adfa241a 100644 --- a/beacon_node/eth2_libp2p/src/rpc/protocol.rs +++ b/beacon_node/eth2_libp2p/src/rpc/protocol.rs @@ -5,7 +5,7 @@ use crate::rpc::{ ssz_snappy::{SSZSnappyInboundCodec, SSZSnappyOutboundCodec}, InboundCodec, OutboundCodec, }, - methods::ResponseTermination, + methods::{MaxErrorLen, ResponseTermination, MAX_ERROR_LEN}, MaxRequestBlocks, MAX_REQUEST_BLOCKS, }; use futures::future::BoxFuture; @@ -51,6 +51,19 @@ lazy_static! { ]) .as_ssz_bytes() .len(); + pub static ref ERROR_TYPE_MIN: usize = + VariableList::::from(Vec::::new()) + .as_ssz_bytes() + .len(); + pub static ref ERROR_TYPE_MAX: usize = + VariableList::::from(vec![ + 0u8; + MAX_ERROR_LEN + as usize + ]) + .as_ssz_bytes() + .len(); + } /// The maximum bytes that can be sent across the RPC. @@ -147,6 +160,24 @@ impl UpgradeInfo for RPCProtocol { } } +/// Represents the ssz length bounds for RPC messages. +#[derive(Debug, PartialEq)] +pub struct RpcLimits { + min: usize, + max: usize, +} + +impl RpcLimits { + pub fn new(min: usize, max: usize) -> Self { + Self { min, max } + } + + /// Returns true if the given length is out of bounds, false otherwise. + pub fn is_out_of_bounds(&self, length: usize) -> bool { + length > self.max || length < self.min + } +} + /// Tracks the types in a protocol id. #[derive(Clone, Debug)] pub struct ProtocolId { @@ -163,6 +194,59 @@ pub struct ProtocolId { protocol_id: String, } +impl ProtocolId { + /// Returns min and max size for messages of given protocol id requests. + pub fn rpc_request_limits(&self) -> RpcLimits { + match self.message_name { + Protocol::Status => RpcLimits::new( + ::ssz_fixed_len(), + ::ssz_fixed_len(), + ), + Protocol::Goodbye => RpcLimits::new( + ::ssz_fixed_len(), + ::ssz_fixed_len(), + ), + Protocol::BlocksByRange => RpcLimits::new( + ::ssz_fixed_len(), + ::ssz_fixed_len(), + ), + Protocol::BlocksByRoot => { + RpcLimits::new(*BLOCKS_BY_ROOT_REQUEST_MIN, *BLOCKS_BY_ROOT_REQUEST_MAX) + } + Protocol::Ping => RpcLimits::new( + ::ssz_fixed_len(), + ::ssz_fixed_len(), + ), + Protocol::MetaData => RpcLimits::new(0, 0), // Metadata requests are empty + } + } + + /// Returns min and max size for messages of given protocol id responses. + pub fn rpc_response_limits(&self) -> RpcLimits { + match self.message_name { + Protocol::Status => RpcLimits::new( + ::ssz_fixed_len(), + ::ssz_fixed_len(), + ), + Protocol::Goodbye => RpcLimits::new(0, 0), // Goodbye request has no response + Protocol::BlocksByRange => { + RpcLimits::new(*SIGNED_BEACON_BLOCK_MIN, *SIGNED_BEACON_BLOCK_MAX) + } + Protocol::BlocksByRoot => { + RpcLimits::new(*SIGNED_BEACON_BLOCK_MIN, *SIGNED_BEACON_BLOCK_MAX) + } + Protocol::Ping => RpcLimits::new( + ::ssz_fixed_len(), + ::ssz_fixed_len(), + ), + Protocol::MetaData => RpcLimits::new( + as Encode>::ssz_fixed_len(), + as Encode>::ssz_fixed_len(), + ), + } + } +} + /// An RPC protocol ID. impl ProtocolId { pub fn new(message_name: Protocol, version: Version, encoding: Encoding) -> Self { @@ -233,7 +317,8 @@ where { Err(e) => Err(RPCError::from(e)), Ok((Some(Ok(request)), stream)) => Ok((request, stream)), - Ok((Some(Err(_)), _)) | Ok((None, _)) => Err(RPCError::IncompleteStream), + Ok((Some(Err(e)), _)) => Err(e), + Ok((None, _)) => Err(RPCError::IncompleteStream), } } } @@ -385,7 +470,7 @@ where } /// Error in RPC Encoding/Decoding. -#[derive(Debug, Clone)] +#[derive(Debug, Clone, PartialEq)] pub enum RPCError { /// Error when decoding the raw buffer from ssz. // NOTE: in the future a ssz::DecodeError should map to an InvalidData error