From cda61c1577c0e937ae1a53684993bccb5dbbbbca Mon Sep 17 00:00:00 2001 From: Age Manning Date: Fri, 5 Jul 2019 18:59:53 +1000 Subject: [PATCH] Adds basic inbound/outbound upgrades for eth2 rpc --- beacon_node/eth2-libp2p/src/rpc/methods.rs | 132 +++-- beacon_node/eth2-libp2p/src/rpc/mod.rs | 2 +- beacon_node/eth2-libp2p/src/rpc/protocol.rs | 530 +++++++++++++------- 3 files changed, 403 insertions(+), 261 deletions(-) diff --git a/beacon_node/eth2-libp2p/src/rpc/methods.rs b/beacon_node/eth2-libp2p/src/rpc/methods.rs index 76f62f23a..dfc3121c1 100644 --- a/beacon_node/eth2-libp2p/src/rpc/methods.rs +++ b/beacon_node/eth2-libp2p/src/rpc/methods.rs @@ -2,39 +2,7 @@ use ssz::{impl_decode_via_from, impl_encode_via_from}; use ssz_derive::{Decode, Encode}; -use types::{BeaconBlockBody, BeaconBlockHeader, Epoch, Hash256, Slot}; - -#[derive(Debug)] -/// Available Serenity Libp2p RPC methods -pub enum RPCMethod { - /// Initialise handshake between connecting peers. - Hello, - /// Terminate a connection providing a reason. - Goodbye, - /// Requests a number of beacon block roots. - BeaconBlockRoots, - /// Requests a number of beacon block headers. - BeaconBlockHeaders, - /// Requests a number of beacon block bodies. - BeaconBlockBodies, - /// Requests values for a merkle proof for the current blocks state root. - BeaconChainState, // Note: experimental, not complete. - /// Unknown method received. - Unknown, -} - -pub enum RawRPCRequest - - -#[derive(Debug, Clone)] -pub enum RPCRequest { - Hello(HelloMessage), - Goodbye(GoodbyeReason), - BeaconBlockRoots(BeaconBlockRootsRequest), - BeaconBlockHeaders(BeaconBlockHeadersRequest), - BeaconBlockBodies(BeaconBlockBodiesRequest), - BeaconChainState(BeaconChainStateRequest), -} +use types::{Epoch, Hash256, Slot}; #[derive(Debug, Clone)] pub enum RPCResponse { @@ -45,19 +13,35 @@ pub enum RPCResponse { BeaconChainState(BeaconChainStateResponse), } +pub enum ResponseCode { + Success = 0, + EncodingError = 1, + InvalidRequest = 2, + ServerError = 3, +} + /* Request/Response data structures for RPC methods */ +/* Requests */ + /// The HELLO request/response handshake message. #[derive(Encode, Decode, Clone, Debug)] pub struct HelloMessage { /// The network ID of the peer. pub network_id: u8, + + /// The chain id for the HELLO request. + pub chain_id: u64, + /// The peers last finalized root. pub latest_finalized_root: Hash256, + /// The peers last finalized epoch. pub latest_finalized_epoch: Epoch, + /// The peers last block root. pub best_root: Hash256, + /// The peers last slot. pub best_slot: Slot, } @@ -68,43 +52,40 @@ pub struct HelloMessage { /// however `GoodbyeReason::Unknown.into()` will go into `0_u64`. Therefore de-serializing then /// re-serializing may not return the same bytes. #[derive(Debug, Clone)] -pub enum GoodbyeReason { - ClientShutdown, - IrreleventNetwork, - Fault, - Unknown, +pub enum Goodbye { + /// This node has shutdown. + ClientShutdown = 1, + + /// Incompatible networks. + IrreleventNetwork = 2, + + /// Error/fault in the RPC. + Fault = 3, + + /// Unknown reason. + Unknown = 0, } -impl From for GoodbyeReason { - fn from(id: u64) -> GoodbyeReason { +impl From for Goodbye { + fn from(id: u64) -> Goodbye { match id { - 1 => GoodbyeReason::ClientShutdown, - 2 => GoodbyeReason::IrreleventNetwork, - 3 => GoodbyeReason::Fault, - _ => GoodbyeReason::Unknown, + 1 => Goodbye::ClientShutdown, + 2 => Goodbye::IrreleventNetwork, + 3 => Goodbye::Fault, + _ => Goodbye::Unknown, } } } -impl Into for GoodbyeReason { - fn into(self) -> u64 { - match self { - GoodbyeReason::Unknown => 0, - GoodbyeReason::ClientShutdown => 1, - GoodbyeReason::IrreleventNetwork => 2, - GoodbyeReason::Fault => 3, - } - } -} - -impl_encode_via_from!(GoodbyeReason, u64); -impl_decode_via_from!(GoodbyeReason, u64); +impl_encode_via_from!(Goodbye, u64); +impl_decode_via_from!(Goodbye, u64); /// Request a number of beacon block roots from a peer. #[derive(Encode, Decode, Clone, Debug, PartialEq)] pub struct BeaconBlockRootsRequest { /// The starting slot of the requested blocks. pub start_slot: Slot, + /// The number of blocks from the start slot. pub count: u64, // this must be less than 32768. //TODO: Enforce this in the lower layers } @@ -116,6 +97,17 @@ pub struct BeaconBlockRootsResponse { pub roots: Vec, } +/// Contains a block root and associated slot. +#[derive(Encode, Decode, Clone, Debug, PartialEq)] +pub struct BlockRootSlot { + /// The block root. + pub block_root: Hash256, + + /// The block slot. + pub slot: Slot, +} + +/// The response of a beacl block roots request. impl BeaconBlockRootsResponse { /// Returns `true` if each `self.roots.slot[i]` is higher than the preceding `i`. pub fn slots_are_ascending(&self) -> bool { @@ -129,33 +121,27 @@ impl BeaconBlockRootsResponse { } } -/// Contains a block root and associated slot. -#[derive(Encode, Decode, Clone, Debug, PartialEq)] -pub struct BlockRootSlot { - /// The block root. - pub block_root: Hash256, - /// The block slot. - pub slot: Slot, -} - /// Request a number of beacon block headers from a peer. #[derive(Encode, Decode, Clone, Debug, PartialEq)] pub struct BeaconBlockHeadersRequest { /// The starting header hash of the requested headers. pub start_root: Hash256, + /// The starting slot of the requested headers. pub start_slot: Slot, + /// The maximum number of headers than can be returned. pub max_headers: u64, + /// The maximum number of slots to skip between blocks. pub skip_slots: u64, } /// Response containing requested block headers. -#[derive(Encode, Decode, Clone, Debug, PartialEq)] +#[derive(Clone, Debug, PartialEq)] pub struct BeaconBlockHeadersResponse { - /// The list of requested beacon block headers. - pub headers: Vec, + /// The list of ssz-encoded requested beacon block headers. + pub headers: Vec, } /// Request a number of beacon block bodies from a peer. @@ -166,10 +152,10 @@ pub struct BeaconBlockBodiesRequest { } /// Response containing the list of requested beacon block bodies. -#[derive(Encode, Decode, Clone, Debug, PartialEq)] +#[derive(Clone, Debug, PartialEq)] pub struct BeaconBlockBodiesResponse { - /// The list of beacon block bodies being requested. - pub block_bodies: Vec, + /// The list of ssz-encoded beacon block bodies being requested. + pub block_bodies: Vec, } /// Request values for tree hashes which yield a blocks `state_root`. @@ -184,5 +170,5 @@ pub struct BeaconChainStateRequest { #[derive(Encode, Decode, Clone, Debug, PartialEq)] pub struct BeaconChainStateResponse { /// The values corresponding the to the requested tree hashes. - pub values: bool, //TBD - stubbed with encodeable bool + pub values: bool, //TBD - stubbed with encodable bool } diff --git a/beacon_node/eth2-libp2p/src/rpc/mod.rs b/beacon_node/eth2-libp2p/src/rpc/mod.rs index 59015a86e..0ccb6c1e3 100644 --- a/beacon_node/eth2-libp2p/src/rpc/mod.rs +++ b/beacon_node/eth2-libp2p/src/rpc/mod.rs @@ -19,7 +19,7 @@ use slog::o; use std::marker::PhantomData; use tokio::io::{AsyncRead, AsyncWrite}; -/// This struct implements the libp2p `NetworkBehaviour` trait and therefore manages network-level +/// Rpc implements the libp2p `NetworkBehaviour` trait and therefore manages network-level /// logic. pub struct Rpc { /// Queue of events to processed. diff --git a/beacon_node/eth2-libp2p/src/rpc/protocol.rs b/beacon_node/eth2-libp2p/src/rpc/protocol.rs index 136b9cc26..e0b4ca1a4 100644 --- a/beacon_node/eth2-libp2p/src/rpc/protocol.rs +++ b/beacon_node/eth2-libp2p/src/rpc/protocol.rs @@ -1,14 +1,15 @@ use super::methods::*; use libp2p::core::{upgrade, InboundUpgrade, OutboundUpgrade, UpgradeInfo}; -use ssz::{impl_decode_via_from, impl_encode_via_from, ssz_encode, Decode, Encode}; -use ssz_derive::{Decode, Encode}; -use std::hash::{Hash, Hasher}; +use ssz::{Decode, Encode}; +use std::hash::Hasher; use std::io; use std::iter; use tokio::io::{AsyncRead, AsyncWrite}; /// The maximum bytes that can be sent across the RPC. -const MAX_READ_SIZE: usize = 4_194_304; // 4M +const MAX_RPC_SIZE: usize = 4_194_304; // 4M +/// The protocol prefix the RPC protocol id. +const PROTOCOL_PREFIX: &str = "/eth/serenity/rpc/"; /// Implementation of the `ConnectionUpgrade` for the rpc protocol. #[derive(Debug, Clone)] @@ -16,65 +17,195 @@ pub struct RPCProtocol; impl UpgradeInfo for RPCProtocol { type Info = &'static [u8]; - type InfoIter = iter::Iter; + type InfoIter = Vec; fn protocol_info(&self) -> Self::InfoIter { - vec![b"/eth/serenity/rpc/hello/1/ssz", - b"/eth/serenity/rpc/goodbye/1/ssz", - b"/eth/serenity/rpc/beacon_block_roots/1/ssz", - b"/eth/serenity/rpc/beacon_block_headers/1/ssz", - b"/eth/serenity/rpc/beacon_block_bodies/1/ssz", - b"/eth/serenity/rpc/beacon_chain_state/1/ssz"].into_iter() + vec![ + b"/eth/serenity/rpc/hello/1/ssz", + b"/eth/serenity/rpc/goodbye/1/ssz", + b"/eth/serenity/rpc/beacon_block_roots/1/ssz", + b"/eth/serenity/rpc/beacon_block_headers/1/ssz", + b"/eth/serenity/rpc/beacon_block_bodies/1/ssz", + b"/eth/serenity/rpc/beacon_chain_state/1/ssz", + ] } } /// The outbound RPC type as well as the return type used in the behaviour. #[derive(Debug, Clone)] pub enum RPCEvent { - Request ( RPCRequest ), - Response ( RPCResponse ), + Request(RPCRequest), + Response(RPCResponse), } -// outbound protocol supports the same as the inbound. -impl UpgradeInfo for RPCEvent { - type Info = &'static [u8]; - type InfoIter = iter::Iter; - - fn protocol_info(&self) -> Self::InfoIter { - vec![b"/eth/serenity/rpc/hello/1/ssz", - b"/eth/serenity/rpc/goodbye/1/ssz", - b"/eth/serenity/rpc/beacon_block_roots/1/ssz", - b"/eth/serenity/rpc/beacon_block_headers/1/ssz", - b"/eth/serenity/rpc/beacon_block_bodies/1/ssz", - b"/eth/serenity/rpc/beacon_chain_state/1/ssz"].into_iter() - } -} - -/* Inbound upgrade */ +/* Inbound upgrade */ // The inbound protocol reads the request, decodes it and returns the stream to the protocol // handler to respond to once ready. -type FnDecodeRPCEvent = fn(upgrade::Negotiated, Vec, ()) -> Result<(upgrade::Negotiated,RPCEvent), DecodeError>; +type FnDecodeRPCEvent = fn( + upgrade::Negotiated, + Vec, + (), +) -> Result<(upgrade::Negotiated, RPCEvent), RPCError>; impl InboundUpgrade for RPCProtocol where TSocket: AsyncRead + AsyncWrite, { - type Socket = upgrade::Negotiated, - type Output = (Self::Socket, RPCEvent), - type Error = DecodeError; - type Future = upgrade::ReadRespond; + type Output = (upgrade::Negotiated, RPCEvent); + type Error = RPCError; + type Future = upgrade::ReadRespond, (), FnDecodeRPCEvent>; - fn upgrade_inbound(self, socket: upgrade::Negotiated, protocol: Self::Info) -> Self::Future { - upgrade::read_respond(socket, MAX_READ_SIZE, (), |socket, packet, ()| Ok((socket, decode_request(packet, protocol)?))) + fn upgrade_inbound( + self, + socket: upgrade::Negotiated, + protocol: Self::Info, + ) -> Self::Future { + upgrade::read_respond(socket, MAX_RPC_SIZE, (), |socket, packet, ()| { + Ok((socket, decode_request(packet, protocol)?)) + }) } } +/* Outbound request */ -/* Outbound upgrade */ +// Combines all the RPC requests into a single enum to implement `UpgradeInfo` and +// `OutboundUpgrade` -impl OutboundUpgrade for RPCEvent +/// The raw protocol id sent over the wire. +type RawProtocolId = Vec; + +/// Tracks the types in a protocol id. +pub struct ProtocolId { + /// The rpc message type/name. + pub message_name: String, + + /// The version of the RPC. + pub version: usize, + + /// The encoding of the RPC. + pub encoding: String, +} + +/// An RPC protocol ID. +impl ProtocolId { + pub fn new(message_name: String, version: usize, encoding: String) -> Self { + ProtocolId { + message_name, + version, + encoding, + } + } + + /// Converts a raw RPC protocol id string into an `RPCProtocolId` + pub fn from_bytes(bytes: Vec) -> Result { + let protocol_string = String::from_utf8(bytes.as_vec()) + .map_err(|_| RPCError::InvalidProtocol("Invalid protocol Id"))?; + let protocol_string = protocol_string.as_str().split('/'); + + Ok(ProtocolId { + message_name: protocol_string[3], + version: protocol_string[4], + encoding: protocol_string[5], + }) + } +} + +impl Into for ProtocolId { + fn into(&self) -> [u8] { + &format!( + "{}/{}/{}/{}", + PROTOCOL_PREFIX, self.message_name, self.version, self.encoding + ) + .as_bytes() + } +} + +#[derive(Debug, Clone)] +pub enum RPCRequest { + Hello(HelloMessage), + Goodbye(Goodbye), + BeaconBlockRoots(BeaconBlockRootsRequest), + BeaconBlockHeaders(BeaconBlockHeadersRequest), + BeaconBlockBodies(BeaconBlockBodiesRequest), + BeaconChainState(BeaconChainStateRequest), +} + +impl UpgradeInfo for RPCRequest { + type Info = RawProtocolId; + type InfoIter = Vec; + + // add further protocols as we support more encodings/versions + fn protocol_info(&self) -> Self::InfoIter { + self.supported_protocols() + } +} + +// GOODBYE RPC has it's own upgrade as it doesn't expect a response +impl UpgradeInfo for Goodbye { + type Info = RawProtocolId; + type InfoIter = iter::Once; + + // add further protocols as we support more encodings/versions + fn protocol_info(&self) -> Self::InfoIter { + iter::once(ProtocolId::new("goodbye", 1, "ssz").into()) + } +} + +/// Implements the encoding per supported protocol for RPCRequest. +impl RPCRequest { + pub fn supported_protocols(&self) -> Vec { + match self { + // add more protocols when versions/encodings are supported + RPCRequest::Hello(_) => vec![ProtocolId::new("hello", 1, "ssz").into()], + RPCRequest::Goodbye(_) => vec![ProtocolId::new("goodbye", 1, "ssz").into()], + RPCRequest::BeaconBlockRoots(_) => { + vec![ProtocolId::new("beacon_block_roots", 1, "ssz").into()] + } + RPCRequest::BeaconBlockHeaders(_) => { + vec![ProtocolId::new("beacon_block_headers", 1, "ssz").into()] + } + RPCRequest::BeaconBlockBodies(_) => { + vec![ProtocolId::new("beacon_block_bodies", 1, "ssz").into()] + } + RPCRequest::BeaconBlockState(_) => { + vec![ProtocolId::new("beacon_block_state", 1, "ssz").into()] + } + } + } + + /// Encodes the Request object based on the negotiated protocol. + pub fn encode(&self, protocol: RawProtocolId) -> Result, io::Error> { + // Assume select has given a supported protocol. + let protocol = ProtocolId::from_bytes(protocol)?; + // Match on the encoding and in the future, the version + match protocol.encoding { + "ssz" => Ok(self.ssz_encode()), + _ => { + return Err(RPCError::Custom(format!( + "Unknown Encoding: {}", + protocol.encoding + ))) + } + } + } + + fn ssz_encode(&self) { + match self { + RPCRequest::Hello(req) => req.as_ssz_bytes(), + RPCRequest::Goodbye(req) => req.as_ssz_bytes(), + RPCRequest::BeaconBlockRoots(req) => req.as_ssz_bytes(), + RPCRequest::BeaconBlockHeaders(req) => req.as_ssz_bytes(), + RPCRequest::BeaconBlockBodies(req) => req.as_ssz_bytes(), + RPCRequest::BeaconChainState(req) => req.as_ssz_bytes(), + } + } +} + +/* Outbound upgrades */ + +impl OutboundUpgrade for RPCRequest where TSocket: AsyncWrite, { @@ -82,177 +213,202 @@ where type Error = io::Error; type Future = upgrade::WriteOne>; - #[inline] - fn upgrade_outbound(self, socket: upgrade::Negotiated, protocol: Self::Info) -> Self::Future { - let bytes = ssz_encode(&self); - upgrade::request_response(socket, + fn upgrade_outbound( + self, + socket: upgrade::Negotiated, + protocol: Self::Info, + ) -> Self::Future { + let bytes = self.encode(protocol); + upgrade::request_response(socket, bytes, MAX_RPC_SIZE, protocol, |packet, protocol| { + Ok(decode_response(packet, protocol)?) + }) + } +} + +impl OutboundUpgrade for Goodbye +where + TSocket: AsyncWrite, +{ + type Output = (); + type Error = io::Error; + type Future = upgrade::WriteOne>; + + fn upgrade_outbound( + self, + socket: upgrade::Negotiated, + protocol: Self::Info, + ) -> Self::Future { + let bytes = self.as_ssz_bytes(); upgrade::write_one(socket, bytes) } } -// This function can be extended to provide further logic for supporting various protocol versions/encoding -fn decode_request(packet: Vec, protocol: &'static[u8]) { - match protocol { - b"/eth/serenity/rpc/hello/1/ssz" => { } - b"/eth/serenity/rpc/goodbye/1/ssz", - b"/eth/serenity/rpc/beacon_block_roots/1/ssz", - b"/eth/serenity/rpc/beacon_block_headers/1/ssz", - b"/eth/serenity/rpc/beacon_block_bodies/1/ssz", - b"/eth/serenity/rpc/beacon_chain_state/1/ssz", - _=> { // Other protocols are not supported. - return Err(DecodeError::UnknownProtocol); - } - } -} +/* Decoding for Requests/Responses */ +// This function can be extended to provide further logic for supporting various protocol versions/encoding +fn decode_request(packet: Vec, protocol: ProtocolId) -> Result { + let protocol_id = ProtocolId::from_bytes(protocol); - - - - - - - - - - - - -/// A helper struct used to obtain SSZ serialization for RPC messages. -#[derive(Encode, Decode, Default)] -struct SszContainer { - /// Note: the `is_request` field is not included in the spec. - /// - /// We are unable to determine a request from a response unless we add some flag to the - /// packet. Here we have added a bool (encoded as 1 byte) which is set to `1` if the - /// message is a request. - is_request: bool, - id: u64, - other: u16, - bytes: Vec, -} - -fn decode(packet: Vec) -> Result { - let msg = SszContainer::from_ssz_bytes(&packet)?; - - if msg.is_request { - let body = match RPCMethod::from(msg.other) { - RPCMethod::Hello => RPCRequest::Hello(HelloMessage::from_ssz_bytes(&msg.bytes)?), - RPCMethod::Goodbye => RPCRequest::Goodbye(GoodbyeReason::from_ssz_bytes(&msg.bytes)?), - RPCMethod::BeaconBlockRoots => { - RPCRequest::BeaconBlockRoots(BeaconBlockRootsRequest::from_ssz_bytes(&msg.bytes)?) - } - RPCMethod::BeaconBlockHeaders => RPCRequest::BeaconBlockHeaders( - BeaconBlockHeadersRequest::from_ssz_bytes(&msg.bytes)?, - ), - RPCMethod::BeaconBlockBodies => { - RPCRequest::BeaconBlockBodies(BeaconBlockBodiesRequest::from_ssz_bytes(&msg.bytes)?) - } - RPCMethod::BeaconChainState => { - RPCRequest::BeaconChainState(BeaconChainStateRequest::from_ssz_bytes(&msg.bytes)?) - } - RPCMethod::Unknown => return Err(DecodeError::UnknownRPCMethod), - }; - - Ok(RPCEvent::Request { - id: RequestId::from(msg.id), - method_id: msg.other, - body, - }) - } - // we have received a response - else { - let result = match RPCMethod::from(msg.other) { - RPCMethod::Hello => RPCResponse::Hello(HelloMessage::from_ssz_bytes(&msg.bytes)?), - RPCMethod::BeaconBlockRoots => { - RPCResponse::BeaconBlockRoots(BeaconBlockRootsResponse::from_ssz_bytes(&msg.bytes)?) - } - RPCMethod::BeaconBlockHeaders => RPCResponse::BeaconBlockHeaders( - BeaconBlockHeadersResponse::from_ssz_bytes(&msg.bytes)?, - ), - RPCMethod::BeaconBlockBodies => RPCResponse::BeaconBlockBodies( - BeaconBlockBodiesResponse::from_ssz_bytes(&msg.bytes)?, - ), - RPCMethod::BeaconChainState => { - RPCResponse::BeaconChainState(BeaconChainStateResponse::from_ssz_bytes(&msg.bytes)?) - } - // We should never receive a goodbye response; it is invalid. - RPCMethod::Goodbye => return Err(DecodeError::UnknownRPCMethod), - RPCMethod::Unknown => return Err(DecodeError::UnknownRPCMethod), - }; - - Ok(RPCEvent::Response { - id: RequestId::from(msg.id), - method_id: msg.other, - result, - }) - } -} - - -impl Encode for RPCEvent { - fn is_ssz_fixed_len() -> bool { - false - } - - fn ssz_append(&self, buf: &mut Vec) { - let container = match self { - RPCEvent::Request { - id, - method_id, - body, - } => SszContainer { - is_request: true, - id: (*id).into(), - other: *method_id, - bytes: match body { - RPCRequest::Hello(body) => body.as_ssz_bytes(), - RPCRequest::Goodbye(body) => body.as_ssz_bytes(), - RPCRequest::BeaconBlockRoots(body) => body.as_ssz_bytes(), - RPCRequest::BeaconBlockHeaders(body) => body.as_ssz_bytes(), - RPCRequest::BeaconBlockBodies(body) => body.as_ssz_bytes(), - RPCRequest::BeaconChainState(body) => body.as_ssz_bytes(), - }, + match protocol_id.message_name { + "hello" => match protocol_id.version { + "1" => match protocol_id.encoding { + "ssz" => Ok(RPCRequest::Hello(HelloMessage::from_ssz_bytes(&packet)?)), + _ => Err(RPCError::InvalidProtocol("Unknown HELLO encoding")), }, - RPCEvent::Response { - id, - method_id, - result, - } => SszContainer { - is_request: false, - id: (*id).into(), - other: *method_id, - bytes: match result { - RPCResponse::Hello(response) => response.as_ssz_bytes(), - RPCResponse::BeaconBlockRoots(response) => response.as_ssz_bytes(), - RPCResponse::BeaconBlockHeaders(response) => response.as_ssz_bytes(), - RPCResponse::BeaconBlockBodies(response) => response.as_ssz_bytes(), - RPCResponse::BeaconChainState(response) => response.as_ssz_bytes(), - }, + _ => Err(RPCError::InvalidProtocol("Unknown HELLO version")), + }, + "goodbye" => match protocol_id.version { + "1" => match protocol_id.encoding { + "ssz" => Ok(RPCRequest::Goodbye(Goodbye::from_ssz_bytes(&packet)?)), + _ => Err(RPCError::InvalidProtocol("Unknown GOODBYE encoding")), }, - }; - - container.ssz_append(buf) + _ => Err(RPCError::InvalidProtocol("Unknown GOODBYE version")), + }, + "beacon_block_roots" => match protocol_id.version { + "1" => match protocol_id.encoding { + "ssz" => Ok(RPCRequest::BeaconBlockRooots( + BeaconBlockRootsRequest::from_ssz_bytes(&packet)?, + )), + _ => Err(RPCError::InvalidProtocol( + "Unknown BEACON_BLOCK_ROOTS encoding", + )), + }, + _ => Err(RPCError::InvalidProtocol( + "Unknown BEACON_BLOCK_ROOTS version", + )), + }, + "beacon_block_headers" => match protocol_id.version { + "1" => match protocol_id.encoding { + "ssz" => Ok(RPCRequest::BeaconBlockHeaders( + BeaconBlockHeadersRequest::from_ssz_bytes(&packet), + )), + _ => Err(RPCError::InvalidProtocol( + "Unknown BEACON_BLOCK_HEADERS encoding", + )), + }, + _ => Err(RPCError::InvalidProtocol( + "Unknown BEACON_BLOCK_HEADERS version", + )), + }, + "beacon_block_bodies" => match protocol_id.version { + "1" => match protocol_id.encoding { + "ssz" => Ok(RPCRequest::BeaconBlockBodies( + BeaconBlockBodiesRequest::from_ssz_bytes(&packet)?, + )), + _ => Err(RPCError::InvalidProtocol( + "Unknown BEACON_BLOCK_BODIES encoding", + )), + }, + _ => Err(RPCError::InvalidProtocol( + "Unknown BEACON_BLOCK_BODIES version", + )), + }, + "beacon_chain_state" => match protocol_id.version { + "1" => match protocol_id.encoding { + "ssz" => Ok(RPCRequest::BeaconChainState( + BeaconChainStateRequest::from_ssz_bytes(&packet)?, + )), + _ => Err(RPCError::InvalidProtocol( + "Unknown BEACON_CHAIN_STATE encoding", + )), + }, + _ => Err(RPCError::InvalidProtocol( + "Unknown BEACON_CHAIN_STATE version", + )), + }, } } +/// Decodes a response that was received on the same stream as a request. The response type should +/// therefore match the request protocol type. +fn decode_response(packet: Vec, protocol: RawProtocolId) -> Result { + let protocol_id = ProtocolId::from_bytes(protocol)?; + + match protocol_id.message_name { + "hello" => match protocol_id.version { + "1" => match protocol_id.encoding { + "ssz" => Ok(RPCResponse::Hello(HelloMessage::from_ssz_bytes(&packet)?)), + _ => Err(RPCError::InvalidProtocol("Unknown HELLO encoding")), + }, + _ => Err(RPCError::InvalidProtocol("Unknown HELLO version")), + }, + "goodbye" => Err(RPCError::Custom("GOODBYE should not have a response")), + "beacon_block_roots" => match protocol_id.version { + "1" => match protocol_id.encoding { + "ssz" => Ok(RPCResponse::BeaconBlockRoots( + BeaconBlockRootsResponse::from_ssz_bytes(&packet)?, + )), + _ => Err(RPCError::InvalidProtocol( + "Unknown BEACON_BLOCK_ROOTS encoding", + )), + }, + _ => Err(RPCError::InvalidProtocol( + "Unknown BEACON_BLOCK_ROOTS version", + )), + }, + "beacon_block_headers" => match protocol_id.version { + "1" => match protocol_id.encoding { + "ssz" => Ok(RPCResponse::BeaconBlockHeaders( + BeaconBlockHeadersResponse { headers: packet }, + )), + _ => Err(RPCError::InvalidProtocol( + "Unknown BEACON_BLOCK_HEADERS encoding", + )), + }, + _ => Err(RPCError::InvalidProtocol( + "Unknown BEACON_BLOCK_HEADERS version", + )), + }, + "beacon_block_bodies" => match protocol_id.version { + "1" => match protocol_id.encoding { + "ssz" => Ok(RPCResponse::BeaconBlockBodies(BeaconBlockBodiesResponse { + block_bodies: packet, + })), + _ => Err(RPCError::InvalidProtocol( + "Unknown BEACON_BLOCK_BODIES encoding", + )), + }, + _ => Err(RPCError::InvalidProtocol( + "Unknown BEACON_BLOCK_BODIES version", + )), + }, + "beacon_chain_state" => match protocol_id.version { + "1" => match protocol_id.encoding { + "ssz" => Ok(BeaconChainStateRequest::from_ssz_bytes(&packet)?), + _ => Err(RPCError::InvalidProtocol( + "Unknown BEACON_CHAIN_STATE encoding", + )), + }, + _ => Err(RPCError::InvalidProtocol( + "Unknown BEACON_CHAIN_STATE version", + )), + }, + } +} + +/// Error in RPC Encoding/Decoding. #[derive(Debug)] -pub enum DecodeError { +pub enum RPCError { + /// Error when reading the packet from the socket. ReadError(upgrade::ReadOneError), + /// Error when decoding the raw buffer from ssz. SSZDecodeError(ssz::DecodeError), - UnknownRPCMethod, + /// Invalid Protocol ID + InvalidProtocol(&'static str), + /// Custom message. + Custom(String), } -impl From for DecodeError { +impl From for RPCError { #[inline] fn from(err: upgrade::ReadOneError) -> Self { - DecodeError::ReadError(err) + RPCError::ReadError(err) } } -impl From for DecodeError { +impl From for RPCError { #[inline] fn from(err: ssz::DecodeError) -> Self { - DecodeError::SSZDecodeError(err) + RPCError::SSZDecodeError(err) } }