From 0613bc16fc54f5d02434ec7540500ba255ab5dc9 Mon Sep 17 00:00:00 2001 From: Age Manning Date: Tue, 6 Aug 2019 15:09:47 +1000 Subject: [PATCH] Update to latest libp2p, gossipsub improvements --- beacon_node/eth2-libp2p/Cargo.toml | 4 ++-- beacon_node/eth2-libp2p/src/behaviour.rs | 6 ++---- beacon_node/eth2-libp2p/src/config.rs | 8 ++++++-- beacon_node/eth2-libp2p/src/discovery.rs | 6 ++---- beacon_node/eth2-libp2p/src/lib.rs | 2 +- beacon_node/eth2-libp2p/src/rpc/handler.rs | 7 ++++--- beacon_node/eth2-libp2p/src/rpc/mod.rs | 6 +++--- beacon_node/eth2-libp2p/src/service.rs | 20 +++++++++++++------- beacon_node/rpc/src/attestation.rs | 4 ++-- beacon_node/rpc/src/beacon_block.rs | 4 ++-- beacon_node/src/main.rs | 15 +++++++++++---- 11 files changed, 48 insertions(+), 34 deletions(-) diff --git a/beacon_node/eth2-libp2p/Cargo.toml b/beacon_node/eth2-libp2p/Cargo.toml index 405c72cc4..f5fe8a877 100644 --- a/beacon_node/eth2-libp2p/Cargo.toml +++ b/beacon_node/eth2-libp2p/Cargo.toml @@ -7,8 +7,8 @@ edition = "2018" [dependencies] clap = "2.32.0" #SigP repository -libp2p = { git = "https://github.com/SigP/rust-libp2p", rev = "be5710bbde69d8c5be732c13ba64239e2f370a7b" } -enr = { git = "https://github.com/SigP/rust-libp2p/", rev = "be5710bbde69d8c5be732c13ba64239e2f370a7b", features = ["serde"] } +libp2p = { git = "https://github.com/SigP/rust-libp2p", rev = "35104cca27231b9178e1fea5b3788ea41ba8af76" } +enr = { git = "https://github.com/SigP/rust-libp2p/", rev = "35104cca27231b9178e1fea5b3788ea41ba8af76", features = ["serde"] } types = { path = "../../eth2/types" } serde = "1.0" serde_derive = "1.0" diff --git a/beacon_node/eth2-libp2p/src/behaviour.rs b/beacon_node/eth2-libp2p/src/behaviour.rs index 33acd41e1..fcb147949 100644 --- a/beacon_node/eth2-libp2p/src/behaviour.rs +++ b/beacon_node/eth2-libp2p/src/behaviour.rs @@ -4,14 +4,12 @@ use crate::{error, NetworkConfig}; use crate::{Topic, TopicHash}; use futures::prelude::*; use libp2p::{ - core::{ - identity::Keypair, - swarm::{NetworkBehaviourAction, NetworkBehaviourEventProcess}, - }, + core::identity::Keypair, discv5::Discv5Event, gossipsub::{Gossipsub, GossipsubEvent}, identify::{Identify, IdentifyEvent}, ping::{Ping, PingConfig, PingEvent}, + swarm::{NetworkBehaviourAction, NetworkBehaviourEventProcess}, tokio_io::{AsyncRead, AsyncWrite}, NetworkBehaviour, PeerId, }; diff --git a/beacon_node/eth2-libp2p/src/config.rs b/beacon_node/eth2-libp2p/src/config.rs index d04eae14b..44d07795b 100644 --- a/beacon_node/eth2-libp2p/src/config.rs +++ b/beacon_node/eth2-libp2p/src/config.rs @@ -64,9 +64,9 @@ impl Default for Config { discovery_port: 9000, max_peers: 10, //TODO: Set realistic values for production + // Note: This defaults topics to plain strings. Not hashes gs_config: GossipsubConfigBuilder::new() - .max_gossip_size(4_000_000) - .inactivity_timeout(Duration::from_secs(90)) + .max_transmit_size(1_000_000) .heartbeat_interval(Duration::from_secs(20)) .build(), boot_nodes: vec![], @@ -134,6 +134,10 @@ impl Config { .collect::, _>>()?; } + if let Some(topics_str) = args.value_of("topics") { + self.topics = topics_str.split(',').map(|s| s.into()).collect(); + } + if let Some(discovery_address_str) = args.value_of("discovery-address") { self.discovery_address = discovery_address_str .parse() diff --git a/beacon_node/eth2-libp2p/src/discovery.rs b/beacon_node/eth2-libp2p/src/discovery.rs index 96cf71846..4c1794945 100644 --- a/beacon_node/eth2-libp2p/src/discovery.rs +++ b/beacon_node/eth2-libp2p/src/discovery.rs @@ -4,13 +4,11 @@ use crate::{error, NetworkConfig}; /// Currently using discv5 for peer discovery. /// use futures::prelude::*; -use libp2p::core::swarm::{ - ConnectedPoint, NetworkBehaviour, NetworkBehaviourAction, PollParameters, -}; -use libp2p::core::{identity::Keypair, Multiaddr, PeerId, ProtocolsHandler}; +use libp2p::core::{identity::Keypair, ConnectedPoint, Multiaddr, PeerId}; use libp2p::discv5::{Discv5, Discv5Event}; use libp2p::enr::{Enr, EnrBuilder, NodeId}; use libp2p::multiaddr::Protocol; +use libp2p::swarm::{NetworkBehaviour, NetworkBehaviourAction, PollParameters, ProtocolsHandler}; use slog::{debug, info, o, warn}; use std::collections::HashSet; use std::fs::File; diff --git a/beacon_node/eth2-libp2p/src/lib.rs b/beacon_node/eth2-libp2p/src/lib.rs index 7a3b2e632..ca6ac3760 100644 --- a/beacon_node/eth2-libp2p/src/lib.rs +++ b/beacon_node/eth2-libp2p/src/lib.rs @@ -13,7 +13,7 @@ pub use behaviour::PubsubMessage; pub use config::{ Config as NetworkConfig, BEACON_ATTESTATION_TOPIC, BEACON_PUBSUB_TOPIC, SHARD_TOPIC_PREFIX, }; -pub use libp2p::floodsub::{Topic, TopicBuilder, TopicHash}; +pub use libp2p::gossipsub::{Topic, TopicHash}; pub use libp2p::multiaddr; pub use libp2p::Multiaddr; pub use libp2p::{ diff --git a/beacon_node/eth2-libp2p/src/rpc/handler.rs b/beacon_node/eth2-libp2p/src/rpc/handler.rs index 4e796f6fb..76e04d24e 100644 --- a/beacon_node/eth2-libp2p/src/rpc/handler.rs +++ b/beacon_node/eth2-libp2p/src/rpc/handler.rs @@ -5,10 +5,10 @@ use crate::rpc::protocol::{InboundFramed, OutboundFramed}; use core::marker::PhantomData; use fnv::FnvHashMap; use futures::prelude::*; -use libp2p::core::protocols_handler::{ +use libp2p::core::upgrade::{InboundUpgrade, OutboundUpgrade}; +use libp2p::swarm::protocols_handler::{ KeepAlive, ProtocolsHandler, ProtocolsHandlerEvent, ProtocolsHandlerUpgrErr, SubstreamProtocol, }; -use libp2p::core::upgrade::{InboundUpgrade, OutboundUpgrade}; use smallvec::SmallVec; use std::time::{Duration, Instant}; use tokio_io::{AsyncRead, AsyncWrite}; @@ -273,7 +273,8 @@ where Self::Error, > { if let Some(err) = self.pending_error.take() { - return Err(err); + dbg!(&err); + //return Err(err); } // return any events that need to be reported diff --git a/beacon_node/eth2-libp2p/src/rpc/mod.rs b/beacon_node/eth2-libp2p/src/rpc/mod.rs index 88060e602..5593660ff 100644 --- a/beacon_node/eth2-libp2p/src/rpc/mod.rs +++ b/beacon_node/eth2-libp2p/src/rpc/mod.rs @@ -6,9 +6,9 @@ use futures::prelude::*; use handler::RPCHandler; -use libp2p::core::protocols_handler::ProtocolsHandler; -use libp2p::core::swarm::{ - ConnectedPoint, NetworkBehaviour, NetworkBehaviourAction, PollParameters, +use libp2p::core::ConnectedPoint; +use libp2p::swarm::{ + protocols_handler::ProtocolsHandler, NetworkBehaviour, NetworkBehaviourAction, PollParameters, }; use libp2p::{Multiaddr, PeerId}; pub use methods::{ErrorMessage, HelloMessage, RPCErrorResponse, RPCResponse, RequestId}; diff --git a/beacon_node/eth2-libp2p/src/service.rs b/beacon_node/eth2-libp2p/src/service.rs index 5c7c0c7f1..5a2fc8d8b 100644 --- a/beacon_node/eth2-libp2p/src/service.rs +++ b/beacon_node/eth2-libp2p/src/service.rs @@ -3,7 +3,7 @@ use crate::error; use crate::multiaddr::Protocol; use crate::rpc::RPCEvent; use crate::NetworkConfig; -use crate::{TopicBuilder, TopicHash}; +use crate::{Topic, TopicHash}; use crate::{BEACON_ATTESTATION_TOPIC, BEACON_PUBSUB_TOPIC}; use futures::prelude::*; use futures::Stream; @@ -90,15 +90,21 @@ impl Service { // subscribe to default gossipsub topics let mut topics = vec![]; //TODO: Handle multiple shard attestations. For now we simply use a separate topic for - //attestations - topics.push(BEACON_ATTESTATION_TOPIC.to_string()); - topics.push(BEACON_PUBSUB_TOPIC.to_string()); - topics.append(&mut config.topics.clone()); + // attestations + topics.push(Topic::new(BEACON_ATTESTATION_TOPIC.into())); + topics.push(Topic::new(BEACON_PUBSUB_TOPIC.into())); + topics.append( + &mut config + .topics + .iter() + .cloned() + .map(|s| Topic::new(s)) + .collect(), + ); let mut subscribed_topics = vec![]; for topic in topics { - let t = TopicBuilder::new(topic.clone()).build(); - if swarm.subscribe(t) { + if swarm.subscribe(topic.clone()) { trace!(log, "Subscribed to topic: {:?}", topic); subscribed_topics.push(topic); } else { diff --git a/beacon_node/rpc/src/attestation.rs b/beacon_node/rpc/src/attestation.rs index 5ea8368fd..cbbe4de6e 100644 --- a/beacon_node/rpc/src/attestation.rs +++ b/beacon_node/rpc/src/attestation.rs @@ -1,6 +1,6 @@ use beacon_chain::{BeaconChain, BeaconChainTypes}; use eth2_libp2p::PubsubMessage; -use eth2_libp2p::TopicBuilder; +use eth2_libp2p::Topic; use eth2_libp2p::BEACON_ATTESTATION_TOPIC; use futures::Future; use grpcio::{RpcContext, RpcStatus, RpcStatusCode, UnarySink}; @@ -140,7 +140,7 @@ impl AttestationService for AttestationServiceInstance { ); // valid attestation, propagate to the network - let topic = TopicBuilder::new(BEACON_ATTESTATION_TOPIC).build(); + let topic = Topic::new(BEACON_ATTESTATION_TOPIC.into()); let message = PubsubMessage::Attestation(attestation); self.network_chan diff --git a/beacon_node/rpc/src/beacon_block.rs b/beacon_node/rpc/src/beacon_block.rs index b42bbb208..2a8ae2c6b 100644 --- a/beacon_node/rpc/src/beacon_block.rs +++ b/beacon_node/rpc/src/beacon_block.rs @@ -1,6 +1,6 @@ use beacon_chain::{BeaconChain, BeaconChainTypes, BlockProcessingOutcome}; use eth2_libp2p::BEACON_PUBSUB_TOPIC; -use eth2_libp2p::{PubsubMessage, TopicBuilder}; +use eth2_libp2p::{PubsubMessage, Topic}; use futures::Future; use grpcio::{RpcContext, RpcStatus, RpcStatusCode, UnarySink}; use network::NetworkMessage; @@ -106,7 +106,7 @@ impl BeaconBlockService for BeaconBlockServiceInstance { ); // get the network topic to send on - let topic = TopicBuilder::new(BEACON_PUBSUB_TOPIC).build(); + let topic = Topic::new(BEACON_PUBSUB_TOPIC.into()); let message = PubsubMessage::Block(block); // Publish the block to the p2p network via gossipsub. diff --git a/beacon_node/src/main.rs b/beacon_node/src/main.rs index 9a1af2e08..c85eeedac 100644 --- a/beacon_node/src/main.rs +++ b/beacon_node/src/main.rs @@ -52,14 +52,14 @@ fn main() { .arg( Arg::with_name("listen-address") .long("listen-address") - .value_name("Address") + .value_name("ADDRESS") .help("The address lighthouse will listen for UDP and TCP connections. (default 127.0.0.1).") .takes_value(true), ) .arg( Arg::with_name("port") .long("port") - .value_name("Lighthouse Port") + .value_name("PORT") .help("The TCP/UDP port to listen on. The UDP port can be modified by the --discovery-port flag.") .takes_value(true), ) @@ -80,17 +80,24 @@ fn main() { .arg( Arg::with_name("discovery-port") .long("disc-port") - .value_name("DiscoveryPort") + .value_name("PORT") .help("The discovery UDP port.") .takes_value(true), ) .arg( Arg::with_name("discovery-address") .long("discovery-address") - .value_name("Address") + .value_name("ADDRESS") .help("The IP address to broadcast to other peers on how to reach this node.") .takes_value(true), ) + .arg( + Arg::with_name("topics") + .long("topics") + .value_name("STRING") + .help("One or more comma-delimited gossipsub topic strings to subscribe to.") + .takes_value(true), + ) .arg( Arg::with_name("libp2p-addresses") .long("libp2p-addresses")