diff --git a/beacon_node/eth2_libp2p/src/behaviour/handler/delegate.rs b/beacon_node/eth2_libp2p/src/behaviour/handler/delegate.rs index 2863efd85..272a4ae75 100644 --- a/beacon_node/eth2_libp2p/src/behaviour/handler/delegate.rs +++ b/beacon_node/eth2_libp2p/src/behaviour/handler/delegate.rs @@ -80,6 +80,7 @@ pub enum DelegateError { Gossipsub(::Error), RPC( as ProtocolsHandler>::Error), Identify(::Error), + Disconnected, } impl std::error::Error for DelegateError {} @@ -93,6 +94,7 @@ impl std::fmt::Display for DelegateError { DelegateError::Gossipsub(err) => err.fmt(formater), DelegateError::RPC(err) => err.fmt(formater), DelegateError::Identify(err) => err.fmt(formater), + DelegateError::Disconnected => write!(formater, "Disconnected"), } } } @@ -135,11 +137,10 @@ impl ProtocolsHandler for DelegatingHandler { let rpc_proto = self.rpc_handler.listen_protocol(); let identify_proto = self.identify_handler.listen_protocol(); - let timeout = gossip_proto + let timeout = *gossip_proto .timeout() .max(rpc_proto.timeout()) - .max(identify_proto.timeout()) - .clone(); + .max(identify_proto.timeout()); let select = SelectUpgrade::new( gossip_proto.into_upgrade().1, @@ -317,7 +318,7 @@ impl ProtocolsHandler for DelegatingHandler { } Poll::Ready(ProtocolsHandlerEvent::OutboundSubstreamRequest { protocol, info }) => { return Poll::Ready(ProtocolsHandlerEvent::OutboundSubstreamRequest { - protocol: protocol.map_upgrade(|u| EitherUpgrade::A(u)), + protocol: protocol.map_upgrade(EitherUpgrade::A), info: EitherOutput::First(info), }); } diff --git a/beacon_node/eth2_libp2p/src/behaviour/handler/mod.rs b/beacon_node/eth2_libp2p/src/behaviour/handler/mod.rs index f87ba6dbe..70b628b6f 100644 --- a/beacon_node/eth2_libp2p/src/behaviour/handler/mod.rs +++ b/beacon_node/eth2_libp2p/src/behaviour/handler/mod.rs @@ -95,14 +95,9 @@ impl ProtocolsHandler for BehaviourHandler { self.delegate.inject_dial_upgrade_error(info, err) } + // We don't use the keep alive to disconnect. This is handled in the poll fn connection_keep_alive(&self) -> KeepAlive { - if self.shutting_down { - let rpc_keep_alive = self.delegate.rpc().connection_keep_alive(); - let identify_keep_alive = self.delegate.identify().connection_keep_alive(); - rpc_keep_alive.max(identify_keep_alive) - } else { - KeepAlive::Yes - } + KeepAlive::Yes } fn poll( @@ -116,6 +111,15 @@ impl ProtocolsHandler for BehaviourHandler { Self::Error, >, > { + // Disconnect if the sub-handlers are ready. + if self.shutting_down { + let rpc_keep_alive = self.delegate.rpc().connection_keep_alive(); + let identify_keep_alive = self.delegate.identify().connection_keep_alive(); + if KeepAlive::No == rpc_keep_alive.max(identify_keep_alive) { + return Poll::Ready(ProtocolsHandlerEvent::Close(DelegateError::Disconnected)); + } + } + match self.delegate.poll(cx) { Poll::Ready(ProtocolsHandlerEvent::Custom(event)) => { return Poll::Ready(ProtocolsHandlerEvent::Custom( diff --git a/beacon_node/eth2_libp2p/src/behaviour/mod.rs b/beacon_node/eth2_libp2p/src/behaviour/mod.rs index ca5ec1499..dc07e66c3 100644 --- a/beacon_node/eth2_libp2p/src/behaviour/mod.rs +++ b/beacon_node/eth2_libp2p/src/behaviour/mod.rs @@ -1,4 +1,4 @@ -use crate::peer_manager::{PeerManager, PeerManagerEvent}; +use crate::peer_manager::{score::PeerAction, PeerManager, PeerManagerEvent}; use crate::rpc::*; use crate::types::{GossipEncoding, GossipKind, GossipTopic}; use crate::Eth2Enr; @@ -21,6 +21,7 @@ use libp2p::{ }; use slog::{crit, debug, o}; use std::{ + collections::VecDeque, marker::PhantomData, sync::Arc, task::{Context, Poll}, @@ -46,10 +47,12 @@ pub struct Behaviour { identify: Identify, /// The peer manager that keeps track of peer's reputation and status. peer_manager: PeerManager, - /// The events generated by this behaviour to be consumed in the swarm poll. - events: Vec>, + /// The output events generated by this behaviour to be consumed in the swarm poll. + events: VecDeque>, + /// Events generated in the global behaviour to be sent to the behaviour handler. + handler_events: VecDeque, BehaviourEvent>>, /// Queue of peers to disconnect. - peers_to_dc: Vec, + peers_to_dc: VecDeque, /// The current meta data of the node, so respond to pings and get metadata meta_data: MetaData, /// A collections of variables accessible outside the network service. @@ -58,173 +61,12 @@ pub struct Behaviour { // NOTE: This can be accessed via the network_globals ENR. However we keep it here for quick // lookups for every gossipsub message send. enr_fork_id: EnrForkId, + /// The waker for the current thread. + waker: Option, /// Logger for behaviour actions. log: slog::Logger, } -/// Calls the given function with the given args on all sub behaviours. -macro_rules! delegate_to_behaviours { - ($self: ident, $fn: ident, $($arg: ident), *) => { - $self.gossipsub.$fn($($arg),*); - $self.eth2_rpc.$fn($($arg),*); - $self.identify.$fn($($arg),*); - }; -} - -impl NetworkBehaviour for Behaviour { - type ProtocolsHandler = BehaviourHandler; - type OutEvent = BehaviourEvent; - - fn new_handler(&mut self) -> Self::ProtocolsHandler { - BehaviourHandler::new(&mut self.gossipsub, &mut self.eth2_rpc, &mut self.identify) - } - - fn addresses_of_peer(&mut self, peer_id: &PeerId) -> Vec { - self.peer_manager.addresses_of_peer(peer_id) - } - - fn inject_connected(&mut self, peer_id: &PeerId) { - delegate_to_behaviours!(self, inject_connected, peer_id); - } - - fn inject_disconnected(&mut self, peer_id: &PeerId) { - delegate_to_behaviours!(self, inject_disconnected, peer_id); - } - - fn inject_connection_established( - &mut self, - peer_id: &PeerId, - conn_id: &ConnectionId, - endpoint: &ConnectedPoint, - ) { - delegate_to_behaviours!( - self, - inject_connection_established, - peer_id, - conn_id, - endpoint - ); - } - - fn inject_connection_closed( - &mut self, - peer_id: &PeerId, - conn_id: &ConnectionId, - endpoint: &ConnectedPoint, - ) { - delegate_to_behaviours!(self, inject_connection_closed, peer_id, conn_id, endpoint); - } - - fn inject_addr_reach_failure( - &mut self, - peer_id: Option<&PeerId>, - addr: &Multiaddr, - error: &dyn std::error::Error, - ) { - delegate_to_behaviours!(self, inject_addr_reach_failure, peer_id, addr, error); - } - - fn inject_dial_failure(&mut self, peer_id: &PeerId) { - delegate_to_behaviours!(self, inject_dial_failure, peer_id); - } - - fn inject_new_listen_addr(&mut self, addr: &Multiaddr) { - delegate_to_behaviours!(self, inject_new_listen_addr, addr); - } - - fn inject_expired_listen_addr(&mut self, addr: &Multiaddr) { - delegate_to_behaviours!(self, inject_expired_listen_addr, addr); - } - - fn inject_new_external_addr(&mut self, addr: &Multiaddr) { - delegate_to_behaviours!(self, inject_new_external_addr, addr); - } - - fn inject_listener_error(&mut self, id: ListenerId, err: &(dyn std::error::Error + 'static)) { - delegate_to_behaviours!(self, inject_listener_error, id, err); - } - fn inject_listener_closed(&mut self, id: ListenerId, reason: Result<(), &std::io::Error>) { - delegate_to_behaviours!(self, inject_listener_closed, id, reason); - } - - fn inject_event( - &mut self, - peer_id: PeerId, - conn_id: ConnectionId, - event: ::OutEvent, - ) { - match event { - // Events comming from the handler, redirected to each behaviour - BehaviourHandlerOut::Delegate(delegate) => match *delegate { - DelegateOut::Gossipsub(ev) => self.gossipsub.inject_event(peer_id, conn_id, ev), - DelegateOut::RPC(ev) => self.eth2_rpc.inject_event(peer_id, conn_id, ev), - DelegateOut::Identify(ev) => self.identify.inject_event(peer_id, conn_id, *ev), - }, - /* Custom events sent BY the handler */ - BehaviourHandlerOut::Custom => { - // TODO: implement - } - } - } - - fn poll( - &mut self, - cx: &mut Context, - poll_params: &mut impl PollParameters, - ) -> Poll::InEvent, Self::OutEvent>> { - // TODO: move where it's less distracting - macro_rules! poll_behaviour { - /* $behaviour: The sub-behaviour being polled. - * $on_event_fn: Function to call if we get an event from the sub-behaviour. - * $notify_handler_event_closure: Closure mapping the received event type to - * the one that the handler should get. - */ - ($behaviour: ident, $on_event_fn: ident, $notify_handler_event_closure: expr) => { - loop { - // poll the sub-behaviour - match self.$behaviour.poll(cx, poll_params) { - Poll::Ready(action) => match action { - // call the designated function to handle the event from sub-behaviour - NBAction::GenerateEvent(event) => self.$on_event_fn(event), - NBAction::DialAddress { address } => { - return Poll::Ready(NBAction::DialAddress { address }) - } - NBAction::DialPeer { peer_id, condition } => { - return Poll::Ready(NBAction::DialPeer { peer_id, condition }) - } - NBAction::NotifyHandler { - peer_id, - handler, - event, - } => { - return Poll::Ready(NBAction::NotifyHandler { - peer_id, - handler, - // call the closure mapping the received event to the needed one - // in order to notify the handler - event: BehaviourHandlerIn::Delegate( - $notify_handler_event_closure(event), - ), - }); - } - NBAction::ReportObservedAddr { address } => { - return Poll::Ready(NBAction::ReportObservedAddr { address }) - } - }, - Poll::Pending => break, - } - } - }; - } - - poll_behaviour!(gossipsub, on_gossip_event, DelegateIn::Gossipsub); - poll_behaviour!(eth2_rpc, on_rpc_event, DelegateIn::RPC); - poll_behaviour!(identify, on_identify_event, DelegateIn::Identify); - - self.custom_poll(cx) - } -} - /// Implements the combined behaviour for the libp2p service. impl Behaviour { pub fn new( @@ -264,15 +106,27 @@ impl Behaviour { ), identify, peer_manager: PeerManager::new(local_key, net_conf, network_globals.clone(), log)?, - events: Vec::new(), - peers_to_dc: Vec::new(), + events: VecDeque::new(), + handler_events: VecDeque::new(), + peers_to_dc: VecDeque::new(), meta_data, network_globals, enr_fork_id, + waker: None, log: behaviour_log, }) } + /// Attempts to connect to a libp2p peer. + /// + /// This MUST be used over Swarm::dial() as this keeps track of the peer in the peer manager. + /// + /// All external dials, dial a multiaddr. This is currently unused but kept here in case any + /// part of lighthouse needs to connect to a peer_id in the future. + pub fn _dial(&mut self, peer_id: &PeerId) { + self.peer_manager.dial_peer(peer_id); + } + /// Returns the local ENR of the node. pub fn local_enr(&self) -> Enr { self.network_globals.local_enr() @@ -409,13 +263,18 @@ impl Behaviour { /* Peer management functions */ - /// Notify discovery that the peer has been banned. - // TODO: Remove this and integrate all disconnection/banning logic inside the peer manager. - pub fn peer_banned(&mut self, _peer_id: PeerId) {} + /// Report a peer's action. + pub fn report_peer(&mut self, peer_id: &PeerId, action: PeerAction) { + self.peer_manager.report_peer(peer_id, action) + } - /// Notify discovery that the peer has been unbanned. - // TODO: Remove this and integrate all disconnection/banning logic inside the peer manager. - pub fn peer_unbanned(&mut self, _peer_id: &PeerId) {} + /// Disconnects from a peer providing a reason. + /// + /// This will send a goodbye, disconnect and then ban the peer. + /// This is fatal for a peer, and should be used in unrecoverable circumstances. + pub fn goodbye_peer(&mut self, peer_id: &PeerId, reason: GoodbyeReason) { + self.peer_manager.goodbye_peer(peer_id, reason); + } /// Returns an iterator over all enr entries in the DHT. pub fn enr_entries(&mut self) -> Vec { @@ -531,40 +390,6 @@ impl Behaviour { &mut self.peer_manager } - /* Address in the new behaviour. Connections are now maintained at the swarm level. - /// Notifies the behaviour that a peer has connected. - pub fn notify_peer_connect(&mut self, peer_id: PeerId, endpoint: ConnectedPoint) { - match endpoint { - ConnectedPoint::Dialer { .. } => self.peer_manager.connect_outgoing(&peer_id), - ConnectedPoint::Listener { .. } => self.peer_manager.connect_ingoing(&peer_id), - }; - - // Find ENR info about a peer if possible. - if let Some(enr) = self.discovery.enr_of_peer(&peer_id) { - let bitfield = match enr.bitfield::() { - Ok(v) => v, - Err(e) => { - warn!(self.log, "Peer has invalid ENR bitfield"; - "peer_id" => format!("{}", peer_id), - "error" => format!("{:?}", e)); - return; - } - }; - - // use this as a baseline, until we get the actual meta-data - let meta_data = MetaData { - seq_number: 0, - attnets: bitfield, - }; - // TODO: Shift to the peer manager - self.network_globals - .peers - .write() - .add_metadata(&peer_id, meta_data); - } - } - */ - fn on_gossip_event(&mut self, event: GossipsubEvent) { match event { GossipsubEvent::Message(propagation_source, id, gs_msg) => { @@ -576,7 +401,7 @@ impl Behaviour { } Ok(msg) => { // if this message isn't a duplicate, notify the network - self.events.push(BehaviourEvent::PubsubMessage { + self.add_event(BehaviourEvent::PubsubMessage { id, source: propagation_source, topics: gs_msg.topics, @@ -586,8 +411,7 @@ impl Behaviour { } } GossipsubEvent::Subscribed { peer_id, topic } => { - self.events - .push(BehaviourEvent::PeerSubscribed(peer_id, topic)); + self.add_event(BehaviourEvent::PeerSubscribed(peer_id, topic)); } GossipsubEvent::Unsubscribed { .. } => {} } @@ -596,7 +420,7 @@ impl Behaviour { /// Queues the response to be sent upwards as long at it was requested outside the Behaviour. fn propagate_response(&mut self, id: RequestId, peer_id: PeerId, response: Response) { if !matches!(id, RequestId::Behaviour) { - self.events.push(BehaviourEvent::ResponseReceived { + self.add_event(BehaviourEvent::ResponseReceived { peer_id, id, response, @@ -606,7 +430,7 @@ impl Behaviour { /// Convenience function to propagate a request. fn propagate_request(&mut self, id: PeerRequestId, peer_id: PeerId, request: Request) { - self.events.push(BehaviourEvent::RequestReceived { + self.add_event(BehaviourEvent::RequestReceived { peer_id, id, request, @@ -639,8 +463,7 @@ impl Behaviour { self.peer_manager.handle_rpc_error(&peer_id, proto, &error); // inform failures of requests comming outside the behaviour if !matches!(id, RequestId::Behaviour) { - self.events - .push(BehaviourEvent::RPCFailed { peer_id, id, error }); + self.add_event(BehaviourEvent::RPCFailed { peer_id, id, error }); } } } @@ -664,11 +487,18 @@ impl Behaviour { // let the peer manager know this peer is in the process of disconnecting self.peer_manager._disconnecting_peer(&peer_id); // queue for disconnection without a goodbye message - debug!(self.log, "Received a Goodbye, queueing for disconnection"; - "peer_id" => peer_id.to_string()); - self.peers_to_dc.push(peer_id.clone()); - // TODO: do not propagate - self.propagate_request(peer_request_id, peer_id, Request::Goodbye(reason)); + debug!( + self.log, "Peer sent Goodbye"; + "peer_id" => peer_id.to_string(), + "reason" => reason.to_string(), + "client" => self.network_globals.client(&peer_id).to_string(), + ); + self.peers_to_dc.push_back(peer_id); + // NOTE: We currently do not inform the application that we are + // disconnecting here. + // The actual disconnection event will be relayed to the application. Ideally + // this time difference is short, but we may need to introduce a message to + // inform the application layer early. } /* Protocols propagated to the Network */ RPCRequest::Status(msg) => { @@ -724,10 +554,15 @@ impl Behaviour { &mut self, cx: &mut Context, ) -> Poll, BehaviourEvent>> { + // if there are any handler_events process them + if let Some(event) = self.handler_events.pop_front() { + return Poll::Ready(event); + } + // handle pending disconnections to perform - if !self.peers_to_dc.is_empty() { + if let Some(peer_id) = self.peers_to_dc.pop_front() { return Poll::Ready(NBAction::NotifyHandler { - peer_id: self.peers_to_dc.remove(0), + peer_id, handler: NotifyHandler::All, event: BehaviourHandlerIn::Shutdown(None), }); @@ -760,18 +595,18 @@ impl Behaviour { PeerManagerEvent::MetaData(peer_id) => { self.send_meta_data_request(peer_id); } - PeerManagerEvent::DisconnectPeer(peer_id) => { + PeerManagerEvent::DisconnectPeer(peer_id, reason) => { debug!(self.log, "PeerManager requested to disconnect a peer"; "peer_id" => peer_id.to_string()); // queue for disabling - self.peers_to_dc.push(peer_id.clone()); + self.peers_to_dc.push_back(peer_id.clone()); // send one goodbye return Poll::Ready(NBAction::NotifyHandler { peer_id, handler: NotifyHandler::Any, event: BehaviourHandlerIn::Shutdown(Some(( RequestId::Behaviour, - RPCRequest::Goodbye(GoodbyeReason::Fault), + RPCRequest::Goodbye(reason), ))), }); } @@ -781,8 +616,8 @@ impl Behaviour { } } - if !self.events.is_empty() { - return Poll::Ready(NBAction::GenerateEvent(self.events.remove(0))); + if let Some(event) = self.events.pop_front() { + return Poll::Ready(NBAction::GenerateEvent(event)); } Poll::Pending @@ -817,21 +652,244 @@ impl Behaviour { IdentifyEvent::Error { .. } => {} } } + + /// Adds an event to the queue waking the current thread to process it. + fn add_event(&mut self, event: BehaviourEvent) { + self.events.push_back(event); + if let Some(waker) = &self.waker { + waker.wake_by_ref(); + } + } +} + +/// Calls the given function with the given args on all sub behaviours. +macro_rules! delegate_to_behaviours { + ($self: ident, $fn: ident, $($arg: ident), *) => { + $self.gossipsub.$fn($($arg),*); + $self.eth2_rpc.$fn($($arg),*); + $self.identify.$fn($($arg),*); + }; +} + +impl NetworkBehaviour for Behaviour { + type ProtocolsHandler = BehaviourHandler; + type OutEvent = BehaviourEvent; + + fn new_handler(&mut self) -> Self::ProtocolsHandler { + BehaviourHandler::new(&mut self.gossipsub, &mut self.eth2_rpc, &mut self.identify) + } + + fn addresses_of_peer(&mut self, peer_id: &PeerId) -> Vec { + self.peer_manager.addresses_of_peer(peer_id) + } + + // This gets called every time a connection is closed. + fn inject_connection_closed( + &mut self, + peer_id: &PeerId, + conn_id: &ConnectionId, + endpoint: &ConnectedPoint, + ) { + delegate_to_behaviours!(self, inject_connection_closed, peer_id, conn_id, endpoint); + } + + // This gets called once there are no more active connections. + fn inject_disconnected(&mut self, peer_id: &PeerId) { + // Inform the peer manager. + self.peer_manager.notify_disconnect(&peer_id); + // Inform the application. + self.add_event(BehaviourEvent::PeerDisconnected(peer_id.clone())); + delegate_to_behaviours!(self, inject_disconnected, peer_id); + } + + // This gets called every time a connection is established. + fn inject_connection_established( + &mut self, + peer_id: &PeerId, + conn_id: &ConnectionId, + endpoint: &ConnectedPoint, + ) { + // If the peer is banned, send a goodbye and disconnect. + if self.peer_manager.is_banned(peer_id) { + self.peers_to_dc.push_back(peer_id.clone()); + // send a goodbye on all possible handlers for this peer + self.handler_events.push_back(NBAction::NotifyHandler { + peer_id: peer_id.clone(), + handler: NotifyHandler::All, + event: BehaviourHandlerIn::Shutdown(Some(( + RequestId::Behaviour, + RPCRequest::Goodbye(GoodbyeReason::Banned), + ))), + }); + return; + } + + // notify the peer manager of a successful connection + match endpoint { + ConnectedPoint::Listener { .. } => { + self.peer_manager.connect_ingoing(&peer_id); + self.add_event(BehaviourEvent::PeerConnected(peer_id.clone())); + debug!(self.log, "Connection established"; "peer_id" => peer_id.to_string(), "connection" => "Incoming"); + } + ConnectedPoint::Dialer { .. } => { + self.peer_manager.connect_outgoing(&peer_id); + self.add_event(BehaviourEvent::PeerDialed(peer_id.clone())); + debug!(self.log, "Connection established"; "peer_id" => peer_id.to_string(), "connection" => "Dialed"); + } + } + // report the event to the application + + delegate_to_behaviours!( + self, + inject_connection_established, + peer_id, + conn_id, + endpoint + ); + } + + // This gets called on the initial connection establishment. + fn inject_connected(&mut self, peer_id: &PeerId) { + // Drop any connection from a banned peer. The goodbye and disconnects are handled in + // `inject_connection_established()`, which gets called first. + if self.peer_manager.is_banned(peer_id) { + return; + } + delegate_to_behaviours!(self, inject_connected, peer_id); + } + + fn inject_addr_reach_failure( + &mut self, + peer_id: Option<&PeerId>, + addr: &Multiaddr, + error: &dyn std::error::Error, + ) { + delegate_to_behaviours!(self, inject_addr_reach_failure, peer_id, addr, error); + } + + fn inject_dial_failure(&mut self, peer_id: &PeerId) { + // Could not dial the peer, inform the peer manager. + self.peer_manager.notify_dial_failure(&peer_id); + delegate_to_behaviours!(self, inject_dial_failure, peer_id); + } + + fn inject_new_listen_addr(&mut self, addr: &Multiaddr) { + delegate_to_behaviours!(self, inject_new_listen_addr, addr); + } + + fn inject_expired_listen_addr(&mut self, addr: &Multiaddr) { + delegate_to_behaviours!(self, inject_expired_listen_addr, addr); + } + + fn inject_new_external_addr(&mut self, addr: &Multiaddr) { + delegate_to_behaviours!(self, inject_new_external_addr, addr); + } + + fn inject_listener_error(&mut self, id: ListenerId, err: &(dyn std::error::Error + 'static)) { + delegate_to_behaviours!(self, inject_listener_error, id, err); + } + fn inject_listener_closed(&mut self, id: ListenerId, reason: Result<(), &std::io::Error>) { + delegate_to_behaviours!(self, inject_listener_closed, id, reason); + } + + fn inject_event( + &mut self, + peer_id: PeerId, + conn_id: ConnectionId, + event: ::OutEvent, + ) { + match event { + // Events comming from the handler, redirected to each behaviour + BehaviourHandlerOut::Delegate(delegate) => match *delegate { + DelegateOut::Gossipsub(ev) => self.gossipsub.inject_event(peer_id, conn_id, ev), + DelegateOut::RPC(ev) => self.eth2_rpc.inject_event(peer_id, conn_id, ev), + DelegateOut::Identify(ev) => self.identify.inject_event(peer_id, conn_id, *ev), + }, + /* Custom events sent BY the handler */ + BehaviourHandlerOut::Custom => { + // TODO: implement + } + } + } + + fn poll( + &mut self, + cx: &mut Context, + poll_params: &mut impl PollParameters, + ) -> Poll::InEvent, Self::OutEvent>> { + // update the waker if needed + if let Some(waker) = &self.waker { + if waker.will_wake(cx.waker()) { + self.waker = Some(cx.waker().clone()); + } + } else { + self.waker = Some(cx.waker().clone()); + } + + // TODO: move where it's less distracting + macro_rules! poll_behaviour { + /* $behaviour: The sub-behaviour being polled. + * $on_event_fn: Function to call if we get an event from the sub-behaviour. + * $notify_handler_event_closure: Closure mapping the received event type to + * the one that the handler should get. + */ + ($behaviour: ident, $on_event_fn: ident, $notify_handler_event_closure: expr) => { + loop { + // poll the sub-behaviour + match self.$behaviour.poll(cx, poll_params) { + Poll::Ready(action) => match action { + // call the designated function to handle the event from sub-behaviour + NBAction::GenerateEvent(event) => self.$on_event_fn(event), + NBAction::DialAddress { address } => { + return Poll::Ready(NBAction::DialAddress { address }) + } + NBAction::DialPeer { peer_id, condition } => { + return Poll::Ready(NBAction::DialPeer { peer_id, condition }) + } + NBAction::NotifyHandler { + peer_id, + handler, + event, + } => { + return Poll::Ready(NBAction::NotifyHandler { + peer_id, + handler, + // call the closure mapping the received event to the needed one + // in order to notify the handler + event: BehaviourHandlerIn::Delegate( + $notify_handler_event_closure(event), + ), + }); + } + NBAction::ReportObservedAddr { address } => { + return Poll::Ready(NBAction::ReportObservedAddr { address }) + } + }, + Poll::Pending => break, + } + } + }; + } + + poll_behaviour!(gossipsub, on_gossip_event, DelegateIn::Gossipsub); + poll_behaviour!(eth2_rpc, on_rpc_event, DelegateIn::RPC); + poll_behaviour!(identify, on_identify_event, DelegateIn::Identify); + + self.custom_poll(cx) + } } /* Public API types */ /// The type of RPC requests the Behaviour informs it has received and allows for sending. /// -// NOTE: This is an application-level wrapper over the lower network leve requests that can be -// sent. The main difference is the absense of the Ping and Metadata protocols, which don't +// NOTE: This is an application-level wrapper over the lower network level requests that can be +// sent. The main difference is the absence of the Ping, Metadata and Goodbye protocols, which don't // leave the Behaviour. For all protocols managed by RPC see `RPCRequest`. #[derive(Debug, Clone, PartialEq)] pub enum Request { /// A Status message. Status(StatusMessage), - /// A Goobye message. - Goodbye(GoodbyeReason), /// A blocks by range request. BlocksByRange(BlocksByRangeRequest), /// A request blocks root request. @@ -843,7 +901,6 @@ impl std::convert::From for RPCRequest { match req { Request::BlocksByRoot(r) => RPCRequest::BlocksByRoot(r), Request::BlocksByRange(r) => RPCRequest::BlocksByRange(r), - Request::Goodbye(r) => RPCRequest::Goodbye(r), Request::Status(s) => RPCRequest::Status(s), } } @@ -887,6 +944,12 @@ pub type PeerRequestId = (ConnectionId, SubstreamId); /// The types of events than can be obtained from polling the behaviour. #[derive(Debug)] pub enum BehaviourEvent { + /// We have successfully dialed and connected to a peer. + PeerDialed(PeerId), + /// A peer has successfully dialed and connected to us. + PeerConnected(PeerId), + /// A peer has disconnected. + PeerDisconnected(PeerId), /// An RPC Request that was sent failed. RPCFailed { /// The id of the failed request. diff --git a/beacon_node/eth2_libp2p/src/discovery/mod.rs b/beacon_node/eth2_libp2p/src/discovery/mod.rs index 04af9b412..e6db78508 100644 --- a/beacon_node/eth2_libp2p/src/discovery/mod.rs +++ b/beacon_node/eth2_libp2p/src/discovery/mod.rs @@ -90,7 +90,7 @@ impl QueryType { pub fn min_ttl(&self) -> Option { match self { Self::FindPeers => None, - Self::Subnet { min_ttl, .. } => min_ttl.clone(), + Self::Subnet { min_ttl, .. } => *min_ttl, } } } @@ -197,7 +197,7 @@ impl Discovery { debug!( log, "Could not add peer to the local routing table"; - "error" => format!("{}", e) + "error" => e.to_string() ) }); } @@ -267,7 +267,7 @@ impl Discovery { debug!( self.log, "Could not add peer to the local routing table"; - "error" => format!("{}", e) + "error" => e.to_string() ) } } @@ -350,7 +350,7 @@ impl Discovery { let _ = self .discv5 - .enr_insert(ETH2_ENR_KEY.into(), enr_fork_id.as_ssz_bytes()) + .enr_insert(ETH2_ENR_KEY, enr_fork_id.as_ssz_bytes()) .map_err(|e| { warn!( self.log, @@ -452,11 +452,7 @@ impl Discovery { // Returns a boolean indicating if we are currently processing the maximum number of // concurrent queries or not. fn at_capacity(&self) -> bool { - if self.active_queries.len() >= MAX_CONCURRENT_QUERIES { - true - } else { - false - } + self.active_queries.len() >= MAX_CONCURRENT_QUERIES } /// Runs a discovery request for a given subnet_id if one already exists. @@ -526,7 +522,7 @@ impl Discovery { QueryType::Subnet { subnet_id, .. } => { // build the subnet predicate as a combination of the eth2_fork_predicate and the // subnet predicate - let subnet_predicate = subnet_predicate::(subnet_id.clone(), &self.log); + let subnet_predicate = subnet_predicate::(*subnet_id, &self.log); Box::new(move |enr: &Enr| eth2_fork_predicate(enr) && subnet_predicate(enr)) } }; @@ -645,6 +641,8 @@ impl Discovery { // to disk. let enr = self.discv5.local_enr(); enr::save_enr_to_disk(Path::new(&self.enr_dir), &enr, &self.log); + // update network globals + *self.network_globals.local_enr.write() = enr; return Poll::Ready(DiscoveryEvent::SocketUpdated(socket)); } _ => {} // Ignore all other discv5 server events diff --git a/beacon_node/eth2_libp2p/src/lib.rs b/beacon_node/eth2_libp2p/src/lib.rs index f416e0f06..f5c9e75a2 100644 --- a/beacon_node/eth2_libp2p/src/lib.rs +++ b/beacon_node/eth2_libp2p/src/lib.rs @@ -23,5 +23,7 @@ pub use libp2p::gossipsub::{MessageId, Topic, TopicHash}; pub use libp2p::{core::ConnectedPoint, PeerId, Swarm}; pub use libp2p::{multiaddr, Multiaddr}; pub use metrics::scrape_discovery_metrics; -pub use peer_manager::{client::Client, PeerDB, PeerInfo, PeerSyncStatus, SyncInfo}; +pub use peer_manager::{ + client::Client, score::PeerAction, PeerDB, PeerInfo, PeerSyncStatus, SyncInfo, +}; pub use service::{Libp2pEvent, Service, NETWORK_KEY_FILENAME}; diff --git a/beacon_node/eth2_libp2p/src/peer_manager/client.rs b/beacon_node/eth2_libp2p/src/peer_manager/client.rs index 3e8015cd0..a31fee493 100644 --- a/beacon_node/eth2_libp2p/src/peer_manager/client.rs +++ b/beacon_node/eth2_libp2p/src/peer_manager/client.rs @@ -131,6 +131,20 @@ fn client_from_agent_version(agent_version: &str) -> (ClientKind, String, String let unknown = String::from("unknown"); (kind, unknown.clone(), unknown) } + Some("Prysm") => { + let kind = ClientKind::Prysm; + let mut version = String::from("unknown"); + let mut os_version = version.clone(); + if agent_split.next().is_some() { + if let Some(agent_version) = agent_split.next() { + version = agent_version.into(); + if let Some(agent_os_version) = agent_split.next() { + os_version = agent_os_version.into(); + } + } + } + (kind, version, os_version) + } Some("nim-libp2p") => { let kind = ClientKind::Nimbus; let mut version = String::from("unknown"); diff --git a/beacon_node/eth2_libp2p/src/peer_manager/mod.rs b/beacon_node/eth2_libp2p/src/peer_manager/mod.rs index c384bfb31..bc3a5a1fc 100644 --- a/beacon_node/eth2_libp2p/src/peer_manager/mod.rs +++ b/beacon_node/eth2_libp2p/src/peer_manager/mod.rs @@ -2,7 +2,7 @@ pub use self::peerdb::*; use crate::discovery::{Discovery, DiscoveryEvent}; -use crate::rpc::{MetaData, Protocol, RPCError, RPCResponseErrorCode}; +use crate::rpc::{GoodbyeReason, MetaData, Protocol, RPCError, RPCResponseErrorCode}; use crate::{error, metrics}; use crate::{Enr, EnrExt, NetworkConfig, NetworkGlobals, PeerId}; use futures::prelude::*; @@ -10,7 +10,7 @@ use futures::Stream; use hashset_delay::HashSetDelay; use libp2p::core::multiaddr::Protocol as MProtocol; use libp2p::identify::IdentifyInfo; -use slog::{crit, debug, error}; +use slog::{crit, debug, error, warn}; use smallvec::SmallVec; use std::{ net::SocketAddr, @@ -27,12 +27,11 @@ pub mod client; mod peer_info; mod peer_sync_status; mod peerdb; +pub(crate) mod score; pub use peer_info::{PeerConnectionStatus::*, PeerInfo}; pub use peer_sync_status::{PeerSyncStatus, SyncInfo}; -/// The minimum reputation before a peer is disconnected. -// Most likely this needs tweaking. -const _MIN_REP_BEFORE_BAN: Rep = 10; +use score::{PeerAction, ScoreState}; /// The time in seconds between re-status's peers. const STATUS_INTERVAL: u64 = 300; /// The time in seconds between PING events. We do not send a ping if the other peer as PING'd us within @@ -63,44 +62,6 @@ pub struct PeerManager { log: slog::Logger, } -/// A collection of actions a peer can perform which will adjust its reputation. -/// Each variant has an associated reputation change. -// To easily assess the behaviour of reputation changes the number of variants should stay low, and -// somewhat generic. -pub enum PeerAction { - /// We should not communicate more with this peer. - /// This action will cause the peer to get banned. - Fatal, - /// An error occurred with this peer but it is not necessarily malicious. - /// We have high tolerance for this actions: several occurrences are needed for a peer to get - /// kicked. - /// NOTE: ~15 occurrences will get the peer banned - HighToleranceError, - /// An error occurred with this peer but it is not necessarily malicious. - /// We have high tolerance for this actions: several occurrences are needed for a peer to get - /// kicked. - /// NOTE: ~10 occurrences will get the peer banned - MidToleranceError, - /// This peer's action is not malicious but will not be tolerated. A few occurrences will cause - /// the peer to get kicked. - /// NOTE: ~5 occurrences will get the peer banned - LowToleranceError, - /// Received an expected message. - _ValidMessage, -} - -impl PeerAction { - fn rep_change(&self) -> RepChange { - match self { - PeerAction::Fatal => RepChange::worst(), - PeerAction::LowToleranceError => RepChange::bad(60), - PeerAction::MidToleranceError => RepChange::bad(25), - PeerAction::HighToleranceError => RepChange::bad(15), - PeerAction::_ValidMessage => RepChange::good(20), - } - } -} - /// The events that the `PeerManager` outputs (requests). pub enum PeerManagerEvent { /// Dial a PeerId. @@ -114,7 +75,7 @@ pub enum PeerManagerEvent { /// Request METADATA from a peer. MetaData(PeerId), /// The peer should be disconnected. - DisconnectPeer(PeerId), + DisconnectPeer(PeerId, GoodbyeReason), } impl PeerManager { @@ -147,6 +108,89 @@ impl PeerManager { /* Public accessible functions */ + /// Attempts to connect to a peer. + /// + /// Returns true if the peer was accepted into the database. + pub fn dial_peer(&mut self, peer_id: &PeerId) -> bool { + self.events.push(PeerManagerEvent::Dial(peer_id.clone())); + self.connect_peer(peer_id, ConnectingType::Dialing) + } + + /// The application layer wants to disconnect from a peer for a particular reason. + /// + /// All instant disconnections are fatal and we ban the associated peer. + /// + /// This will send a goodbye and disconnect the peer if it is connected or dialing. + pub fn goodbye_peer(&mut self, peer_id: &PeerId, reason: GoodbyeReason) { + // get the peer info + if let Some(info) = self.network_globals.peers.write().peer_info_mut(peer_id) { + debug!(self.log, "Sending goodbye to peer"; "peer_id" => peer_id.to_string(), "reason" => reason.to_string(), "score" => info.score.to_string()); + // Goodbye's are fatal + info.score.apply_peer_action(PeerAction::Fatal); + if info.connection_status.is_connected_or_dialing() { + self.events + .push(PeerManagerEvent::DisconnectPeer(peer_id.clone(), reason)); + } + } + } + + /// Reports a peer for some action. + /// + /// If the peer doesn't exist, log a warning and insert defaults. + pub fn report_peer(&mut self, peer_id: &PeerId, action: PeerAction) { + // TODO: Remove duplicate code - This is duplicated in the update_peer_scores() + // function. + + // Variables to update the PeerDb if required. + let mut ban_peer = None; + let mut unban_peer = None; + + if let Some(info) = self.network_globals.peers.write().peer_info_mut(peer_id) { + let previous_state = info.score.state(); + info.score.apply_peer_action(action); + if previous_state != info.score.state() { + match info.score.state() { + ScoreState::Ban => { + debug!(self.log, "Peer has been banned"; "peer_id" => peer_id.to_string(), "score" => info.score.to_string()); + ban_peer = Some(peer_id.clone()); + if info.connection_status.is_connected_or_dialing() { + self.events.push(PeerManagerEvent::DisconnectPeer( + peer_id.clone(), + GoodbyeReason::BadScore, + )); + } + } + ScoreState::Disconnect => { + debug!(self.log, "Peer transitioned to disconnect state"; "peer_id" => peer_id.to_string(), "score" => info.score.to_string(), "past_state" => previous_state.to_string()); + // disconnect the peer if it's currently connected or dialing + unban_peer = Some(peer_id.clone()); + if info.connection_status.is_connected_or_dialing() { + self.events.push(PeerManagerEvent::DisconnectPeer( + peer_id.clone(), + GoodbyeReason::BadScore, + )); + } + // TODO: Update the peer manager to inform that the peer is disconnecting. + } + ScoreState::Healthy => { + debug!(self.log, "Peer transitioned to healthy state"; "peer_id" => peer_id.to_string(), "score" => info.score.to_string(), "past_state" => previous_state.to_string()); + // unban the peer if it was previously banned. + unban_peer = Some(peer_id.clone()); + } + } + } + } + + // Update the PeerDB state. + if let Some(peer_id) = ban_peer.take() { + self.network_globals.peers.write().ban(&peer_id); + } else { + if let Some(peer_id) = unban_peer.take() { + self.network_globals.peers.write().unban(&peer_id); + } + } + } + /* Discovery Requests */ /// Provides a reference to the underlying discovery service. @@ -178,9 +222,12 @@ impl PeerManager { self.status_peers.insert(peer_id.clone()); } + /* Notifications from the Swarm */ + /// Updates the state of the peer as disconnected. + /// + /// This is also called when dialing a peer fails. pub fn notify_disconnect(&mut self, peer_id: &PeerId) { - //self.update_reputations(); self.network_globals.peers.write().disconnect(peer_id); // remove the ping and status timer for the peer @@ -193,6 +240,17 @@ impl PeerManager { ); } + /// A dial attempt has failed. + /// + /// NOTE: It can be the case that we are dialing a peer and during the dialing process the peer + /// connects and the dial attempt later fails. To handle this, we only update the peer_db if + /// the peer is not already connected. + pub fn notify_dial_failure(&mut self, peer_id: &PeerId) { + if !self.network_globals.peers.read().is_connected(peer_id) { + self.notify_disconnect(peer_id); + } + } + /// Sets a peer as connected as long as their reputation allows it /// Informs if the peer was accepted pub fn connect_ingoing(&mut self, peer_id: &PeerId) -> bool { @@ -205,28 +263,17 @@ impl PeerManager { self.connect_peer(peer_id, ConnectingType::OutgoingConnected) } - /// Updates the database informing that a peer is being dialed. - pub fn dialing_peer(&mut self, peer_id: &PeerId) -> bool { - self.connect_peer(peer_id, ConnectingType::Dialing) - } - /// Updates the database informing that a peer is being disconnected. pub fn _disconnecting_peer(&mut self, _peer_id: &PeerId) -> bool { // TODO: implement true } - /// Reports a peer for some action. + /// Reports if a peer is banned or not. /// - /// If the peer doesn't exist, log a warning and insert defaults. - pub fn report_peer(&mut self, peer_id: &PeerId, action: PeerAction) { - //TODO: Check these. There are double disconnects for example - // self.update_reputations(); - self.network_globals - .peers - .write() - .add_reputation(peer_id, action.rep_change()); - // self.update_reputations(); + /// This is used to determine if we should accept incoming connections. + pub fn is_banned(&self, peer_id: &PeerId) -> bool { + self.network_globals.peers.read().is_banned(peer_id) } /// Updates `PeerInfo` with `identify` information. @@ -239,9 +286,12 @@ impl PeerManager { } } + /// An error has occured in the RPC. + /// + /// This adjusts a peer's score based on the error. pub fn handle_rpc_error(&mut self, peer_id: &PeerId, protocol: Protocol, err: &RPCError) { let client = self.network_globals.client(peer_id); - debug!(self.log, "RPCError"; "protocol" => protocol.to_string(), "err" => err.to_string(), "client" => client.to_string()); + warn!(self.log, "RPC Error"; "protocol" => protocol.to_string(), "err" => err.to_string(), "client" => client.to_string()); // Map this error to a `PeerAction` (if any) let peer_action = match err { @@ -423,6 +473,8 @@ impl PeerManager { /// multiaddr here, however this could relate to duplicate PeerId's etc. If the lookup /// proves resource constraining, we should switch to multiaddr dialling here. fn peers_discovered(&mut self, peers: &[Enr], min_ttl: Option) { + let mut to_dial_peers = Vec::new(); + for enr in peers { let peer_id = enr.peer_id(); @@ -433,9 +485,8 @@ impl PeerManager { .peers .read() .is_connected_or_dialing(&peer_id) - && !self.network_globals.peers.read().peer_banned(&peer_id) + && !self.network_globals.peers.read().is_banned(&peer_id) { - debug!(self.log, "Dialing discovered peer"; "peer_id"=> peer_id.to_string()); // TODO: Update output // This should be updated with the peer dialing. In fact created once the peer is // dialed @@ -445,9 +496,13 @@ impl PeerManager { .write() .update_min_ttl(&peer_id, min_ttl); } - self.events.push(PeerManagerEvent::Dial(peer_id)); + to_dial_peers.push(peer_id); } } + for peer_id in to_dial_peers { + debug!(self.log, "Dialing discovered peer"; "peer_id"=> peer_id.to_string()); + self.dial_peer(&peer_id); + } } /// Registers a peer as connected. The `ingoing` parameter determines if the peer is being @@ -465,9 +520,7 @@ impl PeerManager { let mut peerdb = self.network_globals.peers.write(); if peerdb.connection_status(peer_id).map(|c| c.is_banned()) == Some(true) { // don't connect if the peer is banned - // TODO: Handle this case. If peer is banned this shouldn't be reached. It will put - // our connection/disconnection out of sync with libp2p - // return false; + slog::crit!(self.log, "Connection has been allowed to a banned peer"; "peer_id" => peer_id.to_string()); } match connection { @@ -491,36 +544,22 @@ impl PeerManager { true } - /// Notifies the peer manager that this peer is being dialed. - pub fn _dialing_peer(&mut self, peer_id: &PeerId) { - self.network_globals.peers.write().dialing_peer(peer_id); - } - - /// Updates the reputation of known peers according to their connection + /// Updates the scores of known peers according to their connection /// status and the time that has passed. - /// - /// **Disconnected peers** get a 1rep hit every hour they stay disconnected. - /// **Banned peers** get a 1rep gain for every hour to slowly allow them back again. - /// - /// A banned(disconnected) peer that gets its rep above(below) MIN_REP_BEFORE_BAN is - /// now considered a disconnected(banned) peer. - // TODO: Implement when reputation is added. - fn _update_reputations(&mut self) { - /* - // avoid locking the peerdb too often - // TODO: call this on a timer - - let now = Instant::now(); - - // Check for peers that get banned, unbanned and that should be disconnected - let mut ban_queue = Vec::new(); - let mut unban_queue = Vec::new(); - + /// NOTE: This is experimental and will likely be adjusted + fn update_peer_scores(&mut self) { /* Check how long have peers been in this state and update their reputations if needed */ let mut pdb = self.network_globals.peers.write(); - for (id, info) in pdb._peers_mut() { - // Update reputations + let mut to_ban_peers = Vec::new(); + let mut to_unban_peers = Vec::new(); + + for (peer_id, info) in pdb.peers_mut() { + let previous_state = info.score.state(); + // Update scores + info.score.update(); + + /* TODO: Implement logic about connection lifetimes match info.connection_status { Connected { .. } => { // Connected peers gain reputation by sending useful messages @@ -570,21 +609,49 @@ impl PeerManager { // This peer gets unbanned unban_queue.push(id.clone()); } + */ + + // handle score transitions + if previous_state != info.score.state() { + match info.score.state() { + ScoreState::Ban => { + debug!(self.log, "Peer has been banned"; "peer_id" => peer_id.to_string(), "score" => info.score.to_string()); + to_ban_peers.push(peer_id.clone()); + if info.connection_status.is_connected_or_dialing() { + self.events.push(PeerManagerEvent::DisconnectPeer( + peer_id.clone(), + GoodbyeReason::BadScore, + )); + } + } + ScoreState::Disconnect => { + debug!(self.log, "Peer transitioned to disconnect state"; "peer_id" => peer_id.to_string(), "score" => info.score.to_string(), "past_state" => previous_state.to_string()); + // disconnect the peer if it's currently connected or dialing + to_unban_peers.push(peer_id.clone()); + if info.connection_status.is_connected_or_dialing() { + self.events.push(PeerManagerEvent::DisconnectPeer( + peer_id.clone(), + GoodbyeReason::BadScore, + )); + } + // TODO: Update peer manager to report that it's disconnecting. + } + ScoreState::Healthy => { + debug!(self.log, "Peer transitioned to healthy state"; "peer_id" => peer_id.to_string(), "score" => info.score.to_string(), "past_state" => previous_state.to_string()); + // unban the peer if it was previously banned. + to_unban_peers.push(peer_id.clone()); + } + } + } } - - for id in ban_queue { - pdb.ban(&id); - - self.events - .push(PeerManagerEvent::DisconnectPeer(id.clone())); + // process banning peers + for peer_id in to_ban_peers { + pdb.ban(&peer_id); } - - for id in unban_queue { - pdb.disconnect(&id); + // process unbanning peers + for peer_id in to_unban_peers { + pdb.unban(&peer_id); } - - self._last_updated = Instant::now(); - */ } /// The Peer manager's heartbeat maintains the peer count and maintains peer reputations. @@ -605,7 +672,8 @@ impl PeerManager { // TODO: If we have too many peers, remove peers that are not required for subnet // validation. - // TODO: Perform peer reputation maintenance here + // Updates peer's scores. + self.update_peer_scores(); } } @@ -636,7 +704,7 @@ impl Stream for PeerManager { self.events.push(PeerManagerEvent::Ping(peer_id)); } Poll::Ready(Some(Err(e))) => { - error!(self.log, "Failed to check for peers to ping"; "error" => format!("{}",e)) + error!(self.log, "Failed to check for peers to ping"; "error" => e.to_string()) } Poll::Ready(None) | Poll::Pending => break, } @@ -649,7 +717,7 @@ impl Stream for PeerManager { self.events.push(PeerManagerEvent::Status(peer_id)) } Poll::Ready(Some(Err(e))) => { - error!(self.log, "Failed to check for peers to ping"; "error" => format!("{}",e)) + error!(self.log, "Failed to check for peers to ping"; "error" => e.to_string()) } Poll::Ready(None) | Poll::Pending => break, } diff --git a/beacon_node/eth2_libp2p/src/peer_manager/peer_info.rs b/beacon_node/eth2_libp2p/src/peer_manager/peer_info.rs index 825162662..f7635a4d3 100644 --- a/beacon_node/eth2_libp2p/src/peer_manager/peer_info.rs +++ b/beacon_node/eth2_libp2p/src/peer_manager/peer_info.rs @@ -1,5 +1,5 @@ use super::client::Client; -use super::peerdb::{Rep, DEFAULT_REPUTATION}; +use super::score::Score; use super::PeerSyncStatus; use crate::rpc::MetaData; use crate::Multiaddr; @@ -18,7 +18,7 @@ pub struct PeerInfo { /// The connection status of the peer _status: PeerStatus, /// The peers reputation - pub reputation: Rep, + pub score: Score, /// Client managing this peer pub client: Client, /// Connection status of this peer @@ -41,7 +41,7 @@ impl Default for PeerInfo { fn default() -> PeerInfo { PeerInfo { _status: Default::default(), - reputation: DEFAULT_REPUTATION, + score: Score::default(), client: Client::default(), connection_status: Default::default(), listening_addresses: vec![], @@ -146,7 +146,7 @@ impl Default for PeerConnectionStatus { } impl PeerConnectionStatus { - /// Checks if the status is connected + /// Checks if the status is connected. pub fn is_connected(&self) -> bool { match self { PeerConnectionStatus::Connected { .. } => true, @@ -154,7 +154,7 @@ impl PeerConnectionStatus { } } - /// Checks if the status is connected + /// Checks if the status is connected. pub fn is_dialing(&self) -> bool { match self { PeerConnectionStatus::Dialing { .. } => true, @@ -162,7 +162,12 @@ impl PeerConnectionStatus { } } - /// Checks if the status is banned + /// The peer is either connected or in the process of being dialed. + pub fn is_connected_or_dialing(&self) -> bool { + self.is_connected() || self.is_dialing() + } + + /// Checks if the status is banned. pub fn is_banned(&self) -> bool { match self { PeerConnectionStatus::Banned { .. } => true, @@ -170,7 +175,7 @@ impl PeerConnectionStatus { } } - /// Checks if the status is disconnected + /// Checks if the status is disconnected. pub fn is_disconnected(&self) -> bool { match self { Disconnected { .. } => true, @@ -214,6 +219,13 @@ impl PeerConnectionStatus { }; } + /// The score system has unbanned the peer. Update the connection status + pub fn unban(&mut self) { + if let PeerConnectionStatus::Banned { since } = self { + *self = PeerConnectionStatus::Disconnected { since: *since } + } + } + pub fn connections(&self) -> (u8, u8) { match self { Connected { n_in, n_out } => (*n_in, *n_out), diff --git a/beacon_node/eth2_libp2p/src/peer_manager/peerdb.rs b/beacon_node/eth2_libp2p/src/peer_manager/peerdb.rs index 063d36db9..f87ddbc89 100644 --- a/beacon_node/eth2_libp2p/src/peer_manager/peerdb.rs +++ b/beacon_node/eth2_libp2p/src/peer_manager/peerdb.rs @@ -1,74 +1,47 @@ use super::peer_info::{PeerConnectionStatus, PeerInfo}; use super::peer_sync_status::PeerSyncStatus; +use super::score::Score; use crate::rpc::methods::MetaData; use crate::PeerId; use slog::{crit, debug, trace, warn}; -use std::collections::{hash_map::Entry, HashMap}; +use std::collections::HashMap; use std::time::Instant; use types::{EthSpec, SubnetId}; -/// A peer's reputation (perceived potential usefulness) -pub type Rep = u8; - -/// Reputation change (positive or negative) -pub struct RepChange { - is_good: bool, - diff: Rep, -} - -/// Max number of disconnected nodes to remember -const MAX_DC_PEERS: usize = 30; - -/// The default starting reputation for an unknown peer. -pub const DEFAULT_REPUTATION: Rep = 50; +/// Max number of disconnected nodes to remember. +const MAX_DC_PEERS: usize = 100; +/// The maximum number of banned nodes to remember. +const MAX_BANNED_PEERS: usize = 300; /// Storage of known peers, their reputation and information pub struct PeerDB { /// The collection of known connected peers, their status and reputation peers: HashMap>, - /// Tracking of number of disconnected nodes - n_dc: usize, + /// The number of disconnected nodes in the database. + disconnected_peers: usize, + /// The number of banned peers in the database. + banned_peers: usize, /// PeerDB's logger log: slog::Logger, } -impl RepChange { - pub fn good(diff: Rep) -> Self { - RepChange { - is_good: true, - diff, - } - } - pub fn bad(diff: Rep) -> Self { - RepChange { - is_good: false, - diff, - } - } - pub const fn worst() -> Self { - RepChange { - is_good: false, - diff: Rep::max_value(), - } - } -} - impl PeerDB { pub fn new(log: &slog::Logger) -> Self { Self { log: log.clone(), - n_dc: 0, + disconnected_peers: 0, + banned_peers: 0, peers: HashMap::new(), } } /* Getters */ - /// Gives the reputation of a peer, or DEFAULT_REPUTATION if it is unknown. - pub fn reputation(&self, peer_id: &PeerId) -> Rep { + /// Gives the score of a peer, or default score if it is unknown. + pub fn score(&self, peer_id: &PeerId) -> Score { self.peers .get(peer_id) - .map_or(DEFAULT_REPUTATION, |info| info.reputation) + .map_or(Score::default(), |info| info.score) } /// Returns an iterator over all peers in the db. @@ -77,7 +50,7 @@ impl PeerDB { } /// Returns an iterator over all peers in the db. - pub(super) fn _peers_mut(&mut self) -> impl Iterator)> { + pub(super) fn peers_mut(&mut self) -> impl Iterator)> { self.peers.iter_mut() } @@ -97,8 +70,25 @@ impl PeerDB { self.peers.get_mut(peer_id) } + /// Returns if the peer is already connected. + pub fn is_connected(&self, peer_id: &PeerId) -> bool { + if let Some(PeerConnectionStatus::Connected { .. }) = self.connection_status(peer_id) { + true + } else { + false + } + } + + /// If we are connected or currently dialing the peer returns true. + pub fn is_connected_or_dialing(&self, peer_id: &PeerId) -> bool { + match self.connection_status(peer_id) { + Some(PeerConnectionStatus::Connected { .. }) + | Some(PeerConnectionStatus::Dialing { .. }) => true, + _ => false, + } + } /// Returns true if the peer is synced at least to our current head. - pub fn peer_synced(&self, peer_id: &PeerId) -> bool { + pub fn is_synced(&self, peer_id: &PeerId) -> bool { match self.peers.get(peer_id).map(|info| &info.sync_status) { Some(PeerSyncStatus::Synced { .. }) => true, Some(_) => false, @@ -107,7 +97,7 @@ impl PeerDB { } /// Returns true if the Peer is banned. - pub fn peer_banned(&self, peer_id: &PeerId) -> bool { + pub fn is_banned(&self, peer_id: &PeerId) -> bool { match self.peers.get(peer_id).map(|info| &info.connection_status) { Some(status) => status.is_banned(), None => false, @@ -179,7 +169,7 @@ impl PeerDB { } /// Returns a vector containing peers (their ids and info), sorted by - /// reputation from highest to lowest, and filtered using `is_status` + /// score from highest to lowest, and filtered using `is_status` pub fn best_peers_by_status(&self, is_status: F) -> Vec<(&PeerId, &PeerInfo)> where F: Fn(&PeerConnectionStatus) -> bool, @@ -189,8 +179,8 @@ impl PeerDB { .iter() .filter(|(_, info)| is_status(&info.connection_status)) .collect::>(); - by_status.sort_by_key(|(_, info)| Rep::max_value() - info.reputation); - by_status + by_status.sort_by_key(|(_, info)| info.score); + by_status.into_iter().rev().collect() } /// Returns the peer with highest reputation that satisfies `is_status` @@ -201,7 +191,7 @@ impl PeerDB { self.peers .iter() .filter(|(_, info)| is_status(&info.connection_status)) - .max_by_key(|(_, info)| info.reputation) + .max_by_key(|(_, info)| info.score) .map(|(id, _)| id) } @@ -211,24 +201,6 @@ impl PeerDB { .map(|info| info.connection_status.clone()) } - /// Returns if the peer is already connected. - pub fn is_connected(&self, peer_id: &PeerId) -> bool { - if let Some(PeerConnectionStatus::Connected { .. }) = self.connection_status(peer_id) { - true - } else { - false - } - } - - /// If we are connected or currently dialing the peer returns true. - pub fn is_connected_or_dialing(&self, peer_id: &PeerId) -> bool { - match self.connection_status(peer_id) { - Some(PeerConnectionStatus::Connected { .. }) - | Some(PeerConnectionStatus::Dialing { .. }) => true, - _ => false, - } - } - /* Setters */ /// A peer is being dialed. @@ -236,7 +208,10 @@ impl PeerDB { let info = self.peers.entry(peer_id.clone()).or_default(); if info.connection_status.is_disconnected() { - self.n_dc = self.n_dc.saturating_sub(1); + self.disconnected_peers = self.disconnected_peers.saturating_sub(1); + } + if info.connection_status.is_banned() { + self.banned_peers = self.banned_peers.saturating_sub(1); } info.connection_status = PeerConnectionStatus::Dialing { since: Instant::now(), @@ -284,7 +259,10 @@ impl PeerDB { let info = self.peers.entry(peer_id.clone()).or_default(); if info.connection_status.is_disconnected() { - self.n_dc = self.n_dc.saturating_sub(1); + self.disconnected_peers = self.disconnected_peers.saturating_sub(1); + } + if info.connection_status.is_banned() { + self.banned_peers = self.banned_peers.saturating_sub(1); } info.connection_status.connect_ingoing(); } @@ -294,7 +272,10 @@ impl PeerDB { let info = self.peers.entry(peer_id.clone()).or_default(); if info.connection_status.is_disconnected() { - self.n_dc = self.n_dc.saturating_sub(1); + self.disconnected_peers = self.disconnected_peers.saturating_sub(1); + } + if info.connection_status.is_banned() { + self.banned_peers = self.banned_peers.saturating_sub(1); } info.connection_status.connect_outgoing(); } @@ -309,40 +290,93 @@ impl PeerDB { }); if !info.connection_status.is_disconnected() && !info.connection_status.is_banned() { info.connection_status.disconnect(); - self.n_dc += 1; + self.disconnected_peers += 1; } self.shrink_to_fit(); } - /// Drops the peers with the lowest reputation so that the number of - /// disconnected peers is less than MAX_DC_PEERS - pub fn shrink_to_fit(&mut self) { - // for caution, but the difference should never be > 1 - while self.n_dc > MAX_DC_PEERS { - let to_drop = self - .peers - .iter() - .filter(|(_, info)| info.connection_status.is_disconnected()) - .min_by_key(|(_, info)| info.reputation) - .map(|(id, _)| id.clone()) - .unwrap(); // should be safe since n_dc > MAX_DC_PEERS > 0 - self.peers.remove(&to_drop); - self.n_dc = self.n_dc.saturating_sub(1); - } - } - - /// Sets a peer as banned + /// Marks a peer as banned. pub fn ban(&mut self, peer_id: &PeerId) { let log_ref = &self.log; let info = self.peers.entry(peer_id.clone()).or_insert_with(|| { warn!(log_ref, "Banning unknown peer"; - "peer_id" => peer_id.to_string()); + "peer_id" => peer_id.to_string()); PeerInfo::default() }); + if info.connection_status.is_disconnected() { - self.n_dc = self.n_dc.saturating_sub(1); + self.disconnected_peers = self.disconnected_peers.saturating_sub(1); + } + if !info.connection_status.is_banned() { + info.connection_status.ban(); + self.banned_peers += 1; + } + self.shrink_to_fit(); + } + + /// Unbans a peer. + pub fn unban(&mut self, peer_id: &PeerId) { + let log_ref = &self.log; + let info = self.peers.entry(peer_id.clone()).or_insert_with(|| { + warn!(log_ref, "UnBanning unknown peer"; + "peer_id" => peer_id.to_string()); + PeerInfo::default() + }); + + if info.connection_status.is_banned() { + info.connection_status.unban(); + self.banned_peers = self.banned_peers.saturating_sub(1); + } + self.shrink_to_fit(); + } + + /// Removes banned and disconnected peers from the DB if we have reached any of our limits. + /// Drops the peers with the lowest reputation so that the number of + /// disconnected peers is less than MAX_DC_PEERS + pub fn shrink_to_fit(&mut self) { + // Remove excess baned peers + while self.banned_peers > MAX_BANNED_PEERS { + if let Some(to_drop) = self + .peers + .iter() + .filter(|(_, info)| info.connection_status.is_banned()) + .min_by(|(_, info_a), (_, info_b)| { + info_a + .score + .partial_cmp(&info_b.score) + .unwrap_or(std::cmp::Ordering::Equal) + }) + .map(|(id, _)| id.clone()) + { + debug!(self.log, "Removing old banned peer"; "peer_id" => to_drop.to_string()); + self.peers.remove(&to_drop); + } + // If there is no minimum, this is a coding error. For safety we decrease + // the count to avoid a potential infinite loop. + self.banned_peers = self.banned_peers.saturating_sub(1); + } + + // Remove excess disconnected peers + while self.disconnected_peers > MAX_DC_PEERS { + if let Some(to_drop) = self + .peers + .iter() + .filter(|(_, info)| info.connection_status.is_disconnected()) + .min_by(|(_, info_a), (_, info_b)| { + info_a + .score + .partial_cmp(&info_b.score) + .unwrap_or(std::cmp::Ordering::Equal) + }) + .map(|(id, _)| id.clone()) + { + debug!(self.log, "Removing old disconnected peer"; "peer_id" => to_drop.to_string()); + self.peers.remove(&to_drop); + } + // If there is no minimum, this is a coding error. For safety we decrease + // the count to avoid a potential infinite loop. + self.disconnected_peers = self.disconnected_peers.saturating_sub(1); } - info.connection_status.ban(); } /// Add the meta data of a peer. @@ -354,16 +388,6 @@ impl PeerDB { } } - /// Sets the reputation of peer. - #[allow(dead_code)] - pub(super) fn set_reputation(&mut self, peer_id: &PeerId, rep: Rep) { - if let Some(peer_info) = self.peers.get_mut(peer_id) { - peer_info.reputation = rep; - } else { - crit!(self.log, "Tried to modify reputation for an unknown peer"; "peer_id" => peer_id.to_string()); - } - } - /// Sets the syncing status of a peer. pub fn set_sync_status(&mut self, peer_id: &PeerId, sync_status: PeerSyncStatus) { if let Some(peer_info) = self.peers.get_mut(peer_id) { @@ -372,26 +396,6 @@ impl PeerDB { crit!(self.log, "Tried to the sync status for an unknown peer"; "peer_id" => peer_id.to_string()); } } - - /// Adds to a peer's reputation by `change`. If the reputation exceeds Rep's - /// upper (lower) bounds, it stays at the maximum (minimum) value. - pub(super) fn add_reputation(&mut self, peer_id: &PeerId, change: RepChange) { - let log_ref = &self.log; - let info = match self.peers.entry(peer_id.clone()) { - Entry::Vacant(_) => { - warn!(log_ref, "Peer is unknown, no reputation change made"; - "peer_id" => peer_id.to_string()); - return; - } - Entry::Occupied(e) => e.into_mut(), - }; - - info.reputation = if change.is_good { - info.reputation.saturating_add(change.diff) - } else { - info.reputation.saturating_sub(change.diff) - }; - } } #[cfg(test)] @@ -413,6 +417,12 @@ mod tests { } } + fn add_score(db: &mut PeerDB, peer_id: &PeerId, score: f64) { + if let Some(info) = db.peer_info_mut(peer_id) { + info.score.add(score); + } + } + fn get_db() -> PeerDB { let log = build_log(slog::Level::Debug, false); PeerDB::new(&log) @@ -437,9 +447,9 @@ mod tests { // this is the only peer assert_eq!(pdb.peers().count(), 1); // the peer has the default reputation - assert_eq!(pdb.reputation(&random_peer), DEFAULT_REPUTATION); + assert_eq!(pdb.score(&random_peer).score(), Score::default().score()); // it should be connected, and therefore not counted as disconnected - assert_eq!(pdb.n_dc, 0); + assert_eq!(pdb.disconnected_peers, 0); assert!(peer_info.unwrap().connection_status.is_connected()); assert_eq!( peer_info.unwrap().connection_status.connections(), @@ -447,50 +457,6 @@ mod tests { ); } - #[test] - fn test_set_reputation() { - let mut pdb = get_db(); - let random_peer = PeerId::random(); - pdb.connect_ingoing(&random_peer); - - let mut rep = Rep::min_value(); - pdb.set_reputation(&random_peer, rep); - assert_eq!(pdb.reputation(&random_peer), rep); - - rep = Rep::max_value(); - pdb.set_reputation(&random_peer, rep); - assert_eq!(pdb.reputation(&random_peer), rep); - - rep = Rep::max_value() / 100; - pdb.set_reputation(&random_peer, rep); - assert_eq!(pdb.reputation(&random_peer), rep); - } - - #[test] - fn test_reputation_change() { - let mut pdb = get_db(); - - // 0 change does not change de reputation - let random_peer = PeerId::random(); - let change = RepChange::good(0); - pdb.connect_ingoing(&random_peer); - pdb.add_reputation(&random_peer, change); - assert_eq!(pdb.reputation(&random_peer), DEFAULT_REPUTATION); - - // overflowing change is capped - let random_peer = PeerId::random(); - let change = RepChange::worst(); - pdb.connect_ingoing(&random_peer); - pdb.add_reputation(&random_peer, change); - assert_eq!(pdb.reputation(&random_peer), Rep::min_value()); - - let random_peer = PeerId::random(); - let change = RepChange::good(Rep::max_value()); - pdb.connect_ingoing(&random_peer); - pdb.add_reputation(&random_peer, change); - assert_eq!(pdb.reputation(&random_peer), Rep::max_value()); - } - #[test] fn test_disconnected_are_bounded() { let mut pdb = get_db(); @@ -499,13 +465,30 @@ mod tests { let p = PeerId::random(); pdb.connect_ingoing(&p); } - assert_eq!(pdb.n_dc, 0); + assert_eq!(pdb.disconnected_peers, 0); for p in pdb.connected_peer_ids().cloned().collect::>() { pdb.disconnect(&p); } - assert_eq!(pdb.n_dc, MAX_DC_PEERS); + assert_eq!(pdb.disconnected_peers, MAX_DC_PEERS); + } + + #[test] + fn test_banned_are_bounded() { + let mut pdb = get_db(); + + for _ in 0..MAX_BANNED_PEERS + 1 { + let p = PeerId::random(); + pdb.connect_ingoing(&p); + } + assert_eq!(pdb.banned_peers, 0); + + for p in pdb.connected_peer_ids().cloned().collect::>() { + pdb.ban(&p); + } + + assert_eq!(pdb.banned_peers, MAX_BANNED_PEERS); } #[test] @@ -518,14 +501,16 @@ mod tests { pdb.connect_ingoing(&p0); pdb.connect_ingoing(&p1); pdb.connect_ingoing(&p2); - pdb.set_reputation(&p0, 70); - pdb.set_reputation(&p1, 100); - pdb.set_reputation(&p2, 50); + add_score(&mut pdb, &p0, 70.0); + add_score(&mut pdb, &p1, 100.0); + add_score(&mut pdb, &p2, 50.0); - let best_peers = pdb.best_peers_by_status(PeerConnectionStatus::is_connected); - assert!(vec![&p1, &p0, &p2] - .into_iter() - .eq(best_peers.into_iter().map(|p| p.0))); + let best_peers: Vec<&PeerId> = pdb + .best_peers_by_status(PeerConnectionStatus::is_connected) + .iter() + .map(|p| p.0) + .collect(); + assert_eq!(vec![&p1, &p0, &p2], best_peers); } #[test] @@ -538,15 +523,15 @@ mod tests { pdb.connect_ingoing(&p0); pdb.connect_ingoing(&p1); pdb.connect_ingoing(&p2); - pdb.set_reputation(&p0, 70); - pdb.set_reputation(&p1, 100); - pdb.set_reputation(&p2, 50); + add_score(&mut pdb, &p0, 70.0); + add_score(&mut pdb, &p1, 100.0); + add_score(&mut pdb, &p2, 50.0); let the_best = pdb.best_by_status(PeerConnectionStatus::is_connected); assert!(the_best.is_some()); // Consistency check let best_peers = pdb.best_peers_by_status(PeerConnectionStatus::is_connected); - assert_eq!(the_best, best_peers.into_iter().map(|p| p.0).next()); + assert_eq!(the_best, best_peers.iter().next().map(|p| p.0)); } #[test] @@ -556,26 +541,86 @@ mod tests { let random_peer = PeerId::random(); pdb.connect_ingoing(&random_peer); - assert_eq!(pdb.n_dc, pdb.disconnected_peers().count()); + assert_eq!(pdb.disconnected_peers, pdb.disconnected_peers().count()); + dbg!("1"); pdb.connect_ingoing(&random_peer); - assert_eq!(pdb.n_dc, pdb.disconnected_peers().count()); + assert_eq!(pdb.disconnected_peers, pdb.disconnected_peers().count()); + dbg!("1"); pdb.disconnect(&random_peer); - assert_eq!(pdb.n_dc, pdb.disconnected_peers().count()); + assert_eq!(pdb.disconnected_peers, pdb.disconnected_peers().count()); + dbg!("1"); pdb.connect_outgoing(&random_peer); - assert_eq!(pdb.n_dc, pdb.disconnected_peers().count()); + assert_eq!(pdb.disconnected_peers, pdb.disconnected_peers().count()); + dbg!("1"); pdb.disconnect(&random_peer); - assert_eq!(pdb.n_dc, pdb.disconnected_peers().count()); + assert_eq!(pdb.disconnected_peers, pdb.disconnected_peers().count()); + dbg!("1"); pdb.ban(&random_peer); - assert_eq!(pdb.n_dc, pdb.disconnected_peers().count()); + assert_eq!(pdb.disconnected_peers, pdb.disconnected_peers().count()); + dbg!("1"); pdb.disconnect(&random_peer); - assert_eq!(pdb.n_dc, pdb.disconnected_peers().count()); + assert_eq!(pdb.disconnected_peers, pdb.disconnected_peers().count()); + dbg!("1"); pdb.disconnect(&random_peer); - assert_eq!(pdb.n_dc, pdb.disconnected_peers().count()); + assert_eq!(pdb.disconnected_peers, pdb.disconnected_peers().count()); + dbg!("1"); pdb.disconnect(&random_peer); - assert_eq!(pdb.n_dc, pdb.disconnected_peers().count()); + assert_eq!(pdb.disconnected_peers, pdb.disconnected_peers().count()); + dbg!("1"); + } + + #[test] + fn test_disconnected_ban_consistency() { + let mut pdb = get_db(); + + let random_peer = PeerId::random(); + let random_peer1 = PeerId::random(); + let random_peer2 = PeerId::random(); + let random_peer3 = PeerId::random(); + + pdb.connect_ingoing(&random_peer); + pdb.connect_ingoing(&random_peer1); + pdb.connect_ingoing(&random_peer2); + pdb.connect_ingoing(&random_peer3); + assert_eq!(pdb.disconnected_peers, pdb.disconnected_peers().count()); + assert_eq!(pdb.banned_peers, pdb.banned_peers().count()); + + pdb.connect_ingoing(&random_peer); + pdb.disconnect(&random_peer1); + pdb.ban(&random_peer2); + pdb.connect_ingoing(&random_peer3); + assert_eq!(pdb.disconnected_peers, pdb.disconnected_peers().count()); + assert_eq!(pdb.banned_peers, pdb.banned_peers().count()); + pdb.ban(&random_peer1); + assert_eq!(pdb.disconnected_peers, pdb.disconnected_peers().count()); + assert_eq!(pdb.banned_peers, pdb.banned_peers().count()); + + pdb.connect_outgoing(&random_peer2); + assert_eq!(pdb.disconnected_peers, pdb.disconnected_peers().count()); + assert_eq!(pdb.banned_peers, pdb.banned_peers().count()); + pdb.ban(&random_peer3); + assert_eq!(pdb.disconnected_peers, pdb.disconnected_peers().count()); + assert_eq!(pdb.banned_peers, pdb.banned_peers().count()); + + pdb.ban(&random_peer3); + pdb.connect_ingoing(&random_peer1); + pdb.disconnect(&random_peer2); + pdb.ban(&random_peer3); + pdb.connect_ingoing(&random_peer); + assert_eq!(pdb.disconnected_peers, pdb.disconnected_peers().count()); + assert_eq!(pdb.banned_peers, pdb.banned_peers().count()); + pdb.disconnect(&random_peer); + assert_eq!(pdb.disconnected_peers, pdb.disconnected_peers().count()); + assert_eq!(pdb.banned_peers, pdb.banned_peers().count()); + + pdb.disconnect(&random_peer); + assert_eq!(pdb.disconnected_peers, pdb.disconnected_peers().count()); + assert_eq!(pdb.banned_peers, pdb.banned_peers().count()); + pdb.ban(&random_peer); + assert_eq!(pdb.disconnected_peers, pdb.disconnected_peers().count()); } } diff --git a/beacon_node/eth2_libp2p/src/peer_manager/score.rs b/beacon_node/eth2_libp2p/src/peer_manager/score.rs new file mode 100644 index 000000000..2aa53076e --- /dev/null +++ b/beacon_node/eth2_libp2p/src/peer_manager/score.rs @@ -0,0 +1,251 @@ +//! This contains the scoring logic for peers. +//! +//! A peer's score is a rational number in the range [-100, 100]. +//! +//! As the logic develops this documentation will advance. +//! +//! The scoring algorithms are currently experimental. +use serde::Serialize; +use std::time::Instant; + +lazy_static! { + static ref HALFLIFE_DECAY: f64 = -2.0f64.ln() / SCORE_HALFLIFE; +} + +/// The default score for new peers. +pub(crate) const DEFAULT_SCORE: f64 = 0.0; +/// The minimum reputation before a peer is disconnected. +const MIN_SCORE_BEFORE_DISCONNECT: f64 = -20.0; +/// The minimum reputation before a peer is banned. +const MIN_SCORE_BEFORE_BAN: f64 = -50.0; +/// The maximum score a peer can obtain. +const MAX_SCORE: f64 = 100.0; +/// The minimum score a peer can obtain. +const MIN_SCORE: f64 = -100.0; +/// The halflife of a peer's score. I.e the number of seconds it takes for the score to decay to half its value. +const SCORE_HALFLIFE: f64 = 600.0; +/// The number of seconds we ban a peer for before their score begins to decay. +const BANNED_BEFORE_DECAY: u64 = 1800; + +/// A collection of actions a peer can perform which will adjust its score. +/// Each variant has an associated score change. +// To easily assess the behaviour of scores changes the number of variants should stay low, and +// somewhat generic. +#[derive(Debug, Clone, Copy)] +pub enum PeerAction { + /// We should not communicate more with this peer. + /// This action will cause the peer to get banned. + Fatal, + /// This peer's action is not malicious but will not be tolerated. A few occurrences will cause + /// the peer to get kicked. + /// NOTE: ~5 occurrences will get the peer banned + LowToleranceError, + /// An error occurred with this peer but it is not necessarily malicious. + /// We have high tolerance for this actions: several occurrences are needed for a peer to get + /// kicked. + /// NOTE: ~10 occurrences will get the peer banned + MidToleranceError, + /// An error occurred with this peer but it is not necessarily malicious. + /// We have high tolerance for this actions: several occurrences are needed for a peer to get + /// kicked. + /// NOTE: ~15 occurrences will get the peer banned + HighToleranceError, + /// Received an expected message. + _ValidMessage, +} + +/// The expected state of the peer given the peer's score. +#[derive(Debug, PartialEq)] +pub(crate) enum ScoreState { + /// We are content with the peers performance. We permit connections and messages. + Healthy, + /// The peer should be disconnected. We allow re-connections if the peer is persistent. + Disconnect, + /// The peer is banned. We disallow new connections until it's score has decayed into a + /// tolerable threshold. + Ban, +} + +/// A peer's score (perceived potential usefulness). +/// +/// This simplistic version consists of a global score per peer which decays to 0 over time. The +/// decay rate applies equally to positive and negative scores. +#[derive(Copy, PartialEq, Clone, Debug, Serialize)] +pub struct Score { + /// The global score. + // NOTE: In the future we may separate this into sub-scores involving the RPC, Gossipsub and + // lighthouse. + score: f64, + /// The time the score was last updated to perform time-based adjustments such as score-decay. + #[serde(skip)] + last_updated: Instant, +} + +impl Default for Score { + fn default() -> Self { + Score { + score: DEFAULT_SCORE, + last_updated: Instant::now(), + } + } +} + +impl Eq for Score {} + +impl PartialOrd for Score { + fn partial_cmp(&self, other: &Score) -> Option { + self.score + .partial_cmp(&other.score) + .or_else(|| self.last_updated.partial_cmp(&other.last_updated)) + } +} + +impl Ord for Score { + fn cmp(&self, other: &Score) -> std::cmp::Ordering { + self.partial_cmp(other) + .unwrap_or_else(|| std::cmp::Ordering::Equal) + } +} + +impl From for Score { + fn from(f: f64) -> Self { + Score { + score: f, + last_updated: Instant::now(), + } + } +} + +impl std::fmt::Display for Score { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{:.2}", self.score) + } +} + +impl std::fmt::Display for PeerAction { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + PeerAction::Fatal => write!(f, "Fatal"), + PeerAction::LowToleranceError => write!(f, "Low Tolerance Error"), + PeerAction::MidToleranceError => write!(f, "Mid Tolerance Error"), + PeerAction::HighToleranceError => write!(f, "High Tolerance Error"), + PeerAction::_ValidMessage => write!(f, "Valid Message"), + } + } +} + +impl std::fmt::Display for ScoreState { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + ScoreState::Healthy => write!(f, "Healthy"), + ScoreState::Ban => write!(f, "Ban"), + ScoreState::Disconnect => write!(f, "Disconnect"), + } + } +} + +impl Score { + /// Access to the underlying score. + pub fn score(&self) -> f64 { + self.score + } + + /// Modifies the score based on a peer's action. + pub fn apply_peer_action(&mut self, peer_action: PeerAction) { + match peer_action { + PeerAction::Fatal => self.score = MIN_SCORE, // The worst possible score + PeerAction::LowToleranceError => self.add(-10.0), + PeerAction::MidToleranceError => self.add(-5.0), + PeerAction::HighToleranceError => self.add(-1.0), + PeerAction::_ValidMessage => self.add(0.1), + } + } + + /// Returns the expected state of the peer given it's score. + pub(crate) fn state(&self) -> ScoreState { + match self.score { + x if x <= MIN_SCORE_BEFORE_BAN => ScoreState::Ban, + x if x <= MIN_SCORE_BEFORE_DISCONNECT => ScoreState::Disconnect, + _ => ScoreState::Healthy, + } + } + + /// Add an f64 to the score abiding by the limits. + pub fn add(&mut self, score: f64) { + let mut new_score = self.score + score; + if new_score > MAX_SCORE { + new_score = MAX_SCORE; + } + if new_score < MIN_SCORE { + new_score = MIN_SCORE; + } + + self.score = new_score; + } + + /// Applies time-based logic such as decay rates to the score. + /// This function should be called periodically. + pub fn update(&mut self) { + // Apply decay logic + // + // There is two distinct decay processes. One for banned peers and one for all others. If + // the score is below the banning threshold and the duration since it was last update is + // shorter than the banning threshold, we do nothing. + let now = Instant::now(); + if self.score <= MIN_SCORE_BEFORE_BAN + && now + .checked_duration_since(self.last_updated) + .map(|d| d.as_secs()) + <= Some(BANNED_BEFORE_DECAY) + { + // The peer is banned and still within the ban timeout. Do not update it's score. + return; + } + + // Decay the current score + // Using exponential decay based on a constant half life. + if let Some(secs_since_update) = now + .checked_duration_since(self.last_updated) + .map(|d| d.as_secs()) + { + // e^(-ln(2)/HL*t) + let decay_factor = (*HALFLIFE_DECAY * secs_since_update as f64).exp(); + self.score *= decay_factor; + self.last_updated = now; + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_reputation_change() { + let mut score = Score::default(); + + // 0 change does not change de reputation + // + let change = 0.0; + score.add(change); + assert_eq!(score.score(), DEFAULT_SCORE); + + // underflowing change is capped + let mut score = Score::default(); + let change = MIN_SCORE - 50.0; + score.add(change); + assert_eq!(score.score(), MIN_SCORE); + + // overflowing change is capped + let mut score = Score::default(); + let change = MAX_SCORE + 50.0; + score.add(change); + assert_eq!(score.score(), MAX_SCORE); + + // Score adjusts + let mut score = Score::default(); + let change = 1.32; + score.add(change); + assert_eq!(score.score(), DEFAULT_SCORE + change); + } +} diff --git a/beacon_node/eth2_libp2p/src/rpc/codec/ssz.rs b/beacon_node/eth2_libp2p/src/rpc/codec/ssz.rs index 04ab425ac..162541239 100644 --- a/beacon_node/eth2_libp2p/src/rpc/codec/ssz.rs +++ b/beacon_node/eth2_libp2p/src/rpc/codec/ssz.rs @@ -145,7 +145,7 @@ impl Decoder for SSZInboundCodec { }, Protocol::MetaData => match self.protocol.version { Version::V1 => { - if packet.len() > 0 { + if !packet.is_empty() { Err(RPCError::InvalidData) } else { Ok(Some(RPCRequest::MetaData(PhantomData))) diff --git a/beacon_node/eth2_libp2p/src/rpc/codec/ssz_snappy.rs b/beacon_node/eth2_libp2p/src/rpc/codec/ssz_snappy.rs index b33cfdc32..276ff9d57 100644 --- a/beacon_node/eth2_libp2p/src/rpc/codec/ssz_snappy.rs +++ b/beacon_node/eth2_libp2p/src/rpc/codec/ssz_snappy.rs @@ -74,7 +74,7 @@ impl Encoder> for SSZSnappyInboundCodec< // SSZ encoded bytes should be within `max_packet_size` if bytes.len() > self.max_packet_size { return Err(RPCError::InternalError( - "attempting to encode data > max_packet_size".into(), + "attempting to encode data > max_packet_size", )); } // Inserts the length prefix of the uncompressed bytes into dst @@ -186,7 +186,7 @@ impl Decoder for SSZSnappyInboundCodec { }, Protocol::MetaData => match self.protocol.version { Version::V1 => { - if decoded_buffer.len() > 0 { + if !decoded_buffer.is_empty() { Err(RPCError::InvalidData) } else { Ok(Some(RPCRequest::MetaData(PhantomData))) diff --git a/beacon_node/eth2_libp2p/src/rpc/handler.rs b/beacon_node/eth2_libp2p/src/rpc/handler.rs index 29bc3504c..02641a373 100644 --- a/beacon_node/eth2_libp2p/src/rpc/handler.rs +++ b/beacon_node/eth2_libp2p/src/rpc/handler.rs @@ -20,7 +20,7 @@ use std::{ collections::hash_map::Entry, pin::Pin, task::{Context, Poll}, - time::{Duration, Instant}, + time::Duration, }; use tokio::time::{delay_queue, delay_until, Delay, DelayQueue, Instant as TInstant}; use types::EthSpec; @@ -40,19 +40,19 @@ const SHUTDOWN_TIMEOUT_SECS: u8 = 15; #[derive(Debug, Clone, Copy, Hash, Eq, PartialEq)] pub struct SubstreamId(usize); -/// An error encoutered by the handler. +/// An error encountered by the handler. pub enum HandlerErr { - /// An error ocurred for this peer's request. This can occurr during protocol negotiation, - /// message passing, or if the handler identifies that we are sending an error reponse to the peer. + /// An error occurred for this peer's request. This can occur during protocol negotiation, + /// message passing, or if the handler identifies that we are sending an error response to the peer. Inbound { /// Id of the peer's request for which an error occurred. id: SubstreamId, /// Information of the negotiated protocol. proto: Protocol, - /// The error that ocurred. + /// The error that occurred. error: RPCError, }, - /// An error ocurred for this request. Such error can occurr during protocol negotiation, + /// An error occurred for this request. Such error can occur during protocol negotiation, /// message passing, or if we successfully received a response from the peer, but this response /// indicates an error. Outbound { @@ -60,7 +60,7 @@ pub enum HandlerErr { id: RequestId, /// Information of the protocol. proto: Protocol, - /// The error that ocurred. + /// The error that occurred. error: RPCError, }, } @@ -122,9 +122,6 @@ where /// State of the handler. state: HandlerState, - /// After the given duration has elapsed, an inactive connection will shutdown. - inactive_timeout: Duration, - /// Try to negotiate the outbound upgrade a few times if there is an IO error before reporting the request as failed. /// This keeps track of the number of attempts. outbound_io_error_retries: u8, @@ -139,6 +136,7 @@ enum HandlerState { /// The handler is shutting_down. /// /// While in this state the handler rejects new requests but tries to finish existing ones. + /// Once the timer expires, all messages are killed. ShuttingDown(Delay), /// The handler is deactivated. A goodbye has been sent and no more messages are sent or /// received. @@ -278,11 +276,7 @@ impl RPCHandler where TSpec: EthSpec, { - pub fn new( - listen_protocol: SubstreamProtocol>, - inactive_timeout: Duration, - log: &slog::Logger, - ) -> Self { + pub fn new(listen_protocol: SubstreamProtocol>, log: &slog::Logger) -> Self { RPCHandler { listen_protocol, pending_errors: Vec::new(), @@ -299,7 +293,6 @@ where state: HandlerState::Active, max_dial_negotiated: 8, keep_alive: KeepAlive::Yes, - inactive_timeout, outbound_io_error_retries: 0, log: log.clone(), } @@ -496,19 +489,21 @@ where // Check that we don't have outbound items pending for dialing, nor dialing, nor // established. Also check that there are no established inbound substreams. // Errors and events need to be reported back, so check those too. - let should_shutdown = self.dial_queue.is_empty() - && self.outbound_substreams.is_empty() - && self.inbound_substreams.is_empty() - && self.pending_errors.is_empty() - && self.events_out.is_empty() - && self.dial_negotiated == 0; + let should_shutdown = if let HandlerState::ShuttingDown(_) = self.state { + self.dial_queue.is_empty() + && self.outbound_substreams.is_empty() + && self.inbound_substreams.is_empty() + && self.pending_errors.is_empty() + && self.events_out.is_empty() + && self.dial_negotiated == 0 + } else { + false + }; match self.keep_alive { - KeepAlive::Yes if should_shutdown => { - self.keep_alive = KeepAlive::Until(Instant::now() + self.inactive_timeout); - } + KeepAlive::Yes if should_shutdown => self.keep_alive = KeepAlive::No, KeepAlive::Yes => {} // We continue being active - KeepAlive::Until(_) if should_shutdown => {} // Already deemed inactive + KeepAlive::Until(_) if should_shutdown => self.keep_alive = KeepAlive::No, // Already deemed inactive KeepAlive::Until(_) => { // No longer idle self.keep_alive = KeepAlive::Yes; @@ -833,7 +828,7 @@ where // await flush entry.get_mut().0 = InboundSubstreamState::ResponsePendingFlush { - substream: substream, + substream, closing, }; drive_stream_further = true; diff --git a/beacon_node/eth2_libp2p/src/rpc/methods.rs b/beacon_node/eth2_libp2p/src/rpc/methods.rs index 8f9bf19e6..b0ca48a3c 100644 --- a/beacon_node/eth2_libp2p/src/rpc/methods.rs +++ b/beacon_node/eth2_libp2p/src/rpc/methods.rs @@ -116,6 +116,18 @@ pub enum GoodbyeReason { /// Error/fault in the RPC. Fault = 3, + /// Teku uses this code for not being able to verify a network. + UnableToVerifyNetwork = 128, + + /// The node has too many connected peers. + TooManyPeers = 129, + + /// Scored poorly. + BadScore = 250, + + /// The peer is banned + Banned = 251, + /// Unknown reason. Unknown = 0, } @@ -126,6 +138,10 @@ impl From for GoodbyeReason { 1 => GoodbyeReason::ClientShutdown, 2 => GoodbyeReason::IrrelevantNetwork, 3 => GoodbyeReason::Fault, + 128 => GoodbyeReason::UnableToVerifyNetwork, + 129 => GoodbyeReason::TooManyPeers, + 250 => GoodbyeReason::BadScore, + 251 => GoodbyeReason::Banned, _ => GoodbyeReason::Unknown, } } @@ -381,6 +397,10 @@ impl std::fmt::Display for GoodbyeReason { GoodbyeReason::ClientShutdown => write!(f, "Client Shutdown"), GoodbyeReason::IrrelevantNetwork => write!(f, "Irrelevant Network"), GoodbyeReason::Fault => write!(f, "Fault"), + GoodbyeReason::UnableToVerifyNetwork => write!(f, "Unable to verify network"), + GoodbyeReason::TooManyPeers => write!(f, "Too many peers"), + GoodbyeReason::BadScore => write!(f, "Bad Score"), + GoodbyeReason::Banned => write!(f, "Banned"), GoodbyeReason::Unknown => write!(f, "Unknown Reason"), } } diff --git a/beacon_node/eth2_libp2p/src/rpc/mod.rs b/beacon_node/eth2_libp2p/src/rpc/mod.rs index cd3c003f8..edd4555eb 100644 --- a/beacon_node/eth2_libp2p/src/rpc/mod.rs +++ b/beacon_node/eth2_libp2p/src/rpc/mod.rs @@ -14,7 +14,6 @@ use libp2p::{Multiaddr, PeerId}; use slog::{debug, o}; use std::marker::PhantomData; use std::task::{Context, Poll}; -use std::time::Duration; use types::EthSpec; pub(crate) use handler::HandlerErr; @@ -149,7 +148,6 @@ where SubstreamProtocol::new(RPCProtocol { phantom: PhantomData, }), - Duration::from_secs(30), &self.log, ) } diff --git a/beacon_node/eth2_libp2p/src/service.rs b/beacon_node/eth2_libp2p/src/service.rs index 45fb8952e..965c6d849 100644 --- a/beacon_node/eth2_libp2p/src/service.rs +++ b/beacon_node/eth2_libp2p/src/service.rs @@ -1,10 +1,10 @@ use crate::behaviour::{Behaviour, BehaviourEvent, PeerRequestId, Request, Response}; use crate::discovery::enr; use crate::multiaddr::Protocol; -use crate::rpc::{RPCResponseErrorCode, RequestId}; +use crate::rpc::{GoodbyeReason, RPCResponseErrorCode, RequestId}; use crate::types::{error, GossipKind}; use crate::EnrExt; -use crate::{NetworkConfig, NetworkGlobals}; +use crate::{NetworkConfig, NetworkGlobals, PeerAction}; use futures::prelude::*; use libp2p::core::{ identity::Keypair, @@ -12,11 +12,10 @@ use libp2p::core::{ muxing::StreamMuxerBox, transport::boxed::Boxed, upgrade::{InboundUpgradeExt, OutboundUpgradeExt}, - ConnectedPoint, }; use libp2p::{ - core, noise, - swarm::{NetworkBehaviour, SwarmBuilder, SwarmEvent}, + core, noise, secio, + swarm::{SwarmBuilder, SwarmEvent}, PeerId, Swarm, Transport, }; use slog::{crit, debug, info, o, trace, warn}; @@ -26,13 +25,9 @@ use std::io::{Error, ErrorKind}; use std::pin::Pin; use std::sync::Arc; use std::time::Duration; -use tokio::time::DelayQueue; use types::{EnrForkId, EthSpec}; pub const NETWORK_KEY_FILENAME: &str = "key"; -/// The time in milliseconds to wait before banning a peer. This allows for any Goodbye messages to be -/// flushed and protocols to be negotiated. -const BAN_PEER_WAIT_TIMEOUT: u64 = 200; /// The maximum simultaneous libp2p connections per peer. const MAX_CONNECTIONS_PER_PEER: usize = 1; @@ -45,40 +40,16 @@ pub enum Libp2pEvent { Behaviour(BehaviourEvent), /// A new listening address has been established. NewListenAddr(Multiaddr), - /// A peer has established at least one connection. - PeerConnected { - /// The peer that connected. - peer_id: PeerId, - /// Whether the peer was a dialer or listener. - endpoint: ConnectedPoint, - }, - /// A peer no longer has any connections, i.e is disconnected. - PeerDisconnected { - /// The peer the disconnected. - peer_id: PeerId, - /// Whether the peer was a dialer or a listener. - endpoint: ConnectedPoint, - }, } /// The configuration and state of the libp2p components for the beacon node. pub struct Service { /// The libp2p Swarm handler. - //TODO: Make this private pub swarm: Swarm>, /// This node's PeerId. pub local_peer_id: PeerId, - /// Used for managing the state of peers. - network_globals: Arc>, - - /// A current list of peers to ban after a given timeout. - peers_to_ban: DelayQueue, - - /// A list of timeouts after which peers become unbanned. - peer_ban_timeout: DelayQueue, - /// The libp2p logger handle. pub log: slog::Logger, } @@ -162,7 +133,9 @@ impl Service { }; // helper closure for dialing peers - let mut dial_addr = |multiaddr: &Multiaddr| { + let mut dial_addr = |mut multiaddr: Multiaddr| { + // strip the p2p protocol if it exists + strip_peer_id(&mut multiaddr); match Swarm::dial_addr(&mut swarm, multiaddr.clone()) { Ok(()) => debug!(log, "Dialing libp2p peer"; "address" => format!("{}", multiaddr)), Err(err) => debug!( @@ -174,7 +147,7 @@ impl Service { // attempt to connect to user-input libp2p nodes for multiaddr in &config.libp2p_nodes { - dial_addr(multiaddr); + dial_addr(multiaddr.clone()); } // attempt to connect to any specified boot-nodes @@ -194,7 +167,7 @@ impl Service { .read() .is_connected_or_dialing(&bootnode_enr.peer_id()) { - dial_addr(multiaddr); + dial_addr(multiaddr.clone()); } } } @@ -212,25 +185,12 @@ impl Service { let service = Service { local_peer_id, swarm, - network_globals: network_globals.clone(), - peers_to_ban: DelayQueue::new(), - peer_ban_timeout: DelayQueue::new(), log, }; Ok((network_globals, service)) } - /// Adds a peer to be banned for a period of time, specified by a timeout. - pub fn disconnect_and_ban_peer(&mut self, peer_id: PeerId, timeout: Duration) { - warn!(self.log, "Disconnecting and banning peer"; "peer_id" => peer_id.to_string(), "timeout" => format!("{:?}", timeout)); - self.peers_to_ban.insert( - peer_id.clone(), - Duration::from_millis(BAN_PEER_WAIT_TIMEOUT), - ); - self.peer_ban_timeout.insert(peer_id, timeout); - } - /// Sends a request to a peer, with a given Id. pub fn send_request(&mut self, peer_id: PeerId, request_id: RequestId, request: Request) { self.swarm.send_request(peer_id, request_id, request); @@ -247,6 +207,16 @@ impl Service { self.swarm._send_error_reponse(peer_id, id, error, reason); } + /// Report a peer's action. + pub fn report_peer(&mut self, peer_id: &PeerId, action: PeerAction) { + self.swarm.report_peer(peer_id, action); + } + + // Disconnect and ban a peer, providing a reason. + pub fn goodbye_peer(&mut self, peer_id: &PeerId, reason: GoodbyeReason) { + self.swarm.goodbye_peer(peer_id, reason); + } + /// Sends a response to a peer's request. pub fn send_response(&mut self, peer_id: PeerId, id: PeerRequestId, response: Response) { self.swarm.send_successful_response(peer_id, id, response); @@ -254,115 +224,61 @@ impl Service { pub async fn next_event(&mut self) -> Libp2pEvent { loop { - tokio::select! { - event = self.swarm.next_event() => { - match event { - SwarmEvent::Behaviour(behaviour) => { - return Libp2pEvent::Behaviour(behaviour) - } - SwarmEvent::ConnectionEstablished { - peer_id, - endpoint, - num_established, - } => { - debug!(self.log, "Connection established"; "peer_id" => peer_id.to_string(), "connections" => num_established.get()); - // if this is the first connection inform the network layer a new connection - // has been established and update the db - if num_established.get() == 1 { - // update the peerdb - match endpoint { - ConnectedPoint::Listener { .. } => { - self.swarm.peer_manager().connect_ingoing(&peer_id); - } - ConnectedPoint::Dialer { .. } => self - .network_globals - .peers - .write() - .connect_outgoing(&peer_id), - } - return Libp2pEvent::PeerConnected { peer_id, endpoint }; - } - } - SwarmEvent::ConnectionClosed { - peer_id, - cause, - endpoint, - num_established, - } => { - debug!(self.log, "Connection closed"; "peer_id"=> peer_id.to_string(), "cause" => cause.to_string(), "connections" => num_established); - if num_established == 0 { - // update the peer_db - self.swarm.peer_manager().notify_disconnect(&peer_id); - // the peer has disconnected - return Libp2pEvent::PeerDisconnected { - peer_id, - endpoint, - }; - } - } - SwarmEvent::NewListenAddr(multiaddr) => { - return Libp2pEvent::NewListenAddr(multiaddr) - } - - SwarmEvent::IncomingConnection { - local_addr, - send_back_addr, - } => { - debug!(self.log, "Incoming connection"; "our_addr" => local_addr.to_string(), "from" => send_back_addr.to_string()) - } - SwarmEvent::IncomingConnectionError { - local_addr, - send_back_addr, - error, - } => { - debug!(self.log, "Failed incoming connection"; "our_addr" => local_addr.to_string(), "from" => send_back_addr.to_string(), "error" => error.to_string()) - } - SwarmEvent::BannedPeer { - peer_id, - endpoint: _, - } => { - debug!(self.log, "Attempted to dial a banned peer"; "peer_id" => peer_id.to_string()) - } - SwarmEvent::UnreachableAddr { - peer_id, - address, - error, - attempts_remaining, - } => { - debug!(self.log, "Failed to dial address"; "peer_id" => peer_id.to_string(), "address" => address.to_string(), "error" => error.to_string(), "attempts_remaining" => attempts_remaining); - self.swarm.peer_manager().notify_disconnect(&peer_id); - } - SwarmEvent::UnknownPeerUnreachableAddr { address, error } => { - debug!(self.log, "Peer not known at dialed address"; "address" => address.to_string(), "error" => error.to_string()); - } - SwarmEvent::ExpiredListenAddr(multiaddr) => { - debug!(self.log, "Listen address expired"; "multiaddr" => multiaddr.to_string()) - } - SwarmEvent::ListenerClosed { addresses, reason } => { - debug!(self.log, "Listener closed"; "addresses" => format!("{:?}", addresses), "reason" => format!("{:?}", reason)) - } - SwarmEvent::ListenerError { error } => { - debug!(self.log, "Listener error"; "error" => format!("{:?}", error.to_string())) - } - SwarmEvent::Dialing(peer_id) => { - self.swarm.peer_manager().dialing_peer(&peer_id); - } - } + match self.swarm.next_event().await { + SwarmEvent::Behaviour(behaviour) => return Libp2pEvent::Behaviour(behaviour), + SwarmEvent::ConnectionEstablished { .. } => { + // A connection could be established with a banned peer. This is + // handled inside the behaviour. } - Some(Ok(peer_to_ban)) = self.peers_to_ban.next() => { - let peer_id = peer_to_ban.into_inner(); - Swarm::ban_peer_id(&mut self.swarm, peer_id.clone()); - // TODO: Correctly notify protocols of the disconnect - // TODO: Also remove peer from the DHT: https://github.com/sigp/lighthouse/issues/629 - self.swarm.inject_disconnected(&peer_id); - // inform the behaviour that the peer has been banned - self.swarm.peer_banned(peer_id); + SwarmEvent::ConnectionClosed { + peer_id, + cause, + endpoint: _, + num_established, + } => { + debug!(self.log, "Connection closed"; "peer_id"=> peer_id.to_string(), "cause" => cause.to_string(), "connections" => num_established); } - Some(Ok(peer_to_unban)) = self.peer_ban_timeout.next() => { - debug!(self.log, "Peer has been unbanned"; "peer" => format!("{:?}", peer_to_unban)); - let unban_peer = peer_to_unban.into_inner(); - self.swarm.peer_unbanned(&unban_peer); - Swarm::unban_peer_id(&mut self.swarm, unban_peer); + SwarmEvent::NewListenAddr(multiaddr) => { + return Libp2pEvent::NewListenAddr(multiaddr) + } + SwarmEvent::IncomingConnection { + local_addr, + send_back_addr, + } => { + debug!(self.log, "Incoming connection"; "our_addr" => local_addr.to_string(), "from" => send_back_addr.to_string()) + } + SwarmEvent::IncomingConnectionError { + local_addr, + send_back_addr, + error, + } => { + debug!(self.log, "Failed incoming connection"; "our_addr" => local_addr.to_string(), "from" => send_back_addr.to_string(), "error" => error.to_string()) + } + SwarmEvent::BannedPeer { .. } => { + // We do not ban peers at the swarm layer, so this should never occur. + } + SwarmEvent::UnreachableAddr { + peer_id, + address, + error, + attempts_remaining, + } => { + debug!(self.log, "Failed to dial address"; "peer_id" => peer_id.to_string(), "address" => address.to_string(), "error" => error.to_string(), "attempts_remaining" => attempts_remaining); + } + SwarmEvent::UnknownPeerUnreachableAddr { address, error } => { + debug!(self.log, "Peer not known at dialed address"; "address" => address.to_string(), "error" => error.to_string()); + } + SwarmEvent::ExpiredListenAddr(multiaddr) => { + debug!(self.log, "Listen address expired"; "multiaddr" => multiaddr.to_string()) + } + SwarmEvent::ListenerClosed { addresses, reason } => { + debug!(self.log, "Listener closed"; "addresses" => format!("{:?}", addresses), "reason" => format!("{:?}", reason)) + } + SwarmEvent::ListenerError { error } => { + debug!(self.log, "Listener error"; "error" => format!("{:?}", error.to_string())) + } + SwarmEvent::Dialing(peer_id) => { + debug!(self.log, "Dialing peer"; "peer_id" => peer_id.to_string()); } } } @@ -386,7 +302,7 @@ fn build_transport( let transport = transport .and_then(move |stream, endpoint| { let upgrade = core::upgrade::SelectUpgrade::new( - libp2p::secio::SecioConfig::new(local_private_key.clone()), + secio::SecioConfig::new(local_private_key.clone()), generate_noise_config(&local_private_key), ); core::upgrade::apply(stream, upgrade, endpoint, core::upgrade::Version::V1).and_then( @@ -455,7 +371,6 @@ fn keypair_from_bytes(mut bytes: Vec) -> error::Result { /// /// Currently only secp256k1 keys are allowed, as these are the only keys supported by discv5. fn load_private_key(config: &NetworkConfig, log: &slog::Logger) -> Keypair { - // TODO: Currently using secp256k1 keypairs - currently required for discv5 // check for key from disk let network_key_f = config.network_dir.join(NETWORK_KEY_FILENAME); if let Ok(mut network_key_file) = File::open(network_key_f.clone()) { @@ -507,3 +422,14 @@ fn generate_noise_config( .expect("signing can fail only once during starting a node"); noise::NoiseConfig::xx(static_dh_keys).into_authenticated() } + +/// For a multiaddr that ends with a peer id, this strips this suffix. Rust-libp2p +/// only supports dialing to an address without providing the peer id. +fn strip_peer_id(addr: &mut Multiaddr) { + let last = addr.pop(); + match last { + Some(Protocol::P2p(_)) => {} + Some(other) => addr.push(other), + _ => {} + } +} diff --git a/beacon_node/eth2_libp2p/src/types/pubsub.rs b/beacon_node/eth2_libp2p/src/types/pubsub.rs index 8cab21f69..73e0dd710 100644 --- a/beacon_node/eth2_libp2p/src/types/pubsub.rs +++ b/beacon_node/eth2_libp2p/src/types/pubsub.rs @@ -68,7 +68,7 @@ impl PubsubMessage { continue; } Ok(gossip_topic) => { - let ref decompressed_data = match gossip_topic.encoding() { + let decompressed_data = &(match gossip_topic.encoding() { GossipEncoding::SSZSnappy => { // Exit early if uncompressed data is > GOSSIP_MAX_SIZE match decompress_len(data) { @@ -86,7 +86,7 @@ impl PubsubMessage { Err(e) => return Err(format!("{}", e)), } } - }; + }); // the ssz decoders match gossip_topic.kind() { GossipKind::BeaconAggregateAndProof => { diff --git a/beacon_node/eth2_libp2p/tests/rpc_tests.rs b/beacon_node/eth2_libp2p/tests/rpc_tests.rs index 2a0bac735..774137a3a 100644 --- a/beacon_node/eth2_libp2p/tests/rpc_tests.rs +++ b/beacon_node/eth2_libp2p/tests/rpc_tests.rs @@ -47,7 +47,7 @@ async fn test_status_rpc() { let sender_future = async { loop { match sender.next_event().await { - Libp2pEvent::PeerConnected { peer_id, .. } => { + Libp2pEvent::Behaviour(BehaviourEvent::PeerDialed(peer_id)) => { // Send a STATUS message debug!(log, "Sending RPC"); sender @@ -137,7 +137,7 @@ async fn test_blocks_by_range_chunked_rpc() { let sender_future = async { loop { match sender.next_event().await { - Libp2pEvent::PeerConnected { peer_id, .. } => { + Libp2pEvent::Behaviour(BehaviourEvent::PeerDialed(peer_id)) => { // Send a STATUS message debug!(log, "Sending RPC"); sender @@ -248,7 +248,7 @@ async fn test_blocks_by_range_chunked_rpc_terminates_correctly() { let sender_future = async { loop { match sender.next_event().await { - Libp2pEvent::PeerConnected { peer_id, .. } => { + Libp2pEvent::Behaviour(BehaviourEvent::PeerDialed(peer_id)) => { // Send a STATUS message debug!(log, "Sending RPC"); sender @@ -377,7 +377,7 @@ async fn test_blocks_by_range_single_empty_rpc() { let sender_future = async { loop { match sender.next_event().await { - Libp2pEvent::PeerConnected { peer_id, .. } => { + Libp2pEvent::Behaviour(BehaviourEvent::PeerDialed(peer_id)) => { // Send a STATUS message debug!(log, "Sending RPC"); sender @@ -489,7 +489,7 @@ async fn test_blocks_by_root_chunked_rpc() { let sender_future = async { loop { match sender.next_event().await { - Libp2pEvent::PeerConnected { peer_id, .. } => { + Libp2pEvent::Behaviour(BehaviourEvent::PeerDialed(peer_id)) => { // Send a STATUS message debug!(log, "Sending RPC"); sender @@ -608,7 +608,7 @@ async fn test_blocks_by_root_chunked_rpc_terminates_correctly() { let sender_future = async { loop { match sender.next_event().await { - Libp2pEvent::PeerConnected { peer_id, .. } => { + Libp2pEvent::Behaviour(BehaviourEvent::PeerDialed(peer_id)) => { // Send a STATUS message debug!(log, "Sending RPC"); sender @@ -713,19 +713,19 @@ async fn test_goodbye_rpc() { // get sender/receiver let (mut sender, mut receiver) = common::build_node_pair(&log).await; - // Goodbye Request - let rpc_request = Request::Goodbye(GoodbyeReason::ClientShutdown); - // build the sender future let sender_future = async { loop { match sender.next_event().await { - Libp2pEvent::PeerConnected { peer_id, .. } => { - // Send a STATUS message + Libp2pEvent::Behaviour(BehaviourEvent::PeerDialed(peer_id)) => { + // Send a goodbye and disconnect debug!(log, "Sending RPC"); sender .swarm - .send_request(peer_id, RequestId::Sync(10), rpc_request.clone()); + .goodbye_peer(&peer_id, GoodbyeReason::IrrelevantNetwork); + } + Libp2pEvent::Behaviour(BehaviourEvent::PeerDisconnected(_)) => { + return; } _ => {} // Ignore other RPC messages } @@ -736,13 +736,8 @@ async fn test_goodbye_rpc() { let receiver_future = async { loop { match receiver.next_event().await { - Libp2pEvent::Behaviour(BehaviourEvent::RequestReceived { - peer_id: _, - id: _, - request, - }) => { + Libp2pEvent::Behaviour(BehaviourEvent::PeerDisconnected(_)) => { // Should receive sent RPC request - assert_eq!(rpc_request.clone(), request); // receives the goodbye. Nothing left to do return; } _ => {} // Ignore other events @@ -750,9 +745,10 @@ async fn test_goodbye_rpc() { } }; + let total_future = futures::future::join(sender_future, receiver_future); + tokio::select! { - _ = sender_future => {} - _ = receiver_future => {} + _ = total_future => {} _ = delay_for(Duration::from_secs(30)) => { panic!("Future timed out"); } diff --git a/beacon_node/network/src/router/mod.rs b/beacon_node/network/src/router/mod.rs index 05799dc66..067b25840 100644 --- a/beacon_node/network/src/router/mod.rs +++ b/beacon_node/network/src/router/mod.rs @@ -1,7 +1,8 @@ //! This module handles incoming network messages. //! -//! It routes the messages to appropriate services, such as the Sync -//! and processes those that are +//! It routes the messages to appropriate services. +//! It handles requests at the application layer in its associated processor and directs +//! syncing-related responses to the Sync manager. #![allow(clippy::unit_arg)] pub mod processor; @@ -166,15 +167,6 @@ impl Router { self.processor .on_status_request(peer_id, id, status_message) } - Request::Goodbye(goodbye_reason) => { - debug!( - self.log, "Peer sent Goodbye"; - "peer_id" => peer_id.to_string(), - "reason" => format!("{:?}", goodbye_reason), - "client" => self.network_globals.client(&peer_id).to_string(), - ); - self.processor.on_disconnect(peer_id); - } Request::BlocksByRange(request) => self .processor .on_blocks_by_range_request(peer_id, id, request), diff --git a/beacon_node/network/src/router/processor.rs b/beacon_node/network/src/router/processor.rs index 35f3258e5..ce858b6f9 100644 --- a/beacon_node/network/src/router/processor.rs +++ b/beacon_node/network/src/router/processor.rs @@ -10,7 +10,7 @@ use beacon_chain::{ GossipVerifiedBlock, }; use eth2_libp2p::rpc::*; -use eth2_libp2p::{NetworkGlobals, PeerId, PeerRequestId, Request, Response}; +use eth2_libp2p::{NetworkGlobals, PeerAction, PeerId, PeerRequestId, Request, Response}; use itertools::process_results; use slog::{debug, error, o, trace, warn}; use ssz::Encode; @@ -194,7 +194,7 @@ impl Processor { ); self.network - .disconnect(peer_id, GoodbyeReason::IrrelevantNetwork); + .goodbye_peer(peer_id, GoodbyeReason::IrrelevantNetwork); } else if remote.head_slot > self.chain.slot().unwrap_or_else(|_| Slot::from(0u64)) + FUTURE_SLOT_TOLERANCE { @@ -210,7 +210,7 @@ impl Processor { "reason" => "different system clocks or genesis time" ); self.network - .disconnect(peer_id, GoodbyeReason::IrrelevantNetwork); + .goodbye_peer(peer_id, GoodbyeReason::IrrelevantNetwork); } else if remote.finalized_epoch <= local.finalized_epoch && remote.finalized_root != Hash256::zero() && local.finalized_root != Hash256::zero() @@ -230,7 +230,7 @@ impl Processor { "reason" => "different finalized chain" ); self.network - .disconnect(peer_id, GoodbyeReason::IrrelevantNetwork); + .goodbye_peer(peer_id, GoodbyeReason::IrrelevantNetwork); } else if remote.finalized_epoch < local.finalized_epoch { // The node has a lower finalized epoch, their chain is not useful to us. There are two // cases where a node can have a lower finalized epoch: @@ -344,7 +344,7 @@ impl Processor { warn!(self.log, "Peer sent invalid range request"; "error" => "Step sent was 0"); - self.network.disconnect(peer_id, GoodbyeReason::Fault); + self.network.goodbye_peer(peer_id, GoodbyeReason::Fault); return; } @@ -1096,23 +1096,24 @@ impl HandlerNetworkContext { Self { network_send, log } } + /// Sends a message to the network task. fn inform_network(&mut self, msg: NetworkMessage) { self.network_send .send(msg) .unwrap_or_else(|_| warn!(self.log, "Could not send message to the network service")) } - pub fn disconnect(&mut self, peer_id: PeerId, reason: GoodbyeReason) { - warn!( - &self.log, - "Disconnecting peer (RPC)"; - "reason" => format!("{:?}", reason), - "peer_id" => format!("{:?}", peer_id), - ); - self.send_processor_request(peer_id.clone(), Request::Goodbye(reason)); - self.inform_network(NetworkMessage::Disconnect { peer_id }); + /// Disconnects and ban's a peer, sending a Goodbye request with the associated reason. + pub fn goodbye_peer(&mut self, peer_id: PeerId, reason: GoodbyeReason) { + self.inform_network(NetworkMessage::GoodbyePeer { peer_id, reason }); } + /// Reports a peer's action, adjusting the peer's score. + pub fn _report_peer(&mut self, peer_id: PeerId, action: PeerAction) { + self.inform_network(NetworkMessage::ReportPeer { peer_id, action }); + } + + /// Sends a request to the network task. pub fn send_processor_request(&mut self, peer_id: PeerId, request: Request) { self.inform_network(NetworkMessage::SendRequest { peer_id, @@ -1121,6 +1122,7 @@ impl HandlerNetworkContext { }) } + /// Sends a response to the network task. pub fn send_response(&mut self, peer_id: PeerId, response: Response, id: PeerRequestId) { self.inform_network(NetworkMessage::SendResponse { peer_id, @@ -1128,6 +1130,8 @@ impl HandlerNetworkContext { response, }) } + + /// Sends an error response to the network task. pub fn _send_error_response( &mut self, peer_id: PeerId, diff --git a/beacon_node/network/src/service.rs b/beacon_node/network/src/service.rs index 4a410cef9..0f1b1f77e 100644 --- a/beacon_node/network/src/service.rs +++ b/beacon_node/network/src/service.rs @@ -8,8 +8,8 @@ use crate::{error, metrics}; use beacon_chain::{BeaconChain, BeaconChainTypes}; use eth2_libp2p::Service as LibP2PService; use eth2_libp2p::{ - rpc::{RPCResponseErrorCode, RequestId}, - Libp2pEvent, PeerRequestId, PubsubMessage, Request, Response, + rpc::{GoodbyeReason, RPCResponseErrorCode, RequestId}, + Libp2pEvent, PeerAction, PeerRequestId, PubsubMessage, Request, Response, }; use eth2_libp2p::{BehaviourEvent, MessageId, NetworkGlobals, PeerId}; use futures::prelude::*; @@ -24,8 +24,49 @@ use types::EthSpec; mod tests; -/// The time in seconds that a peer will be banned and prevented from reconnecting. -const BAN_PEER_TIMEOUT: u64 = 30; +/// Types of messages that the network service can receive. +#[derive(Debug)] +pub enum NetworkMessage { + /// Subscribes a list of validators to specific slots for attestation duties. + Subscribe { + subscriptions: Vec, + }, + /// Send an RPC request to the libp2p service. + SendRequest { + peer_id: PeerId, + request: Request, + request_id: RequestId, + }, + /// Send a successful Response to the libp2p service. + SendResponse { + peer_id: PeerId, + response: Response, + id: PeerRequestId, + }, + /// Respond to a peer's request with an error. + SendError { + // TODO: note that this is never used, we just say goodbye without nicely closing the + // stream assigned to the request + peer_id: PeerId, + error: RPCResponseErrorCode, + reason: String, + id: PeerRequestId, + }, + /// Publish a list of messages to the gossipsub protocol. + Publish { messages: Vec> }, + /// Propagate a received gossipsub message. + Propagate { + propagation_source: PeerId, + message_id: MessageId, + }, + /// Reports a peer to the peer manager for performing an action. + ReportPeer { peer_id: PeerId, action: PeerAction }, + /// Disconnect an ban a peer, providing a reason. + GoodbyePeer { + peer_id: PeerId, + reason: GoodbyeReason, + }, +} /// Service that handles communication between internal services and the `eth2_libp2p` network service. pub struct NetworkService { @@ -200,12 +241,8 @@ fn spawn_service( expose_publish_metrics(&messages); service.libp2p.swarm.publish(messages); } - NetworkMessage::Disconnect { peer_id } => { - service.libp2p.disconnect_and_ban_peer( - peer_id, - std::time::Duration::from_secs(BAN_PEER_TIMEOUT), - ); - } + NetworkMessage::ReportPeer { peer_id, action } => service.libp2p.report_peer(&peer_id, action), + NetworkMessage::GoodbyePeer { peer_id, reason } => service.libp2p.goodbye_peer(&peer_id, reason), NetworkMessage::Subscribe { subscriptions } => { if let Err(e) = service .attestation_service @@ -240,16 +277,27 @@ fn spawn_service( // poll the swarm match libp2p_event { Libp2pEvent::Behaviour(event) => match event { + + BehaviourEvent::PeerDialed(peer_id) => { + let _ = service + .router_send + .send(RouterMessage::PeerDialed(peer_id)) + .map_err(|_| { + debug!(service.log, "Failed to send peer dialed to router"); }); + }, + BehaviourEvent::PeerConnected(_peer_id) => { + // A peer has connected to us + // We currently do not perform any action here. + }, + BehaviourEvent::PeerDisconnected(peer_id) => { + let _ = service + .router_send + .send(RouterMessage::PeerDisconnected(peer_id)) + .map_err(|_| { + debug!(service.log, "Failed to send peer disconnect to router"); + }); + }, BehaviourEvent::RequestReceived{peer_id, id, request} => { - if let Request::Goodbye(_) = request { - // if we received a Goodbye message, drop and ban the peer - //peers_to_ban.push(peer_id.clone()); - // TODO: remove this: https://github.com/sigp/lighthouse/issues/1240 - service.libp2p.disconnect_and_ban_peer( - peer_id.clone(), - std::time::Duration::from_secs(BAN_PEER_TIMEOUT), - ); - }; let _ = service .router_send .send(RouterMessage::RPCRequestReceived{peer_id, id, request}) @@ -328,25 +376,6 @@ fn spawn_service( Libp2pEvent::NewListenAddr(multiaddr) => { service.network_globals.listen_multiaddrs.write().push(multiaddr); } - Libp2pEvent::PeerConnected{ peer_id, endpoint,} => { - debug!(service.log, "Peer Connected"; "peer_id" => peer_id.to_string(), "endpoint" => format!("{:?}", endpoint)); - if let eth2_libp2p::ConnectedPoint::Dialer { .. } = endpoint { - let _ = service - .router_send - .send(RouterMessage::PeerDialed(peer_id)) - .map_err(|_| { - debug!(service.log, "Failed to send peer dialed to router"); }); - } - } - Libp2pEvent::PeerDisconnected{ peer_id, endpoint,} => { - debug!(service.log, "Peer Disconnected"; "peer_id" => peer_id.to_string(), "endpoint" => format!("{:?}", endpoint)); - let _ = service - .router_send - .send(RouterMessage::PeerDisconnected(peer_id)) - .map_err(|_| { - debug!(service.log, "Failed to send peer disconnect to router"); - }); - } } } } @@ -378,45 +407,6 @@ fn next_fork_delay( }) } -/// Types of messages that the network service can receive. -#[derive(Debug)] -pub enum NetworkMessage { - /// Subscribes a list of validators to specific slots for attestation duties. - Subscribe { - subscriptions: Vec, - }, - /// Send an RPC request to the libp2p service. - SendRequest { - peer_id: PeerId, - request: Request, - request_id: RequestId, - }, - /// Send a successful Response to the libp2p service. - SendResponse { - peer_id: PeerId, - response: Response, - id: PeerRequestId, - }, - /// Respond to a peer's request with an error. - SendError { - // TODO: note that this is never used, we just say goodbye without nicely clossing the - // stream assigned to the request - peer_id: PeerId, - error: RPCResponseErrorCode, - reason: String, - id: PeerRequestId, - }, - /// Publish a list of messages to the gossipsub protocol. - Publish { messages: Vec> }, - /// Propagate a received gossipsub message. - Propagate { - propagation_source: PeerId, - message_id: MessageId, - }, - /// Disconnect and bans a peer id. - Disconnect { peer_id: PeerId }, -} - /// Inspects the `messages` that were being sent to the network and updates Prometheus metrics. fn expose_publish_metrics(messages: &[PubsubMessage]) { for message in messages { diff --git a/beacon_node/network/src/store.rs b/beacon_node/network/src/store.rs deleted file mode 100644 index e69de29bb..000000000 diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index 2ad6d7b31..b53b2f612 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -40,9 +40,9 @@ use super::range_sync::{BatchId, ChainId, RangeSync, EPOCHS_PER_BATCH}; use super::RequestId; use crate::service::NetworkMessage; use beacon_chain::{BeaconChain, BeaconChainTypes, BlockError}; -use eth2_libp2p::rpc::{methods::MAX_REQUEST_BLOCKS, BlocksByRootRequest}; +use eth2_libp2p::rpc::{methods::MAX_REQUEST_BLOCKS, BlocksByRootRequest, GoodbyeReason}; use eth2_libp2p::types::NetworkGlobals; -use eth2_libp2p::PeerId; +use eth2_libp2p::{PeerAction, PeerId}; use fnv::FnvHashMap; use slog::{crit, debug, error, info, trace, warn, Logger}; use smallvec::SmallVec; @@ -347,10 +347,13 @@ impl SyncManager { // stream termination for a single block lookup, remove the key if let Some(single_block_request) = self.single_block_lookups.remove(&request_id) { - // the peer didn't respond with a block that it referenced + // The peer didn't respond with a block that it referenced. + // This can be allowed as some clients may implement pruning. We mildly + // tolerate this behaviour. if !single_block_request.block_returned { warn!(self.log, "Peer didn't respond with a block it referenced"; "referenced_block_hash" => format!("{}", single_block_request.hash), "peer_id" => format!("{}", peer_id)); - self.network.downvote_peer(peer_id); + self.network + .report_peer(peer_id, PeerAction::MidToleranceError); } return; } @@ -389,9 +392,10 @@ impl SyncManager { ) { // verify the hash is correct and try and process the block if expected_block_hash != block.canonical_root() { - // the peer that sent this, sent us the wrong block + // The peer that sent this, sent us the wrong block. + // We do not tolerate this behaviour. The peer is instantly disconnected and banned. warn!(self.log, "Peer sent incorrect block for single block lookup"; "peer_id" => format!("{}", peer_id)); - self.network.downvote_peer(peer_id); + self.network.goodbye_peer(peer_id, GoodbyeReason::Fault); return; } @@ -426,7 +430,10 @@ impl SyncManager { } outcome => { warn!(self.log, "Single block lookup failed"; "outcome" => format!("{:?}", outcome)); - self.network.downvote_peer(peer_id); + // This could be a range of errors. But we couldn't process the block. + // For now we consider this a mid tolerance error. + self.network + .report_peer(peer_id, PeerAction::MidToleranceError); } } } @@ -624,8 +631,12 @@ impl SyncManager { "expected_parent" => format!("{}", expected_hash), ); + // We try again, but downvote the peer. self.request_parent(parent_request); - self.network.downvote_peer(peer); + // We do not tolerate these kinds of errors. We will accept a few but these are signs + // of a faulty peer. + self.network + .report_peer(peer, PeerAction::LowToleranceError); } else { // The last block in the queue is the only one that has not attempted to be processed yet. // @@ -662,12 +673,18 @@ impl SyncManager { // all else we consider the chain a failure and downvote the peer that sent // us the last block warn!( - self.log, "Invalid parent chain. Downvoting peer"; + self.log, "Invalid parent chain"; + "score_adjustment" => PeerAction::MidToleranceError.to_string(), "outcome" => format!("{:?}", outcome), - "last_peer" => format!("{:?}", parent_request.last_submitted_peer), + "last_peer" => parent_request.last_submitted_peer.to_string(), + ); + // This currently can be a host of errors. We permit this due to the partial + // ambiguity. + // TODO: Refine the error types and score the peer appropriately. + self.network.report_peer( + parent_request.last_submitted_peer, + PeerAction::MidToleranceError, ); - self.network - .downvote_peer(parent_request.last_submitted_peer); return; } } @@ -774,7 +791,13 @@ impl SyncManager { ); } SyncMessage::ParentLookupFailed(peer_id) => { - self.network.downvote_peer(peer_id); + // A peer sent an object (block or attestation) that referenced a parent. + // On request for this parent the peer indicated it did not have this + // block. + // This is not fatal. Peer's could prune old blocks so we moderately + // tolerate this behaviour. + self.network + .report_peer(peer_id, PeerAction::MidToleranceError); } } } diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index 039c5db6d..d487971d1 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -5,7 +5,7 @@ use crate::router::processor::status_message; use crate::service::NetworkMessage; use beacon_chain::{BeaconChain, BeaconChainTypes}; use eth2_libp2p::rpc::{BlocksByRangeRequest, BlocksByRootRequest, GoodbyeReason, RequestId}; -use eth2_libp2p::{Client, NetworkGlobals, PeerId, Request}; +use eth2_libp2p::{Client, NetworkGlobals, PeerAction, PeerId, Request}; use slog::{debug, trace, warn}; use std::sync::Arc; use tokio::sync::mpsc; @@ -101,37 +101,20 @@ impl SyncNetworkContext { self.send_rpc_request(peer_id, Request::BlocksByRoot(request)) } - pub fn downvote_peer(&mut self, peer_id: PeerId) { - debug!( - self.log, - "Peer downvoted"; - "peer" => format!("{:?}", peer_id) - ); - // TODO: Implement reputation - // TODO: what if we first close the channel sending a response - // RPCResponseErrorCode::InvalidRequest (or something) - // and then disconnect the peer? either request dc or let the behaviour have that logic - // itself - self.disconnect(peer_id, GoodbyeReason::Fault); + pub fn goodbye_peer(&mut self, peer_id: PeerId, reason: GoodbyeReason) { + self.network_send + .send(NetworkMessage::GoodbyePeer { peer_id, reason }) + .unwrap_or_else(|_| { + warn!(self.log, "Could not report peer, channel failed"); + }); } - fn disconnect(&mut self, peer_id: PeerId, reason: GoodbyeReason) { - warn!( - &self.log, - "Disconnecting peer (RPC)"; - "reason" => format!("{:?}", reason), - "peer_id" => format!("{:?}", peer_id), - ); - - // ignore the error if the channel send fails - let _ = self.send_rpc_request(peer_id.clone(), Request::Goodbye(reason)); + pub fn report_peer(&mut self, peer_id: PeerId, action: PeerAction) { + debug!(self.log, "Sync reporting peer"; "peer_id" => peer_id.to_string(), "action"=> action.to_string()); self.network_send - .send(NetworkMessage::Disconnect { peer_id }) + .send(NetworkMessage::ReportPeer { peer_id, action }) .unwrap_or_else(|_| { - warn!( - self.log, - "Could not send a Disconnect to the network service" - ) + warn!(self.log, "Could not report peer, channel failed"); }); } diff --git a/beacon_node/network/src/sync/range_sync/chain.rs b/beacon_node/network/src/sync/range_sync/chain.rs index 05abf6ea3..22183adeb 100644 --- a/beacon_node/network/src/sync/range_sync/chain.rs +++ b/beacon_node/network/src/sync/range_sync/chain.rs @@ -3,7 +3,7 @@ use crate::sync::block_processor::{spawn_block_processor, BatchProcessResult, Pr use crate::sync::network_context::SyncNetworkContext; use crate::sync::{RequestId, SyncMessage}; use beacon_chain::{BeaconChain, BeaconChainTypes}; -use eth2_libp2p::PeerId; +use eth2_libp2p::{PeerAction, PeerId}; use rand::prelude::*; use slog::{crit, debug, warn}; use std::collections::HashSet; @@ -14,7 +14,7 @@ use types::{Epoch, EthSpec, Hash256, SignedBeaconBlock, Slot}; /// Blocks are downloaded in batches from peers. This constant specifies how many epochs worth of /// blocks per batch are requested _at most_. A batch may request less blocks to account for /// already requested slots. There is a timeout for each batch request. If this value is too high, -/// we will downvote peers with poor bandwidth. This can be set arbitrarily high, in which case the +/// we will negatively report peers with poor bandwidth. This can be set arbitrarily high, in which case the /// responder will fill the response up to the max request size, assuming they have the bandwidth /// to do so. pub const EPOCHS_PER_BATCH: u64 = 2; @@ -27,7 +27,7 @@ const BATCH_BUFFER_SIZE: u8 = 5; /// Invalid batches are attempted to be re-downloaded from other peers. If they cannot be processed /// after `INVALID_BATCH_LOOKUP_ATTEMPTS` times, the chain is considered faulty and all peers will -/// be downvoted. +/// be reported negatively. const INVALID_BATCH_LOOKUP_ATTEMPTS: u8 = 3; #[derive(PartialEq)] @@ -192,7 +192,9 @@ impl SyncingChain { warn!(self.log, "BlocksByRange response returned out of range blocks"; "response_initial_slot" => first_slot, "requested_initial_slot" => batch.start_slot); - network.downvote_peer(batch.current_peer); + // This is a pretty bad error. We don't consider this fatal, but we don't tolerate + // this much either. + network.report_peer(batch.current_peer, PeerAction::LowToleranceError); self.to_be_processed_id = batch.id; // reset the id back to here, when incrementing, it will check against completed batches return; } @@ -363,14 +365,18 @@ impl SyncingChain { // check that we have not exceeded the re-process retry counter if batch.reprocess_retries > INVALID_BATCH_LOOKUP_ATTEMPTS { - // if a batch has exceeded the invalid batch lookup attempts limit, it means + // If a batch has exceeded the invalid batch lookup attempts limit, it means // that it is likely all peers in this chain are are sending invalid batches // repeatedly and are either malicious or faulty. We drop the chain and - // downvote all peers. - warn!(self.log, "Batch failed to download. Dropping chain and downvoting peers"; + // report all peers. + // There are some edge cases with forks that could land us in this situation. + // This should be unlikely, so we tolerate these errors, but not often. + let action = PeerAction::LowToleranceError; + warn!(self.log, "Batch failed to download. Dropping chain scoring peers"; + "score_adjustment" => action.to_string(), "chain_id" => self.id, "id"=> *batch.id); for peer_id in self.peer_pool.drain() { - network.downvote_peer(peer_id); + network.report_peer(peer_id, action); } ProcessingResult::RemoveChain } else { @@ -389,14 +395,16 @@ impl SyncingChain { // check that we have not exceeded the re-process retry counter if batch.reprocess_retries > INVALID_BATCH_LOOKUP_ATTEMPTS { - // if a batch has exceeded the invalid batch lookup attempts limit, it means + // If a batch has exceeded the invalid batch lookup attempts limit, it means // that it is likely all peers in this chain are are sending invalid batches // repeatedly and are either malicious or faulty. We drop the chain and // downvote all peers. - warn!(self.log, "Batch failed to download. Dropping chain and downvoting peers"; + let action = PeerAction::LowToleranceError; + warn!(self.log, "Batch failed to download. Dropping chain scoring peers"; + "score_adjustment" => action.to_string(), "chain_id" => self.id, "id"=> *batch.id); for peer_id in self.peer_pool.drain() { - network.downvote_peer(peer_id); + network.report_peer(peer_id, action); } ProcessingResult::RemoveChain } else { @@ -437,18 +445,30 @@ impl SyncingChain { // The re-downloaded version was different if processed_batch.current_peer != processed_batch.original_peer { // A new peer sent the correct batch, the previous peer did not - // downvote the original peer - // - // If the same peer corrected it's mistake, we allow it.... for - // now. + // We negatively score the original peer. + let action = PeerAction::LowToleranceError; debug!( - self.log, "Re-processed batch validated. Downvoting original peer"; + self.log, "Re-processed batch validated. Scoring original peer"; "chain_id" => self.id, "batch_id" => *processed_batch.id, + "score_adjustment" => action.to_string(), "original_peer" => format!("{}",processed_batch.original_peer), "new_peer" => format!("{}", processed_batch.current_peer) ); - network.downvote_peer(processed_batch.original_peer); + network.report_peer(processed_batch.original_peer, action); + } else { + // The same peer corrected it's previous mistake. There was an error, so we + // negative score the original peer. + let action = PeerAction::MidToleranceError; + debug!( + self.log, "Re-processed batch validated by the same peer."; + "chain_id" => self.id, + "batch_id" => *processed_batch.id, + "score_adjustment" => action.to_string(), + "original_peer" => format!("{}",processed_batch.original_peer), + "new_peer" => format!("{}", processed_batch.current_peer) + ); + network.report_peer(processed_batch.original_peer, action); } } } diff --git a/common/hashset_delay/src/hashset_delay.rs b/common/hashset_delay/src/hashset_delay.rs index 77d4f610f..82287f266 100644 --- a/common/hashset_delay/src/hashset_delay.rs +++ b/common/hashset_delay/src/hashset_delay.rs @@ -68,7 +68,7 @@ where // update the timeout self.update_timeout(&key, entry_duration); } else { - let delay_key = self.expirations.insert(key.clone(), entry_duration.clone()); + let delay_key = self.expirations.insert(key.clone(), entry_duration); let entry = MapEntry { key: delay_key, value: Instant::now() + entry_duration, @@ -114,7 +114,7 @@ where self.expirations.remove(&entry.key); return true; } - return false; + false } /// Retains only the elements specified by the predicate.