From e6d4f28133ce9fa96520b93966f8c0e8ccad65df Mon Sep 17 00:00:00 2001
From: Paul Hauner
Date: Sat, 4 Aug 2018 13:45:02 +1000
Subject: [PATCH] Add comments to Network, add mpsc

---
 src/main.rs        |   7 ++-
 src/p2p/service.rs | 113 +++++++++++++++++++++++++++++++++++++--------
 2 files changed, 100 insertions(+), 20 deletions(-)

diff --git a/src/main.rs b/src/main.rs
index 228b0689b..b5b98c6b5 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -15,7 +15,7 @@ use std::path::PathBuf;
 use slog::Drain;
 use clap::{ Arg, App };
 use p2p::config::NetworkConfig;
-use p2p::service;
+use p2p::service::NetworkService;
 use p2p::state::NetworkState;
 
 fn main() {
@@ -58,7 +58,10 @@ fn main() {
         // keys::generate_keys(&log).expect("Failed to generate keys");
     } else {
         let mut state = NetworkState::new(config, &log).expect("setup failed");
-        service::listen(state, &log);
+        let service = NetworkService::new(state, log.new(o!()));
+        service.send(vec![31, 32, 33]);
+        service.bg_thread.join().unwrap();
+
     }
     info!(log, "Exiting.");
 }
diff --git a/src/p2p/service.rs b/src/p2p/service.rs
index 6ba2c8265..9cb11d0ba 100644
--- a/src/p2p/service.rs
+++ b/src/p2p/service.rs
@@ -23,45 +23,85 @@ extern crate tokio_stdin;
 use super::state::NetworkState;
 use self::bigint::U512;
-use self::futures::{ Future, Stream };
+use self::futures::{ Future, Stream, Poll };
+use self::futures::sync::mpsc::{ unbounded, UnboundedSender, UnboundedReceiver };
 use self::libp2p_core::{ AddrComponent, Endpoint, Multiaddr, Transport, ConnectionUpgrade };
 use self::libp2p_kad::{ KademliaUpgrade, KademliaProcessingFuture};
 use self::libp2p_identify::{ IdentifyInfo, IdentifyTransport, IdentifyOutput };
 use self::slog::Logger;
 use std::sync::{ Arc, RwLock };
 use std::time::{ Duration, Instant };
+// use std::sync::mpsc::{ channel, Sender, Receiver };
+use std::thread;
 use std::ops::Deref;
 use std::io::Error as IoError;
 use self::tokio_io::{ AsyncRead, AsyncWrite };
 use self::bytes::Bytes;
 
-pub fn listen(state: NetworkState, log: &Logger)
+pub struct NetworkService {
+    tx: UnboundedSender<Vec<u8>>,
+    pub bg_thread: thread::JoinHandle<()>,
+}
+
+impl NetworkService {
+    pub fn new(state: NetworkState, log: Logger) -> Self {
+        let (tx, rx) = unbounded();
+        let net_rx = NetworkReciever{ inner: rx };
+        let bg_thread = thread::spawn(move || {
+            listen(state, net_rx, log);
+        });
+
+        Self {
+            tx,
+            bg_thread,
+        }
+    }
+
+    pub fn send(&self, msg: Vec<u8>) {
+        self.tx.unbounded_send(msg);
+    }
+}
+
+pub fn listen(state: NetworkState, rx: NetworkReciever, log: Logger)
 {
     let peer_store = state.peer_store;
     let peer_id = state.peer_id;
     let listen_multiaddr = state.listen_multiaddr;
-
-    let mut core = tokio_core::reactor::Core::new().expect("tokio failure.");
     let listened_addrs = Arc::new(RwLock::new(vec![]));
+
+    // Build a tokio core
+    let mut core = tokio_core::reactor::Core::new().expect("tokio failure.");
+    // Build a base TCP libp2p transport
     let transport = libp2p_tcp_transport::TcpConfig::new(core.handle())
         .with_upgrade(libp2p_core::upgrade::PlainTextConfig)
         .with_upgrade(libp2p_mplex::BufferedMultiplexConfig::<[_; 256]>::new())
         .into_connection_reuse();
 
-    let transport_sockets = {
+    // Build an identify transport to allow identification and negotiation
+    // of layers running atop the TCP transport (e.g., kad)
+    let identify_transport = {
         let listened_addrs = listened_addrs.clone();
         let listen_multiaddr = listen_multiaddr.clone();
        IdentifyTransport::new(transport.clone(), peer_store.clone())
+            // Managed NAT'ed connections - ensuring the external IP
+            // is stored not the internal addr.
            .map(move |out, _, _| {
-                if let(Some(ref observed), ref listen_multiaddr) = (out.observed_addr, listen_multiaddr) {
-                    if let Some(viewed_from_outside) = transport.nat_traversal(listen_multiaddr, observed) {
-                        listened_addrs.write().unwrap().push(viewed_from_outside);
+                if let(Some(ref observed), ref listen_multiaddr) =
+                    (out.observed_addr, listen_multiaddr)
+                {
+                    if let Some(viewed_from_outside) =
+                        transport.nat_traversal(listen_multiaddr, observed)
+                    {
+                        listened_addrs.write().unwrap()
+                            .push(viewed_from_outside);
                     }
                 }
                 out.socket
            })
    };
 
+    // Configure and build a Kademlia upgrade to be applied
+    // to the base TCP transport.
     let kad_config = libp2p_kad::KademliaConfig {
         parallelism: 3,
         record_store: (),
@@ -69,19 +109,23 @@ pub fn listen(state: NetworkState, log: &Logger)
         local_peer_id: peer_id.clone(),
         timeout: Duration::from_secs(2)
     };
+    let kad_ctl_proto = libp2p_kad::
+        KademliaControllerPrototype::new(kad_config);
+    let kad_upgrade = libp2p_kad::
+        KademliaUpgrade::from_prototype(&kad_ctl_proto);
 
-    let kad_ctl_proto = libp2p_kad::KademliaControllerPrototype::new(kad_config);
-    let kad_upgrade = libp2p_kad::KademliaUpgrade::from_prototype(&kad_ctl_proto);
-
+    // Combine the Kademlia and Identify upgrades into a single
+    // upgrader struct.
     let upgrade = ConnectionUpgrader {
         kad: kad_upgrade.clone(),
         identify: libp2p_identify::IdentifyProtocolConfig,
     };
-
+
+    // Build a Swarm to manage upgrading connections to peers.
     let swarm_listened_addrs = listened_addrs.clone();
     let swarm_peer_id = peer_id.clone();
     let (swarm_ctl, swarm_future) = libp2p_core::swarm(
-        transport_sockets.clone().with_upgrade(upgrade),
+        identify_transport.clone().with_upgrade(upgrade),
         move |upgrade, client_addr| match upgrade {
             FinalUpgrade::Kad(kad) => Box::new(kad) as Box<_>,
             FinalUpgrade::Identify(IdentifyOutput::Sender { sender, .. }) => sender.send(
@@ -103,16 +147,21 @@ pub fn listen(state: NetworkState, log: &Logger)
         },
     );
 
+    // Start the Swarm controller listening on the local machine
     let actual_addr = swarm_ctl
         .listen_on(listen_multiaddr)
         .expect("Failed to listen on multiaddr");
-
     info!(log, "libp2p listening"; "listen_addr" => actual_addr.to_string());
 
+    // Convert the kad prototype into a controller by providing it the
+    // newly built swarm.
     let (kad_ctl, kad_init) = kad_ctl_proto.start(
         swarm_ctl.clone(),
-        transport_sockets.clone().with_upgrade(kad_upgrade.clone()));
+        identify_transport.clone().with_upgrade(kad_upgrade.clone()));
 
+    // Generate a tokio timer "wheel" future that sends kad FIND_NODE at
+    // a routine interval.
+    let kad_poll_log = log.new(o!());
     let kad_poll = {
         let polling_peer_id = peer_id.clone();
         tokio_timer::wheel()
@@ -125,23 +174,36 @@ pub fn listen(state: NetworkState, log: &Logger)
                 for peer in peers {
                     let peer_hash = U512::from(peer.hash());
                     let distance = 512 - (local_hash ^ peer_hash).leading_zeros();
-                    info!(log, "Discovered peer";
+                    info!(kad_poll_log, "Discovered peer";
                           "distance" => distance, "peer_id" => peer.to_base58());
                     let peer_addr = AddrComponent::P2P(peer.into_bytes()).into();
                     let dial_result = swarm_ctl.dial(
                         peer_addr,
-                        transport_sockets.clone().with_upgrade(kad_upgrade.clone())
+                        identify_transport.clone().with_upgrade(kad_upgrade.clone())
                     );
                     if let Err(err) = dial_result {
-                        warn!(log, "Dialling {:?} failed.", err)
+                        warn!(kad_poll_log, "Dialling {:?} failed.", err)
                     }
                 }
                 Ok(())
            })
    };
 
+    let kad_send_log = log.new(o!());
+    let kad_send = rx.for_each(|msg| {
+        if let Ok(msg) = String::from_utf8(msg) {
+            info!(kad_send_log, "message: {:?}", msg);
+        }
+        Ok(())
+    });
+
+    // Generate a future featuring the kad init future
+    // and the kad polling cycle.
     let final_future = swarm_future
+        .select(kad_send)
+        .map_err(|(err, _)| err)
+        .map(|((), _)| ())
         .select(kad_poll)
         .map_err(|(err, _)| err)
         .map(|((), _)| ())
@@ -152,6 +214,21 @@ pub fn listen(state: NetworkState, log: &Logger)
     core.run(final_future).unwrap();
 }
 
+pub struct NetworkReciever {
+    inner: UnboundedReceiver<Vec<u8>>,
+}
+
+impl Stream for NetworkReciever {
+    type Item = Vec<u8>;
+    type Error = IoError;
+
+    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
+        self.inner
+            .poll()
+            .map_err(|_| unreachable!())
+    }
+}
+
 #[derive(Clone)]
 struct ConnectionUpgrader {
     kad: KademliaUpgrade,