From 0625bb6b03ecaed807db54208d1c6749ceecc52d Mon Sep 17 00:00:00 2001 From: Age Manning Date: Mon, 18 Mar 2019 23:18:25 +1100 Subject: [PATCH] Add network channel into message handler --- beacon_node/network/src/message_handler.rs | 16 +++++++-- beacon_node/network/src/messages.rs | 11 ++----- beacon_node/network/src/service.rs | 38 +++++++++++++++------- 3 files changed, 44 insertions(+), 21 deletions(-) diff --git a/beacon_node/network/src/message_handler.rs b/beacon_node/network/src/message_handler.rs index b904993bb..02234f326 100644 --- a/beacon_node/network/src/message_handler.rs +++ b/beacon_node/network/src/message_handler.rs @@ -1,6 +1,7 @@ use crate::beacon_chain::BeaconChain; use crate::error; use crate::messages::NodeMessage; +use crate::service::NetworkMessage; use crossbeam_channel::{unbounded as channel, Sender}; use futures::future; use futures::prelude::*; @@ -22,6 +23,8 @@ pub struct MessageHandler { chain: Arc, /// The syncing framework. sync: SimpleSync, + /// The network channel to relay messages to the Network service. + network_send: crossbeam_channel::Sender, /// A mapping of peers we have sent a HELLO rpc request to. hello_requests: HashMap, /// The `MessageHandler` logger. @@ -45,6 +48,7 @@ impl MessageHandler { /// Initializes and runs the MessageHandler. pub fn new( beacon_chain: Arc, + network_send: crossbeam_channel::Sender, executor: &tokio::runtime::TaskExecutor, log: slog::Logger, ) -> error::Result> { @@ -62,6 +66,7 @@ impl MessageHandler { let mut handler = MessageHandler { chain: beacon_chain.clone(), sync, + network_send, hello_requests: HashMap::new(), log: log.clone(), }; @@ -81,12 +86,19 @@ impl MessageHandler { fn handle_message(&mut self, message: HandlerMessage) { match message { - HandlerMessage::PeerDialed(peer_id) => self.send_hello(peer_id), + HandlerMessage::PeerDialed(peer_id) => { + // register RPC request + self.hello_requests.insert(peer_id.clone(), Instant::now()); + self.send_hello(peer_id); + } //TODO: Handle all messages _ => {} } } /// Sends a HELLO RPC request to a newly connected peer. - fn send_hello(&self, peer_id: PeerId) {} + fn send_hello(&self, peer_id: PeerId) { + // send the hello request to the network + //sync.hello() + } } diff --git a/beacon_node/network/src/messages.rs b/beacon_node/network/src/messages.rs index 930c90b3e..6a69cbb87 100644 --- a/beacon_node/network/src/messages.rs +++ b/beacon_node/network/src/messages.rs @@ -2,7 +2,10 @@ use libp2p::PeerId; use libp2p::{HelloMessage, RPCEvent}; use types::{Hash256, Slot}; +//TODO: This module can be entirely replaced in the RPC rewrite + /// Messages between nodes across the network. +//TODO: Remove this in the RPC rewrite #[derive(Debug, Clone)] pub enum NodeMessage { RPC(RPCEvent), @@ -10,11 +13,3 @@ pub enum NodeMessage { // TODO: only for testing - remove Message(String), } - -/// Types of messages that the network service can receive. -#[derive(Debug, Clone)] -pub enum NetworkMessage { - /// Send a message to libp2p service. - //TODO: Define typing for messages across the wire - Send(PeerId, NodeMessage), -} diff --git a/beacon_node/network/src/service.rs b/beacon_node/network/src/service.rs index 21f948a71..7ad3bdb3e 100644 --- a/beacon_node/network/src/service.rs +++ b/beacon_node/network/src/service.rs @@ -1,7 +1,7 @@ use crate::beacon_chain::BeaconChain; use crate::error; use crate::message_handler::{HandlerMessage, MessageHandler}; -use crate::messages::{NetworkMessage, NodeMessage}; +use crate::messages::NodeMessage; use crate::NetworkConfig; use crossbeam_channel::{unbounded as channel, Sender, TryRecvError}; use futures::prelude::*; @@ -29,18 +29,29 @@ impl Service { executor: &TaskExecutor, log: slog::Logger, ) -> error::Result<(Arc, Sender)> { + // build the network channel + let (network_send, network_recv) = channel::(); // launch message handler thread let message_handler_log = log.new(o!("Service" => "MessageHandler")); - let message_handler_send = - MessageHandler::new(beacon_chain, executor, message_handler_log)?; + let message_handler_send = MessageHandler::new( + beacon_chain, + network_send.clone(), + executor, + message_handler_log, + )?; // launch libp2p service let libp2p_log = log.new(o!("Service" => "Libp2p")); let libp2p_service = LibP2PService::new(config.clone(), libp2p_log)?; // TODO: Spawn thread to handle libp2p messages and pass to message handler thread. - let (network_send, libp2p_exit) = - spawn_service(libp2p_service, message_handler_send, executor, log)?; + let libp2p_exit = spawn_service( + libp2p_service, + network_recv, + message_handler_send, + executor, + log, + )?; let network = Service { libp2p_exit, network_send: network_send.clone(), @@ -59,15 +70,12 @@ impl Service { fn spawn_service( libp2p_service: LibP2PService, + network_recv: crossbeam_channel::Receiver, message_handler_send: crossbeam_channel::Sender, executor: &TaskExecutor, log: slog::Logger, -) -> error::Result<( - crossbeam_channel::Sender, - oneshot::Sender<()>, -)> { +) -> error::Result> { let (network_exit, exit_rx) = oneshot::channel(); - let (network_send, network_recv) = channel::(); // spawn on the current executor executor.spawn( @@ -85,7 +93,7 @@ fn spawn_service( }), ); - Ok((network_send, network_exit)) + Ok(network_exit) } fn network_service( @@ -148,3 +156,11 @@ fn network_service( Ok(Async::NotReady) }) } + +/// Types of messages that the network service can receive. +#[derive(Debug, Clone)] +pub enum NetworkMessage { + /// Send a message to libp2p service. + //TODO: Define typing for messages across the wire + Send(PeerId, NodeMessage), +}