diff --git a/.github/workflows/test-suite.yml b/.github/workflows/test-suite.yml index 35692aa23..282931a90 100644 --- a/.github/workflows/test-suite.yml +++ b/.github/workflows/test-suite.yml @@ -16,6 +16,8 @@ jobs: needs: cargo-fmt steps: - uses: actions/checkout@v1 + - name: Get latest version of stable Rust + run: rustup update stable - name: Install ganache-cli run: sudo npm install -g ganache-cli - name: Run tests in release @@ -25,6 +27,8 @@ jobs: needs: cargo-fmt steps: - uses: actions/checkout@v1 + - name: Get latest version of stable Rust + run: rustup update stable - name: Install ganache-cli run: sudo npm install -g ganache-cli - name: Run tests in debug @@ -34,6 +38,8 @@ jobs: needs: cargo-fmt steps: - uses: actions/checkout@v1 + - name: Get latest version of stable Rust + run: rustup update stable - name: Run eth2.0-spec-tests with and without fake_crypto run: make test-ef dockerfile-ubuntu: diff --git a/beacon_node/eth2-libp2p/Cargo.toml b/beacon_node/eth2-libp2p/Cargo.toml index 5e1a92f24..307f9fa46 100644 --- a/beacon_node/eth2-libp2p/Cargo.toml +++ b/beacon_node/eth2-libp2p/Cargo.toml @@ -9,8 +9,8 @@ clap = "2.33.0" hex = "0.3" # rust-libp2p is presently being sourced from a Sigma Prime fork of the # `libp2p/rust-libp2p` repository. 
-libp2p = { git = "https://github.com/sigp/rust-libp2p", rev = "2a9ded92db30dab7d3530c597a0a3b3458a7dfb7" } -enr = { git = "https://github.com/sigp/rust-libp2p/", rev = "2a9ded92db30dab7d3530c597a0a3b3458a7dfb7", features = ["serde"] } +libp2p = { git = "https://github.com/SigP/rust-libp2p", rev = "b13ec466ce1661d88ea95be7e1fcd7bfdfa22ca8" } +enr = { git = "https://github.com/SigP/rust-libp2p/", rev = "b13ec466ce1661d88ea95be7e1fcd7bfdfa22ca8", features = ["serde"] } types = { path = "../../eth2/types" } serde = "1.0.102" serde_derive = "1.0.102" @@ -21,13 +21,15 @@ version = { path = "../version" } tokio = "0.1.22" futures = "0.1.29" error-chain = "0.12.1" -tokio-timer = "0.2.11" dirs = "2.0.2" -tokio-io = "0.1.12" smallvec = "0.6.11" fnv = "1.0.6" unsigned-varint = "0.2.3" -bytes = "0.4.12" -tokio-io-timeout = "0.3.1" lazy_static = "1.4.0" lighthouse_metrics = { path = "../../eth2/utils/lighthouse_metrics" } +tokio-io-timeout = "0.3.1" + +[dev-dependencies] +slog-stdlog = "4.0.0" +slog-term = "2.4.2" +slog-async = "2.3.0" diff --git a/beacon_node/eth2-libp2p/src/behaviour.rs b/beacon_node/eth2-libp2p/src/behaviour.rs index 6a97ac9e4..0b5118199 100644 --- a/beacon_node/eth2-libp2p/src/behaviour.rs +++ b/beacon_node/eth2-libp2p/src/behaviour.rs @@ -82,6 +82,10 @@ impl Behaviour { pub fn discovery(&self) -> &Discovery { &self.discovery } + + pub fn gs(&self) -> &Gossipsub { + &self.gossipsub + } } // Implement the NetworkBehaviourEventProcess trait so that we can derive NetworkBehaviour for Behaviour @@ -103,7 +107,10 @@ impl NetworkBehaviourEventProcess {} + GossipsubEvent::Subscribed { peer_id, topic } => { + self.events + .push(BehaviourEvent::PeerSubscribed(peer_id, topic)); + } GossipsubEvent::Unsubscribed { .. } => {} } } @@ -196,6 +203,11 @@ impl Behaviour { self.gossipsub.subscribe(topic) } + /// Unsubscribe from a gossipsub topic. 
+ pub fn unsubscribe(&mut self, topic: Topic) -> bool { + self.gossipsub.unsubscribe(topic) + } + /// Publishes a message on the pubsub (gossipsub) behaviour. pub fn publish(&mut self, topics: &[Topic], message: PubsubMessage) { let message_data = message.to_data(); @@ -219,10 +231,16 @@ impl Behaviour { } /* Discovery / Peer management functions */ + /// Return the number of currently connected peers. pub fn connected_peers(&self) -> usize { self.discovery.connected_peers() } + /// Notify discovery that the peer has been banned. + pub fn peer_banned(&mut self, peer_id: PeerId) { + self.discovery.peer_banned(peer_id); + } + /// Informs the discovery behaviour if a new IP/Port is set at the application layer pub fn update_local_enr_socket(&mut self, socket: std::net::SocketAddr, is_tcp: bool) { self.discovery.update_local_enr(socket, is_tcp); @@ -248,6 +266,8 @@ pub enum BehaviourEvent { /// The message itself. message: PubsubMessage, }, + /// A peer has subscribed to the given topic. + PeerSubscribed(PeerId, TopicHash), } /// Messages that are passed to and from the pubsub (Gossipsub) behaviour. These are encoded and diff --git a/beacon_node/eth2-libp2p/src/config.rs b/beacon_node/eth2-libp2p/src/config.rs index cacad9c20..39391e5fd 100644 --- a/beacon_node/eth2-libp2p/src/config.rs +++ b/beacon_node/eth2-libp2p/src/config.rs @@ -61,6 +61,11 @@ pub struct Config { /// List of extra topics to initially subscribe to as strings. pub topics: Vec, + + /// Introduces randomization in network propagation of messages. This should only be set for + /// testing purposes and will likely be removed in future versions. 
+ // TODO: Remove this functionality for mainnet + pub propagation_percentage: Option, } impl Default for Config { @@ -88,6 +93,7 @@ impl Default for Config { libp2p_nodes: vec![], client_version: version::version(), topics: Vec::new(), + propagation_percentage: None, } } } diff --git a/beacon_node/eth2-libp2p/src/discovery.rs b/beacon_node/eth2-libp2p/src/discovery.rs index 8ad58300a..ccbc99623 100644 --- a/beacon_node/eth2-libp2p/src/discovery.rs +++ b/beacon_node/eth2-libp2p/src/discovery.rs @@ -18,7 +18,7 @@ use std::path::Path; use std::str::FromStr; use std::time::{Duration, Instant}; use tokio::io::{AsyncRead, AsyncWrite}; -use tokio_timer::Delay; +use tokio::timer::Delay; /// Maximum seconds before searching for extra peers. const MAX_TIME_BETWEEN_PEER_SEARCHES: u64 = 60; @@ -33,6 +33,9 @@ pub struct Discovery { /// The peers currently connected to libp2p streams. connected_peers: HashSet, + /// The currently banned peers. + banned_peers: HashSet, + /// The target number of connected peers on the libp2p interface. max_peers: usize, @@ -77,7 +80,10 @@ impl Discovery { info!(log, "ENR Initialised"; "ENR" => local_enr.to_base64(), "Seq" => local_enr.seq()); debug!(log, "Discv5 Node ID Initialised"; "node_id" => format!("{}",local_enr.node_id())); - let mut discovery = Discv5::new(local_enr, local_key.clone(), config.listen_address) + // the last parameter enables IP limiting. 2 Nodes on the same /24 subnet per bucket and 10 + // nodes on the same /24 subnet per table. + // TODO: IP filtering is currently disabled for the DHT. Enable for production + let mut discovery = Discv5::new(local_enr, local_key.clone(), config.listen_address, false) .map_err(|e| format!("Discv5 service failed. 
Error: {:?}", e))?; // Add bootnodes to routing table @@ -93,6 +99,7 @@ impl Discovery { Ok(Self { connected_peers: HashSet::new(), + banned_peers: HashSet::new(), max_peers: config.max_peers, peer_discovery_delay: Delay::new(Instant::now()), past_discovery_delay: INITIAL_SEARCH_DELAY, @@ -147,6 +154,14 @@ impl Discovery { &self.connected_peers } + /// The peer has been banned. Add this peer to the banned list to prevent any future + /// re-connections. + // TODO: Remove the peer from the DHT if present + // TODO: Implement a timeout, after which we unban the peer + pub fn peer_banned(&mut self, peer_id: PeerId) { + self.banned_peers.insert(peer_id); + } + /// Search for new peers using the underlying discovery mechanism. fn find_peers(&mut self) { // pick a random NodeId @@ -271,6 +286,7 @@ where // if we need more peers, attempt a connection if self.connected_peers.len() < self.max_peers && self.connected_peers.get(&peer_id).is_none() + && !self.banned_peers.contains(&peer_id) { debug!(self.log, "Peer discovered"; "peer_id"=> format!("{:?}", peer_id)); return Async::Ready(NetworkBehaviourAction::DialPeer { diff --git a/beacon_node/eth2-libp2p/src/lib.rs b/beacon_node/eth2-libp2p/src/lib.rs index 4b4935818..a32050bc9 100644 --- a/beacon_node/eth2-libp2p/src/lib.rs +++ b/beacon_node/eth2-libp2p/src/lib.rs @@ -22,6 +22,7 @@ pub use libp2p::enr::Enr; pub use libp2p::gossipsub::{Topic, TopicHash}; pub use libp2p::multiaddr; pub use libp2p::Multiaddr; +pub use libp2p::{core::ConnectedPoint, swarm::NetworkBehaviour}; pub use libp2p::{ gossipsub::{GossipsubConfig, GossipsubConfigBuilder}, PeerId, Swarm, diff --git a/beacon_node/eth2-libp2p/src/rpc/codec/base.rs b/beacon_node/eth2-libp2p/src/rpc/codec/base.rs index 973567473..cc247911f 100644 --- a/beacon_node/eth2-libp2p/src/rpc/codec/base.rs +++ b/beacon_node/eth2-libp2p/src/rpc/codec/base.rs @@ -1,8 +1,8 @@ //! This handles the various supported encoding mechanism for the Eth 2.0 RPC. 
use crate::rpc::{ErrorMessage, RPCErrorResponse, RPCRequest, RPCResponse}; -use bytes::BufMut; -use bytes::BytesMut; +use libp2p::bytes::BufMut; +use libp2p::bytes::BytesMut; use tokio::codec::{Decoder, Encoder}; pub trait OutboundCodec: Encoder + Decoder { @@ -14,6 +14,9 @@ pub trait OutboundCodec: Encoder + Decoder { ) -> Result, ::Error>; } +/* Global Inbound Codec */ +// This deals with Decoding RPC Requests from other peers and encoding our responses + pub struct BaseInboundCodec where TCodec: Encoder + Decoder, @@ -31,15 +34,16 @@ where } } +/* Global Outbound Codec */ +// This deals with Decoding RPC Responses from other peers and encoding our requests pub struct BaseOutboundCodec where TOutboundCodec: OutboundCodec, { - /// Inner codec for handling various encodings + /// Inner codec for handling various encodings. inner: TOutboundCodec, - /// Optimisation for decoding. True if the response code has been read and we are awaiting a - /// response. - response_code: Option, + /// Keeps track of the current response code for a chunk. 
+ current_response_code: Option, } impl BaseOutboundCodec @@ -49,11 +53,16 @@ where pub fn new(codec: TOutboundCodec) -> Self { BaseOutboundCodec { inner: codec, - response_code: None, + current_response_code: None, } } } +/* Implementation of the Encoding/Decoding for the global codecs */ + +/* Base Inbound Codec */ + +// This Encodes RPC Responses sent to external peers impl Encoder for BaseInboundCodec where TCodec: Decoder + Encoder, @@ -64,11 +73,15 @@ where fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> { dst.clear(); dst.reserve(1); - dst.put_u8(item.as_u8()); + dst.put_u8( + item.as_u8() + .expect("Should never encode a stream termination"), + ); self.inner.encode(item, dst) } } +// This Decodes RPC Requests from external peers impl Decoder for BaseInboundCodec where TCodec: Encoder + Decoder, @@ -81,6 +94,9 @@ where } } +/* Base Outbound Codec */ + +// This Encodes RPC Requests sent to external peers impl Encoder for BaseOutboundCodec where TCodec: OutboundCodec + Encoder, @@ -93,6 +109,7 @@ where } } +// This decodes RPC Responses received from external peers impl Decoder for BaseOutboundCodec where TCodec: OutboundCodec + Decoder, @@ -102,34 +119,36 @@ where fn decode(&mut self, src: &mut BytesMut) -> Result, Self::Error> { // if we have only received the response code, wait for more bytes - if src.len() == 1 { + if src.len() <= 1 { return Ok(None); } // using the response code determine which kind of payload needs to be decoded. 
- let response_code = { - if let Some(resp_code) = self.response_code { - resp_code - } else { - let resp_byte = src.split_to(1); - let mut resp_code_byte = [0; 1]; - resp_code_byte.copy_from_slice(&resp_byte); + let response_code = self.current_response_code.unwrap_or_else(|| { + let resp_code = src.split_to(1)[0]; + self.current_response_code = Some(resp_code); + resp_code + }); - let resp_code = u8::from_be_bytes(resp_code_byte); - self.response_code = Some(resp_code); - resp_code + let inner_result = { + if RPCErrorResponse::is_response(response_code) { + // decode an actual response and mutates the buffer if enough bytes have been read + // returning the result. + self.inner + .decode(src) + .map(|r| r.map(RPCErrorResponse::Success)) + } else { + // decode an error + self.inner + .decode_error(src) + .map(|r| r.map(|resp| RPCErrorResponse::from_error(response_code, resp))) } }; - - if RPCErrorResponse::is_response(response_code) { - // decode an actual response - self.inner - .decode(src) - .map(|r| r.map(RPCErrorResponse::Success)) - } else { - // decode an error - self.inner - .decode_error(src) - .map(|r| r.map(|resp| RPCErrorResponse::from_error(response_code, resp))) + // if the inner decoder was capable of decoding a chunk, we need to reset the current + // response code for the next chunk + if let Ok(Some(_)) = inner_result { + self.current_response_code = None; } + // return the result + inner_result } } diff --git a/beacon_node/eth2-libp2p/src/rpc/codec/mod.rs b/beacon_node/eth2-libp2p/src/rpc/codec/mod.rs index d0d1b650b..b9993d4b3 100644 --- a/beacon_node/eth2-libp2p/src/rpc/codec/mod.rs +++ b/beacon_node/eth2-libp2p/src/rpc/codec/mod.rs @@ -5,7 +5,7 @@ use self::base::{BaseInboundCodec, BaseOutboundCodec}; use self::ssz::{SSZInboundCodec, SSZOutboundCodec}; use crate::rpc::protocol::RPCError; use crate::rpc::{RPCErrorResponse, RPCRequest}; -use bytes::BytesMut; +use libp2p::bytes::BytesMut; use tokio::codec::{Decoder, Encoder}; // Known types of 
codecs diff --git a/beacon_node/eth2-libp2p/src/rpc/codec/ssz.rs b/beacon_node/eth2-libp2p/src/rpc/codec/ssz.rs index d0e4d01cf..c03a325eb 100644 --- a/beacon_node/eth2-libp2p/src/rpc/codec/ssz.rs +++ b/beacon_node/eth2-libp2p/src/rpc/codec/ssz.rs @@ -4,7 +4,7 @@ use crate::rpc::{ protocol::{ProtocolId, RPCError}, }; use crate::rpc::{ErrorMessage, RPCErrorResponse, RPCRequest, RPCResponse}; -use bytes::{BufMut, Bytes, BytesMut}; +use libp2p::bytes::{BufMut, Bytes, BytesMut}; use ssz::{Decode, Encode}; use tokio::codec::{Decoder, Encoder}; use unsigned_varint::codec::UviBytes; @@ -31,7 +31,7 @@ impl SSZInboundCodec { } } -// Encoder for inbound +// Encoder for inbound streams: Encodes RPC Responses sent to peers. impl Encoder for SSZInboundCodec { type Item = RPCErrorResponse; type Error = RPCError; @@ -40,16 +40,19 @@ impl Encoder for SSZInboundCodec { let bytes = match item { RPCErrorResponse::Success(resp) => { match resp { - RPCResponse::Hello(res) => res.as_ssz_bytes(), - RPCResponse::BeaconBlocks(res) => res, // already raw bytes - RPCResponse::RecentBeaconBlocks(res) => res, // already raw bytes + RPCResponse::Status(res) => res.as_ssz_bytes(), + RPCResponse::BlocksByRange(res) => res, // already raw bytes + RPCResponse::BlocksByRoot(res) => res, // already raw bytes + RPCResponse::Goodbye => unreachable!("Never encode or decode this message"), } } RPCErrorResponse::InvalidRequest(err) => err.as_ssz_bytes(), RPCErrorResponse::ServerError(err) => err.as_ssz_bytes(), RPCErrorResponse::Unknown(err) => err.as_ssz_bytes(), + RPCErrorResponse::StreamTermination(_) => { + unreachable!("Code error - attempting to encode a stream termination") + } }; - if !bytes.is_empty() { // length-prefix and return return self @@ -65,7 +68,7 @@ impl Encoder for SSZInboundCodec { } } -// Decoder for inbound +// Decoder for inbound streams: Decodes RPC requests from peers impl Decoder for SSZInboundCodec { type Item = RPCRequest; type Error = RPCError; @@ -73,8 +76,8 @@ impl Decoder 
for SSZInboundCodec { fn decode(&mut self, src: &mut BytesMut) -> Result, Self::Error> { match self.inner.decode(src).map_err(RPCError::from) { Ok(Some(packet)) => match self.protocol.message_name.as_str() { - "hello" => match self.protocol.version.as_str() { - "1" => Ok(Some(RPCRequest::Hello(HelloMessage::from_ssz_bytes( + "status" => match self.protocol.version.as_str() { + "1" => Ok(Some(RPCRequest::Status(StatusMessage::from_ssz_bytes( &packet, )?))), _ => unreachable!("Cannot negotiate an unknown version"), @@ -85,16 +88,16 @@ impl Decoder for SSZInboundCodec { )?))), _ => unreachable!("Cannot negotiate an unknown version"), }, - "beacon_blocks" => match self.protocol.version.as_str() { - "1" => Ok(Some(RPCRequest::BeaconBlocks( - BeaconBlocksRequest::from_ssz_bytes(&packet)?, + "blocks_by_range" => match self.protocol.version.as_str() { + "1" => Ok(Some(RPCRequest::BlocksByRange( + BlocksByRangeRequest::from_ssz_bytes(&packet)?, ))), _ => unreachable!("Cannot negotiate an unknown version"), }, - "recent_beacon_blocks" => match self.protocol.version.as_str() { - "1" => Ok(Some(RPCRequest::RecentBeaconBlocks( - RecentBeaconBlocksRequest::from_ssz_bytes(&packet)?, - ))), + "blocks_by_root" => match self.protocol.version.as_str() { + "1" => Ok(Some(RPCRequest::BlocksByRoot(BlocksByRootRequest { + block_roots: Vec::from_ssz_bytes(&packet)?, + }))), _ => unreachable!("Cannot negotiate an unknown version"), }, _ => unreachable!("Cannot negotiate an unknown protocol"), @@ -105,7 +108,7 @@ impl Decoder for SSZInboundCodec { } } -/* Outbound Codec */ +/* Outbound Codec: Codec for initiating RPC requests */ pub struct SSZOutboundCodec { inner: UviBytes, @@ -127,26 +130,26 @@ impl SSZOutboundCodec { } } -// Encoder for outbound +// Encoder for outbound streams: Encodes RPC Requests to peers impl Encoder for SSZOutboundCodec { type Item = RPCRequest; type Error = RPCError; fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> { let bytes = 
match item { - RPCRequest::Hello(req) => req.as_ssz_bytes(), + RPCRequest::Status(req) => req.as_ssz_bytes(), RPCRequest::Goodbye(req) => req.as_ssz_bytes(), - RPCRequest::BeaconBlocks(req) => req.as_ssz_bytes(), - RPCRequest::RecentBeaconBlocks(req) => req.as_ssz_bytes(), + RPCRequest::BlocksByRange(req) => req.as_ssz_bytes(), + RPCRequest::BlocksByRoot(req) => req.block_roots.as_ssz_bytes(), }; // length-prefix self.inner - .encode(bytes::Bytes::from(bytes), dst) + .encode(libp2p::bytes::Bytes::from(bytes), dst) .map_err(RPCError::from) } } -// Decoder for outbound streams +// Decoder for outbound streams: Decodes RPC responses from peers. // // The majority of the decoding has now been pushed upstream due to the changing specification. // We prefer to decode blocks and attestations with extra knowledge about the chain to perform @@ -158,44 +161,53 @@ impl Decoder for SSZOutboundCodec { fn decode(&mut self, src: &mut BytesMut) -> Result, Self::Error> { if src.len() == 1 && src[0] == 0_u8 { // the object is empty. We return the empty object if this is the case + // clear the buffer and return an empty object + src.clear(); match self.protocol.message_name.as_str() { - "hello" => match self.protocol.version.as_str() { + "status" => match self.protocol.version.as_str() { "1" => Err(RPCError::Custom( - "Hello stream terminated unexpectedly".into(), + "Status stream terminated unexpectedly".into(), )), // cannot have an empty HELLO message. 
The stream has terminated unexpectedly _ => unreachable!("Cannot negotiate an unknown version"), }, "goodbye" => Err(RPCError::InvalidProtocol("GOODBYE doesn't have a response")), - "beacon_blocks" => match self.protocol.version.as_str() { - "1" => Ok(Some(RPCResponse::BeaconBlocks(Vec::new()))), + "blocks_by_range" => match self.protocol.version.as_str() { + "1" => Ok(Some(RPCResponse::BlocksByRange(Vec::new()))), _ => unreachable!("Cannot negotiate an unknown version"), }, - "recent_beacon_blocks" => match self.protocol.version.as_str() { - "1" => Ok(Some(RPCResponse::RecentBeaconBlocks(Vec::new()))), + "blocks_by_root" => match self.protocol.version.as_str() { + "1" => Ok(Some(RPCResponse::BlocksByRoot(Vec::new()))), _ => unreachable!("Cannot negotiate an unknown version"), }, _ => unreachable!("Cannot negotiate an unknown protocol"), } } else { match self.inner.decode(src).map_err(RPCError::from) { - Ok(Some(packet)) => match self.protocol.message_name.as_str() { - "hello" => match self.protocol.version.as_str() { - "1" => Ok(Some(RPCResponse::Hello(HelloMessage::from_ssz_bytes( - &packet, - )?))), - _ => unreachable!("Cannot negotiate an unknown version"), - }, - "goodbye" => Err(RPCError::InvalidProtocol("GOODBYE doesn't have a response")), - "beacon_blocks" => match self.protocol.version.as_str() { - "1" => Ok(Some(RPCResponse::BeaconBlocks(packet.to_vec()))), - _ => unreachable!("Cannot negotiate an unknown version"), - }, - "recent_beacon_blocks" => match self.protocol.version.as_str() { - "1" => Ok(Some(RPCResponse::RecentBeaconBlocks(packet.to_vec()))), - _ => unreachable!("Cannot negotiate an unknown version"), - }, - _ => unreachable!("Cannot negotiate an unknown protocol"), - }, + Ok(Some(mut packet)) => { + // take the bytes from the buffer + let raw_bytes = packet.take(); + + match self.protocol.message_name.as_str() { + "status" => match self.protocol.version.as_str() { + "1" => Ok(Some(RPCResponse::Status(StatusMessage::from_ssz_bytes( + 
&raw_bytes, + )?))), + _ => unreachable!("Cannot negotiate an unknown version"), + }, + "goodbye" => { + Err(RPCError::InvalidProtocol("GOODBYE doesn't have a response")) + } + "blocks_by_range" => match self.protocol.version.as_str() { + "1" => Ok(Some(RPCResponse::BlocksByRange(raw_bytes.to_vec()))), + _ => unreachable!("Cannot negotiate an unknown version"), + }, + "blocks_by_root" => match self.protocol.version.as_str() { + "1" => Ok(Some(RPCResponse::BlocksByRoot(raw_bytes.to_vec()))), + _ => unreachable!("Cannot negotiate an unknown version"), + }, + _ => unreachable!("Cannot negotiate an unknown protocol"), + } + } Ok(None) => Ok(None), // waiting for more bytes Err(e) => Err(e), } diff --git a/beacon_node/eth2-libp2p/src/rpc/handler.rs b/beacon_node/eth2-libp2p/src/rpc/handler.rs index 07322875f..83d2b2aae 100644 --- a/beacon_node/eth2-libp2p/src/rpc/handler.rs +++ b/beacon_node/eth2-libp2p/src/rpc/handler.rs @@ -1,4 +1,4 @@ -use super::methods::RequestId; +use super::methods::{RPCErrorResponse, RPCResponse, RequestId}; use super::protocol::{RPCError, RPCProtocol, RPCRequest}; use super::RPCEvent; use crate::rpc::protocol::{InboundFramed, OutboundFramed}; @@ -9,13 +9,25 @@ use libp2p::core::upgrade::{InboundUpgrade, OutboundUpgrade}; use libp2p::swarm::protocols_handler::{ KeepAlive, ProtocolsHandler, ProtocolsHandlerEvent, ProtocolsHandlerUpgrErr, SubstreamProtocol, }; +use slog::{crit, debug, error, trace, warn}; use smallvec::SmallVec; +use std::collections::hash_map::Entry; use std::time::{Duration, Instant}; -use tokio_io::{AsyncRead, AsyncWrite}; +use tokio::io::{AsyncRead, AsyncWrite}; +use tokio::timer::{delay_queue, DelayQueue}; + +//TODO: Implement close() on the substream types to improve the poll code. +//TODO: Implement check_timeout() on the substream types /// The time (in seconds) before a substream that is awaiting a response from the user times out. 
pub const RESPONSE_TIMEOUT: u64 = 10; +/// Inbound requests are given a sequential `RequestId` to keep track of. +type InboundRequestId = RequestId; +/// Outbound requests are associated with an id that is given by the application that sent the +/// request. +type OutboundRequestId = RequestId; + /// Implementation of `ProtocolsHandler` for the RPC protocol. pub struct RPCHandler where @@ -36,11 +48,23 @@ where /// Current number of concurrent outbound substreams being opened. dial_negotiated: u32, - /// Map of current substreams awaiting a response to an RPC request. - waiting_substreams: FnvHashMap>, + /// Current inbound substreams awaiting processing. + inbound_substreams: + FnvHashMap, delay_queue::Key)>, - /// List of outbound substreams that need to be driven to completion. - substreams: Vec>, + /// Inbound substream `DelayQueue` which keeps track of when an inbound substream will timeout. + inbound_substreams_delay: DelayQueue, + + /// Map of outbound substreams that need to be driven to completion. The `RequestId` is + /// maintained by the application sending the request. + outbound_substreams: + FnvHashMap, delay_queue::Key)>, + + /// Outbound substream `DelayQueue` which keeps track of when an outbound substream will timeout. + outbound_substreams_delay: DelayQueue, + + /// Map of outbound items that are queued as the stream processes them. + queued_outbound_items: FnvHashMap>, /// Sequential Id for waiting substreams. current_substream_id: RequestId, @@ -54,38 +78,49 @@ where /// After the given duration has elapsed, an inactive connection will shutdown. inactive_timeout: Duration, + /// Logger for handling RPC streams + log: slog::Logger, + /// Marker to pin the generic stream. _phantom: PhantomData, } -/// An outbound substream is waiting a response from the user. -struct WaitingResponse { - /// The framed negotiated substream. - substream: InboundFramed, - /// The time when the substream is closed. 
- timeout: Instant, -} - /// State of an outbound substream. Either waiting for a response, or in the process of sending. -pub enum SubstreamState +pub enum InboundSubstreamState where TSubstream: AsyncRead + AsyncWrite, { /// A response has been sent, pending writing and flush. ResponsePendingSend { + /// The substream used to send the response substream: futures::sink::Send>, + /// Whether a stream termination is requested. If true the stream will be closed after + /// this send. Otherwise it will transition to an idle state until a stream termination is + /// requested or a timeout is reached. + closing: bool, }, + /// The response stream is idle and awaiting input from the application to send more chunked + /// responses. + ResponseIdle(InboundFramed), + /// The substream is attempting to shutdown. + Closing(InboundFramed), + /// Temporary state during processing + Poisoned, +} + +pub enum OutboundSubstreamState { /// A request has been sent, and we are awaiting a response. This future is driven in the /// handler because GOODBYE requests can be handled and responses dropped instantly. RequestPendingResponse { /// The framed negotiated substream. substream: OutboundFramed, - /// Keeps track of the request id and the request to permit forming advanced responses which require - /// data from the request. - rpc_event: RPCEvent, - /// The time when the substream is closed. - timeout: Instant, + /// Keeps track of the actual request sent. 
+ request: RPCRequest, }, + /// Closing an outbound substream. + Closing(OutboundFramed), + /// Temporary state during processing + Poisoned, } impl RPCHandler @@ -95,6 +130,7 @@ where pub fn new( listen_protocol: SubstreamProtocol, inactive_timeout: Duration, + log: &slog::Logger, ) -> Self { RPCHandler { listen_protocol, @@ -102,12 +138,16 @@ where events_out: SmallVec::new(), dial_queue: SmallVec::new(), dial_negotiated: 0, - waiting_substreams: FnvHashMap::default(), - substreams: Vec::new(), + queued_outbound_items: FnvHashMap::default(), + inbound_substreams: FnvHashMap::default(), + outbound_substreams: FnvHashMap::default(), + inbound_substreams_delay: DelayQueue::new(), + outbound_substreams_delay: DelayQueue::new(), current_substream_id: 1, max_dial_negotiated: 8, keep_alive: KeepAlive::Yes, inactive_timeout, + log: log.clone(), _phantom: PhantomData, } } @@ -142,15 +182,6 @@ where } } -impl Default for RPCHandler -where - TSubstream: AsyncRead + AsyncWrite, -{ - fn default() -> Self { - RPCHandler::new(SubstreamProtocol::new(RPCProtocol), Duration::from_secs(30)) - } -} - impl ProtocolsHandler for RPCHandler where TSubstream: AsyncRead + AsyncWrite, @@ -177,16 +208,18 @@ where // drop the stream and return a 0 id for goodbye "requests" if let r @ RPCRequest::Goodbye(_) = req { self.events_out.push(RPCEvent::Request(0, r)); + warn!(self.log, "Goodbye Received"); return; } // New inbound request. Store the stream and tag the output. 
- let awaiting_stream = WaitingResponse { - substream, - timeout: Instant::now() + Duration::from_secs(RESPONSE_TIMEOUT), - }; - self.waiting_substreams - .insert(self.current_substream_id, awaiting_stream); + let delay_key = self.inbound_substreams_delay.insert( + self.current_substream_id, + Duration::from_secs(RESPONSE_TIMEOUT), + ); + let awaiting_stream = InboundSubstreamState::ResponseIdle(substream); + self.inbound_substreams + .insert(self.current_substream_id, (awaiting_stream, delay_key)); self.events_out .push(RPCEvent::Request(self.current_substream_id, req)); @@ -203,7 +236,7 @@ where if self.dial_negotiated == 0 && self.dial_queue.is_empty() - && self.waiting_substreams.is_empty() + && self.outbound_substreams.is_empty() { self.keep_alive = KeepAlive::Until(Instant::now() + self.inactive_timeout); } else { @@ -211,15 +244,28 @@ where } // add the stream to substreams if we expect a response, otherwise drop the stream. - if let RPCEvent::Request(id, req) = rpc_event { - if req.expect_response() { - let awaiting_stream = SubstreamState::RequestPendingResponse { + match rpc_event { + RPCEvent::Request(id, RPCRequest::Goodbye(_)) => { + // notify the application layer, that a goodbye has been sent, so the application can + // drop and remove the peer + self.events_out.push(RPCEvent::Response( + id, + RPCErrorResponse::Success(RPCResponse::Goodbye), + )); + } + RPCEvent::Request(id, request) if request.expect_response() => { + // new outbound request. Store the stream and tag the output. 
+ let delay_key = self + .outbound_substreams_delay + .insert(id, Duration::from_secs(RESPONSE_TIMEOUT)); + let awaiting_stream = OutboundSubstreamState::RequestPendingResponse { substream: out, - rpc_event: RPCEvent::Request(id, req), - timeout: Instant::now() + Duration::from_secs(RESPONSE_TIMEOUT), + request, }; - - self.substreams.push(awaiting_stream); + self.outbound_substreams + .insert(id, (awaiting_stream, delay_key)); + } + _ => { // a response is not expected, drop the stream for all other requests } } } @@ -230,15 +276,73 @@ where fn inject_event(&mut self, rpc_event: Self::InEvent) { match rpc_event { RPCEvent::Request(_, _) => self.send_request(rpc_event), - RPCEvent::Response(rpc_id, res) => { + RPCEvent::Response(rpc_id, response) => { // check if the stream matching the response still exists - if let Some(waiting_stream) = self.waiting_substreams.remove(&rpc_id) { - // only send one response per stream. This must be in the waiting state. - self.substreams.push(SubstreamState::ResponsePendingSend { - substream: waiting_stream.substream.send(res), - }); - } + trace!(self.log, "Checking for outbound stream"); + + // variables indicating if the response is an error response or a multi-part + // response + let res_is_error = response.is_error(); + let res_is_multiple = response.multiple_responses(); + + match self.inbound_substreams.get_mut(&rpc_id) { + Some((substream_state, _)) => { + match std::mem::replace(substream_state, InboundSubstreamState::Poisoned) { + InboundSubstreamState::ResponseIdle(substream) => { + trace!(self.log, "Stream is idle, sending message"; "message" => format!("{}", response)); + // close the stream if there is no response + if let RPCErrorResponse::StreamTermination(_) = response { + trace!(self.log, "Stream termination sent. 
Ending the stream"); + *substream_state = InboundSubstreamState::Closing(substream); + } else { + // send the response + // if it's a single rpc request or an error, close the stream after + *substream_state = InboundSubstreamState::ResponsePendingSend { + substream: substream.send(response), + closing: !res_is_multiple | res_is_error, // close if an error or we are not expecting more responses + }; + } + } + InboundSubstreamState::ResponsePendingSend { substream, closing } + if res_is_multiple => + { + // the stream is in use, add the request to a pending queue + trace!(self.log, "Adding message to queue"; "message" => format!("{}", response)); + (*self + .queued_outbound_items + .entry(rpc_id) + .or_insert_with(Vec::new)) + .push(response); + + // return the state + *substream_state = InboundSubstreamState::ResponsePendingSend { + substream, + closing, + }; + } + InboundSubstreamState::Closing(substream) => { + *substream_state = InboundSubstreamState::Closing(substream); + debug!(self.log, "Response not sent. Stream is closing"; "response" => format!("{}",response)); + } + InboundSubstreamState::ResponsePendingSend { substream, .. } => { + *substream_state = InboundSubstreamState::ResponsePendingSend { + substream, + closing: true, + }; + error!(self.log, "Attempted sending multiple responses to a single response request"); + } + InboundSubstreamState::Poisoned => { + crit!(self.log, "Poisoned inbound substream"); + unreachable!("Coding error: Poisoned substream"); + } + } + } + None => { + debug!(self.log, "Stream has expired. Response not sent"; "response" => format!("{}",response)); + } + }; } + // We do not send errors as responses RPCEvent::Error(_, _) => {} } } @@ -271,8 +375,7 @@ where // Returning an error here will result in dropping any peer that doesn't support any of // the RPC protocols. For our immediate purposes we permit this and simply log that an // upgrade was not supported. - // TODO: Add a logger to the handler for trace output. 
- dbg!(&err); + warn!(self.log,"RPC Protocol was not supported"; "Error" => format!("{}", err)); } // return any events that need to be reported @@ -284,64 +387,212 @@ where self.events_out.shrink_to_fit(); } - // remove any streams that have expired - self.waiting_substreams - .retain(|_k, waiting_stream| Instant::now() <= waiting_stream.timeout); + // purge expired inbound substreams + while let Async::Ready(Some(stream_id)) = self + .inbound_substreams_delay + .poll() + .map_err(|_| ProtocolsHandlerUpgrErr::Timer)? + { + trace!(self.log, "Closing expired inbound stream"); + self.inbound_substreams.remove(stream_id.get_ref()); + } - // drive streams that need to be processed - for n in (0..self.substreams.len()).rev() { - let stream = self.substreams.swap_remove(n); - match stream { - SubstreamState::ResponsePendingSend { mut substream } => { - match substream.poll() { - Ok(Async::Ready(_substream)) => {} // sent and flushed - Ok(Async::NotReady) => { - self.substreams - .push(SubstreamState::ResponsePendingSend { substream }); - } - Err(e) => { - return Ok(Async::Ready(ProtocolsHandlerEvent::Custom( - RPCEvent::Error(0, e), - ))) + // purge expired outbound substreams + while let Async::Ready(Some(stream_id)) = self + .outbound_substreams_delay + .poll() + .map_err(|_| ProtocolsHandlerUpgrErr::Timer)? 
+ { + trace!(self.log, "Closing expired outbound stream"); + self.outbound_substreams.remove(stream_id.get_ref()); + } + + // drive inbound streams that need to be processed + for request_id in self.inbound_substreams.keys().copied().collect::>() { + // Drain all queued items until all messages have been processed for this stream + // TODO Improve this code logic + let mut new_items_to_send = true; + while new_items_to_send == true { + new_items_to_send = false; + match self.inbound_substreams.entry(request_id) { + Entry::Occupied(mut entry) => { + match std::mem::replace( + &mut entry.get_mut().0, + InboundSubstreamState::Poisoned, + ) { + InboundSubstreamState::ResponsePendingSend { + mut substream, + closing, + } => { + match substream.poll() { + Ok(Async::Ready(raw_substream)) => { + // completed the send + trace!(self.log, "RPC message sent"); + + // close the stream if required + if closing { + entry.get_mut().0 = + InboundSubstreamState::Closing(raw_substream) + } else { + // check for queued chunks and update the stream + trace!(self.log, "Checking for queued items"); + entry.get_mut().0 = apply_queued_responses( + raw_substream, + &mut self + .queued_outbound_items + .get_mut(&request_id), + &mut new_items_to_send, + ); + } + } + Ok(Async::NotReady) => { + entry.get_mut().0 = + InboundSubstreamState::ResponsePendingSend { + substream, + closing, + }; + } + Err(e) => { + let delay_key = &entry.get().1; + self.inbound_substreams_delay.remove(delay_key); + entry.remove_entry(); + return Ok(Async::Ready(ProtocolsHandlerEvent::Custom( + RPCEvent::Error(0, e), + ))); + } + }; + } + InboundSubstreamState::ResponseIdle(substream) => { + trace!(self.log, "Idle stream searching queue"); + entry.get_mut().0 = apply_queued_responses( + substream, + &mut self.queued_outbound_items.get_mut(&request_id), + &mut new_items_to_send, + ); + } + InboundSubstreamState::Closing(mut substream) => { + match substream.close() { + Ok(Async::Ready(())) | Err(_) => { + 
trace!(self.log, "Inbound stream dropped"); + let delay_key = &entry.get().1; + self.queued_outbound_items.remove(&request_id); + self.inbound_substreams_delay.remove(delay_key); + entry.remove(); + } // drop the stream + Ok(Async::NotReady) => { + entry.get_mut().0 = + InboundSubstreamState::Closing(substream); + } + } + } + InboundSubstreamState::Poisoned => { + crit!(self.log, "Poisoned outbound substream"); + unreachable!("Coding Error: Inbound Substream is poisoned"); + } + }; + } + Entry::Vacant(_) => unreachable!(), + } + } + } + + // drive outbound streams that need to be processed + for request_id in self.outbound_substreams.keys().copied().collect::>() { + match self.outbound_substreams.entry(request_id) { + Entry::Occupied(mut entry) => { + match std::mem::replace( + &mut entry.get_mut().0, + OutboundSubstreamState::Poisoned, + ) { + OutboundSubstreamState::RequestPendingResponse { + mut substream, + request, + } => match substream.poll() { + Ok(Async::Ready(Some(response))) => { + trace!(self.log, "Message received"; "message" => format!("{}", response)); + if request.multiple_responses() { + entry.get_mut().0 = + OutboundSubstreamState::RequestPendingResponse { + substream, + request: request, + }; + let delay_key = &entry.get().1; + self.outbound_substreams_delay + .reset(delay_key, Duration::from_secs(RESPONSE_TIMEOUT)); + } else { + trace!(self.log, "Closing single stream request"); + // only expect a single response, close the stream + entry.get_mut().0 = OutboundSubstreamState::Closing(substream); + } + return Ok(Async::Ready(ProtocolsHandlerEvent::Custom( + RPCEvent::Response(request_id, response), + ))); + } + Ok(Async::Ready(None)) => { + // stream closed + // if we expected multiple streams send a stream termination, + // else report the stream terminating only. 
+ trace!(self.log, "RPC Response - stream closed by remote"); + // drop the stream + let delay_key = &entry.get().1; + self.outbound_substreams_delay.remove(delay_key); + entry.remove_entry(); + // notify the application error + if request.multiple_responses() { + // return an end of stream result + return Ok(Async::Ready(ProtocolsHandlerEvent::Custom( + RPCEvent::Response( + request_id, + RPCErrorResponse::StreamTermination( + request.stream_termination(), + ), + ), + ))); + } // else we return an error, stream should not have closed early. + return Ok(Async::Ready(ProtocolsHandlerEvent::Custom( + RPCEvent::Error( + request_id, + RPCError::Custom( + "Stream closed early. Empty response".into(), + ), + ), + ))); + } + Ok(Async::NotReady) => { + entry.get_mut().0 = OutboundSubstreamState::RequestPendingResponse { + substream, + request, + } + } + Err(e) => { + // drop the stream + let delay_key = &entry.get().1; + self.outbound_substreams_delay.remove(delay_key); + entry.remove_entry(); + return Ok(Async::Ready(ProtocolsHandlerEvent::Custom( + RPCEvent::Error(request_id, e), + ))); + } + }, + OutboundSubstreamState::Closing(mut substream) => match substream.close() { + Ok(Async::Ready(())) | Err(_) => { + trace!(self.log, "Outbound stream dropped"); + // drop the stream + let delay_key = &entry.get().1; + self.outbound_substreams_delay.remove(delay_key); + entry.remove_entry(); + } + Ok(Async::NotReady) => { + entry.get_mut().0 = OutboundSubstreamState::Closing(substream); + } + }, + OutboundSubstreamState::Poisoned => { + crit!(self.log, "Poisoned outbound substream"); + unreachable!("Coding Error: Outbound substream is poisoned") } } } - SubstreamState::RequestPendingResponse { - mut substream, - rpc_event, - timeout, - } => match substream.poll() { - Ok(Async::Ready(response)) => { - if let Some(response) = response { - return Ok(Async::Ready(ProtocolsHandlerEvent::Custom( - RPCEvent::Response(rpc_event.id(), response), - ))); - } else { - // stream closed early 
or nothing was sent - return Ok(Async::Ready(ProtocolsHandlerEvent::Custom( - RPCEvent::Error( - rpc_event.id(), - RPCError::Custom("Stream closed early. Empty response".into()), - ), - ))); - } - } - Ok(Async::NotReady) => { - if Instant::now() < timeout { - self.substreams - .push(SubstreamState::RequestPendingResponse { - substream, - rpc_event, - timeout, - }); - } - } - Err(e) => { - return Ok(Async::Ready(ProtocolsHandlerEvent::Custom( - RPCEvent::Error(rpc_event.id(), e), - ))) - } - }, + Entry::Vacant(_) => unreachable!(), } } @@ -365,3 +616,31 @@ where Ok(Async::NotReady) } } + +// Check for new items to send to the peer and update the underlying stream +fn apply_queued_responses( + raw_substream: InboundFramed, + queued_outbound_items: &mut Option<&mut Vec>, + new_items_to_send: &mut bool, +) -> InboundSubstreamState { + match queued_outbound_items { + Some(ref mut queue) if !queue.is_empty() => { + *new_items_to_send = true; + // we have queued items + match queue.remove(0) { + RPCErrorResponse::StreamTermination(_) => { + // close the stream if this is a stream termination + InboundSubstreamState::Closing(raw_substream) + } + chunk => InboundSubstreamState::ResponsePendingSend { + substream: raw_substream.send(chunk), + closing: false, + }, + } + } + _ => { + // no items queued set to idle + InboundSubstreamState::ResponseIdle(raw_substream) + } + } +} diff --git a/beacon_node/eth2-libp2p/src/rpc/methods.rs b/beacon_node/eth2-libp2p/src/rpc/methods.rs index ee8ad4860..75f9ddb28 100644 --- a/beacon_node/eth2-libp2p/src/rpc/methods.rs +++ b/beacon_node/eth2-libp2p/src/rpc/methods.rs @@ -1,4 +1,4 @@ -//!Available RPC methods types and ids. +//! Available RPC methods types and ids. use ssz_derive::{Decode, Encode}; use types::{Epoch, Hash256, Slot}; @@ -9,9 +9,9 @@ use types::{Epoch, Hash256, Slot}; pub type RequestId = usize; -/// The HELLO request/response handshake message. 
-#[derive(Encode, Decode, Clone, Debug)] -pub struct HelloMessage { +/// The STATUS request/response handshake message. +#[derive(Encode, Decode, Clone, Debug, PartialEq)] +pub struct StatusMessage { /// The fork version of the chain we are broadcasting. pub fork_version: [u8; 4], @@ -33,7 +33,7 @@ pub struct HelloMessage { /// Note: any unknown `u64::into(n)` will resolve to `Goodbye::Unknown` for any unknown `n`, /// however `GoodbyeReason::Unknown.into()` will go into `0_u64`. Therefore de-serializing then /// re-serializing may not return the same bytes. -#[derive(Debug, Clone)] +#[derive(Debug, Clone, PartialEq)] pub enum GoodbyeReason { /// This node has shutdown. ClientShutdown = 1, @@ -100,7 +100,7 @@ impl ssz::Decode for GoodbyeReason { /// Request a number of beacon block roots from a peer. #[derive(Encode, Decode, Clone, Debug, PartialEq)] -pub struct BeaconBlocksRequest { +pub struct BlocksByRangeRequest { /// The hash tree root of a block on the requested chain. pub head_block_root: Hash256, @@ -119,8 +119,8 @@ pub struct BeaconBlocksRequest { } /// Request a number of beacon block bodies from a peer. -#[derive(Encode, Decode, Clone, Debug, PartialEq)] -pub struct RecentBeaconBlocksRequest { +#[derive(Clone, Debug, PartialEq)] +pub struct BlocksByRootRequest { /// The list of beacon block bodies being requested. pub block_roots: Vec, } @@ -128,32 +128,59 @@ pub struct RecentBeaconBlocksRequest { /* RPC Handling and Grouping */ // Collection of enums and structs used by the Codecs to encode/decode RPC messages -#[derive(Debug, Clone)] +#[derive(Debug, Clone, PartialEq)] pub enum RPCResponse { /// A HELLO message. - Hello(HelloMessage), - /// A response to a get BEACON_BLOCKS request. - BeaconBlocks(Vec), - /// A response to a get RECENT_BEACON_BLOCKS request. - RecentBeaconBlocks(Vec), + Status(StatusMessage), + + /// A response to a get BLOCKS_BY_RANGE request. A None response signifies the end of the + /// batch. 
+ BlocksByRange(Vec), + + /// A response to a get BLOCKS_BY_ROOT request. + BlocksByRoot(Vec), + + /// A Goodbye message has been sent + Goodbye, +} + +/// Indicates which response is being terminated by a stream termination response. +#[derive(Debug)] +pub enum ResponseTermination { + /// Blocks by range stream termination. + BlocksByRange, + + /// Blocks by root stream termination. + BlocksByRoot, } #[derive(Debug)] pub enum RPCErrorResponse { + /// The response is a successful. Success(RPCResponse), + + /// The response was invalid. InvalidRequest(ErrorMessage), + + /// The response indicates a server error. ServerError(ErrorMessage), + + /// There was an unknown response. Unknown(ErrorMessage), + + /// Received a stream termination indicating which response is being terminated. + StreamTermination(ResponseTermination), } impl RPCErrorResponse { - /// Used to encode the response. - pub fn as_u8(&self) -> u8 { + /// Used to encode the response in the codec. + pub fn as_u8(&self) -> Option { match self { - RPCErrorResponse::Success(_) => 0, - RPCErrorResponse::InvalidRequest(_) => 1, - RPCErrorResponse::ServerError(_) => 2, - RPCErrorResponse::Unknown(_) => 255, + RPCErrorResponse::Success(_) => Some(0), + RPCErrorResponse::InvalidRequest(_) => Some(1), + RPCErrorResponse::ServerError(_) => Some(2), + RPCErrorResponse::Unknown(_) => Some(255), + RPCErrorResponse::StreamTermination(_) => None, } } @@ -173,6 +200,32 @@ impl RPCErrorResponse { _ => RPCErrorResponse::Unknown(err), } } + + /// Specifies which response allows for multiple chunks for the stream handler. 
+ pub fn multiple_responses(&self) -> bool { + match self { + RPCErrorResponse::Success(resp) => match resp { + RPCResponse::Status(_) => false, + RPCResponse::BlocksByRange(_) => true, + RPCResponse::BlocksByRoot(_) => true, + RPCResponse::Goodbye => false, + }, + RPCErrorResponse::InvalidRequest(_) => true, + RPCErrorResponse::ServerError(_) => true, + RPCErrorResponse::Unknown(_) => true, + // Stream terminations are part of responses that have chunks + RPCErrorResponse::StreamTermination(_) => true, + } + } + + /// Returns true if this response is an error. Used to terminate the stream after an error is + /// sent. + pub fn is_error(&self) -> bool { + match self { + RPCErrorResponse::Success(_) => false, + _ => true, + } + } } #[derive(Encode, Decode, Debug)] @@ -187,20 +240,19 @@ impl ErrorMessage { } } -impl std::fmt::Display for HelloMessage { +impl std::fmt::Display for StatusMessage { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "Hello Message: Fork Version: {:?}, Finalized Root: {}, Finalized Epoch: {}, Head Root: {}, Head Slot: {}", self.fork_version, self.finalized_root, self.finalized_epoch, self.head_root, self.head_slot) + write!(f, "Status Message: Fork Version: {:?}, Finalized Root: {}, Finalized Epoch: {}, Head Root: {}, Head Slot: {}", self.fork_version, self.finalized_root, self.finalized_epoch, self.head_root, self.head_slot) } } impl std::fmt::Display for RPCResponse { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { - RPCResponse::Hello(hello) => write!(f, "{}", hello), - RPCResponse::BeaconBlocks(data) => write!(f, ", len: {}", data.len()), - RPCResponse::RecentBeaconBlocks(data) => { - write!(f, ", len: {}", data.len()) - } + RPCResponse::Status(status) => write!(f, "{}", status), + RPCResponse::BlocksByRange(_) => write!(f, ""), + RPCResponse::BlocksByRoot(_) => write!(f, ""), + RPCResponse::Goodbye => write!(f, "Goodbye Sent"), } } } @@ -212,6 +264,7 @@ impl 
std::fmt::Display for RPCErrorResponse { RPCErrorResponse::InvalidRequest(err) => write!(f, "Invalid Request: {:?}", err), RPCErrorResponse::ServerError(err) => write!(f, "Server Error: {:?}", err), RPCErrorResponse::Unknown(err) => write!(f, "Unknown Error: {:?}", err), + RPCErrorResponse::StreamTermination(_) => write!(f, "Stream Termination"), } } } @@ -227,7 +280,7 @@ impl std::fmt::Display for GoodbyeReason { } } -impl std::fmt::Display for BeaconBlocksRequest { +impl std::fmt::Display for BlocksByRangeRequest { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!( f, diff --git a/beacon_node/eth2-libp2p/src/rpc/mod.rs b/beacon_node/eth2-libp2p/src/rpc/mod.rs index 2bbb6a05c..f467bc7ab 100644 --- a/beacon_node/eth2-libp2p/src/rpc/mod.rs +++ b/beacon_node/eth2-libp2p/src/rpc/mod.rs @@ -9,29 +9,32 @@ use handler::RPCHandler; use libp2p::core::ConnectedPoint; use libp2p::swarm::{ protocols_handler::ProtocolsHandler, NetworkBehaviour, NetworkBehaviourAction, PollParameters, + SubstreamProtocol, }; use libp2p::{Multiaddr, PeerId}; -pub use methods::{ErrorMessage, HelloMessage, RPCErrorResponse, RPCResponse, RequestId}; +pub use methods::{ + ErrorMessage, RPCErrorResponse, RPCResponse, RequestId, ResponseTermination, StatusMessage, +}; pub use protocol::{RPCError, RPCProtocol, RPCRequest}; use slog::o; use std::marker::PhantomData; +use std::time::Duration; use tokio::io::{AsyncRead, AsyncWrite}; pub(crate) mod codec; mod handler; pub mod methods; mod protocol; -// mod request_response; /// The return type used in the behaviour and the resultant event from the protocols handler. #[derive(Debug)] pub enum RPCEvent { - /// A request that was received from the RPC protocol. The first parameter is a sequential + /// An inbound/outbound request for RPC protocol. The first parameter is a sequential /// id which tracks an awaiting substream for the response. 
Request(RequestId, RPCRequest), - - /// A response that has been received from the RPC protocol. The first parameter returns - /// that which was sent with the corresponding request. + /// A response that is being sent or has been received from the RPC protocol. The first parameter returns + /// that which was sent with the corresponding request, the second is a single chunk of a + /// response. Response(RequestId, RPCErrorResponse), /// An Error occurred. Error(RequestId, RPCError), @@ -50,9 +53,9 @@ impl RPCEvent { impl std::fmt::Display for RPCEvent { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { - RPCEvent::Request(id, req) => write!(f, "RPC Request(Id: {}, {})", id, req), - RPCEvent::Response(id, res) => write!(f, "RPC Response(Id: {}, {})", id, res), - RPCEvent::Error(id, err) => write!(f, "RPC Request(Id: {}, Error: {:?})", id, err), + RPCEvent::Request(id, req) => write!(f, "RPC Request(id: {}, {})", id, req), + RPCEvent::Response(id, res) => write!(f, "RPC Response(id: {}, {})", id, res), + RPCEvent::Error(id, err) => write!(f, "RPC Request(id: {}, error: {:?})", id, err), } } } @@ -65,7 +68,7 @@ pub struct RPC { /// Pins the generic substream. marker: PhantomData<(TSubstream)>, /// Slog logger for RPC behaviour. 
- _log: slog::Logger, + log: slog::Logger, } impl RPC { @@ -74,7 +77,7 @@ impl RPC { RPC { events: Vec::new(), marker: PhantomData, - _log: log, + log: log, } } @@ -97,7 +100,11 @@ where type OutEvent = RPCMessage; fn new_handler(&mut self) -> Self::ProtocolsHandler { - Default::default() + RPCHandler::new( + SubstreamProtocol::new(RPCProtocol), + Duration::from_secs(30), + &self.log, + ) } // handled by discovery diff --git a/beacon_node/eth2-libp2p/src/rpc/protocol.rs b/beacon_node/eth2-libp2p/src/rpc/protocol.rs index 401fa8b9e..5ba6ce9a7 100644 --- a/beacon_node/eth2-libp2p/src/rpc/protocol.rs +++ b/beacon_node/eth2-libp2p/src/rpc/protocol.rs @@ -1,8 +1,11 @@ use super::methods::*; -use crate::rpc::codec::{ - base::{BaseInboundCodec, BaseOutboundCodec}, - ssz::{SSZInboundCodec, SSZOutboundCodec}, - InboundCodec, OutboundCodec, +use crate::rpc::{ + codec::{ + base::{BaseInboundCodec, BaseOutboundCodec}, + ssz::{SSZInboundCodec, SSZOutboundCodec}, + InboundCodec, OutboundCodec, + }, + methods::ResponseTermination, }; use futures::{ future::{self, FutureResult}, @@ -37,10 +40,10 @@ impl UpgradeInfo for RPCProtocol { fn protocol_info(&self) -> Self::InfoIter { vec![ - ProtocolId::new("hello", "1", "ssz"), + ProtocolId::new("status", "1", "ssz"), ProtocolId::new("goodbye", "1", "ssz"), - ProtocolId::new("beacon_blocks", "1", "ssz"), - ProtocolId::new("recent_beacon_blocks", "1", "ssz"), + ProtocolId::new("blocks_by_range", "1", "ssz"), + ProtocolId::new("blocks_by_root", "1", "ssz"), ] } } @@ -145,12 +148,12 @@ where // Combines all the RPC requests into a single enum to implement `UpgradeInfo` and // `OutboundUpgrade` -#[derive(Debug, Clone)] +#[derive(Debug, Clone, PartialEq)] pub enum RPCRequest { - Hello(HelloMessage), + Status(StatusMessage), Goodbye(GoodbyeReason), - BeaconBlocks(BeaconBlocksRequest), - RecentBeaconBlocks(RecentBeaconBlocksRequest), + BlocksByRange(BlocksByRangeRequest), + BlocksByRoot(BlocksByRootRequest), } impl UpgradeInfo for RPCRequest { 
@@ -168,21 +171,47 @@ impl RPCRequest { pub fn supported_protocols(&self) -> Vec { match self { // add more protocols when versions/encodings are supported - RPCRequest::Hello(_) => vec![ProtocolId::new("hello", "1", "ssz")], + RPCRequest::Status(_) => vec![ProtocolId::new("status", "1", "ssz")], RPCRequest::Goodbye(_) => vec![ProtocolId::new("goodbye", "1", "ssz")], - RPCRequest::BeaconBlocks(_) => vec![ProtocolId::new("beacon_blocks", "1", "ssz")], - RPCRequest::RecentBeaconBlocks(_) => { - vec![ProtocolId::new("recent_beacon_blocks", "1", "ssz")] - } + RPCRequest::BlocksByRange(_) => vec![ProtocolId::new("blocks_by_range", "1", "ssz")], + RPCRequest::BlocksByRoot(_) => vec![ProtocolId::new("blocks_by_root", "1", "ssz")], } } + /* These functions are used in the handler for stream management */ + /// This specifies whether a stream should remain open and await a response, given a request. /// A GOODBYE request has no response. pub fn expect_response(&self) -> bool { match self { + RPCRequest::Status(_) => true, RPCRequest::Goodbye(_) => false, - _ => true, + RPCRequest::BlocksByRange(_) => true, + RPCRequest::BlocksByRoot(_) => true, + } + } + + /// Returns which methods expect multiple responses from the stream. If this is false and + /// the stream terminates, an error is given. + pub fn multiple_responses(&self) -> bool { + match self { + RPCRequest::Status(_) => false, + RPCRequest::Goodbye(_) => false, + RPCRequest::BlocksByRange(_) => true, + RPCRequest::BlocksByRoot(_) => true, + } + } + + /// Returns the `ResponseTermination` type associated with the request if a stream gets + /// terminated. + pub fn stream_termination(&self) -> ResponseTermination { + match self { + // this only gets called after `multiple_responses()` returns true. Therefore, only + // variants that have `multiple_responses()` can have values. 
+ RPCRequest::BlocksByRange(_) => ResponseTermination::BlocksByRange, + RPCRequest::BlocksByRoot(_) => ResponseTermination::BlocksByRoot, + RPCRequest::Status(_) => unreachable!(), + RPCRequest::Goodbye(_) => unreachable!(), } } } @@ -229,6 +258,8 @@ pub enum RPCError { IoError(io::Error), /// Waiting for a request/response timed out, or timer error'd. StreamTimeout, + /// The peer returned a valid RPCErrorResponse but the response was an error. + RPCErrorResponse, /// Custom message. Custom(String), } @@ -256,6 +287,12 @@ impl From> for RPCError { } } +impl From<()> for RPCError { + fn from(_err: ()) -> Self { + RPCError::Custom("".into()) + } +} + impl From for RPCError { fn from(err: io::Error) -> Self { RPCError::IoError(err) @@ -270,6 +307,7 @@ impl std::fmt::Display for RPCError { RPCError::SSZDecodeError(ref err) => write!(f, "Error while decoding ssz: {:?}", err), RPCError::InvalidProtocol(ref err) => write!(f, "Invalid Protocol: {}", err), RPCError::IoError(ref err) => write!(f, "IO Error: {}", err), + RPCError::RPCErrorResponse => write!(f, "RPC Response Error"), RPCError::StreamTimeout => write!(f, "Stream Timeout"), RPCError::Custom(ref err) => write!(f, "{}", err), } @@ -284,6 +322,7 @@ impl std::error::Error for RPCError { RPCError::InvalidProtocol(_) => None, RPCError::IoError(ref err) => Some(err), RPCError::StreamTimeout => None, + RPCError::RPCErrorResponse => None, RPCError::Custom(_) => None, } } @@ -292,10 +331,10 @@ impl std::error::Error for RPCError { impl std::fmt::Display for RPCRequest { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { - RPCRequest::Hello(hello) => write!(f, "Hello Message: {}", hello), + RPCRequest::Status(status) => write!(f, "Status Message: {}", status), RPCRequest::Goodbye(reason) => write!(f, "Goodbye: {}", reason), - RPCRequest::BeaconBlocks(req) => write!(f, "Beacon Blocks: {}", req), - RPCRequest::RecentBeaconBlocks(req) => write!(f, "Recent Beacon Blocks: {:?}", req), + 
RPCRequest::BlocksByRange(req) => write!(f, "Blocks by range: {}", req), + RPCRequest::BlocksByRoot(req) => write!(f, "Blocks by root: {:?}", req), } } } diff --git a/beacon_node/eth2-libp2p/src/service.rs b/beacon_node/eth2-libp2p/src/service.rs index b7ede9dd8..1ea4701e5 100644 --- a/beacon_node/eth2-libp2p/src/service.rs +++ b/beacon_node/eth2-libp2p/src/service.rs @@ -28,10 +28,13 @@ pub struct Service { /// The libp2p Swarm handler. //TODO: Make this private pub swarm: Swarm, + /// This node's PeerId. pub local_peer_id: PeerId, + /// Indicates if the listening address have been verified and compared to the expected ENR. pub verified_listen_address: bool, + /// The libp2p logger handle. pub log: slog::Logger, } @@ -190,6 +193,11 @@ impl Stream for Service { BehaviourEvent::PeerDisconnected(peer_id) => { return Ok(Async::Ready(Some(Libp2pEvent::PeerDisconnected(peer_id)))); } + BehaviourEvent::PeerSubscribed(peer_id, topic) => { + return Ok(Async::Ready(Some(Libp2pEvent::PeerSubscribed( + peer_id, topic, + )))); + } }, Ok(Async::Ready(None)) => unreachable!("Swarm stream shouldn't end"), Ok(Async::NotReady) => { @@ -202,6 +210,7 @@ impl Stream for Service { } } } + break; } _ => break, @@ -276,6 +285,8 @@ pub enum Libp2pEvent { topics: Vec, message: PubsubMessage, }, + /// Subscribed to peer for a topic hash. 
+ PeerSubscribed(PeerId, TopicHash), } fn keypair_from_hex(hex_bytes: &str) -> error::Result { diff --git a/beacon_node/eth2-libp2p/tests/common/mod.rs b/beacon_node/eth2-libp2p/tests/common/mod.rs new file mode 100644 index 000000000..a4fa7db59 --- /dev/null +++ b/beacon_node/eth2-libp2p/tests/common/mod.rs @@ -0,0 +1,116 @@ +#![cfg(test)] +use enr::Enr; +use eth2_libp2p::Multiaddr; +use eth2_libp2p::NetworkConfig; +use eth2_libp2p::Service as LibP2PService; +use slog::{debug, error, o, Drain}; +use std::time::Duration; + +pub fn build_log(level: slog::Level, enabled: bool) -> slog::Logger { + let decorator = slog_term::TermDecorator::new().build(); + let drain = slog_term::FullFormat::new(decorator).build().fuse(); + let drain = slog_async::Async::new(drain).build().fuse(); + + if enabled { + slog::Logger::root(drain.filter_level(level).fuse(), o!()) + } else { + slog::Logger::root(drain.filter(|_| false).fuse(), o!()) + } +} + +pub fn build_config( + port: u16, + mut boot_nodes: Vec, + secret_key: Option, +) -> NetworkConfig { + let mut config = NetworkConfig::default(); + config.libp2p_port = port; // tcp port + config.discovery_port = port; // udp port + config.boot_nodes.append(&mut boot_nodes); + config.secret_key_hex = secret_key; + config.network_dir.push(port.to_string()); + // Reduce gossipsub heartbeat parameters + config.gs_config.heartbeat_initial_delay = Duration::from_millis(500); + config.gs_config.heartbeat_interval = Duration::from_millis(500); + config +} + +pub fn build_libp2p_instance( + port: u16, + boot_nodes: Vec, + secret_key: Option, + log: slog::Logger, +) -> LibP2PService { + let config = build_config(port, boot_nodes, secret_key); + // launch libp2p service + let libp2p_service = LibP2PService::new(config.clone(), log.clone()).unwrap(); + libp2p_service +} + +#[allow(dead_code)] +pub fn get_enr(node: &LibP2PService) -> Enr { + node.swarm.discovery().local_enr().clone() +} + +// Returns `n` libp2p peers in fully connected topology. 
+#[allow(dead_code)] +pub fn build_full_mesh(log: slog::Logger, n: usize, start_port: Option) -> Vec { + let base_port = start_port.unwrap_or(9000); + let mut nodes: Vec = (base_port..base_port + n as u16) + .map(|p| build_libp2p_instance(p, vec![], None, log.clone())) + .collect(); + let multiaddrs: Vec = nodes + .iter() + .map(|x| get_enr(&x).multiaddr()[1].clone()) + .collect(); + + for i in 0..n { + for j in i..n { + if i != j { + match libp2p::Swarm::dial_addr(&mut nodes[i].swarm, multiaddrs[j].clone()) { + Ok(()) => debug!(log, "Connected"), + Err(_) => error!(log, "Failed to connect"), + }; + } + } + } + nodes +} + +// Constructs a pair of nodes with seperate loggers. The sender dials the receiver. +// This returns a (sender, receiver) pair. +#[allow(dead_code)] +pub fn build_node_pair(log: &slog::Logger, start_port: u16) -> (LibP2PService, LibP2PService) { + let sender_log = log.new(o!("who" => "sender")); + let receiver_log = log.new(o!("who" => "receiver")); + + let mut sender = build_libp2p_instance(start_port, vec![], None, sender_log); + let receiver = build_libp2p_instance(start_port + 1, vec![], None, receiver_log); + + let receiver_multiaddr = receiver.swarm.discovery().local_enr().clone().multiaddr()[1].clone(); + match libp2p::Swarm::dial_addr(&mut sender.swarm, receiver_multiaddr) { + Ok(()) => debug!(log, "Sender dialed receiver"), + Err(_) => error!(log, "Dialing failed"), + }; + (sender, receiver) +} + +// Returns `n` peers in a linear topology +#[allow(dead_code)] +pub fn build_linear(log: slog::Logger, n: usize, start_port: Option) -> Vec { + let base_port = start_port.unwrap_or(9000); + let mut nodes: Vec = (base_port..base_port + n as u16) + .map(|p| build_libp2p_instance(p, vec![], None, log.clone())) + .collect(); + let multiaddrs: Vec = nodes + .iter() + .map(|x| get_enr(&x).multiaddr()[1].clone()) + .collect(); + for i in 0..n - 1 { + match libp2p::Swarm::dial_addr(&mut nodes[i].swarm, multiaddrs[i + 1].clone()) { + Ok(()) => 
debug!(log, "Connected"), + Err(_) => error!(log, "Failed to connect"), + }; + } + nodes +} diff --git a/beacon_node/eth2-libp2p/tests/gossipsub_tests.rs b/beacon_node/eth2-libp2p/tests/gossipsub_tests.rs new file mode 100644 index 000000000..affb7899b --- /dev/null +++ b/beacon_node/eth2-libp2p/tests/gossipsub_tests.rs @@ -0,0 +1,138 @@ +#![cfg(test)] +use eth2_libp2p::*; +use futures::prelude::*; +use slog::{debug, Level}; + +mod common; + +/* Gossipsub tests */ +// Note: The aim of these tests is not to test the robustness of the gossip network +// but to check if the gossipsub implementation is behaving according to the specifications. + +// Test if gossipsub message are forwarded by nodes with a simple linear topology. +// +// Topology used in test +// +// node1 <-> node2 <-> node3 ..... <-> node(n-1) <-> node(n) + +#[test] +fn test_gossipsub_forward() { + // set up the logging. The level and enabled or not + let log = common::build_log(Level::Info, false); + + let num_nodes = 20; + let mut nodes = common::build_linear(log.clone(), num_nodes, Some(19000)); + let mut received_count = 0; + let pubsub_message = PubsubMessage::Block(vec![0; 4]); + let publishing_topic: String = "/eth2/beacon_block/ssz".into(); + let mut subscribed_count = 0; + tokio::run(futures::future::poll_fn(move || -> Result<_, ()> { + for node in nodes.iter_mut() { + loop { + match node.poll().unwrap() { + Async::Ready(Some(Libp2pEvent::PubsubMessage { + topics, + message, + source, + id, + })) => { + assert_eq!(topics.len(), 1); + // Assert topic is the published topic + assert_eq!( + topics.first().unwrap(), + &TopicHash::from_raw(publishing_topic.clone()) + ); + // Assert message received is the correct one + assert_eq!(message, pubsub_message.clone()); + received_count += 1; + // Since `propagate_message` is false, need to propagate manually + node.swarm.propagate_message(&source, id); + // Test should succeed if all nodes except the publisher receive the message + if received_count == 
num_nodes - 1 { + debug!(log.clone(), "Received message at {} nodes", num_nodes - 1); + return Ok(Async::Ready(())); + } + } + Async::Ready(Some(Libp2pEvent::PeerSubscribed(_, topic))) => { + // Received topics is one of subscribed eth2 topics + assert!(topic.clone().into_string().starts_with("/eth2/")); + // Publish on beacon block topic + if topic == TopicHash::from_raw("/eth2/beacon_block/ssz") { + subscribed_count += 1; + // Every node except the corner nodes are connected to 2 nodes. + if subscribed_count == (num_nodes * 2) - 2 { + node.swarm.publish( + &vec![Topic::new(topic.into_string())], + pubsub_message.clone(), + ); + } + } + } + _ => break, + } + } + } + Ok(Async::NotReady) + })) +} + +// Test publishing of a message with a full mesh for the topic +// Not very useful but this is the bare minimum functionality. +#[test] +fn test_gossipsub_full_mesh_publish() { + // set up the logging. The level and enabled or not + let log = common::build_log(Level::Info, false); + + let num_nodes = 20; + let mut nodes = common::build_full_mesh(log, num_nodes, None); + let mut publishing_node = nodes.pop().unwrap(); + let pubsub_message = PubsubMessage::Block(vec![0; 4]); + let publishing_topic: String = "/eth2/beacon_block/ssz".into(); + let mut subscribed_count = 0; + let mut received_count = 0; + tokio::run(futures::future::poll_fn(move || -> Result<_, ()> { + for node in nodes.iter_mut() { + loop { + match node.poll().unwrap() { + Async::Ready(Some(Libp2pEvent::PubsubMessage { + topics, message, .. 
+ })) => { + assert_eq!(topics.len(), 1); + // Assert topic is the published topic + assert_eq!( + topics.first().unwrap(), + &TopicHash::from_raw(publishing_topic.clone()) + ); + // Assert message received is the correct one + assert_eq!(message, pubsub_message.clone()); + received_count += 1; + if received_count == num_nodes - 1 { + return Ok(Async::Ready(())); + } + } + _ => break, + } + } + } + loop { + match publishing_node.poll().unwrap() { + Async::Ready(Some(Libp2pEvent::PeerSubscribed(_, topic))) => { + // Received topics is one of subscribed eth2 topics + assert!(topic.clone().into_string().starts_with("/eth2/")); + // Publish on beacon block topic + if topic == TopicHash::from_raw("/eth2/beacon_block/ssz") { + subscribed_count += 1; + if subscribed_count == num_nodes - 1 { + publishing_node.swarm.publish( + &vec![Topic::new(topic.into_string())], + pubsub_message.clone(), + ); + } + } + } + _ => break, + } + } + Ok(Async::NotReady) + })) +} diff --git a/beacon_node/eth2-libp2p/tests/rpc_tests.rs b/beacon_node/eth2-libp2p/tests/rpc_tests.rs new file mode 100644 index 000000000..91390b4e7 --- /dev/null +++ b/beacon_node/eth2-libp2p/tests/rpc_tests.rs @@ -0,0 +1,575 @@ +#![cfg(test)] +use eth2_libp2p::rpc::methods::*; +use eth2_libp2p::rpc::*; +use eth2_libp2p::{Libp2pEvent, RPCEvent}; +use slog::{warn, Level}; +use std::sync::{Arc, Mutex}; +use std::time::Duration; +use tokio::prelude::*; +use types::{Epoch, Hash256, Slot}; + +mod common; + +#[test] +// Tests the STATUS RPC message +fn test_status_rpc() { + // set up the logging. 
The level and enabled logging or not + let log_level = Level::Trace; + let enable_logging = false; + + let log = common::build_log(log_level, enable_logging); + + // get sender/receiver + let (mut sender, mut receiver) = common::build_node_pair(&log, 10500); + + // Dummy STATUS RPC request + let rpc_request = RPCRequest::Status(StatusMessage { + fork_version: [0; 4], + finalized_root: Hash256::from_low_u64_be(0), + finalized_epoch: Epoch::new(1), + head_root: Hash256::from_low_u64_be(0), + head_slot: Slot::new(1), + }); + + // Dummy STATUS RPC response + let rpc_response = RPCResponse::Status(StatusMessage { + fork_version: [0; 4], + finalized_root: Hash256::from_low_u64_be(0), + finalized_epoch: Epoch::new(1), + head_root: Hash256::from_low_u64_be(0), + head_slot: Slot::new(1), + }); + + let sender_request = rpc_request.clone(); + let sender_log = log.clone(); + let sender_response = rpc_response.clone(); + + // build the sender future + let sender_future = future::poll_fn(move || -> Poll { + loop { + match sender.poll().unwrap() { + Async::Ready(Some(Libp2pEvent::PeerDialed(peer_id))) => { + // Send a STATUS message + warn!(sender_log, "Sending RPC"); + sender + .swarm + .send_rpc(peer_id, RPCEvent::Request(1, sender_request.clone())); + } + Async::Ready(Some(Libp2pEvent::RPC(_, event))) => match event { + // Should receive the RPC response + RPCEvent::Response(id, response @ RPCErrorResponse::Success(_)) => { + warn!(sender_log, "Sender Received"); + assert_eq!(id, 1); + + let response = { + match response { + RPCErrorResponse::Success(r) => r, + _ => unreachable!(), + } + }; + assert_eq!(response, sender_response.clone()); + + warn!(sender_log, "Sender Completed"); + return Ok(Async::Ready(true)); + } + _ => panic!("Received invalid RPC message"), + }, + Async::Ready(Some(_)) => (), + Async::Ready(None) | Async::NotReady => return Ok(Async::NotReady), + }; + } + }); + + // build the receiver future + let receiver_future = future::poll_fn(move || -> Poll { + loop 
{ + match receiver.poll().unwrap() { + Async::Ready(Some(Libp2pEvent::RPC(peer_id, event))) => match event { + // Should receive sent RPC request + RPCEvent::Request(id, request) => { + assert_eq!(id, 1); + assert_eq!(rpc_request.clone(), request); + + // send the response + warn!(log, "Receiver Received"); + receiver.swarm.send_rpc( + peer_id, + RPCEvent::Response(id, RPCErrorResponse::Success(rpc_response.clone())), + ); + } + _ => panic!("Received invalid RPC message"), + }, + Async::Ready(Some(_)) => (), + Async::Ready(None) | Async::NotReady => return Ok(Async::NotReady), + } + } + }); + + // execute the futures and check the result + let test_result = Arc::new(Mutex::new(false)); + let error_result = test_result.clone(); + let thread_result = test_result.clone(); + tokio::run( + sender_future + .select(receiver_future) + .timeout(Duration::from_millis(1000)) + .map_err(move |_| *error_result.lock().unwrap() = false) + .map(move |result| { + *thread_result.lock().unwrap() = result.0; + () + }), + ); + assert!(*test_result.lock().unwrap()); +} + +#[test] +// Tests a streamed BlocksByRange RPC Message +fn test_blocks_by_range_chunked_rpc() { + // set up the logging. 
The level and enabled logging or not + let log_level = Level::Trace; + let enable_logging = false; + + let messages_to_send = 10; + + let log = common::build_log(log_level, enable_logging); + + // get sender/receiver + let (mut sender, mut receiver) = common::build_node_pair(&log, 10505); + + // BlocksByRange Request + let rpc_request = RPCRequest::BlocksByRange(BlocksByRangeRequest { + head_block_root: Hash256::from_low_u64_be(0), + start_slot: 0, + count: messages_to_send, + step: 0, + }); + + // BlocksByRange Response + let rpc_response = RPCResponse::BlocksByRange(vec![13, 13, 13]); + + let sender_request = rpc_request.clone(); + let sender_log = log.clone(); + let sender_response = rpc_response.clone(); + + // keep count of the number of messages received + let messages_received = Arc::new(Mutex::new(0)); + // build the sender future + let sender_future = future::poll_fn(move || -> Poll { + loop { + match sender.poll().unwrap() { + Async::Ready(Some(Libp2pEvent::PeerDialed(peer_id))) => { + // Send a BlocksByRange request + warn!(sender_log, "Sender sending RPC request"); + sender + .swarm + .send_rpc(peer_id, RPCEvent::Request(1, sender_request.clone())); + } + Async::Ready(Some(Libp2pEvent::RPC(_, event))) => match event { + // Should receive the RPC response + RPCEvent::Response(id, response) => { + warn!(sender_log, "Sender received a response"); + assert_eq!(id, 1); + match response { + RPCErrorResponse::Success(res) => { + assert_eq!(res, sender_response.clone()); + *messages_received.lock().unwrap() += 1; + warn!(sender_log, "Chunk received"); + } + RPCErrorResponse::StreamTermination( + ResponseTermination::BlocksByRange, + ) => { + // should be exactly 10 messages before terminating + assert_eq!(*messages_received.lock().unwrap(), messages_to_send); + // end the test + return Ok(Async::Ready(true)); + } + _ => panic!("Invalid RPC received"), + } + } + _ => panic!("Received invalid RPC message"), + }, + Async::Ready(Some(_)) => {} + Async::Ready(None) 
| Async::NotReady => return Ok(Async::NotReady), + }; + } + }); + + // build the receiver future + let receiver_future = future::poll_fn(move || -> Poll { + loop { + match receiver.poll().unwrap() { + Async::Ready(Some(Libp2pEvent::RPC(peer_id, event))) => match event { + // Should receive the sent RPC request + RPCEvent::Request(id, request) => { + assert_eq!(id, 1); + assert_eq!(rpc_request.clone(), request); + + // send the response + warn!(log, "Receiver got request"); + + for _ in 1..=messages_to_send { + receiver.swarm.send_rpc( + peer_id.clone(), + RPCEvent::Response( + id, + RPCErrorResponse::Success(rpc_response.clone()), + ), + ); + } + // send the stream termination + receiver.swarm.send_rpc( + peer_id, + RPCEvent::Response( + id, + RPCErrorResponse::StreamTermination( + ResponseTermination::BlocksByRange, + ), + ), + ); + } + _ => panic!("Received invalid RPC message"), + }, + Async::Ready(Some(_)) => (), + Async::Ready(None) | Async::NotReady => return Ok(Async::NotReady), + } + } + }); + + // execute the futures and check the result + let test_result = Arc::new(Mutex::new(false)); + let error_result = test_result.clone(); + let thread_result = test_result.clone(); + tokio::run( + sender_future + .select(receiver_future) + .timeout(Duration::from_millis(1000)) + .map_err(move |_| *error_result.lock().unwrap() = false) + .map(move |result| { + *thread_result.lock().unwrap() = result.0; + () + }), + ); + assert!(*test_result.lock().unwrap()); +} + +#[test] +// Tests an empty response to a BlocksByRange RPC Message +fn test_blocks_by_range_single_empty_rpc() { + // set up the logging. 
The level and enabled logging or not + let log_level = Level::Trace; + let enable_logging = false; + + let log = common::build_log(log_level, enable_logging); + + // get sender/receiver + let (mut sender, mut receiver) = common::build_node_pair(&log, 10510); + + // BlocksByRange Request + let rpc_request = RPCRequest::BlocksByRange(BlocksByRangeRequest { + head_block_root: Hash256::from_low_u64_be(0), + start_slot: 0, + count: 10, + step: 0, + }); + + // BlocksByRange Response + let rpc_response = RPCResponse::BlocksByRange(vec![]); + + let sender_request = rpc_request.clone(); + let sender_log = log.clone(); + let sender_response = rpc_response.clone(); + + // keep count of the number of messages received + let messages_received = Arc::new(Mutex::new(0)); + // build the sender future + let sender_future = future::poll_fn(move || -> Poll { + loop { + match sender.poll().unwrap() { + Async::Ready(Some(Libp2pEvent::PeerDialed(peer_id))) => { + // Send a BlocksByRange request + warn!(sender_log, "Sender sending RPC request"); + sender + .swarm + .send_rpc(peer_id, RPCEvent::Request(1, sender_request.clone())); + } + Async::Ready(Some(Libp2pEvent::RPC(_, event))) => match event { + // Should receive the RPC response + RPCEvent::Response(id, response) => { + warn!(sender_log, "Sender received a response"); + assert_eq!(id, 1); + match response { + RPCErrorResponse::Success(res) => { + assert_eq!(res, sender_response.clone()); + *messages_received.lock().unwrap() += 1; + warn!(sender_log, "Chunk received"); + } + RPCErrorResponse::StreamTermination( + ResponseTermination::BlocksByRange, + ) => { + // should be exactly 1 message before terminating + assert_eq!(*messages_received.lock().unwrap(), 1); + // end the test + return Ok(Async::Ready(true)); + } + _ => panic!("Invalid RPC received"), + } + } + m => panic!("Received invalid RPC message: {}", m), + }, + Async::Ready(Some(_)) => {} + Async::Ready(None) | Async::NotReady => return Ok(Async::NotReady), + }; + } + }); 
+ + // build the receiver future + let receiver_future = future::poll_fn(move || -> Poll { + loop { + match receiver.poll().unwrap() { + Async::Ready(Some(Libp2pEvent::RPC(peer_id, event))) => match event { + // Should receive the sent RPC request + RPCEvent::Request(id, request) => { + assert_eq!(id, 1); + assert_eq!(rpc_request.clone(), request); + + // send the response + warn!(log, "Receiver got request"); + + receiver.swarm.send_rpc( + peer_id.clone(), + RPCEvent::Response(id, RPCErrorResponse::Success(rpc_response.clone())), + ); + // send the stream termination + receiver.swarm.send_rpc( + peer_id, + RPCEvent::Response( + id, + RPCErrorResponse::StreamTermination( + ResponseTermination::BlocksByRange, + ), + ), + ); + } + _ => panic!("Received invalid RPC message"), + }, + Async::Ready(Some(_)) => (), + Async::Ready(None) | Async::NotReady => return Ok(Async::NotReady), + } + } + }); + + // execute the futures and check the result + let test_result = Arc::new(Mutex::new(false)); + let error_result = test_result.clone(); + let thread_result = test_result.clone(); + tokio::run( + sender_future + .select(receiver_future) + .timeout(Duration::from_millis(1000)) + .map_err(move |_| *error_result.lock().unwrap() = false) + .map(move |result| { + *thread_result.lock().unwrap() = result.0; + () + }), + ); + assert!(*test_result.lock().unwrap()); +} + +#[test] +// Tests a streamed, chunked BlocksByRoot RPC Message +fn test_blocks_by_root_chunked_rpc() { + // set up the logging. 
The level and enabled logging or not + let log_level = Level::Trace; + let enable_logging = false; + + let messages_to_send = 3; + + let log = common::build_log(log_level, enable_logging); + + // get sender/receiver + let (mut sender, mut receiver) = common::build_node_pair(&log, 10515); + + // BlocksByRoot Request + let rpc_request = RPCRequest::BlocksByRoot(BlocksByRootRequest { + block_roots: vec![Hash256::from_low_u64_be(0), Hash256::from_low_u64_be(0)], + }); + + // BlocksByRoot Response + let rpc_response = RPCResponse::BlocksByRoot(vec![13, 13, 13]); + + let sender_request = rpc_request.clone(); + let sender_log = log.clone(); + let sender_response = rpc_response.clone(); + + // keep count of the number of messages received + let messages_received = Arc::new(Mutex::new(0)); + // build the sender future + let sender_future = future::poll_fn(move || -> Poll { + loop { + match sender.poll().unwrap() { + Async::Ready(Some(Libp2pEvent::PeerDialed(peer_id))) => { + // Send a BlocksByRoot request + warn!(sender_log, "Sender sending RPC request"); + sender + .swarm + .send_rpc(peer_id, RPCEvent::Request(1, sender_request.clone())); + } + Async::Ready(Some(Libp2pEvent::RPC(_, event))) => match event { + // Should receive the RPC response + RPCEvent::Response(id, response) => { + warn!(sender_log, "Sender received a response"); + assert_eq!(id, 1); + match response { + RPCErrorResponse::Success(res) => { + assert_eq!(res, sender_response.clone()); + *messages_received.lock().unwrap() += 1; + warn!(sender_log, "Chunk received"); + } + RPCErrorResponse::StreamTermination( + ResponseTermination::BlocksByRoot, + ) => { + // should be exactly `messages_to_send` messages before terminating + assert_eq!(*messages_received.lock().unwrap(), messages_to_send); + // end the test + return Ok(Async::Ready(true)); + } + m => panic!("Invalid RPC received: {}", m), + } + } + m => panic!("Received invalid RPC message: {}", m), + }, + Async::Ready(Some(_)) => {} + Async::Ready(None) | Async::NotReady 
=> return Ok(Async::NotReady), + }; + } + }); + + // build the receiver future + let receiver_future = future::poll_fn(move || -> Poll { + loop { + match receiver.poll().unwrap() { + Async::Ready(Some(Libp2pEvent::RPC(peer_id, event))) => match event { + // Should receive the sent RPC request + RPCEvent::Request(id, request) => { + assert_eq!(id, 1); + assert_eq!(rpc_request.clone(), request); + + // send the response + warn!(log, "Receiver got request"); + + for _ in 1..=messages_to_send { + receiver.swarm.send_rpc( + peer_id.clone(), + RPCEvent::Response( + id, + RPCErrorResponse::Success(rpc_response.clone()), + ), + ); + } + // send the stream termination + receiver.swarm.send_rpc( + peer_id, + RPCEvent::Response( + id, + RPCErrorResponse::StreamTermination( + ResponseTermination::BlocksByRoot, + ), + ), + ); + } + _ => panic!("Received invalid RPC message"), + }, + Async::Ready(Some(_)) => (), + Async::Ready(None) | Async::NotReady => return Ok(Async::NotReady), + } + } + }); + + // execute the futures and check the result + let test_result = Arc::new(Mutex::new(false)); + let error_result = test_result.clone(); + let thread_result = test_result.clone(); + tokio::run( + sender_future + .select(receiver_future) + .timeout(Duration::from_millis(1000)) + .map_err(move |_| *error_result.lock().unwrap() = false) + .map(move |result| { + *thread_result.lock().unwrap() = result.0; + () + }), + ); + assert!(*test_result.lock().unwrap()); +} + +#[test] +// Tests a Goodbye RPC message +fn test_goodbye_rpc() { + // set up the logging. 
The level and enabled logging or not + let log_level = Level::Trace; + let enable_logging = false; + + let log = common::build_log(log_level, enable_logging); + + // get sender/receiver + let (mut sender, mut receiver) = common::build_node_pair(&log, 10520); + + // Goodbye Request + let rpc_request = RPCRequest::Goodbye(GoodbyeReason::ClientShutdown); + + let sender_request = rpc_request.clone(); + let sender_log = log.clone(); + + // build the sender future + let sender_future = future::poll_fn(move || -> Poll { + loop { + match sender.poll().unwrap() { + Async::Ready(Some(Libp2pEvent::PeerDialed(peer_id))) => { + // Send a Goodbye request + warn!(sender_log, "Sender sending RPC request"); + sender + .swarm + .send_rpc(peer_id, RPCEvent::Request(1, sender_request.clone())); + } + Async::Ready(Some(_)) => {} + Async::Ready(None) | Async::NotReady => return Ok(Async::NotReady), + }; + } + }); + + // build the receiver future + let receiver_future = future::poll_fn(move || -> Poll { + loop { + match receiver.poll().unwrap() { + Async::Ready(Some(Libp2pEvent::RPC(_, event))) => match event { + // Should receive the sent RPC request + RPCEvent::Request(id, request) => { + assert_eq!(id, 0); + assert_eq!(rpc_request.clone(), request); + // receives the goodbye. 
Nothing left to do + return Ok(Async::Ready(true)); + } + _ => panic!("Received invalid RPC message"), + }, + Async::Ready(Some(_)) => (), + Async::Ready(None) | Async::NotReady => return Ok(Async::NotReady), + } + } + }); + + // execute the futures and check the result + let test_result = Arc::new(Mutex::new(false)); + let error_result = test_result.clone(); + let thread_result = test_result.clone(); + tokio::run( + sender_future + .select(receiver_future) + .timeout(Duration::from_millis(1000)) + .map_err(move |_| *error_result.lock().unwrap() = false) + .map(move |result| { + *thread_result.lock().unwrap() = result.0; + () + }), + ); + assert!(*test_result.lock().unwrap()); +} diff --git a/beacon_node/network/Cargo.toml b/beacon_node/network/Cargo.toml index b58f2fd7c..1117a2c89 100644 --- a/beacon_node/network/Cargo.toml +++ b/beacon_node/network/Cargo.toml @@ -21,3 +21,6 @@ error-chain = "0.12.1" tokio = "0.1.22" parking_lot = "0.9.0" smallvec = "0.6.11" +# TODO: Remove rand crate for mainnet +rand = "0.7.2" +fnv = "1.0.6" diff --git a/beacon_node/network/src/message_handler.rs b/beacon_node/network/src/message_handler.rs index 54f7f4b9b..9a928b488 100644 --- a/beacon_node/network/src/message_handler.rs +++ b/beacon_node/network/src/message_handler.rs @@ -4,7 +4,7 @@ use crate::sync::MessageProcessor; use beacon_chain::{BeaconChain, BeaconChainTypes}; use eth2_libp2p::{ behaviour::PubsubMessage, - rpc::{RPCError, RPCErrorResponse, RPCRequest, RPCResponse, RequestId}, + rpc::{RPCError, RPCErrorResponse, RPCRequest, RPCResponse, RequestId, ResponseTermination}, PeerId, RPCEvent, }; use futures::future::Future; @@ -115,9 +115,9 @@ impl MessageHandler { /// A new RPC request has been received from the network. 
fn handle_rpc_request(&mut self, peer_id: PeerId, request_id: RequestId, request: RPCRequest) { match request { - RPCRequest::Hello(hello_message) => { + RPCRequest::Status(status_message) => { self.message_processor - .on_hello_request(peer_id, request_id, hello_message) + .on_status_request(peer_id, request_id, status_message) } RPCRequest::Goodbye(goodbye_reason) => { debug!( @@ -127,12 +127,12 @@ impl MessageHandler { ); self.message_processor.on_disconnect(peer_id); } - RPCRequest::BeaconBlocks(request) => self + RPCRequest::BlocksByRange(request) => self .message_processor - .on_beacon_blocks_request(peer_id, request_id, request), - RPCRequest::RecentBeaconBlocks(request) => self + .on_blocks_by_range_request(peer_id, request_id, request), + RPCRequest::BlocksByRoot(request) => self .message_processor - .on_recent_beacon_blocks_request(peer_id, request_id, request), + .on_blocks_by_root_request(peer_id, request_id, request), } } @@ -147,27 +147,30 @@ impl MessageHandler { // an error could have occurred. 
match error_response { RPCErrorResponse::InvalidRequest(error) => { - warn!(self.log, "Peer indicated invalid request";"peer_id" => format!("{:?}", peer_id), "error" => error.as_string()) + warn!(self.log, "Peer indicated invalid request";"peer_id" => format!("{:?}", peer_id), "error" => error.as_string()); + self.handle_rpc_error(peer_id, request_id, RPCError::RPCErrorResponse); } RPCErrorResponse::ServerError(error) => { - warn!(self.log, "Peer internal server error";"peer_id" => format!("{:?}", peer_id), "error" => error.as_string()) + warn!(self.log, "Peer internal server error";"peer_id" => format!("{:?}", peer_id), "error" => error.as_string()); + self.handle_rpc_error(peer_id, request_id, RPCError::RPCErrorResponse); } RPCErrorResponse::Unknown(error) => { - warn!(self.log, "Unknown peer error";"peer" => format!("{:?}", peer_id), "error" => error.as_string()) + warn!(self.log, "Unknown peer error";"peer" => format!("{:?}", peer_id), "error" => error.as_string()); + self.handle_rpc_error(peer_id, request_id, RPCError::RPCErrorResponse); } RPCErrorResponse::Success(response) => { match response { - RPCResponse::Hello(hello_message) => { + RPCResponse::Status(status_message) => { self.message_processor - .on_hello_response(peer_id, hello_message); + .on_status_response(peer_id, status_message); } - RPCResponse::BeaconBlocks(response) => { - match self.decode_beacon_blocks(&response) { - Ok(beacon_blocks) => { - self.message_processor.on_beacon_blocks_response( + RPCResponse::BlocksByRange(response) => { + match self.decode_beacon_block(response) { + Ok(beacon_block) => { + self.message_processor.on_blocks_by_range_response( peer_id, request_id, - beacon_blocks, + Some(beacon_block), ); } Err(e) => { @@ -176,13 +179,13 @@ impl MessageHandler { } } } - RPCResponse::RecentBeaconBlocks(response) => { - match self.decode_beacon_blocks(&response) { - Ok(beacon_blocks) => { - self.message_processor.on_recent_beacon_blocks_response( + 
RPCResponse::BlocksByRoot(response) => { + match self.decode_beacon_block(response) { + Ok(beacon_block) => { + self.message_processor.on_blocks_by_root_response( peer_id, request_id, - beacon_blocks, + Some(beacon_block), ); } Err(e) => { @@ -191,6 +194,22 @@ impl MessageHandler { } } } + RPCResponse::Goodbye => { + // A goodbye was successfully sent, ignore it + } + } + } + RPCErrorResponse::StreamTermination(response_type) => { + // have received a stream termination, notify the processing functions + match response_type { + ResponseTermination::BlocksByRange => { + self.message_processor + .on_blocks_by_range_response(peer_id, request_id, None); + } + ResponseTermination::BlocksByRoot => { + self.message_processor + .on_blocks_by_root_response(peer_id, request_id, None); + } } } } @@ -198,8 +217,8 @@ impl MessageHandler { /// Handle various RPC errors fn handle_rpc_error(&mut self, peer_id: PeerId, request_id: RequestId, error: RPCError) { - //TODO: Handle error correctly warn!(self.log, "RPC Error"; "Peer" => format!("{:?}", peer_id), "request_id" => format!("{}", request_id), "Error" => format!("{:?}", error)); + self.message_processor.on_rpc_error(peer_id, request_id); } /// Handle RPC messages @@ -338,16 +357,13 @@ impl MessageHandler { /* Req/Resp Domain Decoding */ - /// Verifies and decodes an ssz-encoded list of `BeaconBlock`s. This list may contain empty - /// entries encoded with an SSZ NULL. - fn decode_beacon_blocks( + /// Verifies and decodes an ssz-encoded `BeaconBlock`. If `None` is passed, this represents a + /// stream termination. 
+ fn decode_beacon_block( &self, - beacon_blocks: &[u8], - ) -> Result>, DecodeError> { - if beacon_blocks.is_empty() { - return Ok(Vec::new()); - } + beacon_block: Vec, + ) -> Result, DecodeError> { //TODO: Implement faster block verification before decoding entirely - Vec::from_ssz_bytes(&beacon_blocks) + BeaconBlock::from_ssz_bytes(&beacon_block) } } diff --git a/beacon_node/network/src/service.rs b/beacon_node/network/src/service.rs index 3743a67ca..4be0909d4 100644 --- a/beacon_node/network/src/service.rs +++ b/beacon_node/network/src/service.rs @@ -4,13 +4,15 @@ use crate::NetworkConfig; use beacon_chain::{BeaconChain, BeaconChainTypes}; use core::marker::PhantomData; use eth2_libp2p::Service as LibP2PService; -use eth2_libp2p::Topic; -use eth2_libp2p::{Enr, Libp2pEvent, Multiaddr, PeerId, Swarm}; +use eth2_libp2p::{ + rpc::{RPCErrorResponse, RPCRequest, RPCResponse}, + ConnectedPoint, Enr, Libp2pEvent, Multiaddr, NetworkBehaviour, PeerId, Swarm, Topic, +}; use eth2_libp2p::{PubsubMessage, RPCEvent}; use futures::prelude::*; use futures::Stream; -use parking_lot::Mutex; -use slog::{debug, info, trace}; +use parking_lot::{Mutex, MutexGuard}; +use slog::{debug, info, trace, warn}; use std::sync::Arc; use tokio::runtime::TaskExecutor; use tokio::sync::{mpsc, oneshot}; @@ -53,6 +55,7 @@ impl Service { message_handler_send, executor, network_log, + config.propagation_percentage, )?; let network_service = Service { libp2p_service, @@ -122,6 +125,7 @@ fn spawn_service( message_handler_send: mpsc::UnboundedSender, executor: &TaskExecutor, log: slog::Logger, + propagation_percentage: Option, ) -> error::Result> { let (network_exit, exit_rx) = tokio::sync::oneshot::channel(); @@ -132,6 +136,7 @@ fn spawn_service( network_recv, message_handler_send, log.clone(), + propagation_percentage, ) // allow for manual termination .select(exit_rx.then(|_| Ok(()))) @@ -150,33 +155,70 @@ fn network_service( mut network_recv: mpsc::UnboundedReceiver, mut message_handler_send: 
mpsc::UnboundedSender, log: slog::Logger, + propagation_percentage: Option, ) -> impl futures::Future { futures::future::poll_fn(move || -> Result<_, eth2_libp2p::error::Error> { - // if the network channel is not ready, try the swarm + // keep a list of peers to disconnect, once all channels are processed, remove the peers. + let mut peers_to_ban = Vec::new(); + + // processes the network channel before processing the libp2p swarm loop { // poll the network channel match network_recv.poll() { Ok(Async::Ready(Some(message))) => match message { NetworkMessage::RPC(peer_id, rpc_event) => { - trace!(log, "{}", rpc_event); + trace!(log, "Sending RPC"; "RPC" => format!("{}", rpc_event)); libp2p_service.lock().swarm.send_rpc(peer_id, rpc_event); } NetworkMessage::Propagate { propagation_source, message_id, } => { - trace!(log, "Propagating gossipsub message"; - "propagation_peer" => format!("{:?}", propagation_source), - "message_id" => format!("{}", message_id), - ); - libp2p_service - .lock() - .swarm - .propagate_message(&propagation_source, message_id); + // TODO: Remove this for mainnet + // randomly prevents propagation + let mut should_send = true; + if let Some(percentage) = propagation_percentage { + // not exact percentage but close enough + let rand = rand::random::() % 100; + if rand > percentage { + // don't propagate + should_send = false; + } + } + if !should_send { + info!(log, "Random filter did not propagate message"); + } else { + trace!(log, "Propagating gossipsub message"; + "propagation_peer" => format!("{:?}", propagation_source), + "message_id" => format!("{}", message_id), + ); + libp2p_service + .lock() + .swarm + .propagate_message(&propagation_source, message_id); + } } NetworkMessage::Publish { topics, message } => { - debug!(log, "Sending pubsub message"; "topics" => format!("{:?}",topics)); - libp2p_service.lock().swarm.publish(&topics, message); + // TODO: Remove this for mainnet + // randomly prevents propagation + let mut should_send = 
true; + if let Some(percentage) = propagation_percentage { + // not exact percentage but close enough + let rand = rand::random::() % 100; + if rand > percentage { + // don't propagate + should_send = false; + } + } + if !should_send { + info!(log, "Random filter did not publish message"); + } else { + debug!(log, "Sending pubsub message"; "topics" => format!("{:?}",topics)); + libp2p_service.lock().swarm.publish(&topics, message); + } + } + NetworkMessage::Disconnect { peer_id } => { + peers_to_ban.push(peer_id); } }, Ok(Async::NotReady) => break, @@ -194,7 +236,19 @@ fn network_service( match libp2p_service.lock().poll() { Ok(Async::Ready(Some(event))) => match event { Libp2pEvent::RPC(peer_id, rpc_event) => { - trace!(log, "{}", rpc_event); + trace!(log, "Received RPC"; "RPC" => format!("{}", rpc_event)); + + // if we received or sent a Goodbye message, drop and ban the peer + match rpc_event { + RPCEvent::Request(_, RPCRequest::Goodbye(_)) + | RPCEvent::Response( + _, + RPCErrorResponse::Success(RPCResponse::Goodbye), + ) => { + peers_to_ban.push(peer_id.clone()); + } + _ => {} + }; message_handler_send .try_send(HandlerMessage::RPC(peer_id, rpc_event)) .map_err(|_| "Failed to send RPC to handler")?; @@ -221,6 +275,7 @@ fn network_service( .try_send(HandlerMessage::PubsubMessage(id, source, message)) .map_err(|_| "Failed to send pubsub message to handler")?; } + Libp2pEvent::PeerSubscribed(_, _) => {} }, Ok(Async::Ready(None)) => unreachable!("Stream never ends"), Ok(Async::NotReady) => break, @@ -228,10 +283,32 @@ fn network_service( } } + while !peers_to_ban.is_empty() { + let service = libp2p_service.lock(); + disconnect_peer(service, peers_to_ban.pop().expect("element exists"), &log); + } + Ok(Async::NotReady) }) } +fn disconnect_peer(mut service: MutexGuard, peer_id: PeerId, log: &slog::Logger) { + warn!(log, "Disconnecting and banning peer"; "peer_id" => format!("{:?}", peer_id)); + Swarm::ban_peer_id(&mut service.swarm, peer_id.clone()); + // TODO: 
Correctly notify protocols of the disconnect + // TODO: Also remove peer from the DHT: https://github.com/sigp/lighthouse/issues/629 + let dummy_connected_point = ConnectedPoint::Dialer { + address: "/ip4/0.0.0.0" + .parse::() + .expect("valid multiaddr"), + }; + service + .swarm + .inject_disconnected(&peer_id, dummy_connected_point); + // inform the behaviour that the peer has been banned + service.swarm.peer_banned(peer_id); +} + /// Types of messages that the network service can receive. #[derive(Debug)] pub enum NetworkMessage { @@ -242,9 +319,11 @@ pub enum NetworkMessage { topics: Vec, message: PubsubMessage, }, - /// Propagate a received gossipsub message + /// Propagate a received gossipsub message. Propagate { propagation_source: PeerId, message_id: String, }, + /// Disconnects and bans a peer. + Disconnect { peer_id: PeerId }, } diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index 9e92ade76..1ac53e534 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -1,6 +1,6 @@ //! The `SyncManager` facilities the block syncing logic of lighthouse. The current networking -//! specification provides two methods from which to obtain blocks from peers. The `BeaconBlocks` -//! request and the `RecentBeaconBlocks` request. The former is used to obtain a large number of +//! specification provides two methods from which to obtain blocks from peers. The `BlocksByRange` +//! request and the `BlocksByRoot` request. The former is used to obtain a large number of //! blocks and the latter allows for searching for blocks given a block-hash. //! //! These two RPC methods are designed for two type of syncing. @@ -9,7 +9,6 @@ //! //! Both of these syncing strategies are built into the `SyncManager`. //! -//! //! Currently the long-range (batch) syncing method functions by opportunistically downloading //! batches blocks from all peers who know about a chain that we do not. 
When a new peer connects //! which has a later head that is greater than `SLOT_IMPORT_TOLERANCE` from our current head slot, @@ -19,7 +18,7 @@ //! //! Batch Syncing //! -//! This syncing process start by requesting `MAX_BLOCKS_PER_REQUEST` blocks from a peer with an +//! This syncing process starts by requesting `BLOCKS_PER_REQUEST` blocks from a peer with an //! unknown chain (with a greater slot height) starting from our current head slot. If the earliest //! block returned is known to us, then the group of blocks returned form part of a known chain, //! and we process this batch of blocks, before requesting more batches forward and processing @@ -52,14 +51,23 @@ //! queued for lookup. A round-robin approach is used to request the parent from the known list of //! fully sync'd peers. If `PARENT_FAIL_TOLERANCE` attempts at requesting the block fails, we //! drop the propagated block and downvote the peer that sent it to us. +//! +//! Block Lookup +//! +//! To keep the logic maintained to the syncing thread (and manage the request_ids), when a block needs to be searched for (i.e. +//! if an attestation references an unknown block) this manager can search for the block and +//! subsequently search for parents if needed. 
-use super::simple_sync::{hello_message, NetworkContext, PeerSyncInfo, FUTURE_SLOT_TOLERANCE}; +use super::message_processor::{ + status_message, NetworkContext, PeerSyncInfo, FUTURE_SLOT_TOLERANCE, +}; use beacon_chain::{BeaconChain, BeaconChainTypes, BlockProcessingOutcome}; use eth2_libp2p::rpc::methods::*; use eth2_libp2p::rpc::{RPCRequest, RequestId}; use eth2_libp2p::PeerId; +use fnv::FnvHashMap; use futures::prelude::*; -use slog::{debug, info, trace, warn, Logger}; +use slog::{crit, debug, info, trace, warn, Logger}; use smallvec::SmallVec; use std::collections::{HashMap, HashSet}; use std::ops::{Add, Sub}; @@ -68,14 +76,17 @@ use tokio::sync::{mpsc, oneshot}; use types::{BeaconBlock, EthSpec, Hash256, Slot}; /// Blocks are downloaded in batches from peers. This constant specifies how many blocks per batch -/// is requested. Currently the value is small for testing. This will be incremented for -/// production. -const MAX_BLOCKS_PER_REQUEST: u64 = 50; +/// is requested. There is a timeout for each batch request. If this value is too high, we will +/// downvote peers with poor bandwidth. This can be set arbitrarily high, in which case the +/// responder will fill the response up to the max request size, assuming they have the bandwidth +/// to do so. +//TODO: Make this dynamic based on peer's bandwidth +const BLOCKS_PER_REQUEST: u64 = 50; /// The number of slots ahead of us that is allowed before requesting a long-range (batch) Sync /// from a peer. If a peer is within this tolerance (forwards or backwards), it is treated as a /// fully sync'd peer. -const SLOT_IMPORT_TOLERANCE: usize = 10; +const SLOT_IMPORT_TOLERANCE: usize = 20; /// How many attempts we try to find a parent of a block before we give up trying . const PARENT_FAIL_TOLERANCE: usize = 3; /// The maximum depth we will search for a parent block. 
In principle we should have sync'd any @@ -91,24 +102,33 @@ const EMPTY_BATCH_TOLERANCE: usize = 100; pub enum SyncMessage { /// A useful peer has been discovered. AddPeer(PeerId, PeerSyncInfo), - /// A `BeaconBlocks` response has been received. - BeaconBlocksResponse { + + /// A `BlocksByRange` response has been received. + BlocksByRangeResponse { peer_id: PeerId, request_id: RequestId, - beacon_blocks: Vec>, + beacon_block: Option>, }, - /// A `RecentBeaconBlocks` response has been received. - RecentBeaconBlocksResponse { + + /// A `BlocksByRoot` response has been received. + BlocksByRootResponse { peer_id: PeerId, request_id: RequestId, - beacon_blocks: Vec>, + beacon_block: Option>, }, + /// A block with an unknown parent has been received. UnknownBlock(PeerId, BeaconBlock), + + /// A peer has sent an object that references a block that is unknown. This triggers the + /// manager to attempt to find the block matching the unknown hash. + UnknownBlockHash(PeerId, Hash256), + /// A peer has disconnected. Disconnect(PeerId), + /// An RPC Error has occurred on a request. - _RPCError(RequestId), + RPCError(PeerId, RequestId), } #[derive(PartialEq)] @@ -116,46 +136,42 @@ pub enum SyncMessage { enum BlockRequestsState { /// The object is queued to be downloaded from a peer but has not yet been requested. Queued, + /// The batch or parent has been requested with the `RequestId` and we are awaiting a response. Pending(RequestId), + /// The downloaded blocks are ready to be processed by the beacon chain. For a batch process /// this means we have found a common chain. ReadyToProcess, + + /// The batch is complete, simply drop without downvoting the peer. + Complete, + /// A failure has occurred and we will drop and downvote the peer that caused the request. Failed, } -/// The state of batch requests. -enum SyncDirection { - /// The batch has just been initialised and we need to check to see if a backward sync is - /// required on first batch response. 
- Initial, - /// We are syncing forwards, the next batch should contain higher slot numbers than is - /// predecessor. - Forwards, - /// We are syncing backwards and looking for a common ancestor chain before we can start - /// processing the downloaded blocks. - Backwards, -} - /// `BlockRequests` keep track of the long-range (batch) sync process per peer. struct BlockRequests { /// The peer's head slot and the target of this batch download. target_head_slot: Slot, - /// The peer's head root, used to specify which chain of blocks we are downloading from the - /// blocks. + + /// The peer's head root, used to specify which chain of blocks we are downloading from. target_head_root: Hash256, + /// The blocks that we have currently downloaded from the peer that are yet to be processed. downloaded_blocks: Vec>, + /// The number of blocks successfully processed in this request. blocks_processed: usize, + /// The number of empty batches we have consecutively received. If a peer returns more than /// EMPTY_BATCHES_TOLERANCE, they are dropped. consecutive_empty_batches: usize, + /// The current state of this batch request. state: BlockRequestsState, - /// Specifies the current direction of this batch request. - sync_direction: SyncDirection, + /// The current `start_slot` of the batched block request. current_start_slot: Slot, } @@ -164,12 +180,15 @@ struct BlockRequests { struct ParentRequests { /// The blocks that have currently been downloaded. downloaded_blocks: Vec>, + /// The number of failed attempts to retrieve a parent block. If too many attempts occur, this /// lookup is failed and rejected. failed_attempts: usize, + /// The peer who last submitted a block. If the chain ends or fails, this is the peer that is /// downvoted. last_submitted_peer: PeerId, + /// The current state of the parent lookup. 
state: BlockRequestsState, } @@ -177,13 +196,16 @@ struct ParentRequests { impl BlockRequests { /// Gets the next start slot for a batch and transitions the state to a Queued state. fn update_start_slot(&mut self) { - match self.sync_direction { - SyncDirection::Initial | SyncDirection::Forwards => { - self.current_start_slot += Slot::from(MAX_BLOCKS_PER_REQUEST); - } - SyncDirection::Backwards => { - self.current_start_slot -= Slot::from(MAX_BLOCKS_PER_REQUEST); - } + // the last request may not have returned all the required blocks (hit the rpc size + // limit). If so, start from the last returned slot + if !self.downloaded_blocks.is_empty() + && self.downloaded_blocks[self.downloaded_blocks.len() - 1].slot + > self.current_start_slot + { + self.current_start_slot = self.downloaded_blocks[self.downloaded_blocks.len() - 1].slot + + Slot::from(BLOCKS_PER_REQUEST); + } else { + self.current_start_slot += Slot::from(BLOCKS_PER_REQUEST); } self.state = BlockRequestsState::Queued; } @@ -195,9 +217,11 @@ enum ManagerState { /// The manager is performing a long-range (batch) sync. In this mode, parent lookups are /// disabled. Syncing, + /// The manager is up to date with all known peers and is connected to at least one /// fully-syncing peer. In this state, parent lookups are enabled. Regular, + /// No useful peers are connected. Long-range sync's cannot proceed and we have no useful /// peers to download parents for. More peers need to be connected before we can proceed. Stalled, @@ -210,23 +234,34 @@ enum ManagerState { pub struct SyncManager { /// A weak reference to the underlying beacon chain. chain: Weak>, + /// The current state of the import manager. state: ManagerState, + /// A receiving channel sent by the message processor thread. input_channel: mpsc::UnboundedReceiver>, + /// A network context to contact the network service. network: NetworkContext, + /// A collection of `BlockRequest` per peer that is currently being downloaded. 
Used in the /// long-range (batch) sync process. import_queue: HashMap>, + /// A collection of parent block lookups. parent_queue: SmallVec<[ParentRequests; 3]>, + + /// A collection of block hashes being searched for + single_block_lookups: FnvHashMap, + /// The collection of known, connected, fully-sync'd peers. full_peers: HashSet, - /// The current request Id. This is used to keep track of responses to various outbound + + /// The current request id. This is used to keep track of responses to various outbound /// requests. This is an internal accounting mechanism, request id's are never sent to any /// peers. current_req_id: usize, + /// The logger for the import manager. log: Logger, } @@ -256,6 +291,7 @@ pub fn spawn( network, import_queue: HashMap::new(), parent_queue: SmallVec::new(), + single_block_lookups: FnvHashMap::default(), full_peers: HashSet::new(), current_req_id: 0, log: log.clone(), @@ -281,14 +317,12 @@ impl SyncManager { /// /// This function handles the logic associated with the connection of a new peer. If the peer /// is sufficiently ahead of our current head, a long-range (batch) sync is started and - /// batches of blocks are queued to download from the peer. Batched blocks begin at our - /// current head. If the resulting downloaded blocks are part of our current chain, we - /// continue with a forward sync. If not, we download blocks (in batches) backwards until we - /// reach a common ancestor. Batches are then processed and downloaded sequentially forwards. + /// batches of blocks are queued to download from the peer. Batched blocks begin at our latest + /// finalized head. /// /// If the peer is within the `SLOT_IMPORT_TOLERANCE`, then it's head is sufficiently close to /// ours that we consider it fully sync'd with respect to our current chain. 
- pub fn add_peer(&mut self, peer_id: PeerId, remote: PeerSyncInfo) { + fn add_peer(&mut self, peer_id: PeerId, remote: PeerSyncInfo) { // ensure the beacon chain still exists let chain = match self.chain.upgrade() { Some(chain) => chain, @@ -313,7 +347,6 @@ impl SyncManager { // remove the peer from the queue if it exists self.import_queue.remove(&peer_id); self.add_full_peer(peer_id); - // return; } @@ -340,35 +373,27 @@ impl SyncManager { } else { // not already downloading blocks from this peer let block_requests = BlockRequests { - target_head_slot: remote.head_slot, // this should be larger than the current head. It is checked in the SyncManager before add_peer is called + target_head_slot: remote.head_slot, // this should be larger than the current head. It is checked before add_peer is called target_head_root: remote.head_root, consecutive_empty_batches: 0, downloaded_blocks: Vec::new(), blocks_processed: 0, state: BlockRequestsState::Queued, - sync_direction: SyncDirection::Initial, - current_start_slot: chain.best_slot(), + current_start_slot: local + .finalized_epoch + .start_slot(T::EthSpec::slots_per_epoch()), }; self.import_queue.insert(peer_id, block_requests); } } - /// A `BeaconBlocks` request has received a response. This function process the response. - pub fn beacon_blocks_response( + /// A `BlocksByRange` request has received a response. This function process the response. + fn blocks_by_range_response( &mut self, peer_id: PeerId, request_id: RequestId, - mut blocks: Vec>, + block: Option>, ) { - // ensure the underlying chain still exists - let chain = match self.chain.upgrade() { - Some(chain) => chain, - None => { - trace!(self.log, "Chain dropped. 
Sync terminating"); - return; - } - }; - // find the request associated with this response let block_requests = match self .import_queue @@ -378,33 +403,42 @@ impl SyncManager { Some(req) => req, _ => { // No pending request, invalid request_id or coding error - warn!(self.log, "BeaconBlocks response unknown"; "request_id" => request_id); + warn!(self.log, "BlocksByRange response unknown"; "request_id" => request_id); return; } }; - // If we are syncing up to a target head block, at least the target head block should be - // returned. If we are syncing back to our last finalized block the request should return - // at least the last block we received (last known block). In diagram form: - // - // unknown blocks requested blocks downloaded blocks - // |-------------------|------------------------|------------------------| - // ^finalized slot ^ requested start slot ^ last known block ^ remote head + // add the downloaded block + if let Some(downloaded_block) = block { + // add the block to the request + block_requests.downloaded_blocks.push(downloaded_block); + return; + } + // the batch has finished processing, or terminated early + // TODO: The following requirement may need to be relaxed as a node could fork and prune + // their old head, given to us during a STATUS. + // If we are syncing up to a target head block, at least the target head block should be + // returned. 
+ let blocks = &block_requests.downloaded_blocks; if blocks.is_empty() { - debug!(self.log, "BeaconBlocks response was empty"; "request_id" => request_id); + debug!(self.log, "BlocksByRange response was empty"; "request_id" => request_id); block_requests.consecutive_empty_batches += 1; if block_requests.consecutive_empty_batches >= EMPTY_BATCH_TOLERANCE { warn!(self.log, "Peer returned too many empty block batches"; "peer" => format!("{:?}", peer_id)); block_requests.state = BlockRequestsState::Failed; - } else if block_requests.current_start_slot + MAX_BLOCKS_PER_REQUEST + } else if block_requests.current_start_slot + BLOCKS_PER_REQUEST >= block_requests.target_head_slot { warn!(self.log, "Peer did not return blocks it claimed to possess"; "peer" => format!("{:?}", peer_id)); - block_requests.state = BlockRequestsState::Failed; + // This could be due to a re-org causing the peer to prune their head. In this + // instance, we try to process what is currently downloaded, if there are blocks + // downloaded. + block_requests.state = BlockRequestsState::Complete; } else { + // this batch was empty, request the next batch block_requests.update_start_slot(); } return; @@ -416,12 +450,9 @@ impl SyncManager { // Note that the order of blocks is verified in block processing let last_sent_slot = blocks[blocks.len() - 1].slot; if block_requests.current_start_slot > blocks[0].slot - || block_requests - .current_start_slot - .add(MAX_BLOCKS_PER_REQUEST) - < last_sent_slot + || block_requests.current_start_slot.add(BLOCKS_PER_REQUEST) < last_sent_slot { - warn!(self.log, "BeaconBlocks response returned out of range blocks"; + warn!(self.log, "BlocksByRange response returned out of range blocks"; "request_id" => request_id, "response_initial_slot" => blocks[0].slot, "requested_initial_slot" => block_requests.current_start_slot); @@ -431,87 +462,35 @@ impl SyncManager { return; } - // Determine if more blocks need to be downloaded. 
There are a few cases: - // - We are in initial sync mode - We have requested blocks and need to determine if this - // is part of a known chain to determine the whether to start syncing backwards or continue - // syncing forwards. - // - We are syncing backwards and need to verify if we have found a common ancestor in - // order to start processing the downloaded blocks. - // - We are syncing forwards. We mark this as complete and check if any further blocks are - // required to download when processing the batch. - - match block_requests.sync_direction { - SyncDirection::Initial => { - block_requests.downloaded_blocks.append(&mut blocks); - - // this batch is the first batch downloaded. Check if we can process or if we need - // to backwards search. - - //TODO: Decide which is faster. Reading block from db and comparing or calculating - //the hash tree root and comparing. - let earliest_slot = block_requests.downloaded_blocks[0].slot; - if Some(block_requests.downloaded_blocks[0].canonical_root()) - == chain.root_at_slot(earliest_slot) - { - // we have a common head, start processing and begin a forwards sync - block_requests.sync_direction = SyncDirection::Forwards; - block_requests.state = BlockRequestsState::ReadyToProcess; - return; - } - // no common head, begin a backwards search - block_requests.sync_direction = SyncDirection::Backwards; - block_requests.current_start_slot = - std::cmp::min(chain.best_slot(), block_requests.downloaded_blocks[0].slot); - block_requests.update_start_slot(); - } - SyncDirection::Forwards => { - // continue processing all blocks forwards, verify the end in the processing - block_requests.downloaded_blocks.append(&mut blocks); - block_requests.state = BlockRequestsState::ReadyToProcess; - } - SyncDirection::Backwards => { - block_requests.downloaded_blocks.splice(..0, blocks); - - // verify the request hasn't failed by having no common ancestor chain - // get our local finalized_slot - let local_finalized_slot = { - let state = 
&chain.head().beacon_state; - state - .finalized_checkpoint - .epoch - .start_slot(T::EthSpec::slots_per_epoch()) - }; - - if local_finalized_slot >= block_requests.current_start_slot { - warn!(self.log, "Peer returned an unknown chain."; "request_id" => request_id); - block_requests.state = BlockRequestsState::Failed; - return; - } - - // check if we have reached a common chain ancestor - let earliest_slot = block_requests.downloaded_blocks[0].slot; - if Some(block_requests.downloaded_blocks[0].canonical_root()) - == chain.root_at_slot(earliest_slot) - { - // we have a common head, start processing and begin a forwards sync - block_requests.sync_direction = SyncDirection::Forwards; - block_requests.state = BlockRequestsState::ReadyToProcess; - return; - } - - // no common chain, haven't passed last_finalized_head, so continue backwards - // search - block_requests.update_start_slot(); - } - } + // Process this batch + block_requests.state = BlockRequestsState::ReadyToProcess; } - pub fn recent_blocks_response( + /// The response to a `BlocksByRoot` request. + /// The current implementation takes one block at a time. As blocks are streamed, any + /// subsequent blocks will simply be ignored. 
+ /// There are two reasons we could have received a BlocksByRoot response + /// - We requested a single hash and have received a response for the single_block_lookup + /// - We are looking up parent blocks in parent lookup search + fn blocks_by_root_response( &mut self, peer_id: PeerId, request_id: RequestId, - mut blocks: Vec>, + block: Option>, ) { + // check if this is a single block lookup - i.e we were searching for a specific hash + if block.is_some() { + if let Some(block_hash) = self.single_block_lookups.remove(&request_id) { + self.single_block_lookup_response( + peer_id, + block.expect("block exists"), + block_hash, + ); + return; + } + } + + // this should be a response to a parent request search // find the request let parent_request = match self .parent_queue @@ -520,40 +499,78 @@ impl SyncManager { { Some(req) => req, None => { - // No pending request, invalid request_id or coding error - warn!(self.log, "RecentBeaconBlocks response unknown"; "request_id" => request_id); + if block.is_some() { + // No pending request, invalid request_id or coding error + warn!(self.log, "BlocksByRoot response unknown"; "request_id" => request_id); + } + // it could be a stream termination None, in which case we just ignore it return; } }; + match block { + Some(block) => { + // add the block to response + parent_request.downloaded_blocks.push(block); - // if an empty response is given, the peer didn't have the requested block, try again - if blocks.is_empty() { - parent_request.failed_attempts += 1; - parent_request.state = BlockRequestsState::Queued; - parent_request.last_submitted_peer = peer_id; - return; + // queue for processing + parent_request.state = BlockRequestsState::ReadyToProcess; + } + None => { + // if an empty response is given, the peer didn't have the requested block, try again + parent_request.failed_attempts += 1; + parent_request.state = BlockRequestsState::Queued; + parent_request.last_submitted_peer = peer_id; + return; + } } - - // currently 
only support a single block lookup. Reject any response that has more than 1 - // block - if blocks.len() != 1 { - //TODO: Potentially downvote the peer - debug!(self.log, "Peer sent more than 1 parent. Ignoring"; - "peer_id" => format!("{:?}", peer_id), - "no_parents" => blocks.len() - ); - return; - } - - // add the block to response - parent_request - .downloaded_blocks - .push(blocks.pop().expect("must exist")); - - // queue for processing - parent_request.state = BlockRequestsState::ReadyToProcess; } + /// Processes the response obtained from a single block lookup search. If the block is + /// processed or errors, the search ends. If the blocks parent is unknown, a block parent + /// lookup search is started. + fn single_block_lookup_response( + &mut self, + peer_id: PeerId, + block: BeaconBlock, + expected_block_hash: Hash256, + ) { + // verify the hash is correct and try and process the block + if expected_block_hash != block.canonical_root() { + // the peer that sent this, sent us the wrong block + downvote_peer(&mut self.network, &self.log, peer_id); + return; + } + + // we have the correct block, try and process it + if let Some(chain) = self.chain.upgrade() { + match chain.process_block(block.clone()) { + Ok(outcome) => { + match outcome { + BlockProcessingOutcome::Processed { block_root } => { + info!(self.log, "Processed block"; "block" => format!("{}", block_root)); + } + BlockProcessingOutcome::ParentUnknown { parent: _ } => { + // We don't know of the blocks parent, begin a parent lookup search + self.add_unknown_block(peer_id, block); + } + BlockProcessingOutcome::BlockIsAlreadyKnown => { + trace!(self.log, "Single block lookup already known"); + } + _ => { + warn!(self.log, "Single block lookup failed"; "outcome" => format!("{:?}", outcome)); + downvote_peer(&mut self.network, &self.log, peer_id); + } + } + } + Err(e) => { + warn!(self.log, "Unexpected block processing error"; "error" => format!("{:?}", e)); + } + } + } + } + + /// A block has 
been sent to us that has an unknown parent. This begins a parent lookup search + /// to find the parent or chain of parents that match our current chain. fn add_unknown_block(&mut self, peer_id: PeerId, block: BeaconBlock) { // if we are not in regular sync mode, ignore this block if self.state != ManagerState::Regular { @@ -583,8 +600,53 @@ impl SyncManager { self.parent_queue.push(req); } - fn inject_error(&mut self, _id: RequestId) { - //TODO: Remove block state from pending + /// A request to search for a block hash has been received. This function begins a BlocksByRoot + /// request to find the requested block. + fn search_for_block(&mut self, peer_id: PeerId, block_hash: Hash256) { + let request_id = self.current_req_id; + self.single_block_lookups.insert(request_id, block_hash); + self.current_req_id += 1; + let request = BlocksByRootRequest { + block_roots: vec![block_hash], + }; + blocks_by_root_request(&mut self.network, &self.log, peer_id, request_id, request); + } + + fn inject_error(&mut self, peer_id: PeerId, request_id: RequestId) { + trace!(self.log, "Sync manager received a failed RPC"); + // remove any single block lookups + self.single_block_lookups.remove(&request_id); + + // find the request associated with this response + if let Some(block_requests) = self + .import_queue + .get_mut(&peer_id) + .filter(|r| r.state == BlockRequestsState::Pending(request_id)) + { + // TODO: Potentially implement a tolerance. 
For now, we try to process what have been + // downloaded + if !block_requests.downloaded_blocks.is_empty() { + block_requests.current_start_slot = block_requests + .downloaded_blocks + .last() + .expect("is not empty") + .slot; + block_requests.state = BlockRequestsState::ReadyToProcess; + } else { + block_requests.state = BlockRequestsState::Failed; + } + }; + + // increment the failure of a parent lookup if the request matches a parent search + if let Some(parent_req) = self + .parent_queue + .iter_mut() + .find(|request| request.state == BlockRequestsState::Pending(request_id)) + { + parent_req.failed_attempts += 1; + parent_req.state = BlockRequestsState::Queued; + parent_req.last_submitted_peer = peer_id; + } } fn peer_disconnect(&mut self, peer_id: &PeerId) { @@ -626,23 +688,32 @@ impl SyncManager { fn process_potential_block_requests(&mut self) { // check if an outbound request is required + // Managing a fixed number of outbound requests is maintained at the RPC protocol libp2p // layer and not needed here. Therefore we create many outbound requests and let the RPC - // handle the number of simultaneous requests. Request all queued objects. + // handle the number of simultaneous requests. + // Request all queued objects. 
// remove any failed batches let debug_log = &self.log; let full_peer_ref = &mut self.full_peers; self.import_queue.retain(|peer_id, block_request| { - if let BlockRequestsState::Failed = block_request.state { - debug!(debug_log, "Block import from peer failed"; - "peer_id" => format!("{:?}", peer_id), - "downloaded_blocks" => block_request.blocks_processed - ); - full_peer_ref.remove(peer_id); - false - } else { - true + match block_request.state { + BlockRequestsState::Failed => { + debug!(debug_log, "Block import from peer failed"; + "peer_id" => format!("{:?}", peer_id), + "downloaded_blocks" => block_request.blocks_processed + ); + full_peer_ref.remove(peer_id); + false + } + BlockRequestsState::Complete => { + debug!(debug_log, "Block import from peer completed"; + "peer_id" => format!("{:?}", peer_id), + ); + false + } + _ => true, // keep all other states } }); @@ -653,13 +724,13 @@ impl SyncManager { block_requests.state = BlockRequestsState::Pending(request_id); self.current_req_id += 1; - let request = BeaconBlocksRequest { + let request = BlocksByRangeRequest { head_block_root: block_requests.target_head_root, start_slot: block_requests.current_start_slot.as_u64(), - count: MAX_BLOCKS_PER_REQUEST, + count: BLOCKS_PER_REQUEST, step: 0, }; - request_blocks( + blocks_by_range_request( &mut self.network, &self.log, peer_id.clone(), @@ -684,9 +755,12 @@ impl SyncManager { if block_requests.state == BlockRequestsState::ReadyToProcess { let downloaded_blocks = std::mem::replace(&mut block_requests.downloaded_blocks, Vec::new()); - let last_element = downloaded_blocks.len() - 1; + let end_slot = downloaded_blocks + .last() + .expect("Batches to be processed should not be empty") + .slot; + let total_blocks = downloaded_blocks.len(); let start_slot = downloaded_blocks[0].slot; - let end_slot = downloaded_blocks[last_element].slot; match process_blocks(chain_ref.clone(), downloaded_blocks, log_ref) { Ok(()) => { @@ -694,15 +768,15 @@ impl SyncManager { "peer" => 
format!("{:?}", peer_id), "start_slot" => start_slot, "end_slot" => end_slot, - "no_blocks" => last_element + 1, + "no_blocks" => total_blocks, ); - block_requests.blocks_processed += last_element + 1; + block_requests.blocks_processed += total_blocks; // check if the batch is complete, by verifying if we have reached the // target head if end_slot >= block_requests.target_head_slot { - // Completed, re-hello the peer to ensure we are up to the latest head - hello_peer(network_ref, log_ref, chain_ref.clone(), peer_id.clone()); + // Completed, re-status the peer to ensure we are up to the latest head + status_peer(network_ref, log_ref, chain_ref.clone(), peer_id.clone()); // remove the request false } else { @@ -718,7 +792,7 @@ impl SyncManager { "peer" => format!("{:?}", peer_id), "start_slot" => start_slot, "end_slot" => end_slot, - "no_blocks" => last_element + 1, + "no_blocks" => total_blocks, "error" => format!("{:?}", e), ); downvote_peer(network_ref, log_ref, peer_id.clone()); @@ -771,14 +845,14 @@ impl SyncManager { self.current_req_id += 1; let last_element_index = parent_request.downloaded_blocks.len() - 1; let parent_hash = parent_request.downloaded_blocks[last_element_index].parent_root; - let request = RecentBeaconBlocksRequest { + let request = BlocksByRootRequest { block_roots: vec![parent_hash], }; // select a random fully synced peer to attempt to download the parent block let peer_id = self.full_peers.iter().next().expect("List is not empty"); - recent_blocks_request( + blocks_by_root_request( &mut self.network, &self.log, peer_id.clone(), @@ -800,17 +874,29 @@ impl SyncManager { .filter(|req| req.state == BlockRequestsState::ReadyToProcess) { // verify the last added block is the parent of the last requested block - let last_index = completed_request.downloaded_blocks.len() - 1; - let expected_hash = completed_request.downloaded_blocks[last_index].parent_root; - // Note: the length must be greater than 1 so this cannot panic. 
- let block_hash = completed_request.downloaded_blocks[last_index - 1].canonical_root(); + + if completed_request.downloaded_blocks.len() < 2 { + crit!( + self.log, + "There must be at least two blocks in a parent request lookup at all times" + ); + panic!("There must be at least two blocks in parent request lookup at all time"); + // fail loudly + } + let previous_index = completed_request.downloaded_blocks.len() - 2; + let expected_hash = completed_request.downloaded_blocks[previous_index].parent_root; + // Note: the length must be at least 2 (checked above) so this cannot panic. + let block_hash = completed_request + .downloaded_blocks + .last() + .expect("Complete batch cannot be empty") + .canonical_root(); if block_hash != expected_hash { // remove the head block let _ = completed_request.downloaded_blocks.pop(); completed_request.state = BlockRequestsState::Queued; - //TODO: Potentially downvote the peer let peer = completed_request.last_submitted_peer.clone(); - debug!(self.log, "Peer sent invalid parent. Ignoring"; + debug!(self.log, "Peer sent invalid parent."; "peer_id" => format!("{:?}",peer), "received_block" => format!("{}", block_hash), "expected_parent" => format!("{}", expected_hash), @@ -836,7 +922,8 @@ impl SyncManager { re_run_poll = true; break; } - Ok(BlockProcessingOutcome::Processed { block_root: _ }) => {} + Ok(BlockProcessingOutcome::Processed { block_root: _ }) + | Ok(BlockProcessingOutcome::BlockIsAlreadyKnown { ..
}) => {} Ok(outcome) => { // it's a future slot or an invalid block, remove it and try again completed_request.failed_attempts += 1; @@ -891,7 +978,7 @@ impl SyncManager { /* Network Context Helper Functions */ -fn hello_peer( +fn status_peer( network: &mut NetworkContext, log: &slog::Logger, chain: Weak>, @@ -899,26 +986,26 @@ fn hello_peer( ) { trace!( log, - "RPC Request"; - "method" => "HELLO", + "Sending Status Request"; + "method" => "STATUS", "peer" => format!("{:?}", peer_id) ); if let Some(chain) = chain.upgrade() { - network.send_rpc_request(None, peer_id, RPCRequest::Hello(hello_message(&chain))); + network.send_rpc_request(None, peer_id, RPCRequest::Status(status_message(&chain))); } } -fn request_blocks( +fn blocks_by_range_request( network: &mut NetworkContext, log: &slog::Logger, peer_id: PeerId, request_id: RequestId, - request: BeaconBlocksRequest, + request: BlocksByRangeRequest, ) { trace!( log, - "RPC Request"; - "method" => "BeaconBlocks", + "Sending BlocksByRange Request"; + "method" => "BlocksByRange", "id" => request_id, "count" => request.count, "peer" => format!("{:?}", peer_id) @@ -926,28 +1013,28 @@ fn request_blocks( network.send_rpc_request( Some(request_id), peer_id.clone(), - RPCRequest::BeaconBlocks(request), + RPCRequest::BlocksByRange(request), ); } -fn recent_blocks_request( +fn blocks_by_root_request( network: &mut NetworkContext, log: &slog::Logger, peer_id: PeerId, request_id: RequestId, - request: RecentBeaconBlocksRequest, + request: BlocksByRootRequest, ) { trace!( log, - "RPC Request"; - "method" => "RecentBeaconBlocks", + "Sending BlocksByRoot Request"; + "method" => "BlocksByRoot", "count" => request.block_roots.len(), "peer" => format!("{:?}", peer_id) ); network.send_rpc_request( Some(request_id), peer_id.clone(), - RPCRequest::RecentBeaconBlocks(request), + RPCRequest::BlocksByRoot(request), ); } @@ -1080,28 +1167,31 @@ impl Future for SyncManager { SyncMessage::AddPeer(peer_id, info) => { self.add_peer(peer_id, 
info); } - SyncMessage::BeaconBlocksResponse { + SyncMessage::BlocksByRangeResponse { peer_id, request_id, - beacon_blocks, + beacon_block, } => { - self.beacon_blocks_response(peer_id, request_id, beacon_blocks); + self.blocks_by_range_response(peer_id, request_id, beacon_block); } - SyncMessage::RecentBeaconBlocksResponse { + SyncMessage::BlocksByRootResponse { peer_id, request_id, - beacon_blocks, + beacon_block, } => { - self.recent_blocks_response(peer_id, request_id, beacon_blocks); + self.blocks_by_root_response(peer_id, request_id, beacon_block); } SyncMessage::UnknownBlock(peer_id, block) => { self.add_unknown_block(peer_id, block); } + SyncMessage::UnknownBlockHash(peer_id, block_hash) => { + self.search_for_block(peer_id, block_hash); + } SyncMessage::Disconnect(peer_id) => { self.peer_disconnect(&peer_id); } - SyncMessage::_RPCError(request_id) => { - self.inject_error(request_id); + SyncMessage::RPCError(peer_id, request_id) => { + self.inject_error(peer_id, request_id); } }, Ok(Async::NotReady) => break, diff --git a/beacon_node/network/src/sync/simple_sync.rs b/beacon_node/network/src/sync/message_processor.rs similarity index 64% rename from beacon_node/network/src/sync/simple_sync.rs rename to beacon_node/network/src/sync/message_processor.rs index 049c2a673..dd9be8990 100644 --- a/beacon_node/network/src/sync/simple_sync.rs +++ b/beacon_node/network/src/sync/message_processor.rs @@ -14,7 +14,6 @@ use tokio::sync::{mpsc, oneshot}; use tree_hash::SignedRoot; use types::{Attestation, BeaconBlock, Epoch, EthSpec, Hash256, Slot}; -//TODO: Put a maximum limit on the number of block that can be requested. //TODO: Rate limit requests /// If a block is more than `FUTURE_SLOT_TOLERANCE` slots ahead of our slot clock, we drop it. 
@@ -34,21 +33,21 @@ pub struct PeerSyncInfo { pub head_slot: Slot, } -impl From for PeerSyncInfo { - fn from(hello: HelloMessage) -> PeerSyncInfo { +impl From for PeerSyncInfo { + fn from(status: StatusMessage) -> PeerSyncInfo { PeerSyncInfo { - fork_version: hello.fork_version, - finalized_root: hello.finalized_root, - finalized_epoch: hello.finalized_epoch, - head_root: hello.head_root, - head_slot: hello.head_slot, + fork_version: status.fork_version, + finalized_root: status.finalized_root, + finalized_epoch: status.finalized_epoch, + head_root: status.head_root, + head_slot: status.head_slot, } } } impl From<&Arc>> for PeerSyncInfo { fn from(chain: &Arc>) -> PeerSyncInfo { - Self::from(hello_message(chain)) + Self::from(status_message(chain)) } } @@ -111,49 +110,58 @@ impl MessageProcessor { self.send_to_sync(SyncMessage::Disconnect(peer_id)); } - /// Handle the connection of a new peer. - /// - /// Sends a `Hello` message to the peer. - pub fn on_connect(&mut self, peer_id: PeerId) { - self.network - .send_rpc_request(None, peer_id, RPCRequest::Hello(hello_message(&self.chain))); + /// An error occurred during an RPC request. The state is maintained by the sync manager, so + /// this function notifies the sync manager of the error. + pub fn on_rpc_error(&mut self, peer_id: PeerId, request_id: RequestId) { + self.send_to_sync(SyncMessage::RPCError(peer_id, request_id)); } - /// Handle a `Hello` request. + /// Handle the connection of a new peer. /// - /// Processes the `HelloMessage` from the remote peer and sends back our `Hello`. - pub fn on_hello_request( + /// Sends a `Status` message to the peer. + pub fn on_connect(&mut self, peer_id: PeerId) { + self.network.send_rpc_request( + None, + peer_id, + RPCRequest::Status(status_message(&self.chain)), + ); + } + + /// Handle a `Status` request. + /// + /// Processes the `Status` from the remote peer and sends back our `Status`. 
+ pub fn on_status_request( &mut self, peer_id: PeerId, request_id: RequestId, - hello: HelloMessage, + status: StatusMessage, ) { - // ignore hello responses if we are shutting down - trace!(self.log, "HelloRequest"; "peer" => format!("{:?}", peer_id)); + // ignore status responses if we are shutting down + trace!(self.log, "StatusRequest"; "peer" => format!("{:?}", peer_id)); - // Say hello back. + // Say status back. self.network.send_rpc_response( peer_id.clone(), request_id, - RPCResponse::Hello(hello_message(&self.chain)), + RPCResponse::Status(status_message(&self.chain)), ); - self.process_hello(peer_id, hello); + self.process_status(peer_id, status); } - /// Process a `Hello` response from a peer. - pub fn on_hello_response(&mut self, peer_id: PeerId, hello: HelloMessage) { - trace!(self.log, "HelloResponse"; "peer" => format!("{:?}", peer_id)); + /// Process a `Status` response from a peer. + pub fn on_status_response(&mut self, peer_id: PeerId, status: StatusMessage) { + trace!(self.log, "StatusResponse"; "peer" => format!("{:?}", peer_id)); - // Process the hello message, without sending back another hello. - self.process_hello(peer_id, hello); + // Process the status message, without sending back another status. + self.process_status(peer_id, status); } - /// Process a `Hello` message, requesting new blocks if appropriate. + /// Process a `Status` message, requesting new blocks if appropriate. /// /// Disconnects the peer if required. - fn process_hello(&mut self, peer_id: PeerId, hello: HelloMessage) { - let remote = PeerSyncInfo::from(hello); + fn process_status(&mut self, peer_id: PeerId, status: StatusMessage) { + let remote = PeerSyncInfo::from(status); let local = PeerSyncInfo::from(&self.chain); let start_slot = |epoch: Epoch| epoch.start_slot(T::EthSpec::slots_per_epoch()); @@ -161,11 +169,27 @@ impl MessageProcessor { if local.fork_version != remote.fork_version { // The node is on a different network/fork, disconnect them. 
debug!( - self.log, "HandshakeFailure"; + self.log, "Handshake Failure"; "peer" => format!("{:?}", peer_id), "reason" => "network_id" ); + self.network + .disconnect(peer_id.clone(), GoodbyeReason::IrrelevantNetwork); + } else if remote.head_slot + > self.chain.slot().unwrap_or_else(|_| Slot::from(0u64)) + FUTURE_SLOT_TOLERANCE + { + // Note: If the slot_clock cannot be read, this will not error. Other system + // components will deal with an invalid slot clock error. + + // The remote's head is on a slot that is significantly ahead of ours. This could be + // because they are using a different genesis time, or because their system clock or + // ours is incorrect. + debug!( + self.log, "Handshake Failure"; + "peer" => format!("{:?}", peer_id), + "reason" => "different system clocks or genesis time" + ); self.network .disconnect(peer_id.clone(), GoodbyeReason::IrrelevantNetwork); } else if remote.finalized_epoch <= local.finalized_epoch @@ -179,7 +203,7 @@ impl MessageProcessor { // // Therefore, the node is on a different chain and we should not communicate with them. debug!( - self.log, "HandshakeFailure"; + self.log, "Handshake Failure"; "peer" => format!("{:?}", peer_id), "reason" => "different finalized chain" ); @@ -236,57 +260,57 @@ impl MessageProcessor { } } - /// Handle a `RecentBeaconBlocks` request from the peer. - pub fn on_recent_beacon_blocks_request( + /// Handle a `BlocksByRoot` request from the peer. 
+ pub fn on_blocks_by_root_request( &mut self, peer_id: PeerId, request_id: RequestId, - request: RecentBeaconBlocksRequest, + request: BlocksByRootRequest, ) { - let blocks: Vec> = request - .block_roots - .iter() - .filter_map(|root| { - if let Ok(Some(block)) = self.chain.store.get::>(root) { - Some(block) - } else { - debug!( - self.log, - "Peer requested unknown block"; - "peer" => format!("{:?}", peer_id), - "request_root" => format!("{:}", root), - ); - - None - } - }) - .collect(); - + let mut send_block_count = 0; + for root in request.block_roots.iter() { + if let Ok(Some(block)) = self.chain.store.get::>(root) { + self.network.send_rpc_response( + peer_id.clone(), + request_id, + RPCResponse::BlocksByRoot(block.as_ssz_bytes()), + ); + send_block_count += 1; + } else { + debug!( + self.log, + "Peer requested unknown block"; + "peer" => format!("{:?}", peer_id), + "request_root" => format!("{:}", root), + ); + } + } debug!( self.log, - "RecentBeaconBlocksRequest"; + "Received BlocksByRoot Request"; "peer" => format!("{:?}", peer_id), "requested" => request.block_roots.len(), - "returned" => blocks.len(), + "returned" => send_block_count, ); - self.network.send_rpc_response( + // send stream termination + self.network.send_rpc_error_response( peer_id, request_id, - RPCResponse::BeaconBlocks(blocks.as_ssz_bytes()), - ) + RPCErrorResponse::StreamTermination(ResponseTermination::BlocksByRoot), + ); } - /// Handle a `BeaconBlocks` request from the peer. - pub fn on_beacon_blocks_request( + /// Handle a `BlocksByRange` request from the peer. 
+ pub fn on_blocks_by_range_request( &mut self, peer_id: PeerId, request_id: RequestId, - req: BeaconBlocksRequest, + req: BlocksByRangeRequest, ) { debug!( self.log, - "BeaconBlocksRequest"; + "Received BlocksByRange Request"; "peer" => format!("{:?}", peer_id), "count" => req.count, "start_slot" => req.start_slot, @@ -297,6 +321,10 @@ impl MessageProcessor { // In the current implementation we read from the db then filter out out-of-range blocks. // Improving the db schema to prevent this would be ideal. + //TODO: This really needs to be read forward for infinite streams + // We should be reading the first block from the db, sending, then reading the next... we + // need a forwards iterator!! + let mut blocks: Vec> = self .chain .rev_iter_block_roots() @@ -313,7 +341,6 @@ impl MessageProcessor { "Block in the chain is not in the store"; "request_root" => format!("{:}", root), ); - None } }) @@ -323,63 +350,82 @@ impl MessageProcessor { blocks.reverse(); blocks.dedup_by_key(|brs| brs.slot); - debug!( - self.log, - "BeaconBlocksRequest response"; - "peer" => format!("{:?}", peer_id), - "msg" => "Failed to return all requested hashes", - "start_slot" => req.start_slot, - "current_slot" => self.chain.slot().unwrap_or_else(|_| Slot::from(0_u64)).as_u64(), - "requested" => req.count, - "returned" => blocks.len(), - ); + if blocks.len() < (req.count as usize) { + debug!( + self.log, + "Sending BlocksByRange Response"; + "peer" => format!("{:?}", peer_id), + "msg" => "Failed to return all requested blocks", + "start_slot" => req.start_slot, + "current_slot" => self.chain.slot().unwrap_or_else(|_| Slot::from(0_u64)).as_u64(), + "requested" => req.count, + "returned" => blocks.len(), + ); + } else { + trace!( + self.log, + "Sending BlocksByRange Response"; + "peer" => format!("{:?}", peer_id), + "start_slot" => req.start_slot, + "current_slot" => self.chain.slot().unwrap_or_else(|_| Slot::from(0_u64)).as_u64(), + "requested" => req.count, + "returned" => blocks.len(), + ); 
+ } - self.network.send_rpc_response( + for block in blocks { + self.network.send_rpc_response( + peer_id.clone(), + request_id, + RPCResponse::BlocksByRange(block.as_ssz_bytes()), + ); + } + // send the stream terminator + self.network.send_rpc_error_response( peer_id, request_id, - RPCResponse::BeaconBlocks(blocks.as_ssz_bytes()), - ) + RPCErrorResponse::StreamTermination(ResponseTermination::BlocksByRange), + ); } - /// Handle a `BeaconBlocks` response from the peer. - pub fn on_beacon_blocks_response( + /// Handle a `BlocksByRange` response from the peer. + /// A `beacon_block` behaves as a stream which is terminated on a `None` response. + pub fn on_blocks_by_range_response( &mut self, peer_id: PeerId, request_id: RequestId, - beacon_blocks: Vec>, + beacon_block: Option>, ) { - debug!( + trace!( self.log, - "BeaconBlocksResponse"; + "Received BlocksByRange Response"; "peer" => format!("{:?}", peer_id), - "count" => beacon_blocks.len(), ); - self.send_to_sync(SyncMessage::BeaconBlocksResponse { + self.send_to_sync(SyncMessage::BlocksByRangeResponse { peer_id, request_id, - beacon_blocks, + beacon_block, }); } - /// Handle a `RecentBeaconBlocks` response from the peer. - pub fn on_recent_beacon_blocks_response( + /// Handle a `BlocksByRoot` response from the peer. + pub fn on_blocks_by_root_response( &mut self, peer_id: PeerId, request_id: RequestId, - beacon_blocks: Vec>, + beacon_block: Option>, ) { - debug!( + trace!( self.log, - "RecentBeaconBlocksResponse"; + "Received BlocksByRoot Response"; "peer" => format!("{:?}", peer_id), - "count" => beacon_blocks.len(), ); - self.send_to_sync(SyncMessage::RecentBeaconBlocksResponse { + self.send_to_sync(SyncMessage::BlocksByRootResponse { peer_id, request_id, - beacon_blocks, + beacon_block, }); } @@ -447,24 +493,36 @@ impl MessageProcessor { /// Process a gossip message declaring a new attestation. /// /// Not currently implemented. 
- pub fn on_attestation_gossip(&mut self, _peer_id: PeerId, msg: Attestation) { + pub fn on_attestation_gossip(&mut self, peer_id: PeerId, msg: Attestation) { match self.chain.process_attestation(msg.clone()) { - Ok(outcome) => { - info!( - self.log, - "Processed attestation"; - "source" => "gossip", - "outcome" => format!("{:?}", outcome) - ); - - if outcome != AttestationProcessingOutcome::Processed { - trace!( + Ok(outcome) => match outcome { + AttestationProcessingOutcome::Processed => { + info!( self.log, - "Invalid gossip attestation ssz"; - "ssz" => format!("0x{}", hex::encode(msg.as_ssz_bytes())), + "Processed attestation"; + "source" => "gossip", + "outcome" => format!("{:?}", outcome) ); } - } + AttestationProcessingOutcome::UnknownHeadBlock { beacon_block_root } => { + // TODO: Maintain this attestation and re-process once sync completes + debug!( + self.log, + "Attestation for unknown block"; + "peer_id" => format!("{:?}", peer_id), + "block" => format!("{}", beacon_block_root) + ); + // we don't know the block, get the sync manager to handle the block lookup + self.send_to_sync(SyncMessage::UnknownBlockHash(peer_id, beacon_block_root)); + } + AttestationProcessingOutcome::AttestsToFutureState { .. } + | AttestationProcessingOutcome::FinalizedSlot { .. } => {} // ignore the attestation + AttestationProcessingOutcome::Invalid { .. } + | AttestationProcessingOutcome::EmptyAggregationBitfield { .. } => { + // the peer has sent a bad attestation. Remove them. + self.network.disconnect(peer_id, GoodbyeReason::Fault); + } + }, Err(e) => { trace!( self.log, @@ -477,11 +535,11 @@ impl MessageProcessor { } } -/// Build a `HelloMessage` representing the state of the given `beacon_chain`. -pub(crate) fn hello_message(beacon_chain: &BeaconChain) -> HelloMessage { +/// Build a `StatusMessage` representing the state of the given `beacon_chain`. 
+pub(crate) fn status_message(beacon_chain: &BeaconChain) -> StatusMessage { let state = &beacon_chain.head().beacon_state; - HelloMessage { + StatusMessage { fork_version: state.fork.current_version, finalized_root: state.finalized_checkpoint.root, finalized_epoch: state.finalized_checkpoint.epoch, @@ -510,8 +568,15 @@ impl NetworkContext { "reason" => format!("{:?}", reason), "peer_id" => format!("{:?}", peer_id), ); - self.send_rpc_request(None, peer_id, RPCRequest::Goodbye(reason)) - // TODO: disconnect peers. + self.send_rpc_request(None, peer_id.clone(), RPCRequest::Goodbye(reason)); + self.network_send + .try_send(NetworkMessage::Disconnect { peer_id }) + .unwrap_or_else(|_| { + warn!( + self.log, + "Could not send a Disconnect to the network service" + ) + }); } pub fn send_rpc_request( @@ -525,7 +590,7 @@ impl NetworkContext { self.send_rpc_event(peer_id, RPCEvent::Request(request_id, rpc_request)); } - //TODO: Handle Error responses + /// Convenience function to wrap successful RPC Responses. pub fn send_rpc_response( &mut self, peer_id: PeerId, @@ -538,6 +603,16 @@ impl NetworkContext { ); } + /// Send an RPCErrorResponse. This handles errors and stream terminations. + pub fn send_rpc_error_response( + &mut self, + peer_id: PeerId, + request_id: RequestId, + rpc_error_response: RPCErrorResponse, + ) { + self.send_rpc_event(peer_id, RPCEvent::Response(request_id, rpc_error_response)); + } + fn send_rpc_event(&mut self, peer_id: PeerId, rpc_event: RPCEvent) { self.network_send .try_send(NetworkMessage::RPC(peer_id, rpc_event)) diff --git a/beacon_node/network/src/sync/mod.rs b/beacon_node/network/src/sync/mod.rs index 58ec386aa..b8f575075 100644 --- a/beacon_node/network/src/sync/mod.rs +++ b/beacon_node/network/src/sync/mod.rs @@ -1,10 +1,10 @@ +//! Syncing for lighthouse. +//! +//! Stores the various syncing methods for the beacon chain. mod manager; -/// Syncing for lighthouse. -/// -/// Stores the various syncing methods for the beacon chain. 
-mod simple_sync; +mod message_processor; -pub use simple_sync::MessageProcessor; +pub use message_processor::MessageProcessor; /// Currently implemented sync methods. pub enum SyncMethod { diff --git a/beacon_node/src/cli.rs b/beacon_node/src/cli.rs index bb07ea1ff..e0ea98543 100644 --- a/beacon_node/src/cli.rs +++ b/beacon_node/src/cli.rs @@ -221,6 +221,13 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> { .help("If present, append a random string to the datadir path. Useful for fast development \ iteration.") ) + .arg( + Arg::with_name("random-propagation") + .long("random-propagation") + .value_name("INTEGER") + .takes_value(true) + .help("Specifies (as a percentage) the likelihood of propagating blocks and attestations. This should only be used for testing networking elements. The value must lie in the range 1-100.") + ) .arg( Arg::with_name("force") .long("force") diff --git a/beacon_node/src/config.rs b/beacon_node/src/config.rs index 05193cf16..20668fb9c 100644 --- a/beacon_node/src/config.rs +++ b/beacon_node/src/config.rs @@ -107,6 +107,16 @@ fn process_testnet_subcommand( builder.clean_datadir()?; } + if let Some(propagation_percentage_string) = cli_args.value_of("random-propagation") { + let percentage = propagation_percentage_string + .parse::() + .map_err(|_| format!("Unable to parse the propagation percentage"))?; + if percentage > 100 { + return Err(format!("Propagation percentage greater than 100")); + } + builder.client_config.network.propagation_percentage = Some(percentage); + } + let is_bootstrap = cli_args.subcommand_name() == Some("bootstrap"); if let Some(path_string) = cli_args.value_of("eth2-config") {