handle peer state transitions on gossipsub score changes + refactoring (#1892)

## Issue Addressed

NA

## Proposed Changes

Correctly handles peer state transitions on gossipsub changes + refactors handling of peer state transitions into one function used for lighthouse score changes and gossipsub score changes.


Co-authored-by: Age Manning <Age@AgeManning.com>
This commit is contained in:
blacktemplar 2020-11-13 03:15:03 +00:00
parent cb26c15eb6
commit c7ac967d5a
2 changed files with 130 additions and 108 deletions

View File

@ -163,55 +163,30 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
/// If the peer doesn't exist, log a warning and insert defaults.
pub fn report_peer(&mut self, peer_id: &PeerId, action: PeerAction) {
// Helper function to avoid any potential deadlocks.
let mut ban_peer = None;
let mut unban_peer = None;
let mut to_ban_peers = Vec::with_capacity(1);
let mut to_unban_peers = Vec::with_capacity(1);
{
let mut peer_db = self.network_globals.peers.write();
if let Some(info) = peer_db.peer_info_mut(peer_id) {
let previous_state = info.score_state();
info.apply_peer_action_to_score(action);
Self::handle_score_transitions(
previous_state,
peer_id,
info,
&mut to_ban_peers,
&mut to_unban_peers,
&mut self.events,
&self.log,
);
if previous_state != info.score_state() {
match info.score_state() {
ScoreState::Banned => {
debug!(self.log, "Peer has been banned"; "peer_id" => peer_id.to_string(), "score" => info.score().to_string());
ban_peer = Some(peer_id);
}
ScoreState::Disconnected => {
debug!(self.log, "Peer transitioned to disconnect state"; "peer_id" => peer_id.to_string(), "score" => info.score().to_string(), "past_state" => previous_state.to_string());
// disconnect the peer if it's currently connected or dialing
if info.is_connected_or_dialing() {
self.events.push(PeerManagerEvent::DisconnectPeer(
peer_id.clone(),
GoodbyeReason::BadScore,
));
peer_db.notify_disconnecting(peer_id);
} else if info.is_banned() {
unban_peer = Some(peer_id);
}
}
ScoreState::Healthy => {
debug!(self.log, "Peer transitioned to healthy state"; "peer_id" => peer_id.to_string(), "score" => info.score().to_string(), "past_state" => previous_state.to_string());
// unban the peer if it was previously banned.
if info.is_banned() {
unban_peer = Some(peer_id);
}
}
}
} else {
debug!(self.log, "Peer score adjusted"; "peer_id" => peer_id.to_string(), "score" => info.score().to_string());
}
}
} // end write lock
if let Some(peer_id) = ban_peer {
self.ban_peer(peer_id);
}
if let Some(peer_id) = unban_peer {
if let Err(e) = self.unban_peer(peer_id) {
error!(self.log, "{}", e; "peer_id" => %peer_id);
}
}
self.ban_and_unban_peers(to_ban_peers, to_unban_peers);
}
/* Discovery Requests */
@ -528,31 +503,56 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
}
pub(crate) fn update_gossipsub_scores(&mut self, gossipsub: &Gossipsub) {
//collect peers with scores
let mut guard = self.network_globals.peers.write();
let mut peers: Vec<_> = guard
.peers_mut()
.filter_map(|(peer_id, info)| gossipsub.peer_score(peer_id).map(|score| (info, score)))
.collect();
let mut to_ban_peers = Vec::new();
let mut to_unban_peers = Vec::new();
// sort descending by score
peers.sort_unstable_by(|(.., s1), (.., s2)| s2.partial_cmp(s1).unwrap_or(Ordering::Equal));
{
//collect peers with scores
let mut guard = self.network_globals.peers.write();
let mut peers: Vec<_> = guard
.peers_mut()
.filter_map(|(peer_id, info)| {
gossipsub
.peer_score(peer_id)
.map(|score| (peer_id, info, score))
})
.collect();
let mut to_ignore_negative_peers =
(self.target_peers as f32 * ALLOWED_NEGATIVE_GOSSIPSUB_FACTOR).ceil() as usize;
for (info, score) in peers {
info.update_gossipsub_score(
score,
if score < 0.0 && to_ignore_negative_peers > 0 {
to_ignore_negative_peers -= 1;
// We ignore the negative score for the best negative peers so that their
// gossipsub score can recover without getting disconnected.
true
} else {
false
},
);
}
// sort descending by score
peers.sort_unstable_by(|(.., s1), (.., s2)| {
s2.partial_cmp(s1).unwrap_or(Ordering::Equal)
});
let mut to_ignore_negative_peers =
(self.target_peers as f32 * ALLOWED_NEGATIVE_GOSSIPSUB_FACTOR).ceil() as usize;
for (peer_id, info, score) in peers {
let previous_state = info.score_state();
info.update_gossipsub_score(
score,
if score < 0.0 && to_ignore_negative_peers > 0 {
to_ignore_negative_peers -= 1;
// We ignore the negative score for the best negative peers so that their
// gossipsub score can recover without getting disconnected.
true
} else {
false
},
);
Self::handle_score_transitions(
previous_state,
peer_id,
info,
&mut to_ban_peers,
&mut to_unban_peers,
&mut self.events,
&self.log,
);
}
} // end write lock
self.ban_and_unban_peers(to_ban_peers, to_unban_peers);
}
/* Internal functions */
@ -693,6 +693,59 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
true
}
fn handle_score_transitions(
previous_state: ScoreState,
peer_id: &PeerId,
info: &mut PeerInfo<TSpec>,
to_ban_peers: &mut Vec<PeerId>,
to_unban_peers: &mut Vec<PeerId>,
events: &mut SmallVec<[PeerManagerEvent; 16]>,
log: &slog::Logger,
) {
if previous_state != info.score_state() {
match info.score_state() {
ScoreState::Banned => {
debug!(log, "Peer has been banned"; "peer_id" => peer_id.to_string(), "score" => info.score().to_string());
to_ban_peers.push(peer_id.clone());
}
ScoreState::Disconnected => {
debug!(log, "Peer transitioned to disconnect state"; "peer_id" => peer_id.to_string(), "score" => info.score().to_string(), "past_state" => previous_state.to_string());
// disconnect the peer if it's currently connected or dialing
if info.is_connected_or_dialing() {
// Change the state to inform that we are disconnecting the peer.
info.disconnecting(false);
events.push(PeerManagerEvent::DisconnectPeer(
peer_id.clone(),
GoodbyeReason::BadScore,
));
} else if info.is_banned() {
to_unban_peers.push(peer_id.clone());
}
}
ScoreState::Healthy => {
debug!(log, "Peer transitioned to healthy state"; "peer_id" => peer_id.to_string(), "score" => info.score().to_string(), "past_state" => previous_state.to_string());
// unban the peer if it was previously banned.
if info.is_banned() {
to_unban_peers.push(peer_id.clone());
}
}
}
}
}
fn ban_and_unban_peers(&mut self, to_ban_peers: Vec<PeerId>, to_unban_peers: Vec<PeerId>) {
// process banning peers
for peer_id in to_ban_peers {
self.ban_peer(&peer_id);
}
// process unbanning peers
for peer_id in to_unban_peers {
if let Err(e) = self.unban_peer(&peer_id) {
error!(self.log, "{}", e; "peer_id" => %peer_id);
}
}
}
/// Updates the scores of known peers according to their connection
/// status and the time that has passed.
/// NOTE: This is experimental and will likely be adjusted
@ -707,47 +760,17 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
// Update scores
info.score_update();
// handle score transitions
if previous_state != info.score_state() {
match info.score_state() {
ScoreState::Banned => {
debug!(self.log, "Peer has been banned"; "peer_id" => peer_id.to_string(), "score" => info.score().to_string());
to_ban_peers.push(peer_id.clone());
}
ScoreState::Disconnected => {
debug!(self.log, "Peer transitioned to disconnect state"; "peer_id" => peer_id.to_string(), "score" => info.score().to_string(), "past_state" => previous_state.to_string());
// disconnect the peer if it's currently connected or dialing
if info.is_connected_or_dialing() {
// Change the state to inform that we are disconnecting the peer.
info.disconnecting(false);
self.events.push(PeerManagerEvent::DisconnectPeer(
peer_id.clone(),
GoodbyeReason::BadScore,
));
} else if info.is_banned() {
to_unban_peers.push(peer_id.clone());
}
}
ScoreState::Healthy => {
debug!(self.log, "Peer transitioned to healthy state"; "peer_id" => peer_id.to_string(), "score" => info.score().to_string(), "past_state" => previous_state.to_string());
// unban the peer if it was previously banned.
if info.is_banned() {
to_unban_peers.push(peer_id.clone());
}
}
}
}
}
// process banning peers
for peer_id in to_ban_peers {
self.ban_peer(&peer_id);
}
// process unbanning peers
for peer_id in to_unban_peers {
if let Err(e) = self.unban_peer(&peer_id) {
error!(self.log, "{}", e; "peer_id" => %peer_id);
}
Self::handle_score_transitions(
previous_state,
peer_id,
info,
&mut to_ban_peers,
&mut to_unban_peers,
&mut self.events,
&self.log,
);
}
self.ban_and_unban_peers(to_ban_peers, to_unban_peers);
}
/// Bans a peer.

View File

@ -78,7 +78,7 @@ impl std::fmt::Display for PeerAction {
}
/// The expected state of the peer given the peer's score.
#[derive(Debug, PartialEq)]
#[derive(Debug, PartialEq, Clone, Copy)]
pub(crate) enum ScoreState {
/// We are content with the peers performance. We permit connections and messages.
Healthy,
@ -267,7 +267,6 @@ macro_rules! apply {
}
apply!(apply_peer_action, peer_action: PeerAction);
apply!(add, delta: f64);
apply!(update);
apply!(update_gossipsub_score, new_score: f64, ignore: bool);
#[cfg(test)]
@ -336,25 +335,25 @@ mod tests {
// 0 change does not change de reputation
//
let change = 0.0;
score.add(change);
score.test_add(change);
assert_eq!(score.score(), DEFAULT_SCORE);
// underflowing change is capped
let mut score = Score::default();
let change = MIN_SCORE - 50.0;
score.add(change);
score.test_add(change);
assert_eq!(score.score(), MIN_SCORE);
// overflowing change is capped
let mut score = Score::default();
let change = MAX_SCORE + 50.0;
score.add(change);
score.test_add(change);
assert_eq!(score.score(), MAX_SCORE);
// Score adjusts
let mut score = Score::default();
let change = 1.32;
score.add(change);
score.test_add(change);
assert_eq!(score.score(), DEFAULT_SCORE + change);
}
@ -364,7 +363,7 @@ mod tests {
let now = Instant::now();
let change = MIN_SCORE_BEFORE_BAN;
score.add(change);
score.test_add(change);
assert_eq!(score.score(), MIN_SCORE_BEFORE_BAN);
score.update_at(now + BANNED_BEFORE_DECAY);
@ -381,7 +380,7 @@ mod tests {
assert!(!score.is_good_gossipsub_peer());
assert!(score.score() < 0.0);
assert_eq!(score.state(), ScoreState::Healthy);
score.add(-1.0001);
score.test_add(-1.0001);
assert_eq!(score.state(), ScoreState::Disconnected);
}