Add "Client" concept and RocksDB

This commit is contained in:
Paul Hauner 2018-08-16 14:17:28 +10:00
parent 3372583c18
commit 9689142883
11 changed files with 180 additions and 91 deletions

View File

@ -15,6 +15,7 @@ ethereum-types = ""
futures = "0.1.23"
network-libp2p = { path = "network-libp2p" }
rand = "0.3"
rocksdb = "0.10.1"
rlp = { git = "https://github.com/paritytech/parity-common" }
slog = "^2.2.3"
slog-term = "^2.4.0"

74
lighthouse/client/mod.rs Normal file
View File

@ -0,0 +1,74 @@
use std::sync::{ Arc, RwLock };
use std::thread;
use super::db::{ DB, open_db };
use super::config::LighthouseConfig;
use super::futures::sync::mpsc::{
unbounded,
};
use super::network_libp2p::service::listen as network_listen;
use super::network_libp2p::state::NetworkState;
use super::slog::Logger;
use super::sync::start_sync;
pub struct Client {
pub db: Arc<RwLock<DB>>,
pub threads: Vec<thread::JoinHandle<()>>
}
impl Client {
pub fn new(config: LighthouseConfig,
log: Logger)
-> Self
{
// Open the local db
let db = {
let db = open_db(&config.data_dir);
Arc::new(RwLock::new(db))
};
// Start the network thread
let network_state = NetworkState::new(
&config.data_dir,
&config.p2p_listen_port,
&log).expect("Network setup failed");
let (network_thread, network_tx, network_rx) = {
let (message_sender, message_receiver) = unbounded();
let (event_sender, event_receiver) = unbounded();
let network_log = log.new(o!());
let thread = thread::spawn(move || {
network_listen(
network_state,
event_sender,
message_receiver,
network_log,
);
});
(thread, message_sender, event_receiver)
};
// Start the sync thread
let (sync_thread, _sync_tx, _sync_rx) = {
let (sync_out_sender, sync_out_receiver) = unbounded();
let (sync_in_sender, sync_in_receiver) = unbounded();
let sync_log = log.new(o!());
let sync_db = Arc::clone(&db);
let thread = thread::spawn(move || {
start_sync(
sync_db,
network_tx.clone(),
network_rx,
sync_out_sender,
sync_in_receiver,
sync_log,
);
});
(thread, sync_in_sender, sync_out_receiver)
};
// Return the client struct
Self {
db: db,
threads: vec![sync_thread, network_thread]
}
}
}

View File

@ -1,4 +1,4 @@
use std::env;
use std::{ env, fs };
use std::path::PathBuf;
#[derive(Clone)]
@ -17,6 +17,7 @@ impl LighthouseConfig {
.expect("Unable to determine home dir.");
home.join(DEFAULT_LIGHTHOUSE_DIR)
};
fs::create_dir_all(&data_dir).expect(&format!("Unable to create {:?}", &data_dir));
let p2p_listen_port = 0;
Self {
data_dir,

14
lighthouse/db/mod.rs Normal file
View File

@ -0,0 +1,14 @@
extern crate rocksdb;
use std::fs;
use std::path::Path;
pub use self::rocksdb::DB;
pub fn open_db(path: &Path) -> DB {
let db_path = path.join("rocksdb");
fs::create_dir_all(&db_path)
.expect(&format!("Unable to create {:?}", &db_path));
let db = DB::open_default(db_path.join("lighthouse.rdb"))
.expect("Unable to open local database.");
db
}

View File

@ -4,8 +4,10 @@ extern crate slog_term;
extern crate slog_async;
extern crate clap;
extern crate network_libp2p;
extern crate futures;
pub mod pubkeystore;
pub mod db;
pub mod client;
pub mod shuffling;
pub mod state;
pub mod sync;
@ -17,9 +19,7 @@ use std::path::PathBuf;
use slog::Drain;
use clap::{ Arg, App };
use config::LighthouseConfig;
use network_libp2p::service::NetworkService;
use network_libp2p::state::NetworkState;
use sync::sync_start;
use client::Client;
fn main() {
let decorator = slog_term::TermDecorator::new().build();
@ -64,14 +64,11 @@ fn main() {
info!(log, "";
"data_dir" => &config.data_dir.to_str(),
"port" => &config.p2p_listen_port);
let state = NetworkState::new(
&config.data_dir,
&config.p2p_listen_port,
&log)
.expect("setup failed");
let (service, net_rx) = NetworkService::new(state, log.new(o!()));
sync_start(service, net_rx, log.new(o!()));
let client = Client::new(config, log.new(o!()));
for thread in client.threads {
thread.join().unwrap();
}
info!(log, "Exiting.");
}

View File

@ -1,7 +0,0 @@
use super::state::block::Block;
#[allow(unused_variables)]
pub fn verify_block(block: &Block, proposer: &usize) -> bool {
// TODO: fetch proposer pubkey and verify it.
return true;
}

View File

@ -1,6 +1,6 @@
pub struct ShardAndCommittee {
shard_id: u16,
committee: Vec<u32>
pub shard_id: u16,
pub committee: Vec<u32>
}
impl ShardAndCommittee {

View File

@ -2,36 +2,57 @@ extern crate futures;
extern crate slog;
extern crate tokio;
use super::network_libp2p::service::NetworkService;
use self::futures::sync::mpsc::UnboundedReceiver;
use self::futures::sync::mpsc::{
UnboundedReceiver,
UnboundedSender,
};
use self::futures::Stream;
use slog::Logger;
use self::tokio::timer::Interval;
use self::tokio::prelude::*;
use std::sync::{ RwLock, Arc };
use super::network_libp2p::message::{
NetworkEvent,
OutgoingMessage,
};
use super::db::DB;
use slog::Logger;
use std::time::{ Duration, Instant };
pub fn sync_start(service: NetworkService,
net_stream: UnboundedReceiver<Vec<u8>>,
log: Logger)
{
let net_rx = net_stream
.for_each(move |msg| {
debug!(&log, "Sync receive"; "msg" => format!("{:?}", msg));
// service.send("hello".to_bytes());
type NetworkSender = UnboundedSender<OutgoingMessage>;
type NetworkReceiver = UnboundedReceiver<NetworkEvent>;
type SyncSender = UnboundedSender<Vec<u8>>;
type SyncReceiver = UnboundedReceiver<Vec<u8>>;
pub fn start_sync(
_db: Arc<RwLock<DB>>,
network_tx: NetworkSender,
network_rx: NetworkReceiver,
_sync_tx: SyncSender,
_sync_rx: SyncReceiver,
log: Logger) {
let rx_future = network_rx
.for_each(move |event| {
debug!(&log, "Sync receive";
"msg" => format!("{:?}", event));
Ok(())
})
.map_err(|_| panic!("rx failed"));
let poll = Interval::new(Instant::now(), Duration::from_secs(2))
let poll_future = Interval::new(Instant::now(), Duration::from_secs(2))
.for_each(move |_| {
service.send(vec![42, 42, 42]);
let msg = OutgoingMessage {
peer: None,
data: vec![42, 42, 42]
};
network_tx.unbounded_send(msg);
Ok(())
})
.map_err(|_| panic!("send failed"));
let sync_future = poll
.select(net_rx).map_err(|(err, _)| err)
let sync_future = poll_future
.select(rx_future).map_err(|(err, _)| err)
.and_then(|((), n)| n);
tokio::run(sync_future);

View File

@ -5,5 +5,6 @@ extern crate secp256k1;
#[macro_use]
extern crate slog;
pub mod message;
pub mod service;
pub mod state;

View File

@ -0,0 +1,18 @@
#[derive(Debug)]
pub enum NetworkEventType {
PeerConnect,
PeerDrop,
Message,
}
#[derive(Debug)]
pub struct NetworkEvent {
pub event: NetworkEventType,
pub data: Option<Vec<u8>>,
}
#[derive(Debug)]
pub struct OutgoingMessage {
pub peer: Option<String>,
pub data: Vec<u8>,
}

View File

@ -15,10 +15,11 @@ extern crate tokio_timer;
extern crate tokio_stdin;
use super::state::NetworkState;
use super::message::{ NetworkEvent, NetworkEventType, OutgoingMessage };
use self::bigint::U512;
use self::futures::{ Future, Stream, Poll };
use self::futures::sync::mpsc::{
unbounded, UnboundedSender, UnboundedReceiver
UnboundedSender, UnboundedReceiver
};
use self::libp2p_core::{ AddrComponent, Endpoint, Multiaddr,
Transport, ConnectionUpgrade };
@ -26,61 +27,18 @@ use self::libp2p_kad::{ KademliaUpgrade, KademliaProcessingFuture};
use self::libp2p_floodsub::{ FloodSubFuture, FloodSubUpgrade };
use self::libp2p_identify::{ IdentifyInfo, IdentifyTransport, IdentifyOutput };
use self::slog::Logger;
use std::sync::{ Arc, RwLock, Mutex };
use std::sync::{ Arc, RwLock };
use std::time::{ Duration, Instant };
use std::thread;
use std::ops::Deref;
use std::io::Error as IoError;
use std::collections::VecDeque;
use self::tokio_io::{ AsyncRead, AsyncWrite };
use self::bytes::Bytes;
pub use self::libp2p_floodsub::Message;
pub struct NetworkService {
app_to_net: UnboundedSender<Vec<u8>>,
pub bg_thread: thread::JoinHandle<()>,
msgs: Mutex<VecDeque<Message>>,
}
impl NetworkService {
/// Create a new network service. Spawns a new thread running tokio
/// services. Accepts a NetworkState, which is derived from a NetworkConfig.
/// Also accepts a logger.
pub fn new(state: NetworkState, log: Logger)
-> (Self, UnboundedReceiver<Vec<u8>>)
{
let (input_tx, input_rx) = unbounded(); // app -> net
let (output_tx, output_rx) = unbounded(); // net -> app
let bg_thread = thread::spawn(move || {
listen(state, output_tx, input_rx, log);
});
let msgs = Mutex::new(VecDeque::new());
let ns = Self {
app_to_net: input_tx,
bg_thread,
msgs,
};
(ns, output_rx)
}
/// Sends a message (byte vector) to the network. The present network
/// determines which the recipients of the message.
pub fn send(&self, msg: Vec<u8>) {
self.app_to_net.unbounded_send(msg).expect("unable to contact network")
}
pub fn read_message(&mut self)
-> Option<Message>
{
let mut buf = self.msgs.lock().unwrap();
buf.pop_front()
}
}
fn listen(state: NetworkState,
app_tx: UnboundedSender<Vec<u8>>,
raw_rx: UnboundedReceiver<Vec<u8>>,
pub fn listen(state: NetworkState,
events_to_app: UnboundedSender<NetworkEvent>,
raw_rx: UnboundedReceiver<OutgoingMessage>,
log: Logger)
{
let peer_store = state.peer_store;
@ -195,6 +153,7 @@ fn listen(state: NetworkState,
// Generate a tokio timer "wheel" future that sends kad FIND_NODE at
// a routine interval.
let kad_poll_log = log.new(o!());
let kad_poll_event_tx = events_to_app.clone();
let kad_poll = {
let polling_peer_id = peer_id.clone();
tokio_timer::wheel()
@ -217,8 +176,14 @@ fn listen(state: NetworkState,
);
if let Err(err) = dial_result {
warn!(kad_poll_log, "Dialling {:?} failed.", err)
}
}
};
let event = NetworkEvent {
event: NetworkEventType::PeerConnect,
data: None,
};
kad_poll_event_tx.unbounded_send(event)
.expect("Network unable to contact application.");
};
Ok(())
})
};
@ -226,7 +191,11 @@ fn listen(state: NetworkState,
// Create a future to handle message recieved from the network
let floodsub_rx = floodsub_rx.for_each(|msg| {
debug!(&log, "Network receive"; "msg" => format!("{:?}", msg));
app_tx.unbounded_send(msg.data)
let event = NetworkEvent {
event: NetworkEventType::Message,
data: Some(msg.data),
};
events_to_app.unbounded_send(event)
.expect("Network unable to contact application.");
Ok(())
});
@ -234,7 +203,7 @@ fn listen(state: NetworkState,
// Create a future to handle messages recieved from the application
let app_rx = rx.for_each(|msg| {
debug!(&log, "Network publish"; "msg" => format!("{:?}", msg));
floodsub_ctl.publish(&topic, msg);
floodsub_ctl.publish(&topic, msg.data);
Ok(())
});
@ -249,11 +218,11 @@ fn listen(state: NetworkState,
}
struct ApplicationReciever {
inner: UnboundedReceiver<Vec<u8>>,
inner: UnboundedReceiver<OutgoingMessage>,
}
impl Stream for ApplicationReciever {
type Item = Vec<u8>;
type Item = OutgoingMessage;
type Error = IoError;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {