Update logs + do not downscore peers if WE time out (#1901)
## Issue Addressed - RPC Errors were being logged twice: first in the peer manager and then again in the router, so leave just the peer manager's one - The "reduce peer count" warn message gets thrown to the user for every missed chunk, so instead print it when the request times out and also do not include there info that is not relevant to the user - The processor didn't have the service tag so add it - Impl `KV` for status message - Do not downscore peers if we are the ones that timed out Other small improvements
This commit is contained in:
parent
6a7d221f72
commit
eb56140582
@ -1,5 +1,5 @@
|
||||
use crate::behaviour::gossipsub_scoring_parameters::PeerScoreSettings;
|
||||
use crate::peer_manager::{score::PeerAction, PeerManager, PeerManagerEvent};
|
||||
use crate::peer_manager::{score::PeerAction, ConnectionDirection, PeerManager, PeerManagerEvent};
|
||||
use crate::rpc::*;
|
||||
use crate::service::METADATA_FILENAME;
|
||||
use crate::types::{GossipEncoding, GossipKind, GossipTopic, MessageData, SubnetDiscovery};
|
||||
@ -70,8 +70,6 @@ pub enum BehaviourEvent<TSpec: EthSpec> {
|
||||
id: RequestId,
|
||||
/// The peer to which this request was sent.
|
||||
peer_id: PeerId,
|
||||
/// The error that occurred.
|
||||
error: RPCError,
|
||||
},
|
||||
RequestReceived {
|
||||
/// The peer that sent the request.
|
||||
@ -692,14 +690,24 @@ impl<TSpec: EthSpec> Behaviour<TSpec> {
|
||||
// Inform the peer manager of the error.
|
||||
// An inbound error here means we sent an error to the peer, or the stream
|
||||
// timed out.
|
||||
self.peer_manager.handle_rpc_error(&peer_id, proto, &error);
|
||||
self.peer_manager.handle_rpc_error(
|
||||
&peer_id,
|
||||
proto,
|
||||
&error,
|
||||
ConnectionDirection::Incoming,
|
||||
);
|
||||
}
|
||||
HandlerErr::Outbound { id, proto, error } => {
|
||||
// Inform the peer manager that a request we sent to the peer failed
|
||||
self.peer_manager.handle_rpc_error(&peer_id, proto, &error);
|
||||
self.peer_manager.handle_rpc_error(
|
||||
&peer_id,
|
||||
proto,
|
||||
&error,
|
||||
ConnectionDirection::Outgoing,
|
||||
);
|
||||
// inform failures of requests comming outside the behaviour
|
||||
if !matches!(id, RequestId::Behaviour) {
|
||||
self.add_event(BehaviourEvent::RPCFailed { peer_id, id, error });
|
||||
self.add_event(BehaviourEvent::RPCFailed { peer_id, id });
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -354,10 +354,17 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
|
||||
/// An error has occured in the RPC.
|
||||
///
|
||||
/// This adjusts a peer's score based on the error.
|
||||
pub fn handle_rpc_error(&mut self, peer_id: &PeerId, protocol: Protocol, err: &RPCError) {
|
||||
pub fn handle_rpc_error(
|
||||
&mut self,
|
||||
peer_id: &PeerId,
|
||||
protocol: Protocol,
|
||||
err: &RPCError,
|
||||
direction: ConnectionDirection,
|
||||
) {
|
||||
let client = self.network_globals.client(peer_id);
|
||||
let score = self.network_globals.peers.read().score(peer_id);
|
||||
debug!(self.log, "RPC Error"; "protocol" => protocol.to_string(), "err" => err.to_string(), "client" => client.to_string(), "peer_id" => peer_id.to_string(), "score" => score.to_string());
|
||||
debug!(self.log, "RPC Error"; "protocol" => %protocol, "err" => %err, "client" => %client,
|
||||
"peer_id" => %peer_id, "score" => %score, "direction" => ?direction);
|
||||
|
||||
// Map this error to a `PeerAction` (if any)
|
||||
let peer_action = match err {
|
||||
@ -398,13 +405,20 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
|
||||
Protocol::Status => PeerAction::LowToleranceError,
|
||||
}
|
||||
}
|
||||
RPCError::StreamTimeout => match protocol {
|
||||
Protocol::Ping => PeerAction::LowToleranceError,
|
||||
Protocol::BlocksByRange => PeerAction::MidToleranceError,
|
||||
Protocol::BlocksByRoot => PeerAction::MidToleranceError,
|
||||
Protocol::Goodbye => return,
|
||||
Protocol::MetaData => return,
|
||||
Protocol::Status => return,
|
||||
RPCError::StreamTimeout => match direction {
|
||||
ConnectionDirection::Incoming => {
|
||||
// we timed out
|
||||
warn!(self.log, "Timed out to a peer's request. Likely too many resources, reduce peer count");
|
||||
return;
|
||||
}
|
||||
ConnectionDirection::Outgoing => match protocol {
|
||||
Protocol::Ping => PeerAction::LowToleranceError,
|
||||
Protocol::BlocksByRange => PeerAction::MidToleranceError,
|
||||
Protocol::BlocksByRoot => PeerAction::MidToleranceError,
|
||||
Protocol::Goodbye => return,
|
||||
Protocol::MetaData => return,
|
||||
Protocol::Status => return,
|
||||
},
|
||||
},
|
||||
RPCError::NegotiationTimeout => PeerAction::HighToleranceError,
|
||||
RPCError::RateLimited => match protocol {
|
||||
|
@ -14,7 +14,7 @@ use libp2p::swarm::protocols_handler::{
|
||||
KeepAlive, ProtocolsHandler, ProtocolsHandlerEvent, ProtocolsHandlerUpgrErr, SubstreamProtocol,
|
||||
};
|
||||
use libp2p::swarm::NegotiatedSubstream;
|
||||
use slog::{crit, debug, warn};
|
||||
use slog::{crit, debug, trace, warn};
|
||||
use smallvec::SmallVec;
|
||||
use std::{
|
||||
collections::hash_map::Entry,
|
||||
@ -238,7 +238,9 @@ where
|
||||
/// Initiates the handler's shutdown process, sending an optional last message to the peer.
|
||||
pub fn shutdown(&mut self, final_msg: Option<(RequestId, RPCRequest<TSpec>)>) {
|
||||
if matches!(self.state, HandlerState::Active) {
|
||||
debug!(self.log, "Starting handler shutdown"; "unsent_queued_requests" => self.dial_queue.len());
|
||||
if !self.dial_queue.is_empty() {
|
||||
debug!(self.log, "Starting handler shutdown"; "unsent_queued_requests" => self.dial_queue.len());
|
||||
}
|
||||
// we now drive to completion communications already dialed/established
|
||||
while let Some((id, req)) = self.dial_queue.pop() {
|
||||
self.pending_errors.push(HandlerErr::Outbound {
|
||||
@ -283,8 +285,11 @@ where
|
||||
let inbound_info = if let Some(info) = self.inbound_substreams.get_mut(&inbound_id) {
|
||||
info
|
||||
} else {
|
||||
warn!(self.log, "Inbound stream has expired, response not sent";
|
||||
"response" => response.to_string(), "id" => inbound_id, "msg" => "Likely too many resources, reduce peer count");
|
||||
if !matches!(response, RPCCodedResponse::StreamTermination(..)) {
|
||||
// the stream is closed after sending the expected number of responses
|
||||
trace!(self.log, "Inbound stream has expired, response not sent";
|
||||
"response" => %response, "id" => inbound_id);
|
||||
}
|
||||
return;
|
||||
};
|
||||
|
||||
|
@ -392,6 +392,22 @@ impl std::fmt::Display for BlocksByRangeRequest {
|
||||
}
|
||||
}
|
||||
|
||||
impl slog::KV for StatusMessage {
|
||||
fn serialize(
|
||||
&self,
|
||||
record: &slog::Record,
|
||||
serializer: &mut dyn slog::Serializer,
|
||||
) -> slog::Result {
|
||||
use slog::Value;
|
||||
serializer.emit_str("fork_digest", &format!("{:?}", self.fork_digest))?;
|
||||
Value::serialize(&self.finalized_epoch, record, "finalized_epoch", serializer)?;
|
||||
serializer.emit_str("finalized_root", &self.finalized_root.to_string())?;
|
||||
Value::serialize(&self.head_slot, record, "head_slot", serializer)?;
|
||||
serializer.emit_str("head_root", &self.head_root.to_string())?;
|
||||
slog::Result::Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl slog::Value for RequestId {
|
||||
fn serialize(
|
||||
&self,
|
||||
|
@ -34,12 +34,12 @@ pub fn handle_chain_segment<T: BeaconChainTypes>(
|
||||
|
||||
let result = match process_blocks(chain, downloaded_blocks.iter(), &log) {
|
||||
(_, Ok(_)) => {
|
||||
debug!(log, "Batch processed"; "batch_epoch" => epoch, "first_block_slot" => start_slot,
|
||||
debug!(log, "Batch processed"; "batch_epoch" => epoch, "first_block_slot" => start_slot, "chain" => chain_id,
|
||||
"last_block_slot" => end_slot, "processed_blocks" => sent_blocks, "service"=> "sync");
|
||||
BatchProcessResult::Success(sent_blocks > 0)
|
||||
}
|
||||
(imported_blocks, Err(e)) => {
|
||||
debug!(log, "Batch processing failed"; "batch_epoch" => epoch, "first_block_slot" => start_slot,
|
||||
debug!(log, "Batch processing failed"; "batch_epoch" => epoch, "first_block_slot" => start_slot, "chain" => chain_id,
|
||||
"last_block_slot" => end_slot, "error" => e, "imported_blocks" => imported_blocks, "service" => "sync");
|
||||
BatchProcessResult::Failed(imported_blocks > 0)
|
||||
}
|
||||
|
@ -11,8 +11,8 @@ use crate::error;
|
||||
use crate::service::NetworkMessage;
|
||||
use beacon_chain::{BeaconChain, BeaconChainTypes};
|
||||
use eth2_libp2p::{
|
||||
rpc::{RPCError, RequestId},
|
||||
MessageId, NetworkGlobals, PeerId, PeerRequestId, PubsubMessage, Request, Response,
|
||||
rpc::RequestId, MessageId, NetworkGlobals, PeerId, PeerRequestId, PubsubMessage, Request,
|
||||
Response,
|
||||
};
|
||||
use futures::prelude::*;
|
||||
use processor::Processor;
|
||||
@ -26,8 +26,6 @@ use types::EthSpec;
|
||||
/// passing them to the internal message processor. The message processor spawns a syncing thread
|
||||
/// which manages which blocks need to be requested and processed.
|
||||
pub struct Router<T: BeaconChainTypes> {
|
||||
/// Access to the peer db for logging.
|
||||
network_globals: Arc<NetworkGlobals<T::EthSpec>>,
|
||||
/// Processes validated and decoded messages from the network. Has direct access to the
|
||||
/// sync manager.
|
||||
processor: Processor<T>,
|
||||
@ -58,7 +56,6 @@ pub enum RouterMessage<T: EthSpec> {
|
||||
RPCFailed {
|
||||
peer_id: PeerId,
|
||||
request_id: RequestId,
|
||||
error: RPCError,
|
||||
},
|
||||
/// A gossip message has been received. The fields are: message id, the peer that sent us this
|
||||
/// message, the message itself and a bool which indicates if the message should be processed
|
||||
@ -86,14 +83,13 @@ impl<T: BeaconChainTypes> Router<T> {
|
||||
let processor = Processor::new(
|
||||
executor.clone(),
|
||||
beacon_chain,
|
||||
network_globals.clone(),
|
||||
network_globals,
|
||||
network_send,
|
||||
&log,
|
||||
);
|
||||
|
||||
// generate the Message handler
|
||||
let mut handler = Router {
|
||||
network_globals,
|
||||
processor,
|
||||
log: message_handler_log,
|
||||
};
|
||||
@ -141,13 +137,7 @@ impl<T: BeaconChainTypes> Router<T> {
|
||||
RouterMessage::RPCFailed {
|
||||
peer_id,
|
||||
request_id,
|
||||
error,
|
||||
} => {
|
||||
debug!(self.log, "RPC Error";
|
||||
"peer_id" => peer_id.to_string(),
|
||||
"request_id" => request_id,
|
||||
"error" => error.to_string(),
|
||||
"client" => self.network_globals.client(&peer_id).to_string());
|
||||
self.processor.on_rpc_error(peer_id, request_id);
|
||||
}
|
||||
RouterMessage::PubsubMessage(id, peer_id, gossip, should_process) => {
|
||||
|
@ -78,7 +78,7 @@ impl<T: BeaconChainTypes> Processor<T> {
|
||||
sync_send,
|
||||
network: HandlerNetworkContext::new(network_send, log.clone()),
|
||||
beacon_processor_send,
|
||||
log: log.clone(),
|
||||
log: log.new(o!("service" => "router")),
|
||||
}
|
||||
}
|
||||
|
||||
@ -114,16 +114,7 @@ impl<T: BeaconChainTypes> Processor<T> {
|
||||
/// re-status.
|
||||
pub fn send_status(&mut self, peer_id: PeerId) {
|
||||
if let Ok(status_message) = status_message(&self.chain) {
|
||||
debug!(
|
||||
self.log,
|
||||
"Sending Status Request";
|
||||
"peer" => peer_id.to_string(),
|
||||
"fork_digest" => format!("{:?}", status_message.fork_digest),
|
||||
"finalized_root" => format!("{:?}", status_message.finalized_root),
|
||||
"finalized_epoch" => format!("{:?}", status_message.finalized_epoch),
|
||||
"head_root" => format!("{}", status_message.head_root),
|
||||
"head_slot" => format!("{}", status_message.head_slot),
|
||||
);
|
||||
debug!(self.log, "Sending Status Request"; "peer" => %peer_id, &status_message);
|
||||
self.network
|
||||
.send_processor_request(peer_id, Request::Status(status_message));
|
||||
}
|
||||
@ -138,16 +129,7 @@ impl<T: BeaconChainTypes> Processor<T> {
|
||||
request_id: PeerRequestId,
|
||||
status: StatusMessage,
|
||||
) {
|
||||
debug!(
|
||||
self.log,
|
||||
"Received Status Request";
|
||||
"peer" => peer_id.to_string(),
|
||||
"fork_digest" => format!("{:?}", status.fork_digest),
|
||||
"finalized_root" => format!("{:?}", status.finalized_root),
|
||||
"finalized_epoch" => format!("{:?}", status.finalized_epoch),
|
||||
"head_root" => format!("{}", status.head_root),
|
||||
"head_slot" => format!("{}", status.head_slot),
|
||||
);
|
||||
debug!(self.log, "Received Status Request"; "peer_id" => %peer_id, &status);
|
||||
|
||||
// ignore status responses if we are shutting down
|
||||
if let Ok(status_message) = status_message(&self.chain) {
|
||||
@ -166,16 +148,7 @@ impl<T: BeaconChainTypes> Processor<T> {
|
||||
|
||||
/// Process a `Status` response from a peer.
|
||||
pub fn on_status_response(&mut self, peer_id: PeerId, status: StatusMessage) {
|
||||
debug!(
|
||||
self.log,
|
||||
"Received Status Response";
|
||||
"peer_id" => peer_id.to_string(),
|
||||
"fork_digest" => format!("{:?}", status.fork_digest),
|
||||
"finalized_root" => format!("{:?}", status.finalized_root),
|
||||
"finalized_epoch" => format!("{:?}", status.finalized_epoch),
|
||||
"head_root" => format!("{}", status.head_root),
|
||||
"head_slot" => format!("{}", status.head_slot),
|
||||
);
|
||||
debug!(self.log, "Received Status Response"; "peer_id" => %peer_id, &status);
|
||||
|
||||
// Process the status message, without sending back another status.
|
||||
if let Err(e) = self.process_status(peer_id, status) {
|
||||
@ -292,7 +265,7 @@ impl<T: BeaconChainTypes> Processor<T> {
|
||||
debug!(
|
||||
self.log,
|
||||
"Received BlocksByRange Request";
|
||||
"peer" => format!("{:?}", peer_id),
|
||||
"peer_id" => %peer_id,
|
||||
"count" => req.count,
|
||||
"start_slot" => req.start_slot,
|
||||
"step" => req.step,
|
||||
|
@ -484,10 +484,10 @@ fn spawn_service<T: BeaconChainTypes>(
|
||||
});
|
||||
|
||||
}
|
||||
BehaviourEvent::RPCFailed{id, peer_id, error} => {
|
||||
BehaviourEvent::RPCFailed{id, peer_id} => {
|
||||
let _ = service
|
||||
.router_send
|
||||
.send(RouterMessage::RPCFailed{ peer_id, request_id: id, error })
|
||||
.send(RouterMessage::RPCFailed{ peer_id, request_id: id})
|
||||
.map_err(|_| {
|
||||
debug!(service.log, "Failed to send RPC to router");
|
||||
});
|
||||
|
Loading…
Reference in New Issue
Block a user