From 5eb4c7d682e645d781eea376f1004d68bbeb8be8 Mon Sep 17 00:00:00 2001 From: Age Manning Date: Wed, 1 Apr 2020 16:25:52 +1100 Subject: [PATCH] Updates all discovery searches to predicate searches (#975) * Update global gossip topic handling * Adds ENR predicate searches to lighthouse * Correct log --- Cargo.lock | 54 ++++++------ beacon_node/eth2-libp2p/Cargo.toml | 3 +- beacon_node/eth2-libp2p/src/behaviour.rs | 31 +++---- beacon_node/eth2-libp2p/src/discovery/mod.rs | 91 +++++++++++++++++--- beacon_node/eth2-libp2p/src/service.rs | 2 +- beacon_node/eth2-libp2p/src/types/globals.rs | 6 +- beacon_node/eth2-libp2p/src/types/topics.rs | 6 +- 7 files changed, 126 insertions(+), 67 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 013752d30..b3983b23b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1070,7 +1070,7 @@ dependencies = [ [[package]] name = "enr" version = "0.1.0-alpha.3" -source = "git+https://github.com/SigP/rust-libp2p?rev=44d7a9c9cd7be74109817bcabe74b991d5bd0fee#44d7a9c9cd7be74109817bcabe74b991d5bd0fee" +source = "git+https://github.com/SigP/rust-libp2p?rev=4e3003d5283040fee10da1299252dd060a838d97#4e3003d5283040fee10da1299252dd060a838d97" dependencies = [ "base64 0.12.0", "bs58 0.3.0", @@ -2032,7 +2032,7 @@ dependencies = [ [[package]] name = "libp2p" version = "0.13.2" -source = "git+https://github.com/SigP/rust-libp2p?rev=44d7a9c9cd7be74109817bcabe74b991d5bd0fee#44d7a9c9cd7be74109817bcabe74b991d5bd0fee" +source = "git+https://github.com/SigP/rust-libp2p?rev=4e3003d5283040fee10da1299252dd060a838d97#4e3003d5283040fee10da1299252dd060a838d97" dependencies = [ "bytes", "futures", @@ -2071,7 +2071,7 @@ dependencies = [ [[package]] name = "libp2p-core" version = "0.13.2" -source = "git+https://github.com/SigP/rust-libp2p?rev=44d7a9c9cd7be74109817bcabe74b991d5bd0fee#44d7a9c9cd7be74109817bcabe74b991d5bd0fee" +source = "git+https://github.com/SigP/rust-libp2p?rev=4e3003d5283040fee10da1299252dd060a838d97#4e3003d5283040fee10da1299252dd060a838d97" dependencies = [ "asn1_der", "bs58 0.3.0", @@ -2106,7 +2106,7 @@ dependencies = [ [[package]] name = "libp2p-core-derive" version = "0.13.0" -source = "git+https://github.com/SigP/rust-libp2p?rev=44d7a9c9cd7be74109817bcabe74b991d5bd0fee#44d7a9c9cd7be74109817bcabe74b991d5bd0fee" +source = "git+https://github.com/SigP/rust-libp2p?rev=4e3003d5283040fee10da1299252dd060a838d97#4e3003d5283040fee10da1299252dd060a838d97" dependencies = [ "quote 1.0.3", "syn 1.0.17", @@ -2115,7 +2115,7 @@ dependencies = [ [[package]] name = "libp2p-deflate" version = "0.5.0" -source = "git+https://github.com/SigP/rust-libp2p?rev=44d7a9c9cd7be74109817bcabe74b991d5bd0fee#44d7a9c9cd7be74109817bcabe74b991d5bd0fee" +source = "git+https://github.com/SigP/rust-libp2p?rev=4e3003d5283040fee10da1299252dd060a838d97#4e3003d5283040fee10da1299252dd060a838d97" dependencies = [ "flate2", "futures", @@ -2126,7 +2126,7 @@ dependencies = [ [[package]] name = "libp2p-discv5" version = "0.1.0" -source = "git+https://github.com/SigP/rust-libp2p?rev=44d7a9c9cd7be74109817bcabe74b991d5bd0fee#44d7a9c9cd7be74109817bcabe74b991d5bd0fee" +source = "git+https://github.com/SigP/rust-libp2p?rev=4e3003d5283040fee10da1299252dd060a838d97#4e3003d5283040fee10da1299252dd060a838d97" dependencies = [ "arrayvec 0.4.12", "bigint", @@ -2157,7 +2157,7 @@ dependencies = [ [[package]] name = "libp2p-dns" version = "0.13.0" -source = "git+https://github.com/SigP/rust-libp2p?rev=44d7a9c9cd7be74109817bcabe74b991d5bd0fee#44d7a9c9cd7be74109817bcabe74b991d5bd0fee" +source = "git+https://github.com/SigP/rust-libp2p?rev=4e3003d5283040fee10da1299252dd060a838d97#4e3003d5283040fee10da1299252dd060a838d97" dependencies = [ "futures", "libp2p-core", @@ -2168,7 +2168,7 @@ dependencies = [ [[package]] name = "libp2p-floodsub" version = "0.13.1" -source = "git+https://github.com/SigP/rust-libp2p?rev=44d7a9c9cd7be74109817bcabe74b991d5bd0fee#44d7a9c9cd7be74109817bcabe74b991d5bd0fee" +source = "git+https://github.com/SigP/rust-libp2p?rev=4e3003d5283040fee10da1299252dd060a838d97#4e3003d5283040fee10da1299252dd060a838d97" dependencies = [ "bs58 0.3.0", "bytes", @@ -2186,7 +2186,7 @@ dependencies = [ [[package]] name = "libp2p-gossipsub" version = "0.1.0" -source = "git+https://github.com/SigP/rust-libp2p?rev=44d7a9c9cd7be74109817bcabe74b991d5bd0fee#44d7a9c9cd7be74109817bcabe74b991d5bd0fee" +source = "git+https://github.com/SigP/rust-libp2p?rev=4e3003d5283040fee10da1299252dd060a838d97#4e3003d5283040fee10da1299252dd060a838d97" dependencies = [ "base64 0.10.1", "bs58 0.2.5", @@ -2211,7 +2211,7 @@ dependencies = [ [[package]] name = "libp2p-identify" version = "0.13.2" -source = "git+https://github.com/SigP/rust-libp2p?rev=44d7a9c9cd7be74109817bcabe74b991d5bd0fee#44d7a9c9cd7be74109817bcabe74b991d5bd0fee" +source = "git+https://github.com/SigP/rust-libp2p?rev=4e3003d5283040fee10da1299252dd060a838d97#4e3003d5283040fee10da1299252dd060a838d97" dependencies = [ "bytes", "futures", @@ -2230,7 +2230,7 @@ dependencies = [ [[package]] name = "libp2p-kad" version = "0.13.2" -source = "git+https://github.com/SigP/rust-libp2p?rev=44d7a9c9cd7be74109817bcabe74b991d5bd0fee#44d7a9c9cd7be74109817bcabe74b991d5bd0fee" +source = "git+https://github.com/SigP/rust-libp2p?rev=4e3003d5283040fee10da1299252dd060a838d97#4e3003d5283040fee10da1299252dd060a838d97" dependencies = [ "arrayvec 0.5.1", "bytes", @@ -2257,7 +2257,7 @@ dependencies = [ [[package]] name = "libp2p-mdns" version = "0.13.1" -source = "git+https://github.com/SigP/rust-libp2p?rev=44d7a9c9cd7be74109817bcabe74b991d5bd0fee#44d7a9c9cd7be74109817bcabe74b991d5bd0fee" +source = "git+https://github.com/SigP/rust-libp2p?rev=4e3003d5283040fee10da1299252dd060a838d97#4e3003d5283040fee10da1299252dd060a838d97" dependencies = [ "data-encoding", "dns-parser", @@ -2279,7 +2279,7 @@ dependencies = [ [[package]] name = "libp2p-mplex" version = "0.13.0" -source = "git+https://github.com/SigP/rust-libp2p?rev=44d7a9c9cd7be74109817bcabe74b991d5bd0fee#44d7a9c9cd7be74109817bcabe74b991d5bd0fee" +source = "git+https://github.com/SigP/rust-libp2p?rev=4e3003d5283040fee10da1299252dd060a838d97#4e3003d5283040fee10da1299252dd060a838d97" dependencies = [ "bytes", "fnv", @@ -2295,7 +2295,7 @@ dependencies = [ [[package]] name = "libp2p-noise" version = "0.11.1" -source = "git+https://github.com/SigP/rust-libp2p?rev=44d7a9c9cd7be74109817bcabe74b991d5bd0fee#44d7a9c9cd7be74109817bcabe74b991d5bd0fee" +source = "git+https://github.com/SigP/rust-libp2p?rev=4e3003d5283040fee10da1299252dd060a838d97#4e3003d5283040fee10da1299252dd060a838d97" dependencies = [ "bytes", "curve25519-dalek 1.2.3", @@ -2315,7 +2315,7 @@ dependencies = [ [[package]] name = "libp2p-ping" version = "0.13.1" -source = "git+https://github.com/SigP/rust-libp2p?rev=44d7a9c9cd7be74109817bcabe74b991d5bd0fee#44d7a9c9cd7be74109817bcabe74b991d5bd0fee" +source = "git+https://github.com/SigP/rust-libp2p?rev=4e3003d5283040fee10da1299252dd060a838d97#4e3003d5283040fee10da1299252dd060a838d97" dependencies = [ "bytes", "futures", @@ -2332,7 +2332,7 @@ dependencies = [ [[package]] name = "libp2p-plaintext" version = "0.13.1" -source = "git+https://github.com/SigP/rust-libp2p?rev=44d7a9c9cd7be74109817bcabe74b991d5bd0fee#44d7a9c9cd7be74109817bcabe74b991d5bd0fee" +source = "git+https://github.com/SigP/rust-libp2p?rev=4e3003d5283040fee10da1299252dd060a838d97#4e3003d5283040fee10da1299252dd060a838d97" dependencies = [ "bytes", "futures", @@ -2347,7 +2347,7 @@ dependencies = [ [[package]] name = "libp2p-secio" version = "0.13.1" -source = "git+https://github.com/SigP/rust-libp2p?rev=44d7a9c9cd7be74109817bcabe74b991d5bd0fee#44d7a9c9cd7be74109817bcabe74b991d5bd0fee" +source = "git+https://github.com/SigP/rust-libp2p?rev=4e3003d5283040fee10da1299252dd060a838d97#4e3003d5283040fee10da1299252dd060a838d97" dependencies = [ "aes-ctr", "bytes", @@ -2376,7 +2376,7 @@ dependencies = [ [[package]] name = "libp2p-swarm" version = "0.3.0" -source = "git+https://github.com/SigP/rust-libp2p?rev=44d7a9c9cd7be74109817bcabe74b991d5bd0fee#44d7a9c9cd7be74109817bcabe74b991d5bd0fee" +source = "git+https://github.com/SigP/rust-libp2p?rev=4e3003d5283040fee10da1299252dd060a838d97#4e3003d5283040fee10da1299252dd060a838d97" dependencies = [ "futures", "libp2p-core", @@ -2389,7 +2389,7 @@ dependencies = [ [[package]] name = "libp2p-tcp" version = "0.13.0" -source = "git+https://github.com/SigP/rust-libp2p?rev=44d7a9c9cd7be74109817bcabe74b991d5bd0fee#44d7a9c9cd7be74109817bcabe74b991d5bd0fee" +source = "git+https://github.com/SigP/rust-libp2p?rev=4e3003d5283040fee10da1299252dd060a838d97#4e3003d5283040fee10da1299252dd060a838d97" dependencies = [ "bytes", "futures", @@ -2405,7 +2405,7 @@ dependencies = [ [[package]] name = "libp2p-uds" version = "0.13.0" -source = "git+https://github.com/SigP/rust-libp2p?rev=44d7a9c9cd7be74109817bcabe74b991d5bd0fee#44d7a9c9cd7be74109817bcabe74b991d5bd0fee" +source = "git+https://github.com/SigP/rust-libp2p?rev=4e3003d5283040fee10da1299252dd060a838d97#4e3003d5283040fee10da1299252dd060a838d97" dependencies = [ "futures", "libp2p-core", @@ -2416,7 +2416,7 @@ dependencies = [ [[package]] name = "libp2p-wasm-ext" version = "0.6.0" -source = "git+https://github.com/SigP/rust-libp2p?rev=44d7a9c9cd7be74109817bcabe74b991d5bd0fee#44d7a9c9cd7be74109817bcabe74b991d5bd0fee" +source = "git+https://github.com/SigP/rust-libp2p?rev=4e3003d5283040fee10da1299252dd060a838d97#4e3003d5283040fee10da1299252dd060a838d97" dependencies = [ "futures", "js-sys", @@ -2430,7 +2430,7 @@ dependencies = [ [[package]] name = "libp2p-websocket" version = "0.13.0" -source = "git+https://github.com/SigP/rust-libp2p?rev=44d7a9c9cd7be74109817bcabe74b991d5bd0fee#44d7a9c9cd7be74109817bcabe74b991d5bd0fee" +source = "git+https://github.com/SigP/rust-libp2p?rev=4e3003d5283040fee10da1299252dd060a838d97#4e3003d5283040fee10da1299252dd060a838d97" dependencies = [ "bytes", "futures", @@ -2448,7 +2448,7 @@ dependencies = [ [[package]] name = "libp2p-yamux" version = "0.13.0" -source = "git+https://github.com/SigP/rust-libp2p?rev=44d7a9c9cd7be74109817bcabe74b991d5bd0fee#44d7a9c9cd7be74109817bcabe74b991d5bd0fee" +source = "git+https://github.com/SigP/rust-libp2p?rev=4e3003d5283040fee10da1299252dd060a838d97#4e3003d5283040fee10da1299252dd060a838d97" dependencies = [ "futures", "libp2p-core", @@ -2747,7 +2747,7 @@ dependencies = [ [[package]] name = "multistream-select" version = "0.6.1" -source = "git+https://github.com/SigP/rust-libp2p?rev=44d7a9c9cd7be74109817bcabe74b991d5bd0fee#44d7a9c9cd7be74109817bcabe74b991d5bd0fee" +source = "git+https://github.com/SigP/rust-libp2p?rev=4e3003d5283040fee10da1299252dd060a838d97#4e3003d5283040fee10da1299252dd060a838d97" dependencies = [ "bytes", "futures", @@ -2980,7 +2980,7 @@ dependencies = [ [[package]] name = "parity-multiaddr" version = "0.6.0" -source = "git+https://github.com/SigP/rust-libp2p?rev=44d7a9c9cd7be74109817bcabe74b991d5bd0fee#44d7a9c9cd7be74109817bcabe74b991d5bd0fee" +source = "git+https://github.com/SigP/rust-libp2p?rev=4e3003d5283040fee10da1299252dd060a838d97#4e3003d5283040fee10da1299252dd060a838d97" dependencies = [ "arrayref", "bs58 0.3.0", @@ -2997,7 +2997,7 @@ dependencies = [ [[package]] name = "parity-multihash" version = "0.2.0" -source = "git+https://github.com/SigP/rust-libp2p?rev=44d7a9c9cd7be74109817bcabe74b991d5bd0fee#44d7a9c9cd7be74109817bcabe74b991d5bd0fee" +source = "git+https://github.com/SigP/rust-libp2p?rev=4e3003d5283040fee10da1299252dd060a838d97#4e3003d5283040fee10da1299252dd060a838d97" dependencies = [ "blake2", "bytes", @@ -3768,7 +3768,7 @@ dependencies = [ [[package]] name = "rw-stream-sink" version = "0.1.2" -source = "git+https://github.com/SigP/rust-libp2p?rev=44d7a9c9cd7be74109817bcabe74b991d5bd0fee#44d7a9c9cd7be74109817bcabe74b991d5bd0fee" +source = "git+https://github.com/SigP/rust-libp2p?rev=4e3003d5283040fee10da1299252dd060a838d97#4e3003d5283040fee10da1299252dd060a838d97" dependencies = [ "bytes", "futures", diff --git a/beacon_node/eth2-libp2p/Cargo.toml b/beacon_node/eth2-libp2p/Cargo.toml index f2428eeec..5cbf7dffb 100644 --- a/beacon_node/eth2-libp2p/Cargo.toml +++ b/beacon_node/eth2-libp2p/Cargo.toml @@ -8,7 +8,7 @@ edition = "2018" hex = "0.3" # rust-libp2p is presently being sourced from a Sigma Prime fork of the # `libp2p/rust-libp2p` repository. -libp2p = { git = "https://github.com/SigP/rust-libp2p", rev = "44d7a9c9cd7be74109817bcabe74b991d5bd0fee" } +libp2p = { git = "https://github.com/SigP/rust-libp2p", rev = "4e3003d5283040fee10da1299252dd060a838d97" } types = { path = "../../eth2/types" } eth2_ssz_types = { path = "../../eth2/utils/ssz_types" } serde = "1.0.102" @@ -17,7 +17,6 @@ eth2_ssz = "0.1.2" eth2_ssz_derive = "0.1.0" slog = { version = "2.5.2", features = ["max_level_trace"] } version = { path = "../version" } -# beacon_chain = { path = "../beacon_chain" } tokio = "0.1.22" futures = "0.1.29" error-chain = "0.12.1" diff --git a/beacon_node/eth2-libp2p/src/behaviour.rs b/beacon_node/eth2-libp2p/src/behaviour.rs index a5a5b575b..20a33e4e7 100644 --- a/beacon_node/eth2-libp2p/src/behaviour.rs +++ b/beacon_node/eth2-libp2p/src/behaviour.rs @@ -101,17 +101,12 @@ impl Behaviour bool { - if !self - .network_globals + // update the network globals + self.network_globals .gossipsub_subscriptions - .read() - .contains(&topic) - { - self.network_globals - .gossipsub_subscriptions - .write() - .push(topic.clone()); - } + .write() + .insert(topic.clone()); + // subscribe to the topic self.gossipsub.subscribe(topic.into()) } @@ -123,18 +118,12 @@ impl Behaviour bool { - let pos = self - .network_globals + // update the network globals + self.network_globals .gossipsub_subscriptions - .read() - .iter() - .position(|s| s == &topic); - if let Some(pos) = pos { - self.network_globals - .gossipsub_subscriptions - .write() - .swap_remove(pos); - } + .write() + .remove(&topic); + // unsubscribe from the topic self.gossipsub.unsubscribe(topic.into()) } diff --git a/beacon_node/eth2-libp2p/src/discovery/mod.rs b/beacon_node/eth2-libp2p/src/discovery/mod.rs index 1ccc95767..466a3b7e0 100644 --- a/beacon_node/eth2-libp2p/src/discovery/mod.rs +++ b/beacon_node/eth2-libp2p/src/discovery/mod.rs @@ -12,7 +12,7 @@ use libp2p::discv5::enr::NodeId; use libp2p::discv5::{Discv5, Discv5Event}; use libp2p::multiaddr::Protocol; use libp2p::swarm::{NetworkBehaviour, NetworkBehaviourAction, PollParameters, ProtocolsHandler}; -use slog::{debug, info, warn}; +use slog::{crit, debug, info, trace, warn}; use ssz::{Decode, Encode}; use ssz_types::BitVector; use std::collections::HashSet; @@ -263,24 +263,88 @@ impl Discovery { }); if peers_on_subnet < TARGET_SUBNET_PEERS { + let target_peers = TARGET_SUBNET_PEERS - peers_on_subnet; debug!(self.log, "Searching for peers for subnet"; "subnet_id" => *subnet_id, "connected_peers_on_subnet" => peers_on_subnet, - "target_subnet_peers" => TARGET_SUBNET_PEERS + "target_subnet_peers" => TARGET_SUBNET_PEERS, + "target_peers" => target_peers ); - // TODO: Update to predicate search - self.find_peers(); + + let log_clone = self.log.clone(); + + let subnet_predicate = move |enr: &Enr| { + if let Some(bitfield_bytes) = enr.get(BITFIELD_ENR_KEY) { + let bitfield = match BitVector::::from_ssz_bytes( + bitfield_bytes, + ) { + Ok(v) => v, + Err(e) => { + warn!(log_clone, "Could not decode ENR bitfield for peer"; "peer_id" => format!("{}", enr.peer_id()), "error" => format!("{:?}", e)); + return false; + } + }; + + return bitfield.get(*subnet_id as usize).unwrap_or_else(|_| { + debug!(log_clone, "Peer found but not on desired subnet"; "peer_id" => format!("{}", enr.peer_id())); + false + }); + } + false + }; + + // start the query + self.start_query(subnet_predicate, target_peers as usize); } + debug!(self.log, "Discovery ignored"; + "reason" => "Already connected to desired peers", + "connected_peers_on_subnet" => peers_on_subnet, + "target_subnet_peers" => TARGET_SUBNET_PEERS, + ); } /* Internal Functions */ - /// Search for new peers using the underlying discovery mechanism. + /// Run a standard query to search for more peers. + /// + /// This searches for the standard kademlia bucket size (16) peers. fn find_peers(&mut self) { + debug!(self.log, "Searching for peers"); + self.start_query(|_| true, 16); + } + + /// Search for a specified number of new peers using the underlying discovery mechanism. + /// + /// This can optionally search for peers for a given predicate. Regardless of the predicate + /// given, this will only search for peers on the same enr_fork_id as specified in the local + /// ENR. + fn start_query(&mut self, enr_predicate: F, num_nodes: usize) + where + F: Fn(&Enr) -> bool + Send + 'static + Clone, + { // pick a random NodeId let random_node = NodeId::random(); - debug!(self.log, "Searching for peers"); - self.discovery.find_node(random_node); + + let enr_fork_id = self.enr_fork_id().to_vec(); + // predicate for finding nodes with a matching fork + let eth2_fork_predicate = move |enr: &Enr| enr.get(ETH2_ENR_KEY) == Some(&enr_fork_id); + let predicate = move |enr: &Enr| eth2_fork_predicate(enr) && enr_predicate(enr); + + // general predicate + self.discovery + .find_enr_predicate(random_node, predicate, num_nodes); + } + + /// Returns our current `eth2` field as SSZ bytes, associated with the local ENR. We only search for peers + /// that have this field. + fn enr_fork_id(&self) -> Vec { + self.local_enr() + .get(ETH2_ENR_KEY) + .map(|bytes| bytes.clone()) + .unwrap_or_else(|| { + crit!(self.log, "Local ENR has no eth2 field"); + Vec::new() + }) } } @@ -403,9 +467,16 @@ where match self.discovery.poll(params) { Async::Ready(NetworkBehaviourAction::GenerateEvent(event)) => { match event { - Discv5Event::Discovered(_enr) => { - // not concerned about FINDNODE results, rather the result of an entire - // query. + Discv5Event::Discovered(enr) => { + // peers that get discovered during a query but are not contactable or + // don't match a predicate can end up here. For debugging purposes we + // log these to see if we are unnecessarily dropping discovered peers + if enr.get(ETH2_ENR_KEY) == Some(&self.enr_fork_id().to_vec()) { + trace!(self.log, "Peer found in process of query"; "peer_id" => format!("{}", enr.peer_id()), "tcp_socket" => enr.tcp_socket()); + } else { + // this is temporary warning for debugging the DHT + warn!(self.log, "Found peer during discovery not on correct fork"; "peer_id" => format!("{}", enr.peer_id()), "tcp_socket" => enr.tcp_socket()); + } } Discv5Event::SocketUpdated(socket) => { info!(self.log, "Address updated"; "ip" => format!("{}",socket.ip()), "udp_port" => format!("{}", socket.port())); diff --git a/beacon_node/eth2-libp2p/src/service.rs b/beacon_node/eth2-libp2p/src/service.rs index b056eac05..3aefcce95 100644 --- a/beacon_node/eth2-libp2p/src/service.rs +++ b/beacon_node/eth2-libp2p/src/service.rs @@ -153,7 +153,7 @@ impl Service { network_globals .gossipsub_subscriptions .write() - .push(topic.clone()); + .insert(topic.clone()); } else { warn!(log, "Could not subscribe to topic"; "topic" => format!("{}",topic_string)); } diff --git a/beacon_node/eth2-libp2p/src/types/globals.rs b/beacon_node/eth2-libp2p/src/types/globals.rs index 6fa6349fe..93574f9d8 100644 --- a/beacon_node/eth2-libp2p/src/types/globals.rs +++ b/beacon_node/eth2-libp2p/src/types/globals.rs @@ -1,7 +1,7 @@ //! A collection of variables that are accessible outside of the network thread itself. use crate::{Enr, GossipTopic, Multiaddr, PeerId, PeerInfo}; use parking_lot::RwLock; -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::sync::atomic::{AtomicU16, Ordering}; use types::EthSpec; @@ -19,7 +19,7 @@ pub struct NetworkGlobals { /// The collection of currently connected peers. pub connected_peer_set: RwLock>>, /// The current gossipsub topic subscriptions. - pub gossipsub_subscriptions: RwLock>, + pub gossipsub_subscriptions: RwLock>, } impl NetworkGlobals { @@ -31,7 +31,7 @@ impl NetworkGlobals { listen_port_tcp: AtomicU16::new(tcp_port), listen_port_udp: AtomicU16::new(udp_port), connected_peer_set: RwLock::new(HashMap::new()), - gossipsub_subscriptions: RwLock::new(Vec::new()), + gossipsub_subscriptions: RwLock::new(HashSet::new()), } } diff --git a/beacon_node/eth2-libp2p/src/types/topics.rs b/beacon_node/eth2-libp2p/src/types/topics.rs index 3745746a8..5ea0b1e76 100644 --- a/beacon_node/eth2-libp2p/src/types/topics.rs +++ b/beacon_node/eth2-libp2p/src/types/topics.rs @@ -19,7 +19,7 @@ pub const ATTESTER_SLASHING_TOPIC: &str = "attester_slashing"; /// A gossipsub topic which encapsulates the type of messages that should be sent and received over /// the pubsub protocol and the way the messages should be encoded. -#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] +#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, Hash)] pub struct GossipTopic { /// The encoding of the topic. encoding: GossipEncoding, @@ -28,7 +28,7 @@ pub struct GossipTopic { } /// Enum that brings these topics into the rust type system. -#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] +#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, Hash)] pub enum GossipKind { /// Topic for publishing beacon blocks. BeaconBlock, @@ -45,7 +45,7 @@ pub enum GossipKind { } /// The known encoding types for gossipsub messages. -#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] +#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, Hash)] pub enum GossipEncoding { /// Messages are encoded with SSZ. SSZ,