diff --git a/README.md b/README.md
index 7ee0638c4..a3e82b162 100644
--- a/README.md
+++ b/README.md
@@ -1,6 +1,6 @@
 # Lighthouse: a (future) Ethereum 2.0 client
 
-[![Build Status](https://travis-ci.org/sigp/lighthouse.svg?branch=master)](https://travis-ci.org/sigp/lighthouse)
+[![Build Status](https://travis-ci.org/sigp/lighthouse.svg?branch=master)](https://travis-ci.org/sigp/lighthouse) [![Gitter](https://badges.gitter.im/Join%20Chat.svg)](https://gitter.im/sigp/lighthouse?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge)
 
 A **work-in-progress** implementation of the Ethereum 2.0 Beacon Chain in Rust.
 
diff --git a/lighthouse/client/mod.rs b/lighthouse/client/mod.rs
index f87f05738..8c65da1a5 100644
--- a/lighthouse/client/mod.rs
+++ b/lighthouse/client/mod.rs
@@ -1,6 +1,6 @@
-use std::sync::{ Arc, RwLock };
+use std::sync::Arc;
 use std::thread;
-use super::db::{ DB, open_db };
+use super::db::{ DiskDB };
 use super::config::LighthouseConfig;
 use super::futures::sync::mpsc::{
     unbounded,
@@ -10,10 +10,12 @@ use super::network_libp2p::state::NetworkState;
 use super::slog::Logger;
 use super::sync::run_sync_future;
 
+use super::db::ClientDB;
+
 /// Represents the co-ordination of the
 /// networking, syncing and RPC (not-yet-implemented) threads.
 pub struct Client {
-    pub db: Arc<RwLock<DB>>,
+    pub db: Arc<DiskDB>,
     pub network_thread: thread::JoinHandle<()>,
     pub sync_thread: thread::JoinHandle<()>,
 }
@@ -23,14 +25,14 @@ impl Client {
     ///
     /// Presently, this means starting network and sync threads
     /// and plumbing them together.
-    pub fn new(config: LighthouseConfig,
-               log: Logger)
+    pub fn new(config: &LighthouseConfig,
+               log: &Logger)
         -> Self
     {
         // Open the local db
         let db = {
-            let db = open_db(&config.data_dir);
-            Arc::new(RwLock::new(db))
+            let db = DiskDB::open(&config.data_dir, None);
+            Arc::new(db)
         };
 
         // Start the network thread
@@ -57,14 +59,14 @@ impl Client {
         let (sync_out_sender, sync_out_receiver) = unbounded();
         let (sync_in_sender, sync_in_receiver) = unbounded();
         let sync_log = log.new(o!());
-        let sync_db = Arc::clone(&db);
+        let sync_db = db.clone();
         let thread = thread::spawn(move || {
             run_sync_future(
                 sync_db,
                 network_tx.clone(),
                 network_rx,
-                sync_out_sender,
-                sync_in_receiver,
+                &sync_out_sender,
+                &sync_in_receiver,
                 sync_log,
             );
         });
@@ -73,7 +75,7 @@ impl Client {
 
         // Return the client struct
         Self {
-            db: db,
+            db,
            network_thread,
            sync_thread,
        }
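The `Arc<RwLock<DB>>` handle becomes a plain `Arc<DiskDB>` here because the new `ClientDB` trait (introduced further down in this patch) is bound by `Sync + Send` and every backend synchronises internally: RocksDB takes its own locks, and `MemoryDB` wraps its map in an interior `RwLock`. A minimal sketch of the resulting sharing pattern, with a hypothetical `KeyValueStore` trait standing in for `ClientDB`:

```rust
use std::sync::Arc;
use std::thread;

// Hypothetical stand-in for the ClientDB trait added by this patch.
trait KeyValueStore: Sync + Send {
    fn put(&self, key: &[u8], val: &[u8]);
}

// The handle can cross thread boundaries without an outer RwLock,
// because the trait promises internal synchronisation.
fn spawn_writer(db: Arc<dyn KeyValueStore>) -> thread::JoinHandle<()> {
    thread::spawn(move || db.put(b"key", b"val"))
}
```

Dropping the outer lock also means readers no longer serialise behind writers at the `Client` level; contention is left to the backend.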
diff --git a/lighthouse/config/mod.rs b/lighthouse/config/mod.rs
index 80350293d..725cb455e 100644
--- a/lighthouse/config/mod.rs
+++ b/lighthouse/config/mod.rs
@@ -1,5 +1,7 @@
-use std::{ env, fs };
-use std::path::PathBuf;
+extern crate dirs;
+
+use std::fs;
+use std::path::PathBuf;
 
 /// Stores the core configuration for this Lighthouse instance.
 /// This struct is general, other components may implement more
@@ -16,12 +18,12 @@ impl LighthouseConfig {
     /// Build a new lighthouse configuration from defaults.
     pub fn default() -> Self{
         let data_dir = {
-            let home = env::home_dir()
+            let home = dirs::home_dir()
                 .expect("Unable to determine home dir.");
             home.join(DEFAULT_LIGHTHOUSE_DIR)
         };
         fs::create_dir_all(&data_dir)
-            .expect(&format!("Unable to create {:?}", &data_dir));
+            .unwrap_or_else(|_| panic!("Unable to create {:?}", &data_dir));
         let p2p_listen_port = 0;
         Self {
             data_dir,
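`std::env::home_dir` was deprecated in Rust 1.29 for its surprising behaviour on Windows, which is why the config now leans on the `dirs` crate; `unwrap_or_else` additionally moves the `format!` allocation onto the error path only (clippy's `expect_fun_call` lint). A sketch of the combined pattern, assuming a `dirs = "1"` dependency and using `.lighthouse` as a stand-in for `DEFAULT_LIGHTHOUSE_DIR`:

```rust
extern crate dirs; // assumes dirs = "1" in Cargo.toml

use std::fs;
use std::path::PathBuf;

fn default_data_dir() -> PathBuf {
    // dirs::home_dir() replaces the deprecated std::env::home_dir().
    let home = dirs::home_dir().expect("Unable to determine home dir.");
    let data_dir = home.join(".lighthouse"); // stand-in for DEFAULT_LIGHTHOUSE_DIR
    // unwrap_or_else only builds the panic message on failure, whereas
    // .expect(&format!(...)) formats the string even on success.
    fs::create_dir_all(&data_dir)
        .unwrap_or_else(|_| panic!("Unable to create {:?}", &data_dir));
    data_dir
}
```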
diff --git a/lighthouse/db/disk_db.rs b/lighthouse/db/disk_db.rs
new file mode 100644
index 000000000..86face602
--- /dev/null
+++ b/lighthouse/db/disk_db.rs
@@ -0,0 +1,174 @@
+extern crate rocksdb;
+
+use std::fs;
+use std::path::Path;
+use super::rocksdb::{
+    DB,
+    Options,
+};
+use super::rocksdb::Error as RocksError;
+use super::{
+    ClientDB,
+    DBValue,
+    DBError
+};
+
+/// An on-disk database which implements the ClientDB trait.
+///
+/// This implementation uses RocksDB with default options.
+pub struct DiskDB {
+    db: DB,
+}
+
+impl DiskDB {
+    /// Open the RocksDB database, optionally supplying columns if required.
+    ///
+    /// The RocksDB database will be contained in a directory titled
+    /// "database" in the supplied path.
+    ///
+    /// # Panics
+    ///
+    /// Panics if the database is unable to be created.
+    pub fn open(path: &Path, columns: Option<&[&str]>) -> Self {
+        /*
+         * Initialise the options
+         */
+        let mut options = Options::default();
+        options.create_if_missing(true);
+
+        /*
+         * Initialise the path
+         */
+        fs::create_dir_all(&path)
+            .unwrap_or_else(|_| panic!("Unable to create {:?}", &path));
+        let db_path = path.join("database");
+
+        /*
+         * Open the database
+         */
+        let db = match columns {
+            None => DB::open(&options, db_path),
+            Some(columns) => DB::open_cf(&options, db_path, columns)
+        }.expect("Unable to open local database");
+
+        Self {
+            db,
+        }
+    }
+
+    /// Create a RocksDB column family. Corresponds to the
+    /// `create_cf()` function on the RocksDB API.
+    fn create_col(&mut self, col: &str)
+        -> Result<(), DBError>
+    {
+        match self.db.create_cf(col, &Options::default()) {
+            Err(e) => Err(e.into()),
+            Ok(_) => Ok(())
+        }
+    }
+
+}
+
+impl From<RocksError> for DBError {
+    fn from(e: RocksError) -> Self {
+        Self { message: e.to_string() }
+    }
+}
+
+impl ClientDB for DiskDB {
+    /// Get the value for some key on some column.
+    ///
+    /// Corresponds to the `get_cf()` method on the RocksDB API.
+    /// Will attempt to get the `ColumnFamily` and return an Err
+    /// if it fails.
+    fn get(&self, col: &str, key: &[u8])
+        -> Result<Option<DBValue>, DBError>
+    {
+        match self.db.cf_handle(col) {
+            None => Err(DBError{ message: "Unknown column".to_string() }),
+            Some(handle) => {
+                match self.db.get_cf(handle, key)? {
+                    None => Ok(None),
+                    Some(db_vec) => Ok(Some(DBValue::from(&*db_vec)))
+                }
+            }
+        }
+    }
+
+    /// Set some value for some key on some column.
+    ///
+    /// Corresponds to the `put_cf()` method on the RocksDB API.
+    /// Will attempt to get the `ColumnFamily` and return an Err
+    /// if it fails.
+    fn put(&self, col: &str, key: &[u8], val: &[u8])
+        -> Result<(), DBError>
+    {
+        match self.db.cf_handle(col) {
+            None => Err(DBError{ message: "Unknown column".to_string() }),
+            Some(handle) => self.db.put_cf(handle, key, val).map_err(|e| e.into())
+        }
+    }
+}
+
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use super::super::ClientDB;
+    use std::{ env, fs, thread };
+    use std::sync::Arc;
+
+    #[test]
+    #[ignore]
+    fn test_rocksdb_can_use_db() {
+        let pwd = env::current_dir().unwrap();
+        let path = pwd.join("testdb_please_remove");
+        let _ = fs::remove_dir_all(&path);
+        fs::create_dir_all(&path).unwrap();
+
+        let col_name: &str = "TestColumn";
+        let column_families = vec![col_name];
+
+        let mut db = DiskDB::open(&path, None);
+
+        for cf in column_families {
+            db.create_col(&cf).unwrap();
+        }
+
+        let db = Arc::new(db);
+
+        let thread_count = 10;
+        let write_count = 10;
+
+        // We're expecting the product of these numbers to fit in one byte.
+        assert!(thread_count * write_count <= 255);
+
+        let mut handles = vec![];
+        for t in 0..thread_count {
+            let wc = write_count;
+            let db = db.clone();
+            let col = col_name.clone();
+            let handle = thread::spawn(move || {
+                for w in 0..wc {
+                    let key = (t * w) as u8;
+                    let val = 42;
+                    db.put(&col, &vec![key], &vec![val]).unwrap();
+                }
+            });
+            handles.push(handle);
+        }
+
+        for handle in handles {
+            handle.join().unwrap();
+        }
+
+        for t in 0..thread_count {
+            for w in 0..write_count {
+                let key = (t * w) as u8;
+                let val = db.get(&col_name, &vec![key]).unwrap().unwrap();
+                assert_eq!(vec![42], val);
+            }
+        }
+        fs::remove_dir_all(&path).unwrap();
+    }
+}
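Because both new backends implement `ClientDB`, code downstream of the database layer can stay backend-agnostic: production wires in `DiskDB`, tests wire in the `MemoryDB` below. A small illustrative sketch; the `store_block` helper and the `"blocks"` column name are hypothetical, not part of this patch:

```rust
// Assumes this lives inside the db module, next to the trait definitions.
use super::{ ClientDB, DBError };

// Generic over any ClientDB implementor (DiskDB in production,
// MemoryDB in tests). "blocks" is an illustrative column name.
fn store_block<T: ClientDB>(db: &T, hash: &[u8], ssz: &[u8]) -> Result<(), DBError> {
    db.put("blocks", hash, ssz)
}
```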
diff --git a/lighthouse/db/memory_db.rs b/lighthouse/db/memory_db.rs
new file mode 100644
index 000000000..29c2091de
--- /dev/null
+++ b/lighthouse/db/memory_db.rs
@@ -0,0 +1,177 @@
+use std::collections::{ HashSet, HashMap };
+use std::sync::RwLock;
+use super::blake2::blake2b::blake2b;
+use super::{
+    ClientDB,
+    DBValue,
+    DBError
+};
+
+type DBHashMap = HashMap<Vec<u8>, Vec<u8>>;
+type ColumnHashSet = HashSet<String>;
+
+/// An in-memory database implementing the ClientDB trait.
+///
+/// It is not particularly optimized, it exists for ease and speed of testing. It's not expected
+/// this DB would be used outside of tests.
+pub struct MemoryDB {
+    db: RwLock<DBHashMap>,
+    known_columns: RwLock<ColumnHashSet>
+}
+
+impl MemoryDB {
+    /// Open the in-memory database.
+    ///
+    /// All columns must be supplied initially, you will get an error if you try to access a column
+    /// that was not declared here. This condition is enforced artificially to simulate RocksDB.
+    pub fn open(columns: Option<&[&str]>) -> Self {
+        let db: DBHashMap = HashMap::new();
+        let mut known_columns: ColumnHashSet = HashSet::new();
+        if let Some(columns) = columns {
+            for col in columns {
+                known_columns.insert(col.to_string());
+            }
+        }
+        Self {
+            db: RwLock::new(db),
+            known_columns: RwLock::new(known_columns),
+        }
+    }
+
+    /// Hashes a key and a column name in order to get a unique key for the supplied column.
+    fn get_key_for_col(col: &str, key: &[u8]) -> Vec<u8> {
+        blake2b(32, col.as_bytes(), key).as_bytes().to_vec()
+    }
+}
+
+impl ClientDB for MemoryDB {
+    /// Get the value of some key from the database. Returns `None` if the key does not exist.
+    fn get(&self, col: &str, key: &[u8])
+        -> Result<Option<DBValue>, DBError>
+    {
+        // Panic if the DB locks are poisoned.
+        let db = self.db.read().unwrap();
+        let known_columns = self.known_columns.read().unwrap();
+
+        if known_columns.contains(&col.to_string()) {
+            let column_key = MemoryDB::get_key_for_col(col, key);
+            Ok(db.get(&column_key).and_then(|val| Some(val.clone())))
+        } else {
+            Err(DBError{ message: "Unknown column".to_string() })
+        }
+    }
+
+    /// Puts a key in the database.
+    fn put(&self, col: &str, key: &[u8], val: &[u8])
+        -> Result<(), DBError>
+    {
+        // Panic if the DB locks are poisoned.
+        let mut db = self.db.write().unwrap();
+        let known_columns = self.known_columns.read().unwrap();
+
+        if known_columns.contains(&col.to_string()) {
+            let column_key = MemoryDB::get_key_for_col(col, key);
+            db.insert(column_key, val.to_vec());
+            Ok(())
+        } else {
+            Err(DBError{ message: "Unknown column".to_string() })
+        }
+    }
+}
+
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use super::super::ClientDB;
+    use std::thread;
+    use std::sync::Arc;
+
+    #[test]
+    fn test_memorydb_column_access() {
+        let col_a: &str = "ColumnA";
+        let col_b: &str = "ColumnB";
+
+        let column_families = vec![
+            col_a,
+            col_b,
+        ];
+
+        let db = MemoryDB::open(Some(&column_families));
+
+        /*
+         * Testing that if we write to the same key in different columns that
+         * there is not an overlap.
+         */
+        db.put(col_a, "same".as_bytes(), "cat".as_bytes()).unwrap();
+        db.put(col_b, "same".as_bytes(), "dog".as_bytes()).unwrap();
+
+        assert_eq!(db.get(col_a, "same".as_bytes()).unwrap().unwrap(), "cat".as_bytes());
+        assert_eq!(db.get(col_b, "same".as_bytes()).unwrap().unwrap(), "dog".as_bytes());
+
+
+    }
+
+    #[test]
+    fn test_memorydb_unknown_column_access() {
+        let col_a: &str = "ColumnA";
+        let col_x: &str = "ColumnX";
+
+        let column_families = vec![
+            col_a,
+            // col_x is excluded on purpose
+        ];
+
+        let db = MemoryDB::open(Some(&column_families));
+
+        /*
+         * Test that we get errors when using undeclared columns
+         */
+        assert!(db.put(col_a, "cats".as_bytes(), "lol".as_bytes()).is_ok());
+        assert!(db.put(col_x, "cats".as_bytes(), "lol".as_bytes()).is_err());
+
+        assert!(db.get(col_a, "cats".as_bytes()).is_ok());
+        assert!(db.get(col_x, "cats".as_bytes()).is_err());
+    }
+
+    #[test]
+    fn test_memorydb_threading() {
+        let col_name: &str = "TestColumn";
+        let column_families = vec![col_name];
+
+        let db = Arc::new(MemoryDB::open(Some(&column_families)));
+
+        let thread_count = 10;
+        let write_count = 10;
+
+        // We're expecting the product of these numbers to fit in one byte.
+        assert!(thread_count * write_count <= 255);
+
+        let mut handles = vec![];
+        for t in 0..thread_count {
+            let wc = write_count;
+            let db = db.clone();
+            let col = col_name.clone();
+            let handle = thread::spawn(move || {
+                for w in 0..wc {
+                    let key = (t * w) as u8;
+                    let val = 42;
+                    db.put(&col, &vec![key], &vec![val]).unwrap();
+                }
+            });
+            handles.push(handle);
+        }
+
+        for handle in handles {
+            handle.join().unwrap();
+        }
+
+        for t in 0..thread_count {
+            for w in 0..write_count {
+                let key = (t * w) as u8;
+                let val = db.get(&col_name, &vec![key]).unwrap().unwrap();
+                assert_eq!(vec![42], val);
+            }
+        }
+    }
+}
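`MemoryDB` simulates RocksDB column families by folding the column name into the key: hashing `(column, key)` with blake2b gives every column a disjoint keyspace inside one flat `HashMap`. The trick in isolation, mirroring `get_key_for_col` above and using the same `blake2-rfc` crate:

```rust
extern crate blake2_rfc as blake2;

use blake2::blake2b::blake2b;

// A 32-byte blake2b digest keyed by the column name; identical user keys
// in different columns map to different storage keys.
fn namespaced_key(col: &str, key: &[u8]) -> Vec<u8> {
    blake2b(32, col.as_bytes(), key).as_bytes().to_vec()
}

#[test]
fn no_column_overlap() {
    assert_ne!(
        namespaced_key("ColumnA", b"same"),
        namespaced_key("ColumnB", b"same")
    );
}
```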
diff --git a/lighthouse/db/mod.rs b/lighthouse/db/mod.rs
index 92323b3ac..2919cdb28 100644
--- a/lighthouse/db/mod.rs
+++ b/lighthouse/db/mod.rs
@@ -1,14 +1,14 @@
 extern crate rocksdb;
+extern crate blake2_rfc as blake2;
 
-use std::fs;
-use std::path::Path;
-pub use self::rocksdb::DB;
+mod disk_db;
+mod memory_db;
+mod traits;
 
-pub fn open_db(path: &Path) -> DB {
-    let db_path = path.join("rocksdb");
-    fs::create_dir_all(&db_path)
-        .expect(&format!("Unable to create {:?}", &db_path));
-    let db = DB::open_default(db_path.join("lighthouse.rdb"))
-        .expect("Unable to open local database.");
-    db
-}
+pub use self::disk_db::DiskDB;
+pub use self::memory_db::MemoryDB;
+pub use self::traits::{
+    DBError,
+    DBValue,
+    ClientDB,
+};
diff --git a/lighthouse/db/traits.rs b/lighthouse/db/traits.rs
new file mode 100644
index 000000000..79766329a
--- /dev/null
+++ b/lighthouse/db/traits.rs
@@ -0,0 +1,27 @@
+pub type DBValue = Vec<u8>;
+
+#[derive(Debug)]
+pub struct DBError {
+    pub message: String
+}
+
+impl DBError {
+    pub fn new(message: String) -> Self {
+        Self { message }
+    }
+}
+
+/// A generic database to be used by the "client" (i.e.,
+/// the lighthouse blockchain client).
+///
+/// The purpose of having this generic trait is to allow the
+/// program to use a persistent on-disk database during production,
+/// but use a transient database during tests.
+pub trait ClientDB: Sync + Send {
+    fn get(&self, col: &str, key: &[u8])
+        -> Result<Option<DBValue>, DBError>;
+
+    fn put(&self, col: &str, key: &[u8], val: &[u8])
+        -> Result<(), DBError>;
+}
+
diff --git a/lighthouse/main.rs b/lighthouse/main.rs
index 0a9f35288..367b71920 100644
--- a/lighthouse/main.rs
+++ b/lighthouse/main.rs
@@ -64,7 +64,7 @@ fn main() {
         "data_dir" => &config.data_dir.to_str(),
         "port" => &config.p2p_listen_port);
 
-    let client = Client::new(config, log.new(o!()));
+    let client = Client::new(&config, &log);
 
     client.sync_thread.join().unwrap();
     info!(log, "Exiting.");
diff --git a/lighthouse/state/transition/attestation_parent_hashes.rs b/lighthouse/state/transition/attestation_parent_hashes.rs
index 624d8c443..35866159f 100644
--- a/lighthouse/state/transition/attestation_parent_hashes.rs
+++ b/lighthouse/state/transition/attestation_parent_hashes.rs
@@ -13,16 +13,16 @@ use super::TransitionError;
 /// See this slide for more information:
 /// https://tinyurl.com/ybzn2spw
 pub fn attestation_parent_hashes(
-    cycle_length: &u8,
-    block_slot: &u64,
-    attestation_slot: &u64,
-    current_hashes: &Vec<Hash256>,
-    oblique_hashes: &Vec<Hash256>)
+    cycle_length: u8,
+    block_slot: u64,
+    attestation_slot: u64,
+    current_hashes: &[Hash256],
+    oblique_hashes: &[Hash256])
     -> Result<Vec<Hash256>, TransitionError>
 {
     // This cast places a limit on cycle_length. If you change it, check math
     // for overflow.
-    let cycle_length: u64 = *cycle_length as u64;
+    let cycle_length: u64 = u64::from(cycle_length);
 
     if current_hashes.len() as u64 != (cycle_length * 2) {
         return Err(TransitionError::InvalidInput(String::from(
@@ -69,7 +69,7 @@ pub fn attestation_parent_hashes(
     let mut hashes = Vec::new();
     hashes.extend_from_slice(
         &current_hashes[(start as usize)..(end as usize)]);
-    hashes.append(&mut oblique_hashes.clone());
+    hashes.extend_from_slice(oblique_hashes);
     Ok(hashes)
 }
 
@@ -98,9 +98,9 @@ mod tests {
         let current_hashes = get_range_of_hashes(3, 19);
         let oblique_hashes = get_range_of_hashes(100, 102);
         let result = attestation_parent_hashes(
-            &cycle_length,
-            &block_slot,
-            &attestation_slot,
+            cycle_length,
+            block_slot,
+            attestation_slot,
             &current_hashes,
             &oblique_hashes);
         assert!(result.is_ok());
@@ -123,9 +123,9 @@
         let current_hashes = get_range_of_hashes(3, 19);
         let oblique_hashes = get_range_of_hashes(100, 108);
         let result = attestation_parent_hashes(
-            &cycle_length,
-            &block_slot,
-            &attestation_slot,
+            cycle_length,
+            block_slot,
+            attestation_slot,
             &current_hashes,
             &oblique_hashes);
         assert!(result.is_ok());
@@ -148,9 +148,9 @@
         let current_hashes = get_range_of_hashes(3, 19);
         let oblique_hashes = vec![];
         let result = attestation_parent_hashes(
-            &cycle_length,
-            &block_slot,
-            &attestation_slot,
+            cycle_length,
+            block_slot,
+            attestation_slot,
             &current_hashes,
             &oblique_hashes);
         assert!(result.is_ok());
@@ -171,9 +171,9 @@
         let current_hashes = get_range_of_hashes(0, 16);
         let oblique_hashes = vec![];
         let result = attestation_parent_hashes(
-            &cycle_length,
-            &block_slot,
-            &attestation_slot,
+            cycle_length,
+            block_slot,
+            attestation_slot,
             &current_hashes,
             &oblique_hashes);
         assert!(result.is_ok());
@@ -194,9 +194,9 @@
         let current_hashes = get_range_of_hashes(0, 16);
         let oblique_hashes = vec![];
         let result = attestation_parent_hashes(
-            &cycle_length,
-            &block_slot,
-            &attestation_slot,
+            cycle_length,
+            block_slot,
+            attestation_slot,
             &current_hashes,
             &oblique_hashes);
         assert!(result.is_err());
@@ -213,9 +213,9 @@
         let current_hashes = get_range_of_hashes(0, 15);
         let oblique_hashes = vec![];
         let result = attestation_parent_hashes(
-            &cycle_length,
-            &block_slot,
-            &attestation_slot,
+            cycle_length,
+            block_slot,
+            attestation_slot,
             &current_hashes,
             &oblique_hashes);
         assert!(result.is_err());
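The signature change to `attestation_parent_hashes` follows two common clippy recommendations: small `Copy` types (`u8`, `u64`) are cheaper passed by value than by reference, and `&[Hash256]` accepts vectors, arrays, and sub-slices alike (`extend_from_slice` likewise avoids cloning the oblique hashes). `u64::from` is preferred over an `as` cast because it is a lossless widening, checked at compile time. A self-contained sketch of why `u64::from` is the safer spelling:

```rust
// u64::from is lossless by construction; an `as` cast also compiles for
// narrowing conversions, where high bits are silently dropped.
fn widen(cycle_length: u8) -> u64 {
    u64::from(cycle_length) // cannot truncate
}

fn main() {
    assert_eq!(widen(255), 255u64);
    // By contrast, a narrowing `as` cast truncates without complaint:
    assert_eq!(300u64 as u8, 44);
}
```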
diff --git a/lighthouse/state/transition/shuffling/rng.rs b/lighthouse/state/transition/shuffling/rng.rs
index 224e9130f..e43c582ff 100644
--- a/lighthouse/state/transition/shuffling/rng.rs
+++ b/lighthouse/state/transition/shuffling/rng.rs
@@ -2,7 +2,7 @@ use super::blake2_rfc::blake2s::{ Blake2s, Blake2sResult };
 
 const SEED_SIZE_BYTES: usize = 32;
 const RAND_BYTES: usize = 3;      // 24 / 8
-const RAND_MAX: u32 = 16777216;   // 2**24
+const RAND_MAX: u32 = 16_777_216; // 2**24
 
 /// A pseudo-random number generator which given a seed
 /// uses successive blake2s hashing to generate "entropy".
@@ -31,17 +31,14 @@ impl ShuffleRng {
     /// Extracts 3 bytes from the `seed`. Rehashes seed if required.
     fn rand(&mut self) -> u32 {
         self.idx += RAND_BYTES;
-        match self.idx >= SEED_SIZE_BYTES {
-            true => {
-                self.rehash_seed();
-                self.rand()
-            }
-            false => {
-                int_from_byte_slice(
-                    self.seed.as_bytes(),
-                    self.idx - RAND_BYTES,
-                )
-            }
+        if self.idx >= SEED_SIZE_BYTES {
+            self.rehash_seed();
+            self.rand()
+        } else {
+            int_from_byte_slice(
+                self.seed.as_bytes(),
+                self.idx - RAND_BYTES,
+            )
         }
     }
 
@@ -65,9 +62,9 @@ impl ShuffleRng {
 /// Returns that integer.
 fn int_from_byte_slice(source: &[u8], offset: usize) -> u32 {
     (
-        source[offset + 2] as u32) |
-        ((source[offset + 1] as u32) << 8) |
-        ((source[offset    ] as u32) << 16
+        u32::from(source[offset + 2])) |
+        (u32::from(source[offset + 1]) << 8) |
+        (u32::from(source[offset    ]) << 16
     )
 }
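For clarity, `int_from_byte_slice` reads three consecutive bytes big-endian, so every output is strictly below `RAND_MAX` (2**24). The same expression restated outside the diff, with a concrete check:

```rust
// Big-endian 24-bit read: the byte at `offset` is the most significant.
fn int_from_byte_slice(source: &[u8], offset: usize) -> u32 {
    u32::from(source[offset + 2])
        | (u32::from(source[offset + 1]) << 8)
        | (u32::from(source[offset]) << 16)
}

fn main() {
    assert_eq!(int_from_byte_slice(&[0x01, 0x02, 0x03], 0), 0x0001_0203);
    // The maximum possible value is 0xFFFFFF = 16_777_215 < RAND_MAX.
    assert!(int_from_byte_slice(&[0xff, 0xff, 0xff], 0) < 16_777_216);
}
```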
diff --git a/lighthouse/sync/messages.rs b/lighthouse/sync/messages.rs
deleted file mode 100644
index 9173f1c40..000000000
--- a/lighthouse/sync/messages.rs
+++ /dev/null
@@ -1,12 +0,0 @@
-pub enum SyncEventType {
-    Invalid,
-    PeerConnect,
-    PeerDrop,
-    ReceiveBlocks,
-    ReceiveAttestationRecords,
-}
-
-pub struct SyncEvent {
-    event: SyncEventType,
-    data: Option<Vec<u8>>
-}
diff --git a/lighthouse/sync/mod.rs b/lighthouse/sync/mod.rs
index 3d7d2d8d2..6e1f0be11 100644
--- a/lighthouse/sync/mod.rs
+++ b/lighthouse/sync/mod.rs
@@ -3,7 +3,6 @@ extern crate slog;
 extern crate tokio;
 extern crate network_libp2p;
 
-pub mod messages;
 pub mod network;
 pub mod sync_future;
 pub mod wire_protocol;
diff --git a/lighthouse/sync/network.rs b/lighthouse/sync/network.rs
index c2d1f1149..1204a2093 100644
--- a/lighthouse/sync/network.rs
+++ b/lighthouse/sync/network.rs
@@ -1,5 +1,5 @@
-use std::sync::{ RwLock, Arc };
-use super::db::DB;
+use std::sync::Arc;
+use super::db::ClientDB;
 use slog::Logger;
 
 use super::network_libp2p::message::{
@@ -8,7 +8,10 @@ use super::network_libp2p::message::{
     NetworkEventType,
 };
 
-use super::wire_protocol::{ WireMessageType, message_type };
+use super::wire_protocol::{
+    WireMessage,
+    WireMessageHeader,
+};
 
 use super::futures::sync::mpsc::{
     UnboundedSender,
@@ -20,9 +23,9 @@ use super::futures::sync::mpsc::{
 /// (e.g., libp2p) has an event to push up to the sync process.
 pub fn handle_network_event(
     event: NetworkEvent,
-    db: Arc<RwLock<DB>>,
-    network_tx: UnboundedSender<OutgoingMessage>,
-    log: Logger)
+    db: &Arc<ClientDB>,
+    network_tx: &UnboundedSender<OutgoingMessage>,
+    log: &Logger)
     -> Result<(), ()>
 {
     debug!(&log, "";
@@ -33,10 +36,10 @@ pub fn handle_network_event(
         NetworkEventType::Message => {
             if let Some(data) = event.data {
                 handle_network_message(
-                    data,
-                    db,
-                    network_tx,
-                    log)
+                    &data,
+                    &db,
+                    &network_tx,
+                    &log)
             } else {
                 Ok(())
             }
@@ -50,17 +53,35 @@ pub fn handle_network_event(
 /// This function should be called whenever a peer from a network
 /// (e.g., libp2p) has sent a message to us.
 fn handle_network_message(
-    message: Vec<u8>,
-    _db: Arc<RwLock<DB>>,
-    _network_tx: UnboundedSender<OutgoingMessage>,
-    _log: Logger)
+    message: &[u8],
+    db: &Arc<ClientDB>,
+    _network_tx: &UnboundedSender<OutgoingMessage>,
+    log: &Logger)
     -> Result<(), ()>
 {
-    match message_type(&message) {
-        Some(WireMessageType::Blocks) => {
-            // Do something with inbound blocks.
-            Ok(())
+    match WireMessage::decode(&message) {
+        Ok(msg) => {
+            match msg.header {
+                WireMessageHeader::Blocks => {
+                    process_unverified_blocks(
+                        msg.body,
+                        &db,
+                        &log
+                    );
+                    Ok(())
+                }
+                _ => Ok(())
+            }
+        }
+        Err(_) => {
+            Ok(()) // No need to pass the error back
         }
-        _ => Ok(())
     }
 }
+
+fn process_unverified_blocks(_message: &[u8],
+    _db: &Arc<ClientDB>,
+    _log: &Logger)
+{
+    //
+}
diff --git a/lighthouse/sync/sync_future.rs b/lighthouse/sync/sync_future.rs
index c3b2355fb..31cc933ca 100644
--- a/lighthouse/sync/sync_future.rs
+++ b/lighthouse/sync/sync_future.rs
@@ -9,8 +9,8 @@ use super::network_libp2p::message::{
     OutgoingMessage,
 };
 use super::network::handle_network_event;
-use std::sync::{ RwLock, Arc };
-use super::db::DB;
+use std::sync::Arc;
+use super::db::ClientDB;
 use slog::Logger;
 
 type NetworkSender = UnboundedSender<OutgoingMessage>;
@@ -25,11 +25,11 @@ type SyncReceiver = UnboundedReceiver<Vec<u8>>;
 /// from the network and the RPC and update
 /// the state.
 pub fn run_sync_future(
-    db: Arc<RwLock<DB>>,
+    db: Arc<ClientDB>,
     network_tx: NetworkSender,
     network_rx: NetworkReceiver,
-    _sync_tx: SyncSender,
-    _sync_rx: SyncReceiver,
+    _sync_tx: &SyncSender,
+    _sync_rx: &SyncReceiver,
     log: Logger)
 {
     let network_future = {
@@ -37,9 +37,9 @@ pub fn run_sync_future(
             .for_each(move |event| {
                 handle_network_event(
                     event,
-                    db.clone(),
-                    network_tx.clone(),
-                    log.clone())
+                    &db.clone(),
+                    &network_tx.clone(),
+                    &log.clone())
             })
            .map_err(|_| panic!("rx failed"))
    };
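`handle_network_message` now borrows `&[u8]` instead of consuming a `Vec<u8>`, which pairs with the borrowed `WireMessage<'a>` added in the next file: decoding copies nothing, because the body is a sub-slice of the input buffer. A self-contained sketch of that zero-copy pattern, with `Message`/`Header` as renamed stand-ins for `WireMessage`/`WireMessageHeader`:

```rust
// Stand-ins for the WireMessageHeader / WireMessage types in this patch.
pub enum Header { Blocks }

pub struct Message<'a> {
    pub header: Header,
    pub body: &'a [u8], // borrows from the wire buffer
}

fn decode(bytes: &[u8]) -> Option<Message> {
    let (first, body) = bytes.split_first()?;
    match first {
        0x06 => Some(Message { header: Header::Blocks, body }),
        _ => None,
    }
}

fn main() {
    let wire = [0x06, 1, 2, 3];
    let msg = decode(&wire).unwrap();
    assert_eq!(msg.body, &[1u8, 2, 3][..]); // a sub-slice of `wire`, not a copy
}
```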
diff --git a/lighthouse/sync/wire_protocol.rs b/lighthouse/sync/wire_protocol.rs
index 5d4f77204..8f9c8bd57 100644
--- a/lighthouse/sync/wire_protocol.rs
+++ b/lighthouse/sync/wire_protocol.rs
@@ -1,4 +1,9 @@
-pub enum WireMessageType {
+pub enum WireMessageDecodeError {
+    TooShort,
+    UnknownType,
+}
+
+pub enum WireMessageHeader {
     Status,
     NewBlockHashes,
     GetBlockHashes,
@@ -8,6 +13,48 @@
     NewBlock,
 }
 
+pub struct WireMessage<'a> {
+    pub header: WireMessageHeader,
+    pub body: &'a [u8],
+}
+
+impl<'a> WireMessage<'a> {
+    pub fn decode(bytes: &'a [u8])
+        -> Result<Self, WireMessageDecodeError>
+    {
+        if let Some((header_byte, body)) = bytes.split_first() {
+            let header = match header_byte {
+                0x06 => Some(WireMessageHeader::Blocks),
+                _ => None
+            };
+            match header {
+                Some(header) => Ok(Self{header, body}),
+                None => Err(WireMessageDecodeError::UnknownType)
+            }
+        } else {
+            Err(WireMessageDecodeError::TooShort)
+        }
+    }
+}
+
+/*
+pub fn decode_wire_message(bytes: &[u8])
+    -> Result<(WireMessageType, &[u8]), WireMessageDecodeError>
+{
+    if let Some((header_byte, body)) = bytes.split_first() {
+        let header = match header_byte {
+            0x06 => Some(WireMessageType::Blocks),
+            _ => None
+        };
+        match header {
+            Some(header) => Ok((header, body)),
+            None => Err(WireMessageDecodeError::UnknownType)
+        }
+    } else {
+        Err(WireMessageDecodeError::TooShort)
+    }
+}
+
 /// Determines the message type of some given
 /// message.
@@ -22,3 +69,21 @@ pub fn message_type(message: &Vec<u8>)
         _ => None
     }
 }
+
+pub fn identify_wire_protocol_message(message: &Vec<u8>)
+    -> Result<(WireMessageType, &[u8]), WireMessageDecodeError>
+{
+    fn strip_header(v: &Vec<u8>) -> &[u8] {
+        match v.get(1..v.len()) {
+            None => &vec![],
+            Some(s) => s
+        }
+    }
+
+    match message.get(0) {
+        Some(0x06) => Ok((WireMessageType::Blocks, strip_header(message))),
+        None => Err(WireMessageDecodeError::TooShort),
+        _ => Err(WireMessageDecodeError::UnknownType),
+    }
+}
+*/
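This patch only adds the decoder; there is no encoder yet. A hypothetical counterpart to `WireMessage::decode` would simply prepend the header byte (0x06 = Blocks, per the match above), and the pair round-trips:

```rust
// Hypothetical encoder: prepend the 0x06 Blocks header byte to a body.
fn encode_blocks(body: &[u8]) -> Vec<u8> {
    let mut bytes = Vec::with_capacity(1 + body.len());
    bytes.push(0x06);
    bytes.extend_from_slice(body);
    bytes
}

fn main() {
    let wire = encode_blocks(b"ssz-bytes");
    assert_eq!(wire[0], 0x06);
    assert_eq!(&wire[1..], &b"ssz-bytes"[..]);
}
```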
diff --git a/lighthouse/utils/logging.rs b/lighthouse/utils/logging.rs
index 67b27bf2c..7dc186d30 100644
--- a/lighthouse/utils/logging.rs
+++ b/lighthouse/utils/logging.rs
@@ -7,11 +7,10 @@ pub use slog::Logger;
 
 pub fn test_logger() -> slog::Logger {
     let plain = slog_term::PlainSyncDecorator::new(slog_term::TestStdoutWriter);
-    let logger = Logger::root(
+    Logger::root(
         slog_term::FullFormat::new(plain)
             .build().fuse(), o!()
-    );
-    logger
+    )
 }
 
 pub fn get_logger() -> slog::Logger {
diff --git a/network-libp2p/Cargo.toml b/network-libp2p/Cargo.toml
index 50f5d26b8..de8077ac6 100644
--- a/network-libp2p/Cargo.toml
+++ b/network-libp2p/Cargo.toml
@@ -8,13 +8,13 @@ bigint = "4.2"
 bytes = ""
 eth-secp256k1 = { git = "https://github.com/paritytech/rust-secp256k1" }
 futures = "0.1.23"
-libp2p-peerstore = { git = "https://github.com/tomaka/libp2p-rs", branch ="zksummit" }
-libp2p-core = { git = "https://github.com/tomaka/libp2p-rs", branch ="zksummit" }
-libp2p-mplex = { git = "https://github.com/tomaka/libp2p-rs", branch ="zksummit" }
-libp2p-tcp-transport = { git = "https://github.com/tomaka/libp2p-rs", branch ="zksummit" }
-libp2p-floodsub = { git = "https://github.com/tomaka/libp2p-rs", branch ="zksummit" }
-libp2p-identify = { git = "https://github.com/tomaka/libp2p-rs", branch ="zksummit" }
-libp2p-kad = { git = "https://github.com/tomaka/libp2p-rs", branch ="zksummit" }
+libp2p-peerstore = { git = "https://github.com/sigp/libp2p-rs", branch ="zksummit" }
+libp2p-core = { git = "https://github.com/sigp/libp2p-rs", branch ="zksummit" }
+libp2p-mplex = { git = "https://github.com/sigp/libp2p-rs", branch ="zksummit" }
+libp2p-tcp-transport = { git = "https://github.com/sigp/libp2p-rs", branch ="zksummit" }
+libp2p-floodsub = { git = "https://github.com/sigp/libp2p-rs", branch ="zksummit" }
+libp2p-identify = { git = "https://github.com/sigp/libp2p-rs", branch ="zksummit" }
+libp2p-kad = { git = "https://github.com/sigp/libp2p-rs", branch ="zksummit" }
 pem = "0.5.0"
 rand = "0.3"
 slog = "^2.2.3"