Implements RPC call functionality

This commit is contained in:
Age Manning 2019-03-18 23:34:44 +11:00
parent 0625bb6b03
commit 8ec0688cb9
No known key found for this signature in database
GPG Key ID: 05EED64B79E06A93
3 changed files with 34 additions and 26 deletions

View File

@ -72,14 +72,16 @@ impl<TSubstream: AsyncRead + AsyncWrite> Behaviour<TSubstream> {
} }
} }
/// Implements the combined behaviour for the libp2p service.
impl<TSubstream: AsyncRead + AsyncWrite> Behaviour<TSubstream> { impl<TSubstream: AsyncRead + AsyncWrite> Behaviour<TSubstream> {
/// Subscribes to a gossipsub topic.
pub fn subscribe(&mut self, topic: Topic) -> bool { pub fn subscribe(&mut self, topic: Topic) -> bool {
self.gossipsub.subscribe(topic) self.gossipsub.subscribe(topic)
} }
pub fn send_message(&self, message: String) { /// Sends an RPC Request/Response via the RPC protocol.
// TODO: Encode and send via gossipsub pub fn send_rpc(&mut self, peer_id: PeerId, rpc_event: RPCEvent) {
self.serenity_rpc.send_rpc(peer_id, rpc_event);
} }
} }

View File

@ -13,7 +13,7 @@ use libp2p::core::swarm::{
use libp2p::{Multiaddr, PeerId}; use libp2p::{Multiaddr, PeerId};
pub use methods::{HelloMessage, RPCMethod, RPCRequest, RPCResponse}; pub use methods::{HelloMessage, RPCMethod, RPCRequest, RPCResponse};
pub use protocol::{RPCEvent, RPCProtocol}; pub use protocol::{RPCEvent, RPCProtocol};
use slog::{debug, o, Logger}; use slog::{debug, o};
use std::marker::PhantomData; use std::marker::PhantomData;
use tokio::io::{AsyncRead, AsyncWrite}; use tokio::io::{AsyncRead, AsyncWrite};
@ -40,15 +40,10 @@ impl<TSubstream> Rpc<TSubstream> {
} }
/// Submits and RPC request. /// Submits and RPC request.
pub fn send_request(&mut self, peer_id: PeerId, id: u64, method_id: u16, body: RPCRequest) { pub fn send_rpc(&mut self, peer_id: PeerId, rpc_event: RPCEvent) {
let request = RPCEvent::Request {
id,
method_id,
body,
};
self.events.push(NetworkBehaviourAction::SendEvent { self.events.push(NetworkBehaviourAction::SendEvent {
peer_id, peer_id,
event: request, event: rpc_event,
}); });
} }
} }

View File

@ -7,9 +7,10 @@ use crossbeam_channel::{unbounded as channel, Sender, TryRecvError};
use futures::prelude::*; use futures::prelude::*;
use futures::sync::oneshot; use futures::sync::oneshot;
use futures::Stream; use futures::Stream;
use libp2p::RPCEvent;
use libp2p::Service as LibP2PService; use libp2p::Service as LibP2PService;
use libp2p::{Libp2pEvent, PeerId}; use libp2p::{Libp2pEvent, PeerId};
use slog::{debug, info, o}; use slog::{debug, info, o, trace};
use std::sync::Arc; use std::sync::Arc;
use tokio::runtime::TaskExecutor; use tokio::runtime::TaskExecutor;
@ -63,8 +64,10 @@ impl Service {
// TODO: Testing only // TODO: Testing only
pub fn send_message(&self, message: String) { pub fn send_message(&self, message: String) {
let node_message = NodeMessage::Message(message); let node_message = NodeMessage::Message(message);
self.network_send self.network_send.send(NetworkMessage::Send(
.send(NetworkMessage::Send(PeerId::random(), node_message)); PeerId::random(),
OutgoingMessage::NotifierTest,
));
} }
} }
@ -113,13 +116,13 @@ fn network_service(
); );
message_handler_send message_handler_send
.send(HandlerMessage::RPC(rpc_event)) .send(HandlerMessage::RPC(rpc_event))
.map_err(|_| "failed to send rpc to handler"); .map_err(|_| "failed to send rpc to handler")?;
} }
Ok(Async::Ready(Some(Libp2pEvent::PeerDialed(peer_id)))) => { Ok(Async::Ready(Some(Libp2pEvent::PeerDialed(peer_id)))) => {
debug!(libp2p_service.log, "Peer Dialed: {:?}", peer_id); debug!(libp2p_service.log, "Peer Dialed: {:?}", peer_id);
message_handler_send message_handler_send
.send(HandlerMessage::PeerDialed(peer_id)) .send(HandlerMessage::PeerDialed(peer_id))
.map_err(|_| "failed to send rpc to handler"); .map_err(|_| "failed to send rpc to handler")?;
} }
Ok(Async::Ready(Some(Libp2pEvent::Message(m)))) => debug!( Ok(Async::Ready(Some(Libp2pEvent::Message(m)))) => debug!(
libp2p_service.log, libp2p_service.log,
@ -133,24 +136,23 @@ fn network_service(
loop { loop {
match network_recv.try_recv() { match network_recv.try_recv() {
// TODO: Testing message - remove // TODO: Testing message - remove
Ok(NetworkMessage::Send(_peer_id, node_message)) => { Ok(NetworkMessage::Send(peer_id, outgoing_message)) => {
match node_message { match outgoing_message {
NodeMessage::Message(m) => { OutgoingMessage::RPC(rpc_event) => {
debug!(log, "Message received via network channel: {:?}", m); trace!(log, "Sending RPC Event: {:?}", rpc_event);
//TODO: Make swarm private //TODO: Make swarm private
//TODO: Implement correct peer id topic message handling //TODO: Implement correct peer id topic message handling
libp2p_service.swarm.send_message(m); libp2p_service.swarm.send_rpc(peer_id, rpc_event);
}
OutgoingMessage::NotifierTest => {
debug!(log, "Received message from notifier");
} }
//TODO: Handle all NodeMessage types
_ => break,
}; };
} }
Err(TryRecvError::Empty) => break, Err(TryRecvError::Empty) => break,
Err(TryRecvError::Disconnected) => { Err(TryRecvError::Disconnected) => {
return Err(libp2p::error::Error::from("Network channel disconnected")); return Err(libp2p::error::Error::from("Network channel disconnected"));
} }
// TODO: Implement all NetworkMessage
_ => break,
} }
} }
Ok(Async::NotReady) Ok(Async::NotReady)
@ -162,5 +164,14 @@ fn network_service(
pub enum NetworkMessage { pub enum NetworkMessage {
/// Send a message to libp2p service. /// Send a message to libp2p service.
//TODO: Define typing for messages across the wire //TODO: Define typing for messages across the wire
Send(PeerId, NodeMessage), Send(PeerId, OutgoingMessage),
}
/// Type of outgoing messages that can be sent through the network service.
#[derive(Debug, Clone)]
pub enum OutgoingMessage {
/// Send an RPC request/response.
RPC(RPCEvent),
//TODO: Remove
NotifierTest,
} }