Client identification (#1158)

* Add logs and client identification

* Add client to RPC Error log

* Remove attestation service tests
This commit is contained in:
Age Manning 2020-05-18 21:35:14 +10:00 committed by GitHub
parent 4331834003
commit dd51a72f1f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 48 additions and 44 deletions

View File

@ -258,13 +258,7 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
} }
pub fn handle_rpc_error(&mut self, peer_id: &PeerId, protocol: Protocol, err: &RPCError) { pub fn handle_rpc_error(&mut self, peer_id: &PeerId, protocol: Protocol, err: &RPCError) {
let client = self let client = self.network_globals.client(peer_id);
.network_globals
.peers
.read()
.peer_info(peer_id)
.map(|info| info.client.clone())
.unwrap_or_default();
debug!(self.log, "RPCError"; "protocol" => protocol.to_string(), "err" => err.to_string(), "client" => client.to_string()); debug!(self.log, "RPCError"; "protocol" => protocol.to_string(), "err" => err.to_string(), "client" => client.to_string());
// Map this error to a `PeerAction` (if any) // Map this error to a `PeerAction` (if any)

View File

@ -18,7 +18,7 @@ use libp2p::{
swarm::{NetworkBehaviour, SwarmBuilder, SwarmEvent}, swarm::{NetworkBehaviour, SwarmBuilder, SwarmEvent},
PeerId, Swarm, Transport, PeerId, Swarm, Transport,
}; };
use slog::{crit, debug, error, info, o, trace, warn}; use slog::{crit, debug, info, o, trace, warn};
use std::fs::File; use std::fs::File;
use std::io::prelude::*; use std::io::prelude::*;
use std::io::{Error, ErrorKind}; use std::io::{Error, ErrorKind};
@ -220,7 +220,7 @@ impl<TSpec: EthSpec> Service<TSpec> {
/// Adds a peer to be banned for a period of time, specified by a timeout. /// Adds a peer to be banned for a period of time, specified by a timeout.
pub fn disconnect_and_ban_peer(&mut self, peer_id: PeerId, timeout: Duration) { pub fn disconnect_and_ban_peer(&mut self, peer_id: PeerId, timeout: Duration) {
error!(self.log, "Disconnecting and banning peer"; "peer_id" => format!("{:?}", peer_id), "timeout" => format!("{:?}", timeout)); warn!(self.log, "Disconnecting and banning peer"; "peer_id" => peer_id.to_string(), "timeout" => format!("{:?}", timeout));
self.peers_to_ban.insert( self.peers_to_ban.insert(
peer_id.clone(), peer_id.clone(),
Duration::from_millis(BAN_PEER_WAIT_TIMEOUT), Duration::from_millis(BAN_PEER_WAIT_TIMEOUT),

View File

@ -2,6 +2,7 @@
use crate::peer_manager::PeerDB; use crate::peer_manager::PeerDB;
use crate::rpc::methods::MetaData; use crate::rpc::methods::MetaData;
use crate::types::SyncState; use crate::types::SyncState;
use crate::Client;
use crate::EnrExt; use crate::EnrExt;
use crate::{discovery::enr::Eth2Enr, Enr, GossipTopic, Multiaddr, PeerId}; use crate::{discovery::enr::Eth2Enr, Enr, GossipTopic, Multiaddr, PeerId};
use parking_lot::RwLock; use parking_lot::RwLock;
@ -99,6 +100,15 @@ impl<TSpec: EthSpec> NetworkGlobals<TSpec> {
self.sync_state.read().clone() self.sync_state.read().clone()
} }
/// Returns a `Client` type if one is known for the `PeerId`.
pub fn client(&self, peer_id: &PeerId) -> Client {
self.peers
.read()
.peer_info(peer_id)
.map(|info| info.client.clone())
.unwrap_or_default()
}
/// Updates the syncing state of the node. /// Updates the syncing state of the node.
/// ///
/// If there is a new state, the old state and the new states are returned. /// If there is a new state, the old state and the new states are returned.

View File

@ -17,7 +17,8 @@ use std::task::{Context, Poll};
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use types::{Attestation, EthSpec, Slot, SubnetId}; use types::{Attestation, EthSpec, Slot, SubnetId};
mod tests; //TODO: Removed attestation subnet tests until they become deterministic
//mod tests;
/// The minimum number of slots ahead that we attempt to discover peers for a subscription. If the /// The minimum number of slots ahead that we attempt to discover peers for a subscription. If the
/// slot is less than this number, skip the peer discovery process. /// slot is less than this number, skip the peer discovery process.

View File

@ -10,10 +10,7 @@ use crate::error;
use crate::service::NetworkMessage; use crate::service::NetworkMessage;
use beacon_chain::{BeaconChain, BeaconChainTypes, BlockError}; use beacon_chain::{BeaconChain, BeaconChainTypes, BlockError};
use eth2_libp2p::{ use eth2_libp2p::{
rpc::{ rpc::{RPCCodedResponse, RPCRequest, RPCResponse, RequestId, ResponseTermination},
RPCCodedResponse, RPCError, RPCRequest, RPCResponse, RPCResponseErrorCode, RequestId,
ResponseTermination,
},
MessageId, NetworkGlobals, PeerId, PubsubMessage, RPCEvent, MessageId, NetworkGlobals, PeerId, PubsubMessage, RPCEvent,
}; };
use futures::prelude::*; use futures::prelude::*;
@ -30,6 +27,8 @@ use types::EthSpec;
pub struct Router<T: BeaconChainTypes> { pub struct Router<T: BeaconChainTypes> {
/// A channel to the network service to allow for gossip propagation. /// A channel to the network service to allow for gossip propagation.
network_send: mpsc::UnboundedSender<NetworkMessage<T::EthSpec>>, network_send: mpsc::UnboundedSender<NetworkMessage<T::EthSpec>>,
/// Access to the peer db for logging.
network_globals: Arc<NetworkGlobals<T::EthSpec>>,
/// Processes validated and decoded messages from the network. Has direct access to the /// Processes validated and decoded messages from the network. Has direct access to the
/// sync manager. /// sync manager.
processor: Processor<T>, processor: Processor<T>,
@ -71,7 +70,7 @@ impl<T: BeaconChainTypes> Router<T> {
let processor = Processor::new( let processor = Processor::new(
runtime_handle, runtime_handle,
beacon_chain, beacon_chain,
network_globals, network_globals.clone(),
network_send.clone(), network_send.clone(),
&log, &log,
); );
@ -79,6 +78,7 @@ impl<T: BeaconChainTypes> Router<T> {
// generate the Message handler // generate the Message handler
let mut handler = Router { let mut handler = Router {
network_send, network_send,
network_globals,
processor, processor,
log: message_handler_log, log: message_handler_log,
}; };
@ -124,7 +124,11 @@ impl<T: BeaconChainTypes> Router<T> {
match rpc_message { match rpc_message {
RPCEvent::Request(id, req) => self.handle_rpc_request(peer_id, id, req), RPCEvent::Request(id, req) => self.handle_rpc_request(peer_id, id, req),
RPCEvent::Response(id, resp) => self.handle_rpc_response(peer_id, id, resp), RPCEvent::Response(id, resp) => self.handle_rpc_response(peer_id, id, resp),
RPCEvent::Error(id, _protocol, error) => self.handle_rpc_error(peer_id, id, error), RPCEvent::Error(id, _protocol, error) => {
warn!(self.log, "RPC Error"; "peer_id" => peer_id.to_string(), "request_id" => id, "error" => error.to_string(),
"client" => self.network_globals.client(&peer_id).to_string());
self.processor.on_rpc_error(peer_id, id);
}
} }
} }
@ -142,9 +146,10 @@ impl<T: BeaconChainTypes> Router<T> {
} }
RPCRequest::Goodbye(goodbye_reason) => { RPCRequest::Goodbye(goodbye_reason) => {
debug!( debug!(
self.log, "PeerGoodbye"; self.log, "Peer sent Goodbye";
"peer" => format!("{:?}", peer_id), "peer_id" => peer_id.to_string(),
"reason" => format!("{:?}", goodbye_reason), "reason" => format!("{:?}", goodbye_reason),
"client" => self.network_globals.client(&peer_id).to_string(),
); );
self.processor.on_disconnect(peer_id); self.processor.on_disconnect(peer_id);
} }
@ -170,28 +175,28 @@ impl<T: BeaconChainTypes> Router<T> {
// an error could have occurred. // an error could have occurred.
match error_response { match error_response {
RPCCodedResponse::InvalidRequest(error) => { RPCCodedResponse::InvalidRequest(error) => {
warn!(self.log, "RPC Invalid Request"; "peer_id" => peer_id.to_string(), "request_id" => request_id, "error" => error.to_string()); warn!(self.log, "RPC Invalid Request";
self.handle_rpc_error( "peer_id" => peer_id.to_string(),
peer_id, "request_id" => request_id,
request_id, "error" => error.to_string(),
RPCError::ErrorResponse(RPCResponseErrorCode::InvalidRequest), "client" => self.network_globals.client(&peer_id).to_string());
); self.processor.on_rpc_error(peer_id, request_id);
} }
RPCCodedResponse::ServerError(error) => { RPCCodedResponse::ServerError(error) => {
warn!(self.log, "RPC Server Error"; "peer_id" => peer_id.to_string(), "request_id" => request_id, "error" => error.to_string()); warn!(self.log, "RPC Server Error" ;
self.handle_rpc_error( "peer_id" => peer_id.to_string(),
peer_id, "request_id" => request_id,
request_id, "error" => error.to_string(),
RPCError::ErrorResponse(RPCResponseErrorCode::ServerError), "client" => self.network_globals.client(&peer_id).to_string());
); self.processor.on_rpc_error(peer_id, request_id);
} }
RPCCodedResponse::Unknown(error) => { RPCCodedResponse::Unknown(error) => {
warn!(self.log, "RPC Unknown Error"; "peer_id" => peer_id.to_string(), "request_id" => request_id, "error" => error.to_string()); warn!(self.log, "RPC Unknown Error";
self.handle_rpc_error( "peer_id" => peer_id.to_string(),
peer_id, "request_id" => request_id,
request_id, "error" => error.to_string(),
RPCError::ErrorResponse(RPCResponseErrorCode::Unknown), "client" => self.network_globals.client(&peer_id).to_string());
); self.processor.on_rpc_error(peer_id, request_id);
} }
RPCCodedResponse::Success(response) => match response { RPCCodedResponse::Success(response) => match response {
RPCResponse::Status(status_message) => { RPCResponse::Status(status_message) => {
@ -234,12 +239,6 @@ impl<T: BeaconChainTypes> Router<T> {
} }
} }
/// Handle various RPC errors
fn handle_rpc_error(&mut self, peer_id: PeerId, request_id: RequestId, error: RPCError) {
warn!(self.log, "RPC Error"; "Peer" => format!("{:?}", peer_id), "request_id" => format!("{}", request_id), "Error" => format!("{:?}", error));
self.processor.on_rpc_error(peer_id, request_id);
}
/// Handle RPC messages /// Handle RPC messages
fn handle_gossip( fn handle_gossip(
&mut self, &mut self,

View File

@ -151,7 +151,7 @@ impl<T: BeaconChainTypes> Processor<T> {
debug!( debug!(
self.log, self.log,
"Received Status Response"; "Received Status Response";
"peer" => format!("{:?}", peer_id), "peer_id" => peer_id.to_string(),
"fork_digest" => format!("{:?}", status.fork_digest), "fork_digest" => format!("{:?}", status.fork_digest),
"finalized_root" => format!("{:?}", status.finalized_root), "finalized_root" => format!("{:?}", status.finalized_root),
"finalized_epoch" => format!("{:?}", status.finalized_epoch), "finalized_epoch" => format!("{:?}", status.finalized_epoch),
@ -185,7 +185,7 @@ impl<T: BeaconChainTypes> Processor<T> {
// The node is on a different network/fork, disconnect them. // The node is on a different network/fork, disconnect them.
debug!( debug!(
self.log, "Handshake Failure"; self.log, "Handshake Failure";
"peer" => format!("{:?}", peer_id), "peer_id" => peer_id.to_string(),
"reason" => "incompatible forks", "reason" => "incompatible forks",
"our_fork" => hex::encode(local.fork_digest), "our_fork" => hex::encode(local.fork_digest),
"their_fork" => hex::encode(remote.fork_digest) "their_fork" => hex::encode(remote.fork_digest)