Adds a workaround for peer manager integration (#1003)

This commit is contained in:
Age Manning 2020-04-14 20:32:03 +10:00 committed by GitHub
parent e5874f4565
commit be7aaa3dbe
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 106 additions and 90 deletions

View File

@ -5,7 +5,7 @@ use crate::types::{GossipEncoding, GossipKind, GossipTopic};
use crate::{error, Enr, NetworkConfig, NetworkGlobals, PubsubMessage, TopicHash};
use futures::prelude::*;
use libp2p::{
core::identity::Keypair,
core::{identity::Keypair, ConnectedPoint},
discv5::Discv5Event,
gossipsub::{Gossipsub, GossipsubEvent, MessageId},
identify::{Identify, IdentifyEvent},
@ -366,6 +366,43 @@ impl<TSubstream: AsyncRead + AsyncWrite, TSpec: EthSpec>
{
fn inject_event(&mut self, event: RPCMessage<TSpec>) {
match event {
// TODO: These are temporary methods to give access to injected behaviour
// events to the
// peer manager. After a behaviour re-write remove these:
RPCMessage::PeerConnectedHack(peer_id, connected_point) => {
match connected_point {
ConnectedPoint::Dialer { .. } => self.peer_manager.connect_outgoing(&peer_id),
ConnectedPoint::Listener { .. } => self.peer_manager.connect_ingoing(&peer_id),
};
// Find ENR info about a peer if possible.
if let Some(enr) = self.discovery.enr_of_peer(&peer_id) {
let bitfield = match enr.bitfield::<TSpec>() {
Ok(v) => v,
Err(e) => {
warn!(self.log, "Peer has invalid ENR bitfield";
"peer_id" => format!("{}", peer_id),
"error" => format!("{:?}", e));
return;
}
};
// use this as a baseline, until we get the actual meta-data
let meta_data = MetaData {
seq_number: 0,
attnets: bitfield,
};
// TODO: Shift to the peer manager
self.network_globals
.peers
.write()
.add_metadata(&peer_id, meta_data);
}
}
RPCMessage::PeerDisconnectedHack(peer_id, _connected_point) => {
self.peer_manager.notify_disconnect(&peer_id)
}
RPCMessage::PeerDialed(peer_id) => {
self.events.push(BehaviourEvent::PeerDialed(peer_id))
}
@ -402,7 +439,6 @@ impl<TSubstream: AsyncRead + AsyncWrite, TSpec: EthSpec>
// propagate the STATUS message upwards
self.events.push(BehaviourEvent::RPC(peer_id, rpc_event));
}
_ => {
// propagate all other RPC messages upwards
self.events.push(BehaviourEvent::RPC(peer_id, rpc_event))

View File

@ -5,7 +5,6 @@ pub(crate) mod enr;
pub use enr::build_enr;
use crate::metrics;
use crate::rpc::MetaData;
use crate::{error, Enr, NetworkConfig, NetworkGlobals};
use enr::{Eth2Enr, BITFIELD_ENR_KEY, ETH2_ENR_KEY};
use futures::prelude::*;
@ -169,6 +168,11 @@ impl<TSubstream, TSpec: EthSpec> Discovery<TSubstream, TSpec> {
self.discovery.enr_entries()
}
/// Returns the ENR of a known peer if it exists.
pub fn enr_of_peer(&mut self, peer_id: &PeerId) -> Option<Enr> {
self.discovery.enr_of_peer(peer_id)
}
/// Adds/Removes a subnet from the ENR Bitfield
pub fn update_enr_bitfield(&mut self, subnet_id: SubnetId, value: bool) -> Result<(), String> {
let id = *subnet_id as usize;
@ -351,62 +355,9 @@ where
self.discovery.addresses_of_peer(peer_id)
}
fn inject_connected(&mut self, peer_id: PeerId, endpoint: ConnectedPoint) {
// TODO: Replace with PeerManager with custom behvaviour
// Find ENR info about a peer if possible.
fn inject_connected(&mut self, _peer_id: PeerId, _endpoint: ConnectedPoint) {}
match endpoint {
ConnectedPoint::Dialer { .. } => {
self.network_globals
.peers
.write()
.connect_outgoing(&peer_id);
}
ConnectedPoint::Listener { .. } => {
self.network_globals.peers.write().connect_ingoing(&peer_id);
}
}
if let Some(enr) = self.discovery.enr_of_peer(&peer_id) {
let bitfield = match enr.bitfield::<TSpec>() {
Ok(v) => v,
Err(e) => {
warn!(self.log, "Peer has invalid ENR bitfield";
"peer_id" => format!("{}", peer_id),
"error" => format!("{:?}", e));
return;
}
};
// use this as a baseline, until we get the actual meta-data
let meta_data = MetaData {
seq_number: 0,
attnets: bitfield,
};
self.network_globals
.peers
.write()
.add_metadata(&peer_id, meta_data);
}
// TODO: Drop peers if over max_peer limit
metrics::inc_counter(&metrics::PEER_CONNECT_EVENT_COUNT);
metrics::set_gauge(
&metrics::PEERS_CONNECTED,
self.network_globals.connected_peers() as i64,
);
}
fn inject_disconnected(&mut self, peer_id: &PeerId, _endpoint: ConnectedPoint) {
self.network_globals.peers.write().disconnect(peer_id);
metrics::inc_counter(&metrics::PEER_DISCONNECT_EVENT_COUNT);
metrics::set_gauge(
&metrics::PEERS_CONNECTED,
self.network_globals.connected_peers() as i64,
);
}
fn inject_disconnected(&mut self, _peer_id: &PeerId, _endpoint: ConnectedPoint) {}
fn inject_replaced(
&mut self,

View File

@ -1,6 +1,7 @@
//! Implementation of a Lighthouse's peer management system.
pub use self::peerdb::*;
use crate::metrics;
use crate::rpc::MetaData;
use crate::{NetworkGlobals, PeerId};
use futures::prelude::*;
@ -172,57 +173,41 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
false
}
/// Sets a peer as disconnected. If its reputation gets too low requests
/// the peer to be banned and to be disconnected otherwise
pub fn disconnect(&mut self, peer_id: &PeerId) {
/// Requests that a peer get disconnected.
pub fn disconnect_peer(&mut self, peer_id: &PeerId) {
self.events
.push(PeerManagerEvent::DisconnectPeer(peer_id.clone()));
}
/// Updates the state of the peer as disconnected.
pub fn notify_disconnect(&mut self, peer_id: &PeerId) {
self.update_reputations();
{
let mut peerdb = self.network_globals.peers.write();
peerdb.disconnect(peer_id);
peerdb.add_reputation(peer_id, PeerAction::Disconnected as Rep);
}
if !self.gets_banned(peer_id) {
self.events
.push(PeerManagerEvent::DisconnectPeer(peer_id.clone()));
}
// remove the ping and status timer for the peer
self.ping_peers.remove(peer_id);
self.status_peers.remove(peer_id);
metrics::inc_counter(&metrics::PEER_DISCONNECT_EVENT_COUNT);
metrics::set_gauge(
&metrics::PEERS_CONNECTED,
self.network_globals.connected_peers() as i64,
);
}
/// Sets a peer as connected as long as their reputation allows it
/// Informs if the peer was accepted
pub fn connect_ingoing(&mut self, peer_id: &PeerId) -> bool {
self.update_reputations();
let mut peerdb = self.network_globals.peers.write();
if !peerdb.connection_status(peer_id).is_banned() {
peerdb.connect_ingoing(peer_id);
// start a ping and status timer for the peer
self.ping_peers.insert(peer_id.clone());
self.status_peers.insert(peer_id.clone());
return true;
}
false
self.connect_peer(peer_id, false)
}
/// Sets a peer as connected as long as their reputation allows it
/// Informs if the peer was accepted
pub fn connect_outgoing(&mut self, peer_id: &PeerId) -> bool {
self.update_reputations();
let mut peerdb = self.network_globals.peers.write();
if !peerdb.connection_status(peer_id).is_banned() {
peerdb.connect_outgoing(peer_id);
// start a ping and status timer for the peer
self.ping_peers.insert(peer_id.clone());
self.status_peers.insert(peer_id.clone());
return true;
}
false
self.connect_peer(peer_id, true)
}
/// Provides a given peer's reputation if it exists.
@ -256,6 +241,46 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
.add_reputation(peer_id, action as Rep);
self.update_reputations();
}
/* Internal functions */
/// Registers a peer as connected. The `ingoing` parameter determines if the peer is being
/// dialed or connecting to us.
///
/// This is called by `connect_ingoing` and `connect_outgoing`.
///
/// This informs if the peer was accepted in to the db or not.
// TODO: Drop peers if over max_peer limit
fn connect_peer(&mut self, peer_id: &PeerId, outgoing: bool) -> bool {
// TODO: Call this on a timer
self.update_reputations();
{
let mut peerdb = self.network_globals.peers.write();
if peerdb.connection_status(peer_id).is_banned() {
return false;
}
if outgoing {
peerdb.connect_outgoing(peer_id);
} else {
peerdb.connect_outgoing(peer_id);
}
}
// start a ping and status timer for the peer
self.ping_peers.insert(peer_id.clone());
self.status_peers.insert(peer_id.clone());
// increment prometheus metrics
metrics::inc_counter(&metrics::PEER_CONNECT_EVENT_COUNT);
metrics::set_gauge(
&metrics::PEERS_CONNECTED,
self.network_globals.connected_peers() as i64,
);
true
}
}
impl<TSpec: EthSpec> Stream for PeerManager<TSpec> {

View File

@ -140,7 +140,7 @@ impl<TSpec: EthSpec> PeerDB<TSpec> {
.map(|(id, _)| id)
}
/// Gets the connection status of the peer.
/// Returns the peer's connection status. Returns unknown if the peer is not in the DB.
pub fn connection_status(&self, peer_id: &PeerId) -> PeerConnectionStatus {
self.peer_info(peer_id)
.map_or(PeerConnectionStatus::default(), |info| {

View File

@ -175,4 +175,8 @@ pub enum RPCMessage<TSpec: EthSpec> {
RPC(PeerId, RPCEvent<TSpec>),
PeerDialed(PeerId),
PeerDisconnected(PeerId),
// TODO: This is a hack to give access to connections to peer manager. Remove this once
// behaviour is re-written
PeerConnectedHack(PeerId, ConnectedPoint),
PeerDisconnectedHack(PeerId, ConnectedPoint),
}