Add comments to Network, add mpsc
This commit is contained in:
parent
09822e42b6
commit
e6d4f28133
@ -15,7 +15,7 @@ use std::path::PathBuf;
|
|||||||
use slog::Drain;
|
use slog::Drain;
|
||||||
use clap::{ Arg, App };
|
use clap::{ Arg, App };
|
||||||
use p2p::config::NetworkConfig;
|
use p2p::config::NetworkConfig;
|
||||||
use p2p::service;
|
use p2p::service::NetworkService;
|
||||||
use p2p::state::NetworkState;
|
use p2p::state::NetworkState;
|
||||||
|
|
||||||
fn main() {
|
fn main() {
|
||||||
@ -58,7 +58,10 @@ fn main() {
|
|||||||
// keys::generate_keys(&log).expect("Failed to generate keys");
|
// keys::generate_keys(&log).expect("Failed to generate keys");
|
||||||
} else {
|
} else {
|
||||||
let mut state = NetworkState::new(config, &log).expect("setup failed");
|
let mut state = NetworkState::new(config, &log).expect("setup failed");
|
||||||
service::listen(state, &log);
|
let service = NetworkService::new(state, log.new(o!()));
|
||||||
|
service.send(vec![31, 32, 33]);
|
||||||
|
service.bg_thread.join().unwrap();
|
||||||
|
|
||||||
}
|
}
|
||||||
info!(log, "Exiting.");
|
info!(log, "Exiting.");
|
||||||
}
|
}
|
||||||
|
@ -23,45 +23,85 @@ extern crate tokio_stdin;
|
|||||||
|
|
||||||
use super::state::NetworkState;
|
use super::state::NetworkState;
|
||||||
use self::bigint::U512;
|
use self::bigint::U512;
|
||||||
use self::futures::{ Future, Stream };
|
use self::futures::{ Future, Stream, Poll };
|
||||||
|
use self::futures::sync::mpsc::{ unbounded, UnboundedSender, UnboundedReceiver };
|
||||||
use self::libp2p_core::{ AddrComponent, Endpoint, Multiaddr, Transport, ConnectionUpgrade };
|
use self::libp2p_core::{ AddrComponent, Endpoint, Multiaddr, Transport, ConnectionUpgrade };
|
||||||
use self::libp2p_kad::{ KademliaUpgrade, KademliaProcessingFuture};
|
use self::libp2p_kad::{ KademliaUpgrade, KademliaProcessingFuture};
|
||||||
use self::libp2p_identify::{ IdentifyInfo, IdentifyTransport, IdentifyOutput };
|
use self::libp2p_identify::{ IdentifyInfo, IdentifyTransport, IdentifyOutput };
|
||||||
use self::slog::Logger;
|
use self::slog::Logger;
|
||||||
use std::sync::{ Arc, RwLock };
|
use std::sync::{ Arc, RwLock };
|
||||||
use std::time::{ Duration, Instant };
|
use std::time::{ Duration, Instant };
|
||||||
|
// use std::sync::mpsc::{ channel, Sender, Receiver };
|
||||||
|
use std::thread;
|
||||||
use std::ops::Deref;
|
use std::ops::Deref;
|
||||||
use std::io::Error as IoError;
|
use std::io::Error as IoError;
|
||||||
use self::tokio_io::{ AsyncRead, AsyncWrite };
|
use self::tokio_io::{ AsyncRead, AsyncWrite };
|
||||||
use self::bytes::Bytes;
|
use self::bytes::Bytes;
|
||||||
|
|
||||||
pub fn listen(state: NetworkState, log: &Logger)
|
pub struct NetworkService {
|
||||||
|
tx: UnboundedSender<Vec<u8>>,
|
||||||
|
pub bg_thread: thread::JoinHandle<()>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl NetworkService {
|
||||||
|
pub fn new(state: NetworkState, log: Logger) -> Self {
|
||||||
|
let (tx, rx) = unbounded();
|
||||||
|
let net_rx = NetworkReciever{ inner: rx };
|
||||||
|
let bg_thread = thread::spawn(move || {
|
||||||
|
listen(state, net_rx, log);
|
||||||
|
});
|
||||||
|
|
||||||
|
Self {
|
||||||
|
tx,
|
||||||
|
bg_thread,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn send(&self, msg: Vec<u8>) {
|
||||||
|
self.tx.unbounded_send(msg);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn listen(state: NetworkState, rx: NetworkReciever, log: Logger)
|
||||||
{
|
{
|
||||||
let peer_store = state.peer_store;
|
let peer_store = state.peer_store;
|
||||||
let peer_id = state.peer_id;
|
let peer_id = state.peer_id;
|
||||||
let listen_multiaddr = state.listen_multiaddr;
|
let listen_multiaddr = state.listen_multiaddr;
|
||||||
|
|
||||||
let mut core = tokio_core::reactor::Core::new().expect("tokio failure.");
|
|
||||||
let listened_addrs = Arc::new(RwLock::new(vec![]));
|
let listened_addrs = Arc::new(RwLock::new(vec![]));
|
||||||
|
|
||||||
|
// Build a tokio core
|
||||||
|
let mut core = tokio_core::reactor::Core::new().expect("tokio failure.");
|
||||||
|
// Build a base TCP libp2p transport
|
||||||
let transport = libp2p_tcp_transport::TcpConfig::new(core.handle())
|
let transport = libp2p_tcp_transport::TcpConfig::new(core.handle())
|
||||||
.with_upgrade(libp2p_core::upgrade::PlainTextConfig)
|
.with_upgrade(libp2p_core::upgrade::PlainTextConfig)
|
||||||
.with_upgrade(libp2p_mplex::BufferedMultiplexConfig::<[_; 256]>::new())
|
.with_upgrade(libp2p_mplex::BufferedMultiplexConfig::<[_; 256]>::new())
|
||||||
.into_connection_reuse();
|
.into_connection_reuse();
|
||||||
|
|
||||||
let transport_sockets = {
|
// Build an identify transport to allow identification and negotiation
|
||||||
|
// of layers running atop the TCP transport (e.g., kad)
|
||||||
|
let identify_transport = {
|
||||||
let listened_addrs = listened_addrs.clone();
|
let listened_addrs = listened_addrs.clone();
|
||||||
let listen_multiaddr = listen_multiaddr.clone();
|
let listen_multiaddr = listen_multiaddr.clone();
|
||||||
IdentifyTransport::new(transport.clone(), peer_store.clone())
|
IdentifyTransport::new(transport.clone(), peer_store.clone())
|
||||||
|
// Managed NAT'ed connections - ensuring the external IP
|
||||||
|
// is stored not the internal addr.
|
||||||
.map(move |out, _, _| {
|
.map(move |out, _, _| {
|
||||||
if let(Some(ref observed), ref listen_multiaddr) = (out.observed_addr, listen_multiaddr) {
|
if let(Some(ref observed), ref listen_multiaddr) =
|
||||||
if let Some(viewed_from_outside) = transport.nat_traversal(listen_multiaddr, observed) {
|
(out.observed_addr, listen_multiaddr)
|
||||||
listened_addrs.write().unwrap().push(viewed_from_outside);
|
{
|
||||||
|
if let Some(viewed_from_outside) =
|
||||||
|
transport.nat_traversal(listen_multiaddr, observed)
|
||||||
|
{
|
||||||
|
listened_addrs.write().unwrap()
|
||||||
|
.push(viewed_from_outside);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
out.socket
|
out.socket
|
||||||
})
|
})
|
||||||
};
|
};
|
||||||
|
|
||||||
|
// Configure and build a Kademlia upgrade to be applied
|
||||||
|
// to the base TCP transport.
|
||||||
let kad_config = libp2p_kad::KademliaConfig {
|
let kad_config = libp2p_kad::KademliaConfig {
|
||||||
parallelism: 3,
|
parallelism: 3,
|
||||||
record_store: (),
|
record_store: (),
|
||||||
@ -69,19 +109,23 @@ pub fn listen(state: NetworkState, log: &Logger)
|
|||||||
local_peer_id: peer_id.clone(),
|
local_peer_id: peer_id.clone(),
|
||||||
timeout: Duration::from_secs(2)
|
timeout: Duration::from_secs(2)
|
||||||
};
|
};
|
||||||
|
let kad_ctl_proto = libp2p_kad::
|
||||||
|
KademliaControllerPrototype::new(kad_config);
|
||||||
|
let kad_upgrade = libp2p_kad::
|
||||||
|
KademliaUpgrade::from_prototype(&kad_ctl_proto);
|
||||||
|
|
||||||
let kad_ctl_proto = libp2p_kad::KademliaControllerPrototype::new(kad_config);
|
// Combine the Kademlia and Identify upgrades into a single
|
||||||
let kad_upgrade = libp2p_kad::KademliaUpgrade::from_prototype(&kad_ctl_proto);
|
// upgrader struct.
|
||||||
|
|
||||||
let upgrade = ConnectionUpgrader {
|
let upgrade = ConnectionUpgrader {
|
||||||
kad: kad_upgrade.clone(),
|
kad: kad_upgrade.clone(),
|
||||||
identify: libp2p_identify::IdentifyProtocolConfig,
|
identify: libp2p_identify::IdentifyProtocolConfig,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
// Build a Swarm to manage upgrading connections to peers.
|
||||||
let swarm_listened_addrs = listened_addrs.clone();
|
let swarm_listened_addrs = listened_addrs.clone();
|
||||||
let swarm_peer_id = peer_id.clone();
|
let swarm_peer_id = peer_id.clone();
|
||||||
let (swarm_ctl, swarm_future) = libp2p_core::swarm(
|
let (swarm_ctl, swarm_future) = libp2p_core::swarm(
|
||||||
transport_sockets.clone().with_upgrade(upgrade),
|
identify_transport.clone().with_upgrade(upgrade),
|
||||||
move |upgrade, client_addr| match upgrade {
|
move |upgrade, client_addr| match upgrade {
|
||||||
FinalUpgrade::Kad(kad) => Box::new(kad) as Box<_>,
|
FinalUpgrade::Kad(kad) => Box::new(kad) as Box<_>,
|
||||||
FinalUpgrade::Identify(IdentifyOutput::Sender { sender, .. }) => sender.send(
|
FinalUpgrade::Identify(IdentifyOutput::Sender { sender, .. }) => sender.send(
|
||||||
@ -103,16 +147,21 @@ pub fn listen(state: NetworkState, log: &Logger)
|
|||||||
},
|
},
|
||||||
);
|
);
|
||||||
|
|
||||||
|
// Start the Swarm controller listening on the local machine
|
||||||
let actual_addr = swarm_ctl
|
let actual_addr = swarm_ctl
|
||||||
.listen_on(listen_multiaddr)
|
.listen_on(listen_multiaddr)
|
||||||
.expect("Failed to listen on multiaddr");
|
.expect("Failed to listen on multiaddr");
|
||||||
|
|
||||||
info!(log, "libp2p listening"; "listen_addr" => actual_addr.to_string());
|
info!(log, "libp2p listening"; "listen_addr" => actual_addr.to_string());
|
||||||
|
|
||||||
|
// Convert the kad prototype into a controller by providing it the
|
||||||
|
// newly built swarm.
|
||||||
let (kad_ctl, kad_init) = kad_ctl_proto.start(
|
let (kad_ctl, kad_init) = kad_ctl_proto.start(
|
||||||
swarm_ctl.clone(),
|
swarm_ctl.clone(),
|
||||||
transport_sockets.clone().with_upgrade(kad_upgrade.clone()));
|
identify_transport.clone().with_upgrade(kad_upgrade.clone()));
|
||||||
|
|
||||||
|
// Generate a tokio timer "wheel" future that sends kad FIND_NODE at
|
||||||
|
// a routine interval.
|
||||||
|
let kad_poll_log = log.new(o!());
|
||||||
let kad_poll = {
|
let kad_poll = {
|
||||||
let polling_peer_id = peer_id.clone();
|
let polling_peer_id = peer_id.clone();
|
||||||
tokio_timer::wheel()
|
tokio_timer::wheel()
|
||||||
@ -125,23 +174,36 @@ pub fn listen(state: NetworkState, log: &Logger)
|
|||||||
for peer in peers {
|
for peer in peers {
|
||||||
let peer_hash = U512::from(peer.hash());
|
let peer_hash = U512::from(peer.hash());
|
||||||
let distance = 512 - (local_hash ^ peer_hash).leading_zeros();
|
let distance = 512 - (local_hash ^ peer_hash).leading_zeros();
|
||||||
info!(log, "Discovered peer";
|
info!(kad_poll_log, "Discovered peer";
|
||||||
"distance" => distance,
|
"distance" => distance,
|
||||||
"peer_id" => peer.to_base58());
|
"peer_id" => peer.to_base58());
|
||||||
let peer_addr = AddrComponent::P2P(peer.into_bytes()).into();
|
let peer_addr = AddrComponent::P2P(peer.into_bytes()).into();
|
||||||
let dial_result = swarm_ctl.dial(
|
let dial_result = swarm_ctl.dial(
|
||||||
peer_addr,
|
peer_addr,
|
||||||
transport_sockets.clone().with_upgrade(kad_upgrade.clone())
|
identify_transport.clone().with_upgrade(kad_upgrade.clone())
|
||||||
);
|
);
|
||||||
if let Err(err) = dial_result {
|
if let Err(err) = dial_result {
|
||||||
warn!(log, "Dialling {:?} failed.", err)
|
warn!(kad_poll_log, "Dialling {:?} failed.", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Ok(())
|
Ok(())
|
||||||
})
|
})
|
||||||
};
|
};
|
||||||
|
|
||||||
|
let kad_send_log = log.new(o!());
|
||||||
|
let kad_send = rx.for_each(|msg| {
|
||||||
|
if let Ok(msg) = String::from_utf8(msg) {
|
||||||
|
info!(kad_send_log, "message: {:?}", msg);
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
});
|
||||||
|
|
||||||
|
// Generate a future featuring the kad init future
|
||||||
|
// and the kad polling cycle.
|
||||||
let final_future = swarm_future
|
let final_future = swarm_future
|
||||||
|
.select(kad_send)
|
||||||
|
.map_err(|(err, _)| err)
|
||||||
|
.map(|((), _)| ())
|
||||||
.select(kad_poll)
|
.select(kad_poll)
|
||||||
.map_err(|(err, _)| err)
|
.map_err(|(err, _)| err)
|
||||||
.map(|((), _)| ())
|
.map(|((), _)| ())
|
||||||
@ -152,6 +214,21 @@ pub fn listen(state: NetworkState, log: &Logger)
|
|||||||
core.run(final_future).unwrap();
|
core.run(final_future).unwrap();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub struct NetworkReciever {
|
||||||
|
inner: UnboundedReceiver<Vec<u8>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Stream for NetworkReciever {
|
||||||
|
type Item = Vec<u8>;
|
||||||
|
type Error = IoError;
|
||||||
|
|
||||||
|
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
|
||||||
|
self.inner
|
||||||
|
.poll()
|
||||||
|
.map_err(|_| unreachable!())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
struct ConnectionUpgrader<P, R> {
|
struct ConnectionUpgrader<P, R> {
|
||||||
kad: KademliaUpgrade<P, R>,
|
kad: KademliaUpgrade<P, R>,
|
||||||
|
Loading…
Reference in New Issue
Block a user