Remove floodsub upgrade

This commit is contained in:
Paul Hauner 2018-08-02 23:06:25 +10:00
parent 44bcd1b40e
commit 09822e42b6
3 changed files with 6 additions and 73 deletions

View File

@ -15,7 +15,7 @@ use std::path::PathBuf;
use slog::Drain; use slog::Drain;
use clap::{ Arg, App }; use clap::{ Arg, App };
use p2p::config::NetworkConfig; use p2p::config::NetworkConfig;
use p2p::floodsub; use p2p::service;
use p2p::state::NetworkState; use p2p::state::NetworkState;
fn main() { fn main() {
@ -58,7 +58,7 @@ fn main() {
// keys::generate_keys(&log).expect("Failed to generate keys"); // keys::generate_keys(&log).expect("Failed to generate keys");
} else { } else {
let mut state = NetworkState::new(config, &log).expect("setup failed"); let mut state = NetworkState::new(config, &log).expect("setup failed");
floodsub::listen(state, &log); service::listen(state, &log);
} }
info!(log, "Exiting."); info!(log, "Exiting.");
} }

View File

@ -4,7 +4,7 @@ extern crate pem;
extern crate secp256k1; extern crate secp256k1;
extern crate slog; extern crate slog;
pub mod floodsub; pub mod service;
pub mod state; pub mod state;
// pub mod keys; // pub mod keys;
pub mod config; pub mod config;

View File

@ -14,7 +14,6 @@ extern crate libp2p_identify;
extern crate libp2p_core; extern crate libp2p_core;
extern crate libp2p_mplex; extern crate libp2p_mplex;
extern crate libp2p_tcp_transport; extern crate libp2p_tcp_transport;
extern crate libp2p_floodsub;
extern crate libp2p_kad; extern crate libp2p_kad;
extern crate slog; extern crate slog;
extern crate tokio_core; extern crate tokio_core;
@ -26,7 +25,6 @@ use super::state::NetworkState;
use self::bigint::U512; use self::bigint::U512;
use self::futures::{ Future, Stream }; use self::futures::{ Future, Stream };
use self::libp2p_core::{ AddrComponent, Endpoint, Multiaddr, Transport, ConnectionUpgrade }; use self::libp2p_core::{ AddrComponent, Endpoint, Multiaddr, Transport, ConnectionUpgrade };
use self::libp2p_floodsub::{ FloodSubUpgrade, FloodSubFuture };
use self::libp2p_kad::{ KademliaUpgrade, KademliaProcessingFuture}; use self::libp2p_kad::{ KademliaUpgrade, KademliaProcessingFuture};
use self::libp2p_identify::{ IdentifyInfo, IdentifyTransport, IdentifyOutput }; use self::libp2p_identify::{ IdentifyInfo, IdentifyTransport, IdentifyOutput };
use self::slog::Logger; use self::slog::Logger;
@ -50,8 +48,6 @@ pub fn listen(state: NetworkState, log: &Logger)
.with_upgrade(libp2p_mplex::BufferedMultiplexConfig::<[_; 256]>::new()) .with_upgrade(libp2p_mplex::BufferedMultiplexConfig::<[_; 256]>::new())
.into_connection_reuse(); .into_connection_reuse();
let (floodsub_upgrade, floodsub_rx) = FloodSubUpgrade::new(peer_id.clone());
let transport_sockets = { let transport_sockets = {
let listened_addrs = listened_addrs.clone(); let listened_addrs = listened_addrs.clone();
let listen_multiaddr = listen_multiaddr.clone(); let listen_multiaddr = listen_multiaddr.clone();
@ -80,7 +76,6 @@ pub fn listen(state: NetworkState, log: &Logger)
let upgrade = ConnectionUpgrader { let upgrade = ConnectionUpgrader {
kad: kad_upgrade.clone(), kad: kad_upgrade.clone(),
identify: libp2p_identify::IdentifyProtocolConfig, identify: libp2p_identify::IdentifyProtocolConfig,
floodsub: floodsub_upgrade.clone(),
}; };
let swarm_listened_addrs = listened_addrs.clone(); let swarm_listened_addrs = listened_addrs.clone();
@ -89,7 +84,6 @@ pub fn listen(state: NetworkState, log: &Logger)
transport_sockets.clone().with_upgrade(upgrade), transport_sockets.clone().with_upgrade(upgrade),
move |upgrade, client_addr| match upgrade { move |upgrade, client_addr| match upgrade {
FinalUpgrade::Kad(kad) => Box::new(kad) as Box<_>, FinalUpgrade::Kad(kad) => Box::new(kad) as Box<_>,
FinalUpgrade::FloodSub(future) => Box::new(future) as Box<_>,
FinalUpgrade::Identify(IdentifyOutput::Sender { sender, .. }) => sender.send( FinalUpgrade::Identify(IdentifyOutput::Sender { sender, .. }) => sender.send(
IdentifyInfo { IdentifyInfo {
public_key: swarm_peer_id.clone().into_bytes(), public_key: swarm_peer_id.clone().into_bytes(),
@ -99,7 +93,6 @@ pub fn listen(state: NetworkState, log: &Logger)
protocols: vec![ protocols: vec![
"/ipfs/kad/1.0.0".to_owned(), "/ipfs/kad/1.0.0".to_owned(),
"/ipfs/id/1.0.0".to_owned(), "/ipfs/id/1.0.0".to_owned(),
"/floodsub/1.0.0".to_owned(),
] ]
}, },
&client_addr &client_addr
@ -118,21 +111,7 @@ pub fn listen(state: NetworkState, log: &Logger)
let (kad_ctl, kad_init) = kad_ctl_proto.start( let (kad_ctl, kad_init) = kad_ctl_proto.start(
swarm_ctl.clone(), swarm_ctl.clone(),
transport_sockets.clone().with_upgrade(kad_upgrade)); transport_sockets.clone().with_upgrade(kad_upgrade.clone()));
let topic = libp2p_floodsub::TopicBuilder::new("lighthouse").build();
let floodsub_ctl = libp2p_floodsub::FloodSubController::
new(&floodsub_upgrade);
floodsub_ctl.subscribe(&topic);
let floodsub_rx = floodsub_rx.for_each(|msg| {
if let Ok(msg) = String::from_utf8(msg.data) {
info!(log, "< {}", msg);
}
Ok(())
});
let kad_poll = { let kad_poll = {
let polling_peer_id = peer_id.clone(); let polling_peer_id = peer_id.clone();
@ -152,9 +131,7 @@ pub fn listen(state: NetworkState, log: &Logger)
let peer_addr = AddrComponent::P2P(peer.into_bytes()).into(); let peer_addr = AddrComponent::P2P(peer.into_bytes()).into();
let dial_result = swarm_ctl.dial( let dial_result = swarm_ctl.dial(
peer_addr, peer_addr,
transport_sockets.clone().with_upgrade( transport_sockets.clone().with_upgrade(kad_upgrade.clone())
floodsub_upgrade.clone()
)
); );
if let Err(err) = dial_result { if let Err(err) = dial_result {
warn!(log, "Dialling {:?} failed.", err) warn!(log, "Dialling {:?} failed.", err)
@ -164,19 +141,7 @@ pub fn listen(state: NetworkState, log: &Logger)
}) })
}; };
let stdin = build_stdin_future().for_each(move |msg| {
info!(log, "> {}", msg);
floodsub_ctl.publish(&topic, msg.into_bytes());
Ok(())
});
let final_future = swarm_future let final_future = swarm_future
.select(floodsub_rx)
.map_err(|(err, _)| err)
.map(|((), _)| ())
.select(stdin)
.map_err(|(err, _)| err)
.map(|((), _)| ())
.select(kad_poll) .select(kad_poll)
.map_err(|(err, _)| err) .map_err(|(err, _)| err)
.map(|((), _)| ()) .map(|((), _)| ())
@ -187,29 +152,10 @@ pub fn listen(state: NetworkState, log: &Logger)
core.run(final_future).unwrap(); core.run(final_future).unwrap();
} }
fn build_stdin_future() -> impl Stream<Item = String, Error = IoError> {
use std::mem;
let mut buffer = Vec::new();
tokio_stdin::spawn_stdin_stream_unbounded()
.map_err(|_| -> IoError { panic!() })
.filter_map(move |msg| {
if msg != b'\r' && msg != b'\n' {
buffer.push(msg);
return None;
} else if buffer.is_empty() {
return None;
}
Some(String::from_utf8(mem::replace(&mut buffer, Vec::new())).unwrap())
})
}
#[derive(Clone)] #[derive(Clone)]
struct ConnectionUpgrader<P, R> { struct ConnectionUpgrader<P, R> {
kad: KademliaUpgrade<P, R>, kad: KademliaUpgrade<P, R>,
identify: libp2p_identify::IdentifyProtocolConfig, identify: libp2p_identify::IdentifyProtocolConfig,
floodsub: FloodSubUpgrade
} }
impl<C, P, R, Pc> ConnectionUpgrade<C> for ConnectionUpgrader<P, R> impl<C, P, R, Pc> ConnectionUpgrade<C> for ConnectionUpgrader<P, R>
@ -229,7 +175,6 @@ where
vec![ vec![
(Bytes::from("/ipfs/kad/1.0.0"), 0), (Bytes::from("/ipfs/kad/1.0.0"), 0),
(Bytes::from("/ipfs/id/1.0.0"), 1), (Bytes::from("/ipfs/id/1.0.0"), 1),
(Bytes::from("/floodsub/1.0.0"), 2),
].into_iter() ].into_iter()
} }
@ -250,10 +195,6 @@ where
self.identify self.identify
.upgrade(socket, (), ty, remote_addr) .upgrade(socket, (), ty, remote_addr)
.map(|upg| upg.into())), .map(|upg| upg.into())),
2 => Box::new(
self.floodsub
.upgrade(socket, (), ty, remote_addr)
.map(|upg| upg.into())),
_ => unreachable!() _ => unreachable!()
} }
@ -262,8 +203,7 @@ where
enum FinalUpgrade<C> { enum FinalUpgrade<C> {
Kad(KademliaProcessingFuture), Kad(KademliaProcessingFuture),
Identify(IdentifyOutput<C>), Identify(IdentifyOutput<C>)
FloodSub(FloodSubFuture),
} }
impl<C> From<libp2p_kad::KademliaProcessingFuture> for FinalUpgrade<C> { impl<C> From<libp2p_kad::KademliaProcessingFuture> for FinalUpgrade<C> {
@ -279,10 +219,3 @@ impl<C> From<IdentifyOutput<C>> for FinalUpgrade<C> {
FinalUpgrade::Identify(upgrade) FinalUpgrade::Identify(upgrade)
} }
} }
impl<C> From<FloodSubFuture> for FinalUpgrade<C> {
#[inline]
fn from(upgrade: FloodSubFuture) -> Self {
FinalUpgrade::FloodSub(upgrade)
}
}