diff --git a/beacon_node/eth2_libp2p/src/behaviour/mod.rs b/beacon_node/eth2_libp2p/src/behaviour/mod.rs index 55e295661..5f24734c3 100644 --- a/beacon_node/eth2_libp2p/src/behaviour/mod.rs +++ b/beacon_node/eth2_libp2p/src/behaviour/mod.rs @@ -1,5 +1,8 @@ use crate::behaviour::gossipsub_scoring_parameters::PeerScoreSettings; -use crate::peer_manager::{score::PeerAction, ConnectionDirection, PeerManager, PeerManagerEvent}; +use crate::peer_manager::{ + score::{PeerAction, ReportSource}, + ConnectionDirection, PeerManager, PeerManagerEvent, +}; use crate::rpc::*; use crate::service::METADATA_FILENAME; use crate::types::{GossipEncoding, GossipKind, GossipTopic, MessageData, SubnetDiscovery}; @@ -427,6 +430,25 @@ impl Behaviour { message_id: MessageId, validation_result: MessageAcceptance, ) { + if let Some(result) = match validation_result { + MessageAcceptance::Accept => None, + MessageAcceptance::Ignore => Some("ignore"), + MessageAcceptance::Reject => Some("reject"), + } { + if let Some(client) = self + .network_globals + .peers + .read() + .peer_info(propagation_source) + .map(|info| info.client.kind.as_static_ref()) + { + metrics::inc_counter_vec( + &metrics::GOSSIP_UNACCEPTED_MESSAGES_PER_CLIENT, + &[client, result], + ) + } + } + if let Err(e) = self.gossipsub.report_message_validation_result( &message_id, propagation_source, @@ -469,16 +491,16 @@ impl Behaviour { /* Peer management functions */ /// Report a peer's action. - pub fn report_peer(&mut self, peer_id: &PeerId, action: PeerAction) { - self.peer_manager.report_peer(peer_id, action) + pub fn report_peer(&mut self, peer_id: &PeerId, action: PeerAction, source: ReportSource) { + self.peer_manager.report_peer(peer_id, action, source) } /// Disconnects from a peer providing a reason. /// /// This will send a goodbye, disconnect and then ban the peer. /// This is fatal for a peer, and should be used in unrecoverable circumstances. - pub fn goodbye_peer(&mut self, peer_id: &PeerId, reason: GoodbyeReason) { - self.peer_manager.goodbye_peer(peer_id, reason); + pub fn goodbye_peer(&mut self, peer_id: &PeerId, reason: GoodbyeReason, source: ReportSource) { + self.peer_manager.goodbye_peer(peer_id, reason, source); } /// Returns an iterator over all enr entries in the DHT. diff --git a/beacon_node/eth2_libp2p/src/lib.rs b/beacon_node/eth2_libp2p/src/lib.rs index 6978e465e..06a66c99f 100644 --- a/beacon_node/eth2_libp2p/src/lib.rs +++ b/beacon_node/eth2_libp2p/src/lib.rs @@ -72,7 +72,8 @@ pub use libp2p::{core::ConnectedPoint, PeerId, Swarm}; pub use libp2p::{multiaddr, Multiaddr}; pub use metrics::scrape_discovery_metrics; pub use peer_manager::{ - client::Client, score::PeerAction, ConnectionDirection, PeerConnectionStatus, PeerDB, PeerInfo, - PeerSyncStatus, SyncInfo, + client::Client, + score::{PeerAction, ReportSource}, + ConnectionDirection, PeerConnectionStatus, PeerDB, PeerInfo, PeerSyncStatus, SyncInfo, }; pub use service::{load_private_key, Libp2pEvent, Service, NETWORK_KEY_FILENAME}; diff --git a/beacon_node/eth2_libp2p/src/metrics.rs b/beacon_node/eth2_libp2p/src/metrics.rs index bffe72922..d27cdd3fb 100644 --- a/beacon_node/eth2_libp2p/src/metrics.rs +++ b/beacon_node/eth2_libp2p/src/metrics.rs @@ -50,6 +50,23 @@ lazy_static! { "Failed gossip publishes", &["topic_hash"] ); + pub static ref TOTAL_RPC_ERRORS_PER_CLIENT: Result = try_create_int_counter_vec( + "libp2p_rpc_errors_per_client", + "RPC errors per client", + &["client", "rpc_error", "direction"] + ); + pub static ref PEER_ACTION_EVENTS_PER_CLIENT: Result = + try_create_int_counter_vec( + "libp2p_peer_actions_per_client", + "Score reports per client", + &["client", "action", "source"] + ); + pub static ref GOSSIP_UNACCEPTED_MESSAGES_PER_CLIENT: Result = + try_create_int_counter_vec( + "gossipsub_unaccepted_messages_per_client", + "Gossipsub messages that we did not accept, per client", + &["client", "validation_result"] + ); } pub fn scrape_discovery_metrics() { diff --git a/beacon_node/eth2_libp2p/src/peer_manager/mod.rs b/beacon_node/eth2_libp2p/src/peer_manager/mod.rs index 161865867..ae318e915 100644 --- a/beacon_node/eth2_libp2p/src/peer_manager/mod.rs +++ b/beacon_node/eth2_libp2p/src/peer_manager/mod.rs @@ -33,7 +33,7 @@ pub(crate) mod score; pub use peer_info::{ConnectionDirection, PeerConnectionStatus, PeerConnectionStatus::*, PeerInfo}; pub use peer_sync_status::{PeerSyncStatus, SyncInfo}; -use score::{PeerAction, ScoreState}; +use score::{PeerAction, ReportSource, ScoreState}; use std::cmp::Ordering; use std::collections::HashMap; @@ -145,7 +145,7 @@ impl PeerManager { /// All instant disconnections are fatal and we ban the associated peer. /// /// This will send a goodbye and disconnect the peer if it is connected or dialing. - pub fn goodbye_peer(&mut self, peer_id: &PeerId, reason: GoodbyeReason) { + pub fn goodbye_peer(&mut self, peer_id: &PeerId, reason: GoodbyeReason, source: ReportSource) { // get the peer info if let Some(info) = self.network_globals.peers.write().peer_info_mut(peer_id) { debug!(self.log, "Sending goodbye to peer"; "peer_id" => %peer_id, "reason" => %reason, "score" => %info.score()); @@ -155,6 +155,14 @@ impl PeerManager { // Goodbye's are fatal info.apply_peer_action_to_score(PeerAction::Fatal); + metrics::inc_counter_vec( + &metrics::PEER_ACTION_EVENTS_PER_CLIENT, + &[ + info.client.kind.as_static_ref(), + PeerAction::Fatal.as_static_str(), + source.into(), + ], + ); } // Update the peerdb and peer state accordingly @@ -173,7 +181,7 @@ impl PeerManager { /// Reports a peer for some action. /// /// If the peer doesn't exist, log a warning and insert defaults. - pub fn report_peer(&mut self, peer_id: &PeerId, action: PeerAction) { + pub fn report_peer(&mut self, peer_id: &PeerId, action: PeerAction, source: ReportSource) { // Helper function to avoid any potential deadlocks. let mut to_ban_peers = Vec::with_capacity(1); let mut to_unban_peers = Vec::with_capacity(1); @@ -183,6 +191,15 @@ impl PeerManager { if let Some(info) = peer_db.peer_info_mut(peer_id) { let previous_state = info.score_state(); info.apply_peer_action_to_score(action); + metrics::inc_counter_vec( + &metrics::PEER_ACTION_EVENTS_PER_CLIENT, + &[ + info.client.kind.as_static_ref(), + action.as_static_str(), + source.into(), + ], + ); + Self::handle_score_transitions( previous_state, peer_id, @@ -352,7 +369,7 @@ impl PeerManager { } } - /// An error has occured in the RPC. + /// An error has occurred in the RPC. /// /// This adjusts a peer's score based on the error. pub fn handle_rpc_error( @@ -366,6 +383,14 @@ impl PeerManager { let score = self.network_globals.peers.read().score(peer_id); debug!(self.log, "RPC Error"; "protocol" => %protocol, "err" => %err, "client" => %client, "peer_id" => %peer_id, "score" => %score, "direction" => ?direction); + metrics::inc_counter_vec( + &metrics::TOTAL_RPC_ERRORS_PER_CLIENT, + &[ + client.kind.as_static_ref(), + err.as_static_str(), + direction.as_static_str(), + ], + ); // Map this error to a `PeerAction` (if any) let peer_action = match err { @@ -389,7 +414,14 @@ impl PeerManager { RPCResponseErrorCode::Unknown => PeerAction::HighToleranceError, RPCResponseErrorCode::ServerError => PeerAction::MidToleranceError, RPCResponseErrorCode::InvalidRequest => PeerAction::LowToleranceError, - RPCResponseErrorCode::RateLimited => PeerAction::LowToleranceError, + RPCResponseErrorCode::RateLimited => match protocol { + Protocol::Ping => PeerAction::MidToleranceError, + Protocol::BlocksByRange => PeerAction::MidToleranceError, + Protocol::BlocksByRoot => PeerAction::MidToleranceError, + Protocol::Goodbye => PeerAction::LowToleranceError, + Protocol::MetaData => PeerAction::LowToleranceError, + Protocol::Status => PeerAction::LowToleranceError, + }, }, RPCError::SSZDecodeError(_) => PeerAction::Fatal, RPCError::UnsupportedProtocol => { @@ -422,17 +454,9 @@ impl PeerManager { }, }, RPCError::NegotiationTimeout => PeerAction::HighToleranceError, - RPCError::RateLimited => match protocol { - Protocol::Ping => PeerAction::MidToleranceError, - Protocol::BlocksByRange => PeerAction::HighToleranceError, - Protocol::BlocksByRoot => PeerAction::HighToleranceError, - Protocol::Goodbye => PeerAction::LowToleranceError, - Protocol::MetaData => PeerAction::LowToleranceError, - Protocol::Status => PeerAction::LowToleranceError, - }, }; - self.report_peer(peer_id, peer_action); + self.report_peer(peer_id, peer_action, ReportSource::RPC); } /// A ping request has been received. diff --git a/beacon_node/eth2_libp2p/src/peer_manager/peer_info.rs b/beacon_node/eth2_libp2p/src/peer_manager/peer_info.rs index bd9a6a5b0..a5af1fba6 100644 --- a/beacon_node/eth2_libp2p/src/peer_manager/peer_info.rs +++ b/beacon_node/eth2_libp2p/src/peer_manager/peer_info.rs @@ -318,6 +318,15 @@ pub enum ConnectionDirection { Outgoing, } +impl ConnectionDirection { + pub fn as_static_str(&self) -> &'static str { + match self { + ConnectionDirection::Incoming => "incoming", + ConnectionDirection::Outgoing => "outgoing", + } + } +} + /// Connection Status of the peer. #[derive(Debug, Clone)] pub enum PeerConnectionStatus { diff --git a/beacon_node/eth2_libp2p/src/peer_manager/score.rs b/beacon_node/eth2_libp2p/src/peer_manager/score.rs index 5b6b0ac36..7505fb5c9 100644 --- a/beacon_node/eth2_libp2p/src/peer_manager/score.rs +++ b/beacon_node/eth2_libp2p/src/peer_manager/score.rs @@ -61,8 +61,26 @@ pub enum PeerAction { /// kicked. /// NOTE: ~50 occurrences will get the peer banned HighToleranceError, - /// Received an expected message. - _ValidMessage, +} + +/// Service reporting a `PeerAction` for a peer. +#[derive(Debug)] +pub enum ReportSource { + Gossipsub, + RPC, + Processor, + SyncService, +} + +impl From for &'static str { + fn from(report_source: ReportSource) -> &'static str { + match report_source { + ReportSource::Gossipsub => "gossipsub", + ReportSource::RPC => "rpc_error", + ReportSource::Processor => "processor", + ReportSource::SyncService => "sync", + } + } } impl std::fmt::Display for PeerAction { @@ -72,7 +90,17 @@ impl std::fmt::Display for PeerAction { PeerAction::LowToleranceError => write!(f, "Low Tolerance Error"), PeerAction::MidToleranceError => write!(f, "Mid Tolerance Error"), PeerAction::HighToleranceError => write!(f, "High Tolerance Error"), - PeerAction::_ValidMessage => write!(f, "Valid Message"), + } + } +} + +impl PeerAction { + pub fn as_static_str(&self) -> &'static str { + match self { + PeerAction::HighToleranceError => "high_tolerance", + PeerAction::MidToleranceError => "mid_tolerance", + PeerAction::LowToleranceError => "low_tolerance", + PeerAction::Fatal => "fatal", } } } @@ -155,7 +183,6 @@ impl RealScore { PeerAction::LowToleranceError => self.add(-10.0), PeerAction::MidToleranceError => self.add(-5.0), PeerAction::HighToleranceError => self.add(-1.0), - PeerAction::_ValidMessage => self.add(0.1), } } diff --git a/beacon_node/eth2_libp2p/src/rpc/handler.rs b/beacon_node/eth2_libp2p/src/rpc/handler.rs index efc5996e1..38d0391aa 100644 --- a/beacon_node/eth2_libp2p/src/rpc/handler.rs +++ b/beacon_node/eth2_libp2p/src/rpc/handler.rs @@ -49,6 +49,9 @@ type InboundProcessingOutput = ( u64, /* Chunks remaining to be sent after this processing finishes */ ); +/// Events the handler emits to the behaviour. +type HandlerEvent = Result, HandlerErr>; + /// An error encountered by the handler. pub enum HandlerErr { /// An error occurred for this peer's request. This can occur during protocol negotiation, @@ -82,11 +85,8 @@ where /// The upgrade for inbound substreams. listen_protocol: SubstreamProtocol, ()>, - /// Errors occurring on outbound and inbound connections queued for reporting back. - pending_errors: Vec, - /// Queue of events to produce in `poll()`. - events_out: SmallVec<[RPCReceived; 4]>, + events_out: SmallVec<[HandlerEvent; 4]>, /// Queue of outbound substreams to open. dial_queue: SmallVec<[(RequestId, RPCRequest); 4]>, @@ -203,7 +203,6 @@ where ) -> Self { RPCHandler { listen_protocol, - pending_errors: Vec::new(), events_out: SmallVec::new(), dial_queue: SmallVec::new(), dial_negotiated: 0, @@ -220,22 +219,6 @@ where } } - /// Returns a reference to the listen protocol configuration. - /// - /// > **Note**: If you modify the protocol, modifications will only applies to future inbound - /// > substreams, not the ones already being negotiated. - pub fn listen_protocol_ref(&self) -> &SubstreamProtocol, ()> { - &self.listen_protocol - } - - /// Returns a mutable reference to the listen protocol configuration. - /// - /// > **Note**: If you modify the protocol, modifications will only apply to future inbound - /// > substreams, not the ones already being negotiated. - pub fn listen_protocol_mut(&mut self) -> &mut SubstreamProtocol, ()> { - &mut self.listen_protocol - } - /// Initiates the handler's shutdown process, sending an optional last message to the peer. pub fn shutdown(&mut self, final_msg: Option<(RequestId, RPCRequest)>) { if matches!(self.state, HandlerState::Active) { @@ -244,11 +227,11 @@ where } // we now drive to completion communications already dialed/established while let Some((id, req)) = self.dial_queue.pop() { - self.pending_errors.push(HandlerErr::Outbound { - id, - proto: req.protocol(), + self.events_out.push(Err(HandlerErr::Outbound { error: RPCError::HandlerRejected, - }) + proto: req.protocol(), + id, + })); } // Queue our final message, if any @@ -268,13 +251,11 @@ where HandlerState::Active => { self.dial_queue.push((id, req)); } - _ => { - self.pending_errors.push(HandlerErr::Outbound { - id, - proto: req.protocol(), - error: RPCError::HandlerRejected, - }); - } + _ => self.events_out.push(Err(HandlerErr::Outbound { + error: RPCError::HandlerRejected, + proto: req.protocol(), + id, + })), } } @@ -296,12 +277,11 @@ where // If the response we are sending is an error, report back for handling if let RPCCodedResponse::Error(ref code, ref reason) = response { - let err = HandlerErr::Inbound { - id: inbound_id, - proto: inbound_info.protocol, + self.events_out.push(Err(HandlerErr::Inbound { error: RPCError::ErrorResponse(*code, reason.to_string()), - }; - self.pending_errors.push(err); + proto: inbound_info.protocol, + id: inbound_id, + })); } if matches!(self.state, HandlerState::Deactivated) { @@ -319,7 +299,7 @@ where TSpec: EthSpec, { type InEvent = RPCSend; - type OutEvent = Result, HandlerErr>; + type OutEvent = HandlerEvent; type Error = RPCError; type InboundProtocol = RPCProtocol; type OutboundProtocol = RPCRequest; @@ -363,8 +343,10 @@ where ); } - self.events_out - .push(RPCReceived::Request(self.current_inbound_substream_id, req)); + self.events_out.push(Ok(RPCReceived::Request( + self.current_inbound_substream_id, + req, + ))); self.current_inbound_substream_id.0 += 1; } @@ -379,12 +361,11 @@ where // accept outbound connections only if the handler is not deactivated if matches!(self.state, HandlerState::Deactivated) { - self.pending_errors.push(HandlerErr::Outbound { - id, - proto, + self.events_out.push(Err(HandlerErr::Outbound { error: RPCError::HandlerRejected, - }); - return; + proto, + id, + })); } // add the stream to substreams if we expect a response, otherwise drop the stream. @@ -474,11 +455,11 @@ where } }, }; - self.pending_errors.push(HandlerErr::Outbound { - id, - proto: req.protocol(), + self.events_out.push(Err(HandlerErr::Outbound { error, - }); + proto: req.protocol(), + id, + })); } fn connection_keep_alive(&self) -> KeepAlive { @@ -490,7 +471,6 @@ where self.dial_queue.is_empty() && self.outbound_substreams.is_empty() && self.inbound_substreams.is_empty() - && self.pending_errors.is_empty() && self.events_out.is_empty() && self.dial_negotiated == 0 } @@ -518,15 +498,9 @@ where Self::Error, >, > { - // report failures - if !self.pending_errors.is_empty() { - let err_info = self.pending_errors.remove(0); - return Poll::Ready(ProtocolsHandlerEvent::Custom(Err(err_info))); - } - // return any events that need to be reported if !self.events_out.is_empty() { - return Poll::Ready(ProtocolsHandlerEvent::Custom(Ok(self.events_out.remove(0)))); + return Poll::Ready(ProtocolsHandlerEvent::Custom(self.events_out.remove(0))); } else { self.events_out.shrink_to_fit(); } @@ -547,11 +521,11 @@ where if let Some(info) = self.inbound_substreams.get_mut(inbound_id.get_ref()) { // the delay has been removed info.delay_key = None; - self.pending_errors.push(HandlerErr::Inbound { - id: *inbound_id.get_ref(), - proto: info.protocol, + self.events_out.push(Err(HandlerErr::Inbound { error: RPCError::StreamTimeout, - }); + proto: info.protocol, + id: *inbound_id.get_ref(), + })); if info.pending_items.last().map(|l| l.close_after()) == Some(false) { // if the last chunk does not close the stream, append an error @@ -636,20 +610,20 @@ where self.inbound_substreams_delay.remove(delay_key); } if let Err(error) = res { - self.pending_errors.push(HandlerErr::Inbound { - id: *id, + self.events_out.push(Err(HandlerErr::Inbound { error, proto: info.protocol, - }); + id: *id, + })); } if info.pending_items.last().map(|l| l.close_after()) == Some(false) { // if the request was still active, report back to cancel it - self.pending_errors.push(HandlerErr::Inbound { - id: *id, - proto: info.protocol, + self.events_out.push(Err(HandlerErr::Inbound { error: RPCError::HandlerRejected, - }); + proto: info.protocol, + id: *id, + })); } } } @@ -662,11 +636,11 @@ where info.remaining_chunks = new_remaining_chunks; // report any error for error in errors { - self.pending_errors.push(HandlerErr::Inbound { - id: *id, + self.events_out.push(Err(HandlerErr::Inbound { error, proto: info.protocol, - }) + id: *id, + })) } if remove { substreams_to_remove.push(*id); @@ -740,11 +714,11 @@ where } if deactivated => { // the handler is deactivated. Close the stream entry.get_mut().state = OutboundSubstreamState::Closing(substream); - self.pending_errors.push(HandlerErr::Outbound { - id: entry.get().req_id, - proto: entry.get().proto, + self.events_out.push(Err(HandlerErr::Outbound { error: RPCError::HandlerRejected, - }) + proto: entry.get().proto, + id: entry.get().req_id, + })) } OutboundSubstreamState::RequestPendingResponse { mut substream, diff --git a/beacon_node/eth2_libp2p/src/rpc/methods.rs b/beacon_node/eth2_libp2p/src/rpc/methods.rs index 00e92f3fc..3c8604387 100644 --- a/beacon_node/eth2_libp2p/src/rpc/methods.rs +++ b/beacon_node/eth2_libp2p/src/rpc/methods.rs @@ -285,6 +285,7 @@ impl RPCCodedResponse { let code = match response_code { 1 => RPCResponseErrorCode::InvalidRequest, 2 => RPCResponseErrorCode::ServerError, + 139 => RPCResponseErrorCode::RateLimited, _ => RPCResponseErrorCode::Unknown, }; RPCCodedResponse::Error(code, err) @@ -318,7 +319,7 @@ impl RPCResponseErrorCode { RPCResponseErrorCode::InvalidRequest => 1, RPCResponseErrorCode::ServerError => 2, RPCResponseErrorCode::Unknown => 255, - RPCResponseErrorCode::RateLimited => 128, + RPCResponseErrorCode::RateLimited => 139, } } } diff --git a/beacon_node/eth2_libp2p/src/rpc/mod.rs b/beacon_node/eth2_libp2p/src/rpc/mod.rs index 00d892815..d491a082c 100644 --- a/beacon_node/eth2_libp2p/src/rpc/mod.rs +++ b/beacon_node/eth2_libp2p/src/rpc/mod.rs @@ -105,7 +105,7 @@ impl RPC { let log = log.new(o!("service" => "libp2p_rpc")); let limiter = RPCRateLimiterBuilder::new() .n_every(Protocol::MetaData, 2, Duration::from_secs(5)) - .one_every(Protocol::Ping, Duration::from_secs(5)) + .n_every(Protocol::Ping, 2, Duration::from_secs(10)) .n_every(Protocol::Status, 5, Duration::from_secs(15)) .one_every(Protocol::Goodbye, Duration::from_secs(10)) .n_every( @@ -261,7 +261,7 @@ where (conn_id, *id), RPCCodedResponse::Error( RPCResponseErrorCode::RateLimited, - format!("Rate limited: wait {:?}", wait_time).into(), + format!("Wait {:?}", wait_time).into(), ), ); } diff --git a/beacon_node/eth2_libp2p/src/rpc/protocol.rs b/beacon_node/eth2_libp2p/src/rpc/protocol.rs index 329d76ceb..4af31f420 100644 --- a/beacon_node/eth2_libp2p/src/rpc/protocol.rs +++ b/beacon_node/eth2_libp2p/src/rpc/protocol.rs @@ -493,8 +493,6 @@ pub enum RPCError { NegotiationTimeout, /// Handler rejected this request. HandlerRejected, - /// The request exceeds the rate limit. - RateLimited, } impl From for RPCError { @@ -533,7 +531,6 @@ impl std::fmt::Display for RPCError { RPCError::InternalError(ref err) => write!(f, "Internal error: {}", err), RPCError::NegotiationTimeout => write!(f, "Negotiation timeout"), RPCError::HandlerRejected => write!(f, "Handler rejected the request"), - RPCError::RateLimited => write!(f, "Request exceeds the rate limit"), } } } @@ -552,7 +549,6 @@ impl std::error::Error for RPCError { RPCError::ErrorResponse(_, _) => None, RPCError::NegotiationTimeout => None, RPCError::HandlerRejected => None, - RPCError::RateLimited => None, } } } @@ -569,3 +565,27 @@ impl std::fmt::Display for RPCRequest { } } } + +impl RPCError { + /// Get a `str` representation of the error. + /// Used for metrics. + pub fn as_static_str(&self) -> &'static str { + match self { + RPCError::SSZDecodeError { .. } => "decode_error", + RPCError::IoError { .. } => "io_error", + RPCError::ErrorResponse(ref code, ..) => match code { + RPCResponseErrorCode::RateLimited => "rate_limited", + RPCResponseErrorCode::InvalidRequest => "invalid_request", + RPCResponseErrorCode::ServerError => "server_error", + RPCResponseErrorCode::Unknown => "unknown_response_code", + }, + RPCError::StreamTimeout => "stream_timeout", + RPCError::UnsupportedProtocol => "unsupported_protocol", + RPCError::IncompleteStream => "incomplete_stream", + RPCError::InvalidData => "invalid_data", + RPCError::InternalError { .. } => "internal_error", + RPCError::NegotiationTimeout => "negotiation_timeout", + RPCError::HandlerRejected => "handler_rejected", + } + } +} diff --git a/beacon_node/eth2_libp2p/src/service.rs b/beacon_node/eth2_libp2p/src/service.rs index 0ae21f229..f67cf821d 100644 --- a/beacon_node/eth2_libp2p/src/service.rs +++ b/beacon_node/eth2_libp2p/src/service.rs @@ -6,7 +6,7 @@ use crate::multiaddr::Protocol; use crate::rpc::{GoodbyeReason, MetaData, RPCResponseErrorCode, RequestId}; use crate::types::{error, EnrBitfield, GossipKind}; use crate::EnrExt; -use crate::{NetworkConfig, NetworkGlobals, PeerAction}; +use crate::{NetworkConfig, NetworkGlobals, PeerAction, ReportSource}; use futures::prelude::*; use libp2p::core::{ identity::Keypair, multiaddr::Multiaddr, muxing::StreamMuxerBox, transport::Boxed, @@ -251,13 +251,13 @@ impl Service { } /// Report a peer's action. - pub fn report_peer(&mut self, peer_id: &PeerId, action: PeerAction) { - self.swarm.report_peer(peer_id, action); + pub fn report_peer(&mut self, peer_id: &PeerId, action: PeerAction, source: ReportSource) { + self.swarm.report_peer(peer_id, action, source); } /// Disconnect and ban a peer, providing a reason. - pub fn goodbye_peer(&mut self, peer_id: &PeerId, reason: GoodbyeReason) { - self.swarm.goodbye_peer(peer_id, reason); + pub fn goodbye_peer(&mut self, peer_id: &PeerId, reason: GoodbyeReason, source: ReportSource) { + self.swarm.goodbye_peer(peer_id, reason, source); } /// Sends a response to a peer's request. diff --git a/beacon_node/eth2_libp2p/tests/rpc_tests.rs b/beacon_node/eth2_libp2p/tests/rpc_tests.rs index 5a5e0b28f..29abd14fe 100644 --- a/beacon_node/eth2_libp2p/tests/rpc_tests.rs +++ b/beacon_node/eth2_libp2p/tests/rpc_tests.rs @@ -1,6 +1,6 @@ #![cfg(test)] use eth2_libp2p::rpc::methods::*; -use eth2_libp2p::{BehaviourEvent, Libp2pEvent, Request, Response}; +use eth2_libp2p::{BehaviourEvent, Libp2pEvent, ReportSource, Request, Response}; use slog::{debug, warn, Level}; use ssz_types::VariableList; use std::sync::Arc; @@ -759,9 +759,11 @@ fn test_goodbye_rpc() { Libp2pEvent::Behaviour(BehaviourEvent::PeerDialed(peer_id)) => { // Send a goodbye and disconnect debug!(log, "Sending RPC"); - sender - .swarm - .goodbye_peer(&peer_id, GoodbyeReason::IrrelevantNetwork); + sender.swarm.goodbye_peer( + &peer_id, + GoodbyeReason::IrrelevantNetwork, + ReportSource::SyncService, + ); } Libp2pEvent::Behaviour(BehaviourEvent::PeerDisconnected(_)) => { return; diff --git a/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs b/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs index 2594791af..a1a778bd7 100644 --- a/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs +++ b/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs @@ -4,7 +4,7 @@ use beacon_chain::{ attestation_verification::Error as AttnError, observed_operations::ObservationOutcome, BeaconChainError, BeaconChainTypes, BlockError, ForkChoiceError, }; -use eth2_libp2p::{MessageAcceptance, MessageId, PeerAction, PeerId}; +use eth2_libp2p::{MessageAcceptance, MessageId, PeerAction, PeerId, ReportSource}; use slog::{debug, error, info, trace, warn}; use ssz::Encode; use types::{ @@ -18,8 +18,12 @@ impl Worker { /* Auxiliary functions */ /// Penalizes a peer for misbehaviour. - fn penalize_peer(&self, peer_id: PeerId, action: PeerAction) { - self.send_network_message(NetworkMessage::ReportPeer { peer_id, action }) + fn gossip_penalize_peer(&self, peer_id: PeerId, action: PeerAction) { + self.send_network_message(NetworkMessage::ReportPeer { + peer_id, + action, + source: ReportSource::Gossipsub, + }) } /// Send a message on `message_tx` that the `message_id` sent by `peer_id` should be propagated on @@ -235,7 +239,7 @@ impl Worker { debug!(self.log, "Could not verify block for gossip, ignoring the block"; "error" => %e); // Prevent recurring behaviour by penalizing the peer slightly. - self.penalize_peer(peer_id.clone(), PeerAction::HighToleranceError); + self.gossip_penalize_peer(peer_id.clone(), PeerAction::HighToleranceError); self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore); return; } @@ -259,7 +263,7 @@ impl Worker { peer_id.clone(), MessageAcceptance::Reject, ); - self.penalize_peer(peer_id, PeerAction::LowToleranceError); + self.gossip_penalize_peer(peer_id, PeerAction::LowToleranceError); return; } }; @@ -312,7 +316,7 @@ impl Worker { "block root" => %block.canonical_root(), "block slot" => block.slot() ); - self.penalize_peer(peer_id, PeerAction::MidToleranceError); + self.gossip_penalize_peer(peer_id, PeerAction::MidToleranceError); trace!( self.log, "Invalid gossip beacon block ssz"; @@ -362,7 +366,7 @@ impl Worker { MessageAcceptance::Ignore, ); // We still penalize a peer slightly to prevent overuse of invalids. - self.penalize_peer(peer_id, PeerAction::HighToleranceError); + self.gossip_penalize_peer(peer_id, PeerAction::HighToleranceError); return; } }; @@ -419,7 +423,7 @@ impl Worker { ); // Penalize peer slightly for invalids. - self.penalize_peer(peer_id, PeerAction::HighToleranceError); + self.gossip_penalize_peer(peer_id, PeerAction::HighToleranceError); return; } }; @@ -468,7 +472,7 @@ impl Worker { MessageAcceptance::Ignore, ); // Penalize peer slightly for invalids. - self.penalize_peer(peer_id, PeerAction::HighToleranceError); + self.gossip_penalize_peer(peer_id, PeerAction::HighToleranceError); return; } }; @@ -518,7 +522,7 @@ impl Worker { // Peers that are slow or not to spec can spam us with these messages draining our // bandwidth. We therefore penalize these peers when they do this. - self.penalize_peer(peer_id.clone(), PeerAction::LowToleranceError); + self.gossip_penalize_peer(peer_id.clone(), PeerAction::LowToleranceError); // Do not propagate these messages. self.propagate_validation_result( @@ -538,7 +542,7 @@ impl Worker { peer_id.clone(), MessageAcceptance::Reject, ); - self.penalize_peer(peer_id.clone(), PeerAction::LowToleranceError); + self.gossip_penalize_peer(peer_id.clone(), PeerAction::LowToleranceError); } AttnError::EmptyAggregationBitfield => { /* @@ -553,7 +557,7 @@ impl Worker { peer_id.clone(), MessageAcceptance::Reject, ); - self.penalize_peer(peer_id.clone(), PeerAction::LowToleranceError); + self.gossip_penalize_peer(peer_id.clone(), PeerAction::LowToleranceError); } AttnError::AggregatorPubkeyUnknown(_) => { /* @@ -574,7 +578,7 @@ impl Worker { peer_id.clone(), MessageAcceptance::Reject, ); - self.penalize_peer(peer_id.clone(), PeerAction::LowToleranceError); + self.gossip_penalize_peer(peer_id.clone(), PeerAction::LowToleranceError); } AttnError::AggregatorNotInCommittee { .. } => { /* @@ -595,7 +599,7 @@ impl Worker { peer_id.clone(), MessageAcceptance::Reject, ); - self.penalize_peer(peer_id.clone(), PeerAction::LowToleranceError); + self.gossip_penalize_peer(peer_id.clone(), PeerAction::LowToleranceError); } AttnError::AttestationAlreadyKnown { .. } => { /* @@ -630,7 +634,7 @@ impl Worker { ); // We still penalize the peer slightly. We don't want this to be a recurring // behaviour. - self.penalize_peer(peer_id.clone(), PeerAction::HighToleranceError); + self.gossip_penalize_peer(peer_id.clone(), PeerAction::HighToleranceError); self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore); @@ -651,7 +655,7 @@ impl Worker { ); // We still penalize the peer slightly. We don't want this to be a recurring // behaviour. - self.penalize_peer(peer_id.clone(), PeerAction::HighToleranceError); + self.gossip_penalize_peer(peer_id.clone(), PeerAction::HighToleranceError); self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore); @@ -669,7 +673,7 @@ impl Worker { peer_id.clone(), MessageAcceptance::Reject, ); - self.penalize_peer(peer_id.clone(), PeerAction::LowToleranceError); + self.gossip_penalize_peer(peer_id.clone(), PeerAction::LowToleranceError); } AttnError::UnknownHeadBlock { beacon_block_root } => { // Note: its a little bit unclear as to whether or not this block is unknown or @@ -699,7 +703,7 @@ impl Worker { }); // We still penalize the peer slightly. We don't want this to be a recurring // behaviour. - self.penalize_peer(peer_id.clone(), PeerAction::HighToleranceError); + self.gossip_penalize_peer(peer_id.clone(), PeerAction::HighToleranceError); self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore); return; @@ -726,7 +730,7 @@ impl Worker { peer_id.clone(), MessageAcceptance::Reject, ); - self.penalize_peer(peer_id.clone(), PeerAction::LowToleranceError); + self.gossip_penalize_peer(peer_id.clone(), PeerAction::LowToleranceError); } AttnError::BadTargetEpoch => { /* @@ -740,7 +744,7 @@ impl Worker { peer_id.clone(), MessageAcceptance::Reject, ); - self.penalize_peer(peer_id.clone(), PeerAction::LowToleranceError); + self.gossip_penalize_peer(peer_id.clone(), PeerAction::LowToleranceError); } AttnError::NoCommitteeForSlotAndIndex { .. } => { /* @@ -753,7 +757,7 @@ impl Worker { peer_id.clone(), MessageAcceptance::Reject, ); - self.penalize_peer(peer_id.clone(), PeerAction::LowToleranceError); + self.gossip_penalize_peer(peer_id.clone(), PeerAction::LowToleranceError); } AttnError::NotExactlyOneAggregationBitSet(_) => { /* @@ -766,7 +770,7 @@ impl Worker { peer_id.clone(), MessageAcceptance::Reject, ); - self.penalize_peer(peer_id.clone(), PeerAction::LowToleranceError); + self.gossip_penalize_peer(peer_id.clone(), PeerAction::LowToleranceError); } AttnError::AttestsToFutureBlock { .. } => { /* @@ -779,7 +783,7 @@ impl Worker { peer_id.clone(), MessageAcceptance::Reject, ); - self.penalize_peer(peer_id.clone(), PeerAction::LowToleranceError); + self.gossip_penalize_peer(peer_id.clone(), PeerAction::LowToleranceError); } AttnError::InvalidSubnetId { received, expected } => { @@ -797,7 +801,7 @@ impl Worker { peer_id.clone(), MessageAcceptance::Reject, ); - self.penalize_peer(peer_id.clone(), PeerAction::LowToleranceError); + self.gossip_penalize_peer(peer_id.clone(), PeerAction::LowToleranceError); } AttnError::Invalid(_) => { /* @@ -810,7 +814,7 @@ impl Worker { peer_id.clone(), MessageAcceptance::Reject, ); - self.penalize_peer(peer_id.clone(), PeerAction::LowToleranceError); + self.gossip_penalize_peer(peer_id.clone(), PeerAction::LowToleranceError); } AttnError::InvalidTargetEpoch { .. } => { /* @@ -823,7 +827,7 @@ impl Worker { peer_id.clone(), MessageAcceptance::Reject, ); - self.penalize_peer(peer_id.clone(), PeerAction::LowToleranceError); + self.gossip_penalize_peer(peer_id.clone(), PeerAction::LowToleranceError); } AttnError::InvalidTargetRoot { .. } => { /* @@ -836,7 +840,7 @@ impl Worker { peer_id.clone(), MessageAcceptance::Reject, ); - self.penalize_peer(peer_id.clone(), PeerAction::LowToleranceError); + self.gossip_penalize_peer(peer_id.clone(), PeerAction::LowToleranceError); } AttnError::TooManySkippedSlots { head_block_slot, @@ -860,7 +864,7 @@ impl Worker { peer_id.clone(), MessageAcceptance::Reject, ); - self.penalize_peer(peer_id.clone(), PeerAction::MidToleranceError); + self.gossip_penalize_peer(peer_id.clone(), PeerAction::MidToleranceError); } AttnError::BeaconChainError(e) => { /* @@ -882,7 +886,7 @@ impl Worker { MessageAcceptance::Ignore, ); // Penalize the peer slightly - self.penalize_peer(peer_id.clone(), PeerAction::HighToleranceError); + self.gossip_penalize_peer(peer_id.clone(), PeerAction::HighToleranceError); } } diff --git a/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs b/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs index 42796ee2a..f0c8e85f7 100644 --- a/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs +++ b/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs @@ -5,7 +5,7 @@ use crate::sync::SyncMessage; use beacon_chain::{BeaconChainError, BeaconChainTypes}; use eth2_libp2p::rpc::StatusMessage; use eth2_libp2p::rpc::*; -use eth2_libp2p::{PeerId, PeerRequestId, Response, SyncInfo}; +use eth2_libp2p::{PeerId, PeerRequestId, ReportSource, Response, SyncInfo}; use itertools::process_results; use slog::{debug, error, warn}; use slot_clock::SlotClock; @@ -18,7 +18,11 @@ impl Worker { /// Disconnects and ban's a peer, sending a Goodbye request with the associated reason. pub fn goodbye_peer(&self, peer_id: PeerId, reason: GoodbyeReason) { - self.send_network_message(NetworkMessage::GoodbyePeer { peer_id, reason }); + self.send_network_message(NetworkMessage::GoodbyePeer { + peer_id, + reason, + source: ReportSource::Processor, + }); } pub fn send_response( diff --git a/beacon_node/network/src/router/processor.rs b/beacon_node/network/src/router/processor.rs index 0b28482c6..3155aeb08 100644 --- a/beacon_node/network/src/router/processor.rs +++ b/beacon_node/network/src/router/processor.rs @@ -5,9 +5,7 @@ use crate::service::NetworkMessage; use crate::sync::SyncMessage; use beacon_chain::{BeaconChain, BeaconChainError, BeaconChainTypes}; use eth2_libp2p::rpc::*; -use eth2_libp2p::{ - MessageId, NetworkGlobals, PeerAction, PeerId, PeerRequestId, Request, Response, -}; +use eth2_libp2p::{MessageId, NetworkGlobals, PeerId, PeerRequestId, Request, Response}; use slog::{debug, error, o, trace, warn}; use std::cmp; use std::sync::Arc; @@ -361,16 +359,6 @@ impl HandlerNetworkContext { ) } - /// Disconnects and ban's a peer, sending a Goodbye request with the associated reason. - pub fn _goodbye_peer(&mut self, peer_id: PeerId, reason: GoodbyeReason) { - self.inform_network(NetworkMessage::GoodbyePeer { peer_id, reason }); - } - - /// Reports a peer's action, adjusting the peer's score. - pub fn _report_peer(&mut self, peer_id: PeerId, action: PeerAction) { - self.inform_network(NetworkMessage::ReportPeer { peer_id, action }); - } - /// Sends a request to the network task. pub fn send_processor_request(&mut self, peer_id: PeerId, request: Request) { self.inform_network(NetworkMessage::SendRequest { diff --git a/beacon_node/network/src/service.rs b/beacon_node/network/src/service.rs index d47204f71..3f725a229 100644 --- a/beacon_node/network/src/service.rs +++ b/beacon_node/network/src/service.rs @@ -8,7 +8,7 @@ use crate::{error, metrics}; use beacon_chain::{BeaconChain, BeaconChainError, BeaconChainTypes}; use eth2_libp2p::{ rpc::{GoodbyeReason, RPCResponseErrorCode, RequestId}, - Libp2pEvent, PeerAction, PeerRequestId, PubsubMessage, Request, Response, + Libp2pEvent, PeerAction, PeerRequestId, PubsubMessage, ReportSource, Request, Response, }; use eth2_libp2p::{types::GossipKind, BehaviourEvent, MessageId, NetworkGlobals, PeerId}; use eth2_libp2p::{MessageAcceptance, Service as LibP2PService}; @@ -75,11 +75,16 @@ pub enum NetworkMessage { udp_socket: Option, }, /// Reports a peer to the peer manager for performing an action. - ReportPeer { peer_id: PeerId, action: PeerAction }, + ReportPeer { + peer_id: PeerId, + action: PeerAction, + source: ReportSource, + }, /// Disconnect an ban a peer, providing a reason. GoodbyePeer { peer_id: PeerId, reason: GoodbyeReason, + source: ReportSource, }, } @@ -381,8 +386,8 @@ fn spawn_service( metrics::expose_publish_metrics(&messages); service.libp2p.swarm.publish(messages); } - NetworkMessage::ReportPeer { peer_id, action } => service.libp2p.report_peer(&peer_id, action), - NetworkMessage::GoodbyePeer { peer_id, reason } => service.libp2p.goodbye_peer(&peer_id, reason), + NetworkMessage::ReportPeer { peer_id, action, source } => service.libp2p.report_peer(&peer_id, action, source), + NetworkMessage::GoodbyePeer { peer_id, reason, source } => service.libp2p.goodbye_peer(&peer_id, reason, source), NetworkMessage::Subscribe { subscriptions } => { if let Err(e) = service .attestation_service diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index 0c59d077f..35a8046c3 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -7,7 +7,7 @@ use crate::service::NetworkMessage; use crate::status::ToStatusMessage; use beacon_chain::{BeaconChain, BeaconChainTypes}; use eth2_libp2p::rpc::{BlocksByRangeRequest, BlocksByRootRequest, GoodbyeReason, RequestId}; -use eth2_libp2p::{Client, NetworkGlobals, PeerAction, PeerId, Request}; +use eth2_libp2p::{Client, NetworkGlobals, PeerAction, PeerId, ReportSource, Request}; use fnv::FnvHashMap; use slog::{debug, trace, warn}; use std::sync::Arc; @@ -132,7 +132,11 @@ impl SyncNetworkContext { pub fn goodbye_peer(&mut self, peer_id: PeerId, reason: GoodbyeReason) { self.network_send - .send(NetworkMessage::GoodbyePeer { peer_id, reason }) + .send(NetworkMessage::GoodbyePeer { + peer_id, + reason, + source: ReportSource::SyncService, + }) .unwrap_or_else(|_| { warn!(self.log, "Could not report peer, channel failed"); }); @@ -141,7 +145,11 @@ impl SyncNetworkContext { pub fn report_peer(&mut self, peer_id: PeerId, action: PeerAction) { debug!(self.log, "Sync reporting peer"; "peer_id" => %peer_id, "action" => %action); self.network_send - .send(NetworkMessage::ReportPeer { peer_id, action }) + .send(NetworkMessage::ReportPeer { + peer_id, + action, + source: ReportSource::SyncService, + }) .unwrap_or_else(|e| { warn!(self.log, "Could not report peer, channel failed"; "error"=> %e); });