Improve logging
This commit is contained in:
parent
3210489a36
commit
ce50616032
@ -100,16 +100,9 @@ where
|
|||||||
}
|
}
|
||||||
do_state_catchup(&beacon_chain, &log);
|
do_state_catchup(&beacon_chain, &log);
|
||||||
|
|
||||||
// Start the network service, libp2p and syncing threads
|
|
||||||
// TODO: Add beacon_chain reference to network parameters
|
|
||||||
let network_config = &client_config.network;
|
let network_config = &client_config.network;
|
||||||
let network_logger = log.new(o!("Service" => "Network"));
|
let (network, network_send) =
|
||||||
let (network, network_send) = NetworkService::new(
|
NetworkService::new(beacon_chain.clone(), network_config, executor, log.clone())?;
|
||||||
beacon_chain.clone(),
|
|
||||||
network_config,
|
|
||||||
executor,
|
|
||||||
network_logger,
|
|
||||||
)?;
|
|
||||||
|
|
||||||
// spawn the RPC server
|
// spawn the RPC server
|
||||||
let rpc_exit_signal = if client_config.rpc.enabled {
|
let rpc_exit_signal = if client_config.rpc.enabled {
|
||||||
|
@ -38,7 +38,7 @@ pub fn run<T: BeaconChainTypes + Send + Sync + 'static>(
|
|||||||
// Panics if libp2p is poisoned.
|
// Panics if libp2p is poisoned.
|
||||||
let connected_peer_count = libp2p.lock().swarm.connected_peers();
|
let connected_peer_count = libp2p.lock().swarm.connected_peers();
|
||||||
|
|
||||||
debug!(log, "libp2p"; "peer_count" => connected_peer_count);
|
debug!(log, "Libp2p connected peer status"; "peer_count" => connected_peer_count);
|
||||||
|
|
||||||
if connected_peer_count <= WARN_PEER_COUNT {
|
if connected_peer_count <= WARN_PEER_COUNT {
|
||||||
warn!(log, "Low libp2p peer count"; "peer_count" => connected_peer_count);
|
warn!(log, "Low libp2p peer count"; "peer_count" => connected_peer_count);
|
||||||
|
@ -9,7 +9,7 @@ use libp2p::discv5::{Discv5, Discv5Event};
|
|||||||
use libp2p::enr::{Enr, EnrBuilder, NodeId};
|
use libp2p::enr::{Enr, EnrBuilder, NodeId};
|
||||||
use libp2p::multiaddr::Protocol;
|
use libp2p::multiaddr::Protocol;
|
||||||
use libp2p::swarm::{NetworkBehaviour, NetworkBehaviourAction, PollParameters, ProtocolsHandler};
|
use libp2p::swarm::{NetworkBehaviour, NetworkBehaviourAction, PollParameters, ProtocolsHandler};
|
||||||
use slog::{debug, info, o, warn};
|
use slog::{debug, info, warn};
|
||||||
use std::collections::HashSet;
|
use std::collections::HashSet;
|
||||||
use std::fs::File;
|
use std::fs::File;
|
||||||
use std::io::prelude::*;
|
use std::io::prelude::*;
|
||||||
@ -63,7 +63,7 @@ impl<TSubstream> Discovery<TSubstream> {
|
|||||||
config: &NetworkConfig,
|
config: &NetworkConfig,
|
||||||
log: &slog::Logger,
|
log: &slog::Logger,
|
||||||
) -> error::Result<Self> {
|
) -> error::Result<Self> {
|
||||||
let log = log.new(o!("Service" => "Libp2p-Discovery"));
|
let log = log.clone();
|
||||||
|
|
||||||
// checks if current ENR matches that found on disk
|
// checks if current ENR matches that found on disk
|
||||||
let local_enr = load_enr(local_key, config, &log)?;
|
let local_enr = load_enr(local_key, config, &log)?;
|
||||||
@ -73,19 +73,19 @@ impl<TSubstream> Discovery<TSubstream> {
|
|||||||
None => String::from(""),
|
None => String::from(""),
|
||||||
};
|
};
|
||||||
|
|
||||||
info!(log, "Local ENR: {}", local_enr.to_base64());
|
info!(log, "ENR Initialised"; "ENR" => local_enr.to_base64(), "Seq" => local_enr.seq());
|
||||||
debug!(log, "Local Node Id: {}", local_enr.node_id());
|
debug!(log, "Discv5 Node ID Initialised"; "node_id" => format!("{}",local_enr.node_id()));
|
||||||
debug!(log, "Local ENR seq: {}", local_enr.seq());
|
|
||||||
|
|
||||||
let mut discovery = Discv5::new(local_enr, local_key.clone(), config.listen_address)
|
let mut discovery = Discv5::new(local_enr, local_key.clone(), config.listen_address)
|
||||||
.map_err(|e| format!("Discv5 service failed: {:?}", e))?;
|
.map_err(|e| format!("Discv5 service failed. Error: {:?}", e))?;
|
||||||
|
|
||||||
// Add bootnodes to routing table
|
// Add bootnodes to routing table
|
||||||
for bootnode_enr in config.boot_nodes.clone() {
|
for bootnode_enr in config.boot_nodes.clone() {
|
||||||
debug!(
|
debug!(
|
||||||
log,
|
log,
|
||||||
"Adding node to routing table: {}",
|
"Adding node to routing table";
|
||||||
bootnode_enr.node_id()
|
"Node ID" => format!("{}",
|
||||||
|
bootnode_enr.node_id())
|
||||||
);
|
);
|
||||||
discovery.add_enr(bootnode_enr);
|
discovery.add_enr(bootnode_enr);
|
||||||
}
|
}
|
||||||
@ -123,7 +123,7 @@ impl<TSubstream> Discovery<TSubstream> {
|
|||||||
fn find_peers(&mut self) {
|
fn find_peers(&mut self) {
|
||||||
// pick a random NodeId
|
// pick a random NodeId
|
||||||
let random_node = NodeId::random();
|
let random_node = NodeId::random();
|
||||||
debug!(self.log, "Searching for peers...");
|
debug!(self.log, "Searching for peers");
|
||||||
self.discovery.find_node(random_node);
|
self.discovery.find_node(random_node);
|
||||||
|
|
||||||
// update the time until next discovery
|
// update the time until next discovery
|
||||||
@ -201,7 +201,7 @@ where
|
|||||||
}
|
}
|
||||||
Ok(Async::NotReady) => break,
|
Ok(Async::NotReady) => break,
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
warn!(self.log, "Discovery peer search failed: {:?}", e);
|
warn!(self.log, "Discovery peer search failed"; "Error" => format!("{:?}", e));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -227,16 +227,16 @@ where
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
Discv5Event::FindNodeResult { closer_peers, .. } => {
|
Discv5Event::FindNodeResult { closer_peers, .. } => {
|
||||||
debug!(self.log, "Discv5 query found {} peers", closer_peers.len());
|
debug!(self.log, "Discovery query completed"; "peers_found" => closer_peers.len());
|
||||||
if closer_peers.is_empty() {
|
if closer_peers.is_empty() {
|
||||||
debug!(self.log, "Discv5 random query yielded empty results");
|
debug!(self.log, "Discovery random query found no peers");
|
||||||
}
|
}
|
||||||
for peer_id in closer_peers {
|
for peer_id in closer_peers {
|
||||||
// if we need more peers, attempt a connection
|
// if we need more peers, attempt a connection
|
||||||
if self.connected_peers.len() < self.max_peers
|
if self.connected_peers.len() < self.max_peers
|
||||||
&& self.connected_peers.get(&peer_id).is_none()
|
&& self.connected_peers.get(&peer_id).is_none()
|
||||||
{
|
{
|
||||||
debug!(self.log, "Discv5: Peer discovered"; "Peer"=> format!("{:?}", peer_id));
|
debug!(self.log, "Peer discovered"; "peer_id"=> format!("{:?}", peer_id));
|
||||||
return Async::Ready(NetworkBehaviourAction::DialPeer {
|
return Async::Ready(NetworkBehaviourAction::DialPeer {
|
||||||
peer_id,
|
peer_id,
|
||||||
});
|
});
|
||||||
@ -283,14 +283,12 @@ fn load_enr(
|
|||||||
Ok(_) => {
|
Ok(_) => {
|
||||||
match Enr::from_str(&enr_string) {
|
match Enr::from_str(&enr_string) {
|
||||||
Ok(enr) => {
|
Ok(enr) => {
|
||||||
debug!(log, "ENR found in file: {:?}", enr_f);
|
|
||||||
|
|
||||||
if enr.node_id() == local_enr.node_id() {
|
if enr.node_id() == local_enr.node_id() {
|
||||||
if enr.ip() == config.discovery_address.into()
|
if enr.ip() == config.discovery_address.into()
|
||||||
&& enr.tcp() == Some(config.libp2p_port)
|
&& enr.tcp() == Some(config.libp2p_port)
|
||||||
&& enr.udp() == Some(config.discovery_port)
|
&& enr.udp() == Some(config.discovery_port)
|
||||||
{
|
{
|
||||||
debug!(log, "ENR loaded from file");
|
debug!(log, "ENR loaded from file"; "File" => format!("{:?}", enr_f));
|
||||||
// the stored ENR has the same configuration, use it
|
// the stored ENR has the same configuration, use it
|
||||||
return Ok(enr);
|
return Ok(enr);
|
||||||
}
|
}
|
||||||
@ -300,11 +298,11 @@ fn load_enr(
|
|||||||
local_enr.set_seq(new_seq_no, local_key).map_err(|e| {
|
local_enr.set_seq(new_seq_no, local_key).map_err(|e| {
|
||||||
format!("Could not update ENR sequence number: {:?}", e)
|
format!("Could not update ENR sequence number: {:?}", e)
|
||||||
})?;
|
})?;
|
||||||
debug!(log, "ENR sequence number increased to: {}", new_seq_no);
|
debug!(log, "ENR sequence number increased"; "Seq" => new_seq_no);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
warn!(log, "ENR from file could not be decoded: {:?}", e);
|
warn!(log, "ENR from file could not be decoded"; "Error" => format!("{:?}", e));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -327,7 +325,7 @@ fn save_enr_to_disc(dir: &Path, enr: &Enr, log: &slog::Logger) {
|
|||||||
Err(e) => {
|
Err(e) => {
|
||||||
warn!(
|
warn!(
|
||||||
log,
|
log,
|
||||||
"Could not write ENR to file: {:?}{:?}. Error: {}", dir, ENR_FILENAME, e
|
"Could not write ENR to file"; "File" => format!("{:?}{:?}",dir, ENR_FILENAME), "Error" => format!("{}", e)
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -40,13 +40,12 @@ pub struct Service {
|
|||||||
|
|
||||||
impl Service {
|
impl Service {
|
||||||
pub fn new(config: NetworkConfig, log: slog::Logger) -> error::Result<Self> {
|
pub fn new(config: NetworkConfig, log: slog::Logger) -> error::Result<Self> {
|
||||||
debug!(log, "Network-libp2p Service starting");
|
trace!(log, "Libp2p Service starting");
|
||||||
|
|
||||||
// load the private key from CLI flag, disk or generate a new one
|
// load the private key from CLI flag, disk or generate a new one
|
||||||
let local_private_key = load_private_key(&config, &log);
|
let local_private_key = load_private_key(&config, &log);
|
||||||
|
|
||||||
let local_peer_id = PeerId::from(local_private_key.public());
|
let local_peer_id = PeerId::from(local_private_key.public());
|
||||||
info!(log, "Local peer id: {:?}", local_peer_id);
|
info!(log, "Libp2p Service"; "peer_id" => format!("{:?}", local_peer_id));
|
||||||
|
|
||||||
let mut swarm = {
|
let mut swarm = {
|
||||||
// Set up the transport - tcp/ws with secio and mplex/yamux
|
// Set up the transport - tcp/ws with secio and mplex/yamux
|
||||||
@ -67,21 +66,21 @@ impl Service {
|
|||||||
Ok(_) => {
|
Ok(_) => {
|
||||||
let mut log_address = listen_multiaddr;
|
let mut log_address = listen_multiaddr;
|
||||||
log_address.push(Protocol::P2p(local_peer_id.clone().into()));
|
log_address.push(Protocol::P2p(local_peer_id.clone().into()));
|
||||||
info!(log, "Listening on: {}", log_address);
|
info!(log, "Listening established"; "Address" => format!("{}", log_address));
|
||||||
}
|
}
|
||||||
Err(err) => warn!(
|
Err(err) => warn!(
|
||||||
log,
|
log,
|
||||||
"Cannot listen on: {} because: {:?}", listen_multiaddr, err
|
"Failed to listen on address"; "Address" => format!("{}", listen_multiaddr), "Error" => format!("{:?}", err)
|
||||||
),
|
),
|
||||||
};
|
};
|
||||||
|
|
||||||
// attempt to connect to user-input libp2p nodes
|
// attempt to connect to user-input libp2p nodes
|
||||||
for multiaddr in config.libp2p_nodes {
|
for multiaddr in config.libp2p_nodes {
|
||||||
match Swarm::dial_addr(&mut swarm, multiaddr.clone()) {
|
match Swarm::dial_addr(&mut swarm, multiaddr.clone()) {
|
||||||
Ok(()) => debug!(log, "Dialing libp2p node: {}", multiaddr),
|
Ok(()) => debug!(log, "Dialing libp2p peer"; "Address" => format!("{}", multiaddr)),
|
||||||
Err(err) => debug!(
|
Err(err) => debug!(
|
||||||
log,
|
log,
|
||||||
"Could not connect to node: {} error: {:?}", multiaddr, err
|
"Could not connect to peer"; "Address" => format!("{}", multiaddr), "Error" => format!("{:?}", err)
|
||||||
),
|
),
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
@ -104,13 +103,13 @@ impl Service {
|
|||||||
let mut subscribed_topics = vec![];
|
let mut subscribed_topics = vec![];
|
||||||
for topic in topics {
|
for topic in topics {
|
||||||
if swarm.subscribe(topic.clone()) {
|
if swarm.subscribe(topic.clone()) {
|
||||||
trace!(log, "Subscribed to topic: {:?}", topic);
|
trace!(log, "Subscribed to topic"; "Topic" => format!("{}", topic));
|
||||||
subscribed_topics.push(topic);
|
subscribed_topics.push(topic);
|
||||||
} else {
|
} else {
|
||||||
warn!(log, "Could not subscribe to topic: {:?}", topic)
|
warn!(log, "Could not subscribe to topic"; "Topic" => format!("{}", topic));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
info!(log, "Subscribed to topics: {:?}", subscribed_topics);
|
info!(log, "Subscribed to topics"; "Topics" => format!("{:?}", subscribed_topics.iter().map(|t| format!("{}", t)).collect::<Vec<String>>()));
|
||||||
|
|
||||||
Ok(Service {
|
Ok(Service {
|
||||||
_local_peer_id: local_peer_id,
|
_local_peer_id: local_peer_id,
|
||||||
|
@ -20,8 +20,7 @@ pub struct Service<T: BeaconChainTypes> {
|
|||||||
libp2p_service: Arc<Mutex<LibP2PService>>,
|
libp2p_service: Arc<Mutex<LibP2PService>>,
|
||||||
_libp2p_exit: oneshot::Sender<()>,
|
_libp2p_exit: oneshot::Sender<()>,
|
||||||
_network_send: mpsc::UnboundedSender<NetworkMessage>,
|
_network_send: mpsc::UnboundedSender<NetworkMessage>,
|
||||||
_phantom: PhantomData<T>, //message_handler: MessageHandler,
|
_phantom: PhantomData<T>,
|
||||||
//message_handler_send: Sender<HandlerMessage>
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T: BeaconChainTypes + 'static> Service<T> {
|
impl<T: BeaconChainTypes + 'static> Service<T> {
|
||||||
@ -42,17 +41,19 @@ impl<T: BeaconChainTypes + 'static> Service<T> {
|
|||||||
message_handler_log,
|
message_handler_log,
|
||||||
)?;
|
)?;
|
||||||
|
|
||||||
|
let network_log = log.new(o!("Service" => "Network"));
|
||||||
// launch libp2p service
|
// launch libp2p service
|
||||||
let libp2p_log = log.new(o!("Service" => "Libp2p"));
|
let libp2p_service = Arc::new(Mutex::new(LibP2PService::new(
|
||||||
let libp2p_service = Arc::new(Mutex::new(LibP2PService::new(config.clone(), libp2p_log)?));
|
config.clone(),
|
||||||
|
network_log.clone(),
|
||||||
|
)?));
|
||||||
|
|
||||||
// TODO: Spawn thread to handle libp2p messages and pass to message handler thread.
|
|
||||||
let libp2p_exit = spawn_service(
|
let libp2p_exit = spawn_service(
|
||||||
libp2p_service.clone(),
|
libp2p_service.clone(),
|
||||||
network_recv,
|
network_recv,
|
||||||
message_handler_send,
|
message_handler_send,
|
||||||
executor,
|
executor,
|
||||||
log,
|
network_log,
|
||||||
)?;
|
)?;
|
||||||
let network_service = Service {
|
let network_service = Service {
|
||||||
libp2p_service,
|
libp2p_service,
|
||||||
@ -142,13 +143,13 @@ fn network_service(
|
|||||||
.map_err(|_| "Failed to send RPC to handler")?;
|
.map_err(|_| "Failed to send RPC to handler")?;
|
||||||
}
|
}
|
||||||
Libp2pEvent::PeerDialed(peer_id) => {
|
Libp2pEvent::PeerDialed(peer_id) => {
|
||||||
debug!(log, "Peer Dialed: {:?}", peer_id);
|
debug!(log, "Peer Dialed"; "PeerID" => format!("{:?}", peer_id));
|
||||||
message_handler_send
|
message_handler_send
|
||||||
.try_send(HandlerMessage::PeerDialed(peer_id))
|
.try_send(HandlerMessage::PeerDialed(peer_id))
|
||||||
.map_err(|_| "Failed to send PeerDialed to handler")?;
|
.map_err(|_| "Failed to send PeerDialed to handler")?;
|
||||||
}
|
}
|
||||||
Libp2pEvent::PeerDisconnected(peer_id) => {
|
Libp2pEvent::PeerDisconnected(peer_id) => {
|
||||||
debug!(log, "Peer Disconnected: {:?}", peer_id);
|
debug!(log, "Peer Disconnected"; "PeerID" => format!("{:?}", peer_id));
|
||||||
message_handler_send
|
message_handler_send
|
||||||
.try_send(HandlerMessage::PeerDisconnected(peer_id))
|
.try_send(HandlerMessage::PeerDisconnected(peer_id))
|
||||||
.map_err(|_| "Failed to send PeerDisconnected to handler")?;
|
.map_err(|_| "Failed to send PeerDisconnected to handler")?;
|
||||||
|
Loading…
Reference in New Issue
Block a user