From 2657dc1465d7ac746c5c92394a68f0d76eef5a23 Mon Sep 17 00:00:00 2001 From: Age Manning Date: Tue, 19 Mar 2019 12:47:36 +1100 Subject: [PATCH] Builds RPC infrastructure to handle RPC responses --- beacon_node/libp2p/src/behaviour.rs | 6 ++-- beacon_node/libp2p/src/rpc/mod.rs | 6 ++-- beacon_node/libp2p/src/service.rs | 6 ++-- beacon_node/network/src/message_handler.rs | 32 ++++++++++++++++++++-- beacon_node/network/src/service.rs | 6 ++-- 5 files changed, 42 insertions(+), 14 deletions(-) diff --git a/beacon_node/libp2p/src/behaviour.rs b/beacon_node/libp2p/src/behaviour.rs index f0a89027b..78d013002 100644 --- a/beacon_node/libp2p/src/behaviour.rs +++ b/beacon_node/libp2p/src/behaviour.rs @@ -46,7 +46,9 @@ impl NetworkBehaviourEventProcess { self.events.push(BehaviourEvent::PeerDialed(peer_id)) } - RPCMessage::RPC(rpc_event) => self.events.push(BehaviourEvent::RPC(rpc_event)), + RPCMessage::RPC(peer_id, rpc_event) => { + self.events.push(BehaviourEvent::RPC(peer_id, rpc_event)) + } } } } @@ -87,7 +89,7 @@ impl Behaviour { /// The types of events than can be obtained from polling the behaviour. pub enum BehaviourEvent { - RPC(RPCEvent), + RPC(PeerId, RPCEvent), PeerDialed(PeerId), // TODO: This is a stub at the moment Message(String), diff --git a/beacon_node/libp2p/src/rpc/mod.rs b/beacon_node/libp2p/src/rpc/mod.rs index 907e95763..e06f4effc 100644 --- a/beacon_node/libp2p/src/rpc/mod.rs +++ b/beacon_node/libp2p/src/rpc/mod.rs @@ -76,7 +76,7 @@ where fn inject_node_event( &mut self, - _source: PeerId, + source: PeerId, event: ::OutEvent, ) { // ignore successful send events @@ -88,7 +88,7 @@ where // send the event to the user self.events .push(NetworkBehaviourAction::GenerateEvent(RPCMessage::RPC( - event, + source, event, ))); } @@ -110,7 +110,7 @@ where /// Messages sent to the user from the RPC protocol. pub enum RPCMessage { - RPC(RPCEvent), + RPC(PeerId, RPCEvent), PeerDialed(PeerId), } diff --git a/beacon_node/libp2p/src/service.rs b/beacon_node/libp2p/src/service.rs index dd6deabad..92e6e8897 100644 --- a/beacon_node/libp2p/src/service.rs +++ b/beacon_node/libp2p/src/service.rs @@ -105,8 +105,8 @@ impl Stream for Service { debug!(self.log, "Message received: {}", m); return Ok(Async::Ready(Some(Libp2pEvent::Message(m)))); } - Ok(Async::Ready(Some(BehaviourEvent::RPC(event)))) => { - return Ok(Async::Ready(Some(Libp2pEvent::RPC(event)))); + Ok(Async::Ready(Some(BehaviourEvent::RPC(peer_id, event)))) => { + return Ok(Async::Ready(Some(Libp2pEvent::RPC(peer_id, event)))); } Ok(Async::Ready(Some(BehaviourEvent::PeerDialed(peer_id)))) => { return Ok(Async::Ready(Some(Libp2pEvent::PeerDialed(peer_id)))); @@ -158,7 +158,7 @@ fn build_transport( /// Events that can be obtained from polling the Libp2p Service. pub enum Libp2pEvent { // We have received an RPC event on the swarm - RPC(RPCEvent), + RPC(PeerId, RPCEvent), PeerDialed(PeerId), Message(String), } diff --git a/beacon_node/network/src/message_handler.rs b/beacon_node/network/src/message_handler.rs index dcc145294..11ce3d4c0 100644 --- a/beacon_node/network/src/message_handler.rs +++ b/beacon_node/network/src/message_handler.rs @@ -10,8 +10,8 @@ use libp2p::{ rpc::{RPCMethod, RPCRequest, RPCResponse}, PeerId, RPCEvent, }; -use slog::debug; use slog::warn; +use slog::{debug, trace}; use std::collections::HashMap; use std::sync::Arc; use std::time::{Duration, Instant}; @@ -54,7 +54,7 @@ pub enum HandlerMessage { /// A Node message has been received. Message(PeerId, NodeMessage), /// An RPC response/request has been received. - RPC(RPCEvent), + RPC(PeerId, RPCEvent), } impl MessageHandler { @@ -98,14 +98,39 @@ impl MessageHandler { fn handle_message(&mut self, message: HandlerMessage) { match message { + // we have initiated a connection to a peer HandlerMessage::PeerDialed(peer_id) => { self.send_hello_request(peer_id); } + // we have received an RPC message request/response + HandlerMessage::RPC(peer_id, rpc_event) => { + self.handle_rpc_message(peer_id, rpc_event); + } //TODO: Handle all messages _ => {} } } + fn handle_rpc_message(&mut self, peer_id: PeerId, rpc_message: RPCEvent) { + match rpc_message { + RPCEvent::Request { + id, + method_id: _, + body, + } => self.handle_rpc_request(peer_id, id, body), + RPCEvent::Response { + id, + method_id: _, + result, + } => self.handle_rpc_response(peer_id, id, result), + } + } + + fn handle_rpc_request(&mut self, peer_id: PeerId, id: u64, request: RPCRequest) {} + + // we match on id and ignore responses past the timeout. + fn handle_rpc_response(&mut self, peer_id: PeerId, id: u64, response: RPCResponse) {} + /// Sends a HELLO RPC request to a newly connected peer. fn send_hello_request(&mut self, peer_id: PeerId) { // generate a unique id for the peer @@ -136,10 +161,11 @@ impl MessageHandler { }; // send the hello request to the network + trace!(self.log, "Sending HELLO message to peer {:?}", peer_id); self.send_rpc(peer_id, rpc_event); } - /// Sends and RPC response + /// Sends an RPC request/response to the network server. fn send_rpc(&self, peer_id: PeerId, rpc_event: RPCEvent) { self.network_send .send(NetworkMessage::Send( diff --git a/beacon_node/network/src/service.rs b/beacon_node/network/src/service.rs index 4cb1038d1..84e46e707 100644 --- a/beacon_node/network/src/service.rs +++ b/beacon_node/network/src/service.rs @@ -109,13 +109,13 @@ fn network_service( // poll the swarm loop { match libp2p_service.poll() { - Ok(Async::Ready(Some(Libp2pEvent::RPC(rpc_event)))) => { + Ok(Async::Ready(Some(Libp2pEvent::RPC(peer_id, rpc_event)))) => { debug!( libp2p_service.log, - "RPC Event: Rpc message received: {:?}", rpc_event + "RPC Event: RPC message received: {:?}", rpc_event ); message_handler_send - .send(HandlerMessage::RPC(rpc_event)) + .send(HandlerMessage::RPC(peer_id, rpc_event)) .map_err(|_| "failed to send rpc to handler")?; } Ok(Async::Ready(Some(Libp2pEvent::PeerDialed(peer_id)))) => {