diff --git a/beacon_node/client/src/lib.rs b/beacon_node/client/src/lib.rs index d0b096416..6600c9e39 100644 --- a/beacon_node/client/src/lib.rs +++ b/beacon_node/client/src/lib.rs @@ -22,9 +22,9 @@ use tokio::runtime::TaskExecutor; pub struct Client { config: ClientConfig, // beacon_chain: Arc>, - network: Option>, - exit: exit_future::Exit, - exit_signal: Option, + pub network: Arc, + pub exit: exit_future::Exit, + pub exit_signal: Signal, log: slog::Logger, phantom: PhantomData, } @@ -44,14 +44,15 @@ impl Client { // TODO: Add beacon_chain reference to network parameters let network_config = config.net_conf.clone(); let network_logger = log.new(o!("Service" => "Network")); - let (network, network_send) = NetworkService::new(network_config, network_logger)?; + let (network, network_send) = + NetworkService::new(network_config, executor, network_logger)?; Ok(Client { config, exit, - exit_signal: Some(exit_signal), + exit_signal: exit_signal, log, - network: Some(network), + network: network, phantom: PhantomData, }) } diff --git a/beacon_node/client/src/notifier.rs b/beacon_node/client/src/notifier.rs index dd38701c9..6b52e670a 100644 --- a/beacon_node/client/src/notifier.rs +++ b/beacon_node/client/src/notifier.rs @@ -4,9 +4,10 @@ use db::ClientDB; use exit_future::Exit; use fork_choice::ForkChoice; use futures::{Future, Stream}; +use network::NodeMessage; use slog::{debug, info, o}; use slot_clock::SlotClock; -use std::sync::Arc; +use std::sync::{Arc, Mutex}; use std::time::{Duration, Instant}; use tokio::runtime::TaskExecutor; use tokio::timer::Interval; @@ -19,9 +20,21 @@ pub fn run(client: &Client, executor: TaskExecutor, exit: Exi let log = client.log.new(o!("Service" => "Notifier")); + // TODO: Debugging only + let counter = Arc::new(Mutex::new(0)); + let network = client.network.clone(); + // build heartbeat logic here let heartbeat = move |_| { info!(log, "Temp heartbeat output"); + let mut count = counter.lock().unwrap(); + *count += 1; + + if *count % 5 == 0 { + debug!(log, "Sending Message"); + network.send_message(String::from("Testing network channel")) + } + Ok(()) }; diff --git a/beacon_node/libp2p/Cargo.toml b/beacon_node/libp2p/Cargo.toml index 496d30268..ecd91e170 100644 --- a/beacon_node/libp2p/Cargo.toml +++ b/beacon_node/libp2p/Cargo.toml @@ -12,3 +12,4 @@ slog = "2.4.1" version = { path = "../version" } tokio = "0.1.16" futures = "0.1.25" +error-chain = "0.12.0" diff --git a/beacon_node/libp2p/src/behaviour.rs b/beacon_node/libp2p/src/behaviour.rs index 0c9aae16e..be12011dd 100644 --- a/beacon_node/libp2p/src/behaviour.rs +++ b/beacon_node/libp2p/src/behaviour.rs @@ -1,7 +1,7 @@ use futures::prelude::*; use libp2p::{ core::swarm::{NetworkBehaviourAction, NetworkBehaviourEventProcess}, - gossipsub::{Gossipsub, GossipsubConfig, GossipsubEvent, GossipsubRpc}, + gossipsub::{Gossipsub, GossipsubConfig, GossipsubEvent}, tokio_io::{AsyncRead, AsyncWrite}, NetworkBehaviour, PeerId, }; @@ -9,13 +9,14 @@ use libp2p::{ /// Builds the network behaviour for the libp2p Swarm. /// Implements gossipsub message routing. #[derive(NetworkBehaviour)] +#[behaviour(out_event = "BehaviourEvent", poll_method = "poll")] pub struct Behaviour { gossipsub: Gossipsub, // TODO: Add Kademlia for peer discovery /// The events generated by this behaviour to be consumed in the swarm poll. // We use gossipsub events for now, generalise later. #[behaviour(ignore)] - events: Vec, + events: Vec, } // Implement the NetworkBehaviourEventProcess trait so that we can derive NetworkBehaviour for Behaviour @@ -23,7 +24,15 @@ impl NetworkBehaviourEventProcess { fn inject_event(&mut self, event: GossipsubEvent) { - self.events.push(event); + match event { + GossipsubEvent::Message(message) => { + let gs_message = String::from_utf8_lossy(&message.data); + // TODO: Remove this type - debug only + self.events + .push(BehaviourEvent::Message(gs_message.to_string())) + } + _ => {} + } } } @@ -35,8 +44,10 @@ impl Behaviour { } } - /// Consume the events list when polled. - fn poll(&mut self) -> Async> { + /// Consumes the events list when polled. + fn poll( + &mut self, + ) -> Async> { if !self.events.is_empty() { return Async::Ready(NetworkBehaviourAction::GenerateEvent(self.events.remove(0))); } @@ -44,3 +55,16 @@ impl Behaviour { Async::NotReady } } + +impl Behaviour { + pub fn send_message(&self, message: String) { + // TODO: Encode and send via gossipsub + + } +} + +/// The types of events than can be obtained from polling the behaviour. +pub enum BehaviourEvent { + // TODO: This is a stub at the moment + Message(String), +} diff --git a/beacon_node/libp2p/src/error.rs b/beacon_node/libp2p/src/error.rs new file mode 100644 index 000000000..163fe575d --- /dev/null +++ b/beacon_node/libp2p/src/error.rs @@ -0,0 +1,8 @@ +// generates error types + +use error_chain::{ + error_chain, error_chain_processing, impl_error_chain_kind, impl_error_chain_processed, + impl_extract_backtrace, +}; + +error_chain! {} diff --git a/beacon_node/libp2p/src/lib.rs b/beacon_node/libp2p/src/lib.rs index f72725e49..a1bf4402c 100644 --- a/beacon_node/libp2p/src/lib.rs +++ b/beacon_node/libp2p/src/lib.rs @@ -2,7 +2,8 @@ /// all required libp2p functionality. /// /// This crate builds and manages the libp2p services required by the beacon node. -mod behaviour; +pub mod behaviour; +pub mod error; mod network_config; mod service; @@ -11,6 +12,7 @@ pub use libp2p::{ PeerId, }; pub use network_config::NetworkConfig; +pub use service::Libp2pEvent; pub use service::Service; pub use types::multiaddr; pub use types::Multiaddr; diff --git a/beacon_node/libp2p/src/service.rs b/beacon_node/libp2p/src/service.rs index ee36cefd5..dceb62511 100644 --- a/beacon_node/libp2p/src/service.rs +++ b/beacon_node/libp2p/src/service.rs @@ -1,7 +1,9 @@ -use crate::behaviour::Behaviour; +use crate::behaviour::{Behaviour, BehaviourEvent}; +use crate::error; use crate::multiaddr::Protocol; use crate::NetworkConfig; use futures::prelude::*; +use futures::Stream; use libp2p::core::{ muxing::StreamMuxerBox, nodes::Substream, @@ -17,13 +19,16 @@ use std::time::Duration; /// The configuration and state of the libp2p components for the beacon node. pub struct Service { /// The libp2p Swarm handler. - swarm: Swarm, Behaviour>>, + //TODO: Make this private + pub swarm: Swarm, Behaviour>>, /// This node's PeerId. local_peer_id: PeerId, + /// The libp2p logger handle. + pub log: slog::Logger, } impl Service { - pub fn new(config: NetworkConfig, log: slog::Logger) -> Self { + pub fn new(config: NetworkConfig, log: slog::Logger) -> error::Result { debug!(log, "Libp2p Service starting"); let local_private_key = config.local_private_key; @@ -50,7 +55,7 @@ impl Service { Err(err) => warn!(log, "Cannot listen on: {} : {:?}", address, err), }; } - // connect to boot nodes - these are currently stored as multiadders + // connect to boot nodes - these are currently stored as multiaddrs // Once we have discovery, can set to peerId for bootnode in config.boot_nodes { match Swarm::dial_addr(&mut swarm, bootnode.clone()) { @@ -62,10 +67,36 @@ impl Service { }; } - Service { + Ok(Service { local_peer_id, swarm, + log, + }) + } +} + +impl Stream for Service { + type Item = Libp2pEvent; + type Error = crate::error::Error; + + fn poll(&mut self) -> Poll, Self::Error> { + loop { + // TODO: Currently only gossipsub events passed here. + // Build a type for more generic events + match self.swarm.poll() { + Ok(Async::Ready(Some(BehaviourEvent::Message(m)))) => { + // TODO: Stub here for debugging + debug!(self.log, "Message received: {}", m); + return Ok(Async::Ready(Some(Libp2pEvent::Message(m)))); + } + // TODO: Fill with all behaviour events + _ => break, + Ok(Async::Ready(None)) => unreachable!("Swarm stream shouldn't end"), + Ok(Async::NotReady) => break, + _ => break, + } } + Ok(Async::NotReady) } } @@ -103,3 +134,8 @@ fn build_transport( .map_err(|err| Error::new(ErrorKind::Other, err)) .boxed() } + +/// Events that can be obtained from polling the Libp2p Service. +pub enum Libp2pEvent { + Message(String), +} diff --git a/beacon_node/network/Cargo.toml b/beacon_node/network/Cargo.toml index f32ee1f90..19d3e82ad 100644 --- a/beacon_node/network/Cargo.toml +++ b/beacon_node/network/Cargo.toml @@ -13,3 +13,4 @@ slog = "2.4.1" futures = "0.1.25" error-chain = "0.12.0" crossbeam-channel = "0.3.8" +tokio = "0.1.16" diff --git a/beacon_node/network/src/error.rs b/beacon_node/network/src/error.rs index 163fe575d..2005f76ae 100644 --- a/beacon_node/network/src/error.rs +++ b/beacon_node/network/src/error.rs @@ -1,8 +1,13 @@ // generates error types +use libp2p; use error_chain::{ error_chain, error_chain_processing, impl_error_chain_kind, impl_error_chain_processed, impl_extract_backtrace, }; -error_chain! {} +error_chain! { + links { + Libp2p(libp2p::error::Error, libp2p::error::ErrorKind); + } +} diff --git a/beacon_node/network/src/lib.rs b/beacon_node/network/src/lib.rs index ae03d8367..49b2abadd 100644 --- a/beacon_node/network/src/lib.rs +++ b/beacon_node/network/src/lib.rs @@ -5,4 +5,5 @@ mod messages; mod service; pub use libp2p::NetworkConfig; +pub use messages::NodeMessage; pub use service::Service; diff --git a/beacon_node/network/src/messages.rs b/beacon_node/network/src/messages.rs index 05b899269..d3a83fd5c 100644 --- a/beacon_node/network/src/messages.rs +++ b/beacon_node/network/src/messages.rs @@ -2,11 +2,15 @@ use libp2p::PeerId; use types::{Hash256, Slot}; /// Messages between nodes across the network. +#[derive(Debug, Clone)] pub enum NodeMessage { Status(Status), BlockRequest, + // TODO: only for testing - remove + Message(String), } +#[derive(Debug, Clone)] pub struct Status { /// Current node version. version: u8, @@ -19,6 +23,7 @@ pub struct Status { } /// Types of messages that the network service can receive. +#[derive(Debug, Clone)] pub enum NetworkMessage { /// Send a message to libp2p service. //TODO: Define typing for messages across the wire diff --git a/beacon_node/network/src/service.rs b/beacon_node/network/src/service.rs index ac8d9b442..e75b7e49a 100644 --- a/beacon_node/network/src/service.rs +++ b/beacon_node/network/src/service.rs @@ -2,39 +2,135 @@ use crate::error; use crate::message_handler::{HandlerMessage, MessageHandler}; use crate::messages::{NetworkMessage, NodeMessage}; use crate::NetworkConfig; -use crossbeam_channel::{unbounded as channel, Sender}; +use crossbeam_channel::{unbounded as channel, Sender, TryRecvError}; +use futures::future::lazy; +use futures::future::poll_fn; +use futures::prelude::*; use futures::sync::oneshot; +use futures::Stream; +use libp2p::behaviour::BehaviourEvent; +use libp2p::error::Error as libp2pError; use libp2p::Service as LibP2PService; +use libp2p::{Libp2pEvent, PeerId}; use slog::{debug, info, o, trace, warn, Logger}; use std::sync::{Arc, Mutex}; +use tokio::runtime::TaskExecutor; /// Service that handles communication between internal services and the libp2p network service. pub struct Service { //libp2p_service: Arc>, -//libp2p_thread: oneshot::Sender<()>, -//message_handler: MessageHandler, -//message_handler_send: Sender, + libp2p_exit: oneshot::Sender<()>, + network_send: crossbeam_channel::Sender, + //message_handler: MessageHandler, + //message_handler_send: Sender, } impl Service { pub fn new( config: NetworkConfig, + executor: TaskExecutor, log: slog::Logger, ) -> error::Result<(Arc, Sender)> { - debug!(log, "Service starting"); - let (network_send, network_recv) = channel::(); - // launch message handler thread let message_handler_log = log.new(o!("Service" => "MessageHandler")); - let message_handler_send = MessageHandler::new(message_handler_log); + let message_handler_send = MessageHandler::new(message_handler_log)?; // launch libp2p service let libp2p_log = log.new(o!("Service" => "Libp2p")); - let libp2p_service = LibP2PService::new(config, libp2p_log); + let libp2p_service = LibP2PService::new(config, libp2p_log)?; // TODO: Spawn thread to handle libp2p messages and pass to message handler thread. - let network = Service {}; + let (network_send, libp2p_exit) = + spawn_service(libp2p_service, message_handler_send, executor, log)?; + let network = Service { + libp2p_exit, + network_send: network_send.clone(), + }; Ok((Arc::new(network), network_send)) } + + // TODO: Testing only + pub fn send_message(&self, message: String) { + let node_message = NodeMessage::Message(message); + self.network_send + .send(NetworkMessage::Send(PeerId::random(), node_message)); + } +} + +fn spawn_service( + libp2p_service: LibP2PService, + message_handler_send: crossbeam_channel::Sender, + executor: TaskExecutor, + log: slog::Logger, +) -> error::Result<( + crossbeam_channel::Sender, + oneshot::Sender<()>, +)> { + let (network_exit, exit_rx) = oneshot::channel(); + let (network_send, network_recv) = channel::(); + + // spawn on the current executor + executor.spawn( + network_service( + libp2p_service, + network_recv, + message_handler_send, + log.clone(), + ) + // allow for manual termination + .select(exit_rx.then(|_| Ok(()))) + .then(move |_| { + debug!(log.clone(), "Network service ended"); + Ok(()) + }), + ); + + Ok((network_send, network_exit)) +} + +fn network_service( + mut libp2p_service: LibP2PService, + network_recv: crossbeam_channel::Receiver, + message_handler_send: crossbeam_channel::Sender, + log: slog::Logger, +) -> impl futures::Future { + futures::future::poll_fn(move || -> Result<_, libp2p::error::Error> { + // poll the swarm + loop { + match libp2p_service.poll() { + Ok(Async::Ready(Some(Libp2pEvent::Message(m)))) => debug!( + libp2p_service.log, + "Network Service: Message received: {}", m + ), + _ => break, + } + } + // poll the network channel + // TODO: refactor - combine poll_fn's? + loop { + match network_recv.try_recv() { + // TODO: Testing message - remove + Ok(NetworkMessage::Send(_peer_id, node_message)) => { + match node_message { + NodeMessage::Message(m) => { + debug!(log, "Message received via network channel: {:?}", m); + //TODO: Make swarm private + //TODO: Implement correct peer id topic message handling + libp2p_service.swarm.send_message(m); + } + //TODO: Handle all NodeMessage types + _ => break, + }; + } + Err(TryRecvError::Empty) => break, + Err(TryRecvError::Disconnected) => { + return Err(libp2p::error::Error::from("Network channel disconnected")); + } + // TODO: Implement all NetworkMessage + _ => break, + } + } + Ok(Async::NotReady) + }) } diff --git a/beacon_node/src/run.rs b/beacon_node/src/run.rs index f2a703cbc..cfae001a0 100644 --- a/beacon_node/src/run.rs +++ b/beacon_node/src/run.rs @@ -40,6 +40,8 @@ pub fn run_beacon_node(config: ClientConfig, log: slog::Logger) -> error::Result // perform global shutdown operations. info!(log, "Shutting down.."); exit_signal.fire(); + // shutdown the client + // client.exit_signal.fire(); drop(client); runtime.shutdown_on_idle().wait().unwrap(); Ok(())