diff --git a/Cargo.toml b/Cargo.toml index 2e43db450..b3ece7de5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,6 +15,7 @@ ethereum-types = "" futures = "0.1.23" network-libp2p = { path = "network-libp2p" } rand = "0.3" +rocksdb = "0.10.1" rlp = { git = "https://github.com/paritytech/parity-common" } slog = "^2.2.3" slog-term = "^2.4.0" diff --git a/lighthouse/client/mod.rs b/lighthouse/client/mod.rs new file mode 100644 index 000000000..ad6ebb7ca --- /dev/null +++ b/lighthouse/client/mod.rs @@ -0,0 +1,74 @@ +use std::sync::{ Arc, RwLock }; +use std::thread; +use super::db::{ DB, open_db }; +use super::config::LighthouseConfig; +use super::futures::sync::mpsc::{ + unbounded, +}; +use super::network_libp2p::service::listen as network_listen; +use super::network_libp2p::state::NetworkState; +use super::slog::Logger; +use super::sync::start_sync; + +pub struct Client { + pub db: Arc>, + pub threads: Vec> +} + +impl Client { + pub fn new(config: LighthouseConfig, + log: Logger) + -> Self + { + // Open the local db + let db = { + let db = open_db(&config.data_dir); + Arc::new(RwLock::new(db)) + }; + + // Start the network thread + let network_state = NetworkState::new( + &config.data_dir, + &config.p2p_listen_port, + &log).expect("Network setup failed"); + let (network_thread, network_tx, network_rx) = { + let (message_sender, message_receiver) = unbounded(); + let (event_sender, event_receiver) = unbounded(); + let network_log = log.new(o!()); + let thread = thread::spawn(move || { + network_listen( + network_state, + event_sender, + message_receiver, + network_log, + ); + }); + (thread, message_sender, event_receiver) + }; + + // Start the sync thread + let (sync_thread, _sync_tx, _sync_rx) = { + let (sync_out_sender, sync_out_receiver) = unbounded(); + let (sync_in_sender, sync_in_receiver) = unbounded(); + let sync_log = log.new(o!()); + let sync_db = Arc::clone(&db); + let thread = thread::spawn(move || { + start_sync( + sync_db, + network_tx.clone(), + network_rx, + sync_out_sender, + sync_in_receiver, + sync_log, + ); + }); + (thread, sync_in_sender, sync_out_receiver) + }; + + // Return the client struct + Self { + db: db, + threads: vec![sync_thread, network_thread] + } + } +} diff --git a/lighthouse/config/mod.rs b/lighthouse/config/mod.rs index 31f77e2a7..c5674d43b 100644 --- a/lighthouse/config/mod.rs +++ b/lighthouse/config/mod.rs @@ -1,4 +1,4 @@ -use std::env; +use std::{ env, fs }; use std::path::PathBuf; #[derive(Clone)] @@ -17,6 +17,7 @@ impl LighthouseConfig { .expect("Unable to determine home dir."); home.join(DEFAULT_LIGHTHOUSE_DIR) }; + fs::create_dir_all(&data_dir).expect(&format!("Unable to create {:?}", &data_dir)); let p2p_listen_port = 0; Self { data_dir, diff --git a/lighthouse/db/mod.rs b/lighthouse/db/mod.rs new file mode 100644 index 000000000..92323b3ac --- /dev/null +++ b/lighthouse/db/mod.rs @@ -0,0 +1,14 @@ +extern crate rocksdb; + +use std::fs; +use std::path::Path; +pub use self::rocksdb::DB; + +pub fn open_db(path: &Path) -> DB { + let db_path = path.join("rocksdb"); + fs::create_dir_all(&db_path) + .expect(&format!("Unable to create {:?}", &db_path)); + let db = DB::open_default(db_path.join("lighthouse.rdb")) + .expect("Unable to open local database."); + db +} diff --git a/lighthouse/main.rs b/lighthouse/main.rs index 60c89d36d..5bac541aa 100644 --- a/lighthouse/main.rs +++ b/lighthouse/main.rs @@ -4,8 +4,10 @@ extern crate slog_term; extern crate slog_async; extern crate clap; extern crate network_libp2p; +extern crate futures; -pub mod pubkeystore; +pub mod db; +pub mod client; pub mod shuffling; pub mod state; pub mod sync; @@ -17,9 +19,7 @@ use std::path::PathBuf; use slog::Drain; use clap::{ Arg, App }; use config::LighthouseConfig; -use network_libp2p::service::NetworkService; -use network_libp2p::state::NetworkState; -use sync::sync_start; +use client::Client; fn main() { let decorator = slog_term::TermDecorator::new().build(); @@ -64,14 +64,11 @@ fn main() { info!(log, ""; "data_dir" => &config.data_dir.to_str(), "port" => &config.p2p_listen_port); - - let state = NetworkState::new( - &config.data_dir, - &config.p2p_listen_port, - &log) - .expect("setup failed"); - let (service, net_rx) = NetworkService::new(state, log.new(o!())); - sync_start(service, net_rx, log.new(o!())); + + let client = Client::new(config, log.new(o!())); + for thread in client.threads { + thread.join().unwrap(); + } info!(log, "Exiting."); } diff --git a/lighthouse/pubkeystore/mod.rs b/lighthouse/pubkeystore/mod.rs deleted file mode 100644 index 414c9e7fa..000000000 --- a/lighthouse/pubkeystore/mod.rs +++ /dev/null @@ -1,7 +0,0 @@ -use super::state::block::Block; - -#[allow(unused_variables)] -pub fn verify_block(block: &Block, proposer: &usize) -> bool { - // TODO: fetch proposer pubkey and verify it. - return true; -} diff --git a/lighthouse/state/shard_and_committee.rs b/lighthouse/state/shard_and_committee.rs index 64d2d8a90..788d78c4b 100644 --- a/lighthouse/state/shard_and_committee.rs +++ b/lighthouse/state/shard_and_committee.rs @@ -1,6 +1,6 @@ pub struct ShardAndCommittee { - shard_id: u16, - committee: Vec + pub shard_id: u16, + pub committee: Vec } impl ShardAndCommittee { diff --git a/lighthouse/sync/mod.rs b/lighthouse/sync/mod.rs index 35b8e7cfe..516b067f8 100644 --- a/lighthouse/sync/mod.rs +++ b/lighthouse/sync/mod.rs @@ -2,36 +2,57 @@ extern crate futures; extern crate slog; extern crate tokio; -use super::network_libp2p::service::NetworkService; -use self::futures::sync::mpsc::UnboundedReceiver; +use self::futures::sync::mpsc::{ + UnboundedReceiver, + UnboundedSender, +}; use self::futures::Stream; -use slog::Logger; use self::tokio::timer::Interval; use self::tokio::prelude::*; +use std::sync::{ RwLock, Arc }; +use super::network_libp2p::message::{ + NetworkEvent, + OutgoingMessage, +}; +use super::db::DB; +use slog::Logger; use std::time::{ Duration, Instant }; -pub fn sync_start(service: NetworkService, - net_stream: UnboundedReceiver>, - log: Logger) -{ - let net_rx = net_stream - .for_each(move |msg| { - debug!(&log, "Sync receive"; "msg" => format!("{:?}", msg)); - // service.send("hello".to_bytes()); +type NetworkSender = UnboundedSender; +type NetworkReceiver = UnboundedReceiver; + +type SyncSender = UnboundedSender>; +type SyncReceiver = UnboundedReceiver>; + +pub fn start_sync( + _db: Arc>, + network_tx: NetworkSender, + network_rx: NetworkReceiver, + _sync_tx: SyncSender, + _sync_rx: SyncReceiver, + log: Logger) { + let rx_future = network_rx + .for_each(move |event| { + debug!(&log, "Sync receive"; + "msg" => format!("{:?}", event)); Ok(()) }) .map_err(|_| panic!("rx failed")); - let poll = Interval::new(Instant::now(), Duration::from_secs(2)) + let poll_future = Interval::new(Instant::now(), Duration::from_secs(2)) .for_each(move |_| { - service.send(vec![42, 42, 42]); + let msg = OutgoingMessage { + peer: None, + data: vec![42, 42, 42] + }; + network_tx.unbounded_send(msg); Ok(()) }) .map_err(|_| panic!("send failed")); - let sync_future = poll - .select(net_rx).map_err(|(err, _)| err) + let sync_future = poll_future + .select(rx_future).map_err(|(err, _)| err) .and_then(|((), n)| n); tokio::run(sync_future); diff --git a/network-libp2p/src/lib.rs b/network-libp2p/src/lib.rs index f7a50f520..bbec78c08 100644 --- a/network-libp2p/src/lib.rs +++ b/network-libp2p/src/lib.rs @@ -5,5 +5,6 @@ extern crate secp256k1; #[macro_use] extern crate slog; +pub mod message; pub mod service; pub mod state; diff --git a/network-libp2p/src/message.rs b/network-libp2p/src/message.rs new file mode 100644 index 000000000..aa3ef54f0 --- /dev/null +++ b/network-libp2p/src/message.rs @@ -0,0 +1,18 @@ +#[derive(Debug)] +pub enum NetworkEventType { + PeerConnect, + PeerDrop, + Message, +} + +#[derive(Debug)] +pub struct NetworkEvent { + pub event: NetworkEventType, + pub data: Option>, +} + +#[derive(Debug)] +pub struct OutgoingMessage { + pub peer: Option, + pub data: Vec, +} diff --git a/network-libp2p/src/service.rs b/network-libp2p/src/service.rs index 3448b6b00..1189c5444 100644 --- a/network-libp2p/src/service.rs +++ b/network-libp2p/src/service.rs @@ -15,10 +15,11 @@ extern crate tokio_timer; extern crate tokio_stdin; use super::state::NetworkState; +use super::message::{ NetworkEvent, NetworkEventType, OutgoingMessage }; use self::bigint::U512; use self::futures::{ Future, Stream, Poll }; use self::futures::sync::mpsc::{ - unbounded, UnboundedSender, UnboundedReceiver + UnboundedSender, UnboundedReceiver }; use self::libp2p_core::{ AddrComponent, Endpoint, Multiaddr, Transport, ConnectionUpgrade }; @@ -26,61 +27,18 @@ use self::libp2p_kad::{ KademliaUpgrade, KademliaProcessingFuture}; use self::libp2p_floodsub::{ FloodSubFuture, FloodSubUpgrade }; use self::libp2p_identify::{ IdentifyInfo, IdentifyTransport, IdentifyOutput }; use self::slog::Logger; -use std::sync::{ Arc, RwLock, Mutex }; +use std::sync::{ Arc, RwLock }; use std::time::{ Duration, Instant }; -use std::thread; use std::ops::Deref; use std::io::Error as IoError; -use std::collections::VecDeque; use self::tokio_io::{ AsyncRead, AsyncWrite }; use self::bytes::Bytes; pub use self::libp2p_floodsub::Message; -pub struct NetworkService { - app_to_net: UnboundedSender>, - pub bg_thread: thread::JoinHandle<()>, - msgs: Mutex>, -} - -impl NetworkService { - /// Create a new network service. Spawns a new thread running tokio - /// services. Accepts a NetworkState, which is derived from a NetworkConfig. - /// Also accepts a logger. - pub fn new(state: NetworkState, log: Logger) - -> (Self, UnboundedReceiver>) - { - let (input_tx, input_rx) = unbounded(); // app -> net - let (output_tx, output_rx) = unbounded(); // net -> app - let bg_thread = thread::spawn(move || { - listen(state, output_tx, input_rx, log); - }); - let msgs = Mutex::new(VecDeque::new()); - let ns = Self { - app_to_net: input_tx, - bg_thread, - msgs, - }; - (ns, output_rx) - } - - /// Sends a message (byte vector) to the network. The present network - /// determines which the recipients of the message. - pub fn send(&self, msg: Vec) { - self.app_to_net.unbounded_send(msg).expect("unable to contact network") - } - - pub fn read_message(&mut self) - -> Option - { - let mut buf = self.msgs.lock().unwrap(); - buf.pop_front() - } -} - -fn listen(state: NetworkState, - app_tx: UnboundedSender>, - raw_rx: UnboundedReceiver>, +pub fn listen(state: NetworkState, + events_to_app: UnboundedSender, + raw_rx: UnboundedReceiver, log: Logger) { let peer_store = state.peer_store; @@ -195,6 +153,7 @@ fn listen(state: NetworkState, // Generate a tokio timer "wheel" future that sends kad FIND_NODE at // a routine interval. let kad_poll_log = log.new(o!()); + let kad_poll_event_tx = events_to_app.clone(); let kad_poll = { let polling_peer_id = peer_id.clone(); tokio_timer::wheel() @@ -217,8 +176,14 @@ fn listen(state: NetworkState, ); if let Err(err) = dial_result { warn!(kad_poll_log, "Dialling {:?} failed.", err) - } - } + }; + let event = NetworkEvent { + event: NetworkEventType::PeerConnect, + data: None, + }; + kad_poll_event_tx.unbounded_send(event) + .expect("Network unable to contact application."); + }; Ok(()) }) }; @@ -226,7 +191,11 @@ fn listen(state: NetworkState, // Create a future to handle message recieved from the network let floodsub_rx = floodsub_rx.for_each(|msg| { debug!(&log, "Network receive"; "msg" => format!("{:?}", msg)); - app_tx.unbounded_send(msg.data) + let event = NetworkEvent { + event: NetworkEventType::Message, + data: Some(msg.data), + }; + events_to_app.unbounded_send(event) .expect("Network unable to contact application."); Ok(()) }); @@ -234,7 +203,7 @@ fn listen(state: NetworkState, // Create a future to handle messages recieved from the application let app_rx = rx.for_each(|msg| { debug!(&log, "Network publish"; "msg" => format!("{:?}", msg)); - floodsub_ctl.publish(&topic, msg); + floodsub_ctl.publish(&topic, msg.data); Ok(()) }); @@ -249,11 +218,11 @@ fn listen(state: NetworkState, } struct ApplicationReciever { - inner: UnboundedReceiver>, + inner: UnboundedReceiver, } impl Stream for ApplicationReciever { - type Item = Vec; + type Item = OutgoingMessage; type Error = IoError; fn poll(&mut self) -> Poll, Self::Error> {