From b4a1a2e483fdcc20348d7310c7595af9a829ffe4 Mon Sep 17 00:00:00 2001 From: divma Date: Sun, 3 May 2020 08:17:12 -0500 Subject: [PATCH] Better handling of RPC errors and RPC conn with the PeerManager (#1047) --- beacon_node/eth2-libp2p/src/behaviour.rs | 16 +- beacon_node/eth2-libp2p/src/discovery/mod.rs | 2 +- .../eth2-libp2p/src/peer_manager/client.rs | 4 +- .../eth2-libp2p/src/peer_manager/mod.rs | 291 +++++++++++++----- .../eth2-libp2p/src/peer_manager/peerdb.rs | 96 ++++-- beacon_node/eth2-libp2p/src/rpc/codec/base.rs | 14 +- beacon_node/eth2-libp2p/src/rpc/codec/mod.rs | 6 +- beacon_node/eth2-libp2p/src/rpc/codec/ssz.rs | 46 +-- .../eth2-libp2p/src/rpc/codec/ssz_snappy.rs | 39 +-- beacon_node/eth2-libp2p/src/rpc/handler.rs | 277 +++++++++-------- beacon_node/eth2-libp2p/src/rpc/methods.rs | 77 +++-- beacon_node/eth2-libp2p/src/rpc/mod.rs | 18 +- beacon_node/eth2-libp2p/src/rpc/protocol.rs | 148 ++++----- beacon_node/eth2-libp2p/tests/rpc_tests.rs | 30 +- beacon_node/network/src/router/mod.rs | 43 ++- beacon_node/network/src/router/processor.rs | 12 +- 16 files changed, 656 insertions(+), 463 deletions(-) diff --git a/beacon_node/eth2-libp2p/src/behaviour.rs b/beacon_node/eth2-libp2p/src/behaviour.rs index bc8be5014..7691669b2 100644 --- a/beacon_node/eth2-libp2p/src/behaviour.rs +++ b/beacon_node/eth2-libp2p/src/behaviour.rs @@ -306,7 +306,7 @@ impl Behaviour Behaviour let bitfield = match enr.bitfield::() { Ok(v) => v, Err(e) => { - warn!(self.log, "Peer has invalid ENR bitfield"; + warn!(self.log, "Peer has invalid ENR bitfield"; "peer_id" => format!("{}", peer_id), "error" => format!("{:?}", e)); return; @@ -435,22 +435,26 @@ impl // send the requested meta-data self.send_meta_data_response(id, peer_id); } - RPCEvent::Response(_, RPCErrorResponse::Success(RPCResponse::Pong(ping))) => { + RPCEvent::Response(_, RPCCodedResponse::Success(RPCResponse::Pong(ping))) => { self.peer_manager.pong_response(&peer_id, ping.data); } RPCEvent::Response( _, - RPCErrorResponse::Success(RPCResponse::MetaData(meta_data)), + RPCCodedResponse::Success(RPCResponse::MetaData(meta_data)), ) => { self.peer_manager.meta_data_response(&peer_id, meta_data); } RPCEvent::Request(_, RPCRequest::Status(_)) - | RPCEvent::Response(_, RPCErrorResponse::Success(RPCResponse::Status(_))) => { + | RPCEvent::Response(_, RPCCodedResponse::Success(RPCResponse::Status(_))) => { // inform the peer manager that we have received a status from a peer self.peer_manager.peer_statusd(&peer_id); // propagate the STATUS message upwards self.events.push(BehaviourEvent::RPC(peer_id, rpc_event)); } + RPCEvent::Error(_, protocol, ref err) => { + self.peer_manager.handle_rpc_error(&peer_id, protocol, err); + self.events.push(BehaviourEvent::RPC(peer_id, rpc_event)); + } _ => { // propagate all other RPC messages upwards self.events.push(BehaviourEvent::RPC(peer_id, rpc_event)) diff --git a/beacon_node/eth2-libp2p/src/discovery/mod.rs b/beacon_node/eth2-libp2p/src/discovery/mod.rs index 2cd40b8a1..454465647 100644 --- a/beacon_node/eth2-libp2p/src/discovery/mod.rs +++ b/beacon_node/eth2-libp2p/src/discovery/mod.rs @@ -258,7 +258,7 @@ impl Discovery { .network_globals .peers .read() - .peers_on_subnet(&subnet_id) + .peers_on_subnet(subnet_id) .count() as u64; if peers_on_subnet < TARGET_SUBNET_PEERS { diff --git a/beacon_node/eth2-libp2p/src/peer_manager/client.rs b/beacon_node/eth2-libp2p/src/peer_manager/client.rs index 3ba68faaa..36a67325b 100644 --- a/beacon_node/eth2-libp2p/src/peer_manager/client.rs +++ b/beacon_node/eth2-libp2p/src/peer_manager/client.rs @@ -98,7 +98,7 @@ impl std::fmt::Display for Client { // helper function to identify clients from their agent_version. Returns the client // kind and it's associated version and the OS kind. fn client_from_agent_version(agent_version: &str) -> (ClientKind, String, String) { - let mut agent_split = agent_version.split("/"); + let mut agent_split = agent_version.split('/'); match agent_split.next() { Some("Lighthouse") => { let kind = ClientKind::Lighthouse; @@ -116,7 +116,7 @@ fn client_from_agent_version(agent_version: &str) -> (ClientKind, String, String let kind = ClientKind::Teku; let mut version = String::from("unknown"); let mut os_version = version.clone(); - if let Some(_) = agent_split.next() { + if agent_split.next().is_some() { if let Some(agent_version) = agent_split.next() { version = agent_version.into(); if let Some(agent_os_version) = agent_split.next() { diff --git a/beacon_node/eth2-libp2p/src/peer_manager/mod.rs b/beacon_node/eth2-libp2p/src/peer_manager/mod.rs index e09fbbb07..95963696d 100644 --- a/beacon_node/eth2-libp2p/src/peer_manager/mod.rs +++ b/beacon_node/eth2-libp2p/src/peer_manager/mod.rs @@ -2,7 +2,7 @@ pub use self::peerdb::*; use crate::metrics; -use crate::rpc::MetaData; +use crate::rpc::{MetaData, Protocol, RPCError, RPCResponseErrorCode}; use crate::{NetworkGlobals, PeerId}; use futures::prelude::*; use futures::Stream; @@ -10,6 +10,7 @@ use hashmap_delay::HashSetDelay; use libp2p::identify::IdentifyInfo; use slog::{crit, debug, error, warn}; use smallvec::SmallVec; +use std::convert::TryInto; use std::sync::Arc; use std::time::{Duration, Instant}; use types::EthSpec; @@ -19,11 +20,11 @@ mod peer_info; mod peer_sync_status; mod peerdb; -pub use peer_info::PeerInfo; +pub use peer_info::{PeerConnectionStatus::*, PeerInfo}; pub use peer_sync_status::{PeerSyncStatus, SyncInfo}; /// The minimum reputation before a peer is disconnected. -// Most likely this needs tweaking -const _MINIMUM_REPUTATION_BEFORE_BAN: Rep = 20; +// Most likely this needs tweaking. +const MIN_REP_BEFORE_BAN: Rep = 10; /// The time in seconds between re-status's peers. const STATUS_INTERVAL: u64 = 300; /// The time in seconds between PING events. We do not send a ping if the other peer as PING'd us within @@ -32,7 +33,7 @@ const PING_INTERVAL: u64 = 30; /// The main struct that handles peer's reputation and connection status. pub struct PeerManager { - /// Storage of network globals to access the PeerDB. + /// Storage of network globals to access the `PeerDB`. network_globals: Arc>, /// A queue of events that the `PeerManager` is waiting to produce. events: SmallVec<[PeerManagerEvent; 5]>, @@ -46,22 +47,45 @@ pub struct PeerManager { log: slog::Logger, } -/// A collection of actions a peer can perform which will adjust its reputation +/// A collection of actions a peer can perform which will adjust its reputation. /// Each variant has an associated reputation change. +// To easily assess the behaviour of reputation changes the number of variants should stay low, and +// somewhat generic. pub enum PeerAction { - /// The peer timed out on an RPC request/response. - _TimedOut = -10, - /// The peer sent and invalid request/response or encoding. - _InvalidMessage = -20, - /// The peer sent something objectively malicious. - _Malicious = -50, + /// We should not communicate more with this peer. + /// This action will cause the peer to get banned. + Fatal, + /// An error occurred with this peer but it is not necessarily malicious. + /// We have high tolerance for this actions: several occurrences are needed for a peer to get + /// kicked. + /// NOTE: ~15 occurrences will get the peer banned + HighToleranceError, + /// An error occurred with this peer but it is not necessarily malicious. + /// We have high tolerance for this actions: several occurrences are needed for a peer to get + /// kicked. + /// NOTE: ~10 occurrences will get the peer banned + MidToleranceError, + /// This peer's action is not malicious but will not be tolerated. A few occurrences will cause + /// the peer to get kicked. + /// NOTE: ~5 occurrences will get the peer banned + LowToleranceError, /// Received an expected message. - _ValidMessage = 20, - /// Peer disconnected. - Disconnected = -30, + _ValidMessage, } -/// The events that the PeerManager outputs (requests). +impl PeerAction { + fn rep_change(&self) -> RepChange { + match self { + PeerAction::Fatal => RepChange::worst(), + PeerAction::LowToleranceError => RepChange::bad(60), + PeerAction::MidToleranceError => RepChange::bad(25), + PeerAction::HighToleranceError => RepChange::bad(15), + PeerAction::_ValidMessage => RepChange::good(20), + } + } +} + +/// The events that the `PeerManager` outputs (requests). pub enum PeerManagerEvent { /// Sends a STATUS to a peer. Status(PeerId), @@ -96,24 +120,27 @@ impl PeerManager { if let Some(peer_info) = self.network_globals.peers.read().peer_info(peer_id) { // received a ping // reset the to-ping timer for this peer - debug!(self.log, "Received a ping request"; "peer_id" => format!("{}", peer_id), "seq_no" => seq); + debug!(self.log, "Received a ping request"; "peer_id" => peer_id.to_string(), "seq_no" => seq); self.ping_peers.insert(peer_id.clone()); // if the sequence number is unknown send update the meta data of the peer. if let Some(meta_data) = &peer_info.meta_data { if meta_data.seq_number < seq { - debug!(self.log, "Requesting new metadata from peer"; "peer_id" => format!("{}", peer_id), "known_seq_no" => meta_data.seq_number, "ping_seq_no" => seq); + debug!(self.log, "Requesting new metadata from peer"; + "peer_id" => peer_id.to_string(), "known_seq_no" => meta_data.seq_number, "ping_seq_no" => seq); self.events .push(PeerManagerEvent::MetaData(peer_id.clone())); } } else { // if we don't know the meta-data, request it - debug!(self.log, "Requesting first metadata from peer"; "peer_id" => format!("{}", peer_id)); + debug!(self.log, "Requesting first metadata from peer"; + "peer_id" => peer_id.to_string()); self.events .push(PeerManagerEvent::MetaData(peer_id.clone())); } } else { - crit!(self.log, "Received a PING from an unknown peer"; "peer_id" => format!("{}", peer_id)); + crit!(self.log, "Received a PING from an unknown peer"; + "peer_id" => peer_id.to_string()); } } @@ -126,18 +153,20 @@ impl PeerManager { // if the sequence number is unknown send update the meta data of the peer. if let Some(meta_data) = &peer_info.meta_data { if meta_data.seq_number < seq { - debug!(self.log, "Requesting new metadata from peer"; "peer_id" => format!("{}", peer_id), "known_seq_no" => meta_data.seq_number, "pong_seq_no" => seq); + debug!(self.log, "Requesting new metadata from peer"; + "peer_id" => peer_id.to_string(), "known_seq_no" => meta_data.seq_number, "pong_seq_no" => seq); self.events .push(PeerManagerEvent::MetaData(peer_id.clone())); } } else { // if we don't know the meta-data, request it - debug!(self.log, "Requesting first metadata from peer"; "peer_id" => format!("{}", peer_id)); + debug!(self.log, "Requesting first metadata from peer"; + "peer_id" => peer_id.to_string()); self.events .push(PeerManagerEvent::MetaData(peer_id.clone())); } } else { - crit!(self.log, "Received a PONG from an unknown peer"; "peer_id" => format!("{}", peer_id)); + crit!(self.log, "Received a PONG from an unknown peer"; "peer_id" => peer_id.to_string()); } } @@ -147,18 +176,24 @@ impl PeerManager { if let Some(peer_info) = self.network_globals.peers.write().peer_info_mut(peer_id) { if let Some(known_meta_data) = &peer_info.meta_data { if known_meta_data.seq_number < meta_data.seq_number { - debug!(self.log, "Updating peer's metadata"; "peer_id" => format!("{}", peer_id), "known_seq_no" => known_meta_data.seq_number, "new_seq_no" => meta_data.seq_number); + debug!(self.log, "Updating peer's metadata"; + "peer_id" => peer_id.to_string(), "known_seq_no" => known_meta_data.seq_number, "new_seq_no" => meta_data.seq_number); peer_info.meta_data = Some(meta_data); } else { - warn!(self.log, "Received old metadata"; "peer_id" => format!("{}", peer_id), "known_seq_no" => known_meta_data.seq_number, "new_seq_no" => meta_data.seq_number); + // TODO: isn't this malicious/random behaviour? What happens if the seq_number + // is the same but the contents differ? + warn!(self.log, "Received old metadata"; + "peer_id" => peer_id.to_string(), "known_seq_no" => known_meta_data.seq_number, "new_seq_no" => meta_data.seq_number); } } else { // we have no meta-data for this peer, update - debug!(self.log, "Obtained peer's metadata"; "peer_id" => format!("{}", peer_id), "new_seq_no" => meta_data.seq_number); + debug!(self.log, "Obtained peer's metadata"; + "peer_id" => peer_id.to_string(), "new_seq_no" => meta_data.seq_number); peer_info.meta_data = Some(meta_data); } } else { - crit!(self.log, "Received METADATA from an unknown peer"; "peer_id" => format!("{}", peer_id)); + crit!(self.log, "Received METADATA from an unknown peer"; + "peer_id" => peer_id.to_string()); } } @@ -167,38 +202,12 @@ impl PeerManager { self.status_peers.insert(peer_id.clone()); } - /// Checks the reputation of a peer and if it is too low, bans it and - /// sends the corresponding event. Informs if it got banned - fn _gets_banned(&mut self, peer_id: &PeerId) -> bool { - // if the peer was already banned don't inform again - let mut peerdb = self.network_globals.peers.write(); - - if let Some(connection_status) = peerdb.connection_status(peer_id) { - if peerdb.reputation(peer_id) < _MINIMUM_REPUTATION_BEFORE_BAN - && !connection_status.is_banned() - { - peerdb.ban(peer_id); - self.events - .push(PeerManagerEvent::_BanPeer(peer_id.clone())); - return true; - } - } - false - } - - /// Requests that a peer get disconnected. - pub fn _disconnect_peer(&mut self, peer_id: &PeerId) { - self.events - .push(PeerManagerEvent::_DisconnectPeer(peer_id.clone())); - } - /// Updates the state of the peer as disconnected. pub fn notify_disconnect(&mut self, peer_id: &PeerId) { self.update_reputations(); { let mut peerdb = self.network_globals.peers.write(); peerdb.disconnect(peer_id); - peerdb.add_reputation(peer_id, PeerAction::Disconnected as Rep); } // remove the ping and status timer for the peer @@ -223,35 +232,15 @@ impl PeerManager { self.connect_peer(peer_id, true) } - /// Provides a given peer's reputation if it exists. - pub fn _get_peer_rep(&self, peer_id: &PeerId) -> Rep { - self.network_globals.peers.read().reputation(peer_id) - } - - /// Updates the reputation of known peers according to their connection - /// status and the time that has passed. - pub fn update_reputations(&mut self) { - let now = Instant::now(); - let elapsed = (now - self.last_updated).as_secs(); - // 0 seconds means now - last_updated < 0, but (most likely) not = 0. - // In this case, do nothing (updating last_updated would propagate - // rounding errors) - if elapsed > 0 { - self.last_updated = now; - // TODO decide how reputations change with time. If they get too low - // set the peers as banned - } - } - /// Reports a peer for some action. /// /// If the peer doesn't exist, log a warning and insert defaults. - pub fn _report_peer(&mut self, peer_id: &PeerId, action: PeerAction) { + pub fn report_peer(&mut self, peer_id: &PeerId, action: PeerAction) { self.update_reputations(); self.network_globals .peers .write() - .add_reputation(peer_id, action as Rep); + .add_reputation(peer_id, action.rep_change()); self.update_reputations(); } @@ -261,10 +250,68 @@ impl PeerManager { peer_info.client = client::Client::from_identify_info(info); peer_info.listening_addresses = info.listen_addrs.clone(); } else { - crit!(self.log, "Received an Identify response from an unknown peer"; "peer_id" => format!("{}", peer_id)); + crit!(self.log, "Received an Identify response from an unknown peer"; "peer_id" => peer_id.to_string()); } } + pub fn handle_rpc_error(&mut self, peer_id: &PeerId, protocol: Protocol, err: &RPCError) { + debug!(self.log, "RPCError"; "protocol" => protocol.to_string(), "err" => err.to_string()); + + // Map this error to a `PeerAction` (if any) + let peer_action = match err { + RPCError::IncompleteStream => { + // They closed early, this could mean poor connection + PeerAction::MidToleranceError + } + RPCError::InternalError(_reason) => { + // Our fault. Do nothing + return; + } + RPCError::InvalidData => { + // Peer is not complying with the protocol. This is considered a malicious action + PeerAction::Fatal + } + RPCError::IoError(_e) => { + // this could their fault or ours, so we tolerate this + PeerAction::HighToleranceError + } + RPCError::ErrorResponse(code) => match code { + RPCResponseErrorCode::Unknown => PeerAction::HighToleranceError, + RPCResponseErrorCode::ServerError => PeerAction::MidToleranceError, + RPCResponseErrorCode::InvalidRequest => PeerAction::LowToleranceError, + }, + RPCError::SSZDecodeError(_) => PeerAction::Fatal, + RPCError::UnsupportedProtocol => { + // Not supporting a protocol shouldn't be considered a malicious action, but + // it is an action that in some cases will make the peer unfit to continue + // communicating. + // TODO: To avoid punishing a peer repeatedly for not supporting a protocol, this + // information could be stored and used to prevent sending requests for the given + // protocol to this peer. Similarly, to avoid blacklisting a peer for a protocol + // forever, if stored this information should expire. + match protocol { + Protocol::Ping => PeerAction::Fatal, + Protocol::BlocksByRange => return, + Protocol::BlocksByRoot => return, + Protocol::Goodbye => return, + Protocol::MetaData => PeerAction::LowToleranceError, + Protocol::Status => PeerAction::LowToleranceError, + } + } + RPCError::StreamTimeout => match protocol { + Protocol::Ping => PeerAction::LowToleranceError, + Protocol::BlocksByRange => PeerAction::MidToleranceError, + Protocol::BlocksByRoot => PeerAction::MidToleranceError, + Protocol::Goodbye => return, + Protocol::MetaData => return, + Protocol::Status => return, + }, + RPCError::NegotiationTimeout => PeerAction::HighToleranceError, + }; + + self.report_peer(peer_id, peer_action); + } + /* Internal functions */ /// Registers a peer as connected. The `ingoing` parameter determines if the peer is being @@ -275,7 +322,7 @@ impl PeerManager { /// This informs if the peer was accepted in to the db or not. // TODO: Drop peers if over max_peer limit fn connect_peer(&mut self, peer_id: &PeerId, outgoing: bool) -> bool { - // TODO: Call this on a timer + // TODO: remove after timed updates self.update_reputations(); { @@ -288,7 +335,7 @@ impl PeerManager { if outgoing { peerdb.connect_outgoing(peer_id); } else { - peerdb.connect_outgoing(peer_id); + peerdb.connect_ingoing(peer_id); } } @@ -310,6 +357,86 @@ impl PeerManager { pub fn _dialing_peer(&mut self, peer_id: &PeerId) { self.network_globals.peers.write().dialing_peer(peer_id); } + + /// Updates the reputation of known peers according to their connection + /// status and the time that has passed. + /// + /// **Disconnected peers** get a 1rep hit every hour they stay disconnected. + /// **Banned peers** get a 1rep gain for every hour to slowly allow them back again. + /// + /// A banned(disconnected) peer that gets its rep above(below) MIN_REP_BEFORE_BAN is + /// now considered a disconnected(banned) peer. + fn update_reputations(&mut self) { + // avoid locking the peerdb too often + // TODO: call this on a timer + if self.last_updated.elapsed().as_secs() < 30 { + return; + } + + let now = Instant::now(); + + // Check for peers that get banned, unbanned and that should be disconnected + let mut ban_queue = Vec::new(); + let mut unban_queue = Vec::new(); + + /* Check how long have peers been in this state and update their reputations if needed */ + let mut pdb = self.network_globals.peers.write(); + + for (id, info) in pdb.peers_mut() { + // Update reputations + match info.connection_status { + Connected { .. } => { + // Connected peers gain reputation by sending useful messages + } + Disconnected { since } | Banned { since } => { + // For disconnected peers, lower their reputation by 1 for every hour they + // stay disconnected. This helps us slowly forget disconnected peers. + // In the same way, slowly allow banned peers back again. + let dc_hours = (now - since).as_secs() / 3600; + let last_dc_hours = (self.last_updated - since).as_secs() / 3600; + if dc_hours > last_dc_hours { + // this should be 1 most of the time + let rep_dif = (dc_hours - last_dc_hours) + .try_into() + .unwrap_or(Rep::max_value()); + + info.reputation = if info.connection_status.is_banned() { + info.reputation.saturating_add(rep_dif) + } else { + info.reputation.saturating_sub(rep_dif) + }; + } + } + Dialing { since } => { + // A peer shouldn't be dialing for more than 2 minutes + if since.elapsed().as_secs() > 120 { + warn!(self.log,"Peer has been dialing for too long"; "peer_id" => id.to_string()); + // TODO: decide how to handle this + } + } + } + // Check if the peer gets banned or unbanned and if it should be disconnected + if info.reputation < MIN_REP_BEFORE_BAN && !info.connection_status.is_banned() { + // This peer gets banned. Check if we should request disconnection + ban_queue.push(id.clone()); + } else if info.reputation >= MIN_REP_BEFORE_BAN && info.connection_status.is_banned() { + // This peer gets unbanned + unban_queue.push(id.clone()); + } + } + + for id in ban_queue { + pdb.ban(&id); + + self.events.push(PeerManagerEvent::_BanPeer(id.clone())); + } + + for id in unban_queue { + pdb.disconnect(&id); + } + + self.last_updated = Instant::now(); + } } impl Stream for PeerManager { @@ -322,9 +449,9 @@ impl Stream for PeerManager { // These exist to handle a bug in delayqueue let mut peers_to_add = Vec::new(); while let Async::Ready(Some(peer_id)) = self.ping_peers.poll().map_err(|e| { - error!(self.log, "Failed to check for peers to ping"; "error" => format!("{}",e)); + error!(self.log, "Failed to check for peers to ping"; "error" => e.to_string()); })? { - debug!(self.log, "Pinging peer"; "peer_id" => format!("{}", peer_id)); + debug!(self.log, "Pinging peer"; "peer_id" => peer_id.to_string()); // add the ping timer back peers_to_add.push(peer_id.clone()); self.events.push(PeerManagerEvent::Ping(peer_id)); @@ -338,9 +465,9 @@ impl Stream for PeerManager { } while let Async::Ready(Some(peer_id)) = self.status_peers.poll().map_err(|e| { - error!(self.log, "Failed to check for peers to status"; "error" => format!("{}",e)); + error!(self.log, "Failed to check for peers to status"; "error" => e.to_string()); })? { - debug!(self.log, "Sending Status to peer"; "peer_id" => format!("{}", peer_id)); + debug!(self.log, "Sending Status to peer"; "peer_id" => peer_id.to_string()); // add the status timer back peers_to_add.push(peer_id.clone()); self.events.push(PeerManagerEvent::Status(peer_id)); diff --git a/beacon_node/eth2-libp2p/src/peer_manager/peerdb.rs b/beacon_node/eth2-libp2p/src/peer_manager/peerdb.rs index 066fa3736..d5fa4bcf7 100644 --- a/beacon_node/eth2-libp2p/src/peer_manager/peerdb.rs +++ b/beacon_node/eth2-libp2p/src/peer_manager/peerdb.rs @@ -7,11 +7,18 @@ use std::collections::HashMap; use std::time::Instant; use types::{EthSpec, SubnetId}; -/// A peer's reputation. -pub type Rep = i32; +/// A peer's reputation (perceived potential usefulness) +pub type Rep = u8; + +/// Reputation change (positive or negative) +pub struct RepChange { + is_good: bool, + diff: Rep, +} /// Max number of disconnected nodes to remember const MAX_DC_PEERS: usize = 30; + /// The default starting reputation for an unknown peer. pub const DEFAULT_REPUTATION: Rep = 50; @@ -25,6 +32,27 @@ pub struct PeerDB { log: slog::Logger, } +impl RepChange { + pub fn good(diff: Rep) -> Self { + RepChange { + is_good: true, + diff, + } + } + pub fn bad(diff: Rep) -> Self { + RepChange { + is_good: false, + diff, + } + } + pub const fn worst() -> Self { + RepChange { + is_good: false, + diff: Rep::max_value(), + } + } +} + impl PeerDB { pub fn new(log: &slog::Logger) -> Self { Self { @@ -48,6 +76,11 @@ impl PeerDB { self.peers.iter() } + /// Returns an iterator over all peers in the db. + pub(super) fn peers_mut(&mut self) -> impl Iterator)> { + self.peers.iter_mut() + } + /// Gives the ids of all known peers. pub fn peer_ids(&self) -> impl Iterator { self.peers.keys() @@ -59,6 +92,7 @@ impl PeerDB { } /// Returns a mutable reference to a peer's info if known. + /// TODO: make pub(super) to ensure that peer management is unified pub fn peer_info_mut(&mut self, peer_id: &PeerId) -> Option<&mut PeerInfo> { self.peers.get_mut(peer_id) } @@ -111,12 +145,11 @@ impl PeerDB { } /// Gives an iterator of all peers on a given subnet. - pub fn peers_on_subnet(&self, subnet_id: &SubnetId) -> impl Iterator { - let subnet_id_filter = subnet_id.clone(); + pub fn peers_on_subnet(&self, subnet_id: SubnetId) -> impl Iterator { self.peers .iter() .filter(move |(_, info)| { - info.connection_status.is_connected() && info.on_subnet(subnet_id_filter) + info.connection_status.is_connected() && info.on_subnet(subnet_id) }) .map(|(peer_id, _)| peer_id) } @@ -192,10 +225,7 @@ impl PeerDB { /// A peer is being dialed. pub fn dialing_peer(&mut self, peer_id: &PeerId) { - let info = self - .peers - .entry(peer_id.clone()) - .or_insert_with(|| Default::default()); + let info = self.peers.entry(peer_id.clone()).or_default(); if info.connection_status.is_disconnected() { self.n_dc -= 1; @@ -207,10 +237,7 @@ impl PeerDB { /// Sets a peer as connected with an ingoing connection. pub fn connect_ingoing(&mut self, peer_id: &PeerId) { - let info = self - .peers - .entry(peer_id.clone()) - .or_insert_with(|| Default::default()); + let info = self.peers.entry(peer_id.clone()).or_default(); if info.connection_status.is_disconnected() { self.n_dc -= 1; @@ -220,10 +247,7 @@ impl PeerDB { /// Sets a peer as connected with an outgoing connection. pub fn connect_outgoing(&mut self, peer_id: &PeerId) { - let info = self - .peers - .entry(peer_id.clone()) - .or_insert_with(|| Default::default()); + let info = self.peers.entry(peer_id.clone()).or_default(); if info.connection_status.is_disconnected() { self.n_dc -= 1; @@ -231,16 +255,16 @@ impl PeerDB { info.connection_status.connect_outgoing(); } - /// Sets the peer as disconnected. + /// Sets the peer as disconnected. A banned peer remains banned pub fn disconnect(&mut self, peer_id: &PeerId) { let log_ref = &self.log; let info = self.peers.entry(peer_id.clone()).or_insert_with(|| { warn!(log_ref, "Disconnecting unknown peer"; - "peer_id" => format!("{:?}",peer_id)); + "peer_id" => peer_id.to_string()); PeerInfo::default() }); - if !info.connection_status.is_disconnected() { + if !info.connection_status.is_disconnected() && !info.connection_status.is_banned() { info.connection_status.disconnect(); self.n_dc += 1; } @@ -269,7 +293,7 @@ impl PeerDB { let log_ref = &self.log; let info = self.peers.entry(peer_id.clone()).or_insert_with(|| { warn!(log_ref, "Banning unknown peer"; - "peer_id" => format!("{:?}",peer_id)); + "peer_id" => peer_id.to_string()); PeerInfo::default() }); if info.connection_status.is_disconnected() { @@ -283,16 +307,17 @@ impl PeerDB { if let Some(peer_info) = self.peers.get_mut(peer_id) { peer_info.meta_data = Some(meta_data); } else { - warn!(self.log, "Tried to add meta data for a non-existant peer"; "peer_id" => format!("{}", peer_id)); + warn!(self.log, "Tried to add meta data for a non-existant peer"; "peer_id" => peer_id.to_string()); } } /// Sets the reputation of peer. - pub fn set_reputation(&mut self, peer_id: &PeerId, rep: Rep) { + #[allow(dead_code)] + pub(super) fn set_reputation(&mut self, peer_id: &PeerId, rep: Rep) { if let Some(peer_info) = self.peers.get_mut(peer_id) { peer_info.reputation = rep; } else { - crit!(self.log, "Tried to modify reputation for an unknown peer"; "peer_id" => format!("{}",peer_id)); + crit!(self.log, "Tried to modify reputation for an unknown peer"; "peer_id" => peer_id.to_string()); } } @@ -301,20 +326,25 @@ impl PeerDB { if let Some(peer_info) = self.peers.get_mut(peer_id) { peer_info.sync_status = sync_status; } else { - crit!(self.log, "Tried to the sync status for an unknown peer"; "peer_id" => format!("{}",peer_id)); + crit!(self.log, "Tried to the sync status for an unknown peer"; "peer_id" => peer_id.to_string()); } } /// Adds to a peer's reputation by `change`. If the reputation exceeds Rep's /// upper (lower) bounds, it stays at the maximum (minimum) value. - pub fn add_reputation(&mut self, peer_id: &PeerId, change: Rep) { + pub(super) fn add_reputation(&mut self, peer_id: &PeerId, change: RepChange) { let log_ref = &self.log; let info = self.peers.entry(peer_id.clone()).or_insert_with(|| { warn!(log_ref, "Adding to the reputation of an unknown peer"; - "peer_id" => format!("{:?}",peer_id)); + "peer_id" => peer_id.to_string()); PeerInfo::default() }); - info.reputation = info.reputation.saturating_add(change); + + info.reputation = if change.is_good { + info.reputation.saturating_add(change.diff) + } else { + info.reputation.saturating_sub(change.diff) + }; } } @@ -396,14 +426,20 @@ mod tests { // 0 change does not change de reputation let random_peer = PeerId::random(); - let change: Rep = 0; + let change = RepChange::good(0); pdb.connect_ingoing(&random_peer); pdb.add_reputation(&random_peer, change); assert_eq!(pdb.reputation(&random_peer), DEFAULT_REPUTATION); // overflowing change is capped let random_peer = PeerId::random(); - let change = Rep::max_value(); + let change = RepChange::worst(); + pdb.connect_ingoing(&random_peer); + pdb.add_reputation(&random_peer, change); + assert_eq!(pdb.reputation(&random_peer), Rep::min_value()); + + let random_peer = PeerId::random(); + let change = RepChange::good(Rep::max_value()); pdb.connect_ingoing(&random_peer); pdb.add_reputation(&random_peer, change); assert_eq!(pdb.reputation(&random_peer), Rep::max_value()); diff --git a/beacon_node/eth2-libp2p/src/rpc/codec/base.rs b/beacon_node/eth2-libp2p/src/rpc/codec/base.rs index 48b537c7b..43f0f494c 100644 --- a/beacon_node/eth2-libp2p/src/rpc/codec/base.rs +++ b/beacon_node/eth2-libp2p/src/rpc/codec/base.rs @@ -1,6 +1,6 @@ //! This handles the various supported encoding mechanism for the Eth 2.0 RPC. -use crate::rpc::{ErrorMessage, RPCErrorResponse, RPCRequest, RPCResponse}; +use crate::rpc::{ErrorMessage, RPCCodedResponse, RPCRequest, RPCResponse}; use libp2p::bytes::BufMut; use libp2p::bytes::BytesMut; use std::marker::PhantomData; @@ -78,9 +78,9 @@ where impl Encoder for BaseInboundCodec where TSpec: EthSpec, - TCodec: Decoder + Encoder>, + TCodec: Decoder + Encoder>, { - type Item = RPCErrorResponse; + type Item = RPCCodedResponse; type Error = ::Error; fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> { @@ -130,7 +130,7 @@ where TSpec: EthSpec, TCodec: OutboundCodec + Decoder>, { - type Item = RPCErrorResponse; + type Item = RPCCodedResponse; type Error = ::Error; fn decode(&mut self, src: &mut BytesMut) -> Result, Self::Error> { @@ -146,17 +146,17 @@ where }); let inner_result = { - if RPCErrorResponse::::is_response(response_code) { + if RPCCodedResponse::::is_response(response_code) { // decode an actual response and mutates the buffer if enough bytes have been read // returning the result. self.inner .decode(src) - .map(|r| r.map(RPCErrorResponse::Success)) + .map(|r| r.map(RPCCodedResponse::Success)) } else { // decode an error self.inner .decode_error(src) - .map(|r| r.map(|resp| RPCErrorResponse::from_error(response_code, resp))) + .map(|r| r.map(|resp| RPCCodedResponse::from_error(response_code, resp))) } }; // if the inner decoder was capable of decoding a chunk, we need to reset the current diff --git a/beacon_node/eth2-libp2p/src/rpc/codec/mod.rs b/beacon_node/eth2-libp2p/src/rpc/codec/mod.rs index f1b7f74da..1fd97a78b 100644 --- a/beacon_node/eth2-libp2p/src/rpc/codec/mod.rs +++ b/beacon_node/eth2-libp2p/src/rpc/codec/mod.rs @@ -6,7 +6,7 @@ use self::base::{BaseInboundCodec, BaseOutboundCodec}; use self::ssz::{SSZInboundCodec, SSZOutboundCodec}; use self::ssz_snappy::{SSZSnappyInboundCodec, SSZSnappyOutboundCodec}; use crate::rpc::protocol::RPCError; -use crate::rpc::{RPCErrorResponse, RPCRequest}; +use crate::rpc::{RPCCodedResponse, RPCRequest}; use libp2p::bytes::BytesMut; use tokio::codec::{Decoder, Encoder}; use types::EthSpec; @@ -23,7 +23,7 @@ pub enum OutboundCodec { } impl Encoder for InboundCodec { - type Item = RPCErrorResponse; + type Item = RPCCodedResponse; type Error = RPCError; fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> { @@ -59,7 +59,7 @@ impl Encoder for OutboundCodec { } impl Decoder for OutboundCodec { - type Item = RPCErrorResponse; + type Item = RPCCodedResponse; type Error = RPCError; fn decode(&mut self, src: &mut BytesMut) -> Result, Self::Error> { diff --git a/beacon_node/eth2-libp2p/src/rpc/codec/ssz.rs b/beacon_node/eth2-libp2p/src/rpc/codec/ssz.rs index bd8cbfd9f..37ea4eac5 100644 --- a/beacon_node/eth2-libp2p/src/rpc/codec/ssz.rs +++ b/beacon_node/eth2-libp2p/src/rpc/codec/ssz.rs @@ -3,7 +3,7 @@ use crate::rpc::{ codec::base::OutboundCodec, protocol::{Encoding, Protocol, ProtocolId, RPCError, Version}, }; -use crate::rpc::{ErrorMessage, RPCErrorResponse, RPCRequest, RPCResponse}; +use crate::rpc::{ErrorMessage, RPCCodedResponse, RPCRequest, RPCResponse}; use libp2p::bytes::{BufMut, Bytes, BytesMut}; use ssz::{Decode, Encode}; use std::marker::PhantomData; @@ -37,22 +37,22 @@ impl SSZInboundCodec { // Encoder for inbound streams: Encodes RPC Responses sent to peers. impl Encoder for SSZInboundCodec { - type Item = RPCErrorResponse; + type Item = RPCCodedResponse; type Error = RPCError; fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> { let bytes = match item { - RPCErrorResponse::Success(resp) => match resp { + RPCCodedResponse::Success(resp) => match resp { RPCResponse::Status(res) => res.as_ssz_bytes(), RPCResponse::BlocksByRange(res) => res.as_ssz_bytes(), RPCResponse::BlocksByRoot(res) => res.as_ssz_bytes(), RPCResponse::Pong(res) => res.data.as_ssz_bytes(), RPCResponse::MetaData(res) => res.as_ssz_bytes(), }, - RPCErrorResponse::InvalidRequest(err) => err.as_ssz_bytes(), - RPCErrorResponse::ServerError(err) => err.as_ssz_bytes(), - RPCErrorResponse::Unknown(err) => err.as_ssz_bytes(), - RPCErrorResponse::StreamTermination(_) => { + RPCCodedResponse::InvalidRequest(err) => err.as_ssz_bytes(), + RPCCodedResponse::ServerError(err) => err.as_ssz_bytes(), + RPCCodedResponse::Unknown(err) => err.as_ssz_bytes(), + RPCCodedResponse::StreamTermination(_) => { unreachable!("Code error - attempting to encode a stream termination") } }; @@ -107,9 +107,7 @@ impl Decoder for SSZInboundCodec { Protocol::MetaData => match self.protocol.version { Version::V1 => { if packet.len() > 0 { - Err(RPCError::Custom( - "Get metadata request should be empty".into(), - )) + Err(RPCError::InvalidData) } else { Ok(Some(RPCRequest::MetaData(PhantomData))) } @@ -183,32 +181,20 @@ impl Decoder for SSZOutboundCodec { src.clear(); match self.protocol.message_name { Protocol::Status => match self.protocol.version { - Version::V1 => Err(RPCError::Custom( - "Status stream terminated unexpectedly".into(), - )), // cannot have an empty HELLO message. The stream has terminated unexpectedly + Version::V1 => Err(RPCError::IncompleteStream), // cannot have an empty HELLO message. The stream has terminated unexpectedly }, - Protocol::Goodbye => { - Err(RPCError::InvalidProtocol("GOODBYE doesn't have a response")) - } + Protocol::Goodbye => Err(RPCError::InvalidData), Protocol::BlocksByRange => match self.protocol.version { - Version::V1 => Err(RPCError::Custom( - "Status stream terminated unexpectedly, empty block".into(), - )), // cannot have an empty block message. + Version::V1 => Err(RPCError::IncompleteStream), // cannot have an empty block message. }, Protocol::BlocksByRoot => match self.protocol.version { - Version::V1 => Err(RPCError::Custom( - "Status stream terminated unexpectedly, empty block".into(), - )), // cannot have an empty block message. + Version::V1 => Err(RPCError::IncompleteStream), // cannot have an empty block message. }, Protocol::Ping => match self.protocol.version { - Version::V1 => Err(RPCError::Custom( - "PING stream terminated unexpectedly".into(), - )), // cannot have an empty block message. + Version::V1 => Err(RPCError::IncompleteStream), // cannot have an empty block message. }, Protocol::MetaData => match self.protocol.version { - Version::V1 => Err(RPCError::Custom( - "Metadata stream terminated unexpectedly".into(), - )), // cannot have an empty block message. + Version::V1 => Err(RPCError::IncompleteStream), // cannot have an empty block message. }, } } else { @@ -223,9 +209,7 @@ impl Decoder for SSZOutboundCodec { StatusMessage::from_ssz_bytes(&raw_bytes)?, ))), }, - Protocol::Goodbye => { - Err(RPCError::InvalidProtocol("GOODBYE doesn't have a response")) - } + Protocol::Goodbye => Err(RPCError::InvalidData), Protocol::BlocksByRange => match self.protocol.version { Version::V1 => Ok(Some(RPCResponse::BlocksByRange(Box::new( SignedBeaconBlock::from_ssz_bytes(&raw_bytes)?, diff --git a/beacon_node/eth2-libp2p/src/rpc/codec/ssz_snappy.rs b/beacon_node/eth2-libp2p/src/rpc/codec/ssz_snappy.rs index e2f0db1ff..c99d1f6fd 100644 --- a/beacon_node/eth2-libp2p/src/rpc/codec/ssz_snappy.rs +++ b/beacon_node/eth2-libp2p/src/rpc/codec/ssz_snappy.rs @@ -3,7 +3,7 @@ use crate::rpc::{ codec::base::OutboundCodec, protocol::{Encoding, Protocol, ProtocolId, RPCError, Version}, }; -use crate::rpc::{ErrorMessage, RPCErrorResponse, RPCRequest, RPCResponse}; +use crate::rpc::{ErrorMessage, RPCCodedResponse, RPCRequest, RPCResponse}; use libp2p::bytes::BytesMut; use snap::read::FrameDecoder; use snap::write::FrameEncoder; @@ -45,28 +45,28 @@ impl SSZSnappyInboundCodec { // Encoder for inbound streams: Encodes RPC Responses sent to peers. impl Encoder for SSZSnappyInboundCodec { - type Item = RPCErrorResponse; + type Item = RPCCodedResponse; type Error = RPCError; fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> { let bytes = match item { - RPCErrorResponse::Success(resp) => match resp { + RPCCodedResponse::Success(resp) => match resp { RPCResponse::Status(res) => res.as_ssz_bytes(), RPCResponse::BlocksByRange(res) => res.as_ssz_bytes(), RPCResponse::BlocksByRoot(res) => res.as_ssz_bytes(), RPCResponse::Pong(res) => res.data.as_ssz_bytes(), RPCResponse::MetaData(res) => res.as_ssz_bytes(), }, - RPCErrorResponse::InvalidRequest(err) => err.as_ssz_bytes(), - RPCErrorResponse::ServerError(err) => err.as_ssz_bytes(), - RPCErrorResponse::Unknown(err) => err.as_ssz_bytes(), - RPCErrorResponse::StreamTermination(_) => { + RPCCodedResponse::InvalidRequest(err) => err.as_ssz_bytes(), + RPCCodedResponse::ServerError(err) => err.as_ssz_bytes(), + RPCCodedResponse::Unknown(err) => err.as_ssz_bytes(), + RPCCodedResponse::StreamTermination(_) => { unreachable!("Code error - attempting to encode a stream termination") } }; // SSZ encoded bytes should be within `max_packet_size` if bytes.len() > self.max_packet_size { - return Err(RPCError::Custom( + return Err(RPCError::InternalError( "attempting to encode data > max_packet_size".into(), )); } @@ -106,9 +106,7 @@ impl Decoder for SSZSnappyInboundCodec { // Should not attempt to decode rpc chunks with length > max_packet_size if length > self.max_packet_size { - return Err(RPCError::Custom( - "attempting to decode data > max_packet_size".into(), - )); + return Err(RPCError::InvalidData); } let mut reader = FrameDecoder::new(Cursor::new(&src)); let mut decoded_buffer = vec![0; length]; @@ -148,9 +146,7 @@ impl Decoder for SSZSnappyInboundCodec { Protocol::MetaData => match self.protocol.version { Version::V1 => { if decoded_buffer.len() > 0 { - Err(RPCError::Custom( - "Get metadata request should be empty".into(), - )) + Err(RPCError::InvalidData) } else { Ok(Some(RPCRequest::MetaData(PhantomData))) } @@ -212,8 +208,8 @@ impl Encoder for SSZSnappyOutboundCodec { }; // SSZ encoded bytes should be within `max_packet_size` if bytes.len() > self.max_packet_size { - return Err(RPCError::Custom( - "attempting to encode data > max_packet_size".into(), + return Err(RPCError::InternalError( + "attempting to encode data > max_packet_size", )); } @@ -257,9 +253,7 @@ impl Decoder for SSZSnappyOutboundCodec { // Should not attempt to decode rpc chunks with length > max_packet_size if length > self.max_packet_size { - return Err(RPCError::Custom( - "attempting to decode data > max_packet_size".into(), - )); + return Err(RPCError::InvalidData); } let mut reader = FrameDecoder::new(Cursor::new(&src)); let mut decoded_buffer = vec![0; length]; @@ -276,7 +270,8 @@ impl Decoder for SSZSnappyOutboundCodec { ))), }, Protocol::Goodbye => { - Err(RPCError::InvalidProtocol("GOODBYE doesn't have a response")) + // Goodbye does not have a response + Err(RPCError::InvalidData) } Protocol::BlocksByRange => match self.protocol.version { Version::V1 => Ok(Some(RPCResponse::BlocksByRange(Box::new( @@ -330,9 +325,7 @@ impl OutboundCodec for SSZSnappyOutboundCodec { // Should not attempt to decode rpc chunks with length > max_packet_size if length > self.max_packet_size { - return Err(RPCError::Custom( - "attempting to decode data > max_packet_size".into(), - )); + return Err(RPCError::InvalidData); } let mut reader = FrameDecoder::new(Cursor::new(&src)); let mut decoded_buffer = vec![0; length]; diff --git a/beacon_node/eth2-libp2p/src/rpc/handler.rs b/beacon_node/eth2-libp2p/src/rpc/handler.rs index d8ff541c9..d8fa347ab 100644 --- a/beacon_node/eth2-libp2p/src/rpc/handler.rs +++ b/beacon_node/eth2-libp2p/src/rpc/handler.rs @@ -1,14 +1,16 @@ #![allow(clippy::type_complexity)] #![allow(clippy::cognitive_complexity)] -use super::methods::{ErrorMessage, RPCErrorResponse, RequestId, ResponseTermination}; -use super::protocol::{RPCError, RPCProtocol, RPCRequest}; +use super::methods::{ErrorMessage, RPCCodedResponse, RequestId, ResponseTermination}; +use super::protocol::{Protocol, RPCError, RPCProtocol, RPCRequest}; use super::RPCEvent; use crate::rpc::protocol::{InboundFramed, OutboundFramed}; use core::marker::PhantomData; use fnv::FnvHashMap; use futures::prelude::*; -use libp2p::core::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeError}; +use libp2p::core::upgrade::{ + InboundUpgrade, NegotiationError, OutboundUpgrade, ProtocolError, UpgradeError, +}; use libp2p::swarm::protocols_handler::{ KeepAlive, ProtocolsHandler, ProtocolsHandlerEvent, ProtocolsHandlerUpgrErr, SubstreamProtocol, }; @@ -46,13 +48,13 @@ where listen_protocol: SubstreamProtocol>, /// If something bad happened and we should shut down the handler with an error. - pending_error: Vec<(RequestId, ProtocolsHandlerUpgrErr)>, + pending_error: Vec<(RequestId, Protocol, RPCError)>, /// Queue of events to produce in `poll()`. events_out: SmallVec<[RPCEvent; 4]>, /// Queue of outbound substreams to open. - dial_queue: SmallVec<[RPCEvent; 4]>, + dial_queue: SmallVec<[(RequestId, RPCRequest); 4]>, /// Current number of concurrent outbound substreams being opened. dial_negotiated: u32, @@ -63,6 +65,7 @@ where ( InboundSubstreamState, Option, + Protocol, ), >, @@ -73,14 +76,18 @@ where /// maintained by the application sending the request. outbound_substreams: FnvHashMap< OutboundRequestId, - (OutboundSubstreamState, delay_queue::Key), + ( + OutboundSubstreamState, + delay_queue::Key, + Protocol, + ), >, /// Inbound substream `DelayQueue` which keeps track of when an inbound substream will timeout. outbound_substreams_delay: DelayQueue, /// Map of outbound items that are queued as the stream processes them. - queued_outbound_items: FnvHashMap>>, + queued_outbound_items: FnvHashMap>>, /// Sequential ID for waiting substreams. For inbound substreams, this is also the inbound request ID. current_inbound_substream_id: RequestId, @@ -152,17 +159,17 @@ where /// Moves the substream state to closing and informs the connected peer. The /// `queued_outbound_items` must be given as a parameter to add stream termination messages to /// the outbound queue. - pub fn close(&mut self, outbound_queue: &mut Vec>) { + pub fn close(&mut self, outbound_queue: &mut Vec>) { // When terminating a stream, report the stream termination to the requesting user via // an RPC error - let error = RPCErrorResponse::ServerError(ErrorMessage { - error_message: "Request timed out".as_bytes().to_vec(), + let error = RPCCodedResponse::ServerError(ErrorMessage { + error_message: b"Request timed out".to_vec(), }); // The stream termination type is irrelevant, this will terminate the // stream let stream_termination = - RPCErrorResponse::StreamTermination(ResponseTermination::BlocksByRange); + RPCCodedResponse::StreamTermination(ResponseTermination::BlocksByRange); match std::mem::replace(self, InboundSubstreamState::Poisoned) { InboundSubstreamState::ResponsePendingSend { substream, closing } => { @@ -244,10 +251,10 @@ where } /// Opens an outbound substream with a request. - pub fn send_request(&mut self, rpc_event: RPCEvent) { + pub fn send_request(&mut self, id: RequestId, req: RPCRequest) { self.keep_alive = KeepAlive::Yes; - self.dial_queue.push(rpc_event); + self.dial_queue.push((id, req)); } } @@ -262,7 +269,7 @@ where type Substream = TSubstream; type InboundProtocol = RPCProtocol; type OutboundProtocol = RPCRequest; - type OutboundOpenInfo = RPCEvent; // Keep track of the id and the request + type OutboundOpenInfo = (RequestId, RPCRequest); // Keep track of the id and the request fn listen_protocol(&self) -> SubstreamProtocol { self.listen_protocol.clone() @@ -292,7 +299,7 @@ where let awaiting_stream = InboundSubstreamState::ResponseIdle(substream); self.inbound_substreams.insert( self.current_inbound_substream_id, - (awaiting_stream, Some(delay_key)), + (awaiting_stream, Some(delay_key), req.protocol()), ); self.events_out @@ -303,7 +310,7 @@ where fn inject_fully_negotiated_outbound( &mut self, out: as OutboundUpgrade>::Output, - rpc_event: Self::OutboundOpenInfo, + request_info: Self::OutboundOpenInfo, ) { self.dial_negotiated -= 1; @@ -317,70 +324,80 @@ where } // add the stream to substreams if we expect a response, otherwise drop the stream. - match rpc_event { - RPCEvent::Request(mut id, request) if request.expect_response() => { - // outbound requests can be sent from various aspects of lighthouse which don't - // track request ids. In the future these will be flagged as None, currently they - // are flagged as 0. These can overlap. In this case, we pick the highest request - // Id available - if id == 0 && self.outbound_substreams.get(&id).is_some() { - // have duplicate outbound request with no id. Pick one that will not collide - let mut new_id = std::usize::MAX; - while self.outbound_substreams.get(&new_id).is_some() { - // panic all outbound substreams are full - new_id -= 1; - } - trace!(self.log, "New outbound stream id created"; "id" => new_id); - id = RequestId::from(new_id); - } - - // new outbound request. Store the stream and tag the output. - let delay_key = self - .outbound_substreams_delay - .insert(id, Duration::from_secs(RESPONSE_TIMEOUT)); - let awaiting_stream = OutboundSubstreamState::RequestPendingResponse { - substream: out, - request, - }; - if let Some(_) = self - .outbound_substreams - .insert(id, (awaiting_stream, delay_key)) - { - crit!(self.log, "Duplicate outbound substream id"; "id" => format!("{:?}", id)); + let (mut id, request) = request_info; + if request.expect_response() { + // outbound requests can be sent from various aspects of lighthouse which don't + // track request ids. In the future these will be flagged as None, currently they + // are flagged as 0. These can overlap. In this case, we pick the highest request + // Id available + if id == 0 && self.outbound_substreams.get(&id).is_some() { + // have duplicate outbound request with no id. Pick one that will not collide + let mut new_id = std::usize::MAX; + while self.outbound_substreams.get(&new_id).is_some() { + // panic all outbound substreams are full + new_id -= 1; } + trace!(self.log, "New outbound stream id created"; "id" => new_id); + id = RequestId::from(new_id); } - _ => { // a response is not expected, drop the stream for all other requests + + // new outbound request. Store the stream and tag the output. + let delay_key = self + .outbound_substreams_delay + .insert(id, Duration::from_secs(RESPONSE_TIMEOUT)); + let protocol = request.protocol(); + let awaiting_stream = OutboundSubstreamState::RequestPendingResponse { + substream: out, + request, + }; + if let Some(_) = self + .outbound_substreams + .insert(id, (awaiting_stream, delay_key, protocol)) + { + crit!(self.log, "Duplicate outbound substream id"; "id" => format!("{:?}", id)); } } } - // Note: If the substream has closed due to inactivity, or the substream is in the + // NOTE: If the substream has closed due to inactivity, or the substream is in the // wrong state a response will fail silently. fn inject_event(&mut self, rpc_event: Self::InEvent) { match rpc_event { - RPCEvent::Request(_, _) => self.send_request(rpc_event), + RPCEvent::Request(id, req) => self.send_request(id, req), RPCEvent::Response(rpc_id, response) => { - // check if the stream matching the response still exists - // variables indicating if the response is an error response or a multi-part + // Variables indicating if the response is an error response or a multi-part // response let res_is_error = response.is_error(); let res_is_multiple = response.multiple_responses(); + // check if the stream matching the response still exists match self.inbound_substreams.get_mut(&rpc_id) { - Some((substream_state, _)) => { + Some((substream_state, _, protocol)) => { match std::mem::replace(substream_state, InboundSubstreamState::Poisoned) { InboundSubstreamState::ResponseIdle(substream) => { // close the stream if there is no response - if let RPCErrorResponse::StreamTermination(_) = response { - //trace!(self.log, "Stream termination sent. Ending the stream"); - *substream_state = InboundSubstreamState::Closing(substream); - } else { - // send the response - // if it's a single rpc request or an error, close the stream after - *substream_state = InboundSubstreamState::ResponsePendingSend { - substream: substream.send(response), - closing: !res_is_multiple | res_is_error, // close if an error or we are not expecting more responses - }; + match response { + RPCCodedResponse::StreamTermination(_) => { + //trace!(self.log, "Stream termination sent. Ending the stream"); + *substream_state = + InboundSubstreamState::Closing(substream); + } + _ => { + if let Some(error_code) = response.error_code() { + self.pending_error.push(( + rpc_id, + *protocol, + RPCError::ErrorResponse(error_code), + )); + } + // send the response + // if it's a single rpc request or an error, close the stream after + *substream_state = + InboundSubstreamState::ResponsePendingSend { + substream: substream.send(response), + closing: !res_is_multiple | res_is_error, // close if an error or we are not expecting more responses + }; + } } } InboundSubstreamState::ResponsePendingSend { substream, closing } @@ -416,39 +433,55 @@ where } } None => { - warn!(self.log, "Stream has expired. Response not sent"; "response" => format!("{}",response)); + warn!(self.log, "Stream has expired. Response not sent"; "response" => format!("{}", response)); } }; } // We do not send errors as responses - RPCEvent::Error(_, _) => {} + RPCEvent::Error(..) => {} } } fn inject_dial_upgrade_error( &mut self, - request: Self::OutboundOpenInfo, + request_info: Self::OutboundOpenInfo, error: ProtocolsHandlerUpgrErr< >::Error, >, ) { + let (id, req) = request_info; if let ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Apply(RPCError::IoError(_))) = error { self.outbound_io_error_retries += 1; if self.outbound_io_error_retries < IO_ERROR_RETRIES { - self.send_request(request); + self.send_request(id, req); return; } } + self.outbound_io_error_retries = 0; - // add the error - let request_id = { - if let RPCEvent::Request(id, _) = request { - id - } else { - 0 + // map the error + let rpc_error = match error { + ProtocolsHandlerUpgrErr::Timer => RPCError::InternalError("Timer failed"), + ProtocolsHandlerUpgrErr::Timeout => RPCError::NegotiationTimeout, + ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Apply(e)) => e, + ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Select(NegotiationError::Failed)) => { + RPCError::UnsupportedProtocol } + ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Select( + NegotiationError::ProtocolError(e), + )) => match e { + ProtocolError::IoError(io_err) => RPCError::IoError(io_err), + ProtocolError::InvalidProtocol => { + RPCError::InternalError("Protocol was deemed invalid") + } + ProtocolError::InvalidMessage | ProtocolError::TooManyProtocols => { + // Peer is sending invalid data during the negotiation phase, not + // participating in the protocol + RPCError::InvalidData + } + }, }; - self.pending_error.push((request_id, error)); + self.pending_error.push((id, req.protocol(), rpc_error)); } fn connection_keep_alive(&self) -> KeepAlive { @@ -461,46 +494,11 @@ where ProtocolsHandlerEvent, Self::Error, > { - if let Some((request_id, err)) = self.pending_error.pop() { - // Returning an error here will result in dropping the peer. - match err { - ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Apply( - RPCError::InvalidProtocol(protocol_string), - )) => { - // Peer does not support the protocol. - // TODO: We currently will not drop the peer, for maximal compatibility with - // other clients testing their software. In the future, we will need to decide - // which protocols are a bare minimum to support before kicking the peer. - error!(self.log, "Peer doesn't support the RPC protocol"; "protocol" => protocol_string); - return Ok(Async::Ready(ProtocolsHandlerEvent::Custom( - RPCEvent::Error(request_id, RPCError::InvalidProtocol(protocol_string)), - ))); - } - ProtocolsHandlerUpgrErr::Timeout | ProtocolsHandlerUpgrErr::Timer => { - // negotiation timeout, mark the request as failed - debug!(self.log, "Active substreams before timeout"; "len" => self.outbound_substreams.len()); - return Ok(Async::Ready(ProtocolsHandlerEvent::Custom( - RPCEvent::Error( - request_id, - RPCError::Custom("Protocol negotiation timeout".into()), - ), - ))); - } - ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Apply(err)) => { - // IO/Decode/Custom Error, report to the application - debug!(self.log, "Upgrade Error"; "error" => format!("{}",err)); - return Ok(Async::Ready(ProtocolsHandlerEvent::Custom( - RPCEvent::Error(request_id, err), - ))); - } - ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Select(err)) => { - // Error during negotiation - debug!(self.log, "Upgrade Error"; "error" => format!("{}",err)); - return Ok(Async::Ready(ProtocolsHandlerEvent::Custom( - RPCEvent::Error(request_id, RPCError::Custom(format!("{}", err))), - ))); - } - } + if !self.pending_error.is_empty() { + let (id, protocol, err) = self.pending_error.remove(0); + return Ok(Async::Ready(ProtocolsHandlerEvent::Custom( + RPCEvent::Error(id, protocol, err), + ))); } // return any events that need to be reported @@ -522,7 +520,7 @@ where let rpc_id = stream_id.get_ref(); // handle a stream timeout for various states - if let Some((substream_state, delay_key)) = self.inbound_substreams.get_mut(rpc_id) { + if let Some((substream_state, delay_key, _)) = self.inbound_substreams.get_mut(rpc_id) { // the delay has been removed *delay_key = None; @@ -541,14 +539,16 @@ where ProtocolsHandlerUpgrErr::Timer })? { - self.outbound_substreams.remove(stream_id.get_ref()); - // notify the user - return Ok(Async::Ready(ProtocolsHandlerEvent::Custom( - RPCEvent::Error( - *stream_id.get_ref(), - RPCError::Custom("Stream timed out".into()), - ), - ))); + if let Some((_id, _stream, protocol)) = + self.outbound_substreams.remove(stream_id.get_ref()) + { + // notify the user + return Ok(Async::Ready(ProtocolsHandlerEvent::Custom( + RPCEvent::Error(*stream_id.get_ref(), protocol, RPCError::StreamTimeout), + ))); + } else { + crit!(self.log, "timed out substream not in the books"; "stream_id" => stream_id.get_ref()); + } } // drive inbound streams that need to be processed @@ -598,9 +598,10 @@ where if let Some(delay_key) = &entry.get().1 { self.inbound_substreams_delay.remove(delay_key); } + let protocol = entry.get().2; entry.remove_entry(); return Ok(Async::Ready(ProtocolsHandlerEvent::Custom( - RPCEvent::Error(0, e), + RPCEvent::Error(0, protocol, e), ))); } }; @@ -696,7 +697,7 @@ where return Ok(Async::Ready(ProtocolsHandlerEvent::Custom( RPCEvent::Response( request_id, - RPCErrorResponse::StreamTermination( + RPCCodedResponse::StreamTermination( request.stream_termination(), ), ), @@ -705,9 +706,8 @@ where return Ok(Async::Ready(ProtocolsHandlerEvent::Custom( RPCEvent::Error( request_id, - RPCError::Custom( - "Stream closed early. Empty response".into(), - ), + request.protocol(), + RPCError::IncompleteStream, ), ))); } @@ -721,9 +721,10 @@ where // drop the stream let delay_key = &entry.get().1; self.outbound_substreams_delay.remove(delay_key); + let protocol = entry.get().2; entry.remove_entry(); return Ok(Async::Ready(ProtocolsHandlerEvent::Custom( - RPCEvent::Error(request_id, e), + RPCEvent::Error(request_id, protocol, e), ))); } }, @@ -759,16 +760,14 @@ where // establish outbound substreams if !self.dial_queue.is_empty() && self.dial_negotiated < self.max_dial_negotiated { self.dial_negotiated += 1; - let rpc_event = self.dial_queue.remove(0); + let (id, req) = self.dial_queue.remove(0); self.dial_queue.shrink_to_fit(); - if let RPCEvent::Request(id, req) = rpc_event { - return Ok(Async::Ready( - ProtocolsHandlerEvent::OutboundSubstreamRequest { - protocol: SubstreamProtocol::new(req.clone()), - info: RPCEvent::Request(id, req), - }, - )); - } + return Ok(Async::Ready( + ProtocolsHandlerEvent::OutboundSubstreamRequest { + protocol: SubstreamProtocol::new(req.clone()), + info: (id, req), + }, + )); } Ok(Async::NotReady) } @@ -777,7 +776,7 @@ where // Check for new items to send to the peer and update the underlying stream fn apply_queued_responses( raw_substream: InboundFramed, - queued_outbound_items: &mut Option<&mut Vec>>, + queued_outbound_items: &mut Option<&mut Vec>>, new_items_to_send: &mut bool, ) -> InboundSubstreamState { match queued_outbound_items { @@ -785,7 +784,7 @@ fn apply_queued_responses( *new_items_to_send = true; // we have queued items match queue.remove(0) { - RPCErrorResponse::StreamTermination(_) => { + RPCCodedResponse::StreamTermination(_) => { // close the stream if this is a stream termination InboundSubstreamState::Closing(raw_substream) } diff --git a/beacon_node/eth2-libp2p/src/rpc/methods.rs b/beacon_node/eth2-libp2p/src/rpc/methods.rs index c9e86d3ec..e4b5b6714 100644 --- a/beacon_node/eth2-libp2p/src/rpc/methods.rs +++ b/beacon_node/eth2-libp2p/src/rpc/methods.rs @@ -173,8 +173,10 @@ pub enum ResponseTermination { BlocksByRoot, } +/// The structured response containing a result/code indicating success or failure +/// and the contents of the response #[derive(Debug)] -pub enum RPCErrorResponse { +pub enum RPCCodedResponse { /// The response is a successful. Success(RPCResponse), @@ -191,15 +193,23 @@ pub enum RPCErrorResponse { StreamTermination(ResponseTermination), } -impl RPCErrorResponse { +/// The code assigned to an erroneous `RPCResponse`. +#[derive(Debug)] +pub enum RPCResponseErrorCode { + InvalidRequest, + ServerError, + Unknown, +} + +impl RPCCodedResponse { /// Used to encode the response in the codec. pub fn as_u8(&self) -> Option { match self { - RPCErrorResponse::Success(_) => Some(0), - RPCErrorResponse::InvalidRequest(_) => Some(1), - RPCErrorResponse::ServerError(_) => Some(2), - RPCErrorResponse::Unknown(_) => Some(255), - RPCErrorResponse::StreamTermination(_) => None, + RPCCodedResponse::Success(_) => Some(0), + RPCCodedResponse::InvalidRequest(_) => Some(1), + RPCCodedResponse::ServerError(_) => Some(2), + RPCCodedResponse::Unknown(_) => Some(255), + RPCCodedResponse::StreamTermination(_) => None, } } @@ -211,30 +221,30 @@ impl RPCErrorResponse { } } - /// Builds an RPCErrorResponse from a response code and an ErrorMessage + /// Builds an RPCCodedResponse from a response code and an ErrorMessage pub fn from_error(response_code: u8, err: ErrorMessage) -> Self { match response_code { - 1 => RPCErrorResponse::InvalidRequest(err), - 2 => RPCErrorResponse::ServerError(err), - _ => RPCErrorResponse::Unknown(err), + 1 => RPCCodedResponse::InvalidRequest(err), + 2 => RPCCodedResponse::ServerError(err), + _ => RPCCodedResponse::Unknown(err), } } /// Specifies which response allows for multiple chunks for the stream handler. pub fn multiple_responses(&self) -> bool { match self { - RPCErrorResponse::Success(resp) => match resp { + RPCCodedResponse::Success(resp) => match resp { RPCResponse::Status(_) => false, RPCResponse::BlocksByRange(_) => true, RPCResponse::BlocksByRoot(_) => true, RPCResponse::Pong(_) => false, RPCResponse::MetaData(_) => false, }, - RPCErrorResponse::InvalidRequest(_) => true, - RPCErrorResponse::ServerError(_) => true, - RPCErrorResponse::Unknown(_) => true, + RPCCodedResponse::InvalidRequest(_) => true, + RPCCodedResponse::ServerError(_) => true, + RPCCodedResponse::Unknown(_) => true, // Stream terminations are part of responses that have chunks - RPCErrorResponse::StreamTermination(_) => true, + RPCCodedResponse::StreamTermination(_) => true, } } @@ -242,10 +252,20 @@ impl RPCErrorResponse { /// sent. pub fn is_error(&self) -> bool { match self { - RPCErrorResponse::Success(_) => false, + RPCCodedResponse::Success(_) => false, _ => true, } } + + pub fn error_code(&self) -> Option { + match self { + RPCCodedResponse::Success(_) => None, + RPCCodedResponse::StreamTermination(_) => None, + RPCCodedResponse::InvalidRequest(_) => Some(RPCResponseErrorCode::InvalidRequest), + RPCCodedResponse::ServerError(_) => Some(RPCResponseErrorCode::ServerError), + RPCCodedResponse::Unknown(_) => Some(RPCResponseErrorCode::Unknown), + } + } } #[derive(Encode, Decode, Debug)] @@ -260,6 +280,17 @@ impl ErrorMessage { } } +impl std::fmt::Display for RPCResponseErrorCode { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let repr = match self { + RPCResponseErrorCode::InvalidRequest => "The request was invalid", + RPCResponseErrorCode::ServerError => "Server error occurred", + RPCResponseErrorCode::Unknown => "Unknown error occurred", + }; + f.write_str(repr) + } +} + impl std::fmt::Display for StatusMessage { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!(f, "Status Message: Fork Digest: {:?}, Finalized Root: {}, Finalized Epoch: {}, Head Root: {}, Head Slot: {}", self.fork_digest, self.finalized_root, self.finalized_epoch, self.head_root, self.head_slot) @@ -282,14 +313,14 @@ impl std::fmt::Display for RPCResponse { } } -impl std::fmt::Display for RPCErrorResponse { +impl std::fmt::Display for RPCCodedResponse { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { - RPCErrorResponse::Success(res) => write!(f, "{}", res), - RPCErrorResponse::InvalidRequest(err) => write!(f, "Invalid Request: {:?}", err), - RPCErrorResponse::ServerError(err) => write!(f, "Server Error: {:?}", err), - RPCErrorResponse::Unknown(err) => write!(f, "Unknown Error: {:?}", err), - RPCErrorResponse::StreamTermination(_) => write!(f, "Stream Termination"), + RPCCodedResponse::Success(res) => write!(f, "{}", res), + RPCCodedResponse::InvalidRequest(err) => write!(f, "Invalid Request: {:?}", err), + RPCCodedResponse::ServerError(err) => write!(f, "Server Error: {:?}", err), + RPCCodedResponse::Unknown(err) => write!(f, "Unknown Error: {:?}", err), + RPCCodedResponse::StreamTermination(_) => write!(f, "Stream Termination"), } } } diff --git a/beacon_node/eth2-libp2p/src/rpc/mod.rs b/beacon_node/eth2-libp2p/src/rpc/mod.rs index b3a250818..df87d8f89 100644 --- a/beacon_node/eth2-libp2p/src/rpc/mod.rs +++ b/beacon_node/eth2-libp2p/src/rpc/mod.rs @@ -13,10 +13,10 @@ use libp2p::swarm::{ }; use libp2p::{Multiaddr, PeerId}; pub use methods::{ - ErrorMessage, MetaData, RPCErrorResponse, RPCResponse, RequestId, ResponseTermination, - StatusMessage, + ErrorMessage, MetaData, RPCCodedResponse, RPCResponse, RPCResponseErrorCode, RequestId, + ResponseTermination, StatusMessage, }; -pub use protocol::{RPCError, RPCProtocol, RPCRequest}; +pub use protocol::{Protocol, RPCError, RPCProtocol, RPCRequest}; use slog::{debug, o}; use std::marker::PhantomData; use std::time::Duration; @@ -37,9 +37,9 @@ pub enum RPCEvent { /// A response that is being sent or has been received from the RPC protocol. The first parameter returns /// that which was sent with the corresponding request, the second is a single chunk of a /// response. - Response(RequestId, RPCErrorResponse), + Response(RequestId, RPCCodedResponse), /// An Error occurred. - Error(RequestId, RPCError), + Error(RequestId, Protocol, RPCError), } impl RPCEvent { @@ -47,7 +47,7 @@ impl RPCEvent { match *self { RPCEvent::Request(id, _) => id, RPCEvent::Response(id, _) => id, - RPCEvent::Error(id, _) => id, + RPCEvent::Error(id, _, _) => id, } } } @@ -57,7 +57,11 @@ impl std::fmt::Display for RPCEvent { match self { RPCEvent::Request(id, req) => write!(f, "RPC Request(id: {}, {})", id, req), RPCEvent::Response(id, res) => write!(f, "RPC Response(id: {}, {})", id, res), - RPCEvent::Error(id, err) => write!(f, "RPC Request(id: {}, error: {:?})", id, err), + RPCEvent::Error(id, prot, err) => write!( + f, + "RPC Error(id: {}, protocol: {:?} error: {:?})", + id, prot, err + ), } } } diff --git a/beacon_node/eth2-libp2p/src/rpc/protocol.rs b/beacon_node/eth2-libp2p/src/rpc/protocol.rs index 76567cf46..d0b313bcf 100644 --- a/beacon_node/eth2-libp2p/src/rpc/protocol.rs +++ b/beacon_node/eth2-libp2p/src/rpc/protocol.rs @@ -34,7 +34,7 @@ const TTFB_TIMEOUT: u64 = 5; const REQUEST_TIMEOUT: u64 = 15; /// Protocol names to be used. -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Copy)] pub enum Protocol { /// The Status protocol name. Status, @@ -128,7 +128,7 @@ impl UpgradeInfo for RPCProtocol { /// Tracks the types in a protocol id. #[derive(Clone, Debug)] pub struct ProtocolId { - /// The rpc message type/name. + /// The RPC message type/name. pub message_name: Protocol, /// The version of the RPC. @@ -151,7 +151,7 @@ impl ProtocolId { ProtocolId { message_name, - version: version, + version, encoding, protocol_id, } @@ -172,11 +172,20 @@ impl ProtocolName for ProtocolId { pub type InboundOutput = (RPCRequest, InboundFramed); pub type InboundFramed = Framed>, InboundCodec>; + +// Auxiliary types + +// The type of the socket timeout in the `InboundUpgrade` type `Future` +type TTimeout = + timeout::Timeout>>; +// The type of the socket timeout error in the `InboundUpgrade` type `Future` +type TTimeoutErr = timeout::Error<(RPCError, InboundFramed)>; +// `TimeoutErr` to `RPCError` mapping function +type FnMapErr = fn(TTimeoutErr) -> RPCError; + type FnAndThen = fn( (Option>, InboundFramed), ) -> FutureResult, RPCError>; -type FnMapErr = - fn(timeout::Error<(RPCError, InboundFramed)>) -> RPCError; impl InboundUpgrade for RPCProtocol where @@ -189,10 +198,7 @@ where type Future = future::Either< FutureResult, RPCError>, future::AndThen< - future::MapErr< - timeout::Timeout>>, - FnMapErr, - >, + future::MapErr, FnMapErr>, FutureResult, RPCError>, FnAndThen, >, @@ -203,7 +209,7 @@ where socket: upgrade::Negotiated, protocol: ProtocolId, ) -> Self::Future { - let protocol_name = protocol.message_name.clone(); + let protocol_name = protocol.message_name; let codec = match protocol.encoding { Encoding::SSZSnappy => { let ssz_snappy_codec = @@ -220,27 +226,31 @@ where let socket = Framed::new(timed_socket, codec); - // MetaData requests should be empty, return the stream match protocol_name { - Protocol::MetaData => futures::future::Either::A(futures::future::ok(( - RPCRequest::MetaData(PhantomData), - socket, - ))), - - _ => futures::future::Either::B( + // `MetaData` requests should be empty, return the stream + Protocol::MetaData => { + future::Either::A(future::ok((RPCRequest::MetaData(PhantomData), socket))) + } + _ => future::Either::B({ socket .into_future() .timeout(Duration::from_secs(REQUEST_TIMEOUT)) - .map_err(RPCError::from as FnMapErr) + .map_err({ + |err| { + if err.is_elapsed() { + RPCError::StreamTimeout + } else { + RPCError::InternalError("Stream timer failed") + } + } + } as FnMapErr) .and_then({ |(req, stream)| match req { - Some(request) => futures::future::ok((request, stream)), - None => futures::future::err(RPCError::Custom( - "Stream terminated early".into(), - )), + Some(request) => future::ok((request, stream)), + None => future::err(RPCError::IncompleteStream), } - } as FnAndThen), - ), + } as FnAndThen) + }), } } } @@ -270,7 +280,7 @@ impl UpgradeInfo for RPCRequest { } } -/// Implements the encoding per supported protocol for RPCRequest. +/// Implements the encoding per supported protocol for `RPCRequest`. impl RPCRequest { pub fn supported_protocols(&self) -> Vec { match self { @@ -330,6 +340,17 @@ impl RPCRequest { } } + pub fn protocol(&self) -> Protocol { + match self { + RPCRequest::Status(_) => Protocol::Status, + RPCRequest::Goodbye(_) => Protocol::Goodbye, + RPCRequest::BlocksByRange(_) => Protocol::BlocksByRange, + RPCRequest::BlocksByRoot(_) => Protocol::BlocksByRoot, + RPCRequest::Ping(_) => Protocol::Ping, + RPCRequest::MetaData(_) => Protocol::MetaData, + } + } + /// Returns the `ResponseTermination` type associated with the request if a stream gets /// terminated. pub fn stream_termination(&self) -> ResponseTermination { @@ -361,6 +382,7 @@ where type Output = OutboundFramed; type Error = RPCError; type Future = sink::Send>; + fn upgrade_outbound( self, socket: upgrade::Negotiated, @@ -385,29 +407,25 @@ where /// Error in RPC Encoding/Decoding. #[derive(Debug)] pub enum RPCError { - /// Error when reading the packet from the socket. - ReadError(upgrade::ReadOneError), /// Error when decoding the raw buffer from ssz. + // NOTE: in the future a ssz::DecodeError should map to an InvalidData error SSZDecodeError(ssz::DecodeError), - /// Snappy error - SnappyError(snap::Error), - /// Invalid Protocol ID. - InvalidProtocol(&'static str), /// IO Error. IoError(io::Error), - /// Waiting for a request/response timed out, or timer error'd. + /// The peer returned a valid response but the response indicated an error. + ErrorResponse(RPCResponseErrorCode), + /// Timed out waiting for a response. StreamTimeout, - /// The peer returned a valid RPCErrorResponse but the response was an error. - RPCErrorResponse, - /// Custom message. - Custom(String), -} - -impl From for RPCError { - #[inline] - fn from(err: upgrade::ReadOneError) -> Self { - RPCError::ReadError(err) - } + /// Peer does not support the protocol. + UnsupportedProtocol, + /// Stream ended unexpectedly. + IncompleteStream, + /// Peer sent invalid data. + InvalidData, + /// An error occurred due to internal reasons. Ex: timer failure. + InternalError(&'static str), + /// Negotiation with this peer timed out + NegotiationTimeout, } impl From for RPCError { @@ -416,21 +434,6 @@ impl From for RPCError { RPCError::SSZDecodeError(err) } } -impl From> for RPCError { - fn from(err: tokio::timer::timeout::Error) -> Self { - if err.is_elapsed() { - RPCError::StreamTimeout - } else { - RPCError::Custom("Stream timer failed".into()) - } - } -} - -impl From<()> for RPCError { - fn from(_err: ()) -> Self { - RPCError::Custom("".into()) - } -} impl From for RPCError { fn from(err: io::Error) -> Self { @@ -438,24 +441,19 @@ impl From for RPCError { } } -impl From for RPCError { - fn from(err: snap::Error) -> Self { - RPCError::SnappyError(err) - } -} - // Error trait is required for `ProtocolsHandler` impl std::fmt::Display for RPCError { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match *self { - RPCError::ReadError(ref err) => write!(f, "Error while reading from socket: {}", err), RPCError::SSZDecodeError(ref err) => write!(f, "Error while decoding ssz: {:?}", err), - RPCError::InvalidProtocol(ref err) => write!(f, "Invalid Protocol: {}", err), + RPCError::InvalidData => write!(f, "Peer sent unexpected data"), RPCError::IoError(ref err) => write!(f, "IO Error: {}", err), - RPCError::RPCErrorResponse => write!(f, "RPC Response Error"), + RPCError::ErrorResponse(ref code) => write!(f, "RPC response was an error: {}", code), RPCError::StreamTimeout => write!(f, "Stream Timeout"), - RPCError::SnappyError(ref err) => write!(f, "Snappy error: {}", err), - RPCError::Custom(ref err) => write!(f, "{}", err), + RPCError::UnsupportedProtocol => write!(f, "Peer does not support the protocol"), + RPCError::IncompleteStream => write!(f, "Stream ended unexpectedly"), + RPCError::InternalError(ref err) => write!(f, "Internal error: {}", err), + RPCError::NegotiationTimeout => write!(f, "Negotiation timeout"), } } } @@ -463,14 +461,16 @@ impl std::fmt::Display for RPCError { impl std::error::Error for RPCError { fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { match *self { - RPCError::ReadError(ref err) => Some(err), + // NOTE: this does have a source RPCError::SSZDecodeError(_) => None, - RPCError::SnappyError(ref err) => Some(err), - RPCError::InvalidProtocol(_) => None, RPCError::IoError(ref err) => Some(err), RPCError::StreamTimeout => None, - RPCError::RPCErrorResponse => None, - RPCError::Custom(_) => None, + RPCError::UnsupportedProtocol => None, + RPCError::IncompleteStream => None, + RPCError::InvalidData => None, + RPCError::InternalError(_) => None, + RPCError::ErrorResponse(_) => None, + RPCError::NegotiationTimeout => None, } } } diff --git a/beacon_node/eth2-libp2p/tests/rpc_tests.rs b/beacon_node/eth2-libp2p/tests/rpc_tests.rs index 74a0a7a1b..6f2a00bbb 100644 --- a/beacon_node/eth2-libp2p/tests/rpc_tests.rs +++ b/beacon_node/eth2-libp2p/tests/rpc_tests.rs @@ -62,12 +62,12 @@ fn test_status_rpc() { } Async::Ready(Some(BehaviourEvent::RPC(_, event))) => match event { // Should receive the RPC response - RPCEvent::Response(id, response @ RPCErrorResponse::Success(_)) => { + RPCEvent::Response(id, response @ RPCCodedResponse::Success(_)) => { if id == 1 { warn!(sender_log, "Sender Received"); let response = { match response { - RPCErrorResponse::Success(r) => r, + RPCCodedResponse::Success(r) => r, _ => unreachable!(), } }; @@ -99,7 +99,7 @@ fn test_status_rpc() { peer_id, RPCEvent::Response( id, - RPCErrorResponse::Success(rpc_response.clone()), + RPCCodedResponse::Success(rpc_response.clone()), ), ); } @@ -181,12 +181,12 @@ fn test_blocks_by_range_chunked_rpc() { if id == 1 { warn!(sender_log, "Sender received a response"); match response { - RPCErrorResponse::Success(res) => { + RPCCodedResponse::Success(res) => { assert_eq!(res, sender_response.clone()); *messages_received.lock().unwrap() += 1; warn!(sender_log, "Chunk received"); } - RPCErrorResponse::StreamTermination( + RPCCodedResponse::StreamTermination( ResponseTermination::BlocksByRange, ) => { // should be exactly 10 messages before terminating @@ -225,7 +225,7 @@ fn test_blocks_by_range_chunked_rpc() { peer_id.clone(), RPCEvent::Response( id, - RPCErrorResponse::Success(rpc_response.clone()), + RPCCodedResponse::Success(rpc_response.clone()), ), ); } @@ -234,7 +234,7 @@ fn test_blocks_by_range_chunked_rpc() { peer_id, RPCEvent::Response( id, - RPCErrorResponse::StreamTermination( + RPCCodedResponse::StreamTermination( ResponseTermination::BlocksByRange, ), ), @@ -316,12 +316,12 @@ fn test_blocks_by_range_single_empty_rpc() { if id == 1 { warn!(sender_log, "Sender received a response"); match response { - RPCErrorResponse::Success(res) => { + RPCCodedResponse::Success(res) => { assert_eq!(res, sender_response.clone()); *messages_received.lock().unwrap() += 1; warn!(sender_log, "Chunk received"); } - RPCErrorResponse::StreamTermination( + RPCCodedResponse::StreamTermination( ResponseTermination::BlocksByRange, ) => { // should be exactly 1 messages before terminating @@ -356,7 +356,7 @@ fn test_blocks_by_range_single_empty_rpc() { peer_id.clone(), RPCEvent::Response( id, - RPCErrorResponse::Success(rpc_response.clone()), + RPCCodedResponse::Success(rpc_response.clone()), ), ); // send the stream termination @@ -364,7 +364,7 @@ fn test_blocks_by_range_single_empty_rpc() { peer_id, RPCEvent::Response( id, - RPCErrorResponse::StreamTermination( + RPCCodedResponse::StreamTermination( ResponseTermination::BlocksByRange, ), ), @@ -449,12 +449,12 @@ fn test_blocks_by_root_chunked_rpc() { warn!(sender_log, "Sender received a response"); assert_eq!(id, 1); match response { - RPCErrorResponse::Success(res) => { + RPCCodedResponse::Success(res) => { assert_eq!(res, sender_response.clone()); *messages_received.lock().unwrap() += 1; warn!(sender_log, "Chunk received"); } - RPCErrorResponse::StreamTermination( + RPCCodedResponse::StreamTermination( ResponseTermination::BlocksByRoot, ) => { // should be exactly 10 messages before terminating @@ -489,7 +489,7 @@ fn test_blocks_by_root_chunked_rpc() { peer_id.clone(), RPCEvent::Response( id, - RPCErrorResponse::Success(rpc_response.clone()), + RPCCodedResponse::Success(rpc_response.clone()), ), ); } @@ -498,7 +498,7 @@ fn test_blocks_by_root_chunked_rpc() { peer_id, RPCEvent::Response( id, - RPCErrorResponse::StreamTermination( + RPCCodedResponse::StreamTermination( ResponseTermination::BlocksByRange, ), ), diff --git a/beacon_node/network/src/router/mod.rs b/beacon_node/network/src/router/mod.rs index 466c70363..8615d5586 100644 --- a/beacon_node/network/src/router/mod.rs +++ b/beacon_node/network/src/router/mod.rs @@ -10,7 +10,10 @@ use crate::error; use crate::service::NetworkMessage; use beacon_chain::{AttestationType, BeaconChain, BeaconChainTypes, BlockError}; use eth2_libp2p::{ - rpc::{RPCError, RPCErrorResponse, RPCRequest, RPCResponse, RequestId, ResponseTermination}, + rpc::{ + RPCCodedResponse, RPCError, RPCRequest, RPCResponse, RPCResponseErrorCode, RequestId, + ResponseTermination, + }, MessageId, NetworkGlobals, PeerId, PubsubMessage, RPCEvent, }; use futures::future::Future; @@ -123,7 +126,7 @@ impl Router { match rpc_message { RPCEvent::Request(id, req) => self.handle_rpc_request(peer_id, id, req), RPCEvent::Response(id, resp) => self.handle_rpc_response(peer_id, id, resp), - RPCEvent::Error(id, error) => self.handle_rpc_error(peer_id, id, error), + RPCEvent::Error(id, _protocol, error) => self.handle_rpc_error(peer_id, id, error), } } @@ -164,23 +167,35 @@ impl Router { &mut self, peer_id: PeerId, request_id: RequestId, - error_response: RPCErrorResponse, + error_response: RPCCodedResponse, ) { // an error could have occurred. match error_response { - RPCErrorResponse::InvalidRequest(error) => { - warn!(self.log, "Peer indicated invalid request";"peer_id" => format!("{:?}", peer_id), "error" => error.as_string()); - self.handle_rpc_error(peer_id, request_id, RPCError::RPCErrorResponse); + RPCCodedResponse::InvalidRequest(error) => { + warn!(self.log, "Peer indicated invalid request"; "peer_id" => format!("{:?}", peer_id), "error" => error.as_string()); + self.handle_rpc_error( + peer_id, + request_id, + RPCError::ErrorResponse(RPCResponseErrorCode::InvalidRequest), + ); } - RPCErrorResponse::ServerError(error) => { - warn!(self.log, "Peer internal server error";"peer_id" => format!("{:?}", peer_id), "error" => error.as_string()); - self.handle_rpc_error(peer_id, request_id, RPCError::RPCErrorResponse); + RPCCodedResponse::ServerError(error) => { + warn!(self.log, "Peer internal server error"; "peer_id" => format!("{:?}", peer_id), "error" => error.as_string()); + self.handle_rpc_error( + peer_id, + request_id, + RPCError::ErrorResponse(RPCResponseErrorCode::ServerError), + ); } - RPCErrorResponse::Unknown(error) => { - warn!(self.log, "Unknown peer error";"peer" => format!("{:?}", peer_id), "error" => error.as_string()); - self.handle_rpc_error(peer_id, request_id, RPCError::RPCErrorResponse); + RPCCodedResponse::Unknown(error) => { + warn!(self.log, "Unknown peer error"; "peer" => format!("{:?}", peer_id), "error" => error.as_string()); + self.handle_rpc_error( + peer_id, + request_id, + RPCError::ErrorResponse(RPCResponseErrorCode::Unknown), + ); } - RPCErrorResponse::Success(response) => match response { + RPCCodedResponse::Success(response) => match response { RPCResponse::Status(status_message) => { self.processor.on_status_response(peer_id, status_message); } @@ -205,7 +220,7 @@ impl Router { unreachable!("Meta data must be handled in the behaviour"); } }, - RPCErrorResponse::StreamTermination(response_type) => { + RPCCodedResponse::StreamTermination(response_type) => { // have received a stream termination, notify the processing functions match response_type { ResponseTermination::BlocksByRange => { diff --git a/beacon_node/network/src/router/processor.rs b/beacon_node/network/src/router/processor.rs index d309ee9bb..fc1f6b6fb 100644 --- a/beacon_node/network/src/router/processor.rs +++ b/beacon_node/network/src/router/processor.rs @@ -5,7 +5,7 @@ use beacon_chain::{ BlockProcessingOutcome, GossipVerifiedBlock, }; use eth2_libp2p::rpc::methods::*; -use eth2_libp2p::rpc::{RPCEvent, RPCRequest, RPCResponse, RequestId}; +use eth2_libp2p::rpc::{RPCCodedResponse, RPCEvent, RPCRequest, RPCResponse, RequestId}; use eth2_libp2p::{NetworkGlobals, PeerId}; use slog::{debug, error, o, trace, warn}; use ssz::Encode; @@ -314,7 +314,7 @@ impl Processor { self.network.send_rpc_error_response( peer_id, request_id, - RPCErrorResponse::StreamTermination(ResponseTermination::BlocksByRoot), + RPCCodedResponse::StreamTermination(ResponseTermination::BlocksByRoot), ); } @@ -413,7 +413,7 @@ impl Processor { self.network.send_rpc_error_response( peer_id, request_id, - RPCErrorResponse::StreamTermination(ResponseTermination::BlocksByRange), + RPCCodedResponse::StreamTermination(ResponseTermination::BlocksByRange), ); } @@ -691,16 +691,16 @@ impl HandlerNetworkContext { ) { self.send_rpc_event( peer_id, - RPCEvent::Response(request_id, RPCErrorResponse::Success(rpc_response)), + RPCEvent::Response(request_id, RPCCodedResponse::Success(rpc_response)), ); } - /// Send an RPCErrorResponse. This handles errors and stream terminations. + /// Send an RPCCodedResponse. This handles errors and stream terminations. pub fn send_rpc_error_response( &mut self, peer_id: PeerId, request_id: RequestId, - rpc_error_response: RPCErrorResponse, + rpc_error_response: RPCCodedResponse, ) { self.send_rpc_event(peer_id, RPCEvent::Response(request_id, rpc_error_response)); }