use crate::error; use crate::message_handler::{HandlerMessage, MessageHandler}; use crate::NetworkConfig; use beacon_chain::{BeaconChain, BeaconChainTypes}; use core::marker::PhantomData; use eth2_libp2p::Service as LibP2PService; use eth2_libp2p::{rpc::RPCRequest, Enr, Libp2pEvent, Multiaddr, PeerId, Swarm, Topic}; use eth2_libp2p::{PubsubMessage, RPCEvent}; use futures::prelude::*; use futures::Stream; use parking_lot::Mutex; use slog::{debug, info, trace}; use std::sync::Arc; use tokio::runtime::TaskExecutor; use tokio::sync::{mpsc, oneshot}; /// Service that handles communication between internal services and the eth2_libp2p network service. pub struct Service { libp2p_service: Arc>, libp2p_port: u16, _libp2p_exit: oneshot::Sender<()>, _network_send: mpsc::UnboundedSender, _phantom: PhantomData, } impl Service { pub fn new( beacon_chain: Arc>, config: &NetworkConfig, executor: &TaskExecutor, network_log: slog::Logger, ) -> error::Result<(Arc, mpsc::UnboundedSender)> { // build the network channel let (network_send, network_recv) = mpsc::unbounded_channel::(); // launch message handler thread let message_handler_send = MessageHandler::spawn( beacon_chain, network_send.clone(), executor, network_log.clone(), )?; // launch libp2p service let libp2p_service = Arc::new(Mutex::new(LibP2PService::new( config.clone(), network_log.clone(), )?)); let libp2p_exit = spawn_service( libp2p_service.clone(), network_recv, message_handler_send, executor, network_log, config.propagation_percentage, )?; let network_service = Service { libp2p_service, libp2p_port: config.libp2p_port, _libp2p_exit: libp2p_exit, _network_send: network_send.clone(), _phantom: PhantomData, }; Ok((Arc::new(network_service), network_send)) } /// Returns the local ENR from the underlying Discv5 behaviour that external peers may connect /// to. pub fn local_enr(&self) -> Enr { self.libp2p_service .lock() .swarm .discovery() .local_enr() .clone() } /// Returns the local libp2p PeerID. pub fn local_peer_id(&self) -> PeerId { self.libp2p_service.lock().local_peer_id.clone() } /// Returns the list of `Multiaddr` that the underlying libp2p instance is listening on. pub fn listen_multiaddrs(&self) -> Vec { Swarm::listeners(&self.libp2p_service.lock().swarm) .cloned() .collect() } /// Returns the libp2p port that this node has been configured to listen using. pub fn listen_port(&self) -> u16 { self.libp2p_port } /// Returns the number of libp2p connected peers. pub fn connected_peers(&self) -> usize { self.libp2p_service.lock().swarm.connected_peers() } /// Returns the set of `PeerId` that are connected via libp2p. pub fn connected_peer_set(&self) -> Vec { self.libp2p_service .lock() .swarm .discovery() .connected_peer_set() .iter() .cloned() .collect() } /// Provides a reference to the underlying libp2p service. pub fn libp2p_service(&self) -> Arc> { self.libp2p_service.clone() } } fn spawn_service( libp2p_service: Arc>, network_recv: mpsc::UnboundedReceiver, message_handler_send: mpsc::UnboundedSender, executor: &TaskExecutor, log: slog::Logger, propagation_percentage: Option, ) -> error::Result> { let (network_exit, exit_rx) = tokio::sync::oneshot::channel(); // spawn on the current executor executor.spawn( network_service( libp2p_service, network_recv, message_handler_send, log.clone(), propagation_percentage, ) // allow for manual termination .select(exit_rx.then(|_| Ok(()))) .then(move |_| { info!(log.clone(), "Network service shutdown"); Ok(()) }), ); Ok(network_exit) } //TODO: Potentially handle channel errors fn network_service( libp2p_service: Arc>, mut network_recv: mpsc::UnboundedReceiver, mut message_handler_send: mpsc::UnboundedSender, log: slog::Logger, propagation_percentage: Option, ) -> impl futures::Future { futures::future::poll_fn(move || -> Result<_, eth2_libp2p::error::Error> { // processes the network channel before processing the libp2p swarm loop { // poll the network channel match network_recv.poll() { Ok(Async::Ready(Some(message))) => match message { NetworkMessage::RPC(peer_id, rpc_event) => { trace!(log, "Sending RPC"; "rpc" => format!("{}", rpc_event)); libp2p_service.lock().swarm.send_rpc(peer_id, rpc_event); } NetworkMessage::Propagate { propagation_source, message_id, } => { // TODO: Remove this for mainnet // randomly prevents propagation let mut should_send = true; if let Some(percentage) = propagation_percentage { // not exact percentage but close enough let rand = rand::random::() % 100; if rand > percentage { // don't propagate should_send = false; } } if !should_send { info!(log, "Random filter did not propagate message"); } else { trace!(log, "Propagating gossipsub message"; "propagation_peer" => format!("{:?}", propagation_source), "message_id" => message_id.to_string(), ); libp2p_service .lock() .swarm .propagate_message(&propagation_source, message_id); } } NetworkMessage::Publish { topics, message } => { // TODO: Remove this for mainnet // randomly prevents propagation let mut should_send = true; if let Some(percentage) = propagation_percentage { // not exact percentage but close enough let rand = rand::random::() % 100; if rand > percentage { // don't propagate should_send = false; } } if !should_send { info!(log, "Random filter did not publish message"); } else { debug!(log, "Sending pubsub message"; "topics" => format!("{:?}",topics)); libp2p_service.lock().swarm.publish(&topics, message); } } NetworkMessage::Disconnect { peer_id } => { libp2p_service.lock().disconnect_and_ban_peer(peer_id); } }, Ok(Async::NotReady) => break, Ok(Async::Ready(None)) => { return Err(eth2_libp2p::error::Error::from("Network channel closed")); } Err(_) => { return Err(eth2_libp2p::error::Error::from("Network channel error")); } } } loop { // poll the swarm let mut locked_service = libp2p_service.lock(); match locked_service.poll() { Ok(Async::Ready(Some(event))) => match event { Libp2pEvent::RPC(peer_id, rpc_event) => { trace!(log, "Received RPC"; "rpc" => format!("{}", rpc_event)); // if we received a Goodbye message, drop and ban the peer if let RPCEvent::Request(_, RPCRequest::Goodbye(_)) = rpc_event { locked_service.disconnect_and_ban_peer(peer_id.clone()); }; message_handler_send .try_send(HandlerMessage::RPC(peer_id, rpc_event)) .map_err(|_| "Failed to send RPC to handler")?; } Libp2pEvent::PeerDialed(peer_id) => { debug!(log, "Peer Dialed"; "PeerID" => format!("{:?}", peer_id)); message_handler_send .try_send(HandlerMessage::PeerDialed(peer_id)) .map_err(|_| "Failed to send PeerDialed to handler")?; } Libp2pEvent::PeerDisconnected(peer_id) => { debug!(log, "Peer Disconnected"; "PeerID" => format!("{:?}", peer_id)); message_handler_send .try_send(HandlerMessage::PeerDisconnected(peer_id)) .map_err(|_| "Failed to send PeerDisconnected to handler")?; } Libp2pEvent::PubsubMessage { id, source, message, .. } => { message_handler_send .try_send(HandlerMessage::PubsubMessage(id, source, message)) .map_err(|_| "Failed to send pubsub message to handler")?; } Libp2pEvent::PeerSubscribed(_, _) => {} }, Ok(Async::Ready(None)) => unreachable!("Stream never ends"), Ok(Async::NotReady) => break, Err(_) => break, } } Ok(Async::NotReady) }) } /// Types of messages that the network service can receive. #[derive(Debug)] pub enum NetworkMessage { /// Send an RPC message to the libp2p service. RPC(PeerId, RPCEvent), /// Publish a message to gossipsub. Publish { topics: Vec, message: PubsubMessage, }, /// Propagate a received gossipsub message. Propagate { propagation_source: PeerId, message_id: String, }, /// Disconnect and bans a peer id. Disconnect { peer_id: PeerId }, }