Use SSZ types in rpc (#1244)

* Update `milagro_bls` to new release (#1183)

* Update milagro_bls to new release

Signed-off-by: Kirk Baird <baird.k@outlook.com>

* Tidy up fake cryptos

Signed-off-by: Kirk Baird <baird.k@outlook.com>

* move SecretHash to bls and put plaintext back

Signed-off-by: Kirk Baird <baird.k@outlook.com>

* Update v0.12.0 to v0.12.1

* Use ssz types for Request and error types

* Fix errors

* Constrain BlocksByRangeRequest count to MAX_REQUEST_BLOCKS

* Fix issues after rebasing

* Address review comments

Co-authored-by: Kirk Baird <baird.k@outlook.com>
Co-authored-by: Michael Sproul <michael@sigmaprime.io>
Co-authored-by: Age Manning <Age@AgeManning.com>
This commit is contained in:
Pawan Dhananjay 2020-06-12 05:34:50 +05:30 committed by GitHub
parent 39bf05e3e5
commit bb8b88edcf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 91 additions and 32 deletions

1
Cargo.lock generated
View File

@ -2990,6 +2990,7 @@ dependencies = [
"error-chain", "error-chain",
"eth2-libp2p", "eth2-libp2p",
"eth2_ssz", "eth2_ssz",
"eth2_ssz_types",
"exit-future", "exit-future",
"fnv", "fnv",
"futures 0.3.5", "futures 0.3.5",

View File

@ -6,6 +6,7 @@ use crate::rpc::{
use crate::rpc::{RPCCodedResponse, RPCRequest, RPCResponse}; use crate::rpc::{RPCCodedResponse, RPCRequest, RPCResponse};
use libp2p::bytes::{BufMut, Bytes, BytesMut}; use libp2p::bytes::{BufMut, Bytes, BytesMut};
use ssz::{Decode, Encode}; use ssz::{Decode, Encode};
use ssz_types::VariableList;
use std::marker::PhantomData; use std::marker::PhantomData;
use tokio_util::codec::{Decoder, Encoder}; use tokio_util::codec::{Decoder, Encoder};
use types::{EthSpec, SignedBeaconBlock}; use types::{EthSpec, SignedBeaconBlock};
@ -52,9 +53,9 @@ impl<TSpec: EthSpec> Encoder<RPCCodedResponse<TSpec>> for SSZInboundCodec<TSpec>
RPCResponse::Pong(res) => res.data.as_ssz_bytes(), RPCResponse::Pong(res) => res.data.as_ssz_bytes(),
RPCResponse::MetaData(res) => res.as_ssz_bytes(), RPCResponse::MetaData(res) => res.as_ssz_bytes(),
}, },
RPCCodedResponse::InvalidRequest(err) => err.into_bytes().as_ssz_bytes(), RPCCodedResponse::InvalidRequest(err) => err.as_ssz_bytes(),
RPCCodedResponse::ServerError(err) => err.into_bytes().as_ssz_bytes(), RPCCodedResponse::ServerError(err) => err.as_ssz_bytes(),
RPCCodedResponse::Unknown(err) => err.into_bytes().as_ssz_bytes(), RPCCodedResponse::Unknown(err) => err.as_ssz_bytes(),
RPCCodedResponse::StreamTermination(_) => { RPCCodedResponse::StreamTermination(_) => {
unreachable!("Code error - attempting to encode a stream termination") unreachable!("Code error - attempting to encode a stream termination")
} }
@ -99,7 +100,7 @@ impl<TSpec: EthSpec> Decoder for SSZInboundCodec<TSpec> {
}, },
Protocol::BlocksByRoot => match self.protocol.version { Protocol::BlocksByRoot => match self.protocol.version {
Version::V1 => Ok(Some(RPCRequest::BlocksByRoot(BlocksByRootRequest { Version::V1 => Ok(Some(RPCRequest::BlocksByRoot(BlocksByRootRequest {
block_roots: Vec::from_ssz_bytes(&packet)?, block_roots: VariableList::from_ssz_bytes(&packet)?,
}))), }))),
}, },
Protocol::Ping => match self.protocol.version { Protocol::Ping => match self.protocol.version {

View File

@ -8,6 +8,7 @@ use libp2p::bytes::BytesMut;
use snap::read::FrameDecoder; use snap::read::FrameDecoder;
use snap::write::FrameEncoder; use snap::write::FrameEncoder;
use ssz::{Decode, Encode}; use ssz::{Decode, Encode};
use ssz_types::VariableList;
use std::io::Cursor; use std::io::Cursor;
use std::io::ErrorKind; use std::io::ErrorKind;
use std::io::{Read, Write}; use std::io::{Read, Write};
@ -60,9 +61,9 @@ impl<TSpec: EthSpec> Encoder<RPCCodedResponse<TSpec>> for SSZSnappyInboundCodec<
RPCResponse::Pong(res) => res.data.as_ssz_bytes(), RPCResponse::Pong(res) => res.data.as_ssz_bytes(),
RPCResponse::MetaData(res) => res.as_ssz_bytes(), RPCResponse::MetaData(res) => res.as_ssz_bytes(),
}, },
RPCCodedResponse::InvalidRequest(err) => err.into_bytes().as_ssz_bytes(), RPCCodedResponse::InvalidRequest(err) => err.as_ssz_bytes(),
RPCCodedResponse::ServerError(err) => err.into_bytes().as_ssz_bytes(), RPCCodedResponse::ServerError(err) => err.as_ssz_bytes(),
RPCCodedResponse::Unknown(err) => err.into_bytes().as_ssz_bytes(), RPCCodedResponse::Unknown(err) => err.as_ssz_bytes(),
RPCCodedResponse::StreamTermination(_) => { RPCCodedResponse::StreamTermination(_) => {
unreachable!("Code error - attempting to encode a stream termination") unreachable!("Code error - attempting to encode a stream termination")
} }
@ -138,7 +139,7 @@ impl<TSpec: EthSpec> Decoder for SSZSnappyInboundCodec<TSpec> {
}, },
Protocol::BlocksByRoot => match self.protocol.version { Protocol::BlocksByRoot => match self.protocol.version {
Version::V1 => Ok(Some(RPCRequest::BlocksByRoot(BlocksByRootRequest { Version::V1 => Ok(Some(RPCRequest::BlocksByRoot(BlocksByRootRequest {
block_roots: Vec::from_ssz_bytes(&decoded_buffer)?, block_roots: VariableList::from_ssz_bytes(&decoded_buffer)?,
}))), }))),
}, },
Protocol::Ping => match self.protocol.version { Protocol::Ping => match self.protocol.version {

View File

@ -455,7 +455,7 @@ where
let err = HandlerErr::Inbound { let err = HandlerErr::Inbound {
id: inbound_id, id: inbound_id,
proto: *protocol, proto: *protocol,
error: RPCError::ErrorResponse(*code, reason.clone()), error: RPCError::ErrorResponse(*code, reason.to_string()),
}; };
self.pending_errors.push(err); self.pending_errors.push(err);
} }
@ -917,7 +917,7 @@ where
Err(HandlerErr::Outbound { Err(HandlerErr::Outbound {
id, id,
proto, proto,
error: RPCError::ErrorResponse(code, r.clone()), error: RPCError::ErrorResponse(code, r.to_string()),
}) })
} }
}; };

View File

@ -3,8 +3,52 @@
use crate::types::EnrBitfield; use crate::types::EnrBitfield;
use serde::Serialize; use serde::Serialize;
use ssz_derive::{Decode, Encode}; use ssz_derive::{Decode, Encode};
use ssz_types::{
typenum::{U1024, U256},
VariableList,
};
use std::ops::Deref;
use types::{Epoch, EthSpec, Hash256, SignedBeaconBlock, Slot}; use types::{Epoch, EthSpec, Hash256, SignedBeaconBlock, Slot};
/// Maximum number of blocks in a single request.
type MaxRequestBlocks = U1024;
pub const MAX_REQUEST_BLOCKS: u64 = 1024;
/// Maximum length of error message.
type MaxErrorLen = U256;
/// Wrapper over SSZ List to represent error message in rpc responses.
#[derive(Debug, Clone)]
pub struct ErrorType(VariableList<u8, MaxErrorLen>);
impl From<String> for ErrorType {
fn from(s: String) -> Self {
Self(VariableList::from(s.as_bytes().to_vec()))
}
}
impl From<&str> for ErrorType {
fn from(s: &str) -> Self {
Self(VariableList::from(s.as_bytes().to_vec()))
}
}
impl Deref for ErrorType {
type Target = VariableList<u8, MaxErrorLen>;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl ToString for ErrorType {
fn to_string(&self) -> String {
match std::str::from_utf8(self.0.deref()) {
Ok(s) => s.to_string(),
Err(_) => format!("{:?}", self.0.deref()), // Display raw bytes if not a UTF-8 string
}
}
}
/* Request/Response data structures for RPC methods */ /* Request/Response data structures for RPC methods */
/* Requests */ /* Requests */
@ -147,7 +191,7 @@ pub struct BlocksByRangeRequest {
#[derive(Clone, Debug, PartialEq)] #[derive(Clone, Debug, PartialEq)]
pub struct BlocksByRootRequest { pub struct BlocksByRootRequest {
/// The list of beacon block bodies being requested. /// The list of beacon block bodies being requested.
pub block_roots: Vec<Hash256>, pub block_roots: VariableList<Hash256, MaxRequestBlocks>,
} }
/* RPC Handling and Grouping */ /* RPC Handling and Grouping */
@ -190,13 +234,13 @@ pub enum RPCCodedResponse<T: EthSpec> {
Success(RPCResponse<T>), Success(RPCResponse<T>),
/// The response was invalid. /// The response was invalid.
InvalidRequest(String), InvalidRequest(ErrorType),
/// The response indicates a server error. /// The response indicates a server error.
ServerError(String), ServerError(ErrorType),
/// There was an unknown response. /// There was an unknown response.
Unknown(String), Unknown(ErrorType),
/// Received a stream termination indicating which response is being terminated. /// Received a stream termination indicating which response is being terminated.
StreamTermination(ResponseTermination), StreamTermination(ResponseTermination),
@ -233,18 +277,18 @@ impl<T: EthSpec> RPCCodedResponse<T> {
/// Builds an RPCCodedResponse from a response code and an ErrorMessage /// Builds an RPCCodedResponse from a response code and an ErrorMessage
pub fn from_error(response_code: u8, err: String) -> Self { pub fn from_error(response_code: u8, err: String) -> Self {
match response_code { match response_code {
1 => RPCCodedResponse::InvalidRequest(err), 1 => RPCCodedResponse::InvalidRequest(err.into()),
2 => RPCCodedResponse::ServerError(err), 2 => RPCCodedResponse::ServerError(err.into()),
_ => RPCCodedResponse::Unknown(err), _ => RPCCodedResponse::Unknown(err.into()),
} }
} }
/// Builds an RPCCodedResponse from a response code and an ErrorMessage /// Builds an RPCCodedResponse from a response code and an ErrorMessage
pub fn from_error_code(response_code: RPCResponseErrorCode, err: String) -> Self { pub fn from_error_code(response_code: RPCResponseErrorCode, err: String) -> Self {
match response_code { match response_code {
RPCResponseErrorCode::InvalidRequest => RPCCodedResponse::InvalidRequest(err), RPCResponseErrorCode::InvalidRequest => RPCCodedResponse::InvalidRequest(err.into()),
RPCResponseErrorCode::ServerError => RPCCodedResponse::ServerError(err), RPCResponseErrorCode::ServerError => RPCCodedResponse::ServerError(err.into()),
RPCResponseErrorCode::Unknown => RPCCodedResponse::Unknown(err), RPCResponseErrorCode::Unknown => RPCCodedResponse::Unknown(err.into()),
} }
} }

View File

@ -24,7 +24,7 @@ pub(crate) use protocol::{RPCProtocol, RPCRequest};
pub use handler::SubstreamId; pub use handler::SubstreamId;
pub use methods::{ pub use methods::{
BlocksByRangeRequest, BlocksByRootRequest, GoodbyeReason, RPCResponseErrorCode, RequestId, BlocksByRangeRequest, BlocksByRootRequest, GoodbyeReason, RPCResponseErrorCode, RequestId,
ResponseTermination, StatusMessage, ResponseTermination, StatusMessage, MAX_REQUEST_BLOCKS,
}; };
pub use protocol::{Protocol, RPCError}; pub use protocol::{Protocol, RPCError};

View File

@ -2,6 +2,7 @@
use eth2_libp2p::rpc::methods::*; use eth2_libp2p::rpc::methods::*;
use eth2_libp2p::{BehaviourEvent, Libp2pEvent, Request, Response}; use eth2_libp2p::{BehaviourEvent, Libp2pEvent, Request, Response};
use slog::{debug, warn, Level}; use slog::{debug, warn, Level};
use ssz_types::VariableList;
use std::time::Duration; use std::time::Duration;
use tokio::time::delay_for; use tokio::time::delay_for;
use types::{ use types::{
@ -467,11 +468,11 @@ async fn test_blocks_by_root_chunked_rpc() {
// BlocksByRoot Request // BlocksByRoot Request
let rpc_request = Request::BlocksByRoot(BlocksByRootRequest { let rpc_request = Request::BlocksByRoot(BlocksByRootRequest {
block_roots: vec![ block_roots: VariableList::from(vec![
Hash256::from_low_u64_be(0), Hash256::from_low_u64_be(0),
Hash256::from_low_u64_be(0), Hash256::from_low_u64_be(0),
Hash256::from_low_u64_be(0), Hash256::from_low_u64_be(0),
], ]),
}); });
// BlocksByRoot Response // BlocksByRoot Response
@ -579,7 +580,7 @@ async fn test_blocks_by_root_chunked_rpc_terminates_correctly() {
// BlocksByRoot Request // BlocksByRoot Request
let rpc_request = Request::BlocksByRoot(BlocksByRootRequest { let rpc_request = Request::BlocksByRoot(BlocksByRootRequest {
block_roots: vec![ block_roots: VariableList::from(vec![
Hash256::from_low_u64_be(0), Hash256::from_low_u64_be(0),
Hash256::from_low_u64_be(0), Hash256::from_low_u64_be(0),
Hash256::from_low_u64_be(0), Hash256::from_low_u64_be(0),
@ -590,7 +591,7 @@ async fn test_blocks_by_root_chunked_rpc_terminates_correctly() {
Hash256::from_low_u64_be(0), Hash256::from_low_u64_be(0),
Hash256::from_low_u64_be(0), Hash256::from_low_u64_be(0),
Hash256::from_low_u64_be(0), Hash256::from_low_u64_be(0),
], ]),
}); });
// BlocksByRoot Response // BlocksByRoot Response

View File

@ -24,6 +24,7 @@ slot_clock = { path = "../../common/slot_clock" }
slog = { version = "2.5.2", features = ["max_level_trace"] } slog = { version = "2.5.2", features = ["max_level_trace"] }
hex = "0.4.2" hex = "0.4.2"
eth2_ssz = "0.1.2" eth2_ssz = "0.1.2"
eth2_ssz_types = { path = "../../consensus/ssz_types" }
tree_hash = "0.1.0" tree_hash = "0.1.0"
futures = "0.3.5" futures = "0.3.5"
error-chain = "0.12.2" error-chain = "0.12.2"

View File

@ -322,7 +322,7 @@ impl<T: BeaconChainTypes> Processor<T> {
&mut self, &mut self,
peer_id: PeerId, peer_id: PeerId,
request_id: SubstreamId, request_id: SubstreamId,
req: BlocksByRangeRequest, mut req: BlocksByRangeRequest,
) { ) {
debug!( debug!(
self.log, self.log,
@ -333,6 +333,10 @@ impl<T: BeaconChainTypes> Processor<T> {
"step" => req.step, "step" => req.step,
); );
// Should not send more than max request blocks
if req.count > MAX_REQUEST_BLOCKS {
req.count = MAX_REQUEST_BLOCKS;
}
if req.step == 0 { if req.step == 0 {
warn!(self.log, warn!(self.log,
"Peer sent invalid range request"; "Peer sent invalid range request";

View File

@ -36,16 +36,17 @@
use super::block_processor::{spawn_block_processor, BatchProcessResult, ProcessId}; use super::block_processor::{spawn_block_processor, BatchProcessResult, ProcessId};
use super::network_context::SyncNetworkContext; use super::network_context::SyncNetworkContext;
use super::peer_sync_info::{PeerSyncInfo, PeerSyncType}; use super::peer_sync_info::{PeerSyncInfo, PeerSyncType};
use super::range_sync::{BatchId, ChainId, RangeSync}; use super::range_sync::{BatchId, ChainId, RangeSync, EPOCHS_PER_BATCH};
use super::RequestId; use super::RequestId;
use crate::service::NetworkMessage; use crate::service::NetworkMessage;
use beacon_chain::{BeaconChain, BeaconChainTypes, BlockProcessingOutcome}; use beacon_chain::{BeaconChain, BeaconChainTypes, BlockProcessingOutcome};
use eth2_libp2p::rpc::BlocksByRootRequest; use eth2_libp2p::rpc::{methods::MAX_REQUEST_BLOCKS, BlocksByRootRequest};
use eth2_libp2p::types::NetworkGlobals; use eth2_libp2p::types::NetworkGlobals;
use eth2_libp2p::PeerId; use eth2_libp2p::PeerId;
use fnv::FnvHashMap; use fnv::FnvHashMap;
use slog::{crit, debug, error, info, trace, warn, Logger}; use slog::{crit, debug, error, info, trace, warn, Logger};
use smallvec::SmallVec; use smallvec::SmallVec;
use ssz_types::VariableList;
use std::boxed::Box; use std::boxed::Box;
use std::ops::Sub; use std::ops::Sub;
use std::sync::Arc; use std::sync::Arc;
@ -188,6 +189,10 @@ pub fn spawn<T: BeaconChainTypes>(
network_send: mpsc::UnboundedSender<NetworkMessage<T::EthSpec>>, network_send: mpsc::UnboundedSender<NetworkMessage<T::EthSpec>>,
log: slog::Logger, log: slog::Logger,
) -> mpsc::UnboundedSender<SyncMessage<T::EthSpec>> { ) -> mpsc::UnboundedSender<SyncMessage<T::EthSpec>> {
assert!(
MAX_REQUEST_BLOCKS >= T::EthSpec::slots_per_epoch() * EPOCHS_PER_BATCH,
"Max blocks that can be requested in a single batch greater than max allowed blocks in a single request"
);
// generate the message channel // generate the message channel
let (sync_send, sync_recv) = mpsc::unbounded_channel::<SyncMessage<T::EthSpec>>(); let (sync_send, sync_recv) = mpsc::unbounded_channel::<SyncMessage<T::EthSpec>>();
@ -489,7 +494,7 @@ impl<T: BeaconChainTypes> SyncManager<T> {
} }
let request = BlocksByRootRequest { let request = BlocksByRootRequest {
block_roots: vec![block_hash], block_roots: VariableList::from(vec![block_hash]),
}; };
if let Ok(request_id) = self.network.blocks_by_root_request(peer_id, request) { if let Ok(request_id) = self.network.blocks_by_root_request(peer_id, request) {
@ -707,7 +712,7 @@ impl<T: BeaconChainTypes> SyncManager<T> {
}; };
let request = BlocksByRootRequest { let request = BlocksByRootRequest {
block_roots: vec![parent_hash], block_roots: VariableList::from(vec![parent_hash]),
}; };
// We continue to search for the chain of blocks from the same peer. Other peers are not // We continue to search for the chain of blocks from the same peer. Other peers are not

View File

@ -3,6 +3,7 @@ use eth2_libp2p::rpc::methods::*;
use eth2_libp2p::PeerId; use eth2_libp2p::PeerId;
use fnv::FnvHashMap; use fnv::FnvHashMap;
use ssz::Encode; use ssz::Encode;
use std::cmp::min;
use std::cmp::Ordering; use std::cmp::Ordering;
use std::collections::hash_map::Entry; use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet}; use std::collections::{HashMap, HashSet};
@ -75,7 +76,7 @@ impl<T: EthSpec> Batch<T> {
pub fn to_blocks_by_range_request(&self) -> BlocksByRangeRequest { pub fn to_blocks_by_range_request(&self) -> BlocksByRangeRequest {
BlocksByRangeRequest { BlocksByRangeRequest {
start_slot: self.start_slot.into(), start_slot: self.start_slot.into(),
count: std::cmp::min( count: min(
T::slots_per_epoch() * EPOCHS_PER_BATCH, T::slots_per_epoch() * EPOCHS_PER_BATCH,
self.end_slot.sub(self.start_slot).into(), self.end_slot.sub(self.start_slot).into(),
), ),

View File

@ -9,5 +9,5 @@ mod sync_type;
pub use batch::Batch; pub use batch::Batch;
pub use batch::BatchId; pub use batch::BatchId;
pub use chain::ChainId; pub use chain::{ChainId, EPOCHS_PER_BATCH};
pub use range::RangeSync; pub use range::RangeSync;