Activate peer scoring (#1284)

* Initial score structure

* Peer manager update

* Updates to dialing

* Correct tests

* Correct typos and remove unused function

* Integrate scoring into the network crate

* Clean warnings

* Formatting

* Shift core functionality into the behaviour

* Temp commit

* Shift disconnections into the behaviour

* Temp commit

* Update libp2p and gossipsub

* Remove gossipsub lru cache

* Correct merge conflicts

* Modify handler and correct tests

* Update enr network globals on socket update

* Apply clippy lints

* Add new prysm fingerprint

* More clippy fixes
Age Manning 2020-07-07 10:13:16 +10:00 committed by GitHub
parent 5977c00edb
commit 5bc8fea2e0
26 changed files with 1339 additions and 934 deletions


@ -80,6 +80,7 @@ pub enum DelegateError<TSpec: EthSpec> {
Gossipsub(<GossipHandler as ProtocolsHandler>::Error),
RPC(<RPCHandler<TSpec> as ProtocolsHandler>::Error),
Identify(<IdentifyHandler as ProtocolsHandler>::Error),
Disconnected,
}
impl<TSpec: EthSpec> std::error::Error for DelegateError<TSpec> {}
@ -93,6 +94,7 @@ impl<TSpec: EthSpec> std::fmt::Display for DelegateError<TSpec> {
DelegateError::Gossipsub(err) => err.fmt(formater),
DelegateError::RPC(err) => err.fmt(formater),
DelegateError::Identify(err) => err.fmt(formater),
DelegateError::Disconnected => write!(formater, "Disconnected"),
}
}
}
@ -135,11 +137,10 @@ impl<TSpec: EthSpec> ProtocolsHandler for DelegatingHandler<TSpec> {
let rpc_proto = self.rpc_handler.listen_protocol();
let identify_proto = self.identify_handler.listen_protocol();
let timeout = gossip_proto
let timeout = *gossip_proto
.timeout()
.max(rpc_proto.timeout())
.max(identify_proto.timeout())
.clone();
.max(identify_proto.timeout());
let select = SelectUpgrade::new(
gossip_proto.into_upgrade().1,
@ -317,7 +318,7 @@ impl<TSpec: EthSpec> ProtocolsHandler for DelegatingHandler<TSpec> {
}
Poll::Ready(ProtocolsHandlerEvent::OutboundSubstreamRequest { protocol, info }) => {
return Poll::Ready(ProtocolsHandlerEvent::OutboundSubstreamRequest {
protocol: protocol.map_upgrade(|u| EitherUpgrade::A(u)),
protocol: protocol.map_upgrade(EitherUpgrade::A),
info: EitherOutput::First(info),
});
}


@ -95,14 +95,9 @@ impl<TSpec: EthSpec> ProtocolsHandler for BehaviourHandler<TSpec> {
self.delegate.inject_dial_upgrade_error(info, err)
}
// We don't use the keep alive to disconnect. This is handled in the poll
fn connection_keep_alive(&self) -> KeepAlive {
if self.shutting_down {
let rpc_keep_alive = self.delegate.rpc().connection_keep_alive();
let identify_keep_alive = self.delegate.identify().connection_keep_alive();
rpc_keep_alive.max(identify_keep_alive)
} else {
KeepAlive::Yes
}
KeepAlive::Yes
}
fn poll(
@ -116,6 +111,15 @@ impl<TSpec: EthSpec> ProtocolsHandler for BehaviourHandler<TSpec> {
Self::Error,
>,
> {
// Disconnect if the sub-handlers are ready.
if self.shutting_down {
let rpc_keep_alive = self.delegate.rpc().connection_keep_alive();
let identify_keep_alive = self.delegate.identify().connection_keep_alive();
if KeepAlive::No == rpc_keep_alive.max(identify_keep_alive) {
return Poll::Ready(ProtocolsHandlerEvent::Close(DelegateError::Disconnected));
}
}
match self.delegate.poll(cx) {
Poll::Ready(ProtocolsHandlerEvent::Custom(event)) => {
return Poll::Ready(ProtocolsHandlerEvent::Custom(
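
The two hunks above move the shutdown decision out of connection_keep_alive() (which now always returns KeepAlive::Yes) and into poll(): once shutting_down is set, the handler closes with DelegateError::Disconnected only after both the RPC and identify sub-handlers report KeepAlive::No. Below is a minimal, self-contained sketch of that decision, using a simplified stand-in for libp2p's KeepAlive (the real type also has an Until(Instant) variant).

// Simplified stand-in for libp2p's KeepAlive. Deriving Ord gives No < Yes,
// mirroring the ordering the handler's max() relies on.
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Debug)]
enum KeepAlive {
    No,
    Yes,
}

// Mirrors the check added to poll(): only close once the shutdown flag is set
// and neither sub-handler still wants the connection kept alive.
fn should_close(shutting_down: bool, rpc: KeepAlive, identify: KeepAlive) -> bool {
    shutting_down && rpc.max(identify) == KeepAlive::No
}

fn main() {
    assert!(!should_close(false, KeepAlive::No, KeepAlive::No)); // not shutting down
    assert!(!should_close(true, KeepAlive::Yes, KeepAlive::No)); // RPC still busy
    assert!(should_close(true, KeepAlive::No, KeepAlive::No));   // safe to close
}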


@ -1,4 +1,4 @@
use crate::peer_manager::{PeerManager, PeerManagerEvent};
use crate::peer_manager::{score::PeerAction, PeerManager, PeerManagerEvent};
use crate::rpc::*;
use crate::types::{GossipEncoding, GossipKind, GossipTopic};
use crate::Eth2Enr;
@ -21,6 +21,7 @@ use libp2p::{
};
use slog::{crit, debug, o};
use std::{
collections::VecDeque,
marker::PhantomData,
sync::Arc,
task::{Context, Poll},
@ -46,10 +47,12 @@ pub struct Behaviour<TSpec: EthSpec> {
identify: Identify,
/// The peer manager that keeps track of peer's reputation and status.
peer_manager: PeerManager<TSpec>,
/// The events generated by this behaviour to be consumed in the swarm poll.
events: Vec<BehaviourEvent<TSpec>>,
/// The output events generated by this behaviour to be consumed in the swarm poll.
events: VecDeque<BehaviourEvent<TSpec>>,
/// Events generated in the global behaviour to be sent to the behaviour handler.
handler_events: VecDeque<NBAction<BehaviourHandlerIn<TSpec>, BehaviourEvent<TSpec>>>,
/// Queue of peers to disconnect.
peers_to_dc: Vec<PeerId>,
peers_to_dc: VecDeque<PeerId>,
/// The current meta data of the node, used to respond to pings and metadata requests.
meta_data: MetaData<TSpec>,
/// A collections of variables accessible outside the network service.
@ -58,173 +61,12 @@ pub struct Behaviour<TSpec: EthSpec> {
// NOTE: This can be accessed via the network_globals ENR. However we keep it here for quick
// lookups for every gossipsub message send.
enr_fork_id: EnrForkId,
/// The waker for the current thread.
waker: Option<std::task::Waker>,
/// Logger for behaviour actions.
log: slog::Logger,
}
/// Calls the given function with the given args on all sub behaviours.
macro_rules! delegate_to_behaviours {
($self: ident, $fn: ident, $($arg: ident), *) => {
$self.gossipsub.$fn($($arg),*);
$self.eth2_rpc.$fn($($arg),*);
$self.identify.$fn($($arg),*);
};
}
impl<TSpec: EthSpec> NetworkBehaviour for Behaviour<TSpec> {
type ProtocolsHandler = BehaviourHandler<TSpec>;
type OutEvent = BehaviourEvent<TSpec>;
fn new_handler(&mut self) -> Self::ProtocolsHandler {
BehaviourHandler::new(&mut self.gossipsub, &mut self.eth2_rpc, &mut self.identify)
}
fn addresses_of_peer(&mut self, peer_id: &PeerId) -> Vec<Multiaddr> {
self.peer_manager.addresses_of_peer(peer_id)
}
fn inject_connected(&mut self, peer_id: &PeerId) {
delegate_to_behaviours!(self, inject_connected, peer_id);
}
fn inject_disconnected(&mut self, peer_id: &PeerId) {
delegate_to_behaviours!(self, inject_disconnected, peer_id);
}
fn inject_connection_established(
&mut self,
peer_id: &PeerId,
conn_id: &ConnectionId,
endpoint: &ConnectedPoint,
) {
delegate_to_behaviours!(
self,
inject_connection_established,
peer_id,
conn_id,
endpoint
);
}
fn inject_connection_closed(
&mut self,
peer_id: &PeerId,
conn_id: &ConnectionId,
endpoint: &ConnectedPoint,
) {
delegate_to_behaviours!(self, inject_connection_closed, peer_id, conn_id, endpoint);
}
fn inject_addr_reach_failure(
&mut self,
peer_id: Option<&PeerId>,
addr: &Multiaddr,
error: &dyn std::error::Error,
) {
delegate_to_behaviours!(self, inject_addr_reach_failure, peer_id, addr, error);
}
fn inject_dial_failure(&mut self, peer_id: &PeerId) {
delegate_to_behaviours!(self, inject_dial_failure, peer_id);
}
fn inject_new_listen_addr(&mut self, addr: &Multiaddr) {
delegate_to_behaviours!(self, inject_new_listen_addr, addr);
}
fn inject_expired_listen_addr(&mut self, addr: &Multiaddr) {
delegate_to_behaviours!(self, inject_expired_listen_addr, addr);
}
fn inject_new_external_addr(&mut self, addr: &Multiaddr) {
delegate_to_behaviours!(self, inject_new_external_addr, addr);
}
fn inject_listener_error(&mut self, id: ListenerId, err: &(dyn std::error::Error + 'static)) {
delegate_to_behaviours!(self, inject_listener_error, id, err);
}
fn inject_listener_closed(&mut self, id: ListenerId, reason: Result<(), &std::io::Error>) {
delegate_to_behaviours!(self, inject_listener_closed, id, reason);
}
fn inject_event(
&mut self,
peer_id: PeerId,
conn_id: ConnectionId,
event: <Self::ProtocolsHandler as ProtocolsHandler>::OutEvent,
) {
match event {
// Events coming from the handler, redirected to each behaviour
BehaviourHandlerOut::Delegate(delegate) => match *delegate {
DelegateOut::Gossipsub(ev) => self.gossipsub.inject_event(peer_id, conn_id, ev),
DelegateOut::RPC(ev) => self.eth2_rpc.inject_event(peer_id, conn_id, ev),
DelegateOut::Identify(ev) => self.identify.inject_event(peer_id, conn_id, *ev),
},
/* Custom events sent BY the handler */
BehaviourHandlerOut::Custom => {
// TODO: implement
}
}
}
fn poll(
&mut self,
cx: &mut Context,
poll_params: &mut impl PollParameters,
) -> Poll<NBAction<<Self::ProtocolsHandler as ProtocolsHandler>::InEvent, Self::OutEvent>> {
// TODO: move where it's less distracting
macro_rules! poll_behaviour {
/* $behaviour: The sub-behaviour being polled.
* $on_event_fn: Function to call if we get an event from the sub-behaviour.
* $notify_handler_event_closure: Closure mapping the received event type to
* the one that the handler should get.
*/
($behaviour: ident, $on_event_fn: ident, $notify_handler_event_closure: expr) => {
loop {
// poll the sub-behaviour
match self.$behaviour.poll(cx, poll_params) {
Poll::Ready(action) => match action {
// call the designated function to handle the event from sub-behaviour
NBAction::GenerateEvent(event) => self.$on_event_fn(event),
NBAction::DialAddress { address } => {
return Poll::Ready(NBAction::DialAddress { address })
}
NBAction::DialPeer { peer_id, condition } => {
return Poll::Ready(NBAction::DialPeer { peer_id, condition })
}
NBAction::NotifyHandler {
peer_id,
handler,
event,
} => {
return Poll::Ready(NBAction::NotifyHandler {
peer_id,
handler,
// call the closure mapping the received event to the needed one
// in order to notify the handler
event: BehaviourHandlerIn::Delegate(
$notify_handler_event_closure(event),
),
});
}
NBAction::ReportObservedAddr { address } => {
return Poll::Ready(NBAction::ReportObservedAddr { address })
}
},
Poll::Pending => break,
}
}
};
}
poll_behaviour!(gossipsub, on_gossip_event, DelegateIn::Gossipsub);
poll_behaviour!(eth2_rpc, on_rpc_event, DelegateIn::RPC);
poll_behaviour!(identify, on_identify_event, DelegateIn::Identify);
self.custom_poll(cx)
}
}
/// Implements the combined behaviour for the libp2p service.
impl<TSpec: EthSpec> Behaviour<TSpec> {
pub fn new(
@ -264,15 +106,27 @@ impl<TSpec: EthSpec> Behaviour<TSpec> {
),
identify,
peer_manager: PeerManager::new(local_key, net_conf, network_globals.clone(), log)?,
events: Vec::new(),
peers_to_dc: Vec::new(),
events: VecDeque::new(),
handler_events: VecDeque::new(),
peers_to_dc: VecDeque::new(),
meta_data,
network_globals,
enr_fork_id,
waker: None,
log: behaviour_log,
})
}
/// Attempts to connect to a libp2p peer.
///
/// This MUST be used over Swarm::dial() as this keeps track of the peer in the peer manager.
///
/// All external dials currently dial a multiaddr, so this is unused, but it is kept here in case
/// any part of lighthouse needs to connect to a peer_id in the future.
pub fn _dial(&mut self, peer_id: &PeerId) {
self.peer_manager.dial_peer(peer_id);
}
/// Returns the local ENR of the node.
pub fn local_enr(&self) -> Enr {
self.network_globals.local_enr()
@ -409,13 +263,18 @@ impl<TSpec: EthSpec> Behaviour<TSpec> {
/* Peer management functions */
/// Notify discovery that the peer has been banned.
// TODO: Remove this and integrate all disconnection/banning logic inside the peer manager.
pub fn peer_banned(&mut self, _peer_id: PeerId) {}
/// Report a peer's action.
pub fn report_peer(&mut self, peer_id: &PeerId, action: PeerAction) {
self.peer_manager.report_peer(peer_id, action)
}
/// Notify discovery that the peer has been unbanned.
// TODO: Remove this and integrate all disconnection/banning logic inside the peer manager.
pub fn peer_unbanned(&mut self, _peer_id: &PeerId) {}
/// Disconnects from a peer providing a reason.
///
/// This will send a goodbye, disconnect and then ban the peer.
/// This is fatal for a peer, and should be used in unrecoverable circumstances.
pub fn goodbye_peer(&mut self, peer_id: &PeerId, reason: GoodbyeReason) {
self.peer_manager.goodbye_peer(peer_id, reason);
}
/// Returns an iterator over all enr entries in the DHT.
pub fn enr_entries(&mut self) -> Vec<Enr> {
@ -531,40 +390,6 @@ impl<TSpec: EthSpec> Behaviour<TSpec> {
&mut self.peer_manager
}
/* Address in the new behaviour. Connections are now maintained at the swarm level.
/// Notifies the behaviour that a peer has connected.
pub fn notify_peer_connect(&mut self, peer_id: PeerId, endpoint: ConnectedPoint) {
match endpoint {
ConnectedPoint::Dialer { .. } => self.peer_manager.connect_outgoing(&peer_id),
ConnectedPoint::Listener { .. } => self.peer_manager.connect_ingoing(&peer_id),
};
// Find ENR info about a peer if possible.
if let Some(enr) = self.discovery.enr_of_peer(&peer_id) {
let bitfield = match enr.bitfield::<TSpec>() {
Ok(v) => v,
Err(e) => {
warn!(self.log, "Peer has invalid ENR bitfield";
"peer_id" => format!("{}", peer_id),
"error" => format!("{:?}", e));
return;
}
};
// use this as a baseline, until we get the actual meta-data
let meta_data = MetaData {
seq_number: 0,
attnets: bitfield,
};
// TODO: Shift to the peer manager
self.network_globals
.peers
.write()
.add_metadata(&peer_id, meta_data);
}
}
*/
fn on_gossip_event(&mut self, event: GossipsubEvent) {
match event {
GossipsubEvent::Message(propagation_source, id, gs_msg) => {
@ -576,7 +401,7 @@ impl<TSpec: EthSpec> Behaviour<TSpec> {
}
Ok(msg) => {
// if this message isn't a duplicate, notify the network
self.events.push(BehaviourEvent::PubsubMessage {
self.add_event(BehaviourEvent::PubsubMessage {
id,
source: propagation_source,
topics: gs_msg.topics,
@ -586,8 +411,7 @@ impl<TSpec: EthSpec> Behaviour<TSpec> {
}
}
GossipsubEvent::Subscribed { peer_id, topic } => {
self.events
.push(BehaviourEvent::PeerSubscribed(peer_id, topic));
self.add_event(BehaviourEvent::PeerSubscribed(peer_id, topic));
}
GossipsubEvent::Unsubscribed { .. } => {}
}
@ -596,7 +420,7 @@ impl<TSpec: EthSpec> Behaviour<TSpec> {
/// Queues the response to be sent upwards as long as it was requested outside the Behaviour.
fn propagate_response(&mut self, id: RequestId, peer_id: PeerId, response: Response<TSpec>) {
if !matches!(id, RequestId::Behaviour) {
self.events.push(BehaviourEvent::ResponseReceived {
self.add_event(BehaviourEvent::ResponseReceived {
peer_id,
id,
response,
@ -606,7 +430,7 @@ impl<TSpec: EthSpec> Behaviour<TSpec> {
/// Convenience function to propagate a request.
fn propagate_request(&mut self, id: PeerRequestId, peer_id: PeerId, request: Request) {
self.events.push(BehaviourEvent::RequestReceived {
self.add_event(BehaviourEvent::RequestReceived {
peer_id,
id,
request,
@ -639,8 +463,7 @@ impl<TSpec: EthSpec> Behaviour<TSpec> {
self.peer_manager.handle_rpc_error(&peer_id, proto, &error);
// inform failures of requests coming from outside the behaviour
if !matches!(id, RequestId::Behaviour) {
self.events
.push(BehaviourEvent::RPCFailed { peer_id, id, error });
self.add_event(BehaviourEvent::RPCFailed { peer_id, id, error });
}
}
}
@ -664,11 +487,18 @@ impl<TSpec: EthSpec> Behaviour<TSpec> {
// let the peer manager know this peer is in the process of disconnecting
self.peer_manager._disconnecting_peer(&peer_id);
// queue for disconnection without a goodbye message
debug!(self.log, "Received a Goodbye, queueing for disconnection";
"peer_id" => peer_id.to_string());
self.peers_to_dc.push(peer_id.clone());
// TODO: do not propagate
self.propagate_request(peer_request_id, peer_id, Request::Goodbye(reason));
debug!(
self.log, "Peer sent Goodbye";
"peer_id" => peer_id.to_string(),
"reason" => reason.to_string(),
"client" => self.network_globals.client(&peer_id).to_string(),
);
self.peers_to_dc.push_back(peer_id);
// NOTE: We currently do not inform the application that we are
// disconnecting here.
// The actual disconnection event will be relayed to the application. Ideally
// this time difference is short, but we may need to introduce a message to
// inform the application layer early.
}
/* Protocols propagated to the Network */
RPCRequest::Status(msg) => {
@ -724,10 +554,15 @@ impl<TSpec: EthSpec> Behaviour<TSpec> {
&mut self,
cx: &mut Context,
) -> Poll<NBAction<BehaviourHandlerIn<TSpec>, BehaviourEvent<TSpec>>> {
// if there are any handler_events process them
if let Some(event) = self.handler_events.pop_front() {
return Poll::Ready(event);
}
// handle pending disconnections to perform
if !self.peers_to_dc.is_empty() {
if let Some(peer_id) = self.peers_to_dc.pop_front() {
return Poll::Ready(NBAction::NotifyHandler {
peer_id: self.peers_to_dc.remove(0),
peer_id,
handler: NotifyHandler::All,
event: BehaviourHandlerIn::Shutdown(None),
});
@ -760,18 +595,18 @@ impl<TSpec: EthSpec> Behaviour<TSpec> {
PeerManagerEvent::MetaData(peer_id) => {
self.send_meta_data_request(peer_id);
}
PeerManagerEvent::DisconnectPeer(peer_id) => {
PeerManagerEvent::DisconnectPeer(peer_id, reason) => {
debug!(self.log, "PeerManager requested to disconnect a peer";
"peer_id" => peer_id.to_string());
// queue for disabling
self.peers_to_dc.push(peer_id.clone());
self.peers_to_dc.push_back(peer_id.clone());
// send one goodbye
return Poll::Ready(NBAction::NotifyHandler {
peer_id,
handler: NotifyHandler::Any,
event: BehaviourHandlerIn::Shutdown(Some((
RequestId::Behaviour,
RPCRequest::Goodbye(GoodbyeReason::Fault),
RPCRequest::Goodbye(reason),
))),
});
}
@ -781,8 +616,8 @@ impl<TSpec: EthSpec> Behaviour<TSpec> {
}
}
if !self.events.is_empty() {
return Poll::Ready(NBAction::GenerateEvent(self.events.remove(0)));
if let Some(event) = self.events.pop_front() {
return Poll::Ready(NBAction::GenerateEvent(event));
}
Poll::Pending
@ -817,21 +652,244 @@ impl<TSpec: EthSpec> Behaviour<TSpec> {
IdentifyEvent::Error { .. } => {}
}
}
/// Adds an event to the queue waking the current thread to process it.
fn add_event(&mut self, event: BehaviourEvent<TSpec>) {
self.events.push_back(event);
if let Some(waker) = &self.waker {
waker.wake_by_ref();
}
}
}
/// Calls the given function with the given args on all sub behaviours.
macro_rules! delegate_to_behaviours {
($self: ident, $fn: ident, $($arg: ident), *) => {
$self.gossipsub.$fn($($arg),*);
$self.eth2_rpc.$fn($($arg),*);
$self.identify.$fn($($arg),*);
};
}
impl<TSpec: EthSpec> NetworkBehaviour for Behaviour<TSpec> {
type ProtocolsHandler = BehaviourHandler<TSpec>;
type OutEvent = BehaviourEvent<TSpec>;
fn new_handler(&mut self) -> Self::ProtocolsHandler {
BehaviourHandler::new(&mut self.gossipsub, &mut self.eth2_rpc, &mut self.identify)
}
fn addresses_of_peer(&mut self, peer_id: &PeerId) -> Vec<Multiaddr> {
self.peer_manager.addresses_of_peer(peer_id)
}
// This gets called every time a connection is closed.
fn inject_connection_closed(
&mut self,
peer_id: &PeerId,
conn_id: &ConnectionId,
endpoint: &ConnectedPoint,
) {
delegate_to_behaviours!(self, inject_connection_closed, peer_id, conn_id, endpoint);
}
// This gets called once there are no more active connections.
fn inject_disconnected(&mut self, peer_id: &PeerId) {
// Inform the peer manager.
self.peer_manager.notify_disconnect(&peer_id);
// Inform the application.
self.add_event(BehaviourEvent::PeerDisconnected(peer_id.clone()));
delegate_to_behaviours!(self, inject_disconnected, peer_id);
}
// This gets called every time a connection is established.
fn inject_connection_established(
&mut self,
peer_id: &PeerId,
conn_id: &ConnectionId,
endpoint: &ConnectedPoint,
) {
// If the peer is banned, send a goodbye and disconnect.
if self.peer_manager.is_banned(peer_id) {
self.peers_to_dc.push_back(peer_id.clone());
// send a goodbye on all possible handlers for this peer
self.handler_events.push_back(NBAction::NotifyHandler {
peer_id: peer_id.clone(),
handler: NotifyHandler::All,
event: BehaviourHandlerIn::Shutdown(Some((
RequestId::Behaviour,
RPCRequest::Goodbye(GoodbyeReason::Banned),
))),
});
return;
}
// notify the peer manager of a successful connection
match endpoint {
ConnectedPoint::Listener { .. } => {
self.peer_manager.connect_ingoing(&peer_id);
self.add_event(BehaviourEvent::PeerConnected(peer_id.clone()));
debug!(self.log, "Connection established"; "peer_id" => peer_id.to_string(), "connection" => "Incoming");
}
ConnectedPoint::Dialer { .. } => {
self.peer_manager.connect_outgoing(&peer_id);
self.add_event(BehaviourEvent::PeerDialed(peer_id.clone()));
debug!(self.log, "Connection established"; "peer_id" => peer_id.to_string(), "connection" => "Dialed");
}
}
// report the event to the application
delegate_to_behaviours!(
self,
inject_connection_established,
peer_id,
conn_id,
endpoint
);
}
// This gets called on the initial connection establishment.
fn inject_connected(&mut self, peer_id: &PeerId) {
// Drop any connection from a banned peer. The goodbye and disconnects are handled in
// `inject_connection_established()`, which gets called first.
if self.peer_manager.is_banned(peer_id) {
return;
}
delegate_to_behaviours!(self, inject_connected, peer_id);
}
fn inject_addr_reach_failure(
&mut self,
peer_id: Option<&PeerId>,
addr: &Multiaddr,
error: &dyn std::error::Error,
) {
delegate_to_behaviours!(self, inject_addr_reach_failure, peer_id, addr, error);
}
fn inject_dial_failure(&mut self, peer_id: &PeerId) {
// Could not dial the peer, inform the peer manager.
self.peer_manager.notify_dial_failure(&peer_id);
delegate_to_behaviours!(self, inject_dial_failure, peer_id);
}
fn inject_new_listen_addr(&mut self, addr: &Multiaddr) {
delegate_to_behaviours!(self, inject_new_listen_addr, addr);
}
fn inject_expired_listen_addr(&mut self, addr: &Multiaddr) {
delegate_to_behaviours!(self, inject_expired_listen_addr, addr);
}
fn inject_new_external_addr(&mut self, addr: &Multiaddr) {
delegate_to_behaviours!(self, inject_new_external_addr, addr);
}
fn inject_listener_error(&mut self, id: ListenerId, err: &(dyn std::error::Error + 'static)) {
delegate_to_behaviours!(self, inject_listener_error, id, err);
}
fn inject_listener_closed(&mut self, id: ListenerId, reason: Result<(), &std::io::Error>) {
delegate_to_behaviours!(self, inject_listener_closed, id, reason);
}
fn inject_event(
&mut self,
peer_id: PeerId,
conn_id: ConnectionId,
event: <Self::ProtocolsHandler as ProtocolsHandler>::OutEvent,
) {
match event {
// Events coming from the handler, redirected to each behaviour
BehaviourHandlerOut::Delegate(delegate) => match *delegate {
DelegateOut::Gossipsub(ev) => self.gossipsub.inject_event(peer_id, conn_id, ev),
DelegateOut::RPC(ev) => self.eth2_rpc.inject_event(peer_id, conn_id, ev),
DelegateOut::Identify(ev) => self.identify.inject_event(peer_id, conn_id, *ev),
},
/* Custom events sent BY the handler */
BehaviourHandlerOut::Custom => {
// TODO: implement
}
}
}
fn poll(
&mut self,
cx: &mut Context,
poll_params: &mut impl PollParameters,
) -> Poll<NBAction<<Self::ProtocolsHandler as ProtocolsHandler>::InEvent, Self::OutEvent>> {
// update the waker if needed
if let Some(waker) = &self.waker {
if waker.will_wake(cx.waker()) {
self.waker = Some(cx.waker().clone());
}
} else {
self.waker = Some(cx.waker().clone());
}
// TODO: move where it's less distracting
macro_rules! poll_behaviour {
/* $behaviour: The sub-behaviour being polled.
* $on_event_fn: Function to call if we get an event from the sub-behaviour.
* $notify_handler_event_closure: Closure mapping the received event type to
* the one that the handler should get.
*/
($behaviour: ident, $on_event_fn: ident, $notify_handler_event_closure: expr) => {
loop {
// poll the sub-behaviour
match self.$behaviour.poll(cx, poll_params) {
Poll::Ready(action) => match action {
// call the designated function to handle the event from sub-behaviour
NBAction::GenerateEvent(event) => self.$on_event_fn(event),
NBAction::DialAddress { address } => {
return Poll::Ready(NBAction::DialAddress { address })
}
NBAction::DialPeer { peer_id, condition } => {
return Poll::Ready(NBAction::DialPeer { peer_id, condition })
}
NBAction::NotifyHandler {
peer_id,
handler,
event,
} => {
return Poll::Ready(NBAction::NotifyHandler {
peer_id,
handler,
// call the closure mapping the received event to the needed one
// in order to notify the handler
event: BehaviourHandlerIn::Delegate(
$notify_handler_event_closure(event),
),
});
}
NBAction::ReportObservedAddr { address } => {
return Poll::Ready(NBAction::ReportObservedAddr { address })
}
},
Poll::Pending => break,
}
}
};
}
poll_behaviour!(gossipsub, on_gossip_event, DelegateIn::Gossipsub);
poll_behaviour!(eth2_rpc, on_rpc_event, DelegateIn::RPC);
poll_behaviour!(identify, on_identify_event, DelegateIn::Identify);
self.custom_poll(cx)
}
}
/* Public API types */
/// The type of RPC requests the Behaviour informs it has received and allows for sending.
///
// NOTE: This is an application-level wrapper over the lower network leve requests that can be
// sent. The main difference is the absense of the Ping and Metadata protocols, which don't
// NOTE: This is an application-level wrapper over the lower network level requests that can be
// sent. The main difference is the absence of the Ping, Metadata and Goodbye protocols, which don't
// leave the Behaviour. For all protocols managed by RPC see `RPCRequest`.
#[derive(Debug, Clone, PartialEq)]
pub enum Request {
/// A Status message.
Status(StatusMessage),
/// A Goobye message.
Goodbye(GoodbyeReason),
/// A blocks by range request.
BlocksByRange(BlocksByRangeRequest),
/// A request blocks root request.
@ -843,7 +901,6 @@ impl<TSpec: EthSpec> std::convert::From<Request> for RPCRequest<TSpec> {
match req {
Request::BlocksByRoot(r) => RPCRequest::BlocksByRoot(r),
Request::BlocksByRange(r) => RPCRequest::BlocksByRange(r),
Request::Goodbye(r) => RPCRequest::Goodbye(r),
Request::Status(s) => RPCRequest::Status(s),
}
}
@ -887,6 +944,12 @@ pub type PeerRequestId = (ConnectionId, SubstreamId);
/// The types of events than can be obtained from polling the behaviour.
#[derive(Debug)]
pub enum BehaviourEvent<TSpec: EthSpec> {
/// We have successfully dialed and connected to a peer.
PeerDialed(PeerId),
/// A peer has successfully dialed and connected to us.
PeerConnected(PeerId),
/// A peer has disconnected.
PeerDisconnected(PeerId),
/// An RPC Request that was sent failed.
RPCFailed {
/// The id of the failed request.
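
With connection tracking shifted into the behaviour, the application layer now learns about peers purely through these events rather than through the removed notify_peer_connect path. A toy, self-contained sketch of how a consumer loop might react to them; PeerId and the enum below are simplified stand-ins for the real types in this crate.

// Simplified stand-ins for the crate's PeerId and BehaviourEvent types.
#[derive(Debug, Clone, PartialEq)]
struct PeerId(String);

#[derive(Debug)]
enum Event {
    PeerDialed(PeerId),       // we successfully dialed the peer
    PeerConnected(PeerId),    // the peer dialed us
    PeerDisconnected(PeerId), // fired once all connections to the peer have closed
}

fn handle(event: Event, connected: &mut Vec<PeerId>) {
    match event {
        Event::PeerDialed(p) | Event::PeerConnected(p) => connected.push(p),
        Event::PeerDisconnected(p) => connected.retain(|id| id != &p),
    }
}

fn main() {
    let peer = PeerId("16Uiu2-example".into());
    let mut connected = Vec::new();
    handle(Event::PeerConnected(peer.clone()), &mut connected);
    handle(Event::PeerDisconnected(peer), &mut connected);
    assert!(connected.is_empty());
}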


@ -90,7 +90,7 @@ impl QueryType {
pub fn min_ttl(&self) -> Option<Instant> {
match self {
Self::FindPeers => None,
Self::Subnet { min_ttl, .. } => min_ttl.clone(),
Self::Subnet { min_ttl, .. } => *min_ttl,
}
}
}
@ -197,7 +197,7 @@ impl<TSpec: EthSpec> Discovery<TSpec> {
debug!(
log,
"Could not add peer to the local routing table";
"error" => format!("{}", e)
"error" => e.to_string()
)
});
}
@ -267,7 +267,7 @@ impl<TSpec: EthSpec> Discovery<TSpec> {
debug!(
self.log,
"Could not add peer to the local routing table";
"error" => format!("{}", e)
"error" => e.to_string()
)
}
}
@ -350,7 +350,7 @@ impl<TSpec: EthSpec> Discovery<TSpec> {
let _ = self
.discv5
.enr_insert(ETH2_ENR_KEY.into(), enr_fork_id.as_ssz_bytes())
.enr_insert(ETH2_ENR_KEY, enr_fork_id.as_ssz_bytes())
.map_err(|e| {
warn!(
self.log,
@ -452,11 +452,7 @@ impl<TSpec: EthSpec> Discovery<TSpec> {
// Returns a boolean indicating if we are currently processing the maximum number of
// concurrent queries or not.
fn at_capacity(&self) -> bool {
if self.active_queries.len() >= MAX_CONCURRENT_QUERIES {
true
} else {
false
}
self.active_queries.len() >= MAX_CONCURRENT_QUERIES
}
/// Runs a discovery request for a given subnet_id if one already exists.
@ -526,7 +522,7 @@ impl<TSpec: EthSpec> Discovery<TSpec> {
QueryType::Subnet { subnet_id, .. } => {
// build the subnet predicate as a combination of the eth2_fork_predicate and the
// subnet predicate
let subnet_predicate = subnet_predicate::<TSpec>(subnet_id.clone(), &self.log);
let subnet_predicate = subnet_predicate::<TSpec>(*subnet_id, &self.log);
Box::new(move |enr: &Enr| eth2_fork_predicate(enr) && subnet_predicate(enr))
}
};
@ -645,6 +641,8 @@ impl<TSpec: EthSpec> Discovery<TSpec> {
// to disk.
let enr = self.discv5.local_enr();
enr::save_enr_to_disk(Path::new(&self.enr_dir), &enr, &self.log);
// update network globals
*self.network_globals.local_enr.write() = enr;
return Poll::Ready(DiscoveryEvent::SocketUpdated(socket));
}
_ => {} // Ignore all other discv5 server events


@ -23,5 +23,7 @@ pub use libp2p::gossipsub::{MessageId, Topic, TopicHash};
pub use libp2p::{core::ConnectedPoint, PeerId, Swarm};
pub use libp2p::{multiaddr, Multiaddr};
pub use metrics::scrape_discovery_metrics;
pub use peer_manager::{client::Client, PeerDB, PeerInfo, PeerSyncStatus, SyncInfo};
pub use peer_manager::{
client::Client, score::PeerAction, PeerDB, PeerInfo, PeerSyncStatus, SyncInfo,
};
pub use service::{Libp2pEvent, Service, NETWORK_KEY_FILENAME};


@ -131,6 +131,20 @@ fn client_from_agent_version(agent_version: &str) -> (ClientKind, String, String
let unknown = String::from("unknown");
(kind, unknown.clone(), unknown)
}
Some("Prysm") => {
let kind = ClientKind::Prysm;
let mut version = String::from("unknown");
let mut os_version = version.clone();
if agent_split.next().is_some() {
if let Some(agent_version) = agent_split.next() {
version = agent_version.into();
if let Some(agent_os_version) = agent_split.next() {
os_version = agent_os_version.into();
}
}
}
(kind, version, os_version)
}
Some("nim-libp2p") => {
let kind = ClientKind::Nimbus;
let mut version = String::from("unknown");
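
The new Prysm arm in the client fingerprinting hunk above skips one token after the client name before reading the version and OS. A self-contained sketch of that token layout follows; the '/' delimiter and the example agent string are assumptions, only the segment order is taken from the hunk.

// Assumed shape: "Prysm/<skipped segment>/<version>/<os_version>".
fn parse_prysm(agent: &str) -> (String, String) {
    let mut split = agent.split('/');
    assert_eq!(split.next(), Some("Prysm"));
    let mut version = String::from("unknown");
    let mut os_version = version.clone();
    // One segment is skipped before the version, matching the double next() in the diff.
    if split.next().is_some() {
        if let Some(v) = split.next() {
            version = v.into();
            if let Some(os) = split.next() {
                os_version = os.into();
            }
        }
    }
    (version, os_version)
}

fn main() {
    // Hypothetical agent string, for illustration only.
    let (version, os) = parse_prysm("Prysm/validator/v1.0.0-beta/linux-amd64");
    assert_eq!(version, "v1.0.0-beta");
    assert_eq!(os, "linux-amd64");
}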


@ -2,7 +2,7 @@
pub use self::peerdb::*;
use crate::discovery::{Discovery, DiscoveryEvent};
use crate::rpc::{MetaData, Protocol, RPCError, RPCResponseErrorCode};
use crate::rpc::{GoodbyeReason, MetaData, Protocol, RPCError, RPCResponseErrorCode};
use crate::{error, metrics};
use crate::{Enr, EnrExt, NetworkConfig, NetworkGlobals, PeerId};
use futures::prelude::*;
@ -10,7 +10,7 @@ use futures::Stream;
use hashset_delay::HashSetDelay;
use libp2p::core::multiaddr::Protocol as MProtocol;
use libp2p::identify::IdentifyInfo;
use slog::{crit, debug, error};
use slog::{crit, debug, error, warn};
use smallvec::SmallVec;
use std::{
net::SocketAddr,
@ -27,12 +27,11 @@ pub mod client;
mod peer_info;
mod peer_sync_status;
mod peerdb;
pub(crate) mod score;
pub use peer_info::{PeerConnectionStatus::*, PeerInfo};
pub use peer_sync_status::{PeerSyncStatus, SyncInfo};
/// The minimum reputation before a peer is disconnected.
// Most likely this needs tweaking.
const _MIN_REP_BEFORE_BAN: Rep = 10;
use score::{PeerAction, ScoreState};
/// The time in seconds between re-status's peers.
const STATUS_INTERVAL: u64 = 300;
/// The time in seconds between PING events. We do not send a ping if the other peer as PING'd us within
@ -63,44 +62,6 @@ pub struct PeerManager<TSpec: EthSpec> {
log: slog::Logger,
}
/// A collection of actions a peer can perform which will adjust its reputation.
/// Each variant has an associated reputation change.
// To easily assess the behaviour of reputation changes the number of variants should stay low, and
// somewhat generic.
pub enum PeerAction {
/// We should not communicate more with this peer.
/// This action will cause the peer to get banned.
Fatal,
/// An error occurred with this peer but it is not necessarily malicious.
/// We have high tolerance for this actions: several occurrences are needed for a peer to get
/// kicked.
/// NOTE: ~15 occurrences will get the peer banned
HighToleranceError,
/// An error occurred with this peer but it is not necessarily malicious.
/// We have high tolerance for this actions: several occurrences are needed for a peer to get
/// kicked.
/// NOTE: ~10 occurrences will get the peer banned
MidToleranceError,
/// This peer's action is not malicious but will not be tolerated. A few occurrences will cause
/// the peer to get kicked.
/// NOTE: ~5 occurrences will get the peer banned
LowToleranceError,
/// Received an expected message.
_ValidMessage,
}
impl PeerAction {
fn rep_change(&self) -> RepChange {
match self {
PeerAction::Fatal => RepChange::worst(),
PeerAction::LowToleranceError => RepChange::bad(60),
PeerAction::MidToleranceError => RepChange::bad(25),
PeerAction::HighToleranceError => RepChange::bad(15),
PeerAction::_ValidMessage => RepChange::good(20),
}
}
}
/// The events that the `PeerManager` outputs (requests).
pub enum PeerManagerEvent {
/// Dial a PeerId.
@ -114,7 +75,7 @@ pub enum PeerManagerEvent {
/// Request METADATA from a peer.
MetaData(PeerId),
/// The peer should be disconnected.
DisconnectPeer(PeerId),
DisconnectPeer(PeerId, GoodbyeReason),
}
impl<TSpec: EthSpec> PeerManager<TSpec> {
@ -147,6 +108,89 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
/* Public accessible functions */
/// Attempts to connect to a peer.
///
/// Returns true if the peer was accepted into the database.
pub fn dial_peer(&mut self, peer_id: &PeerId) -> bool {
self.events.push(PeerManagerEvent::Dial(peer_id.clone()));
self.connect_peer(peer_id, ConnectingType::Dialing)
}
/// The application layer wants to disconnect from a peer for a particular reason.
///
/// All instant disconnections are fatal and we ban the associated peer.
///
/// This will send a goodbye and disconnect the peer if it is connected or dialing.
pub fn goodbye_peer(&mut self, peer_id: &PeerId, reason: GoodbyeReason) {
// get the peer info
if let Some(info) = self.network_globals.peers.write().peer_info_mut(peer_id) {
debug!(self.log, "Sending goodbye to peer"; "peer_id" => peer_id.to_string(), "reason" => reason.to_string(), "score" => info.score.to_string());
// Goodbye's are fatal
info.score.apply_peer_action(PeerAction::Fatal);
if info.connection_status.is_connected_or_dialing() {
self.events
.push(PeerManagerEvent::DisconnectPeer(peer_id.clone(), reason));
}
}
}
/// Reports a peer for some action.
///
/// If the peer doesn't exist, log a warning and insert defaults.
pub fn report_peer(&mut self, peer_id: &PeerId, action: PeerAction) {
// TODO: Remove duplicate code - This is duplicated in the update_peer_scores()
// function.
// Variables to update the PeerDb if required.
let mut ban_peer = None;
let mut unban_peer = None;
if let Some(info) = self.network_globals.peers.write().peer_info_mut(peer_id) {
let previous_state = info.score.state();
info.score.apply_peer_action(action);
if previous_state != info.score.state() {
match info.score.state() {
ScoreState::Ban => {
debug!(self.log, "Peer has been banned"; "peer_id" => peer_id.to_string(), "score" => info.score.to_string());
ban_peer = Some(peer_id.clone());
if info.connection_status.is_connected_or_dialing() {
self.events.push(PeerManagerEvent::DisconnectPeer(
peer_id.clone(),
GoodbyeReason::BadScore,
));
}
}
ScoreState::Disconnect => {
debug!(self.log, "Peer transitioned to disconnect state"; "peer_id" => peer_id.to_string(), "score" => info.score.to_string(), "past_state" => previous_state.to_string());
// disconnect the peer if it's currently connected or dialing
unban_peer = Some(peer_id.clone());
if info.connection_status.is_connected_or_dialing() {
self.events.push(PeerManagerEvent::DisconnectPeer(
peer_id.clone(),
GoodbyeReason::BadScore,
));
}
// TODO: Update the peer manager to inform that the peer is disconnecting.
}
ScoreState::Healthy => {
debug!(self.log, "Peer transitioned to healthy state"; "peer_id" => peer_id.to_string(), "score" => info.score.to_string(), "past_state" => previous_state.to_string());
// unban the peer if it was previously banned.
unban_peer = Some(peer_id.clone());
}
}
}
}
// Update the PeerDB state.
if let Some(peer_id) = ban_peer.take() {
self.network_globals.peers.write().ban(&peer_id);
} else {
if let Some(peer_id) = unban_peer.take() {
self.network_globals.peers.write().unban(&peer_id);
}
}
}
/* Discovery Requests */
/// Provides a reference to the underlying discovery service.
@ -178,9 +222,12 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
self.status_peers.insert(peer_id.clone());
}
/* Notifications from the Swarm */
/// Updates the state of the peer as disconnected.
///
/// This is also called when dialing a peer fails.
pub fn notify_disconnect(&mut self, peer_id: &PeerId) {
//self.update_reputations();
self.network_globals.peers.write().disconnect(peer_id);
// remove the ping and status timer for the peer
@ -193,6 +240,17 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
);
}
/// A dial attempt has failed.
///
/// NOTE: It can be the case that we are dialing a peer and during the dialing process the peer
/// connects and the dial attempt later fails. To handle this, we only update the peer_db if
/// the peer is not already connected.
pub fn notify_dial_failure(&mut self, peer_id: &PeerId) {
if !self.network_globals.peers.read().is_connected(peer_id) {
self.notify_disconnect(peer_id);
}
}
/// Sets a peer as connected as long as their reputation allows it
/// Informs if the peer was accepted
pub fn connect_ingoing(&mut self, peer_id: &PeerId) -> bool {
@ -205,28 +263,17 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
self.connect_peer(peer_id, ConnectingType::OutgoingConnected)
}
/// Updates the database informing that a peer is being dialed.
pub fn dialing_peer(&mut self, peer_id: &PeerId) -> bool {
self.connect_peer(peer_id, ConnectingType::Dialing)
}
/// Updates the database informing that a peer is being disconnected.
pub fn _disconnecting_peer(&mut self, _peer_id: &PeerId) -> bool {
// TODO: implement
true
}
/// Reports a peer for some action.
/// Reports if a peer is banned or not.
///
/// If the peer doesn't exist, log a warning and insert defaults.
pub fn report_peer(&mut self, peer_id: &PeerId, action: PeerAction) {
//TODO: Check these. There are double disconnects for example
// self.update_reputations();
self.network_globals
.peers
.write()
.add_reputation(peer_id, action.rep_change());
// self.update_reputations();
/// This is used to determine if we should accept incoming connections.
pub fn is_banned(&self, peer_id: &PeerId) -> bool {
self.network_globals.peers.read().is_banned(peer_id)
}
/// Updates `PeerInfo` with `identify` information.
@ -239,9 +286,12 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
}
}
/// An error has occurred in the RPC.
///
/// This adjusts a peer's score based on the error.
pub fn handle_rpc_error(&mut self, peer_id: &PeerId, protocol: Protocol, err: &RPCError) {
let client = self.network_globals.client(peer_id);
debug!(self.log, "RPCError"; "protocol" => protocol.to_string(), "err" => err.to_string(), "client" => client.to_string());
warn!(self.log, "RPC Error"; "protocol" => protocol.to_string(), "err" => err.to_string(), "client" => client.to_string());
// Map this error to a `PeerAction` (if any)
let peer_action = match err {
@ -423,6 +473,8 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
/// multiaddr here, however this could relate to duplicate PeerId's etc. If the lookup
/// proves resource constraining, we should switch to multiaddr dialling here.
fn peers_discovered(&mut self, peers: &[Enr], min_ttl: Option<Instant>) {
let mut to_dial_peers = Vec::new();
for enr in peers {
let peer_id = enr.peer_id();
@ -433,9 +485,8 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
.peers
.read()
.is_connected_or_dialing(&peer_id)
&& !self.network_globals.peers.read().peer_banned(&peer_id)
&& !self.network_globals.peers.read().is_banned(&peer_id)
{
debug!(self.log, "Dialing discovered peer"; "peer_id"=> peer_id.to_string());
// TODO: Update output
// This should be updated with the peer dialing. In fact created once the peer is
// dialed
@ -445,9 +496,13 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
.write()
.update_min_ttl(&peer_id, min_ttl);
}
self.events.push(PeerManagerEvent::Dial(peer_id));
to_dial_peers.push(peer_id);
}
}
for peer_id in to_dial_peers {
debug!(self.log, "Dialing discovered peer"; "peer_id"=> peer_id.to_string());
self.dial_peer(&peer_id);
}
}
/// Registers a peer as connected. The `ingoing` parameter determines if the peer is being
@ -465,9 +520,7 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
let mut peerdb = self.network_globals.peers.write();
if peerdb.connection_status(peer_id).map(|c| c.is_banned()) == Some(true) {
// don't connect if the peer is banned
// TODO: Handle this case. If peer is banned this shouldn't be reached. It will put
// our connection/disconnection out of sync with libp2p
// return false;
slog::crit!(self.log, "Connection has been allowed to a banned peer"; "peer_id" => peer_id.to_string());
}
match connection {
@ -491,36 +544,22 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
true
}
/// Notifies the peer manager that this peer is being dialed.
pub fn _dialing_peer(&mut self, peer_id: &PeerId) {
self.network_globals.peers.write().dialing_peer(peer_id);
}
/// Updates the reputation of known peers according to their connection
/// Updates the scores of known peers according to their connection
/// status and the time that has passed.
///
/// **Disconnected peers** get a 1rep hit every hour they stay disconnected.
/// **Banned peers** get a 1rep gain for every hour to slowly allow them back again.
///
/// A banned(disconnected) peer that gets its rep above(below) MIN_REP_BEFORE_BAN is
/// now considered a disconnected(banned) peer.
// TODO: Implement when reputation is added.
fn _update_reputations(&mut self) {
/*
// avoid locking the peerdb too often
// TODO: call this on a timer
let now = Instant::now();
// Check for peers that get banned, unbanned and that should be disconnected
let mut ban_queue = Vec::new();
let mut unban_queue = Vec::new();
/// NOTE: This is experimental and will likely be adjusted
fn update_peer_scores(&mut self) {
/* Check how long have peers been in this state and update their reputations if needed */
let mut pdb = self.network_globals.peers.write();
for (id, info) in pdb._peers_mut() {
// Update reputations
let mut to_ban_peers = Vec::new();
let mut to_unban_peers = Vec::new();
for (peer_id, info) in pdb.peers_mut() {
let previous_state = info.score.state();
// Update scores
info.score.update();
/* TODO: Implement logic about connection lifetimes
match info.connection_status {
Connected { .. } => {
// Connected peers gain reputation by sending useful messages
@ -570,21 +609,49 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
// This peer gets unbanned
unban_queue.push(id.clone());
}
*/
// handle score transitions
if previous_state != info.score.state() {
match info.score.state() {
ScoreState::Ban => {
debug!(self.log, "Peer has been banned"; "peer_id" => peer_id.to_string(), "score" => info.score.to_string());
to_ban_peers.push(peer_id.clone());
if info.connection_status.is_connected_or_dialing() {
self.events.push(PeerManagerEvent::DisconnectPeer(
peer_id.clone(),
GoodbyeReason::BadScore,
));
}
}
ScoreState::Disconnect => {
debug!(self.log, "Peer transitioned to disconnect state"; "peer_id" => peer_id.to_string(), "score" => info.score.to_string(), "past_state" => previous_state.to_string());
// disconnect the peer if it's currently connected or dialing
to_unban_peers.push(peer_id.clone());
if info.connection_status.is_connected_or_dialing() {
self.events.push(PeerManagerEvent::DisconnectPeer(
peer_id.clone(),
GoodbyeReason::BadScore,
));
}
// TODO: Update peer manager to report that it's disconnecting.
}
ScoreState::Healthy => {
debug!(self.log, "Peer transitioned to healthy state"; "peer_id" => peer_id.to_string(), "score" => info.score.to_string(), "past_state" => previous_state.to_string());
// unban the peer if it was previously banned.
to_unban_peers.push(peer_id.clone());
}
}
}
}
for id in ban_queue {
pdb.ban(&id);
self.events
.push(PeerManagerEvent::DisconnectPeer(id.clone()));
// process banning peers
for peer_id in to_ban_peers {
pdb.ban(&peer_id);
}
for id in unban_queue {
pdb.disconnect(&id);
// process unbanning peers
for peer_id in to_unban_peers {
pdb.unban(&peer_id);
}
self._last_updated = Instant::now();
*/
}
/// The Peer manager's heartbeat maintains the peer count and maintains peer reputations.
@ -605,7 +672,8 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
// TODO: If we have too many peers, remove peers that are not required for subnet
// validation.
// TODO: Perform peer reputation maintenance here
// Updates peer's scores.
self.update_peer_scores();
}
}
@ -636,7 +704,7 @@ impl<TSpec: EthSpec> Stream for PeerManager<TSpec> {
self.events.push(PeerManagerEvent::Ping(peer_id));
}
Poll::Ready(Some(Err(e))) => {
error!(self.log, "Failed to check for peers to ping"; "error" => format!("{}",e))
error!(self.log, "Failed to check for peers to ping"; "error" => e.to_string())
}
Poll::Ready(None) | Poll::Pending => break,
}
@ -649,7 +717,7 @@ impl<TSpec: EthSpec> Stream for PeerManager<TSpec> {
self.events.push(PeerManagerEvent::Status(peer_id))
}
Poll::Ready(Some(Err(e))) => {
error!(self.log, "Failed to check for peers to ping"; "error" => format!("{}",e))
error!(self.log, "Failed to check for peers to ping"; "error" => e.to_string())
}
Poll::Ready(None) | Poll::Pending => break,
}
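
The score module itself does not appear in this view; the peer manager only consumes it through PeerAction, Score and ScoreState. As a rough sketch of the state machine it relies on, a score maps onto three states and the manager acts only on state changes, as in report_peer() and update_peer_scores() above. The thresholds below are illustrative assumptions, not the real constants.

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ScoreState {
    Healthy,
    Disconnect,
    Ban,
}

// Assumed thresholds, purely for illustration.
const DISCONNECT_THRESHOLD: f64 = -20.0;
const BAN_THRESHOLD: f64 = -50.0;

fn state(score: f64) -> ScoreState {
    if score <= BAN_THRESHOLD {
        ScoreState::Ban
    } else if score <= DISCONNECT_THRESHOLD {
        ScoreState::Disconnect
    } else {
        ScoreState::Healthy
    }
}

fn main() {
    // Only a transition triggers action: here a reported action drops the score
    // past the ban threshold, so the peer would be sent GoodbyeReason::BadScore,
    // disconnected and then banned in the PeerDB.
    let (previous, current) = (state(-10.0), state(-60.0));
    if previous != current && current == ScoreState::Ban {
        println!("ban peer and queue DisconnectPeer(peer_id, BadScore)");
    }
}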


@ -1,5 +1,5 @@
use super::client::Client;
use super::peerdb::{Rep, DEFAULT_REPUTATION};
use super::score::Score;
use super::PeerSyncStatus;
use crate::rpc::MetaData;
use crate::Multiaddr;
@ -18,7 +18,7 @@ pub struct PeerInfo<T: EthSpec> {
/// The connection status of the peer
_status: PeerStatus,
/// The peers reputation
pub reputation: Rep,
pub score: Score,
/// Client managing this peer
pub client: Client,
/// Connection status of this peer
@ -41,7 +41,7 @@ impl<TSpec: EthSpec> Default for PeerInfo<TSpec> {
fn default() -> PeerInfo<TSpec> {
PeerInfo {
_status: Default::default(),
reputation: DEFAULT_REPUTATION,
score: Score::default(),
client: Client::default(),
connection_status: Default::default(),
listening_addresses: vec![],
@ -146,7 +146,7 @@ impl Default for PeerConnectionStatus {
}
impl PeerConnectionStatus {
/// Checks if the status is connected
/// Checks if the status is connected.
pub fn is_connected(&self) -> bool {
match self {
PeerConnectionStatus::Connected { .. } => true,
@ -154,7 +154,7 @@ impl PeerConnectionStatus {
}
}
/// Checks if the status is connected
/// Checks if the status is connected.
pub fn is_dialing(&self) -> bool {
match self {
PeerConnectionStatus::Dialing { .. } => true,
@ -162,7 +162,12 @@ impl PeerConnectionStatus {
}
}
/// Checks if the status is banned
/// The peer is either connected or in the process of being dialed.
pub fn is_connected_or_dialing(&self) -> bool {
self.is_connected() || self.is_dialing()
}
/// Checks if the status is banned.
pub fn is_banned(&self) -> bool {
match self {
PeerConnectionStatus::Banned { .. } => true,
@ -170,7 +175,7 @@ impl PeerConnectionStatus {
}
}
/// Checks if the status is disconnected
/// Checks if the status is disconnected.
pub fn is_disconnected(&self) -> bool {
match self {
Disconnected { .. } => true,
@ -214,6 +219,13 @@ impl PeerConnectionStatus {
};
}
/// The score system has unbanned the peer. Update the connection status
pub fn unban(&mut self) {
if let PeerConnectionStatus::Banned { since } = self {
*self = PeerConnectionStatus::Disconnected { since: *since }
}
}
pub fn connections(&self) -> (u8, u8) {
match self {
Connected { n_in, n_out } => (*n_in, *n_out),
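
These PeerInfo hunks, together with the PeerDB changes that follow, extend the connection-status lifecycle: banned peers are now counted separately and can be moved back to Disconnected via unban(). A toy model of just those transitions (the real PeerConnectionStatus also carries timestamps and connection counts):

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Status {
    Dialing,
    Connected,
    Disconnected,
    Banned,
}

// The score system has unbanned the peer: Banned becomes Disconnected,
// so the peer re-enters the normal disconnected-peer accounting.
fn unban(status: Status) -> Status {
    match status {
        Status::Banned => Status::Disconnected,
        other => other,
    }
}

// Used by the peer manager to decide whether a DisconnectPeer event is needed.
fn is_connected_or_dialing(status: Status) -> bool {
    matches!(status, Status::Connected | Status::Dialing)
}

fn main() {
    assert_eq!(unban(Status::Banned), Status::Disconnected);
    assert!(is_connected_or_dialing(Status::Dialing));
    assert!(!is_connected_or_dialing(unban(Status::Banned)));
}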


@ -1,74 +1,47 @@
use super::peer_info::{PeerConnectionStatus, PeerInfo};
use super::peer_sync_status::PeerSyncStatus;
use super::score::Score;
use crate::rpc::methods::MetaData;
use crate::PeerId;
use slog::{crit, debug, trace, warn};
use std::collections::{hash_map::Entry, HashMap};
use std::collections::HashMap;
use std::time::Instant;
use types::{EthSpec, SubnetId};
/// A peer's reputation (perceived potential usefulness)
pub type Rep = u8;
/// Reputation change (positive or negative)
pub struct RepChange {
is_good: bool,
diff: Rep,
}
/// Max number of disconnected nodes to remember
const MAX_DC_PEERS: usize = 30;
/// The default starting reputation for an unknown peer.
pub const DEFAULT_REPUTATION: Rep = 50;
/// Max number of disconnected nodes to remember.
const MAX_DC_PEERS: usize = 100;
/// The maximum number of banned nodes to remember.
const MAX_BANNED_PEERS: usize = 300;
/// Storage of known peers, their reputation and information
pub struct PeerDB<TSpec: EthSpec> {
/// The collection of known connected peers, their status and reputation
peers: HashMap<PeerId, PeerInfo<TSpec>>,
/// Tracking of number of disconnected nodes
n_dc: usize,
/// The number of disconnected nodes in the database.
disconnected_peers: usize,
/// The number of banned peers in the database.
banned_peers: usize,
/// PeerDB's logger
log: slog::Logger,
}
impl RepChange {
pub fn good(diff: Rep) -> Self {
RepChange {
is_good: true,
diff,
}
}
pub fn bad(diff: Rep) -> Self {
RepChange {
is_good: false,
diff,
}
}
pub const fn worst() -> Self {
RepChange {
is_good: false,
diff: Rep::max_value(),
}
}
}
impl<TSpec: EthSpec> PeerDB<TSpec> {
pub fn new(log: &slog::Logger) -> Self {
Self {
log: log.clone(),
n_dc: 0,
disconnected_peers: 0,
banned_peers: 0,
peers: HashMap::new(),
}
}
/* Getters */
/// Gives the reputation of a peer, or DEFAULT_REPUTATION if it is unknown.
pub fn reputation(&self, peer_id: &PeerId) -> Rep {
/// Gives the score of a peer, or default score if it is unknown.
pub fn score(&self, peer_id: &PeerId) -> Score {
self.peers
.get(peer_id)
.map_or(DEFAULT_REPUTATION, |info| info.reputation)
.map_or(Score::default(), |info| info.score)
}
/// Returns an iterator over all peers in the db.
@ -77,7 +50,7 @@ impl<TSpec: EthSpec> PeerDB<TSpec> {
}
/// Returns an iterator over all peers in the db.
pub(super) fn _peers_mut(&mut self) -> impl Iterator<Item = (&PeerId, &mut PeerInfo<TSpec>)> {
pub(super) fn peers_mut(&mut self) -> impl Iterator<Item = (&PeerId, &mut PeerInfo<TSpec>)> {
self.peers.iter_mut()
}
@ -97,8 +70,25 @@ impl<TSpec: EthSpec> PeerDB<TSpec> {
self.peers.get_mut(peer_id)
}
/// Returns if the peer is already connected.
pub fn is_connected(&self, peer_id: &PeerId) -> bool {
if let Some(PeerConnectionStatus::Connected { .. }) = self.connection_status(peer_id) {
true
} else {
false
}
}
/// If we are connected or currently dialing the peer returns true.
pub fn is_connected_or_dialing(&self, peer_id: &PeerId) -> bool {
match self.connection_status(peer_id) {
Some(PeerConnectionStatus::Connected { .. })
| Some(PeerConnectionStatus::Dialing { .. }) => true,
_ => false,
}
}
/// Returns true if the peer is synced at least to our current head.
pub fn peer_synced(&self, peer_id: &PeerId) -> bool {
pub fn is_synced(&self, peer_id: &PeerId) -> bool {
match self.peers.get(peer_id).map(|info| &info.sync_status) {
Some(PeerSyncStatus::Synced { .. }) => true,
Some(_) => false,
@ -107,7 +97,7 @@ impl<TSpec: EthSpec> PeerDB<TSpec> {
}
/// Returns true if the Peer is banned.
pub fn peer_banned(&self, peer_id: &PeerId) -> bool {
pub fn is_banned(&self, peer_id: &PeerId) -> bool {
match self.peers.get(peer_id).map(|info| &info.connection_status) {
Some(status) => status.is_banned(),
None => false,
@ -179,7 +169,7 @@ impl<TSpec: EthSpec> PeerDB<TSpec> {
}
/// Returns a vector containing peers (their ids and info), sorted by
/// reputation from highest to lowest, and filtered using `is_status`
/// score from highest to lowest, and filtered using `is_status`
pub fn best_peers_by_status<F>(&self, is_status: F) -> Vec<(&PeerId, &PeerInfo<TSpec>)>
where
F: Fn(&PeerConnectionStatus) -> bool,
@ -189,8 +179,8 @@ impl<TSpec: EthSpec> PeerDB<TSpec> {
.iter()
.filter(|(_, info)| is_status(&info.connection_status))
.collect::<Vec<_>>();
by_status.sort_by_key(|(_, info)| Rep::max_value() - info.reputation);
by_status
by_status.sort_by_key(|(_, info)| info.score);
by_status.into_iter().rev().collect()
}
/// Returns the peer with highest reputation that satisfies `is_status`
@ -201,7 +191,7 @@ impl<TSpec: EthSpec> PeerDB<TSpec> {
self.peers
.iter()
.filter(|(_, info)| is_status(&info.connection_status))
.max_by_key(|(_, info)| info.reputation)
.max_by_key(|(_, info)| info.score)
.map(|(id, _)| id)
}
@ -211,24 +201,6 @@ impl<TSpec: EthSpec> PeerDB<TSpec> {
.map(|info| info.connection_status.clone())
}
/// Returns if the peer is already connected.
pub fn is_connected(&self, peer_id: &PeerId) -> bool {
if let Some(PeerConnectionStatus::Connected { .. }) = self.connection_status(peer_id) {
true
} else {
false
}
}
/// If we are connected or currently dialing the peer returns true.
pub fn is_connected_or_dialing(&self, peer_id: &PeerId) -> bool {
match self.connection_status(peer_id) {
Some(PeerConnectionStatus::Connected { .. })
| Some(PeerConnectionStatus::Dialing { .. }) => true,
_ => false,
}
}
/* Setters */
/// A peer is being dialed.
@ -236,7 +208,10 @@ impl<TSpec: EthSpec> PeerDB<TSpec> {
let info = self.peers.entry(peer_id.clone()).or_default();
if info.connection_status.is_disconnected() {
self.n_dc = self.n_dc.saturating_sub(1);
self.disconnected_peers = self.disconnected_peers.saturating_sub(1);
}
if info.connection_status.is_banned() {
self.banned_peers = self.banned_peers.saturating_sub(1);
}
info.connection_status = PeerConnectionStatus::Dialing {
since: Instant::now(),
@ -284,7 +259,10 @@ impl<TSpec: EthSpec> PeerDB<TSpec> {
let info = self.peers.entry(peer_id.clone()).or_default();
if info.connection_status.is_disconnected() {
self.n_dc = self.n_dc.saturating_sub(1);
self.disconnected_peers = self.disconnected_peers.saturating_sub(1);
}
if info.connection_status.is_banned() {
self.banned_peers = self.banned_peers.saturating_sub(1);
}
info.connection_status.connect_ingoing();
}
@ -294,7 +272,10 @@ impl<TSpec: EthSpec> PeerDB<TSpec> {
let info = self.peers.entry(peer_id.clone()).or_default();
if info.connection_status.is_disconnected() {
self.n_dc = self.n_dc.saturating_sub(1);
self.disconnected_peers = self.disconnected_peers.saturating_sub(1);
}
if info.connection_status.is_banned() {
self.banned_peers = self.banned_peers.saturating_sub(1);
}
info.connection_status.connect_outgoing();
}
@ -309,40 +290,93 @@ impl<TSpec: EthSpec> PeerDB<TSpec> {
});
if !info.connection_status.is_disconnected() && !info.connection_status.is_banned() {
info.connection_status.disconnect();
self.n_dc += 1;
self.disconnected_peers += 1;
}
self.shrink_to_fit();
}
/// Drops the peers with the lowest reputation so that the number of
/// disconnected peers is less than MAX_DC_PEERS
pub fn shrink_to_fit(&mut self) {
// for caution, but the difference should never be > 1
while self.n_dc > MAX_DC_PEERS {
let to_drop = self
.peers
.iter()
.filter(|(_, info)| info.connection_status.is_disconnected())
.min_by_key(|(_, info)| info.reputation)
.map(|(id, _)| id.clone())
.unwrap(); // should be safe since n_dc > MAX_DC_PEERS > 0
self.peers.remove(&to_drop);
self.n_dc = self.n_dc.saturating_sub(1);
}
}
/// Sets a peer as banned
/// Marks a peer as banned.
pub fn ban(&mut self, peer_id: &PeerId) {
let log_ref = &self.log;
let info = self.peers.entry(peer_id.clone()).or_insert_with(|| {
warn!(log_ref, "Banning unknown peer";
"peer_id" => peer_id.to_string());
"peer_id" => peer_id.to_string());
PeerInfo::default()
});
if info.connection_status.is_disconnected() {
self.n_dc = self.n_dc.saturating_sub(1);
self.disconnected_peers = self.disconnected_peers.saturating_sub(1);
}
if !info.connection_status.is_banned() {
info.connection_status.ban();
self.banned_peers += 1;
}
self.shrink_to_fit();
}
/// Unbans a peer.
pub fn unban(&mut self, peer_id: &PeerId) {
let log_ref = &self.log;
let info = self.peers.entry(peer_id.clone()).or_insert_with(|| {
warn!(log_ref, "UnBanning unknown peer";
"peer_id" => peer_id.to_string());
PeerInfo::default()
});
if info.connection_status.is_banned() {
info.connection_status.unban();
self.banned_peers = self.banned_peers.saturating_sub(1);
}
self.shrink_to_fit();
}
/// Removes banned and disconnected peers from the DB if we have reached any of our limits.
/// Drops the peers with the lowest reputation so that the number of
/// disconnected peers is less than MAX_DC_PEERS
pub fn shrink_to_fit(&mut self) {
// Remove excess banned peers
while self.banned_peers > MAX_BANNED_PEERS {
if let Some(to_drop) = self
.peers
.iter()
.filter(|(_, info)| info.connection_status.is_banned())
.min_by(|(_, info_a), (_, info_b)| {
info_a
.score
.partial_cmp(&info_b.score)
.unwrap_or(std::cmp::Ordering::Equal)
})
.map(|(id, _)| id.clone())
{
debug!(self.log, "Removing old banned peer"; "peer_id" => to_drop.to_string());
self.peers.remove(&to_drop);
}
// If there is no minimum, this is a coding error. For safety we decrease
// the count to avoid a potential infinite loop.
self.banned_peers = self.banned_peers.saturating_sub(1);
}
// Remove excess disconnected peers
while self.disconnected_peers > MAX_DC_PEERS {
if let Some(to_drop) = self
.peers
.iter()
.filter(|(_, info)| info.connection_status.is_disconnected())
.min_by(|(_, info_a), (_, info_b)| {
info_a
.score
.partial_cmp(&info_b.score)
.unwrap_or(std::cmp::Ordering::Equal)
})
.map(|(id, _)| id.clone())
{
debug!(self.log, "Removing old disconnected peer"; "peer_id" => to_drop.to_string());
self.peers.remove(&to_drop);
}
// If there is no minimum, this is a coding error. For safety we decrease
// the count to avoid a potential infinite loop.
self.disconnected_peers = self.disconnected_peers.saturating_sub(1);
}
info.connection_status.ban();
}
/// Add the meta data of a peer.
@ -354,16 +388,6 @@ impl<TSpec: EthSpec> PeerDB<TSpec> {
}
}
/// Sets the reputation of peer.
#[allow(dead_code)]
pub(super) fn set_reputation(&mut self, peer_id: &PeerId, rep: Rep) {
if let Some(peer_info) = self.peers.get_mut(peer_id) {
peer_info.reputation = rep;
} else {
crit!(self.log, "Tried to modify reputation for an unknown peer"; "peer_id" => peer_id.to_string());
}
}
/// Sets the syncing status of a peer.
pub fn set_sync_status(&mut self, peer_id: &PeerId, sync_status: PeerSyncStatus) {
if let Some(peer_info) = self.peers.get_mut(peer_id) {
@ -372,26 +396,6 @@ impl<TSpec: EthSpec> PeerDB<TSpec> {
crit!(self.log, "Tried to the sync status for an unknown peer"; "peer_id" => peer_id.to_string());
}
}
/// Adds to a peer's reputation by `change`. If the reputation exceeds Rep's
/// upper (lower) bounds, it stays at the maximum (minimum) value.
pub(super) fn add_reputation(&mut self, peer_id: &PeerId, change: RepChange) {
let log_ref = &self.log;
let info = match self.peers.entry(peer_id.clone()) {
Entry::Vacant(_) => {
warn!(log_ref, "Peer is unknown, no reputation change made";
"peer_id" => peer_id.to_string());
return;
}
Entry::Occupied(e) => e.into_mut(),
};
info.reputation = if change.is_good {
info.reputation.saturating_add(change.diff)
} else {
info.reputation.saturating_sub(change.diff)
};
}
}
#[cfg(test)]
@ -413,6 +417,12 @@ mod tests {
}
}
fn add_score<TSpec: EthSpec>(db: &mut PeerDB<TSpec>, peer_id: &PeerId, score: f64) {
if let Some(info) = db.peer_info_mut(peer_id) {
info.score.add(score);
}
}
fn get_db() -> PeerDB<M> {
let log = build_log(slog::Level::Debug, false);
PeerDB::new(&log)
@ -437,9 +447,9 @@ mod tests {
// this is the only peer
assert_eq!(pdb.peers().count(), 1);
// the peer has the default reputation
assert_eq!(pdb.reputation(&random_peer), DEFAULT_REPUTATION);
assert_eq!(pdb.score(&random_peer).score(), Score::default().score());
// it should be connected, and therefore not counted as disconnected
assert_eq!(pdb.n_dc, 0);
assert_eq!(pdb.disconnected_peers, 0);
assert!(peer_info.unwrap().connection_status.is_connected());
assert_eq!(
peer_info.unwrap().connection_status.connections(),
@ -447,50 +457,6 @@ mod tests {
);
}
#[test]
fn test_set_reputation() {
let mut pdb = get_db();
let random_peer = PeerId::random();
pdb.connect_ingoing(&random_peer);
let mut rep = Rep::min_value();
pdb.set_reputation(&random_peer, rep);
assert_eq!(pdb.reputation(&random_peer), rep);
rep = Rep::max_value();
pdb.set_reputation(&random_peer, rep);
assert_eq!(pdb.reputation(&random_peer), rep);
rep = Rep::max_value() / 100;
pdb.set_reputation(&random_peer, rep);
assert_eq!(pdb.reputation(&random_peer), rep);
}
#[test]
fn test_reputation_change() {
let mut pdb = get_db();
// 0 change does not change the reputation
let random_peer = PeerId::random();
let change = RepChange::good(0);
pdb.connect_ingoing(&random_peer);
pdb.add_reputation(&random_peer, change);
assert_eq!(pdb.reputation(&random_peer), DEFAULT_REPUTATION);
// overflowing change is capped
let random_peer = PeerId::random();
let change = RepChange::worst();
pdb.connect_ingoing(&random_peer);
pdb.add_reputation(&random_peer, change);
assert_eq!(pdb.reputation(&random_peer), Rep::min_value());
let random_peer = PeerId::random();
let change = RepChange::good(Rep::max_value());
pdb.connect_ingoing(&random_peer);
pdb.add_reputation(&random_peer, change);
assert_eq!(pdb.reputation(&random_peer), Rep::max_value());
}
#[test]
fn test_disconnected_are_bounded() {
let mut pdb = get_db();
@ -499,13 +465,30 @@ mod tests {
let p = PeerId::random();
pdb.connect_ingoing(&p);
}
assert_eq!(pdb.n_dc, 0);
assert_eq!(pdb.disconnected_peers, 0);
for p in pdb.connected_peer_ids().cloned().collect::<Vec<_>>() {
pdb.disconnect(&p);
}
assert_eq!(pdb.n_dc, MAX_DC_PEERS);
assert_eq!(pdb.disconnected_peers, MAX_DC_PEERS);
}
#[test]
fn test_banned_are_bounded() {
let mut pdb = get_db();
for _ in 0..MAX_BANNED_PEERS + 1 {
let p = PeerId::random();
pdb.connect_ingoing(&p);
}
assert_eq!(pdb.banned_peers, 0);
for p in pdb.connected_peer_ids().cloned().collect::<Vec<_>>() {
pdb.ban(&p);
}
assert_eq!(pdb.banned_peers, MAX_BANNED_PEERS);
}
#[test]
@ -518,14 +501,16 @@ mod tests {
pdb.connect_ingoing(&p0);
pdb.connect_ingoing(&p1);
pdb.connect_ingoing(&p2);
pdb.set_reputation(&p0, 70);
pdb.set_reputation(&p1, 100);
pdb.set_reputation(&p2, 50);
add_score(&mut pdb, &p0, 70.0);
add_score(&mut pdb, &p1, 100.0);
add_score(&mut pdb, &p2, 50.0);
let best_peers = pdb.best_peers_by_status(PeerConnectionStatus::is_connected);
assert!(vec![&p1, &p0, &p2]
.into_iter()
.eq(best_peers.into_iter().map(|p| p.0)));
let best_peers: Vec<&PeerId> = pdb
.best_peers_by_status(PeerConnectionStatus::is_connected)
.iter()
.map(|p| p.0)
.collect();
assert_eq!(vec![&p1, &p0, &p2], best_peers);
}
#[test]
@ -538,15 +523,15 @@ mod tests {
pdb.connect_ingoing(&p0);
pdb.connect_ingoing(&p1);
pdb.connect_ingoing(&p2);
pdb.set_reputation(&p0, 70);
pdb.set_reputation(&p1, 100);
pdb.set_reputation(&p2, 50);
add_score(&mut pdb, &p0, 70.0);
add_score(&mut pdb, &p1, 100.0);
add_score(&mut pdb, &p2, 50.0);
let the_best = pdb.best_by_status(PeerConnectionStatus::is_connected);
assert!(the_best.is_some());
// Consistency check
let best_peers = pdb.best_peers_by_status(PeerConnectionStatus::is_connected);
assert_eq!(the_best, best_peers.into_iter().map(|p| p.0).next());
assert_eq!(the_best, best_peers.iter().next().map(|p| p.0));
}
#[test]
@ -556,26 +541,86 @@ mod tests {
let random_peer = PeerId::random();
pdb.connect_ingoing(&random_peer);
assert_eq!(pdb.n_dc, pdb.disconnected_peers().count());
assert_eq!(pdb.disconnected_peers, pdb.disconnected_peers().count());
dbg!("1");
pdb.connect_ingoing(&random_peer);
assert_eq!(pdb.n_dc, pdb.disconnected_peers().count());
assert_eq!(pdb.disconnected_peers, pdb.disconnected_peers().count());
dbg!("1");
pdb.disconnect(&random_peer);
assert_eq!(pdb.n_dc, pdb.disconnected_peers().count());
assert_eq!(pdb.disconnected_peers, pdb.disconnected_peers().count());
dbg!("1");
pdb.connect_outgoing(&random_peer);
assert_eq!(pdb.n_dc, pdb.disconnected_peers().count());
assert_eq!(pdb.disconnected_peers, pdb.disconnected_peers().count());
dbg!("1");
pdb.disconnect(&random_peer);
assert_eq!(pdb.n_dc, pdb.disconnected_peers().count());
assert_eq!(pdb.disconnected_peers, pdb.disconnected_peers().count());
dbg!("1");
pdb.ban(&random_peer);
assert_eq!(pdb.n_dc, pdb.disconnected_peers().count());
assert_eq!(pdb.disconnected_peers, pdb.disconnected_peers().count());
dbg!("1");
pdb.disconnect(&random_peer);
assert_eq!(pdb.n_dc, pdb.disconnected_peers().count());
assert_eq!(pdb.disconnected_peers, pdb.disconnected_peers().count());
dbg!("1");
pdb.disconnect(&random_peer);
assert_eq!(pdb.n_dc, pdb.disconnected_peers().count());
assert_eq!(pdb.disconnected_peers, pdb.disconnected_peers().count());
dbg!("1");
pdb.disconnect(&random_peer);
assert_eq!(pdb.n_dc, pdb.disconnected_peers().count());
assert_eq!(pdb.disconnected_peers, pdb.disconnected_peers().count());
dbg!("1");
}
#[test]
fn test_disconnected_ban_consistency() {
let mut pdb = get_db();
let random_peer = PeerId::random();
let random_peer1 = PeerId::random();
let random_peer2 = PeerId::random();
let random_peer3 = PeerId::random();
pdb.connect_ingoing(&random_peer);
pdb.connect_ingoing(&random_peer1);
pdb.connect_ingoing(&random_peer2);
pdb.connect_ingoing(&random_peer3);
assert_eq!(pdb.disconnected_peers, pdb.disconnected_peers().count());
assert_eq!(pdb.banned_peers, pdb.banned_peers().count());
pdb.connect_ingoing(&random_peer);
pdb.disconnect(&random_peer1);
pdb.ban(&random_peer2);
pdb.connect_ingoing(&random_peer3);
assert_eq!(pdb.disconnected_peers, pdb.disconnected_peers().count());
assert_eq!(pdb.banned_peers, pdb.banned_peers().count());
pdb.ban(&random_peer1);
assert_eq!(pdb.disconnected_peers, pdb.disconnected_peers().count());
assert_eq!(pdb.banned_peers, pdb.banned_peers().count());
pdb.connect_outgoing(&random_peer2);
assert_eq!(pdb.disconnected_peers, pdb.disconnected_peers().count());
assert_eq!(pdb.banned_peers, pdb.banned_peers().count());
pdb.ban(&random_peer3);
assert_eq!(pdb.disconnected_peers, pdb.disconnected_peers().count());
assert_eq!(pdb.banned_peers, pdb.banned_peers().count());
pdb.ban(&random_peer3);
pdb.connect_ingoing(&random_peer1);
pdb.disconnect(&random_peer2);
pdb.ban(&random_peer3);
pdb.connect_ingoing(&random_peer);
assert_eq!(pdb.disconnected_peers, pdb.disconnected_peers().count());
assert_eq!(pdb.banned_peers, pdb.banned_peers().count());
pdb.disconnect(&random_peer);
assert_eq!(pdb.disconnected_peers, pdb.disconnected_peers().count());
assert_eq!(pdb.banned_peers, pdb.banned_peers().count());
pdb.disconnect(&random_peer);
assert_eq!(pdb.disconnected_peers, pdb.disconnected_peers().count());
assert_eq!(pdb.banned_peers, pdb.banned_peers().count());
pdb.ban(&random_peer);
assert_eq!(pdb.disconnected_peers, pdb.disconnected_peers().count());
}
}

View File

@ -0,0 +1,251 @@
//! This contains the scoring logic for peers.
//!
//! A peer's score is a rational number in the range [-100, 100].
//!
//! As the logic develops this documentation will advance.
//!
//! The scoring algorithms are currently experimental.
use serde::Serialize;
use std::time::Instant;
lazy_static! {
static ref HALFLIFE_DECAY: f64 = -2.0f64.ln() / SCORE_HALFLIFE;
}
/// The default score for new peers.
pub(crate) const DEFAULT_SCORE: f64 = 0.0;
/// The minimum score before a peer is disconnected.
const MIN_SCORE_BEFORE_DISCONNECT: f64 = -20.0;
/// The minimum score before a peer is banned.
const MIN_SCORE_BEFORE_BAN: f64 = -50.0;
/// The maximum score a peer can obtain.
const MAX_SCORE: f64 = 100.0;
/// The minimum score a peer can obtain.
const MIN_SCORE: f64 = -100.0;
/// The half-life of a peer's score, i.e. the number of seconds it takes for the score to decay to half its value.
const SCORE_HALFLIFE: f64 = 600.0;
/// The number of seconds we ban a peer for before their score begins to decay.
const BANNED_BEFORE_DECAY: u64 = 1800;
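// Illustrative sketch (not part of this change): how the half-life constants above
// translate into a decay factor. With `SCORE_HALFLIFE` = 600s, a score should halve for
// every 600 seconds of inactivity. The helper name below is made up for this example.
fn _example_decay_factor(elapsed_secs: f64) -> f64 {
    // e^(-ln(2) / SCORE_HALFLIFE * t)
    (-(2.0f64.ln()) / SCORE_HALFLIFE * elapsed_secs).exp()
}
// e.g. _example_decay_factor(SCORE_HALFLIFE) ~= 0.5 and
//      _example_decay_factor(2.0 * SCORE_HALFLIFE) ~= 0.25.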
/// A collection of actions a peer can perform which will adjust its score.
/// Each variant has an associated score change.
// To easily assess the behaviour of score changes, the number of variants should stay low and
// somewhat generic.
#[derive(Debug, Clone, Copy)]
pub enum PeerAction {
/// We should not communicate more with this peer.
/// This action will cause the peer to get banned.
Fatal,
/// This peer's action is not malicious but will not be tolerated. A few occurrences will cause
/// the peer to get kicked.
/// NOTE: ~5 occurrences will get the peer banned
LowToleranceError,
/// An error occurred with this peer but it is not necessarily malicious.
/// We have a high tolerance for these actions: several occurrences are needed for a peer to get
/// kicked.
/// NOTE: ~10 occurrences will get the peer banned
MidToleranceError,
/// An error occurred with this peer but it is not necessarily malicious.
/// We have a high tolerance for these actions: several occurrences are needed for a peer to get
/// kicked.
/// NOTE: ~15 occurrences will get the peer banned
HighToleranceError,
/// Received an expected message.
_ValidMessage,
}
/// The expected state of the peer given the peer's score.
#[derive(Debug, PartialEq)]
pub(crate) enum ScoreState {
/// We are content with the peer's performance. We permit connections and messages.
Healthy,
/// The peer should be disconnected. We allow re-connections if the peer is persistent.
Disconnect,
/// The peer is banned. We disallow new connections until its score has decayed to a
/// tolerable level.
Ban,
}
/// A peer's score (perceived potential usefulness).
///
/// This simplistic version consists of a global score per peer which decays to 0 over time. The
/// decay rate applies equally to positive and negative scores.
#[derive(Copy, PartialEq, Clone, Debug, Serialize)]
pub struct Score {
/// The global score.
// NOTE: In the future we may separate this into sub-scores involving the RPC, Gossipsub and
// lighthouse.
score: f64,
/// The time the score was last updated to perform time-based adjustments such as score-decay.
#[serde(skip)]
last_updated: Instant,
}
impl Default for Score {
fn default() -> Self {
Score {
score: DEFAULT_SCORE,
last_updated: Instant::now(),
}
}
}
impl Eq for Score {}
impl PartialOrd for Score {
fn partial_cmp(&self, other: &Score) -> Option<std::cmp::Ordering> {
self.score
.partial_cmp(&other.score)
.or_else(|| self.last_updated.partial_cmp(&other.last_updated))
}
}
impl Ord for Score {
fn cmp(&self, other: &Score) -> std::cmp::Ordering {
self.partial_cmp(other)
.unwrap_or_else(|| std::cmp::Ordering::Equal)
}
}
impl From<f64> for Score {
fn from(f: f64) -> Self {
Score {
score: f,
last_updated: Instant::now(),
}
}
}
impl std::fmt::Display for Score {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{:.2}", self.score)
}
}
impl std::fmt::Display for PeerAction {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
PeerAction::Fatal => write!(f, "Fatal"),
PeerAction::LowToleranceError => write!(f, "Low Tolerance Error"),
PeerAction::MidToleranceError => write!(f, "Mid Tolerance Error"),
PeerAction::HighToleranceError => write!(f, "High Tolerance Error"),
PeerAction::_ValidMessage => write!(f, "Valid Message"),
}
}
}
impl std::fmt::Display for ScoreState {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
ScoreState::Healthy => write!(f, "Healthy"),
ScoreState::Ban => write!(f, "Ban"),
ScoreState::Disconnect => write!(f, "Disconnect"),
}
}
}
impl Score {
/// Access to the underlying score.
pub fn score(&self) -> f64 {
self.score
}
/// Modifies the score based on a peer's action.
pub fn apply_peer_action(&mut self, peer_action: PeerAction) {
match peer_action {
PeerAction::Fatal => self.score = MIN_SCORE, // The worst possible score
PeerAction::LowToleranceError => self.add(-10.0),
PeerAction::MidToleranceError => self.add(-5.0),
PeerAction::HighToleranceError => self.add(-1.0),
PeerAction::_ValidMessage => self.add(0.1),
}
}
/// Returns the expected state of the peer given its score.
pub(crate) fn state(&self) -> ScoreState {
match self.score {
x if x <= MIN_SCORE_BEFORE_BAN => ScoreState::Ban,
x if x <= MIN_SCORE_BEFORE_DISCONNECT => ScoreState::Disconnect,
_ => ScoreState::Healthy,
}
}
/// Add an f64 to the score abiding by the limits.
pub fn add(&mut self, score: f64) {
let mut new_score = self.score + score;
if new_score > MAX_SCORE {
new_score = MAX_SCORE;
}
if new_score < MIN_SCORE {
new_score = MIN_SCORE;
}
self.score = new_score;
}
/// Applies time-based logic such as decay rates to the score.
/// This function should be called periodically.
pub fn update(&mut self) {
// Apply decay logic
//
// There are two distinct decay processes: one for banned peers and one for all others. If
// the score is below the banning threshold and the duration since it was last updated is
// shorter than the ban timeout, we do nothing.
let now = Instant::now();
if self.score <= MIN_SCORE_BEFORE_BAN
&& now
.checked_duration_since(self.last_updated)
.map(|d| d.as_secs())
<= Some(BANNED_BEFORE_DECAY)
{
// The peer is banned and still within the ban timeout. Do not update its score.
return;
}
// Decay the current score
// Using exponential decay based on a constant half life.
if let Some(secs_since_update) = now
.checked_duration_since(self.last_updated)
.map(|d| d.as_secs())
{
// e^(-ln(2)/HL*t)
let decay_factor = (*HALFLIFE_DECAY * secs_since_update as f64).exp();
self.score *= decay_factor;
self.last_updated = now;
}
}
}
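// Illustrative usage (not part of this change): how repeated peer actions drive a
// score through the `ScoreState` thresholds defined above. The function name is
// made up for this sketch only.
fn _example_score_state_walkthrough() {
    let mut score = Score::default();
    assert_eq!(score.state(), ScoreState::Healthy);

    // Three low-tolerance errors (-10.0 each) push the score to -30.0, which is below
    // MIN_SCORE_BEFORE_DISCONNECT (-20.0) but still above MIN_SCORE_BEFORE_BAN (-50.0).
    for _ in 0..3 {
        score.apply_peer_action(PeerAction::LowToleranceError);
    }
    assert_eq!(score.state(), ScoreState::Disconnect);

    // A fatal action pins the score to MIN_SCORE, well below the ban threshold.
    score.apply_peer_action(PeerAction::Fatal);
    assert_eq!(score.state(), ScoreState::Ban);
}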
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_reputation_change() {
let mut score = Score::default();
// 0 change does not change the score
//
let change = 0.0;
score.add(change);
assert_eq!(score.score(), DEFAULT_SCORE);
// underflowing change is capped
let mut score = Score::default();
let change = MIN_SCORE - 50.0;
score.add(change);
assert_eq!(score.score(), MIN_SCORE);
// overflowing change is capped
let mut score = Score::default();
let change = MAX_SCORE + 50.0;
score.add(change);
assert_eq!(score.score(), MAX_SCORE);
// Score adjusts
let mut score = Score::default();
let change = 1.32;
score.add(change);
assert_eq!(score.score(), DEFAULT_SCORE + change);
}
}

View File

@ -145,7 +145,7 @@ impl<TSpec: EthSpec> Decoder for SSZInboundCodec<TSpec> {
},
Protocol::MetaData => match self.protocol.version {
Version::V1 => {
if packet.len() > 0 {
if !packet.is_empty() {
Err(RPCError::InvalidData)
} else {
Ok(Some(RPCRequest::MetaData(PhantomData)))

View File

@ -74,7 +74,7 @@ impl<TSpec: EthSpec> Encoder<RPCCodedResponse<TSpec>> for SSZSnappyInboundCodec<
// SSZ encoded bytes should be within `max_packet_size`
if bytes.len() > self.max_packet_size {
return Err(RPCError::InternalError(
"attempting to encode data > max_packet_size".into(),
"attempting to encode data > max_packet_size",
));
}
// Inserts the length prefix of the uncompressed bytes into dst
@ -186,7 +186,7 @@ impl<TSpec: EthSpec> Decoder for SSZSnappyInboundCodec<TSpec> {
},
Protocol::MetaData => match self.protocol.version {
Version::V1 => {
if decoded_buffer.len() > 0 {
if !decoded_buffer.is_empty() {
Err(RPCError::InvalidData)
} else {
Ok(Some(RPCRequest::MetaData(PhantomData)))

View File

@ -20,7 +20,7 @@ use std::{
collections::hash_map::Entry,
pin::Pin,
task::{Context, Poll},
time::{Duration, Instant},
time::Duration,
};
use tokio::time::{delay_queue, delay_until, Delay, DelayQueue, Instant as TInstant};
use types::EthSpec;
@ -40,19 +40,19 @@ const SHUTDOWN_TIMEOUT_SECS: u8 = 15;
#[derive(Debug, Clone, Copy, Hash, Eq, PartialEq)]
pub struct SubstreamId(usize);
/// An error encoutered by the handler.
/// An error encountered by the handler.
pub enum HandlerErr {
/// An error ocurred for this peer's request. This can occurr during protocol negotiation,
/// message passing, or if the handler identifies that we are sending an error reponse to the peer.
/// An error occurred for this peer's request. This can occur during protocol negotiation,
/// message passing, or if the handler identifies that we are sending an error response to the peer.
Inbound {
/// Id of the peer's request for which an error occurred.
id: SubstreamId,
/// Information of the negotiated protocol.
proto: Protocol,
/// The error that ocurred.
/// The error that occurred.
error: RPCError,
},
/// An error ocurred for this request. Such error can occurr during protocol negotiation,
/// An error occurred for this request. Such error can occur during protocol negotiation,
/// message passing, or if we successfully received a response from the peer, but this response
/// indicates an error.
Outbound {
@ -60,7 +60,7 @@ pub enum HandlerErr {
id: RequestId,
/// Information of the protocol.
proto: Protocol,
/// The error that ocurred.
/// The error that occurred.
error: RPCError,
},
}
@ -122,9 +122,6 @@ where
/// State of the handler.
state: HandlerState,
/// After the given duration has elapsed, an inactive connection will shutdown.
inactive_timeout: Duration,
/// Try to negotiate the outbound upgrade a few times if there is an IO error before reporting the request as failed.
/// This keeps track of the number of attempts.
outbound_io_error_retries: u8,
@ -139,6 +136,7 @@ enum HandlerState {
/// The handler is shutting_down.
///
/// While in this state the handler rejects new requests but tries to finish existing ones.
/// Once the timer expires, all messages are killed.
ShuttingDown(Delay),
/// The handler is deactivated. A goodbye has been sent and no more messages are sent or
/// received.
@ -278,11 +276,7 @@ impl<TSpec> RPCHandler<TSpec>
where
TSpec: EthSpec,
{
pub fn new(
listen_protocol: SubstreamProtocol<RPCProtocol<TSpec>>,
inactive_timeout: Duration,
log: &slog::Logger,
) -> Self {
pub fn new(listen_protocol: SubstreamProtocol<RPCProtocol<TSpec>>, log: &slog::Logger) -> Self {
RPCHandler {
listen_protocol,
pending_errors: Vec::new(),
@ -299,7 +293,6 @@ where
state: HandlerState::Active,
max_dial_negotiated: 8,
keep_alive: KeepAlive::Yes,
inactive_timeout,
outbound_io_error_retries: 0,
log: log.clone(),
}
@ -496,19 +489,21 @@ where
// Check that we don't have outbound items pending for dialing, nor dialing, nor
// established. Also check that there are no established inbound substreams.
// Errors and events need to be reported back, so check those too.
let should_shutdown = self.dial_queue.is_empty()
&& self.outbound_substreams.is_empty()
&& self.inbound_substreams.is_empty()
&& self.pending_errors.is_empty()
&& self.events_out.is_empty()
&& self.dial_negotiated == 0;
let should_shutdown = if let HandlerState::ShuttingDown(_) = self.state {
self.dial_queue.is_empty()
&& self.outbound_substreams.is_empty()
&& self.inbound_substreams.is_empty()
&& self.pending_errors.is_empty()
&& self.events_out.is_empty()
&& self.dial_negotiated == 0
} else {
false
};
match self.keep_alive {
KeepAlive::Yes if should_shutdown => {
self.keep_alive = KeepAlive::Until(Instant::now() + self.inactive_timeout);
}
KeepAlive::Yes if should_shutdown => self.keep_alive = KeepAlive::No,
KeepAlive::Yes => {} // We continue being active
KeepAlive::Until(_) if should_shutdown => {} // Already deemed inactive
KeepAlive::Until(_) if should_shutdown => self.keep_alive = KeepAlive::No, // Already deemed inactive
KeepAlive::Until(_) => {
// No longer idle
self.keep_alive = KeepAlive::Yes;
@ -833,7 +828,7 @@ where
// await flush
entry.get_mut().0 =
InboundSubstreamState::ResponsePendingFlush {
substream: substream,
substream,
closing,
};
drive_stream_further = true;

View File

@ -116,6 +116,18 @@ pub enum GoodbyeReason {
/// Error/fault in the RPC.
Fault = 3,
/// Teku uses this code for not being able to verify a network.
UnableToVerifyNetwork = 128,
/// The node has too many connected peers.
TooManyPeers = 129,
/// Scored poorly.
BadScore = 250,
/// The peer is banned
Banned = 251,
/// Unknown reason.
Unknown = 0,
}
@ -126,6 +138,10 @@ impl From<u64> for GoodbyeReason {
1 => GoodbyeReason::ClientShutdown,
2 => GoodbyeReason::IrrelevantNetwork,
3 => GoodbyeReason::Fault,
128 => GoodbyeReason::UnableToVerifyNetwork,
129 => GoodbyeReason::TooManyPeers,
250 => GoodbyeReason::BadScore,
251 => GoodbyeReason::Banned,
_ => GoodbyeReason::Unknown,
}
}
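// Illustrative check (not part of this change): the new wire codes map back to their
// variants and unmapped codes collapse to `Unknown`. A plain `match` is used so this
// sketch does not assume a `PartialEq` derive on `GoodbyeReason`; the function name is
// made up for the example.
fn _example_goodbye_codes() {
    match GoodbyeReason::from(129u64) {
        GoodbyeReason::TooManyPeers => {}
        other => panic!("expected TooManyPeers, got {}", other),
    }
    match GoodbyeReason::from(7u64) {
        GoodbyeReason::Unknown => {}
        other => panic!("expected Unknown, got {}", other),
    }
}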
@ -381,6 +397,10 @@ impl std::fmt::Display for GoodbyeReason {
GoodbyeReason::ClientShutdown => write!(f, "Client Shutdown"),
GoodbyeReason::IrrelevantNetwork => write!(f, "Irrelevant Network"),
GoodbyeReason::Fault => write!(f, "Fault"),
GoodbyeReason::UnableToVerifyNetwork => write!(f, "Unable to verify network"),
GoodbyeReason::TooManyPeers => write!(f, "Too many peers"),
GoodbyeReason::BadScore => write!(f, "Bad Score"),
GoodbyeReason::Banned => write!(f, "Banned"),
GoodbyeReason::Unknown => write!(f, "Unknown Reason"),
}
}

View File

@ -14,7 +14,6 @@ use libp2p::{Multiaddr, PeerId};
use slog::{debug, o};
use std::marker::PhantomData;
use std::task::{Context, Poll};
use std::time::Duration;
use types::EthSpec;
pub(crate) use handler::HandlerErr;
@ -149,7 +148,6 @@ where
SubstreamProtocol::new(RPCProtocol {
phantom: PhantomData,
}),
Duration::from_secs(30),
&self.log,
)
}

View File

@ -1,10 +1,10 @@
use crate::behaviour::{Behaviour, BehaviourEvent, PeerRequestId, Request, Response};
use crate::discovery::enr;
use crate::multiaddr::Protocol;
use crate::rpc::{RPCResponseErrorCode, RequestId};
use crate::rpc::{GoodbyeReason, RPCResponseErrorCode, RequestId};
use crate::types::{error, GossipKind};
use crate::EnrExt;
use crate::{NetworkConfig, NetworkGlobals};
use crate::{NetworkConfig, NetworkGlobals, PeerAction};
use futures::prelude::*;
use libp2p::core::{
identity::Keypair,
@ -12,11 +12,10 @@ use libp2p::core::{
muxing::StreamMuxerBox,
transport::boxed::Boxed,
upgrade::{InboundUpgradeExt, OutboundUpgradeExt},
ConnectedPoint,
};
use libp2p::{
core, noise,
swarm::{NetworkBehaviour, SwarmBuilder, SwarmEvent},
core, noise, secio,
swarm::{SwarmBuilder, SwarmEvent},
PeerId, Swarm, Transport,
};
use slog::{crit, debug, info, o, trace, warn};
@ -26,13 +25,9 @@ use std::io::{Error, ErrorKind};
use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;
use tokio::time::DelayQueue;
use types::{EnrForkId, EthSpec};
pub const NETWORK_KEY_FILENAME: &str = "key";
/// The time in milliseconds to wait before banning a peer. This allows for any Goodbye messages to be
/// flushed and protocols to be negotiated.
const BAN_PEER_WAIT_TIMEOUT: u64 = 200;
/// The maximum simultaneous libp2p connections per peer.
const MAX_CONNECTIONS_PER_PEER: usize = 1;
@ -45,40 +40,16 @@ pub enum Libp2pEvent<TSpec: EthSpec> {
Behaviour(BehaviourEvent<TSpec>),
/// A new listening address has been established.
NewListenAddr(Multiaddr),
/// A peer has established at least one connection.
PeerConnected {
/// The peer that connected.
peer_id: PeerId,
/// Whether the peer was a dialer or listener.
endpoint: ConnectedPoint,
},
/// A peer no longer has any connections, i.e is disconnected.
PeerDisconnected {
/// The peer that disconnected.
peer_id: PeerId,
/// Whether the peer was a dialer or a listener.
endpoint: ConnectedPoint,
},
}
/// The configuration and state of the libp2p components for the beacon node.
pub struct Service<TSpec: EthSpec> {
/// The libp2p Swarm handler.
//TODO: Make this private
pub swarm: Swarm<Behaviour<TSpec>>,
/// This node's PeerId.
pub local_peer_id: PeerId,
/// Used for managing the state of peers.
network_globals: Arc<NetworkGlobals<TSpec>>,
/// A current list of peers to ban after a given timeout.
peers_to_ban: DelayQueue<PeerId>,
/// A list of timeouts after which peers become unbanned.
peer_ban_timeout: DelayQueue<PeerId>,
/// The libp2p logger handle.
pub log: slog::Logger,
}
@ -162,7 +133,9 @@ impl<TSpec: EthSpec> Service<TSpec> {
};
// helper closure for dialing peers
let mut dial_addr = |multiaddr: &Multiaddr| {
let mut dial_addr = |mut multiaddr: Multiaddr| {
// strip the p2p protocol if it exists
strip_peer_id(&mut multiaddr);
match Swarm::dial_addr(&mut swarm, multiaddr.clone()) {
Ok(()) => debug!(log, "Dialing libp2p peer"; "address" => format!("{}", multiaddr)),
Err(err) => debug!(
@ -174,7 +147,7 @@ impl<TSpec: EthSpec> Service<TSpec> {
// attempt to connect to user-input libp2p nodes
for multiaddr in &config.libp2p_nodes {
dial_addr(multiaddr);
dial_addr(multiaddr.clone());
}
// attempt to connect to any specified boot-nodes
@ -194,7 +167,7 @@ impl<TSpec: EthSpec> Service<TSpec> {
.read()
.is_connected_or_dialing(&bootnode_enr.peer_id())
{
dial_addr(multiaddr);
dial_addr(multiaddr.clone());
}
}
}
@ -212,25 +185,12 @@ impl<TSpec: EthSpec> Service<TSpec> {
let service = Service {
local_peer_id,
swarm,
network_globals: network_globals.clone(),
peers_to_ban: DelayQueue::new(),
peer_ban_timeout: DelayQueue::new(),
log,
};
Ok((network_globals, service))
}
/// Adds a peer to be banned for a period of time, specified by a timeout.
pub fn disconnect_and_ban_peer(&mut self, peer_id: PeerId, timeout: Duration) {
warn!(self.log, "Disconnecting and banning peer"; "peer_id" => peer_id.to_string(), "timeout" => format!("{:?}", timeout));
self.peers_to_ban.insert(
peer_id.clone(),
Duration::from_millis(BAN_PEER_WAIT_TIMEOUT),
);
self.peer_ban_timeout.insert(peer_id, timeout);
}
/// Sends a request to a peer, with a given Id.
pub fn send_request(&mut self, peer_id: PeerId, request_id: RequestId, request: Request) {
self.swarm.send_request(peer_id, request_id, request);
@ -247,6 +207,16 @@ impl<TSpec: EthSpec> Service<TSpec> {
self.swarm._send_error_reponse(peer_id, id, error, reason);
}
/// Report a peer's action.
pub fn report_peer(&mut self, peer_id: &PeerId, action: PeerAction) {
self.swarm.report_peer(peer_id, action);
}
/// Disconnects and bans a peer, providing a reason.
pub fn goodbye_peer(&mut self, peer_id: &PeerId, reason: GoodbyeReason) {
self.swarm.goodbye_peer(peer_id, reason);
}
/// Sends a response to a peer's request.
pub fn send_response(&mut self, peer_id: PeerId, id: PeerRequestId, response: Response<TSpec>) {
self.swarm.send_successful_response(peer_id, id, response);
@ -254,115 +224,61 @@ impl<TSpec: EthSpec> Service<TSpec> {
pub async fn next_event(&mut self) -> Libp2pEvent<TSpec> {
loop {
tokio::select! {
event = self.swarm.next_event() => {
match event {
SwarmEvent::Behaviour(behaviour) => {
return Libp2pEvent::Behaviour(behaviour)
}
SwarmEvent::ConnectionEstablished {
peer_id,
endpoint,
num_established,
} => {
debug!(self.log, "Connection established"; "peer_id" => peer_id.to_string(), "connections" => num_established.get());
// if this is the first connection inform the network layer a new connection
// has been established and update the db
if num_established.get() == 1 {
// update the peerdb
match endpoint {
ConnectedPoint::Listener { .. } => {
self.swarm.peer_manager().connect_ingoing(&peer_id);
}
ConnectedPoint::Dialer { .. } => self
.network_globals
.peers
.write()
.connect_outgoing(&peer_id),
}
return Libp2pEvent::PeerConnected { peer_id, endpoint };
}
}
SwarmEvent::ConnectionClosed {
peer_id,
cause,
endpoint,
num_established,
} => {
debug!(self.log, "Connection closed"; "peer_id"=> peer_id.to_string(), "cause" => cause.to_string(), "connections" => num_established);
if num_established == 0 {
// update the peer_db
self.swarm.peer_manager().notify_disconnect(&peer_id);
// the peer has disconnected
return Libp2pEvent::PeerDisconnected {
peer_id,
endpoint,
};
}
}
SwarmEvent::NewListenAddr(multiaddr) => {
return Libp2pEvent::NewListenAddr(multiaddr)
}
SwarmEvent::IncomingConnection {
local_addr,
send_back_addr,
} => {
debug!(self.log, "Incoming connection"; "our_addr" => local_addr.to_string(), "from" => send_back_addr.to_string())
}
SwarmEvent::IncomingConnectionError {
local_addr,
send_back_addr,
error,
} => {
debug!(self.log, "Failed incoming connection"; "our_addr" => local_addr.to_string(), "from" => send_back_addr.to_string(), "error" => error.to_string())
}
SwarmEvent::BannedPeer {
peer_id,
endpoint: _,
} => {
debug!(self.log, "Attempted to dial a banned peer"; "peer_id" => peer_id.to_string())
}
SwarmEvent::UnreachableAddr {
peer_id,
address,
error,
attempts_remaining,
} => {
debug!(self.log, "Failed to dial address"; "peer_id" => peer_id.to_string(), "address" => address.to_string(), "error" => error.to_string(), "attempts_remaining" => attempts_remaining);
self.swarm.peer_manager().notify_disconnect(&peer_id);
}
SwarmEvent::UnknownPeerUnreachableAddr { address, error } => {
debug!(self.log, "Peer not known at dialed address"; "address" => address.to_string(), "error" => error.to_string());
}
SwarmEvent::ExpiredListenAddr(multiaddr) => {
debug!(self.log, "Listen address expired"; "multiaddr" => multiaddr.to_string())
}
SwarmEvent::ListenerClosed { addresses, reason } => {
debug!(self.log, "Listener closed"; "addresses" => format!("{:?}", addresses), "reason" => format!("{:?}", reason))
}
SwarmEvent::ListenerError { error } => {
debug!(self.log, "Listener error"; "error" => format!("{:?}", error.to_string()))
}
SwarmEvent::Dialing(peer_id) => {
self.swarm.peer_manager().dialing_peer(&peer_id);
}
}
match self.swarm.next_event().await {
SwarmEvent::Behaviour(behaviour) => return Libp2pEvent::Behaviour(behaviour),
SwarmEvent::ConnectionEstablished { .. } => {
// A connection could be established with a banned peer. This is
// handled inside the behaviour.
}
Some(Ok(peer_to_ban)) = self.peers_to_ban.next() => {
let peer_id = peer_to_ban.into_inner();
Swarm::ban_peer_id(&mut self.swarm, peer_id.clone());
// TODO: Correctly notify protocols of the disconnect
// TODO: Also remove peer from the DHT: https://github.com/sigp/lighthouse/issues/629
self.swarm.inject_disconnected(&peer_id);
// inform the behaviour that the peer has been banned
self.swarm.peer_banned(peer_id);
SwarmEvent::ConnectionClosed {
peer_id,
cause,
endpoint: _,
num_established,
} => {
debug!(self.log, "Connection closed"; "peer_id"=> peer_id.to_string(), "cause" => cause.to_string(), "connections" => num_established);
}
Some(Ok(peer_to_unban)) = self.peer_ban_timeout.next() => {
debug!(self.log, "Peer has been unbanned"; "peer" => format!("{:?}", peer_to_unban));
let unban_peer = peer_to_unban.into_inner();
self.swarm.peer_unbanned(&unban_peer);
Swarm::unban_peer_id(&mut self.swarm, unban_peer);
SwarmEvent::NewListenAddr(multiaddr) => {
return Libp2pEvent::NewListenAddr(multiaddr)
}
SwarmEvent::IncomingConnection {
local_addr,
send_back_addr,
} => {
debug!(self.log, "Incoming connection"; "our_addr" => local_addr.to_string(), "from" => send_back_addr.to_string())
}
SwarmEvent::IncomingConnectionError {
local_addr,
send_back_addr,
error,
} => {
debug!(self.log, "Failed incoming connection"; "our_addr" => local_addr.to_string(), "from" => send_back_addr.to_string(), "error" => error.to_string())
}
SwarmEvent::BannedPeer { .. } => {
// We do not ban peers at the swarm layer, so this should never occur.
}
SwarmEvent::UnreachableAddr {
peer_id,
address,
error,
attempts_remaining,
} => {
debug!(self.log, "Failed to dial address"; "peer_id" => peer_id.to_string(), "address" => address.to_string(), "error" => error.to_string(), "attempts_remaining" => attempts_remaining);
}
SwarmEvent::UnknownPeerUnreachableAddr { address, error } => {
debug!(self.log, "Peer not known at dialed address"; "address" => address.to_string(), "error" => error.to_string());
}
SwarmEvent::ExpiredListenAddr(multiaddr) => {
debug!(self.log, "Listen address expired"; "multiaddr" => multiaddr.to_string())
}
SwarmEvent::ListenerClosed { addresses, reason } => {
debug!(self.log, "Listener closed"; "addresses" => format!("{:?}", addresses), "reason" => format!("{:?}", reason))
}
SwarmEvent::ListenerError { error } => {
debug!(self.log, "Listener error"; "error" => format!("{:?}", error.to_string()))
}
SwarmEvent::Dialing(peer_id) => {
debug!(self.log, "Dialing peer"; "peer_id" => peer_id.to_string());
}
}
}
@ -386,7 +302,7 @@ fn build_transport(
let transport = transport
.and_then(move |stream, endpoint| {
let upgrade = core::upgrade::SelectUpgrade::new(
libp2p::secio::SecioConfig::new(local_private_key.clone()),
secio::SecioConfig::new(local_private_key.clone()),
generate_noise_config(&local_private_key),
);
core::upgrade::apply(stream, upgrade, endpoint, core::upgrade::Version::V1).and_then(
@ -455,7 +371,6 @@ fn keypair_from_bytes(mut bytes: Vec<u8>) -> error::Result<Keypair> {
///
/// Currently only secp256k1 keys are allowed, as these are the only keys supported by discv5.
fn load_private_key(config: &NetworkConfig, log: &slog::Logger) -> Keypair {
// TODO: Currently using secp256k1 keypairs - currently required for discv5
// check for key from disk
let network_key_f = config.network_dir.join(NETWORK_KEY_FILENAME);
if let Ok(mut network_key_file) = File::open(network_key_f.clone()) {
@ -507,3 +422,14 @@ fn generate_noise_config(
.expect("signing can fail only once during starting a node");
noise::NoiseConfig::xx(static_dh_keys).into_authenticated()
}
/// For a multiaddr that ends with a peer id, this strips that suffix. Rust-libp2p
/// only supports dialing an address without a trailing peer id.
fn strip_peer_id(addr: &mut Multiaddr) {
let last = addr.pop();
match last {
Some(Protocol::P2p(_)) => {}
Some(other) => addr.push(other),
_ => {}
}
}
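// Illustrative usage (not part of this change): stripping a trailing `/p2p/<peer-id>`
// component before dialing. This assumes `PeerId` converts into a `Multihash` in this
// libp2p version; the function name below is made up for the example.
fn _example_strip_peer_id() {
    let bare: Multiaddr = "/ip4/127.0.0.1/tcp/9000".parse().expect("valid multiaddr");
    let mut with_peer = bare.clone();
    with_peer.push(Protocol::P2p(PeerId::random().into()));

    strip_peer_id(&mut with_peer);
    // The peer id suffix is gone and the address is dialable again.
    assert_eq!(with_peer, bare);
}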

View File

@ -68,7 +68,7 @@ impl<T: EthSpec> PubsubMessage<T> {
continue;
}
Ok(gossip_topic) => {
let ref decompressed_data = match gossip_topic.encoding() {
let decompressed_data = &(match gossip_topic.encoding() {
GossipEncoding::SSZSnappy => {
// Exit early if uncompressed data is > GOSSIP_MAX_SIZE
match decompress_len(data) {
@ -86,7 +86,7 @@ impl<T: EthSpec> PubsubMessage<T> {
Err(e) => return Err(format!("{}", e)),
}
}
};
});
// the ssz decoders
match gossip_topic.kind() {
GossipKind::BeaconAggregateAndProof => {

View File

@ -47,7 +47,7 @@ async fn test_status_rpc() {
let sender_future = async {
loop {
match sender.next_event().await {
Libp2pEvent::PeerConnected { peer_id, .. } => {
Libp2pEvent::Behaviour(BehaviourEvent::PeerDialed(peer_id)) => {
// Send a STATUS message
debug!(log, "Sending RPC");
sender
@ -137,7 +137,7 @@ async fn test_blocks_by_range_chunked_rpc() {
let sender_future = async {
loop {
match sender.next_event().await {
Libp2pEvent::PeerConnected { peer_id, .. } => {
Libp2pEvent::Behaviour(BehaviourEvent::PeerDialed(peer_id)) => {
// Send a STATUS message
debug!(log, "Sending RPC");
sender
@ -248,7 +248,7 @@ async fn test_blocks_by_range_chunked_rpc_terminates_correctly() {
let sender_future = async {
loop {
match sender.next_event().await {
Libp2pEvent::PeerConnected { peer_id, .. } => {
Libp2pEvent::Behaviour(BehaviourEvent::PeerDialed(peer_id)) => {
// Send a STATUS message
debug!(log, "Sending RPC");
sender
@ -377,7 +377,7 @@ async fn test_blocks_by_range_single_empty_rpc() {
let sender_future = async {
loop {
match sender.next_event().await {
Libp2pEvent::PeerConnected { peer_id, .. } => {
Libp2pEvent::Behaviour(BehaviourEvent::PeerDialed(peer_id)) => {
// Send a STATUS message
debug!(log, "Sending RPC");
sender
@ -489,7 +489,7 @@ async fn test_blocks_by_root_chunked_rpc() {
let sender_future = async {
loop {
match sender.next_event().await {
Libp2pEvent::PeerConnected { peer_id, .. } => {
Libp2pEvent::Behaviour(BehaviourEvent::PeerDialed(peer_id)) => {
// Send a STATUS message
debug!(log, "Sending RPC");
sender
@ -608,7 +608,7 @@ async fn test_blocks_by_root_chunked_rpc_terminates_correctly() {
let sender_future = async {
loop {
match sender.next_event().await {
Libp2pEvent::PeerConnected { peer_id, .. } => {
Libp2pEvent::Behaviour(BehaviourEvent::PeerDialed(peer_id)) => {
// Send a STATUS message
debug!(log, "Sending RPC");
sender
@ -713,19 +713,19 @@ async fn test_goodbye_rpc() {
// get sender/receiver
let (mut sender, mut receiver) = common::build_node_pair(&log).await;
// Goodbye Request
let rpc_request = Request::Goodbye(GoodbyeReason::ClientShutdown);
// build the sender future
let sender_future = async {
loop {
match sender.next_event().await {
Libp2pEvent::PeerConnected { peer_id, .. } => {
// Send a STATUS message
Libp2pEvent::Behaviour(BehaviourEvent::PeerDialed(peer_id)) => {
// Send a goodbye and disconnect
debug!(log, "Sending RPC");
sender
.swarm
.send_request(peer_id, RequestId::Sync(10), rpc_request.clone());
.goodbye_peer(&peer_id, GoodbyeReason::IrrelevantNetwork);
}
Libp2pEvent::Behaviour(BehaviourEvent::PeerDisconnected(_)) => {
return;
}
_ => {} // Ignore other RPC messages
}
@ -736,13 +736,8 @@ async fn test_goodbye_rpc() {
let receiver_future = async {
loop {
match receiver.next_event().await {
Libp2pEvent::Behaviour(BehaviourEvent::RequestReceived {
peer_id: _,
id: _,
request,
}) => {
Libp2pEvent::Behaviour(BehaviourEvent::PeerDisconnected(_)) => {
// Should receive sent RPC request
assert_eq!(rpc_request.clone(), request); // receives the goodbye. Nothing left to do
return;
}
_ => {} // Ignore other events
@ -750,9 +745,10 @@ async fn test_goodbye_rpc() {
}
};
let total_future = futures::future::join(sender_future, receiver_future);
tokio::select! {
_ = sender_future => {}
_ = receiver_future => {}
_ = total_future => {}
_ = delay_for(Duration::from_secs(30)) => {
panic!("Future timed out");
}

View File

@ -1,7 +1,8 @@
//! This module handles incoming network messages.
//!
//! It routes the messages to appropriate services, such as the Sync
//! and processes those that are
//! It routes the messages to appropriate services.
//! It handles requests at the application layer in its associated processor and directs
//! syncing-related responses to the Sync manager.
#![allow(clippy::unit_arg)]
pub mod processor;
@ -166,15 +167,6 @@ impl<T: BeaconChainTypes> Router<T> {
self.processor
.on_status_request(peer_id, id, status_message)
}
Request::Goodbye(goodbye_reason) => {
debug!(
self.log, "Peer sent Goodbye";
"peer_id" => peer_id.to_string(),
"reason" => format!("{:?}", goodbye_reason),
"client" => self.network_globals.client(&peer_id).to_string(),
);
self.processor.on_disconnect(peer_id);
}
Request::BlocksByRange(request) => self
.processor
.on_blocks_by_range_request(peer_id, id, request),

View File

@ -10,7 +10,7 @@ use beacon_chain::{
GossipVerifiedBlock,
};
use eth2_libp2p::rpc::*;
use eth2_libp2p::{NetworkGlobals, PeerId, PeerRequestId, Request, Response};
use eth2_libp2p::{NetworkGlobals, PeerAction, PeerId, PeerRequestId, Request, Response};
use itertools::process_results;
use slog::{debug, error, o, trace, warn};
use ssz::Encode;
@ -194,7 +194,7 @@ impl<T: BeaconChainTypes> Processor<T> {
);
self.network
.disconnect(peer_id, GoodbyeReason::IrrelevantNetwork);
.goodbye_peer(peer_id, GoodbyeReason::IrrelevantNetwork);
} else if remote.head_slot
> self.chain.slot().unwrap_or_else(|_| Slot::from(0u64)) + FUTURE_SLOT_TOLERANCE
{
@ -210,7 +210,7 @@ impl<T: BeaconChainTypes> Processor<T> {
"reason" => "different system clocks or genesis time"
);
self.network
.disconnect(peer_id, GoodbyeReason::IrrelevantNetwork);
.goodbye_peer(peer_id, GoodbyeReason::IrrelevantNetwork);
} else if remote.finalized_epoch <= local.finalized_epoch
&& remote.finalized_root != Hash256::zero()
&& local.finalized_root != Hash256::zero()
@ -230,7 +230,7 @@ impl<T: BeaconChainTypes> Processor<T> {
"reason" => "different finalized chain"
);
self.network
.disconnect(peer_id, GoodbyeReason::IrrelevantNetwork);
.goodbye_peer(peer_id, GoodbyeReason::IrrelevantNetwork);
} else if remote.finalized_epoch < local.finalized_epoch {
// The node has a lower finalized epoch, their chain is not useful to us. There are two
// cases where a node can have a lower finalized epoch:
@ -344,7 +344,7 @@ impl<T: BeaconChainTypes> Processor<T> {
warn!(self.log,
"Peer sent invalid range request";
"error" => "Step sent was 0");
self.network.disconnect(peer_id, GoodbyeReason::Fault);
self.network.goodbye_peer(peer_id, GoodbyeReason::Fault);
return;
}
@ -1096,23 +1096,24 @@ impl<T: EthSpec> HandlerNetworkContext<T> {
Self { network_send, log }
}
/// Sends a message to the network task.
fn inform_network(&mut self, msg: NetworkMessage<T>) {
self.network_send
.send(msg)
.unwrap_or_else(|_| warn!(self.log, "Could not send message to the network service"))
}
pub fn disconnect(&mut self, peer_id: PeerId, reason: GoodbyeReason) {
warn!(
&self.log,
"Disconnecting peer (RPC)";
"reason" => format!("{:?}", reason),
"peer_id" => format!("{:?}", peer_id),
);
self.send_processor_request(peer_id.clone(), Request::Goodbye(reason));
self.inform_network(NetworkMessage::Disconnect { peer_id });
/// Disconnects and bans a peer, sending a Goodbye request with the associated reason.
pub fn goodbye_peer(&mut self, peer_id: PeerId, reason: GoodbyeReason) {
self.inform_network(NetworkMessage::GoodbyePeer { peer_id, reason });
}
/// Reports a peer's action, adjusting the peer's score.
pub fn _report_peer(&mut self, peer_id: PeerId, action: PeerAction) {
self.inform_network(NetworkMessage::ReportPeer { peer_id, action });
}
/// Sends a request to the network task.
pub fn send_processor_request(&mut self, peer_id: PeerId, request: Request) {
self.inform_network(NetworkMessage::SendRequest {
peer_id,
@ -1121,6 +1122,7 @@ impl<T: EthSpec> HandlerNetworkContext<T> {
})
}
/// Sends a response to the network task.
pub fn send_response(&mut self, peer_id: PeerId, response: Response<T>, id: PeerRequestId) {
self.inform_network(NetworkMessage::SendResponse {
peer_id,
@ -1128,6 +1130,8 @@ impl<T: EthSpec> HandlerNetworkContext<T> {
response,
})
}
/// Sends an error response to the network task.
pub fn _send_error_response(
&mut self,
peer_id: PeerId,

View File

@ -8,8 +8,8 @@ use crate::{error, metrics};
use beacon_chain::{BeaconChain, BeaconChainTypes};
use eth2_libp2p::Service as LibP2PService;
use eth2_libp2p::{
rpc::{RPCResponseErrorCode, RequestId},
Libp2pEvent, PeerRequestId, PubsubMessage, Request, Response,
rpc::{GoodbyeReason, RPCResponseErrorCode, RequestId},
Libp2pEvent, PeerAction, PeerRequestId, PubsubMessage, Request, Response,
};
use eth2_libp2p::{BehaviourEvent, MessageId, NetworkGlobals, PeerId};
use futures::prelude::*;
@ -24,8 +24,49 @@ use types::EthSpec;
mod tests;
/// The time in seconds that a peer will be banned and prevented from reconnecting.
const BAN_PEER_TIMEOUT: u64 = 30;
/// Types of messages that the network service can receive.
#[derive(Debug)]
pub enum NetworkMessage<T: EthSpec> {
/// Subscribes a list of validators to specific slots for attestation duties.
Subscribe {
subscriptions: Vec<ValidatorSubscription>,
},
/// Send an RPC request to the libp2p service.
SendRequest {
peer_id: PeerId,
request: Request,
request_id: RequestId,
},
/// Send a successful Response to the libp2p service.
SendResponse {
peer_id: PeerId,
response: Response<T>,
id: PeerRequestId,
},
/// Respond to a peer's request with an error.
SendError {
// TODO: note that this is never used, we just say goodbye without nicely closing the
// stream assigned to the request
peer_id: PeerId,
error: RPCResponseErrorCode,
reason: String,
id: PeerRequestId,
},
/// Publish a list of messages to the gossipsub protocol.
Publish { messages: Vec<PubsubMessage<T>> },
/// Propagate a received gossipsub message.
Propagate {
propagation_source: PeerId,
message_id: MessageId,
},
/// Reports a peer to the peer manager for performing an action.
ReportPeer { peer_id: PeerId, action: PeerAction },
/// Disconnect and ban a peer, providing a reason.
GoodbyePeer {
peer_id: PeerId,
reason: GoodbyeReason,
},
}
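// Illustrative sketch (not part of this change): how another service hands a peer
// report to the network service over its channel. `network_send` is assumed to be a
// `tokio::sync::mpsc::UnboundedSender<NetworkMessage<T>>` as used elsewhere in this
// crate; the function name is made up for the example.
fn _example_report_peer<T: EthSpec>(
    network_send: &mut tokio::sync::mpsc::UnboundedSender<NetworkMessage<T>>,
    peer_id: PeerId,
) {
    // Dropped send errors simply mean the network service has shut down.
    let _ = network_send.send(NetworkMessage::ReportPeer {
        peer_id,
        action: PeerAction::MidToleranceError,
    });
}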
/// Service that handles communication between internal services and the `eth2_libp2p` network service.
pub struct NetworkService<T: BeaconChainTypes> {
@ -200,12 +241,8 @@ fn spawn_service<T: BeaconChainTypes>(
expose_publish_metrics(&messages);
service.libp2p.swarm.publish(messages);
}
NetworkMessage::Disconnect { peer_id } => {
service.libp2p.disconnect_and_ban_peer(
peer_id,
std::time::Duration::from_secs(BAN_PEER_TIMEOUT),
);
}
NetworkMessage::ReportPeer { peer_id, action } => service.libp2p.report_peer(&peer_id, action),
NetworkMessage::GoodbyePeer { peer_id, reason } => service.libp2p.goodbye_peer(&peer_id, reason),
NetworkMessage::Subscribe { subscriptions } => {
if let Err(e) = service
.attestation_service
@ -240,16 +277,27 @@ fn spawn_service<T: BeaconChainTypes>(
// poll the swarm
match libp2p_event {
Libp2pEvent::Behaviour(event) => match event {
BehaviourEvent::PeerDialed(peer_id) => {
let _ = service
.router_send
.send(RouterMessage::PeerDialed(peer_id))
.map_err(|_| {
debug!(service.log, "Failed to send peer dialed to router"); });
},
BehaviourEvent::PeerConnected(_peer_id) => {
// A peer has connected to us
// We currently do not perform any action here.
},
BehaviourEvent::PeerDisconnected(peer_id) => {
let _ = service
.router_send
.send(RouterMessage::PeerDisconnected(peer_id))
.map_err(|_| {
debug!(service.log, "Failed to send peer disconnect to router");
});
},
BehaviourEvent::RequestReceived{peer_id, id, request} => {
if let Request::Goodbye(_) = request {
// if we received a Goodbye message, drop and ban the peer
//peers_to_ban.push(peer_id.clone());
// TODO: remove this: https://github.com/sigp/lighthouse/issues/1240
service.libp2p.disconnect_and_ban_peer(
peer_id.clone(),
std::time::Duration::from_secs(BAN_PEER_TIMEOUT),
);
};
let _ = service
.router_send
.send(RouterMessage::RPCRequestReceived{peer_id, id, request})
@ -328,25 +376,6 @@ fn spawn_service<T: BeaconChainTypes>(
Libp2pEvent::NewListenAddr(multiaddr) => {
service.network_globals.listen_multiaddrs.write().push(multiaddr);
}
Libp2pEvent::PeerConnected{ peer_id, endpoint,} => {
debug!(service.log, "Peer Connected"; "peer_id" => peer_id.to_string(), "endpoint" => format!("{:?}", endpoint));
if let eth2_libp2p::ConnectedPoint::Dialer { .. } = endpoint {
let _ = service
.router_send
.send(RouterMessage::PeerDialed(peer_id))
.map_err(|_| {
debug!(service.log, "Failed to send peer dialed to router"); });
}
}
Libp2pEvent::PeerDisconnected{ peer_id, endpoint,} => {
debug!(service.log, "Peer Disconnected"; "peer_id" => peer_id.to_string(), "endpoint" => format!("{:?}", endpoint));
let _ = service
.router_send
.send(RouterMessage::PeerDisconnected(peer_id))
.map_err(|_| {
debug!(service.log, "Failed to send peer disconnect to router");
});
}
}
}
}
@ -378,45 +407,6 @@ fn next_fork_delay<T: BeaconChainTypes>(
})
}
/// Types of messages that the network service can receive.
#[derive(Debug)]
pub enum NetworkMessage<T: EthSpec> {
/// Subscribes a list of validators to specific slots for attestation duties.
Subscribe {
subscriptions: Vec<ValidatorSubscription>,
},
/// Send an RPC request to the libp2p service.
SendRequest {
peer_id: PeerId,
request: Request,
request_id: RequestId,
},
/// Send a successful Response to the libp2p service.
SendResponse {
peer_id: PeerId,
response: Response<T>,
id: PeerRequestId,
},
/// Respond to a peer's request with an error.
SendError {
// TODO: note that this is never used, we just say goodbye without nicely closing the
// stream assigned to the request
peer_id: PeerId,
error: RPCResponseErrorCode,
reason: String,
id: PeerRequestId,
},
/// Publish a list of messages to the gossipsub protocol.
Publish { messages: Vec<PubsubMessage<T>> },
/// Propagate a received gossipsub message.
Propagate {
propagation_source: PeerId,
message_id: MessageId,
},
/// Disconnect and bans a peer id.
Disconnect { peer_id: PeerId },
}
/// Inspects the `messages` that were being sent to the network and updates Prometheus metrics.
fn expose_publish_metrics<T: EthSpec>(messages: &[PubsubMessage<T>]) {
for message in messages {

View File

@ -40,9 +40,9 @@ use super::range_sync::{BatchId, ChainId, RangeSync, EPOCHS_PER_BATCH};
use super::RequestId;
use crate::service::NetworkMessage;
use beacon_chain::{BeaconChain, BeaconChainTypes, BlockError};
use eth2_libp2p::rpc::{methods::MAX_REQUEST_BLOCKS, BlocksByRootRequest};
use eth2_libp2p::rpc::{methods::MAX_REQUEST_BLOCKS, BlocksByRootRequest, GoodbyeReason};
use eth2_libp2p::types::NetworkGlobals;
use eth2_libp2p::PeerId;
use eth2_libp2p::{PeerAction, PeerId};
use fnv::FnvHashMap;
use slog::{crit, debug, error, info, trace, warn, Logger};
use smallvec::SmallVec;
@ -347,10 +347,13 @@ impl<T: BeaconChainTypes> SyncManager<T> {
// stream termination for a single block lookup, remove the key
if let Some(single_block_request) = self.single_block_lookups.remove(&request_id) {
// the peer didn't respond with a block that it referenced
// The peer didn't respond with a block that it referenced.
// This can be allowed as some clients may implement pruning. We mildly
// tolerate this behaviour.
if !single_block_request.block_returned {
warn!(self.log, "Peer didn't respond with a block it referenced"; "referenced_block_hash" => format!("{}", single_block_request.hash), "peer_id" => format!("{}", peer_id));
self.network.downvote_peer(peer_id);
self.network
.report_peer(peer_id, PeerAction::MidToleranceError);
}
return;
}
@ -389,9 +392,10 @@ impl<T: BeaconChainTypes> SyncManager<T> {
) {
// verify the hash is correct and try and process the block
if expected_block_hash != block.canonical_root() {
// the peer that sent this, sent us the wrong block
// The peer that sent this, sent us the wrong block.
// We do not tolerate this behaviour. The peer is instantly disconnected and banned.
warn!(self.log, "Peer sent incorrect block for single block lookup"; "peer_id" => format!("{}", peer_id));
self.network.downvote_peer(peer_id);
self.network.goodbye_peer(peer_id, GoodbyeReason::Fault);
return;
}
@ -426,7 +430,10 @@ impl<T: BeaconChainTypes> SyncManager<T> {
}
outcome => {
warn!(self.log, "Single block lookup failed"; "outcome" => format!("{:?}", outcome));
self.network.downvote_peer(peer_id);
// This could be a range of errors. But we couldn't process the block.
// For now we consider this a mid tolerance error.
self.network
.report_peer(peer_id, PeerAction::MidToleranceError);
}
}
}
@ -624,8 +631,12 @@ impl<T: BeaconChainTypes> SyncManager<T> {
"expected_parent" => format!("{}", expected_hash),
);
// We try again, but downvote the peer.
self.request_parent(parent_request);
self.network.downvote_peer(peer);
// We do not tolerate these kinds of errors. We will accept a few but these are signs
// of a faulty peer.
self.network
.report_peer(peer, PeerAction::LowToleranceError);
} else {
// The last block in the queue is the only one that has not attempted to be processed yet.
//
@ -662,12 +673,18 @@ impl<T: BeaconChainTypes> SyncManager<T> {
// all else we consider the chain a failure and downvote the peer that sent
// us the last block
warn!(
self.log, "Invalid parent chain. Downvoting peer";
self.log, "Invalid parent chain";
"score_adjustment" => PeerAction::MidToleranceError.to_string(),
"outcome" => format!("{:?}", outcome),
"last_peer" => format!("{:?}", parent_request.last_submitted_peer),
"last_peer" => parent_request.last_submitted_peer.to_string(),
);
// This currently can be a host of errors. We permit this due to the partial
// ambiguity.
// TODO: Refine the error types and score the peer appropriately.
self.network.report_peer(
parent_request.last_submitted_peer,
PeerAction::MidToleranceError,
);
self.network
.downvote_peer(parent_request.last_submitted_peer);
return;
}
}
@ -774,7 +791,13 @@ impl<T: BeaconChainTypes> SyncManager<T> {
);
}
SyncMessage::ParentLookupFailed(peer_id) => {
self.network.downvote_peer(peer_id);
// A peer sent an object (block or attestation) that referenced a parent.
// On request for this parent the peer indicated it did not have this
// block.
// This is not fatal. Peers could prune old blocks, so we moderately
// tolerate this behaviour.
self.network
.report_peer(peer_id, PeerAction::MidToleranceError);
}
}
}

View File

@ -5,7 +5,7 @@ use crate::router::processor::status_message;
use crate::service::NetworkMessage;
use beacon_chain::{BeaconChain, BeaconChainTypes};
use eth2_libp2p::rpc::{BlocksByRangeRequest, BlocksByRootRequest, GoodbyeReason, RequestId};
use eth2_libp2p::{Client, NetworkGlobals, PeerId, Request};
use eth2_libp2p::{Client, NetworkGlobals, PeerAction, PeerId, Request};
use slog::{debug, trace, warn};
use std::sync::Arc;
use tokio::sync::mpsc;
@ -101,37 +101,20 @@ impl<T: EthSpec> SyncNetworkContext<T> {
self.send_rpc_request(peer_id, Request::BlocksByRoot(request))
}
pub fn downvote_peer(&mut self, peer_id: PeerId) {
debug!(
self.log,
"Peer downvoted";
"peer" => format!("{:?}", peer_id)
);
// TODO: Implement reputation
// TODO: what if we first close the channel sending a response
// RPCResponseErrorCode::InvalidRequest (or something)
// and then disconnect the peer? either request dc or let the behaviour have that logic
// itself
self.disconnect(peer_id, GoodbyeReason::Fault);
pub fn goodbye_peer(&mut self, peer_id: PeerId, reason: GoodbyeReason) {
self.network_send
.send(NetworkMessage::GoodbyePeer { peer_id, reason })
.unwrap_or_else(|_| {
warn!(self.log, "Could not report peer, channel failed");
});
}
fn disconnect(&mut self, peer_id: PeerId, reason: GoodbyeReason) {
warn!(
&self.log,
"Disconnecting peer (RPC)";
"reason" => format!("{:?}", reason),
"peer_id" => format!("{:?}", peer_id),
);
// ignore the error if the channel send fails
let _ = self.send_rpc_request(peer_id.clone(), Request::Goodbye(reason));
pub fn report_peer(&mut self, peer_id: PeerId, action: PeerAction) {
debug!(self.log, "Sync reporting peer"; "peer_id" => peer_id.to_string(), "action"=> action.to_string());
self.network_send
.send(NetworkMessage::Disconnect { peer_id })
.send(NetworkMessage::ReportPeer { peer_id, action })
.unwrap_or_else(|_| {
warn!(
self.log,
"Could not send a Disconnect to the network service"
)
warn!(self.log, "Could not report peer, channel failed");
});
}

View File

@ -3,7 +3,7 @@ use crate::sync::block_processor::{spawn_block_processor, BatchProcessResult, Pr
use crate::sync::network_context::SyncNetworkContext;
use crate::sync::{RequestId, SyncMessage};
use beacon_chain::{BeaconChain, BeaconChainTypes};
use eth2_libp2p::PeerId;
use eth2_libp2p::{PeerAction, PeerId};
use rand::prelude::*;
use slog::{crit, debug, warn};
use std::collections::HashSet;
@ -14,7 +14,7 @@ use types::{Epoch, EthSpec, Hash256, SignedBeaconBlock, Slot};
/// Blocks are downloaded in batches from peers. This constant specifies how many epochs worth of
/// blocks per batch are requested _at most_. A batch may request fewer blocks to account for
/// already requested slots. There is a timeout for each batch request. If this value is too high,
/// we will downvote peers with poor bandwidth. This can be set arbitrarily high, in which case the
/// we will negatively report peers with poor bandwidth. This can be set arbitrarily high, in which case the
/// responder will fill the response up to the max request size, assuming they have the bandwidth
/// to do so.
pub const EPOCHS_PER_BATCH: u64 = 2;
@ -27,7 +27,7 @@ const BATCH_BUFFER_SIZE: u8 = 5;
/// Invalid batches are attempted to be re-downloaded from other peers. If they cannot be processed
/// after `INVALID_BATCH_LOOKUP_ATTEMPTS` times, the chain is considered faulty and all peers will
/// be downvoted.
/// be reported negatively.
const INVALID_BATCH_LOOKUP_ATTEMPTS: u8 = 3;
#[derive(PartialEq)]
@ -192,7 +192,9 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
warn!(self.log, "BlocksByRange response returned out of range blocks";
"response_initial_slot" => first_slot,
"requested_initial_slot" => batch.start_slot);
network.downvote_peer(batch.current_peer);
// This is a pretty bad error. We don't consider this fatal, but we don't tolerate
// this much either.
network.report_peer(batch.current_peer, PeerAction::LowToleranceError);
self.to_be_processed_id = batch.id; // reset the id back to here; when incrementing, it will check against completed batches
return;
}
@ -363,14 +365,18 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
// check that we have not exceeded the re-process retry counter
if batch.reprocess_retries > INVALID_BATCH_LOOKUP_ATTEMPTS {
// if a batch has exceeded the invalid batch lookup attempts limit, it means
// If a batch has exceeded the invalid batch lookup attempts limit, it means
// that it is likely all peers in this chain are sending invalid batches
// repeatedly and are either malicious or faulty. We drop the chain and
// downvote all peers.
warn!(self.log, "Batch failed to download. Dropping chain and downvoting peers";
// report all peers.
// There are some edge cases with forks that could land us in this situation.
// This should be unlikely, so we tolerate these errors, but not often.
let action = PeerAction::LowToleranceError;
warn!(self.log, "Batch failed to download. Dropping chain scoring peers";
"score_adjustment" => action.to_string(),
"chain_id" => self.id, "id"=> *batch.id);
for peer_id in self.peer_pool.drain() {
network.downvote_peer(peer_id);
network.report_peer(peer_id, action);
}
ProcessingResult::RemoveChain
} else {
@ -389,14 +395,16 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
// check that we have not exceeded the re-process retry counter
if batch.reprocess_retries > INVALID_BATCH_LOOKUP_ATTEMPTS {
// if a batch has exceeded the invalid batch lookup attempts limit, it means
// If a batch has exceeded the invalid batch lookup attempts limit, it means
// that it is likely all peers in this chain are sending invalid batches
// repeatedly and are either malicious or faulty. We drop the chain and
// downvote all peers.
warn!(self.log, "Batch failed to download. Dropping chain and downvoting peers";
let action = PeerAction::LowToleranceError;
warn!(self.log, "Batch failed to download. Dropping chain scoring peers";
"score_adjustment" => action.to_string(),
"chain_id" => self.id, "id"=> *batch.id);
for peer_id in self.peer_pool.drain() {
network.downvote_peer(peer_id);
network.report_peer(peer_id, action);
}
ProcessingResult::RemoveChain
} else {
@ -437,18 +445,30 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
// The re-downloaded version was different
if processed_batch.current_peer != processed_batch.original_peer {
// A new peer sent the correct batch, the previous peer did not
// downvote the original peer
//
// If the same peer corrected its mistake, we allow it... for
// now.
// We negatively score the original peer.
let action = PeerAction::LowToleranceError;
debug!(
self.log, "Re-processed batch validated. Downvoting original peer";
self.log, "Re-processed batch validated. Scoring original peer";
"chain_id" => self.id,
"batch_id" => *processed_batch.id,
"score_adjustment" => action.to_string(),
"original_peer" => format!("{}",processed_batch.original_peer),
"new_peer" => format!("{}", processed_batch.current_peer)
);
network.downvote_peer(processed_batch.original_peer);
network.report_peer(processed_batch.original_peer, action);
} else {
// The same peer corrected its previous mistake. There was an error, so we
// negatively score the original peer.
let action = PeerAction::MidToleranceError;
debug!(
self.log, "Re-processed batch validated by the same peer.";
"chain_id" => self.id,
"batch_id" => *processed_batch.id,
"score_adjustment" => action.to_string(),
"original_peer" => format!("{}",processed_batch.original_peer),
"new_peer" => format!("{}", processed_batch.current_peer)
);
network.report_peer(processed_batch.original_peer, action);
}
}
}
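
Distilled into a standalone function, the scoring policy above reads roughly as follows. This is a simplified restatement for illustration, not the actual implementation; peer identifiers are reduced to strings, and only the two `PeerAction` variants used in this diff appear.

```rust
// Simplified restatement of the re-processed batch scoring policy.
#[derive(Debug, Clone, Copy, PartialEq)]
enum PeerAction {
    LowToleranceError,
    MidToleranceError,
}

/// Decide how to score the peer that originally served an invalid batch,
/// once a re-downloaded copy of that batch has been validated.
fn score_original_peer(original_peer: &str, correcting_peer: &str) -> PeerAction {
    if original_peer != correcting_peer {
        // Another peer supplied the correct batch: the original peer served
        // bad data and receives the harsher penalty.
        PeerAction::LowToleranceError
    } else {
        // The same peer corrected its own mistake: still an error, but milder.
        PeerAction::MidToleranceError
    }
}
```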

View File

@ -68,7 +68,7 @@ where
// update the timeout
self.update_timeout(&key, entry_duration);
} else {
let delay_key = self.expirations.insert(key.clone(), entry_duration.clone());
let delay_key = self.expirations.insert(key.clone(), entry_duration);
let entry = MapEntry {
key: delay_key,
value: Instant::now() + entry_duration,
@ -114,7 +114,7 @@ where
self.expirations.remove(&entry.key);
return true;
}
return false;
false
}
/// Retains only the elements specified by the predicate.
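
The hunk above touches an expiring-entry map. As a rough, self-contained sketch of the idea: each key is stored together with the instant at which it should lapse. The real implementation drives evictions with a delay queue rather than checking lazily on access, and all names below are illustrative assumptions.

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

// Minimal sketch of an expiring-entry set: entries lapse after a fixed
// timeout and are pruned lazily when queried.
struct ExpiringSet<K: std::hash::Hash + Eq> {
    entries: HashMap<K, Instant>,
    default_timeout: Duration,
}

impl<K: std::hash::Hash + Eq> ExpiringSet<K> {
    fn new(default_timeout: Duration) -> Self {
        Self { entries: HashMap::new(), default_timeout }
    }

    /// Insert or refresh a key, resetting its expiry.
    fn insert(&mut self, key: K) {
        self.entries.insert(key, Instant::now() + self.default_timeout);
    }

    /// Returns true if the key is present and not yet expired,
    /// removing it if it has expired.
    fn contains(&mut self, key: &K) -> bool {
        match self.entries.get(key) {
            Some(expiry) if *expiry > Instant::now() => true,
            Some(_) => {
                self.entries.remove(key);
                false
            }
            None => false,
        }
    }
}
```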