Limit snappy input stream (#1738)

## Issue Addressed

N/A

## Proposed Changes

This PR limits the length of the stream received by the snappy decoder to be the maximum allowed size for the received rpc message type. Also adds further checks to ensure that the length specified in the rpc [encoding-dependent header](https://github.com/ethereum/eth2.0-specs/blob/dev/specs/phase0/p2p-interface.md#encoding-strategies) is within the bounds for the rpc message type being decoded.
This commit is contained in:
Pawan Dhananjay 2020-10-11 22:45:33 +00:00
parent b185d7bbd8
commit 99a02fd2ab
4 changed files with 252 additions and 196 deletions

View File

@ -177,7 +177,15 @@ where
mod tests {
use super::super::ssz_snappy::*;
use super::*;
use crate::rpc::methods::StatusMessage;
use crate::rpc::protocol::*;
use snap::write::FrameEncoder;
use ssz::Encode;
use std::io::Write;
use types::{Epoch, Hash256, Slot};
use unsigned_varint::codec::Uvi;
type Spec = types::MainnetEthSpec;
#[test]
fn test_decode_status_message() {
@ -185,8 +193,6 @@ mod tests {
let mut buf = BytesMut::new();
buf.extend_from_slice(&message);
type Spec = types::MainnetEthSpec;
let snappy_protocol_id =
ProtocolId::new(Protocol::Status, Version::V1, Encoding::SSZSnappy);
@ -207,4 +213,57 @@ mod tests {
let _ = dbg!(snappy_decoded_message);
let _ = dbg!(snappy_decoded_chunk);
}
#[test]
fn test_decode_malicious_status_message() {
// Snappy stream identifier
let stream_identifier: &'static [u8] = b"\xFF\x06\x00\x00sNaPpY";
// byte 0(0xFE) is padding chunk type identifier for snappy messages
// byte 1,2,3 are chunk length (little endian)
let malicious_padding: &'static [u8] = b"\xFE\x00\x00\x00";
// Status message is 84 bytes uncompressed. `max_compressed_len` is 130.
let status_message_bytes = StatusMessage {
fork_digest: [0; 4],
finalized_root: Hash256::from_low_u64_be(0),
finalized_epoch: Epoch::new(1),
head_root: Hash256::from_low_u64_be(0),
head_slot: Slot::new(1),
}
.as_ssz_bytes();
let mut uvi_codec: Uvi<usize> = Uvi::default();
let mut dst = BytesMut::with_capacity(1024);
// Insert length-prefix
uvi_codec
.encode(status_message_bytes.len(), &mut dst)
.unwrap();
// Insert snappy stream identifier
dst.extend_from_slice(stream_identifier);
// Insert malicious padding of 80 bytes.
for _ in 0..20 {
dst.extend_from_slice(malicious_padding);
}
// Insert payload (42 bytes compressed)
let mut writer = FrameEncoder::new(Vec::new());
writer.write_all(&status_message_bytes).unwrap();
writer.flush().unwrap();
dst.extend_from_slice(writer.get_ref());
// 42 + 80 = 132 > max_compressed_len. Hence, decoding should fail with `InvalidData`.
let snappy_protocol_id =
ProtocolId::new(Protocol::Status, Version::V1, Encoding::SSZSnappy);
let mut snappy_outbound_codec =
SSZSnappyOutboundCodec::<Spec>::new(snappy_protocol_id, 1_048_576);
let snappy_decoded_message = snappy_outbound_codec.decode(&mut dst.clone()).unwrap_err();
assert_eq!(snappy_decoded_message, RPCError::InvalidData);
}
}

View File

@ -1,10 +1,7 @@
use crate::rpc::methods::*;
use crate::rpc::{
codec::base::OutboundCodec,
protocol::{
Encoding, Protocol, ProtocolId, RPCError, Version, BLOCKS_BY_ROOT_REQUEST_MAX,
BLOCKS_BY_ROOT_REQUEST_MIN, SIGNED_BEACON_BLOCK_MAX, SIGNED_BEACON_BLOCK_MIN,
},
protocol::{Encoding, Protocol, ProtocolId, RPCError, Version, ERROR_TYPE_MAX, ERROR_TYPE_MIN},
};
use crate::rpc::{RPCCodedResponse, RPCRequest, RPCResponse};
use libp2p::bytes::BytesMut;
@ -110,79 +107,56 @@ impl<TSpec: EthSpec> Decoder for SSZSnappyInboundCodec<TSpec> {
let length = self.len.expect("length should be Some");
// Should not attempt to decode rpc chunks with length > max_packet_size
if length > self.max_packet_size {
// Should not attempt to decode rpc chunks with `length > max_packet_size` or not within bounds of
// packet size for ssz container corresponding to `self.protocol`.
let ssz_limits = self.protocol.rpc_request_limits();
if length > self.max_packet_size || ssz_limits.is_out_of_bounds(length) {
return Err(RPCError::InvalidData);
}
let mut reader = FrameDecoder::new(Cursor::new(&src));
// Calculate worst case compression length for given uncompressed length
let max_compressed_len = snap::raw::max_compress_len(length) as u64;
// Create a limit reader as a wrapper that reads only upto `max_compressed_len` from `src`.
let limit_reader = Cursor::new(src.as_ref()).take(max_compressed_len);
let mut reader = FrameDecoder::new(limit_reader);
let mut decoded_buffer = vec![0; length];
match read_exact(&mut reader, &mut decoded_buffer, length) {
match reader.read_exact(&mut decoded_buffer) {
Ok(()) => {
// `n` is how many bytes the reader read in the compressed stream
let n = reader.get_ref().position();
let n = reader.get_ref().get_ref().position();
self.len = None;
let _read_bytes = src.split_to(n as usize);
// We need not check that decoded_buffer.len() is within bounds here
// since we have already checked `length` above.
match self.protocol.message_name {
Protocol::Status => match self.protocol.version {
Version::V1 => {
if decoded_buffer.len() == <StatusMessage as Encode>::ssz_fixed_len() {
Ok(Some(RPCRequest::Status(StatusMessage::from_ssz_bytes(
&decoded_buffer,
)?)))
} else {
Err(RPCError::InvalidData)
}
}
Version::V1 => Ok(Some(RPCRequest::Status(StatusMessage::from_ssz_bytes(
&decoded_buffer,
)?))),
},
Protocol::Goodbye => match self.protocol.version {
Version::V1 => {
if decoded_buffer.len() == <GoodbyeReason as Encode>::ssz_fixed_len() {
Ok(Some(RPCRequest::Goodbye(GoodbyeReason::from_ssz_bytes(
&decoded_buffer,
)?)))
} else {
Err(RPCError::InvalidData)
}
}
Version::V1 => Ok(Some(RPCRequest::Goodbye(
GoodbyeReason::from_ssz_bytes(&decoded_buffer)?,
))),
},
Protocol::BlocksByRange => match self.protocol.version {
Version::V1 => {
if decoded_buffer.len()
== <BlocksByRangeRequest as Encode>::ssz_fixed_len()
{
Ok(Some(RPCRequest::BlocksByRange(
BlocksByRangeRequest::from_ssz_bytes(&decoded_buffer)?,
)))
} else {
Err(RPCError::InvalidData)
}
}
Version::V1 => Ok(Some(RPCRequest::BlocksByRange(
BlocksByRangeRequest::from_ssz_bytes(&decoded_buffer)?,
))),
},
Protocol::BlocksByRoot => match self.protocol.version {
Version::V1 => {
if decoded_buffer.len() >= *BLOCKS_BY_ROOT_REQUEST_MIN
&& decoded_buffer.len() <= *BLOCKS_BY_ROOT_REQUEST_MAX
{
Ok(Some(RPCRequest::BlocksByRoot(BlocksByRootRequest {
block_roots: VariableList::from_ssz_bytes(&decoded_buffer)?,
})))
} else {
Err(RPCError::InvalidData)
}
}
Version::V1 => Ok(Some(RPCRequest::BlocksByRoot(BlocksByRootRequest {
block_roots: VariableList::from_ssz_bytes(&decoded_buffer)?,
}))),
},
Protocol::Ping => match self.protocol.version {
Version::V1 => {
if decoded_buffer.len() == <Ping as Encode>::ssz_fixed_len() {
Ok(Some(RPCRequest::Ping(Ping {
data: u64::from_ssz_bytes(&decoded_buffer)?,
})))
} else {
Err(RPCError::InvalidData)
}
}
Version::V1 => Ok(Some(RPCRequest::Ping(Ping {
data: u64::from_ssz_bytes(&decoded_buffer)?,
}))),
},
// This case should be unreachable as `MetaData` requests are handled separately in the `InboundUpgrade`
Protocol::MetaData => match self.protocol.version {
Version::V1 => {
if !decoded_buffer.is_empty() {
@ -194,11 +168,7 @@ impl<TSpec: EthSpec> Decoder for SSZSnappyInboundCodec<TSpec> {
},
}
}
Err(e) => match e.kind() {
// Haven't received enough bytes to decode yet, wait for more
ErrorKind::UnexpectedEof => Ok(None),
_ => Err(e).map_err(RPCError::from),
},
Err(e) => handle_error(e, reader.get_ref().get_ref().position(), max_compressed_len),
}
}
}
@ -288,87 +258,60 @@ impl<TSpec: EthSpec> Decoder for SSZSnappyOutboundCodec<TSpec> {
let length = self.len.expect("length should be Some");
// Should not attempt to decode rpc chunks with length > max_packet_size
if length > self.max_packet_size {
// Should not attempt to decode rpc chunks with `length > max_packet_size` or not within bounds of
// packet size for ssz container corresponding to `self.protocol`.
let ssz_limits = self.protocol.rpc_response_limits::<TSpec>();
if length > self.max_packet_size || ssz_limits.is_out_of_bounds(length) {
return Err(RPCError::InvalidData);
}
let mut reader = FrameDecoder::new(Cursor::new(&src));
// Calculate worst case compression length for given uncompressed length
let max_compressed_len = snap::raw::max_compress_len(length) as u64;
// Create a limit reader as a wrapper that reads only upto `max_compressed_len` from `src`.
let limit_reader = Cursor::new(src.as_ref()).take(max_compressed_len);
let mut reader = FrameDecoder::new(limit_reader);
let mut decoded_buffer = vec![0; length];
match read_exact(&mut reader, &mut decoded_buffer, length) {
match reader.read_exact(&mut decoded_buffer) {
Ok(()) => {
// `n` is how many bytes the reader read in the compressed stream
let n = reader.get_ref().position();
let n = reader.get_ref().get_ref().position();
self.len = None;
let _read_byts = src.split_to(n as usize);
let _read_bytes = src.split_to(n as usize);
// We need not check that decoded_buffer.len() is within bounds here
// since we have already checked `length` above.
match self.protocol.message_name {
Protocol::Status => match self.protocol.version {
Version::V1 => {
if decoded_buffer.len() == <StatusMessage as Encode>::ssz_fixed_len() {
Ok(Some(RPCResponse::Status(StatusMessage::from_ssz_bytes(
&decoded_buffer,
)?)))
} else {
Err(RPCError::InvalidData)
}
}
Version::V1 => Ok(Some(RPCResponse::Status(
StatusMessage::from_ssz_bytes(&decoded_buffer)?,
))),
},
// This case should be unreachable as `Goodbye` has no response.
Protocol::Goodbye => Err(RPCError::InvalidData),
Protocol::BlocksByRange => match self.protocol.version {
Version::V1 => {
if decoded_buffer.len() >= *SIGNED_BEACON_BLOCK_MIN
&& decoded_buffer.len() <= *SIGNED_BEACON_BLOCK_MAX
{
Ok(Some(RPCResponse::BlocksByRange(Box::new(
SignedBeaconBlock::from_ssz_bytes(&decoded_buffer)?,
))))
} else {
Err(RPCError::InvalidData)
}
}
Version::V1 => Ok(Some(RPCResponse::BlocksByRange(Box::new(
SignedBeaconBlock::from_ssz_bytes(&decoded_buffer)?,
)))),
},
Protocol::BlocksByRoot => match self.protocol.version {
Version::V1 => {
if decoded_buffer.len() >= *SIGNED_BEACON_BLOCK_MIN
&& decoded_buffer.len() <= *SIGNED_BEACON_BLOCK_MAX
{
Ok(Some(RPCResponse::BlocksByRoot(Box::new(
SignedBeaconBlock::from_ssz_bytes(&decoded_buffer)?,
))))
} else {
Err(RPCError::InvalidData)
}
}
Version::V1 => Ok(Some(RPCResponse::BlocksByRoot(Box::new(
SignedBeaconBlock::from_ssz_bytes(&decoded_buffer)?,
)))),
},
Protocol::Ping => match self.protocol.version {
Version::V1 => {
if decoded_buffer.len() == <Ping as Encode>::ssz_fixed_len() {
Ok(Some(RPCResponse::Pong(Ping {
data: u64::from_ssz_bytes(&decoded_buffer)?,
})))
} else {
Err(RPCError::InvalidData)
}
}
Version::V1 => Ok(Some(RPCResponse::Pong(Ping {
data: u64::from_ssz_bytes(&decoded_buffer)?,
}))),
},
Protocol::MetaData => match self.protocol.version {
Version::V1 => {
if decoded_buffer.len() == <MetaData<TSpec> as Encode>::ssz_fixed_len()
{
Ok(Some(RPCResponse::MetaData(MetaData::from_ssz_bytes(
&decoded_buffer,
)?)))
} else {
Err(RPCError::InvalidData)
}
}
Version::V1 => Ok(Some(RPCResponse::MetaData(MetaData::from_ssz_bytes(
&decoded_buffer,
)?))),
},
}
}
Err(e) => match e.kind() {
// Haven't received enough bytes to decode yet, wait for more
ErrorKind::UnexpectedEof => Ok(None),
_ => Err(e).map_err(RPCError::from),
},
Err(e) => handle_error(e, reader.get_ref().get_ref().position(), max_compressed_len),
}
}
}
@ -392,84 +335,52 @@ impl<TSpec: EthSpec> OutboundCodec<RPCRequest<TSpec>> for SSZSnappyOutboundCodec
let length = self.len.expect("length should be Some");
// Should not attempt to decode rpc chunks with length > max_packet_size
if length > self.max_packet_size {
// Should not attempt to decode rpc chunks with `length > max_packet_size` or not within bounds of
// packet size for ssz container corresponding to `ErrorType`.
if length > self.max_packet_size || length > *ERROR_TYPE_MAX || length < *ERROR_TYPE_MIN {
return Err(RPCError::InvalidData);
}
let mut reader = FrameDecoder::new(Cursor::new(&src));
// Calculate worst case compression length for given uncompressed length
let max_compressed_len = snap::raw::max_compress_len(length) as u64;
// // Create a limit reader as a wrapper that reads only upto `max_compressed_len` from `src`.
let limit_reader = Cursor::new(src.as_ref()).take(max_compressed_len);
let mut reader = FrameDecoder::new(limit_reader);
let mut decoded_buffer = vec![0; length];
match read_exact(&mut reader, &mut decoded_buffer, length) {
match reader.read_exact(&mut decoded_buffer) {
Ok(()) => {
// `n` is how many bytes the reader read in the compressed stream
let n = reader.get_ref().position();
let n = reader.get_ref().get_ref().position();
self.len = None;
let _read_bytes = src.split_to(n as usize);
Ok(Some(ErrorType(VariableList::from_ssz_bytes(
&decoded_buffer,
)?)))
}
Err(e) => match e.kind() {
Err(e) => handle_error(e, reader.get_ref().get_ref().position(), max_compressed_len),
}
}
}
/// Handle errors that we get from decoding an RPC message from the stream.
/// `num_bytes_read` is the number of bytes the snappy decoder has read from the underlying stream.
/// `max_compressed_len` is the maximum compressed size for a given uncompressed size.
fn handle_error<T>(
err: std::io::Error,
num_bytes: u64,
max_compressed_len: u64,
) -> Result<Option<T>, RPCError> {
match err.kind() {
ErrorKind::UnexpectedEof => {
// If snappy has read `max_compressed_len` from underlying stream and still can't fill buffer, we have a malicious message.
// Report as `InvalidData` so that malicious peer gets banned.
if num_bytes >= max_compressed_len {
Err(RPCError::InvalidData)
} else {
// Haven't received enough bytes to decode yet, wait for more
ErrorKind::UnexpectedEof => Ok(None),
_ => Err(e).map_err(RPCError::from),
},
}
}
}
/// Wrapper over `read` implementation of `FrameDecoder`.
///
/// Works like the standard `read_exact` implementation, except that it returns an error if length of
// compressed bytes read from the underlying reader is greater than worst case compression length for snappy.
fn read_exact<T: std::convert::AsRef<[u8]>>(
reader: &mut FrameDecoder<Cursor<T>>,
mut buf: &mut [u8],
uncompressed_length: usize,
) -> Result<(), std::io::Error> {
// Calculate worst case compression length for given uncompressed length
let max_compressed_len = snap::raw::max_compress_len(uncompressed_length) as u64;
// Initialize the position of the reader
let mut pos = reader.get_ref().position();
let mut count = 0;
while !buf.is_empty() {
match reader.read(buf) {
Ok(0) => break,
Ok(n) => {
let tmp = buf;
buf = &mut tmp[n..];
Ok(None)
}
Err(ref e) if e.kind() == ErrorKind::Interrupted => {}
Err(e) => return Err(e),
}
// Get current position of reader
let curr_pos = reader.get_ref().position();
// Note: reader should always advance forward. However, this behaviour
// depends on the implementation of `snap::FrameDecoder`, so it is better
// to check to avoid underflow panic.
if curr_pos > pos {
count += reader.get_ref().position() - pos;
pos = curr_pos;
} else {
return Err(std::io::Error::new(
ErrorKind::InvalidData,
"snappy: reader is not advanced forward while reading",
));
}
if count > max_compressed_len {
return Err(std::io::Error::new(
ErrorKind::InvalidData,
"snappy: compressed data is > max_compressed_len",
));
}
}
if !buf.is_empty() {
Err(std::io::Error::new(
ErrorKind::UnexpectedEof,
"failed to fill whole buffer",
))
} else {
Ok(())
_ => Err(err).map_err(RPCError::from),
}
}

View File

@ -16,7 +16,8 @@ pub type MaxRequestBlocks = U1024;
pub const MAX_REQUEST_BLOCKS: u64 = 1024;
/// Maximum length of error message.
type MaxErrorLen = U256;
pub type MaxErrorLen = U256;
pub const MAX_ERROR_LEN: u64 = 256;
/// Wrapper over SSZ List to represent error message in rpc responses.
#[derive(Debug, Clone)]
@ -256,7 +257,7 @@ pub enum RPCCodedResponse<T: EthSpec> {
}
/// The code assigned to an erroneous `RPCResponse`.
#[derive(Debug, Clone, Copy)]
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum RPCResponseErrorCode {
RateLimited,
InvalidRequest,

View File

@ -5,7 +5,7 @@ use crate::rpc::{
ssz_snappy::{SSZSnappyInboundCodec, SSZSnappyOutboundCodec},
InboundCodec, OutboundCodec,
},
methods::ResponseTermination,
methods::{MaxErrorLen, ResponseTermination, MAX_ERROR_LEN},
MaxRequestBlocks, MAX_REQUEST_BLOCKS,
};
use futures::future::BoxFuture;
@ -51,6 +51,19 @@ lazy_static! {
])
.as_ssz_bytes()
.len();
pub static ref ERROR_TYPE_MIN: usize =
VariableList::<u8, MaxErrorLen>::from(Vec::<u8>::new())
.as_ssz_bytes()
.len();
pub static ref ERROR_TYPE_MAX: usize =
VariableList::<u8, MaxErrorLen>::from(vec![
0u8;
MAX_ERROR_LEN
as usize
])
.as_ssz_bytes()
.len();
}
/// The maximum bytes that can be sent across the RPC.
@ -147,6 +160,24 @@ impl<TSpec: EthSpec> UpgradeInfo for RPCProtocol<TSpec> {
}
}
/// Represents the ssz length bounds for RPC messages.
#[derive(Debug, PartialEq)]
pub struct RpcLimits {
min: usize,
max: usize,
}
impl RpcLimits {
pub fn new(min: usize, max: usize) -> Self {
Self { min, max }
}
/// Returns true if the given length is out of bounds, false otherwise.
pub fn is_out_of_bounds(&self, length: usize) -> bool {
length > self.max || length < self.min
}
}
/// Tracks the types in a protocol id.
#[derive(Clone, Debug)]
pub struct ProtocolId {
@ -163,6 +194,59 @@ pub struct ProtocolId {
protocol_id: String,
}
impl ProtocolId {
/// Returns min and max size for messages of given protocol id requests.
pub fn rpc_request_limits(&self) -> RpcLimits {
match self.message_name {
Protocol::Status => RpcLimits::new(
<StatusMessage as Encode>::ssz_fixed_len(),
<StatusMessage as Encode>::ssz_fixed_len(),
),
Protocol::Goodbye => RpcLimits::new(
<GoodbyeReason as Encode>::ssz_fixed_len(),
<GoodbyeReason as Encode>::ssz_fixed_len(),
),
Protocol::BlocksByRange => RpcLimits::new(
<BlocksByRangeRequest as Encode>::ssz_fixed_len(),
<BlocksByRangeRequest as Encode>::ssz_fixed_len(),
),
Protocol::BlocksByRoot => {
RpcLimits::new(*BLOCKS_BY_ROOT_REQUEST_MIN, *BLOCKS_BY_ROOT_REQUEST_MAX)
}
Protocol::Ping => RpcLimits::new(
<Ping as Encode>::ssz_fixed_len(),
<Ping as Encode>::ssz_fixed_len(),
),
Protocol::MetaData => RpcLimits::new(0, 0), // Metadata requests are empty
}
}
/// Returns min and max size for messages of given protocol id responses.
pub fn rpc_response_limits<T: EthSpec>(&self) -> RpcLimits {
match self.message_name {
Protocol::Status => RpcLimits::new(
<StatusMessage as Encode>::ssz_fixed_len(),
<StatusMessage as Encode>::ssz_fixed_len(),
),
Protocol::Goodbye => RpcLimits::new(0, 0), // Goodbye request has no response
Protocol::BlocksByRange => {
RpcLimits::new(*SIGNED_BEACON_BLOCK_MIN, *SIGNED_BEACON_BLOCK_MAX)
}
Protocol::BlocksByRoot => {
RpcLimits::new(*SIGNED_BEACON_BLOCK_MIN, *SIGNED_BEACON_BLOCK_MAX)
}
Protocol::Ping => RpcLimits::new(
<Ping as Encode>::ssz_fixed_len(),
<Ping as Encode>::ssz_fixed_len(),
),
Protocol::MetaData => RpcLimits::new(
<MetaData<T> as Encode>::ssz_fixed_len(),
<MetaData<T> as Encode>::ssz_fixed_len(),
),
}
}
}
/// An RPC protocol ID.
impl ProtocolId {
pub fn new(message_name: Protocol, version: Version, encoding: Encoding) -> Self {
@ -233,7 +317,8 @@ where
{
Err(e) => Err(RPCError::from(e)),
Ok((Some(Ok(request)), stream)) => Ok((request, stream)),
Ok((Some(Err(_)), _)) | Ok((None, _)) => Err(RPCError::IncompleteStream),
Ok((Some(Err(e)), _)) => Err(e),
Ok((None, _)) => Err(RPCError::IncompleteStream),
}
}
}
@ -385,7 +470,7 @@ where
}
/// Error in RPC Encoding/Decoding.
#[derive(Debug, Clone)]
#[derive(Debug, Clone, PartialEq)]
pub enum RPCError {
/// Error when decoding the raw buffer from ssz.
// NOTE: in the future a ssz::DecodeError should map to an InvalidData error