From f7c2e4c5af318a5f064d2b350eddd534048a06c7 Mon Sep 17 00:00:00 2001 From: Age Manning Date: Tue, 30 Apr 2019 15:12:57 +1000 Subject: [PATCH] Initial core grouping of libp2p behaviours --- beacon_node/client/Cargo.toml | 2 +- beacon_node/eth2-libp2p/Cargo.toml | 2 +- beacon_node/eth2-libp2p/src/behaviour.rs | 48 +-- beacon_node/eth2-libp2p/src/core-behaviour.rs | 279 ++++++++++++++++++ beacon_node/eth2-libp2p/src/discovery.rs | 13 +- beacon_node/network/Cargo.toml | 2 +- 6 files changed, 311 insertions(+), 35 deletions(-) create mode 100644 beacon_node/eth2-libp2p/src/core-behaviour.rs diff --git a/beacon_node/client/Cargo.toml b/beacon_node/client/Cargo.toml index 9e4474644..f97173bea 100644 --- a/beacon_node/client/Cargo.toml +++ b/beacon_node/client/Cargo.toml @@ -19,7 +19,7 @@ slot_clock = { path = "../../eth2/utils/slot_clock" } serde = "1.0" serde_derive = "1.0" error-chain = "0.12.0" -slog = "^2.2.3" +slog = { version = "^2.2.3" , features = ["max_level_trace", "release_max_level_trace"] } slog-term = "^2.4.0" slog-async = "^2.3.0" ssz = { path = "../../eth2/utils/ssz" } diff --git a/beacon_node/eth2-libp2p/Cargo.toml b/beacon_node/eth2-libp2p/Cargo.toml index 13dfcdbe8..2deeaf5f0 100644 --- a/beacon_node/eth2-libp2p/Cargo.toml +++ b/beacon_node/eth2-libp2p/Cargo.toml @@ -15,7 +15,7 @@ serde = "1.0" serde_derive = "1.0" ssz = { path = "../../eth2/utils/ssz" } ssz_derive = { path = "../../eth2/utils/ssz_derive" } -slog = "^2.2.3" +slog = { version = "^2.2.3" , features = ["max_level_trace", "release_max_level_trace"] } version = { path = "../version" } tokio = "0.1.16" futures = "0.1.25" diff --git a/beacon_node/eth2-libp2p/src/behaviour.rs b/beacon_node/eth2-libp2p/src/behaviour.rs index e952d1f81..7ddbd95b7 100644 --- a/beacon_node/eth2-libp2p/src/behaviour.rs +++ b/beacon_node/eth2-libp2p/src/behaviour.rs @@ -3,6 +3,7 @@ use crate::rpc::{RPCEvent, RPCMessage, Rpc}; use crate::NetworkConfig; use crate::{Topic, TopicHash}; use futures::prelude::*; +use libp2p::Multiaddr; use libp2p::{ core::{ swarm::{NetworkBehaviourAction, NetworkBehaviourEventProcess}, @@ -19,16 +20,18 @@ use slog::{debug, o, trace, warn}; use ssz::{ssz_encode, Decode, DecodeError, Encode}; use std::time::{Duration, Instant}; use tokio_timer::Delay; +use std::collections::HashMap; use types::{Attestation, BeaconBlock}; -/// Builds the network behaviour for the libp2p Swarm. -/// Implements gossipsub message routing. +/// Builds the network behaviour that manages the core protocols of eth2. +/// This core behaviour is managed by `Behaviour` which adds peer management to all core +/// behaviours. #[derive(NetworkBehaviour)] -#[behaviour(out_event = "BehaviourEvent", poll_method = "poll")] -pub struct Behaviour { +#[behaviour(out_event = "CoreBehaviourEvent", poll_method = "poll")] +pub struct CoreCoreBehaviourTSubstream: AsyncRead + AsyncWrite> { /// The routing pub-sub mechanism for eth2. gossipsub: Gossipsub, - /// The events generated by this behaviour to be consumed in the swarm poll. + /// The serenity RPC specified in the wire-0 protocol. serenity_rpc: Rpc, /// Allows discovery of IP addresses for peers on the network. identify: Identify, @@ -36,9 +39,9 @@ pub struct Behaviour { ping: Ping, /// Kademlia for peer discovery. discovery: Discovery, - /// Queue of behaviour events to be processed. #[behaviour(ignore)] - events: Vec, + /// The events generated by this behaviour to be consumed in the swarm poll. + events: Vec, /// Logger for behaviour actions. #[behaviour(ignore)] log: slog::Logger, @@ -46,7 +49,7 @@ pub struct Behaviour { // Implement the NetworkBehaviourEventProcess trait so that we can derive NetworkBehaviour for Behaviour impl NetworkBehaviourEventProcess - for Behaviour + for CoreBehaviourTSubstream> { fn inject_event(&mut self, event: GossipsubEvent) { match event { @@ -79,7 +82,7 @@ impl NetworkBehaviourEventProcess NetworkBehaviourEventProcess - for Behaviour + for CoreBehaviourTSubstream> { fn inject_event(&mut self, event: RPCMessage) { match event { @@ -94,7 +97,7 @@ impl NetworkBehaviourEventProcess NetworkBehaviourEventProcess - for Behaviour + for CoreBehaviourTSubstream> { fn inject_event(&mut self, event: IdentifyEvent) { match event { @@ -124,7 +127,7 @@ impl NetworkBehaviourEventProcess NetworkBehaviourEventProcess - for Behaviour + for CoreBehaviourTSubstream> { fn inject_event(&mut self, _event: PingEvent) { // not interested in ping responses at the moment. @@ -133,14 +136,14 @@ impl NetworkBehaviourEventProcess // implement the discovery behaviour (currently kademlia) impl NetworkBehaviourEventProcess - for Behaviour + for CoreBehaviourTSubstream> { fn inject_event(&mut self, _out: KademliaOut) { // not interested in kademlia results at the moment } } -impl Behaviour { +impl CoreBehaviourTSubstream> { pub fn new(local_public_key: PublicKey, net_conf: &NetworkConfig, log: &slog::Logger) -> Self { let local_peer_id = local_public_key.clone().into_peer_id(); let identify_config = net_conf.identify_config.clone(); @@ -174,17 +177,14 @@ impl Behaviour { } /// Implements the combined behaviour for the libp2p service. -impl Behaviour { +impl CoreBehaviourTSubstream> { + /* Pubsub behaviour functions */ + /// Subscribes to a gossipsub topic. pub fn subscribe(&mut self, topic: Topic) -> bool { self.gossipsub.subscribe(topic) } - /// Sends an RPC Request/Response via the RPC protocol. - pub fn send_rpc(&mut self, peer_id: PeerId, rpc_event: RPCEvent) { - self.serenity_rpc.send_rpc(peer_id, rpc_event); - } - /// Publishes a message on the pubsub (gossipsub) behaviour. pub fn publish(&mut self, topics: Vec, message: PubsubMessage) { let message_bytes = ssz_encode(&message); @@ -192,10 +192,18 @@ impl Behaviour { self.gossipsub.publish(topic, message_bytes.clone()); } } + + /* Eth2 RPC behaviour functions */ + + /// Sends an RPC Request/Response via the RPC protocol. + pub fn send_rpc(&mut self, peer_id: PeerId, rpc_event: RPCEvent) { + self.serenity_rpc.send_rpc(peer_id, rpc_event); + } + } /// The types of events than can be obtained from polling the behaviour. -pub enum BehaviourEvent { +pub enum CoreBehaviourEvent { RPC(PeerId, RPCEvent), PeerDialed(PeerId), Identified(PeerId, Box), diff --git a/beacon_node/eth2-libp2p/src/core-behaviour.rs b/beacon_node/eth2-libp2p/src/core-behaviour.rs new file mode 100644 index 000000000..e59183b4c --- /dev/null +++ b/beacon_node/eth2-libp2p/src/core-behaviour.rs @@ -0,0 +1,279 @@ +use crate::discovery::Discovery; +use crate::rpc::{RPCEvent, RPCMessage, Rpc}; +use crate::NetworkConfig; +use crate::{Topic, TopicHash}; +use futures::prelude::*; +use libp2p::Multiaddr; +use libp2p::{ + core::{ + swarm::{NetworkBehaviourAction, NetworkBehaviourEventProcess}, + PublicKey, + }, + gossipsub::{Gossipsub, GossipsubEvent}, + identify::{protocol::IdentifyInfo, Identify, IdentifyEvent}, + kad::KademliaOut, + ping::{Ping, PingEvent}, + tokio_io::{AsyncRead, AsyncWrite}, + NetworkBehaviour, PeerId, +}; +use slog::{debug, o, trace, warn}; +use ssz::{ssz_encode, Decodable, DecodeError, Encodable, SszStream}; +use std::collections::HashMap; +use types::{Attestation, BeaconBlock}; + +/// Builds the network behaviour that manages the core protocols of eth2. +/// This core behaviour is managed by `Behaviour` which adds peer management to all core +/// behaviours. +#[derive(NetworkBehaviour)] +#[behaviour(out_event = "CoreBehaviourEvent", poll_method = "poll")] +pub struct CoreBehaviour { + /// The routing pub-sub mechanism for eth2. + gossipsub: Gossipsub, + /// The serenity RPC specified in the wire-0 protocol. + serenity_rpc: Rpc, + /// Allows discovery of IP addresses for peers on the network. + identify: Identify, + /// Keep regular connection to peers and disconnect if absent. + ping: Ping, + /// Kademlia for peer discovery. + discovery: Discovery, + #[behaviour(ignore)] + /// The events generated by this behaviour to be consumed by the global behaviour. + events: Vec, + /// Logger for behaviour actions. + #[behaviour(ignore)] + log: slog::Logger, +} + +impl CoreBehaviour { + pub fn new(local_public_key: PublicKey, net_conf: &NetworkConfig, log: &slog::Logger) -> Self { + let local_peer_id = local_public_key.clone().into_peer_id(); + let identify_config = net_conf.identify_config.clone(); + let behaviour_log = log.new(o!()); + + CoreBehaviour { + serenity_rpc: Rpc::new(log), + gossipsub: Gossipsub::new(local_peer_id.clone(), net_conf.gs_config.clone()), + discovery: Discovery::new(local_peer_id, log), + identify: Identify::new( + identify_config.version, + identify_config.user_agent, + local_public_key, + ), + ping: Ping::new(), + events: Vec::new(), + log: behaviour_log, + } + } + + /// Consumes the events list when polled. + fn poll( + &mut self, + ) -> Async> { + if !self.events.is_empty() { + return Async::Ready(NetworkBehaviourAction::GenerateEvent(self.events.remove(0))); + } + + Async::NotReady + } +} + +// Implement the NetworkBehaviourEventProcess trait so that we can derive NetworkBehaviour for CoreBehaviour +impl NetworkBehaviourEventProcess + for CoreBehaviour +{ + fn inject_event(&mut self, event: GossipsubEvent) { + match event { + GossipsubEvent::Message(gs_msg) => { + trace!(self.log, "Received GossipEvent"; "msg" => format!("{:?}", gs_msg)); + + let pubsub_message = match PubsubMessage::ssz_decode(&gs_msg.data, 0) { + //TODO: Punish peer on error + Err(e) => { + warn!( + self.log, + "Received undecodable message from Peer {:?} error", gs_msg.source; + "error" => format!("{:?}", e) + ); + return; + } + Ok((msg, _index)) => msg, + }; + + self.events.push(BehaviourEvent::GossipMessage { + source: gs_msg.source, + topics: gs_msg.topics, + message: pubsub_message, + }); + } + GossipsubEvent::Subscribed { + peer_id: _, + topic: _, + } + | GossipsubEvent::Unsubscribed { + peer_id: _, + topic: _, + } => {} + } + } +} + +impl NetworkBehaviourEventProcess + for CoreBehaviour +{ + fn inject_event(&mut self, event: RPCMessage) { + match event { + RPCMessage::PeerDialed(peer_id) => { + self.events.push(BehaviourEvent::PeerDialed(peer_id)) + } + RPCMessage::RPC(peer_id, rpc_event) => { + self.events.push(BehaviourEvent::RPC(peer_id, rpc_event)) + } + } + } +} + +impl NetworkBehaviourEventProcess + for CoreBehaviour +{ + fn inject_event(&mut self, event: IdentifyEvent) { + match event { + IdentifyEvent::Identified { + peer_id, mut info, .. + } => { + if info.listen_addrs.len() > 20 { + debug!( + self.log, + "More than 20 peers have been identified, truncating" + ); + info.listen_addrs.truncate(20); + } + trace!(self.log, "Found addresses"; "Peer Id" => format!("{:?}", peer_id), "Addresses" => format!("{:?}", info.listen_addrs)); + // inject the found addresses into our discovery behaviour + for address in &info.listen_addrs { + self.discovery + .add_connected_address(&peer_id, address.clone()); + } + self.events.push(BehaviourEvent::Identified(peer_id, info)); + } + IdentifyEvent::Error { .. } => {} + IdentifyEvent::SendBack { .. } => {} + } + } +} + +impl NetworkBehaviourEventProcess + for CoreBehaviour +{ + fn inject_event(&mut self, _event: PingEvent) { + // not interested in ping responses at the moment. + } +} + +// implement the discovery behaviour (currently kademlia) +impl NetworkBehaviourEventProcess + for CoreBehaviour +{ + fn inject_event(&mut self, _out: KademliaOut) { + // not interested in kademlia results at the moment + } +} + +/// Implements the combined behaviour for the libp2p service. +impl CoreBehaviour { + /* Pubsub behaviour functions */ + + /// Subscribes to a gossipsub topic. + pub fn subscribe(&mut self, topic: Topic) -> bool { + self.gossipsub.subscribe(topic) + } + + /// Publishes a message on the pubsub (gossipsub) behaviour. + pub fn publish(&mut self, topics: Vec, message: PubsubMessage) { + let message_bytes = ssz_encode(&message); + for topic in topics { + self.gossipsub.publish(topic, message_bytes.clone()); + } + } + + /* Eth2 RPC behaviour functions */ + + /// Sends an RPC Request/Response via the RPC protocol. + pub fn send_rpc(&mut self, peer_id: PeerId, rpc_event: RPCEvent) { + self.serenity_rpc.send_rpc(peer_id, rpc_event); + } +} + +/// The types of events than can be obtained from polling the behaviour. +pub enum CoreBehaviourEvent { + RPC(PeerId, RPCEvent), + PeerDialed(PeerId), + Identified(PeerId, IdentifyInfo), + // TODO: This is a stub at the moment + GossipMessage { + source: PeerId, + topics: Vec, + message: PubsubMessage, + }, +} + +/// Messages that are passed to and from the pubsub (Gossipsub) behaviour. +#[derive(Debug, Clone, PartialEq)] +pub enum PubsubMessage { + /// Gossipsub message providing notification of a new block. + Block(BeaconBlock), + /// Gossipsub message providing notification of a new attestation. + Attestation(Attestation), +} + +//TODO: Correctly encode/decode enums. Prefixing with integer for now. +impl Encodable for PubsubMessage { + fn ssz_append(&self, s: &mut SszStream) { + match self { + PubsubMessage::Block(block_gossip) => { + 0u32.ssz_append(s); + block_gossip.ssz_append(s); + } + PubsubMessage::Attestation(attestation_gossip) => { + 1u32.ssz_append(s); + attestation_gossip.ssz_append(s); + } + } + } +} + +impl Decodable for PubsubMessage { + fn ssz_decode(bytes: &[u8], index: usize) -> Result<(Self, usize), DecodeError> { + let (id, index) = u32::ssz_decode(bytes, index)?; + match id { + 0 => { + let (block, index) = BeaconBlock::ssz_decode(bytes, index)?; + Ok((PubsubMessage::Block(block), index)) + } + 1 => { + let (attestation, index) = Attestation::ssz_decode(bytes, index)?; + Ok((PubsubMessage::Attestation(attestation), index)) + } + _ => Err(DecodeError::Invalid), + } + } +} + +#[cfg(test)] +mod test { + use super::*; + use types::*; + + #[test] + fn ssz_encoding() { + let original = PubsubMessage::Block(BeaconBlock::empty(&ChainSpec::foundation())); + + let encoded = ssz_encode(&original); + + println!("{:?}", encoded); + + let (decoded, _i) = PubsubMessage::ssz_decode(&encoded, 0).unwrap(); + + assert_eq!(original, decoded); + } +} diff --git a/beacon_node/eth2-libp2p/src/discovery.rs b/beacon_node/eth2-libp2p/src/discovery.rs index d6fd43ef4..dc91b487c 100644 --- a/beacon_node/eth2-libp2p/src/discovery.rs +++ b/beacon_node/eth2-libp2p/src/discovery.rs @@ -9,7 +9,6 @@ use libp2p::core::swarm::{ use libp2p::core::{Multiaddr, PeerId, ProtocolsHandler}; use libp2p::kad::{Kademlia, KademliaOut}; use slog::{debug, o, warn}; -use std::collections::HashMap; use std::time::{Duration, Instant}; use tokio::io::{AsyncRead, AsyncWrite}; use tokio_timer::Delay; @@ -27,8 +26,6 @@ pub struct Discovery { discovery: Kademlia, /// The delay between peer discovery searches. peer_discovery_delay: Delay, - /// Mapping of known addresses for peer ids. - known_peers: HashMap>, /// Logger for the discovery behaviour. log: slog::Logger, } @@ -37,10 +34,8 @@ impl Discovery { pub fn new(local_peer_id: PeerId, log: &slog::Logger) -> Self { let log = log.new(o!("Service" => "Libp2p-Discovery")); Self { - // events: Vec::new(), discovery: Kademlia::new(local_peer_id), peer_discovery_delay: Delay::new(Instant::now()), - known_peers: HashMap::new(), log, } } @@ -59,13 +54,6 @@ impl Discovery { /// We have discovered an address for a peer, add it to known peers. pub fn add_connected_address(&mut self, peer_id: &PeerId, address: Multiaddr) { - let known_peers = self - .known_peers - .entry(peer_id.clone()) - .or_insert_with(|| vec![]); - if !known_peers.contains(&address) { - known_peers.push(address.clone()); - } // pass the address on to kademlia self.discovery.add_connected_address(peer_id, address); } @@ -160,6 +148,7 @@ where }, _ => {} }; + // propagate result upwards return Async::Ready(action); } Async::NotReady => (), diff --git a/beacon_node/network/Cargo.toml b/beacon_node/network/Cargo.toml index ebf71aa4e..cd8ba0820 100644 --- a/beacon_node/network/Cargo.toml +++ b/beacon_node/network/Cargo.toml @@ -13,7 +13,7 @@ store = { path = "../store" } eth2-libp2p = { path = "../eth2-libp2p" } version = { path = "../version" } types = { path = "../../eth2/types" } -slog = { version = "^2.2.3" , features = ["max_level_trace", "release_max_level_debug"] } +slog = { version = "^2.2.3" } ssz = { path = "../../eth2/utils/ssz" } tree_hash = { path = "../../eth2/utils/tree_hash" } futures = "0.1.25"