Restructure network and config

This commit is contained in:
Paul Hauner 2018-08-07 09:13:24 +10:00
parent e6d4f28133
commit 819527038e
9 changed files with 182 additions and 127 deletions

View File

@ -27,6 +27,8 @@ rlp = { git = "https://github.com/paritytech/parity-common" }
slog = "^2.2.3"
slog-term = "^2.4.0"
slog-async = "^2.3.0"
tokio = "0.1"
# Old tokio required for libp2p
tokio-io = "0.1"
tokio-core = "0.1"
tokio-timer = "0.1"

26
src/config/mod.rs Normal file
View File

@ -0,0 +1,26 @@
use std::env;
use std::path::PathBuf;
#[derive(Clone)]
pub struct LighthouseConfig {
pub data_dir: PathBuf,
pub p2p_listen_port: String,
}
const DEFAULT_LIGHTHOUSE_DIR: &str = ".lighthouse";
impl LighthouseConfig {
/// Build a new lighthouse configuration from defaults.
pub fn default() -> Self{
let data_dir = {
let home = env::home_dir()
.expect("Unable to determine home dir.");
home.join(DEFAULT_LIGHTHOUSE_DIR)
};
let p2p_listen_port = "0".to_string();
Self {
data_dir,
p2p_listen_port,
}
}
}

View File

@ -8,15 +8,18 @@ extern crate libp2p_peerstore;
pub mod p2p;
pub mod pubkeystore;
pub mod state;
pub mod sync;
pub mod utils;
pub mod config;
use std::path::PathBuf;
use slog::Drain;
use clap::{ Arg, App };
use p2p::config::NetworkConfig;
use config::LighthouseConfig;
use p2p::service::NetworkService;
use p2p::state::NetworkState;
use sync::sync_start;
fn main() {
let decorator = slog_term::TermDecorator::new().build();
@ -40,17 +43,16 @@ fn main() {
.takes_value(true))
.get_matches();
let mut config = NetworkConfig::default();
let mut config = LighthouseConfig::default();
// Custom datadir
if let Some(dir) = matches.value_of("datadir") {
config.data_dir = PathBuf::from(dir.to_string());
}
// Custom listen port
// Custom p2p listen port
if let Some(port) = matches.value_of("port") {
config.listen_multiaddr =
NetworkConfig::multiaddr_on_port(&port.to_string());
config.p2p_listen_port = port.to_string();
}
info!(log, ""; "data_dir" => &config.data_dir.to_str());
@ -58,10 +60,8 @@ fn main() {
// keys::generate_keys(&log).expect("Failed to generate keys");
} else {
let mut state = NetworkState::new(config, &log).expect("setup failed");
let service = NetworkService::new(state, log.new(o!()));
service.send(vec![31, 32, 33]);
service.bg_thread.join().unwrap();
let (service, net_rx) = NetworkService::new(state, log.new(o!()));
sync_start(service, net_rx, log.new(o!()));
}
info!(log, "Exiting.");
}

View File

@ -1,33 +0,0 @@
use std::env;
use std::path::PathBuf;
use super::libp2p_core::Multiaddr;
#[derive(Clone)]
pub struct NetworkConfig {
pub data_dir: PathBuf,
pub listen_multiaddr: Multiaddr,
}
const DEFAULT_LIGHTHOUSE_DIR: &str = ".lighthouse";
impl NetworkConfig {
pub fn default() -> Self{
let data_dir = {
let home = env::home_dir()
.expect("Unable to determine home dir.");
home.join(DEFAULT_LIGHTHOUSE_DIR)
};
Self {
data_dir,
listen_multiaddr: NetworkConfig::multiaddr_on_port("0")
}
}
/// Return a TCP multiaddress on 0.0.0.0 for a given port.
pub fn multiaddr_on_port(port: &str) -> Multiaddr {
return format!("/ip4/0.0.0.0/tcp/{}", port)
.parse::<Multiaddr>().unwrap()
}
}

View File

@ -1,40 +0,0 @@
extern crate secp256k1;
extern crate libp2p_peerstore;
extern crate rand;
extern crate hex;
use std;
use std::io::prelude::*;
use std::fs::File;
use slog::Logger;
use std::io::Error as IoError;
use std::path::Path;
use super::config::NetworkConfig;
use self::secp256k1::key::{ SecretKey, PublicKey };
use self::libp2p_peerstore::PeerId;
const LOCAL_PK_FILE: &str = "local.pk";
const LOCAL_SK_FILE: &str = "local.sk";
const BOOTSTRAP_PK_FILE: &str = "bootstrap.pk";
/// Generates a new public and secret key pair and writes them to
/// individual files.
///
/// This function should only be present during
/// early development states and should be removed.
pub fn generate_keys(config: NetworkConfig, log: &Logger)
-> Result<(), IoError>
{
// TODO: remove this method and import pem files instead
info!(log, "Generating keys...");
let mut rng = rand::thread_rng();
let curve = secp256k1::Secp256k1::new();
let s = SecretKey::new(&curve, &mut rng);
let s_vec = &s[..];
let s_string = hex::encode(s_vec);
let mut s_file = File::create(LOCAL_SK_FILE)?;
info!(log, "Writing secret key...");
s_file.write(s_string.as_bytes())?;
Ok(())
}

View File

@ -6,5 +6,3 @@ extern crate slog;
pub mod service;
pub mod state;
// pub mod keys;
pub mod config;

View File

@ -10,6 +10,7 @@ extern crate bigint;
extern crate bytes;
extern crate futures;
extern crate libp2p_peerstore;
extern crate libp2p_floodsub;
extern crate libp2p_identify;
extern crate libp2p_core;
extern crate libp2p_mplex;
@ -24,50 +25,77 @@ extern crate tokio_stdin;
use super::state::NetworkState;
use self::bigint::U512;
use self::futures::{ Future, Stream, Poll };
use self::futures::sync::mpsc::{ unbounded, UnboundedSender, UnboundedReceiver };
use self::libp2p_core::{ AddrComponent, Endpoint, Multiaddr, Transport, ConnectionUpgrade };
use self::futures::sync::mpsc::{
unbounded, UnboundedSender, UnboundedReceiver
};
use self::libp2p_core::{ AddrComponent, Endpoint, Multiaddr,
Transport, ConnectionUpgrade };
use self::libp2p_kad::{ KademliaUpgrade, KademliaProcessingFuture};
use self::libp2p_floodsub::{ FloodSubFuture, FloodSubUpgrade };
use self::libp2p_identify::{ IdentifyInfo, IdentifyTransport, IdentifyOutput };
use self::slog::Logger;
use std::sync::{ Arc, RwLock };
use std::sync::{ Arc, RwLock, Mutex };
use std::time::{ Duration, Instant };
// use std::sync::mpsc::{ channel, Sender, Receiver };
use std::thread;
use std::ops::Deref;
use std::io::Error as IoError;
use std::collections::VecDeque;
use self::tokio_io::{ AsyncRead, AsyncWrite };
use self::bytes::Bytes;
pub use self::libp2p_floodsub::Message;
pub struct NetworkService {
tx: UnboundedSender<Vec<u8>>,
app_to_net: UnboundedSender<Vec<u8>>,
pub bg_thread: thread::JoinHandle<()>,
msgs: Mutex<VecDeque<Message>>,
}
impl NetworkService {
pub fn new(state: NetworkState, log: Logger) -> Self {
let (tx, rx) = unbounded();
let net_rx = NetworkReciever{ inner: rx };
/// Create a new network service. Spawns a new thread running tokio
/// services. Accepts a NetworkState, which is derived from a NetworkConfig.
/// Also accepts a logger.
pub fn new(state: NetworkState, log: Logger)
-> (Self, UnboundedReceiver<Vec<u8>>)
{
let (input_tx, input_rx) = unbounded(); // app -> net
let (output_tx, output_rx) = unbounded(); // net -> app
let bg_thread = thread::spawn(move || {
listen(state, net_rx, log);
listen(state, output_tx, input_rx, log);
});
Self {
tx,
let msgs = Mutex::new(VecDeque::new());
let ns = Self {
app_to_net: input_tx,
bg_thread,
}
msgs,
};
(ns, output_rx)
}
/// Sends a message (byte vector) to the network. The present network
/// determines which the recipients of the message.
pub fn send(&self, msg: Vec<u8>) {
self.tx.unbounded_send(msg);
self.app_to_net.unbounded_send(msg).expect("unable to contact network")
}
pub fn read_message(&mut self)
-> Option<Message>
{
let mut buf = self.msgs.lock().unwrap();
buf.pop_front()
}
}
pub fn listen(state: NetworkState, rx: NetworkReciever, log: Logger)
fn listen(state: NetworkState,
app_tx: UnboundedSender<Vec<u8>>,
raw_rx: UnboundedReceiver<Vec<u8>>,
log: Logger)
{
let peer_store = state.peer_store;
let peer_id = state.peer_id;
let listen_multiaddr = state.listen_multiaddr;
let listened_addrs = Arc::new(RwLock::new(vec![]));
let rx = ApplicationReciever{ inner: raw_rx };
// Build a tokio core
let mut core = tokio_core::reactor::Core::new().expect("tokio failure.");
@ -113,11 +141,17 @@ pub fn listen(state: NetworkState, rx: NetworkReciever, log: Logger)
KademliaControllerPrototype::new(kad_config);
let kad_upgrade = libp2p_kad::
KademliaUpgrade::from_prototype(&kad_ctl_proto);
// Build a floodsub upgrade to allow pushing topic'ed
// messages across the network.
let (floodsub_upgrade, floodsub_rx) =
FloodSubUpgrade::new(peer_id.clone());
// Combine the Kademlia and Identify upgrades into a single
// upgrader struct.
let upgrade = ConnectionUpgrader {
kad: kad_upgrade.clone(),
floodsub: floodsub_upgrade.clone(),
identify: libp2p_identify::IdentifyProtocolConfig,
};
@ -128,6 +162,7 @@ pub fn listen(state: NetworkState, rx: NetworkReciever, log: Logger)
identify_transport.clone().with_upgrade(upgrade),
move |upgrade, client_addr| match upgrade {
FinalUpgrade::Kad(kad) => Box::new(kad) as Box<_>,
FinalUpgrade::FloodSub(future) => Box::new(future) as Box<_>,
FinalUpgrade::Identify(IdentifyOutput::Sender { sender, .. }) => sender.send(
IdentifyInfo {
public_key: swarm_peer_id.clone().into_bytes(),
@ -137,6 +172,7 @@ pub fn listen(state: NetworkState, rx: NetworkReciever, log: Logger)
protocols: vec![
"/ipfs/kad/1.0.0".to_owned(),
"/ipfs/id/1.0.0".to_owned(),
"/floodsub/1.0.0".to_owned(),
]
},
&client_addr
@ -159,6 +195,11 @@ pub fn listen(state: NetworkState, rx: NetworkReciever, log: Logger)
swarm_ctl.clone(),
identify_transport.clone().with_upgrade(kad_upgrade.clone()));
// Create a new floodsub controller using a specific topic
let topic = libp2p_floodsub::TopicBuilder::new("beacon_chain").build();
let floodsub_ctl = libp2p_floodsub::FloodSubController::new(&floodsub_upgrade);
floodsub_ctl.subscribe(&topic);
// Generate a tokio timer "wheel" future that sends kad FIND_NODE at
// a routine interval.
let kad_poll_log = log.new(o!());
@ -180,7 +221,7 @@ pub fn listen(state: NetworkState, rx: NetworkReciever, log: Logger)
let peer_addr = AddrComponent::P2P(peer.into_bytes()).into();
let dial_result = swarm_ctl.dial(
peer_addr,
identify_transport.clone().with_upgrade(kad_upgrade.clone())
identify_transport.clone().with_upgrade(floodsub_upgrade.clone())
);
if let Err(err) = dial_result {
warn!(kad_poll_log, "Dialling {:?} failed.", err)
@ -190,35 +231,36 @@ pub fn listen(state: NetworkState, rx: NetworkReciever, log: Logger)
})
};
let kad_send_log = log.new(o!());
let kad_send = rx.for_each(|msg| {
if let Ok(msg) = String::from_utf8(msg) {
info!(kad_send_log, "message: {:?}", msg);
}
// Create a future to handle message recieved from the network
let floodsub_rx = floodsub_rx.for_each(|msg| {
debug!(&log, "Network receive"; "msg" => format!("{:?}", msg));
app_tx.unbounded_send(msg.data)
.expect("Network unable to contact application.");
Ok(())
});
// Generate a future featuring the kad init future
// and the kad polling cycle.
// Create a future to handle messages recieved from the application
let app_rx = rx.for_each(|msg| {
debug!(&log, "Network publish"; "msg" => format!("{:?}", msg));
floodsub_ctl.publish(&topic, msg);
Ok(())
});
// Generate a full future
let final_future = swarm_future
.select(kad_send)
.map_err(|(err, _)| err)
.map(|((), _)| ())
.select(kad_poll)
.map_err(|(err, _)| err)
.map(|((), _)| ())
.select(kad_init)
.map_err(|(err, _)| err)
.and_then(|((), n)| n);
.select(floodsub_rx).map_err(|(err, _)| err).map(|((), _)| ())
.select(app_rx).map_err(|(err, _)| err).map(|((), _)| ())
.select(kad_poll).map_err(|(err, _)| err).map(|((), _)| ())
.select(kad_init).map_err(|(err, _)| err).and_then(|((), n)| n);
core.run(final_future).unwrap();
}
pub struct NetworkReciever {
struct ApplicationReciever {
inner: UnboundedReceiver<Vec<u8>>,
}
impl Stream for NetworkReciever {
impl Stream for ApplicationReciever {
type Item = Vec<u8>;
type Error = IoError;
@ -233,6 +275,7 @@ impl Stream for NetworkReciever {
struct ConnectionUpgrader<P, R> {
kad: KademliaUpgrade<P, R>,
identify: libp2p_identify::IdentifyProtocolConfig,
floodsub: FloodSubUpgrade,
}
impl<C, P, R, Pc> ConnectionUpgrade<C> for ConnectionUpgrader<P, R>
@ -252,6 +295,7 @@ where
vec![
(Bytes::from("/ipfs/kad/1.0.0"), 0),
(Bytes::from("/ipfs/id/1.0.0"), 1),
(Bytes::from("/floodsub/1.0.0"), 2),
].into_iter()
}
@ -272,6 +316,11 @@ where
self.identify
.upgrade(socket, (), ty, remote_addr)
.map(|upg| upg.into())),
2 => Box::new(
self.floodsub
.upgrade(socket, (), ty, remote_addr)
.map(|upg| upg.into()),
),
_ => unreachable!()
}
@ -280,11 +329,11 @@ where
enum FinalUpgrade<C> {
Kad(KademliaProcessingFuture),
Identify(IdentifyOutput<C>)
Identify(IdentifyOutput<C>),
FloodSub(FloodSubFuture),
}
impl<C> From<libp2p_kad::KademliaProcessingFuture> for FinalUpgrade<C> {
#[inline]
impl<C> From<libp2p_kad::KademliaProcessingFuture> for FinalUpgrade<C> { #[inline]
fn from(upgrade: libp2p_kad::KademliaProcessingFuture) -> Self {
FinalUpgrade::Kad(upgrade)
}
@ -296,3 +345,10 @@ impl<C> From<IdentifyOutput<C>> for FinalUpgrade<C> {
FinalUpgrade::Identify(upgrade)
}
}
impl<C> From<FloodSubFuture> for FinalUpgrade<C> {
#[inline]
fn from(upgr: FloodSubFuture) -> Self {
FinalUpgrade::FloodSub(upgr)
}
}

View File

@ -6,7 +6,7 @@ use std::fs::File;
use std::sync::Arc;
use std::time::Duration;
use super::config::NetworkConfig;
use super::super::config::LighthouseConfig;
use super::libp2p_core::Multiaddr;
use super::libp2p_peerstore::{ Peerstore, PeerAccess, PeerId };
use super::libp2p_peerstore::json_peerstore::JsonPeerstore;
@ -20,7 +20,7 @@ const PEERS_FILE: &str = "peerstore.json";
const LOCAL_PEM_FILE: &str = "local_peer_id.pem";
pub struct NetworkState {
pub config: NetworkConfig,
pub config: LighthouseConfig,
pub pubkey: PublicKey,
pub seckey: SecretKey,
pub peer_id: PeerId,
@ -29,7 +29,7 @@ pub struct NetworkState {
}
impl NetworkState {
pub fn new(config: NetworkConfig, log: &Logger) -> Result <Self, Box<Error>> {
pub fn new(config: LighthouseConfig, log: &Logger) -> Result <Self, Box<Error>> {
let curve = Secp256k1::new();
let seckey = match
NetworkState::load_secret_key_from_pem_file(&config, &curve)
@ -47,7 +47,9 @@ impl NetworkState {
Arc::new(base)
};
info!(log, "Loaded peerstore"; "peer_count" => &peer_store.peers().count());
let listen_multiaddr = config.listen_multiaddr.clone();
// let listen_multiaddr = config.listen_multiaddr.clone();
let listen_multiaddr =
NetworkState::multiaddr_on_port(&config.p2p_listen_port);
Ok(Self {
config: config,
seckey,
@ -58,6 +60,12 @@ impl NetworkState {
})
}
/// Return a TCP multiaddress on 0.0.0.0 for a given port.
pub fn multiaddr_on_port(port: &str) -> Multiaddr {
return format!("/ip4/0.0.0.0/tcp/{}", port)
.parse::<Multiaddr>().unwrap()
}
pub fn add_peer(&mut self,
peer_id: PeerId,
multiaddr: Multiaddr,
@ -67,7 +75,7 @@ impl NetworkState {
}
/// Instantiate a SecretKey from a .pem file on disk.
pub fn load_secret_key_from_pem_file(config: &NetworkConfig, curve: &Secp256k1)
pub fn load_secret_key_from_pem_file(config: &LighthouseConfig, curve: &Secp256k1)
-> Result<SecretKey, Box<Error>>
{
let path = config.data_dir.join(LOCAL_PEM_FILE);
@ -81,7 +89,7 @@ impl NetworkState {
/// Generate a new SecretKey and store it on disk as a .pem file.
pub fn generate_new_secret_key(
config: &NetworkConfig,
config: &LighthouseConfig,
curve: &Secp256k1)
-> Result<SecretKey, Box<Error>>
{

38
src/sync/mod.rs Normal file
View File

@ -0,0 +1,38 @@
extern crate futures;
extern crate slog;
extern crate tokio;
use super::p2p::service::NetworkService;
use self::futures::sync::mpsc::UnboundedReceiver;
use self::futures::Stream;
use slog::Logger;
use self::tokio::timer::Interval;
use self::tokio::prelude::*;
use std::time::{ Duration, Instant };
pub fn sync_start(service: NetworkService,
net_stream: UnboundedReceiver<Vec<u8>>,
log: Logger)
{
let net_rx = net_stream
.for_each(move |msg| {
debug!(&log, "Sync receive"; "msg" => format!("{:?}", msg));
// service.send("hello".to_bytes());
Ok(())
})
.map_err(|_| panic!("rx failed"));
let poll = Interval::new(Instant::now(), Duration::from_secs(2))
.for_each(move |_| {
service.send(vec![42, 42, 42]);
Ok(())
})
.map_err(|_| panic!("send failed"));
let sync_future = poll
.select(net_rx).map_err(|(err, _)| err)
.and_then(|((), n)| n);
tokio::run(sync_future);
}