diff --git a/beacon_node/lighthouse_network/src/peer_manager/mod.rs b/beacon_node/lighthouse_network/src/peer_manager/mod.rs index cf31cee02..9c881e062 100644 --- a/beacon_node/lighthouse_network/src/peer_manager/mod.rs +++ b/beacon_node/lighthouse_network/src/peer_manager/mod.rs @@ -459,7 +459,7 @@ impl PeerManager { // Our fault. Do nothing return; } - RPCError::InvalidData => { + RPCError::InvalidData(_) => { // Peer is not complying with the protocol. This is considered a malicious action PeerAction::Fatal } diff --git a/beacon_node/lighthouse_network/src/rpc/codec/base.rs b/beacon_node/lighthouse_network/src/rpc/codec/base.rs index eca057878..53f85d9a7 100644 --- a/beacon_node/lighthouse_network/src/rpc/codec/base.rs +++ b/beacon_node/lighthouse_network/src/rpc/codec/base.rs @@ -184,13 +184,25 @@ mod tests { use crate::rpc::protocol::*; use std::sync::Arc; - use types::{ForkContext, Hash256}; + use types::{Epoch, ForkContext, ForkName, Hash256, Slot}; use unsigned_varint::codec::Uvi; type Spec = types::MainnetEthSpec; - fn fork_context() -> ForkContext { - ForkContext::new::(types::Slot::new(0), Hash256::zero(), &Spec::default_spec()) + fn fork_context(fork_name: ForkName) -> ForkContext { + let mut chain_spec = Spec::default_spec(); + let altair_fork_epoch = Epoch::new(1); + let merge_fork_epoch = Epoch::new(2); + + chain_spec.altair_fork_epoch = Some(altair_fork_epoch); + chain_spec.bellatrix_fork_epoch = Some(merge_fork_epoch); + + let current_slot = match fork_name { + ForkName::Base => Slot::new(0), + ForkName::Altair => altair_fork_epoch.start_slot(Spec::slots_per_epoch()), + ForkName::Merge => merge_fork_epoch.start_slot(Spec::slots_per_epoch()), + }; + ForkContext::new::(current_slot, Hash256::zero(), &chain_spec) } #[test] @@ -202,9 +214,12 @@ mod tests { let snappy_protocol_id = ProtocolId::new(Protocol::Status, Version::V1, Encoding::SSZSnappy); - let fork_context = Arc::new(fork_context()); - let mut snappy_outbound_codec = - SSZSnappyOutboundCodec::::new(snappy_protocol_id, 1_048_576, fork_context); + let fork_context = Arc::new(fork_context(ForkName::Base)); + let mut snappy_outbound_codec = SSZSnappyOutboundCodec::::new( + snappy_protocol_id, + max_rpc_size(&fork_context), + fork_context, + ); // remove response code let mut snappy_buf = buf.clone(); @@ -234,9 +249,12 @@ mod tests { let snappy_protocol_id = ProtocolId::new(Protocol::Status, Version::V1, Encoding::SSZSnappy); - let fork_context = Arc::new(fork_context()); - let mut snappy_outbound_codec = - SSZSnappyOutboundCodec::::new(snappy_protocol_id, 1_048_576, fork_context); + let fork_context = Arc::new(fork_context(ForkName::Base)); + let mut snappy_outbound_codec = SSZSnappyOutboundCodec::::new( + snappy_protocol_id, + max_rpc_size(&fork_context), + fork_context, + ); let snappy_decoded_message = snappy_outbound_codec.decode(&mut dst).unwrap_err(); @@ -260,36 +278,50 @@ mod tests { ProtocolId::new(Protocol::BlocksByRange, Version::V1, Encoding::SSZSnappy); // Response limits - let limit = protocol_id.rpc_response_limits::(); + let fork_context = Arc::new(fork_context(ForkName::Base)); + let max_rpc_size = max_rpc_size(&fork_context); + let limit = protocol_id.rpc_response_limits::(&fork_context); let mut max = encode_len(limit.max + 1); - let fork_context = Arc::new(fork_context()); let mut codec = SSZSnappyOutboundCodec::::new( protocol_id.clone(), - 1_048_576, + max_rpc_size, fork_context.clone(), ); - assert_eq!(codec.decode(&mut max).unwrap_err(), RPCError::InvalidData); + assert!(matches!( + codec.decode(&mut max).unwrap_err(), + RPCError::InvalidData(_) + )); let mut min = encode_len(limit.min - 1); let mut codec = SSZSnappyOutboundCodec::::new( protocol_id.clone(), - 1_048_576, + max_rpc_size, fork_context.clone(), ); - assert_eq!(codec.decode(&mut min).unwrap_err(), RPCError::InvalidData); + assert!(matches!( + codec.decode(&mut min).unwrap_err(), + RPCError::InvalidData(_) + )); // Request limits let limit = protocol_id.rpc_request_limits(); let mut max = encode_len(limit.max + 1); let mut codec = SSZSnappyOutboundCodec::::new( protocol_id.clone(), - 1_048_576, + max_rpc_size, fork_context.clone(), ); - assert_eq!(codec.decode(&mut max).unwrap_err(), RPCError::InvalidData); + assert!(matches!( + codec.decode(&mut max).unwrap_err(), + RPCError::InvalidData(_) + )); let mut min = encode_len(limit.min - 1); - let mut codec = SSZSnappyOutboundCodec::::new(protocol_id, 1_048_576, fork_context); - assert_eq!(codec.decode(&mut min).unwrap_err(), RPCError::InvalidData); + let mut codec = + SSZSnappyOutboundCodec::::new(protocol_id, max_rpc_size, fork_context); + assert!(matches!( + codec.decode(&mut min).unwrap_err(), + RPCError::InvalidData(_) + )); } } diff --git a/beacon_node/lighthouse_network/src/rpc/codec/ssz_snappy.rs b/beacon_node/lighthouse_network/src/rpc/codec/ssz_snappy.rs index 0924dca0c..188ae59b6 100644 --- a/beacon_node/lighthouse_network/src/rpc/codec/ssz_snappy.rs +++ b/beacon_node/lighthouse_network/src/rpc/codec/ssz_snappy.rs @@ -146,7 +146,10 @@ impl Decoder for SSZSnappyInboundCodec { // packet size for ssz container corresponding to `self.protocol`. let ssz_limits = self.protocol.rpc_request_limits(); if ssz_limits.is_out_of_bounds(length, self.max_packet_size) { - return Err(RPCError::InvalidData); + return Err(RPCError::InvalidData(format!( + "RPC request length is out of bounds, length {}", + length + ))); } // Calculate worst case compression length for given uncompressed length let max_compressed_len = snap::raw::max_compress_len(length) as u64; @@ -279,9 +282,14 @@ impl Decoder for SSZSnappyOutboundCodec { // Should not attempt to decode rpc chunks with `length > max_packet_size` or not within bounds of // packet size for ssz container corresponding to `self.protocol`. - let ssz_limits = self.protocol.rpc_response_limits::(); + let ssz_limits = self + .protocol + .rpc_response_limits::(&self.fork_context); if ssz_limits.is_out_of_bounds(length, self.max_packet_size) { - return Err(RPCError::InvalidData); + return Err(RPCError::InvalidData(format!( + "RPC response length is out of bounds, length {}", + length + ))); } // Calculate worst case compression length for given uncompressed length let max_compressed_len = snap::raw::max_compress_len(length) as u64; @@ -327,7 +335,10 @@ impl OutboundCodec> for SSZSnappyOutbound // Should not attempt to decode rpc chunks with `length > max_packet_size` or not within bounds of // packet size for ssz container corresponding to `ErrorType`. if length > self.max_packet_size || length > *ERROR_TYPE_MAX || length < *ERROR_TYPE_MIN { - return Err(RPCError::InvalidData); + return Err(RPCError::InvalidData(format!( + "RPC Error length is out of bounds, length {}", + length + ))); } // Calculate worst case compression length for given uncompressed length @@ -364,7 +375,10 @@ fn handle_error( // If snappy has read `max_compressed_len` from underlying stream and still can't fill buffer, we have a malicious message. // Report as `InvalidData` so that malicious peer gets banned. if num_bytes >= max_compressed_len { - Err(RPCError::InvalidData) + Err(RPCError::InvalidData(format!( + "Received malicious snappy message, num_bytes {}, max_compressed_len {}", + num_bytes, max_compressed_len + ))) } else { // Haven't received enough bytes to decode yet, wait for more Ok(None) @@ -460,7 +474,9 @@ fn handle_v1_request( // Handle this case just for completeness. Protocol::MetaData => { if !decoded_buffer.is_empty() { - Err(RPCError::InvalidData) + Err(RPCError::InternalError( + "Metadata requests shouldn't reach decoder", + )) } else { Ok(Some(InboundRequest::MetaData(PhantomData))) } @@ -486,7 +502,7 @@ fn handle_v2_request( // Handle this case just for completeness. Protocol::MetaData => { if !decoded_buffer.is_empty() { - Err(RPCError::InvalidData) + Err(RPCError::InvalidData("Metadata request".to_string())) } else { Ok(Some(InboundRequest::MetaData(PhantomData))) } @@ -510,7 +526,9 @@ fn handle_v1_response( decoded_buffer, )?))), // This case should be unreachable as `Goodbye` has no response. - Protocol::Goodbye => Err(RPCError::InvalidData), + Protocol::Goodbye => Err(RPCError::InvalidData( + "Goodbye RPC message has no valid response".to_string(), + )), Protocol::BlocksByRange => Ok(Some(RPCResponse::BlocksByRange(Box::new( SignedBeaconBlock::Base(SignedBeaconBlockBase::from_ssz_bytes(decoded_buffer)?), )))), @@ -615,8 +633,8 @@ mod tests { }; use std::sync::Arc; use types::{ - BeaconBlock, BeaconBlockAltair, BeaconBlockBase, Epoch, ForkContext, Hash256, Signature, - SignedBeaconBlock, Slot, + BeaconBlock, BeaconBlockAltair, BeaconBlockBase, BeaconBlockMerge, Epoch, ForkContext, + FullPayload, Hash256, Signature, SignedBeaconBlock, Slot, }; use snap::write::FrameEncoder; @@ -625,12 +643,20 @@ mod tests { type Spec = types::MainnetEthSpec; - fn fork_context() -> ForkContext { + fn fork_context(fork_name: ForkName) -> ForkContext { let mut chain_spec = Spec::default_spec(); - // Set fork_epoch to `Some` to ensure that the `ForkContext` object - // includes altair in the list of forks - chain_spec.altair_fork_epoch = Some(types::Epoch::new(42)); - ForkContext::new::(types::Slot::new(0), Hash256::zero(), &chain_spec) + let altair_fork_epoch = Epoch::new(1); + let merge_fork_epoch = Epoch::new(2); + + chain_spec.altair_fork_epoch = Some(altair_fork_epoch); + chain_spec.bellatrix_fork_epoch = Some(merge_fork_epoch); + + let current_slot = match fork_name { + ForkName::Base => Slot::new(0), + ForkName::Altair => altair_fork_epoch.start_slot(Spec::slots_per_epoch()), + ForkName::Merge => merge_fork_epoch.start_slot(Spec::slots_per_epoch()), + }; + ForkContext::new::(current_slot, Hash256::zero(), &chain_spec) } fn base_block() -> SignedBeaconBlock { @@ -644,6 +670,36 @@ mod tests { SignedBeaconBlock::from_block(full_block, Signature::empty()) } + /// Merge block with length < max_rpc_size. + fn merge_block_small(fork_context: &ForkContext) -> SignedBeaconBlock { + let mut block: BeaconBlockMerge<_, FullPayload> = + BeaconBlockMerge::empty(&Spec::default_spec()); + let tx = VariableList::from(vec![0; 1024]); + let txs = VariableList::from(std::iter::repeat(tx).take(5000).collect::>()); + + block.body.execution_payload.execution_payload.transactions = txs; + + let block = BeaconBlock::Merge(block); + assert!(block.ssz_bytes_len() <= max_rpc_size(fork_context)); + SignedBeaconBlock::from_block(block, Signature::empty()) + } + + /// Merge block with length > MAX_RPC_SIZE. + /// The max limit for a merge block is in the order of ~16GiB which wouldn't fit in memory. + /// Hence, we generate a merge block just greater than `MAX_RPC_SIZE` to test rejection on the rpc layer. + fn merge_block_large(fork_context: &ForkContext) -> SignedBeaconBlock { + let mut block: BeaconBlockMerge<_, FullPayload> = + BeaconBlockMerge::empty(&Spec::default_spec()); + let tx = VariableList::from(vec![0; 1024]); + let txs = VariableList::from(std::iter::repeat(tx).take(100000).collect::>()); + + block.body.execution_payload.execution_payload.transactions = txs; + + let block = BeaconBlock::Merge(block); + assert!(block.ssz_bytes_len() > max_rpc_size(fork_context)); + SignedBeaconBlock::from_block(block, Signature::empty()) + } + fn status_message() -> StatusMessage { StatusMessage { fork_digest: [0; 4], @@ -678,10 +734,11 @@ mod tests { protocol: Protocol, version: Version, message: RPCCodedResponse, + fork_name: ForkName, ) -> Result { - let max_packet_size = 1_048_576; let snappy_protocol_id = ProtocolId::new(protocol, version, Encoding::SSZSnappy); - let fork_context = Arc::new(fork_context()); + let fork_context = Arc::new(fork_context(fork_name)); + let max_packet_size = max_rpc_size(&fork_context); let mut buf = BytesMut::new(); let mut snappy_inbound_codec = @@ -691,14 +748,43 @@ mod tests { Ok(buf) } + fn encode_without_length_checks( + bytes: Vec, + fork_name: ForkName, + ) -> Result { + let fork_context = fork_context(fork_name); + let mut dst = BytesMut::new(); + + // Add context bytes if required + dst.extend_from_slice(&fork_context.to_context_bytes(fork_name).unwrap()); + + let mut uvi_codec: Uvi = Uvi::default(); + + // Inserts the length prefix of the uncompressed bytes into dst + // encoded as a unsigned varint + uvi_codec + .encode(bytes.len(), &mut dst) + .map_err(RPCError::from)?; + + let mut writer = FrameEncoder::new(Vec::new()); + writer.write_all(&bytes).map_err(RPCError::from)?; + writer.flush().map_err(RPCError::from)?; + + // Write compressed bytes to `dst` + dst.extend_from_slice(writer.get_ref()); + + Ok(dst) + } + /// Attempts to decode the given protocol bytes as an rpc response fn decode( protocol: Protocol, version: Version, message: &mut BytesMut, + fork_name: ForkName, ) -> Result>, RPCError> { let snappy_protocol_id = ProtocolId::new(protocol, version, Encoding::SSZSnappy); - let fork_context = Arc::new(fork_context()); + let fork_context = Arc::new(fork_context(fork_name)); let max_packet_size = max_rpc_size(&fork_context); let mut snappy_outbound_codec = SSZSnappyOutboundCodec::::new(snappy_protocol_id, max_packet_size, fork_context); @@ -711,9 +797,10 @@ mod tests { protocol: Protocol, version: Version, message: RPCCodedResponse, + fork_name: ForkName, ) -> Result>, RPCError> { - let mut encoded = encode(protocol, version.clone(), message)?; - decode(protocol, version, &mut encoded) + let mut encoded = encode(protocol, version.clone(), message, fork_name)?; + decode(protocol, version, &mut encoded, fork_name) } // Test RPCResponse encoding/decoding for V1 messages @@ -723,7 +810,8 @@ mod tests { encode_then_decode( Protocol::Status, Version::V1, - RPCCodedResponse::Success(RPCResponse::Status(status_message())) + RPCCodedResponse::Success(RPCResponse::Status(status_message())), + ForkName::Base, ), Ok(Some(RPCResponse::Status(status_message()))) ); @@ -732,7 +820,8 @@ mod tests { encode_then_decode( Protocol::Ping, Version::V1, - RPCCodedResponse::Success(RPCResponse::Pong(ping_message())) + RPCCodedResponse::Success(RPCResponse::Pong(ping_message())), + ForkName::Base, ), Ok(Some(RPCResponse::Pong(ping_message()))) ); @@ -741,7 +830,8 @@ mod tests { encode_then_decode( Protocol::BlocksByRange, Version::V1, - RPCCodedResponse::Success(RPCResponse::BlocksByRange(Box::new(base_block()))) + RPCCodedResponse::Success(RPCResponse::BlocksByRange(Box::new(base_block()))), + ForkName::Base, ), Ok(Some(RPCResponse::BlocksByRange(Box::new(base_block())))) ); @@ -752,6 +842,7 @@ mod tests { Protocol::BlocksByRange, Version::V1, RPCCodedResponse::Success(RPCResponse::BlocksByRange(Box::new(altair_block()))), + ForkName::Altair, ) .unwrap_err(), RPCError::SSZDecodeError(_) @@ -763,7 +854,8 @@ mod tests { encode_then_decode( Protocol::BlocksByRoot, Version::V1, - RPCCodedResponse::Success(RPCResponse::BlocksByRoot(Box::new(base_block()))) + RPCCodedResponse::Success(RPCResponse::BlocksByRoot(Box::new(base_block()))), + ForkName::Base, ), Ok(Some(RPCResponse::BlocksByRoot(Box::new(base_block())))) ); @@ -774,6 +866,7 @@ mod tests { Protocol::BlocksByRoot, Version::V1, RPCCodedResponse::Success(RPCResponse::BlocksByRoot(Box::new(altair_block()))), + ForkName::Altair, ) .unwrap_err(), RPCError::SSZDecodeError(_) @@ -786,6 +879,7 @@ mod tests { Protocol::MetaData, Version::V1, RPCCodedResponse::Success(RPCResponse::MetaData(metadata())), + ForkName::Base, ), Ok(Some(RPCResponse::MetaData(metadata()))), ); @@ -795,6 +889,7 @@ mod tests { Protocol::MetaData, Version::V1, RPCCodedResponse::Success(RPCResponse::MetaData(metadata())), + ForkName::Base, ), Ok(Some(RPCResponse::MetaData(metadata()))), ); @@ -805,6 +900,7 @@ mod tests { Protocol::MetaData, Version::V1, RPCCodedResponse::Success(RPCResponse::MetaData(metadata_v2())), + ForkName::Base, ), Ok(Some(RPCResponse::MetaData(metadata()))), ); @@ -819,6 +915,7 @@ mod tests { Protocol::Status, Version::V2, RPCCodedResponse::Success(RPCResponse::Status(status_message())), + ForkName::Base, ) .unwrap_err(), RPCError::ErrorResponse(RPCResponseErrorCode::InvalidRequest, _), @@ -832,6 +929,7 @@ mod tests { Protocol::Ping, Version::V2, RPCCodedResponse::Success(RPCResponse::Pong(ping_message())), + ForkName::Base, ) .unwrap_err(), RPCError::ErrorResponse(RPCResponseErrorCode::InvalidRequest, _), @@ -843,7 +941,8 @@ mod tests { encode_then_decode( Protocol::BlocksByRange, Version::V2, - RPCCodedResponse::Success(RPCResponse::BlocksByRange(Box::new(base_block()))) + RPCCodedResponse::Success(RPCResponse::BlocksByRange(Box::new(base_block()))), + ForkName::Base, ), Ok(Some(RPCResponse::BlocksByRange(Box::new(base_block())))) ); @@ -852,35 +951,104 @@ mod tests { encode_then_decode( Protocol::BlocksByRange, Version::V2, - RPCCodedResponse::Success(RPCResponse::BlocksByRange(Box::new(altair_block()))) + RPCCodedResponse::Success(RPCResponse::BlocksByRange(Box::new(altair_block()))), + ForkName::Altair, ), Ok(Some(RPCResponse::BlocksByRange(Box::new(altair_block())))) ); + let merge_block_small = merge_block_small(&fork_context(ForkName::Merge)); + let merge_block_large = merge_block_large(&fork_context(ForkName::Merge)); + assert_eq!( encode_then_decode( - Protocol::BlocksByRoot, + Protocol::BlocksByRange, Version::V2, - RPCCodedResponse::Success(RPCResponse::BlocksByRoot(Box::new(base_block()))) + RPCCodedResponse::Success(RPCResponse::BlocksByRange(Box::new( + merge_block_small.clone() + ))), + ForkName::Merge, ), - Ok(Some(RPCResponse::BlocksByRoot(Box::new(base_block())))) + Ok(Some(RPCResponse::BlocksByRange(Box::new( + merge_block_small.clone() + )))) + ); + + let mut encoded = + encode_without_length_checks(merge_block_large.as_ssz_bytes(), ForkName::Merge) + .unwrap(); + + assert!( + matches!( + decode( + Protocol::BlocksByRange, + Version::V2, + &mut encoded, + ForkName::Merge, + ) + .unwrap_err(), + RPCError::InvalidData(_) + ), + "Decoding a block larger than max_rpc_size should fail" ); assert_eq!( encode_then_decode( Protocol::BlocksByRoot, Version::V2, - RPCCodedResponse::Success(RPCResponse::BlocksByRoot(Box::new(altair_block()))) + RPCCodedResponse::Success(RPCResponse::BlocksByRoot(Box::new(base_block()))), + ForkName::Base, + ), + Ok(Some(RPCResponse::BlocksByRoot(Box::new(base_block())))), + ); + + assert_eq!( + encode_then_decode( + Protocol::BlocksByRoot, + Version::V2, + RPCCodedResponse::Success(RPCResponse::BlocksByRoot(Box::new(altair_block()))), + ForkName::Altair, ), Ok(Some(RPCResponse::BlocksByRoot(Box::new(altair_block())))) ); + assert_eq!( + encode_then_decode( + Protocol::BlocksByRoot, + Version::V2, + RPCCodedResponse::Success(RPCResponse::BlocksByRoot(Box::new( + merge_block_small.clone() + ))), + ForkName::Merge, + ), + Ok(Some(RPCResponse::BlocksByRoot(Box::new(merge_block_small)))) + ); + + let mut encoded = + encode_without_length_checks(merge_block_large.as_ssz_bytes(), ForkName::Merge) + .unwrap(); + + assert!( + matches!( + decode( + Protocol::BlocksByRoot, + Version::V2, + &mut encoded, + ForkName::Merge, + ) + .unwrap_err(), + RPCError::InvalidData(_) + ), + "Decoding a block larger than max_rpc_size should fail" + ); + // A MetaDataV1 still encodes as a MetaDataV2 since version is Version::V2 assert_eq!( encode_then_decode( Protocol::MetaData, Version::V2, - RPCCodedResponse::Success(RPCResponse::MetaData(metadata())) + RPCCodedResponse::Success(RPCResponse::MetaData(metadata())), + ForkName::Base, ), Ok(Some(RPCResponse::MetaData(metadata_v2()))) ); @@ -889,7 +1057,8 @@ mod tests { encode_then_decode( Protocol::MetaData, Version::V2, - RPCCodedResponse::Success(RPCResponse::MetaData(metadata_v2())) + RPCCodedResponse::Success(RPCResponse::MetaData(metadata_v2())), + ForkName::Altair, ), Ok(Some(RPCResponse::MetaData(metadata_v2()))) ); @@ -898,20 +1067,27 @@ mod tests { // Test RPCResponse encoding/decoding for V2 messages #[test] fn test_context_bytes_v2() { - let fork_context = fork_context(); + let fork_context = fork_context(ForkName::Altair); // Removing context bytes for v2 messages should error let mut encoded_bytes = encode( Protocol::BlocksByRange, Version::V2, RPCCodedResponse::Success(RPCResponse::BlocksByRange(Box::new(base_block()))), + ForkName::Base, ) .unwrap(); let _ = encoded_bytes.split_to(4); assert!(matches!( - decode(Protocol::BlocksByRange, Version::V2, &mut encoded_bytes).unwrap_err(), + decode( + Protocol::BlocksByRange, + Version::V2, + &mut encoded_bytes, + ForkName::Base + ) + .unwrap_err(), RPCError::ErrorResponse(RPCResponseErrorCode::InvalidRequest, _), )); @@ -919,13 +1095,20 @@ mod tests { Protocol::BlocksByRoot, Version::V2, RPCCodedResponse::Success(RPCResponse::BlocksByRoot(Box::new(base_block()))), + ForkName::Base, ) .unwrap(); let _ = encoded_bytes.split_to(4); assert!(matches!( - decode(Protocol::BlocksByRange, Version::V2, &mut encoded_bytes).unwrap_err(), + decode( + Protocol::BlocksByRange, + Version::V2, + &mut encoded_bytes, + ForkName::Base + ) + .unwrap_err(), RPCError::ErrorResponse(RPCResponseErrorCode::InvalidRequest, _), )); @@ -934,6 +1117,7 @@ mod tests { Protocol::BlocksByRange, Version::V2, RPCCodedResponse::Success(RPCResponse::BlocksByRange(Box::new(base_block()))), + ForkName::Altair, ) .unwrap(); @@ -943,7 +1127,13 @@ mod tests { wrong_fork_bytes.extend_from_slice(&encoded_bytes.split_off(4)); assert!(matches!( - decode(Protocol::BlocksByRange, Version::V2, &mut wrong_fork_bytes).unwrap_err(), + decode( + Protocol::BlocksByRange, + Version::V2, + &mut wrong_fork_bytes, + ForkName::Altair + ) + .unwrap_err(), RPCError::SSZDecodeError(_), )); @@ -952,6 +1142,7 @@ mod tests { Protocol::BlocksByRoot, Version::V2, RPCCodedResponse::Success(RPCResponse::BlocksByRoot(Box::new(altair_block()))), + ForkName::Altair, ) .unwrap(); @@ -960,7 +1151,13 @@ mod tests { wrong_fork_bytes.extend_from_slice(&encoded_bytes.split_off(4)); assert!(matches!( - decode(Protocol::BlocksByRange, Version::V2, &mut wrong_fork_bytes).unwrap_err(), + decode( + Protocol::BlocksByRange, + Version::V2, + &mut wrong_fork_bytes, + ForkName::Altair + ) + .unwrap_err(), RPCError::SSZDecodeError(_), )); @@ -972,17 +1169,25 @@ mod tests { Protocol::MetaData, Version::V2, RPCCodedResponse::Success(RPCResponse::MetaData(metadata())), + ForkName::Altair, ) .unwrap(), ); - assert!(decode(Protocol::MetaData, Version::V2, &mut encoded_bytes).is_err()); + assert!(decode( + Protocol::MetaData, + Version::V2, + &mut encoded_bytes, + ForkName::Altair + ) + .is_err()); // Sending context bytes which do not correspond to any fork should return an error let mut encoded_bytes = encode( Protocol::BlocksByRoot, Version::V2, RPCCodedResponse::Success(RPCResponse::BlocksByRoot(Box::new(base_block()))), + ForkName::Altair, ) .unwrap(); @@ -991,7 +1196,13 @@ mod tests { wrong_fork_bytes.extend_from_slice(&encoded_bytes.split_off(4)); assert!(matches!( - decode(Protocol::BlocksByRange, Version::V2, &mut wrong_fork_bytes).unwrap_err(), + decode( + Protocol::BlocksByRange, + Version::V2, + &mut wrong_fork_bytes, + ForkName::Altair + ) + .unwrap_err(), RPCError::ErrorResponse(RPCResponseErrorCode::InvalidRequest, _), )); @@ -1000,13 +1211,19 @@ mod tests { Protocol::BlocksByRoot, Version::V2, RPCCodedResponse::Success(RPCResponse::BlocksByRoot(Box::new(base_block()))), + ForkName::Altair, ) .unwrap(); let mut part = encoded_bytes.split_to(3); assert_eq!( - decode(Protocol::BlocksByRange, Version::V2, &mut part), + decode( + Protocol::BlocksByRange, + Version::V2, + &mut part, + ForkName::Altair + ), Ok(None) ) } @@ -1061,17 +1278,17 @@ mod tests { dst.extend_from_slice(writer.get_ref()); // 10 (for stream identifier) + 80 + 42 = 132 > `max_compressed_len`. Hence, decoding should fail with `InvalidData`. - assert_eq!( - decode(Protocol::Status, Version::V1, &mut dst).unwrap_err(), - RPCError::InvalidData - ); + assert!(matches!( + decode(Protocol::Status, Version::V1, &mut dst, ForkName::Base).unwrap_err(), + RPCError::InvalidData(_) + )); } /// Test a malicious snappy encoding for a V2 `BlocksByRange` message where the attacker /// sends a valid message filled with a stream of useless padding before the actual message. #[test] fn test_decode_malicious_v2_message() { - let fork_context = Arc::new(fork_context()); + let fork_context = Arc::new(fork_context(ForkName::Altair)); // 10 byte snappy stream identifier let stream_identifier: &'static [u8] = b"\xFF\x06\x00\x00sNaPpY"; @@ -1118,10 +1335,16 @@ mod tests { dst.extend_from_slice(writer.get_ref()); // 10 (for stream identifier) + 176156 + 8103 = 184269 > `max_compressed_len`. Hence, decoding should fail with `InvalidData`. - assert_eq!( - decode(Protocol::BlocksByRange, Version::V2, &mut dst).unwrap_err(), - RPCError::InvalidData - ); + assert!(matches!( + decode( + Protocol::BlocksByRange, + Version::V2, + &mut dst, + ForkName::Altair + ) + .unwrap_err(), + RPCError::InvalidData(_) + )); } /// Test sending a message with encoded length prefix > max_rpc_size. @@ -1157,9 +1380,9 @@ mod tests { writer.flush().unwrap(); dst.extend_from_slice(writer.get_ref()); - assert_eq!( - decode(Protocol::Status, Version::V1, &mut dst).unwrap_err(), - RPCError::InvalidData - ); + assert!(matches!( + decode(Protocol::Status, Version::V1, &mut dst, ForkName::Base).unwrap_err(), + RPCError::InvalidData(_) + )); } } diff --git a/beacon_node/lighthouse_network/src/rpc/handler.rs b/beacon_node/lighthouse_network/src/rpc/handler.rs index 2b9e7c490..b685c4334 100644 --- a/beacon_node/lighthouse_network/src/rpc/handler.rs +++ b/beacon_node/lighthouse_network/src/rpc/handler.rs @@ -477,7 +477,7 @@ where ProtocolError::InvalidMessage | ProtocolError::TooManyProtocols => { // Peer is sending invalid data during the negotiation phase, not // participating in the protocol - RPCError::InvalidData + RPCError::InvalidData("Invalid message during negotiation".to_string()) } }, }; diff --git a/beacon_node/lighthouse_network/src/rpc/protocol.rs b/beacon_node/lighthouse_network/src/rpc/protocol.rs index e3ad6a803..d88f93de4 100644 --- a/beacon_node/lighthouse_network/src/rpc/protocol.rs +++ b/beacon_node/lighthouse_network/src/rpc/protocol.rs @@ -63,7 +63,13 @@ lazy_static! { /// The `BeaconBlockMerge` block has an `ExecutionPayload` field which has a max size ~16 GiB for future proofing. /// We calculate the value from its fields instead of constructing the block and checking the length. - pub static ref SIGNED_BEACON_BLOCK_MERGE_MAX: usize = types::ExecutionPayload::::max_execution_payload_size(); + /// Note: This is only the theoretical upper bound. We further bound the max size we receive over the network + /// with `MAX_RPC_SIZE_POST_MERGE`. + pub static ref SIGNED_BEACON_BLOCK_MERGE_MAX: usize = + // Size of a full altair block + *SIGNED_BEACON_BLOCK_ALTAIR_MAX + + types::ExecutionPayload::::max_execution_payload_size() // adding max size of execution payload (~16gb) + + ssz::BYTES_PER_LENGTH_OFFSET; // Adding the additional ssz offset for the `ExecutionPayload` field pub static ref BLOCKS_BY_ROOT_REQUEST_MIN: usize = VariableList::::from(Vec::::new()) @@ -106,10 +112,9 @@ const REQUEST_TIMEOUT: u64 = 15; /// Returns the maximum bytes that can be sent across the RPC. pub fn max_rpc_size(fork_context: &ForkContext) -> usize { - if fork_context.fork_exists(ForkName::Merge) { - MAX_RPC_SIZE_POST_MERGE - } else { - MAX_RPC_SIZE + match fork_context.current_fork() { + ForkName::Merge => MAX_RPC_SIZE_POST_MERGE, + ForkName::Altair | ForkName::Base => MAX_RPC_SIZE, } } @@ -269,39 +274,39 @@ impl ProtocolId { } /// Returns min and max size for messages of given protocol id responses. - pub fn rpc_response_limits(&self) -> RpcLimits { + pub fn rpc_response_limits(&self, fork_context: &ForkContext) -> RpcLimits { match self.message_name { Protocol::Status => RpcLimits::new( ::ssz_fixed_len(), ::ssz_fixed_len(), ), Protocol::Goodbye => RpcLimits::new(0, 0), // Goodbye request has no response - Protocol::BlocksByRange => RpcLimits::new( - std::cmp::min( - std::cmp::min( - *SIGNED_BEACON_BLOCK_ALTAIR_MIN, - *SIGNED_BEACON_BLOCK_BASE_MIN, - ), - *SIGNED_BEACON_BLOCK_MERGE_MIN, + Protocol::BlocksByRange => match fork_context.current_fork() { + ForkName::Base => { + RpcLimits::new(*SIGNED_BEACON_BLOCK_BASE_MIN, *SIGNED_BEACON_BLOCK_BASE_MAX) + } + ForkName::Altair => RpcLimits::new( + *SIGNED_BEACON_BLOCK_ALTAIR_MIN, + *SIGNED_BEACON_BLOCK_ALTAIR_MAX, ), - std::cmp::max( - std::cmp::max( - *SIGNED_BEACON_BLOCK_ALTAIR_MAX, - *SIGNED_BEACON_BLOCK_BASE_MAX, - ), + ForkName::Merge => RpcLimits::new( + *SIGNED_BEACON_BLOCK_MERGE_MIN, *SIGNED_BEACON_BLOCK_MERGE_MAX, ), - ), - Protocol::BlocksByRoot => RpcLimits::new( - std::cmp::min( + }, + Protocol::BlocksByRoot => match fork_context.current_fork() { + ForkName::Base => { + RpcLimits::new(*SIGNED_BEACON_BLOCK_BASE_MIN, *SIGNED_BEACON_BLOCK_BASE_MAX) + } + ForkName::Altair => RpcLimits::new( *SIGNED_BEACON_BLOCK_ALTAIR_MIN, - *SIGNED_BEACON_BLOCK_BASE_MIN, - ), - std::cmp::max( *SIGNED_BEACON_BLOCK_ALTAIR_MAX, - *SIGNED_BEACON_BLOCK_BASE_MAX, ), - ), + ForkName::Merge => RpcLimits::new( + *SIGNED_BEACON_BLOCK_MERGE_MIN, + *SIGNED_BEACON_BLOCK_MERGE_MAX, + ), + }, Protocol::Ping => RpcLimits::new( ::ssz_fixed_len(), @@ -528,7 +533,7 @@ pub enum RPCError { /// Stream ended unexpectedly. IncompleteStream, /// Peer sent invalid data. - InvalidData, + InvalidData(String), /// An error occurred due to internal reasons. Ex: timer failure. InternalError(&'static str), /// Negotiation with this peer timed out. @@ -562,7 +567,7 @@ impl std::fmt::Display for RPCError { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match *self { RPCError::SSZDecodeError(ref err) => write!(f, "Error while decoding ssz: {:?}", err), - RPCError::InvalidData => write!(f, "Peer sent unexpected data"), + RPCError::InvalidData(ref err) => write!(f, "Peer sent unexpected data: {}", err), RPCError::IoError(ref err) => write!(f, "IO Error: {}", err), RPCError::ErrorResponse(ref code, ref reason) => write!( f, @@ -589,7 +594,7 @@ impl std::error::Error for RPCError { RPCError::StreamTimeout => None, RPCError::UnsupportedProtocol => None, RPCError::IncompleteStream => None, - RPCError::InvalidData => None, + RPCError::InvalidData(_) => None, RPCError::InternalError(_) => None, RPCError::ErrorResponse(_, _) => None, RPCError::NegotiationTimeout => None, diff --git a/beacon_node/lighthouse_network/tests/common/mod.rs b/beacon_node/lighthouse_network/tests/common/mod.rs index e79fdf464..ea770de6c 100644 --- a/beacon_node/lighthouse_network/tests/common/mod.rs +++ b/beacon_node/lighthouse_network/tests/common/mod.rs @@ -10,7 +10,9 @@ use std::sync::Arc; use std::sync::Weak; use std::time::Duration; use tokio::runtime::Runtime; -use types::{ChainSpec, EnrForkId, EthSpec, ForkContext, Hash256, MinimalEthSpec}; +use types::{ + ChainSpec, EnrForkId, Epoch, EthSpec, ForkContext, ForkName, Hash256, MinimalEthSpec, Slot, +}; use unused_port::unused_tcp_port; #[allow(clippy::type_complexity)] @@ -26,13 +28,20 @@ type ReqId = usize; use tempfile::Builder as TempBuilder; /// Returns a dummy fork context -pub fn fork_context() -> ForkContext { +pub fn fork_context(fork_name: ForkName) -> ForkContext { let mut chain_spec = E::default_spec(); - // Set fork_epoch to `Some` to ensure that the `ForkContext` object - // includes altair in the list of forks - chain_spec.altair_fork_epoch = Some(types::Epoch::new(42)); - chain_spec.bellatrix_fork_epoch = Some(types::Epoch::new(84)); - ForkContext::new::(types::Slot::new(0), Hash256::zero(), &chain_spec) + let altair_fork_epoch = Epoch::new(1); + let merge_fork_epoch = Epoch::new(2); + + chain_spec.altair_fork_epoch = Some(altair_fork_epoch); + chain_spec.bellatrix_fork_epoch = Some(merge_fork_epoch); + + let current_slot = match fork_name { + ForkName::Base => Slot::new(0), + ForkName::Altair => altair_fork_epoch.start_slot(E::slots_per_epoch()), + ForkName::Merge => merge_fork_epoch.start_slot(E::slots_per_epoch()), + }; + ForkContext::new::(current_slot, Hash256::zero(), &chain_spec) } pub struct Libp2pInstance(LibP2PService, exit_future::Signal); @@ -90,6 +99,7 @@ pub async fn build_libp2p_instance( rt: Weak, boot_nodes: Vec, log: slog::Logger, + fork_name: ForkName, ) -> Libp2pInstance { let port = unused_tcp_port().unwrap(); let config = build_config(port, boot_nodes); @@ -101,7 +111,7 @@ pub async fn build_libp2p_instance( let libp2p_context = lighthouse_network::Context { config: &config, enr_fork_id: EnrForkId::default(), - fork_context: Arc::new(fork_context()), + fork_context: Arc::new(fork_context(fork_name)), chain_spec: &ChainSpec::minimal(), gossipsub_registry: None, }; @@ -125,10 +135,11 @@ pub async fn build_full_mesh( rt: Weak, log: slog::Logger, n: usize, + fork_name: ForkName, ) -> Vec { let mut nodes = Vec::with_capacity(n); for _ in 0..n { - nodes.push(build_libp2p_instance(rt.clone(), vec![], log.clone()).await); + nodes.push(build_libp2p_instance(rt.clone(), vec![], log.clone(), fork_name).await); } let multiaddrs: Vec = nodes .iter() @@ -154,12 +165,13 @@ pub async fn build_full_mesh( pub async fn build_node_pair( rt: Weak, log: &slog::Logger, + fork_name: ForkName, ) -> (Libp2pInstance, Libp2pInstance) { let sender_log = log.new(o!("who" => "sender")); let receiver_log = log.new(o!("who" => "receiver")); - let mut sender = build_libp2p_instance(rt.clone(), vec![], sender_log).await; - let mut receiver = build_libp2p_instance(rt, vec![], receiver_log).await; + let mut sender = build_libp2p_instance(rt.clone(), vec![], sender_log, fork_name).await; + let mut receiver = build_libp2p_instance(rt, vec![], receiver_log, fork_name).await; let receiver_multiaddr = receiver.swarm.behaviour_mut().local_enr().multiaddr()[1].clone(); @@ -198,10 +210,15 @@ pub async fn build_node_pair( // Returns `n` peers in a linear topology #[allow(dead_code)] -pub async fn build_linear(rt: Weak, log: slog::Logger, n: usize) -> Vec { +pub async fn build_linear( + rt: Weak, + log: slog::Logger, + n: usize, + fork_name: ForkName, +) -> Vec { let mut nodes = Vec::with_capacity(n); for _ in 0..n { - nodes.push(build_libp2p_instance(rt.clone(), vec![], log.clone()).await); + nodes.push(build_libp2p_instance(rt.clone(), vec![], log.clone(), fork_name).await); } let multiaddrs: Vec = nodes diff --git a/beacon_node/lighthouse_network/tests/rpc_tests.rs b/beacon_node/lighthouse_network/tests/rpc_tests.rs index a270e4044..5895d32d5 100644 --- a/beacon_node/lighthouse_network/tests/rpc_tests.rs +++ b/beacon_node/lighthouse_network/tests/rpc_tests.rs @@ -12,7 +12,7 @@ use tokio::runtime::Runtime; use tokio::time::sleep; use types::{ BeaconBlock, BeaconBlockAltair, BeaconBlockBase, BeaconBlockMerge, Epoch, EthSpec, ForkContext, - Hash256, MinimalEthSpec, Signature, SignedBeaconBlock, Slot, + ForkName, Hash256, MinimalEthSpec, Signature, SignedBeaconBlock, Slot, }; mod common; @@ -23,7 +23,7 @@ type E = MinimalEthSpec; fn merge_block_small(fork_context: &ForkContext) -> BeaconBlock { let mut block = BeaconBlockMerge::::empty(&E::default_spec()); let tx = VariableList::from(vec![0; 1024]); - let txs = VariableList::from(std::iter::repeat(tx).take(100).collect::>()); + let txs = VariableList::from(std::iter::repeat(tx).take(5000).collect::>()); block.body.execution_payload.execution_payload.transactions = txs; @@ -61,7 +61,8 @@ fn test_status_rpc() { rt.block_on(async { // get sender/receiver - let (mut sender, mut receiver) = common::build_node_pair(Arc::downgrade(&rt), &log).await; + let (mut sender, mut receiver) = + common::build_node_pair(Arc::downgrade(&rt), &log, ForkName::Base).await; // Dummy STATUS RPC message let rpc_request = Request::Status(StatusMessage { @@ -159,7 +160,8 @@ fn test_blocks_by_range_chunked_rpc() { rt.block_on(async { // get sender/receiver - let (mut sender, mut receiver) = common::build_node_pair(Arc::downgrade(&rt), &log).await; + let (mut sender, mut receiver) = + common::build_node_pair(Arc::downgrade(&rt), &log, ForkName::Merge).await; // BlocksByRange Request let rpc_request = Request::BlocksByRange(BlocksByRangeRequest { @@ -179,7 +181,7 @@ fn test_blocks_by_range_chunked_rpc() { let signed_full_block = SignedBeaconBlock::from_block(full_block, Signature::empty()); let rpc_response_altair = Response::BlocksByRange(Some(Box::new(signed_full_block))); - let full_block = merge_block_small(&common::fork_context()); + let full_block = merge_block_small(&common::fork_context(ForkName::Merge)); let signed_full_block = SignedBeaconBlock::from_block(full_block, Signature::empty()); let rpc_response_merge_small = Response::BlocksByRange(Some(Box::new(signed_full_block))); @@ -298,7 +300,8 @@ fn test_blocks_by_range_over_limit() { rt.block_on(async { // get sender/receiver - let (mut sender, mut receiver) = common::build_node_pair(Arc::downgrade(&rt), &log).await; + let (mut sender, mut receiver) = + common::build_node_pair(Arc::downgrade(&rt), &log, ForkName::Merge).await; // BlocksByRange Request let rpc_request = Request::BlocksByRange(BlocksByRangeRequest { @@ -308,7 +311,7 @@ fn test_blocks_by_range_over_limit() { }); // BlocksByRange Response - let full_block = merge_block_large(&common::fork_context()); + let full_block = merge_block_large(&common::fork_context(ForkName::Merge)); let signed_full_block = SignedBeaconBlock::from_block(full_block, Signature::empty()); let rpc_response_merge_large = Response::BlocksByRange(Some(Box::new(signed_full_block))); @@ -395,7 +398,8 @@ fn test_blocks_by_range_chunked_rpc_terminates_correctly() { rt.block_on(async { // get sender/receiver - let (mut sender, mut receiver) = common::build_node_pair(Arc::downgrade(&rt), &log).await; + let (mut sender, mut receiver) = + common::build_node_pair(Arc::downgrade(&rt), &log, ForkName::Base).await; // BlocksByRange Request let rpc_request = Request::BlocksByRange(BlocksByRangeRequest { @@ -526,7 +530,8 @@ fn test_blocks_by_range_single_empty_rpc() { rt.block_on(async { // get sender/receiver - let (mut sender, mut receiver) = common::build_node_pair(Arc::downgrade(&rt), &log).await; + let (mut sender, mut receiver) = + common::build_node_pair(Arc::downgrade(&rt), &log, ForkName::Base).await; // BlocksByRange Request let rpc_request = Request::BlocksByRange(BlocksByRangeRequest { @@ -641,7 +646,8 @@ fn test_blocks_by_root_chunked_rpc() { let rt = Arc::new(Runtime::new().unwrap()); // get sender/receiver rt.block_on(async { - let (mut sender, mut receiver) = common::build_node_pair(Arc::downgrade(&rt), &log).await; + let (mut sender, mut receiver) = + common::build_node_pair(Arc::downgrade(&rt), &log, ForkName::Merge).await; // BlocksByRoot Request let rpc_request = Request::BlocksByRoot(BlocksByRootRequest { @@ -664,7 +670,7 @@ fn test_blocks_by_root_chunked_rpc() { let signed_full_block = SignedBeaconBlock::from_block(full_block, Signature::empty()); let rpc_response_altair = Response::BlocksByRoot(Some(Box::new(signed_full_block))); - let full_block = merge_block_small(&common::fork_context()); + let full_block = merge_block_small(&common::fork_context(ForkName::Merge)); let signed_full_block = SignedBeaconBlock::from_block(full_block, Signature::empty()); let rpc_response_merge_small = Response::BlocksByRoot(Some(Box::new(signed_full_block))); @@ -779,7 +785,8 @@ fn test_blocks_by_root_chunked_rpc_terminates_correctly() { let rt = Arc::new(Runtime::new().unwrap()); // get sender/receiver rt.block_on(async { - let (mut sender, mut receiver) = common::build_node_pair(Arc::downgrade(&rt), &log).await; + let (mut sender, mut receiver) = + common::build_node_pair(Arc::downgrade(&rt), &log, ForkName::Base).await; // BlocksByRoot Request let rpc_request = Request::BlocksByRoot(BlocksByRootRequest { @@ -916,7 +923,8 @@ fn test_goodbye_rpc() { let rt = Arc::new(Runtime::new().unwrap()); // get sender/receiver rt.block_on(async { - let (mut sender, mut receiver) = common::build_node_pair(Arc::downgrade(&rt), &log).await; + let (mut sender, mut receiver) = + common::build_node_pair(Arc::downgrade(&rt), &log, ForkName::Base).await; // build the sender future let sender_future = async {