From e47739047d0b97d8838e2128b702df2861ff4d76 Mon Sep 17 00:00:00 2001 From: Pawan Dhananjay Date: Thu, 19 Nov 2020 22:32:09 +0000 Subject: [PATCH] Add additional libp2p tests (#1867) ## Issue Addressed N/A ## Proposed Changes Adds tests for the eth2_libp2p crate. --- beacon_node/eth2_libp2p/src/discovery/mod.rs | 407 +++++++++++++----- beacon_node/eth2_libp2p/src/rpc/codec/base.rs | 74 +++- beacon_node/eth2_libp2p/src/rpc/protocol.rs | 4 +- beacon_node/eth2_libp2p/src/types/pubsub.rs | 17 + beacon_node/eth2_libp2p/src/types/subnet.rs | 21 +- beacon_node/eth2_libp2p/src/types/topics.rs | 88 ++++ .../network/src/attestation_service/mod.rs | 28 +- 7 files changed, 504 insertions(+), 135 deletions(-) diff --git a/beacon_node/eth2_libp2p/src/discovery/mod.rs b/beacon_node/eth2_libp2p/src/discovery/mod.rs index 300901336..5cbfe28b0 100644 --- a/beacon_node/eth2_libp2p/src/discovery/mod.rs +++ b/beacon_node/eth2_libp2p/src/discovery/mod.rs @@ -556,13 +556,15 @@ impl Discovery { /// Consume the discovery queue and initiate queries when applicable. /// /// This also sanitizes the queue removing out-dated queries. - fn process_queue(&mut self) { + /// Returns `true` if any of the queued queries is processed and a discovery + /// query (Subnet or FindPeers) is started. + fn process_queue(&mut self) -> bool { // Sanitize the queue, removing any out-dated subnet queries self.queued_queries.retain(|query| !query.expired()); // use this to group subnet queries together for a single discovery request let mut subnet_queries: Vec = Vec::new(); - + let mut processed = false; // Check that we are within our query concurrency limit while !self.at_capacity() && !self.queued_queries.is_empty() { // consume and process the query queue @@ -579,6 +581,7 @@ impl Discovery { FIND_NODE_QUERY_CLOSEST_PEERS, |_| true, ); + processed = true; } else { self.queued_queries.push_back(QueryType::FindPeers); } @@ -604,6 +607,7 @@ impl Discovery { "subnets" => format!("{:?}", grouped_queries.iter().map(|q| q.subnet_id).collect::>()), ); self.start_subnet_query(grouped_queries); + processed = true; } } None => {} // Queue is empty @@ -611,6 +615,7 @@ impl Discovery { } // Update the queue metric metrics::set_gauge(&metrics::DISCOVERY_QUEUE, self.queued_queries.len() as i64); + processed } // Returns a boolean indicating if we are currently processing the maximum number of @@ -724,111 +729,122 @@ impl Discovery { self.active_queries.push(Box::pin(query_future)); } + /// Process the completed QueryResult returned from discv5. + fn process_completed_queries( + &mut self, + query_result: QueryResult, + ) -> Option>> { + match query_result.0 { + GroupedQueryType::FindPeers => { + self.find_peer_active = false; + match query_result.1 { + Ok(r) if r.is_empty() => { + debug!(self.log, "Discovery query yielded no results."); + } + Ok(r) => { + debug!(self.log, "Discovery query completed"; "peers_found" => r.len()); + let mut results: HashMap> = HashMap::new(); + r.iter().for_each(|enr| { + // cache the found ENR's + self.cached_enrs.put(enr.peer_id(), enr.clone()); + results.insert(enr.peer_id(), None); + }); + return Some(results); + } + Err(e) => { + warn!(self.log, "Discovery query failed"; "error" => e.to_string()); + } + } + } + GroupedQueryType::Subnet(queries) => { + let subnets_searched_for: Vec = + queries.iter().map(|query| query.subnet_id).collect(); + match query_result.1 { + Ok(r) if r.is_empty() => { + debug!(self.log, "Grouped subnet discovery query yielded no results."; "subnets_searched_for" => format!("{:?}",subnets_searched_for)); + } + Ok(r) => { + debug!(self.log, "Peer grouped subnet discovery request completed"; "peers_found" => r.len(), "subnets_searched_for" => format!("{:?}",subnets_searched_for)); + + let mut mapped_results: HashMap> = HashMap::new(); + + // cache the found ENR's + for enr in r.iter().cloned() { + self.cached_enrs.put(enr.peer_id(), enr); + } + + // Map each subnet query's min_ttl to the set of ENR's returned for that subnet. + queries.iter().for_each(|query| { + // A subnet query has completed. Add back to the queue, incrementing retries. + self.add_subnet_query( + query.subnet_id, + query.min_ttl, + query.retries + 1, + ); + + // Check the specific subnet against the enr + let subnet_predicate = + subnet_predicate::(vec![query.subnet_id], &self.log); + + r.iter() + .filter(|enr| subnet_predicate(enr)) + .map(|enr| enr.peer_id()) + .for_each(|peer_id| { + let other_min_ttl = mapped_results.get_mut(&peer_id); + + // map peer IDs to the min_ttl furthest in the future + match (query.min_ttl, other_min_ttl) { + // update the mapping if the min_ttl is greater + ( + Some(min_ttl_instant), + Some(Some(other_min_ttl_instant)), + ) => { + if min_ttl_instant + .saturating_duration_since(*other_min_ttl_instant) + > DURATION_DIFFERENCE + { + *other_min_ttl_instant = min_ttl_instant; + } + } + // update the mapping if we have a specified min_ttl + (Some(min_ttl), Some(None)) => { + mapped_results.insert(peer_id, Some(min_ttl)); + } + // first seen min_ttl for this enr + (Some(min_ttl), None) => { + mapped_results.insert(peer_id, Some(min_ttl)); + } + // first seen min_ttl for this enr + (None, None) => { + mapped_results.insert(peer_id, None); + } + (None, Some(Some(_))) => {} // Don't replace the existing specific min_ttl + (None, Some(None)) => {} // No-op because this is a duplicate + } + }); + }); + + if mapped_results.is_empty() { + return None; + } else { + return Some(mapped_results); + } + } + Err(e) => { + warn!(self.log,"Grouped subnet discovery query failed"; "subnets_searched_for" => format!("{:?}",subnets_searched_for), "error" => e.to_string()); + } + } + } + } + None + } + /// Drives the queries returning any results from completed queries. fn poll_queries(&mut self, cx: &mut Context) -> Option>> { - while let Poll::Ready(Some(query_future)) = self.active_queries.poll_next_unpin(cx) { - match query_future.0 { - GroupedQueryType::FindPeers => { - self.find_peer_active = false; - match query_future.1 { - Ok(r) if r.is_empty() => { - debug!(self.log, "Discovery query yielded no results."); - } - Ok(r) => { - debug!(self.log, "Discovery query completed"; "peers_found" => r.len()); - let mut results: HashMap> = HashMap::new(); - r.iter().for_each(|enr| { - // cache the found ENR's - self.cached_enrs.put(enr.peer_id(), enr.clone()); - results.insert(enr.peer_id(), None); - }); - return Some(results); - } - Err(e) => { - warn!(self.log, "Discovery query failed"; "error" => e.to_string()); - } - } - } - GroupedQueryType::Subnet(queries) => { - let subnets_searched_for: Vec = - queries.iter().map(|query| query.subnet_id).collect(); - match query_future.1 { - Ok(r) if r.is_empty() => { - debug!(self.log, "Grouped subnet discovery query yielded no results."; "subnets_searched_for" => format!("{:?}",subnets_searched_for)); - } - Ok(r) => { - debug!(self.log, "Peer grouped subnet discovery request completed"; "peers_found" => r.len(), "subnets_searched_for" => format!("{:?}",subnets_searched_for)); - - let mut mapped_results: HashMap> = - HashMap::new(); - - // cache the found ENR's - for enr in r.iter().cloned() { - self.cached_enrs.put(enr.peer_id(), enr); - } - - // Map each subnet query's min_ttl to the set of ENR's returned for that subnet. - queries.iter().for_each(|query| { - // A subnet query has completed. Add back to the queue, incrementing retries. - self.add_subnet_query( - query.subnet_id, - query.min_ttl, - query.retries + 1, - ); - - // Check the specific subnet against the enr - let subnet_predicate = - subnet_predicate::(vec![query.subnet_id], &self.log); - - r.iter() - .filter(|enr| subnet_predicate(enr)) - .map(|enr| enr.peer_id()) - .for_each(|peer_id| { - let other_min_ttl = mapped_results.get_mut(&peer_id); - - // map peer IDs to the min_ttl furthest in the future - match (query.min_ttl, other_min_ttl) { - // update the mapping if the min_ttl is greater - ( - Some(min_ttl_instant), - Some(Some(other_min_ttl_instant)), - ) => { - if min_ttl_instant.saturating_duration_since( - *other_min_ttl_instant, - ) > DURATION_DIFFERENCE - { - *other_min_ttl_instant = min_ttl_instant; - } - } - // update the mapping if we have a specified min_ttl - (Some(min_ttl), Some(None)) => { - mapped_results.insert(peer_id, Some(min_ttl)); - } - // first seen min_ttl for this enr - (Some(min_ttl), None) => { - mapped_results.insert(peer_id, Some(min_ttl)); - } - // first seen min_ttl for this enr - (None, None) => { - mapped_results.insert(peer_id, None); - } - (None, Some(Some(_))) => {} // Don't replace the existing specific min_ttl - (None, Some(None)) => {} // No-op because this is a duplicate - } - }); - }); - - if mapped_results.is_empty() { - return None; - } else { - return Some(mapped_results); - } - } - Err(e) => { - warn!(self.log,"Grouped subnet discovery query failed"; "subnets_searched_for" => format!("{:?}",subnets_searched_for), "error" => e.to_string()); - } - } - } + while let Poll::Ready(Some(query_result)) = self.active_queries.poll_next_unpin(cx) { + let result = self.process_completed_queries(query_result); + if result.is_some() { + return result; } } None @@ -904,3 +920,184 @@ impl Discovery { Poll::Pending } } + +#[cfg(test)] +mod tests { + use super::*; + use crate::rpc::methods::MetaData; + use enr::EnrBuilder; + use slog::{o, Drain}; + use std::net::UdpSocket; + use types::MinimalEthSpec; + + type E = MinimalEthSpec; + + pub fn unused_port() -> u16 { + let socket = UdpSocket::bind("127.0.0.1:0").expect("should create udp socket"); + let local_addr = socket.local_addr().expect("should read udp socket"); + local_addr.port() + } + + pub fn build_log(level: slog::Level, enabled: bool) -> slog::Logger { + let decorator = slog_term::TermDecorator::new().build(); + let drain = slog_term::FullFormat::new(decorator).build().fuse(); + let drain = slog_async::Async::new(drain).build().fuse(); + + if enabled { + slog::Logger::root(drain.filter_level(level).fuse(), o!()) + } else { + slog::Logger::root(drain.filter(|_| false).fuse(), o!()) + } + } + + async fn build_discovery() -> Discovery { + let keypair = libp2p::identity::Keypair::generate_secp256k1(); + let mut config = NetworkConfig::default(); + config.discovery_port = unused_port(); + let enr_key: CombinedKey = CombinedKey::from_libp2p(&keypair).unwrap(); + let enr: Enr = build_enr::(&enr_key, &config, EnrForkId::default()).unwrap(); + let log = build_log(slog::Level::Debug, false); + let globals = NetworkGlobals::new( + enr, + 9000, + 9000, + MetaData { + seq_number: 0, + attnets: Default::default(), + }, + vec![], + &log, + ); + Discovery::new(&keypair, &config, Arc::new(globals), &log) + .await + .unwrap() + } + + #[tokio::test] + async fn test_add_subnet_query() { + let mut discovery = build_discovery().await; + let now = Instant::now(); + let mut subnet_query = SubnetQuery { + subnet_id: SubnetId::new(1), + min_ttl: Some(now), + retries: 0, + }; + discovery.add_subnet_query( + subnet_query.subnet_id, + subnet_query.min_ttl, + subnet_query.retries, + ); + assert_eq!( + discovery.queued_queries.back(), + Some(&QueryType::Subnet(subnet_query.clone())) + ); + + // New query should replace old query + subnet_query.min_ttl = Some(now + Duration::from_secs(1)); + discovery.add_subnet_query(subnet_query.subnet_id, subnet_query.min_ttl, 1); + + subnet_query.retries += 1; + + assert_eq!(discovery.queued_queries.len(), 1); + assert_eq!( + discovery.queued_queries.pop_back(), + Some(QueryType::Subnet(subnet_query.clone())) + ); + + // Retries > MAX_DISCOVERY_RETRY must return immediately without adding + // anything. + discovery.add_subnet_query( + subnet_query.subnet_id, + subnet_query.min_ttl, + MAX_DISCOVERY_RETRY + 1, + ); + + assert_eq!(discovery.queued_queries.len(), 0); + } + + #[tokio::test] + async fn test_process_queue() { + let mut discovery = build_discovery().await; + + // FindPeers query is processed if there is no subnet query + discovery.queued_queries.push_back(QueryType::FindPeers); + assert!(discovery.process_queue()); + + let now = Instant::now(); + let subnet_query = SubnetQuery { + subnet_id: SubnetId::new(1), + min_ttl: Some(now + Duration::from_secs(10)), + retries: 0, + }; + + // Refresh active queries + discovery.active_queries = Default::default(); + + // SubnetQuery is processed if it's the only queued query + discovery + .queued_queries + .push_back(QueryType::Subnet(subnet_query.clone())); + assert!(discovery.process_queue()); + + // SubnetQuery is processed if it's there is also 1 queued discovery query + discovery.queued_queries.push_back(QueryType::FindPeers); + discovery + .queued_queries + .push_back(QueryType::Subnet(subnet_query.clone())); + // Process Subnet query and FindPeers afterwards. + assert!(discovery.process_queue()); + } + + fn make_enr(subnet_ids: Vec) -> Enr { + let mut builder = EnrBuilder::new("v4"); + let keypair = libp2p::identity::Keypair::generate_secp256k1(); + let enr_key: CombinedKey = CombinedKey::from_libp2p(&keypair).unwrap(); + + // set the "attnets" field on our ENR + let mut bitfield = BitVector::::new(); + for id in subnet_ids { + bitfield.set(id, true).unwrap(); + } + + builder.add_value(BITFIELD_ENR_KEY, &bitfield.as_ssz_bytes()); + builder.build(&enr_key).unwrap() + } + + #[tokio::test] + async fn test_completed_subnet_queries() { + let mut discovery = build_discovery().await; + let now = Instant::now(); + let instant1 = Some(now + Duration::from_secs(10)); + let instant2 = Some(now + Duration::from_secs(5)); + + let query = GroupedQueryType::Subnet(vec![ + SubnetQuery { + subnet_id: SubnetId::new(1), + min_ttl: instant1, + retries: 0, + }, + SubnetQuery { + subnet_id: SubnetId::new(2), + min_ttl: instant2, + retries: 0, + }, + ]); + + // Create enr which is subscribed to subnets 1 and 2 + let enr1 = make_enr(vec![1, 2]); + let enr2 = make_enr(vec![2]); + // Unwanted enr for the given grouped query + let enr3 = make_enr(vec![3]); + + let enrs: Vec = vec![enr1.clone(), enr2.clone(), enr3.clone()]; + let results = discovery + .process_completed_queries(QueryResult(query, Ok(enrs))) + .unwrap(); + + // enr1 and enr2 are required peers based on the requested subnet ids + assert_eq!(results.len(), 2); + + // when a peer belongs to multiple subnet ids, we use the highest ttl. + assert_eq!(results.get(&enr1.peer_id()).unwrap(), &instant1); + } +} diff --git a/beacon_node/eth2_libp2p/src/rpc/codec/base.rs b/beacon_node/eth2_libp2p/src/rpc/codec/base.rs index 1613b59d5..61c4b0926 100644 --- a/beacon_node/eth2_libp2p/src/rpc/codec/base.rs +++ b/beacon_node/eth2_libp2p/src/rpc/codec/base.rs @@ -189,7 +189,7 @@ mod tests { #[test] fn test_decode_status_message() { - let message = hex::decode("ff060000734e615070590032000006e71e7b54989925efd6c9cbcb8ceb9b5f71216f5137282bf6a1e3b50f64e42d6c7fb347abe07eb0db8200000005029e2800").unwrap(); + let message = hex::decode("0054ff060000734e615070590032000006e71e7b54989925efd6c9cbcb8ceb9b5f71216f5137282bf6a1e3b50f64e42d6c7fb347abe07eb0db8200000005029e2800").unwrap(); let mut buf = BytesMut::new(); buf.extend_from_slice(&message); @@ -199,19 +199,79 @@ mod tests { let mut snappy_outbound_codec = SSZSnappyOutboundCodec::::new(snappy_protocol_id, 1_048_576); + // remove response code + let mut snappy_buf = buf.clone(); + let _ = snappy_buf.split_to(1); + // decode message just as snappy message - let snappy_decoded_message = snappy_outbound_codec.decode(&mut buf.clone()); - // decode message just a ssz message + let snappy_decoded_message = snappy_outbound_codec.decode(&mut snappy_buf).unwrap(); // build codecs for entire chunk let mut snappy_base_outbound_codec = BaseOutboundCodec::new(snappy_outbound_codec); // decode message as ssz snappy chunk - let snappy_decoded_chunk = snappy_base_outbound_codec.decode(&mut buf.clone()); - // decode message just a ssz chunk + let snappy_decoded_chunk = snappy_base_outbound_codec.decode(&mut buf).unwrap(); - let _ = dbg!(snappy_decoded_message); - let _ = dbg!(snappy_decoded_chunk); + dbg!(snappy_decoded_message); + dbg!(snappy_decoded_chunk); + } + + #[test] + fn test_invalid_length_prefix() { + let mut uvi_codec: Uvi = Uvi::default(); + let mut dst = BytesMut::with_capacity(1024); + + // Smallest > 10 byte varint + let len: u128 = 2u128.pow(70); + + // Insert length-prefix + uvi_codec.encode(len, &mut dst).unwrap(); + + let snappy_protocol_id = + ProtocolId::new(Protocol::Status, Version::V1, Encoding::SSZSnappy); + let mut snappy_outbound_codec = + SSZSnappyOutboundCodec::::new(snappy_protocol_id, 1_048_576); + + let snappy_decoded_message = snappy_outbound_codec.decode(&mut dst).unwrap_err(); + + assert_eq!( + snappy_decoded_message, + RPCError::IoError("input bytes exceed maximum".to_string()), + "length-prefix of > 10 bytes is invalid" + ); + } + + #[test] + fn test_length_limits() { + fn encode_len(len: usize) -> BytesMut { + let mut uvi_codec: Uvi = Uvi::default(); + let mut dst = BytesMut::with_capacity(1024); + uvi_codec.encode(len, &mut dst).unwrap(); + dst + } + + let protocol_id = + ProtocolId::new(Protocol::BlocksByRange, Version::V1, Encoding::SSZSnappy); + + // Response limits + let limit = protocol_id.rpc_response_limits::(); + let mut max = encode_len(limit.max + 1); + let mut codec = SSZSnappyOutboundCodec::::new(protocol_id.clone(), 1_048_576); + assert_eq!(codec.decode(&mut max).unwrap_err(), RPCError::InvalidData); + + let mut min = encode_len(limit.min - 1); + let mut codec = SSZSnappyOutboundCodec::::new(protocol_id.clone(), 1_048_576); + assert_eq!(codec.decode(&mut min).unwrap_err(), RPCError::InvalidData); + + // Request limits + let limit = protocol_id.rpc_request_limits(); + let mut max = encode_len(limit.max + 1); + let mut codec = SSZSnappyOutboundCodec::::new(protocol_id.clone(), 1_048_576); + assert_eq!(codec.decode(&mut max).unwrap_err(), RPCError::InvalidData); + + let mut min = encode_len(limit.min - 1); + let mut codec = SSZSnappyOutboundCodec::::new(protocol_id, 1_048_576); + assert_eq!(codec.decode(&mut min).unwrap_err(), RPCError::InvalidData); } #[test] diff --git a/beacon_node/eth2_libp2p/src/rpc/protocol.rs b/beacon_node/eth2_libp2p/src/rpc/protocol.rs index 9adfa241a..ba6e5c187 100644 --- a/beacon_node/eth2_libp2p/src/rpc/protocol.rs +++ b/beacon_node/eth2_libp2p/src/rpc/protocol.rs @@ -163,8 +163,8 @@ impl UpgradeInfo for RPCProtocol { /// Represents the ssz length bounds for RPC messages. #[derive(Debug, PartialEq)] pub struct RpcLimits { - min: usize, - max: usize, + pub min: usize, + pub max: usize, } impl RpcLimits { diff --git a/beacon_node/eth2_libp2p/src/types/pubsub.rs b/beacon_node/eth2_libp2p/src/types/pubsub.rs index 6d9502fa4..f62898e9c 100644 --- a/beacon_node/eth2_libp2p/src/types/pubsub.rs +++ b/beacon_node/eth2_libp2p/src/types/pubsub.rs @@ -200,3 +200,20 @@ impl std::fmt::Display for PubsubMessage { } } } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_gossip_max_size() { + // Cannot decode more than gossip max size + let mut encoder = Encoder::new(); + let payload = encoder.compress_vec(&[0; GOSSIP_MAX_SIZE + 1]).unwrap(); + let message_data: MessageData = payload.into(); + assert_eq!( + message_data.decompressed.unwrap_err(), + "ssz_snappy decoded data > GOSSIP_MAX_SIZE".to_string() + ); + } +} diff --git a/beacon_node/eth2_libp2p/src/types/subnet.rs b/beacon_node/eth2_libp2p/src/types/subnet.rs index 0136e6301..847a63b60 100644 --- a/beacon_node/eth2_libp2p/src/types/subnet.rs +++ b/beacon_node/eth2_libp2p/src/types/subnet.rs @@ -1,28 +1,9 @@ -use std::time::{Duration, Instant}; +use std::time::Instant; use types::SubnetId; -const DURATION_DIFFERENCE: Duration = Duration::from_millis(1); - /// A subnet to discover peers on along with the instant after which it's no longer useful. #[derive(Debug, Clone)] pub struct SubnetDiscovery { pub subnet_id: SubnetId, pub min_ttl: Option, } - -impl PartialEq for SubnetDiscovery { - fn eq(&self, other: &SubnetDiscovery) -> bool { - self.subnet_id == other.subnet_id - && match (self.min_ttl, other.min_ttl) { - (Some(min_ttl_instant), Some(other_min_ttl_instant)) => { - min_ttl_instant.saturating_duration_since(other_min_ttl_instant) - < DURATION_DIFFERENCE - && other_min_ttl_instant.saturating_duration_since(min_ttl_instant) - < DURATION_DIFFERENCE - } - (None, None) => true, - (None, Some(_)) => true, - (Some(_), None) => true, - } - } -} diff --git a/beacon_node/eth2_libp2p/src/types/topics.rs b/beacon_node/eth2_libp2p/src/types/topics.rs index 7409607d6..dd1a4bee4 100644 --- a/beacon_node/eth2_libp2p/src/types/topics.rs +++ b/beacon_node/eth2_libp2p/src/types/topics.rs @@ -202,3 +202,91 @@ fn committee_topic_index(topic: &str) -> Option { } None } + +#[cfg(test)] +mod tests { + use super::GossipKind::*; + use super::*; + + const GOOD_FORK_DIGEST: &str = "e1925f3b"; + const BAD_PREFIX: &str = "tezos"; + const BAD_FORK_DIGEST: &str = "e1925f3b4b"; + const BAD_ENCODING: &str = "rlp"; + const BAD_KIND: &str = "blocks"; + + fn topics() -> Vec { + let mut topics = Vec::new(); + let fork_digest: [u8; 4] = [1, 2, 3, 4]; + for encoding in [GossipEncoding::SSZSnappy].iter() { + for kind in [ + BeaconBlock, + BeaconAggregateAndProof, + Attestation(SubnetId::new(42)), + VoluntaryExit, + ProposerSlashing, + AttesterSlashing, + ] + .iter() + { + topics.push(GossipTopic::new(kind.clone(), encoding.clone(), fork_digest).into()); + } + } + topics + } + + fn create_topic(prefix: &str, fork_digest: &str, kind: &str, encoding: &str) -> String { + format!("/{}/{}/{}/{}", prefix, fork_digest, kind, encoding) + } + + #[test] + fn test_decode() { + for topic in topics().iter() { + assert!(GossipTopic::decode(topic.as_str()).is_ok()); + } + } + + #[test] + fn test_decode_malicious() { + let bad_prefix_str = create_topic( + BAD_PREFIX, + GOOD_FORK_DIGEST, + BEACON_BLOCK_TOPIC, + SSZ_SNAPPY_ENCODING_POSTFIX, + ); + assert!(GossipTopic::decode(bad_prefix_str.as_str()).is_err()); + + let bad_digest_str = create_topic( + TOPIC_PREFIX, + BAD_FORK_DIGEST, + BEACON_BLOCK_TOPIC, + SSZ_SNAPPY_ENCODING_POSTFIX, + ); + assert!(GossipTopic::decode(bad_digest_str.as_str()).is_err()); + + let bad_kind_str = create_topic( + TOPIC_PREFIX, + GOOD_FORK_DIGEST, + BAD_KIND, + SSZ_SNAPPY_ENCODING_POSTFIX, + ); + assert!(GossipTopic::decode(bad_kind_str.as_str()).is_err()); + + let bad_encoding_str = create_topic( + TOPIC_PREFIX, + GOOD_FORK_DIGEST, + BEACON_BLOCK_TOPIC, + BAD_ENCODING, + ); + assert!(GossipTopic::decode(bad_encoding_str.as_str()).is_err()); + + // Extra parts + assert!( + GossipTopic::decode("/eth2/e1925f3b/beacon_block/ssz_snappy/yolo").is_err(), + "should have exactly 5 parts" + ); + // Empty string + assert!(GossipTopic::decode("").is_err()); + // Empty parts + assert!(GossipTopic::decode("////").is_err()); + } +} diff --git a/beacon_node/network/src/attestation_service/mod.rs b/beacon_node/network/src/attestation_service/mod.rs index 645732fa0..a172e6fb5 100644 --- a/beacon_node/network/src/attestation_service/mod.rs +++ b/beacon_node/network/src/attestation_service/mod.rs @@ -38,7 +38,7 @@ const ADVANCE_SUBSCRIBE_TIME: u32 = 3; /// 36s at 12s slot time const DEFAULT_EXPIRATION_TIMEOUT: u32 = 3; -#[derive(Debug, PartialEq, Clone)] +#[derive(Debug, Clone)] pub enum AttServiceMessage { /// Subscribe to the specified subnet id. Subscribe(SubnetId), @@ -52,6 +52,32 @@ pub enum AttServiceMessage { DiscoverPeers(Vec), } +/// Note: This `PartialEq` impl is for use only in tests. +/// The `DiscoverPeers` comparison is good enough for testing only. +#[cfg(test)] +impl PartialEq for AttServiceMessage { + fn eq(&self, other: &AttServiceMessage) -> bool { + match (self, other) { + (AttServiceMessage::Subscribe(a), AttServiceMessage::Subscribe(b)) => a == b, + (AttServiceMessage::Unsubscribe(a), AttServiceMessage::Unsubscribe(b)) => a == b, + (AttServiceMessage::EnrAdd(a), AttServiceMessage::EnrAdd(b)) => a == b, + (AttServiceMessage::EnrRemove(a), AttServiceMessage::EnrRemove(b)) => a == b, + (AttServiceMessage::DiscoverPeers(a), AttServiceMessage::DiscoverPeers(b)) => { + if a.len() != b.len() { + return false; + } + for i in 0..a.len() { + if a[i].subnet_id != b[i].subnet_id || a[i].min_ttl != b[i].min_ttl { + return false; + } + } + true + } + _ => false, + } + } +} + /// A particular subnet at a given slot. #[derive(PartialEq, Eq, Hash, Clone, Debug)] pub struct ExactSubnet {