Update to latest libp2p, gossipsub improvements

This commit is contained in:
Age Manning 2019-08-06 15:09:47 +10:00
parent 04ce9ec95e
commit 0613bc16fc
No known key found for this signature in database
GPG Key ID: 05EED64B79E06A93
11 changed files with 48 additions and 34 deletions

View File

@ -7,8 +7,8 @@ edition = "2018"
[dependencies] [dependencies]
clap = "2.32.0" clap = "2.32.0"
#SigP repository #SigP repository
libp2p = { git = "https://github.com/SigP/rust-libp2p", rev = "be5710bbde69d8c5be732c13ba64239e2f370a7b" } libp2p = { git = "https://github.com/SigP/rust-libp2p", rev = "35104cca27231b9178e1fea5b3788ea41ba8af76" }
enr = { git = "https://github.com/SigP/rust-libp2p/", rev = "be5710bbde69d8c5be732c13ba64239e2f370a7b", features = ["serde"] } enr = { git = "https://github.com/SigP/rust-libp2p/", rev = "35104cca27231b9178e1fea5b3788ea41ba8af76", features = ["serde"] }
types = { path = "../../eth2/types" } types = { path = "../../eth2/types" }
serde = "1.0" serde = "1.0"
serde_derive = "1.0" serde_derive = "1.0"

View File

@ -4,14 +4,12 @@ use crate::{error, NetworkConfig};
use crate::{Topic, TopicHash}; use crate::{Topic, TopicHash};
use futures::prelude::*; use futures::prelude::*;
use libp2p::{ use libp2p::{
core::{ core::identity::Keypair,
identity::Keypair,
swarm::{NetworkBehaviourAction, NetworkBehaviourEventProcess},
},
discv5::Discv5Event, discv5::Discv5Event,
gossipsub::{Gossipsub, GossipsubEvent}, gossipsub::{Gossipsub, GossipsubEvent},
identify::{Identify, IdentifyEvent}, identify::{Identify, IdentifyEvent},
ping::{Ping, PingConfig, PingEvent}, ping::{Ping, PingConfig, PingEvent},
swarm::{NetworkBehaviourAction, NetworkBehaviourEventProcess},
tokio_io::{AsyncRead, AsyncWrite}, tokio_io::{AsyncRead, AsyncWrite},
NetworkBehaviour, PeerId, NetworkBehaviour, PeerId,
}; };

View File

@ -64,9 +64,9 @@ impl Default for Config {
discovery_port: 9000, discovery_port: 9000,
max_peers: 10, max_peers: 10,
//TODO: Set realistic values for production //TODO: Set realistic values for production
// Note: This defaults topics to plain strings. Not hashes
gs_config: GossipsubConfigBuilder::new() gs_config: GossipsubConfigBuilder::new()
.max_gossip_size(4_000_000) .max_transmit_size(1_000_000)
.inactivity_timeout(Duration::from_secs(90))
.heartbeat_interval(Duration::from_secs(20)) .heartbeat_interval(Duration::from_secs(20))
.build(), .build(),
boot_nodes: vec![], boot_nodes: vec![],
@ -134,6 +134,10 @@ impl Config {
.collect::<Result<Vec<Multiaddr>, _>>()?; .collect::<Result<Vec<Multiaddr>, _>>()?;
} }
if let Some(topics_str) = args.value_of("topics") {
self.topics = topics_str.split(',').map(|s| s.into()).collect();
}
if let Some(discovery_address_str) = args.value_of("discovery-address") { if let Some(discovery_address_str) = args.value_of("discovery-address") {
self.discovery_address = discovery_address_str self.discovery_address = discovery_address_str
.parse() .parse()

View File

@ -4,13 +4,11 @@ use crate::{error, NetworkConfig};
/// Currently using discv5 for peer discovery. /// Currently using discv5 for peer discovery.
/// ///
use futures::prelude::*; use futures::prelude::*;
use libp2p::core::swarm::{ use libp2p::core::{identity::Keypair, ConnectedPoint, Multiaddr, PeerId};
ConnectedPoint, NetworkBehaviour, NetworkBehaviourAction, PollParameters,
};
use libp2p::core::{identity::Keypair, Multiaddr, PeerId, ProtocolsHandler};
use libp2p::discv5::{Discv5, Discv5Event}; use libp2p::discv5::{Discv5, Discv5Event};
use libp2p::enr::{Enr, EnrBuilder, NodeId}; use libp2p::enr::{Enr, EnrBuilder, NodeId};
use libp2p::multiaddr::Protocol; use libp2p::multiaddr::Protocol;
use libp2p::swarm::{NetworkBehaviour, NetworkBehaviourAction, PollParameters, ProtocolsHandler};
use slog::{debug, info, o, warn}; use slog::{debug, info, o, warn};
use std::collections::HashSet; use std::collections::HashSet;
use std::fs::File; use std::fs::File;

View File

@ -13,7 +13,7 @@ pub use behaviour::PubsubMessage;
pub use config::{ pub use config::{
Config as NetworkConfig, BEACON_ATTESTATION_TOPIC, BEACON_PUBSUB_TOPIC, SHARD_TOPIC_PREFIX, Config as NetworkConfig, BEACON_ATTESTATION_TOPIC, BEACON_PUBSUB_TOPIC, SHARD_TOPIC_PREFIX,
}; };
pub use libp2p::floodsub::{Topic, TopicBuilder, TopicHash}; pub use libp2p::gossipsub::{Topic, TopicHash};
pub use libp2p::multiaddr; pub use libp2p::multiaddr;
pub use libp2p::Multiaddr; pub use libp2p::Multiaddr;
pub use libp2p::{ pub use libp2p::{

View File

@ -5,10 +5,10 @@ use crate::rpc::protocol::{InboundFramed, OutboundFramed};
use core::marker::PhantomData; use core::marker::PhantomData;
use fnv::FnvHashMap; use fnv::FnvHashMap;
use futures::prelude::*; use futures::prelude::*;
use libp2p::core::protocols_handler::{ use libp2p::core::upgrade::{InboundUpgrade, OutboundUpgrade};
use libp2p::swarm::protocols_handler::{
KeepAlive, ProtocolsHandler, ProtocolsHandlerEvent, ProtocolsHandlerUpgrErr, SubstreamProtocol, KeepAlive, ProtocolsHandler, ProtocolsHandlerEvent, ProtocolsHandlerUpgrErr, SubstreamProtocol,
}; };
use libp2p::core::upgrade::{InboundUpgrade, OutboundUpgrade};
use smallvec::SmallVec; use smallvec::SmallVec;
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use tokio_io::{AsyncRead, AsyncWrite}; use tokio_io::{AsyncRead, AsyncWrite};
@ -273,7 +273,8 @@ where
Self::Error, Self::Error,
> { > {
if let Some(err) = self.pending_error.take() { if let Some(err) = self.pending_error.take() {
return Err(err); dbg!(&err);
//return Err(err);
} }
// return any events that need to be reported // return any events that need to be reported

View File

@ -6,9 +6,9 @@
use futures::prelude::*; use futures::prelude::*;
use handler::RPCHandler; use handler::RPCHandler;
use libp2p::core::protocols_handler::ProtocolsHandler; use libp2p::core::ConnectedPoint;
use libp2p::core::swarm::{ use libp2p::swarm::{
ConnectedPoint, NetworkBehaviour, NetworkBehaviourAction, PollParameters, protocols_handler::ProtocolsHandler, NetworkBehaviour, NetworkBehaviourAction, PollParameters,
}; };
use libp2p::{Multiaddr, PeerId}; use libp2p::{Multiaddr, PeerId};
pub use methods::{ErrorMessage, HelloMessage, RPCErrorResponse, RPCResponse, RequestId}; pub use methods::{ErrorMessage, HelloMessage, RPCErrorResponse, RPCResponse, RequestId};

View File

@ -3,7 +3,7 @@ use crate::error;
use crate::multiaddr::Protocol; use crate::multiaddr::Protocol;
use crate::rpc::RPCEvent; use crate::rpc::RPCEvent;
use crate::NetworkConfig; use crate::NetworkConfig;
use crate::{TopicBuilder, TopicHash}; use crate::{Topic, TopicHash};
use crate::{BEACON_ATTESTATION_TOPIC, BEACON_PUBSUB_TOPIC}; use crate::{BEACON_ATTESTATION_TOPIC, BEACON_PUBSUB_TOPIC};
use futures::prelude::*; use futures::prelude::*;
use futures::Stream; use futures::Stream;
@ -91,14 +91,20 @@ impl<E: EthSpec> Service<E> {
let mut topics = vec![]; let mut topics = vec![];
//TODO: Handle multiple shard attestations. For now we simply use a separate topic for //TODO: Handle multiple shard attestations. For now we simply use a separate topic for
// attestations // attestations
topics.push(BEACON_ATTESTATION_TOPIC.to_string()); topics.push(Topic::new(BEACON_ATTESTATION_TOPIC.into()));
topics.push(BEACON_PUBSUB_TOPIC.to_string()); topics.push(Topic::new(BEACON_PUBSUB_TOPIC.into()));
topics.append(&mut config.topics.clone()); topics.append(
&mut config
.topics
.iter()
.cloned()
.map(|s| Topic::new(s))
.collect(),
);
let mut subscribed_topics = vec![]; let mut subscribed_topics = vec![];
for topic in topics { for topic in topics {
let t = TopicBuilder::new(topic.clone()).build(); if swarm.subscribe(topic.clone()) {
if swarm.subscribe(t) {
trace!(log, "Subscribed to topic: {:?}", topic); trace!(log, "Subscribed to topic: {:?}", topic);
subscribed_topics.push(topic); subscribed_topics.push(topic);
} else { } else {

View File

@ -1,6 +1,6 @@
use beacon_chain::{BeaconChain, BeaconChainTypes}; use beacon_chain::{BeaconChain, BeaconChainTypes};
use eth2_libp2p::PubsubMessage; use eth2_libp2p::PubsubMessage;
use eth2_libp2p::TopicBuilder; use eth2_libp2p::Topic;
use eth2_libp2p::BEACON_ATTESTATION_TOPIC; use eth2_libp2p::BEACON_ATTESTATION_TOPIC;
use futures::Future; use futures::Future;
use grpcio::{RpcContext, RpcStatus, RpcStatusCode, UnarySink}; use grpcio::{RpcContext, RpcStatus, RpcStatusCode, UnarySink};
@ -140,7 +140,7 @@ impl<T: BeaconChainTypes> AttestationService for AttestationServiceInstance<T> {
); );
// valid attestation, propagate to the network // valid attestation, propagate to the network
let topic = TopicBuilder::new(BEACON_ATTESTATION_TOPIC).build(); let topic = Topic::new(BEACON_ATTESTATION_TOPIC.into());
let message = PubsubMessage::Attestation(attestation); let message = PubsubMessage::Attestation(attestation);
self.network_chan self.network_chan

View File

@ -1,6 +1,6 @@
use beacon_chain::{BeaconChain, BeaconChainTypes, BlockProcessingOutcome}; use beacon_chain::{BeaconChain, BeaconChainTypes, BlockProcessingOutcome};
use eth2_libp2p::BEACON_PUBSUB_TOPIC; use eth2_libp2p::BEACON_PUBSUB_TOPIC;
use eth2_libp2p::{PubsubMessage, TopicBuilder}; use eth2_libp2p::{PubsubMessage, Topic};
use futures::Future; use futures::Future;
use grpcio::{RpcContext, RpcStatus, RpcStatusCode, UnarySink}; use grpcio::{RpcContext, RpcStatus, RpcStatusCode, UnarySink};
use network::NetworkMessage; use network::NetworkMessage;
@ -106,7 +106,7 @@ impl<T: BeaconChainTypes> BeaconBlockService for BeaconBlockServiceInstance<T> {
); );
// get the network topic to send on // get the network topic to send on
let topic = TopicBuilder::new(BEACON_PUBSUB_TOPIC).build(); let topic = Topic::new(BEACON_PUBSUB_TOPIC.into());
let message = PubsubMessage::Block(block); let message = PubsubMessage::Block(block);
// Publish the block to the p2p network via gossipsub. // Publish the block to the p2p network via gossipsub.

View File

@ -52,14 +52,14 @@ fn main() {
.arg( .arg(
Arg::with_name("listen-address") Arg::with_name("listen-address")
.long("listen-address") .long("listen-address")
.value_name("Address") .value_name("ADDRESS")
.help("The address lighthouse will listen for UDP and TCP connections. (default 127.0.0.1).") .help("The address lighthouse will listen for UDP and TCP connections. (default 127.0.0.1).")
.takes_value(true), .takes_value(true),
) )
.arg( .arg(
Arg::with_name("port") Arg::with_name("port")
.long("port") .long("port")
.value_name("Lighthouse Port") .value_name("PORT")
.help("The TCP/UDP port to listen on. The UDP port can be modified by the --discovery-port flag.") .help("The TCP/UDP port to listen on. The UDP port can be modified by the --discovery-port flag.")
.takes_value(true), .takes_value(true),
) )
@ -80,17 +80,24 @@ fn main() {
.arg( .arg(
Arg::with_name("discovery-port") Arg::with_name("discovery-port")
.long("disc-port") .long("disc-port")
.value_name("DiscoveryPort") .value_name("PORT")
.help("The discovery UDP port.") .help("The discovery UDP port.")
.takes_value(true), .takes_value(true),
) )
.arg( .arg(
Arg::with_name("discovery-address") Arg::with_name("discovery-address")
.long("discovery-address") .long("discovery-address")
.value_name("Address") .value_name("ADDRESS")
.help("The IP address to broadcast to other peers on how to reach this node.") .help("The IP address to broadcast to other peers on how to reach this node.")
.takes_value(true), .takes_value(true),
) )
.arg(
Arg::with_name("topics")
.long("topics")
.value_name("STRING")
.help("One or more comma-delimited gossipsub topic strings to subscribe to.")
.takes_value(true),
)
.arg( .arg(
Arg::with_name("libp2p-addresses") Arg::with_name("libp2p-addresses")
.long("libp2p-addresses") .long("libp2p-addresses")