Add support for noise protocol (#873)

* Add noise support with fallback to secio

* Add config parameter for noise support

* Add secio/noise compatibility test

* Cleanup

* Remove config parameter for noise support

* Modify test to work between a secio swarm and a noise libp2p service

* Minor fixes
This commit is contained in:
Pawan Dhananjay 2020-03-02 08:05:20 +05:30 committed by GitHub
parent 0c96c515a0
commit 4d60694443
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 238 additions and 17 deletions

1
Cargo.lock generated
View File

@ -1132,6 +1132,7 @@ dependencies = [
"slog-stdlog 4.0.0 (registry+https://github.com/rust-lang/crates.io-index)", "slog-stdlog 4.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
"slog-term 2.4.2 (registry+https://github.com/rust-lang/crates.io-index)", "slog-term 2.4.2 (registry+https://github.com/rust-lang/crates.io-index)",
"smallvec 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)", "smallvec 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
"tempdir 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio 0.1.22 (registry+https://github.com/rust-lang/crates.io-index)", "tokio 0.1.22 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-io-timeout 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-io-timeout 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)",
"types 0.1.0", "types 0.1.0",

View File

@ -36,3 +36,4 @@ base64 = "0.11.0"
slog-stdlog = "4.0.0" slog-stdlog = "4.0.0"
slog-term = "2.4.2" slog-term = "2.4.2"
slog-async = "2.3.0" slog-async = "2.3.0"
tempdir = "0.3"

View File

@ -7,11 +7,16 @@ use crate::{NetworkGlobals, Topic, TopicHash};
use futures::prelude::*; use futures::prelude::*;
use futures::Stream; use futures::Stream;
use libp2p::core::{ use libp2p::core::{
identity::Keypair, multiaddr::Multiaddr, muxing::StreamMuxerBox, nodes::Substream, identity::Keypair,
transport::boxed::Boxed, ConnectedPoint, multiaddr::Multiaddr,
muxing::StreamMuxerBox,
nodes::Substream,
transport::boxed::Boxed,
upgrade::{InboundUpgradeExt, OutboundUpgradeExt},
ConnectedPoint,
}; };
use libp2p::gossipsub::MessageId; use libp2p::gossipsub::MessageId;
use libp2p::{core, secio, swarm::NetworkBehaviour, PeerId, Swarm, Transport}; use libp2p::{core, noise, secio, swarm::NetworkBehaviour, PeerId, Swarm, Transport};
use slog::{crit, debug, error, info, trace, warn}; use slog::{crit, debug, error, info, trace, warn};
use std::fs::File; use std::fs::File;
use std::io::prelude::*; use std::io::prelude::*;
@ -68,7 +73,7 @@ impl Service {
let network_globals = Arc::new(NetworkGlobals::new(local_peer_id.clone())); let network_globals = Arc::new(NetworkGlobals::new(local_peer_id.clone()));
let mut swarm = { let mut swarm = {
// Set up the transport - tcp/ws with secio and mplex/yamux // Set up the transport - tcp/ws with noise/secio and mplex/yamux
let transport = build_transport(local_keypair.clone()); let transport = build_transport(local_keypair.clone());
// Lighthouse network behaviour // Lighthouse network behaviour
let behaviour = Behaviour::new(&local_keypair, config, network_globals.clone(), &log)?; let behaviour = Behaviour::new(&local_keypair, config, network_globals.clone(), &log)?;
@ -250,7 +255,7 @@ impl Stream for Service {
} }
} }
/// The implementation supports TCP/IP, WebSockets over TCP/IP, secio as the encryption layer, and /// The implementation supports TCP/IP, WebSockets over TCP/IP, noise/secio as the encryption layer, and
/// mplex or yamux as the multiplexing layer. /// mplex or yamux as the multiplexing layer.
fn build_transport(local_private_key: Keypair) -> Boxed<(PeerId, StreamMuxerBox), Error> { fn build_transport(local_private_key: Keypair) -> Boxed<(PeerId, StreamMuxerBox), Error> {
// TODO: The Wire protocol currently doesn't specify encryption and this will need to be customised // TODO: The Wire protocol currently doesn't specify encryption and this will need to be customised
@ -262,20 +267,51 @@ fn build_transport(local_private_key: Keypair) -> Boxed<(PeerId, StreamMuxerBox)
let trans_clone = transport.clone(); let trans_clone = transport.clone();
transport.or_transport(websocket::WsConfig::new(trans_clone)) transport.or_transport(websocket::WsConfig::new(trans_clone))
}; };
transport // Authentication
.upgrade(core::upgrade::Version::V1) let transport = transport
.authenticate(secio::SecioConfig::new(local_private_key)) .and_then(move |stream, endpoint| {
.multiplex(core::upgrade::SelectUpgrade::new( let upgrade = core::upgrade::SelectUpgrade::new(
libp2p::yamux::Config::default(), generate_noise_config(&local_private_key),
libp2p::mplex::MplexConfig::new(), secio::SecioConfig::new(local_private_key),
)) );
.map(|(peer, muxer), _| (peer, core::muxing::StreamMuxerBox::new(muxer))) core::upgrade::apply(stream, upgrade, endpoint, core::upgrade::Version::V1).and_then(
.timeout(Duration::from_secs(20)) move |out| {
match out {
// Noise was negotiated
core::either::EitherOutput::First((remote_id, out)) => {
Ok((core::either::EitherOutput::First(out), remote_id))
}
// Secio was negotiated
core::either::EitherOutput::Second((remote_id, out)) => {
Ok((core::either::EitherOutput::Second(out), remote_id))
}
}
},
)
})
.timeout(Duration::from_secs(20));
// Multiplexing
let transport = transport
.and_then(move |(stream, peer_id), endpoint| {
let peer_id2 = peer_id.clone();
let upgrade = core::upgrade::SelectUpgrade::new(
libp2p::yamux::Config::default(),
libp2p::mplex::MplexConfig::new(),
)
.map_inbound(move |muxer| (peer_id, muxer))
.map_outbound(move |muxer| (peer_id2, muxer));
core::upgrade::apply(stream, upgrade, endpoint, core::upgrade::Version::V1)
.map(|(id, muxer)| (id, core::muxing::StreamMuxerBox::new(muxer)))
})
.timeout(Duration::from_secs(20)) .timeout(Duration::from_secs(20))
.map_err(|err| Error::new(ErrorKind::Other, err)) .map_err(|err| Error::new(ErrorKind::Other, err))
.boxed() .boxed();
transport
} }
#[derive(Debug)]
/// Events that can be obtained from polling the Libp2p Service. /// Events that can be obtained from polling the Libp2p Service.
pub enum Libp2pEvent { pub enum Libp2pEvent {
/// An RPC response request has been received on the swarm. /// An RPC response request has been received on the swarm.
@ -363,3 +399,13 @@ fn load_private_key(config: &NetworkConfig, log: &slog::Logger) -> Keypair {
} }
local_private_key local_private_key
} }
/// Generate authenticated XX Noise config from identity keys
fn generate_noise_config(
identity_keypair: &Keypair,
) -> noise::NoiseAuthenticated<noise::XX, noise::X25519, ()> {
let static_dh_keys = noise::Keypair::<noise::X25519>::new()
.into_authentic(identity_keypair)
.expect("signing can fail only once during starting a node");
noise::NoiseConfig::xx(static_dh_keys).into_authenticated()
}

View File

@ -5,6 +5,7 @@ use eth2_libp2p::NetworkConfig;
use eth2_libp2p::Service as LibP2PService; use eth2_libp2p::Service as LibP2PService;
use slog::{debug, error, o, Drain}; use slog::{debug, error, o, Drain};
use std::time::Duration; use std::time::Duration;
use tempdir::TempDir;
pub fn build_log(level: slog::Level, enabled: bool) -> slog::Logger { pub fn build_log(level: slog::Level, enabled: bool) -> slog::Logger {
let decorator = slog_term::TermDecorator::new().build(); let decorator = slog_term::TermDecorator::new().build();
@ -24,11 +25,13 @@ pub fn build_config(
secret_key: Option<String>, secret_key: Option<String>,
) -> NetworkConfig { ) -> NetworkConfig {
let mut config = NetworkConfig::default(); let mut config = NetworkConfig::default();
let path = TempDir::new(&format!("libp2p_test{}", port)).unwrap();
config.libp2p_port = port; // tcp port config.libp2p_port = port; // tcp port
config.discovery_port = port; // udp port config.discovery_port = port; // udp port
config.boot_nodes.append(&mut boot_nodes); config.boot_nodes.append(&mut boot_nodes);
config.secret_key_hex = secret_key; config.secret_key_hex = secret_key;
config.network_dir.push(port.to_string()); config.network_dir = path.into_path();
// Reduce gossipsub heartbeat parameters // Reduce gossipsub heartbeat parameters
config.gs_config.heartbeat_initial_delay = Duration::from_millis(500); config.gs_config.heartbeat_initial_delay = Duration::from_millis(500);
config.gs_config.heartbeat_interval = Duration::from_millis(500); config.gs_config.heartbeat_interval = Duration::from_millis(500);
@ -43,7 +46,9 @@ pub fn build_libp2p_instance(
) -> LibP2PService { ) -> LibP2PService {
let config = build_config(port, boot_nodes, secret_key); let config = build_config(port, boot_nodes, secret_key);
// launch libp2p service // launch libp2p service
LibP2PService::new(&config, log.clone()).unwrap().1 LibP2PService::new(&config, log.clone())
.expect("should build libp2p instance")
.1
} }
#[allow(dead_code)] #[allow(dead_code)]

View File

@ -0,0 +1,168 @@
#![cfg(test)]
use crate::behaviour::{Behaviour, BehaviourEvent};
use crate::multiaddr::Protocol;
use eth2_libp2p::*;
use futures::prelude::*;
use libp2p::core::identity::Keypair;
use libp2p::{
core,
core::{muxing::StreamMuxerBox, nodes::Substream, transport::boxed::Boxed},
secio, PeerId, Swarm, Transport,
};
use slog::{crit, debug, info, Level};
use std::io::{Error, ErrorKind};
use std::sync::atomic::{AtomicBool, Ordering::Relaxed};
use std::sync::Arc;
use std::time::Duration;
use tokio::prelude::*;
mod common;
type Libp2pStream = Boxed<(PeerId, StreamMuxerBox), Error>;
type Libp2pBehaviour = Behaviour<Substream<StreamMuxerBox>>;
/// Build and return a eth2_libp2p Swarm with only secio support.
fn build_secio_swarm(
config: &NetworkConfig,
log: slog::Logger,
) -> error::Result<Swarm<Libp2pStream, Libp2pBehaviour>> {
let local_keypair = Keypair::generate_secp256k1();
let local_peer_id = PeerId::from(local_keypair.public());
let network_globals = Arc::new(NetworkGlobals::new(local_peer_id.clone()));
let mut swarm = {
// Set up the transport - tcp/ws with secio and mplex/yamux
let transport = build_secio_transport(local_keypair.clone());
// Lighthouse network behaviour
let behaviour = Behaviour::new(&local_keypair, config, network_globals.clone(), &log)?;
Swarm::new(transport, behaviour, local_peer_id.clone())
};
// listen on the specified address
let listen_multiaddr = {
let mut m = Multiaddr::from(config.listen_address);
m.push(Protocol::Tcp(config.libp2p_port));
m
};
match Swarm::listen_on(&mut swarm, listen_multiaddr.clone()) {
Ok(_) => {
let mut log_address = listen_multiaddr;
log_address.push(Protocol::P2p(local_peer_id.clone().into()));
info!(log, "Listening established"; "address" => format!("{}", log_address));
}
Err(err) => {
crit!(
log,
"Unable to listen on libp2p address";
"error" => format!("{:?}", err),
"listen_multiaddr" => format!("{}", listen_multiaddr),
);
return Err("Libp2p was unable to listen on the given listen address.".into());
}
};
// helper closure for dialing peers
let mut dial_addr = |multiaddr: &Multiaddr| {
match Swarm::dial_addr(&mut swarm, multiaddr.clone()) {
Ok(()) => debug!(log, "Dialing libp2p peer"; "address" => format!("{}", multiaddr)),
Err(err) => debug!(
log,
"Could not connect to peer"; "address" => format!("{}", multiaddr), "error" => format!("{:?}", err)
),
};
};
// attempt to connect to any specified boot-nodes
for bootnode_enr in &config.boot_nodes {
for multiaddr in &bootnode_enr.multiaddr() {
// ignore udp multiaddr if it exists
let components = multiaddr.iter().collect::<Vec<_>>();
if let Protocol::Udp(_) = components[1] {
continue;
}
dial_addr(multiaddr);
}
}
Ok(swarm)
}
/// Build a simple TCP transport with secio, mplex/yamux.
fn build_secio_transport(local_private_key: Keypair) -> Boxed<(PeerId, StreamMuxerBox), Error> {
let transport = libp2p::tcp::TcpConfig::new().nodelay(true);
transport
.upgrade(core::upgrade::Version::V1)
.authenticate(secio::SecioConfig::new(local_private_key))
.multiplex(core::upgrade::SelectUpgrade::new(
libp2p::yamux::Config::default(),
libp2p::mplex::MplexConfig::new(),
))
.map(|(peer, muxer), _| (peer, core::muxing::StreamMuxerBox::new(muxer)))
.timeout(Duration::from_secs(20))
.timeout(Duration::from_secs(20))
.map_err(|err| Error::new(ErrorKind::Other, err))
.boxed()
}
/// Test if the encryption falls back to secio if noise isn't available
#[test]
fn test_secio_noise_fallback() {
// set up the logging. The level and enabled logging or not
let log_level = Level::Trace;
let enable_logging = true;
let log = common::build_log(log_level, enable_logging);
let noisy_config = common::build_config(56010, vec![], None);
let mut noisy_node = Service::new(&noisy_config, log.clone())
.expect("should build a libp2p instance")
.1;
let secio_config = common::build_config(56011, vec![common::get_enr(&noisy_node)], None);
// Building a custom Libp2pService from outside the crate isn't possible because of
// private fields in the Libp2pService struct. A swarm is good enough for testing
// compatibility with secio.
let mut secio_swarm =
build_secio_swarm(&secio_config, log.clone()).expect("should build a secio swarm");
let secio_log = log.clone();
let noisy_future = future::poll_fn(move || -> Poll<bool, ()> {
loop {
match noisy_node.poll().unwrap() {
_ => return Ok(Async::NotReady),
}
}
});
let secio_future = future::poll_fn(move || -> Poll<bool, ()> {
loop {
match secio_swarm.poll().unwrap() {
Async::Ready(Some(BehaviourEvent::PeerDialed(peer_id))) => {
// secio node negotiated a secio transport with
// the noise compatible node
info!(secio_log, "Connected to peer {}", peer_id);
return Ok(Async::Ready(true));
}
_ => return Ok(Async::NotReady),
}
}
});
// execute the futures and check the result
let test_result = Arc::new(AtomicBool::new(false));
let error_result = test_result.clone();
let thread_result = test_result.clone();
tokio::run(
noisy_future
.select(secio_future)
.timeout(Duration::from_millis(1000))
.map_err(move |_| error_result.store(false, Relaxed))
.map(move |result| {
thread_result.store(result.0, Relaxed);
}),
);
assert!(test_result.load(Relaxed));
}