Allows Libp2p service to be read outside network thread

This commit is contained in:
Age Manning 2019-07-24 17:45:31 +10:00
parent 7d38cba252
commit ae96325c81
No known key found for this signature in database
GPG Key ID: 05EED64B79E06A93
6 changed files with 47 additions and 56 deletions

View File

@ -8,7 +8,7 @@ use tokio::runtime::TaskExecutor;
use tokio::timer::Interval; use tokio::timer::Interval;
/// The interval between heartbeat events. /// The interval between heartbeat events.
pub const HEARTBEAT_INTERVAL_SECONDS: u64 = 5; pub const HEARTBEAT_INTERVAL_SECONDS: u64 = 15;
/// Spawns a thread that can be used to run code periodically, on `HEARTBEAT_INTERVAL_SECONDS` /// Spawns a thread that can be used to run code periodically, on `HEARTBEAT_INTERVAL_SECONDS`
/// durations. /// durations.
@ -25,19 +25,22 @@ pub fn run<T: BeaconChainTypes + Send + Sync + 'static>(
Duration::from_secs(HEARTBEAT_INTERVAL_SECONDS), Duration::from_secs(HEARTBEAT_INTERVAL_SECONDS),
); );
let _log = client.log.new(o!("Service" => "Notifier")); let log = client.log.new(o!("Service" => "Notifier"));
let libp2p = client.network.libp2p_service();
let heartbeat = move |_| {
// Notify the number of connected nodes
// Panic if libp2p is poisoned
debug!(log, ""; "Connected Peers" => libp2p.lock().swarm.connected_peers());
let heartbeat = |_| {
// There is not presently any heartbeat logic.
//
// We leave this function empty for future use.
Ok(()) Ok(())
}; };
// map error and spawn // map error and spawn
let log = client.log.clone(); let err_log = client.log.clone();
let heartbeat_interval = interval let heartbeat_interval = interval
.map_err(move |e| debug!(log, "Timer error {}", e)) .map_err(move |e| debug!(err_log, "Timer error {}", e))
.for_each(heartbeat); .for_each(heartbeat);
executor.spawn(exit.until(heartbeat_interval).map(|_| ())); executor.spawn(exit.until(heartbeat_interval).map(|_| ()));

View File

@ -171,6 +171,11 @@ impl<TSubstream: AsyncRead + AsyncWrite> Behaviour<TSubstream> {
pub fn send_rpc(&mut self, peer_id: PeerId, rpc_event: RPCEvent) { pub fn send_rpc(&mut self, peer_id: PeerId, rpc_event: RPCEvent) {
self.serenity_rpc.send_rpc(peer_id, rpc_event); self.serenity_rpc.send_rpc(peer_id, rpc_event);
} }
/* Discovery / Peer management functions */
pub fn connected_peers(&self) -> usize {
self.discovery.connected_peers()
}
} }
/// The types of events than can be obtained from polling the behaviour. /// The types of events than can be obtained from polling the behaviour.

View File

@ -106,6 +106,11 @@ impl<TSubstream> Discovery<TSubstream> {
self.discovery.add_enr(enr); self.discovery.add_enr(enr);
} }
/// The current number of connected libp2p peers.
pub fn connected_peers(&self) -> usize {
self.connected_peers.len()
}
/// Search for new peers using the underlying discovery mechanism. /// Search for new peers using the underlying discovery mechanism.
fn find_peers(&mut self) { fn find_peers(&mut self) {
// pick a random NodeId // pick a random NodeId

View File

@ -19,3 +19,4 @@ tree_hash = { path = "../../eth2/utils/tree_hash" }
futures = "0.1.25" futures = "0.1.25"
error-chain = "0.12.0" error-chain = "0.12.0"
tokio = "0.1.16" tokio = "0.1.16"
parking_lot = "0.9.0"

View File

@ -105,7 +105,7 @@ impl<T: BeaconChainTypes + 'static> MessageHandler<T> {
fn handle_rpc_message(&mut self, peer_id: PeerId, rpc_message: RPCEvent) { fn handle_rpc_message(&mut self, peer_id: PeerId, rpc_message: RPCEvent) {
match rpc_message { match rpc_message {
RPCEvent::Request(id, req) => self.handle_rpc_request(peer_id, id, req), RPCEvent::Request(id, req) => self.handle_rpc_request(peer_id, id, req),
RPCEvent::Response(id, resp) => self.handle_rpc_response(peer_id, id, resp), RPCEvent::Response(_id, resp) => self.handle_rpc_response(peer_id, resp),
RPCEvent::Error(id, error) => self.handle_rpc_error(peer_id, id, error), RPCEvent::Error(id, error) => self.handle_rpc_error(peer_id, id, error),
} }
} }
@ -148,18 +148,10 @@ impl<T: BeaconChainTypes + 'static> MessageHandler<T> {
/// An RPC response has been received from the network. /// An RPC response has been received from the network.
// we match on id and ignore responses past the timeout. // we match on id and ignore responses past the timeout.
fn handle_rpc_response( fn handle_rpc_response(&mut self, peer_id: PeerId, error_response: RPCErrorResponse) {
&mut self,
peer_id: PeerId,
id: RequestId,
error_response: RPCErrorResponse,
) {
// an error could have occurred. // an error could have occurred.
// TODO: Handle Error gracefully // TODO: Handle Error gracefully
match error_response { match error_response {
RPCErrorResponse::EncodingError => {
warn!(self.log, "Encoding Error"; "peer" => format!("{:?}", peer_id), "request_id" => format!("{}",id))
}
RPCErrorResponse::InvalidRequest(error) => { RPCErrorResponse::InvalidRequest(error) => {
warn!(self.log, "";"peer" => format!("{:?}", peer_id), "Invalid Request" => error.as_string()) warn!(self.log, "";"peer" => format!("{:?}", peer_id), "Invalid Request" => error.as_string())
} }

View File

@ -8,6 +8,7 @@ use eth2_libp2p::{Libp2pEvent, PeerId};
use eth2_libp2p::{PubsubMessage, RPCEvent}; use eth2_libp2p::{PubsubMessage, RPCEvent};
use futures::prelude::*; use futures::prelude::*;
use futures::Stream; use futures::Stream;
use parking_lot::Mutex;
use slog::{debug, info, o, trace}; use slog::{debug, info, o, trace};
use std::marker::PhantomData; use std::marker::PhantomData;
use std::sync::Arc; use std::sync::Arc;
@ -16,9 +17,9 @@ use tokio::sync::{mpsc, oneshot};
/// Service that handles communication between internal services and the eth2_libp2p network service. /// Service that handles communication between internal services and the eth2_libp2p network service.
pub struct Service<T: BeaconChainTypes> { pub struct Service<T: BeaconChainTypes> {
//libp2p_service: Arc<Mutex<LibP2PService>>, libp2p_service: Arc<Mutex<LibP2PService>>,
_libp2p_exit: oneshot::Sender<()>, _libp2p_exit: oneshot::Sender<()>,
network_send: mpsc::UnboundedSender<NetworkMessage>, _network_send: mpsc::UnboundedSender<NetworkMessage>,
_phantom: PhantomData<T>, //message_handler: MessageHandler, _phantom: PhantomData<T>, //message_handler: MessageHandler,
//message_handler_send: Sender<HandlerMessage> //message_handler_send: Sender<HandlerMessage>
} }
@ -43,38 +44,33 @@ impl<T: BeaconChainTypes + 'static> Service<T> {
// launch libp2p service // launch libp2p service
let libp2p_log = log.new(o!("Service" => "Libp2p")); let libp2p_log = log.new(o!("Service" => "Libp2p"));
let libp2p_service = LibP2PService::new(config.clone(), libp2p_log)?; let libp2p_service = Arc::new(Mutex::new(LibP2PService::new(config.clone(), libp2p_log)?));
// TODO: Spawn thread to handle libp2p messages and pass to message handler thread. // TODO: Spawn thread to handle libp2p messages and pass to message handler thread.
let libp2p_exit = spawn_service( let libp2p_exit = spawn_service(
libp2p_service, libp2p_service.clone(),
network_recv, network_recv,
message_handler_send, message_handler_send,
executor, executor,
log, log,
)?; )?;
let network_service = Service { let network_service = Service {
libp2p_service,
_libp2p_exit: libp2p_exit, _libp2p_exit: libp2p_exit,
network_send: network_send.clone(), _network_send: network_send.clone(),
_phantom: PhantomData, _phantom: PhantomData,
}; };
Ok((Arc::new(network_service), network_send)) Ok((Arc::new(network_service), network_send))
} }
// TODO: Testing only pub fn libp2p_service(&self) -> Arc<Mutex<LibP2PService>> {
pub fn send_message(&mut self) { self.libp2p_service.clone()
self.network_send
.try_send(NetworkMessage::Send(
PeerId::random(),
OutgoingMessage::NotifierTest,
))
.unwrap();
} }
} }
fn spawn_service( fn spawn_service(
libp2p_service: LibP2PService, libp2p_service: Arc<Mutex<LibP2PService>>,
network_recv: mpsc::UnboundedReceiver<NetworkMessage>, network_recv: mpsc::UnboundedReceiver<NetworkMessage>,
message_handler_send: mpsc::UnboundedSender<HandlerMessage>, message_handler_send: mpsc::UnboundedSender<HandlerMessage>,
executor: &TaskExecutor, executor: &TaskExecutor,
@ -103,7 +99,7 @@ fn spawn_service(
//TODO: Potentially handle channel errors //TODO: Potentially handle channel errors
fn network_service( fn network_service(
mut libp2p_service: LibP2PService, libp2p_service: Arc<Mutex<LibP2PService>>,
mut network_recv: mpsc::UnboundedReceiver<NetworkMessage>, mut network_recv: mpsc::UnboundedReceiver<NetworkMessage>,
mut message_handler_send: mpsc::UnboundedSender<HandlerMessage>, mut message_handler_send: mpsc::UnboundedSender<HandlerMessage>,
log: slog::Logger, log: slog::Logger,
@ -115,28 +111,18 @@ fn network_service(
not_ready_count = 0; not_ready_count = 0;
// poll the network channel // poll the network channel
match network_recv.poll() { match network_recv.poll() {
Ok(Async::Ready(Some(message))) => { Ok(Async::Ready(Some(message))) => match message {
match message { NetworkMessage::Send(peer_id, outgoing_message) => match outgoing_message {
// TODO: Testing message - remove OutgoingMessage::RPC(rpc_event) => {
NetworkMessage::Send(peer_id, outgoing_message) => { trace!(log, "Sending RPC Event: {:?}", rpc_event);
match outgoing_message { libp2p_service.lock().swarm.send_rpc(peer_id, rpc_event);
OutgoingMessage::RPC(rpc_event) => {
trace!(log, "Sending RPC Event: {:?}", rpc_event);
//TODO: Make swarm private
//TODO: Implement correct peer id topic message handling
libp2p_service.swarm.send_rpc(peer_id, rpc_event);
}
OutgoingMessage::NotifierTest => {
// debug!(log, "Received message from notifier");
}
};
}
NetworkMessage::Publish { topics, message } => {
debug!(log, "Sending pubsub message"; "topics" => format!("{:?}",topics));
libp2p_service.swarm.publish(topics, *message);
} }
},
NetworkMessage::Publish { topics, message } => {
debug!(log, "Sending pubsub message"; "topics" => format!("{:?}",topics));
libp2p_service.lock().swarm.publish(topics, *message);
} }
} },
Ok(Async::NotReady) => not_ready_count += 1, Ok(Async::NotReady) => not_ready_count += 1,
Ok(Async::Ready(None)) => { Ok(Async::Ready(None)) => {
return Err(eth2_libp2p::error::Error::from("Network channel closed")); return Err(eth2_libp2p::error::Error::from("Network channel closed"));
@ -147,7 +133,7 @@ fn network_service(
} }
// poll the swarm // poll the swarm
match libp2p_service.poll() { match libp2p_service.lock().poll() {
Ok(Async::Ready(Some(event))) => match event { Ok(Async::Ready(Some(event))) => match event {
Libp2pEvent::RPC(peer_id, rpc_event) => { Libp2pEvent::RPC(peer_id, rpc_event) => {
trace!(log, "RPC Event: RPC message received: {:?}", rpc_event); trace!(log, "RPC Event: RPC message received: {:?}", rpc_event);
@ -182,6 +168,7 @@ fn network_service(
Err(_) => not_ready_count += 1, Err(_) => not_ready_count += 1,
} }
} }
Ok(Async::NotReady) Ok(Async::NotReady)
}) })
} }
@ -204,6 +191,4 @@ pub enum NetworkMessage {
pub enum OutgoingMessage { pub enum OutgoingMessage {
/// Send an RPC request/response. /// Send an RPC request/response.
RPC(RPCEvent), RPC(RPCEvent),
//TODO: Remove
NotifierTest,
} }