use crate::beacon_chain::BeaconChain; use crate::error; use crate::message_handler::{HandlerMessage, MessageHandler}; use crate::messages::NodeMessage; use crate::NetworkConfig; use crossbeam_channel::{unbounded as channel, Sender, TryRecvError}; use futures::prelude::*; use futures::sync::oneshot; use futures::Stream; use libp2p::Service as LibP2PService; use libp2p::{Libp2pEvent, PeerId}; use slog::{debug, info, o}; use std::sync::Arc; use tokio::runtime::TaskExecutor; /// Service that handles communication between internal services and the libp2p network service. pub struct Service { //libp2p_service: Arc>, libp2p_exit: oneshot::Sender<()>, network_send: crossbeam_channel::Sender, //message_handler: MessageHandler, //message_handler_send: Sender, } impl Service { pub fn new( beacon_chain: Arc, config: &NetworkConfig, executor: &TaskExecutor, log: slog::Logger, ) -> error::Result<(Arc, Sender)> { // build the network channel let (network_send, network_recv) = channel::(); // launch message handler thread let message_handler_log = log.new(o!("Service" => "MessageHandler")); let message_handler_send = MessageHandler::new( beacon_chain, network_send.clone(), executor, message_handler_log, )?; // launch libp2p service let libp2p_log = log.new(o!("Service" => "Libp2p")); let libp2p_service = LibP2PService::new(config.clone(), libp2p_log)?; // TODO: Spawn thread to handle libp2p messages and pass to message handler thread. let libp2p_exit = spawn_service( libp2p_service, network_recv, message_handler_send, executor, log, )?; let network = Service { libp2p_exit, network_send: network_send.clone(), }; Ok((Arc::new(network), network_send)) } // TODO: Testing only pub fn send_message(&self, message: String) { let node_message = NodeMessage::Message(message); self.network_send .send(NetworkMessage::Send(PeerId::random(), node_message)); } } fn spawn_service( libp2p_service: LibP2PService, network_recv: crossbeam_channel::Receiver, message_handler_send: crossbeam_channel::Sender, executor: &TaskExecutor, log: slog::Logger, ) -> error::Result> { let (network_exit, exit_rx) = oneshot::channel(); // spawn on the current executor executor.spawn( network_service( libp2p_service, network_recv, message_handler_send, log.clone(), ) // allow for manual termination .select(exit_rx.then(|_| Ok(()))) .then(move |_| { info!(log.clone(), "Network service shutdown"); Ok(()) }), ); Ok(network_exit) } fn network_service( mut libp2p_service: LibP2PService, network_recv: crossbeam_channel::Receiver, message_handler_send: crossbeam_channel::Sender, log: slog::Logger, ) -> impl futures::Future { futures::future::poll_fn(move || -> Result<_, libp2p::error::Error> { // poll the swarm loop { match libp2p_service.poll() { Ok(Async::Ready(Some(Libp2pEvent::RPC(rpc_event)))) => { debug!( libp2p_service.log, "RPC Event: Rpc message received: {:?}", rpc_event ); message_handler_send .send(HandlerMessage::RPC(rpc_event)) .map_err(|_| "failed to send rpc to handler"); } Ok(Async::Ready(Some(Libp2pEvent::PeerDialed(peer_id)))) => { debug!(libp2p_service.log, "Peer Dialed: {:?}", peer_id); message_handler_send .send(HandlerMessage::PeerDialed(peer_id)) .map_err(|_| "failed to send rpc to handler"); } Ok(Async::Ready(Some(Libp2pEvent::Message(m)))) => debug!( libp2p_service.log, "Network Service: Message received: {}", m ), _ => break, } } // poll the network channel // TODO: refactor - combine poll_fn's? loop { match network_recv.try_recv() { // TODO: Testing message - remove Ok(NetworkMessage::Send(_peer_id, node_message)) => { match node_message { NodeMessage::Message(m) => { debug!(log, "Message received via network channel: {:?}", m); //TODO: Make swarm private //TODO: Implement correct peer id topic message handling libp2p_service.swarm.send_message(m); } //TODO: Handle all NodeMessage types _ => break, }; } Err(TryRecvError::Empty) => break, Err(TryRecvError::Disconnected) => { return Err(libp2p::error::Error::from("Network channel disconnected")); } // TODO: Implement all NetworkMessage _ => break, } } Ok(Async::NotReady) }) } /// Types of messages that the network service can receive. #[derive(Debug, Clone)] pub enum NetworkMessage { /// Send a message to libp2p service. //TODO: Define typing for messages across the wire Send(PeerId, NodeMessage), }