Add trusted peers (#1640)

## Issue Addressed

Closes #1581 

## Proposed Changes

Adds a new cli option for trusted peers who always have the maximum possible score.
This commit is contained in:
Pawan Dhananjay 2020-09-22 01:12:36 +00:00
parent 5d17eb899f
commit 14ff38539c
11 changed files with 190 additions and 39 deletions

View File

@ -1,5 +1,5 @@
use crate::types::GossipKind;
use crate::Enr;
use crate::{Enr, PeerIdSerialized};
use discv5::{Discv5Config, Discv5ConfigBuilder};
use libp2p::gossipsub::{
GossipsubConfig, GossipsubConfigBuilder, GossipsubMessage, MessageId, ValidationMode,
@ -58,6 +58,9 @@ pub struct Config {
/// List of libp2p nodes to initially connect to.
pub libp2p_nodes: Vec<Multiaddr>,
/// List of trusted libp2p nodes which are not scored.
pub trusted_peers: Vec<PeerIdSerialized>,
/// Client version
pub client_version: String,
@ -139,6 +142,7 @@ impl Default for Config {
boot_nodes_enr: vec![],
boot_nodes_multiaddr: vec![],
libp2p_nodes: vec![],
trusted_peers: vec![],
client_version: lighthouse_version::version_with_platform(),
disable_discovery: false,
topics,

View File

@ -14,6 +14,50 @@ pub mod rpc;
mod service;
pub mod types;
use serde::{de, Deserialize, Deserializer, Serialize, Serializer};
use std::str::FromStr;
/// Wrapper over a libp2p `PeerId` which implements `Serialize` and `Deserialize`
#[derive(Clone, Debug)]
pub struct PeerIdSerialized(libp2p::PeerId);
impl From<PeerIdSerialized> for PeerId {
fn from(peer_id: PeerIdSerialized) -> Self {
peer_id.0
}
}
impl FromStr for PeerIdSerialized {
type Err = String;
fn from_str(s: &str) -> Result<Self, Self::Err> {
Ok(Self(
PeerId::from_str(s).map_err(|e| format!("Invalid peer id: {}", e))?,
))
}
}
impl Serialize for PeerIdSerialized {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
serializer.serialize_str(&self.0.to_string())
}
}
impl<'de> Deserialize<'de> for PeerIdSerialized {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: Deserializer<'de>,
{
let s: String = Deserialize::deserialize(deserializer)?;
Ok(Self(PeerId::from_str(&s).map_err(|e| {
de::Error::custom(format!("Failed to deserialise peer id: {:?}", e))
})?))
}
}
pub use crate::types::{error, Enr, GossipTopic, NetworkGlobals, PubsubMessage, SubnetDiscovery};
pub use behaviour::{BehaviourEvent, PeerRequestId, Request, Response};
pub use config::Config as NetworkConfig;

View File

@ -133,9 +133,9 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
pub fn goodbye_peer(&mut self, peer_id: &PeerId, reason: GoodbyeReason) {
// get the peer info
if let Some(info) = self.network_globals.peers.write().peer_info_mut(peer_id) {
debug!(self.log, "Sending goodbye to peer"; "peer_id" => peer_id.to_string(), "reason" => reason.to_string(), "score" => info.score.to_string());
debug!(self.log, "Sending goodbye to peer"; "peer_id" => peer_id.to_string(), "reason" => reason.to_string(), "score" => info.score().to_string());
// Goodbye's are fatal
info.score.apply_peer_action(PeerAction::Fatal);
info.apply_peer_action_to_score(PeerAction::Fatal);
if info.connection_status.is_connected_or_dialing() {
self.events
.push(PeerManagerEvent::DisconnectPeer(peer_id.clone(), reason));
@ -155,12 +155,12 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
let mut unban_peer = None;
if let Some(info) = self.network_globals.peers.write().peer_info_mut(peer_id) {
let previous_state = info.score.state();
info.score.apply_peer_action(action);
if previous_state != info.score.state() {
match info.score.state() {
let previous_state = info.score_state();
info.apply_peer_action_to_score(action);
if previous_state != info.score_state() {
match info.score_state() {
ScoreState::Banned => {
debug!(self.log, "Peer has been banned"; "peer_id" => peer_id.to_string(), "score" => info.score.to_string());
debug!(self.log, "Peer has been banned"; "peer_id" => peer_id.to_string(), "score" => info.score().to_string());
ban_peer = Some(peer_id.clone());
if info.connection_status.is_connected_or_dialing() {
self.events.push(PeerManagerEvent::DisconnectPeer(
@ -170,7 +170,7 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
}
}
ScoreState::Disconnected => {
debug!(self.log, "Peer transitioned to disconnect state"; "peer_id" => peer_id.to_string(), "score" => info.score.to_string(), "past_state" => previous_state.to_string());
debug!(self.log, "Peer transitioned to disconnect state"; "peer_id" => peer_id.to_string(), "score" => info.score().to_string(), "past_state" => previous_state.to_string());
// disconnect the peer if it's currently connected or dialing
unban_peer = Some(peer_id.clone());
if info.connection_status.is_connected_or_dialing() {
@ -182,13 +182,13 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
// TODO: Update the peer manager to inform that the peer is disconnecting.
}
ScoreState::Healthy => {
debug!(self.log, "Peer transitioned to healthy state"; "peer_id" => peer_id.to_string(), "score" => info.score.to_string(), "past_state" => previous_state.to_string());
debug!(self.log, "Peer transitioned to healthy state"; "peer_id" => peer_id.to_string(), "score" => info.score().to_string(), "past_state" => previous_state.to_string());
// unban the peer if it was previously banned.
unban_peer = Some(peer_id.clone());
}
}
} else {
debug!(self.log, "Peer score adjusted"; "peer_id" => peer_id.to_string(), "score" => info.score.to_string());
debug!(self.log, "Peer score adjusted"; "peer_id" => peer_id.to_string(), "score" => info.score().to_string());
}
}
@ -689,9 +689,9 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
let mut to_unban_peers = Vec::new();
for (peer_id, info) in pdb.peers_mut() {
let previous_state = info.score.state();
let previous_state = info.score_state();
// Update scores
info.score.update();
info.score_update();
/* TODO: Implement logic about connection lifetimes
match info.connection_status {
@ -746,10 +746,10 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
*/
// handle score transitions
if previous_state != info.score.state() {
match info.score.state() {
if previous_state != info.score_state() {
match info.score_state() {
ScoreState::Banned => {
debug!(self.log, "Peer has been banned"; "peer_id" => peer_id.to_string(), "score" => info.score.to_string());
debug!(self.log, "Peer has been banned"; "peer_id" => peer_id.to_string(), "score" => info.score().to_string());
to_ban_peers.push(peer_id.clone());
if info.connection_status.is_connected_or_dialing() {
self.events.push(PeerManagerEvent::DisconnectPeer(
@ -759,7 +759,7 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
}
}
ScoreState::Disconnected => {
debug!(self.log, "Peer transitioned to disconnect state"; "peer_id" => peer_id.to_string(), "score" => info.score.to_string(), "past_state" => previous_state.to_string());
debug!(self.log, "Peer transitioned to disconnect state"; "peer_id" => peer_id.to_string(), "score" => info.score().to_string(), "past_state" => previous_state.to_string());
// disconnect the peer if it's currently connected or dialing
to_unban_peers.push(peer_id.clone());
if info.connection_status.is_connected_or_dialing() {
@ -771,7 +771,7 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
// TODO: Update peer manager to report that it's disconnecting.
}
ScoreState::Healthy => {
debug!(self.log, "Peer transitioned to healthy state"; "peer_id" => peer_id.to_string(), "score" => info.score.to_string(), "past_state" => previous_state.to_string());
debug!(self.log, "Peer transitioned to healthy state"; "peer_id" => peer_id.to_string(), "score" => info.score().to_string(), "past_state" => previous_state.to_string());
// unban the peer if it was previously banned.
to_unban_peers.push(peer_id.clone());
}
@ -821,7 +821,7 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
.take(connected_peer_count - self.target_peers)
//we only need to disconnect peers with healthy scores, since the others got already
//disconnected in update_peer_scores
.filter(|(_, info)| info.score.state() == ScoreState::Healthy)
.filter(|(_, info)| info.score_state() == ScoreState::Healthy)
{
self.events.push(PeerManagerEvent::DisconnectPeer(
(*peer_id).clone(),

View File

@ -1,5 +1,5 @@
use super::client::Client;
use super::score::Score;
use super::score::{PeerAction, Score, ScoreState};
use super::PeerSyncStatus;
use crate::rpc::MetaData;
use crate::Multiaddr;
@ -19,7 +19,7 @@ pub struct PeerInfo<T: EthSpec> {
/// The connection status of the peer
_status: PeerStatus,
/// The peers reputation
pub score: Score,
score: Score,
/// Client managing this peer
pub client: Client,
/// Connection status of this peer
@ -36,6 +36,8 @@ pub struct PeerInfo<T: EthSpec> {
/// necessary.
#[serde(skip)]
pub min_ttl: Option<Instant>,
/// Is the peer a trusted peer.
pub is_trusted: bool,
}
impl<TSpec: EthSpec> Default for PeerInfo<TSpec> {
@ -49,11 +51,21 @@ impl<TSpec: EthSpec> Default for PeerInfo<TSpec> {
sync_status: PeerSyncStatus::Unknown,
meta_data: None,
min_ttl: None,
is_trusted: false,
}
}
}
impl<T: EthSpec> PeerInfo<T> {
/// Return a PeerInfo struct for a trusted peer.
pub fn trusted_peer_info() -> Self {
PeerInfo {
score: Score::max_score(),
is_trusted: true,
..Default::default()
}
}
/// Returns if the peer is subscribed to a given `SubnetId`
pub fn on_subnet(&self, subnet_id: SubnetId) -> bool {
if let Some(meta_data) = &self.meta_data {
@ -69,6 +81,38 @@ impl<T: EthSpec> PeerInfo<T> {
pub fn has_future_duty(&self) -> bool {
self.min_ttl.map_or(false, |i| i >= Instant::now())
}
/// Returns score of the peer.
pub fn score(&self) -> Score {
self.score
}
/// Returns the state of the peer based on the score.
pub(crate) fn score_state(&self) -> ScoreState {
self.score.state()
}
/// Applies decay rates to a non-trusted peer's score.
pub fn score_update(&mut self) {
if !self.is_trusted {
self.score.update()
}
}
/// Apply peer action to a non-trusted peer's score.
pub fn apply_peer_action_to_score(&mut self, peer_action: PeerAction) {
if !self.is_trusted {
self.score.apply_peer_action(peer_action)
}
}
#[cfg(test)]
/// Add an f64 to a non-trusted peer's score abiding by the limits.
pub fn add_to_score(&mut self, score: f64) {
if !self.is_trusted {
self.score.add(score)
}
}
}
#[derive(Clone, Debug, Serialize)]

View File

@ -86,12 +86,17 @@ impl BannedPeersCount {
}
impl<TSpec: EthSpec> PeerDB<TSpec> {
pub fn new(log: &slog::Logger) -> Self {
pub fn new(trusted_peers: Vec<PeerId>, log: &slog::Logger) -> Self {
// Initialize the peers hashmap with trusted peers
let peers = trusted_peers
.into_iter()
.map(|peer_id| (peer_id, PeerInfo::trusted_peer_info()))
.collect();
Self {
log: log.clone(),
disconnected_peers: 0,
banned_peers_count: BannedPeersCount::new(),
peers: HashMap::new(),
peers,
}
}
@ -101,7 +106,7 @@ impl<TSpec: EthSpec> PeerDB<TSpec> {
pub fn score(&self, peer_id: &PeerId) -> Score {
self.peers
.get(peer_id)
.map_or(Score::default(), |info| info.score)
.map_or(Score::default(), |info| info.score())
}
/// Returns an iterator over all peers in the db.
@ -159,7 +164,7 @@ impl<TSpec: EthSpec> PeerDB<TSpec> {
/// Returns true if the Peer is banned.
pub fn is_banned(&self, peer_id: &PeerId) -> bool {
if let Some(peer) = self.peers.get(peer_id) {
match peer.score.state() {
match peer.score().state() {
ScoreState::Banned => true,
_ => self.ip_is_banned(peer),
}
@ -181,7 +186,7 @@ impl<TSpec: EthSpec> PeerDB<TSpec> {
/// Returns true if the Peer is either banned or in the disconnected state.
pub fn is_banned_or_disconnected(&self, peer_id: &PeerId) -> bool {
if let Some(peer) = self.peers.get(peer_id) {
match peer.score.state() {
match peer.score().state() {
ScoreState::Banned | ScoreState::Disconnected => true,
_ => self.ip_is_banned(peer),
}
@ -264,7 +269,7 @@ impl<TSpec: EthSpec> PeerDB<TSpec> {
.collect::<Vec<_>>();
connected.shuffle(&mut rand::thread_rng());
connected.sort_by_key(|(_, info)| info.score);
connected.sort_by_key(|(_, info)| info.score());
connected
}
@ -279,7 +284,7 @@ impl<TSpec: EthSpec> PeerDB<TSpec> {
.iter()
.filter(|(_, info)| is_status(&info.connection_status))
.collect::<Vec<_>>();
by_status.sort_by_key(|(_, info)| info.score);
by_status.sort_by_key(|(_, info)| info.score());
by_status.into_iter().rev().collect()
}
@ -291,7 +296,7 @@ impl<TSpec: EthSpec> PeerDB<TSpec> {
self.peers
.iter()
.filter(|(_, info)| is_status(&info.connection_status))
.max_by_key(|(_, info)| info.score)
.max_by_key(|(_, info)| info.score())
.map(|(id, _)| id)
}
@ -455,8 +460,8 @@ impl<TSpec: EthSpec> PeerDB<TSpec> {
.filter(|(_, info)| info.connection_status.is_banned())
.min_by(|(_, info_a), (_, info_b)| {
info_a
.score
.partial_cmp(&info_b.score)
.score()
.partial_cmp(&info_b.score())
.unwrap_or(std::cmp::Ordering::Equal)
}) {
self.banned_peers_count
@ -485,8 +490,8 @@ impl<TSpec: EthSpec> PeerDB<TSpec> {
.filter(|(_, info)| info.connection_status.is_disconnected())
.min_by(|(_, info_a), (_, info_b)| {
info_a
.score
.partial_cmp(&info_b.score)
.score()
.partial_cmp(&info_b.score())
.unwrap_or(std::cmp::Ordering::Equal)
})
.map(|(id, _)| id.clone())
@ -543,13 +548,13 @@ mod tests {
fn add_score<TSpec: EthSpec>(db: &mut PeerDB<TSpec>, peer_id: &PeerId, score: f64) {
if let Some(info) = db.peer_info_mut(peer_id) {
info.score.add(score);
info.add_to_score(score);
}
}
fn get_db() -> PeerDB<M> {
let log = build_log(slog::Level::Debug, false);
PeerDB::new(&log)
PeerDB::new(vec![], &log)
}
#[test]
@ -938,4 +943,28 @@ mod tests {
assert!(pdb.is_banned(&p1));
assert!(!pdb.is_banned(&p2));
}
#[test]
fn test_trusted_peers_score() {
let trusted_peer = PeerId::random();
let log = build_log(slog::Level::Debug, false);
let mut pdb: PeerDB<M> = PeerDB::new(vec![trusted_peer.clone()], &log);
pdb.connect_ingoing(&trusted_peer);
// Check trusted status and score
assert!(pdb.peer_info(&trusted_peer).unwrap().is_trusted);
assert_eq!(
pdb.peer_info(&trusted_peer).unwrap().score().score(),
Score::max_score().score()
);
// Adding/Subtracting score should have no effect on a trusted peer
add_score(&mut pdb, &trusted_peer, -50.0);
assert_eq!(
pdb.peer_info(&trusted_peer).unwrap().score().score(),
Score::max_score().score()
);
}
}

View File

@ -145,6 +145,13 @@ impl std::fmt::Display for ScoreState {
}
impl Score {
/// Return max possible score.
pub fn max_score() -> Self {
Score {
score: MAX_SCORE,
last_updated: Instant::now(),
}
}
/// Access to the underlying score.
pub fn score(&self) -> f64 {
self.score

View File

@ -84,6 +84,11 @@ impl<TSpec: EthSpec> Service<TSpec> {
config.libp2p_port,
config.discovery_port,
meta_data,
config
.trusted_peers
.iter()
.map(|x| PeerId::from(x.clone()))
.collect(),
&log,
));

View File

@ -37,6 +37,7 @@ impl<TSpec: EthSpec> NetworkGlobals<TSpec> {
tcp_port: u16,
udp_port: u16,
local_metadata: MetaData<TSpec>,
trusted_peers: Vec<PeerId>,
log: &slog::Logger,
) -> Self {
NetworkGlobals {
@ -45,8 +46,8 @@ impl<TSpec: EthSpec> NetworkGlobals<TSpec> {
listen_multiaddrs: RwLock::new(Vec::new()),
listen_port_tcp: AtomicU16::new(tcp_port),
listen_port_udp: AtomicU16::new(udp_port),
peers: RwLock::new(PeerDB::new(log)),
local_metadata: RwLock::new(local_metadata),
peers: RwLock::new(PeerDB::new(trusted_peers, log)),
gossipsub_subscriptions: RwLock::new(HashSet::new()),
sync_state: RwLock::new(SyncState::Stalled),
}

View File

@ -111,7 +111,7 @@ mod tests {
};
let network_globals: NetworkGlobals<MinimalEthSpec> =
NetworkGlobals::new(enr, 0, 0, meta_data, &log);
NetworkGlobals::new(enr, 0, 0, meta_data, vec![], &log);
AttestationService::new(beacon_chain, Arc::new(network_globals), &log)
}

View File

@ -131,7 +131,13 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> {
.help("Disables the discv5 discovery protocol. The node will not search for new peers or participate in the discovery protocol.")
.takes_value(false),
)
.arg(
Arg::with_name("trusted-peers")
.long("trusted-peers")
.value_name("TRUSTED_PEERS")
.help("One or more comma-delimited trusted peer ids which always have the highest score according to the peer scoring system.")
.takes_value(true),
)
/* REST API related arguments */
.arg(
Arg::with_name("http")

View File

@ -2,7 +2,7 @@ use beacon_chain::builder::PUBKEY_CACHE_FILENAME;
use clap::ArgMatches;
use clap_utils::BAD_TESTNET_DIR_MESSAGE;
use client::{config::DEFAULT_DATADIR, ClientConfig, ClientGenesis};
use eth2_libp2p::{multiaddr::Protocol, Enr, Multiaddr, NetworkConfig};
use eth2_libp2p::{multiaddr::Protocol, Enr, Multiaddr, NetworkConfig, PeerIdSerialized};
use eth2_testnet_config::Eth2TestnetConfig;
use slog::{crit, info, Logger};
use ssz::Encode;
@ -343,6 +343,17 @@ pub fn set_network_config(
.collect::<Result<Vec<Multiaddr>, _>>()?;
}
if let Some(trusted_peers_str) = cli_args.value_of("trusted-peers") {
config.trusted_peers = trusted_peers_str
.split(',')
.map(|peer_id| {
peer_id
.parse()
.map_err(|_| format!("Invalid trusted peer id: {}", peer_id))
})
.collect::<Result<Vec<PeerIdSerialized>, _>>()?;
}
if let Some(enr_udp_port_str) = cli_args.value_of("enr-udp-port") {
config.enr_udp_port = Some(
enr_udp_port_str