From 513972b75c3d42c86b82aa27850103a4f1b8a7bf Mon Sep 17 00:00:00 2001
From: Paul Hauner
Date: Fri, 14 Sep 2018 15:52:49 +1000
Subject: [PATCH 01/14] Add more plumbing for block processing

---
 lighthouse/client/mod.rs         |  8 ++--
 lighthouse/sync/block.rs         | 23 +++++++++++
 lighthouse/sync/messages.rs      | 12 ------
 lighthouse/sync/mod.rs           |  2 +-
 lighthouse/sync/network.rs       | 38 ++++++++++++------
 lighthouse/sync/sync_future.rs   |  4 +-
 lighthouse/sync/wire_protocol.rs | 67 +++++++++++++++++++++++++++++++-
 7 files changed, 123 insertions(+), 31 deletions(-)
 create mode 100644 lighthouse/sync/block.rs
 delete mode 100644 lighthouse/sync/messages.rs

diff --git a/lighthouse/client/mod.rs b/lighthouse/client/mod.rs
index f87f05738..89b1729ba 100644
--- a/lighthouse/client/mod.rs
+++ b/lighthouse/client/mod.rs
@@ -1,4 +1,4 @@
-use std::sync::{ Arc, RwLock };
+use std::sync::Arc;
 use std::thread;
 use super::db::{ DB, open_db };
 use super::config::LighthouseConfig;
@@ -13,7 +13,7 @@ use super::sync::run_sync_future;
 /// Represents the co-ordination of the
 /// networking, syncing and RPC (not-yet-implemented) threads.
 pub struct Client {
-    pub db: Arc<RwLock<DB>>,
+    pub db: Arc<DB>,
     pub network_thread: thread::JoinHandle<()>,
     pub sync_thread: thread::JoinHandle<()>,
 }
@@ -30,7 +30,7 @@ impl Client {
         // Open the local db
         let db = {
             let db = open_db(&config.data_dir);
-            Arc::new(RwLock::new(db))
+            Arc::new(db)
         };
 
         // Start the network thread
@@ -57,7 +57,7 @@ impl Client {
         let (sync_out_sender, sync_out_receiver) = unbounded();
         let (sync_in_sender, sync_in_receiver) = unbounded();
         let sync_log = log.new(o!());
-        let sync_db = Arc::clone(&db);
+        let sync_db = db.clone();
         let thread = thread::spawn(move || {
             run_sync_future(
                 sync_db,

diff --git a/lighthouse/sync/block.rs b/lighthouse/sync/block.rs
new file mode 100644
index 000000000..2612ca8b1
--- /dev/null
+++ b/lighthouse/sync/block.rs
@@ -0,0 +1,23 @@
+use super::db::DB;
+use slog::Logger;
+
+pub enum BlockStatus {
+    Valid,
+    AlreadyKnown,
+    TooOld,
+    TimeInvalid,
+    UnknownPoWHash,
+    NoAttestations,
+    InvalidAttestation,
+    NotProposerSigned,
+}
+
+pub fn process_unverified_blocks(
+    _serialized_block: &[u8],
+    _db: &DB,
+    _log: Logger)
+{
+    //
+}
+
+

diff --git a/lighthouse/sync/messages.rs b/lighthouse/sync/messages.rs
deleted file mode 100644
index 9173f1c40..000000000
--- a/lighthouse/sync/messages.rs
+++ /dev/null
@@ -1,12 +0,0 @@
-pub enum SyncEventType {
-    Invalid,
-    PeerConnect,
-    PeerDrop,
-    ReceiveBlocks,
-    ReceiveAttestationRecords,
-}
-
-pub struct SyncEvent {
-    event: SyncEventType,
-    data: Option<Vec<u8>>
-}

diff --git a/lighthouse/sync/mod.rs b/lighthouse/sync/mod.rs
index 3d7d2d8d2..f56260e4f 100644
--- a/lighthouse/sync/mod.rs
+++ b/lighthouse/sync/mod.rs
@@ -3,7 +3,7 @@ extern crate slog;
 extern crate tokio;
 extern crate network_libp2p;
 
-pub mod messages;
+pub mod block;
 pub mod network;
 pub mod sync_future;
 pub mod wire_protocol;

diff --git a/lighthouse/sync/network.rs b/lighthouse/sync/network.rs
index c2d1f1149..99d178ff0 100644
--- a/lighthouse/sync/network.rs
+++ b/lighthouse/sync/network.rs
@@ -1,4 +1,4 @@
-use std::sync::{ RwLock, Arc };
+use std::sync::Arc;
 use super::db::DB;
 use slog::Logger;
 
@@ -8,7 +8,12 @@ use super::network_libp2p::message::{
     NetworkEventType,
 };
 
-use super::wire_protocol::{ WireMessageType, message_type };
+use super::block::process_unverified_blocks;
+
+use super::wire_protocol::{
+    WireMessage,
+    WireMessageHeader,
+};
 
 use super::futures::sync::mpsc::{
     UnboundedSender,
@@ -20,7 +25,7 @@ use super::futures::sync::mpsc::{
 /// (e.g., libp2p) has an event to push up to the sync process.
 pub fn handle_network_event(
     event: NetworkEvent,
-    db: Arc<RwLock<DB>>,
+    db: Arc<DB>,
     network_tx: UnboundedSender<Vec<u8>>,
     log: Logger)
     -> Result<(), ()>
@@ -34,7 +39,7 @@ pub fn handle_network_event(
             if let Some(data) = event.data {
                 handle_network_message(
                     data,
-                    db,
+                    &db,
                     network_tx,
                     log)
             } else {
@@ -51,16 +56,27 @@ pub fn handle_network_event(
 /// (e.g., libp2p) has sent a message to us.
 fn handle_network_message(
     message: Vec<u8>,
-    _db: Arc<RwLock<DB>>,
+    db: &DB,
     _network_tx: UnboundedSender<Vec<u8>>,
-    _log: Logger)
+    log: Logger)
     -> Result<(), ()>
 {
-    match message_type(&message) {
-        Some(WireMessageType::Blocks) => {
-            // Do something with inbound blocks.
-            Ok(())
+    match WireMessage::decode(&message) {
+        Ok(msg) => {
+            match msg.header {
+                WireMessageHeader::Blocks => {
+                    process_unverified_blocks(
+                        msg.body,
+                        db,
+                        log
+                    );
+                    Ok(())
+                }
+                _ => Ok(())
+            }
+        }
+        Err(_) => {
+            return Ok(()) // No need to pass the error back
         }
-        _ => Ok(())
     }
 }

diff --git a/lighthouse/sync/sync_future.rs b/lighthouse/sync/sync_future.rs
index c3b2355fb..06836fb30 100644
--- a/lighthouse/sync/sync_future.rs
+++ b/lighthouse/sync/sync_future.rs
@@ -9,7 +9,7 @@ use super::network_libp2p::message::{
     OutgoingMessage,
 };
 use super::network::handle_network_event;
-use std::sync::{ RwLock, Arc };
+use std::sync::Arc;
 use super::db::DB;
 use slog::Logger;
 
 type NetworkSender = UnboundedSender<OutgoingMessage>;
@@ -25,7 +25,7 @@ type SyncReceiver = UnboundedReceiver<Vec<u8>>;
 /// Polls on network and sync channels
 /// from the network and the RPC and update
 /// the state.
 pub fn run_sync_future(
-    db: Arc<RwLock<DB>>,
+    db: Arc<DB>,
     network_tx: NetworkSender,
     network_rx: NetworkReceiver,
     _sync_tx: SyncSender,

diff --git a/lighthouse/sync/wire_protocol.rs b/lighthouse/sync/wire_protocol.rs
index 5d4f77204..999d14a73 100644
--- a/lighthouse/sync/wire_protocol.rs
+++ b/lighthouse/sync/wire_protocol.rs
@@ -1,4 +1,9 @@
-pub enum WireMessageType {
+pub enum WireMessageDecodeError {
+    TooShort,
+    UnknownType,
+}
+
+pub enum WireMessageHeader {
     Status,
     NewBlockHashes,
     GetBlockHashes,
@@ -8,6 +13,48 @@ pub enum WireMessageType {
     NewBlock,
 }
 
+pub struct WireMessage<'a> {
+    pub header: WireMessageHeader,
+    pub body: &'a [u8],
+}
+
+impl<'a> WireMessage<'a> {
+    pub fn decode(bytes: &'a Vec<u8>)
+        -> Result<Self, WireMessageDecodeError>
+    {
+        if let Some((header_byte, body)) = bytes.split_first() {
+            let header = match header_byte {
+                0x06 => Some(WireMessageHeader::Blocks),
+                _ => None
+            };
+            match header {
+                Some(header) => Ok(Self{header, body}),
+                None => Err(WireMessageDecodeError::UnknownType)
+            }
+        } else {
+            Err(WireMessageDecodeError::TooShort)
+        }
+    }
+}
+
+/*
+pub fn decode_wire_message(bytes: &[u8])
+    -> Result<(WireMessageType, &[u8]), WireMessageDecodeError>
+{
+    if let Some((header_byte, body)) = bytes.split_first() {
+        let header = match header_byte {
+            0x06 => Some(WireMessageType::Blocks),
+            _ => None
+        };
+        match header {
+            Some(header) => Ok((header, body)),
+            None => Err(WireMessageDecodeError::UnknownType)
+        }
+    } else {
+        Err(WireMessageDecodeError::TooShort)
+    }
+}
+
 /// Determines the message type of some given
 /// message.
@@ -22,3 +69,21 @@ pub fn message_type(message: &Vec<u8>)
     _ => None
     }
 }
+
+pub fn identify_wire_protocol_message(message: &Vec<u8>)
+    -> Result<(WireMessageType, &[u8]), WireMessageDecodeError>
+{
+    fn strip_header(v: &Vec<u8>) -> &[u8] {
+        match v.get(1..v.len()) {
+            None => &vec![],
+            Some(s) => s
+        }
+    }
+
+    match message.get(0) {
+        Some(0x06) => Ok((WireMessageType::Blocks, strip_header(message))),
+        None => Err(WireMessageDecodeError::TooShort),
+        _ => Err(WireMessageDecodeError::UnknownType),
+    }
+}
+*/
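The `WireMessage::decode` path above is easiest to see end to end in isolation: byte zero selects the message type and the remainder is the body. The sketch below is not part of the patch; it just replays the same split, assuming (as the patch does) that only `0x06` is mapped, to `Blocks`:

```rust
// Standalone sketch of the wire-format split used by WireMessage::decode.
fn main() {
    let raw: Vec<u8> = vec![0x06, 0xde, 0xad, 0xbe, 0xef];
    match raw.split_first() {
        // 0x06 is the only header byte the patch maps (Blocks).
        Some((&0x06, body)) => println!("Blocks message, body = {:?}", body),
        Some((byte, _)) => println!("unknown message type: {:#04x}", byte),
        None => println!("message too short"),
    }
}
```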
From 33b1e6ddf446674f9ccd8d5d0e6346845013ff2b Mon Sep 17 00:00:00 2001
From: Paul Hauner
Date: Mon, 17 Sep 2018 17:52:32 +1000
Subject: [PATCH 02/14] Partially implemented db wrapper

Addresses issue #12
---
 lighthouse/client/mod.rs       |  8 +++++---
 lighthouse/db/mod.rs           | 27 +++++++++++++++++----------
 lighthouse/sync/block.rs       |  5 +++--
 lighthouse/sync/network.rs     |  8 ++++----
 lighthouse/sync/sync_future.rs |  4 ++--
 5 files changed, 31 insertions(+), 21 deletions(-)

diff --git a/lighthouse/client/mod.rs b/lighthouse/client/mod.rs
index 89b1729ba..f76f87aef 100644
--- a/lighthouse/client/mod.rs
+++ b/lighthouse/client/mod.rs
@@ -1,6 +1,6 @@
 use std::sync::Arc;
 use std::thread;
-use super::db::{ DB, open_db };
+use super::db::{ DiskDB };
 use super::config::LighthouseConfig;
 use super::futures::sync::mpsc::{
     unbounded,
@@ -10,10 +10,12 @@ use super::network_libp2p::state::NetworkState;
 use super::slog::Logger;
 use super::sync::run_sync_future;
 
+use super::db::ClientDB;
+
 /// Represents the co-ordination of the
 /// networking, syncing and RPC (not-yet-implemented) threads.
 pub struct Client {
-    pub db: Arc<DB>,
+    pub db: Arc<ClientDB>,
     pub network_thread: thread::JoinHandle<()>,
     pub sync_thread: thread::JoinHandle<()>,
 }
@@ -29,7 +31,7 @@ impl Client {
     {
         // Open the local db
         let db = {
-            let db = open_db(&config.data_dir);
+            let db = DiskDB::open(&config.data_dir);
             Arc::new(db)
         };

diff --git a/lighthouse/db/mod.rs b/lighthouse/db/mod.rs
index 92323b3ac..23ddb9a37 100644
--- a/lighthouse/db/mod.rs
+++ b/lighthouse/db/mod.rs
@@ -1,14 +1,21 @@
 extern crate rocksdb;
 
-use std::fs;
-use std::path::Path;
-pub use self::rocksdb::DB;
+mod disk_db;
 
-pub fn open_db(path: &Path) -> DB {
-    let db_path = path.join("rocksdb");
-    fs::create_dir_all(&db_path)
-        .expect(&format!("Unable to create {:?}", &db_path));
-    let db = DB::open_default(db_path.join("lighthouse.rdb"))
-        .expect("Unable to open local database.");
-    db
+pub use self::disk_db::DiskDB;
+
+#[derive(Debug)]
+pub struct DBError {
+    message: String
+}
+
+impl DBError {
+    fn new(message: String) -> Self {
+        Self { message }
+    }
+}
+
+pub trait ClientDB: Sync + Send {
+    fn get(&self, col: &str, key: &[u8])
+        -> Result<Option<&[u8]>, DBError>;
 }

diff --git a/lighthouse/sync/block.rs b/lighthouse/sync/block.rs
index 2612ca8b1..72d827285 100644
--- a/lighthouse/sync/block.rs
+++ b/lighthouse/sync/block.rs
@@ -1,4 +1,5 @@
-use super::db::DB;
+use std::sync::Arc;
+use super::db::ClientDB;
 use slog::Logger;
 
 pub enum BlockStatus {
@@ -14,7 +15,7 @@ pub enum BlockStatus {
 
 pub fn process_unverified_blocks(
     _serialized_block: &[u8],
-    _db: &DB,
+    _db: Arc<ClientDB>,
     _log: Logger)
 {
     //

diff --git a/lighthouse/sync/network.rs b/lighthouse/sync/network.rs
index 99d178ff0..1383b5040 100644
--- a/lighthouse/sync/network.rs
+++ b/lighthouse/sync/network.rs
@@ -1,5 +1,5 @@
 use std::sync::Arc;
-use super::db::DB;
+use super::db::ClientDB;
 use slog::Logger;
 
 use super::network_libp2p::message::{
@@ -25,7 +25,7 @@ use super::futures::sync::mpsc::{
 /// (e.g., libp2p) has an event to push up to the sync process.
 pub fn handle_network_event(
     event: NetworkEvent,
-    db: Arc<DB>,
+    db: Arc<ClientDB>,
     network_tx: UnboundedSender<Vec<u8>>,
     log: Logger)
     -> Result<(), ()>
@@ -39,7 +39,7 @@ pub fn handle_network_event(
         if let Some(data) = event.data {
             handle_network_message(
                 data,
-                &db,
+                db,
                 network_tx,
                 log)
         } else {
@@ -56,7 +56,7 @@ pub fn handle_network_event(
 /// (e.g., libp2p) has sent a message to us.
 fn handle_network_message(
     message: Vec<u8>,
-    db: &DB,
+    db: Arc<ClientDB>,
     _network_tx: UnboundedSender<Vec<u8>>,
     log: Logger)
     -> Result<(), ()>

diff --git a/lighthouse/sync/sync_future.rs b/lighthouse/sync/sync_future.rs
index 06836fb30..0e46e9e50 100644
--- a/lighthouse/sync/sync_future.rs
+++ b/lighthouse/sync/sync_future.rs
@@ -10,7 +10,7 @@ use super::network_libp2p::message::{
 };
 use super::network::handle_network_event;
 use std::sync::Arc;
-use super::db::DB;
+use super::db::ClientDB;
 use slog::Logger;
 
 type NetworkSender = UnboundedSender<OutgoingMessage>;
@@ -25,7 +25,7 @@ type SyncReceiver = UnboundedReceiver<Vec<u8>>;
 /// Polls on network and sync channels
 /// from the network and the RPC and update
 /// the state.
 pub fn run_sync_future(
-    db: Arc<DB>,
+    db: Arc<ClientDB>,
     network_tx: NetworkSender,
     network_rx: NetworkReceiver,
     _sync_tx: SyncSender,
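The key move in this patch is replacing the concrete `Arc<DB>` with an `Arc<ClientDB>` trait object. Because the trait requires `Sync + Send`, each implementation takes responsibility for its own interior locking and the handle can cross thread boundaries without an outer `RwLock`. A minimal sketch of that shape follows; the `NullDB` type is invented for illustration and is not part of the patch:

```rust
use std::sync::Arc;
use std::thread;

// Stand-in for the ClientDB trait: the Sync + Send supertraits let an
// Arc of the trait object move between threads without an outer lock.
trait ClientDB: Sync + Send {
    fn get(&self, key: &[u8]) -> Option<Vec<u8>>;
}

// Illustrative no-op implementation.
struct NullDB;

impl ClientDB for NullDB {
    fn get(&self, _key: &[u8]) -> Option<Vec<u8>> {
        None
    }
}

fn main() {
    let db: Arc<dyn ClientDB> = Arc::new(NullDB);
    let db_clone = db.clone();
    let handle = thread::spawn(move || db_clone.get(b"some-block-hash"));
    assert_eq!(handle.join().unwrap(), None);
}
```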
From 3876c0261aea03af1a2e47bffae05e36b7950d14 Mon Sep 17 00:00:00 2001
From: Paul Hauner
Date: Tue, 18 Sep 2018 10:27:29 +1000
Subject: [PATCH 03/14] Implement get and put on ClientDB trait

---
 lighthouse/db/mod.rs | 9 +++++++--
 1 file changed, 7 insertions(+), 2 deletions(-)

diff --git a/lighthouse/db/mod.rs b/lighthouse/db/mod.rs
index 23ddb9a37..6472e7f62 100644
--- a/lighthouse/db/mod.rs
+++ b/lighthouse/db/mod.rs
@@ -4,18 +4,23 @@ mod disk_db;
 
 pub use self::disk_db::DiskDB;
 
+type DBValue = Vec<u8>;
+
 #[derive(Debug)]
 pub struct DBError {
     message: String
 }
 
 impl DBError {
-    fn new(message: String) -> Self {
+    pub fn new(message: String) -> Self {
         Self { message }
     }
 }
 
 pub trait ClientDB: Sync + Send {
     fn get(&self, col: &str, key: &[u8])
-        -> Result<Option<&[u8]>, DBError>;
+        -> Result<Option<DBValue>, DBError>;
+
+    fn put(&self, col: &str, key: &[u8], val: &[u8])
+        -> Result<(), DBError>;
 }
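With `put` added, the trait now gives callers a three-way contract: `Ok(Some(..))` is a hit, `Ok(None)` is a clean miss, and `Err(..)` is a storage failure. A sketch of the expected branching, with an invented `lookup` stub standing in for `ClientDB::get`:

```rust
type DBValue = Vec<u8>;

#[derive(Debug)]
struct DBError {
    message: String,
}

// Invented stand-in for ClientDB::get, returning the same shape.
fn lookup(found: bool) -> Result<Option<DBValue>, DBError> {
    if found { Ok(Some(vec![42])) } else { Ok(None) }
}

fn main() {
    match lookup(false) {
        Ok(Some(val)) => println!("hit: {:?}", val),
        Ok(None) => println!("miss: key not present"),
        Err(e) => println!("storage failure: {}", e.message),
    }
}
```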
From 9b7463f68a7aacc71b540e50f5a2c92577063348 Mon Sep 17 00:00:00 2001
From: Paul Hauner
Date: Tue, 18 Sep 2018 15:59:44 +1000
Subject: [PATCH 04/14] Update ClientDB trait

---
 lighthouse/client/mod.rs |   2 +-
 lighthouse/db/disk_db.rs | 151 +++++++++++++++++++++++++++++++++++++++
 lighthouse/db/mod.rs     |   3 +
 3 files changed, 155 insertions(+), 1 deletion(-)
 create mode 100644 lighthouse/db/disk_db.rs

diff --git a/lighthouse/client/mod.rs b/lighthouse/client/mod.rs
index f76f87aef..ee73c2d29 100644
--- a/lighthouse/client/mod.rs
+++ b/lighthouse/client/mod.rs
@@ -31,7 +31,7 @@ impl Client {
     {
         // Open the local db
         let db = {
-            let db = DiskDB::open(&config.data_dir);
+            let db = DiskDB::open(&config.data_dir, None);
             Arc::new(db)
         };

diff --git a/lighthouse/db/disk_db.rs b/lighthouse/db/disk_db.rs
new file mode 100644
index 000000000..266b9afe1
--- /dev/null
+++ b/lighthouse/db/disk_db.rs
@@ -0,0 +1,151 @@
+extern crate rocksdb;
+
+use std::fs;
+use std::path::Path;
+use super::rocksdb::{
+    DB,
+    Options,
+};
+use super::rocksdb::Error as RocksError;
+use super::{
+    ClientDB,
+    DBValue,
+    DBError
+};
+
+pub struct DiskDB {
+    db: DB,
+}
+
+impl DiskDB {
+    pub fn open(path: &Path, columns: Option<&[&str]>) -> Self {
+        /*
+         * Initialise the options
+         */
+        let mut options = Options::default();
+        options.create_if_missing(true);
+
+        /*
+         * Initialise the path
+         */
+        let mut db_path = path.join("rocksdb");
+        fs::create_dir_all(&db_path)
+            .expect(&format!("Unable to create {:?}", &db_path));
+        db_path = db_path.join("database");
+
+        /*
+         * Open the database
+         */
+        let db = match columns {
+            None => DB::open(&options, db_path),
+            Some(columns) => DB::open_cf(&options, db_path, columns)
+        }.expect("Unable to open local database");
+
+        Self {
+            db,
+        }
+    }
+}
+
+impl From<RocksError> for DBError {
+    fn from(e: RocksError) -> Self {
+        Self { message: e.to_string() }
+    }
+}
+
+impl ClientDB for DiskDB {
+    fn create_col(&mut self, col: &str)
+        -> Result<(), DBError>
+    {
+        match self.db.create_cf(col, &Options::default()) {
+            Err(e) => Err(e.into()),
+            Ok(_) => Ok(())
+        }
+    }
+
+    fn get(&self, col: &str, key: &[u8])
+        -> Result<Option<DBValue>, DBError>
+    {
+        match self.db.cf_handle(col) {
+            None => Err(DBError{ message: "Unknown column".to_string() }),
+            Some(handle) => {
+                match self.db.get_cf(handle, key)? {
+                    None => Ok(None),
+                    Some(db_vec) => Ok(Some(DBValue::from(&*db_vec)))
+                }
+            }
+        }
+    }
+
+    fn put(&self, col: &str, key: &[u8], val: &[u8])
+        -> Result<(), DBError>
+    {
+        match self.db.cf_handle(col) {
+            None => Err(DBError{ message: "Unknown column".to_string() }),
+            Some(handle) => self.db.put_cf(handle, key, val).map_err(|e| e.into())
+        }
+    }
+}
+
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use super::super::ClientDB;
+    use std::{ env, fs, thread };
+    use std::sync::Arc;
+
+    #[test]
+    #[ignore]
+    fn test_rocksdb_can_use_db() {
+        let pwd = env::current_dir().unwrap();
+        let path = pwd.join("testdb_please_remove");
+        let _ = fs::remove_dir_all(&path);
+        fs::create_dir_all(&path).unwrap();
+
+        let col_name: &str = "TestColumn";
+        let column_families = vec![col_name];
+
+        let mut db = DiskDB::open(&path, None);
+
+        for cf in column_families {
+            db.create_col(&cf).unwrap();
+        }
+
+        let db = Arc::new(db);
+
+        let thread_count = 10;
+        let write_count = 10;
+
+        // We're expecting the product of these numbers to fit in one byte.
+        assert!(thread_count * write_count <= 255);
+
+        let mut handles = vec![];
+        for t in 0..thread_count {
+            let wc = write_count;
+            let db = db.clone();
+            let col = col_name.clone();
+            let handle = thread::spawn(move || {
+                for w in 0..wc {
+                    let key = (t * w) as u8;
+                    let val = 42;
+                    db.put(&col, &vec![key], &vec![val]).unwrap();
+                }
+            });
+            handles.push(handle);
+        }
+
+        for handle in handles {
+            handle.join().unwrap();
+        }
+
+        for t in 0..thread_count {
+            for w in 0..write_count {
+                let key = (t * w) as u8;
+                let val = db.get(&col_name, &vec![key]).unwrap().unwrap();
+                assert_eq!(vec![42], val);
+            }
+        }
+        fs::remove_dir_all(&path).unwrap();
+    }
+}

diff --git a/lighthouse/db/mod.rs b/lighthouse/db/mod.rs
index 6472e7f62..33f97cc37 100644
--- a/lighthouse/db/mod.rs
+++ b/lighthouse/db/mod.rs
@@ -18,6 +18,9 @@ impl DBError {
 }
 
 pub trait ClientDB: Sync + Send {
+    fn create_col(&mut self, col: &str)
+        -> Result<(), DBError>;
+
     fn get(&self, col: &str, key: &[u8])
         -> Result<Option<DBValue>, DBError>;
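One detail worth pausing on in `disk_db.rs` is `impl From<RocksError> for DBError`: it is what allows `get` to write `self.db.get_cf(handle, key)?` and have RocksDB errors fold into `DBError` automatically. The same idiom in isolation, with a hand-rolled stand-in for the RocksDB error type so the sketch is self-contained:

```rust
// Stand-in for rocksdb::Error; invented so this compiles on its own.
#[derive(Debug)]
struct RocksError(String);

#[derive(Debug)]
struct DBError {
    message: String,
}

impl From<RocksError> for DBError {
    fn from(e: RocksError) -> Self {
        DBError { message: e.0 }
    }
}

// Pretend lower-level call that can fail with the library's error type.
fn get_cf(fail: bool) -> Result<Option<Vec<u8>>, RocksError> {
    if fail { Err(RocksError("io error".to_string())) } else { Ok(None) }
}

// `?` applies the From impl on the error path, so no map_err is needed.
fn get(fail: bool) -> Result<Option<Vec<u8>>, DBError> {
    Ok(get_cf(fail)?)
}

fn main() {
    assert!(get(false).is_ok());
    assert_eq!(get(true).unwrap_err().message, "io error");
}
```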
From b6197ce04d49b3866e155d6c39e04a5a6edf4991 Mon Sep 17 00:00:00 2001
From: Paul Hauner
Date: Tue, 18 Sep 2018 17:39:38 +1000
Subject: [PATCH 05/14] Move db traits into own file

---
 lighthouse/db/mod.rs    | 30 ++++++------------------------
 lighthouse/db/traits.rs | 30 ++++++++++++++++++++++++++++++
 2 files changed, 36 insertions(+), 24 deletions(-)
 create mode 100644 lighthouse/db/traits.rs

diff --git a/lighthouse/db/mod.rs b/lighthouse/db/mod.rs
index 33f97cc37..c85eaf18b 100644
--- a/lighthouse/db/mod.rs
+++ b/lighthouse/db/mod.rs
@@ -1,29 +1,11 @@
 extern crate rocksdb;
 
 mod disk_db;
+mod traits;
 
 pub use self::disk_db::DiskDB;
-
-type DBValue = Vec<u8>;
-
-#[derive(Debug)]
-pub struct DBError {
-    message: String
-}
-
-impl DBError {
-    pub fn new(message: String) -> Self {
-        Self { message }
-    }
-}
-
-pub trait ClientDB: Sync + Send {
-    fn create_col(&mut self, col: &str)
-        -> Result<(), DBError>;
-
-    fn get(&self, col: &str, key: &[u8])
-        -> Result<Option<DBValue>, DBError>;
-
-    fn put(&self, col: &str, key: &[u8], val: &[u8])
-        -> Result<(), DBError>;
-}
+pub use self::traits::{
+    DBError,
+    DBValue,
+    ClientDB,
+};

diff --git a/lighthouse/db/traits.rs b/lighthouse/db/traits.rs
new file mode 100644
index 000000000..97759d3b7
--- /dev/null
+++ b/lighthouse/db/traits.rs
@@ -0,0 +1,30 @@
+pub type DBValue = Vec<u8>;
+
+#[derive(Debug)]
+pub struct DBError {
+    pub message: String
+}
+
+impl DBError {
+    pub fn new(message: String) -> Self {
+        Self { message }
+    }
+}
+
+/// A generic database to be used by the "client" (i.e.,
+/// the lighthouse blockchain client).
+///
+/// The purpose of having this generic trait is to allow the
+/// program to use a persistent on-disk database during production,
+/// but use a transient database during tests.
+pub trait ClientDB: Sync + Send {
+    fn create_col(&mut self, col: &str)
+        -> Result<(), DBError>;
+
+    fn get(&self, col: &str, key: &[u8])
+        -> Result<Option<DBValue>, DBError>;
+
+    fn put(&self, col: &str, key: &[u8], val: &[u8])
+        -> Result<(), DBError>;
+}
+
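The refactor above is the standard Rust facade pattern: implementation files stay private submodules, and `db/mod.rs` curates the public surface with `pub use`. In miniature (names shortened, not the actual module contents):

```rust
// Private submodule holding the definitions, as traits.rs now does.
mod traits {
    pub type DBValue = Vec<u8>;

    pub trait ClientDB {
        fn get(&self, col: &str, key: &[u8]) -> Option<DBValue>;
    }
}

// The facade: callers name ClientDB / DBValue without ever
// knowing which file the items live in.
pub use self::traits::{ClientDB, DBValue};

fn main() {
    let v: DBValue = vec![1, 2, 3];
    assert_eq!(v.len(), 3);
}
```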
From c077e8dbb95a1b9a75a8b37fd4d1e46e4b5318bc Mon Sep 17 00:00:00 2001
From: Paul Hauner
Date: Tue, 18 Sep 2018 17:39:50 +1000
Subject: [PATCH 06/14] Use different directory for rocksdb database

---
 lighthouse/db/disk_db.rs | 7 +++----
 1 file changed, 3 insertions(+), 4 deletions(-)

diff --git a/lighthouse/db/disk_db.rs b/lighthouse/db/disk_db.rs
index 266b9afe1..df6e660b9 100644
--- a/lighthouse/db/disk_db.rs
+++ b/lighthouse/db/disk_db.rs
@@ -28,10 +28,9 @@ impl DiskDB {
         /*
          * Initialise the path
          */
-        let mut db_path = path.join("rocksdb");
-        fs::create_dir_all(&db_path)
-            .expect(&format!("Unable to create {:?}", &db_path));
-        db_path = db_path.join("database");
+        fs::create_dir_all(&path)
+            .expect(&format!("Unable to create {:?}", &path));
+        let db_path = path.join("database");
 
         /*
          * Open the database
From 446e8ae53ee5b5b972040b873ea95b770323dc7b Mon Sep 17 00:00:00 2001
From: Paul Hauner
Date: Tue, 18 Sep 2018 17:40:07 +1000
Subject: [PATCH 07/14] Add doc comments to the db module

---
 lighthouse/db/disk_db.rs | 23 +++++++++++++++++++++++
 1 file changed, 23 insertions(+)

diff --git a/lighthouse/db/disk_db.rs b/lighthouse/db/disk_db.rs
index df6e660b9..e4ebdedec 100644
--- a/lighthouse/db/disk_db.rs
+++ b/lighthouse/db/disk_db.rs
@@ -13,11 +13,22 @@ use super::{
     DBError
 };
 
+/// An on-disk database which implements the ClientDB trait.
+///
+/// This implementation uses RocksDB with default options.
 pub struct DiskDB {
     db: DB,
 }
 
 impl DiskDB {
+    /// Open the RocksDB database, optionally supplying columns if required.
+    ///
+    /// The RocksDB database will be contained in a directory titled
+    /// "database" in the supplied path.
+    ///
+    /// # Panics
+    ///
+    /// Panics if the database is unable to be created.
     pub fn open(path: &Path, columns: Option<&[&str]>) -> Self {
         /*
          * Initialise the options
@@ -53,6 +64,8 @@ impl From<RocksError> for DBError {
 }
 
 impl ClientDB for DiskDB {
+    /// Create a RocksDB column family. Corresponds to the
+    /// `create_cf()` function on the RocksDB API.
     fn create_col(&mut self, col: &str)
         -> Result<(), DBError>
     {
@@ -62,6 +75,11 @@ impl ClientDB for DiskDB {
         }
     }
 
+    /// Get the value for some key on some column.
+    ///
+    /// Corresponds to the `get_cf()` method on the RocksDB API.
+    /// Will attempt to get the `ColumnFamily` and return an Err
+    /// if it fails.
     fn get(&self, col: &str, key: &[u8])
         -> Result<Option<DBValue>, DBError>
     {
@@ -76,6 +94,11 @@ impl ClientDB for DiskDB {
         }
     }
 
+    /// Set some value for some key on some column.
+    ///
+    /// Corresponds to the `put_cf()` method on the RocksDB API.
+    /// Will attempt to get the `ColumnFamily` and return an Err
+    /// if it fails.
     fn put(&self, col: &str, key: &[u8], val: &[u8])
         -> Result<(), DBError>
     {
From 39cacd9521bc4246e9eec9a137885e6019f2dc6f Mon Sep 17 00:00:00 2001
From: Paul Hauner
Date: Tue, 18 Sep 2018 18:03:32 +1000
Subject: [PATCH 08/14] Update README.md

---
 README.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/README.md b/README.md
index 7ee0638c4..a3e82b162 100644
--- a/README.md
+++ b/README.md
@@ -1,6 +1,6 @@
 # Lighthouse: a (future) Ethereum 2.0 client
 
-[![Build Status](https://travis-ci.org/sigp/lighthouse.svg?branch=master)](https://travis-ci.org/sigp/lighthouse)
+[![Build Status](https://travis-ci.org/sigp/lighthouse.svg?branch=master)](https://travis-ci.org/sigp/lighthouse) [![Gitter](https://badges.gitter.im/Join%20Chat.svg)](https://gitter.im/sigp/lighthouse?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge)
 
 A **work-in-progress** implementation of the Ethereum 2.0 Beacon Chain in Rust.
From cc2e2103645be3fa31c3d2397147192ccf6f1a25 Mon Sep 17 00:00:00 2001
From: Paul Hauner
Date: Thu, 20 Sep 2018 14:09:03 +1000
Subject: [PATCH 09/14] Move from tomaka/libp2p-rs to sigp/libp2p-rs

tomaka removed the zksummit branch that we were (lazily) relying upon.
---
 network-libp2p/Cargo.toml | 14 +++++++-------
 1 file changed, 7 insertions(+), 7 deletions(-)

diff --git a/network-libp2p/Cargo.toml b/network-libp2p/Cargo.toml
index 50f5d26b8..de8077ac6 100644
--- a/network-libp2p/Cargo.toml
+++ b/network-libp2p/Cargo.toml
@@ -8,13 +8,13 @@ bigint = "4.2"
 bytes = ""
 eth-secp256k1 = { git = "https://github.com/paritytech/rust-secp256k1" }
 futures = "0.1.23"
-libp2p-peerstore = { git = "https://github.com/tomaka/libp2p-rs", branch ="zksummit" }
-libp2p-core = { git = "https://github.com/tomaka/libp2p-rs", branch ="zksummit" }
-libp2p-mplex = { git = "https://github.com/tomaka/libp2p-rs", branch ="zksummit" }
-libp2p-tcp-transport = { git = "https://github.com/tomaka/libp2p-rs", branch ="zksummit" }
-libp2p-floodsub = { git = "https://github.com/tomaka/libp2p-rs", branch ="zksummit" }
-libp2p-identify = { git = "https://github.com/tomaka/libp2p-rs", branch ="zksummit" }
-libp2p-kad = { git = "https://github.com/tomaka/libp2p-rs", branch ="zksummit" }
+libp2p-peerstore = { git = "https://github.com/sigp/libp2p-rs", branch ="zksummit" }
+libp2p-core = { git = "https://github.com/sigp/libp2p-rs", branch ="zksummit" }
+libp2p-mplex = { git = "https://github.com/sigp/libp2p-rs", branch ="zksummit" }
+libp2p-tcp-transport = { git = "https://github.com/sigp/libp2p-rs", branch ="zksummit" }
+libp2p-floodsub = { git = "https://github.com/sigp/libp2p-rs", branch ="zksummit" }
+libp2p-identify = { git = "https://github.com/sigp/libp2p-rs", branch ="zksummit" }
+libp2p-kad = { git = "https://github.com/sigp/libp2p-rs", branch ="zksummit" }
 pem = "0.5.0"
 rand = "0.3"
 slog = "^2.2.3"
From 66dc073c9ba7894d01af08a3e8987d8af2746a24 Mon Sep 17 00:00:00 2001
From: Paul Hauner
Date: Fri, 21 Sep 2018 14:01:48 +1000
Subject: [PATCH 10/14] Add basic memorydb implementation

---
 lighthouse/db/memory_db.rs | 174 +++++++++++++++++++++++++++++++++++++
 lighthouse/db/mod.rs       |   3 +
 2 files changed, 177 insertions(+)
 create mode 100644 lighthouse/db/memory_db.rs

diff --git a/lighthouse/db/memory_db.rs b/lighthouse/db/memory_db.rs
new file mode 100644
index 000000000..65b2b8629
--- /dev/null
+++ b/lighthouse/db/memory_db.rs
@@ -0,0 +1,174 @@
+use std::collections::{ HashSet, HashMap };
+use std::sync::RwLock;
+use super::blake2::blake2b::blake2b;
+use super::{
+    ClientDB,
+    DBValue,
+    DBError
+};
+
+type DBHashMap = HashMap<Vec<u8>, Vec<u8>>;
+type ColumnHashSet = HashSet<String>;
+
+pub struct MemoryDB {
+    db: RwLock<DBHashMap>,
+    known_columns: RwLock<ColumnHashSet>
+}
+
+impl MemoryDB {
+    pub fn open(columns: Option<&[&str]>) -> Self {
+        let mut db: DBHashMap = HashMap::new();
+        let mut known_columns: ColumnHashSet = HashSet::new();
+        if let Some(columns) = columns {
+            for col in columns {
+                known_columns.insert(col.to_string());
+            }
+        }
+        Self {
+            db: RwLock::new(db),
+            known_columns: RwLock::new(known_columns),
+        }
+    }
+
+    fn get_key_for_col(col: &str, key: &[u8]) -> Vec<u8> {
+        blake2b(32, col.as_bytes(), key).as_bytes().to_vec()
+    }
+}
+
+impl ClientDB for MemoryDB {
+    fn create_col(&mut self, col: &str)
+        -> Result<(), DBError>
+    {
+        Ok(())  // This field is not used. Will remove from trait.
+    }
+
+    fn get(&self, col: &str, key: &[u8])
+        -> Result<Option<DBValue>, DBError>
+    {
+        // Panic if the DB locks are poisoned.
+        let db = self.db.read().unwrap();
+        let known_columns = self.known_columns.read().unwrap();
+
+        match known_columns.contains(&col.to_string()) {
+            false => Err(DBError{ message: "Unknown column".to_string() }),
+            true => {
+                let column_key = MemoryDB::get_key_for_col(col, key);
+                Ok(db.get(&column_key).and_then(|val| Some(val.clone())))
+            }
+        }
+    }
+
+    fn put(&self, col: &str, key: &[u8], val: &[u8])
+        -> Result<(), DBError>
+    {
+        // Panic if the DB locks are poisoned.
+        let mut db = self.db.write().unwrap();
+        let known_columns = self.known_columns.read().unwrap();
+
+        match known_columns.contains(&col.to_string()) {
+            false => Err(DBError{ message: "Unknown column".to_string() }),
+            true => {
+                let column_key = MemoryDB::get_key_for_col(col, key);
+                db.insert(column_key, val.to_vec());
+                Ok(())
+            }
+        }
+    }
+}
+
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use super::super::ClientDB;
+    use std::thread;
+    use std::sync::Arc;
+
+    #[test]
+    fn test_memorydb_column_access() {
+        let col_a: &str = "ColumnA";
+        let col_b: &str = "ColumnB";
+
+        let column_families = vec![
+            col_a,
+            col_b,
+        ];
+
+        let db = MemoryDB::open(Some(&column_families));
+
+        /*
+         * Testing that if we write to the same key in different columns that
+         * there is not an overlap.
+         */
+        db.put(col_a, "same".as_bytes(), "cat".as_bytes()).unwrap();
+        db.put(col_b, "same".as_bytes(), "dog".as_bytes()).unwrap();
+
+        assert_eq!(db.get(col_a, "same".as_bytes()).unwrap().unwrap(), "cat".as_bytes());
+        assert_eq!(db.get(col_b, "same".as_bytes()).unwrap().unwrap(), "dog".as_bytes());
+
+
+    }
+
+    #[test]
+    fn test_memorydb_unknown_column_access() {
+        let col_a: &str = "ColumnA";
+        let col_x: &str = "ColumnX";
+
+        let column_families = vec![
+            col_a,
+            // col_x is excluded on purpose
+        ];
+
+        let db = MemoryDB::open(Some(&column_families));
+
+        /*
+         * Test that we get errors when using undeclared columns
+         */
+        assert!(db.put(col_a, "cats".as_bytes(), "lol".as_bytes()).is_ok());
+        assert!(db.put(col_x, "cats".as_bytes(), "lol".as_bytes()).is_err());
+
+        assert!(db.get(col_a, "cats".as_bytes()).is_ok());
+        assert!(db.get(col_x, "cats".as_bytes()).is_err());
+    }
+
+    #[test]
+    fn test_memorydb_threading() {
+        let col_name: &str = "TestColumn";
+        let column_families = vec![col_name];
+
+        let db = Arc::new(MemoryDB::open(Some(&column_families)));
+
+        let thread_count = 10;
+        let write_count = 10;
+
+        // We're expecting the product of these numbers to fit in one byte.
+        assert!(thread_count * write_count <= 255);
+
+        let mut handles = vec![];
+        for t in 0..thread_count {
+            let wc = write_count;
+            let db = db.clone();
+            let col = col_name.clone();
+            let handle = thread::spawn(move || {
+                for w in 0..wc {
+                    let key = (t * w) as u8;
+                    let val = 42;
+                    db.put(&col, &vec![key], &vec![val]).unwrap();
+                }
+            });
+            handles.push(handle);
+        }
+
+        for handle in handles {
+            handle.join().unwrap();
+        }
+
+        for t in 0..thread_count {
+            for w in 0..write_count {
+                let key = (t * w) as u8;
+                let val = db.get(&col_name, &vec![key]).unwrap().unwrap();
+                assert_eq!(vec![42], val);
+            }
+        }
+    }
+}

diff --git a/lighthouse/db/mod.rs b/lighthouse/db/mod.rs
index c85eaf18b..2919cdb28 100644
--- a/lighthouse/db/mod.rs
+++ b/lighthouse/db/mod.rs
@@ -1,9 +1,12 @@
 extern crate rocksdb;
+extern crate blake2_rfc as blake2;
 
 mod disk_db;
+mod memory_db;
 mod traits;
 
 pub use self::disk_db::DiskDB;
+pub use self::memory_db::MemoryDB;
 pub use self::traits::{
     DBError,
     DBValue,
     ClientDB,
 };
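`MemoryDB::get_key_for_col` flattens (column, key) pairs into a single keyspace by hashing them together, so the same user key under two different columns can never collide. The sketch below shows the idea with `std`'s `DefaultHasher` standing in for `blake2b`, purely to stay dependency-free; the real implementation keys a 32-byte blake2b over the column name and user key:

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// DefaultHasher stands in for blake2b here; the principle is the same:
// fold the column name and the user key into one flat key.
fn key_for_col(col: &str, key: &[u8]) -> u64 {
    let mut hasher = DefaultHasher::new();
    col.hash(&mut hasher);
    key.hash(&mut hasher);
    hasher.finish()
}

fn main() {
    // The same key under different columns maps to different flat keys.
    assert_ne!(key_for_col("ColumnA", b"same"), key_for_col("ColumnB", b"same"));
}
```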
From 5b177a80b9d5ab43ba9e2aac878165b1ccfc29ef Mon Sep 17 00:00:00 2001
From: Paul Hauner
Date: Fri, 21 Sep 2018 14:08:07 +1000
Subject: [PATCH 11/14] Add comments, fix warning in MemoryDB

---
 lighthouse/db/memory_db.rs | 13 ++++++++++++-
 1 file changed, 12 insertions(+), 1 deletion(-)

diff --git a/lighthouse/db/memory_db.rs b/lighthouse/db/memory_db.rs
index 65b2b8629..b7ca658bb 100644
--- a/lighthouse/db/memory_db.rs
+++ b/lighthouse/db/memory_db.rs
@@ -10,14 +10,22 @@ use super::{
 type DBHashMap = HashMap<Vec<u8>, Vec<u8>>;
 type ColumnHashSet = HashSet<String>;
 
+/// An in-memory database implementing the ClientDB trait.
+///
+/// It is not particularly optimized, it exists for ease and speed of testing. It's not expected
+/// this DB would be used outside of tests.
 pub struct MemoryDB {
     db: RwLock<DBHashMap>,
     known_columns: RwLock<ColumnHashSet>
 }
 
 impl MemoryDB {
+    /// Open the in-memory database.
+    ///
+    /// All columns must be supplied initially, you will get an error if you try to access a column
+    /// that was not declared here. This condition is enforced artificially to simulate RocksDB.
     pub fn open(columns: Option<&[&str]>) -> Self {
-        let mut db: DBHashMap = HashMap::new();
+        let db: DBHashMap = HashMap::new();
         let mut known_columns: ColumnHashSet = HashSet::new();
         if let Some(columns) = columns {
             for col in columns {
@@ -30,6 +38,7 @@ impl MemoryDB {
         }
     }
 
+    /// Hashes a key and a column name in order to get a unique key for the supplied column.
     fn get_key_for_col(col: &str, key: &[u8]) -> Vec<u8> {
         blake2b(32, col.as_bytes(), key).as_bytes().to_vec()
     }
@@ -42,6 +51,7 @@ impl ClientDB for MemoryDB {
         Ok(())  // This field is not used. Will remove from trait.
     }
 
+    /// Get the value of some key from the database. Returns `None` if the key does not exist.
     fn get(&self, col: &str, key: &[u8])
         -> Result<Option<DBValue>, DBError>
     {
@@ -58,6 +68,7 @@ impl ClientDB for MemoryDB {
         }
     }
 
+    /// Puts a key in the database.
     fn put(&self, col: &str, key: &[u8], val: &[u8])
         -> Result<(), DBError>
     {
From f80d5ff0bdd172ca99d2a2d17ce84333338d8b26 Mon Sep 17 00:00:00 2001
From: Paul Hauner
Date: Fri, 21 Sep 2018 14:12:53 +1000
Subject: [PATCH 12/14] Remove `create_col` from ClientDB trait

---
 lighthouse/db/disk_db.rs   | 17 +++++++++--------
 lighthouse/db/memory_db.rs |  6 ------
 lighthouse/db/traits.rs    |  3 ---
 3 files changed, 9 insertions(+), 17 deletions(-)

diff --git a/lighthouse/db/disk_db.rs b/lighthouse/db/disk_db.rs
index e4ebdedec..f8f8a7a4c 100644
--- a/lighthouse/db/disk_db.rs
+++ b/lighthouse/db/disk_db.rs
@@ -55,15 +55,7 @@ impl DiskDB {
             db,
         }
     }
-}
 
-impl From<RocksError> for DBError {
-    fn from(e: RocksError) -> Self {
-        Self { message: e.to_string() }
-    }
-}
-
-impl ClientDB for DiskDB {
     /// Create a RocksDB column family. Corresponds to the
     /// `create_cf()` function on the RocksDB API.
     fn create_col(&mut self, col: &str)
@@ -75,6 +67,15 @@ impl ClientDB for DiskDB {
         }
     }
 
+}
+
+impl From<RocksError> for DBError {
+    fn from(e: RocksError) -> Self {
+        Self { message: e.to_string() }
+    }
+}
+
+impl ClientDB for DiskDB {
     /// Get the value for some key on some column.
     ///
     /// Corresponds to the `get_cf()` method on the RocksDB API.

diff --git a/lighthouse/db/memory_db.rs b/lighthouse/db/memory_db.rs
index b7ca658bb..c875b5554 100644
--- a/lighthouse/db/memory_db.rs
+++ b/lighthouse/db/memory_db.rs
@@ -45,12 +45,6 @@ impl MemoryDB {
 }
 
 impl ClientDB for MemoryDB {
-    fn create_col(&mut self, col: &str)
-        -> Result<(), DBError>
-    {
-        Ok(())  // This field is not used. Will remove from trait.
-    }
-
     /// Get the value of some key from the database. Returns `None` if the key does not exist.
     fn get(&self, col: &str, key: &[u8])
         -> Result<Option<DBValue>, DBError>

diff --git a/lighthouse/db/traits.rs b/lighthouse/db/traits.rs
index 97759d3b7..79766329a 100644
--- a/lighthouse/db/traits.rs
+++ b/lighthouse/db/traits.rs
@@ -18,9 +18,6 @@ impl DBError {
 /// program to use a persistent on-disk database during production,
 /// but use a transient database during tests.
 pub trait ClientDB: Sync + Send {
-    fn create_col(&mut self, col: &str)
-        -> Result<(), DBError>;
-
     fn get(&self, col: &str, key: &[u8])
         -> Result<Option<DBValue>, DBError>;

From 091379f01161213b29b8cdefb49a52ec20e02190 Mon Sep 17 00:00:00 2001
From: Paul Hauner
Date: Sat, 22 Sep 2018 07:45:40 +1000
Subject: [PATCH 13/14] Replace `env::dir` with `dirs` crate

---
 Cargo.toml               | 1 +
 lighthouse/config/mod.rs | 8 +++++---
 2 files changed, 6 insertions(+), 3 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index ae9194004..bcd5fceb0 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -12,6 +12,7 @@ boolean-bitfield = { path = "boolean-bitfield" }
 bytes = ""
 crypto-mac = "^0.6.2"
 clap = "2.32.0"
+dirs = "1.0.3"
 ethereum-types = ""
 futures = "0.1.23"
 network-libp2p = { path = "network-libp2p" }

diff --git a/lighthouse/config/mod.rs b/lighthouse/config/mod.rs
index 80350293d..42dd919e6 100644
--- a/lighthouse/config/mod.rs
+++ b/lighthouse/config/mod.rs
@@ -1,5 +1,7 @@
-use std::{ env, fs };
-use std::path::PathBuf;
+extern crate dirs;
+
+use std::fs;
+use std::path::PathBuf;
 
 /// Stores the core configuration for this Lighthouse instance.
 /// This struct is general, other components may implement more
@@ -16,7 +18,7 @@ impl LighthouseConfig {
     /// Build a new lighthouse configuration from defaults.
     pub fn default() -> Self{
         let data_dir = {
-            let home = env::home_dir()
+            let home = dirs::home_dir()
                 .expect("Unable to determine home dir.");
             home.join(DEFAULT_LIGHTHOUSE_DIR)
         };
From 616cc616db1509519908e2687ad9eeed6b555cb6 Mon Sep 17 00:00:00 2001
From: Paul Hauner
Date: Sat, 22 Sep 2018 08:17:31 +1000
Subject: [PATCH 14/14] Fix some clippy lints

---
 lighthouse/client/mod.rs                      | 10 ++--
 lighthouse/config/mod.rs                      |  2 +-
 lighthouse/db/disk_db.rs                      |  2 +-
 lighthouse/db/memory_db.rs                    | 24 ++++-----
 lighthouse/main.rs                            |  2 +-
 .../transition/attestation_parent_hashes.rs   | 50 +++++++++----------
 lighthouse/state/transition/shuffling/rng.rs  | 27 +++++-----
 lighthouse/sync/block.rs                      | 24 ---------
 lighthouse/sync/mod.rs                        |  1 -
 lighthouse/sync/network.rs                    | 37 ++++++++------
 lighthouse/sync/sync_future.rs                | 10 ++--
 lighthouse/sync/wire_protocol.rs              |  2 +-
 lighthouse/utils/logging.rs                   |  5 +-
 13 files changed, 85 insertions(+), 111 deletions(-)
 delete mode 100644 lighthouse/sync/block.rs

diff --git a/lighthouse/client/mod.rs b/lighthouse/client/mod.rs
index ee73c2d29..8c65da1a5 100644
--- a/lighthouse/client/mod.rs
+++ b/lighthouse/client/mod.rs
@@ -25,8 +25,8 @@ impl Client {
     ///
     /// Presently, this means starting network and sync threads
     /// and plumbing them together.
-    pub fn new(config: LighthouseConfig,
-               log: Logger)
+    pub fn new(config: &LighthouseConfig,
+               log: &Logger)
         -> Self
     {
         // Open the local db
@@ -65,8 +65,8 @@ impl Client {
                 sync_db,
                 network_tx.clone(),
                 network_rx,
-                sync_out_sender,
-                sync_in_receiver,
+                &sync_out_sender,
+                &sync_in_receiver,
                 sync_log,
             );
         });
@@ -75,7 +75,7 @@ impl Client {
 
         // Return the client struct
         Self {
-            db: db,
+            db,
             network_thread,
             sync_thread,
         }

diff --git a/lighthouse/config/mod.rs b/lighthouse/config/mod.rs
index 42dd919e6..725cb455e 100644
--- a/lighthouse/config/mod.rs
+++ b/lighthouse/config/mod.rs
@@ -23,7 +23,7 @@ impl LighthouseConfig {
             home.join(DEFAULT_LIGHTHOUSE_DIR)
         };
         fs::create_dir_all(&data_dir)
-            .expect(&format!("Unable to create {:?}", &data_dir));
+            .unwrap_or_else(|_| panic!("Unable to create {:?}", &data_dir));
         let p2p_listen_port = 0;
         Self {
             data_dir,

diff --git a/lighthouse/db/disk_db.rs b/lighthouse/db/disk_db.rs
index f8f8a7a4c..86face602 100644
--- a/lighthouse/db/disk_db.rs
+++ b/lighthouse/db/disk_db.rs
@@ -40,7 +40,7 @@ impl DiskDB {
          * Initialise the path
          */
         fs::create_dir_all(&path)
-            .expect(&format!("Unable to create {:?}", &path));
+            .unwrap_or_else(|_| panic!("Unable to create {:?}", &path));
         let db_path = path.join("database");
 
         /*

diff --git a/lighthouse/db/memory_db.rs b/lighthouse/db/memory_db.rs
index c875b5554..29c2091de 100644
--- a/lighthouse/db/memory_db.rs
+++ b/lighthouse/db/memory_db.rs
@@ -53,12 +53,11 @@ impl ClientDB for MemoryDB {
         let db = self.db.read().unwrap();
         let known_columns = self.known_columns.read().unwrap();
 
-        match known_columns.contains(&col.to_string()) {
-            false => Err(DBError{ message: "Unknown column".to_string() }),
-            true => {
-                let column_key = MemoryDB::get_key_for_col(col, key);
-                Ok(db.get(&column_key).and_then(|val| Some(val.clone())))
-            }
+        if known_columns.contains(&col.to_string()) {
+            let column_key = MemoryDB::get_key_for_col(col, key);
+            Ok(db.get(&column_key).and_then(|val| Some(val.clone())))
+        } else {
+            Err(DBError{ message: "Unknown column".to_string() })
         }
     }
 
@@ -70,13 +69,12 @@ impl ClientDB for MemoryDB {
         let mut db = self.db.write().unwrap();
         let known_columns = self.known_columns.read().unwrap();
 
-        match known_columns.contains(&col.to_string()) {
-            false => Err(DBError{ message: "Unknown column".to_string() }),
-            true => {
-                let column_key = MemoryDB::get_key_for_col(col, key);
-                db.insert(column_key, val.to_vec());
-                Ok(())
-            }
+        if known_columns.contains(&col.to_string()) {
+            let column_key = MemoryDB::get_key_for_col(col, key);
+            db.insert(column_key, val.to_vec());
+            Ok(())
+        } else {
+            Err(DBError{ message: "Unknown column".to_string() })
         }
     }
 }

diff --git a/lighthouse/main.rs b/lighthouse/main.rs
index 0a9f35288..367b71920 100644
--- a/lighthouse/main.rs
+++ b/lighthouse/main.rs
@@ -64,7 +64,7 @@ fn main() {
         "data_dir" => &config.data_dir.to_str(),
         "port" => &config.p2p_listen_port);
 
-    let client = Client::new(config, log.new(o!()));
+    let client = Client::new(&config, &log);
 
     client.sync_thread.join().unwrap();
     info!(log, "Exiting.");

diff --git a/lighthouse/state/transition/attestation_parent_hashes.rs b/lighthouse/state/transition/attestation_parent_hashes.rs
index 624d8c443..35866159f 100644
--- a/lighthouse/state/transition/attestation_parent_hashes.rs
+++ b/lighthouse/state/transition/attestation_parent_hashes.rs
@@ -13,16 +13,16 @@ use super::TransitionError;
 /// See this slide for more information:
 /// https://tinyurl.com/ybzn2spw
 pub fn attestation_parent_hashes(
-    cycle_length: &u8,
-    block_slot: &u64,
-    attestation_slot: &u64,
-    current_hashes: &Vec<Hash256>,
-    oblique_hashes: &Vec<Hash256>)
+    cycle_length: u8,
+    block_slot: u64,
+    attestation_slot: u64,
+    current_hashes: &[Hash256],
+    oblique_hashes: &[Hash256])
     -> Result<Vec<Hash256>, TransitionError>
 {
     // This cast places a limit on cycle_length. If you change it, check math
     // for overflow.
-    let cycle_length: u64 = *cycle_length as u64;
+    let cycle_length: u64 = u64::from(cycle_length);
 
     if current_hashes.len() as u64 != (cycle_length * 2) {
         return Err(TransitionError::InvalidInput(String::from(
@@ -69,7 +69,7 @@ pub fn attestation_parent_hashes(
     let mut hashes = Vec::new();
     hashes.extend_from_slice(
         &current_hashes[(start as usize)..(end as usize)]);
-    hashes.append(&mut oblique_hashes.clone());
+    hashes.extend_from_slice(oblique_hashes);
 
     Ok(hashes)
 }
@@ -98,9 +98,9 @@ mod tests {
         let current_hashes = get_range_of_hashes(3, 19);
         let oblique_hashes = get_range_of_hashes(100, 102);
         let result = attestation_parent_hashes(
-            &cycle_length,
-            &block_slot,
-            &attestation_slot,
+            cycle_length,
+            block_slot,
+            attestation_slot,
             &current_hashes,
             &oblique_hashes);
         assert!(result.is_ok());
@@ -123,9 +123,9 @@ mod tests {
         let current_hashes = get_range_of_hashes(3, 19);
         let oblique_hashes = get_range_of_hashes(100, 108);
         let result = attestation_parent_hashes(
-            &cycle_length,
-            &block_slot,
-            &attestation_slot,
+            cycle_length,
+            block_slot,
+            attestation_slot,
             &current_hashes,
             &oblique_hashes);
         assert!(result.is_ok());
@@ -148,9 +148,9 @@ mod tests {
         let current_hashes = get_range_of_hashes(3, 19);
         let oblique_hashes = vec![];
         let result = attestation_parent_hashes(
-            &cycle_length,
-            &block_slot,
-            &attestation_slot,
+            cycle_length,
+            block_slot,
+            attestation_slot,
             &current_hashes,
             &oblique_hashes);
         assert!(result.is_ok());
@@ -171,9 +171,9 @@ mod tests {
         let current_hashes = get_range_of_hashes(0, 16);
         let oblique_hashes = vec![];
         let result = attestation_parent_hashes(
-            &cycle_length,
-            &block_slot,
-            &attestation_slot,
+            cycle_length,
+            block_slot,
+            attestation_slot,
             &current_hashes,
             &oblique_hashes);
         assert!(result.is_ok());
@@ -194,9 +194,9 @@ mod tests {
         let current_hashes = get_range_of_hashes(0, 16);
         let oblique_hashes = vec![];
         let result = attestation_parent_hashes(
-            &cycle_length,
-            &block_slot,
-            &attestation_slot,
+            cycle_length,
+            block_slot,
+            attestation_slot,
             &current_hashes,
             &oblique_hashes);
         assert!(result.is_err());
@@ -213,9 +213,9 @@ mod tests {
         let current_hashes = get_range_of_hashes(0, 15);
         let oblique_hashes = vec![];
         let result = attestation_parent_hashes(
-            &cycle_length,
-            &block_slot,
-            &attestation_slot,
+            cycle_length,
+            block_slot,
+            attestation_slot,
             &current_hashes,
             &oblique_hashes);
         assert!(result.is_err());

diff --git a/lighthouse/state/transition/shuffling/rng.rs b/lighthouse/state/transition/shuffling/rng.rs
index 224e9130f..e43c582ff 100644
--- a/lighthouse/state/transition/shuffling/rng.rs
+++ b/lighthouse/state/transition/shuffling/rng.rs
@@ -2,7 +2,7 @@ use super::blake2_rfc::blake2s::{ Blake2s, Blake2sResult };
 
 const SEED_SIZE_BYTES: usize = 32;
 const RAND_BYTES: usize = 3;      // 24 / 8
-const RAND_MAX: u32 = 16777216;   // 2**24
+const RAND_MAX: u32 = 16_777_216;   // 2**24
 
 /// A pseudo-random number generator which given a seed
 /// uses successive blake2s hashing to generate "entropy".
@@ -31,17 +31,14 @@ impl ShuffleRng {
     /// Extracts 3 bytes from the `seed`. Rehashes seed if required.
     fn rand(&mut self) -> u32 {
         self.idx += RAND_BYTES;
-        match self.idx >= SEED_SIZE_BYTES {
-            true => {
-                self.rehash_seed();
-                self.rand()
-            }
-            false => {
-                int_from_byte_slice(
-                    self.seed.as_bytes(),
-                    self.idx - RAND_BYTES,
-                )
-            }
+        if self.idx >= SEED_SIZE_BYTES {
+            self.rehash_seed();
+            self.rand()
+        } else {
+            int_from_byte_slice(
+                self.seed.as_bytes(),
+                self.idx - RAND_BYTES,
+            )
         }
     }
 
@@ -65,9 +62,9 @@ impl ShuffleRng {
 /// Returns that integer.
 fn int_from_byte_slice(source: &[u8], offset: usize) -> u32 {
     (
-        source[offset + 2] as u32) |
-        ((source[offset + 1] as u32) << 8) |
-        ((source[offset    ] as u32) << 16
+        u32::from(source[offset + 2])) |
+        (u32::from(source[offset + 1]) << 8) |
+        (u32::from(source[offset    ]) << 16
     )
 }

diff --git a/lighthouse/sync/block.rs b/lighthouse/sync/block.rs
deleted file mode 100644
index 72d827285..000000000
--- a/lighthouse/sync/block.rs
+++ /dev/null
@@ -1,24 +0,0 @@
-use std::sync::Arc;
-use super::db::ClientDB;
-use slog::Logger;
-
-pub enum BlockStatus {
-    Valid,
-    AlreadyKnown,
-    TooOld,
-    TimeInvalid,
-    UnknownPoWHash,
-    NoAttestations,
-    InvalidAttestation,
-    NotProposerSigned,
-}
-
-pub fn process_unverified_blocks(
-    _serialized_block: &[u8],
-    _db: Arc<ClientDB>,
-    _log: Logger)
-{
-    //
-}
-
-

diff --git a/lighthouse/sync/mod.rs b/lighthouse/sync/mod.rs
index f56260e4f..6e1f0be11 100644
--- a/lighthouse/sync/mod.rs
+++ b/lighthouse/sync/mod.rs
@@ -3,7 +3,6 @@ extern crate slog;
 extern crate tokio;
 extern crate network_libp2p;
 
-pub mod block;
 pub mod network;
 pub mod sync_future;
 pub mod wire_protocol;
diff --git a/lighthouse/sync/network.rs b/lighthouse/sync/network.rs
index 1383b5040..1204a2093 100644
--- a/lighthouse/sync/network.rs
+++ b/lighthouse/sync/network.rs
@@ -8,8 +8,6 @@ use super::network_libp2p::message::{
     NetworkEventType,
 };
 
-use super::block::process_unverified_blocks;
-
 use super::wire_protocol::{
     WireMessage,
     WireMessageHeader,
@@ -25,9 +23,9 @@ use super::futures::sync::mpsc::{
 /// (e.g., libp2p) has an event to push up to the sync process.
 pub fn handle_network_event(
     event: NetworkEvent,
-    db: Arc<ClientDB>,
-    network_tx: UnboundedSender<Vec<u8>>,
-    log: Logger)
+    db: &Arc<ClientDB>,
+    network_tx: &UnboundedSender<Vec<u8>>,
+    log: &Logger)
     -> Result<(), ()>
 {
     debug!(&log, "";
@@ -38,10 +36,10 @@ pub fn handle_network_event(
         NetworkEventType::Message => {
             if let Some(data) = event.data {
                 handle_network_message(
-                    data,
-                    db,
-                    network_tx,
-                    log)
+                    &data,
+                    &db,
+                    &network_tx,
+                    &log)
             } else {
                 Ok(())
             }
@@ -55,10 +53,10 @@ pub fn handle_network_event(
 /// This function should be called whenever a peer from a network
 /// (e.g., libp2p) has sent a message to us.
 fn handle_network_message(
-    message: Vec<u8>,
-    db: Arc<ClientDB>,
-    _network_tx: UnboundedSender<Vec<u8>>,
-    log: Logger)
+    message: &[u8],
+    db: &Arc<ClientDB>,
+    _network_tx: &UnboundedSender<Vec<u8>>,
+    log: &Logger)
     -> Result<(), ()>
 {
     match WireMessage::decode(&message) {
@@ -67,8 +65,8 @@ fn handle_network_message(
                 WireMessageHeader::Blocks => {
                     process_unverified_blocks(
                         msg.body,
-                        db,
-                        log
+                        &db,
+                        &log
                     );
                     Ok(())
                 }
@@ -76,7 +74,14 @@ fn handle_network_message(
             }
         }
         Err(_) => {
-            return Ok(()) // No need to pass the error back
+            Ok(()) // No need to pass the error back
         }
     }
 }
+
+fn process_unverified_blocks(_message: &[u8],
+                             _db: &Arc<ClientDB>,
+                             _log: &Logger)
+{
+    //
+}

diff --git a/lighthouse/sync/sync_future.rs b/lighthouse/sync/sync_future.rs
index 0e46e9e50..31cc933ca 100644
--- a/lighthouse/sync/sync_future.rs
+++ b/lighthouse/sync/sync_future.rs
@@ -28,8 +28,8 @@ pub fn run_sync_future(
     db: Arc<ClientDB>,
     network_tx: NetworkSender,
     network_rx: NetworkReceiver,
-    _sync_tx: SyncSender,
-    _sync_rx: SyncReceiver,
+    _sync_tx: &SyncSender,
+    _sync_rx: &SyncReceiver,
     log: Logger)
 {
     let network_future = {
@@ -37,9 +37,9 @@ pub fn run_sync_future(
             .for_each(move |event| {
                 handle_network_event(
                     event,
-                    db.clone(),
-                    network_tx.clone(),
-                    log.clone())
+                    &db.clone(),
+                    &network_tx.clone(),
+                    &log.clone())
             })
             .map_err(|_| panic!("rx failed"))
     };

diff --git a/lighthouse/sync/wire_protocol.rs b/lighthouse/sync/wire_protocol.rs
index 999d14a73..8f9c8bd57 100644
--- a/lighthouse/sync/wire_protocol.rs
+++ b/lighthouse/sync/wire_protocol.rs
@@ -19,7 +19,7 @@ pub struct WireMessage<'a> {
 }
 
 impl<'a> WireMessage<'a> {
-    pub fn decode(bytes: &'a Vec<u8>)
+    pub fn decode(bytes: &'a [u8])
         -> Result<Self, WireMessageDecodeError>
     {
         if let Some((header_byte, body)) = bytes.split_first() {

diff --git a/lighthouse/utils/logging.rs b/lighthouse/utils/logging.rs
index 67b27bf2c..7dc186d30 100644
--- a/lighthouse/utils/logging.rs
+++ b/lighthouse/utils/logging.rs
@@ -7,11 +7,10 @@ pub use slog::Logger;
 
 pub fn test_logger() -> slog::Logger {
     let plain = slog_term::PlainSyncDecorator::new(slog_term::TestStdoutWriter);
-    let logger = Logger::root(
+    Logger::root(
         slog_term::FullFormat::new(plain)
             .build().fuse(), o!()
-    );
-    logger
+    )
 }
 
 pub fn get_logger() -> slog::Logger {
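A quick standalone check of the 24-bit read performed by `int_from_byte_slice` is worth having, since the parenthesisation in the committed version is easy to misread. Restated with conventional parentheses: three consecutive bytes are combined big-endian into a `u32`, so results always stay below `RAND_MAX = 2**24`:

```rust
// Same logic as the patched int_from_byte_slice, reformatted for clarity.
fn int_from_byte_slice(source: &[u8], offset: usize) -> u32 {
    u32::from(source[offset + 2])
        | (u32::from(source[offset + 1]) << 8)
        | (u32::from(source[offset]) << 16)
}

fn main() {
    let bytes = [0x01, 0x02, 0x03, 0xff];
    assert_eq!(int_from_byte_slice(&bytes, 0), 0x0001_0203);
    assert_eq!(int_from_byte_slice(&bytes, 1), 0x0002_03ff);
    // All outputs fit in 24 bits, matching RAND_MAX = 16_777_216.
    assert!(int_from_byte_slice(&bytes, 0) < 16_777_216);
}
```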