From 7af57420810772b2a1b0d7d75a0d045c0333093b Mon Sep 17 00:00:00 2001 From: Divma Date: Wed, 22 Jun 2022 16:23:34 +0000 Subject: [PATCH] Deprecate step param in BlocksByRange RPC request (#3275) ## Issue Addressed Deprecates the step parameter in the blocks by range request ## Proposed Changes - Modifies the BlocksByRangeRequest type to remove the step parameter and everywhere we took it into account before - Adds a new type to still handle coding and decoding of requests that use the parameter ## Additional Info I went with a deprecation over the type itself so that requests received outside `lighthouse_network` don't even need to deal with this parameter. After the deprecation period just removing the Old blocks by range request should be straightforward --- .../lighthouse_network/src/behaviour/mod.rs | 40 +++- .../src/rpc/codec/ssz_snappy.rs | 177 +++++++++++++----- .../lighthouse_network/src/rpc/methods.rs | 16 ++ .../lighthouse_network/src/rpc/outbound.rs | 2 +- .../lighthouse_network/src/rpc/protocol.rs | 6 +- .../src/rpc/rate_limiter.rs | 24 +-- .../lighthouse_network/tests/rpc_tests.rs | 4 - .../beacon_processor/worker/rpc_methods.rs | 44 ++--- .../network/src/sync/range_sync/batch.rs | 1 - 9 files changed, 201 insertions(+), 113 deletions(-) diff --git a/beacon_node/lighthouse_network/src/behaviour/mod.rs b/beacon_node/lighthouse_network/src/behaviour/mod.rs index 81de3f015..bf1918662 100644 --- a/beacon_node/lighthouse_network/src/behaviour/mod.rs +++ b/beacon_node/lighthouse_network/src/behaviour/mod.rs @@ -1065,11 +1065,33 @@ where // propagate the STATUS message upwards self.propagate_request(peer_request_id, peer_id, Request::Status(msg)) } - InboundRequest::BlocksByRange(req) => self.propagate_request( - peer_request_id, - peer_id, - Request::BlocksByRange(req), - ), + InboundRequest::BlocksByRange(req) => { + let methods::OldBlocksByRangeRequest { + start_slot, + mut count, + step, + } = req; + // Still disconnect the peer if the request is naughty. + if step == 0 { + self.peer_manager.handle_rpc_error( + &peer_id, + Protocol::BlocksByRange, + &RPCError::InvalidData( + "Blocks by range with 0 step parameter".into(), + ), + ConnectionDirection::Incoming, + ); + } + // return just one block in case the step parameter is used. https://github.com/ethereum/consensus-specs/pull/2856 + if step > 1 { + count = 1; + } + self.propagate_request( + peer_request_id, + peer_id, + Request::BlocksByRange(BlocksByRangeRequest { start_slot, count }), + ); + } InboundRequest::BlocksByRoot(req) => { self.propagate_request(peer_request_id, peer_id, Request::BlocksByRoot(req)) } @@ -1313,7 +1335,13 @@ impl std::convert::From for OutboundRequest { fn from(req: Request) -> OutboundRequest { match req { Request::BlocksByRoot(r) => OutboundRequest::BlocksByRoot(r), - Request::BlocksByRange(r) => OutboundRequest::BlocksByRange(r), + Request::BlocksByRange(BlocksByRangeRequest { start_slot, count }) => { + OutboundRequest::BlocksByRange(methods::OldBlocksByRangeRequest { + start_slot, + count, + step: 1, + }) + } Request::Status(s) => OutboundRequest::Status(s), } } diff --git a/beacon_node/lighthouse_network/src/rpc/codec/ssz_snappy.rs b/beacon_node/lighthouse_network/src/rpc/codec/ssz_snappy.rs index 6bd4a96fb..f6c3e61b0 100644 --- a/beacon_node/lighthouse_network/src/rpc/codec/ssz_snappy.rs +++ b/beacon_node/lighthouse_network/src/rpc/codec/ssz_snappy.rs @@ -137,6 +137,9 @@ impl Decoder for SSZSnappyInboundCodec { type Error = RPCError; fn decode(&mut self, src: &mut BytesMut) -> Result, Self::Error> { + if self.protocol.message_name == Protocol::MetaData { + return Ok(Some(InboundRequest::MetaData(PhantomData))); + } let length = match handle_length(&mut self.inner, &mut self.len, src)? { Some(len) => len, None => return Ok(None), @@ -461,7 +464,7 @@ fn handle_v1_request( GoodbyeReason::from_ssz_bytes(decoded_buffer)?, ))), Protocol::BlocksByRange => Ok(Some(InboundRequest::BlocksByRange( - BlocksByRangeRequest::from_ssz_bytes(decoded_buffer)?, + OldBlocksByRangeRequest::from_ssz_bytes(decoded_buffer)?, ))), Protocol::BlocksByRoot => Ok(Some(InboundRequest::BlocksByRoot(BlocksByRootRequest { block_roots: VariableList::from_ssz_bytes(decoded_buffer)?, @@ -493,7 +496,7 @@ fn handle_v2_request( ) -> Result>, RPCError> { match protocol { Protocol::BlocksByRange => Ok(Some(InboundRequest::BlocksByRange( - BlocksByRangeRequest::from_ssz_bytes(decoded_buffer)?, + OldBlocksByRangeRequest::from_ssz_bytes(decoded_buffer)?, ))), Protocol::BlocksByRoot => Ok(Some(InboundRequest::BlocksByRoot(BlocksByRootRequest { block_roots: VariableList::from_ssz_bytes(decoded_buffer)?, @@ -712,6 +715,20 @@ mod tests { } } + fn bbrange_request() -> OldBlocksByRangeRequest { + OldBlocksByRangeRequest { + start_slot: 0, + count: 10, + step: 1, + } + } + + fn bbroot_request() -> BlocksByRootRequest { + BlocksByRootRequest { + block_roots: VariableList::from(vec![Hash256::zero()]), + } + } + fn ping_message() -> Ping { Ping { data: 1 } } @@ -732,7 +749,7 @@ mod tests { } /// Encodes the given protocol response as bytes. - fn encode( + fn encode_response( protocol: Protocol, version: Version, message: RPCCodedResponse, @@ -779,7 +796,7 @@ mod tests { } /// Attempts to decode the given protocol bytes as an rpc response - fn decode( + fn decode_response( protocol: Protocol, version: Version, message: &mut BytesMut, @@ -795,21 +812,70 @@ mod tests { } /// Encodes the provided protocol message as bytes and tries to decode the encoding bytes. - fn encode_then_decode( + fn encode_then_decode_response( protocol: Protocol, version: Version, message: RPCCodedResponse, fork_name: ForkName, ) -> Result>, RPCError> { - let mut encoded = encode(protocol, version.clone(), message, fork_name)?; - decode(protocol, version, &mut encoded, fork_name) + let mut encoded = encode_response(protocol, version.clone(), message, fork_name)?; + decode_response(protocol, version, &mut encoded, fork_name) + } + + /// Verifies that requests we send are encoded in a way that we would correctly decode too. + fn encode_then_decode_request(req: OutboundRequest, fork_name: ForkName) { + let fork_context = Arc::new(fork_context(fork_name)); + let max_packet_size = max_rpc_size(&fork_context); + for protocol in req.supported_protocols() { + // Encode a request we send + let mut buf = BytesMut::new(); + let mut outbound_codec = SSZSnappyOutboundCodec::::new( + protocol.clone(), + max_packet_size, + fork_context.clone(), + ); + outbound_codec.encode(req.clone(), &mut buf).unwrap(); + + let mut inbound_codec = SSZSnappyInboundCodec::::new( + protocol.clone(), + max_packet_size, + fork_context.clone(), + ); + + let decoded = inbound_codec.decode(&mut buf).unwrap().unwrap_or_else(|| { + panic!( + "Should correctly decode the request {} over protocol {:?} and fork {}", + req, protocol, fork_name + ) + }); + match req.clone() { + OutboundRequest::Status(status) => { + assert_eq!(decoded, InboundRequest::Status(status)) + } + OutboundRequest::Goodbye(goodbye) => { + assert_eq!(decoded, InboundRequest::Goodbye(goodbye)) + } + OutboundRequest::BlocksByRange(bbrange) => { + assert_eq!(decoded, InboundRequest::BlocksByRange(bbrange)) + } + OutboundRequest::BlocksByRoot(bbroot) => { + assert_eq!(decoded, InboundRequest::BlocksByRoot(bbroot)) + } + OutboundRequest::Ping(ping) => { + assert_eq!(decoded, InboundRequest::Ping(ping)) + } + OutboundRequest::MetaData(metadata) => { + assert_eq!(decoded, InboundRequest::MetaData(metadata)) + } + } + } } // Test RPCResponse encoding/decoding for V1 messages #[test] fn test_encode_then_decode_v1() { assert_eq!( - encode_then_decode( + encode_then_decode_response( Protocol::Status, Version::V1, RPCCodedResponse::Success(RPCResponse::Status(status_message())), @@ -819,7 +885,7 @@ mod tests { ); assert_eq!( - encode_then_decode( + encode_then_decode_response( Protocol::Ping, Version::V1, RPCCodedResponse::Success(RPCResponse::Pong(ping_message())), @@ -829,7 +895,7 @@ mod tests { ); assert_eq!( - encode_then_decode( + encode_then_decode_response( Protocol::BlocksByRange, Version::V1, RPCCodedResponse::Success(RPCResponse::BlocksByRange(Box::new(empty_base_block()))), @@ -842,7 +908,7 @@ mod tests { assert!( matches!( - encode_then_decode( + encode_then_decode_response( Protocol::BlocksByRange, Version::V1, RPCCodedResponse::Success(RPCResponse::BlocksByRange(Box::new(altair_block()))), @@ -855,7 +921,7 @@ mod tests { ); assert_eq!( - encode_then_decode( + encode_then_decode_response( Protocol::BlocksByRoot, Version::V1, RPCCodedResponse::Success(RPCResponse::BlocksByRoot(Box::new(empty_base_block()))), @@ -868,7 +934,7 @@ mod tests { assert!( matches!( - encode_then_decode( + encode_then_decode_response( Protocol::BlocksByRoot, Version::V1, RPCCodedResponse::Success(RPCResponse::BlocksByRoot(Box::new(altair_block()))), @@ -881,7 +947,7 @@ mod tests { ); assert_eq!( - encode_then_decode( + encode_then_decode_response( Protocol::MetaData, Version::V1, RPCCodedResponse::Success(RPCResponse::MetaData(metadata())), @@ -891,7 +957,7 @@ mod tests { ); assert_eq!( - encode_then_decode( + encode_then_decode_response( Protocol::MetaData, Version::V1, RPCCodedResponse::Success(RPCResponse::MetaData(metadata())), @@ -902,7 +968,7 @@ mod tests { // A MetaDataV2 still encodes as a MetaDataV1 since version is Version::V1 assert_eq!( - encode_then_decode( + encode_then_decode_response( Protocol::MetaData, Version::V1, RPCCodedResponse::Success(RPCResponse::MetaData(metadata_v2())), @@ -917,7 +983,7 @@ mod tests { fn test_encode_then_decode_v2() { assert!( matches!( - encode_then_decode( + encode_then_decode_response( Protocol::Status, Version::V2, RPCCodedResponse::Success(RPCResponse::Status(status_message())), @@ -931,7 +997,7 @@ mod tests { assert!( matches!( - encode_then_decode( + encode_then_decode_response( Protocol::Ping, Version::V2, RPCCodedResponse::Success(RPCResponse::Pong(ping_message())), @@ -944,7 +1010,7 @@ mod tests { ); assert_eq!( - encode_then_decode( + encode_then_decode_response( Protocol::BlocksByRange, Version::V2, RPCCodedResponse::Success(RPCResponse::BlocksByRange(Box::new(empty_base_block()))), @@ -959,7 +1025,7 @@ mod tests { // This is useful for checking that we allow for blocks smaller than // the current_fork's rpc limit assert_eq!( - encode_then_decode( + encode_then_decode_response( Protocol::BlocksByRange, Version::V2, RPCCodedResponse::Success(RPCResponse::BlocksByRange(Box::new(empty_base_block()))), @@ -971,7 +1037,7 @@ mod tests { ); assert_eq!( - encode_then_decode( + encode_then_decode_response( Protocol::BlocksByRange, Version::V2, RPCCodedResponse::Success(RPCResponse::BlocksByRange(Box::new(altair_block()))), @@ -984,7 +1050,7 @@ mod tests { let merge_block_large = merge_block_large(&fork_context(ForkName::Merge)); assert_eq!( - encode_then_decode( + encode_then_decode_response( Protocol::BlocksByRange, Version::V2, RPCCodedResponse::Success(RPCResponse::BlocksByRange(Box::new( @@ -1003,7 +1069,7 @@ mod tests { assert!( matches!( - decode( + decode_response( Protocol::BlocksByRange, Version::V2, &mut encoded, @@ -1016,7 +1082,7 @@ mod tests { ); assert_eq!( - encode_then_decode( + encode_then_decode_response( Protocol::BlocksByRoot, Version::V2, RPCCodedResponse::Success(RPCResponse::BlocksByRoot(Box::new(empty_base_block()))), @@ -1031,7 +1097,7 @@ mod tests { // This is useful for checking that we allow for blocks smaller than // the current_fork's rpc limit assert_eq!( - encode_then_decode( + encode_then_decode_response( Protocol::BlocksByRoot, Version::V2, RPCCodedResponse::Success(RPCResponse::BlocksByRoot(Box::new(empty_base_block()))), @@ -1043,7 +1109,7 @@ mod tests { ); assert_eq!( - encode_then_decode( + encode_then_decode_response( Protocol::BlocksByRoot, Version::V2, RPCCodedResponse::Success(RPCResponse::BlocksByRoot(Box::new(altair_block()))), @@ -1053,7 +1119,7 @@ mod tests { ); assert_eq!( - encode_then_decode( + encode_then_decode_response( Protocol::BlocksByRoot, Version::V2, RPCCodedResponse::Success(RPCResponse::BlocksByRoot(Box::new( @@ -1070,7 +1136,7 @@ mod tests { assert!( matches!( - decode( + decode_response( Protocol::BlocksByRoot, Version::V2, &mut encoded, @@ -1084,7 +1150,7 @@ mod tests { // A MetaDataV1 still encodes as a MetaDataV2 since version is Version::V2 assert_eq!( - encode_then_decode( + encode_then_decode_response( Protocol::MetaData, Version::V2, RPCCodedResponse::Success(RPCResponse::MetaData(metadata())), @@ -1094,7 +1160,7 @@ mod tests { ); assert_eq!( - encode_then_decode( + encode_then_decode_response( Protocol::MetaData, Version::V2, RPCCodedResponse::Success(RPCResponse::MetaData(metadata_v2())), @@ -1110,7 +1176,7 @@ mod tests { let fork_context = fork_context(ForkName::Altair); // Removing context bytes for v2 messages should error - let mut encoded_bytes = encode( + let mut encoded_bytes = encode_response( Protocol::BlocksByRange, Version::V2, RPCCodedResponse::Success(RPCResponse::BlocksByRange(Box::new(empty_base_block()))), @@ -1121,7 +1187,7 @@ mod tests { let _ = encoded_bytes.split_to(4); assert!(matches!( - decode( + decode_response( Protocol::BlocksByRange, Version::V2, &mut encoded_bytes, @@ -1131,7 +1197,7 @@ mod tests { RPCError::ErrorResponse(RPCResponseErrorCode::InvalidRequest, _), )); - let mut encoded_bytes = encode( + let mut encoded_bytes = encode_response( Protocol::BlocksByRoot, Version::V2, RPCCodedResponse::Success(RPCResponse::BlocksByRoot(Box::new(empty_base_block()))), @@ -1142,7 +1208,7 @@ mod tests { let _ = encoded_bytes.split_to(4); assert!(matches!( - decode( + decode_response( Protocol::BlocksByRange, Version::V2, &mut encoded_bytes, @@ -1153,7 +1219,7 @@ mod tests { )); // Trying to decode a base block with altair context bytes should give ssz decoding error - let mut encoded_bytes = encode( + let mut encoded_bytes = encode_response( Protocol::BlocksByRange, Version::V2, RPCCodedResponse::Success(RPCResponse::BlocksByRange(Box::new(empty_base_block()))), @@ -1167,7 +1233,7 @@ mod tests { wrong_fork_bytes.extend_from_slice(&encoded_bytes.split_off(4)); assert!(matches!( - decode( + decode_response( Protocol::BlocksByRange, Version::V2, &mut wrong_fork_bytes, @@ -1178,7 +1244,7 @@ mod tests { )); // Trying to decode an altair block with base context bytes should give ssz decoding error - let mut encoded_bytes = encode( + let mut encoded_bytes = encode_response( Protocol::BlocksByRoot, Version::V2, RPCCodedResponse::Success(RPCResponse::BlocksByRoot(Box::new(altair_block()))), @@ -1191,7 +1257,7 @@ mod tests { wrong_fork_bytes.extend_from_slice(&encoded_bytes.split_off(4)); assert!(matches!( - decode( + decode_response( Protocol::BlocksByRange, Version::V2, &mut wrong_fork_bytes, @@ -1205,7 +1271,7 @@ mod tests { let mut encoded_bytes = BytesMut::new(); encoded_bytes.extend_from_slice(&fork_context.to_context_bytes(ForkName::Altair).unwrap()); encoded_bytes.extend_from_slice( - &encode( + &encode_response( Protocol::MetaData, Version::V2, RPCCodedResponse::Success(RPCResponse::MetaData(metadata())), @@ -1214,7 +1280,7 @@ mod tests { .unwrap(), ); - assert!(decode( + assert!(decode_response( Protocol::MetaData, Version::V2, &mut encoded_bytes, @@ -1223,7 +1289,7 @@ mod tests { .is_err()); // Sending context bytes which do not correspond to any fork should return an error - let mut encoded_bytes = encode( + let mut encoded_bytes = encode_response( Protocol::BlocksByRoot, Version::V2, RPCCodedResponse::Success(RPCResponse::BlocksByRoot(Box::new(empty_base_block()))), @@ -1236,7 +1302,7 @@ mod tests { wrong_fork_bytes.extend_from_slice(&encoded_bytes.split_off(4)); assert!(matches!( - decode( + decode_response( Protocol::BlocksByRange, Version::V2, &mut wrong_fork_bytes, @@ -1247,7 +1313,7 @@ mod tests { )); // Sending bytes less than context bytes length should wait for more bytes by returning `Ok(None)` - let mut encoded_bytes = encode( + let mut encoded_bytes = encode_response( Protocol::BlocksByRoot, Version::V2, RPCCodedResponse::Success(RPCResponse::BlocksByRoot(Box::new(empty_base_block()))), @@ -1258,7 +1324,7 @@ mod tests { let mut part = encoded_bytes.split_to(3); assert_eq!( - decode( + decode_response( Protocol::BlocksByRange, Version::V2, &mut part, @@ -1268,6 +1334,23 @@ mod tests { ) } + #[test] + fn test_encode_then_decode_request() { + let requests: &[OutboundRequest] = &[ + OutboundRequest::Ping(ping_message()), + OutboundRequest::Status(status_message()), + OutboundRequest::Goodbye(GoodbyeReason::Fault), + OutboundRequest::BlocksByRange(bbrange_request()), + OutboundRequest::BlocksByRoot(bbroot_request()), + OutboundRequest::MetaData(PhantomData::), + ]; + for req in requests.iter() { + for fork_name in ForkName::list_all() { + encode_then_decode_request(req.clone(), fork_name); + } + } + } + /// Test a malicious snappy encoding for a V1 `Status` message where the attacker /// sends a valid message filled with a stream of useless padding before the actual message. #[test] @@ -1319,7 +1402,7 @@ mod tests { // 10 (for stream identifier) + 80 + 42 = 132 > `max_compressed_len`. Hence, decoding should fail with `InvalidData`. assert!(matches!( - decode(Protocol::Status, Version::V1, &mut dst, ForkName::Base).unwrap_err(), + decode_response(Protocol::Status, Version::V1, &mut dst, ForkName::Base).unwrap_err(), RPCError::InvalidData(_) )); } @@ -1376,7 +1459,7 @@ mod tests { // 10 (for stream identifier) + 176156 + 8103 = 184269 > `max_compressed_len`. Hence, decoding should fail with `InvalidData`. assert!(matches!( - decode( + decode_response( Protocol::BlocksByRange, Version::V2, &mut dst, @@ -1421,7 +1504,7 @@ mod tests { dst.extend_from_slice(writer.get_ref()); assert!(matches!( - decode(Protocol::Status, Version::V1, &mut dst, ForkName::Base).unwrap_err(), + decode_response(Protocol::Status, Version::V1, &mut dst, ForkName::Base).unwrap_err(), RPCError::InvalidData(_) )); } diff --git a/beacon_node/lighthouse_network/src/rpc/methods.rs b/beacon_node/lighthouse_network/src/rpc/methods.rs index 1ac9c9b2c..46de772d8 100644 --- a/beacon_node/lighthouse_network/src/rpc/methods.rs +++ b/beacon_node/lighthouse_network/src/rpc/methods.rs @@ -201,6 +201,16 @@ pub struct BlocksByRangeRequest { /// The number of blocks from the start slot. pub count: u64, +} + +/// Request a number of beacon block roots from a peer. +#[derive(Encode, Decode, Clone, Debug, PartialEq)] +pub struct OldBlocksByRangeRequest { + /// The starting slot to request blocks. + pub start_slot: u64, + + /// The number of blocks from the start slot. + pub count: u64, /// The step increment to receive blocks. /// @@ -410,6 +420,12 @@ impl std::fmt::Display for GoodbyeReason { } impl std::fmt::Display for BlocksByRangeRequest { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "Start Slot: {}, Count: {}", self.start_slot, self.count) + } +} + +impl std::fmt::Display for OldBlocksByRangeRequest { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!( f, diff --git a/beacon_node/lighthouse_network/src/rpc/outbound.rs b/beacon_node/lighthouse_network/src/rpc/outbound.rs index 17201c6cf..7d5acc436 100644 --- a/beacon_node/lighthouse_network/src/rpc/outbound.rs +++ b/beacon_node/lighthouse_network/src/rpc/outbound.rs @@ -36,7 +36,7 @@ pub struct OutboundRequestContainer { pub enum OutboundRequest { Status(StatusMessage), Goodbye(GoodbyeReason), - BlocksByRange(BlocksByRangeRequest), + BlocksByRange(OldBlocksByRangeRequest), BlocksByRoot(BlocksByRootRequest), Ping(Ping), MetaData(PhantomData), diff --git a/beacon_node/lighthouse_network/src/rpc/protocol.rs b/beacon_node/lighthouse_network/src/rpc/protocol.rs index 1639d1794..81960214b 100644 --- a/beacon_node/lighthouse_network/src/rpc/protocol.rs +++ b/beacon_node/lighthouse_network/src/rpc/protocol.rs @@ -279,8 +279,8 @@ impl ProtocolId { ::ssz_fixed_len(), ), Protocol::BlocksByRange => RpcLimits::new( - ::ssz_fixed_len(), - ::ssz_fixed_len(), + ::ssz_fixed_len(), + ::ssz_fixed_len(), ), Protocol::BlocksByRoot => { RpcLimits::new(*BLOCKS_BY_ROOT_REQUEST_MIN, *BLOCKS_BY_ROOT_REQUEST_MAX) @@ -415,7 +415,7 @@ where pub enum InboundRequest { Status(StatusMessage), Goodbye(GoodbyeReason), - BlocksByRange(BlocksByRangeRequest), + BlocksByRange(OldBlocksByRangeRequest), BlocksByRoot(BlocksByRootRequest), Ping(Ping), MetaData(PhantomData), diff --git a/beacon_node/lighthouse_network/src/rpc/rate_limiter.rs b/beacon_node/lighthouse_network/src/rpc/rate_limiter.rs index 5e1b533c6..70b14c33d 100644 --- a/beacon_node/lighthouse_network/src/rpc/rate_limiter.rs +++ b/beacon_node/lighthouse_network/src/rpc/rate_limiter.rs @@ -188,29 +188,7 @@ impl RPCRateLimiter { request: &InboundRequest, ) -> Result<(), RateLimitedErr> { let time_since_start = self.init_time.elapsed(); - let mut tokens = request.expected_responses().max(1); - - // Increase the rate limit for blocks by range requests with large step counts. - // We count to tokens as a quadratic increase with step size. - // Using (step_size/5)^2 + 1 as penalty factor allows step sizes of 1-4 to have no penalty - // but step sizes higher than this add a quadratic penalty. - // Penalty's go: - // Step size | Penalty Factor - // 1 | 1 - // 2 | 1 - // 3 | 1 - // 4 | 1 - // 5 | 2 - // 6 | 2 - // 7 | 2 - // 8 | 3 - // 9 | 4 - // 10 | 5 - - if let InboundRequest::BlocksByRange(bbr_req) = request { - let penalty_factor = (bbr_req.step as f64 / 5.0).powi(2) as u64 + 1; - tokens *= penalty_factor; - } + let tokens = request.expected_responses().max(1); let check = |limiter: &mut Limiter| limiter.allows(time_since_start, peer_id, tokens); diff --git a/beacon_node/lighthouse_network/tests/rpc_tests.rs b/beacon_node/lighthouse_network/tests/rpc_tests.rs index 5895d32d5..973485fc4 100644 --- a/beacon_node/lighthouse_network/tests/rpc_tests.rs +++ b/beacon_node/lighthouse_network/tests/rpc_tests.rs @@ -167,7 +167,6 @@ fn test_blocks_by_range_chunked_rpc() { let rpc_request = Request::BlocksByRange(BlocksByRangeRequest { start_slot: 0, count: messages_to_send, - step: 0, }); let spec = E::default_spec(); @@ -307,7 +306,6 @@ fn test_blocks_by_range_over_limit() { let rpc_request = Request::BlocksByRange(BlocksByRangeRequest { start_slot: 0, count: messages_to_send, - step: 0, }); // BlocksByRange Response @@ -405,7 +403,6 @@ fn test_blocks_by_range_chunked_rpc_terminates_correctly() { let rpc_request = Request::BlocksByRange(BlocksByRangeRequest { start_slot: 0, count: messages_to_send, - step: 0, }); // BlocksByRange Response @@ -537,7 +534,6 @@ fn test_blocks_by_range_single_empty_rpc() { let rpc_request = Request::BlocksByRange(BlocksByRangeRequest { start_slot: 0, count: 10, - step: 0, }); // BlocksByRange Response diff --git a/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs b/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs index 2d2196b9e..cf113ca1f 100644 --- a/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs +++ b/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs @@ -7,7 +7,7 @@ use itertools::process_results; use lighthouse_network::rpc::StatusMessage; use lighthouse_network::rpc::*; use lighthouse_network::{PeerId, PeerRequestId, ReportSource, Response, SyncInfo}; -use slog::{debug, error, warn}; +use slog::{debug, error}; use slot_clock::SlotClock; use task_executor::TaskExecutor; use types::{Epoch, EthSpec, Hash256, Slot}; @@ -196,16 +196,12 @@ impl Worker { "peer_id" => %peer_id, "count" => req.count, "start_slot" => req.start_slot, - "step" => req.step); + ); // Should not send more than max request blocks if req.count > MAX_REQUEST_BLOCKS { req.count = MAX_REQUEST_BLOCKS; } - if req.step == 0 { - self.goodbye_peer(peer_id, GoodbyeReason::Fault); - return warn!(self.log, "Peer sent invalid range request"; "error" => "Step sent was 0"); - } let forwards_block_root_iter = match self .chain @@ -229,29 +225,21 @@ impl Worker { Err(e) => return error!(self.log, "Unable to obtain root iter"; "error" => ?e), }; - // Pick out the required blocks, ignoring skip-slots and stepping by the step parameter. - // - // NOTE: We don't mind if req.count * req.step overflows as it just ends the iterator early and - // the peer will get less blocks. - // The step parameter is quadratically weighted in the filter, so large values should be - // prevented before reaching this point. + // Pick out the required blocks, ignoring skip-slots. let mut last_block_root = None; let maybe_block_roots = process_results(forwards_block_root_iter, |iter| { - iter.take_while(|(_, slot)| { - slot.as_u64() < req.start_slot.saturating_add(req.count * req.step) - }) - // map skip slots to None - .map(|(root, _)| { - let result = if Some(root) == last_block_root { - None - } else { - Some(root) - }; - last_block_root = Some(root); - result - }) - .step_by(req.step as usize) - .collect::>>() + iter.take_while(|(_, slot)| slot.as_u64() < req.start_slot.saturating_add(req.count)) + // map skip slots to None + .map(|(root, _)| { + let result = if Some(root) == last_block_root { + None + } else { + Some(root) + }; + last_block_root = Some(root); + result + }) + .collect::>>() }); let block_roots = match maybe_block_roots { @@ -273,7 +261,7 @@ impl Worker { // Due to skip slots, blocks could be out of the range, we ensure they // are in the range before sending if block.slot() >= req.start_slot - && block.slot() < req.start_slot + req.count * req.step + && block.slot() < req.start_slot + req.count { blocks_sent += 1; self.send_network_message(NetworkMessage::SendResponse { diff --git a/beacon_node/network/src/sync/range_sync/batch.rs b/beacon_node/network/src/sync/range_sync/batch.rs index aaebe022c..447f0bd11 100644 --- a/beacon_node/network/src/sync/range_sync/batch.rs +++ b/beacon_node/network/src/sync/range_sync/batch.rs @@ -199,7 +199,6 @@ impl BatchInfo { BlocksByRangeRequest { start_slot: self.start_slot.into(), count: self.end_slot.sub(self.start_slot).into(), - step: 1, } }