diff --git a/src/main.rs b/src/main.rs index c7c15578b..228b0689b 100644 --- a/src/main.rs +++ b/src/main.rs @@ -15,7 +15,7 @@ use std::path::PathBuf; use slog::Drain; use clap::{ Arg, App }; use p2p::config::NetworkConfig; -use p2p::floodsub; +use p2p::service; use p2p::state::NetworkState; fn main() { @@ -58,7 +58,7 @@ fn main() { // keys::generate_keys(&log).expect("Failed to generate keys"); } else { let mut state = NetworkState::new(config, &log).expect("setup failed"); - floodsub::listen(state, &log); + service::listen(state, &log); } info!(log, "Exiting."); } diff --git a/src/p2p/mod.rs b/src/p2p/mod.rs index 9ba21ff35..460386f33 100644 --- a/src/p2p/mod.rs +++ b/src/p2p/mod.rs @@ -4,7 +4,7 @@ extern crate pem; extern crate secp256k1; extern crate slog; -pub mod floodsub; +pub mod service; pub mod state; // pub mod keys; pub mod config; diff --git a/src/p2p/floodsub.rs b/src/p2p/service.rs similarity index 76% rename from src/p2p/floodsub.rs rename to src/p2p/service.rs index f98b16450..6ba2c8265 100644 --- a/src/p2p/floodsub.rs +++ b/src/p2p/service.rs @@ -14,7 +14,6 @@ extern crate libp2p_identify; extern crate libp2p_core; extern crate libp2p_mplex; extern crate libp2p_tcp_transport; -extern crate libp2p_floodsub; extern crate libp2p_kad; extern crate slog; extern crate tokio_core; @@ -26,7 +25,6 @@ use super::state::NetworkState; use self::bigint::U512; use self::futures::{ Future, Stream }; use self::libp2p_core::{ AddrComponent, Endpoint, Multiaddr, Transport, ConnectionUpgrade }; -use self::libp2p_floodsub::{ FloodSubUpgrade, FloodSubFuture }; use self::libp2p_kad::{ KademliaUpgrade, KademliaProcessingFuture}; use self::libp2p_identify::{ IdentifyInfo, IdentifyTransport, IdentifyOutput }; use self::slog::Logger; @@ -49,8 +47,6 @@ pub fn listen(state: NetworkState, log: &Logger) .with_upgrade(libp2p_core::upgrade::PlainTextConfig) .with_upgrade(libp2p_mplex::BufferedMultiplexConfig::<[_; 256]>::new()) .into_connection_reuse(); - - let (floodsub_upgrade, floodsub_rx) = FloodSubUpgrade::new(peer_id.clone()); let transport_sockets = { let listened_addrs = listened_addrs.clone(); @@ -80,7 +76,6 @@ pub fn listen(state: NetworkState, log: &Logger) let upgrade = ConnectionUpgrader { kad: kad_upgrade.clone(), identify: libp2p_identify::IdentifyProtocolConfig, - floodsub: floodsub_upgrade.clone(), }; let swarm_listened_addrs = listened_addrs.clone(); @@ -89,7 +84,6 @@ pub fn listen(state: NetworkState, log: &Logger) transport_sockets.clone().with_upgrade(upgrade), move |upgrade, client_addr| match upgrade { FinalUpgrade::Kad(kad) => Box::new(kad) as Box<_>, - FinalUpgrade::FloodSub(future) => Box::new(future) as Box<_>, FinalUpgrade::Identify(IdentifyOutput::Sender { sender, .. }) => sender.send( IdentifyInfo { public_key: swarm_peer_id.clone().into_bytes(), @@ -99,7 +93,6 @@ pub fn listen(state: NetworkState, log: &Logger) protocols: vec![ "/ipfs/kad/1.0.0".to_owned(), "/ipfs/id/1.0.0".to_owned(), - "/floodsub/1.0.0".to_owned(), ] }, &client_addr @@ -118,21 +111,7 @@ pub fn listen(state: NetworkState, log: &Logger) let (kad_ctl, kad_init) = kad_ctl_proto.start( swarm_ctl.clone(), - transport_sockets.clone().with_upgrade(kad_upgrade)); - - let topic = libp2p_floodsub::TopicBuilder::new("lighthouse").build(); - - let floodsub_ctl = libp2p_floodsub::FloodSubController:: - new(&floodsub_upgrade); - - floodsub_ctl.subscribe(&topic); - - let floodsub_rx = floodsub_rx.for_each(|msg| { - if let Ok(msg) = String::from_utf8(msg.data) { - info!(log, "< {}", msg); - } - Ok(()) - }); + transport_sockets.clone().with_upgrade(kad_upgrade.clone())); let kad_poll = { let polling_peer_id = peer_id.clone(); @@ -152,9 +131,7 @@ pub fn listen(state: NetworkState, log: &Logger) let peer_addr = AddrComponent::P2P(peer.into_bytes()).into(); let dial_result = swarm_ctl.dial( peer_addr, - transport_sockets.clone().with_upgrade( - floodsub_upgrade.clone() - ) + transport_sockets.clone().with_upgrade(kad_upgrade.clone()) ); if let Err(err) = dial_result { warn!(log, "Dialling {:?} failed.", err) @@ -164,19 +141,7 @@ pub fn listen(state: NetworkState, log: &Logger) }) }; - let stdin = build_stdin_future().for_each(move |msg| { - info!(log, "> {}", msg); - floodsub_ctl.publish(&topic, msg.into_bytes()); - Ok(()) - }); - let final_future = swarm_future - .select(floodsub_rx) - .map_err(|(err, _)| err) - .map(|((), _)| ()) - .select(stdin) - .map_err(|(err, _)| err) - .map(|((), _)| ()) .select(kad_poll) .map_err(|(err, _)| err) .map(|((), _)| ()) @@ -187,29 +152,10 @@ pub fn listen(state: NetworkState, log: &Logger) core.run(final_future).unwrap(); } -fn build_stdin_future() -> impl Stream { - use std::mem; - - let mut buffer = Vec::new(); - tokio_stdin::spawn_stdin_stream_unbounded() - .map_err(|_| -> IoError { panic!() }) - .filter_map(move |msg| { - if msg != b'\r' && msg != b'\n' { - buffer.push(msg); - return None; - } else if buffer.is_empty() { - return None; - } - - Some(String::from_utf8(mem::replace(&mut buffer, Vec::new())).unwrap()) - }) -} - #[derive(Clone)] struct ConnectionUpgrader { kad: KademliaUpgrade, identify: libp2p_identify::IdentifyProtocolConfig, - floodsub: FloodSubUpgrade } impl ConnectionUpgrade for ConnectionUpgrader @@ -229,7 +175,6 @@ where vec![ (Bytes::from("/ipfs/kad/1.0.0"), 0), (Bytes::from("/ipfs/id/1.0.0"), 1), - (Bytes::from("/floodsub/1.0.0"), 2), ].into_iter() } @@ -250,10 +195,6 @@ where self.identify .upgrade(socket, (), ty, remote_addr) .map(|upg| upg.into())), - 2 => Box::new( - self.floodsub - .upgrade(socket, (), ty, remote_addr) - .map(|upg| upg.into())), _ => unreachable!() } @@ -262,8 +203,7 @@ where enum FinalUpgrade { Kad(KademliaProcessingFuture), - Identify(IdentifyOutput), - FloodSub(FloodSubFuture), + Identify(IdentifyOutput) } impl From for FinalUpgrade { @@ -279,10 +219,3 @@ impl From> for FinalUpgrade { FinalUpgrade::Identify(upgrade) } } - -impl From for FinalUpgrade { - #[inline] - fn from(upgrade: FloodSubFuture) -> Self { - FinalUpgrade::FloodSub(upgrade) - } -}