diff --git a/beacon_node/eth2_libp2p/src/behaviour/mod.rs b/beacon_node/eth2_libp2p/src/behaviour/mod.rs index bcb591496..fde3955c1 100644 --- a/beacon_node/eth2_libp2p/src/behaviour/mod.rs +++ b/beacon_node/eth2_libp2p/src/behaviour/mod.rs @@ -1,5 +1,5 @@ use crate::behaviour::gossipsub_scoring_parameters::PeerScoreSettings; -use crate::peer_manager::{score::PeerAction, PeerManager, PeerManagerEvent}; +use crate::peer_manager::{score::PeerAction, ConnectionDirection, PeerManager, PeerManagerEvent}; use crate::rpc::*; use crate::service::METADATA_FILENAME; use crate::types::{GossipEncoding, GossipKind, GossipTopic, MessageData, SubnetDiscovery}; @@ -70,8 +70,6 @@ pub enum BehaviourEvent { id: RequestId, /// The peer to which this request was sent. peer_id: PeerId, - /// The error that occurred. - error: RPCError, }, RequestReceived { /// The peer that sent the request. @@ -692,14 +690,24 @@ impl Behaviour { // Inform the peer manager of the error. // An inbound error here means we sent an error to the peer, or the stream // timed out. - self.peer_manager.handle_rpc_error(&peer_id, proto, &error); + self.peer_manager.handle_rpc_error( + &peer_id, + proto, + &error, + ConnectionDirection::Incoming, + ); } HandlerErr::Outbound { id, proto, error } => { // Inform the peer manager that a request we sent to the peer failed - self.peer_manager.handle_rpc_error(&peer_id, proto, &error); + self.peer_manager.handle_rpc_error( + &peer_id, + proto, + &error, + ConnectionDirection::Outgoing, + ); // inform failures of requests comming outside the behaviour if !matches!(id, RequestId::Behaviour) { - self.add_event(BehaviourEvent::RPCFailed { peer_id, id, error }); + self.add_event(BehaviourEvent::RPCFailed { peer_id, id }); } } } diff --git a/beacon_node/eth2_libp2p/src/peer_manager/mod.rs b/beacon_node/eth2_libp2p/src/peer_manager/mod.rs index 3377c3d7b..11dc37315 100644 --- a/beacon_node/eth2_libp2p/src/peer_manager/mod.rs +++ b/beacon_node/eth2_libp2p/src/peer_manager/mod.rs @@ -354,10 +354,17 @@ impl PeerManager { /// An error has occured in the RPC. /// /// This adjusts a peer's score based on the error. - pub fn handle_rpc_error(&mut self, peer_id: &PeerId, protocol: Protocol, err: &RPCError) { + pub fn handle_rpc_error( + &mut self, + peer_id: &PeerId, + protocol: Protocol, + err: &RPCError, + direction: ConnectionDirection, + ) { let client = self.network_globals.client(peer_id); let score = self.network_globals.peers.read().score(peer_id); - debug!(self.log, "RPC Error"; "protocol" => protocol.to_string(), "err" => err.to_string(), "client" => client.to_string(), "peer_id" => peer_id.to_string(), "score" => score.to_string()); + debug!(self.log, "RPC Error"; "protocol" => %protocol, "err" => %err, "client" => %client, + "peer_id" => %peer_id, "score" => %score, "direction" => ?direction); // Map this error to a `PeerAction` (if any) let peer_action = match err { @@ -398,13 +405,20 @@ impl PeerManager { Protocol::Status => PeerAction::LowToleranceError, } } - RPCError::StreamTimeout => match protocol { - Protocol::Ping => PeerAction::LowToleranceError, - Protocol::BlocksByRange => PeerAction::MidToleranceError, - Protocol::BlocksByRoot => PeerAction::MidToleranceError, - Protocol::Goodbye => return, - Protocol::MetaData => return, - Protocol::Status => return, + RPCError::StreamTimeout => match direction { + ConnectionDirection::Incoming => { + // we timed out + warn!(self.log, "Timed out to a peer's request. Likely too many resources, reduce peer count"); + return; + } + ConnectionDirection::Outgoing => match protocol { + Protocol::Ping => PeerAction::LowToleranceError, + Protocol::BlocksByRange => PeerAction::MidToleranceError, + Protocol::BlocksByRoot => PeerAction::MidToleranceError, + Protocol::Goodbye => return, + Protocol::MetaData => return, + Protocol::Status => return, + }, }, RPCError::NegotiationTimeout => PeerAction::HighToleranceError, RPCError::RateLimited => match protocol { diff --git a/beacon_node/eth2_libp2p/src/rpc/handler.rs b/beacon_node/eth2_libp2p/src/rpc/handler.rs index b41426d41..81641ff72 100644 --- a/beacon_node/eth2_libp2p/src/rpc/handler.rs +++ b/beacon_node/eth2_libp2p/src/rpc/handler.rs @@ -14,7 +14,7 @@ use libp2p::swarm::protocols_handler::{ KeepAlive, ProtocolsHandler, ProtocolsHandlerEvent, ProtocolsHandlerUpgrErr, SubstreamProtocol, }; use libp2p::swarm::NegotiatedSubstream; -use slog::{crit, debug, warn}; +use slog::{crit, debug, trace, warn}; use smallvec::SmallVec; use std::{ collections::hash_map::Entry, @@ -238,7 +238,9 @@ where /// Initiates the handler's shutdown process, sending an optional last message to the peer. pub fn shutdown(&mut self, final_msg: Option<(RequestId, RPCRequest)>) { if matches!(self.state, HandlerState::Active) { - debug!(self.log, "Starting handler shutdown"; "unsent_queued_requests" => self.dial_queue.len()); + if !self.dial_queue.is_empty() { + debug!(self.log, "Starting handler shutdown"; "unsent_queued_requests" => self.dial_queue.len()); + } // we now drive to completion communications already dialed/established while let Some((id, req)) = self.dial_queue.pop() { self.pending_errors.push(HandlerErr::Outbound { @@ -283,8 +285,11 @@ where let inbound_info = if let Some(info) = self.inbound_substreams.get_mut(&inbound_id) { info } else { - warn!(self.log, "Inbound stream has expired, response not sent"; - "response" => response.to_string(), "id" => inbound_id, "msg" => "Likely too many resources, reduce peer count"); + if !matches!(response, RPCCodedResponse::StreamTermination(..)) { + // the stream is closed after sending the expected number of responses + trace!(self.log, "Inbound stream has expired, response not sent"; + "response" => %response, "id" => inbound_id); + } return; }; diff --git a/beacon_node/eth2_libp2p/src/rpc/methods.rs b/beacon_node/eth2_libp2p/src/rpc/methods.rs index 0fe4d1347..b2ce0cb8c 100644 --- a/beacon_node/eth2_libp2p/src/rpc/methods.rs +++ b/beacon_node/eth2_libp2p/src/rpc/methods.rs @@ -392,6 +392,22 @@ impl std::fmt::Display for BlocksByRangeRequest { } } +impl slog::KV for StatusMessage { + fn serialize( + &self, + record: &slog::Record, + serializer: &mut dyn slog::Serializer, + ) -> slog::Result { + use slog::Value; + serializer.emit_str("fork_digest", &format!("{:?}", self.fork_digest))?; + Value::serialize(&self.finalized_epoch, record, "finalized_epoch", serializer)?; + serializer.emit_str("finalized_root", &self.finalized_root.to_string())?; + Value::serialize(&self.head_slot, record, "head_slot", serializer)?; + serializer.emit_str("head_root", &self.head_root.to_string())?; + slog::Result::Ok(()) + } +} + impl slog::Value for RequestId { fn serialize( &self, diff --git a/beacon_node/network/src/beacon_processor/chain_segment.rs b/beacon_node/network/src/beacon_processor/chain_segment.rs index e659a84b8..47e14f5e2 100644 --- a/beacon_node/network/src/beacon_processor/chain_segment.rs +++ b/beacon_node/network/src/beacon_processor/chain_segment.rs @@ -34,12 +34,12 @@ pub fn handle_chain_segment( let result = match process_blocks(chain, downloaded_blocks.iter(), &log) { (_, Ok(_)) => { - debug!(log, "Batch processed"; "batch_epoch" => epoch, "first_block_slot" => start_slot, + debug!(log, "Batch processed"; "batch_epoch" => epoch, "first_block_slot" => start_slot, "chain" => chain_id, "last_block_slot" => end_slot, "processed_blocks" => sent_blocks, "service"=> "sync"); BatchProcessResult::Success(sent_blocks > 0) } (imported_blocks, Err(e)) => { - debug!(log, "Batch processing failed"; "batch_epoch" => epoch, "first_block_slot" => start_slot, + debug!(log, "Batch processing failed"; "batch_epoch" => epoch, "first_block_slot" => start_slot, "chain" => chain_id, "last_block_slot" => end_slot, "error" => e, "imported_blocks" => imported_blocks, "service" => "sync"); BatchProcessResult::Failed(imported_blocks > 0) } diff --git a/beacon_node/network/src/router/mod.rs b/beacon_node/network/src/router/mod.rs index 4701bdb73..0e98f04dd 100644 --- a/beacon_node/network/src/router/mod.rs +++ b/beacon_node/network/src/router/mod.rs @@ -11,8 +11,8 @@ use crate::error; use crate::service::NetworkMessage; use beacon_chain::{BeaconChain, BeaconChainTypes}; use eth2_libp2p::{ - rpc::{RPCError, RequestId}, - MessageId, NetworkGlobals, PeerId, PeerRequestId, PubsubMessage, Request, Response, + rpc::RequestId, MessageId, NetworkGlobals, PeerId, PeerRequestId, PubsubMessage, Request, + Response, }; use futures::prelude::*; use processor::Processor; @@ -26,8 +26,6 @@ use types::EthSpec; /// passing them to the internal message processor. The message processor spawns a syncing thread /// which manages which blocks need to be requested and processed. pub struct Router { - /// Access to the peer db for logging. - network_globals: Arc>, /// Processes validated and decoded messages from the network. Has direct access to the /// sync manager. processor: Processor, @@ -58,7 +56,6 @@ pub enum RouterMessage { RPCFailed { peer_id: PeerId, request_id: RequestId, - error: RPCError, }, /// A gossip message has been received. The fields are: message id, the peer that sent us this /// message, the message itself and a bool which indicates if the message should be processed @@ -86,14 +83,13 @@ impl Router { let processor = Processor::new( executor.clone(), beacon_chain, - network_globals.clone(), + network_globals, network_send, &log, ); // generate the Message handler let mut handler = Router { - network_globals, processor, log: message_handler_log, }; @@ -141,13 +137,7 @@ impl Router { RouterMessage::RPCFailed { peer_id, request_id, - error, } => { - debug!(self.log, "RPC Error"; - "peer_id" => peer_id.to_string(), - "request_id" => request_id, - "error" => error.to_string(), - "client" => self.network_globals.client(&peer_id).to_string()); self.processor.on_rpc_error(peer_id, request_id); } RouterMessage::PubsubMessage(id, peer_id, gossip, should_process) => { diff --git a/beacon_node/network/src/router/processor.rs b/beacon_node/network/src/router/processor.rs index 109edba19..a0578a4b3 100644 --- a/beacon_node/network/src/router/processor.rs +++ b/beacon_node/network/src/router/processor.rs @@ -78,7 +78,7 @@ impl Processor { sync_send, network: HandlerNetworkContext::new(network_send, log.clone()), beacon_processor_send, - log: log.clone(), + log: log.new(o!("service" => "router")), } } @@ -114,16 +114,7 @@ impl Processor { /// re-status. pub fn send_status(&mut self, peer_id: PeerId) { if let Ok(status_message) = status_message(&self.chain) { - debug!( - self.log, - "Sending Status Request"; - "peer" => peer_id.to_string(), - "fork_digest" => format!("{:?}", status_message.fork_digest), - "finalized_root" => format!("{:?}", status_message.finalized_root), - "finalized_epoch" => format!("{:?}", status_message.finalized_epoch), - "head_root" => format!("{}", status_message.head_root), - "head_slot" => format!("{}", status_message.head_slot), - ); + debug!(self.log, "Sending Status Request"; "peer" => %peer_id, &status_message); self.network .send_processor_request(peer_id, Request::Status(status_message)); } @@ -138,16 +129,7 @@ impl Processor { request_id: PeerRequestId, status: StatusMessage, ) { - debug!( - self.log, - "Received Status Request"; - "peer" => peer_id.to_string(), - "fork_digest" => format!("{:?}", status.fork_digest), - "finalized_root" => format!("{:?}", status.finalized_root), - "finalized_epoch" => format!("{:?}", status.finalized_epoch), - "head_root" => format!("{}", status.head_root), - "head_slot" => format!("{}", status.head_slot), - ); + debug!(self.log, "Received Status Request"; "peer_id" => %peer_id, &status); // ignore status responses if we are shutting down if let Ok(status_message) = status_message(&self.chain) { @@ -166,16 +148,7 @@ impl Processor { /// Process a `Status` response from a peer. pub fn on_status_response(&mut self, peer_id: PeerId, status: StatusMessage) { - debug!( - self.log, - "Received Status Response"; - "peer_id" => peer_id.to_string(), - "fork_digest" => format!("{:?}", status.fork_digest), - "finalized_root" => format!("{:?}", status.finalized_root), - "finalized_epoch" => format!("{:?}", status.finalized_epoch), - "head_root" => format!("{}", status.head_root), - "head_slot" => format!("{}", status.head_slot), - ); + debug!(self.log, "Received Status Response"; "peer_id" => %peer_id, &status); // Process the status message, without sending back another status. if let Err(e) = self.process_status(peer_id, status) { @@ -292,7 +265,7 @@ impl Processor { debug!( self.log, "Received BlocksByRange Request"; - "peer" => format!("{:?}", peer_id), + "peer_id" => %peer_id, "count" => req.count, "start_slot" => req.start_slot, "step" => req.step, diff --git a/beacon_node/network/src/service.rs b/beacon_node/network/src/service.rs index 4973a1a7b..d0c9e4159 100644 --- a/beacon_node/network/src/service.rs +++ b/beacon_node/network/src/service.rs @@ -484,10 +484,10 @@ fn spawn_service( }); } - BehaviourEvent::RPCFailed{id, peer_id, error} => { + BehaviourEvent::RPCFailed{id, peer_id} => { let _ = service .router_send - .send(RouterMessage::RPCFailed{ peer_id, request_id: id, error }) + .send(RouterMessage::RPCFailed{ peer_id, request_id: id}) .map_err(|_| { debug!(service.log, "Failed to send RPC to router"); });