Basic networking service with channel

This commit is contained in:
Age Manning 2019-03-12 17:28:11 +11:00
parent 21032334ac
commit ae983a9347
No known key found for this signature in database
GPG Key ID: 05EED64B79E06A93
13 changed files with 224 additions and 29 deletions

View File

@ -22,9 +22,9 @@ use tokio::runtime::TaskExecutor;
pub struct Client<T: ClientTypes> { pub struct Client<T: ClientTypes> {
config: ClientConfig, config: ClientConfig,
// beacon_chain: Arc<BeaconChain<T, U, F>>, // beacon_chain: Arc<BeaconChain<T, U, F>>,
network: Option<Arc<NetworkService>>, pub network: Arc<NetworkService>,
exit: exit_future::Exit, pub exit: exit_future::Exit,
exit_signal: Option<Signal>, pub exit_signal: Signal,
log: slog::Logger, log: slog::Logger,
phantom: PhantomData<T>, phantom: PhantomData<T>,
} }
@ -44,14 +44,15 @@ impl<T: ClientTypes> Client<T> {
// TODO: Add beacon_chain reference to network parameters // TODO: Add beacon_chain reference to network parameters
let network_config = config.net_conf.clone(); let network_config = config.net_conf.clone();
let network_logger = log.new(o!("Service" => "Network")); let network_logger = log.new(o!("Service" => "Network"));
let (network, network_send) = NetworkService::new(network_config, network_logger)?; let (network, network_send) =
NetworkService::new(network_config, executor, network_logger)?;
Ok(Client { Ok(Client {
config, config,
exit, exit,
exit_signal: Some(exit_signal), exit_signal: exit_signal,
log, log,
network: Some(network), network: network,
phantom: PhantomData, phantom: PhantomData,
}) })
} }

View File

@ -4,9 +4,10 @@ use db::ClientDB;
use exit_future::Exit; use exit_future::Exit;
use fork_choice::ForkChoice; use fork_choice::ForkChoice;
use futures::{Future, Stream}; use futures::{Future, Stream};
use network::NodeMessage;
use slog::{debug, info, o}; use slog::{debug, info, o};
use slot_clock::SlotClock; use slot_clock::SlotClock;
use std::sync::Arc; use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use tokio::runtime::TaskExecutor; use tokio::runtime::TaskExecutor;
use tokio::timer::Interval; use tokio::timer::Interval;
@ -19,9 +20,21 @@ pub fn run<T: ClientTypes>(client: &Client<T>, executor: TaskExecutor, exit: Exi
let log = client.log.new(o!("Service" => "Notifier")); let log = client.log.new(o!("Service" => "Notifier"));
// TODO: Debugging only
let counter = Arc::new(Mutex::new(0));
let network = client.network.clone();
// build heartbeat logic here // build heartbeat logic here
let heartbeat = move |_| { let heartbeat = move |_| {
info!(log, "Temp heartbeat output"); info!(log, "Temp heartbeat output");
let mut count = counter.lock().unwrap();
*count += 1;
if *count % 5 == 0 {
debug!(log, "Sending Message");
network.send_message(String::from("Testing network channel"))
}
Ok(()) Ok(())
}; };

View File

@ -12,3 +12,4 @@ slog = "2.4.1"
version = { path = "../version" } version = { path = "../version" }
tokio = "0.1.16" tokio = "0.1.16"
futures = "0.1.25" futures = "0.1.25"
error-chain = "0.12.0"

View File

@ -1,7 +1,7 @@
use futures::prelude::*; use futures::prelude::*;
use libp2p::{ use libp2p::{
core::swarm::{NetworkBehaviourAction, NetworkBehaviourEventProcess}, core::swarm::{NetworkBehaviourAction, NetworkBehaviourEventProcess},
gossipsub::{Gossipsub, GossipsubConfig, GossipsubEvent, GossipsubRpc}, gossipsub::{Gossipsub, GossipsubConfig, GossipsubEvent},
tokio_io::{AsyncRead, AsyncWrite}, tokio_io::{AsyncRead, AsyncWrite},
NetworkBehaviour, PeerId, NetworkBehaviour, PeerId,
}; };
@ -9,13 +9,14 @@ use libp2p::{
/// Builds the network behaviour for the libp2p Swarm. /// Builds the network behaviour for the libp2p Swarm.
/// Implements gossipsub message routing. /// Implements gossipsub message routing.
#[derive(NetworkBehaviour)] #[derive(NetworkBehaviour)]
#[behaviour(out_event = "BehaviourEvent", poll_method = "poll")]
pub struct Behaviour<TSubstream: AsyncRead + AsyncWrite> { pub struct Behaviour<TSubstream: AsyncRead + AsyncWrite> {
gossipsub: Gossipsub<TSubstream>, gossipsub: Gossipsub<TSubstream>,
// TODO: Add Kademlia for peer discovery // TODO: Add Kademlia for peer discovery
/// The events generated by this behaviour to be consumed in the swarm poll. /// The events generated by this behaviour to be consumed in the swarm poll.
// We use gossipsub events for now, generalise later. // We use gossipsub events for now, generalise later.
#[behaviour(ignore)] #[behaviour(ignore)]
events: Vec<GossipsubEvent>, events: Vec<BehaviourEvent>,
} }
// Implement the NetworkBehaviourEventProcess trait so that we can derive NetworkBehaviour for Behaviour // Implement the NetworkBehaviourEventProcess trait so that we can derive NetworkBehaviour for Behaviour
@ -23,7 +24,15 @@ impl<TSubstream: AsyncRead + AsyncWrite> NetworkBehaviourEventProcess<GossipsubE
for Behaviour<TSubstream> for Behaviour<TSubstream>
{ {
fn inject_event(&mut self, event: GossipsubEvent) { fn inject_event(&mut self, event: GossipsubEvent) {
self.events.push(event); match event {
GossipsubEvent::Message(message) => {
let gs_message = String::from_utf8_lossy(&message.data);
// TODO: Remove this type - debug only
self.events
.push(BehaviourEvent::Message(gs_message.to_string()))
}
_ => {}
}
} }
} }
@ -35,8 +44,10 @@ impl<TSubstream: AsyncRead + AsyncWrite> Behaviour<TSubstream> {
} }
} }
/// Consume the events list when polled. /// Consumes the events list when polled.
fn poll(&mut self) -> Async<NetworkBehaviourAction<GossipsubRpc, GossipsubEvent>> { fn poll<TBehaviourIn>(
&mut self,
) -> Async<NetworkBehaviourAction<TBehaviourIn, BehaviourEvent>> {
if !self.events.is_empty() { if !self.events.is_empty() {
return Async::Ready(NetworkBehaviourAction::GenerateEvent(self.events.remove(0))); return Async::Ready(NetworkBehaviourAction::GenerateEvent(self.events.remove(0)));
} }
@ -44,3 +55,16 @@ impl<TSubstream: AsyncRead + AsyncWrite> Behaviour<TSubstream> {
Async::NotReady Async::NotReady
} }
} }
impl<TSubstream: AsyncRead + AsyncWrite> Behaviour<TSubstream> {
pub fn send_message(&self, message: String) {
// TODO: Encode and send via gossipsub
}
}
/// The types of events than can be obtained from polling the behaviour.
pub enum BehaviourEvent {
// TODO: This is a stub at the moment
Message(String),
}

View File

@ -0,0 +1,8 @@
// generates error types
use error_chain::{
error_chain, error_chain_processing, impl_error_chain_kind, impl_error_chain_processed,
impl_extract_backtrace,
};
error_chain! {}

View File

@ -2,7 +2,8 @@
/// all required libp2p functionality. /// all required libp2p functionality.
/// ///
/// This crate builds and manages the libp2p services required by the beacon node. /// This crate builds and manages the libp2p services required by the beacon node.
mod behaviour; pub mod behaviour;
pub mod error;
mod network_config; mod network_config;
mod service; mod service;
@ -11,6 +12,7 @@ pub use libp2p::{
PeerId, PeerId,
}; };
pub use network_config::NetworkConfig; pub use network_config::NetworkConfig;
pub use service::Libp2pEvent;
pub use service::Service; pub use service::Service;
pub use types::multiaddr; pub use types::multiaddr;
pub use types::Multiaddr; pub use types::Multiaddr;

View File

@ -1,7 +1,9 @@
use crate::behaviour::Behaviour; use crate::behaviour::{Behaviour, BehaviourEvent};
use crate::error;
use crate::multiaddr::Protocol; use crate::multiaddr::Protocol;
use crate::NetworkConfig; use crate::NetworkConfig;
use futures::prelude::*; use futures::prelude::*;
use futures::Stream;
use libp2p::core::{ use libp2p::core::{
muxing::StreamMuxerBox, muxing::StreamMuxerBox,
nodes::Substream, nodes::Substream,
@ -17,13 +19,16 @@ use std::time::Duration;
/// The configuration and state of the libp2p components for the beacon node. /// The configuration and state of the libp2p components for the beacon node.
pub struct Service { pub struct Service {
/// The libp2p Swarm handler. /// The libp2p Swarm handler.
swarm: Swarm<Boxed<(PeerId, StreamMuxerBox), Error>, Behaviour<Substream<StreamMuxerBox>>>, //TODO: Make this private
pub swarm: Swarm<Boxed<(PeerId, StreamMuxerBox), Error>, Behaviour<Substream<StreamMuxerBox>>>,
/// This node's PeerId. /// This node's PeerId.
local_peer_id: PeerId, local_peer_id: PeerId,
/// The libp2p logger handle.
pub log: slog::Logger,
} }
impl Service { impl Service {
pub fn new(config: NetworkConfig, log: slog::Logger) -> Self { pub fn new(config: NetworkConfig, log: slog::Logger) -> error::Result<Self> {
debug!(log, "Libp2p Service starting"); debug!(log, "Libp2p Service starting");
let local_private_key = config.local_private_key; let local_private_key = config.local_private_key;
@ -50,7 +55,7 @@ impl Service {
Err(err) => warn!(log, "Cannot listen on: {} : {:?}", address, err), Err(err) => warn!(log, "Cannot listen on: {} : {:?}", address, err),
}; };
} }
// connect to boot nodes - these are currently stored as multiadders // connect to boot nodes - these are currently stored as multiaddrs
// Once we have discovery, can set to peerId // Once we have discovery, can set to peerId
for bootnode in config.boot_nodes { for bootnode in config.boot_nodes {
match Swarm::dial_addr(&mut swarm, bootnode.clone()) { match Swarm::dial_addr(&mut swarm, bootnode.clone()) {
@ -62,10 +67,36 @@ impl Service {
}; };
} }
Service { Ok(Service {
local_peer_id, local_peer_id,
swarm, swarm,
log,
})
}
}
impl Stream for Service {
type Item = Libp2pEvent;
type Error = crate::error::Error;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
loop {
// TODO: Currently only gossipsub events passed here.
// Build a type for more generic events
match self.swarm.poll() {
Ok(Async::Ready(Some(BehaviourEvent::Message(m)))) => {
// TODO: Stub here for debugging
debug!(self.log, "Message received: {}", m);
return Ok(Async::Ready(Some(Libp2pEvent::Message(m))));
}
// TODO: Fill with all behaviour events
_ => break,
Ok(Async::Ready(None)) => unreachable!("Swarm stream shouldn't end"),
Ok(Async::NotReady) => break,
_ => break,
}
} }
Ok(Async::NotReady)
} }
} }
@ -103,3 +134,8 @@ fn build_transport(
.map_err(|err| Error::new(ErrorKind::Other, err)) .map_err(|err| Error::new(ErrorKind::Other, err))
.boxed() .boxed()
} }
/// Events that can be obtained from polling the Libp2p Service.
pub enum Libp2pEvent {
Message(String),
}

View File

@ -13,3 +13,4 @@ slog = "2.4.1"
futures = "0.1.25" futures = "0.1.25"
error-chain = "0.12.0" error-chain = "0.12.0"
crossbeam-channel = "0.3.8" crossbeam-channel = "0.3.8"
tokio = "0.1.16"

View File

@ -1,8 +1,13 @@
// generates error types // generates error types
use libp2p;
use error_chain::{ use error_chain::{
error_chain, error_chain_processing, impl_error_chain_kind, impl_error_chain_processed, error_chain, error_chain_processing, impl_error_chain_kind, impl_error_chain_processed,
impl_extract_backtrace, impl_extract_backtrace,
}; };
error_chain! {} error_chain! {
links {
Libp2p(libp2p::error::Error, libp2p::error::ErrorKind);
}
}

View File

@ -5,4 +5,5 @@ mod messages;
mod service; mod service;
pub use libp2p::NetworkConfig; pub use libp2p::NetworkConfig;
pub use messages::NodeMessage;
pub use service::Service; pub use service::Service;

View File

@ -2,11 +2,15 @@ use libp2p::PeerId;
use types::{Hash256, Slot}; use types::{Hash256, Slot};
/// Messages between nodes across the network. /// Messages between nodes across the network.
#[derive(Debug, Clone)]
pub enum NodeMessage { pub enum NodeMessage {
Status(Status), Status(Status),
BlockRequest, BlockRequest,
// TODO: only for testing - remove
Message(String),
} }
#[derive(Debug, Clone)]
pub struct Status { pub struct Status {
/// Current node version. /// Current node version.
version: u8, version: u8,
@ -19,6 +23,7 @@ pub struct Status {
} }
/// Types of messages that the network service can receive. /// Types of messages that the network service can receive.
#[derive(Debug, Clone)]
pub enum NetworkMessage { pub enum NetworkMessage {
/// Send a message to libp2p service. /// Send a message to libp2p service.
//TODO: Define typing for messages across the wire //TODO: Define typing for messages across the wire

View File

@ -2,39 +2,135 @@ use crate::error;
use crate::message_handler::{HandlerMessage, MessageHandler}; use crate::message_handler::{HandlerMessage, MessageHandler};
use crate::messages::{NetworkMessage, NodeMessage}; use crate::messages::{NetworkMessage, NodeMessage};
use crate::NetworkConfig; use crate::NetworkConfig;
use crossbeam_channel::{unbounded as channel, Sender}; use crossbeam_channel::{unbounded as channel, Sender, TryRecvError};
use futures::future::lazy;
use futures::future::poll_fn;
use futures::prelude::*;
use futures::sync::oneshot; use futures::sync::oneshot;
use futures::Stream;
use libp2p::behaviour::BehaviourEvent;
use libp2p::error::Error as libp2pError;
use libp2p::Service as LibP2PService; use libp2p::Service as LibP2PService;
use libp2p::{Libp2pEvent, PeerId};
use slog::{debug, info, o, trace, warn, Logger}; use slog::{debug, info, o, trace, warn, Logger};
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
use tokio::runtime::TaskExecutor;
/// Service that handles communication between internal services and the libp2p network service. /// Service that handles communication between internal services and the libp2p network service.
pub struct Service { pub struct Service {
//libp2p_service: Arc<Mutex<LibP2PService>>, //libp2p_service: Arc<Mutex<LibP2PService>>,
//libp2p_thread: oneshot::Sender<()>, libp2p_exit: oneshot::Sender<()>,
//message_handler: MessageHandler, network_send: crossbeam_channel::Sender<NetworkMessage>,
//message_handler_send: Sender<HandlerMessage>, //message_handler: MessageHandler,
//message_handler_send: Sender<HandlerMessage>,
} }
impl Service { impl Service {
pub fn new( pub fn new(
config: NetworkConfig, config: NetworkConfig,
executor: TaskExecutor,
log: slog::Logger, log: slog::Logger,
) -> error::Result<(Arc<Self>, Sender<NetworkMessage>)> { ) -> error::Result<(Arc<Self>, Sender<NetworkMessage>)> {
debug!(log, "Service starting");
let (network_send, network_recv) = channel::<NetworkMessage>();
// launch message handler thread // launch message handler thread
let message_handler_log = log.new(o!("Service" => "MessageHandler")); let message_handler_log = log.new(o!("Service" => "MessageHandler"));
let message_handler_send = MessageHandler::new(message_handler_log); let message_handler_send = MessageHandler::new(message_handler_log)?;
// launch libp2p service // launch libp2p service
let libp2p_log = log.new(o!("Service" => "Libp2p")); let libp2p_log = log.new(o!("Service" => "Libp2p"));
let libp2p_service = LibP2PService::new(config, libp2p_log); let libp2p_service = LibP2PService::new(config, libp2p_log)?;
// TODO: Spawn thread to handle libp2p messages and pass to message handler thread. // TODO: Spawn thread to handle libp2p messages and pass to message handler thread.
let network = Service {}; let (network_send, libp2p_exit) =
spawn_service(libp2p_service, message_handler_send, executor, log)?;
let network = Service {
libp2p_exit,
network_send: network_send.clone(),
};
Ok((Arc::new(network), network_send)) Ok((Arc::new(network), network_send))
} }
// TODO: Testing only
pub fn send_message(&self, message: String) {
let node_message = NodeMessage::Message(message);
self.network_send
.send(NetworkMessage::Send(PeerId::random(), node_message));
}
}
fn spawn_service(
libp2p_service: LibP2PService,
message_handler_send: crossbeam_channel::Sender<HandlerMessage>,
executor: TaskExecutor,
log: slog::Logger,
) -> error::Result<(
crossbeam_channel::Sender<NetworkMessage>,
oneshot::Sender<()>,
)> {
let (network_exit, exit_rx) = oneshot::channel();
let (network_send, network_recv) = channel::<NetworkMessage>();
// spawn on the current executor
executor.spawn(
network_service(
libp2p_service,
network_recv,
message_handler_send,
log.clone(),
)
// allow for manual termination
.select(exit_rx.then(|_| Ok(())))
.then(move |_| {
debug!(log.clone(), "Network service ended");
Ok(())
}),
);
Ok((network_send, network_exit))
}
fn network_service(
mut libp2p_service: LibP2PService,
network_recv: crossbeam_channel::Receiver<NetworkMessage>,
message_handler_send: crossbeam_channel::Sender<HandlerMessage>,
log: slog::Logger,
) -> impl futures::Future<Item = (), Error = libp2p::error::Error> {
futures::future::poll_fn(move || -> Result<_, libp2p::error::Error> {
// poll the swarm
loop {
match libp2p_service.poll() {
Ok(Async::Ready(Some(Libp2pEvent::Message(m)))) => debug!(
libp2p_service.log,
"Network Service: Message received: {}", m
),
_ => break,
}
}
// poll the network channel
// TODO: refactor - combine poll_fn's?
loop {
match network_recv.try_recv() {
// TODO: Testing message - remove
Ok(NetworkMessage::Send(_peer_id, node_message)) => {
match node_message {
NodeMessage::Message(m) => {
debug!(log, "Message received via network channel: {:?}", m);
//TODO: Make swarm private
//TODO: Implement correct peer id topic message handling
libp2p_service.swarm.send_message(m);
}
//TODO: Handle all NodeMessage types
_ => break,
};
}
Err(TryRecvError::Empty) => break,
Err(TryRecvError::Disconnected) => {
return Err(libp2p::error::Error::from("Network channel disconnected"));
}
// TODO: Implement all NetworkMessage
_ => break,
}
}
Ok(Async::NotReady)
})
} }

View File

@ -40,6 +40,8 @@ pub fn run_beacon_node(config: ClientConfig, log: slog::Logger) -> error::Result
// perform global shutdown operations. // perform global shutdown operations.
info!(log, "Shutting down.."); info!(log, "Shutting down..");
exit_signal.fire(); exit_signal.fire();
// shutdown the client
// client.exit_signal.fire();
drop(client); drop(client);
runtime.shutdown_on_idle().wait().unwrap(); runtime.shutdown_on_idle().wait().unwrap();
Ok(()) Ok(())