From a38f4c4cd1dc1a3c9a22b2c8747364b7316daed1 Mon Sep 17 00:00:00 2001 From: Age Manning Date: Tue, 9 Apr 2019 09:15:11 +1000 Subject: [PATCH] Adds Kademlia for peer discovery --- beacon_node/eth2-libp2p/Cargo.toml | 1 + beacon_node/eth2-libp2p/src/behaviour.rs | 72 ++++++++++++++++++++++-- 2 files changed, 69 insertions(+), 4 deletions(-) diff --git a/beacon_node/eth2-libp2p/Cargo.toml b/beacon_node/eth2-libp2p/Cargo.toml index 264d77d76..31c0a7f2d 100644 --- a/beacon_node/eth2-libp2p/Cargo.toml +++ b/beacon_node/eth2-libp2p/Cargo.toml @@ -19,3 +19,4 @@ version = { path = "../version" } tokio = "0.1.16" futures = "0.1.25" error-chain = "0.12.0" +tokio-timer = "0.2.10" diff --git a/beacon_node/eth2-libp2p/src/behaviour.rs b/beacon_node/eth2-libp2p/src/behaviour.rs index f362f5795..591d93bb5 100644 --- a/beacon_node/eth2-libp2p/src/behaviour.rs +++ b/beacon_node/eth2-libp2p/src/behaviour.rs @@ -9,14 +9,21 @@ use libp2p::{ }, gossipsub::{Gossipsub, GossipsubEvent}, identify::{protocol::IdentifyInfo, Identify, IdentifyEvent}, + kad::{Kademlia, KademliaOut}, ping::{Ping, PingEvent}, tokio_io::{AsyncRead, AsyncWrite}, NetworkBehaviour, PeerId, }; use slog::{debug, o, trace, warn}; use ssz::{ssz_encode, Decode, DecodeError, Encode}; +use std::time::{Duration, Instant}; +use tokio_timer::Delay; +>>>>>>> Adds Kademlia for peer discovery use types::{Attestation, BeaconBlock}; +//TODO: Make this dynamic +const TIME_BETWEEN_KAD_REQUESTS: Duration = Duration::from_secs(30); + /// Builds the network behaviour for the libp2p Swarm. /// Implements gossipsub message routing. #[derive(NetworkBehaviour)] @@ -24,17 +31,20 @@ use types::{Attestation, BeaconBlock}; pub struct Behaviour { /// The routing pub-sub mechanism for eth2. gossipsub: Gossipsub, - // TODO: Add Kademlia for peer discovery /// The events generated by this behaviour to be consumed in the swarm poll. serenity_rpc: Rpc, /// Allows discovery of IP addresses for peers on the network. identify: Identify, /// Keep regular connection to peers and disconnect if absent. - // TODO: Keepalive, likely remove this later. - // TODO: Make the ping time customizeable. ping: Ping, + /// Kademlia for peer discovery. + kad: Kademlia, + /// Queue of behaviour events to be processed. #[behaviour(ignore)] events: Vec, + /// The delay until we next search for more peers. + #[behaviour(ignore)] + kad_delay: Delay, /// Logger for behaviour actions. #[behaviour(ignore)] log: slog::Logger, @@ -121,6 +131,33 @@ impl NetworkBehaviourEventProcess } } +// implement the kademlia behaviour +impl NetworkBehaviourEventProcess + for Behaviour +{ + fn inject_event(&mut self, out: KademliaOut) { + match out { + KademliaOut::Discovered { .. } => { + // send this to our topology behaviour + } + KademliaOut::KBucketAdded { .. } => { + // send this to our topology behaviour + } + KademliaOut::FindNodeResult { closer_peers, .. } => { + debug!( + self.log, + "Kademlia query found {} peers", + closer_peers.len() + ); + if closer_peers.is_empty() { + warn!(self.log, "Kademlia random query yielded empty results"); + } + } + KademliaOut::GetProvidersResult { .. } => (), + } + } +} + impl Behaviour { pub fn new(local_public_key: PublicKey, net_conf: &NetworkConfig, log: &slog::Logger) -> Self { let local_peer_id = local_public_key.clone().into_peer_id(); @@ -128,8 +165,9 @@ impl Behaviour { let behaviour_log = log.new(o!()); Behaviour { - gossipsub: Gossipsub::new(local_peer_id, net_conf.gs_config.clone()), serenity_rpc: Rpc::new(log), + gossipsub: Gossipsub::new(local_peer_id.clone(), net_conf.gs_config.clone()), + kad: Kademlia::new(local_peer_id), identify: Identify::new( identify_config.version, identify_config.user_agent, @@ -137,6 +175,7 @@ impl Behaviour { ), ping: Ping::new(), events: Vec::new(), + kad_delay: Delay::new(Instant::now()), log: behaviour_log, } } @@ -149,6 +188,19 @@ impl Behaviour { return Async::Ready(NetworkBehaviourAction::GenerateEvent(self.events.remove(0))); } + // check to see if it's time to search for me peers with kademlia + loop { + match self.kad_delay.poll() { + Ok(Async::Ready(_)) => { + self.get_kad_peers(); + } + Ok(Async::NotReady) => break, + Err(e) => { + warn!(self.log, "Error getting peers from Kademlia. Err: {:?}", e); + } + } + } + Async::NotReady } } @@ -172,6 +224,18 @@ impl Behaviour { self.gossipsub.publish(topic, message_bytes.clone()); } } + + /// Queries for more peers randomly using Kademlia. + pub fn get_kad_peers(&mut self) { + // pick a random PeerId + let random_peer = PeerId::random(); + debug!(self.log, "Running kademlia random peer query"); + self.kad.find_node(random_peer); + + // update the kademlia timeout + self.kad_delay + .reset(Instant::now() + TIME_BETWEEN_KAD_REQUESTS); + } } /// The types of events than can be obtained from polling the behaviour.