Builds RPC infrastructure to handle RPC responses

This commit is contained in:
Age Manning 2019-03-19 12:47:36 +11:00
parent 31333e8f8e
commit 2657dc1465
No known key found for this signature in database
GPG Key ID: 05EED64B79E06A93
5 changed files with 42 additions and 14 deletions

View File

@ -46,7 +46,9 @@ impl<TSubstream: AsyncRead + AsyncWrite> NetworkBehaviourEventProcess<RPCMessage
RPCMessage::PeerDialed(peer_id) => {
self.events.push(BehaviourEvent::PeerDialed(peer_id))
}
RPCMessage::RPC(rpc_event) => self.events.push(BehaviourEvent::RPC(rpc_event)),
RPCMessage::RPC(peer_id, rpc_event) => {
self.events.push(BehaviourEvent::RPC(peer_id, rpc_event))
}
}
}
}
@ -87,7 +89,7 @@ impl<TSubstream: AsyncRead + AsyncWrite> Behaviour<TSubstream> {
/// The types of events than can be obtained from polling the behaviour.
pub enum BehaviourEvent {
RPC(RPCEvent),
RPC(PeerId, RPCEvent),
PeerDialed(PeerId),
// TODO: This is a stub at the moment
Message(String),

View File

@ -76,7 +76,7 @@ where
fn inject_node_event(
&mut self,
_source: PeerId,
source: PeerId,
event: <Self::ProtocolsHandler as ProtocolsHandler>::OutEvent,
) {
// ignore successful send events
@ -88,7 +88,7 @@ where
// send the event to the user
self.events
.push(NetworkBehaviourAction::GenerateEvent(RPCMessage::RPC(
event,
source, event,
)));
}
@ -110,7 +110,7 @@ where
/// Messages sent to the user from the RPC protocol.
pub enum RPCMessage {
RPC(RPCEvent),
RPC(PeerId, RPCEvent),
PeerDialed(PeerId),
}

View File

@ -105,8 +105,8 @@ impl Stream for Service {
debug!(self.log, "Message received: {}", m);
return Ok(Async::Ready(Some(Libp2pEvent::Message(m))));
}
Ok(Async::Ready(Some(BehaviourEvent::RPC(event)))) => {
return Ok(Async::Ready(Some(Libp2pEvent::RPC(event))));
Ok(Async::Ready(Some(BehaviourEvent::RPC(peer_id, event)))) => {
return Ok(Async::Ready(Some(Libp2pEvent::RPC(peer_id, event))));
}
Ok(Async::Ready(Some(BehaviourEvent::PeerDialed(peer_id)))) => {
return Ok(Async::Ready(Some(Libp2pEvent::PeerDialed(peer_id))));
@ -158,7 +158,7 @@ fn build_transport(
/// Events that can be obtained from polling the Libp2p Service.
pub enum Libp2pEvent {
// We have received an RPC event on the swarm
RPC(RPCEvent),
RPC(PeerId, RPCEvent),
PeerDialed(PeerId),
Message(String),
}

View File

@ -10,8 +10,8 @@ use libp2p::{
rpc::{RPCMethod, RPCRequest, RPCResponse},
PeerId, RPCEvent,
};
use slog::debug;
use slog::warn;
use slog::{debug, trace};
use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, Instant};
@ -54,7 +54,7 @@ pub enum HandlerMessage {
/// A Node message has been received.
Message(PeerId, NodeMessage),
/// An RPC response/request has been received.
RPC(RPCEvent),
RPC(PeerId, RPCEvent),
}
impl MessageHandler {
@ -98,14 +98,39 @@ impl MessageHandler {
fn handle_message(&mut self, message: HandlerMessage) {
match message {
// we have initiated a connection to a peer
HandlerMessage::PeerDialed(peer_id) => {
self.send_hello_request(peer_id);
}
// we have received an RPC message request/response
HandlerMessage::RPC(peer_id, rpc_event) => {
self.handle_rpc_message(peer_id, rpc_event);
}
//TODO: Handle all messages
_ => {}
}
}
fn handle_rpc_message(&mut self, peer_id: PeerId, rpc_message: RPCEvent) {
match rpc_message {
RPCEvent::Request {
id,
method_id: _,
body,
} => self.handle_rpc_request(peer_id, id, body),
RPCEvent::Response {
id,
method_id: _,
result,
} => self.handle_rpc_response(peer_id, id, result),
}
}
fn handle_rpc_request(&mut self, peer_id: PeerId, id: u64, request: RPCRequest) {}
// we match on id and ignore responses past the timeout.
fn handle_rpc_response(&mut self, peer_id: PeerId, id: u64, response: RPCResponse) {}
/// Sends a HELLO RPC request to a newly connected peer.
fn send_hello_request(&mut self, peer_id: PeerId) {
// generate a unique id for the peer
@ -136,10 +161,11 @@ impl MessageHandler {
};
// send the hello request to the network
trace!(self.log, "Sending HELLO message to peer {:?}", peer_id);
self.send_rpc(peer_id, rpc_event);
}
/// Sends and RPC response
/// Sends an RPC request/response to the network server.
fn send_rpc(&self, peer_id: PeerId, rpc_event: RPCEvent) {
self.network_send
.send(NetworkMessage::Send(

View File

@ -109,13 +109,13 @@ fn network_service(
// poll the swarm
loop {
match libp2p_service.poll() {
Ok(Async::Ready(Some(Libp2pEvent::RPC(rpc_event)))) => {
Ok(Async::Ready(Some(Libp2pEvent::RPC(peer_id, rpc_event)))) => {
debug!(
libp2p_service.log,
"RPC Event: Rpc message received: {:?}", rpc_event
"RPC Event: RPC message received: {:?}", rpc_event
);
message_handler_send
.send(HandlerMessage::RPC(rpc_event))
.send(HandlerMessage::RPC(peer_id, rpc_event))
.map_err(|_| "failed to send rpc to handler")?;
}
Ok(Async::Ready(Some(Libp2pEvent::PeerDialed(peer_id)))) => {