diff --git a/lighthouse/client/mod.rs b/lighthouse/client/mod.rs index f87f05738..89b1729ba 100644 --- a/lighthouse/client/mod.rs +++ b/lighthouse/client/mod.rs @@ -1,4 +1,4 @@ -use std::sync::{ Arc, RwLock }; +use std::sync::Arc; use std::thread; use super::db::{ DB, open_db }; use super::config::LighthouseConfig; @@ -13,7 +13,7 @@ use super::sync::run_sync_future; /// Represents the co-ordination of the /// networking, syncing and RPC (not-yet-implemented) threads. pub struct Client { - pub db: Arc>, + pub db: Arc, pub network_thread: thread::JoinHandle<()>, pub sync_thread: thread::JoinHandle<()>, } @@ -30,7 +30,7 @@ impl Client { // Open the local db let db = { let db = open_db(&config.data_dir); - Arc::new(RwLock::new(db)) + Arc::new(db) }; // Start the network thread @@ -57,7 +57,7 @@ impl Client { let (sync_out_sender, sync_out_receiver) = unbounded(); let (sync_in_sender, sync_in_receiver) = unbounded(); let sync_log = log.new(o!()); - let sync_db = Arc::clone(&db); + let sync_db = db.clone(); let thread = thread::spawn(move || { run_sync_future( sync_db, diff --git a/lighthouse/sync/block.rs b/lighthouse/sync/block.rs new file mode 100644 index 000000000..2612ca8b1 --- /dev/null +++ b/lighthouse/sync/block.rs @@ -0,0 +1,23 @@ +use super::db::DB; +use slog::Logger; + +pub enum BlockStatus { + Valid, + AlreadyKnown, + TooOld, + TimeInvalid, + UnknownPoWHash, + NoAttestations, + InvalidAttestation, + NotProposerSigned, +} + +pub fn process_unverified_blocks( + _serialized_block: &[u8], + _db: &DB, + _log: Logger) +{ + // +} + + diff --git a/lighthouse/sync/messages.rs b/lighthouse/sync/messages.rs deleted file mode 100644 index 9173f1c40..000000000 --- a/lighthouse/sync/messages.rs +++ /dev/null @@ -1,12 +0,0 @@ -pub enum SyncEventType { - Invalid, - PeerConnect, - PeerDrop, - ReceiveBlocks, - ReceiveAttestationRecords, -} - -pub struct SyncEvent { - event: SyncEventType, - data: Option> -} diff --git a/lighthouse/sync/mod.rs b/lighthouse/sync/mod.rs index 3d7d2d8d2..f56260e4f 100644 --- a/lighthouse/sync/mod.rs +++ b/lighthouse/sync/mod.rs @@ -3,7 +3,7 @@ extern crate slog; extern crate tokio; extern crate network_libp2p; -pub mod messages; +pub mod block; pub mod network; pub mod sync_future; pub mod wire_protocol; diff --git a/lighthouse/sync/network.rs b/lighthouse/sync/network.rs index c2d1f1149..99d178ff0 100644 --- a/lighthouse/sync/network.rs +++ b/lighthouse/sync/network.rs @@ -1,4 +1,4 @@ -use std::sync::{ RwLock, Arc }; +use std::sync::Arc; use super::db::DB; use slog::Logger; @@ -8,7 +8,12 @@ use super::network_libp2p::message::{ NetworkEventType, }; -use super::wire_protocol::{ WireMessageType, message_type }; +use super::block::process_unverified_blocks; + +use super::wire_protocol::{ + WireMessage, + WireMessageHeader, +}; use super::futures::sync::mpsc::{ UnboundedSender, @@ -20,7 +25,7 @@ use super::futures::sync::mpsc::{ /// (e.g., libp2p) has an event to push up to the sync process. pub fn handle_network_event( event: NetworkEvent, - db: Arc>, + db: Arc, network_tx: UnboundedSender, log: Logger) -> Result<(), ()> @@ -34,7 +39,7 @@ pub fn handle_network_event( if let Some(data) = event.data { handle_network_message( data, - db, + &db, network_tx, log) } else { @@ -51,16 +56,27 @@ pub fn handle_network_event( /// (e.g., libp2p) has sent a message to us. fn handle_network_message( message: Vec, - _db: Arc>, + db: &DB, _network_tx: UnboundedSender, - _log: Logger) + log: Logger) -> Result<(), ()> { - match message_type(&message) { - Some(WireMessageType::Blocks) => { - // Do something with inbound blocks. - Ok(()) + match WireMessage::decode(&message) { + Ok(msg) => { + match msg.header { + WireMessageHeader::Blocks => { + process_unverified_blocks( + msg.body, + db, + log + ); + Ok(()) + } + _ => Ok(()) + } + } + Err(_) => { + return Ok(()) // No need to pass the error back } - _ => Ok(()) } } diff --git a/lighthouse/sync/sync_future.rs b/lighthouse/sync/sync_future.rs index c3b2355fb..06836fb30 100644 --- a/lighthouse/sync/sync_future.rs +++ b/lighthouse/sync/sync_future.rs @@ -9,7 +9,7 @@ use super::network_libp2p::message::{ OutgoingMessage, }; use super::network::handle_network_event; -use std::sync::{ RwLock, Arc }; +use std::sync::Arc; use super::db::DB; use slog::Logger; @@ -25,7 +25,7 @@ type SyncReceiver = UnboundedReceiver>; /// from the network and the RPC and update /// the state. pub fn run_sync_future( - db: Arc>, + db: Arc, network_tx: NetworkSender, network_rx: NetworkReceiver, _sync_tx: SyncSender, diff --git a/lighthouse/sync/wire_protocol.rs b/lighthouse/sync/wire_protocol.rs index 5d4f77204..999d14a73 100644 --- a/lighthouse/sync/wire_protocol.rs +++ b/lighthouse/sync/wire_protocol.rs @@ -1,4 +1,9 @@ -pub enum WireMessageType { +pub enum WireMessageDecodeError { + TooShort, + UnknownType, +} + +pub enum WireMessageHeader { Status, NewBlockHashes, GetBlockHashes, @@ -8,6 +13,48 @@ pub enum WireMessageType { NewBlock, } +pub struct WireMessage<'a> { + pub header: WireMessageHeader, + pub body: &'a [u8], +} + +impl<'a> WireMessage<'a> { + pub fn decode(bytes: &'a Vec) + -> Result + { + if let Some((header_byte, body)) = bytes.split_first() { + let header = match header_byte { + 0x06 => Some(WireMessageHeader::Blocks), + _ => None + }; + match header { + Some(header) => Ok(Self{header, body}), + None => Err(WireMessageDecodeError::UnknownType) + } + } else { + Err(WireMessageDecodeError::TooShort) + } + } +} + +/* +pub fn decode_wire_message(bytes: &[u8]) + -> Result +{ + if let Some((header_byte, body)) = bytes.split_first() { + let header = match header_byte { + 0x06 => Some(WireMessageType::Blocks), + _ => None + }; + match header { + Some(header) => Ok((header, body)), + None => Err(WireMessageDecodeError::UnknownType) + } + } else { + Err(WireMessageDecodeError::TooShort) + } +} + /// Determines the message type of some given /// message. @@ -22,3 +69,21 @@ pub fn message_type(message: &Vec) _ => None } } + +pub fn identify_wire_protocol_message(message: &Vec) + -> Result<(WireMessageType, &[u8]), WireMessageDecodeError> +{ + fn strip_header(v: &Vec) -> &[u8] { + match v.get(1..v.len()) { + None => &vec![], + Some(s) => s + } + } + + match message.get(0) { + Some(0x06) => Ok((WireMessageType::Blocks, strip_header(message))), + None => Err(WireMessageDecodeError::TooShort), + _ => Err(WireMessageDecodeError::UnknownType), + } +} +*/