lighthouse/beacon_node/network/src/service.rs

178 lines
6.1 KiB
Rust
Raw Normal View History

use crate::beacon_chain::BeaconChain;
use crate::error;
use crate::message_handler::{HandlerMessage, MessageHandler};
use crate::messages::NodeMessage;
use crate::NetworkConfig;
2019-03-12 06:28:11 +00:00
use crossbeam_channel::{unbounded as channel, Sender, TryRecvError};
use futures::prelude::*;
use futures::sync::oneshot;
2019-03-12 06:28:11 +00:00
use futures::Stream;
2019-03-18 12:34:44 +00:00
use libp2p::RPCEvent;
use libp2p::Service as LibP2PService;
2019-03-12 06:28:11 +00:00
use libp2p::{Libp2pEvent, PeerId};
2019-03-18 12:34:44 +00:00
use slog::{debug, info, o, trace};
use std::sync::Arc;
2019-03-12 06:28:11 +00:00
use tokio::runtime::TaskExecutor;
/// Service that handles communication between internal services and the libp2p network service.
pub struct Service {
//libp2p_service: Arc<Mutex<LibP2PService>>,
2019-03-12 06:28:11 +00:00
libp2p_exit: oneshot::Sender<()>,
network_send: crossbeam_channel::Sender<NetworkMessage>,
//message_handler: MessageHandler,
//message_handler_send: Sender<HandlerMessage>,
}
impl Service {
pub fn new(
beacon_chain: Arc<BeaconChain>,
config: &NetworkConfig,
executor: &TaskExecutor,
log: slog::Logger,
) -> error::Result<(Arc<Self>, Sender<NetworkMessage>)> {
// build the network channel
let (network_send, network_recv) = channel::<NetworkMessage>();
// launch message handler thread
let message_handler_log = log.new(o!("Service" => "MessageHandler"));
let message_handler_send = MessageHandler::new(
beacon_chain,
network_send.clone(),
executor,
message_handler_log,
)?;
// launch libp2p service
let libp2p_log = log.new(o!("Service" => "Libp2p"));
let libp2p_service = LibP2PService::new(config.clone(), libp2p_log)?;
// TODO: Spawn thread to handle libp2p messages and pass to message handler thread.
let libp2p_exit = spawn_service(
libp2p_service,
network_recv,
message_handler_send,
executor,
log,
)?;
2019-03-12 06:28:11 +00:00
let network = Service {
libp2p_exit,
network_send: network_send.clone(),
};
Ok((Arc::new(network), network_send))
}
2019-03-12 06:28:11 +00:00
// TODO: Testing only
pub fn send_message(&self, message: String) {
let node_message = NodeMessage::Message(message);
2019-03-18 12:34:44 +00:00
self.network_send.send(NetworkMessage::Send(
PeerId::random(),
OutgoingMessage::NotifierTest,
));
2019-03-12 06:28:11 +00:00
}
}
fn spawn_service(
libp2p_service: LibP2PService,
network_recv: crossbeam_channel::Receiver<NetworkMessage>,
2019-03-12 06:28:11 +00:00
message_handler_send: crossbeam_channel::Sender<HandlerMessage>,
executor: &TaskExecutor,
2019-03-12 06:28:11 +00:00
log: slog::Logger,
) -> error::Result<oneshot::Sender<()>> {
2019-03-12 06:28:11 +00:00
let (network_exit, exit_rx) = oneshot::channel();
// spawn on the current executor
executor.spawn(
network_service(
libp2p_service,
network_recv,
message_handler_send,
log.clone(),
)
// allow for manual termination
.select(exit_rx.then(|_| Ok(())))
.then(move |_| {
2019-03-18 07:22:01 +00:00
info!(log.clone(), "Network service shutdown");
2019-03-12 06:28:11 +00:00
Ok(())
}),
);
Ok(network_exit)
2019-03-12 06:28:11 +00:00
}
fn network_service(
mut libp2p_service: LibP2PService,
network_recv: crossbeam_channel::Receiver<NetworkMessage>,
message_handler_send: crossbeam_channel::Sender<HandlerMessage>,
log: slog::Logger,
) -> impl futures::Future<Item = (), Error = libp2p::error::Error> {
futures::future::poll_fn(move || -> Result<_, libp2p::error::Error> {
// poll the swarm
loop {
match libp2p_service.poll() {
Ok(Async::Ready(Some(Libp2pEvent::RPC(rpc_event)))) => {
debug!(
libp2p_service.log,
"RPC Event: Rpc message received: {:?}", rpc_event
);
message_handler_send
.send(HandlerMessage::RPC(rpc_event))
2019-03-18 12:34:44 +00:00
.map_err(|_| "failed to send rpc to handler")?;
}
Ok(Async::Ready(Some(Libp2pEvent::PeerDialed(peer_id)))) => {
debug!(libp2p_service.log, "Peer Dialed: {:?}", peer_id);
message_handler_send
.send(HandlerMessage::PeerDialed(peer_id))
2019-03-18 12:34:44 +00:00
.map_err(|_| "failed to send rpc to handler")?;
}
2019-03-12 06:28:11 +00:00
Ok(Async::Ready(Some(Libp2pEvent::Message(m)))) => debug!(
libp2p_service.log,
"Network Service: Message received: {}", m
),
_ => break,
}
}
// poll the network channel
// TODO: refactor - combine poll_fn's?
loop {
match network_recv.try_recv() {
// TODO: Testing message - remove
2019-03-18 12:34:44 +00:00
Ok(NetworkMessage::Send(peer_id, outgoing_message)) => {
match outgoing_message {
OutgoingMessage::RPC(rpc_event) => {
trace!(log, "Sending RPC Event: {:?}", rpc_event);
2019-03-12 06:28:11 +00:00
//TODO: Make swarm private
//TODO: Implement correct peer id topic message handling
2019-03-18 12:34:44 +00:00
libp2p_service.swarm.send_rpc(peer_id, rpc_event);
}
OutgoingMessage::NotifierTest => {
debug!(log, "Received message from notifier");
2019-03-12 06:28:11 +00:00
}
};
}
Err(TryRecvError::Empty) => break,
Err(TryRecvError::Disconnected) => {
return Err(libp2p::error::Error::from("Network channel disconnected"));
}
}
}
Ok(Async::NotReady)
})
}
/// Types of messages that the network service can receive.
#[derive(Debug, Clone)]
pub enum NetworkMessage {
/// Send a message to libp2p service.
//TODO: Define typing for messages across the wire
2019-03-18 12:34:44 +00:00
Send(PeerId, OutgoingMessage),
}
/// Payloads that can be pushed out through the network service.
#[derive(Clone, Debug)]
pub enum OutgoingMessage {
    /// An RPC request or response.
    RPC(RPCEvent),
    /// Test-only placeholder message.
    //TODO: Remove
    NotifierTest,
}