diff --git a/beacon_node/eth2-libp2p/src/service.rs b/beacon_node/eth2-libp2p/src/service.rs index d23fae211..e68df2d38 100644 --- a/beacon_node/eth2-libp2p/src/service.rs +++ b/beacon_node/eth2-libp2p/src/service.rs @@ -167,9 +167,12 @@ fn build_transport( /// Events that can be obtained from polling the Libp2p Service. pub enum Libp2pEvent { - // We have received an RPC event on the swarm + /// An RPC response request has been received on the swarm. RPC(PeerId, RPCEvent), + /// Initiated the connection to a new peer. PeerDialed(PeerId), + /// Received information about a peer on the network. Identified(PeerId, IdentifyInfo), + // TODO: Pub-sub testing only. Message(String), } diff --git a/beacon_node/network/src/service.rs b/beacon_node/network/src/service.rs index 14f994e4a..a3eb6f0d9 100644 --- a/beacon_node/network/src/service.rs +++ b/beacon_node/network/src/service.rs @@ -15,8 +15,8 @@ use tokio::runtime::TaskExecutor; /// Service that handles communication between internal services and the eth2_libp2p network service. pub struct Service { - //eth2_libp2p_service: Arc>, - eth2_libp2p_exit: oneshot::Sender<()>, + //libp2p_service: Arc>, + libp2p_exit: oneshot::Sender<()>, network_send: crossbeam_channel::Sender, //message_handler: MessageHandler, //message_handler_send: Sender, @@ -40,20 +40,20 @@ impl Service { message_handler_log, )?; - // launch eth2_libp2p service - let eth2_libp2p_log = log.new(o!("Service" => "Libp2p")); - let eth2_libp2p_service = LibP2PService::new(config.clone(), eth2_libp2p_log)?; + // launch libp2p service + let libp2p_log = log.new(o!("Service" => "Libp2p")); + let libp2p_service = LibP2PService::new(config.clone(), libp2p_log)?; - // TODO: Spawn thread to handle eth2_libp2p messages and pass to message handler thread. - let eth2_libp2p_exit = spawn_service( - eth2_libp2p_service, + // TODO: Spawn thread to handle libp2p messages and pass to message handler thread. + let libp2p_exit = spawn_service( + libp2p_service, network_recv, message_handler_send, executor, log, )?; let network_service = Service { - eth2_libp2p_exit, + libp2p_exit, network_send: network_send.clone(), }; @@ -72,7 +72,7 @@ impl Service { } fn spawn_service( - eth2_libp2p_service: LibP2PService, + libp2p_service: LibP2PService, network_recv: crossbeam_channel::Receiver, message_handler_send: crossbeam_channel::Sender, executor: &TaskExecutor, @@ -83,7 +83,7 @@ fn spawn_service( // spawn on the current executor executor.spawn( network_service( - eth2_libp2p_service, + libp2p_service, network_recv, message_handler_send, log.clone(), @@ -100,7 +100,7 @@ fn spawn_service( } fn network_service( - mut eth2_libp2p_service: LibP2PService, + mut libp2p_service: LibP2PService, network_recv: crossbeam_channel::Receiver, message_handler_send: crossbeam_channel::Sender, log: slog::Logger, @@ -108,28 +108,34 @@ fn network_service( futures::future::poll_fn(move || -> Result<_, eth2_libp2p::error::Error> { // poll the swarm loop { - match eth2_libp2p_service.poll() { - Ok(Async::Ready(Some(Libp2pEvent::RPC(peer_id, rpc_event)))) => { - trace!( - eth2_libp2p_service.log, - "RPC Event: RPC message received: {:?}", - rpc_event - ); - message_handler_send - .send(HandlerMessage::RPC(peer_id, rpc_event)) - .map_err(|_| "failed to send rpc to handler")?; - } - Ok(Async::Ready(Some(Libp2pEvent::PeerDialed(peer_id)))) => { - debug!(eth2_libp2p_service.log, "Peer Dialed: {:?}", peer_id); - message_handler_send - .send(HandlerMessage::PeerDialed(peer_id)) - .map_err(|_| "failed to send rpc to handler")?; - } - Ok(Async::Ready(Some(Libp2pEvent::Message(m)))) => debug!( - eth2_libp2p_service.log, - "Network Service: Message received: {}", m - ), - _ => break, + match libp2p_service.poll() { + Ok(Async::Ready(Some(event))) => match event { + Libp2pEvent::RPC(peer_id, rpc_event) => { + trace!(log, "RPC Event: RPC message received: {:?}", rpc_event); + message_handler_send + .send(HandlerMessage::RPC(peer_id, rpc_event)) + .map_err(|_| "failed to send rpc to handler")?; + } + Libp2pEvent::PeerDialed(peer_id) => { + debug!(log, "Peer Dialed: {:?}", peer_id); + message_handler_send + .send(HandlerMessage::PeerDialed(peer_id)) + .map_err(|_| "failed to send rpc to handler")?; + } + Libp2pEvent::Identified(peer_id, info) => { + debug!( + log, + "We have identified peer: {:?} with {:?}", peer_id, info + ); + } + Libp2pEvent::Message(m) => debug!( + libp2p_service.log, + "Network Service: Message received: {}", m + ), + }, + Ok(Async::Ready(None)) => unreachable!("Stream never ends"), + Ok(Async::NotReady) => break, + Err(_) => break, } } // poll the network channel @@ -143,7 +149,7 @@ fn network_service( trace!(log, "Sending RPC Event: {:?}", rpc_event); //TODO: Make swarm private //TODO: Implement correct peer id topic message handling - eth2_libp2p_service.swarm.send_rpc(peer_id, rpc_event); + libp2p_service.swarm.send_rpc(peer_id, rpc_event); } OutgoingMessage::NotifierTest => { debug!(log, "Received message from notifier"); @@ -165,7 +171,7 @@ fn network_service( /// Types of messages that the network service can receive. #[derive(Debug, Clone)] pub enum NetworkMessage { - /// Send a message to eth2_libp2p service. + /// Send a message to libp2p service. //TODO: Define typing for messages across the wire Send(PeerId, OutgoingMessage), }