From c33d3689a77744fc405ff1151556a538dd95856b Mon Sep 17 00:00:00 2001 From: Paul Hauner Date: Sun, 9 Sep 2018 16:36:00 +0200 Subject: [PATCH] Begin sync framework --- lighthouse/client/mod.rs | 4 +-- lighthouse/state/block.rs | 2 +- lighthouse/sync/messages.rs | 12 ++++++++ lighthouse/sync/mod.rs | 50 ++++---------------------------- lighthouse/sync/network.rs | 44 ++++++++++++++++++++++++++++ lighthouse/sync/sync_future.rs | 52 ++++++++++++++++++++++++++++++++++ network-libp2p/src/service.rs | 26 ++++++++--------- 7 files changed, 130 insertions(+), 60 deletions(-) create mode 100644 lighthouse/sync/messages.rs create mode 100644 lighthouse/sync/network.rs create mode 100644 lighthouse/sync/sync_future.rs diff --git a/lighthouse/client/mod.rs b/lighthouse/client/mod.rs index 8474bf105..f87f05738 100644 --- a/lighthouse/client/mod.rs +++ b/lighthouse/client/mod.rs @@ -8,7 +8,7 @@ use super::futures::sync::mpsc::{ use super::network_libp2p::service::listen as network_listen; use super::network_libp2p::state::NetworkState; use super::slog::Logger; -use super::sync::start_sync; +use super::sync::run_sync_future; /// Represents the co-ordination of the /// networking, syncing and RPC (not-yet-implemented) threads. @@ -59,7 +59,7 @@ impl Client { let sync_log = log.new(o!()); let sync_db = Arc::clone(&db); let thread = thread::spawn(move || { - start_sync( + run_sync_future( sync_db, network_tx.clone(), network_rx, diff --git a/lighthouse/state/block.rs b/lighthouse/state/block.rs index 11f9192bb..676d716bd 100644 --- a/lighthouse/state/block.rs +++ b/lighthouse/state/block.rs @@ -12,7 +12,7 @@ pub struct Block { pub pow_chain_ref: Hash256, pub active_state_root: Hash256, pub crystallized_state_root: Hash256, -} +} impl Block { pub fn zero() -> Self { diff --git a/lighthouse/sync/messages.rs b/lighthouse/sync/messages.rs new file mode 100644 index 000000000..9173f1c40 --- /dev/null +++ b/lighthouse/sync/messages.rs @@ -0,0 +1,12 @@ +pub enum SyncEventType { + Invalid, + PeerConnect, + PeerDrop, + ReceiveBlocks, + ReceiveAttestationRecords, +} + +pub struct SyncEvent { + event: SyncEventType, + data: Option> +} diff --git a/lighthouse/sync/mod.rs b/lighthouse/sync/mod.rs index a668093f4..efff37624 100644 --- a/lighthouse/sync/mod.rs +++ b/lighthouse/sync/mod.rs @@ -1,50 +1,12 @@ extern crate futures; extern crate slog; extern crate tokio; +extern crate network_libp2p; -use self::futures::sync::mpsc::{ - UnboundedReceiver, - UnboundedSender, -}; -use self::tokio::prelude::*; -use std::sync::{ RwLock, Arc }; -use super::network_libp2p::message::{ - NetworkEvent, - OutgoingMessage, -}; -use super::db::DB; -use slog::Logger; +pub mod messages; +pub mod network; +pub mod sync_future; -type NetworkSender = UnboundedSender; -type NetworkReceiver = UnboundedReceiver; +pub use self::sync_future::run_sync_future; -type SyncSender = UnboundedSender>; -type SyncReceiver = UnboundedReceiver>; - -/// Start a syncing tokio future. -/// -/// This is effectively a stub function being -/// used to test network functionality. -/// -/// Expect a full re-write. -pub fn start_sync( - _db: Arc>, - _network_tx: NetworkSender, - network_rx: NetworkReceiver, - _sync_tx: SyncSender, - _sync_rx: SyncReceiver, - log: Logger) { - let rx_future = network_rx - .for_each(move |event| { - debug!(&log, "Sync receive"; - "msg" => format!("{:?}", event)); - Ok(()) - }) - .map_err(|_| panic!("rx failed")); - - /* - * This is an unfinished stub function. - */ - - tokio::run(rx_future); -} +use super::db; diff --git a/lighthouse/sync/network.rs b/lighthouse/sync/network.rs new file mode 100644 index 000000000..4954edc1c --- /dev/null +++ b/lighthouse/sync/network.rs @@ -0,0 +1,44 @@ +use std::sync::{ RwLock, Arc }; +use super::db::DB; +use slog::Logger; + +use super::network_libp2p::message::{ + NetworkEvent, + OutgoingMessage, + NetworkEventType, +}; + +use super::futures::sync::mpsc::{ + UnboundedSender, +}; + +pub fn handle_network_event( + event: NetworkEvent, + db: Arc>, + network_tx: UnboundedSender, + log: Logger) + -> Result<(), ()> +{ + match event.event { + NetworkEventType::PeerConnect => Ok(()), + NetworkEventType::PeerDrop => Ok(()), + NetworkEventType::Message => handle_network_message( + event.data, + db, + network_tx, + log + ) + } +} + +fn handle_network_message( + message: Option>, + _db: Arc>, + _network_tx: UnboundedSender, + log: Logger) + -> Result<(), ()> +{ + debug!(&log, ""; + "network_msg" => format!("{:?}", message)); + Ok(()) +} diff --git a/lighthouse/sync/sync_future.rs b/lighthouse/sync/sync_future.rs new file mode 100644 index 000000000..05f22a878 --- /dev/null +++ b/lighthouse/sync/sync_future.rs @@ -0,0 +1,52 @@ +use super::tokio; +use super::futures::{ Future, Stream }; +use super::futures::sync::mpsc::{ + UnboundedReceiver, + UnboundedSender, +}; +use super::network_libp2p::message::{ + NetworkEvent, + OutgoingMessage, +}; +use super::network::handle_network_event; +use std::sync::{ RwLock, Arc }; +use super::db::DB; +use slog::Logger; + +type NetworkSender = UnboundedSender; +type NetworkReceiver = UnboundedReceiver; + +type SyncSender = UnboundedSender>; +type SyncReceiver = UnboundedReceiver>; + +/// Start a syncing tokio future. +/// +/// This is effectively a stub function being +/// used to test network functionality. +/// +/// Expect a full re-write. +pub fn run_sync_future( + db: Arc>, + network_tx: NetworkSender, + network_rx: NetworkReceiver, + _sync_tx: SyncSender, + _sync_rx: SyncReceiver, + log: Logger) { + let network_future = { + network_rx + .for_each(move |event| { + handle_network_event( + event, + db.clone(), + network_tx.clone(), + log.clone()) + }) + .map_err(|_| panic!("rx failed")) + }; + + /* + * This is an unfinished stub function. + */ + + tokio::run(network_future); +} diff --git a/network-libp2p/src/service.rs b/network-libp2p/src/service.rs index 1189c5444..5994ef0d5 100644 --- a/network-libp2p/src/service.rs +++ b/network-libp2p/src/service.rs @@ -18,7 +18,7 @@ use super::state::NetworkState; use super::message::{ NetworkEvent, NetworkEventType, OutgoingMessage }; use self::bigint::U512; use self::futures::{ Future, Stream, Poll }; -use self::futures::sync::mpsc::{ +use self::futures::sync::mpsc::{ UnboundedSender, UnboundedReceiver }; use self::libp2p_core::{ AddrComponent, Endpoint, Multiaddr, @@ -39,14 +39,14 @@ pub use self::libp2p_floodsub::Message; pub fn listen(state: NetworkState, events_to_app: UnboundedSender, raw_rx: UnboundedReceiver, - log: Logger) + log: Logger) { let peer_store = state.peer_store; let peer_id = state.peer_id; let listen_multiaddr = state.listen_multiaddr; let listened_addrs = Arc::new(RwLock::new(vec![])); let rx = ApplicationReciever{ inner: raw_rx }; - + // Build a tokio core let mut core = tokio_core::reactor::Core::new().expect("tokio failure."); // Build a base TCP libp2p transport @@ -65,10 +65,10 @@ pub fn listen(state: NetworkState, // is stored not the internal addr. .map(move |out, _, _| { if let(Some(ref observed), ref listen_multiaddr) = - (out.observed_addr, listen_multiaddr) + (out.observed_addr, listen_multiaddr) { - if let Some(viewed_from_outside) = - transport.nat_traversal(listen_multiaddr, observed) + if let Some(viewed_from_outside) = + transport.nat_traversal(listen_multiaddr, observed) { listened_addrs.write().unwrap() .push(viewed_from_outside); @@ -79,7 +79,7 @@ pub fn listen(state: NetworkState, }; // Configure and build a Kademlia upgrade to be applied - // to the base TCP transport. + // to the base TCP transport. let kad_config = libp2p_kad::KademliaConfig { parallelism: 3, record_store: (), @@ -91,10 +91,10 @@ pub fn listen(state: NetworkState, KademliaControllerPrototype::new(kad_config); let kad_upgrade = libp2p_kad:: KademliaUpgrade::from_prototype(&kad_ctl_proto); - + // Build a floodsub upgrade to allow pushing topic'ed // messages across the network. - let (floodsub_upgrade, floodsub_rx) = + let (floodsub_upgrade, floodsub_rx) = FloodSubUpgrade::new(peer_id.clone()); // Combine the Kademlia and Identify upgrades into a single @@ -104,7 +104,7 @@ pub fn listen(state: NetworkState, floodsub: floodsub_upgrade.clone(), identify: libp2p_identify::IdentifyProtocolConfig, }; - + // Build a Swarm to manage upgrading connections to peers. let swarm_listened_addrs = listened_addrs.clone(); let swarm_peer_id = peer_id.clone(); @@ -166,7 +166,7 @@ pub fn listen(state: NetworkState, for peer in peers { let peer_hash = U512::from(peer.hash()); let distance = 512 - (local_hash ^ peer_hash).leading_zeros(); - info!(kad_poll_log, "Discovered peer"; + info!(kad_poll_log, "Discovered peer"; "distance" => distance, "peer_id" => peer.to_base58()); let peer_addr = AddrComponent::P2P(peer.into_bytes()).into(); @@ -240,7 +240,7 @@ struct ConnectionUpgrader { } impl ConnectionUpgrade for ConnectionUpgrader -where +where C: AsyncRead + AsyncWrite + 'static, P: Deref + Clone + 'static, for<'r> &'r Pc: libp2p_peerstore::Peerstore, @@ -251,7 +251,7 @@ where type Output = FinalUpgrade; type Future = Box, Error = IoError>>; - #[inline] + #[inline] fn protocol_names(&self) -> Self::NamesIter { vec![ (Bytes::from("/ipfs/kad/1.0.0"), 0),