diff --git a/beacon_node/eth2_libp2p/src/discovery/mod.rs b/beacon_node/eth2_libp2p/src/discovery/mod.rs index 3866e4d47..a74889426 100644 --- a/beacon_node/eth2_libp2p/src/discovery/mod.rs +++ b/beacon_node/eth2_libp2p/src/discovery/mod.rs @@ -53,8 +53,11 @@ pub const TARGET_SUBNET_PEERS: usize = config::MESH_N_LOW; const TARGET_PEERS_FOR_GROUPED_QUERY: usize = 6; /// Number of times to attempt a discovery request. const MAX_DISCOVERY_RETRY: usize = 3; -/// The maximum number of concurrent discovery queries. -const MAX_CONCURRENT_QUERIES: usize = 2; +/// The maximum number of concurrent subnet discovery queries. +/// Note: we always allow a single FindPeers query, so we would be +/// running a maximum of `MAX_CONCURRENT_SUBNET_QUERIES + 1` +/// discovery queries at a time. +const MAX_CONCURRENT_SUBNET_QUERIES: usize = 2; /// The max number of subnets to search for in a single subnet discovery query. const MAX_SUBNETS_IN_QUERY: usize = 3; /// The number of closest peers to search for when doing a regular peer search. @@ -81,6 +84,19 @@ struct SubnetQuery { retries: usize, } +impl SubnetQuery { + /// Returns true if this query has expired. + pub fn expired(&self) -> bool { + if let Some(ttl) = self.min_ttl { + ttl < Instant::now() + } + // `None` corresponds to long lived subnet discovery requests. + else { + false + } + } +} + impl std::fmt::Debug for SubnetQuery { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { let min_ttl_secs = self @@ -97,37 +113,16 @@ impl std::fmt::Debug for SubnetQuery { #[derive(Debug, Clone, PartialEq)] enum QueryType { /// We are searching for subnet peers. - Subnet(SubnetQuery), - /// We are searching for more peers without ENR or time constraints. - FindPeers, -} - -#[derive(Debug, Clone, PartialEq)] -enum GroupedQueryType { - /// We are searching for peers on one of a few subnets. Subnet(Vec), /// We are searching for more peers without ENR or time constraints. FindPeers, } -impl QueryType { - /// Returns true if this query has expired. - pub fn expired(&self) -> bool { - match self { - Self::FindPeers => false, - Self::Subnet(subnet_query) => { - if let Some(ttl) = subnet_query.min_ttl { - ttl < Instant::now() - } else { - true - } - } - } - } -} - /// The result of a query. -struct QueryResult(GroupedQueryType, Result, discv5::QueryError>); +struct QueryResult { + query_type: QueryType, + result: Result, discv5::QueryError>, +} // Awaiting the event stream future enum EventStream { @@ -169,8 +164,8 @@ pub struct Discovery { /// a time, regardless of the query concurrency. find_peer_active: bool, - /// A queue of discovery queries to be processed. - queued_queries: VecDeque, + /// A queue of subnet queries to be processed. + queued_queries: VecDeque, /// Active discovery queries. active_queries: FuturesUnordered + Send>>>, @@ -328,15 +323,12 @@ impl Discovery { if !self.started || self.find_peer_active { return; } - - // If there is not already a find peer's query queued, add one - let query = QueryType::FindPeers; - if !self.queued_queries.contains(&query) { - debug!(self.log, "Queuing a peer discovery request"); - self.queued_queries.push_back(query); - // update the metrics - metrics::set_gauge(&metrics::DISCOVERY_QUEUE, self.queued_queries.len() as i64); - } + // Immediately start a FindNode query + debug!(self.log, "Starting a peer discovery request"); + self.find_peer_active = true; + self.start_query(QueryType::FindPeers, FIND_NODE_QUERY_CLOSEST_PEERS, |_| { + true + }); } /// Processes a request to search for more peers on a subnet. @@ -347,7 +339,7 @@ impl Discovery { } trace!( self.log, - "Making discovery query for subnets"; + "Starting discovery query for subnets"; "subnets" => ?subnets_to_discover.iter().map(|s| s.subnet).collect::>() ); for subnet in subnets_to_discover { @@ -612,30 +604,26 @@ impl Discovery { // Search through any queued requests and update the timeout if a query for this subnet // already exists let mut found = false; - for query in self.queued_queries.iter_mut() { - if let QueryType::Subnet(ref mut subnet_query) = query { - if subnet_query.subnet == subnet { - if subnet_query.min_ttl < min_ttl { - subnet_query.min_ttl = min_ttl; - } - // update the number of retries - subnet_query.retries = retries; - // mimic an `Iter::Find()` and short-circuit the loop - found = true; - break; + for subnet_query in self.queued_queries.iter_mut() { + if subnet_query.subnet == subnet { + if subnet_query.min_ttl < min_ttl { + subnet_query.min_ttl = min_ttl; } + // update the number of retries + subnet_query.retries = retries; + // mimic an `Iter::Find()` and short-circuit the loop + found = true; + break; } } if !found { - // Set up the query and add it to the queue - let query = QueryType::Subnet(SubnetQuery { + // update the metrics and insert into the queue. + trace!(self.log, "Queuing subnet query"; "subnet" => ?subnet, "retries" => retries); + self.queued_queries.push_back(SubnetQuery { subnet, min_ttl, retries, }); - // update the metrics and insert into the queue. - trace!(self.log, "Queuing subnet query"; "subnet" => ?subnet, "retries" => retries); - self.queued_queries.push_back(query); metrics::set_gauge(&metrics::DISCOVERY_QUEUE, self.queued_queries.len() as i64); } } @@ -643,8 +631,8 @@ impl Discovery { /// Consume the discovery queue and initiate queries when applicable. /// /// This also sanitizes the queue removing out-dated queries. - /// Returns `true` if any of the queued queries is processed and a discovery - /// query (Subnet or FindPeers) is started. + /// Returns `true` if any of the queued queries is processed and a subnet discovery + /// query is started. fn process_queue(&mut self) -> bool { // Sanitize the queue, removing any out-dated subnet queries self.queued_queries.retain(|query| !query.expired()); @@ -655,44 +643,19 @@ impl Discovery { // Check that we are within our query concurrency limit while !self.at_capacity() && !self.queued_queries.is_empty() { // consume and process the query queue - match self.queued_queries.pop_front() { - Some(QueryType::FindPeers) => { - // Only start a find peers query if it is the last message in the queue. - // We want to prioritize subnet queries, so we don't miss attestations. - if self.queued_queries.is_empty() { - // This is a regular request to find additional peers - debug!(self.log, "Discovery query started"); - self.find_peer_active = true; - self.start_query( - GroupedQueryType::FindPeers, - FIND_NODE_QUERY_CLOSEST_PEERS, - |_| true, - ); - processed = true; - } else { - self.queued_queries.push_back(QueryType::FindPeers); - } - } - Some(QueryType::Subnet(subnet_query)) => { - subnet_queries.push(subnet_query); + if let Some(subnet_query) = self.queued_queries.pop_front() { + subnet_queries.push(subnet_query); - // We want to start a grouped subnet query if: - // 1. We've grouped MAX_SUBNETS_IN_QUERY subnets together. - // 2. There are no more messages in the queue. - // 3. There is exactly one message in the queue and it is FindPeers. - if subnet_queries.len() == MAX_SUBNETS_IN_QUERY - || self.queued_queries.is_empty() - || (self.queued_queries.front() == Some(&QueryType::FindPeers) - && self.queued_queries.len() == 1) - { - // This query is for searching for peers of a particular subnet - // Drain subnet_queries so we can re-use it as we continue to process the queue - let grouped_queries: Vec = subnet_queries.drain(..).collect(); - self.start_subnet_query(grouped_queries); - processed = true; - } + // We want to start a grouped subnet query if: + // 1. We've grouped MAX_SUBNETS_IN_QUERY subnets together. + // 2. There are no more messages in the queue. + if subnet_queries.len() == MAX_SUBNETS_IN_QUERY || self.queued_queries.is_empty() { + // This query is for searching for peers of a particular subnet + // Drain subnet_queries so we can re-use it as we continue to process the queue + let grouped_queries: Vec = subnet_queries.drain(..).collect(); + self.start_subnet_query(grouped_queries); + processed = true; } - None => {} // Queue is empty } } // Update the queue metric @@ -701,9 +664,12 @@ impl Discovery { } // Returns a boolean indicating if we are currently processing the maximum number of - // concurrent queries or not. + // concurrent subnet queries or not. fn at_capacity(&self) -> bool { - self.active_queries.len() >= MAX_CONCURRENT_QUERIES + self.active_queries + .len() + .saturating_sub(self.find_peer_active as usize) // We only count active subnet queries + >= MAX_CONCURRENT_SUBNET_QUERIES } /// Runs a discovery request for a given group of subnets. @@ -754,7 +720,7 @@ impl Discovery { "subnets" => ?filtered_subnet_queries, ); self.start_query( - GroupedQueryType::Subnet(filtered_subnet_queries), + QueryType::Subnet(filtered_subnet_queries), TARGET_PEERS_FOR_GROUPED_QUERY, subnet_predicate, ); @@ -768,14 +734,14 @@ impl Discovery { /// ENR. fn start_query( &mut self, - grouped_query: GroupedQueryType, + query: QueryType, target_peers: usize, additional_predicate: impl Fn(&Enr) -> bool + Send + 'static, ) { // Make sure there are subnet queries included - let contains_queries = match &grouped_query { - GroupedQueryType::Subnet(queries) => !queries.is_empty(), - GroupedQueryType::FindPeers => true, + let contains_queries = match &query { + QueryType::Subnet(queries) => !queries.is_empty(), + QueryType::FindPeers => true, }; if !contains_queries { @@ -813,7 +779,10 @@ impl Discovery { let query_future = self .discv5 .find_node_predicate(random_node, predicate, target_peers) - .map(|v| QueryResult(grouped_query, v)); + .map(|v| QueryResult { + query_type: query, + result: v, + }); // Add the future to active queries, to be executed. self.active_queries.push(Box::pin(query_future)); @@ -822,12 +791,12 @@ impl Discovery { /// Process the completed QueryResult returned from discv5. fn process_completed_queries( &mut self, - query_result: QueryResult, + query: QueryResult, ) -> Option>> { - match query_result.0 { - GroupedQueryType::FindPeers => { + match query.query_type { + QueryType::FindPeers => { self.find_peer_active = false; - match query_result.1 { + match query.result { Ok(r) if r.is_empty() => { debug!(self.log, "Discovery query yielded no results."); } @@ -846,10 +815,10 @@ impl Discovery { } } } - GroupedQueryType::Subnet(queries) => { + QueryType::Subnet(queries) => { let subnets_searched_for: Vec = queries.iter().map(|query| query.subnet).collect(); - match query_result.1 { + match query.result { Ok(r) if r.is_empty() => { debug!(self.log, "Grouped subnet discovery query yielded no results."; "subnets_searched_for" => ?subnets_searched_for); queries.iter().for_each(|query| { @@ -1144,10 +1113,7 @@ mod tests { subnet_query.min_ttl, subnet_query.retries, ); - assert_eq!( - discovery.queued_queries.back(), - Some(&QueryType::Subnet(subnet_query.clone())) - ); + assert_eq!(discovery.queued_queries.back(), Some(&subnet_query)); // New query should replace old query subnet_query.min_ttl = Some(now + Duration::from_secs(1)); @@ -1158,7 +1124,7 @@ mod tests { assert_eq!(discovery.queued_queries.len(), 1); assert_eq!( discovery.queued_queries.pop_back(), - Some(QueryType::Subnet(subnet_query.clone())) + Some(subnet_query.clone()) ); // Retries > MAX_DISCOVERY_RETRY must return immediately without adding @@ -1172,39 +1138,6 @@ mod tests { assert_eq!(discovery.queued_queries.len(), 0); } - #[tokio::test] - async fn test_process_queue() { - let mut discovery = build_discovery().await; - - // FindPeers query is processed if there is no subnet query - discovery.queued_queries.push_back(QueryType::FindPeers); - assert!(discovery.process_queue()); - - let now = Instant::now(); - let subnet_query = SubnetQuery { - subnet: Subnet::Attestation(SubnetId::new(1)), - min_ttl: Some(now + Duration::from_secs(10)), - retries: 0, - }; - - // Refresh active queries - discovery.active_queries = Default::default(); - - // SubnetQuery is processed if it's the only queued query - discovery - .queued_queries - .push_back(QueryType::Subnet(subnet_query.clone())); - assert!(discovery.process_queue()); - - // SubnetQuery is processed if it's there is also 1 queued discovery query - discovery.queued_queries.push_back(QueryType::FindPeers); - discovery - .queued_queries - .push_back(QueryType::Subnet(subnet_query)); - // Process Subnet query and FindPeers afterwards. - assert!(discovery.process_queue()); - } - fn make_enr(subnet_ids: Vec) -> Enr { let mut builder = EnrBuilder::new("v4"); let keypair = libp2p::identity::Keypair::generate_secp256k1(); @@ -1227,7 +1160,7 @@ mod tests { let instant1 = Some(now + Duration::from_secs(10)); let instant2 = Some(now + Duration::from_secs(5)); - let query = GroupedQueryType::Subnet(vec![ + let query = QueryType::Subnet(vec![ SubnetQuery { subnet: Subnet::Attestation(SubnetId::new(1)), min_ttl: instant1, @@ -1248,7 +1181,10 @@ mod tests { let enrs: Vec = vec![enr1.clone(), enr2, enr3]; let results = discovery - .process_completed_queries(QueryResult(query, Ok(enrs))) + .process_completed_queries(QueryResult { + query_type: query, + result: Ok(enrs), + }) .unwrap(); // enr1 and enr2 are required peers based on the requested subnet ids