Fix merge rpc length limits (#3133)

## Issue Addressed

N/A

## Proposed Changes

Fix the upper bound for blocks by root responses to be equal to the max merge block size instead of altair.
Further make the rpc response limits fork aware.
This commit is contained in:
Pawan Dhananjay 2022-04-04 00:26:15 +00:00
parent 375e2b49b3
commit ab434bc075
7 changed files with 414 additions and 129 deletions

View File

@ -459,7 +459,7 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
// Our fault. Do nothing
return;
}
RPCError::InvalidData => {
RPCError::InvalidData(_) => {
// Peer is not complying with the protocol. This is considered a malicious action
PeerAction::Fatal
}

View File

@ -184,13 +184,25 @@ mod tests {
use crate::rpc::protocol::*;
use std::sync::Arc;
use types::{ForkContext, Hash256};
use types::{Epoch, ForkContext, ForkName, Hash256, Slot};
use unsigned_varint::codec::Uvi;
type Spec = types::MainnetEthSpec;
fn fork_context() -> ForkContext {
ForkContext::new::<Spec>(types::Slot::new(0), Hash256::zero(), &Spec::default_spec())
fn fork_context(fork_name: ForkName) -> ForkContext {
let mut chain_spec = Spec::default_spec();
let altair_fork_epoch = Epoch::new(1);
let merge_fork_epoch = Epoch::new(2);
chain_spec.altair_fork_epoch = Some(altair_fork_epoch);
chain_spec.bellatrix_fork_epoch = Some(merge_fork_epoch);
let current_slot = match fork_name {
ForkName::Base => Slot::new(0),
ForkName::Altair => altair_fork_epoch.start_slot(Spec::slots_per_epoch()),
ForkName::Merge => merge_fork_epoch.start_slot(Spec::slots_per_epoch()),
};
ForkContext::new::<Spec>(current_slot, Hash256::zero(), &chain_spec)
}
#[test]
@ -202,9 +214,12 @@ mod tests {
let snappy_protocol_id =
ProtocolId::new(Protocol::Status, Version::V1, Encoding::SSZSnappy);
let fork_context = Arc::new(fork_context());
let mut snappy_outbound_codec =
SSZSnappyOutboundCodec::<Spec>::new(snappy_protocol_id, 1_048_576, fork_context);
let fork_context = Arc::new(fork_context(ForkName::Base));
let mut snappy_outbound_codec = SSZSnappyOutboundCodec::<Spec>::new(
snappy_protocol_id,
max_rpc_size(&fork_context),
fork_context,
);
// remove response code
let mut snappy_buf = buf.clone();
@ -234,9 +249,12 @@ mod tests {
let snappy_protocol_id =
ProtocolId::new(Protocol::Status, Version::V1, Encoding::SSZSnappy);
let fork_context = Arc::new(fork_context());
let mut snappy_outbound_codec =
SSZSnappyOutboundCodec::<Spec>::new(snappy_protocol_id, 1_048_576, fork_context);
let fork_context = Arc::new(fork_context(ForkName::Base));
let mut snappy_outbound_codec = SSZSnappyOutboundCodec::<Spec>::new(
snappy_protocol_id,
max_rpc_size(&fork_context),
fork_context,
);
let snappy_decoded_message = snappy_outbound_codec.decode(&mut dst).unwrap_err();
@ -260,36 +278,50 @@ mod tests {
ProtocolId::new(Protocol::BlocksByRange, Version::V1, Encoding::SSZSnappy);
// Response limits
let limit = protocol_id.rpc_response_limits::<Spec>();
let fork_context = Arc::new(fork_context(ForkName::Base));
let max_rpc_size = max_rpc_size(&fork_context);
let limit = protocol_id.rpc_response_limits::<Spec>(&fork_context);
let mut max = encode_len(limit.max + 1);
let fork_context = Arc::new(fork_context());
let mut codec = SSZSnappyOutboundCodec::<Spec>::new(
protocol_id.clone(),
1_048_576,
max_rpc_size,
fork_context.clone(),
);
assert_eq!(codec.decode(&mut max).unwrap_err(), RPCError::InvalidData);
assert!(matches!(
codec.decode(&mut max).unwrap_err(),
RPCError::InvalidData(_)
));
let mut min = encode_len(limit.min - 1);
let mut codec = SSZSnappyOutboundCodec::<Spec>::new(
protocol_id.clone(),
1_048_576,
max_rpc_size,
fork_context.clone(),
);
assert_eq!(codec.decode(&mut min).unwrap_err(), RPCError::InvalidData);
assert!(matches!(
codec.decode(&mut min).unwrap_err(),
RPCError::InvalidData(_)
));
// Request limits
let limit = protocol_id.rpc_request_limits();
let mut max = encode_len(limit.max + 1);
let mut codec = SSZSnappyOutboundCodec::<Spec>::new(
protocol_id.clone(),
1_048_576,
max_rpc_size,
fork_context.clone(),
);
assert_eq!(codec.decode(&mut max).unwrap_err(), RPCError::InvalidData);
assert!(matches!(
codec.decode(&mut max).unwrap_err(),
RPCError::InvalidData(_)
));
let mut min = encode_len(limit.min - 1);
let mut codec = SSZSnappyOutboundCodec::<Spec>::new(protocol_id, 1_048_576, fork_context);
assert_eq!(codec.decode(&mut min).unwrap_err(), RPCError::InvalidData);
let mut codec =
SSZSnappyOutboundCodec::<Spec>::new(protocol_id, max_rpc_size, fork_context);
assert!(matches!(
codec.decode(&mut min).unwrap_err(),
RPCError::InvalidData(_)
));
}
}

View File

@ -146,7 +146,10 @@ impl<TSpec: EthSpec> Decoder for SSZSnappyInboundCodec<TSpec> {
// packet size for ssz container corresponding to `self.protocol`.
let ssz_limits = self.protocol.rpc_request_limits();
if ssz_limits.is_out_of_bounds(length, self.max_packet_size) {
return Err(RPCError::InvalidData);
return Err(RPCError::InvalidData(format!(
"RPC request length is out of bounds, length {}",
length
)));
}
// Calculate worst case compression length for given uncompressed length
let max_compressed_len = snap::raw::max_compress_len(length) as u64;
@ -279,9 +282,14 @@ impl<TSpec: EthSpec> Decoder for SSZSnappyOutboundCodec<TSpec> {
// Should not attempt to decode rpc chunks with `length > max_packet_size` or not within bounds of
// packet size for ssz container corresponding to `self.protocol`.
let ssz_limits = self.protocol.rpc_response_limits::<TSpec>();
let ssz_limits = self
.protocol
.rpc_response_limits::<TSpec>(&self.fork_context);
if ssz_limits.is_out_of_bounds(length, self.max_packet_size) {
return Err(RPCError::InvalidData);
return Err(RPCError::InvalidData(format!(
"RPC response length is out of bounds, length {}",
length
)));
}
// Calculate worst case compression length for given uncompressed length
let max_compressed_len = snap::raw::max_compress_len(length) as u64;
@ -327,7 +335,10 @@ impl<TSpec: EthSpec> OutboundCodec<OutboundRequest<TSpec>> for SSZSnappyOutbound
// Should not attempt to decode rpc chunks with `length > max_packet_size` or not within bounds of
// packet size for ssz container corresponding to `ErrorType`.
if length > self.max_packet_size || length > *ERROR_TYPE_MAX || length < *ERROR_TYPE_MIN {
return Err(RPCError::InvalidData);
return Err(RPCError::InvalidData(format!(
"RPC Error length is out of bounds, length {}",
length
)));
}
// Calculate worst case compression length for given uncompressed length
@ -364,7 +375,10 @@ fn handle_error<T>(
// If snappy has read `max_compressed_len` from underlying stream and still can't fill buffer, we have a malicious message.
// Report as `InvalidData` so that malicious peer gets banned.
if num_bytes >= max_compressed_len {
Err(RPCError::InvalidData)
Err(RPCError::InvalidData(format!(
"Received malicious snappy message, num_bytes {}, max_compressed_len {}",
num_bytes, max_compressed_len
)))
} else {
// Haven't received enough bytes to decode yet, wait for more
Ok(None)
@ -460,7 +474,9 @@ fn handle_v1_request<T: EthSpec>(
// Handle this case just for completeness.
Protocol::MetaData => {
if !decoded_buffer.is_empty() {
Err(RPCError::InvalidData)
Err(RPCError::InternalError(
"Metadata requests shouldn't reach decoder",
))
} else {
Ok(Some(InboundRequest::MetaData(PhantomData)))
}
@ -486,7 +502,7 @@ fn handle_v2_request<T: EthSpec>(
// Handle this case just for completeness.
Protocol::MetaData => {
if !decoded_buffer.is_empty() {
Err(RPCError::InvalidData)
Err(RPCError::InvalidData("Metadata request".to_string()))
} else {
Ok(Some(InboundRequest::MetaData(PhantomData)))
}
@ -510,7 +526,9 @@ fn handle_v1_response<T: EthSpec>(
decoded_buffer,
)?))),
// This case should be unreachable as `Goodbye` has no response.
Protocol::Goodbye => Err(RPCError::InvalidData),
Protocol::Goodbye => Err(RPCError::InvalidData(
"Goodbye RPC message has no valid response".to_string(),
)),
Protocol::BlocksByRange => Ok(Some(RPCResponse::BlocksByRange(Box::new(
SignedBeaconBlock::Base(SignedBeaconBlockBase::from_ssz_bytes(decoded_buffer)?),
)))),
@ -615,8 +633,8 @@ mod tests {
};
use std::sync::Arc;
use types::{
BeaconBlock, BeaconBlockAltair, BeaconBlockBase, Epoch, ForkContext, Hash256, Signature,
SignedBeaconBlock, Slot,
BeaconBlock, BeaconBlockAltair, BeaconBlockBase, BeaconBlockMerge, Epoch, ForkContext,
FullPayload, Hash256, Signature, SignedBeaconBlock, Slot,
};
use snap::write::FrameEncoder;
@ -625,12 +643,20 @@ mod tests {
type Spec = types::MainnetEthSpec;
fn fork_context() -> ForkContext {
fn fork_context(fork_name: ForkName) -> ForkContext {
let mut chain_spec = Spec::default_spec();
// Set fork_epoch to `Some` to ensure that the `ForkContext` object
// includes altair in the list of forks
chain_spec.altair_fork_epoch = Some(types::Epoch::new(42));
ForkContext::new::<Spec>(types::Slot::new(0), Hash256::zero(), &chain_spec)
let altair_fork_epoch = Epoch::new(1);
let merge_fork_epoch = Epoch::new(2);
chain_spec.altair_fork_epoch = Some(altair_fork_epoch);
chain_spec.bellatrix_fork_epoch = Some(merge_fork_epoch);
let current_slot = match fork_name {
ForkName::Base => Slot::new(0),
ForkName::Altair => altair_fork_epoch.start_slot(Spec::slots_per_epoch()),
ForkName::Merge => merge_fork_epoch.start_slot(Spec::slots_per_epoch()),
};
ForkContext::new::<Spec>(current_slot, Hash256::zero(), &chain_spec)
}
fn base_block() -> SignedBeaconBlock<Spec> {
@ -644,6 +670,36 @@ mod tests {
SignedBeaconBlock::from_block(full_block, Signature::empty())
}
/// Merge block with length < max_rpc_size.
fn merge_block_small(fork_context: &ForkContext) -> SignedBeaconBlock<Spec> {
let mut block: BeaconBlockMerge<_, FullPayload<Spec>> =
BeaconBlockMerge::empty(&Spec::default_spec());
let tx = VariableList::from(vec![0; 1024]);
let txs = VariableList::from(std::iter::repeat(tx).take(5000).collect::<Vec<_>>());
block.body.execution_payload.execution_payload.transactions = txs;
let block = BeaconBlock::Merge(block);
assert!(block.ssz_bytes_len() <= max_rpc_size(fork_context));
SignedBeaconBlock::from_block(block, Signature::empty())
}
/// Merge block with length > MAX_RPC_SIZE.
/// The max limit for a merge block is in the order of ~16GiB which wouldn't fit in memory.
/// Hence, we generate a merge block just greater than `MAX_RPC_SIZE` to test rejection on the rpc layer.
fn merge_block_large(fork_context: &ForkContext) -> SignedBeaconBlock<Spec> {
let mut block: BeaconBlockMerge<_, FullPayload<Spec>> =
BeaconBlockMerge::empty(&Spec::default_spec());
let tx = VariableList::from(vec![0; 1024]);
let txs = VariableList::from(std::iter::repeat(tx).take(100000).collect::<Vec<_>>());
block.body.execution_payload.execution_payload.transactions = txs;
let block = BeaconBlock::Merge(block);
assert!(block.ssz_bytes_len() > max_rpc_size(fork_context));
SignedBeaconBlock::from_block(block, Signature::empty())
}
fn status_message() -> StatusMessage {
StatusMessage {
fork_digest: [0; 4],
@ -678,10 +734,11 @@ mod tests {
protocol: Protocol,
version: Version,
message: RPCCodedResponse<Spec>,
fork_name: ForkName,
) -> Result<BytesMut, RPCError> {
let max_packet_size = 1_048_576;
let snappy_protocol_id = ProtocolId::new(protocol, version, Encoding::SSZSnappy);
let fork_context = Arc::new(fork_context());
let fork_context = Arc::new(fork_context(fork_name));
let max_packet_size = max_rpc_size(&fork_context);
let mut buf = BytesMut::new();
let mut snappy_inbound_codec =
@ -691,14 +748,43 @@ mod tests {
Ok(buf)
}
fn encode_without_length_checks(
bytes: Vec<u8>,
fork_name: ForkName,
) -> Result<BytesMut, RPCError> {
let fork_context = fork_context(fork_name);
let mut dst = BytesMut::new();
// Add context bytes if required
dst.extend_from_slice(&fork_context.to_context_bytes(fork_name).unwrap());
let mut uvi_codec: Uvi<usize> = Uvi::default();
// Inserts the length prefix of the uncompressed bytes into dst
// encoded as a unsigned varint
uvi_codec
.encode(bytes.len(), &mut dst)
.map_err(RPCError::from)?;
let mut writer = FrameEncoder::new(Vec::new());
writer.write_all(&bytes).map_err(RPCError::from)?;
writer.flush().map_err(RPCError::from)?;
// Write compressed bytes to `dst`
dst.extend_from_slice(writer.get_ref());
Ok(dst)
}
/// Attempts to decode the given protocol bytes as an rpc response
fn decode(
protocol: Protocol,
version: Version,
message: &mut BytesMut,
fork_name: ForkName,
) -> Result<Option<RPCResponse<Spec>>, RPCError> {
let snappy_protocol_id = ProtocolId::new(protocol, version, Encoding::SSZSnappy);
let fork_context = Arc::new(fork_context());
let fork_context = Arc::new(fork_context(fork_name));
let max_packet_size = max_rpc_size(&fork_context);
let mut snappy_outbound_codec =
SSZSnappyOutboundCodec::<Spec>::new(snappy_protocol_id, max_packet_size, fork_context);
@ -711,9 +797,10 @@ mod tests {
protocol: Protocol,
version: Version,
message: RPCCodedResponse<Spec>,
fork_name: ForkName,
) -> Result<Option<RPCResponse<Spec>>, RPCError> {
let mut encoded = encode(protocol, version.clone(), message)?;
decode(protocol, version, &mut encoded)
let mut encoded = encode(protocol, version.clone(), message, fork_name)?;
decode(protocol, version, &mut encoded, fork_name)
}
// Test RPCResponse encoding/decoding for V1 messages
@ -723,7 +810,8 @@ mod tests {
encode_then_decode(
Protocol::Status,
Version::V1,
RPCCodedResponse::Success(RPCResponse::Status(status_message()))
RPCCodedResponse::Success(RPCResponse::Status(status_message())),
ForkName::Base,
),
Ok(Some(RPCResponse::Status(status_message())))
);
@ -732,7 +820,8 @@ mod tests {
encode_then_decode(
Protocol::Ping,
Version::V1,
RPCCodedResponse::Success(RPCResponse::Pong(ping_message()))
RPCCodedResponse::Success(RPCResponse::Pong(ping_message())),
ForkName::Base,
),
Ok(Some(RPCResponse::Pong(ping_message())))
);
@ -741,7 +830,8 @@ mod tests {
encode_then_decode(
Protocol::BlocksByRange,
Version::V1,
RPCCodedResponse::Success(RPCResponse::BlocksByRange(Box::new(base_block())))
RPCCodedResponse::Success(RPCResponse::BlocksByRange(Box::new(base_block()))),
ForkName::Base,
),
Ok(Some(RPCResponse::BlocksByRange(Box::new(base_block()))))
);
@ -752,6 +842,7 @@ mod tests {
Protocol::BlocksByRange,
Version::V1,
RPCCodedResponse::Success(RPCResponse::BlocksByRange(Box::new(altair_block()))),
ForkName::Altair,
)
.unwrap_err(),
RPCError::SSZDecodeError(_)
@ -763,7 +854,8 @@ mod tests {
encode_then_decode(
Protocol::BlocksByRoot,
Version::V1,
RPCCodedResponse::Success(RPCResponse::BlocksByRoot(Box::new(base_block())))
RPCCodedResponse::Success(RPCResponse::BlocksByRoot(Box::new(base_block()))),
ForkName::Base,
),
Ok(Some(RPCResponse::BlocksByRoot(Box::new(base_block()))))
);
@ -774,6 +866,7 @@ mod tests {
Protocol::BlocksByRoot,
Version::V1,
RPCCodedResponse::Success(RPCResponse::BlocksByRoot(Box::new(altair_block()))),
ForkName::Altair,
)
.unwrap_err(),
RPCError::SSZDecodeError(_)
@ -786,6 +879,7 @@ mod tests {
Protocol::MetaData,
Version::V1,
RPCCodedResponse::Success(RPCResponse::MetaData(metadata())),
ForkName::Base,
),
Ok(Some(RPCResponse::MetaData(metadata()))),
);
@ -795,6 +889,7 @@ mod tests {
Protocol::MetaData,
Version::V1,
RPCCodedResponse::Success(RPCResponse::MetaData(metadata())),
ForkName::Base,
),
Ok(Some(RPCResponse::MetaData(metadata()))),
);
@ -805,6 +900,7 @@ mod tests {
Protocol::MetaData,
Version::V1,
RPCCodedResponse::Success(RPCResponse::MetaData(metadata_v2())),
ForkName::Base,
),
Ok(Some(RPCResponse::MetaData(metadata()))),
);
@ -819,6 +915,7 @@ mod tests {
Protocol::Status,
Version::V2,
RPCCodedResponse::Success(RPCResponse::Status(status_message())),
ForkName::Base,
)
.unwrap_err(),
RPCError::ErrorResponse(RPCResponseErrorCode::InvalidRequest, _),
@ -832,6 +929,7 @@ mod tests {
Protocol::Ping,
Version::V2,
RPCCodedResponse::Success(RPCResponse::Pong(ping_message())),
ForkName::Base,
)
.unwrap_err(),
RPCError::ErrorResponse(RPCResponseErrorCode::InvalidRequest, _),
@ -843,7 +941,8 @@ mod tests {
encode_then_decode(
Protocol::BlocksByRange,
Version::V2,
RPCCodedResponse::Success(RPCResponse::BlocksByRange(Box::new(base_block())))
RPCCodedResponse::Success(RPCResponse::BlocksByRange(Box::new(base_block()))),
ForkName::Base,
),
Ok(Some(RPCResponse::BlocksByRange(Box::new(base_block()))))
);
@ -852,35 +951,104 @@ mod tests {
encode_then_decode(
Protocol::BlocksByRange,
Version::V2,
RPCCodedResponse::Success(RPCResponse::BlocksByRange(Box::new(altair_block())))
RPCCodedResponse::Success(RPCResponse::BlocksByRange(Box::new(altair_block()))),
ForkName::Altair,
),
Ok(Some(RPCResponse::BlocksByRange(Box::new(altair_block()))))
);
let merge_block_small = merge_block_small(&fork_context(ForkName::Merge));
let merge_block_large = merge_block_large(&fork_context(ForkName::Merge));
assert_eq!(
encode_then_decode(
Protocol::BlocksByRoot,
Protocol::BlocksByRange,
Version::V2,
RPCCodedResponse::Success(RPCResponse::BlocksByRoot(Box::new(base_block())))
RPCCodedResponse::Success(RPCResponse::BlocksByRange(Box::new(
merge_block_small.clone()
))),
ForkName::Merge,
),
Ok(Some(RPCResponse::BlocksByRoot(Box::new(base_block()))))
Ok(Some(RPCResponse::BlocksByRange(Box::new(
merge_block_small.clone()
))))
);
let mut encoded =
encode_without_length_checks(merge_block_large.as_ssz_bytes(), ForkName::Merge)
.unwrap();
assert!(
matches!(
decode(
Protocol::BlocksByRange,
Version::V2,
&mut encoded,
ForkName::Merge,
)
.unwrap_err(),
RPCError::InvalidData(_)
),
"Decoding a block larger than max_rpc_size should fail"
);
assert_eq!(
encode_then_decode(
Protocol::BlocksByRoot,
Version::V2,
RPCCodedResponse::Success(RPCResponse::BlocksByRoot(Box::new(altair_block())))
RPCCodedResponse::Success(RPCResponse::BlocksByRoot(Box::new(base_block()))),
ForkName::Base,
),
Ok(Some(RPCResponse::BlocksByRoot(Box::new(base_block())))),
);
assert_eq!(
encode_then_decode(
Protocol::BlocksByRoot,
Version::V2,
RPCCodedResponse::Success(RPCResponse::BlocksByRoot(Box::new(altair_block()))),
ForkName::Altair,
),
Ok(Some(RPCResponse::BlocksByRoot(Box::new(altair_block()))))
);
assert_eq!(
encode_then_decode(
Protocol::BlocksByRoot,
Version::V2,
RPCCodedResponse::Success(RPCResponse::BlocksByRoot(Box::new(
merge_block_small.clone()
))),
ForkName::Merge,
),
Ok(Some(RPCResponse::BlocksByRoot(Box::new(merge_block_small))))
);
let mut encoded =
encode_without_length_checks(merge_block_large.as_ssz_bytes(), ForkName::Merge)
.unwrap();
assert!(
matches!(
decode(
Protocol::BlocksByRoot,
Version::V2,
&mut encoded,
ForkName::Merge,
)
.unwrap_err(),
RPCError::InvalidData(_)
),
"Decoding a block larger than max_rpc_size should fail"
);
// A MetaDataV1 still encodes as a MetaDataV2 since version is Version::V2
assert_eq!(
encode_then_decode(
Protocol::MetaData,
Version::V2,
RPCCodedResponse::Success(RPCResponse::MetaData(metadata()))
RPCCodedResponse::Success(RPCResponse::MetaData(metadata())),
ForkName::Base,
),
Ok(Some(RPCResponse::MetaData(metadata_v2())))
);
@ -889,7 +1057,8 @@ mod tests {
encode_then_decode(
Protocol::MetaData,
Version::V2,
RPCCodedResponse::Success(RPCResponse::MetaData(metadata_v2()))
RPCCodedResponse::Success(RPCResponse::MetaData(metadata_v2())),
ForkName::Altair,
),
Ok(Some(RPCResponse::MetaData(metadata_v2())))
);
@ -898,20 +1067,27 @@ mod tests {
// Test RPCResponse encoding/decoding for V2 messages
#[test]
fn test_context_bytes_v2() {
let fork_context = fork_context();
let fork_context = fork_context(ForkName::Altair);
// Removing context bytes for v2 messages should error
let mut encoded_bytes = encode(
Protocol::BlocksByRange,
Version::V2,
RPCCodedResponse::Success(RPCResponse::BlocksByRange(Box::new(base_block()))),
ForkName::Base,
)
.unwrap();
let _ = encoded_bytes.split_to(4);
assert!(matches!(
decode(Protocol::BlocksByRange, Version::V2, &mut encoded_bytes).unwrap_err(),
decode(
Protocol::BlocksByRange,
Version::V2,
&mut encoded_bytes,
ForkName::Base
)
.unwrap_err(),
RPCError::ErrorResponse(RPCResponseErrorCode::InvalidRequest, _),
));
@ -919,13 +1095,20 @@ mod tests {
Protocol::BlocksByRoot,
Version::V2,
RPCCodedResponse::Success(RPCResponse::BlocksByRoot(Box::new(base_block()))),
ForkName::Base,
)
.unwrap();
let _ = encoded_bytes.split_to(4);
assert!(matches!(
decode(Protocol::BlocksByRange, Version::V2, &mut encoded_bytes).unwrap_err(),
decode(
Protocol::BlocksByRange,
Version::V2,
&mut encoded_bytes,
ForkName::Base
)
.unwrap_err(),
RPCError::ErrorResponse(RPCResponseErrorCode::InvalidRequest, _),
));
@ -934,6 +1117,7 @@ mod tests {
Protocol::BlocksByRange,
Version::V2,
RPCCodedResponse::Success(RPCResponse::BlocksByRange(Box::new(base_block()))),
ForkName::Altair,
)
.unwrap();
@ -943,7 +1127,13 @@ mod tests {
wrong_fork_bytes.extend_from_slice(&encoded_bytes.split_off(4));
assert!(matches!(
decode(Protocol::BlocksByRange, Version::V2, &mut wrong_fork_bytes).unwrap_err(),
decode(
Protocol::BlocksByRange,
Version::V2,
&mut wrong_fork_bytes,
ForkName::Altair
)
.unwrap_err(),
RPCError::SSZDecodeError(_),
));
@ -952,6 +1142,7 @@ mod tests {
Protocol::BlocksByRoot,
Version::V2,
RPCCodedResponse::Success(RPCResponse::BlocksByRoot(Box::new(altair_block()))),
ForkName::Altair,
)
.unwrap();
@ -960,7 +1151,13 @@ mod tests {
wrong_fork_bytes.extend_from_slice(&encoded_bytes.split_off(4));
assert!(matches!(
decode(Protocol::BlocksByRange, Version::V2, &mut wrong_fork_bytes).unwrap_err(),
decode(
Protocol::BlocksByRange,
Version::V2,
&mut wrong_fork_bytes,
ForkName::Altair
)
.unwrap_err(),
RPCError::SSZDecodeError(_),
));
@ -972,17 +1169,25 @@ mod tests {
Protocol::MetaData,
Version::V2,
RPCCodedResponse::Success(RPCResponse::MetaData(metadata())),
ForkName::Altair,
)
.unwrap(),
);
assert!(decode(Protocol::MetaData, Version::V2, &mut encoded_bytes).is_err());
assert!(decode(
Protocol::MetaData,
Version::V2,
&mut encoded_bytes,
ForkName::Altair
)
.is_err());
// Sending context bytes which do not correspond to any fork should return an error
let mut encoded_bytes = encode(
Protocol::BlocksByRoot,
Version::V2,
RPCCodedResponse::Success(RPCResponse::BlocksByRoot(Box::new(base_block()))),
ForkName::Altair,
)
.unwrap();
@ -991,7 +1196,13 @@ mod tests {
wrong_fork_bytes.extend_from_slice(&encoded_bytes.split_off(4));
assert!(matches!(
decode(Protocol::BlocksByRange, Version::V2, &mut wrong_fork_bytes).unwrap_err(),
decode(
Protocol::BlocksByRange,
Version::V2,
&mut wrong_fork_bytes,
ForkName::Altair
)
.unwrap_err(),
RPCError::ErrorResponse(RPCResponseErrorCode::InvalidRequest, _),
));
@ -1000,13 +1211,19 @@ mod tests {
Protocol::BlocksByRoot,
Version::V2,
RPCCodedResponse::Success(RPCResponse::BlocksByRoot(Box::new(base_block()))),
ForkName::Altair,
)
.unwrap();
let mut part = encoded_bytes.split_to(3);
assert_eq!(
decode(Protocol::BlocksByRange, Version::V2, &mut part),
decode(
Protocol::BlocksByRange,
Version::V2,
&mut part,
ForkName::Altair
),
Ok(None)
)
}
@ -1061,17 +1278,17 @@ mod tests {
dst.extend_from_slice(writer.get_ref());
// 10 (for stream identifier) + 80 + 42 = 132 > `max_compressed_len`. Hence, decoding should fail with `InvalidData`.
assert_eq!(
decode(Protocol::Status, Version::V1, &mut dst).unwrap_err(),
RPCError::InvalidData
);
assert!(matches!(
decode(Protocol::Status, Version::V1, &mut dst, ForkName::Base).unwrap_err(),
RPCError::InvalidData(_)
));
}
/// Test a malicious snappy encoding for a V2 `BlocksByRange` message where the attacker
/// sends a valid message filled with a stream of useless padding before the actual message.
#[test]
fn test_decode_malicious_v2_message() {
let fork_context = Arc::new(fork_context());
let fork_context = Arc::new(fork_context(ForkName::Altair));
// 10 byte snappy stream identifier
let stream_identifier: &'static [u8] = b"\xFF\x06\x00\x00sNaPpY";
@ -1118,10 +1335,16 @@ mod tests {
dst.extend_from_slice(writer.get_ref());
// 10 (for stream identifier) + 176156 + 8103 = 184269 > `max_compressed_len`. Hence, decoding should fail with `InvalidData`.
assert_eq!(
decode(Protocol::BlocksByRange, Version::V2, &mut dst).unwrap_err(),
RPCError::InvalidData
);
assert!(matches!(
decode(
Protocol::BlocksByRange,
Version::V2,
&mut dst,
ForkName::Altair
)
.unwrap_err(),
RPCError::InvalidData(_)
));
}
/// Test sending a message with encoded length prefix > max_rpc_size.
@ -1157,9 +1380,9 @@ mod tests {
writer.flush().unwrap();
dst.extend_from_slice(writer.get_ref());
assert_eq!(
decode(Protocol::Status, Version::V1, &mut dst).unwrap_err(),
RPCError::InvalidData
);
assert!(matches!(
decode(Protocol::Status, Version::V1, &mut dst, ForkName::Base).unwrap_err(),
RPCError::InvalidData(_)
));
}
}

View File

@ -477,7 +477,7 @@ where
ProtocolError::InvalidMessage | ProtocolError::TooManyProtocols => {
// Peer is sending invalid data during the negotiation phase, not
// participating in the protocol
RPCError::InvalidData
RPCError::InvalidData("Invalid message during negotiation".to_string())
}
},
};

View File

@ -63,7 +63,13 @@ lazy_static! {
/// The `BeaconBlockMerge` block has an `ExecutionPayload` field which has a max size ~16 GiB for future proofing.
/// We calculate the value from its fields instead of constructing the block and checking the length.
pub static ref SIGNED_BEACON_BLOCK_MERGE_MAX: usize = types::ExecutionPayload::<MainnetEthSpec>::max_execution_payload_size();
/// Note: This is only the theoretical upper bound. We further bound the max size we receive over the network
/// with `MAX_RPC_SIZE_POST_MERGE`.
pub static ref SIGNED_BEACON_BLOCK_MERGE_MAX: usize =
// Size of a full altair block
*SIGNED_BEACON_BLOCK_ALTAIR_MAX
+ types::ExecutionPayload::<MainnetEthSpec>::max_execution_payload_size() // adding max size of execution payload (~16gb)
+ ssz::BYTES_PER_LENGTH_OFFSET; // Adding the additional ssz offset for the `ExecutionPayload` field
pub static ref BLOCKS_BY_ROOT_REQUEST_MIN: usize =
VariableList::<Hash256, MaxRequestBlocks>::from(Vec::<Hash256>::new())
@ -106,10 +112,9 @@ const REQUEST_TIMEOUT: u64 = 15;
/// Returns the maximum bytes that can be sent across the RPC.
pub fn max_rpc_size(fork_context: &ForkContext) -> usize {
if fork_context.fork_exists(ForkName::Merge) {
MAX_RPC_SIZE_POST_MERGE
} else {
MAX_RPC_SIZE
match fork_context.current_fork() {
ForkName::Merge => MAX_RPC_SIZE_POST_MERGE,
ForkName::Altair | ForkName::Base => MAX_RPC_SIZE,
}
}
@ -269,39 +274,39 @@ impl ProtocolId {
}
/// Returns min and max size for messages of given protocol id responses.
pub fn rpc_response_limits<T: EthSpec>(&self) -> RpcLimits {
pub fn rpc_response_limits<T: EthSpec>(&self, fork_context: &ForkContext) -> RpcLimits {
match self.message_name {
Protocol::Status => RpcLimits::new(
<StatusMessage as Encode>::ssz_fixed_len(),
<StatusMessage as Encode>::ssz_fixed_len(),
),
Protocol::Goodbye => RpcLimits::new(0, 0), // Goodbye request has no response
Protocol::BlocksByRange => RpcLimits::new(
std::cmp::min(
std::cmp::min(
*SIGNED_BEACON_BLOCK_ALTAIR_MIN,
*SIGNED_BEACON_BLOCK_BASE_MIN,
),
*SIGNED_BEACON_BLOCK_MERGE_MIN,
Protocol::BlocksByRange => match fork_context.current_fork() {
ForkName::Base => {
RpcLimits::new(*SIGNED_BEACON_BLOCK_BASE_MIN, *SIGNED_BEACON_BLOCK_BASE_MAX)
}
ForkName::Altair => RpcLimits::new(
*SIGNED_BEACON_BLOCK_ALTAIR_MIN,
*SIGNED_BEACON_BLOCK_ALTAIR_MAX,
),
std::cmp::max(
std::cmp::max(
*SIGNED_BEACON_BLOCK_ALTAIR_MAX,
*SIGNED_BEACON_BLOCK_BASE_MAX,
),
ForkName::Merge => RpcLimits::new(
*SIGNED_BEACON_BLOCK_MERGE_MIN,
*SIGNED_BEACON_BLOCK_MERGE_MAX,
),
),
Protocol::BlocksByRoot => RpcLimits::new(
std::cmp::min(
},
Protocol::BlocksByRoot => match fork_context.current_fork() {
ForkName::Base => {
RpcLimits::new(*SIGNED_BEACON_BLOCK_BASE_MIN, *SIGNED_BEACON_BLOCK_BASE_MAX)
}
ForkName::Altair => RpcLimits::new(
*SIGNED_BEACON_BLOCK_ALTAIR_MIN,
*SIGNED_BEACON_BLOCK_BASE_MIN,
),
std::cmp::max(
*SIGNED_BEACON_BLOCK_ALTAIR_MAX,
*SIGNED_BEACON_BLOCK_BASE_MAX,
),
),
ForkName::Merge => RpcLimits::new(
*SIGNED_BEACON_BLOCK_MERGE_MIN,
*SIGNED_BEACON_BLOCK_MERGE_MAX,
),
},
Protocol::Ping => RpcLimits::new(
<Ping as Encode>::ssz_fixed_len(),
@ -528,7 +533,7 @@ pub enum RPCError {
/// Stream ended unexpectedly.
IncompleteStream,
/// Peer sent invalid data.
InvalidData,
InvalidData(String),
/// An error occurred due to internal reasons. Ex: timer failure.
InternalError(&'static str),
/// Negotiation with this peer timed out.
@ -562,7 +567,7 @@ impl std::fmt::Display for RPCError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match *self {
RPCError::SSZDecodeError(ref err) => write!(f, "Error while decoding ssz: {:?}", err),
RPCError::InvalidData => write!(f, "Peer sent unexpected data"),
RPCError::InvalidData(ref err) => write!(f, "Peer sent unexpected data: {}", err),
RPCError::IoError(ref err) => write!(f, "IO Error: {}", err),
RPCError::ErrorResponse(ref code, ref reason) => write!(
f,
@ -589,7 +594,7 @@ impl std::error::Error for RPCError {
RPCError::StreamTimeout => None,
RPCError::UnsupportedProtocol => None,
RPCError::IncompleteStream => None,
RPCError::InvalidData => None,
RPCError::InvalidData(_) => None,
RPCError::InternalError(_) => None,
RPCError::ErrorResponse(_, _) => None,
RPCError::NegotiationTimeout => None,

View File

@ -10,7 +10,9 @@ use std::sync::Arc;
use std::sync::Weak;
use std::time::Duration;
use tokio::runtime::Runtime;
use types::{ChainSpec, EnrForkId, EthSpec, ForkContext, Hash256, MinimalEthSpec};
use types::{
ChainSpec, EnrForkId, Epoch, EthSpec, ForkContext, ForkName, Hash256, MinimalEthSpec, Slot,
};
use unused_port::unused_tcp_port;
#[allow(clippy::type_complexity)]
@ -26,13 +28,20 @@ type ReqId = usize;
use tempfile::Builder as TempBuilder;
/// Returns a dummy fork context
pub fn fork_context() -> ForkContext {
pub fn fork_context(fork_name: ForkName) -> ForkContext {
let mut chain_spec = E::default_spec();
// Set fork_epoch to `Some` to ensure that the `ForkContext` object
// includes altair in the list of forks
chain_spec.altair_fork_epoch = Some(types::Epoch::new(42));
chain_spec.bellatrix_fork_epoch = Some(types::Epoch::new(84));
ForkContext::new::<E>(types::Slot::new(0), Hash256::zero(), &chain_spec)
let altair_fork_epoch = Epoch::new(1);
let merge_fork_epoch = Epoch::new(2);
chain_spec.altair_fork_epoch = Some(altair_fork_epoch);
chain_spec.bellatrix_fork_epoch = Some(merge_fork_epoch);
let current_slot = match fork_name {
ForkName::Base => Slot::new(0),
ForkName::Altair => altair_fork_epoch.start_slot(E::slots_per_epoch()),
ForkName::Merge => merge_fork_epoch.start_slot(E::slots_per_epoch()),
};
ForkContext::new::<E>(current_slot, Hash256::zero(), &chain_spec)
}
pub struct Libp2pInstance(LibP2PService<ReqId, E>, exit_future::Signal);
@ -90,6 +99,7 @@ pub async fn build_libp2p_instance(
rt: Weak<Runtime>,
boot_nodes: Vec<Enr>,
log: slog::Logger,
fork_name: ForkName,
) -> Libp2pInstance {
let port = unused_tcp_port().unwrap();
let config = build_config(port, boot_nodes);
@ -101,7 +111,7 @@ pub async fn build_libp2p_instance(
let libp2p_context = lighthouse_network::Context {
config: &config,
enr_fork_id: EnrForkId::default(),
fork_context: Arc::new(fork_context()),
fork_context: Arc::new(fork_context(fork_name)),
chain_spec: &ChainSpec::minimal(),
gossipsub_registry: None,
};
@ -125,10 +135,11 @@ pub async fn build_full_mesh(
rt: Weak<Runtime>,
log: slog::Logger,
n: usize,
fork_name: ForkName,
) -> Vec<Libp2pInstance> {
let mut nodes = Vec::with_capacity(n);
for _ in 0..n {
nodes.push(build_libp2p_instance(rt.clone(), vec![], log.clone()).await);
nodes.push(build_libp2p_instance(rt.clone(), vec![], log.clone(), fork_name).await);
}
let multiaddrs: Vec<Multiaddr> = nodes
.iter()
@ -154,12 +165,13 @@ pub async fn build_full_mesh(
pub async fn build_node_pair(
rt: Weak<Runtime>,
log: &slog::Logger,
fork_name: ForkName,
) -> (Libp2pInstance, Libp2pInstance) {
let sender_log = log.new(o!("who" => "sender"));
let receiver_log = log.new(o!("who" => "receiver"));
let mut sender = build_libp2p_instance(rt.clone(), vec![], sender_log).await;
let mut receiver = build_libp2p_instance(rt, vec![], receiver_log).await;
let mut sender = build_libp2p_instance(rt.clone(), vec![], sender_log, fork_name).await;
let mut receiver = build_libp2p_instance(rt, vec![], receiver_log, fork_name).await;
let receiver_multiaddr = receiver.swarm.behaviour_mut().local_enr().multiaddr()[1].clone();
@ -198,10 +210,15 @@ pub async fn build_node_pair(
// Returns `n` peers in a linear topology
#[allow(dead_code)]
pub async fn build_linear(rt: Weak<Runtime>, log: slog::Logger, n: usize) -> Vec<Libp2pInstance> {
pub async fn build_linear(
rt: Weak<Runtime>,
log: slog::Logger,
n: usize,
fork_name: ForkName,
) -> Vec<Libp2pInstance> {
let mut nodes = Vec::with_capacity(n);
for _ in 0..n {
nodes.push(build_libp2p_instance(rt.clone(), vec![], log.clone()).await);
nodes.push(build_libp2p_instance(rt.clone(), vec![], log.clone(), fork_name).await);
}
let multiaddrs: Vec<Multiaddr> = nodes

View File

@ -12,7 +12,7 @@ use tokio::runtime::Runtime;
use tokio::time::sleep;
use types::{
BeaconBlock, BeaconBlockAltair, BeaconBlockBase, BeaconBlockMerge, Epoch, EthSpec, ForkContext,
Hash256, MinimalEthSpec, Signature, SignedBeaconBlock, Slot,
ForkName, Hash256, MinimalEthSpec, Signature, SignedBeaconBlock, Slot,
};
mod common;
@ -23,7 +23,7 @@ type E = MinimalEthSpec;
fn merge_block_small(fork_context: &ForkContext) -> BeaconBlock<E> {
let mut block = BeaconBlockMerge::<E>::empty(&E::default_spec());
let tx = VariableList::from(vec![0; 1024]);
let txs = VariableList::from(std::iter::repeat(tx).take(100).collect::<Vec<_>>());
let txs = VariableList::from(std::iter::repeat(tx).take(5000).collect::<Vec<_>>());
block.body.execution_payload.execution_payload.transactions = txs;
@ -61,7 +61,8 @@ fn test_status_rpc() {
rt.block_on(async {
// get sender/receiver
let (mut sender, mut receiver) = common::build_node_pair(Arc::downgrade(&rt), &log).await;
let (mut sender, mut receiver) =
common::build_node_pair(Arc::downgrade(&rt), &log, ForkName::Base).await;
// Dummy STATUS RPC message
let rpc_request = Request::Status(StatusMessage {
@ -159,7 +160,8 @@ fn test_blocks_by_range_chunked_rpc() {
rt.block_on(async {
// get sender/receiver
let (mut sender, mut receiver) = common::build_node_pair(Arc::downgrade(&rt), &log).await;
let (mut sender, mut receiver) =
common::build_node_pair(Arc::downgrade(&rt), &log, ForkName::Merge).await;
// BlocksByRange Request
let rpc_request = Request::BlocksByRange(BlocksByRangeRequest {
@ -179,7 +181,7 @@ fn test_blocks_by_range_chunked_rpc() {
let signed_full_block = SignedBeaconBlock::from_block(full_block, Signature::empty());
let rpc_response_altair = Response::BlocksByRange(Some(Box::new(signed_full_block)));
let full_block = merge_block_small(&common::fork_context());
let full_block = merge_block_small(&common::fork_context(ForkName::Merge));
let signed_full_block = SignedBeaconBlock::from_block(full_block, Signature::empty());
let rpc_response_merge_small = Response::BlocksByRange(Some(Box::new(signed_full_block)));
@ -298,7 +300,8 @@ fn test_blocks_by_range_over_limit() {
rt.block_on(async {
// get sender/receiver
let (mut sender, mut receiver) = common::build_node_pair(Arc::downgrade(&rt), &log).await;
let (mut sender, mut receiver) =
common::build_node_pair(Arc::downgrade(&rt), &log, ForkName::Merge).await;
// BlocksByRange Request
let rpc_request = Request::BlocksByRange(BlocksByRangeRequest {
@ -308,7 +311,7 @@ fn test_blocks_by_range_over_limit() {
});
// BlocksByRange Response
let full_block = merge_block_large(&common::fork_context());
let full_block = merge_block_large(&common::fork_context(ForkName::Merge));
let signed_full_block = SignedBeaconBlock::from_block(full_block, Signature::empty());
let rpc_response_merge_large = Response::BlocksByRange(Some(Box::new(signed_full_block)));
@ -395,7 +398,8 @@ fn test_blocks_by_range_chunked_rpc_terminates_correctly() {
rt.block_on(async {
// get sender/receiver
let (mut sender, mut receiver) = common::build_node_pair(Arc::downgrade(&rt), &log).await;
let (mut sender, mut receiver) =
common::build_node_pair(Arc::downgrade(&rt), &log, ForkName::Base).await;
// BlocksByRange Request
let rpc_request = Request::BlocksByRange(BlocksByRangeRequest {
@ -526,7 +530,8 @@ fn test_blocks_by_range_single_empty_rpc() {
rt.block_on(async {
// get sender/receiver
let (mut sender, mut receiver) = common::build_node_pair(Arc::downgrade(&rt), &log).await;
let (mut sender, mut receiver) =
common::build_node_pair(Arc::downgrade(&rt), &log, ForkName::Base).await;
// BlocksByRange Request
let rpc_request = Request::BlocksByRange(BlocksByRangeRequest {
@ -641,7 +646,8 @@ fn test_blocks_by_root_chunked_rpc() {
let rt = Arc::new(Runtime::new().unwrap());
// get sender/receiver
rt.block_on(async {
let (mut sender, mut receiver) = common::build_node_pair(Arc::downgrade(&rt), &log).await;
let (mut sender, mut receiver) =
common::build_node_pair(Arc::downgrade(&rt), &log, ForkName::Merge).await;
// BlocksByRoot Request
let rpc_request = Request::BlocksByRoot(BlocksByRootRequest {
@ -664,7 +670,7 @@ fn test_blocks_by_root_chunked_rpc() {
let signed_full_block = SignedBeaconBlock::from_block(full_block, Signature::empty());
let rpc_response_altair = Response::BlocksByRoot(Some(Box::new(signed_full_block)));
let full_block = merge_block_small(&common::fork_context());
let full_block = merge_block_small(&common::fork_context(ForkName::Merge));
let signed_full_block = SignedBeaconBlock::from_block(full_block, Signature::empty());
let rpc_response_merge_small = Response::BlocksByRoot(Some(Box::new(signed_full_block)));
@ -779,7 +785,8 @@ fn test_blocks_by_root_chunked_rpc_terminates_correctly() {
let rt = Arc::new(Runtime::new().unwrap());
// get sender/receiver
rt.block_on(async {
let (mut sender, mut receiver) = common::build_node_pair(Arc::downgrade(&rt), &log).await;
let (mut sender, mut receiver) =
common::build_node_pair(Arc::downgrade(&rt), &log, ForkName::Base).await;
// BlocksByRoot Request
let rpc_request = Request::BlocksByRoot(BlocksByRootRequest {
@ -916,7 +923,8 @@ fn test_goodbye_rpc() {
let rt = Arc::new(Runtime::new().unwrap());
// get sender/receiver
rt.block_on(async {
let (mut sender, mut receiver) = common::build_node_pair(Arc::downgrade(&rt), &log).await;
let (mut sender, mut receiver) =
common::build_node_pair(Arc::downgrade(&rt), &log, ForkName::Base).await;
// build the sender future
let sender_future = async {