diff --git a/Cargo.lock b/Cargo.lock index 37a60ce0a..860352139 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2990,6 +2990,7 @@ dependencies = [ "error-chain", "eth2-libp2p", "eth2_ssz", + "eth2_ssz_types", "exit-future", "fnv", "futures 0.3.5", diff --git a/beacon_node/eth2-libp2p/src/rpc/codec/ssz.rs b/beacon_node/eth2-libp2p/src/rpc/codec/ssz.rs index df9b0f401..e5fbf419b 100644 --- a/beacon_node/eth2-libp2p/src/rpc/codec/ssz.rs +++ b/beacon_node/eth2-libp2p/src/rpc/codec/ssz.rs @@ -6,6 +6,7 @@ use crate::rpc::{ use crate::rpc::{RPCCodedResponse, RPCRequest, RPCResponse}; use libp2p::bytes::{BufMut, Bytes, BytesMut}; use ssz::{Decode, Encode}; +use ssz_types::VariableList; use std::marker::PhantomData; use tokio_util::codec::{Decoder, Encoder}; use types::{EthSpec, SignedBeaconBlock}; @@ -52,9 +53,9 @@ impl Encoder> for SSZInboundCodec RPCResponse::Pong(res) => res.data.as_ssz_bytes(), RPCResponse::MetaData(res) => res.as_ssz_bytes(), }, - RPCCodedResponse::InvalidRequest(err) => err.into_bytes().as_ssz_bytes(), - RPCCodedResponse::ServerError(err) => err.into_bytes().as_ssz_bytes(), - RPCCodedResponse::Unknown(err) => err.into_bytes().as_ssz_bytes(), + RPCCodedResponse::InvalidRequest(err) => err.as_ssz_bytes(), + RPCCodedResponse::ServerError(err) => err.as_ssz_bytes(), + RPCCodedResponse::Unknown(err) => err.as_ssz_bytes(), RPCCodedResponse::StreamTermination(_) => { unreachable!("Code error - attempting to encode a stream termination") } @@ -99,7 +100,7 @@ impl Decoder for SSZInboundCodec { }, Protocol::BlocksByRoot => match self.protocol.version { Version::V1 => Ok(Some(RPCRequest::BlocksByRoot(BlocksByRootRequest { - block_roots: Vec::from_ssz_bytes(&packet)?, + block_roots: VariableList::from_ssz_bytes(&packet)?, }))), }, Protocol::Ping => match self.protocol.version { diff --git a/beacon_node/eth2-libp2p/src/rpc/codec/ssz_snappy.rs b/beacon_node/eth2-libp2p/src/rpc/codec/ssz_snappy.rs index 3a190ed5d..5056cbc6b 100644 --- a/beacon_node/eth2-libp2p/src/rpc/codec/ssz_snappy.rs +++ b/beacon_node/eth2-libp2p/src/rpc/codec/ssz_snappy.rs @@ -8,6 +8,7 @@ use libp2p::bytes::BytesMut; use snap::read::FrameDecoder; use snap::write::FrameEncoder; use ssz::{Decode, Encode}; +use ssz_types::VariableList; use std::io::Cursor; use std::io::ErrorKind; use std::io::{Read, Write}; @@ -60,9 +61,9 @@ impl Encoder> for SSZSnappyInboundCodec< RPCResponse::Pong(res) => res.data.as_ssz_bytes(), RPCResponse::MetaData(res) => res.as_ssz_bytes(), }, - RPCCodedResponse::InvalidRequest(err) => err.into_bytes().as_ssz_bytes(), - RPCCodedResponse::ServerError(err) => err.into_bytes().as_ssz_bytes(), - RPCCodedResponse::Unknown(err) => err.into_bytes().as_ssz_bytes(), + RPCCodedResponse::InvalidRequest(err) => err.as_ssz_bytes(), + RPCCodedResponse::ServerError(err) => err.as_ssz_bytes(), + RPCCodedResponse::Unknown(err) => err.as_ssz_bytes(), RPCCodedResponse::StreamTermination(_) => { unreachable!("Code error - attempting to encode a stream termination") } @@ -138,7 +139,7 @@ impl Decoder for SSZSnappyInboundCodec { }, Protocol::BlocksByRoot => match self.protocol.version { Version::V1 => Ok(Some(RPCRequest::BlocksByRoot(BlocksByRootRequest { - block_roots: Vec::from_ssz_bytes(&decoded_buffer)?, + block_roots: VariableList::from_ssz_bytes(&decoded_buffer)?, }))), }, Protocol::Ping => match self.protocol.version { diff --git a/beacon_node/eth2-libp2p/src/rpc/handler.rs b/beacon_node/eth2-libp2p/src/rpc/handler.rs index e60ee28c1..dc111f15c 100644 --- a/beacon_node/eth2-libp2p/src/rpc/handler.rs +++ b/beacon_node/eth2-libp2p/src/rpc/handler.rs @@ -455,7 +455,7 @@ where let err = HandlerErr::Inbound { id: inbound_id, proto: *protocol, - error: RPCError::ErrorResponse(*code, reason.clone()), + error: RPCError::ErrorResponse(*code, reason.to_string()), }; self.pending_errors.push(err); } @@ -917,7 +917,7 @@ where Err(HandlerErr::Outbound { id, proto, - error: RPCError::ErrorResponse(code, r.clone()), + error: RPCError::ErrorResponse(code, r.to_string()), }) } }; diff --git a/beacon_node/eth2-libp2p/src/rpc/methods.rs b/beacon_node/eth2-libp2p/src/rpc/methods.rs index 167198fa8..7a30d6fc4 100644 --- a/beacon_node/eth2-libp2p/src/rpc/methods.rs +++ b/beacon_node/eth2-libp2p/src/rpc/methods.rs @@ -3,8 +3,52 @@ use crate::types::EnrBitfield; use serde::Serialize; use ssz_derive::{Decode, Encode}; +use ssz_types::{ + typenum::{U1024, U256}, + VariableList, +}; +use std::ops::Deref; use types::{Epoch, EthSpec, Hash256, SignedBeaconBlock, Slot}; +/// Maximum number of blocks in a single request. +type MaxRequestBlocks = U1024; +pub const MAX_REQUEST_BLOCKS: u64 = 1024; + +/// Maximum length of error message. +type MaxErrorLen = U256; + +/// Wrapper over SSZ List to represent error message in rpc responses. +#[derive(Debug, Clone)] +pub struct ErrorType(VariableList); + +impl From for ErrorType { + fn from(s: String) -> Self { + Self(VariableList::from(s.as_bytes().to_vec())) + } +} + +impl From<&str> for ErrorType { + fn from(s: &str) -> Self { + Self(VariableList::from(s.as_bytes().to_vec())) + } +} + +impl Deref for ErrorType { + type Target = VariableList; + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl ToString for ErrorType { + fn to_string(&self) -> String { + match std::str::from_utf8(self.0.deref()) { + Ok(s) => s.to_string(), + Err(_) => format!("{:?}", self.0.deref()), // Display raw bytes if not a UTF-8 string + } + } +} + /* Request/Response data structures for RPC methods */ /* Requests */ @@ -147,7 +191,7 @@ pub struct BlocksByRangeRequest { #[derive(Clone, Debug, PartialEq)] pub struct BlocksByRootRequest { /// The list of beacon block bodies being requested. - pub block_roots: Vec, + pub block_roots: VariableList, } /* RPC Handling and Grouping */ @@ -190,13 +234,13 @@ pub enum RPCCodedResponse { Success(RPCResponse), /// The response was invalid. - InvalidRequest(String), + InvalidRequest(ErrorType), /// The response indicates a server error. - ServerError(String), + ServerError(ErrorType), /// There was an unknown response. - Unknown(String), + Unknown(ErrorType), /// Received a stream termination indicating which response is being terminated. StreamTermination(ResponseTermination), @@ -233,18 +277,18 @@ impl RPCCodedResponse { /// Builds an RPCCodedResponse from a response code and an ErrorMessage pub fn from_error(response_code: u8, err: String) -> Self { match response_code { - 1 => RPCCodedResponse::InvalidRequest(err), - 2 => RPCCodedResponse::ServerError(err), - _ => RPCCodedResponse::Unknown(err), + 1 => RPCCodedResponse::InvalidRequest(err.into()), + 2 => RPCCodedResponse::ServerError(err.into()), + _ => RPCCodedResponse::Unknown(err.into()), } } /// Builds an RPCCodedResponse from a response code and an ErrorMessage pub fn from_error_code(response_code: RPCResponseErrorCode, err: String) -> Self { match response_code { - RPCResponseErrorCode::InvalidRequest => RPCCodedResponse::InvalidRequest(err), - RPCResponseErrorCode::ServerError => RPCCodedResponse::ServerError(err), - RPCResponseErrorCode::Unknown => RPCCodedResponse::Unknown(err), + RPCResponseErrorCode::InvalidRequest => RPCCodedResponse::InvalidRequest(err.into()), + RPCResponseErrorCode::ServerError => RPCCodedResponse::ServerError(err.into()), + RPCResponseErrorCode::Unknown => RPCCodedResponse::Unknown(err.into()), } } diff --git a/beacon_node/eth2-libp2p/src/rpc/mod.rs b/beacon_node/eth2-libp2p/src/rpc/mod.rs index 1af2389d7..56ef47f5a 100644 --- a/beacon_node/eth2-libp2p/src/rpc/mod.rs +++ b/beacon_node/eth2-libp2p/src/rpc/mod.rs @@ -24,7 +24,7 @@ pub(crate) use protocol::{RPCProtocol, RPCRequest}; pub use handler::SubstreamId; pub use methods::{ BlocksByRangeRequest, BlocksByRootRequest, GoodbyeReason, RPCResponseErrorCode, RequestId, - ResponseTermination, StatusMessage, + ResponseTermination, StatusMessage, MAX_REQUEST_BLOCKS, }; pub use protocol::{Protocol, RPCError}; diff --git a/beacon_node/eth2-libp2p/tests/rpc_tests.rs b/beacon_node/eth2-libp2p/tests/rpc_tests.rs index 7218a64c0..2c96073c5 100644 --- a/beacon_node/eth2-libp2p/tests/rpc_tests.rs +++ b/beacon_node/eth2-libp2p/tests/rpc_tests.rs @@ -2,6 +2,7 @@ use eth2_libp2p::rpc::methods::*; use eth2_libp2p::{BehaviourEvent, Libp2pEvent, Request, Response}; use slog::{debug, warn, Level}; +use ssz_types::VariableList; use std::time::Duration; use tokio::time::delay_for; use types::{ @@ -467,11 +468,11 @@ async fn test_blocks_by_root_chunked_rpc() { // BlocksByRoot Request let rpc_request = Request::BlocksByRoot(BlocksByRootRequest { - block_roots: vec![ + block_roots: VariableList::from(vec![ Hash256::from_low_u64_be(0), Hash256::from_low_u64_be(0), Hash256::from_low_u64_be(0), - ], + ]), }); // BlocksByRoot Response @@ -579,7 +580,7 @@ async fn test_blocks_by_root_chunked_rpc_terminates_correctly() { // BlocksByRoot Request let rpc_request = Request::BlocksByRoot(BlocksByRootRequest { - block_roots: vec![ + block_roots: VariableList::from(vec![ Hash256::from_low_u64_be(0), Hash256::from_low_u64_be(0), Hash256::from_low_u64_be(0), @@ -590,7 +591,7 @@ async fn test_blocks_by_root_chunked_rpc_terminates_correctly() { Hash256::from_low_u64_be(0), Hash256::from_low_u64_be(0), Hash256::from_low_u64_be(0), - ], + ]), }); // BlocksByRoot Response diff --git a/beacon_node/network/Cargo.toml b/beacon_node/network/Cargo.toml index e9181e88b..bf6358ebc 100644 --- a/beacon_node/network/Cargo.toml +++ b/beacon_node/network/Cargo.toml @@ -24,6 +24,7 @@ slot_clock = { path = "../../common/slot_clock" } slog = { version = "2.5.2", features = ["max_level_trace"] } hex = "0.4.2" eth2_ssz = "0.1.2" +eth2_ssz_types = { path = "../../consensus/ssz_types" } tree_hash = "0.1.0" futures = "0.3.5" error-chain = "0.12.2" diff --git a/beacon_node/network/src/router/processor.rs b/beacon_node/network/src/router/processor.rs index 1fd6927c5..475ff824d 100644 --- a/beacon_node/network/src/router/processor.rs +++ b/beacon_node/network/src/router/processor.rs @@ -322,7 +322,7 @@ impl Processor { &mut self, peer_id: PeerId, request_id: SubstreamId, - req: BlocksByRangeRequest, + mut req: BlocksByRangeRequest, ) { debug!( self.log, @@ -333,6 +333,10 @@ impl Processor { "step" => req.step, ); + // Should not send more than max request blocks + if req.count > MAX_REQUEST_BLOCKS { + req.count = MAX_REQUEST_BLOCKS; + } if req.step == 0 { warn!(self.log, "Peer sent invalid range request"; diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index e5c035b7f..111eecdeb 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -36,16 +36,17 @@ use super::block_processor::{spawn_block_processor, BatchProcessResult, ProcessId}; use super::network_context::SyncNetworkContext; use super::peer_sync_info::{PeerSyncInfo, PeerSyncType}; -use super::range_sync::{BatchId, ChainId, RangeSync}; +use super::range_sync::{BatchId, ChainId, RangeSync, EPOCHS_PER_BATCH}; use super::RequestId; use crate::service::NetworkMessage; use beacon_chain::{BeaconChain, BeaconChainTypes, BlockProcessingOutcome}; -use eth2_libp2p::rpc::BlocksByRootRequest; +use eth2_libp2p::rpc::{methods::MAX_REQUEST_BLOCKS, BlocksByRootRequest}; use eth2_libp2p::types::NetworkGlobals; use eth2_libp2p::PeerId; use fnv::FnvHashMap; use slog::{crit, debug, error, info, trace, warn, Logger}; use smallvec::SmallVec; +use ssz_types::VariableList; use std::boxed::Box; use std::ops::Sub; use std::sync::Arc; @@ -188,6 +189,10 @@ pub fn spawn( network_send: mpsc::UnboundedSender>, log: slog::Logger, ) -> mpsc::UnboundedSender> { + assert!( + MAX_REQUEST_BLOCKS >= T::EthSpec::slots_per_epoch() * EPOCHS_PER_BATCH, + "Max blocks that can be requested in a single batch greater than max allowed blocks in a single request" + ); // generate the message channel let (sync_send, sync_recv) = mpsc::unbounded_channel::>(); @@ -489,7 +494,7 @@ impl SyncManager { } let request = BlocksByRootRequest { - block_roots: vec![block_hash], + block_roots: VariableList::from(vec![block_hash]), }; if let Ok(request_id) = self.network.blocks_by_root_request(peer_id, request) { @@ -707,7 +712,7 @@ impl SyncManager { }; let request = BlocksByRootRequest { - block_roots: vec![parent_hash], + block_roots: VariableList::from(vec![parent_hash]), }; // We continue to search for the chain of blocks from the same peer. Other peers are not diff --git a/beacon_node/network/src/sync/range_sync/batch.rs b/beacon_node/network/src/sync/range_sync/batch.rs index 684c75126..5b3e2b581 100644 --- a/beacon_node/network/src/sync/range_sync/batch.rs +++ b/beacon_node/network/src/sync/range_sync/batch.rs @@ -3,6 +3,7 @@ use eth2_libp2p::rpc::methods::*; use eth2_libp2p::PeerId; use fnv::FnvHashMap; use ssz::Encode; +use std::cmp::min; use std::cmp::Ordering; use std::collections::hash_map::Entry; use std::collections::{HashMap, HashSet}; @@ -75,7 +76,7 @@ impl Batch { pub fn to_blocks_by_range_request(&self) -> BlocksByRangeRequest { BlocksByRangeRequest { start_slot: self.start_slot.into(), - count: std::cmp::min( + count: min( T::slots_per_epoch() * EPOCHS_PER_BATCH, self.end_slot.sub(self.start_slot).into(), ), diff --git a/beacon_node/network/src/sync/range_sync/mod.rs b/beacon_node/network/src/sync/range_sync/mod.rs index 77eb17f15..b87829559 100644 --- a/beacon_node/network/src/sync/range_sync/mod.rs +++ b/beacon_node/network/src/sync/range_sync/mod.rs @@ -9,5 +9,5 @@ mod sync_type; pub use batch::Batch; pub use batch::BatchId; -pub use chain::ChainId; +pub use chain::{ChainId, EPOCHS_PER_BATCH}; pub use range::RangeSync;