diff --git a/README.md b/README.md index 7ee0638c4..a3e82b162 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ # Lighthouse: a (future) Ethereum 2.0 client -[![Build Status](https://travis-ci.org/sigp/lighthouse.svg?branch=master)](https://travis-ci.org/sigp/lighthouse) +[![Build Status](https://travis-ci.org/sigp/lighthouse.svg?branch=master)](https://travis-ci.org/sigp/lighthouse) [![Gitter](https://badges.gitter.im/Join%20Chat.svg)](https://gitter.im/sigp/lighthouse?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge) A **work-in-progress** implementation of the Ethereum 2.0 Beacon Chain in Rust. diff --git a/lighthouse/client/mod.rs b/lighthouse/client/mod.rs index f87f05738..ee73c2d29 100644 --- a/lighthouse/client/mod.rs +++ b/lighthouse/client/mod.rs @@ -1,6 +1,6 @@ -use std::sync::{ Arc, RwLock }; +use std::sync::Arc; use std::thread; -use super::db::{ DB, open_db }; +use super::db::{ DiskDB }; use super::config::LighthouseConfig; use super::futures::sync::mpsc::{ unbounded, @@ -10,10 +10,12 @@ use super::network_libp2p::state::NetworkState; use super::slog::Logger; use super::sync::run_sync_future; +use super::db::ClientDB; + /// Represents the co-ordination of the /// networking, syncing and RPC (not-yet-implemented) threads. 
pub struct Client { - pub db: Arc>, + pub db: Arc, pub network_thread: thread::JoinHandle<()>, pub sync_thread: thread::JoinHandle<()>, } @@ -29,8 +31,8 @@ impl Client { { // Open the local db let db = { - let db = open_db(&config.data_dir); - Arc::new(RwLock::new(db)) + let db = DiskDB::open(&config.data_dir, None); + Arc::new(db) }; // Start the network thread @@ -57,7 +59,7 @@ impl Client { let (sync_out_sender, sync_out_receiver) = unbounded(); let (sync_in_sender, sync_in_receiver) = unbounded(); let sync_log = log.new(o!()); - let sync_db = Arc::clone(&db); + let sync_db = db.clone(); let thread = thread::spawn(move || { run_sync_future( sync_db, diff --git a/lighthouse/db/disk_db.rs b/lighthouse/db/disk_db.rs new file mode 100644 index 000000000..e4ebdedec --- /dev/null +++ b/lighthouse/db/disk_db.rs @@ -0,0 +1,173 @@ +extern crate rocksdb; + +use std::fs; +use std::path::Path; +use super::rocksdb::{ + DB, + Options, +}; +use super::rocksdb::Error as RocksError; +use super::{ + ClientDB, + DBValue, + DBError +}; + +/// An on-disk database which implements the ClientDB trait. +/// +/// This implementation uses RocksDB with default options. +pub struct DiskDB { + db: DB, +} + +impl DiskDB { + /// Open the RocksDB database, optionally supplying columns if required. + /// + /// The RocksDB database will be contained in a directory titled + /// "database" in the supplied path. + /// + /// # Panics + /// + /// Panics if the database is unable to be created. 
+ pub fn open(path: &Path, columns: Option<&[&str]>) -> Self { + /* + * Initialise the options + */ + let mut options = Options::default(); + options.create_if_missing(true); + + /* + * Initialise the path + */ + fs::create_dir_all(&path) + .expect(&format!("Unable to create {:?}", &path)); + let db_path = path.join("database"); + + /* + * Open the database + */ + let db = match columns { + None => DB::open(&options, db_path), + Some(columns) => DB::open_cf(&options, db_path, columns) + }.expect("Unable to open local database"); + + Self { + db, + } + } +} + +impl From for DBError { + fn from(e: RocksError) -> Self { + Self { message: e.to_string() } + } +} + +impl ClientDB for DiskDB { + /// Create a RocksDB column family. Corresponds to the + /// `create_cf()` function on the RocksDB API. + fn create_col(&mut self, col: &str) + -> Result<(), DBError> + { + match self.db.create_cf(col, &Options::default()) { + Err(e) => Err(e.into()), + Ok(_) => Ok(()) + } + } + + /// Get the value for some key on some column. + /// + /// Corresponds to the `get_cf()` method on the RocksDB API. + /// Will attempt to get the `ColumnFamily` and return an Err + /// if it fails. + fn get(&self, col: &str, key: &[u8]) + -> Result, DBError> + { + match self.db.cf_handle(col) { + None => Err(DBError{ message: "Unknown column".to_string() }), + Some(handle) => { + match self.db.get_cf(handle, key)? { + None => Ok(None), + Some(db_vec) => Ok(Some(DBValue::from(&*db_vec))) + } + } + } + } + + /// Set some value for some key on some column. + /// + /// Corresponds to the `cf_handle()` method on the RocksDB API. + /// Will attempt to get the `ColumnFamily` and return an Err + /// if it fails. 
+ fn put(&self, col: &str, key: &[u8], val: &[u8]) + -> Result<(), DBError> + { + match self.db.cf_handle(col) { + None => Err(DBError{ message: "Unknown column".to_string() }), + Some(handle) => self.db.put_cf(handle, key, val).map_err(|e| e.into()) + } + } +} + + +#[cfg(test)] +mod tests { + use super::*; + use super::super::ClientDB; + use std::{ env, fs, thread }; + use std::sync::Arc; + + #[test] + #[ignore] + fn test_rocksdb_can_use_db() { + let pwd = env::current_dir().unwrap(); + let path = pwd.join("testdb_please_remove"); + let _ = fs::remove_dir_all(&path); + fs::create_dir_all(&path).unwrap(); + + let col_name: &str = "TestColumn"; + let column_families = vec![col_name]; + + let mut db = DiskDB::open(&path, None); + + for cf in column_families { + db.create_col(&cf).unwrap(); + } + + let db = Arc::new(db); + + let thread_count = 10; + let write_count = 10; + + // We're expecting the product of these numbers to fit in one byte. + assert!(thread_count * write_count <= 255); + + let mut handles = vec![]; + for t in 0..thread_count { + let wc = write_count; + let db = db.clone(); + let col = col_name.clone(); + let handle = thread::spawn(move || { + for w in 0..wc { + let key = (t * w) as u8; + let val = 42; + db.put(&col, &vec![key], &vec![val]).unwrap(); + } + }); + handles.push(handle); + } + + for handle in handles { + handle.join().unwrap(); + } + + for t in 0..thread_count { + for w in 0..write_count { + let key = (t * w) as u8; + let val = db.get(&col_name, &vec![key]).unwrap().unwrap(); + assert_eq!(vec![42], val); + } + } + fs::remove_dir_all(&path).unwrap(); + } +} diff --git a/lighthouse/db/mod.rs b/lighthouse/db/mod.rs index 92323b3ac..c85eaf18b 100644 --- a/lighthouse/db/mod.rs +++ b/lighthouse/db/mod.rs @@ -1,14 +1,11 @@ extern crate rocksdb; -use std::fs; -use std::path::Path; -pub use self::rocksdb::DB; +mod disk_db; +mod traits; -pub fn open_db(path: &Path) -> DB { - let db_path = path.join("rocksdb"); - fs::create_dir_all(&db_path) - 
.expect(&format!("Unable to create {:?}", &db_path)); - let db = DB::open_default(db_path.join("lighthouse.rdb")) - .expect("Unable to open local database."); - db -} +pub use self::disk_db::DiskDB; +pub use self::traits::{ + DBError, + DBValue, + ClientDB, +}; diff --git a/lighthouse/db/traits.rs b/lighthouse/db/traits.rs new file mode 100644 index 000000000..97759d3b7 --- /dev/null +++ b/lighthouse/db/traits.rs @@ -0,0 +1,30 @@ +pub type DBValue = Vec; + +#[derive(Debug)] +pub struct DBError { + pub message: String +} + +impl DBError { + pub fn new(message: String) -> Self { + Self { message } + } +} + +/// A generic database to be used by the "client" (i.e., +/// the lighthouse blockchain client). +/// +/// The purpose of having this generic trait is to allow the +/// program to use a persistent on-disk database during production, +/// but use a transient database during tests. +pub trait ClientDB: Sync + Send { + fn create_col(&mut self, col: &str) + -> Result<(), DBError>; + + fn get(&self, col: &str, key: &[u8]) + -> Result, DBError>; + + fn put(&self, col: &str, key: &[u8], val: &[u8]) + -> Result<(), DBError>; +} + diff --git a/lighthouse/sync/block.rs b/lighthouse/sync/block.rs new file mode 100644 index 000000000..72d827285 --- /dev/null +++ b/lighthouse/sync/block.rs @@ -0,0 +1,24 @@ +use std::sync::Arc; +use super::db::ClientDB; +use slog::Logger; + +pub enum BlockStatus { + Valid, + AlreadyKnown, + TooOld, + TimeInvalid, + UnknownPoWHash, + NoAttestations, + InvalidAttestation, + NotProposerSigned, +} + +pub fn process_unverified_blocks( + _serialized_block: &[u8], + _db: Arc, + _log: Logger) +{ + // +} + + diff --git a/lighthouse/sync/messages.rs b/lighthouse/sync/messages.rs deleted file mode 100644 index 9173f1c40..000000000 --- a/lighthouse/sync/messages.rs +++ /dev/null @@ -1,12 +0,0 @@ -pub enum SyncEventType { - Invalid, - PeerConnect, - PeerDrop, - ReceiveBlocks, - ReceiveAttestationRecords, -} - -pub struct SyncEvent { - event: SyncEventType, 
- data: Option> -} diff --git a/lighthouse/sync/mod.rs b/lighthouse/sync/mod.rs index 3d7d2d8d2..f56260e4f 100644 --- a/lighthouse/sync/mod.rs +++ b/lighthouse/sync/mod.rs @@ -3,7 +3,7 @@ extern crate slog; extern crate tokio; extern crate network_libp2p; -pub mod messages; +pub mod block; pub mod network; pub mod sync_future; pub mod wire_protocol; diff --git a/lighthouse/sync/network.rs b/lighthouse/sync/network.rs index c2d1f1149..1383b5040 100644 --- a/lighthouse/sync/network.rs +++ b/lighthouse/sync/network.rs @@ -1,5 +1,5 @@ -use std::sync::{ RwLock, Arc }; -use super::db::DB; +use std::sync::Arc; +use super::db::ClientDB; use slog::Logger; use super::network_libp2p::message::{ @@ -8,7 +8,12 @@ use super::network_libp2p::message::{ NetworkEventType, }; -use super::wire_protocol::{ WireMessageType, message_type }; +use super::block::process_unverified_blocks; + +use super::wire_protocol::{ + WireMessage, + WireMessageHeader, +}; use super::futures::sync::mpsc::{ UnboundedSender, @@ -20,7 +25,7 @@ use super::futures::sync::mpsc::{ /// (e.g., libp2p) has an event to push up to the sync process. pub fn handle_network_event( event: NetworkEvent, - db: Arc>, + db: Arc, network_tx: UnboundedSender, log: Logger) -> Result<(), ()> @@ -51,16 +56,27 @@ pub fn handle_network_event( /// (e.g., libp2p) has sent a message to us. fn handle_network_message( message: Vec, - _db: Arc>, + db: Arc, _network_tx: UnboundedSender, - _log: Logger) + log: Logger) -> Result<(), ()> { - match message_type(&message) { - Some(WireMessageType::Blocks) => { - // Do something with inbound blocks. 
- Ok(()) + match WireMessage::decode(&message) { + Ok(msg) => { + match msg.header { + WireMessageHeader::Blocks => { + process_unverified_blocks( + msg.body, + db, + log + ); + Ok(()) + } + _ => Ok(()) + } + } + Err(_) => { + return Ok(()) // No need to pass the error back } - _ => Ok(()) } } diff --git a/lighthouse/sync/sync_future.rs b/lighthouse/sync/sync_future.rs index c3b2355fb..0e46e9e50 100644 --- a/lighthouse/sync/sync_future.rs +++ b/lighthouse/sync/sync_future.rs @@ -9,8 +9,8 @@ use super::network_libp2p::message::{ OutgoingMessage, }; use super::network::handle_network_event; -use std::sync::{ RwLock, Arc }; -use super::db::DB; +use std::sync::Arc; +use super::db::ClientDB; use slog::Logger; type NetworkSender = UnboundedSender; @@ -25,7 +25,7 @@ type SyncReceiver = UnboundedReceiver>; /// from the network and the RPC and update /// the state. pub fn run_sync_future( - db: Arc>, + db: Arc, network_tx: NetworkSender, network_rx: NetworkReceiver, _sync_tx: SyncSender, diff --git a/lighthouse/sync/wire_protocol.rs b/lighthouse/sync/wire_protocol.rs index 5d4f77204..999d14a73 100644 --- a/lighthouse/sync/wire_protocol.rs +++ b/lighthouse/sync/wire_protocol.rs @@ -1,4 +1,9 @@ -pub enum WireMessageType { +pub enum WireMessageDecodeError { + TooShort, + UnknownType, +} + +pub enum WireMessageHeader { Status, NewBlockHashes, GetBlockHashes, @@ -8,6 +13,48 @@ pub enum WireMessageType { NewBlock, } +pub struct WireMessage<'a> { + pub header: WireMessageHeader, + pub body: &'a [u8], +} + +impl<'a> WireMessage<'a> { + pub fn decode(bytes: &'a Vec) + -> Result + { + if let Some((header_byte, body)) = bytes.split_first() { + let header = match header_byte { + 0x06 => Some(WireMessageHeader::Blocks), + _ => None + }; + match header { + Some(header) => Ok(Self{header, body}), + None => Err(WireMessageDecodeError::UnknownType) + } + } else { + Err(WireMessageDecodeError::TooShort) + } + } +} + +/* +pub fn decode_wire_message(bytes: &[u8]) + -> Result +{ + if let 
Some((header_byte, body)) = bytes.split_first() { + let header = match header_byte { + 0x06 => Some(WireMessageType::Blocks), + _ => None + }; + match header { + Some(header) => Ok((header, body)), + None => Err(WireMessageDecodeError::UnknownType) + } + } else { + Err(WireMessageDecodeError::TooShort) + } +} + /// Determines the message type of some given /// message. @@ -22,3 +69,21 @@ pub fn message_type(message: &Vec) _ => None } } + +pub fn identify_wire_protocol_message(message: &Vec) + -> Result<(WireMessageType, &[u8]), WireMessageDecodeError> +{ + fn strip_header(v: &Vec) -> &[u8] { + match v.get(1..v.len()) { + None => &vec![], + Some(s) => s + } + } + + match message.get(0) { + Some(0x06) => Ok((WireMessageType::Blocks, strip_header(message))), + None => Err(WireMessageDecodeError::TooShort), + _ => Err(WireMessageDecodeError::UnknownType), + } +} +*/ diff --git a/network-libp2p/Cargo.toml b/network-libp2p/Cargo.toml index 50f5d26b8..de8077ac6 100644 --- a/network-libp2p/Cargo.toml +++ b/network-libp2p/Cargo.toml @@ -8,13 +8,13 @@ bigint = "4.2" bytes = "" eth-secp256k1 = { git = "https://github.com/paritytech/rust-secp256k1" } futures = "0.1.23" -libp2p-peerstore = { git = "https://github.com/tomaka/libp2p-rs", branch ="zksummit" } -libp2p-core = { git = "https://github.com/tomaka/libp2p-rs", branch ="zksummit" } -libp2p-mplex = { git = "https://github.com/tomaka/libp2p-rs", branch ="zksummit" } -libp2p-tcp-transport = { git = "https://github.com/tomaka/libp2p-rs", branch ="zksummit" } -libp2p-floodsub = { git = "https://github.com/tomaka/libp2p-rs", branch ="zksummit" } -libp2p-identify = { git = "https://github.com/tomaka/libp2p-rs", branch ="zksummit" } -libp2p-kad = { git = "https://github.com/tomaka/libp2p-rs", branch ="zksummit" } +libp2p-peerstore = { git = "https://github.com/sigp/libp2p-rs", branch ="zksummit" } +libp2p-core = { git = "https://github.com/sigp/libp2p-rs", branch ="zksummit" } +libp2p-mplex = { git = 
"https://github.com/sigp/libp2p-rs", branch ="zksummit" } +libp2p-tcp-transport = { git = "https://github.com/sigp/libp2p-rs", branch ="zksummit" } +libp2p-floodsub = { git = "https://github.com/sigp/libp2p-rs", branch ="zksummit" } +libp2p-identify = { git = "https://github.com/sigp/libp2p-rs", branch ="zksummit" } +libp2p-kad = { git = "https://github.com/sigp/libp2p-rs", branch ="zksummit" } pem = "0.5.0" rand = "0.3" slog = "^2.2.3"