Implements a basic libp2p tcp,secio,mplex,gossipsub swarm.
This commit is contained in:
parent
e8e4c4ab9b
commit
9f13731d6d
@ -10,3 +10,4 @@ libp2p = { git = "https://github.com/SigP/rust-libp2p", branch = "gossipsub" }
|
|||||||
slog = "2.4.1"
|
slog = "2.4.1"
|
||||||
version = { path = "../version" }
|
version = { path = "../version" }
|
||||||
tokio = "0.1.16"
|
tokio = "0.1.16"
|
||||||
|
futures = "0.1.25"
|
||||||
|
46
beacon_node/libp2p/src/behaviour.rs
Normal file
46
beacon_node/libp2p/src/behaviour.rs
Normal file
@ -0,0 +1,46 @@
|
|||||||
|
use futures::prelude::*;
|
||||||
|
use libp2p::{
|
||||||
|
core::swarm::{NetworkBehaviourAction, NetworkBehaviourEventProcess},
|
||||||
|
gossipsub::{Gossipsub, GossipsubConfig, GossipsubEvent, GossipsubRpc},
|
||||||
|
tokio_io::{AsyncRead, AsyncWrite},
|
||||||
|
NetworkBehaviour, PeerId,
|
||||||
|
};
|
||||||
|
|
||||||
|
/// Builds the network behaviour for the libp2p Swarm.
|
||||||
|
/// Implements gossipsub message routing.
|
||||||
|
#[derive(NetworkBehaviour)]
|
||||||
|
pub struct Behaviour<TSubstream: AsyncRead + AsyncWrite> {
|
||||||
|
gossipsub: Gossipsub<TSubstream>,
|
||||||
|
// TODO: Add Kademlia for peer discovery
|
||||||
|
/// The events generated by this behaviour to be consumed in the swarm poll.
|
||||||
|
// We use gossipsub events for now, generalise later.
|
||||||
|
#[behaviour(ignore)]
|
||||||
|
events: Vec<GossipsubEvent>,
|
||||||
|
}
|
||||||
|
|
||||||
|
// Implement the NetworkBehaviourEventProcess trait so that we can derive NetworkBehaviour for Behaviour
|
||||||
|
impl<TSubstream: AsyncRead + AsyncWrite> NetworkBehaviourEventProcess<GossipsubEvent>
|
||||||
|
for Behaviour<TSubstream>
|
||||||
|
{
|
||||||
|
fn inject_event(&mut self, event: GossipsubEvent) {
|
||||||
|
self.events.push(event);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<TSubstream: AsyncRead + AsyncWrite> Behaviour<TSubstream> {
|
||||||
|
pub fn new(local_peer_id: PeerId, gs_config: GossipsubConfig) -> Self {
|
||||||
|
Behaviour {
|
||||||
|
gossipsub: Gossipsub::new(local_peer_id, gs_config),
|
||||||
|
events: Vec::new(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Consume the events list when polled.
|
||||||
|
fn poll(&mut self) -> Async<NetworkBehaviourAction<GossipsubRpc, GossipsubEvent>> {
|
||||||
|
if !self.events.is_empty() {
|
||||||
|
return Async::Ready(NetworkBehaviourAction::GenerateEvent(self.events.remove(0)));
|
||||||
|
}
|
||||||
|
|
||||||
|
Async::NotReady
|
||||||
|
}
|
||||||
|
}
|
@ -2,6 +2,7 @@
|
|||||||
/// all required libp2p functionality.
|
/// all required libp2p functionality.
|
||||||
///
|
///
|
||||||
/// This crate builds and manages the libp2p services required by the beacon node.
|
/// This crate builds and manages the libp2p services required by the beacon node.
|
||||||
|
mod behaviour;
|
||||||
mod network_config;
|
mod network_config;
|
||||||
mod service;
|
mod service;
|
||||||
|
|
||||||
|
@ -1,18 +1,29 @@
|
|||||||
|
use crate::behaviour::Behaviour;
|
||||||
use crate::NetworkConfig;
|
use crate::NetworkConfig;
|
||||||
use libp2p::core::{muxing::StreamMuxer, nodes::Substream};
|
use futures::prelude::*;
|
||||||
use libp2p::gossipsub::{Gossipsub, GossipsubConfig, GossipsubEvent};
|
use libp2p::core::{
|
||||||
|
muxing::StreamMuxerBox,
|
||||||
|
nodes::Substream,
|
||||||
|
transport::boxed::Boxed,
|
||||||
|
upgrade::{InboundUpgrade, InboundUpgradeExt, OutboundUpgrade, OutboundUpgradeExt},
|
||||||
|
};
|
||||||
use libp2p::{build_tcp_ws_secio_mplex_yamux, core, secio, Transport};
|
use libp2p::{build_tcp_ws_secio_mplex_yamux, core, secio, Transport};
|
||||||
use libp2p::{core::swarm::NetworkBehaviour, PeerId, Swarm};
|
use libp2p::{PeerId, Swarm};
|
||||||
use slog::debug;
|
use slog::debug;
|
||||||
use std::error;
|
use std::error;
|
||||||
|
use std::io::{Error, ErrorKind};
|
||||||
|
use std::time::Duration;
|
||||||
|
|
||||||
/// The configuration and state of the libp2p components for the beacon node.
|
/// The configuration and state of the libp2p components for the beacon node.
|
||||||
pub struct Service {
|
pub struct Service {
|
||||||
/// The libp2p Swarm handler.
|
/// The libp2p Swarm handler.
|
||||||
swarm: String,
|
swarm: Swarm<Boxed<(PeerId, StreamMuxerBox), Error>, Behaviour<Substream<StreamMuxerBox>>>,
|
||||||
/// This node's PeerId.
|
/// This node's PeerId.
|
||||||
local_peer_id: PeerId,
|
local_peer_id: PeerId,
|
||||||
}
|
}
|
||||||
|
//Swarm<impl std::clone::Clone+libp2p_core::transport::Transport, behaviour::Behaviour<libp2p_core::muxing::SubstreamRef<std::sync::Arc<impl std::marker::Send+std::marker::Sync+libp2p_core::muxing::StreamMuxer>>>>
|
||||||
|
|
||||||
|
//swarm: Swarm<Boxed<(PeerId, StreamMuxerBox), IoError>, Behaviour<TMessage, Substream<StreamMuxerBox>>>,
|
||||||
|
|
||||||
impl Service {
|
impl Service {
|
||||||
pub fn new(config: NetworkConfig, log: slog::Logger) -> Self {
|
pub fn new(config: NetworkConfig, log: slog::Logger) -> Self {
|
||||||
@ -25,9 +36,9 @@ impl Service {
|
|||||||
// Set up the transport
|
// Set up the transport
|
||||||
let transport = build_transport(local_private_key);
|
let transport = build_transport(local_private_key);
|
||||||
// Set up gossipsub routing
|
// Set up gossipsub routing
|
||||||
let behaviour = build_behaviour(local_peer_id, config.gs_config);
|
let behaviour = Behaviour::new(local_peer_id.clone(), config.gs_config);
|
||||||
// Set up Topology
|
// Set up Topology
|
||||||
let topology = local_peer_id;
|
let topology = local_peer_id.clone();
|
||||||
|
|
||||||
let swarm = Swarm::new(transport, behaviour, topology);
|
let swarm = Swarm::new(transport, behaviour, topology);
|
||||||
|
|
||||||
@ -42,28 +53,33 @@ impl Service {
|
|||||||
/// mplex or yamux as the multiplexing layer.
|
/// mplex or yamux as the multiplexing layer.
|
||||||
fn build_transport(
|
fn build_transport(
|
||||||
local_private_key: secio::SecioKeyPair,
|
local_private_key: secio::SecioKeyPair,
|
||||||
) -> impl Transport<
|
) -> Boxed<(PeerId, StreamMuxerBox), Error> {
|
||||||
Output = (
|
|
||||||
PeerId,
|
|
||||||
impl core::muxing::StreamMuxer<OutboundSubstream = impl Send, Substream = impl Send>
|
|
||||||
+ Send
|
|
||||||
+ Sync,
|
|
||||||
),
|
|
||||||
Error = impl error::Error + Send,
|
|
||||||
Listener = impl Send,
|
|
||||||
Dial = impl Send,
|
|
||||||
ListenerUpgrade = impl Send,
|
|
||||||
> + Clone {
|
|
||||||
// TODO: The Wire protocol currently doesn't specify encryption and this will need to be customised
|
// TODO: The Wire protocol currently doesn't specify encryption and this will need to be customised
|
||||||
// in the future.
|
// in the future.
|
||||||
build_tcp_ws_secio_mplex_yamux(local_private_key)
|
let transport = libp2p::tcp::TcpConfig::new();
|
||||||
}
|
let transport = libp2p::dns::DnsConfig::new(transport);
|
||||||
|
#[cfg(feature = "libp2p-websocket")]
|
||||||
|
let transport = {
|
||||||
|
let trans_clone = transport.clone();
|
||||||
|
transport.or_transport(websocket::WsConfig::new(trans_clone))
|
||||||
|
};
|
||||||
|
transport
|
||||||
|
.with_upgrade(secio::SecioConfig::new(local_private_key))
|
||||||
|
.and_then(move |out, endpoint| {
|
||||||
|
let peer_id = out.remote_key.into_peer_id();
|
||||||
|
let peer_id2 = peer_id.clone();
|
||||||
|
let upgrade = core::upgrade::SelectUpgrade::new(
|
||||||
|
libp2p::yamux::Config::default(),
|
||||||
|
libp2p::mplex::MplexConfig::new(),
|
||||||
|
)
|
||||||
|
// TODO: use a single `.map` instead of two maps
|
||||||
|
.map_inbound(move |muxer| (peer_id, muxer))
|
||||||
|
.map_outbound(move |muxer| (peer_id2, muxer));
|
||||||
|
|
||||||
/// Builds the network behaviour for the libp2p Swarm.
|
core::upgrade::apply(out.stream, upgrade, endpoint)
|
||||||
fn build_behaviour<TSubstream>(
|
.map(|(id, muxer)| (id, core::muxing::StreamMuxerBox::new(muxer)))
|
||||||
local_peer_id: PeerId,
|
})
|
||||||
config: GossipsubConfig,
|
.with_timeout(Duration::from_secs(20))
|
||||||
) -> impl NetworkBehaviour {
|
.map_err(|err| Error::new(ErrorKind::Other, err))
|
||||||
// TODO: Add Kademlia/Peer discovery
|
.boxed()
|
||||||
Gossipsub::new(local_peer_id, config)
|
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user