Tidy network poll and implement Identify

This commit is contained in:
Age Manning 2019-03-21 13:15:14 +11:00
parent 35815ce786
commit 13ac5b1d25
No known key found for this signature in database
GPG Key ID: 05EED64B79E06A93
2 changed files with 46 additions and 37 deletions

View File

@ -167,9 +167,12 @@ fn build_transport(
/// Events that can be obtained from polling the Libp2p Service. /// Events that can be obtained from polling the Libp2p Service.
pub enum Libp2pEvent { pub enum Libp2pEvent {
// We have received an RPC event on the swarm /// An RPC response request has been received on the swarm.
RPC(PeerId, RPCEvent), RPC(PeerId, RPCEvent),
/// Initiated the connection to a new peer.
PeerDialed(PeerId), PeerDialed(PeerId),
/// Received information about a peer on the network.
Identified(PeerId, IdentifyInfo), Identified(PeerId, IdentifyInfo),
// TODO: Pub-sub testing only.
Message(String), Message(String),
} }

View File

@ -15,8 +15,8 @@ use tokio::runtime::TaskExecutor;
/// Service that handles communication between internal services and the eth2_libp2p network service. /// Service that handles communication between internal services and the eth2_libp2p network service.
pub struct Service { pub struct Service {
//eth2_libp2p_service: Arc<Mutex<LibP2PService>>, //libp2p_service: Arc<Mutex<LibP2PService>>,
eth2_libp2p_exit: oneshot::Sender<()>, libp2p_exit: oneshot::Sender<()>,
network_send: crossbeam_channel::Sender<NetworkMessage>, network_send: crossbeam_channel::Sender<NetworkMessage>,
//message_handler: MessageHandler, //message_handler: MessageHandler,
//message_handler_send: Sender<HandlerMessage>, //message_handler_send: Sender<HandlerMessage>,
@ -40,20 +40,20 @@ impl Service {
message_handler_log, message_handler_log,
)?; )?;
// launch eth2_libp2p service // launch libp2p service
let eth2_libp2p_log = log.new(o!("Service" => "Libp2p")); let libp2p_log = log.new(o!("Service" => "Libp2p"));
let eth2_libp2p_service = LibP2PService::new(config.clone(), eth2_libp2p_log)?; let libp2p_service = LibP2PService::new(config.clone(), libp2p_log)?;
// TODO: Spawn thread to handle eth2_libp2p messages and pass to message handler thread. // TODO: Spawn thread to handle libp2p messages and pass to message handler thread.
let eth2_libp2p_exit = spawn_service( let libp2p_exit = spawn_service(
eth2_libp2p_service, libp2p_service,
network_recv, network_recv,
message_handler_send, message_handler_send,
executor, executor,
log, log,
)?; )?;
let network_service = Service { let network_service = Service {
eth2_libp2p_exit, libp2p_exit,
network_send: network_send.clone(), network_send: network_send.clone(),
}; };
@ -72,7 +72,7 @@ impl Service {
} }
fn spawn_service( fn spawn_service(
eth2_libp2p_service: LibP2PService, libp2p_service: LibP2PService,
network_recv: crossbeam_channel::Receiver<NetworkMessage>, network_recv: crossbeam_channel::Receiver<NetworkMessage>,
message_handler_send: crossbeam_channel::Sender<HandlerMessage>, message_handler_send: crossbeam_channel::Sender<HandlerMessage>,
executor: &TaskExecutor, executor: &TaskExecutor,
@ -83,7 +83,7 @@ fn spawn_service(
// spawn on the current executor // spawn on the current executor
executor.spawn( executor.spawn(
network_service( network_service(
eth2_libp2p_service, libp2p_service,
network_recv, network_recv,
message_handler_send, message_handler_send,
log.clone(), log.clone(),
@ -100,7 +100,7 @@ fn spawn_service(
} }
fn network_service( fn network_service(
mut eth2_libp2p_service: LibP2PService, mut libp2p_service: LibP2PService,
network_recv: crossbeam_channel::Receiver<NetworkMessage>, network_recv: crossbeam_channel::Receiver<NetworkMessage>,
message_handler_send: crossbeam_channel::Sender<HandlerMessage>, message_handler_send: crossbeam_channel::Sender<HandlerMessage>,
log: slog::Logger, log: slog::Logger,
@ -108,28 +108,34 @@ fn network_service(
futures::future::poll_fn(move || -> Result<_, eth2_libp2p::error::Error> { futures::future::poll_fn(move || -> Result<_, eth2_libp2p::error::Error> {
// poll the swarm // poll the swarm
loop { loop {
match eth2_libp2p_service.poll() { match libp2p_service.poll() {
Ok(Async::Ready(Some(Libp2pEvent::RPC(peer_id, rpc_event)))) => { Ok(Async::Ready(Some(event))) => match event {
trace!( Libp2pEvent::RPC(peer_id, rpc_event) => {
eth2_libp2p_service.log, trace!(log, "RPC Event: RPC message received: {:?}", rpc_event);
"RPC Event: RPC message received: {:?}", message_handler_send
rpc_event .send(HandlerMessage::RPC(peer_id, rpc_event))
); .map_err(|_| "failed to send rpc to handler")?;
message_handler_send }
.send(HandlerMessage::RPC(peer_id, rpc_event)) Libp2pEvent::PeerDialed(peer_id) => {
.map_err(|_| "failed to send rpc to handler")?; debug!(log, "Peer Dialed: {:?}", peer_id);
} message_handler_send
Ok(Async::Ready(Some(Libp2pEvent::PeerDialed(peer_id)))) => { .send(HandlerMessage::PeerDialed(peer_id))
debug!(eth2_libp2p_service.log, "Peer Dialed: {:?}", peer_id); .map_err(|_| "failed to send rpc to handler")?;
message_handler_send }
.send(HandlerMessage::PeerDialed(peer_id)) Libp2pEvent::Identified(peer_id, info) => {
.map_err(|_| "failed to send rpc to handler")?; debug!(
} log,
Ok(Async::Ready(Some(Libp2pEvent::Message(m)))) => debug!( "We have identified peer: {:?} with {:?}", peer_id, info
eth2_libp2p_service.log, );
"Network Service: Message received: {}", m }
), Libp2pEvent::Message(m) => debug!(
_ => break, libp2p_service.log,
"Network Service: Message received: {}", m
),
},
Ok(Async::Ready(None)) => unreachable!("Stream never ends"),
Ok(Async::NotReady) => break,
Err(_) => break,
} }
} }
// poll the network channel // poll the network channel
@ -143,7 +149,7 @@ fn network_service(
trace!(log, "Sending RPC Event: {:?}", rpc_event); trace!(log, "Sending RPC Event: {:?}", rpc_event);
//TODO: Make swarm private //TODO: Make swarm private
//TODO: Implement correct peer id topic message handling //TODO: Implement correct peer id topic message handling
eth2_libp2p_service.swarm.send_rpc(peer_id, rpc_event); libp2p_service.swarm.send_rpc(peer_id, rpc_event);
} }
OutgoingMessage::NotifierTest => { OutgoingMessage::NotifierTest => {
debug!(log, "Received message from notifier"); debug!(log, "Received message from notifier");
@ -165,7 +171,7 @@ fn network_service(
/// Types of messages that the network service can receive. /// Types of messages that the network service can receive.
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub enum NetworkMessage { pub enum NetworkMessage {
/// Send a message to eth2_libp2p service. /// Send a message to libp2p service.
//TODO: Define typing for messages across the wire //TODO: Define typing for messages across the wire
Send(PeerId, OutgoingMessage), Send(PeerId, OutgoingMessage),
} }