Discover query grouping (#1364)

## Issue Addressed

#1281

## Proposed Changes

Groups queries for specific subnets into groups of up to 3.

## Additional Info
This commit is contained in:
realbigsean 2020-07-29 02:43:50 +00:00
parent 9ae9df806c
commit 09b40b7a5e
5 changed files with 256 additions and 139 deletions

View File

@ -43,7 +43,7 @@ rand = "0.7.3"
git = "https://github.com/sigp/rust-libp2p"
rev = "147bb43fa56c1b84253606eabedb0794eeed8b94"
default-features = false
features = ["websocket", "identify", "mplex", "yamux", "noise", "gossipsub", "dns", "secio", "tcp-tokio"]
features = ["websocket", "identify", "mplex", "yamux", "noise", "gossipsub", "dns", "secio", "tcp-tokio"]
[dev-dependencies]
tokio = { version = "0.2.21", features = ["full"] }

View File

@ -19,13 +19,13 @@ use slog::{crit, debug, info, warn};
use ssz::{Decode, Encode};
use ssz_types::BitVector;
use std::{
collections::VecDeque,
collections::{HashMap, VecDeque},
net::SocketAddr,
path::Path,
pin::Pin,
sync::Arc,
task::{Context, Poll},
time::Instant,
time::{Duration, Instant},
};
use tokio::sync::mpsc;
use types::{EnrForkId, EthSpec, SubnetId};
@ -37,33 +37,50 @@ use subnet_predicate::subnet_predicate;
pub const ENR_FILENAME: &str = "enr.dat";
/// Target number of peers we'd like to have connected to a given long-lived subnet.
const TARGET_SUBNET_PEERS: usize = 3;
/// Number of times to attempt a discovery request
/// Target number of peers to search for given a grouped subnet query.
const TARGET_PEERS_FOR_GROUPED_QUERY: usize = 6;
/// Number of times to attempt a discovery request.
const MAX_DISCOVERY_RETRY: usize = 3;
/// The maximum number of concurrent discovery queries.
const MAX_CONCURRENT_QUERIES: usize = 1;
const MAX_CONCURRENT_QUERIES: usize = 2;
/// The max number of subnets to search for in a single subnet discovery query.
const MAX_SUBNETS_IN_QUERY: usize = 3;
/// The number of closest peers to search for when doing a regular peer search.
///
/// We could reduce this constant to speed up queries however at the cost of security. It will
/// make it easier to peers to eclipse this node. Kademlia suggests a value of 16.
const FIND_NODE_QUERY_CLOSEST_PEERS: usize = 16;
/// The threshold for updating `min_ttl` on a connected peer.
const DURATION_DIFFERENCE: Duration = Duration::from_millis(1);
/// The events emitted by polling discovery.
pub enum DiscoveryEvent {
/// A query has completed. The first parameter is the `min_ttl` of the peers if it is specified
/// and the second parameter are the discovered peers.
QueryResult(Option<Instant>, Vec<Enr>),
/// A query has completed. This result contains a mapping of discovered peer IDs to the `min_ttl`
/// of the peer if it is specified.
QueryResult(HashMap<PeerId, Option<Instant>>),
/// This indicates that our local UDP socketaddr has been updated and we should inform libp2p.
SocketUpdated(SocketAddr),
}
#[derive(Debug, Clone, PartialEq)]
struct SubnetQuery {
subnet_id: SubnetId,
min_ttl: Option<Instant>,
retries: usize,
}
#[derive(Debug, Clone, PartialEq)]
enum QueryType {
/// We are searching for subnet peers.
Subnet {
subnet_id: SubnetId,
min_ttl: Option<Instant>,
retries: usize,
},
Subnet(SubnetQuery),
/// We are searching for more peers without ENR or time constraints.
FindPeers,
}
#[derive(Debug, Clone, PartialEq)]
enum GroupedQueryType {
/// We are searching for peers on one of a few subnets.
Subnet(Vec<SubnetQuery>),
/// We are searching for more peers without ENR or time constraints.
FindPeers,
}
@ -73,30 +90,19 @@ impl QueryType {
pub fn expired(&self) -> bool {
match self {
Self::FindPeers => false,
Self::Subnet { min_ttl, .. } => {
if let Some(ttl) = min_ttl {
ttl < &Instant::now()
Self::Subnet(subnet_query) => {
if let Some(ttl) = subnet_query.min_ttl {
ttl < Instant::now()
} else {
true
}
}
}
}
/// Returns the min_ttl of the query if one exists
///
/// This is required for returning to the peer manager. The peer manager will update newly
/// connected peers with this `min_ttl`
pub fn min_ttl(&self) -> Option<Instant> {
match self {
Self::FindPeers => None,
Self::Subnet { min_ttl, .. } => *min_ttl,
}
}
}
/// The result of a query.
struct QueryResult(QueryType, Result<Vec<Enr>, discv5::QueryError>);
struct QueryResult(GroupedQueryType, Result<Vec<Enr>, discv5::QueryError>);
// Awaiting the event stream future
enum EventStream {
@ -381,18 +387,13 @@ impl<TSpec: EthSpec> Discovery<TSpec> {
// already exists
let mut found = false;
for query in self.queued_queries.iter_mut() {
if let QueryType::Subnet {
subnet_id: ref mut q_subnet_id,
min_ttl: ref mut q_min_ttl,
retries: ref mut q_retries,
} = query
{
if *q_subnet_id == subnet_id {
if *q_min_ttl < min_ttl {
*q_min_ttl = min_ttl;
if let QueryType::Subnet(ref mut subnet_query) = query {
if subnet_query.subnet_id == subnet_id {
if subnet_query.min_ttl < min_ttl {
subnet_query.min_ttl = min_ttl;
}
// update the number of retries
*q_retries = retries;
subnet_query.retries = retries;
// mimic an `Iter::Find()` and short-circuit the loop
found = true;
break;
@ -401,11 +402,11 @@ impl<TSpec: EthSpec> Discovery<TSpec> {
}
if !found {
// Set up the query and add it to the queue
let query = QueryType::Subnet {
let query = QueryType::Subnet(SubnetQuery {
subnet_id,
min_ttl,
retries,
};
});
// update the metrics and insert into the queue.
debug!(self.log, "Queuing subnet query"; "subnet" => *subnet_id, "retries" => retries);
self.queued_queries.push_back(query);
@ -420,28 +421,46 @@ impl<TSpec: EthSpec> Discovery<TSpec> {
// Sanitize the queue, removing any out-dated subnet queries
self.queued_queries.retain(|query| !query.expired());
// use this to group subnet queries together for a single discovery request
let mut subnet_queries: Vec<SubnetQuery> = Vec::new();
// Check that we are within our query concurrency limit
while !self.at_capacity() && !self.queued_queries.is_empty() {
// consume and process the query queue
match self.queued_queries.pop_front() {
Some(QueryType::FindPeers) => {
// Only permit one FindPeers query at a time
if self.find_peer_active {
// Only start a find peers query if it is the last message in the queue.
// We want to prioritize subnet queries, so we don't miss attestations.
if self.queued_queries.is_empty() {
// This is a regular request to find additional peers
debug!(self.log, "Discovery query started");
self.find_peer_active = true;
self.start_query(
GroupedQueryType::FindPeers,
FIND_NODE_QUERY_CLOSEST_PEERS,
|_| true,
);
} else {
self.queued_queries.push_back(QueryType::FindPeers);
continue;
}
// This is a regular request to find additional peers
debug!(self.log, "Discovery query started");
self.find_peer_active = true;
self.start_query(QueryType::FindPeers, FIND_NODE_QUERY_CLOSEST_PEERS);
}
Some(QueryType::Subnet {
subnet_id,
min_ttl,
retries,
}) => {
// This query is for searching for peers of a particular subnet
self.start_subnet_query(subnet_id, min_ttl, retries);
Some(QueryType::Subnet(subnet_query)) => {
subnet_queries.push(subnet_query);
// We want to start a grouped subnet query if:
// 1. We've grouped MAX_SUBNETS_IN_QUERY subnets together.
// 2. There are no more messages in the queue.
// 3. There is exactly one message in the queue and it is FindPeers.
if subnet_queries.len() == MAX_SUBNETS_IN_QUERY
|| self.queued_queries.is_empty()
|| (self.queued_queries.front() == Some(&QueryType::FindPeers)
&& self.queued_queries.len() == 1)
{
// This query is for searching for peers of a particular subnet
// Drain subnet_queries so we can re-use it as we continue to process the queue
let grouped_queries: Vec<SubnetQuery> = subnet_queries.drain(..).collect();
self.start_subnet_query(grouped_queries);
}
}
None => {} // Queue is empty
}
@ -456,47 +475,57 @@ impl<TSpec: EthSpec> Discovery<TSpec> {
self.active_queries.len() >= MAX_CONCURRENT_QUERIES
}
/// Runs a discovery request for a given subnet_id if one already exists.
fn start_subnet_query(
&mut self,
subnet_id: SubnetId,
min_ttl: Option<Instant>,
retries: usize,
) {
// Determine if we have sufficient peers, which may make this discovery unnecessary.
let peers_on_subnet = self
.network_globals
.peers
.read()
.peers_on_subnet(subnet_id)
.count();
/// Runs a discovery request for a given group of subnets.
fn start_subnet_query(&mut self, subnet_queries: Vec<SubnetQuery>) {
let mut filtered_subnet_ids: Vec<SubnetId> = Vec::new();
if peers_on_subnet > TARGET_SUBNET_PEERS {
debug!(self.log, "Discovery ignored";
"reason" => "Already connected to desired peers",
"connected_peers_on_subnet" => peers_on_subnet,
"target_subnet_peers" => TARGET_SUBNET_PEERS,
// find subnet queries that are still necessary
let filtered_subnet_queries: Vec<SubnetQuery> = subnet_queries
.into_iter()
.filter(|subnet_query| {
// Determine if we have sufficient peers, which may make this discovery unnecessary.
let peers_on_subnet = self
.network_globals
.peers
.read()
.peers_on_subnet(subnet_query.subnet_id)
.count();
if peers_on_subnet > TARGET_SUBNET_PEERS {
debug!(self.log, "Discovery ignored";
"reason" => "Already connected to desired peers",
"connected_peers_on_subnet" => peers_on_subnet,
"target_subnet_peers" => TARGET_SUBNET_PEERS,
);
return false;
}
let target_peers = TARGET_SUBNET_PEERS - peers_on_subnet;
debug!(self.log, "Discovery query started for subnet";
"subnet_id" => *subnet_query.subnet_id,
"connected_peers_on_subnet" => peers_on_subnet,
"target_subnet_peers" => TARGET_SUBNET_PEERS,
"peers_to_find" => target_peers,
"attempt" => subnet_query.retries,
"min_ttl" => format!("{:?}", subnet_query.min_ttl),
);
filtered_subnet_ids.push(subnet_query.subnet_id);
true
})
.collect();
// Only start a discovery query if we have a subnet to look for.
if !filtered_subnet_queries.is_empty() {
// build the subnet predicate as a combination of the eth2_fork_predicate and the subnet predicate
let subnet_predicate = subnet_predicate::<TSpec>(filtered_subnet_ids, &self.log);
self.start_query(
GroupedQueryType::Subnet(filtered_subnet_queries),
TARGET_PEERS_FOR_GROUPED_QUERY,
subnet_predicate,
);
return;
}
let target_peers = TARGET_SUBNET_PEERS - peers_on_subnet;
debug!(self.log, "Discovery query started for subnet";
"subnet_id" => *subnet_id,
"connected_peers_on_subnet" => peers_on_subnet,
"target_subnet_peers" => TARGET_SUBNET_PEERS,
"peers_to_find" => target_peers,
"attempt" => retries,
"min_ttl" => format!("{:?}", min_ttl),
);
// start the query, and update the queries map if necessary
let query = QueryType::Subnet {
subnet_id,
min_ttl,
retries,
};
self.start_query(query, target_peers);
}
/// Search for a specified number of new peers using the underlying discovery mechanism.
@ -504,7 +533,26 @@ impl<TSpec: EthSpec> Discovery<TSpec> {
/// This can optionally search for peers for a given predicate. Regardless of the predicate
/// given, this will only search for peers on the same enr_fork_id as specified in the local
/// ENR.
fn start_query(&mut self, query: QueryType, target_peers: usize) {
fn start_query(
&mut self,
grouped_query: GroupedQueryType,
target_peers: usize,
additional_predicate: impl Fn(&Enr) -> bool + Send + 'static,
) {
// Make sure there are subnet queries included
let contains_queries = match &grouped_query {
GroupedQueryType::Subnet(queries) => !queries.is_empty(),
GroupedQueryType::FindPeers => true,
};
if !contains_queries {
debug!(
self.log,
"No subnets included in this request. Skipping discovery request."
);
return;
}
// Generate a random target node id.
let random_node = NodeId::random();
@ -519,31 +567,24 @@ impl<TSpec: EthSpec> Discovery<TSpec> {
let eth2_fork_predicate = move |enr: &Enr| enr.eth2() == Ok(enr_fork_id.clone());
// General predicate
let predicate: Box<dyn Fn(&Enr) -> bool + Send> = match &query {
QueryType::FindPeers => Box::new(eth2_fork_predicate),
QueryType::Subnet { subnet_id, .. } => {
// build the subnet predicate as a combination of the eth2_fork_predicate and the
// subnet predicate
let subnet_predicate = subnet_predicate::<TSpec>(*subnet_id, &self.log);
Box::new(move |enr: &Enr| eth2_fork_predicate(enr) && subnet_predicate(enr))
}
};
let predicate: Box<dyn Fn(&Enr) -> bool + Send> =
Box::new(move |enr: &Enr| eth2_fork_predicate(enr) && additional_predicate(enr));
// Build the future
let query_future = self
.discv5
.find_node_predicate(random_node, predicate, target_peers)
.map(|v| QueryResult(query, v));
.map(|v| QueryResult(grouped_query, v));
// Add the future to active queries, to be executed.
self.active_queries.push(Box::pin(query_future));
}
/// Drives the queries returning any results from completed queries.
fn poll_queries(&mut self, cx: &mut Context) -> Option<(Option<Instant>, Vec<Enr>)> {
fn poll_queries(&mut self, cx: &mut Context) -> Option<HashMap<PeerId, Option<Instant>>> {
while let Poll::Ready(Some(query_future)) = self.active_queries.poll_next_unpin(cx) {
match query_future.0 {
QueryType::FindPeers => {
GroupedQueryType::FindPeers => {
self.find_peer_active = false;
match query_future.1 {
Ok(r) if r.is_empty() => {
@ -551,31 +592,96 @@ impl<TSpec: EthSpec> Discovery<TSpec> {
}
Ok(r) => {
debug!(self.log, "Discovery query completed"; "peers_found" => r.len());
return Some((None, r));
let mut results: HashMap<PeerId, Option<Instant>> = HashMap::new();
r.iter().for_each(|enr| {
// cache the found ENR's
self.cached_enrs.put(enr.peer_id(), enr.clone());
results.insert(enr.peer_id(), None);
});
return Some(results);
}
Err(e) => {
warn!(self.log, "Discovery query failed"; "error" => e.to_string());
}
}
}
QueryType::Subnet {
subnet_id,
min_ttl,
retries,
} => {
GroupedQueryType::Subnet(queries) => {
let subnets_searched_for: Vec<SubnetId> =
queries.iter().map(|query| query.subnet_id).collect();
match query_future.1 {
Ok(r) if r.is_empty() => {
debug!(self.log, "Subnet discovery query yielded no results."; "subnet_id" => *subnet_id, "retries" => retries);
debug!(self.log, "Grouped subnet discovery query yielded no results."; "subnets_searched_for" => format!("{:?}",subnets_searched_for));
}
Ok(r) => {
debug!(self.log, "Peer subnet discovery request completed"; "peers_found" => r.len(), "subnet_id" => *subnet_id);
// A subnet query has completed. Add back to the queue, incrementing retries.
self.add_subnet_query(subnet_id, min_ttl, retries + 1);
// Report the results back to the peer manager.
return Some((query_future.0.min_ttl(), r));
debug!(self.log, "Peer grouped subnet discovery request completed"; "peers_found" => r.len(), "subnets_searched_for" => format!("{:?}",subnets_searched_for));
let mut mapped_results: HashMap<PeerId, Option<Instant>> =
HashMap::new();
// cache the found ENR's
for enr in r.iter().cloned() {
self.cached_enrs.put(enr.peer_id(), enr);
}
// Map each subnet query's min_ttl to the set of ENR's returned for that subnet.
queries.iter().for_each(|query| {
// A subnet query has completed. Add back to the queue, incrementing retries.
self.add_subnet_query(
query.subnet_id,
query.min_ttl,
query.retries + 1,
);
// Check the specific subnet against the enr
let subnet_predicate =
subnet_predicate::<TSpec>(vec![query.subnet_id], &self.log);
r.iter()
.filter(|enr| subnet_predicate(enr))
.map(|enr| enr.peer_id())
.for_each(|peer_id| {
let other_min_ttl = mapped_results.get_mut(&peer_id);
// map peer IDs to the min_ttl furthest in the future
match (query.min_ttl, other_min_ttl) {
// update the mapping if the min_ttl is greater
(
Some(min_ttl_instant),
Some(Some(other_min_ttl_instant)),
) => {
if min_ttl_instant.saturating_duration_since(
*other_min_ttl_instant,
) > DURATION_DIFFERENCE
{
*other_min_ttl_instant = min_ttl_instant;
}
}
// update the mapping if we have a specified min_ttl
(Some(min_ttl), Some(None)) => {
mapped_results.insert(peer_id, Some(min_ttl));
}
// first seen min_ttl for this enr
(Some(min_ttl), None) => {
mapped_results.insert(peer_id, Some(min_ttl));
}
// first seen min_ttl for this enr
(None, None) => {
mapped_results.insert(peer_id, None);
}
(None, Some(Some(_))) => {} // Don't replace the existing specific min_ttl
(None, Some(None)) => {} // No-op because this is a duplicate
}
});
});
if mapped_results.is_empty() {
return None;
} else {
return Some(mapped_results);
}
}
Err(e) => {
warn!(self.log,"Subnet Discovery query failed"; "subnet_id" => *subnet_id, "error" => e.to_string());
warn!(self.log,"Grouped subnet discovery query failed"; "subnets_searched_for" => format!("{:?}",subnets_searched_for), "error" => e.to_string());
}
}
}
@ -594,13 +700,9 @@ impl<TSpec: EthSpec> Discovery<TSpec> {
self.process_queue();
// Drive the queries and return any results from completed queries
if let Some((min_ttl, result)) = self.poll_queries(cx) {
// cache the found ENR's
for enr in result.iter().cloned() {
self.cached_enrs.put(enr.peer_id(), enr);
}
if let Some(results) = self.poll_queries(cx) {
// return the result to the peer manager
return Poll::Ready(DiscoveryEvent::QueryResult(min_ttl, result));
return Poll::Ready(DiscoveryEvent::QueryResult(results));
}
// Process the server event stream

View File

@ -1,9 +1,10 @@
///! The subnet predicate used for searching for a particular subnet.
use super::*;
use std::ops::Deref;
/// Returns the predicate for a given subnet.
pub fn subnet_predicate<TSpec>(
subnet_id: SubnetId,
subnet_ids: Vec<SubnetId>,
log: &slog::Logger,
) -> impl Fn(&Enr) -> bool + Send
where
@ -23,10 +24,27 @@ where
}
};
return bitfield.get(*subnet_id as usize).unwrap_or_else(|_| {
debug!(log_clone, "Peer found but not on desired subnet"; "peer_id" => format!("{}", enr.peer_id()));
false
});
let matches: Vec<&SubnetId> = subnet_ids
.iter()
.filter(|id| bitfield.get(**id.deref() as usize).unwrap_or(false))
.collect();
if matches.is_empty() {
debug!(
log_clone,
"Peer found but not on any of the desired subnets";
"peer_id" => format!("{}", enr.peer_id())
);
return false;
} else {
debug!(
log_clone,
"Peer found on desired subnet(s)";
"peer_id" => format!("{}", enr.peer_id()),
"subnets" => format!("{:?}", matches.as_slice())
);
return true;
}
}
false
}

View File

@ -4,7 +4,7 @@ pub use self::peerdb::*;
use crate::discovery::{Discovery, DiscoveryEvent};
use crate::rpc::{GoodbyeReason, MetaData, Protocol, RPCError, RPCResponseErrorCode};
use crate::{error, metrics};
use crate::{Enr, EnrExt, NetworkConfig, NetworkGlobals, PeerId};
use crate::{EnrExt, NetworkConfig, NetworkGlobals, PeerId};
use futures::prelude::*;
use futures::Stream;
use hashset_delay::HashSetDelay;
@ -32,6 +32,7 @@ pub(crate) mod score;
pub use peer_info::{PeerConnectionStatus::*, PeerInfo};
pub use peer_sync_status::{PeerSyncStatus, SyncInfo};
use score::{PeerAction, ScoreState};
use std::collections::HashMap;
/// The time in seconds between re-status's peers.
const STATUS_INTERVAL: u64 = 300;
/// The time in seconds between PING events. We do not send a ping if the other peer as PING'd us within
@ -39,7 +40,7 @@ const STATUS_INTERVAL: u64 = 300;
const PING_INTERVAL: u64 = 30;
/// The heartbeat performs regular updates such as updating reputations and performing discovery
/// requests. This defines the interval in seconds.
/// requests. This defines the interval in seconds.
const HEARTBEAT_INTERVAL: u64 = 30;
/// A fraction of `PeerManager::target_peers` that we allow to connect to us in excess of
@ -487,13 +488,11 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
/// with a new `PeerId` which involves a discovery routing table lookup. We could dial the
/// multiaddr here, however this could relate to duplicate PeerId's etc. If the lookup
/// proves resource constraining, we should switch to multiaddr dialling here.
fn peers_discovered(&mut self, peers: &[Enr], min_ttl: Option<Instant>) {
fn peers_discovered(&mut self, results: HashMap<PeerId, Option<Instant>>) {
let mut to_dial_peers = Vec::new();
let connected_or_dialing = self.network_globals.connected_or_dialing_peers();
for enr in peers {
let peer_id = enr.peer_id();
for (peer_id, min_ttl) in results {
// we attempt a connection if this peer is a subnet peer or if the max peer count
// is not yet filled (including dialling peers)
if (min_ttl.is_some() || connected_or_dialing + to_dial_peers.len() < self.max_peers)
@ -726,9 +725,7 @@ impl<TSpec: EthSpec> Stream for PeerManager<TSpec> {
while let Poll::Ready(event) = self.discovery.poll(cx) {
match event {
DiscoveryEvent::SocketUpdated(socket_addr) => self.socket_updated(socket_addr),
DiscoveryEvent::QueryResult(min_ttl, peers) => {
self.peers_discovered(&peers, min_ttl)
}
DiscoveryEvent::QueryResult(results) => self.peers_discovered(results),
}
}

View File

@ -41,7 +41,7 @@ const ADVANCE_SUBSCRIBE_TIME: u32 = 3;
/// The default number of slots before items in hash delay sets used by this class should expire.
const DEFAULT_EXPIRATION_TIMEOUT: u32 = 3;
// 36s at 12s slot time
/// The default number of slots before items in hash delay sets used by this class should expire.
/// The duration difference between two instance for them to be considered equal.
const DURATION_DIFFERENCE: Duration = Duration::from_millis(1);
#[derive(Debug, Eq, Clone)]