diff --git a/beacon_node/eth2-libp2p/src/rpc/protocol.rs b/beacon_node/eth2-libp2p/src/rpc/protocol.rs index 82257cc32..d75e53e6c 100644 --- a/beacon_node/eth2-libp2p/src/rpc/protocol.rs +++ b/beacon_node/eth2-libp2p/src/rpc/protocol.rs @@ -1,6 +1,7 @@ use super::methods::*; use libp2p::core::{upgrade, InboundUpgrade, OutboundUpgrade, UpgradeInfo}; use ssz::{impl_decode_via_from, impl_encode_via_from, ssz_encode, Decode, Encode}; +use ssz_derive::{Decode, Encode}; use std::hash::{Hash, Hasher}; use std::io; use std::iter; @@ -31,7 +32,7 @@ impl Default for RPCProtocol { } /// A monotonic counter for ordering `RPCRequest`s. -#[derive(Debug, Clone, Default)] +#[derive(Debug, Clone, Copy, Default)] pub struct RequestId(u64); impl RequestId { @@ -115,65 +116,67 @@ where } } +/// A helper structed used to obtain SSZ serialization for RPC messages. +#[derive(Encode, Decode, Default)] +struct SszContainer { + /// Note: the `is_request` field is not included in the spec. + /// + /// We are unable to determine a request from a response unless we add some flag to the + /// packet. Here we have added a bool (encoded as 1 byte) which is set to `1` if the + /// message is a request. + is_request: bool, + id: u64, + other: u16, + bytes: Vec, +} + // NOTE! // // This code has not been tested, it is a placeholder until we can update to the new libp2p // spec. fn decode(packet: Vec) -> Result { - let mut builder = ssz::SszDecoderBuilder::new(&packet); + let msg = SszContainer::from_ssz_bytes(&packet)?; - builder.register_type::()?; - builder.register_type::()?; - builder.register_type::()?; - builder.register_type::>()?; - - let mut decoder = builder.build()?; - - let request: bool = decoder.decode_next()?; - let id: RequestId = decoder.decode_next()?; - let method_id: u16 = decoder.decode_next()?; - let bytes: Vec = decoder.decode_next()?; - - if request { - let body = match RPCMethod::from(method_id) { - RPCMethod::Hello => RPCRequest::Hello(HelloMessage::from_ssz_bytes(&bytes)?), - RPCMethod::Goodbye => RPCRequest::Goodbye(GoodbyeReason::from_ssz_bytes(&bytes)?), + if msg.is_request { + let body = match RPCMethod::from(msg.other) { + RPCMethod::Hello => RPCRequest::Hello(HelloMessage::from_ssz_bytes(&msg.bytes)?), + RPCMethod::Goodbye => RPCRequest::Goodbye(GoodbyeReason::from_ssz_bytes(&msg.bytes)?), RPCMethod::BeaconBlockRoots => { - RPCRequest::BeaconBlockRoots(BeaconBlockRootsRequest::from_ssz_bytes(&bytes)?) - } - RPCMethod::BeaconBlockHeaders => { - RPCRequest::BeaconBlockHeaders(BeaconBlockHeadersRequest::from_ssz_bytes(&bytes)?) + RPCRequest::BeaconBlockRoots(BeaconBlockRootsRequest::from_ssz_bytes(&msg.bytes)?) } + RPCMethod::BeaconBlockHeaders => RPCRequest::BeaconBlockHeaders( + BeaconBlockHeadersRequest::from_ssz_bytes(&msg.bytes)?, + ), RPCMethod::BeaconBlockBodies => { - RPCRequest::BeaconBlockBodies(BeaconBlockBodiesRequest::from_ssz_bytes(&bytes)?) + RPCRequest::BeaconBlockBodies(BeaconBlockBodiesRequest::from_ssz_bytes(&msg.bytes)?) } RPCMethod::BeaconChainState => { - RPCRequest::BeaconChainState(BeaconChainStateRequest::from_ssz_bytes(&bytes)?) + RPCRequest::BeaconChainState(BeaconChainStateRequest::from_ssz_bytes(&msg.bytes)?) } RPCMethod::Unknown => return Err(DecodeError::UnknownRPCMethod), }; Ok(RPCEvent::Request { - id, - method_id, + id: RequestId::from(msg.id), + method_id: msg.other, body, }) } // we have received a response else { - let result = match RPCMethod::from(method_id) { - RPCMethod::Hello => RPCResponse::Hello(HelloMessage::from_ssz_bytes(&bytes)?), + let result = match RPCMethod::from(msg.other) { + RPCMethod::Hello => RPCResponse::Hello(HelloMessage::from_ssz_bytes(&msg.bytes)?), RPCMethod::BeaconBlockRoots => { - RPCResponse::BeaconBlockRoots(BeaconBlockRootsResponse::from_ssz_bytes(&bytes)?) - } - RPCMethod::BeaconBlockHeaders => { - RPCResponse::BeaconBlockHeaders(BeaconBlockHeadersResponse::from_ssz_bytes(&bytes)?) - } - RPCMethod::BeaconBlockBodies => { - RPCResponse::BeaconBlockBodies(BeaconBlockBodiesResponse::from_ssz_bytes(&packet)?) + RPCResponse::BeaconBlockRoots(BeaconBlockRootsResponse::from_ssz_bytes(&msg.bytes)?) } + RPCMethod::BeaconBlockHeaders => RPCResponse::BeaconBlockHeaders( + BeaconBlockHeadersResponse::from_ssz_bytes(&msg.bytes)?, + ), + RPCMethod::BeaconBlockBodies => RPCResponse::BeaconBlockBodies( + BeaconBlockBodiesResponse::from_ssz_bytes(&msg.bytes)?, + ), RPCMethod::BeaconChainState => { - RPCResponse::BeaconChainState(BeaconChainStateResponse::from_ssz_bytes(&packet)?) + RPCResponse::BeaconChainState(BeaconChainStateResponse::from_ssz_bytes(&msg.bytes)?) } // We should never receive a goodbye response; it is invalid. RPCMethod::Goodbye => return Err(DecodeError::UnknownRPCMethod), @@ -181,8 +184,8 @@ fn decode(packet: Vec) -> Result { }; Ok(RPCEvent::Response { - id, - method_id, + id: RequestId::from(msg.id), + method_id: msg.other, result, }) } @@ -208,80 +211,44 @@ impl Encode for RPCEvent { false } - // NOTE! - // - // This code has not been tested, it is a placeholder until we can update to the new libp2p - // spec. fn ssz_append(&self, buf: &mut Vec) { - let offset = ::ssz_fixed_len() - + ::ssz_fixed_len() - + as Encode>::ssz_fixed_len(); - - let mut encoder = ssz::SszEncoder::container(buf, offset); - - match self { + let container = match self { RPCEvent::Request { id, method_id, body, - } => { - encoder.append(&true); - encoder.append(id); - encoder.append(method_id); - - // Encode the `body` as a `Vec`. - match body { - RPCRequest::Hello(body) => { - encoder.append(&body.as_ssz_bytes()); - } - RPCRequest::Goodbye(body) => { - encoder.append(&body.as_ssz_bytes()); - } - RPCRequest::BeaconBlockRoots(body) => { - encoder.append(&body.as_ssz_bytes()); - } - RPCRequest::BeaconBlockHeaders(body) => { - encoder.append(&body.as_ssz_bytes()); - } - RPCRequest::BeaconBlockBodies(body) => { - encoder.append(&body.as_ssz_bytes()); - } - RPCRequest::BeaconChainState(body) => { - encoder.append(&body.as_ssz_bytes()); - } - } - } + } => SszContainer { + is_request: true, + id: (*id).into(), + other: (*method_id).into(), + bytes: match body { + RPCRequest::Hello(body) => body.as_ssz_bytes(), + RPCRequest::Goodbye(body) => body.as_ssz_bytes(), + RPCRequest::BeaconBlockRoots(body) => body.as_ssz_bytes(), + RPCRequest::BeaconBlockHeaders(body) => body.as_ssz_bytes(), + RPCRequest::BeaconBlockBodies(body) => body.as_ssz_bytes(), + RPCRequest::BeaconChainState(body) => body.as_ssz_bytes(), + }, + }, RPCEvent::Response { id, method_id, result, - } => { - encoder.append(&true); - encoder.append(id); - encoder.append(method_id); + } => SszContainer { + is_request: false, + id: (*id).into(), + other: (*method_id).into(), + bytes: match result { + RPCResponse::Hello(response) => response.as_ssz_bytes(), + RPCResponse::BeaconBlockRoots(response) => response.as_ssz_bytes(), + RPCResponse::BeaconBlockHeaders(response) => response.as_ssz_bytes(), + RPCResponse::BeaconBlockBodies(response) => response.as_ssz_bytes(), + RPCResponse::BeaconChainState(response) => response.as_ssz_bytes(), + }, + }, + }; - match result { - RPCResponse::Hello(response) => { - encoder.append(&response.as_ssz_bytes()); - } - RPCResponse::BeaconBlockRoots(response) => { - encoder.append(&response.as_ssz_bytes()); - } - RPCResponse::BeaconBlockHeaders(response) => { - encoder.append(&response.as_ssz_bytes()); - } - RPCResponse::BeaconBlockBodies(response) => { - encoder.append(&response.as_ssz_bytes()); - } - RPCResponse::BeaconChainState(response) => { - encoder.append(&response.as_ssz_bytes()); - } - } - } - } - - // Finalize the encoder, writing to `buf`. - encoder.finalize(); + container.ssz_append(buf) } }