From 3399b02393d097f39f2b286e15031c2498d0aac9 Mon Sep 17 00:00:00 2001 From: Paul Hauner Date: Sun, 9 Sep 2018 18:28:36 +0200 Subject: [PATCH] Expand sync works --- lighthouse/sync/mod.rs | 1 + lighthouse/sync/network.rs | 44 ++++++++++++++++++++++++-------- lighthouse/sync/sync_future.rs | 14 ++++------ lighthouse/sync/wire_protocol.rs | 24 +++++++++++++++++ 4 files changed, 63 insertions(+), 20 deletions(-) create mode 100644 lighthouse/sync/wire_protocol.rs diff --git a/lighthouse/sync/mod.rs b/lighthouse/sync/mod.rs index efff37624..3d7d2d8d2 100644 --- a/lighthouse/sync/mod.rs +++ b/lighthouse/sync/mod.rs @@ -6,6 +6,7 @@ extern crate network_libp2p; pub mod messages; pub mod network; pub mod sync_future; +pub mod wire_protocol; pub use self::sync_future::run_sync_future; diff --git a/lighthouse/sync/network.rs b/lighthouse/sync/network.rs index 4954edc1c..c2d1f1149 100644 --- a/lighthouse/sync/network.rs +++ b/lighthouse/sync/network.rs @@ -8,10 +8,16 @@ use super::network_libp2p::message::{ NetworkEventType, }; +use super::wire_protocol::{ WireMessageType, message_type }; + use super::futures::sync::mpsc::{ UnboundedSender, }; +/// Accept a network event and perform all required processing. +/// +/// This function should be called whenever an underlying network +/// (e.g., libp2p) has an event to push up to the sync process. pub fn handle_network_event( event: NetworkEvent, db: Arc>, @@ -19,26 +25,42 @@ pub fn handle_network_event( log: Logger) -> Result<(), ()> { + debug!(&log, ""; + "network_event" => format!("{:?}", &event)); match event.event { NetworkEventType::PeerConnect => Ok(()), NetworkEventType::PeerDrop => Ok(()), - NetworkEventType::Message => handle_network_message( - event.data, - db, - network_tx, - log - ) + NetworkEventType::Message => { + if let Some(data) = event.data { + handle_network_message( + data, + db, + network_tx, + log) + } else { + Ok(()) + } + } } } +/// Accept a message from the network and perform all required +/// processing. +/// +/// This function should be called whenever a peer from a network +/// (e.g., libp2p) has sent a message to us. fn handle_network_message( - message: Option>, + message: Vec, _db: Arc>, _network_tx: UnboundedSender, - log: Logger) + _log: Logger) -> Result<(), ()> { - debug!(&log, ""; - "network_msg" => format!("{:?}", message)); - Ok(()) + match message_type(&message) { + Some(WireMessageType::Blocks) => { + // Do something with inbound blocks. + Ok(()) + } + _ => Ok(()) + } } diff --git a/lighthouse/sync/sync_future.rs b/lighthouse/sync/sync_future.rs index 05f22a878..c3b2355fb 100644 --- a/lighthouse/sync/sync_future.rs +++ b/lighthouse/sync/sync_future.rs @@ -21,17 +21,17 @@ type SyncReceiver = UnboundedReceiver>; /// Start a syncing tokio future. /// -/// This is effectively a stub function being -/// used to test network functionality. -/// -/// Expect a full re-write. +/// Uses green-threading to process messages +/// from the network and the RPC and update +/// the state. pub fn run_sync_future( db: Arc>, network_tx: NetworkSender, network_rx: NetworkReceiver, _sync_tx: SyncSender, _sync_rx: SyncReceiver, - log: Logger) { + log: Logger) +{ let network_future = { network_rx .for_each(move |event| { @@ -44,9 +44,5 @@ pub fn run_sync_future( .map_err(|_| panic!("rx failed")) }; - /* - * This is an unfinished stub function. - */ - tokio::run(network_future); } diff --git a/lighthouse/sync/wire_protocol.rs b/lighthouse/sync/wire_protocol.rs new file mode 100644 index 000000000..5d4f77204 --- /dev/null +++ b/lighthouse/sync/wire_protocol.rs @@ -0,0 +1,24 @@ +pub enum WireMessageType { + Status, + NewBlockHashes, + GetBlockHashes, + BlockHashes, + GetBlocks, + Blocks, + NewBlock, +} + + +/// Determines the message type of some given +/// message. +/// +/// Does not check the validity of the message data, +/// it just reads the first byte. +pub fn message_type(message: &Vec) + -> Option +{ + match message.get(0) { + Some(0x06) => Some(WireMessageType::Blocks), + _ => None + } +}