From 9f13731d6d02feb3045a27471ae8061e0c5a77cc Mon Sep 17 00:00:00 2001 From: Age Manning Date: Thu, 7 Mar 2019 16:17:06 +1100 Subject: [PATCH] Implements a basic libp2p tcp,secio,mplex,gossipsub swarm. --- beacon_node/libp2p/Cargo.toml | 1 + beacon_node/libp2p/src/behaviour.rs | 46 +++++++++++++++++++ beacon_node/libp2p/src/lib.rs | 1 + beacon_node/libp2p/src/service.rs | 70 ++++++++++++++++++----------- 4 files changed, 91 insertions(+), 27 deletions(-) create mode 100644 beacon_node/libp2p/src/behaviour.rs diff --git a/beacon_node/libp2p/Cargo.toml b/beacon_node/libp2p/Cargo.toml index fff7dc82d..9c4c6e7a5 100644 --- a/beacon_node/libp2p/Cargo.toml +++ b/beacon_node/libp2p/Cargo.toml @@ -10,3 +10,4 @@ libp2p = { git = "https://github.com/SigP/rust-libp2p", branch = "gossipsub" } slog = "2.4.1" version = { path = "../version" } tokio = "0.1.16" +futures = "0.1.25" diff --git a/beacon_node/libp2p/src/behaviour.rs b/beacon_node/libp2p/src/behaviour.rs new file mode 100644 index 000000000..0c9aae16e --- /dev/null +++ b/beacon_node/libp2p/src/behaviour.rs @@ -0,0 +1,46 @@ +use futures::prelude::*; +use libp2p::{ + core::swarm::{NetworkBehaviourAction, NetworkBehaviourEventProcess}, + gossipsub::{Gossipsub, GossipsubConfig, GossipsubEvent, GossipsubRpc}, + tokio_io::{AsyncRead, AsyncWrite}, + NetworkBehaviour, PeerId, +}; + +/// Builds the network behaviour for the libp2p Swarm. +/// Implements gossipsub message routing. +#[derive(NetworkBehaviour)] +pub struct Behaviour { + gossipsub: Gossipsub, + // TODO: Add Kademlia for peer discovery + /// The events generated by this behaviour to be consumed in the swarm poll. + // We use gossipsub events for now, generalise later. + #[behaviour(ignore)] + events: Vec, +} + +// Implement the NetworkBehaviourEventProcess trait so that we can derive NetworkBehaviour for Behaviour +impl NetworkBehaviourEventProcess + for Behaviour +{ + fn inject_event(&mut self, event: GossipsubEvent) { + self.events.push(event); + } +} + +impl Behaviour { + pub fn new(local_peer_id: PeerId, gs_config: GossipsubConfig) -> Self { + Behaviour { + gossipsub: Gossipsub::new(local_peer_id, gs_config), + events: Vec::new(), + } + } + + /// Consume the events list when polled. + fn poll(&mut self) -> Async> { + if !self.events.is_empty() { + return Async::Ready(NetworkBehaviourAction::GenerateEvent(self.events.remove(0))); + } + + Async::NotReady + } +} diff --git a/beacon_node/libp2p/src/lib.rs b/beacon_node/libp2p/src/lib.rs index 7b4514337..01dc42073 100644 --- a/beacon_node/libp2p/src/lib.rs +++ b/beacon_node/libp2p/src/lib.rs @@ -2,6 +2,7 @@ /// all required libp2p functionality. /// /// This crate builds and manages the libp2p services required by the beacon node. +mod behaviour; mod network_config; mod service; diff --git a/beacon_node/libp2p/src/service.rs b/beacon_node/libp2p/src/service.rs index 528d24ce8..7ed715bd6 100644 --- a/beacon_node/libp2p/src/service.rs +++ b/beacon_node/libp2p/src/service.rs @@ -1,18 +1,29 @@ +use crate::behaviour::Behaviour; use crate::NetworkConfig; -use libp2p::core::{muxing::StreamMuxer, nodes::Substream}; -use libp2p::gossipsub::{Gossipsub, GossipsubConfig, GossipsubEvent}; +use futures::prelude::*; +use libp2p::core::{ + muxing::StreamMuxerBox, + nodes::Substream, + transport::boxed::Boxed, + upgrade::{InboundUpgrade, InboundUpgradeExt, OutboundUpgrade, OutboundUpgradeExt}, +}; use libp2p::{build_tcp_ws_secio_mplex_yamux, core, secio, Transport}; -use libp2p::{core::swarm::NetworkBehaviour, PeerId, Swarm}; +use libp2p::{PeerId, Swarm}; use slog::debug; use std::error; +use std::io::{Error, ErrorKind}; +use std::time::Duration; /// The configuration and state of the libp2p components for the beacon node. pub struct Service { /// The libp2p Swarm handler. - swarm: String, + swarm: Swarm, Behaviour>>, /// This node's PeerId. local_peer_id: PeerId, } +//Swarm>>> + +//swarm: Swarm, Behaviour>>, impl Service { pub fn new(config: NetworkConfig, log: slog::Logger) -> Self { @@ -25,9 +36,9 @@ impl Service { // Set up the transport let transport = build_transport(local_private_key); // Set up gossipsub routing - let behaviour = build_behaviour(local_peer_id, config.gs_config); + let behaviour = Behaviour::new(local_peer_id.clone(), config.gs_config); // Set up Topology - let topology = local_peer_id; + let topology = local_peer_id.clone(); let swarm = Swarm::new(transport, behaviour, topology); @@ -42,28 +53,33 @@ impl Service { /// mplex or yamux as the multiplexing layer. fn build_transport( local_private_key: secio::SecioKeyPair, -) -> impl Transport< - Output = ( - PeerId, - impl core::muxing::StreamMuxer - + Send - + Sync, - ), - Error = impl error::Error + Send, - Listener = impl Send, - Dial = impl Send, - ListenerUpgrade = impl Send, -> + Clone { +) -> Boxed<(PeerId, StreamMuxerBox), Error> { // TODO: The Wire protocol currently doesn't specify encryption and this will need to be customised // in the future. - build_tcp_ws_secio_mplex_yamux(local_private_key) -} + let transport = libp2p::tcp::TcpConfig::new(); + let transport = libp2p::dns::DnsConfig::new(transport); + #[cfg(feature = "libp2p-websocket")] + let transport = { + let trans_clone = transport.clone(); + transport.or_transport(websocket::WsConfig::new(trans_clone)) + }; + transport + .with_upgrade(secio::SecioConfig::new(local_private_key)) + .and_then(move |out, endpoint| { + let peer_id = out.remote_key.into_peer_id(); + let peer_id2 = peer_id.clone(); + let upgrade = core::upgrade::SelectUpgrade::new( + libp2p::yamux::Config::default(), + libp2p::mplex::MplexConfig::new(), + ) + // TODO: use a single `.map` instead of two maps + .map_inbound(move |muxer| (peer_id, muxer)) + .map_outbound(move |muxer| (peer_id2, muxer)); -/// Builds the network behaviour for the libp2p Swarm. -fn build_behaviour( - local_peer_id: PeerId, - config: GossipsubConfig, -) -> impl NetworkBehaviour { - // TODO: Add Kademlia/Peer discovery - Gossipsub::new(local_peer_id, config) + core::upgrade::apply(out.stream, upgrade, endpoint) + .map(|(id, muxer)| (id, core::muxing::StreamMuxerBox::new(muxer))) + }) + .with_timeout(Duration::from_secs(20)) + .map_err(|err| Error::new(ErrorKind::Other, err)) + .boxed() }