From 3aa2b376bb3d4c0d2f09b6631631e0f1f3124bc2 Mon Sep 17 00:00:00 2001 From: Age Manning Date: Thu, 21 Mar 2019 10:43:21 +1100 Subject: [PATCH 01/14] Increase RPC read/write limit to 4M --- beacon_node/eth2-libp2p/src/rpc/protocol.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/beacon_node/eth2-libp2p/src/rpc/protocol.rs b/beacon_node/eth2-libp2p/src/rpc/protocol.rs index c19aca8ff..7ec2abc59 100644 --- a/beacon_node/eth2-libp2p/src/rpc/protocol.rs +++ b/beacon_node/eth2-libp2p/src/rpc/protocol.rs @@ -6,7 +6,7 @@ use std::iter; use tokio::io::{AsyncRead, AsyncWrite}; /// The maximum bytes that can be sent across the RPC. -const MAX_READ_SIZE: usize = 2048; +const MAX_READ_SIZE: usize = 4_194_304; // 4M /// Implementation of the `ConnectionUpgrade` for the rpc protocol. From a14426349a0e417968e6b4ed421b0eb25c6344e5 Mon Sep 17 00:00:00 2001 From: Age Manning Date: Thu, 21 Mar 2019 10:53:29 +1100 Subject: [PATCH 02/14] Implement Goodbye RPC call --- beacon_node/eth2-libp2p/src/rpc/protocol.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/beacon_node/eth2-libp2p/src/rpc/protocol.rs b/beacon_node/eth2-libp2p/src/rpc/protocol.rs index 7ec2abc59..ab24a32c3 100644 --- a/beacon_node/eth2-libp2p/src/rpc/protocol.rs +++ b/beacon_node/eth2-libp2p/src/rpc/protocol.rs @@ -81,6 +81,10 @@ fn decode(packet: Vec) -> Result { let (hello_body, _index) = HelloMessage::ssz_decode(&packet, index)?; RPCRequest::Hello(hello_body) } + RPCMethod::Goodbye => { + let (goodbye_code, _index) = u64::ssz_decode(&packet, index)?; + RPCRequest::Goodbye(goodbye_code) + } RPCMethod::Unknown | _ => return Err(DecodeError::UnknownRPCMethod), }; From 766a79adfa7de11548705fb2f94fe9521d331806 Mon Sep 17 00:00:00 2001 From: Age Manning Date: Thu, 21 Mar 2019 11:02:52 +1100 Subject: [PATCH 03/14] Implement BeaconBlockRoots RPC method --- beacon_node/eth2-libp2p/src/rpc/protocol.rs | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/beacon_node/eth2-libp2p/src/rpc/protocol.rs b/beacon_node/eth2-libp2p/src/rpc/protocol.rs index ab24a32c3..db05df1aa 100644 --- a/beacon_node/eth2-libp2p/src/rpc/protocol.rs +++ b/beacon_node/eth2-libp2p/src/rpc/protocol.rs @@ -1,4 +1,4 @@ -use super::methods::{HelloMessage, RPCMethod, RPCRequest, RPCResponse}; +use super::methods::*; use libp2p::core::{upgrade, InboundUpgrade, OutboundUpgrade, UpgradeInfo}; use ssz::{ssz_encode, Decodable, Encodable, SszStream}; use std::io; @@ -85,6 +85,11 @@ fn decode(packet: Vec) -> Result { let (goodbye_code, _index) = u64::ssz_decode(&packet, index)?; RPCRequest::Goodbye(goodbye_code) } + RPCMethod::BeaconBlockRoots => { + let (block_roots_request, _index) = + BeaconBlockRootsRequest::ssz_decode(&packet, index)?; + RPCRequest::BeaconBlockRoots(block_roots_request) + } RPCMethod::Unknown | _ => return Err(DecodeError::UnknownRPCMethod), }; @@ -101,6 +106,11 @@ fn decode(packet: Vec) -> Result { let (body, _index) = HelloMessage::ssz_decode(&packet, index)?; RPCResponse::Hello(body) } + RPCMethod::Goodbye => unreachable!("Should never receive a goodbye response"), + RPCMethod::BeaconBlockRoots => { + let (body, _index) = BeaconBlockRootsResponse::ssz_decode(&packet, index)?; + RPCResponse::BeaconBlockRoots(body) + } RPCMethod::Unknown | _ => return Err(DecodeError::UnknownRPCMethod), }; Ok(RPCEvent::Response { From 56cd77ead8dd92c6a1101d7d8cf47f9ba7783981 Mon Sep 17 00:00:00 2001 From: Age Manning Date: Thu, 21 Mar 2019 11:05:26 +1100 Subject: [PATCH 04/14] Implement BeaconBlockHeaders RPC method --- beacon_node/eth2-libp2p/src/rpc/protocol.rs | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/beacon_node/eth2-libp2p/src/rpc/protocol.rs b/beacon_node/eth2-libp2p/src/rpc/protocol.rs index db05df1aa..a15b48e08 100644 --- a/beacon_node/eth2-libp2p/src/rpc/protocol.rs +++ b/beacon_node/eth2-libp2p/src/rpc/protocol.rs @@ -90,6 +90,11 @@ fn decode(packet: Vec) -> Result { BeaconBlockRootsRequest::ssz_decode(&packet, index)?; RPCRequest::BeaconBlockRoots(block_roots_request) } + RPCMethod::BeaconBlockHeaders => { + let (block_headers_request, _index) = + BeaconBlockHeadersRequest::ssz_decode(&packet, index)?; + RPCRequest::BeaconBlockHeaders(block_headers_request) + } RPCMethod::Unknown | _ => return Err(DecodeError::UnknownRPCMethod), }; @@ -111,6 +116,10 @@ fn decode(packet: Vec) -> Result { let (body, _index) = BeaconBlockRootsResponse::ssz_decode(&packet, index)?; RPCResponse::BeaconBlockRoots(body) } + RPCMethod::BeaconBlockHeaders => { + let (body, _index) = BeaconBlockHeadersResponse::ssz_decode(&packet, index)?; + RPCResponse::BeaconBlockHeaders(body) + } RPCMethod::Unknown | _ => return Err(DecodeError::UnknownRPCMethod), }; Ok(RPCEvent::Response { From 3063d5eac951d976ccd725b8cd2f6632b7c495b1 Mon Sep 17 00:00:00 2001 From: Age Manning Date: Thu, 21 Mar 2019 11:16:09 +1100 Subject: [PATCH 05/14] Implement BeaconBlockBodies RPC method --- beacon_node/eth2-libp2p/src/rpc/protocol.rs | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/beacon_node/eth2-libp2p/src/rpc/protocol.rs b/beacon_node/eth2-libp2p/src/rpc/protocol.rs index a15b48e08..0697bdf17 100644 --- a/beacon_node/eth2-libp2p/src/rpc/protocol.rs +++ b/beacon_node/eth2-libp2p/src/rpc/protocol.rs @@ -95,6 +95,11 @@ fn decode(packet: Vec) -> Result { BeaconBlockHeadersRequest::ssz_decode(&packet, index)?; RPCRequest::BeaconBlockHeaders(block_headers_request) } + RPCMethod::BeaconBlockBodies => { + let (block_bodies_request, _index) = + BeaconBlockBodiesRequest::ssz_decode(&packet, index)?; + RPCRequest::BeaconBlockBodies(block_bodies_request) + } RPCMethod::Unknown | _ => return Err(DecodeError::UnknownRPCMethod), }; @@ -120,6 +125,10 @@ fn decode(packet: Vec) -> Result { let (body, _index) = BeaconBlockHeadersResponse::ssz_decode(&packet, index)?; RPCResponse::BeaconBlockHeaders(body) } + RPCMethod::BeaconBlockBodies => { + let (body, _index) = BeaconBlockBodiesResponse::ssz_decode(&packet, index)?; + RPCResponse::BeaconBlockBodies(body) + } RPCMethod::Unknown | _ => return Err(DecodeError::UnknownRPCMethod), }; Ok(RPCEvent::Response { From 950186eca7b504de9bd7e64d233474f93e9cc73b Mon Sep 17 00:00:00 2001 From: Age Manning Date: Thu, 21 Mar 2019 11:18:47 +1100 Subject: [PATCH 06/14] Implement BeaconChainState RPC method --- beacon_node/eth2-libp2p/src/rpc/protocol.rs | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/beacon_node/eth2-libp2p/src/rpc/protocol.rs b/beacon_node/eth2-libp2p/src/rpc/protocol.rs index 0697bdf17..b9dddb71a 100644 --- a/beacon_node/eth2-libp2p/src/rpc/protocol.rs +++ b/beacon_node/eth2-libp2p/src/rpc/protocol.rs @@ -100,7 +100,12 @@ fn decode(packet: Vec) -> Result { BeaconBlockBodiesRequest::ssz_decode(&packet, index)?; RPCRequest::BeaconBlockBodies(block_bodies_request) } - RPCMethod::Unknown | _ => return Err(DecodeError::UnknownRPCMethod), + RPCMethod::BeaconChainState => { + let (chain_state_request, _index) = + BeaconChainStateRequest::ssz_decode(&packet, index)?; + RPCRequest::BeaconChainState(chain_state_request) + } + RPCMethod::Unknown => return Err(DecodeError::UnknownRPCMethod), }; Ok(RPCEvent::Request { @@ -129,7 +134,11 @@ fn decode(packet: Vec) -> Result { let (body, _index) = BeaconBlockBodiesResponse::ssz_decode(&packet, index)?; RPCResponse::BeaconBlockBodies(body) } - RPCMethod::Unknown | _ => return Err(DecodeError::UnknownRPCMethod), + RPCMethod::BeaconChainState => { + let (body, _index) = BeaconChainStateResponse::ssz_decode(&packet, index)?; + RPCResponse::BeaconChainState(body) + } + RPCMethod::Unknown => return Err(DecodeError::UnknownRPCMethod), }; Ok(RPCEvent::Response { id, From 437a0505c960f3a218f6d52e56ee053983979f5c Mon Sep 17 00:00:00 2001 From: Age Manning Date: Thu, 21 Mar 2019 11:25:22 +1100 Subject: [PATCH 07/14] Implement encodeable on all RPC methods --- beacon_node/eth2-libp2p/src/rpc/protocol.rs | 29 +++++++++++++++++++-- 1 file changed, 27 insertions(+), 2 deletions(-) diff --git a/beacon_node/eth2-libp2p/src/rpc/protocol.rs b/beacon_node/eth2-libp2p/src/rpc/protocol.rs index b9dddb71a..f4fe26fac 100644 --- a/beacon_node/eth2-libp2p/src/rpc/protocol.rs +++ b/beacon_node/eth2-libp2p/src/rpc/protocol.rs @@ -178,7 +178,21 @@ impl Encodable for RPCEvent { RPCRequest::Hello(body) => { s.append(body); } - _ => {} + RPCRequest::Goodbye(body) => { + s.append(body); + } + RPCRequest::BeaconBlockRoots(body) => { + s.append(body); + } + RPCRequest::BeaconBlockHeaders(body) => { + s.append(body); + } + RPCRequest::BeaconBlockBodies(body) => { + s.append(body); + } + RPCRequest::BeaconChainState(body) => { + s.append(body); + } } } RPCEvent::Response { @@ -193,7 +207,18 @@ impl Encodable for RPCEvent { RPCResponse::Hello(response) => { s.append(response); } - _ => {} + RPCResponse::BeaconBlockRoots(response) => { + s.append(response); + } + RPCResponse::BeaconBlockHeaders(response) => { + s.append(response); + } + RPCResponse::BeaconBlockBodies(response) => { + s.append(response); + } + RPCResponse::BeaconChainState(response) => { + s.append(response); + } } } } From 7ec37939c8b0129359e5714e79a1ed02c4993067 Mon Sep 17 00:00:00 2001 From: Age Manning Date: Thu, 21 Mar 2019 12:41:05 +1100 Subject: [PATCH 08/14] Adds Identify protocol and cleans up network config --- beacon_node/eth2-libp2p/src/behaviour.rs | 44 ++++++++++++-- beacon_node/eth2-libp2p/src/lib.rs | 4 +- beacon_node/eth2-libp2p/src/network_config.rs | 59 ------------------- beacon_node/eth2-libp2p/src/service.rs | 5 +- 4 files changed, 45 insertions(+), 67 deletions(-) delete mode 100644 beacon_node/eth2-libp2p/src/network_config.rs diff --git a/beacon_node/eth2-libp2p/src/behaviour.rs b/beacon_node/eth2-libp2p/src/behaviour.rs index 78d013002..a29484e2b 100644 --- a/beacon_node/eth2-libp2p/src/behaviour.rs +++ b/beacon_node/eth2-libp2p/src/behaviour.rs @@ -1,8 +1,13 @@ use crate::rpc::{RPCEvent, RPCMessage, Rpc}; +use crate::NetworkConfig; use futures::prelude::*; use libp2p::{ - core::swarm::{NetworkBehaviourAction, NetworkBehaviourEventProcess}, - gossipsub::{Gossipsub, GossipsubConfig, GossipsubEvent}, + core::{ + swarm::{NetworkBehaviourAction, NetworkBehaviourEventProcess}, + PublicKey, + }, + gossipsub::{Gossipsub, GossipsubEvent}, + identify::{protocol::IdentifyInfo, Identify, IdentifyEvent}, tokio_io::{AsyncRead, AsyncWrite}, NetworkBehaviour, PeerId, }; @@ -13,10 +18,13 @@ use types::Topic; #[derive(NetworkBehaviour)] #[behaviour(out_event = "BehaviourEvent", poll_method = "poll")] pub struct Behaviour { + /// The routing pub-sub mechanism for eth2. gossipsub: Gossipsub, // TODO: Add Kademlia for peer discovery /// The events generated by this behaviour to be consumed in the swarm poll. serenity_rpc: Rpc, + /// Allows discovery of IP addresses for peers on the network. + identify: Identify, #[behaviour(ignore)] events: Vec, } @@ -53,11 +61,38 @@ impl NetworkBehaviourEventProcess NetworkBehaviourEventProcess + for Behaviour +{ + fn inject_event(&mut self, event: IdentifyEvent) { + match event { + IdentifyEvent::Identified { + peer_id, mut info, .. + } => { + if info.listen_addrs.len() > 20 { + info.listen_addrs.truncate(20); + } + self.events.push(BehaviourEvent::Identified(peer_id, info)); + } + IdentifyEvent::Error { .. } => {} + IdentifyEvent::SendBack { .. } => {} + } + } +} + impl Behaviour { - pub fn new(local_peer_id: PeerId, gs_config: GossipsubConfig, log: &slog::Logger) -> Self { + pub fn new(local_public_key: PublicKey, net_conf: &NetworkConfig, log: &slog::Logger) -> Self { + let local_peer_id = local_public_key.clone().into_peer_id(); + let identify_config = net_conf.identify_config.clone(); + Behaviour { - gossipsub: Gossipsub::new(local_peer_id, gs_config), + gossipsub: Gossipsub::new(local_peer_id, net_conf.gs_config.clone()), serenity_rpc: Rpc::new(log), + identify: Identify::new( + identify_config.version, + identify_config.user_agent, + local_public_key, + ), events: Vec::new(), } } @@ -91,6 +126,7 @@ impl Behaviour { pub enum BehaviourEvent { RPC(PeerId, RPCEvent), PeerDialed(PeerId), + Identified(PeerId, IdentifyInfo), // TODO: This is a stub at the moment Message(String), } diff --git a/beacon_node/eth2-libp2p/src/lib.rs b/beacon_node/eth2-libp2p/src/lib.rs index f3e97355d..f7a961bb2 100644 --- a/beacon_node/eth2-libp2p/src/lib.rs +++ b/beacon_node/eth2-libp2p/src/lib.rs @@ -3,16 +3,16 @@ /// /// This crate builds and manages the libp2p services required by the beacon node. pub mod behaviour; +mod config; pub mod error; -mod network_config; pub mod rpc; mod service; +pub use config::Config as NetworkConfig; pub use libp2p::{ gossipsub::{GossipsubConfig, GossipsubConfigBuilder}, PeerId, }; -pub use network_config::NetworkConfig; pub use rpc::{HelloMessage, RPCEvent}; pub use service::Libp2pEvent; pub use service::Service; diff --git a/beacon_node/eth2-libp2p/src/network_config.rs b/beacon_node/eth2-libp2p/src/network_config.rs deleted file mode 100644 index 176892bb0..000000000 --- a/beacon_node/eth2-libp2p/src/network_config.rs +++ /dev/null @@ -1,59 +0,0 @@ -use crate::Multiaddr; -use libp2p::gossipsub::{GossipsubConfig, GossipsubConfigBuilder}; -use libp2p::secio; -use std::fmt; - -#[derive(Clone)] -/// Network configuration for lighthouse. -pub struct NetworkConfig { - //TODO: stubbing networking initial params, change in the future - /// IP address to listen on. - pub listen_addresses: Vec, - /// Listen port UDP/TCP. - pub listen_port: u16, - /// Gossipsub configuration parameters. - pub gs_config: GossipsubConfig, - /// List of nodes to initially connect to. - pub boot_nodes: Vec, - /// Peer key related to this nodes PeerId. - pub local_private_key: secio::SecioKeyPair, - /// Client version - pub client_version: String, - /// List of topics to subscribe to as strings - pub topics: Vec, -} - -impl Default for NetworkConfig { - /// Generate a default network configuration. - fn default() -> Self { - // TODO: Currently using secp256k1 key pairs. Wire protocol specifies RSA. Waiting for this - // PR to be merged to generate RSA keys: https://github.com/briansmith/ring/pull/733 - - NetworkConfig { - listen_addresses: vec!["/ip4/127.0.0.1/tcp/9000" - .parse() - .expect("is a correct multi-address")], - listen_port: 9000, - gs_config: GossipsubConfigBuilder::new().build(), - boot_nodes: Vec::new(), - local_private_key: secio::SecioKeyPair::secp256k1_generated().unwrap(), - client_version: version::version(), - topics: vec![String::from("beacon_chain")], - } - } -} - -impl NetworkConfig { - pub fn new(boot_nodes: Vec) -> Self { - let mut conf = NetworkConfig::default(); - conf.boot_nodes = boot_nodes; - - conf - } -} - -impl fmt::Debug for NetworkConfig { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - write!(f, "NetworkConfig: listen_addresses: {:?}, listen_port: {:?}, gs_config: {:?}, boot_nodes: {:?}, local_private_key: , client_version: {:?}", self.listen_addresses, self.listen_port, self.gs_config, self.boot_nodes, self.local_private_key.to_public_key(), self.client_version) - } -} diff --git a/beacon_node/eth2-libp2p/src/service.rs b/beacon_node/eth2-libp2p/src/service.rs index e378cd634..7a3937883 100644 --- a/beacon_node/eth2-libp2p/src/service.rs +++ b/beacon_node/eth2-libp2p/src/service.rs @@ -33,7 +33,8 @@ impl Service { pub fn new(config: NetworkConfig, log: slog::Logger) -> error::Result { debug!(log, "Libp2p Service starting"); - let local_private_key = config.local_private_key; + let local_private_key = config.local_private_key.clone(); + let local_public_key = local_private_key.to_public_key(); let local_peer_id = local_private_key.to_peer_id(); info!(log, "Local peer id: {:?}", local_peer_id); @@ -41,7 +42,7 @@ impl Service { // Set up the transport let transport = build_transport(local_private_key); // Set up gossipsub routing - let behaviour = Behaviour::new(local_peer_id.clone(), config.gs_config, &log); + let behaviour = Behaviour::new(local_public_key.clone(), &config, &log); // Set up Topology let topology = local_peer_id.clone(); Swarm::new(transport, behaviour, topology) From 67a3dfe05218085c56e49fd333368917cb67ac23 Mon Sep 17 00:00:00 2001 From: Age Manning Date: Thu, 21 Mar 2019 12:45:23 +1100 Subject: [PATCH 09/14] Remove node private key from config --- beacon_node/eth2-libp2p/src/config.rs | 75 ++++++++++++++++++++++++++ beacon_node/eth2-libp2p/src/service.rs | 6 ++- 2 files changed, 80 insertions(+), 1 deletion(-) create mode 100644 beacon_node/eth2-libp2p/src/config.rs diff --git a/beacon_node/eth2-libp2p/src/config.rs b/beacon_node/eth2-libp2p/src/config.rs new file mode 100644 index 000000000..a86045b78 --- /dev/null +++ b/beacon_node/eth2-libp2p/src/config.rs @@ -0,0 +1,75 @@ +use crate::Multiaddr; +use libp2p::gossipsub::{GossipsubConfig, GossipsubConfigBuilder}; + +#[derive(Clone, Debug)] +/// Network configuration for lighthouse. +pub struct Config { + //TODO: stubbing networking initial params, change in the future + /// IP address to listen on. + pub listen_addresses: Vec, + /// Listen port UDP/TCP. + pub listen_port: u16, + /// Gossipsub configuration parameters. + pub gs_config: GossipsubConfig, + /// Configuration parameters for node identification protocol. + pub identify_config: IdentifyConfig, + /// List of nodes to initially connect to. + pub boot_nodes: Vec, + /// Client version + pub client_version: String, + /// List of topics to subscribe to as strings + pub topics: Vec, +} + +impl Default for Config { + /// Generate a default network configuration. + fn default() -> Self { + Config { + listen_addresses: vec!["/ip4/127.0.0.1/tcp/9000" + .parse() + .expect("is a correct multi-address")], + listen_port: 9000, + gs_config: GossipsubConfigBuilder::new().build(), + identify_config: IdentifyConfig::new(&version::version()), + boot_nodes: Vec::new(), + client_version: version::version(), + topics: vec![String::from("beacon_chain")], + } + } +} + +impl Config { + pub fn new(boot_nodes: Vec) -> Self { + let mut conf = Config::default(); + conf.boot_nodes = boot_nodes; + + conf + } +} + +/// The configuration parameters for the Identify protocol +#[derive(Debug, Clone)] +pub struct IdentifyConfig { + /// The protocol version to listen on. + pub version: String, + /// The client's name and version for identification. + pub user_agent: String, +} + +impl Default for IdentifyConfig { + fn default() -> Self { + Self { + version: "/eth/serenity/1.0".to_string(), + user_agent: "Lighthouse".to_string(), + } + } +} + +impl IdentifyConfig { + /// Adds the version to the user agent. + pub fn new(version: &String) -> Self { + let mut config = IdentifyConfig::default(); + config.user_agent += version; + config + } +} diff --git a/beacon_node/eth2-libp2p/src/service.rs b/beacon_node/eth2-libp2p/src/service.rs index 7a3937883..ca20465a6 100644 --- a/beacon_node/eth2-libp2p/src/service.rs +++ b/beacon_node/eth2-libp2p/src/service.rs @@ -33,7 +33,11 @@ impl Service { pub fn new(config: NetworkConfig, log: slog::Logger) -> error::Result { debug!(log, "Libp2p Service starting"); - let local_private_key = config.local_private_key.clone(); + // TODO: Currently using secp256k1 key pairs. Wire protocol specifies RSA. Waiting for this + // PR to be merged to generate RSA keys: https://github.com/briansmith/ring/pull/733 + // TODO: Save and recover node key from disk + let local_private_key = secio::SecioKeyPair::secp256k1_generated().unwrap(); + let local_public_key = local_private_key.to_public_key(); let local_peer_id = local_private_key.to_peer_id(); info!(log, "Local peer id: {:?}", local_peer_id); From 35815ce7868b3f0af016b396dff8fa378ae6f5bd Mon Sep 17 00:00:00 2001 From: Age Manning Date: Thu, 21 Mar 2019 12:57:41 +1100 Subject: [PATCH 10/14] Cleans up swarm poll and adds identify behaviour --- beacon_node/eth2-libp2p/src/service.rs | 31 ++++++++++++++++---------- 1 file changed, 19 insertions(+), 12 deletions(-) diff --git a/beacon_node/eth2-libp2p/src/service.rs b/beacon_node/eth2-libp2p/src/service.rs index ca20465a6..d23fae211 100644 --- a/beacon_node/eth2-libp2p/src/service.rs +++ b/beacon_node/eth2-libp2p/src/service.rs @@ -11,8 +11,8 @@ use libp2p::core::{ transport::boxed::Boxed, upgrade::{InboundUpgradeExt, OutboundUpgradeExt}, }; -use libp2p::{core, secio, Transport}; -use libp2p::{PeerId, Swarm}; +use libp2p::identify::protocol::IdentifyInfo; +use libp2p::{core, secio, PeerId, Swarm, Transport}; use slog::{debug, info, trace, warn}; use std::io::{Error, ErrorKind}; use std::time::Duration; @@ -104,17 +104,23 @@ impl Stream for Service { // TODO: Currently only gossipsub events passed here. // Build a type for more generic events match self.swarm.poll() { - Ok(Async::Ready(Some(BehaviourEvent::Message(m)))) => { + //Behaviour events + Ok(Async::Ready(Some(event))) => match event { // TODO: Stub here for debugging - debug!(self.log, "Message received: {}", m); - return Ok(Async::Ready(Some(Libp2pEvent::Message(m)))); - } - Ok(Async::Ready(Some(BehaviourEvent::RPC(peer_id, event)))) => { - return Ok(Async::Ready(Some(Libp2pEvent::RPC(peer_id, event)))); - } - Ok(Async::Ready(Some(BehaviourEvent::PeerDialed(peer_id)))) => { - return Ok(Async::Ready(Some(Libp2pEvent::PeerDialed(peer_id)))); - } + BehaviourEvent::Message(m) => { + debug!(self.log, "Message received: {}", m); + return Ok(Async::Ready(Some(Libp2pEvent::Message(m)))); + } + BehaviourEvent::RPC(peer_id, event) => { + return Ok(Async::Ready(Some(Libp2pEvent::RPC(peer_id, event)))); + } + BehaviourEvent::PeerDialed(peer_id) => { + return Ok(Async::Ready(Some(Libp2pEvent::PeerDialed(peer_id)))); + } + BehaviourEvent::Identified(peer_id, info) => { + return Ok(Async::Ready(Some(Libp2pEvent::Identified(peer_id, info)))); + } + }, Ok(Async::Ready(None)) => unreachable!("Swarm stream shouldn't end"), Ok(Async::NotReady) => break, _ => break, @@ -164,5 +170,6 @@ pub enum Libp2pEvent { // We have received an RPC event on the swarm RPC(PeerId, RPCEvent), PeerDialed(PeerId), + Identified(PeerId, IdentifyInfo), Message(String), } From 13ac5b1d255594175ebf76015ee9cc6dd428fe07 Mon Sep 17 00:00:00 2001 From: Age Manning Date: Thu, 21 Mar 2019 13:15:14 +1100 Subject: [PATCH 11/14] Tidy network poll and implement Identify --- beacon_node/eth2-libp2p/src/service.rs | 5 +- beacon_node/network/src/service.rs | 78 ++++++++++++++------------ 2 files changed, 46 insertions(+), 37 deletions(-) diff --git a/beacon_node/eth2-libp2p/src/service.rs b/beacon_node/eth2-libp2p/src/service.rs index d23fae211..e68df2d38 100644 --- a/beacon_node/eth2-libp2p/src/service.rs +++ b/beacon_node/eth2-libp2p/src/service.rs @@ -167,9 +167,12 @@ fn build_transport( /// Events that can be obtained from polling the Libp2p Service. pub enum Libp2pEvent { - // We have received an RPC event on the swarm + /// An RPC response request has been received on the swarm. RPC(PeerId, RPCEvent), + /// Initiated the connection to a new peer. PeerDialed(PeerId), + /// Received information about a peer on the network. Identified(PeerId, IdentifyInfo), + // TODO: Pub-sub testing only. Message(String), } diff --git a/beacon_node/network/src/service.rs b/beacon_node/network/src/service.rs index 14f994e4a..a3eb6f0d9 100644 --- a/beacon_node/network/src/service.rs +++ b/beacon_node/network/src/service.rs @@ -15,8 +15,8 @@ use tokio::runtime::TaskExecutor; /// Service that handles communication between internal services and the eth2_libp2p network service. pub struct Service { - //eth2_libp2p_service: Arc>, - eth2_libp2p_exit: oneshot::Sender<()>, + //libp2p_service: Arc>, + libp2p_exit: oneshot::Sender<()>, network_send: crossbeam_channel::Sender, //message_handler: MessageHandler, //message_handler_send: Sender, @@ -40,20 +40,20 @@ impl Service { message_handler_log, )?; - // launch eth2_libp2p service - let eth2_libp2p_log = log.new(o!("Service" => "Libp2p")); - let eth2_libp2p_service = LibP2PService::new(config.clone(), eth2_libp2p_log)?; + // launch libp2p service + let libp2p_log = log.new(o!("Service" => "Libp2p")); + let libp2p_service = LibP2PService::new(config.clone(), libp2p_log)?; - // TODO: Spawn thread to handle eth2_libp2p messages and pass to message handler thread. - let eth2_libp2p_exit = spawn_service( - eth2_libp2p_service, + // TODO: Spawn thread to handle libp2p messages and pass to message handler thread. + let libp2p_exit = spawn_service( + libp2p_service, network_recv, message_handler_send, executor, log, )?; let network_service = Service { - eth2_libp2p_exit, + libp2p_exit, network_send: network_send.clone(), }; @@ -72,7 +72,7 @@ impl Service { } fn spawn_service( - eth2_libp2p_service: LibP2PService, + libp2p_service: LibP2PService, network_recv: crossbeam_channel::Receiver, message_handler_send: crossbeam_channel::Sender, executor: &TaskExecutor, @@ -83,7 +83,7 @@ fn spawn_service( // spawn on the current executor executor.spawn( network_service( - eth2_libp2p_service, + libp2p_service, network_recv, message_handler_send, log.clone(), @@ -100,7 +100,7 @@ fn spawn_service( } fn network_service( - mut eth2_libp2p_service: LibP2PService, + mut libp2p_service: LibP2PService, network_recv: crossbeam_channel::Receiver, message_handler_send: crossbeam_channel::Sender, log: slog::Logger, @@ -108,28 +108,34 @@ fn network_service( futures::future::poll_fn(move || -> Result<_, eth2_libp2p::error::Error> { // poll the swarm loop { - match eth2_libp2p_service.poll() { - Ok(Async::Ready(Some(Libp2pEvent::RPC(peer_id, rpc_event)))) => { - trace!( - eth2_libp2p_service.log, - "RPC Event: RPC message received: {:?}", - rpc_event - ); - message_handler_send - .send(HandlerMessage::RPC(peer_id, rpc_event)) - .map_err(|_| "failed to send rpc to handler")?; - } - Ok(Async::Ready(Some(Libp2pEvent::PeerDialed(peer_id)))) => { - debug!(eth2_libp2p_service.log, "Peer Dialed: {:?}", peer_id); - message_handler_send - .send(HandlerMessage::PeerDialed(peer_id)) - .map_err(|_| "failed to send rpc to handler")?; - } - Ok(Async::Ready(Some(Libp2pEvent::Message(m)))) => debug!( - eth2_libp2p_service.log, - "Network Service: Message received: {}", m - ), - _ => break, + match libp2p_service.poll() { + Ok(Async::Ready(Some(event))) => match event { + Libp2pEvent::RPC(peer_id, rpc_event) => { + trace!(log, "RPC Event: RPC message received: {:?}", rpc_event); + message_handler_send + .send(HandlerMessage::RPC(peer_id, rpc_event)) + .map_err(|_| "failed to send rpc to handler")?; + } + Libp2pEvent::PeerDialed(peer_id) => { + debug!(log, "Peer Dialed: {:?}", peer_id); + message_handler_send + .send(HandlerMessage::PeerDialed(peer_id)) + .map_err(|_| "failed to send rpc to handler")?; + } + Libp2pEvent::Identified(peer_id, info) => { + debug!( + log, + "We have identified peer: {:?} with {:?}", peer_id, info + ); + } + Libp2pEvent::Message(m) => debug!( + libp2p_service.log, + "Network Service: Message received: {}", m + ), + }, + Ok(Async::Ready(None)) => unreachable!("Stream never ends"), + Ok(Async::NotReady) => break, + Err(_) => break, } } // poll the network channel @@ -143,7 +149,7 @@ fn network_service( trace!(log, "Sending RPC Event: {:?}", rpc_event); //TODO: Make swarm private //TODO: Implement correct peer id topic message handling - eth2_libp2p_service.swarm.send_rpc(peer_id, rpc_event); + libp2p_service.swarm.send_rpc(peer_id, rpc_event); } OutgoingMessage::NotifierTest => { debug!(log, "Received message from notifier"); @@ -165,7 +171,7 @@ fn network_service( /// Types of messages that the network service can receive. #[derive(Debug, Clone)] pub enum NetworkMessage { - /// Send a message to eth2_libp2p service. + /// Send a message to libp2p service. //TODO: Define typing for messages across the wire Send(PeerId, OutgoingMessage), } From 71dca8af366918153350698302f073835ecc1a2d Mon Sep 17 00:00:00 2001 From: Age Manning Date: Thu, 21 Mar 2019 13:28:34 +1100 Subject: [PATCH 12/14] Correct user agent string --- beacon_node/eth2-libp2p/src/config.rs | 13 ++----------- 1 file changed, 2 insertions(+), 11 deletions(-) diff --git a/beacon_node/eth2-libp2p/src/config.rs b/beacon_node/eth2-libp2p/src/config.rs index a86045b78..2b4972237 100644 --- a/beacon_node/eth2-libp2p/src/config.rs +++ b/beacon_node/eth2-libp2p/src/config.rs @@ -30,7 +30,7 @@ impl Default for Config { .expect("is a correct multi-address")], listen_port: 9000, gs_config: GossipsubConfigBuilder::new().build(), - identify_config: IdentifyConfig::new(&version::version()), + identify_config: IdentifyConfig::default(), boot_nodes: Vec::new(), client_version: version::version(), topics: vec![String::from("beacon_chain")], @@ -60,16 +60,7 @@ impl Default for IdentifyConfig { fn default() -> Self { Self { version: "/eth/serenity/1.0".to_string(), - user_agent: "Lighthouse".to_string(), + user_agent: version::version(), } } } - -impl IdentifyConfig { - /// Adds the version to the user agent. - pub fn new(version: &String) -> Self { - let mut config = IdentifyConfig::default(); - config.user_agent += version; - config - } -} From 7f976124dff099fe6fb0d1f1c9aaa23751c55f31 Mon Sep 17 00:00:00 2001 From: Age Manning Date: Thu, 21 Mar 2019 13:34:37 +1100 Subject: [PATCH 13/14] Add logging to libp2p behaviour --- beacon_node/eth2-libp2p/src/behaviour.rs | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/beacon_node/eth2-libp2p/src/behaviour.rs b/beacon_node/eth2-libp2p/src/behaviour.rs index a29484e2b..a3ac8417f 100644 --- a/beacon_node/eth2-libp2p/src/behaviour.rs +++ b/beacon_node/eth2-libp2p/src/behaviour.rs @@ -11,6 +11,7 @@ use libp2p::{ tokio_io::{AsyncRead, AsyncWrite}, NetworkBehaviour, PeerId, }; +use slog::{debug, o}; use types::Topic; /// Builds the network behaviour for the libp2p Swarm. @@ -27,6 +28,9 @@ pub struct Behaviour { identify: Identify, #[behaviour(ignore)] events: Vec, + #[behaviour(ignore)] + /// Logger for behaviour actions. + log: slog::Logger, } // Implement the NetworkBehaviourEventProcess trait so that we can derive NetworkBehaviour for Behaviour @@ -70,6 +74,10 @@ impl NetworkBehaviourEventProcess { if info.listen_addrs.len() > 20 { + debug!( + self.log, + "More than 20 peers have been identified, truncating" + ); info.listen_addrs.truncate(20); } self.events.push(BehaviourEvent::Identified(peer_id, info)); @@ -84,6 +92,7 @@ impl Behaviour { pub fn new(local_public_key: PublicKey, net_conf: &NetworkConfig, log: &slog::Logger) -> Self { let local_peer_id = local_public_key.clone().into_peer_id(); let identify_config = net_conf.identify_config.clone(); + let behaviour_log = log.new(o!()); Behaviour { gossipsub: Gossipsub::new(local_peer_id, net_conf.gs_config.clone()), @@ -94,6 +103,7 @@ impl Behaviour { local_public_key, ), events: Vec::new(), + log: behaviour_log, } } From 84f0ad2ae7464795a8d2ed92d0d4126eff163b5e Mon Sep 17 00:00:00 2001 From: Age Manning Date: Thu, 21 Mar 2019 13:42:02 +1100 Subject: [PATCH 14/14] Add Ping protocol to lighthouse --- beacon_node/eth2-libp2p/src/behaviour.rs | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/beacon_node/eth2-libp2p/src/behaviour.rs b/beacon_node/eth2-libp2p/src/behaviour.rs index a3ac8417f..458b32cf9 100644 --- a/beacon_node/eth2-libp2p/src/behaviour.rs +++ b/beacon_node/eth2-libp2p/src/behaviour.rs @@ -8,6 +8,7 @@ use libp2p::{ }, gossipsub::{Gossipsub, GossipsubEvent}, identify::{protocol::IdentifyInfo, Identify, IdentifyEvent}, + ping::{Ping, PingEvent}, tokio_io::{AsyncRead, AsyncWrite}, NetworkBehaviour, PeerId, }; @@ -26,10 +27,14 @@ pub struct Behaviour { serenity_rpc: Rpc, /// Allows discovery of IP addresses for peers on the network. identify: Identify, + /// Keep regular connection to peers and disconnect if absent. + // TODO: Keepalive, likely remove this later. + // TODO: Make the ping time customizeable. + ping: Ping, #[behaviour(ignore)] events: Vec, - #[behaviour(ignore)] /// Logger for behaviour actions. + #[behaviour(ignore)] log: slog::Logger, } @@ -88,6 +93,14 @@ impl NetworkBehaviourEventProcess NetworkBehaviourEventProcess + for Behaviour +{ + fn inject_event(&mut self, _event: PingEvent) { + // not interested in ping responses at the moment. + } +} + impl Behaviour { pub fn new(local_public_key: PublicKey, net_conf: &NetworkConfig, log: &slog::Logger) -> Self { let local_peer_id = local_public_key.clone().into_peer_id(); @@ -102,6 +115,7 @@ impl Behaviour { identify_config.user_agent, local_public_key, ), + ping: Ping::new(), events: Vec::new(), log: behaviour_log, }