Small bug fixes from initial sim tests (#993)

* Debug logging and fixes

* Minor fixes

* Remove debugging statements
This commit is contained in:
Age Manning 2020-04-09 14:28:37 +10:00 committed by GitHub
parent 1779aa6a8a
commit 19b8c5a9e0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 52 additions and 31 deletions

View File

@ -103,7 +103,10 @@ impl<TSubstream, TSpec: EthSpec> Discovery<TSubstream, TSpec> {
log,
"Adding node to routing table";
"node_id" => format!("{}", bootnode_enr.node_id()),
"peer_id" => format!("{}", bootnode_enr.peer_id())
"peer_id" => format!("{}", bootnode_enr.peer_id()),
"ip" => format!("{:?}", bootnode_enr.ip()),
"udp" => format!("{:?}", bootnode_enr.udp()),
"tcp" => format!("{:?}", bootnode_enr.udp())
);
let _ = discovery.add_enr(bootnode_enr).map_err(|e| {
warn!(
@ -256,7 +259,7 @@ impl<TSubstream, TSpec: EthSpec> Discovery<TSubstream, TSpec> {
"subnet_id" => *subnet_id,
"connected_peers_on_subnet" => peers_on_subnet,
"target_subnet_peers" => TARGET_SUBNET_PEERS,
"target_peers" => target_peers
"peers_to_find" => target_peers
);
let log_clone = self.log.clone();
@ -283,13 +286,14 @@ impl<TSubstream, TSpec: EthSpec> Discovery<TSubstream, TSpec> {
// start the query
self.start_query(subnet_predicate, target_peers as usize);
}
} else {
debug!(self.log, "Discovery ignored";
"reason" => "Already connected to desired peers",
"connected_peers_on_subnet" => peers_on_subnet,
"target_subnet_peers" => TARGET_SUBNET_PEERS,
);
}
}
/* Internal Functions */
@ -478,7 +482,6 @@ where
});
}
Discv5Event::FindNodeResult { closer_peers, .. } => {
// TODO: Modify once ENR predicate search is available
debug!(self.log, "Discovery query completed"; "peers_found" => closer_peers.len());
// update the time to the next query
if self.past_discovery_delay < MAX_TIME_BETWEEN_PEER_SEARCHES {
@ -491,9 +494,6 @@ where
self.peer_discovery_delay
.reset(Instant::now() + Duration::from_secs(delay));
if closer_peers.is_empty() {
debug!(self.log, "Discovery random query found no peers");
}
for peer_id in closer_peers {
// if we need more peers, attempt a connection

View File

@ -107,9 +107,9 @@ impl<TSpec: EthSpec> Decoder for SSZInboundCodec<TSpec> {
_ => unreachable!("Cannot negotiate an unknown version"),
},
RPC_PING => match self.protocol.version.as_str() {
"1" => Ok(Some(RPCRequest::Status(StatusMessage::from_ssz_bytes(
&packet,
)?))),
"1" => Ok(Some(RPCRequest::Ping(Ping {
data: u64::from_ssz_bytes(&packet)?,
}))),
_ => unreachable!("Cannot negotiate an unknown version"),
},
RPC_META_DATA => match self.protocol.version.as_str() {

View File

@ -12,7 +12,7 @@ use libp2p::core::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeError};
use libp2p::swarm::protocols_handler::{
KeepAlive, ProtocolsHandler, ProtocolsHandlerEvent, ProtocolsHandlerUpgrErr, SubstreamProtocol,
};
use slog::{crit, debug, error, warn};
use slog::{crit, debug, error, trace, warn};
use smallvec::SmallVec;
use std::collections::hash_map::Entry;
use std::time::{Duration, Instant};
@ -318,7 +318,22 @@ where
// add the stream to substreams if we expect a response, otherwise drop the stream.
match rpc_event {
RPCEvent::Request(id, request) if request.expect_response() => {
RPCEvent::Request(mut id, request) if request.expect_response() => {
// outbound requests can be sent from various aspects of lighthouse which don't
// track request ids. In the future these will be flagged as None, currently they
// are flagged as 0. These can overlap. In this case, we pick the highest request
// Id available
if id == 0 && self.outbound_substreams.get(&id).is_some() {
// have duplicate outbound request with no id. Pick one that will not collide
let mut new_id = std::usize::MAX;
while self.outbound_substreams.get(&new_id).is_some() {
// panic all outbound substreams are full
new_id -= 1;
}
trace!(self.log, "New outbound stream id created"; "id" => new_id);
id = RequestId::from(new_id);
}
// new outbound request. Store the stream and tag the output.
let delay_key = self
.outbound_substreams_delay
@ -331,7 +346,7 @@ where
.outbound_substreams
.insert(id, (awaiting_stream, delay_key))
{
warn!(self.log, "Duplicate outbound substream id"; "id" => format!("{:?}", id));
crit!(self.log, "Duplicate outbound substream id"; "id" => format!("{:?}", id));
}
}
_ => { // a response is not expected, drop the stream for all other requests
@ -401,7 +416,7 @@ where
}
}
None => {
debug!(self.log, "Stream has expired. Response not sent"; "response" => format!("{}",response));
warn!(self.log, "Stream has expired. Response not sent"; "response" => format!("{}",response));
}
};
}
@ -473,12 +488,14 @@ where
}
ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Apply(err)) => {
// IO/Decode/Custom Error, report to the application
debug!(self.log, "Upgrade Error"; "error" => format!("{}",err));
return Ok(Async::Ready(ProtocolsHandlerEvent::Custom(
RPCEvent::Error(request_id, err),
)));
}
ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Select(err)) => {
// Error during negotiation
debug!(self.log, "Upgrade Error"; "error" => format!("{}",err));
return Ok(Async::Ready(ProtocolsHandlerEvent::Custom(
RPCEvent::Error(request_id, RPCError::Custom(format!("{}", err))),
)));
@ -496,10 +513,11 @@ where
}
// purge expired inbound substreams and send an error
while let Async::Ready(Some(stream_id)) = self
.inbound_substreams_delay
.poll()
.map_err(|_| ProtocolsHandlerUpgrErr::Timer)?
while let Async::Ready(Some(stream_id)) =
self.inbound_substreams_delay.poll().map_err(|e| {
warn!(self.log, "Inbound substream poll failed"; "error" => format!("{:?}", e));
ProtocolsHandlerUpgrErr::Timer
})?
{
let rpc_id = stream_id.get_ref();
@ -517,10 +535,11 @@ where
}
// purge expired outbound substreams
if let Async::Ready(Some(stream_id)) = self
.outbound_substreams_delay
.poll()
.map_err(|_| ProtocolsHandlerUpgrErr::Timer)?
if let Async::Ready(Some(stream_id)) =
self.outbound_substreams_delay.poll().map_err(|e| {
warn!(self.log, "Outbound substream poll failed"; "error" => format!("{:?}", e));
ProtocolsHandlerUpgrErr::Timer
})?
{
self.outbound_substreams.remove(stream_id.get_ref());
// notify the user

View File

@ -644,8 +644,9 @@ pub(crate) fn status_message<T: BeaconChainTypes>(
})
}
/// Wraps a Network Channel to employ various RPC related network functionality for the message
/// handler. The handler doesn't manage it's own request Id's and can therefore only send
/// Wraps a Network Channel to employ various RPC related network functionality for the
/// processor.
/// The Processor doesn't manage it's own request Id's and can therefore only send
/// responses or requests with 0 request Ids.
pub struct HandlerNetworkContext<T: EthSpec> {
/// The network channel to relay messages to the Network service.

View File

@ -27,7 +27,7 @@ impl<T: EthSpec> SyncNetworkContext<T> {
pub fn new(network_send: mpsc::UnboundedSender<NetworkMessage<T>>, log: slog::Logger) -> Self {
Self {
network_send,
request_id: 0,
request_id: 1,
log,
}
}

View File

@ -91,7 +91,7 @@ pub fn testing_client_config() -> ClientConfig {
client_config
}
/// Provids a validator client that is running in the current process on a given tokio executor (it
/// Provides a validator client that is running in the current process on a given tokio executor (it
/// is _local_ to this process).
///
/// Intended for use in testing and simulation. Not for production.

View File

@ -46,9 +46,10 @@ impl<E: EthSpec> LocalNetwork<E> {
context: RuntimeContext<E>,
mut beacon_config: ClientConfig,
) -> impl Future<Item = Self, Error = String> {
// Fix bootnode ports
beacon_config.network.discovery_port = BOOTNODE_PORT;
beacon_config.network.libp2p_port = BOOTNODE_PORT;
beacon_config.network.enr_udp_port = Some(BOOTNODE_PORT);
beacon_config.network.enr_tcp_port = Some(BOOTNODE_PORT);
LocalBeaconNode::production(context.service_context("boot_node".into()), beacon_config).map(
|beacon_node| Self {
inner: Arc::new(Inner {