From 19b8c5a9e01a4e814533fa68127e67e3c0afa6cf Mon Sep 17 00:00:00 2001 From: Age Manning Date: Thu, 9 Apr 2020 14:28:37 +1000 Subject: [PATCH] Small bug fixes from initial sim tests (#993) * Debug logging and fixes * Minor fixes * Remove debugging statements --- beacon_node/eth2-libp2p/src/discovery/mod.rs | 22 +++++----- beacon_node/eth2-libp2p/src/rpc/codec/ssz.rs | 6 +-- beacon_node/eth2-libp2p/src/rpc/handler.rs | 43 +++++++++++++------ beacon_node/network/src/router/processor.rs | 5 ++- .../network/src/sync/network_context.rs | 2 +- tests/node_test_rig/src/lib.rs | 2 +- tests/simulator/src/local_network.rs | 3 +- 7 files changed, 52 insertions(+), 31 deletions(-) diff --git a/beacon_node/eth2-libp2p/src/discovery/mod.rs b/beacon_node/eth2-libp2p/src/discovery/mod.rs index f7b7fea93..5107565cb 100644 --- a/beacon_node/eth2-libp2p/src/discovery/mod.rs +++ b/beacon_node/eth2-libp2p/src/discovery/mod.rs @@ -103,7 +103,10 @@ impl Discovery { log, "Adding node to routing table"; "node_id" => format!("{}", bootnode_enr.node_id()), - "peer_id" => format!("{}", bootnode_enr.peer_id()) + "peer_id" => format!("{}", bootnode_enr.peer_id()), + "ip" => format!("{:?}", bootnode_enr.ip()), + "udp" => format!("{:?}", bootnode_enr.udp()), + "tcp" => format!("{:?}", bootnode_enr.udp()) ); let _ = discovery.add_enr(bootnode_enr).map_err(|e| { warn!( @@ -256,7 +259,7 @@ impl Discovery { "subnet_id" => *subnet_id, "connected_peers_on_subnet" => peers_on_subnet, "target_subnet_peers" => TARGET_SUBNET_PEERS, - "target_peers" => target_peers + "peers_to_find" => target_peers ); let log_clone = self.log.clone(); @@ -283,12 +286,13 @@ impl Discovery { // start the query self.start_query(subnet_predicate, target_peers as usize); + } else { + debug!(self.log, "Discovery ignored"; + "reason" => "Already connected to desired peers", + "connected_peers_on_subnet" => peers_on_subnet, + "target_subnet_peers" => TARGET_SUBNET_PEERS, + ); } - debug!(self.log, "Discovery ignored"; - "reason" => "Already connected to desired peers", - "connected_peers_on_subnet" => peers_on_subnet, - "target_subnet_peers" => TARGET_SUBNET_PEERS, - ); } /* Internal Functions */ @@ -478,7 +482,6 @@ where }); } Discv5Event::FindNodeResult { closer_peers, .. } => { - // TODO: Modify once ENR predicate search is available debug!(self.log, "Discovery query completed"; "peers_found" => closer_peers.len()); // update the time to the next query if self.past_discovery_delay < MAX_TIME_BETWEEN_PEER_SEARCHES { @@ -491,9 +494,6 @@ where self.peer_discovery_delay .reset(Instant::now() + Duration::from_secs(delay)); - if closer_peers.is_empty() { - debug!(self.log, "Discovery random query found no peers"); - } for peer_id in closer_peers { // if we need more peers, attempt a connection diff --git a/beacon_node/eth2-libp2p/src/rpc/codec/ssz.rs b/beacon_node/eth2-libp2p/src/rpc/codec/ssz.rs index 9fd430acd..b3c7f2ce7 100644 --- a/beacon_node/eth2-libp2p/src/rpc/codec/ssz.rs +++ b/beacon_node/eth2-libp2p/src/rpc/codec/ssz.rs @@ -107,9 +107,9 @@ impl Decoder for SSZInboundCodec { _ => unreachable!("Cannot negotiate an unknown version"), }, RPC_PING => match self.protocol.version.as_str() { - "1" => Ok(Some(RPCRequest::Status(StatusMessage::from_ssz_bytes( - &packet, - )?))), + "1" => Ok(Some(RPCRequest::Ping(Ping { + data: u64::from_ssz_bytes(&packet)?, + }))), _ => unreachable!("Cannot negotiate an unknown version"), }, RPC_META_DATA => match self.protocol.version.as_str() { diff --git a/beacon_node/eth2-libp2p/src/rpc/handler.rs b/beacon_node/eth2-libp2p/src/rpc/handler.rs index 83b82edc6..d8ff541c9 100644 --- a/beacon_node/eth2-libp2p/src/rpc/handler.rs +++ b/beacon_node/eth2-libp2p/src/rpc/handler.rs @@ -12,7 +12,7 @@ use libp2p::core::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeError}; use libp2p::swarm::protocols_handler::{ KeepAlive, ProtocolsHandler, ProtocolsHandlerEvent, ProtocolsHandlerUpgrErr, SubstreamProtocol, }; -use slog::{crit, debug, error, warn}; +use slog::{crit, debug, error, trace, warn}; use smallvec::SmallVec; use std::collections::hash_map::Entry; use std::time::{Duration, Instant}; @@ -318,7 +318,22 @@ where // add the stream to substreams if we expect a response, otherwise drop the stream. match rpc_event { - RPCEvent::Request(id, request) if request.expect_response() => { + RPCEvent::Request(mut id, request) if request.expect_response() => { + // outbound requests can be sent from various aspects of lighthouse which don't + // track request ids. In the future these will be flagged as None, currently they + // are flagged as 0. These can overlap. In this case, we pick the highest request + // Id available + if id == 0 && self.outbound_substreams.get(&id).is_some() { + // have duplicate outbound request with no id. Pick one that will not collide + let mut new_id = std::usize::MAX; + while self.outbound_substreams.get(&new_id).is_some() { + // panic all outbound substreams are full + new_id -= 1; + } + trace!(self.log, "New outbound stream id created"; "id" => new_id); + id = RequestId::from(new_id); + } + // new outbound request. Store the stream and tag the output. let delay_key = self .outbound_substreams_delay @@ -331,7 +346,7 @@ where .outbound_substreams .insert(id, (awaiting_stream, delay_key)) { - warn!(self.log, "Duplicate outbound substream id"; "id" => format!("{:?}", id)); + crit!(self.log, "Duplicate outbound substream id"; "id" => format!("{:?}", id)); } } _ => { // a response is not expected, drop the stream for all other requests @@ -401,7 +416,7 @@ where } } None => { - debug!(self.log, "Stream has expired. Response not sent"; "response" => format!("{}",response)); + warn!(self.log, "Stream has expired. Response not sent"; "response" => format!("{}",response)); } }; } @@ -473,12 +488,14 @@ where } ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Apply(err)) => { // IO/Decode/Custom Error, report to the application + debug!(self.log, "Upgrade Error"; "error" => format!("{}",err)); return Ok(Async::Ready(ProtocolsHandlerEvent::Custom( RPCEvent::Error(request_id, err), ))); } ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Select(err)) => { // Error during negotiation + debug!(self.log, "Upgrade Error"; "error" => format!("{}",err)); return Ok(Async::Ready(ProtocolsHandlerEvent::Custom( RPCEvent::Error(request_id, RPCError::Custom(format!("{}", err))), ))); @@ -496,10 +513,11 @@ where } // purge expired inbound substreams and send an error - while let Async::Ready(Some(stream_id)) = self - .inbound_substreams_delay - .poll() - .map_err(|_| ProtocolsHandlerUpgrErr::Timer)? + while let Async::Ready(Some(stream_id)) = + self.inbound_substreams_delay.poll().map_err(|e| { + warn!(self.log, "Inbound substream poll failed"; "error" => format!("{:?}", e)); + ProtocolsHandlerUpgrErr::Timer + })? { let rpc_id = stream_id.get_ref(); @@ -517,10 +535,11 @@ where } // purge expired outbound substreams - if let Async::Ready(Some(stream_id)) = self - .outbound_substreams_delay - .poll() - .map_err(|_| ProtocolsHandlerUpgrErr::Timer)? + if let Async::Ready(Some(stream_id)) = + self.outbound_substreams_delay.poll().map_err(|e| { + warn!(self.log, "Outbound substream poll failed"; "error" => format!("{:?}", e)); + ProtocolsHandlerUpgrErr::Timer + })? { self.outbound_substreams.remove(stream_id.get_ref()); // notify the user diff --git a/beacon_node/network/src/router/processor.rs b/beacon_node/network/src/router/processor.rs index ca996b86b..2907851a5 100644 --- a/beacon_node/network/src/router/processor.rs +++ b/beacon_node/network/src/router/processor.rs @@ -644,8 +644,9 @@ pub(crate) fn status_message( }) } -/// Wraps a Network Channel to employ various RPC related network functionality for the message -/// handler. The handler doesn't manage it's own request Id's and can therefore only send +/// Wraps a Network Channel to employ various RPC related network functionality for the +/// processor. +/// The Processor doesn't manage it's own request Id's and can therefore only send /// responses or requests with 0 request Ids. pub struct HandlerNetworkContext { /// The network channel to relay messages to the Network service. diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index 6b029e0a1..c89b964b4 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -27,7 +27,7 @@ impl SyncNetworkContext { pub fn new(network_send: mpsc::UnboundedSender>, log: slog::Logger) -> Self { Self { network_send, - request_id: 0, + request_id: 1, log, } } diff --git a/tests/node_test_rig/src/lib.rs b/tests/node_test_rig/src/lib.rs index fafdb887f..9cfcc7eea 100644 --- a/tests/node_test_rig/src/lib.rs +++ b/tests/node_test_rig/src/lib.rs @@ -91,7 +91,7 @@ pub fn testing_client_config() -> ClientConfig { client_config } -/// Provids a validator client that is running in the current process on a given tokio executor (it +/// Provides a validator client that is running in the current process on a given tokio executor (it /// is _local_ to this process). /// /// Intended for use in testing and simulation. Not for production. diff --git a/tests/simulator/src/local_network.rs b/tests/simulator/src/local_network.rs index ad01eaf25..22ffc4d36 100644 --- a/tests/simulator/src/local_network.rs +++ b/tests/simulator/src/local_network.rs @@ -46,9 +46,10 @@ impl LocalNetwork { context: RuntimeContext, mut beacon_config: ClientConfig, ) -> impl Future { - // Fix bootnode ports beacon_config.network.discovery_port = BOOTNODE_PORT; beacon_config.network.libp2p_port = BOOTNODE_PORT; + beacon_config.network.enr_udp_port = Some(BOOTNODE_PORT); + beacon_config.network.enr_tcp_port = Some(BOOTNODE_PORT); LocalBeaconNode::production(context.service_context("boot_node".into()), beacon_config).map( |beacon_node| Self { inner: Arc::new(Inner {