From 0ecca1dcb086a440ac4663e3d7aceb74e4101959 Mon Sep 17 00:00:00 2001 From: Pawan Dhananjay Date: Wed, 14 Jun 2023 05:08:50 +0000 Subject: [PATCH] Rework internal rpc protocol handling (#4290) ## Issue Addressed Resolves #3980. Builds on work by @GeemoCandama in #4084 ## Proposed Changes Extends the `SupportedProtocol` abstraction added in Geemo's PR and attempts to fix internal versioning of requests that are mentioned in this comment https://github.com/sigp/lighthouse/pull/4084#issuecomment-1496380033 Co-authored-by: geemo --- .../lighthouse_network/src/rpc/codec/base.rs | 9 +- .../src/rpc/codec/ssz_snappy.rs | 591 +++++++----------- .../lighthouse_network/src/rpc/handler.rs | 14 +- .../lighthouse_network/src/rpc/methods.rs | 125 +++- beacon_node/lighthouse_network/src/rpc/mod.rs | 4 +- .../lighthouse_network/src/rpc/outbound.rs | 68 +- .../lighthouse_network/src/rpc/protocol.rs | 187 +++--- .../src/rpc/rate_limiter.rs | 4 +- .../src/rpc/self_limiter.rs | 8 +- .../src/service/api_types.rs | 28 +- .../lighthouse_network/src/service/mod.rs | 49 +- .../lighthouse_network/src/service/utils.rs | 8 +- .../lighthouse_network/tests/rpc_tests.rs | 34 +- .../beacon_processor/worker/rpc_methods.rs | 54 +- .../sync/block_lookups/single_block_lookup.rs | 4 +- .../network/src/sync/network_context.rs | 8 +- .../network/src/sync/range_sync/batch.rs | 8 +- 17 files changed, 619 insertions(+), 584 deletions(-) diff --git a/beacon_node/lighthouse_network/src/rpc/codec/base.rs b/beacon_node/lighthouse_network/src/rpc/codec/base.rs index 6c6ce2da3..d568f2789 100644 --- a/beacon_node/lighthouse_network/src/rpc/codec/base.rs +++ b/beacon_node/lighthouse_network/src/rpc/codec/base.rs @@ -214,8 +214,7 @@ mod tests { let mut buf = BytesMut::new(); buf.extend_from_slice(&message); - let snappy_protocol_id = - ProtocolId::new(Protocol::Status, Version::V1, Encoding::SSZSnappy); + let snappy_protocol_id = ProtocolId::new(SupportedProtocol::StatusV1, Encoding::SSZSnappy); let fork_context = Arc::new(fork_context(ForkName::Base)); let mut snappy_outbound_codec = SSZSnappyOutboundCodec::::new( @@ -249,8 +248,7 @@ mod tests { // Insert length-prefix uvi_codec.encode(len, &mut dst).unwrap(); - let snappy_protocol_id = - ProtocolId::new(Protocol::Status, Version::V1, Encoding::SSZSnappy); + let snappy_protocol_id = ProtocolId::new(SupportedProtocol::StatusV1, Encoding::SSZSnappy); let fork_context = Arc::new(fork_context(ForkName::Base)); let mut snappy_outbound_codec = SSZSnappyOutboundCodec::::new( @@ -277,8 +275,7 @@ mod tests { dst } - let protocol_id = - ProtocolId::new(Protocol::BlocksByRange, Version::V1, Encoding::SSZSnappy); + let protocol_id = ProtocolId::new(SupportedProtocol::BlocksByRangeV1, Encoding::SSZSnappy); // Response limits let fork_context = Arc::new(fork_context(ForkName::Base)); diff --git a/beacon_node/lighthouse_network/src/rpc/codec/ssz_snappy.rs b/beacon_node/lighthouse_network/src/rpc/codec/ssz_snappy.rs index 28fea40a2..39cf8b3eb 100644 --- a/beacon_node/lighthouse_network/src/rpc/codec/ssz_snappy.rs +++ b/beacon_node/lighthouse_network/src/rpc/codec/ssz_snappy.rs @@ -1,9 +1,9 @@ +use crate::rpc::methods::*; use crate::rpc::{ codec::base::OutboundCodec, - protocol::{Encoding, Protocol, ProtocolId, RPCError, Version, ERROR_TYPE_MAX, ERROR_TYPE_MIN}, + protocol::{Encoding, ProtocolId, RPCError, SupportedProtocol, ERROR_TYPE_MAX, ERROR_TYPE_MIN}, }; use crate::rpc::{InboundRequest, OutboundRequest, RPCCodedResponse, RPCResponse}; -use crate::{rpc::methods::*, EnrSyncCommitteeBitfield}; use libp2p::bytes::BytesMut; use snap::read::FrameDecoder; use snap::write::FrameEncoder; @@ -76,27 +76,14 @@ impl Encoder> for SSZSnappyInboundCodec< RPCResponse::MetaData(res) => // Encode the correct version of the MetaData response based on the negotiated version. { - match self.protocol.version { - Version::V1 => MetaData::::V1(MetaDataV1 { - seq_number: *res.seq_number(), - attnets: res.attnets().clone(), - }) - .as_ssz_bytes(), - Version::V2 => { - // `res` is of type MetaDataV2, return the ssz bytes - if res.syncnets().is_ok() { - res.as_ssz_bytes() - } else { - // `res` is of type MetaDataV1, create a MetaDataV2 by adding a default syncnets field - // Note: This code path is redundant as `res` would be always of type MetaDataV2 - MetaData::::V2(MetaDataV2 { - seq_number: *res.seq_number(), - attnets: res.attnets().clone(), - syncnets: EnrSyncCommitteeBitfield::::default(), - }) - .as_ssz_bytes() - } - } + match self.protocol.versioned_protocol { + SupportedProtocol::MetaDataV1 => res.metadata_v1().as_ssz_bytes(), + // We always send V2 metadata responses from the behaviour + // No change required. + SupportedProtocol::MetaDataV2 => res.metadata_v2().as_ssz_bytes(), + _ => unreachable!( + "We only send metadata responses on negotiating metadata requests" + ), } } }, @@ -139,8 +126,11 @@ impl Decoder for SSZSnappyInboundCodec { type Error = RPCError; fn decode(&mut self, src: &mut BytesMut) -> Result, Self::Error> { - if self.protocol.message_name == Protocol::MetaData { - return Ok(Some(InboundRequest::MetaData(PhantomData))); + if self.protocol.versioned_protocol == SupportedProtocol::MetaDataV1 { + return Ok(Some(InboundRequest::MetaData(MetadataRequest::new_v1()))); + } + if self.protocol.versioned_protocol == SupportedProtocol::MetaDataV2 { + return Ok(Some(InboundRequest::MetaData(MetadataRequest::new_v2()))); } let length = match handle_length(&mut self.inner, &mut self.len, src)? { Some(len) => len, @@ -152,8 +142,8 @@ impl Decoder for SSZSnappyInboundCodec { let ssz_limits = self.protocol.rpc_request_limits(); if ssz_limits.is_out_of_bounds(length, self.max_packet_size) { return Err(RPCError::InvalidData(format!( - "RPC request length is out of bounds, length {}", - length + "RPC request length for protocol {:?} is out of bounds, length {}", + self.protocol.versioned_protocol, length ))); } // Calculate worst case compression length for given uncompressed length @@ -170,11 +160,7 @@ impl Decoder for SSZSnappyInboundCodec { let n = reader.get_ref().get_ref().position(); self.len = None; let _read_bytes = src.split_to(n as usize); - - match self.protocol.version { - Version::V1 => handle_v1_request(self.protocol.message_name, &decoded_buffer), - Version::V2 => handle_v2_request(self.protocol.message_name, &decoded_buffer), - } + handle_rpc_request(self.protocol.versioned_protocol, &decoded_buffer) } Err(e) => handle_error(e, reader.get_ref().get_ref().position(), max_compressed_len), } @@ -228,11 +214,16 @@ impl Encoder> for SSZSnappyOutboundCodec< let bytes = match item { OutboundRequest::Status(req) => req.as_ssz_bytes(), OutboundRequest::Goodbye(req) => req.as_ssz_bytes(), - OutboundRequest::BlocksByRange(req) => req.as_ssz_bytes(), - OutboundRequest::BlocksByRoot(req) => req.block_roots.as_ssz_bytes(), + OutboundRequest::BlocksByRange(r) => match r { + OldBlocksByRangeRequest::V1(req) => req.as_ssz_bytes(), + OldBlocksByRangeRequest::V2(req) => req.as_ssz_bytes(), + }, + OutboundRequest::BlocksByRoot(r) => match r { + BlocksByRootRequest::V1(req) => req.block_roots.as_ssz_bytes(), + BlocksByRootRequest::V2(req) => req.block_roots.as_ssz_bytes(), + }, OutboundRequest::Ping(req) => req.as_ssz_bytes(), OutboundRequest::MetaData(_) => return Ok(()), // no metadata to encode - OutboundRequest::LightClientBootstrap(req) => req.as_ssz_bytes(), }; // SSZ encoded bytes should be within `max_packet_size` if bytes.len() > self.max_packet_size { @@ -311,15 +302,10 @@ impl Decoder for SSZSnappyOutboundCodec { let n = reader.get_ref().get_ref().position(); self.len = None; let _read_bytes = src.split_to(n as usize); - - match self.protocol.version { - Version::V1 => handle_v1_response(self.protocol.message_name, &decoded_buffer), - Version::V2 => handle_v2_response( - self.protocol.message_name, - &decoded_buffer, - &mut self.fork_name, - ), - } + // Safe to `take` from `self.fork_name` as we have all the bytes we need to + // decode an ssz object at this point. + let fork_name = self.fork_name.take(); + handle_rpc_response(self.protocol.versioned_protocol, &decoded_buffer, fork_name) } Err(e) => handle_error(e, reader.get_ref().get_ref().position(), max_compressed_len), } @@ -456,181 +442,150 @@ fn handle_length( } } -/// Decodes a `Version::V1` `InboundRequest` from the byte stream. +/// Decodes an `InboundRequest` from the byte stream. /// `decoded_buffer` should be an ssz-encoded bytestream with // length = length-prefix received in the beginning of the stream. -fn handle_v1_request( - protocol: Protocol, +fn handle_rpc_request( + versioned_protocol: SupportedProtocol, decoded_buffer: &[u8], ) -> Result>, RPCError> { - match protocol { - Protocol::Status => Ok(Some(InboundRequest::Status(StatusMessage::from_ssz_bytes( - decoded_buffer, - )?))), - Protocol::Goodbye => Ok(Some(InboundRequest::Goodbye( + match versioned_protocol { + SupportedProtocol::StatusV1 => Ok(Some(InboundRequest::Status( + StatusMessage::from_ssz_bytes(decoded_buffer)?, + ))), + SupportedProtocol::GoodbyeV1 => Ok(Some(InboundRequest::Goodbye( GoodbyeReason::from_ssz_bytes(decoded_buffer)?, ))), - Protocol::BlocksByRange => Ok(Some(InboundRequest::BlocksByRange( - OldBlocksByRangeRequest::from_ssz_bytes(decoded_buffer)?, + SupportedProtocol::BlocksByRangeV2 => Ok(Some(InboundRequest::BlocksByRange( + OldBlocksByRangeRequest::V2(OldBlocksByRangeRequestV2::from_ssz_bytes(decoded_buffer)?), ))), - Protocol::BlocksByRoot => Ok(Some(InboundRequest::BlocksByRoot(BlocksByRootRequest { - block_roots: VariableList::from_ssz_bytes(decoded_buffer)?, - }))), - Protocol::Ping => Ok(Some(InboundRequest::Ping(Ping { + SupportedProtocol::BlocksByRangeV1 => Ok(Some(InboundRequest::BlocksByRange( + OldBlocksByRangeRequest::V1(OldBlocksByRangeRequestV1::from_ssz_bytes(decoded_buffer)?), + ))), + SupportedProtocol::BlocksByRootV2 => Ok(Some(InboundRequest::BlocksByRoot( + BlocksByRootRequest::V2(BlocksByRootRequestV2 { + block_roots: VariableList::from_ssz_bytes(decoded_buffer)?, + }), + ))), + SupportedProtocol::BlocksByRootV1 => Ok(Some(InboundRequest::BlocksByRoot( + BlocksByRootRequest::V1(BlocksByRootRequestV1 { + block_roots: VariableList::from_ssz_bytes(decoded_buffer)?, + }), + ))), + SupportedProtocol::PingV1 => Ok(Some(InboundRequest::Ping(Ping { data: u64::from_ssz_bytes(decoded_buffer)?, }))), - Protocol::LightClientBootstrap => Ok(Some(InboundRequest::LightClientBootstrap( - LightClientBootstrapRequest { + SupportedProtocol::LightClientBootstrapV1 => Ok(Some( + InboundRequest::LightClientBootstrap(LightClientBootstrapRequest { root: Hash256::from_ssz_bytes(decoded_buffer)?, - }, - ))), + }), + )), // MetaData requests return early from InboundUpgrade and do not reach the decoder. // Handle this case just for completeness. - Protocol::MetaData => { + SupportedProtocol::MetaDataV2 => { if !decoded_buffer.is_empty() { Err(RPCError::InternalError( "Metadata requests shouldn't reach decoder", )) } else { - Ok(Some(InboundRequest::MetaData(PhantomData))) + Ok(Some(InboundRequest::MetaData(MetadataRequest::new_v2()))) } } - } -} - -/// Decodes a `Version::V2` `InboundRequest` from the byte stream. -/// `decoded_buffer` should be an ssz-encoded bytestream with -// length = length-prefix received in the beginning of the stream. -fn handle_v2_request( - protocol: Protocol, - decoded_buffer: &[u8], -) -> Result>, RPCError> { - match protocol { - Protocol::BlocksByRange => Ok(Some(InboundRequest::BlocksByRange( - OldBlocksByRangeRequest::from_ssz_bytes(decoded_buffer)?, - ))), - Protocol::BlocksByRoot => Ok(Some(InboundRequest::BlocksByRoot(BlocksByRootRequest { - block_roots: VariableList::from_ssz_bytes(decoded_buffer)?, - }))), - // MetaData requests return early from InboundUpgrade and do not reach the decoder. - // Handle this case just for completeness. - Protocol::MetaData => { + SupportedProtocol::MetaDataV1 => { if !decoded_buffer.is_empty() { Err(RPCError::InvalidData("Metadata request".to_string())) } else { - Ok(Some(InboundRequest::MetaData(PhantomData))) + Ok(Some(InboundRequest::MetaData(MetadataRequest::new_v1()))) } } - _ => Err(RPCError::ErrorResponse( - RPCResponseErrorCode::InvalidRequest, - format!("{} does not support version 2", protocol), - )), } } -/// Decodes a `Version::V1` `RPCResponse` from the byte stream. +/// Decodes a `RPCResponse` from the byte stream. /// `decoded_buffer` should be an ssz-encoded bytestream with -// length = length-prefix received in the beginning of the stream. -fn handle_v1_response( - protocol: Protocol, - decoded_buffer: &[u8], -) -> Result>, RPCError> { - match protocol { - Protocol::Status => Ok(Some(RPCResponse::Status(StatusMessage::from_ssz_bytes( - decoded_buffer, - )?))), - // This case should be unreachable as `Goodbye` has no response. - Protocol::Goodbye => Err(RPCError::InvalidData( - "Goodbye RPC message has no valid response".to_string(), - )), - Protocol::BlocksByRange => Ok(Some(RPCResponse::BlocksByRange(Arc::new( - SignedBeaconBlock::Base(SignedBeaconBlockBase::from_ssz_bytes(decoded_buffer)?), - )))), - Protocol::BlocksByRoot => Ok(Some(RPCResponse::BlocksByRoot(Arc::new( - SignedBeaconBlock::Base(SignedBeaconBlockBase::from_ssz_bytes(decoded_buffer)?), - )))), - Protocol::Ping => Ok(Some(RPCResponse::Pong(Ping { - data: u64::from_ssz_bytes(decoded_buffer)?, - }))), - Protocol::MetaData => Ok(Some(RPCResponse::MetaData(MetaData::V1( - MetaDataV1::from_ssz_bytes(decoded_buffer)?, - )))), - Protocol::LightClientBootstrap => Ok(Some(RPCResponse::LightClientBootstrap( - LightClientBootstrap::from_ssz_bytes(decoded_buffer)?, - ))), - } -} - -/// Decodes a `Version::V2` `RPCResponse` from the byte stream. -/// `decoded_buffer` should be an ssz-encoded bytestream with -// length = length-prefix received in the beginning of the stream. +/// length = length-prefix received in the beginning of the stream. /// /// For BlocksByRange/BlocksByRoot reponses, decodes the appropriate response /// according to the received `ForkName`. -fn handle_v2_response( - protocol: Protocol, +fn handle_rpc_response( + versioned_protocol: SupportedProtocol, decoded_buffer: &[u8], - fork_name: &mut Option, + fork_name: Option, ) -> Result>, RPCError> { - // MetaData does not contain context_bytes - if let Protocol::MetaData = protocol { - Ok(Some(RPCResponse::MetaData(MetaData::V2( + match versioned_protocol { + SupportedProtocol::StatusV1 => Ok(Some(RPCResponse::Status( + StatusMessage::from_ssz_bytes(decoded_buffer)?, + ))), + // This case should be unreachable as `Goodbye` has no response. + SupportedProtocol::GoodbyeV1 => Err(RPCError::InvalidData( + "Goodbye RPC message has no valid response".to_string(), + )), + SupportedProtocol::BlocksByRangeV1 => Ok(Some(RPCResponse::BlocksByRange(Arc::new( + SignedBeaconBlock::Base(SignedBeaconBlockBase::from_ssz_bytes(decoded_buffer)?), + )))), + SupportedProtocol::BlocksByRootV1 => Ok(Some(RPCResponse::BlocksByRoot(Arc::new( + SignedBeaconBlock::Base(SignedBeaconBlockBase::from_ssz_bytes(decoded_buffer)?), + )))), + SupportedProtocol::PingV1 => Ok(Some(RPCResponse::Pong(Ping { + data: u64::from_ssz_bytes(decoded_buffer)?, + }))), + SupportedProtocol::MetaDataV1 => Ok(Some(RPCResponse::MetaData(MetaData::V1( + MetaDataV1::from_ssz_bytes(decoded_buffer)?, + )))), + SupportedProtocol::LightClientBootstrapV1 => Ok(Some(RPCResponse::LightClientBootstrap( + LightClientBootstrap::from_ssz_bytes(decoded_buffer)?, + ))), + // MetaData V2 responses have no context bytes, so behave similarly to V1 responses + SupportedProtocol::MetaDataV2 => Ok(Some(RPCResponse::MetaData(MetaData::V2( MetaDataV2::from_ssz_bytes(decoded_buffer)?, - )))) - } else { - let fork_name = fork_name.take().ok_or_else(|| { - RPCError::ErrorResponse( - RPCResponseErrorCode::InvalidRequest, - format!("No context bytes provided for {} response", protocol), - ) - })?; - match protocol { - Protocol::BlocksByRange => match fork_name { - ForkName::Altair => Ok(Some(RPCResponse::BlocksByRange(Arc::new( - SignedBeaconBlock::Altair(SignedBeaconBlockAltair::from_ssz_bytes( - decoded_buffer, - )?), - )))), + )))), + SupportedProtocol::BlocksByRangeV2 => match fork_name { + Some(ForkName::Altair) => Ok(Some(RPCResponse::BlocksByRange(Arc::new( + SignedBeaconBlock::Altair(SignedBeaconBlockAltair::from_ssz_bytes(decoded_buffer)?), + )))), - ForkName::Base => Ok(Some(RPCResponse::BlocksByRange(Arc::new( - SignedBeaconBlock::Base(SignedBeaconBlockBase::from_ssz_bytes(decoded_buffer)?), - )))), - ForkName::Merge => Ok(Some(RPCResponse::BlocksByRange(Arc::new( - SignedBeaconBlock::Merge(SignedBeaconBlockMerge::from_ssz_bytes( - decoded_buffer, - )?), - )))), - ForkName::Capella => Ok(Some(RPCResponse::BlocksByRange(Arc::new( - SignedBeaconBlock::Capella(SignedBeaconBlockCapella::from_ssz_bytes( - decoded_buffer, - )?), - )))), - }, - Protocol::BlocksByRoot => match fork_name { - ForkName::Altair => Ok(Some(RPCResponse::BlocksByRoot(Arc::new( - SignedBeaconBlock::Altair(SignedBeaconBlockAltair::from_ssz_bytes( - decoded_buffer, - )?), - )))), - ForkName::Base => Ok(Some(RPCResponse::BlocksByRoot(Arc::new( - SignedBeaconBlock::Base(SignedBeaconBlockBase::from_ssz_bytes(decoded_buffer)?), - )))), - ForkName::Merge => Ok(Some(RPCResponse::BlocksByRoot(Arc::new( - SignedBeaconBlock::Merge(SignedBeaconBlockMerge::from_ssz_bytes( - decoded_buffer, - )?), - )))), - ForkName::Capella => Ok(Some(RPCResponse::BlocksByRoot(Arc::new( - SignedBeaconBlock::Capella(SignedBeaconBlockCapella::from_ssz_bytes( - decoded_buffer, - )?), - )))), - }, - _ => Err(RPCError::ErrorResponse( + Some(ForkName::Base) => Ok(Some(RPCResponse::BlocksByRange(Arc::new( + SignedBeaconBlock::Base(SignedBeaconBlockBase::from_ssz_bytes(decoded_buffer)?), + )))), + Some(ForkName::Merge) => Ok(Some(RPCResponse::BlocksByRange(Arc::new( + SignedBeaconBlock::Merge(SignedBeaconBlockMerge::from_ssz_bytes(decoded_buffer)?), + )))), + Some(ForkName::Capella) => Ok(Some(RPCResponse::BlocksByRange(Arc::new( + SignedBeaconBlock::Capella(SignedBeaconBlockCapella::from_ssz_bytes( + decoded_buffer, + )?), + )))), + None => Err(RPCError::ErrorResponse( RPCResponseErrorCode::InvalidRequest, - "Invalid v2 request".to_string(), + format!( + "No context bytes provided for {:?} response", + versioned_protocol + ), )), - } + }, + SupportedProtocol::BlocksByRootV2 => match fork_name { + Some(ForkName::Altair) => Ok(Some(RPCResponse::BlocksByRoot(Arc::new( + SignedBeaconBlock::Altair(SignedBeaconBlockAltair::from_ssz_bytes(decoded_buffer)?), + )))), + Some(ForkName::Base) => Ok(Some(RPCResponse::BlocksByRoot(Arc::new( + SignedBeaconBlock::Base(SignedBeaconBlockBase::from_ssz_bytes(decoded_buffer)?), + )))), + Some(ForkName::Merge) => Ok(Some(RPCResponse::BlocksByRoot(Arc::new( + SignedBeaconBlock::Merge(SignedBeaconBlockMerge::from_ssz_bytes(decoded_buffer)?), + )))), + Some(ForkName::Capella) => Ok(Some(RPCResponse::BlocksByRoot(Arc::new( + SignedBeaconBlock::Capella(SignedBeaconBlockCapella::from_ssz_bytes( + decoded_buffer, + )?), + )))), + None => Err(RPCError::ErrorResponse( + RPCResponseErrorCode::InvalidRequest, + format!( + "No context bytes provided for {:?} response", + versioned_protocol + ), + )), + }, } } @@ -742,18 +697,20 @@ mod tests { } } - fn bbrange_request() -> OldBlocksByRangeRequest { - OldBlocksByRangeRequest { - start_slot: 0, - count: 10, - step: 1, - } + fn bbrange_request_v1() -> OldBlocksByRangeRequest { + OldBlocksByRangeRequest::new_v1(0, 10, 1) } - fn bbroot_request() -> BlocksByRootRequest { - BlocksByRootRequest { - block_roots: VariableList::from(vec![Hash256::zero()]), - } + fn bbrange_request_v2() -> OldBlocksByRangeRequest { + OldBlocksByRangeRequest::new(0, 10, 1) + } + + fn bbroot_request_v1() -> BlocksByRootRequest { + BlocksByRootRequest::new_v1(vec![Hash256::zero()].into()) + } + + fn bbroot_request_v2() -> BlocksByRootRequest { + BlocksByRootRequest::new(vec![Hash256::zero()].into()) } fn ping_message() -> Ping { @@ -777,12 +734,11 @@ mod tests { /// Encodes the given protocol response as bytes. fn encode_response( - protocol: Protocol, - version: Version, + protocol: SupportedProtocol, message: RPCCodedResponse, fork_name: ForkName, ) -> Result { - let snappy_protocol_id = ProtocolId::new(protocol, version, Encoding::SSZSnappy); + let snappy_protocol_id = ProtocolId::new(protocol, Encoding::SSZSnappy); let fork_context = Arc::new(fork_context(fork_name)); let max_packet_size = max_rpc_size(&fork_context); @@ -824,12 +780,11 @@ mod tests { /// Attempts to decode the given protocol bytes as an rpc response fn decode_response( - protocol: Protocol, - version: Version, + protocol: SupportedProtocol, message: &mut BytesMut, fork_name: ForkName, ) -> Result>, RPCError> { - let snappy_protocol_id = ProtocolId::new(protocol, version, Encoding::SSZSnappy); + let snappy_protocol_id = ProtocolId::new(protocol, Encoding::SSZSnappy); let fork_context = Arc::new(fork_context(fork_name)); let max_packet_size = max_rpc_size(&fork_context); let mut snappy_outbound_codec = @@ -840,63 +795,55 @@ mod tests { /// Encodes the provided protocol message as bytes and tries to decode the encoding bytes. fn encode_then_decode_response( - protocol: Protocol, - version: Version, + protocol: SupportedProtocol, message: RPCCodedResponse, fork_name: ForkName, ) -> Result>, RPCError> { - let mut encoded = encode_response(protocol, version.clone(), message, fork_name)?; - decode_response(protocol, version, &mut encoded, fork_name) + let mut encoded = encode_response(protocol, message, fork_name)?; + decode_response(protocol, &mut encoded, fork_name) } /// Verifies that requests we send are encoded in a way that we would correctly decode too. fn encode_then_decode_request(req: OutboundRequest, fork_name: ForkName) { let fork_context = Arc::new(fork_context(fork_name)); let max_packet_size = max_rpc_size(&fork_context); - for protocol in req.supported_protocols() { - // Encode a request we send - let mut buf = BytesMut::new(); - let mut outbound_codec = SSZSnappyOutboundCodec::::new( - protocol.clone(), - max_packet_size, - fork_context.clone(), - ); - outbound_codec.encode(req.clone(), &mut buf).unwrap(); + let protocol = ProtocolId::new(req.versioned_protocol(), Encoding::SSZSnappy); + // Encode a request we send + let mut buf = BytesMut::new(); + let mut outbound_codec = SSZSnappyOutboundCodec::::new( + protocol.clone(), + max_packet_size, + fork_context.clone(), + ); + outbound_codec.encode(req.clone(), &mut buf).unwrap(); - let mut inbound_codec = SSZSnappyInboundCodec::::new( - protocol.clone(), - max_packet_size, - fork_context.clone(), - ); + let mut inbound_codec = + SSZSnappyInboundCodec::::new(protocol.clone(), max_packet_size, fork_context); - let decoded = inbound_codec.decode(&mut buf).unwrap().unwrap_or_else(|| { - panic!( - "Should correctly decode the request {} over protocol {:?} and fork {}", - req, protocol, fork_name - ) - }); - match req.clone() { - OutboundRequest::Status(status) => { - assert_eq!(decoded, InboundRequest::Status(status)) - } - OutboundRequest::Goodbye(goodbye) => { - assert_eq!(decoded, InboundRequest::Goodbye(goodbye)) - } - OutboundRequest::BlocksByRange(bbrange) => { - assert_eq!(decoded, InboundRequest::BlocksByRange(bbrange)) - } - OutboundRequest::BlocksByRoot(bbroot) => { - assert_eq!(decoded, InboundRequest::BlocksByRoot(bbroot)) - } - OutboundRequest::Ping(ping) => { - assert_eq!(decoded, InboundRequest::Ping(ping)) - } - OutboundRequest::MetaData(metadata) => { - assert_eq!(decoded, InboundRequest::MetaData(metadata)) - } - OutboundRequest::LightClientBootstrap(bootstrap) => { - assert_eq!(decoded, InboundRequest::LightClientBootstrap(bootstrap)) - } + let decoded = inbound_codec.decode(&mut buf).unwrap().unwrap_or_else(|| { + panic!( + "Should correctly decode the request {} over protocol {:?} and fork {}", + req, protocol, fork_name + ) + }); + match req { + OutboundRequest::Status(status) => { + assert_eq!(decoded, InboundRequest::Status(status)) + } + OutboundRequest::Goodbye(goodbye) => { + assert_eq!(decoded, InboundRequest::Goodbye(goodbye)) + } + OutboundRequest::BlocksByRange(bbrange) => { + assert_eq!(decoded, InboundRequest::BlocksByRange(bbrange)) + } + OutboundRequest::BlocksByRoot(bbroot) => { + assert_eq!(decoded, InboundRequest::BlocksByRoot(bbroot)) + } + OutboundRequest::Ping(ping) => { + assert_eq!(decoded, InboundRequest::Ping(ping)) + } + OutboundRequest::MetaData(metadata) => { + assert_eq!(decoded, InboundRequest::MetaData(metadata)) } } } @@ -906,8 +853,7 @@ mod tests { fn test_encode_then_decode_v1() { assert_eq!( encode_then_decode_response( - Protocol::Status, - Version::V1, + SupportedProtocol::StatusV1, RPCCodedResponse::Success(RPCResponse::Status(status_message())), ForkName::Base, ), @@ -916,8 +862,7 @@ mod tests { assert_eq!( encode_then_decode_response( - Protocol::Ping, - Version::V1, + SupportedProtocol::PingV1, RPCCodedResponse::Success(RPCResponse::Pong(ping_message())), ForkName::Base, ), @@ -926,8 +871,7 @@ mod tests { assert_eq!( encode_then_decode_response( - Protocol::BlocksByRange, - Version::V1, + SupportedProtocol::BlocksByRangeV1, RPCCodedResponse::Success(RPCResponse::BlocksByRange(Arc::new(empty_base_block()))), ForkName::Base, ), @@ -939,8 +883,7 @@ mod tests { assert!( matches!( encode_then_decode_response( - Protocol::BlocksByRange, - Version::V1, + SupportedProtocol::BlocksByRangeV1, RPCCodedResponse::Success(RPCResponse::BlocksByRange(Arc::new(altair_block()))), ForkName::Altair, ) @@ -952,8 +895,7 @@ mod tests { assert_eq!( encode_then_decode_response( - Protocol::BlocksByRoot, - Version::V1, + SupportedProtocol::BlocksByRootV1, RPCCodedResponse::Success(RPCResponse::BlocksByRoot(Arc::new(empty_base_block()))), ForkName::Base, ), @@ -965,8 +907,7 @@ mod tests { assert!( matches!( encode_then_decode_response( - Protocol::BlocksByRoot, - Version::V1, + SupportedProtocol::BlocksByRootV1, RPCCodedResponse::Success(RPCResponse::BlocksByRoot(Arc::new(altair_block()))), ForkName::Altair, ) @@ -978,18 +919,7 @@ mod tests { assert_eq!( encode_then_decode_response( - Protocol::MetaData, - Version::V1, - RPCCodedResponse::Success(RPCResponse::MetaData(metadata())), - ForkName::Base, - ), - Ok(Some(RPCResponse::MetaData(metadata()))), - ); - - assert_eq!( - encode_then_decode_response( - Protocol::MetaData, - Version::V1, + SupportedProtocol::MetaDataV1, RPCCodedResponse::Success(RPCResponse::MetaData(metadata())), ForkName::Base, ), @@ -999,8 +929,7 @@ mod tests { // A MetaDataV2 still encodes as a MetaDataV1 since version is Version::V1 assert_eq!( encode_then_decode_response( - Protocol::MetaData, - Version::V1, + SupportedProtocol::MetaDataV1, RPCCodedResponse::Success(RPCResponse::MetaData(metadata_v2())), ForkName::Base, ), @@ -1011,38 +940,9 @@ mod tests { // Test RPCResponse encoding/decoding for V1 messages #[test] fn test_encode_then_decode_v2() { - assert!( - matches!( - encode_then_decode_response( - Protocol::Status, - Version::V2, - RPCCodedResponse::Success(RPCResponse::Status(status_message())), - ForkName::Base, - ) - .unwrap_err(), - RPCError::ErrorResponse(RPCResponseErrorCode::InvalidRequest, _), - ), - "status does not have V2 message" - ); - - assert!( - matches!( - encode_then_decode_response( - Protocol::Ping, - Version::V2, - RPCCodedResponse::Success(RPCResponse::Pong(ping_message())), - ForkName::Base, - ) - .unwrap_err(), - RPCError::ErrorResponse(RPCResponseErrorCode::InvalidRequest, _), - ), - "ping does not have V2 message" - ); - assert_eq!( encode_then_decode_response( - Protocol::BlocksByRange, - Version::V2, + SupportedProtocol::BlocksByRangeV2, RPCCodedResponse::Success(RPCResponse::BlocksByRange(Arc::new(empty_base_block()))), ForkName::Base, ), @@ -1056,8 +956,7 @@ mod tests { // the current_fork's rpc limit assert_eq!( encode_then_decode_response( - Protocol::BlocksByRange, - Version::V2, + SupportedProtocol::BlocksByRangeV2, RPCCodedResponse::Success(RPCResponse::BlocksByRange(Arc::new(empty_base_block()))), ForkName::Altair, ), @@ -1068,8 +967,7 @@ mod tests { assert_eq!( encode_then_decode_response( - Protocol::BlocksByRange, - Version::V2, + SupportedProtocol::BlocksByRangeV2, RPCCodedResponse::Success(RPCResponse::BlocksByRange(Arc::new(altair_block()))), ForkName::Altair, ), @@ -1081,8 +979,7 @@ mod tests { assert_eq!( encode_then_decode_response( - Protocol::BlocksByRange, - Version::V2, + SupportedProtocol::BlocksByRangeV2, RPCCodedResponse::Success(RPCResponse::BlocksByRange(Arc::new( merge_block_small.clone() ))), @@ -1100,8 +997,7 @@ mod tests { assert!( matches!( decode_response( - Protocol::BlocksByRange, - Version::V2, + SupportedProtocol::BlocksByRangeV2, &mut encoded, ForkName::Merge, ) @@ -1113,8 +1009,7 @@ mod tests { assert_eq!( encode_then_decode_response( - Protocol::BlocksByRoot, - Version::V2, + SupportedProtocol::BlocksByRootV2, RPCCodedResponse::Success(RPCResponse::BlocksByRoot(Arc::new(empty_base_block()))), ForkName::Base, ), @@ -1128,8 +1023,7 @@ mod tests { // the current_fork's rpc limit assert_eq!( encode_then_decode_response( - Protocol::BlocksByRoot, - Version::V2, + SupportedProtocol::BlocksByRootV2, RPCCodedResponse::Success(RPCResponse::BlocksByRoot(Arc::new(empty_base_block()))), ForkName::Altair, ), @@ -1140,8 +1034,7 @@ mod tests { assert_eq!( encode_then_decode_response( - Protocol::BlocksByRoot, - Version::V2, + SupportedProtocol::BlocksByRootV2, RPCCodedResponse::Success(RPCResponse::BlocksByRoot(Arc::new(altair_block()))), ForkName::Altair, ), @@ -1150,8 +1043,7 @@ mod tests { assert_eq!( encode_then_decode_response( - Protocol::BlocksByRoot, - Version::V2, + SupportedProtocol::BlocksByRootV2, RPCCodedResponse::Success(RPCResponse::BlocksByRoot(Arc::new( merge_block_small.clone() ))), @@ -1167,8 +1059,7 @@ mod tests { assert!( matches!( decode_response( - Protocol::BlocksByRoot, - Version::V2, + SupportedProtocol::BlocksByRootV2, &mut encoded, ForkName::Merge, ) @@ -1181,8 +1072,7 @@ mod tests { // A MetaDataV1 still encodes as a MetaDataV2 since version is Version::V2 assert_eq!( encode_then_decode_response( - Protocol::MetaData, - Version::V2, + SupportedProtocol::MetaDataV2, RPCCodedResponse::Success(RPCResponse::MetaData(metadata())), ForkName::Base, ), @@ -1191,8 +1081,7 @@ mod tests { assert_eq!( encode_then_decode_response( - Protocol::MetaData, - Version::V2, + SupportedProtocol::MetaDataV2, RPCCodedResponse::Success(RPCResponse::MetaData(metadata_v2())), ForkName::Altair, ), @@ -1207,8 +1096,7 @@ mod tests { // Removing context bytes for v2 messages should error let mut encoded_bytes = encode_response( - Protocol::BlocksByRange, - Version::V2, + SupportedProtocol::BlocksByRangeV2, RPCCodedResponse::Success(RPCResponse::BlocksByRange(Arc::new(empty_base_block()))), ForkName::Base, ) @@ -1218,8 +1106,7 @@ mod tests { assert!(matches!( decode_response( - Protocol::BlocksByRange, - Version::V2, + SupportedProtocol::BlocksByRangeV2, &mut encoded_bytes, ForkName::Base ) @@ -1228,8 +1115,7 @@ mod tests { )); let mut encoded_bytes = encode_response( - Protocol::BlocksByRoot, - Version::V2, + SupportedProtocol::BlocksByRootV2, RPCCodedResponse::Success(RPCResponse::BlocksByRoot(Arc::new(empty_base_block()))), ForkName::Base, ) @@ -1239,8 +1125,7 @@ mod tests { assert!(matches!( decode_response( - Protocol::BlocksByRange, - Version::V2, + SupportedProtocol::BlocksByRangeV2, &mut encoded_bytes, ForkName::Base ) @@ -1250,8 +1135,7 @@ mod tests { // Trying to decode a base block with altair context bytes should give ssz decoding error let mut encoded_bytes = encode_response( - Protocol::BlocksByRange, - Version::V2, + SupportedProtocol::BlocksByRangeV2, RPCCodedResponse::Success(RPCResponse::BlocksByRange(Arc::new(empty_base_block()))), ForkName::Altair, ) @@ -1264,8 +1148,7 @@ mod tests { assert!(matches!( decode_response( - Protocol::BlocksByRange, - Version::V2, + SupportedProtocol::BlocksByRangeV2, &mut wrong_fork_bytes, ForkName::Altair ) @@ -1275,8 +1158,7 @@ mod tests { // Trying to decode an altair block with base context bytes should give ssz decoding error let mut encoded_bytes = encode_response( - Protocol::BlocksByRoot, - Version::V2, + SupportedProtocol::BlocksByRootV2, RPCCodedResponse::Success(RPCResponse::BlocksByRoot(Arc::new(altair_block()))), ForkName::Altair, ) @@ -1288,8 +1170,7 @@ mod tests { assert!(matches!( decode_response( - Protocol::BlocksByRange, - Version::V2, + SupportedProtocol::BlocksByRangeV2, &mut wrong_fork_bytes, ForkName::Altair ) @@ -1302,8 +1183,7 @@ mod tests { encoded_bytes.extend_from_slice(&fork_context.to_context_bytes(ForkName::Altair).unwrap()); encoded_bytes.extend_from_slice( &encode_response( - Protocol::MetaData, - Version::V2, + SupportedProtocol::MetaDataV2, RPCCodedResponse::Success(RPCResponse::MetaData(metadata())), ForkName::Altair, ) @@ -1311,8 +1191,7 @@ mod tests { ); assert!(decode_response( - Protocol::MetaData, - Version::V2, + SupportedProtocol::MetaDataV2, &mut encoded_bytes, ForkName::Altair ) @@ -1320,8 +1199,7 @@ mod tests { // Sending context bytes which do not correspond to any fork should return an error let mut encoded_bytes = encode_response( - Protocol::BlocksByRoot, - Version::V2, + SupportedProtocol::BlocksByRootV2, RPCCodedResponse::Success(RPCResponse::BlocksByRoot(Arc::new(empty_base_block()))), ForkName::Altair, ) @@ -1333,8 +1211,7 @@ mod tests { assert!(matches!( decode_response( - Protocol::BlocksByRange, - Version::V2, + SupportedProtocol::BlocksByRangeV2, &mut wrong_fork_bytes, ForkName::Altair ) @@ -1344,8 +1221,7 @@ mod tests { // Sending bytes less than context bytes length should wait for more bytes by returning `Ok(None)` let mut encoded_bytes = encode_response( - Protocol::BlocksByRoot, - Version::V2, + SupportedProtocol::BlocksByRootV2, RPCCodedResponse::Success(RPCResponse::BlocksByRoot(Arc::new(empty_base_block()))), ForkName::Altair, ) @@ -1355,8 +1231,7 @@ mod tests { assert_eq!( decode_response( - Protocol::BlocksByRange, - Version::V2, + SupportedProtocol::BlocksByRangeV2, &mut part, ForkName::Altair ), @@ -1370,9 +1245,12 @@ mod tests { OutboundRequest::Ping(ping_message()), OutboundRequest::Status(status_message()), OutboundRequest::Goodbye(GoodbyeReason::Fault), - OutboundRequest::BlocksByRange(bbrange_request()), - OutboundRequest::BlocksByRoot(bbroot_request()), - OutboundRequest::MetaData(PhantomData::), + OutboundRequest::BlocksByRange(bbrange_request_v1()), + OutboundRequest::BlocksByRange(bbrange_request_v2()), + OutboundRequest::BlocksByRoot(bbroot_request_v1()), + OutboundRequest::BlocksByRoot(bbroot_request_v2()), + OutboundRequest::MetaData(MetadataRequest::new_v1()), + OutboundRequest::MetaData(MetadataRequest::new_v2()), ]; for req in requests.iter() { for fork_name in ForkName::list_all() { @@ -1432,7 +1310,7 @@ mod tests { // 10 (for stream identifier) + 80 + 42 = 132 > `max_compressed_len`. Hence, decoding should fail with `InvalidData`. assert!(matches!( - decode_response(Protocol::Status, Version::V1, &mut dst, ForkName::Base).unwrap_err(), + decode_response(SupportedProtocol::StatusV1, &mut dst, ForkName::Base).unwrap_err(), RPCError::InvalidData(_) )); } @@ -1490,8 +1368,7 @@ mod tests { // 10 (for stream identifier) + 176156 + 8103 = 184269 > `max_compressed_len`. Hence, decoding should fail with `InvalidData`. assert!(matches!( decode_response( - Protocol::BlocksByRange, - Version::V2, + SupportedProtocol::BlocksByRangeV2, &mut dst, ForkName::Altair ) @@ -1534,7 +1411,7 @@ mod tests { dst.extend_from_slice(writer.get_ref()); assert!(matches!( - decode_response(Protocol::Status, Version::V1, &mut dst, ForkName::Base).unwrap_err(), + decode_response(SupportedProtocol::StatusV1, &mut dst, ForkName::Base).unwrap_err(), RPCError::InvalidData(_) )); } diff --git a/beacon_node/lighthouse_network/src/rpc/handler.rs b/beacon_node/lighthouse_network/src/rpc/handler.rs index a1743c15f..8199bee2a 100644 --- a/beacon_node/lighthouse_network/src/rpc/handler.rs +++ b/beacon_node/lighthouse_network/src/rpc/handler.rs @@ -245,7 +245,7 @@ where while let Some((id, req)) = self.dial_queue.pop() { self.events_out.push(Err(HandlerErr::Outbound { error: RPCError::Disconnected, - proto: req.protocol(), + proto: req.versioned_protocol().protocol(), id, })); } @@ -269,7 +269,7 @@ where } _ => self.events_out.push(Err(HandlerErr::Outbound { error: RPCError::Disconnected, - proto: req.protocol(), + proto: req.versioned_protocol().protocol(), id, })), } @@ -334,7 +334,7 @@ where ) { self.dial_negotiated -= 1; let (id, request) = request_info; - let proto = request.protocol(); + let proto = request.versioned_protocol().protocol(); // accept outbound connections only if the handler is not deactivated if matches!(self.state, HandlerState::Deactivated) { @@ -414,7 +414,7 @@ where 128, ) as usize), delay_key: Some(delay_key), - protocol: req.protocol(), + protocol: req.versioned_protocol().protocol(), request_start_time: Instant::now(), remaining_chunks: expected_responses, }, @@ -422,7 +422,7 @@ where } else { self.events_out.push(Err(HandlerErr::Inbound { id: self.current_inbound_substream_id, - proto: req.protocol(), + proto: req.versioned_protocol().protocol(), error: RPCError::HandlerRejected, })); return self.shutdown(None); @@ -498,7 +498,7 @@ where }; self.events_out.push(Err(HandlerErr::Outbound { error, - proto: req.protocol(), + proto: req.versioned_protocol().protocol(), id, })); } @@ -895,7 +895,7 @@ where // else we return an error, stream should not have closed early. let outbound_err = HandlerErr::Outbound { id: request_id, - proto: request.protocol(), + proto: request.versioned_protocol().protocol(), error: RPCError::IncompleteStream, }; return Poll::Ready(ConnectionHandlerEvent::Custom(Err(outbound_err))); diff --git a/beacon_node/lighthouse_network/src/rpc/methods.rs b/beacon_node/lighthouse_network/src/rpc/methods.rs index 5da595c3d..af0ba2510 100644 --- a/beacon_node/lighthouse_network/src/rpc/methods.rs +++ b/beacon_node/lighthouse_network/src/rpc/methods.rs @@ -3,11 +3,13 @@ use crate::types::{EnrAttestationBitfield, EnrSyncCommitteeBitfield}; use regex::bytes::Regex; use serde::Serialize; +use ssz::Encode; use ssz_derive::{Decode, Encode}; use ssz_types::{ typenum::{U1024, U256}, VariableList, }; +use std::marker::PhantomData; use std::ops::Deref; use std::sync::Arc; use strum::IntoStaticStr; @@ -85,6 +87,30 @@ pub struct Ping { pub data: u64, } +/// The METADATA request structure. +#[superstruct( + variants(V1, V2), + variant_attributes(derive(Clone, Debug, PartialEq, Serialize),) +)] +#[derive(Clone, Debug, PartialEq)] +pub struct MetadataRequest { + _phantom_data: PhantomData, +} + +impl MetadataRequest { + pub fn new_v1() -> Self { + Self::V1(MetadataRequestV1 { + _phantom_data: PhantomData, + }) + } + + pub fn new_v2() -> Self { + Self::V2(MetadataRequestV2 { + _phantom_data: PhantomData, + }) + } +} + /// The METADATA response structure. #[superstruct( variants(V1, V2), @@ -93,9 +119,8 @@ pub struct Ping { serde(bound = "T: EthSpec", deny_unknown_fields), ) )] -#[derive(Clone, Debug, PartialEq, Serialize, Encode)] +#[derive(Clone, Debug, PartialEq, Serialize)] #[serde(bound = "T: EthSpec")] -#[ssz(enum_behaviour = "transparent")] pub struct MetaData { /// A sequential counter indicating when data gets modified. pub seq_number: u64, @@ -106,6 +131,38 @@ pub struct MetaData { pub syncnets: EnrSyncCommitteeBitfield, } +impl MetaData { + /// Returns a V1 MetaData response from self. + pub fn metadata_v1(&self) -> Self { + match self { + md @ MetaData::V1(_) => md.clone(), + MetaData::V2(metadata) => MetaData::V1(MetaDataV1 { + seq_number: metadata.seq_number, + attnets: metadata.attnets.clone(), + }), + } + } + + /// Returns a V2 MetaData response from self by filling unavailable fields with default. + pub fn metadata_v2(&self) -> Self { + match self { + MetaData::V1(metadata) => MetaData::V2(MetaDataV2 { + seq_number: metadata.seq_number, + attnets: metadata.attnets.clone(), + syncnets: Default::default(), + }), + md @ MetaData::V2(_) => md.clone(), + } + } + + pub fn as_ssz_bytes(&self) -> Vec { + match self { + MetaData::V1(md) => md.as_ssz_bytes(), + MetaData::V2(md) => md.as_ssz_bytes(), + } + } +} + /// The reason given for a `Goodbye` message. /// /// Note: any unknown `u64::into(n)` will resolve to `Goodbye::Unknown` for any unknown `n`, @@ -197,7 +254,11 @@ impl ssz::Decode for GoodbyeReason { } /// Request a number of beacon block roots from a peer. -#[derive(Encode, Decode, Clone, Debug, PartialEq)] +#[superstruct( + variants(V1, V2), + variant_attributes(derive(Encode, Decode, Clone, Debug, PartialEq)) +)] +#[derive(Clone, Debug, PartialEq)] pub struct BlocksByRangeRequest { /// The starting slot to request blocks. pub start_slot: u64, @@ -206,8 +267,23 @@ pub struct BlocksByRangeRequest { pub count: u64, } +impl BlocksByRangeRequest { + /// The default request is V2 + pub fn new(start_slot: u64, count: u64) -> Self { + Self::V2(BlocksByRangeRequestV2 { start_slot, count }) + } + + pub fn new_v1(start_slot: u64, count: u64) -> Self { + Self::V1(BlocksByRangeRequestV1 { start_slot, count }) + } +} + /// Request a number of beacon block roots from a peer. -#[derive(Encode, Decode, Clone, Debug, PartialEq)] +#[superstruct( + variants(V1, V2), + variant_attributes(derive(Encode, Decode, Clone, Debug, PartialEq)) +)] +#[derive(Clone, Debug, PartialEq)] pub struct OldBlocksByRangeRequest { /// The starting slot to request blocks. pub start_slot: u64, @@ -223,13 +299,43 @@ pub struct OldBlocksByRangeRequest { pub step: u64, } +impl OldBlocksByRangeRequest { + /// The default request is V2 + pub fn new(start_slot: u64, count: u64, step: u64) -> Self { + Self::V2(OldBlocksByRangeRequestV2 { + start_slot, + count, + step, + }) + } + + pub fn new_v1(start_slot: u64, count: u64, step: u64) -> Self { + Self::V1(OldBlocksByRangeRequestV1 { + start_slot, + count, + step, + }) + } +} + /// Request a number of beacon block bodies from a peer. +#[superstruct(variants(V1, V2), variant_attributes(derive(Clone, Debug, PartialEq)))] #[derive(Clone, Debug, PartialEq)] pub struct BlocksByRootRequest { /// The list of beacon block bodies being requested. pub block_roots: VariableList, } +impl BlocksByRootRequest { + pub fn new(block_roots: VariableList) -> Self { + Self::V2(BlocksByRootRequestV2 { block_roots }) + } + + pub fn new_v1(block_roots: VariableList) -> Self { + Self::V1(BlocksByRootRequestV1 { block_roots }) + } +} + /* RPC Handling and Grouping */ // Collection of enums and structs used by the Codecs to encode/decode RPC messages @@ -438,7 +544,12 @@ impl std::fmt::Display for GoodbyeReason { impl std::fmt::Display for BlocksByRangeRequest { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "Start Slot: {}, Count: {}", self.start_slot, self.count) + write!( + f, + "Start Slot: {}, Count: {}", + self.start_slot(), + self.count() + ) } } @@ -447,7 +558,9 @@ impl std::fmt::Display for OldBlocksByRangeRequest { write!( f, "Start Slot: {}, Count: {}, Step: {}", - self.start_slot, self.count, self.step + self.start_slot(), + self.count(), + self.step() ) } } diff --git a/beacon_node/lighthouse_network/src/rpc/mod.rs b/beacon_node/lighthouse_network/src/rpc/mod.rs index 4f7af95cf..ffdc193bb 100644 --- a/beacon_node/lighthouse_network/src/rpc/mod.rs +++ b/beacon_node/lighthouse_network/src/rpc/mod.rs @@ -247,7 +247,7 @@ where } Err(RateLimitedErr::TooLarge) => { // we set the batch sizes, so this is a coding/config err for most protocols - let protocol = req.protocol(); + let protocol = req.versioned_protocol().protocol(); if matches!(protocol, Protocol::BlocksByRange) { debug!(self.log, "Blocks by range request will never be processed"; "request" => %req); } else { @@ -335,7 +335,7 @@ where serializer.emit_arguments("peer_id", &format_args!("{}", self.peer_id))?; let (msg_kind, protocol) = match &self.event { Ok(received) => match received { - RPCReceived::Request(_, req) => ("request", req.protocol()), + RPCReceived::Request(_, req) => ("request", req.versioned_protocol().protocol()), RPCReceived::Response(_, res) => ("response", res.protocol()), RPCReceived::EndOfStream(_, end) => ( "end_of_stream", diff --git a/beacon_node/lighthouse_network/src/rpc/outbound.rs b/beacon_node/lighthouse_network/src/rpc/outbound.rs index 774303800..d12f36686 100644 --- a/beacon_node/lighthouse_network/src/rpc/outbound.rs +++ b/beacon_node/lighthouse_network/src/rpc/outbound.rs @@ -1,11 +1,8 @@ -use std::marker::PhantomData; - use super::methods::*; -use super::protocol::Protocol; use super::protocol::ProtocolId; +use super::protocol::SupportedProtocol; use super::RPCError; use crate::rpc::protocol::Encoding; -use crate::rpc::protocol::Version; use crate::rpc::{ codec::{base::BaseOutboundCodec, ssz_snappy::SSZSnappyOutboundCodec, OutboundCodec}, methods::ResponseTermination, @@ -38,9 +35,8 @@ pub enum OutboundRequest { Goodbye(GoodbyeReason), BlocksByRange(OldBlocksByRangeRequest), BlocksByRoot(BlocksByRootRequest), - LightClientBootstrap(LightClientBootstrapRequest), Ping(Ping), - MetaData(PhantomData), + MetaData(MetadataRequest), } impl UpgradeInfo for OutboundRequestContainer { @@ -59,36 +55,29 @@ impl OutboundRequest { match self { // add more protocols when versions/encodings are supported OutboundRequest::Status(_) => vec![ProtocolId::new( - Protocol::Status, - Version::V1, + SupportedProtocol::StatusV1, Encoding::SSZSnappy, )], OutboundRequest::Goodbye(_) => vec![ProtocolId::new( - Protocol::Goodbye, - Version::V1, + SupportedProtocol::GoodbyeV1, Encoding::SSZSnappy, )], OutboundRequest::BlocksByRange(_) => vec![ - ProtocolId::new(Protocol::BlocksByRange, Version::V2, Encoding::SSZSnappy), - ProtocolId::new(Protocol::BlocksByRange, Version::V1, Encoding::SSZSnappy), + ProtocolId::new(SupportedProtocol::BlocksByRangeV2, Encoding::SSZSnappy), + ProtocolId::new(SupportedProtocol::BlocksByRangeV1, Encoding::SSZSnappy), ], OutboundRequest::BlocksByRoot(_) => vec![ - ProtocolId::new(Protocol::BlocksByRoot, Version::V2, Encoding::SSZSnappy), - ProtocolId::new(Protocol::BlocksByRoot, Version::V1, Encoding::SSZSnappy), + ProtocolId::new(SupportedProtocol::BlocksByRootV2, Encoding::SSZSnappy), + ProtocolId::new(SupportedProtocol::BlocksByRootV1, Encoding::SSZSnappy), ], OutboundRequest::Ping(_) => vec![ProtocolId::new( - Protocol::Ping, - Version::V1, + SupportedProtocol::PingV1, Encoding::SSZSnappy, )], OutboundRequest::MetaData(_) => vec![ - ProtocolId::new(Protocol::MetaData, Version::V2, Encoding::SSZSnappy), - ProtocolId::new(Protocol::MetaData, Version::V1, Encoding::SSZSnappy), + ProtocolId::new(SupportedProtocol::MetaDataV2, Encoding::SSZSnappy), + ProtocolId::new(SupportedProtocol::MetaDataV1, Encoding::SSZSnappy), ], - // Note: This match arm is technically unreachable as we only respond to light client requests - // that we generate from the beacon state. - // We do not make light client rpc requests from the beacon node - OutboundRequest::LightClientBootstrap(_) => vec![], } } /* These functions are used in the handler for stream management */ @@ -98,24 +87,31 @@ impl OutboundRequest { match self { OutboundRequest::Status(_) => 1, OutboundRequest::Goodbye(_) => 0, - OutboundRequest::BlocksByRange(req) => req.count, - OutboundRequest::BlocksByRoot(req) => req.block_roots.len() as u64, + OutboundRequest::BlocksByRange(req) => *req.count(), + OutboundRequest::BlocksByRoot(req) => req.block_roots().len() as u64, OutboundRequest::Ping(_) => 1, OutboundRequest::MetaData(_) => 1, - OutboundRequest::LightClientBootstrap(_) => 1, } } - /// Gives the corresponding `Protocol` to this request. - pub fn protocol(&self) -> Protocol { + /// Gives the corresponding `SupportedProtocol` to this request. + pub fn versioned_protocol(&self) -> SupportedProtocol { match self { - OutboundRequest::Status(_) => Protocol::Status, - OutboundRequest::Goodbye(_) => Protocol::Goodbye, - OutboundRequest::BlocksByRange(_) => Protocol::BlocksByRange, - OutboundRequest::BlocksByRoot(_) => Protocol::BlocksByRoot, - OutboundRequest::Ping(_) => Protocol::Ping, - OutboundRequest::MetaData(_) => Protocol::MetaData, - OutboundRequest::LightClientBootstrap(_) => Protocol::LightClientBootstrap, + OutboundRequest::Status(_) => SupportedProtocol::StatusV1, + OutboundRequest::Goodbye(_) => SupportedProtocol::GoodbyeV1, + OutboundRequest::BlocksByRange(req) => match req { + OldBlocksByRangeRequest::V1(_) => SupportedProtocol::BlocksByRangeV1, + OldBlocksByRangeRequest::V2(_) => SupportedProtocol::BlocksByRangeV2, + }, + OutboundRequest::BlocksByRoot(req) => match req { + BlocksByRootRequest::V1(_) => SupportedProtocol::BlocksByRootV1, + BlocksByRootRequest::V2(_) => SupportedProtocol::BlocksByRootV2, + }, + OutboundRequest::Ping(_) => SupportedProtocol::PingV1, + OutboundRequest::MetaData(req) => match req { + MetadataRequest::V1(_) => SupportedProtocol::MetaDataV1, + MetadataRequest::V2(_) => SupportedProtocol::MetaDataV2, + }, } } @@ -127,7 +123,6 @@ impl OutboundRequest { // variants that have `multiple_responses()` can have values. OutboundRequest::BlocksByRange(_) => ResponseTermination::BlocksByRange, OutboundRequest::BlocksByRoot(_) => ResponseTermination::BlocksByRoot, - OutboundRequest::LightClientBootstrap(_) => unreachable!(), OutboundRequest::Status(_) => unreachable!(), OutboundRequest::Goodbye(_) => unreachable!(), OutboundRequest::Ping(_) => unreachable!(), @@ -185,9 +180,6 @@ impl std::fmt::Display for OutboundRequest { OutboundRequest::BlocksByRoot(req) => write!(f, "Blocks by root: {:?}", req), OutboundRequest::Ping(ping) => write!(f, "Ping: {}", ping.data), OutboundRequest::MetaData(_) => write!(f, "MetaData request"), - OutboundRequest::LightClientBootstrap(bootstrap) => { - write!(f, "Lightclient Bootstrap: {}", bootstrap.root) - } } } } diff --git a/beacon_node/lighthouse_network/src/rpc/protocol.rs b/beacon_node/lighthouse_network/src/rpc/protocol.rs index a8423e47b..ea39c1423 100644 --- a/beacon_node/lighthouse_network/src/rpc/protocol.rs +++ b/beacon_node/lighthouse_network/src/rpc/protocol.rs @@ -179,21 +179,74 @@ pub enum Protocol { LightClientBootstrap, } -/// RPC Versions -#[derive(Debug, Clone, PartialEq, Eq)] -pub enum Version { - /// Version 1 of RPC - V1, - /// Version 2 of RPC - V2, -} - /// RPC Encondings supported. #[derive(Debug, Clone, PartialEq, Eq)] pub enum Encoding { SSZSnappy, } +/// All valid protocol name and version combinations. +#[derive(Debug, Clone, Copy, PartialEq)] +pub enum SupportedProtocol { + StatusV1, + GoodbyeV1, + BlocksByRangeV1, + BlocksByRangeV2, + BlocksByRootV1, + BlocksByRootV2, + PingV1, + MetaDataV1, + MetaDataV2, + LightClientBootstrapV1, +} + +impl SupportedProtocol { + pub fn version_string(&self) -> &'static str { + match self { + SupportedProtocol::StatusV1 => "1", + SupportedProtocol::GoodbyeV1 => "1", + SupportedProtocol::BlocksByRangeV1 => "1", + SupportedProtocol::BlocksByRangeV2 => "2", + SupportedProtocol::BlocksByRootV1 => "1", + SupportedProtocol::BlocksByRootV2 => "2", + SupportedProtocol::PingV1 => "1", + SupportedProtocol::MetaDataV1 => "1", + SupportedProtocol::MetaDataV2 => "2", + SupportedProtocol::LightClientBootstrapV1 => "1", + } + } + + pub fn protocol(&self) -> Protocol { + match self { + SupportedProtocol::StatusV1 => Protocol::Status, + SupportedProtocol::GoodbyeV1 => Protocol::Goodbye, + SupportedProtocol::BlocksByRangeV1 => Protocol::BlocksByRange, + SupportedProtocol::BlocksByRangeV2 => Protocol::BlocksByRange, + SupportedProtocol::BlocksByRootV1 => Protocol::BlocksByRoot, + SupportedProtocol::BlocksByRootV2 => Protocol::BlocksByRoot, + SupportedProtocol::PingV1 => Protocol::Ping, + SupportedProtocol::MetaDataV1 => Protocol::MetaData, + SupportedProtocol::MetaDataV2 => Protocol::MetaData, + SupportedProtocol::LightClientBootstrapV1 => Protocol::LightClientBootstrap, + } + } + + fn currently_supported() -> Vec { + vec![ + ProtocolId::new(Self::StatusV1, Encoding::SSZSnappy), + ProtocolId::new(Self::GoodbyeV1, Encoding::SSZSnappy), + // V2 variants have higher preference then V1 + ProtocolId::new(Self::BlocksByRangeV2, Encoding::SSZSnappy), + ProtocolId::new(Self::BlocksByRangeV1, Encoding::SSZSnappy), + ProtocolId::new(Self::BlocksByRootV2, Encoding::SSZSnappy), + ProtocolId::new(Self::BlocksByRootV1, Encoding::SSZSnappy), + ProtocolId::new(Self::PingV1, Encoding::SSZSnappy), + ProtocolId::new(Self::MetaDataV2, Encoding::SSZSnappy), + ProtocolId::new(Self::MetaDataV1, Encoding::SSZSnappy), + ] + } +} + impl std::fmt::Display for Encoding { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { let repr = match self { @@ -203,16 +256,6 @@ impl std::fmt::Display for Encoding { } } -impl std::fmt::Display for Version { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - let repr = match self { - Version::V1 => "1", - Version::V2 => "2", - }; - f.write_str(repr) - } -} - #[derive(Debug, Clone)] pub struct RPCProtocol { pub fork_context: Arc, @@ -227,22 +270,10 @@ impl UpgradeInfo for RPCProtocol { /// The list of supported RPC protocols for Lighthouse. fn protocol_info(&self) -> Self::InfoIter { - let mut supported_protocols = vec![ - ProtocolId::new(Protocol::Status, Version::V1, Encoding::SSZSnappy), - ProtocolId::new(Protocol::Goodbye, Version::V1, Encoding::SSZSnappy), - // V2 variants have higher preference then V1 - ProtocolId::new(Protocol::BlocksByRange, Version::V2, Encoding::SSZSnappy), - ProtocolId::new(Protocol::BlocksByRange, Version::V1, Encoding::SSZSnappy), - ProtocolId::new(Protocol::BlocksByRoot, Version::V2, Encoding::SSZSnappy), - ProtocolId::new(Protocol::BlocksByRoot, Version::V1, Encoding::SSZSnappy), - ProtocolId::new(Protocol::Ping, Version::V1, Encoding::SSZSnappy), - ProtocolId::new(Protocol::MetaData, Version::V2, Encoding::SSZSnappy), - ProtocolId::new(Protocol::MetaData, Version::V1, Encoding::SSZSnappy), - ]; + let mut supported_protocols = SupportedProtocol::currently_supported(); if self.enable_light_client_server { supported_protocols.push(ProtocolId::new( - Protocol::LightClientBootstrap, - Version::V1, + SupportedProtocol::LightClientBootstrapV1, Encoding::SSZSnappy, )); } @@ -272,11 +303,8 @@ impl RpcLimits { /// Tracks the types in a protocol id. #[derive(Clone, Debug)] pub struct ProtocolId { - /// The RPC message type/name. - pub message_name: Protocol, - - /// The version of the RPC. - pub version: Version, + /// The protocol name and version + pub versioned_protocol: SupportedProtocol, /// The encoding of the RPC. pub encoding: Encoding, @@ -288,7 +316,7 @@ pub struct ProtocolId { impl ProtocolId { /// Returns min and max size for messages of given protocol id requests. pub fn rpc_request_limits(&self) -> RpcLimits { - match self.message_name { + match self.versioned_protocol.protocol() { Protocol::Status => RpcLimits::new( ::ssz_fixed_len(), ::ssz_fixed_len(), @@ -297,9 +325,10 @@ impl ProtocolId { ::ssz_fixed_len(), ::ssz_fixed_len(), ), + // V1 and V2 requests are the same Protocol::BlocksByRange => RpcLimits::new( - ::ssz_fixed_len(), - ::ssz_fixed_len(), + ::ssz_fixed_len(), + ::ssz_fixed_len(), ), Protocol::BlocksByRoot => { RpcLimits::new(*BLOCKS_BY_ROOT_REQUEST_MIN, *BLOCKS_BY_ROOT_REQUEST_MAX) @@ -318,7 +347,7 @@ impl ProtocolId { /// Returns min and max size for messages of given protocol id responses. pub fn rpc_response_limits(&self, fork_context: &ForkContext) -> RpcLimits { - match self.message_name { + match self.versioned_protocol.protocol() { Protocol::Status => RpcLimits::new( ::ssz_fixed_len(), ::ssz_fixed_len(), @@ -344,30 +373,34 @@ impl ProtocolId { /// Returns `true` if the given `ProtocolId` should expect `context_bytes` in the /// beginning of the stream, else returns `false`. pub fn has_context_bytes(&self) -> bool { - match self.message_name { - Protocol::BlocksByRange | Protocol::BlocksByRoot => match self.version { - Version::V2 => true, - Version::V1 => false, - }, - Protocol::LightClientBootstrap => match self.version { - Version::V2 | Version::V1 => true, - }, - Protocol::Goodbye | Protocol::Ping | Protocol::Status | Protocol::MetaData => false, + match self.versioned_protocol { + SupportedProtocol::BlocksByRangeV2 + | SupportedProtocol::BlocksByRootV2 + | SupportedProtocol::LightClientBootstrapV1 => true, + SupportedProtocol::StatusV1 + | SupportedProtocol::BlocksByRootV1 + | SupportedProtocol::BlocksByRangeV1 + | SupportedProtocol::PingV1 + | SupportedProtocol::MetaDataV1 + | SupportedProtocol::MetaDataV2 + | SupportedProtocol::GoodbyeV1 => false, } } } /// An RPC protocol ID. impl ProtocolId { - pub fn new(message_name: Protocol, version: Version, encoding: Encoding) -> Self { + pub fn new(versioned_protocol: SupportedProtocol, encoding: Encoding) -> Self { let protocol_id = format!( "{}/{}/{}/{}", - PROTOCOL_PREFIX, message_name, version, encoding + PROTOCOL_PREFIX, + versioned_protocol.protocol(), + versioned_protocol.version_string(), + encoding ); ProtocolId { - message_name, - version, + versioned_protocol, encoding, protocol_id, } @@ -400,7 +433,7 @@ where fn upgrade_inbound(self, socket: TSocket, protocol: ProtocolId) -> Self::Future { async move { - let protocol_name = protocol.message_name; + let versioned_protocol = protocol.versioned_protocol; // convert the socket to tokio compatible socket let socket = socket.compat(); let codec = match protocol.encoding { @@ -419,8 +452,13 @@ where let socket = Framed::new(Box::pin(timed_socket), codec); // MetaData requests should be empty, return the stream - match protocol_name { - Protocol::MetaData => Ok((InboundRequest::MetaData(PhantomData), socket)), + match versioned_protocol { + SupportedProtocol::MetaDataV1 => { + Ok((InboundRequest::MetaData(MetadataRequest::new_v1()), socket)) + } + SupportedProtocol::MetaDataV2 => { + Ok((InboundRequest::MetaData(MetadataRequest::new_v2()), socket)) + } _ => { match tokio::time::timeout( Duration::from_secs(REQUEST_TIMEOUT), @@ -448,7 +486,7 @@ pub enum InboundRequest { BlocksByRoot(BlocksByRootRequest), LightClientBootstrap(LightClientBootstrapRequest), Ping(Ping), - MetaData(PhantomData), + MetaData(MetadataRequest), } /// Implements the encoding per supported protocol for `RPCRequest`. @@ -460,24 +498,33 @@ impl InboundRequest { match self { InboundRequest::Status(_) => 1, InboundRequest::Goodbye(_) => 0, - InboundRequest::BlocksByRange(req) => req.count, - InboundRequest::BlocksByRoot(req) => req.block_roots.len() as u64, + InboundRequest::BlocksByRange(req) => *req.count(), + InboundRequest::BlocksByRoot(req) => req.block_roots().len() as u64, InboundRequest::Ping(_) => 1, InboundRequest::MetaData(_) => 1, InboundRequest::LightClientBootstrap(_) => 1, } } - /// Gives the corresponding `Protocol` to this request. - pub fn protocol(&self) -> Protocol { + /// Gives the corresponding `SupportedProtocol` to this request. + pub fn versioned_protocol(&self) -> SupportedProtocol { match self { - InboundRequest::Status(_) => Protocol::Status, - InboundRequest::Goodbye(_) => Protocol::Goodbye, - InboundRequest::BlocksByRange(_) => Protocol::BlocksByRange, - InboundRequest::BlocksByRoot(_) => Protocol::BlocksByRoot, - InboundRequest::Ping(_) => Protocol::Ping, - InboundRequest::MetaData(_) => Protocol::MetaData, - InboundRequest::LightClientBootstrap(_) => Protocol::LightClientBootstrap, + InboundRequest::Status(_) => SupportedProtocol::StatusV1, + InboundRequest::Goodbye(_) => SupportedProtocol::GoodbyeV1, + InboundRequest::BlocksByRange(req) => match req { + OldBlocksByRangeRequest::V1(_) => SupportedProtocol::BlocksByRangeV1, + OldBlocksByRangeRequest::V2(_) => SupportedProtocol::BlocksByRangeV2, + }, + InboundRequest::BlocksByRoot(req) => match req { + BlocksByRootRequest::V1(_) => SupportedProtocol::BlocksByRootV1, + BlocksByRootRequest::V2(_) => SupportedProtocol::BlocksByRootV2, + }, + InboundRequest::Ping(_) => SupportedProtocol::PingV1, + InboundRequest::MetaData(req) => match req { + MetadataRequest::V1(_) => SupportedProtocol::MetaDataV1, + MetadataRequest::V2(_) => SupportedProtocol::MetaDataV2, + }, + InboundRequest::LightClientBootstrap(_) => SupportedProtocol::LightClientBootstrapV1, } } diff --git a/beacon_node/lighthouse_network/src/rpc/rate_limiter.rs b/beacon_node/lighthouse_network/src/rpc/rate_limiter.rs index 1fdc6cce3..e1634d711 100644 --- a/beacon_node/lighthouse_network/src/rpc/rate_limiter.rs +++ b/beacon_node/lighthouse_network/src/rpc/rate_limiter.rs @@ -192,7 +192,7 @@ pub trait RateLimiterItem { impl RateLimiterItem for super::InboundRequest { fn protocol(&self) -> Protocol { - self.protocol() + self.versioned_protocol().protocol() } fn expected_responses(&self) -> u64 { @@ -202,7 +202,7 @@ impl RateLimiterItem for super::InboundRequest { impl RateLimiterItem for super::OutboundRequest { fn protocol(&self) -> Protocol { - self.protocol() + self.versioned_protocol().protocol() } fn expected_responses(&self) -> u64 { diff --git a/beacon_node/lighthouse_network/src/rpc/self_limiter.rs b/beacon_node/lighthouse_network/src/rpc/self_limiter.rs index 6748a1947..626917d6a 100644 --- a/beacon_node/lighthouse_network/src/rpc/self_limiter.rs +++ b/beacon_node/lighthouse_network/src/rpc/self_limiter.rs @@ -72,7 +72,7 @@ impl SelfRateLimiter { request_id: Id, req: OutboundRequest, ) -> Result, Error> { - let protocol = req.protocol(); + let protocol = req.versioned_protocol().protocol(); // First check that there are not already other requests waiting to be sent. if let Some(queued_requests) = self.delayed_requests.get_mut(&(peer_id, protocol)) { queued_requests.push_back(QueuedRequest { req, request_id }); @@ -111,7 +111,7 @@ impl SelfRateLimiter { event: RPCSend::Request(request_id, req), }), Err(e) => { - let protocol = req.protocol(); + let protocol = req.versioned_protocol(); match e { RateLimitedErr::TooLarge => { // this should never happen with default parameters. Let's just send the request. @@ -119,7 +119,7 @@ impl SelfRateLimiter { crit!( log, "Self rate limiting error for a batch that will never fit. Sending request anyway. Check configuration parameters."; - "protocol" => %req.protocol() + "protocol" => %req.versioned_protocol().protocol() ); Ok(BehaviourAction::NotifyHandler { peer_id, @@ -128,7 +128,7 @@ impl SelfRateLimiter { }) } RateLimitedErr::TooSoon(wait_time) => { - debug!(log, "Self rate limiting"; "protocol" => %protocol, "wait_time_ms" => wait_time.as_millis(), "peer_id" => %peer_id); + debug!(log, "Self rate limiting"; "protocol" => %protocol.protocol(), "wait_time_ms" => wait_time.as_millis(), "peer_id" => %peer_id); Err((QueuedRequest { req, request_id }, wait_time)) } } diff --git a/beacon_node/lighthouse_network/src/service/api_types.rs b/beacon_node/lighthouse_network/src/service/api_types.rs index bd3df7976..5ab89fee5 100644 --- a/beacon_node/lighthouse_network/src/service/api_types.rs +++ b/beacon_node/lighthouse_network/src/service/api_types.rs @@ -7,7 +7,8 @@ use types::{EthSpec, SignedBeaconBlock}; use crate::rpc::{ methods::{ BlocksByRangeRequest, BlocksByRootRequest, LightClientBootstrapRequest, - OldBlocksByRangeRequest, RPCCodedResponse, RPCResponse, ResponseTermination, StatusMessage, + OldBlocksByRangeRequest, OldBlocksByRangeRequestV1, OldBlocksByRangeRequestV2, + RPCCodedResponse, RPCResponse, ResponseTermination, StatusMessage, }, OutboundRequest, SubstreamId, }; @@ -43,14 +44,25 @@ impl std::convert::From for OutboundRequest { fn from(req: Request) -> OutboundRequest { match req { Request::BlocksByRoot(r) => OutboundRequest::BlocksByRoot(r), - Request::BlocksByRange(BlocksByRangeRequest { start_slot, count }) => { - OutboundRequest::BlocksByRange(OldBlocksByRangeRequest { - start_slot, - count, - step: 1, - }) + Request::BlocksByRange(r) => match r { + BlocksByRangeRequest::V1(req) => OutboundRequest::BlocksByRange( + OldBlocksByRangeRequest::V1(OldBlocksByRangeRequestV1 { + start_slot: req.start_slot, + count: req.count, + step: 1, + }), + ), + BlocksByRangeRequest::V2(req) => OutboundRequest::BlocksByRange( + OldBlocksByRangeRequest::V2(OldBlocksByRangeRequestV2 { + start_slot: req.start_slot, + count: req.count, + step: 1, + }), + ), + }, + Request::LightClientBootstrap(_) => { + unreachable!("Lighthouse never makes an outbound light client request") } - Request::LightClientBootstrap(b) => OutboundRequest::LightClientBootstrap(b), Request::Status(s) => OutboundRequest::Status(s), } } diff --git a/beacon_node/lighthouse_network/src/service/mod.rs b/beacon_node/lighthouse_network/src/service/mod.rs index 34d5a5631..129a4da25 100644 --- a/beacon_node/lighthouse_network/src/service/mod.rs +++ b/beacon_node/lighthouse_network/src/service/mod.rs @@ -9,6 +9,7 @@ use crate::peer_manager::{ ConnectionDirection, PeerManager, PeerManagerEvent, }; use crate::peer_manager::{MIN_OUTBOUND_ONLY_FACTOR, PEER_EXCESS_FACTOR, PRIORITY_PEER_EXCESS}; +use crate::rpc::methods::MetadataRequest; use crate::rpc::*; use crate::service::behaviour::BehaviourEvent; pub use crate::service::behaviour::Gossipsub; @@ -37,7 +38,6 @@ use slog::{crit, debug, info, o, trace, warn}; use std::path::PathBuf; use std::pin::Pin; use std::{ - marker::PhantomData, sync::Arc, task::{Context, Poll}, }; @@ -944,16 +944,25 @@ impl Network { /// Sends a METADATA request to a peer. fn send_meta_data_request(&mut self, peer_id: PeerId) { - let event = OutboundRequest::MetaData(PhantomData); + // We always prefer sending V2 requests + let event = OutboundRequest::MetaData(MetadataRequest::new_v2()); self.eth2_rpc_mut() .send_request(peer_id, RequestId::Internal, event); } /// Sends a METADATA response to a peer. - fn send_meta_data_response(&mut self, id: PeerRequestId, peer_id: PeerId) { - let event = RPCCodedResponse::Success(RPCResponse::MetaData( - self.network_globals.local_metadata.read().clone(), - )); + fn send_meta_data_response( + &mut self, + req: MetadataRequest, + id: PeerRequestId, + peer_id: PeerId, + ) { + let metadata = self.network_globals.local_metadata.read().clone(); + let metadata = match req { + MetadataRequest::V1(_) => metadata.metadata_v1(), + MetadataRequest::V2(_) => metadata, + }; + let event = RPCCodedResponse::Success(RPCResponse::MetaData(metadata)); self.eth2_rpc_mut().send_response(peer_id, id, event); } @@ -1196,9 +1205,9 @@ impl Network { self.pong(peer_request_id, peer_id); None } - InboundRequest::MetaData(_) => { + InboundRequest::MetaData(req) => { // send the requested meta-data - self.send_meta_data_response((handler_id, id), peer_id); + self.send_meta_data_response(req, (handler_id, id), peer_id); None } InboundRequest::Goodbye(reason) => { @@ -1225,13 +1234,9 @@ impl Network { Some(event) } InboundRequest::BlocksByRange(req) => { - let methods::OldBlocksByRangeRequest { - start_slot, - mut count, - step, - } = req; // Still disconnect the peer if the request is naughty. - if step == 0 { + let mut count = *req.count(); + if *req.step() == 0 { self.peer_manager_mut().handle_rpc_error( &peer_id, Protocol::BlocksByRange, @@ -1243,14 +1248,18 @@ impl Network { return None; } // return just one block in case the step parameter is used. https://github.com/ethereum/consensus-specs/pull/2856 - if step > 1 { + if *req.step() > 1 { count = 1; } - let event = self.build_request( - peer_request_id, - peer_id, - Request::BlocksByRange(BlocksByRangeRequest { start_slot, count }), - ); + let request = match req { + methods::OldBlocksByRangeRequest::V1(req) => Request::BlocksByRange( + BlocksByRangeRequest::new_v1(req.start_slot, count), + ), + methods::OldBlocksByRangeRequest::V2(req) => Request::BlocksByRange( + BlocksByRangeRequest::new(req.start_slot, count), + ), + }; + let event = self.build_request(peer_request_id, peer_id, request); Some(event) } InboundRequest::BlocksByRoot(req) => { diff --git a/beacon_node/lighthouse_network/src/service/utils.rs b/beacon_node/lighthouse_network/src/service/utils.rs index 625df65ee..ac0dc57d7 100644 --- a/beacon_node/lighthouse_network/src/service/utils.rs +++ b/beacon_node/lighthouse_network/src/service/utils.rs @@ -272,9 +272,11 @@ pub(crate) fn save_metadata_to_disk( log: &slog::Logger, ) { let _ = std::fs::create_dir_all(dir); - match File::create(dir.join(METADATA_FILENAME)) - .and_then(|mut f| f.write_all(&metadata.as_ssz_bytes())) - { + let metadata_bytes = match metadata { + MetaData::V1(md) => md.as_ssz_bytes(), + MetaData::V2(md) => md.as_ssz_bytes(), + }; + match File::create(dir.join(METADATA_FILENAME)).and_then(|mut f| f.write_all(&metadata_bytes)) { Ok(_) => { debug!(log, "Metadata written to disk"); } diff --git a/beacon_node/lighthouse_network/tests/rpc_tests.rs b/beacon_node/lighthouse_network/tests/rpc_tests.rs index ebdbb6742..656df0c4a 100644 --- a/beacon_node/lighthouse_network/tests/rpc_tests.rs +++ b/beacon_node/lighthouse_network/tests/rpc_tests.rs @@ -155,10 +155,7 @@ fn test_blocks_by_range_chunked_rpc() { common::build_node_pair(Arc::downgrade(&rt), &log, ForkName::Merge).await; // BlocksByRange Request - let rpc_request = Request::BlocksByRange(BlocksByRangeRequest { - start_slot: 0, - count: messages_to_send, - }); + let rpc_request = Request::BlocksByRange(BlocksByRangeRequest::new(0, messages_to_send)); let spec = E::default_spec(); @@ -282,10 +279,7 @@ fn test_blocks_by_range_over_limit() { common::build_node_pair(Arc::downgrade(&rt), &log, ForkName::Merge).await; // BlocksByRange Request - let rpc_request = Request::BlocksByRange(BlocksByRangeRequest { - start_slot: 0, - count: messages_to_send, - }); + let rpc_request = Request::BlocksByRange(BlocksByRangeRequest::new(0, messages_to_send)); // BlocksByRange Response let full_block = merge_block_large(&common::fork_context(ForkName::Merge)); @@ -367,10 +361,7 @@ fn test_blocks_by_range_chunked_rpc_terminates_correctly() { common::build_node_pair(Arc::downgrade(&rt), &log, ForkName::Base).await; // BlocksByRange Request - let rpc_request = Request::BlocksByRange(BlocksByRangeRequest { - start_slot: 0, - count: messages_to_send, - }); + let rpc_request = Request::BlocksByRange(BlocksByRangeRequest::new(0, messages_to_send)); // BlocksByRange Response let spec = E::default_spec(); @@ -490,10 +481,7 @@ fn test_blocks_by_range_single_empty_rpc() { common::build_node_pair(Arc::downgrade(&rt), &log, ForkName::Base).await; // BlocksByRange Request - let rpc_request = Request::BlocksByRange(BlocksByRangeRequest { - start_slot: 0, - count: 10, - }); + let rpc_request = Request::BlocksByRange(BlocksByRangeRequest::new(0, 10)); // BlocksByRange Response let spec = E::default_spec(); @@ -594,16 +582,15 @@ fn test_blocks_by_root_chunked_rpc() { common::build_node_pair(Arc::downgrade(&rt), &log, ForkName::Merge).await; // BlocksByRoot Request - let rpc_request = Request::BlocksByRoot(BlocksByRootRequest { - block_roots: VariableList::from(vec![ + let rpc_request = + Request::BlocksByRoot(BlocksByRootRequest::new(VariableList::from(vec![ Hash256::from_low_u64_be(0), Hash256::from_low_u64_be(0), Hash256::from_low_u64_be(0), Hash256::from_low_u64_be(0), Hash256::from_low_u64_be(0), Hash256::from_low_u64_be(0), - ]), - }); + ]))); // BlocksByRoot Response let full_block = BeaconBlock::Base(BeaconBlockBase::::full(&spec)); @@ -722,8 +709,8 @@ fn test_blocks_by_root_chunked_rpc_terminates_correctly() { common::build_node_pair(Arc::downgrade(&rt), &log, ForkName::Base).await; // BlocksByRoot Request - let rpc_request = Request::BlocksByRoot(BlocksByRootRequest { - block_roots: VariableList::from(vec![ + let rpc_request = + Request::BlocksByRoot(BlocksByRootRequest::new(VariableList::from(vec![ Hash256::from_low_u64_be(0), Hash256::from_low_u64_be(0), Hash256::from_low_u64_be(0), @@ -734,8 +721,7 @@ fn test_blocks_by_root_chunked_rpc_terminates_correctly() { Hash256::from_low_u64_be(0), Hash256::from_low_u64_be(0), Hash256::from_low_u64_be(0), - ]), - }); + ]))); // BlocksByRoot Response let full_block = BeaconBlock::Base(BeaconBlockBase::::full(&spec)); diff --git a/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs b/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs index 81b163bf7..83baa0417 100644 --- a/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs +++ b/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs @@ -131,10 +131,10 @@ impl Worker { request_id: PeerRequestId, request: BlocksByRootRequest, ) { - let requested_blocks = request.block_roots.len(); + let requested_blocks = request.block_roots().len(); let mut block_stream = match self .chain - .get_blocks_checking_early_attester_cache(request.block_roots.into(), &executor) + .get_blocks_checking_early_attester_cache(request.block_roots().to_vec(), &executor) { Ok(block_stream) => block_stream, Err(e) => return error!(self.log, "Error getting block stream"; "error" => ?e), @@ -292,18 +292,18 @@ impl Worker { ) { debug!(self.log, "Received BlocksByRange Request"; "peer_id" => %peer_id, - "count" => req.count, - "start_slot" => req.start_slot, + "count" => req.count(), + "start_slot" => req.start_slot(), ); // Should not send more than max request blocks - if req.count > MAX_REQUEST_BLOCKS { - req.count = MAX_REQUEST_BLOCKS; + if *req.count() > MAX_REQUEST_BLOCKS { + *req.count_mut() = MAX_REQUEST_BLOCKS; } let forwards_block_root_iter = match self .chain - .forwards_iter_block_roots(Slot::from(req.start_slot)) + .forwards_iter_block_roots(Slot::from(*req.start_slot())) { Ok(iter) => iter, Err(BeaconChainError::HistoricalBlockError( @@ -326,18 +326,20 @@ impl Worker { // Pick out the required blocks, ignoring skip-slots. let mut last_block_root = None; let maybe_block_roots = process_results(forwards_block_root_iter, |iter| { - iter.take_while(|(_, slot)| slot.as_u64() < req.start_slot.saturating_add(req.count)) - // map skip slots to None - .map(|(root, _)| { - let result = if Some(root) == last_block_root { - None - } else { - Some(root) - }; - last_block_root = Some(root); - result - }) - .collect::>>() + iter.take_while(|(_, slot)| { + slot.as_u64() < req.start_slot().saturating_add(*req.count()) + }) + // map skip slots to None + .map(|(root, _)| { + let result = if Some(root) == last_block_root { + None + } else { + Some(root) + }; + last_block_root = Some(root); + result + }) + .collect::>>() }); let block_roots = match maybe_block_roots { @@ -364,8 +366,8 @@ impl Worker { Ok(Some(block)) => { // Due to skip slots, blocks could be out of the range, we ensure they // are in the range before sending - if block.slot() >= req.start_slot - && block.slot() < req.start_slot + req.count + if block.slot() >= *req.start_slot() + && block.slot() < req.start_slot() + req.count() { blocks_sent += 1; self.send_network_message(NetworkMessage::SendResponse { @@ -440,15 +442,15 @@ impl Worker { .slot() .unwrap_or_else(|_| self.chain.slot_clock.genesis_slot()); - if blocks_sent < (req.count as usize) { + if blocks_sent < (*req.count() as usize) { debug!( self.log, "BlocksByRange outgoing response processed"; "peer" => %peer_id, "msg" => "Failed to return all requested blocks", - "start_slot" => req.start_slot, + "start_slot" => req.start_slot(), "current_slot" => current_slot, - "requested" => req.count, + "requested" => req.count(), "returned" => blocks_sent ); } else { @@ -456,9 +458,9 @@ impl Worker { self.log, "BlocksByRange outgoing response processed"; "peer" => %peer_id, - "start_slot" => req.start_slot, + "start_slot" => req.start_slot(), "current_slot" => current_slot, - "requested" => req.count, + "requested" => req.count(), "returned" => blocks_sent ); } diff --git a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs index 256a2b429..62ca68e7b 100644 --- a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs +++ b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs @@ -156,9 +156,7 @@ impl SingleBlockRequest { cannot_process: self.failed_processing >= self.failed_downloading, }) } else if let Some(&peer_id) = self.available_peers.iter().choose(&mut rand::thread_rng()) { - let request = BlocksByRootRequest { - block_roots: VariableList::from(vec![self.hash]), - }; + let request = BlocksByRootRequest::new(VariableList::from(vec![self.hash])); self.state = State::Downloading { peer_id }; self.used_peers.insert(peer_id); Ok((peer_id, request)) diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index c81fed244..23d42002f 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -112,7 +112,7 @@ impl SyncNetworkContext { self.log, "Sending BlocksByRange Request"; "method" => "BlocksByRange", - "count" => request.count, + "count" => request.count(), "peer" => %peer_id, ); let request = Request::BlocksByRange(request); @@ -138,7 +138,7 @@ impl SyncNetworkContext { self.log, "Sending backfill BlocksByRange Request"; "method" => "BlocksByRange", - "count" => request.count, + "count" => request.count(), "peer" => %peer_id, ); let request = Request::BlocksByRange(request); @@ -185,7 +185,7 @@ impl SyncNetworkContext { self.log, "Sending BlocksByRoot Request"; "method" => "BlocksByRoot", - "count" => request.block_roots.len(), + "count" => request.block_roots().len(), "peer" => %peer_id ); let request = Request::BlocksByRoot(request); @@ -209,7 +209,7 @@ impl SyncNetworkContext { self.log, "Sending BlocksByRoot Request"; "method" => "BlocksByRoot", - "count" => request.block_roots.len(), + "count" => request.block_roots().len(), "peer" => %peer_id ); let request = Request::BlocksByRoot(request); diff --git a/beacon_node/network/src/sync/range_sync/batch.rs b/beacon_node/network/src/sync/range_sync/batch.rs index 3eee7223d..723ea9b59 100644 --- a/beacon_node/network/src/sync/range_sync/batch.rs +++ b/beacon_node/network/src/sync/range_sync/batch.rs @@ -202,10 +202,10 @@ impl BatchInfo { /// Returns a BlocksByRange request associated with the batch. pub fn to_blocks_by_range_request(&self) -> BlocksByRangeRequest { - BlocksByRangeRequest { - start_slot: self.start_slot.into(), - count: self.end_slot.sub(self.start_slot).into(), - } + BlocksByRangeRequest::new( + self.start_slot.into(), + self.end_slot.sub(self.start_slot).into(), + ) } /// After different operations over a batch, this could be in a state that allows it to