diff --git a/beacon_node/libp2p/src/behaviour.rs b/beacon_node/libp2p/src/behaviour.rs index 604b84c8f..f0a89027b 100644 --- a/beacon_node/libp2p/src/behaviour.rs +++ b/beacon_node/libp2p/src/behaviour.rs @@ -72,14 +72,16 @@ impl Behaviour { } } +/// Implements the combined behaviour for the libp2p service. impl Behaviour { + /// Subscribes to a gossipsub topic. pub fn subscribe(&mut self, topic: Topic) -> bool { self.gossipsub.subscribe(topic) } - pub fn send_message(&self, message: String) { - // TODO: Encode and send via gossipsub - + /// Sends an RPC Request/Response via the RPC protocol. + pub fn send_rpc(&mut self, peer_id: PeerId, rpc_event: RPCEvent) { + self.serenity_rpc.send_rpc(peer_id, rpc_event); } } diff --git a/beacon_node/libp2p/src/rpc/mod.rs b/beacon_node/libp2p/src/rpc/mod.rs index d40e53935..907e95763 100644 --- a/beacon_node/libp2p/src/rpc/mod.rs +++ b/beacon_node/libp2p/src/rpc/mod.rs @@ -13,7 +13,7 @@ use libp2p::core::swarm::{ use libp2p::{Multiaddr, PeerId}; pub use methods::{HelloMessage, RPCMethod, RPCRequest, RPCResponse}; pub use protocol::{RPCEvent, RPCProtocol}; -use slog::{debug, o, Logger}; +use slog::{debug, o}; use std::marker::PhantomData; use tokio::io::{AsyncRead, AsyncWrite}; @@ -40,15 +40,10 @@ impl Rpc { } /// Submits and RPC request. - pub fn send_request(&mut self, peer_id: PeerId, id: u64, method_id: u16, body: RPCRequest) { - let request = RPCEvent::Request { - id, - method_id, - body, - }; + pub fn send_rpc(&mut self, peer_id: PeerId, rpc_event: RPCEvent) { self.events.push(NetworkBehaviourAction::SendEvent { peer_id, - event: request, + event: rpc_event, }); } } diff --git a/beacon_node/network/src/service.rs b/beacon_node/network/src/service.rs index 7ad3bdb3e..4cb1038d1 100644 --- a/beacon_node/network/src/service.rs +++ b/beacon_node/network/src/service.rs @@ -7,9 +7,10 @@ use crossbeam_channel::{unbounded as channel, Sender, TryRecvError}; use futures::prelude::*; use futures::sync::oneshot; use futures::Stream; +use libp2p::RPCEvent; use libp2p::Service as LibP2PService; use libp2p::{Libp2pEvent, PeerId}; -use slog::{debug, info, o}; +use slog::{debug, info, o, trace}; use std::sync::Arc; use tokio::runtime::TaskExecutor; @@ -63,8 +64,10 @@ impl Service { // TODO: Testing only pub fn send_message(&self, message: String) { let node_message = NodeMessage::Message(message); - self.network_send - .send(NetworkMessage::Send(PeerId::random(), node_message)); + self.network_send.send(NetworkMessage::Send( + PeerId::random(), + OutgoingMessage::NotifierTest, + )); } } @@ -113,13 +116,13 @@ fn network_service( ); message_handler_send .send(HandlerMessage::RPC(rpc_event)) - .map_err(|_| "failed to send rpc to handler"); + .map_err(|_| "failed to send rpc to handler")?; } Ok(Async::Ready(Some(Libp2pEvent::PeerDialed(peer_id)))) => { debug!(libp2p_service.log, "Peer Dialed: {:?}", peer_id); message_handler_send .send(HandlerMessage::PeerDialed(peer_id)) - .map_err(|_| "failed to send rpc to handler"); + .map_err(|_| "failed to send rpc to handler")?; } Ok(Async::Ready(Some(Libp2pEvent::Message(m)))) => debug!( libp2p_service.log, @@ -133,24 +136,23 @@ fn network_service( loop { match network_recv.try_recv() { // TODO: Testing message - remove - Ok(NetworkMessage::Send(_peer_id, node_message)) => { - match node_message { - NodeMessage::Message(m) => { - debug!(log, "Message received via network channel: {:?}", m); + Ok(NetworkMessage::Send(peer_id, outgoing_message)) => { + match outgoing_message { + OutgoingMessage::RPC(rpc_event) => { + trace!(log, "Sending RPC Event: {:?}", rpc_event); //TODO: Make swarm private //TODO: Implement correct peer id topic message handling - libp2p_service.swarm.send_message(m); + libp2p_service.swarm.send_rpc(peer_id, rpc_event); + } + OutgoingMessage::NotifierTest => { + debug!(log, "Received message from notifier"); } - //TODO: Handle all NodeMessage types - _ => break, }; } Err(TryRecvError::Empty) => break, Err(TryRecvError::Disconnected) => { return Err(libp2p::error::Error::from("Network channel disconnected")); } - // TODO: Implement all NetworkMessage - _ => break, } } Ok(Async::NotReady) @@ -162,5 +164,14 @@ fn network_service( pub enum NetworkMessage { /// Send a message to libp2p service. //TODO: Define typing for messages across the wire - Send(PeerId, NodeMessage), + Send(PeerId, OutgoingMessage), +} + +/// Type of outgoing messages that can be sent through the network service. +#[derive(Debug, Clone)] +pub enum OutgoingMessage { + /// Send an RPC request/response. + RPC(RPCEvent), + //TODO: Remove + NotifierTest, }