From ec73dfe90b0568fcbc22a775b2e2bf509fde6370 Mon Sep 17 00:00:00 2001 From: Age Manning Date: Thu, 8 Aug 2019 17:46:39 +1000 Subject: [PATCH] Starting of req/resp overhaul --- beacon_node/eth2-libp2p/Cargo.toml | 1 + beacon_node/eth2-libp2p/src/rpc/codec/ssz.rs | 114 ++++---------- beacon_node/eth2-libp2p/src/rpc/methods.rs | 153 +++++-------------- beacon_node/eth2-libp2p/src/rpc/protocol.rs | 55 +++---- 4 files changed, 96 insertions(+), 227 deletions(-) diff --git a/beacon_node/eth2-libp2p/Cargo.toml b/beacon_node/eth2-libp2p/Cargo.toml index 794b09712..55081aed5 100644 --- a/beacon_node/eth2-libp2p/Cargo.toml +++ b/beacon_node/eth2-libp2p/Cargo.toml @@ -26,3 +26,4 @@ smallvec = "0.6.10" fnv = "1.0.6" unsigned-varint = "0.2.2" bytes = "0.4.12" +tokio-io-timeout = "0.3.1" diff --git a/beacon_node/eth2-libp2p/src/rpc/codec/ssz.rs b/beacon_node/eth2-libp2p/src/rpc/codec/ssz.rs index 8e2bdaa64..f7262118d 100644 --- a/beacon_node/eth2-libp2p/src/rpc/codec/ssz.rs +++ b/beacon_node/eth2-libp2p/src/rpc/codec/ssz.rs @@ -41,10 +41,8 @@ impl Encoder for SSZInboundCodec { RPCErrorResponse::Success(resp) => { match resp { RPCResponse::Hello(res) => res.as_ssz_bytes(), - RPCResponse::BeaconBlockRoots(res) => res.as_ssz_bytes(), - RPCResponse::BeaconBlockHeaders(res) => res.headers, // already raw bytes - RPCResponse::BeaconBlockBodies(res) => res.block_bodies, // already raw bytes - RPCResponse::BeaconChainState(res) => res.as_ssz_bytes(), + RPCResponse::BeaconBlocks(res) => res, // already raw bytes + RPCResponse::RecentBeaconBlocks(res) => res, // already raw bytes } } RPCErrorResponse::InvalidRequest(err) => err.as_ssz_bytes(), @@ -72,52 +70,30 @@ impl Decoder for SSZInboundCodec { match self.inner.decode(src).map_err(RPCError::from) { Ok(Some(packet)) => match self.protocol.message_name.as_str() { "hello" => match self.protocol.version.as_str() { - "1.0.0" => Ok(Some(RPCRequest::Hello(HelloMessage::from_ssz_bytes( + "1" => Ok(Some(RPCRequest::Hello(HelloMessage::from_ssz_bytes( &packet, )?))), - _ => Err(RPCError::InvalidProtocol("Unknown HELLO version")), + _ => unreachable!("Cannot negotiate an unknown version"), }, "goodbye" => match self.protocol.version.as_str() { - "1.0.0" => Ok(Some(RPCRequest::Goodbye(GoodbyeReason::from_ssz_bytes( + "1" => Ok(Some(RPCRequest::Goodbye(GoodbyeReason::from_ssz_bytes( &packet, )?))), - _ => Err(RPCError::InvalidProtocol( - "Unknown GOODBYE version.as_str()", - )), + _ => unreachable!("Cannot negotiate an unknown version"), }, - "beacon_block_roots" => match self.protocol.version.as_str() { - "1.0.0" => Ok(Some(RPCRequest::BeaconBlockRoots( - BeaconBlockRootsRequest::from_ssz_bytes(&packet)?, + "beacon_blocks" => match self.protocol.version.as_str() { + "1" => Ok(Some(RPCRequest::BeaconBlocks( + BeaconBlocksRequest::from_ssz_bytes(&packet)?, ))), - _ => Err(RPCError::InvalidProtocol( - "Unknown BEACON_BLOCK_ROOTS version.", - )), + _ => unreachable!("Cannot negotiate an unknown version"), }, - "beacon_block_headers" => match self.protocol.version.as_str() { - "1.0.0" => Ok(Some(RPCRequest::BeaconBlockHeaders( - BeaconBlockHeadersRequest::from_ssz_bytes(&packet)?, + "recent_beacon_blocks" => match self.protocol.version.as_str() { + "1" => Ok(Some(RPCRequest::RecentBeaconBlocks( + RecentBeaconBlocksRequest::from_ssz_bytes(&packet)?, ))), - _ => Err(RPCError::InvalidProtocol( - "Unknown BEACON_BLOCK_HEADERS version.", - )), + _ => unreachable!("Cannot negotiate an unknown version"), }, - "beacon_block_bodies" => match self.protocol.version.as_str() { - "1.0.0" => Ok(Some(RPCRequest::BeaconBlockBodies( - BeaconBlockBodiesRequest::from_ssz_bytes(&packet)?, - ))), - _ => Err(RPCError::InvalidProtocol( - "Unknown BEACON_BLOCK_BODIES version.", - )), - }, - "beacon_chain_state" => match self.protocol.version.as_str() { - "1.0.0" => Ok(Some(RPCRequest::BeaconChainState( - BeaconChainStateRequest::from_ssz_bytes(&packet)?, - ))), - _ => Err(RPCError::InvalidProtocol( - "Unknown BEACON_CHAIN_STATE version.", - )), - }, - _ => Err(RPCError::InvalidProtocol("Unknown message name.")), + _ => unreachable!("Cannot negotiate an unknown protocol"), }, Ok(None) => Ok(None), Err(e) => Err(e), @@ -156,10 +132,8 @@ impl Encoder for SSZOutboundCodec { let bytes = match item { RPCRequest::Hello(req) => req.as_ssz_bytes(), RPCRequest::Goodbye(req) => req.as_ssz_bytes(), - RPCRequest::BeaconBlockRoots(req) => req.as_ssz_bytes(), - RPCRequest::BeaconBlockHeaders(req) => req.as_ssz_bytes(), - RPCRequest::BeaconBlockBodies(req) => req.as_ssz_bytes(), - RPCRequest::BeaconChainState(req) => req.as_ssz_bytes(), + RPCRequest::BeaconBlocks(req) => req.as_ssz_bytes(), + RPCRequest::RecentBeaconBlocks(req) => req.as_ssz_bytes(), }; // length-prefix self.inner @@ -168,7 +142,11 @@ impl Encoder for SSZOutboundCodec { } } -// Decoder for outbound +// Decoder for outbound streams +// +// The majority of the decoding has now been pushed upstream due to the changing specification. +// We prefer to decode blocks and attestations with extra knowledge about the chain to perform +// faster verification checks before decoding entire blocks/attestations. impl Decoder for SSZOutboundCodec { type Item = RPCResponse; type Error = RPCError; @@ -177,51 +155,21 @@ impl Decoder for SSZOutboundCodec { match self.inner.decode(src).map_err(RPCError::from) { Ok(Some(packet)) => match self.protocol.message_name.as_str() { "hello" => match self.protocol.version.as_str() { - "1.0.0" => Ok(Some(RPCResponse::Hello(HelloMessage::from_ssz_bytes( + "1" => Ok(Some(RPCResponse::Hello(HelloMessage::from_ssz_bytes( &packet, )?))), - _ => Err(RPCError::InvalidProtocol("Unknown HELLO version.")), + _ => unreachable!("Cannot negotiate an unknown version"), }, "goodbye" => Err(RPCError::InvalidProtocol("GOODBYE doesn't have a response")), - "beacon_block_roots" => match self.protocol.version.as_str() { - "1.0.0" => Ok(Some(RPCResponse::BeaconBlockRoots( - BeaconBlockRootsResponse::from_ssz_bytes(&packet)?, - ))), - _ => Err(RPCError::InvalidProtocol( - "Unknown BEACON_BLOCK_ROOTS version.", - )), + "beacon_blocks" => match self.protocol.version.as_str() { + "1" => Ok(Some(RPCResponse::BeaconBlocks(packet.to_vec()))), + _ => unreachable!("Cannot negotiate an unknown version"), }, - "beacon_block_headers" => match self.protocol.version.as_str() { - "1.0.0" => Ok(Some(RPCResponse::BeaconBlockHeaders( - BeaconBlockHeadersResponse { - headers: packet.to_vec(), - }, - ))), - _ => Err(RPCError::InvalidProtocol( - "Unknown BEACON_BLOCK_HEADERS version.", - )), + "recent_beacon_blocks" => match self.protocol.version.as_str() { + "1" => Ok(Some(RPCResponse::RecentBeaconBlocks(packet.to_vec()))), + _ => unreachable!("Cannot negotiate an unknown version"), }, - "beacon_block_bodies" => match self.protocol.version.as_str() { - "1.0.0" => Ok(Some(RPCResponse::BeaconBlockBodies( - BeaconBlockBodiesResponse { - block_bodies: packet.to_vec(), - // this gets filled in the protocol handler - block_roots: None, - }, - ))), - _ => Err(RPCError::InvalidProtocol( - "Unknown BEACON_BLOCK_BODIES version.", - )), - }, - "beacon_chain_state" => match self.protocol.version.as_str() { - "1.0.0" => Ok(Some(RPCResponse::BeaconChainState( - BeaconChainStateResponse::from_ssz_bytes(&packet)?, - ))), - _ => Err(RPCError::InvalidProtocol( - "Unknown BEACON_CHAIN_STATE version.", - )), - }, - _ => Err(RPCError::InvalidProtocol("Unknown method")), + _ => unreachable!("Cannot negotiate an unknown protocol"), }, Ok(None) => Ok(None), Err(e) => Err(e), diff --git a/beacon_node/eth2-libp2p/src/rpc/methods.rs b/beacon_node/eth2-libp2p/src/rpc/methods.rs index 2e5a9a7ff..8fef1a75a 100644 --- a/beacon_node/eth2-libp2p/src/rpc/methods.rs +++ b/beacon_node/eth2-libp2p/src/rpc/methods.rs @@ -2,7 +2,7 @@ use ssz::{impl_decode_via_from, impl_encode_via_from}; use ssz_derive::{Decode, Encode}; -use types::{BeaconBlockBody, Epoch, EthSpec, Hash256, Slot}; +use types::{Epoch, Hash256, Slot}; /* Request/Response data structures for RPC methods */ @@ -13,23 +13,20 @@ pub type RequestId = usize; /// The HELLO request/response handshake message. #[derive(Encode, Decode, Clone, Debug)] pub struct HelloMessage { - /// The network ID of the peer. - pub network_id: u8, + /// The fork version of the chain we are broadcasting. + pub fork_version: [u8; 4], - /// The chain id for the HELLO request. - pub chain_id: u64, + /// Latest finalized root. + pub finalized_root: Hash256, - /// The peers last finalized root. - pub latest_finalized_root: Hash256, + /// Latest finalized epoch. + pub finalized_epoch: Epoch, - /// The peers last finalized epoch. - pub latest_finalized_epoch: Epoch, + /// The latest block root. + pub head_root: Hash256, - /// The peers last block root. - pub best_root: Hash256, - - /// The peers last slot. - pub best_slot: Slot, + /// The slot associated with the latest block root. + pub head_slot: Slot, } /// The reason given for a `Goodbye` message. @@ -74,108 +71,42 @@ impl_decode_via_from!(GoodbyeReason, u64); /// Request a number of beacon block roots from a peer. #[derive(Encode, Decode, Clone, Debug, PartialEq)] -pub struct BeaconBlockRootsRequest { - /// The starting slot of the requested blocks. - pub start_slot: Slot, +pub struct BeaconBlocksRequest { + /// The hash tree root of a block on the requested chain. + pub head_block_root: Hash256, + + /// The starting slot to request blocks. + pub start_slot: u64, /// The number of blocks from the start slot. - pub count: u64, // this must be less than 32768. //TODO: Enforce this in the lower layers + pub count: u64, + + /// The step increment to receive blocks. + /// + /// A value of 1 returns every block. + /// A value of 2 returns every second block. + /// A value of 3 returns every third block and so on. + pub step: u64, } +// TODO: Currently handle encoding/decoding of blocks in the message handler. Leave this struct +// here in case encoding/decoding of ssz requires an object. +/* /// Response containing a number of beacon block roots from a peer. #[derive(Encode, Decode, Clone, Debug, PartialEq)] -pub struct BeaconBlockRootsResponse { +pub struct BeaconBlocksResponse { /// List of requested blocks and associated slots. - pub roots: Vec, -} - -/// Contains a block root and associated slot. -#[derive(Encode, Decode, Clone, Debug, PartialEq)] -pub struct BlockRootSlot { - /// The block root. - pub block_root: Hash256, - - /// The block slot. - pub slot: Slot, -} - -/// The response of a beacon block roots request. -impl BeaconBlockRootsResponse { - /// Returns `true` if each `self.roots.slot[i]` is higher than the preceding `i`. - pub fn slots_are_ascending(&self) -> bool { - for window in self.roots.windows(2) { - if window[0].slot >= window[1].slot { - return false; - } - } - - true - } -} - -/// Request a number of beacon block headers from a peer. -#[derive(Encode, Decode, Clone, Debug, PartialEq)] -pub struct BeaconBlockHeadersRequest { - /// The starting header hash of the requested headers. - pub start_root: Hash256, - - /// The starting slot of the requested headers. - pub start_slot: Slot, - - /// The maximum number of headers than can be returned. - pub max_headers: u64, - - /// The maximum number of slots to skip between blocks. - pub skip_slots: u64, -} - -/// Response containing requested block headers. -#[derive(Clone, Debug, PartialEq)] -pub struct BeaconBlockHeadersResponse { - /// The list of ssz-encoded requested beacon block headers. - pub headers: Vec, + pub beacon_blocks: Vec, } +*/ /// Request a number of beacon block bodies from a peer. #[derive(Encode, Decode, Clone, Debug, PartialEq)] -pub struct BeaconBlockBodiesRequest { +pub struct RecentBeaconBlocksRequest { /// The list of beacon block bodies being requested. pub block_roots: Vec, } -/// Response containing the list of requested beacon block bodies. -#[derive(Clone, Debug, PartialEq)] -pub struct BeaconBlockBodiesResponse { - /// The list of hashes that were sent in the request and match these roots response. None when - /// sending outbound. - pub block_roots: Option>, - /// The list of ssz-encoded beacon block bodies being requested. - pub block_bodies: Vec, -} - -/// The decoded version of `BeaconBlockBodiesResponse` which is expected in `SimpleSync`. -pub struct DecodedBeaconBlockBodiesResponse { - /// The list of hashes sent in the request to get this response. - pub block_roots: Vec, - /// The valid decoded block bodies. - pub block_bodies: Vec>, -} - -/// Request values for tree hashes which yield a blocks `state_root`. -#[derive(Encode, Decode, Clone, Debug, PartialEq)] -pub struct BeaconChainStateRequest { - /// The tree hashes that a value is requested for. - pub hashes: Vec, -} - -/// Request values for tree hashes which yield a blocks `state_root`. -// Note: TBD -#[derive(Encode, Decode, Clone, Debug, PartialEq)] -pub struct BeaconChainStateResponse { - /// The values corresponding the to the requested tree hashes. - pub values: bool, //TBD - stubbed with encodable bool -} - /* RPC Handling and Grouping */ // Collection of enums and structs used by the Codecs to encode/decode RPC messages @@ -183,14 +114,10 @@ pub struct BeaconChainStateResponse { pub enum RPCResponse { /// A HELLO message. Hello(HelloMessage), - /// A response to a get BEACON_BLOCK_ROOTS request. - BeaconBlockRoots(BeaconBlockRootsResponse), - /// A response to a get BEACON_BLOCK_HEADERS request. - BeaconBlockHeaders(BeaconBlockHeadersResponse), - /// A response to a get BEACON_BLOCK_BODIES request. - BeaconBlockBodies(BeaconBlockBodiesResponse), - /// A response to a get BEACON_CHAIN_STATE request. - BeaconChainState(BeaconChainStateResponse), + /// A response to a get BEACON_BLOCKS request. + BeaconBlocks(Vec), + /// A response to a get RECENT_BEACON_BLOCKS request. + RecentBeaconBlocks(Vec), } #[derive(Debug)] @@ -206,8 +133,8 @@ impl RPCErrorResponse { pub fn as_u8(&self) -> u8 { match self { RPCErrorResponse::Success(_) => 0, - RPCErrorResponse::InvalidRequest(_) => 2, - RPCErrorResponse::ServerError(_) => 3, + RPCErrorResponse::InvalidRequest(_) => 1, + RPCErrorResponse::ServerError(_) => 2, RPCErrorResponse::Unknown(_) => 255, } } @@ -223,8 +150,8 @@ impl RPCErrorResponse { /// Builds an RPCErrorResponse from a response code and an ErrorMessage pub fn from_error(response_code: u8, err: ErrorMessage) -> Self { match response_code { - 2 => RPCErrorResponse::InvalidRequest(err), - 3 => RPCErrorResponse::ServerError(err), + 1 => RPCErrorResponse::InvalidRequest(err), + 2 => RPCErrorResponse::ServerError(err), _ => RPCErrorResponse::Unknown(err), } } diff --git a/beacon_node/eth2-libp2p/src/rpc/protocol.rs b/beacon_node/eth2-libp2p/src/rpc/protocol.rs index b606fc743..be1efdf5d 100644 --- a/beacon_node/eth2-libp2p/src/rpc/protocol.rs +++ b/beacon_node/eth2-libp2p/src/rpc/protocol.rs @@ -16,13 +16,17 @@ use tokio::io::{AsyncRead, AsyncWrite}; use tokio::prelude::*; use tokio::timer::timeout; use tokio::util::FutureExt; +use tokio_io_timeout::TimeoutStream; /// The maximum bytes that can be sent across the RPC. const MAX_RPC_SIZE: usize = 4_194_304; // 4M /// The protocol prefix the RPC protocol id. -const PROTOCOL_PREFIX: &str = "/eth2/beacon_node/rpc"; -/// The number of seconds to wait for a request once a protocol has been established before the stream is terminated. -const REQUEST_TIMEOUT: u64 = 3; +const PROTOCOL_PREFIX: &str = "/eth2/beacon_chain/req"; +/// Time allowed for the first byte of a request to arrive before we time out (Time To First Byte). +const TTFB_TIMEOUT: u64 = 5; +/// The number of seconds to wait for the first bytes of a request once a protocol has been +/// established before the stream is terminated. +const REQUEST_TIMEOUT: u64 = 15; #[derive(Debug, Clone)] pub struct RPCProtocol; @@ -33,11 +37,10 @@ impl UpgradeInfo for RPCProtocol { fn protocol_info(&self) -> Self::InfoIter { vec![ - ProtocolId::new("hello", "1.0.0", "ssz"), - ProtocolId::new("goodbye", "1.0.0", "ssz"), - ProtocolId::new("beacon_block_roots", "1.0.0", "ssz"), - ProtocolId::new("beacon_block_headers", "1.0.0", "ssz"), - ProtocolId::new("beacon_block_bodies", "1.0.0", "ssz"), + ProtocolId::new("hello", "1", "ssz"), + ProtocolId::new("goodbye", "1", "ssz"), + ProtocolId::new("beacon_blocks", "1", "ssz"), + ProtocolId::new("recent_beacon_blocks", "1", "ssz"), ] } } @@ -87,7 +90,7 @@ impl ProtocolName for ProtocolId { // handler to respond to once ready. pub type InboundOutput = (RPCRequest, InboundFramed); -pub type InboundFramed = Framed, InboundCodec>; +pub type InboundFramed = Framed>, InboundCodec>; type FnAndThen = fn( (Option, InboundFramed), ) -> FutureResult, RPCError>; @@ -118,7 +121,9 @@ where "ssz" | _ => { let ssz_codec = BaseInboundCodec::new(SSZInboundCodec::new(protocol, MAX_RPC_SIZE)); let codec = InboundCodec::SSZ(ssz_codec); - Framed::new(socket, codec) + let mut timed_socket = TimeoutStream::new(socket); + timed_socket.set_read_timeout(Some(Duration::from_secs(TTFB_TIMEOUT))); + Framed::new(timed_socket, codec) .into_future() .timeout(Duration::from_secs(REQUEST_TIMEOUT)) .map_err(RPCError::from as FnMapErr) @@ -144,10 +149,8 @@ where pub enum RPCRequest { Hello(HelloMessage), Goodbye(GoodbyeReason), - BeaconBlockRoots(BeaconBlockRootsRequest), - BeaconBlockHeaders(BeaconBlockHeadersRequest), - BeaconBlockBodies(BeaconBlockBodiesRequest), - BeaconChainState(BeaconChainStateRequest), + BeaconBlocks(BeaconBlocksRequest), + RecentBeaconBlocks(RecentBeaconBlocksRequest), } impl UpgradeInfo for RPCRequest { @@ -165,22 +168,11 @@ impl RPCRequest { pub fn supported_protocols(&self) -> Vec { match self { // add more protocols when versions/encodings are supported - RPCRequest::Hello(_) => vec![ - ProtocolId::new("hello", "1.0.0", "ssz"), - ProtocolId::new("goodbye", "1.0.0", "ssz"), - ], - RPCRequest::Goodbye(_) => vec![ProtocolId::new("goodbye", "1.0.0", "ssz")], - RPCRequest::BeaconBlockRoots(_) => { - vec![ProtocolId::new("beacon_block_roots", "1.0.0", "ssz")] - } - RPCRequest::BeaconBlockHeaders(_) => { - vec![ProtocolId::new("beacon_block_headers", "1.0.0", "ssz")] - } - RPCRequest::BeaconBlockBodies(_) => { - vec![ProtocolId::new("beacon_block_bodies", "1.0.0", "ssz")] - } - RPCRequest::BeaconChainState(_) => { - vec![ProtocolId::new("beacon_block_state", "1.0.0", "ssz")] + RPCRequest::Hello(_) => vec![ProtocolId::new("hello", "1", "ssz")], + RPCRequest::Goodbye(_) => vec![ProtocolId::new("goodbye", "1", "ssz")], + RPCRequest::BeaconBlocks(_) => vec![ProtocolId::new("beacon_blocks", "1", "ssz")], + RPCRequest::RecentBeaconBlocks(_) => { + vec![ProtocolId::new("recent_beacon_blocks", "1", "ssz")] } } } @@ -215,7 +207,8 @@ where ) -> Self::Future { match protocol.encoding.as_str() { "ssz" | _ => { - let ssz_codec = BaseOutboundCodec::new(SSZOutboundCodec::new(protocol, 4096)); + let ssz_codec = + BaseOutboundCodec::new(SSZOutboundCodec::new(protocol, MAX_RPC_SIZE)); let codec = OutboundCodec::SSZ(ssz_codec); Framed::new(socket, codec).send(self) }